remove yt-dlp, fix captions PO Token issue, fix 429 retry logic
All checks were successful
git-sync-with-mirror / git-sync (push) Successful in 13s
CI / test (push) Successful in 52s

- Remove yt-dlp entirely (modules, routes, settings, dependency)
  Was blocking page loads by running synchronously in gevent
- Fix captions: use Android client caption URLs (no PO Token needed)
  instead of web timedtext URLs that YouTube now blocks
- Fix 429 retry: fail immediately without Tor (same IP = pointless retry)
  Was causing ~27s delays with exponential backoff
- Accept ytdlp_enabled as legacy setting to avoid warning on startup
This commit is contained in:
2026-03-27 20:47:44 -05:00
parent 56ecd6cb1b
commit 22c72aa842
9 changed files with 83 additions and 696 deletions

View File

@@ -8,5 +8,4 @@ urllib3>=1.24.1
defusedxml>=0.5.0
cachetools>=4.0.0
stem>=1.8.0
yt-dlp>=2026.01.01
requests>=2.25.0

View File

@@ -340,15 +340,6 @@ Archive: https://archive.ph/OZQbN''',
'hidden': True,
}),
('ytdlp_enabled', {
'type': bool,
'default': True,
'comment': '''Enable yt-dlp integration for multi-language audio and subtitles''',
'hidden': False,
'label': 'Enable yt-dlp integration',
'category': 'playback',
}),
('settings_version', {
'type': int,
'default': 6,
@@ -359,7 +350,8 @@ Archive: https://archive.ph/OZQbN''',
program_directory = os.path.dirname(os.path.realpath(__file__))
acceptable_targets = SETTINGS_INFO.keys() | {
'enable_comments', 'enable_related_videos', 'preferred_video_codec'
'enable_comments', 'enable_related_videos', 'preferred_video_codec',
'ytdlp_enabled',
}

View File

@@ -86,15 +86,6 @@
{% endfor %}
</select>
{% if audio_tracks and audio_tracks|length > 1 %}
<select id="audio-language-select" autocomplete="off" title="Audio language">
{% for track in audio_tracks %}
<option value="{{ track.get('track_id', track['language']) }}" {{ 'selected' if loop.index0 == 0 else '' }}>
🔊 {{ track['language_name'] }}{% if track.get('is_default') %} (Default){% endif %}
</option>
{% endfor %}
</select>
{% endif %}
{% endif %}
</div>
<input class="v-checkbox" name="video_info_list" value="{{ video_info }}" form="playlist-edit" type="checkbox">
@@ -257,37 +248,6 @@
// @license-end
</script>
<!-- Audio language selector handler -->
<script>
// @license magnet:?xt=urn:btih:0b31508aeb0634b347b8270c7bee4d411b5d4109&dn=agpl-3.0.txt AGPL-v3-or-Later
(function() {
'use strict';
const audioSelect = document.getElementById('audio-language-select');
const qualitySelect = document.getElementById('quality-select');
if (audioSelect && qualitySelect) {
audioSelect.addEventListener('change', function() {
const selectedAudio = this.value;
const selectedQuality = qualitySelect.value;
// Parse current quality selection
let qualityData;
try {
qualityData = JSON.parse(selectedQuality);
} catch(e) {
return;
}
// Reload video with new audio language
const currentUrl = new URL(window.location.href);
currentUrl.searchParams.set('audio_lang', selectedAudio);
window.location.href = currentUrl.toString();
});
}
}());
// @license-end
</script>
<script src="/youtube.com/static/js/common.js"></script>
<script src="/youtube.com/static/js/transcript-table.js"></script>
{% if settings.use_video_player == 2 %}

View File

@@ -367,34 +367,25 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None,
response.getheader('Set-Cookie') or '')
ip = ip.group(1) if ip else None
# If this is the last attempt, raise error
# Without Tor, no point retrying with same IP
if not use_tor or not settings.route_tor:
logger.warning('Rate limited (429). Enable Tor routing to retry with new IP.')
raise FetchError('429', reason=response.reason, ip=ip)
# Tor: exhausted retries
if attempt >= max_retries - 1:
if not use_tor or not settings.route_tor:
logger.warning(f'YouTube returned 429 but Tor is not enabled. Consider enabling Tor routing.')
raise FetchError('429', reason=response.reason, ip=ip)
else:
# Tor is enabled but we've exhausted retries
logger.error(f'YouTube blocked request - Tor exit node overutilized after {max_retries} retries. Exit IP: {ip}')
raise FetchError('429', reason=response.reason, ip=ip,
error_message='Tor exit node overutilized after multiple retries')
logger.error(f'Rate limited after {max_retries} retries. Exit IP: {ip}')
raise FetchError('429', reason=response.reason, ip=ip,
error_message='Tor exit node overutilized after multiple retries')
# For Tor: get new identity immediately on 429
if use_tor and settings.route_tor:
logger.info(f'YouTube blocked request - Tor exit node overutilized. Exit IP: {ip}. Getting new identity...')
error = tor_manager.new_identity(start_time)
if error:
raise FetchError(
'429', reason=response.reason, ip=ip,
error_message='Automatic circuit change: ' + error)
else:
continue # retry with new identity
# For non-Tor: exponential backoff
delay = (base_delay * (2 ** attempt)) + random.uniform(0, 1)
logger.info(f'Rate limited (429). Waiting {delay:.1f}s before retry {attempt + 1}/{max_retries}...')
time.sleep(delay)
continue # retry
# Tor: get new identity and retry
logger.info(f'Rate limited. Getting new Tor identity... (IP: {ip})')
error = tor_manager.new_identity(start_time)
if error:
raise FetchError(
'429', reason=response.reason, ip=ip,
error_message='Automatic circuit change: ' + error)
continue # retry with new identity
# Check for client errors (400, 404) - don't retry these
if response.status == 400:

View File

@@ -180,8 +180,34 @@ def make_caption_src(info, lang, auto=False, trans_lang=None):
label += ' (Automatic)'
if trans_lang:
label += ' -> ' + trans_lang
# Try to use Android caption URL directly (no PO Token needed)
caption_url = None
for track in info.get('_android_caption_tracks', []):
track_lang = track.get('languageCode', '')
track_kind = track.get('kind', '')
if track_lang == lang and (
(auto and track_kind == 'asr') or
(not auto and track_kind != 'asr')
):
caption_url = track.get('baseUrl')
break
if caption_url:
# Add format
if '&fmt=' in caption_url:
caption_url = re.sub(r'&fmt=[^&]*', '&fmt=vtt', caption_url)
else:
caption_url += '&fmt=vtt'
if trans_lang:
caption_url += '&tlang=' + trans_lang
url = util.prefix_url(caption_url)
else:
# Fallback to old method
url = util.prefix_url(yt_data_extract.get_caption_url(info, lang, 'vtt', auto, trans_lang))
return {
'url': util.prefix_url(yt_data_extract.get_caption_url(info, lang, 'vtt', auto, trans_lang)),
'url': url,
'label': label,
'srclang': trans_lang[0:2] if trans_lang else lang[0:2],
'on': False,
@@ -387,6 +413,19 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
info = tasks[0].value or {}
player_response = tasks[1].value or {}
# Save android_vr caption tracks (no PO Token needed for these URLs)
if isinstance(player_response, str):
try:
pr_data = json.loads(player_response)
except Exception:
pr_data = {}
else:
pr_data = player_response or {}
android_caption_tracks = yt_data_extract.deep_get(
pr_data, 'captions', 'playerCaptionsTracklistRenderer',
'captionTracks', default=[])
info['_android_caption_tracks'] = android_caption_tracks
yt_data_extract.update_with_new_urls(info, player_response)
# Fallback to 'ios' if no valid URLs are found
@@ -696,30 +735,6 @@ def get_watch_page(video_id=None):
pair_sources = source_info['pair_sources']
uni_idx, pair_idx = source_info['uni_idx'], source_info['pair_idx']
# Extract audio tracks using yt-dlp for multi-language support
audio_tracks = []
try:
from youtube import ytdlp_integration
logger.info(f'Extracting audio tracks for video: {video_id}')
ytdlp_info = ytdlp_integration.extract_video_info_ytdlp(video_id)
audio_tracks = ytdlp_info.get('audio_tracks', [])
if audio_tracks:
logger.info(f'✓ Found {len(audio_tracks)} audio tracks:')
for i, track in enumerate(audio_tracks[:10], 1): # Log first 10
logger.info(f' [{i}] {track["language_name"]} ({track["language"]}) - '
f'bitrate: {track.get("audio_bitrate", "N/A")}k, '
f'codec: {track.get("acodec", "N/A")}, '
f'format_id: {track.get("format_id", "N/A")}')
if len(audio_tracks) > 10:
logger.info(f' ... and {len(audio_tracks) - 10} more')
else:
logger.warning(f'No audio tracks found for video {video_id}')
except Exception as e:
logger.error(f'Failed to extract audio tracks: {e}', exc_info=True)
audio_tracks = []
pair_quality = yt_data_extract.deep_get(pair_sources, pair_idx, 'quality')
uni_quality = yt_data_extract.deep_get(uni_sources, uni_idx, 'quality')
@@ -843,9 +858,7 @@ def get_watch_page(video_id=None):
'playlist': info['playlist'],
'related': info['related_videos'],
'playability_error': info['playability_error'],
'audio_tracks': audio_tracks,
},
audio_tracks = audio_tracks,
font_family = youtube.font_choices[settings.font], # for embed page
**source_info,
using_pair_sources = using_pair_sources,
@@ -854,16 +867,13 @@ def get_watch_page(video_id=None):
@yt_app.route('/api/<path:dummy>')
def get_captions(dummy):
url = 'https://www.youtube.com' + request.full_path
try:
result = util.fetch_url('https://www.youtube.com' + request.full_path)
result = util.fetch_url(url, headers=util.mobile_ua)
result = result.replace(b"align:start position:0%", b"")
return result
except util.FetchError as e:
# Return empty captions gracefully instead of error page
logger.warning(f'Failed to fetch captions: {e}')
return flask.Response(b'WEBVTT\n\n', mimetype='text/vtt', status=200)
return flask.Response(result, mimetype='text/vtt')
except Exception as e:
logger.error(f'Unexpected error fetching captions: {e}')
logger.debug(f'Caption fetch failed: {e}')
return flask.Response(b'WEBVTT\n\n', mimetype='text/vtt', status=200)
@@ -929,18 +939,3 @@ def get_transcript(caption_path):
return flask.Response(result.encode('utf-8'),
mimetype='text/plain;charset=UTF-8')
# ============================================================================
# yt-dlp Integration Routes
# ============================================================================
@yt_app.route('/ytl-api/video-with-audio/<video_id>')
def proxy_video_with_audio(video_id):
"""
Proxy para servir video con audio específico usando yt-dlp
"""
from youtube import ytdlp_proxy
audio_lang = request.args.get('lang', 'en')
max_quality = int(request.args.get('quality', 720))
return ytdlp_proxy.stream_video_with_audio(video_id, audio_lang, max_quality)

View File

@@ -628,6 +628,7 @@ def extract_watch_info(polymer_json):
info['manual_caption_languages'] = []
info['_manual_caption_language_names'] = {} # language name written in that language, needed in some cases to create the url
info['translation_languages'] = []
info['_caption_track_urls'] = {} # lang_code -> full baseUrl from player response
captions_info = player_response.get('captions', {})
info['_captions_base_url'] = normalize_url(deep_get(captions_info, 'playerCaptionsRenderer', 'baseUrl'))
# Sometimes the above playerCaptionsRender is randomly missing
@@ -658,6 +659,10 @@ def extract_watch_info(polymer_json):
else:
info['manual_caption_languages'].append(lang_code)
base_url = caption_track.get('baseUrl', '')
# Store the full URL from the player response (includes valid tokens)
if base_url:
normalized = normalize_url(base_url) if base_url.startswith('/') or not base_url.startswith('http') else base_url
info['_caption_track_urls'][lang_code + ('_asr' if caption_track.get('kind') == 'asr' else '')] = normalized
lang_name = deep_get(urllib.parse.parse_qs(urllib.parse.urlparse(base_url).query), 'name', 0)
if lang_name:
info['_manual_caption_language_names'][lang_code] = lang_name
@@ -825,6 +830,21 @@ def captions_available(info):
def get_caption_url(info, language, format, automatic=False, translation_language=None):
'''Gets the url for captions with the given language and format. If automatic is True, get the automatic captions for that language. If translation_language is given, translate the captions from `language` to `translation_language`. If automatic is true and translation_language is given, the automatic captions will be translated.'''
# Try to use the direct URL from the player response first (has valid tokens)
track_key = language + ('_asr' if automatic else '')
direct_url = info.get('_caption_track_urls', {}).get(track_key)
if direct_url:
url = direct_url
# Override format
if '&fmt=' in url:
url = re.sub(r'&fmt=[^&]*', '&fmt=' + format, url)
else:
url += '&fmt=' + format
if translation_language:
url += '&tlang=' + translation_language
return url
# Fallback to base_url construction
url = info['_captions_base_url']
if not url:
return None

View File

@@ -1,78 +0,0 @@
#!/usr/bin/env python3
"""
yt-dlp integration wrapper for backward compatibility.
This module now uses the centralized ytdlp_service for all operations.
"""
import logging
from youtube.ytdlp_service import (
extract_video_info,
get_language_name,
clear_cache,
get_cache_info,
)
logger = logging.getLogger(__name__)
def extract_video_info_ytdlp(video_id):
"""
Extract video information using yt-dlp (with caching).
This is a wrapper around ytdlp_service.extract_video_info()
for backward compatibility.
Args:
video_id: YouTube video ID
Returns:
Dictionary with audio_tracks, formats, title, duration
"""
logger.debug(f'Extracting video info (legacy API): {video_id}')
info = extract_video_info(video_id)
# Convert to legacy format for backward compatibility
return {
'audio_tracks': info.get('audio_tracks', []),
'all_audio_formats': info.get('formats', []),
'formats': info.get('formats', []),
'title': info.get('title', ''),
'duration': info.get('duration', 0),
'error': info.get('error'),
}
def get_audio_formats_for_language(video_id, language='en'):
"""
Get available audio formats for a specific language.
Args:
video_id: YouTube video ID
language: Language code (default: 'en')
Returns:
List of audio format dicts
"""
info = extract_video_info_ytdlp(video_id)
if 'error' in info:
logger.warning(f'Cannot get audio formats: {info["error"]}')
return []
audio_formats = []
for track in info.get('audio_tracks', []):
if track['language'] == language:
audio_formats.append(track)
logger.debug(f'Found {len(audio_formats)} {language} audio formats')
return audio_formats
__all__ = [
'extract_video_info_ytdlp',
'get_audio_formats_for_language',
'get_language_name',
'clear_cache',
'get_cache_info',
]

View File

@@ -1,99 +0,0 @@
#!/usr/bin/env python3
"""
Proxy for serving videos with specific audio using yt-dlp.
This module provides streaming functionality for unified formats
with specific audio languages.
"""
import logging
from flask import Response, request, stream_with_context
import urllib.request
import urllib.error
from youtube.ytdlp_service import find_best_unified_format
logger = logging.getLogger(__name__)
def stream_video_with_audio(video_id: str, audio_language: str = 'en', max_quality: int = 720):
"""
Stream video with specific audio language.
Args:
video_id: YouTube video ID
audio_language: Preferred audio language (default: 'en')
max_quality: Maximum video height (default: 720)
Returns:
Flask Response with video stream, or 404 if not available
"""
logger.info(f'Stream request: {video_id} | audio={audio_language} | quality={max_quality}p')
# Find best unified format
best_format = find_best_unified_format(video_id, audio_language, max_quality)
if not best_format:
logger.info(f'No suitable unified format found, returning 404 to trigger fallback')
return Response('No suitable unified format available', status=404)
url = best_format.get('url')
if not url:
logger.error('Format found but no URL available')
return Response('Format URL not available', status=500)
logger.debug(f'Streaming from: {url[:80]}...')
# Stream the video
try:
req = urllib.request.Request(url)
req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')
req.add_header('Accept', '*/*')
# Add Range header if client requests it
if 'Range' in request.headers:
req.add_header('Range', request.headers['Range'])
logger.debug(f'Range request: {request.headers["Range"]}')
resp = urllib.request.urlopen(req, timeout=60)
def generate():
"""Generator for streaming video chunks."""
try:
while True:
chunk = resp.read(65536) # 64KB chunks
if not chunk:
break
yield chunk
except Exception as e:
logger.error(f'Stream error: {e}')
raise
# Build response headers
response_headers = {
'Content-Type': resp.headers.get('Content-Type', 'video/mp4'),
'Access-Control-Allow-Origin': '*',
}
# Copy important headers
for header in ['Content-Length', 'Content-Range', 'Accept-Ranges']:
if header in resp.headers:
response_headers[header] = resp.headers[header]
status_code = resp.getcode()
logger.info(f'Streaming started: {status_code}')
return Response(
stream_with_context(generate()),
status=status_code,
headers=response_headers,
direct_passthrough=True
)
except urllib.error.HTTPError as e:
logger.error(f'HTTP error streaming: {e.code} {e.reason}')
return Response(f'Error: {e.code} {e.reason}', status=e.code)
except urllib.error.URLError as e:
logger.error(f'URL error streaming: {e.reason}')
return Response(f'Network error: {e.reason}', status=502)
except Exception as e:
logger.error(f'Streaming error: {e}', exc_info=True)
return Response(f'Error: {e}', status=500)

View File

@@ -1,393 +0,0 @@
#!/usr/bin/env python3
"""
Centralized yt-dlp integration with caching, logging, and error handling.
This module provides a clean interface for yt-dlp functionality:
- Multi-language audio track extraction
- Subtitle extraction
- Age-restricted video support
All yt-dlp usage should go through this module for consistency.
"""
import logging
from functools import lru_cache
from typing import Dict, List, Optional, Any
import yt_dlp
import settings
logger = logging.getLogger(__name__)
# Language name mapping
LANGUAGE_NAMES = {
'en': 'English',
'es': 'Español',
'fr': 'Français',
'de': 'Deutsch',
'it': 'Italiano',
'pt': 'Português',
'ru': 'Русский',
'ja': '日本語',
'ko': '한국어',
'zh': '中文',
'ar': 'العربية',
'hi': 'हिन्दी',
'und': 'Unknown',
'zxx': 'No linguistic content',
}
def get_language_name(lang_code: str) -> str:
"""Convert ISO 639-1/2 language code to readable name."""
if not lang_code:
return 'Unknown'
return LANGUAGE_NAMES.get(lang_code.lower(), lang_code.upper())
def _get_ytdlp_config() -> Dict[str, Any]:
"""Get yt-dlp configuration from settings."""
config = {
'quiet': True,
'no_warnings': True,
'extract_flat': False,
'format': 'best',
'skip_download': True,
'socket_timeout': 30,
'extractor_retries': 3,
'http_chunk_size': 10485760, # 10MB
}
# Configure Tor proxy if enabled
if settings.route_tor:
config['proxy'] = 'socks5://127.0.0.1:9150'
logger.debug('Tor proxy enabled for yt-dlp')
# Use cookies if available
import os
cookies_file = 'youtube_cookies.txt'
if os.path.exists(cookies_file):
config['cookiefile'] = cookies_file
logger.debug('Using cookies file for yt-dlp')
return config
@lru_cache(maxsize=128)
def extract_video_info(video_id: str) -> Dict[str, Any]:
"""
Extract video information using yt-dlp with caching.
Args:
video_id: YouTube video ID
Returns:
Dictionary with video information including audio tracks
Caching:
Results are cached to avoid repeated requests to YouTube.
Cache size is limited to prevent memory issues.
"""
# Check if yt-dlp is enabled
if not getattr(settings, 'ytdlp_enabled', True):
logger.debug('yt-dlp integration is disabled')
return {'error': 'yt-dlp disabled', 'audio_tracks': []}
url = f'https://www.youtube.com/watch?v={video_id}'
ydl_opts = _get_ytdlp_config()
try:
logger.debug(f'Extracting video info: {video_id}')
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=False)
if not info:
logger.warning(f'No info returned for video: {video_id}')
return {'error': 'No info returned', 'audio_tracks': []}
logger.info(f'Extracted {len(info.get("formats", []))} total formats')
# Extract audio tracks grouped by language
audio_tracks = _extract_audio_tracks(info)
return {
'video_id': video_id,
'title': info.get('title', ''),
'duration': info.get('duration', 0),
'audio_tracks': audio_tracks,
'formats': info.get('formats', []),
'subtitles': info.get('subtitles', {}),
'automatic_captions': info.get('automatic_captions', {}),
}
except yt_dlp.utils.DownloadError as e:
logger.error(f'yt-dlp download error for {video_id}: {e}')
return {'error': str(e), 'audio_tracks': []}
except Exception as e:
logger.error(f'yt-dlp extraction error for {video_id}: {e}', exc_info=True)
return {'error': str(e), 'audio_tracks': []}
def _extract_audio_tracks(info: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Extract audio tracks from video info, grouped by language.
Returns a list of unique audio tracks (one per language),
keeping the highest quality for each language.
"""
audio_by_language = {}
all_formats = info.get('formats', [])
logger.debug(f'Processing {len(all_formats)} formats to extract audio tracks')
for fmt in all_formats:
# Only audio-only formats
has_audio = fmt.get('acodec') and fmt.get('acodec') != 'none'
has_video = fmt.get('vcodec') and fmt.get('vcodec') != 'none'
if not has_audio or has_video:
continue
# Extract language information
lang = (
fmt.get('language') or
fmt.get('audio_language') or
fmt.get('lang') or
'und'
)
# Get language name
lang_name = (
fmt.get('language_name') or
fmt.get('lang_name') or
get_language_name(lang)
)
# Get bitrate
bitrate = fmt.get('abr') or fmt.get('tbr') or 0
# Create track info
track_info = {
'language': lang,
'language_name': lang_name,
'format_id': str(fmt.get('format_id', '')),
'itag': str(fmt.get('format_id', '')),
'ext': fmt.get('ext'),
'acodec': fmt.get('acodec'),
'audio_bitrate': int(bitrate) if bitrate else 0,
'audio_sample_rate': fmt.get('asr'),
'url': fmt.get('url'),
'filesize': fmt.get('filesize'),
}
# Keep best quality per language
lang_key = lang.lower()
if lang_key not in audio_by_language:
audio_by_language[lang_key] = track_info
logger.debug(f' Added {lang} ({lang_name}) - {bitrate}k')
else:
current_bitrate = audio_by_language[lang_key].get('audio_bitrate', 0)
if bitrate > current_bitrate:
logger.debug(f' Updated {lang} ({lang_name}): {current_bitrate}k → {bitrate}k')
audio_by_language[lang_key] = track_info
# Convert to list and sort
audio_tracks = list(audio_by_language.values())
# Sort: English first, then by bitrate (descending)
audio_tracks.sort(
key=lambda x: (
0 if x['language'] == 'en' else 1,
-x.get('audio_bitrate', 0)
)
)
logger.info(f'Extracted {len(audio_tracks)} unique audio languages')
for track in audio_tracks[:5]: # Log first 5
logger.info(f'{track["language_name"]} ({track["language"]}): {track["audio_bitrate"]}k')
return audio_tracks
def get_subtitle_url(video_id: str, lang: str = 'en') -> Optional[str]:
"""
Get subtitle URL for a specific language.
Args:
video_id: YouTube video ID
lang: Language code (default: 'en')
Returns:
URL to subtitle file, or None if not available
"""
info = extract_video_info(video_id)
if 'error' in info:
logger.warning(f'Cannot get subtitles: {info["error"]}')
return None
# Try manual subtitles first
subtitles = info.get('subtitles', {})
if lang in subtitles:
for sub in subtitles[lang]:
if sub.get('ext') == 'vtt':
logger.debug(f'Found manual {lang} subtitle')
return sub.get('url')
# Try automatic captions
auto_captions = info.get('automatic_captions', {})
if lang in auto_captions:
for sub in auto_captions[lang]:
if sub.get('ext') == 'vtt':
logger.debug(f'Found automatic {lang} subtitle')
return sub.get('url')
logger.debug(f'No {lang} subtitle found')
return None
def find_best_unified_format(
video_id: str,
audio_language: str = 'en',
max_quality: int = 720
) -> Optional[Dict[str, Any]]:
"""
Find best unified (video+audio) format for specific language and quality.
Args:
video_id: YouTube video ID
audio_language: Preferred audio language
max_quality: Maximum video height (e.g., 720, 1080)
Returns:
Format dict if found, None otherwise
"""
info = extract_video_info(video_id)
if 'error' in info or not info.get('formats'):
return None
# Quality thresholds (minimum acceptable height as % of requested)
thresholds = {
2160: 0.85,
1440: 0.80,
1080: 0.70,
720: 0.70,
480: 0.60,
360: 0.50,
}
# Get threshold for requested quality
threshold = 0.70
for q, t in thresholds.items():
if max_quality >= q:
threshold = t
break
min_height = int(max_quality * threshold)
logger.debug(f'Quality threshold: {threshold:.0%} = min {min_height}p for {max_quality}p')
candidates = []
audio_lang_lower = audio_language.lower()
for fmt in info['formats']:
# Must have both video and audio
has_video = fmt.get('vcodec') and fmt.get('vcodec') != 'none'
has_audio = fmt.get('acodec') and fmt.get('acodec') != 'none'
if not (has_video and has_audio):
continue
# Skip HLS/DASH formats
protocol = fmt.get('protocol', '')
format_id = str(fmt.get('format_id', ''))
if any(x in protocol.lower() for x in ['m3u8', 'hls', 'dash']):
continue
if format_id.startswith('9'): # HLS formats
continue
height = fmt.get('height', 0)
if height < min_height:
continue
# Language matching
lang = (
fmt.get('language') or
fmt.get('audio_language') or
'en'
).lower()
lang_match = (
lang == audio_lang_lower or
lang.startswith(audio_lang_lower[:2]) or
audio_lang_lower.startswith(lang[:2])
)
if not lang_match:
continue
# Calculate score
score = 0
# Language match bonus
if lang == audio_lang_lower:
score += 10000
elif lang.startswith(audio_lang_lower[:2]):
score += 8000
else:
score += 5000
# Quality score
quality_diff = abs(height - max_quality)
if height >= max_quality:
score += 3000 - quality_diff
else:
score += 2000 - quality_diff
# Protocol preference
if protocol in ('https', 'http'):
score += 500
# Format preference
if fmt.get('ext') == 'mp4':
score += 100
candidates.append({
'format': fmt,
'score': score,
'height': height,
'lang': lang,
})
if not candidates:
logger.debug(f'No unified format found for {max_quality}p + {audio_language}')
return None
# Sort by score and return best
candidates.sort(key=lambda x: x['score'], reverse=True)
best = candidates[0]
logger.info(
f'Selected unified format: {best["format"].get("format_id")} | '
f'{best["lang"]} | {best["height"]}p | score={best["score"]}'
)
return best['format']
def clear_cache():
"""Clear the video info cache."""
extract_video_info.cache_clear()
logger.info('yt-dlp cache cleared')
def get_cache_info() -> Dict[str, Any]:
"""Get cache statistics."""
cache_info = extract_video_info.cache_info()
return {
'hits': cache_info.hits,
'misses': cache_info.misses,
'size': cache_info.currsize,
'maxsize': cache_info.maxsize,
'hit_rate': cache_info.hits / (cache_info.hits + cache_info.misses) if (cache_info.hits + cache_info.misses) > 0 else 0,
}