security: harden code against command injection and path traversal

Core changes:

* enforce HTTPS URLs and remove shell usage in generate_release.py
* replace os.system calls with subprocess across the codebase
* validate external inputs (playlist names, video IDs)

Improvements and fixes:

* settings.py: fix typo (node.lineno → line_number); use isinstance() over type()
* youtube/get_app_version: improve git detection using subprocess.DEVNULL
* youtube/util.py: add cleanup helpers; use shutil.which for binary resolution

YouTube modules:

* watch.py: detect and flag HLS streams; remove unused audio_track_sources
* comments.py: return early when comments are disabled; add error handling
* local_playlist.py: validate playlist names to prevent path traversal
* subscriptions.py: replace asserts with proper error handling; validate video IDs

Cleanup:

* remove unused imports across modules (playlist, search, channel)
* reorganize package imports in youtube/__init__.py
* simplify test imports and fix cleanup_func in tests

Tests:

* tests/test_shorts.py: simplify imports
* tests/test_util.py: fix cleanup_func definition
This commit is contained in:
2026-04-20 00:39:35 -05:00
parent 155bd4df49
commit d6190a2d0b
16 changed files with 237 additions and 146 deletions

View File

@@ -1,14 +1,17 @@
import logging
import os
import re
import traceback
from sys import exc_info
import flask
import jinja2
from flask import request
from flask_babel import Babel
from youtube import util
from .get_app_version import app_version
import flask
from flask import request
import jinja2
import settings
import traceback
import logging
import re
from sys import exc_info
from flask_babel import Babel
yt_app = flask.Flask(__name__)
yt_app.config['TEMPLATES_AUTO_RELOAD'] = True
@@ -26,7 +29,6 @@ yt_app.logger.addFilter(FetchErrorFilter())
# yt_app.jinja_env.lstrip_blocks = True
# Configure Babel for i18n
import os
yt_app.config['BABEL_DEFAULT_LOCALE'] = 'en'
# Use absolute path for translations directory to avoid issues with package structure changes
_app_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

View File

@@ -6,9 +6,7 @@ import settings
import urllib
import json
from string import Template
import youtube.proto as proto
import html
import math
import gevent
import re
@@ -293,7 +291,7 @@ def get_number_of_videos_channel(channel_id):
try:
response = util.fetch_url(url, headers_mobile,
debug_name='number_of_videos', report_text='Got number of videos')
except (urllib.error.HTTPError, util.FetchError) as e:
except (urllib.error.HTTPError, util.FetchError):
traceback.print_exc()
print("Couldn't retrieve number of videos")
return 1000

View File

@@ -155,33 +155,35 @@ def post_process_comments_info(comments_info):
def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''):
if not settings.comments_mode:
return {}
# Initialize the result dict up-front so that any exception path below
# can safely attach an 'error' field without risking UnboundLocalError.
comments_info = {'error': None}
try:
if settings.comments_mode:
comments_info = {'error': None}
other_sort_url = (
util.URL_ORIGIN + '/comments?ctoken='
+ make_comment_ctoken(video_id, sort=1 - sort, lc=lc)
)
other_sort_text = 'Sort by ' + ('newest' if sort == 0 else 'top')
other_sort_url = (
util.URL_ORIGIN + '/comments?ctoken='
+ make_comment_ctoken(video_id, sort=1 - sort, lc=lc)
)
other_sort_text = 'Sort by ' + ('newest' if sort == 0 else 'top')
this_sort_url = (util.URL_ORIGIN
+ '/comments?ctoken='
+ make_comment_ctoken(video_id, sort=sort, lc=lc))
this_sort_url = (util.URL_ORIGIN
+ '/comments?ctoken='
+ make_comment_ctoken(video_id, sort=sort, lc=lc))
comments_info['comment_links'] = [
(other_sort_text, other_sort_url),
('Direct link', this_sort_url)
]
comments_info['comment_links'] = [
(other_sort_text, other_sort_url),
('Direct link', this_sort_url)
]
ctoken = make_comment_ctoken(video_id, sort, offset, lc)
comments_info.update(yt_data_extract.extract_comments_info(
request_comments(ctoken), ctoken=ctoken
))
post_process_comments_info(comments_info)
ctoken = make_comment_ctoken(video_id, sort, offset, lc)
comments_info.update(yt_data_extract.extract_comments_info(
request_comments(ctoken), ctoken=ctoken
))
post_process_comments_info(comments_info)
return comments_info
else:
return {}
return comments_info
except util.FetchError as e:
if e.code == '429' and settings.route_tor:
comments_info['error'] = 'Error: YouTube blocked the request because the Tor exit node is overutilized.'

