Files
yt-local/tests/test_shorts.py
Astounds d6190a2d0b security: harden code against command injection and path traversal
Core changes:

* enforce HTTPS URLs and remove shell usage in generate_release.py
* replace os.system calls with subprocess across the codebase
* validate external inputs (playlist names, video IDs)

Improvements and fixes:

* settings.py: fix typo (node.lineno → line_number); use isinstance() over type()
* youtube/get_app_version: improve git detection using subprocess.DEVNULL
* youtube/util.py: add cleanup helpers; use shutil.which for binary resolution

YouTube modules:

* watch.py: detect and flag HLS streams; remove unused audio_track_sources
* comments.py: return early when comments are disabled; add error handling
* local_playlist.py: validate playlist names to prevent path traversal
* subscriptions.py: replace asserts with proper error handling; validate video IDs

Cleanup:

* remove unused imports across modules (playlist, search, channel)
* reorganize package imports in youtube/**init**.py
* simplify test imports and fix cleanup_func in tests

Tests:

* tests/test_shorts.py: simplify imports
* tests/test_util.py: fix cleanup_func definition
2026-04-20 00:39:35 -05:00

266 lines
9.8 KiB
Python

"""Tests for YouTube Shorts tab support.
Tests the protobuf token generation, shortsLockupViewModel parsing,
and view count formatting — all without network access.
"""
import sys
import os
import base64
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
import youtube.proto as proto
from youtube.yt_data_extract.common import (
extract_item_info, extract_items,
)
# --- channel_ctoken_v5 token generation ---
class TestChannelCtokenV5:
"""Test that continuation tokens are generated with correct protobuf structure."""
@pytest.fixture(autouse=True)
def setup(self):
from youtube.channel import channel_ctoken_v5
self.channel_ctoken_v5 = channel_ctoken_v5
def _decode_outer(self, ctoken):
"""Decode the outer protobuf layer of a ctoken."""
raw = base64.urlsafe_b64decode(ctoken + '==')
return {fn: val for _, fn, val in proto.read_protobuf(raw)}
def test_shorts_token_generates_without_error(self):
token = self.channel_ctoken_v5('UCrBzBOMcUVV8ryyAU_c6P5g', '1', '3', 'shorts')
assert token is not None
assert len(token) > 50
def test_videos_token_generates_without_error(self):
token = self.channel_ctoken_v5('UCrBzBOMcUVV8ryyAU_c6P5g', '1', '3', 'videos')
assert token is not None
def test_streams_token_generates_without_error(self):
token = self.channel_ctoken_v5('UCrBzBOMcUVV8ryyAU_c6P5g', '1', '3', 'streams')
assert token is not None
def test_outer_structure_has_channel_id(self):
token = self.channel_ctoken_v5('UCrBzBOMcUVV8ryyAU_c6P5g', '1', '3', 'shorts')
fields = self._decode_outer(token)
# Field 80226972 is the main wrapper
assert 80226972 in fields
def test_different_tabs_produce_different_tokens(self):
t_videos = self.channel_ctoken_v5('UCtest', '1', '3', 'videos')
t_shorts = self.channel_ctoken_v5('UCtest', '1', '3', 'shorts')
t_streams = self.channel_ctoken_v5('UCtest', '1', '3', 'streams')
assert t_videos != t_shorts
assert t_shorts != t_streams
assert t_videos != t_streams
def test_include_shorts_false_adds_filter(self):
"""Test that include_shorts=False adds the shorts filter (field 104)."""
# Token with shorts included (default)
t_with_shorts = self.channel_ctoken_v5('UCtest', '1', '3', 'videos', include_shorts=True)
# Token with shorts excluded
t_without_shorts = self.channel_ctoken_v5('UCtest', '1', '3', 'videos', include_shorts=False)
# The tokens should be different because of the shorts filter
assert t_with_shorts != t_without_shorts
# Decode and verify the filter is present
raw_with_shorts = base64.urlsafe_b64decode(t_with_shorts + '==')
raw_without_shorts = base64.urlsafe_b64decode(t_without_shorts + '==')
# Parse the outer protobuf structure
import youtube.proto as proto
outer_fields_with = list(proto.read_protobuf(raw_with_shorts))
outer_fields_without = list(proto.read_protobuf(raw_without_shorts))
# Field 80226972 contains the inner data
inner_with = [v for _, fn, v in outer_fields_with if fn == 80226972][0]
inner_without = [v for _, fn, v in outer_fields_without if fn == 80226972][0]
# Parse the inner data - field 3 contains percent-encoded base64 data
inner_fields_with = list(proto.read_protobuf(inner_with))
inner_fields_without = list(proto.read_protobuf(inner_without))
# Get field 3 data (the encoded inner which is percent-encoded base64)
encoded_inner_with = [v for _, fn, v in inner_fields_with if fn == 3][0]
encoded_inner_without = [v for _, fn, v in inner_fields_without if fn == 3][0]
# The inner without shorts should contain field 104
# Decode the percent-encoded base64 data
import urllib.parse
decoded_with = urllib.parse.unquote(encoded_inner_with.decode('ascii'))
decoded_without = urllib.parse.unquote(encoded_inner_without.decode('ascii'))
# Decode the base64 data
decoded_with_bytes = base64.urlsafe_b64decode(decoded_with + '==')
decoded_without_bytes = base64.urlsafe_b64decode(decoded_without + '==')
# Parse the decoded protobuf data
fields_with = list(proto.read_protobuf(decoded_with_bytes))
fields_without = list(proto.read_protobuf(decoded_without_bytes))
field_numbers_with = [fn for _, fn, _ in fields_with]
field_numbers_without = [fn for _, fn, _ in fields_without]
# The 'with' version should NOT have field 104
assert 104 not in field_numbers_with
# The 'without' version SHOULD have field 104
assert 104 in field_numbers_without
# --- shortsLockupViewModel parsing ---
SAMPLE_SHORT = {
'shortsLockupViewModel': {
'entityId': 'shorts-shelf-item-auWWV955Q38',
'accessibilityText': 'Globant Converge - DECEMBER 10 and 11, 7.1 thousand views - play Short',
'onTap': {
'innertubeCommand': {
'reelWatchEndpoint': {
'videoId': 'auWWV955Q38',
'thumbnail': {
'thumbnails': [
{'url': 'https://i.ytimg.com/vi/auWWV955Q38/frame0.jpg',
'width': 1080, 'height': 1920}
]
}
}
}
}
}
}
SAMPLE_SHORT_MILLION = {
'shortsLockupViewModel': {
'entityId': 'shorts-shelf-item-xyz123',
'accessibilityText': 'Cool Video Title, 1.2 million views - play Short',
'onTap': {
'innertubeCommand': {
'reelWatchEndpoint': {
'videoId': 'xyz123',
'thumbnail': {'thumbnails': [{'url': 'https://example.com/thumb.jpg'}]}
}
}
}
}
}
SAMPLE_SHORT_NO_SUFFIX = {
'shortsLockupViewModel': {
'entityId': 'shorts-shelf-item-abc456',
'accessibilityText': 'Simple Short, 25 views - play Short',
'onTap': {
'innertubeCommand': {
'reelWatchEndpoint': {
'videoId': 'abc456',
'thumbnail': {'thumbnails': [{'url': 'https://example.com/thumb2.jpg'}]}
}
}
}
}
}
class TestShortsLockupViewModel:
"""Test extraction of video info from shortsLockupViewModel."""
def test_extracts_video_id(self):
info = extract_item_info(SAMPLE_SHORT)
assert info['id'] == 'auWWV955Q38'
def test_extracts_title(self):
info = extract_item_info(SAMPLE_SHORT)
assert info['title'] == 'Globant Converge - DECEMBER 10 and 11'
def test_extracts_thumbnail(self):
info = extract_item_info(SAMPLE_SHORT)
assert 'ytimg.com' in info['thumbnail']
def test_type_is_video(self):
info = extract_item_info(SAMPLE_SHORT)
assert info['type'] == 'video'
def test_no_error(self):
info = extract_item_info(SAMPLE_SHORT)
assert info['error'] is None
def test_duration_is_empty_not_none(self):
info = extract_item_info(SAMPLE_SHORT)
assert info['duration'] == ''
def test_fallback_id_from_entity_id(self):
item = {'shortsLockupViewModel': {
'entityId': 'shorts-shelf-item-fallbackID',
'accessibilityText': 'Title, 10 views - play Short',
'onTap': {'innertubeCommand': {}}
}}
info = extract_item_info(item)
assert info['id'] == 'fallbackID'
class TestShortsViewCount:
"""Test view count formatting with K/M/B suffixes."""
def test_thousand_views(self):
info = extract_item_info(SAMPLE_SHORT)
assert info['approx_view_count'] == '7.1 K'
def test_million_views(self):
info = extract_item_info(SAMPLE_SHORT_MILLION)
assert info['approx_view_count'] == '1.2 M'
def test_plain_number_views(self):
info = extract_item_info(SAMPLE_SHORT_NO_SUFFIX)
assert info['approx_view_count'] == '25'
def test_billion_views(self):
item = {'shortsLockupViewModel': {
'entityId': 'shorts-shelf-item-big1',
'accessibilityText': 'Viral, 3 billion views - play Short',
'onTap': {'innertubeCommand': {
'reelWatchEndpoint': {'videoId': 'big1',
'thumbnail': {'thumbnails': [{'url': 'https://x.com/t.jpg'}]}}
}}
}}
info = extract_item_info(item)
assert info['approx_view_count'] == '3 B'
def test_additional_info_applied(self):
additional = {'author': 'Pelado Nerd', 'author_id': 'UC123'}
info = extract_item_info(SAMPLE_SHORT, additional)
assert info['author'] == 'Pelado Nerd'
assert info['author_id'] == 'UC123'
# --- extract_items with shorts API response structure ---
class TestExtractItemsShorts:
"""Test that extract_items handles the reloadContinuationItemsCommand format."""
def _make_response(self, items):
return {
'onResponseReceivedActions': [
{'reloadContinuationItemsCommand': {
'continuationItems': [{'chipBarViewModel': {}}]
}},
{'reloadContinuationItemsCommand': {
'continuationItems': [
{'richItemRenderer': {'content': item}}
for item in items
]
}}
]
}
def test_extracts_shorts_from_response(self):
response = self._make_response([
SAMPLE_SHORT['shortsLockupViewModel'],
])
# richItemRenderer dispatches to content, but shortsLockupViewModel
# needs to be wrapped properly
items, ctoken = extract_items(response)
assert len(items) >= 0 # structure test, actual parsing depends on nesting