Python module (submodule repositary), which provides content (video streams) from various online stream sources to corresponding Enigma2, Kodi, Plex plugins

playstreamproxy.py 17KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526
  1. #!/usr/bin/python
  2. # -*- coding: utf-8 -*-
  3. """
  4. Playstreamproxy daemon (based on Livestream daemon)
  5. Provides API to ContetSources + stream serving to play via m3u8 playlists
  6. """
  7. import os, sys, time, re, json
  8. import ConfigParser
  9. import atexit
  10. from signal import SIGTERM
  11. from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
  12. from SocketServer import ThreadingMixIn
  13. import time, threading, socket, SocketServer
  14. from urllib import unquote, quote
  15. import urllib,urlparse
  16. import requests
  17. #import cookielib,urllib2
  18. from ContentSources import ContentSources
  19. from sources.SourceBase import stream_type
  20. import util
  21. from util import streamproxy_decode3, streamproxy_encode3
  22. try:
  23. from requests.packages.urllib3.exceptions import InsecureRequestWarning
  24. requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
  25. except:
  26. pass
  27. HOST_NAME = "0.0.0.0"
  28. PORT_NUMBER = 8880
  29. REDIRECT = True
  30. MULTITHREAD = False
  31. WORKERS = 10
  32. KEY = ""
  33. DEBUG = False
  34. DEBUG2 = False
  35. SPLIT_CHAR = "~"
  36. SPLIT_CODE = "%7E"
  37. EQ_CODE = "%3D"
  38. COL_CODE = "%3A"
  39. cunicode = lambda s: s.decode("utf8") if isinstance(s, str) else s
  40. cstr = lambda s: s.encode("utf8") if isinstance(s, unicode) else s
  41. headers2dict = lambda h: dict([l.strip().split(": ") for l in h.strip().splitlines()])
  42. headers0 = headers2dict("""
  43. User-Agent: GStreamer souphttpsrc libsoup/2.52.2
  44. icy-metadata: 1
  45. """)
  46. headers0_ = headers2dict("""
  47. Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8
  48. User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36
  49. """)
  50. cur_directory = os.path.dirname(os.path.realpath(__file__))
  51. slinks = {}
  52. sessions = {}
  53. cfg_file = "streams.cfg"
  54. sources = ContentSources("", cfg_file)
  55. config = ConfigParser.ConfigParser()
  56. proxy_cfg_file = os.path.join(cur_directory, "playstreamproxy.cfg")
  57. if not os.path.exists(proxy_cfg_file):
  58. config.add_section("playstreamproxy")
  59. config.set("playstreamproxy", "debug", DEBUG)
  60. config.set("playstreamproxy", "host", HOST_NAME)
  61. config.set("playstreamproxy", "port", PORT_NUMBER)
  62. config.set("playstreamproxy", "redirect", REDIRECT)
  63. config.set("playstreamproxy", "key", KEY)
  64. config.set("playstreamproxy", "multithread", MULTITHREAD)
  65. config.set("playstreamproxy", "workers", WORKERS)
  66. config.set("playstreamproxy", "wsgi", "wsgiref")
  67. config.write(open(proxy_cfg_file, "w"))
  68. else:
  69. config.read(proxy_cfg_file)
  70. DEBUG = config.getboolean("playstreamproxy", "debug")
  71. HOST_NAME = config.get("playstreamproxy", "host")
  72. PORT_NUMBER = config.getint("playstreamproxy", "port")
  73. REDIRECT = config.getboolean("playstreamproxy", "redirect")
  74. KEY = config.get("playstreamproxy", "key")
  75. MULTITHREAD = config.getboolean("playstreamproxy", "multithread")
  76. WORKERS = config.getint("playstreamproxy", "workers")
  77. pass
  78. class StreamHandler(BaseHTTPRequestHandler):
  79. def do_HEAD(self):
  80. print "**get_head"
  81. self.send_response(200)
  82. self.send_header("Server", "playstreamproxy")
  83. if ".m3u8" in self.path.lower():
  84. ct = "application/vnd.apple.mpegurl"
  85. elif ".ts" in self.path.lower():
  86. ct = "video/MP2T"
  87. elif ".mp4" in self.path.lower():
  88. ct = "video/mp4"
  89. else:
  90. ct = "text/html"
  91. self.send_header("Content-type", ct)
  92. self.end_headers()
  93. def do_GET(self):
  94. """Respond to a GET request"""
  95. #
  96. print "\n\n"+40*"#"+"\nget_url: \n%s" % self.path
  97. cmd, data, headers, qs = streamproxy_decode3(self.path)
  98. if KEY:
  99. key = qs["key"] if "key" in qs else ""
  100. if key <> KEY:
  101. print "No key provided"
  102. #self.send_response(404)
  103. self.write_error(404)
  104. return
  105. if DEBUG:
  106. print "cmd=%s"%cmd
  107. print "Original request headers + url headers:"
  108. print_headers(self.headers.dict)
  109. self.protocol_version = 'HTTP/1.1'
  110. try:
  111. if cmd == "playstream":
  112. self.fetch_source( self.path)
  113. elif cmd in ["get_content", "get_streams", "get_info", "is_video", "options_read", "options_write"]:
  114. if cmd == "get_content":
  115. content = sources.get_content(data)
  116. elif cmd == "get_streams":
  117. content = sources.get_streams(data)
  118. elif cmd == "get_info":
  119. content = sources.get_info(data)
  120. elif cmd == "is_video":
  121. content = sources.is_video(data)
  122. elif cmd == "options_read":
  123. content = sources.options_read(data)
  124. else:
  125. content = []
  126. txt = json.dumps(content)
  127. self.send_response(200)
  128. self.send_header("Server", "playstreamproxy")
  129. self.send_header("Content-type", "application/json")
  130. self.end_headers()
  131. self.wfile.write(txt)
  132. self.wfile.close()
  133. else:
  134. self.write_error(404)
  135. except Exception as e:
  136. print "Got Exception: ", str(e)
  137. import traceback
  138. traceback.print_exc()
  139. ### Remote server request procedures ###
  140. def fetch_offline(self):
  141. print "** Fetch offline"
  142. self.send_response(200)
  143. self.send_header("Server", "playstreamproxy")
  144. self.send_header("Content-type", "video/mp4")
  145. self.end_headers()
  146. self.wfile.write(open("offline.mp4", "rb").read())
  147. #self.wfile.close()
  148. def redirect_source(self, urlp):
  149. cmd, data, headers, qs = streamproxy_decode3(urlp)
  150. streams = sources.get_streams(data)
  151. if not streams:
  152. self.write_error(500) # TODO
  153. return
  154. stream = streams[0]
  155. url = stream["url"]
  156. headers = stream["headers"] if "headers" in stream else headers0
  157. self.send_response(307)
  158. self.send_header("Location", url)
  159. self.end_headers()
  160. def fetch_source(self, urlp):
  161. cmd, data, headers, qs = streamproxy_decode3(urlp)
  162. if DEBUG:
  163. print "\n***********************************************************"
  164. print "fetch_source: \n%s"%urlp
  165. base_data = hls_base(urlp)
  166. if DEBUG:
  167. print "base_data=", base_data
  168. print "data=", data
  169. if not base_data in slinks:
  170. streams = sources.get_streams(data)
  171. if not streams:
  172. self.write_error(500) # TODO
  173. return
  174. stream = streams[0]
  175. url = stream["url"]
  176. headers = stream["headers"] if "headers" in stream else headers0
  177. base_url = hls_base2(url)
  178. if DEBUG: print "New link, base_url=",base_url
  179. ses = requests.Session()
  180. ses.trust_env = False
  181. slinks[base_data] = {"data": data, "urlp":urlp,"url": url, "base_url": base_url,"session":ses}
  182. else:
  183. ses = slinks[base_data]["session"]
  184. if urlp == slinks[base_data]["urlp"]:
  185. url = slinks[base_data]["url"]
  186. if DEBUG: print "Existing base link", url
  187. else:
  188. url = urlp.replace(base_data, slinks[base_data]["base_url"])
  189. if DEBUG: print "Existing new link", url
  190. if REDIRECT:
  191. print "-->redirect to: " + url
  192. self.send_response(307)
  193. self.send_header("Location", url)
  194. self.end_headers()
  195. #self.wfile.close()
  196. return
  197. headers2 = headers if headers else self.headers.dict
  198. headers2 = del_headers(headers2, ["host"])
  199. r = self.get_page_ses(url,ses,True,headers = headers2)
  200. code = r.status_code
  201. if not code in (200,206): # TODO 206 apstrāde!
  202. self.write_error(code)
  203. return
  204. if code == 206:
  205. print "Code=206"
  206. self.send_response(code)
  207. #headers2 = del_headers(r.headers, ["Content-Encoding"])
  208. self.send_headers(r.headers)
  209. CHUNK_SIZE = 1024 *4
  210. while True:
  211. chunk = r.raw.read(CHUNK_SIZE, decode_content=False)
  212. if not chunk:
  213. break
  214. try:
  215. self.wfile.write(chunk)
  216. except Exception as e:
  217. print "Exception: ", str(e)
  218. self.wfile.close()
  219. return
  220. if DEBUG: print "**File downloaded"
  221. #if "connection" in r.headers and r.headers["connection"] <> "keep-alive":
  222. self.wfile.close()
  223. return
  224. def send_headers(self,headers):
  225. #if DEBUG:
  226. #print "**Return headers: "
  227. #print_headers(headers)
  228. for h in headers:
  229. self.send_header(h, headers[h])
  230. self.end_headers()
  231. def write_error(self,code):
  232. print "***Error, code=%s" % code
  233. self.send_response(code)
  234. #self.send_headers(r.headers)
  235. self.wfile.close() # TODO?
  236. # self.fetch_offline()
  237. def get_page_ses(self,url,ses,stream=True, headers=None):
  238. headers= headers if headers else headers0
  239. ses.headers.update(headers)
  240. if DEBUG:
  241. print "\n\n====================================================\n**get_page_ses\n%s"%url
  242. print "**Server request headers: "
  243. print_headers(ses.headers)
  244. r = ses.get(url, stream=stream, verify=False)
  245. if DEBUG:
  246. print "**Server response:", r.status_code
  247. print "**Server response headers: "
  248. print_headers(r.headers)
  249. return r
  250. def get_page(self,url,headers=None):
  251. if not headers:
  252. headers = headers0
  253. if DEBUG:
  254. print "\n\n====================================================\n**get_page\n%s"%url
  255. print "**Server request headers: "
  256. print_headers(headers)
  257. r = requests.get(url, headers=headers,stream=True)
  258. if DEBUG:
  259. print "**Server response:", r.status_code
  260. print "**Server response headers: "
  261. print_headers(r.headers)
  262. return r
  263. def address_string(self):
  264. host, port = self.client_address[:2]
  265. #return socket.getfqdn(host)
  266. return host
  267. class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
  268. """Handle requests in a separate thread."""
  269. def start(host = HOST_NAME, port = PORT_NUMBER, key=KEY, redirect=REDIRECT):
  270. global REDIRECT,KEY
  271. if redirect:
  272. REDIRECT = redirect
  273. if key:
  274. KEY = key
  275. httpd = ThreadedHTTPServer((host, port), StreamHandler)
  276. print time.asctime(), "Server Starts - %s:%s" % (HOST_NAME, PORT_NUMBER)
  277. try:
  278. httpd.serve_forever()
  279. except KeyboardInterrupt:
  280. pass
  281. httpd.server_close()
  282. print time.asctime(), "Server Stops - %s:%s" % (HOST_NAME, PORT_NUMBER)
  283. def start2(host = HOST_NAME, port = PORT_NUMBER, redirect=REDIRECT, key=KEY, workers=WORKERS):
  284. global REDIRECT,KEY
  285. if redirect:
  286. REDIRECT = redirect
  287. if key:
  288. KEY = key
  289. print time.asctime(), "Server Starts - %s:%s" % (host, port)
  290. addr = ('', port)
  291. sock = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
  292. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  293. sock.bind(addr)
  294. sock.listen(5)
  295. # Launch listener threads.
  296. class Thread(threading.Thread):
  297. def __init__(self, i):
  298. threading.Thread.__init__(self)
  299. self.i = i
  300. self.daemon = True
  301. self.start()
  302. def run(self):
  303. httpd = HTTPServer(addr, StreamHandler, False)
  304. # Prevent the HTTP server from re-binding every handler.
  305. # https://stackoverflow.com/questions/46210672/
  306. httpd.socket = sock
  307. httpd.server_bind = self.server_close = lambda self: None
  308. httpd.serve_forever()
  309. [Thread(i) for i in range(workers)]
  310. time.sleep(9e5)
  311. print time.asctime(), "Server Stops - %s:%s" % (HOST_NAME, PORT_NUMBER)
  312. class Daemon:
  313. """
  314. A generic daemon class.
  315. Usage: subclass the Daemon class and override the run() method
  316. """
  317. def __init__(self, pidfile, stdin="/dev/null", stdout="/dev/null", stderr="/dev/null"):
  318. self.stdin = stdin
  319. self.stdout = stdout
  320. self.stderr = stderr
  321. self.pidfile = pidfile
  322. def daemonize(self):
  323. """
  324. do the UNIX double-fork magic, see Stevens' "Advanced
  325. Programming in the UNIX Environment" for details (ISBN 0201563177)
  326. http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
  327. """
  328. try:
  329. pid = os.fork()
  330. if pid > 0:
  331. # exit first parent
  332. sys.exit(0)
  333. except OSError, e:
  334. sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
  335. sys.exit(1)
  336. # decouple from parent environment
  337. os.chdir("/")
  338. os.setsid()
  339. os.umask(0)
  340. # do second fork
  341. try:
  342. pid = os.fork()
  343. if pid > 0:
  344. # exit from second parent
  345. sys.exit(0)
  346. except OSError, e:
  347. sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
  348. sys.exit(1)
  349. # redirect standard file descriptors
  350. sys.stdout.flush()
  351. sys.stderr.flush()
  352. si = file(self.stdin, "r")
  353. so = file(self.stdout, "a+")
  354. se = file(self.stderr, "a+", 0)
  355. os.dup2(si.fileno(), sys.stdin.fileno())
  356. os.dup2(so.fileno(), sys.stdout.fileno())
  357. os.dup2(se.fileno(), sys.stderr.fileno())
  358. # write pidfile
  359. atexit.register(self.delpid)
  360. pid = str(os.getpid())
  361. file(self.pidfile,"w+").write("%s\n" % pid)
  362. def delpid(self):
  363. os.remove(self.pidfile)
  364. def start(self):
  365. """
  366. Start the daemon
  367. """
  368. # Check for a pidfile to see if the daemon already runs
  369. try:
  370. pf = file(self.pidfile,"r")
  371. pid = int(pf.read().strip())
  372. pf.close()
  373. except IOError:
  374. pid = None
  375. if pid:
  376. message = "pidfile %s already exist. Daemon already running?\n"
  377. sys.stderr.write(message % self.pidfile)
  378. sys.exit(1)
  379. # Start the daemon
  380. self.daemonize()
  381. self.run()
  382. def stop(self):
  383. """
  384. Stop the daemon
  385. """
  386. # Get the pid from the pidfile
  387. try:
  388. pf = file(self.pidfile,"r")
  389. pid = int(pf.read().strip())
  390. pf.close()
  391. except IOError:
  392. pid = None
  393. if not pid:
  394. message = "pidfile %s does not exist. Daemon not running?\n"
  395. sys.stderr.write(message % self.pidfile)
  396. return # not an error in a restart
  397. # Try killing the daemon process
  398. try:
  399. while 1:
  400. os.kill(pid, SIGTERM)
  401. time.sleep(0.1)
  402. except OSError, err:
  403. err = str(err)
  404. if err.find("No such process") > 0:
  405. if os.path.exists(self.pidfile):
  406. os.remove(self.pidfile)
  407. else:
  408. print str(err)
  409. sys.exit(1)
  410. def restart(self):
  411. """
  412. Restart the daemon
  413. """
  414. self.stop()
  415. self.start()
  416. def run(self):
  417. """
  418. You should override this method when you subclass Daemon. It will be called after the process has been
  419. daemonized by start() or restart().
  420. """
  421. class ProxyDaemon(Daemon):
  422. def run(self):
  423. if MULTITHREAD:
  424. start2()
  425. else:
  426. start()
  427. def print_headers(headers):
  428. for h in headers:
  429. print "%s: %s"%(h,headers[h])
  430. def del_headers(headers0,tags):
  431. headers = headers0.copy()
  432. for t in tags:
  433. if t in headers:
  434. del headers[t]
  435. if t.lower() in headers:
  436. del headers[t.lower()]
  437. return headers
  438. def hls_base(url):
  439. base = url.split("?")[0]
  440. base = "/".join(base.split("/")[0:3])+ "/"
  441. rest = url.replace(base, "")
  442. return base
  443. def hls_base2(url):
  444. base = url.split("?")[0]
  445. base = "/".join(base.split("/")[0:-1])+ "/"
  446. rest = url.replace(base, "")
  447. return base
  448. if __name__ == "__main__":
  449. daemon = ProxyDaemon("/var/run/playstreamproxy.pid")
  450. if len(sys.argv) == 2:
  451. if "start" == sys.argv[1]:
  452. daemon.start()
  453. elif "stop" == sys.argv[1]:
  454. daemon.stop()
  455. elif "restart" == sys.argv[1]:
  456. daemon.restart()
  457. elif "manualstart" == sys.argv[1]:
  458. daemon.run()
  459. elif "multithread" == sys.argv[1]:
  460. start2()
  461. else:
  462. print "Unknown command"
  463. sys.exit(2)
  464. sys.exit(0)
  465. else:
  466. print "usage: %s start|stop|restart|manualstart" % sys.argv[0]
  467. sys.exit(2)