Extraction: Move item extraction into a generic, robust function

This commit is contained in:
James Taylor 2019-09-27 18:03:19 -07:00
parent 61c50e0b54
commit ce8a658a0e

View File

@ -4,6 +4,7 @@ import html
import json
import re
import urllib
import collections
from math import ceil
# videos (all of type str):
@ -58,15 +59,52 @@ def format_text_runs(runs):
result += html.escape(text_run["text"])
return result
def default_get(object, key, default, types=()):
'''Like dict.get(), but returns default if the result doesn't match one of the types.
Also works for indexing lists.'''
try:
result = object[key]
except (TypeError, IndexError, KeyError):
return default
def default_multi_get(object, *keys, default):
''' Like dict.get(), but for nested dictionaries/sequences, supporting keys or indices. Last argument is the default value to use in case of any IndexErrors or KeyErrors '''
if not types or isinstance(result, types):
return result
else:
return default
def default_multi_get(object, *keys, default, types=()):
'''Like dict.get(), but for nested dictionaries/sequences, supporting keys or indices.
Last argument is the default value to use in case of any IndexErrors or KeyErrors.
If types is given and the result doesn't match one of those types, default is returned'''
try:
for key in keys:
object = object[key]
return object
except (IndexError, KeyError):
except (TypeError, IndexError, KeyError):
return default
else:
if not types or isinstance(object, types):
return object
else:
return default
def multi_default_multi_get(object, *key_sequences, default=None, types=()):
'''Like default_multi_get, but can try different key sequences in case one fails.
Return default if all of them fail. key_sequences is a list of lists'''
for key_sequence in key_sequences:
_object = object
try:
for key in key_sequence:
_object = _object[key]
except (TypeError, IndexError, KeyError):
pass
else:
if not types or isinstance(_object, types):
return _object
else:
continue
return default
def get_url(node):
@ -284,6 +322,7 @@ def parse_info_prepare_for_html(renderer, additional_info={}):
return item
# TODO: Type checking
def get_response(polymer_json):
'''return response, error'''
@ -301,6 +340,123 @@ def get_response(polymer_json):
return None, 'Failed to extract response'
list_types = {
'sectionListRenderer',
'itemSectionRenderer',
'gridRenderer',
'playlistVideoListRenderer',
}
item_types = {
'movieRenderer',
'didYouMeanRenderer',
'showingResultsForRenderer',
'videoRenderer',
'compactVideoRenderer',
'gridVideoRenderer',
'playlistVideoRenderer',
'playlistRenderer',
'compactPlaylistRenderer',
'gridPlaylistRenderer',
'radioRenderer',
'compactRadioRenderer',
'gridRadioRenderer',
'showRenderer',
'compactShowRenderer',
'gridShowRenderer',
'channelRenderer',
'compactChannelRenderer',
'gridChannelRenderer',
'channelAboutFullMetadataRenderer',
}
def traverse_browse_renderer(renderer):
for tab in default_get(renderer, 'tabs', (), types=(list, tuple)):
tab_renderer = multi_default_multi_get(tab, ['tabRenderer'], ['expandableTabRenderer'], default=None, types=dict)
if tab_renderer is None:
continue
if tab_renderer.get('selected', False):
return default_get(tab_renderer, 'content', {}, types=(dict))
print('Could not find tab with content')
return {}
# these renderers contain one inside them
nested_renderer_dispatch = {
'singleColumnBrowseResultsRenderer': traverse_browse_renderer,
'twoColumnBrowseResultsRenderer': traverse_browse_renderer,
'twoColumnSearchResultsRenderer': lambda renderer: default_get(renderer, 'primaryContents', {}, types=dict),
}
def extract_items(response):
'''return items, ctoken'''
if 'continuationContents' in response:
# always has just the one [something]Continuation key, but do this just in case they add some tracking key or something
for key, renderer_continuation in default_get(response, 'continuationContents', {}, types=dict).items():
if key.endswith('Continuation'): # e.g. commentSectionContinuation, playlistVideoListContinuation
items = multi_default_multi_get(renderer_continuation, ['contents'], ['items'], default=None, types=(list, tuple))
ctoken = default_multi_get(renderer_continuation, 'continuations', 0, 'nextContinuationData', 'continuation', default=None, types=str)
return items, ctoken
return [], None
elif 'contents' in response:
ctoken = None
items = []
iter_stack = collections.deque()
current_iter = iter(())
renderer = default_get(response, 'contents', {}, types=dict)
while True:
# mode 1: dig into the current renderer
# Will stay in mode 1 (via continue) if a new renderer is found inside this one
# Otherwise, after finding that it is an item renderer,
# contains a list, or contains nothing,
# falls through into mode 2 to get a new renderer
if len(renderer) != 0:
key, value = list(renderer.items())[0]
# has a list in it, add it to the iter stack
if key in list_types:
renderer_list = multi_default_multi_get(value, ['contents'], ['items'], default=(), types=(list, tuple))
if renderer_list:
iter_stack.append(current_iter)
current_iter = iter(renderer_list)
continuation = default_multi_get(value, 'continuations', 0, 'nextContinuationData', 'continuation', default=None, types=str)
if continuation:
ctoken = continuation
# new renderer nested inside this one
elif key in nested_renderer_dispatch:
renderer = nested_renderer_dispatch[key](value)
continue # back to mode 1
# the renderer is an item
elif key in item_types:
items.append(renderer)
# mode 2: get a new renderer by iterating.
# goes up the stack for an iterator if one has been exhausted
while current_iter is not None:
try:
renderer = current_iter.__next__()
break
except StopIteration:
try:
current_iter = iter_stack.pop() # go back up the stack
except IndexError:
return items, ctoken
else:
return [], None
def extract_channel_info(polymer_json, tab):
response, err = get_response(polymer_json)
@ -341,54 +497,21 @@ def extract_channel_info(polymer_json, tab):
return info
# find the tab with content
# example channel where tabs do not have definite index: https://www.youtube.com/channel/UC4gQ8i3FD7YbhOgqUkeQEJg
# TODO: maybe use the 'selected' attribute for this?
if 'continuationContents' not in response:
tab_renderer = None
tab_content = None
for tab_json in response['contents']['twoColumnBrowseResultsRenderer']['tabs']:
try:
tab_renderer = tab_json['tabRenderer']
except KeyError:
tab_renderer = tab_json['expandableTabRenderer']
try:
tab_content = tab_renderer['content']
break
except KeyError:
pass
else: # didn't break
raise Exception("No tabs found with content")
assert tab == tab_renderer['title'].lower()
# extract tab-specific info
if tab in ('videos', 'playlists', 'search'): # find the list of items
if 'continuationContents' in response:
try:
items = response['continuationContents']['gridContinuation']['items']
except KeyError:
items = response['continuationContents']['sectionListContinuation']['contents'] # for search
else:
contents = tab_content['sectionListRenderer']['contents']
if 'itemSectionRenderer' in contents[0]:
item_section = contents[0]['itemSectionRenderer']['contents'][0]
try:
items = item_section['gridRenderer']['items']
except KeyError:
if "messageRenderer" in item_section:
items = []
else:
raise Exception('gridRenderer missing but messageRenderer not found')
else:
items = contents # for search
items, _ = extract_items(response)
if tab in ('videos', 'playlists', 'search'):
additional_info = {'author': info['channel_name'], 'author_url': 'https://www.youtube.com/channel/' + channel_id}
info['items'] = [renderer_info(renderer, additional_info) for renderer in items]
elif tab == 'about':
channel_metadata = tab_content['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents'][0]['channelAboutFullMetadataRenderer']
for item in items:
try:
channel_metadata = item['channelAboutFullMetadataRenderer']
break
except KeyError:
pass
else:
info['error'] = 'Could not find channelAboutFullMetadataRenderer'
return info
info['links'] = []
for link_json in channel_metadata.get('primaryLinks', ()):
@ -428,10 +551,9 @@ def extract_search_info(polymer_json):
info['estimated_results'] = int(response['estimatedResults'])
info['estimated_pages'] = ceil(info['estimated_results']/20)
# almost always is the first "section", but if there's an advertisement for a google product like Stadia or Home in the search results, then that becomes the first "section" and the search results are in the second. So just join all of them for resiliency
results = []
for section in response['contents']['twoColumnSearchResultsRenderer']['primaryContents']['sectionListRenderer']['contents']:
results += section['itemSectionRenderer']['contents']
results, _ = extract_items(response)
info['items'] = []
info['corrections'] = {'type': None}
@ -491,12 +613,8 @@ def extract_playlist_info(polymer_json):
if err:
return {'error': err}
info = {'error': None}
try: # first page
video_list = response['contents']['singleColumnBrowseResultsRenderer']['tabs'][0]['tabRenderer']['content']['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents'][0]['playlistVideoListRenderer']['contents']
first_page = True
except KeyError: # other pages
video_list = response['continuationContents']['playlistVideoListContinuation']['contents']
first_page = False
first_page = 'continuationContents' not in response
video_list, _ = extract_items(response)
info['items'] = [renderer_info(renderer) for renderer in video_list]
@ -539,12 +657,7 @@ def parse_comments_polymer(polymer_json):
ctoken = urllib.parse.parse_qs(url[url.find('?')+1:])['ctoken'][0]
metadata = ctoken_metadata(ctoken)
try:
comments_raw = response['continuationContents']['commentSectionContinuation']['items']
except KeyError:
comments_raw = response['continuationContents']['commentRepliesContinuation']['contents']
ctoken = default_multi_get(response, 'continuationContents', 'commentSectionContinuation', 'continuations', 0, 'nextContinuationData', 'continuation', default='')
comments_raw, ctoken = extract_items(response)
comments = []
for comment_json in comments_raw: