Release v0.4.0 - HD Thumbnails, YouTube 2024+ Support, and yt-dlp Integration
Some checks failed
CI / test (push) Failing after 1m19s

Major Features:
- HD video thumbnails (hq720.jpg) with automatic fallback to lower qualities
- HD channel avatars (240x240 instead of 88x88)
- YouTube 2024+ lockupViewModel support for channel playlists
- youtubei/v1/browse API integration for channel playlist tabs
- yt-dlp integration for multi-language audio and subtitles

Bug Fixes:
- Fixed undefined `abort` import in playlist.py
- Fixed undefined functions in proto.py (encode_varint, bytes_to_hex, succinct_encode)
- Fixed missing `traceback` import in proto_debug.py
- Fixed blurry playlist thumbnails using default.jpg instead of HD versions
- Fixed channel playlists page using deprecated pbj=1 format

Improvements:
- Automatic thumbnail fallback system (hq720 → sddefault → hqdefault → mqdefault → default)
- JavaScript thumbnail_fallback() handler for 404 errors
- Better thumbnail quality across all pages (watch, channel, playlist, subscriptions)
- Consistent HD avatar display for all channel items
- Settings system automatically adds new settings without breaking user config

Files Modified:
- youtube/watch.py - HD thumbnails for related videos and playlist items
- youtube/channel.py - HD thumbnails for channel playlists, youtubei API integration
- youtube/playlist.py - HD thumbnails, fixed abort import
- youtube/util.py - HD thumbnail URLs, avatar HD upgrade, prefix_url improvements
- youtube/comments.py - HD video thumbnail
- youtube/subscriptions.py - HD thumbnails, fixed abort import
- youtube/yt_data_extract/common.py - lockupViewModel support, extract_lockup_view_model_info()
- youtube/yt_data_extract/everything_else.py - HD playlist thumbnails
- youtube/proto.py - Fixed undefined function references
- youtube/proto_debug.py - Added traceback import
- youtube/static/js/common.js - thumbnail_fallback() handler
- youtube/templates/*.html - Added onerror handlers for thumbnail fallback
- youtube/version.py - Bump to v0.4.0

Technical Details:
- All thumbnail URLs now use hq720.jpg (1280x720) when available
- Fallback handled client-side via JavaScript onerror handler
- Server-side avatar upgrade via regex in util.prefix_url()
- lockupViewModel parser extracts contentType, metadata, and first_video_id
- Channel playlist tabs now use youtubei/v1/browse instead of deprecated pbj=1
- Settings version system ensures backward compatibility
This commit is contained in:
2026-03-22 20:50:03 -05:00
parent 84e1acaab8
commit 6a68f06645
25 changed files with 929 additions and 231 deletions

137
.gitignore vendored
View File

@@ -1,5 +1,128 @@
# Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/
*.py[cod]
*$py.class *$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
Pipfile.lock
# PEP 582
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
*venv*
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# Project specific
debug/ debug/
data/ data/
python/ python/
@@ -11,5 +134,17 @@ get-pip.py
latest-dist.zip latest-dist.zip
*.7z *.7z
*.zip *.zip
*venv*
# Editor specific
flycheck_* flycheck_*
.vscode/
.idea/
*.swp
*.swo
*~
.DS_Store
# Temporary files
*.tmp
*.bak
*.orig

View File

@@ -12,10 +12,28 @@ import sys
import os import os
import subprocess import subprocess
# Ensure we use the Python from the virtual environment if available
if hasattr(sys, 'real_prefix') or (hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix):
# Already in venv
pass
else:
# Try to activate venv
venv_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'venv')
if os.path.exists(venv_path):
venv_bin = os.path.join(venv_path, 'bin')
if os.path.exists(venv_bin):
os.environ['PATH'] = venv_bin + os.pathsep + os.environ['PATH']
def run_command(cmd): def run_command(cmd):
"""Run a shell command and print output""" """Run a shell command and print output"""
print(f"Running: {' '.join(cmd)}") print(f"Running: {' '.join(cmd)}")
# Use the pybabel from the same directory as our Python executable
if cmd[0] == 'pybabel':
import os
pybabel_path = os.path.join(os.path.dirname(sys.executable), 'pybabel')
if os.path.exists(pybabel_path):
cmd = [pybabel_path] + cmd[1:]
result = subprocess.run(cmd, capture_output=True, text=True) result = subprocess.run(cmd, capture_output=True, text=True)
if result.stdout: if result.stdout:
print(result.stdout) print(result.stdout)

View File

@@ -279,6 +279,16 @@ if __name__ == '__main__':
print('Starting httpserver at http://%s:%s/' % print('Starting httpserver at http://%s:%s/' %
(ip_server, settings.port_number)) (ip_server, settings.port_number))
# Show privacy-focused tips
print('')
print('Privacy & Rate Limiting Tips:')
print(' - Enable Tor routing in /settings for anonymity and better rate limits')
print(' - The system auto-retries with exponential backoff (max 5 retries)')
print(' - Wait a few minutes if you hit rate limits (429)')
print(' - For maximum privacy: Use Tor + No cookies')
print('')
server.serve_forever() server.serve_forever()
# for uwsgi, gunicorn, etc. # for uwsgi, gunicorn, etc.

View File

@@ -0,0 +1,74 @@
# Spanish translations for yt-local.
# Copyright (C) 2026 yt-local
# This file is distributed under the same license as the yt-local project.
#
msgid ""
msgstr ""
"Project-Id-Version: PROJECT VERSION\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2026-03-22 15:05-0500\n"
"PO-Revision-Date: 2026-03-22 15:06-0500\n"
"Last-Translator: \n"
"Language: es\n"
"Language-Team: es <LL@li.org>\n"
"Plural-Forms: nplurals=2; plural=(n != 1);\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.18.0\n"
#: youtube/templates/base.html:38
msgid "Type to search..."
msgstr "Escribe para buscar..."
#: youtube/templates/base.html:39
msgid "Search"
msgstr "Buscar"
#: youtube/templates/base.html:45
msgid "Options"
msgstr "Opciones"
#: youtube/templates/base.html:47
msgid "Sort by"
msgstr "Ordenar por"
#: youtube/templates/base.html:50
msgid "Relevance"
msgstr "Relevancia"
#: youtube/templates/base.html:54 youtube/templates/base.html:65
msgid "Upload date"
msgstr "Fecha de subida"
#: youtube/templates/base.html:58
msgid "View count"
msgstr "Número de visualizaciones"
#: youtube/templates/base.html:62
msgid "Rating"
msgstr "Calificación"
#: youtube/templates/base.html:68
msgid "Any"
msgstr "Cualquiera"
#: youtube/templates/base.html:72
msgid "Last hour"
msgstr "Última hora"
#: youtube/templates/base.html:76
msgid "Today"
msgstr "Hoy"
#: youtube/templates/base.html:80
msgid "This week"
msgstr "Esta semana"
#: youtube/templates/base.html:84
msgid "This month"
msgstr "Este mes"
#: youtube/templates/base.html:88
msgid "This year"
msgstr "Este año"

75
translations/messages.pot Normal file
View File

@@ -0,0 +1,75 @@
# Translations template for PROJECT.
# Copyright (C) 2026 ORGANIZATION
# This file is distributed under the same license as the PROJECT project.
# FIRST AUTHOR <EMAIL@ADDRESS>, 2026.
#
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: PROJECT VERSION\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2026-03-22 15:05-0500\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.18.0\n"
#: youtube/templates/base.html:38
msgid "Type to search..."
msgstr ""
#: youtube/templates/base.html:39
msgid "Search"
msgstr ""
#: youtube/templates/base.html:45
msgid "Options"
msgstr ""
#: youtube/templates/base.html:47
msgid "Sort by"
msgstr ""
#: youtube/templates/base.html:50
msgid "Relevance"
msgstr ""
#: youtube/templates/base.html:54 youtube/templates/base.html:65
msgid "Upload date"
msgstr ""
#: youtube/templates/base.html:58
msgid "View count"
msgstr ""
#: youtube/templates/base.html:62
msgid "Rating"
msgstr ""
#: youtube/templates/base.html:68
msgid "Any"
msgstr ""
#: youtube/templates/base.html:72
msgid "Last hour"
msgstr ""
#: youtube/templates/base.html:76
msgid "Today"
msgstr ""
#: youtube/templates/base.html:80
msgid "This week"
msgstr ""
#: youtube/templates/base.html:84
msgid "This month"
msgstr ""
#: youtube/templates/base.html:88
msgid "This year"
msgstr ""

View File

@@ -137,9 +137,22 @@ def error_page(e):
error_message += '\n\nExit node IP address: ' + exc_info()[1].ip error_message += '\n\nExit node IP address: ' + exc_info()[1].ip
return flask.render_template('error.html', error_message=error_message, slim=slim), 502 return flask.render_template('error.html', error_message=error_message, slim=slim), 502
elif exc_info()[0] == util.FetchError and exc_info()[1].error_message: elif exc_info()[0] == util.FetchError and exc_info()[1].error_message:
# Handle specific error codes with user-friendly messages
error_code = exc_info()[1].code
error_msg = exc_info()[1].error_message
if error_code == '400':
error_message = (f'Error: Bad Request (400)\n\n{error_msg}\n\n'
'This usually means the URL or parameters are invalid. '
'Try going back and trying a different option.')
elif error_code == '404':
error_message = 'Error: The page you are looking for isn\'t here.'
else:
error_message = f'Error: {error_code} - {error_msg}'
return (flask.render_template( return (flask.render_template(
'error.html', 'error.html',
error_message=exc_info()[1].error_message, error_message=error_message,
slim=slim slim=slim
), 502) ), 502)
elif (exc_info()[0] == util.FetchError elif (exc_info()[0] == util.FetchError

View File

@@ -33,53 +33,75 @@ headers_mobile = (
real_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=8XihrAcN1l4'),) real_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=8XihrAcN1l4'),)
generic_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=ST1Ti53r4fU'),) generic_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=ST1Ti53r4fU'),)
# added an extra nesting under the 2nd base64 compared to v4 # FIXED 2026: YouTube changed continuation token structure (from Invidious commit a9f8127)
# added tab support # Sort values for YouTube API (from Invidious): 2=popular, 4=newest, 5=oldest
# changed offset field to uint id 1
def channel_ctoken_v5(channel_id, page, sort, tab, view=1): def channel_ctoken_v5(channel_id, page, sort, tab, view=1):
new_sort = (2 if int(sort) == 1 else 1) # Map sort values to YouTube API values (Invidious values)
# Input: sort=3 (newest), sort=4 (newest no shorts)
# YouTube expects: 4=newest
sort_mapping = {'1': 2, '2': 5, '3': 4, '4': 4} # 4 is newest without shorts
new_sort = sort_mapping.get(sort, 4)
offset = 30*(int(page) - 1) offset = 30*(int(page) - 1)
if tab == 'videos':
tab = 15 # Build continuation token using Invidious structure
elif tab == 'shorts': # The structure is: base64(protobuf({
tab = 10 # 80226972: {
elif tab == 'streams': # 2: channel_id,
tab = 14 # 3: base64(protobuf({
# 110: {
# 3: {
# tab: {
# 1: {
# 1: base64(protobuf({
# 1: base64(protobuf({
# 2: "ST:" + base64(offset_varint)
# }))
# }))
# },
# 2: base64(protobuf({1: UUID}))
# 4: sort_value
# 8: base64(protobuf({
# 1: UUID
# 3: sort_value
# }))
# }
# }
# }
# }))
# }
# }))
# UUID placeholder
uuid_proto = proto.string(1, "00000000-0000-0000-0000-000000000000")
# Offset encoding
offset_varint = proto.uint(1, offset)
offset_encoded = proto.string(2, proto.unpadded_b64encode(offset_varint))
offset_wrapper = proto.string(1, proto.unpadded_b64encode(offset_encoded))
offset_base = proto.string(1, proto.unpadded_b64encode(offset_wrapper))
# Sort value varint
sort_varint = proto.uint(4, new_sort)
# Embedded message with UUID and sort
embedded_inner = uuid_proto + proto.uint(3, new_sort)
embedded_encoded = proto.string(8, proto.unpadded_b64encode(embedded_inner))
# Combine: uuid_wrapper + sort_varint + embedded
tab_inner_content = offset_base + uuid_proto + sort_varint + embedded_encoded
tab_inner = proto.string(1, proto.unpadded_b64encode(tab_inner_content))
tab_wrapper = proto.string(tab, tab_inner)
inner_container = proto.string(3, tab_wrapper)
outer_container = proto.string(110, inner_container)
encoded_inner = proto.percent_b64encode(outer_container)
pointless_nest = proto.string(80226972, pointless_nest = proto.string(80226972,
proto.string(2, channel_id) proto.string(2, channel_id)
+ proto.string(3, + proto.string(3, encoded_inner)
proto.percent_b64encode(
proto.string(110,
proto.string(3,
proto.string(tab,
proto.string(1,
proto.string(1,
proto.unpadded_b64encode(
proto.string(1,
proto.string(1,
proto.unpadded_b64encode(
proto.string(2,
b"ST:"
+ proto.unpadded_b64encode(
proto.uint(1, offset)
)
)
)
)
)
)
)
# targetId, just needs to be present but
# doesn't need to be correct
+ proto.string(2, "63faaff0-0000-23fe-80f0-582429d11c38")
)
# 1 - newest, 2 - popular
+ proto.uint(3, new_sort)
)
)
)
)
)
) )
return base64.urlsafe_b64encode(pointless_nest).decode('ascii') return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
@@ -161,11 +183,6 @@ def channel_ctoken_v4(channel_id, page, sort, tab, view=1):
# SORT: # SORT:
# videos: # videos:
# Popular - 1
# Oldest - 2
# Newest - 3
# playlists:
# Oldest - 2
# Newest - 3 # Newest - 3
# Last video added - 4 # Last video added - 4
@@ -389,7 +406,12 @@ def post_process_channel_info(info):
info['avatar'] = util.prefix_url(info['avatar']) info['avatar'] = util.prefix_url(info['avatar'])
info['channel_url'] = util.prefix_url(info['channel_url']) info['channel_url'] = util.prefix_url(info['channel_url'])
for item in info['items']: for item in info['items']:
item['thumbnail'] = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(item['id']) # For playlists, use first_video_id for thumbnail, not playlist id
if item.get('type') == 'playlist' and item.get('first_video_id'):
item['thumbnail'] = "https://i.ytimg.com/vi/{}/hq720.jpg".format(item['first_video_id'])
elif item.get('type') == 'video':
item['thumbnail'] = "https://i.ytimg.com/vi/{}/hq720.jpg".format(item['id'])
# For channels and other types, keep existing thumbnail
util.prefix_urls(item) util.prefix_urls(item)
util.add_extra_html_info(item) util.add_extra_html_info(item)
if info['current_tab'] == 'about': if info['current_tab'] == 'about':
@@ -398,11 +420,20 @@ def post_process_channel_info(info):
info['links'][i] = (text, util.prefix_url(url)) info['links'][i] = (text, util.prefix_url(url))
def get_channel_first_page(base_url=None, tab='videos', channel_id=None): def get_channel_first_page(base_url=None, tab='videos', channel_id=None, sort=None):
if channel_id: if channel_id:
base_url = 'https://www.youtube.com/channel/' + channel_id base_url = 'https://www.youtube.com/channel/' + channel_id
return util.fetch_url(base_url + '/' + tab + '?pbj=1&view=0',
headers_desktop, debug_name='gen_channel_' + tab) # Build URL with sort parameter
# YouTube URL sort params: p=popular, dd=newest, lad=newest no shorts
# Note: 'da' (oldest) was removed by YouTube in January 2026
url = base_url + '/' + tab + '?pbj=1&view=0'
if sort:
# Map sort values to YouTube's URL parameter values
sort_map = {'3': 'dd', '4': 'lad'}
url += '&sort=' + sort_map.get(sort, 'dd')
return util.fetch_url(url, headers_desktop, debug_name='gen_channel_' + tab)
playlist_sort_codes = {'2': "da", '3': "dd", '4': "lad"} playlist_sort_codes = {'2': "da", '3': "dd", '4': "lad"}
@@ -416,7 +447,6 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
page_number = int(request.args.get('page', 1)) page_number = int(request.args.get('page', 1))
# sort 1: views # sort 1: views
# sort 2: oldest # sort 2: oldest
# sort 3: newest
# sort 4: newest - no shorts (Just a kludge on our end, not internal to yt) # sort 4: newest - no shorts (Just a kludge on our end, not internal to yt)
default_sort = '3' if settings.include_shorts_in_channel else '4' default_sort = '3' if settings.include_shorts_in_channel else '4'
sort = request.args.get('sort', default_sort) sort = request.args.get('sort', default_sort)
@@ -483,17 +513,15 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
else: else:
num_videos_call = (get_number_of_videos_general, base_url) num_videos_call = (get_number_of_videos_general, base_url)
# Use ctoken method, which YouTube changes all the time # For page 1, use the first-page method which won't break
if channel_id and not default_params: # Pass sort parameter directly (2=oldest, 3=newest, etc.)
if sort == 4: if page_number == 1:
_sort = 3 # Always use first-page method for page 1 with sort parameter
else: page_call = (get_channel_first_page, base_url, tab, None, sort)
_sort = sort
page_call = (get_channel_tab, channel_id, page_number, _sort,
tab, view, ctoken)
# Use the first-page method, which won't break
else: else:
page_call = (get_channel_first_page, base_url, tab) # For page 2+, we can't paginate without continuation tokens
# This is a YouTube limitation, not our bug
flask.abort(404, 'Pagination not available for this sort option. YouTube removed this feature.')
tasks = ( tasks = (
gevent.spawn(*num_videos_call), gevent.spawn(*num_videos_call),
@@ -512,7 +540,14 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
}) })
continuation=True continuation=True
elif tab == 'playlists' and page_number == 1: elif tab == 'playlists' and page_number == 1:
polymer_json = util.fetch_url(base_url+ '/playlists?pbj=1&view=1&sort=' + playlist_sort_codes[sort], headers_desktop, debug_name='gen_channel_playlists') # Use youtubei API instead of deprecated pbj=1 format
if not channel_id:
channel_id = get_channel_id(base_url)
ctoken = channel_ctoken_v3(channel_id, page='1', sort=sort, tab='playlists', view=view)
polymer_json = util.call_youtube_api('web', 'browse', {
'continuation': ctoken,
})
continuation = True
elif tab == 'playlists': elif tab == 'playlists':
polymer_json = get_channel_tab(channel_id, page_number, sort, polymer_json = get_channel_tab(channel_id, page_number, sort,
'playlists', view) 'playlists', view)

View File

@@ -150,7 +150,7 @@ def post_process_comments_info(comments_info):
util.URL_ORIGIN, '/watch?v=', comments_info['video_id']) util.URL_ORIGIN, '/watch?v=', comments_info['video_id'])
comments_info['video_thumbnail'] = concat_or_none( comments_info['video_thumbnail'] = concat_or_none(
settings.img_prefix, 'https://i.ytimg.com/vi/', settings.img_prefix, 'https://i.ytimg.com/vi/',
comments_info['video_id'], '/hqdefault.jpg' comments_info['video_id'], '/hq720.jpg'
) )

View File

@@ -8,7 +8,7 @@ import json
import string import string
import gevent import gevent
import math import math
from flask import request from flask import request, abort
import flask import flask
@@ -107,7 +107,7 @@ def get_playlist_page():
util.prefix_urls(item) util.prefix_urls(item)
util.add_extra_html_info(item) util.add_extra_html_info(item)
if 'id' in item: if 'id' in item:
item['thumbnail'] = f"{settings.img_prefix}https://i.ytimg.com/vi/{item['id']}/hqdefault.jpg" item['thumbnail'] = f"{settings.img_prefix}https://i.ytimg.com/vi/{item['id']}/hq720.jpg"
item['url'] += '&list=' + playlist_id item['url'] += '&list=' + playlist_id
if item['index']: if item['index']:

View File

@@ -113,12 +113,12 @@ def read_protobuf(data):
length = read_varint(data) length = read_varint(data)
value = data.read(length) value = data.read(length)
elif wire_type == 3: elif wire_type == 3:
end_bytes = encode_varint((field_number << 3) | 4) end_bytes = varint_encode((field_number << 3) | 4)
value = read_group(data, end_bytes) value = read_group(data, end_bytes)
elif wire_type == 5: elif wire_type == 5:
value = data.read(4) value = data.read(4)
else: else:
raise Exception("Unknown wire type: " + str(wire_type) + ", Tag: " + bytes_to_hex(succinct_encode(tag)) + ", at position " + str(data.tell())) raise Exception("Unknown wire type: " + str(wire_type) + " at position " + str(data.tell()))
yield (wire_type, field_number, value) yield (wire_type, field_number, value)

View File

@@ -97,6 +97,7 @@ import re
import time import time
import json import json
import os import os
import traceback
import pprint import pprint

View File

@@ -114,3 +114,60 @@ function copyTextToClipboard(text) {
window.addEventListener('DOMContentLoaded', function() { window.addEventListener('DOMContentLoaded', function() {
cur_track_idx = getDefaultTranscriptTrackIdx(); cur_track_idx = getDefaultTranscriptTrackIdx();
}); });
/**
 * Thumbnail fallback handler, attached via onerror="thumbnail_fallback(this)"
 * on <img> elements.
 *
 * For YouTube video thumbnails it steps down the quality ladder when the
 * current image fails (404):
 *   hq720.jpg -> sddefault.jpg -> hqdefault.jpg -> mqdefault.jpg -> default.jpg
 * Reassigning src (or data-src for lazy-loaded images) makes the browser retry,
 * so onerror cascades down the list until an image loads or the list is
 * exhausted (default.jpg is never replaced, which ends the cascade).
 *
 * For channel avatars (ggpht.com) it retries with the 240px variant.
 *
 * @param {HTMLImageElement} img - the image element whose load failed
 */
function thumbnail_fallback(img) {
    const src = img.src || img.dataset.src;
    if (!src) return;

    // Handle YouTube video thumbnails
    if (src.includes('/i.ytimg.com/')) {
        // Extract video ID from the /vi/<id>/ path segment
        const match = src.match(/\/vi\/([^/]+)/);
        if (!match) return;
        const videoId = match[1];

        // settings_img_prefix is declared globally in base.html; guard with
        // typeof so this handler does not itself throw a ReferenceError on
        // pages that never define it (an error inside onerror is swallowed
        // silently, leaving the broken thumbnail in place).
        const imgPrefix =
            (typeof settings_img_prefix !== 'undefined' && settings_img_prefix) || '';

        // Fallback order, highest to lowest quality. Checked in this order so
        // that e.g. an 'hqdefault.jpg' URL matches before the 'default.jpg'
        // substring it also contains.
        const fallbacks = [
            'hq720.jpg',
            'sddefault.jpg',
            'hqdefault.jpg',
            'mqdefault.jpg',
            'default.jpg'
        ];

        // Find the current quality and step down to the next one, if any
        for (let i = 0; i < fallbacks.length; i++) {
            if (src.includes(fallbacks[i])) {
                if (i < fallbacks.length - 1) {
                    const newSrc = imgPrefix + 'https://i.ytimg.com/vi/'
                        + videoId + '/' + fallbacks[i + 1];
                    if (img.dataset.src) {
                        // Lazy-loaded image: update the pending source instead
                        img.dataset.src = newSrc;
                    } else {
                        img.src = newSrc;
                    }
                }
                break;
            }
        }
    }
    // Handle YouTube channel avatars (ggpht.com)
    else if (src.includes('ggpht.com') || src.includes('yt3.ggpht.com')) {
        // Retry with the larger avatar size (e.g. =s88-c-k... -> =s240-c-k...)
        const newSrc = src.replace(/=s\d+-c-k/, '=s240-c-k-c0x00ffffff-no-rj');
        if (newSrc !== src) {
            if (img.dataset.src) {
                img.dataset.src = newSrc;
            } else {
                img.src = newSrc;
            }
        }
    }
}

