fix: support YouTube 2024+ data formats for playlists, podcasts and channels
- Add PODCAST content type support in lockupViewModel extraction
- Extract thumbnails and episode count from thumbnail overlay badges
- Migrate playlist page fetching from pbj=1 to innertube API (youtubei/v1/browse)
- Support new pageHeaderRenderer format in playlist metadata extraction
- Fix subscriber count extraction when YouTube returns a handle instead of a count
- Hide "None subscribers" in the template when data is unavailable
This commit is contained in:
@@ -30,42 +30,58 @@ def playlist_ctoken(playlist_id, offset, include_shorts=True):
|
||||
|
||||
def playlist_first_page(playlist_id, report_text="Retrieved playlist",
                        use_mobile=False):
    """Fetch the first page of a playlist via the innertube browse API.

    The legacy ``?pbj=1`` JSON endpoints no longer work for many playlists,
    so this posts to ``youtubei/v1/browse`` with a ``VL``-prefixed browseId
    instead.

    :param playlist_id: YouTube playlist id (without the ``VL`` prefix).
    :param report_text: Text passed through to util.fetch_url for logging.
    :param use_mobile: Retained for backward compatibility with existing
        callers; the innertube request is the same for mobile and desktop,
        so this parameter is currently ignored.
    :return: Parsed JSON response (dict) from the browse endpoint.
    """
    # Public web-client innertube API key (same key the YouTube web app ships).
    key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
    url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key

    data = {
        'context': {
            'client': {
                'hl': 'en',
                'gl': 'US',
                'clientName': 'WEB',
                'clientVersion': '2.20240327.00.00',
            },
        },
        # Playlist browse ids are the playlist id prefixed with 'VL'.
        'browseId': 'VL' + playlist_id,
    }

    # The browse endpoint requires a JSON body, so add the Content-Type header
    # on top of the usual desktop XHR headers.
    content_type_header = (('Content-Type', 'application/json'),)
    content = util.fetch_url(
        url, util.desktop_xhr_headers + content_type_header,
        data=json.dumps(data),
        report_text=report_text, debug_name='playlist_first_page'
    )
    return json.loads(content.decode('utf-8'))
