1402 lines
55 KiB
Python
1402 lines
55 KiB
Python
import youtube
|
|
from youtube import yt_app
|
|
from youtube import util, comments, local_playlist, yt_data_extract
|
|
from youtube.util import time_utc_isoformat
|
|
import settings
|
|
|
|
from flask import request
|
|
import flask
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
import json
|
|
import gevent
|
|
import os
|
|
import math
|
|
import traceback
|
|
import urllib
|
|
import re
|
|
import urllib3.exceptions
|
|
from urllib.parse import parse_qs, urlencode
|
|
from types import SimpleNamespace
|
|
from math import ceil
|
|
|
|
|
|
# Load the persisted signature-decryption function cache from data_dir.
# On a fresh install the file does not exist yet; start with an empty cache.
_decrypt_cache_path = os.path.join(settings.data_dir, 'decrypt_function_cache.json')
try:
    with open(_decrypt_cache_path, 'r') as f:
        decrypt_cache = json.load(f)['decrypt_cache']
except FileNotFoundError:
    decrypt_cache = {}
|
|
|
|
|
|
def codec_name(vcodec):
    """Map a raw codec string (e.g. 'avc1.4d401f') to a short family name.

    Returns 'h264', 'av1', 'vp', or 'unknown'.
    """
    families = (
        ('avc', 'h264'),
        ('av01', 'av1'),
        ('vp', 'vp'),
    )
    for prefix, family in families:
        if vcodec.startswith(prefix):
            return family
    return 'unknown'
|
|
|
|
|
|
def get_video_sources(info, target_resolution):
    '''Organize info['formats'] into sources for the player.

    Returns a dict with:
        uni_sources: integrated (audio+video) sources, sorted by quality
        uni_idx: index into uni_sources closest to (not above) target_resolution
        pair_sources: video-only sources paired with a chosen audio source
        pair_idx: index into pair_sources closest to (not above) target_resolution
        audio_tracks: [{id, name, is_default}, ...] (default track first)
        audio_track_sources: {track_id: [audio sources sorted by bitrate]}
    '''
    audio_by_track = {}       # track_id -> {name, is_default, sources}
    video_only_sources = {}   # '1080p60'-style key -> [video-only sources]
    uni_sources = []          # integrated audio+video sources
    pair_sources = []         # video-only + matched audio pairs

    for fmt in info['formats']:
        # Skip formats missing the essentials to play or identify them
        if not all(fmt[attr] for attr in ('ext', 'url', 'itag')):
            continue
        if fmt['acodec'] and fmt['vcodec']:
            # Integrated source; only keep the default audio track variant
            if fmt.get('audio_track_is_default', True) is False:
                continue
            source = {'type': 'video/' + fmt['ext'],
                      'quality_string': short_video_quality_string(fmt)}
            source['quality_string'] += ' (integrated)'
            source.update(fmt)
            uni_sources.append(source)
            continue

        if not (fmt['init_range'] and fmt['index_range']):
            # Allow HLS-backed audio tracks (served locally, no init/index needed)
            if not fmt.get('url', '').startswith('http://127.') and not '/ytl-api/' in fmt.get('url', ''):
                continue
            # Mark as HLS for frontend
            fmt['is_hls'] = True

        if fmt['acodec'] and not fmt['vcodec'] and (fmt['audio_bitrate'] or fmt['bitrate']):
            # Audio-only source; derive kbit/s from raw bitrate when needed
            if fmt['bitrate']:
                fmt['audio_bitrate'] = int(fmt['bitrate']/1000)
            source = {'type': 'audio/' + fmt['ext'],
                      'quality_string': audio_quality_string(fmt)}
            source.update(fmt)
            source['mime_codec'] = source['type'] + '; codecs="' + source['acodec'] + '"'
            # Group audio sources by their (possibly multi-language) track id
            tid = fmt.get('audio_track_id') or 'default'
            if tid not in audio_by_track:
                audio_by_track[tid] = {
                    'name': fmt.get('audio_track_name') or 'Default',
                    'is_default': fmt.get('audio_track_is_default', True),
                    'sources': [],
                }
            audio_by_track[tid]['sources'].append(source)
        elif all(fmt[attr] for attr in ('vcodec', 'quality', 'width', 'fps', 'file_size')):
            # Video-only source; drop codecs the player can't rank
            if codec_name(fmt['vcodec']) == 'unknown':
                continue
            source = {'type': 'video/' + fmt['ext'],
                      'quality_string': short_video_quality_string(fmt)}
            source.update(fmt)
            source['mime_codec'] = source['type'] + '; codecs="' + source['vcodec'] + '"'
            quality = str(fmt['quality']) + 'p' + str(fmt['fps'])
            video_only_sources.setdefault(quality, []).append(source)

    # Build the track list for the UI and remember which track is default
    audio_tracks = []
    default_track_id = 'default'
    for tid, ti in audio_by_track.items():
        audio_tracks.append({'id': tid, 'name': ti['name'], 'is_default': ti['is_default']})
        if ti['is_default']:
            default_track_id = tid
    audio_tracks.sort(key=lambda t: (not t['is_default'], t['name']))

    default_audio = audio_by_track.get(default_track_id, {}).get('sources', [])
    default_audio.sort(key=lambda s: s['audio_bitrate'])
    uni_sources.sort(key=lambda src: src['quality'])
    webm_audios = [a for a in default_audio if a['ext'] == 'webm']
    mp4_audios = [a for a in default_audio if a['ext'] == 'mp4']

    for quality_string, sources in video_only_sources.items():
        # choose an audio source to go with it
        # 0.5 is semiarbitrary empirical constant to spread audio sources
        # between 144p and 1080p. Use something better eventually.
        quality, fps = map(int, quality_string.split('p'))
        target_audio_bitrate = quality*fps/30*0.5
        pair_info = {
            'quality_string': quality_string,
            'quality': quality,
            'height': sources[0]['height'],
            'width': sources[0]['width'],
            'fps': fps,
            'videos': sources,
            'audios': [],
        }
        for audio_choices in (webm_audios, mp4_audios):
            if not audio_choices:
                continue
            # Audio choices are sorted ascending by bitrate; scan until the
            # error starts increasing, which marks the closest bitrate.
            closest_audio_source = audio_choices[0]
            best_err = target_audio_bitrate - audio_choices[0]['audio_bitrate']
            best_err = abs(best_err)
            for audio_source in audio_choices[1:]:
                err = abs(audio_source['audio_bitrate'] - target_audio_bitrate)
                # once err gets worse we have passed the closest one
                if err > best_err:
                    break
                best_err = err
                closest_audio_source = audio_source
            pair_info['audios'].append(closest_audio_source)

        if not pair_info['audios']:
            continue

        def video_rank(src):
            ''' Sort by settings preference. Use file size as tiebreaker '''
            setting_name = 'codec_rank_' + codec_name(src['vcodec'])
            return (settings.current_settings_dict[setting_name],
                    src['file_size'])
        pair_info['videos'].sort(key=video_rank)

        pair_sources.append(pair_info)

    pair_sources.sort(key=lambda src: src['quality'])

    # Pick the highest-quality source that does not exceed target_resolution
    # (falls back to index 0 when even the lowest quality exceeds it)
    uni_idx = 0 if uni_sources else None
    for i, source in enumerate(uni_sources):
        if source['quality'] > target_resolution:
            break
        uni_idx = i

    pair_idx = 0 if pair_sources else None
    for i, pair_info in enumerate(pair_sources):
        if pair_info['quality'] > target_resolution:
            break
        pair_idx = i

    # Per-track audio sources, each list sorted ascending by bitrate
    audio_track_sources = {}
    for tid, ti in audio_by_track.items():
        srcs = ti['sources']
        srcs.sort(key=lambda s: s.get('audio_bitrate', 0))
        audio_track_sources[tid] = srcs

    return {
        'uni_sources': uni_sources,
        'uni_idx': uni_idx,
        'pair_sources': pair_sources,
        'pair_idx': pair_idx,
        'audio_tracks': audio_tracks,
        'audio_track_sources': audio_track_sources,
    }