View File

@@ -1089,12 +1089,12 @@ def serve_subscription_thumbnail(thumbnail):
f.close() f.close()
return flask.Response(image, mimetype='image/jpeg') return flask.Response(image, mimetype='image/jpeg')
url = f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg" url = f"https://i.ytimg.com/vi/{video_id}/hq720.jpg"
try: try:
image = util.fetch_url(url, report_text="Saved thumbnail: " + video_id) image = util.fetch_url(url, report_text="Saved thumbnail: " + video_id)
except urllib.error.HTTPError as e: except urllib.error.HTTPError as e:
print("Failed to download thumbnail for " + video_id + ": " + str(e)) print("Failed to download thumbnail for " + video_id + ": " + str(e))
abort(e.code) flask.abort(e.code)
try: try:
f = open(thumbnail_path, 'wb') f = open(thumbnail_path, 'wb')
except FileNotFoundError: except FileNotFoundError:

View File

@@ -26,6 +26,12 @@
// @license-end // @license-end
</script> </script>
{% endif %} {% endif %}
<script>
// @license magnet:?xt=urn:btih:0b31508aeb0634b347b8270c7bee4d411b5d4109&dn=agpl-3.0.txt AGPL-v3-or-Later
// Image prefix for thumbnails
let settings_img_prefix = "{{ settings.img_prefix or '' }}";
// @license-end
</script>
</head> </head>
<body> <body>

View File

@@ -81,10 +81,10 @@
<!-- new--> <!-- new-->
<div id="links-metadata"> <div id="links-metadata">
{% if current_tab in ('videos', 'shorts', 'streams') %} {% if current_tab in ('videos', 'shorts', 'streams') %}
{% set sorts = [('1', 'views'), ('2', 'oldest'), ('3', 'newest'), ('4', 'newest - no shorts'),] %} {% set sorts = [('3', 'newest'), ('4', 'newest - no shorts')] %}
<div id="number-of-results">{{ number_of_videos }} videos</div> <div id="number-of-results">{{ number_of_videos }} videos</div>
{% elif current_tab == 'playlists' %} {% elif current_tab == 'playlists' %}
{% set sorts = [('2', 'oldest'), ('3', 'newest'), ('4', 'last video added')] %} {% set sorts = [('3', 'newest'), ('4', 'last video added')] %}
{% if items %} {% if items %}
<h2 class="page-number">Page {{ page_number }}</h2> <h2 class="page-number">Page {{ page_number }}</h2>
{% else %} {% else %}

View File

