yt-dlp
This commit is contained in:
390
youtube/ytdlp_service.py
Normal file
390
youtube/ytdlp_service.py
Normal file
@@ -0,0 +1,390 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Centralized yt-dlp integration with caching, logging, and error handling.
|
||||
|
||||
This module provides a clean interface for yt-dlp functionality:
|
||||
- Multi-language audio track extraction
|
||||
- Subtitle extraction
|
||||
- Age-restricted video support
|
||||
|
||||
All yt-dlp usage should go through this module for consistency.
|
||||
"""
|
||||
import logging
|
||||
from functools import lru_cache
|
||||
from typing import Dict, List, Optional, Any
|
||||
import yt_dlp
|
||||
import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Language name mapping
|
||||
LANGUAGE_NAMES = {
|
||||
'en': 'English',
|
||||
'es': 'Español',
|
||||
'fr': 'Français',
|
||||
'de': 'Deutsch',
|
||||
'it': 'Italiano',
|
||||
'pt': 'Português',
|
||||
'ru': 'Русский',
|
||||
'ja': '日本語',
|
||||
'ko': '한국어',
|
||||
'zh': '中文',
|
||||
'ar': 'العربية',
|
||||
'hi': 'हिन्दी',
|
||||
'und': 'Unknown',
|
||||
'zxx': 'No linguistic content',
|
||||
}
|
||||
|
||||
|
||||
def get_language_name(lang_code: str) -> str:
|
||||
"""Convert ISO 639-1/2 language code to readable name."""
|
||||
if not lang_code:
|
||||
return 'Unknown'
|
||||
return LANGUAGE_NAMES.get(lang_code.lower(), lang_code.upper())
|
||||
|
||||
|
||||
def _get_ytdlp_config() -> Dict[str, Any]:
|
||||
"""Get yt-dlp configuration from settings."""
|
||||
config = {
|
||||
'quiet': True,
|
||||
'no_warnings': True,
|
||||
'extract_flat': False,
|
||||
'format': 'best',
|
||||
'skip_download': True,
|
||||
'socket_timeout': 30,
|
||||
'extractor_retries': 3,
|
||||
'http_chunk_size': 10485760, # 10MB
|
||||
}
|
||||
|
||||
# Configure Tor proxy if enabled
|
||||
if settings.route_tor:
|
||||
config['proxy'] = 'socks5://127.0.0.1:9150'
|
||||
logger.debug('Tor proxy enabled for yt-dlp')
|
||||
|
||||
# Use cookies if available
|
||||
import os
|
||||
cookies_file = 'youtube_cookies.txt'
|
||||
if os.path.exists(cookies_file):
|
||||
config['cookiefile'] = cookies_file
|
||||
logger.debug('Using cookies file for yt-dlp')
|
||||
|
||||
return config
|
||||
|
||||
|
||||
@lru_cache(maxsize=128)
|
||||
def extract_video_info(video_id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Extract video information using yt-dlp with caching.
|
||||
|
||||
Args:
|
||||
video_id: YouTube video ID
|
||||
|
||||
Returns:
|
||||
Dictionary with video information including audio tracks
|
||||
|
||||
Caching:
|
||||
Results are cached to avoid repeated requests to YouTube.
|
||||
Cache size is limited to prevent memory issues.
|
||||
"""
|
||||
# Check if yt-dlp is enabled
|
||||
if not getattr(settings, 'ytdlp_enabled', True):
|
||||
logger.debug('yt-dlp integration is disabled')
|
||||
return {'error': 'yt-dlp disabled', 'audio_tracks': []}
|
||||
|
||||
url = f'https://www.youtube.com/watch?v={video_id}'
|
||||
ydl_opts = _get_ytdlp_config()
|
||||
|
||||
try:
|
||||
logger.debug(f'Extracting video info: {video_id}')
|
||||
|
||||
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
||||
info = ydl.extract_info(url, download=False)
|
||||
|
||||
if not info:
|
||||
logger.warning(f'No info returned for video: {video_id}')
|
||||
return {'error': 'No info returned', 'audio_tracks': []}
|
||||
|
||||
logger.debug(f'Extracted {len(info.get("formats", []))} formats')
|
||||
|
||||
# Extract audio tracks grouped by language
|
||||
audio_tracks = _extract_audio_tracks(info)
|
||||
|
||||
return {
|
||||
'video_id': video_id,
|
||||
'title': info.get('title', ''),
|
||||
'duration': info.get('duration', 0),
|
||||
'audio_tracks': audio_tracks,
|
||||
'formats': info.get('formats', []),
|
||||
'subtitles': info.get('subtitles', {}),
|
||||
'automatic_captions': info.get('automatic_captions', {}),
|
||||
}
|
||||
|
||||
except yt_dlp.utils.DownloadError as e:
|
||||
logger.error(f'yt-dlp download error for {video_id}: {e}')
|
||||
return {'error': str(e), 'audio_tracks': []}
|
||||
except Exception as e:
|
||||
logger.error(f'yt-dlp extraction error for {video_id}: {e}', exc_info=True)
|
||||
return {'error': str(e), 'audio_tracks': []}
|
||||
|
||||
|
||||
def _extract_audio_tracks(info: Dict[str, Any]) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Extract audio tracks from video info, grouped by language.
|
||||
|
||||
Returns a list of unique audio tracks (one per language),
|
||||
keeping the highest quality for each language.
|
||||
"""
|
||||
audio_by_language = {}
|
||||
all_formats = info.get('formats', [])
|
||||
|
||||
for fmt in all_formats:
|
||||
# Only audio-only formats
|
||||
has_audio = fmt.get('acodec') and fmt.get('acodec') != 'none'
|
||||
has_video = fmt.get('vcodec') and fmt.get('vcodec') != 'none'
|
||||
|
||||
if not has_audio or has_video:
|
||||
continue
|
||||
|
||||
# Extract language information
|
||||
lang = (
|
||||
fmt.get('language') or
|
||||
fmt.get('audio_language') or
|
||||
fmt.get('lang') or
|
||||
'und'
|
||||
)
|
||||
|
||||
# Get language name
|
||||
lang_name = (
|
||||
fmt.get('language_name') or
|
||||
fmt.get('lang_name') or
|
||||
get_language_name(lang)
|
||||
)
|
||||
|
||||
# Get bitrate
|
||||
bitrate = fmt.get('abr') or fmt.get('tbr') or 0
|
||||
|
||||
# Create track info
|
||||
track_info = {
|
||||
'language': lang,
|
||||
'language_name': lang_name,
|
||||
'format_id': str(fmt.get('format_id', '')),
|
||||
'itag': str(fmt.get('format_id', '')),
|
||||
'ext': fmt.get('ext'),
|
||||
'acodec': fmt.get('acodec'),
|
||||
'audio_bitrate': int(bitrate) if bitrate else 0,
|
||||
'audio_sample_rate': fmt.get('asr'),
|
||||
'url': fmt.get('url'),
|
||||
'filesize': fmt.get('filesize'),
|
||||
}
|
||||
|
||||
# Keep best quality per language
|
||||
lang_key = lang.lower()
|
||||
if lang_key not in audio_by_language:
|
||||
audio_by_language[lang_key] = track_info
|
||||
else:
|
||||
current_bitrate = audio_by_language[lang_key].get('audio_bitrate', 0)
|
||||
if bitrate > current_bitrate:
|
||||
audio_by_language[lang_key] = track_info
|
||||
logger.debug(f'Updated {lang} to higher bitrate: {bitrate}')
|
||||
|
||||
# Convert to list and sort
|
||||
audio_tracks = list(audio_by_language.values())
|
||||
|
||||
# Sort: English first, then by bitrate (descending)
|
||||
audio_tracks.sort(
|
||||
key=lambda x: (
|
||||
0 if x['language'] == 'en' else 1,
|
||||
-x.get('audio_bitrate', 0)
|
||||
)
|
||||
)
|
||||
|
||||
logger.debug(f'Found {len(audio_tracks)} unique audio tracks')
|
||||
for track in audio_tracks[:3]: # Log first 3
|
||||
logger.debug(f' - {track["language_name"]}: {track["audio_bitrate"]}k')
|
||||
|
||||
return audio_tracks
|
||||
|
||||
|
||||
def get_subtitle_url(video_id: str, lang: str = 'en') -> Optional[str]:
|
||||
"""
|
||||
Get subtitle URL for a specific language.
|
||||
|
||||
Args:
|
||||
video_id: YouTube video ID
|
||||
lang: Language code (default: 'en')
|
||||
|
||||
Returns:
|
||||
URL to subtitle file, or None if not available
|
||||
"""
|
||||
info = extract_video_info(video_id)
|
||||
|
||||
if 'error' in info:
|
||||
logger.warning(f'Cannot get subtitles: {info["error"]}')
|
||||
return None
|
||||
|
||||
# Try manual subtitles first
|
||||
subtitles = info.get('subtitles', {})
|
||||
if lang in subtitles:
|
||||
for sub in subtitles[lang]:
|
||||
if sub.get('ext') == 'vtt':
|
||||
logger.debug(f'Found manual {lang} subtitle')
|
||||
return sub.get('url')
|
||||
|
||||
# Try automatic captions
|
||||
auto_captions = info.get('automatic_captions', {})
|
||||
if lang in auto_captions:
|
||||
for sub in auto_captions[lang]:
|
||||
if sub.get('ext') == 'vtt':
|
||||
logger.debug(f'Found automatic {lang} subtitle')
|
||||
return sub.get('url')
|
||||
|
||||
logger.debug(f'No {lang} subtitle found')
|
||||
return None
|
||||
|
||||
|
||||
def find_best_unified_format(
|
||||
video_id: str,
|
||||
audio_language: str = 'en',
|
||||
max_quality: int = 720
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Find best unified (video+audio) format for specific language and quality.
|
||||
|
||||
Args:
|
||||
video_id: YouTube video ID
|
||||
audio_language: Preferred audio language
|
||||
max_quality: Maximum video height (e.g., 720, 1080)
|
||||
|
||||
Returns:
|
||||
Format dict if found, None otherwise
|
||||
"""
|
||||
info = extract_video_info(video_id)
|
||||
|
||||
if 'error' in info or not info.get('formats'):
|
||||
return None
|
||||
|
||||
# Quality thresholds (minimum acceptable height as % of requested)
|
||||
thresholds = {
|
||||
2160: 0.85,
|
||||
1440: 0.80,
|
||||
1080: 0.70,
|
||||
720: 0.70,
|
||||
480: 0.60,
|
||||
360: 0.50,
|
||||
}
|
||||
|
||||
# Get threshold for requested quality
|
||||
threshold = 0.70
|
||||
for q, t in thresholds.items():
|
||||
if max_quality >= q:
|
||||
threshold = t
|
||||
break
|
||||
|
||||
min_height = int(max_quality * threshold)
|
||||
logger.debug(f'Quality threshold: {threshold:.0%} = min {min_height}p for {max_quality}p')
|
||||
|
||||
candidates = []
|
||||
audio_lang_lower = audio_language.lower()
|
||||
|
||||
for fmt in info['formats']:
|
||||
# Must have both video and audio
|
||||
has_video = fmt.get('vcodec') and fmt.get('vcodec') != 'none'
|
||||
has_audio = fmt.get('acodec') and fmt.get('acodec') != 'none'
|
||||
|
||||
if not (has_video and has_audio):
|
||||
continue
|
||||
|
||||
# Skip HLS/DASH formats
|
||||
protocol = fmt.get('protocol', '')
|
||||
format_id = str(fmt.get('format_id', ''))
|
||||
|
||||
if any(x in protocol.lower() for x in ['m3u8', 'hls', 'dash']):
|
||||
continue
|
||||
if format_id.startswith('9'): # HLS formats
|
||||
continue
|
||||
|
||||
height = fmt.get('height', 0)
|
||||
if height < min_height:
|
||||
continue
|
||||
|
||||
# Language matching
|
||||
lang = (
|
||||
fmt.get('language') or
|
||||
fmt.get('audio_language') or
|
||||
'en'
|
||||
).lower()
|
||||
|
||||
lang_match = (
|
||||
lang == audio_lang_lower or
|
||||
lang.startswith(audio_lang_lower[:2]) or
|
||||
audio_lang_lower.startswith(lang[:2])
|
||||
)
|
||||
|
||||
if not lang_match:
|
||||
continue
|
||||
|
||||
# Calculate score
|
||||
score = 0
|
||||
|
||||
# Language match bonus
|
||||
if lang == audio_lang_lower:
|
||||
score += 10000
|
||||
elif lang.startswith(audio_lang_lower[:2]):
|
||||
score += 8000
|
||||
else:
|
||||
score += 5000
|
||||
|
||||
# Quality score
|
||||
quality_diff = abs(height - max_quality)
|
||||
if height >= max_quality:
|
||||
score += 3000 - quality_diff
|
||||
else:
|
||||
score += 2000 - quality_diff
|
||||
|
||||
# Protocol preference
|
||||
if protocol in ('https', 'http'):
|
||||
score += 500
|
||||
|
||||
# Format preference
|
||||
if fmt.get('ext') == 'mp4':
|
||||
score += 100
|
||||
|
||||
candidates.append({
|
||||
'format': fmt,
|
||||
'score': score,
|
||||
'height': height,
|
||||
'lang': lang,
|
||||
})
|
||||
|
||||
if not candidates:
|
||||
logger.debug(f'No unified format found for {max_quality}p + {audio_language}')
|
||||
return None
|
||||
|
||||
# Sort by score and return best
|
||||
candidates.sort(key=lambda x: x['score'], reverse=True)
|
||||
best = candidates[0]
|
||||
|
||||
logger.info(
|
||||
f'Selected unified format: {best["format"].get("format_id")} | '
|
||||
f'{best["lang"]} | {best["height"]}p | score={best["score"]}'
|
||||
)
|
||||
|
||||
return best['format']
|
||||
|
||||
|
||||
def clear_cache():
|
||||
"""Clear the video info cache."""
|
||||
extract_video_info.cache_clear()
|
||||
logger.info('yt-dlp cache cleared')
|
||||
|
||||
|
||||
def get_cache_info() -> Dict[str, Any]:
|
||||
"""Get cache statistics."""
|
||||
cache_info = extract_video_info.cache_info()
|
||||
return {
|
||||
'hits': cache_info.hits,
|
||||
'misses': cache_info.misses,
|
||||
'size': cache_info.currsize,
|
||||
'maxsize': cache_info.maxsize,
|
||||
'hit_rate': cache_info.hits / (cache_info.hits + cache_info.misses) if (cache_info.hits + cache_info.misses) > 0 else 0,
|
||||
}
|
||||
Reference in New Issue
Block a user