# coding: utf-8 """ Enigma2 handling objects (c)Ivars 2013-2015, v0.2 """ import sys, os, os.path from urllib import quote, unquote from record3 import Rec class DBServices(object): """Dreambox services, stored in lamedb (E2)/services(E1) file handling class""" def __init__(self,service_path=None,enigma=2): self.enigma = enigma if not service_path: service_path = "enigma2" if sys.platform=="win32" else "/etc/enigma2" self.service_path = service_path #print service_path if self.enigma == 2: if not "ftp:" in service_path: self.service_fname= os.path.join(service_path,"lamedb") self.bouquets_fname = os.path.join(service_path, "bouquets.tv") else: sep = "" if service_path[-1]=="//" else "//" self.service_fname= service_path+sep+"lamedb" self.bouquets_fname = service_path+sep+"bouquets.tv" else: # ENIGMA === "1": self.service_fname = os.path.join(service_path,"services") self.bouquets_fname = os.path.join(service_path, "userbouquets.tv.epl") #print self.service_fname self.services = [] self.index_sref= {} self.index_chid = {} self.index_name = {} self.index_provider = {} self.bouquets = [] self.bouquets_services = {} self.transp = [] self.tp_index_tpid = {} self.tp_index_pic ={} self._load_services() self._load_bouquets() def _load_services(self): #tpref - lamedb/transponders # NS:TSID:ONID # s FREQ:SR:POLAR:FREC:49:2:0 # X X X # D D D D # lref - lamedb/services # SID:NS:TSID:ONID:STYPE:UNUSED(channelnumber in enigma1) # 0 1 2 3 4 5 # X X X X D D # wref - bouquets/picon # REFTYPE:FLAGS:STYPE:SID:TSID:ONID:NS:PARENT_SID:PARENT_TSID:UNUSED # 0 1 2 3 4 5 6 7 8 9 # D D X X X X X X X X f_services = open(self.service_fname,"r") if not self.service_fname[0:4] == "ftp:" else urllib2.urlopen(self.service_fname) line = f_services.readline() if not "eDVB services" in line: raise Exception("no correct lamedb file") line = f_services.readline() # Read transponders i = 0 while True: line = f_services.readline() if "end" in line: break tp = Rec() #tp = {} ff = line.strip().split(":") tp["ns"] = ns = pos_str2int(ff[0]) tp["tsid"] = tsid = int(ff[1],16) tp["onid"] = onid = int(ff[2],16) line = f_services.readline() tp["params"] = params = line.strip().split(" ")[1] ff = params.split(":") tp["freq"] = freq = int(ff[0][:-3]) tp["polar"] = polar = int(ff[2]) tp["tpid"] = tpid = (ns,tsid,onid) self.transp.append(tp) self.tp_index_tpid[tpid] = i self.tp_index_pic[(ns,freq,polar)] = i line = f_services.readline() i += 1 # Reading services i= 0 line = f_services.readline() while True: line = f_services.readline() if line[0:3] == "end": break if not line: break rec = Rec() #rec = {} rec["lref"] = line.lower().strip().decode("utf-8") line = f_services.readline() #line = line.replace('\xc2\x87', '').replace('\xc2\x86', '') #line = decode_charset(line) line = line.decode("utf-8") rec["name"] = line.strip() line = f_services.readline() provider = "" caid=[] for p in line.split(","): if p[0:2].lower() == "p:": provider = p[2:].strip() elif p[0:2].lower() == "c:": caid.append(p[2:].strip()) if provider == "": provider = "unknown" rec["provider"] = provider.decode("utf-8") rec["sref"] = lref2sref(rec["lref"]) r = lref_parse(rec["lref"]) rec["stype"] = r["stype"] rec["chid"] = (r["sid"],r["tsid"],r["onid"]) rec["caid"] = caid self.services.append(rec) self.index_sref[rec["sref"]] = i self.index_chid[rec["chid"]] = i name = rec["name"].lower() if not self.index_name.has_key(name): self.index_name[name] = [] self.index_name[name].append(i) provider = rec["provider"].lower() if not self.index_provider.has_key(provider): self.index_provider[provider] = [] self.index_provider[provider].append(i) i += 1 f_services.close() def _load_bouquets(self): f_bouquets = open(self.bouquets_fname,"r") if not self.bouquets_fname[0:4] == "ftp:" else urllib2.urlopen(self.bouquets_fname) self.bouquets=Rec() for line in f_bouquets.readlines(): if line[0:9] == "#SERVICE:": if self.enigma==1: bn = line.strip().split("/") else: bn = line.strip().split(":") bfn = bn[-1] bid = bfn.split(".")[1] ### Process one bouquet file ### bouq = Rec() bouq.bid = bid bouq.name = "" bouq.services=[] #fb = open(self.fname,"r") if not self.fname[0:4] == "ftp:" else urllib2.urlopen(self.fname) fb = open(os.path.join(self.service_path,bfn)) i= 0 line = fb.readline() while True: if line[0:5] == "#NAME": bouq.name = line.strip()[6:].decode("utf8") elif line[0:8] == "#SERVICE": sref = line.strip()[9:] s = sref_parse(sref) s["sref"] = sref if s["flags"] == 64: s["type"] = "marker" line = fb.readline() if line[0:12] =='#DESCRIPTION': s["name"] = line [13:].strip().decode("utf8") else: continue elif s["url"]: # IPTV stream s["type"] = "stream" s["sref"] = ":".join(s["sref"].split(":")[0:10])+":" line = fb.readline() if line[0:12] =='#DESCRIPTION': s["name"] = line [13:].strip().decode("utf8") else: continue bouq.services.append(s) line = fb.readline() if not line: break fb.close() self.bouquets[bid] = bouq f_bouquets.close() def _load_bouquets2(self): f_bouquets = open(self.bouquets_fname,"r") if not self.bouquets_fname[0:4] == "ftp:" else urllib2.urlopen(self.bouquets_fname) for line in f_bouquets.readlines(): if line[0:9] == "#SERVICE:": if self.enigma==1: bn = line.strip().split("/") else: bn = line.strip().split(":") bfn = bn[-1] bid = bfn.split(".")[1] bouq = Bouquet(bid, self.service_path) self.bouquets.append(bouq) f_bouquets.close() def create_bouqet(self,bid,name): fname = "userbouquet.%s.tv"%bid f_bouquets = open(self.bouquets_fname,"r") if not self.bouquets_fname[0:4] == "ftp:" else urllib2.urlopen(self.bouquets_fname) lines = f_bouquets.read().strip() f_bouquets.close() if not fname in lines: lines += "\n#SERVICE: 1:7:1:0:0:0:0:0:0:0:%s\n"%fname f_bouquets = open(self.bouquets_fname,"w") f_bouquets.write(lines) f_bouquets.close() with open(os.path.join(self.service_path,fname), 'w') as f: f.write("#NAME %s"%name.encode("utf8")) pass def get_bouquets(self): if not self.bouquets: self._load_bouquets() return self.bouquets def get_bouquet_services(self,bn): if not self.bouquets: self._load_bouquets() return self.bouquets[bn].services #---------------------------------------------------------------------- def save_bouquet(self,bn): """Save bouquet to file""" fn = "userbouquet.%s.tv"%bn fb = open(os.path.join(self.service_path,fn),"w") fb.write("#NAME %s\n"%self.bouquets[bn].name.encode("utf8")) for s in self.bouquets[bn].services: if s.type=="marker": fb.write("#SERVICE %s\n"%s.sref) fb.write("#DESCRIPTION %s\n"%s.name.encode("utf8")) elif s.type == "stream": fb.write("#SERVICE %s%s:%s\n"%(s.sref,quote(s.url),s.name.encode("utf8"))) fb.write("#DESCRIPTION %s\n"%s.name.encode("utf8")) else: fb.write("#SERVICE %s\n"%s.sref) fb.close() def get_service_by_sref(self,sref): return self.services[self.index_sref[sref]] if self.index_sref.has_key(sref) else None def get_service_by_chid(self,chid): return self.services[self.index_chid[chid]] if self.index_chid.has_key(chid) else None def get_service_by_name(self,name): return [self.services[i] for i in self.index_name[name]] if name in self.index_name else None def get_service_by_provider(self,provider): return [self.services[i] for i in self.index_provider[provider]] if provider in self.index_privider else Non def get_transp_by_tpid(self,tpid): return self.transp[self.tp_index_tpid[tpid]] if self.tp_index_tpid.has_key(tpid) else None def get_transp_by_pic(self,picid): return self.transp[self.tp_index_pic[picid]] if self.tp_index_pic.has_key(picid) else None def find_sref(ns,freq,polar,sid): "Find service reference according to given parameters" tp = services.get_transp_by_pic((ns,freq,polar)) if tp: serv = self.services.get_service_by_chid((sid,tp["tsid"],tp["onid"])) if serv: return serv["sref"] return "" ######################################################################## class Bouquet(object): """Bouquet file object""" #---------------------------------------------------------------------- def __init__(self, bid, service_path=None): """Constructor""" if not service_path: service_path = "enigma2" if sys.platform=="win32" else "/etc/enigma2" self.service_path = service_path self.bid = bid if not "ftp:" in service_path: self.fname = os.path.join(service_path, "userbouquet.%s.tv"%self.bid) else: sep = "" if service_path[-1]=="//" else "//" self.fname = os.path.join(service_path, "userbouquet.%s.tv"%self.bid) self.load() #---------------------------------------------------------------------- def load(self): """Load bouquet from file""" self.services=[] fb = open(self.fname,"r") if not self.fname[0:4] == "ftp:" else urllib2.urlopen(self.fname) i= 0 line = fb.readline() while True: if line[0:5] == "#NAME": self.name = line.strip()[6:].decode("utf8") elif line[0:8] == "#SERVICE": sref = line.strip()[9:] s = sref_parse(sref) s["sref"] = sref if s["flags"] == 64: s["type"] = "marker" line = fb.readline() if line[0:12] =='#DESCRIPTION': s["name"] = line [13:].strip().decode("utf8") else: continue elif s["url"]: # IPTV stream s["type"] = "stream" s["sref"] = ":".join(s["sref"].split(":")[0:10]) line = fb.readline() if line[0:12] =='#DESCRIPTION': s["name"] = line [13:].strip().decode("utf8") else: continue self.services.append(s) line = fb.readline() if not line: break fb.close() def decode_charset(s): u = None if isinstance(s,unicode): return s charset_list=('iso-8859-4','iso-8859-1','iso-8859-2''iso-8859-15') for charset in charset_list: try: u=unicode(s,charset,"strict") except: pass else: break if u == None: raise Error, "CHARSET ERROR while decoding lamedb. Aborting !" else: return(u) # lref - lamedb/services # SID:NS:TSID:ONID:STYPE:UNUSED(channelnumber in enigma1) # 0 1 2 3 4 5 # X X X X D D # # sref bouquets/picon # REFTYPE:FLAGS:STYPE:SID:TSID:ONID:NS:PARENT_SID:PARENT_TSID:UNUSED # 0 1 2 3 4 5 6 7 8 9 # D D X X X X X X X X def lref2sref(s): "Converts service refefence from lamedb format to bouquets/picon (standard) format" f = s.split(":") sid = int(f[0],16) ns = int(f[1], 16) tsid = int(f[2],16) onid = int(f[3],16) stype = int(f[4]) s2 = "1:0:%X:%X:%X:%X:%X:0:0:0:" % (stype,sid,tsid,onid,ns) return s2 def lref_parse(s): "Parse lamedb/service string, return dictionary of items" f = s.split(":") #r = {} r = Rec() r["sid"] = int(f[0],16) r["ns"] = int(f[1], 16) r["tsid"] = int(f[2],16) r["onid"] = int(f[3],16) r["stype"] = int(f[4]) return r def sref2lref(s): "Converts service refefence from bouquets/picon (standard) format to lamedb format" f = s.split(":") reftype = int(f[0]) flags = int(f[1]) stype = int(f[2],16) sid = int(f[3],16) tsid = int(f[4],16) onid = int(f[5],16) ns = int(f[6],16) s2 = "%04x:%08x:%04x:%04x:%i:0" % (sid,ns,tsid,onid,stype) return s2 def sref_parse(s): "Parse service refefence string , return dictionary of items" f = s.split(":") #r = {} r = Rec() r["reftype"] = int(f[0]) r["flags"] = int(f[1]) r["stype"] = int(f[2],16) r["sid"] = int(f[3],16) r["tsid"] = int(f[4],16) r["onid"] = int(f[5],16) r["ns"] = int(f[6],16) r["url"] = unquote(f[10]) if len(f)>10 else "" return r pos_int2str = lambda pos_num: "%04x0000"%pos_num pos_str2int = lambda pos: int(pos[0:4],16) def decode_charset(s): u = None charset_list = ('utf-8','iso-8859-1','iso-8859-2','iso-8859-15') for charset in charset_list: try: u = unicode(s,charset,"strict") except: pass else: break if u == None: print("CHARSET ERROR while decoding string") sys.exit(1) else: return(u) import UserDict if __name__ == "__main__": #print "** Starting..." #sys.exit(main(sys.argv)) e2 = DBServices() #e2.save_buquet("english") e2.create_bouqet("test", "Tests") pass