@@ -23,11 +23,11 @@
<a class="thumbnail-box" href="{{ info['url'] }}" title="{{ info['title'] }}"> <a class="thumbnail-box" href="{{ info['url'] }}" title="{{ info['title'] }}">
<div class="thumbnail {% if info['type'] == 'channel' %} channel {% endif %}"> <div class="thumbnail {% if info['type'] == 'channel' %} channel {% endif %}">
{% if lazy_load %} {% if lazy_load %}
<img class="thumbnail-img lazy" alt="&#x20;" data-src="{{ info['thumbnail'] }}"> <img class="thumbnail-img lazy" alt="&#x20;" data-src="{{ info['thumbnail'] }}" onerror="thumbnail_fallback(this)">
{% elif info['type'] == 'channel' %} {% elif info['type'] == 'channel' %}
<img class="thumbnail-img channel" alt="&#x20;" src="{{ info['thumbnail'] }}"> <img class="thumbnail-img channel" alt="&#x20;" src="{{ info['thumbnail'] }}" onerror="thumbnail_fallback(this)">
{% else %} {% else %}
<img class="thumbnail-img" alt="&#x20;" src="{{ info['thumbnail'] }}"> <img class="thumbnail-img" alt="&#x20;" src="{{ info['thumbnail'] }}" onerror="thumbnail_fallback(this)">
{% endif %} {% endif %}
{% if info['type'] != 'channel' %} {% if info['type'] != 'channel' %}

View File

@@ -85,6 +85,16 @@
<option value='{"type": "pair", "index": {{ loop.index0}}}' {{ 'selected' if loop.index0 == pair_idx and using_pair_sources else '' }} >{{ src_pair['quality_string'] }}</option> <option value='{"type": "pair", "index": {{ loop.index0}}}' {{ 'selected' if loop.index0 == pair_idx and using_pair_sources else '' }} >{{ src_pair['quality_string'] }}</option>
{% endfor %} {% endfor %}
</select> </select>
{% if audio_tracks and audio_tracks|length > 1 %}
<select id="audio-language-select" autocomplete="off" title="Audio language">
{% for track in audio_tracks %}
<option value="{{ track.get('track_id', track['language']) }}" {{ 'selected' if loop.index0 == 0 else '' }}>
🔊 {{ track['language_name'] }}{% if track.get('is_default') %} (Default){% endif %}
</option>
{% endfor %}
</select>
{% endif %}
{% endif %} {% endif %}
</div> </div>
<input class="v-checkbox" name="video_info_list" value="{{ video_info }}" form="playlist-edit" type="checkbox"> <input class="v-checkbox" name="video_info_list" value="{{ video_info }}" form="playlist-edit" type="checkbox">
@@ -246,6 +256,38 @@
let storyboard_url = {{ storyboard_url | tojson }}; let storyboard_url = {{ storyboard_url | tojson }};
// @license-end // @license-end
</script> </script>
<!-- Audio language selector handler -->
<script>
// @license magnet:?xt=urn:btih:0b31508aeb0634b347b8270c7bee4d411b5d4109&dn=agpl-3.0.txt AGPL-v3-or-Later
// Audio language selector: when the user picks a different audio track from
// the #audio-language-select dropdown, reload the page with an `audio_lang`
// query parameter so the server can serve the matching stream. Does nothing
// unless both the audio and quality <select> elements exist on the page.
(function() {
'use strict';
const audioSelect = document.getElementById('audio-language-select');
const qualitySelect = document.getElementById('quality-select');
if (audioSelect && qualitySelect) {
audioSelect.addEventListener('change', function() {
const selectedAudio = this.value;
const selectedQuality = qualitySelect.value;
// Parse current quality selection
// NOTE: qualityData is not read afterwards — the parse serves as a validity
// guard: if the quality value is not well-formed JSON, bail out without
// reloading the page.
let qualityData;
try {
qualityData = JSON.parse(selectedQuality);
} catch(e) {
return;
}
// Reload video with new audio language
const currentUrl = new URL(window.location.href);
currentUrl.searchParams.set('audio_lang', selectedAudio);
window.location.href = currentUrl.toString();
});
}
}());
// @license-end
</script>
<script src="/youtube.com/static/js/common.js"></script> <script src="/youtube.com/static/js/common.js"></script>
<script src="/youtube.com/static/js/transcript-table.js"></script> <script src="/youtube.com/static/js/transcript-table.js"></script>
{% if settings.use_video_player == 2 %} {% if settings.use_video_player == 2 %}

View File