View File

@@ -1 +1,3 @@
from .get_app_version import *
from .get_app_version import app_version
__all__ = ['app_version']

View File

@@ -1,47 +1,56 @@
from __future__ import unicode_literals
from subprocess import (
call,
STDOUT
)
from ..version import __version__
import os
import shutil
import subprocess
from ..version import __version__
def app_version():
def minimal_env_cmd(cmd):
# make minimal environment
env = {k: os.environ[k] for k in ['SYSTEMROOT', 'PATH'] if k in os.environ}
env.update({'LANGUAGE': 'C', 'LANG': 'C', 'LC_ALL': 'C'})
out = subprocess.Popen(cmd, stdout=subprocess.PIPE, env=env).communicate()[0]
return out
subst_list = {
"version": __version__,
"branch": None,
"commit": None
'version': __version__,
'branch': None,
'commit': None,
}
if os.system("command -v git > /dev/null 2>&1") != 0:
# Use shutil.which instead of `command -v`/os.system so we don't spawn a
# shell (CWE-78 hardening) and so it works cross-platform.
if shutil.which('git') is None:
return subst_list
if call(["git", "branch"], stderr=STDOUT, stdout=open(os.devnull, 'w')) != 0:
try:
# Check we are inside a git work tree. Using DEVNULL avoids the
# file-handle leak from `open(os.devnull, 'w')`.
rc = subprocess.call(
['git', 'branch'],
stderr=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
)
except OSError:
return subst_list
if rc != 0:
return subst_list
describe = minimal_env_cmd(["git", "describe", "--tags", "--always"])
describe = minimal_env_cmd(['git', 'describe', '--tags', '--always'])
git_revision = describe.strip().decode('ascii')
branch = minimal_env_cmd(["git", "branch"])
branch = minimal_env_cmd(['git', 'branch'])
git_branch = branch.strip().decode('ascii').replace('* ', '')
subst_list.update({
"branch": git_branch,
"commit": git_revision
'branch': git_branch,
'commit': git_revision,
})
return subst_list
if __name__ == "__main__":
if __name__ == '__main__':
app_version()

View File