|
|
|
|
|
|
def make_caption_src(info, lang, auto=False, trans_lang=None):
    """Build a caption (subtitle) source dict for the player.

    info: extracted watch info; '_android_caption_tracks' (if present) holds
        caption tracks whose URLs work without a PO token.
    lang: language code of the caption track.
    auto: True to select the auto-generated (ASR) track.
    trans_lang: if set, request translation of the track into this language.

    Returns {'url', 'label', 'srclang', 'on'}.
    """
    label = lang
    if auto:
        label += ' (Automatic)'
    if trans_lang:
        label += ' -> ' + trans_lang

    # Try to use Android caption URL directly (no PO Token needed)
    caption_url = None
    for track in info.get('_android_caption_tracks', []):
        track_lang = track.get('languageCode', '')
        track_kind = track.get('kind', '')
        # kind == 'asr' marks auto-generated tracks; require it to match `auto`
        if track_lang == lang and (
            (auto and track_kind == 'asr') or
            (not auto and track_kind != 'asr')
        ):
            caption_url = track.get('baseUrl')
            break

    if caption_url:
        # Force vtt output. Fix: the fmt parameter can appear as the FIRST
        # query parameter ('?fmt=') as well as '&fmt='. The old check only
        # matched '&fmt=', so a '?fmt=...' URL got a second, conflicting
        # '&fmt=vtt' appended instead of being rewritten.
        if re.search(r'[?&]fmt=', caption_url):
            caption_url = re.sub(r'([?&])fmt=[^&]*', r'\1fmt=vtt', caption_url)
        else:
            caption_url += '&fmt=vtt'
        if trans_lang:
            caption_url += '&tlang=' + trans_lang
        url = util.prefix_url(caption_url)
    else:
        # Fallback to old method
        url = util.prefix_url(yt_data_extract.get_caption_url(info, lang, 'vtt', auto, trans_lang))

    return {
        'url': url,
        'label': label,
        'srclang': trans_lang[0:2] if trans_lang else lang[0:2],
        'on': False,
    }
|
|
|
|
|
|
def lang_in(lang, sequence):
    '''Tests if the language is in sequence, with e.g. en and en-US considered the same'''
    if lang is None:
        return False
    prefix = lang[:2]
    return any(candidate[:2] == prefix for candidate in sequence)
|
|
|
|
|
|
def lang_eq(lang1, lang2):
    '''Tests if two iso 639-1 codes are equal, with en and en-US considered the same.
    Just because the codes are equal does not mean the dialects are mutually intelligible, but this will have to do for now without a complex language model'''
    if lang1 is None or lang2 is None:
        return False
    return lang1[:2] == lang2[:2]
|
|
|
|
|
|
def equiv_lang_in(lang, sequence):
    '''Extracts a language in sequence which is equivalent to lang.
    e.g. if lang is en, extracts en-GB from sequence.
    Necessary because if only a specific variant like en-GB is available, can't ask YouTube for simply en. Need to get the available variant.'''
    prefix = lang[:2]
    matches = (candidate for candidate in sequence if candidate[:2] == prefix)
    return next(matches, None)
|
|
|
|
|
|
def get_subtitle_sources(info):
    '''Returns these sources, ordered from least to most intelligible:
    native_video_lang (Automatic)
    foreign_langs (Manual)
    native_video_lang (Automatic) -> pref_lang
    foreign_langs (Manual) -> pref_lang
    native_video_lang (Manual) -> pref_lang
    pref_lang (Automatic)
    pref_lang (Manual)

    pref_lang is settings.subtitles_language. The last (most intelligible)
    source may be switched on by default depending on settings.subtitles_mode.
    '''
    sources = []
    if not yt_data_extract.captions_available(info):
        return []
    pref_lang = settings.subtitles_language
    # First auto-caption language is taken as the video's native language
    native_video_lang = None
    if info['automatic_caption_languages']:
        native_video_lang = info['automatic_caption_languages'][0]

    highest_fidelity_is_manual = False

    # Sources are added in very specific order outlined above
    # More intelligible sources are put further down to avoid browser bug when there are too many languages
    # (in firefox, it is impossible to select a language near the top of the list because it is cut off)

    # native_video_lang (Automatic)
    if native_video_lang and not lang_eq(native_video_lang, pref_lang):
        sources.append(make_caption_src(info, native_video_lang, auto=True))

    # foreign_langs (Manual)
    for lang in info['manual_caption_languages']:
        if not lang_eq(lang, pref_lang):
            sources.append(make_caption_src(info, lang))

    # Translated variants only make sense when pref_lang itself has no
    # caption track but is an available translation target
    if (lang_in(pref_lang, info['translation_languages'])
            and not lang_in(pref_lang, info['automatic_caption_languages'])
            and not lang_in(pref_lang, info['manual_caption_languages'])):
        # native_video_lang (Automatic) -> pref_lang
        if native_video_lang and not lang_eq(pref_lang, native_video_lang):
            sources.append(make_caption_src(info, native_video_lang, auto=True, trans_lang=pref_lang))

        # foreign_langs (Manual) -> pref_lang
        for lang in info['manual_caption_languages']:
            if not lang_eq(lang, native_video_lang) and not lang_eq(lang, pref_lang):
                sources.append(make_caption_src(info, lang, trans_lang=pref_lang))

        # native_video_lang (Manual) -> pref_lang
        if lang_in(native_video_lang, info['manual_caption_languages']):
            sources.append(make_caption_src(info, native_video_lang, trans_lang=pref_lang))

    # pref_lang (Automatic)
    if lang_in(pref_lang, info['automatic_caption_languages']):
        sources.append(make_caption_src(info, equiv_lang_in(pref_lang, info['automatic_caption_languages']), auto=True))

    # pref_lang (Manual)
    if lang_in(pref_lang, info['manual_caption_languages']):
        sources.append(make_caption_src(info, equiv_lang_in(pref_lang, info['manual_caption_languages'])))
        highest_fidelity_is_manual = True

    if sources and sources[-1]['srclang'] == pref_lang:
        # set as on by default since it's manual a default-on subtitles mode is in settings
        if highest_fidelity_is_manual and settings.subtitles_mode > 0:
            sources[-1]['on'] = True
        # set as on by default since settings indicate to set it as such even if it's not manual
        elif settings.subtitles_mode == 2:
            sources[-1]['on'] = True

    # Sanity check: empty result should only happen when no captions exist
    if len(sources) == 0:
        assert len(info['automatic_caption_languages']) == 0 and len(info['manual_caption_languages']) == 0

    return sources
|
|
|
|
|
|
def get_ordered_music_list_attributes(music_list):
    """Return the display-ordered attribute names used by at least one track.

    Only attributes that appear in some track are included, so the music
    table never renders an empty, extraneous column (e.g. an Album column
    no track uses).
    """
    # Union of every key used across all tracks
    used_attributes = set()
    for track in music_list:
        used_attributes |= track.keys()

    # Keep the fixed Artist/Title/Album display order for present attributes
    return [attribute for attribute in ('Artist', 'Title', 'Album')
            if attribute.lower() in used_attributes]
|
|
|
|
|
|
def save_decrypt_cache():
    """Persist the module-level decrypt_cache to data_dir as pretty JSON.

    Creates the data directory if needed. Fix: the file was previously
    opened without a context manager, so a failure during serialization
    or write leaked the file handle; `with` guarantees it is closed.
    """
    os.makedirs(settings.data_dir, exist_ok=True)
    with open(os.path.join(settings.data_dir, 'decrypt_function_cache.json'), 'w') as f:
        f.write(json.dumps({'version': 1, 'decrypt_cache': decrypt_cache},
                           indent=4, sort_keys=True))