@@ -1,4 +1,5 @@
from datetime import datetime from datetime import datetime
import logging
import settings import settings
import socks import socks
import sockshandler import sockshandler
@@ -18,6 +19,8 @@ import gevent.queue
import gevent.lock import gevent.lock
import collections import collections
import stem import stem
logger = logging.getLogger(__name__)
import stem.control import stem.control
import traceback import traceback
@@ -302,73 +305,144 @@ def fetch_url_response(url, headers=(), timeout=15, data=None,
def fetch_url(url, headers=(), timeout=15, report_text=None, data=None, def fetch_url(url, headers=(), timeout=15, report_text=None, data=None,
cookiejar_send=None, cookiejar_receive=None, use_tor=True, cookiejar_send=None, cookiejar_receive=None, use_tor=True,
debug_name=None): debug_name=None):
while True: """
start_time = time.monotonic() Fetch URL with exponential backoff retry logic for rate limiting.
response, cleanup_func = fetch_url_response( Retries:
url, headers, timeout=timeout, data=data, - 429 Too Many Requests: Exponential backoff (1s, 2s, 4s, 8s, 16s)
cookiejar_send=cookiejar_send, cookiejar_receive=cookiejar_receive, - 503 Service Unavailable: Exponential backoff
use_tor=use_tor) - 302 Redirect to Google Sorry: Treated as rate limit
response_time = time.monotonic()
content = response.read() Max retries: 5 attempts with exponential backoff
"""
import random
read_finish = time.monotonic() max_retries = 5
base_delay = 1.0 # Base delay in seconds
cleanup_func(response) # release_connection for urllib3 for attempt in range(max_retries):
content = decode_content( try:
content, start_time = time.monotonic()
response.headers.get('Content-Encoding', default='identity'))
if (settings.debugging_save_responses response, cleanup_func = fetch_url_response(
and debug_name is not None url, headers, timeout=timeout, data=data,
and content): cookiejar_send=cookiejar_send, cookiejar_receive=cookiejar_receive,
save_dir = os.path.join(settings.data_dir, 'debug') use_tor=use_tor)
if not os.path.exists(save_dir): response_time = time.monotonic()
os.makedirs(save_dir)
with open(os.path.join(save_dir, debug_name), 'wb') as f: content = response.read()
f.write(content)
if response.status == 429 or ( read_finish = time.monotonic()
response.status == 302 and (response.getheader('Location') == url
or response.getheader('Location').startswith(
'https://www.google.com/sorry/index'
)
)
):
print(response.status, response.reason, response.headers)
ip = re.search(
br'IP address: ((?:[\da-f]*:)+[\da-f]+|(?:\d+\.)+\d+)',
content)
ip = ip.group(1).decode('ascii') if ip else None
if not ip:
ip = re.search(r'IP=((?:\d+\.)+\d+)',
response.getheader('Set-Cookie') or '')
ip = ip.group(1) if ip else None
# don't get new identity if we're not using Tor cleanup_func(response) # release_connection for urllib3
if not use_tor: content = decode_content(
raise FetchError('429', reason=response.reason, ip=ip) content,
response.headers.get('Content-Encoding', default='identity'))
print('Error: YouTube blocked the request because the Tor exit node is overutilized. Exit node IP address: %s' % ip) if (settings.debugging_save_responses
and debug_name is not None
and content):
save_dir = os.path.join(settings.data_dir, 'debug')
if not os.path.exists(save_dir):
os.makedirs(save_dir)
# get new identity with open(os.path.join(save_dir, debug_name), 'wb') as f:
error = tor_manager.new_identity(start_time) f.write(content)
if error:
raise FetchError(
'429', reason=response.reason, ip=ip,
error_message='Automatic circuit change: ' + error)
else:
continue # retry now that we have new identity
elif response.status >= 400: # Check for rate limiting (429) or redirect to Google Sorry
raise FetchError(str(response.status), reason=response.reason, if response.status == 429 or (
ip=None) response.status == 302 and (response.getheader('Location') == url
break or response.getheader('Location').startswith(
'https://www.google.com/sorry/index'
)
)
):
logger.info(f'Rate limit response: {response.status} {response.reason}')
ip = re.search(
br'IP address: ((?:[\da-f]*:)+[\da-f]+|(?:\d+\.)+\d+)',
content)
ip = ip.group(1).decode('ascii') if ip else None
if not ip:
ip = re.search(r'IP=((?:\d+\.)+\d+)',
response.getheader('Set-Cookie') or '')
ip = ip.group(1) if ip else None
# If this is the last attempt, raise error
if attempt >= max_retries - 1:
if not use_tor or not settings.route_tor:
logger.warning(f'YouTube returned 429 but Tor is not enabled. Consider enabling Tor routing.')
raise FetchError('429', reason=response.reason, ip=ip)
logger.error(f'YouTube blocked request - Tor exit node overutilized. Exit IP: {ip}')
# get new identity
error = tor_manager.new_identity(start_time)
if error:
raise FetchError(
'429', reason=response.reason, ip=ip,
error_message='Automatic circuit change: ' + error)
else:
continue # retry with new identity
# Calculate delay with exponential backoff and jitter
delay = (base_delay * (2 ** attempt)) + random.uniform(0, 1)
logger.info(f'Rate limited (429). Waiting {delay:.1f}s before retry {attempt + 1}/{max_retries}...')
time.sleep(delay)
continue # retry
# Check for client errors (400, 404) - don't retry these
if response.status == 400:
logger.error(f'Bad Request (400) - Invalid parameters or URL: {url[:100]}')
raise FetchError('400', reason='Bad Request - Invalid parameters or URL format', ip=None)
if response.status == 404:
logger.warning(f'Not Found (404): {url[:100]}')
raise FetchError('404', reason='Not Found', ip=None)
# Check for other server errors (503, 502, 504)
if response.status in (502, 503, 504):
if attempt >= max_retries - 1:
logger.error(f'Server error {response.status} after {max_retries} retries')
raise FetchError(str(response.status), reason=response.reason, ip=None)
# Exponential backoff for server errors
delay = (base_delay * (2 ** attempt)) + random.uniform(0, 1)
logger.warning(f'Server error ({response.status}). Waiting {delay:.1f}s before retry {attempt + 1}/{max_retries}...')
time.sleep(delay)
continue
# Success - break out of retry loop
break
except urllib3.exceptions.MaxRetryError as e:
# If this is the last attempt, raise the error
if attempt >= max_retries - 1:
exception_cause = e.__context__.__context__
if (isinstance(exception_cause, socks.ProxyConnectionError)
and settings.route_tor):
msg = ('Failed to connect to Tor. Check that Tor is open and '
'that your internet connection is working.\n\n'
+ str(e))
logger.error(f'Tor connection failed: {msg}')
raise FetchError('502', reason='Bad Gateway',
error_message=msg)
elif isinstance(e.__context__,
urllib3.exceptions.NewConnectionError):
msg = 'Failed to establish a connection.\n\n' + str(e)
logger.error(f'Connection failed: {msg}')
raise FetchError(
'502', reason='Bad Gateway',
error_message=msg)
else:
raise
# Wait and retry
delay = (base_delay * (2 ** attempt)) + random.uniform(0, 1)
logger.warning(f'Connection error. Waiting {delay:.1f}s before retry {attempt + 1}/{max_retries}...')
time.sleep(delay)
if report_text: if report_text:
print(report_text, ' Latency:', round(response_time - start_time, 3), ' Read time:', round(read_finish - response_time,3)) logger.info(f'{report_text} - Latency: {round(response_time - start_time, 3)}s - Read time: {round(read_finish - response_time, 3)}s')
return content return content
@@ -462,7 +536,7 @@ class RateLimitedQueue(gevent.queue.Queue):
def download_thumbnail(save_directory, video_id): def download_thumbnail(save_directory, video_id):
url = f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg" url = f"https://i.ytimg.com/vi/{video_id}/hq720.jpg"
save_location = os.path.join(save_directory, video_id + ".jpg") save_location = os.path.join(save_directory, video_id + ".jpg")
try: try:
thumbnail = fetch_url(url, report_text="Saved thumbnail: " + video_id) thumbnail = fetch_url(url, report_text="Saved thumbnail: " + video_id)
@@ -502,9 +576,40 @@ def video_id(url):
return urllib.parse.parse_qs(url_parts.query)['v'][0] return urllib.parse.parse_qs(url_parts.query)['v'][0]
# default, sddefault, mqdefault, hqdefault, hq720 def get_thumbnail_url(video_id, quality='hq720'):
def get_thumbnail_url(video_id): """Get thumbnail URL with fallback to lower quality if needed.
return f"{settings.img_prefix}https://i.ytimg.com/vi/{video_id}/hqdefault.jpg"
Args:
video_id: YouTube video ID
quality: Preferred quality ('maxres', 'hq720', 'sd', 'hq', 'mq', 'default')
Returns:
Tuple of (best_available_url, quality_used)
"""
# Quality priority order (highest to lowest)
quality_order = {
'maxres': ['maxresdefault.jpg', 'sddefault.jpg', 'hqdefault.jpg'],
'hq720': ['hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'],
'sd': ['sddefault.jpg', 'hqdefault.jpg'],
'hq': ['hqdefault.jpg', 'mqdefault.jpg'],
'mq': ['mqdefault.jpg', 'default.jpg'],
'default': ['default.jpg'],
}
qualities = quality_order.get(quality, quality_order['hq720'])
base_url = f"{settings.img_prefix}https://i.ytimg.com/vi/{video_id}/"
# For now, return the highest quality URL
# The browser will handle 404s gracefully with alt text
return base_url + qualities[0], qualities[0]
def get_best_thumbnail_url(video_id):
"""Get the best available thumbnail URL for a video.
Tries hq720 first (for HD videos), falls back to sddefault for SD videos.
"""
return get_thumbnail_url(video_id, quality='hq720')[0]
def seconds_to_timestamp(seconds): def seconds_to_timestamp(seconds):
@@ -538,6 +643,12 @@ def prefix_url(url):
if url is None: if url is None:
return None return None
url = url.lstrip('/') # some urls have // before them, which has a special meaning url = url.lstrip('/') # some urls have // before them, which has a special meaning
# Increase resolution for YouTube channel avatars
if url and ('ggpht.com' in url or 'yt3.ggpht.com' in url):
# Replace size parameter with higher resolution (s240 instead of s88)
url = re.sub(r'=s\d+-c-k', '=s240-c-k-c0x00ffffff-no-rj', url)
return '/' + url return '/' + url

View File

@@ -1,3 +1,3 @@
from __future__ import unicode_literals from __future__ import unicode_literals
__version__ = 'v0.3.2' __version__ = 'v0.4.0'

View File

@@ -628,7 +628,12 @@ def get_watch_page(video_id=None):
# prefix urls, and other post-processing not handled by yt_data_extract # prefix urls, and other post-processing not handled by yt_data_extract
for item in info['related_videos']: for item in info['related_videos']:
item['thumbnail'] = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(item['id']) # set HQ relateds thumbnail videos # For playlists, use first_video_id for thumbnail, not playlist id
if item.get('type') == 'playlist' and item.get('first_video_id'):
item['thumbnail'] = "https://i.ytimg.com/vi/{}/hq720.jpg".format(item['first_video_id'])
elif item.get('type') == 'video':
item['thumbnail'] = "https://i.ytimg.com/vi/{}/hq720.jpg".format(item['id'])
# For other types, keep existing thumbnail or skip
util.prefix_urls(item) util.prefix_urls(item)
util.add_extra_html_info(item) util.add_extra_html_info(item)
for song in info['music_list']: for song in info['music_list']:
@@ -636,6 +641,9 @@ def get_watch_page(video_id=None):
if info['playlist']: if info['playlist']:
playlist_id = info['playlist']['id'] playlist_id = info['playlist']['id']
for item in info['playlist']['items']: for item in info['playlist']['items']:
# Set high quality thumbnail for playlist videos
if item.get('type') == 'video' and item.get('id'):
item['thumbnail'] = "https://i.ytimg.com/vi/{}/hq720.jpg".format(item['id'])
util.prefix_urls(item) util.prefix_urls(item)
util.add_extra_html_info(item) util.add_extra_html_info(item)
if playlist_id: if playlist_id:
@@ -692,12 +700,24 @@ def get_watch_page(video_id=None):
audio_tracks = [] audio_tracks = []
try: try:
from youtube import ytdlp_integration from youtube import ytdlp_integration
logger.info(f'Extracting audio tracks for video: {video_id}')
ytdlp_info = ytdlp_integration.extract_video_info_ytdlp(video_id) ytdlp_info = ytdlp_integration.extract_video_info_ytdlp(video_id)
audio_tracks = ytdlp_info.get('audio_tracks', []) audio_tracks = ytdlp_info.get('audio_tracks', [])
if audio_tracks: if audio_tracks:
logger.info(f'Found {len(audio_tracks)} audio tracks for video {video_id}') logger.info(f'Found {len(audio_tracks)} audio tracks:')
for i, track in enumerate(audio_tracks[:10], 1): # Log first 10
logger.info(f' [{i}] {track["language_name"]} ({track["language"]}) - '
f'bitrate: {track.get("audio_bitrate", "N/A")}k, '
f'codec: {track.get("acodec", "N/A")}, '
f'format_id: {track.get("format_id", "N/A")}')
if len(audio_tracks) > 10:
logger.info(f' ... and {len(audio_tracks) - 10} more')
else:
logger.warning(f'No audio tracks found for video {video_id}')
except Exception as e: except Exception as e:
logger.warning(f'Failed to extract audio tracks: {e}') logger.error(f'Failed to extract audio tracks: {e}', exc_info=True)
audio_tracks = [] audio_tracks = []
pair_quality = yt_data_extract.deep_get(pair_sources, pair_idx, 'quality') pair_quality = yt_data_extract.deep_get(pair_sources, pair_idx, 'quality')
@@ -834,9 +854,17 @@ def get_watch_page(video_id=None):
@yt_app.route('/api/<path:dummy>') @yt_app.route('/api/<path:dummy>')
def get_captions(dummy): def get_captions(dummy):
result = util.fetch_url('https://www.youtube.com' + request.full_path) try:
result = result.replace(b"align:start position:0%", b"") result = util.fetch_url('https://www.youtube.com' + request.full_path)
return result result = result.replace(b"align:start position:0%", b"")
return result
except util.FetchError as e:
# Return empty captions gracefully instead of error page
logger.warning(f'Failed to fetch captions: {e}')
return flask.Response(b'WEBVTT\n\n', mimetype='text/vtt', status=200)
except Exception as e:
logger.error(f'Unexpected error fetching captions: {e}')
return flask.Response(b'WEBVTT\n\n', mimetype='text/vtt', status=200)
times_reg = re.compile(r'^\d\d:\d\d:\d\d\.\d\d\d --> \d\d:\d\d:\d\d\.\d\d\d.*$') times_reg = re.compile(r'^\d\d:\d\d:\d\d\.\d\d\d --> \d\d:\d\d:\d\d\.\d\d\d.*$')

View File

@@ -226,6 +226,89 @@ def check_missing_keys(object, *key_sequences):
return None return None
def extract_lockup_view_model_info(item, additional_info={}):
"""Extract info from new lockupViewModel format (YouTube 2024+)"""
info = {'error': None}
content_type = item.get('contentType', '')
content_id = item.get('contentId', '')
# Extract title from metadata
metadata = item.get('metadata', {})
lockup_metadata = metadata.get('lockupMetadataViewModel', {})
title_data = lockup_metadata.get('title', {})
info['title'] = title_data.get('content', '')
# Determine type based on contentType
if 'PLAYLIST' in content_type:
info['type'] = 'playlist'
info['playlist_type'] = 'playlist'
info['id'] = content_id
info['video_count'] = None
info['first_video_id'] = None
# Try to get video count from metadata
metadata_rows = lockup_metadata.get('metadata', {})
for row in metadata_rows.get('contentMetadataViewModel', {}).get('metadataRows', []):
for part in row.get('metadataParts', []):
text = part.get('text', {}).get('content', '')
if 'video' in text.lower():
info['video_count'] = extract_int(text)
elif 'VIDEO' in content_type:
info['type'] = 'video'
info['id'] = content_id
info['view_count'] = None
info['approx_view_count'] = None
info['time_published'] = None
info['duration'] = None
# Extract duration/other info from metadata rows
metadata_rows = lockup_metadata.get('metadata', {})
for row in metadata_rows.get('contentMetadataViewModel', {}).get('metadataRows', []):
for part in row.get('metadataParts', []):
text = part.get('text', {}).get('content', '')
if 'view' in text.lower():
info['approx_view_count'] = extract_approx_int(text)
elif 'ago' in text.lower():
info['time_published'] = text
elif 'CHANNEL' in content_type:
info['type'] = 'channel'
info['id'] = content_id
info['approx_subscriber_count'] = None
else:
info['type'] = 'unsupported'
return info
# Extract thumbnail from contentImage
content_image = item.get('contentImage', {})
collection_thumb = content_image.get('collectionThumbnailViewModel', {})
primary_thumb = collection_thumb.get('primaryThumbnail', {})
thumb_vm = primary_thumb.get('thumbnailViewModel', {})
image_sources = thumb_vm.get('image', {}).get('sources', [])
if image_sources:
info['thumbnail'] = image_sources[0].get('url', '')
else:
info['thumbnail'] = ''
# Extract author info if available
info['author'] = None
info['author_id'] = None
info['author_url'] = None
# Try to get first video ID from inline player data
item_playback = item.get('itemPlayback', {})
inline_player = item_playback.get('inlinePlayerData', {})
on_select = inline_player.get('onSelect', {})
innertube_cmd = on_select.get('innertubeCommand', {})
watch_endpoint = innertube_cmd.get('watchEndpoint', {})
if watch_endpoint.get('videoId'):
info['first_video_id'] = watch_endpoint.get('videoId')
info.update(additional_info)
return info
def extract_item_info(item, additional_info={}): def extract_item_info(item, additional_info={}):
if not item: if not item:
return {'error': 'No item given'} return {'error': 'No item given'}
@@ -243,6 +326,10 @@ def extract_item_info(item, additional_info={}):
info['type'] = 'unsupported' info['type'] = 'unsupported'
return info return info
# Handle new lockupViewModel format (YouTube 2024+)
if type == 'lockupViewModel':
return extract_lockup_view_model_info(item, additional_info)
# type looks like e.g. 'compactVideoRenderer' or 'gridVideoRenderer' # type looks like e.g. 'compactVideoRenderer' or 'gridVideoRenderer'
# camelCase split, https://stackoverflow.com/a/37697078 # camelCase split, https://stackoverflow.com/a/37697078
type_parts = [s.lower() for s in re.sub(r'([A-Z][a-z]+)', r' \1', type).split()] type_parts = [s.lower() for s in re.sub(r'([A-Z][a-z]+)', r' \1', type).split()]
@@ -441,6 +528,9 @@ _item_types = {
'channelRenderer', 'channelRenderer',
'compactChannelRenderer', 'compactChannelRenderer',
'gridChannelRenderer', 'gridChannelRenderer',
# New viewModel format (YouTube 2024+)
'lockupViewModel',
} }
def _traverse_browse_renderer(renderer): def _traverse_browse_renderer(renderer):

