Extraction: Bypass age-restriction

This commit is contained in:
James Taylor 2019-12-12 22:13:17 -08:00
parent 205ad29cb0
commit 26f37521ba
2 changed files with 113 additions and 58 deletions

View File

@ -275,17 +275,32 @@ headers = (
) + util.mobile_ua
def extract_info(video_id):
    """Fetch the mobile watch page for *video_id* and return its info dict.

    The page is requested once, parsed as JSON and handed to
    ``yt_data_extract.extract_watch_info``. If the video is reported as
    age-restricted, the ``get_video_info`` endpoint is queried as well and
    its data is merged into the result. URL signatures are decrypted last,
    so that decryption operates on the final set of formats.

    Returns:
        The info dict. When the response cannot be parsed, or the extractor
        reports an error, the returned dict carries an ``'error'`` entry and
        the remaining steps are skipped.
    """
    watch_url = 'https://m.youtube.com/watch?v=' + video_id + '&pbj=1'
    polymer_json = util.fetch_url(
        watch_url, headers=headers, debug_name='watch').decode('utf-8')

    # TODO: Decide whether this should be done in yt_data_extract.extract_watch_info
    try:
        polymer_json = json.loads(polymer_json)
    except json.decoder.JSONDecodeError:
        traceback.print_exc()
        return {'error': 'Failed to parse json response'}

    info = yt_data_extract.extract_watch_info(polymer_json)

    # For malformed input the extractor returns a dict holding only 'error';
    # stop here instead of raising KeyError on the lookups below.
    if info.get('error'):
        return info

    # age restriction bypass
    if info.get('age_restricted'):
        print('Fetching age restriction bypass page')
        data = {
            'video_id': video_id,
            'eurl': 'https://youtube.googleapis.com/v/' + video_id,
        }
        url = 'https://www.youtube.com/get_video_info?' + urllib.parse.urlencode(data)
        video_info_page = util.fetch_url(
            url,
            debug_name='get_video_info',
            report_text='Fetched age restriction bypass page',
        ).decode('utf-8')
        yt_data_extract.update_with_age_restricted_info(info, video_info_page)

    # signature decryption (run once, after any bypass has updated the formats)
    decryption_error = decrypt_signatures(info)
    if decryption_error:
        decryption_error = 'Error decrypting url signatures: ' + decryption_error
        info['playability_error'] = decryption_error

    return info
def video_quality_string(format):
@ -410,6 +425,7 @@ def get_watch_page():
uploader = info['author'],
description = info['description'],
unlisted = info['unlisted'],
age_restricted = info['age_restricted'],
playability_error = info['playability_error'],
)

View File