|
|
|
|
|
|
def decrypt_signatures(info, video_id):
    '''return error string, or False if no errors

    Decrypts the stream URL signatures in `info` in place. The decryption
    function is extracted from the player's base.js and memoized per player
    version in the module-level decrypt_cache (persisted to disk).
    '''
    # Nothing to do when the URLs are not signature-protected
    if not yt_data_extract.requires_decryption(info):
        return False
    if not info['player_name']:
        return 'Could not find player name'

    player_name = info['player_name']
    if player_name in decrypt_cache:
        print('Using cached decryption function for: ' + player_name)
        info['decryption_function'] = decrypt_cache[player_name]
    else:
        # Fetch the player javascript and extract the decryption routine
        base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text='Fetched player ' + player_name)
        base_js = base_js.decode('utf-8')
        err = yt_data_extract.extract_decryption_function(info, base_js)
        if err:
            return err
        # Cache the extracted function for future videos using this player
        decrypt_cache[player_name] = info['decryption_function']
        save_decrypt_cache()
    err = yt_data_extract.decrypt_signatures(info)
    return err
|
|
|
|
|
|
def _add_to_error(info, key, additional_message):
|
|
if key in info and info[key]:
|
|
info[key] += additional_message
|
|
else:
|
|
info[key] = additional_message
|
|
|
|
|
|
def fetch_player_response(client, video_id):
    # Thin wrapper: query the InnerTube 'player' endpoint for this video
    # using the given client identity (e.g. 'android_vr', 'ios').
    return util.call_youtube_api(client, 'player', {
        'videoId': video_id,
    })
|
|
|
|
|
|
def fetch_watch_page_info(video_id, playlist_id, index):
    """Fetch the mobile embed watch page and extract watch info from its HTML.

    playlist_id/index (both optional, index expected as a string) add playlist
    context to the request. Returns whatever
    yt_data_extract.extract_watch_info_from_html produces.
    """
    # bpctr=9999999999 will bypass are-you-sure dialogs for controversial
    # videos
    url = 'https://m.youtube.com/embed/' + video_id + '?bpctr=9999999999'
    if playlist_id:
        url += '&list=' + playlist_id
    if index:
        url += '&index=' + index

    # Impersonate the mobile web client so the page includes the mobile data
    headers = (
        ('Accept', '*/*'),
        ('Accept-Language', 'en-US,en;q=0.5'),
        ('X-YouTube-Client-Name', '2'),
        ('X-YouTube-Client-Version', '2.20180830'),
    ) + util.mobile_ua

    watch_page = util.fetch_url(url, headers=headers,
                                debug_name='watch')
    watch_page = watch_page.decode('utf-8')
    return yt_data_extract.extract_watch_info_from_html(watch_page)
|
|
|
|
|
|
def extract_info(video_id, use_invidious, playlist_id=None, index=None):
    """Fetch and assemble all playback info for a video.

    Combines watch-page metadata with player API responses from several
    InnerTube clients, adds HLS multi-audio tracks, falls back through
    alternate clients when stream URLs are missing, decrypts signatures,
    and (optionally) verifies URL access when routing through Tor.

    Returns the assembled info dict. (use_invidious is currently unused
    beyond the invidious_* flags initialized near the end.)

    Fixes vs previous version:
      - hls_manifest_url is initialized before the client loop; it was only
        assigned inside the try, so if every client fetch raised, the later
        `if hls_manifest_url:` check crashed with NameError.
      - The function now always ends with `return info`; previously the
        common path fell off the end and returned None (only the Tor-check
        error branches returned info).
      - Key accesses that can be missing when watch-page extraction fails
        (was_live, formats) use .get() to avoid KeyError.
    """
    primary_client = 'android_vr'
    fallback_client = 'ios'
    last_resort_client = 'tv_embedded'

    tasks = (
        # Get video metadata from here
        gevent.spawn(fetch_watch_page_info, video_id, playlist_id, index),
        gevent.spawn(fetch_player_response, primary_client, video_id)
    )
    gevent.joinall(tasks)
    util.check_gevent_exceptions(*tasks)

    info = tasks[0].value or {}
    player_response = tasks[1].value or {}

    # Save android_vr caption tracks (no PO Token needed for these URLs)
    if isinstance(player_response, str):
        try:
            pr_data = json.loads(player_response)
        except Exception:
            pr_data = {}
    else:
        pr_data = player_response or {}
    android_caption_tracks = yt_data_extract.deep_get(
        pr_data, 'captions', 'playerCaptionsTracklistRenderer',
        'captionTracks', default=[])
    info['_android_caption_tracks'] = android_caption_tracks

    # Save streamingData for multi-audio extraction
    pr_streaming_data = pr_data.get('streamingData', {})
    info['_streamingData'] = pr_streaming_data

    yt_data_extract.update_with_new_urls(info, player_response)

    # HLS manifest - try multiple clients in case one is blocked
    info['hls_manifest_url'] = None
    info['hls_audio_tracks'] = {}
    hls_data = None
    hls_client_used = None
    hls_manifest_url = None  # must exist even if every client fetch fails
    for hls_client in ('ios', 'ios_vr', 'android'):
        try:
            resp = fetch_player_response(hls_client, video_id) or {}
            hls_data = json.loads(resp) if isinstance(resp, str) else resp
            hls_manifest_url = (hls_data.get('streamingData') or {}).get('hlsManifestUrl', '')
            if hls_manifest_url:
                hls_client_used = hls_client
                break
        except Exception as e:
            print(f'HLS fetch with {hls_client} failed: {e}')

    if hls_manifest_url:
        info['hls_manifest_url'] = hls_manifest_url
        import re as _re
        from urllib.parse import urljoin
        hls_manifest = util.fetch_url(hls_manifest_url,
                                      headers=(('User-Agent', 'Mozilla/5.0'),),
                                      debug_name='hls_manifest').decode('utf-8')

        # Parse EXT-X-MEDIA audio tracks from HLS manifest
        for line in hls_manifest.split('\n'):
            if '#EXT-X-MEDIA' not in line or 'TYPE=AUDIO' not in line:
                continue
            name_m = _re.search(r'NAME="([^"]+)"', line)
            lang_m = _re.search(r'LANGUAGE="([^"]+)"', line)
            default_m = _re.search(r'DEFAULT=(YES|NO)', line)
            group_m = _re.search(r'GROUP-ID="([^"]+)"', line)
            uri_m = _re.search(r'URI="([^"]+)"', line)
            if not uri_m or not lang_m:
                continue
            lang = lang_m.group(1)
            is_default = default_m and default_m.group(1) == 'YES'
            group = group_m.group(1) if group_m else '0'
            key = lang
            absolute_hls_url = urljoin(hls_manifest_url, uri_m.group(1))
            # NOTE(review): group comparison is lexicographic on strings
            # ('10' < '9'); confirm GROUP-ID values sort as intended
            if key not in info['hls_audio_tracks'] or group > info['hls_audio_tracks'][key].get('group', '0'):
                info['hls_audio_tracks'][key] = {
                    'name': name_m.group(1) if name_m else lang,
                    'lang': lang,
                    'hls_url': absolute_hls_url,
                    'group': group,
                    'is_default': is_default,
                }

        # Register HLS audio tracks for proxy access
        added = 0
        for lang, track in info['hls_audio_tracks'].items():
            ck = video_id + '_' + lang
            from youtube.hls_cache import register_track
            register_track(ck, track['hls_url'],
                           video_id=video_id, track_id=lang)

            # Synthetic DASH-like format entry so the player treats the
            # proxied HLS playlist like any other audio source
            fmt = {
                'audio_track_id': lang,
                'audio_track_name': track['name'],
                'audio_track_is_default': track['is_default'],
                'itag': 'hls_' + lang,
                'ext': 'mp4',
                'audio_bitrate': 128,
                'bitrate': 128000,
                'acodec': 'mp4a.40.2',
                'vcodec': None,
                'width': None,
                'height': None,
                'file_size': None,
                'audio_sample_rate': 44100,
                'duration_ms': None,
                'fps': None,
                'init_range': {'start': 0, 'end': 0},
                'index_range': {'start': 0, 'end': 0},
                'url': '/ytl-api/audio-track?id=' + urllib.parse.quote(ck),
                's': None,
                'sp': None,
                'quality': None,
                'type': 'audio/mp4',
                'quality_string': track['name'],
                'mime_codec': 'audio/mp4; codecs="mp4a.40.2"',
                'is_hls': True,
            }
            info['formats'].append(fmt)
            added += 1

        if added:
            print(f"Added {added} HLS audio tracks (via {hls_client_used})")
    else:
        print("No HLS manifest available from any client")
        info['hls_manifest_url'] = None
        info['hls_audio_tracks'] = {}
        info['hls_unavailable'] = True

    # Register HLS manifest for proxying
    if info['hls_manifest_url']:
        ck = video_id + '_video'
        from youtube.hls_cache import register_track
        register_track(ck, info['hls_manifest_url'], video_id=video_id, track_id='video')
        # Use proxy URL instead of direct Google Video URL
        info['hls_manifest_url'] = '/ytl-api/hls-manifest?id=' + urllib.parse.quote(ck)

    # Fallback to 'ios' if no valid URLs are found
    if not info.get('formats') or info.get('player_urls_missing'):
        print(f"No URLs found in '{primary_client}', attempting with '{fallback_client}'.")
        try:
            player_response = fetch_player_response(fallback_client, video_id) or {}
            yt_data_extract.update_with_new_urls(info, player_response)
        except util.FetchError as e:
            print(f"Fallback '{fallback_client}' failed: {e}")

    # Final attempt with 'tv_embedded' if there are still no URLs
    if not info.get('formats') or info.get('player_urls_missing'):
        print(f"No URLs found in '{fallback_client}', attempting with '{last_resort_client}'")
        try:
            player_response = fetch_player_response(last_resort_client, video_id) or {}
            yt_data_extract.update_with_new_urls(info, player_response)
        except util.FetchError as e:
            print(f"Fallback '{last_resort_client}' failed: {e}")

    # signature decryption
    if info.get('formats'):
        decryption_error = decrypt_signatures(info, video_id)
        if decryption_error:
            info['playability_error'] = 'Error decrypting url signatures: ' + decryption_error

    # check if urls ready (non-live format) in former livestream
    # urls not ready if all of them have no filesize
    if info.get('was_live'):
        info['urls_ready'] = False
        for fmt in info['formats']:
            if fmt['file_size'] is not None:
                info['urls_ready'] = True
    else:
        info['urls_ready'] = True

    # livestream urls
    # sometimes only the livestream urls work soon after the livestream is over
    info['hls_formats'] = []
    if info.get('hls_manifest_url') and (info.get('live') or not info.get('formats') or not info['urls_ready']):
        try:
            manifest = util.fetch_url(info['hls_manifest_url'],
                                      debug_name='hls_manifest.m3u8',
                                      report_text='Fetched hls manifest'
                                      ).decode('utf-8')
            info['hls_formats'], err = yt_data_extract.extract_hls_formats(manifest)
            if not err:
                info['playability_error'] = None
            for fmt in info['hls_formats']:
                fmt['video_quality'] = video_quality_string(fmt)
        except Exception as e:
            print(f"Error obteniendo HLS manifest: {e}")
            info['hls_formats'] = []

    # check for 403. Unnecessary for tor video routing b/c ip address is same
    info['invidious_used'] = False
    info['invidious_reload_button'] = False
    info['tor_bypass_used'] = False
    if (settings.route_tor == 1
            and info.get('formats') and info['formats'][0]['url']):
        try:
            response = util.head(info['formats'][0]['url'],
                                 report_text='Checked for URL access')
        except urllib3.exceptions.HTTPError:
            print('Error while checking for URL access:\n')
            traceback.print_exc()
            return info

        if response.status == 403:
            print('Access denied (403) for video urls.')
            print('Routing video through Tor')
            info['tor_bypass_used'] = True
            for fmt in info['formats']:
                fmt['url'] += '&use_tor=1'
        elif 300 <= response.status < 400:
            print('Error: exceeded max redirects while checking video URL')
            return info

    return info
