5 Commits

Author SHA1 Message Date
62a028968e chore: extend .gitignore with AI assistant configurations and caches
All checks were successful
git-sync-with-mirror / git-sync (push) Successful in 17s
CI / test (push) Successful in 50s
2026-04-04 15:08:13 -05:00
f7bbf3129a update ios client
All checks were successful
git-sync-with-mirror / git-sync (push) Successful in 14s
CI / test (push) Successful in 53s
2026-04-04 15:05:33 -05:00
688521f8d6 bump to v0.4.5
All checks were successful
git-sync-with-mirror / git-sync (push) Successful in 13s
CI / test (push) Successful in 50s
2026-04-01 11:54:46 -05:00
6eb3741010 test: add unit tests for YouTube Shorts support
All checks were successful
git-sync-with-mirror / git-sync (push) Successful in 13s
CI / test (push) Successful in 51s
18 tests covering:
- channel_ctoken_v5 protobuf token generation per tab
- shortsLockupViewModel parsing (id, title, thumbnail, type)
- View count formatting with K/M/B suffixes
- extract_items with reloadContinuationItemsCommand response format

All tests run offline with mocked data, no network access.
2026-04-01 11:51:42 -05:00
a374f90f6e fix: add support for YouTube Shorts tab on channel pages
All checks were successful
git-sync-with-mirror / git-sync (push) Successful in 13s
CI / test (push) Successful in 56s
- Rewrite channel_ctoken_v5 with correct protobuf field numbers per tab
  (videos=15, shorts=10, streams=14) based on Invidious source
- Replace broken pbj=1 endpoint with youtubei browse API for shorts/streams
- Add shortsLockupViewModel parser to extract video data from new YT format
- Fix channel metadata not loading (get_metadata now uses browse API)
- Fix metadata caching: skip caching when channel_name is absent
- Show actual item count instead of UU playlist count for shorts/streams
- Format view counts with spaced suffixes (7.1 K, 1.2 M, 3 B)
2026-04-01 11:43:46 -05:00
6 changed files with 393 additions and 85 deletions

21
.gitignore vendored
View File

@@ -143,3 +143,24 @@ banned_addresses.txt
*.bak *.bak
*.orig *.orig
*.cache/ *.cache/
# -----------------------------------------------------------------------------
# AI assistants / LLM tools
# -----------------------------------------------------------------------------
# Claude AI assistant configuration and cache
.claude/
claude*
.anthropic/
# Kiro AI tool configuration and cache
.kiro/
kiro*
# Qwen AI-related files and caches
.qwen/
qwen*
# Other AI assistants/IDE integrations
.cursor/
.gpt/
.openai/

213
tests/test_shorts.py Normal file
View File

@@ -0,0 +1,213 @@
"""Tests for YouTube Shorts tab support.
Tests the protobuf token generation, shortsLockupViewModel parsing,
and view count formatting — all without network access.
"""
import sys
import os
import base64
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
import youtube.proto as proto
from youtube.yt_data_extract.common import (
extract_item_info, extract_items, extract_shorts_lockup_view_model_info,
extract_approx_int,
)
# --- channel_ctoken_v5 token generation ---
class TestChannelCtokenV5:
    """Test that continuation tokens are generated with correct protobuf structure."""

    @pytest.fixture(autouse=True)
    def setup(self):
        # Imported lazily so test collection does not fail before
        # sys.path has been adjusted at module import time.
        from youtube.channel import channel_ctoken_v5
        self.channel_ctoken_v5 = channel_ctoken_v5

    def _outer_fields(self, ctoken):
        """Decode the outer protobuf layer of a ctoken into {field_number: value}."""
        raw = base64.urlsafe_b64decode(ctoken + '==')
        fields = {}
        for _, field_number, value in proto.read_protobuf(raw):
            fields[field_number] = value
        return fields

    def test_shorts_token_generates_without_error(self):
        token = self.channel_ctoken_v5('UCrBzBOMcUVV8ryyAU_c6P5g', '1', '3', 'shorts')
        assert token is not None
        assert len(token) > 50

    def test_videos_token_generates_without_error(self):
        token = self.channel_ctoken_v5('UCrBzBOMcUVV8ryyAU_c6P5g', '1', '3', 'videos')
        assert token is not None

    def test_streams_token_generates_without_error(self):
        token = self.channel_ctoken_v5('UCrBzBOMcUVV8ryyAU_c6P5g', '1', '3', 'streams')
        assert token is not None

    def test_outer_structure_has_channel_id(self):
        token = self.channel_ctoken_v5('UCrBzBOMcUVV8ryyAU_c6P5g', '1', '3', 'shorts')
        # Field 80226972 is the main wrapper
        assert 80226972 in self._outer_fields(token)

    def test_different_tabs_produce_different_tokens(self):
        tokens = [self.channel_ctoken_v5('UCtest', '1', '3', tab)
                  for tab in ('videos', 'shorts', 'streams')]
        # All three tabs must yield pairwise-distinct tokens.
        assert len(set(tokens)) == 3
