123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407 |
- # -*- coding: UTF-8 -*-
# This video extraction code is based on youtube-dl: https://github.com/rg3/youtube-dl
-
- import codecs
- import json
- import re
-
- from urllib import urlencode
- from urllib2 import urlopen, URLError
- import sys
- import ssl
# SECURITY NOTE: both statements below deliberately turn OFF TLS certificate
# verification -- first for every default HTTPS context created through the
# ssl module in this process, then for the explicit context handed to
# urlopen(). Connections made this way can be intercepted.
if hasattr(ssl, "_create_unverified_context"):
    ssl._create_default_https_context = ssl._create_unverified_context

#from Components.config import config

#from . import sslContext
# Unverified SSL context passed to urlopen(); stays None when the interpreter
# is older than 2.7.9 or the context cannot be created, in which case urlopen()
# is called without a context argument.
sslContext = None
if sys.version_info >= (2, 7, 9):
    try:
        sslContext = ssl._create_unverified_context()
    except Exception:
        # Best effort only: fall back to urlopen() without an explicit context.
        sslContext = None
- from jsinterp import JSInterpreter
- from swfinterp import SWFInterpreter
-
-
# Itags to try, best first; rebuilt by createPriorityFormats().
PRIORITY_VIDEO_FORMAT = []
# Itag of the best format that may be selected (stands in for the commented-out
# config.plugins.YouTube.maxResolution.value setting).
maxResolution = '22'


def createPriorityFormats():
    """Rebuild the global PRIORITY_VIDEO_FORMAT list from maxResolution.

    The result is the tail of the fixed itag ranking that starts at
    maxResolution; it is an empty list when maxResolution is not part of the
    ranking.
    """
    global PRIORITY_VIDEO_FORMAT, maxResolution
    ranked_itags = ['38', '37', '96', '22', '95', '120',
                    '35', '94', '18', '93', '5', '92', '132', '17']
    if maxResolution in ranked_itags:
        first_allowed = ranked_itags.index(maxResolution)
        PRIORITY_VIDEO_FORMAT = ranked_itags[first_allowed:]
    else:
        PRIORITY_VIDEO_FORMAT = []


createPriorityFormats()
-
# Itags that are never chosen by the "first usable entry" fallback.
# Every one of them is a WebM stream.
IGNORE_VIDEO_FORMAT = [
    '43', '44', '45', '46',
    '100', '101', '102',
]
-
-
def uppercase_escape(s):
    r"""Return s with every ``\UXXXXXXXX`` escape replaced by its character.

    Only the eight-hex-digit uppercase-U form is expanded; all other text,
    including ``\uXXXX`` escapes, is left untouched.
    """
    decode_escape = codecs.getdecoder('unicode_escape')

    def _expand(match):
        # The decoder returns (decoded_text, consumed_length).
        decoded, _consumed = decode_escape(match.group(0))
        return decoded

    return re.sub(r'\\U[0-9a-fA-F]{8}', _expand, s)
-
-
def compat_urllib_parse_unquote(string, encoding='utf-8', errors='replace'):
    """Replace %XX escapes in string by the characters they encode.

    Runs of consecutive escapes are collected as bytes and decoded together,
    so multi-byte sequences (e.g. UTF-8) are decoded as a unit.

    Args:
        string: text that may contain percent escapes.
        encoding: codec used to decode escaped bytes; None means 'utf-8'.
        errors: decode error handler; None means 'replace'.

    Returns:
        The unquoted text. A '%' that is not followed by two hex digits is
        kept literally together with the text after it.
    """
    # Imported here so the function does not depend on a file-level import.
    from binascii import unhexlify

    if string == '':
        return string
    res = string.split('%')
    if len(res) == 1:
        return string
    if encoding is None:
        encoding = 'utf-8'
    if errors is None:
        errors = 'replace'
    # pct_sequence: contiguous sequence of percent-encoded bytes, decoded
    pct_sequence = b''
    string = res[0]
    for item in res[1:]:
        try:
            if not item:
                raise ValueError
            # unhexlify() signals bad digits with TypeError on Python 2 and
            # with binascii.Error (a ValueError) on Python 3.
            pct_sequence += unhexlify(item[:2])
            rest = item[2:]
            if not rest:
                # This segment was just a single percent-encoded character.
                # May be part of a sequence of code units, so delay decoding.
                # (Stored in pct_sequence).
                continue
        except (TypeError, ValueError):
            # Not a valid escape: keep the '%' and what follows verbatim.
            rest = '%' + item
        # Encountered non-percent-encoded characters. Flush the current
        # pct_sequence.
        string += pct_sequence.decode(encoding, errors) + rest
        pct_sequence = b''
    if pct_sequence:
        # Flush the final pct_sequence
        string += pct_sequence.decode(encoding, errors)
    return string
