refactor: replace string concatenations with f-strings
All checks were successful
CI / test (push) Successful in 50s

This commit is contained in:
2026-04-25 01:02:17 -05:00
parent a0f315be51
commit 50ad959a80
18 changed files with 201 additions and 235 deletions

View File

@@ -76,7 +76,7 @@ theme_names = {
@yt_app.context_processor
def inject_theme_preference():
return {
'theme_path': '/youtube.com/static/' + theme_names[settings.theme] + '.css',
'theme_path': f'/youtube.com/static/{theme_names[settings.theme]}.css',
'settings': settings,
# Detect version
'current_version': app_version()['version'],
@@ -145,9 +145,9 @@ def error_page(e):
' exit node is overutilized. Try getting a new exit node by'
' using the New Identity button in the Tor Browser.')
if fetch_err.error_message:
error_message += '\n\n' + fetch_err.error_message
error_message += f'\n\n{fetch_err.error_message}'
if fetch_err.ip:
error_message += '\n\nExit node IP address: ' + fetch_err.ip
error_message += f'\n\nExit node IP address: {fetch_err.ip}'
return flask.render_template('error.html', error_message=error_message, slim=slim), 502
elif error_code == '429':
@@ -157,7 +157,7 @@ def error_page(e):
'• Enable Tor routing in Settings for automatic IP rotation\n'
'• Use a VPN to change your IP address')
if fetch_err.ip:
error_message += '\n\nYour IP: ' + fetch_err.ip
error_message += f'\n\nYour IP: {fetch_err.ip}'
return flask.render_template('error.html', error_message=error_message, slim=slim), 429
elif error_code == '502' and ('Failed to resolve' in str(fetch_err) or 'Failed to establish' in str(fetch_err)):
@@ -179,7 +179,7 @@ def error_page(e):
# Catch-all for any other FetchError (400, etc.)
error_message = f'Error communicating with YouTube ({error_code}).'
if fetch_err.error_message:
error_message += '\n\n' + fetch_err.error_message
error_message += f'\n\n{fetch_err.error_message}'
return flask.render_template('error.html', error_message=error_message, slim=slim), 502
return flask.render_template('error.html', traceback=traceback.format_exc(),

View File

@@ -253,7 +253,7 @@ def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1,
# For now it seems to be constant for the API endpoint, not dependent
# on the browsing session or channel
key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
url = f'https://www.youtube.com/youtubei/v1/browse?key={key}'
data = {
'context': {
@@ -285,8 +285,8 @@ def get_number_of_videos_channel(channel_id):
return 1000
# Uploads playlist
playlist_id = 'UU' + channel_id[2:]
url = 'https://m.youtube.com/playlist?list=' + playlist_id + '&pbj=1'
playlist_id = f'UU{channel_id[2:]}'
url = f'https://m.youtube.com/playlist?list={playlist_id}&pbj=1'
try:
response = util.fetch_url(url, headers_mobile,
@@ -328,7 +328,7 @@ def get_channel_id(base_url):
# method that gives the smallest possible response at ~4 kb
# needs to be as fast as possible
base_url = base_url.replace('https://www', 'https://m') # avoid redirect
response = util.fetch_url(base_url + '/about?pbj=1', headers_mobile,
response = util.fetch_url(f'{base_url}/about?pbj=1', headers_mobile,
debug_name='get_channel_id', report_text='Got channel id').decode('utf-8')
match = channel_id_re.search(response)
if match:
@@ -372,7 +372,7 @@ def get_channel_search_json(channel_id, query, page):
ctoken = base64.urlsafe_b64encode(proto.nested(80226972, ctoken)).decode('ascii')
key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
url = f'https://www.youtube.com/youtubei/v1/browse?key={key}'
data = {
'context': {
@@ -414,18 +414,18 @@ def post_process_channel_info(info):
def get_channel_first_page(base_url=None, tab='videos', channel_id=None, sort=None):
if channel_id:
base_url = 'https://www.youtube.com/channel/' + channel_id
base_url = f'https://www.youtube.com/channel/{channel_id}'
# Build URL with sort parameter
# YouTube URL sort params: p=popular, dd=newest, lad=newest no shorts
# Note: 'da' (oldest) was removed by YouTube in January 2026
url = base_url + '/' + tab + '?pbj=1&view=0'
url = f'{base_url}/{tab}?pbj=1&view=0'
if sort:
# Map sort values to YouTube's URL parameter values
sort_map = {'3': 'dd', '4': 'lad'}
url += '&sort=' + sort_map.get(sort, 'dd')
url += f'&sort={sort_map.get(sort, "dd")}'
return util.fetch_url(url, headers_desktop, debug_name='gen_channel_' + tab)
return util.fetch_url(url, headers_desktop, debug_name=f'gen_channel_{tab}')
playlist_sort_codes = {'2': "da", '3': "dd", '4': "lad"}
@@ -462,7 +462,7 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
if page_number == 1:
tasks = (
gevent.spawn(playlist.playlist_first_page,
'UU' + channel_id[2:],
f'UU{channel_id[2:]}',
report_text='Retrieved channel videos'),
gevent.spawn(get_metadata, channel_id),
)
@@ -477,11 +477,11 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
set_cached_number_of_videos(channel_id, number_of_videos)
else:
tasks = (
gevent.spawn(playlist.get_videos, 'UU' + channel_id[2:],
gevent.spawn(playlist.get_videos, f'UU{channel_id[2:]}',
page_number, include_shorts=True),
gevent.spawn(get_metadata, channel_id),
gevent.spawn(get_number_of_videos_channel, channel_id),
gevent.spawn(playlist.playlist_first_page, 'UU' + channel_id[2:],
gevent.spawn(playlist.playlist_first_page, f'UU{channel_id[2:]}',
report_text='Retrieved channel video count'),
)
gevent.joinall(tasks)
@@ -567,10 +567,10 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
elif tab == 'search' and channel_id:
polymer_json = get_channel_search_json(channel_id, query, page_number)
elif tab == 'search':
url = base_url + '/search?pbj=1&query=' + urllib.parse.quote(query, safe='')
url = f'{base_url}/search?pbj=1&query={urllib.parse.quote(query, safe="")}'
polymer_json = util.fetch_url(url, headers_desktop, debug_name='gen_channel_search')
elif tab != 'videos':
flask.abort(404, 'Unknown channel tab: ' + tab)
flask.abort(404, f'Unknown channel tab: {tab}')
if polymer_json is not None and info is None:
info = yt_data_extract.extract_channel_info(
@@ -583,7 +583,7 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
return flask.render_template('error.html', error_message=info['error'])
if channel_id:
info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id
info['channel_url'] = f'https://www.youtube.com/channel/{channel_id}'
info['channel_id'] = channel_id
else:
channel_id = info['channel_id']
@@ -663,22 +663,22 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
@yt_app.route('/channel/<channel_id>/')
@yt_app.route('/channel/<channel_id>/<tab>')
def get_channel_page(channel_id, tab='videos'):
return get_channel_page_general_url('https://www.youtube.com/channel/' + channel_id, tab, request, channel_id)
return get_channel_page_general_url(f'https://www.youtube.com/channel/{channel_id}', tab, request, channel_id)
@yt_app.route('/user/<username>/')
@yt_app.route('/user/<username>/<tab>')
def get_user_page(username, tab='videos'):
return get_channel_page_general_url('https://www.youtube.com/user/' + username, tab, request)
return get_channel_page_general_url(f'https://www.youtube.com/user/{username}', tab, request)
@yt_app.route('/c/<custom>/')
@yt_app.route('/c/<custom>/<tab>')
def get_custom_c_page(custom, tab='videos'):
return get_channel_page_general_url('https://www.youtube.com/c/' + custom, tab, request)
return get_channel_page_general_url(f'https://www.youtube.com/c/{custom}', tab, request)
@yt_app.route('/<custom>')
@yt_app.route('/<custom>/<tab>')
def get_toplevel_custom_page(custom, tab='videos'):
return get_channel_page_general_url('https://www.youtube.com/' + custom, tab, request)
return get_channel_page_general_url(f'https://www.youtube.com/{custom}', tab, request)

View File

@@ -104,20 +104,19 @@ def post_process_comments_info(comments_info):
comment['replies_url'] = None
comment['replies_url'] = concat_or_none(
util.URL_ORIGIN,
'/comments?replies=1&ctoken=' + ctoken)
f'/comments?replies=1&ctoken={ctoken}')
if reply_count == 0:
comment['view_replies_text'] = 'Reply'
elif reply_count == 1:
comment['view_replies_text'] = '1 reply'
else:
comment['view_replies_text'] = str(reply_count) + ' replies'
comment['view_replies_text'] = f'{reply_count} replies'
if comment['approx_like_count'] == '1':
comment['likes_text'] = '1 like'
else:
comment['likes_text'] = (str(comment['approx_like_count'])
+ ' likes')
comment['likes_text'] = f"{comment['approx_like_count']} likes"
comments_info['include_avatars'] = settings.enable_comment_avatars
if comments_info['ctoken']:
@@ -163,14 +162,13 @@ def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''):
comments_info = {'error': None}
try:
other_sort_url = (
util.URL_ORIGIN + '/comments?ctoken='
+ make_comment_ctoken(video_id, sort=1 - sort, lc=lc)
f"{util.URL_ORIGIN}/comments?ctoken="
f"{make_comment_ctoken(video_id, sort=1 - sort, lc=lc)}"
)
other_sort_text = 'Sort by ' + ('newest' if sort == 0 else 'top')
other_sort_text = f'Sort by {"newest" if sort == 0 else "top"}'
this_sort_url = (util.URL_ORIGIN
+ '/comments?ctoken='
+ make_comment_ctoken(video_id, sort=sort, lc=lc))
this_sort_url = (f"{util.URL_ORIGIN}/comments?ctoken="
f"{make_comment_ctoken(video_id, sort=sort, lc=lc)}")
comments_info['comment_links'] = [
(other_sort_text, other_sort_url),
@@ -188,17 +186,16 @@ def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''):
if e.code == '429' and settings.route_tor:
comments_info['error'] = 'Error: YouTube blocked the request because the Tor exit node is overutilized.'
if e.error_message:
comments_info['error'] += '\n\n' + e.error_message
comments_info['error'] += '\n\nExit node IP address: %s' % e.ip
comments_info['error'] += f'\n\n{e.error_message}'
comments_info['error'] += f'\n\nExit node IP address: {e.ip}'
else:
comments_info['error'] = 'YouTube blocked the request. Error: %s' % str(e)
comments_info['error'] = f'YouTube blocked the request. Error: {e}'
except Exception as e:
comments_info['error'] = 'YouTube blocked the request. Error: %s' % str(e)
comments_info['error'] = f'YouTube blocked the request. Error: {e}'
if comments_info.get('error'):
print('Error retrieving comments for ' + str(video_id) + ':\n' +
comments_info['error'])
print(f'Error retrieving comments for {video_id}:\n{comments_info["error"]}')
return comments_info
@@ -218,12 +215,10 @@ def get_comments_page():
other_sort_url = None
else:
other_sort_url = (
util.URL_ORIGIN
+ '/comments?ctoken='
+ make_comment_ctoken(comments_info['video_id'],
sort=1-comments_info['sort'])
f'{util.URL_ORIGIN}/comments?ctoken='
f'{make_comment_ctoken(comments_info["video_id"], sort=1-comments_info["sort"])}'
)
other_sort_text = 'Sort by ' + ('newest' if comments_info['sort'] == 0 else 'top')
other_sort_text = f'Sort by {"newest" if comments_info["sort"] == 0 else "top"}'
comments_info['comment_links'] = [(other_sort_text, other_sort_url)]
return flask.render_template(

View File

@@ -92,9 +92,7 @@ def add_extra_info_to_videos(videos, playlist_name):
util.add_extra_html_info(video)
if video['id'] + '.jpg' in thumbnails:
video['thumbnail'] = (
'/https://youtube.com/data/playlist_thumbnails/'
+ playlist_name
+ '/' + video['id'] + '.jpg')
f'/https://youtube.com/data/playlist_thumbnails/{playlist_name}/{video["id"]}.jpg')
else:
video['thumbnail'] = util.get_thumbnail_url(video['id'])
missing_thumbnails.append(video['id'])

View File

@@ -20,7 +20,7 @@ def playlist_ctoken(playlist_id, offset, include_shorts=True):
continuation_info = proto.string(3, proto.percent_b64encode(offset))
playlist_id = proto.string(2, 'VL' + playlist_id)
playlist_id = proto.string(2, f'VL{playlist_id}')
pointless_nest = proto.string(80226972, playlist_id + continuation_info)
return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
@@ -30,7 +30,7 @@ def playlist_first_page(playlist_id, report_text="Retrieved playlist",
use_mobile=False):
# Use innertube API (pbj=1 no longer works for many playlists)
key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
url = f'https://www.youtube.com/youtubei/v1/browse?key={key}'
data = {
'context': {
@@ -41,7 +41,7 @@ def playlist_first_page(playlist_id, report_text="Retrieved playlist",
'clientVersion': '2.20240327.00.00',
},
},
'browseId': 'VL' + playlist_id,
'browseId': f'VL{playlist_id}',
}
content_type_header = (('Content-Type', 'application/json'),)
@@ -58,7 +58,7 @@ def get_videos(playlist_id, page, include_shorts=True, use_mobile=False,
page_size = 100
key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
url = f'https://www.youtube.com/youtubei/v1/browse?key={key}'
ctoken = playlist_ctoken(playlist_id, (int(page)-1)*page_size,
include_shorts=include_shorts)
@@ -97,7 +97,7 @@ def get_playlist_page():
if playlist_id.startswith('RD'):
first_video_id = playlist_id[2:] # video ID after 'RD' prefix
return flask.redirect(
util.URL_ORIGIN + '/watch?v=' + first_video_id + '&list=' + playlist_id,
f'{util.URL_ORIGIN}/watch?v={first_video_id}&list={playlist_id}',
302
)
@@ -132,9 +132,9 @@ def get_playlist_page():
if 'id' in item and not item.get('thumbnail'):
item['thumbnail'] = f"{settings.img_prefix}https://i.ytimg.com/vi/{item['id']}/hqdefault.jpg"
item['url'] += '&list=' + playlist_id
item['url'] += f'&list={playlist_id}'
if item['index']:
item['url'] += '&index=' + str(item['index'])
item['url'] += f'&index={item["index"]}'
video_count = yt_data_extract.deep_get(info, 'metadata', 'video_count')
if video_count is None:

View File

@@ -76,7 +76,7 @@ def read_varint(data):
except IndexError:
if i == 0:
raise EOFError()
raise Exception('Unterminated varint starting at ' + str(data.tell() - i))
raise Exception(f'Unterminated varint starting at {data.tell() - i}')
result |= (byte & 127) << 7*i
if not byte & 128:
break
@@ -118,7 +118,7 @@ def read_protobuf(data):
elif wire_type == 5:
value = data.read(4)
else:
raise Exception("Unknown wire type: " + str(wire_type) + " at position " + str(data.tell()))
raise Exception(f"Unknown wire type: {wire_type} at position {data.tell()}")
yield (wire_type, field_number, value)
@@ -170,8 +170,7 @@ def _make_protobuf(data):
elif field[0] == 2:
result += string(field[1], _make_protobuf(field[2]))
else:
raise NotImplementedError('Wire type ' + str(field[0])
+ ' not implemented')
raise NotImplementedError(f'Wire type {field[0]} not implemented')
return result
return data
@@ -218,4 +217,4 @@ def b64_to_bytes(data):
if isinstance(data, bytes):
data = data.decode('ascii')
data = data.replace("%3D", "=")
return base64.urlsafe_b64decode(data + "="*((4 - len(data) % 4) % 4))
return base64.urlsafe_b64decode(f'{data}={"=" * ((4 - len(data) % 4) % 4)}')

View File

@@ -179,7 +179,7 @@ def read_varint(data):
except IndexError:
if i == 0:
raise EOFError()
raise Exception('Unterminated varint starting at ' + str(data.tell() - i))
raise Exception(f'Unterminated varint starting at {data.tell() - i}')
result |= (byte & 127) << 7*i
if not byte & 128:
break
@@ -235,8 +235,7 @@ def _make_protobuf(data):
elif field[0] == 2:
result += string(field[1], _make_protobuf(field[2]))
else:
raise NotImplementedError('Wire type ' + str(field[0])
+ ' not implemented')
raise NotImplementedError(f'Wire type {field[0]} not implemented')
return result
return data
@@ -286,7 +285,7 @@ def b64_to_bytes(data):
if isinstance(data, bytes):
data = data.decode('ascii')
data = data.replace("%3D", "=")
return base64.urlsafe_b64decode(data + "="*((4 - len(data) % 4) % 4))
return base64.urlsafe_b64decode(f'{data}={"=" * ((4 - len(data) % 4) % 4)}')
# --------------------------------------------------------------------
@@ -344,7 +343,7 @@ fromhex = bytes.fromhex
def aligned_ascii(data):
return ' '.join(' ' + chr(n) if n in range(32, 128) else ' _' for n in data)
return ' '.join(f' {chr(n)}' if n in range(32, 128) else ' _' for n in data)
def parse_protobuf(data, mutable=False, spec=()):
@@ -372,7 +371,7 @@ def parse_protobuf(data, mutable=False, spec=()):
elif wire_type == 5:
value = data.read(4)
else:
raise Exception("Unknown wire type: " + str(wire_type) + ", Tag: " + bytes_to_hex(varint_encode(tag)) + ", at position " + str(data.tell()))
raise Exception(f"Unknown wire type: {wire_type}, Tag: {bytes_to_hex(varint_encode(tag))}, at position {data.tell()}")
if mutable:
yield [wire_type, field_number, value]
else:
@@ -453,7 +452,7 @@ def b32decode(s, casefold=False, map01=None):
if map01 is not None:
map01 = _bytes_from_decode_data(map01)
assert len(map01) == 1, repr(map01)
s = s.translate(bytes.maketrans(b'01', b'O' + map01))
s = s.translate(bytes.maketrans(b'01', f'O{map01.decode("ascii")}'))
if casefold:
s = s.upper()
# Strip off pad characters from the right. We need to count the pad
@@ -494,7 +493,7 @@ def b32decode(s, casefold=False, map01=None):
def dec32(data):
if isinstance(data, bytes):
data = data.decode('ascii')
return b32decode(data + "="*((8 - len(data)%8)%8))
return b32decode(f'{data}={"=" * ((8 - len(data)%8)%8)}')
_patterns = [
@@ -563,9 +562,7 @@ def _pp(obj, indent): # not my best work
if len(obj) == 3: # (wire_type, field_number, data)
return obj.__repr__()
else: # (base64, [...])
return ('(' + obj[0].__repr__() + ',\n'
+ indent_lines(_pp(obj[1], indent), indent) + '\n'
+ ')')
return f"({obj[0].__repr__()},\n{indent_lines(_pp(obj[1], indent), indent)}\n)"
elif isinstance(obj, list):
# [wire_type, field_number, data]
if (len(obj) == 3
@@ -577,13 +574,11 @@ def _pp(obj, indent): # not my best work
elif (len(obj) == 3
and not any(isinstance(x, (list, tuple)) for x in obj[0:2])
):
return ('[' + obj[0].__repr__() + ', ' + obj[1].__repr__() + ',\n'
+ indent_lines(_pp(obj[2], indent), indent) + '\n'
+ ']')
return f"[{obj[0].__repr__()}, {obj[1].__repr__()},\n{indent_lines(_pp(obj[2], indent), indent)}\n]"
else:
s = '[\n'
for x in obj:
s += indent_lines(_pp(x, indent), indent) + ',\n'
s += f"{indent_lines(_pp(x, indent), indent)},\n"
s += ']'
return s
else:

View File

@@ -51,7 +51,7 @@ def get_search_json(query, page, autocorrect, sort, filters):
'X-YouTube-Client-Name': '1',
'X-YouTube-Client-Version': '2.20180418',
}
url += "&pbj=1&sp=" + page_number_to_sp_parameter(page, autocorrect, sort, filters).replace("=", "%3D")
url += f"&pbj=1&sp={page_number_to_sp_parameter(page, autocorrect, sort, filters).replace('=', '%3D')}"
content = util.fetch_url(url, headers=headers, report_text="Got search results", debug_name='search_results')
info = json.loads(content)
return info

View File

@@ -126,7 +126,7 @@ def delete_thumbnails(to_delete):
os.remove(os.path.join(thumbnails_directory, thumbnail))
existing_thumbnails.remove(video_id)
except Exception:
print('Failed to delete thumbnail: ' + thumbnail)
print(f'Failed to delete thumbnail: {thumbnail}')
traceback.print_exc()
@@ -184,7 +184,7 @@ def _get_videos(cursor, number_per_page, offset, tag=None):
'time_published': exact_timestamp(db_video[3]) if db_video[4] else posix_to_dumbed_down(db_video[3]),
'author': db_video[5],
'author_id': db_video[6],
'author_url': '/https://www.youtube.com/channel/' + db_video[6],
'author_url': f'/https://www.youtube.com/channel/{db_video[6]}',
})
return videos, pseudo_number_of_videos
@@ -304,9 +304,9 @@ def posix_to_dumbed_down(posix_time):
if delta >= unit_time:
quantifier = round(delta/unit_time)
if quantifier == 1:
return '1 ' + unit_name + ' ago'
return f'1 {unit_name} ago'
else:
return str(quantifier) + ' ' + unit_name + 's ago'
return f'{quantifier} {unit_name}s ago'
else:
raise Exception()
@@ -363,7 +363,7 @@ def autocheck_dispatcher():
time_until_earliest_job = earliest_job['next_check_time'] - time.time()
if time_until_earliest_job <= -5: # should not happen unless we're running extremely slow
print('ERROR: autocheck_dispatcher got job scheduled in the past, skipping and rescheduling: ' + earliest_job['channel_id'] + ', ' + earliest_job['channel_name'] + ', ' + str(earliest_job['next_check_time']))
print(f'ERROR: autocheck_dispatcher got job scheduled in the past, skipping and rescheduling: {earliest_job["channel_id"]}, {earliest_job["channel_name"]}, {earliest_job["next_check_time"]}')
next_check_time = time.time() + 3600*secrets.randbelow(60)/60
with_open_db(_schedule_checking, earliest_job['channel_id'], next_check_time)
autocheck_jobs[earliest_job_index]['next_check_time'] = next_check_time
@@ -451,7 +451,7 @@ def check_channels_if_necessary(channel_ids):
def _get_atoma_feed(channel_id):
url = 'https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id
url = f'https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}'
try:
return util.fetch_url(url).decode('utf-8')
except util.FetchError as e:
@@ -485,16 +485,15 @@ def _get_channel_videos_first_page(channel_id, channel_status_name):
return channel_info
except util.FetchError as e:
if e.code == '429' and settings.route_tor:
error_message = ('Error checking channel ' + channel_status_name
+ ': YouTube blocked the request because the'
+ ' Tor exit node is overutilized. Try getting a new exit node'
+ ' by using the New Identity button in the Tor Browser.')
error_message = (f'Error checking channel {channel_status_name}: '
f'YouTube blocked the request because the Tor exit node is overutilized. '
f'Try getting a new exit node by using the New Identity button in the Tor Browser.')
if e.ip:
error_message += ' Exit node IP address: ' + e.ip
error_message += f' Exit node IP address: {e.ip}'
print(error_message)
return None
elif e.code == '502':
print('Error checking channel', channel_status_name + ':', str(e))
print(f'Error checking channel {channel_status_name}: {e}')
return None
raise
@@ -505,7 +504,7 @@ def _get_upstream_videos(channel_id):
except KeyError:
channel_status_name = channel_id
print("Checking channel: " + channel_status_name)
print(f"Checking channel: {channel_status_name}")
tasks = (
# channel page, need for video duration
@@ -550,15 +549,15 @@ def _get_upstream_videos(channel_id):
times_published[video_id_element.text] = time_published
except ValueError:
print('Failed to read atoma feed for ' + channel_status_name)
print(f'Failed to read atoma feed for {channel_status_name}')
traceback.print_exc()
except defusedxml.ElementTree.ParseError:
print('Failed to read atoma feed for ' + channel_status_name)
print(f'Failed to read atoma feed for {channel_status_name}')
if channel_info is None: # there was an error
return
if channel_info['error']:
print('Error checking channel ' + channel_status_name + ': ' + channel_info['error'])
print(f'Error checking channel {channel_status_name}: {channel_info["error"]}')
return
videos = channel_info['items']
@@ -1023,7 +1022,7 @@ def get_subscriptions_page():
tag = request.args.get('tag', None)
videos, number_of_videos_in_db = _get_videos(cursor, 60, (page - 1)*60, tag)
for video in videos:
video['thumbnail'] = util.URL_ORIGIN + '/data/subscription_thumbnails/' + video['id'] + '.jpg'
video['thumbnail'] = f'{util.URL_ORIGIN}/data/subscription_thumbnails/{video["id"]}.jpg'
video['type'] = 'video'
video['item_size'] = 'small'
util.add_extra_html_info(video)
@@ -1033,7 +1032,7 @@ def get_subscriptions_page():
subscription_list = []
for channel_name, channel_id, muted in _get_subscribed_channels(cursor):
subscription_list.append({
'channel_url': util.URL_ORIGIN + '/channel/' + channel_id,
'channel_url': f'{util.URL_ORIGIN}/channel/{channel_id}',
'channel_name': channel_name,
'channel_id': channel_id,
'muted': muted,
@@ -1109,17 +1108,17 @@ def serve_subscription_thumbnail(thumbnail):
for quality in ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'):
url = f"https://i.ytimg.com/vi/{video_id}/{quality}"
try:
image = util.fetch_url(url, report_text="Saved thumbnail: " + video_id)
image = util.fetch_url(url, report_text=f"Saved thumbnail: {video_id}")
break
except util.FetchError as e:
if '404' in str(e):
continue
print("Failed to download thumbnail for " + video_id + ": " + str(e))
print(f"Failed to download thumbnail for {video_id}: {e}")
flask.abort(500)
except urllib.error.HTTPError as e:
if e.code == 404:
continue
print("Failed to download thumbnail for " + video_id + ": " + str(e))
print(f"Failed to download thumbnail for {video_id}: {e}")
flask.abort(e.code)
if image is None:

View File

@@ -72,7 +72,7 @@ class TorManager:
def __init__(self):
self.old_tor_connection_pool = None
self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager(
'socks5h://127.0.0.1:' + str(settings.tor_port) + '/',
f'socks5h://127.0.0.1:{settings.tor_port}/',
cert_reqs='CERT_REQUIRED')
self.tor_pool_refresh_time = time.monotonic()
settings.add_setting_changed_hook(
@@ -92,7 +92,7 @@ class TorManager:
self.old_tor_connection_pool = self.tor_connection_pool
self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager(
'socks5h://127.0.0.1:' + str(settings.tor_port) + '/',
f'socks5h://127.0.0.1:{settings.tor_port}/',
cert_reqs='CERT_REQUIRED')
self.tor_pool_refresh_time = time.monotonic()
@@ -198,9 +198,9 @@ class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler):
class FetchError(Exception):
def __init__(self, code, reason='', ip=None, error_message=None):
if error_message:
string = code + ' ' + reason + ': ' + error_message
string = f"{code} {reason}: {error_message}"
else:
string = 'HTTP error during request: ' + code + ' ' + reason
string = f"HTTP error during request: {code} {reason}"
Exception.__init__(self, string)
self.code = code
self.reason = reason
@@ -294,14 +294,12 @@ def fetch_url_response(url, headers=(), timeout=15, data=None,
exception_cause = e.__context__.__context__
if (isinstance(exception_cause, socks.ProxyConnectionError)
and settings.route_tor):
msg = ('Failed to connect to Tor. Check that Tor is open and '
'that your internet connection is working.\n\n'
+ str(e))
msg = f'Failed to connect to Tor. Check that Tor is open and that your internet connection is working.\n\n{e}'
raise FetchError('502', reason='Bad Gateway',
error_message=msg)
elif isinstance(e.__context__,
urllib3.exceptions.NewConnectionError):
msg = 'Failed to establish a connection.\n\n' + str(e)
msg = f'Failed to establish a connection.\n\n{e}'
raise FetchError(
'502', reason='Bad Gateway',
error_message=msg)
@@ -391,7 +389,7 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None,
if error:
raise FetchError(
'429', reason=response.reason, ip=ip,
error_message='Automatic circuit change: ' + error)
error_message=f'Automatic circuit change: {error}')
continue # retry with new identity
# Check for client errors (400, 404) - don't retry these
@@ -467,10 +465,7 @@ def head(url, use_tor=False, report_text=None, max_redirects=10):
headers = {'User-Agent': 'Python-urllib'}
response = pool.request('HEAD', url, headers=headers, retries=retries)
if report_text:
print(
report_text,
' Latency:',
round(time.monotonic() - start_time, 3))
print(f'{report_text} Latency: {round(time.monotonic() - start_time, 3)}')
return response
mobile_user_agent = 'Mozilla/5.0 (Linux; Android 7.0; Redmi Note 4 Build/NRD90M) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Mobile Safari/537.36'
@@ -544,16 +539,16 @@ def download_thumbnail(save_directory, video_id):
for quality in ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'):
url = f'https://i.ytimg.com/vi/{video_id}/{quality}'
try:
thumbnail = fetch_url(url, report_text='Saved thumbnail: ' + video_id)
thumbnail = fetch_url(url, report_text=f'Saved thumbnail: {video_id}')
except FetchError as e:
if '404' in str(e):
continue
print('Failed to download thumbnail for ' + video_id + ': ' + str(e))
print(f'Failed to download thumbnail for {video_id}: {e}')
return False
except urllib.error.HTTPError as e:
if e.code == 404:
continue
print('Failed to download thumbnail for ' + video_id + ': ' + str(e))
print(f'Failed to download thumbnail for {video_id}: {e}')
return False
try:
with open(save_location, 'wb') as f:
@@ -563,7 +558,7 @@ def download_thumbnail(save_directory, video_id):
with open(save_location, 'wb') as f:
f.write(thumbnail)
return True
print('No thumbnail available for ' + video_id)
print(f'No thumbnail available for {video_id}')
return False
@@ -698,7 +693,7 @@ def prefix_urls(item):
def add_extra_html_info(item):
if item['type'] == 'video':
item['url'] = (URL_ORIGIN + '/watch?v=' + item['id']) if item.get('id') else None
item['url'] = f'{URL_ORIGIN}/watch?v={item["id"]}' if item.get('id') else None
video_info = {}
for key in ('id', 'title', 'author', 'duration', 'author_id'):
@@ -721,7 +716,7 @@ def add_extra_html_info(item):
item['url'] = concat_or_none(URL_ORIGIN, "/channel/", item['id'])
if item.get('author_id') and 'author_url' not in item:
item['author_url'] = URL_ORIGIN + '/channel/' + item['author_id']
item['author_url'] = f'{URL_ORIGIN}/channel/{item["author_id"]}'
def check_gevent_exceptions(*tasks):
@@ -967,7 +962,7 @@ def call_youtube_api(client, api, data):
user_agent = context['client'].get('userAgent') or mobile_user_agent
visitor_data = get_visitor_data()
url = 'https://' + host + '/youtubei/v1/' + api + '?key=' + key
url = f'https://{host}/youtubei/v1/{api}?key={key}'
if visitor_data:
context['client'].update({'visitorData': visitor_data})
data['context'] = context
@@ -978,8 +973,8 @@ def call_youtube_api(client, api, data):
headers = ( *headers, ('X-Goog-Visitor-Id', visitor_data ))
response = fetch_url(
url, data=data, headers=headers,
debug_name='youtubei_' + api + '_' + client,
report_text='Fetched ' + client + ' youtubei ' + api
debug_name=f'youtubei_{api}_{client}',
report_text=f'Fetched {client} youtubei {api}'
).decode('utf-8')
return response

View File

@@ -53,7 +53,7 @@ def get_video_sources(info, target_resolution):
if fmt['acodec'] and fmt['vcodec']:
if fmt.get('audio_track_is_default', True) is False:
continue
source = {'type': 'video/' + fmt['ext'],
source = {'type': f"video/{fmt['ext']}",
'quality_string': short_video_quality_string(fmt)}
source['quality_string'] += ' (integrated)'
source.update(fmt)
@@ -70,10 +70,10 @@ def get_video_sources(info, target_resolution):
if fmt['acodec'] and not fmt['vcodec'] and (fmt['audio_bitrate'] or fmt['bitrate']):
if fmt['bitrate']:
fmt['audio_bitrate'] = int(fmt['bitrate']/1000)
source = {'type': 'audio/' + fmt['ext'],
source = {'type': f"audio/{fmt['ext']}",
'quality_string': audio_quality_string(fmt)}
source.update(fmt)
source['mime_codec'] = source['type'] + '; codecs="' + source['acodec'] + '"'
source['mime_codec'] = f"{source['type']}; codecs=\"{source['acodec']}\""
tid = fmt.get('audio_track_id') or 'default'
if tid not in audio_by_track:
audio_by_track[tid] = {
@@ -85,11 +85,11 @@ def get_video_sources(info, target_resolution):
elif all(fmt[attr] for attr in ('vcodec', 'quality', 'width', 'fps', 'file_size')):
if codec_name(fmt['vcodec']) == 'unknown':
continue
source = {'type': 'video/' + fmt['ext'],
source = {'type': f"video/{fmt['ext']}",
'quality_string': short_video_quality_string(fmt)}
source.update(fmt)
source['mime_codec'] = source['type'] + '; codecs="' + source['vcodec'] + '"'
quality = str(fmt['quality']) + 'p' + str(fmt['fps'])
source['mime_codec'] = f"{source['type']}; codecs=\"{source['vcodec']}\""
quality = f"{fmt['quality']}p{fmt['fps']}"
video_only_sources.setdefault(quality, []).append(source)
audio_tracks = []
@@ -141,7 +141,7 @@ def get_video_sources(info, target_resolution):
def video_rank(src):
''' Sort by settings preference. Use file size as tiebreaker '''
setting_name = 'codec_rank_' + codec_name(src['vcodec'])
setting_name = f'codec_rank_{codec_name(src["vcodec"])}'
return (settings.current_settings_dict[setting_name],
src['file_size'])
pair_info['videos'].sort(key=video_rank)
@@ -183,7 +183,7 @@ def make_caption_src(info, lang, auto=False, trans_lang=None):
if auto:
label += ' (Automatic)'
if trans_lang:
label += ' -> ' + trans_lang
label += f' -> {trans_lang}'
# Try to use Android caption URL directly (no PO Token needed)
caption_url = None
@@ -204,7 +204,7 @@ def make_caption_src(info, lang, auto=False, trans_lang=None):
else:
caption_url += '&fmt=vtt'
if trans_lang:
caption_url += '&tlang=' + trans_lang
caption_url += f'&tlang={trans_lang}'
url = util.prefix_url(caption_url)
else:
# Fallback to old method
@@ -357,10 +357,10 @@ def decrypt_signatures(info, video_id):
player_name = info['player_name']
if player_name in decrypt_cache:
print('Using cached decryption function for: ' + player_name)
print(f'Using cached decryption function for: {player_name}')
info['decryption_function'] = decrypt_cache[player_name]
else:
base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text='Fetched player ' + player_name)
base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text=f'Fetched player {player_name}')
base_js = base_js.decode('utf-8')
err = yt_data_extract.extract_decryption_function(info, base_js)
if err:
@@ -387,11 +387,11 @@ def fetch_player_response(client, video_id):
def fetch_watch_page_info(video_id, playlist_id, index):
# bpctr=9999999999 will bypass are-you-sure dialogs for controversial
# videos
url = 'https://m.youtube.com/embed/' + video_id + '?bpctr=9999999999'
url = f'https://m.youtube.com/embed/{video_id}?bpctr=9999999999'
if playlist_id:
url += '&list=' + playlist_id
url += f'&list={playlist_id}'
if index:
url += '&index=' + index
url += f'&index={index}'
headers = (
('Accept', '*/*'),
@@ -493,7 +493,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
# Register HLS audio tracks for proxy access
added = 0
for lang, track in info['hls_audio_tracks'].items():
ck = video_id + '_' + lang
ck = f"{video_id}_{lang}"
from youtube.hls_cache import register_track
register_track(ck, track['hls_url'],
video_id=video_id, track_id=lang)
@@ -502,7 +502,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
'audio_track_id': lang,
'audio_track_name': track['name'],
'audio_track_is_default': track['is_default'],
'itag': 'hls_' + lang,
'itag': f'hls_{lang}',
'ext': 'mp4',
'audio_bitrate': 128,
'bitrate': 128000,
@@ -516,7 +516,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
'fps': None,
'init_range': {'start': 0, 'end': 0},
'index_range': {'start': 0, 'end': 0},
'url': '/ytl-api/audio-track?id=' + urllib.parse.quote(ck),
'url': f'/ytl-api/audio-track?id={urllib.parse.quote(ck)}',
's': None,
'sp': None,
'quality': None,
@@ -538,11 +538,11 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
# Register HLS manifest for proxying
if info['hls_manifest_url']:
ck = video_id + '_video'
ck = f"{video_id}_video"
from youtube.hls_cache import register_track
register_track(ck, info['hls_manifest_url'], video_id=video_id, track_id='video')
# Use proxy URL instead of direct Google Video URL
info['hls_manifest_url'] = '/ytl-api/hls-manifest?id=' + urllib.parse.quote(ck)
info['hls_manifest_url'] = f'/ytl-api/hls-manifest?id={urllib.parse.quote(ck)}'
# Fallback to 'ios' if no valid URLs are found
if not info.get('formats') or info.get('player_urls_missing'):
@@ -566,7 +566,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
if info.get('formats'):
decryption_error = decrypt_signatures(info, video_id)
if decryption_error:
info['playability_error'] = 'Error decrypting url signatures: ' + decryption_error
info['playability_error'] = f'Error decrypting url signatures: {decryption_error}'
# check if urls ready (non-live format) in former livestream
# urls not ready if all of them have no filesize
@@ -623,9 +623,9 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
def video_quality_string(format):
if format['vcodec']:
result = str(format['width'] or '?') + 'x' + str(format['height'] or '?')
result = f"{format['width'] or '?'}x{format['height'] or '?'}"
if format['fps']:
result += ' ' + str(format['fps']) + 'fps'
result += f" {format['fps']}fps"
return result
elif format['acodec']:
return 'audio only'
@@ -634,7 +634,7 @@ def video_quality_string(format):
def short_video_quality_string(fmt):
result = str(fmt['quality'] or '?') + 'p'
result = f"{fmt['quality'] or '?'}p"
if fmt['fps']:
result += str(fmt['fps'])
if fmt['vcodec'].startswith('av01'):
@@ -642,18 +642,18 @@ def short_video_quality_string(fmt):
elif fmt['vcodec'].startswith('avc'):
result += ' h264'
else:
result += ' ' + fmt['vcodec']
result += f" {fmt['vcodec']}"
return result
def audio_quality_string(fmt):
if fmt['acodec']:
if fmt['audio_bitrate']:
result = '%d' % fmt['audio_bitrate'] + 'k'
result = f"{int(fmt['audio_bitrate'])}k"
else:
result = '?k'
if fmt['audio_sample_rate']:
result += ' ' + '%.3G' % (fmt['audio_sample_rate']/1000) + 'kHz'
result += f" {'%.3G' % (fmt['audio_sample_rate']/1000)}kHz"
return result
elif fmt['vcodec']:
return 'video only'
@@ -737,9 +737,9 @@ def get_audio_track():
seg = line if line.startswith('http') else urljoin(playlist_base, line)
# Always use &seg= parameter, never &url= for segments
playlist_lines.append(
base_url + '/ytl-api/audio-track?id='
+ urllib.parse.quote(cache_key)
+ '&seg=' + urllib.parse.quote(seg, safe='')
f'{base_url}/ytl-api/audio-track?id='
f'{urllib.parse.quote(cache_key)}'
f'&seg={urllib.parse.quote(seg, safe="")}'
)
playlist = '\n'.join(playlist_lines)
@@ -797,9 +797,7 @@ def get_audio_track():
return url
if not url.startswith('http://') and not url.startswith('https://'):
url = urljoin(playlist_base, url)
return (base_url + '/ytl-api/audio-track?id='
+ urllib.parse.quote(cache_key)
+ '&seg=' + urllib.parse.quote(url, safe=''))
return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(url, safe="")}'
playlist_lines = []
for line in playlist.split('\n'):
@@ -812,7 +810,7 @@ def get_audio_track():
if line.startswith('#') and 'URI=' in line:
def rewrite_uri_attr(match):
uri = match.group(1)
return 'URI="' + proxy_url(uri) + '"'
return f'URI="{proxy_url(uri)}"'
line = _re.sub(r'URI="([^"]+)"', rewrite_uri_attr, line)
playlist_lines.append(line)
elif line.startswith('#'):
@@ -883,9 +881,7 @@ def get_audio_track():
if segment_url.startswith('/ytl-api/audio-track'):
return segment_url
base_url = request.url_root.rstrip('/')
return (base_url + '/ytl-api/audio-track?id='
+ urllib.parse.quote(cache_key)
+ '&seg=' + urllib.parse.quote(segment_url))
return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(segment_url)}'
playlist_lines = []
for line in playlist.split('\n'):
@@ -949,14 +945,10 @@ def get_hls_manifest():
if is_audio_track:
# Audio track playlist - proxy through audio-track endpoint
return (base_url + '/ytl-api/audio-track?id='
+ urllib.parse.quote(cache_key)
+ '&url=' + urllib.parse.quote(url, safe=''))
return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&url={urllib.parse.quote(url, safe="")}'
else:
# Video segment or variant playlist - proxy through audio-track endpoint
return (base_url + '/ytl-api/audio-track?id='
+ urllib.parse.quote(cache_key)
+ '&seg=' + urllib.parse.quote(url, safe=''))
return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(url, safe="")}'
# Parse and rewrite the manifest
manifest_lines = []
@@ -974,7 +966,7 @@ def get_hls_manifest():
nonlocal rewritten_count
uri = match.group(1)
rewritten_count += 1
return 'URI="' + rewrite_url(uri, is_audio_track=True) + '"'
return f'URI="{rewrite_url(uri, is_audio_track=True)}"'
line = _re.sub(r'URI="([^"]+)"', rewrite_media_uri, line)
manifest_lines.append(line)
elif line.startswith('#'):
@@ -1053,7 +1045,7 @@ def get_storyboard_vtt():
ts = 0 # current timestamp
for i in range(storyboard.storyboard_count):
url = '/' + storyboard.url.replace("$M", str(i))
url = f'/{storyboard.url.replace("$M", str(i))}'
interval = storyboard.interval
w, h = storyboard.width, storyboard.height
w_cnt, h_cnt = storyboard.width_cnt, storyboard.height_cnt
@@ -1078,7 +1070,7 @@ def get_watch_page(video_id=None):
if not video_id:
return flask.render_template('error.html', error_message='Missing video id'), 404
if len(video_id) < 11:
return flask.render_template('error.html', error_message='Incomplete video id (too short): ' + video_id), 404
return flask.render_template('error.html', error_message=f'Incomplete video id (too short): {video_id}'), 404
time_start_str = request.args.get('t', '0s')
time_start = 0
@@ -1141,9 +1133,9 @@ def get_watch_page(video_id=None):
util.prefix_urls(item)
util.add_extra_html_info(item)
if playlist_id:
item['url'] += '&list=' + playlist_id
item['url'] += f'&list={playlist_id}'
if item['index']:
item['url'] += '&index=' + str(item['index'])
item['url'] += f'&index={item["index"]}'
info['playlist']['author_url'] = util.prefix_url(
info['playlist']['author_url'])
if settings.img_prefix:
@@ -1159,16 +1151,16 @@ def get_watch_page(video_id=None):
filename = title
ext = fmt.get('ext')
if ext:
filename += '.' + ext
filename += f'.{ext}'
fmt['url'] = fmt['url'].replace(
'/videoplayback',
'/videoplayback/name/' + filename)
f'/videoplayback/name/{filename}')
download_formats = []
for format in (info['formats'] + info['hls_formats']):
if format['acodec'] and format['vcodec']:
codecs_string = format['acodec'] + ', ' + format['vcodec']
codecs_string = f"{format['acodec']}, {format['vcodec']}"
else:
codecs_string = format['acodec'] or format['vcodec'] or '?'
download_formats.append({
@@ -1247,12 +1239,9 @@ def get_watch_page(video_id=None):
for source in subtitle_sources:
best_caption_parse = urllib.parse.urlparse(
source['url'].lstrip('/'))
transcript_url = (util.URL_ORIGIN
+ '/watch/transcript'
+ best_caption_parse.path
+ '?' + best_caption_parse.query)
transcript_url = f'{util.URL_ORIGIN}/watch/transcript{best_caption_parse.path}?{best_caption_parse.query}'
other_downloads.append({
'label': 'Video Transcript: ' + source['label'],
'label': f'Video Transcript: {source["label"]}',
'ext': 'txt',
'url': transcript_url
})
@@ -1263,7 +1252,7 @@ def get_watch_page(video_id=None):
template_name = 'watch.html'
return flask.render_template(template_name,
header_playlist_names = local_playlist.get_playlist_names(),
uploader_channel_url = ('/' + info['author_url']) if info['author_url'] else '',
uploader_channel_url = f'/{info["author_url"]}' if info['author_url'] else '',
time_published = info['time_published'],
view_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("view_count", None)),
like_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("like_count", None)),
@@ -1305,10 +1294,10 @@ def get_watch_page(video_id=None):
ip_address = info['ip_address'] if settings.route_tor else None,
invidious_used = info['invidious_used'],
invidious_reload_button = info['invidious_reload_button'],
video_url = util.URL_ORIGIN + '/watch?v=' + video_id,
video_url = f'{util.URL_ORIGIN}/watch?v={video_id}',
video_id = video_id,
storyboard_url = (util.URL_ORIGIN + '/ytl-api/storyboard.vtt?' +
urlencode([('spec_url', info['storyboard_spec_url'])])
storyboard_url = (f'{util.URL_ORIGIN}/ytl-api/storyboard.vtt?'
f'{urlencode([("spec_url", info["storyboard_spec_url"])])}'
if info['storyboard_spec_url'] else None),
js_data = {
@@ -1335,7 +1324,7 @@ def get_watch_page(video_id=None):
@yt_app.route('/api/<path:dummy>')
def get_captions(dummy):
url = 'https://www.youtube.com' + request.full_path
url = f'https://www.youtube.com{request.full_path}'
try:
result = util.fetch_url(url, headers=util.mobile_ua)
result = result.replace(b"align:start position:0%", b"")
@@ -1350,12 +1339,9 @@ inner_timestamp_removal_reg = re.compile(r'<[^>]+>')
@yt_app.route('/watch/transcript/<path:caption_path>')
def get_transcript(caption_path):
try:
captions = util.fetch_url('https://www.youtube.com/'
+ caption_path
+ '?' + request.environ['QUERY_STRING']).decode('utf-8')
captions = util.fetch_url(f'https://www.youtube.com/{caption_path}?{request.environ["QUERY_STRING"]}').decode('utf-8')
except util.FetchError as e:
msg = ('Error retrieving captions: ' + str(e) + '\n\n'
+ 'The caption url may have expired.')
msg = f'Error retrieving captions: {e}\n\nThe caption url may have expired.'
print(msg)
return flask.Response(
msg,
@@ -1403,7 +1389,7 @@ def get_transcript(caption_path):
result = ''
for seg in segments:
if seg['text'] != ' ':
result += seg['begin'] + ' ' + seg['text'] + '\r\n'
result += f"{seg['begin']} {seg['text']}\r\n"
return flask.Response(result.encode('utf-8'),
mimetype='text/plain;charset=UTF-8')

View File

@@ -212,7 +212,7 @@ def extract_date(date_text):
month, day, year = parts[-3:]
month = MONTH_ABBREVIATIONS.get(month[0:3]) # slicing in case they start writing out the full month name
if month and (re.fullmatch(r'\d\d?', day) is not None) and (re.fullmatch(r'\d{4}', year) is not None):
return year + '-' + month + '-' + day
return f'{year}-{month}-{day}'
return None
def check_missing_keys(object, *key_sequences):
@@ -222,7 +222,7 @@ def check_missing_keys(object, *key_sequences):
for key in key_sequence:
_object = _object[key]
except (KeyError, IndexError, TypeError):
return 'Could not find ' + key
return f'Could not find {key}'
return None
@@ -467,7 +467,7 @@ def extract_item_info(item, additional_info={}):
['shortBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId']
))
info['author_url'] = ('https://www.youtube.com/channel/' + info['author_id']) if info['author_id'] else None
info['author_url'] = f'https://www.youtube.com/channel/{info["author_id"]}' if info['author_id'] else None
info['description'] = extract_formatted_text(multi_deep_get(
item,
['descriptionText'], ['descriptionSnippet'],

View File

@@ -305,7 +305,7 @@ def extract_playlist_metadata(polymer_json):
metadata['description'] = desc
if metadata['author_id']:
metadata['author_url'] = 'https://www.youtube.com/channel/' + metadata['author_id']
metadata['author_url'] = f'https://www.youtube.com/channel/{metadata["author_id"]}'
if metadata['first_video_id'] is None:
metadata['thumbnail'] = None

View File

@@ -650,9 +650,9 @@ def _extract_playability_error(info, player_response, error_prefix=''):
)
if playability_status not in (None, 'OK'):
info['playability_error'] = error_prefix + playability_reason
info['playability_error'] = f'{error_prefix}{playability_reason}'
elif not info['playability_error']: # do not override
info['playability_error'] = error_prefix + 'Unknown playability error'
info['playability_error'] = f'{error_prefix}Unknown playability error'
SUBTITLE_FORMATS = ('srv1', 'srv2', 'srv3', 'ttml', 'vtt')
def extract_watch_info(polymer_json):
@@ -726,7 +726,7 @@ def extract_watch_info(polymer_json):
# Store the full URL from the player response (includes valid tokens)
if base_url:
normalized = normalize_url(base_url) if base_url.startswith('/') or not base_url.startswith('http') else base_url
info['_caption_track_urls'][lang_code + ('_asr' if caption_track.get('kind') == 'asr' else '')] = normalized
info['_caption_track_urls'][f'{lang_code}{"_asr" if caption_track.get("kind") == "asr" else ""}'] = normalized
lang_name = deep_get(urllib.parse.parse_qs(urllib.parse.urlparse(base_url).query), 'name', 0)
if lang_name:
info['_manual_caption_language_names'][lang_code] = lang_name
@@ -806,7 +806,7 @@ def extract_watch_info(polymer_json):
info['allowed_countries'] = mf.get('availableCountries', [])
# other stuff
info['author_url'] = 'https://www.youtube.com/channel/' + info['author_id'] if info['author_id'] else None
info['author_url'] = f'https://www.youtube.com/channel/{info["author_id"]}' if info['author_id'] else None
info['storyboard_spec_url'] = deep_get(player_response, 'storyboards', 'playerStoryboardSpecRenderer', 'spec')
return info
@@ -912,12 +912,12 @@ def get_caption_url(info, language, format, automatic=False, translation_languag
url = info['_captions_base_url']
if not url:
return None
url += '&lang=' + language
url += '&fmt=' + format
url += f'&lang={language}'
url += f'&fmt={format}'
if automatic:
url += '&kind=asr'
elif language in info['_manual_caption_language_names']:
url += '&name=' + urllib.parse.quote(info['_manual_caption_language_names'][language], safe='')
url += f'&name={urllib.parse.quote(info["_manual_caption_language_names"][language], safe="")}'
if translation_language:
url += '&tlang=' + translation_language
@@ -964,7 +964,7 @@ def extract_decryption_function(info, base_js):
return 'Could not find var_name'
var_name = var_with_operation_match.group(1)
var_body_match = re.search(r'var ' + re.escape(var_name) + r'=\{(.*?)\};', base_js, flags=re.DOTALL)
var_body_match = re.search(rf'var {re.escape(var_name)}=\{{(.*?)\}};', base_js, flags=re.DOTALL)
if var_body_match is None:
return 'Could not find var_body'
@@ -988,7 +988,7 @@ def extract_decryption_function(info, base_js):
elif op_body.startswith('var c=a[0]'):
operation_definitions[op_name] = 2
else:
return 'Unknown op_body: ' + op_body
return f'Unknown op_body: {op_body}'
decryption_function = []
for op_with_arg in function_body:
@@ -997,7 +997,7 @@ def extract_decryption_function(info, base_js):
return 'Could not parse operation with arg'
op_name = match.group(2).strip('[].')
if op_name not in operation_definitions:
return 'Unknown op_name: ' + str(op_name)
return f'Unknown op_name: {op_name}'
op_argument = match.group(3)
decryption_function.append([operation_definitions[op_name], int(op_argument)])
@@ -1028,5 +1028,5 @@ def decrypt_signatures(info):
_operation_2(a, argument)
signature = ''.join(a)
format['url'] += '&' + format['sp'] + '=' + signature
format['url'] += f'&{format["sp"]}={signature}'
return False