# --- shortsLockupViewModel parsing ---
# Realistic fixture: a shortsLockupViewModel item as returned by the
# youtubei browse API for a channel's Shorts tab.  The view count uses
# the spelled-out "thousand" suffix inside accessibilityText.
SAMPLE_SHORT = {
    'shortsLockupViewModel': {
        # entityId carries the video id as a suffix (used as a fallback id)
        'entityId': 'shorts-shelf-item-auWWV955Q38',
        # Format: "<title>, <count> views - play Short"
        'accessibilityText': 'Globant Converge - DECEMBER 10 and 11, 7.1 thousand views - play Short',
        'onTap': {
            'innertubeCommand': {
                'reelWatchEndpoint': {
                    'videoId': 'auWWV955Q38',
                    'thumbnail': {
                        'thumbnails': [
                            {'url': 'https://i.ytimg.com/vi/auWWV955Q38/frame0.jpg',
                             'width': 1080, 'height': 1920}
                        ]
                    }
                }
            }
        }
    }
}

# Same shape, but with a "million" view-count suffix.
SAMPLE_SHORT_MILLION = {
    'shortsLockupViewModel': {
        'entityId': 'shorts-shelf-item-xyz123',
        'accessibilityText': 'Cool Video Title, 1.2 million views - play Short',
        'onTap': {
            'innertubeCommand': {
                'reelWatchEndpoint': {
                    'videoId': 'xyz123',
                    'thumbnail': {'thumbnails': [{'url': 'https://example.com/thumb.jpg'}]}
                }
            }
        }
    }
}

# Same shape, with a plain number (no thousand/million/billion suffix).
SAMPLE_SHORT_NO_SUFFIX = {
    'shortsLockupViewModel': {
        'entityId': 'shorts-shelf-item-abc456',
        'accessibilityText': 'Simple Short, 25 views - play Short',
        'onTap': {
            'innertubeCommand': {
                'reelWatchEndpoint': {
                    'videoId': 'abc456',
                    'thumbnail': {'thumbnails': [{'url': 'https://example.com/thumb2.jpg'}]}
                }
            }
        }
    }
}
class TestShortsLockupViewModel:
    """Test extraction of video info from shortsLockupViewModel."""

    def _info(self):
        """Run the extractor on the shared sample fixture."""
        return extract_item_info(SAMPLE_SHORT)

    def test_extracts_video_id(self):
        assert self._info()['id'] == 'auWWV955Q38'

    def test_extracts_title(self):
        assert self._info()['title'] == 'Globant Converge - DECEMBER 10 and 11'

    def test_extracts_thumbnail(self):
        assert 'ytimg.com' in self._info()['thumbnail']

    def test_type_is_video(self):
        assert self._info()['type'] == 'video'

    def test_no_error(self):
        assert self._info()['error'] is None

    def test_duration_is_empty_not_none(self):
        assert self._info()['duration'] == ''

    def test_fallback_id_from_entity_id(self):
        # No reelWatchEndpoint present: id must come from the entityId suffix.
        item = {'shortsLockupViewModel': {
            'entityId': 'shorts-shelf-item-fallbackID',
            'accessibilityText': 'Title, 10 views - play Short',
            'onTap': {'innertubeCommand': {}},
        }}
        assert extract_item_info(item)['id'] == 'fallbackID'