-
-
def _parse_qsl(qs, keep_blank_values=False, strict_parsing=False,
               encoding='utf-8', errors='replace'):
    """Split a query string into a list of (name, value) pairs.

    Fields are separated by '&' or ';'. In names and values '+' becomes a
    space and percent escapes are decoded with compat_urllib_parse_unquote;
    both are then passed through unicode().

    Args:
        qs: the query string.
        keep_blank_values: keep fields whose value is empty or missing.
        strict_parsing: raise ValueError for a field without '=' instead of
            skipping it.
        encoding, errors: forwarded to compat_urllib_parse_unquote.
    """
    _coerce_result = unicode
    fields = [piece for chunk in qs.split('&') for piece in chunk.split(';')]
    parsed = []
    for field in fields:
        if not (field or strict_parsing):
            continue
        key, separator, raw_value = field.partition('=')
        if not separator:
            if strict_parsing:
                raise ValueError("bad query field: %r" % (field,))
            # Handle case of a control-name with no equal sign
            if not keep_blank_values:
                continue
        if not (raw_value or keep_blank_values):
            continue
        key = compat_urllib_parse_unquote(
            key.replace('+', ' '), encoding=encoding, errors=errors)
        raw_value = compat_urllib_parse_unquote(
            raw_value.replace('+', ' '), encoding=encoding, errors=errors)
        parsed.append((_coerce_result(key), _coerce_result(raw_value)))
    return parsed
-
-
def compat_parse_qs(qs, keep_blank_values=False, strict_parsing=False,
                    encoding='utf-8', errors='replace'):
    """Parse a query string into a dict mapping each name to a list of values.

    All arguments are handed to _parse_qsl unchanged; values of a repeated
    name are collected in the order they appear.
    """
    grouped = {}
    for key, val in _parse_qsl(qs, keep_blank_values, strict_parsing,
                               encoding=encoding, errors=errors):
        grouped.setdefault(key, []).append(val)
    return grouped