|
||||
|
||||
|
||||
def get_videos(playlist_id, page, include_shorts=True, use_mobile=False,
|
||||
report_text='Retrieved playlist'):
|
||||
# mobile requests return 20 videos per page
|
||||
if use_mobile:
|
||||
page_size = 20
|
||||
headers = util.mobile_xhr_headers
|
||||
# desktop requests return 100 videos per page
|
||||
else:
|
||||
page_size = 100
|
||||
headers = util.desktop_xhr_headers
|
||||
page_size = 100
|
||||
|
||||
url = "https://m.youtube.com/playlist?ctoken="
|
||||
url += playlist_ctoken(playlist_id, (int(page)-1)*page_size,
|
||||
include_shorts=include_shorts)
|
||||
url += "&pbj=1"
|
||||
key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
|
||||
url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
|
||||
|
||||
ctoken = playlist_ctoken(playlist_id, (int(page)-1)*page_size,
|
||||
include_shorts=include_shorts)
|
||||
|
||||
data = {
|
||||
'context': {
|
||||
'client': {
|
||||
'hl': 'en',
|
||||
'gl': 'US',
|
||||
'clientName': 'WEB',
|
||||
'clientVersion': '2.20240327.00.00',
|
||||
},
|
||||
},
|
||||
'continuation': ctoken,
|
||||
}
|
||||
|
||||
content_type_header = (('Content-Type', 'application/json'),)
|
||||
content = util.fetch_url(
|
||||
url, headers, report_text=report_text,
|
||||
debug_name='playlist_videos'
|
||||
url, util.desktop_xhr_headers + content_type_header,
|
||||
data=json.dumps(data),
|
||||
report_text=report_text, debug_name='playlist_videos'
|
||||
)
|
||||
|
||||
info = json.loads(content.decode('utf-8'))
|
||||
@@ -96,7 +112,7 @@ def get_playlist_page():
|
||||
tasks = (
|
||||
gevent.spawn(
|
||||
playlist_first_page, playlist_id,
|
||||
report_text="Retrieved playlist info", use_mobile=True
|
||||
report_text="Retrieved playlist info"
|
||||
),
|
||||
gevent.spawn(get_videos, playlist_id, page)
|
||||
)
|
||||
|
||||
@@ -58,7 +58,9 @@
|
||||
|
||||
<div class="stats {{'horizontal-stats' if horizontal else 'vertical-stats'}}">
|
||||
{% if info['type'] == 'channel' %}
|
||||
<div>{{ info['approx_subscriber_count'] }} subscribers</div>
|
||||
{% if info.get('approx_subscriber_count') %}
|
||||
<div>{{ info['approx_subscriber_count'] }} subscribers</div>
|
||||
{% endif %}
|
||||
<div>{{ info['video_count']|commatize }} videos</div>
|
||||
{% else %}
|
||||
{% if info.get('time_published') %}
|
||||
|
||||
@@ -241,7 +241,7 @@ def extract_lockup_view_model_info(item, additional_info={}):
|
||||
info['title'] = title_data.get('content', '')
|
||||
|
||||
# Determine type based on contentType
|
||||
if 'PLAYLIST' in content_type:
|
||||
if 'PLAYLIST' in content_type or 'PODCAST' in content_type:
|
||||
info['type'] = 'playlist'
|
||||
info['playlist_type'] = 'playlist'
|
||||
info['id'] = content_id
|
||||
@@ -253,7 +253,7 @@ def extract_lockup_view_model_info(item, additional_info={}):
|
||||
for row in metadata_rows.get('contentMetadataViewModel', {}).get('metadataRows', []):
|
||||
for part in row.get('metadataParts', []):
|
||||
text = part.get('text', {}).get('content', '')
|
||||
if 'video' in text.lower():
|
||||
if 'video' in text.lower() or 'episode' in text.lower():
|
||||
info['video_count'] = extract_int(text)
|
||||
elif 'VIDEO' in content_type:
|
||||
info['type'] = 'video'
|
||||
@@ -276,25 +276,48 @@ def extract_lockup_view_model_info(item, additional_info={}):
|
||||
info['type'] = 'channel'
|
||||
info['id'] = content_id
|
||||
info['approx_subscriber_count'] = None
|
||||
info['video_count'] = None
|
||||
|
||||
# Extract subscriber count and video count from metadata rows
|
||||
metadata_rows = lockup_metadata.get('metadata', {})
|
||||
for row in metadata_rows.get('contentMetadataViewModel', {}).get('metadataRows', []):
|
||||
for part in row.get('metadataParts', []):
|
||||
text = part.get('text', {}).get('content', '')
|
||||
if 'subscriber' in text.lower():
|
||||
info['approx_subscriber_count'] = extract_approx_int(text)
|
||||
elif 'video' in text.lower():
|
||||
info['video_count'] = extract_int(text)
|
||||
else:
|
||||
info['type'] = 'unsupported'
|
||||
return info
|
||||
|
||||
# Extract thumbnail from contentImage
|
||||
content_image = item.get('contentImage', {})
|
||||
collection_thumb = content_image.get('collectionThumbnailViewModel', {})
|
||||
primary_thumb = collection_thumb.get('primaryThumbnail', {})
|
||||
thumb_vm = primary_thumb.get('thumbnailViewModel', {})
|
||||
image_sources = thumb_vm.get('image', {}).get('sources', [])
|
||||
if image_sources:
|
||||
info['thumbnail'] = image_sources[0].get('url', '')
|
||||
else:
|
||||
info['thumbnail'] = ''
|
||||
info['thumbnail'] = normalize_url(multi_deep_get(content_image,
|
||||
# playlists with collection thumbnail
|
||||
['collectionThumbnailViewModel', 'primaryThumbnail', 'thumbnailViewModel', 'image', 'sources', 0, 'url'],
|
||||
# single thumbnail (some playlists, videos)
|
||||
['thumbnailViewModel', 'image', 'sources', 0, 'url'],
|
||||
)) or ''
|
||||
|
||||
# Extract video/episode count from thumbnail overlay badges
|
||||
# (podcasts and some playlists put the count here instead of metadata rows)
|
||||
thumb_vm = multi_deep_get(content_image,
|
||||
['collectionThumbnailViewModel', 'primaryThumbnail', 'thumbnailViewModel'],
|
||||
['thumbnailViewModel'],
|
||||
) or {}
|
||||
for overlay in thumb_vm.get('overlays', []):
|
||||
for badge in deep_get(overlay, 'thumbnailOverlayBadgeViewModel', 'thumbnailBadges', default=[]):
|
||||
badge_text = deep_get(badge, 'thumbnailBadgeViewModel', 'text', default='')
|
||||
if badge_text and not info.get('video_count'):
|
||||
conservative_update(info, 'video_count', extract_int(badge_text))
|
||||
|
||||
# Extract author info if available
|
||||
info['author'] = None
|
||||
info['author_id'] = None
|
||||
info['author_url'] = None
|
||||
info['description'] = None
|
||||
info['badges'] = []
|
||||
|
||||
# Try to get first video ID from inline player data
|
||||
item_playback = item.get('itemPlayback', {})
|
||||
@@ -463,6 +486,13 @@ def extract_item_info(item, additional_info={}):
|
||||
elif primary_type == 'channel':
|
||||
info['id'] = item.get('channelId')
|
||||
info['approx_subscriber_count'] = extract_approx_int(item.get('subscriberCountText'))
|
||||
# YouTube sometimes puts the handle (@name) in subscriberCountText
|
||||
# instead of the actual count. Fall back to accessibility data.
|
||||
if not info['approx_subscriber_count']:
|
||||
acc_label = deep_get(item, 'subscriberCountText',
|
||||
'accessibility', 'accessibilityData', 'label', default='')
|
||||
if 'subscriber' in acc_label.lower():
|
||||
info['approx_subscriber_count'] = extract_approx_int(acc_label)
|
||||
elif primary_type == 'show':
|
||||
info['id'] = deep_get(item, 'navigationEndpoint', 'watchEndpoint', 'playlistId')
|
||||
info['first_video_id'] = deep_get(item, 'navigationEndpoint',
|
||||
|
||||
@@ -218,40 +218,100 @@ def extract_playlist_metadata(polymer_json):
|
||||
return {'error': err}
|
||||
|
||||
metadata = {'error': None}
|
||||
header = deep_get(response, 'header', 'playlistHeaderRenderer', default={})
|
||||
metadata['title'] = extract_str(header.get('title'))
|
||||
metadata['title'] = None
|
||||
metadata['first_video_id'] = None
|
||||
metadata['thumbnail'] = None
|
||||
metadata['video_count'] = None
|
||||
metadata['description'] = ''
|
||||
metadata['author'] = None
|
||||
metadata['author_id'] = None
|
||||
metadata['author_url'] = None
|
||||
metadata['view_count'] = None
|
||||
metadata['like_count'] = None
|
||||
metadata['time_published'] = None
|
||||
|
||||
header = deep_get(response, 'header', 'playlistHeaderRenderer', default={})
|
||||
|
||||
if header:
|
||||
# Classic playlistHeaderRenderer format
|
||||
metadata['title'] = extract_str(header.get('title'))
|
||||
metadata['first_video_id'] = deep_get(header, 'playEndpoint', 'watchEndpoint', 'videoId')
|
||||
first_id = re.search(r'([a-z_\-]{11})', deep_get(header,
|
||||
'thumbnail', 'thumbnails', 0, 'url', default=''))
|
||||
if first_id:
|
||||
conservative_update(metadata, 'first_video_id', first_id.group(1))
|
||||
|
||||
metadata['video_count'] = extract_int(header.get('numVideosText'))
|
||||
metadata['description'] = extract_str(header.get('descriptionText'), default='')
|
||||
metadata['author'] = extract_str(header.get('ownerText'))
|
||||
metadata['author_id'] = multi_deep_get(header,
|
||||
['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
|
||||
['ownerEndpoint', 'browseEndpoint', 'browseId'])
|
||||
metadata['view_count'] = extract_int(header.get('viewCountText'))
|
||||
metadata['like_count'] = extract_int(header.get('likesCountWithoutLikeText'))
|
||||
for stat in header.get('stats', ()):
|
||||
text = extract_str(stat)
|
||||
if 'videos' in text or 'episodes' in text:
|
||||
conservative_update(metadata, 'video_count', extract_int(text))
|
||||
elif 'views' in text:
|
||||
conservative_update(metadata, 'view_count', extract_int(text))
|
||||
elif 'updated' in text:
|
||||
metadata['time_published'] = extract_date(text)
|
||||
else:
|
||||
# New pageHeaderRenderer format (YouTube 2024+)
|
||||
page_header = deep_get(response, 'header', 'pageHeaderRenderer', default={})
|
||||
metadata['title'] = page_header.get('pageTitle')
|
||||
view_model = deep_get(page_header, 'content', 'pageHeaderViewModel', default={})
|
||||
|
||||
# Extract title from viewModel if not found
|
||||
if not metadata['title']:
|
||||
metadata['title'] = deep_get(view_model,
|
||||
'title', 'dynamicTextViewModel', 'text', 'content')
|
||||
|
||||
# Extract metadata from rows (author, video count, views, etc.)
|
||||
meta_rows = deep_get(view_model,
|
||||
'metadata', 'contentMetadataViewModel', 'metadataRows', default=[])
|
||||
for row in meta_rows:
|
||||
for part in row.get('metadataParts', []):
|
||||
text_content = deep_get(part, 'text', 'content', default='')
|
||||
# Author from avatarStack
|
||||
avatar_stack = deep_get(part, 'avatarStack', 'avatarStackViewModel', default={})
|
||||
if avatar_stack:
|
||||
author_text = deep_get(avatar_stack, 'text', 'content')
|
||||
if author_text:
|
||||
metadata['author'] = author_text
|
||||
# Extract author_id from commandRuns
|
||||
for run in deep_get(avatar_stack, 'text', 'commandRuns', default=[]):
|
||||
browse_id = deep_get(run, 'onTap', 'innertubeCommand',
|
||||
'browseEndpoint', 'browseId')
|
||||
if browse_id:
|
||||
metadata['author_id'] = browse_id
|
||||
# Video/episode count
|
||||
if text_content and ('video' in text_content.lower() or 'episode' in text_content.lower()):
|
||||
conservative_update(metadata, 'video_count', extract_int(text_content))
|
||||
# View count
|
||||
elif text_content and 'view' in text_content.lower():
|
||||
conservative_update(metadata, 'view_count', extract_int(text_content))
|
||||
# Last updated
|
||||
elif text_content and 'updated' in text_content.lower():
|
||||
metadata['time_published'] = extract_date(text_content)
|
||||
|
||||
# Extract description from sidebar if available
|
||||
sidebar = deep_get(response, 'sidebar', 'playlistSidebarRenderer', 'items', default=[])
|
||||
for sidebar_item in sidebar:
|
||||
desc = deep_get(sidebar_item, 'playlistSidebarPrimaryInfoRenderer',
|
||||
'description', 'simpleText')
|
||||
if desc:
|
||||
metadata['description'] = desc
|
||||
|
||||
if metadata['author_id']:
|
||||
metadata['author_url'] = 'https://www.youtube.com/channel/' + metadata['author_id']
|
||||
|
||||
metadata['first_video_id'] = deep_get(header, 'playEndpoint', 'watchEndpoint', 'videoId')
|
||||
first_id = re.search(r'([a-z_\-]{11})', deep_get(header,
|
||||
'thumbnail', 'thumbnails', 0, 'url', default=''))
|
||||
if first_id:
|
||||
conservative_update(metadata, 'first_video_id', first_id.group(1))
|
||||
if metadata['first_video_id'] is None:
|
||||
metadata['thumbnail'] = None
|
||||
else:
|
||||
metadata['thumbnail'] = f"https://i.ytimg.com/vi/{metadata['first_video_id']}/hqdefault.jpg"
|
||||
|
||||
metadata['video_count'] = extract_int(header.get('numVideosText'))
|
||||
metadata['description'] = extract_str(header.get('descriptionText'), default='')
|
||||
metadata['author'] = extract_str(header.get('ownerText'))
|
||||
metadata['author_id'] = multi_deep_get(header,
|
||||
['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
|
||||
['ownerEndpoint', 'browseEndpoint', 'browseId'])
|
||||
if metadata['author_id']:
|
||||
metadata['author_url'] = 'https://www.youtube.com/channel/' + metadata['author_id']
|
||||
else:
|
||||
metadata['author_url'] = None
|
||||
metadata['view_count'] = extract_int(header.get('viewCountText'))
|
||||
metadata['like_count'] = extract_int(header.get('likesCountWithoutLikeText'))
|
||||
for stat in header.get('stats', ()):
|
||||
text = extract_str(stat)
|
||||
if 'videos' in text:
|
||||
conservative_update(metadata, 'video_count', extract_int(text))
|
||||
elif 'views' in text:
|
||||
conservative_update(metadata, 'view_count', extract_int(text))
|
||||
elif 'updated' in text:
|
||||
metadata['time_published'] = extract_date(text)
|
||||
|
||||
microformat = deep_get(response, 'microformat', 'microformatDataRenderer',
|
||||
default={})
|
||||
conservative_update(
|
||||
|
||||
Reference in New Issue
Block a user