
[something]Continuation renderers, all of which are junk except one. Check the items in each one until the one that contains the items being sought is found. The call in extract_comments_info had to be changed to specify the item types being sought. Leaving them unspecified was strictly incorrect, since extract_items by default looks for video/playlist/channel thumbnail items; the old call was relying on the special-case handling of continuations, which no longer works.
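
A minimal sketch of that check (illustrative only: extract_items lives in
.common and its real continuation handling is not shown here, so the helper
name and dict shapes below are assumptions):

    def _items_from_continuations(continuation_contents, item_types):
        # several *Continuation renderers may be present; only one of them
        # holds the items being sought
        for renderer in continuation_contents.values():
            items = renderer.get('contents') or renderer.get('items') or []
            if any(key in item_types for item in items for key in item):
                return items
        return []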
from .common import (get, multi_get, deep_get, multi_deep_get,
    liberal_update, conservative_update, remove_redirect, normalize_url,
    extract_str, extract_formatted_text, extract_int, extract_approx_int,
    extract_date, check_missing_keys, extract_item_info, extract_items,
    extract_response)
from youtube import proto

import re
import urllib.parse
from math import ceil

def extract_channel_info(polymer_json, tab):
    response, err = extract_response(polymer_json)
    if err:
        return {'error': err}

    try:
        microformat = response['microformat']['microformatDataRenderer']

    # channel doesn't exist or was terminated
    # example terminated channel: https://www.youtube.com/channel/UCnKJeK_r90jDdIuzHXC0Org
    except KeyError:
        if response.get('alerts'):
            error_string = ' '.join(
                extract_str(deep_get(alert, 'alertRenderer', 'text'), default='')
                for alert in response['alerts']
            )
            if not error_string:
                error_string = 'Failed to extract error'
            return {'error': error_string}
        elif deep_get(response, 'responseContext', 'errors'):
            for error in response['responseContext']['errors'].get('error', []):
                if error.get('code') == 'INVALID_VALUE' and error.get('location') == 'browse_id':
                    return {'error': 'This channel does not exist'}
        return {'error': 'Failure getting microformat'}

    info = {'error': None}
    info['current_tab'] = tab

    info['approx_subscriber_count'] = extract_approx_int(deep_get(response,
        'header', 'c4TabbedHeaderRenderer', 'subscriberCountText'))

    # stuff from microformat (info given by youtube for every page on channel)
    info['short_description'] = microformat.get('description')
    info['channel_name'] = microformat.get('title')
    info['avatar'] = deep_get(microformat, 'thumbnail', 'thumbnails', 0, 'url')
    channel_url = microformat.get('urlCanonical')
    if channel_url:
        channel_id = get(channel_url.rstrip('/').split('/'), -1)
        info['channel_id'] = channel_id
    else:
        info['channel_id'] = deep_get(response, 'metadata', 'channelMetadataRenderer', 'externalId')
    if info['channel_id']:
        info['channel_url'] = 'https://www.youtube.com/channel/' + info['channel_id']
    else:
        info['channel_url'] = None

    # get items
    info['items'] = []

    # empty channel
    if 'contents' not in response and 'continuationContents' not in response:
        return info

    if tab in ('videos', 'playlists', 'search'):
        items, ctoken = extract_items(response)
        additional_info = {'author': info['channel_name'], 'author_url': info['channel_url']}
        info['items'] = [extract_item_info(renderer, additional_info) for renderer in items]
        if tab == 'search':
            info['is_last_page'] = (ctoken is None)
    elif tab == 'about':
        items, _ = extract_items(response, item_types={'channelAboutFullMetadataRenderer'})
        if not items:
            info['error'] = 'Could not find channelAboutFullMetadataRenderer'
            return info
        channel_metadata = items[0]['channelAboutFullMetadataRenderer']

        info['links'] = []
        for link_json in channel_metadata.get('primaryLinks', ()):
            url = remove_redirect(deep_get(link_json, 'navigationEndpoint', 'urlEndpoint', 'url'))
            text = extract_str(link_json.get('title'))
            info['links'].append((text, url))

        info['date_joined'] = extract_date(channel_metadata.get('joinedDateText'))
        info['view_count'] = extract_int(channel_metadata.get('viewCountText'))
        info['description'] = extract_str(channel_metadata.get('description'), default='')
    else:
        raise NotImplementedError('Unknown or unsupported channel tab: ' + tab)

    return info

def extract_search_info(polymer_json):
    response, err = extract_response(polymer_json)
    if err:
        return {'error': err}
    info = {'error': None}
    info['estimated_results'] = int(response['estimatedResults'])
    info['estimated_pages'] = ceil(info['estimated_results']/20)

    results, _ = extract_items(response)

    info['items'] = []
    info['corrections'] = {'type': None}
    for renderer in results:
        type = list(renderer.keys())[0]
        if type == 'shelfRenderer':
            continue
        if type == 'didYouMeanRenderer':
            renderer = renderer[type]

            info['corrections'] = {
                'type': 'did_you_mean',
                'corrected_query': renderer['correctedQueryEndpoint']['searchEndpoint']['query'],
                'corrected_query_text': renderer['correctedQuery']['runs'],
            }
            continue
        if type == 'showingResultsForRenderer':
            renderer = renderer[type]

            info['corrections'] = {
                'type': 'showing_results_for',
                'corrected_query_text': renderer['correctedQuery']['runs'],
                'original_query_text': renderer['originalQuery']['simpleText'],
            }
            continue

        i_info = extract_item_info(renderer)
        if i_info.get('type') != 'unsupported':
            info['items'].append(i_info)

    return info

def extract_playlist_metadata(polymer_json):
    response, err = extract_response(polymer_json)
    if err:
        return {'error': err}

    metadata = {'error': None}
    header = deep_get(response, 'header', 'playlistHeaderRenderer', default={})
    metadata['title'] = extract_str(header.get('title'))

    metadata['first_video_id'] = deep_get(header, 'playEndpoint', 'watchEndpoint', 'videoId')
    # video ids are 11 characters from [A-Za-z0-9_-], so match the full
    # alphabet, not just lowercase
    first_id = re.search(r'([a-zA-Z0-9_\-]{11})', deep_get(header,
        'thumbnail', 'thumbnails', 0, 'url', default=''))
    if first_id:
        conservative_update(metadata, 'first_video_id', first_id.group(1))
    if metadata['first_video_id'] is None:
        metadata['thumbnail'] = None
    else:
        metadata['thumbnail'] = 'https://i.ytimg.com/vi/' + metadata['first_video_id'] + '/mqdefault.jpg'

    metadata['video_count'] = extract_int(header.get('numVideosText'))
    metadata['description'] = extract_str(header.get('descriptionText'), default='')
    metadata['author'] = extract_str(header.get('ownerText'))
    metadata['author_id'] = multi_deep_get(header,
        ['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
        ['ownerEndpoint', 'browseEndpoint', 'browseId'])
    if metadata['author_id']:
        metadata['author_url'] = 'https://www.youtube.com/channel/' + metadata['author_id']
    else:
        metadata['author_url'] = None
    metadata['view_count'] = extract_int(header.get('viewCountText'))
    metadata['like_count'] = extract_int(header.get('likesCountWithoutLikeText'))
    for stat in header.get('stats', ()):
        text = extract_str(stat, default='')  # default avoids `in None` below
        if 'videos' in text:
            conservative_update(metadata, 'video_count', extract_int(text))
        elif 'views' in text:
            conservative_update(metadata, 'view_count', extract_int(text))
        elif 'updated' in text:
            metadata['time_published'] = extract_date(text)

    return metadata

def extract_playlist_info(polymer_json):
    response, err = extract_response(polymer_json)
    if err:
        return {'error': err}
    info = {'error': None}
    first_page = 'continuationContents' not in response
    video_list, _ = extract_items(response)

    info['items'] = [extract_item_info(renderer) for renderer in video_list]

    if first_page:
        info['metadata'] = extract_playlist_metadata(polymer_json)

    return info

def _ctoken_metadata(ctoken):
    result = dict()
    params = proto.parse(proto.b64_to_bytes(ctoken))
    result['video_id'] = proto.parse(params[2])[2].decode('ascii')

    offset_information = proto.parse(params[6])
    result['offset'] = offset_information.get(5, 0)

    result['is_replies'] = False
    if (3 in offset_information) and (2 in proto.parse(offset_information[3])):
        result['is_replies'] = True
        result['sort'] = None
    else:
        try:
            result['sort'] = proto.parse(offset_information[4])[6]
        except KeyError:
            result['sort'] = 0
    return result

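# Usage sketch for _ctoken_metadata (illustrative; real ctokens are opaque
# base64-encoded protobuf blobs taken from a comments continuation URL):
#     meta = _ctoken_metadata(ctoken)
#     meta['video_id']    # id of the video the comments belong to
#     meta['offset']      # pagination offset
#     meta['is_replies']  # True for reply continuations
#     meta['sort']        # sort code, or None for replies
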
def extract_comments_info(polymer_json):
    response, err = extract_response(polymer_json)
    if err:
        return {'error': err}
    info = {'error': None}

    url = multi_deep_get(polymer_json, [1, 'url'], ['url'])
    if url:
        ctoken = urllib.parse.parse_qs(url[url.find('?')+1:])['ctoken'][0]
        metadata = _ctoken_metadata(ctoken)
    else:
        metadata = {}
    info['video_id'] = metadata.get('video_id')
    info['offset'] = metadata.get('offset')
    info['is_replies'] = metadata.get('is_replies')
    info['sort'] = metadata.get('sort')
    info['video_title'] = None

    comments, ctoken = extract_items(response,
        item_types={'commentThreadRenderer', 'commentRenderer'})
    info['comments'] = []
    info['ctoken'] = ctoken
    for comment in comments:
        comment_info = {}

        if 'commentThreadRenderer' in comment:  # top level comments
            conservative_update(info, 'is_replies', False)
            comment_thread = comment['commentThreadRenderer']
            info['video_title'] = extract_str(comment_thread.get('commentTargetTitle'))
            if 'replies' not in comment_thread:
                comment_info['reply_count'] = 0
            else:
                comment_info['reply_count'] = extract_int(deep_get(comment_thread,
                    'replies', 'commentRepliesRenderer', 'moreText'
                ), default=1)  # With 1 reply, the text reads "View reply"
            comment_renderer = deep_get(comment_thread, 'comment', 'commentRenderer', default={})
        elif 'commentRenderer' in comment:  # replies
            comment_info['reply_count'] = 0  # replyCount, below, not present for replies even if the reply has further replies to it
            conservative_update(info, 'is_replies', True)
            comment_renderer = comment['commentRenderer']
        else:
            comment_renderer = {}

        # These 3 are sometimes absent, likely because the channel was deleted
        comment_info['author'] = extract_str(comment_renderer.get('authorText'))
        comment_info['author_url'] = deep_get(comment_renderer,
            'authorEndpoint', 'commandMetadata', 'webCommandMetadata', 'url')
        comment_info['author_id'] = deep_get(comment_renderer,
            'authorEndpoint', 'browseEndpoint', 'browseId')

        comment_info['author_avatar'] = deep_get(comment_renderer,
            'authorThumbnail', 'thumbnails', 0, 'url')
        comment_info['id'] = comment_renderer.get('commentId')
        comment_info['text'] = extract_formatted_text(comment_renderer.get('contentText'))
        comment_info['time_published'] = extract_str(comment_renderer.get('publishedTimeText'))
        comment_info['like_count'] = comment_renderer.get('likeCount')
        liberal_update(comment_info, 'reply_count', comment_renderer.get('replyCount'))

        info['comments'].append(comment_info)

    return info