class TestShortsViewCount:
    """Test view count formatting with K/M/B suffixes."""

    def test_thousand_views(self):
        assert extract_item_info(SAMPLE_SHORT)['approx_view_count'] == '7.1 K'

    def test_million_views(self):
        assert extract_item_info(SAMPLE_SHORT_MILLION)['approx_view_count'] == '1.2 M'

    def test_plain_number_views(self):
        assert extract_item_info(SAMPLE_SHORT_NO_SUFFIX)['approx_view_count'] == '25'

    def test_billion_views(self):
        short = {'shortsLockupViewModel': {
            'entityId': 'shorts-shelf-item-big1',
            'accessibilityText': 'Viral, 3 billion views - play Short',
            'onTap': {'innertubeCommand': {
                'reelWatchEndpoint': {
                    'videoId': 'big1',
                    'thumbnail': {'thumbnails': [{'url': 'https://x.com/t.jpg'}]},
                },
            }},
        }}
        assert extract_item_info(short)['approx_view_count'] == '3 B'

    def test_additional_info_applied(self):
        # Extra fields (e.g. channel author) must be merged into the result.
        extra = {'author': 'Pelado Nerd', 'author_id': 'UC123'}
        result = extract_item_info(SAMPLE_SHORT, extra)
        assert result['author'] == 'Pelado Nerd'
        assert result['author_id'] == 'UC123'
# --- extract_items with shorts API response structure ---
class TestExtractItemsShorts:
    """Test that extract_items handles the reloadContinuationItemsCommand format."""

    def _make_response(self, items):
        """Build a minimal browse response: a chip-bar action followed by a
        reload action whose continuation items wrap each short in a
        richItemRenderer, mirroring the shorts-tab response shape."""
        return {
            'onResponseReceivedActions': [
                {'reloadContinuationItemsCommand': {
                    'continuationItems': [{'chipBarViewModel': {}}]
                }},
                {'reloadContinuationItemsCommand': {
                    'continuationItems': [
                        {'richItemRenderer': {'content': item}}
                        for item in items
                    ]
                }},
            ]
        }

    def test_extracts_shorts_from_response(self):
        response = self._make_response([
            SAMPLE_SHORT['shortsLockupViewModel'],
        ])
        # Must parse this response shape without raising.
        items, ctoken = extract_items(response)
        # The previous assertion (len(items) >= 0) was vacuously true and
        # could never fail.  Assert something falsifiable instead: a list
        # comes back and every extracted entry is a renderer dict.
        assert isinstance(items, list)
        assert all(isinstance(entry, dict) for entry in items)

View File

