#!/usr/bin/python # -*- coding: utf-8 -*- """ StreamProxy daemon (based on Livestream daemon) Ensures persistent cookies, User-Agents and others tricks to play protected HLS/DASH streams """ import os import sys import time import atexit import re import binascii from signal import SIGTERM from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler from SocketServer import ThreadingMixIn from urllib import unquote, quote import urllib,urlparse #import cookielib,urllib2 import requests try: from requests.packages.urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) except: pass HOST_NAME = "" PORT_NUMBER = 8880 DEBUG = True DEBUG2 = False SPLIT_CHAR = "~" SPLIT_CODE = "%7E" EQ_CODE = "%3D" COL_CODE = "%3A" headers2dict = lambda h: dict([l.strip().split(": ") for l in h.strip().splitlines()]) headers0 = headers2dict(""" icy-metadata: 1 User-Agent: GStreamer souphttpsrc libsoup/2.52.2 """) sessions = {} cur_directory = os.path.dirname(os.path.realpath(__file__)) sources = None slinks = {} class StreamHandler(BaseHTTPRequestHandler): def do_HEAD(self): print "**get_head" self.send_response(200) self.send_header("Server", "playstreamproxy") if ".m3u8" in self.path.lower(): ct = "application/vnd.apple.mpegurl" elif ".ts" in self.path.lower(): ct = "video/MP2T" elif ".mp4" in self.path.lower(): ct = "video/mp4" else: ct = "text/html" self.send_header("Content-type", ct) self.end_headers() def do_GET(self): """Respond to a GET request""" print "\n\n"+40*"#"+"\nget_url: \n%s", self.path p = self.path.split("~") #url = urllib.unquote(p[0][1:]) # TODO - vajag nocekot vai visi urli strādā urlp = p[0][1:] url = urlp.replace(COL_CODE, ":") #headers = self.headers.dict headers = {} # TODO izmanto saņemtos headerus, var aizvietot ar defaultajiem #headers["host"] = urlparse.urlparse(url).hostname if len(p)>1: for h in p[1:]: k = h.split("=")[0].lower() v = urllib.unquote(h.split("=")[1]) headers[k]=v if DEBUG: print "url=%s"%url print "Original request headers + url headers:" print_headers(self.headers.dict) self.protocol_version = 'HTTP/1.1' try: if "::" in url: # encoded source link self.fetch_source(urlp, headers) elif ".lattelecom.tv/" in url: # lattelecom.tv hack self.fetch_ltc( url, headers) elif "filmas.lv" in url or "viaplay" in url: # HLS session/decode filmas.lv in url: self.fetch_url2(url, headers) else: # plain fetch self.fetch_url( url, headers) except Exception as e: print "Got Exception: ", str(e) import traceback traceback.print_exc() ### Remote server request procedures ### def fetch_offline(self): print "** Fetch offline" self.send_response(200) self.send_header("Server", "playstreamproxy") self.send_header("Content-type", "video/mp4") self.end_headers() self.wfile.write(open("offline.mp4", "rb").read()) #self.wfile.close() def fetch_source(self, urlp, headers): if DEBUG: print "\n***********************************************************" print "fetch_source: \n%s"%urlp base_data = hls_base(urlp) data = urllib.unquote_plus(base_data)[:-1] if DEBUG: print "base_data=", base_data if DEBUG: print "data=", data if not base_data in slinks : streams = sources.get_streams(data) if not streams: self.write_error(500) # TODO return url = streams[0]["url"] base_url = hls_base(url) if DEBUG: print "New link, base_url=",base_url ses = requests.Session() ses.trust_env = False slinks[base_data] = {"data": data, "urlp":urlp,"url": url, "base_url": base_url,"session":ses} else: ses = slinks[base_data]["session"] if urlp == slinks[base_data]["urlp"]: url = slinks[base_data]["url"] if DEBUG: print "Existing base link", url else: url = urlp.replace(base_data, slinks[base_data]["base_url"]) if DEBUG: print "Existing new link", url r = self.get_page_ses(url,ses,True,headers = headers) code = r.status_code if not code in (200,206): # TODO mēģina vēlreiz get_streams self.write_error(code) return self.send_response(code) self.send_headers(r.headers) CHUNK_SIZE = 1024 *4 for chunk in r.iter_content(chunk_size=CHUNK_SIZE): try: self.wfile.write(chunk) except Exception as e: print "Exception: ", str(e) self.wfile.close() return if DEBUG: print "**File downloaded" if "connection" in r.headers and r.headers["connection"] <> "keep-alive": self.wfile.close() return def fetch_url(self, url,headers): if DEBUG: print "\n***********************************************************" print "fetch_url: \n%s"%url r = self.get_page(url,headers = headers) code = r.status_code if not code in (200,206): self.write_error(code) return self.send_response(code) self.send_headers(r.headers) CHUNK_SIZE = 1024*4 for chunk in r.iter_content(chunk_size=CHUNK_SIZE): try: self.wfile.write(chunk) except Exception as e: print "Exception: ", str(e) self.wfile.close() return if DEBUG: print "**File downloaded" if "connection" in r.headers and r.headers["connection"] <> "keep-alive": self.wfile.close() return def fetch_ltc(self, url, headers): "lattelecom.tv hack (have to update chunklist after each 6 min" if DEBUG: print "\n\n***********************************************************" print "fetch_ltc: \n%s"%url base_url = hls_base(url) if DEBUG: print "base_url=",base_url if base_url not in sessions: if DEBUG: print "New session" sessions[base_url] = {} sessions[base_url]["session"] = requests.Session() sessions[base_url]["session"].trust_env = False sessions[base_url]["session"].headers.update(headers0) sessions[base_url]["playlist"] = "" sessions[base_url]["chunklist"] = [] # change ts file to valid one media_w215689190_33.ts? tsfile = re.search("media_\w+_(\d+)\.ts", url, re.IGNORECASE) if tsfile and sessions[base_url]["chunklist"]: tnum = int(tsfile.group(1)) url2 = sessions[base_url]["chunklist"][tnum] if not url2.startswith("http"): url2 = base_url + url2 url = url2 if DEBUG: print "[playstreamproxy] url changed to ", url ### get_page ### ses = sessions[base_url]["session"] #ses.headers.update(headers0) ses.headers.update(headers) # ses.headers["Connection"]="Keep-Alive" r = self.get_page_ses(url,ses) code = r.status_code #r.status_code if not (code in (200,206)) and tsfile: # update chunklist r2 = self.get_page(sessions[base_url]["playlist"]) streams = re.findall(r"#EXT-X-STREAM-INF:.*?BANDWIDTH=(\d+).*?\n(.+?)$", r2.content, re.IGNORECASE | re.MULTILINE) if streams: sorted(streams, key=lambda item: int(item[0]), reverse=True) chunklist = streams[0][1] if not chunklist.startswith("http"): chunklist = base_url + chunklist else: self.write_error(r.status_code) return print "[playstreamproxy] trying to update chunklist", chunklist r3 = self.get_page_ses(chunklist,ses,True) ts_list = re.findall(r"#EXTINF:.*?\n(.+?)$", r3.content, re.IGNORECASE | re.MULTILINE) sessions[base_url]["chunklist"]= ts_list tnum = int(tsfile.group(1)) url2 = sessions[base_url]["chunklist"][tnum] if not url2.startswith("http"): url2 = base_url + url2 r = self.get_page_ses(url2,ses,True) if not r.status_code in (200,206): self.write_error(r.status_code) return elif not r.status_code in (200,206): self.write_error(r.status_code) return if "playlist.m3u8" in url: sessions[base_url]["playlist"] = url ### Start of return formin and sending self.send_response(200) #headers2 = del_headers(r.headers,["Content-Encoding",'Transfer-Encoding',"Connection",'content-range',"range"]) headers2 = {"server":"playstreamproxy", "content-type":"text/html"} if DEBUG: print "\n** Return content" headers2["content-type"] = r.headers["content-type"] if "content-length" in r.headers: headers2["content-length"] = r.headers["content-length"] self.send_headers(r.headers) CHUNK_SIZE = 4 * 1024 for chunk in r.iter_content(chunk_size=CHUNK_SIZE): try: #print "#", self.wfile.write(chunk) except Exception as e: print "Exception: ", str(e) return if DEBUG: print "File downloaded = " self.wfile.close() #time.sleep(1) return def fetch_url2(self, url, headers): if DEBUG: print "\n***********************************************************" print "fetch_url2: \n%s"%url base_url = hls_base(url) if DEBUG: print "base_url=",base_url if base_url not in sessions: if DEBUG: print "New session" sessions[base_url] = {} sessions[base_url]["session"] = requests.Session() sessions[base_url]["session"].trust_env = False sessions[base_url]["session"].headers.update(headers0) sessions[base_url]["key"] = binascii.a2b_hex(headers["key"]) if "key" in headers and headers["key"] else None ses = sessions[base_url]["session"] ses.trust_env = False key = sessions[base_url]["key"] #ses.headers.clear() ses.headers.update(headers) r = self.get_page_ses(url, ses,stream=False) code = r.status_code #r.status_code if not (code in (200,206)): self.write_error(r.status_code) return ### Start of return formin and sending self.send_response(200) #headers2 = del_headers(r.headers,["Content-Encoding",'Transfer-Encoding',"Connection",'content-range',"range"]) headers2 = {"server":"playstreamproxy", "content-type":"text/html"} # Content-Type: application/vnd.apple.mpegurl (encrypted) if r.headers["content-type"] == "application/vnd.apple.mpegurl" and key: content = r.content content = r.content.replace(base_url,"") content = re.sub("#EXT-X-KEY:METHOD=AES-128.+\n", "", content, 0, re.IGNORECASE | re.MULTILINE) headers2["content-type"] = "application/vnd.apple.mpegurl" headers2["content-length"] = "%s"%len(content) r.headers["content-length"] = "%s"%len(content) #headers2['content-range'] = 'bytes 0-%s/%s'%(len(content)-1,len(content)) self.send_headers(headers2) #self.send_headers(r.headers) self.wfile.write(content) self.wfile.close() # Content-Type: video/MP2T (encrypted) elif r.headers["content-type"] == "video/MP2T" and key: print "Decode video/MP2T" content = r.content from Crypto.Cipher import AES iv = content[:16] d = AES.new(key, AES.MODE_CBC, iv) content = d.decrypt(content[16:]) headers2["content-type"] = "video/MP2T" headers2["content-length"] = "%s"% (len(content)) #headers2['content-range'] = 'bytes 0-%s/%s' % (len(content) - 1, len(content)) print content[0:16] print "Finish decode" self.send_headers(headers2) self.wfile.write(content) self.wfile.close() else: if DEBUG: print "Return regular content" headers2["content-type"] = r.headers["content-type"] if "content-length" in r.headers: headers2["content-length"] = r.headers["content-length"] self.send_headers(r.headers) #self.send_headers(headers2) CHUNK_SIZE = 4 * 1024 for chunk in r.iter_content(chunk_size=CHUNK_SIZE): try: #print "#", self.wfile.write(chunk) except Exception as e: print "Exception: ", str(e) return if DEBUG: print "File downloaded = " if "connection" in r.headers and r.headers["connection"]<>"keep-alive": self.wfile.close() #time.sleep(1) return def send_headers(self,headers): #if DEBUG: #print "**Return headers: " #print_headers(headers) for h in headers: self.send_header(h, headers[h]) self.end_headers() def write_error(self,code): print "***Error, code=%s" % code self.send_response(code) #self.send_headers(r.headers) self.wfile.close() # TODO? # self.fetch_offline() def get_page_ses(self,url,ses,stream=True, headers=None): headers= headers if headers else headers0 ses.headers.update(headers) if DEBUG: print "\n\n====================================================\n**get_page_ses\n%s"%url print "**Server request headers: " print_headers(ses.headers) r = ses.get(url, stream=stream, verify=False) if DEBUG: print "**Server response:", r.status_code print "**Server response headers: " print_headers(r.headers) return r def get_page(self,url,headers=None): if not headers: headers = headers0 if DEBUG: print "\n\n====================================================\n**get_page\n%s"%url print "**Server request headers: " print_headers(headers) r = requests.get(url, headers=headers,stream=True) if DEBUG: print "**Server response:", r.status_code print "**Server response headers: " print_headers(r.headers) return r def address_string(self): host, port = self.client_address[:2] #return socket.getfqdn(host) return host class ThreadedHTTPServer(ThreadingMixIn, HTTPServer): """Handle requests in a separate thread.""" def start(host = HOST_NAME, port = PORT_NUMBER): import ContentSources, util global sources sources = ContentSources.ContentSources(os.path.join(cur_directory, "sources")) httpd = ThreadedHTTPServer((host, port), StreamHandler) print time.asctime(), "Server Starts - %s:%s" % (HOST_NAME, PORT_NUMBER) try: httpd.serve_forever() except KeyboardInterrupt: pass httpd.server_close() print time.asctime(), "Server Stops - %s:%s" % (HOST_NAME, PORT_NUMBER) class Daemon: """ A generic daemon class. Usage: subclass the Daemon class and override the run() method """ def __init__(self, pidfile, stdin="/dev/null", stdout="/dev/null", stderr="/dev/null"): self.stdin = stdin self.stdout = stdout self.stderr = stderr self.pidfile = pidfile def daemonize(self): """ do the UNIX double-fork magic, see Stevens' "Advanced Programming in the UNIX Environment" for details (ISBN 0201563177) http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16 """ try: pid = os.fork() if pid > 0: # exit first parent sys.exit(0) except OSError, e: sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror)) sys.exit(1) # decouple from parent environment os.chdir("/") os.setsid() os.umask(0) # do second fork try: pid = os.fork() if pid > 0: # exit from second parent sys.exit(0) except OSError, e: sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror)) sys.exit(1) # redirect standard file descriptors sys.stdout.flush() sys.stderr.flush() si = file(self.stdin, "r") so = file(self.stdout, "a+") se = file(self.stderr, "a+", 0) os.dup2(si.fileno(), sys.stdin.fileno()) os.dup2(so.fileno(), sys.stdout.fileno()) os.dup2(se.fileno(), sys.stderr.fileno()) # write pidfile atexit.register(self.delpid) pid = str(os.getpid()) file(self.pidfile,"w+").write("%s\n" % pid) def delpid(self): os.remove(self.pidfile) def start(self): """ Start the daemon """ # Check for a pidfile to see if the daemon already runs try: pf = file(self.pidfile,"r") pid = int(pf.read().strip()) pf.close() except IOError: pid = None if pid: message = "pidfile %s already exist. Daemon already running?\n" sys.stderr.write(message % self.pidfile) sys.exit(1) # Start the daemon self.daemonize() self.run() def stop(self): """ Stop the daemon """ # Get the pid from the pidfile try: pf = file(self.pidfile,"r") pid = int(pf.read().strip()) pf.close() except IOError: pid = None if not pid: message = "pidfile %s does not exist. Daemon not running?\n" sys.stderr.write(message % self.pidfile) return # not an error in a restart # Try killing the daemon process try: while 1: os.kill(pid, SIGTERM) time.sleep(0.1) except OSError, err: err = str(err) if err.find("No such process") > 0: if os.path.exists(self.pidfile): os.remove(self.pidfile) else: print str(err) sys.exit(1) def restart(self): """ Restart the daemon """ self.stop() self.start() def run(self): """ You should override this method when you subclass Daemon. It will be called after the process has been daemonized by start() or restart(). """ class ProxyDaemon(Daemon): def run(self): start() def print_headers(headers): for h in headers: print "%s: %s"%(h,headers[h]) def del_headers(headers0,tags): headers = headers0.copy() for t in tags: if t in headers: del headers[t] if t.lower() in headers: del headers[t.lower()] return headers def hls_base(url): url2 = url.split("?")[0] url2 = "/".join(url2.split("/")[0:-1])+ "/" return url2 if __name__ == "__main__": daemon = ProxyDaemon("/var/run/playstreamproxy.pid") if len(sys.argv) == 2: if "start" == sys.argv[1]: daemon.start() elif "stop" == sys.argv[1]: daemon.stop() elif "restart" == sys.argv[1]: daemon.restart() elif "manualstart" == sys.argv[1]: start() else: print "Unknown command" sys.exit(2) sys.exit(0) else: print "usage: %s start|stop|restart|manualstart" % sys.argv[0] sys.exit(2)