Compare commits
1 Commits
master
...
security/h
| Author | SHA1 | Date | |
|---|---|---|---|
|
50ad959a80
|
@@ -33,7 +33,7 @@ def check_subp(x):
|
|||||||
raise Exception('Got nonzero exit code from command')
|
raise Exception('Got nonzero exit code from command')
|
||||||
|
|
||||||
def log(line):
|
def log(line):
|
||||||
print('[generate_release.py] ' + line)
|
print(f'[generate_release.py] {line}')
|
||||||
|
|
||||||
# https://stackoverflow.com/questions/7833715/python-deleting-certain-file-extensions
|
# https://stackoverflow.com/questions/7833715/python-deleting-certain-file-extensions
|
||||||
def remove_files_with_extensions(path, extensions):
|
def remove_files_with_extensions(path, extensions):
|
||||||
@@ -43,23 +43,23 @@ def remove_files_with_extensions(path, extensions):
|
|||||||
os.remove(os.path.join(root, file))
|
os.remove(os.path.join(root, file))
|
||||||
|
|
||||||
def download_if_not_exists(file_name, url, sha256=None):
|
def download_if_not_exists(file_name, url, sha256=None):
|
||||||
if not os.path.exists('./' + file_name):
|
if not os.path.exists(f'./{file_name}'):
|
||||||
# Reject non-https URLs so a mistaken constant cannot cause a
|
# Reject non-https URLs so a mistaken constant cannot cause a
|
||||||
# plaintext download (bandit B310 hardening).
|
# plaintext download (bandit B310 hardening).
|
||||||
if not url.startswith('https://'):
|
if not url.startswith('https://'):
|
||||||
raise Exception('Refusing to download over non-https URL: ' + url)
|
raise Exception(f'Refusing to download over non-https URL: {url}')
|
||||||
log('Downloading ' + file_name + '..')
|
log(f'Downloading {file_name}..')
|
||||||
data = urllib.request.urlopen(url).read()
|
data = urllib.request.urlopen(url).read()
|
||||||
log('Finished downloading ' + file_name)
|
log(f'Finished downloading {file_name}')
|
||||||
with open('./' + file_name, 'wb') as f:
|
with open(f'./{file_name}', 'wb') as f:
|
||||||
f.write(data)
|
f.write(data)
|
||||||
if sha256:
|
if sha256:
|
||||||
digest = hashlib.sha256(data).hexdigest()
|
digest = hashlib.sha256(data).hexdigest()
|
||||||
if digest != sha256:
|
if digest != sha256:
|
||||||
log('Error: ' + file_name + ' has wrong hash: ' + digest)
|
log(f'Error: {file_name} has wrong hash: {digest}')
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
else:
|
else:
|
||||||
log('Using existing ' + file_name)
|
log(f'Using existing {file_name}')
|
||||||
|
|
||||||
def wine_run_shell(command):
|
def wine_run_shell(command):
|
||||||
# Keep argv-style invocation (no shell) to avoid command injection.
|
# Keep argv-style invocation (no shell) to avoid command injection.
|
||||||
@@ -120,7 +120,7 @@ if len(os.listdir('./yt-local')) == 0:
|
|||||||
# ----------- Generate embedded python distribution -----------
|
# ----------- Generate embedded python distribution -----------
|
||||||
os.environ['PYTHONDONTWRITEBYTECODE'] = '1' # *.pyc files double the size of the distribution
|
os.environ['PYTHONDONTWRITEBYTECODE'] = '1' # *.pyc files double the size of the distribution
|
||||||
get_pip_url = 'https://bootstrap.pypa.io/get-pip.py'
|
get_pip_url = 'https://bootstrap.pypa.io/get-pip.py'
|
||||||
latest_dist_url = 'https://www.python.org/ftp/python/' + latest_version + '/python-' + latest_version
|
latest_dist_url = f'https://www.python.org/ftp/python/{latest_version}/python-{latest_version}'
|
||||||
if bitness == '32':
|
if bitness == '32':
|
||||||
latest_dist_url += '-embed-win32.zip'
|
latest_dist_url += '-embed-win32.zip'
|
||||||
else:
|
else:
|
||||||
@@ -142,7 +142,7 @@ else:
|
|||||||
|
|
||||||
download_if_not_exists('get-pip.py', get_pip_url)
|
download_if_not_exists('get-pip.py', get_pip_url)
|
||||||
|
|
||||||
python_dist_name = 'python-dist-' + latest_version + '-' + bitness + '.zip'
|
python_dist_name = f'python-dist-{latest_version}-{bitness}.zip'
|
||||||
|
|
||||||
download_if_not_exists(python_dist_name, latest_dist_url)
|
download_if_not_exists(python_dist_name, latest_dist_url)
|
||||||
download_if_not_exists(visual_c_name,
|
download_if_not_exists(visual_c_name,
|
||||||
@@ -203,7 +203,7 @@ and replaced with a .pth. Isolated mode will have to be specified manually.
|
|||||||
|
|
||||||
log('Removing ._pth')
|
log('Removing ._pth')
|
||||||
major_release = latest_version.split('.')[1]
|
major_release = latest_version.split('.')[1]
|
||||||
os.remove(r'./python/python3' + major_release + '._pth')
|
os.remove(rf'./python/python3{major_release}._pth')
|
||||||
|
|
||||||
log('Adding path_fixes.pth')
|
log('Adding path_fixes.pth')
|
||||||
with open(r'./python/path_fixes.pth', 'w', encoding='utf-8') as f:
|
with open(r'./python/path_fixes.pth', 'w', encoding='utf-8') as f:
|
||||||
@@ -214,7 +214,7 @@ with open(r'./python/path_fixes.pth', 'w', encoding='utf-8') as f:
|
|||||||
# Need to add the directory where packages are installed,
|
# Need to add the directory where packages are installed,
|
||||||
# and the parent directory (which is where the yt-local files are)
|
# and the parent directory (which is where the yt-local files are)
|
||||||
major_release = latest_version.split('.')[1]
|
major_release = latest_version.split('.')[1]
|
||||||
with open('./python/python3' + major_release + '._pth', 'a', encoding='utf-8') as f:
|
with open(rf'./python/python3{major_release}._pth', 'a', encoding='utf-8') as f:
|
||||||
f.write('.\\Lib\\site-packages\n')
|
f.write('.\\Lib\\site-packages\n')
|
||||||
f.write('..\n')'''
|
f.write('..\n')'''
|
||||||
|
|
||||||
@@ -255,10 +255,10 @@ log('Copying python distribution into release folder')
|
|||||||
shutil.copytree(r'./python', r'./yt-local/python')
|
shutil.copytree(r'./python', r'./yt-local/python')
|
||||||
|
|
||||||
# ----------- Create release zip -----------
|
# ----------- Create release zip -----------
|
||||||
output_filename = 'yt-local-' + release_tag + '-' + suffix + '.zip'
|
output_filename = f'yt-local-{release_tag}-{suffix}.zip'
|
||||||
if os.path.exists('./' + output_filename):
|
if os.path.exists(f'./{output_filename}'):
|
||||||
log('Removing previous zipped release')
|
log('Removing previous zipped release')
|
||||||
os.remove('./' + output_filename)
|
os.remove(f'./{output_filename}')
|
||||||
log('Zipping release')
|
log('Zipping release')
|
||||||
check_subp(subprocess.run(['7z', '-mx=9', 'a', output_filename, './yt-local']))
|
check_subp(subprocess.run(['7z', '-mx=9', 'a', output_filename, './yt-local']))
|
||||||
|
|
||||||
|
|||||||
19
server.py
19
server.py
@@ -32,9 +32,9 @@ def youtu_be(env, start_response):
|
|||||||
id = env['PATH_INFO'][1:]
|
id = env['PATH_INFO'][1:]
|
||||||
env['PATH_INFO'] = '/watch'
|
env['PATH_INFO'] = '/watch'
|
||||||
if not env['QUERY_STRING']:
|
if not env['QUERY_STRING']:
|
||||||
env['QUERY_STRING'] = 'v=' + id
|
env['QUERY_STRING'] = f'v={id}'
|
||||||
else:
|
else:
|
||||||
env['QUERY_STRING'] += '&v=' + id
|
env['QUERY_STRING'] += f'&v={id}'
|
||||||
yield from yt_app(env, start_response)
|
yield from yt_app(env, start_response)
|
||||||
|
|
||||||
|
|
||||||
@@ -64,12 +64,12 @@ def proxy_site(env, start_response, video=False):
|
|||||||
if 'HTTP_RANGE' in env:
|
if 'HTTP_RANGE' in env:
|
||||||
send_headers['Range'] = env['HTTP_RANGE']
|
send_headers['Range'] = env['HTTP_RANGE']
|
||||||
|
|
||||||
url = "https://" + env['SERVER_NAME'] + env['PATH_INFO']
|
url = f"https://{env['SERVER_NAME']}{env['PATH_INFO']}"
|
||||||
# remove /name portion
|
# remove /name portion
|
||||||
if video and '/videoplayback/name/' in url:
|
if video and '/videoplayback/name/' in url:
|
||||||
url = url[0:url.rfind('/name/')]
|
url = url[0:url.rfind('/name/')]
|
||||||
if env['QUERY_STRING']:
|
if env['QUERY_STRING']:
|
||||||
url += '?' + env['QUERY_STRING']
|
url += f'?{env["QUERY_STRING"]}'
|
||||||
|
|
||||||
try_num = 1
|
try_num = 1
|
||||||
first_attempt = True
|
first_attempt = True
|
||||||
@@ -96,7 +96,7 @@ def proxy_site(env, start_response, video=False):
|
|||||||
+[('Access-Control-Allow-Origin', '*')])
|
+[('Access-Control-Allow-Origin', '*')])
|
||||||
|
|
||||||
if first_attempt:
|
if first_attempt:
|
||||||
start_response(str(response.status) + ' ' + response.reason,
|
start_response(f"{response.status} {response.reason}",
|
||||||
response_headers)
|
response_headers)
|
||||||
|
|
||||||
content_length = int(dict(response_headers).get('Content-Length', 0))
|
content_length = int(dict(response_headers).get('Content-Length', 0))
|
||||||
@@ -136,9 +136,8 @@ def proxy_site(env, start_response, video=False):
|
|||||||
fail_byte = start + total_received
|
fail_byte = start + total_received
|
||||||
send_headers['Range'] = 'bytes=%d-%d' % (fail_byte, end)
|
send_headers['Range'] = 'bytes=%d-%d' % (fail_byte, end)
|
||||||
print(
|
print(
|
||||||
'Warning: YouTube closed the connection before byte',
|
f'Warning: YouTube closed the connection before byte {fail_byte}. '
|
||||||
str(fail_byte) + '.', 'Expected', start+content_length,
|
f'Expected {start+content_length} bytes.'
|
||||||
'bytes.'
|
|
||||||
)
|
)
|
||||||
|
|
||||||
retry = True
|
retry = True
|
||||||
@@ -185,7 +184,7 @@ def split_url(url):
|
|||||||
# python STILL doesn't have a proper regular expression engine like grep uses built in...
|
# python STILL doesn't have a proper regular expression engine like grep uses built in...
|
||||||
match = re.match(r'(?:https?://)?([\w-]+(?:\.[\w-]+)+?)(/.*|$)', url)
|
match = re.match(r'(?:https?://)?([\w-]+(?:\.[\w-]+)+?)(/.*|$)', url)
|
||||||
if match is None:
|
if match is None:
|
||||||
raise ValueError('Invalid or unsupported url: ' + url)
|
raise ValueError(f'Invalid or unsupported url: {url}')
|
||||||
|
|
||||||
return match.group(1), match.group(2)
|
return match.group(1), match.group(2)
|
||||||
|
|
||||||
@@ -238,7 +237,7 @@ def site_dispatch(env, start_response):
|
|||||||
if base_name == '':
|
if base_name == '':
|
||||||
base_name = domain
|
base_name = domain
|
||||||
else:
|
else:
|
||||||
base_name = domain + '.' + base_name
|
base_name = f"{domain}.{base_name}"
|
||||||
|
|
||||||
try:
|
try:
|
||||||
handler = site_handlers[base_name]
|
handler = site_handlers[base_name]
|
||||||
|
|||||||
10
settings.py
10
settings.py
@@ -397,14 +397,14 @@ acceptable_targets = SETTINGS_INFO.keys() | {
|
|||||||
def comment_string(comment):
|
def comment_string(comment):
|
||||||
result = ''
|
result = ''
|
||||||
for line in comment.splitlines():
|
for line in comment.splitlines():
|
||||||
result += '# ' + line + '\n'
|
result += f'# {line}\n'
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
def save_settings(settings_dict):
|
def save_settings(settings_dict):
|
||||||
with open(settings_file_path, 'w', encoding='utf-8') as file:
|
with open(settings_file_path, 'w', encoding='utf-8') as file:
|
||||||
for setting_name, setting_info in SETTINGS_INFO.items():
|
for setting_name, setting_info in SETTINGS_INFO.items():
|
||||||
file.write(comment_string(setting_info['comment']) + setting_name + ' = ' + repr(settings_dict[setting_name]) + '\n\n')
|
file.write(f"{comment_string(setting_info['comment'])}{setting_name} = {repr(settings_dict[setting_name])}\n\n")
|
||||||
|
|
||||||
|
|
||||||
def add_missing_settings(settings_dict):
|
def add_missing_settings(settings_dict):
|
||||||
@@ -481,7 +481,7 @@ upgrade_functions = {
|
|||||||
|
|
||||||
|
|
||||||
def log_ignored_line(line_number, message):
|
def log_ignored_line(line_number, message):
|
||||||
print('WARNING: Ignoring settings.txt line ' + str(line_number) + ' (' + message + ')')
|
print(f'WARNING: Ignoring settings.txt line {line_number} ({message})')
|
||||||
|
|
||||||
|
|
||||||
if os.path.isfile("settings.txt"):
|
if os.path.isfile("settings.txt"):
|
||||||
@@ -535,7 +535,7 @@ else:
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
if target.id not in acceptable_targets:
|
if target.id not in acceptable_targets:
|
||||||
log_ignored_line(node.lineno, target.id + " is not a valid setting")
|
log_ignored_line(node.lineno, f"{target.id} is not a valid setting")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if type(node.value) not in attributes:
|
if type(node.value) not in attributes:
|
||||||
@@ -645,6 +645,6 @@ def settings_page():
|
|||||||
for func, old_value, value in to_call:
|
for func, old_value, value in to_call:
|
||||||
func(old_value, value)
|
func(old_value, value)
|
||||||
|
|
||||||
return flask.redirect(util.URL_ORIGIN + '/settings', 303)
|
return flask.redirect(f'{util.URL_ORIGIN}/settings', 303)
|
||||||
else:
|
else:
|
||||||
flask.abort(400)
|
flask.abort(400)
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ class TestChannelCtokenV5:
|
|||||||
|
|
||||||
def _decode_outer(self, ctoken):
|
def _decode_outer(self, ctoken):
|
||||||
"""Decode the outer protobuf layer of a ctoken."""
|
"""Decode the outer protobuf layer of a ctoken."""
|
||||||
raw = base64.urlsafe_b64decode(ctoken + '==')
|
raw = base64.urlsafe_b64decode(f'{ctoken}==')
|
||||||
return {fn: val for _, fn, val in proto.read_protobuf(raw)}
|
return {fn: val for _, fn, val in proto.read_protobuf(raw)}
|
||||||
|
|
||||||
def test_shorts_token_generates_without_error(self):
|
def test_shorts_token_generates_without_error(self):
|
||||||
@@ -68,8 +68,8 @@ class TestChannelCtokenV5:
|
|||||||
assert t_with_shorts != t_without_shorts
|
assert t_with_shorts != t_without_shorts
|
||||||
|
|
||||||
# Decode and verify the filter is present
|
# Decode and verify the filter is present
|
||||||
raw_with_shorts = base64.urlsafe_b64decode(t_with_shorts + '==')
|
raw_with_shorts = base64.urlsafe_b64decode(f'{t_with_shorts}==')
|
||||||
raw_without_shorts = base64.urlsafe_b64decode(t_without_shorts + '==')
|
raw_without_shorts = base64.urlsafe_b64decode(f'{t_without_shorts}==')
|
||||||
|
|
||||||
# Parse the outer protobuf structure
|
# Parse the outer protobuf structure
|
||||||
import youtube.proto as proto
|
import youtube.proto as proto
|
||||||
@@ -95,8 +95,8 @@ class TestChannelCtokenV5:
|
|||||||
decoded_without = urllib.parse.unquote(encoded_inner_without.decode('ascii'))
|
decoded_without = urllib.parse.unquote(encoded_inner_without.decode('ascii'))
|
||||||
|
|
||||||
# Decode the base64 data
|
# Decode the base64 data
|
||||||
decoded_with_bytes = base64.urlsafe_b64decode(decoded_with + '==')
|
decoded_with_bytes = base64.urlsafe_b64decode(f'{decoded_with}==')
|
||||||
decoded_without_bytes = base64.urlsafe_b64decode(decoded_without + '==')
|
decoded_without_bytes = base64.urlsafe_b64decode(f'{decoded_without}==')
|
||||||
|
|
||||||
# Parse the decoded protobuf data
|
# Parse the decoded protobuf data
|
||||||
fields_with = list(proto.read_protobuf(decoded_with_bytes))
|
fields_with = list(proto.read_protobuf(decoded_with_bytes))
|
||||||
|
|||||||
@@ -76,7 +76,7 @@ theme_names = {
|
|||||||
@yt_app.context_processor
|
@yt_app.context_processor
|
||||||
def inject_theme_preference():
|
def inject_theme_preference():
|
||||||
return {
|
return {
|
||||||
'theme_path': '/youtube.com/static/' + theme_names[settings.theme] + '.css',
|
'theme_path': f'/youtube.com/static/{theme_names[settings.theme]}.css',
|
||||||
'settings': settings,
|
'settings': settings,
|
||||||
# Detect version
|
# Detect version
|
||||||
'current_version': app_version()['version'],
|
'current_version': app_version()['version'],
|
||||||
@@ -145,9 +145,9 @@ def error_page(e):
|
|||||||
' exit node is overutilized. Try getting a new exit node by'
|
' exit node is overutilized. Try getting a new exit node by'
|
||||||
' using the New Identity button in the Tor Browser.')
|
' using the New Identity button in the Tor Browser.')
|
||||||
if fetch_err.error_message:
|
if fetch_err.error_message:
|
||||||
error_message += '\n\n' + fetch_err.error_message
|
error_message += f'\n\n{fetch_err.error_message}'
|
||||||
if fetch_err.ip:
|
if fetch_err.ip:
|
||||||
error_message += '\n\nExit node IP address: ' + fetch_err.ip
|
error_message += f'\n\nExit node IP address: {fetch_err.ip}'
|
||||||
return flask.render_template('error.html', error_message=error_message, slim=slim), 502
|
return flask.render_template('error.html', error_message=error_message, slim=slim), 502
|
||||||
|
|
||||||
elif error_code == '429':
|
elif error_code == '429':
|
||||||
@@ -157,7 +157,7 @@ def error_page(e):
|
|||||||
'• Enable Tor routing in Settings for automatic IP rotation\n'
|
'• Enable Tor routing in Settings for automatic IP rotation\n'
|
||||||
'• Use a VPN to change your IP address')
|
'• Use a VPN to change your IP address')
|
||||||
if fetch_err.ip:
|
if fetch_err.ip:
|
||||||
error_message += '\n\nYour IP: ' + fetch_err.ip
|
error_message += f'\n\nYour IP: {fetch_err.ip}'
|
||||||
return flask.render_template('error.html', error_message=error_message, slim=slim), 429
|
return flask.render_template('error.html', error_message=error_message, slim=slim), 429
|
||||||
|
|
||||||
elif error_code == '502' and ('Failed to resolve' in str(fetch_err) or 'Failed to establish' in str(fetch_err)):
|
elif error_code == '502' and ('Failed to resolve' in str(fetch_err) or 'Failed to establish' in str(fetch_err)):
|
||||||
@@ -179,7 +179,7 @@ def error_page(e):
|
|||||||
# Catch-all for any other FetchError (400, etc.)
|
# Catch-all for any other FetchError (400, etc.)
|
||||||
error_message = f'Error communicating with YouTube ({error_code}).'
|
error_message = f'Error communicating with YouTube ({error_code}).'
|
||||||
if fetch_err.error_message:
|
if fetch_err.error_message:
|
||||||
error_message += '\n\n' + fetch_err.error_message
|
error_message += f'\n\n{fetch_err.error_message}'
|
||||||
return flask.render_template('error.html', error_message=error_message, slim=slim), 502
|
return flask.render_template('error.html', error_message=error_message, slim=slim), 502
|
||||||
|
|
||||||
return flask.render_template('error.html', traceback=traceback.format_exc(),
|
return flask.render_template('error.html', traceback=traceback.format_exc(),
|
||||||
|
|||||||
@@ -253,7 +253,7 @@ def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1,
|
|||||||
# For now it seems to be constant for the API endpoint, not dependent
|
# For now it seems to be constant for the API endpoint, not dependent
|
||||||
# on the browsing session or channel
|
# on the browsing session or channel
|
||||||
key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
|
key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
|
||||||
url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
|
url = f'https://www.youtube.com/youtubei/v1/browse?key={key}'
|
||||||
|
|
||||||
data = {
|
data = {
|
||||||
'context': {
|
'context': {
|
||||||
@@ -285,8 +285,8 @@ def get_number_of_videos_channel(channel_id):
|
|||||||
return 1000
|
return 1000
|
||||||
|
|
||||||
# Uploads playlist
|
# Uploads playlist
|
||||||
playlist_id = 'UU' + channel_id[2:]
|
playlist_id = f'UU{channel_id[2:]}'
|
||||||
url = 'https://m.youtube.com/playlist?list=' + playlist_id + '&pbj=1'
|
url = f'https://m.youtube.com/playlist?list={playlist_id}&pbj=1'
|
||||||
|
|
||||||
try:
|
try:
|
||||||
response = util.fetch_url(url, headers_mobile,
|
response = util.fetch_url(url, headers_mobile,
|
||||||
@@ -328,7 +328,7 @@ def get_channel_id(base_url):
|
|||||||
# method that gives the smallest possible response at ~4 kb
|
# method that gives the smallest possible response at ~4 kb
|
||||||
# needs to be as fast as possible
|
# needs to be as fast as possible
|
||||||
base_url = base_url.replace('https://www', 'https://m') # avoid redirect
|
base_url = base_url.replace('https://www', 'https://m') # avoid redirect
|
||||||
response = util.fetch_url(base_url + '/about?pbj=1', headers_mobile,
|
response = util.fetch_url(f'{base_url}/about?pbj=1', headers_mobile,
|
||||||
debug_name='get_channel_id', report_text='Got channel id').decode('utf-8')
|
debug_name='get_channel_id', report_text='Got channel id').decode('utf-8')
|
||||||
match = channel_id_re.search(response)
|
match = channel_id_re.search(response)
|
||||||
if match:
|
if match:
|
||||||
@@ -372,7 +372,7 @@ def get_channel_search_json(channel_id, query, page):
|
|||||||
ctoken = base64.urlsafe_b64encode(proto.nested(80226972, ctoken)).decode('ascii')
|
ctoken = base64.urlsafe_b64encode(proto.nested(80226972, ctoken)).decode('ascii')
|
||||||
|
|
||||||
key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
|
key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
|
||||||
url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
|
url = f'https://www.youtube.com/youtubei/v1/browse?key={key}'
|
||||||
|
|
||||||
data = {
|
data = {
|
||||||
'context': {
|
'context': {
|
||||||
@@ -414,18 +414,18 @@ def post_process_channel_info(info):
|
|||||||
|
|
||||||
def get_channel_first_page(base_url=None, tab='videos', channel_id=None, sort=None):
|
def get_channel_first_page(base_url=None, tab='videos', channel_id=None, sort=None):
|
||||||
if channel_id:
|
if channel_id:
|
||||||
base_url = 'https://www.youtube.com/channel/' + channel_id
|
base_url = f'https://www.youtube.com/channel/{channel_id}'
|
||||||
|
|
||||||
# Build URL with sort parameter
|
# Build URL with sort parameter
|
||||||
# YouTube URL sort params: p=popular, dd=newest, lad=newest no shorts
|
# YouTube URL sort params: p=popular, dd=newest, lad=newest no shorts
|
||||||
# Note: 'da' (oldest) was removed by YouTube in January 2026
|
# Note: 'da' (oldest) was removed by YouTube in January 2026
|
||||||
url = base_url + '/' + tab + '?pbj=1&view=0'
|
url = f'{base_url}/{tab}?pbj=1&view=0'
|
||||||
if sort:
|
if sort:
|
||||||
# Map sort values to YouTube's URL parameter values
|
# Map sort values to YouTube's URL parameter values
|
||||||
sort_map = {'3': 'dd', '4': 'lad'}
|
sort_map = {'3': 'dd', '4': 'lad'}
|
||||||
url += '&sort=' + sort_map.get(sort, 'dd')
|
url += f'&sort={sort_map.get(sort, "dd")}'
|
||||||
|
|
||||||
return util.fetch_url(url, headers_desktop, debug_name='gen_channel_' + tab)
|
return util.fetch_url(url, headers_desktop, debug_name=f'gen_channel_{tab}')
|
||||||
|
|
||||||
|
|
||||||
playlist_sort_codes = {'2': "da", '3': "dd", '4': "lad"}
|
playlist_sort_codes = {'2': "da", '3': "dd", '4': "lad"}
|
||||||
@@ -462,7 +462,7 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
|
|||||||
if page_number == 1:
|
if page_number == 1:
|
||||||
tasks = (
|
tasks = (
|
||||||
gevent.spawn(playlist.playlist_first_page,
|
gevent.spawn(playlist.playlist_first_page,
|
||||||
'UU' + channel_id[2:],
|
f'UU{channel_id[2:]}',
|
||||||
report_text='Retrieved channel videos'),
|
report_text='Retrieved channel videos'),
|
||||||
gevent.spawn(get_metadata, channel_id),
|
gevent.spawn(get_metadata, channel_id),
|
||||||
)
|
)
|
||||||
@@ -477,11 +477,11 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
|
|||||||
set_cached_number_of_videos(channel_id, number_of_videos)
|
set_cached_number_of_videos(channel_id, number_of_videos)
|
||||||
else:
|
else:
|
||||||
tasks = (
|
tasks = (
|
||||||
gevent.spawn(playlist.get_videos, 'UU' + channel_id[2:],
|
gevent.spawn(playlist.get_videos, f'UU{channel_id[2:]}',
|
||||||
page_number, include_shorts=True),
|
page_number, include_shorts=True),
|
||||||
gevent.spawn(get_metadata, channel_id),
|
gevent.spawn(get_metadata, channel_id),
|
||||||
gevent.spawn(get_number_of_videos_channel, channel_id),
|
gevent.spawn(get_number_of_videos_channel, channel_id),
|
||||||
gevent.spawn(playlist.playlist_first_page, 'UU' + channel_id[2:],
|
gevent.spawn(playlist.playlist_first_page, f'UU{channel_id[2:]}',
|
||||||
report_text='Retrieved channel video count'),
|
report_text='Retrieved channel video count'),
|
||||||
)
|
)
|
||||||
gevent.joinall(tasks)
|
gevent.joinall(tasks)
|
||||||
@@ -567,10 +567,10 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
|
|||||||
elif tab == 'search' and channel_id:
|
elif tab == 'search' and channel_id:
|
||||||
polymer_json = get_channel_search_json(channel_id, query, page_number)
|
polymer_json = get_channel_search_json(channel_id, query, page_number)
|
||||||
elif tab == 'search':
|
elif tab == 'search':
|
||||||
url = base_url + '/search?pbj=1&query=' + urllib.parse.quote(query, safe='')
|
url = f'{base_url}/search?pbj=1&query={urllib.parse.quote(query, safe="")}'
|
||||||
polymer_json = util.fetch_url(url, headers_desktop, debug_name='gen_channel_search')
|
polymer_json = util.fetch_url(url, headers_desktop, debug_name='gen_channel_search')
|
||||||
elif tab != 'videos':
|
elif tab != 'videos':
|
||||||
flask.abort(404, 'Unknown channel tab: ' + tab)
|
flask.abort(404, f'Unknown channel tab: {tab}')
|
||||||
|
|
||||||
if polymer_json is not None and info is None:
|
if polymer_json is not None and info is None:
|
||||||
info = yt_data_extract.extract_channel_info(
|
info = yt_data_extract.extract_channel_info(
|
||||||
@@ -583,7 +583,7 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
|
|||||||
return flask.render_template('error.html', error_message=info['error'])
|
return flask.render_template('error.html', error_message=info['error'])
|
||||||
|
|
||||||
if channel_id:
|
if channel_id:
|
||||||
info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id
|
info['channel_url'] = f'https://www.youtube.com/channel/{channel_id}'
|
||||||
info['channel_id'] = channel_id
|
info['channel_id'] = channel_id
|
||||||
else:
|
else:
|
||||||
channel_id = info['channel_id']
|
channel_id = info['channel_id']
|
||||||
@@ -663,22 +663,22 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
|
|||||||
@yt_app.route('/channel/<channel_id>/')
|
@yt_app.route('/channel/<channel_id>/')
|
||||||
@yt_app.route('/channel/<channel_id>/<tab>')
|
@yt_app.route('/channel/<channel_id>/<tab>')
|
||||||
def get_channel_page(channel_id, tab='videos'):
|
def get_channel_page(channel_id, tab='videos'):
|
||||||
return get_channel_page_general_url('https://www.youtube.com/channel/' + channel_id, tab, request, channel_id)
|
return get_channel_page_general_url(f'https://www.youtube.com/channel/{channel_id}', tab, request, channel_id)
|
||||||
|
|
||||||
|
|
||||||
@yt_app.route('/user/<username>/')
|
@yt_app.route('/user/<username>/')
|
||||||
@yt_app.route('/user/<username>/<tab>')
|
@yt_app.route('/user/<username>/<tab>')
|
||||||
def get_user_page(username, tab='videos'):
|
def get_user_page(username, tab='videos'):
|
||||||
return get_channel_page_general_url('https://www.youtube.com/user/' + username, tab, request)
|
return get_channel_page_general_url(f'https://www.youtube.com/user/{username}', tab, request)
|
||||||
|
|
||||||
|
|
||||||
@yt_app.route('/c/<custom>/')
|
@yt_app.route('/c/<custom>/')
|
||||||
@yt_app.route('/c/<custom>/<tab>')
|
@yt_app.route('/c/<custom>/<tab>')
|
||||||
def get_custom_c_page(custom, tab='videos'):
|
def get_custom_c_page(custom, tab='videos'):
|
||||||
return get_channel_page_general_url('https://www.youtube.com/c/' + custom, tab, request)
|
return get_channel_page_general_url(f'https://www.youtube.com/c/{custom}', tab, request)
|
||||||
|
|
||||||
|
|
||||||
@yt_app.route('/<custom>')
|
@yt_app.route('/<custom>')
|
||||||
@yt_app.route('/<custom>/<tab>')
|
@yt_app.route('/<custom>/<tab>')
|
||||||
def get_toplevel_custom_page(custom, tab='videos'):
|
def get_toplevel_custom_page(custom, tab='videos'):
|
||||||
return get_channel_page_general_url('https://www.youtube.com/' + custom, tab, request)
|
return get_channel_page_general_url(f'https://www.youtube.com/{custom}', tab, request)
|
||||||
|
|||||||
@@ -104,20 +104,19 @@ def post_process_comments_info(comments_info):
|
|||||||
comment['replies_url'] = None
|
comment['replies_url'] = None
|
||||||
comment['replies_url'] = concat_or_none(
|
comment['replies_url'] = concat_or_none(
|
||||||
util.URL_ORIGIN,
|
util.URL_ORIGIN,
|
||||||
'/comments?replies=1&ctoken=' + ctoken)
|
f'/comments?replies=1&ctoken={ctoken}')
|
||||||
|
|
||||||
if reply_count == 0:
|
if reply_count == 0:
|
||||||
comment['view_replies_text'] = 'Reply'
|
comment['view_replies_text'] = 'Reply'
|
||||||
elif reply_count == 1:
|
elif reply_count == 1:
|
||||||
comment['view_replies_text'] = '1 reply'
|
comment['view_replies_text'] = '1 reply'
|
||||||
else:
|
else:
|
||||||
comment['view_replies_text'] = str(reply_count) + ' replies'
|
comment['view_replies_text'] = f'{reply_count} replies'
|
||||||
|
|
||||||
if comment['approx_like_count'] == '1':
|
if comment['approx_like_count'] == '1':
|
||||||
comment['likes_text'] = '1 like'
|
comment['likes_text'] = '1 like'
|
||||||
else:
|
else:
|
||||||
comment['likes_text'] = (str(comment['approx_like_count'])
|
comment['likes_text'] = f"{comment['approx_like_count']} likes"
|
||||||
+ ' likes')
|
|
||||||
|
|
||||||
comments_info['include_avatars'] = settings.enable_comment_avatars
|
comments_info['include_avatars'] = settings.enable_comment_avatars
|
||||||
if comments_info['ctoken']:
|
if comments_info['ctoken']:
|
||||||
@@ -163,14 +162,13 @@ def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''):
|
|||||||
comments_info = {'error': None}
|
comments_info = {'error': None}
|
||||||
try:
|
try:
|
||||||
other_sort_url = (
|
other_sort_url = (
|
||||||
util.URL_ORIGIN + '/comments?ctoken='
|
f"{util.URL_ORIGIN}/comments?ctoken="
|
||||||
+ make_comment_ctoken(video_id, sort=1 - sort, lc=lc)
|
f"{make_comment_ctoken(video_id, sort=1 - sort, lc=lc)}"
|
||||||
)
|
)
|
||||||
other_sort_text = 'Sort by ' + ('newest' if sort == 0 else 'top')
|
other_sort_text = f'Sort by {"newest" if sort == 0 else "top"}'
|
||||||
|
|
||||||
this_sort_url = (util.URL_ORIGIN
|
this_sort_url = (f"{util.URL_ORIGIN}/comments?ctoken="
|
||||||
+ '/comments?ctoken='
|
f"{make_comment_ctoken(video_id, sort=sort, lc=lc)}")
|
||||||
+ make_comment_ctoken(video_id, sort=sort, lc=lc))
|
|
||||||
|
|
||||||
comments_info['comment_links'] = [
|
comments_info['comment_links'] = [
|
||||||
(other_sort_text, other_sort_url),
|
(other_sort_text, other_sort_url),
|
||||||
@@ -188,17 +186,16 @@ def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''):
|
|||||||
if e.code == '429' and settings.route_tor:
|
if e.code == '429' and settings.route_tor:
|
||||||
comments_info['error'] = 'Error: YouTube blocked the request because the Tor exit node is overutilized.'
|
comments_info['error'] = 'Error: YouTube blocked the request because the Tor exit node is overutilized.'
|
||||||
if e.error_message:
|
if e.error_message:
|
||||||
comments_info['error'] += '\n\n' + e.error_message
|
comments_info['error'] += f'\n\n{e.error_message}'
|
||||||
comments_info['error'] += '\n\nExit node IP address: %s' % e.ip
|
comments_info['error'] += f'\n\nExit node IP address: {e.ip}'
|
||||||
else:
|
else:
|
||||||
comments_info['error'] = 'YouTube blocked the request. Error: %s' % str(e)
|
comments_info['error'] = f'YouTube blocked the request. Error: {e}'
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
comments_info['error'] = 'YouTube blocked the request. Error: %s' % str(e)
|
comments_info['error'] = f'YouTube blocked the request. Error: {e}'
|
||||||
|
|
||||||
if comments_info.get('error'):
|
if comments_info.get('error'):
|
||||||
print('Error retrieving comments for ' + str(video_id) + ':\n' +
|
print(f'Error retrieving comments for {video_id}:\n{comments_info["error"]}')
|
||||||
comments_info['error'])
|
|
||||||
|
|
||||||
return comments_info
|
return comments_info
|
||||||
|
|
||||||
@@ -218,12 +215,10 @@ def get_comments_page():
|
|||||||
other_sort_url = None
|
other_sort_url = None
|
||||||
else:
|
else:
|
||||||
other_sort_url = (
|
other_sort_url = (
|
||||||
util.URL_ORIGIN
|
f'{util.URL_ORIGIN}/comments?ctoken='
|
||||||
+ '/comments?ctoken='
|
f'{make_comment_ctoken(comments_info["video_id"], sort=1-comments_info["sort"])}'
|
||||||
+ make_comment_ctoken(comments_info['video_id'],
|
|
||||||
sort=1-comments_info['sort'])
|
|
||||||
)
|
)
|
||||||
other_sort_text = 'Sort by ' + ('newest' if comments_info['sort'] == 0 else 'top')
|
other_sort_text = f'Sort by {"newest" if comments_info["sort"] == 0 else "top"}'
|
||||||
comments_info['comment_links'] = [(other_sort_text, other_sort_url)]
|
comments_info['comment_links'] = [(other_sort_text, other_sort_url)]
|
||||||
|
|
||||||
return flask.render_template(
|
return flask.render_template(
|
||||||
|
|||||||
@@ -92,9 +92,7 @@ def add_extra_info_to_videos(videos, playlist_name):
|
|||||||
util.add_extra_html_info(video)
|
util.add_extra_html_info(video)
|
||||||
if video['id'] + '.jpg' in thumbnails:
|
if video['id'] + '.jpg' in thumbnails:
|
||||||
video['thumbnail'] = (
|
video['thumbnail'] = (
|
||||||
'/https://youtube.com/data/playlist_thumbnails/'
|
f'/https://youtube.com/data/playlist_thumbnails/{playlist_name}/{video["id"]}.jpg')
|
||||||
+ playlist_name
|
|
||||||
+ '/' + video['id'] + '.jpg')
|
|
||||||
else:
|
else:
|
||||||
video['thumbnail'] = util.get_thumbnail_url(video['id'])
|
video['thumbnail'] = util.get_thumbnail_url(video['id'])
|
||||||
missing_thumbnails.append(video['id'])
|
missing_thumbnails.append(video['id'])
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ def playlist_ctoken(playlist_id, offset, include_shorts=True):
|
|||||||
|
|
||||||
continuation_info = proto.string(3, proto.percent_b64encode(offset))
|
continuation_info = proto.string(3, proto.percent_b64encode(offset))
|
||||||
|
|
||||||
playlist_id = proto.string(2, 'VL' + playlist_id)
|
playlist_id = proto.string(2, f'VL{playlist_id}')
|
||||||
pointless_nest = proto.string(80226972, playlist_id + continuation_info)
|
pointless_nest = proto.string(80226972, playlist_id + continuation_info)
|
||||||
|
|
||||||
return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
|
return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
|
||||||
@@ -30,7 +30,7 @@ def playlist_first_page(playlist_id, report_text="Retrieved playlist",
|
|||||||
use_mobile=False):
|
use_mobile=False):
|
||||||
# Use innertube API (pbj=1 no longer works for many playlists)
|
# Use innertube API (pbj=1 no longer works for many playlists)
|
||||||
key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
|
key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
|
||||||
url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
|
url = f'https://www.youtube.com/youtubei/v1/browse?key={key}'
|
||||||
|
|
||||||
data = {
|
data = {
|
||||||
'context': {
|
'context': {
|
||||||
@@ -41,7 +41,7 @@ def playlist_first_page(playlist_id, report_text="Retrieved playlist",
|
|||||||
'clientVersion': '2.20240327.00.00',
|
'clientVersion': '2.20240327.00.00',
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
'browseId': 'VL' + playlist_id,
|
'browseId': f'VL{playlist_id}',
|
||||||
}
|
}
|
||||||
|
|
||||||
content_type_header = (('Content-Type', 'application/json'),)
|
content_type_header = (('Content-Type', 'application/json'),)
|
||||||
@@ -58,7 +58,7 @@ def get_videos(playlist_id, page, include_shorts=True, use_mobile=False,
|
|||||||
page_size = 100
|
page_size = 100
|
||||||
|
|
||||||
key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
|
key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
|
||||||
url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
|
url = f'https://www.youtube.com/youtubei/v1/browse?key={key}'
|
||||||
|
|
||||||
ctoken = playlist_ctoken(playlist_id, (int(page)-1)*page_size,
|
ctoken = playlist_ctoken(playlist_id, (int(page)-1)*page_size,
|
||||||
include_shorts=include_shorts)
|
include_shorts=include_shorts)
|
||||||
@@ -97,7 +97,7 @@ def get_playlist_page():
|
|||||||
if playlist_id.startswith('RD'):
|
if playlist_id.startswith('RD'):
|
||||||
first_video_id = playlist_id[2:] # video ID after 'RD' prefix
|
first_video_id = playlist_id[2:] # video ID after 'RD' prefix
|
||||||
return flask.redirect(
|
return flask.redirect(
|
||||||
util.URL_ORIGIN + '/watch?v=' + first_video_id + '&list=' + playlist_id,
|
f'{util.URL_ORIGIN}/watch?v={first_video_id}&list={playlist_id}',
|
||||||
302
|
302
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -132,9 +132,9 @@ def get_playlist_page():
|
|||||||
if 'id' in item and not item.get('thumbnail'):
|
if 'id' in item and not item.get('thumbnail'):
|
||||||
item['thumbnail'] = f"{settings.img_prefix}https://i.ytimg.com/vi/{item['id']}/hqdefault.jpg"
|
item['thumbnail'] = f"{settings.img_prefix}https://i.ytimg.com/vi/{item['id']}/hqdefault.jpg"
|
||||||
|
|
||||||
item['url'] += '&list=' + playlist_id
|
item['url'] += f'&list={playlist_id}'
|
||||||
if item['index']:
|
if item['index']:
|
||||||
item['url'] += '&index=' + str(item['index'])
|
item['url'] += f'&index={item["index"]}'
|
||||||
|
|
||||||
video_count = yt_data_extract.deep_get(info, 'metadata', 'video_count')
|
video_count = yt_data_extract.deep_get(info, 'metadata', 'video_count')
|
||||||
if video_count is None:
|
if video_count is None:
|
||||||
|
|||||||
@@ -76,7 +76,7 @@ def read_varint(data):
|
|||||||
except IndexError:
|
except IndexError:
|
||||||
if i == 0:
|
if i == 0:
|
||||||
raise EOFError()
|
raise EOFError()
|
||||||
raise Exception('Unterminated varint starting at ' + str(data.tell() - i))
|
raise Exception(f'Unterminated varint starting at {data.tell() - i}')
|
||||||
result |= (byte & 127) << 7*i
|
result |= (byte & 127) << 7*i
|
||||||
if not byte & 128:
|
if not byte & 128:
|
||||||
break
|
break
|
||||||
@@ -118,7 +118,7 @@ def read_protobuf(data):
|
|||||||
elif wire_type == 5:
|
elif wire_type == 5:
|
||||||
value = data.read(4)
|
value = data.read(4)
|
||||||
else:
|
else:
|
||||||
raise Exception("Unknown wire type: " + str(wire_type) + " at position " + str(data.tell()))
|
raise Exception(f"Unknown wire type: {wire_type} at position {data.tell()}")
|
||||||
yield (wire_type, field_number, value)
|
yield (wire_type, field_number, value)
|
||||||
|
|
||||||
|
|
||||||
@@ -170,8 +170,7 @@ def _make_protobuf(data):
|
|||||||
elif field[0] == 2:
|
elif field[0] == 2:
|
||||||
result += string(field[1], _make_protobuf(field[2]))
|
result += string(field[1], _make_protobuf(field[2]))
|
||||||
else:
|
else:
|
||||||
raise NotImplementedError('Wire type ' + str(field[0])
|
raise NotImplementedError(f'Wire type {field[0]} not implemented')
|
||||||
+ ' not implemented')
|
|
||||||
return result
|
return result
|
||||||
return data
|
return data
|
||||||
|
|
||||||
@@ -218,4 +217,4 @@ def b64_to_bytes(data):
|
|||||||
if isinstance(data, bytes):
|
if isinstance(data, bytes):
|
||||||
data = data.decode('ascii')
|
data = data.decode('ascii')
|
||||||
data = data.replace("%3D", "=")
|
data = data.replace("%3D", "=")
|
||||||
return base64.urlsafe_b64decode(data + "="*((4 - len(data) % 4) % 4))
|
return base64.urlsafe_b64decode(f'{data}={"=" * ((4 - len(data) % 4) % 4)}')
|
||||||
|
|||||||
@@ -179,7 +179,7 @@ def read_varint(data):
|
|||||||
except IndexError:
|
except IndexError:
|
||||||
if i == 0:
|
if i == 0:
|
||||||
raise EOFError()
|
raise EOFError()
|
||||||
raise Exception('Unterminated varint starting at ' + str(data.tell() - i))
|
raise Exception(f'Unterminated varint starting at {data.tell() - i}')
|
||||||
result |= (byte & 127) << 7*i
|
result |= (byte & 127) << 7*i
|
||||||
if not byte & 128:
|
if not byte & 128:
|
||||||
break
|
break
|
||||||
@@ -235,8 +235,7 @@ def _make_protobuf(data):
|
|||||||
elif field[0] == 2:
|
elif field[0] == 2:
|
||||||
result += string(field[1], _make_protobuf(field[2]))
|
result += string(field[1], _make_protobuf(field[2]))
|
||||||
else:
|
else:
|
||||||
raise NotImplementedError('Wire type ' + str(field[0])
|
raise NotImplementedError(f'Wire type {field[0]} not implemented')
|
||||||
+ ' not implemented')
|
|
||||||
return result
|
return result
|
||||||
return data
|
return data
|
||||||
|
|
||||||
@@ -286,7 +285,7 @@ def b64_to_bytes(data):
|
|||||||
if isinstance(data, bytes):
|
if isinstance(data, bytes):
|
||||||
data = data.decode('ascii')
|
data = data.decode('ascii')
|
||||||
data = data.replace("%3D", "=")
|
data = data.replace("%3D", "=")
|
||||||
return base64.urlsafe_b64decode(data + "="*((4 - len(data) % 4) % 4))
|
return base64.urlsafe_b64decode(f'{data}={"=" * ((4 - len(data) % 4) % 4)}')
|
||||||
# --------------------------------------------------------------------
|
# --------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
@@ -344,7 +343,7 @@ fromhex = bytes.fromhex
|
|||||||
|
|
||||||
|
|
||||||
def aligned_ascii(data):
|
def aligned_ascii(data):
|
||||||
return ' '.join(' ' + chr(n) if n in range(32, 128) else ' _' for n in data)
|
return ' '.join(f' {chr(n)}' if n in range(32, 128) else ' _' for n in data)
|
||||||
|
|
||||||
|
|
||||||
def parse_protobuf(data, mutable=False, spec=()):
|
def parse_protobuf(data, mutable=False, spec=()):
|
||||||
@@ -372,7 +371,7 @@ def parse_protobuf(data, mutable=False, spec=()):
|
|||||||
elif wire_type == 5:
|
elif wire_type == 5:
|
||||||
value = data.read(4)
|
value = data.read(4)
|
||||||
else:
|
else:
|
||||||
raise Exception("Unknown wire type: " + str(wire_type) + ", Tag: " + bytes_to_hex(varint_encode(tag)) + ", at position " + str(data.tell()))
|
raise Exception(f"Unknown wire type: {wire_type}, Tag: {bytes_to_hex(varint_encode(tag))}, at position {data.tell()}")
|
||||||
if mutable:
|
if mutable:
|
||||||
yield [wire_type, field_number, value]
|
yield [wire_type, field_number, value]
|
||||||
else:
|
else:
|
||||||
@@ -453,7 +452,7 @@ def b32decode(s, casefold=False, map01=None):
|
|||||||
if map01 is not None:
|
if map01 is not None:
|
||||||
map01 = _bytes_from_decode_data(map01)
|
map01 = _bytes_from_decode_data(map01)
|
||||||
assert len(map01) == 1, repr(map01)
|
assert len(map01) == 1, repr(map01)
|
||||||
s = s.translate(bytes.maketrans(b'01', b'O' + map01))
|
s = s.translate(bytes.maketrans(b'01', f'O{map01.decode("ascii")}'))
|
||||||
if casefold:
|
if casefold:
|
||||||
s = s.upper()
|
s = s.upper()
|
||||||
# Strip off pad characters from the right. We need to count the pad
|
# Strip off pad characters from the right. We need to count the pad
|
||||||
@@ -494,7 +493,7 @@ def b32decode(s, casefold=False, map01=None):
|
|||||||
def dec32(data):
|
def dec32(data):
|
||||||
if isinstance(data, bytes):
|
if isinstance(data, bytes):
|
||||||
data = data.decode('ascii')
|
data = data.decode('ascii')
|
||||||
return b32decode(data + "="*((8 - len(data)%8)%8))
|
return b32decode(f'{data}={"=" * ((8 - len(data)%8)%8)}')
|
||||||
|
|
||||||
|
|
||||||
_patterns = [
|
_patterns = [
|
||||||
@@ -563,9 +562,7 @@ def _pp(obj, indent): # not my best work
|
|||||||
if len(obj) == 3: # (wire_type, field_number, data)
|
if len(obj) == 3: # (wire_type, field_number, data)
|
||||||
return obj.__repr__()
|
return obj.__repr__()
|
||||||
else: # (base64, [...])
|
else: # (base64, [...])
|
||||||
return ('(' + obj[0].__repr__() + ',\n'
|
return f"({obj[0].__repr__()},\n{indent_lines(_pp(obj[1], indent), indent)}\n)"
|
||||||
+ indent_lines(_pp(obj[1], indent), indent) + '\n'
|
|
||||||
+ ')')
|
|
||||||
elif isinstance(obj, list):
|
elif isinstance(obj, list):
|
||||||
# [wire_type, field_number, data]
|
# [wire_type, field_number, data]
|
||||||
if (len(obj) == 3
|
if (len(obj) == 3
|
||||||
@@ -577,13 +574,11 @@ def _pp(obj, indent): # not my best work
|
|||||||
elif (len(obj) == 3
|
elif (len(obj) == 3
|
||||||
and not any(isinstance(x, (list, tuple)) for x in obj[0:2])
|
and not any(isinstance(x, (list, tuple)) for x in obj[0:2])
|
||||||
):
|
):
|
||||||
return ('[' + obj[0].__repr__() + ', ' + obj[1].__repr__() + ',\n'
|
return f"[{obj[0].__repr__()}, {obj[1].__repr__()},\n{indent_lines(_pp(obj[2], indent), indent)}\n]"
|
||||||
+ indent_lines(_pp(obj[2], indent), indent) + '\n'
|
|
||||||
+ ']')
|
|
||||||
else:
|
else:
|
||||||
s = '[\n'
|
s = '[\n'
|
||||||
for x in obj:
|
for x in obj:
|
||||||
s += indent_lines(_pp(x, indent), indent) + ',\n'
|
s += f"{indent_lines(_pp(x, indent), indent)},\n"
|
||||||
s += ']'
|
s += ']'
|
||||||
return s
|
return s
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -51,7 +51,7 @@ def get_search_json(query, page, autocorrect, sort, filters):
|
|||||||
'X-YouTube-Client-Name': '1',
|
'X-YouTube-Client-Name': '1',
|
||||||
'X-YouTube-Client-Version': '2.20180418',
|
'X-YouTube-Client-Version': '2.20180418',
|
||||||
}
|
}
|
||||||
url += "&pbj=1&sp=" + page_number_to_sp_parameter(page, autocorrect, sort, filters).replace("=", "%3D")
|
url += f"&pbj=1&sp={page_number_to_sp_parameter(page, autocorrect, sort, filters).replace('=', '%3D')}"
|
||||||
content = util.fetch_url(url, headers=headers, report_text="Got search results", debug_name='search_results')
|
content = util.fetch_url(url, headers=headers, report_text="Got search results", debug_name='search_results')
|
||||||
info = json.loads(content)
|
info = json.loads(content)
|
||||||
return info
|
return info
|
||||||
|
|||||||
@@ -126,7 +126,7 @@ def delete_thumbnails(to_delete):
|
|||||||
os.remove(os.path.join(thumbnails_directory, thumbnail))
|
os.remove(os.path.join(thumbnails_directory, thumbnail))
|
||||||
existing_thumbnails.remove(video_id)
|
existing_thumbnails.remove(video_id)
|
||||||
except Exception:
|
except Exception:
|
||||||
print('Failed to delete thumbnail: ' + thumbnail)
|
print(f'Failed to delete thumbnail: {thumbnail}')
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
|
|
||||||
|
|
||||||
@@ -184,7 +184,7 @@ def _get_videos(cursor, number_per_page, offset, tag=None):
|
|||||||
'time_published': exact_timestamp(db_video[3]) if db_video[4] else posix_to_dumbed_down(db_video[3]),
|
'time_published': exact_timestamp(db_video[3]) if db_video[4] else posix_to_dumbed_down(db_video[3]),
|
||||||
'author': db_video[5],
|
'author': db_video[5],
|
||||||
'author_id': db_video[6],
|
'author_id': db_video[6],
|
||||||
'author_url': '/https://www.youtube.com/channel/' + db_video[6],
|
'author_url': f'/https://www.youtube.com/channel/{db_video[6]}',
|
||||||
})
|
})
|
||||||
|
|
||||||
return videos, pseudo_number_of_videos
|
return videos, pseudo_number_of_videos
|
||||||
@@ -304,9 +304,9 @@ def posix_to_dumbed_down(posix_time):
|
|||||||
if delta >= unit_time:
|
if delta >= unit_time:
|
||||||
quantifier = round(delta/unit_time)
|
quantifier = round(delta/unit_time)
|
||||||
if quantifier == 1:
|
if quantifier == 1:
|
||||||
return '1 ' + unit_name + ' ago'
|
return f'1 {unit_name} ago'
|
||||||
else:
|
else:
|
||||||
return str(quantifier) + ' ' + unit_name + 's ago'
|
return f'{quantifier} {unit_name}s ago'
|
||||||
else:
|
else:
|
||||||
raise Exception()
|
raise Exception()
|
||||||
|
|
||||||
@@ -363,7 +363,7 @@ def autocheck_dispatcher():
|
|||||||
time_until_earliest_job = earliest_job['next_check_time'] - time.time()
|
time_until_earliest_job = earliest_job['next_check_time'] - time.time()
|
||||||
|
|
||||||
if time_until_earliest_job <= -5: # should not happen unless we're running extremely slow
|
if time_until_earliest_job <= -5: # should not happen unless we're running extremely slow
|
||||||
print('ERROR: autocheck_dispatcher got job scheduled in the past, skipping and rescheduling: ' + earliest_job['channel_id'] + ', ' + earliest_job['channel_name'] + ', ' + str(earliest_job['next_check_time']))
|
print(f'ERROR: autocheck_dispatcher got job scheduled in the past, skipping and rescheduling: {earliest_job["channel_id"]}, {earliest_job["channel_name"]}, {earliest_job["next_check_time"]}')
|
||||||
next_check_time = time.time() + 3600*secrets.randbelow(60)/60
|
next_check_time = time.time() + 3600*secrets.randbelow(60)/60
|
||||||
with_open_db(_schedule_checking, earliest_job['channel_id'], next_check_time)
|
with_open_db(_schedule_checking, earliest_job['channel_id'], next_check_time)
|
||||||
autocheck_jobs[earliest_job_index]['next_check_time'] = next_check_time
|
autocheck_jobs[earliest_job_index]['next_check_time'] = next_check_time
|
||||||
@@ -451,7 +451,7 @@ def check_channels_if_necessary(channel_ids):
|
|||||||
|
|
||||||
|
|
||||||
def _get_atoma_feed(channel_id):
|
def _get_atoma_feed(channel_id):
|
||||||
url = 'https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id
|
url = f'https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}'
|
||||||
try:
|
try:
|
||||||
return util.fetch_url(url).decode('utf-8')
|
return util.fetch_url(url).decode('utf-8')
|
||||||
except util.FetchError as e:
|
except util.FetchError as e:
|
||||||
@@ -485,16 +485,15 @@ def _get_channel_videos_first_page(channel_id, channel_status_name):
|
|||||||
return channel_info
|
return channel_info
|
||||||
except util.FetchError as e:
|
except util.FetchError as e:
|
||||||
if e.code == '429' and settings.route_tor:
|
if e.code == '429' and settings.route_tor:
|
||||||
error_message = ('Error checking channel ' + channel_status_name
|
error_message = (f'Error checking channel {channel_status_name}: '
|
||||||
+ ': YouTube blocked the request because the'
|
f'YouTube blocked the request because the Tor exit node is overutilized. '
|
||||||
+ ' Tor exit node is overutilized. Try getting a new exit node'
|
f'Try getting a new exit node by using the New Identity button in the Tor Browser.')
|
||||||
+ ' by using the New Identity button in the Tor Browser.')
|
|
||||||
if e.ip:
|
if e.ip:
|
||||||
error_message += ' Exit node IP address: ' + e.ip
|
error_message += f' Exit node IP address: {e.ip}'
|
||||||
print(error_message)
|
print(error_message)
|
||||||
return None
|
return None
|
||||||
elif e.code == '502':
|
elif e.code == '502':
|
||||||
print('Error checking channel', channel_status_name + ':', str(e))
|
print(f'Error checking channel {channel_status_name}: {e}')
|
||||||
return None
|
return None
|
||||||
raise
|
raise
|
||||||
|
|
||||||
@@ -505,7 +504,7 @@ def _get_upstream_videos(channel_id):
|
|||||||
except KeyError:
|
except KeyError:
|
||||||
channel_status_name = channel_id
|
channel_status_name = channel_id
|
||||||
|
|
||||||
print("Checking channel: " + channel_status_name)
|
print(f"Checking channel: {channel_status_name}")
|
||||||
|
|
||||||
tasks = (
|
tasks = (
|
||||||
# channel page, need for video duration
|
# channel page, need for video duration
|
||||||
@@ -550,15 +549,15 @@ def _get_upstream_videos(channel_id):
|
|||||||
times_published[video_id_element.text] = time_published
|
times_published[video_id_element.text] = time_published
|
||||||
|
|
||||||
except ValueError:
|
except ValueError:
|
||||||
print('Failed to read atoma feed for ' + channel_status_name)
|
print(f'Failed to read atoma feed for {channel_status_name}')
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
except defusedxml.ElementTree.ParseError:
|
except defusedxml.ElementTree.ParseError:
|
||||||
print('Failed to read atoma feed for ' + channel_status_name)
|
print(f'Failed to read atoma feed for {channel_status_name}')
|
||||||
|
|
||||||
if channel_info is None: # there was an error
|
if channel_info is None: # there was an error
|
||||||
return
|
return
|
||||||
if channel_info['error']:
|
if channel_info['error']:
|
||||||
print('Error checking channel ' + channel_status_name + ': ' + channel_info['error'])
|
print(f'Error checking channel {channel_status_name}: {channel_info["error"]}')
|
||||||
return
|
return
|
||||||
|
|
||||||
videos = channel_info['items']
|
videos = channel_info['items']
|
||||||
@@ -1023,7 +1022,7 @@ def get_subscriptions_page():
|
|||||||
tag = request.args.get('tag', None)
|
tag = request.args.get('tag', None)
|
||||||
videos, number_of_videos_in_db = _get_videos(cursor, 60, (page - 1)*60, tag)
|
videos, number_of_videos_in_db = _get_videos(cursor, 60, (page - 1)*60, tag)
|
||||||
for video in videos:
|
for video in videos:
|
||||||
video['thumbnail'] = util.URL_ORIGIN + '/data/subscription_thumbnails/' + video['id'] + '.jpg'
|
video['thumbnail'] = f'{util.URL_ORIGIN}/data/subscription_thumbnails/{video["id"]}.jpg'
|
||||||
video['type'] = 'video'
|
video['type'] = 'video'
|
||||||
video['item_size'] = 'small'
|
video['item_size'] = 'small'
|
||||||
util.add_extra_html_info(video)
|
util.add_extra_html_info(video)
|
||||||
@@ -1033,7 +1032,7 @@ def get_subscriptions_page():
|
|||||||
subscription_list = []
|
subscription_list = []
|
||||||
for channel_name, channel_id, muted in _get_subscribed_channels(cursor):
|
for channel_name, channel_id, muted in _get_subscribed_channels(cursor):
|
||||||
subscription_list.append({
|
subscription_list.append({
|
||||||
'channel_url': util.URL_ORIGIN + '/channel/' + channel_id,
|
'channel_url': f'{util.URL_ORIGIN}/channel/{channel_id}',
|
||||||
'channel_name': channel_name,
|
'channel_name': channel_name,
|
||||||
'channel_id': channel_id,
|
'channel_id': channel_id,
|
||||||
'muted': muted,
|
'muted': muted,
|
||||||
@@ -1109,17 +1108,17 @@ def serve_subscription_thumbnail(thumbnail):
|
|||||||
for quality in ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'):
|
for quality in ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'):
|
||||||
url = f"https://i.ytimg.com/vi/{video_id}/{quality}"
|
url = f"https://i.ytimg.com/vi/{video_id}/{quality}"
|
||||||
try:
|
try:
|
||||||
image = util.fetch_url(url, report_text="Saved thumbnail: " + video_id)
|
image = util.fetch_url(url, report_text=f"Saved thumbnail: {video_id}")
|
||||||
break
|
break
|
||||||
except util.FetchError as e:
|
except util.FetchError as e:
|
||||||
if '404' in str(e):
|
if '404' in str(e):
|
||||||
continue
|
continue
|
||||||
print("Failed to download thumbnail for " + video_id + ": " + str(e))
|
print(f"Failed to download thumbnail for {video_id}: {e}")
|
||||||
flask.abort(500)
|
flask.abort(500)
|
||||||
except urllib.error.HTTPError as e:
|
except urllib.error.HTTPError as e:
|
||||||
if e.code == 404:
|
if e.code == 404:
|
||||||
continue
|
continue
|
||||||
print("Failed to download thumbnail for " + video_id + ": " + str(e))
|
print(f"Failed to download thumbnail for {video_id}: {e}")
|
||||||
flask.abort(e.code)
|
flask.abort(e.code)
|
||||||
|
|
||||||
if image is None:
|
if image is None:
|
||||||
|
|||||||
@@ -72,7 +72,7 @@ class TorManager:
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.old_tor_connection_pool = None
|
self.old_tor_connection_pool = None
|
||||||
self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager(
|
self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager(
|
||||||
'socks5h://127.0.0.1:' + str(settings.tor_port) + '/',
|
f'socks5h://127.0.0.1:{settings.tor_port}/',
|
||||||
cert_reqs='CERT_REQUIRED')
|
cert_reqs='CERT_REQUIRED')
|
||||||
self.tor_pool_refresh_time = time.monotonic()
|
self.tor_pool_refresh_time = time.monotonic()
|
||||||
settings.add_setting_changed_hook(
|
settings.add_setting_changed_hook(
|
||||||
@@ -92,7 +92,7 @@ class TorManager:
|
|||||||
self.old_tor_connection_pool = self.tor_connection_pool
|
self.old_tor_connection_pool = self.tor_connection_pool
|
||||||
|
|
||||||
self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager(
|
self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager(
|
||||||
'socks5h://127.0.0.1:' + str(settings.tor_port) + '/',
|
f'socks5h://127.0.0.1:{settings.tor_port}/',
|
||||||
cert_reqs='CERT_REQUIRED')
|
cert_reqs='CERT_REQUIRED')
|
||||||
self.tor_pool_refresh_time = time.monotonic()
|
self.tor_pool_refresh_time = time.monotonic()
|
||||||
|
|
||||||
@@ -198,9 +198,9 @@ class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler):
|
|||||||
class FetchError(Exception):
|
class FetchError(Exception):
|
||||||
def __init__(self, code, reason='', ip=None, error_message=None):
|
def __init__(self, code, reason='', ip=None, error_message=None):
|
||||||
if error_message:
|
if error_message:
|
||||||
string = code + ' ' + reason + ': ' + error_message
|
string = f"{code} {reason}: {error_message}"
|
||||||
else:
|
else:
|
||||||
string = 'HTTP error during request: ' + code + ' ' + reason
|
string = f"HTTP error during request: {code} {reason}"
|
||||||
Exception.__init__(self, string)
|
Exception.__init__(self, string)
|
||||||
self.code = code
|
self.code = code
|
||||||
self.reason = reason
|
self.reason = reason
|
||||||
@@ -294,14 +294,12 @@ def fetch_url_response(url, headers=(), timeout=15, data=None,
|
|||||||
exception_cause = e.__context__.__context__
|
exception_cause = e.__context__.__context__
|
||||||
if (isinstance(exception_cause, socks.ProxyConnectionError)
|
if (isinstance(exception_cause, socks.ProxyConnectionError)
|
||||||
and settings.route_tor):
|
and settings.route_tor):
|
||||||
msg = ('Failed to connect to Tor. Check that Tor is open and '
|
msg = f'Failed to connect to Tor. Check that Tor is open and that your internet connection is working.\n\n{e}'
|
||||||
'that your internet connection is working.\n\n'
|
|
||||||
+ str(e))
|
|
||||||
raise FetchError('502', reason='Bad Gateway',
|
raise FetchError('502', reason='Bad Gateway',
|
||||||
error_message=msg)
|
error_message=msg)
|
||||||
elif isinstance(e.__context__,
|
elif isinstance(e.__context__,
|
||||||
urllib3.exceptions.NewConnectionError):
|
urllib3.exceptions.NewConnectionError):
|
||||||
msg = 'Failed to establish a connection.\n\n' + str(e)
|
msg = f'Failed to establish a connection.\n\n{e}'
|
||||||
raise FetchError(
|
raise FetchError(
|
||||||
'502', reason='Bad Gateway',
|
'502', reason='Bad Gateway',
|
||||||
error_message=msg)
|
error_message=msg)
|
||||||
@@ -391,7 +389,7 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None,
|
|||||||
if error:
|
if error:
|
||||||
raise FetchError(
|
raise FetchError(
|
||||||
'429', reason=response.reason, ip=ip,
|
'429', reason=response.reason, ip=ip,
|
||||||
error_message='Automatic circuit change: ' + error)
|
error_message=f'Automatic circuit change: {error}')
|
||||||
continue # retry with new identity
|
continue # retry with new identity
|
||||||
|
|
||||||
# Check for client errors (400, 404) - don't retry these
|
# Check for client errors (400, 404) - don't retry these
|
||||||
@@ -467,10 +465,7 @@ def head(url, use_tor=False, report_text=None, max_redirects=10):
|
|||||||
headers = {'User-Agent': 'Python-urllib'}
|
headers = {'User-Agent': 'Python-urllib'}
|
||||||
response = pool.request('HEAD', url, headers=headers, retries=retries)
|
response = pool.request('HEAD', url, headers=headers, retries=retries)
|
||||||
if report_text:
|
if report_text:
|
||||||
print(
|
print(f'{report_text} Latency: {round(time.monotonic() - start_time, 3)}')
|
||||||
report_text,
|
|
||||||
' Latency:',
|
|
||||||
round(time.monotonic() - start_time, 3))
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
mobile_user_agent = 'Mozilla/5.0 (Linux; Android 7.0; Redmi Note 4 Build/NRD90M) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Mobile Safari/537.36'
|
mobile_user_agent = 'Mozilla/5.0 (Linux; Android 7.0; Redmi Note 4 Build/NRD90M) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Mobile Safari/537.36'
|
||||||
@@ -544,16 +539,16 @@ def download_thumbnail(save_directory, video_id):
|
|||||||
for quality in ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'):
|
for quality in ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'):
|
||||||
url = f'https://i.ytimg.com/vi/{video_id}/{quality}'
|
url = f'https://i.ytimg.com/vi/{video_id}/{quality}'
|
||||||
try:
|
try:
|
||||||
thumbnail = fetch_url(url, report_text='Saved thumbnail: ' + video_id)
|
thumbnail = fetch_url(url, report_text=f'Saved thumbnail: {video_id}')
|
||||||
except FetchError as e:
|
except FetchError as e:
|
||||||
if '404' in str(e):
|
if '404' in str(e):
|
||||||
continue
|
continue
|
||||||
print('Failed to download thumbnail for ' + video_id + ': ' + str(e))
|
print(f'Failed to download thumbnail for {video_id}: {e}')
|
||||||
return False
|
return False
|
||||||
except urllib.error.HTTPError as e:
|
except urllib.error.HTTPError as e:
|
||||||
if e.code == 404:
|
if e.code == 404:
|
||||||
continue
|
continue
|
||||||
print('Failed to download thumbnail for ' + video_id + ': ' + str(e))
|
print(f'Failed to download thumbnail for {video_id}: {e}')
|
||||||
return False
|
return False
|
||||||
try:
|
try:
|
||||||
with open(save_location, 'wb') as f:
|
with open(save_location, 'wb') as f:
|
||||||
@@ -563,7 +558,7 @@ def download_thumbnail(save_directory, video_id):
|
|||||||
with open(save_location, 'wb') as f:
|
with open(save_location, 'wb') as f:
|
||||||
f.write(thumbnail)
|
f.write(thumbnail)
|
||||||
return True
|
return True
|
||||||
print('No thumbnail available for ' + video_id)
|
print(f'No thumbnail available for {video_id}')
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
@@ -698,7 +693,7 @@ def prefix_urls(item):
|
|||||||
|
|
||||||
def add_extra_html_info(item):
|
def add_extra_html_info(item):
|
||||||
if item['type'] == 'video':
|
if item['type'] == 'video':
|
||||||
item['url'] = (URL_ORIGIN + '/watch?v=' + item['id']) if item.get('id') else None
|
item['url'] = f'{URL_ORIGIN}/watch?v={item["id"]}' if item.get('id') else None
|
||||||
|
|
||||||
video_info = {}
|
video_info = {}
|
||||||
for key in ('id', 'title', 'author', 'duration', 'author_id'):
|
for key in ('id', 'title', 'author', 'duration', 'author_id'):
|
||||||
@@ -721,7 +716,7 @@ def add_extra_html_info(item):
|
|||||||
item['url'] = concat_or_none(URL_ORIGIN, "/channel/", item['id'])
|
item['url'] = concat_or_none(URL_ORIGIN, "/channel/", item['id'])
|
||||||
|
|
||||||
if item.get('author_id') and 'author_url' not in item:
|
if item.get('author_id') and 'author_url' not in item:
|
||||||
item['author_url'] = URL_ORIGIN + '/channel/' + item['author_id']
|
item['author_url'] = f'{URL_ORIGIN}/channel/{item["author_id"]}'
|
||||||
|
|
||||||
|
|
||||||
def check_gevent_exceptions(*tasks):
|
def check_gevent_exceptions(*tasks):
|
||||||
@@ -967,7 +962,7 @@ def call_youtube_api(client, api, data):
|
|||||||
user_agent = context['client'].get('userAgent') or mobile_user_agent
|
user_agent = context['client'].get('userAgent') or mobile_user_agent
|
||||||
visitor_data = get_visitor_data()
|
visitor_data = get_visitor_data()
|
||||||
|
|
||||||
url = 'https://' + host + '/youtubei/v1/' + api + '?key=' + key
|
url = f'https://{host}/youtubei/v1/{api}?key={key}'
|
||||||
if visitor_data:
|
if visitor_data:
|
||||||
context['client'].update({'visitorData': visitor_data})
|
context['client'].update({'visitorData': visitor_data})
|
||||||
data['context'] = context
|
data['context'] = context
|
||||||
@@ -978,8 +973,8 @@ def call_youtube_api(client, api, data):
|
|||||||
headers = ( *headers, ('X-Goog-Visitor-Id', visitor_data ))
|
headers = ( *headers, ('X-Goog-Visitor-Id', visitor_data ))
|
||||||
response = fetch_url(
|
response = fetch_url(
|
||||||
url, data=data, headers=headers,
|
url, data=data, headers=headers,
|
||||||
debug_name='youtubei_' + api + '_' + client,
|
debug_name=f'youtubei_{api}_{client}',
|
||||||
report_text='Fetched ' + client + ' youtubei ' + api
|
report_text=f'Fetched {client} youtubei {api}'
|
||||||
).decode('utf-8')
|
).decode('utf-8')
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|||||||
118
youtube/watch.py
118
youtube/watch.py
@@ -53,7 +53,7 @@ def get_video_sources(info, target_resolution):
|
|||||||
if fmt['acodec'] and fmt['vcodec']:
|
if fmt['acodec'] and fmt['vcodec']:
|
||||||
if fmt.get('audio_track_is_default', True) is False:
|
if fmt.get('audio_track_is_default', True) is False:
|
||||||
continue
|
continue
|
||||||
source = {'type': 'video/' + fmt['ext'],
|
source = {'type': f"video/{fmt['ext']}",
|
||||||
'quality_string': short_video_quality_string(fmt)}
|
'quality_string': short_video_quality_string(fmt)}
|
||||||
source['quality_string'] += ' (integrated)'
|
source['quality_string'] += ' (integrated)'
|
||||||
source.update(fmt)
|
source.update(fmt)
|
||||||
@@ -70,10 +70,10 @@ def get_video_sources(info, target_resolution):
|
|||||||
if fmt['acodec'] and not fmt['vcodec'] and (fmt['audio_bitrate'] or fmt['bitrate']):
|
if fmt['acodec'] and not fmt['vcodec'] and (fmt['audio_bitrate'] or fmt['bitrate']):
|
||||||
if fmt['bitrate']:
|
if fmt['bitrate']:
|
||||||
fmt['audio_bitrate'] = int(fmt['bitrate']/1000)
|
fmt['audio_bitrate'] = int(fmt['bitrate']/1000)
|
||||||
source = {'type': 'audio/' + fmt['ext'],
|
source = {'type': f"audio/{fmt['ext']}",
|
||||||
'quality_string': audio_quality_string(fmt)}
|
'quality_string': audio_quality_string(fmt)}
|
||||||
source.update(fmt)
|
source.update(fmt)
|
||||||
source['mime_codec'] = source['type'] + '; codecs="' + source['acodec'] + '"'
|
source['mime_codec'] = f"{source['type']}; codecs=\"{source['acodec']}\""
|
||||||
tid = fmt.get('audio_track_id') or 'default'
|
tid = fmt.get('audio_track_id') or 'default'
|
||||||
if tid not in audio_by_track:
|
if tid not in audio_by_track:
|
||||||
audio_by_track[tid] = {
|
audio_by_track[tid] = {
|
||||||
@@ -85,11 +85,11 @@ def get_video_sources(info, target_resolution):
|
|||||||
elif all(fmt[attr] for attr in ('vcodec', 'quality', 'width', 'fps', 'file_size')):
|
elif all(fmt[attr] for attr in ('vcodec', 'quality', 'width', 'fps', 'file_size')):
|
||||||
if codec_name(fmt['vcodec']) == 'unknown':
|
if codec_name(fmt['vcodec']) == 'unknown':
|
||||||
continue
|
continue
|
||||||
source = {'type': 'video/' + fmt['ext'],
|
source = {'type': f"video/{fmt['ext']}",
|
||||||
'quality_string': short_video_quality_string(fmt)}
|
'quality_string': short_video_quality_string(fmt)}
|
||||||
source.update(fmt)
|
source.update(fmt)
|
||||||
source['mime_codec'] = source['type'] + '; codecs="' + source['vcodec'] + '"'
|
source['mime_codec'] = f"{source['type']}; codecs=\"{source['vcodec']}\""
|
||||||
quality = str(fmt['quality']) + 'p' + str(fmt['fps'])
|
quality = f"{fmt['quality']}p{fmt['fps']}"
|
||||||
video_only_sources.setdefault(quality, []).append(source)
|
video_only_sources.setdefault(quality, []).append(source)
|
||||||
|
|
||||||
audio_tracks = []
|
audio_tracks = []
|
||||||
@@ -141,7 +141,7 @@ def get_video_sources(info, target_resolution):
|
|||||||
|
|
||||||
def video_rank(src):
|
def video_rank(src):
|
||||||
''' Sort by settings preference. Use file size as tiebreaker '''
|
''' Sort by settings preference. Use file size as tiebreaker '''
|
||||||
setting_name = 'codec_rank_' + codec_name(src['vcodec'])
|
setting_name = f'codec_rank_{codec_name(src["vcodec"])}'
|
||||||
return (settings.current_settings_dict[setting_name],
|
return (settings.current_settings_dict[setting_name],
|
||||||
src['file_size'])
|
src['file_size'])
|
||||||
pair_info['videos'].sort(key=video_rank)
|
pair_info['videos'].sort(key=video_rank)
|
||||||
@@ -183,7 +183,7 @@ def make_caption_src(info, lang, auto=False, trans_lang=None):
|
|||||||
if auto:
|
if auto:
|
||||||
label += ' (Automatic)'
|
label += ' (Automatic)'
|
||||||
if trans_lang:
|
if trans_lang:
|
||||||
label += ' -> ' + trans_lang
|
label += f' -> {trans_lang}'
|
||||||
|
|
||||||
# Try to use Android caption URL directly (no PO Token needed)
|
# Try to use Android caption URL directly (no PO Token needed)
|
||||||
caption_url = None
|
caption_url = None
|
||||||
@@ -204,7 +204,7 @@ def make_caption_src(info, lang, auto=False, trans_lang=None):
|
|||||||
else:
|
else:
|
||||||
caption_url += '&fmt=vtt'
|
caption_url += '&fmt=vtt'
|
||||||
if trans_lang:
|
if trans_lang:
|
||||||
caption_url += '&tlang=' + trans_lang
|
caption_url += f'&tlang={trans_lang}'
|
||||||
url = util.prefix_url(caption_url)
|
url = util.prefix_url(caption_url)
|
||||||
else:
|
else:
|
||||||
# Fallback to old method
|
# Fallback to old method
|
||||||
@@ -357,10 +357,10 @@ def decrypt_signatures(info, video_id):
|
|||||||
|
|
||||||
player_name = info['player_name']
|
player_name = info['player_name']
|
||||||
if player_name in decrypt_cache:
|
if player_name in decrypt_cache:
|
||||||
print('Using cached decryption function for: ' + player_name)
|
print(f'Using cached decryption function for: {player_name}')
|
||||||
info['decryption_function'] = decrypt_cache[player_name]
|
info['decryption_function'] = decrypt_cache[player_name]
|
||||||
else:
|
else:
|
||||||
base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text='Fetched player ' + player_name)
|
base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text=f'Fetched player {player_name}')
|
||||||
base_js = base_js.decode('utf-8')
|
base_js = base_js.decode('utf-8')
|
||||||
err = yt_data_extract.extract_decryption_function(info, base_js)
|
err = yt_data_extract.extract_decryption_function(info, base_js)
|
||||||
if err:
|
if err:
|
||||||
@@ -387,11 +387,11 @@ def fetch_player_response(client, video_id):
|
|||||||
def fetch_watch_page_info(video_id, playlist_id, index):
|
def fetch_watch_page_info(video_id, playlist_id, index):
|
||||||
# bpctr=9999999999 will bypass are-you-sure dialogs for controversial
|
# bpctr=9999999999 will bypass are-you-sure dialogs for controversial
|
||||||
# videos
|
# videos
|
||||||
url = 'https://m.youtube.com/embed/' + video_id + '?bpctr=9999999999'
|
url = f'https://m.youtube.com/embed/{video_id}?bpctr=9999999999'
|
||||||
if playlist_id:
|
if playlist_id:
|
||||||
url += '&list=' + playlist_id
|
url += f'&list={playlist_id}'
|
||||||
if index:
|
if index:
|
||||||
url += '&index=' + index
|
url += f'&index={index}'
|
||||||
|
|
||||||
headers = (
|
headers = (
|
||||||
('Accept', '*/*'),
|
('Accept', '*/*'),
|
||||||
@@ -493,7 +493,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
|
|||||||
# Register HLS audio tracks for proxy access
|
# Register HLS audio tracks for proxy access
|
||||||
added = 0
|
added = 0
|
||||||
for lang, track in info['hls_audio_tracks'].items():
|
for lang, track in info['hls_audio_tracks'].items():
|
||||||
ck = video_id + '_' + lang
|
ck = f"{video_id}_{lang}"
|
||||||
from youtube.hls_cache import register_track
|
from youtube.hls_cache import register_track
|
||||||
register_track(ck, track['hls_url'],
|
register_track(ck, track['hls_url'],
|
||||||
video_id=video_id, track_id=lang)
|
video_id=video_id, track_id=lang)
|
||||||
@@ -502,7 +502,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
|
|||||||
'audio_track_id': lang,
|
'audio_track_id': lang,
|
||||||
'audio_track_name': track['name'],
|
'audio_track_name': track['name'],
|
||||||
'audio_track_is_default': track['is_default'],
|
'audio_track_is_default': track['is_default'],
|
||||||
'itag': 'hls_' + lang,
|
'itag': f'hls_{lang}',
|
||||||
'ext': 'mp4',
|
'ext': 'mp4',
|
||||||
'audio_bitrate': 128,
|
'audio_bitrate': 128,
|
||||||
'bitrate': 128000,
|
'bitrate': 128000,
|
||||||
@@ -516,7 +516,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
|
|||||||
'fps': None,
|
'fps': None,
|
||||||
'init_range': {'start': 0, 'end': 0},
|
'init_range': {'start': 0, 'end': 0},
|
||||||
'index_range': {'start': 0, 'end': 0},
|
'index_range': {'start': 0, 'end': 0},
|
||||||
'url': '/ytl-api/audio-track?id=' + urllib.parse.quote(ck),
|
'url': f'/ytl-api/audio-track?id={urllib.parse.quote(ck)}',
|
||||||
's': None,
|
's': None,
|
||||||
'sp': None,
|
'sp': None,
|
||||||
'quality': None,
|
'quality': None,
|
||||||
@@ -538,11 +538,11 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
|
|||||||
|
|
||||||
# Register HLS manifest for proxying
|
# Register HLS manifest for proxying
|
||||||
if info['hls_manifest_url']:
|
if info['hls_manifest_url']:
|
||||||
ck = video_id + '_video'
|
ck = f"{video_id}_video"
|
||||||
from youtube.hls_cache import register_track
|
from youtube.hls_cache import register_track
|
||||||
register_track(ck, info['hls_manifest_url'], video_id=video_id, track_id='video')
|
register_track(ck, info['hls_manifest_url'], video_id=video_id, track_id='video')
|
||||||
# Use proxy URL instead of direct Google Video URL
|
# Use proxy URL instead of direct Google Video URL
|
||||||
info['hls_manifest_url'] = '/ytl-api/hls-manifest?id=' + urllib.parse.quote(ck)
|
info['hls_manifest_url'] = f'/ytl-api/hls-manifest?id={urllib.parse.quote(ck)}'
|
||||||
|
|
||||||
# Fallback to 'ios' if no valid URLs are found
|
# Fallback to 'ios' if no valid URLs are found
|
||||||
if not info.get('formats') or info.get('player_urls_missing'):
|
if not info.get('formats') or info.get('player_urls_missing'):
|
||||||
@@ -566,7 +566,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
|
|||||||
if info.get('formats'):
|
if info.get('formats'):
|
||||||
decryption_error = decrypt_signatures(info, video_id)
|
decryption_error = decrypt_signatures(info, video_id)
|
||||||
if decryption_error:
|
if decryption_error:
|
||||||
info['playability_error'] = 'Error decrypting url signatures: ' + decryption_error
|
info['playability_error'] = f'Error decrypting url signatures: {decryption_error}'
|
||||||
|
|
||||||
# check if urls ready (non-live format) in former livestream
|
# check if urls ready (non-live format) in former livestream
|
||||||
# urls not ready if all of them have no filesize
|
# urls not ready if all of them have no filesize
|
||||||
@@ -623,9 +623,9 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
|
|||||||
|
|
||||||
def video_quality_string(format):
|
def video_quality_string(format):
|
||||||
if format['vcodec']:
|
if format['vcodec']:
|
||||||
result = str(format['width'] or '?') + 'x' + str(format['height'] or '?')
|
result = f"{format['width'] or '?'}x{format['height'] or '?'}"
|
||||||
if format['fps']:
|
if format['fps']:
|
||||||
result += ' ' + str(format['fps']) + 'fps'
|
result += f" {format['fps']}fps"
|
||||||
return result
|
return result
|
||||||
elif format['acodec']:
|
elif format['acodec']:
|
||||||
return 'audio only'
|
return 'audio only'
|
||||||
@@ -634,7 +634,7 @@ def video_quality_string(format):
|
|||||||
|
|
||||||
|
|
||||||
def short_video_quality_string(fmt):
|
def short_video_quality_string(fmt):
|
||||||
result = str(fmt['quality'] or '?') + 'p'
|
result = f"{fmt['quality'] or '?'}p"
|
||||||
if fmt['fps']:
|
if fmt['fps']:
|
||||||
result += str(fmt['fps'])
|
result += str(fmt['fps'])
|
||||||
if fmt['vcodec'].startswith('av01'):
|
if fmt['vcodec'].startswith('av01'):
|
||||||
@@ -642,18 +642,18 @@ def short_video_quality_string(fmt):
|
|||||||
elif fmt['vcodec'].startswith('avc'):
|
elif fmt['vcodec'].startswith('avc'):
|
||||||
result += ' h264'
|
result += ' h264'
|
||||||
else:
|
else:
|
||||||
result += ' ' + fmt['vcodec']
|
result += f" {fmt['vcodec']}"
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
def audio_quality_string(fmt):
|
def audio_quality_string(fmt):
|
||||||
if fmt['acodec']:
|
if fmt['acodec']:
|
||||||
if fmt['audio_bitrate']:
|
if fmt['audio_bitrate']:
|
||||||
result = '%d' % fmt['audio_bitrate'] + 'k'
|
result = f"{fmt['audio_bitrate']}k"
|
||||||
else:
|
else:
|
||||||
result = '?k'
|
result = '?k'
|
||||||
if fmt['audio_sample_rate']:
|
if fmt['audio_sample_rate']:
|
||||||
result += ' ' + '%.3G' % (fmt['audio_sample_rate']/1000) + 'kHz'
|
result += f" {'%.3G' % (fmt['audio_sample_rate']/1000)}kHz"
|
||||||
return result
|
return result
|
||||||
elif fmt['vcodec']:
|
elif fmt['vcodec']:
|
||||||
return 'video only'
|
return 'video only'
|
||||||
@@ -737,9 +737,9 @@ def get_audio_track():
|
|||||||
seg = line if line.startswith('http') else urljoin(playlist_base, line)
|
seg = line if line.startswith('http') else urljoin(playlist_base, line)
|
||||||
# Always use &seg= parameter, never &url= for segments
|
# Always use &seg= parameter, never &url= for segments
|
||||||
playlist_lines.append(
|
playlist_lines.append(
|
||||||
base_url + '/ytl-api/audio-track?id='
|
f'{base_url}/ytl-api/audio-track?id='
|
||||||
+ urllib.parse.quote(cache_key)
|
f'{urllib.parse.quote(cache_key)}'
|
||||||
+ '&seg=' + urllib.parse.quote(seg, safe='')
|
f'&seg={urllib.parse.quote(seg, safe="")}'
|
||||||
)
|
)
|
||||||
|
|
||||||
playlist = '\n'.join(playlist_lines)
|
playlist = '\n'.join(playlist_lines)
|
||||||
@@ -797,9 +797,7 @@ def get_audio_track():
|
|||||||
return url
|
return url
|
||||||
if not url.startswith('http://') and not url.startswith('https://'):
|
if not url.startswith('http://') and not url.startswith('https://'):
|
||||||
url = urljoin(playlist_base, url)
|
url = urljoin(playlist_base, url)
|
||||||
return (base_url + '/ytl-api/audio-track?id='
|
return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(url, safe="")}'
|
||||||
+ urllib.parse.quote(cache_key)
|
|
||||||
+ '&seg=' + urllib.parse.quote(url, safe=''))
|
|
||||||
|
|
||||||
playlist_lines = []
|
playlist_lines = []
|
||||||
for line in playlist.split('\n'):
|
for line in playlist.split('\n'):
|
||||||
@@ -812,7 +810,7 @@ def get_audio_track():
|
|||||||
if line.startswith('#') and 'URI=' in line:
|
if line.startswith('#') and 'URI=' in line:
|
||||||
def rewrite_uri_attr(match):
|
def rewrite_uri_attr(match):
|
||||||
uri = match.group(1)
|
uri = match.group(1)
|
||||||
return 'URI="' + proxy_url(uri) + '"'
|
return f'URI="{proxy_url(uri)}"'
|
||||||
line = _re.sub(r'URI="([^"]+)"', rewrite_uri_attr, line)
|
line = _re.sub(r'URI="([^"]+)"', rewrite_uri_attr, line)
|
||||||
playlist_lines.append(line)
|
playlist_lines.append(line)
|
||||||
elif line.startswith('#'):
|
elif line.startswith('#'):
|
||||||
@@ -883,9 +881,7 @@ def get_audio_track():
|
|||||||
if segment_url.startswith('/ytl-api/audio-track'):
|
if segment_url.startswith('/ytl-api/audio-track'):
|
||||||
return segment_url
|
return segment_url
|
||||||
base_url = request.url_root.rstrip('/')
|
base_url = request.url_root.rstrip('/')
|
||||||
return (base_url + '/ytl-api/audio-track?id='
|
return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(segment_url)}'
|
||||||
+ urllib.parse.quote(cache_key)
|
|
||||||
+ '&seg=' + urllib.parse.quote(segment_url))
|
|
||||||
|
|
||||||
playlist_lines = []
|
playlist_lines = []
|
||||||
for line in playlist.split('\n'):
|
for line in playlist.split('\n'):
|
||||||
@@ -949,14 +945,10 @@ def get_hls_manifest():
|
|||||||
|
|
||||||
if is_audio_track:
|
if is_audio_track:
|
||||||
# Audio track playlist - proxy through audio-track endpoint
|
# Audio track playlist - proxy through audio-track endpoint
|
||||||
return (base_url + '/ytl-api/audio-track?id='
|
return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&url={urllib.parse.quote(url, safe="")}'
|
||||||
+ urllib.parse.quote(cache_key)
|
|
||||||
+ '&url=' + urllib.parse.quote(url, safe=''))
|
|
||||||
else:
|
else:
|
||||||
# Video segment or variant playlist - proxy through audio-track endpoint
|
# Video segment or variant playlist - proxy through audio-track endpoint
|
||||||
return (base_url + '/ytl-api/audio-track?id='
|
return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(url, safe="")}'
|
||||||
+ urllib.parse.quote(cache_key)
|
|
||||||
+ '&seg=' + urllib.parse.quote(url, safe=''))
|
|
||||||
|
|
||||||
# Parse and rewrite the manifest
|
# Parse and rewrite the manifest
|
||||||
manifest_lines = []
|
manifest_lines = []
|
||||||
@@ -974,7 +966,7 @@ def get_hls_manifest():
|
|||||||
nonlocal rewritten_count
|
nonlocal rewritten_count
|
||||||
uri = match.group(1)
|
uri = match.group(1)
|
||||||
rewritten_count += 1
|
rewritten_count += 1
|
||||||
return 'URI="' + rewrite_url(uri, is_audio_track=True) + '"'
|
return f'URI="{rewrite_url(uri, is_audio_track=True)}"'
|
||||||
line = _re.sub(r'URI="([^"]+)"', rewrite_media_uri, line)
|
line = _re.sub(r'URI="([^"]+)"', rewrite_media_uri, line)
|
||||||
manifest_lines.append(line)
|
manifest_lines.append(line)
|
||||||
elif line.startswith('#'):
|
elif line.startswith('#'):
|
||||||
@@ -1053,7 +1045,7 @@ def get_storyboard_vtt():
|
|||||||
ts = 0 # current timestamp
|
ts = 0 # current timestamp
|
||||||
|
|
||||||
for i in range(storyboard.storyboard_count):
|
for i in range(storyboard.storyboard_count):
|
||||||
url = '/' + storyboard.url.replace("$M", str(i))
|
url = f'/{storyboard.url.replace("$M", str(i))}'
|
||||||
interval = storyboard.interval
|
interval = storyboard.interval
|
||||||
w, h = storyboard.width, storyboard.height
|
w, h = storyboard.width, storyboard.height
|
||||||
w_cnt, h_cnt = storyboard.width_cnt, storyboard.height_cnt
|
w_cnt, h_cnt = storyboard.width_cnt, storyboard.height_cnt
|
||||||
@@ -1078,7 +1070,7 @@ def get_watch_page(video_id=None):
|
|||||||
if not video_id:
|
if not video_id:
|
||||||
return flask.render_template('error.html', error_message='Missing video id'), 404
|
return flask.render_template('error.html', error_message='Missing video id'), 404
|
||||||
if len(video_id) < 11:
|
if len(video_id) < 11:
|
||||||
return flask.render_template('error.html', error_message='Incomplete video id (too short): ' + video_id), 404
|
return flask.render_template('error.html', error_message=f'Incomplete video id (too short): {video_id}'), 404
|
||||||
|
|
||||||
time_start_str = request.args.get('t', '0s')
|
time_start_str = request.args.get('t', '0s')
|
||||||
time_start = 0
|
time_start = 0
|
||||||
@@ -1141,9 +1133,9 @@ def get_watch_page(video_id=None):
|
|||||||
util.prefix_urls(item)
|
util.prefix_urls(item)
|
||||||
util.add_extra_html_info(item)
|
util.add_extra_html_info(item)
|
||||||
if playlist_id:
|
if playlist_id:
|
||||||
item['url'] += '&list=' + playlist_id
|
item['url'] += f'&list={playlist_id}'
|
||||||
if item['index']:
|
if item['index']:
|
||||||
item['url'] += '&index=' + str(item['index'])
|
item['url'] += f'&index={item["index"]}'
|
||||||
info['playlist']['author_url'] = util.prefix_url(
|
info['playlist']['author_url'] = util.prefix_url(
|
||||||
info['playlist']['author_url'])
|
info['playlist']['author_url'])
|
||||||
if settings.img_prefix:
|
if settings.img_prefix:
|
||||||
@@ -1159,16 +1151,16 @@ def get_watch_page(video_id=None):
|
|||||||
filename = title
|
filename = title
|
||||||
ext = fmt.get('ext')
|
ext = fmt.get('ext')
|
||||||
if ext:
|
if ext:
|
||||||
filename += '.' + ext
|
filename += f'.{ext}'
|
||||||
fmt['url'] = fmt['url'].replace(
|
fmt['url'] = fmt['url'].replace(
|
||||||
'/videoplayback',
|
'/videoplayback',
|
||||||
'/videoplayback/name/' + filename)
|
f'/videoplayback/name/{filename}')
|
||||||
|
|
||||||
download_formats = []
|
download_formats = []
|
||||||
|
|
||||||
for format in (info['formats'] + info['hls_formats']):
|
for format in (info['formats'] + info['hls_formats']):
|
||||||
if format['acodec'] and format['vcodec']:
|
if format['acodec'] and format['vcodec']:
|
||||||
codecs_string = format['acodec'] + ', ' + format['vcodec']
|
codecs_string = f"{format['acodec']}, {format['vcodec']}"
|
||||||
else:
|
else:
|
||||||
codecs_string = format['acodec'] or format['vcodec'] or '?'
|
codecs_string = format['acodec'] or format['vcodec'] or '?'
|
||||||
download_formats.append({
|
download_formats.append({
|
||||||
@@ -1247,12 +1239,9 @@ def get_watch_page(video_id=None):
|
|||||||
for source in subtitle_sources:
|
for source in subtitle_sources:
|
||||||
best_caption_parse = urllib.parse.urlparse(
|
best_caption_parse = urllib.parse.urlparse(
|
||||||
source['url'].lstrip('/'))
|
source['url'].lstrip('/'))
|
||||||
transcript_url = (util.URL_ORIGIN
|
transcript_url = f'{util.URL_ORIGIN}/watch/transcript{best_caption_parse.path}?{best_caption_parse.query}'
|
||||||
+ '/watch/transcript'
|
|
||||||
+ best_caption_parse.path
|
|
||||||
+ '?' + best_caption_parse.query)
|
|
||||||
other_downloads.append({
|
other_downloads.append({
|
||||||
'label': 'Video Transcript: ' + source['label'],
|
'label': f'Video Transcript: {source["label"]}',
|
||||||
'ext': 'txt',
|
'ext': 'txt',
|
||||||
'url': transcript_url
|
'url': transcript_url
|
||||||
})
|
})
|
||||||
@@ -1263,7 +1252,7 @@ def get_watch_page(video_id=None):
|
|||||||
template_name = 'watch.html'
|
template_name = 'watch.html'
|
||||||
return flask.render_template(template_name,
|
return flask.render_template(template_name,
|
||||||
header_playlist_names = local_playlist.get_playlist_names(),
|
header_playlist_names = local_playlist.get_playlist_names(),
|
||||||
uploader_channel_url = ('/' + info['author_url']) if info['author_url'] else '',
|
uploader_channel_url = f'/{info["author_url"]}' if info['author_url'] else '',
|
||||||
time_published = info['time_published'],
|
time_published = info['time_published'],
|
||||||
view_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("view_count", None)),
|
view_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("view_count", None)),
|
||||||
like_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("like_count", None)),
|
like_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("like_count", None)),
|
||||||
@@ -1305,10 +1294,10 @@ def get_watch_page(video_id=None):
|
|||||||
ip_address = info['ip_address'] if settings.route_tor else None,
|
ip_address = info['ip_address'] if settings.route_tor else None,
|
||||||
invidious_used = info['invidious_used'],
|
invidious_used = info['invidious_used'],
|
||||||
invidious_reload_button = info['invidious_reload_button'],
|
invidious_reload_button = info['invidious_reload_button'],
|
||||||
video_url = util.URL_ORIGIN + '/watch?v=' + video_id,
|
video_url = f'{util.URL_ORIGIN}/watch?v={video_id}',
|
||||||
video_id = video_id,
|
video_id = video_id,
|
||||||
storyboard_url = (util.URL_ORIGIN + '/ytl-api/storyboard.vtt?' +
|
storyboard_url = (f'{util.URL_ORIGIN}/ytl-api/storyboard.vtt?'
|
||||||
urlencode([('spec_url', info['storyboard_spec_url'])])
|
f'{urlencode([("spec_url", info["storyboard_spec_url"])])}'
|
||||||
if info['storyboard_spec_url'] else None),
|
if info['storyboard_spec_url'] else None),
|
||||||
|
|
||||||
js_data = {
|
js_data = {
|
||||||
@@ -1335,7 +1324,7 @@ def get_watch_page(video_id=None):
|
|||||||
|
|
||||||
@yt_app.route('/api/<path:dummy>')
|
@yt_app.route('/api/<path:dummy>')
|
||||||
def get_captions(dummy):
|
def get_captions(dummy):
|
||||||
url = 'https://www.youtube.com' + request.full_path
|
url = f'https://www.youtube.com{request.full_path}'
|
||||||
try:
|
try:
|
||||||
result = util.fetch_url(url, headers=util.mobile_ua)
|
result = util.fetch_url(url, headers=util.mobile_ua)
|
||||||
result = result.replace(b"align:start position:0%", b"")
|
result = result.replace(b"align:start position:0%", b"")
|
||||||
@@ -1350,12 +1339,9 @@ inner_timestamp_removal_reg = re.compile(r'<[^>]+>')
|
|||||||
@yt_app.route('/watch/transcript/<path:caption_path>')
|
@yt_app.route('/watch/transcript/<path:caption_path>')
|
||||||
def get_transcript(caption_path):
|
def get_transcript(caption_path):
|
||||||
try:
|
try:
|
||||||
captions = util.fetch_url('https://www.youtube.com/'
|
captions = util.fetch_url(f'https://www.youtube.com/{caption_path}?{request.environ["QUERY_STRING"]}').decode('utf-8')
|
||||||
+ caption_path
|
|
||||||
+ '?' + request.environ['QUERY_STRING']).decode('utf-8')
|
|
||||||
except util.FetchError as e:
|
except util.FetchError as e:
|
||||||
msg = ('Error retrieving captions: ' + str(e) + '\n\n'
|
msg = f'Error retrieving captions: {e}\n\nThe caption url may have expired.'
|
||||||
+ 'The caption url may have expired.')
|
|
||||||
print(msg)
|
print(msg)
|
||||||
return flask.Response(
|
return flask.Response(
|
||||||
msg,
|
msg,
|
||||||
@@ -1403,7 +1389,7 @@ def get_transcript(caption_path):
|
|||||||
result = ''
|
result = ''
|
||||||
for seg in segments:
|
for seg in segments:
|
||||||
if seg['text'] != ' ':
|
if seg['text'] != ' ':
|
||||||
result += seg['begin'] + ' ' + seg['text'] + '\r\n'
|
result += f"{seg['begin']} {seg['text']}\r\n"
|
||||||
|
|
||||||
return flask.Response(result.encode('utf-8'),
|
return flask.Response(result.encode('utf-8'),
|
||||||
mimetype='text/plain;charset=UTF-8')
|
mimetype='text/plain;charset=UTF-8')
|
||||||
|
|||||||
@@ -212,7 +212,7 @@ def extract_date(date_text):
|
|||||||
month, day, year = parts[-3:]
|
month, day, year = parts[-3:]
|
||||||
month = MONTH_ABBREVIATIONS.get(month[0:3]) # slicing in case they start writing out the full month name
|
month = MONTH_ABBREVIATIONS.get(month[0:3]) # slicing in case they start writing out the full month name
|
||||||
if month and (re.fullmatch(r'\d\d?', day) is not None) and (re.fullmatch(r'\d{4}', year) is not None):
|
if month and (re.fullmatch(r'\d\d?', day) is not None) and (re.fullmatch(r'\d{4}', year) is not None):
|
||||||
return year + '-' + month + '-' + day
|
return f'{year}-{month}-{day}'
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def check_missing_keys(object, *key_sequences):
|
def check_missing_keys(object, *key_sequences):
|
||||||
@@ -222,7 +222,7 @@ def check_missing_keys(object, *key_sequences):
|
|||||||
for key in key_sequence:
|
for key in key_sequence:
|
||||||
_object = _object[key]
|
_object = _object[key]
|
||||||
except (KeyError, IndexError, TypeError):
|
except (KeyError, IndexError, TypeError):
|
||||||
return 'Could not find ' + key
|
return f'Could not find {key}'
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@@ -467,7 +467,7 @@ def extract_item_info(item, additional_info={}):
|
|||||||
['shortBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
|
['shortBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
|
||||||
['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId']
|
['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId']
|
||||||
))
|
))
|
||||||
info['author_url'] = ('https://www.youtube.com/channel/' + info['author_id']) if info['author_id'] else None
|
info['author_url'] = f'https://www.youtube.com/channel/{info["author_id"]}' if info['author_id'] else None
|
||||||
info['description'] = extract_formatted_text(multi_deep_get(
|
info['description'] = extract_formatted_text(multi_deep_get(
|
||||||
item,
|
item,
|
||||||
['descriptionText'], ['descriptionSnippet'],
|
['descriptionText'], ['descriptionSnippet'],
|
||||||
|
|||||||
@@ -305,7 +305,7 @@ def extract_playlist_metadata(polymer_json):
|
|||||||
metadata['description'] = desc
|
metadata['description'] = desc
|
||||||
|
|
||||||
if metadata['author_id']:
|
if metadata['author_id']:
|
||||||
metadata['author_url'] = 'https://www.youtube.com/channel/' + metadata['author_id']
|
metadata['author_url'] = f'https://www.youtube.com/channel/{metadata["author_id"]}'
|
||||||
|
|
||||||
if metadata['first_video_id'] is None:
|
if metadata['first_video_id'] is None:
|
||||||
metadata['thumbnail'] = None
|
metadata['thumbnail'] = None
|
||||||
|
|||||||
@@ -650,9 +650,9 @@ def _extract_playability_error(info, player_response, error_prefix=''):
|
|||||||
)
|
)
|
||||||
|
|
||||||
if playability_status not in (None, 'OK'):
|
if playability_status not in (None, 'OK'):
|
||||||
info['playability_error'] = error_prefix + playability_reason
|
info['playability_error'] = f'{error_prefix}{playability_reason}'
|
||||||
elif not info['playability_error']: # do not override
|
elif not info['playability_error']: # do not override
|
||||||
info['playability_error'] = error_prefix + 'Unknown playability error'
|
info['playability_error'] = f'{error_prefix}Unknown playability error'
|
||||||
|
|
||||||
SUBTITLE_FORMATS = ('srv1', 'srv2', 'srv3', 'ttml', 'vtt')
|
SUBTITLE_FORMATS = ('srv1', 'srv2', 'srv3', 'ttml', 'vtt')
|
||||||
def extract_watch_info(polymer_json):
|
def extract_watch_info(polymer_json):
|
||||||
@@ -726,7 +726,7 @@ def extract_watch_info(polymer_json):
|
|||||||
# Store the full URL from the player response (includes valid tokens)
|
# Store the full URL from the player response (includes valid tokens)
|
||||||
if base_url:
|
if base_url:
|
||||||
normalized = normalize_url(base_url) if base_url.startswith('/') or not base_url.startswith('http') else base_url
|
normalized = normalize_url(base_url) if base_url.startswith('/') or not base_url.startswith('http') else base_url
|
||||||
info['_caption_track_urls'][lang_code + ('_asr' if caption_track.get('kind') == 'asr' else '')] = normalized
|
info['_caption_track_urls'][f'{lang_code}_{"asr" if caption_track.get("kind") == "asr" else ""}'] = normalized
|
||||||
lang_name = deep_get(urllib.parse.parse_qs(urllib.parse.urlparse(base_url).query), 'name', 0)
|
lang_name = deep_get(urllib.parse.parse_qs(urllib.parse.urlparse(base_url).query), 'name', 0)
|
||||||
if lang_name:
|
if lang_name:
|
||||||
info['_manual_caption_language_names'][lang_code] = lang_name
|
info['_manual_caption_language_names'][lang_code] = lang_name
|
||||||
@@ -806,7 +806,7 @@ def extract_watch_info(polymer_json):
|
|||||||
info['allowed_countries'] = mf.get('availableCountries', [])
|
info['allowed_countries'] = mf.get('availableCountries', [])
|
||||||
|
|
||||||
# other stuff
|
# other stuff
|
||||||
info['author_url'] = 'https://www.youtube.com/channel/' + info['author_id'] if info['author_id'] else None
|
info['author_url'] = f'https://www.youtube.com/channel/{info["author_id"]}' if info['author_id'] else None
|
||||||
info['storyboard_spec_url'] = deep_get(player_response, 'storyboards', 'playerStoryboardSpecRenderer', 'spec')
|
info['storyboard_spec_url'] = deep_get(player_response, 'storyboards', 'playerStoryboardSpecRenderer', 'spec')
|
||||||
|
|
||||||
return info
|
return info
|
||||||
@@ -912,12 +912,12 @@ def get_caption_url(info, language, format, automatic=False, translation_languag
|
|||||||
url = info['_captions_base_url']
|
url = info['_captions_base_url']
|
||||||
if not url:
|
if not url:
|
||||||
return None
|
return None
|
||||||
url += '&lang=' + language
|
url += f'&lang={language}'
|
||||||
url += '&fmt=' + format
|
url += f'&fmt={format}'
|
||||||
if automatic:
|
if automatic:
|
||||||
url += '&kind=asr'
|
url += '&kind=asr'
|
||||||
elif language in info['_manual_caption_language_names']:
|
elif language in info['_manual_caption_language_names']:
|
||||||
url += '&name=' + urllib.parse.quote(info['_manual_caption_language_names'][language], safe='')
|
url += f'&name={urllib.parse.quote(info["_manual_caption_language_names"][language], safe="")}'
|
||||||
|
|
||||||
if translation_language:
|
if translation_language:
|
||||||
url += '&tlang=' + translation_language
|
url += '&tlang=' + translation_language
|
||||||
@@ -964,7 +964,7 @@ def extract_decryption_function(info, base_js):
|
|||||||
return 'Could not find var_name'
|
return 'Could not find var_name'
|
||||||
|
|
||||||
var_name = var_with_operation_match.group(1)
|
var_name = var_with_operation_match.group(1)
|
||||||
var_body_match = re.search(r'var ' + re.escape(var_name) + r'=\{(.*?)\};', base_js, flags=re.DOTALL)
|
var_body_match = re.search(rf'var {re.escape(var_name)}=\{{(.*?)\}};', base_js, flags=re.DOTALL)
|
||||||
if var_body_match is None:
|
if var_body_match is None:
|
||||||
return 'Could not find var_body'
|
return 'Could not find var_body'
|
||||||
|
|
||||||
@@ -988,7 +988,7 @@ def extract_decryption_function(info, base_js):
|
|||||||
elif op_body.startswith('var c=a[0]'):
|
elif op_body.startswith('var c=a[0]'):
|
||||||
operation_definitions[op_name] = 2
|
operation_definitions[op_name] = 2
|
||||||
else:
|
else:
|
||||||
return 'Unknown op_body: ' + op_body
|
return f'Unknown op_body: {op_body}'
|
||||||
|
|
||||||
decryption_function = []
|
decryption_function = []
|
||||||
for op_with_arg in function_body:
|
for op_with_arg in function_body:
|
||||||
@@ -997,7 +997,7 @@ def extract_decryption_function(info, base_js):
|
|||||||
return 'Could not parse operation with arg'
|
return 'Could not parse operation with arg'
|
||||||
op_name = match.group(2).strip('[].')
|
op_name = match.group(2).strip('[].')
|
||||||
if op_name not in operation_definitions:
|
if op_name not in operation_definitions:
|
||||||
return 'Unknown op_name: ' + str(op_name)
|
return f'Unknown op_name: {op_name}'
|
||||||
op_argument = match.group(3)
|
op_argument = match.group(3)
|
||||||
decryption_function.append([operation_definitions[op_name], int(op_argument)])
|
decryption_function.append([operation_definitions[op_name], int(op_argument)])
|
||||||
|
|
||||||
@@ -1028,5 +1028,5 @@ def decrypt_signatures(info):
|
|||||||
_operation_2(a, argument)
|
_operation_2(a, argument)
|
||||||
|
|
||||||
signature = ''.join(a)
|
signature = ''.join(a)
|
||||||
format['url'] += '&' + format['sp'] + '=' + signature
|
format['url'] += f'&{format["sp"]}={signature}'
|
||||||
return False
|
return False
|
||||||
|
|||||||
Reference in New Issue
Block a user