@ -943,6 +943,11 @@ def extract_watch_info_mobile(top_level):
info = {}
microformat = default_multi_get(top_level, 'playerResponse', 'microformat', 'playerMicroformatRenderer', default={})
family_safe = microformat.get('isFamilySafe')
if family_safe is None:
info['age_restricted'] = None
else:
info['age_restricted'] = not family_safe
info['allowed_countries'] = microformat.get('availableCountries', [])
info['published_date'] = microformat.get('publishDate')
@ -1055,57 +1060,7 @@ def get_caption_url(info, language, format, automatic=False, translation_languag
url += '&tlang=' + translation_language
return url
SUBTITLE_FORMATS = ('srv1', 'srv2', 'srv3', 'ttml', 'vtt')
def extract_watch_info(polymer_json):
info = {'playability_error': None, 'error': None}
if isinstance(polymer_json, dict):
top_level = polymer_json
elif isinstance(polymer_json, (list, tuple)):
top_level = {}
for page_part in polymer_json:
if not isinstance(page_part, dict):
return {'error': 'Invalid page part'}
top_level.update(page_part)
else:
return {'error': 'Invalid top level polymer data'}
error = check_missing_keys(top_level,
['player', 'args'],
['player', 'assets', 'js'],
['playerResponse'],
)
if error:
info['playability_error'] = error
player_args = default_multi_get(top_level, 'player', 'args', default={})
player_response = json.loads(player_args['player_response']) if 'player_response' in player_args else {}
playability_status = default_multi_get(player_response, 'playabilityStatus', 'status', default=None)
playability_reason = default_multi_get(player_response, 'playabilityStatus', 'reason', default='Unknown error')
if playability_status not in (None, 'OK'):
info['playability_error'] = playability_reason
# captions
info['automatic_caption_languages'] = []
info['manual_caption_languages'] = []
info['translation_languages'] = []
captions_info = player_response.get('captions', {})
info['_captions_base_url'] = normalize_url(default_multi_get(captions_info, 'playerCaptionsRenderer', 'baseUrl'))
for caption_track in default_multi_get(captions_info, 'playerCaptionsTracklistRenderer', 'captionTracks', default=()):
lang_code = caption_track.get('languageCode')
if lang_code:
if caption_track.get('kind') == 'asr':
info['automatic_caption_languages'].append(lang_code)
else:
info['manual_caption_languages'].append(lang_code)
for translation_lang_info in default_multi_get(captions_info, 'playerCaptionsTracklistRenderer', 'translationLanguages', default=()):
lang_code = translation_lang_info.get('languageCode')
if lang_code:
info['translation_languages'].append(lang_code)
if translation_lang_info.get('isTranslatable') == False:
print('WARNING: Found non-translatable caption language')
# formats
def extract_formats(info, player_response):
streaming_data = player_response.get('streamingData', {})
yt_formats = streaming_data.get('formats', []) + streaming_data.get('adaptiveFormats', [])
@ -1132,9 +1087,67 @@ def extract_watch_info(polymer_json):
fmt.update(_formats.get(str(yt_fmt.get('itag')), {}))
info['formats'].append(fmt)
if info['formats']:
info['playability_error'] = None # in case they lie
SUBTITLE_FORMATS = ('srv1', 'srv2', 'srv3', 'ttml', 'vtt')
def extract_watch_info(polymer_json):
info = {'playability_error': None, 'error': None}
if isinstance(polymer_json, dict):
top_level = polymer_json
elif isinstance(polymer_json, (list, tuple)):
top_level = {}
for page_part in polymer_json:
if not isinstance(page_part, dict):
return {'error': 'Invalid page part'}
top_level.update(page_part)
else:
return {'error': 'Invalid top level polymer data'}
error = check_missing_keys(top_level,
['player', 'args'],
['player', 'assets', 'js'],
['playerResponse'],
)
if error:
info['playability_error'] = error
player_args = default_multi_get(top_level, 'player', 'args', default={})
player_response = json.loads(player_args['player_response']) if 'player_response' in player_args else {}
# captions
info['automatic_caption_languages'] = []
info['manual_caption_languages'] = []
info['translation_languages'] = []
captions_info = player_response.get('captions', {})
info['_captions_base_url'] = normalize_url(default_multi_get(captions_info, 'playerCaptionsRenderer', 'baseUrl'))
for caption_track in default_multi_get(captions_info, 'playerCaptionsTracklistRenderer', 'captionTracks', default=()):
lang_code = caption_track.get('languageCode')
if lang_code:
if caption_track.get('kind') == 'asr':
info['automatic_caption_languages'].append(lang_code)
else:
info['manual_caption_languages'].append(lang_code)
for translation_lang_info in default_multi_get(captions_info, 'playerCaptionsTracklistRenderer', 'translationLanguages', default=()):
lang_code = translation_lang_info.get('languageCode')
if lang_code:
info['translation_languages'].append(lang_code)
if translation_lang_info.get('isTranslatable') == False:
print('WARNING: Found non-translatable caption language')
# formats
extract_formats(info, player_response)
playability_status = default_multi_get(player_response, 'playabilityStatus', 'status', default=None)
playability_reason = default_multi_get(player_response, 'playabilityStatus', 'reason', default='Could not find playability error')
if not info['formats']:
if playability_status not in (None, 'OK'):
info['playability_error'] = playability_reason
else:
info['playability_error'] = 'Unknown playability error'
# check age-restriction
info['age_restricted'] = (playability_status == 'LOGIN_REQUIRED' and playability_reason and ' age' in playability_reason)
# base_js (for decryption of signatures)
info['base_js'] = default_multi_get(top_level, 'player', 'assets', 'js')
if info['base_js']:
info['base_js'] = normalize_url(info['base_js'])
@ -1162,3 +1175,29 @@ def extract_watch_info(polymer_json):
# other stuff
info['author_url'] = 'https://www.youtube.com/channel/' + info['author_id'] if info['author_id'] else None
return info
def update_with_age_restricted_info(info, video_info_page):
    """Update *info* in place from an age-restriction bypass response.

    *video_info_page* is the decoded body of a ``get_video_info`` request,
    a URL-encoded query string whose ``player_response`` field holds JSON.
    The formats found in that JSON are extracted into *info*.

    On success (at least one format present) ``info['playability_error']``
    is cleared. Otherwise it is set to a message starting with
    'Error bypassing age-restriction: ' that describes what went wrong.
    Nothing is returned.
    """
    ERROR_PREFIX = 'Error bypassing age-restriction: '

    video_info = urllib.parse.parse_qs(video_info_page)
    # parse_qs maps each key to a list of values; take the first one.
    # NOTE(review): presumably default_multi_get yields None when the path
    # is missing -- confirm against its definition.
    player_response = default_multi_get(video_info, 'player_response', 0)
    if player_response is None:
        info['playability_error'] = ERROR_PREFIX + 'Could not find player_response in video_info_page'
        return

    try:
        player_response = json.loads(player_response)
    except json.decoder.JSONDecodeError:
        traceback.print_exc()
        info['playability_error'] = ERROR_PREFIX + 'Failed to parse json response'
        return

    extract_formats(info, player_response)
    if info['formats']:
        info['playability_error'] = None
        return

    # No formats were found: report the reason given by the response, if any.
    playability_status = default_multi_get(player_response, 'playabilityStatus', 'status', default=None)
    # The fallback text carries no prefix of its own; ERROR_PREFIX is added
    # exactly once below, so the message is not prefixed twice.
    playability_reason = default_multi_get(player_response, 'playabilityStatus', 'reason', default='Could not find playability error')
    if playability_status not in (None, 'OK'):
        info['playability_error'] = ERROR_PREFIX + playability_reason
    else:
        info['playability_error'] = ERROR_PREFIX + 'Unknown playability error'