Merge branch 'modular-data-extract'

Commits in this branch are prefixed with "Extraction:"
This branch refactors data extraction. All such functionality has been moved to the yt_data_extract module.
Responses from requests are handed to the module, which parses them into a consistent, more useful format.
The dependency on youtube-dl has also been dropped, and that functionality has been rebuilt from scratch, for these reasons:
(1) I've noticed that youtube-dl breaks more often than invidious (whose watch-page extraction is built from scratch) in response to changes from YouTube, so I'm hoping what I wrote will be similarly less brittle.
(2) Such breakage is inconvenient: because I had to modify youtube-dl to do things such as extracting related videos, I have to merge the upstream fixes manually.
(3) youtube-dl gives me no control over error handling and request pooling, since it performs all the requests itself; changing that would require intrusive modifications I don't want to maintain.
(4) I can now finally display the number of comments, and whether comments are disabled, without making additional requests.
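For illustration, the new contract looks like this (a minimal sketch based on the calls in this diff; the url and headers are whatever is appropriate for the page being fetched, and each extract_* function returns a plain dict whose 'error' key holds a human-readable string, or a falsy value on success):

polymer_json = json.loads(util.fetch_url(url, headers=headers).decode('utf-8'))
info = yt_data_extract.extract_channel_info(polymer_json, 'videos')
if info['error']:
    return flask.render_template('error.html', error_message = info['error'])
# otherwise use the normalized fields, e.g. info['channel_name'], info['items']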
James Taylor
2019-12-19 21:33:54 -08:00
61 changed files with 1753 additions and 32293 deletions

View File

@@ -23,3 +23,10 @@ def inject_theme_preference():
'theme_path': '/youtube.com/static/' + theme_names[settings.theme] + '.css',
}
@yt_app.template_filter('commatize')
def commatize(num):
if num is None:
return ''
if isinstance(num, str):
num = int(num)
return '{:,}'.format(num)
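For reference, the filter's behavior (values illustrative):

assert commatize(1234567) == '1,234,567'
assert commatize('1234') == '1,234'   # strings are coerced to int first
assert commatize(None) == ''          # absent counts render as empty text

Templates apply it as {{ view_count|commatize }}, as on the playlist page later in this diff.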

View File

@@ -137,132 +137,13 @@ def get_channel_search_json(channel_id, query, page):
return polymer_json
def extract_info(polymer_json, tab):
response = polymer_json[1]['response']
try:
microformat = response['microformat']['microformatDataRenderer']
# channel doesn't exist or was terminated
# example terminated channel: https://www.youtube.com/channel/UCnKJeK_r90jDdIuzHXC0Org
except KeyError:
if 'alerts' in response and len(response['alerts']) > 0:
result = ''
for alert in response['alerts']:
result += alert['alertRenderer']['text']['simpleText'] + '\n'
flask.abort(200, result)
elif 'errors' in response['responseContext']:
for error in response['responseContext']['errors']['error']:
if error['code'] == 'INVALID_VALUE' and error['location'] == 'browse_id':
flask.abort(404, 'This channel does not exist')
raise
info = {}
info['current_tab'] = tab
# stuff from microformat (info given by youtube for every page on channel)
info['short_description'] = microformat['description']
info['channel_name'] = microformat['title']
info['avatar'] = microformat['thumbnail']['thumbnails'][0]['url']
channel_url = microformat['urlCanonical'].rstrip('/')
channel_id = channel_url[channel_url.rfind('/')+1:]
info['channel_id'] = channel_id
info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id
info['items'] = []
# empty channel
if 'contents' not in response and 'continuationContents' not in response:
return info
# find the tab with content
# example channel where tabs do not have definite index: https://www.youtube.com/channel/UC4gQ8i3FD7YbhOgqUkeQEJg
# TODO: maybe use the 'selected' attribute for this?
if 'continuationContents' not in response:
tab_renderer = None
tab_content = None
for tab_json in response['contents']['twoColumnBrowseResultsRenderer']['tabs']:
try:
tab_renderer = tab_json['tabRenderer']
except KeyError:
tab_renderer = tab_json['expandableTabRenderer']
try:
tab_content = tab_renderer['content']
break
except KeyError:
pass
else: # didn't break
raise Exception("No tabs found with content")
assert tab == tab_renderer['title'].lower()
# extract tab-specific info
if tab in ('videos', 'playlists', 'search'): # find the list of items
if 'continuationContents' in response:
try:
items = response['continuationContents']['gridContinuation']['items']
except KeyError:
items = response['continuationContents']['sectionListContinuation']['contents'] # for search
else:
contents = tab_content['sectionListRenderer']['contents']
if 'itemSectionRenderer' in contents[0]:
item_section = contents[0]['itemSectionRenderer']['contents'][0]
try:
items = item_section['gridRenderer']['items']
except KeyError:
if "messageRenderer" in item_section:
items = []
else:
raise Exception('gridRenderer missing but messageRenderer not found')
else:
items = contents # for search
# TODO: Fix this URL prefixing shit
additional_info = {'author': info['channel_name'], 'author_url': '/channel/' + channel_id}
info['items'] = [yt_data_extract.renderer_info(renderer, additional_info) for renderer in items]
elif tab == 'about':
channel_metadata = tab_content['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents'][0]['channelAboutFullMetadataRenderer']
info['links'] = []
for link_json in channel_metadata.get('primaryLinks', ()):
url = link_json['navigationEndpoint']['urlEndpoint']['url']
if url.startswith('/redirect'): # youtube puts these on external links to do tracking
query_string = url[url.find('?')+1: ]
url = urllib.parse.parse_qs(query_string)['q'][0]
text = yt_data_extract.get_plain_text(link_json['title'])
info['links'].append( (text, url) )
info['stats'] = []
for stat_name in ('subscriberCountText', 'joinedDateText', 'viewCountText', 'country'):
try:
stat = channel_metadata[stat_name]
except KeyError:
continue
info['stats'].append(yt_data_extract.get_plain_text(stat))
if 'description' in channel_metadata:
info['description'] = yt_data_extract.get_text(channel_metadata['description'])
else:
info['description'] = ''
else:
raise NotImplementedError('Unknown or unsupported channel tab: ' + tab)
return info
def post_process_channel_info(info):
info['avatar'] = '/' + info['avatar']
info['channel_url'] = '/' + info['channel_url']
info['avatar'] = util.prefix_url(info['avatar'])
info['channel_url'] = util.prefix_url(info['channel_url'])
for item in info['items']:
yt_data_extract.prefix_urls(item)
yt_data_extract.add_extra_html_info(item)
util.prefix_urls(item)
util.add_extra_html_info(item)
@@ -304,7 +185,9 @@ def get_channel_page(channel_id, tab='videos'):
flask.abort(404, 'Unknown channel tab: ' + tab)
info = extract_info(json.loads(polymer_json), tab)
info = yt_data_extract.extract_channel_info(json.loads(polymer_json), tab)
if info['error']:
return flask.render_template('error.html', error_message = info['error'])
post_process_channel_info(info)
if tab in ('videos', 'search'):
info['number_of_videos'] = number_of_videos
@@ -344,7 +227,10 @@ def get_channel_page_general_url(base_url, tab, request):
flask.abort(404, 'Unknown channel tab: ' + tab)
info = extract_info(json.loads(polymer_json), tab)
info = yt_data_extract.extract_channel_info(json.loads(polymer_json), tab)
if info['error']:
return flask.render_template('error.html', error_message = info['error'])
post_process_channel_info(info)
if tab in ('videos', 'search'):
info['number_of_videos'] = 1000

View File

@@ -48,24 +48,6 @@ def comment_replies_ctoken(video_id, comment_id, max_results=500):
result = proto.nested(2, proto.string(2, video_id)) + proto.uint(3,6) + proto.nested(6, params)
return base64.urlsafe_b64encode(result).decode('ascii')
def ctoken_metadata(ctoken):
result = dict()
params = proto.parse(proto.b64_to_bytes(ctoken))
result['video_id'] = proto.parse(params[2])[2].decode('ascii')
offset_information = proto.parse(params[6])
result['offset'] = offset_information.get(5, 0)
result['is_replies'] = False
if (3 in offset_information) and (2 in proto.parse(offset_information[3])):
result['is_replies'] = True
result['sort'] = None
else:
try:
result['sort'] = proto.parse(offset_information[4])[6]
except KeyError:
result['sort'] = 0
return result
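A hypothetical round-trip with the encoders above (both ids are illustrative, and the expected fields assume the reply params place the comment id under field 3, matching the check in ctoken_metadata):

token = comment_replies_ctoken('dQw4w9WgXcQ', 'UgwRHpBmIVMqB3yHE9t4AaABAg')
meta = ctoken_metadata(token)
# expected: meta['video_id'] == 'dQw4w9WgXcQ', meta['is_replies'] == True,
# meta['sort'] is None, and meta['offset'] == 0 (replies carry no offset field)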
mobile_headers = {
@@ -91,7 +73,9 @@ def request_comments(ctoken, replies=False):
print("got <!DOCTYPE>, retrying")
continue
break
return content
polymer_json = json.loads(util.uppercase_escape(content.decode('utf-8')))
return polymer_json
def single_comment_ctoken(video_id, comment_id):
@@ -102,112 +86,40 @@ def single_comment_ctoken(video_id, comment_id):
def parse_comments_polymer(content):
try:
video_title = ''
content = json.loads(util.uppercase_escape(content.decode('utf-8')))
url = content[1]['url']
ctoken = urllib.parse.parse_qs(url[url.find('?')+1:])['ctoken'][0]
metadata = ctoken_metadata(ctoken)
try:
comments_raw = content[1]['response']['continuationContents']['commentSectionContinuation']['items']
except KeyError:
comments_raw = content[1]['response']['continuationContents']['commentRepliesContinuation']['contents']
ctoken = util.default_multi_get(content, 1, 'response', 'continuationContents', 'commentSectionContinuation', 'continuations', 0, 'nextContinuationData', 'continuation', default='')
comments = []
for comment_json in comments_raw:
number_of_replies = 0
try:
comment_thread = comment_json['commentThreadRenderer']
except KeyError:
comment_renderer = comment_json['commentRenderer']
else:
if 'commentTargetTitle' in comment_thread:
video_title = comment_thread['commentTargetTitle']['runs'][0]['text']
if 'replies' in comment_thread:
view_replies_text = yt_data_extract.get_plain_text(comment_thread['replies']['commentRepliesRenderer']['moreText'])
view_replies_text = view_replies_text.replace(',', '')
match = re.search(r'(\d+)', view_replies_text)
if match is None:
number_of_replies = 1
else:
number_of_replies = int(match.group(1))
comment_renderer = comment_thread['comment']['commentRenderer']
comment = {
'author_id': comment_renderer.get('authorId', ''),
'author_avatar': comment_renderer['authorThumbnail']['thumbnails'][0]['url'],
'likes': comment_renderer['likeCount'],
'published': yt_data_extract.get_plain_text(comment_renderer['publishedTimeText']),
'text': comment_renderer['contentText'].get('runs', ''),
'number_of_replies': number_of_replies,
'comment_id': comment_renderer['commentId'],
}
if 'authorText' in comment_renderer: # deleted channels have no name or channel link
comment['author'] = yt_data_extract.get_plain_text(comment_renderer['authorText'])
comment['author_url'] = comment_renderer['authorEndpoint']['commandMetadata']['webCommandMetadata']['url']
comment['author_channel_id'] = comment_renderer['authorEndpoint']['browseEndpoint']['browseId']
else:
comment['author'] = ''
comment['author_url'] = ''
comment['author_channel_id'] = ''
comments.append(comment)
except Exception as e:
print('Error parsing comments: ' + str(e))
comments = ()
ctoken = ''
return {
'ctoken': ctoken,
'comments': comments,
'video_title': video_title,
'video_id': metadata['video_id'],
'offset': metadata['offset'],
'is_replies': metadata['is_replies'],
'sort': metadata['sort'],
}
def post_process_comments_info(comments_info):
for comment in comments_info['comments']:
comment['author_url'] = util.URL_ORIGIN + comment['author_url']
comment['author_avatar'] = '/' + comment['author_avatar']
comment['permalink'] = util.URL_ORIGIN + '/watch?v=' + comments_info['video_id'] + '&lc=' + comment['comment_id']
comment['permalink'] = util.URL_ORIGIN + '/watch?v=' + comments_info['video_id'] + '&lc=' + comment['id']
if comment['author_channel_id'] in accounts.accounts:
if comment['author_id'] in accounts.accounts:
comment['delete_url'] = (util.URL_ORIGIN + '/delete_comment?video_id='
+ comments_info['video_id']
+ '&channel_id='+ comment['author_channel_id']
+ '&author_id=' + comment['author_id']
+ '&comment_id=' + comment['comment_id'])
+ '&channel_id='+ comment['author_id']
+ '&comment_id=' + comment['id'])
num_replies = comment['number_of_replies']
if num_replies == 0:
comment['replies_url'] = util.URL_ORIGIN + '/post_comment?parent_id=' + comment['comment_id'] + "&video_id=" + comments_info['video_id']
reply_count = comment['reply_count']
if reply_count == 0:
comment['replies_url'] = util.URL_ORIGIN + '/post_comment?parent_id=' + comment['id'] + "&video_id=" + comments_info['video_id']
else:
comment['replies_url'] = util.URL_ORIGIN + '/comments?parent_id=' + comment['comment_id'] + "&video_id=" + comments_info['video_id']
comment['replies_url'] = util.URL_ORIGIN + '/comments?parent_id=' + comment['id'] + "&video_id=" + comments_info['video_id']
if num_replies == 0:
if reply_count == 0:
comment['view_replies_text'] = 'Reply'
elif num_replies == 1:
elif reply_count == 1:
comment['view_replies_text'] = '1 reply'
else:
comment['view_replies_text'] = str(num_replies) + ' replies'
comment['view_replies_text'] = str(reply_count) + ' replies'
if comment['likes'] == 1:
if comment['like_count'] == 1:
comment['likes_text'] = '1 like'
else:
comment['likes_text'] = str(comment['likes']) + ' likes'
comment['likes_text'] = str(comment['like_count']) + ' likes'
comments_info['include_avatars'] = settings.enable_comment_avatars
if comments_info['ctoken'] != '':
if comments_info['ctoken']:
comments_info['more_comments_url'] = util.URL_ORIGIN + '/comments?ctoken=' + comments_info['ctoken']
comments_info['page_number'] = page_number = str(int(comments_info['offset']/20) + 1)
@@ -222,7 +134,7 @@ def post_process_comments_info(comments_info):
def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''):
if settings.comments_mode:
comments_info = parse_comments_polymer(request_comments(make_comment_ctoken(video_id, sort, offset, lc, secret_key)))
comments_info = yt_data_extract.extract_comments_info(request_comments(make_comment_ctoken(video_id, sort, offset, lc, secret_key)))
post_process_comments_info(comments_info)
post_comment_url = util.URL_ORIGIN + "/post_comment?video_id=" + video_id
@@ -247,7 +159,7 @@ def get_comments_page():
ctoken = comment_replies_ctoken(video_id, parent_id)
replies = True
comments_info = parse_comments_polymer(request_comments(ctoken, replies))
comments_info = yt_data_extract.extract_comments_info(request_comments(ctoken, replies))
post_process_comments_info(comments_info)
if not replies:
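The field renames applied throughout this diff, collected for reference (old comment dict key -> key produced by yt_data_extract):

# 'comment_id'        -> 'id'
# 'author_channel_id' -> 'author_id'
# 'likes'             -> 'like_count'
# 'number_of_replies' -> 'reply_count'
# 'published'         -> 'time_published'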

View File

@@ -57,7 +57,7 @@ def get_local_playlist_videos(name, offset=0, amount=50):
info['thumbnail'] = util.get_thumbnail_url(info['id'])
missing_thumbnails.append(info['id'])
info['type'] = 'video'
yt_data_extract.add_extra_html_info(info)
util.add_extra_html_info(info)
videos.append(info)
except json.decoder.JSONDecodeError:
if not video_json.strip() == '':

View File

@@ -89,28 +89,29 @@ def get_playlist_page():
)
gevent.joinall(tasks)
first_page_json, this_page_json = tasks[0].value, tasks[1].value
try: # first page
video_list = this_page_json['response']['contents']['singleColumnBrowseResultsRenderer']['tabs'][0]['tabRenderer']['content']['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents'][0]['playlistVideoListRenderer']['contents']
except KeyError: # other pages
video_list = this_page_json['response']['continuationContents']['playlistVideoListContinuation']['contents']
parsed_video_list = [yt_data_extract.parse_info_prepare_for_html(video_json) for video_json in video_list]
info = yt_data_extract.extract_playlist_info(this_page_json)
if info['error']:
return flask.render_template('error.html', error_message = info['error'])
if page != '1':
info['metadata'] = yt_data_extract.extract_playlist_metadata(first_page_json)
metadata = yt_data_extract.renderer_info(first_page_json['response']['header'])
yt_data_extract.prefix_urls(metadata)
util.prefix_urls(info['metadata'])
for item in info.get('items', ()):
util.prefix_urls(item)
util.add_extra_html_info(item)
if 'id' in item:
item['thumbnail'] = '/https://i.ytimg.com/vi/' + item['id'] + '/default.jpg'
if 'description' not in metadata:
metadata['description'] = ''
video_count = int(metadata['size'].replace(',', ''))
metadata['size'] += ' videos'
video_count = yt_data_extract.deep_get(info, 'metadata', 'video_count')
if video_count is None:
video_count = 40
return flask.render_template('playlist.html',
video_list = parsed_video_list,
video_list = info.get('items', []),
num_pages = math.ceil(video_count/20),
parameters_dictionary = request.args,
**metadata
**info['metadata']
).encode('utf-8')

View File

@@ -70,7 +70,7 @@ def _post_comment_reply(text, video_id, parent_comment_id, session_token, cookie
print("Comment posting code: " + code)
return code
def _delete_comment(video_id, comment_id, author_id, session_token, cookiejar):
def _delete_comment(video_id, comment_id, session_token, cookiejar):
headers = {
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1',
'Accept': '*/*',
@@ -79,7 +79,7 @@ def _delete_comment(video_id, comment_id, author_id, session_token, cookiejar):
'X-YouTube-Client-Version': '2.20180823',
'Content-Type': 'application/x-www-form-urlencoded',
}
action = proto.uint(1,6) + proto.string(3, comment_id) + proto.string(5, video_id) + proto.string(9, author_id)
action = proto.uint(1,6) + proto.string(3, comment_id) + proto.string(5, video_id)
action = proto.percent_b64encode(action).decode('ascii')
sej = json.dumps({"clickTrackingParams":"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=","commandMetadata":{"webCommandMetadata":{"url":"/service_ajax","sendPost":True}},"performCommentActionEndpoint":{"action":action}})
@@ -115,7 +115,7 @@ def delete_comment():
cookiejar = accounts.account_cookiejar(request.values['channel_id'])
token = get_session_token(video_id, cookiejar)
code = _delete_comment(video_id, request.values['comment_id'], request.values['author_id'], token, cookiejar)
code = _delete_comment(video_id, request.values['comment_id'], token, cookiejar)
if code == "SUCCESS":
return flask.redirect(util.URL_ORIGIN + '/comment_delete_success', 303)
@@ -147,7 +147,7 @@ def post_comment():
@yt_app.route('/delete_comment', methods=['GET'])
def get_delete_comment_page():
parameters = [(parameter_name, request.args[parameter_name]) for parameter_name in ('video_id', 'channel_id', 'author_id', 'comment_id')]
parameters = [(parameter_name, request.args[parameter_name]) for parameter_name in ('video_id', 'channel_id', 'comment_id')]
return flask.render_template('delete_comment.html', parameters = parameters)

View File

@@ -5,7 +5,6 @@ import settings
import json
import urllib
import base64
from math import ceil
import mimetypes
from flask import request
import flask
@@ -74,59 +73,34 @@ def get_search_page():
filters['time'] = int(request.args.get("time", "0"))
filters['type'] = int(request.args.get("type", "0"))
filters['duration'] = int(request.args.get("duration", "0"))
info = get_search_json(query, page, autocorrect, sort, filters)
estimated_results = int(info[1]['response']['estimatedResults'])
estimated_pages = ceil(estimated_results/20)
polymer_json = get_search_json(query, page, autocorrect, sort, filters)
# almost always is the first "section", but if there's an advertisement for a google product like Stadia or Home in the search results, then that becomes the first "section" and the search results are in the second. So just join all of them for resiliency
results = []
for section in info[1]['response']['contents']['twoColumnSearchResultsRenderer']['primaryContents']['sectionListRenderer']['contents']:
results += section['itemSectionRenderer']['contents']
search_info = yt_data_extract.extract_search_info(polymer_json)
if search_info['error']:
return flask.render_template('error.html', error_message = search_info['error'])
parsed_results = []
corrections = {'type': None}
for renderer in results:
type = list(renderer.keys())[0]
if type == 'shelfRenderer':
continue
if type == 'didYouMeanRenderer':
renderer = renderer[type]
corrected_query_string = request.args.to_dict(flat=False)
corrected_query_string['query'] = [renderer['correctedQueryEndpoint']['searchEndpoint']['query']]
corrected_query_url = util.URL_ORIGIN + '/search?' + urllib.parse.urlencode(corrected_query_string, doseq=True)
for extract_item_info in search_info['items']:
util.prefix_urls(extract_item_info)
util.add_extra_html_info(extract_item_info)
corrections = {
'type': 'did_you_mean',
'corrected_query': yt_data_extract.format_text_runs(renderer['correctedQuery']['runs']),
'corrected_query_url': corrected_query_url,
}
continue
if type == 'showingResultsForRenderer':
renderer = renderer[type]
no_autocorrect_query_string = request.args.to_dict(flat=False)
no_autocorrect_query_string['autocorrect'] = ['0']
no_autocorrect_query_url = util.URL_ORIGIN + '/search?' + urllib.parse.urlencode(no_autocorrect_query_string, doseq=True)
corrections = {
'type': 'showing_results_for',
'corrected_query': yt_data_extract.format_text_runs(renderer['correctedQuery']['runs']),
'original_query_url': no_autocorrect_query_url,
'original_query': renderer['originalQuery']['simpleText'],
}
continue
info = yt_data_extract.parse_info_prepare_for_html(renderer)
if info['type'] != 'unsupported':
parsed_results.append(info)
corrections = search_info['corrections']
if corrections['type'] == 'did_you_mean':
corrected_query_string = request.args.to_dict(flat=False)
corrected_query_string['query'] = [corrections['corrected_query']]
corrections['corrected_query_url'] = util.URL_ORIGIN + '/search?' + urllib.parse.urlencode(corrected_query_string, doseq=True)
elif corrections['type'] == 'showing_results_for':
no_autocorrect_query_string = request.args.to_dict(flat=False)
no_autocorrect_query_string['autocorrect'] = ['0']
no_autocorrect_query_url = util.URL_ORIGIN + '/search?' + urllib.parse.urlencode(no_autocorrect_query_string, doseq=True)
corrections['original_query_url'] = no_autocorrect_query_url
return flask.render_template('search.html',
header_playlist_names = local_playlist.get_playlist_names(),
query = query,
estimated_results = estimated_results,
estimated_pages = estimated_pages,
corrections = corrections,
results = parsed_results,
estimated_results = search_info['estimated_results'],
estimated_pages = search_info['estimated_pages'],
corrections = search_info['corrections'],
results = search_info['items'],
parameters_dictionary = request.args,
)
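For reference, the corrections dict shapes implied by the handling above (a sketch; the *_text keys are the text-run versions rendered by the search template later in this diff):

# corrections == {'type': None}                          when there is no correction
# corrections == {'type': 'did_you_mean',
#                 'corrected_query': <plain text>,       # used to build corrected_query_url here
#                 'corrected_query_text': <text runs>}   # rendered by the template
# corrections == {'type': 'showing_results_for',
#                 'corrected_query_text': <text runs>,
#                 'original_query_text': <plain text>}   # original_query_url is added here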

View File

@@ -172,7 +172,7 @@ def _get_videos(cursor, number_per_page, offset, tag = None):
'id': db_video[0],
'title': db_video[1],
'duration': db_video[2],
'published': exact_timestamp(db_video[3]) if db_video[4] else posix_to_dumbed_down(db_video[3]),
'time_published': exact_timestamp(db_video[3]) if db_video[4] else posix_to_dumbed_down(db_video[3]),
'author': db_video[5],
})
@@ -455,10 +455,17 @@ def _get_upstream_videos(channel_id):
print('Failed to read atoma feed for ' + channel_status_name)
traceback.print_exc()
videos = channel.extract_info(json.loads(channel_tab), 'videos')['items']
channel_info = yt_data_extract.extract_channel_info(json.loads(channel_tab), 'videos')
if channel_info['error']:
print('Error checking channel ' + channel_status_name + ': ' + channel_info['error'])
return
videos = channel_info['items']
for i, video_item in enumerate(videos):
if 'description' not in video_item:
if not video_item.get('description'):
video_item['description'] = ''
else:
video_item['description'] = ''.join(run.get('text', '') for run in video_item['description'])
if video_item['id'] in times_published:
video_item['time_published'] = times_published[video_item['id']]
@@ -466,7 +473,7 @@ def _get_upstream_videos(channel_id):
else:
video_item['is_time_published_exact'] = False
try:
video_item['time_published'] = youtube_timestamp_to_posix(video_item['published']) - i # subtract a few seconds off the videos so they will be in the right order
video_item['time_published'] = youtube_timestamp_to_posix(video_item['time_published']) - i # subtract a few seconds off the videos so they will be in the right order
except KeyError:
print(video_item)
@@ -759,7 +766,7 @@ def get_subscriptions_page():
video['thumbnail'] = util.URL_ORIGIN + '/data/subscription_thumbnails/' + video['id'] + '.jpg'
video['type'] = 'video'
video['item_size'] = 'small'
yt_data_extract.add_extra_html_info(video)
util.add_extra_html_info(video)
tags = _get_all_tags(cursor)

View File

@@ -12,11 +12,11 @@
<a class="author" href="{{ comment['author_url'] }}" title="{{ comment['author'] }}">{{ comment['author'] }}</a>
</address>
<a class="permalink" href="{{ comment['permalink'] }}" title="permalink">
<time datetime="">{{ comment['published'] }}</time>
<time datetime="">{{ comment['time_published'] }}</time>
</a>
<span class="text">{{ common_elements.text_runs(comment['text']) }}</span>
<span class="likes">{{ comment['likes_text'] if comment['likes'] else ''}}</span>
<span class="likes">{{ comment['likes_text'] if comment['like_count'] else ''}}</span>
<div class="bottom-row">
<a href="{{ comment['replies_url'] }}" class="replies">{{ comment['view_replies_text'] }}</a>
{% if 'delete_url' is in comment %}

View File

@@ -9,53 +9,59 @@
{{ text_run["text"] }}
{%- endif -%}
{%- endfor -%}
{%- else -%}
{%- elif runs -%}
{{ runs }}
{%- endif -%}
{% endmacro %}
{% macro item(info, description=false, horizontal=true, include_author=true) %}
{% macro item(info, description=false, horizontal=true, include_author=true, include_badges=true) %}
<div class="item-box {{ info['type'] + '-item-box' }} {{'horizontal-item-box' if horizontal else 'vertical-item-box'}} {{'has-description' if description else 'no-description'}}">
<div class="item {{ info['type'] + '-item' }}">
<a class="thumbnail-box" href="{{ info['url'] }}" title="{{ info['title'] }}">
<img class="thumbnail-img" src="{{ info['thumbnail'] }}">
{% if info['type'] != 'channel' %}
<div class="thumbnail-info">
<span>{{ info['size'] if info['type'] == 'playlist' else info['duration'] }}</span>
</div>
{% if info['error'] %}
{{ info['error'] }}
{% else %}
<div class="item {{ info['type'] + '-item' }}">
<a class="thumbnail-box" href="{{ info['url'] }}" title="{{ info['title'] }}">
<img class="thumbnail-img" src="{{ info['thumbnail'] }}">
{% if info['type'] != 'channel' %}
<div class="thumbnail-info">
<span>{{ (info['video_count']|string + ' videos') if info['type'] == 'playlist' else info['duration'] }}</span>
</div>
{% endif %}
</a>
<div class="title"><a class="title" href="{{ info['url'] }}" title="{{ info['title'] }}">{{ info['title'] }}</a></div>
{% if include_author %}
{% if info.get('author_url') %}
<address title="{{ info['author'] }}">By <a href="{{ info['author_url'] }}">{{ info['author'] }}</a></address>
{% else %}
<address title="{{ info['author'] }}"><b>{{ info['author'] }}</b></address>
{% endif %}
{% endif %}
</a>
<div class="title"><a class="title" href="{{ info['url'] }}" title="{{ info['title'] }}">{{ info['title'] }}</a></div>
<ul class="stats {{'vertical-stats' if horizontal and not description and include_author else 'horizontal-stats'}}">
{% if info['type'] == 'channel' %}
<li><span>{{ info['subscriber_count'] }} subscribers</span></li>
<li><span>{{ info['size'] }} videos</span></li>
{% else %}
{% if include_author %}
{% if 'author_url' is in(info) %}
<li><address title="{{ info['author'] }}">By <a href="{{ info['author_url'] }}">{{ info['author'] }}</a></address></li>
{% else %}
<li><address title="{{ info['author'] }}"><b>{{ info['author'] }}</b></address></li>
<ul class="stats {{'horizontal-stats' if horizontal else 'vertical-stats'}}">
{% if info['type'] == 'channel' %}
<li><span>{{ info['approx_subscriber_count'] }} subscribers</span></li>
<li><span>{{ info['video_count'] }} videos</span></li>
{% else %}
{% if info.get('approx_view_count') %}
<li><span class="views">{{ info['approx_view_count'] }} views</span></li>
{% endif %}
{% if info.get('time_published') %}
<li><time>{{ info['time_published'] }}</time></li>
{% endif %}
{% endif %}
{% if 'views' is in(info) %}
<li><span class="views">{{ info['views'] }}</span></li>
{% endif %}
{% if 'published' is in(info) %}
<li><time>{{ info['published'] }}</time></li>
{% endif %}
{% endif %}
</ul>
</ul>
{% if description %}
<span class="description">{{ text_runs(info.get('description', '')) }}</span>
{% if description %}
<span class="description">{{ text_runs(info.get('description', '')) }}</span>
{% endif %}
{% if include_badges %}
<span class="badges">{{ info['badges']|join(' | ') }}</span>
{% endif %}
</div>
{% if info['type'] == 'video' %}
<input class="item-checkbox" type="checkbox" name="video_info_list" value="{{ info['video_info'] }}" form="playlist-edit">
{% endif %}
<span class="badges">{{ info['badges']|join(' | ') }}</span>
</div>
{% if info['type'] == 'video' %}
<input class="item-checkbox" type="checkbox" name="video_info_list" value="{{ info['video_info'] }}" form="playlist-edit">
{% endif %}
</div>

View File

@@ -54,8 +54,9 @@
<h2 class="playlist-title">{{ title }}</h2>
<a class="playlist-author" href="{{ author_url }}">{{ author }}</a>
<div class="playlist-stats">
<div>{{ views }}</div>
<div>{{ size }}</div>
<div>{{ video_count|commatize }} videos</div>
<div>{{ view_count|commatize }} views</div>
<div>Last updated {{ time_published }}</div>
</div>
<div class="playlist-description">{{ common_elements.text_runs(description) }}</div>
</div>

View File

@@ -29,10 +29,10 @@
<div id="result-info">
<div id="number-of-results">Approximately {{ '{:,}'.format(estimated_results) }} results ({{ '{:,}'.format(estimated_pages) }} pages)</div>
{% if corrections['type'] == 'showing_results_for' %}
<div>Showing results for <a>{{ corrections['corrected_query']|safe }}</a></div>
<div>Search instead for <a href="{{ corrections['original_query_url'] }}">{{ corrections['original_query'] }}</a></div>
<div>Showing results for <a>{{ common_elements.text_runs(corrections['corrected_query_text']) }}</a></div>
<div>Search instead for <a href="{{ corrections['original_query_url'] }}">{{ corrections['original_query_text'] }}</a></div>
{% elif corrections['type'] == 'did_you_mean' %}
<div>Did you mean <a href="{{ corrections['corrected_query_url'] }}">{{ corrections['corrected_query']|safe }}</a></div>
<div>Did you mean <a href="{{ corrections['corrected_query_url'] }}">{{ common_elements.text_runs(corrections['corrected_query_text']) }}</a></div>
{% endif %}
</div>
<div class="item-list">

View File

@@ -14,6 +14,19 @@
text-decoration: underline;
}
.playability-error{
height: 360px;
width: 640px;
grid-column: 2;
background-color: var(--video-background-color);
text-align:center;
}
.playability-error span{
position: relative;
top: 50%;
transform: translate(-50%, -50%);
}
{% if theater_mode %}
video{
grid-column: 1 / span 5;
@@ -61,12 +74,21 @@
grid-column: 1 / span 2;
min-width: 0;
}
.video-info > .is-unlisted{
background-color: var(--interface-color);
.video-info > .labels{
justify-self:start;
padding-left:2px;
padding-right:2px;
list-style: none;
padding: 0px;
margin: 5px 0px;
}
.video-info > .labels:empty{
margin: 0px;
}
.labels > li{
display: inline;
margin-right:5px;
background-color: var(--interface-color);
padding: 2px 5px
}
.video-info > address{
grid-column: 1;
grid-row: 3;
@@ -143,9 +165,13 @@
.related-videos-inner{
padding-top: 10px;
display: grid;
grid-auto-rows: 94px;
grid-auto-rows: 90px;
grid-row-gap: 10px;
}
.thumbnail-box{ /* overides rule in shared.css */
height: 90px !important;
width: 120px !important;
}
/* Put related vids below videos when window is too small */
/* 1100px instead of 1080 because W3C is full of idiots who include scrollbar width */
@@ -187,38 +213,59 @@
.format-ext{
width: 60px;
}
.format-res{
width:90px;
.format-video-quality{
width: 140px;
}
.format-audio-quality{
width: 120px;
}
.format-file-size{
width: 80px;
}
.format-codecs{
width: 120px;
}
{% endblock style %}
{% block main %}
<video controls autofocus>
{% for video_source in video_sources %}
<source src="{{ video_source['src'] }}" type="{{ video_source['type'] }}">
{% endfor %}
{% if playability_error %}
<div class="playability-error"><span>{{ 'Error: ' + playability_error }}</span></div>
{% else %}
<video controls autofocus class="video">
{% for video_source in video_sources %}
<source src="{{ video_source['src'] }}" type="{{ video_source['type'] }}">
{% endfor %}
{% for source in subtitle_sources %}
{% if source['on'] %}
<track label="{{ source['label'] }}" src="{{ source['url'] }}" kind="subtitles" srclang="{{ source['srclang'] }}" default>
{% else %}
<track label="{{ source['label'] }}" src="{{ source['url'] }}" kind="subtitles" srclang="{{ source['srclang'] }}">
{% endif %}
{% endfor %}
{% for source in subtitle_sources %}
{% if source['on'] %}
<track label="{{ source['label'] }}" src="{{ source['url'] }}" kind="subtitles" srclang="{{ source['srclang'] }}" default>
{% else %}
<track label="{{ source['label'] }}" src="{{ source['url'] }}" kind="subtitles" srclang="{{ source['srclang'] }}">
{% endif %}
{% endfor %}
</video>
</video>
{% endif %}
<div class="video-info">
<h2 class="title">{{ title }}</h2>
{% if unlisted %}
<span class="is-unlisted">Unlisted</span>
{% endif %}
<ul class="labels">
{%- if unlisted -%}
<li class="is-unlisted">Unlisted</li>
{%- endif -%}
{%- if age_restricted -%}
<li class="age-restricted">Age-restricted</li>
{%- endif -%}
{%- if limited_state -%}
<li>Limited state</li>
{%- endif -%}
</ul>
<address>Uploaded by <a href="{{ uploader_channel_url }}">{{ uploader }}</a></address>
<span class="views">{{ views }} views</span>
<span class="views">{{ view_count }} views</span>
<time datetime="$upload_date">Published on {{ upload_date }}</time>
<span class="likes-dislikes">{{ likes }} likes {{ dislikes }} dislikes</span>
<time datetime="$upload_date">Published on {{ time_published }}</time>
<span class="likes-dislikes">{{ like_count }} likes {{ dislike_count }} dislikes</span>
<details class="download-dropdown">
<summary class="download-dropdown-label">Download</summary>
<ul class="download-dropdown-content">
@@ -227,8 +274,10 @@
<a class="download-link" href="{{ format['url'] }}">
<ol class="format-attributes">
<li class="format-ext">{{ format['ext'] }}</li>
<li class="format-res">{{ format['resolution'] }}</li>
<li class="format-note">{{ format['note'] }}</li>
<li class="format-video-quality">{{ format['video_quality'] }}</li>
<li class="format-audio-quality">{{ format['audio_quality'] }}</li>
<li class="format-file-size">{{ format['file_size'] }}</li>
<li class="format-codecs">{{ format['codecs'] }}</li>
</ol>
</a>
</li>
@@ -238,7 +287,7 @@
<input class="checkbox" name="video_info_list" value="{{ video_info }}" form="playlist-edit" type="checkbox">
<span class="description">{{ description }}</span>
<span class="description">{{ common_elements.text_runs(description) }}</span>
<div class="music-list">
{% if music_list.__len__() != 0 %}
<hr>
@@ -266,7 +315,7 @@
<summary>Related Videos</summary>
<nav class="related-videos-inner">
{% for info in related %}
{{ common_elements.item(info) }}
{{ common_elements.item(info, include_badges=false) }}
{% endfor %}
</nav>
</details>

View File

@@ -1,4 +1,5 @@
import settings
from youtube import yt_data_extract
import socks, sockshandler
import gzip
import brotli
@@ -6,6 +7,7 @@ import urllib.parse
import re
import time
import os
import json
import gevent
import gevent.queue
import gevent.lock
@@ -176,7 +178,7 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None, cookieja
return content, response
return content
mobile_user_agent = 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1'
mobile_user_agent = 'Mozilla/5.0 (Linux; Android 7.0; Redmi Note 4 Build/NRD90M) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Mobile Safari/537.36'
mobile_ua = (('User-Agent', mobile_user_agent),)
desktop_user_agent = 'Mozilla/5.0 (Windows NT 6.1; rv:52.0) Gecko/20100101 Firefox/52.0'
desktop_ua = (('User-Agent', desktop_user_agent),)
@@ -277,15 +279,6 @@ def video_id(url):
url_parts = urllib.parse.urlparse(url)
return urllib.parse.parse_qs(url_parts.query)['v'][0]
def default_multi_get(object, *keys, default):
''' Like dict.get(), but for nested dictionaries/sequences, supporting keys or indices. Last argument is the default value to use in case of any IndexErrors or KeyErrors '''
try:
for key in keys:
object = object[key]
return object
except (IndexError, KeyError):
return default
# default, sddefault, mqdefault, hqdefault, hq720
def get_thumbnail_url(video_id):
@@ -317,3 +310,52 @@ def uppercase_escape(s):
return re.sub(
r'\\U([0-9a-fA-F]{8})',
lambda m: chr(int(m.group(1), base=16)), s)
def prefix_url(url):
if url is None:
return None
url = url.lstrip('/') # some urls have // before them, which has a special meaning
return '/' + url
def left_remove(string, substring):
'''removes substring from the start of string, if present'''
if string.startswith(substring):
return string[len(substring):]
return string
def prefix_urls(item):
try:
item['thumbnail'] = prefix_url(item['thumbnail'])
except KeyError:
pass
try:
item['author_url'] = prefix_url(item['author_url'])
except KeyError:
pass
def add_extra_html_info(item):
if item['type'] == 'video':
item['url'] = (URL_ORIGIN + '/watch?v=' + item['id']) if item.get('id') else None
video_info = {}
for key in ('id', 'title', 'author', 'duration'):
try:
video_info[key] = item[key]
except KeyError:
video_info[key] = ''
item['video_info'] = json.dumps(video_info)
elif item['type'] == 'playlist':
item['url'] = (URL_ORIGIN + '/playlist?list=' + item['id']) if item.get('id') else None
elif item['type'] == 'channel':
item['url'] = (URL_ORIGIN + "/channel/" + item['id']) if item.get('id') else None
def parse_info_prepare_for_html(renderer, additional_info={}):
item = yt_data_extract.extract_item_info(renderer, additional_info)
prefix_urls(item)
add_extra_html_info(item)
return item
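Behavior sketches for the helpers above (paths illustrative):

assert prefix_url('//i.ytimg.com/vi/abc/default.jpg') == '/i.ytimg.com/vi/abc/default.jpg'
assert prefix_url('watch?v=abc') == '/watch?v=abc'
assert prefix_url(None) is None
assert left_remove('https://www.youtube.com/channel/xyz', 'https://www.youtube.com') == '/channel/xyz'
assert left_remove('abc', 'xyz') == 'abc'   # substring absent: unchanged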

View File

@@ -5,49 +5,20 @@ import settings
from flask import request
import flask
from youtube_dl.YoutubeDL import YoutubeDL
from youtube_dl.extractor.youtube import YoutubeError
import json
import html
import gevent
import os
import math
import traceback
import urllib
try:
with open(os.path.join(settings.data_dir, 'decrypt_function_cache.json'), 'r') as f:
decrypt_cache = json.loads(f.read())['decrypt_cache']
except FileNotFoundError:
decrypt_cache = {}
def get_related_items(info):
results = []
for item in info['related_vids']:
if 'list' in item: # playlist:
result = watch_page_related_playlist_info(item)
else:
result = watch_page_related_video_info(item)
yt_data_extract.prefix_urls(result)
yt_data_extract.add_extra_html_info(result)
results.append(result)
return results
# json of related items retrieved directly from the watch page has different names for everything
# converts these to standard names
def watch_page_related_video_info(item):
result = {key: item[key] for key in ('id', 'title', 'author')}
result['duration'] = util.seconds_to_timestamp(item['length_seconds'])
try:
result['views'] = item['short_view_count_text']
except KeyError:
result['views'] = ''
result['thumbnail'] = util.get_thumbnail_url(item['id'])
result['type'] = 'video'
return result
def watch_page_related_playlist_info(item):
return {
'size': item['playlist_length'] if item['playlist_length'] != "0" else "50+",
'title': item['playlist_title'],
'id': item['list'],
'first_video_id': item['video_id'],
'thumbnail': util.get_thumbnail_url(item['video_id']),
'type': 'playlist',
}
def get_video_sources(info):
video_sources = []
@@ -55,9 +26,10 @@ def get_video_sources(info):
max_resolution = 360
else:
max_resolution = settings.default_resolution
for format in info['formats']:
if format['acodec'] != 'none' and format['vcodec'] != 'none' and format['height'] <= max_resolution:
if not all(format[attr] for attr in ('height', 'width', 'ext', 'url')):
continue
if format['acodec'] and format['vcodec'] and format['height'] <= max_resolution:
video_sources.append({
'src': format['url'],
'type': 'video/' + format['ext'],
@@ -71,50 +43,108 @@ def get_video_sources(info):
return video_sources
def make_caption_src(info, lang, auto=False, trans_lang=None):
label = lang
if auto:
label += ' (Automatic)'
if trans_lang:
label += ' -> ' + trans_lang
return {
'url': '/' + yt_data_extract.get_caption_url(info, lang, 'vtt', auto, trans_lang),
'label': label,
'srclang': trans_lang[0:2] if trans_lang else lang[0:2],
'on': False,
}
def lang_in(lang, sequence):
'''Tests if the language is in sequence, with e.g. en and en-US considered the same'''
if lang is None:
return False
lang = lang[0:2]
return lang in (l[0:2] for l in sequence)
def lang_eq(lang1, lang2):
'''Tests if two iso 639-1 codes are equal, with en and en-US considered the same.
Just because the codes are equal does not mean the dialects are mutually intelligible, but this will have to do for now without a complex language model'''
if lang1 is None or lang2 is None:
return False
return lang1[0:2] == lang2[0:2]
def equiv_lang_in(lang, sequence):
'''Extracts a language in sequence which is equivalent to lang.
e.g. if lang is en, extracts en-GB from sequence.
Necessary because if only a specific variant like en-GB is available, can't ask Youtube for simply en. Need to get the available variant.'''
lang = lang[0:2]
for l in sequence:
if l[0:2] == lang:
return l
return None
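Quick checks for the language helpers (a sketch):

assert lang_eq('en', 'en-US')             # dialects compare equal
assert not lang_eq('en', None)
assert lang_in('en-GB', ['fr', 'en'])     # matched on the 2-letter prefix
assert equiv_lang_in('en', ['fr', 'en-GB']) == 'en-GB'
assert equiv_lang_in('de', ['fr', 'en-GB']) is None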
def get_subtitle_sources(info):
'''Returns these sources, ordered from least to most intelligible:
native_video_lang (Automatic)
foreign_langs (Manual)
native_video_lang (Automatic) -> pref_lang
foreign_langs (Manual) -> pref_lang
native_video_lang (Manual) -> pref_lang
pref_lang (Automatic)
pref_lang (Manual)'''
sources = []
default_found = False
default = None
for language, formats in info['subtitles'].items():
for format in formats:
if format['ext'] == 'vtt':
source = {
'url': '/' + format['url'],
'label': language,
'srclang': language,
pref_lang = settings.subtitles_language
native_video_lang = None
if info['automatic_caption_languages']:
native_video_lang = info['automatic_caption_languages'][0]
# set as on by default if this is the preferred language and a default-on subtitles mode is in settings
'on': language == settings.subtitles_language and settings.subtitles_mode > 0,
}
highest_fidelity_is_manual = False
if language == settings.subtitles_language:
default_found = True
default = source
else:
sources.append(source)
break
# Put it at the end to avoid browser bug when there are too many languages
# Sources are added in very specific order outlined above
# More intelligible sources are put further down to avoid browser bug when there are too many languages
# (in firefox, it is impossible to select a language near the top of the list because it is cut off)
if default_found:
sources.append(default)
try:
formats = info['automatic_captions'][settings.subtitles_language]
except KeyError:
pass
else:
for format in formats:
if format['ext'] == 'vtt':
sources.append({
'url': '/' + format['url'],
'label': settings.subtitles_language + ' - Automatic',
'srclang': settings.subtitles_language,
# native_video_lang (Automatic)
if native_video_lang and not lang_eq(native_video_lang, pref_lang):
sources.append(make_caption_src(info, native_video_lang, auto=True))
# set as on by default if this is the preferred language and a default-on subtitles mode is in settings
'on': settings.subtitles_mode == 2 and not default_found,
# foreign_langs (Manual)
for lang in info['manual_caption_languages']:
if not lang_eq(lang, pref_lang):
sources.append(make_caption_src(info, lang))
})
if (lang_in(pref_lang, info['translation_languages'])
and not lang_in(pref_lang, info['automatic_caption_languages'])
and not lang_in(pref_lang, info['manual_caption_languages'])):
# native_video_lang (Automatic) -> pref_lang
if native_video_lang and not lang_eq(pref_lang, native_video_lang):
sources.append(make_caption_src(info, native_video_lang, auto=True, trans_lang=pref_lang))
# foreign_langs (Manual) -> pref_lang
for lang in info['manual_caption_languages']:
if not lang_eq(lang, native_video_lang) and not lang_eq(lang, pref_lang):
sources.append(make_caption_src(info, lang, trans_lang=pref_lang))
# native_video_lang (Manual) -> pref_lang
if lang_in(native_video_lang, info['manual_caption_languages']):
sources.append(make_caption_src(info, native_video_lang, trans_lang=pref_lang))
# pref_lang (Automatic)
if lang_in(pref_lang, info['automatic_caption_languages']):
sources.append(make_caption_src(info, equiv_lang_in(pref_lang, info['automatic_caption_languages']), auto=True))
# pref_lang (Manual)
if lang_in(pref_lang, info['manual_caption_languages']):
sources.append(make_caption_src(info, equiv_lang_in(pref_lang, info['manual_caption_languages'])))
highest_fidelity_is_manual = True
if sources and sources[-1]['srclang'] == pref_lang:
# set as on by default since it's manual and a default-on subtitles mode is in settings
if highest_fidelity_is_manual and settings.subtitles_mode > 0:
sources[-1]['on'] = True
# set as on by default since settings indicate to set it as such even if it's not manual
elif settings.subtitles_mode == 2:
sources[-1]['on'] = True
if len(sources) == 0:
assert len(info['automatic_caption_languages']) == 0 and len(info['manual_caption_languages']) == 0
return sources
@@ -134,14 +164,111 @@ def get_ordered_music_list_attributes(music_list):
return ordered_attributes
def extract_info(downloader, *args, **kwargs):
def save_decrypt_cache():
try:
return downloader.extract_info(*args, **kwargs)
except YoutubeError as e:
return str(e)
f = open(os.path.join(settings.data_dir, 'decrypt_function_cache.json'), 'w')
except FileNotFoundError:
os.makedirs(settings.data_dir)
f = open(os.path.join(settings.data_dir, 'decrypt_function_cache.json'), 'w')
f.write(json.dumps({'version': 1, 'decrypt_cache':decrypt_cache}, indent=4, sort_keys=True))
f.close()
def decrypt_signatures(info):
'''return error string, or False if no errors'''
if not yt_data_extract.requires_decryption(info):
return False
if not info['player_name']:
return 'Could not find player name'
if not info['base_js']:
return 'Failed to find base.js'
player_name = info['player_name']
if player_name in decrypt_cache:
print('Using cached decryption function for: ' + player_name)
info['decryption_function'] = decrypt_cache[player_name]
else:
base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text='Fetched player ' + player_name)
base_js = base_js.decode('utf-8')
err = yt_data_extract.extract_decryption_function(info, base_js)
if err:
return err
decrypt_cache[player_name] = info['decryption_function']
save_decrypt_cache()
err = yt_data_extract.decrypt_signatures(info)
return err
headers = (
('Accept', '*/*'),
('Accept-Language', 'en-US,en;q=0.5'),
('X-YouTube-Client-Name', '2'),
('X-YouTube-Client-Version', '2.20180830'),
) + util.mobile_ua
def extract_info(video_id):
polymer_json = util.fetch_url('https://m.youtube.com/watch?v=' + video_id + '&pbj=1&bpctr=9999999999', headers=headers, debug_name='watch').decode('utf-8')
# TODO: Decide whether this should be done in yt_data_extract.extract_watch_info
try:
polymer_json = json.loads(polymer_json)
except json.decoder.JSONDecodeError:
traceback.print_exc()
return {'error': 'Failed to parse json response'}
info = yt_data_extract.extract_watch_info(polymer_json)
# age restriction bypass
if info['age_restricted']:
print('Fetching age restriction bypass page')
data = {
'video_id': video_id,
'eurl': 'https://youtube.googleapis.com/v/' + video_id,
}
url = 'https://www.youtube.com/get_video_info?' + urllib.parse.urlencode(data)
video_info_page = util.fetch_url(url, debug_name='get_video_info', report_text='Fetched age restriction bypass page').decode('utf-8')
yt_data_extract.update_with_age_restricted_info(info, video_info_page)
# signature decryption
decryption_error = decrypt_signatures(info)
if decryption_error:
decryption_error = 'Error decrypting url signatures: ' + decryption_error
info['playability_error'] = decryption_error
return info
def video_quality_string(format):
if format['vcodec']:
result = str(format['width'] or '?') + 'x' + str(format['height'] or '?')
if format['fps']:
result += ' ' + str(format['fps']) + 'fps'
return result
elif format['acodec']:
return 'audio only'
return '?'
def audio_quality_string(format):
if format['acodec']:
result = str(format['audio_bitrate'] or '?') + 'k'
if format['audio_sample_rate']:
result += ' ' + str(format['audio_sample_rate']) + ' Hz'
return result
elif format['vcodec']:
return 'video only'
return '?'
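Illustrative outputs, using a hypothetical format dict with the fields the code above reads (codec strings are made up):

fmt = {'vcodec': 'avc1.4d401f', 'acodec': 'mp4a.40.2', 'width': 1280,
       'height': 720, 'fps': 30, 'audio_bitrate': 128, 'audio_sample_rate': 44100}
assert video_quality_string(fmt) == '1280x720 30fps'
assert audio_quality_string(fmt) == '128k 44100 Hz'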
# from https://github.com/ytdl-org/youtube-dl/blob/master/youtube_dl/utils.py
def format_bytes(bytes):
if bytes is None:
return 'N/A'
if type(bytes) is str:
bytes = float(bytes)
if bytes == 0.0:
exponent = 0
else:
exponent = int(math.log(bytes, 1024.0))
suffix = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'][exponent]
converted = float(bytes) / float(1024 ** exponent)
return '%.2f%s' % (converted, suffix)
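Quick checks (a sketch):

assert format_bytes(None) == 'N/A'
assert format_bytes(0) == '0.00B'
assert format_bytes(2048) == '2.00KiB'
assert format_bytes('1048576') == '1.00MiB'   # strings are accepted too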
@yt_app.route('/watch')
@@ -152,38 +279,26 @@ def get_watch_page():
flask.abort(flask.Response('Incomplete video id (too short): ' + video_id))
lc = request.args.get('lc', '')
if settings.route_tor:
proxy = 'socks5://127.0.0.1:9150/'
else:
proxy = ''
yt_dl_downloader = YoutubeDL(params={'youtube_include_dash_manifest':False, 'proxy':proxy})
tasks = (
gevent.spawn(comments.video_comments, video_id, int(settings.default_comment_sorting), lc=lc ),
gevent.spawn(extract_info, yt_dl_downloader, "https://www.youtube.com/watch?v=" + video_id, download=False)
gevent.spawn(extract_info, video_id)
)
gevent.joinall(tasks)
comments_info, info = tasks[0].value, tasks[1].value
if isinstance(info, str): # youtube error
return flask.render_template('error.html', error_message = info)
if info['error']:
return flask.render_template('error.html', error_message = info['error'])
video_info = {
"duration": util.seconds_to_timestamp(info["duration"]),
"duration": util.seconds_to_timestamp(info["duration"] or 0),
"id": info['id'],
"title": info['title'],
"author": info['uploader'],
"author": info['author'],
}
upload_year = info["upload_date"][0:4]
upload_month = info["upload_date"][4:6]
upload_day = info["upload_date"][6:8]
upload_date = upload_month + "/" + upload_day + "/" + upload_year
if settings.related_videos_mode:
related_videos = get_related_items(info)
else:
related_videos = []
for item in info['related_videos']:
util.prefix_urls(item)
util.add_extra_html_info(item)
if settings.gather_googlevideo_domains:
with open(os.path.join(settings.data_dir, 'googlevideo-domains.txt'), 'a+', encoding='utf-8') as f:
@@ -195,31 +310,37 @@ def get_watch_page():
download_formats = []
for format in info['formats']:
if format['acodec'] and format['vcodec']:
codecs_string = format['acodec'] + ', ' + format['vcodec']
else:
codecs_string = format['acodec'] or format['vcodec'] or '?'
download_formats.append({
'url': format['url'],
'ext': format['ext'],
'resolution': yt_dl_downloader.format_resolution(format),
'note': yt_dl_downloader._format_note(format),
'ext': format['ext'] or '?',
'audio_quality': audio_quality_string(format),
'video_quality': video_quality_string(format),
'file_size': format_bytes(format['file_size']),
'codecs': codecs_string,
})
video_sources = get_video_sources(info)
video_height = video_sources[0]['height']
video_height = yt_data_extract.deep_get(video_sources, 0, 'height', default=360)
video_width = yt_data_extract.deep_get(video_sources, 0, 'width', default=640)
# 1 second per pixel, or the actual video width
theater_video_target_width = max(640, info['duration'], video_sources[0]['width'])
theater_video_target_width = max(640, info['duration'] or 0, video_width)
return flask.render_template('watch.html',
header_playlist_names = local_playlist.get_playlist_names(),
uploader_channel_url = '/' + info['uploader_url'],
upload_date = upload_date,
views = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("view_count", None)),
likes = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("like_count", None)),
dislikes = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("dislike_count", None)),
uploader_channel_url = ('/' + info['author_url']) if info['author_url'] else '',
time_published = info['time_published'],
view_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("view_count", None)),
like_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("like_count", None)),
dislike_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("dislike_count", None)),
download_formats = download_formats,
video_info = json.dumps(video_info),
video_sources = video_sources,
subtitle_sources = get_subtitle_sources(info),
related = related_videos,
related = info['related_videos'],
music_list = info['music_list'],
music_attributes = get_ordered_music_list_attributes(info['music_list']),
comments_info = comments_info,
@@ -232,9 +353,12 @@ def get_watch_page():
theater_video_target_width = theater_video_target_width,
title = info['title'],
uploader = info['uploader'],
uploader = info['author'],
description = info['description'],
unlisted = info['unlisted'],
limited_state = info['limited_state'],
age_restricted = info['age_restricted'],
playability_error = info['playability_error'],
)

View File

@@ -1,273 +0,0 @@
from youtube import util
import html
import json
# videos (all of type str):
# id
# title
# url
# author
# author_url
# thumbnail
# description
# published
# duration
# likes
# dislikes
# views
# playlist_index
# playlists:
# id
# title
# url
# author
# author_url
# thumbnail
# description
# updated
# size
# first_video_id
def get_plain_text(node):
try:
return node['simpleText']
except KeyError:
return ''.join(text_run['text'] for text_run in node['runs'])
def format_text_runs(runs):
if isinstance(runs, str):
return runs
result = ''
for text_run in runs:
if text_run.get("bold", False):
result += "<b>" + html.escape(text_run["text"]) + "</b>"
elif text_run.get('italics', False):
result += "<i>" + html.escape(text_run["text"]) + "</i>"
else:
result += html.escape(text_run["text"])
return result
def get_url(node):
try:
return node['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url']
except KeyError:
return node['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url']
def get_text(node):
if node == {}:
return ''
try:
return node['simpleText']
except KeyError:
pass
try:
return node['runs'][0]['text']
except IndexError: # empty text runs
return ''
except KeyError:
print(node)
raise
def get_formatted_text(node):
try:
return node['runs']
except KeyError:
return node['simpleText']
def get_badges(node):
badges = []
for badge_node in node:
badge = badge_node['metadataBadgeRenderer']['label']
badges.append(badge)
return badges
def get_thumbnail(node):
try:
return node['thumbnails'][0]['url'] # polymer format
except KeyError:
return node['url'] # ajax format
dispatch = {
# polymer format
'title': ('title', get_text),
'publishedTimeText': ('published', get_text),
'videoId': ('id', lambda node: node),
'descriptionSnippet': ('description', get_formatted_text),
'lengthText': ('duration', get_text),
'thumbnail': ('thumbnail', get_thumbnail),
'thumbnails': ('thumbnail', lambda node: node[0]['thumbnails'][0]['url']),
'viewCountText': ('views', get_text),
'numVideosText': ('size', lambda node: get_text(node).split(' ')[0]), # the format is "324 videos"
'videoCountText': ('size', get_text),
'playlistId': ('id', lambda node: node),
'descriptionText': ('description', get_formatted_text),
'subscriberCountText': ('subscriber_count', get_text),
'channelId': ('id', lambda node: node),
'badges': ('badges', get_badges),
# ajax format
'view_count_text': ('views', get_text),
'num_videos_text': ('size', lambda node: get_text(node).split(' ')[0]),
'owner_text': ('author', get_text),
'owner_endpoint': ('author_url', lambda node: node['url']),
'description': ('description', get_formatted_text),
'index': ('playlist_index', get_text),
'short_byline': ('author', get_text),
'length': ('duration', get_text),
'video_id': ('id', lambda node: node),
}
def ajax_info(item_json):
try:
info = {}
for key, node in item_json.items():
try:
simple_key, function = dispatch[key]
except KeyError:
continue
info[simple_key] = function(node)
return info
except KeyError:
print(item_json)
raise
def prefix_urls(item):
try:
item['thumbnail'] = '/' + item['thumbnail'].lstrip('/')
except KeyError:
pass
try:
item['author_url'] = util.URL_ORIGIN + item['author_url']
except KeyError:
pass
def add_extra_html_info(item):
if item['type'] == 'video':
item['url'] = util.URL_ORIGIN + '/watch?v=' + item['id']
video_info = {}
for key in ('id', 'title', 'author', 'duration'):
try:
video_info[key] = item[key]
except KeyError:
video_info[key] = ''
item['video_info'] = json.dumps(video_info)
elif item['type'] == 'playlist':
item['url'] = util.URL_ORIGIN + '/playlist?list=' + item['id']
elif item['type'] == 'channel':
item['url'] = util.URL_ORIGIN + "/channel/" + item['id']
def renderer_info(renderer, additional_info={}):
type = list(renderer.keys())[0]
renderer = renderer[type]
info = {}
if type == 'itemSectionRenderer':
return renderer_info(renderer['contents'][0], additional_info)
if type in ('movieRenderer', 'clarificationRenderer'):
info['type'] = 'unsupported'
return info
info.update(additional_info)
if type in ('compactVideoRenderer', 'videoRenderer', 'playlistVideoRenderer', 'gridVideoRenderer'):
info['type'] = 'video'
elif type in ('playlistRenderer', 'compactPlaylistRenderer', 'gridPlaylistRenderer',
'radioRenderer', 'compactRadioRenderer', 'gridRadioRenderer',
'showRenderer', 'compactShowRenderer', 'gridShowRenderer'):
info['type'] = 'playlist'
elif type == 'channelRenderer':
info['type'] = 'channel'
elif type == 'playlistHeaderRenderer':
info['type'] = 'playlist_metadata'
else:
info['type'] = 'unsupported'
return info
try:
if 'viewCountText' in renderer: # prefer this one as it contains all the digits
info['views'] = get_text(renderer['viewCountText'])
elif 'shortViewCountText' in renderer:
info['views'] = get_text(renderer['shortViewCountText'])
if 'ownerText' in renderer:
info['author'] = renderer['ownerText']['runs'][0]['text']
info['author_url'] = renderer['ownerText']['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url']
try:
overlays = renderer['thumbnailOverlays']
except KeyError:
pass
else:
for overlay in overlays:
if 'thumbnailOverlayTimeStatusRenderer' in overlay:
info['duration'] = get_text(overlay['thumbnailOverlayTimeStatusRenderer']['text'])
# show renderers don't have videoCountText
elif 'thumbnailOverlayBottomPanelRenderer' in overlay:
info['size'] = get_text(overlay['thumbnailOverlayBottomPanelRenderer']['text'])
# show renderers don't have playlistId, have to dig into the url to get it
try:
info['id'] = renderer['navigationEndpoint']['watchEndpoint']['playlistId']
except KeyError:
pass
for key, node in renderer.items():
if key in ('longBylineText', 'shortBylineText'):
info['author'] = get_text(node)
try:
info['author_url'] = get_url(node)
except KeyError:
pass
# show renderers don't have thumbnail key at top level, dig into thumbnailRenderer
elif key == 'thumbnailRenderer' and 'showCustomThumbnailRenderer' in node:
info['thumbnail'] = node['showCustomThumbnailRenderer']['thumbnail']['thumbnails'][0]['url']
else:
try:
simple_key, function = dispatch[key]
except KeyError:
continue
info[simple_key] = function(node)
if info['type'] == 'video' and 'duration' not in info:
info['duration'] = 'Live'
return info
except KeyError:
print(renderer)
raise
def parse_info_prepare_for_html(renderer, additional_info={}):
item = renderer_info(renderer, additional_info)
prefix_urls(item)
add_extra_html_info(item)
return item

View File

@@ -0,0 +1,11 @@
from .common import (get, multi_get, deep_get, multi_deep_get,
liberal_update, conservative_update, remove_redirect, normalize_url,
extract_str, extract_formatted_text, extract_int, extract_approx_int,
extract_date, extract_item_info, extract_items, extract_response)
from .everything_else import (extract_channel_info, extract_search_info,
extract_playlist_metadata, extract_playlist_info, extract_comments_info)
from .watch_extraction import (extract_watch_info, get_caption_url,
update_with_age_restricted_info, requires_decryption,
extract_decryption_function, decrypt_signatures)

View File

@@ -0,0 +1,415 @@
import re
import urllib.parse
import collections
def get(object, key, default=None, types=()):
'''Like dict.get(), but returns default if the result doesn't match one of the types.
Also works for indexing lists.'''
try:
result = object[key]
except (TypeError, IndexError, KeyError):
return default
if not types or isinstance(result, types):
return result
else:
return default
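# Illustrative calls (values made up for the example):
#   get({'a': 1}, 'a')                  -> 1
#   get({'a': 1}, 'b', default=0)       -> 0
#   get(['x', 'y'], 5)                  -> None  (out-of-range index)
#   get({'a': 'text'}, 'a', types=int)  -> None  (wrong type, default returned)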
def multi_get(object, *keys, default=None, types=()):
'''Like get, but try other keys if the first fails'''
for key in keys:
try:
result = object[key]
except (TypeError, IndexError, KeyError):
pass
else:
if not types or isinstance(result, types):
return result
else:
continue
return default
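# Illustrative call: try the polymer key first, then the ajax one
#   multi_get({'view_count_text': 5}, 'viewCountText', 'view_count_text') -> 5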
def deep_get(object, *keys, default=None, types=()):
    '''Like dict.get(), but for nested dictionaries/sequences, supporting keys or indices.
    default is the value returned in case of any IndexErrors, KeyErrors, or TypeErrors.
    If types is given and the result doesn't match one of those types, default is returned.'''
try:
for key in keys:
object = object[key]
except (TypeError, IndexError, KeyError):
return default
else:
if not types or isinstance(object, types):
return object
else:
return default
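# Illustrative calls:
#   deep_get({'a': {'b': [10, 20]}}, 'a', 'b', 1)        -> 20
#   deep_get({'a': {}}, 'a', 'b', default='missing')     -> 'missing'
#   deep_get({'a': {'b': 'str'}}, 'a', 'b', types=dict)  -> None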
def multi_deep_get(object, *key_sequences, default=None, types=()):
    '''Like deep_get, but tries each of the given key sequences in case one fails.
    Returns default if all of them fail. Each key sequence is a list/tuple of keys or indices.'''
for key_sequence in key_sequences:
_object = object
try:
for key in key_sequence:
_object = _object[key]
except (TypeError, IndexError, KeyError):
pass
else:
if not types or isinstance(_object, types):
return _object
else:
continue
return default
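# Illustrative call: try the polymer path first, then the flat one
#   multi_deep_get({'response': {'x': 1}}, [1, 'response'], ['response']) -> {'x': 1}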
def liberal_update(obj, key, value):
    '''Updates obj[key] with value unless value is None and the key is already present.
    This guarantees obj[key] exists afterwards, even if only with a value of None.'''
if (value is not None) or (key not in obj):
obj[key] = value
def conservative_update(obj, key, value):
'''Only updates obj if it doesn't have key or obj[key] is None'''
if obj.get(key) is None:
obj[key] = value
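# Illustrative contrast between the two update helpers:
#   obj = {'views': None}
#   liberal_update(obj, 'views', 100)        # obj['views'] becomes 100
#   liberal_update(obj, 'views', None)       # key already present, stays 100
#   conservative_update(obj, 'views', 50)    # existing value isn't None, stays 100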
def remove_redirect(url):
    if re.fullmatch(r'(((https?:)?//)?(www\.)?youtube\.com)?/redirect\?.*', url) is not None: # youtube puts these on external links to do tracking
query_string = url[url.find('?')+1: ]
return urllib.parse.parse_qs(query_string)['q'][0]
return url
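# Illustrative call (hypothetical target url):
#   remove_redirect('https://www.youtube.com/redirect?q=https%3A%2F%2Fexample.com')
#       -> 'https://example.com'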
youtube_url_re = re.compile(r'^(?:(?:(?:https?:)?//)?(?:www\.)?youtube\.com)?(/.*)$')
def normalize_url(url):
if url is None:
return None
match = youtube_url_re.fullmatch(url)
if match is None:
        raise Exception('Cannot normalize url: ' + url)
return 'https://www.youtube.com' + match.group(1)
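# Illustrative calls (hypothetical video id):
#   normalize_url('//www.youtube.com/watch?v=abcdefghijk') -> 'https://www.youtube.com/watch?v=abcdefghijk'
#   normalize_url('/watch?v=abcdefghijk')                  -> 'https://www.youtube.com/watch?v=abcdefghijk'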
def _recover_urls(runs):
for run in runs:
url = deep_get(run, 'navigationEndpoint', 'urlEndpoint', 'url')
text = run.get('text', '')
# second condition is necessary because youtube makes other things into urls, such as hashtags, which we want to keep as text
if url is not None and (text.startswith('http://') or text.startswith('https://')):
url = remove_redirect(url)
run['url'] = url
run['text'] = url # youtube truncates the url text, use actual url instead
def extract_str(node, default=None, recover_urls=False):
    '''default is the value returned if the extraction fails. If recover_urls is True, will attempt to fix Youtube's truncation of url text (most prominently seen in descriptions).'''
if isinstance(node, str):
return node
try:
return node['simpleText']
except (KeyError, TypeError):
pass
if isinstance(node, dict) and 'runs' in node:
if recover_urls:
_recover_urls(node['runs'])
return ''.join(text_run.get('text', '') for text_run in node['runs'])
return default
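# Illustrative calls on the two text formats youtube uses:
#   extract_str({'simpleText': '1,234 views'})             -> '1,234 views'
#   extract_str({'runs': [{'text': 'a'}, {'text': 'b'}]})  -> 'ab'
#   extract_str(None, default='')                          -> ''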
def extract_formatted_text(node):
if not node:
return []
if 'runs' in node:
_recover_urls(node['runs'])
return node['runs']
elif 'simpleText' in node:
return [{'text': node['simpleText']}]
return []
def extract_int(string, default=None):
if isinstance(string, int):
return string
if not isinstance(string, str):
string = extract_str(string)
if not string:
return default
match = re.search(r'(\d+)', string.replace(',', ''))
if match is None:
return default
try:
return int(match.group(1))
except ValueError:
return default
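# Illustrative calls:
#   extract_int('1,234 views')                  -> 1234
#   extract_int({'simpleText': '1,234 views'})  -> 1234  (text nodes go through extract_str)
#   extract_int('no digits', default=0)         -> 0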
def extract_approx_int(string):
'''e.g. "15M" from "15M subscribers"'''
if not isinstance(string, str):
string = extract_str(string)
if not string:
return None
match = re.search(r'(\d+[KMBTkmbt])', string.replace(',', ''))
if match is None:
return None
return match.group(1)
def extract_date(date_text):
'''Input: "Mar 9, 2019". Output: "2019-3-9"'''
if date_text is None:
return None
date_text = date_text.replace(',', '').lower()
parts = date_text.split()
if len(parts) >= 3:
month, day, year = parts[-3:]
month = month_abbreviations.get(month[0:3]) # slicing in case they start writing out the full month name
if month and (re.fullmatch(r'\d\d?', day) is not None) and (re.fullmatch(r'\d{4}', year) is not None):
return year + '-' + month + '-' + day
def check_missing_keys(object, *key_sequences):
for key_sequence in key_sequences:
_object = object
try:
for key in key_sequence:
_object = _object[key]
except (KeyError, IndexError, TypeError):
return 'Could not find ' + key
return None
def extract_item_info(item, additional_info={}):
if not item:
return {'error': 'No item given'}
type = get(list(item.keys()), 0)
if not type:
return {'error': 'Could not find type'}
item = item[type]
info = {'error': None}
if type in ('itemSectionRenderer', 'compactAutoplayRenderer'):
return extract_item_info(deep_get(item, 'contents', 0), additional_info)
if type in ('movieRenderer', 'clarificationRenderer'):
info['type'] = 'unsupported'
return info
info.update(additional_info)
# type looks like e.g. 'compactVideoRenderer' or 'gridVideoRenderer'
# camelCase split, https://stackoverflow.com/a/37697078
type_parts = [s.lower() for s in re.sub(r'([A-Z][a-z]+)', r' \1', type).split()]
if len(type_parts) < 2:
info['type'] = 'unsupported'
        return info
primary_type = type_parts[-2]
if primary_type == 'video':
info['type'] = 'video'
elif primary_type in ('playlist', 'radio', 'show'):
info['type'] = 'playlist'
elif primary_type == 'channel':
info['type'] = 'channel'
else:
info['type'] = 'unsupported'
info['title'] = extract_str(item.get('title'))
info['author'] = extract_str(multi_get(item, 'longBylineText', 'shortBylineText', 'ownerText'))
info['author_id'] = extract_str(multi_deep_get(item,
['longBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
['shortBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId']
))
info['author_url'] = ('https://www.youtube.com/channel/' + info['author_id']) if info['author_id'] else None
info['description'] = extract_formatted_text(multi_get(item, 'descriptionSnippet', 'descriptionText'))
info['thumbnail'] = multi_deep_get(item,
['thumbnail', 'thumbnails', 0, 'url'], # videos
['thumbnails', 0, 'thumbnails', 0, 'url'], # playlists
['thumbnailRenderer', 'showCustomThumbnailRenderer', 'thumbnail', 'thumbnails', 0, 'url'], # shows
)
info['badges'] = []
for badge_node in multi_get(item, 'badges', 'ownerBadges', default=()):
badge = deep_get(badge_node, 'metadataBadgeRenderer', 'label')
if badge:
info['badges'].append(badge)
if primary_type in ('video', 'playlist'):
info['time_published'] = extract_str(item.get('publishedTimeText'))
if primary_type == 'video':
info['id'] = item.get('videoId')
info['view_count'] = extract_int(item.get('viewCountText'))
# dig into accessibility data to get view_count for videos marked as recommended, and to get time_published
accessibility_label = deep_get(item, 'title', 'accessibility', 'accessibilityData', 'label', default='')
timestamp = re.search(r'(\d+ \w+ ago)', accessibility_label)
if timestamp:
conservative_update(info, 'time_published', timestamp.group(1))
view_count = re.search(r'(\d+) views', accessibility_label.replace(',', ''))
if view_count:
conservative_update(info, 'view_count', int(view_count.group(1)))
if info['view_count']:
info['approx_view_count'] = '{:,}'.format(info['view_count'])
else:
info['approx_view_count'] = extract_approx_int(multi_get(item, 'shortViewCountText'))
info['duration'] = extract_str(item.get('lengthText'))
elif primary_type == 'playlist':
info['id'] = item.get('playlistId')
info['video_count'] = extract_int(item.get('videoCount'))
elif primary_type == 'channel':
info['id'] = item.get('channelId')
info['approx_subscriber_count'] = extract_approx_int(item.get('subscriberCountText'))
elif primary_type == 'show':
info['id'] = deep_get(item, 'navigationEndpoint', 'watchEndpoint', 'playlistId')
if primary_type in ('playlist', 'channel'):
conservative_update(info, 'video_count', extract_int(item.get('videoCountText')))
for overlay in item.get('thumbnailOverlays', []):
conservative_update(info, 'duration', extract_str(deep_get(
overlay, 'thumbnailOverlayTimeStatusRenderer', 'text'
)))
# show renderers don't have videoCountText
conservative_update(info, 'video_count', extract_int(deep_get(
overlay, 'thumbnailOverlayBottomPanelRenderer', 'text'
)))
return info
def extract_response(polymer_json):
'''return response, error'''
response = multi_deep_get(polymer_json, [1, 'response'], ['response'], default=None, types=dict)
if response is None:
return None, 'Failed to extract response'
else:
return response, None
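# Illustrative calls: accepts the polymer list form or a bare dict
#   extract_response([{}, {'response': {'contents': {}}}]) -> ({'contents': {}}, None)
#   extract_response({'response': {'contents': {}}})       -> ({'contents': {}}, None)
#   extract_response([])                                   -> (None, 'Failed to extract response')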
list_types = {
'sectionListRenderer',
'itemSectionRenderer',
'gridRenderer',
'playlistVideoListRenderer',
}
item_types = {
'movieRenderer',
'didYouMeanRenderer',
'showingResultsForRenderer',
'videoRenderer',
'compactVideoRenderer',
'compactAutoplayRenderer',
'gridVideoRenderer',
'playlistVideoRenderer',
'playlistRenderer',
'compactPlaylistRenderer',
'gridPlaylistRenderer',
'radioRenderer',
'compactRadioRenderer',
'gridRadioRenderer',
'showRenderer',
'compactShowRenderer',
'gridShowRenderer',
'channelRenderer',
'compactChannelRenderer',
'gridChannelRenderer',
'channelAboutFullMetadataRenderer',
}
def _traverse_browse_renderer(renderer):
for tab in get(renderer, 'tabs', (), types=(list, tuple)):
tab_renderer = multi_deep_get(tab, ['tabRenderer'], ['expandableTabRenderer'], default=None, types=dict)
if tab_renderer is None:
continue
if tab_renderer.get('selected', False):
return get(tab_renderer, 'content', {}, types=(dict))
print('Could not find tab with content')
return {}
def _traverse_standard_list(renderer):
renderer_list = multi_deep_get(renderer, ['contents'], ['items'], default=(), types=(list, tuple))
continuation = deep_get(renderer, 'continuations', 0, 'nextContinuationData', 'continuation')
return renderer_list, continuation
# these renderers contain one inside them
nested_renderer_dispatch = {
'singleColumnBrowseResultsRenderer': _traverse_browse_renderer,
'twoColumnBrowseResultsRenderer': _traverse_browse_renderer,
'twoColumnSearchResultsRenderer': lambda renderer: get(renderer, 'primaryContents', {}, types=dict),
}
# these renderers contain a list of renderers inside them
nested_renderer_list_dispatch = {
'sectionListRenderer': _traverse_standard_list,
'itemSectionRenderer': _traverse_standard_list,
'gridRenderer': _traverse_standard_list,
'playlistVideoListRenderer': _traverse_standard_list,
'singleColumnWatchNextResults': lambda r: (deep_get(r, 'results', 'results', 'contents', default=[], types=(list, tuple)), None),
}
def extract_items(response, item_types=item_types):
'''return items, ctoken'''
if 'continuationContents' in response:
# always has just the one [something]Continuation key, but do this just in case they add some tracking key or something
for key, renderer_continuation in get(response, 'continuationContents', {}, types=dict).items():
if key.endswith('Continuation'): # e.g. commentSectionContinuation, playlistVideoListContinuation
items = multi_deep_get(renderer_continuation, ['contents'], ['items'], default=[], types=(list, tuple))
ctoken = deep_get(renderer_continuation, 'continuations', 0, 'nextContinuationData', 'continuation', default=None, types=str)
return items, ctoken
return [], None
elif 'contents' in response:
ctoken = None
items = []
iter_stack = collections.deque()
current_iter = iter(())
renderer = get(response, 'contents', {}, types=dict)
while True:
# mode 1: dig into the current renderer
# Will stay in mode 1 (via continue) if a new renderer is found inside this one
# Otherwise, after finding that it is an item renderer,
# contains a list, or contains nothing,
# falls through into mode 2 to get a new renderer
if len(renderer) != 0:
key, value = list(renderer.items())[0]
# has a list in it, add it to the iter stack
if key in nested_renderer_list_dispatch:
renderer_list, continuation = nested_renderer_list_dispatch[key](value)
if renderer_list:
iter_stack.append(current_iter)
current_iter = iter(renderer_list)
if continuation:
ctoken = continuation
# new renderer nested inside this one
elif key in nested_renderer_dispatch:
renderer = nested_renderer_dispatch[key](value)
continue # back to mode 1
# the renderer is an item
elif key in item_types:
items.append(renderer)
# mode 2: get a new renderer by iterating.
# goes up the stack for an iterator if one has been exhausted
while current_iter is not None:
try:
                    renderer = next(current_iter)
break
except StopIteration:
try:
current_iter = iter_stack.pop() # go back up the stack
except IndexError:
return items, ctoken
else:
return [], None
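# Illustrative usage: pull only the video renderers out of a parsed response
#   response, err = extract_response(polymer_json)
#   items, ctoken = extract_items(response, item_types={'videoRenderer'})
# items is a list of single-key renderer dicts such as {'videoRenderer': {...}};
# ctoken is the continuation token for the next page, or None if there is none.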

View File

@@ -0,0 +1,273 @@
from .common import (get, multi_get, deep_get, multi_deep_get,
liberal_update, conservative_update, remove_redirect, normalize_url,
extract_str, extract_formatted_text, extract_int, extract_approx_int,
extract_date, check_missing_keys, extract_item_info, extract_items,
extract_response)
from youtube import proto
import re
import urllib
from math import ceil
def extract_channel_info(polymer_json, tab):
response, err = extract_response(polymer_json)
if err:
return {'error': err}
try:
microformat = response['microformat']['microformatDataRenderer']
# channel doesn't exist or was terminated
# example terminated channel: https://www.youtube.com/channel/UCnKJeK_r90jDdIuzHXC0Org
except KeyError:
if 'alerts' in response and len(response['alerts']) > 0:
return {'error': ' '.join(alert['alertRenderer']['text']['simpleText'] for alert in response['alerts']) }
elif 'errors' in response['responseContext']:
for error in response['responseContext']['errors']['error']:
if error['code'] == 'INVALID_VALUE' and error['location'] == 'browse_id':
return {'error': 'This channel does not exist'}
return {'error': 'Failure getting microformat'}
info = {'error': None}
info['current_tab'] = tab
# stuff from microformat (info given by youtube for every page on channel)
info['short_description'] = microformat['description']
info['channel_name'] = microformat['title']
info['avatar'] = microformat['thumbnail']['thumbnails'][0]['url']
channel_url = microformat['urlCanonical'].rstrip('/')
channel_id = channel_url[channel_url.rfind('/')+1:]
info['channel_id'] = channel_id
info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id
info['items'] = []
# empty channel
if 'contents' not in response and 'continuationContents' not in response:
return info
items, _ = extract_items(response)
if tab in ('videos', 'playlists', 'search'):
additional_info = {'author': info['channel_name'], 'author_url': 'https://www.youtube.com/channel/' + channel_id}
info['items'] = [extract_item_info(renderer, additional_info) for renderer in items]
elif tab == 'about':
for item in items:
try:
channel_metadata = item['channelAboutFullMetadataRenderer']
break
except KeyError:
pass
else:
info['error'] = 'Could not find channelAboutFullMetadataRenderer'
return info
info['links'] = []
for link_json in channel_metadata.get('primaryLinks', ()):
url = remove_redirect(link_json['navigationEndpoint']['urlEndpoint']['url'])
text = extract_str(link_json['title'])
info['links'].append( (text, url) )
info['stats'] = []
for stat_name in ('subscriberCountText', 'joinedDateText', 'viewCountText', 'country'):
try:
stat = channel_metadata[stat_name]
except KeyError:
continue
info['stats'].append(extract_str(stat))
if 'description' in channel_metadata:
info['description'] = extract_str(channel_metadata['description'])
else:
info['description'] = ''
else:
raise NotImplementedError('Unknown or unsupported channel tab: ' + tab)
return info
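# Illustrative usage (polymer_json as returned by a channel page request):
#   info = extract_channel_info(polymer_json, 'videos')
#   if info['error'] is None:
#       items = info['items']  # list of dicts produced by extract_item_info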
def extract_search_info(polymer_json):
response, err = extract_response(polymer_json)
if err:
return {'error': err}
info = {'error': None}
info['estimated_results'] = int(response['estimatedResults'])
info['estimated_pages'] = ceil(info['estimated_results']/20)
results, _ = extract_items(response)
info['items'] = []
info['corrections'] = {'type': None}
for renderer in results:
type = list(renderer.keys())[0]
if type == 'shelfRenderer':
continue
if type == 'didYouMeanRenderer':
renderer = renderer[type]
info['corrections'] = {
'type': 'did_you_mean',
'corrected_query': renderer['correctedQueryEndpoint']['searchEndpoint']['query'],
'corrected_query_text': renderer['correctedQuery']['runs'],
}
continue
if type == 'showingResultsForRenderer':
renderer = renderer[type]
info['corrections'] = {
'type': 'showing_results_for',
'corrected_query_text': renderer['correctedQuery']['runs'],
'original_query_text': renderer['originalQuery']['simpleText'],
}
continue
i_info = extract_item_info(renderer)
if i_info.get('type') != 'unsupported':
info['items'].append(i_info)
return info
def extract_playlist_metadata(polymer_json):
response, err = extract_response(polymer_json)
if err:
return {'error': err}
metadata = {'error': None}
header = deep_get(response, 'header', 'playlistHeaderRenderer', default={})
metadata['title'] = extract_str(header.get('title'))
metadata['first_video_id'] = deep_get(header, 'playEndpoint', 'watchEndpoint', 'videoId')
    first_id = re.search(r'([a-zA-Z0-9_\-]{11})', deep_get(header,
'thumbnail', 'thumbnails', 0, 'url', default=''))
if first_id:
conservative_update(metadata, 'first_video_id', first_id.group(1))
if metadata['first_video_id'] is None:
metadata['thumbnail'] = None
else:
metadata['thumbnail'] = 'https://i.ytimg.com/vi/' + metadata['first_video_id'] + '/mqdefault.jpg'
metadata['video_count'] = extract_int(header.get('numVideosText'))
metadata['description'] = extract_str(header.get('descriptionText'), default='')
metadata['author'] = extract_str(header.get('ownerText'))
metadata['author_id'] = multi_deep_get(header,
['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
['ownerEndpoint', 'browseEndpoint', 'browseId'])
if metadata['author_id']:
metadata['author_url'] = 'https://www.youtube.com/channel/' + metadata['author_id']
else:
metadata['author_url'] = None
metadata['view_count'] = extract_int(header.get('viewCountText'))
metadata['like_count'] = extract_int(header.get('likesCountWithoutLikeText'))
for stat in header.get('stats', ()):
text = extract_str(stat)
if 'videos' in text:
conservative_update(metadata, 'video_count', extract_int(text))
elif 'views' in text:
conservative_update(metadata, 'view_count', extract_int(text))
elif 'updated' in text:
metadata['time_published'] = extract_date(text)
return metadata
def extract_playlist_info(polymer_json):
response, err = extract_response(polymer_json)
if err:
return {'error': err}
info = {'error': None}
first_page = 'continuationContents' not in response
video_list, _ = extract_items(response)
info['items'] = [extract_item_info(renderer) for renderer in video_list]
if first_page:
info['metadata'] = extract_playlist_metadata(polymer_json)
return info
def _ctoken_metadata(ctoken):
result = dict()
params = proto.parse(proto.b64_to_bytes(ctoken))
result['video_id'] = proto.parse(params[2])[2].decode('ascii')
offset_information = proto.parse(params[6])
result['offset'] = offset_information.get(5, 0)
result['is_replies'] = False
if (3 in offset_information) and (2 in proto.parse(offset_information[3])):
result['is_replies'] = True
result['sort'] = None
else:
try:
result['sort'] = proto.parse(offset_information[4])[6]
except KeyError:
result['sort'] = 0
return result
def extract_comments_info(polymer_json):
response, err = extract_response(polymer_json)
if err:
return {'error': err}
info = {'error': None}
url = multi_deep_get(polymer_json, [1, 'url'], ['url'])
if url:
ctoken = urllib.parse.parse_qs(url[url.find('?')+1:])['ctoken'][0]
metadata = _ctoken_metadata(ctoken)
else:
metadata = {}
info['video_id'] = metadata.get('video_id')
info['offset'] = metadata.get('offset')
info['is_replies'] = metadata.get('is_replies')
info['sort'] = metadata.get('sort')
info['video_title'] = None
comments, ctoken = extract_items(response)
info['comments'] = []
info['ctoken'] = ctoken
for comment in comments:
comment_info = {}
if 'commentThreadRenderer' in comment: # top level comments
conservative_update(info, 'is_replies', False)
comment_thread = comment['commentThreadRenderer']
info['video_title'] = extract_str(comment_thread.get('commentTargetTitle'))
if 'replies' not in comment_thread:
comment_info['reply_count'] = 0
else:
comment_info['reply_count'] = extract_int(deep_get(comment_thread,
'replies', 'commentRepliesRenderer', 'moreText'
), default=1) # With 1 reply, the text reads "View reply"
comment_renderer = deep_get(comment_thread, 'comment', 'commentRenderer', default={})
elif 'commentRenderer' in comment: # replies
comment_info['reply_count'] = 0 # replyCount, below, not present for replies even if the reply has further replies to it
conservative_update(info, 'is_replies', True)
comment_renderer = comment['commentRenderer']
else:
comment_renderer = {}
        # These fields are sometimes absent, likely because the channel was deleted
comment_info['author'] = extract_str(comment_renderer.get('authorText'))
comment_info['author_url'] = deep_get(comment_renderer,
'authorEndpoint', 'commandMetadata', 'webCommandMetadata', 'url')
comment_info['author_id'] = deep_get(comment_renderer,
'authorEndpoint', 'browseEndpoint', 'browseId')
comment_info['author_avatar'] = deep_get(comment_renderer,
'authorThumbnail', 'thumbnails', 0, 'url')
comment_info['id'] = comment_renderer.get('commentId')
comment_info['text'] = extract_formatted_text(comment_renderer.get('contentText'))
comment_info['time_published'] = extract_str(comment_renderer.get('publishedTimeText'))
comment_info['like_count'] = comment_renderer.get('likeCount')
liberal_update(comment_info, 'reply_count', comment_renderer.get('replyCount'))
info['comments'].append(comment_info)
return info

View File

@@ -0,0 +1,545 @@
from .common import (get, multi_get, deep_get, multi_deep_get,
liberal_update, conservative_update, remove_redirect, normalize_url,
extract_str, extract_formatted_text, extract_int, extract_approx_int,
extract_date, check_missing_keys, extract_item_info, extract_items,
extract_response)
import json
import urllib.parse
import traceback
import re
# from https://github.com/ytdl-org/youtube-dl/blob/master/youtube_dl/extractor/youtube.py
_formats = {
'5': {'ext': 'flv', 'width': 400, 'height': 240, 'acodec': 'mp3', 'audio_bitrate': 64, 'vcodec': 'h263'},
'6': {'ext': 'flv', 'width': 450, 'height': 270, 'acodec': 'mp3', 'audio_bitrate': 64, 'vcodec': 'h263'},
'13': {'ext': '3gp', 'acodec': 'aac', 'vcodec': 'mp4v'},
'17': {'ext': '3gp', 'width': 176, 'height': 144, 'acodec': 'aac', 'audio_bitrate': 24, 'vcodec': 'mp4v'},
'18': {'ext': 'mp4', 'width': 640, 'height': 360, 'acodec': 'aac', 'audio_bitrate': 96, 'vcodec': 'h264'},
'22': {'ext': 'mp4', 'width': 1280, 'height': 720, 'acodec': 'aac', 'audio_bitrate': 192, 'vcodec': 'h264'},
'34': {'ext': 'flv', 'width': 640, 'height': 360, 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'},
'35': {'ext': 'flv', 'width': 854, 'height': 480, 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'},
# itag 36 videos are either 320x180 (BaW_jenozKc) or 320x240 (__2ABJjxzNo), audio_bitrate varies as well
'36': {'ext': '3gp', 'width': 320, 'acodec': 'aac', 'vcodec': 'mp4v'},
'37': {'ext': 'mp4', 'width': 1920, 'height': 1080, 'acodec': 'aac', 'audio_bitrate': 192, 'vcodec': 'h264'},
'38': {'ext': 'mp4', 'width': 4096, 'height': 3072, 'acodec': 'aac', 'audio_bitrate': 192, 'vcodec': 'h264'},
'43': {'ext': 'webm', 'width': 640, 'height': 360, 'acodec': 'vorbis', 'audio_bitrate': 128, 'vcodec': 'vp8'},
'44': {'ext': 'webm', 'width': 854, 'height': 480, 'acodec': 'vorbis', 'audio_bitrate': 128, 'vcodec': 'vp8'},
'45': {'ext': 'webm', 'width': 1280, 'height': 720, 'acodec': 'vorbis', 'audio_bitrate': 192, 'vcodec': 'vp8'},
'46': {'ext': 'webm', 'width': 1920, 'height': 1080, 'acodec': 'vorbis', 'audio_bitrate': 192, 'vcodec': 'vp8'},
'59': {'ext': 'mp4', 'width': 854, 'height': 480, 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'},
'78': {'ext': 'mp4', 'width': 854, 'height': 480, 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'},
# 3D videos
'82': {'ext': 'mp4', 'height': 360, 'format_note': '3D', 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'},
'83': {'ext': 'mp4', 'height': 480, 'format_note': '3D', 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'},
'84': {'ext': 'mp4', 'height': 720, 'format_note': '3D', 'acodec': 'aac', 'audio_bitrate': 192, 'vcodec': 'h264'},
'85': {'ext': 'mp4', 'height': 1080, 'format_note': '3D', 'acodec': 'aac', 'audio_bitrate': 192, 'vcodec': 'h264'},
'100': {'ext': 'webm', 'height': 360, 'format_note': '3D', 'acodec': 'vorbis', 'audio_bitrate': 128, 'vcodec': 'vp8'},
'101': {'ext': 'webm', 'height': 480, 'format_note': '3D', 'acodec': 'vorbis', 'audio_bitrate': 192, 'vcodec': 'vp8'},
'102': {'ext': 'webm', 'height': 720, 'format_note': '3D', 'acodec': 'vorbis', 'audio_bitrate': 192, 'vcodec': 'vp8'},
# Apple HTTP Live Streaming
'91': {'ext': 'mp4', 'height': 144, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 48, 'vcodec': 'h264'},
'92': {'ext': 'mp4', 'height': 240, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 48, 'vcodec': 'h264'},
'93': {'ext': 'mp4', 'height': 360, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'},
'94': {'ext': 'mp4', 'height': 480, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'},
'95': {'ext': 'mp4', 'height': 720, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 256, 'vcodec': 'h264'},
'96': {'ext': 'mp4', 'height': 1080, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 256, 'vcodec': 'h264'},
'132': {'ext': 'mp4', 'height': 240, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 48, 'vcodec': 'h264'},
'151': {'ext': 'mp4', 'height': 72, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 24, 'vcodec': 'h264'},
# DASH mp4 video
'133': {'ext': 'mp4', 'height': 240, 'format_note': 'DASH video', 'vcodec': 'h264'},
'134': {'ext': 'mp4', 'height': 360, 'format_note': 'DASH video', 'vcodec': 'h264'},
'135': {'ext': 'mp4', 'height': 480, 'format_note': 'DASH video', 'vcodec': 'h264'},
'136': {'ext': 'mp4', 'height': 720, 'format_note': 'DASH video', 'vcodec': 'h264'},
'137': {'ext': 'mp4', 'height': 1080, 'format_note': 'DASH video', 'vcodec': 'h264'},
'138': {'ext': 'mp4', 'format_note': 'DASH video', 'vcodec': 'h264'}, # Height can vary (https://github.com/ytdl-org/youtube-dl/issues/4559)
'160': {'ext': 'mp4', 'height': 144, 'format_note': 'DASH video', 'vcodec': 'h264'},
'212': {'ext': 'mp4', 'height': 480, 'format_note': 'DASH video', 'vcodec': 'h264'},
'264': {'ext': 'mp4', 'height': 1440, 'format_note': 'DASH video', 'vcodec': 'h264'},
'298': {'ext': 'mp4', 'height': 720, 'format_note': 'DASH video', 'vcodec': 'h264', 'fps': 60},
'299': {'ext': 'mp4', 'height': 1080, 'format_note': 'DASH video', 'vcodec': 'h264', 'fps': 60},
'266': {'ext': 'mp4', 'height': 2160, 'format_note': 'DASH video', 'vcodec': 'h264'},
# Dash mp4 audio
'139': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'aac', 'audio_bitrate': 48, 'container': 'm4a_dash'},
'140': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'aac', 'audio_bitrate': 128, 'container': 'm4a_dash'},
'141': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'aac', 'audio_bitrate': 256, 'container': 'm4a_dash'},
'256': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'aac', 'container': 'm4a_dash'},
'258': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'aac', 'container': 'm4a_dash'},
'325': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'dtse', 'container': 'm4a_dash'},
'328': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'ec-3', 'container': 'm4a_dash'},
# Dash webm
'167': {'ext': 'webm', 'height': 360, 'width': 640, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp8'},
'168': {'ext': 'webm', 'height': 480, 'width': 854, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp8'},
'169': {'ext': 'webm', 'height': 720, 'width': 1280, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp8'},
'170': {'ext': 'webm', 'height': 1080, 'width': 1920, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp8'},
'218': {'ext': 'webm', 'height': 480, 'width': 854, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp8'},
'219': {'ext': 'webm', 'height': 480, 'width': 854, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp8'},
'278': {'ext': 'webm', 'height': 144, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp9'},
'242': {'ext': 'webm', 'height': 240, 'format_note': 'DASH video', 'vcodec': 'vp9'},
'243': {'ext': 'webm', 'height': 360, 'format_note': 'DASH video', 'vcodec': 'vp9'},
'244': {'ext': 'webm', 'height': 480, 'format_note': 'DASH video', 'vcodec': 'vp9'},
'245': {'ext': 'webm', 'height': 480, 'format_note': 'DASH video', 'vcodec': 'vp9'},
'246': {'ext': 'webm', 'height': 480, 'format_note': 'DASH video', 'vcodec': 'vp9'},
'247': {'ext': 'webm', 'height': 720, 'format_note': 'DASH video', 'vcodec': 'vp9'},
'248': {'ext': 'webm', 'height': 1080, 'format_note': 'DASH video', 'vcodec': 'vp9'},
'271': {'ext': 'webm', 'height': 1440, 'format_note': 'DASH video', 'vcodec': 'vp9'},
# itag 272 videos are either 3840x2160 (e.g. RtoitU2A-3E) or 7680x4320 (sLprVF6d7Ug)
'272': {'ext': 'webm', 'height': 2160, 'format_note': 'DASH video', 'vcodec': 'vp9'},
'302': {'ext': 'webm', 'height': 720, 'format_note': 'DASH video', 'vcodec': 'vp9', 'fps': 60},
'303': {'ext': 'webm', 'height': 1080, 'format_note': 'DASH video', 'vcodec': 'vp9', 'fps': 60},
'308': {'ext': 'webm', 'height': 1440, 'format_note': 'DASH video', 'vcodec': 'vp9', 'fps': 60},
'313': {'ext': 'webm', 'height': 2160, 'format_note': 'DASH video', 'vcodec': 'vp9'},
'315': {'ext': 'webm', 'height': 2160, 'format_note': 'DASH video', 'vcodec': 'vp9', 'fps': 60},
# Dash webm audio
'171': {'ext': 'webm', 'acodec': 'vorbis', 'format_note': 'DASH audio', 'audio_bitrate': 128},
'172': {'ext': 'webm', 'acodec': 'vorbis', 'format_note': 'DASH audio', 'audio_bitrate': 256},
# Dash webm audio with opus inside
'249': {'ext': 'webm', 'format_note': 'DASH audio', 'acodec': 'opus', 'audio_bitrate': 50},
'250': {'ext': 'webm', 'format_note': 'DASH audio', 'acodec': 'opus', 'audio_bitrate': 70},
'251': {'ext': 'webm', 'format_note': 'DASH audio', 'acodec': 'opus', 'audio_bitrate': 160},
# RTMP (unnamed)
'_rtmp': {'protocol': 'rtmp'},
# av01 video only formats sometimes served with "unknown" codecs
'394': {'vcodec': 'av01.0.05M.08'},
'395': {'vcodec': 'av01.0.05M.08'},
'396': {'vcodec': 'av01.0.05M.08'},
'397': {'vcodec': 'av01.0.05M.08'},
}
def _extract_metadata_row_info(video_renderer_info):
# extract category and music list
info = {
'category': None,
'music_list': [],
}
current_song = {}
for row in deep_get(video_renderer_info, 'metadataRowContainer', 'metadataRowContainerRenderer', 'rows', default=[]):
row_title = extract_str(deep_get(row, 'metadataRowRenderer', 'title'), default='')
row_content = extract_str(deep_get(row, 'metadataRowRenderer', 'contents', 0))
if row_title == 'Category':
info['category'] = row_content
elif row_title in ('Song', 'Music'):
if current_song:
info['music_list'].append(current_song)
current_song = {'title': row_content}
elif row_title == 'Artist':
current_song['artist'] = row_content
elif row_title == 'Album':
current_song['album'] = row_content
elif row_title == 'Writers':
current_song['writers'] = row_content
elif row_title.startswith('Licensed'):
current_song['licensor'] = row_content
if current_song:
info['music_list'].append(current_song)
return info
def _extract_watch_info_mobile(top_level):
info = {}
microformat = deep_get(top_level, 'playerResponse', 'microformat', 'playerMicroformatRenderer', default={})
family_safe = microformat.get('isFamilySafe')
if family_safe is None:
info['age_restricted'] = None
else:
info['age_restricted'] = not family_safe
info['allowed_countries'] = microformat.get('availableCountries', [])
info['time_published'] = microformat.get('publishDate')
response = top_level.get('response', {})
# video info from metadata renderers
items, _ = extract_items(response, item_types={'slimVideoMetadataRenderer'})
if items:
video_info = items[0]['slimVideoMetadataRenderer']
else:
print('Failed to extract video metadata')
video_info = {}
info.update(_extract_metadata_row_info(video_info))
info['description'] = extract_str(video_info.get('description'), recover_urls=True)
info['view_count'] = extract_int(extract_str(video_info.get('expandedSubtitle')))
info['author'] = extract_str(deep_get(video_info, 'owner', 'slimOwnerRenderer', 'title'))
info['author_id'] = deep_get(video_info, 'owner', 'slimOwnerRenderer', 'navigationEndpoint', 'browseEndpoint', 'browseId')
info['title'] = extract_str(video_info.get('title'))
info['live'] = 'watching' in extract_str(video_info.get('expandedSubtitle'), default='')
info['unlisted'] = False
for badge in video_info.get('badges', []):
if deep_get(badge, 'metadataBadgeRenderer', 'label') == 'Unlisted':
info['unlisted'] = True
info['like_count'] = None
info['dislike_count'] = None
if not info['time_published']:
info['time_published'] = extract_date(extract_str(video_info.get('dateText', None)))
for button in video_info.get('buttons', ()):
button_renderer = button.get('slimMetadataToggleButtonRenderer', {})
# all the digits can be found in the accessibility data
count = extract_int(deep_get(button_renderer, 'button', 'toggleButtonRenderer', 'defaultText', 'accessibility', 'accessibilityData', 'label'))
# this count doesn't have all the digits, it's like 53K for instance
dumb_count = extract_int(extract_str(deep_get(button_renderer, 'button', 'toggleButtonRenderer', 'defaultText')))
# the accessibility text will be "No likes" or "No dislikes" or something like that, but dumb count will be 0
if dumb_count == 0:
count = 0
if 'isLike' in button_renderer:
info['like_count'] = count
elif 'isDislike' in button_renderer:
info['dislike_count'] = count
# comment section info
items, _ = extract_items(response, item_types={'commentSectionRenderer'})
if items:
comment_info = items[0]['commentSectionRenderer']
comment_count_text = extract_str(deep_get(comment_info, 'header', 'commentSectionHeaderRenderer', 'countText'))
if comment_count_text == 'Comments': # just this with no number, means 0 comments
info['comment_count'] = 0
else:
info['comment_count'] = extract_int(comment_count_text)
info['comments_disabled'] = False
else: # no comment section present means comments are disabled
info['comment_count'] = 0
info['comments_disabled'] = True
# check for limited state
items, _ = extract_items(response, item_types={'limitedStateMessageRenderer'})
if items:
info['limited_state'] = True
else:
info['limited_state'] = False
# related videos
related, _ = extract_items(response)
info['related_videos'] = [extract_item_info(renderer) for renderer in related]
return info
month_abbreviations = {'jan':'1', 'feb':'2', 'mar':'3', 'apr':'4', 'may':'5', 'jun':'6', 'jul':'7', 'aug':'8', 'sep':'9', 'oct':'10', 'nov':'11', 'dec':'12'}
def _extract_watch_info_desktop(top_level):
info = {
'comment_count': None,
'comments_disabled': None,
'allowed_countries': None,
'limited_state': None,
}
video_info = {}
for renderer in deep_get(top_level, 'response', 'contents', 'twoColumnWatchNextResults', 'results', 'results', 'contents', default=()):
if renderer and list(renderer.keys())[0] in ('videoPrimaryInfoRenderer', 'videoSecondaryInfoRenderer'):
video_info.update(list(renderer.values())[0])
info.update(_extract_metadata_row_info(video_info))
info['description'] = extract_str(video_info.get('description', None), recover_urls=True)
info['time_published'] = extract_date(extract_str(video_info.get('dateText', None)))
likes_dislikes = deep_get(video_info, 'sentimentBar', 'sentimentBarRenderer', 'tooltip', default='').split('/')
if len(likes_dislikes) == 2:
info['like_count'] = extract_int(likes_dislikes[0])
info['dislike_count'] = extract_int(likes_dislikes[1])
else:
info['like_count'] = None
info['dislike_count'] = None
info['title'] = extract_str(video_info.get('title', None))
info['author'] = extract_str(deep_get(video_info, 'owner', 'videoOwnerRenderer', 'title'))
info['author_id'] = deep_get(video_info, 'owner', 'videoOwnerRenderer', 'navigationEndpoint', 'browseEndpoint', 'browseId')
info['view_count'] = extract_int(extract_str(deep_get(video_info, 'viewCount', 'videoViewCountRenderer', 'viewCount')))
related = deep_get(top_level, 'response', 'contents', 'twoColumnWatchNextResults', 'secondaryResults', 'secondaryResults', 'results', default=[])
info['related_videos'] = [extract_item_info(renderer) for renderer in related]
return info
def _extract_formats(info, player_response):
streaming_data = player_response.get('streamingData', {})
yt_formats = streaming_data.get('formats', []) + streaming_data.get('adaptiveFormats', [])
info['formats'] = []
for yt_fmt in yt_formats:
fmt = {}
fmt['ext'] = None
fmt['audio_bitrate'] = None
fmt['acodec'] = None
fmt['vcodec'] = None
fmt['width'] = yt_fmt.get('width')
fmt['height'] = yt_fmt.get('height')
fmt['file_size'] = yt_fmt.get('contentLength')
fmt['audio_sample_rate'] = yt_fmt.get('audioSampleRate')
fmt['fps'] = yt_fmt.get('fps')
cipher = dict(urllib.parse.parse_qsl(yt_fmt.get('cipher', '')))
if cipher:
fmt['url'] = cipher.get('url')
else:
fmt['url'] = yt_fmt.get('url')
fmt['s'] = cipher.get('s')
fmt['sp'] = cipher.get('sp')
fmt.update(_formats.get(str(yt_fmt.get('itag')), {}))
info['formats'].append(fmt)
def _extract_playability_error(info, player_response, error_prefix=''):
if info['formats']:
info['playability_status'] = None
info['playability_error'] = None
return
playability_status = deep_get(player_response, 'playabilityStatus', 'status', default=None)
info['playability_status'] = playability_status
playability_reason = extract_str(multi_deep_get(player_response,
['playabilityStatus', 'reason'],
['playabilityStatus', 'errorScreen', 'playerErrorMessageRenderer', 'reason'],
default='Could not find playability error')
)
if playability_status not in (None, 'OK'):
info['playability_error'] = error_prefix + playability_reason
else:
info['playability_error'] = error_prefix + 'Unknown playability error'
SUBTITLE_FORMATS = ('srv1', 'srv2', 'srv3', 'ttml', 'vtt')
def extract_watch_info(polymer_json):
info = {'playability_error': None, 'error': None}
if isinstance(polymer_json, dict):
top_level = polymer_json
elif isinstance(polymer_json, (list, tuple)):
top_level = {}
for page_part in polymer_json:
if not isinstance(page_part, dict):
return {'error': 'Invalid page part'}
top_level.update(page_part)
else:
return {'error': 'Invalid top level polymer data'}
error = check_missing_keys(top_level,
['player', 'args'],
['player', 'assets', 'js'],
['playerResponse'],
)
if error:
info['playability_error'] = error
player_args = deep_get(top_level, 'player', 'args', default={})
player_response = json.loads(player_args['player_response']) if 'player_response' in player_args else {}
# captions
info['automatic_caption_languages'] = []
info['manual_caption_languages'] = []
info['_manual_caption_language_names'] = {} # language name written in that language, needed in some cases to create the url
info['translation_languages'] = []
captions_info = player_response.get('captions', {})
info['_captions_base_url'] = normalize_url(deep_get(captions_info, 'playerCaptionsRenderer', 'baseUrl'))
for caption_track in deep_get(captions_info, 'playerCaptionsTracklistRenderer', 'captionTracks', default=()):
lang_code = caption_track.get('languageCode')
if not lang_code:
continue
if caption_track.get('kind') == 'asr':
info['automatic_caption_languages'].append(lang_code)
else:
info['manual_caption_languages'].append(lang_code)
base_url = caption_track.get('baseUrl', '')
lang_name = deep_get(urllib.parse.parse_qs(urllib.parse.urlparse(base_url).query), 'name', 0)
if lang_name:
info['_manual_caption_language_names'][lang_code] = lang_name
for translation_lang_info in deep_get(captions_info, 'playerCaptionsTracklistRenderer', 'translationLanguages', default=()):
lang_code = translation_lang_info.get('languageCode')
if lang_code:
info['translation_languages'].append(lang_code)
        if translation_lang_info.get('isTranslatable') is False:
print('WARNING: Found non-translatable caption language')
# formats
_extract_formats(info, player_response)
# playability errors
_extract_playability_error(info, player_response)
# check age-restriction
info['age_restricted'] = (info['playability_status'] == 'LOGIN_REQUIRED' and info['playability_error'] and ' age' in info['playability_error'])
# base_js (for decryption of signatures)
info['base_js'] = deep_get(top_level, 'player', 'assets', 'js')
if info['base_js']:
info['base_js'] = normalize_url(info['base_js'])
info['player_name'] = get(info['base_js'].split('/'), -2)
else:
info['player_name'] = None
# extract stuff from visible parts of page
mobile = 'singleColumnWatchNextResults' in deep_get(top_level, 'response', 'contents', default={})
if mobile:
info.update(_extract_watch_info_mobile(top_level))
else:
info.update(_extract_watch_info_desktop(top_level))
# stuff from videoDetails. Use liberal_update to prioritize info from videoDetails over existing info
vd = deep_get(top_level, 'playerResponse', 'videoDetails', default={})
liberal_update(info, 'title', extract_str(vd.get('title')))
liberal_update(info, 'duration', extract_int(vd.get('lengthSeconds')))
liberal_update(info, 'view_count', extract_int(vd.get('viewCount')))
# videos with no description have a blank string
liberal_update(info, 'description', vd.get('shortDescription'))
liberal_update(info, 'id', vd.get('videoId'))
liberal_update(info, 'author', vd.get('author'))
liberal_update(info, 'author_id', vd.get('channelId'))
liberal_update(info, 'live', vd.get('isLiveContent'))
    conservative_update(info, 'unlisted', not vd.get('isCrawlable', True)) # isCrawlable is False on limited-state videos even if they aren't unlisted
liberal_update(info, 'tags', vd.get('keywords', []))
# fallback stuff from microformat
mf = deep_get(top_level, 'playerResponse', 'microformat', 'playerMicroformatRenderer', default={})
conservative_update(info, 'title', extract_str(mf.get('title')))
conservative_update(info, 'duration', extract_int(mf.get('lengthSeconds')))
# this gives the view count for limited state videos
conservative_update(info, 'view_count', extract_int(mf.get('viewCount')))
conservative_update(info, 'description', extract_str(mf.get('description'), recover_urls=True))
conservative_update(info, 'author', mf.get('ownerChannelName'))
conservative_update(info, 'author_id', mf.get('externalChannelId'))
liberal_update(info, 'unlisted', mf.get('isUnlisted'))
liberal_update(info, 'category', mf.get('category'))
liberal_update(info, 'time_published', mf.get('publishDate'))
liberal_update(info, 'time_uploaded', mf.get('uploadDate'))
# other stuff
info['author_url'] = 'https://www.youtube.com/channel/' + info['author_id'] if info['author_id'] else None
return info
def get_caption_url(info, language, format, automatic=False, translation_language=None):
'''Gets the url for captions with the given language and format. If automatic is True, get the automatic captions for that language. If translation_language is given, translate the captions from `language` to `translation_language`. If automatic is true and translation_language is given, the automatic captions will be translated.'''
url = info['_captions_base_url']
url += '&lang=' + language
url += '&fmt=' + format
if automatic:
url += '&kind=asr'
elif language in info['_manual_caption_language_names']:
url += '&name=' + urllib.parse.quote(info['_manual_caption_language_names'][language], safe='')
if translation_language:
url += '&tlang=' + translation_language
return url
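# Illustrative call (the exact base url comes from the watch page):
#   get_caption_url(info, 'en', 'vtt', automatic=True)
# appends &lang=en&fmt=vtt&kind=asr to info['_captions_base_url']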
def update_with_age_restricted_info(info, video_info_page):
ERROR_PREFIX = 'Error bypassing age-restriction: '
video_info = urllib.parse.parse_qs(video_info_page)
player_response = deep_get(video_info, 'player_response', 0)
if player_response is None:
info['playability_error'] = ERROR_PREFIX + 'Could not find player_response in video_info_page'
return
try:
player_response = json.loads(player_response)
except json.decoder.JSONDecodeError:
traceback.print_exc()
info['playability_error'] = ERROR_PREFIX + 'Failed to parse json response'
return
_extract_formats(info, player_response)
_extract_playability_error(info, player_response, error_prefix=ERROR_PREFIX)
def requires_decryption(info):
return ('formats' in info) and info['formats'] and info['formats'][0]['s']
# adapted from youtube-dl and invidious:
# https://github.com/omarroth/invidious/blob/master/src/invidious/helpers/signatures.cr
decrypt_function_re = re.compile(r'function\(a\)\{(a=a\.split\(""\)[^\}]+)\}')
op_with_arg_re = re.compile(r'[^\.]+\.([^\(]+)\(a,(\d+)\)')
def extract_decryption_function(info, base_js):
    '''Insert decryption function into info. Return error string if not successful.
    The decryption function is represented as a list of [operation, argument] pairs.
    It is advisable to cache the decryption function (uniquely identified by info['player_name']) so base.js (~1 MB) doesn't need to be redownloaded each time.'''
info['decryption_function'] = None
decrypt_function_match = decrypt_function_re.search(base_js)
if decrypt_function_match is None:
return 'Could not find decryption function in base.js'
function_body = decrypt_function_match.group(1).split(';')[1:-1]
if not function_body:
return 'Empty decryption function body'
var_name = get(function_body[0].split('.'), 0)
if var_name is None:
return 'Could not find var_name'
var_body_match = re.search(r'var ' + re.escape(var_name) + r'=\{(.*?)\};', base_js, flags=re.DOTALL)
if var_body_match is None:
return 'Could not find var_body'
operations = var_body_match.group(1).replace('\n', '').split('},')
if not operations:
return 'Did not find any definitions in var_body'
operations[-1] = operations[-1][:-1] # remove the trailing '}' since we split by '},' on the others
operation_definitions = {}
for op in operations:
colon_index = op.find(':')
opening_brace_index = op.find('{')
if colon_index == -1 or opening_brace_index == -1:
return 'Could not parse operation'
op_name = op[:colon_index]
op_body = op[opening_brace_index+1:]
if op_body == 'a.reverse()':
operation_definitions[op_name] = 0
elif op_body == 'a.splice(0,b)':
operation_definitions[op_name] = 1
elif op_body.startswith('var c=a[0]'):
operation_definitions[op_name] = 2
else:
return 'Unknown op_body: ' + op_body
decryption_function = []
for op_with_arg in function_body:
match = op_with_arg_re.fullmatch(op_with_arg)
if match is None:
return 'Could not parse operation with arg'
op_name = match.group(1)
if op_name not in operation_definitions:
return 'Unknown op_name: ' + op_name
op_argument = match.group(2)
decryption_function.append([operation_definitions[op_name], int(op_argument)])
info['decryption_function'] = decryption_function
return False
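# Illustrative example (hypothetical base.js names XX, mW, Fo, bV):
# for a scrambling function like
#   function(a){a=a.split("");XX.mW(a,1);XX.Fo(a,36);XX.bV(a,3);return a.join("")}
# where mW reverses, Fo swaps a[0] with a[b % len(a)], and bV splices off the
# first b characters, the extracted decryption function would be
#   [[0, 1], [2, 36], [1, 3]]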
def _operation_2(a, b):
c = a[0]
a[0] = a[b % len(a)]
a[b % len(a)] = c
def decrypt_signatures(info):
'''Applies info['decryption_function'] to decrypt all the signatures. Return err.'''
if not info.get('decryption_function'):
return 'decryption_function not in info'
for format in info['formats']:
if not format['s'] or not format['sp'] or not format['url']:
print('Warning: s, sp, or url not in format')
continue
a = list(format['s'])
for op, argument in info['decryption_function']:
if op == 0:
a.reverse()
elif op == 1:
a = a[argument:]
else:
_operation_2(a, argument)
signature = ''.join(a)
format['url'] += '&' + format['sp'] + '=' + signature
return False
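# Illustrative end-to-end flow (base_js_text must be fetched by the caller):
#   info = extract_watch_info(polymer_json)
#   if requires_decryption(info):
#       err = extract_decryption_function(info, base_js_text)
#       if not err:
#           err = decrypt_signatures(info)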