|
|
|
|
|
|
def video_quality_string(format):
    """Human-readable quality: '1920x1080 30fps', 'audio only', or '?'."""
    if format['vcodec']:
        width = format['width'] or '?'
        height = format['height'] or '?'
        label = f'{width}x{height}'
        if format['fps']:
            label += f" {format['fps']}fps"
        return label
    if format['acodec']:
        return 'audio only'

    return '?'
|
|
|
|
|
|
def short_video_quality_string(fmt):
    """Compact quality label, e.g. '1080p60 h264', '720p AV1', '?p vp9'."""
    label = f"{fmt['quality'] or '?'}p"
    if fmt['fps']:
        label += str(fmt['fps'])
    vcodec = fmt['vcodec']
    if vcodec.startswith('av01'):
        label += ' AV1'
    elif vcodec.startswith('avc'):
        label += ' h264'
    else:
        label += ' ' + vcodec
    return label
|
|
|
|
|
|
def audio_quality_string(fmt):
    """Audio quality label like '128k 44.1kHz'; 'video only' or '?' otherwise."""
    if fmt['acodec']:
        bitrate = fmt['audio_bitrate']
        label = ('%dk' % bitrate) if bitrate else '?k'
        sample_rate = fmt['audio_sample_rate']
        if sample_rate:
            label += ' %.3GkHz' % (sample_rate / 1000)
        return label
    if fmt['vcodec']:
        return 'video only'
    return '?'
|
|
|
|
|
|
# from https://github.com/ytdl-org/youtube-dl/blob/master/youtube_dl/utils.py
|
|
# from https://github.com/ytdl-org/youtube-dl/blob/master/youtube_dl/utils.py
def format_bytes(bytes):
    """Human-readable byte count, e.g. '1.23MiB'; 'N/A' for None."""
    if bytes is None:
        return 'N/A'
    # Exact-type check kept from upstream: only plain str is converted
    if type(bytes) is str:
        bytes = float(bytes)
    exponent = 0 if bytes == 0.0 else int(math.log(bytes, 1024.0))
    suffixes = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB')
    converted = float(bytes) / float(1024 ** exponent)
    return '%.2f%s' % (converted, suffixes[exponent])
|
|
|
|
|
|
@yt_app.route('/ytl-api/audio-track-proxy')
def audio_track_proxy():
    """Proxy for DASH audio tracks to avoid throttling.

    Query params: 'url' (required) - upstream audio URL to fetch and relay;
    'id' - cache key (NOTE(review): read but currently unused here).
    Returns the audio bytes as audio/mp4 with permissive CORS headers;
    400 when 'url' is missing, 502 on upstream failure.
    """
    cache_key = request.args.get('id', '')
    audio_url = request.args.get('url', '')

    if not audio_url:
        flask.abort(400, 'Missing URL')

    try:
        # Generic browser UA; the upstream rejects some default client UAs
        headers = (
            ('User-Agent', 'Mozilla/5.0'),
            ('Accept', '*/*'),
        )
        content = util.fetch_url(audio_url, headers=headers,
                                 debug_name='audio_dash', report_text=None)
        return flask.Response(content, mimetype='audio/mp4',
                              headers={'Access-Control-Allow-Origin': '*',
                                       'Cache-Control': 'max-age=3600'})
    except Exception as e:
        flask.abort(502, f'Audio fetch failed: {e}')
