Extraction: Move non-stateful signature decryption functionality into yt_data_extract
This commit is contained in:
@@ -7,4 +7,5 @@ from .everything_else import (extract_channel_info, extract_search_info,
|
||||
extract_playlist_metadata, extract_playlist_info, extract_comments_info)
|
||||
|
||||
from .watch_extraction import (extract_watch_info, get_caption_url,
|
||||
update_with_age_restricted_info)
|
||||
update_with_age_restricted_info, requires_decryption,
|
||||
extract_decryption_function, decrypt_signatures)
|
||||
|
||||
@@ -7,6 +7,7 @@ from .common import (get, multi_get, deep_get, multi_deep_get,
|
||||
import json
|
||||
import urllib.parse
|
||||
import traceback
|
||||
import re
|
||||
|
||||
# from https://github.com/ytdl-org/youtube-dl/blob/master/youtube_dl/extractor/youtube.py
|
||||
_formats = {
|
||||
@@ -377,7 +378,11 @@ def extract_watch_info(polymer_json):
|
||||
info['base_js'] = deep_get(top_level, 'player', 'assets', 'js')
|
||||
if info['base_js']:
|
||||
info['base_js'] = normalize_url(info['base_js'])
|
||||
info['player_name'] = get(info['base_js'].split('/'), -2)
|
||||
else:
|
||||
info['player_name'] = None
|
||||
|
||||
# extract stuff from visible parts of page
|
||||
mobile = 'singleColumnWatchNextResults' in deep_get(top_level, 'response', 'contents', default={})
|
||||
if mobile:
|
||||
info.update(_extract_watch_info_mobile(top_level))
|
||||
@@ -447,3 +452,94 @@ def update_with_age_restricted_info(info, video_info_page):
|
||||
|
||||
_extract_formats(info, player_response)
|
||||
_extract_playability_error(info, player_response, error_prefix=ERROR_PREFIX)
|
||||
|
||||
def requires_decryption(info):
|
||||
return ('formats' in info) and info['formats'] and info['formats'][0]['s']
|
||||
|
||||
# adapted from youtube-dl and invidious:
|
||||
# https://github.com/omarroth/invidious/blob/master/src/invidious/helpers/signatures.cr
|
||||
decrypt_function_re = re.compile(r'function\(a\)\{(a=a\.split\(""\)[^\}]+)\}')
|
||||
op_with_arg_re = re.compile(r'[^\.]+\.([^\(]+)\(a,(\d+)\)')
|
||||
def extract_decryption_function(info, base_js):
|
||||
'''Insert decryption function into info. Return error string if not successful.
|
||||
Decryption function is a list of list[2] of numbers.
|
||||
It is advisable to cache the decryption function (uniquely identified by info['player_name']) so base.js (1 MB) doesn't need to be redownloaded each time'''
|
||||
info['decryption_function'] = None
|
||||
decrypt_function_match = decrypt_function_re.search(base_js)
|
||||
if decrypt_function_match is None:
|
||||
return 'Could not find decryption function in base.js'
|
||||
|
||||
function_body = decrypt_function_match.group(1).split(';')[1:-1]
|
||||
if not function_body:
|
||||
return 'Empty decryption function body'
|
||||
|
||||
var_name = get(function_body[0].split('.'), 0)
|
||||
if var_name is None:
|
||||
return 'Could not find var_name'
|
||||
|
||||
var_body_match = re.search(r'var ' + re.escape(var_name) + r'=\{(.*?)\};', base_js, flags=re.DOTALL)
|
||||
if var_body_match is None:
|
||||
return 'Could not find var_body'
|
||||
|
||||
operations = var_body_match.group(1).replace('\n', '').split('},')
|
||||
if not operations:
|
||||
return 'Did not find any definitions in var_body'
|
||||
operations[-1] = operations[-1][:-1] # remove the trailing '}' since we split by '},' on the others
|
||||
operation_definitions = {}
|
||||
for op in operations:
|
||||
colon_index = op.find(':')
|
||||
opening_brace_index = op.find('{')
|
||||
|
||||
if colon_index == -1 or opening_brace_index == -1:
|
||||
return 'Could not parse operation'
|
||||
op_name = op[:colon_index]
|
||||
op_body = op[opening_brace_index+1:]
|
||||
if op_body == 'a.reverse()':
|
||||
operation_definitions[op_name] = 0
|
||||
elif op_body == 'a.splice(0,b)':
|
||||
operation_definitions[op_name] = 1
|
||||
elif op_body.startswith('var c=a[0]'):
|
||||
operation_definitions[op_name] = 2
|
||||
else:
|
||||
return 'Unknown op_body: ' + op_body
|
||||
|
||||
decryption_function = []
|
||||
for op_with_arg in function_body:
|
||||
match = op_with_arg_re.fullmatch(op_with_arg)
|
||||
if match is None:
|
||||
return 'Could not parse operation with arg'
|
||||
op_name = match.group(1)
|
||||
if op_name not in operation_definitions:
|
||||
return 'Unknown op_name: ' + op_name
|
||||
op_argument = match.group(2)
|
||||
decryption_function.append([operation_definitions[op_name], int(op_argument)])
|
||||
|
||||
info['decryption_function'] = decryption_function
|
||||
return False
|
||||
|
||||
def _operation_2(a, b):
|
||||
c = a[0]
|
||||
a[0] = a[b % len(a)]
|
||||
a[b % len(a)] = c
|
||||
|
||||
def decrypt_signatures(info):
|
||||
'''Applies info['decryption_function'] to decrypt all the signatures. Return err.'''
|
||||
if not info.get('decryption_function'):
|
||||
return 'decryption_function not in info'
|
||||
for format in info['formats']:
|
||||
if not format['s'] or not format['sp'] or not format['url']:
|
||||
print('Warning: s, sp, or url not in format')
|
||||
continue
|
||||
|
||||
a = list(format['s'])
|
||||
for op, argument in info['decryption_function']:
|
||||
if op == 0:
|
||||
a.reverse()
|
||||
elif op == 1:
|
||||
a = a[argument:]
|
||||
else:
|
||||
_operation_2(a, argument)
|
||||
|
||||
signature = ''.join(a)
|
||||
format['url'] += '&' + format['sp'] + '=' + signature
|
||||
return False
|
||||
|
||||
Reference in New Issue
Block a user