@@ -36,64 +36,41 @@ generic_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=ST1Ti53r4fU'),)
# FIXED 2026: YouTube changed continuation token structure (from Invidious commit a9f8127) # FIXED 2026: YouTube changed continuation token structure (from Invidious commit a9f8127)
# Sort values for YouTube API (from Invidious): 2=popular, 4=newest, 5=oldest # Sort values for YouTube API (from Invidious): 2=popular, 4=newest, 5=oldest
def channel_ctoken_v5(channel_id, page, sort, tab, view=1): def channel_ctoken_v5(channel_id, page, sort, tab, view=1):
# Map sort values to YouTube API values (Invidious values) # Tab-specific protobuf field numbers (from Invidious source)
# Input: sort=3 (newest), sort=4 (newest no shorts) # Each tab uses different field numbers in the protobuf structure:
# YouTube expects: 4=newest # videos: 110 -> 3 -> 15 -> { 2:{1:UUID}, 4:sort, 8:{1:UUID, 3:sort} }
sort_mapping = {'1': 2, '2': 5, '3': 4, '4': 4} # 4 is newest without shorts # shorts: 110 -> 3 -> 10 -> { 2:{1:UUID}, 4:sort, 7:{1:UUID, 3:sort} }
new_sort = sort_mapping.get(sort, 4) # streams: 110 -> 3 -> 14 -> { 2:{1:UUID}, 5:sort, 8:{1:UUID, 3:sort} }
tab_config = {
'videos': {'tab_field': 15, 'sort_field': 4, 'embedded_field': 8},
'shorts': {'tab_field': 10, 'sort_field': 4, 'embedded_field': 7},
'streams': {'tab_field': 14, 'sort_field': 5, 'embedded_field': 8},
}
config = tab_config.get(tab, tab_config['videos'])
tab_field = config['tab_field']
sort_field = config['sort_field']
embedded_field = config['embedded_field']
offset = 30*(int(page) - 1) # Map sort values to YouTube API values
if tab == 'streams':
sort_mapping = {'1': 14, '2': 13, '3': 12, '4': 12}
else:
sort_mapping = {'1': 2, '2': 5, '3': 4, '4': 4}
new_sort = sort_mapping.get(sort, sort_mapping['3'])
# Build continuation token using Invidious structure # UUID placeholder (field 1)
# The structure is: base64(protobuf({ uuid_str = "00000000-0000-0000-0000-000000000000"
# 80226972: {
# 2: channel_id,
# 3: base64(protobuf({
# 110: {
# 3: {
# tab: {
# 1: {
# 1: base64(protobuf({
# 1: base64(protobuf({
# 2: "ST:" + base64(offset_varint)
# }))
# }))
# },
# 2: base64(protobuf({1: UUID}))
# 4: sort_value
# 8: base64(protobuf({
# 1: UUID
# 3: sort_value
# }))
# }
# }
# }
# }))
# }
# }))
# UUID placeholder # Build the tab-level object matching Invidious structure exactly:
uuid_proto = proto.string(1, "00000000-0000-0000-0000-000000000000") # { 2: embedded{1: UUID}, sort_field: sort_val, embedded_field: embedded{1: UUID, 3: sort_val} }
tab_content = (
# Offset encoding proto.string(2, proto.string(1, uuid_str))
offset_varint = proto.uint(1, offset) + proto.uint(sort_field, new_sort)
offset_encoded = proto.string(2, proto.unpadded_b64encode(offset_varint)) + proto.string(embedded_field,
offset_wrapper = proto.string(1, proto.unpadded_b64encode(offset_encoded)) proto.string(1, uuid_str) + proto.uint(3, new_sort))
offset_base = proto.string(1, proto.unpadded_b64encode(offset_wrapper)) )
# Sort value varint
sort_varint = proto.uint(4, new_sort)
# Embedded message with UUID and sort
embedded_inner = uuid_proto + proto.uint(3, new_sort)
embedded_encoded = proto.string(8, proto.unpadded_b64encode(embedded_inner))
# Combine: uuid_wrapper + sort_varint + embedded
tab_inner_content = offset_base + uuid_proto + sort_varint + embedded_encoded
tab_inner = proto.string(1, proto.unpadded_b64encode(tab_inner_content))
tab_wrapper = proto.string(tab, tab_inner)
tab_wrapper = proto.string(tab_field, tab_content)
inner_container = proto.string(3, tab_wrapper) inner_container = proto.string(3, tab_wrapper)
outer_container = proto.string(110, inner_container) outer_container = proto.string(110, inner_container)
@@ -346,11 +323,10 @@ def get_channel_id(base_url):
metadata_cache = cachetools.LRUCache(128) metadata_cache = cachetools.LRUCache(128)
@cachetools.cached(metadata_cache) @cachetools.cached(metadata_cache)
def get_metadata(channel_id): def get_metadata(channel_id):
base_url = 'https://www.youtube.com/channel/' + channel_id # Use youtubei browse API to get channel metadata
polymer_json = util.fetch_url(base_url + '/about?pbj=1', polymer_json = util.call_youtube_api('web', 'browse', {
headers_desktop, 'browseId': channel_id,
debug_name='gen_channel_about', })
report_text='Retrieved channel metadata')
info = yt_data_extract.extract_channel_info(json.loads(polymer_json), info = yt_data_extract.extract_channel_info(json.loads(polymer_json),
'about', 'about',
continuation=False) continuation=False)
@@ -508,28 +484,35 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
# Use the regular channel API # Use the regular channel API
if tab in ('shorts', 'streams') or (tab=='videos' and try_channel_api): if tab in ('shorts', 'streams') or (tab=='videos' and try_channel_api):
if channel_id: if not channel_id:
num_videos_call = (get_number_of_videos_channel, channel_id) channel_id = get_channel_id(base_url)
else:
num_videos_call = (get_number_of_videos_general, base_url)
# For page 1, use the first-page method which won't break # Use youtubei browse API with continuation token for all pages
# Pass sort parameter directly (2=oldest, 3=newest, etc.) page_call = (get_channel_tab, channel_id, str(page_number), sort,
if page_number == 1: tab, int(view))
# Always use first-page method for page 1 with sort parameter continuation = True
page_call = (get_channel_first_page, base_url, tab, None, sort)
else:
# For page 2+, we can't paginate without continuation tokens
# This is a YouTube limitation, not our bug
flask.abort(404, 'Pagination not available for this sort option. YouTube removed this feature.')
tasks = ( if tab == 'videos':
gevent.spawn(*num_videos_call), # Only need video count for the videos tab
gevent.spawn(*page_call), if channel_id:
) num_videos_call = (get_number_of_videos_channel, channel_id)
gevent.joinall(tasks) else:
util.check_gevent_exceptions(*tasks) num_videos_call = (get_number_of_videos_general, base_url)
number_of_videos, polymer_json = tasks[0].value, tasks[1].value tasks = (
gevent.spawn(*num_videos_call),
gevent.spawn(*page_call),
)
gevent.joinall(tasks)
util.check_gevent_exceptions(*tasks)
number_of_videos, polymer_json = tasks[0].value, tasks[1].value
else:
# For shorts/streams, item count is used instead
polymer_json = gevent.spawn(*page_call)
polymer_json.join()
if polymer_json.exception:
raise polymer_json.exception
polymer_json = polymer_json.value
number_of_videos = 0 # will be replaced by actual item count later
elif tab == 'about': elif tab == 'about':
# polymer_json = util.fetch_url(base_url + '/about?pbj=1', headers_desktop, debug_name='gen_channel_about') # polymer_json = util.fetch_url(base_url + '/about?pbj=1', headers_desktop, debug_name='gen_channel_about')
@@ -577,7 +560,8 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
channel_id = info['channel_id'] channel_id = info['channel_id']
# Will have microformat present, cache metadata while we have it # Will have microformat present, cache metadata while we have it
if channel_id and default_params and tab not in ('videos', 'about'): if (channel_id and default_params and tab not in ('videos', 'about')
and info.get('channel_name') is not None):
metadata = extract_metadata_for_caching(info) metadata = extract_metadata_for_caching(info)
set_cached_metadata(channel_id, metadata) set_cached_metadata(channel_id, metadata)
# Otherwise, populate with our (hopefully cached) metadata # Otherwise, populate with our (hopefully cached) metadata
@@ -595,8 +579,12 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
item.update(additional_info) item.update(additional_info)
if tab in ('videos', 'shorts', 'streams'): if tab in ('videos', 'shorts', 'streams'):
if tab in ('shorts', 'streams'):
# For shorts/streams, use the actual item count since
# get_number_of_videos_channel counts regular uploads only
number_of_videos = len(info.get('items', []))
info['number_of_videos'] = number_of_videos info['number_of_videos'] = number_of_videos
info['number_of_pages'] = math.ceil(number_of_videos/page_size) info['number_of_pages'] = math.ceil(number_of_videos/page_size) if number_of_videos else 1
info['header_playlist_names'] = local_playlist.get_playlist_names() info['header_playlist_names'] = local_playlist.get_playlist_names()
if tab in ('videos', 'shorts', 'streams', 'playlists'): if tab in ('videos', 'shorts', 'streams', 'playlists'):
info['current_sort'] = sort info['current_sort'] = sort