@@ -1,28 +1,42 @@
from youtube import util, yt_data_extract
from youtube import util
from youtube import yt_app
import settings
import os
import json
import html
import gevent
import urllib
import math
import glob
import re
import flask
from flask import request
playlists_directory = os.path.join(settings.data_dir, "playlists")
thumbnails_directory = os.path.join(settings.data_dir, "playlist_thumbnails")
playlists_directory = os.path.join(settings.data_dir, 'playlists')
thumbnails_directory = os.path.join(settings.data_dir, 'playlist_thumbnails')
# Whitelist accepted playlist names so user input cannot escape
# `playlists_directory` / `thumbnails_directory` (CWE-22, OWASP A01:2021).
# Allow letters, digits, spaces, dot, dash and underscore.
_PLAYLIST_NAME_RE = re.compile(r'^[\w .\-]{1,128}$')
def _validate_playlist_name(name):
'''Return the stripped name if safe, otherwise abort with 400.'''
if name is None:
flask.abort(400)
name = name.strip()
if not _PLAYLIST_NAME_RE.match(name):
flask.abort(400)
return name
def _find_playlist_path(name):
"""Find playlist file robustly, handling trailing spaces in filenames"""
name = name.strip()
pattern = os.path.join(playlists_directory, name + "*.txt")
'''Find playlist file robustly, handling trailing spaces in filenames'''
name = _validate_playlist_name(name)
pattern = os.path.join(playlists_directory, name + '*.txt')
files = glob.glob(pattern)
return files[0] if files else os.path.join(playlists_directory, name + ".txt")
return files[0] if files else os.path.join(playlists_directory, name + '.txt')
def _parse_playlist_lines(data):
@@ -179,8 +193,9 @@ def path_edit_playlist(playlist_name):
redirect_page_number = min(int(request.values.get('page', 1)), math.ceil(number_of_videos_remaining/50))
return flask.redirect(util.URL_ORIGIN + request.path + '?page=' + str(redirect_page_number))
elif request.values['action'] == 'remove_playlist':
safe_name = _validate_playlist_name(playlist_name)
try:
os.remove(os.path.join(playlists_directory, playlist_name + ".txt"))
os.remove(os.path.join(playlists_directory, safe_name + '.txt'))
except OSError:
pass
return flask.redirect(util.URL_ORIGIN + '/playlists')
@@ -220,8 +235,17 @@ def edit_playlist():
flask.abort(400)
_THUMBNAIL_RE = re.compile(r'^[A-Za-z0-9_-]{11}\.jpg$')
@yt_app.route('/data/playlist_thumbnails/<playlist_name>/<thumbnail>')
def serve_thumbnail(playlist_name, thumbnail):
# .. is necessary because flask always uses the application directory at ./youtube, not the working directory
# Validate both path components so a crafted URL cannot escape
# `thumbnails_directory` via `..` or NUL tricks (CWE-22).
safe_name = _validate_playlist_name(playlist_name)
if not _THUMBNAIL_RE.match(thumbnail):
flask.abort(400)
# .. is necessary because flask always uses the application directory at
# ./youtube, not the working directory.
return flask.send_from_directory(
os.path.join('..', thumbnails_directory, playlist_name), thumbnail)
os.path.join('..', thumbnails_directory, safe_name), thumbnail)

View File

@@ -3,9 +3,7 @@ from youtube import yt_app
import settings
import base64
import urllib
import json
import string
import gevent
import math
from flask import request, abort

View File

@@ -5,7 +5,6 @@ import settings
import json
import urllib
import base64
import mimetypes
from flask import request
import flask
import os

View File

