#!/usr/bin/python # -*- coding: utf-8 -*- """ Playstreamproxy daemon (based on Livestream daemon) Provides API to ContetSources + stream serving to play via m3u8 playlists """ import os, sys, time, re, json import ConfigParser import atexit from signal import SIGTERM from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler from SocketServer import ThreadingMixIn import time, threading, socket, SocketServer from urllib import unquote, quote import urllib,urlparse import requests #import cookielib,urllib2 from ContentSources import ContentSources from sources.SourceBase import stream_type import util from util import streamproxy_decode3, streamproxy_encode3 try: from requests.packages.urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) except: pass HOST_NAME = "0.0.0.0" PORT_NUMBER = 8880 REDIRECT = True MULTITHREAD = False WORKERS = 10 KEY = "" DEBUG = False DEBUG2 = False SPLIT_CHAR = "~" SPLIT_CODE = "%7E" EQ_CODE = "%3D" COL_CODE = "%3A" cunicode = lambda s: s.decode("utf8") if isinstance(s, str) else s cstr = lambda s: s.encode("utf8") if isinstance(s, unicode) else s headers2dict = lambda h: dict([l.strip().split(": ") for l in h.strip().splitlines()]) headers0 = headers2dict(""" User-Agent: GStreamer souphttpsrc libsoup/2.52.2 icy-metadata: 1 """) headers0_ = headers2dict(""" Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8 User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36 """) cur_directory = os.path.dirname(os.path.realpath(__file__)) slinks = {} sessions = {} cfg_file = "streams.cfg" sources = ContentSources("", cfg_file) config = ConfigParser.ConfigParser() proxy_cfg_file = os.path.join(cur_directory, "playstreamproxy.cfg") if not os.path.exists(proxy_cfg_file): config.add_section("playstreamproxy") config.set("playstreamproxy", "debug", DEBUG) config.set("playstreamproxy", "host", HOST_NAME) config.set("playstreamproxy", "port", PORT_NUMBER) config.set("playstreamproxy", "redirect", REDIRECT) config.set("playstreamproxy", "key", KEY) config.set("playstreamproxy", "multithread", MULTITHREAD) config.set("playstreamproxy", "workers", WORKERS) config.set("playstreamproxy", "wsgi", "wsgiref") config.write(open(proxy_cfg_file, "w")) else: config.read(proxy_cfg_file) DEBUG = config.getboolean("playstreamproxy", "debug") HOST_NAME = config.get("playstreamproxy", "host") PORT_NUMBER = config.getint("playstreamproxy", "port") REDIRECT = config.getboolean("playstreamproxy", "redirect") KEY = config.get("playstreamproxy", "key") MULTITHREAD = config.getboolean("playstreamproxy", "multithread") WORKERS = config.getint("playstreamproxy", "workers") pass class StreamHandler(BaseHTTPRequestHandler): def do_HEAD(self): print "**get_head" self.send_response(200) self.send_header("Server", "playstreamproxy") if ".m3u8" in self.path.lower(): ct = "application/vnd.apple.mpegurl" elif ".ts" in self.path.lower(): ct = "video/MP2T" elif ".mp4" in self.path.lower(): ct = "video/mp4" else: ct = "text/html" self.send_header("Content-type", ct) self.end_headers() def do_GET(self): """Respond to a GET request""" # print "\n\n"+40*"#"+"\nget_url: \n%s" % self.path cmd, data, headers, qs = streamproxy_decode3(self.path) print "cmd=", cmd, print " data=", data print_headers(qs) if KEY: key = qs["key"] if "key" in qs else "" if key <> KEY: print "No key provided" #self.send_response(404) self.write_error(404) return if DEBUG: print "=== Headers:" print_headers(self.headers.dict) self.protocol_version = 'HTTP/1.1' try: if cmd == "playstream": self.fetch_source( self.path) elif cmd in ["get_content", "get_streams", "get_info", "is_video", "options_read", "options_write"]: if cmd == "get_content": content = sources.get_content(data) elif cmd == "get_streams": content = sources.get_streams(data) elif cmd == "get_info": content = sources.get_info(data) elif cmd == "get_image": content = sources.get_image(data) elif cmd == "is_video": content = sources.is_video(data) elif cmd == "options_read": content = sources.options_read(data) elif cmd == "options_write": content = sources.options_write(data) else: content = [] txt = json.dumps(content) self.send_response(200) self.send_header("Server", "playstreamproxy") self.send_header("Content-type", "application/json") self.end_headers() self.wfile.write(txt) self.wfile.close() else: self.write_error(404) except Exception as e: print "Got Exception: ", str(e) import traceback traceback.print_exc() ### Remote server request procedures ### def fetch_offline(self): print "** Fetch offline" self.send_response(200) self.send_header("Server", "playstreamproxy") self.send_header("Content-type", "video/mp4") self.end_headers() self.wfile.write(open("offline.mp4", "rb").read()) #self.wfile.close() def redirect_source(self, urlp): cmd, data, headers, qs = streamproxy_decode3(urlp) streams = sources.get_streams(data) if not streams: self.write_error(500) # TODO return stream = streams[0] url = stream["url"] headers = stream["headers"] if "headers" in stream else headers0 self.send_response(307) self.send_header("Location", url) self.end_headers() def fetch_source(self, urlp): cmd, data, headers, qs = streamproxy_decode3(urlp) if DEBUG: print "\n***********************************************************" print "fetch_source: \n%s"%urlp base_data = hls_base(urlp) if DEBUG: print "base_data=", base_data print "data=", data if not base_data in slinks: streams = sources.get_streams(data) if not streams: self.write_error(500) # TODO return stream = streams[0] url = stream["url"] headers = stream["headers"] if "headers" in stream else headers0 base_url = hls_base2(url) if DEBUG: print "New link, base_url=",base_url ses = requests.Session() ses.trust_env = False slinks[base_data] = {"data": data, "urlp":urlp,"url": url, "base_url": base_url,"session":ses} else: ses = slinks[base_data]["session"] if urlp == slinks[base_data]["urlp"]: url = slinks[base_data]["url"] if DEBUG: print "Existing base link", url else: url = urlp.replace(base_data, slinks[base_data]["base_url"]) if DEBUG: print "Existing new link", url if REDIRECT: print "-->redirect to: " + url self.send_response(307) self.send_header("Location", url) self.end_headers() #self.wfile.close() return headers2 = headers if headers else self.headers.dict headers2 = del_headers(headers2, ["host"]) # if ".ts" in url: # print url r = self.get_page_ses(url,ses,True,headers = headers2) code = r.status_code if not code in (200,206): # TODO 206 apstrāde! self.write_error(code) return if code == 206: print "Code=206" self.send_response(code) #headers2 = del_headers(r.headers, ["Content-Encoding"]) self.send_headers(r.headers) CHUNK_SIZE = 1024 *4 while True: chunk = r.raw.read(CHUNK_SIZE, decode_content=False) if not chunk: break try: self.wfile.write(chunk) except Exception as e: print "Exception: ", str(e) self.wfile.close() return if DEBUG: print "**File downloaded" #if "connection" in r.headers and r.headers["connection"] <> "keep-alive": self.wfile.close() return def send_headers(self,headers): #if DEBUG: #print "**Return headers: " #print_headers(headers) for h in headers: self.send_header(h, headers[h]) self.end_headers() def write_error(self,code): print "***Error, code=%s" % code self.send_response(code) #self.send_headers(r.headers) self.wfile.close() # TODO? # self.fetch_offline() def get_page_ses(self,url,ses,stream=True, headers=None): headers= headers if headers else headers0 ses.headers.update(headers) if DEBUG: print "\n\n====================================================\n**get_page_ses\n%s"%url print "**Server request headers: " print_headers(ses.headers) r = ses.get(url, stream=stream, verify=False) if DEBUG: print "**Server response:", r.status_code print "**Server response headers: " print_headers(r.headers) return r def get_page(self,url,headers=None): if not headers: headers = headers0 if DEBUG: print "\n\n====================================================\n**get_page\n%s"%url print "**Server request headers: " print_headers(headers) r = requests.get(url, headers=headers,stream=True) if DEBUG: print "**Server response:", r.status_code print "**Server response headers: " print_headers(r.headers) return r def address_string(self): host, port = self.client_address[:2] #return socket.getfqdn(host) return host class ThreadedHTTPServer(ThreadingMixIn, HTTPServer): """Handle requests in a separate thread.""" def start(host = HOST_NAME, port = PORT_NUMBER, redirect=REDIRECT, key=KEY): global REDIRECT,KEY if redirect: REDIRECT = redirect if key: KEY = key httpd = ThreadedHTTPServer((host, port), StreamHandler) print time.asctime(), "Server Starts - %s:%s" % (HOST_NAME, PORT_NUMBER) try: httpd.serve_forever() except KeyboardInterrupt: pass httpd.server_close() print time.asctime(), "Server Stops - %s:%s" % (HOST_NAME, PORT_NUMBER) def start2(host = HOST_NAME, port = PORT_NUMBER, redirect=REDIRECT, key=KEY, workers=WORKERS): global REDIRECT,KEY if redirect: REDIRECT = redirect if key: KEY = key print time.asctime(), "Server Starts - %s:%s" % (host, port) addr = ('', port) sock = socket.socket (socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(addr) sock.listen(5) # Launch listener threads. class Thread(threading.Thread): def __init__(self, i): threading.Thread.__init__(self) self.i = i self.daemon = True self.start() def run(self): httpd = HTTPServer(addr, StreamHandler, False) # Prevent the HTTP server from re-binding every handler. # https://stackoverflow.com/questions/46210672/ httpd.socket = sock httpd.server_bind = self.server_close = lambda self: None httpd.serve_forever() [Thread(i) for i in range(workers)] time.sleep(9e5) print time.asctime(), "Server Stops - %s:%s" % (HOST_NAME, PORT_NUMBER) class Daemon: """ A generic daemon class. Usage: subclass the Daemon class and override the run() method """ def __init__(self, pidfile, stdin="/dev/null", stdout="/dev/null", stderr="/dev/null"): self.stdin = stdin self.stdout = stdout self.stderr = stderr self.pidfile = pidfile def daemonize(self): """ do the UNIX double-fork magic, see Stevens' "Advanced Programming in the UNIX Environment" for details (ISBN 0201563177) http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16 """ try: pid = os.fork() if pid > 0: # exit first parent sys.exit(0) except OSError, e: sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror)) sys.exit(1) # decouple from parent environment os.chdir("/") os.setsid() os.umask(0) # do second fork try: pid = os.fork() if pid > 0: # exit from second parent sys.exit(0) except OSError, e: sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror)) sys.exit(1) # redirect standard file descriptors sys.stdout.flush() sys.stderr.flush() si = file(self.stdin, "r") so = file(self.stdout, "a+") se = file(self.stderr, "a+", 0) os.dup2(si.fileno(), sys.stdin.fileno()) os.dup2(so.fileno(), sys.stdout.fileno()) os.dup2(se.fileno(), sys.stderr.fileno()) # write pidfile atexit.register(self.delpid) pid = str(os.getpid()) file(self.pidfile,"w+").write("%s\n" % pid) def delpid(self): os.remove(self.pidfile) def start(self): """ Start the daemon """ # Check for a pidfile to see if the daemon already runs try: pf = file(self.pidfile,"r") pid = int(pf.read().strip()) pf.close() except IOError: pid = None if pid: message = "pidfile %s already exist. Daemon already running?\n" sys.stderr.write(message % self.pidfile) sys.exit(1) # Start the daemon self.daemonize() self.run() def stop(self): """ Stop the daemon """ # Get the pid from the pidfile try: pf = file(self.pidfile,"r") pid = int(pf.read().strip()) pf.close() except IOError: pid = None if not pid: message = "pidfile %s does not exist. Daemon not running?\n" sys.stderr.write(message % self.pidfile) return # not an error in a restart # Try killing the daemon process try: while 1: os.kill(pid, SIGTERM) time.sleep(0.1) except OSError, err: err = str(err) if err.find("No such process") > 0: if os.path.exists(self.pidfile): os.remove(self.pidfile) else: print str(err) sys.exit(1) def restart(self): """ Restart the daemon """ self.stop() self.start() def run(self): """ You should override this method when you subclass Daemon. It will be called after the process has been daemonized by start() or restart(). """ class ProxyDaemon(Daemon): def run(self): print "Daemon start - %s" % MULTITHREAD if MULTITHREAD: start2() else: start() def print_headers(headers): for h in headers: print "%s: %s"%(h,headers[h]) def del_headers(headers0,tags): headers = headers0.copy() for t in tags: if t in headers: del headers[t] if t.lower() in headers: del headers[t.lower()] return headers def hls_base(url): base = url.split("?")[0] base = "/".join(base.split("/")[0:3])+ "/" rest = url.replace(base, "") return base def hls_base2(url): base = url.split("?")[0] base = "/".join(base.split("/")[0:-1])+ "/" rest = url.replace(base, "") return base if __name__ == "__main__": daemon = ProxyDaemon("/var/run/playstreamproxy.pid") if len(sys.argv) == 2: if "start" == sys.argv[1]: daemon.start() elif "stop" == sys.argv[1]: daemon.stop() elif "restart" == sys.argv[1]: daemon.restart() elif "manualstart" == sys.argv[1]: daemon.run() elif "multithread" == sys.argv[1]: start2() else: print "Unknown command" sys.exit(2) sys.exit(0) else: print "usage: %s start|stop|restart|manualstart" % sys.argv[0] sys.exit(2)