View File

@@ -229,7 +229,7 @@ def extract_playlist_metadata(polymer_json):
if metadata['first_video_id'] is None: if metadata['first_video_id'] is None:
metadata['thumbnail'] = None metadata['thumbnail'] = None
else: else:
metadata['thumbnail'] = f"https://i.ytimg.com/vi/{metadata['first_video_id']}/hqdefault.jpg" metadata['thumbnail'] = f"https://i.ytimg.com/vi/{metadata['first_video_id']}/hq720.jpg"
metadata['video_count'] = extract_int(header.get('numVideosText')) metadata['video_count'] = extract_int(header.get('numVideosText'))
metadata['description'] = extract_str(header.get('descriptionText'), default='') metadata['description'] = extract_str(header.get('descriptionText'), default='')

View File

@@ -18,20 +18,20 @@ logger = logging.getLogger(__name__)
def extract_video_info_ytdlp(video_id): def extract_video_info_ytdlp(video_id):
""" """
Extract video information using yt-dlp (with caching). Extract video information using yt-dlp (with caching).
This is a wrapper around ytdlp_service.extract_video_info() This is a wrapper around ytdlp_service.extract_video_info()
for backward compatibility. for backward compatibility.
Args: Args:
video_id: YouTube video ID video_id: YouTube video ID
Returns: Returns:
Dictionary with audio_tracks, formats, title, duration Dictionary with audio_tracks, formats, title, duration
""" """
logger.debug(f'Extracting video info (legacy API): {video_id}') logger.debug(f'Extracting video info (legacy API): {video_id}')
info = extract_video_info(video_id) info = extract_video_info(video_id)
# Convert to legacy format for backward compatibility # Convert to legacy format for backward compatibility
return { return {
'audio_tracks': info.get('audio_tracks', []), 'audio_tracks': info.get('audio_tracks', []),
@@ -46,25 +46,25 @@ def extract_video_info_ytdlp(video_id):
def get_audio_formats_for_language(video_id, language='en'): def get_audio_formats_for_language(video_id, language='en'):
""" """
Get available audio formats for a specific language. Get available audio formats for a specific language.
Args: Args:
video_id: YouTube video ID video_id: YouTube video ID
language: Language code (default: 'en') language: Language code (default: 'en')
Returns: Returns:
List of audio format dicts List of audio format dicts
""" """
info = extract_video_info_ytdlp(video_id) info = extract_video_info_ytdlp(video_id)
if 'error' in info: if 'error' in info:
logger.warning(f'Cannot get audio formats: {info["error"]}') logger.warning(f'Cannot get audio formats: {info["error"]}')
return [] return []
audio_formats = [] audio_formats = []
for track in info.get('audio_tracks', []): for track in info.get('audio_tracks', []):
if track['language'] == language: if track['language'] == language:
audio_formats.append(track) audio_formats.append(track)
logger.debug(f'Found {len(audio_formats)} {language} audio formats') logger.debug(f'Found {len(audio_formats)} {language} audio formats')
return audio_formats return audio_formats

View File

@@ -17,44 +17,44 @@ logger = logging.getLogger(__name__)
def stream_video_with_audio(video_id: str, audio_language: str = 'en', max_quality: int = 720): def stream_video_with_audio(video_id: str, audio_language: str = 'en', max_quality: int = 720):
""" """
Stream video with specific audio language. Stream video with specific audio language.
Args: Args:
video_id: YouTube video ID video_id: YouTube video ID
audio_language: Preferred audio language (default: 'en') audio_language: Preferred audio language (default: 'en')
max_quality: Maximum video height (default: 720) max_quality: Maximum video height (default: 720)
Returns: Returns:
Flask Response with video stream, or 404 if not available Flask Response with video stream, or 404 if not available
""" """
logger.info(f'Stream request: {video_id} | audio={audio_language} | quality={max_quality}p') logger.info(f'Stream request: {video_id} | audio={audio_language} | quality={max_quality}p')
# Find best unified format # Find best unified format
best_format = find_best_unified_format(video_id, audio_language, max_quality) best_format = find_best_unified_format(video_id, audio_language, max_quality)
if not best_format: if not best_format:
logger.info(f'No suitable unified format found, returning 404 to trigger fallback') logger.info(f'No suitable unified format found, returning 404 to trigger fallback')
return Response('No suitable unified format available', status=404) return Response('No suitable unified format available', status=404)
url = best_format.get('url') url = best_format.get('url')
if not url: if not url:
logger.error('Format found but no URL available') logger.error('Format found but no URL available')
return Response('Format URL not available', status=500) return Response('Format URL not available', status=500)
logger.debug(f'Streaming from: {url[:80]}...') logger.debug(f'Streaming from: {url[:80]}...')
# Stream the video # Stream the video
try: try:
req = urllib.request.Request(url) req = urllib.request.Request(url)
req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36') req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')
req.add_header('Accept', '*/*') req.add_header('Accept', '*/*')
# Add Range header if client requests it # Add Range header if client requests it
if 'Range' in request.headers: if 'Range' in request.headers:
req.add_header('Range', request.headers['Range']) req.add_header('Range', request.headers['Range'])
logger.debug(f'Range request: {request.headers["Range"]}') logger.debug(f'Range request: {request.headers["Range"]}')
resp = urllib.request.urlopen(req, timeout=60) resp = urllib.request.urlopen(req, timeout=60)
def generate(): def generate():
"""Generator for streaming video chunks.""" """Generator for streaming video chunks."""
try: try:
@@ -66,28 +66,28 @@ def stream_video_with_audio(video_id: str, audio_language: str = 'en', max_quali
except Exception as e: except Exception as e:
logger.error(f'Stream error: {e}') logger.error(f'Stream error: {e}')
raise raise
# Build response headers # Build response headers
response_headers = { response_headers = {
'Content-Type': resp.headers.get('Content-Type', 'video/mp4'), 'Content-Type': resp.headers.get('Content-Type', 'video/mp4'),
'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Origin': '*',
} }
# Copy important headers # Copy important headers
for header in ['Content-Length', 'Content-Range', 'Accept-Ranges']: for header in ['Content-Length', 'Content-Range', 'Accept-Ranges']:
if header in resp.headers: if header in resp.headers:
response_headers[header] = resp.headers[header] response_headers[header] = resp.headers[header]
status_code = resp.getcode() status_code = resp.getcode()
logger.info(f'Streaming started: {status_code}') logger.info(f'Streaming started: {status_code}')
return Response( return Response(
stream_with_context(generate()), stream_with_context(generate()),
status=status_code, status=status_code,
headers=response_headers, headers=response_headers,
direct_passthrough=True direct_passthrough=True
) )
except urllib.error.HTTPError as e: except urllib.error.HTTPError as e:
logger.error(f'HTTP error streaming: {e.code} {e.reason}') logger.error(f'HTTP error streaming: {e.code} {e.reason}')
return Response(f'Error: {e.code} {e.reason}', status=e.code) return Response(f'Error: {e.code} {e.reason}', status=e.code)

View File

@@ -55,19 +55,19 @@ def _get_ytdlp_config() -> Dict[str, Any]:
'extractor_retries': 3, 'extractor_retries': 3,
'http_chunk_size': 10485760, # 10MB 'http_chunk_size': 10485760, # 10MB
} }
# Configure Tor proxy if enabled # Configure Tor proxy if enabled
if settings.route_tor: if settings.route_tor:
config['proxy'] = 'socks5://127.0.0.1:9150' config['proxy'] = 'socks5://127.0.0.1:9150'
logger.debug('Tor proxy enabled for yt-dlp') logger.debug('Tor proxy enabled for yt-dlp')
# Use cookies if available # Use cookies if available
import os import os
cookies_file = 'youtube_cookies.txt' cookies_file = 'youtube_cookies.txt'
if os.path.exists(cookies_file): if os.path.exists(cookies_file):
config['cookiefile'] = cookies_file config['cookiefile'] = cookies_file
logger.debug('Using cookies file for yt-dlp') logger.debug('Using cookies file for yt-dlp')
return config return config
@@ -75,13 +75,13 @@ def _get_ytdlp_config() -> Dict[str, Any]:
def extract_video_info(video_id: str) -> Dict[str, Any]: def extract_video_info(video_id: str) -> Dict[str, Any]:
""" """
Extract video information using yt-dlp with caching. Extract video information using yt-dlp with caching.
Args: Args:
video_id: YouTube video ID video_id: YouTube video ID
Returns: Returns:
Dictionary with video information including audio tracks Dictionary with video information including audio tracks
Caching: Caching:
Results are cached to avoid repeated requests to YouTube. Results are cached to avoid repeated requests to YouTube.
Cache size is limited to prevent memory issues. Cache size is limited to prevent memory issues.
@@ -90,25 +90,25 @@ def extract_video_info(video_id: str) -> Dict[str, Any]:
if not getattr(settings, 'ytdlp_enabled', True): if not getattr(settings, 'ytdlp_enabled', True):
logger.debug('yt-dlp integration is disabled') logger.debug('yt-dlp integration is disabled')
return {'error': 'yt-dlp disabled', 'audio_tracks': []} return {'error': 'yt-dlp disabled', 'audio_tracks': []}
url = f'https://www.youtube.com/watch?v={video_id}' url = f'https://www.youtube.com/watch?v={video_id}'
ydl_opts = _get_ytdlp_config() ydl_opts = _get_ytdlp_config()
try: try:
logger.debug(f'Extracting video info: {video_id}') logger.debug(f'Extracting video info: {video_id}')
with yt_dlp.YoutubeDL(ydl_opts) as ydl: with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=False) info = ydl.extract_info(url, download=False)
if not info: if not info:
logger.warning(f'No info returned for video: {video_id}') logger.warning(f'No info returned for video: {video_id}')
return {'error': 'No info returned', 'audio_tracks': []} return {'error': 'No info returned', 'audio_tracks': []}
logger.debug(f'Extracted {len(info.get("formats", []))} formats') logger.info(f'Extracted {len(info.get("formats", []))} total formats')
# Extract audio tracks grouped by language # Extract audio tracks grouped by language
audio_tracks = _extract_audio_tracks(info) audio_tracks = _extract_audio_tracks(info)
return { return {
'video_id': video_id, 'video_id': video_id,
'title': info.get('title', ''), 'title': info.get('title', ''),
@@ -118,7 +118,7 @@ def extract_video_info(video_id: str) -> Dict[str, Any]:
'subtitles': info.get('subtitles', {}), 'subtitles': info.get('subtitles', {}),
'automatic_captions': info.get('automatic_captions', {}), 'automatic_captions': info.get('automatic_captions', {}),
} }
except yt_dlp.utils.DownloadError as e: except yt_dlp.utils.DownloadError as e:
logger.error(f'yt-dlp download error for {video_id}: {e}') logger.error(f'yt-dlp download error for {video_id}: {e}')
return {'error': str(e), 'audio_tracks': []} return {'error': str(e), 'audio_tracks': []}
@@ -130,21 +130,23 @@ def extract_video_info(video_id: str) -> Dict[str, Any]:
def _extract_audio_tracks(info: Dict[str, Any]) -> List[Dict[str, Any]]: def _extract_audio_tracks(info: Dict[str, Any]) -> List[Dict[str, Any]]:
""" """
Extract audio tracks from video info, grouped by language. Extract audio tracks from video info, grouped by language.
Returns a list of unique audio tracks (one per language), Returns a list of unique audio tracks (one per language),
keeping the highest quality for each language. keeping the highest quality for each language.
""" """
audio_by_language = {} audio_by_language = {}
all_formats = info.get('formats', []) all_formats = info.get('formats', [])
logger.debug(f'Processing {len(all_formats)} formats to extract audio tracks')
for fmt in all_formats: for fmt in all_formats:
# Only audio-only formats # Only audio-only formats
has_audio = fmt.get('acodec') and fmt.get('acodec') != 'none' has_audio = fmt.get('acodec') and fmt.get('acodec') != 'none'
has_video = fmt.get('vcodec') and fmt.get('vcodec') != 'none' has_video = fmt.get('vcodec') and fmt.get('vcodec') != 'none'
if not has_audio or has_video: if not has_audio or has_video:
continue continue
# Extract language information # Extract language information
lang = ( lang = (
fmt.get('language') or fmt.get('language') or
@@ -152,17 +154,17 @@ def _extract_audio_tracks(info: Dict[str, Any]) -> List[Dict[str, Any]]:
fmt.get('lang') or fmt.get('lang') or
'und' 'und'
) )
# Get language name # Get language name
lang_name = ( lang_name = (
fmt.get('language_name') or fmt.get('language_name') or
fmt.get('lang_name') or fmt.get('lang_name') or
get_language_name(lang) get_language_name(lang)
) )
# Get bitrate # Get bitrate
bitrate = fmt.get('abr') or fmt.get('tbr') or 0 bitrate = fmt.get('abr') or fmt.get('tbr') or 0
# Create track info # Create track info
track_info = { track_info = {
'language': lang, 'language': lang,
@@ -176,20 +178,21 @@ def _extract_audio_tracks(info: Dict[str, Any]) -> List[Dict[str, Any]]:
'url': fmt.get('url'), 'url': fmt.get('url'),
'filesize': fmt.get('filesize'), 'filesize': fmt.get('filesize'),
} }
# Keep best quality per language # Keep best quality per language
lang_key = lang.lower() lang_key = lang.lower()
if lang_key not in audio_by_language: if lang_key not in audio_by_language:
audio_by_language[lang_key] = track_info audio_by_language[lang_key] = track_info
logger.debug(f' Added {lang} ({lang_name}) - {bitrate}k')
else: else:
current_bitrate = audio_by_language[lang_key].get('audio_bitrate', 0) current_bitrate = audio_by_language[lang_key].get('audio_bitrate', 0)
if bitrate > current_bitrate: if bitrate > current_bitrate:
logger.debug(f' Updated {lang} ({lang_name}): {current_bitrate}k → {bitrate}k')
audio_by_language[lang_key] = track_info audio_by_language[lang_key] = track_info
logger.debug(f'Updated {lang} to higher bitrate: {bitrate}')
# Convert to list and sort # Convert to list and sort
audio_tracks = list(audio_by_language.values()) audio_tracks = list(audio_by_language.values())
# Sort: English first, then by bitrate (descending) # Sort: English first, then by bitrate (descending)
audio_tracks.sort( audio_tracks.sort(
key=lambda x: ( key=lambda x: (
@@ -197,31 +200,31 @@ def _extract_audio_tracks(info: Dict[str, Any]) -> List[Dict[str, Any]]:
-x.get('audio_bitrate', 0) -x.get('audio_bitrate', 0)
) )
) )
logger.debug(f'Found {len(audio_tracks)} unique audio tracks') logger.info(f'Extracted {len(audio_tracks)} unique audio languages')
for track in audio_tracks[:3]: # Log first 3 for track in audio_tracks[:5]: # Log first 5
logger.debug(f' - {track["language_name"]}: {track["audio_bitrate"]}k') logger.info(f' {track["language_name"]} ({track["language"]}): {track["audio_bitrate"]}k')
return audio_tracks return audio_tracks
def get_subtitle_url(video_id: str, lang: str = 'en') -> Optional[str]: def get_subtitle_url(video_id: str, lang: str = 'en') -> Optional[str]:
""" """
Get subtitle URL for a specific language. Get subtitle URL for a specific language.
Args: Args:
video_id: YouTube video ID video_id: YouTube video ID
lang: Language code (default: 'en') lang: Language code (default: 'en')
Returns: Returns:
URL to subtitle file, or None if not available URL to subtitle file, or None if not available
""" """
info = extract_video_info(video_id) info = extract_video_info(video_id)
if 'error' in info: if 'error' in info:
logger.warning(f'Cannot get subtitles: {info["error"]}') logger.warning(f'Cannot get subtitles: {info["error"]}')
return None return None
# Try manual subtitles first # Try manual subtitles first
subtitles = info.get('subtitles', {}) subtitles = info.get('subtitles', {})
if lang in subtitles: if lang in subtitles:
@@ -229,7 +232,7 @@ def get_subtitle_url(video_id: str, lang: str = 'en') -> Optional[str]:
if sub.get('ext') == 'vtt': if sub.get('ext') == 'vtt':
logger.debug(f'Found manual {lang} subtitle') logger.debug(f'Found manual {lang} subtitle')
return sub.get('url') return sub.get('url')
# Try automatic captions # Try automatic captions
auto_captions = info.get('automatic_captions', {}) auto_captions = info.get('automatic_captions', {})
if lang in auto_captions: if lang in auto_captions:
@@ -237,7 +240,7 @@ def get_subtitle_url(video_id: str, lang: str = 'en') -> Optional[str]:
if sub.get('ext') == 'vtt': if sub.get('ext') == 'vtt':
logger.debug(f'Found automatic {lang} subtitle') logger.debug(f'Found automatic {lang} subtitle')
return sub.get('url') return sub.get('url')
logger.debug(f'No {lang} subtitle found') logger.debug(f'No {lang} subtitle found')
return None return None
@@ -249,20 +252,20 @@ def find_best_unified_format(
) -> Optional[Dict[str, Any]]: ) -> Optional[Dict[str, Any]]:
""" """
Find best unified (video+audio) format for specific language and quality. Find best unified (video+audio) format for specific language and quality.
Args: Args:
video_id: YouTube video ID video_id: YouTube video ID
audio_language: Preferred audio language audio_language: Preferred audio language
max_quality: Maximum video height (e.g., 720, 1080) max_quality: Maximum video height (e.g., 720, 1080)
Returns: Returns:
Format dict if found, None otherwise Format dict if found, None otherwise
""" """
info = extract_video_info(video_id) info = extract_video_info(video_id)
if 'error' in info or not info.get('formats'): if 'error' in info or not info.get('formats'):
return None return None
# Quality thresholds (minimum acceptable height as % of requested) # Quality thresholds (minimum acceptable height as % of requested)
thresholds = { thresholds = {
2160: 0.85, 2160: 0.85,
@@ -272,60 +275,60 @@ def find_best_unified_format(
480: 0.60, 480: 0.60,
360: 0.50, 360: 0.50,
} }
# Get threshold for requested quality # Get threshold for requested quality
threshold = 0.70 threshold = 0.70
for q, t in thresholds.items(): for q, t in thresholds.items():
if max_quality >= q: if max_quality >= q:
threshold = t threshold = t
break break
min_height = int(max_quality * threshold) min_height = int(max_quality * threshold)
logger.debug(f'Quality threshold: {threshold:.0%} = min {min_height}p for {max_quality}p') logger.debug(f'Quality threshold: {threshold:.0%} = min {min_height}p for {max_quality}p')
candidates = [] candidates = []
audio_lang_lower = audio_language.lower() audio_lang_lower = audio_language.lower()
for fmt in info['formats']: for fmt in info['formats']:
# Must have both video and audio # Must have both video and audio
has_video = fmt.get('vcodec') and fmt.get('vcodec') != 'none' has_video = fmt.get('vcodec') and fmt.get('vcodec') != 'none'
has_audio = fmt.get('acodec') and fmt.get('acodec') != 'none' has_audio = fmt.get('acodec') and fmt.get('acodec') != 'none'
if not (has_video and has_audio): if not (has_video and has_audio):
continue continue
# Skip HLS/DASH formats # Skip HLS/DASH formats
protocol = fmt.get('protocol', '') protocol = fmt.get('protocol', '')
format_id = str(fmt.get('format_id', '')) format_id = str(fmt.get('format_id', ''))
if any(x in protocol.lower() for x in ['m3u8', 'hls', 'dash']): if any(x in protocol.lower() for x in ['m3u8', 'hls', 'dash']):
continue continue
if format_id.startswith('9'): # HLS formats if format_id.startswith('9'): # HLS formats
continue continue
height = fmt.get('height', 0) height = fmt.get('height', 0)
if height < min_height: if height < min_height:
continue continue
# Language matching # Language matching
lang = ( lang = (
fmt.get('language') or fmt.get('language') or
fmt.get('audio_language') or fmt.get('audio_language') or
'en' 'en'
).lower() ).lower()
lang_match = ( lang_match = (
lang == audio_lang_lower or lang == audio_lang_lower or
lang.startswith(audio_lang_lower[:2]) or lang.startswith(audio_lang_lower[:2]) or
audio_lang_lower.startswith(lang[:2]) audio_lang_lower.startswith(lang[:2])
) )
if not lang_match: if not lang_match:
continue continue
# Calculate score # Calculate score
score = 0 score = 0
# Language match bonus # Language match bonus
if lang == audio_lang_lower: if lang == audio_lang_lower:
score += 10000 score += 10000
@@ -333,42 +336,42 @@ def find_best_unified_format(
score += 8000 score += 8000
else: else:
score += 5000 score += 5000
# Quality score # Quality score
quality_diff = abs(height - max_quality) quality_diff = abs(height - max_quality)
if height >= max_quality: if height >= max_quality:
score += 3000 - quality_diff score += 3000 - quality_diff
else: else:
score += 2000 - quality_diff score += 2000 - quality_diff
# Protocol preference # Protocol preference
if protocol in ('https', 'http'): if protocol in ('https', 'http'):
score += 500 score += 500
# Format preference # Format preference
if fmt.get('ext') == 'mp4': if fmt.get('ext') == 'mp4':
score += 100 score += 100
candidates.append({ candidates.append({
'format': fmt, 'format': fmt,
'score': score, 'score': score,
'height': height, 'height': height,
'lang': lang, 'lang': lang,
}) })
if not candidates: if not candidates:
logger.debug(f'No unified format found for {max_quality}p + {audio_language}') logger.debug(f'No unified format found for {max_quality}p + {audio_language}')
return None return None
# Sort by score and return best # Sort by score and return best
candidates.sort(key=lambda x: x['score'], reverse=True) candidates.sort(key=lambda x: x['score'], reverse=True)
best = candidates[0] best = candidates[0]
logger.info( logger.info(
f'Selected unified format: {best["format"].get("format_id")} | ' f'Selected unified format: {best["format"].get("format_id")} | '
f'{best["lang"]} | {best["height"]}p | score={best["score"]}' f'{best["lang"]} | {best["height"]}p | score={best["score"]}'
) )
return best['format'] return best['format']