|
|
|
|
|
|
@yt_app.route('/ytl-api/audio-track')
def get_audio_track():
    """Proxy HLS audio/video: playlist or individual segment.

    Dispatches on query params:
      'url' - fetch an HLS playlist and rewrite its segment lines to proxied
              '&seg=' URLs;
      'seg' - fetch either a nested .m3u8 playlist (rewritten the same way,
              including URI="..." attributes) or an actual media segment;
      neither - legacy path: look up the playlist URL registered under 'id'
              in youtube.hls_cache and proxy it.
    All responses carry permissive CORS headers; upstream failures -> 502.
    """
    # NOTE(review): _tracks is imported but never used in this function
    from youtube.hls_cache import get_hls_url, _tracks

    cache_key = request.args.get('id', '')
    seg_url = request.args.get('seg', '')
    playlist_url = request.args.get('url', '')

    # Handle playlist/manifest URL (used for audio track playlists)
    if playlist_url:
        # Unwrap if double-proxied
        if '/ytl-api/audio-track' in playlist_url:
            import urllib.parse as _up
            parsed = _up.parse_qs(_up.urlparse(playlist_url).query)
            if 'url' in parsed:
                playlist_url = parsed['url'][0]

        try:
            playlist = util.fetch_url(playlist_url,
                                      headers=(('User-Agent', 'Mozilla/5.0'),),
                                      debug_name='audio_playlist').decode('utf-8')

            # Rewrite segment URLs
            import re as _re
            from urllib.parse import urljoin
            base_url = request.url_root.rstrip('/')
            playlist_base = playlist_url.rsplit('/', 1)[0] + '/'

            playlist_lines = []
            for line in playlist.split('\n'):
                line = line.strip()
                if not line or line.startswith('#'):
                    playlist_lines.append(line)
                    continue

                # Resolve and proxy segment URL
                seg = line if line.startswith('http') else urljoin(playlist_base, line)
                # Always use &seg= parameter, never &url= for segments
                playlist_lines.append(
                    base_url + '/ytl-api/audio-track?id='
                    + urllib.parse.quote(cache_key)
                    + '&seg=' + urllib.parse.quote(seg, safe='')
                )

            playlist = '\n'.join(playlist_lines)

            return flask.Response(playlist, mimetype='application/vnd.apple.mpegurl',
                                  headers={'Access-Control-Allow-Origin': '*'})
        except Exception as e:
            import traceback
            traceback.print_exc()
            flask.abort(502, f'Playlist fetch failed: {e}')

    # Handle individual segment or nested playlist
    if seg_url:
        # Check if seg_url is already a proxied URL
        if '/ytl-api/audio-track' in seg_url:
            import urllib.parse as _up
            parsed = _up.parse_qs(_up.urlparse(seg_url).query)
            if 'seg' in parsed:
                seg_url = parsed['seg'][0]
            elif 'url' in parsed:
                seg_url = parsed['url'][0]

        # Check if this is a nested playlist (m3u8) that needs rewriting
        # Playlists END with .m3u8 (optionally followed by query params)
        # Segments may contain /index.m3u8/ in their path but end with .ts or similar
        url_path = urllib.parse.urlparse(seg_url).path

        # Only treat as playlist if path ends with .m3u8
        # Don't use 'in' check because segments can have /index.m3u8/ in their path
        is_playlist = url_path.endswith('.m3u8')

        if is_playlist:
            # This is a variant playlist - fetch and rewrite it
            try:
                raw_content = util.fetch_url(seg_url,
                                             headers=(('User-Agent', 'Mozilla/5.0'),),
                                             debug_name='nested_playlist')

                # Check if this is actually binary data (segment) misidentified as playlist
                try:
                    playlist = raw_content.decode('utf-8')
                except UnicodeDecodeError:
                    is_playlist = False  # Fall through to segment handler

                if is_playlist:
                    # Rewrite segment URLs in this playlist
                    from urllib.parse import urljoin
                    import re as _re
                    base_url = request.url_root.rstrip('/')
                    playlist_base = seg_url.rsplit('/', 1)[0] + '/'

                    def proxy_url(url):
                        """Rewrite a single URL to go through the proxy"""
                        if not url or url.startswith('/ytl-api/'):
                            return url
                        if not url.startswith('http://') and not url.startswith('https://'):
                            url = urljoin(playlist_base, url)
                        return (base_url + '/ytl-api/audio-track?id='
                                + urllib.parse.quote(cache_key)
                                + '&seg=' + urllib.parse.quote(url, safe=''))

                    playlist_lines = []
                    for line in playlist.split('\n'):
                        line = line.strip()
                        if not line:
                            playlist_lines.append(line)
                            continue

                        # Handle tags with URI attributes (EXT-X-MAP, EXT-X-KEY, etc.)
                        if line.startswith('#') and 'URI=' in line:
                            def rewrite_uri_attr(match):
                                uri = match.group(1)
                                return 'URI="' + proxy_url(uri) + '"'
                            line = _re.sub(r'URI="([^"]+)"', rewrite_uri_attr, line)
                            playlist_lines.append(line)
                        elif line.startswith('#'):
                            # Other tags pass through unchanged
                            playlist_lines.append(line)
                        else:
                            # This is a segment URL line
                            seg = line if line.startswith('http') else urljoin(playlist_base, line)
                            playlist_lines.append(proxy_url(seg))

                    playlist = '\n'.join(playlist_lines)

                    return flask.Response(playlist, mimetype='application/vnd.apple.mpegurl',
                                          headers={'Access-Control-Allow-Origin': '*'})
            except Exception as e:
                import traceback
                traceback.print_exc()
                flask.abort(502, f'Nested playlist fetch failed: {e}')

        # This is an actual segment - fetch and serve it
        try:
            headers = (
                ('User-Agent', 'Mozilla/5.0'),
                ('Accept', '*/*'),
            )
            content = util.fetch_url(seg_url, headers=headers,
                                     debug_name='hls_seg', report_text=None)

            # Determine content type based on URL or content
            # HLS segments are usually MPEG-TS (.ts) but can be MP4 (.mp4, .m4s)
            if '.mp4' in seg_url or '.m4s' in seg_url or seg_url.lower().endswith('.mp4'):
                content_type = 'video/mp4'
            elif '.webm' in seg_url or seg_url.lower().endswith('.webm'):
                content_type = 'video/webm'
            else:
                # Default to MPEG-TS for HLS
                content_type = 'video/mp2t'

            return flask.Response(content, mimetype=content_type,
                                  headers={
                                      'Access-Control-Allow-Origin': '*',
                                      'Access-Control-Allow-Methods': 'GET, OPTIONS',
                                      'Access-Control-Allow-Headers': 'Range, Content-Type',
                                      'Cache-Control': 'max-age=3600',
                                      'Content-Type': content_type,
                                  })
        except Exception as e:
            import traceback
            traceback.print_exc()
            flask.abort(502, f'Segment fetch failed: {e}')

    # Legacy: Proxy the HLS playlist for audio tracks (using get_hls_url)
    hls_url = get_hls_url(cache_key)
    if not hls_url:
        flask.abort(404, 'Audio track not found')

    try:
        playlist = util.fetch_url(hls_url,
                                  headers=(('User-Agent', 'Mozilla/5.0'),),
                                  debug_name='audio_hls_playlist').decode('utf-8')

        # Rewrite segment URLs to go through our proxy endpoint
        import re as _re
        from urllib.parse import urljoin
        hls_base_url = hls_url.rsplit('/', 1)[0] + '/'

        def make_proxy_url(segment_url):
            # Already proxied lines pass through untouched
            if segment_url.startswith('/ytl-api/audio-track'):
                return segment_url
            base_url = request.url_root.rstrip('/')
            return (base_url + '/ytl-api/audio-track?id='
                    + urllib.parse.quote(cache_key)
                    + '&seg=' + urllib.parse.quote(segment_url))

        playlist_lines = []
        for line in playlist.split('\n'):
            line = line.strip()
            if not line or line.startswith('#'):
                playlist_lines.append(line)
                continue

            if line.startswith('http://') or line.startswith('https://'):
                segment_url = line
            else:
                segment_url = urljoin(hls_base_url, line)

            playlist_lines.append(make_proxy_url(segment_url))

        playlist = '\n'.join(playlist_lines)

        return flask.Response(playlist, mimetype='application/vnd.apple.mpegurl',
                              headers={'Access-Control-Allow-Origin': '*'})
    except Exception as e:
        flask.abort(502, f'Playlist fetch failed: {e}')
