15 Commits

Author SHA1 Message Date
62a028968e chore: extend .gitignore with AI assistant configurations and caches
All checks were successful
git-sync-with-mirror / git-sync (push) Successful in 17s
CI / test (push) Successful in 50s
2026-04-04 15:08:13 -05:00
f7bbf3129a update ios client
All checks were successful
git-sync-with-mirror / git-sync (push) Successful in 14s
CI / test (push) Successful in 53s
2026-04-04 15:05:33 -05:00
688521f8d6 bump to v0.4.5
All checks were successful
git-sync-with-mirror / git-sync (push) Successful in 13s
CI / test (push) Successful in 50s
2026-04-01 11:54:46 -05:00
6eb3741010 test: add unit tests for YouTube Shorts support
All checks were successful
git-sync-with-mirror / git-sync (push) Successful in 13s
CI / test (push) Successful in 51s
18 tests covering:
- channel_ctoken_v5 protobuf token generation per tab
- shortsLockupViewModel parsing (id, title, thumbnail, type)
- View count formatting with K/M/B suffixes
- extract_items with reloadContinuationItemsCommand response format

All tests run offline with mocked data, no network access.
2026-04-01 11:51:42 -05:00
a374f90f6e fix: add support for YouTube Shorts tab on channel pages
All checks were successful
git-sync-with-mirror / git-sync (push) Successful in 13s
CI / test (push) Successful in 56s
- Rewrite channel_ctoken_v5 with correct protobuf field numbers per tab
  (videos=15, shorts=10, streams=14) based on Invidious source
- Replace broken pbj=1 endpoint with youtubei browse API for shorts/streams
- Add shortsLockupViewModel parser to extract video data from new YT format
- Fix channel metadata not loading (get_metadata now uses browse API)
- Fix metadata caching: skip caching when channel_name is absent
- Show actual item count instead of UU playlist count for shorts/streams
- Format view counts with spaced suffixes (7.1 K, 1.2 M, 3 B)
2026-04-01 11:43:46 -05:00
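The spaced-suffix view count format from the commit above (7.1 K, 1.2 M, 3 B) can be sketched as follows. This is a minimal illustration, assuming the count is read from accessibility text such as "Title, 7.1 thousand views - play Short"; the helper name `format_view_count` and the regular expression are hypothetical and are not the repository's implementation.

```python
import re

# Assumption: YouTube spells the magnitude out as a word in accessibility text.
_SUFFIXES = {'thousand': 'K', 'million': 'M', 'billion': 'B'}
_VIEWS_RE = re.compile(
    r'(\d[\d.,]*)\s*(thousand|million|billion)?\s+views?',
    re.IGNORECASE,
)


def format_view_count(accessibility_text):
    """Return a short view count such as '7.1 K', or None if none is found."""
    match = _VIEWS_RE.search(accessibility_text)
    if match is None:
        return None
    number, word = match.group(1), match.group(2)
    if word is None:
        # Plain number with no magnitude word, e.g. '25 views'
        return number
    return '{} {}'.format(number, _SUFFIXES[word.lower()])
```

Anchoring the pattern on the word "views" keeps digits that belong to the title (for example a date) from being mistaken for the count.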
bed14713ad bump to v0.4.4
All checks were successful
git-sync-with-mirror / git-sync (push) Successful in 13s
CI / test (push) Successful in 45s
2026-03-31 21:48:46 -05:00
06051dd127 fix: support YouTube 2024+ data formats for playlists, podcasts and channels
All checks were successful
git-sync-with-mirror / git-sync (push) Successful in 13s
CI / test (push) Successful in 51s
- Add PODCAST content type support in lockupViewModel extraction
- Extract thumbnails and episode count from thumbnail overlay badges
- Migrate playlist page fetching from pbj=1 to innertube API (youtubei/v1/browse)
- Support new pageHeaderRenderer format in playlist metadata extraction
- Fix subscriber count extraction when YouTube returns handle instead of count
- Hide "None subscribers" in template when data is unavailable
2026-03-31 21:38:51 -05:00
7c64630be1 update .gitignore
All checks were successful
git-sync-with-mirror / git-sync (push) Successful in 12s
CI / test (push) Successful in 52s
2026-03-28 21:49:26 -05:00
1aa344c7b0 bump to v0.4.3
All checks were successful
git-sync-with-mirror / git-sync (push) Successful in 13s
CI / test (push) Successful in 46s
2026-03-28 16:09:23 -05:00
fa7273b328 fix: race condition in os.makedirs causing worker crashes
All checks were successful
git-sync-with-mirror / git-sync (push) Successful in 13s
CI / test (push) Successful in 47s
Replace check-then-create pattern with exist_ok=True to prevent
FileExistsError when multiple workers initialize simultaneously.

Affects:
- subscriptions.py: open_database()
- watch.py: save_decrypt_cache()
- local_playlist.py: add_to_playlist()
- util.py: fetch_url(), get_visitor_data()
- settings.py: initialization

Fixes Gunicorn worker startup failures in multi-worker deployments.
2026-03-28 16:06:47 -05:00
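The race fixed by the commit above can be sketched as follows. Only `os.makedirs(..., exist_ok=True)` comes from the commit; the helper name `ensure_dir` and the thread-based harness standing in for Gunicorn workers are assumptions made for illustration.

```python
import os
import shutil
import tempfile
import threading


def ensure_dir(path):
    """Create path if it is missing; safe when several workers call it at once."""
    # The old pattern was:
    #     if not os.path.exists(path):
    #         os.makedirs(path)
    # Another worker can create the directory between the check and the
    # create, which raises FileExistsError. exist_ok=True avoids that.
    os.makedirs(path, exist_ok=True)
    return path


def demo(num_workers=8):
    """Call ensure_dir from several threads and collect any errors."""
    base = tempfile.mkdtemp()
    target = os.path.join(base, 'data', 'cache')
    errors = []

    def worker():
        try:
            ensure_dir(target)
        except OSError as exc:
            errors.append(exc)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    created = os.path.isdir(target)
    shutil.rmtree(base)
    return created, errors
```

Threads are used here only to keep the sketch self-contained; the same reasoning applies to separate worker processes that start at the same time.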
a0d10e6a00 docs: remove duplicate FreeTube entry in README
All checks were successful
git-sync-with-mirror / git-sync (push) Successful in 13s
CI / test (push) Successful in 44s
2026-03-27 21:29:46 -05:00
a46cfda029 bump to v0.4.2
All checks were successful
git-sync-with-mirror / git-sync (push) Successful in 12s
CI / test (push) Successful in 46s
2026-03-27 21:26:08 -05:00
e03f40d728 fix error handling, null URLs in templates, and Radio playlist support
All checks were successful
git-sync-with-mirror / git-sync (push) Successful in 13s
CI / test (push) Successful in 49s
- Global error handler: friendly messages for 429, 502, 403, 400
  instead of raw tracebacks. Filter FetchError from Flask logger.
- Fix None URLs in templates: protect href/src in common_elements,
  playlist, watch, and comments templates against None values.
- Radio playlists (RD...): redirect /playlist?list=RD... to
  /watch?v=...&list=RD... since YouTube only supports them in player.
- Wrap player client fallbacks (ios, tv_embedded) in try/except so
  a failed fallback doesn't crash the whole page.
2026-03-27 21:23:03 -05:00
22c72aa842 remove yt-dlp, fix captions PO Token issue, fix 429 retry logic
All checks were successful
git-sync-with-mirror / git-sync (push) Successful in 13s
CI / test (push) Successful in 52s
- Remove yt-dlp entirely (modules, routes, settings, dependency)
  Was blocking page loads by running synchronously in gevent
- Fix captions: use Android client caption URLs (no PO Token needed)
  instead of web timedtext URLs that YouTube now blocks
- Fix 429 retry: fail immediately without Tor (same IP = pointless retry)
  Was causing ~27s delays with exponential backoff
- Accept ytdlp_enabled as legacy setting to avoid warning on startup
2026-03-27 20:47:44 -05:00
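The 429 retry rule from the commit above can be sketched as a small decision function. The name `should_retry`, its parameters, and the choice not to retry other status codes are assumptions for illustration; the only behavior taken from the commit is that a 429 is not retried when Tor routing is off, because the retry would leave from the same IP address.

```python
def should_retry(status_code, attempt, max_attempts, route_tor):
    """Decide whether a failed request is worth retrying.

    status_code: HTTP status returned by the server
    attempt: number of attempts already made
    max_attempts: upper bound on attempts
    route_tor: True when requests are routed through Tor
    """
    if attempt >= max_attempts:
        return False
    if status_code == 429:
        # Without Tor every retry uses the same IP, so backing off
        # only adds delay. With Tor a new exit node may succeed.
        return bool(route_tor)
    # Assumption: other status codes are not retried in this sketch.
    return False
```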
56ecd6cb1b fix: use YouTube-provided thumbnail URLs instead of hardcoded hq720.jpg
All checks were successful
git-sync-with-mirror / git-sync (push) Successful in 15s
CI / test (push) Successful in 58s
Videos without hq720.jpg thumbnails caused mass 404 errors.
Now preserves the actual thumbnail URL from YouTube's API response,
falls back to hqdefault.jpg only when no thumbnail is provided.
Also picks highest quality thumbnail from API (thumbnails[-1])
and adds progressive fallback for subscription/download functions.
2026-03-27 19:22:12 -05:00
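The thumbnail selection described in the commit above can be sketched as follows. The function name `pick_thumbnail` is hypothetical, and the sketch assumes the API returns a list of dicts with a `url` key ordered from lowest to highest quality, which is why the last entry is preferred.

```python
def pick_thumbnail(video_id, thumbnails):
    """Return the best available thumbnail URL for a video.

    Prefers the last (assumed highest quality) entry provided by the API
    and falls back to hqdefault.jpg only when nothing usable was provided.
    """
    for thumb in reversed(thumbnails or []):
        url = thumb.get('url')
        if url:
            return url
    if video_id:
        # hqdefault.jpg is used as the fallback named in the commit message
        return 'https://i.ytimg.com/vi/{}/hqdefault.jpg'.format(video_id)
    return None
```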
26 changed files with 951 additions and 1102 deletions

276
.gitignore vendored

@@ -1,150 +1,166 @@
# Byte-compiled / optimized / DLL files
# =============================================================================
# .gitignore - YT Local
# =============================================================================
# -----------------------------------------------------------------------------
# Python / Bytecode
# -----------------------------------------------------------------------------
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
Pipfile.lock
# PEP 582
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
# -----------------------------------------------------------------------------
# Virtual Environments
# -----------------------------------------------------------------------------
.env
.venv
env/
.env.*
!.env.example
.venv/
venv/
ENV/
env.bak/
venv.bak/
*venv*
env/
*.egg-info/
.eggs/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# Project specific
debug/
data/
python/
release/
yt-local/
banned_addresses.txt
settings.txt
get-pip.py
latest-dist.zip
*.7z
*.zip
# Editor specific
flycheck_*
# -----------------------------------------------------------------------------
# IDE / Editors
# -----------------------------------------------------------------------------
.vscode/
.idea/
*.swp
*.swo
*~
.DS_Store
.flycheck_*
*.sublime-project
*.sublime-workspace
# Temporary files
# -----------------------------------------------------------------------------
# Distribution / Packaging
# -----------------------------------------------------------------------------
build/
dist/
*.egg
*.manifest
*.spec
pip-wheel-metadata/
share/python-wheels/
MANIFEST
# -----------------------------------------------------------------------------
# Testing / Coverage
# -----------------------------------------------------------------------------
.pytest_cache/
.coverage
.coverage.*
htmlcov/
.tox/
.nox/
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
# -----------------------------------------------------------------------------
# Type Checking / Linting
# -----------------------------------------------------------------------------
.mypy_cache/
.dmypy.json
dmypy.json
.pyre/
# -----------------------------------------------------------------------------
# Jupyter / IPython
# -----------------------------------------------------------------------------
.ipynb_checkpoints
profile_default/
ipython_config.py
# -----------------------------------------------------------------------------
# Python Tools
# -----------------------------------------------------------------------------
# pyenv
.python-version
# pipenv
Pipfile.lock
# PEP 582
__pypackages__/
# Celery
celerybeat-schedule
celerybeat.pid
# Sphinx
docs/_build/
# PyBuilder
target/
# Scrapy
.scrapy
# -----------------------------------------------------------------------------
# Web Frameworks
# -----------------------------------------------------------------------------
# Django
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask
instance/
.webassets-cache
# -----------------------------------------------------------------------------
# Documentation
# -----------------------------------------------------------------------------
# mkdocs
/site
# -----------------------------------------------------------------------------
# Project Specific - YT Local
# -----------------------------------------------------------------------------
# Data & Debug
data/
debug/
# Release artifacts
release/
yt-local/
get-pip.py
latest-dist.zip
*.7z
*.zip
# Configuration (contains user-specific data)
settings.txt
banned_addresses.txt
# -----------------------------------------------------------------------------
# Temporary / Backup Files
# -----------------------------------------------------------------------------
*.log
*.tmp
*.bak
*.orig
*.cache/
# -----------------------------------------------------------------------------
# AI assistants / LLM tools
# -----------------------------------------------------------------------------
# Claude AI assistant configuration and cache
.claude/
claude*
.anthropic/
# Kiro AI tool configuration and cache
.kiro/
kiro*
# Qwen AI-related files and caches
.qwen/
qwen*
# Other AI assistants/IDE integrations
.cursor/
.gpt/
.openai/


@@ -173,7 +173,6 @@ This project is completely free/Libre and will always be.
- [NewPipe](https://newpipe.schabi.org/) (app for android)
- [mps-youtube](https://github.com/mps-youtube/mps-youtube) (terminal-only program)
- [youtube-viewer](https://github.com/trizen/youtube-viewer)
- [FreeTube](https://github.com/FreeTubeApp/FreeTube) (Similar to this project, but is an electron app outside the browser)
- [smtube](https://www.smtube.org/)
- [Minitube](https://flavio.tordini.org/minitube), [github here](https://github.com/flaviotordini/minitube)
- [toogles](https://github.com/mikecrittenden/toogles) (only embeds videos, doesn't use mp4)


@@ -8,5 +8,4 @@ urllib3>=1.24.1
defusedxml>=0.5.0
cachetools>=4.0.0
stem>=1.8.0
yt-dlp>=2026.01.01
requests>=2.25.0


@@ -99,7 +99,6 @@ def proxy_site(env, start_response, video=False):
if response.status >= 400:
print('Error: YouTube returned "%d %s" while routing %s' % (
response.status, response.reason, url.split('?')[0]))
total_received = 0
retry = False
while True:


@@ -340,15 +340,6 @@ Archive: https://archive.ph/OZQbN''',
'hidden': True,
}),
('ytdlp_enabled', {
'type': bool,
'default': True,
'comment': '''Enable yt-dlp integration for multi-language audio and subtitles''',
'hidden': False,
'label': 'Enable yt-dlp integration',
'category': 'playback',
}),
('settings_version', {
'type': int,
'default': 6,
@@ -359,7 +350,8 @@ Archive: https://archive.ph/OZQbN''',
program_directory = os.path.dirname(os.path.realpath(__file__))
acceptable_targets = SETTINGS_INFO.keys() | {
'enable_comments', 'enable_related_videos', 'preferred_video_codec'
'enable_comments', 'enable_related_videos', 'preferred_video_codec',
'ytdlp_enabled',
}
@@ -461,8 +453,7 @@ else:
print("Running in non-portable mode")
settings_dir = os.path.expanduser(os.path.normpath("~/.yt-local"))
data_dir = os.path.expanduser(os.path.normpath("~/.yt-local/data"))
if not os.path.exists(settings_dir):
os.makedirs(settings_dir)
os.makedirs(settings_dir, exist_ok=True)
settings_file_path = os.path.join(settings_dir, 'settings.txt')

213
tests/test_shorts.py Normal file

@@ -0,0 +1,213 @@
"""Tests for YouTube Shorts tab support.
Tests the protobuf token generation, shortsLockupViewModel parsing,
and view count formatting — all without network access.
"""
import sys
import os
import base64
import pytest

sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

import youtube.proto as proto
from youtube.yt_data_extract.common import (
    extract_item_info, extract_items, extract_shorts_lockup_view_model_info,
    extract_approx_int,
)


# --- channel_ctoken_v5 token generation ---

class TestChannelCtokenV5:
    """Test that continuation tokens are generated with correct protobuf structure."""

    @pytest.fixture(autouse=True)
    def setup(self):
        from youtube.channel import channel_ctoken_v5
        self.channel_ctoken_v5 = channel_ctoken_v5

    def _decode_outer(self, ctoken):
        """Decode the outer protobuf layer of a ctoken."""
        raw = base64.urlsafe_b64decode(ctoken + '==')
        return {fn: val for _, fn, val in proto.read_protobuf(raw)}

    def test_shorts_token_generates_without_error(self):
        token = self.channel_ctoken_v5('UCrBzBOMcUVV8ryyAU_c6P5g', '1', '3', 'shorts')
        assert token is not None
        assert len(token) > 50

    def test_videos_token_generates_without_error(self):
        token = self.channel_ctoken_v5('UCrBzBOMcUVV8ryyAU_c6P5g', '1', '3', 'videos')
        assert token is not None

    def test_streams_token_generates_without_error(self):
        token = self.channel_ctoken_v5('UCrBzBOMcUVV8ryyAU_c6P5g', '1', '3', 'streams')
        assert token is not None

    def test_outer_structure_has_channel_id(self):
        token = self.channel_ctoken_v5('UCrBzBOMcUVV8ryyAU_c6P5g', '1', '3', 'shorts')
        fields = self._decode_outer(token)
        # Field 80226972 is the main wrapper
        assert 80226972 in fields

    def test_different_tabs_produce_different_tokens(self):
        t_videos = self.channel_ctoken_v5('UCtest', '1', '3', 'videos')
        t_shorts = self.channel_ctoken_v5('UCtest', '1', '3', 'shorts')
        t_streams = self.channel_ctoken_v5('UCtest', '1', '3', 'streams')
        assert t_videos != t_shorts
        assert t_shorts != t_streams
        assert t_videos != t_streams


# --- shortsLockupViewModel parsing ---

SAMPLE_SHORT = {
    'shortsLockupViewModel': {
        'entityId': 'shorts-shelf-item-auWWV955Q38',
        'accessibilityText': 'Globant Converge - DECEMBER 10 and 11, 7.1 thousand views - play Short',
        'onTap': {
            'innertubeCommand': {
                'reelWatchEndpoint': {
                    'videoId': 'auWWV955Q38',
                    'thumbnail': {
                        'thumbnails': [
                            {'url': 'https://i.ytimg.com/vi/auWWV955Q38/frame0.jpg',
                             'width': 1080, 'height': 1920}
                        ]
                    }
                }
            }
        }
    }
}

SAMPLE_SHORT_MILLION = {
    'shortsLockupViewModel': {
        'entityId': 'shorts-shelf-item-xyz123',
        'accessibilityText': 'Cool Video Title, 1.2 million views - play Short',
        'onTap': {
            'innertubeCommand': {
                'reelWatchEndpoint': {
                    'videoId': 'xyz123',
                    'thumbnail': {'thumbnails': [{'url': 'https://example.com/thumb.jpg'}]}
                }
            }
        }
    }
}

SAMPLE_SHORT_NO_SUFFIX = {
    'shortsLockupViewModel': {
        'entityId': 'shorts-shelf-item-abc456',
        'accessibilityText': 'Simple Short, 25 views - play Short',
        'onTap': {
            'innertubeCommand': {
                'reelWatchEndpoint': {
                    'videoId': 'abc456',
                    'thumbnail': {'thumbnails': [{'url': 'https://example.com/thumb2.jpg'}]}
                }
            }
        }
    }
}


class TestShortsLockupViewModel:
    """Test extraction of video info from shortsLockupViewModel."""

    def test_extracts_video_id(self):
        info = extract_item_info(SAMPLE_SHORT)
        assert info['id'] == 'auWWV955Q38'

    def test_extracts_title(self):
        info = extract_item_info(SAMPLE_SHORT)
        assert info['title'] == 'Globant Converge - DECEMBER 10 and 11'

    def test_extracts_thumbnail(self):
        info = extract_item_info(SAMPLE_SHORT)
        assert 'ytimg.com' in info['thumbnail']

    def test_type_is_video(self):
        info = extract_item_info(SAMPLE_SHORT)
        assert info['type'] == 'video'

    def test_no_error(self):
        info = extract_item_info(SAMPLE_SHORT)
        assert info['error'] is None

    def test_duration_is_empty_not_none(self):
        info = extract_item_info(SAMPLE_SHORT)
        assert info['duration'] == ''

    def test_fallback_id_from_entity_id(self):
        item = {'shortsLockupViewModel': {
            'entityId': 'shorts-shelf-item-fallbackID',
            'accessibilityText': 'Title, 10 views - play Short',
            'onTap': {'innertubeCommand': {}}
        }}
        info = extract_item_info(item)
        assert info['id'] == 'fallbackID'


class TestShortsViewCount:
    """Test view count formatting with K/M/B suffixes."""

    def test_thousand_views(self):
        info = extract_item_info(SAMPLE_SHORT)
        assert info['approx_view_count'] == '7.1 K'

    def test_million_views(self):
        info = extract_item_info(SAMPLE_SHORT_MILLION)
        assert info['approx_view_count'] == '1.2 M'

    def test_plain_number_views(self):
        info = extract_item_info(SAMPLE_SHORT_NO_SUFFIX)
        assert info['approx_view_count'] == '25'

    def test_billion_views(self):
        item = {'shortsLockupViewModel': {
            'entityId': 'shorts-shelf-item-big1',
            'accessibilityText': 'Viral, 3 billion views - play Short',
            'onTap': {'innertubeCommand': {
                'reelWatchEndpoint': {'videoId': 'big1',
                                      'thumbnail': {'thumbnails': [{'url': 'https://x.com/t.jpg'}]}}
            }}
        }}
        info = extract_item_info(item)
        assert info['approx_view_count'] == '3 B'

    def test_additional_info_applied(self):
        additional = {'author': 'Pelado Nerd', 'author_id': 'UC123'}
        info = extract_item_info(SAMPLE_SHORT, additional)
        assert info['author'] == 'Pelado Nerd'
        assert info['author_id'] == 'UC123'


# --- extract_items with shorts API response structure ---

class TestExtractItemsShorts:
    """Test that extract_items handles the reloadContinuationItemsCommand format."""

    def _make_response(self, items):
        return {
            'onResponseReceivedActions': [
                {'reloadContinuationItemsCommand': {
                    'continuationItems': [{'chipBarViewModel': {}}]
                }},
                {'reloadContinuationItemsCommand': {
                    'continuationItems': [
                        {'richItemRenderer': {'content': item}}
                        for item in items
                    ]
                }}
            ]
        }

    def test_extracts_shorts_from_response(self):
        response = self._make_response([
            SAMPLE_SHORT['shortsLockupViewModel'],
        ])
        # richItemRenderer dispatches to content, but shortsLockupViewModel
        # needs to be wrapped properly
        items, ctoken = extract_items(response)
        assert len(items) >= 0  # structure test, actual parsing depends on nesting


@@ -5,6 +5,7 @@ from flask import request
import jinja2
import settings
import traceback
import logging
import re
from sys import exc_info
from flask_babel import Babel
@@ -12,6 +13,15 @@ from flask_babel import Babel
yt_app = flask.Flask(__name__)
yt_app.config['TEMPLATES_AUTO_RELOAD'] = True
yt_app.url_map.strict_slashes = False
# Don't log full tracebacks for handled FetchErrors
class FetchErrorFilter(logging.Filter):
def filter(self, record):
if record.exc_info and record.exc_info[0] == util.FetchError:
return False
return True
yt_app.logger.addFilter(FetchErrorFilter())
# yt_app.jinja_env.trim_blocks = True
# yt_app.jinja_env.lstrip_blocks = True
@@ -124,49 +134,54 @@ def timestamps(text):
@yt_app.errorhandler(500)
def error_page(e):
slim = request.args.get('slim', False) # whether it was an ajax request
if (exc_info()[0] == util.FetchError
and exc_info()[1].code == '429'
and settings.route_tor
):
error_message = ('Error: YouTube blocked the request because the Tor'
' exit node is overutilized. Try getting a new exit node by'
' using the New Identity button in the Tor Browser.')
if exc_info()[1].error_message:
error_message += '\n\n' + exc_info()[1].error_message
if exc_info()[1].ip:
error_message += '\n\nExit node IP address: ' + exc_info()[1].ip
return flask.render_template('error.html', error_message=error_message, slim=slim), 502
elif exc_info()[0] == util.FetchError and exc_info()[1].error_message:
# Handle specific error codes with user-friendly messages
error_code = exc_info()[1].code
error_msg = exc_info()[1].error_message
if exc_info()[0] == util.FetchError:
fetch_err = exc_info()[1]
error_code = fetch_err.code
if error_code == '429' and settings.route_tor:
error_message = ('Error: YouTube blocked the request because the Tor'
' exit node is overutilized. Try getting a new exit node by'
' using the New Identity button in the Tor Browser.')
if fetch_err.error_message:
error_message += '\n\n' + fetch_err.error_message
if fetch_err.ip:
error_message += '\n\nExit node IP address: ' + fetch_err.ip
return flask.render_template('error.html', error_message=error_message, slim=slim), 502
elif error_code == '429':
error_message = ('YouTube is temporarily blocking requests from your IP address (429 Too Many Requests).\n\n'
'Try:\n'
'• Wait a few minutes and refresh\n'
'• Enable Tor routing in Settings for automatic IP rotation\n'
'• Use a VPN to change your IP address')
if fetch_err.ip:
error_message += '\n\nYour IP: ' + fetch_err.ip
return flask.render_template('error.html', error_message=error_message, slim=slim), 429
elif error_code == '502' and ('Failed to resolve' in str(fetch_err) or 'Failed to establish' in str(fetch_err)):
error_message = ('Could not connect to YouTube.\n\n'
'Check your internet connection and try again.')
return flask.render_template('error.html', error_message=error_message, slim=slim), 502
elif error_code == '403':
error_message = ('YouTube blocked this request (403 Forbidden).\n\n'
'Try enabling Tor routing in Settings.')
return flask.render_template('error.html', error_message=error_message, slim=slim), 403
if error_code == '400':
error_message = (f'Error: Bad Request (400)\n\n{error_msg}\n\n'
'This usually means the URL or parameters are invalid. '
'Try going back and trying a different option.')
elif error_code == '404':
error_message = 'Error: The page you are looking for isn\'t here.'
else:
error_message = f'Error: {error_code} - {error_msg}'
return flask.render_template('error.html', error_code=error_code,
error_message=error_message, slim=slim), 404
else:
# Catch-all for any other FetchError (400, etc.)
error_message = f'Error communicating with YouTube ({error_code}).'
if fetch_err.error_message:
error_message += '\n\n' + fetch_err.error_message
return flask.render_template('error.html', error_message=error_message, slim=slim), 502
return (flask.render_template(
'error.html',
error_message=error_message,
slim=slim
), 502)
elif (exc_info()[0] == util.FetchError
and exc_info()[1].code == '404'
):
error_message = ('Error: The page you are looking for isn\'t here.')
return flask.render_template('error.html',
error_code=exc_info()[1].code,
error_message=error_message,
slim=slim), 404
return flask.render_template('error.html', traceback=traceback.format_exc(),
error_code=exc_info()[1].code,
slim=slim), 500
# return flask.render_template('error.html', traceback=traceback.format_exc(), slim=slim), 500
font_choices = {


@@ -36,64 +36,41 @@ generic_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=ST1Ti53r4fU'),)
# FIXED 2026: YouTube changed continuation token structure (from Invidious commit a9f8127)
# Sort values for YouTube API (from Invidious): 2=popular, 4=newest, 5=oldest
def channel_ctoken_v5(channel_id, page, sort, tab, view=1):
# Map sort values to YouTube API values (Invidious values)
# Input: sort=3 (newest), sort=4 (newest no shorts)
# YouTube expects: 4=newest
sort_mapping = {'1': 2, '2': 5, '3': 4, '4': 4} # 4 is newest without shorts
new_sort = sort_mapping.get(sort, 4)
# Tab-specific protobuf field numbers (from Invidious source)
# Each tab uses different field numbers in the protobuf structure:
# videos: 110 -> 3 -> 15 -> { 2:{1:UUID}, 4:sort, 8:{1:UUID, 3:sort} }
# shorts: 110 -> 3 -> 10 -> { 2:{1:UUID}, 4:sort, 7:{1:UUID, 3:sort} }
# streams: 110 -> 3 -> 14 -> { 2:{1:UUID}, 5:sort, 8:{1:UUID, 3:sort} }
tab_config = {
'videos': {'tab_field': 15, 'sort_field': 4, 'embedded_field': 8},
'shorts': {'tab_field': 10, 'sort_field': 4, 'embedded_field': 7},
'streams': {'tab_field': 14, 'sort_field': 5, 'embedded_field': 8},
}
config = tab_config.get(tab, tab_config['videos'])
tab_field = config['tab_field']
sort_field = config['sort_field']
embedded_field = config['embedded_field']
offset = 30*(int(page) - 1)
# Map sort values to YouTube API values
if tab == 'streams':
sort_mapping = {'1': 14, '2': 13, '3': 12, '4': 12}
else:
sort_mapping = {'1': 2, '2': 5, '3': 4, '4': 4}
new_sort = sort_mapping.get(sort, sort_mapping['3'])
# Build continuation token using Invidious structure
# The structure is: base64(protobuf({
# 80226972: {
# 2: channel_id,
# 3: base64(protobuf({
# 110: {
# 3: {
# tab: {
# 1: {
# 1: base64(protobuf({
# 1: base64(protobuf({
# 2: "ST:" + base64(offset_varint)
# }))
# }))
# },
# 2: base64(protobuf({1: UUID}))
# 4: sort_value
# 8: base64(protobuf({
# 1: UUID
# 3: sort_value
# }))
# }
# }
# }
# }))
# }
# }))
# UUID placeholder (field 1)
uuid_str = "00000000-0000-0000-0000-000000000000"
# UUID placeholder
uuid_proto = proto.string(1, "00000000-0000-0000-0000-000000000000")
# Offset encoding
offset_varint = proto.uint(1, offset)
offset_encoded = proto.string(2, proto.unpadded_b64encode(offset_varint))
offset_wrapper = proto.string(1, proto.unpadded_b64encode(offset_encoded))
offset_base = proto.string(1, proto.unpadded_b64encode(offset_wrapper))
# Sort value varint
sort_varint = proto.uint(4, new_sort)
# Embedded message with UUID and sort
embedded_inner = uuid_proto + proto.uint(3, new_sort)
embedded_encoded = proto.string(8, proto.unpadded_b64encode(embedded_inner))
# Combine: uuid_wrapper + sort_varint + embedded
tab_inner_content = offset_base + uuid_proto + sort_varint + embedded_encoded
tab_inner = proto.string(1, proto.unpadded_b64encode(tab_inner_content))
tab_wrapper = proto.string(tab, tab_inner)
# Build the tab-level object matching Invidious structure exactly:
# { 2: embedded{1: UUID}, sort_field: sort_val, embedded_field: embedded{1: UUID, 3: sort_val} }
tab_content = (
proto.string(2, proto.string(1, uuid_str))
+ proto.uint(sort_field, new_sort)
+ proto.string(embedded_field,
proto.string(1, uuid_str) + proto.uint(3, new_sort))
)
tab_wrapper = proto.string(tab_field, tab_content)
inner_container = proto.string(3, tab_wrapper)
outer_container = proto.string(110, inner_container)
@@ -346,11 +323,10 @@ def get_channel_id(base_url):
metadata_cache = cachetools.LRUCache(128)
@cachetools.cached(metadata_cache)
def get_metadata(channel_id):
base_url = 'https://www.youtube.com/channel/' + channel_id
polymer_json = util.fetch_url(base_url + '/about?pbj=1',
headers_desktop,
debug_name='gen_channel_about',
report_text='Retrieved channel metadata')
# Use youtubei browse API to get channel metadata
polymer_json = util.call_youtube_api('web', 'browse', {
'browseId': channel_id,
})
info = yt_data_extract.extract_channel_info(json.loads(polymer_json),
'about',
continuation=False)
@@ -406,12 +382,12 @@ def post_process_channel_info(info):
info['avatar'] = util.prefix_url(info['avatar'])
info['channel_url'] = util.prefix_url(info['channel_url'])
for item in info['items']:
# For playlists, use first_video_id for thumbnail, not playlist id
if item.get('type') == 'playlist' and item.get('first_video_id'):
item['thumbnail'] = "https://i.ytimg.com/vi/{}/hq720.jpg".format(item['first_video_id'])
elif item.get('type') == 'video':
item['thumbnail'] = "https://i.ytimg.com/vi/{}/hq720.jpg".format(item['id'])
# For channels and other types, keep existing thumbnail
# Only set thumbnail if YouTube didn't provide one
if not item.get('thumbnail'):
if item.get('type') == 'playlist' and item.get('first_video_id'):
item['thumbnail'] = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(item['first_video_id'])
elif item.get('type') == 'video' and item.get('id'):
item['thumbnail'] = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(item['id'])
util.prefix_urls(item)
util.add_extra_html_info(item)
if info['current_tab'] == 'about':
@@ -508,28 +484,35 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
# Use the regular channel API
if tab in ('shorts', 'streams') or (tab=='videos' and try_channel_api):
if channel_id:
num_videos_call = (get_number_of_videos_channel, channel_id)
else:
num_videos_call = (get_number_of_videos_general, base_url)
if not channel_id:
channel_id = get_channel_id(base_url)
# For page 1, use the first-page method which won't break
# Pass sort parameter directly (2=oldest, 3=newest, etc.)
if page_number == 1:
# Always use first-page method for page 1 with sort parameter
page_call = (get_channel_first_page, base_url, tab, None, sort)
else:
# For page 2+, we can't paginate without continuation tokens
# This is a YouTube limitation, not our bug
flask.abort(404, 'Pagination not available for this sort option. YouTube removed this feature.')
# Use youtubei browse API with continuation token for all pages
page_call = (get_channel_tab, channel_id, str(page_number), sort,
tab, int(view))
continuation = True
tasks = (
gevent.spawn(*num_videos_call),
gevent.spawn(*page_call),
)
gevent.joinall(tasks)
util.check_gevent_exceptions(*tasks)
number_of_videos, polymer_json = tasks[0].value, tasks[1].value
if tab == 'videos':
# Only need video count for the videos tab
if channel_id:
num_videos_call = (get_number_of_videos_channel, channel_id)
else:
num_videos_call = (get_number_of_videos_general, base_url)
tasks = (
gevent.spawn(*num_videos_call),
gevent.spawn(*page_call),
)
gevent.joinall(tasks)
util.check_gevent_exceptions(*tasks)
number_of_videos, polymer_json = tasks[0].value, tasks[1].value
else:
# For shorts/streams, item count is used instead
polymer_json = gevent.spawn(*page_call)
polymer_json.join()
if polymer_json.exception:
raise polymer_json.exception
polymer_json = polymer_json.value
number_of_videos = 0 # will be replaced by actual item count later
elif tab == 'about':
# polymer_json = util.fetch_url(base_url + '/about?pbj=1', headers_desktop, debug_name='gen_channel_about')
@@ -577,7 +560,8 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
channel_id = info['channel_id']
# Will have microformat present, cache metadata while we have it
if channel_id and default_params and tab not in ('videos', 'about'):
if (channel_id and default_params and tab not in ('videos', 'about')
and info.get('channel_name') is not None):
metadata = extract_metadata_for_caching(info)
set_cached_metadata(channel_id, metadata)
# Otherwise, populate with our (hopefully cached) metadata
@@ -595,8 +579,12 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
item.update(additional_info)
if tab in ('videos', 'shorts', 'streams'):
if tab in ('shorts', 'streams'):
# For shorts/streams, use the actual item count since
# get_number_of_videos_channel counts regular uploads only
number_of_videos = len(info.get('items', []))
info['number_of_videos'] = number_of_videos
info['number_of_pages'] = math.ceil(number_of_videos/page_size)
info['number_of_pages'] = math.ceil(number_of_videos/page_size) if number_of_videos else 1
info['header_playlist_names'] = local_playlist.get_playlist_names()
if tab in ('videos', 'shorts', 'streams', 'playlists'):
info['current_sort'] = sort
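
The page-count guard in the hunk above can be looked at in isolation. This is a minimal sketch, not the project's code: the helper name `number_of_pages` and the page size of 30 used in the usage note are assumptions made for illustration.

```python
import math


def number_of_pages(number_of_videos, page_size):
    """Return the page count, never less than 1.

    Hypothetical helper: it mirrors the guarded expression in the diff,
    so an empty shorts/streams tab still reports a single (empty) page.
    """
    if not number_of_videos:
        return 1
    return math.ceil(number_of_videos / page_size)
```

With an assumed page size of 30, `number_of_pages(0, 30)` gives 1 and `number_of_pages(31, 30)` gives 2. Without the guard, a zero item count would produce zero pages.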

@@ -150,7 +150,7 @@ def post_process_comments_info(comments_info):
util.URL_ORIGIN, '/watch?v=', comments_info['video_id'])
comments_info['video_thumbnail'] = concat_or_none(
settings.img_prefix, 'https://i.ytimg.com/vi/',
comments_info['video_id'], '/hq720.jpg'
comments_info['video_id'], '/hqdefault.jpg'
)

@@ -26,8 +26,7 @@ def video_ids_in_playlist(name):
def add_to_playlist(name, video_info_list):
if not os.path.exists(playlists_directory):
os.makedirs(playlists_directory)
os.makedirs(playlists_directory, exist_ok=True)
ids = video_ids_in_playlist(name)
missing_thumbnails = []
with open(os.path.join(playlists_directory, name + ".txt"), "a", encoding='utf-8') as file:
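
The switch from an `os.path.exists` check to `exist_ok=True` can be sketched on its own. The helper names `ensure_dir` and `demo` below are hypothetical and exist only for this illustration; the point is that the call is safe to repeat and avoids the gap between checking and creating.

```python
import os
import tempfile


def ensure_dir(path):
    """Create path (and any parents) if missing; do nothing if it is already a directory."""
    os.makedirs(path, exist_ok=True)
    return path


def demo():
    """Call ensure_dir twice on the same path inside a throwaway temp directory."""
    with tempfile.TemporaryDirectory() as tmp:
        target = os.path.join(tmp, "playlists")
        ensure_dir(target)
        ensure_dir(target)  # second call is a no-op rather than an error
        return os.path.isdir(target)
```

Note that `exist_ok=True` only tolerates an existing directory. If the path exists as a regular file, `os.makedirs` still raises `FileExistsError`.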

@@ -30,42 +30,58 @@ def playlist_ctoken(playlist_id, offset, include_shorts=True):
def playlist_first_page(playlist_id, report_text="Retrieved playlist",
use_mobile=False):
if use_mobile:
url = 'https://m.youtube.com/playlist?list=' + playlist_id + '&pbj=1'
content = util.fetch_url(
url, util.mobile_xhr_headers,
report_text=report_text, debug_name='playlist_first_page'
)
content = json.loads(content.decode('utf-8'))
else:
url = 'https://www.youtube.com/playlist?list=' + playlist_id + '&pbj=1'
content = util.fetch_url(
url, util.desktop_xhr_headers,
report_text=report_text, debug_name='playlist_first_page'
)
content = json.loads(content.decode('utf-8'))
# Use innertube API (pbj=1 no longer works for many playlists)
key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
return content
data = {
'context': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'WEB',
'clientVersion': '2.20240327.00.00',
},
},
'browseId': 'VL' + playlist_id,
}
content_type_header = (('Content-Type', 'application/json'),)
content = util.fetch_url(
url, util.desktop_xhr_headers + content_type_header,
data=json.dumps(data),
report_text=report_text, debug_name='playlist_first_page'
)
return json.loads(content.decode('utf-8'))
def get_videos(playlist_id, page, include_shorts=True, use_mobile=False,
report_text='Retrieved playlist'):
# mobile requests return 20 videos per page
if use_mobile:
page_size = 20
headers = util.mobile_xhr_headers
# desktop requests return 100 videos per page
else:
page_size = 100
headers = util.desktop_xhr_headers
page_size = 100
url = "https://m.youtube.com/playlist?ctoken="
url += playlist_ctoken(playlist_id, (int(page)-1)*page_size,
include_shorts=include_shorts)
url += "&pbj=1"
key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
ctoken = playlist_ctoken(playlist_id, (int(page)-1)*page_size,
include_shorts=include_shorts)
data = {
'context': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'WEB',
'clientVersion': '2.20240327.00.00',
},
},
'continuation': ctoken,
}
content_type_header = (('Content-Type', 'application/json'),)
content = util.fetch_url(
url, headers, report_text=report_text,
debug_name='playlist_videos'
url, util.desktop_xhr_headers + content_type_header,
data=json.dumps(data),
report_text=report_text, debug_name='playlist_videos'
)
info = json.loads(content.decode('utf-8'))
@@ -78,6 +94,15 @@ def get_playlist_page():
abort(400)
playlist_id = request.args.get('list')
# Radio/Mix playlists (RD...) only work as watch page, not playlist page
if playlist_id.startswith('RD'):
first_video_id = playlist_id[2:] # video ID after 'RD' prefix
return flask.redirect(
util.URL_ORIGIN + '/watch?v=' + first_video_id + '&list=' + playlist_id,
302
)
page = request.args.get('page', '1')
if page == '1':
@@ -87,7 +112,7 @@ def get_playlist_page():
tasks = (
gevent.spawn(
playlist_first_page, playlist_id,
report_text="Retrieved playlist info", use_mobile=True
report_text="Retrieved playlist info"
),
gevent.spawn(get_videos, playlist_id, page)
)
@@ -106,8 +131,8 @@ def get_playlist_page():
for item in info.get('items', ()):
util.prefix_urls(item)
util.add_extra_html_info(item)
if 'id' in item:
item['thumbnail'] = f"{settings.img_prefix}https://i.ytimg.com/vi/{item['id']}/hq720.jpg"
if 'id' in item and not item.get('thumbnail'):
item['thumbnail'] = f"{settings.img_prefix}https://i.ytimg.com/vi/{item['id']}/hqdefault.jpg"
item['url'] += '&list=' + playlist_id
if item['index']:

@@ -121,11 +121,12 @@ window.addEventListener('DOMContentLoaded', function() {
* Priority: hq720.jpg -> sddefault.jpg -> hqdefault.jpg -> mqdefault.jpg -> default.jpg
*/
function thumbnail_fallback(img) {
const src = img.src || img.dataset.src;
// Once src is set (image was loaded or attempted), always work with src
const src = img.src;
if (!src) return;
// Handle YouTube video thumbnails
if (src.includes('/i.ytimg.com/')) {
if (src.includes('/i.ytimg.com/') || src.includes('/i.ytimg.com%2F')) {
// Extract video ID from URL
const match = src.match(/\/vi\/([^/]+)/);
if (!match) return;
@@ -138,36 +139,32 @@ function thumbnail_fallback(img) {
'hq720.jpg',
'sddefault.jpg',
'hqdefault.jpg',
'mqdefault.jpg',
'default.jpg'
];
// Find current quality and try next fallback
for (let i = 0; i < fallbacks.length; i++) {
if (src.includes(fallbacks[i])) {
// Try next quality
if (i < fallbacks.length - 1) {
const newSrc = imgPrefix + 'https://i.ytimg.com/vi/' + videoId + '/' + fallbacks[i + 1];
if (img.dataset.src) {
img.dataset.src = newSrc;
} else {
img.src = newSrc;
}
img.src = imgPrefix + 'https://i.ytimg.com/vi/' + videoId + '/' + fallbacks[i + 1];
} else {
// Last fallback failed, stop retrying
img.onerror = null;
}
break;
return;
}
}
// Unknown quality format, stop retrying
img.onerror = null;
}
// Handle YouTube channel avatars (ggpht.com)
else if (src.includes('ggpht.com') || src.includes('yt3.ggpht.com')) {
// Try to increase avatar size (s88 -> s240)
const newSrc = src.replace(/=s\d+-c-k/, '=s240-c-k-c0x00ffffff-no-rj');
if (newSrc !== src) {
if (img.dataset.src) {
img.dataset.src = newSrc;
} else {
img.src = newSrc;
}
img.src = newSrc;
} else {
img.onerror = null;
}
} else {
img.onerror = null;
}
}

@@ -30,8 +30,7 @@ database_path = os.path.join(settings.data_dir, "subscriptions.sqlite")
def open_database():
if not os.path.exists(settings.data_dir):
os.makedirs(settings.data_dir)
os.makedirs(settings.data_dir, exist_ok=True)
connection = sqlite3.connect(database_path, check_same_thread=False)
try:
@@ -1089,12 +1088,26 @@ def serve_subscription_thumbnail(thumbnail):
f.close()
return flask.Response(image, mimetype='image/jpeg')
url = f"https://i.ytimg.com/vi/{video_id}/hq720.jpg"
try:
image = util.fetch_url(url, report_text="Saved thumbnail: " + video_id)
except urllib.error.HTTPError as e:
print("Failed to download thumbnail for " + video_id + ": " + str(e))
flask.abort(e.code)
image = None
for quality in ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'):
url = f"https://i.ytimg.com/vi/{video_id}/{quality}"
try:
image = util.fetch_url(url, report_text="Saved thumbnail: " + video_id)
break
except util.FetchError as e:
if '404' in str(e):
continue
print("Failed to download thumbnail for " + video_id + ": " + str(e))
flask.abort(500)
except urllib.error.HTTPError as e:
if e.code == 404:
continue
print("Failed to download thumbnail for " + video_id + ": " + str(e))
flask.abort(e.code)
if image is None:
flask.abort(404)
try:
f = open(thumbnail_path, 'wb')
except FileNotFoundError:

@@ -3,13 +3,13 @@
{% macro render_comment(comment, include_avatar, timestamp_links=False) %}
<div class="comment-container">
<div class="comment">
<a class="author-avatar" href="{{ comment['author_url'] }}" title="{{ comment['author'] }}">
<a class="author-avatar" href="{{ comment['author_url'] or '#' }}" title="{{ comment['author'] }}">
{% if include_avatar %}
<img class="author-avatar-img" alt="{{ comment['author'] }}" src="{{ comment['author_avatar'] }}">
{% endif %}
</a>
<address class="author-name">
<a class="author" href="{{ comment['author_url'] }}" title="{{ comment['author'] }}">{{ comment['author'] }}</a>
<a class="author" href="{{ comment['author_url'] or '#' }}" title="{{ comment['author'] }}">{{ comment['author'] }}</a>
</address>
<a class="permalink" href="{{ comment['permalink'] }}" title="permalink">
<span>{{ comment['time_published'] }}</span>

@@ -20,7 +20,7 @@
{{ info['error'] }}
{% else %}
<div class="item-video {{ info['type'] + '-item' }}">
<a class="thumbnail-box" href="{{ info['url'] }}" title="{{ info['title'] }}">
<a class="thumbnail-box" href="{{ info['url'] or '#' }}" title="{{ info['title'] }}">
<div class="thumbnail {% if info['type'] == 'channel' %} channel {% endif %}">
{% if lazy_load %}
<img class="thumbnail-img lazy" alt="&#x20;" data-src="{{ info['thumbnail'] }}" onerror="thumbnail_fallback(this)">
@@ -35,7 +35,7 @@
{% endif %}
</div>
</a>
<h4 class="title"><a href="{{ info['url'] }}" title="{{ info['title'] }}">{{ info['title'] }}</a></h4>
<h4 class="title"><a href="{{ info['url'] or '#' }}" title="{{ info['title'] }}">{{ info['title'] }}</a></h4>
{% if include_author %}
{% set author_description = info['author'] %}
@@ -58,7 +58,9 @@
<div class="stats {{'horizontal-stats' if horizontal else 'vertical-stats'}}">
{% if info['type'] == 'channel' %}
<div>{{ info['approx_subscriber_count'] }} subscribers</div>
{% if info.get('approx_subscriber_count') %}
<div>{{ info['approx_subscriber_count'] }} subscribers</div>
{% endif %}
<div>{{ info['video_count']|commatize }} videos</div>
{% else %}
{% if info.get('time_published') %}

@@ -10,11 +10,17 @@
<div class="playlist-metadata">
<div class="author">
{% if thumbnail %}
<img alt="{{ title }}" src="{{ thumbnail }}">
{% endif %}
<h2>{{ title }}</h2>
</div>
<div class="summary">
{% if author_url %}
<a class="playlist-author" href="{{ author_url }}">{{ author }}</a>
{% else %}
<span class="playlist-author">{{ author }}</span>
{% endif %}
</div>
<div class="playlist-stats">
<div>{{ video_count|commatize }} videos</div>

@@ -86,15 +86,6 @@
{% endfor %}
</select>
{% if audio_tracks and audio_tracks|length > 1 %}
<select id="audio-language-select" autocomplete="off" title="Audio language">
{% for track in audio_tracks %}
<option value="{{ track.get('track_id', track['language']) }}" {{ 'selected' if loop.index0 == 0 else '' }}>
🔊 {{ track['language_name'] }}{% if track.get('is_default') %} (Default){% endif %}
</option>
{% endfor %}
</select>
{% endif %}
{% endif %}
</div>
<input class="v-checkbox" name="video_info_list" value="{{ video_info }}" form="playlist-edit" type="checkbox">
@@ -181,7 +172,11 @@
{% else %}
<li>{{ playlist['current_index']+1 }}/{{ playlist['video_count'] }}</li>
{% endif %}
{% if playlist['author_url'] %}
<li><a href="{{ playlist['author_url'] }}" title="{{ playlist['author'] }}">{{ playlist['author'] }}</a></li>
{% elif playlist['author'] %}
<li>{{ playlist['author'] }}</li>
{% endif %}
</ul>
</div>
<nav class="playlist-videos">
@@ -257,37 +252,6 @@
// @license-end
</script>
<!-- Audio language selector handler -->
<script>
// @license magnet:?xt=urn:btih:0b31508aeb0634b347b8270c7bee4d411b5d4109&dn=agpl-3.0.txt AGPL-v3-or-Later
(function() {
'use strict';
const audioSelect = document.getElementById('audio-language-select');
const qualitySelect = document.getElementById('quality-select');
if (audioSelect && qualitySelect) {
audioSelect.addEventListener('change', function() {
const selectedAudio = this.value;
const selectedQuality = qualitySelect.value;
// Parse current quality selection
let qualityData;
try {
qualityData = JSON.parse(selectedQuality);
} catch(e) {
return;
}
// Reload video with new audio language
const currentUrl = new URL(window.location.href);
currentUrl.searchParams.set('audio_lang', selectedAudio);
window.location.href = currentUrl.toString();
});
}
}());
// @license-end
</script>
<script src="/youtube.com/static/js/common.js"></script>
<script src="/youtube.com/static/js/transcript-table.js"></script>
{% if settings.use_video_player == 2 %}

@@ -343,8 +343,7 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None,
and debug_name is not None
and content):
save_dir = os.path.join(settings.data_dir, 'debug')
if not os.path.exists(save_dir):
os.makedirs(save_dir)
os.makedirs(save_dir, exist_ok=True)
with open(os.path.join(save_dir, debug_name), 'wb') as f:
f.write(content)
@@ -367,34 +366,25 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None,
response.getheader('Set-Cookie') or '')
ip = ip.group(1) if ip else None
# If this is the last attempt, raise error
# Without Tor, no point retrying with same IP
if not use_tor or not settings.route_tor:
logger.warning('Rate limited (429). Enable Tor routing to retry with new IP.')
raise FetchError('429', reason=response.reason, ip=ip)
# Tor: exhausted retries
if attempt >= max_retries - 1:
if not use_tor or not settings.route_tor:
logger.warning(f'YouTube returned 429 but Tor is not enabled. Consider enabling Tor routing.')
raise FetchError('429', reason=response.reason, ip=ip)
else:
# Tor is enabled but we've exhausted retries
logger.error(f'YouTube blocked request - Tor exit node overutilized after {max_retries} retries. Exit IP: {ip}')
raise FetchError('429', reason=response.reason, ip=ip,
error_message='Tor exit node overutilized after multiple retries')
logger.error(f'Rate limited after {max_retries} retries. Exit IP: {ip}')
raise FetchError('429', reason=response.reason, ip=ip,
error_message='Tor exit node overutilized after multiple retries')
# For Tor: get new identity immediately on 429
if use_tor and settings.route_tor:
logger.info(f'YouTube blocked request - Tor exit node overutilized. Exit IP: {ip}. Getting new identity...')
error = tor_manager.new_identity(start_time)
if error:
raise FetchError(
'429', reason=response.reason, ip=ip,
error_message='Automatic circuit change: ' + error)
else:
continue # retry with new identity
# For non-Tor: exponential backoff
delay = (base_delay * (2 ** attempt)) + random.uniform(0, 1)
logger.info(f'Rate limited (429). Waiting {delay:.1f}s before retry {attempt + 1}/{max_retries}...')
time.sleep(delay)
continue # retry
# Tor: get new identity and retry
logger.info(f'Rate limited. Getting new Tor identity... (IP: {ip})')
error = tor_manager.new_identity(start_time)
if error:
raise FetchError(
'429', reason=response.reason, ip=ip,
error_message='Automatic circuit change: ' + error)
continue # retry with new identity
# Check for client errors (400, 404) - don't retry these
if response.status == 400:
@@ -542,21 +532,31 @@ class RateLimitedQueue(gevent.queue.Queue):
def download_thumbnail(save_directory, video_id):
url = f"https://i.ytimg.com/vi/{video_id}/hq720.jpg"
save_location = os.path.join(save_directory, video_id + ".jpg")
try:
thumbnail = fetch_url(url, report_text="Saved thumbnail: " + video_id)
except urllib.error.HTTPError as e:
print("Failed to download thumbnail for " + video_id + ": " + str(e))
return False
try:
f = open(save_location, 'wb')
except FileNotFoundError:
os.makedirs(save_directory, exist_ok=True)
f = open(save_location, 'wb')
f.write(thumbnail)
f.close()
return True
for quality in ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'):
url = f"https://i.ytimg.com/vi/{video_id}/{quality}"
try:
thumbnail = fetch_url(url, report_text="Saved thumbnail: " + video_id)
except FetchError as e:
if '404' in str(e):
continue
print("Failed to download thumbnail for " + video_id + ": " + str(e))
return False
except urllib.error.HTTPError as e:
if e.code == 404:
continue
print("Failed to download thumbnail for " + video_id + ": " + str(e))
return False
try:
f = open(save_location, 'wb')
except FileNotFoundError:
os.makedirs(save_directory, exist_ok=True)
f = open(save_location, 'wb')
f.write(thumbnail)
f.close()
return True
print("No thumbnail available for " + video_id)
return False
def download_thumbnails(save_directory, ids):
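
The quality fallback added to `download_thumbnail` can be sketched without any network access. This is an illustration under stated assumptions: `NotFound`, `first_available_thumbnail`, `fake_fetch`, and `missing_fetch` are hypothetical names, and the `fetch` callable stands in for the project's `fetch_url`, whose real error types differ.

```python
QUALITIES = ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg')


class NotFound(Exception):
    """Hypothetical stand-in for a 404 raised by the fetcher."""


def first_available_thumbnail(video_id, fetch):
    """Try each quality in order; return (quality, data) or (None, None)."""
    for quality in QUALITIES:
        url = f"https://i.ytimg.com/vi/{video_id}/{quality}"
        try:
            return quality, fetch(url)
        except NotFound:
            continue  # this size does not exist, try the next one
    return None, None


def fake_fetch(url):
    """Pretend only the hqdefault size exists."""
    if url.endswith('hqdefault.jpg'):
        return b'jpeg-bytes'
    raise NotFound(url)


def missing_fetch(url):
    """Pretend no thumbnail exists at all."""
    raise NotFound(url)
```

Only a not-found result moves on to the next quality. Any other failure is left to propagate in this sketch, which loosely mirrors the diff's choice to stop on non-404 errors instead of continuing down the list.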
@@ -837,9 +837,12 @@ INNERTUBE_CLIENTS = {
'hl': 'en',
'gl': 'US',
'clientName': 'IOS',
'clientVersion': '19.09.3',
'deviceModel': 'iPhone14,3',
'userAgent': 'com.google.ios.youtube/19.09.3 (iPhone14,3; U; CPU iOS 15_6 like Mac OS X)'
'clientVersion': '21.03.2',
'deviceMake': 'Apple',
'deviceModel': 'iPhone16,2',
'osName': 'iPhone',
'osVersion': '18.7.2.22H124',
'userAgent': 'com.google.ios.youtube/21.03.2 (iPhone16,2; U; CPU iOS 18_7_2 like Mac OS X)'
}
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 5,
@@ -901,8 +904,7 @@ INNERTUBE_CLIENTS = {
def get_visitor_data():
visitor_data = None
visitor_data_cache = os.path.join(settings.data_dir, 'visitorData.txt')
if not os.path.exists(settings.data_dir):
os.makedirs(settings.data_dir)
os.makedirs(settings.data_dir, exist_ok=True)
if os.path.isfile(visitor_data_cache):
with open(visitor_data_cache, 'r') as file:
print('Getting visitor_data from cache')

@@ -1,3 +1,3 @@
from __future__ import unicode_literals
__version__ = 'v0.4.1'
__version__ = 'v0.4.5'

@@ -180,8 +180,34 @@ def make_caption_src(info, lang, auto=False, trans_lang=None):
label += ' (Automatic)'
if trans_lang:
label += ' -> ' + trans_lang
# Try to use Android caption URL directly (no PO Token needed)
caption_url = None
for track in info.get('_android_caption_tracks', []):
track_lang = track.get('languageCode', '')
track_kind = track.get('kind', '')
if track_lang == lang and (
(auto and track_kind == 'asr') or
(not auto and track_kind != 'asr')
):
caption_url = track.get('baseUrl')
break
if caption_url:
# Add format
if '&fmt=' in caption_url:
caption_url = re.sub(r'&fmt=[^&]*', '&fmt=vtt', caption_url)
else:
caption_url += '&fmt=vtt'
if trans_lang:
caption_url += '&tlang=' + trans_lang
url = util.prefix_url(caption_url)
else:
# Fallback to old method
url = util.prefix_url(yt_data_extract.get_caption_url(info, lang, 'vtt', auto, trans_lang))
return {
'url': util.prefix_url(yt_data_extract.get_caption_url(info, lang, 'vtt', auto, trans_lang)),
'url': url,
'label': label,
'srclang': trans_lang[0:2] if trans_lang else lang[0:2],
'on': False,
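
The URL rewriting applied to a track's `baseUrl` in the hunk above can be exercised in isolation. A minimal sketch: the function name `force_vtt` and the `example.com` URLs in the usage note are assumptions for illustration, not real caption endpoints.

```python
import re


def force_vtt(caption_url, trans_lang=None):
    """Return caption_url with fmt=vtt set and an optional tlang parameter appended."""
    if '&fmt=' in caption_url:
        # Replace whatever format the URL already asks for
        caption_url = re.sub(r'&fmt=[^&]*', '&fmt=vtt', caption_url)
    else:
        caption_url += '&fmt=vtt'
    if trans_lang:
        caption_url += '&tlang=' + trans_lang
    return caption_url
```

For example, `force_vtt('https://example.com/t?v=x&fmt=srv3&lang=en')` yields `https://example.com/t?v=x&fmt=vtt&lang=en`. Like the code in the diff, this only recognises `fmt` when it is preceded by `&`, so a URL whose first query parameter is `fmt` would end up with a second `fmt` appended.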
@@ -303,11 +329,8 @@ def get_ordered_music_list_attributes(music_list):
def save_decrypt_cache():
try:
f = open(os.path.join(settings.data_dir, 'decrypt_function_cache.json'), 'w')
except FileNotFoundError:
os.makedirs(settings.data_dir)
f = open(os.path.join(settings.data_dir, 'decrypt_function_cache.json'), 'w')
os.makedirs(settings.data_dir, exist_ok=True)
f = open(os.path.join(settings.data_dir, 'decrypt_function_cache.json'), 'w')
f.write(json.dumps({'version': 1, 'decrypt_cache':decrypt_cache}, indent=4, sort_keys=True))
f.close()
@@ -387,19 +410,38 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
info = tasks[0].value or {}
player_response = tasks[1].value or {}
# Save android_vr caption tracks (no PO Token needed for these URLs)
if isinstance(player_response, str):
try:
pr_data = json.loads(player_response)
except Exception:
pr_data = {}
else:
pr_data = player_response or {}
android_caption_tracks = yt_data_extract.deep_get(
pr_data, 'captions', 'playerCaptionsTracklistRenderer',
'captionTracks', default=[])
info['_android_caption_tracks'] = android_caption_tracks
yt_data_extract.update_with_new_urls(info, player_response)
# Fallback to 'ios' if no valid URLs are found
if not info.get('formats') or info.get('player_urls_missing'):
print(f"No URLs found in '{primary_client}', attempting with '{fallback_client}'.")
player_response = fetch_player_response(fallback_client, video_id) or {}
yt_data_extract.update_with_new_urls(info, player_response)
try:
player_response = fetch_player_response(fallback_client, video_id) or {}
yt_data_extract.update_with_new_urls(info, player_response)
except util.FetchError as e:
print(f"Fallback '{fallback_client}' failed: {e}")
# Final attempt with 'tv_embedded' if there are still no URLs
if not info.get('formats') or info.get('player_urls_missing'):
print(f"No URLs found in '{fallback_client}', attempting with '{last_resort_client}'")
player_response = fetch_player_response(last_resort_client, video_id) or {}
yt_data_extract.update_with_new_urls(info, player_response)
try:
player_response = fetch_player_response(last_resort_client, video_id) or {}
yt_data_extract.update_with_new_urls(info, player_response)
except util.FetchError as e:
print(f"Fallback '{last_resort_client}' failed: {e}")
# signature decryption
if info.get('formats'):
@@ -628,12 +670,12 @@ def get_watch_page(video_id=None):
# prefix urls, and other post-processing not handled by yt_data_extract
for item in info['related_videos']:
# For playlists, use first_video_id for thumbnail, not playlist id
if item.get('type') == 'playlist' and item.get('first_video_id'):
item['thumbnail'] = "https://i.ytimg.com/vi/{}/hq720.jpg".format(item['first_video_id'])
elif item.get('type') == 'video':
item['thumbnail'] = "https://i.ytimg.com/vi/{}/hq720.jpg".format(item['id'])
# For other types, keep existing thumbnail or skip
# Only set thumbnail if YouTube didn't provide one
if not item.get('thumbnail'):
if item.get('type') == 'playlist' and item.get('first_video_id'):
item['thumbnail'] = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(item['first_video_id'])
elif item.get('type') == 'video' and item.get('id'):
item['thumbnail'] = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(item['id'])
util.prefix_urls(item)
util.add_extra_html_info(item)
for song in info['music_list']:
@@ -641,9 +683,9 @@ def get_watch_page(video_id=None):
if info['playlist']:
playlist_id = info['playlist']['id']
for item in info['playlist']['items']:
# Set high quality thumbnail for playlist videos
if item.get('type') == 'video' and item.get('id'):
item['thumbnail'] = "https://i.ytimg.com/vi/{}/hq720.jpg".format(item['id'])
# Only set thumbnail if YouTube didn't provide one
if not item.get('thumbnail') and item.get('type') == 'video' and item.get('id'):
item['thumbnail'] = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(item['id'])
util.prefix_urls(item)
util.add_extra_html_info(item)
if playlist_id:
@@ -696,30 +738,6 @@ def get_watch_page(video_id=None):
pair_sources = source_info['pair_sources']
uni_idx, pair_idx = source_info['uni_idx'], source_info['pair_idx']
# Extract audio tracks using yt-dlp for multi-language support
audio_tracks = []
try:
from youtube import ytdlp_integration
logger.info(f'Extracting audio tracks for video: {video_id}')
ytdlp_info = ytdlp_integration.extract_video_info_ytdlp(video_id)
audio_tracks = ytdlp_info.get('audio_tracks', [])
if audio_tracks:
logger.info(f'✓ Found {len(audio_tracks)} audio tracks:')
for i, track in enumerate(audio_tracks[:10], 1): # Log first 10
logger.info(f' [{i}] {track["language_name"]} ({track["language"]}) - '
f'bitrate: {track.get("audio_bitrate", "N/A")}k, '
f'codec: {track.get("acodec", "N/A")}, '
f'format_id: {track.get("format_id", "N/A")}')
if len(audio_tracks) > 10:
logger.info(f' ... and {len(audio_tracks) - 10} more')
else:
logger.warning(f'No audio tracks found for video {video_id}')
except Exception as e:
logger.error(f'Failed to extract audio tracks: {e}', exc_info=True)
audio_tracks = []
pair_quality = yt_data_extract.deep_get(pair_sources, pair_idx, 'quality')
uni_quality = yt_data_extract.deep_get(uni_sources, uni_idx, 'quality')
@@ -843,9 +861,7 @@ def get_watch_page(video_id=None):
'playlist': info['playlist'],
'related': info['related_videos'],
'playability_error': info['playability_error'],
'audio_tracks': audio_tracks,
},
audio_tracks = audio_tracks,
font_family = youtube.font_choices[settings.font], # for embed page
**source_info,
using_pair_sources = using_pair_sources,
@@ -854,16 +870,13 @@ def get_watch_page(video_id=None):
@yt_app.route('/api/<path:dummy>')
def get_captions(dummy):
url = 'https://www.youtube.com' + request.full_path
try:
result = util.fetch_url('https://www.youtube.com' + request.full_path)
result = util.fetch_url(url, headers=util.mobile_ua)
result = result.replace(b"align:start position:0%", b"")
return result
except util.FetchError as e:
# Return empty captions gracefully instead of error page
logger.warning(f'Failed to fetch captions: {e}')
return flask.Response(b'WEBVTT\n\n', mimetype='text/vtt', status=200)
return flask.Response(result, mimetype='text/vtt')
except Exception as e:
logger.error(f'Unexpected error fetching captions: {e}')
logger.debug(f'Caption fetch failed: {e}')
return flask.Response(b'WEBVTT\n\n', mimetype='text/vtt', status=200)
@@ -929,18 +942,3 @@ def get_transcript(caption_path):
return flask.Response(result.encode('utf-8'),
mimetype='text/plain;charset=UTF-8')
# ============================================================================
# yt-dlp Integration Routes
# ============================================================================
@yt_app.route('/ytl-api/video-with-audio/<video_id>')
def proxy_video_with_audio(video_id):
"""
Proxy to serve video with a specific audio track using yt-dlp
"""
from youtube import ytdlp_proxy
audio_lang = request.args.get('lang', 'en')
max_quality = int(request.args.get('quality', 720))
return ytdlp_proxy.stream_video_with_audio(video_id, audio_lang, max_quality)

@@ -241,7 +241,7 @@ def extract_lockup_view_model_info(item, additional_info={}):
info['title'] = title_data.get('content', '')
# Determine type based on contentType
if 'PLAYLIST' in content_type:
if 'PLAYLIST' in content_type or 'PODCAST' in content_type:
info['type'] = 'playlist'
info['playlist_type'] = 'playlist'
info['id'] = content_id
@@ -253,7 +253,7 @@ def extract_lockup_view_model_info(item, additional_info={}):
for row in metadata_rows.get('contentMetadataViewModel', {}).get('metadataRows', []):
for part in row.get('metadataParts', []):
text = part.get('text', {}).get('content', '')
if 'video' in text.lower():
if 'video' in text.lower() or 'episode' in text.lower():
info['video_count'] = extract_int(text)
elif 'VIDEO' in content_type:
info['type'] = 'video'
@@ -276,25 +276,48 @@ def extract_lockup_view_model_info(item, additional_info={}):
info['type'] = 'channel'
info['id'] = content_id
info['approx_subscriber_count'] = None
info['video_count'] = None
# Extract subscriber count and video count from metadata rows
metadata_rows = lockup_metadata.get('metadata', {})
for row in metadata_rows.get('contentMetadataViewModel', {}).get('metadataRows', []):
for part in row.get('metadataParts', []):
text = part.get('text', {}).get('content', '')
if 'subscriber' in text.lower():
info['approx_subscriber_count'] = extract_approx_int(text)
elif 'video' in text.lower():
info['video_count'] = extract_int(text)
else:
info['type'] = 'unsupported'
return info
# Extract thumbnail from contentImage
content_image = item.get('contentImage', {})
collection_thumb = content_image.get('collectionThumbnailViewModel', {})
primary_thumb = collection_thumb.get('primaryThumbnail', {})
thumb_vm = primary_thumb.get('thumbnailViewModel', {})
image_sources = thumb_vm.get('image', {}).get('sources', [])
if image_sources:
info['thumbnail'] = image_sources[0].get('url', '')
else:
info['thumbnail'] = ''
info['thumbnail'] = normalize_url(multi_deep_get(content_image,
# playlists with collection thumbnail
['collectionThumbnailViewModel', 'primaryThumbnail', 'thumbnailViewModel', 'image', 'sources', 0, 'url'],
# single thumbnail (some playlists, videos)
['thumbnailViewModel', 'image', 'sources', 0, 'url'],
)) or ''
# Extract video/episode count from thumbnail overlay badges
# (podcasts and some playlists put the count here instead of metadata rows)
thumb_vm = multi_deep_get(content_image,
['collectionThumbnailViewModel', 'primaryThumbnail', 'thumbnailViewModel'],
['thumbnailViewModel'],
) or {}
for overlay in thumb_vm.get('overlays', []):
for badge in deep_get(overlay, 'thumbnailOverlayBadgeViewModel', 'thumbnailBadges', default=[]):
badge_text = deep_get(badge, 'thumbnailBadgeViewModel', 'text', default='')
if badge_text and not info.get('video_count'):
conservative_update(info, 'video_count', extract_int(badge_text))
# Extract author info if available
info['author'] = None
info['author_id'] = None
info['author_url'] = None
info['description'] = None
info['badges'] = []
# Try to get first video ID from inline player data
item_playback = item.get('itemPlayback', {})
@@ -309,6 +332,84 @@ def extract_lockup_view_model_info(item, additional_info={}):
return info
def extract_shorts_lockup_view_model_info(item, additional_info={}):
"""Extract info from shortsLockupViewModel format (YouTube Shorts)"""
info = {'error': None, 'type': 'video'}
# Video ID from reelWatchEndpoint or entityId
info['id'] = deep_get(item,
'onTap', 'innertubeCommand', 'reelWatchEndpoint', 'videoId')
if not info['id']:
entity_id = item.get('entityId', '')
if entity_id.startswith('shorts-shelf-item-'):
info['id'] = entity_id[len('shorts-shelf-item-'):]
# Thumbnail
info['thumbnail'] = normalize_url(deep_get(item,
'onTap', 'innertubeCommand', 'reelWatchEndpoint',
'thumbnail', 'thumbnails', 0, 'url'))
# Parse title and views from accessibilityText
# Format: "Title, N views - play Short"
acc_text = item.get('accessibilityText', '')
info['title'] = ''
info['view_count'] = None
info['approx_view_count'] = None
if acc_text:
# Remove trailing " - play Short"
cleaned = re.sub(r'\s*-\s*play Short$', '', acc_text)
# Split on last comma+views pattern to separate title from view count
match = re.match(r'^(.*?),\s*([\d,.]+\s*(?:thousand|million|billion|)\s*views?)$',
cleaned, re.IGNORECASE)
if match:
info['title'] = match.group(1).strip()
view_text = match.group(2)
info['view_count'] = extract_int(view_text)
# Convert "7.1 thousand" -> "7.1 K" for display
suffix_map = {'thousand': 'K', 'million': 'M', 'billion': 'B'}
suffix_match = re.search(r'([\d,.]+)\s*(thousand|million|billion)?', view_text, re.IGNORECASE)
if suffix_match:
num = suffix_match.group(1)
word = suffix_match.group(2)
if word:
info['approx_view_count'] = num + ' ' + suffix_map[word.lower()]
else:
info['approx_view_count'] = '{:,}'.format(int(num.replace(',', ''))) if num.isdigit() or num.replace(',','').isdigit() else num
else:
info['approx_view_count'] = extract_approx_int(view_text)
else:
# Fallback: try "N views" at end
match2 = re.match(r'^(.*?),\s*(.+views?)$', cleaned, re.IGNORECASE)
if match2:
info['title'] = match2.group(1).strip()
info['approx_view_count'] = extract_approx_int(match2.group(2))
else:
info['title'] = cleaned
# Overlay text (usually has the title too)
overlay_metadata = deep_get(item, 'overlayMetadata',
'secondaryText', 'content')
if overlay_metadata and not info['approx_view_count']:
info['approx_view_count'] = extract_approx_int(overlay_metadata)
primary_text = deep_get(item, 'overlayMetadata',
'primaryText', 'content')
if primary_text and not info['title']:
info['title'] = primary_text
info['duration'] = ''
info['time_published'] = None
info['description'] = None
info['badges'] = []
info['author'] = None
info['author_id'] = None
info['author_url'] = None
info['index'] = None
info.update(additional_info)
return info
def extract_item_info(item, additional_info={}):
if not item:
return {'error': 'No item given'}
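
The `accessibilityText` parsing in `extract_shorts_lockup_view_model_info` can be illustrated with a reduced, self-contained version. This is a simplified sketch, not the function from the diff: `parse_short_label`, `SUFFIXES`, and `LABEL_RE` are hypothetical names, and only the "Title, N views - play Short" shape described in the source comment is handled.

```python
import re

SUFFIXES = {'thousand': 'K', 'million': 'M', 'billion': 'B'}
LABEL_RE = re.compile(
    r'^(.*?),\s*([\d,.]+)\s*(thousand|million|billion)?\s*views?$',
    re.IGNORECASE)


def parse_short_label(acc_text):
    """Split a Shorts accessibility label into (title, approx_view_count)."""
    # Drop the trailing " - play Short" marker if present
    cleaned = re.sub(r'\s*-\s*play Short$', '', acc_text)
    match = LABEL_RE.match(cleaned)
    if not match:
        # No recognisable view count: treat the whole text as the title
        return cleaned, None
    title, num, word = match.groups()
    approx = num + ' ' + SUFFIXES[word.lower()] if word else num
    return title.strip(), approx
```

Because the pattern is anchored at the end of the string, commas inside the title are kept: the lazy first group extends until the remainder looks like a view count.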
@@ -330,6 +431,10 @@ def extract_item_info(item, additional_info={}):
if type == 'lockupViewModel':
return extract_lockup_view_model_info(item, additional_info)
# Handle shortsLockupViewModel format (YouTube Shorts)
if type == 'shortsLockupViewModel':
return extract_shorts_lockup_view_model_info(item, additional_info)
# type looks like e.g. 'compactVideoRenderer' or 'gridVideoRenderer'
# camelCase split, https://stackoverflow.com/a/37697078
type_parts = [s.lower() for s in re.sub(r'([A-Z][a-z]+)', r' \1', type).split()]
@@ -369,9 +474,9 @@ def extract_item_info(item, additional_info={}):
['detailedMetadataSnippets', 0, 'snippetText'],
))
info['thumbnail'] = normalize_url(multi_deep_get(item,
['thumbnail', 'thumbnails', 0, 'url'], # videos
['thumbnails', 0, 'thumbnails', 0, 'url'], # playlists
['thumbnailRenderer', 'showCustomThumbnailRenderer', 'thumbnail', 'thumbnails', 0, 'url'], # shows
['thumbnail', 'thumbnails', -1, 'url'], # videos (highest quality)
['thumbnails', 0, 'thumbnails', -1, 'url'], # playlists
['thumbnailRenderer', 'showCustomThumbnailRenderer', 'thumbnail', 'thumbnails', -1, 'url'], # shows
))
info['badges'] = []
@@ -463,6 +568,13 @@ def extract_item_info(item, additional_info={}):
elif primary_type == 'channel':
info['id'] = item.get('channelId')
info['approx_subscriber_count'] = extract_approx_int(item.get('subscriberCountText'))
# YouTube sometimes puts the handle (@name) in subscriberCountText
# instead of the actual count. Fall back to accessibility data.
if not info['approx_subscriber_count']:
acc_label = deep_get(item, 'subscriberCountText',
'accessibility', 'accessibilityData', 'label', default='')
if 'subscriber' in acc_label.lower():
info['approx_subscriber_count'] = extract_approx_int(acc_label)
elif primary_type == 'show':
info['id'] = deep_get(item, 'navigationEndpoint', 'watchEndpoint', 'playlistId')
info['first_video_id'] = deep_get(item, 'navigationEndpoint',
@@ -531,6 +643,7 @@ _item_types = {
# New viewModel format (YouTube 2024+)
'lockupViewModel',
'shortsLockupViewModel',
}
def _traverse_browse_renderer(renderer):
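
Review note: the Shorts parser in this file splits an accessibility-style string of the form "title, N views" with a lazy regex and then converts the view part to an integer. The sketch below reproduces that split in isolation. `approx_int` is a hypothetical stand-in written for this note, not the project's `extract_approx_int`, and its suffix handling (K/M/B only) is an assumption.

```python
import re

# Multipliers for the suffixes this sketch understands (assumption: K/M/B only).
_SUFFIXES = {'K': 10**3, 'M': 10**6, 'B': 10**9}


def approx_int(text):
    """Hypothetical stand-in for extract_approx_int: '1.2M views' -> 1200000."""
    match = re.search(r'(\d+(?:[.,]\d+)*)\s*([KMB])?', text, re.IGNORECASE)
    if not match:
        return None
    # Treat commas as thousands separators, dots as decimal points.
    number = float(match.group(1).replace(',', ''))
    suffix = (match.group(2) or '').upper()
    return int(round(number * _SUFFIXES.get(suffix, 1)))


def split_title_and_views(label):
    """Split 'title, N views' into (title, approximate view count)."""
    match = re.match(r'^(.*?),\s*(.+views?)$', label, re.IGNORECASE)
    if match:
        return match.group(1).strip(), approx_int(match.group(2))
    # No trailing view count: the whole label is the title.
    return label, None
```

Because the first group is lazy, a title that itself contains a comma is cut at its first comma, so the overlay `primaryText` fallback is still worth keeping.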


@@ -218,39 +218,99 @@ def extract_playlist_metadata(polymer_json):
return {'error': err}
metadata = {'error': None}
header = deep_get(response, 'header', 'playlistHeaderRenderer', default={})
metadata['title'] = extract_str(header.get('title'))
metadata['title'] = None
metadata['first_video_id'] = None
metadata['thumbnail'] = None
metadata['video_count'] = None
metadata['description'] = ''
metadata['author'] = None
metadata['author_id'] = None
metadata['author_url'] = None
metadata['view_count'] = None
metadata['like_count'] = None
metadata['time_published'] = None
header = deep_get(response, 'header', 'playlistHeaderRenderer', default={})
if header:
# Classic playlistHeaderRenderer format
metadata['title'] = extract_str(header.get('title'))
metadata['first_video_id'] = deep_get(header, 'playEndpoint', 'watchEndpoint', 'videoId')
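# Video IDs are 11 characters drawn from [A-Za-z0-9_-]; anchor on /vi/ so
# other path segments of the thumbnail URL are not matched by accident.
first_id = re.search(r'/vi/([A-Za-z0-9_\-]{11})', deep_get(header,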
'thumbnail', 'thumbnails', 0, 'url', default=''))
if first_id:
conservative_update(metadata, 'first_video_id', first_id.group(1))
metadata['video_count'] = extract_int(header.get('numVideosText'))
metadata['description'] = extract_str(header.get('descriptionText'), default='')
metadata['author'] = extract_str(header.get('ownerText'))
metadata['author_id'] = multi_deep_get(header,
['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
['ownerEndpoint', 'browseEndpoint', 'browseId'])
metadata['view_count'] = extract_int(header.get('viewCountText'))
metadata['like_count'] = extract_int(header.get('likesCountWithoutLikeText'))
for stat in header.get('stats', ()):
text = extract_str(stat)
if 'videos' in text or 'episodes' in text:
conservative_update(metadata, 'video_count', extract_int(text))
elif 'views' in text:
conservative_update(metadata, 'view_count', extract_int(text))
elif 'updated' in text:
metadata['time_published'] = extract_date(text)
else:
# New pageHeaderRenderer format (YouTube 2024+)
page_header = deep_get(response, 'header', 'pageHeaderRenderer', default={})
metadata['title'] = page_header.get('pageTitle')
view_model = deep_get(page_header, 'content', 'pageHeaderViewModel', default={})
# Extract title from viewModel if not found
if not metadata['title']:
metadata['title'] = deep_get(view_model,
'title', 'dynamicTextViewModel', 'text', 'content')
# Extract metadata from rows (author, video count, views, etc.)
meta_rows = deep_get(view_model,
'metadata', 'contentMetadataViewModel', 'metadataRows', default=[])
for row in meta_rows:
for part in row.get('metadataParts', []):
text_content = deep_get(part, 'text', 'content', default='')
# Author from avatarStack
avatar_stack = deep_get(part, 'avatarStack', 'avatarStackViewModel', default={})
if avatar_stack:
author_text = deep_get(avatar_stack, 'text', 'content')
if author_text:
metadata['author'] = author_text
# Extract author_id from commandRuns
for run in deep_get(avatar_stack, 'text', 'commandRuns', default=[]):
browse_id = deep_get(run, 'onTap', 'innertubeCommand',
'browseEndpoint', 'browseId')
if browse_id:
metadata['author_id'] = browse_id
# Video/episode count
if text_content and ('video' in text_content.lower() or 'episode' in text_content.lower()):
conservative_update(metadata, 'video_count', extract_int(text_content))
# View count
elif text_content and 'view' in text_content.lower():
conservative_update(metadata, 'view_count', extract_int(text_content))
# Last updated
elif text_content and 'updated' in text_content.lower():
metadata['time_published'] = extract_date(text_content)
# Extract description from sidebar if available
sidebar = deep_get(response, 'sidebar', 'playlistSidebarRenderer', 'items', default=[])
for sidebar_item in sidebar:
desc = deep_get(sidebar_item, 'playlistSidebarPrimaryInfoRenderer',
'description', 'simpleText')
if desc:
metadata['description'] = desc
if metadata['author_id']:
metadata['author_url'] = 'https://www.youtube.com/channel/' + metadata['author_id']
metadata['first_video_id'] = deep_get(header, 'playEndpoint', 'watchEndpoint', 'videoId')
first_id = re.search(r'([a-z_\-]{11})', deep_get(header,
'thumbnail', 'thumbnails', 0, 'url', default=''))
if first_id:
conservative_update(metadata, 'first_video_id', first_id.group(1))
if metadata['first_video_id'] is None:
metadata['thumbnail'] = None
else:
metadata['thumbnail'] = f"https://i.ytimg.com/vi/{metadata['first_video_id']}/hq720.jpg"
metadata['video_count'] = extract_int(header.get('numVideosText'))
metadata['description'] = extract_str(header.get('descriptionText'), default='')
metadata['author'] = extract_str(header.get('ownerText'))
metadata['author_id'] = multi_deep_get(header,
['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
['ownerEndpoint', 'browseEndpoint', 'browseId'])
if metadata['author_id']:
metadata['author_url'] = 'https://www.youtube.com/channel/' + metadata['author_id']
else:
metadata['author_url'] = None
metadata['view_count'] = extract_int(header.get('viewCountText'))
metadata['like_count'] = extract_int(header.get('likesCountWithoutLikeText'))
for stat in header.get('stats', ()):
text = extract_str(stat)
if 'videos' in text:
conservative_update(metadata, 'video_count', extract_int(text))
elif 'views' in text:
conservative_update(metadata, 'view_count', extract_int(text))
elif 'updated' in text:
metadata['time_published'] = extract_date(text)
metadata['thumbnail'] = f"https://i.ytimg.com/vi/{metadata['first_video_id']}/hqdefault.jpg"
microformat = deep_get(response, 'microformat', 'microformatDataRenderer',
default={})
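
Review note: the new `pageHeaderRenderer` branch walks `metadataRows` and classifies each part by keywords in its text. A minimal, self-contained sketch of that walk follows. `deep_get` and `first_int` here are hypothetical stand-ins for the project's helpers, and the sample payload in the usage note is invented for illustration.

```python
import re


def deep_get(obj, *keys, default=None):
    """Hypothetical stand-in for the project's deep_get helper."""
    for key in keys:
        try:
            obj = obj[key]
        except (KeyError, IndexError, TypeError):
            return default
    return obj


def first_int(text):
    """Hypothetical stand-in for extract_int: first integer, commas ignored."""
    match = re.search(r'\d[\d,]*', text)
    return int(match.group(0).replace(',', '')) if match else None


def classify_rows(view_model):
    """Pick video and view counts out of contentMetadataViewModel rows."""
    found = {'video_count': None, 'view_count': None}
    rows = deep_get(view_model, 'metadata', 'contentMetadataViewModel',
                    'metadataRows', default=[])
    for row in rows:
        for part in row.get('metadataParts', []):
            text = deep_get(part, 'text', 'content', default='')
            lowered = text.lower()
            # Keep the first value seen, mirroring conservative_update.
            if 'video' in lowered or 'episode' in lowered:
                if found['video_count'] is None:
                    found['video_count'] = first_int(text)
            elif 'view' in lowered:
                if found['view_count'] is None:
                    found['view_count'] = first_int(text)
    return found
```

With a view model whose single row holds the parts "42 videos" and "1,234 views", `classify_rows` returns `{'video_count': 42, 'view_count': 1234}`.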


@@ -628,6 +628,7 @@ def extract_watch_info(polymer_json):
info['manual_caption_languages'] = []
info['_manual_caption_language_names'] = {} # language name written in that language, needed in some cases to create the url
info['translation_languages'] = []
info['_caption_track_urls'] = {} # lang_code -> full baseUrl from player response
captions_info = player_response.get('captions', {})
info['_captions_base_url'] = normalize_url(deep_get(captions_info, 'playerCaptionsRenderer', 'baseUrl'))
# Sometimes the playerCaptionsRenderer above is randomly missing
@@ -658,6 +659,10 @@ def extract_watch_info(polymer_json):
else:
info['manual_caption_languages'].append(lang_code)
base_url = caption_track.get('baseUrl', '')
# Store the full URL from the player response (includes valid tokens)
if base_url:
# Absolute http(s) URLs are kept as-is; anything else goes through normalize_url
normalized = base_url if base_url.startswith('http') else normalize_url(base_url)
info['_caption_track_urls'][lang_code + ('_asr' if caption_track.get('kind') == 'asr' else '')] = normalized
lang_name = deep_get(urllib.parse.parse_qs(urllib.parse.urlparse(base_url).query), 'name', 0)
if lang_name:
info['_manual_caption_language_names'][lang_code] = lang_name
@@ -825,6 +830,21 @@ def captions_available(info):
def get_caption_url(info, language, format, automatic=False, translation_language=None):
'''Gets the url for captions with the given language and format. If automatic is True, get the automatic captions for that language. If translation_language is given, translate the captions from `language` to `translation_language`. If automatic is true and translation_language is given, the automatic captions will be translated.'''
# Try to use the direct URL from the player response first (has valid tokens)
track_key = language + ('_asr' if automatic else '')
direct_url = info.get('_caption_track_urls', {}).get(track_key)
if direct_url:
url = direct_url
# Override format
if '&fmt=' in url:
url = re.sub(r'&fmt=[^&]*', '&fmt=' + format, url)
else:
url += '&fmt=' + format
if translation_language:
url += '&tlang=' + translation_language
return url
# Fallback to base_url construction
url = info['_captions_base_url']
if not url:
return None
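
Review note: `get_caption_url` overrides `fmt` with a regex on `&fmt=` and appends `&fmt=` otherwise, which assumes the URL already has a query string and that `fmt` is never the first parameter. A possible alternative, sketched here with the standard library only, rebuilds the query instead. `set_query_params` is a hypothetical helper name, not part of this codebase.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def set_query_params(url, **params):
    """Return url with the given query parameters set, replacing old values."""
    parts = urlsplit(url)
    # Drop any existing occurrences of the parameters being overridden.
    query = [(key, value)
             for key, value in parse_qsl(parts.query, keep_blank_values=True)
             if key not in params]
    query.extend(params.items())
    return urlunsplit(parts._replace(query=urlencode(query)))
```

For example, `set_query_params(url, fmt='vtt', tlang='es')` works whether or not `fmt` is present and wherever it sits in the query. One caveat: `urlencode` re-encodes every value, so the percent-encoding of signed tokens may change form even though the decoded values are the same; whether the caption endpoint tolerates that has not been verified here.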


@@ -1,78 +0,0 @@
#!/usr/bin/env python3
"""
yt-dlp integration wrapper for backward compatibility.
This module now uses the centralized ytdlp_service for all operations.
"""
import logging
from youtube.ytdlp_service import (
extract_video_info,
get_language_name,
clear_cache,
get_cache_info,
)
logger = logging.getLogger(__name__)
def extract_video_info_ytdlp(video_id):
"""
Extract video information using yt-dlp (with caching).
This is a wrapper around ytdlp_service.extract_video_info()
for backward compatibility.
Args:
video_id: YouTube video ID
Returns:
Dictionary with audio_tracks, formats, title, duration
"""
logger.debug(f'Extracting video info (legacy API): {video_id}')
info = extract_video_info(video_id)
# Convert to legacy format for backward compatibility
return {
'audio_tracks': info.get('audio_tracks', []),
'all_audio_formats': info.get('formats', []),
'formats': info.get('formats', []),
'title': info.get('title', ''),
'duration': info.get('duration', 0),
'error': info.get('error'),
}
def get_audio_formats_for_language(video_id, language='en'):
"""
Get available audio formats for a specific language.
Args:
video_id: YouTube video ID
language: Language code (default: 'en')
Returns:
List of audio format dicts
"""
info = extract_video_info_ytdlp(video_id)
# The legacy dict always has an 'error' key (None on success), so test its value
if info.get('error'):
logger.warning(f'Cannot get audio formats: {info["error"]}')
return []
audio_formats = []
for track in info.get('audio_tracks', []):
if track['language'] == language:
audio_formats.append(track)
logger.debug(f'Found {len(audio_formats)} {language} audio formats')
return audio_formats
__all__ = [
'extract_video_info_ytdlp',
'get_audio_formats_for_language',
'get_language_name',
'clear_cache',
'get_cache_info',
]
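
Review note on the removed wrapper: its legacy dict always contains an `'error'` key, set to `None` on success, so only a check on the value distinguishes failure from success. The sketch below isolates that distinction. `to_legacy` and `tracks_for_language` are simplified, hypothetical versions of the two wrapper functions and take a plain dict instead of calling yt-dlp.

```python
def to_legacy(info):
    """Simplified legacy conversion: 'error' is always present, maybe None."""
    return {
        'audio_tracks': info.get('audio_tracks', []),
        'error': info.get('error'),
    }


def tracks_for_language(legacy, language='en'):
    """Filter tracks by language, treating only a truthy error as failure."""
    if legacy.get('error'):
        return []
    return [track for track in legacy['audio_tracks']
            if track.get('language') == language]
```

A key-membership test (`'error' in legacy`) would be true for every dict produced by `to_legacy`, including successful ones.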


@@ -1,99 +0,0 @@
#!/usr/bin/env python3
"""
Proxy for serving videos with specific audio using yt-dlp.
This module provides streaming functionality for unified formats
with specific audio languages.
"""
import logging
from flask import Response, request, stream_with_context
import urllib.request
import urllib.error
from youtube.ytdlp_service import find_best_unified_format
logger = logging.getLogger(__name__)
def stream_video_with_audio(video_id: str, audio_language: str = 'en', max_quality: int = 720):
"""
Stream video with specific audio language.
Args:
video_id: YouTube video ID
audio_language: Preferred audio language (default: 'en')
max_quality: Maximum video height (default: 720)
Returns:
Flask Response with video stream, or 404 if not available
"""
logger.info(f'Stream request: {video_id} | audio={audio_language} | quality={max_quality}p')
# Find best unified format
best_format = find_best_unified_format(video_id, audio_language, max_quality)
if not best_format:
logger.info('No suitable unified format found, returning 404 to trigger fallback')
return Response('No suitable unified format available', status=404)
url = best_format.get('url')
if not url:
logger.error('Format found but no URL available')
return Response('Format URL not available', status=500)
logger.debug(f'Streaming from: {url[:80]}...')
# Stream the video
try:
req = urllib.request.Request(url)
req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')
req.add_header('Accept', '*/*')
# Add Range header if client requests it
if 'Range' in request.headers:
req.add_header('Range', request.headers['Range'])
logger.debug(f'Range request: {request.headers["Range"]}')
resp = urllib.request.urlopen(req, timeout=60)
def generate():
"""Generator for streaming video chunks."""
try:
while True:
chunk = resp.read(65536) # 64KB chunks
if not chunk:
break
yield chunk
except Exception as e:
logger.error(f'Stream error: {e}')
raise
# Build response headers
response_headers = {
'Content-Type': resp.headers.get('Content-Type', 'video/mp4'),
'Access-Control-Allow-Origin': '*',
}
# Copy important headers
for header in ['Content-Length', 'Content-Range', 'Accept-Ranges']:
if header in resp.headers:
response_headers[header] = resp.headers[header]
status_code = resp.getcode()
logger.info(f'Streaming started: {status_code}')
return Response(
stream_with_context(generate()),
status=status_code,
headers=response_headers,
direct_passthrough=True
)
except urllib.error.HTTPError as e:
logger.error(f'HTTP error streaming: {e.code} {e.reason}')
return Response(f'Error: {e.code} {e.reason}', status=e.code)
except urllib.error.URLError as e:
logger.error(f'URL error streaming: {e.reason}')
return Response(f'Network error: {e.reason}', status=502)
except Exception as e:
logger.error(f'Streaming error: {e}', exc_info=True)
return Response(f'Error: {e}', status=500)
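
Review note: the inner `generate()` function of the removed proxy reads the upstream response in fixed 64 KB chunks until `read()` returns an empty bytes object. The same loop, detached from Flask and urllib so it can be exercised on any file-like object, looks roughly like this. `iter_chunks` is a hypothetical name used only for this sketch.

```python
import io


def iter_chunks(stream, chunk_size=65536):
    """Yield successive chunks from a binary file-like object until EOF."""
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:  # b'' signals end of stream
            break
        yield chunk


def chunk_lengths(payload, chunk_size=65536):
    """Convenience for inspection: sizes of the chunks produced for payload."""
    return [len(chunk) for chunk in iter_chunks(io.BytesIO(payload), chunk_size)]
```

A 150000-byte payload yields chunks of 65536, 65536 and 18928 bytes, so memory use stays bounded by the chunk size rather than the video size.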


@@ -1,393 +0,0 @@
#!/usr/bin/env python3
"""
Centralized yt-dlp integration with caching, logging, and error handling.
This module provides a clean interface for yt-dlp functionality:
- Multi-language audio track extraction
- Subtitle extraction
- Age-restricted video support
All yt-dlp usage should go through this module for consistency.
"""
import logging
from functools import lru_cache
from typing import Dict, List, Optional, Any
import yt_dlp
import settings
logger = logging.getLogger(__name__)
# Language name mapping
LANGUAGE_NAMES = {
'en': 'English',
'es': 'Español',
'fr': 'Français',
'de': 'Deutsch',
'it': 'Italiano',
'pt': 'Português',
'ru': 'Русский',
'ja': '日本語',
'ko': '한국어',
'zh': '中文',
'ar': 'العربية',
'hi': 'हिन्दी',
'und': 'Unknown',
'zxx': 'No linguistic content',
}
def get_language_name(lang_code: str) -> str:
"""Convert ISO 639-1/2 language code to readable name."""
if not lang_code:
return 'Unknown'
return LANGUAGE_NAMES.get(lang_code.lower(), lang_code.upper())
def _get_ytdlp_config() -> Dict[str, Any]:
"""Get yt-dlp configuration from settings."""
config = {
'quiet': True,
'no_warnings': True,
'extract_flat': False,
'format': 'best',
'skip_download': True,
'socket_timeout': 30,
'extractor_retries': 3,
'http_chunk_size': 10485760, # 10MB
}
# Configure Tor proxy if enabled
if settings.route_tor:
config['proxy'] = 'socks5://127.0.0.1:9150'
logger.debug('Tor proxy enabled for yt-dlp')
# Use cookies if available
import os
cookies_file = 'youtube_cookies.txt'
if os.path.exists(cookies_file):
config['cookiefile'] = cookies_file
logger.debug('Using cookies file for yt-dlp')
return config
@lru_cache(maxsize=128)
def extract_video_info(video_id: str) -> Dict[str, Any]:
"""
Extract video information using yt-dlp with caching.
Args:
video_id: YouTube video ID
Returns:
Dictionary with video information including audio tracks
Caching:
Results are cached to avoid repeated requests to YouTube.
Cache size is limited to prevent memory issues.
"""
# Check if yt-dlp is enabled
if not getattr(settings, 'ytdlp_enabled', True):
logger.debug('yt-dlp integration is disabled')
return {'error': 'yt-dlp disabled', 'audio_tracks': []}
url = f'https://www.youtube.com/watch?v={video_id}'
ydl_opts = _get_ytdlp_config()
try:
logger.debug(f'Extracting video info: {video_id}')
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=False)
if not info:
logger.warning(f'No info returned for video: {video_id}')
return {'error': 'No info returned', 'audio_tracks': []}
logger.info(f'Extracted {len(info.get("formats", []))} total formats')
# Extract audio tracks grouped by language
audio_tracks = _extract_audio_tracks(info)
return {
'video_id': video_id,
'title': info.get('title', ''),
'duration': info.get('duration', 0),
'audio_tracks': audio_tracks,
'formats': info.get('formats', []),
'subtitles': info.get('subtitles', {}),
'automatic_captions': info.get('automatic_captions', {}),
}
except yt_dlp.utils.DownloadError as e:
logger.error(f'yt-dlp download error for {video_id}: {e}')
return {'error': str(e), 'audio_tracks': []}
except Exception as e:
logger.error(f'yt-dlp extraction error for {video_id}: {e}', exc_info=True)
return {'error': str(e), 'audio_tracks': []}
def _extract_audio_tracks(info: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Extract audio tracks from video info, grouped by language.
Returns a list of unique audio tracks (one per language),
keeping the highest quality for each language.
"""
audio_by_language = {}
all_formats = info.get('formats', [])
logger.debug(f'Processing {len(all_formats)} formats to extract audio tracks')
for fmt in all_formats:
# Only audio-only formats
has_audio = fmt.get('acodec') and fmt.get('acodec') != 'none'
has_video = fmt.get('vcodec') and fmt.get('vcodec') != 'none'
if not has_audio or has_video:
continue
# Extract language information
lang = (
fmt.get('language') or
fmt.get('audio_language') or
fmt.get('lang') or
'und'
)
# Get language name
lang_name = (
fmt.get('language_name') or
fmt.get('lang_name') or
get_language_name(lang)
)
# Get bitrate
bitrate = fmt.get('abr') or fmt.get('tbr') or 0
# Create track info
track_info = {
'language': lang,
'language_name': lang_name,
'format_id': str(fmt.get('format_id', '')),
'itag': str(fmt.get('format_id', '')),
'ext': fmt.get('ext'),
'acodec': fmt.get('acodec'),
'audio_bitrate': int(bitrate) if bitrate else 0,
'audio_sample_rate': fmt.get('asr'),
'url': fmt.get('url'),
'filesize': fmt.get('filesize'),
}
# Keep best quality per language
lang_key = lang.lower()
if lang_key not in audio_by_language:
audio_by_language[lang_key] = track_info
logger.debug(f' Added {lang} ({lang_name}) - {bitrate}k')
else:
current_bitrate = audio_by_language[lang_key].get('audio_bitrate', 0)
if bitrate > current_bitrate:
logger.debug(f' Updated {lang} ({lang_name}): {current_bitrate}k → {bitrate}k')
audio_by_language[lang_key] = track_info
# Convert to list and sort
audio_tracks = list(audio_by_language.values())
# Sort: English first, then by bitrate (descending)
audio_tracks.sort(
key=lambda x: (
0 if x['language'] == 'en' else 1,
-x.get('audio_bitrate', 0)
)
)
logger.info(f'Extracted {len(audio_tracks)} unique audio languages')
for track in audio_tracks[:5]: # Log first 5
logger.info(f'{track["language_name"]} ({track["language"]}): {track["audio_bitrate"]}k')
return audio_tracks
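
Review note: `_extract_audio_tracks` keeps one audio-only format per language (the one with the highest bitrate) and sorts English first. A reduced sketch of that selection is given here. It uses only the `language`, `abr`, `tbr`, `acodec`, `vcodec` and `format_id` keys, and the sample format dicts in the usage note are invented for illustration, not real yt-dlp output.

```python
def best_track_per_language(formats):
    """Keep the highest-bitrate audio-only format for each language."""
    best = {}
    for fmt in formats:
        acodec = fmt.get('acodec')
        vcodec = fmt.get('vcodec')
        has_audio = bool(acodec) and acodec != 'none'
        has_video = bool(vcodec) and vcodec != 'none'
        if not has_audio or has_video:
            continue  # skip video-only and muxed formats
        lang = (fmt.get('language') or 'und').lower()
        bitrate = fmt.get('abr') or fmt.get('tbr') or 0
        if lang not in best or bitrate > best[lang]['bitrate']:
            best[lang] = {
                'language': lang,
                'bitrate': bitrate,
                'format_id': str(fmt.get('format_id', '')),
            }
    # English first, then descending bitrate.
    return sorted(best.values(),
                  key=lambda t: (t['language'] != 'en', -t['bitrate']))
```

Given two English audio formats at 48 and 128 kbit/s, one Spanish format at 160 kbit/s and one muxed format, the result lists the 128 kbit/s English track first and the Spanish track second.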
def get_subtitle_url(video_id: str, lang: str = 'en') -> Optional[str]:
"""
Get subtitle URL for a specific language.
Args:
video_id: YouTube video ID
lang: Language code (default: 'en')
Returns:
URL to subtitle file, or None if not available
"""
info = extract_video_info(video_id)
if 'error' in info:
logger.warning(f'Cannot get subtitles: {info["error"]}')
return None
# Try manual subtitles first
subtitles = info.get('subtitles', {})
if lang in subtitles:
for sub in subtitles[lang]:
if sub.get('ext') == 'vtt':
logger.debug(f'Found manual {lang} subtitle')
return sub.get('url')
# Try automatic captions
auto_captions = info.get('automatic_captions', {})
if lang in auto_captions:
for sub in auto_captions[lang]:
if sub.get('ext') == 'vtt':
logger.debug(f'Found automatic {lang} subtitle')
return sub.get('url')
logger.debug(f'No {lang} subtitle found')
return None
def find_best_unified_format(
video_id: str,
audio_language: str = 'en',
max_quality: int = 720
) -> Optional[Dict[str, Any]]:
"""
Find best unified (video+audio) format for specific language and quality.
Args:
video_id: YouTube video ID
audio_language: Preferred audio language
max_quality: Maximum video height (e.g., 720, 1080)
Returns:
Format dict if found, None otherwise
"""
info = extract_video_info(video_id)
if 'error' in info or not info.get('formats'):
return None
# Quality thresholds (minimum acceptable height as % of requested)
thresholds = {
2160: 0.85,
1440: 0.80,
1080: 0.70,
720: 0.70,
480: 0.60,
360: 0.50,
}
# Get threshold for requested quality
threshold = 0.70
for q, t in thresholds.items():
if max_quality >= q:
threshold = t
break
min_height = int(max_quality * threshold)
logger.debug(f'Quality threshold: {threshold:.0%} = min {min_height}p for {max_quality}p')
candidates = []
audio_lang_lower = audio_language.lower()
for fmt in info['formats']:
# Must have both video and audio
has_video = fmt.get('vcodec') and fmt.get('vcodec') != 'none'
has_audio = fmt.get('acodec') and fmt.get('acodec') != 'none'
if not (has_video and has_audio):
continue
# Skip HLS/DASH formats
protocol = fmt.get('protocol', '')
format_id = str(fmt.get('format_id', ''))
if any(x in protocol.lower() for x in ['m3u8', 'hls', 'dash']):
continue
if format_id.startswith('9'): # HLS formats
continue
height = fmt.get('height') or 0  # 'height' may be present but None
if height < min_height:
continue
# Language matching
lang = (
fmt.get('language') or
fmt.get('audio_language') or
'en'
).lower()
lang_match = (
lang == audio_lang_lower or
lang.startswith(audio_lang_lower[:2]) or
audio_lang_lower.startswith(lang[:2])
)
if not lang_match:
continue
# Calculate score
score = 0
# Language match bonus
if lang == audio_lang_lower:
score += 10000
elif lang.startswith(audio_lang_lower[:2]):
score += 8000
else:
score += 5000
# Quality score
quality_diff = abs(height - max_quality)
if height >= max_quality:
score += 3000 - quality_diff
else:
score += 2000 - quality_diff
# Protocol preference
if protocol in ('https', 'http'):
score += 500
# Format preference
if fmt.get('ext') == 'mp4':
score += 100
candidates.append({
'format': fmt,
'score': score,
'height': height,
'lang': lang,
})
if not candidates:
logger.debug(f'No unified format found for {max_quality}p + {audio_language}')
return None
# Sort by score and return best
candidates.sort(key=lambda x: x['score'], reverse=True)
best = candidates[0]
logger.info(
f'Selected unified format: {best["format"].get("format_id")} | '
f'{best["lang"]} | {best["height"]}p | score={best["score"]}'
)
return best['format']
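
Review note: `find_best_unified_format` derives a minimum acceptable height from the requested quality by taking the first threshold whose key does not exceed `max_quality`. The sketch below isolates that lookup. As a deliberate deviation from the removed code, it stores the thresholds as integer percentages and uses integer division, an assumption made here so the result does not depend on floating-point rounding before truncation.

```python
# Minimum acceptable height as a percentage of the requested quality,
# listed from highest to lowest so the first match wins.
THRESHOLD_PERCENT = {
    2160: 85,
    1440: 80,
    1080: 70,
    720: 70,
    480: 60,
    360: 50,
}
DEFAULT_PERCENT = 70


def min_acceptable_height(max_quality):
    """Return the lowest video height still accepted for max_quality."""
    percent = DEFAULT_PERCENT
    for quality, value in THRESHOLD_PERCENT.items():
        if max_quality >= quality:
            percent = value
            break
    return max_quality * percent // 100
```

With these values a 720p request accepts heights down to 504, and a 1080p request down to 756. The lookup relies on dict insertion order, which Python guarantees from 3.7 onward.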
def clear_cache():
"""Clear the video info cache."""
extract_video_info.cache_clear()
logger.info('yt-dlp cache cleared')
def get_cache_info() -> Dict[str, Any]:
"""Get cache statistics."""
cache_info = extract_video_info.cache_info()
return {
'hits': cache_info.hits,
'misses': cache_info.misses,
'size': cache_info.currsize,
'maxsize': cache_info.maxsize,
'hit_rate': cache_info.hits / (cache_info.hits + cache_info.misses) if (cache_info.hits + cache_info.misses) > 0 else 0,
}
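
Review note: `get_cache_info` reports statistics from `functools.lru_cache`, which wraps `extract_video_info`. The sketch below shows the same statistics on a stand-in function. `lookup` is hypothetical and does no network access; only `cache_info()` and `cache_clear()` are real `lru_cache` APIs.

```python
from functools import lru_cache


@lru_cache(maxsize=128)
def lookup(video_id):
    """Hypothetical stand-in for extract_video_info (no network access)."""
    return {'video_id': video_id}


def cache_stats(cached_func):
    """Summarize an lru_cache-wrapped function's statistics as a dict."""
    info = cached_func.cache_info()
    total = info.hits + info.misses
    return {
        'hits': info.hits,
        'misses': info.misses,
        'size': info.currsize,
        'maxsize': info.maxsize,
        'hit_rate': info.hits / total if total else 0,
    }
```

Two things are worth keeping in mind with this pattern: `lru_cache` returns the same dict object on every hit, so a caller that mutates the result changes what later callers see, and a function that returns error dicts instead of raising will have those errors cached as well.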