@@ -292,7 +292,10 @@ def youtube_timestamp_to_posix(dumb_timestamp):
def posix_to_dumbed_down(posix_time):
'''Inverse of youtube_timestamp_to_posix.'''
delta = int(time.time() - posix_time)
assert delta >= 0
# Guard against future timestamps (clock drift) without relying on
# `assert` (which is stripped under `python -O`).
if delta < 0:
delta = 0
if delta == 0:
return '0 seconds ago'
@@ -531,7 +534,8 @@ def _get_upstream_videos(channel_id):
return None
root = defusedxml.ElementTree.fromstring(feed)
assert remove_bullshit(root.tag) == 'feed'
if remove_bullshit(root.tag) != 'feed':
raise ValueError('Root element is not <feed>')
for entry in root:
if (remove_bullshit(entry.tag) != 'entry'):
continue
@@ -539,13 +543,13 @@ def _get_upstream_videos(channel_id):
# it's yt:videoId in the xml but the yt: is turned into a namespace which is removed by remove_bullshit
video_id_element = find_element(entry, 'videoId')
time_published_element = find_element(entry, 'published')
assert video_id_element is not None
assert time_published_element is not None
if video_id_element is None or time_published_element is None:
raise ValueError('Missing videoId or published element')
time_published = int(calendar.timegm(time.strptime(time_published_element.text, '%Y-%m-%dT%H:%M:%S+00:00')))
times_published[video_id_element.text] = time_published
except AssertionError:
except ValueError:
print('Failed to read atoma feed for ' + channel_status_name)
traceback.print_exc()
except defusedxml.ElementTree.ParseError:
@@ -593,7 +597,10 @@ def _get_upstream_videos(channel_id):
# Special case: none of the videos have a time published.
# In this case, make something up
if videos and videos[0]['time_published'] is None:
assert all(v['time_published'] is None for v in videos)
# Invariant: if the first video has no timestamp, earlier passes
# ensure all of them are unset. Don't rely on `assert`.
if not all(v['time_published'] is None for v in videos):
raise RuntimeError('Inconsistent time_published state')
now = time.time()
for i in range(len(videos)):
# 1 month between videos
@@ -808,7 +815,8 @@ def import_subscriptions():
file = file.read().decode('utf-8')
try:
root = defusedxml.ElementTree.fromstring(file)
assert root.tag == 'opml'
if root.tag != 'opml':
raise ValueError('Root element is not <opml>')
channels = []
for outline_element in root[0][0]:
if (outline_element.tag != 'outline') or ('xmlUrl' not in outline_element.attrib):
@@ -819,7 +827,7 @@ def import_subscriptions():
channel_id = channel_rss_url[channel_rss_url.find('channel_id=')+11:].strip()
channels.append((channel_id, channel_name))
except (AssertionError, IndexError, defusedxml.ElementTree.ParseError) as e:
except (ValueError, IndexError, defusedxml.ElementTree.ParseError):
return '400 Bad Request: Unable to read opml xml file, or the file is not the expected format', 400
elif mime_type in ('text/csv', 'application/vnd.ms-excel'):
content = file.read().decode('utf-8')
@@ -1071,11 +1079,20 @@ def post_subscriptions_page():
return '', 204
# YouTube video IDs are exactly 11 chars from [A-Za-z0-9_-]. Enforce this
# before using the value in filesystem paths to prevent path traversal
# (CWE-22, OWASP A01:2021).
_VIDEO_ID_RE = re.compile(r'^[A-Za-z0-9_-]{11}$')
@yt_app.route('/data/subscription_thumbnails/<thumbnail>')
def serve_subscription_thumbnail(thumbnail):
'''Serves thumbnail from disk if it's been saved already. If not, downloads the thumbnail, saves to disk, and serves it.'''
assert thumbnail[-4:] == '.jpg'
if not thumbnail.endswith('.jpg'):
flask.abort(400)
video_id = thumbnail[0:-4]
if not _VIDEO_ID_RE.match(video_id):
flask.abort(400)
thumbnail_path = os.path.join(thumbnails_directory, thumbnail)
if video_id in existing_thumbnails:

View File

