Rewrite channel extraction with proper error handling and new extraction names. Extract subscriber_count correctly.

Don't just shove english strings into info['stats']. Actually give semantic names for the stats.
This commit is contained in:
James Taylor 2019-12-21 15:45:01 -08:00
parent 3936310e7e
commit 7a6bcb6128
3 changed files with 48 additions and 47 deletions

View File

@ -116,8 +116,14 @@
{% if current_tab == 'about' %}
<div class="channel-info">
<ul>
{% for stat in stats %}
<li>{{ stat }}</li>
{% for (before_text, stat, after_text) in [
('Joined ', date_joined, ''),
('', view_count|commatize, ' views'),
('', approx_subscriber_count, ' subscribers'),
] %}
{% if stat %}
<li>{{ before_text + stat|string + after_text }}</li>
{% endif %}
{% endfor %}
</ul>
<hr>

View File

@ -74,6 +74,8 @@ def conservative_update(obj, key, value):
obj[key] = value
def remove_redirect(url):
if url is None:
return None
if re.fullmatch(r'(((https?:)?//)?(www.)?youtube.com)?/redirect\?.*', url) is not None: # youtube puts these on external links to do tracking
query_string = url[url.find('?')+1: ]
return urllib.parse.parse_qs(query_string)['q'][0]
@ -155,6 +157,8 @@ def extract_approx_int(string):
MONTH_ABBREVIATIONS = {'jan':'1', 'feb':'2', 'mar':'3', 'apr':'4', 'may':'5', 'jun':'6', 'jul':'7', 'aug':'8', 'sep':'9', 'oct':'10', 'nov':'11', 'dec':'12'}
def extract_date(date_text):
'''Input: "Mar 9, 2019". Output: "2019-3-9"'''
if not isinstance(date_text, str):
date_text = extract_str(date_text)
if date_text is None:
return None
@ -165,6 +169,7 @@ def extract_date(date_text):
month = MONTH_ABBREVIATIONS.get(month[0:3]) # slicing in case they start writing out the full month name
if month and (re.fullmatch(r'\d\d?', day) is not None) and (re.fullmatch(r'\d{4}', year) is not None):
return year + '-' + month + '-' + day
return None
def check_missing_keys(object, *key_sequences):
for key_sequence in key_sequences:
@ -319,8 +324,6 @@ item_types = {
'channelRenderer',
'compactChannelRenderer',
'gridChannelRenderer',
'channelAboutFullMetadataRenderer',
}
def _traverse_browse_renderer(renderer):

View File

@ -20,72 +20,64 @@ def extract_channel_info(polymer_json, tab):
# channel doesn't exist or was terminated
# example terminated channel: https://www.youtube.com/channel/UCnKJeK_r90jDdIuzHXC0Org
except KeyError:
if 'alerts' in response and len(response['alerts']) > 0:
return {'error': ' '.join(alert['alertRenderer']['text']['simpleText'] for alert in response['alerts']) }
elif 'errors' in response['responseContext']:
for error in response['responseContext']['errors']['error']:
if error['code'] == 'INVALID_VALUE' and error['location'] == 'browse_id':
if response.get('alerts'):
return {'error': ' '.join(
deep_get(alert, 'alertRenderer', 'text', 'simpleText', default='')
for alert in response['alerts']
)}
elif deep_get(response, 'responseContext', 'errors'):
for error in response['responseContext']['errors'].get('error', []):
if error.get('code') == 'INVALID_VALUE' and error.get('location') == 'browse_id':
return {'error': 'This channel does not exist'}
return {'error': 'Failure getting microformat'}
info = {'error': None}
info['current_tab'] = tab
info['approx_subscriber_count'] = extract_approx_int(deep_get(response,
'header', 'c4TabbedHeaderRenderer', 'subscriberCountText'))
# stuff from microformat (info given by youtube for every page on channel)
info['short_description'] = microformat['description']
info['channel_name'] = microformat['title']
info['avatar'] = microformat['thumbnail']['thumbnails'][0]['url']
channel_url = microformat['urlCanonical'].rstrip('/')
channel_id = channel_url[channel_url.rfind('/')+1:]
info['channel_id'] = channel_id
info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id
info['items'] = []
info['short_description'] = microformat.get('description')
info['channel_name'] = microformat.get('title')
info['avatar'] = deep_get(microformat, 'thumbnail', 'thumbnails', 0, 'url')
channel_url = microformat.get('urlCanonical')
if channel_url:
channel_id = get(channel_url.rstrip('/').split('/'), -1)
info['channel_id'] = channel_id
else:
info['channel_id'] = deep_get(response, 'metadata', 'channelMetadataRenderer', 'externalId')
if info['channel_id']:
info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id
else:
info['channel_url'] = None
# empty channel
if 'contents' not in response and 'continuationContents' not in response:
return info
items, _ = extract_items(response)
# get items
info['items'] = []
if tab in ('videos', 'playlists', 'search'):
additional_info = {'author': info['channel_name'], 'author_url': 'https://www.youtube.com/channel/' + channel_id}
items, _ = extract_items(response)
additional_info = {'author': info['channel_name'], 'author_url': info['channel_url']}
info['items'] = [extract_item_info(renderer, additional_info) for renderer in items]
elif tab == 'about':
for item in items:
try:
channel_metadata = item['channelAboutFullMetadataRenderer']
break
except KeyError:
pass
else:
items, _ = extract_items(response, item_types={'channelAboutFullMetadataRenderer'})
if not items:
info['error'] = 'Could not find channelAboutFullMetadataRenderer'
return info
channel_metadata = items[0]['channelAboutFullMetadataRenderer']
info['links'] = []
for link_json in channel_metadata.get('primaryLinks', ()):
url = remove_redirect(link_json['navigationEndpoint']['urlEndpoint']['url'])
text = extract_str(link_json['title'])
url = remove_redirect(deep_get(link_json, 'navigationEndpoint', 'urlEndpoint', 'url'))
text = extract_str(link_json.get('title'))
info['links'].append( (text, url) )
info['stats'] = []
for stat_name in ('subscriberCountText', 'joinedDateText', 'viewCountText', 'country'):
try:
stat = channel_metadata[stat_name]
except KeyError:
continue
info['stats'].append(extract_str(stat))
if 'description' in channel_metadata:
info['description'] = extract_str(channel_metadata['description'])
else:
info['description'] = ''
info['date_joined'] = extract_date(channel_metadata.get('joinedDateText'))
info['view_count'] = extract_int(channel_metadata.get('viewCountText'))
info['description'] = extract_str(channel_metadata.get('description'), default='')
else:
raise NotImplementedError('Unknown or unsupported channel tab: ' + tab)