8 Commits

Author SHA1 Message Date
13708bb4ea update README.md
2026-05-03 13:55:24 -05:00
b9248cfe2d fix: init Plyr before HLS manifest loads to avoid bare video flash
Create the Plyr instance immediately in start() so the styled player UI
appears right away. Quality and audio controls are injected once the
HLS manifest is ready, calling doAdd() directly when player.ready is
already true instead of waiting for a 'ready' event that has already fired.
2026-05-03 13:45:00 -05:00
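A minimal sketch of the initialization order this commit describes, using the Plyr and hls.js APIs; the `addQualityControls` helper and `manifestUrl` variable are illustrative stand-ins, not the project's actual identifiers:

```javascript
// Hypothetical sketch: create the styled Plyr UI immediately, then inject
// the quality/audio controls once the HLS manifest has been parsed.
const video = document.querySelector('video');
const player = new Plyr(video);        // styled player appears right away

const hls = new Hls();
hls.loadSource(manifestUrl);           // manifestUrl: assumed HLS manifest URL
hls.attachMedia(video);

hls.on(Hls.Events.MANIFEST_PARSED, () => {
    const doAdd = () => addQualityControls(player, hls.levels);  // illustrative helper
    if (player.ready) {
        doAdd();                       // 'ready' already fired; run directly
    } else {
        player.on('ready', doAdd);
    }
});
```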
07ca635a84 bump to v0.5.1
2026-05-03 13:20:32 -05:00
afbe137676 update HACKING.md
2026-05-03 13:19:00 -05:00
76c9263cd6 update README.md
2026-05-03 13:12:09 -05:00
ab55cf79bb fix: show only current branch in version display
`git branch` lists all branches; replace it with `git branch --show-current`
2026-05-03 12:36:32 -05:00
8d66143c90 fix: update innertube clients and fix HLS/DASH quality switching
- Update innertube client versions to match yt-dlp (android 21.02.35,
  ios 21.02.3, web 2.20260114.08.00, android_vr 1.65.10)
- Remove obsolete clients (android-test-suite, ios_vr)
- Replace tv_embedded with TVHTML5_SIMPLY (cn 75)
- Add new clients: web_embedded, mweb, tv
- Fix HLS freeze on quality switch: use nextLevel instead of
  currentLevel, handle bufferStalledError, stream proxy segments
  instead of buffering in memory
- Populate DASH quality selector with actual sources (no Auto)
- Render quality-select empty in template, let JS populate per mode
2026-05-03 12:32:55 -05:00
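A rough hls.js sketch of the quality-switch handling this commit describes; the `qualitySelect` element and the recovery call are assumptions about the wiring, not the project's exact code:

```javascript
// Hypothetical sketch: switch renditions via nextLevel and recover from
// transient buffer stalls instead of letting playback freeze.
qualitySelect.addEventListener('change', (event) => {
    const levelIndex = parseInt(event.target.value, 10);
    // nextLevel switches at the next fragment boundary; assigning
    // currentLevel flushes the buffer immediately and can stall playback.
    hls.nextLevel = levelIndex;        // -1 would mean automatic selection
});

hls.on(Hls.Events.ERROR, (event, data) => {
    if (data.details === Hls.ErrorDetails.BUFFER_STALLED_ERROR && !data.fatal) {
        // Assumed recovery: restart loading so the stalled buffer refills.
        hls.startLoad();
    }
});
```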
50ad959a80 refactor: replace string concatenations with f-strings
2026-04-25 01:02:17 -05:00
30 changed files with 790 additions and 610 deletions


@@ -2,7 +2,7 @@
[![License: AGPL v3](https://img.shields.io/badge/License-AGPL_v3-blue.svg)](https://www.gnu.org/licenses/agpl-3.0)
[![Python 3.7+](https://img.shields.io/badge/python-3.7+-blue.svg)](https://www.python.org/downloads/)
-[![Tests](https://img.shields.io/badge/tests-passing-brightgreen.svg)](https://github.com/user234683/youtube-local)
+[![Tests](https://img.shields.io/badge/tests-passing-brightgreen.svg)](https://git.fridu.us/heckyel/youtube-local)
A privacy-focused, browser-based YouTube client that routes requests through Tor for anonymous viewing—**without compromising on speed or features**.
@@ -47,13 +47,13 @@ yt-local is a lightweight, self-hosted YouTube client written in Python that giv
## Screenshots
| Light Theme | Gray Theme | Dark Theme |
|:---:|:---:|:---:|
-| ![Light](https://pic.infini.fr/l7WINjzS/0Ru6MrhA.png) | ![Gray](https://pic.infini.fr/znnQXWNc/hL78CRzo.png) | ![Dark](https://pic.infini.fr/iXwFtTWv/mt2kS5bv.png) |
+| ![Light](https://gist.github.com/user-attachments/assets/9552533c-6be0-4757-98aa-a59d1c294e90) | ![Gray](https://gist.github.com/user-attachments/assets/d7692a0f-b86f-4375-a011-7ee026bbbcdb) | ![Dark](https://gist.github.com/user-attachments/assets/abd4c38b-8612-4f43-9483-081eb950ab99) |
| Channel View | Playlist View |
|:---:|:---:|
-| ![Channel](https://pic.infini.fr/JsenWVYe/SbdIQlS6.png) | *(similar structure)* |
+| ![Channel](https://gist.github.com/user-attachments/assets/d359847c-96e1-403f-a190-3c159defe8a7) | *(similar structure)* |
---
@@ -61,7 +61,7 @@ yt-local is a lightweight, self-hosted YouTube client written in Python that giv
### Windows
-1. Download the latest [release ZIP](https://github.com/user234683/yt-local/releases)
+1. Download the latest [release ZIP](https://git.fridu.us/heckyel/yt-local/releases)
2. Extract to any folder
3. Run `run.bat` to start
@@ -69,7 +69,7 @@ yt-local is a lightweight, self-hosted YouTube client written in Python that giv
```bash
# 1. Clone or extract the release
-git clone https://github.com/user234683/yt-local.git
+git clone https://git.fridu.us/heckyel/yt-local.git
cd yt-local
# 2. Create and activate virtual environment
@@ -279,7 +279,7 @@ yt-local is designed for self-hosting.
This project is 100% free and open-source. If you'd like to support development:
- **Bitcoin**: `1JrC3iqs3PP5Ge1m1vu7WE8LEf4S85eo7y`
-- **Tor node donation**: https://torservers.net/donate
+- **Tor node donation**: <https://torservers.net/donate>
---
@@ -310,4 +310,4 @@ Permission is granted to relicense code portions into youtube-dl's license (curr
Made for privacy-conscious users
-Last updated: 2026-04-19
+Last updated: 2026-05-03


@@ -1,4 +1,5 @@
# Coding guidelines
* Follow the [PEP 8 guidelines](https://www.python.org/dev/peps/pep-0008/) for all new Python code as best you can. Some old code doesn't follow PEP 8 yet. This includes limiting line length to 79 characters (with exception for long strings such as URLs that can't reasonably be broken across multiple lines) and using 4 spaces for indentation.
* Do not use single letter or cryptic names for variables (except iterator variables or the like). When in doubt, choose the more verbose option.
@@ -12,30 +13,34 @@
* The same guidelines apply to commenting code. If a piece of code is not self-explanatory, add a comment explaining what it does and why it's there.
# Testing and releases
* This project uses pytest. To install pytest and any future dependencies needed for development, run pip3 on the requirements-dev.txt file. To run tests, run `python3 -m pytest` rather than just `pytest` because the former will make sure the toplevel directory is in Python's import search path.
* To build releases for Windows, run `python3 generate_release.py [intended python version here, without v infront]`. The required software (such as 7z, git) are listed in the `generate_release.py` file. For instance, wine is required if building on GNU+Linux. The build script will automatically download the embedded Python release to include. Use the latest release of Python 3.7.x so that Vista will be supported. See https://github.com/user234683/youtube-local/issues/6#issuecomment-672608388
-# Overview of the software architecture
+## Overview of the software architecture
-## server.py
+### server.py
* This is the entry point, and sets up the HTTP server that listens for incoming requests. It delegates the request to the appropriate "site_handler". For instance, `localhost:8080/youtube.com/...` goes to the `youtube` site handler, whereas `localhost:8080/ytimg.com/...` (the url for video thumbnails) goes to the site handler for just fetching static resources such as images from youtube.
* The reason for this architecture: the original design philosophy when I first conceived the project was that this would work for any site supported by youtube-dl, including YouTube, Vimeo, DailyMotion, etc. I've dropped this idea for now, though I might pick it up later. (youtube-dl is no longer used)
* This file uses the raw [WSGI request](https://www.python.org/dev/peps/pep-3333/) format. The WSGI format is a Python standard for how HTTP servers (I use the stock server provided by gevent) should call HTTP applications. So that's why the file contains stuff like `env['REQUEST_METHOD']`.
-## Flask and Gevent
+### Flask and Gevent
* The `youtube` handler in server.py then delegates the request to the Flask yt_app object, which the rest of the project uses. [Flask](https://flask.palletsprojects.com/en/1.1.x/) is a web application framework that makes handling requests easier than accessing the raw WSGI requests. Flask (Werkzeug specifically) figures out which function to call for a particular url. Each request handling function is registered into Flask's routing table by using function annotations above it. The request handling functions are always at the bottom of the file for a particular youtube page (channel, watch, playlist, etc.), and they're where you want to look to see how the response gets constructed for a particular url. Miscellaneous request handlers that don't belong anywhere else are located in `__init__.py`, which is where the `yt_app` object is instantiated.
* The actual html for yt-local is generated using Jinja templates. Jinja lets you embed a Python-like language inside html files so you can use constructs such as for loops to construct the html for a list of 30 videos given a dictionary with information for those videos. Jinja is included as part of Flask. It has some annoying differences from Python in a lot of details, so check the [docs here](https://jinja.palletsprojects.com/en/2.11.x/) when you use it. The request handling functions will pass the information that has been scraped from YouTube into these templates for the final result.
* The project uses the gevent library for parallelism (such as for launching requests in parallel), as opposed to using the async keyword.
-## util.py
+### util.py
* util.py is a grab-bag of miscellaneous things; admittedly I need to get around to refactoring it. The biggest thing it has is the `fetch_url` function which is what I use for sending out requests for YouTube. The Tor routing is managed here. `fetch_url` will raise an a `FetchError` exception if the request fails. The parameter `debug_name` in `fetch_url` is the filename that the response from YouTube will be saved to if the hidden debugging option is enabled in settings.txt. So if there's a bug when YouTube changes something, you can check the response from YouTube from that file.
-## Data extraction - protobuf, polymer, and yt_data_extract
+### Data extraction - protobuf, polymer, and yt_data_extract
* proto.py is used for generating what are called ctokens needed when making requests to YouTube. These ctokens use Google's [protobuf](https://developers.google.com/protocol-buffers) format. Figuring out how to generate these in new instances requires some reverse engineering. I have a messy python file I use to make this convenient which you can find under ./youtube/proto_debug.py
* The responses from YouTube are in a JSON format called polymer (polymer is the name of the 2017-present YouTube layout). The JSON consists of a bunch of nested dictionaries which basically specify the layout of the page via objects called renderers. A renderer represents an object on a page in a similar way to html tags; the renders often contain renders inside them. The Javascript on YouTube's page translates this JSON to HTML. Example: `compactVideoRenderer` represents a video item in you can click on such as in the related videos (so these are called "items" in the codebase). This JSON is very messy. You'll need a JSON prettifier or something that gives you a tree view in order to study it.
@@ -46,15 +51,16 @@
* The `extract_items` function is similar but works on the response object, automatically finding the appropriate renderer to call `extract_items_from_renderer` on.
-## Other
+### Other
* subscriptions.py uses SQLite to store data.
* Hidden settings only relevant to developers (such as for debugging) are not displayed on the settings page. They can be found in the settings.txt file.
* Since I can't anticipate the things that will trip up beginners to the codebase, if you spend awhile figuring something out, go ahead and make a pull request adding a brief description of your findings to this document to help other beginners.
-## Development tips
+### Development tips
* When developing functionality to interact with YouTube in new ways, you'll want to use the network tab in your browser's devtools to inspect which requests get made under normal usage of YouTube. You'll also want a tool you can use to construct custom requests and specify headers to reverse engineer the request format. I use the [HeaderTool](https://github.com/loreii/HeaderTool) extension in Firefox, but there's probably a more streamlined program out there.
* You'll want to have a utility or IDE that can perform full text search on a repository, since this is crucial for navigating unfamiliar codebases to figure out where certain strings appear or where things get defined.


@@ -33,7 +33,7 @@ def check_subp(x):
raise Exception('Got nonzero exit code from command')
def log(line):
-print('[generate_release.py] ' + line)
+print(f'[generate_release.py] {line}')
# https://stackoverflow.com/questions/7833715/python-deleting-certain-file-extensions
def remove_files_with_extensions(path, extensions):
@@ -43,23 +43,23 @@ def remove_files_with_extensions(path, extensions):
os.remove(os.path.join(root, file))
def download_if_not_exists(file_name, url, sha256=None):
-if not os.path.exists('./' + file_name):
+if not os.path.exists(f'./{file_name}'):
# Reject non-https URLs so a mistaken constant cannot cause a
# plaintext download (bandit B310 hardening).
if not url.startswith('https://'):
-raise Exception('Refusing to download over non-https URL: ' + url)
+raise Exception(f'Refusing to download over non-https URL: {url}')
-log('Downloading ' + file_name + '..')
+log(f'Downloading {file_name}..')
data = urllib.request.urlopen(url).read()
-log('Finished downloading ' + file_name)
+log(f'Finished downloading {file_name}')
-with open('./' + file_name, 'wb') as f:
+with open(f'./{file_name}', 'wb') as f:
f.write(data)
if sha256:
digest = hashlib.sha256(data).hexdigest()
if digest != sha256:
-log('Error: ' + file_name + ' has wrong hash: ' + digest)
+log(f'Error: {file_name} has wrong hash: {digest}')
sys.exit(1)
else:
-log('Using existing ' + file_name)
+log(f'Using existing {file_name}')
def wine_run_shell(command):
# Keep argv-style invocation (no shell) to avoid command injection.
@@ -120,7 +120,7 @@ if len(os.listdir('./yt-local')) == 0:
# ----------- Generate embedded python distribution -----------
os.environ['PYTHONDONTWRITEBYTECODE'] = '1' # *.pyc files double the size of the distribution
get_pip_url = 'https://bootstrap.pypa.io/get-pip.py'
-latest_dist_url = 'https://www.python.org/ftp/python/' + latest_version + '/python-' + latest_version
+latest_dist_url = f'https://www.python.org/ftp/python/{latest_version}/python-{latest_version}'
if bitness == '32':
latest_dist_url += '-embed-win32.zip'
else:
@@ -142,7 +142,7 @@ else:
download_if_not_exists('get-pip.py', get_pip_url)
-python_dist_name = 'python-dist-' + latest_version + '-' + bitness + '.zip'
+python_dist_name = f'python-dist-{latest_version}-{bitness}.zip'
download_if_not_exists(python_dist_name, latest_dist_url)
download_if_not_exists(visual_c_name,
@@ -203,7 +203,7 @@ and replaced with a .pth. Isolated mode will have to be specified manually.
log('Removing ._pth')
major_release = latest_version.split('.')[1]
-os.remove(r'./python/python3' + major_release + '._pth')
+os.remove(rf'./python/python3{major_release}._pth')
log('Adding path_fixes.pth')
with open(r'./python/path_fixes.pth', 'w', encoding='utf-8') as f:
@@ -214,7 +214,7 @@ with open(r'./python/path_fixes.pth', 'w', encoding='utf-8') as f:
# Need to add the directory where packages are installed,
# and the parent directory (which is where the yt-local files are)
major_release = latest_version.split('.')[1]
-with open('./python/python3' + major_release + '._pth', 'a', encoding='utf-8') as f:
+with open(rf'./python/python3{major_release}._pth', 'a', encoding='utf-8') as f:
f.write('.\\Lib\\site-packages\n')
f.write('..\n')'''
@@ -255,10 +255,10 @@ log('Copying python distribution into release folder')
shutil.copytree(r'./python', r'./yt-local/python')
# ----------- Create release zip -----------
-output_filename = 'yt-local-' + release_tag + '-' + suffix + '.zip'
+output_filename = f'yt-local-{release_tag}-{suffix}.zip'
-if os.path.exists('./' + output_filename):
+if os.path.exists(f'./{output_filename}'):
log('Removing previous zipped release')
-os.remove('./' + output_filename)
+os.remove(f'./{output_filename}')
log('Zipping release')
check_subp(subprocess.run(['7z', '-mx=9', 'a', output_filename, './yt-local']))


@@ -32,9 +32,9 @@ def youtu_be(env, start_response):
id = env['PATH_INFO'][1:]
env['PATH_INFO'] = '/watch'
if not env['QUERY_STRING']:
-env['QUERY_STRING'] = 'v=' + id
+env['QUERY_STRING'] = f'v={id}'
else:
-env['QUERY_STRING'] += '&v=' + id
+env['QUERY_STRING'] += f'&v={id}'
yield from yt_app(env, start_response)
@@ -64,12 +64,12 @@ def proxy_site(env, start_response, video=False):
if 'HTTP_RANGE' in env:
send_headers['Range'] = env['HTTP_RANGE']
-url = "https://" + env['SERVER_NAME'] + env['PATH_INFO']
+url = f"https://{env['SERVER_NAME']}{env['PATH_INFO']}"
# remove /name portion
if video and '/videoplayback/name/' in url:
url = url[0:url.rfind('/name/')]
if env['QUERY_STRING']:
-url += '?' + env['QUERY_STRING']
+url += f'?{env["QUERY_STRING"]}'
try_num = 1
first_attempt = True
@@ -96,7 +96,7 @@ def proxy_site(env, start_response, video=False):
+[('Access-Control-Allow-Origin', '*')])
if first_attempt:
-start_response(str(response.status) + ' ' + response.reason,
+start_response(f"{response.status} {response.reason}",
response_headers)
content_length = int(dict(response_headers).get('Content-Length', 0))
@@ -136,9 +136,8 @@ def proxy_site(env, start_response, video=False):
fail_byte = start + total_received
send_headers['Range'] = 'bytes=%d-%d' % (fail_byte, end)
print(
-'Warning: YouTube closed the connection before byte',
-str(fail_byte) + '.', 'Expected', start+content_length,
-'bytes.'
+f'Warning: YouTube closed the connection before byte {fail_byte}. '
+f'Expected {start+content_length} bytes.'
)
retry = True
@@ -185,7 +184,7 @@ def split_url(url):
# python STILL doesn't have a proper regular expression engine like grep uses built in...
match = re.match(r'(?:https?://)?([\w-]+(?:\.[\w-]+)+?)(/.*|$)', url)
if match is None:
-raise ValueError('Invalid or unsupported url: ' + url)
+raise ValueError(f'Invalid or unsupported url: {url}')
return match.group(1), match.group(2)
@@ -238,7 +237,7 @@ def site_dispatch(env, start_response):
if base_name == '':
base_name = domain
else:
-base_name = domain + '.' + base_name
+base_name = f"{domain}.{base_name}"
try:
handler = site_handlers[base_name]


@@ -397,14 +397,14 @@ acceptable_targets = SETTINGS_INFO.keys() | {
def comment_string(comment):
result = ''
for line in comment.splitlines():
-result += '# ' + line + '\n'
+result += f'# {line}\n'
return result
def save_settings(settings_dict):
with open(settings_file_path, 'w', encoding='utf-8') as file:
for setting_name, setting_info in SETTINGS_INFO.items():
-file.write(comment_string(setting_info['comment']) + setting_name + ' = ' + repr(settings_dict[setting_name]) + '\n\n')
+file.write(f"{comment_string(setting_info['comment'])}{setting_name} = {repr(settings_dict[setting_name])}\n\n")
def add_missing_settings(settings_dict):
@@ -481,7 +481,7 @@ upgrade_functions = {
def log_ignored_line(line_number, message):
-print('WARNING: Ignoring settings.txt line ' + str(line_number) + ' (' + message + ')')
+print(f'WARNING: Ignoring settings.txt line {line_number} ({message})')
if os.path.isfile("settings.txt"):
@@ -535,7 +535,7 @@ else:
continue
if target.id not in acceptable_targets:
-log_ignored_line(node.lineno, target.id + " is not a valid setting")
+log_ignored_line(node.lineno, f"{target.id} is not a valid setting")
continue
if type(node.value) not in attributes:
@@ -645,6 +645,6 @@ def settings_page():
for func, old_value, value in to_call:
func(old_value, value)
-return flask.redirect(util.URL_ORIGIN + '/settings', 303)
+return flask.redirect(f'{util.URL_ORIGIN}/settings', 303)
else:
flask.abort(400)


@@ -27,7 +27,7 @@ class TestChannelCtokenV5:
def _decode_outer(self, ctoken):
"""Decode the outer protobuf layer of a ctoken."""
-raw = base64.urlsafe_b64decode(ctoken + '==')
+raw = base64.urlsafe_b64decode(f'{ctoken}==')
return {fn: val for _, fn, val in proto.read_protobuf(raw)}
def test_shorts_token_generates_without_error(self):
@@ -68,8 +68,8 @@ class TestChannelCtokenV5:
assert t_with_shorts != t_without_shorts
# Decode and verify the filter is present
-raw_with_shorts = base64.urlsafe_b64decode(t_with_shorts + '==')
+raw_with_shorts = base64.urlsafe_b64decode(f'{t_with_shorts}==')
-raw_without_shorts = base64.urlsafe_b64decode(t_without_shorts + '==')
+raw_without_shorts = base64.urlsafe_b64decode(f'{t_without_shorts}==')
# Parse the outer protobuf structure
import youtube.proto as proto
@@ -95,8 +95,8 @@ class TestChannelCtokenV5:
decoded_without = urllib.parse.unquote(encoded_inner_without.decode('ascii'))
# Decode the base64 data
-decoded_with_bytes = base64.urlsafe_b64decode(decoded_with + '==')
+decoded_with_bytes = base64.urlsafe_b64decode(f'{decoded_with}==')
-decoded_without_bytes = base64.urlsafe_b64decode(decoded_without + '==')
+decoded_without_bytes = base64.urlsafe_b64decode(f'{decoded_without}==')
# Parse the decoded protobuf data
fields_with = list(proto.read_protobuf(decoded_with_bytes))


@@ -0,0 +1,72 @@
import pytest
from youtube import watch_formats


class TestCodecName:
    def test_avc_returns_h264(self):
        assert watch_formats.codec_name('avc1.64001F') == 'h264'

    def test_av01_returns_av1(self):
        assert watch_formats.codec_name('av01.0.05M.08') == 'av1'

    def test_vp9_returns_vp(self):
        assert watch_formats.codec_name('vp9') == 'vp'

    def test_unknown_returns_unknown(self):
        assert watch_formats.codec_name('unknown_codec') == 'unknown'


class TestVideoQualityString:
    def test_with_vcodec(self):
        fmt = {'vcodec': 'avc1', 'width': 1920, 'height': 1080, 'fps': 30}
        assert watch_formats.video_quality_string(fmt) == '1920x1080 30fps'

    def test_with_vcodec_no_fps(self):
        fmt = {'vcodec': 'avc1', 'width': 1280, 'height': 720}
        assert watch_formats.video_quality_string(fmt) == '1280x720'

    def test_with_acodec_only(self):
        fmt = {'acodec': 'mp4a.40.2'}
        assert watch_formats.video_quality_string(fmt) == 'audio only'

    def test_empty(self):
        fmt = {}
        assert watch_formats.video_quality_string(fmt) == '?'


class TestShortVideoQualityString:
    def test_with_fps(self):
        fmt = {'quality': 1080, 'fps': 60, 'vcodec': 'av01.0.05M.08'}
        assert watch_formats.short_video_quality_string(fmt) == '1080p60 AV1'

    def test_h264(self):
        fmt = {'quality': 720, 'fps': 30, 'vcodec': 'avc1.64001E'}
        assert watch_formats.short_video_quality_string(fmt) == '720p30 h264'


class TestAudioQualityString:
    def test_with_bitrate(self):
        fmt = {'acodec': 'mp4a.40.2', 'audio_bitrate': 128}
        assert watch_formats.audio_quality_string(fmt) == '128k'

    def test_with_sample_rate(self):
        fmt = {'acodec': 'mp4a.40.2', 'audio_bitrate': 128, 'audio_sample_rate': 44100}
        assert watch_formats.audio_quality_string(fmt) == '128k 44.1kHz'

    def test_video_only(self):
        fmt = {'vcodec': 'avc1'}
        assert watch_formats.audio_quality_string(fmt) == 'video only'


class TestFormatBytes:
    def test_none(self):
        assert watch_formats.format_bytes(None) == 'N/A'

    def test_bytes(self):
        assert watch_formats.format_bytes(512) == '512.00B'

    def test_kibibytes(self):
        assert watch_formats.format_bytes(1024) == '1.00KiB'

    def test_mebibytes(self):
        assert watch_formats.format_bytes(1048576) == '1.00MiB'


@@ -76,7 +76,7 @@ theme_names = {
@yt_app.context_processor
def inject_theme_preference():
return {
-'theme_path': '/youtube.com/static/' + theme_names[settings.theme] + '.css',
+'theme_path': f'/youtube.com/static/{theme_names[settings.theme]}.css',
'settings': settings,
# Detect version
'current_version': app_version()['version'],
@@ -145,9 +145,9 @@ def error_page(e):
' exit node is overutilized. Try getting a new exit node by'
' using the New Identity button in the Tor Browser.')
if fetch_err.error_message:
-error_message += '\n\n' + fetch_err.error_message
+error_message += f'\n\n{fetch_err.error_message}'
if fetch_err.ip:
-error_message += '\n\nExit node IP address: ' + fetch_err.ip
+error_message += f'\n\nExit node IP address: {fetch_err.ip}'
return flask.render_template('error.html', error_message=error_message, slim=slim), 502
elif error_code == '429':
@@ -157,7 +157,7 @@ def error_page(e):
'• Enable Tor routing in Settings for automatic IP rotation\n'
'• Use a VPN to change your IP address')
if fetch_err.ip:
-error_message += '\n\nYour IP: ' + fetch_err.ip
+error_message += f'\n\nYour IP: {fetch_err.ip}'
return flask.render_template('error.html', error_message=error_message, slim=slim), 429
elif error_code == '502' and ('Failed to resolve' in str(fetch_err) or 'Failed to establish' in str(fetch_err)):
@@ -179,7 +179,7 @@ def error_page(e):
# Catch-all for any other FetchError (400, etc.)
error_message = f'Error communicating with YouTube ({error_code}).'
if fetch_err.error_message:
-error_message += '\n\n' + fetch_err.error_message
+error_message += f'\n\n{fetch_err.error_message}'
return flask.render_template('error.html', error_message=error_message, slim=slim), 502
return flask.render_template('error.html', traceback=traceback.format_exc(),


@@ -253,7 +253,7 @@ def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1,
# For now it seems to be constant for the API endpoint, not dependent # For now it seems to be constant for the API endpoint, not dependent
# on the browsing session or channel # on the browsing session or channel
key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8' key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key url = f'https://www.youtube.com/youtubei/v1/browse?key={key}'
data = { data = {
'context': { 'context': {
@@ -285,8 +285,8 @@ def get_number_of_videos_channel(channel_id):
return 1000 return 1000
# Uploads playlist # Uploads playlist
playlist_id = 'UU' + channel_id[2:] playlist_id = f'UU{channel_id[2:]}'
url = 'https://m.youtube.com/playlist?list=' + playlist_id + '&pbj=1' url = f'https://m.youtube.com/playlist?list={playlist_id}&pbj=1'
try: try:
response = util.fetch_url(url, headers_mobile, response = util.fetch_url(url, headers_mobile,
@@ -328,7 +328,7 @@ def get_channel_id(base_url):
# method that gives the smallest possible response at ~4 kb # method that gives the smallest possible response at ~4 kb
# needs to be as fast as possible # needs to be as fast as possible
base_url = base_url.replace('https://www', 'https://m') # avoid redirect base_url = base_url.replace('https://www', 'https://m') # avoid redirect
response = util.fetch_url(base_url + '/about?pbj=1', headers_mobile, response = util.fetch_url(f'{base_url}/about?pbj=1', headers_mobile,
debug_name='get_channel_id', report_text='Got channel id').decode('utf-8') debug_name='get_channel_id', report_text='Got channel id').decode('utf-8')
match = channel_id_re.search(response) match = channel_id_re.search(response)
if match: if match:
@@ -372,7 +372,7 @@ def get_channel_search_json(channel_id, query, page):
ctoken = base64.urlsafe_b64encode(proto.nested(80226972, ctoken)).decode('ascii') ctoken = base64.urlsafe_b64encode(proto.nested(80226972, ctoken)).decode('ascii')
key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8' key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key url = f'https://www.youtube.com/youtubei/v1/browse?key={key}'
data = { data = {
'context': { 'context': {
@@ -414,18 +414,18 @@ def post_process_channel_info(info):
def get_channel_first_page(base_url=None, tab='videos', channel_id=None, sort=None): def get_channel_first_page(base_url=None, tab='videos', channel_id=None, sort=None):
if channel_id: if channel_id:
base_url = 'https://www.youtube.com/channel/' + channel_id base_url = f'https://www.youtube.com/channel/{channel_id}'
# Build URL with sort parameter # Build URL with sort parameter
# YouTube URL sort params: p=popular, dd=newest, lad=newest no shorts # YouTube URL sort params: p=popular, dd=newest, lad=newest no shorts
# Note: 'da' (oldest) was removed by YouTube in January 2026 # Note: 'da' (oldest) was removed by YouTube in January 2026
url = base_url + '/' + tab + '?pbj=1&view=0' url = f'{base_url}/{tab}?pbj=1&view=0'
if sort: if sort:
# Map sort values to YouTube's URL parameter values # Map sort values to YouTube's URL parameter values
sort_map = {'3': 'dd', '4': 'lad'} sort_map = {'3': 'dd', '4': 'lad'}
url += '&sort=' + sort_map.get(sort, 'dd') url += f'&sort={sort_map.get(sort, "dd")}'
return util.fetch_url(url, headers_desktop, debug_name='gen_channel_' + tab) return util.fetch_url(url, headers_desktop, debug_name=f'gen_channel_{tab}')
playlist_sort_codes = {'2': "da", '3': "dd", '4': "lad"} playlist_sort_codes = {'2': "da", '3': "dd", '4': "lad"}
@@ -462,7 +462,7 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
if page_number == 1: if page_number == 1:
tasks = ( tasks = (
gevent.spawn(playlist.playlist_first_page, gevent.spawn(playlist.playlist_first_page,
'UU' + channel_id[2:], f'UU{channel_id[2:]}',
report_text='Retrieved channel videos'), report_text='Retrieved channel videos'),
gevent.spawn(get_metadata, channel_id), gevent.spawn(get_metadata, channel_id),
) )
@@ -477,11 +477,11 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
set_cached_number_of_videos(channel_id, number_of_videos) set_cached_number_of_videos(channel_id, number_of_videos)
else: else:
tasks = ( tasks = (
gevent.spawn(playlist.get_videos, 'UU' + channel_id[2:], gevent.spawn(playlist.get_videos, f'UU{channel_id[2:]}',
page_number, include_shorts=True), page_number, include_shorts=True),
gevent.spawn(get_metadata, channel_id), gevent.spawn(get_metadata, channel_id),
gevent.spawn(get_number_of_videos_channel, channel_id), gevent.spawn(get_number_of_videos_channel, channel_id),
gevent.spawn(playlist.playlist_first_page, 'UU' + channel_id[2:], gevent.spawn(playlist.playlist_first_page, f'UU{channel_id[2:]}',
report_text='Retrieved channel video count'), report_text='Retrieved channel video count'),
) )
gevent.joinall(tasks) gevent.joinall(tasks)
@@ -567,10 +567,10 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
elif tab == 'search' and channel_id: elif tab == 'search' and channel_id:
polymer_json = get_channel_search_json(channel_id, query, page_number) polymer_json = get_channel_search_json(channel_id, query, page_number)
elif tab == 'search': elif tab == 'search':
url = base_url + '/search?pbj=1&query=' + urllib.parse.quote(query, safe='') url = f'{base_url}/search?pbj=1&query={urllib.parse.quote(query, safe="")}'
polymer_json = util.fetch_url(url, headers_desktop, debug_name='gen_channel_search') polymer_json = util.fetch_url(url, headers_desktop, debug_name='gen_channel_search')
elif tab != 'videos': elif tab != 'videos':
flask.abort(404, 'Unknown channel tab: ' + tab) flask.abort(404, f'Unknown channel tab: {tab}')
if polymer_json is not None and info is None: if polymer_json is not None and info is None:
info = yt_data_extract.extract_channel_info( info = yt_data_extract.extract_channel_info(
@@ -583,7 +583,7 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
return flask.render_template('error.html', error_message=info['error']) return flask.render_template('error.html', error_message=info['error'])
if channel_id: if channel_id:
info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id info['channel_url'] = f'https://www.youtube.com/channel/{channel_id}'
info['channel_id'] = channel_id info['channel_id'] = channel_id
else: else:
channel_id = info['channel_id'] channel_id = info['channel_id']
@@ -663,22 +663,22 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
@yt_app.route('/channel/<channel_id>/') @yt_app.route('/channel/<channel_id>/')
@yt_app.route('/channel/<channel_id>/<tab>') @yt_app.route('/channel/<channel_id>/<tab>')
def get_channel_page(channel_id, tab='videos'): def get_channel_page(channel_id, tab='videos'):
return get_channel_page_general_url('https://www.youtube.com/channel/' + channel_id, tab, request, channel_id) return get_channel_page_general_url(f'https://www.youtube.com/channel/{channel_id}', tab, request, channel_id)
@yt_app.route('/user/<username>/') @yt_app.route('/user/<username>/')
@yt_app.route('/user/<username>/<tab>') @yt_app.route('/user/<username>/<tab>')
def get_user_page(username, tab='videos'): def get_user_page(username, tab='videos'):
return get_channel_page_general_url('https://www.youtube.com/user/' + username, tab, request) return get_channel_page_general_url(f'https://www.youtube.com/user/{username}', tab, request)
@yt_app.route('/c/<custom>/') @yt_app.route('/c/<custom>/')
@yt_app.route('/c/<custom>/<tab>') @yt_app.route('/c/<custom>/<tab>')
def get_custom_c_page(custom, tab='videos'): def get_custom_c_page(custom, tab='videos'):
return get_channel_page_general_url('https://www.youtube.com/c/' + custom, tab, request) return get_channel_page_general_url(f'https://www.youtube.com/c/{custom}', tab, request)
@yt_app.route('/<custom>') @yt_app.route('/<custom>')
@yt_app.route('/<custom>/<tab>') @yt_app.route('/<custom>/<tab>')
def get_toplevel_custom_page(custom, tab='videos'): def get_toplevel_custom_page(custom, tab='videos'):
return get_channel_page_general_url('https://www.youtube.com/' + custom, tab, request) return get_channel_page_general_url(f'https://www.youtube.com/{custom}', tab, request)


@@ -104,20 +104,19 @@ def post_process_comments_info(comments_info):
comment['replies_url'] = None
comment['replies_url'] = concat_or_none(
util.URL_ORIGIN,
-'/comments?replies=1&ctoken=' + ctoken)
+f'/comments?replies=1&ctoken={ctoken}')
if reply_count == 0:
comment['view_replies_text'] = 'Reply'
elif reply_count == 1:
comment['view_replies_text'] = '1 reply'
else:
-comment['view_replies_text'] = str(reply_count) + ' replies'
+comment['view_replies_text'] = f'{reply_count} replies'
if comment['approx_like_count'] == '1':
comment['likes_text'] = '1 like'
else:
-comment['likes_text'] = (str(comment['approx_like_count'])
-+ ' likes')
+comment['likes_text'] = f"{comment['approx_like_count']} likes"
comments_info['include_avatars'] = settings.enable_comment_avatars
if comments_info['ctoken']:
@@ -163,14 +162,13 @@ def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''):
comments_info = {'error': None}
try:
other_sort_url = (
-util.URL_ORIGIN + '/comments?ctoken='
-+ make_comment_ctoken(video_id, sort=1 - sort, lc=lc)
+f"{util.URL_ORIGIN}/comments?ctoken="
+f"{make_comment_ctoken(video_id, sort=1 - sort, lc=lc)}"
)
-other_sort_text = 'Sort by ' + ('newest' if sort == 0 else 'top')
+other_sort_text = f'Sort by {"newest" if sort == 0 else "top"}'
-this_sort_url = (util.URL_ORIGIN
-+ '/comments?ctoken='
-+ make_comment_ctoken(video_id, sort=sort, lc=lc))
+this_sort_url = (f"{util.URL_ORIGIN}/comments?ctoken="
+f"{make_comment_ctoken(video_id, sort=sort, lc=lc)}")
comments_info['comment_links'] = [
(other_sort_text, other_sort_url),
@@ -188,17 +186,16 @@ def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''):
if e.code == '429' and settings.route_tor:
comments_info['error'] = 'Error: YouTube blocked the request because the Tor exit node is overutilized.'
if e.error_message:
-comments_info['error'] += '\n\n' + e.error_message
+comments_info['error'] += f'\n\n{e.error_message}'
-comments_info['error'] += '\n\nExit node IP address: %s' % e.ip
+comments_info['error'] += f'\n\nExit node IP address: {e.ip}'
else:
-comments_info['error'] = 'YouTube blocked the request. Error: %s' % str(e)
+comments_info['error'] = f'YouTube blocked the request. Error: {e}'
except Exception as e:
-comments_info['error'] = 'YouTube blocked the request. Error: %s' % str(e)
+comments_info['error'] = f'YouTube blocked the request. Error: {e}'
if comments_info.get('error'):
-print('Error retrieving comments for ' + str(video_id) + ':\n' +
-comments_info['error'])
+print(f'Error retrieving comments for {video_id}:\n{comments_info["error"]}')
return comments_info
@@ -218,12 +215,10 @@ def get_comments_page():
other_sort_url = None
else:
other_sort_url = (
-util.URL_ORIGIN
-+ '/comments?ctoken='
-+ make_comment_ctoken(comments_info['video_id'],
-sort=1-comments_info['sort'])
+f'{util.URL_ORIGIN}/comments?ctoken='
+f'{make_comment_ctoken(comments_info["video_id"], sort=1-comments_info["sort"])}'
)
-other_sort_text = 'Sort by ' + ('newest' if comments_info['sort'] == 0 else 'top')
+other_sort_text = f'Sort by {"newest" if comments_info["sort"] == 0 else "top"}'
comments_info['comment_links'] = [(other_sort_text, other_sort_url)]
return flask.render_template(

youtube/constants.py (new file, 190 lines)

@@ -0,0 +1,190 @@
"""Constants used across yt-local application."""
import collections
YOUTUBE_DOMAINS = ('youtube.com', 'youtu.be', 'youtube-nocookie.com')
DEFAULT_USER_AGENT = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)'
MOBILE_USER_AGENT = 'Mozilla/5.0 (Linux; Android 7.0; Redmi Note 4 Build/NRD90M) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Mobile Safari/537.36'
REPLACEMENT_MAP = collections.OrderedDict([
('<', '_'),
('>', '_'),
(': ', ' - '),
(':', '-'),
('"', "'"),
('/', '_'),
('\\', '_'),
('|', '-'),
('?', ''),
('*', '_'),
('\t', ' '),
])
DOS_RESERVED_NAMES = frozenset({
'con', 'prn', 'aux', 'nul', 'com0', 'com1', 'com2', 'com3',
'com4', 'com5', 'com6', 'com7', 'com8', 'com9', 'lpt0',
'lpt1', 'lpt2', 'lpt3', 'lpt4', 'lpt5', 'lpt6', 'lpt7',
'lpt8', 'lpt9'
})
INNERTUBE_CLIENTS = {
'android': {
'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
'INNERTUBE_CONTEXT': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'ANDROID',
'clientVersion': '21.02.35',
'osName': 'Android',
'osVersion': '11',
'androidSdkVersion': 30,
'platform': 'MOBILE',
'userAgent': 'com.google.android.youtube/21.02.35 (Linux; U; Android 11) gzip'
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 3,
'REQUIRE_JS_PLAYER': False,
},
'ios': {
'INNERTUBE_API_KEY': 'AIzaSyB-63vPrdThhKuerbB2N_l7Kwwcxj6yUAc',
'INNERTUBE_CONTEXT': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'IOS',
'clientVersion': '21.02.3',
'deviceMake': 'Apple',
'deviceModel': 'iPhone16,2',
'osName': 'iPhone',
'osVersion': '18.3.2.22D82',
'userAgent': 'com.google.ios.youtube/21.02.3 (iPhone16,2; U; CPU iOS 18_3_2 like Mac OS X;)'
}
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 5,
'REQUIRE_JS_PLAYER': False
},
'tv_embedded': {
'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
'INNERTUBE_CONTEXT': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'TVHTML5_SIMPLY',
'clientVersion': '1.0',
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 75,
'REQUIRE_JS_PLAYER': True,
},
'web': {
'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'WEB',
'clientVersion': '2.20260114.08.00',
'userAgent': DEFAULT_USER_AGENT,
}
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 1
},
'web_embedded': {
'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
'INNERTUBE_CONTEXT': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'WEB_EMBEDDED_PLAYER',
'clientVersion': '1.20260115.01.00',
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 56,
'REQUIRE_JS_PLAYER': True,
},
'mweb': {
'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
'INNERTUBE_CONTEXT': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'MWEB',
'clientVersion': '2.20260115.01.00',
'userAgent': 'Mozilla/5.0 (iPad; CPU OS 16_7_10 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1,gzip(gfe)',
}
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 2,
'REQUIRE_JS_PLAYER': True,
},
'tv': {
'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
'INNERTUBE_CONTEXT': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'TVHTML5',
'clientVersion': '7.20260114.12.00',
'userAgent': 'Mozilla/5.0 (ChromiumStylePlatform) Cobalt/25.lts.30.1034943-gold (unlike Gecko), Unknown_TV_Unknown_0/Unknown (Unknown, Unknown)',
}
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 7,
'REQUIRE_JS_PLAYER': True,
},
'android_vr': {
'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'ANDROID_VR',
'clientVersion': '1.65.10',
'deviceMake': 'Oculus',
'deviceModel': 'Quest 3',
'androidSdkVersion': 32,
'userAgent': 'com.google.android.apps.youtube.vr.oculus/1.65.10 (Linux; U; Android 12L; eureka-user Build/SQ3A.220605.009.A1) gzip',
'osName': 'Android',
'osVersion': '12L',
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 28,
'REQUIRE_JS_PLAYER': False,
},
}
THEME_NAMES = {
0: 'light_theme',
1: 'gray_theme',
2: 'dark_theme',
}
FONT_CHOICES = {
0: 'initial',
1: '"liberation serif", "times new roman", calibri, carlito, serif',
2: 'arial, "liberation sans", sans-serif',
3: 'verdana, sans-serif',
4: 'tahoma, sans-serif',
}
URL_ORIGIN = "/https://www.youtube.com"
MAX_RETRIES = 5
BASE_DELAY = 1.0
TOR_DEFAULT_PORT = 9050
TOR_CONTROL_DEFAULT_PORT = 9151
DEFAULT_PORT = 9010
# Backward compatibility aliases (matching existing code names)
desktop_user_agent = 'Mozilla/5.0 (Windows NT 6.1; rv:52.0) Gecko/20100101 Firefox/52.0'
desktop_ua = (('User-Agent', desktop_user_agent),)
mobile_ua = (('User-Agent', MOBILE_USER_AGENT),)
json_header = (('Content-Type', 'application/json'),)
# Re-export for convenience
url_origin = URL_ORIGIN


@@ -41,8 +41,8 @@ def app_version():
describe = minimal_env_cmd(['git', 'describe', '--tags', '--always'])
git_revision = describe.strip().decode('ascii')
-branch = minimal_env_cmd(['git', 'branch'])
+branch = minimal_env_cmd(['git', 'branch', '--show-current'])
-git_branch = branch.strip().decode('ascii').replace('* ', '')
+git_branch = branch.strip().decode('ascii')
subst_list.update({
'branch': git_branch,


@@ -92,9 +92,7 @@ def add_extra_info_to_videos(videos, playlist_name):
util.add_extra_html_info(video)
if video['id'] + '.jpg' in thumbnails:
video['thumbnail'] = (
-'/https://youtube.com/data/playlist_thumbnails/'
-+ playlist_name
-+ '/' + video['id'] + '.jpg')
+f'/https://youtube.com/data/playlist_thumbnails/{playlist_name}/{video["id"]}.jpg')
else:
video['thumbnail'] = util.get_thumbnail_url(video['id'])
missing_thumbnails.append(video['id'])


@@ -20,7 +20,7 @@ def playlist_ctoken(playlist_id, offset, include_shorts=True):
continuation_info = proto.string(3, proto.percent_b64encode(offset))
-playlist_id = proto.string(2, 'VL' + playlist_id)
+playlist_id = proto.string(2, f'VL{playlist_id}')
pointless_nest = proto.string(80226972, playlist_id + continuation_info)
return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
@@ -30,7 +30,7 @@ def playlist_first_page(playlist_id, report_text="Retrieved playlist",
use_mobile=False):
# Use innertube API (pbj=1 no longer works for many playlists)
key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
-url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
+url = f'https://www.youtube.com/youtubei/v1/browse?key={key}'
data = {
'context': {
@@ -41,7 +41,7 @@ def playlist_first_page(playlist_id, report_text="Retrieved playlist",
'clientVersion': '2.20240327.00.00',
},
},
-'browseId': 'VL' + playlist_id,
+'browseId': f'VL{playlist_id}',
}
content_type_header = (('Content-Type', 'application/json'),)
@@ -58,7 +58,7 @@ def get_videos(playlist_id, page, include_shorts=True, use_mobile=False,
page_size = 100
key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
-url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
+url = f'https://www.youtube.com/youtubei/v1/browse?key={key}'
ctoken = playlist_ctoken(playlist_id, (int(page)-1)*page_size,
include_shorts=include_shorts)
@@ -97,7 +97,7 @@ def get_playlist_page():
if playlist_id.startswith('RD'):
first_video_id = playlist_id[2:] # video ID after 'RD' prefix
return flask.redirect(
-util.URL_ORIGIN + '/watch?v=' + first_video_id + '&list=' + playlist_id,
+f'{util.URL_ORIGIN}/watch?v={first_video_id}&list={playlist_id}',
302
)
@@ -132,9 +132,9 @@ def get_playlist_page():
if 'id' in item and not item.get('thumbnail'):
item['thumbnail'] = f"{settings.img_prefix}https://i.ytimg.com/vi/{item['id']}/hqdefault.jpg"
-item['url'] += '&list=' + playlist_id
+item['url'] += f'&list={playlist_id}'
if item['index']:
-item['url'] += '&index=' + str(item['index'])
+item['url'] += f'&index={item["index"]}'
video_count = yt_data_extract.deep_get(info, 'metadata', 'video_count')
if video_count is None:

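The continuation token built by playlist_ctoken is what drives paging in get_videos: each page of 100 items is requested by encoding the absolute offset into the token. A minimal usage sketch, assuming the module lives at youtube.playlist and using a placeholder playlist id:

    from youtube import playlist

    page_size = 100
    # Token for page 1 (offset 0) and page 3 (offset 200) of a playlist.
    token_p1 = playlist.playlist_ctoken('PLxxxxxxxx', 0)
    token_p3 = playlist.playlist_ctoken('PLxxxxxxxx', (3 - 1) * page_size)
    # Each token is a urlsafe base64 protobuf wrapping 'VL' + playlist_id
    # (field 2) and the percent-b64-encoded offset (field 3) inside field 80226972.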
View File

@@ -76,7 +76,7 @@ def read_varint(data):
except IndexError: except IndexError:
if i == 0: if i == 0:
raise EOFError() raise EOFError()
raise Exception('Unterminated varint starting at ' + str(data.tell() - i)) raise Exception(f'Unterminated varint starting at {data.tell() - i}')
result |= (byte & 127) << 7*i result |= (byte & 127) << 7*i
if not byte & 128: if not byte & 128:
break break
@@ -118,7 +118,7 @@ def read_protobuf(data):
elif wire_type == 5: elif wire_type == 5:
value = data.read(4) value = data.read(4)
else: else:
raise Exception("Unknown wire type: " + str(wire_type) + " at position " + str(data.tell())) raise Exception(f"Unknown wire type: {wire_type} at position {data.tell()}")
yield (wire_type, field_number, value) yield (wire_type, field_number, value)
@@ -170,8 +170,7 @@ def _make_protobuf(data):
elif field[0] == 2: elif field[0] == 2:
result += string(field[1], _make_protobuf(field[2])) result += string(field[1], _make_protobuf(field[2]))
else: else:
raise NotImplementedError('Wire type ' + str(field[0]) raise NotImplementedError(f'Wire type {field[0]} not implemented')
+ ' not implemented')
return result return result
return data return data
@@ -218,4 +217,4 @@ def b64_to_bytes(data):
if isinstance(data, bytes): if isinstance(data, bytes):
data = data.decode('ascii') data = data.decode('ascii')
data = data.replace("%3D", "=") data = data.replace("%3D", "=")
return base64.urlsafe_b64decode(data + "="*((4 - len(data) % 4) % 4)) return base64.urlsafe_b64decode(f'{data}{"=" * ((4 - len(data) % 4) % 4)}')

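The padding expression in b64_to_bytes pads the input back up to a multiple of 4 characters, which the stdlib requires (urlsafe_b64decode raises binascii.Error on misaligned input), and the outer % 4 ensures nothing is added when the length is already aligned. A worked check:

    import base64

    def pad4(data):
        # (4 - len % 4) % 4 yields 0, 1, 2 or 3 '=' characters, never 4.
        return data + "=" * ((4 - len(data) % 4) % 4)

    assert pad4("YWJj") == "YWJj"                              # length 4: nothing added
    assert pad4("YWJjZA") == "YWJjZA=="                        # length 6: two '=' restore alignment
    assert base64.urlsafe_b64decode(pad4("YWJjZA")) == b"abcd"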
View File

@@ -179,7 +179,7 @@ def read_varint(data):
except IndexError: except IndexError:
if i == 0: if i == 0:
raise EOFError() raise EOFError()
raise Exception('Unterminated varint starting at ' + str(data.tell() - i)) raise Exception(f'Unterminated varint starting at {data.tell() - i}')
result |= (byte & 127) << 7*i result |= (byte & 127) << 7*i
if not byte & 128: if not byte & 128:
break break
@@ -235,8 +235,7 @@ def _make_protobuf(data):
elif field[0] == 2: elif field[0] == 2:
result += string(field[1], _make_protobuf(field[2])) result += string(field[1], _make_protobuf(field[2]))
else: else:
raise NotImplementedError('Wire type ' + str(field[0]) raise NotImplementedError(f'Wire type {field[0]} not implemented')
+ ' not implemented')
return result return result
return data return data
@@ -286,7 +285,7 @@ def b64_to_bytes(data):
if isinstance(data, bytes): if isinstance(data, bytes):
data = data.decode('ascii') data = data.decode('ascii')
data = data.replace("%3D", "=") data = data.replace("%3D", "=")
return base64.urlsafe_b64decode(data + "="*((4 - len(data) % 4) % 4)) return base64.urlsafe_b64decode(f'{data}{"=" * ((4 - len(data) % 4) % 4)}')
# -------------------------------------------------------------------- # --------------------------------------------------------------------
@@ -344,7 +343,7 @@ fromhex = bytes.fromhex
def aligned_ascii(data): def aligned_ascii(data):
return ' '.join(' ' + chr(n) if n in range(32, 128) else ' _' for n in data) return ' '.join(f' {chr(n)}' if n in range(32, 128) else ' _' for n in data)
def parse_protobuf(data, mutable=False, spec=()): def parse_protobuf(data, mutable=False, spec=()):
@@ -372,7 +371,7 @@ def parse_protobuf(data, mutable=False, spec=()):
elif wire_type == 5: elif wire_type == 5:
value = data.read(4) value = data.read(4)
else: else:
raise Exception("Unknown wire type: " + str(wire_type) + ", Tag: " + bytes_to_hex(varint_encode(tag)) + ", at position " + str(data.tell())) raise Exception(f"Unknown wire type: {wire_type}, Tag: {bytes_to_hex(varint_encode(tag))}, at position {data.tell()}")
if mutable: if mutable:
yield [wire_type, field_number, value] yield [wire_type, field_number, value]
else: else:
@@ -453,7 +452,7 @@ def b32decode(s, casefold=False, map01=None):
if map01 is not None: if map01 is not None:
map01 = _bytes_from_decode_data(map01) map01 = _bytes_from_decode_data(map01)
assert len(map01) == 1, repr(map01) assert len(map01) == 1, repr(map01)
s = s.translate(bytes.maketrans(b'01', b'O' + map01)) s = s.translate(bytes.maketrans(b'01', f'O{map01.decode("ascii")}'.encode('ascii')))
if casefold: if casefold:
s = s.upper() s = s.upper()
# Strip off pad characters from the right. We need to count the pad # Strip off pad characters from the right. We need to count the pad
@@ -494,7 +493,7 @@ def b32decode(s, casefold=False, map01=None):
def dec32(data): def dec32(data):
if isinstance(data, bytes): if isinstance(data, bytes):
data = data.decode('ascii') data = data.decode('ascii')
return b32decode(data + "="*((8 - len(data)%8)%8)) return b32decode(f'{data}{"=" * ((8 - len(data)%8)%8)}')
_patterns = [ _patterns = [
@@ -563,9 +562,7 @@ def _pp(obj, indent): # not my best work
if len(obj) == 3: # (wire_type, field_number, data) if len(obj) == 3: # (wire_type, field_number, data)
return obj.__repr__() return obj.__repr__()
else: # (base64, [...]) else: # (base64, [...])
return ('(' + obj[0].__repr__() + ',\n' return f"({obj[0].__repr__()},\n{indent_lines(_pp(obj[1], indent), indent)}\n)"
+ indent_lines(_pp(obj[1], indent), indent) + '\n'
+ ')')
elif isinstance(obj, list): elif isinstance(obj, list):
# [wire_type, field_number, data] # [wire_type, field_number, data]
if (len(obj) == 3 if (len(obj) == 3
@@ -577,13 +574,11 @@ def _pp(obj, indent): # not my best work
elif (len(obj) == 3 elif (len(obj) == 3
and not any(isinstance(x, (list, tuple)) for x in obj[0:2]) and not any(isinstance(x, (list, tuple)) for x in obj[0:2])
): ):
return ('[' + obj[0].__repr__() + ', ' + obj[1].__repr__() + ',\n' return f"[{obj[0].__repr__()}, {obj[1].__repr__()},\n{indent_lines(_pp(obj[2], indent), indent)}\n]"
+ indent_lines(_pp(obj[2], indent), indent) + '\n'
+ ']')
else: else:
s = '[\n' s = '[\n'
for x in obj: for x in obj:
s += indent_lines(_pp(x, indent), indent) + ',\n' s += f"{indent_lines(_pp(x, indent), indent)},\n"
s += ']' s += ']'
return s return s
else: else:

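read_varint above implements standard protobuf varints: each byte contributes its low 7 bits and the high bit marks continuation. A small worked example of the decoding logic, mirroring the loop shown in the diff, with io.BytesIO standing in for the data stream:

    import io

    def varint_decode(stream):
        # Mirrors read_varint: accumulate 7 bits per byte until the
        # continuation bit (0x80) is clear.
        result, i = 0, 0
        while True:
            byte = stream.read(1)[0]
            result |= (byte & 127) << 7 * i
            if not byte & 128:
                return result
            i += 1

    # 300 = 0b100101100 -> low 7 bits 0101100 with the continuation bit set
    # (0xAC), then 0000010 (0x02): encoded as b'\xac\x02'.
    assert varint_decode(io.BytesIO(b'\xac\x02')) == 300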
View File

@@ -51,7 +51,7 @@ def get_search_json(query, page, autocorrect, sort, filters):
'X-YouTube-Client-Name': '1', 'X-YouTube-Client-Name': '1',
'X-YouTube-Client-Version': '2.20180418', 'X-YouTube-Client-Version': '2.20180418',
} }
url += "&pbj=1&sp=" + page_number_to_sp_parameter(page, autocorrect, sort, filters).replace("=", "%3D") url += f"&pbj=1&sp={page_number_to_sp_parameter(page, autocorrect, sort, filters).replace('=', '%3D')}"
content = util.fetch_url(url, headers=headers, report_text="Got search results", debug_name='search_results') content = util.fetch_url(url, headers=headers, report_text="Got search results", debug_name='search_results')
info = json.loads(content) info = json.loads(content)
return info return info

View File

@@ -150,7 +150,7 @@
* Create custom quality control in Plyr controls * Create custom quality control in Plyr controls
*/ */
function addCustomQualityControl(player, qualityLabels) { function addCustomQualityControl(player, qualityLabels) {
player.on('ready', () => { function doAdd() {
console.log('Adding custom quality control...'); console.log('Adding custom quality control...');
const controls = player.elements.container.querySelector('.plyr__controls'); const controls = player.elements.container.querySelector('.plyr__controls');
@@ -238,14 +238,21 @@
} }
console.log('Custom quality control added'); console.log('Custom quality control added');
}); }
// Run immediately if Plyr is already ready, otherwise wait
if (player.ready) {
doAdd();
} else {
player.on('ready', doAdd);
}
} }
/** /**
* Create custom audio tracks control in Plyr controls * Create custom audio tracks control in Plyr controls
*/ */
function addCustomAudioTracksControl(player, hlsInstance) { function addCustomAudioTracksControl(player, hlsInstance) {
player.on('ready', () => { function doAdd() {
console.log('Adding custom audio tracks control...'); console.log('Adding custom audio tracks control...');
const controls = player.elements.container.querySelector('.plyr__controls'); const controls = player.elements.container.querySelector('.plyr__controls');
@@ -397,117 +404,13 @@
}); });
console.log('Custom audio tracks control added'); console.log('Custom audio tracks control added');
});
}
/**
* Initialize Plyr with HLS quality options
*/
function initPlyrWithQuality(hlsInstance) {
const video = document.getElementById('js-video-player');
if (!hlsInstance || !hlsInstance.levels || hlsInstance.levels.length === 0) {
console.error('HLS not ready');
return;
} }
if (!video) { // Run immediately if Plyr is already ready, otherwise wait
console.error('Video element not found'); if (player.ready) {
return; doAdd();
} } else {
player.on('ready', doAdd);
console.log('HLS levels available:', hlsInstance.levels.length);
const sortedLevels = [...hlsInstance.levels].sort((a, b) => b.height - a.height);
const seenHeights = new Set();
const uniqueLevels = [];
sortedLevels.forEach((level) => {
if (!seenHeights.has(level.height)) {
seenHeights.add(level.height);
uniqueLevels.push(level);
}
});
const qualityLabels = ['auto'];
uniqueLevels.forEach((level) => {
const originalIndex = hlsInstance.levels.indexOf(level);
const label = level.height + 'p';
if (!window.hlsQualityMap[label]) {
qualityLabels.push(label);
window.hlsQualityMap[label] = originalIndex;
}
});
console.log('Quality labels:', qualityLabels);
const playerOptions = {
autoplay: autoplayActive,
disableContextMenu: false,
captions: {
active: captionsActive,
language: typeof data !== 'undefined' ? data.settings.subtitles_language : 'en',
},
controls: [
'play-large',
'play',
'progress',
'current-time',
'duration',
'mute',
'volume',
'captions',
'settings',
'pip',
'airplay',
'fullscreen',
],
iconUrl: '/youtube.com/static/modules/plyr/plyr.svg',
blankVideo: '/youtube.com/static/modules/plyr/blank.webm',
debug: false,
storage: { enabled: false },
previewThumbnails: {
enabled: typeof storyboard_url !== 'undefined' && storyboard_url !== null,
src: typeof storyboard_url !== 'undefined' && storyboard_url !== null ? [storyboard_url] : [],
},
settings: ['captions', 'speed', 'loop'],
tooltips: {
controls: true,
},
};
console.log('Creating Plyr...');
try {
plyrInstance = new Plyr(video, playerOptions);
console.log('Plyr instance created');
window.plyrInstance = plyrInstance;
addCustomQualityControl(plyrInstance, qualityLabels);
addCustomAudioTracksControl(plyrInstance, hlsInstance);
if (plyrInstance.eventListeners) {
plyrInstance.eventListeners.forEach(function(eventListener) {
if(eventListener.type === 'dblclick') {
eventListener.element.removeEventListener(eventListener.type, eventListener.callback, eventListener.options);
}
});
}
plyrInstance.started = false;
plyrInstance.once('playing', function(){this.started = true});
if (typeof data !== 'undefined' && data.time_start != 0) {
video.addEventListener('loadedmetadata', function() {
video.currentTime = data.time_start;
});
}
console.log('Plyr init complete');
} catch (e) {
console.error('Failed to initialize Plyr:', e);
} }
} }
@@ -522,14 +425,98 @@
return; return;
} }
// Initialize Plyr immediately so the player UI shows right away
// instead of a bare <video> element while the manifest loads.
const video = document.getElementById('js-video-player');
if (video) {
plyrInstance = new Plyr(video, {
autoplay: autoplayActive,
disableContextMenu: false,
captions: {
active: captionsActive,
language: typeof data !== 'undefined' ? data.settings.subtitles_language : 'en',
},
controls: [
'play-large',
'play',
'progress',
'current-time',
'duration',
'mute',
'volume',
'captions',
'settings',
'pip',
'airplay',
'fullscreen',
],
iconUrl: '/youtube.com/static/modules/plyr/plyr.svg',
blankVideo: '/youtube.com/static/modules/plyr/blank.webm',
debug: false,
storage: { enabled: false },
previewThumbnails: {
enabled: typeof storyboard_url !== 'undefined' && storyboard_url !== null,
src: typeof storyboard_url !== 'undefined' && storyboard_url !== null ? [storyboard_url] : [],
},
settings: ['captions', 'speed', 'loop'],
tooltips: { controls: true },
});
window.plyrInstance = plyrInstance;
if (plyrInstance.eventListeners) {
plyrInstance.eventListeners.forEach(function(eventListener) {
if (eventListener.type === 'dblclick') {
eventListener.element.removeEventListener(
eventListener.type, eventListener.callback, eventListener.options);
}
});
}
plyrInstance.started = false;
plyrInstance.once('playing', function(){ this.started = true; });
if (typeof data !== 'undefined' && data.time_start != 0) {
video.addEventListener('loadedmetadata', function() {
video.currentTime = data.time_start;
});
}
}
try { try {
const hlsInstance = await initHLS(hls_manifest_url); const hlsInstance = await initHLS(hls_manifest_url);
initPlyrWithQuality(hlsInstance); // Manifest is ready — add quality and audio controls
addCustomQualityControl(plyrInstance, buildQualityLabels(hlsInstance));
addCustomAudioTracksControl(plyrInstance, hlsInstance);
} catch (error) { } catch (error) {
console.error('Failed to initialize:', error); console.error('Failed to initialize HLS:', error);
} }
} }
/**
* Build quality labels from HLS levels
*/
function buildQualityLabels(hlsInstance) {
const qualityLabels = ['auto'];
if (!hlsInstance || !hlsInstance.levels) return qualityLabels;
const sortedLevels = [...hlsInstance.levels].sort((a, b) => b.height - a.height);
const seenHeights = new Set();
sortedLevels.forEach((level) => {
if (!seenHeights.has(level.height)) {
seenHeights.add(level.height);
const originalIndex = hlsInstance.levels.indexOf(level);
const label = level.height + 'p';
if (!window.hlsQualityMap[label]) {
qualityLabels.push(label);
window.hlsQualityMap[label] = originalIndex;
}
}
});
return qualityLabels;
}
if (document.readyState === 'loading') { if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', start); document.addEventListener('DOMContentLoaded', start);
} else { } else {

View File

@@ -31,9 +31,34 @@ if (data.using_pair_sources) {
avMerge = new AVMerge(video, srcPair, 0); avMerge = new AVMerge(video, srcPair, 0);
} }
// Quality selector // Quality selector — populate with available sources
const qs = document.getElementById('quality-select'); const qs = document.getElementById('quality-select');
if (qs) { if (qs) {
// Clear the HLS-oriented "Auto" default; DASH has discrete sources
qs.innerHTML = '';
// Add pair_sources (video+audio, used by AVMerge)
if (data['pair_sources'] && data['pair_sources'].length) {
data['pair_sources'].forEach(function(src, i) {
let opt = document.createElement('option');
opt.value = JSON.stringify({type: 'pair', index: i});
opt.textContent = src.quality_string;
if (i === data['pair_idx']) opt.selected = true;
qs.appendChild(opt);
});
}
// Add uni_sources (integrated video+audio, single file)
if (data['uni_sources'] && data['uni_sources'].length) {
data['uni_sources'].forEach(function(src, i) {
let opt = document.createElement('option');
opt.value = JSON.stringify({type: 'uni', index: i});
opt.textContent = src.quality_string;
if ((!data['pair_sources'] || !data['pair_sources'].length) && i === data['uni_idx']) opt.selected = true;
qs.appendChild(opt);
});
}
qs.addEventListener('change', function(e) { qs.addEventListener('change', function(e) {
changeQuality(JSON.parse(this.value)) changeQuality(JSON.parse(this.value))
}); });

View File

@@ -26,7 +26,17 @@ function initHLSNative(manifestUrl) {
lowLatencyMode: false, lowLatencyMode: false,
maxBufferLength: 30, maxBufferLength: 30,
maxMaxBufferLength: 60, maxMaxBufferLength: 60,
maxBufferHole: 0.5,
startLevel: -1, startLevel: -1,
// Prevent stalls on quality switch: nudge playback past small gaps
nudgeMaxRetry: 5,
// Allow more time for segments coming through our proxy
fragLoadingTimeOut: 30000,
fragLoadingMaxRetry: 5,
fragLoadingRetryDelay: 1000,
levelLoadingTimeOut: 15000,
levelLoadingMaxRetry: 4,
levelLoadingRetryDelay: 1000,
}); });
window.hls = hls; window.hls = hls;
@@ -89,15 +99,26 @@ function initHLSNative(manifestUrl) {
console.error('HLS fatal error:', data.type, data.details); console.error('HLS fatal error:', data.type, data.details);
switch(data.type) { switch(data.type) {
case Hls.ErrorTypes.NETWORK_ERROR: case Hls.ErrorTypes.NETWORK_ERROR:
console.warn('HLS network error, attempting recovery...');
hls.startLoad(); hls.startLoad();
break; break;
case Hls.ErrorTypes.MEDIA_ERROR: case Hls.ErrorTypes.MEDIA_ERROR:
console.warn('HLS media error, attempting recovery...');
hls.recoverMediaError(); hls.recoverMediaError();
break; break;
default: default:
hls.destroy(); hls.destroy();
break; break;
} }
} else {
// Non-fatal errors can still cause stalls, especially
// bufferStalledError after a quality switch through our proxy
console.warn('HLS non-fatal error:', data.type, data.details);
if (data.details === 'bufferStalledError') {
// Buffer ran dry — HLS.js is waiting for data.
// Nudge it to retry loading the current fragment.
hls.startLoad();
}
} }
}); });
@@ -122,13 +143,36 @@ function initPlayer() {
initHLSNative(hls_manifest_url); initHLSNative(hls_manifest_url);
const qualitySelect = document.getElementById('quality-select'); const qualitySelect = document.getElementById('quality-select');
// Set initial Auto option while manifest loads
if (qualitySelect) {
qualitySelect.innerHTML = '<option value="-1" selected>Auto</option>';
}
if (qualitySelect) { if (qualitySelect) {
qualitySelect.addEventListener('change', function () { qualitySelect.addEventListener('change', function () {
const level = parseInt(this.value); const level = parseInt(this.value);
if (hls) { if (hls) {
hls.currentLevel = level; const currentTime = video.currentTime;
console.log('Quality:', level === -1 ? 'Auto' : hls.levels[level]?.height + 'p'); const wasPaused = video.paused;
// Use nextLevel for smoother transition: it waits for the
// current segment to finish before switching, avoiding an
// abrupt buffer flush that starves the player.
if (level === -1) {
// Back to auto — re-enable ABR
hls.currentLevel = -1;
console.log('Quality: Auto (ABR)');
} else {
hls.nextLevel = level;
console.log('Quality: switching to',
hls.levels[level]?.height + 'p');
}
// If the video was already stalled, kick the loader
// so it starts fetching the new level immediately.
if (video.readyState < 3) {
hls.startLoad(currentTime);
}
} }
}); });
} }

View File

@@ -126,7 +126,7 @@ def delete_thumbnails(to_delete):
os.remove(os.path.join(thumbnails_directory, thumbnail)) os.remove(os.path.join(thumbnails_directory, thumbnail))
existing_thumbnails.remove(video_id) existing_thumbnails.remove(video_id)
except Exception: except Exception:
print('Failed to delete thumbnail: ' + thumbnail) print(f'Failed to delete thumbnail: {thumbnail}')
traceback.print_exc() traceback.print_exc()
@@ -184,7 +184,7 @@ def _get_videos(cursor, number_per_page, offset, tag=None):
'time_published': exact_timestamp(db_video[3]) if db_video[4] else posix_to_dumbed_down(db_video[3]), 'time_published': exact_timestamp(db_video[3]) if db_video[4] else posix_to_dumbed_down(db_video[3]),
'author': db_video[5], 'author': db_video[5],
'author_id': db_video[6], 'author_id': db_video[6],
'author_url': '/https://www.youtube.com/channel/' + db_video[6], 'author_url': f'/https://www.youtube.com/channel/{db_video[6]}',
}) })
return videos, pseudo_number_of_videos return videos, pseudo_number_of_videos
@@ -304,9 +304,9 @@ def posix_to_dumbed_down(posix_time):
if delta >= unit_time: if delta >= unit_time:
quantifier = round(delta/unit_time) quantifier = round(delta/unit_time)
if quantifier == 1: if quantifier == 1:
return '1 ' + unit_name + ' ago' return f'1 {unit_name} ago'
else: else:
return str(quantifier) + ' ' + unit_name + 's ago' return f'{quantifier} {unit_name}s ago'
else: else:
raise Exception() raise Exception()
@@ -363,7 +363,7 @@ def autocheck_dispatcher():
time_until_earliest_job = earliest_job['next_check_time'] - time.time() time_until_earliest_job = earliest_job['next_check_time'] - time.time()
if time_until_earliest_job <= -5: # should not happen unless we're running extremely slow if time_until_earliest_job <= -5: # should not happen unless we're running extremely slow
print('ERROR: autocheck_dispatcher got job scheduled in the past, skipping and rescheduling: ' + earliest_job['channel_id'] + ', ' + earliest_job['channel_name'] + ', ' + str(earliest_job['next_check_time'])) print(f'ERROR: autocheck_dispatcher got job scheduled in the past, skipping and rescheduling: {earliest_job["channel_id"]}, {earliest_job["channel_name"]}, {earliest_job["next_check_time"]}')
next_check_time = time.time() + 3600*secrets.randbelow(60)/60 next_check_time = time.time() + 3600*secrets.randbelow(60)/60
with_open_db(_schedule_checking, earliest_job['channel_id'], next_check_time) with_open_db(_schedule_checking, earliest_job['channel_id'], next_check_time)
autocheck_jobs[earliest_job_index]['next_check_time'] = next_check_time autocheck_jobs[earliest_job_index]['next_check_time'] = next_check_time
@@ -451,7 +451,7 @@ def check_channels_if_necessary(channel_ids):
def _get_atoma_feed(channel_id): def _get_atoma_feed(channel_id):
url = 'https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id url = f'https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}'
try: try:
return util.fetch_url(url).decode('utf-8') return util.fetch_url(url).decode('utf-8')
except util.FetchError as e: except util.FetchError as e:
@@ -485,16 +485,15 @@ def _get_channel_videos_first_page(channel_id, channel_status_name):
return channel_info return channel_info
except util.FetchError as e: except util.FetchError as e:
if e.code == '429' and settings.route_tor: if e.code == '429' and settings.route_tor:
error_message = ('Error checking channel ' + channel_status_name error_message = (f'Error checking channel {channel_status_name}: '
+ ': YouTube blocked the request because the' f'YouTube blocked the request because the Tor exit node is overutilized. '
+ ' Tor exit node is overutilized. Try getting a new exit node' f'Try getting a new exit node by using the New Identity button in the Tor Browser.')
+ ' by using the New Identity button in the Tor Browser.')
if e.ip: if e.ip:
error_message += ' Exit node IP address: ' + e.ip error_message += f' Exit node IP address: {e.ip}'
print(error_message) print(error_message)
return None return None
elif e.code == '502': elif e.code == '502':
print('Error checking channel', channel_status_name + ':', str(e)) print(f'Error checking channel {channel_status_name}: {e}')
return None return None
raise raise
@@ -505,7 +504,7 @@ def _get_upstream_videos(channel_id):
except KeyError: except KeyError:
channel_status_name = channel_id channel_status_name = channel_id
print("Checking channel: " + channel_status_name) print(f"Checking channel: {channel_status_name}")
tasks = ( tasks = (
# channel page, need for video duration # channel page, need for video duration
@@ -550,15 +549,15 @@ def _get_upstream_videos(channel_id):
times_published[video_id_element.text] = time_published times_published[video_id_element.text] = time_published
except ValueError: except ValueError:
print('Failed to read atoma feed for ' + channel_status_name) print(f'Failed to read atoma feed for {channel_status_name}')
traceback.print_exc() traceback.print_exc()
except defusedxml.ElementTree.ParseError: except defusedxml.ElementTree.ParseError:
print('Failed to read atoma feed for ' + channel_status_name) print(f'Failed to read atoma feed for {channel_status_name}')
if channel_info is None: # there was an error if channel_info is None: # there was an error
return return
if channel_info['error']: if channel_info['error']:
print('Error checking channel ' + channel_status_name + ': ' + channel_info['error']) print(f'Error checking channel {channel_status_name}: {channel_info["error"]}')
return return
videos = channel_info['items'] videos = channel_info['items']
@@ -1023,7 +1022,7 @@ def get_subscriptions_page():
tag = request.args.get('tag', None) tag = request.args.get('tag', None)
videos, number_of_videos_in_db = _get_videos(cursor, 60, (page - 1)*60, tag) videos, number_of_videos_in_db = _get_videos(cursor, 60, (page - 1)*60, tag)
for video in videos: for video in videos:
video['thumbnail'] = util.URL_ORIGIN + '/data/subscription_thumbnails/' + video['id'] + '.jpg' video['thumbnail'] = f'{util.URL_ORIGIN}/data/subscription_thumbnails/{video["id"]}.jpg'
video['type'] = 'video' video['type'] = 'video'
video['item_size'] = 'small' video['item_size'] = 'small'
util.add_extra_html_info(video) util.add_extra_html_info(video)
@@ -1033,7 +1032,7 @@ def get_subscriptions_page():
subscription_list = [] subscription_list = []
for channel_name, channel_id, muted in _get_subscribed_channels(cursor): for channel_name, channel_id, muted in _get_subscribed_channels(cursor):
subscription_list.append({ subscription_list.append({
'channel_url': util.URL_ORIGIN + '/channel/' + channel_id, 'channel_url': f'{util.URL_ORIGIN}/channel/{channel_id}',
'channel_name': channel_name, 'channel_name': channel_name,
'channel_id': channel_id, 'channel_id': channel_id,
'muted': muted, 'muted': muted,
@@ -1109,17 +1108,17 @@ def serve_subscription_thumbnail(thumbnail):
for quality in ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'): for quality in ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'):
url = f"https://i.ytimg.com/vi/{video_id}/{quality}" url = f"https://i.ytimg.com/vi/{video_id}/{quality}"
try: try:
image = util.fetch_url(url, report_text="Saved thumbnail: " + video_id) image = util.fetch_url(url, report_text=f"Saved thumbnail: {video_id}")
break break
except util.FetchError as e: except util.FetchError as e:
if '404' in str(e): if '404' in str(e):
continue continue
print("Failed to download thumbnail for " + video_id + ": " + str(e)) print(f"Failed to download thumbnail for {video_id}: {e}")
flask.abort(500) flask.abort(500)
except urllib.error.HTTPError as e: except urllib.error.HTTPError as e:
if e.code == 404: if e.code == 404:
continue continue
print("Failed to download thumbnail for " + video_id + ": " + str(e)) print(f"Failed to download thumbnail for {video_id}: {e}")
flask.abort(e.code) flask.abort(e.code)
if image is None: if image is None:

View File

@@ -75,14 +75,11 @@
<div class="external-player-controls"> <div class="external-player-controls">
<input class="speed" id="speed-control" type="text" title="Video speed"> <input class="speed" id="speed-control" type="text" title="Video speed">
{% if settings.use_video_player < 2 %} {% if settings.use_video_player < 2 %}
<!-- Native player quality selector --> <!-- Quality selector (populated by JS: HLS adds Auto+levels, DASH adds discrete sources) -->
<select id="quality-select" autocomplete="off"> <select id="quality-select" autocomplete="off">
<option value="-1" selected>Auto</option>
<!-- Quality options will be populated by HLS -->
</select> </select>
{% else %} {% else %}
<select id="quality-select" autocomplete="off" style="display: none;"> <select id="quality-select" autocomplete="off" style="display: none;">
<!-- Quality options will be populated by HLS -->
</select> </select>
{% endif %} {% endif %}
{% if settings.use_video_player != 2 %} {% if settings.use_video_player != 2 %}

View File

@@ -23,6 +23,9 @@ import stem
import stem.control import stem.control
import traceback import traceback
from youtube.yt_data_extract.common import concat_or_none
from youtube import constants
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# The trouble with the requests library: It ships its own certificate bundle via certifi # The trouble with the requests library: It ships its own certificate bundle via certifi
@@ -72,7 +75,7 @@ class TorManager:
def __init__(self): def __init__(self):
self.old_tor_connection_pool = None self.old_tor_connection_pool = None
self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager( self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager(
'socks5h://127.0.0.1:' + str(settings.tor_port) + '/', f'socks5h://127.0.0.1:{settings.tor_port}/',
cert_reqs='CERT_REQUIRED') cert_reqs='CERT_REQUIRED')
self.tor_pool_refresh_time = time.monotonic() self.tor_pool_refresh_time = time.monotonic()
settings.add_setting_changed_hook( settings.add_setting_changed_hook(
@@ -92,7 +95,7 @@ class TorManager:
self.old_tor_connection_pool = self.tor_connection_pool self.old_tor_connection_pool = self.tor_connection_pool
self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager( self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager(
'socks5h://127.0.0.1:' + str(settings.tor_port) + '/', f'socks5h://127.0.0.1:{settings.tor_port}/',
cert_reqs='CERT_REQUIRED') cert_reqs='CERT_REQUIRED')
self.tor_pool_refresh_time = time.monotonic() self.tor_pool_refresh_time = time.monotonic()
@@ -198,9 +201,9 @@ class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler):
class FetchError(Exception): class FetchError(Exception):
def __init__(self, code, reason='', ip=None, error_message=None): def __init__(self, code, reason='', ip=None, error_message=None):
if error_message: if error_message:
string = code + ' ' + reason + ': ' + error_message string = f"{code} {reason}: {error_message}"
else: else:
string = 'HTTP error during request: ' + code + ' ' + reason string = f"HTTP error during request: {code} {reason}"
Exception.__init__(self, string) Exception.__init__(self, string)
self.code = code self.code = code
self.reason = reason self.reason = reason
@@ -294,14 +297,12 @@ def fetch_url_response(url, headers=(), timeout=15, data=None,
exception_cause = e.__context__.__context__ exception_cause = e.__context__.__context__
if (isinstance(exception_cause, socks.ProxyConnectionError) if (isinstance(exception_cause, socks.ProxyConnectionError)
and settings.route_tor): and settings.route_tor):
msg = ('Failed to connect to Tor. Check that Tor is open and ' msg = f'Failed to connect to Tor. Check that Tor is open and that your internet connection is working.\n\n{e}'
'that your internet connection is working.\n\n'
+ str(e))
raise FetchError('502', reason='Bad Gateway', raise FetchError('502', reason='Bad Gateway',
error_message=msg) error_message=msg)
elif isinstance(e.__context__, elif isinstance(e.__context__,
urllib3.exceptions.NewConnectionError): urllib3.exceptions.NewConnectionError):
msg = 'Failed to establish a connection.\n\n' + str(e) msg = f'Failed to establish a connection.\n\n{e}'
raise FetchError( raise FetchError(
'502', reason='Bad Gateway', '502', reason='Bad Gateway',
error_message=msg) error_message=msg)
@@ -391,7 +392,7 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None,
if error: if error:
raise FetchError( raise FetchError(
'429', reason=response.reason, ip=ip, '429', reason=response.reason, ip=ip,
error_message='Automatic circuit change: ' + error) error_message=f'Automatic circuit change: {error}')
continue # retry with new identity continue # retry with new identity
# Check for client errors (400, 404) - don't retry these # Check for client errors (400, 404) - don't retry these
@@ -467,17 +468,14 @@ def head(url, use_tor=False, report_text=None, max_redirects=10):
headers = {'User-Agent': 'Python-urllib'} headers = {'User-Agent': 'Python-urllib'}
response = pool.request('HEAD', url, headers=headers, retries=retries) response = pool.request('HEAD', url, headers=headers, retries=retries)
if report_text: if report_text:
print( print(f'{report_text} Latency: {round(time.monotonic() - start_time, 3)}')
report_text,
' Latency:',
round(time.monotonic() - start_time, 3))
return response return response
mobile_user_agent = 'Mozilla/5.0 (Linux; Android 7.0; Redmi Note 4 Build/NRD90M) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Mobile Safari/537.36' mobile_user_agent = constants.MOBILE_USER_AGENT
mobile_ua = (('User-Agent', mobile_user_agent),) mobile_ua = constants.mobile_ua
desktop_user_agent = 'Mozilla/5.0 (Windows NT 6.1; rv:52.0) Gecko/20100101 Firefox/52.0' desktop_user_agent = constants.desktop_user_agent
desktop_ua = (('User-Agent', desktop_user_agent),) desktop_ua = constants.desktop_ua
json_header = (('Content-Type', 'application/json'),) json_header = constants.json_header
desktop_xhr_headers = ( desktop_xhr_headers = (
('Accept', '*/*'), ('Accept', '*/*'),
('Accept-Language', 'en-US,en;q=0.5'), ('Accept-Language', 'en-US,en;q=0.5'),
@@ -544,16 +542,16 @@ def download_thumbnail(save_directory, video_id):
for quality in ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'): for quality in ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'):
url = f'https://i.ytimg.com/vi/{video_id}/{quality}' url = f'https://i.ytimg.com/vi/{video_id}/{quality}'
try: try:
thumbnail = fetch_url(url, report_text='Saved thumbnail: ' + video_id) thumbnail = fetch_url(url, report_text=f'Saved thumbnail: {video_id}')
except FetchError as e: except FetchError as e:
if '404' in str(e): if '404' in str(e):
continue continue
print('Failed to download thumbnail for ' + video_id + ': ' + str(e)) print(f'Failed to download thumbnail for {video_id}: {e}')
return False return False
except urllib.error.HTTPError as e: except urllib.error.HTTPError as e:
if e.code == 404: if e.code == 404:
continue continue
print('Failed to download thumbnail for ' + video_id + ': ' + str(e)) print(f'Failed to download thumbnail for {video_id}: {e}')
return False return False
try: try:
with open(save_location, 'wb') as f: with open(save_location, 'wb') as f:
@@ -563,7 +561,7 @@ def download_thumbnail(save_directory, video_id):
with open(save_location, 'wb') as f: with open(save_location, 'wb') as f:
f.write(thumbnail) f.write(thumbnail)
return True return True
print('No thumbnail available for ' + video_id) print(f'No thumbnail available for {video_id}')
return False return False
@@ -646,7 +644,7 @@ def update_query_string(query_string, items):
return urllib.parse.urlencode(parameters, doseq=True) return urllib.parse.urlencode(parameters, doseq=True)
YOUTUBE_DOMAINS = ('youtube.com', 'youtu.be', 'youtube-nocookie.com') YOUTUBE_DOMAINS = constants.YOUTUBE_DOMAINS
YOUTUBE_URL_RE_STR = r'https?://(?:[a-zA-Z0-9_-]*\.)?(?:' YOUTUBE_URL_RE_STR = r'https?://(?:[a-zA-Z0-9_-]*\.)?(?:'
YOUTUBE_URL_RE_STR += r'|'.join(map(re.escape, YOUTUBE_DOMAINS)) YOUTUBE_URL_RE_STR += r'|'.join(map(re.escape, YOUTUBE_DOMAINS))
YOUTUBE_URL_RE_STR += r')(?:/[^"]*)?' YOUTUBE_URL_RE_STR += r')(?:/[^"]*)?'
@@ -673,16 +671,6 @@ def left_remove(string, substring):
return string return string
def concat_or_none(*strings):
'''Concatenates strings. Returns None if any of the arguments are None'''
result = ''
for string in strings:
if string is None:
return None
result += string
return result
def prefix_urls(item): def prefix_urls(item):
if settings.proxy_images: if settings.proxy_images:
try: try:
@@ -698,7 +686,7 @@ def prefix_urls(item):
def add_extra_html_info(item): def add_extra_html_info(item):
if item['type'] == 'video': if item['type'] == 'video':
item['url'] = (URL_ORIGIN + '/watch?v=' + item['id']) if item.get('id') else None item['url'] = f'{URL_ORIGIN}/watch?v={item["id"]}' if item.get('id') else None
video_info = {} video_info = {}
for key in ('id', 'title', 'author', 'duration', 'author_id'): for key in ('id', 'title', 'author', 'duration', 'author_id'):
@@ -721,7 +709,7 @@ def add_extra_html_info(item):
item['url'] = concat_or_none(URL_ORIGIN, "/channel/", item['id']) item['url'] = concat_or_none(URL_ORIGIN, "/channel/", item['id'])
if item.get('author_id') and 'author_url' not in item: if item.get('author_id') and 'author_url' not in item:
item['author_url'] = URL_ORIGIN + '/channel/' + item['author_id'] item['author_url'] = f'{URL_ORIGIN}/channel/{item["author_id"]}'
def check_gevent_exceptions(*tasks): def check_gevent_exceptions(*tasks):
@@ -731,24 +719,8 @@ def check_gevent_exceptions(*tasks):
# https://stackoverflow.com/a/62888 # https://stackoverflow.com/a/62888
replacement_map = collections.OrderedDict([ replacement_map = constants.REPLACEMENT_MAP
('<', '_'), DOS_names = constants.DOS_RESERVED_NAMES
('>', '_'),
(': ', ' - '),
(':', '-'),
('"', "'"),
('/', '_'),
('\\', '_'),
('|', '-'),
('?', ''),
('*', '_'),
('\t', ' '),
])
DOS_names = {'con', 'prn', 'aux', 'nul', 'com0', 'com1', 'com2', 'com3',
'com4', 'com5', 'com6', 'com7', 'com8', 'com9', 'lpt0',
'lpt1', 'lpt2', 'lpt3', 'lpt4', 'lpt5', 'lpt6', 'lpt7',
'lpt8', 'lpt9'}
def to_valid_filename(name): def to_valid_filename(name):
@@ -790,143 +762,7 @@ def to_valid_filename(name):
return name return name
# https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/extractor/youtube.py#L72 INNERTUBE_CLIENTS = constants.INNERTUBE_CLIENTS
INNERTUBE_CLIENTS = {
'android': {
'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
'INNERTUBE_CONTEXT': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'ANDROID',
'clientVersion': '19.09.36',
'osName': 'Android',
'osVersion': '12',
'androidSdkVersion': 31,
'platform': 'MOBILE',
'userAgent': 'com.google.android.youtube/19.09.36 (Linux; U; Android 12; US) gzip'
},
# https://github.com/yt-dlp/yt-dlp/pull/575#issuecomment-887739287
#'thirdParty': {
# 'embedUrl': 'https://google.com', # Can be any valid URL
#}
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 3,
'REQUIRE_JS_PLAYER': False,
},
'android-test-suite': {
'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
'INNERTUBE_CONTEXT': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'ANDROID_TESTSUITE',
'clientVersion': '1.9',
'osName': 'Android',
'osVersion': '12',
'androidSdkVersion': 31,
'platform': 'MOBILE',
'userAgent': 'com.google.android.youtube/1.9 (Linux; U; Android 12; US) gzip'
},
# https://github.com/yt-dlp/yt-dlp/pull/575#issuecomment-887739287
#'thirdParty': {
# 'embedUrl': 'https://google.com', # Can be any valid URL
#}
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 3,
'REQUIRE_JS_PLAYER': False,
},
'ios': {
'INNERTUBE_API_KEY': 'AIzaSyB-63vPrdThhKuerbB2N_l7Kwwcxj6yUAc',
'INNERTUBE_CONTEXT': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'IOS',
'clientVersion': '21.03.2',
'deviceMake': 'Apple',
'deviceModel': 'iPhone16,2',
'osName': 'iPhone',
'osVersion': '18.7.2.22H124',
'userAgent': 'com.google.ios.youtube/21.03.2 (iPhone16,2; U; CPU iOS 18_7_2 like Mac OS X)'
}
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 5,
'REQUIRE_JS_PLAYER': False
},
# This client can access age restricted videos (unless the uploader has disabled the 'allow embedding' option)
# See: https://github.com/zerodytrash/YouTube-Internal-Clients
'tv_embedded': {
'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
'INNERTUBE_CONTEXT': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'TVHTML5_SIMPLY_EMBEDDED_PLAYER',
'clientVersion': '2.0',
'clientScreen': 'EMBED',
},
# https://github.com/yt-dlp/yt-dlp/pull/575#issuecomment-887739287
'thirdParty': {
'embedUrl': 'https://google.com', # Can be any valid URL
}
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 85,
'REQUIRE_JS_PLAYER': True,
},
'web': {
'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'WEB',
'clientVersion': '2.20220801.00.00',
'userAgent': desktop_user_agent,
}
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 1
},
'android_vr': {
'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'ANDROID_VR',
'clientVersion': '1.60.19',
'deviceMake': 'Oculus',
'deviceModel': 'Quest 3',
'androidSdkVersion': 32,
'userAgent': 'com.google.android.apps.youtube.vr.oculus/1.60.19 (Linux; U; Android 12L; eureka-user Build/SQ3A.220605.009.A1) gzip',
'osName': 'Android',
'osVersion': '12L',
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 28,
'REQUIRE_JS_PLAYER': False,
},
'ios_vr': {
'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
'INNERTUBE_CONTEXT': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'IOS_VR',
'clientVersion': '1.0',
'deviceMake': 'Apple',
'deviceModel': 'iPhone16,2',
'osName': 'iPhone',
'osVersion': '18.7.2.22H124',
'userAgent': 'com.google.ios.youtube/1.0 (iPhone16,2; U; CPU iOS 18_7_2 like Mac OS X)'
}
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 5,
'REQUIRE_JS_PLAYER': False
},
}
def get_visitor_data(): def get_visitor_data():
visitor_data = None visitor_data = None
@@ -967,7 +803,7 @@ def call_youtube_api(client, api, data):
user_agent = context['client'].get('userAgent') or mobile_user_agent user_agent = context['client'].get('userAgent') or mobile_user_agent
visitor_data = get_visitor_data() visitor_data = get_visitor_data()
url = 'https://' + host + '/youtubei/v1/' + api + '?key=' + key url = f'https://{host}/youtubei/v1/{api}?key={key}'
if visitor_data: if visitor_data:
context['client'].update({'visitorData': visitor_data}) context['client'].update({'visitorData': visitor_data})
data['context'] = context data['context'] = context
@@ -978,8 +814,8 @@ def call_youtube_api(client, api, data):
headers = ( *headers, ('X-Goog-Visitor-Id', visitor_data )) headers = ( *headers, ('X-Goog-Visitor-Id', visitor_data ))
response = fetch_url( response = fetch_url(
url, data=data, headers=headers, url, data=data, headers=headers,
debug_name='youtubei_' + api + '_' + client, debug_name=f'youtubei_{api}_{client}',
report_text='Fetched ' + client + ' youtubei ' + api report_text=f'Fetched {client} youtubei {api}'
).decode('utf-8') ).decode('utf-8')
return response return response

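With the client table moved to constants.INNERTUBE_CLIENTS, util.call_youtube_api is the single place that assembles innertube requests. A rough sketch of what an equivalent request looks like when built by hand from one of those entries; the host and the video id are illustrative, and this is not the exact implementation:

    import json
    from youtube import constants

    client = constants.INNERTUBE_CLIENTS['android_vr']
    key = client['INNERTUBE_API_KEY']
    context = client['INNERTUBE_CONTEXT']

    url = f'https://www.youtube.com/youtubei/v1/player?key={key}'
    body = {'videoId': 'dQw4w9WgXcQ', 'context': context}  # illustrative video id
    headers = (
        ('Content-Type', 'application/json'),
        ('User-Agent', context['client']['userAgent']),
        # util.call_youtube_api additionally sends X-Goog-Visitor-Id
        # when visitor data is available.
    )
    request_body = json.dumps(body).encode('utf-8')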
View File

@@ -1,3 +1,3 @@
from __future__ import unicode_literals from __future__ import unicode_literals
__version__ = 'v0.5.0' __version__ = 'v0.5.1'

View File

@@ -17,8 +17,16 @@ from flask import request
import youtube import youtube
from youtube import yt_app from youtube import yt_app
from youtube import util, comments, local_playlist, yt_data_extract from youtube import util, comments, local_playlist, yt_data_extract
from youtube import watch_formats
import settings import settings
# Backward compatibility aliases
codec_name = watch_formats.codec_name
video_quality_string = watch_formats.video_quality_string
short_video_quality_string = watch_formats.short_video_quality_string
audio_quality_string = watch_formats.audio_quality_string
format_bytes = watch_formats.format_bytes
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -29,15 +37,7 @@ except FileNotFoundError:
decrypt_cache = {} decrypt_cache = {}
def codec_name(vcodec): # codec_name imported from watch_formats
if vcodec.startswith('avc'):
return 'h264'
elif vcodec.startswith('av01'):
return 'av1'
elif vcodec.startswith('vp'):
return 'vp'
else:
return 'unknown'
def get_video_sources(info, target_resolution): def get_video_sources(info, target_resolution):
@@ -53,7 +53,7 @@ def get_video_sources(info, target_resolution):
if fmt['acodec'] and fmt['vcodec']: if fmt['acodec'] and fmt['vcodec']:
if fmt.get('audio_track_is_default', True) is False: if fmt.get('audio_track_is_default', True) is False:
continue continue
source = {'type': 'video/' + fmt['ext'], source = {'type': f"video/{fmt['ext']}",
'quality_string': short_video_quality_string(fmt)} 'quality_string': short_video_quality_string(fmt)}
source['quality_string'] += ' (integrated)' source['quality_string'] += ' (integrated)'
source.update(fmt) source.update(fmt)
@@ -70,10 +70,10 @@ def get_video_sources(info, target_resolution):
if fmt['acodec'] and not fmt['vcodec'] and (fmt['audio_bitrate'] or fmt['bitrate']): if fmt['acodec'] and not fmt['vcodec'] and (fmt['audio_bitrate'] or fmt['bitrate']):
if fmt['bitrate']: if fmt['bitrate']:
fmt['audio_bitrate'] = int(fmt['bitrate']/1000) fmt['audio_bitrate'] = int(fmt['bitrate']/1000)
source = {'type': 'audio/' + fmt['ext'], source = {'type': f"audio/{fmt['ext']}",
'quality_string': audio_quality_string(fmt)} 'quality_string': audio_quality_string(fmt)}
source.update(fmt) source.update(fmt)
source['mime_codec'] = source['type'] + '; codecs="' + source['acodec'] + '"' source['mime_codec'] = f"{source['type']}; codecs=\"{source['acodec']}\""
tid = fmt.get('audio_track_id') or 'default' tid = fmt.get('audio_track_id') or 'default'
if tid not in audio_by_track: if tid not in audio_by_track:
audio_by_track[tid] = { audio_by_track[tid] = {
@@ -85,11 +85,11 @@ def get_video_sources(info, target_resolution):
elif all(fmt[attr] for attr in ('vcodec', 'quality', 'width', 'fps', 'file_size')): elif all(fmt[attr] for attr in ('vcodec', 'quality', 'width', 'fps', 'file_size')):
if codec_name(fmt['vcodec']) == 'unknown': if codec_name(fmt['vcodec']) == 'unknown':
continue continue
source = {'type': 'video/' + fmt['ext'], source = {'type': f"video/{fmt['ext']}",
'quality_string': short_video_quality_string(fmt)} 'quality_string': short_video_quality_string(fmt)}
source.update(fmt) source.update(fmt)
source['mime_codec'] = source['type'] + '; codecs="' + source['vcodec'] + '"' source['mime_codec'] = f"{source['type']}; codecs=\"{source['vcodec']}\""
quality = str(fmt['quality']) + 'p' + str(fmt['fps']) quality = f"{fmt['quality']}p{fmt['fps']}"
video_only_sources.setdefault(quality, []).append(source) video_only_sources.setdefault(quality, []).append(source)
audio_tracks = [] audio_tracks = []
@@ -141,7 +141,7 @@ def get_video_sources(info, target_resolution):
def video_rank(src): def video_rank(src):
''' Sort by settings preference. Use file size as tiebreaker ''' ''' Sort by settings preference. Use file size as tiebreaker '''
setting_name = 'codec_rank_' + codec_name(src['vcodec']) setting_name = f'codec_rank_{codec_name(src["vcodec"])}'
return (settings.current_settings_dict[setting_name], return (settings.current_settings_dict[setting_name],
src['file_size']) src['file_size'])
pair_info['videos'].sort(key=video_rank) pair_info['videos'].sort(key=video_rank)
@@ -183,7 +183,7 @@ def make_caption_src(info, lang, auto=False, trans_lang=None):
if auto: if auto:
label += ' (Automatic)' label += ' (Automatic)'
if trans_lang: if trans_lang:
label += ' -> ' + trans_lang label += f' -> {trans_lang}'
# Try to use Android caption URL directly (no PO Token needed) # Try to use Android caption URL directly (no PO Token needed)
caption_url = None caption_url = None
@@ -204,7 +204,7 @@ def make_caption_src(info, lang, auto=False, trans_lang=None):
else: else:
caption_url += '&fmt=vtt' caption_url += '&fmt=vtt'
if trans_lang: if trans_lang:
caption_url += '&tlang=' + trans_lang caption_url += f'&tlang={trans_lang}'
url = util.prefix_url(caption_url) url = util.prefix_url(caption_url)
else: else:
# Fallback to old method # Fallback to old method
@@ -357,10 +357,10 @@ def decrypt_signatures(info, video_id):
player_name = info['player_name'] player_name = info['player_name']
if player_name in decrypt_cache: if player_name in decrypt_cache:
print('Using cached decryption function for: ' + player_name) print(f'Using cached decryption function for: {player_name}')
info['decryption_function'] = decrypt_cache[player_name] info['decryption_function'] = decrypt_cache[player_name]
else: else:
base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text='Fetched player ' + player_name) base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text=f'Fetched player {player_name}')
base_js = base_js.decode('utf-8') base_js = base_js.decode('utf-8')
err = yt_data_extract.extract_decryption_function(info, base_js) err = yt_data_extract.extract_decryption_function(info, base_js)
if err: if err:
@@ -387,11 +387,11 @@ def fetch_player_response(client, video_id):
def fetch_watch_page_info(video_id, playlist_id, index): def fetch_watch_page_info(video_id, playlist_id, index):
# bpctr=9999999999 will bypass are-you-sure dialogs for controversial # bpctr=9999999999 will bypass are-you-sure dialogs for controversial
# videos # videos
url = 'https://m.youtube.com/embed/' + video_id + '?bpctr=9999999999' url = f'https://m.youtube.com/embed/{video_id}?bpctr=9999999999'
if playlist_id: if playlist_id:
url += '&list=' + playlist_id url += f'&list={playlist_id}'
if index: if index:
url += '&index=' + index url += f'&index={index}'
headers = ( headers = (
('Accept', '*/*'), ('Accept', '*/*'),
@@ -446,7 +446,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
info['hls_audio_tracks'] = {} info['hls_audio_tracks'] = {}
hls_data = None hls_data = None
hls_client_used = None hls_client_used = None
for hls_client in ('ios', 'ios_vr', 'android'): for hls_client in ('ios', 'android'):
try: try:
resp = fetch_player_response(hls_client, video_id) or {} resp = fetch_player_response(hls_client, video_id) or {}
hls_data = json.loads(resp) if isinstance(resp, str) else resp hls_data = json.loads(resp) if isinstance(resp, str) else resp
@@ -493,7 +493,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
# Register HLS audio tracks for proxy access # Register HLS audio tracks for proxy access
added = 0 added = 0
for lang, track in info['hls_audio_tracks'].items(): for lang, track in info['hls_audio_tracks'].items():
ck = video_id + '_' + lang ck = f"{video_id}_{lang}"
from youtube.hls_cache import register_track from youtube.hls_cache import register_track
register_track(ck, track['hls_url'], register_track(ck, track['hls_url'],
video_id=video_id, track_id=lang) video_id=video_id, track_id=lang)
@@ -502,7 +502,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
'audio_track_id': lang, 'audio_track_id': lang,
'audio_track_name': track['name'], 'audio_track_name': track['name'],
'audio_track_is_default': track['is_default'], 'audio_track_is_default': track['is_default'],
'itag': 'hls_' + lang, 'itag': f'hls_{lang}',
'ext': 'mp4', 'ext': 'mp4',
'audio_bitrate': 128, 'audio_bitrate': 128,
'bitrate': 128000, 'bitrate': 128000,
@@ -516,7 +516,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
'fps': None, 'fps': None,
'init_range': {'start': 0, 'end': 0}, 'init_range': {'start': 0, 'end': 0},
'index_range': {'start': 0, 'end': 0}, 'index_range': {'start': 0, 'end': 0},
'url': '/ytl-api/audio-track?id=' + urllib.parse.quote(ck), 'url': f'/ytl-api/audio-track?id={urllib.parse.quote(ck)}',
's': None, 's': None,
'sp': None, 'sp': None,
'quality': None, 'quality': None,
@@ -538,11 +538,11 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
# Register HLS manifest for proxying # Register HLS manifest for proxying
if info['hls_manifest_url']: if info['hls_manifest_url']:
ck = video_id + '_video' ck = f"{video_id}_video"
from youtube.hls_cache import register_track from youtube.hls_cache import register_track
register_track(ck, info['hls_manifest_url'], video_id=video_id, track_id='video') register_track(ck, info['hls_manifest_url'], video_id=video_id, track_id='video')
# Use proxy URL instead of direct Google Video URL # Use proxy URL instead of direct Google Video URL
info['hls_manifest_url'] = '/ytl-api/hls-manifest?id=' + urllib.parse.quote(ck) info['hls_manifest_url'] = f'/ytl-api/hls-manifest?id={urllib.parse.quote(ck)}'
# Fallback to 'ios' if no valid URLs are found # Fallback to 'ios' if no valid URLs are found
if not info.get('formats') or info.get('player_urls_missing'): if not info.get('formats') or info.get('player_urls_missing'):
@@ -566,7 +566,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
if info.get('formats'): if info.get('formats'):
decryption_error = decrypt_signatures(info, video_id) decryption_error = decrypt_signatures(info, video_id)
if decryption_error: if decryption_error:
info['playability_error'] = 'Error decrypting url signatures: ' + decryption_error info['playability_error'] = f'Error decrypting url signatures: {decryption_error}'
# check if urls ready (non-live format) in former livestream # check if urls ready (non-live format) in former livestream
# urls not ready if all of them have no filesize # urls not ready if all of them have no filesize
@@ -621,55 +621,10 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
return info return info
def video_quality_string(format): # video_quality_string imported from watch_formats
if format['vcodec']: # short_video_quality_string imported from watch_formats
result = str(format['width'] or '?') + 'x' + str(format['height'] or '?') # audio_quality_string imported from watch_formats
if format['fps']: # format_bytes imported from watch_formats
result += ' ' + str(format['fps']) + 'fps'
return result
elif format['acodec']:
return 'audio only'
return '?'
def short_video_quality_string(fmt):
result = str(fmt['quality'] or '?') + 'p'
if fmt['fps']:
result += str(fmt['fps'])
if fmt['vcodec'].startswith('av01'):
result += ' AV1'
elif fmt['vcodec'].startswith('avc'):
result += ' h264'
else:
result += ' ' + fmt['vcodec']
return result
def audio_quality_string(fmt):
if fmt['acodec']:
if fmt['audio_bitrate']:
result = '%d' % fmt['audio_bitrate'] + 'k'
else:
result = '?k'
if fmt['audio_sample_rate']:
result += ' ' + '%.3G' % (fmt['audio_sample_rate']/1000) + 'kHz'
return result
elif fmt['vcodec']:
return 'video only'
return '?'
# from https://github.com/ytdl-org/youtube-dl/blob/master/youtube_dl/utils.py
def format_bytes(bytes):
if bytes is None:
return 'N/A'
if type(bytes) is str:
bytes = float(bytes)
if bytes == 0.0:
exponent = 0
else:
exponent = int(math.log(bytes, 1024.0))
suffix = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'][exponent] suffix = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'][exponent]
converted = float(bytes) / float(1024 ** exponent) converted = float(bytes) / float(1024 ** exponent)
return '%.2f%s' % (converted, suffix) return '%.2f%s' % (converted, suffix)
@@ -737,9 +692,9 @@ def get_audio_track():
seg = line if line.startswith('http') else urljoin(playlist_base, line) seg = line if line.startswith('http') else urljoin(playlist_base, line)
# Always use &seg= parameter, never &url= for segments # Always use &seg= parameter, never &url= for segments
playlist_lines.append( playlist_lines.append(
base_url + '/ytl-api/audio-track?id=' f'{base_url}/ytl-api/audio-track?id='
+ urllib.parse.quote(cache_key) f'{urllib.parse.quote(cache_key)}'
+ '&seg=' + urllib.parse.quote(seg, safe='') f'&seg={urllib.parse.quote(seg, safe="")}'
) )
playlist = '\n'.join(playlist_lines) playlist = '\n'.join(playlist_lines)
@@ -797,9 +752,7 @@ def get_audio_track():
return url
if not url.startswith('http://') and not url.startswith('https://'):
url = urljoin(playlist_base, url)
-return (base_url + '/ytl-api/audio-track?id='
-+ urllib.parse.quote(cache_key)
-+ '&seg=' + urllib.parse.quote(url, safe=''))
+return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(url, safe="")}'
playlist_lines = []
for line in playlist.split('\n'):
@@ -812,7 +765,7 @@ def get_audio_track():
if line.startswith('#') and 'URI=' in line:
def rewrite_uri_attr(match):
uri = match.group(1)
-return 'URI="' + proxy_url(uri) + '"'
+return f'URI="{proxy_url(uri)}"'
line = _re.sub(r'URI="([^"]+)"', rewrite_uri_attr, line)
playlist_lines.append(line)
elif line.startswith('#'):
@@ -834,14 +787,12 @@ def get_audio_track():
# This is an actual segment - fetch and serve it
try:
-headers = (
-('User-Agent', 'Mozilla/5.0'),
-('Accept', '*/*'),
-)
-content = util.fetch_url(seg_url, headers=headers,
-debug_name='hls_seg', report_text=None)
-# Determine content type based on URL or content
+headers_dict = {
+'User-Agent': 'Mozilla/5.0',
+'Accept': '*/*',
+}
+# Determine content type based on URL
# HLS segments are usually MPEG-TS (.ts) but can be MP4 (.mp4, .m4s)
if '.mp4' in seg_url or '.m4s' in seg_url or seg_url.lower().endswith('.mp4'):
content_type = 'video/mp4'
@@ -851,7 +802,23 @@ def get_audio_track():
# Default to MPEG-TS for HLS
content_type = 'video/mp2t'
-return flask.Response(content, mimetype=content_type,
+response, cleanup_func = util.fetch_url_response(
+seg_url, headers=tuple(headers_dict.items()),
+timeout=30, use_tor=settings.route_tor)
+def generate():
+try:
+while True:
+chunk = response.read(64 * 1024)  # 64 KB chunks
+if not chunk:
+break
+yield chunk
+finally:
+cleanup_func(response)
+return flask.Response(
+flask.stream_with_context(generate()),
+mimetype=content_type,
headers={
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, OPTIONS',
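The change above replaces buffering the whole segment in memory with a streaming proxy: the upstream response is read in 64 KB chunks inside a generator and forwarded through flask.stream_with_context, with the connection cleaned up in the finally block. Below is a minimal standalone sketch of the same pattern using only the standard library in place of util.fetch_url_response (whose return values are internal to this project); the route name and mimetype are illustrative.

import urllib.request

import flask

app = flask.Flask(__name__)

@app.route('/proxy-segment')
def proxy_segment():
    seg_url = flask.request.args['seg']           # upstream segment URL
    upstream = urllib.request.urlopen(seg_url, timeout=30)

    def generate():
        try:
            while True:
                chunk = upstream.read(64 * 1024)  # 64 KB chunks
                if not chunk:
                    break
                yield chunk
        finally:
            upstream.close()                      # plays the role of cleanup_func(response)

    return flask.Response(
        flask.stream_with_context(generate()),
        mimetype='video/mp2t',
        headers={'Access-Control-Allow-Origin': '*'})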
@@ -883,9 +850,7 @@ def get_audio_track():
if segment_url.startswith('/ytl-api/audio-track'):
return segment_url
base_url = request.url_root.rstrip('/')
-return (base_url + '/ytl-api/audio-track?id='
-+ urllib.parse.quote(cache_key)
-+ '&seg=' + urllib.parse.quote(segment_url))
+return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(segment_url)}'
playlist_lines = []
for line in playlist.split('\n'):
@@ -949,14 +914,10 @@ def get_hls_manifest():
if is_audio_track:
# Audio track playlist - proxy through audio-track endpoint
-return (base_url + '/ytl-api/audio-track?id='
-+ urllib.parse.quote(cache_key)
-+ '&url=' + urllib.parse.quote(url, safe=''))
+return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&url={urllib.parse.quote(url, safe="")}'
else:
# Video segment or variant playlist - proxy through audio-track endpoint
-return (base_url + '/ytl-api/audio-track?id='
-+ urllib.parse.quote(cache_key)
-+ '&seg=' + urllib.parse.quote(url, safe=''))
+return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(url, safe="")}'
# Parse and rewrite the manifest
manifest_lines = []
@@ -974,7 +935,7 @@ def get_hls_manifest():
nonlocal rewritten_count
uri = match.group(1)
rewritten_count += 1
-return 'URI="' + rewrite_url(uri, is_audio_track=True) + '"'
+return f'URI="{rewrite_url(uri, is_audio_track=True)}"'
line = _re.sub(r'URI="([^"]+)"', rewrite_media_uri, line)
manifest_lines.append(line)
elif line.startswith('#'):
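The manifest rewriting above relies on a plain regex over URI="..." attributes. A minimal sketch of that pattern, with a stand-in proxy_url helper rather than the project's real one:

import re

def proxy_url(uri):
    # stand-in for the real rewriting helper
    return f'/ytl-api/audio-track?seg={uri}'

line = '#EXT-X-MEDIA:TYPE=AUDIO,URI="https://example.com/audio/playlist.m3u8"'
line = re.sub(r'URI="([^"]+)"',
              lambda m: f'URI="{proxy_url(m.group(1))}"',
              line)
print(line)
# #EXT-X-MEDIA:TYPE=AUDIO,URI="/ytl-api/audio-track?seg=https://example.com/audio/playlist.m3u8"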
@@ -1053,7 +1014,7 @@ def get_storyboard_vtt():
ts = 0  # current timestamp
for i in range(storyboard.storyboard_count):
-url = '/' + storyboard.url.replace("$M", str(i))
+url = f'/{storyboard.url.replace("$M", str(i))}'
interval = storyboard.interval
w, h = storyboard.width, storyboard.height
w_cnt, h_cnt = storyboard.width_cnt, storyboard.height_cnt
@@ -1078,7 +1039,7 @@ def get_watch_page(video_id=None):
if not video_id:
return flask.render_template('error.html', error_message='Missing video id'), 404
if len(video_id) < 11:
-return flask.render_template('error.html', error_message='Incomplete video id (too short): ' + video_id), 404
+return flask.render_template('error.html', error_message=f'Incomplete video id (too short): {video_id}'), 404
time_start_str = request.args.get('t', '0s')
time_start = 0
@@ -1141,9 +1102,9 @@ def get_watch_page(video_id=None):
util.prefix_urls(item)
util.add_extra_html_info(item)
if playlist_id:
-item['url'] += '&list=' + playlist_id
+item['url'] += f'&list={playlist_id}'
if item['index']:
-item['url'] += '&index=' + str(item['index'])
+item['url'] += f'&index={item["index"]}'
info['playlist']['author_url'] = util.prefix_url(
info['playlist']['author_url'])
if settings.img_prefix:
@@ -1159,16 +1120,16 @@ def get_watch_page(video_id=None):
filename = title
ext = fmt.get('ext')
if ext:
-filename += '.' + ext
+filename += f'.{ext}'
fmt['url'] = fmt['url'].replace(
'/videoplayback',
-'/videoplayback/name/' + filename)
+f'/videoplayback/name/{filename}')
download_formats = []
for format in (info['formats'] + info['hls_formats']):
if format['acodec'] and format['vcodec']:
-codecs_string = format['acodec'] + ', ' + format['vcodec']
+codecs_string = f"{format['acodec']}, {format['vcodec']}"
else:
codecs_string = format['acodec'] or format['vcodec'] or '?'
download_formats.append({
@@ -1247,12 +1208,9 @@ def get_watch_page(video_id=None):
for source in subtitle_sources:
best_caption_parse = urllib.parse.urlparse(
source['url'].lstrip('/'))
-transcript_url = (util.URL_ORIGIN
-+ '/watch/transcript'
-+ best_caption_parse.path
-+ '?' + best_caption_parse.query)
+transcript_url = f'{util.URL_ORIGIN}/watch/transcript{best_caption_parse.path}?{best_caption_parse.query}'
other_downloads.append({
-'label': 'Video Transcript: ' + source['label'],
+'label': f'Video Transcript: {source["label"]}',
'ext': 'txt',
'url': transcript_url
})
@@ -1263,7 +1221,7 @@ def get_watch_page(video_id=None):
template_name = 'watch.html'
return flask.render_template(template_name,
header_playlist_names = local_playlist.get_playlist_names(),
-uploader_channel_url = ('/' + info['author_url']) if info['author_url'] else '',
+uploader_channel_url = f'/{info["author_url"]}' if info['author_url'] else '',
time_published = info['time_published'],
view_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("view_count", None)),
like_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("like_count", None)),
@@ -1305,10 +1263,10 @@ def get_watch_page(video_id=None):
ip_address = info['ip_address'] if settings.route_tor else None,
invidious_used = info['invidious_used'],
invidious_reload_button = info['invidious_reload_button'],
-video_url = util.URL_ORIGIN + '/watch?v=' + video_id,
+video_url = f'{util.URL_ORIGIN}/watch?v={video_id}',
video_id = video_id,
-storyboard_url = (util.URL_ORIGIN + '/ytl-api/storyboard.vtt?' +
-urlencode([('spec_url', info['storyboard_spec_url'])])
+storyboard_url = (f'{util.URL_ORIGIN}/ytl-api/storyboard.vtt?'
+f'{urlencode([("spec_url", info["storyboard_spec_url"])])}'
if info['storyboard_spec_url'] else None),
js_data = {
@@ -1335,7 +1293,7 @@ def get_watch_page(video_id=None):
@yt_app.route('/api/<path:dummy>')
def get_captions(dummy):
-url = 'https://www.youtube.com' + request.full_path
+url = f'https://www.youtube.com{request.full_path}'
try:
result = util.fetch_url(url, headers=util.mobile_ua)
result = result.replace(b"align:start position:0%", b"")
@@ -1350,12 +1308,9 @@ inner_timestamp_removal_reg = re.compile(r'<[^>]+>')
@yt_app.route('/watch/transcript/<path:caption_path>')
def get_transcript(caption_path):
try:
-captions = util.fetch_url('https://www.youtube.com/'
-+ caption_path
-+ '?' + request.environ['QUERY_STRING']).decode('utf-8')
+captions = util.fetch_url(f'https://www.youtube.com/{caption_path}?{request.environ["QUERY_STRING"]}').decode('utf-8')
except util.FetchError as e:
-msg = ('Error retrieving captions: ' + str(e) + '\n\n'
-+ 'The caption url may have expired.')
+msg = f'Error retrieving captions: {e}\n\nThe caption url may have expired.'
print(msg)
return flask.Response(
msg,
@@ -1403,7 +1358,7 @@ def get_transcript(caption_path):
result = ''
for seg in segments:
if seg['text'] != ' ':
-result += seg['begin'] + ' ' + seg['text'] + '\r\n'
+result += f"{seg['begin']} {seg['text']}\r\n"
return flask.Response(result.encode('utf-8'),
mimetype='text/plain;charset=UTF-8')

youtube/watch_formats.py Normal file

@@ -0,0 +1,82 @@
"""Video format helpers for yt-local."""
import math
from typing import Any, Dict, Optional
def codec_name(vcodec: str) -> str:
"""Extract codec short name from codec string."""
if vcodec.startswith('avc'):
return 'h264'
elif vcodec.startswith('av01'):
return 'av1'
elif vcodec.startswith('vp'):
return 'vp'
else:
return 'unknown'
def video_quality_string(fmt: Dict[str, Any]) -> str:
"""Return video quality string (e.g., '1920x1080 30fps')."""
if fmt.get('vcodec'):
result = f"{fmt.get('width') or '?'}x{fmt.get('height') or '?'}"
if fmt.get('fps'):
result += f" {fmt['fps']}fps"
return result
elif fmt.get('acodec'):
return 'audio only'
return '?'
def short_video_quality_string(fmt: Dict[str, Any]) -> str:
"""Return short video quality string (e.g., '1080p60 AV1')."""
result = f"{fmt.get('quality') or '?'}p"
if fmt.get('fps'):
result += str(fmt['fps'])
vcodec = fmt.get('vcodec', '')
if vcodec.startswith('av01'):
result += ' AV1'
elif vcodec.startswith('avc'):
result += ' h264'
else:
result += f" {vcodec}"
return result
def audio_quality_string(fmt: Dict[str, Any]) -> str:
"""Return audio quality string (e.g., '128k 44.1kHz')."""
if fmt.get('acodec'):
if fmt.get('audio_bitrate'):
result = f"{fmt['audio_bitrate']}k"
else:
result = '?k'
if fmt.get('audio_sample_rate'):
result += f" {'%.3G' % (fmt['audio_sample_rate']/1000)}kHz"
return result
elif fmt.get('vcodec'):
return 'video only'
return '?'
def format_bytes(bytes_val: Optional[float]) -> str:
"""Convert bytes to human-readable string (e.g., '1.5 MiB')."""
if bytes_val is None:
return 'N/A'
if type(bytes_val) is str:
bytes_val = float(bytes_val)
if bytes_val == 0.0:
exponent = 0
else:
exponent = int(math.log(bytes_val, 1024.0))
suffix = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'][exponent]
converted = float(bytes_val) / float(1024 ** exponent)
return '%.2f%s' % (converted, suffix)
__all__ = [
'codec_name',
'video_quality_string',
'short_video_quality_string',
'audio_quality_string',
'format_bytes',
]
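For reference, a short usage sketch of these helpers with a made-up format dict; the field values are illustrative and the import path assumes the youtube package layout used by this repository:

from youtube.watch_formats import (video_quality_string,
    short_video_quality_string, audio_quality_string, format_bytes)

fmt = {
    'width': 1920, 'height': 1080, 'fps': 60, 'quality': 1080,
    'vcodec': 'av01.0.08M.08', 'acodec': None,
    'audio_bitrate': None, 'audio_sample_rate': None,
}
print(video_quality_string(fmt))        # 1920x1080 60fps
print(short_video_quality_string(fmt))  # 1080p60 AV1
print(audio_quality_string(fmt))        # video only
print(format_bytes(1536 * 1024))        # 1.50MiB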


@@ -1,7 +1,8 @@
from .common import (get, multi_get, deep_get, multi_deep_get,
liberal_update, conservative_update, remove_redirect, normalize_url,
extract_str, extract_formatted_text, extract_int, extract_approx_int,
-extract_date, extract_item_info, extract_items, extract_response)
+extract_date, extract_item_info, extract_items, extract_response,
+concat_or_none)
from .everything_else import (extract_channel_info, extract_search_info,
extract_playlist_metadata, extract_playlist_info, extract_comments_info)


@@ -212,7 +212,7 @@ def extract_date(date_text):
month, day, year = parts[-3:]
month = MONTH_ABBREVIATIONS.get(month[0:3])  # slicing in case they start writing out the full month name
if month and (re.fullmatch(r'\d\d?', day) is not None) and (re.fullmatch(r'\d{4}', year) is not None):
-return year + '-' + month + '-' + day
+return f'{year}-{month}-{day}'
return None
def check_missing_keys(object, *key_sequences):
@@ -222,7 +222,7 @@ def check_missing_keys(object, *key_sequences):
for key in key_sequence:
_object = _object[key]
except (KeyError, IndexError, TypeError):
-return 'Could not find ' + key
+return f'Could not find {key}'
return None
@@ -467,7 +467,7 @@ def extract_item_info(item, additional_info={}):
['shortBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId']
))
-info['author_url'] = ('https://www.youtube.com/channel/' + info['author_id']) if info['author_id'] else None
+info['author_url'] = f'https://www.youtube.com/channel/{info["author_id"]}' if info['author_id'] else None
info['description'] = extract_formatted_text(multi_deep_get(
item,
['descriptionText'], ['descriptionSnippet'],


@@ -305,7 +305,7 @@ def extract_playlist_metadata(polymer_json):
metadata['description'] = desc
if metadata['author_id']:
-metadata['author_url'] = 'https://www.youtube.com/channel/' + metadata['author_id']
+metadata['author_url'] = f'https://www.youtube.com/channel/{metadata["author_id"]}'
if metadata['first_video_id'] is None:
metadata['thumbnail'] = None


@@ -650,9 +650,9 @@ def _extract_playability_error(info, player_response, error_prefix=''):
)
if playability_status not in (None, 'OK'):
-info['playability_error'] = error_prefix + playability_reason
+info['playability_error'] = f'{error_prefix}{playability_reason}'
elif not info['playability_error']:  # do not override
-info['playability_error'] = error_prefix + 'Unknown playability error'
+info['playability_error'] = f'{error_prefix}Unknown playability error'
SUBTITLE_FORMATS = ('srv1', 'srv2', 'srv3', 'ttml', 'vtt')
def extract_watch_info(polymer_json):
@@ -726,7 +726,7 @@ def extract_watch_info(polymer_json):
# Store the full URL from the player response (includes valid tokens)
if base_url:
normalized = normalize_url(base_url) if base_url.startswith('/') or not base_url.startswith('http') else base_url
-info['_caption_track_urls'][lang_code + ('_asr' if caption_track.get('kind') == 'asr' else '')] = normalized
+info['_caption_track_urls'][f'{lang_code}{"_asr" if caption_track.get("kind") == "asr" else ""}'] = normalized
lang_name = deep_get(urllib.parse.parse_qs(urllib.parse.urlparse(base_url).query), 'name', 0)
if lang_name:
info['_manual_caption_language_names'][lang_code] = lang_name
@@ -806,7 +806,7 @@ def extract_watch_info(polymer_json):
info['allowed_countries'] = mf.get('availableCountries', [])
# other stuff
-info['author_url'] = 'https://www.youtube.com/channel/' + info['author_id'] if info['author_id'] else None
+info['author_url'] = f'https://www.youtube.com/channel/{info["author_id"]}' if info['author_id'] else None
info['storyboard_spec_url'] = deep_get(player_response, 'storyboards', 'playerStoryboardSpecRenderer', 'spec')
return info
@@ -912,12 +912,12 @@ def get_caption_url(info, language, format, automatic=False, translation_languag
url = info['_captions_base_url']
if not url:
return None
-url += '&lang=' + language
-url += '&fmt=' + format
+url += f'&lang={language}'
+url += f'&fmt={format}'
if automatic:
url += '&kind=asr'
elif language in info['_manual_caption_language_names']:
-url += '&name=' + urllib.parse.quote(info['_manual_caption_language_names'][language], safe='')
+url += f'&name={urllib.parse.quote(info["_manual_caption_language_names"][language], safe="")}'
if translation_language:
url += '&tlang=' + translation_language
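As an illustration, the parameter-appending above produces caption URLs of roughly this shape; the base URL and track name below are placeholders, not real caption data:

import urllib.parse

url = 'https://www.youtube.com/api/timedtext?v=XXXXXXXXXXX'  # hypothetical base url
url += '&lang=en'
url += '&fmt=vtt'
url += f'&name={urllib.parse.quote("English", safe="")}'
print(url)
# https://www.youtube.com/api/timedtext?v=XXXXXXXXXXX&lang=en&fmt=vtt&name=English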
@@ -964,7 +964,7 @@ def extract_decryption_function(info, base_js):
return 'Could not find var_name'
var_name = var_with_operation_match.group(1)
-var_body_match = re.search(r'var ' + re.escape(var_name) + r'=\{(.*?)\};', base_js, flags=re.DOTALL)
+var_body_match = re.search(rf'var {re.escape(var_name)}=\{{(.*?)\}};', base_js, flags=re.DOTALL)
if var_body_match is None:
return 'Could not find var_body'
@@ -988,7 +988,7 @@ def extract_decryption_function(info, base_js):
elif op_body.startswith('var c=a[0]'):
operation_definitions[op_name] = 2
else:
-return 'Unknown op_body: ' + op_body
+return f'Unknown op_body: {op_body}'
decryption_function = []
for op_with_arg in function_body:
@@ -997,7 +997,7 @@ def extract_decryption_function(info, base_js):
return 'Could not parse operation with arg'
op_name = match.group(2).strip('[].')
if op_name not in operation_definitions:
-return 'Unknown op_name: ' + str(op_name)
+return f'Unknown op_name: {op_name}'
op_argument = match.group(3)
decryption_function.append([operation_definitions[op_name], int(op_argument)])
@@ -1028,5 +1028,5 @@ def decrypt_signatures(info):
_operation_2(a, argument)
signature = ''.join(a)
-format['url'] += '&' + format['sp'] + '=' + signature
+format['url'] += f'&{format["sp"]}={signature}'
return False
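Only the swap operation's detection ('var c=a[0]') is visible in this diff, so the following is a hedged sketch of how an extracted list of [operation_code, argument] pairs might be applied to a signature string; the reverse and splice meanings assumed for codes 0 and 1 are not taken from this diff.

def apply_decryption_function(decryption_function, signature):
    a = list(signature)
    for op, argument in decryption_function:
        if op == 0:       # assumed: reverse the character list
            a.reverse()
        elif op == 1:     # assumed: splice off the first n characters
            a = a[argument:]
        elif op == 2:     # swap a[0] with a[n % len(a)], matching the detection above
            n = argument % len(a)
            a[0], a[n] = a[n], a[0]
    return ''.join(a)

print(apply_decryption_function([[0, 0], [2, 3], [1, 2]], 'abcdefgh'))  # toy example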