-
-
class YouTubeVideoUrl(object):
    """Resolve a direct stream URL for a YouTube video id.

    The public entry point is extract(); the other methods download pages,
    pull values out of them with regular expressions and decrypt stream
    signatures with the JS/SWF player interpreters.
    """

    # Marker meaning "no default supplied", so that None is a usable default.
    _NO_DEFAULT = object()

    def _download_webpage(self, url):
        """Download url and return the response body.

        Returns an empty string when urlopen() raises URLError.
        """
        try:
            if sslContext:
                urlh = urlopen(url, context=sslContext)
            else:
                urlh = urlopen(url)
        except URLError:
            return ""
        # Always release the connection, even when read() fails.
        try:
            return urlh.read()
        finally:
            urlh.close()

    def _search_regex(self, pattern, string, default=_NO_DEFAULT):
        """Return the first non-None group of the first matching pattern.

        Args:
            pattern: a single regular expression or a list/tuple of them,
                tried in order.
            string: the text to search.
            default: value returned when nothing matches. When omitted, a
                failed search raises instead.

        Raises:
            Exception: nothing matched and no default was given.
        """
        if isinstance(pattern, (list, tuple)):
            patterns = pattern
        else:
            patterns = [pattern]
        for candidate in patterns:
            mobj = re.search(candidate, string, 0)
            if mobj:
                for group in mobj.groups():
                    if group is not None:
                        return group
        if default is not self._NO_DEFAULT:
            return default
        raise Exception('Unable extract pattern from string!')

    def _decrypt_signature(self, s, player_url):
        """Turn the encrypted s field into a working signature.

        Raises:
            Exception: player_url is None, or the player could not be
                downloaded, parsed or executed.
        """
        if player_url is None:
            raise Exception('Cannot decrypt signature without player_url!')

        if player_url[:2] == '//':
            player_url = 'https:' + player_url
        elif player_url[:1] == '/':
            # Site-relative path: urlopen() needs an absolute URL.
            player_url = 'https://www.youtube.com' + player_url
        try:
            func = self._extract_signature_function(player_url)
            return func(s)
        except Exception:
            raise Exception('Signature extraction failed!')

    def _extract_signature_function(self, player_url):
        """Download the player at player_url and return its decipher function.

        The file extension of player_url selects the parser: 'js' or 'swf'.

        Raises:
            Exception: the URL does not look like a player URL or has another
                extension.
        """
        id_m = re.match(
            r'.*?-(?P<id>[a-zA-Z0-9_-]+)(?:/watch_as3|/html5player(?:-new)?|/base)?\.(?P<ext>[a-z]+)$',
            player_url)
        if not id_m:
            raise Exception('Cannot identify player %r!' % player_url)
        player_type = id_m.group('ext')
        code = self._download_webpage(player_url)
        if player_type == 'js':
            return self._parse_sig_js(code)
        elif player_type == 'swf':
            return self._parse_sig_swf(code)
        else:
            raise Exception('Invalid player type %r!' % player_type)

    def _parse_sig_js(self, jscode):
        """Return a callable that deciphers a signature using the JS player code."""
        funcname = self._search_regex(r'\.sig\|\|([a-zA-Z0-9$]+)\(', jscode)
        jsi = JSInterpreter(jscode)
        initial_function = jsi.extract_function(funcname)
        return lambda s: initial_function([s])

    def _parse_sig_swf(self, file_contents):
        """Return a callable that deciphers a signature using the SWF player."""
        swfi = SWFInterpreter(file_contents)
        TARGET_CLASSNAME = 'SignatureDecipher'
        searched_class = swfi.extract_class(TARGET_CLASSNAME)
        initial_function = swfi.extract_function(searched_class, 'decipher')
        return lambda s: initial_function([s])

    def _extract_from_m3u8(self, manifest_url):
        """Download an m3u8 manifest and return a dict {itag: stream URL}.

        Blank lines, '#' comment lines and URLs without an 'itag/<n>/' path
        component are skipped. The dict is empty when the download fails.
        """
        url_map = {}
        manifest = self._download_webpage(manifest_url)
        for line in manifest.split('\n'):
            format_url = line.strip()
            if not format_url or format_url.startswith('#'):
                continue
            itag = self._search_regex(r'itag/(\d+?)/', format_url, default=None)
            if itag is None:
                continue
            url_map[itag] = format_url
        return url_map

    def _get_ytplayer_config(self, webpage):
        """Return the parsed ytplayer.config JSON from webpage, or None."""
        # User data may contain arbitrary character sequences that may affect
        # JSON extraction with regex, e.g. when '};' is contained the second
        # regex won't capture the whole JSON. Yet working around by trying more
        # concrete regex first keeping in mind proper quoted string handling
        # to be implemented in future that will replace this workaround (see
        # https://github.com/rg3/youtube-dl/issues/7468,
        # https://github.com/rg3/youtube-dl/pull/7599)
        patterns = [
            r';ytplayer\.config\s*=\s*({.+?});ytplayer',
            r';ytplayer\.config\s*=\s*({.+?});',
        ]
        for pattern in patterns:
            # default=None lets a miss fall through to the next pattern.
            config = self._search_regex(pattern, webpage, default=None)
            if config:
                return json.loads(uppercase_escape(config))
        return None

    @staticmethod
    def _has_itag(encoded_url, itag):
        """Tell whether an '&'-separated stream entry has exactly itag=<itag>.

        Comparing whole fields keeps e.g. itag 17 from matching itag 171.
        """
        return ('itag=' + itag) in encoded_url.split('&')

    def _priority_formats(self):
        """Return the priority itag list, rebuilt when it is empty or stale."""
        # If format changed in config, recreate priority list
        if not PRIORITY_VIDEO_FORMAT or PRIORITY_VIDEO_FORMAT[0] != maxResolution:
            createPriorityFormats()
        return PRIORITY_VIDEO_FORMAT

    def _select_encoded_stream(self, encoded_url_map):
        """Pick one entry from a list of encoded stream descriptions.

        Order of preference: the first priority itag that has an entry with a
        URL, then the first entry with a URL whose itag is not ignored, then
        simply the first entry of the list.
        """
        candidates = [entry for entry in encoded_url_map if 'url=' in entry]
        # Find the best format from our format priority map
        for our_format in self._priority_formats():
            for entry in candidates:
                if self._has_itag(entry, our_format):
                    return entry
        # If anything not found, used first in the list if it not in ignore map
        for entry in candidates:
            if not any(self._has_itag(entry, ignored)
                       for ignored in IGNORE_VIDEO_FORMAT):
                return entry
        return encoded_url_map[0]

    def _select_hls_url(self, url_map):
        """Pick one URL from an {itag: url} dict; None when the dict is empty."""
        # Find the best format from our format priority map
        for our_format in self._priority_formats():
            if url_map.get(our_format):
                return url_map[our_format]
        # If anything not found, used first in the list if it not in ignore map
        for itag in url_map:
            if itag not in IGNORE_VIDEO_FORMAT:
                return url_map[itag]
        for any_url in url_map.values():
            return any_url
        return None

    def extract(self, video_id):
        """Return a stream URL for video_id as a str.

        Returns an empty string when the watch page cannot be downloaded or
        no usable format is found.

        Raises:
            Exception: rtmpe streams, or a signature that cannot be decrypted.
        """
        url = 'https://www.youtube.com/watch?v=%s&gl=US&hl=en&has_verified=1&bpctr=9999999999' % video_id

        # Get video webpage
        video_webpage = self._download_webpage(url)
        if not video_webpage:
            return ""

        # Attempt to extract SWF player URL
        mobj = re.search(r'swfConfig.*?"(https?:\\/\\/.*?watch.*?-.*?\.swf)"', video_webpage)
        if mobj is not None:
            player_url = re.sub(r'\\(.)', r'\1', mobj.group(1))
        else:
            player_url = None

        # Get video info
        embed_webpage = None
        if re.search(r'player-age-gate-content">', video_webpage) is not None:
            age_gate = True
            # We simulate the access to the video from www.youtube.com/v/{video_id}
            # this can be viewed without login into Youtube
            url = 'https://www.youtube.com/embed/%s' % video_id
            embed_webpage = self._download_webpage(url)
            data = urlencode({
                'video_id': video_id,
                'eurl': 'https://youtube.googleapis.com/v/' + video_id,
                # A missing "sts" value is sent as an empty string.
                'sts': self._search_regex(
                    r'"sts"\s*:\s*(\d+)', embed_webpage, default=''),
            })
            video_info_url = 'https://www.youtube.com/get_video_info?' + data
            video_info_webpage = self._download_webpage(video_info_url)
            video_info = compat_parse_qs(video_info_webpage)
        else:
            age_gate = False
            video_info = None
            # Try looking directly into the video webpage
            ytplayer_config = self._get_ytplayer_config(video_webpage)
            if ytplayer_config:
                args = ytplayer_config.get('args') or {}
                if args.get('url_encoded_fmt_stream_map'):
                    # Convert to the same format returned by compat_parse_qs
                    video_info = dict((k, [v]) for k, v in args.items())

        if not video_info:
            # We also try looking in get_video_info since it may contain different dashmpd
            # URL that points to a DASH manifest with possibly different itag set (some itags
            # are missing from DASH manifest pointed by webpage's dashmpd, some - from DASH
            # manifest pointed by get_video_info's dashmpd).
            # The general idea is to take a union of itags of both DASH manifests (for example
            # video with such 'manifest behavior' see https://github.com/rg3/youtube-dl/issues/6093)
            for el_type in ['&el=info', '&el=embedded', '&el=detailpage', '&el=vevo', '']:
                video_info_url = (
                    'https://www.youtube.com/get_video_info?&video_id=%s%s&ps=default&eurl=&gl=US&hl=en'
                    % (video_id, el_type))
                video_info_webpage = self._download_webpage(video_info_url)
                video_info = compat_parse_qs(video_info_webpage)
                if 'token' in video_info:
                    break
        if 'token' not in video_info:
            # Only reported; extraction is still attempted below.
            if 'reason' in video_info:
                print('[YouTubeVideoUrl] %s' % video_info['reason'][0])
            else:
                print('[YouTubeVideoUrl] "token" parameter not in video info for unknown reason')

        # Start extracting information
        stream_map = video_info.get('url_encoded_fmt_stream_map', [''])[0]
        adaptive_fmts = video_info.get('adaptive_fmts', [''])[0]
        if 'conn' in video_info and video_info['conn'][0][:4] == 'rtmp':
            url = video_info['conn'][0]
        elif len(stream_map) >= 1 or len(adaptive_fmts) >= 1:
            encoded_url_map = stream_map + ',' + adaptive_fmts
            if 'rtmpe%3Dyes' in encoded_url_map:
                raise Exception('rtmpe downloads are not supported, see https://github.com/rg3/youtube-dl/issues/343')

            url_map_str = self._select_encoded_stream(encoded_url_map.split(','))
            url_data = compat_parse_qs(url_map_str)
            if 'url' not in url_data:
                # The chosen entry carries no URL: nothing playable was found.
                return ""
            url = url_data['url'][0]
            if 'sig' in url_data:
                url += '&signature=' + url_data['sig'][0]
            elif 's' in url_data:
                encrypted_sig = url_data['s'][0]
                ASSETS_RE = r'"assets":.+?"js":\s*("[^"]+")'

                jsplayer_url_json = self._search_regex(
                    ASSETS_RE,
                    embed_webpage if age_gate else video_webpage,
                    default=None)
                if not jsplayer_url_json and not age_gate:
                    # We need the embed website after all
                    if embed_webpage is None:
                        embed_url = 'https://www.youtube.com/embed/%s' % video_id
                        embed_webpage = self._download_webpage(embed_url)
                    jsplayer_url_json = self._search_regex(
                        ASSETS_RE, embed_webpage, default=None)

                if jsplayer_url_json:
                    player_url = json.loads(jsplayer_url_json)
                # Otherwise keep the SWF player URL found above, if any.
                if player_url is None:
                    player_url_json = self._search_regex(
                        r'ytplayer\.config.*?"url"\s*:\s*("[^"]+")',
                        video_webpage)
                    player_url = json.loads(player_url_json)

                signature = self._decrypt_signature(encrypted_sig, player_url)
                url += '&signature=' + signature
            if 'ratebypass' not in url:
                url += '&ratebypass=yes'
        elif video_info.get('hlsvp'):
            manifest_url = video_info['hlsvp'][0]
            url = self._select_hls_url(self._extract_from_m3u8(manifest_url))
            if not url:
                # The manifest was empty or could not be downloaded.
                return ""
        else:
            #raise Exception('No supported formats found in video info!')
            return ""

        return str(url)
-
if __name__ == "__main__":
    # Command-line use: the first argument is the video id; without one a
    # built-in sample id is resolved. The resulting URL is printed.
    video_id = sys.argv[1] if len(sys.argv) > 1 else "2rlTF6HiMGg"
    e = YouTubeVideoUrl().extract(video_id)
    print(e)
|