@@ -1,5 +1,6 @@
from datetime import datetime
import logging
import random
import settings
import socks
import sockshandler
@@ -19,11 +20,11 @@ import gevent.queue
import gevent.lock
import collections
import stem
logger = logging.getLogger(__name__)
import stem.control
import traceback
logger = logging.getLogger(__name__)
# The trouble with the requests library: It ships its own certificate bundle via certifi
# instead of using the system certificate store, meaning self-signed certificates
# configured by the user will not work. Some draconian networks block TLS unless a corporate
@@ -54,8 +55,8 @@ import traceback
# https://github.com/kennethreitz/requests/issues/2966
# Until then, I will use a mix of urllib3 and urllib.
import urllib3
import urllib3.contrib.socks
import urllib3 # noqa: E402 (imported here intentionally after the long note above)
import urllib3.contrib.socks # noqa: E402
URL_ORIGIN = "/https://www.youtube.com"
@@ -177,7 +178,6 @@ def get_pool(use_tor):
class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler):
'''Separate cookiejars for receiving and sending'''
def __init__(self, cookiejar_send=None, cookiejar_receive=None):
import http.cookiejar
self.cookiejar_send = cookiejar_send
self.cookiejar_receive = cookiejar_receive
@@ -208,6 +208,16 @@ class FetchError(Exception):
self.error_message = error_message
def _noop_cleanup(response):
'''No-op cleanup used when the urllib opener owns the response.'''
return None
def _release_conn_cleanup(response):
'''Release the urllib3 pooled connection back to the pool.'''
response.release_conn()
def decode_content(content, encoding_header):
encodings = encoding_header.replace(' ', '').split(',')
for encoding in reversed(encodings):
@@ -263,7 +273,7 @@ def fetch_url_response(url, headers=(), timeout=15, data=None,
opener = urllib.request.build_opener(cookie_processor)
response = opener.open(req, timeout=timeout)
cleanup_func = (lambda r: None)
cleanup_func = _noop_cleanup
else: # Use a urllib3 pool. Cookies can't be used since urllib3 doesn't have easy support for them.
# default: Retry.DEFAULT = Retry(3)
@@ -297,7 +307,7 @@ def fetch_url_response(url, headers=(), timeout=15, data=None,
error_message=msg)
else:
raise
cleanup_func = (lambda r: r.release_conn())
cleanup_func = _release_conn_cleanup
return response, cleanup_func
@@ -315,8 +325,6 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None,
Max retries: 5 attempts with exponential backoff
"""
import random
max_retries = 5
base_delay = 1.0 # Base delay in seconds
@@ -401,7 +409,7 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None,
logger.error(f'Server error {response.status} after {max_retries} retries')
raise FetchError(str(response.status), reason=response.reason, ip=None)
# Exponential backoff for server errors
# Exponential backoff for server errors. Non-crypto jitter.
delay = (base_delay * (2 ** attempt)) + random.uniform(0, 1)
logger.warning(f'Server error ({response.status}). Waiting {delay:.1f}s before retry {attempt + 1}/{max_retries}...')
time.sleep(delay)
@@ -432,7 +440,7 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None,
else:
raise
# Wait and retry
# Wait and retry. Non-crypto jitter.
delay = (base_delay * (2 ** attempt)) + random.uniform(0, 1)
logger.warning(f'Connection error. Waiting {delay:.1f}s before retry {attempt + 1}/{max_retries}...')
time.sleep(delay)
@@ -532,30 +540,30 @@ class RateLimitedQueue(gevent.queue.Queue):
def download_thumbnail(save_directory, video_id):
save_location = os.path.join(save_directory, video_id + ".jpg")
save_location = os.path.join(save_directory, video_id + '.jpg')
for quality in ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'):
url = f"https://i.ytimg.com/vi/{video_id}/{quality}"
url = f'https://i.ytimg.com/vi/{video_id}/{quality}'
try:
thumbnail = fetch_url(url, report_text="Saved thumbnail: " + video_id)
thumbnail = fetch_url(url, report_text='Saved thumbnail: ' + video_id)
except FetchError as e:
if '404' in str(e):
continue
print("Failed to download thumbnail for " + video_id + ": " + str(e))
print('Failed to download thumbnail for ' + video_id + ': ' + str(e))
return False
except urllib.error.HTTPError as e:
if e.code == 404:
continue
print("Failed to download thumbnail for " + video_id + ": " + str(e))
print('Failed to download thumbnail for ' + video_id + ': ' + str(e))
return False
try:
f = open(save_location, 'wb')
with open(save_location, 'wb') as f:
f.write(thumbnail)
except FileNotFoundError:
os.makedirs(save_directory, exist_ok=True)
f = open(save_location, 'wb')
f.write(thumbnail)
f.close()
with open(save_location, 'wb') as f:
f.write(thumbnail)
return True
print("No thumbnail available for " + video_id)
print('No thumbnail available for ' + video_id)
return False

View File

@@ -1,27 +1,26 @@
import json
import logging
import math
import os
import re
import traceback
import urllib
from math import ceil
from types import SimpleNamespace
from urllib.parse import parse_qs, urlencode
import flask
import gevent
import urllib3.exceptions
from flask import request
import youtube
from youtube import yt_app
from youtube import util, comments, local_playlist, yt_data_extract
from youtube.util import time_utc_isoformat
import settings
from flask import request
import flask
import logging
logger = logging.getLogger(__name__)
import json
import gevent
import os
import math
import traceback
import urllib
import re
import urllib3.exceptions
from urllib.parse import parse_qs, urlencode
from types import SimpleNamespace
from math import ceil
try:
with open(os.path.join(settings.data_dir, 'decrypt_function_cache.json'), 'r') as f:
@@ -62,7 +61,9 @@ def get_video_sources(info, target_resolution):
continue
if not (fmt['init_range'] and fmt['index_range']):
# Allow HLS-backed audio tracks (served locally, no init/index needed)
if not fmt.get('url', '').startswith('http://127.') and not '/ytl-api/' in fmt.get('url', ''):
url_value = fmt.get('url', '')
if (not url_value.startswith('http://127.')
and '/ytl-api/' not in url_value):
continue
# Mark as HLS for frontend
fmt['is_hls'] = True
@@ -222,7 +223,7 @@ def lang_in(lang, sequence):
if lang is None:
return False
lang = lang[0:2]
return lang in (l[0:2] for l in sequence)
return lang in (item[0:2] for item in sequence)
def lang_eq(lang1, lang2):
@@ -238,9 +239,9 @@ def equiv_lang_in(lang, sequence):
e.g. if lang is en, extracts en-GB from sequence.
Necessary because if only a specific variant like en-GB is available, can't ask YouTube for simply en. Need to get the available variant.'''
lang = lang[0:2]
for l in sequence:
if l[0:2] == lang:
return l
for item in sequence:
if item[0:2] == lang:
return item
return None
@@ -310,7 +311,15 @@ def get_subtitle_sources(info):
sources[-1]['on'] = True
if len(sources) == 0:
assert len(info['automatic_caption_languages']) == 0 and len(info['manual_caption_languages']) == 0
# Invariant: with no caption sources there should be no languages
# either. Don't rely on `assert` which is stripped under `python -O`.
if (len(info['automatic_caption_languages']) != 0
or len(info['manual_caption_languages']) != 0):
logger.warning(
'Unexpected state: no subtitle sources but %d auto / %d manual languages',
len(info['automatic_caption_languages']),
len(info['manual_caption_languages']),
)
return sources
@@ -669,7 +678,6 @@ def format_bytes(bytes):
@yt_app.route('/ytl-api/audio-track-proxy')
def audio_track_proxy():
"""Proxy for DASH audio tracks to avoid throttling."""
cache_key = request.args.get('id', '')
audio_url = request.args.get('url', '')
if not audio_url:
@@ -692,7 +700,7 @@ def audio_track_proxy():
@yt_app.route('/ytl-api/audio-track')
def get_audio_track():
"""Proxy HLS audio/video: playlist or individual segment."""
from youtube.hls_cache import get_hls_url, _tracks
from youtube.hls_cache import get_hls_url
cache_key = request.args.get('id', '')
seg_url = request.args.get('seg', '')
@@ -916,7 +924,7 @@ def get_hls_manifest():
flask.abort(404, 'HLS manifest not found')
try:
print(f'[hls-manifest] Fetching HLS manifest...')
print('[hls-manifest] Fetching HLS manifest...')
manifest = util.fetch_url(hls_url,
headers=(('User-Agent', 'Mozilla/5.0'),),
debug_name='hls_manifest').decode('utf-8')
@@ -1018,7 +1026,8 @@ def get_storyboard_vtt():
for i, board in enumerate(boards):
*t, _, sigh = board.split("#")
width, height, count, width_cnt, height_cnt, interval = map(int, t)
if height != wanted_height: continue
if height != wanted_height:
continue
q['sigh'] = [sigh]
url = f"{base_url}?{urlencode(q, doseq=True)}"
storyboard = SimpleNamespace(
@@ -1182,7 +1191,6 @@ def get_watch_page(video_id=None):
uni_sources = video_sources['uni_sources']
pair_sources = video_sources['pair_sources']
pair_idx = video_sources['pair_idx']
audio_track_sources = video_sources['audio_track_sources']
# Build audio tracks list from HLS
audio_tracks = []