View File

@@ -837,9 +837,12 @@ INNERTUBE_CLIENTS = {
'hl': 'en', 'hl': 'en',
'gl': 'US', 'gl': 'US',
'clientName': 'IOS', 'clientName': 'IOS',
'clientVersion': '19.09.3', 'clientVersion': '21.03.2',
'deviceModel': 'iPhone14,3', 'deviceMake': 'Apple',
'userAgent': 'com.google.ios.youtube/19.09.3 (iPhone14,3; U; CPU iOS 15_6 like Mac OS X)' 'deviceModel': 'iPhone16,2',
'osName': 'iPhone',
'osVersion': '18.7.2.22H124',
'userAgent': 'com.google.ios.youtube/21.03.2 (iPhone16,2; U; CPU iOS 18_7_2 like Mac OS X)'
} }
}, },
'INNERTUBE_CONTEXT_CLIENT_NAME': 5, 'INNERTUBE_CONTEXT_CLIENT_NAME': 5,

View File

@@ -1,3 +1,3 @@
from __future__ import unicode_literals from __future__ import unicode_literals
__version__ = 'v0.4.4' __version__ = 'v0.4.5'

View File

@@ -332,6 +332,84 @@ def extract_lockup_view_model_info(item, additional_info={}):
return info return info
def _format_shorts_view_count(view_text):
    """Convert a spelled-out approximate count to a compact display form.

    "7.1 thousand" -> "7.1 K", "1.2 million" -> "1.2 M",
    "3 billion" -> "3 B"; plain numbers get comma grouping ("1234" -> "1,234").
    Falls back to extract_approx_int when no number is found.
    """
    suffix_map = {'thousand': 'K', 'million': 'M', 'billion': 'B'}
    suffix_match = re.search(r'([\d,.]+)\s*(thousand|million|billion)?',
                             view_text, re.IGNORECASE)
    if not suffix_match:
        return extract_approx_int(view_text)
    num = suffix_match.group(1)
    word = suffix_match.group(2)
    if word:
        return num + ' ' + suffix_map[word.lower()]
    digits = num.replace(',', '')
    if digits.isdigit():
        return '{:,}'.format(int(digits))
    return num