|
|
|
|
|
|
@yt_app.route('/ytl-api/hls-manifest')
def get_hls_manifest():
    """Proxy HLS video manifest, rewriting ALL URLs including audio tracks.

    Every URI in the manifest (EXT-X-MEDIA audio-track playlists, variant
    playlists, and media segments) is rewritten to point back at the local
    /ytl-api/audio-track endpoint so the client never contacts the upstream
    host directly. Returns 404 if the cache key is unknown, 502 if the
    upstream fetch fails.
    """
    from youtube.hls_cache import get_hls_url

    cache_key = request.args.get('id', '')
    is_audio = '_audio_' in cache_key or cache_key.endswith('_audio')
    # Use the module logger (not print) so diagnostics respect log config
    logger.debug('[hls-manifest] Request: id=%s... (audio=%s)',
                 cache_key[:40] if cache_key else '', is_audio)

    hls_url = get_hls_url(cache_key)
    logger.debug('[hls-manifest] HLS URL: %s...',
                 hls_url[:80] if hls_url else None)
    if not hls_url:
        flask.abort(404, 'HLS manifest not found')

    try:
        logger.debug('[hls-manifest] Fetching HLS manifest...')
        manifest = util.fetch_url(hls_url,
            headers=(('User-Agent', 'Mozilla/5.0'),),
            debug_name='hls_manifest').decode('utf-8')
        logger.debug('[hls-manifest] Successfully fetched manifest (%d bytes)',
                     len(manifest))

        from urllib.parse import urljoin

        # Get the base URL for resolving relative URLs in the manifest
        hls_base_url = hls_url.rsplit('/', 1)[0] + '/'
        base_url = request.url_root.rstrip('/')

        # Rewrite URLs - handle both segment URLs and audio track URIs
        def rewrite_url(url, is_audio_track=False):
            if not url or url.startswith('/ytl-api/'):
                return url

            # Resolve relative URLs against the manifest's own location
            if not url.startswith('http://') and not url.startswith('https://'):
                url = urljoin(hls_base_url, url)

            if is_audio_track:
                # Audio track playlist - proxy through audio-track endpoint
                return (base_url + '/ytl-api/audio-track?id='
                        + urllib.parse.quote(cache_key)
                        + '&url=' + urllib.parse.quote(url, safe=''))
            else:
                # Video segment or variant playlist - proxy through audio-track endpoint
                return (base_url + '/ytl-api/audio-track?id='
                        + urllib.parse.quote(cache_key)
                        + '&seg=' + urllib.parse.quote(url, safe=''))

        # Parse and rewrite the manifest line by line
        manifest_lines = []
        rewritten_count = 0
        for line in manifest.split('\n'):
            line = line.strip()
            if not line:
                manifest_lines.append(line)
                continue

            # Handle EXT-X-MEDIA tags with URI (audio tracks)
            if line.startswith('#EXT-X-MEDIA:') and 'URI=' in line:
                # Extract and rewrite the URI attribute
                def rewrite_media_uri(match):
                    nonlocal rewritten_count
                    uri = match.group(1)
                    rewritten_count += 1
                    return 'URI="' + rewrite_url(uri, is_audio_track=True) + '"'
                # `re` is imported at module level; no local alias needed
                line = re.sub(r'URI="([^"]+)"', rewrite_media_uri, line)
                manifest_lines.append(line)
            elif line.startswith('#'):
                # Other tags pass through unchanged
                manifest_lines.append(line)
            else:
                # This is a URL (segment or variant playlist)
                if line.startswith('http://') or line.startswith('https://'):
                    url = line
                else:
                    url = urljoin(hls_base_url, line)
                rewritten_count += 1
                manifest_lines.append(rewrite_url(url))

        manifest = '\n'.join(manifest_lines)
        logger.debug('[hls-manifest] Rewrote manifest with %d lines, %d URLs rewritten',
                     len(manifest_lines), rewritten_count)

        return flask.Response(manifest, mimetype='application/vnd.apple.mpegurl',
            headers={
                'Access-Control-Allow-Origin': '*',
                'Access-Control-Allow-Methods': 'GET, OPTIONS',
                'Access-Control-Allow-Headers': 'Range, Content-Type',
                'Cache-Control': 'no-cache',
                'Content-Type': 'application/vnd.apple.mpegurl',
            })
    except Exception as e:
        # logger.exception includes the traceback (replaces print_exc)
        logger.exception('[hls-manifest] Error: %s', e)
        flask.abort(502, f'Manifest fetch failed: {e}')
|
|
|
|
|
|
@yt_app.route('/ytl-api/storyboard.vtt')
def get_storyboard_vtt():
    """Generate a WebVTT file mapping playback times to storyboard
    thumbnail sprite regions (used for seek-bar previews).

    See:
    https://github.com/iv-org/invidious/blob/9a8b81fcbe49ff8d88f197b7f731d6bf79fc8087/src/invidious.cr#L3603
    https://github.com/iv-org/invidious/blob/3bb7fbb2f119790ee6675076b31cd990f75f64bb/src/invidious/videos.cr#L623
    """

    spec_url = request.args.get('spec_url')
    # Without a spec there is nothing to generate; previously this would
    # crash with AttributeError on None.split (HTTP 500)
    if not spec_url:
        flask.abort(404)
    url, *boards = spec_url.split('|')
    base_url, q = url.split('?')
    q = parse_qs(q)  # for url query

    storyboard = None
    wanted_height = 90  # pick the 90px-high storyboard variant

    for i, board in enumerate(boards):
        # Each board spec: <width>#<height>#<count>#<width_cnt>#<height_cnt>#<interval>#<ignored>#<sigh>
        *t, _, sigh = board.split("#")
        width, height, count, width_cnt, height_cnt, interval = map(int, t)
        if height != wanted_height: continue
        q['sigh'] = [sigh]
        url = f"{base_url}?{urlencode(q, doseq=True)}"
        storyboard = SimpleNamespace(
            # $L is the board level index; $N becomes M$M so each sprite
            # sheet index can be substituted per-sheet below
            url = url.replace("$L", str(i)).replace("$N", "M$M"),
            width = width,
            height = height,
            interval = interval,  # ms each thumbnail is displayed
            width_cnt = width_cnt,
            height_cnt = height_cnt,
            storyboard_count = ceil(count / (width_cnt * height_cnt))
        )

    if not storyboard:
        flask.abort(404)

    def to_ts(ms):
        """Format a millisecond offset as a WebVTT hh:mm:ss.mmm timestamp."""
        s, ms = divmod(ms, 1000)
        h, s = divmod(s, 3600)
        m, s = divmod(s, 60)
        return f"{h:02}:{m:02}:{s:02}.{ms:03}"

    # WebVTT requires the "WEBVTT" header line to be followed by a blank
    # line before the first cue; without it the first cue was fused onto
    # the header line, producing an invalid file
    r = "WEBVTT\n\n"  # result
    ts = 0  # current timestamp

    for i in range(storyboard.storyboard_count):
        url = '/' + storyboard.url.replace("$M", str(i))
        interval = storyboard.interval
        w, h = storyboard.width, storyboard.height
        w_cnt, h_cnt = storyboard.width_cnt, storyboard.height_cnt

        # One cue per thumbnail cell, scanning the sprite sheet row-major
        for j in range(h_cnt):
            for k in range(w_cnt):
                r += f"{to_ts(ts)} --> {to_ts(ts+interval)}\n"
                r += f"{url}#xywh={w * k},{h * j},{w},{h}\n\n"
                ts += interval

    return flask.Response(r, mimetype='text/vtt')
