Python module (submodule repositary), which provides content (video streams) from various online stream sources to corresponding Enigma2, Kodi, Plex plugins

playstreamproxy.py 17KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536
  1. #!/usr/bin/python
  2. # -*- coding: utf-8 -*-
  3. """
  4. Playstreamproxy daemon (based on Livestream daemon)
  5. Provides API to ContetSources + stream serving to play via m3u8 playlists
  6. """
  7. import os, sys, time, re, json
  8. import ConfigParser
  9. import atexit
  10. from signal import SIGTERM
  11. from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
  12. from SocketServer import ThreadingMixIn
  13. import time, threading, socket, SocketServer
  14. from urllib import unquote, quote
  15. import urllib,urlparse
  16. import requests
  17. #import cookielib,urllib2
  18. from ContentSources import ContentSources
  19. from sources.SourceBase import stream_type
  20. import util
  21. from util import streamproxy_decode3, streamproxy_encode3
  22. try:
  23. from requests.packages.urllib3.exceptions import InsecureRequestWarning
  24. requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
  25. except:
  26. pass
  27. HOST_NAME = "0.0.0.0"
  28. PORT_NUMBER = 8880
  29. REDIRECT = True
  30. MULTITHREAD = False
  31. WORKERS = 10
  32. KEY = ""
  33. DEBUG = False
  34. DEBUG2 = False
  35. SPLIT_CHAR = "~"
  36. SPLIT_CODE = "%7E"
  37. EQ_CODE = "%3D"
  38. COL_CODE = "%3A"
  39. cunicode = lambda s: s.decode("utf8") if isinstance(s, str) else s
  40. cstr = lambda s: s.encode("utf8") if isinstance(s, unicode) else s
  41. headers2dict = lambda h: dict([l.strip().split(": ") for l in h.strip().splitlines()])
  42. headers0 = headers2dict("""
  43. User-Agent: GStreamer souphttpsrc libsoup/2.52.2
  44. icy-metadata: 1
  45. """)
  46. headers0_ = headers2dict("""
  47. Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8
  48. User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36
  49. """)
  50. cur_directory = os.path.dirname(os.path.realpath(__file__))
  51. slinks = {}
  52. sessions = {}
  53. cfg_file = "streams.cfg"
  54. sources = ContentSources("", cfg_file)
  55. config = ConfigParser.ConfigParser()
  56. proxy_cfg_file = os.path.join(cur_directory, "playstreamproxy.cfg")
  57. if not os.path.exists(proxy_cfg_file):
  58. config.add_section("playstreamproxy")
  59. config.set("playstreamproxy", "debug", DEBUG)
  60. config.set("playstreamproxy", "host", HOST_NAME)
  61. config.set("playstreamproxy", "port", PORT_NUMBER)
  62. config.set("playstreamproxy", "redirect", REDIRECT)
  63. config.set("playstreamproxy", "key", KEY)
  64. config.set("playstreamproxy", "multithread", MULTITHREAD)
  65. config.set("playstreamproxy", "workers", WORKERS)
  66. config.set("playstreamproxy", "wsgi", "wsgiref")
  67. config.write(open(proxy_cfg_file, "w"))
  68. else:
  69. config.read(proxy_cfg_file)
  70. DEBUG = config.getboolean("playstreamproxy", "debug")
  71. HOST_NAME = config.get("playstreamproxy", "host")
  72. PORT_NUMBER = config.getint("playstreamproxy", "port")
  73. REDIRECT = config.getboolean("playstreamproxy", "redirect")
  74. KEY = config.get("playstreamproxy", "key")
  75. MULTITHREAD = config.getboolean("playstreamproxy", "multithread")
  76. WORKERS = config.getint("playstreamproxy", "workers")
  77. pass
  78. class StreamHandler(BaseHTTPRequestHandler):
  79. def do_HEAD(self):
  80. print "**get_head"
  81. self.send_response(200)
  82. self.send_header("Server", "playstreamproxy")
  83. if ".m3u8" in self.path.lower():
  84. ct = "application/vnd.apple.mpegurl"
  85. elif ".ts" in self.path.lower():
  86. ct = "video/MP2T"
  87. elif ".mp4" in self.path.lower():
  88. ct = "video/mp4"
  89. else:
  90. ct = "text/html"
  91. self.send_header("Content-type", ct)
  92. self.end_headers()
  93. def do_GET(self):
  94. """Respond to a GET request"""
  95. #
  96. print "\n\n"+40*"#"+"\nget_url: \n%s" % self.path
  97. cmd, data, headers, qs = streamproxy_decode3(self.path)
  98. print "cmd=", cmd,
  99. print " data=", data
  100. print_headers(qs)
  101. if KEY:
  102. key = qs["key"] if "key" in qs else ""
  103. if key <> KEY:
  104. print "No key provided"
  105. #self.send_response(404)
  106. self.write_error(404)
  107. return
  108. if DEBUG:
  109. print "=== Headers:"
  110. print_headers(self.headers.dict)
  111. self.protocol_version = 'HTTP/1.1'
  112. try:
  113. if cmd == "playstream":
  114. self.fetch_source( self.path)
  115. elif cmd in ["get_content", "get_streams", "get_info", "is_video", "options_read", "options_write"]:
  116. if cmd == "get_content":
  117. content = sources.get_content(data)
  118. elif cmd == "get_streams":
  119. content = sources.get_streams(data)
  120. elif cmd == "get_info":
  121. content = sources.get_info(data)
  122. elif cmd == "get_image":
  123. content = sources.get_image(data)
  124. elif cmd == "is_video":
  125. content = sources.is_video(data)
  126. elif cmd == "options_read":
  127. content = sources.options_read(data)
  128. elif cmd == "options_write":
  129. content = sources.options_write(data)
  130. else:
  131. content = []
  132. txt = json.dumps(content)
  133. self.send_response(200)
  134. self.send_header("Server", "playstreamproxy")
  135. self.send_header("Content-type", "application/json")
  136. self.end_headers()
  137. self.wfile.write(txt)
  138. self.wfile.close()
  139. else:
  140. self.write_error(404)
  141. except Exception as e:
  142. print "Got Exception: ", str(e)
  143. import traceback
  144. traceback.print_exc()
  145. ### Remote server request procedures ###
  146. def fetch_offline(self):
  147. print "** Fetch offline"
  148. self.send_response(200)
  149. self.send_header("Server", "playstreamproxy")
  150. self.send_header("Content-type", "video/mp4")
  151. self.end_headers()
  152. self.wfile.write(open("offline.mp4", "rb").read())
  153. #self.wfile.close()
  154. def redirect_source(self, urlp):
  155. cmd, data, headers, qs = streamproxy_decode3(urlp)
  156. streams = sources.get_streams(data)
  157. if not streams:
  158. self.write_error(500) # TODO
  159. return
  160. stream = streams[0]
  161. url = stream["url"]
  162. headers = stream["headers"] if "headers" in stream else headers0
  163. self.send_response(307)
  164. self.send_header("Location", url)
  165. self.end_headers()
  166. def fetch_source(self, urlp):
  167. cmd, data, headers, qs = streamproxy_decode3(urlp)
  168. if DEBUG:
  169. print "\n***********************************************************"
  170. print "fetch_source: \n%s"%urlp
  171. base_data = hls_base(urlp)
  172. if DEBUG:
  173. print "base_data=", base_data
  174. print "data=", data
  175. if not base_data in slinks:
  176. streams = sources.get_streams(data)
  177. if not streams:
  178. self.write_error(500) # TODO
  179. return
  180. stream = streams[0]
  181. url = stream["url"]
  182. headers = stream["headers"] if "headers" in stream else headers0
  183. base_url = hls_base2(url)
  184. if DEBUG: print "New link, base_url=",base_url
  185. ses = requests.Session()
  186. ses.trust_env = False
  187. slinks[base_data] = {"data": data, "urlp":urlp,"url": url, "base_url": base_url,"session":ses}
  188. else:
  189. ses = slinks[base_data]["session"]
  190. if urlp == slinks[base_data]["urlp"]:
  191. url = slinks[base_data]["url"]
  192. if DEBUG: print "Existing base link", url
  193. else:
  194. url = urlp.replace(base_data, slinks[base_data]["base_url"])
  195. if DEBUG: print "Existing new link", url
  196. if REDIRECT:
  197. print "-->redirect to: " + url
  198. self.send_response(307)
  199. self.send_header("Location", url)
  200. self.end_headers()
  201. #self.wfile.close()
  202. return
  203. headers2 = headers if headers else self.headers.dict
  204. headers2 = del_headers(headers2, ["host"])
  205. # if ".ts" in url:
  206. # print url
  207. r = self.get_page_ses(url,ses,True,headers = headers2)
  208. code = r.status_code
  209. if not code in (200,206): # TODO 206 apstrāde!
  210. self.write_error(code)
  211. return
  212. if code == 206:
  213. print "Code=206"
  214. self.send_response(code)
  215. #headers2 = del_headers(r.headers, ["Content-Encoding"])
  216. self.send_headers(r.headers)
  217. CHUNK_SIZE = 1024 *4
  218. while True:
  219. chunk = r.raw.read(CHUNK_SIZE, decode_content=False)
  220. if not chunk:
  221. break
  222. try:
  223. self.wfile.write(chunk)
  224. except Exception as e:
  225. print "Exception: ", str(e)
  226. self.wfile.close()
  227. return
  228. if DEBUG: print "**File downloaded"
  229. #if "connection" in r.headers and r.headers["connection"] <> "keep-alive":
  230. self.wfile.close()
  231. return
  232. def send_headers(self,headers):
  233. #if DEBUG:
  234. #print "**Return headers: "
  235. #print_headers(headers)
  236. for h in headers:
  237. self.send_header(h, headers[h])
  238. self.end_headers()
  239. def write_error(self,code):
  240. print "***Error, code=%s" % code
  241. self.send_response(code)
  242. #self.send_headers(r.headers)
  243. self.wfile.close() # TODO?
  244. # self.fetch_offline()
  245. def get_page_ses(self,url,ses,stream=True, headers=None):
  246. headers= headers if headers else headers0
  247. ses.headers.update(headers)
  248. if DEBUG:
  249. print "\n\n====================================================\n**get_page_ses\n%s"%url
  250. print "**Server request headers: "
  251. print_headers(ses.headers)
  252. r = ses.get(url, stream=stream, verify=False)
  253. if DEBUG:
  254. print "**Server response:", r.status_code
  255. print "**Server response headers: "
  256. print_headers(r.headers)
  257. return r
  258. def get_page(self,url,headers=None):
  259. if not headers:
  260. headers = headers0
  261. if DEBUG:
  262. print "\n\n====================================================\n**get_page\n%s"%url
  263. print "**Server request headers: "
  264. print_headers(headers)
  265. r = requests.get(url, headers=headers,stream=True)
  266. if DEBUG:
  267. print "**Server response:", r.status_code
  268. print "**Server response headers: "
  269. print_headers(r.headers)
  270. return r
  271. def address_string(self):
  272. host, port = self.client_address[:2]
  273. #return socket.getfqdn(host)
  274. return host
  275. class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
  276. """Handle requests in a separate thread."""
  277. def start(host = HOST_NAME, port = PORT_NUMBER, redirect=REDIRECT, key=KEY):
  278. global REDIRECT,KEY
  279. if redirect:
  280. REDIRECT = redirect
  281. if key:
  282. KEY = key
  283. httpd = ThreadedHTTPServer((host, port), StreamHandler)
  284. print time.asctime(), "Server Starts - %s:%s" % (HOST_NAME, PORT_NUMBER)
  285. try:
  286. httpd.serve_forever()
  287. except KeyboardInterrupt:
  288. pass
  289. httpd.server_close()
  290. print time.asctime(), "Server Stops - %s:%s" % (HOST_NAME, PORT_NUMBER)
  291. def start2(host = HOST_NAME, port = PORT_NUMBER, redirect=REDIRECT, key=KEY, workers=WORKERS):
  292. global REDIRECT,KEY
  293. if redirect:
  294. REDIRECT = redirect
  295. if key:
  296. KEY = key
  297. print time.asctime(), "Server Starts - %s:%s" % (host, port)
  298. addr = ('', port)
  299. sock = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
  300. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  301. sock.bind(addr)
  302. sock.listen(5)
  303. # Launch listener threads.
  304. class Thread(threading.Thread):
  305. def __init__(self, i):
  306. threading.Thread.__init__(self)
  307. self.i = i
  308. self.daemon = True
  309. self.start()
  310. def run(self):
  311. httpd = HTTPServer(addr, StreamHandler, False)
  312. # Prevent the HTTP server from re-binding every handler.
  313. # https://stackoverflow.com/questions/46210672/
  314. httpd.socket = sock
  315. httpd.server_bind = self.server_close = lambda self: None
  316. httpd.serve_forever()
  317. [Thread(i) for i in range(workers)]
  318. time.sleep(9e5)
  319. print time.asctime(), "Server Stops - %s:%s" % (HOST_NAME, PORT_NUMBER)
  320. class Daemon:
  321. """
  322. A generic daemon class.
  323. Usage: subclass the Daemon class and override the run() method
  324. """
  325. def __init__(self, pidfile, stdin="/dev/null", stdout="/dev/null", stderr="/dev/null"):
  326. self.stdin = stdin
  327. self.stdout = stdout
  328. self.stderr = stderr
  329. self.pidfile = pidfile
  330. def daemonize(self):
  331. """
  332. do the UNIX double-fork magic, see Stevens' "Advanced
  333. Programming in the UNIX Environment" for details (ISBN 0201563177)
  334. http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
  335. """
  336. try:
  337. pid = os.fork()
  338. if pid > 0:
  339. # exit first parent
  340. sys.exit(0)
  341. except OSError, e:
  342. sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
  343. sys.exit(1)
  344. # decouple from parent environment
  345. os.chdir("/")
  346. os.setsid()
  347. os.umask(0)
  348. # do second fork
  349. try:
  350. pid = os.fork()
  351. if pid > 0:
  352. # exit from second parent
  353. sys.exit(0)
  354. except OSError, e:
  355. sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
  356. sys.exit(1)
  357. # redirect standard file descriptors
  358. sys.stdout.flush()
  359. sys.stderr.flush()
  360. si = file(self.stdin, "r")
  361. so = file(self.stdout, "a+")
  362. se = file(self.stderr, "a+", 0)
  363. os.dup2(si.fileno(), sys.stdin.fileno())
  364. os.dup2(so.fileno(), sys.stdout.fileno())
  365. os.dup2(se.fileno(), sys.stderr.fileno())
  366. # write pidfile
  367. atexit.register(self.delpid)
  368. pid = str(os.getpid())
  369. file(self.pidfile,"w+").write("%s\n" % pid)
  370. def delpid(self):
  371. os.remove(self.pidfile)
  372. def start(self):
  373. """
  374. Start the daemon
  375. """
  376. # Check for a pidfile to see if the daemon already runs
  377. try:
  378. pf = file(self.pidfile,"r")
  379. pid = int(pf.read().strip())
  380. pf.close()
  381. except IOError:
  382. pid = None
  383. if pid:
  384. message = "pidfile %s already exist. Daemon already running?\n"
  385. sys.stderr.write(message % self.pidfile)
  386. sys.exit(1)
  387. # Start the daemon
  388. self.daemonize()
  389. self.run()
  390. def stop(self):
  391. """
  392. Stop the daemon
  393. """
  394. # Get the pid from the pidfile
  395. try:
  396. pf = file(self.pidfile,"r")
  397. pid = int(pf.read().strip())
  398. pf.close()
  399. except IOError:
  400. pid = None
  401. if not pid:
  402. message = "pidfile %s does not exist. Daemon not running?\n"
  403. sys.stderr.write(message % self.pidfile)
  404. return # not an error in a restart
  405. # Try killing the daemon process
  406. try:
  407. while 1:
  408. os.kill(pid, SIGTERM)
  409. time.sleep(0.1)
  410. except OSError, err:
  411. err = str(err)
  412. if err.find("No such process") > 0:
  413. if os.path.exists(self.pidfile):
  414. os.remove(self.pidfile)
  415. else:
  416. print str(err)
  417. sys.exit(1)
  418. def restart(self):
  419. """
  420. Restart the daemon
  421. """
  422. self.stop()
  423. self.start()
  424. def run(self):
  425. """
  426. You should override this method when you subclass Daemon. It will be called after the process has been
  427. daemonized by start() or restart().
  428. """
  429. class ProxyDaemon(Daemon):
  430. def run(self):
  431. print "Daemon start - %s" % MULTITHREAD
  432. if MULTITHREAD:
  433. start2()
  434. else:
  435. start()
  436. def print_headers(headers):
  437. for h in headers:
  438. print "%s: %s"%(h,headers[h])
  439. def del_headers(headers0,tags):
  440. headers = headers0.copy()
  441. for t in tags:
  442. if t in headers:
  443. del headers[t]
  444. if t.lower() in headers:
  445. del headers[t.lower()]
  446. return headers
  447. def hls_base(url):
  448. base = url.split("?")[0]
  449. base = "/".join(base.split("/")[0:3])+ "/"
  450. rest = url.replace(base, "")
  451. return base
  452. def hls_base2(url):
  453. base = url.split("?")[0]
  454. base = "/".join(base.split("/")[0:-1])+ "/"
  455. rest = url.replace(base, "")
  456. return base
  457. if __name__ == "__main__":
  458. daemon = ProxyDaemon("/var/run/playstreamproxy.pid")
  459. if len(sys.argv) == 2:
  460. if "start" == sys.argv[1]:
  461. daemon.start()
  462. elif "stop" == sys.argv[1]:
  463. daemon.stop()
  464. elif "restart" == sys.argv[1]:
  465. daemon.restart()
  466. elif "manualstart" == sys.argv[1]:
  467. daemon.run()
  468. elif "multithread" == sys.argv[1]:
  469. start2()
  470. else:
  471. print "Unknown command"
  472. sys.exit(2)
  473. sys.exit(0)
  474. else:
  475. print "usage: %s start|stop|restart|manualstart" % sys.argv[0]
  476. sys.exit(2)