def extract_shorts_lockup_view_model_info(item, additional_info=None):
    """Extract info from shortsLockupViewModel format (YouTube Shorts).

    Parameters:
        item: the shortsLockupViewModel dict from the browse API response.
        additional_info: optional dict of extra fields (e.g. author data)
            merged into the result; None means no extras.  (Was a mutable
            default ``{}``; changed to the None sentinel — same behavior.)

    Returns an info dict shaped like other extract_item_info results.
    """
    info = {'error': None, 'type': 'video'}

    # Video ID from reelWatchEndpoint, falling back to the entityId suffix
    info['id'] = deep_get(item,
        'onTap', 'innertubeCommand', 'reelWatchEndpoint', 'videoId')
    if not info['id']:
        entity_id = item.get('entityId', '')
        if entity_id.startswith('shorts-shelf-item-'):
            info['id'] = entity_id[len('shorts-shelf-item-'):]

    # Thumbnail
    info['thumbnail'] = normalize_url(deep_get(item,
        'onTap', 'innertubeCommand', 'reelWatchEndpoint',
        'thumbnail', 'thumbnails', 0, 'url'))

    # Parse title and views from accessibilityText
    # Format: "Title, N views - play Short"
    acc_text = item.get('accessibilityText', '')
    info['title'] = ''
    info['view_count'] = None
    info['approx_view_count'] = None
    if acc_text:
        # Remove trailing " - play Short"
        cleaned = re.sub(r'\s*-\s*play Short$', '', acc_text)
        # Split on last comma+views pattern to separate title from view count
        match = re.match(
            r'^(.*?),\s*([\d,.]+\s*(?:thousand|million|billion|)\s*views?)$',
            cleaned, re.IGNORECASE)
        if match:
            info['title'] = match.group(1).strip()
            view_text = match.group(2)
            info['view_count'] = extract_int(view_text)
            info['approx_view_count'] = _format_shorts_view_count(view_text)
        else:
            # Fallback: try "N views" at end
            match2 = re.match(r'^(.*?),\s*(.+views?)$', cleaned, re.IGNORECASE)
            if match2:
                info['title'] = match2.group(1).strip()
                info['approx_view_count'] = extract_approx_int(match2.group(2))
            else:
                info['title'] = cleaned

    # Overlay text (usually has the title too)
    overlay_metadata = deep_get(item, 'overlayMetadata',
        'secondaryText', 'content')
    if overlay_metadata and not info['approx_view_count']:
        info['approx_view_count'] = extract_approx_int(overlay_metadata)
    primary_text = deep_get(item, 'overlayMetadata',
        'primaryText', 'content')
    if primary_text and not info['title']:
        info['title'] = primary_text

    # Fields that shorts lockups never carry; kept for a uniform item shape
    info['duration'] = ''
    info['time_published'] = None
    info['description'] = None
    info['badges'] = []
    info['author'] = None
    info['author_id'] = None
    info['author_url'] = None
    info['index'] = None

    if additional_info:
        info.update(additional_info)
    return info
def extract_item_info(item, additional_info={}): def extract_item_info(item, additional_info={}):
if not item: if not item:
return {'error': 'No item given'} return {'error': 'No item given'}
@@ -353,6 +431,10 @@ def extract_item_info(item, additional_info={}):
if type == 'lockupViewModel': if type == 'lockupViewModel':
return extract_lockup_view_model_info(item, additional_info) return extract_lockup_view_model_info(item, additional_info)
# Handle shortsLockupViewModel format (YouTube Shorts)
if type == 'shortsLockupViewModel':
return extract_shorts_lockup_view_model_info(item, additional_info)
# type looks like e.g. 'compactVideoRenderer' or 'gridVideoRenderer' # type looks like e.g. 'compactVideoRenderer' or 'gridVideoRenderer'
# camelCase split, https://stackoverflow.com/a/37697078 # camelCase split, https://stackoverflow.com/a/37697078
type_parts = [s.lower() for s in re.sub(r'([A-Z][a-z]+)', r' \1', type).split()] type_parts = [s.lower() for s in re.sub(r'([A-Z][a-z]+)', r' \1', type).split()]
@@ -561,6 +643,7 @@ _item_types = {
# New viewModel format (YouTube 2024+) # New viewModel format (YouTube 2024+)
'lockupViewModel', 'lockupViewModel',
'shortsLockupViewModel',
} }
def _traverse_browse_renderer(renderer): def _traverse_browse_renderer(renderer):