2 Commits

Author SHA1 Message Date
50ad959a80 refactor: replace string concatenations with f-strings
All checks were successful
CI / test (push) Successful in 50s
2026-04-25 01:02:17 -05:00
a0f315be51 feature/hls: Add HLS playback support and refactor documentation for better usability and maintainability. (#1)
All checks were successful
git-sync-with-mirror / git-sync (push) Successful in 32s
CI / test (push) Successful in 46s
## Overview
This PR introduces HLS playback support, improves the player experience, and refactors documentation for better usability and maintainability.

## Key Features

### HLS Playback Support
- Add HLS integration via new JavaScript assets:
  - `hls.min.js`
  - `plyr.hls.start.js`
  - `watch.hls.js`
- Separate DASH and HLS logic:
  - `plyr-start.js` → `plyr.dash.start.js`
  - `watch.js` → `watch.dash.js`
- Update templates (`embed.html`, `watch.html`) for conditional player loading

### Native Storyboard Preview
- Add `native_player_storyboard` setting in `settings.py`
- Implement hover thumbnail preview for native player modes
- Add `storyboard-preview.js`

### UI and Player Adjustments
- Update templates and styles (`custom_plyr.css`)
- Modify backend modules to support new player modes:
  - `watch.py`, `channel.py`, `util.py`, and related components

### Internationalization
- Update translation files:
  - `messages.po`
  - `messages.pot`

### Testing and CI
- Add and update tests:
  - `test_shorts.py`
  - `test_util.py`
- Minor CI and release script improvements

## Documentation

### OpenRC Service Guide Rewrite
- Restructure `docs/basic-script-openrc/README.md` into:
  - Prerequisites
  - Installation
  - Service Management
  - Verification
  - Troubleshooting
- Add admonition blocks:
  - `[!NOTE]`, `[!TIP]`, `[!IMPORTANT]`, `[!WARNING]`, `[!CAUTION]`
- Fix log inspection command:
  ```bash
  doas tail -f /var/log/ytlocal.log
  ```

- Add path placeholders and clarify permission requirements
- Remove legacy and duplicate content

Reviewed-on: #1
Co-authored-by: Astounds <kirito@disroot.org>
Co-committed-by: Astounds <kirito@disroot.org>
2026-04-20 01:22:55 -04:00
26 changed files with 459 additions and 317 deletions

View File

@@ -1,76 +1,108 @@

````markdown
# Basic init yt-local for openrc

## Prerequisites

- System with OpenRC installed and configured.
- Administrative privileges (doas or sudo).
- `ytlocal` script located at `/usr/sbin/ytlocal` and application files in an accessible directory.

## Service Installation

1. **Create the OpenRC service script** `/etc/init.d/ytlocal`:

   ```sh
   #!/sbin/openrc-run
   # Distributed under the terms of the GNU General Public License v3 or later

   name="yt-local"
   pidfile="/var/run/ytlocal.pid"
   command="/usr/sbin/ytlocal"

   depend() {
       use net
   }

   start_pre() {
       if [ ! -f /usr/sbin/ytlocal ]; then
           eerror "Please create script file of ytlocal in '/usr/sbin/ytlocal'"
           return 1
       else
           return 0
       fi
   }

   start() {
       ebegin "Starting yt-local"
       start-stop-daemon --start --exec "${command}" --pidfile "${pidfile}"
       eend $?
   }

   reload() {
       ebegin "Reloading ${name}"
       start-stop-daemon --signal HUP --pidfile "${pidfile}"
       eend $?
   }

   stop() {
       ebegin "Stopping ${name}"
       start-stop-daemon --quiet --stop --exec "${command}" --pidfile "${pidfile}"
       eend $?
   }
   ```

   > [!NOTE]
   > Ensure the script is executable:
   >
   > ```sh
   > doas chmod a+x /etc/init.d/ytlocal
   > ```

2. **Create the executable script** `/usr/sbin/ytlocal`:

   ```bash
   #!/usr/bin/env bash
   # Change the working directory according to your installation path
   # Example: if installed in /usr/local/ytlocal, use:
   cd /home/your-path/ytlocal/ # <-- MODIFY TO YOUR PATH
   source venv/bin/activate
   python server.py > /dev/null 2>&1 &
   echo $! > /var/run/ytlocal.pid
   ```

   > [!WARNING]
   > Run this script only as root or via `doas`, as it writes to `/var/run` and uses network privileges.

   > [!TIP]
   > To store the PID in a different location, adjust the `pidfile` variable in the service script.

   > [!IMPORTANT]
   > Verify that the virtual environment (`venv`) is correctly set up and that `python` points to the appropriate version.

   > [!CAUTION]
   > Do not stop the process manually; use OpenRC commands (`rc-service ytlocal stop`) to avoid race conditions.

   > [!NOTE]
   > When run with administrative privileges, the configuration is saved in `/root/.yt-local`, which is root-only.

## Service Management

- **Status**: `doas rc-service ytlocal status`
- **Start**: `doas rc-service ytlocal start`
- **Restart**: `doas rc-service ytlocal restart`
- **Stop**: `doas rc-service ytlocal stop`
- **Enable at boot**: `doas rc-update add ytlocal default`
- **Disable**: `doas rc-update del ytlocal`

## Post-Installation Verification

- Confirm the process is running: `doas rc-service ytlocal status`
- Inspect logs for issues: `doas tail -f /var/log/ytlocal.log` (if logging is configured).

## Troubleshooting Common Issues

- **Service fails to start**: verify script permissions, the correct `command=` path, and that the virtualenv exists.
- **Port conflict**: adjust the server's port configuration before launching.
- **Import errors**: ensure all dependencies are installed in the virtual environment.

> [!IMPORTANT]
> Keep the service script updated when modifying startup logic or adding new dependencies.
````

View File

```diff
@@ -33,7 +33,7 @@ def check_subp(x):
         raise Exception('Got nonzero exit code from command')

 def log(line):
-    print('[generate_release.py] ' + line)
+    print(f'[generate_release.py] {line}')

 # https://stackoverflow.com/questions/7833715/python-deleting-certain-file-extensions
 def remove_files_with_extensions(path, extensions):
@@ -43,23 +43,23 @@ def remove_files_with_extensions(path, extensions):
             os.remove(os.path.join(root, file))

 def download_if_not_exists(file_name, url, sha256=None):
-    if not os.path.exists('./' + file_name):
+    if not os.path.exists(f'./{file_name}'):
         # Reject non-https URLs so a mistaken constant cannot cause a
         # plaintext download (bandit B310 hardening).
         if not url.startswith('https://'):
-            raise Exception('Refusing to download over non-https URL: ' + url)
+            raise Exception(f'Refusing to download over non-https URL: {url}')
-        log('Downloading ' + file_name + '..')
+        log(f'Downloading {file_name}..')
         data = urllib.request.urlopen(url).read()
-        log('Finished downloading ' + file_name)
+        log(f'Finished downloading {file_name}')
-        with open('./' + file_name, 'wb') as f:
+        with open(f'./{file_name}', 'wb') as f:
             f.write(data)
         if sha256:
             digest = hashlib.sha256(data).hexdigest()
             if digest != sha256:
-                log('Error: ' + file_name + ' has wrong hash: ' + digest)
+                log(f'Error: {file_name} has wrong hash: {digest}')
                 sys.exit(1)
     else:
-        log('Using existing ' + file_name)
+        log(f'Using existing {file_name}')

 def wine_run_shell(command):
     # Keep argv-style invocation (no shell) to avoid command injection.
@@ -120,7 +120,7 @@ if len(os.listdir('./yt-local')) == 0:
 # ----------- Generate embedded python distribution -----------
 os.environ['PYTHONDONTWRITEBYTECODE'] = '1'  # *.pyc files double the size of the distribution
 get_pip_url = 'https://bootstrap.pypa.io/get-pip.py'
-latest_dist_url = 'https://www.python.org/ftp/python/' + latest_version + '/python-' + latest_version
+latest_dist_url = f'https://www.python.org/ftp/python/{latest_version}/python-{latest_version}'
 if bitness == '32':
     latest_dist_url += '-embed-win32.zip'
 else:
@@ -142,7 +142,7 @@ else:
 download_if_not_exists('get-pip.py', get_pip_url)
-python_dist_name = 'python-dist-' + latest_version + '-' + bitness + '.zip'
+python_dist_name = f'python-dist-{latest_version}-{bitness}.zip'
 download_if_not_exists(python_dist_name, latest_dist_url)
 download_if_not_exists(visual_c_name,
@@ -203,7 +203,7 @@ and replaced with a .pth. Isolated mode will have to be specified manually.
 log('Removing ._pth')
 major_release = latest_version.split('.')[1]
-os.remove(r'./python/python3' + major_release + '._pth')
+os.remove(rf'./python/python3{major_release}._pth')

 log('Adding path_fixes.pth')
 with open(r'./python/path_fixes.pth', 'w', encoding='utf-8') as f:
@@ -214,7 +214,7 @@ with open(r'./python/path_fixes.pth', 'w', encoding='utf-8') as f:
 # Need to add the directory where packages are installed,
 # and the parent directory (which is where the yt-local files are)
 major_release = latest_version.split('.')[1]
-with open('./python/python3' + major_release + '._pth', 'a', encoding='utf-8') as f:
+with open(rf'./python/python3{major_release}._pth', 'a', encoding='utf-8') as f:
     f.write('.\\Lib\\site-packages\n')
     f.write('..\n')'''
@@ -255,10 +255,10 @@ log('Copying python distribution into release folder')
 shutil.copytree(r'./python', r'./yt-local/python')

 # ----------- Create release zip -----------
-output_filename = 'yt-local-' + release_tag + '-' + suffix + '.zip'
+output_filename = f'yt-local-{release_tag}-{suffix}.zip'
-if os.path.exists('./' + output_filename):
+if os.path.exists(f'./{output_filename}'):
     log('Removing previous zipped release')
-    os.remove('./' + output_filename)
+    os.remove(f'./{output_filename}')
 log('Zipping release')
 check_subp(subprocess.run(['7z', '-mx=9', 'a', output_filename, './yt-local']))
```
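The hardened download helper above combines two checks: an https-only guard and an optional SHA-256 digest comparison. A condensed, self-contained sketch of that validation logic (the `verify_download` name is hypothetical; the real helper also performs the download itself):

```python
import hashlib

def verify_download(file_name, url, data, sha256=None):
    """Sketch of the release script's two safety checks."""
    # Reject non-https URLs so a mistaken constant cannot cause a
    # plaintext download (mirrors the bandit B310 hardening above).
    if not url.startswith('https://'):
        raise ValueError(f'Refusing to download over non-https URL: {url}')
    # Compare the payload against a pinned SHA-256 digest, if one is given.
    if sha256 is not None:
        digest = hashlib.sha256(data).hexdigest()
        if digest != sha256:
            raise ValueError(f'{file_name} has wrong hash: {digest}')
    return True
```

Pinning a digest next to the URL means a compromised or moved file fails loudly at build time instead of silently entering the release.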

View File

```diff
@@ -32,9 +32,9 @@ def youtu_be(env, start_response):
     id = env['PATH_INFO'][1:]
     env['PATH_INFO'] = '/watch'
     if not env['QUERY_STRING']:
-        env['QUERY_STRING'] = 'v=' + id
+        env['QUERY_STRING'] = f'v={id}'
     else:
-        env['QUERY_STRING'] += '&v=' + id
+        env['QUERY_STRING'] += f'&v={id}'

     yield from yt_app(env, start_response)
@@ -64,12 +64,12 @@ def proxy_site(env, start_response, video=False):
     if 'HTTP_RANGE' in env:
         send_headers['Range'] = env['HTTP_RANGE']

-    url = "https://" + env['SERVER_NAME'] + env['PATH_INFO']
+    url = f"https://{env['SERVER_NAME']}{env['PATH_INFO']}"
     # remove /name portion
     if video and '/videoplayback/name/' in url:
         url = url[0:url.rfind('/name/')]
     if env['QUERY_STRING']:
-        url += '?' + env['QUERY_STRING']
+        url += f'?{env["QUERY_STRING"]}'

     try_num = 1
     first_attempt = True
@@ -96,7 +96,7 @@ def proxy_site(env, start_response, video=False):
                 + [('Access-Control-Allow-Origin', '*')])
             if first_attempt:
-                start_response(str(response.status) + ' ' + response.reason,
+                start_response(f"{response.status} {response.reason}",
                                response_headers)

             content_length = int(dict(response_headers).get('Content-Length', 0))
@@ -136,9 +136,8 @@ def proxy_site(env, start_response, video=False):
                     fail_byte = start + total_received
                     send_headers['Range'] = 'bytes=%d-%d' % (fail_byte, end)
                     print(
-                        'Warning: YouTube closed the connection before byte',
-                        str(fail_byte) + '.', 'Expected', start+content_length,
-                        'bytes.'
+                        f'Warning: YouTube closed the connection before byte {fail_byte}. '
+                        f'Expected {start+content_length} bytes.'
                     )
                     retry = True
@@ -185,7 +184,7 @@ def split_url(url):
     # python STILL doesn't have a proper regular expression engine like grep uses built in...
     match = re.match(r'(?:https?://)?([\w-]+(?:\.[\w-]+)+?)(/.*|$)', url)
     if match is None:
-        raise ValueError('Invalid or unsupported url: ' + url)
+        raise ValueError(f'Invalid or unsupported url: {url}')

     return match.group(1), match.group(2)
@@ -238,7 +237,7 @@ def site_dispatch(env, start_response):
         if base_name == '':
             base_name = domain
         else:
-            base_name = domain + '.' + base_name
+            base_name = f"{domain}.{base_name}"

         try:
             handler = site_handlers[base_name]
```
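`split_url` from the hunk above is small enough to reproduce in runnable form; the regex captures the domain in group 1 and the path (possibly empty) in group 2:

```python
import re

def split_url(url):
    """Split a URL into (domain, path), as in the proxy code above."""
    match = re.match(r'(?:https?://)?([\w-]+(?:\.[\w-]+)+?)(/.*|$)', url)
    if match is None:
        raise ValueError(f'Invalid or unsupported url: {url}')
    return match.group(1), match.group(2)
```

The non-greedy domain group plus the `(/.*|$)` alternative means the scheme is optional, a bare domain yields an empty path, and anything without at least one dot is rejected.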

View File

```diff
@@ -261,6 +261,17 @@ For security reasons, enabling this is not recommended.''',
         'category': 'interface',
     }),

+    ('native_player_storyboard', {
+        'type': bool,
+        'default': False,
+        'label': 'Storyboard preview (native)',
+        'comment': '''Show thumbnail preview on hover (native player modes).
+Positioning is heuristic; may misalign in Firefox/Safari.
+Works best on Chromium browsers.
+No effect in Plyr.''',
+        'category': 'interface',
+    }),
+
     ('use_video_download', {
         'type': int,
         'default': 0,
@@ -386,14 +397,14 @@ acceptable_targets = SETTINGS_INFO.keys() | {
 def comment_string(comment):
     result = ''
     for line in comment.splitlines():
-        result += '# ' + line + '\n'
+        result += f'# {line}\n'
     return result

 def save_settings(settings_dict):
     with open(settings_file_path, 'w', encoding='utf-8') as file:
         for setting_name, setting_info in SETTINGS_INFO.items():
-            file.write(comment_string(setting_info['comment']) + setting_name + ' = ' + repr(settings_dict[setting_name]) + '\n\n')
+            file.write(f"{comment_string(setting_info['comment'])}{setting_name} = {repr(settings_dict[setting_name])}\n\n")

 def add_missing_settings(settings_dict):
@@ -470,7 +481,7 @@ upgrade_functions = {
 def log_ignored_line(line_number, message):
-    print('WARNING: Ignoring settings.txt line ' + str(line_number) + ' (' + message + ')')
+    print(f'WARNING: Ignoring settings.txt line {line_number} ({message})')

 if os.path.isfile("settings.txt"):
@@ -524,7 +535,7 @@ else:
             continue
         if target.id not in acceptable_targets:
-            log_ignored_line(node.lineno, target.id + " is not a valid setting")
+            log_ignored_line(node.lineno, f"{target.id} is not a valid setting")
             continue

         if type(node.value) not in attributes:
@@ -634,6 +645,6 @@ def settings_page():
         for func, old_value, value in to_call:
             func(old_value, value)

-        return flask.redirect(util.URL_ORIGIN + '/settings', 303)
+        return flask.redirect(f'{util.URL_ORIGIN}/settings', 303)
     else:
         flask.abort(400)
```
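`comment_string` above drives how `save_settings` serializes each entry. A runnable sketch of the resulting settings.txt format (`render_setting` is a hypothetical helper condensing the `file.write` call):

```python
def comment_string(comment):
    """Prefix each line of a setting's comment with '# '."""
    result = ''
    for line in comment.splitlines():
        result += f'# {line}\n'
    return result

def render_setting(name, value, comment):
    # One settings.txt entry: commented description, then `name = repr(value)`.
    return f'{comment_string(comment)}{name} = {value!r}\n\n'
```

Because values are written with `repr` and read back via the `ast` parsing shown below, round-tripping preserves types (`bool`, `int`, `str`) without a custom format.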

View File

```diff
@@ -27,7 +27,7 @@ class TestChannelCtokenV5:
     def _decode_outer(self, ctoken):
         """Decode the outer protobuf layer of a ctoken."""
-        raw = base64.urlsafe_b64decode(ctoken + '==')
+        raw = base64.urlsafe_b64decode(f'{ctoken}==')
         return {fn: val for _, fn, val in proto.read_protobuf(raw)}

     def test_shorts_token_generates_without_error(self):
@@ -68,8 +68,8 @@ class TestChannelCtokenV5:
         assert t_with_shorts != t_without_shorts

         # Decode and verify the filter is present
-        raw_with_shorts = base64.urlsafe_b64decode(t_with_shorts + '==')
-        raw_without_shorts = base64.urlsafe_b64decode(t_without_shorts + '==')
+        raw_with_shorts = base64.urlsafe_b64decode(f'{t_with_shorts}==')
+        raw_without_shorts = base64.urlsafe_b64decode(f'{t_without_shorts}==')

         # Parse the outer protobuf structure
         import youtube.proto as proto
@@ -95,8 +95,8 @@ class TestChannelCtokenV5:
         decoded_without = urllib.parse.unquote(encoded_inner_without.decode('ascii'))

         # Decode the base64 data
-        decoded_with_bytes = base64.urlsafe_b64decode(decoded_with + '==')
-        decoded_without_bytes = base64.urlsafe_b64decode(decoded_without + '==')
+        decoded_with_bytes = base64.urlsafe_b64decode(f'{decoded_with}==')
+        decoded_without_bytes = base64.urlsafe_b64decode(f'{decoded_without}==')

         # Parse the decoded protobuf data
         fields_with = list(proto.read_protobuf(decoded_with_bytes))
```
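The tests lean on the `+ '=='` idiom: `base64.urlsafe_b64decode` rejects under-padded input, and appending two `=` guarantees at least the required padding, while the default non-strict decoder ignores any surplus padding. A minimal sketch (hypothetical function name):

```python
import base64

def decode_ctoken_bytes(ctoken):
    """Decode a urlsafe-base64 ctoken whose length is not known in advance.

    Appending '==' covers every possible padding requirement (0, 1, or 2
    characters); binascii's default non-strict mode ignores the excess.
    """
    return base64.urlsafe_b64decode(f'{ctoken}==')
```

This avoids computing `'=' * (-len(ctoken) % 4)` explicitly, at the cost of relying on lenient decoding (strict mode, available since Python 3.11, would reject the extra padding).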

View File

```diff
@@ -76,7 +76,7 @@ theme_names = {
 @yt_app.context_processor
 def inject_theme_preference():
     return {
-        'theme_path': '/youtube.com/static/' + theme_names[settings.theme] + '.css',
+        'theme_path': f'/youtube.com/static/{theme_names[settings.theme]}.css',
         'settings': settings,
         # Detect version
         'current_version': app_version()['version'],
@@ -145,9 +145,9 @@ def error_page(e):
                 ' exit node is overutilized. Try getting a new exit node by'
                 ' using the New Identity button in the Tor Browser.')
             if fetch_err.error_message:
-                error_message += '\n\n' + fetch_err.error_message
+                error_message += f'\n\n{fetch_err.error_message}'
             if fetch_err.ip:
-                error_message += '\n\nExit node IP address: ' + fetch_err.ip
+                error_message += f'\n\nExit node IP address: {fetch_err.ip}'
             return flask.render_template('error.html', error_message=error_message, slim=slim), 502
         elif error_code == '429':
@@ -157,7 +157,7 @@ def error_page(e):
                 '• Enable Tor routing in Settings for automatic IP rotation\n'
                 '• Use a VPN to change your IP address')
             if fetch_err.ip:
-                error_message += '\n\nYour IP: ' + fetch_err.ip
+                error_message += f'\n\nYour IP: {fetch_err.ip}'
             return flask.render_template('error.html', error_message=error_message, slim=slim), 429
         elif error_code == '502' and ('Failed to resolve' in str(fetch_err) or 'Failed to establish' in str(fetch_err)):
@@ -179,7 +179,7 @@ def error_page(e):
         # Catch-all for any other FetchError (400, etc.)
         error_message = f'Error communicating with YouTube ({error_code}).'
         if fetch_err.error_message:
-            error_message += '\n\n' + fetch_err.error_message
+            error_message += f'\n\n{fetch_err.error_message}'
         return flask.render_template('error.html', error_message=error_message, slim=slim), 502

     return flask.render_template('error.html', traceback=traceback.format_exc(),
```

View File

```diff
@@ -253,7 +253,7 @@ def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1,
     # For now it seems to be constant for the API endpoint, not dependent
     # on the browsing session or channel
     key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
-    url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
+    url = f'https://www.youtube.com/youtubei/v1/browse?key={key}'

     data = {
         'context': {
@@ -285,8 +285,8 @@ def get_number_of_videos_channel(channel_id):
         return 1000

     # Uploads playlist
-    playlist_id = 'UU' + channel_id[2:]
-    url = 'https://m.youtube.com/playlist?list=' + playlist_id + '&pbj=1'
+    playlist_id = f'UU{channel_id[2:]}'
+    url = f'https://m.youtube.com/playlist?list={playlist_id}&pbj=1'

     try:
         response = util.fetch_url(url, headers_mobile,
@@ -328,7 +328,7 @@ def get_channel_id(base_url):
     # method that gives the smallest possible response at ~4 kb
     # needs to be as fast as possible
     base_url = base_url.replace('https://www', 'https://m')  # avoid redirect
-    response = util.fetch_url(base_url + '/about?pbj=1', headers_mobile,
+    response = util.fetch_url(f'{base_url}/about?pbj=1', headers_mobile,
                               debug_name='get_channel_id', report_text='Got channel id').decode('utf-8')
     match = channel_id_re.search(response)
     if match:
@@ -372,7 +372,7 @@ def get_channel_search_json(channel_id, query, page):
     ctoken = base64.urlsafe_b64encode(proto.nested(80226972, ctoken)).decode('ascii')
     key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
-    url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
+    url = f'https://www.youtube.com/youtubei/v1/browse?key={key}'

     data = {
         'context': {
@@ -414,18 +414,18 @@ def post_process_channel_info(info):

 def get_channel_first_page(base_url=None, tab='videos', channel_id=None, sort=None):
     if channel_id:
-        base_url = 'https://www.youtube.com/channel/' + channel_id
+        base_url = f'https://www.youtube.com/channel/{channel_id}'

     # Build URL with sort parameter
     # YouTube URL sort params: p=popular, dd=newest, lad=newest no shorts
     # Note: 'da' (oldest) was removed by YouTube in January 2026
-    url = base_url + '/' + tab + '?pbj=1&view=0'
+    url = f'{base_url}/{tab}?pbj=1&view=0'
     if sort:
         # Map sort values to YouTube's URL parameter values
         sort_map = {'3': 'dd', '4': 'lad'}
-        url += '&sort=' + sort_map.get(sort, 'dd')
+        url += f'&sort={sort_map.get(sort, "dd")}'

-    return util.fetch_url(url, headers_desktop, debug_name='gen_channel_' + tab)
+    return util.fetch_url(url, headers_desktop, debug_name=f'gen_channel_{tab}')

 playlist_sort_codes = {'2': "da", '3': "dd", '4': "lad"}
@@ -462,7 +462,7 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
     if page_number == 1:
         tasks = (
             gevent.spawn(playlist.playlist_first_page,
-                         'UU' + channel_id[2:],
+                         f'UU{channel_id[2:]}',
                          report_text='Retrieved channel videos'),
             gevent.spawn(get_metadata, channel_id),
         )
@@ -477,11 +477,11 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
         set_cached_number_of_videos(channel_id, number_of_videos)
     else:
         tasks = (
-            gevent.spawn(playlist.get_videos, 'UU' + channel_id[2:],
+            gevent.spawn(playlist.get_videos, f'UU{channel_id[2:]}',
                          page_number, include_shorts=True),
             gevent.spawn(get_metadata, channel_id),
             gevent.spawn(get_number_of_videos_channel, channel_id),
-            gevent.spawn(playlist.playlist_first_page, 'UU' + channel_id[2:],
+            gevent.spawn(playlist.playlist_first_page, f'UU{channel_id[2:]}',
                          report_text='Retrieved channel video count'),
         )
         gevent.joinall(tasks)
@@ -567,10 +567,10 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
     elif tab == 'search' and channel_id:
         polymer_json = get_channel_search_json(channel_id, query, page_number)
     elif tab == 'search':
-        url = base_url + '/search?pbj=1&query=' + urllib.parse.quote(query, safe='')
+        url = f'{base_url}/search?pbj=1&query={urllib.parse.quote(query, safe="")}'
         polymer_json = util.fetch_url(url, headers_desktop, debug_name='gen_channel_search')
     elif tab != 'videos':
-        flask.abort(404, 'Unknown channel tab: ' + tab)
+        flask.abort(404, f'Unknown channel tab: {tab}')

     if polymer_json is not None and info is None:
         info = yt_data_extract.extract_channel_info(
@@ -583,7 +583,7 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
         return flask.render_template('error.html', error_message=info['error'])

     if channel_id:
-        info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id
+        info['channel_url'] = f'https://www.youtube.com/channel/{channel_id}'
         info['channel_id'] = channel_id
     else:
         channel_id = info['channel_id']
@@ -663,22 +663,22 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
 @yt_app.route('/channel/<channel_id>/')
 @yt_app.route('/channel/<channel_id>/<tab>')
 def get_channel_page(channel_id, tab='videos'):
-    return get_channel_page_general_url('https://www.youtube.com/channel/' + channel_id, tab, request, channel_id)
+    return get_channel_page_general_url(f'https://www.youtube.com/channel/{channel_id}', tab, request, channel_id)

 @yt_app.route('/user/<username>/')
 @yt_app.route('/user/<username>/<tab>')
 def get_user_page(username, tab='videos'):
-    return get_channel_page_general_url('https://www.youtube.com/user/' + username, tab, request)
+    return get_channel_page_general_url(f'https://www.youtube.com/user/{username}', tab, request)

 @yt_app.route('/c/<custom>/')
 @yt_app.route('/c/<custom>/<tab>')
 def get_custom_c_page(custom, tab='videos'):
-    return get_channel_page_general_url('https://www.youtube.com/c/' + custom, tab, request)
+    return get_channel_page_general_url(f'https://www.youtube.com/c/{custom}', tab, request)

 @yt_app.route('/<custom>')
 @yt_app.route('/<custom>/<tab>')
 def get_toplevel_custom_page(custom, tab='videos'):
-    return get_channel_page_general_url('https://www.youtube.com/' + custom, tab, request)
+    return get_channel_page_general_url(f'https://www.youtube.com/{custom}', tab, request)
```
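The URL construction in `get_channel_first_page` above reduces to a few f-strings plus the sort map. A simplified sketch (`build_channel_tab_url` is a hypothetical name; the real function also resolves `channel_id` and fetches the page):

```python
def build_channel_tab_url(base_url, tab, sort=None):
    """Build a channel tab URL, mirroring get_channel_first_page above."""
    url = f'{base_url}/{tab}?pbj=1&view=0'
    if sort:
        # YouTube URL sort params: dd=newest, lad=newest without shorts;
        # unknown values fall back to 'dd'.
        sort_map = {'3': 'dd', '4': 'lad'}
        url += f'&sort={sort_map.get(sort, "dd")}'
    return url
```

Keeping the internal sort codes (`'3'`, `'4'`) separate from YouTube's URL parameter values makes it cheap to adjust when YouTube drops a value, as happened with `'da'` (oldest).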

View File

@@ -104,20 +104,19 @@ def post_process_comments_info(comments_info):
             comment['replies_url'] = None
         comment['replies_url'] = concat_or_none(
             util.URL_ORIGIN,
-            '/comments?replies=1&ctoken=' + ctoken)
+            f'/comments?replies=1&ctoken={ctoken}')
         if reply_count == 0:
             comment['view_replies_text'] = 'Reply'
         elif reply_count == 1:
             comment['view_replies_text'] = '1 reply'
         else:
-            comment['view_replies_text'] = str(reply_count) + ' replies'
+            comment['view_replies_text'] = f'{reply_count} replies'
         if comment['approx_like_count'] == '1':
             comment['likes_text'] = '1 like'
         else:
-            comment['likes_text'] = (str(comment['approx_like_count'])
-                                     + ' likes')
+            comment['likes_text'] = f"{comment['approx_like_count']} likes"
     comments_info['include_avatars'] = settings.enable_comment_avatars
     if comments_info['ctoken']:
@@ -163,14 +162,13 @@ def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''):
     comments_info = {'error': None}
     try:
         other_sort_url = (
-            util.URL_ORIGIN + '/comments?ctoken='
-            + make_comment_ctoken(video_id, sort=1 - sort, lc=lc)
+            f"{util.URL_ORIGIN}/comments?ctoken="
+            f"{make_comment_ctoken(video_id, sort=1 - sort, lc=lc)}"
         )
-        other_sort_text = 'Sort by ' + ('newest' if sort == 0 else 'top')
+        other_sort_text = f'Sort by {"newest" if sort == 0 else "top"}'
-        this_sort_url = (util.URL_ORIGIN
-                         + '/comments?ctoken='
-                         + make_comment_ctoken(video_id, sort=sort, lc=lc))
+        this_sort_url = (f"{util.URL_ORIGIN}/comments?ctoken="
+                         f"{make_comment_ctoken(video_id, sort=sort, lc=lc)}")
         comments_info['comment_links'] = [
             (other_sort_text, other_sort_url),
@@ -188,17 +186,16 @@ def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''):
         if e.code == '429' and settings.route_tor:
             comments_info['error'] = 'Error: YouTube blocked the request because the Tor exit node is overutilized.'
             if e.error_message:
-                comments_info['error'] += '\n\n' + e.error_message
-            comments_info['error'] += '\n\nExit node IP address: %s' % e.ip
+                comments_info['error'] += f'\n\n{e.error_message}'
+            comments_info['error'] += f'\n\nExit node IP address: {e.ip}'
         else:
-            comments_info['error'] = 'YouTube blocked the request. Error: %s' % str(e)
+            comments_info['error'] = f'YouTube blocked the request. Error: {e}'
     except Exception as e:
-        comments_info['error'] = 'YouTube blocked the request. Error: %s' % str(e)
+        comments_info['error'] = f'YouTube blocked the request. Error: {e}'
     if comments_info.get('error'):
-        print('Error retrieving comments for ' + str(video_id) + ':\n' +
-              comments_info['error'])
+        print(f'Error retrieving comments for {video_id}:\n{comments_info["error"]}')
     return comments_info
@@ -218,12 +215,10 @@ def get_comments_page():
         other_sort_url = None
     else:
         other_sort_url = (
-            util.URL_ORIGIN
-            + '/comments?ctoken='
-            + make_comment_ctoken(comments_info['video_id'],
-                                  sort=1-comments_info['sort'])
+            f'{util.URL_ORIGIN}/comments?ctoken='
+            f'{make_comment_ctoken(comments_info["video_id"], sort=1-comments_info["sort"])}'
         )
-    other_sort_text = 'Sort by ' + ('newest' if comments_info['sort'] == 0 else 'top')
+    other_sort_text = f'Sort by {"newest" if comments_info["sort"] == 0 else "top"}'
     comments_info['comment_links'] = [(other_sort_text, other_sort_url)]
     return flask.render_template(
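The multi-line URL builds above lean on Python's implicit concatenation of adjacent string literals, which extends to f-strings; a minimal standalone sketch with placeholder values (`URL_ORIGIN` and `ctoken` are made up here, not the project's settings):

```python
# Adjacent literals inside parentheses are joined at compile time,
# so a long f-string URL can be split across lines with no '+' operators.
URL_ORIGIN = '/https://www.youtube.com'  # placeholder, not the real setting
ctoken = 'abc123'                        # placeholder continuation token

other_sort_url = (
    f"{URL_ORIGIN}/comments?ctoken="
    f"{ctoken}"
)
print(other_sort_url)  # /https://www.youtube.com/comments?ctoken=abc123
```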

View File

@@ -92,9 +92,7 @@ def add_extra_info_to_videos(videos, playlist_name):
         util.add_extra_html_info(video)
         if video['id'] + '.jpg' in thumbnails:
             video['thumbnail'] = (
-                '/https://youtube.com/data/playlist_thumbnails/'
-                + playlist_name
-                + '/' + video['id'] + '.jpg')
+                f'/https://youtube.com/data/playlist_thumbnails/{playlist_name}/{video["id"]}.jpg')
         else:
             video['thumbnail'] = util.get_thumbnail_url(video['id'])
             missing_thumbnails.append(video['id'])

View File

@@ -20,7 +20,7 @@ def playlist_ctoken(playlist_id, offset, include_shorts=True):
     continuation_info = proto.string(3, proto.percent_b64encode(offset))
-    playlist_id = proto.string(2, 'VL' + playlist_id)
+    playlist_id = proto.string(2, f'VL{playlist_id}')
     pointless_nest = proto.string(80226972, playlist_id + continuation_info)
     return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
@@ -30,7 +30,7 @@ def playlist_first_page(playlist_id, report_text="Retrieved playlist",
                         use_mobile=False):
     # Use innertube API (pbj=1 no longer works for many playlists)
     key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
-    url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
+    url = f'https://www.youtube.com/youtubei/v1/browse?key={key}'
     data = {
         'context': {
@@ -41,7 +41,7 @@ def playlist_first_page(playlist_id, report_text="Retrieved playlist",
                 'clientVersion': '2.20240327.00.00',
             },
         },
-        'browseId': 'VL' + playlist_id,
+        'browseId': f'VL{playlist_id}',
     }
     content_type_header = (('Content-Type', 'application/json'),)
@@ -58,7 +58,7 @@ def get_videos(playlist_id, page, include_shorts=True, use_mobile=False,
     page_size = 100
     key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
-    url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
+    url = f'https://www.youtube.com/youtubei/v1/browse?key={key}'
     ctoken = playlist_ctoken(playlist_id, (int(page)-1)*page_size,
                              include_shorts=include_shorts)
@@ -97,7 +97,7 @@ def get_playlist_page():
     if playlist_id.startswith('RD'):
         first_video_id = playlist_id[2:]  # video ID after 'RD' prefix
         return flask.redirect(
-            util.URL_ORIGIN + '/watch?v=' + first_video_id + '&list=' + playlist_id,
+            f'{util.URL_ORIGIN}/watch?v={first_video_id}&list={playlist_id}',
             302
         )
@@ -132,9 +132,9 @@ def get_playlist_page():
         if 'id' in item and not item.get('thumbnail'):
             item['thumbnail'] = f"{settings.img_prefix}https://i.ytimg.com/vi/{item['id']}/hqdefault.jpg"
-        item['url'] += '&list=' + playlist_id
+        item['url'] += f'&list={playlist_id}'
         if item['index']:
-            item['url'] += '&index=' + str(item['index'])
+            item['url'] += f'&index={item["index"]}'
     video_count = yt_data_extract.deep_get(info, 'metadata', 'video_count')
     if video_count is None:

View File

@@ -76,7 +76,7 @@ def read_varint(data):
         except IndexError:
             if i == 0:
                 raise EOFError()
-            raise Exception('Unterminated varint starting at ' + str(data.tell() - i))
+            raise Exception(f'Unterminated varint starting at {data.tell() - i}')
         result |= (byte & 127) << 7*i
         if not byte & 128:
             break
@@ -118,7 +118,7 @@ def read_protobuf(data):
         elif wire_type == 5:
             value = data.read(4)
         else:
-            raise Exception("Unknown wire type: " + str(wire_type) + " at position " + str(data.tell()))
+            raise Exception(f"Unknown wire type: {wire_type} at position {data.tell()}")
         yield (wire_type, field_number, value)
@@ -170,8 +170,7 @@ def _make_protobuf(data):
         elif field[0] == 2:
             result += string(field[1], _make_protobuf(field[2]))
         else:
-            raise NotImplementedError('Wire type ' + str(field[0])
-                                      + ' not implemented')
+            raise NotImplementedError(f'Wire type {field[0]} not implemented')
         return result
     return data
@@ -218,4 +217,4 @@ def b64_to_bytes(data):
     if isinstance(data, bytes):
         data = data.decode('ascii')
     data = data.replace("%3D", "=")
-    return base64.urlsafe_b64decode(data + "="*((4 - len(data) % 4) % 4))
+    return base64.urlsafe_b64decode(f'{data}{"=" * ((4 - len(data) % 4) % 4)}')
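The `(4 - len(data) % 4) % 4` expression in `b64_to_bytes` computes exactly the number of `=` pad characters needed to restore a stripped urlsafe-base64 string; note that appending a fixed `=` before the computed padding would corrupt already-aligned input. A quick round-trip check with throwaway values:

```python
import base64

def pad_b64(data):
    # The outer % 4 is what makes already-aligned strings get zero padding.
    return data + "=" * ((4 - len(data) % 4) % 4)

# Strip the padding off encodings of every alignment, then restore it.
for raw in (b'a', b'ab', b'abc', b'abcd'):
    stripped = base64.urlsafe_b64encode(raw).decode('ascii').rstrip('=')
    assert base64.urlsafe_b64decode(pad_b64(stripped)) == raw
print('padding round-trips for all alignments')
```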

View File

@@ -179,7 +179,7 @@ def read_varint(data):
         except IndexError:
             if i == 0:
                 raise EOFError()
-            raise Exception('Unterminated varint starting at ' + str(data.tell() - i))
+            raise Exception(f'Unterminated varint starting at {data.tell() - i}')
         result |= (byte & 127) << 7*i
         if not byte & 128:
             break
@@ -235,8 +235,7 @@ def _make_protobuf(data):
         elif field[0] == 2:
             result += string(field[1], _make_protobuf(field[2]))
         else:
-            raise NotImplementedError('Wire type ' + str(field[0])
-                                      + ' not implemented')
+            raise NotImplementedError(f'Wire type {field[0]} not implemented')
         return result
     return data
@@ -286,7 +285,7 @@ def b64_to_bytes(data):
     if isinstance(data, bytes):
         data = data.decode('ascii')
     data = data.replace("%3D", "=")
-    return base64.urlsafe_b64decode(data + "="*((4 - len(data) % 4) % 4))
+    return base64.urlsafe_b64decode(f'{data}{"=" * ((4 - len(data) % 4) % 4)}')
 # --------------------------------------------------------------------
@@ -344,7 +343,7 @@ fromhex = bytes.fromhex
 def aligned_ascii(data):
-    return ' '.join(' ' + chr(n) if n in range(32, 128) else ' _' for n in data)
+    return ' '.join(f' {chr(n)}' if n in range(32, 128) else ' _' for n in data)
 def parse_protobuf(data, mutable=False, spec=()):
@@ -372,7 +371,7 @@ def parse_protobuf(data, mutable=False, spec=()):
         elif wire_type == 5:
             value = data.read(4)
         else:
-            raise Exception("Unknown wire type: " + str(wire_type) + ", Tag: " + bytes_to_hex(varint_encode(tag)) + ", at position " + str(data.tell()))
+            raise Exception(f"Unknown wire type: {wire_type}, Tag: {bytes_to_hex(varint_encode(tag))}, at position {data.tell()}")
         if mutable:
             yield [wire_type, field_number, value]
         else:
@@ -453,7 +452,7 @@ def b32decode(s, casefold=False, map01=None):
     if map01 is not None:
         map01 = _bytes_from_decode_data(map01)
         assert len(map01) == 1, repr(map01)
         s = s.translate(bytes.maketrans(b'01', b'O' + map01))
     if casefold:
         s = s.upper()
     # Strip off pad characters from the right. We need to count the pad
@@ -494,7 +493,7 @@ def b32decode(s, casefold=False, map01=None):
 def dec32(data):
     if isinstance(data, bytes):
         data = data.decode('ascii')
-    return b32decode(data + "="*((8 - len(data)%8)%8))
+    return b32decode(f'{data}{"=" * ((8 - len(data)%8)%8)}')
 _patterns = [
@@ -563,9 +562,7 @@ def _pp(obj, indent): # not my best work
         if len(obj) == 3:  # (wire_type, field_number, data)
             return obj.__repr__()
         else:  # (base64, [...])
-            return ('(' + obj[0].__repr__() + ',\n'
-                    + indent_lines(_pp(obj[1], indent), indent) + '\n'
-                    + ')')
+            return f"({obj[0].__repr__()},\n{indent_lines(_pp(obj[1], indent), indent)}\n)"
     elif isinstance(obj, list):
         # [wire_type, field_number, data]
         if (len(obj) == 3
@@ -577,13 +574,11 @@ def _pp(obj, indent): # not my best work
         elif (len(obj) == 3
               and not any(isinstance(x, (list, tuple)) for x in obj[0:2])
               ):
-            return ('[' + obj[0].__repr__() + ', ' + obj[1].__repr__() + ',\n'
-                    + indent_lines(_pp(obj[2], indent), indent) + '\n'
-                    + ']')
+            return f"[{obj[0].__repr__()}, {obj[1].__repr__()},\n{indent_lines(_pp(obj[2], indent), indent)}\n]"
         else:
             s = '[\n'
             for x in obj:
-                s += indent_lines(_pp(x, indent), indent) + ',\n'
+                s += f"{indent_lines(_pp(x, indent), indent)},\n"
             s += ']'
             return s
     else:
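`read_varint` above follows the standard protobuf varint scheme: each byte carries 7 payload bits, and the high bit flags a continuation byte. A self-contained sketch of the same decoding, reading from a `BytesIO`:

```python
import io

def decode_varint(stream):
    # Accumulate 7 bits per byte, least-significant group first;
    # a clear high bit marks the final byte.
    result, i = 0, 0
    while True:
        byte = stream.read(1)
        if not byte:
            raise EOFError('truncated varint')
        result |= (byte[0] & 127) << (7 * i)
        if not byte[0] & 128:
            return result
        i += 1

# 300 encodes as 0xAC 0x02: low group 0x2C (44) + (0x02 << 7) = 300
print(decode_varint(io.BytesIO(b'\xac\x02')))  # 300
```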

View File

@@ -51,7 +51,7 @@ def get_search_json(query, page, autocorrect, sort, filters):
         'X-YouTube-Client-Name': '1',
         'X-YouTube-Client-Version': '2.20180418',
     }
-    url += "&pbj=1&sp=" + page_number_to_sp_parameter(page, autocorrect, sort, filters).replace("=", "%3D")
+    url += f"&pbj=1&sp={page_number_to_sp_parameter(page, autocorrect, sort, filters).replace('=', '%3D')}"
     content = util.fetch_url(url, headers=headers, report_text="Got search results", debug_name='search_results')
     info = json.loads(content)
     return info
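The trailing `.replace('=', '%3D')` percent-encodes the base64 pad characters so they survive inside the query string; a tiny sketch with a made-up `sp` token (the real one comes from `page_number_to_sp_parameter`):

```python
sp_token = 'EgIQAQ=='  # hypothetical filter token, for illustration only
url = 'https://www.youtube.com/results?search_query=test'
# '=' inside a query value would be ambiguous, so escape it as %3D.
url += f"&pbj=1&sp={sp_token.replace('=', '%3D')}"
print(url)  # ...&pbj=1&sp=EgIQAQ%3D%3D
```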

View File

@@ -9,6 +9,8 @@
     --thumb-background: #222222;
     --link: #00B0FF;
     --link-visited: #40C4FF;
+    --border-color: #333333;
+    --thead-background: #0a0a0b;
     --border-bg: #222222;
     --border-bg-settings: #000000;
     --border-bg-license: #000000;

View File

@@ -9,6 +9,8 @@
     --thumb-background: #35404D;
     --link: #22AAFF;
     --link-visited: #7755FF;
+    --border-color: #4A5568;
+    --thead-background: #1a2530;
     --border-bg: #FFFFFF;
     --border-bg-settings: #FFFFFF;
     --border-bg-license: #FFFFFF;

View File

@@ -9,6 +9,8 @@
     --thumb-background: #F5F5F5;
     --link: #212121;
     --link-visited: #808080;
+    --border-color: #CCCCCC;
+    --thead-background: #d0d0d0;
     --border-bg: #212121;
     --border-bg-settings: #91918C;
     --border-bg-license: #91918C;

View File

@@ -307,18 +307,122 @@ figure.sc-video {
     padding-top: 0.5rem;
     padding-bottom: 0.5rem;
 }
-.v-download { grid-area: v-download; }
-.v-download > ul.download-dropdown-content {
-    background: var(--secondary-background);
-    padding-left: 0px;
-}
-.v-download > ul.download-dropdown-content > li.download-format {
-    list-style: none;
-    padding: 0.4rem 0;
-    padding-left: 1rem;
-}
-.v-download > ul.download-dropdown-content > li.download-format a.download-link {
-    text-decoration: none;
-}
+.v-download {
+    grid-area: v-download;
+    margin-bottom: 0.5rem;
+}
+.v-download details {
+    display: block;
+    width: 100%;
+}
+.v-download > summary {
+    cursor: pointer;
+    padding: 0.4rem 0;
+    padding-left: 1rem;
+}
+.v-download > summary.download-dropdown-label {
+    cursor: pointer;
+    -webkit-touch-callout: none;
+    -webkit-user-select: none;
+    -khtml-user-select: none;
+    -moz-user-select: none;
+    -ms-user-select: none;
+    user-select: none;
+    padding-bottom: 6px;
+    padding-left: .75em;
+    padding-right: .75em;
+    padding-top: 6px;
+    text-align: center;
+    white-space: nowrap;
+    background-color: var(--buttom);
+    border: 1px solid var(--button-border);
+    color: var(--buttom-text);
+    border-radius: 5px;
+    margin-bottom: 0.5rem;
+}
+.v-download > summary.download-dropdown-label:hover {
+    background-color: var(--buttom-hover);
+}
+.v-download > .download-table-container {
+    background: var(--secondary-background);
+    max-height: 65vh;
+    overflow-y: auto;
+    border: 1px solid var(--button-border);
+    border-radius: 8px;
+    box-shadow: 0 4px 12px rgba(0,0,0,0.15);
+}
+.download-table {
+    width: 100%;
+    border-collapse: separate;
+    border-spacing: 0;
+    font-size: 0.875rem;
+}
+.download-table thead {
+    background: var(--thead-background);
+    position: sticky;
+    top: 0;
+    z-index: 1;
+}
+.download-table th,
+.download-table td {
+    padding: 0.7rem 0.9rem;
+    text-align: left;
+    border-bottom: 1px solid var(--button-border);
+}
+.download-table th {
+    font-weight: 600;
+    font-size: 0.7rem;
+    text-transform: uppercase;
+    letter-spacing: 0.8px;
+}
+.download-table tbody tr {
+    transition: all 0.2s ease;
+}
+.download-table tbody tr:hover {
+    background: var(--primary-background);
+}
+.download-table a.download-link {
+    display: inline-block;
+    padding: 0.4rem 0.85rem;
+    background: rgba(0,0,0,0.12);
+    color: var(--buttom-text);
+    text-decoration: none;
+    border-radius: 5px;
+    font-weight: 500;
+    font-size: 0.85rem;
+    transition: background 0.2s ease;
+    white-space: nowrap;
+}
+.download-table a.download-link:hover {
+    background: rgba(0,0,0,0.28);
+    color: var(--buttom-text);
+}
+.download-table tbody tr:last-child td {
+    border-bottom: none;
+}
+.download-table td[data-label="Ext"] {
+    font-family: monospace;
+    font-size: 0.8rem;
+    font-weight: 600;
+}
+.download-table td[data-label="Link"] {
+    white-space: nowrap;
+    vertical-align: middle;
+}
+.download-table td[data-label="Codecs"] {
+    max-width: 180px;
+    text-overflow: ellipsis;
+    overflow: hidden;
+    font-family: monospace;
+    font-size: 0.75rem;
+}
+.download-table td[data-label="Size"] {
+    font-family: monospace;
+    font-size: 0.85rem;
+}
+.download-table td[colspan="3"] {
+    font-style: italic;
+    opacity: 0.7;
+}
 .v-description {

View File

@@ -126,7 +126,7 @@ def delete_thumbnails(to_delete):
             os.remove(os.path.join(thumbnails_directory, thumbnail))
             existing_thumbnails.remove(video_id)
         except Exception:
-            print('Failed to delete thumbnail: ' + thumbnail)
+            print(f'Failed to delete thumbnail: {thumbnail}')
             traceback.print_exc()
@@ -184,7 +184,7 @@ def _get_videos(cursor, number_per_page, offset, tag=None):
             'time_published': exact_timestamp(db_video[3]) if db_video[4] else posix_to_dumbed_down(db_video[3]),
             'author': db_video[5],
             'author_id': db_video[6],
-            'author_url': '/https://www.youtube.com/channel/' + db_video[6],
+            'author_url': f'/https://www.youtube.com/channel/{db_video[6]}',
         })
     return videos, pseudo_number_of_videos
@@ -304,9 +304,9 @@ def posix_to_dumbed_down(posix_time):
         if delta >= unit_time:
             quantifier = round(delta/unit_time)
             if quantifier == 1:
-                return '1 ' + unit_name + ' ago'
+                return f'1 {unit_name} ago'
             else:
-                return str(quantifier) + ' ' + unit_name + 's ago'
+                return f'{quantifier} {unit_name}s ago'
     else:
         raise Exception()
@@ -363,7 +363,7 @@ def autocheck_dispatcher():
         time_until_earliest_job = earliest_job['next_check_time'] - time.time()
         if time_until_earliest_job <= -5:  # should not happen unless we're running extremely slow
-            print('ERROR: autocheck_dispatcher got job scheduled in the past, skipping and rescheduling: ' + earliest_job['channel_id'] + ', ' + earliest_job['channel_name'] + ', ' + str(earliest_job['next_check_time']))
+            print(f'ERROR: autocheck_dispatcher got job scheduled in the past, skipping and rescheduling: {earliest_job["channel_id"]}, {earliest_job["channel_name"]}, {earliest_job["next_check_time"]}')
             next_check_time = time.time() + 3600*secrets.randbelow(60)/60
             with_open_db(_schedule_checking, earliest_job['channel_id'], next_check_time)
             autocheck_jobs[earliest_job_index]['next_check_time'] = next_check_time
@@ -451,7 +451,7 @@ def check_channels_if_necessary(channel_ids):
 def _get_atoma_feed(channel_id):
-    url = 'https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id
+    url = f'https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}'
     try:
         return util.fetch_url(url).decode('utf-8')
     except util.FetchError as e:
@@ -485,16 +485,15 @@ def _get_channel_videos_first_page(channel_id, channel_status_name):
         return channel_info
     except util.FetchError as e:
         if e.code == '429' and settings.route_tor:
-            error_message = ('Error checking channel ' + channel_status_name
-                             + ': YouTube blocked the request because the'
-                             + ' Tor exit node is overutilized. Try getting a new exit node'
-                             + ' by using the New Identity button in the Tor Browser.')
+            error_message = (f'Error checking channel {channel_status_name}: '
+                             f'YouTube blocked the request because the Tor exit node is overutilized. '
+                             f'Try getting a new exit node by using the New Identity button in the Tor Browser.')
             if e.ip:
-                error_message += ' Exit node IP address: ' + e.ip
+                error_message += f' Exit node IP address: {e.ip}'
             print(error_message)
             return None
         elif e.code == '502':
-            print('Error checking channel', channel_status_name + ':', str(e))
+            print(f'Error checking channel {channel_status_name}: {e}')
             return None
         raise
@@ -505,7 +504,7 @@ def _get_upstream_videos(channel_id):
     except KeyError:
         channel_status_name = channel_id
-    print("Checking channel: " + channel_status_name)
+    print(f"Checking channel: {channel_status_name}")
     tasks = (
         # channel page, need for video duration
@@ -550,15 +549,15 @@ def _get_upstream_videos(channel_id):
                 times_published[video_id_element.text] = time_published
     except ValueError:
-        print('Failed to read atoma feed for ' + channel_status_name)
+        print(f'Failed to read atoma feed for {channel_status_name}')
         traceback.print_exc()
     except defusedxml.ElementTree.ParseError:
-        print('Failed to read atoma feed for ' + channel_status_name)
+        print(f'Failed to read atoma feed for {channel_status_name}')
     if channel_info is None:  # there was an error
         return
     if channel_info['error']:
-        print('Error checking channel ' + channel_status_name + ': ' + channel_info['error'])
+        print(f'Error checking channel {channel_status_name}: {channel_info["error"]}')
         return
     videos = channel_info['items']
@@ -1023,7 +1022,7 @@ def get_subscriptions_page():
     tag = request.args.get('tag', None)
     videos, number_of_videos_in_db = _get_videos(cursor, 60, (page - 1)*60, tag)
     for video in videos:
-        video['thumbnail'] = util.URL_ORIGIN + '/data/subscription_thumbnails/' + video['id'] + '.jpg'
+        video['thumbnail'] = f'{util.URL_ORIGIN}/data/subscription_thumbnails/{video["id"]}.jpg'
         video['type'] = 'video'
         video['item_size'] = 'small'
         util.add_extra_html_info(video)
@@ -1033,7 +1032,7 @@ def get_subscriptions_page():
     subscription_list = []
     for channel_name, channel_id, muted in _get_subscribed_channels(cursor):
         subscription_list.append({
-            'channel_url': util.URL_ORIGIN + '/channel/' + channel_id,
+            'channel_url': f'{util.URL_ORIGIN}/channel/{channel_id}',
             'channel_name': channel_name,
             'channel_id': channel_id,
             'muted': muted,
@@ -1109,17 +1108,17 @@ def serve_subscription_thumbnail(thumbnail):
     for quality in ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'):
         url = f"https://i.ytimg.com/vi/{video_id}/{quality}"
         try:
-            image = util.fetch_url(url, report_text="Saved thumbnail: " + video_id)
+            image = util.fetch_url(url, report_text=f"Saved thumbnail: {video_id}")
             break
         except util.FetchError as e:
             if '404' in str(e):
                 continue
-            print("Failed to download thumbnail for " + video_id + ": " + str(e))
+            print(f"Failed to download thumbnail for {video_id}: {e}")
             flask.abort(500)
         except urllib.error.HTTPError as e:
            if e.code == 404:
                continue
-            print("Failed to download thumbnail for " + video_id + ": " + str(e))
+            print(f"Failed to download thumbnail for {video_id}: {e}")
            flask.abort(e.code)
     if image is None:
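The thumbnail loop above walks qualities from best to worst and treats a 404 as "try the next one", aborting only on other errors. The control flow can be sketched with the fetcher stubbed out (the names here are illustrative, not the project's API):

```python
def fetch_first_available(video_id, fetch):
    # Highest quality first; a missing file at one quality is not an error.
    for quality in ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'):
        url = f'https://i.ytimg.com/vi/{video_id}/{quality}'
        try:
            return fetch(url)
        except FileNotFoundError:  # stand-in for the 404 branch of FetchError
            continue
    return None  # no quality available

def fake_fetch(url):
    # Stub: pretend only hqdefault.jpg exists for this video.
    if url.endswith('hqdefault.jpg'):
        return b'image-bytes'
    raise FileNotFoundError(url)

print(fetch_first_available('abc123', fake_fetch))  # b'image-bytes'
```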

View File

@@ -105,5 +105,10 @@
     {% if use_dash %}
     <script src="/youtube.com/static/js/av-merge.js"></script>
     {% endif %}
+    <!-- Storyboard Preview Thumbnails (native players only; Plyr handles this internally) -->
+    {% if settings.use_video_player != 2 and settings.native_player_storyboard %}
+    <script src="/youtube.com/static/js/storyboard-preview.js"></script>
+    {% endif %}
 </body>
 </html>

View File

@@ -102,22 +102,40 @@
     {% if settings.use_video_download != 0 %}
     <details class="v-download">
         <summary class="download-dropdown-label">{{ _('Download') }}</summary>
-        <ul class="download-dropdown-content">
+        <div class="download-table-container">
+            <table class="download-table" aria-label="Download formats">
+                <thead>
+                    <tr>
+                        <th scope="col">{{ _('Ext') }}</th>
+                        <th scope="col">{{ _('Video') }}</th>
+                        <th scope="col">{{ _('Audio') }}</th>
+                        <th scope="col">{{ _('Size') }}</th>
+                        <th scope="col">{{ _('Codecs') }}</th>
+                        <th scope="col">{{ _('Link') }}</th>
+                    </tr>
+                </thead>
+                <tbody>
         {% for format in download_formats %}
-            <li class="download-format">
-                <a class="download-link" href="{{ format['url'] }}" download="{{ title }}.{{ format['ext'] }}">
-                    {{ format['ext'] }} {{ format['video_quality'] }} {{ format['audio_quality'] }} {{ format['file_size'] }} {{ format['codecs'] }}
-                </a>
-            </li>
+                    <tr>
+                        <td data-label="{{ _('Ext') }}">{{ format['ext'] }}</td>
+                        <td data-label="{{ _('Video') }}">{{ format['video_quality'] }}</td>
+                        <td data-label="{{ _('Audio') }}">{{ format['audio_quality'] }}</td>
+                        <td data-label="{{ _('Size') }}">{{ format['file_size'] }}</td>
+                        <td data-label="{{ _('Codecs') }}">{{ format['codecs'] }}</td>
+                        <td data-label="{{ _('Link') }}"><a class="download-link" href="{{ format['url'] }}" download="{{ title }}.{{ format['ext'] }}" aria-label="{{ _('Download') }} {{ format['ext'] }} {{ format['video_quality'] }} {{ format['audio_quality'] }}">{{ _('Download') }}</a></td>
+                    </tr>
         {% endfor %}
         {% for download in other_downloads %}
-            <li class="download-format">
-                <a href="{{ download['url'] }}" download>
-                    {{ download['ext'] }} {{ download['label'] }}
-                </a>
-            </li>
+                    <tr>
+                        <td data-label="{{ _('Ext') }}">{{ download['ext'] }}</td>
+                        <td data-label="{{ _('Video') }}" colspan="3">{{ download['label'] }}</td>
+                        <td data-label="{{ _('Codecs') }}">{{ download.get('codecs', 'N/A') }}</td>
+                        <td data-label="{{ _('Link') }}"><a class="download-link" href="{{ download['url'] }}" download aria-label="{{ _('Download') }} {{ download['label'] }}">{{ _('Download') }}</a></td>
+                    </tr>
         {% endfor %}
-        </ul>
+                </tbody>
+            </table>
+        </div>
     </details>
     {% else %}
     <span class="v-download"></span>
@@ -304,8 +322,8 @@
         <!-- /plyr -->
         {% endif %}
-        <!-- Storyboard Preview Thumbnails -->
-        {% if settings.use_video_player != 2 %}
+        <!-- Storyboard Preview Thumbnails (native players only; Plyr handles this internally) -->
+        {% if settings.use_video_player != 2 and settings.native_player_storyboard %}
         <script src="/youtube.com/static/js/storyboard-preview.js"></script>
         {% endif %}

View File

@@ -72,7 +72,7 @@ class TorManager:
     def __init__(self):
         self.old_tor_connection_pool = None
         self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager(
-            'socks5h://127.0.0.1:' + str(settings.tor_port) + '/',
+            f'socks5h://127.0.0.1:{settings.tor_port}/',
             cert_reqs='CERT_REQUIRED')
         self.tor_pool_refresh_time = time.monotonic()
         settings.add_setting_changed_hook(
@@ -92,7 +92,7 @@ class TorManager:
         self.old_tor_connection_pool = self.tor_connection_pool
         self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager(
-            'socks5h://127.0.0.1:' + str(settings.tor_port) + '/',
+            f'socks5h://127.0.0.1:{settings.tor_port}/',
             cert_reqs='CERT_REQUIRED')
         self.tor_pool_refresh_time = time.monotonic()
@@ -198,9 +198,9 @@ class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler):
 class FetchError(Exception):
     def __init__(self, code, reason='', ip=None, error_message=None):
         if error_message:
-            string = code + ' ' + reason + ': ' + error_message
+            string = f"{code} {reason}: {error_message}"
         else:
-            string = 'HTTP error during request: ' + code + ' ' + reason
+            string = f"HTTP error during request: {code} {reason}"
         Exception.__init__(self, string)
         self.code = code
         self.reason = reason
@@ -294,14 +294,12 @@ def fetch_url_response(url, headers=(), timeout=15, data=None,
         exception_cause = e.__context__.__context__
         if (isinstance(exception_cause, socks.ProxyConnectionError)
                 and settings.route_tor):
-            msg = ('Failed to connect to Tor. Check that Tor is open and '
-                   'that your internet connection is working.\n\n'
-                   + str(e))
+            msg = f'Failed to connect to Tor. Check that Tor is open and that your internet connection is working.\n\n{e}'
             raise FetchError('502', reason='Bad Gateway',
                              error_message=msg)
         elif isinstance(e.__context__,
                         urllib3.exceptions.NewConnectionError):
-            msg = 'Failed to establish a connection.\n\n' + str(e)
+            msg = f'Failed to establish a connection.\n\n{e}'
             raise FetchError(
                 '502', reason='Bad Gateway',
                 error_message=msg)
@@ -391,7 +389,7 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None,
             if error:
                 raise FetchError(
                     '429', reason=response.reason, ip=ip,
-                    error_message='Automatic circuit change: ' + error)
+                    error_message=f'Automatic circuit change: {error}')
                 continue  # retry with new identity
         # Check for client errors (400, 404) - don't retry these
@@ -467,10 +465,7 @@ def head(url, use_tor=False, report_text=None, max_redirects=10):
     headers = {'User-Agent': 'Python-urllib'}
     response = pool.request('HEAD', url, headers=headers, retries=retries)
     if report_text:
-        print(
-            report_text,
-            ' Latency:',
-            round(time.monotonic() - start_time, 3))
+        print(f'{report_text} Latency: {round(time.monotonic() - start_time, 3)}')
     return response
 mobile_user_agent = 'Mozilla/5.0 (Linux; Android 7.0; Redmi Note 4 Build/NRD90M) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Mobile Safari/537.36'
@@ -544,16 +539,16 @@ def download_thumbnail(save_directory, video_id):
     for quality in ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'):
         url = f'https://i.ytimg.com/vi/{video_id}/{quality}'
         try:
-            thumbnail = fetch_url(url, report_text='Saved thumbnail: ' + video_id)
+            thumbnail = fetch_url(url, report_text=f'Saved thumbnail: {video_id}')
         except FetchError as e:
             if '404' in str(e):
                 continue
-            print('Failed to download thumbnail for ' + video_id + ': ' + str(e))
+            print(f'Failed to download thumbnail for {video_id}: {e}')
             return False
         except urllib.error.HTTPError as e:
             if e.code == 404:
                 continue
-            print('Failed to download thumbnail for ' + video_id + ': ' + str(e))
+            print(f'Failed to download thumbnail for {video_id}: {e}')
return False return False
try: try:
with open(save_location, 'wb') as f: with open(save_location, 'wb') as f:
@@ -563,7 +558,7 @@ def download_thumbnail(save_directory, video_id):
with open(save_location, 'wb') as f: with open(save_location, 'wb') as f:
f.write(thumbnail) f.write(thumbnail)
return True return True
print('No thumbnail available for ' + video_id) print(f'No thumbnail available for {video_id}')
return False return False
@@ -698,7 +693,7 @@ def prefix_urls(item):
def add_extra_html_info(item): def add_extra_html_info(item):
if item['type'] == 'video': if item['type'] == 'video':
item['url'] = (URL_ORIGIN + '/watch?v=' + item['id']) if item.get('id') else None item['url'] = f'{URL_ORIGIN}/watch?v={item["id"]}' if item.get('id') else None
video_info = {} video_info = {}
for key in ('id', 'title', 'author', 'duration', 'author_id'): for key in ('id', 'title', 'author', 'duration', 'author_id'):
@@ -721,7 +716,7 @@ def add_extra_html_info(item):
item['url'] = concat_or_none(URL_ORIGIN, "/channel/", item['id']) item['url'] = concat_or_none(URL_ORIGIN, "/channel/", item['id'])
if item.get('author_id') and 'author_url' not in item: if item.get('author_id') and 'author_url' not in item:
item['author_url'] = URL_ORIGIN + '/channel/' + item['author_id'] item['author_url'] = f'{URL_ORIGIN}/channel/{item["author_id"]}'
def check_gevent_exceptions(*tasks): def check_gevent_exceptions(*tasks):
@@ -967,7 +962,7 @@ def call_youtube_api(client, api, data):
user_agent = context['client'].get('userAgent') or mobile_user_agent user_agent = context['client'].get('userAgent') or mobile_user_agent
visitor_data = get_visitor_data() visitor_data = get_visitor_data()
url = 'https://' + host + '/youtubei/v1/' + api + '?key=' + key url = f'https://{host}/youtubei/v1/{api}?key={key}'
if visitor_data: if visitor_data:
context['client'].update({'visitorData': visitor_data}) context['client'].update({'visitorData': visitor_data})
data['context'] = context data['context'] = context
@@ -978,8 +973,8 @@ def call_youtube_api(client, api, data):
headers = ( *headers, ('X-Goog-Visitor-Id', visitor_data )) headers = ( *headers, ('X-Goog-Visitor-Id', visitor_data ))
response = fetch_url( response = fetch_url(
url, data=data, headers=headers, url, data=data, headers=headers,
debug_name='youtubei_' + api + '_' + client, debug_name=f'youtubei_{api}_{client}',
report_text='Fetched ' + client + ' youtubei ' + api report_text=f'Fetched {client} youtubei {api}'
).decode('utf-8') ).decode('utf-8')
return response return response
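The pattern throughout this commit is mechanical: each concatenation chain becomes one f-string with the same text. A minimal sketch of the `FetchError` message change above (standalone function for illustration; the real code builds the string inside `__init__`):

```python
def format_fetch_error(code, reason='', error_message=None):
    # f-string form introduced by this commit
    if error_message:
        return f"{code} {reason}: {error_message}"
    return f"HTTP error during request: {code} {reason}"

# Byte-for-byte equivalent to the old concatenation chains
assert format_fetch_error('429', 'Too Many Requests', 'rate limited') == \
    '429' + ' ' + 'Too Many Requests' + ': ' + 'rate limited'
assert format_fetch_error('404', 'Not Found') == \
    'HTTP error during request: ' + '404' + ' ' + 'Not Found'
```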


@@ -1,3 +1,3 @@
 from __future__ import unicode_literals
-__version__ = 'v0.4.5'
+__version__ = 'v0.5.0'


@@ -53,7 +53,7 @@ def get_video_sources(info, target_resolution):
 if fmt['acodec'] and fmt['vcodec']:
 if fmt.get('audio_track_is_default', True) is False:
 continue
-source = {'type': 'video/' + fmt['ext'],
+source = {'type': f"video/{fmt['ext']}",
 'quality_string': short_video_quality_string(fmt)}
 source['quality_string'] += ' (integrated)'
 source.update(fmt)
@@ -70,10 +70,10 @@ def get_video_sources(info, target_resolution):
 if fmt['acodec'] and not fmt['vcodec'] and (fmt['audio_bitrate'] or fmt['bitrate']):
 if fmt['bitrate']:
 fmt['audio_bitrate'] = int(fmt['bitrate']/1000)
-source = {'type': 'audio/' + fmt['ext'],
+source = {'type': f"audio/{fmt['ext']}",
 'quality_string': audio_quality_string(fmt)}
 source.update(fmt)
-source['mime_codec'] = source['type'] + '; codecs="' + source['acodec'] + '"'
+source['mime_codec'] = f"{source['type']}; codecs=\"{source['acodec']}\""
 tid = fmt.get('audio_track_id') or 'default'
 if tid not in audio_by_track:
 audio_by_track[tid] = {
@@ -85,11 +85,11 @@ def get_video_sources(info, target_resolution):
 elif all(fmt[attr] for attr in ('vcodec', 'quality', 'width', 'fps', 'file_size')):
 if codec_name(fmt['vcodec']) == 'unknown':
 continue
-source = {'type': 'video/' + fmt['ext'],
+source = {'type': f"video/{fmt['ext']}",
 'quality_string': short_video_quality_string(fmt)}
 source.update(fmt)
-source['mime_codec'] = source['type'] + '; codecs="' + source['vcodec'] + '"'
-quality = str(fmt['quality']) + 'p' + str(fmt['fps'])
+source['mime_codec'] = f"{source['type']}; codecs=\"{source['vcodec']}\""
+quality = f"{fmt['quality']}p{fmt['fps']}"
 video_only_sources.setdefault(quality, []).append(source)
 audio_tracks = []
@@ -141,7 +141,7 @@ def get_video_sources(info, target_resolution):
 def video_rank(src):
 ''' Sort by settings preference. Use file size as tiebreaker '''
-setting_name = 'codec_rank_' + codec_name(src['vcodec'])
+setting_name = f'codec_rank_{codec_name(src["vcodec"])}'
 return (settings.current_settings_dict[setting_name],
 src['file_size'])
 pair_info['videos'].sort(key=video_rank)
@@ -183,7 +183,7 @@ def make_caption_src(info, lang, auto=False, trans_lang=None):
 if auto:
 label += ' (Automatic)'
 if trans_lang:
-label += ' -> ' + trans_lang
+label += f' -> {trans_lang}'
 # Try to use Android caption URL directly (no PO Token needed)
 caption_url = None
@@ -204,7 +204,7 @@ def make_caption_src(info, lang, auto=False, trans_lang=None):
 else:
 caption_url += '&fmt=vtt'
 if trans_lang:
-caption_url += '&tlang=' + trans_lang
+caption_url += f'&tlang={trans_lang}'
 url = util.prefix_url(caption_url)
 else:
 # Fallback to old method
@@ -357,10 +357,10 @@ def decrypt_signatures(info, video_id):
 player_name = info['player_name']
 if player_name in decrypt_cache:
-print('Using cached decryption function for: ' + player_name)
+print(f'Using cached decryption function for: {player_name}')
 info['decryption_function'] = decrypt_cache[player_name]
 else:
-base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text='Fetched player ' + player_name)
+base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text=f'Fetched player {player_name}')
 base_js = base_js.decode('utf-8')
 err = yt_data_extract.extract_decryption_function(info, base_js)
 if err:
@@ -387,11 +387,11 @@ def fetch_player_response(client, video_id):
 def fetch_watch_page_info(video_id, playlist_id, index):
 # bpctr=9999999999 will bypass are-you-sure dialogs for controversial
 # videos
-url = 'https://m.youtube.com/embed/' + video_id + '?bpctr=9999999999'
+url = f'https://m.youtube.com/embed/{video_id}?bpctr=9999999999'
 if playlist_id:
-url += '&list=' + playlist_id
+url += f'&list={playlist_id}'
 if index:
-url += '&index=' + index
+url += f'&index={index}'
 headers = (
 ('Accept', '*/*'),
@@ -493,7 +493,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
 # Register HLS audio tracks for proxy access
 added = 0
 for lang, track in info['hls_audio_tracks'].items():
-ck = video_id + '_' + lang
+ck = f"{video_id}_{lang}"
 from youtube.hls_cache import register_track
 register_track(ck, track['hls_url'],
 video_id=video_id, track_id=lang)
@@ -502,7 +502,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
 'audio_track_id': lang,
 'audio_track_name': track['name'],
 'audio_track_is_default': track['is_default'],
-'itag': 'hls_' + lang,
+'itag': f'hls_{lang}',
 'ext': 'mp4',
 'audio_bitrate': 128,
 'bitrate': 128000,
@@ -516,7 +516,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
 'fps': None,
 'init_range': {'start': 0, 'end': 0},
 'index_range': {'start': 0, 'end': 0},
-'url': '/ytl-api/audio-track?id=' + urllib.parse.quote(ck),
+'url': f'/ytl-api/audio-track?id={urllib.parse.quote(ck)}',
 's': None,
 'sp': None,
 'quality': None,
@@ -538,11 +538,11 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
 # Register HLS manifest for proxying
 if info['hls_manifest_url']:
-ck = video_id + '_video'
+ck = f"{video_id}_video"
 from youtube.hls_cache import register_track
 register_track(ck, info['hls_manifest_url'], video_id=video_id, track_id='video')
 # Use proxy URL instead of direct Google Video URL
-info['hls_manifest_url'] = '/ytl-api/hls-manifest?id=' + urllib.parse.quote(ck)
+info['hls_manifest_url'] = f'/ytl-api/hls-manifest?id={urllib.parse.quote(ck)}'
 # Fallback to 'ios' if no valid URLs are found
 if not info.get('formats') or info.get('player_urls_missing'):
@@ -566,7 +566,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
 if info.get('formats'):
 decryption_error = decrypt_signatures(info, video_id)
 if decryption_error:
-info['playability_error'] = 'Error decrypting url signatures: ' + decryption_error
+info['playability_error'] = f'Error decrypting url signatures: {decryption_error}'
 # check if urls ready (non-live format) in former livestream
 # urls not ready if all of them have no filesize
@@ -623,9 +623,9 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
 def video_quality_string(format):
 if format['vcodec']:
-result = str(format['width'] or '?') + 'x' + str(format['height'] or '?')
+result = f"{format['width'] or '?'}x{format['height'] or '?'}"
 if format['fps']:
-result += ' ' + str(format['fps']) + 'fps'
+result += f" {format['fps']}fps"
 return result
 elif format['acodec']:
 return 'audio only'
@@ -634,7 +634,7 @@ def video_quality_string(format):
 def short_video_quality_string(fmt):
-result = str(fmt['quality'] or '?') + 'p'
+result = f"{fmt['quality'] or '?'}p"
 if fmt['fps']:
 result += str(fmt['fps'])
 if fmt['vcodec'].startswith('av01'):
@@ -642,18 +642,18 @@ def short_video_quality_string(fmt):
 elif fmt['vcodec'].startswith('avc'):
 result += ' h264'
 else:
-result += ' ' + fmt['vcodec']
+result += f" {fmt['vcodec']}"
 return result
 def audio_quality_string(fmt):
 if fmt['acodec']:
 if fmt['audio_bitrate']:
-result = '%d' % fmt['audio_bitrate'] + 'k'
+result = f"{fmt['audio_bitrate']}k"
 else:
 result = '?k'
 if fmt['audio_sample_rate']:
-result += ' ' + '%.3G' % (fmt['audio_sample_rate']/1000) + 'kHz'
+result += f" {'%.3G' % (fmt['audio_sample_rate']/1000)}kHz"
 return result
 elif fmt['vcodec']:
 return 'video only'
@@ -737,9 +737,9 @@ def get_audio_track():
 seg = line if line.startswith('http') else urljoin(playlist_base, line)
 # Always use &seg= parameter, never &url= for segments
 playlist_lines.append(
-base_url + '/ytl-api/audio-track?id='
-+ urllib.parse.quote(cache_key)
-+ '&seg=' + urllib.parse.quote(seg, safe='')
+f'{base_url}/ytl-api/audio-track?id='
+f'{urllib.parse.quote(cache_key)}'
+f'&seg={urllib.parse.quote(seg, safe="")}'
 )
 playlist = '\n'.join(playlist_lines)
@@ -797,9 +797,7 @@ def get_audio_track():
 return url
 if not url.startswith('http://') and not url.startswith('https://'):
 url = urljoin(playlist_base, url)
-return (base_url + '/ytl-api/audio-track?id='
-+ urllib.parse.quote(cache_key)
-+ '&seg=' + urllib.parse.quote(url, safe=''))
+return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(url, safe="")}'
 playlist_lines = []
 for line in playlist.split('\n'):
@@ -812,7 +810,7 @@ def get_audio_track():
 if line.startswith('#') and 'URI=' in line:
 def rewrite_uri_attr(match):
 uri = match.group(1)
-return 'URI="' + proxy_url(uri) + '"'
+return f'URI="{proxy_url(uri)}"'
 line = _re.sub(r'URI="([^"]+)"', rewrite_uri_attr, line)
 playlist_lines.append(line)
 elif line.startswith('#'):
@@ -883,9 +881,7 @@ def get_audio_track():
 if segment_url.startswith('/ytl-api/audio-track'):
 return segment_url
 base_url = request.url_root.rstrip('/')
-return (base_url + '/ytl-api/audio-track?id='
-+ urllib.parse.quote(cache_key)
-+ '&seg=' + urllib.parse.quote(segment_url))
+return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(segment_url)}'
 playlist_lines = []
 for line in playlist.split('\n'):
@@ -949,14 +945,10 @@ def get_hls_manifest():
 if is_audio_track:
 # Audio track playlist - proxy through audio-track endpoint
-return (base_url + '/ytl-api/audio-track?id='
-+ urllib.parse.quote(cache_key)
-+ '&url=' + urllib.parse.quote(url, safe=''))
+return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&url={urllib.parse.quote(url, safe="")}'
 else:
 # Video segment or variant playlist - proxy through audio-track endpoint
-return (base_url + '/ytl-api/audio-track?id='
-+ urllib.parse.quote(cache_key)
-+ '&seg=' + urllib.parse.quote(url, safe=''))
+return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(url, safe="")}'
 # Parse and rewrite the manifest
 manifest_lines = []
@@ -974,7 +966,7 @@ def get_hls_manifest():
 nonlocal rewritten_count
 uri = match.group(1)
 rewritten_count += 1
-return 'URI="' + rewrite_url(uri, is_audio_track=True) + '"'
+return f'URI="{rewrite_url(uri, is_audio_track=True)}"'
 line = _re.sub(r'URI="([^"]+)"', rewrite_media_uri, line)
 manifest_lines.append(line)
 elif line.startswith('#'):
@@ -1053,7 +1045,7 @@ def get_storyboard_vtt():
 ts = 0  # current timestamp
 for i in range(storyboard.storyboard_count):
-url = '/' + storyboard.url.replace("$M", str(i))
+url = f'/{storyboard.url.replace("$M", str(i))}'
 interval = storyboard.interval
 w, h = storyboard.width, storyboard.height
 w_cnt, h_cnt = storyboard.width_cnt, storyboard.height_cnt
@@ -1078,7 +1070,7 @@ def get_watch_page(video_id=None):
 if not video_id:
 return flask.render_template('error.html', error_message='Missing video id'), 404
 if len(video_id) < 11:
-return flask.render_template('error.html', error_message='Incomplete video id (too short): ' + video_id), 404
+return flask.render_template('error.html', error_message=f'Incomplete video id (too short): {video_id}'), 404
 time_start_str = request.args.get('t', '0s')
 time_start = 0
@@ -1141,9 +1133,9 @@ def get_watch_page(video_id=None):
 util.prefix_urls(item)
 util.add_extra_html_info(item)
 if playlist_id:
-item['url'] += '&list=' + playlist_id
+item['url'] += f'&list={playlist_id}'
 if item['index']:
-item['url'] += '&index=' + str(item['index'])
+item['url'] += f'&index={item["index"]}'
 info['playlist']['author_url'] = util.prefix_url(
 info['playlist']['author_url'])
 if settings.img_prefix:
@@ -1159,16 +1151,16 @@ def get_watch_page(video_id=None):
 filename = title
 ext = fmt.get('ext')
 if ext:
-filename += '.' + ext
+filename += f'.{ext}'
 fmt['url'] = fmt['url'].replace(
 '/videoplayback',
-'/videoplayback/name/' + filename)
+f'/videoplayback/name/{filename}')
 download_formats = []
 for format in (info['formats'] + info['hls_formats']):
 if format['acodec'] and format['vcodec']:
-codecs_string = format['acodec'] + ', ' + format['vcodec']
+codecs_string = f"{format['acodec']}, {format['vcodec']}"
 else:
 codecs_string = format['acodec'] or format['vcodec'] or '?'
 download_formats.append({
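The hunk above embeds a human-readable filename into the `/videoplayback` path so downloads get a sensible name. A minimal sketch of the same rewrite (the helper name and sample URL are illustrative, not from the codebase):

```python
def add_download_name(url, title, ext=None):
    # Build "title.ext" and splice it into the /videoplayback path,
    # mirroring the fmt['url'].replace(...) call in the diff above.
    filename = title
    if ext:
        filename += f'.{ext}'
    return url.replace('/videoplayback', f'/videoplayback/name/{filename}')

assert add_download_name('/videoplayback?itag=18', 'clip', 'mp4') == \
    '/videoplayback/name/clip.mp4?itag=18'
```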
@@ -1247,12 +1239,9 @@ def get_watch_page(video_id=None):
 for source in subtitle_sources:
 best_caption_parse = urllib.parse.urlparse(
 source['url'].lstrip('/'))
-transcript_url = (util.URL_ORIGIN
-+ '/watch/transcript'
-+ best_caption_parse.path
-+ '?' + best_caption_parse.query)
+transcript_url = f'{util.URL_ORIGIN}/watch/transcript{best_caption_parse.path}?{best_caption_parse.query}'
 other_downloads.append({
-'label': 'Video Transcript: ' + source['label'],
+'label': f'Video Transcript: {source["label"]}',
 'ext': 'txt',
 'url': transcript_url
 })
@@ -1263,7 +1252,7 @@ def get_watch_page(video_id=None):
 template_name = 'watch.html'
 return flask.render_template(template_name,
 header_playlist_names = local_playlist.get_playlist_names(),
-uploader_channel_url = ('/' + info['author_url']) if info['author_url'] else '',
+uploader_channel_url = f'/{info["author_url"]}' if info['author_url'] else '',
 time_published = info['time_published'],
 view_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("view_count", None)),
 like_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("like_count", None)),
@@ -1305,10 +1294,10 @@ def get_watch_page(video_id=None):
 ip_address = info['ip_address'] if settings.route_tor else None,
 invidious_used = info['invidious_used'],
 invidious_reload_button = info['invidious_reload_button'],
-video_url = util.URL_ORIGIN + '/watch?v=' + video_id,
+video_url = f'{util.URL_ORIGIN}/watch?v={video_id}',
 video_id = video_id,
-storyboard_url = (util.URL_ORIGIN + '/ytl-api/storyboard.vtt?' +
-urlencode([('spec_url', info['storyboard_spec_url'])])
+storyboard_url = (f'{util.URL_ORIGIN}/ytl-api/storyboard.vtt?'
+f'{urlencode([("spec_url", info["storyboard_spec_url"])])}'
 if info['storyboard_spec_url'] else None),
 js_data = {
@@ -1335,7 +1324,7 @@ def get_watch_page(video_id=None):
 @yt_app.route('/api/<path:dummy>')
 def get_captions(dummy):
-url = 'https://www.youtube.com' + request.full_path
+url = f'https://www.youtube.com{request.full_path}'
 try:
 result = util.fetch_url(url, headers=util.mobile_ua)
 result = result.replace(b"align:start position:0%", b"")
@@ -1350,12 +1339,9 @@ inner_timestamp_removal_reg = re.compile(r'<[^>]+>')
 @yt_app.route('/watch/transcript/<path:caption_path>')
 def get_transcript(caption_path):
 try:
-captions = util.fetch_url('https://www.youtube.com/'
-+ caption_path
-+ '?' + request.environ['QUERY_STRING']).decode('utf-8')
+captions = util.fetch_url(f'https://www.youtube.com/{caption_path}?{request.environ["QUERY_STRING"]}').decode('utf-8')
 except util.FetchError as e:
-msg = ('Error retrieving captions: ' + str(e) + '\n\n'
-+ 'The caption url may have expired.')
+msg = f'Error retrieving captions: {e}\n\nThe caption url may have expired.'
 print(msg)
 return flask.Response(
 msg,
@@ -1403,7 +1389,7 @@ def get_transcript(caption_path):
 result = ''
 for seg in segments:
 if seg['text'] != ' ':
-result += seg['begin'] + ' ' + seg['text'] + '\r\n'
+result += f"{seg['begin']} {seg['text']}\r\n"
 return flask.Response(result.encode('utf-8'),
 mimetype='text/plain;charset=UTF-8')
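Several hunks above collapse multi-line concatenations of the audio-track proxy URL into a single f-string. A runnable sketch of that expression (the function name and sample values are illustrative; `base_url` and `cache_key` come from the surrounding request context in the real code):

```python
import urllib.parse

def proxy_segment_url(base_url, cache_key, seg):
    # Segment URLs travel through the &seg= query parameter,
    # percent-encoded in full (safe='') so the embedded URL's own
    # '/', ':' and '?' characters survive as one query value.
    return (f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}'
            f'&seg={urllib.parse.quote(seg, safe="")}')

url = proxy_segment_url('http://localhost', 'abc_en', 'https://e.com/s/1.ts')
assert url == ('http://localhost/ytl-api/audio-track'
               '?id=abc_en&seg=https%3A%2F%2Fe.com%2Fs%2F1.ts')
```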


@@ -212,7 +212,7 @@ def extract_date(date_text):
 month, day, year = parts[-3:]
 month = MONTH_ABBREVIATIONS.get(month[0:3])  # slicing in case they start writing out the full month name
 if month and (re.fullmatch(r'\d\d?', day) is not None) and (re.fullmatch(r'\d{4}', year) is not None):
-return year + '-' + month + '-' + day
+return f'{year}-{month}-{day}'
 return None
 def check_missing_keys(object, *key_sequences):
@@ -222,7 +222,7 @@ def check_missing_keys(object, *key_sequences):
 for key in key_sequence:
 _object = _object[key]
 except (KeyError, IndexError, TypeError):
-return 'Could not find ' + key
+return f'Could not find {key}'
 return None
@@ -467,7 +467,7 @@ def extract_item_info(item, additional_info={}):
 ['shortBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
 ['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId']
 ))
-info['author_url'] = ('https://www.youtube.com/channel/' + info['author_id']) if info['author_id'] else None
+info['author_url'] = f'https://www.youtube.com/channel/{info["author_id"]}' if info['author_id'] else None
 info['description'] = extract_formatted_text(multi_deep_get(
 item,
 ['descriptionText'], ['descriptionSnippet'],
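Many of the converted lines subscript a dict inside the f-string, e.g. `f'...{info["author_id"]}'`. These rely on a quoting rule worth noting: the inner quotes must differ from the outer ones, since reusing the same quote character inside an f-string expression only became legal in Python 3.12 (PEP 701). A self-contained check (sample data made up):

```python
info = {'author_id': 'UC123'}

# Inner double quotes inside a single-quoted f-string: valid on all
# supported Python versions. f'{info['author_id']}' would be a syntax
# error before 3.12.
author_url = f'https://www.youtube.com/channel/{info["author_id"]}' if info['author_id'] else None
assert author_url == 'https://www.youtube.com/channel/UC123'
```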


@@ -305,7 +305,7 @@ def extract_playlist_metadata(polymer_json):
 metadata['description'] = desc
 if metadata['author_id']:
-metadata['author_url'] = 'https://www.youtube.com/channel/' + metadata['author_id']
+metadata['author_url'] = f'https://www.youtube.com/channel/{metadata["author_id"]}'
 if metadata['first_video_id'] is None:
 metadata['thumbnail'] = None


@@ -650,9 +650,9 @@ def _extract_playability_error(info, player_response, error_prefix=''):
 )
 if playability_status not in (None, 'OK'):
-info['playability_error'] = error_prefix + playability_reason
+info['playability_error'] = f'{error_prefix}{playability_reason}'
 elif not info['playability_error']:  # do not override
-info['playability_error'] = error_prefix + 'Unknown playability error'
+info['playability_error'] = f'{error_prefix}Unknown playability error'
 SUBTITLE_FORMATS = ('srv1', 'srv2', 'srv3', 'ttml', 'vtt')
 def extract_watch_info(polymer_json):
@@ -726,7 +726,7 @@ def extract_watch_info(polymer_json):
 # Store the full URL from the player response (includes valid tokens)
 if base_url:
 normalized = normalize_url(base_url) if base_url.startswith('/') or not base_url.startswith('http') else base_url
-info['_caption_track_urls'][lang_code + ('_asr' if caption_track.get('kind') == 'asr' else '')] = normalized
+info['_caption_track_urls'][f'{lang_code}{"_asr" if caption_track.get("kind") == "asr" else ""}'] = normalized
 lang_name = deep_get(urllib.parse.parse_qs(urllib.parse.urlparse(base_url).query), 'name', 0)
 if lang_name:
 info['_manual_caption_language_names'][lang_code] = lang_name
@@ -806,7 +806,7 @@ def extract_watch_info(polymer_json):
 info['allowed_countries'] = mf.get('availableCountries', [])
 # other stuff
-info['author_url'] = 'https://www.youtube.com/channel/' + info['author_id'] if info['author_id'] else None
+info['author_url'] = f'https://www.youtube.com/channel/{info["author_id"]}' if info['author_id'] else None
 info['storyboard_spec_url'] = deep_get(player_response, 'storyboards', 'playerStoryboardSpecRenderer', 'spec')
 return info
@@ -912,12 +912,12 @@ def get_caption_url(info, language, format, automatic=False, translation_languag
 url = info['_captions_base_url']
 if not url:
 return None
-url += '&lang=' + language
-url += '&fmt=' + format
+url += f'&lang={language}'
+url += f'&fmt={format}'
 if automatic:
 url += '&kind=asr'
 elif language in info['_manual_caption_language_names']:
-url += '&name=' + urllib.parse.quote(info['_manual_caption_language_names'][language], safe='')
+url += f'&name={urllib.parse.quote(info["_manual_caption_language_names"][language], safe="")}'
 if translation_language:
 url += '&tlang=' + translation_language
@@ -964,7 +964,7 @@ def extract_decryption_function(info, base_js):
 return 'Could not find var_name'
 var_name = var_with_operation_match.group(1)
-var_body_match = re.search(r'var ' + re.escape(var_name) + r'=\{(.*?)\};', base_js, flags=re.DOTALL)
+var_body_match = re.search(rf'var {re.escape(var_name)}=\{{(.*?)\}};', base_js, flags=re.DOTALL)
 if var_body_match is None:
 return 'Could not find var_body'
@@ -988,7 +988,7 @@ def extract_decryption_function(info, base_js):
 elif op_body.startswith('var c=a[0]'):
 operation_definitions[op_name] = 2
 else:
-return 'Unknown op_body: ' + op_body
+return f'Unknown op_body: {op_body}'
 decryption_function = []
 for op_with_arg in function_body:
@@ -997,7 +997,7 @@ def extract_decryption_function(info, base_js):
 return 'Could not parse operation with arg'
 op_name = match.group(2).strip('[].')
 if op_name not in operation_definitions:
-return 'Unknown op_name: ' + str(op_name)
+return f'Unknown op_name: {op_name}'
 op_argument = match.group(3)
 decryption_function.append([operation_definitions[op_name], int(op_argument)])
@@ -1028,5 +1028,5 @@ def decrypt_signatures(info):
 _operation_2(a, argument)
 signature = ''.join(a)
-format['url'] += '&' + format['sp'] + '=' + signature
+format['url'] += f'&{format["sp"]}={signature}'
 return False
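The `extract_decryption_function` change above switches the regex to an `rf`-string, which requires doubling the literal braces (`\{{` and `\}}` emit the regex `\{` and `\}`). A self-contained check of that pattern (the sample `base_js` string is made up for illustration):

```python
import re

var_name = 'Xy'
base_js = 'junk;var Xy={reverse:function(a){a.reverse()}};more'

# In an f-string, {{ and }} are literal braces, so \{{(.*?)\}} produces
# the regex \{(.*?)\}, matching the braces of the object body.
pattern = rf'var {re.escape(var_name)}=\{{(.*?)\}};'
m = re.search(pattern, base_js, flags=re.DOTALL)
assert m is not None
assert m.group(1) == 'reverse:function(a){a.reverse()}'
```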