Compare commits
8 Commits
v0.5.0 ... security/h
| Author | SHA1 | Date |
|---|---|---|
|  | 13708bb4ea |  |
|  | b9248cfe2d |  |
|  | 07ca635a84 |  |
|  | afbe137676 |  |
|  | 76c9263cd6 |  |
|  | ab55cf79bb |  |
|  | 8d66143c90 |  |
|  | 50ad959a80 |  |
README.md (18 lines changed)
@@ -2,7 +2,7 @@
 [](https://www.gnu.org/licenses/agpl-3.0)
 [](https://www.python.org/downloads/)
-[](https://github.com/user234683/youtube-local)
+[](https://git.fridu.us/heckyel/youtube-local)

 A privacy-focused, browser-based YouTube client that routes requests through Tor for anonymous viewing—**without compromising on speed or features**.

@@ -48,12 +48,12 @@ yt-local is a lightweight, self-hosted YouTube client written in Python that giv
 ## Screenshots

 | Light Theme | Gray Theme | Dark Theme |
-|:-----------------------------------------------------:|:----------------------------------------------------:|:----------------------------------------------------:|
-|  |  |  |
+|:----------------------------------------------------------------------------------------------:|:---------------------------------------------------------------------------------------------:|:---------------------------------------------------------------------------------------------:|
+|  |  |  |

 | Channel View | Playlist View |
-|:-------------------------------------------------------:|:---------------------:|
-|  | *(similar structure)* |
+|:------------------------------------------------------------------------------------------------:|:---------------------:|
+|  | *(similar structure)* |

 ---

@@ -61,7 +61,7 @@ yt-local is a lightweight, self-hosted YouTube client written in Python that giv
 ### Windows

-1. Download the latest [release ZIP](https://github.com/user234683/yt-local/releases)
+1. Download the latest [release ZIP](https://git.fridu.us/heckyel/yt-local/releases)
 2. Extract to any folder
 3. Run `run.bat` to start

@@ -69,7 +69,7 @@ yt-local is a lightweight, self-hosted YouTube client written in Python that giv
 ```bash
 # 1. Clone or extract the release
-git clone https://github.com/user234683/yt-local.git
+git clone https://git.fridu.us/heckyel/yt-local.git
 cd yt-local

 # 2. Create and activate virtual environment

@@ -279,7 +279,7 @@ yt-local is designed for self-hosting.
 This project is 100% free and open-source. If you'd like to support development:

 - **Bitcoin**: `1JrC3iqs3PP5Ge1m1vu7WE8LEf4S85eo7y`
-- **Tor node donation**: https://torservers.net/donate
+- **Tor node donation**: <https://torservers.net/donate>

 ---

@@ -310,4 +310,4 @@ Permission is granted to relicense code portions into youtube-dl's license (curr
 Made for privacy-conscious users

-Last updated: 2026-04-19
+Last updated: 2026-05-03
@@ -1,4 +1,5 @@
 # Coding guidelines

 * Follow the [PEP 8 guidelines](https://www.python.org/dev/peps/pep-0008/) for all new Python code as best you can. Some old code doesn't follow PEP 8 yet. This includes limiting line length to 79 characters (with an exception for long strings such as URLs that can't reasonably be broken across multiple lines) and using 4 spaces for indentation.

 * Do not use single-letter or cryptic names for variables (except iterator variables or the like). When in doubt, choose the more verbose option.

@@ -12,30 +13,34 @@
 * The same guidelines apply to commenting code. If a piece of code is not self-explanatory, add a comment explaining what it does and why it's there.

 # Testing and releases

 * This project uses pytest. To install pytest and any future dependencies needed for development, run pip3 on the requirements-dev.txt file. To run tests, run `python3 -m pytest` rather than just `pytest`, because the former makes sure the top-level directory is in Python's import search path.

 * To build releases for Windows, run `python3 generate_release.py [intended python version here, without v in front]`. The required software (such as 7z and git) is listed in the `generate_release.py` file. For instance, wine is required if building on GNU+Linux. The build script will automatically download the embedded Python release to include. Use the latest release of Python 3.7.x so that Vista will be supported. See https://github.com/user234683/youtube-local/issues/6#issuecomment-672608388
-# Overview of the software architecture
+## Overview of the software architecture

-## server.py
+### server.py

 * This is the entry point; it sets up the HTTP server that listens for incoming requests. It delegates each request to the appropriate "site_handler". For instance, `localhost:8080/youtube.com/...` goes to the `youtube` site handler, whereas `localhost:8080/ytimg.com/...` (the url for video thumbnails) goes to the site handler for just fetching static resources such as images from youtube.

 * The reason for this architecture: the original design philosophy when I first conceived the project was that this would work for any site supported by youtube-dl, including YouTube, Vimeo, DailyMotion, etc. I've dropped this idea for now, though I might pick it up later. (youtube-dl is no longer used.)

 * This file uses the raw [WSGI request](https://www.python.org/dev/peps/pep-3333/) format. WSGI is a Python standard for how HTTP servers (I use the stock server provided by gevent) should call HTTP applications, which is why the file contains stuff like `env['REQUEST_METHOD']`.
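The delegation described above can be sketched as a plain WSGI dispatcher. This is an illustrative sketch only: the handler names and bodies here are made up, and the real server.py has more handlers and runs under gevent's WSGI server.

```python
# Minimal sketch of path-prefix dispatch in raw WSGI (illustrative;
# handler names are hypothetical, not the project's actual site_handlers).
def youtube_handler(env, start_response):
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [b'youtube page']

def ytimg_handler(env, start_response):
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [b'thumbnail bytes']

SITE_HANDLERS = {
    'youtube.com': youtube_handler,
    'ytimg.com': ytimg_handler,
}

def site_dispatch(env, start_response):
    # env['PATH_INFO'] looks like '/youtube.com/watch'; the first path
    # segment names the site whose handler should process the request.
    path = env['PATH_INFO'].lstrip('/')
    domain = path.split('/', 1)[0]
    handler = SITE_HANDLERS.get(domain)
    if handler is None:
        start_response('404 Not Found', [('Content-Type', 'text/plain')])
        return [b'unknown site']
    return handler(env, start_response)
```

The same lookup-table idea appears in the site_dispatch hunk of server.py further down this page.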
-## Flask and Gevent
+### Flask and Gevent

 * The `youtube` handler in server.py then delegates the request to the Flask yt_app object, which the rest of the project uses. [Flask](https://flask.palletsprojects.com/en/1.1.x/) is a web application framework that makes handling requests easier than accessing the raw WSGI requests. Flask (Werkzeug specifically) figures out which function to call for a particular url. Each request handling function is registered into Flask's routing table using the function annotations above it. The request handling functions are always at the bottom of the file for a particular youtube page (channel, watch, playlist, etc.), and they're where you want to look to see how the response gets constructed for a particular url. Miscellaneous request handlers that don't belong anywhere else are located in `__init__.py`, which is where the `yt_app` object is instantiated.

 * The actual html for yt-local is generated using Jinja templates. Jinja lets you embed a Python-like language inside html files so you can use constructs such as for loops to construct the html for a list of 30 videos given a dictionary with information for those videos. Jinja is included as part of Flask. It has some annoying differences from Python in a lot of details, so check the [docs here](https://jinja.palletsprojects.com/en/2.11.x/) when you use it. The request handling functions pass the information that has been scraped from YouTube into these templates to produce the final result.

 * The project uses the gevent library for parallelism (such as for launching requests in parallel), as opposed to using the async keyword.
-## util.py
+### util.py

 * util.py is a grab-bag of miscellaneous things; admittedly I need to get around to refactoring it. The biggest thing it has is the `fetch_url` function, which is what I use for sending out requests to YouTube. The Tor routing is managed here. `fetch_url` will raise a `FetchError` exception if the request fails. The parameter `debug_name` in `fetch_url` is the filename that the response from YouTube will be saved to if the hidden debugging option is enabled in settings.txt. So if there's a bug when YouTube changes something, you can check the response from YouTube in that file.
-## Data extraction - protobuf, polymer, and yt_data_extract
+### Data extraction - protobuf, polymer, and yt_data_extract

 * proto.py is used for generating what are called ctokens, needed when making requests to YouTube. These ctokens use Google's [protobuf](https://developers.google.com/protocol-buffers) format. Figuring out how to generate these in new instances requires some reverse engineering. I have a messy python file I use to make this convenient, which you can find under ./youtube/proto_debug.py
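The test hunks further down this page decode ctokens with `base64.urlsafe_b64decode(f'{ctoken}==')`. The trailing `'=='` is a padding trick worth spelling out, since ctokens come without base64 padding:

```python
import base64

def decode_ctoken_bytes(ctoken):
    # ctokens are URL-safe base64 with the padding stripped. Python's
    # decoder requires padding, but it tolerates *excess* padding in
    # its default (non-strict) mode, so unconditionally appending '=='
    # covers every valid unpadded length without computing it.
    return base64.urlsafe_b64decode(f'{ctoken}==')
```

This mirrors the `_decode_outer` helper in the ctoken tests below; the function name here is just for illustration.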
 * The responses from YouTube are in a JSON format called polymer (polymer is the name of the 2017-present YouTube layout). The JSON consists of a bunch of nested dictionaries which basically specify the layout of the page via objects called renderers. A renderer represents an object on a page in a similar way to html tags; renderers often contain renderers inside them. The Javascript on YouTube's page translates this JSON to HTML. Example: `compactVideoRenderer` represents a video item you can click on, such as in the related videos (so these are called "items" in the codebase). This JSON is very messy. You'll need a JSON prettifier or something that gives you a tree view in order to study it.
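Searching such nested renderer dictionaries can be sketched as a plain recursive walk. This is illustrative only: the function name and sample data are invented, and the project's real extraction lives in yt_data_extract.

```python
def find_renderers(node, name, found=None):
    # Walk the nested dict/list structure of a polymer response and
    # collect every value stored under a renderer key with the given name.
    if found is None:
        found = []
    if isinstance(node, dict):
        for key, value in node.items():
            if key == name:
                found.append(value)
            find_renderers(value, name, found)
    elif isinstance(node, list):
        for item in node:
            find_renderers(item, name, found)
    return found

# Hypothetical, heavily trimmed response shape:
response = {
    'contents': {'sectionListRenderer': {'contents': [
        {'compactVideoRenderer': {'videoId': 'abc'}},
        {'compactVideoRenderer': {'videoId': 'def'}},
    ]}}
}
```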
@@ -46,15 +51,16 @@

 * The `extract_items` function is similar but works on the response object, automatically finding the appropriate renderer to call `extract_items_from_renderer` on.

-## Other
+### Other

 * subscriptions.py uses SQLite to store data.

 * Hidden settings only relevant to developers (such as for debugging) are not displayed on the settings page. They can be found in the settings.txt file.

 * Since I can't anticipate the things that will trip up beginners to the codebase, if you spend a while figuring something out, go ahead and make a pull request adding a brief description of your findings to this document to help other beginners.
-## Development tips
+### Development tips

 * When developing functionality to interact with YouTube in new ways, you'll want to use the network tab in your browser's devtools to inspect which requests get made under normal usage of YouTube. You'll also want a tool you can use to construct custom requests and specify headers in order to reverse engineer the request format. I use the [HeaderTool](https://github.com/loreii/HeaderTool) extension in Firefox, but there's probably a more streamlined program out there.

 * You'll want a utility or IDE that can perform full-text search on a repository, since this is crucial for navigating unfamiliar codebases to figure out where certain strings appear or where things get defined.
generate_release.py

@@ -33,7 +33,7 @@ def check_subp(x):
         raise Exception('Got nonzero exit code from command')

 def log(line):
-    print('[generate_release.py] ' + line)
+    print(f'[generate_release.py] {line}')

 # https://stackoverflow.com/questions/7833715/python-deleting-certain-file-extensions
 def remove_files_with_extensions(path, extensions):
@@ -43,23 +43,23 @@ def remove_files_with_extensions(path, extensions):
             os.remove(os.path.join(root, file))

 def download_if_not_exists(file_name, url, sha256=None):
-    if not os.path.exists('./' + file_name):
+    if not os.path.exists(f'./{file_name}'):
         # Reject non-https URLs so a mistaken constant cannot cause a
         # plaintext download (bandit B310 hardening).
         if not url.startswith('https://'):
-            raise Exception('Refusing to download over non-https URL: ' + url)
-        log('Downloading ' + file_name + '..')
+            raise Exception(f'Refusing to download over non-https URL: {url}')
+        log(f'Downloading {file_name}..')
         data = urllib.request.urlopen(url).read()
-        log('Finished downloading ' + file_name)
-        with open('./' + file_name, 'wb') as f:
+        log(f'Finished downloading {file_name}')
+        with open(f'./{file_name}', 'wb') as f:
             f.write(data)
         if sha256:
             digest = hashlib.sha256(data).hexdigest()
             if digest != sha256:
-                log('Error: ' + file_name + ' has wrong hash: ' + digest)
+                log(f'Error: {file_name} has wrong hash: {digest}')
                 sys.exit(1)
     else:
-        log('Using existing ' + file_name)
+        log(f'Using existing {file_name}')

 def wine_run_shell(command):
     # Keep argv-style invocation (no shell) to avoid command injection.
@@ -120,7 +120,7 @@ if len(os.listdir('./yt-local')) == 0:
 # ----------- Generate embedded python distribution -----------
 os.environ['PYTHONDONTWRITEBYTECODE'] = '1'  # *.pyc files double the size of the distribution
 get_pip_url = 'https://bootstrap.pypa.io/get-pip.py'
-latest_dist_url = 'https://www.python.org/ftp/python/' + latest_version + '/python-' + latest_version
+latest_dist_url = f'https://www.python.org/ftp/python/{latest_version}/python-{latest_version}'
 if bitness == '32':
     latest_dist_url += '-embed-win32.zip'
 else:

@@ -142,7 +142,7 @@ else:
 download_if_not_exists('get-pip.py', get_pip_url)

-python_dist_name = 'python-dist-' + latest_version + '-' + bitness + '.zip'
+python_dist_name = f'python-dist-{latest_version}-{bitness}.zip'

 download_if_not_exists(python_dist_name, latest_dist_url)
 download_if_not_exists(visual_c_name,
@@ -203,7 +203,7 @@ and replaced with a .pth. Isolated mode will have to be specified manually.
 log('Removing ._pth')
 major_release = latest_version.split('.')[1]
-os.remove(r'./python/python3' + major_release + '._pth')
+os.remove(rf'./python/python3{major_release}._pth')

 log('Adding path_fixes.pth')
 with open(r'./python/path_fixes.pth', 'w', encoding='utf-8') as f:

@@ -214,7 +214,7 @@ with open(r'./python/path_fixes.pth', 'w', encoding='utf-8') as f:
 # Need to add the directory where packages are installed,
 # and the parent directory (which is where the yt-local files are)
 major_release = latest_version.split('.')[1]
-with open('./python/python3' + major_release + '._pth', 'a', encoding='utf-8') as f:
+with open(rf'./python/python3{major_release}._pth', 'a', encoding='utf-8') as f:
     f.write('.\\Lib\\site-packages\n')
     f.write('..\n')'''
@@ -255,10 +255,10 @@ log('Copying python distribution into release folder')
 shutil.copytree(r'./python', r'./yt-local/python')

 # ----------- Create release zip -----------
-output_filename = 'yt-local-' + release_tag + '-' + suffix + '.zip'
-if os.path.exists('./' + output_filename):
+output_filename = f'yt-local-{release_tag}-{suffix}.zip'
+if os.path.exists(f'./{output_filename}'):
     log('Removing previous zipped release')
-    os.remove('./' + output_filename)
+    os.remove(f'./{output_filename}')
 log('Zipping release')
 check_subp(subprocess.run(['7z', '-mx=9', 'a', output_filename, './yt-local']))
server.py (19 lines changed)
@@ -32,9 +32,9 @@ def youtu_be(env, start_response):
     id = env['PATH_INFO'][1:]
     env['PATH_INFO'] = '/watch'
     if not env['QUERY_STRING']:
-        env['QUERY_STRING'] = 'v=' + id
+        env['QUERY_STRING'] = f'v={id}'
     else:
-        env['QUERY_STRING'] += '&v=' + id
+        env['QUERY_STRING'] += f'&v={id}'
     yield from yt_app(env, start_response)
@@ -64,12 +64,12 @@ def proxy_site(env, start_response, video=False):
     if 'HTTP_RANGE' in env:
         send_headers['Range'] = env['HTTP_RANGE']

-    url = "https://" + env['SERVER_NAME'] + env['PATH_INFO']
+    url = f"https://{env['SERVER_NAME']}{env['PATH_INFO']}"
     # remove /name portion
     if video and '/videoplayback/name/' in url:
         url = url[0:url.rfind('/name/')]
     if env['QUERY_STRING']:
-        url += '?' + env['QUERY_STRING']
+        url += f'?{env["QUERY_STRING"]}'

     try_num = 1
     first_attempt = True
@@ -96,7 +96,7 @@ def proxy_site(env, start_response, video=False):
             +[('Access-Control-Allow-Origin', '*')])

         if first_attempt:
-            start_response(str(response.status) + ' ' + response.reason,
+            start_response(f"{response.status} {response.reason}",
                            response_headers)

         content_length = int(dict(response_headers).get('Content-Length', 0))
@@ -136,9 +136,8 @@ def proxy_site(env, start_response, video=False):
                 fail_byte = start + total_received
                 send_headers['Range'] = 'bytes=%d-%d' % (fail_byte, end)
                 print(
-                    'Warning: YouTube closed the connection before byte',
-                    str(fail_byte) + '.', 'Expected', start+content_length,
-                    'bytes.'
+                    f'Warning: YouTube closed the connection before byte {fail_byte}. '
+                    f'Expected {start+content_length} bytes.'
                 )

                 retry = True
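The retry logic in this hunk resumes an interrupted download by requesting only the bytes not yet received. The arithmetic can be sketched in isolation (variable names follow the diff; the helper function itself is hypothetical):

```python
def resume_range_header(start, end, total_received):
    # If the upstream closes the connection early, the next request
    # should begin at the first missing byte: the original start offset
    # plus however many bytes arrived before the failure.
    fail_byte = start + total_received
    return 'bytes=%d-%d' % (fail_byte, end)
```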
@@ -185,7 +184,7 @@ def split_url(url):
     # python STILL doesn't have a proper regular expression engine like grep uses built in...
     match = re.match(r'(?:https?://)?([\w-]+(?:\.[\w-]+)+?)(/.*|$)', url)
     if match is None:
-        raise ValueError('Invalid or unsupported url: ' + url)
+        raise ValueError(f'Invalid or unsupported url: {url}')

     return match.group(1), match.group(2)
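The regex in `split_url` separates the domain from the path. A quick standalone sketch using the exact pattern from the hunk shows its behavior on a couple of inputs:

```python
import re

def split_url(url):
    # Same pattern as in server.py: an optional scheme, then the domain
    # (dotted labels, captured non-greedily), then the rest of the path
    # (or nothing, when the url is just a bare domain).
    match = re.match(r'(?:https?://)?([\w-]+(?:\.[\w-]+)+?)(/.*|$)', url)
    if match is None:
        raise ValueError(f'Invalid or unsupported url: {url}')
    return match.group(1), match.group(2)
```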
@@ -238,7 +237,7 @@ def site_dispatch(env, start_response):
         if base_name == '':
             base_name = domain
         else:
-            base_name = domain + '.' + base_name
+            base_name = f"{domain}.{base_name}"

     try:
         handler = site_handlers[base_name]
settings.py (10 lines changed)
@@ -397,14 +397,14 @@ acceptable_targets = SETTINGS_INFO.keys() | {
 def comment_string(comment):
     result = ''
     for line in comment.splitlines():
-        result += '# ' + line + '\n'
+        result += f'# {line}\n'
     return result


 def save_settings(settings_dict):
     with open(settings_file_path, 'w', encoding='utf-8') as file:
         for setting_name, setting_info in SETTINGS_INFO.items():
-            file.write(comment_string(setting_info['comment']) + setting_name + ' = ' + repr(settings_dict[setting_name]) + '\n\n')
+            file.write(f"{comment_string(setting_info['comment'])}{setting_name} = {repr(settings_dict[setting_name])}\n\n")


 def add_missing_settings(settings_dict):
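The `comment_string`/`save_settings` pair in this hunk serializes each setting as a commented block followed by a repr'd assignment. A standalone sketch of one serialized block, mirroring the diff's logic (the setting name used below is invented):

```python
def comment_string(comment):
    # Prefix every line of a (possibly multi-line) comment with '# ',
    # exactly as settings.py does.
    result = ''
    for line in comment.splitlines():
        result += f'# {line}\n'
    return result

def settings_entry(name, comment, value):
    # One settings.txt block: the comment lines, then `name = repr(value)`,
    # followed by a blank separator line.
    return f'{comment_string(comment)}{name} = {repr(value)}\n\n'
```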
@@ -481,7 +481,7 @@ upgrade_functions = {


 def log_ignored_line(line_number, message):
-    print('WARNING: Ignoring settings.txt line ' + str(line_number) + ' (' + message + ')')
+    print(f'WARNING: Ignoring settings.txt line {line_number} ({message})')


 if os.path.isfile("settings.txt"):

@@ -535,7 +535,7 @@ else:
                 continue

             if target.id not in acceptable_targets:
-                log_ignored_line(node.lineno, target.id + " is not a valid setting")
+                log_ignored_line(node.lineno, f"{target.id} is not a valid setting")
                 continue

             if type(node.value) not in attributes:

@@ -645,6 +645,6 @@ def settings_page():
         for func, old_value, value in to_call:
             func(old_value, value)

-        return flask.redirect(util.URL_ORIGIN + '/settings', 303)
+        return flask.redirect(f'{util.URL_ORIGIN}/settings', 303)
     else:
         flask.abort(400)
@@ -27,7 +27,7 @@ class TestChannelCtokenV5:

     def _decode_outer(self, ctoken):
         """Decode the outer protobuf layer of a ctoken."""
-        raw = base64.urlsafe_b64decode(ctoken + '==')
+        raw = base64.urlsafe_b64decode(f'{ctoken}==')
         return {fn: val for _, fn, val in proto.read_protobuf(raw)}

     def test_shorts_token_generates_without_error(self):

@@ -68,8 +68,8 @@ class TestChannelCtokenV5:
         assert t_with_shorts != t_without_shorts

         # Decode and verify the filter is present
-        raw_with_shorts = base64.urlsafe_b64decode(t_with_shorts + '==')
-        raw_without_shorts = base64.urlsafe_b64decode(t_without_shorts + '==')
+        raw_with_shorts = base64.urlsafe_b64decode(f'{t_with_shorts}==')
+        raw_without_shorts = base64.urlsafe_b64decode(f'{t_without_shorts}==')

         # Parse the outer protobuf structure
         import youtube.proto as proto

@@ -95,8 +95,8 @@ class TestChannelCtokenV5:
         decoded_without = urllib.parse.unquote(encoded_inner_without.decode('ascii'))

         # Decode the base64 data
-        decoded_with_bytes = base64.urlsafe_b64decode(decoded_with + '==')
-        decoded_without_bytes = base64.urlsafe_b64decode(decoded_without + '==')
+        decoded_with_bytes = base64.urlsafe_b64decode(f'{decoded_with}==')
+        decoded_without_bytes = base64.urlsafe_b64decode(f'{decoded_without}==')

         # Parse the decoded protobuf data
         fields_with = list(proto.read_protobuf(decoded_with_bytes))
tests/test_watch_formats.py (new file, 72 lines)
@@ -0,0 +1,72 @@
+import pytest
+from youtube import watch_formats
+
+
+class TestCodecName:
+    def test_avc_returns_h264(self):
+        assert watch_formats.codec_name('avc1.64001F') == 'h264'
+
+    def test_av01_returns_av1(self):
+        assert watch_formats.codec_name('av01.0.05M.08') == 'av1'
+
+    def test_vp9_returns_vp(self):
+        assert watch_formats.codec_name('vp9') == 'vp'
+
+    def test_unknown_returns_unknown(self):
+        assert watch_formats.codec_name('unknown_codec') == 'unknown'
+
+
+class TestVideoQualityString:
+    def test_with_vcodec(self):
+        fmt = {'vcodec': 'avc1', 'width': 1920, 'height': 1080, 'fps': 30}
+        assert watch_formats.video_quality_string(fmt) == '1920x1080 30fps'
+
+    def test_with_vcodec_no_fps(self):
+        fmt = {'vcodec': 'avc1', 'width': 1280, 'height': 720}
+        assert watch_formats.video_quality_string(fmt) == '1280x720'
+
+    def test_with_acodec_only(self):
+        fmt = {'acodec': 'mp4a.40.2'}
+        assert watch_formats.video_quality_string(fmt) == 'audio only'
+
+    def test_empty(self):
+        fmt = {}
+        assert watch_formats.video_quality_string(fmt) == '?'
+
+
+class TestShortVideoQualityString:
+    def test_with_fps(self):
+        fmt = {'quality': 1080, 'fps': 60, 'vcodec': 'av01.0.05M.08'}
+        assert watch_formats.short_video_quality_string(fmt) == '1080p60 AV1'
+
+    def test_h264(self):
+        fmt = {'quality': 720, 'fps': 30, 'vcodec': 'avc1.64001E'}
+        assert watch_formats.short_video_quality_string(fmt) == '720p30 h264'
+
+
+class TestAudioQualityString:
+    def test_with_bitrate(self):
+        fmt = {'acodec': 'mp4a.40.2', 'audio_bitrate': 128}
+        assert watch_formats.audio_quality_string(fmt) == '128k'
+
+    def test_with_sample_rate(self):
+        fmt = {'acodec': 'mp4a.40.2', 'audio_bitrate': 128, 'audio_sample_rate': 44100}
+        assert watch_formats.audio_quality_string(fmt) == '128k 44.1kHz'
+
+    def test_video_only(self):
+        fmt = {'vcodec': 'avc1'}
+        assert watch_formats.audio_quality_string(fmt) == 'video only'
+
+
+class TestFormatBytes:
+    def test_none(self):
+        assert watch_formats.format_bytes(None) == 'N/A'
+
+    def test_bytes(self):
+        assert watch_formats.format_bytes(512) == '512.00B'
+
+    def test_kibibytes(self):
+        assert watch_formats.format_bytes(1024) == '1.00KiB'
+
+    def test_mebibytes(self):
+        assert watch_formats.format_bytes(1048576) == '1.00MiB'
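A minimal implementation consistent with the `TestFormatBytes` cases in the new test file might look like this. It is a sketch inferred from the tests above, not the project's actual `watch_formats.format_bytes`:

```python
def format_bytes(n):
    # Behavior inferred from the tests: None -> 'N/A', otherwise the
    # value scaled into binary units and printed with two decimals.
    if n is None:
        return 'N/A'
    units = ['B', 'KiB', 'MiB', 'GiB', 'TiB']
    value = float(n)
    index = 0
    # Repeated division avoids floating-point surprises from log(n, 1024)
    # at exact powers of 1024.
    while value >= 1024 and index < len(units) - 1:
        value /= 1024
        index += 1
    return '%.2f%s' % (value, units[index])
```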
@@ -76,7 +76,7 @@ theme_names = {
 @yt_app.context_processor
 def inject_theme_preference():
     return {
-        'theme_path': '/youtube.com/static/' + theme_names[settings.theme] + '.css',
+        'theme_path': f'/youtube.com/static/{theme_names[settings.theme]}.css',
         'settings': settings,
         # Detect version
         'current_version': app_version()['version'],

@@ -145,9 +145,9 @@ def error_page(e):
             ' exit node is overutilized. Try getting a new exit node by'
             ' using the New Identity button in the Tor Browser.')
         if fetch_err.error_message:
-            error_message += '\n\n' + fetch_err.error_message
+            error_message += f'\n\n{fetch_err.error_message}'
         if fetch_err.ip:
-            error_message += '\n\nExit node IP address: ' + fetch_err.ip
+            error_message += f'\n\nExit node IP address: {fetch_err.ip}'
         return flask.render_template('error.html', error_message=error_message, slim=slim), 502

     elif error_code == '429':

@@ -157,7 +157,7 @@ def error_page(e):
             '• Enable Tor routing in Settings for automatic IP rotation\n'
             '• Use a VPN to change your IP address')
         if fetch_err.ip:
-            error_message += '\n\nYour IP: ' + fetch_err.ip
+            error_message += f'\n\nYour IP: {fetch_err.ip}'
         return flask.render_template('error.html', error_message=error_message, slim=slim), 429

     elif error_code == '502' and ('Failed to resolve' in str(fetch_err) or 'Failed to establish' in str(fetch_err)):

@@ -179,7 +179,7 @@ def error_page(e):
     # Catch-all for any other FetchError (400, etc.)
     error_message = f'Error communicating with YouTube ({error_code}).'
     if fetch_err.error_message:
-        error_message += '\n\n' + fetch_err.error_message
+        error_message += f'\n\n{fetch_err.error_message}'
     return flask.render_template('error.html', error_message=error_message, slim=slim), 502

     return flask.render_template('error.html', traceback=traceback.format_exc(),
@@ -253,7 +253,7 @@ def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1,
     # For now it seems to be constant for the API endpoint, not dependent
     # on the browsing session or channel
     key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
-    url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
+    url = f'https://www.youtube.com/youtubei/v1/browse?key={key}'

     data = {
         'context': {

@@ -285,8 +285,8 @@ def get_number_of_videos_channel(channel_id):
         return 1000

     # Uploads playlist
-    playlist_id = 'UU' + channel_id[2:]
-    url = 'https://m.youtube.com/playlist?list=' + playlist_id + '&pbj=1'
+    playlist_id = f'UU{channel_id[2:]}'
+    url = f'https://m.youtube.com/playlist?list={playlist_id}&pbj=1'

     try:
         response = util.fetch_url(url, headers_mobile,

@@ -328,7 +328,7 @@ def get_channel_id(base_url):
     # method that gives the smallest possible response at ~4 kb
     # needs to be as fast as possible
     base_url = base_url.replace('https://www', 'https://m')  # avoid redirect
-    response = util.fetch_url(base_url + '/about?pbj=1', headers_mobile,
+    response = util.fetch_url(f'{base_url}/about?pbj=1', headers_mobile,
         debug_name='get_channel_id', report_text='Got channel id').decode('utf-8')
     match = channel_id_re.search(response)
     if match:

@@ -372,7 +372,7 @@ def get_channel_search_json(channel_id, query, page):
     ctoken = base64.urlsafe_b64encode(proto.nested(80226972, ctoken)).decode('ascii')

     key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
-    url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
+    url = f'https://www.youtube.com/youtubei/v1/browse?key={key}'

     data = {
         'context': {

@@ -414,18 +414,18 @@ def post_process_channel_info(info):

 def get_channel_first_page(base_url=None, tab='videos', channel_id=None, sort=None):
     if channel_id:
-        base_url = 'https://www.youtube.com/channel/' + channel_id
+        base_url = f'https://www.youtube.com/channel/{channel_id}'

     # Build URL with sort parameter
     # YouTube URL sort params: p=popular, dd=newest, lad=newest no shorts
     # Note: 'da' (oldest) was removed by YouTube in January 2026
-    url = base_url + '/' + tab + '?pbj=1&view=0'
+    url = f'{base_url}/{tab}?pbj=1&view=0'
     if sort:
         # Map sort values to YouTube's URL parameter values
         sort_map = {'3': 'dd', '4': 'lad'}
-        url += '&sort=' + sort_map.get(sort, 'dd')
+        url += f'&sort={sort_map.get(sort, "dd")}'

-    return util.fetch_url(url, headers_desktop, debug_name='gen_channel_' + tab)
+    return util.fetch_url(url, headers_desktop, debug_name=f'gen_channel_{tab}')


 playlist_sort_codes = {'2': "da", '3': "dd", '4': "lad"}
@@ -462,7 +462,7 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
         if page_number == 1:
             tasks = (
                 gevent.spawn(playlist.playlist_first_page,
-                    'UU' + channel_id[2:],
+                    f'UU{channel_id[2:]}',
                     report_text='Retrieved channel videos'),
                 gevent.spawn(get_metadata, channel_id),
             )

@@ -477,11 +477,11 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
             set_cached_number_of_videos(channel_id, number_of_videos)
         else:
             tasks = (
-                gevent.spawn(playlist.get_videos, 'UU' + channel_id[2:],
+                gevent.spawn(playlist.get_videos, f'UU{channel_id[2:]}',
                     page_number, include_shorts=True),
                 gevent.spawn(get_metadata, channel_id),
                 gevent.spawn(get_number_of_videos_channel, channel_id),
-                gevent.spawn(playlist.playlist_first_page, 'UU' + channel_id[2:],
+                gevent.spawn(playlist.playlist_first_page, f'UU{channel_id[2:]}',
                     report_text='Retrieved channel video count'),
             )
         gevent.joinall(tasks)

@@ -567,10 +567,10 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
     elif tab == 'search' and channel_id:
         polymer_json = get_channel_search_json(channel_id, query, page_number)
     elif tab == 'search':
-        url = base_url + '/search?pbj=1&query=' + urllib.parse.quote(query, safe='')
+        url = f'{base_url}/search?pbj=1&query={urllib.parse.quote(query, safe="")}'
         polymer_json = util.fetch_url(url, headers_desktop, debug_name='gen_channel_search')
     elif tab != 'videos':
-        flask.abort(404, 'Unknown channel tab: ' + tab)
+        flask.abort(404, f'Unknown channel tab: {tab}')

     if polymer_json is not None and info is None:
         info = yt_data_extract.extract_channel_info(

@@ -583,7 +583,7 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
         return flask.render_template('error.html', error_message=info['error'])

     if channel_id:
-        info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id
+        info['channel_url'] = f'https://www.youtube.com/channel/{channel_id}'
         info['channel_id'] = channel_id
     else:
         channel_id = info['channel_id']

@@ -663,22 +663,22 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
 @yt_app.route('/channel/<channel_id>/')
 @yt_app.route('/channel/<channel_id>/<tab>')
 def get_channel_page(channel_id, tab='videos'):
-    return get_channel_page_general_url('https://www.youtube.com/channel/' + channel_id, tab, request, channel_id)
+    return get_channel_page_general_url(f'https://www.youtube.com/channel/{channel_id}', tab, request, channel_id)


 @yt_app.route('/user/<username>/')
 @yt_app.route('/user/<username>/<tab>')
 def get_user_page(username, tab='videos'):
-    return get_channel_page_general_url('https://www.youtube.com/user/' + username, tab, request)
+    return get_channel_page_general_url(f'https://www.youtube.com/user/{username}', tab, request)


 @yt_app.route('/c/<custom>/')
 @yt_app.route('/c/<custom>/<tab>')
 def get_custom_c_page(custom, tab='videos'):
-    return get_channel_page_general_url('https://www.youtube.com/c/' + custom, tab, request)
+    return get_channel_page_general_url(f'https://www.youtube.com/c/{custom}', tab, request)


 @yt_app.route('/<custom>')
 @yt_app.route('/<custom>/<tab>')
 def get_toplevel_custom_page(custom, tab='videos'):
-    return get_channel_page_general_url('https://www.youtube.com/' + custom, tab, request)
+    return get_channel_page_general_url(f'https://www.youtube.com/{custom}', tab, request)
@@ -104,20 +104,19 @@ def post_process_comments_info(comments_info):
|
||||
comment['replies_url'] = None
|
||||
comment['replies_url'] = concat_or_none(
|
||||
util.URL_ORIGIN,
|
||||
'/comments?replies=1&ctoken=' + ctoken)
|
||||
f'/comments?replies=1&ctoken={ctoken}')
|
||||
|
||||
if reply_count == 0:
|
||||
comment['view_replies_text'] = 'Reply'
|
||||
elif reply_count == 1:
|
||||
comment['view_replies_text'] = '1 reply'
|
||||
else:
|
||||
comment['view_replies_text'] = str(reply_count) + ' replies'
|
||||
comment['view_replies_text'] = f'{reply_count} replies'
|
||||
|
||||
if comment['approx_like_count'] == '1':
|
||||
comment['likes_text'] = '1 like'
|
||||
else:
|
||||
comment['likes_text'] = (str(comment['approx_like_count'])
|
||||
+ ' likes')
|
||||
comment['likes_text'] = f"{comment['approx_like_count']} likes"
|
||||
|
||||
comments_info['include_avatars'] = settings.enable_comment_avatars
|
||||
if comments_info['ctoken']:
|
||||
@@ -163,14 +162,13 @@ def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''):
|
||||
comments_info = {'error': None}
|
||||
try:
|
||||
other_sort_url = (
|
||||
util.URL_ORIGIN + '/comments?ctoken='
|
||||
+ make_comment_ctoken(video_id, sort=1 - sort, lc=lc)
|
||||
f"{util.URL_ORIGIN}/comments?ctoken="
|
||||
f"{make_comment_ctoken(video_id, sort=1 - sort, lc=lc)}"
|
||||
)
|
||||
other_sort_text = 'Sort by ' + ('newest' if sort == 0 else 'top')
|
||||
other_sort_text = f'Sort by {"newest" if sort == 0 else "top"}'
|
||||
|
||||
this_sort_url = (util.URL_ORIGIN
|
||||
+ '/comments?ctoken='
|
||||
+ make_comment_ctoken(video_id, sort=sort, lc=lc))
|
||||
this_sort_url = (f"{util.URL_ORIGIN}/comments?ctoken="
|
||||
f"{make_comment_ctoken(video_id, sort=sort, lc=lc)}")
|
||||
|
||||
comments_info['comment_links'] = [
|
||||
(other_sort_text, other_sort_url),
|
||||
@@ -188,17 +186,16 @@ def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''):
|
||||
if e.code == '429' and settings.route_tor:
|
||||
comments_info['error'] = 'Error: YouTube blocked the request because the Tor exit node is overutilized.'
|
||||
if e.error_message:
|
||||
comments_info['error'] += '\n\n' + e.error_message
|
||||
comments_info['error'] += '\n\nExit node IP address: %s' % e.ip
|
||||
comments_info['error'] += f'\n\n{e.error_message}'
|
||||
comments_info['error'] += f'\n\nExit node IP address: {e.ip}'
|
||||
else:
|
||||
comments_info['error'] = 'YouTube blocked the request. Error: %s' % str(e)
|
||||
comments_info['error'] = f'YouTube blocked the request. Error: {e}'
|
||||
|
||||
except Exception as e:
|
||||
comments_info['error'] = 'YouTube blocked the request. Error: %s' % str(e)
|
||||
comments_info['error'] = f'YouTube blocked the request. Error: {e}'
|
||||
|
||||
if comments_info.get('error'):
|
||||
print('Error retrieving comments for ' + str(video_id) + ':\n' +
|
||||
comments_info['error'])
|
||||
print(f'Error retrieving comments for {video_id}:\n{comments_info["error"]}')
|
||||
|
||||
return comments_info
|
||||
|
||||
@@ -218,12 +215,10 @@ def get_comments_page():
|
||||
other_sort_url = None
|
||||
else:
|
||||
other_sort_url = (
|
||||
util.URL_ORIGIN
|
||||
+ '/comments?ctoken='
|
||||
+ make_comment_ctoken(comments_info['video_id'],
|
||||
sort=1-comments_info['sort'])
|
||||
f'{util.URL_ORIGIN}/comments?ctoken='
|
||||
f'{make_comment_ctoken(comments_info["video_id"], sort=1-comments_info["sort"])}'
|
||||
)
|
||||
other_sort_text = 'Sort by ' + ('newest' if comments_info['sort'] == 0 else 'top')
|
||||
other_sort_text = f'Sort by {"newest" if comments_info["sort"] == 0 else "top"}'
|
||||
comments_info['comment_links'] = [(other_sort_text, other_sort_url)]
|
||||
|
||||
return flask.render_template(
|
||||
|
||||
youtube/constants.py (new file, 190 lines)
@@ -0,0 +1,190 @@
"""Constants used across yt-local application."""
|
||||
|
||||
import collections
|
||||
|
||||
YOUTUBE_DOMAINS = ('youtube.com', 'youtu.be', 'youtube-nocookie.com')
|
||||
|
||||
DEFAULT_USER_AGENT = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)'
|
||||
MOBILE_USER_AGENT = 'Mozilla/5.0 (Linux; Android 7.0; Redmi Note 4 Build/NRD90M) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Mobile Safari/537.36'
|
||||
|
||||
REPLACEMENT_MAP = collections.OrderedDict([
|
||||
('<', '_'),
|
||||
('>', '_'),
|
||||
(': ', ' - '),
|
||||
(':', '-'),
|
||||
('"', "'"),
|
||||
('/', '_'),
|
||||
('\\', '_'),
|
||||
('|', '-'),
|
||||
('?', ''),
|
||||
('*', '_'),
|
||||
('\t', ' '),
|
||||
])
|
||||
|
||||
DOS_RESERVED_NAMES = frozenset({
|
||||
'con', 'prn', 'aux', 'nul', 'com0', 'com1', 'com2', 'com3',
|
||||
'com4', 'com5', 'com6', 'com7', 'com8', 'com9', 'lpt0',
|
||||
'lpt1', 'lpt2', 'lpt3', 'lpt4', 'lpt5', 'lpt6', 'lpt7',
|
||||
'lpt8', 'lpt9'
|
||||
})
|
||||
|
||||
INNERTUBE_CLIENTS = {
|
||||
'android': {
|
||||
'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
|
||||
'INNERTUBE_CONTEXT': {
|
||||
'client': {
|
||||
'hl': 'en',
|
||||
'gl': 'US',
|
||||
'clientName': 'ANDROID',
|
||||
'clientVersion': '21.02.35',
|
||||
'osName': 'Android',
|
||||
'osVersion': '11',
|
||||
'androidSdkVersion': 30,
|
||||
'platform': 'MOBILE',
|
||||
'userAgent': 'com.google.android.youtube/21.02.35 (Linux; U; Android 11) gzip'
|
||||
},
|
||||
},
|
||||
'INNERTUBE_CONTEXT_CLIENT_NAME': 3,
|
||||
'REQUIRE_JS_PLAYER': False,
|
||||
},
|
||||
|
||||
'ios': {
|
||||
'INNERTUBE_API_KEY': 'AIzaSyB-63vPrdThhKuerbB2N_l7Kwwcxj6yUAc',
|
||||
'INNERTUBE_CONTEXT': {
|
||||
'client': {
|
||||
'hl': 'en',
|
||||
'gl': 'US',
|
||||
'clientName': 'IOS',
|
||||
'clientVersion': '21.02.3',
|
||||
'deviceMake': 'Apple',
|
||||
'deviceModel': 'iPhone16,2',
|
||||
'osName': 'iPhone',
|
||||
'osVersion': '18.3.2.22D82',
|
||||
'userAgent': 'com.google.ios.youtube/21.02.3 (iPhone16,2; U; CPU iOS 18_3_2 like Mac OS X;)'
|
||||
}
|
||||
},
|
||||
'INNERTUBE_CONTEXT_CLIENT_NAME': 5,
|
||||
'REQUIRE_JS_PLAYER': False
|
||||
},
|
||||
|
||||
'tv_embedded': {
|
||||
'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
|
||||
'INNERTUBE_CONTEXT': {
|
||||
'client': {
|
||||
'hl': 'en',
|
||||
'gl': 'US',
|
||||
'clientName': 'TVHTML5_SIMPLY',
|
||||
'clientVersion': '1.0',
|
||||
},
|
||||
},
|
||||
'INNERTUBE_CONTEXT_CLIENT_NAME': 75,
|
||||
'REQUIRE_JS_PLAYER': True,
|
||||
},
|
||||
|
||||
'web': {
|
||||
'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
|
||||
'INNERTUBE_CONTEXT': {
|
||||
'client': {
|
||||
'clientName': 'WEB',
|
||||
'clientVersion': '2.20260114.08.00',
|
||||
'userAgent': DEFAULT_USER_AGENT,
|
||||
}
|
||||
},
|
||||
'INNERTUBE_CONTEXT_CLIENT_NAME': 1
|
||||
},
|
||||
|
||||
'web_embedded': {
|
||||
'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
|
||||
'INNERTUBE_CONTEXT': {
|
||||
'client': {
|
||||
'hl': 'en',
|
||||
'gl': 'US',
|
||||
'clientName': 'WEB_EMBEDDED_PLAYER',
|
||||
'clientVersion': '1.20260115.01.00',
|
||||
},
|
||||
},
|
||||
'INNERTUBE_CONTEXT_CLIENT_NAME': 56,
|
||||
'REQUIRE_JS_PLAYER': True,
|
||||
},
|
||||
|
||||
'mweb': {
|
||||
'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
|
||||
'INNERTUBE_CONTEXT': {
|
||||
'client': {
|
||||
'hl': 'en',
|
||||
'gl': 'US',
|
||||
'clientName': 'MWEB',
|
||||
'clientVersion': '2.20260115.01.00',
|
||||
'userAgent': 'Mozilla/5.0 (iPad; CPU OS 16_7_10 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1,gzip(gfe)',
|
||||
}
|
||||
},
|
||||
'INNERTUBE_CONTEXT_CLIENT_NAME': 2,
|
||||
'REQUIRE_JS_PLAYER': True,
|
||||
},
|
||||
|
||||
'tv': {
|
||||
'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
|
||||
'INNERTUBE_CONTEXT': {
|
||||
'client': {
|
||||
'hl': 'en',
|
||||
'gl': 'US',
|
||||
'clientName': 'TVHTML5',
|
||||
'clientVersion': '7.20260114.12.00',
|
||||
'userAgent': 'Mozilla/5.0 (ChromiumStylePlatform) Cobalt/25.lts.30.1034943-gold (unlike Gecko), Unknown_TV_Unknown_0/Unknown (Unknown, Unknown)',
|
||||
}
|
||||
},
|
||||
'INNERTUBE_CONTEXT_CLIENT_NAME': 7,
|
||||
'REQUIRE_JS_PLAYER': True,
|
||||
},
|
||||
|
||||
'android_vr': {
|
||||
'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
|
||||
'INNERTUBE_CONTEXT': {
|
||||
'client': {
|
||||
'clientName': 'ANDROID_VR',
|
||||
'clientVersion': '1.65.10',
|
||||
'deviceMake': 'Oculus',
|
||||
'deviceModel': 'Quest 3',
|
||||
'androidSdkVersion': 32,
|
||||
'userAgent': 'com.google.android.apps.youtube.vr.oculus/1.65.10 (Linux; U; Android 12L; eureka-user Build/SQ3A.220605.009.A1) gzip',
|
||||
'osName': 'Android',
|
||||
'osVersion': '12L',
|
||||
},
|
||||
},
|
||||
'INNERTUBE_CONTEXT_CLIENT_NAME': 28,
|
||||
'REQUIRE_JS_PLAYER': False,
|
||||
},
|
||||
}
|
||||
|
||||
THEME_NAMES = {
|
||||
0: 'light_theme',
|
||||
1: 'gray_theme',
|
||||
2: 'dark_theme',
|
||||
}
|
||||
|
||||
FONT_CHOICES = {
|
||||
0: 'initial',
|
||||
1: '"liberation serif", "times new roman", calibri, carlito, serif',
|
||||
2: 'arial, "liberation sans", sans-serif',
|
||||
3: 'verdana, sans-serif',
|
||||
4: 'tahoma, sans-serif',
|
||||
}
|
||||
|
||||
URL_ORIGIN = "/https://www.youtube.com"
|
||||
|
||||
MAX_RETRIES = 5
|
||||
BASE_DELAY = 1.0
|
||||
|
||||
TOR_DEFAULT_PORT = 9050
|
||||
TOR_CONTROL_DEFAULT_PORT = 9151
|
||||
|
||||
DEFAULT_PORT = 9010
|
||||
|
||||
# Backward compatibility aliases (matching existing code names)
|
||||
desktop_user_agent = 'Mozilla/5.0 (Windows NT 6.1; rv:52.0) Gecko/20100101 Firefox/52.0'
|
||||
desktop_ua = (('User-Agent', desktop_user_agent),)
|
||||
mobile_ua = (('User-Agent', MOBILE_USER_AGENT),)
|
||||
json_header = (('Content-Type', 'application/json'),)
|
||||
|
||||
# Re-export for convenience
|
||||
url_origin = URL_ORIGIN
|
||||
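A minimal sketch of how one of these client entries is typically consumed when building an innertube request (the `build_browse_request` helper is hypothetical, not part of this diff; `playlist_first_page` later in this changeset does the same assembly inline):

```python
import json

# Trimmed copy of the 'web' entry from INNERTUBE_CLIENTS above
INNERTUBE_CLIENTS = {
    'web': {
        'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
        'INNERTUBE_CONTEXT': {
            'client': {
                'clientName': 'WEB',
                'clientVersion': '2.20260114.08.00',
            },
        },
        'INNERTUBE_CONTEXT_CLIENT_NAME': 1,
    },
}


def build_browse_request(client_name, browse_id):
    """Assemble the URL and JSON body for an innertube 'browse' call."""
    client = INNERTUBE_CLIENTS[client_name]
    url = ('https://www.youtube.com/youtubei/v1/browse?key='
           + client['INNERTUBE_API_KEY'])
    body = {'context': client['INNERTUBE_CONTEXT'], 'browseId': browse_id}
    return url, json.dumps(body)
```

Keeping the per-client context blobs in one dict means switching clients is a key lookup rather than a scattered edit.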
@@ -41,8 +41,8 @@ def app_version():
     describe = minimal_env_cmd(['git', 'describe', '--tags', '--always'])
     git_revision = describe.strip().decode('ascii')

-    branch = minimal_env_cmd(['git', 'branch'])
-    git_branch = branch.strip().decode('ascii').replace('* ', '')
+    branch = minimal_env_cmd(['git', 'branch', '--show-current'])
+    git_branch = branch.strip().decode('ascii')

     subst_list.update({
         'branch': git_branch,
@@ -92,9 +92,7 @@ def add_extra_info_to_videos(videos, playlist_name):
         util.add_extra_html_info(video)
         if video['id'] + '.jpg' in thumbnails:
-            video['thumbnail'] = (
-                '/https://youtube.com/data/playlist_thumbnails/'
-                + playlist_name
-                + '/' + video['id'] + '.jpg')
+            video['thumbnail'] = (
+                f'/https://youtube.com/data/playlist_thumbnails/{playlist_name}/{video["id"]}.jpg')
         else:
             video['thumbnail'] = util.get_thumbnail_url(video['id'])
             missing_thumbnails.append(video['id'])
@@ -20,7 +20,7 @@ def playlist_ctoken(playlist_id, offset, include_shorts=True):

     continuation_info = proto.string(3, proto.percent_b64encode(offset))

-    playlist_id = proto.string(2, 'VL' + playlist_id)
+    playlist_id = proto.string(2, f'VL{playlist_id}')
     pointless_nest = proto.string(80226972, playlist_id + continuation_info)

     return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
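For reference, a minimal sketch of the length-delimited (wire type 2) encoding that a `proto.string` helper performs; this is standard protobuf framing under assumed helper names, not the repo's exact implementation:

```python
import base64


def varint_encode(n):
    """Protobuf varint: 7 bits per byte, high bit set on all but the last."""
    out = b''
    while True:
        byte = n & 0x7f
        n >>= 7
        if n:
            out += bytes((byte | 0x80,))
        else:
            return out + bytes((byte,))


def string(field_number, data):
    """Wire type 2 field: tag varint, length varint, then the payload."""
    if isinstance(data, str):
        data = data.encode('utf-8')
    tag = varint_encode((field_number << 3) | 2)
    return tag + varint_encode(len(data)) + data


# Field 2 carrying the 'VL'-prefixed playlist id, as in playlist_ctoken
token_part = string(2, 'VL' + 'PLtest')
ctoken = base64.urlsafe_b64encode(string(80226972, token_part)).decode('ascii')
```

The nesting mirrors `playlist_ctoken`: the inner field is wrapped in field 80226972 before the whole thing is URL-safe base64-encoded.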
@@ -30,7 +30,7 @@ def playlist_first_page(playlist_id, report_text="Retrieved playlist",
                        use_mobile=False):
     # Use innertube API (pbj=1 no longer works for many playlists)
     key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
-    url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
+    url = f'https://www.youtube.com/youtubei/v1/browse?key={key}'

     data = {
         'context': {

@@ -41,7 +41,7 @@ def playlist_first_page(playlist_id, report_text="Retrieved playlist",
                 'clientVersion': '2.20240327.00.00',
             },
         },
-        'browseId': 'VL' + playlist_id,
+        'browseId': f'VL{playlist_id}',
     }

     content_type_header = (('Content-Type', 'application/json'),)

@@ -58,7 +58,7 @@ def get_videos(playlist_id, page, include_shorts=True, use_mobile=False,
     page_size = 100

     key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
-    url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
+    url = f'https://www.youtube.com/youtubei/v1/browse?key={key}'

     ctoken = playlist_ctoken(playlist_id, (int(page)-1)*page_size,
                              include_shorts=include_shorts)

@@ -97,7 +97,7 @@ def get_playlist_page():
     if playlist_id.startswith('RD'):
         first_video_id = playlist_id[2:]  # video ID after 'RD' prefix
         return flask.redirect(
-            util.URL_ORIGIN + '/watch?v=' + first_video_id + '&list=' + playlist_id,
+            f'{util.URL_ORIGIN}/watch?v={first_video_id}&list={playlist_id}',
             302
         )

@@ -132,9 +132,9 @@ def get_playlist_page():
         if 'id' in item and not item.get('thumbnail'):
             item['thumbnail'] = f"{settings.img_prefix}https://i.ytimg.com/vi/{item['id']}/hqdefault.jpg"

-        item['url'] += '&list=' + playlist_id
+        item['url'] += f'&list={playlist_id}'
         if item['index']:
-            item['url'] += '&index=' + str(item['index'])
+            item['url'] += f'&index={item["index"]}'

     video_count = yt_data_extract.deep_get(info, 'metadata', 'video_count')
     if video_count is None:
@@ -76,7 +76,7 @@ def read_varint(data):
     except IndexError:
         if i == 0:
             raise EOFError()
-        raise Exception('Unterminated varint starting at ' + str(data.tell() - i))
+        raise Exception(f'Unterminated varint starting at {data.tell() - i}')
     result |= (byte & 127) << 7*i
     if not byte & 128:
         break
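Standalone, the decoding loop that `read_varint` implements looks like this; a sketch matching the masking and shifting shown in the hunk, with the stream/error handling simplified (the real function catches `IndexError` from indexing the read bytes):

```python
import io


def read_varint(data):
    """Decode a protobuf varint from a binary stream.

    Bytes carry 7 payload bits each, least-significant group first;
    the high bit signals that another byte follows.
    """
    result = 0
    i = 0
    while True:
        chunk = data.read(1)
        if not chunk:
            if i == 0:
                raise EOFError()
            raise Exception(f'Unterminated varint starting at {data.tell() - i}')
        byte = chunk[0]
        result |= (byte & 127) << (7 * i)
        if not byte & 128:
            break
        i += 1
    return result
```

For example, `b'\xac\x02'` decodes to 300: the first byte contributes 44, the second contributes 2 << 7 = 256.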
@@ -118,7 +118,7 @@ def read_protobuf(data):
     elif wire_type == 5:
         value = data.read(4)
     else:
-        raise Exception("Unknown wire type: " + str(wire_type) + " at position " + str(data.tell()))
+        raise Exception(f"Unknown wire type: {wire_type} at position {data.tell()}")
     yield (wire_type, field_number, value)

@@ -170,8 +170,7 @@ def _make_protobuf(data):
         elif field[0] == 2:
             result += string(field[1], _make_protobuf(field[2]))
         else:
-            raise NotImplementedError('Wire type ' + str(field[0])
-                                      + ' not implemented')
+            raise NotImplementedError(f'Wire type {field[0]} not implemented')
         return result
     return data

@@ -218,4 +217,4 @@ def b64_to_bytes(data):
     if isinstance(data, bytes):
         data = data.decode('ascii')
     data = data.replace("%3D", "=")
-    return base64.urlsafe_b64decode(data + "="*((4 - len(data) % 4) % 4))
+    return base64.urlsafe_b64decode(f'{data}{"=" * ((4 - len(data) % 4) % 4)}')
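The padding expression is easy to get wrong: a stray literal `=` inside the f-string would corrupt input that is already padded. The round-trip in isolation:

```python
import base64


def b64_to_bytes(data):
    """Decode URL-safe base64, tolerating missing '=' padding."""
    if isinstance(data, bytes):
        data = data.decode('ascii')
    data = data.replace("%3D", "=")
    # (4 - len % 4) % 4 adds exactly the padding needed, possibly none
    return base64.urlsafe_b64decode(data + "=" * ((4 - len(data) % 4) % 4))
```

`'aGVsbG8'` (7 chars) gets one `=` appended; `'aGVsbG8='` gets none, so both decode to `b'hello'`.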
@@ -179,7 +179,7 @@ def read_varint(data):
     except IndexError:
         if i == 0:
             raise EOFError()
-        raise Exception('Unterminated varint starting at ' + str(data.tell() - i))
+        raise Exception(f'Unterminated varint starting at {data.tell() - i}')
     result |= (byte & 127) << 7*i
     if not byte & 128:
         break

@@ -235,8 +235,7 @@ def _make_protobuf(data):
     elif field[0] == 2:
         result += string(field[1], _make_protobuf(field[2]))
     else:
-        raise NotImplementedError('Wire type ' + str(field[0])
-                                  + ' not implemented')
+        raise NotImplementedError(f'Wire type {field[0]} not implemented')
     return result
     return data

@@ -286,7 +285,7 @@ def b64_to_bytes(data):
     if isinstance(data, bytes):
         data = data.decode('ascii')
     data = data.replace("%3D", "=")
-    return base64.urlsafe_b64decode(data + "="*((4 - len(data) % 4) % 4))
+    return base64.urlsafe_b64decode(f'{data}{"=" * ((4 - len(data) % 4) % 4)}')
# --------------------------------------------------------------------

@@ -344,7 +343,7 @@ fromhex = bytes.fromhex


 def aligned_ascii(data):
-    return ' '.join(' ' + chr(n) if n in range(32, 128) else ' _' for n in data)
+    return ' '.join(f' {chr(n)}' if n in range(32, 128) else ' _' for n in data)


 def parse_protobuf(data, mutable=False, spec=()):

@@ -372,7 +371,7 @@ def parse_protobuf(data, mutable=False, spec=()):
     elif wire_type == 5:
         value = data.read(4)
     else:
-        raise Exception("Unknown wire type: " + str(wire_type) + ", Tag: " + bytes_to_hex(varint_encode(tag)) + ", at position " + str(data.tell()))
+        raise Exception(f"Unknown wire type: {wire_type}, Tag: {bytes_to_hex(varint_encode(tag))}, at position {data.tell()}")
     if mutable:
         yield [wire_type, field_number, value]
     else:

@@ -453,7 +452,7 @@ def b32decode(s, casefold=False, map01=None):
     if map01 is not None:
         map01 = _bytes_from_decode_data(map01)
         assert len(map01) == 1, repr(map01)
-        s = s.translate(bytes.maketrans(b'01', b'O' + map01))
+        s = s.translate(bytes.maketrans(b'01', b'O' + map01))
     if casefold:
         s = s.upper()
     # Strip off pad characters from the right. We need to count the pad

@@ -494,7 +493,7 @@ def b32decode(s, casefold=False, map01=None):
 def dec32(data):
     if isinstance(data, bytes):
         data = data.decode('ascii')
-    return b32decode(data + "="*((8 - len(data)%8)%8))
+    return b32decode(f'{data}{"=" * ((8 - len(data)%8)%8)}')


 _patterns = [

@@ -563,9 +562,7 @@ def _pp(obj, indent):  # not my best work
     if len(obj) == 3:  # (wire_type, field_number, data)
         return obj.__repr__()
     else:  # (base64, [...])
-        return ('(' + obj[0].__repr__() + ',\n'
-                + indent_lines(_pp(obj[1], indent), indent) + '\n'
-                + ')')
+        return f"({obj[0].__repr__()},\n{indent_lines(_pp(obj[1], indent), indent)}\n)"
 elif isinstance(obj, list):
     # [wire_type, field_number, data]
     if (len(obj) == 3

@@ -577,13 +574,11 @@ def _pp(obj, indent):  # not my best work
     elif (len(obj) == 3
           and not any(isinstance(x, (list, tuple)) for x in obj[0:2])
           ):
-        return ('[' + obj[0].__repr__() + ', ' + obj[1].__repr__() + ',\n'
-                + indent_lines(_pp(obj[2], indent), indent) + '\n'
-                + ']')
+        return f"[{obj[0].__repr__()}, {obj[1].__repr__()},\n{indent_lines(_pp(obj[2], indent), indent)}\n]"
     else:
         s = '[\n'
         for x in obj:
-            s += indent_lines(_pp(x, indent), indent) + ',\n'
+            s += f"{indent_lines(_pp(x, indent), indent)},\n"
         s += ']'
         return s
 else:
@@ -51,7 +51,7 @@ def get_search_json(query, page, autocorrect, sort, filters):
         'X-YouTube-Client-Name': '1',
         'X-YouTube-Client-Version': '2.20180418',
     }
-    url += "&pbj=1&sp=" + page_number_to_sp_parameter(page, autocorrect, sort, filters).replace("=", "%3D")
+    url += f"&pbj=1&sp={page_number_to_sp_parameter(page, autocorrect, sort, filters).replace('=', '%3D')}"
     content = util.fetch_url(url, headers=headers, report_text="Got search results", debug_name='search_results')
     info = json.loads(content)
     return info
@@ -150,7 +150,7 @@
  * Create custom quality control in Plyr controls
  */
 function addCustomQualityControl(player, qualityLabels) {
-    player.on('ready', () => {
+    function doAdd() {
         console.log('Adding custom quality control...');

         const controls = player.elements.container.querySelector('.plyr__controls');

@@ -238,14 +238,21 @@
         }

         console.log('Custom quality control added');
-    });
+    }
+
+    // Run immediately if Plyr is already ready, otherwise wait
+    if (player.ready) {
+        doAdd();
+    } else {
+        player.on('ready', doAdd);
+    }
 }

 /**
  * Create custom audio tracks control in Plyr controls
  */
 function addCustomAudioTracksControl(player, hlsInstance) {
-    player.on('ready', () => {
+    function doAdd() {
         console.log('Adding custom audio tracks control...');

         const controls = player.elements.container.querySelector('.plyr__controls');

@@ -397,52 +404,32 @@
         });

         console.log('Custom audio tracks control added');
-    });
+    }
+
+    // Run immediately if Plyr is already ready, otherwise wait
+    if (player.ready) {
+        doAdd();
+    } else {
+        player.on('ready', doAdd);
+    }
 }

 /**
- * Initialize Plyr with HLS quality options
+ * Main initialization
  */
-function initPlyrWithQuality(hlsInstance) {
+async function start() {
+    console.log('Starting Plyr with HLS...');
+
+    if (typeof hls_manifest_url === 'undefined' || !hls_manifest_url) {
+        console.error('No HLS manifest URL available');
+        return;
+    }
+
+    // Initialize Plyr immediately so the player UI shows right away
+    // instead of a bare <video> element while the manifest loads.
     const video = document.getElementById('js-video-player');

-    if (!hlsInstance || !hlsInstance.levels || hlsInstance.levels.length === 0) {
-        console.error('HLS not ready');
-        return;
-    }
-
-    if (!video) {
-        console.error('Video element not found');
-        return;
-    }
-
-    console.log('HLS levels available:', hlsInstance.levels.length);
-
-    const sortedLevels = [...hlsInstance.levels].sort((a, b) => b.height - a.height);
-
-    const seenHeights = new Set();
-    const uniqueLevels = [];
-
-    sortedLevels.forEach((level) => {
-        if (!seenHeights.has(level.height)) {
-            seenHeights.add(level.height);
-            uniqueLevels.push(level);
-        }
-    });
-
-    const qualityLabels = ['auto'];
-    uniqueLevels.forEach((level) => {
-        const originalIndex = hlsInstance.levels.indexOf(level);
-        const label = level.height + 'p';
-        if (!window.hlsQualityMap[label]) {
-            qualityLabels.push(label);
-            window.hlsQualityMap[label] = originalIndex;
-        }
-    });
-
-    console.log('Quality labels:', qualityLabels);
-
-    const playerOptions = {
+    if (video) {
+        plyrInstance = new Plyr(video, {
             autoplay: autoplayActive,
             disableContextMenu: false,
             captions: {

@@ -472,64 +459,64 @@
             src: typeof storyboard_url !== 'undefined' && storyboard_url !== null ? [storyboard_url] : [],
         },
         settings: ['captions', 'speed', 'loop'],
-        tooltips: {
-            controls: true,
-        },
-    };
-
-    console.log('Creating Plyr...');
-
-    try {
-        plyrInstance = new Plyr(video, playerOptions);
-        console.log('Plyr instance created');
-
+            tooltips: { controls: true },
+        });
         window.plyrInstance = plyrInstance;

-        addCustomQualityControl(plyrInstance, qualityLabels);
-        addCustomAudioTracksControl(plyrInstance, hlsInstance);
-
         if (plyrInstance.eventListeners) {
             plyrInstance.eventListeners.forEach(function(eventListener) {
-                if(eventListener.type === 'dblclick') {
-                    eventListener.element.removeEventListener(eventListener.type, eventListener.callback, eventListener.options);
+                if (eventListener.type === 'dblclick') {
+                    eventListener.element.removeEventListener(
+                        eventListener.type, eventListener.callback, eventListener.options);
                 }
             });
         }

         plyrInstance.started = false;
-        plyrInstance.once('playing', function(){this.started = true});
+        plyrInstance.once('playing', function(){ this.started = true; });

         if (typeof data !== 'undefined' && data.time_start != 0) {
             video.addEventListener('loadedmetadata', function() {
                 video.currentTime = data.time_start;
             });
         }
-
-        console.log('Plyr init complete');
-    } catch (e) {
-        console.error('Failed to initialize Plyr:', e);
-    }
-}
-
-/**
- * Main initialization
- */
-async function start() {
-    console.log('Starting Plyr with HLS...');
-
-    if (typeof hls_manifest_url === 'undefined' || !hls_manifest_url) {
-        console.error('No HLS manifest URL available');
-        return;
     }

     try {
         const hlsInstance = await initHLS(hls_manifest_url);
-        initPlyrWithQuality(hlsInstance);
+        // Manifest is ready — add quality and audio controls
+        addCustomQualityControl(plyrInstance, buildQualityLabels(hlsInstance));
+        addCustomAudioTracksControl(plyrInstance, hlsInstance);
     } catch (error) {
-        console.error('Failed to initialize:', error);
+        console.error('Failed to initialize HLS:', error);
     }
 }

+/**
+ * Build quality labels from HLS levels
+ */
+function buildQualityLabels(hlsInstance) {
+    const qualityLabels = ['auto'];
+    if (!hlsInstance || !hlsInstance.levels) return qualityLabels;
+
+    const sortedLevels = [...hlsInstance.levels].sort((a, b) => b.height - a.height);
+    const seenHeights = new Set();
+
+    sortedLevels.forEach((level) => {
+        if (!seenHeights.has(level.height)) {
+            seenHeights.add(level.height);
+            const originalIndex = hlsInstance.levels.indexOf(level);
+            const label = level.height + 'p';
+            if (!window.hlsQualityMap[label]) {
+                qualityLabels.push(label);
+                window.hlsQualityMap[label] = originalIndex;
+            }
+        }
+    });
+
+    return qualityLabels;
+}

 if (document.readyState === 'loading') {
     document.addEventListener('DOMContentLoaded', start);
 } else {
@@ -31,9 +31,34 @@ if (data.using_pair_sources) {
     avMerge = new AVMerge(video, srcPair, 0);
 }

-// Quality selector
+// Quality selector — populate with available sources
+const qs = document.getElementById('quality-select');
+if (qs) {
+    // Clear the HLS-oriented "Auto" default; DASH has discrete sources
+    qs.innerHTML = '';
+
+    // Add pair_sources (video+audio, used by AVMerge)
+    if (data['pair_sources'] && data['pair_sources'].length) {
+        data['pair_sources'].forEach(function(src, i) {
+            let opt = document.createElement('option');
+            opt.value = JSON.stringify({type: 'pair', index: i});
+            opt.textContent = src.quality_string;
+            if (i === data['pair_idx']) opt.selected = true;
+            qs.appendChild(opt);
+        });
+    }
+
+    // Add uni_sources (integrated video+audio, single file)
+    if (data['uni_sources'] && data['uni_sources'].length) {
+        data['uni_sources'].forEach(function(src, i) {
+            let opt = document.createElement('option');
+            opt.value = JSON.stringify({type: 'uni', index: i});
+            opt.textContent = src.quality_string;
+            if (!data['pair_sources'].length && i === data['uni_idx']) opt.selected = true;
+            qs.appendChild(opt);
+        });
+    }
+
+    qs.addEventListener('change', function(e) {
+        changeQuality(JSON.parse(this.value));
+    });
@@ -26,7 +26,17 @@ function initHLSNative(manifestUrl) {
         lowLatencyMode: false,
         maxBufferLength: 30,
         maxMaxBufferLength: 60,
         maxBufferHole: 0.5,
         startLevel: -1,
+        // Prevent stalls on quality switch: nudge playback past small gaps
+        nudgeMaxRetry: 5,
+        // Allow more time for segments coming through our proxy
+        fragLoadingTimeOut: 30000,
+        fragLoadingMaxRetry: 5,
+        fragLoadingRetryDelay: 1000,
+        levelLoadingTimeOut: 15000,
+        levelLoadingMaxRetry: 4,
+        levelLoadingRetryDelay: 1000,
     });

     window.hls = hls;
@@ -89,15 +99,26 @@ function initHLSNative(manifestUrl) {
         console.error('HLS fatal error:', data.type, data.details);
         switch(data.type) {
             case Hls.ErrorTypes.NETWORK_ERROR:
                 console.warn('HLS network error, attempting recovery...');
                 hls.startLoad();
                 break;
             case Hls.ErrorTypes.MEDIA_ERROR:
                 console.warn('HLS media error, attempting recovery...');
                 hls.recoverMediaError();
                 break;
             default:
                 hls.destroy();
                 break;
         }
+    } else {
+        // Non-fatal errors can still cause stalls, especially
+        // bufferStalledError after a quality switch through our proxy
+        console.warn('HLS non-fatal error:', data.type, data.details);
+        if (data.details === 'bufferStalledError') {
+            // Buffer ran dry — HLS.js is waiting for data.
+            // Nudge it to retry loading the current fragment.
+            hls.startLoad();
+        }
     }
 });
@@ -122,13 +143,36 @@ function initPlayer() {
     initHLSNative(hls_manifest_url);

     const qualitySelect = document.getElementById('quality-select');
+    // Set initial Auto option while manifest loads
+    if (qualitySelect) {
+        qualitySelect.innerHTML = '<option value="-1" selected>Auto</option>';
+    }
     if (qualitySelect) {
         qualitySelect.addEventListener('change', function () {
             const level = parseInt(this.value);

             if (hls) {
-                hls.currentLevel = level;
-                console.log('Quality:', level === -1 ? 'Auto' : hls.levels[level]?.height + 'p');
+                const currentTime = video.currentTime;
+                const wasPaused = video.paused;
+
+                // Use nextLevel for smoother transition: it waits for the
+                // current segment to finish before switching, avoiding an
+                // abrupt buffer flush that starves the player.
+                if (level === -1) {
+                    // Back to auto — re-enable ABR
+                    hls.currentLevel = -1;
+                    console.log('Quality: Auto (ABR)');
+                } else {
+                    hls.nextLevel = level;
+                    console.log('Quality: switching to',
+                                hls.levels[level]?.height + 'p');
+                }
+
+                // If the video was already stalled, kick the loader
+                // so it starts fetching the new level immediately.
+                if (video.readyState < 3) {
+                    hls.startLoad(currentTime);
+                }
             }
         });
     }
@@ -126,7 +126,7 @@ def delete_thumbnails(to_delete):
             os.remove(os.path.join(thumbnails_directory, thumbnail))
             existing_thumbnails.remove(video_id)
         except Exception:
-            print('Failed to delete thumbnail: ' + thumbnail)
+            print(f'Failed to delete thumbnail: {thumbnail}')
             traceback.print_exc()

@@ -184,7 +184,7 @@ def _get_videos(cursor, number_per_page, offset, tag=None):
             'time_published': exact_timestamp(db_video[3]) if db_video[4] else posix_to_dumbed_down(db_video[3]),
             'author': db_video[5],
             'author_id': db_video[6],
-            'author_url': '/https://www.youtube.com/channel/' + db_video[6],
+            'author_url': f'/https://www.youtube.com/channel/{db_video[6]}',
         })

     return videos, pseudo_number_of_videos

@@ -304,9 +304,9 @@ def posix_to_dumbed_down(posix_time):
         if delta >= unit_time:
             quantifier = round(delta/unit_time)
             if quantifier == 1:
-                return '1 ' + unit_name + ' ago'
+                return f'1 {unit_name} ago'
             else:
-                return str(quantifier) + ' ' + unit_name + 's ago'
+                return f'{quantifier} {unit_name}s ago'
     else:
         raise Exception()
|
||||
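The hunk above converts yt-local's relative-time formatter to f-strings. A minimal self-contained sketch of the surrounding logic — only the inner branch appears in the diff, so the unit table and the zero-delta fallback (instead of the original's `raise Exception()`) are assumptions for illustration:

```python
import time

# Hypothetical unit table; only the branch logic below appears in the diff.
TIME_UNITS = (
    ('year', 365 * 24 * 3600),
    ('month', 30 * 24 * 3600),
    ('week', 7 * 24 * 3600),
    ('day', 24 * 3600),
    ('hour', 3600),
    ('minute', 60),
    ('second', 1),
)

def posix_to_dumbed_down(posix_time):
    '''Convert a POSIX timestamp to a rough "n units ago" string.'''
    delta = time.time() - posix_time
    for unit_name, unit_time in TIME_UNITS:
        if delta >= unit_time:
            quantifier = round(delta / unit_time)
            if quantifier == 1:
                return f'1 {unit_name} ago'
            else:
                return f'{quantifier} {unit_name}s ago'
    # Simplification: the real function raises for sub-second deltas.
    return '0 seconds ago'
```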
@@ -363,7 +363,7 @@ def autocheck_dispatcher():
time_until_earliest_job = earliest_job['next_check_time'] - time.time()

if time_until_earliest_job <= -5:  # should not happen unless we're running extremely slow
-   print('ERROR: autocheck_dispatcher got job scheduled in the past, skipping and rescheduling: ' + earliest_job['channel_id'] + ', ' + earliest_job['channel_name'] + ', ' + str(earliest_job['next_check_time']))
+   print(f'ERROR: autocheck_dispatcher got job scheduled in the past, skipping and rescheduling: {earliest_job["channel_id"]}, {earliest_job["channel_name"]}, {earliest_job["next_check_time"]}')
    next_check_time = time.time() + 3600*secrets.randbelow(60)/60
    with_open_db(_schedule_checking, earliest_job['channel_id'], next_check_time)
    autocheck_jobs[earliest_job_index]['next_check_time'] = next_check_time

@@ -451,7 +451,7 @@ def check_channels_if_necessary(channel_ids):


def _get_atoma_feed(channel_id):
-   url = 'https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id
+   url = f'https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}'
    try:
        return util.fetch_url(url).decode('utf-8')
    except util.FetchError as e:
@@ -485,16 +485,15 @@ def _get_channel_videos_first_page(channel_id, channel_status_name):
    return channel_info
except util.FetchError as e:
    if e.code == '429' and settings.route_tor:
-       error_message = ('Error checking channel ' + channel_status_name
-           + ': YouTube blocked the request because the'
-           + ' Tor exit node is overutilized. Try getting a new exit node'
-           + ' by using the New Identity button in the Tor Browser.')
+       error_message = (f'Error checking channel {channel_status_name}: '
+           f'YouTube blocked the request because the Tor exit node is overutilized. '
+           f'Try getting a new exit node by using the New Identity button in the Tor Browser.')
        if e.ip:
-           error_message += ' Exit node IP address: ' + e.ip
+           error_message += f' Exit node IP address: {e.ip}'
        print(error_message)
        return None
    elif e.code == '502':
-       print('Error checking channel', channel_status_name + ':', str(e))
+       print(f'Error checking channel {channel_status_name}: {e}')
        return None
    raise
@@ -505,7 +504,7 @@ def _get_upstream_videos(channel_id):
except KeyError:
    channel_status_name = channel_id

-print("Checking channel: " + channel_status_name)
+print(f"Checking channel: {channel_status_name}")

tasks = (
    # channel page, need for video duration

@@ -550,15 +549,15 @@ def _get_upstream_videos(channel_id):
        times_published[video_id_element.text] = time_published

except ValueError:
-   print('Failed to read atoma feed for ' + channel_status_name)
+   print(f'Failed to read atoma feed for {channel_status_name}')
    traceback.print_exc()
except defusedxml.ElementTree.ParseError:
-   print('Failed to read atoma feed for ' + channel_status_name)
+   print(f'Failed to read atoma feed for {channel_status_name}')

if channel_info is None:  # there was an error
    return
if channel_info['error']:
-   print('Error checking channel ' + channel_status_name + ': ' + channel_info['error'])
+   print(f'Error checking channel {channel_status_name}: {channel_info["error"]}')
    return

videos = channel_info['items']

@@ -1023,7 +1022,7 @@ def get_subscriptions_page():
tag = request.args.get('tag', None)
videos, number_of_videos_in_db = _get_videos(cursor, 60, (page - 1)*60, tag)
for video in videos:
-   video['thumbnail'] = util.URL_ORIGIN + '/data/subscription_thumbnails/' + video['id'] + '.jpg'
+   video['thumbnail'] = f'{util.URL_ORIGIN}/data/subscription_thumbnails/{video["id"]}.jpg'
    video['type'] = 'video'
    video['item_size'] = 'small'
    util.add_extra_html_info(video)

@@ -1033,7 +1032,7 @@ def get_subscriptions_page():
subscription_list = []
for channel_name, channel_id, muted in _get_subscribed_channels(cursor):
    subscription_list.append({
-       'channel_url': util.URL_ORIGIN + '/channel/' + channel_id,
+       'channel_url': f'{util.URL_ORIGIN}/channel/{channel_id}',
        'channel_name': channel_name,
        'channel_id': channel_id,
        'muted': muted,
@@ -1109,17 +1108,17 @@ def serve_subscription_thumbnail(thumbnail):
for quality in ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'):
    url = f"https://i.ytimg.com/vi/{video_id}/{quality}"
    try:
-       image = util.fetch_url(url, report_text="Saved thumbnail: " + video_id)
+       image = util.fetch_url(url, report_text=f"Saved thumbnail: {video_id}")
        break
    except util.FetchError as e:
        if '404' in str(e):
            continue
-       print("Failed to download thumbnail for " + video_id + ": " + str(e))
+       print(f"Failed to download thumbnail for {video_id}: {e}")
        flask.abort(500)
    except urllib.error.HTTPError as e:
        if e.code == 404:
            continue
-       print("Failed to download thumbnail for " + video_id + ": " + str(e))
+       print(f"Failed to download thumbnail for {video_id}: {e}")
        flask.abort(e.code)

if image is None:
@@ -75,14 +75,11 @@
<div class="external-player-controls">
    <input class="speed" id="speed-control" type="text" title="Video speed">
    {% if settings.use_video_player < 2 %}
-       <!-- Native player quality selector -->
+       <!-- Quality selector (populated by JS: HLS adds Auto+levels, DASH adds discrete sources) -->
        <select id="quality-select" autocomplete="off">
            <option value="-1" selected>Auto</option>
            <!-- Quality options will be populated by HLS -->
        </select>
-   {% else %}
-       <select id="quality-select" autocomplete="off" style="display: none;">
-           <!-- Quality options will be populated by HLS -->
-       </select>
    {% endif %}
    {% if settings.use_video_player != 2 %}
222
youtube/util.py
@@ -23,6 +23,9 @@ import stem
import stem.control
import traceback

+from youtube.yt_data_extract.common import concat_or_none
+from youtube import constants
+
logger = logging.getLogger(__name__)

# The trouble with the requests library: It ships its own certificate bundle via certifi

@@ -72,7 +75,7 @@ class TorManager:
def __init__(self):
    self.old_tor_connection_pool = None
    self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager(
-       'socks5h://127.0.0.1:' + str(settings.tor_port) + '/',
+       f'socks5h://127.0.0.1:{settings.tor_port}/',
        cert_reqs='CERT_REQUIRED')
    self.tor_pool_refresh_time = time.monotonic()
    settings.add_setting_changed_hook(

@@ -92,7 +95,7 @@ class TorManager:
self.old_tor_connection_pool = self.tor_connection_pool

self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager(
-   'socks5h://127.0.0.1:' + str(settings.tor_port) + '/',
+   f'socks5h://127.0.0.1:{settings.tor_port}/',
    cert_reqs='CERT_REQUIRED')
self.tor_pool_refresh_time = time.monotonic()
@@ -198,9 +201,9 @@ class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler):
class FetchError(Exception):
    def __init__(self, code, reason='', ip=None, error_message=None):
        if error_message:
-           string = code + ' ' + reason + ': ' + error_message
+           string = f"{code} {reason}: {error_message}"
        else:
-           string = 'HTTP error during request: ' + code + ' ' + reason
+           string = f"HTTP error during request: {code} {reason}"
        Exception.__init__(self, string)
        self.code = code
        self.reason = reason
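The `FetchError` message building shown above, reconstructed as a runnable sketch from the visible lines (the `self.ip` assignment is an assumption — the hunk is cut off after `self.reason`):

```python
class FetchError(Exception):
    def __init__(self, code, reason='', ip=None, error_message=None):
        # Build the exception message exactly as in the hunk above.
        if error_message:
            string = f"{code} {reason}: {error_message}"
        else:
            string = f"HTTP error during request: {code} {reason}"
        Exception.__init__(self, string)
        self.code = code
        self.reason = reason
        self.ip = ip  # assumed; not visible in the hunk
```

Callers in the diff match on `e.code == '429'` and `e.code == '502'`, so `code` is kept as a string rather than an int.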
@@ -294,14 +297,12 @@ def fetch_url_response(url, headers=(), timeout=15, data=None,
exception_cause = e.__context__.__context__
if (isinstance(exception_cause, socks.ProxyConnectionError)
        and settings.route_tor):
-   msg = ('Failed to connect to Tor. Check that Tor is open and '
-       'that your internet connection is working.\n\n'
-       + str(e))
+   msg = f'Failed to connect to Tor. Check that Tor is open and that your internet connection is working.\n\n{e}'
    raise FetchError('502', reason='Bad Gateway',
        error_message=msg)
elif isinstance(e.__context__,
        urllib3.exceptions.NewConnectionError):
-   msg = 'Failed to establish a connection.\n\n' + str(e)
+   msg = f'Failed to establish a connection.\n\n{e}'
    raise FetchError(
        '502', reason='Bad Gateway',
        error_message=msg)

@@ -391,7 +392,7 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None,
if error:
    raise FetchError(
        '429', reason=response.reason, ip=ip,
-       error_message='Automatic circuit change: ' + error)
+       error_message=f'Automatic circuit change: {error}')
continue  # retry with new identity

# Check for client errors (400, 404) - don't retry these
@@ -467,17 +468,14 @@ def head(url, use_tor=False, report_text=None, max_redirects=10):
headers = {'User-Agent': 'Python-urllib'}
response = pool.request('HEAD', url, headers=headers, retries=retries)
if report_text:
-   print(
-       report_text,
-       ' Latency:',
-       round(time.monotonic() - start_time, 3))
+   print(f'{report_text} Latency: {round(time.monotonic() - start_time, 3)}')
return response

-mobile_user_agent = 'Mozilla/5.0 (Linux; Android 7.0; Redmi Note 4 Build/NRD90M) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Mobile Safari/537.36'
-mobile_ua = (('User-Agent', mobile_user_agent),)
-desktop_user_agent = 'Mozilla/5.0 (Windows NT 6.1; rv:52.0) Gecko/20100101 Firefox/52.0'
-desktop_ua = (('User-Agent', desktop_user_agent),)
-json_header = (('Content-Type', 'application/json'),)
+mobile_user_agent = constants.MOBILE_USER_AGENT
+mobile_ua = constants.mobile_ua
+desktop_user_agent = constants.desktop_user_agent
+desktop_ua = constants.desktop_ua
+json_header = constants.json_header
desktop_xhr_headers = (
    ('Accept', '*/*'),
    ('Accept-Language', 'en-US,en;q=0.5'),
@@ -544,16 +542,16 @@ def download_thumbnail(save_directory, video_id):
for quality in ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'):
    url = f'https://i.ytimg.com/vi/{video_id}/{quality}'
    try:
-       thumbnail = fetch_url(url, report_text='Saved thumbnail: ' + video_id)
+       thumbnail = fetch_url(url, report_text=f'Saved thumbnail: {video_id}')
    except FetchError as e:
        if '404' in str(e):
            continue
-       print('Failed to download thumbnail for ' + video_id + ': ' + str(e))
+       print(f'Failed to download thumbnail for {video_id}: {e}')
        return False
    except urllib.error.HTTPError as e:
        if e.code == 404:
            continue
-       print('Failed to download thumbnail for ' + video_id + ': ' + str(e))
+       print(f'Failed to download thumbnail for {video_id}: {e}')
        return False
try:
    with open(save_location, 'wb') as f:

@@ -563,7 +561,7 @@ def download_thumbnail(save_directory, video_id):
with open(save_location, 'wb') as f:
    f.write(thumbnail)
    return True
-print('No thumbnail available for ' + video_id)
+print(f'No thumbnail available for {video_id}')
return False
@@ -646,7 +644,7 @@ def update_query_string(query_string, items):
return urllib.parse.urlencode(parameters, doseq=True)


-YOUTUBE_DOMAINS = ('youtube.com', 'youtu.be', 'youtube-nocookie.com')
+YOUTUBE_DOMAINS = constants.YOUTUBE_DOMAINS
YOUTUBE_URL_RE_STR = r'https?://(?:[a-zA-Z0-9_-]*\.)?(?:'
YOUTUBE_URL_RE_STR += r'|'.join(map(re.escape, YOUTUBE_DOMAINS))
YOUTUBE_URL_RE_STR += r')(?:/[^"]*)?'
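The regex assembled from `YOUTUBE_DOMAINS` above can be exercised directly. This sketch inlines the tuple that the diff moves into `constants` (compiling the pattern to `YOUTUBE_URL_RE` is an assumption; only the string construction appears in the hunk):

```python
import re

YOUTUBE_DOMAINS = ('youtube.com', 'youtu.be', 'youtube-nocookie.com')

# Optional scheme variant, optional subdomain label, one of the known
# domains, then an optional path (stopping at a double quote).
YOUTUBE_URL_RE_STR = r'https?://(?:[a-zA-Z0-9_-]*\.)?(?:'
YOUTUBE_URL_RE_STR += r'|'.join(map(re.escape, YOUTUBE_DOMAINS))
YOUTUBE_URL_RE_STR += r')(?:/[^"]*)?'
YOUTUBE_URL_RE = re.compile(YOUTUBE_URL_RE_STR)
```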
@@ -673,16 +671,6 @@ def left_remove(string, substring):
return string


-def concat_or_none(*strings):
-   '''Concatenates strings. Returns None if any of the arguments are None'''
-   result = ''
-   for string in strings:
-       if string is None:
-           return None
-       result += string
-   return result
-
-
def prefix_urls(item):
    if settings.proxy_images:
        try:
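The `concat_or_none` helper removed here is not deleted outright — per the new import at the top of `util.py` it now lives in `yt_data_extract.common`. Its behavior, verbatim from the removed lines:

```python
def concat_or_none(*strings):
    '''Concatenates strings. Returns None if any of the arguments are None'''
    result = ''
    for string in strings:
        if string is None:
            return None
        result += string
    return result
```

This is why call sites like `item['url'] = concat_or_none(URL_ORIGIN, "/channel/", item['id'])` can safely pass a possibly-missing id: the whole URL becomes `None` instead of raising a `TypeError`.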
@@ -698,7 +686,7 @@ def prefix_urls(item):

def add_extra_html_info(item):
    if item['type'] == 'video':
-       item['url'] = (URL_ORIGIN + '/watch?v=' + item['id']) if item.get('id') else None
+       item['url'] = f'{URL_ORIGIN}/watch?v={item["id"]}' if item.get('id') else None

        video_info = {}
        for key in ('id', 'title', 'author', 'duration', 'author_id'):

@@ -721,7 +709,7 @@ def add_extra_html_info(item):
item['url'] = concat_or_none(URL_ORIGIN, "/channel/", item['id'])

if item.get('author_id') and 'author_url' not in item:
-   item['author_url'] = URL_ORIGIN + '/channel/' + item['author_id']
+   item['author_url'] = f'{URL_ORIGIN}/channel/{item["author_id"]}'
@@ -731,24 +719,8 @@ def check_gevent_exceptions(*tasks):


# https://stackoverflow.com/a/62888
-replacement_map = collections.OrderedDict([
-   ('<', '_'),
-   ('>', '_'),
-   (': ', ' - '),
-   (':', '-'),
-   ('"', "'"),
-   ('/', '_'),
-   ('\\', '_'),
-   ('|', '-'),
-   ('?', ''),
-   ('*', '_'),
-   ('\t', ' '),
-])
-
-DOS_names = {'con', 'prn', 'aux', 'nul', 'com0', 'com1', 'com2', 'com3',
-   'com4', 'com5', 'com6', 'com7', 'com8', 'com9', 'lpt0',
-   'lpt1', 'lpt2', 'lpt3', 'lpt4', 'lpt5', 'lpt6', 'lpt7',
-   'lpt8', 'lpt9'}
+replacement_map = constants.REPLACEMENT_MAP
+DOS_names = constants.DOS_RESERVED_NAMES


def to_valid_filename(name):
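The replacement map moved into `constants` drives filename sanitization. A minimal sketch of applying it in order — the `sanitize` helper name is hypothetical; the real `to_valid_filename` also handles DOS device names and other cases not shown in this hunk:

```python
import collections

# Map copied from the diff; insertion order matters (': ' must run before ':').
replacement_map = collections.OrderedDict([
    ('<', '_'), ('>', '_'), (': ', ' - '), (':', '-'), ('"', "'"),
    ('/', '_'), ('\\', '_'), ('|', '-'), ('?', ''), ('*', '_'), ('\t', ' '),
])

def sanitize(name):
    # Hypothetical helper for illustration only.
    for bad, good in replacement_map.items():
        name = name.replace(bad, good)
    return name
```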
@@ -790,143 +762,7 @@ def to_valid_filename(name):
return name


-# https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/extractor/youtube.py#L72
-INNERTUBE_CLIENTS = {
-   'android': {
-       'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
-       'INNERTUBE_CONTEXT': {
-           'client': {
-               'hl': 'en',
-               'gl': 'US',
-               'clientName': 'ANDROID',
-               'clientVersion': '19.09.36',
-               'osName': 'Android',
-               'osVersion': '12',
-               'androidSdkVersion': 31,
-               'platform': 'MOBILE',
-               'userAgent': 'com.google.android.youtube/19.09.36 (Linux; U; Android 12; US) gzip'
-           },
-           # https://github.com/yt-dlp/yt-dlp/pull/575#issuecomment-887739287
-           #'thirdParty': {
-           #    'embedUrl': 'https://google.com',  # Can be any valid URL
-           #}
-       },
-       'INNERTUBE_CONTEXT_CLIENT_NAME': 3,
-       'REQUIRE_JS_PLAYER': False,
-   },
-
-   'android-test-suite': {
-       'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
-       'INNERTUBE_CONTEXT': {
-           'client': {
-               'hl': 'en',
-               'gl': 'US',
-               'clientName': 'ANDROID_TESTSUITE',
-               'clientVersion': '1.9',
-               'osName': 'Android',
-               'osVersion': '12',
-               'androidSdkVersion': 31,
-               'platform': 'MOBILE',
-               'userAgent': 'com.google.android.youtube/1.9 (Linux; U; Android 12; US) gzip'
-           },
-           # https://github.com/yt-dlp/yt-dlp/pull/575#issuecomment-887739287
-           #'thirdParty': {
-           #    'embedUrl': 'https://google.com',  # Can be any valid URL
-           #}
-       },
-       'INNERTUBE_CONTEXT_CLIENT_NAME': 3,
-       'REQUIRE_JS_PLAYER': False,
-   },
-
-   'ios': {
-       'INNERTUBE_API_KEY': 'AIzaSyB-63vPrdThhKuerbB2N_l7Kwwcxj6yUAc',
-       'INNERTUBE_CONTEXT': {
-           'client': {
-               'hl': 'en',
-               'gl': 'US',
-               'clientName': 'IOS',
-               'clientVersion': '21.03.2',
-               'deviceMake': 'Apple',
-               'deviceModel': 'iPhone16,2',
-               'osName': 'iPhone',
-               'osVersion': '18.7.2.22H124',
-               'userAgent': 'com.google.ios.youtube/21.03.2 (iPhone16,2; U; CPU iOS 18_7_2 like Mac OS X)'
-           }
-       },
-       'INNERTUBE_CONTEXT_CLIENT_NAME': 5,
-       'REQUIRE_JS_PLAYER': False
-   },
-
-   # This client can access age restricted videos (unless the uploader has disabled the 'allow embedding' option)
-   # See: https://github.com/zerodytrash/YouTube-Internal-Clients
-   'tv_embedded': {
-       'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
-       'INNERTUBE_CONTEXT': {
-           'client': {
-               'hl': 'en',
-               'gl': 'US',
-               'clientName': 'TVHTML5_SIMPLY_EMBEDDED_PLAYER',
-               'clientVersion': '2.0',
-               'clientScreen': 'EMBED',
-           },
-           # https://github.com/yt-dlp/yt-dlp/pull/575#issuecomment-887739287
-           'thirdParty': {
-               'embedUrl': 'https://google.com',  # Can be any valid URL
-           }
-       },
-       'INNERTUBE_CONTEXT_CLIENT_NAME': 85,
-       'REQUIRE_JS_PLAYER': True,
-   },
-
-   'web': {
-       'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
-       'INNERTUBE_CONTEXT': {
-           'client': {
-               'clientName': 'WEB',
-               'clientVersion': '2.20220801.00.00',
-               'userAgent': desktop_user_agent,
-           }
-       },
-       'INNERTUBE_CONTEXT_CLIENT_NAME': 1
-   },
-   'android_vr': {
-       'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
-       'INNERTUBE_CONTEXT': {
-           'client': {
-               'clientName': 'ANDROID_VR',
-               'clientVersion': '1.60.19',
-               'deviceMake': 'Oculus',
-               'deviceModel': 'Quest 3',
-               'androidSdkVersion': 32,
-               'userAgent': 'com.google.android.apps.youtube.vr.oculus/1.60.19 (Linux; U; Android 12L; eureka-user Build/SQ3A.220605.009.A1) gzip',
-               'osName': 'Android',
-               'osVersion': '12L',
-           },
-       },
-       'INNERTUBE_CONTEXT_CLIENT_NAME': 28,
-       'REQUIRE_JS_PLAYER': False,
-   },
-
-   'ios_vr': {
-       'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
-       'INNERTUBE_CONTEXT': {
-           'client': {
-               'hl': 'en',
-               'gl': 'US',
-               'clientName': 'IOS_VR',
-               'clientVersion': '1.0',
-               'deviceMake': 'Apple',
-               'deviceModel': 'iPhone16,2',
-               'osName': 'iPhone',
-               'osVersion': '18.7.2.22H124',
-               'userAgent': 'com.google.ios.youtube/1.0 (iPhone16,2; U; CPU iOS 18_7_2 like Mac OS X)'
-           }
-       },
-       'INNERTUBE_CONTEXT_CLIENT_NAME': 5,
-       'REQUIRE_JS_PLAYER': False
-   },
-}
+INNERTUBE_CLIENTS = constants.INNERTUBE_CLIENTS

def get_visitor_data():
    visitor_data = None
@@ -967,7 +803,7 @@ def call_youtube_api(client, api, data):
user_agent = context['client'].get('userAgent') or mobile_user_agent
visitor_data = get_visitor_data()

-url = 'https://' + host + '/youtubei/v1/' + api + '?key=' + key
+url = f'https://{host}/youtubei/v1/{api}?key={key}'
if visitor_data:
    context['client'].update({'visitorData': visitor_data})
data['context'] = context

@@ -978,8 +814,8 @@ def call_youtube_api(client, api, data):
headers = ( *headers, ('X-Goog-Visitor-Id', visitor_data ))
response = fetch_url(
    url, data=data, headers=headers,
-   debug_name='youtubei_' + api + '_' + client,
-   report_text='Fetched ' + client + ' youtubei ' + api
+   debug_name=f'youtubei_{api}_{client}',
+   report_text=f'Fetched {client} youtubei {api}'
).decode('utf-8')
return response
@@ -1,3 +1,3 @@
from __future__ import unicode_literals

-__version__ = 'v0.5.0'
+__version__ = 'v0.5.1'
209
youtube/watch.py
@@ -17,8 +17,16 @@ from flask import request
import youtube
from youtube import yt_app
from youtube import util, comments, local_playlist, yt_data_extract
+from youtube import watch_formats
import settings

+# Backward compatibility aliases
+codec_name = watch_formats.codec_name
+video_quality_string = watch_formats.video_quality_string
+short_video_quality_string = watch_formats.short_video_quality_string
+audio_quality_string = watch_formats.audio_quality_string
+format_bytes = watch_formats.format_bytes
+
logger = logging.getLogger(__name__)
@@ -29,15 +37,7 @@ except FileNotFoundError:
decrypt_cache = {}


-def codec_name(vcodec):
-   if vcodec.startswith('avc'):
-       return 'h264'
-   elif vcodec.startswith('av01'):
-       return 'av1'
-   elif vcodec.startswith('vp'):
-       return 'vp'
-   else:
-       return 'unknown'
+# codec_name imported from watch_formats


def get_video_sources(info, target_resolution):
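The `codec_name` helper deleted above (now provided by `watch_formats` via the alias block at the top of the file) maps a codec string prefix to a codec family, verbatim from the removed lines:

```python
def codec_name(vcodec):
    # Map an RFC 6381 codec string to the family names used by the
    # codec_rank_* settings ('h264', 'av1', 'vp', or 'unknown').
    if vcodec.startswith('avc'):
        return 'h264'
    elif vcodec.startswith('av01'):
        return 'av1'
    elif vcodec.startswith('vp'):
        return 'vp'
    else:
        return 'unknown'
```

`get_video_sources` skips formats where this returns `'unknown'`, and `video_rank` uses the result to build the `codec_rank_*` settings key.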
@@ -53,7 +53,7 @@ def get_video_sources(info, target_resolution):
if fmt['acodec'] and fmt['vcodec']:
    if fmt.get('audio_track_is_default', True) is False:
        continue
-   source = {'type': 'video/' + fmt['ext'],
+   source = {'type': f"video/{fmt['ext']}",
        'quality_string': short_video_quality_string(fmt)}
    source['quality_string'] += ' (integrated)'
    source.update(fmt)

@@ -70,10 +70,10 @@ def get_video_sources(info, target_resolution):
if fmt['acodec'] and not fmt['vcodec'] and (fmt['audio_bitrate'] or fmt['bitrate']):
    if fmt['bitrate']:
        fmt['audio_bitrate'] = int(fmt['bitrate']/1000)
-   source = {'type': 'audio/' + fmt['ext'],
+   source = {'type': f"audio/{fmt['ext']}",
        'quality_string': audio_quality_string(fmt)}
    source.update(fmt)
-   source['mime_codec'] = source['type'] + '; codecs="' + source['acodec'] + '"'
+   source['mime_codec'] = f"{source['type']}; codecs=\"{source['acodec']}\""
    tid = fmt.get('audio_track_id') or 'default'
    if tid not in audio_by_track:
        audio_by_track[tid] = {

@@ -85,11 +85,11 @@ def get_video_sources(info, target_resolution):
elif all(fmt[attr] for attr in ('vcodec', 'quality', 'width', 'fps', 'file_size')):
    if codec_name(fmt['vcodec']) == 'unknown':
        continue
-   source = {'type': 'video/' + fmt['ext'],
+   source = {'type': f"video/{fmt['ext']}",
        'quality_string': short_video_quality_string(fmt)}
    source.update(fmt)
-   source['mime_codec'] = source['type'] + '; codecs="' + source['vcodec'] + '"'
-   quality = str(fmt['quality']) + 'p' + str(fmt['fps'])
+   source['mime_codec'] = f"{source['type']}; codecs=\"{source['vcodec']}\""
+   quality = f"{fmt['quality']}p{fmt['fps']}"
    video_only_sources.setdefault(quality, []).append(source)

audio_tracks = []
@@ -141,7 +141,7 @@ def get_video_sources(info, target_resolution):

def video_rank(src):
    ''' Sort by settings preference. Use file size as tiebreaker '''
-   setting_name = 'codec_rank_' + codec_name(src['vcodec'])
+   setting_name = f'codec_rank_{codec_name(src["vcodec"])}'
    return (settings.current_settings_dict[setting_name],
        src['file_size'])
pair_info['videos'].sort(key=video_rank)

@@ -183,7 +183,7 @@ def make_caption_src(info, lang, auto=False, trans_lang=None):
if auto:
    label += ' (Automatic)'
if trans_lang:
-   label += ' -> ' + trans_lang
+   label += f' -> {trans_lang}'

# Try to use Android caption URL directly (no PO Token needed)
caption_url = None

@@ -204,7 +204,7 @@ def make_caption_src(info, lang, auto=False, trans_lang=None):
else:
    caption_url += '&fmt=vtt'
if trans_lang:
-   caption_url += '&tlang=' + trans_lang
+   caption_url += f'&tlang={trans_lang}'
url = util.prefix_url(caption_url)
else:
    # Fallback to old method
@@ -357,10 +357,10 @@ def decrypt_signatures(info, video_id):

player_name = info['player_name']
if player_name in decrypt_cache:
-   print('Using cached decryption function for: ' + player_name)
+   print(f'Using cached decryption function for: {player_name}')
    info['decryption_function'] = decrypt_cache[player_name]
else:
-   base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text='Fetched player ' + player_name)
+   base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text=f'Fetched player {player_name}')
    base_js = base_js.decode('utf-8')
    err = yt_data_extract.extract_decryption_function(info, base_js)
    if err:

@@ -387,11 +387,11 @@ def fetch_player_response(client, video_id):
def fetch_watch_page_info(video_id, playlist_id, index):
    # bpctr=9999999999 will bypass are-you-sure dialogs for controversial
    # videos
-   url = 'https://m.youtube.com/embed/' + video_id + '?bpctr=9999999999'
+   url = f'https://m.youtube.com/embed/{video_id}?bpctr=9999999999'
    if playlist_id:
-       url += '&list=' + playlist_id
+       url += f'&list={playlist_id}'
    if index:
-       url += '&index=' + index
+       url += f'&index={index}'

    headers = (
        ('Accept', '*/*'),
@@ -446,7 +446,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
info['hls_audio_tracks'] = {}
hls_data = None
hls_client_used = None
-for hls_client in ('ios', 'ios_vr', 'android'):
+for hls_client in ('ios', 'android'):
    try:
        resp = fetch_player_response(hls_client, video_id) or {}
        hls_data = json.loads(resp) if isinstance(resp, str) else resp

@@ -493,7 +493,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
# Register HLS audio tracks for proxy access
added = 0
for lang, track in info['hls_audio_tracks'].items():
-   ck = video_id + '_' + lang
+   ck = f"{video_id}_{lang}"
    from youtube.hls_cache import register_track
    register_track(ck, track['hls_url'],
        video_id=video_id, track_id=lang)

@@ -502,7 +502,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
'audio_track_id': lang,
'audio_track_name': track['name'],
'audio_track_is_default': track['is_default'],
-'itag': 'hls_' + lang,
+'itag': f'hls_{lang}',
'ext': 'mp4',
'audio_bitrate': 128,
'bitrate': 128000,

@@ -516,7 +516,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
'fps': None,
'init_range': {'start': 0, 'end': 0},
'index_range': {'start': 0, 'end': 0},
-'url': '/ytl-api/audio-track?id=' + urllib.parse.quote(ck),
+'url': f'/ytl-api/audio-track?id={urllib.parse.quote(ck)}',
's': None,
'sp': None,
'quality': None,
@@ -538,11 +538,11 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):

# Register HLS manifest for proxying
if info['hls_manifest_url']:
-   ck = video_id + '_video'
+   ck = f"{video_id}_video"
    from youtube.hls_cache import register_track
    register_track(ck, info['hls_manifest_url'], video_id=video_id, track_id='video')
    # Use proxy URL instead of direct Google Video URL
-   info['hls_manifest_url'] = '/ytl-api/hls-manifest?id=' + urllib.parse.quote(ck)
+   info['hls_manifest_url'] = f'/ytl-api/hls-manifest?id={urllib.parse.quote(ck)}'

# Fallback to 'ios' if no valid URLs are found
if not info.get('formats') or info.get('player_urls_missing'):

@@ -566,7 +566,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
if info.get('formats'):
    decryption_error = decrypt_signatures(info, video_id)
    if decryption_error:
-       info['playability_error'] = 'Error decrypting url signatures: ' + decryption_error
+       info['playability_error'] = f'Error decrypting url signatures: {decryption_error}'

# check if urls ready (non-live format) in former livestream
# urls not ready if all of them have no filesize
@@ -621,55 +621,10 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
return info


-def video_quality_string(format):
-   if format['vcodec']:
-       result = str(format['width'] or '?') + 'x' + str(format['height'] or '?')
-       if format['fps']:
-           result += ' ' + str(format['fps']) + 'fps'
-       return result
-   elif format['acodec']:
-       return 'audio only'
-
-   return '?'
-
-
-def short_video_quality_string(fmt):
-   result = str(fmt['quality'] or '?') + 'p'
-   if fmt['fps']:
-       result += str(fmt['fps'])
-   if fmt['vcodec'].startswith('av01'):
-       result += ' AV1'
-   elif fmt['vcodec'].startswith('avc'):
-       result += ' h264'
-   else:
-       result += ' ' + fmt['vcodec']
-   return result
-
-
-def audio_quality_string(fmt):
-   if fmt['acodec']:
-       if fmt['audio_bitrate']:
-           result = '%d' % fmt['audio_bitrate'] + 'k'
-       else:
-           result = '?k'
-       if fmt['audio_sample_rate']:
-           result += ' ' + '%.3G' % (fmt['audio_sample_rate']/1000) + 'kHz'
-       return result
-   elif fmt['vcodec']:
-       return 'video only'
-   return '?'
-
-
-# from https://github.com/ytdl-org/youtube-dl/blob/master/youtube_dl/utils.py
-def format_bytes(bytes):
-   if bytes is None:
-       return 'N/A'
-   if type(bytes) is str:
-       bytes = float(bytes)
-   if bytes == 0.0:
-       exponent = 0
-   else:
-       exponent = int(math.log(bytes, 1024.0))
+# video_quality_string imported from watch_formats
+# short_video_quality_string imported from watch_formats
+# audio_quality_string imported from watch_formats
+# format_bytes imported from watch_formats
-   suffix = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'][exponent]
-   converted = float(bytes) / float(1024 ** exponent)
-   return '%.2f%s' % (converted, suffix)
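The `format_bytes` removed here (now re-exported from `watch_formats` via the alias block) follows the familiar youtube-dl pattern; reconstructed verbatim from the removed lines it runs standalone:

```python
import math

# from https://github.com/ytdl-org/youtube-dl/blob/master/youtube_dl/utils.py
def format_bytes(bytes):
    if bytes is None:
        return 'N/A'
    if type(bytes) is str:
        bytes = float(bytes)
    if bytes == 0.0:
        exponent = 0
    else:
        # log base 1024 picks the binary-prefix tier.
        exponent = int(math.log(bytes, 1024.0))
    suffix = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'][exponent]
    converted = float(bytes) / float(1024 ** exponent)
    return '%.2f%s' % (converted, suffix)
```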
@@ -737,9 +692,9 @@ def get_audio_track():
|
||||
seg = line if line.startswith('http') else urljoin(playlist_base, line)
|
||||
# Always use &seg= parameter, never &url= for segments
|
||||
playlist_lines.append(
|
||||
base_url + '/ytl-api/audio-track?id='
|
||||
+ urllib.parse.quote(cache_key)
|
||||
+ '&seg=' + urllib.parse.quote(seg, safe='')
|
||||
f'{base_url}/ytl-api/audio-track?id='
|
||||
f'{urllib.parse.quote(cache_key)}'
|
||||
f'&seg={urllib.parse.quote(seg, safe="")}'
|
||||
)
|
||||
|
||||
playlist = '\n'.join(playlist_lines)
|
||||
@@ -797,9 +752,7 @@ def get_audio_track():
|
||||
return url
|
||||
if not url.startswith('http://') and not url.startswith('https://'):
|
||||
url = urljoin(playlist_base, url)
|
||||
return (base_url + '/ytl-api/audio-track?id='
|
||||
+ urllib.parse.quote(cache_key)
|
||||
+ '&seg=' + urllib.parse.quote(url, safe=''))
|
||||
return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(url, safe="")}'
|
||||
|
||||
playlist_lines = []
|
||||
for line in playlist.split('\n'):
|
||||
```diff
@@ -812,7 +765,7 @@ def get_audio_track():
         if line.startswith('#') and 'URI=' in line:
             def rewrite_uri_attr(match):
                 uri = match.group(1)
-                return 'URI="' + proxy_url(uri) + '"'
+                return f'URI="{proxy_url(uri)}"'
             line = _re.sub(r'URI="([^"]+)"', rewrite_uri_attr, line)
             playlist_lines.append(line)
         elif line.startswith('#'):
```
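`re.sub` with a function replacement, as used here for `URI=` attributes in HLS tags, is worth a standalone illustration. The `proxy_url` below is a trivial stand-in for the real helper, not its actual implementation:

```python
import re

def proxy_url(uri):
    # Stand-in for the real proxying helper in get_audio_track.
    return '/ytl-api/audio-track?seg=' + uri

line = '#EXT-X-MAP:URI="https://example.com/init.mp4"'

def rewrite_uri_attr(match):
    # The callback receives the Match object; group(1) is the quoted URI.
    return f'URI="{proxy_url(match.group(1))}"'

rewritten = re.sub(r'URI="([^"]+)"', rewrite_uri_attr, line)
assert rewritten == '#EXT-X-MAP:URI="/ytl-api/audio-track?seg=https://example.com/init.mp4"'
```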
```diff
@@ -834,14 +787,12 @@ def get_audio_track():

     # This is an actual segment - fetch and serve it
     try:
-        headers = (
-            ('User-Agent', 'Mozilla/5.0'),
-            ('Accept', '*/*'),
-        )
-        content = util.fetch_url(seg_url, headers=headers,
-                                 debug_name='hls_seg', report_text=None)
+        headers_dict = {
+            'User-Agent': 'Mozilla/5.0',
+            'Accept': '*/*',
+        }

-        # Determine content type based on URL or content
+        # Determine content type based on URL
         # HLS segments are usually MPEG-TS (.ts) but can be MP4 (.mp4, .m4s)
         if '.mp4' in seg_url or '.m4s' in seg_url or seg_url.lower().endswith('.mp4'):
             content_type = 'video/mp4'
@@ -851,7 +802,23 @@ def get_audio_track():
             # Default to MPEG-TS for HLS
             content_type = 'video/mp2t'

-        return flask.Response(content, mimetype=content_type,
+        response, cleanup_func = util.fetch_url_response(
+            seg_url, headers=tuple(headers_dict.items()),
+            timeout=30, use_tor=settings.route_tor)
+
+        def generate():
+            try:
+                while True:
+                    chunk = response.read(64 * 1024)  # 64 KB chunks
+                    if not chunk:
+                        break
+                    yield chunk
+            finally:
+                cleanup_func(response)
+
+        return flask.Response(
+            flask.stream_with_context(generate()),
+            mimetype=content_type,
             headers={
                 'Access-Control-Allow-Origin': '*',
                 'Access-Control-Allow-Methods': 'GET, OPTIONS',
```
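The switch from `util.fetch_url` to `util.fetch_url_response` replaces buffer-the-whole-segment with chunked streaming, with cleanup guaranteed by the `finally` even if the client disconnects mid-stream. The generator-plus-cleanup pattern can be exercised without Flask or the real `util` module; `FakeResponse` below is a stand-in for the HTTP response object:

```python
import io

class FakeResponse(io.BytesIO):
    """Stand-in for the response object returned by util.fetch_url_response."""
    pass

def make_generator(response, cleanup_func, chunk_size=64 * 1024):
    # Mirrors the generate() closure in the diff above.
    def generate():
        try:
            while True:
                chunk = response.read(chunk_size)
                if not chunk:
                    break
                yield chunk
        finally:
            cleanup_func(response)  # runs even on early generator close
    return generate

closed = []
resp = FakeResponse(b'x' * 150_000)
gen = make_generator(resp, lambda r: closed.append(True))()

chunks = list(gen)
assert b''.join(chunks) == b'x' * 150_000
assert len(chunks) == 3      # 64 KiB + 64 KiB + remainder
assert closed == [True]      # cleanup ran exactly once
```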
```diff
@@ -883,9 +850,7 @@ def get_audio_track():
         if segment_url.startswith('/ytl-api/audio-track'):
             return segment_url
         base_url = request.url_root.rstrip('/')
-        return (base_url + '/ytl-api/audio-track?id='
-                + urllib.parse.quote(cache_key)
-                + '&seg=' + urllib.parse.quote(segment_url))
+        return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(segment_url)}'

     playlist_lines = []
     for line in playlist.split('\n'):
```
```diff
@@ -949,14 +914,10 @@ def get_hls_manifest():

         if is_audio_track:
             # Audio track playlist - proxy through audio-track endpoint
-            return (base_url + '/ytl-api/audio-track?id='
-                    + urllib.parse.quote(cache_key)
-                    + '&url=' + urllib.parse.quote(url, safe=''))
+            return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&url={urllib.parse.quote(url, safe="")}'
         else:
             # Video segment or variant playlist - proxy through audio-track endpoint
-            return (base_url + '/ytl-api/audio-track?id='
-                    + urllib.parse.quote(cache_key)
-                    + '&seg=' + urllib.parse.quote(url, safe=''))
+            return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(url, safe="")}'

     # Parse and rewrite the manifest
     manifest_lines = []
```
```diff
@@ -974,7 +935,7 @@ def get_hls_manifest():
             nonlocal rewritten_count
             uri = match.group(1)
             rewritten_count += 1
-            return 'URI="' + rewrite_url(uri, is_audio_track=True) + '"'
+            return f'URI="{rewrite_url(uri, is_audio_track=True)}"'
         line = _re.sub(r'URI="([^"]+)"', rewrite_media_uri, line)
         manifest_lines.append(line)
     elif line.startswith('#'):
```
```diff
@@ -1053,7 +1014,7 @@ def get_storyboard_vtt():
     ts = 0  # current timestamp

     for i in range(storyboard.storyboard_count):
-        url = '/' + storyboard.url.replace("$M", str(i))
+        url = f'/{storyboard.url.replace("$M", str(i))}'
         interval = storyboard.interval
         w, h = storyboard.width, storyboard.height
         w_cnt, h_cnt = storyboard.width_cnt, storyboard.height_cnt
```
```diff
@@ -1078,7 +1039,7 @@ def get_watch_page(video_id=None):
     if not video_id:
         return flask.render_template('error.html', error_message='Missing video id'), 404
     if len(video_id) < 11:
-        return flask.render_template('error.html', error_message='Incomplete video id (too short): ' + video_id), 404
+        return flask.render_template('error.html', error_message=f'Incomplete video id (too short): {video_id}'), 404

     time_start_str = request.args.get('t', '0s')
     time_start = 0
```
```diff
@@ -1141,9 +1102,9 @@ def get_watch_page(video_id=None):
             util.prefix_urls(item)
             util.add_extra_html_info(item)
             if playlist_id:
-                item['url'] += '&list=' + playlist_id
+                item['url'] += f'&list={playlist_id}'
                 if item['index']:
-                    item['url'] += '&index=' + str(item['index'])
+                    item['url'] += f'&index={item["index"]}'
         info['playlist']['author_url'] = util.prefix_url(
             info['playlist']['author_url'])
     if settings.img_prefix:
```
```diff
@@ -1159,16 +1120,16 @@ def get_watch_page(video_id=None):
         filename = title
         ext = fmt.get('ext')
         if ext:
-            filename += '.' + ext
+            filename += f'.{ext}'
         fmt['url'] = fmt['url'].replace(
             '/videoplayback',
-            '/videoplayback/name/' + filename)
+            f'/videoplayback/name/{filename}')

     download_formats = []

     for format in (info['formats'] + info['hls_formats']):
         if format['acodec'] and format['vcodec']:
-            codecs_string = format['acodec'] + ', ' + format['vcodec']
+            codecs_string = f"{format['acodec']}, {format['vcodec']}"
         else:
             codecs_string = format['acodec'] or format['vcodec'] or '?'
         download_formats.append({
```
```diff
@@ -1247,12 +1208,9 @@ def get_watch_page(video_id=None):
     for source in subtitle_sources:
         best_caption_parse = urllib.parse.urlparse(
             source['url'].lstrip('/'))
-        transcript_url = (util.URL_ORIGIN
-                          + '/watch/transcript'
-                          + best_caption_parse.path
-                          + '?' + best_caption_parse.query)
+        transcript_url = f'{util.URL_ORIGIN}/watch/transcript{best_caption_parse.path}?{best_caption_parse.query}'
         other_downloads.append({
-            'label': 'Video Transcript: ' + source['label'],
+            'label': f'Video Transcript: {source["label"]}',
             'ext': 'txt',
             'url': transcript_url
         })
```
```diff
@@ -1263,7 +1221,7 @@ def get_watch_page(video_id=None):
     template_name = 'watch.html'
     return flask.render_template(template_name,
         header_playlist_names = local_playlist.get_playlist_names(),
-        uploader_channel_url = ('/' + info['author_url']) if info['author_url'] else '',
+        uploader_channel_url = f'/{info["author_url"]}' if info['author_url'] else '',
         time_published = info['time_published'],
         view_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("view_count", None)),
         like_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("like_count", None)),
```
```diff
@@ -1305,10 +1263,10 @@ def get_watch_page(video_id=None):
         ip_address = info['ip_address'] if settings.route_tor else None,
         invidious_used = info['invidious_used'],
         invidious_reload_button = info['invidious_reload_button'],
-        video_url = util.URL_ORIGIN + '/watch?v=' + video_id,
+        video_url = f'{util.URL_ORIGIN}/watch?v={video_id}',
         video_id = video_id,
-        storyboard_url = (util.URL_ORIGIN + '/ytl-api/storyboard.vtt?' +
-                          urlencode([('spec_url', info['storyboard_spec_url'])])
+        storyboard_url = (f'{util.URL_ORIGIN}/ytl-api/storyboard.vtt?'
+                          f'{urlencode([("spec_url", info["storyboard_spec_url"])])}'
                           if info['storyboard_spec_url'] else None),

     js_data = {
```
```diff
@@ -1335,7 +1293,7 @@ def get_watch_page(video_id=None):

 @yt_app.route('/api/<path:dummy>')
 def get_captions(dummy):
-    url = 'https://www.youtube.com' + request.full_path
+    url = f'https://www.youtube.com{request.full_path}'
     try:
         result = util.fetch_url(url, headers=util.mobile_ua)
         result = result.replace(b"align:start position:0%", b"")
```
```diff
@@ -1350,12 +1308,9 @@ inner_timestamp_removal_reg = re.compile(r'<[^>]+>')
 @yt_app.route('/watch/transcript/<path:caption_path>')
 def get_transcript(caption_path):
     try:
-        captions = util.fetch_url('https://www.youtube.com/'
-                                  + caption_path
-                                  + '?' + request.environ['QUERY_STRING']).decode('utf-8')
+        captions = util.fetch_url(f'https://www.youtube.com/{caption_path}?{request.environ["QUERY_STRING"]}').decode('utf-8')
     except util.FetchError as e:
-        msg = ('Error retrieving captions: ' + str(e) + '\n\n'
-               + 'The caption url may have expired.')
+        msg = f'Error retrieving captions: {e}\n\nThe caption url may have expired.'
         print(msg)
         return flask.Response(
             msg,
```
```diff
@@ -1403,7 +1358,7 @@ def get_transcript(caption_path):
     result = ''
     for seg in segments:
         if seg['text'] != ' ':
-            result += seg['begin'] + ' ' + seg['text'] + '\r\n'
+            result += f"{seg['begin']} {seg['text']}\r\n"

     return flask.Response(result.encode('utf-8'),
                           mimetype='text/plain;charset=UTF-8')
```
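The transcript loop skips whitespace-only segments and joins the rest with CRLF. With illustrative segment data:

```python
# Illustrative segment dicts in the shape the transcript loop consumes.
segments = [
    {'begin': '0:00', 'text': 'hello'},
    {'begin': '0:02', 'text': ' '},      # whitespace-only, skipped
    {'begin': '0:05', 'text': 'world'},
]

result = ''
for seg in segments:
    if seg['text'] != ' ':
        result += f"{seg['begin']} {seg['text']}\r\n"

assert result == '0:00 hello\r\n0:05 world\r\n'
```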
youtube/watch_formats.py (new file, 82 lines):

```python
"""Video format helpers for yt-local."""

import math
from typing import Any, Dict, Optional


def codec_name(vcodec: str) -> str:
    """Extract codec short name from codec string."""
    if vcodec.startswith('avc'):
        return 'h264'
    elif vcodec.startswith('av01'):
        return 'av1'
    elif vcodec.startswith('vp'):
        return 'vp'
    else:
        return 'unknown'


def video_quality_string(fmt: Dict[str, Any]) -> str:
    """Return video quality string (e.g., '1920x1080 30fps')."""
    if fmt.get('vcodec'):
        result = f"{fmt.get('width') or '?'}x{fmt.get('height') or '?'}"
        if fmt.get('fps'):
            result += f" {fmt['fps']}fps"
        return result
    elif fmt.get('acodec'):
        return 'audio only'
    return '?'


def short_video_quality_string(fmt: Dict[str, Any]) -> str:
    """Return short video quality string (e.g., '1080p60 AV1')."""
    result = f"{fmt.get('quality') or '?'}p"
    if fmt.get('fps'):
        result += str(fmt['fps'])
    vcodec = fmt.get('vcodec', '')
    if vcodec.startswith('av01'):
        result += ' AV1'
    elif vcodec.startswith('avc'):
        result += ' h264'
    else:
        result += f" {vcodec}"
    return result


def audio_quality_string(fmt: Dict[str, Any]) -> str:
    """Return audio quality string (e.g., '128k 44.1kHz')."""
    if fmt.get('acodec'):
        if fmt.get('audio_bitrate'):
            result = f"{fmt['audio_bitrate']}k"
        else:
            result = '?k'
        if fmt.get('audio_sample_rate'):
            result += f" {'%.3G' % (fmt['audio_sample_rate']/1000)}kHz"
        return result
    elif fmt.get('vcodec'):
        return 'video only'
    return '?'


def format_bytes(bytes_val: Optional[float]) -> str:
    """Convert bytes to human-readable string (e.g., '1.5 MiB')."""
    if bytes_val is None:
        return 'N/A'
    if type(bytes_val) is str:
        bytes_val = float(bytes_val)
    if bytes_val == 0.0:
        exponent = 0
    else:
        exponent = int(math.log(bytes_val, 1024.0))
    suffix = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'][exponent]
    converted = float(bytes_val) / float(1024 ** exponent)
    return '%.2f%s' % (converted, suffix)


__all__ = [
    'codec_name',
    'video_quality_string',
    'short_video_quality_string',
    'audio_quality_string',
    'format_bytes',
]
```
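The new module's helpers are pure functions over format dicts, so their output is easy to pin down. A behavior check of `short_video_quality_string` (copied from the new file; the format dicts are illustrative):

```python
from typing import Any, Dict

# Copy of short_video_quality_string from the new module, for a behavior check.
def short_video_quality_string(fmt: Dict[str, Any]) -> str:
    result = f"{fmt.get('quality') or '?'}p"
    if fmt.get('fps'):
        result += str(fmt['fps'])
    vcodec = fmt.get('vcodec', '')
    if vcodec.startswith('av01'):
        result += ' AV1'
    elif vcodec.startswith('avc'):
        result += ' h264'
    else:
        result += f" {vcodec}"
    return result

assert short_video_quality_string(
    {'quality': 1080, 'fps': 60, 'vcodec': 'av01.0.08M.08'}) == '1080p60 AV1'
assert short_video_quality_string(
    {'quality': 720, 'fps': 30, 'vcodec': 'avc1.4d401f'}) == '720p30 h264'
assert short_video_quality_string(
    {'quality': 480, 'vcodec': 'vp9'}) == '480p vp9'
```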
```diff
@@ -1,7 +1,8 @@
 from .common import (get, multi_get, deep_get, multi_deep_get,
     liberal_update, conservative_update, remove_redirect, normalize_url,
     extract_str, extract_formatted_text, extract_int, extract_approx_int,
-    extract_date, extract_item_info, extract_items, extract_response)
+    extract_date, extract_item_info, extract_items, extract_response,
+    concat_or_none)

 from .everything_else import (extract_channel_info, extract_search_info,
     extract_playlist_metadata, extract_playlist_info, extract_comments_info)
```
```diff
@@ -212,7 +212,7 @@ def extract_date(date_text):
         month, day, year = parts[-3:]
         month = MONTH_ABBREVIATIONS.get(month[0:3])  # slicing in case they start writing out the full month name
         if month and (re.fullmatch(r'\d\d?', day) is not None) and (re.fullmatch(r'\d{4}', year) is not None):
-            return year + '-' + month + '-' + day
+            return f'{year}-{month}-{day}'
     return None
```
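A self-contained sketch of the path through `extract_date` that the hunk touches. The `MONTH_ABBREVIATIONS` mapping and the length guard are assumptions added for the sketch (only what the example needs):

```python
import re

MONTH_ABBREVIATIONS = {'Jan': '01', 'Feb': '02', 'Mar': '03'}  # assumed subset

def extract_date(date_text):
    # Simplified sketch of the helper: last three tokens are month/day/year.
    parts = date_text.replace(',', '').split()
    if len(parts) < 3:  # guard added for this sketch
        return None
    month, day, year = parts[-3:]
    month = MONTH_ABBREVIATIONS.get(month[0:3])  # tolerates full month names
    if month and re.fullmatch(r'\d\d?', day) and re.fullmatch(r'\d{4}', year):
        return f'{year}-{month}-{day}'
    return None

assert extract_date('Premiered Mar 15, 2024') == '2024-03-15'
assert extract_date('January 20, 2024') == '2024-01-20'
assert extract_date('yesterday') is None
```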
```diff
@@ -222,7 +222,7 @@ def check_missing_keys(object, *key_sequences):
             for key in key_sequence:
                 _object = _object[key]
         except (KeyError, IndexError, TypeError):
-            return 'Could not find ' + key
+            return f'Could not find {key}'

     return None
```
```diff
@@ -467,7 +467,7 @@ def extract_item_info(item, additional_info={}):
         ['shortBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
         ['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId']
     ))
-    info['author_url'] = ('https://www.youtube.com/channel/' + info['author_id']) if info['author_id'] else None
+    info['author_url'] = f'https://www.youtube.com/channel/{info["author_id"]}' if info['author_id'] else None
     info['description'] = extract_formatted_text(multi_deep_get(
         item,
         ['descriptionText'], ['descriptionSnippet'],
```
```diff
@@ -305,7 +305,7 @@ def extract_playlist_metadata(polymer_json):
     metadata['description'] = desc

     if metadata['author_id']:
-        metadata['author_url'] = 'https://www.youtube.com/channel/' + metadata['author_id']
+        metadata['author_url'] = f'https://www.youtube.com/channel/{metadata["author_id"]}'

     if metadata['first_video_id'] is None:
         metadata['thumbnail'] = None
```
```diff
@@ -650,9 +650,9 @@ def _extract_playability_error(info, player_response, error_prefix=''):
     )

     if playability_status not in (None, 'OK'):
-        info['playability_error'] = error_prefix + playability_reason
+        info['playability_error'] = f'{error_prefix}{playability_reason}'
     elif not info['playability_error']:  # do not override
-        info['playability_error'] = error_prefix + 'Unknown playability error'
+        info['playability_error'] = f'{error_prefix}Unknown playability error'

 SUBTITLE_FORMATS = ('srv1', 'srv2', 'srv3', 'ttml', 'vtt')
 def extract_watch_info(polymer_json):
```
```diff
@@ -726,7 +726,7 @@ def extract_watch_info(polymer_json):
             # Store the full URL from the player response (includes valid tokens)
             if base_url:
                 normalized = normalize_url(base_url) if base_url.startswith('/') or not base_url.startswith('http') else base_url
-                info['_caption_track_urls'][lang_code + ('_asr' if caption_track.get('kind') == 'asr' else '')] = normalized
+                info['_caption_track_urls'][f'{lang_code}{"_asr" if caption_track.get("kind") == "asr" else ""}'] = normalized
             lang_name = deep_get(urllib.parse.parse_qs(urllib.parse.urlparse(base_url).query), 'name', 0)
             if lang_name:
                 info['_manual_caption_language_names'][lang_code] = lang_name
```
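Converting a conditional suffix to an f-string is a classic off-by-one-underscore trap: the separator has to stay inside the conditional, or every non-ASR track key grows a trailing `_` and stops matching the keys the rest of the code looks up. Compare:

```python
lang_code = 'en'
kind = None  # not an auto-generated (ASR) caption track

# Underscore outside the conditional: wrong key for non-ASR tracks.
buggy = f'{lang_code}_{"asr" if kind == "asr" else ""}'
# Underscore inside the conditional: matches the original concatenation.
fixed = f'{lang_code}{"_asr" if kind == "asr" else ""}'
original = lang_code + ('_asr' if kind == 'asr' else '')

assert buggy == 'en_'
assert fixed == 'en' == original
```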
```diff
@@ -806,7 +806,7 @@ def extract_watch_info(polymer_json):
         info['allowed_countries'] = mf.get('availableCountries', [])

     # other stuff
-    info['author_url'] = 'https://www.youtube.com/channel/' + info['author_id'] if info['author_id'] else None
+    info['author_url'] = f'https://www.youtube.com/channel/{info["author_id"]}' if info['author_id'] else None
     info['storyboard_spec_url'] = deep_get(player_response, 'storyboards', 'playerStoryboardSpecRenderer', 'spec')

     return info
```
```diff
@@ -912,12 +912,12 @@ def get_caption_url(info, language, format, automatic=False, translation_languag
     url = info['_captions_base_url']
     if not url:
         return None
-    url += '&lang=' + language
-    url += '&fmt=' + format
+    url += f'&lang={language}'
+    url += f'&fmt={format}'
     if automatic:
         url += '&kind=asr'
     elif language in info['_manual_caption_language_names']:
-        url += '&name=' + urllib.parse.quote(info['_manual_caption_language_names'][language], safe='')
+        url += f'&name={urllib.parse.quote(info["_manual_caption_language_names"][language], safe="")}'

     if translation_language:
         url += '&tlang=' + translation_language
```
```diff
@@ -964,7 +964,7 @@ def extract_decryption_function(info, base_js):
         return 'Could not find var_name'

     var_name = var_with_operation_match.group(1)
-    var_body_match = re.search(r'var ' + re.escape(var_name) + r'=\{(.*?)\};', base_js, flags=re.DOTALL)
+    var_body_match = re.search(rf'var {re.escape(var_name)}=\{{(.*?)\}};', base_js, flags=re.DOTALL)
     if var_body_match is None:
         return 'Could not find var_body'
```
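When a regex with literal braces moves into an rf-string, the braces must be doubled (`\{{` / `\}}`) so Python's formatter passes single braces through to the regex engine. A minimal check with a hypothetical obfuscated variable name (including a regex metacharacter that `re.escape` must handle):

```python
import re

var_name = 'Qx$'  # hypothetical obfuscated name, not from a real base.js
base_js = 'var Qx$={reverse:function(a){a.reverse()}};'

# {{ and }} are f-string escapes for literal { and }; re.escape handles the $.
pattern = rf'var {re.escape(var_name)}=\{{(.*?)\}};'
match = re.search(pattern, base_js, flags=re.DOTALL)

assert match is not None
assert match.group(1).startswith('reverse:function')
```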
```diff
@@ -988,7 +988,7 @@ def extract_decryption_function(info, base_js):
         elif op_body.startswith('var c=a[0]'):
             operation_definitions[op_name] = 2
         else:
-            return 'Unknown op_body: ' + op_body
+            return f'Unknown op_body: {op_body}'

     decryption_function = []
     for op_with_arg in function_body:
```
```diff
@@ -997,7 +997,7 @@ def extract_decryption_function(info, base_js):
             return 'Could not parse operation with arg'
         op_name = match.group(2).strip('[].')
         if op_name not in operation_definitions:
-            return 'Unknown op_name: ' + str(op_name)
+            return f'Unknown op_name: {op_name}'
         op_argument = match.group(3)
         decryption_function.append([operation_definitions[op_name], int(op_argument)])
```
```diff
@@ -1028,5 +1028,5 @@ def decrypt_signatures(info):
             _operation_2(a, argument)

         signature = ''.join(a)
-        format['url'] += '&' + format['sp'] + '=' + signature
+        format['url'] += f'&{format["sp"]}={signature}'
     return False
```
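The decryption function is just a list of `[op, argument]` pairs applied in order to the signature's characters. The three primitives such player scripts typically use can be sketched as plain list operations; the function bodies and the order applied below are illustrative, not taken from any real `base.js`:

```python
# Illustrative versions of the three scrambling primitives commonly seen
# in player scripts: reverse, splice (drop the first n), swap-with-first.
def _reverse(a, _argument):
    a.reverse()

def _splice(a, n):
    del a[:n]

def _swap(a, n):
    c = a[0]
    a[0] = a[n % len(a)]
    a[n % len(a)] = c

a = list('abcdefgh')
for op, arg in [(_swap, 3), (_splice, 2), (_reverse, 0)]:
    op(a, arg)
signature = ''.join(a)
assert signature == 'hgfeac'
```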