Extraction: Rewrite item_extraction for better error handling and readability, rename extracted names for more consistency

James Taylor 2019-12-18 19:39:16 -08:00
parent ee0a118a6c
commit 98777ee825
12 changed files with 304 additions and 339 deletions


@@ -23,3 +23,10 @@ def inject_theme_preference():
         'theme_path': '/youtube.com/static/' + theme_names[settings.theme] + '.css',
     }
 
+@yt_app.template_filter('commatize')
+def commatize(num):
+    if num is None:
+        return ''
+    if isinstance(num, str):
+        num = int(num)
+    return '{:,}'.format(num)
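
Note: the new commatize filter is plain thousands-separator formatting. A minimal behavior sketch, independent of Flask/Jinja:

    # Mirrors the template filter added above
    def commatize(num):
        if num is None:
            return ''
        if isinstance(num, str):
            num = int(num)
        return '{:,}'.format(num)

    assert commatize(None) == ''
    assert commatize('1234567') == '1,234,567'
    assert commatize(1000) == '1,000'

In templates it is applied as {{ view_count|commatize }} (see the playlist.html hunk further down).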


@@ -91,33 +91,33 @@ def post_process_comments_info(comments_info):
         comment['author_url'] = util.URL_ORIGIN + comment['author_url']
         comment['author_avatar'] = '/' + comment['author_avatar']
 
-        comment['permalink'] = util.URL_ORIGIN + '/watch?v=' + comments_info['video_id'] + '&lc=' + comment['comment_id']
+        comment['permalink'] = util.URL_ORIGIN + '/watch?v=' + comments_info['video_id'] + '&lc=' + comment['id']
 
         if comment['author_channel_id'] in accounts.accounts:
             comment['delete_url'] = (util.URL_ORIGIN + '/delete_comment?video_id='
                 + comments_info['video_id']
                 + '&channel_id='+ comment['author_channel_id']
                 + '&author_id=' + comment['author_id']
-                + '&comment_id=' + comment['comment_id'])
+                + '&comment_id=' + comment['id'])
 
-        num_replies = comment['number_of_replies']
-        if num_replies == 0:
-            comment['replies_url'] = util.URL_ORIGIN + '/post_comment?parent_id=' + comment['comment_id'] + "&video_id=" + comments_info['video_id']
+        reply_count = comment['reply_count']
+        if reply_count == 0:
+            comment['replies_url'] = util.URL_ORIGIN + '/post_comment?parent_id=' + comment['id'] + "&video_id=" + comments_info['video_id']
         else:
-            comment['replies_url'] = util.URL_ORIGIN + '/comments?parent_id=' + comment['comment_id'] + "&video_id=" + comments_info['video_id']
+            comment['replies_url'] = util.URL_ORIGIN + '/comments?parent_id=' + comment['id'] + "&video_id=" + comments_info['video_id']
 
-        if num_replies == 0:
+        if reply_count == 0:
             comment['view_replies_text'] = 'Reply'
-        elif num_replies == 1:
+        elif reply_count == 1:
             comment['view_replies_text'] = '1 reply'
         else:
-            comment['view_replies_text'] = str(num_replies) + ' replies'
+            comment['view_replies_text'] = str(reply_count) + ' replies'
 
-        if comment['likes'] == 1:
+        if comment['like_count'] == 1:
             comment['likes_text'] = '1 like'
         else:
-            comment['likes_text'] = str(comment['likes']) + ' likes'
+            comment['likes_text'] = str(comment['like_count']) + ' likes'
 
     comments_info['include_avatars'] = settings.enable_comment_avatars
     if comments_info['ctoken']:


@@ -98,13 +98,19 @@ def get_playlist_page():
         info['metadata'] = yt_data_extract.extract_playlist_metadata(first_page_json)
 
     yt_data_extract.prefix_urls(info['metadata'])
-    for item in info['items']:
+    for item in info.get('items', ()):
         yt_data_extract.prefix_urls(item)
         yt_data_extract.add_extra_html_info(item)
+        if 'id' in item:
+            item['thumbnail'] = '/https://i.ytimg.com/vi/' + item['id'] + '/default.jpg'
+
+    video_count = yt_data_extract.default_multi_get(info, 'metadata', 'video_count')
+    if video_count is None:
+        video_count = 40
 
     return flask.render_template('playlist.html',
-        video_list = info['items'],
-        num_pages = math.ceil(info['metadata']['size']/20),
+        video_list = info.get('items', []),
+        num_pages = math.ceil(video_count/20),
         parameters_dictionary = request.args,
         **info['metadata']
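
Note: default_multi_get (defined in yt_data_extract, see below) returns None when any key along the path is missing, so the page count no longer raises a KeyError for playlists whose metadata lacks a count. A small sketch of the fallback, with a hypothetical empty metadata dict (plain dict lookups stand in for default_multi_get here):

    import math

    info = {'metadata': {}}  # hypothetical response with no video_count
    video_count = info.get('metadata', {}).get('video_count')
    if video_count is None:
        video_count = 40  # same arbitrary fallback as above
    assert math.ceil(video_count / 20) == 2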


@@ -79,9 +79,9 @@ def get_search_page():
     if search_info['error']:
         return flask.render_template('error.html', error_message = search_info['error'])
 
-    for item_info in search_info['items']:
-        yt_data_extract.prefix_urls(item_info)
-        yt_data_extract.add_extra_html_info(item_info)
+    for extract_item_info in search_info['items']:
+        yt_data_extract.prefix_urls(extract_item_info)
+        yt_data_extract.add_extra_html_info(extract_item_info)
 
     corrections = search_info['corrections']
     if corrections['type'] == 'did_you_mean':


@@ -172,7 +172,7 @@ def _get_videos(cursor, number_per_page, offset, tag = None):
             'id': db_video[0],
             'title': db_video[1],
             'duration': db_video[2],
-            'published': exact_timestamp(db_video[3]) if db_video[4] else posix_to_dumbed_down(db_video[3]),
+            'time_published': exact_timestamp(db_video[3]) if db_video[4] else posix_to_dumbed_down(db_video[3]),
             'author': db_video[5],
         })
@@ -462,8 +462,10 @@ def _get_upstream_videos(channel_id):
     videos = channel_info['items']
     for i, video_item in enumerate(videos):
-        if 'description' not in video_item:
+        if not video_item.get('description'):
             video_item['description'] = ''
+        else:
+            video_item['description'] = ''.join(run.get('text', '') for run in video_item['description'])
 
         if video_item['id'] in times_published:
             video_item['time_published'] = times_published[video_item['id']]
@@ -471,7 +473,7 @@ def _get_upstream_videos(channel_id):
         else:
             video_item['is_time_published_exact'] = False
             try:
-                video_item['time_published'] = youtube_timestamp_to_posix(video_item['published']) - i # subtract a few seconds off the videos so they will be in the right order
+                video_item['time_published'] = youtube_timestamp_to_posix(video_item['time_published']) - i # subtract a few seconds off the videos so they will be in the right order
             except KeyError:
                 print(video_item)
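
Note: after this change a video's description arrives as a list of text runs rather than a plain string, so it is flattened before storage. A minimal sketch with a hypothetical runs list:

    description = [
        {'text': 'New video: '},
        {'text': 'https://example.com', 'url': 'https://example.com'},
    ]
    flattened = ''.join(run.get('text', '') for run in description)
    assert flattened == 'New video: https://example.com'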


@@ -12,11 +12,11 @@
                 <a class="author" href="{{ comment['author_url'] }}" title="{{ comment['author'] }}">{{ comment['author'] }}</a>
             </address>
             <a class="permalink" href="{{ comment['permalink'] }}" title="permalink">
-                <time datetime="">{{ comment['published'] }}</time>
+                <time datetime="">{{ comment['time_published'] }}</time>
             </a>
             <span class="text">{{ common_elements.text_runs(comment['text']) }}</span>
-            <span class="likes">{{ comment['likes_text'] if comment['likes'] else ''}}</span>
+            <span class="likes">{{ comment['likes_text'] if comment['like_count'] else ''}}</span>
             <div class="bottom-row">
                 <a href="{{ comment['replies_url'] }}" class="replies">{{ comment['view_replies_text'] }}</a>
                 {% if 'delete_url' is in comment %}


@@ -9,55 +9,59 @@
         {{ text_run["text"] }}
         {%- endif -%}
     {%- endfor -%}
-    {%- else -%}
+    {%- elif runs -%}
         {{ runs }}
     {%- endif -%}
 {% endmacro %}
 
 {% macro item(info, description=false, horizontal=true, include_author=true, include_badges=true) %}
     <div class="item-box {{ info['type'] + '-item-box' }} {{'horizontal-item-box' if horizontal else 'vertical-item-box'}} {{'has-description' if description else 'no-description'}}">
-        <div class="item {{ info['type'] + '-item' }}">
-            <a class="thumbnail-box" href="{{ info['url'] }}" title="{{ info['title'] }}">
-                <img class="thumbnail-img" src="{{ info['thumbnail'] }}">
-                {% if info['type'] != 'channel' %}
-                    <div class="thumbnail-info">
-                        <span>{{ info['size'] if info['type'] == 'playlist' else info['duration'] }}</span>
-                    </div>
-                {% endif %}
-            </a>
+        {% if info['error'] %}
+            {{ info['error'] }}
+        {% else %}
+        <div class="item {{ info['type'] + '-item' }}">
+            <a class="thumbnail-box" href="{{ info['url'] }}" title="{{ info['title'] }}">
+                <img class="thumbnail-img" src="{{ info['thumbnail'] }}">
+                {% if info['type'] != 'channel' %}
+                    <div class="thumbnail-info">
+                        <span>{{ (info['video_count']|string + ' videos') if info['type'] == 'playlist' else info['duration'] }}</span>
+                    </div>
+                {% endif %}
+            </a>
 
             <div class="title"><a class="title" href="{{ info['url'] }}" title="{{ info['title'] }}">{{ info['title'] }}</a></div>
 
             <ul class="stats {{'vertical-stats' if horizontal and not description and include_author else 'horizontal-stats'}}">
                 {% if info['type'] == 'channel' %}
-                    <li><span>{{ info['subscriber_count'] }} subscribers</span></li>
-                    <li><span>{{ info['size'] }} videos</span></li>
+                    <li><span>{{ info['approx_subscriber_count'] }} subscribers</span></li>
+                    <li><span>{{ info['video_count'] }} videos</span></li>
                 {% else %}
                     {% if include_author %}
-                        {% if 'author_url' is in(info) %}
+                        {% if info.get('author_url') %}
                             <li><address title="{{ info['author'] }}">By <a href="{{ info['author_url'] }}">{{ info['author'] }}</a></address></li>
                         {% else %}
                             <li><address title="{{ info['author'] }}"><b>{{ info['author'] }}</b></address></li>
                         {% endif %}
                     {% endif %}
-                    {% if 'views' is in(info) %}
-                        <li><span class="views">{{ info['views'] }}</span></li>
-                    {% endif %}
-                    {% if 'published' is in(info) %}
-                        <li><time>{{ info['published'] }}</time></li>
-                    {% endif %}
+                    {% if info.get('approx_view_count') %}
+                        <li><span class="views">{{ info['approx_view_count'] }} views</span></li>
+                    {% endif %}
+                    {% if info.get('time_published') %}
+                        <li><time>{{ info['time_published'] }}</time></li>
+                    {% endif %}
                 {% endif %}
             </ul>
 
             {% if description %}
                 <span class="description">{{ text_runs(info.get('description', '')) }}</span>
             {% endif %}
             {% if include_badges %}
                 <span class="badges">{{ info['badges']|join(' | ') }}</span>
             {% endif %}
         </div>
         {% if info['type'] == 'video' %}
             <input class="item-checkbox" type="checkbox" name="video_info_list" value="{{ info['video_info'] }}" form="playlist-edit">
         {% endif %}
+        {% endif %}
     </div>


@@ -54,8 +54,9 @@
             <h2 class="playlist-title">{{ title }}</h2>
             <a class="playlist-author" href="{{ author_url }}">{{ author }}</a>
             <div class="playlist-stats">
-                <div>{{ views }}</div>
-                <div>{{ size }} videos</div>
+                <div>{{ video_count|commatize }} videos</div>
+                <div>{{ view_count|commatize }} views</div>
+                <div>Last updated {{ time_published }}</div>
             </div>
             <div class="playlist-description">{{ common_elements.text_runs(description) }}</div>
         </div>


@@ -261,11 +261,11 @@
                 {%- endif -%}
             </ul>
             <address>Uploaded by <a href="{{ uploader_channel_url }}">{{ uploader }}</a></address>
-            <span class="views">{{ views }} views</span>
+            <span class="views">{{ view_count }} views</span>
 
-            <time datetime="$upload_date">Published on {{ upload_date }}</time>
-            <span class="likes-dislikes">{{ likes }} likes {{ dislikes }} dislikes</span>
+            <time datetime="$upload_date">Published on {{ time_published }}</time>
+            <span class="likes-dislikes">{{ like_count }} likes {{ dislike_count }} dislikes</span>
             <details class="download-dropdown">
                 <summary class="download-dropdown-label">Download</summary>
                 <ul class="download-dropdown-content">


@@ -310,6 +310,8 @@ def uppercase_escape(s):
         lambda m: chr(int(m.group(1), base=16)), s)
 
 def prefix_url(url):
+    if url is None:
+        return None
     url = url.lstrip('/') # some urls have // before them, which has a special meaning
     return '/' + url
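
Note: the None guard lets callers pass through missing URLs (the extractor now returns None instead of raising) without an AttributeError on lstrip. Behavior sketch:

    assert prefix_url(None) is None
    assert prefix_url('//i.ytimg.com/vi/x/default.jpg') == '/i.ytimg.com/vi/x/default.jpg'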


@@ -405,10 +405,10 @@ def get_watch_page():
     return flask.render_template('watch.html',
         header_playlist_names = local_playlist.get_playlist_names(),
         uploader_channel_url = ('/' + info['author_url']) if info['author_url'] else '',
-        upload_date = info['published_date'],
-        views = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("view_count", None)),
-        likes = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("like_count", None)),
-        dislikes = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("dislike_count", None)),
+        time_published = info['time_published'],
+        view_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("view_count", None)),
+        like_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("like_count", None)),
+        dislike_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("dislike_count", None)),
         download_formats = download_formats,
         video_info = json.dumps(video_info),
         video_sources = video_sources,


@@ -8,7 +8,7 @@ import collections
 from math import ceil
 import traceback
 
-# videos (all of type str):
+# videos:
 
 #   id
 #   title
@@ -17,11 +17,12 @@ import traceback
 #   author_url
 #   thumbnail
 #   description
-#   published
-#   duration
-#   likes
-#   dislikes
-#   views
+#   time_published (str)
+#   duration (str)
+#   like_count (int)
+#   dislike_count (int)
+#   view_count (int)
+#   approx_view_count (str)
 #   playlist_index
 
 # playlists:
@@ -33,8 +34,8 @@ import traceback
 #   author_url
 #   thumbnail
 #   description
-#   updated
-#   size
+#   time_published (str)
+#   video_count (int)
 #   first_video_id
 
 # from https://github.com/ytdl-org/youtube-dl/blob/master/youtube_dl/extractor/youtube.py
@@ -144,26 +145,6 @@ _formats = {
     '397': {'vcodec': 'av01.0.05M.08'},
 }
 
-def get_plain_text(node):
-    try:
-        return node['simpleText']
-    except KeyError:
-        return ''.join(text_run['text'] for text_run in node['runs'])
-
-def format_text_runs(runs):
-    if isinstance(runs, str):
-        return runs
-    result = ''
-    for text_run in runs:
-        if text_run.get("bold", False):
-            result += "<b>" + html.escape(text_run["text"]) + "</b>"
-        elif text_run.get('italics', False):
-            result += "<i>" + html.escape(text_run["text"]) + "</i>"
-        else:
-            result += html.escape(text_run["text"])
-    return result
-
 def default_get(object, key, default=None, types=()):
     '''Like dict.get(), but returns default if the result doesn't match one of the types.
        Also works for indexing lists.'''
@@ -177,6 +158,19 @@ def default_get(object, key, default=None, types=()):
     else:
         return default
 
+def multi_default_get(object, *keys, default=None, types=()):
+    '''Like default_get, but try other keys if the first fails'''
+    for key in keys:
+        try:
+            result = object[key]
+        except (TypeError, IndexError, KeyError):
+            pass
+        else:
+            if not types or isinstance(result, types):
+                return result
+            else:
+                continue
+    return default
+
 def default_multi_get(object, *keys, default=None, types=()):
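
Note: multi_default_get tries several alternative keys on one object, while the existing default_multi_get follows a single path of keys into nested objects. Usage sketch with a hypothetical renderer:

    item = {'shortBylineText': {'simpleText': 'Channel Name'}}
    multi_default_get(item, 'longBylineText', 'shortBylineText', 'ownerText')
    # -> {'simpleText': 'Channel Name'}; returns default (None) if no key matches
    multi_default_get(item, 'longBylineText', types=(dict,))
    # -> None: key missing, and types= would also reject non-dict hits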
@@ -211,101 +205,85 @@ def multi_default_multi_get(object, *key_sequences, default=None, types=()):
             continue
     return default
 
+def liberal_update(obj, key, value):
+    '''Updates obj[key] with value as long as value is not None.
+    Ensures obj[key] will at least get a value of None, however'''
+    if (value is not None) or (key not in obj):
+        obj[key] = value
+
+def conservative_update(obj, key, value):
+    '''Only updates obj if it doesn't have key or obj[key] is None'''
+    if obj.get(key) is None:
+        obj[key] = value
+
 def remove_redirect(url):
     if re.fullmatch(r'(((https?:)?//)?(www.)?youtube.com)?/redirect\?.*', url) is not None: # youtube puts these on external links to do tracking
         query_string = url[url.find('?')+1: ]
         return urllib.parse.parse_qs(query_string)['q'][0]
     return url
 
-def get_url(node):
-    try:
-        return node['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url']
-    except KeyError:
-        return node['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url']
+def _recover_urls(runs):
+    for run in runs:
+        url = default_multi_get(run, 'navigationEndpoint', 'urlEndpoint', 'url')
+        text = run.get('text', '')
+        # second condition is necessary because youtube makes other things into urls, such as hashtags, which we want to keep as text
+        if url is not None and (text.startswith('http://') or text.startswith('https://')):
+            url = remove_redirect(url)
+            run['url'] = url
+            run['text'] = url # youtube truncates the url text, use actual url instead
 
-def get_text(node):
-    if node == {}:
-        return ''
+def extract_str(node, default=None, recover_urls=False):
+    '''default is the value returned if the extraction fails. If recover_urls is true, will attempt to fix Youtube's truncation of url text (most prominently seen in descriptions)'''
+    if isinstance(node, str):
+        return node
+
     try:
         return node['simpleText']
-    except KeyError:
+    except (KeyError, TypeError):
         pass
-    try:
-        return node['runs'][0]['text']
-    except IndexError: # empty text runs
-        return ''
-    except KeyError:
-        print(node)
-        raise
 
-def get_formatted_text(node):
-    try:
+    if isinstance(node, dict) and 'runs' in node:
+        if recover_urls:
+            _recover_urls(node['runs'])
+        return ''.join(text_run.get('text', '') for text_run in node['runs'])
+
+    return default
+
+def extract_formatted_text(node):
+    if not node:
+        return []
+    if 'runs' in node:
+        _recover_urls(node['runs'])
         return node['runs']
-    except KeyError:
-        return node['simpleText']
+    elif 'simpleText' in node:
+        return [{'text': node['simpleText']}]
+    return []
 
-def get_badges(node):
-    badges = []
-    for badge_node in node:
-        badge = badge_node['metadataBadgeRenderer']['label']
-        badges.append(badge)
-    return badges
+def extract_int(string):
+    if isinstance(string, int):
+        return string
+    if not isinstance(string, str):
+        string = extract_str(string)
+    if not string:
+        return None
+    match = re.search(r'(\d+)', string.replace(',', ''))
+    if match is None:
+        return None
+    try:
+        return int(match.group(1))
+    except ValueError:
+        return None
 
-def get_thumbnail(node):
-    try:
-        return node['thumbnails'][0]['url']     # polymer format
-    except KeyError:
-        return node['url']     # ajax format
-
-dispatch = {
-    # polymer format
-    'title': ('title', get_text),
-    'publishedTimeText': ('published', get_text),
-    'videoId': ('id', lambda node: node),
-    'descriptionSnippet': ('description', get_formatted_text),
-    'lengthText': ('duration', get_text),
-    'thumbnail': ('thumbnail', get_thumbnail),
-    'thumbnails': ('thumbnail', lambda node: node[0]['thumbnails'][0]['url']),
-
-    'viewCountText': ('views', get_text),
-    'numVideosText': ('size', lambda node: get_text(node).split(' ')[0]), # the format is "324 videos"
-    'videoCountText': ('size', get_text),
-    'playlistId': ('id', lambda node: node),
-    'descriptionText': ('description', get_formatted_text),
-
-    'subscriberCountText': ('subscriber_count', get_text),
-    'channelId': ('id', lambda node: node),
-    'badges': ('badges', get_badges),
-
-    # ajax format
-    'view_count_text': ('views', get_text),
-    'num_videos_text': ('size', lambda node: get_text(node).split(' ')[0]),
-    'owner_text': ('author', get_text),
-    'owner_endpoint': ('author_url', lambda node: node['url']),
-    'description': ('description', get_formatted_text),
-    'index': ('playlist_index', get_text),
-    'short_byline': ('author', get_text),
-    'length': ('duration', get_text),
-    'video_id': ('id', lambda node: node),
-}
-
-def ajax_info(item_json):
-    try:
-        info = {}
-        for key, node in item_json.items():
-            try:
-                simple_key, function = dispatch[key]
-            except KeyError:
-                continue
-            info[simple_key] = function(node)
-        return info
-    except KeyError:
-        print(item_json)
-        raise
+def extract_approx_int(string):
+    '''e.g. "15M" from "15M subscribers"'''
+    if not isinstance(string, str):
+        string = extract_str(string)
+    if not string:
+        return None
+    match = re.search(r'(\d+[KMBTkmbt])', string.replace(',', ''))
+    if match is None:
+        return None
+    return match.group(1)
 
 youtube_url_re = re.compile(r'^(?:(?:(?:https?:)?//)?(?:www\.)?youtube\.com)?(/.*)$')
 def normalize_url(url):
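
Note: behavior sketch for the relocated extract_* helpers above, using hypothetical nodes:

    node = {'runs': [{'text': '1,234 views'}]}
    extract_str(node)                                      # -> '1,234 views'
    extract_str({'simpleText': 'hi'})                      # -> 'hi'
    extract_str(None, default='')                          # -> ''
    extract_formatted_text(node)                           # -> [{'text': '1,234 views'}]
    extract_int(node)                                      # -> 1234 (commas stripped, first integer)
    extract_approx_int({'simpleText': '15M subscribers'})  # -> '15M'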
@@ -330,7 +308,7 @@ def prefix_urls(item):
 
 def add_extra_html_info(item):
     if item['type'] == 'video':
-        item['url'] = util.URL_ORIGIN + '/watch?v=' + item['id']
+        item['url'] = (util.URL_ORIGIN + '/watch?v=' + item['id']) if item.get('id') else None
 
         video_info = {}
         for key in ('id', 'title', 'author', 'duration'):
@@ -342,17 +320,22 @@ def add_extra_html_info(item):
         item['video_info'] = json.dumps(video_info)
 
     elif item['type'] == 'playlist':
-        item['url'] = util.URL_ORIGIN + '/playlist?list=' + item['id']
+        item['url'] = (util.URL_ORIGIN + '/playlist?list=' + item['id']) if item.get('id') else None
     elif item['type'] == 'channel':
-        item['url'] = util.URL_ORIGIN + "/channel/" + item['id']
+        item['url'] = (util.URL_ORIGIN + "/channel/" + item['id']) if item.get('id') else None
 
-def renderer_info(renderer, additional_info={}):
-    type = list(renderer.keys())[0]
-    renderer = renderer[type]
-    info = {}
+def extract_item_info(item, additional_info={}):
+    if not item:
+        return {'error': 'No item given'}
+
+    type = default_get(list(item.keys()), 0)
+    if not type:
+        return {'error': 'Could not find type'}
+    item = item[type]
+
+    info = {'error': None}
     if type in ('itemSectionRenderer', 'compactAutoplayRenderer'):
-        return renderer_info(renderer['contents'][0], additional_info)
+        return extract_item_info(default_multi_get(item, 'contents', 0), additional_info)
 
     if type in ('movieRenderer', 'clarificationRenderer'):
         info['type'] = 'unsupported'
@@ -360,75 +343,78 @@ def renderer_info(renderer, additional_info={}):
 
     info.update(additional_info)
 
-    if type in ('compactVideoRenderer', 'videoRenderer', 'playlistVideoRenderer', 'gridVideoRenderer'):
+    # type looks like e.g. 'compactVideoRenderer' or 'gridVideoRenderer'
+    # camelCase split, https://stackoverflow.com/a/37697078
+    type_parts = [s.lower() for s in re.sub(r'([A-Z][a-z]+)', r' \1', type).split()]
+    if len(type_parts) < 2:
+        info['type'] = 'unsupported'
+        return
+    primary_type = type_parts[-2]
+    if primary_type == 'video':
         info['type'] = 'video'
-    elif type in ('playlistRenderer', 'compactPlaylistRenderer', 'gridPlaylistRenderer',
-                  'radioRenderer', 'compactRadioRenderer', 'gridRadioRenderer',
-                  'showRenderer', 'compactShowRenderer', 'gridShowRenderer'):
+    elif primary_type in ('playlist', 'radio', 'show'):
         info['type'] = 'playlist'
-    elif type == 'channelRenderer':
+    elif primary_type == 'channel':
         info['type'] = 'channel'
-    elif type == 'playlistHeaderRenderer':
-        info['type'] = 'playlist_metadata'
     else:
         info['type'] = 'unsupported'
+        return info
 
-    try:
-        if 'viewCountText' in renderer:     # prefer this one as it contains all the digits
-            info['views'] = get_text(renderer['viewCountText'])
-        elif 'shortViewCountText' in renderer:
-            info['views'] = get_text(renderer['shortViewCountText'])
-
-        if 'ownerText' in renderer:
-            info['author'] = renderer['ownerText']['runs'][0]['text']
-            info['author_url'] = normalize_url(renderer['ownerText']['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'])
-        try:
-            overlays = renderer['thumbnailOverlays']
-        except KeyError:
-            pass
-        else:
-            for overlay in overlays:
-                if 'thumbnailOverlayTimeStatusRenderer' in overlay:
-                    info['duration'] = get_text(overlay['thumbnailOverlayTimeStatusRenderer']['text'])
-                # show renderers don't have videoCountText
-                elif 'thumbnailOverlayBottomPanelRenderer' in overlay:
-                    info['size'] = get_text(overlay['thumbnailOverlayBottomPanelRenderer']['text'])
-
-        # show renderers don't have playlistId, have to dig into the url to get it
-        try:
-            info['id'] = renderer['navigationEndpoint']['watchEndpoint']['playlistId']
-        except KeyError:
-            pass
-        for key, node in renderer.items():
-            if key in ('longBylineText', 'shortBylineText'):
-                info['author'] = get_text(node)
-                try:
-                    info['author_url'] = normalize_url(get_url(node))
-                except KeyError:
-                    pass
-
-            # show renderers don't have thumbnail key at top level, dig into thumbnailRenderer
-            elif key == 'thumbnailRenderer' and 'showCustomThumbnailRenderer' in node:
-                info['thumbnail'] = node['showCustomThumbnailRenderer']['thumbnail']['thumbnails'][0]['url']
-            else:
-                try:
-                    simple_key, function = dispatch[key]
-                except KeyError:
-                    continue
-                info[simple_key] = function(node)
-        if info['type'] == 'video' and 'duration' not in info:
-            info['duration'] = 'Live'
-
-        return info
-    except KeyError:
-        print(renderer)
-        raise
+    info['title'] = extract_str(item.get('title'))
+    info['author'] = extract_str(multi_default_get(item, 'longBylineText', 'shortBylineText', 'ownerText'))
+    info['author_id'] = extract_str(multi_default_multi_get(item,
+        ['longBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
+        ['shortBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
+        ['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId']
+    ))
+    info['author_url'] = ('https://www.youtube.com/channel/' + info['author_id']) if info['author_id'] else None
+    info['description'] = extract_formatted_text(multi_default_get(item, 'descriptionSnippet', 'descriptionText'))
+    info['thumbnail'] = multi_default_multi_get(item,
+        ['thumbnail', 'thumbnails', 0, 'url'],      # videos
+        ['thumbnails', 0, 'thumbnails', 0, 'url'],  # playlists
+        ['thumbnailRenderer', 'showCustomThumbnailRenderer', 'thumbnail', 'thumbnails', 0, 'url'], # shows
+    )
+
+    info['badges'] = []
+    for badge_node in multi_default_get(item, 'badges', 'ownerBadges', default=()):
+        badge = default_multi_get(badge_node, 'metadataBadgeRenderer', 'label')
+        if badge:
+            info['badges'].append(badge)
+
+    if primary_type in ('video', 'playlist'):
+        info['time_published'] = extract_str(item.get('publishedTimeText'))
+
+    if primary_type == 'video':
+        info['id'] = item.get('videoId')
+        info['view_count'] = extract_int(item.get('viewCountText'))
+        if info['view_count']:
+            info['approx_view_count'] = '{:,}'.format(info['view_count'])
+        else:
+            info['approx_view_count'] = extract_approx_int(multi_default_get(item, 'shortViewCountText'))
+        info['duration'] = extract_str(item.get('lengthText'))
+    elif primary_type == 'playlist':
+        info['id'] = item.get('playlistId')
+        info['video_count'] = extract_int(item.get('videoCount'))
+    elif primary_type == 'channel':
+        info['id'] = item.get('channelId')
+        info['approx_subscriber_count'] = extract_approx_int(item.get('subscriberCountText'))
+    elif primary_type == 'show':
+        info['id'] = default_multi_get(item, 'navigationEndpoint', 'watchEndpoint', 'playlistId')
+
+    if primary_type in ('playlist', 'channel'):
+        conservative_update(info, 'video_count', extract_int(item.get('videoCountText')))
+
+    for overlay in item.get('thumbnailOverlays', []):
+        conservative_update(info, 'duration', extract_str(default_multi_get(
+            overlay, 'thumbnailOverlayTimeStatusRenderer', 'text'
+        )))
+        # show renderers don't have videoCountText
+        conservative_update(info, 'video_count', extract_int(default_multi_get(
+            overlay, 'thumbnailOverlayBottomPanelRenderer', 'text'
+        )))
+
+    return info
 
 def parse_info_prepare_for_html(renderer, additional_info={}):
-    item = renderer_info(renderer, additional_info)
+    item = extract_item_info(renderer, additional_info)
     prefix_urls(item)
     add_extra_html_info(item)
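
Note: the renderer type (e.g. 'compactVideoRenderer', 'gridPlaylistRenderer') is now classified by splitting its camelCase name and taking the second-to-last word, instead of enumerating every renderer name. Sketch of the classification:

    import re

    for renderer_type in ('compactVideoRenderer', 'gridPlaylistRenderer', 'channelRenderer'):
        type_parts = [s.lower() for s in re.sub(r'([A-Z][a-z]+)', r' \1', renderer_type).split()]
        print(renderer_type, '->', type_parts[-2])
    # compactVideoRenderer -> video
    # gridPlaylistRenderer -> playlist
    # channelRenderer -> channel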
@@ -616,7 +602,7 @@ def extract_channel_info(polymer_json, tab):
     items, _ = extract_items(response)
     if tab in ('videos', 'playlists', 'search'):
         additional_info = {'author': info['channel_name'], 'author_url': 'https://www.youtube.com/channel/' + channel_id}
-        info['items'] = [renderer_info(renderer, additional_info) for renderer in items]
+        info['items'] = [extract_item_info(renderer, additional_info) for renderer in items]
 
     elif tab == 'about':
         for item in items:
@@ -633,7 +619,7 @@ def extract_channel_info(polymer_json, tab):
 
         for link_json in channel_metadata.get('primaryLinks', ()):
             url = remove_redirect(link_json['navigationEndpoint']['urlEndpoint']['url'])
-            text = get_plain_text(link_json['title'])
+            text = extract_str(link_json['title'])
 
             info['links'].append( (text, url) )
@@ -644,10 +630,10 @@ def extract_channel_info(polymer_json, tab):
                 stat = channel_metadata[stat_name]
             except KeyError:
                 continue
-            info['stats'].append(get_plain_text(stat))
+            info['stats'].append(extract_str(stat))
 
         if 'description' in channel_metadata:
-            info['description'] = get_text(channel_metadata['description'])
+            info['description'] = extract_str(channel_metadata['description'])
         else:
             info['description'] = ''
@@ -693,9 +679,9 @@ def extract_search_info(polymer_json):
             }
             continue
 
-        item_info = renderer_info(renderer)
-        if item_info['type'] != 'unsupported':
-            info['items'].append(item_info)
+        i_info = extract_item_info(renderer)
+        if i_info.get('type') != 'unsupported':
+            info['items'].append(i_info)
 
     return info
@@ -704,13 +690,41 @@ def extract_playlist_metadata(polymer_json):
     response, err = extract_response(polymer_json)
     if err:
         return {'error': err}
-    metadata = renderer_info(response['header'])
-    metadata['error'] = None
 
-    if 'description' not in metadata:
-        metadata['description'] = ''
+    metadata = {'error': None}
+    header = default_multi_get(response, 'header', 'playlistHeaderRenderer', default={})
+    metadata['title'] = extract_str(header.get('title'))
 
-    metadata['size'] = int(metadata['size'].replace(',', ''))
+    metadata['first_video_id'] = default_multi_get(header, 'playEndpoint', 'watchEndpoint', 'videoId')
+    first_id = re.search(r'([a-z_\-]{11})', default_multi_get(header,
+        'thumbnail', 'thumbnails', 0, 'url', default=''))
+    if first_id:
+        conservative_update(metadata, 'first_video_id', first_id.group(1))
+    if metadata['first_video_id'] is None:
+        metadata['thumbnail'] = None
+    else:
+        metadata['thumbnail'] = 'https://i.ytimg.com/vi/' + metadata['first_video_id'] + '/mqdefault.jpg'
+
+    metadata['video_count'] = extract_int(header.get('numVideosText'))
+    metadata['description'] = extract_str(header.get('descriptionText'), default='')
+    metadata['author'] = extract_str(header.get('ownerText'))
+    metadata['author_id'] = multi_default_multi_get(header,
+        ['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
+        ['ownerEndpoint', 'browseEndpoint', 'browseId'])
+    if metadata['author_id']:
+        metadata['author_url'] = 'https://www.youtube.com/channel/' + metadata['author_id']
+    else:
+        metadata['author_url'] = None
+    metadata['view_count'] = extract_int(header.get('viewCountText'))
+    metadata['like_count'] = extract_int(header.get('likesCountWithoutLikeText'))
+    for stat in header.get('stats', ()):
+        text = extract_str(stat)
+        if 'videos' in text:
+            conservative_update(metadata, 'video_count', extract_int(text))
+        elif 'views' in text:
+            conservative_update(metadata, 'view_count', extract_int(text))
+        elif 'updated' in text:
+            metadata['time_published'] = extract_date(text)
 
     return metadata
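
Note: the stats strings are only a fallback; conservative_update fills a field just when it is still None, so values parsed directly from the header win. Sketch with hypothetical stats:

    metadata = {'video_count': None, 'view_count': 1000}
    for stat in ({'simpleText': '52 videos'}, {'simpleText': '12,345 views'}):
        text = extract_str(stat)
        if 'videos' in text:
            conservative_update(metadata, 'video_count', extract_int(text))
        elif 'views' in text:
            conservative_update(metadata, 'view_count', extract_int(text))
    assert metadata == {'video_count': 52, 'view_count': 1000}  # view_count untouched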
@@ -722,7 +736,7 @@ def extract_playlist_info(polymer_json):
     first_page = 'continuationContents' not in response
 
     video_list, _ = extract_items(response)
-    info['items'] = [renderer_info(renderer) for renderer in video_list]
+    info['items'] = [extract_item_info(renderer) for renderer in video_list]
 
     if first_page:
         info['metadata'] = extract_playlist_metadata(polymer_json)
@@ -777,7 +791,7 @@ def parse_comments_polymer(polymer_json):
             video_title = comment_thread['commentTargetTitle']['runs'][0]['text']
 
             if 'replies' in comment_thread:
-                view_replies_text = get_plain_text(comment_thread['replies']['commentRepliesRenderer']['moreText'])
+                view_replies_text = extract_str(comment_thread['replies']['commentRepliesRenderer']['moreText'])
                 view_replies_text = view_replies_text.replace(',', '')
                 match = re.search(r'(\d+)', view_replies_text)
                 if match is None:
@@ -789,15 +803,15 @@ def parse_comments_polymer(polymer_json):
             comment = {
                 'author_id': comment_renderer.get('authorId', ''),
                 'author_avatar': comment_renderer['authorThumbnail']['thumbnails'][0]['url'],
-                'likes': comment_renderer['likeCount'],
-                'published': get_plain_text(comment_renderer['publishedTimeText']),
+                'like_count': comment_renderer['likeCount'],
+                'time_published': extract_str(comment_renderer['publishedTimeText']),
                 'text': comment_renderer['contentText'].get('runs', ''),
-                'number_of_replies': number_of_replies,
-                'comment_id': comment_renderer['commentId'],
+                'reply_count': number_of_replies,
+                'id': comment_renderer['commentId'],
             }
 
             if 'authorText' in comment_renderer: # deleted channels have no name or channel link
-                comment['author'] = get_plain_text(comment_renderer['authorText'])
+                comment['author'] = extract_str(comment_renderer['authorText'])
                 comment['author_url'] = comment_renderer['authorEndpoint']['commandMetadata']['webCommandMetadata']['url']
                 comment['author_channel_id'] = comment_renderer['authorEndpoint']['browseEndpoint']['browseId']
             else:
@@ -832,66 +846,6 @@ def check_missing_keys(object, *key_sequences):
 
     return None
 
-def extract_str(node, default=None, recover_urls=False):
-    '''default is the value returned if the extraction fails. If recover_urls is true, will attempt to fix Youtube's truncation of url text (most prominently seen in descriptions)'''
-    if isinstance(node, str):
-        return node
-
-    try:
-        return node['simpleText']
-    except (KeyError, TypeError):
-        pass
-
-    if isinstance(node, dict) and 'runs' in node:
-        if recover_urls:
-            result = ''
-            for run in node['runs']:
-                url = default_multi_get(run, 'navigationEndpoint', 'urlEndpoint', 'url')
-                text = run.get('text', '')
-                # second condition is necessary because youtube makes other things into urls, such as hashtags, which we want to keep as text
-                if url is not None and (text.startswith('http://') or text.startswith('https://')):
-                    url = remove_redirect(url)
-                    result += url # youtube truncates the url text, use actual url instead
-                else:
-                    result += text
-            return result
-        else:
-            return ''.join(text_run.get('text', '') for text_run in node['runs'])
-
-    return default
-
-def extract_formatted_text(node):
-    try:
-        result = []
-        runs = node['runs']
-        for run in runs:
-            url = default_multi_get(run, 'navigationEndpoint', 'urlEndpoint', 'url')
-            if url is not None:
-                run['url'] = remove_redirect(url)
-                run['text'] = run['url'] # youtube truncates the url text, we don't want that nonsense
-        return runs
-    except (KeyError, TypeError):
-        traceback.print_exc()
-        pass
-
-    try:
-        return [{'text': node['simpleText']}]
-    except (KeyError, TypeError):
-        pass
-
-    return []
-
-def extract_int(string):
-    if not isinstance(string, str):
-        return None
-    match = re.search(r'(\d+)', string.replace(',', ''))
-    if match is None:
-        return None
-    try:
-        return int(match.group(1))
-    except ValueError:
-        return None
-
 def extract_metadata_row_info(video_renderer_info):
     # extract category and music list
     info = {
@@ -944,7 +898,7 @@ def extract_watch_info_mobile(top_level):
     else:
         info['age_restricted'] = not family_safe
     info['allowed_countries'] = microformat.get('availableCountries', [])
-    info['published_date'] = microformat.get('publishDate')
+    info['time_published'] = microformat.get('publishDate')
 
     response = top_level.get('response', {})
@@ -962,15 +916,15 @@ def extract_watch_info_mobile(top_level):
     info['author'] = extract_str(default_multi_get(video_info, 'owner', 'slimOwnerRenderer', 'title'))
     info['author_id'] = default_multi_get(video_info, 'owner', 'slimOwnerRenderer', 'navigationEndpoint', 'browseEndpoint', 'browseId')
     info['title'] = extract_str(video_info.get('title'))
-    info['live'] = 'watching' in extract_str(video_info.get('expandedSubtitle'))
+    info['live'] = 'watching' in extract_str(video_info.get('expandedSubtitle'), default='')
     info['unlisted'] = False
     for badge in video_info.get('badges', []):
         if default_multi_get(badge, 'metadataBadgeRenderer', 'label') == 'Unlisted':
             info['unlisted'] = True
     info['like_count'] = None
     info['dislike_count'] = None
-    if not info['published_date']:
-        info['published_date'] = extract_date(extract_str(video_info.get('dateText', None)))
+    if not info['time_published']:
+        info['time_published'] = extract_date(extract_str(video_info.get('dateText', None)))
 
     for button in video_info.get('buttons', ()):
         button_renderer = button.get('slimMetadataToggleButtonRenderer', {})
@@ -1012,7 +966,7 @@ def extract_watch_info_mobile(top_level):
 
     # related videos
     related, _ = extract_items(response)
-    info['related_videos'] = [renderer_info(renderer) for renderer in related]
+    info['related_videos'] = [extract_item_info(renderer) for renderer in related]
 
     return info
@@ -1032,7 +986,7 @@ def extract_watch_info_desktop(top_level):
 
     info.update(extract_metadata_row_info(video_info))
     info['description'] = extract_str(video_info.get('description', None), recover_urls=True)
-    info['published_date'] = extract_date(extract_str(video_info.get('dateText', None)))
+    info['time_published'] = extract_date(extract_str(video_info.get('dateText', None)))
 
     likes_dislikes = default_multi_get(video_info, 'sentimentBar', 'sentimentBarRenderer', 'tooltip', default='').split('/')
     if len(likes_dislikes) == 2:
@@ -1048,7 +1002,7 @@ def extract_watch_info_desktop(top_level):
     info['view_count'] = extract_int(extract_str(default_multi_get(video_info, 'viewCount', 'videoViewCountRenderer', 'viewCount')))
 
     related = default_multi_get(top_level, 'response', 'contents', 'twoColumnWatchNextResults', 'secondaryResults', 'secondaryResults', 'results', default=[])
-    info['related_videos'] = [renderer_info(renderer) for renderer in related]
+    info['related_videos'] = [extract_item_info(renderer) for renderer in related]
 
     return info
@@ -1114,17 +1068,6 @@ def extract_playability_error(info, player_response, error_prefix=''):
     else:
         info['playability_error'] = error_prefix + 'Unknown playability error'
 
-def liberal_update(obj, key, value):
-    '''Updates obj[key] with value as long as value is not None.
-    Ensures obj[key] will at least get a value of None, however'''
-    if (value is not None) or (key not in obj):
-        obj[key] = value
-
-def conservative_update(obj, key, value):
-    '''Only updates obj if it doesn't have key or obj[key] is None'''
-    if obj.get(key) is None:
-        obj[key] = value
-
 SUBTITLE_FORMATS = ('srv1', 'srv2', 'srv3', 'ttml', 'vtt')
 
 def extract_watch_info(polymer_json):
     info = {'playability_error': None, 'error': None}
@@ -1223,8 +1166,8 @@ def extract_watch_info(polymer_json):
     conservative_update(info, 'author_id', mf.get('externalChannelId'))
     liberal_update(info, 'unlisted', mf.get('isUnlisted'))
     liberal_update(info, 'category', mf.get('category'))
-    liberal_update(info, 'published_date', mf.get('publishDate'))
-    liberal_update(info, 'uploaded_date', mf.get('uploadDate'))
+    liberal_update(info, 'time_published', mf.get('publishDate'))
+    liberal_update(info, 'time_uploaded', mf.get('uploadDate'))
 
     # other stuff
     info['author_url'] = 'https://www.youtube.com/channel/' + info['author_id'] if info['author_id'] else None
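
Note: contrast of the two update helpers used here, per their definitions earlier in the diff:

    info = {'unlisted': True, 'author_id': 'UC123'}
    liberal_update(info, 'unlisted', None)          # value is None and key exists: unchanged
    liberal_update(info, 'unlisted', False)         # non-None always wins: now False
    conservative_update(info, 'author_id', 'UCx')   # author_id already set: no-op
    conservative_update(info, 'category', 'Music')  # missing key: filled in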