8 Commits

Author SHA1 Message Date
13708bb4ea update README.md
2026-05-03 13:55:24 -05:00
b9248cfe2d fix: init Plyr before HLS manifest loads to avoid bare video flash
Create Plyr instance immediately in start() so the styled player UI
appears right away. Quality and audio controls are injected once the
HLS manifest is ready, running doAdd() directly when player.ready is
already true instead of waiting on the 'ready' event that already fired.
2026-05-03 13:45:00 -05:00
07ca635a84 bump to v0.5.1
2026-05-03 13:20:32 -05:00
afbe137676 update HACKING.md
2026-05-03 13:19:00 -05:00
76c9263cd6 update README.md
2026-05-03 13:12:09 -05:00
ab55cf79bb fix: show only current branch in version display
git branch lists all branches; replace with git branch --show-current
2026-05-03 12:36:32 -05:00
8d66143c90 fix: update innertube clients and fix HLS/DASH quality switching
- Update innertube client versions to match yt-dlp (android 21.02.35,
  ios 21.02.3, web 2.20260114.08.00, android_vr 1.65.10)
- Remove obsolete clients (android-test-suite, ios_vr)
- Replace tv_embedded with TVHTML5_SIMPLY (cn 75)
- Add new clients: web_embedded, mweb, tv
- Fix HLS freeze on quality switch: use nextLevel instead of
  currentLevel, handle bufferStalledError, stream proxy segments
  instead of buffering in memory
- Populate DASH quality selector with actual sources (no Auto)
- Render quality-select empty in template, let JS populate per mode
2026-05-03 12:32:55 -05:00
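The client versions listed in the commit above can be summarized in a small mapping (names and versions are taken verbatim from the commit message; the dictionary layout itself is illustrative, not the project's actual data structure):

```python
# Innertube client versions as updated in this commit (matching yt-dlp).
# The dict shape is illustrative only.
INNERTUBE_CLIENT_VERSIONS = {
    'android': '21.02.35',
    'ios': '21.02.3',
    'web': '2.20260114.08.00',
    'android_vr': '1.65.10',
}
```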
50ad959a80 refactor: replace string concatenations with f-strings
2026-04-25 01:02:17 -05:00
30 changed files with 790 additions and 610 deletions

View File

@@ -2,7 +2,7 @@
[![License: AGPL v3](https://img.shields.io/badge/License-AGPL_v3-blue.svg)](https://www.gnu.org/licenses/agpl-3.0)
[![Python 3.7+](https://img.shields.io/badge/python-3.7+-blue.svg)](https://www.python.org/downloads/)
[![Tests](https://img.shields.io/badge/tests-passing-brightgreen.svg)](https://github.com/user234683/youtube-local)
[![Tests](https://img.shields.io/badge/tests-passing-brightgreen.svg)](https://git.fridu.us/heckyel/youtube-local)
A privacy-focused, browser-based YouTube client that routes requests through Tor for anonymous viewing—**without compromising on speed or features**.
@@ -48,12 +48,12 @@ yt-local is a lightweight, self-hosted YouTube client written in Python that giv
## Screenshots
| Light Theme | Gray Theme | Dark Theme |
|:-----------------------------------------------------:|:----------------------------------------------------:|:----------------------------------------------------:|
| ![Light](https://pic.infini.fr/l7WINjzS/0Ru6MrhA.png) | ![Gray](https://pic.infini.fr/znnQXWNc/hL78CRzo.png) | ![Dark](https://pic.infini.fr/iXwFtTWv/mt2kS5bv.png) |
|:----------------------------------------------------------------------------------------------:|:---------------------------------------------------------------------------------------------:|:---------------------------------------------------------------------------------------------:|
| ![Light](https://gist.github.com/user-attachments/assets/9552533c-6be0-4757-98aa-a59d1c294e90) | ![Gray](https://gist.github.com/user-attachments/assets/d7692a0f-b86f-4375-a011-7ee026bbbcdb) | ![Dark](https://gist.github.com/user-attachments/assets/abd4c38b-8612-4f43-9483-081eb950ab99) |
| Channel View | Playlist View |
|:-------------------------------------------------------:|:---------------------:|
| ![Channel](https://pic.infini.fr/JsenWVYe/SbdIQlS6.png) | *(similar structure)* |
|:------------------------------------------------------------------------------------------------:|:---------------------:|
| ![Channel](https://gist.github.com/user-attachments/assets/d359847c-96e1-403f-a190-3c159defe8a7) | *(similar structure)* |
---
@@ -61,7 +61,7 @@ yt-local is a lightweight, self-hosted YouTube client written in Python that giv
### Windows
1. Download the latest [release ZIP](https://github.com/user234683/yt-local/releases)
1. Download the latest [release ZIP](https://git.fridu.us/heckyel/yt-local/releases)
2. Extract to any folder
3. Run `run.bat` to start
@@ -69,7 +69,7 @@ yt-local is a lightweight, self-hosted YouTube client written in Python that giv
```bash
# 1. Clone or extract the release
git clone https://github.com/user234683/yt-local.git
git clone https://git.fridu.us/heckyel/yt-local.git
cd yt-local
# 2. Create and activate virtual environment
@@ -279,7 +279,7 @@ yt-local is designed for self-hosting.
This project is 100% free and open-source. If you'd like to support development:
- **Bitcoin**: `1JrC3iqs3PP5Ge1m1vu7WE8LEf4S85eo7y`
- **Tor node donation**: https://torservers.net/donate
- **Tor node donation**: <https://torservers.net/donate>
---
@@ -310,4 +310,4 @@ Permission is granted to relicense code portions into youtube-dl's license (curr
Made for privacy-conscious users
Last updated: 2026-04-19
Last updated: 2026-05-03

View File

@@ -1,4 +1,5 @@
# Coding guidelines
* Follow the [PEP 8 guidelines](https://www.python.org/dev/peps/pep-0008/) for all new Python code as best you can. Some old code doesn't follow PEP 8 yet. This includes limiting line length to 79 characters (with an exception for long strings such as URLs that can't reasonably be broken across multiple lines) and using 4 spaces for indentation.
* Do not use single letter or cryptic names for variables (except iterator variables or the like). When in doubt, choose the more verbose option.
@@ -12,30 +13,34 @@
* The same guidelines apply to commenting code. If a piece of code is not self-explanatory, add a comment explaining what it does and why it's there.
# Testing and releases
* This project uses pytest. To install pytest and any future dependencies needed for development, run `pip3 install -r requirements-dev.txt`. To run tests, run `python3 -m pytest` rather than just `pytest`, because the former makes sure the toplevel directory is in Python's import search path.
* To build releases for Windows, run `python3 generate_release.py [intended python version here, without a leading v]`. The required software (such as 7z and git) is listed in the `generate_release.py` file. For instance, wine is required if building on GNU+Linux. The build script will automatically download the embedded Python release to include. Use the latest release of Python 3.7.x so that Vista will be supported. See https://github.com/user234683/youtube-local/issues/6#issuecomment-672608388
# Overview of the software architecture
## Overview of the software architecture
### server.py
## server.py
* This is the entry point, and sets up the HTTP server that listens for incoming requests. It delegates the request to the appropriate "site_handler". For instance, `localhost:8080/youtube.com/...` goes to the `youtube` site handler, whereas `localhost:8080/ytimg.com/...` (the url for video thumbnails) goes to the site handler for just fetching static resources such as images from youtube.
* The reason for this architecture: the original design philosophy when I first conceived the project was that this would work for any site supported by youtube-dl, including YouTube, Vimeo, DailyMotion, etc. I've dropped this idea for now, though I might pick it up later. (youtube-dl is no longer used)
* This file uses the raw [WSGI request](https://www.python.org/dev/peps/pep-3333/) format. The WSGI format is a Python standard for how HTTP servers (I use the stock server provided by gevent) should call HTTP applications. So that's why the file contains stuff like `env['REQUEST_METHOD']`.
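The dispatch described above can be sketched as a plain WSGI application. The handler names and response bodies here are hypothetical, but the `env`/`start_response` calling convention matches PEP 3333:

```python
def make_dispatcher(site_handlers):
    """Return a WSGI app routing by the first path component,
    e.g. /youtube.com/... -> the 'youtube.com' handler (illustrative)."""
    def app(env, start_response):
        parts = env.get('PATH_INFO', '/').lstrip('/').split('/', 1)
        handler = site_handlers.get(parts[0])
        if handler is None:
            start_response('404 Not Found', [('Content-Type', 'text/plain')])
            return [b'no such site']
        return handler(env, start_response)
    return app

def youtube_handler(env, start_response):
    # A real handler also reads env['REQUEST_METHOD'], env['QUERY_STRING'], etc.
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [b'youtube page']
```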
### Flask and Gevent
## Flask and Gevent
* The `youtube` handler in server.py then delegates the request to the Flask yt_app object, which the rest of the project uses. [Flask](https://flask.palletsprojects.com/en/1.1.x/) is a web application framework that makes handling requests easier than accessing the raw WSGI requests. Flask (Werkzeug specifically) figures out which function to call for a particular url. Each request handling function is registered into Flask's routing table by using function annotations above it. The request handling functions are always at the bottom of the file for a particular youtube page (channel, watch, playlist, etc.), and they're where you want to look to see how the response gets constructed for a particular url. Miscellaneous request handlers that don't belong anywhere else are located in `__init__.py`, which is where the `yt_app` object is instantiated.
* The actual html for yt-local is generated using Jinja templates. Jinja lets you embed a Python-like language inside html files so you can use constructs such as for loops to construct the html for a list of 30 videos given a dictionary with information for those videos. Jinja is included as part of Flask. It has some annoying differences from Python in a lot of details, so check the [docs here](https://jinja.palletsprojects.com/en/2.11.x/) when you use it. The request handling functions will pass the information that has been scraped from YouTube into these templates for the final result.
* The project uses the gevent library for parallelism (such as for launching requests in parallel), as opposed to using the async keyword.
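The pattern described above (route registration via a function annotation, then rendering scraped data through a Jinja template) looks roughly like this minimal sketch; the route, template, and field names are invented for illustration, not the project's:

```python
from flask import Flask, render_template_string

yt_app = Flask(__name__)  # the project instantiates its yt_app in __init__.py

# Inline stand-in for a template file; Jinja's for-loop builds the video list.
VIDEO_LIST_TEMPLATE = """<ul>
{% for video in videos %}  <li>{{ video.title }} ({{ video.duration }})</li>
{% endfor %}</ul>"""

@yt_app.route('/demo-playlist')
def demo_playlist():
    # In the real app this data comes from scraping YouTube's responses.
    videos = [
        {'title': 'First video', 'duration': '3:05'},
        {'title': 'Second video', 'duration': '10:41'},
    ]
    return render_template_string(VIDEO_LIST_TEMPLATE, videos=videos)
```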
## util.py
### util.py
* util.py is a grab-bag of miscellaneous things; admittedly I need to get around to refactoring it. The biggest thing it has is the `fetch_url` function, which is what I use for sending out requests to YouTube. The Tor routing is managed here. `fetch_url` will raise a `FetchError` exception if the request fails. The parameter `debug_name` in `fetch_url` is the filename that the response from YouTube will be saved to if the hidden debugging option is enabled in settings.txt. So if there's a bug when YouTube changes something, you can check the response from YouTube in that file.
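A simplified sketch of the contract described above: the real `fetch_url` does the networking and Tor routing itself, so this version takes an injected `fetch` callable and only illustrates the `FetchError` and `debug_name` behavior. All signatures here are assumptions, not the project's exact API:

```python
import os

class FetchError(Exception):
    def __init__(self, code, reason='', ip=None, error_message=None):
        super().__init__(f'HTTP error {code} {reason}')
        self.code, self.reason = code, reason
        self.ip, self.error_message = ip, error_message

def fetch_url(url, fetch, debug_name=None, debug_dir=None):
    """fetch(url) -> (status, body). Raise FetchError on failure; optionally
    dump the response body for offline debugging (hypothetical simplification)."""
    status, body = fetch(url)
    if status != 200:
        raise FetchError(str(status))
    if debug_name and debug_dir:
        with open(os.path.join(debug_dir, debug_name), 'wb') as f:
            f.write(body)
    return body
```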
## Data extraction - protobuf, polymer, and yt_data_extract
### Data extraction - protobuf, polymer, and yt_data_extract
* proto.py is used for generating what are called ctokens needed when making requests to YouTube. These ctokens use Google's [protobuf](https://developers.google.com/protocol-buffers) format. Figuring out how to generate these in new instances requires some reverse engineering. I have a messy python file I use to make this convenient which you can find under ./youtube/proto_debug.py
* The responses from YouTube are in a JSON format called polymer (polymer is the name of the 2017-present YouTube layout). The JSON consists of a bunch of nested dictionaries which basically specify the layout of the page via objects called renderers. A renderer represents an object on a page in a similar way to html tags; renderers often contain other renderers inside them. The Javascript on YouTube's page translates this JSON to HTML. Example: `compactVideoRenderer` represents a video item you can click on, such as in the related videos (so these are called "items" in the codebase). This JSON is very messy. You'll need a JSON prettifier or something that gives you a tree view in order to study it.
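The nesting described above can be illustrated with a tiny recursive search for item renderers; the traversal and the trimmed sample are a sketch, not the project's actual extraction code:

```python
def find_renderers(node, wanted=('compactVideoRenderer',)):
    """Walk polymer-style nested dicts/lists collecting the wanted renderers."""
    found = []
    if isinstance(node, dict):
        for key, value in node.items():
            if key in wanted:
                found.append(value)
            else:
                found.extend(find_renderers(value, wanted))
    elif isinstance(node, list):
        for child in node:
            found.extend(find_renderers(child, wanted))
    return found

# Heavily trimmed example of the kind of layout polymer JSON uses:
sample = {'contents': {'sectionListRenderer': {'contents': [
    {'compactVideoRenderer': {'videoId': 'abc123'}},
    {'compactVideoRenderer': {'videoId': 'def456'}},
]}}}
```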
@@ -46,15 +51,16 @@
* The `extract_items` function is similar but works on the response object, automatically finding the appropriate renderer to call `extract_items_from_renderer` on.
### Other
## Other
* subscriptions.py uses SQLite to store data.
* Hidden settings only relevant to developers (such as for debugging) are not displayed on the settings page. They can be found in the settings.txt file.
* Since I can't anticipate the things that will trip up beginners to the codebase, if you spend a while figuring something out, go ahead and make a pull request adding a brief description of your findings to this document to help other beginners.
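For the SQLite storage mentioned above, a minimal sketch of the idea; the table name and columns here are hypothetical, so see subscriptions.py for the real schema:

```python
import sqlite3

def open_subscriptions_db(path=':memory:'):
    """Open (or create) a tiny subscriptions database (illustrative schema)."""
    db = sqlite3.connect(path)
    db.execute('''CREATE TABLE IF NOT EXISTS subscribed_channels (
                      channel_id TEXT PRIMARY KEY,
                      channel_name TEXT)''')
    return db

def subscribe(db, channel_id, channel_name):
    # INSERT OR REPLACE keeps re-subscribing idempotent.
    db.execute('INSERT OR REPLACE INTO subscribed_channels VALUES (?, ?)',
               (channel_id, channel_name))
    db.commit()
```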
## Development tips
### Development tips
* When developing functionality to interact with YouTube in new ways, you'll want to use the network tab in your browser's devtools to inspect which requests get made under normal usage of YouTube. You'll also want a tool you can use to construct custom requests and specify headers to reverse engineer the request format. I use the [HeaderTool](https://github.com/loreii/HeaderTool) extension in Firefox, but there's probably a more streamlined program out there.
* You'll want to have a utility or IDE that can perform full text search on a repository, since this is crucial for navigating unfamiliar codebases to figure out where certain strings appear or where things get defined.

View File

@@ -33,7 +33,7 @@ def check_subp(x):
raise Exception('Got nonzero exit code from command')
def log(line):
print('[generate_release.py] ' + line)
print(f'[generate_release.py] {line}')
# https://stackoverflow.com/questions/7833715/python-deleting-certain-file-extensions
def remove_files_with_extensions(path, extensions):
@@ -43,23 +43,23 @@ def remove_files_with_extensions(path, extensions):
os.remove(os.path.join(root, file))
def download_if_not_exists(file_name, url, sha256=None):
if not os.path.exists('./' + file_name):
if not os.path.exists(f'./{file_name}'):
# Reject non-https URLs so a mistaken constant cannot cause a
# plaintext download (bandit B310 hardening).
if not url.startswith('https://'):
raise Exception('Refusing to download over non-https URL: ' + url)
log('Downloading ' + file_name + '..')
raise Exception(f'Refusing to download over non-https URL: {url}')
log(f'Downloading {file_name}..')
data = urllib.request.urlopen(url).read()
log('Finished downloading ' + file_name)
with open('./' + file_name, 'wb') as f:
log(f'Finished downloading {file_name}')
with open(f'./{file_name}', 'wb') as f:
f.write(data)
if sha256:
digest = hashlib.sha256(data).hexdigest()
if digest != sha256:
log('Error: ' + file_name + ' has wrong hash: ' + digest)
log(f'Error: {file_name} has wrong hash: {digest}')
sys.exit(1)
else:
log('Using existing ' + file_name)
log(f'Using existing {file_name}')
def wine_run_shell(command):
# Keep argv-style invocation (no shell) to avoid command injection.
@@ -120,7 +120,7 @@ if len(os.listdir('./yt-local')) == 0:
# ----------- Generate embedded python distribution -----------
os.environ['PYTHONDONTWRITEBYTECODE'] = '1' # *.pyc files double the size of the distribution
get_pip_url = 'https://bootstrap.pypa.io/get-pip.py'
latest_dist_url = 'https://www.python.org/ftp/python/' + latest_version + '/python-' + latest_version
latest_dist_url = f'https://www.python.org/ftp/python/{latest_version}/python-{latest_version}'
if bitness == '32':
latest_dist_url += '-embed-win32.zip'
else:
@@ -142,7 +142,7 @@ else:
download_if_not_exists('get-pip.py', get_pip_url)
python_dist_name = 'python-dist-' + latest_version + '-' + bitness + '.zip'
python_dist_name = f'python-dist-{latest_version}-{bitness}.zip'
download_if_not_exists(python_dist_name, latest_dist_url)
download_if_not_exists(visual_c_name,
@@ -203,7 +203,7 @@ and replaced with a .pth. Isolated mode will have to be specified manually.
log('Removing ._pth')
major_release = latest_version.split('.')[1]
os.remove(r'./python/python3' + major_release + '._pth')
os.remove(rf'./python/python3{major_release}._pth')
log('Adding path_fixes.pth')
with open(r'./python/path_fixes.pth', 'w', encoding='utf-8') as f:
@@ -214,7 +214,7 @@ with open(r'./python/path_fixes.pth', 'w', encoding='utf-8') as f:
# Need to add the directory where packages are installed,
# and the parent directory (which is where the yt-local files are)
major_release = latest_version.split('.')[1]
with open('./python/python3' + major_release + '._pth', 'a', encoding='utf-8') as f:
with open(rf'./python/python3{major_release}._pth', 'a', encoding='utf-8') as f:
f.write('.\\Lib\\site-packages\n')
f.write('..\n')'''
@@ -255,10 +255,10 @@ log('Copying python distribution into release folder')
shutil.copytree(r'./python', r'./yt-local/python')
# ----------- Create release zip -----------
output_filename = 'yt-local-' + release_tag + '-' + suffix + '.zip'
if os.path.exists('./' + output_filename):
output_filename = f'yt-local-{release_tag}-{suffix}.zip'
if os.path.exists(f'./{output_filename}'):
log('Removing previous zipped release')
os.remove('./' + output_filename)
os.remove(f'./{output_filename}')
log('Zipping release')
check_subp(subprocess.run(['7z', '-mx=9', 'a', output_filename, './yt-local']))
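The download hardening shown in the diff above (https-only plus SHA-256 verification) boils down to this pattern. The network call is injected so the sketch stays self-contained, unlike the real `download_if_not_exists`, which also caches the file on disk:

```python
import hashlib

def fetch_verified(url, fetch, sha256=None):
    """fetch(url) -> bytes. Refuse plaintext URLs and reject bad hashes."""
    if not url.startswith('https://'):
        raise Exception(f'Refusing to download over non-https URL: {url}')
    data = fetch(url)
    if sha256 is not None:
        digest = hashlib.sha256(data).hexdigest()
        if digest != sha256:
            raise Exception(f'wrong hash: {digest}')
    return data
```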

View File

@@ -32,9 +32,9 @@ def youtu_be(env, start_response):
id = env['PATH_INFO'][1:]
env['PATH_INFO'] = '/watch'
if not env['QUERY_STRING']:
env['QUERY_STRING'] = 'v=' + id
env['QUERY_STRING'] = f'v={id}'
else:
env['QUERY_STRING'] += '&v=' + id
env['QUERY_STRING'] += f'&v={id}'
yield from yt_app(env, start_response)
@@ -64,12 +64,12 @@ def proxy_site(env, start_response, video=False):
if 'HTTP_RANGE' in env:
send_headers['Range'] = env['HTTP_RANGE']
url = "https://" + env['SERVER_NAME'] + env['PATH_INFO']
url = f"https://{env['SERVER_NAME']}{env['PATH_INFO']}"
# remove /name portion
if video and '/videoplayback/name/' in url:
url = url[0:url.rfind('/name/')]
if env['QUERY_STRING']:
url += '?' + env['QUERY_STRING']
url += f'?{env["QUERY_STRING"]}'
try_num = 1
first_attempt = True
@@ -96,7 +96,7 @@ def proxy_site(env, start_response, video=False):
+[('Access-Control-Allow-Origin', '*')])
if first_attempt:
start_response(str(response.status) + ' ' + response.reason,
start_response(f"{response.status} {response.reason}",
response_headers)
content_length = int(dict(response_headers).get('Content-Length', 0))
@@ -136,9 +136,8 @@ def proxy_site(env, start_response, video=False):
fail_byte = start + total_received
send_headers['Range'] = 'bytes=%d-%d' % (fail_byte, end)
print(
'Warning: YouTube closed the connection before byte',
str(fail_byte) + '.', 'Expected', start+content_length,
'bytes.'
f'Warning: YouTube closed the connection before byte {fail_byte}. '
f'Expected {start+content_length} bytes.'
)
retry = True
@@ -185,7 +184,7 @@ def split_url(url):
# python STILL doesn't have a proper regular expression engine like grep uses built in...
match = re.match(r'(?:https?://)?([\w-]+(?:\.[\w-]+)+?)(/.*|$)', url)
if match is None:
raise ValueError('Invalid or unsupported url: ' + url)
raise ValueError(f'Invalid or unsupported url: {url}')
return match.group(1), match.group(2)
@@ -238,7 +237,7 @@ def site_dispatch(env, start_response):
if base_name == '':
base_name = domain
else:
base_name = domain + '.' + base_name
base_name = f"{domain}.{base_name}"
try:
handler = site_handlers[base_name]
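The retry logic in the hunk above (re-request from the byte where YouTube closed the connection) can be sketched like this; `fetch_range` is an injected stand-in for the proxy's ranged HTTP request:

```python
def fetch_full_range(fetch_range, start, end, max_tries=10):
    """fetch_range(first, last) -> bytes, possibly truncated. Keep issuing
    Range requests from the failure byte until the whole span arrives."""
    expected = end - start + 1
    received = b''
    for _ in range(max_tries):
        fail_byte = start + len(received)
        chunk = fetch_range(fail_byte, end)   # i.e. Range: bytes=fail_byte-end
        if not chunk:
            raise Exception(f'no progress at byte {fail_byte}')
        received += chunk
        if len(received) >= expected:
            return received
    raise Exception('gave up before the range completed')
```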

View File

@@ -397,14 +397,14 @@ acceptable_targets = SETTINGS_INFO.keys() | {
def comment_string(comment):
result = ''
for line in comment.splitlines():
result += '# ' + line + '\n'
result += f'# {line}\n'
return result
def save_settings(settings_dict):
with open(settings_file_path, 'w', encoding='utf-8') as file:
for setting_name, setting_info in SETTINGS_INFO.items():
file.write(comment_string(setting_info['comment']) + setting_name + ' = ' + repr(settings_dict[setting_name]) + '\n\n')
file.write(f"{comment_string(setting_info['comment'])}{setting_name} = {repr(settings_dict[setting_name])}\n\n")
def add_missing_settings(settings_dict):
@@ -481,7 +481,7 @@ upgrade_functions = {
def log_ignored_line(line_number, message):
print('WARNING: Ignoring settings.txt line ' + str(line_number) + ' (' + message + ')')
print(f'WARNING: Ignoring settings.txt line {line_number} ({message})')
if os.path.isfile("settings.txt"):
@@ -535,7 +535,7 @@ else:
continue
if target.id not in acceptable_targets:
log_ignored_line(node.lineno, target.id + " is not a valid setting")
log_ignored_line(node.lineno, f"{target.id} is not a valid setting")
continue
if type(node.value) not in attributes:
@@ -645,6 +645,6 @@ def settings_page():
for func, old_value, value in to_call:
func(old_value, value)
return flask.redirect(util.URL_ORIGIN + '/settings', 303)
return flask.redirect(f'{util.URL_ORIGIN}/settings', 303)
else:
flask.abort(400)
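The `node`/`target.id` checks in the diff above come from settings.txt being parsed with the `ast` module rather than executed. A condensed sketch of that approach (the warning format mirrors `log_ignored_line`; the rest is simplified):

```python
import ast

def parse_settings(text, acceptable_targets):
    """Read `name = <literal>` lines safely: literals only, nothing executed."""
    settings = {}
    for node in ast.parse(text).body:
        if not (isinstance(node, ast.Assign) and len(node.targets) == 1
                and isinstance(node.targets[0], ast.Name)):
            print(f'WARNING: Ignoring settings.txt line {node.lineno} '
                  '(not a simple assignment)')
            continue
        name = node.targets[0].id
        if name not in acceptable_targets:
            print(f'WARNING: Ignoring settings.txt line {node.lineno} '
                  f'({name} is not a valid setting)')
            continue
        settings[name] = ast.literal_eval(node.value)
    return settings
```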

View File

@@ -27,7 +27,7 @@ class TestChannelCtokenV5:
def _decode_outer(self, ctoken):
"""Decode the outer protobuf layer of a ctoken."""
raw = base64.urlsafe_b64decode(ctoken + '==')
raw = base64.urlsafe_b64decode(f'{ctoken}==')
return {fn: val for _, fn, val in proto.read_protobuf(raw)}
def test_shorts_token_generates_without_error(self):
@@ -68,8 +68,8 @@ class TestChannelCtokenV5:
assert t_with_shorts != t_without_shorts
# Decode and verify the filter is present
raw_with_shorts = base64.urlsafe_b64decode(t_with_shorts + '==')
raw_without_shorts = base64.urlsafe_b64decode(t_without_shorts + '==')
raw_with_shorts = base64.urlsafe_b64decode(f'{t_with_shorts}==')
raw_without_shorts = base64.urlsafe_b64decode(f'{t_without_shorts}==')
# Parse the outer protobuf structure
import youtube.proto as proto
@@ -95,8 +95,8 @@ class TestChannelCtokenV5:
decoded_without = urllib.parse.unquote(encoded_inner_without.decode('ascii'))
# Decode the base64 data
decoded_with_bytes = base64.urlsafe_b64decode(decoded_with + '==')
decoded_without_bytes = base64.urlsafe_b64decode(decoded_without + '==')
decoded_with_bytes = base64.urlsafe_b64decode(f'{decoded_with}==')
decoded_without_bytes = base64.urlsafe_b64decode(f'{decoded_without}==')
# Parse the decoded protobuf data
fields_with = list(proto.read_protobuf(decoded_with_bytes))
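The `+ '=='` pattern in these tests works because CPython's non-strict base64 decoder ignores surplus padding, so appending two `=` characters covers every padding length a stripped token can need:

```python
import base64

def decode_ctoken(ctoken):
    """Decode urlsafe base64 whose '=' padding has been stripped."""
    return base64.urlsafe_b64decode(ctoken + '==')
```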

View File

@@ -0,0 +1,72 @@
import pytest
from youtube import watch_formats
class TestCodecName:
def test_avc_returns_h264(self):
assert watch_formats.codec_name('avc1.64001F') == 'h264'
def test_av01_returns_av1(self):
assert watch_formats.codec_name('av01.0.05M.08') == 'av1'
def test_vp9_returns_vp(self):
assert watch_formats.codec_name('vp9') == 'vp'
def test_unknown_returns_unknown(self):
assert watch_formats.codec_name('unknown_codec') == 'unknown'
class TestVideoQualityString:
def test_with_vcodec(self):
fmt = {'vcodec': 'avc1', 'width': 1920, 'height': 1080, 'fps': 30}
assert watch_formats.video_quality_string(fmt) == '1920x1080 30fps'
def test_with_vcodec_no_fps(self):
fmt = {'vcodec': 'avc1', 'width': 1280, 'height': 720}
assert watch_formats.video_quality_string(fmt) == '1280x720'
def test_with_acodec_only(self):
fmt = {'acodec': 'mp4a.40.2'}
assert watch_formats.video_quality_string(fmt) == 'audio only'
def test_empty(self):
fmt = {}
assert watch_formats.video_quality_string(fmt) == '?'
class TestShortVideoQualityString:
def test_with_fps(self):
fmt = {'quality': 1080, 'fps': 60, 'vcodec': 'av01.0.05M.08'}
assert watch_formats.short_video_quality_string(fmt) == '1080p60 AV1'
def test_h264(self):
fmt = {'quality': 720, 'fps': 30, 'vcodec': 'avc1.64001E'}
assert watch_formats.short_video_quality_string(fmt) == '720p30 h264'
class TestAudioQualityString:
def test_with_bitrate(self):
fmt = {'acodec': 'mp4a.40.2', 'audio_bitrate': 128}
assert watch_formats.audio_quality_string(fmt) == '128k'
def test_with_sample_rate(self):
fmt = {'acodec': 'mp4a.40.2', 'audio_bitrate': 128, 'audio_sample_rate': 44100}
assert watch_formats.audio_quality_string(fmt) == '128k 44.1kHz'
def test_video_only(self):
fmt = {'vcodec': 'avc1'}
assert watch_formats.audio_quality_string(fmt) == 'video only'
class TestFormatBytes:
def test_none(self):
assert watch_formats.format_bytes(None) == 'N/A'
def test_bytes(self):
assert watch_formats.format_bytes(512) == '512.00B'
def test_kibibytes(self):
assert watch_formats.format_bytes(1024) == '1.00KiB'
def test_mebibytes(self):
assert watch_formats.format_bytes(1048576) == '1.00MiB'

View File

@@ -76,7 +76,7 @@ theme_names = {
@yt_app.context_processor
def inject_theme_preference():
return {
'theme_path': '/youtube.com/static/' + theme_names[settings.theme] + '.css',
'theme_path': f'/youtube.com/static/{theme_names[settings.theme]}.css',
'settings': settings,
# Detect version
'current_version': app_version()['version'],
@@ -145,9 +145,9 @@ def error_page(e):
' exit node is overutilized. Try getting a new exit node by'
' using the New Identity button in the Tor Browser.')
if fetch_err.error_message:
error_message += '\n\n' + fetch_err.error_message
error_message += f'\n\n{fetch_err.error_message}'
if fetch_err.ip:
error_message += '\n\nExit node IP address: ' + fetch_err.ip
error_message += f'\n\nExit node IP address: {fetch_err.ip}'
return flask.render_template('error.html', error_message=error_message, slim=slim), 502
elif error_code == '429':
@@ -157,7 +157,7 @@ def error_page(e):
'• Enable Tor routing in Settings for automatic IP rotation\n'
'• Use a VPN to change your IP address')
if fetch_err.ip:
error_message += '\n\nYour IP: ' + fetch_err.ip
error_message += f'\n\nYour IP: {fetch_err.ip}'
return flask.render_template('error.html', error_message=error_message, slim=slim), 429
elif error_code == '502' and ('Failed to resolve' in str(fetch_err) or 'Failed to establish' in str(fetch_err)):
@@ -179,7 +179,7 @@ def error_page(e):
# Catch-all for any other FetchError (400, etc.)
error_message = f'Error communicating with YouTube ({error_code}).'
if fetch_err.error_message:
error_message += '\n\n' + fetch_err.error_message
error_message += f'\n\n{fetch_err.error_message}'
return flask.render_template('error.html', error_message=error_message, slim=slim), 502
return flask.render_template('error.html', traceback=traceback.format_exc(),

View File

@@ -253,7 +253,7 @@ def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1,
# For now it seems to be constant for the API endpoint, not dependent
# on the browsing session or channel
key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
url = f'https://www.youtube.com/youtubei/v1/browse?key={key}'
data = {
'context': {
@@ -285,8 +285,8 @@ def get_number_of_videos_channel(channel_id):
return 1000
# Uploads playlist
playlist_id = 'UU' + channel_id[2:]
url = 'https://m.youtube.com/playlist?list=' + playlist_id + '&pbj=1'
playlist_id = f'UU{channel_id[2:]}'
url = f'https://m.youtube.com/playlist?list={playlist_id}&pbj=1'
try:
response = util.fetch_url(url, headers_mobile,
@@ -328,7 +328,7 @@ def get_channel_id(base_url):
# method that gives the smallest possible response at ~4 kb
# needs to be as fast as possible
base_url = base_url.replace('https://www', 'https://m') # avoid redirect
response = util.fetch_url(base_url + '/about?pbj=1', headers_mobile,
response = util.fetch_url(f'{base_url}/about?pbj=1', headers_mobile,
debug_name='get_channel_id', report_text='Got channel id').decode('utf-8')
match = channel_id_re.search(response)
if match:
@@ -372,7 +372,7 @@ def get_channel_search_json(channel_id, query, page):
ctoken = base64.urlsafe_b64encode(proto.nested(80226972, ctoken)).decode('ascii')
key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
url = f'https://www.youtube.com/youtubei/v1/browse?key={key}'
data = {
'context': {
@@ -414,18 +414,18 @@ def post_process_channel_info(info):
def get_channel_first_page(base_url=None, tab='videos', channel_id=None, sort=None):
if channel_id:
base_url = 'https://www.youtube.com/channel/' + channel_id
base_url = f'https://www.youtube.com/channel/{channel_id}'
# Build URL with sort parameter
# YouTube URL sort params: p=popular, dd=newest, lad=newest no shorts
# Note: 'da' (oldest) was removed by YouTube in January 2026
url = base_url + '/' + tab + '?pbj=1&view=0'
url = f'{base_url}/{tab}?pbj=1&view=0'
if sort:
# Map sort values to YouTube's URL parameter values
sort_map = {'3': 'dd', '4': 'lad'}
url += '&sort=' + sort_map.get(sort, 'dd')
url += f'&sort={sort_map.get(sort, "dd")}'
return util.fetch_url(url, headers_desktop, debug_name='gen_channel_' + tab)
return util.fetch_url(url, headers_desktop, debug_name=f'gen_channel_{tab}')
playlist_sort_codes = {'2': "da", '3': "dd", '4': "lad"}
@@ -462,7 +462,7 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
if page_number == 1:
tasks = (
gevent.spawn(playlist.playlist_first_page,
'UU' + channel_id[2:],
f'UU{channel_id[2:]}',
report_text='Retrieved channel videos'),
gevent.spawn(get_metadata, channel_id),
)
@@ -477,11 +477,11 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
set_cached_number_of_videos(channel_id, number_of_videos)
else:
tasks = (
gevent.spawn(playlist.get_videos, 'UU' + channel_id[2:],
gevent.spawn(playlist.get_videos, f'UU{channel_id[2:]}',
page_number, include_shorts=True),
gevent.spawn(get_metadata, channel_id),
gevent.spawn(get_number_of_videos_channel, channel_id),
gevent.spawn(playlist.playlist_first_page, 'UU' + channel_id[2:],
gevent.spawn(playlist.playlist_first_page, f'UU{channel_id[2:]}',
report_text='Retrieved channel video count'),
)
gevent.joinall(tasks)
@@ -567,10 +567,10 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
elif tab == 'search' and channel_id:
polymer_json = get_channel_search_json(channel_id, query, page_number)
elif tab == 'search':
url = base_url + '/search?pbj=1&query=' + urllib.parse.quote(query, safe='')
url = f'{base_url}/search?pbj=1&query={urllib.parse.quote(query, safe="")}'
polymer_json = util.fetch_url(url, headers_desktop, debug_name='gen_channel_search')
elif tab != 'videos':
flask.abort(404, 'Unknown channel tab: ' + tab)
flask.abort(404, f'Unknown channel tab: {tab}')
if polymer_json is not None and info is None:
info = yt_data_extract.extract_channel_info(
@@ -583,7 +583,7 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
return flask.render_template('error.html', error_message=info['error'])
if channel_id:
info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id
info['channel_url'] = f'https://www.youtube.com/channel/{channel_id}'
info['channel_id'] = channel_id
else:
channel_id = info['channel_id']
@@ -663,22 +663,22 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
@yt_app.route('/channel/<channel_id>/')
@yt_app.route('/channel/<channel_id>/<tab>')
def get_channel_page(channel_id, tab='videos'):
return get_channel_page_general_url('https://www.youtube.com/channel/' + channel_id, tab, request, channel_id)
return get_channel_page_general_url(f'https://www.youtube.com/channel/{channel_id}', tab, request, channel_id)
@yt_app.route('/user/<username>/')
@yt_app.route('/user/<username>/<tab>')
def get_user_page(username, tab='videos'):
return get_channel_page_general_url('https://www.youtube.com/user/' + username, tab, request)
return get_channel_page_general_url(f'https://www.youtube.com/user/{username}', tab, request)
@yt_app.route('/c/<custom>/')
@yt_app.route('/c/<custom>/<tab>')
def get_custom_c_page(custom, tab='videos'):
return get_channel_page_general_url('https://www.youtube.com/c/' + custom, tab, request)
return get_channel_page_general_url(f'https://www.youtube.com/c/{custom}', tab, request)
@yt_app.route('/<custom>')
@yt_app.route('/<custom>/<tab>')
def get_toplevel_custom_page(custom, tab='videos'):
return get_channel_page_general_url('https://www.youtube.com/' + custom, tab, request)
return get_channel_page_general_url(f'https://www.youtube.com/{custom}', tab, request)

View File

@@ -104,20 +104,19 @@ def post_process_comments_info(comments_info):
comment['replies_url'] = None
comment['replies_url'] = concat_or_none(
util.URL_ORIGIN,
'/comments?replies=1&ctoken=' + ctoken)
f'/comments?replies=1&ctoken={ctoken}')
if reply_count == 0:
comment['view_replies_text'] = 'Reply'
elif reply_count == 1:
comment['view_replies_text'] = '1 reply'
else:
comment['view_replies_text'] = str(reply_count) + ' replies'
comment['view_replies_text'] = f'{reply_count} replies'
if comment['approx_like_count'] == '1':
comment['likes_text'] = '1 like'
else:
comment['likes_text'] = (str(comment['approx_like_count'])
+ ' likes')
comment['likes_text'] = f"{comment['approx_like_count']} likes"
comments_info['include_avatars'] = settings.enable_comment_avatars
if comments_info['ctoken']:
@@ -163,14 +162,13 @@ def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''):
comments_info = {'error': None}
try:
other_sort_url = (
util.URL_ORIGIN + '/comments?ctoken='
+ make_comment_ctoken(video_id, sort=1 - sort, lc=lc)
f"{util.URL_ORIGIN}/comments?ctoken="
f"{make_comment_ctoken(video_id, sort=1 - sort, lc=lc)}"
)
other_sort_text = 'Sort by ' + ('newest' if sort == 0 else 'top')
other_sort_text = f'Sort by {"newest" if sort == 0 else "top"}'
this_sort_url = (util.URL_ORIGIN
+ '/comments?ctoken='
+ make_comment_ctoken(video_id, sort=sort, lc=lc))
this_sort_url = (f"{util.URL_ORIGIN}/comments?ctoken="
f"{make_comment_ctoken(video_id, sort=sort, lc=lc)}")
comments_info['comment_links'] = [
(other_sort_text, other_sort_url),
@@ -188,17 +186,16 @@ def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''):
if e.code == '429' and settings.route_tor:
comments_info['error'] = 'Error: YouTube blocked the request because the Tor exit node is overutilized.'
if e.error_message:
comments_info['error'] += '\n\n' + e.error_message
comments_info['error'] += '\n\nExit node IP address: %s' % e.ip
comments_info['error'] += f'\n\n{e.error_message}'
comments_info['error'] += f'\n\nExit node IP address: {e.ip}'
else:
comments_info['error'] = 'YouTube blocked the request. Error: %s' % str(e)
comments_info['error'] = f'YouTube blocked the request. Error: {e}'
except Exception as e:
comments_info['error'] = 'YouTube blocked the request. Error: %s' % str(e)
comments_info['error'] = f'YouTube blocked the request. Error: {e}'
if comments_info.get('error'):
print('Error retrieving comments for ' + str(video_id) + ':\n' +
comments_info['error'])
print(f'Error retrieving comments for {video_id}:\n{comments_info["error"]}')
return comments_info
@@ -218,12 +215,10 @@ def get_comments_page():
other_sort_url = None
else:
other_sort_url = (
util.URL_ORIGIN
+ '/comments?ctoken='
+ make_comment_ctoken(comments_info['video_id'],
sort=1-comments_info['sort'])
f'{util.URL_ORIGIN}/comments?ctoken='
f'{make_comment_ctoken(comments_info["video_id"], sort=1-comments_info["sort"])}'
)
other_sort_text = 'Sort by ' + ('newest' if comments_info['sort'] == 0 else 'top')
other_sort_text = f'Sort by {"newest" if comments_info["sort"] == 0 else "top"}'
comments_info['comment_links'] = [(other_sort_text, other_sort_url)]
return flask.render_template(

youtube/constants.py

@@ -0,0 +1,190 @@
"""Constants used across yt-local application."""
import collections
YOUTUBE_DOMAINS = ('youtube.com', 'youtu.be', 'youtube-nocookie.com')
DEFAULT_USER_AGENT = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)'
MOBILE_USER_AGENT = 'Mozilla/5.0 (Linux; Android 7.0; Redmi Note 4 Build/NRD90M) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Mobile Safari/537.36'
REPLACEMENT_MAP = collections.OrderedDict([
('<', '_'),
('>', '_'),
(': ', ' - '),
(':', '-'),
('"', "'"),
('/', '_'),
('\\', '_'),
('|', '-'),
('?', ''),
('*', '_'),
('\t', ' '),
])
DOS_RESERVED_NAMES = frozenset({
'con', 'prn', 'aux', 'nul', 'com0', 'com1', 'com2', 'com3',
'com4', 'com5', 'com6', 'com7', 'com8', 'com9', 'lpt0',
'lpt1', 'lpt2', 'lpt3', 'lpt4', 'lpt5', 'lpt6', 'lpt7',
'lpt8', 'lpt9'
})
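A sanitizer built on these two tables would apply the replacements in order (note that `': '` must run before `':'`) and then guard against Windows reserved device names. The helper below is a sketch, not code from this diff; the abbreviated reserved-name set stands in for the full frozenset above.

```python
import collections

REPLACEMENT_MAP = collections.OrderedDict([
    ('<', '_'), ('>', '_'), (': ', ' - '), (':', '-'), ('"', "'"),
    ('/', '_'), ('\\', '_'), ('|', '-'), ('?', ''), ('*', '_'), ('\t', ' '),
])
DOS_RESERVED_NAMES = frozenset({'con', 'prn', 'aux', 'nul'})  # abbreviated

def to_valid_filename(name):
    # Apply replacements in insertion order; ': ' must precede ':'
    for bad, good in REPLACEMENT_MAP.items():
        name = name.replace(bad, good)
    # Windows rejects reserved device names regardless of extension
    if name.split('.')[0].lower() in DOS_RESERVED_NAMES:
        name = '_' + name
    return name
```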
INNERTUBE_CLIENTS = {
'android': {
'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
'INNERTUBE_CONTEXT': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'ANDROID',
'clientVersion': '21.02.35',
'osName': 'Android',
'osVersion': '11',
'androidSdkVersion': 30,
'platform': 'MOBILE',
'userAgent': 'com.google.android.youtube/21.02.35 (Linux; U; Android 11) gzip'
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 3,
'REQUIRE_JS_PLAYER': False,
},
'ios': {
'INNERTUBE_API_KEY': 'AIzaSyB-63vPrdThhKuerbB2N_l7Kwwcxj6yUAc',
'INNERTUBE_CONTEXT': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'IOS',
'clientVersion': '21.02.3',
'deviceMake': 'Apple',
'deviceModel': 'iPhone16,2',
'osName': 'iPhone',
'osVersion': '18.3.2.22D82',
'userAgent': 'com.google.ios.youtube/21.02.3 (iPhone16,2; U; CPU iOS 18_3_2 like Mac OS X;)'
}
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 5,
'REQUIRE_JS_PLAYER': False
},
'tv_embedded': {
'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
'INNERTUBE_CONTEXT': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'TVHTML5_SIMPLY',
'clientVersion': '1.0',
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 75,
'REQUIRE_JS_PLAYER': True,
},
'web': {
'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'WEB',
'clientVersion': '2.20260114.08.00',
'userAgent': DEFAULT_USER_AGENT,
}
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 1
},
'web_embedded': {
'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
'INNERTUBE_CONTEXT': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'WEB_EMBEDDED_PLAYER',
'clientVersion': '1.20260115.01.00',
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 56,
'REQUIRE_JS_PLAYER': True,
},
'mweb': {
'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
'INNERTUBE_CONTEXT': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'MWEB',
'clientVersion': '2.20260115.01.00',
'userAgent': 'Mozilla/5.0 (iPad; CPU OS 16_7_10 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1,gzip(gfe)',
}
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 2,
'REQUIRE_JS_PLAYER': True,
},
'tv': {
'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
'INNERTUBE_CONTEXT': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'TVHTML5',
'clientVersion': '7.20260114.12.00',
'userAgent': 'Mozilla/5.0 (ChromiumStylePlatform) Cobalt/25.lts.30.1034943-gold (unlike Gecko), Unknown_TV_Unknown_0/Unknown (Unknown, Unknown)',
}
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 7,
'REQUIRE_JS_PLAYER': True,
},
'android_vr': {
'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'ANDROID_VR',
'clientVersion': '1.65.10',
'deviceMake': 'Oculus',
'deviceModel': 'Quest 3',
'androidSdkVersion': 32,
'userAgent': 'com.google.android.apps.youtube.vr.oculus/1.65.10 (Linux; U; Android 12L; eureka-user Build/SQ3A.220605.009.A1) gzip',
'osName': 'Android',
'osVersion': '12L',
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 28,
'REQUIRE_JS_PLAYER': False,
},
}
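Each entry above carries everything needed to impersonate one client: the context goes in the request body, and the client name/version become headers. The helper below is a sketch of how such a config might be consumed; the endpoint and header names are assumptions based on the search code elsewhere in this diff, not definitions from `constants.py`.

```python
import json

def innertube_player_request(client_config, video_id):
    """Build headers and JSON body for an innertube player call (sketch;
    the header names are assumptions, not taken from this diff)."""
    context = client_config['INNERTUBE_CONTEXT']
    body = {'context': context, 'videoId': video_id}
    headers = {
        'Content-Type': 'application/json',
        'X-YouTube-Client-Name': str(client_config['INNERTUBE_CONTEXT_CLIENT_NAME']),
        'X-YouTube-Client-Version': context['client']['clientVersion'],
    }
    return headers, json.dumps(body)
```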
THEME_NAMES = {
0: 'light_theme',
1: 'gray_theme',
2: 'dark_theme',
}
FONT_CHOICES = {
0: 'initial',
1: '"liberation serif", "times new roman", calibri, carlito, serif',
2: 'arial, "liberation sans", sans-serif',
3: 'verdana, sans-serif',
4: 'tahoma, sans-serif',
}
URL_ORIGIN = "/https://www.youtube.com"
MAX_RETRIES = 5
BASE_DELAY = 1.0
TOR_DEFAULT_PORT = 9050
TOR_CONTROL_DEFAULT_PORT = 9151
DEFAULT_PORT = 9010
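MAX_RETRIES and BASE_DELAY read like exponential-backoff parameters. How they are consumed is not shown in this file, but a typical pairing is doubling the delay after each failed attempt:

```python
import time

MAX_RETRIES = 5
BASE_DELAY = 1.0

def fetch_with_retries(fetch, url, base_delay=BASE_DELAY):
    # Hypothetical consumer of the constants above: delays of
    # base_delay * 2**attempt between failures, then re-raise
    for attempt in range(MAX_RETRIES):
        try:
            return fetch(url)
        except OSError:
            if attempt == MAX_RETRIES - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)
```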
# Backward compatibility aliases (matching existing code names)
desktop_user_agent = 'Mozilla/5.0 (Windows NT 6.1; rv:52.0) Gecko/20100101 Firefox/52.0'
desktop_ua = (('User-Agent', desktop_user_agent),)
mobile_ua = (('User-Agent', MOBILE_USER_AGENT),)
json_header = (('Content-Type', 'application/json'),)
# Re-export for convenience
url_origin = URL_ORIGIN


@@ -41,8 +41,8 @@ def app_version():
describe = minimal_env_cmd(['git', 'describe', '--tags', '--always'])
git_revision = describe.strip().decode('ascii')
branch = minimal_env_cmd(['git', 'branch'])
git_branch = branch.strip().decode('ascii').replace('* ', '')
branch = minimal_env_cmd(['git', 'branch', '--show-current'])
git_branch = branch.strip().decode('ascii')
subst_list.update({
'branch': git_branch,


@@ -92,9 +92,7 @@ def add_extra_info_to_videos(videos, playlist_name):
util.add_extra_html_info(video)
if video['id'] + '.jpg' in thumbnails:
video['thumbnail'] = (
'/https://youtube.com/data/playlist_thumbnails/'
+ playlist_name
+ '/' + video['id'] + '.jpg')
f'/https://youtube.com/data/playlist_thumbnails/{playlist_name}/{video["id"]}.jpg')
else:
video['thumbnail'] = util.get_thumbnail_url(video['id'])
missing_thumbnails.append(video['id'])


@@ -20,7 +20,7 @@ def playlist_ctoken(playlist_id, offset, include_shorts=True):
continuation_info = proto.string(3, proto.percent_b64encode(offset))
playlist_id = proto.string(2, 'VL' + playlist_id)
playlist_id = proto.string(2, f'VL{playlist_id}')
pointless_nest = proto.string(80226972, playlist_id + continuation_info)
return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
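The ctoken is nested protobuf: the `VL`-prefixed playlist ID goes in a length-delimited string field, which is itself wrapped in field 80226972 and base64url-encoded. `proto.string` and `proto.percent_b64encode` are not shown in this diff; the sketch below assumes `proto.string` emits a standard length-delimited field, and it omits the offset (field 3) continuation info.

```python
import base64

def varint(n):
    # Protobuf varint: 7 bits per byte, MSB set on all but the last byte
    out = b''
    while True:
        low, n = n & 0x7F, n >> 7
        out += bytes([low | 0x80 if n else low])
        if not n:
            return out

def string_field(field_number, data):
    # Wire type 2 (length-delimited): tag varint, length varint, payload
    return varint((field_number << 3) | 2) + varint(len(data)) + data

def playlist_browse_token(playlist_id):
    inner = string_field(2, b'VL' + playlist_id.encode('ascii'))
    return base64.urlsafe_b64encode(string_field(80226972, inner)).decode('ascii')
```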
@@ -30,7 +30,7 @@ def playlist_first_page(playlist_id, report_text="Retrieved playlist",
use_mobile=False):
# Use innertube API (pbj=1 no longer works for many playlists)
key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
url = f'https://www.youtube.com/youtubei/v1/browse?key={key}'
data = {
'context': {
@@ -41,7 +41,7 @@ def playlist_first_page(playlist_id, report_text="Retrieved playlist",
'clientVersion': '2.20240327.00.00',
},
},
'browseId': 'VL' + playlist_id,
'browseId': f'VL{playlist_id}',
}
content_type_header = (('Content-Type', 'application/json'),)
@@ -58,7 +58,7 @@ def get_videos(playlist_id, page, include_shorts=True, use_mobile=False,
page_size = 100
key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
url = f'https://www.youtube.com/youtubei/v1/browse?key={key}'
ctoken = playlist_ctoken(playlist_id, (int(page)-1)*page_size,
include_shorts=include_shorts)
@@ -97,7 +97,7 @@ def get_playlist_page():
if playlist_id.startswith('RD'):
first_video_id = playlist_id[2:] # video ID after 'RD' prefix
return flask.redirect(
util.URL_ORIGIN + '/watch?v=' + first_video_id + '&list=' + playlist_id,
f'{util.URL_ORIGIN}/watch?v={first_video_id}&list={playlist_id}',
302
)
@@ -132,9 +132,9 @@ def get_playlist_page():
if 'id' in item and not item.get('thumbnail'):
item['thumbnail'] = f"{settings.img_prefix}https://i.ytimg.com/vi/{item['id']}/hqdefault.jpg"
item['url'] += '&list=' + playlist_id
item['url'] += f'&list={playlist_id}'
if item['index']:
item['url'] += '&index=' + str(item['index'])
item['url'] += f'&index={item["index"]}'
video_count = yt_data_extract.deep_get(info, 'metadata', 'video_count')
if video_count is None:


@@ -76,7 +76,7 @@ def read_varint(data):
except IndexError:
if i == 0:
raise EOFError()
raise Exception('Unterminated varint starting at ' + str(data.tell() - i))
raise Exception(f'Unterminated varint starting at {data.tell() - i}')
result |= (byte & 127) << 7*i
if not byte & 128:
break
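The loop above accumulates 7 payload bits per byte, least-significant group first, stopping at the first byte with the high bit clear. A standalone sketch of the same decode (minus the EOF handling):

```python
import io

def decode_varint(stream):
    # Mirrors read_varint: each byte contributes its low 7 bits,
    # least-significant group first; a clear MSB ends the sequence
    result, i = 0, 0
    while True:
        byte = stream.read(1)[0]
        result |= (byte & 127) << (7 * i)
        if not byte & 128:
            return result
        i += 1

# 300 = 0b10_0101100 -> low group 0b0101100 with MSB set (0xAC), then 0b10 (0x02)
decode_varint(io.BytesIO(b'\xac\x02'))  # -> 300
```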
@@ -118,7 +118,7 @@ def read_protobuf(data):
elif wire_type == 5:
value = data.read(4)
else:
raise Exception("Unknown wire type: " + str(wire_type) + " at position " + str(data.tell()))
raise Exception(f"Unknown wire type: {wire_type} at position {data.tell()}")
yield (wire_type, field_number, value)
@@ -170,8 +170,7 @@ def _make_protobuf(data):
elif field[0] == 2:
result += string(field[1], _make_protobuf(field[2]))
else:
raise NotImplementedError('Wire type ' + str(field[0])
+ ' not implemented')
raise NotImplementedError(f'Wire type {field[0]} not implemented')
return result
return data
@@ -218,4 +217,4 @@ def b64_to_bytes(data):
if isinstance(data, bytes):
data = data.decode('ascii')
data = data.replace("%3D", "=")
return base64.urlsafe_b64decode(data + "="*((4 - len(data) % 4) % 4))
return base64.urlsafe_b64decode(f'{data}{"=" * ((4 - len(data) % 4) % 4)}')
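`urlsafe_b64decode` requires input whose length is a multiple of 4, and the `(4 - len % 4) % 4` expression computes exactly the padding needed: it yields 0 for already-aligned input, so no stray `=` is appended in that case. A self-contained illustration:

```python
import base64

def pad_b64(data):
    # (4 - len % 4) % 4 is 0 when len is already a multiple of 4,
    # so aligned input passes through unchanged
    return data + '=' * ((4 - len(data) % 4) % 4)

def b64_to_bytes(data):
    return base64.urlsafe_b64decode(pad_b64(data.replace('%3D', '=')))
```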


@@ -179,7 +179,7 @@ def read_varint(data):
except IndexError:
if i == 0:
raise EOFError()
raise Exception('Unterminated varint starting at ' + str(data.tell() - i))
raise Exception(f'Unterminated varint starting at {data.tell() - i}')
result |= (byte & 127) << 7*i
if not byte & 128:
break
@@ -235,8 +235,7 @@ def _make_protobuf(data):
elif field[0] == 2:
result += string(field[1], _make_protobuf(field[2]))
else:
raise NotImplementedError('Wire type ' + str(field[0])
+ ' not implemented')
raise NotImplementedError(f'Wire type {field[0]} not implemented')
return result
return data
@@ -286,7 +285,7 @@ def b64_to_bytes(data):
if isinstance(data, bytes):
data = data.decode('ascii')
data = data.replace("%3D", "=")
return base64.urlsafe_b64decode(data + "="*((4 - len(data) % 4) % 4))
return base64.urlsafe_b64decode(f'{data}{"=" * ((4 - len(data) % 4) % 4)}')
# --------------------------------------------------------------------
@@ -344,7 +343,7 @@ fromhex = bytes.fromhex
def aligned_ascii(data):
return ' '.join(' ' + chr(n) if n in range(32, 128) else ' _' for n in data)
return ' '.join(f' {chr(n)}' if n in range(32, 128) else ' _' for n in data)
def parse_protobuf(data, mutable=False, spec=()):
@@ -372,7 +371,7 @@ def parse_protobuf(data, mutable=False, spec=()):
elif wire_type == 5:
value = data.read(4)
else:
raise Exception("Unknown wire type: " + str(wire_type) + ", Tag: " + bytes_to_hex(varint_encode(tag)) + ", at position " + str(data.tell()))
raise Exception(f"Unknown wire type: {wire_type}, Tag: {bytes_to_hex(varint_encode(tag))}, at position {data.tell()}")
if mutable:
yield [wire_type, field_number, value]
else:
@@ -453,7 +452,7 @@ def b32decode(s, casefold=False, map01=None):
if map01 is not None:
map01 = _bytes_from_decode_data(map01)
assert len(map01) == 1, repr(map01)
s = s.translate(bytes.maketrans(b'01', b'O' + map01))
if casefold:
s = s.upper()
# Strip off pad characters from the right. We need to count the pad
@@ -494,7 +493,7 @@ def b32decode(s, casefold=False, map01=None):
def dec32(data):
if isinstance(data, bytes):
data = data.decode('ascii')
return b32decode(data + "="*((8 - len(data)%8)%8))
return b32decode(f'{data}{"=" * ((8 - len(data) % 8) % 8)}')
_patterns = [
@@ -563,9 +562,7 @@ def _pp(obj, indent): # not my best work
if len(obj) == 3: # (wire_type, field_number, data)
return obj.__repr__()
else: # (base64, [...])
return ('(' + obj[0].__repr__() + ',\n'
+ indent_lines(_pp(obj[1], indent), indent) + '\n'
+ ')')
return f"({obj[0].__repr__()},\n{indent_lines(_pp(obj[1], indent), indent)}\n)"
elif isinstance(obj, list):
# [wire_type, field_number, data]
if (len(obj) == 3
@@ -577,13 +574,11 @@ def _pp(obj, indent): # not my best work
elif (len(obj) == 3
and not any(isinstance(x, (list, tuple)) for x in obj[0:2])
):
return ('[' + obj[0].__repr__() + ', ' + obj[1].__repr__() + ',\n'
+ indent_lines(_pp(obj[2], indent), indent) + '\n'
+ ']')
return f"[{obj[0].__repr__()}, {obj[1].__repr__()},\n{indent_lines(_pp(obj[2], indent), indent)}\n]"
else:
s = '[\n'
for x in obj:
s += indent_lines(_pp(x, indent), indent) + ',\n'
s += f"{indent_lines(_pp(x, indent), indent)},\n"
s += ']'
return s
else:


@@ -51,7 +51,7 @@ def get_search_json(query, page, autocorrect, sort, filters):
'X-YouTube-Client-Name': '1',
'X-YouTube-Client-Version': '2.20180418',
}
url += "&pbj=1&sp=" + page_number_to_sp_parameter(page, autocorrect, sort, filters).replace("=", "%3D")
url += f"&pbj=1&sp={page_number_to_sp_parameter(page, autocorrect, sort, filters).replace('=', '%3D')}"
content = util.fetch_url(url, headers=headers, report_text="Got search results", debug_name='search_results')
info = json.loads(content)
return info


@@ -150,7 +150,7 @@
* Create custom quality control in Plyr controls
*/
function addCustomQualityControl(player, qualityLabels) {
player.on('ready', () => {
function doAdd() {
console.log('Adding custom quality control...');
const controls = player.elements.container.querySelector('.plyr__controls');
@@ -238,14 +238,21 @@
}
console.log('Custom quality control added');
});
}
// Run immediately if Plyr is already ready, otherwise wait
if (player.ready) {
doAdd();
} else {
player.on('ready', doAdd);
}
}
/**
* Create custom audio tracks control in Plyr controls
*/
function addCustomAudioTracksControl(player, hlsInstance) {
player.on('ready', () => {
function doAdd() {
console.log('Adding custom audio tracks control...');
const controls = player.elements.container.querySelector('.plyr__controls');
@@ -397,52 +404,32 @@
});
console.log('Custom audio tracks control added');
});
}
// Run immediately if Plyr is already ready, otherwise wait
if (player.ready) {
doAdd();
} else {
player.on('ready', doAdd);
}
}
/**
* Initialize Plyr with HLS quality options
* Main initialization
*/
function initPlyrWithQuality(hlsInstance) {
async function start() {
console.log('Starting Plyr with HLS...');
if (typeof hls_manifest_url === 'undefined' || !hls_manifest_url) {
console.error('No HLS manifest URL available');
return;
}
// Initialize Plyr immediately so the player UI shows right away
// instead of a bare <video> element while the manifest loads.
const video = document.getElementById('js-video-player');
if (!hlsInstance || !hlsInstance.levels || hlsInstance.levels.length === 0) {
console.error('HLS not ready');
return;
}
if (!video) {
console.error('Video element not found');
return;
}
console.log('HLS levels available:', hlsInstance.levels.length);
const sortedLevels = [...hlsInstance.levels].sort((a, b) => b.height - a.height);
const seenHeights = new Set();
const uniqueLevels = [];
sortedLevels.forEach((level) => {
if (!seenHeights.has(level.height)) {
seenHeights.add(level.height);
uniqueLevels.push(level);
}
});
const qualityLabels = ['auto'];
uniqueLevels.forEach((level) => {
const originalIndex = hlsInstance.levels.indexOf(level);
const label = level.height + 'p';
if (!window.hlsQualityMap[label]) {
qualityLabels.push(label);
window.hlsQualityMap[label] = originalIndex;
}
});
console.log('Quality labels:', qualityLabels);
const playerOptions = {
if (video) {
plyrInstance = new Plyr(video, {
autoplay: autoplayActive,
disableContextMenu: false,
captions: {
@@ -472,64 +459,64 @@
src: typeof storyboard_url !== 'undefined' && storyboard_url !== null ? [storyboard_url] : [],
},
settings: ['captions', 'speed', 'loop'],
tooltips: {
controls: true,
},
};
console.log('Creating Plyr...');
try {
plyrInstance = new Plyr(video, playerOptions);
console.log('Plyr instance created');
tooltips: { controls: true },
});
window.plyrInstance = plyrInstance;
addCustomQualityControl(plyrInstance, qualityLabels);
addCustomAudioTracksControl(plyrInstance, hlsInstance);
if (plyrInstance.eventListeners) {
plyrInstance.eventListeners.forEach(function(eventListener) {
if(eventListener.type === 'dblclick') {
eventListener.element.removeEventListener(eventListener.type, eventListener.callback, eventListener.options);
if (eventListener.type === 'dblclick') {
eventListener.element.removeEventListener(
eventListener.type, eventListener.callback, eventListener.options);
}
});
}
plyrInstance.started = false;
plyrInstance.once('playing', function(){this.started = true});
plyrInstance.once('playing', function(){ this.started = true; });
if (typeof data !== 'undefined' && data.time_start != 0) {
video.addEventListener('loadedmetadata', function() {
video.currentTime = data.time_start;
});
}
console.log('Plyr init complete');
} catch (e) {
console.error('Failed to initialize Plyr:', e);
}
}
/**
* Main initialization
*/
async function start() {
console.log('Starting Plyr with HLS...');
if (typeof hls_manifest_url === 'undefined' || !hls_manifest_url) {
console.error('No HLS manifest URL available');
return;
}
try {
const hlsInstance = await initHLS(hls_manifest_url);
initPlyrWithQuality(hlsInstance);
// Manifest is ready — add quality and audio controls
addCustomQualityControl(plyrInstance, buildQualityLabels(hlsInstance));
addCustomAudioTracksControl(plyrInstance, hlsInstance);
} catch (error) {
console.error('Failed to initialize:', error);
console.error('Failed to initialize HLS:', error);
}
}
/**
* Build quality labels from HLS levels
*/
function buildQualityLabels(hlsInstance) {
const qualityLabels = ['auto'];
if (!hlsInstance || !hlsInstance.levels) return qualityLabels;
const sortedLevels = [...hlsInstance.levels].sort((a, b) => b.height - a.height);
const seenHeights = new Set();
sortedLevels.forEach((level) => {
if (!seenHeights.has(level.height)) {
seenHeights.add(level.height);
const originalIndex = hlsInstance.levels.indexOf(level);
const label = level.height + 'p';
if (!window.hlsQualityMap[label]) {
qualityLabels.push(label);
window.hlsQualityMap[label] = originalIndex;
}
}
});
return qualityLabels;
}
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', start);
} else {


@@ -31,9 +31,34 @@ if (data.using_pair_sources) {
avMerge = new AVMerge(video, srcPair, 0);
}
// Quality selector
// Quality selector — populate with available sources
const qs = document.getElementById('quality-select');
if (qs) {
// Clear the HLS-oriented "Auto" default; DASH has discrete sources
qs.innerHTML = '';
// Add pair_sources (video+audio, used by AVMerge)
if (data['pair_sources'] && data['pair_sources'].length) {
data['pair_sources'].forEach(function(src, i) {
let opt = document.createElement('option');
opt.value = JSON.stringify({type: 'pair', index: i});
opt.textContent = src.quality_string;
if (i === data['pair_idx']) opt.selected = true;
qs.appendChild(opt);
});
}
// Add uni_sources (integrated video+audio, single file)
if (data['uni_sources'] && data['uni_sources'].length) {
data['uni_sources'].forEach(function(src, i) {
let opt = document.createElement('option');
opt.value = JSON.stringify({type: 'uni', index: i});
opt.textContent = src.quality_string;
if ((!data['pair_sources'] || !data['pair_sources'].length) && i === data['uni_idx']) opt.selected = true;
qs.appendChild(opt);
});
}
qs.addEventListener('change', function(e) {
changeQuality(JSON.parse(this.value))
});


@@ -26,7 +26,17 @@ function initHLSNative(manifestUrl) {
lowLatencyMode: false,
maxBufferLength: 30,
maxMaxBufferLength: 60,
maxBufferHole: 0.5,
startLevel: -1,
// Prevent stalls on quality switch: nudge playback past small gaps
nudgeMaxRetry: 5,
// Allow more time for segments coming through our proxy
fragLoadingTimeOut: 30000,
fragLoadingMaxRetry: 5,
fragLoadingRetryDelay: 1000,
levelLoadingTimeOut: 15000,
levelLoadingMaxRetry: 4,
levelLoadingRetryDelay: 1000,
});
window.hls = hls;
@@ -89,15 +99,26 @@ function initHLSNative(manifestUrl) {
console.error('HLS fatal error:', data.type, data.details);
switch(data.type) {
case Hls.ErrorTypes.NETWORK_ERROR:
console.warn('HLS network error, attempting recovery...');
hls.startLoad();
break;
case Hls.ErrorTypes.MEDIA_ERROR:
console.warn('HLS media error, attempting recovery...');
hls.recoverMediaError();
break;
default:
hls.destroy();
break;
}
} else {
// Non-fatal errors can still cause stalls, especially
// bufferStalledError after a quality switch through our proxy
console.warn('HLS non-fatal error:', data.type, data.details);
if (data.details === 'bufferStalledError') {
// Buffer ran dry — HLS.js is waiting for data.
// Nudge it to retry loading the current fragment.
hls.startLoad();
}
}
});
@@ -122,13 +143,36 @@ function initPlayer() {
initHLSNative(hls_manifest_url);
const qualitySelect = document.getElementById('quality-select');
// Set initial Auto option while manifest loads
if (qualitySelect) {
qualitySelect.innerHTML = '<option value="-1" selected>Auto</option>';
}
if (qualitySelect) {
qualitySelect.addEventListener('change', function () {
const level = parseInt(this.value);
if (hls) {
hls.currentLevel = level;
console.log('Quality:', level === -1 ? 'Auto' : hls.levels[level]?.height + 'p');
const currentTime = video.currentTime;
const wasPaused = video.paused;
// Use nextLevel for smoother transition: it waits for the
// current segment to finish before switching, avoiding an
// abrupt buffer flush that starves the player.
if (level === -1) {
// Back to auto — re-enable ABR
hls.currentLevel = -1;
console.log('Quality: Auto (ABR)');
} else {
hls.nextLevel = level;
console.log('Quality: switching to',
hls.levels[level]?.height + 'p');
}
// If the video was already stalled, kick the loader
// so it starts fetching the new level immediately.
if (video.readyState < 3) {
hls.startLoad(currentTime);
}
}
});
}


@@ -126,7 +126,7 @@ def delete_thumbnails(to_delete):
os.remove(os.path.join(thumbnails_directory, thumbnail))
existing_thumbnails.remove(video_id)
except Exception:
print('Failed to delete thumbnail: ' + thumbnail)
print(f'Failed to delete thumbnail: {thumbnail}')
traceback.print_exc()
@@ -184,7 +184,7 @@ def _get_videos(cursor, number_per_page, offset, tag=None):
'time_published': exact_timestamp(db_video[3]) if db_video[4] else posix_to_dumbed_down(db_video[3]),
'author': db_video[5],
'author_id': db_video[6],
'author_url': '/https://www.youtube.com/channel/' + db_video[6],
'author_url': f'/https://www.youtube.com/channel/{db_video[6]}',
})
return videos, pseudo_number_of_videos
@@ -304,9 +304,9 @@ def posix_to_dumbed_down(posix_time):
if delta >= unit_time:
quantifier = round(delta/unit_time)
if quantifier == 1:
return '1 ' + unit_name + ' ago'
return f'1 {unit_name} ago'
else:
return str(quantifier) + ' ' + unit_name + 's ago'
return f'{quantifier} {unit_name}s ago'
else:
raise Exception()
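`posix_to_dumbed_down` walks a unit table from largest to smallest and returns at the first unit the delta covers. The unit table itself is defined outside this hunk; the values below are an assumption used to make the sketch self-contained.

```python
import time

TIME_UNITS = (  # assumed values; the real table is outside this hunk
    ('year', 31536000), ('month', 2592000), ('week', 604800),
    ('day', 86400), ('hour', 3600), ('minute', 60), ('second', 1),
)

def posix_to_dumbed_down(posix_time, now=None):
    delta = (now if now is not None else time.time()) - posix_time
    for unit_name, unit_time in TIME_UNITS:
        if delta >= unit_time:
            quantifier = round(delta / unit_time)
            if quantifier == 1:
                return f'1 {unit_name} ago'
            return f'{quantifier} {unit_name}s ago'
    raise Exception('time delta smaller than one second')
```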
@@ -363,7 +363,7 @@ def autocheck_dispatcher():
time_until_earliest_job = earliest_job['next_check_time'] - time.time()
if time_until_earliest_job <= -5: # should not happen unless we're running extremely slow
print('ERROR: autocheck_dispatcher got job scheduled in the past, skipping and rescheduling: ' + earliest_job['channel_id'] + ', ' + earliest_job['channel_name'] + ', ' + str(earliest_job['next_check_time']))
print(f'ERROR: autocheck_dispatcher got job scheduled in the past, skipping and rescheduling: {earliest_job["channel_id"]}, {earliest_job["channel_name"]}, {earliest_job["next_check_time"]}')
next_check_time = time.time() + 3600*secrets.randbelow(60)/60
with_open_db(_schedule_checking, earliest_job['channel_id'], next_check_time)
autocheck_jobs[earliest_job_index]['next_check_time'] = next_check_time
@@ -451,7 +451,7 @@ def check_channels_if_necessary(channel_ids):
def _get_atoma_feed(channel_id):
url = 'https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id
url = f'https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}'
try:
return util.fetch_url(url).decode('utf-8')
except util.FetchError as e:
@@ -485,16 +485,15 @@ def _get_channel_videos_first_page(channel_id, channel_status_name):
return channel_info
except util.FetchError as e:
if e.code == '429' and settings.route_tor:
error_message = ('Error checking channel ' + channel_status_name
+ ': YouTube blocked the request because the'
+ ' Tor exit node is overutilized. Try getting a new exit node'
+ ' by using the New Identity button in the Tor Browser.')
error_message = (f'Error checking channel {channel_status_name}: '
f'YouTube blocked the request because the Tor exit node is overutilized. '
f'Try getting a new exit node by using the New Identity button in the Tor Browser.')
if e.ip:
error_message += ' Exit node IP address: ' + e.ip
error_message += f' Exit node IP address: {e.ip}'
print(error_message)
return None
elif e.code == '502':
print('Error checking channel', channel_status_name + ':', str(e))
print(f'Error checking channel {channel_status_name}: {e}')
return None
raise
@@ -505,7 +504,7 @@ def _get_upstream_videos(channel_id):
except KeyError:
channel_status_name = channel_id
print("Checking channel: " + channel_status_name)
print(f"Checking channel: {channel_status_name}")
tasks = (
# channel page, need for video duration
@@ -550,15 +549,15 @@ def _get_upstream_videos(channel_id):
times_published[video_id_element.text] = time_published
except ValueError:
print('Failed to read atoma feed for ' + channel_status_name)
print(f'Failed to read atoma feed for {channel_status_name}')
traceback.print_exc()
except defusedxml.ElementTree.ParseError:
print('Failed to read atoma feed for ' + channel_status_name)
print(f'Failed to read atoma feed for {channel_status_name}')
if channel_info is None: # there was an error
return
if channel_info['error']:
print('Error checking channel ' + channel_status_name + ': ' + channel_info['error'])
print(f'Error checking channel {channel_status_name}: {channel_info["error"]}')
return
videos = channel_info['items']
@@ -1023,7 +1022,7 @@ def get_subscriptions_page():
tag = request.args.get('tag', None)
videos, number_of_videos_in_db = _get_videos(cursor, 60, (page - 1)*60, tag)
for video in videos:
video['thumbnail'] = util.URL_ORIGIN + '/data/subscription_thumbnails/' + video['id'] + '.jpg'
video['thumbnail'] = f'{util.URL_ORIGIN}/data/subscription_thumbnails/{video["id"]}.jpg'
video['type'] = 'video'
video['item_size'] = 'small'
util.add_extra_html_info(video)
@@ -1033,7 +1032,7 @@ def get_subscriptions_page():
subscription_list = []
for channel_name, channel_id, muted in _get_subscribed_channels(cursor):
subscription_list.append({
'channel_url': util.URL_ORIGIN + '/channel/' + channel_id,
'channel_url': f'{util.URL_ORIGIN}/channel/{channel_id}',
'channel_name': channel_name,
'channel_id': channel_id,
'muted': muted,
@@ -1109,17 +1108,17 @@ def serve_subscription_thumbnail(thumbnail):
for quality in ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'):
url = f"https://i.ytimg.com/vi/{video_id}/{quality}"
try:
image = util.fetch_url(url, report_text="Saved thumbnail: " + video_id)
image = util.fetch_url(url, report_text=f"Saved thumbnail: {video_id}")
break
except util.FetchError as e:
if '404' in str(e):
continue
print("Failed to download thumbnail for " + video_id + ": " + str(e))
print(f"Failed to download thumbnail for {video_id}: {e}")
flask.abort(500)
except urllib.error.HTTPError as e:
if e.code == 404:
continue
print("Failed to download thumbnail for " + video_id + ": " + str(e))
print(f"Failed to download thumbnail for {video_id}: {e}")
flask.abort(e.code)
if image is None:


@@ -75,14 +75,11 @@
<div class="external-player-controls">
<input class="speed" id="speed-control" type="text" title="Video speed">
{% if settings.use_video_player < 2 %}
<!-- Native player quality selector -->
<!-- Quality selector (populated by JS: HLS adds Auto+levels, DASH adds discrete sources) -->
<select id="quality-select" autocomplete="off">
<option value="-1" selected>Auto</option>
<!-- Quality options will be populated by HLS -->
</select>
{% else %}
<select id="quality-select" autocomplete="off" style="display: none;">
<!-- Quality options will be populated by HLS -->
</select>
{% endif %}
{% if settings.use_video_player != 2 %}


@@ -23,6 +23,9 @@ import stem
import stem.control
import traceback
from youtube.yt_data_extract.common import concat_or_none
from youtube import constants
logger = logging.getLogger(__name__)
# The trouble with the requests library: It ships its own certificate bundle via certifi
@@ -72,7 +75,7 @@ class TorManager:
def __init__(self):
self.old_tor_connection_pool = None
self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager(
'socks5h://127.0.0.1:' + str(settings.tor_port) + '/',
f'socks5h://127.0.0.1:{settings.tor_port}/',
cert_reqs='CERT_REQUIRED')
self.tor_pool_refresh_time = time.monotonic()
settings.add_setting_changed_hook(
@@ -92,7 +95,7 @@ class TorManager:
self.old_tor_connection_pool = self.tor_connection_pool
self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager(
'socks5h://127.0.0.1:' + str(settings.tor_port) + '/',
f'socks5h://127.0.0.1:{settings.tor_port}/',
cert_reqs='CERT_REQUIRED')
self.tor_pool_refresh_time = time.monotonic()
@@ -198,9 +201,9 @@ class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler):
class FetchError(Exception):
def __init__(self, code, reason='', ip=None, error_message=None):
if error_message:
string = code + ' ' + reason + ': ' + error_message
string = f"{code} {reason}: {error_message}"
else:
string = 'HTTP error during request: ' + code + ' ' + reason
string = f"HTTP error during request: {code} {reason}"
Exception.__init__(self, string)
self.code = code
self.reason = reason
@@ -294,14 +297,12 @@ def fetch_url_response(url, headers=(), timeout=15, data=None,
exception_cause = e.__context__.__context__
if (isinstance(exception_cause, socks.ProxyConnectionError)
and settings.route_tor):
msg = ('Failed to connect to Tor. Check that Tor is open and '
'that your internet connection is working.\n\n'
+ str(e))
msg = f'Failed to connect to Tor. Check that Tor is open and that your internet connection is working.\n\n{e}'
raise FetchError('502', reason='Bad Gateway',
error_message=msg)
elif isinstance(e.__context__,
urllib3.exceptions.NewConnectionError):
msg = 'Failed to establish a connection.\n\n' + str(e)
msg = f'Failed to establish a connection.\n\n{e}'
raise FetchError(
'502', reason='Bad Gateway',
error_message=msg)
@@ -391,7 +392,7 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None,
if error:
raise FetchError(
'429', reason=response.reason, ip=ip,
error_message='Automatic circuit change: ' + error)
error_message=f'Automatic circuit change: {error}')
continue # retry with new identity
# Check for client errors (400, 404) - don't retry these
@@ -467,17 +468,14 @@ def head(url, use_tor=False, report_text=None, max_redirects=10):
headers = {'User-Agent': 'Python-urllib'}
response = pool.request('HEAD', url, headers=headers, retries=retries)
if report_text:
print(
report_text,
' Latency:',
round(time.monotonic() - start_time, 3))
print(f'{report_text} Latency: {round(time.monotonic() - start_time, 3)}')
return response
mobile_user_agent = 'Mozilla/5.0 (Linux; Android 7.0; Redmi Note 4 Build/NRD90M) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Mobile Safari/537.36'
mobile_ua = (('User-Agent', mobile_user_agent),)
desktop_user_agent = 'Mozilla/5.0 (Windows NT 6.1; rv:52.0) Gecko/20100101 Firefox/52.0'
desktop_ua = (('User-Agent', desktop_user_agent),)
json_header = (('Content-Type', 'application/json'),)
mobile_user_agent = constants.MOBILE_USER_AGENT
mobile_ua = constants.mobile_ua
desktop_user_agent = constants.desktop_user_agent
desktop_ua = constants.desktop_ua
json_header = constants.json_header
desktop_xhr_headers = (
('Accept', '*/*'),
('Accept-Language', 'en-US,en;q=0.5'),
@@ -544,16 +542,16 @@ def download_thumbnail(save_directory, video_id):
for quality in ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'):
url = f'https://i.ytimg.com/vi/{video_id}/{quality}'
try:
thumbnail = fetch_url(url, report_text='Saved thumbnail: ' + video_id)
thumbnail = fetch_url(url, report_text=f'Saved thumbnail: {video_id}')
except FetchError as e:
if '404' in str(e):
continue
print('Failed to download thumbnail for ' + video_id + ': ' + str(e))
print(f'Failed to download thumbnail for {video_id}: {e}')
return False
except urllib.error.HTTPError as e:
if e.code == 404:
continue
print('Failed to download thumbnail for ' + video_id + ': ' + str(e))
print(f'Failed to download thumbnail for {video_id}: {e}')
return False
try:
with open(save_location, 'wb') as f:
@@ -563,7 +561,7 @@ def download_thumbnail(save_directory, video_id):
with open(save_location, 'wb') as f:
f.write(thumbnail)
return True
print('No thumbnail available for ' + video_id)
print(f'No thumbnail available for {video_id}')
return False
@@ -646,7 +644,7 @@ def update_query_string(query_string, items):
return urllib.parse.urlencode(parameters, doseq=True)
YOUTUBE_DOMAINS = ('youtube.com', 'youtu.be', 'youtube-nocookie.com')
YOUTUBE_DOMAINS = constants.YOUTUBE_DOMAINS
YOUTUBE_URL_RE_STR = r'https?://(?:[a-zA-Z0-9_-]*\.)?(?:'
YOUTUBE_URL_RE_STR += r'|'.join(map(re.escape, YOUTUBE_DOMAINS))
YOUTUBE_URL_RE_STR += r')(?:/[^"]*)?'
@@ -673,16 +671,6 @@ def left_remove(string, substring):
return string
def concat_or_none(*strings):
'''Concatenates strings. Returns None if any of the arguments are None'''
result = ''
for string in strings:
if string is None:
return None
result += string
return result
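The local `concat_or_none` definition removed here is replaced by the import from `youtube.yt_data_extract.common` added at the top of the file. Its contract, as the removed body shows, is all-or-nothing:

```python
def concat_or_none(*strings):
    '''Concatenates strings. Returns None if any of the arguments are None'''
    result = ''
    for string in strings:
        if string is None:
            return None
        result += string
    return result

# Useful for URL assembly where a missing ID should propagate as None
print(concat_or_none('https://x', '/channel/', 'UC123'))  # https://x/channel/UC123
print(concat_or_none('https://x', '/channel/', None))     # None
```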
def prefix_urls(item):
if settings.proxy_images:
try:
@@ -698,7 +686,7 @@ def prefix_urls(item):
def add_extra_html_info(item):
if item['type'] == 'video':
item['url'] = (URL_ORIGIN + '/watch?v=' + item['id']) if item.get('id') else None
item['url'] = f'{URL_ORIGIN}/watch?v={item["id"]}' if item.get('id') else None
video_info = {}
for key in ('id', 'title', 'author', 'duration', 'author_id'):
@@ -721,7 +709,7 @@ def add_extra_html_info(item):
item['url'] = concat_or_none(URL_ORIGIN, "/channel/", item['id'])
if item.get('author_id') and 'author_url' not in item:
item['author_url'] = URL_ORIGIN + '/channel/' + item['author_id']
item['author_url'] = f'{URL_ORIGIN}/channel/{item["author_id"]}'
def check_gevent_exceptions(*tasks):
@@ -731,24 +719,8 @@ def check_gevent_exceptions(*tasks):
# https://stackoverflow.com/a/62888
replacement_map = collections.OrderedDict([
('<', '_'),
('>', '_'),
(': ', ' - '),
(':', '-'),
('"', "'"),
('/', '_'),
('\\', '_'),
('|', '-'),
('?', ''),
('*', '_'),
('\t', ' '),
])
DOS_names = {'con', 'prn', 'aux', 'nul', 'com0', 'com1', 'com2', 'com3',
'com4', 'com5', 'com6', 'com7', 'com8', 'com9', 'lpt0',
'lpt1', 'lpt2', 'lpt3', 'lpt4', 'lpt5', 'lpt6', 'lpt7',
'lpt8', 'lpt9'}
replacement_map = constants.REPLACEMENT_MAP
DOS_names = constants.DOS_RESERVED_NAMES
def to_valid_filename(name):
@@ -790,143 +762,7 @@ def to_valid_filename(name):
return name
# https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/extractor/youtube.py#L72
INNERTUBE_CLIENTS = {
'android': {
'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
'INNERTUBE_CONTEXT': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'ANDROID',
'clientVersion': '19.09.36',
'osName': 'Android',
'osVersion': '12',
'androidSdkVersion': 31,
'platform': 'MOBILE',
'userAgent': 'com.google.android.youtube/19.09.36 (Linux; U; Android 12; US) gzip'
},
# https://github.com/yt-dlp/yt-dlp/pull/575#issuecomment-887739287
#'thirdParty': {
# 'embedUrl': 'https://google.com', # Can be any valid URL
#}
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 3,
'REQUIRE_JS_PLAYER': False,
},
'android-test-suite': {
'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
'INNERTUBE_CONTEXT': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'ANDROID_TESTSUITE',
'clientVersion': '1.9',
'osName': 'Android',
'osVersion': '12',
'androidSdkVersion': 31,
'platform': 'MOBILE',
'userAgent': 'com.google.android.youtube/1.9 (Linux; U; Android 12; US) gzip'
},
# https://github.com/yt-dlp/yt-dlp/pull/575#issuecomment-887739287
#'thirdParty': {
# 'embedUrl': 'https://google.com', # Can be any valid URL
#}
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 3,
'REQUIRE_JS_PLAYER': False,
},
'ios': {
'INNERTUBE_API_KEY': 'AIzaSyB-63vPrdThhKuerbB2N_l7Kwwcxj6yUAc',
'INNERTUBE_CONTEXT': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'IOS',
'clientVersion': '21.03.2',
'deviceMake': 'Apple',
'deviceModel': 'iPhone16,2',
'osName': 'iPhone',
'osVersion': '18.7.2.22H124',
'userAgent': 'com.google.ios.youtube/21.03.2 (iPhone16,2; U; CPU iOS 18_7_2 like Mac OS X)'
}
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 5,
'REQUIRE_JS_PLAYER': False
},
# This client can access age restricted videos (unless the uploader has disabled the 'allow embedding' option)
# See: https://github.com/zerodytrash/YouTube-Internal-Clients
'tv_embedded': {
'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
'INNERTUBE_CONTEXT': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'TVHTML5_SIMPLY_EMBEDDED_PLAYER',
'clientVersion': '2.0',
'clientScreen': 'EMBED',
},
# https://github.com/yt-dlp/yt-dlp/pull/575#issuecomment-887739287
'thirdParty': {
'embedUrl': 'https://google.com', # Can be any valid URL
}
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 85,
'REQUIRE_JS_PLAYER': True,
},
'web': {
'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'WEB',
'clientVersion': '2.20220801.00.00',
'userAgent': desktop_user_agent,
}
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 1
},
'android_vr': {
'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'ANDROID_VR',
'clientVersion': '1.60.19',
'deviceMake': 'Oculus',
'deviceModel': 'Quest 3',
'androidSdkVersion': 32,
'userAgent': 'com.google.android.apps.youtube.vr.oculus/1.60.19 (Linux; U; Android 12L; eureka-user Build/SQ3A.220605.009.A1) gzip',
'osName': 'Android',
'osVersion': '12L',
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 28,
'REQUIRE_JS_PLAYER': False,
},
'ios_vr': {
'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
'INNERTUBE_CONTEXT': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'IOS_VR',
'clientVersion': '1.0',
'deviceMake': 'Apple',
'deviceModel': 'iPhone16,2',
'osName': 'iPhone',
'osVersion': '18.7.2.22H124',
'userAgent': 'com.google.ios.youtube/1.0 (iPhone16,2; U; CPU iOS 18_7_2 like Mac OS X)'
}
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 5,
'REQUIRE_JS_PLAYER': False
},
}
INNERTUBE_CLIENTS = constants.INNERTUBE_CLIENTS
def get_visitor_data():
visitor_data = None
@@ -967,7 +803,7 @@ def call_youtube_api(client, api, data):
user_agent = context['client'].get('userAgent') or mobile_user_agent
visitor_data = get_visitor_data()
url = 'https://' + host + '/youtubei/v1/' + api + '?key=' + key
url = f'https://{host}/youtubei/v1/{api}?key={key}'
if visitor_data:
context['client'].update({'visitorData': visitor_data})
data['context'] = context
@@ -978,8 +814,8 @@ def call_youtube_api(client, api, data):
headers = ( *headers, ('X-Goog-Visitor-Id', visitor_data ))
response = fetch_url(
url, data=data, headers=headers,
debug_name='youtubei_' + api + '_' + client,
report_text='Fetched ' + client + ' youtubei ' + api
debug_name=f'youtubei_{api}_{client}',
report_text=f'Fetched {client} youtubei {api}'
).decode('utf-8')
return response


@@ -1,3 +1,3 @@
from __future__ import unicode_literals
__version__ = 'v0.5.0'
__version__ = 'v0.5.1'


@@ -17,8 +17,16 @@ from flask import request
import youtube
from youtube import yt_app
from youtube import util, comments, local_playlist, yt_data_extract
from youtube import watch_formats
import settings
# Backward compatibility aliases
codec_name = watch_formats.codec_name
video_quality_string = watch_formats.video_quality_string
short_video_quality_string = watch_formats.short_video_quality_string
audio_quality_string = watch_formats.audio_quality_string
format_bytes = watch_formats.format_bytes
logger = logging.getLogger(__name__)
@@ -29,15 +37,7 @@ except FileNotFoundError:
decrypt_cache = {}
def codec_name(vcodec):
if vcodec.startswith('avc'):
return 'h264'
elif vcodec.startswith('av01'):
return 'av1'
elif vcodec.startswith('vp'):
return 'vp'
else:
return 'unknown'
# codec_name imported from watch_formats
def get_video_sources(info, target_resolution):
@@ -53,7 +53,7 @@ def get_video_sources(info, target_resolution):
if fmt['acodec'] and fmt['vcodec']:
if fmt.get('audio_track_is_default', True) is False:
continue
source = {'type': 'video/' + fmt['ext'],
source = {'type': f"video/{fmt['ext']}",
'quality_string': short_video_quality_string(fmt)}
source['quality_string'] += ' (integrated)'
source.update(fmt)
@@ -70,10 +70,10 @@ def get_video_sources(info, target_resolution):
if fmt['acodec'] and not fmt['vcodec'] and (fmt['audio_bitrate'] or fmt['bitrate']):
if fmt['bitrate']:
fmt['audio_bitrate'] = int(fmt['bitrate']/1000)
source = {'type': 'audio/' + fmt['ext'],
source = {'type': f"audio/{fmt['ext']}",
'quality_string': audio_quality_string(fmt)}
source.update(fmt)
source['mime_codec'] = source['type'] + '; codecs="' + source['acodec'] + '"'
source['mime_codec'] = f"{source['type']}; codecs=\"{source['acodec']}\""
tid = fmt.get('audio_track_id') or 'default'
if tid not in audio_by_track:
audio_by_track[tid] = {
@@ -85,11 +85,11 @@ def get_video_sources(info, target_resolution):
elif all(fmt[attr] for attr in ('vcodec', 'quality', 'width', 'fps', 'file_size')):
if codec_name(fmt['vcodec']) == 'unknown':
continue
source = {'type': 'video/' + fmt['ext'],
source = {'type': f"video/{fmt['ext']}",
'quality_string': short_video_quality_string(fmt)}
source.update(fmt)
source['mime_codec'] = source['type'] + '; codecs="' + source['vcodec'] + '"'
quality = str(fmt['quality']) + 'p' + str(fmt['fps'])
source['mime_codec'] = f"{source['type']}; codecs=\"{source['vcodec']}\""
quality = f"{fmt['quality']}p{fmt['fps']}"
video_only_sources.setdefault(quality, []).append(source)
audio_tracks = []
@@ -141,7 +141,7 @@ def get_video_sources(info, target_resolution):
def video_rank(src):
''' Sort by settings preference. Use file size as tiebreaker '''
setting_name = 'codec_rank_' + codec_name(src['vcodec'])
setting_name = f'codec_rank_{codec_name(src["vcodec"])}'
return (settings.current_settings_dict[setting_name],
src['file_size'])
pair_info['videos'].sort(key=video_rank)
@@ -183,7 +183,7 @@ def make_caption_src(info, lang, auto=False, trans_lang=None):
if auto:
label += ' (Automatic)'
if trans_lang:
label += ' -> ' + trans_lang
label += f' -> {trans_lang}'
# Try to use Android caption URL directly (no PO Token needed)
caption_url = None
@@ -204,7 +204,7 @@ def make_caption_src(info, lang, auto=False, trans_lang=None):
else:
caption_url += '&fmt=vtt'
if trans_lang:
caption_url += '&tlang=' + trans_lang
caption_url += f'&tlang={trans_lang}'
url = util.prefix_url(caption_url)
else:
# Fallback to old method
@@ -357,10 +357,10 @@ def decrypt_signatures(info, video_id):
player_name = info['player_name']
if player_name in decrypt_cache:
print('Using cached decryption function for: ' + player_name)
print(f'Using cached decryption function for: {player_name}')
info['decryption_function'] = decrypt_cache[player_name]
else:
base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text='Fetched player ' + player_name)
base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text=f'Fetched player {player_name}')
base_js = base_js.decode('utf-8')
err = yt_data_extract.extract_decryption_function(info, base_js)
if err:
@@ -387,11 +387,11 @@ def fetch_player_response(client, video_id):
def fetch_watch_page_info(video_id, playlist_id, index):
# bpctr=9999999999 will bypass are-you-sure dialogs for controversial
# videos
url = 'https://m.youtube.com/embed/' + video_id + '?bpctr=9999999999'
url = f'https://m.youtube.com/embed/{video_id}?bpctr=9999999999'
if playlist_id:
url += '&list=' + playlist_id
url += f'&list={playlist_id}'
if index:
url += '&index=' + index
url += f'&index={index}'
headers = (
('Accept', '*/*'),
@@ -446,7 +446,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
info['hls_audio_tracks'] = {}
hls_data = None
hls_client_used = None
for hls_client in ('ios', 'ios_vr', 'android'):
for hls_client in ('ios', 'android'):
try:
resp = fetch_player_response(hls_client, video_id) or {}
hls_data = json.loads(resp) if isinstance(resp, str) else resp
@@ -493,7 +493,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
# Register HLS audio tracks for proxy access
added = 0
for lang, track in info['hls_audio_tracks'].items():
ck = video_id + '_' + lang
ck = f"{video_id}_{lang}"
from youtube.hls_cache import register_track
register_track(ck, track['hls_url'],
video_id=video_id, track_id=lang)
@@ -502,7 +502,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
'audio_track_id': lang,
'audio_track_name': track['name'],
'audio_track_is_default': track['is_default'],
'itag': 'hls_' + lang,
'itag': f'hls_{lang}',
'ext': 'mp4',
'audio_bitrate': 128,
'bitrate': 128000,
@@ -516,7 +516,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
'fps': None,
'init_range': {'start': 0, 'end': 0},
'index_range': {'start': 0, 'end': 0},
'url': '/ytl-api/audio-track?id=' + urllib.parse.quote(ck),
'url': f'/ytl-api/audio-track?id={urllib.parse.quote(ck)}',
's': None,
'sp': None,
'quality': None,
@@ -538,11 +538,11 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
# Register HLS manifest for proxying
if info['hls_manifest_url']:
ck = video_id + '_video'
ck = f"{video_id}_video"
from youtube.hls_cache import register_track
register_track(ck, info['hls_manifest_url'], video_id=video_id, track_id='video')
# Use proxy URL instead of direct Google Video URL
info['hls_manifest_url'] = '/ytl-api/hls-manifest?id=' + urllib.parse.quote(ck)
info['hls_manifest_url'] = f'/ytl-api/hls-manifest?id={urllib.parse.quote(ck)}'
# Fallback to 'ios' if no valid URLs are found
if not info.get('formats') or info.get('player_urls_missing'):
@@ -566,7 +566,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
if info.get('formats'):
decryption_error = decrypt_signatures(info, video_id)
if decryption_error:
info['playability_error'] = 'Error decrypting url signatures: ' + decryption_error
info['playability_error'] = f'Error decrypting url signatures: {decryption_error}'
# check if urls ready (non-live format) in former livestream
# urls not ready if all of them have no filesize
@@ -621,55 +621,10 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
return info
def video_quality_string(format):
if format['vcodec']:
result = str(format['width'] or '?') + 'x' + str(format['height'] or '?')
if format['fps']:
result += ' ' + str(format['fps']) + 'fps'
return result
elif format['acodec']:
return 'audio only'
return '?'
def short_video_quality_string(fmt):
result = str(fmt['quality'] or '?') + 'p'
if fmt['fps']:
result += str(fmt['fps'])
if fmt['vcodec'].startswith('av01'):
result += ' AV1'
elif fmt['vcodec'].startswith('avc'):
result += ' h264'
else:
result += ' ' + fmt['vcodec']
return result
def audio_quality_string(fmt):
if fmt['acodec']:
if fmt['audio_bitrate']:
result = '%d' % fmt['audio_bitrate'] + 'k'
else:
result = '?k'
if fmt['audio_sample_rate']:
result += ' ' + '%.3G' % (fmt['audio_sample_rate']/1000) + 'kHz'
return result
elif fmt['vcodec']:
return 'video only'
return '?'
# from https://github.com/ytdl-org/youtube-dl/blob/master/youtube_dl/utils.py
def format_bytes(bytes):
if bytes is None:
return 'N/A'
if type(bytes) is str:
bytes = float(bytes)
if bytes == 0.0:
exponent = 0
else:
exponent = int(math.log(bytes, 1024.0))
# video_quality_string imported from watch_formats
# short_video_quality_string imported from watch_formats
# audio_quality_string imported from watch_formats
# format_bytes imported from watch_formats
suffix = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'][exponent]
converted = float(bytes) / float(1024 ** exponent)
return '%.2f%s' % (converted, suffix)
@@ -737,9 +692,9 @@ def get_audio_track():
seg = line if line.startswith('http') else urljoin(playlist_base, line)
# Always use &seg= parameter, never &url= for segments
playlist_lines.append(
base_url + '/ytl-api/audio-track?id='
+ urllib.parse.quote(cache_key)
+ '&seg=' + urllib.parse.quote(seg, safe='')
f'{base_url}/ytl-api/audio-track?id='
f'{urllib.parse.quote(cache_key)}'
f'&seg={urllib.parse.quote(seg, safe="")}'
)
playlist = '\n'.join(playlist_lines)
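The segment-rewriting loop above routes every playlist entry through the local `/ytl-api/audio-track` endpoint, percent-encoding the upstream URL with `safe=''` so it survives as a single query value. A minimal sketch of that rewrite, with hypothetical `base_url` and `cache_key` values:

```python
import urllib.parse
from urllib.parse import urljoin

def rewrite_segment(line, playlist_base, base_url, cache_key):
    # Absolute segment URLs pass through urljoin unchanged;
    # relative ones are resolved against the playlist's base URL.
    seg = line if line.startswith('http') else urljoin(playlist_base, line)
    # safe='' encodes ':' and '/' too, so the whole URL fits in one parameter
    return (f'{base_url}/ytl-api/audio-track?id='
            f'{urllib.parse.quote(cache_key)}'
            f'&seg={urllib.parse.quote(seg, safe="")}')

url = rewrite_segment('seg0001.ts',
                      'https://example.com/hls/audio/index.m3u8',
                      'http://127.0.0.1:9010', 'abc123_en')
```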
@@ -797,9 +752,7 @@ def get_audio_track():
return url
if not url.startswith('http://') and not url.startswith('https://'):
url = urljoin(playlist_base, url)
return (base_url + '/ytl-api/audio-track?id='
+ urllib.parse.quote(cache_key)
+ '&seg=' + urllib.parse.quote(url, safe=''))
return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(url, safe="")}'
playlist_lines = []
for line in playlist.split('\n'):
@@ -812,7 +765,7 @@ def get_audio_track():
if line.startswith('#') and 'URI=' in line:
def rewrite_uri_attr(match):
uri = match.group(1)
return 'URI="' + proxy_url(uri) + '"'
return f'URI="{proxy_url(uri)}"'
line = _re.sub(r'URI="([^"]+)"', rewrite_uri_attr, line)
playlist_lines.append(line)
elif line.startswith('#'):
@@ -834,14 +787,12 @@ def get_audio_track():
# This is an actual segment - fetch and serve it
try:
headers = (
('User-Agent', 'Mozilla/5.0'),
('Accept', '*/*'),
)
content = util.fetch_url(seg_url, headers=headers,
debug_name='hls_seg', report_text=None)
headers_dict = {
'User-Agent': 'Mozilla/5.0',
'Accept': '*/*',
}
# Determine content type based on URL or content
# Determine content type based on URL
# HLS segments are usually MPEG-TS (.ts) but can be MP4 (.mp4, .m4s)
if '.mp4' in seg_url or '.m4s' in seg_url or seg_url.lower().endswith('.mp4'):
content_type = 'video/mp4'
@@ -851,7 +802,23 @@ def get_audio_track():
# Default to MPEG-TS for HLS
content_type = 'video/mp2t'
return flask.Response(content, mimetype=content_type,
response, cleanup_func = util.fetch_url_response(
seg_url, headers=tuple(headers_dict.items()),
timeout=30, use_tor=settings.route_tor)
def generate():
try:
while True:
chunk = response.read(64 * 1024) # 64 KB chunks
if not chunk:
break
yield chunk
finally:
cleanup_func(response)
return flask.Response(
flask.stream_with_context(generate()),
mimetype=content_type,
headers={
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, OPTIONS',
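The hunk above replaces buffering whole segments in memory with a generator that reads 64 KB chunks and cleans up the upstream response when the iterator is exhausted. A stdlib-only sketch of the pattern, using `io.BytesIO` as a stand-in for the upstream HTTP response:

```python
import io

def stream_chunks(resp, chunk_size=64 * 1024):
    """Yield fixed-size chunks from a file-like response, closing it when done."""
    try:
        while True:
            chunk = resp.read(chunk_size)
            if not chunk:
                break
            yield chunk
    finally:
        # Mirrors cleanup_func(response): runs even if the consumer aborts early
        resp.close()

body = io.BytesIO(b'x' * (200 * 1024))   # pretend 200 KiB segment
total = sum(len(c) for c in stream_chunks(body))
```

In the real route the generator is wrapped in `flask.stream_with_context(...)` so the request context stays alive while chunks are sent.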
@@ -883,9 +850,7 @@ def get_audio_track():
if segment_url.startswith('/ytl-api/audio-track'):
return segment_url
base_url = request.url_root.rstrip('/')
return (base_url + '/ytl-api/audio-track?id='
+ urllib.parse.quote(cache_key)
+ '&seg=' + urllib.parse.quote(segment_url))
return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(segment_url)}'
playlist_lines = []
for line in playlist.split('\n'):
@@ -949,14 +914,10 @@ def get_hls_manifest():
if is_audio_track:
# Audio track playlist - proxy through audio-track endpoint
return (base_url + '/ytl-api/audio-track?id='
+ urllib.parse.quote(cache_key)
+ '&url=' + urllib.parse.quote(url, safe=''))
return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&url={urllib.parse.quote(url, safe="")}'
else:
# Video segment or variant playlist - proxy through audio-track endpoint
return (base_url + '/ytl-api/audio-track?id='
+ urllib.parse.quote(cache_key)
+ '&seg=' + urllib.parse.quote(url, safe=''))
return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(url, safe="")}'
# Parse and rewrite the manifest
manifest_lines = []
@@ -974,7 +935,7 @@ def get_hls_manifest():
nonlocal rewritten_count
uri = match.group(1)
rewritten_count += 1
return 'URI="' + rewrite_url(uri, is_audio_track=True) + '"'
return f'URI="{rewrite_url(uri, is_audio_track=True)}"'
line = _re.sub(r'URI="([^"]+)"', rewrite_media_uri, line)
manifest_lines.append(line)
elif line.startswith('#'):
@@ -1053,7 +1014,7 @@ def get_storyboard_vtt():
ts = 0 # current timestamp
for i in range(storyboard.storyboard_count):
url = '/' + storyboard.url.replace("$M", str(i))
url = f'/{storyboard.url.replace("$M", str(i))}'
interval = storyboard.interval
w, h = storyboard.width, storyboard.height
w_cnt, h_cnt = storyboard.width_cnt, storyboard.height_cnt
@@ -1078,7 +1039,7 @@ def get_watch_page(video_id=None):
if not video_id:
return flask.render_template('error.html', error_message='Missing video id'), 404
if len(video_id) < 11:
return flask.render_template('error.html', error_message='Incomplete video id (too short): ' + video_id), 404
return flask.render_template('error.html', error_message=f'Incomplete video id (too short): {video_id}'), 404
time_start_str = request.args.get('t', '0s')
time_start = 0
@@ -1141,9 +1102,9 @@ def get_watch_page(video_id=None):
util.prefix_urls(item)
util.add_extra_html_info(item)
if playlist_id:
item['url'] += '&list=' + playlist_id
item['url'] += f'&list={playlist_id}'
if item['index']:
item['url'] += '&index=' + str(item['index'])
item['url'] += f'&index={item["index"]}'
info['playlist']['author_url'] = util.prefix_url(
info['playlist']['author_url'])
if settings.img_prefix:
@@ -1159,16 +1120,16 @@ def get_watch_page(video_id=None):
filename = title
ext = fmt.get('ext')
if ext:
filename += '.' + ext
filename += f'.{ext}'
fmt['url'] = fmt['url'].replace(
'/videoplayback',
'/videoplayback/name/' + filename)
f'/videoplayback/name/{filename}')
download_formats = []
for format in (info['formats'] + info['hls_formats']):
if format['acodec'] and format['vcodec']:
codecs_string = format['acodec'] + ', ' + format['vcodec']
codecs_string = f"{format['acodec']}, {format['vcodec']}"
else:
codecs_string = format['acodec'] or format['vcodec'] or '?'
download_formats.append({
@@ -1247,12 +1208,9 @@ def get_watch_page(video_id=None):
for source in subtitle_sources:
best_caption_parse = urllib.parse.urlparse(
source['url'].lstrip('/'))
transcript_url = (util.URL_ORIGIN
+ '/watch/transcript'
+ best_caption_parse.path
+ '?' + best_caption_parse.query)
transcript_url = f'{util.URL_ORIGIN}/watch/transcript{best_caption_parse.path}?{best_caption_parse.query}'
other_downloads.append({
'label': 'Video Transcript: ' + source['label'],
'label': f'Video Transcript: {source["label"]}',
'ext': 'txt',
'url': transcript_url
})
@@ -1263,7 +1221,7 @@ def get_watch_page(video_id=None):
template_name = 'watch.html'
return flask.render_template(template_name,
header_playlist_names = local_playlist.get_playlist_names(),
uploader_channel_url = ('/' + info['author_url']) if info['author_url'] else '',
uploader_channel_url = f'/{info["author_url"]}' if info['author_url'] else '',
time_published = info['time_published'],
view_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("view_count", None)),
like_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("like_count", None)),
@@ -1305,10 +1263,10 @@ def get_watch_page(video_id=None):
ip_address = info['ip_address'] if settings.route_tor else None,
invidious_used = info['invidious_used'],
invidious_reload_button = info['invidious_reload_button'],
video_url = util.URL_ORIGIN + '/watch?v=' + video_id,
video_url = f'{util.URL_ORIGIN}/watch?v={video_id}',
video_id = video_id,
storyboard_url = (util.URL_ORIGIN + '/ytl-api/storyboard.vtt?' +
urlencode([('spec_url', info['storyboard_spec_url'])])
storyboard_url = (f'{util.URL_ORIGIN}/ytl-api/storyboard.vtt?'
f'{urlencode([("spec_url", info["storyboard_spec_url"])])}'
if info['storyboard_spec_url'] else None),
js_data = {
@@ -1335,7 +1293,7 @@ def get_watch_page(video_id=None):
@yt_app.route('/api/<path:dummy>')
def get_captions(dummy):
url = 'https://www.youtube.com' + request.full_path
url = f'https://www.youtube.com{request.full_path}'
try:
result = util.fetch_url(url, headers=util.mobile_ua)
result = result.replace(b"align:start position:0%", b"")
@@ -1350,12 +1308,9 @@ inner_timestamp_removal_reg = re.compile(r'<[^>]+>')
@yt_app.route('/watch/transcript/<path:caption_path>')
def get_transcript(caption_path):
try:
captions = util.fetch_url('https://www.youtube.com/'
+ caption_path
+ '?' + request.environ['QUERY_STRING']).decode('utf-8')
captions = util.fetch_url(f'https://www.youtube.com/{caption_path}?{request.environ["QUERY_STRING"]}').decode('utf-8')
except util.FetchError as e:
msg = ('Error retrieving captions: ' + str(e) + '\n\n'
+ 'The caption url may have expired.')
msg = f'Error retrieving captions: {e}\n\nThe caption url may have expired.'
print(msg)
return flask.Response(
msg,
@@ -1403,7 +1358,7 @@ def get_transcript(caption_path):
result = ''
for seg in segments:
if seg['text'] != ' ':
result += seg['begin'] + ' ' + seg['text'] + '\r\n'
result += f"{seg['begin']} {seg['text']}\r\n"
return flask.Response(result.encode('utf-8'),
mimetype='text/plain;charset=UTF-8')

youtube/watch_formats.py

@@ -0,0 +1,82 @@
"""Video format helpers for yt-local."""
import math
from typing import Any, Dict, Optional
def codec_name(vcodec: str) -> str:
"""Extract codec short name from codec string."""
if vcodec.startswith('avc'):
return 'h264'
elif vcodec.startswith('av01'):
return 'av1'
elif vcodec.startswith('vp'):
return 'vp'
else:
return 'unknown'
def video_quality_string(fmt: Dict[str, Any]) -> str:
"""Return video quality string (e.g., '1920x1080 30fps')."""
if fmt.get('vcodec'):
result = f"{fmt.get('width') or '?'}x{fmt.get('height') or '?'}"
if fmt.get('fps'):
result += f" {fmt['fps']}fps"
return result
elif fmt.get('acodec'):
return 'audio only'
return '?'
def short_video_quality_string(fmt: Dict[str, Any]) -> str:
"""Return short video quality string (e.g., '1080p60 AV1')."""
result = f"{fmt.get('quality') or '?'}p"
if fmt.get('fps'):
result += str(fmt['fps'])
vcodec = fmt.get('vcodec', '')
if vcodec.startswith('av01'):
result += ' AV1'
elif vcodec.startswith('avc'):
result += ' h264'
else:
result += f" {vcodec}"
return result
def audio_quality_string(fmt: Dict[str, Any]) -> str:
"""Return audio quality string (e.g., '128k 44.1kHz')."""
if fmt.get('acodec'):
if fmt.get('audio_bitrate'):
result = f"{fmt['audio_bitrate']}k"
else:
result = '?k'
if fmt.get('audio_sample_rate'):
result += f" {'%.3G' % (fmt['audio_sample_rate']/1000)}kHz"
return result
elif fmt.get('vcodec'):
return 'video only'
return '?'
def format_bytes(bytes_val: Optional[float]) -> str:
"""Convert bytes to human-readable string (e.g., '1.5 MiB')."""
if bytes_val is None:
return 'N/A'
if type(bytes_val) is str:
bytes_val = float(bytes_val)
if bytes_val == 0.0:
exponent = 0
else:
exponent = int(math.log(bytes_val, 1024.0))
suffix = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'][exponent]
converted = float(bytes_val) / float(1024 ** exponent)
return '%.2f%s' % (converted, suffix)
__all__ = [
'codec_name',
'video_quality_string',
'short_video_quality_string',
'audio_quality_string',
'format_bytes',
]
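The new module's helpers are pure functions and can be exercised in isolation; a sketch reproducing two of them as committed:

```python
import math

def codec_name(vcodec):
    # Map a full codec string to the short name used by the codec_rank_* settings
    if vcodec.startswith('avc'):
        return 'h264'
    elif vcodec.startswith('av01'):
        return 'av1'
    elif vcodec.startswith('vp'):
        return 'vp'
    return 'unknown'

def format_bytes(bytes_val):
    if bytes_val is None:
        return 'N/A'
    bytes_val = float(bytes_val)
    exponent = 0 if bytes_val == 0.0 else int(math.log(bytes_val, 1024.0))
    suffix = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'][exponent]
    return '%.2f%s' % (bytes_val / 1024 ** exponent, suffix)

print(codec_name('avc1.4d401f'))   # h264
print(format_bytes(1572864))       # 1.50MiB
```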


@@ -1,7 +1,8 @@
from .common import (get, multi_get, deep_get, multi_deep_get,
liberal_update, conservative_update, remove_redirect, normalize_url,
extract_str, extract_formatted_text, extract_int, extract_approx_int,
extract_date, extract_item_info, extract_items, extract_response)
extract_date, extract_item_info, extract_items, extract_response,
concat_or_none)
from .everything_else import (extract_channel_info, extract_search_info,
extract_playlist_metadata, extract_playlist_info, extract_comments_info)


@@ -212,7 +212,7 @@ def extract_date(date_text):
month, day, year = parts[-3:]
month = MONTH_ABBREVIATIONS.get(month[0:3]) # slicing in case they start writing out the full month name
if month and (re.fullmatch(r'\d\d?', day) is not None) and (re.fullmatch(r'\d{4}', year) is not None):
return year + '-' + month + '-' + day
return f'{year}-{month}-{day}'
return None
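The `extract_date` hunk converts only the return expression; the surrounding logic takes the last three tokens as month/day/year and validates them. A sketch under stated assumptions: the tokenization prelude and the `MONTH_ABBREVIATIONS` subset below are hypothetical, only the matching and return logic come from the diff:

```python
import re

# Hypothetical subset of the module's MONTH_ABBREVIATIONS table
MONTH_ABBREVIATIONS = {'Jan': '01', 'Feb': '02', 'Mar': '03'}

def extract_date(date_text):
    parts = date_text.replace(',', '').split()
    if len(parts) < 3:
        return None
    month, day, year = parts[-3:]
    # Slice to 3 chars in case the full month name is written out
    month = MONTH_ABBREVIATIONS.get(month[0:3])
    if month and re.fullmatch(r'\d\d?', day) and re.fullmatch(r'\d{4}', year):
        return f'{year}-{month}-{day}'
    return None

print(extract_date('Jan 15, 2024'))  # 2024-01-15
```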
def check_missing_keys(object, *key_sequences):
@@ -222,7 +222,7 @@ def check_missing_keys(object, *key_sequences):
for key in key_sequence:
_object = _object[key]
except (KeyError, IndexError, TypeError):
return 'Could not find ' + key
return f'Could not find {key}'
return None
@@ -467,7 +467,7 @@ def extract_item_info(item, additional_info={}):
['shortBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId']
))
info['author_url'] = ('https://www.youtube.com/channel/' + info['author_id']) if info['author_id'] else None
info['author_url'] = f'https://www.youtube.com/channel/{info["author_id"]}' if info['author_id'] else None
info['description'] = extract_formatted_text(multi_deep_get(
item,
['descriptionText'], ['descriptionSnippet'],


@@ -305,7 +305,7 @@ def extract_playlist_metadata(polymer_json):
metadata['description'] = desc
if metadata['author_id']:
metadata['author_url'] = 'https://www.youtube.com/channel/' + metadata['author_id']
metadata['author_url'] = f'https://www.youtube.com/channel/{metadata["author_id"]}'
if metadata['first_video_id'] is None:
metadata['thumbnail'] = None


@@ -650,9 +650,9 @@ def _extract_playability_error(info, player_response, error_prefix=''):
)
if playability_status not in (None, 'OK'):
info['playability_error'] = error_prefix + playability_reason
info['playability_error'] = f'{error_prefix}{playability_reason}'
elif not info['playability_error']: # do not override
info['playability_error'] = error_prefix + 'Unknown playability error'
info['playability_error'] = f'{error_prefix}Unknown playability error'
SUBTITLE_FORMATS = ('srv1', 'srv2', 'srv3', 'ttml', 'vtt')
def extract_watch_info(polymer_json):
@@ -726,7 +726,7 @@ def extract_watch_info(polymer_json):
# Store the full URL from the player response (includes valid tokens)
if base_url:
normalized = normalize_url(base_url) if base_url.startswith('/') or not base_url.startswith('http') else base_url
info['_caption_track_urls'][lang_code + ('_asr' if caption_track.get('kind') == 'asr' else '')] = normalized
info['_caption_track_urls'][f'{lang_code}{"_asr" if caption_track.get("kind") == "asr" else ""}'] = normalized
lang_name = deep_get(urllib.parse.parse_qs(urllib.parse.urlparse(base_url).query), 'name', 0)
if lang_name:
info['_manual_caption_language_names'][lang_code] = lang_name
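A quick sketch of the caption-key construction above (hypothetical `lang_code`/`kind` values): the `_` separator has to live inside the conditional, otherwise plain tracks get keys like `en_` instead of `en`.

```python
def caption_key(lang_code, kind):
    # Append '_asr' only for auto-generated (ASR) tracks; keeping the
    # underscore inside the conditional leaves manual tracks unsuffixed.
    return f'{lang_code}{"_asr" if kind == "asr" else ""}'

assert caption_key('en', 'asr') == 'en_asr'
assert caption_key('en', None) == 'en'
```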
@@ -806,7 +806,7 @@ def extract_watch_info(polymer_json):
info['allowed_countries'] = mf.get('availableCountries', [])
# other stuff
info['author_url'] = 'https://www.youtube.com/channel/' + info['author_id'] if info['author_id'] else None
info['author_url'] = f'https://www.youtube.com/channel/{info["author_id"]}' if info['author_id'] else None
info['storyboard_spec_url'] = deep_get(player_response, 'storyboards', 'playerStoryboardSpecRenderer', 'spec')
return info
@@ -912,12 +912,12 @@ def get_caption_url(info, language, format, automatic=False, translation_languag
url = info['_captions_base_url']
if not url:
return None
url += '&lang=' + language
url += '&fmt=' + format
url += f'&lang={language}'
url += f'&fmt={format}'
if automatic:
url += '&kind=asr'
elif language in info['_manual_caption_language_names']:
url += '&name=' + urllib.parse.quote(info['_manual_caption_language_names'][language], safe='')
url += f'&name={urllib.parse.quote(info["_manual_caption_language_names"][language], safe="")}'
if translation_language:
url += '&tlang=' + translation_language
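A sketch of the query-string building in this hunk, with a hypothetical base URL and caption name: `quote(..., safe='')` fully percent-encodes the value (including `/`), which is what you want for a query parameter.

```python
import urllib.parse

# Hypothetical caption-track name as it might appear in the player response.
name = 'English (auto)'
url = 'https://example.com/api/timedtext'
url += '&lang=en&fmt=vtt'
url += f'&name={urllib.parse.quote(name, safe="")}'
assert url.endswith('&name=English%20%28auto%29')
```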
@@ -964,7 +964,7 @@ def extract_decryption_function(info, base_js):
return 'Could not find var_name'
var_name = var_with_operation_match.group(1)
var_body_match = re.search(r'var ' + re.escape(var_name) + r'=\{(.*?)\};', base_js, flags=re.DOTALL)
var_body_match = re.search(rf'var {re.escape(var_name)}=\{{(.*?)\}};', base_js, flags=re.DOTALL)
if var_body_match is None:
return 'Could not find var_body'
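The rf-string in this hunk is worth a note: doubled braces (`\{{`, `\}}`) emit literal braces, so interpolation and regex escaping coexist. A standalone sketch with a hypothetical `var_name`:

```python
import re

var_name = 'Xy'  # placeholder; the real name is scraped from base.js
# rf-string: {re.escape(...)} interpolates, while {{ and }} become
# literal { and } in the compiled pattern.
pattern = rf'var {re.escape(var_name)}=\{{(.*?)\}};'
m = re.search(pattern, 'var Xy={a:1};', flags=re.DOTALL)
assert m and m.group(1) == 'a:1'
```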
@@ -988,7 +988,7 @@ def extract_decryption_function(info, base_js):
elif op_body.startswith('var c=a[0]'):
operation_definitions[op_name] = 2
else:
return 'Unknown op_body: ' + op_body
return f'Unknown op_body: {op_body}'
decryption_function = []
for op_with_arg in function_body:
@@ -997,7 +997,7 @@ def extract_decryption_function(info, base_js):
return 'Could not parse operation with arg'
op_name = match.group(2).strip('[].')
if op_name not in operation_definitions:
return 'Unknown op_name: ' + str(op_name)
return f'Unknown op_name: {op_name}'
op_argument = match.group(3)
decryption_function.append([operation_definitions[op_name], int(op_argument)])
@@ -1028,5 +1028,5 @@ def decrypt_signatures(info):
_operation_2(a, argument)
signature = ''.join(a)
format['url'] += '&' + format['sp'] + '=' + signature
format['url'] += f'&{format["sp"]}={signature}'
return False
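A final sketch of the signature-append line (hypothetical format dict): `sp` names the signature query parameter reported by the player response, and the decrypted signature is appended to the stream URL under that name.

```python
# Placeholder values; 'sp' is commonly 'sig' or 'signature'.
fmt = {'url': 'https://example.com/videoplayback?id=1', 'sp': 'sig'}
signature = 'ABC'

fmt['url'] += f'&{fmt["sp"]}={signature}'
assert fmt['url'] == 'https://example.com/videoplayback?id=1&sig=ABC'
```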