|
|
|
|
|
|
# Suffix multipliers (in seconds) for parsing time specs like '1h2m3s'
time_table = {'h': 3600, 'm': 60, 's': 1}
|
|
@yt_app.route('/watch')
@yt_app.route('/embed')
@yt_app.route('/embed/<video_id>')
@yt_app.route('/shorts')
@yt_app.route('/shorts/<video_id>')
def get_watch_page(video_id=None):
    """Render the watch page (or embed page) for a video.

    Fetches video info and comments (the latter skipped in embed page
    mode) in parallel greenlets, post-processes urls/thumbnails, builds
    the audio-track and source lists, and renders watch.html/embed.html.
    """
    video_id = request.args.get('v') or video_id
    if not video_id:
        return flask.render_template('error.html', error_message='Missing video id'), 404
    if len(video_id) < 11:
        return flask.render_template('error.html', error_message='Incomplete video id (too short): ' + video_id), 404

    # Parse the t= start-time parameter: '1h2m3s' style or plain seconds
    time_start_str = request.args.get('t', '0s')
    time_start = 0
    if re.fullmatch(r'(\d+(h|m|s))+', time_start_str):
        for match in re.finditer(r'(\d+)(h|m|s)', time_start_str):
            time_start += int(match.group(1))*time_table[match.group(2)]
    elif re.fullmatch(r'\d+', time_start_str):
        time_start = int(time_start_str)

    lc = request.args.get('lc', '')
    playlist_id = request.args.get('list')
    index = request.args.get('index')
    use_invidious = bool(int(request.args.get('use_invidious', '1')))
    # Embed page mode skips the comments fetch (first task yields {})
    if request.path.startswith('/embed') and settings.embed_page_mode:
        tasks = (
            gevent.spawn((lambda: {})),
            gevent.spawn(extract_info, video_id, use_invidious,
                         playlist_id=playlist_id, index=index),
        )
    else:
        tasks = (
            gevent.spawn(comments.video_comments, video_id,
                         int(settings.default_comment_sorting), lc=lc),
            gevent.spawn(extract_info, video_id, use_invidious,
                         playlist_id=playlist_id, index=index),
        )
    gevent.joinall(tasks)
    util.check_gevent_exceptions(tasks[1])
    comments_info, info = tasks[0].value, tasks[1].value

    if info['error']:
        return flask.render_template('error.html', error_message=info['error'])

    # Minimal metadata serialized into the page for client-side js
    video_info = {
        'duration': util.seconds_to_timestamp(info['duration'] or 0),
        'id': info['id'],
        'title': info['title'],
        'author': info['author'],
        'author_id': info['author_id'],
    }

    # prefix urls, and other post-processing not handled by yt_data_extract
    for item in info['related_videos']:
        # Only set thumbnail if YouTube didn't provide one
        if not item.get('thumbnail'):
            if item.get('type') == 'playlist' and item.get('first_video_id'):
                item['thumbnail'] = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(item['first_video_id'])
            elif item.get('type') == 'video' and item.get('id'):
                item['thumbnail'] = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(item['id'])
        util.prefix_urls(item)
        util.add_extra_html_info(item)
    for song in info['music_list']:
        song['url'] = util.prefix_url(song['url'])
    if info['playlist']:
        playlist_id = info['playlist']['id']
        for item in info['playlist']['items']:
            # Only set thumbnail if YouTube didn't provide one
            if not item.get('thumbnail') and item.get('type') == 'video' and item.get('id'):
                item['thumbnail'] = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(item['id'])
            util.prefix_urls(item)
            util.add_extra_html_info(item)
            if playlist_id:
                item['url'] += '&list=' + playlist_id
            if item['index']:
                item['url'] += '&index=' + str(item['index'])
        info['playlist']['author_url'] = util.prefix_url(
            info['playlist']['author_url'])
    if settings.img_prefix:
        # Don't prefix hls_formats for now because the urls inside the manifest
        # would need to be prefixed as well.
        for fmt in info['formats']:
            fmt['url'] = util.prefix_url(fmt['url'])

    # Add video title to end of url path so it has a filename other than just
    # "videoplayback" when downloaded
    title = urllib.parse.quote(util.to_valid_filename(info['title'] or ''))
    for fmt in info['formats']:
        filename = title
        ext = fmt.get('ext')
        if ext:
            filename += '.' + ext
        fmt['url'] = fmt['url'].replace(
            '/videoplayback',
            '/videoplayback/name/' + filename)

    # Build the list shown in the "Download" dropdown
    download_formats = []

    for format in (info['formats'] + info['hls_formats']):
        if format['acodec'] and format['vcodec']:
            codecs_string = format['acodec'] + ', ' + format['vcodec']
        else:
            codecs_string = format['acodec'] or format['vcodec'] or '?'
        download_formats.append({
            'url': format['url'],
            'ext': format['ext'] or '?',
            'audio_quality': audio_quality_string(format),
            'video_quality': video_quality_string(format),
            'file_size': format_bytes(format['file_size']),
            'codecs': codecs_string,
        })

    # Force a low resolution when routing through tor
    if (settings.route_tor == 2) or info['tor_bypass_used']:
        target_resolution = 240
    else:
        res = settings.default_resolution
        target_resolution = 1080 if res == 'auto' else int(res)

    # Get video sources for no-JS fallback and DASH (av-merge) fallback
    video_sources = get_video_sources(info, target_resolution)
    uni_sources = video_sources['uni_sources']
    pair_sources = video_sources['pair_sources']
    pair_idx = video_sources['pair_idx']
    audio_track_sources = video_sources['audio_track_sources']

    # Build audio tracks list from HLS
    audio_tracks = []
    hls_audio_tracks = info.get('hls_audio_tracks', {})
    hls_manifest_url = info.get('hls_manifest_url')
    if hls_audio_tracks:
        # Prefer "original" audio track
        original_lang = None
        for lang, track in hls_audio_tracks.items():
            if 'original' in (track.get('name') or '').lower():
                original_lang = lang
                break

        # Add tracks, preferring original as default (default goes first)
        for lang, track in hls_audio_tracks.items():
            is_default = (lang == original_lang) if original_lang else track['is_default']
            if is_default:
                audio_tracks.insert(0, {
                    'id': lang,
                    'name': track['name'],
                    'is_default': True,
                })
            else:
                audio_tracks.append({
                    'id': lang,
                    'name': track['name'],
                    'is_default': False,
                })
    else:
        # Fallback: single default audio track
        audio_tracks = [{'id': 'default', 'name': 'Default', 'is_default': True}]

    # Get video dimensions
    video_height = info.get('height') or 360
    video_width = info.get('width') or 640




    # 1 second per pixel, or the actual video width
    theater_video_target_width = max(640, info['duration'] or 0, video_width)

    # Check for false determination of disabled comments, which comes from
    # the watch page. But if we got comments in the separate request for those,
    # then the determination is wrong.
    if info['comments_disabled'] and comments_info.get('comments'):
        info['comments_disabled'] = False
        # Module logger instead of print, consistent with the rest of the file
        logger.warning('False determination that comments are disabled')
        logger.warning('Comment count: %s', info['comment_count'])
        info['comment_count'] = None  # hack to make it obvious there's a bug

    # captions and transcript
    subtitle_sources = get_subtitle_sources(info)
    other_downloads = []
    for source in subtitle_sources:
        best_caption_parse = urllib.parse.urlparse(
            source['url'].lstrip('/'))
        transcript_url = (util.URL_ORIGIN
                          + '/watch/transcript'
                          + best_caption_parse.path
                          + '?' + best_caption_parse.query)
        other_downloads.append({
            'label': 'Video Transcript: ' + source['label'],
            'ext': 'txt',
            'url': transcript_url
        })

    if request.path.startswith('/embed') and settings.embed_page_mode:
        template_name = 'embed.html'
    else:
        template_name = 'watch.html'
    return flask.render_template(template_name,
        header_playlist_names = local_playlist.get_playlist_names(),
        uploader_channel_url = ('/' + info['author_url']) if info['author_url'] else '',
        time_published = info['time_published'],
        view_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("view_count", None)),
        like_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("like_count", None)),
        dislike_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("dislike_count", None)),
        download_formats = download_formats,
        other_downloads = other_downloads,
        video_info = json.dumps(video_info),
        hls_formats = info['hls_formats'],
        hls_manifest_url = hls_manifest_url,
        audio_tracks = audio_tracks,
        subtitle_sources = subtitle_sources,
        uni_sources = uni_sources,
        pair_sources = pair_sources,
        pair_idx = pair_idx,
        hls_unavailable = info.get('hls_unavailable', False),
        playback_mode = settings.playback_mode,
        related = info['related_videos'],
        playlist = info['playlist'],
        music_list = info['music_list'],
        music_attributes = get_ordered_music_list_attributes(info['music_list']),
        comments_info = comments_info,
        comment_count = info['comment_count'],
        comments_disabled = info['comments_disabled'],

        video_height = video_height,
        video_width = video_width,
        theater_video_target_width = theater_video_target_width,

        title = info['title'],
        uploader = info['author'],
        description = info['description'],
        unlisted = info['unlisted'],
        limited_state = info['limited_state'],
        age_restricted = info['age_restricted'],
        live = info['live'],
        playability_error = info['playability_error'],

        allowed_countries = info['allowed_countries'],
        ip_address = info['ip_address'] if settings.route_tor else None,
        invidious_used = info['invidious_used'],
        invidious_reload_button = info['invidious_reload_button'],
        video_url = util.URL_ORIGIN + '/watch?v=' + video_id,
        video_id = video_id,
        storyboard_url = (util.URL_ORIGIN + '/ytl-api/storyboard.vtt?' +
                          urlencode([('spec_url', info['storyboard_spec_url'])])
                          if info['storyboard_spec_url'] else None),

        # Data serialized as JSON for the client-side player
        js_data = {
            'video_id': info['id'],
            'video_duration': info['duration'],
            'settings': settings.current_settings_dict,
            'has_manual_captions': any(s.get('on') for s in subtitle_sources),
            'audio_tracks': audio_tracks,
            'hls_manifest_url': hls_manifest_url,
            'time_start': time_start,
            'playlist': info['playlist'],
            'related': info['related_videos'],
            'playability_error': info['playability_error'],
            'hls_unavailable': info.get('hls_unavailable', False),
            'pair_sources': pair_sources,
            'pair_idx': pair_idx,
            'uni_sources': uni_sources,
            'uni_idx': video_sources['uni_idx'],
            'using_pair_sources': bool(pair_sources),
        },
        font_family = youtube.font_choices[settings.font],  # for embed page
    )
|
|
|
|
|
|
@yt_app.route('/api/<path:dummy>')
def get_captions(dummy):
    """Proxy a captions request to youtube.com as WebVTT.

    Strips the cue positioning directive youtube adds, and degrades to an
    empty VTT document (HTTP 200) when the upstream fetch fails.
    """
    upstream_url = 'https://www.youtube.com' + request.full_path
    try:
        body = util.fetch_url(upstream_url, headers=util.mobile_ua)
        return flask.Response(
            body.replace(b"align:start position:0%", b""),
            mimetype='text/vtt')
    except Exception as e:
        # Best-effort endpoint: log and serve an empty caption file
        logger.debug(f'Caption fetch failed: {e}')
        return flask.Response(b'WEBVTT\n\n', mimetype='text/vtt', status=200)
|
|
|
|
|
|
# Matches a full VTT cue timing line, e.g. "00:01:02.345 --> 00:01:04.000 ..."
times_reg = re.compile(r'^\d\d:\d\d:\d\d\.\d\d\d --> \d\d:\d\d:\d\d\.\d\d\d.*$')
# Strips inline markup such as word-level <c>/<00:00:00.000> tags from cue text
inner_timestamp_removal_reg = re.compile(r'<[^>]+>')
|
|
@yt_app.route('/watch/transcript/<path:caption_path>')
def get_transcript(caption_path):
    """Fetch a captions (VTT) file from youtube.com and render it as a
    plain-text transcript, one '<begin timestamp> <text>' line per cue.

    Returns the upstream error code with a plain-text message if the
    caption url fails (it may have expired).
    """
    try:
        captions = util.fetch_url('https://www.youtube.com/'
                                  + caption_path
                                  + '?' + request.environ['QUERY_STRING']).decode('utf-8')
    except util.FetchError as e:
        msg = ('Error retrieving captions: ' + str(e) + '\n\n'
               + 'The caption url may have expired.')
        # Module logger instead of print, consistent with the rest of the file
        logger.warning(msg)
        return flask.Response(
            msg,
            status=e.code,
            mimetype='text/plain;charset=UTF-8')

    lines = captions.splitlines()
    segments = []

    # skip captions file header (everything up to the first blank line);
    # bound the scan so malformed input without a blank line can't IndexError
    i = 0
    while i < len(lines) and lines[i] != '':
        i += 1

    current_segment = None
    while i < len(lines):
        line = lines[i]
        if line == '':
            # Blank line terminates the previous cue and starts a new one
            if ((current_segment is not None)
                    and (current_segment['begin'] is not None)):
                segments.append(current_segment)
            current_segment = {
                'begin': None,
                'end': None,
                'lines': [],
            }
        elif times_reg.fullmatch(line.rstrip()):
            # Timing line; maxsplit=1 keeps any cue settings with 'end'
            current_segment['begin'], current_segment['end'] = line.split(' --> ', 1)
        else:
            current_segment['lines'].append(
                inner_timestamp_removal_reg.sub('', line))
        i += 1

    # The file may not end with a blank line; don't drop the final cue
    if current_segment is not None and current_segment['begin'] is not None:
        segments.append(current_segment)

    # if automatic captions, but not translated
    if request.args.get('kind') == 'asr' and not request.args.get('tlang'):
        # Automatic captions repeat content. The new segment is displayed
        # on the bottom row; the old one is displayed on the top row.
        # So grab the bottom (last) row only. Some cues have a single
        # line, so don't assume exactly two.
        for seg in segments:
            seg['text'] = seg['lines'][-1] if seg['lines'] else ''
    else:
        for seg in segments:
            seg['text'] = ' '.join(map(str.rstrip, seg['lines']))

    result = ''
    for seg in segments:
        if seg['text'] != ' ':
            result += seg['begin'] + ' ' + seg['text'] + '\r\n'

    return flask.Response(result.encode('utf-8'),
                          mimetype='text/plain;charset=UTF-8')
|