diff --git a/generate_release.py b/generate_release.py index 5ee0a51..76c979d 100644 --- a/generate_release.py +++ b/generate_release.py @@ -33,7 +33,7 @@ def check_subp(x): raise Exception('Got nonzero exit code from command') def log(line): - print('[generate_release.py] ' + line) + print(f'[generate_release.py] {line}') # https://stackoverflow.com/questions/7833715/python-deleting-certain-file-extensions def remove_files_with_extensions(path, extensions): @@ -43,23 +43,23 @@ def remove_files_with_extensions(path, extensions): os.remove(os.path.join(root, file)) def download_if_not_exists(file_name, url, sha256=None): - if not os.path.exists('./' + file_name): + if not os.path.exists(f'./{file_name}'): # Reject non-https URLs so a mistaken constant cannot cause a # plaintext download (bandit B310 hardening). if not url.startswith('https://'): - raise Exception('Refusing to download over non-https URL: ' + url) - log('Downloading ' + file_name + '..') + raise Exception(f'Refusing to download over non-https URL: {url}') + log(f'Downloading {file_name}..') data = urllib.request.urlopen(url).read() - log('Finished downloading ' + file_name) - with open('./' + file_name, 'wb') as f: + log(f'Finished downloading {file_name}') + with open(f'./{file_name}', 'wb') as f: f.write(data) if sha256: digest = hashlib.sha256(data).hexdigest() if digest != sha256: - log('Error: ' + file_name + ' has wrong hash: ' + digest) + log(f'Error: {file_name} has wrong hash: {digest}') sys.exit(1) else: - log('Using existing ' + file_name) + log(f'Using existing {file_name}') def wine_run_shell(command): # Keep argv-style invocation (no shell) to avoid command injection. 
@@ -120,7 +120,7 @@ if len(os.listdir('./yt-local')) == 0: # ----------- Generate embedded python distribution ----------- os.environ['PYTHONDONTWRITEBYTECODE'] = '1' # *.pyc files double the size of the distribution get_pip_url = 'https://bootstrap.pypa.io/get-pip.py' -latest_dist_url = 'https://www.python.org/ftp/python/' + latest_version + '/python-' + latest_version +latest_dist_url = f'https://www.python.org/ftp/python/{latest_version}/python-{latest_version}' if bitness == '32': latest_dist_url += '-embed-win32.zip' else: @@ -142,7 +142,7 @@ else: download_if_not_exists('get-pip.py', get_pip_url) -python_dist_name = 'python-dist-' + latest_version + '-' + bitness + '.zip' +python_dist_name = f'python-dist-{latest_version}-{bitness}.zip' download_if_not_exists(python_dist_name, latest_dist_url) download_if_not_exists(visual_c_name, @@ -203,7 +203,7 @@ and replaced with a .pth. Isolated mode will have to be specified manually. log('Removing ._pth') major_release = latest_version.split('.')[1] -os.remove(r'./python/python3' + major_release + '._pth') +os.remove(rf'./python/python3{major_release}._pth') log('Adding path_fixes.pth') with open(r'./python/path_fixes.pth', 'w', encoding='utf-8') as f: @@ -214,7 +214,7 @@ with open(r'./python/path_fixes.pth', 'w', encoding='utf-8') as f: # Need to add the directory where packages are installed, # and the parent directory (which is where the yt-local files are) major_release = latest_version.split('.')[1] -with open('./python/python3' + major_release + '._pth', 'a', encoding='utf-8') as f: +with open(rf'./python/python3{major_release}._pth', 'a', encoding='utf-8') as f: f.write('.\\Lib\\site-packages\n') f.write('..\n')''' @@ -255,10 +255,10 @@ log('Copying python distribution into release folder') shutil.copytree(r'./python', r'./yt-local/python') # ----------- Create release zip ----------- -output_filename = 'yt-local-' + release_tag + '-' + suffix + '.zip' -if os.path.exists('./' + output_filename): 
+output_filename = f'yt-local-{release_tag}-{suffix}.zip' +if os.path.exists(f'./{output_filename}'): log('Removing previous zipped release') - os.remove('./' + output_filename) + os.remove(f'./{output_filename}') log('Zipping release') check_subp(subprocess.run(['7z', '-mx=9', 'a', output_filename, './yt-local'])) diff --git a/server.py b/server.py index 36ddae5..7f0100c 100644 --- a/server.py +++ b/server.py @@ -32,9 +32,9 @@ def youtu_be(env, start_response): id = env['PATH_INFO'][1:] env['PATH_INFO'] = '/watch' if not env['QUERY_STRING']: - env['QUERY_STRING'] = 'v=' + id + env['QUERY_STRING'] = f'v={id}' else: - env['QUERY_STRING'] += '&v=' + id + env['QUERY_STRING'] += f'&v={id}' yield from yt_app(env, start_response) @@ -64,12 +64,12 @@ def proxy_site(env, start_response, video=False): if 'HTTP_RANGE' in env: send_headers['Range'] = env['HTTP_RANGE'] - url = "https://" + env['SERVER_NAME'] + env['PATH_INFO'] + url = f"https://{env['SERVER_NAME']}{env['PATH_INFO']}" # remove /name portion if video and '/videoplayback/name/' in url: url = url[0:url.rfind('/name/')] if env['QUERY_STRING']: - url += '?' + env['QUERY_STRING'] + url += f'?{env["QUERY_STRING"]}' try_num = 1 first_attempt = True @@ -96,7 +96,7 @@ def proxy_site(env, start_response, video=False): +[('Access-Control-Allow-Origin', '*')]) if first_attempt: - start_response(str(response.status) + ' ' + response.reason, + start_response(f"{response.status} {response.reason}", response_headers) content_length = int(dict(response_headers).get('Content-Length', 0)) @@ -136,9 +136,8 @@ def proxy_site(env, start_response, video=False): fail_byte = start + total_received send_headers['Range'] = 'bytes=%d-%d' % (fail_byte, end) print( - 'Warning: YouTube closed the connection before byte', - str(fail_byte) + '.', 'Expected', start+content_length, - 'bytes.' + f'Warning: YouTube closed the connection before byte {fail_byte}. ' + f'Expected {start+content_length} bytes.' 
) retry = True @@ -185,7 +184,7 @@ def split_url(url): # python STILL doesn't have a proper regular expression engine like grep uses built in... match = re.match(r'(?:https?://)?([\w-]+(?:\.[\w-]+)+?)(/.*|$)', url) if match is None: - raise ValueError('Invalid or unsupported url: ' + url) + raise ValueError(f'Invalid or unsupported url: {url}') return match.group(1), match.group(2) @@ -238,7 +237,7 @@ def site_dispatch(env, start_response): if base_name == '': base_name = domain else: - base_name = domain + '.' + base_name + base_name = f"{domain}.{base_name}" try: handler = site_handlers[base_name] diff --git a/settings.py b/settings.py index 3fd96ac..677a33a 100644 --- a/settings.py +++ b/settings.py @@ -397,14 +397,14 @@ acceptable_targets = SETTINGS_INFO.keys() | { def comment_string(comment): result = '' for line in comment.splitlines(): - result += '# ' + line + '\n' + result += f'# {line}\n' return result def save_settings(settings_dict): with open(settings_file_path, 'w', encoding='utf-8') as file: for setting_name, setting_info in SETTINGS_INFO.items(): - file.write(comment_string(setting_info['comment']) + setting_name + ' = ' + repr(settings_dict[setting_name]) + '\n\n') + file.write(f"{comment_string(setting_info['comment'])}{setting_name} = {repr(settings_dict[setting_name])}\n\n") def add_missing_settings(settings_dict): @@ -481,7 +481,7 @@ upgrade_functions = { def log_ignored_line(line_number, message): - print('WARNING: Ignoring settings.txt line ' + str(line_number) + ' (' + message + ')') + print(f'WARNING: Ignoring settings.txt line {line_number} ({message})') if os.path.isfile("settings.txt"): @@ -535,7 +535,7 @@ else: continue if target.id not in acceptable_targets: - log_ignored_line(node.lineno, target.id + " is not a valid setting") + log_ignored_line(node.lineno, f"{target.id} is not a valid setting") continue if type(node.value) not in attributes: @@ -645,6 +645,6 @@ def settings_page(): for func, old_value, value in to_call: 
func(old_value, value) - return flask.redirect(util.URL_ORIGIN + '/settings', 303) + return flask.redirect(f'{util.URL_ORIGIN}/settings', 303) else: flask.abort(400) diff --git a/tests/test_shorts.py b/tests/test_shorts.py index c5b7301..c413a8b 100644 --- a/tests/test_shorts.py +++ b/tests/test_shorts.py @@ -27,7 +27,7 @@ class TestChannelCtokenV5: def _decode_outer(self, ctoken): """Decode the outer protobuf layer of a ctoken.""" - raw = base64.urlsafe_b64decode(ctoken + '==') + raw = base64.urlsafe_b64decode(f'{ctoken}==') return {fn: val for _, fn, val in proto.read_protobuf(raw)} def test_shorts_token_generates_without_error(self): @@ -68,8 +68,8 @@ class TestChannelCtokenV5: assert t_with_shorts != t_without_shorts # Decode and verify the filter is present - raw_with_shorts = base64.urlsafe_b64decode(t_with_shorts + '==') - raw_without_shorts = base64.urlsafe_b64decode(t_without_shorts + '==') + raw_with_shorts = base64.urlsafe_b64decode(f'{t_with_shorts}==') + raw_without_shorts = base64.urlsafe_b64decode(f'{t_without_shorts}==') # Parse the outer protobuf structure import youtube.proto as proto @@ -95,8 +95,8 @@ class TestChannelCtokenV5: decoded_without = urllib.parse.unquote(encoded_inner_without.decode('ascii')) # Decode the base64 data - decoded_with_bytes = base64.urlsafe_b64decode(decoded_with + '==') - decoded_without_bytes = base64.urlsafe_b64decode(decoded_without + '==') + decoded_with_bytes = base64.urlsafe_b64decode(f'{decoded_with}==') + decoded_without_bytes = base64.urlsafe_b64decode(f'{decoded_without}==') # Parse the decoded protobuf data fields_with = list(proto.read_protobuf(decoded_with_bytes)) diff --git a/youtube/__init__.py b/youtube/__init__.py index b0e7cd3..885cadc 100644 --- a/youtube/__init__.py +++ b/youtube/__init__.py @@ -76,7 +76,7 @@ theme_names = { @yt_app.context_processor def inject_theme_preference(): return { - 'theme_path': '/youtube.com/static/' + theme_names[settings.theme] + '.css', + 'theme_path': 
f'/youtube.com/static/{theme_names[settings.theme]}.css', 'settings': settings, # Detect version 'current_version': app_version()['version'], @@ -145,9 +145,9 @@ def error_page(e): ' exit node is overutilized. Try getting a new exit node by' ' using the New Identity button in the Tor Browser.') if fetch_err.error_message: - error_message += '\n\n' + fetch_err.error_message + error_message += f'\n\n{fetch_err.error_message}' if fetch_err.ip: - error_message += '\n\nExit node IP address: ' + fetch_err.ip + error_message += f'\n\nExit node IP address: {fetch_err.ip}' return flask.render_template('error.html', error_message=error_message, slim=slim), 502 elif error_code == '429': @@ -157,7 +157,7 @@ def error_page(e): '• Enable Tor routing in Settings for automatic IP rotation\n' '• Use a VPN to change your IP address') if fetch_err.ip: - error_message += '\n\nYour IP: ' + fetch_err.ip + error_message += f'\n\nYour IP: {fetch_err.ip}' return flask.render_template('error.html', error_message=error_message, slim=slim), 429 elif error_code == '502' and ('Failed to resolve' in str(fetch_err) or 'Failed to establish' in str(fetch_err)): @@ -179,7 +179,7 @@ def error_page(e): # Catch-all for any other FetchError (400, etc.) error_message = f'Error communicating with YouTube ({error_code}).' 
if fetch_err.error_message: - error_message += '\n\n' + fetch_err.error_message + error_message += f'\n\n{fetch_err.error_message}' return flask.render_template('error.html', error_message=error_message, slim=slim), 502 return flask.render_template('error.html', traceback=traceback.format_exc(), diff --git a/youtube/channel.py b/youtube/channel.py index 14a565e..8baf588 100644 --- a/youtube/channel.py +++ b/youtube/channel.py @@ -253,7 +253,7 @@ def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1, # For now it seems to be constant for the API endpoint, not dependent # on the browsing session or channel key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8' - url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key + url = f'https://www.youtube.com/youtubei/v1/browse?key={key}' data = { 'context': { @@ -285,8 +285,8 @@ def get_number_of_videos_channel(channel_id): return 1000 # Uploads playlist - playlist_id = 'UU' + channel_id[2:] - url = 'https://m.youtube.com/playlist?list=' + playlist_id + '&pbj=1' + playlist_id = f'UU{channel_id[2:]}' + url = f'https://m.youtube.com/playlist?list={playlist_id}&pbj=1' try: response = util.fetch_url(url, headers_mobile, @@ -328,7 +328,7 @@ def get_channel_id(base_url): # method that gives the smallest possible response at ~4 kb # needs to be as fast as possible base_url = base_url.replace('https://www', 'https://m') # avoid redirect - response = util.fetch_url(base_url + '/about?pbj=1', headers_mobile, + response = util.fetch_url(f'{base_url}/about?pbj=1', headers_mobile, debug_name='get_channel_id', report_text='Got channel id').decode('utf-8') match = channel_id_re.search(response) if match: @@ -372,7 +372,7 @@ def get_channel_search_json(channel_id, query, page): ctoken = base64.urlsafe_b64encode(proto.nested(80226972, ctoken)).decode('ascii') key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8' - url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key + url = 
f'https://www.youtube.com/youtubei/v1/browse?key={key}' data = { 'context': { @@ -414,18 +414,18 @@ def post_process_channel_info(info): def get_channel_first_page(base_url=None, tab='videos', channel_id=None, sort=None): if channel_id: - base_url = 'https://www.youtube.com/channel/' + channel_id + base_url = f'https://www.youtube.com/channel/{channel_id}' # Build URL with sort parameter # YouTube URL sort params: p=popular, dd=newest, lad=newest no shorts # Note: 'da' (oldest) was removed by YouTube in January 2026 - url = base_url + '/' + tab + '?pbj=1&view=0' + url = f'{base_url}/{tab}?pbj=1&view=0' if sort: # Map sort values to YouTube's URL parameter values sort_map = {'3': 'dd', '4': 'lad'} - url += '&sort=' + sort_map.get(sort, 'dd') + url += f'&sort={sort_map.get(sort, "dd")}' - return util.fetch_url(url, headers_desktop, debug_name='gen_channel_' + tab) + return util.fetch_url(url, headers_desktop, debug_name=f'gen_channel_{tab}') playlist_sort_codes = {'2': "da", '3': "dd", '4': "lad"} @@ -462,7 +462,7 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None): if page_number == 1: tasks = ( gevent.spawn(playlist.playlist_first_page, - 'UU' + channel_id[2:], + f'UU{channel_id[2:]}', report_text='Retrieved channel videos'), gevent.spawn(get_metadata, channel_id), ) @@ -477,11 +477,11 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None): set_cached_number_of_videos(channel_id, number_of_videos) else: tasks = ( - gevent.spawn(playlist.get_videos, 'UU' + channel_id[2:], + gevent.spawn(playlist.get_videos, f'UU{channel_id[2:]}', page_number, include_shorts=True), gevent.spawn(get_metadata, channel_id), gevent.spawn(get_number_of_videos_channel, channel_id), - gevent.spawn(playlist.playlist_first_page, 'UU' + channel_id[2:], + gevent.spawn(playlist.playlist_first_page, f'UU{channel_id[2:]}', report_text='Retrieved channel video count'), ) gevent.joinall(tasks) @@ -567,10 +567,10 @@ def 
get_channel_page_general_url(base_url, tab, request, channel_id=None): elif tab == 'search' and channel_id: polymer_json = get_channel_search_json(channel_id, query, page_number) elif tab == 'search': - url = base_url + '/search?pbj=1&query=' + urllib.parse.quote(query, safe='') + url = f'{base_url}/search?pbj=1&query={urllib.parse.quote(query, safe="")}' polymer_json = util.fetch_url(url, headers_desktop, debug_name='gen_channel_search') elif tab != 'videos': - flask.abort(404, 'Unknown channel tab: ' + tab) + flask.abort(404, f'Unknown channel tab: {tab}') if polymer_json is not None and info is None: info = yt_data_extract.extract_channel_info( @@ -583,7 +583,7 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None): return flask.render_template('error.html', error_message=info['error']) if channel_id: - info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id + info['channel_url'] = f'https://www.youtube.com/channel/{channel_id}' info['channel_id'] = channel_id else: channel_id = info['channel_id'] @@ -663,22 +663,22 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None): @yt_app.route('/channel//') @yt_app.route('/channel//') def get_channel_page(channel_id, tab='videos'): - return get_channel_page_general_url('https://www.youtube.com/channel/' + channel_id, tab, request, channel_id) + return get_channel_page_general_url(f'https://www.youtube.com/channel/{channel_id}', tab, request, channel_id) @yt_app.route('/user//') @yt_app.route('/user//') def get_user_page(username, tab='videos'): - return get_channel_page_general_url('https://www.youtube.com/user/' + username, tab, request) + return get_channel_page_general_url(f'https://www.youtube.com/user/{username}', tab, request) @yt_app.route('/c//') @yt_app.route('/c//') def get_custom_c_page(custom, tab='videos'): - return get_channel_page_general_url('https://www.youtube.com/c/' + custom, tab, request) + return 
get_channel_page_general_url(f'https://www.youtube.com/c/{custom}', tab, request) @yt_app.route('/') @yt_app.route('//') def get_toplevel_custom_page(custom, tab='videos'): - return get_channel_page_general_url('https://www.youtube.com/' + custom, tab, request) + return get_channel_page_general_url(f'https://www.youtube.com/{custom}', tab, request) diff --git a/youtube/comments.py b/youtube/comments.py index 8d03f22..c4e30dd 100644 --- a/youtube/comments.py +++ b/youtube/comments.py @@ -104,20 +104,19 @@ def post_process_comments_info(comments_info): comment['replies_url'] = None comment['replies_url'] = concat_or_none( util.URL_ORIGIN, - '/comments?replies=1&ctoken=' + ctoken) + f'/comments?replies=1&ctoken={ctoken}') if reply_count == 0: comment['view_replies_text'] = 'Reply' elif reply_count == 1: comment['view_replies_text'] = '1 reply' else: - comment['view_replies_text'] = str(reply_count) + ' replies' + comment['view_replies_text'] = f'{reply_count} replies' if comment['approx_like_count'] == '1': comment['likes_text'] = '1 like' else: - comment['likes_text'] = (str(comment['approx_like_count']) - + ' likes') + comment['likes_text'] = f"{comment['approx_like_count']} likes" comments_info['include_avatars'] = settings.enable_comment_avatars if comments_info['ctoken']: @@ -163,14 +162,13 @@ def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''): comments_info = {'error': None} try: other_sort_url = ( - util.URL_ORIGIN + '/comments?ctoken=' - + make_comment_ctoken(video_id, sort=1 - sort, lc=lc) + f"{util.URL_ORIGIN}/comments?ctoken=" + f"{make_comment_ctoken(video_id, sort=1 - sort, lc=lc)}" ) - other_sort_text = 'Sort by ' + ('newest' if sort == 0 else 'top') + other_sort_text = f'Sort by {"newest" if sort == 0 else "top"}' - this_sort_url = (util.URL_ORIGIN - + '/comments?ctoken=' - + make_comment_ctoken(video_id, sort=sort, lc=lc)) + this_sort_url = (f"{util.URL_ORIGIN}/comments?ctoken=" + f"{make_comment_ctoken(video_id, sort=sort, lc=lc)}") 
comments_info['comment_links'] = [ (other_sort_text, other_sort_url), @@ -188,17 +186,16 @@ def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''): if e.code == '429' and settings.route_tor: comments_info['error'] = 'Error: YouTube blocked the request because the Tor exit node is overutilized.' if e.error_message: - comments_info['error'] += '\n\n' + e.error_message - comments_info['error'] += '\n\nExit node IP address: %s' % e.ip + comments_info['error'] += f'\n\n{e.error_message}' + comments_info['error'] += f'\n\nExit node IP address: {e.ip}' else: - comments_info['error'] = 'YouTube blocked the request. Error: %s' % str(e) + comments_info['error'] = f'YouTube blocked the request. Error: {e}' except Exception as e: - comments_info['error'] = 'YouTube blocked the request. Error: %s' % str(e) + comments_info['error'] = f'YouTube blocked the request. Error: {e}' if comments_info.get('error'): - print('Error retrieving comments for ' + str(video_id) + ':\n' + - comments_info['error']) + print(f'Error retrieving comments for {video_id}:\n{comments_info["error"]}') return comments_info @@ -218,12 +215,10 @@ def get_comments_page(): other_sort_url = None else: other_sort_url = ( - util.URL_ORIGIN - + '/comments?ctoken=' - + make_comment_ctoken(comments_info['video_id'], - sort=1-comments_info['sort']) + f'{util.URL_ORIGIN}/comments?ctoken=' + f'{make_comment_ctoken(comments_info["video_id"], sort=1-comments_info["sort"])}' ) - other_sort_text = 'Sort by ' + ('newest' if comments_info['sort'] == 0 else 'top') + other_sort_text = f'Sort by {"newest" if comments_info["sort"] == 0 else "top"}' comments_info['comment_links'] = [(other_sort_text, other_sort_url)] return flask.render_template( diff --git a/youtube/local_playlist.py b/youtube/local_playlist.py index e9b0b20..44207d2 100644 --- a/youtube/local_playlist.py +++ b/youtube/local_playlist.py @@ -92,9 +92,7 @@ def add_extra_info_to_videos(videos, playlist_name): util.add_extra_html_info(video) if 
video['id'] + '.jpg' in thumbnails: video['thumbnail'] = ( - '/https://youtube.com/data/playlist_thumbnails/' - + playlist_name - + '/' + video['id'] + '.jpg') + f'/https://youtube.com/data/playlist_thumbnails/{playlist_name}/{video["id"]}.jpg') else: video['thumbnail'] = util.get_thumbnail_url(video['id']) missing_thumbnails.append(video['id']) diff --git a/youtube/playlist.py b/youtube/playlist.py index 3b784ba..e1e1342 100644 --- a/youtube/playlist.py +++ b/youtube/playlist.py @@ -20,7 +20,7 @@ def playlist_ctoken(playlist_id, offset, include_shorts=True): continuation_info = proto.string(3, proto.percent_b64encode(offset)) - playlist_id = proto.string(2, 'VL' + playlist_id) + playlist_id = proto.string(2, f'VL{playlist_id}') pointless_nest = proto.string(80226972, playlist_id + continuation_info) return base64.urlsafe_b64encode(pointless_nest).decode('ascii') @@ -30,7 +30,7 @@ def playlist_first_page(playlist_id, report_text="Retrieved playlist", use_mobile=False): # Use innertube API (pbj=1 no longer works for many playlists) key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8' - url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key + url = f'https://www.youtube.com/youtubei/v1/browse?key={key}' data = { 'context': { @@ -41,7 +41,7 @@ def playlist_first_page(playlist_id, report_text="Retrieved playlist", 'clientVersion': '2.20240327.00.00', }, }, - 'browseId': 'VL' + playlist_id, + 'browseId': f'VL{playlist_id}', } content_type_header = (('Content-Type', 'application/json'),) @@ -58,7 +58,7 @@ def get_videos(playlist_id, page, include_shorts=True, use_mobile=False, page_size = 100 key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8' - url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key + url = f'https://www.youtube.com/youtubei/v1/browse?key={key}' ctoken = playlist_ctoken(playlist_id, (int(page)-1)*page_size, include_shorts=include_shorts) @@ -97,7 +97,7 @@ def get_playlist_page(): if playlist_id.startswith('RD'): first_video_id = playlist_id[2:] # 
video ID after 'RD' prefix return flask.redirect( - util.URL_ORIGIN + '/watch?v=' + first_video_id + '&list=' + playlist_id, + f'{util.URL_ORIGIN}/watch?v={first_video_id}&list={playlist_id}', 302 ) @@ -132,9 +132,9 @@ def get_playlist_page(): if 'id' in item and not item.get('thumbnail'): item['thumbnail'] = f"{settings.img_prefix}https://i.ytimg.com/vi/{item['id']}/hqdefault.jpg" - item['url'] += '&list=' + playlist_id + item['url'] += f'&list={playlist_id}' if item['index']: - item['url'] += '&index=' + str(item['index']) + item['url'] += f'&index={item["index"]}' video_count = yt_data_extract.deep_get(info, 'metadata', 'video_count') if video_count is None: diff --git a/youtube/proto.py b/youtube/proto.py index db83a06..72a8a94 100644 --- a/youtube/proto.py +++ b/youtube/proto.py @@ -76,7 +76,7 @@ def read_varint(data): except IndexError: if i == 0: raise EOFError() - raise Exception('Unterminated varint starting at ' + str(data.tell() - i)) + raise Exception(f'Unterminated varint starting at {data.tell() - i}') result |= (byte & 127) << 7*i if not byte & 128: break @@ -118,7 +118,7 @@ def read_protobuf(data): elif wire_type == 5: value = data.read(4) else: - raise Exception("Unknown wire type: " + str(wire_type) + " at position " + str(data.tell())) + raise Exception(f"Unknown wire type: {wire_type} at position {data.tell()}") yield (wire_type, field_number, value) @@ -170,8 +170,7 @@ def _make_protobuf(data): elif field[0] == 2: result += string(field[1], _make_protobuf(field[2])) else: - raise NotImplementedError('Wire type ' + str(field[0]) - + ' not implemented') + raise NotImplementedError(f'Wire type {field[0]} not implemented') return result return data @@ -218,4 +217,4 @@ def b64_to_bytes(data): if isinstance(data, bytes): data = data.decode('ascii') data = data.replace("%3D", "=") - return base64.urlsafe_b64decode(data + "="*((4 - len(data) % 4) % 4)) + return base64.urlsafe_b64decode(f'{data}{"=" * ((4 - len(data) % 4) % 4)}') diff --git 
a/youtube/proto_debug.py b/youtube/proto_debug.py index 927b385..bb8da7c 100644 --- a/youtube/proto_debug.py +++ b/youtube/proto_debug.py @@ -179,7 +179,7 @@ def read_varint(data): except IndexError: if i == 0: raise EOFError() - raise Exception('Unterminated varint starting at ' + str(data.tell() - i)) + raise Exception(f'Unterminated varint starting at {data.tell() - i}') result |= (byte & 127) << 7*i if not byte & 128: break @@ -235,8 +235,7 @@ def _make_protobuf(data): elif field[0] == 2: result += string(field[1], _make_protobuf(field[2])) else: - raise NotImplementedError('Wire type ' + str(field[0]) - + ' not implemented') + raise NotImplementedError(f'Wire type {field[0]} not implemented') return result return data @@ -286,7 +285,7 @@ def b64_to_bytes(data): if isinstance(data, bytes): data = data.decode('ascii') data = data.replace("%3D", "=") - return base64.urlsafe_b64decode(data + "="*((4 - len(data) % 4) % 4)) + return base64.urlsafe_b64decode(f'{data}{"=" * ((4 - len(data) % 4) % 4)}') # -------------------------------------------------------------------- @@ -344,7 +343,7 @@ fromhex = bytes.fromhex def aligned_ascii(data): - return ' '.join(' ' + chr(n) if n in range(32, 128) else ' _' for n in data) + return ' '.join(f' {chr(n)}' if n in range(32, 128) else ' _' for n in data) def parse_protobuf(data, mutable=False, spec=()): @@ -372,7 +371,7 @@ def parse_protobuf(data, mutable=False, spec=()): elif wire_type == 5: value = data.read(4) else: - raise Exception("Unknown wire type: " + str(wire_type) + ", Tag: " + bytes_to_hex(varint_encode(tag)) + ", at position " + str(data.tell())) + raise Exception(f"Unknown wire type: {wire_type}, Tag: {bytes_to_hex(varint_encode(tag))}, at position {data.tell()}") if mutable: yield [wire_type, field_number, value] else: @@ -453,7 +452,7 @@ def b32decode(s, casefold=False, map01=None): if map01 is not None: map01 = _bytes_from_decode_data(map01) assert len(map01) == 1, repr(map01) - s = 
s.translate(bytes.maketrans(b'01', b'O' + map01)) + s = s.translate(bytes.maketrans(b'01', b'O' + map01)) if casefold: s = s.upper() # Strip off pad characters from the right. We need to count the pad @@ -494,7 +493,7 @@ def b32decode(s, casefold=False, map01=None): def dec32(data): if isinstance(data, bytes): data = data.decode('ascii') - return b32decode(data + "="*((8 - len(data)%8)%8)) + return b32decode(f'{data}{"=" * ((8 - len(data)%8)%8)}') _patterns = [ @@ -563,9 +562,7 @@ def _pp(obj, indent): # not my best work if len(obj) == 3: # (wire_type, field_number, data) return obj.__repr__() else: # (base64, [...]) - return ('(' + obj[0].__repr__() + ',\n' - + indent_lines(_pp(obj[1], indent), indent) + '\n' - + ')') + return f"({obj[0].__repr__()},\n{indent_lines(_pp(obj[1], indent), indent)}\n)" elif isinstance(obj, list): # [wire_type, field_number, data] if (len(obj) == 3 @@ -577,13 +574,11 @@ def _pp(obj, indent): # not my best work elif (len(obj) == 3 and not any(isinstance(x, (list, tuple)) for x in obj[0:2]) ): - return ('[' + obj[0].__repr__() + ', ' + obj[1].__repr__() + ',\n' - + indent_lines(_pp(obj[2], indent), indent) + '\n' - + ']') + return f"[{obj[0].__repr__()}, {obj[1].__repr__()},\n{indent_lines(_pp(obj[2], indent), indent)}\n]" else: s = '[\n' for x in obj: - s += indent_lines(_pp(x, indent), indent) + ',\n' + s += f"{indent_lines(_pp(x, indent), indent)},\n" s += ']' return s else: diff --git a/youtube/search.py b/youtube/search.py index 6e62e28..7573595 100644 --- a/youtube/search.py +++ b/youtube/search.py @@ -51,7 +51,7 @@ def get_search_json(query, page, autocorrect, sort, filters): 'X-YouTube-Client-Name': '1', 'X-YouTube-Client-Version': '2.20180418', } - url += "&pbj=1&sp=" + page_number_to_sp_parameter(page, autocorrect, sort, filters).replace("=", "%3D") + url += f"&pbj=1&sp={page_number_to_sp_parameter(page, autocorrect, sort, filters).replace('=', '%3D')}" content = util.fetch_url(url, headers=headers, 
report_text="Got search results", debug_name='search_results') info = json.loads(content) return info diff --git a/youtube/subscriptions.py b/youtube/subscriptions.py index dafea58..7d3efab 100644 --- a/youtube/subscriptions.py +++ b/youtube/subscriptions.py @@ -126,7 +126,7 @@ def delete_thumbnails(to_delete): os.remove(os.path.join(thumbnails_directory, thumbnail)) existing_thumbnails.remove(video_id) except Exception: - print('Failed to delete thumbnail: ' + thumbnail) + print(f'Failed to delete thumbnail: {thumbnail}') traceback.print_exc() @@ -184,7 +184,7 @@ def _get_videos(cursor, number_per_page, offset, tag=None): 'time_published': exact_timestamp(db_video[3]) if db_video[4] else posix_to_dumbed_down(db_video[3]), 'author': db_video[5], 'author_id': db_video[6], - 'author_url': '/https://www.youtube.com/channel/' + db_video[6], + 'author_url': f'/https://www.youtube.com/channel/{db_video[6]}', }) return videos, pseudo_number_of_videos @@ -304,9 +304,9 @@ def posix_to_dumbed_down(posix_time): if delta >= unit_time: quantifier = round(delta/unit_time) if quantifier == 1: - return '1 ' + unit_name + ' ago' + return f'1 {unit_name} ago' else: - return str(quantifier) + ' ' + unit_name + 's ago' + return f'{quantifier} {unit_name}s ago' else: raise Exception() @@ -363,7 +363,7 @@ def autocheck_dispatcher(): time_until_earliest_job = earliest_job['next_check_time'] - time.time() if time_until_earliest_job <= -5: # should not happen unless we're running extremely slow - print('ERROR: autocheck_dispatcher got job scheduled in the past, skipping and rescheduling: ' + earliest_job['channel_id'] + ', ' + earliest_job['channel_name'] + ', ' + str(earliest_job['next_check_time'])) + print(f'ERROR: autocheck_dispatcher got job scheduled in the past, skipping and rescheduling: {earliest_job["channel_id"]}, {earliest_job["channel_name"]}, {earliest_job["next_check_time"]}') next_check_time = time.time() + 3600*secrets.randbelow(60)/60 with_open_db(_schedule_checking, 
earliest_job['channel_id'], next_check_time) autocheck_jobs[earliest_job_index]['next_check_time'] = next_check_time @@ -451,7 +451,7 @@ def check_channels_if_necessary(channel_ids): def _get_atoma_feed(channel_id): - url = 'https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id + url = f'https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}' try: return util.fetch_url(url).decode('utf-8') except util.FetchError as e: @@ -485,16 +485,15 @@ def _get_channel_videos_first_page(channel_id, channel_status_name): return channel_info except util.FetchError as e: if e.code == '429' and settings.route_tor: - error_message = ('Error checking channel ' + channel_status_name - + ': YouTube blocked the request because the' - + ' Tor exit node is overutilized. Try getting a new exit node' - + ' by using the New Identity button in the Tor Browser.') + error_message = (f'Error checking channel {channel_status_name}: ' + f'YouTube blocked the request because the Tor exit node is overutilized. 
' + f'Try getting a new exit node by using the New Identity button in the Tor Browser.') if e.ip: - error_message += ' Exit node IP address: ' + e.ip + error_message += f' Exit node IP address: {e.ip}' print(error_message) return None elif e.code == '502': - print('Error checking channel', channel_status_name + ':', str(e)) + print(f'Error checking channel {channel_status_name}: {e}') return None raise @@ -505,7 +504,7 @@ def _get_upstream_videos(channel_id): except KeyError: channel_status_name = channel_id - print("Checking channel: " + channel_status_name) + print(f"Checking channel: {channel_status_name}") tasks = ( # channel page, need for video duration @@ -550,15 +549,15 @@ def _get_upstream_videos(channel_id): times_published[video_id_element.text] = time_published except ValueError: - print('Failed to read atoma feed for ' + channel_status_name) + print(f'Failed to read atoma feed for {channel_status_name}') traceback.print_exc() except defusedxml.ElementTree.ParseError: - print('Failed to read atoma feed for ' + channel_status_name) + print(f'Failed to read atoma feed for {channel_status_name}') if channel_info is None: # there was an error return if channel_info['error']: - print('Error checking channel ' + channel_status_name + ': ' + channel_info['error']) + print(f'Error checking channel {channel_status_name}: {channel_info["error"]}') return videos = channel_info['items'] @@ -1023,7 +1022,7 @@ def get_subscriptions_page(): tag = request.args.get('tag', None) videos, number_of_videos_in_db = _get_videos(cursor, 60, (page - 1)*60, tag) for video in videos: - video['thumbnail'] = util.URL_ORIGIN + '/data/subscription_thumbnails/' + video['id'] + '.jpg' + video['thumbnail'] = f'{util.URL_ORIGIN}/data/subscription_thumbnails/{video["id"]}.jpg' video['type'] = 'video' video['item_size'] = 'small' util.add_extra_html_info(video) @@ -1033,7 +1032,7 @@ def get_subscriptions_page(): subscription_list = [] for channel_name, channel_id, muted in 
_get_subscribed_channels(cursor): subscription_list.append({ - 'channel_url': util.URL_ORIGIN + '/channel/' + channel_id, + 'channel_url': f'{util.URL_ORIGIN}/channel/{channel_id}', 'channel_name': channel_name, 'channel_id': channel_id, 'muted': muted, @@ -1109,17 +1108,17 @@ def serve_subscription_thumbnail(thumbnail): for quality in ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'): url = f"https://i.ytimg.com/vi/{video_id}/{quality}" try: - image = util.fetch_url(url, report_text="Saved thumbnail: " + video_id) + image = util.fetch_url(url, report_text=f"Saved thumbnail: {video_id}") break except util.FetchError as e: if '404' in str(e): continue - print("Failed to download thumbnail for " + video_id + ": " + str(e)) + print(f"Failed to download thumbnail for {video_id}: {e}") flask.abort(500) except urllib.error.HTTPError as e: if e.code == 404: continue - print("Failed to download thumbnail for " + video_id + ": " + str(e)) + print(f"Failed to download thumbnail for {video_id}: {e}") flask.abort(e.code) if image is None: diff --git a/youtube/util.py b/youtube/util.py index 5e60d1c..7901a89 100644 --- a/youtube/util.py +++ b/youtube/util.py @@ -72,7 +72,7 @@ class TorManager: def __init__(self): self.old_tor_connection_pool = None self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager( - 'socks5h://127.0.0.1:' + str(settings.tor_port) + '/', + f'socks5h://127.0.0.1:{settings.tor_port}/', cert_reqs='CERT_REQUIRED') self.tor_pool_refresh_time = time.monotonic() settings.add_setting_changed_hook( @@ -92,7 +92,7 @@ class TorManager: self.old_tor_connection_pool = self.tor_connection_pool self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager( - 'socks5h://127.0.0.1:' + str(settings.tor_port) + '/', + f'socks5h://127.0.0.1:{settings.tor_port}/', cert_reqs='CERT_REQUIRED') self.tor_pool_refresh_time = time.monotonic() @@ -198,9 +198,9 @@ class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler): class FetchError(Exception): def 
__init__(self, code, reason='', ip=None, error_message=None): if error_message: - string = code + ' ' + reason + ': ' + error_message + string = f"{code} {reason}: {error_message}" else: - string = 'HTTP error during request: ' + code + ' ' + reason + string = f"HTTP error during request: {code} {reason}" Exception.__init__(self, string) self.code = code self.reason = reason @@ -294,14 +294,12 @@ def fetch_url_response(url, headers=(), timeout=15, data=None, exception_cause = e.__context__.__context__ if (isinstance(exception_cause, socks.ProxyConnectionError) and settings.route_tor): - msg = ('Failed to connect to Tor. Check that Tor is open and ' - 'that your internet connection is working.\n\n' - + str(e)) + msg = f'Failed to connect to Tor. Check that Tor is open and that your internet connection is working.\n\n{e}' raise FetchError('502', reason='Bad Gateway', error_message=msg) elif isinstance(e.__context__, urllib3.exceptions.NewConnectionError): - msg = 'Failed to establish a connection.\n\n' + str(e) + msg = f'Failed to establish a connection.\n\n{e}' raise FetchError( '502', reason='Bad Gateway', error_message=msg) @@ -391,7 +389,7 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None, if error: raise FetchError( '429', reason=response.reason, ip=ip, - error_message='Automatic circuit change: ' + error) + error_message=f'Automatic circuit change: {error}') continue # retry with new identity # Check for client errors (400, 404) - don't retry these @@ -467,10 +465,7 @@ def head(url, use_tor=False, report_text=None, max_redirects=10): headers = {'User-Agent': 'Python-urllib'} response = pool.request('HEAD', url, headers=headers, retries=retries) if report_text: - print( - report_text, - ' Latency:', - round(time.monotonic() - start_time, 3)) + print(f'{report_text} Latency: {round(time.monotonic() - start_time, 3)}') return response mobile_user_agent = 'Mozilla/5.0 (Linux; Android 7.0; Redmi Note 4 Build/NRD90M) AppleWebKit/537.36 (KHTML, 
like Gecko) Chrome/69.0.3497.100 Mobile Safari/537.36' @@ -544,16 +539,16 @@ def download_thumbnail(save_directory, video_id): for quality in ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'): url = f'https://i.ytimg.com/vi/{video_id}/{quality}' try: - thumbnail = fetch_url(url, report_text='Saved thumbnail: ' + video_id) + thumbnail = fetch_url(url, report_text=f'Saved thumbnail: {video_id}') except FetchError as e: if '404' in str(e): continue - print('Failed to download thumbnail for ' + video_id + ': ' + str(e)) + print(f'Failed to download thumbnail for {video_id}: {e}') return False except urllib.error.HTTPError as e: if e.code == 404: continue - print('Failed to download thumbnail for ' + video_id + ': ' + str(e)) + print(f'Failed to download thumbnail for {video_id}: {e}') return False try: with open(save_location, 'wb') as f: @@ -563,7 +558,7 @@ def download_thumbnail(save_directory, video_id): with open(save_location, 'wb') as f: f.write(thumbnail) return True - print('No thumbnail available for ' + video_id) + print(f'No thumbnail available for {video_id}') return False @@ -698,7 +693,7 @@ def prefix_urls(item): def add_extra_html_info(item): if item['type'] == 'video': - item['url'] = (URL_ORIGIN + '/watch?v=' + item['id']) if item.get('id') else None + item['url'] = f'{URL_ORIGIN}/watch?v={item["id"]}' if item.get('id') else None video_info = {} for key in ('id', 'title', 'author', 'duration', 'author_id'): @@ -721,7 +716,7 @@ def add_extra_html_info(item): item['url'] = concat_or_none(URL_ORIGIN, "/channel/", item['id']) if item.get('author_id') and 'author_url' not in item: - item['author_url'] = URL_ORIGIN + '/channel/' + item['author_id'] + item['author_url'] = f'{URL_ORIGIN}/channel/{item["author_id"]}' def check_gevent_exceptions(*tasks): @@ -967,7 +962,7 @@ def call_youtube_api(client, api, data): user_agent = context['client'].get('userAgent') or mobile_user_agent visitor_data = get_visitor_data() - url = 'https://' + host + '/youtubei/v1/' + api 
+ '?key=' + key + url = f'https://{host}/youtubei/v1/{api}?key={key}' if visitor_data: context['client'].update({'visitorData': visitor_data}) data['context'] = context @@ -978,8 +973,8 @@ def call_youtube_api(client, api, data): headers = ( *headers, ('X-Goog-Visitor-Id', visitor_data )) response = fetch_url( url, data=data, headers=headers, - debug_name='youtubei_' + api + '_' + client, - report_text='Fetched ' + client + ' youtubei ' + api + debug_name=f'youtubei_{api}_{client}', + report_text=f'Fetched {client} youtubei {api}' ).decode('utf-8') return response diff --git a/youtube/watch.py b/youtube/watch.py index 7f87215..9d1e442 100644 --- a/youtube/watch.py +++ b/youtube/watch.py @@ -53,7 +53,7 @@ def get_video_sources(info, target_resolution): if fmt['acodec'] and fmt['vcodec']: if fmt.get('audio_track_is_default', True) is False: continue - source = {'type': 'video/' + fmt['ext'], + source = {'type': f"video/{fmt['ext']}", 'quality_string': short_video_quality_string(fmt)} source['quality_string'] += ' (integrated)' source.update(fmt) @@ -70,10 +70,10 @@ def get_video_sources(info, target_resolution): if fmt['acodec'] and not fmt['vcodec'] and (fmt['audio_bitrate'] or fmt['bitrate']): if fmt['bitrate']: fmt['audio_bitrate'] = int(fmt['bitrate']/1000) - source = {'type': 'audio/' + fmt['ext'], + source = {'type': f"audio/{fmt['ext']}", 'quality_string': audio_quality_string(fmt)} source.update(fmt) - source['mime_codec'] = source['type'] + '; codecs="' + source['acodec'] + '"' + source['mime_codec'] = f"{source['type']}; codecs=\"{source['acodec']}\"" tid = fmt.get('audio_track_id') or 'default' if tid not in audio_by_track: audio_by_track[tid] = { @@ -85,11 +85,11 @@ def get_video_sources(info, target_resolution): elif all(fmt[attr] for attr in ('vcodec', 'quality', 'width', 'fps', 'file_size')): if codec_name(fmt['vcodec']) == 'unknown': continue - source = {'type': 'video/' + fmt['ext'], + source = {'type': f"video/{fmt['ext']}", 'quality_string': 
short_video_quality_string(fmt)} source.update(fmt) - source['mime_codec'] = source['type'] + '; codecs="' + source['vcodec'] + '"' - quality = str(fmt['quality']) + 'p' + str(fmt['fps']) + source['mime_codec'] = f"{source['type']}; codecs=\"{source['vcodec']}\"" + quality = f"{fmt['quality']}p{fmt['fps']}" video_only_sources.setdefault(quality, []).append(source) audio_tracks = [] @@ -141,7 +141,7 @@ def get_video_sources(info, target_resolution): def video_rank(src): ''' Sort by settings preference. Use file size as tiebreaker ''' - setting_name = 'codec_rank_' + codec_name(src['vcodec']) + setting_name = f'codec_rank_{codec_name(src["vcodec"])}' return (settings.current_settings_dict[setting_name], src['file_size']) pair_info['videos'].sort(key=video_rank) @@ -183,7 +183,7 @@ def make_caption_src(info, lang, auto=False, trans_lang=None): if auto: label += ' (Automatic)' if trans_lang: - label += ' -> ' + trans_lang + label += f' -> {trans_lang}' # Try to use Android caption URL directly (no PO Token needed) caption_url = None @@ -204,7 +204,7 @@ def make_caption_src(info, lang, auto=False, trans_lang=None): else: caption_url += '&fmt=vtt' if trans_lang: - caption_url += '&tlang=' + trans_lang + caption_url += f'&tlang={trans_lang}' url = util.prefix_url(caption_url) else: # Fallback to old method @@ -357,10 +357,10 @@ def decrypt_signatures(info, video_id): player_name = info['player_name'] if player_name in decrypt_cache: - print('Using cached decryption function for: ' + player_name) + print(f'Using cached decryption function for: {player_name}') info['decryption_function'] = decrypt_cache[player_name] else: - base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text='Fetched player ' + player_name) + base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text=f'Fetched player {player_name}') base_js = base_js.decode('utf-8') err = yt_data_extract.extract_decryption_function(info, base_js) if err: @@ -387,11 +387,11 @@ def 
fetch_player_response(client, video_id): def fetch_watch_page_info(video_id, playlist_id, index): # bpctr=9999999999 will bypass are-you-sure dialogs for controversial # videos - url = 'https://m.youtube.com/embed/' + video_id + '?bpctr=9999999999' + url = f'https://m.youtube.com/embed/{video_id}?bpctr=9999999999' if playlist_id: - url += '&list=' + playlist_id + url += f'&list={playlist_id}' if index: - url += '&index=' + index + url += f'&index={index}' headers = ( ('Accept', '*/*'), @@ -493,7 +493,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None): # Register HLS audio tracks for proxy access added = 0 for lang, track in info['hls_audio_tracks'].items(): - ck = video_id + '_' + lang + ck = f"{video_id}_{lang}" from youtube.hls_cache import register_track register_track(ck, track['hls_url'], video_id=video_id, track_id=lang) @@ -502,7 +502,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None): 'audio_track_id': lang, 'audio_track_name': track['name'], 'audio_track_is_default': track['is_default'], - 'itag': 'hls_' + lang, + 'itag': f'hls_{lang}', 'ext': 'mp4', 'audio_bitrate': 128, 'bitrate': 128000, @@ -516,7 +516,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None): 'fps': None, 'init_range': {'start': 0, 'end': 0}, 'index_range': {'start': 0, 'end': 0}, - 'url': '/ytl-api/audio-track?id=' + urllib.parse.quote(ck), + 'url': f'/ytl-api/audio-track?id={urllib.parse.quote(ck)}', 's': None, 'sp': None, 'quality': None, @@ -538,11 +538,11 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None): # Register HLS manifest for proxying if info['hls_manifest_url']: - ck = video_id + '_video' + ck = f"{video_id}_video" from youtube.hls_cache import register_track register_track(ck, info['hls_manifest_url'], video_id=video_id, track_id='video') # Use proxy URL instead of direct Google Video URL - info['hls_manifest_url'] = '/ytl-api/hls-manifest?id=' + urllib.parse.quote(ck) + 
info['hls_manifest_url'] = f'/ytl-api/hls-manifest?id={urllib.parse.quote(ck)}' # Fallback to 'ios' if no valid URLs are found if not info.get('formats') or info.get('player_urls_missing'): @@ -566,7 +566,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None): if info.get('formats'): decryption_error = decrypt_signatures(info, video_id) if decryption_error: - info['playability_error'] = 'Error decrypting url signatures: ' + decryption_error + info['playability_error'] = f'Error decrypting url signatures: {decryption_error}' # check if urls ready (non-live format) in former livestream # urls not ready if all of them have no filesize @@ -623,9 +623,9 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None): def video_quality_string(format): if format['vcodec']: - result = str(format['width'] or '?') + 'x' + str(format['height'] or '?') + result = f"{format['width'] or '?'}x{format['height'] or '?'}" if format['fps']: - result += ' ' + str(format['fps']) + 'fps' + result += f" {format['fps']}fps" return result elif format['acodec']: return 'audio only' @@ -634,7 +634,7 @@ def video_quality_string(format): def short_video_quality_string(fmt): - result = str(fmt['quality'] or '?') + 'p' + result = f"{fmt['quality'] or '?'}p" if fmt['fps']: result += str(fmt['fps']) if fmt['vcodec'].startswith('av01'): @@ -642,18 +642,18 @@ def short_video_quality_string(fmt): elif fmt['vcodec'].startswith('avc'): result += ' h264' else: - result += ' ' + fmt['vcodec'] + result += f" {fmt['vcodec']}" return result def audio_quality_string(fmt): if fmt['acodec']: if fmt['audio_bitrate']: - result = '%d' % fmt['audio_bitrate'] + 'k' + result = f"{fmt['audio_bitrate']}k" else: result = '?k' if fmt['audio_sample_rate']: - result += ' ' + '%.3G' % (fmt['audio_sample_rate']/1000) + 'kHz' + result += f" {'%.3G' % (fmt['audio_sample_rate']/1000)}kHz" return result elif fmt['vcodec']: return 'video only' @@ -737,9 +737,9 @@ def get_audio_track(): seg = line 
if line.startswith('http') else urljoin(playlist_base, line) # Always use &seg= parameter, never &url= for segments playlist_lines.append( - base_url + '/ytl-api/audio-track?id=' - + urllib.parse.quote(cache_key) - + '&seg=' + urllib.parse.quote(seg, safe='') + f'{base_url}/ytl-api/audio-track?id=' + f'{urllib.parse.quote(cache_key)}' + f'&seg={urllib.parse.quote(seg, safe="")}' ) playlist = '\n'.join(playlist_lines) @@ -797,9 +797,7 @@ def get_audio_track(): return url if not url.startswith('http://') and not url.startswith('https://'): url = urljoin(playlist_base, url) - return (base_url + '/ytl-api/audio-track?id=' - + urllib.parse.quote(cache_key) - + '&seg=' + urllib.parse.quote(url, safe='')) + return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(url, safe="")}' playlist_lines = [] for line in playlist.split('\n'): @@ -812,7 +810,7 @@ def get_audio_track(): if line.startswith('#') and 'URI=' in line: def rewrite_uri_attr(match): uri = match.group(1) - return 'URI="' + proxy_url(uri) + '"' + return f'URI="{proxy_url(uri)}"' line = _re.sub(r'URI="([^"]+)"', rewrite_uri_attr, line) playlist_lines.append(line) elif line.startswith('#'): @@ -883,9 +881,7 @@ def get_audio_track(): if segment_url.startswith('/ytl-api/audio-track'): return segment_url base_url = request.url_root.rstrip('/') - return (base_url + '/ytl-api/audio-track?id=' - + urllib.parse.quote(cache_key) - + '&seg=' + urllib.parse.quote(segment_url)) + return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(segment_url)}' playlist_lines = [] for line in playlist.split('\n'): @@ -949,14 +945,10 @@ def get_hls_manifest(): if is_audio_track: # Audio track playlist - proxy through audio-track endpoint - return (base_url + '/ytl-api/audio-track?id=' - + urllib.parse.quote(cache_key) - + '&url=' + urllib.parse.quote(url, safe='')) + return 
f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&url={urllib.parse.quote(url, safe="")}' else: # Video segment or variant playlist - proxy through audio-track endpoint - return (base_url + '/ytl-api/audio-track?id=' - + urllib.parse.quote(cache_key) - + '&seg=' + urllib.parse.quote(url, safe='')) + return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(url, safe="")}' # Parse and rewrite the manifest manifest_lines = [] @@ -974,7 +966,7 @@ def get_hls_manifest(): nonlocal rewritten_count uri = match.group(1) rewritten_count += 1 - return 'URI="' + rewrite_url(uri, is_audio_track=True) + '"' + return f'URI="{rewrite_url(uri, is_audio_track=True)}"' line = _re.sub(r'URI="([^"]+)"', rewrite_media_uri, line) manifest_lines.append(line) elif line.startswith('#'): @@ -1053,7 +1045,7 @@ def get_storyboard_vtt(): ts = 0 # current timestamp for i in range(storyboard.storyboard_count): - url = '/' + storyboard.url.replace("$M", str(i)) + url = f'/{storyboard.url.replace("$M", str(i))}' interval = storyboard.interval w, h = storyboard.width, storyboard.height w_cnt, h_cnt = storyboard.width_cnt, storyboard.height_cnt @@ -1078,7 +1070,7 @@ def get_watch_page(video_id=None): if not video_id: return flask.render_template('error.html', error_message='Missing video id'), 404 if len(video_id) < 11: - return flask.render_template('error.html', error_message='Incomplete video id (too short): ' + video_id), 404 + return flask.render_template('error.html', error_message=f'Incomplete video id (too short): {video_id}'), 404 time_start_str = request.args.get('t', '0s') time_start = 0 @@ -1141,9 +1133,9 @@ def get_watch_page(video_id=None): util.prefix_urls(item) util.add_extra_html_info(item) if playlist_id: - item['url'] += '&list=' + playlist_id + item['url'] += f'&list={playlist_id}' if item['index']: - item['url'] += '&index=' + str(item['index']) + item['url'] += f'&index={item["index"]}' info['playlist']['author_url'] = 
util.prefix_url(
            info['playlist']['author_url'])

    if settings.img_prefix:
@@ -1159,16 +1151,16 @@ def get_watch_page(video_id=None):
            filename = title
            ext = fmt.get('ext')
            if ext:
-                filename += '.' + ext
+                filename += f'.{ext}'
            fmt['url'] = fmt['url'].replace(
                '/videoplayback',
-                '/videoplayback/name/' + filename)
+                f'/videoplayback/name/{filename}')

        download_formats = []

        for format in (info['formats'] + info['hls_formats']):
            if format['acodec'] and format['vcodec']:
-                codecs_string = format['acodec'] + ', ' + format['vcodec']
+                codecs_string = f"{format['acodec']}, {format['vcodec']}"
            else:
                codecs_string = format['acodec'] or format['vcodec'] or '?'
            download_formats.append({
@@ -1247,12 +1239,9 @@ def get_watch_page(video_id=None):
        for source in subtitle_sources:
            best_caption_parse = urllib.parse.urlparse(
                source['url'].lstrip('/'))
-            transcript_url = (util.URL_ORIGIN
-                              + '/watch/transcript'
-                              + best_caption_parse.path
-                              + '?' + best_caption_parse.query)
+            transcript_url = f'{util.URL_ORIGIN}/watch/transcript{best_caption_parse.path}?{best_caption_parse.query}'
            other_downloads.append({
-                'label': 'Video Transcript: ' + source['label'],
+                'label': f'Video Transcript: {source["label"]}',
                'ext': 'txt',
                'url': transcript_url
            })
@@ -1263,7 +1252,7 @@ def get_watch_page(video_id=None):
        template_name = 'watch.html'
    return flask.render_template(template_name,
        header_playlist_names = local_playlist.get_playlist_names(),
-        uploader_channel_url = ('/' + info['author_url']) if info['author_url'] else '',
+        uploader_channel_url = f'/{info["author_url"]}' if info['author_url'] else '',
        time_published = info['time_published'],
        view_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("view_count", None)),
        like_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("like_count", None)),
@@ -1305,10 +1294,10 @@ def get_watch_page(video_id=None):
        ip_address = info['ip_address'] if settings.route_tor else None,
        invidious_used = info['invidious_used'],
invidious_reload_button = info['invidious_reload_button'], - video_url = util.URL_ORIGIN + '/watch?v=' + video_id, + video_url = f'{util.URL_ORIGIN}/watch?v={video_id}', video_id = video_id, - storyboard_url = (util.URL_ORIGIN + '/ytl-api/storyboard.vtt?' + - urlencode([('spec_url', info['storyboard_spec_url'])]) + storyboard_url = (f'{util.URL_ORIGIN}/ytl-api/storyboard.vtt?' + f'{urlencode([("spec_url", info["storyboard_spec_url"])])}' if info['storyboard_spec_url'] else None), js_data = { @@ -1335,7 +1324,7 @@ def get_watch_page(video_id=None): @yt_app.route('/api/') def get_captions(dummy): - url = 'https://www.youtube.com' + request.full_path + url = f'https://www.youtube.com{request.full_path}' try: result = util.fetch_url(url, headers=util.mobile_ua) result = result.replace(b"align:start position:0%", b"") @@ -1350,12 +1339,9 @@ inner_timestamp_removal_reg = re.compile(r'<[^>]+>') @yt_app.route('/watch/transcript/') def get_transcript(caption_path): try: - captions = util.fetch_url('https://www.youtube.com/' - + caption_path - + '?' + request.environ['QUERY_STRING']).decode('utf-8') + captions = util.fetch_url(f'https://www.youtube.com/{caption_path}?{request.environ["QUERY_STRING"]}').decode('utf-8') except util.FetchError as e: - msg = ('Error retrieving captions: ' + str(e) + '\n\n' - + 'The caption url may have expired.') + msg = f'Error retrieving captions: {e}\n\nThe caption url may have expired.' 
print(msg) return flask.Response( msg, @@ -1403,7 +1389,7 @@ def get_transcript(caption_path): result = '' for seg in segments: if seg['text'] != ' ': - result += seg['begin'] + ' ' + seg['text'] + '\r\n' + result += f"{seg['begin']} {seg['text']}\r\n" return flask.Response(result.encode('utf-8'), mimetype='text/plain;charset=UTF-8') diff --git a/youtube/yt_data_extract/common.py b/youtube/yt_data_extract/common.py index dce1d30..f91a467 100644 --- a/youtube/yt_data_extract/common.py +++ b/youtube/yt_data_extract/common.py @@ -212,7 +212,7 @@ def extract_date(date_text): month, day, year = parts[-3:] month = MONTH_ABBREVIATIONS.get(month[0:3]) # slicing in case they start writing out the full month name if month and (re.fullmatch(r'\d\d?', day) is not None) and (re.fullmatch(r'\d{4}', year) is not None): - return year + '-' + month + '-' + day + return f'{year}-{month}-{day}' return None def check_missing_keys(object, *key_sequences): @@ -222,7 +222,7 @@ def check_missing_keys(object, *key_sequences): for key in key_sequence: _object = _object[key] except (KeyError, IndexError, TypeError): - return 'Could not find ' + key + return f'Could not find {key}' return None @@ -467,7 +467,7 @@ def extract_item_info(item, additional_info={}): ['shortBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'], ['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'] )) - info['author_url'] = ('https://www.youtube.com/channel/' + info['author_id']) if info['author_id'] else None + info['author_url'] = f'https://www.youtube.com/channel/{info["author_id"]}' if info['author_id'] else None info['description'] = extract_formatted_text(multi_deep_get( item, ['descriptionText'], ['descriptionSnippet'], diff --git a/youtube/yt_data_extract/everything_else.py b/youtube/yt_data_extract/everything_else.py index 5930111..b7379a5 100644 --- a/youtube/yt_data_extract/everything_else.py +++ b/youtube/yt_data_extract/everything_else.py @@ -305,7 +305,7 @@ 
def extract_playlist_metadata(polymer_json):
    metadata['description'] = desc

    if metadata['author_id']:
-        metadata['author_url'] = 'https://www.youtube.com/channel/' + metadata['author_id']
+        metadata['author_url'] = f'https://www.youtube.com/channel/{metadata["author_id"]}'

    if metadata['first_video_id'] is None:
        metadata['thumbnail'] = None
diff --git a/youtube/yt_data_extract/watch_extraction.py b/youtube/yt_data_extract/watch_extraction.py
index de87a6a..2a60741 100644
--- a/youtube/yt_data_extract/watch_extraction.py
+++ b/youtube/yt_data_extract/watch_extraction.py
@@ -650,9 +650,9 @@ def _extract_playability_error(info, player_response, error_prefix=''):
    )

    if playability_status not in (None, 'OK'):
-        info['playability_error'] = error_prefix + playability_reason
+        info['playability_error'] = f'{error_prefix}{playability_reason}'
    elif not info['playability_error']: # do not override
-        info['playability_error'] = error_prefix + 'Unknown playability error'
+        info['playability_error'] = f'{error_prefix}Unknown playability error'

SUBTITLE_FORMATS = ('srv1', 'srv2', 'srv3', 'ttml', 'vtt')

def extract_watch_info(polymer_json):
@@ -726,7 +726,7 @@ def extract_watch_info(polymer_json):
                # Store the full URL from the player response (includes valid tokens)
                if base_url:
                    normalized = normalize_url(base_url) if base_url.startswith('/') or not base_url.startswith('http') else base_url
-                    info['_caption_track_urls'][lang_code + ('_asr' if caption_track.get('kind') == 'asr' else '')] = normalized
+                    info['_caption_track_urls'][f'{lang_code}{"_asr" if caption_track.get("kind") == "asr" else ""}'] = normalized
                    lang_name = deep_get(urllib.parse.parse_qs(urllib.parse.urlparse(base_url).query), 'name', 0)
                    if lang_name:
                        info['_manual_caption_language_names'][lang_code] = lang_name
@@ -806,7 +806,7 @@ def extract_watch_info(polymer_json):
        info['allowed_countries'] = mf.get('availableCountries', [])

    # other stuff
-    info['author_url'] = 'https://www.youtube.com/channel/' + 
info['author_id'] if info['author_id'] else None + info['author_url'] = f'https://www.youtube.com/channel/{info["author_id"]}' if info['author_id'] else None info['storyboard_spec_url'] = deep_get(player_response, 'storyboards', 'playerStoryboardSpecRenderer', 'spec') return info @@ -912,12 +912,12 @@ def get_caption_url(info, language, format, automatic=False, translation_languag url = info['_captions_base_url'] if not url: return None - url += '&lang=' + language - url += '&fmt=' + format + url += f'&lang={language}' + url += f'&fmt={format}' if automatic: url += '&kind=asr' elif language in info['_manual_caption_language_names']: - url += '&name=' + urllib.parse.quote(info['_manual_caption_language_names'][language], safe='') + url += f'&name={urllib.parse.quote(info["_manual_caption_language_names"][language], safe="")}' if translation_language: url += '&tlang=' + translation_language @@ -964,7 +964,7 @@ def extract_decryption_function(info, base_js): return 'Could not find var_name' var_name = var_with_operation_match.group(1) - var_body_match = re.search(r'var ' + re.escape(var_name) + r'=\{(.*?)\};', base_js, flags=re.DOTALL) + var_body_match = re.search(rf'var {re.escape(var_name)}=\{{(.*?)\}};', base_js, flags=re.DOTALL) if var_body_match is None: return 'Could not find var_body' @@ -988,7 +988,7 @@ def extract_decryption_function(info, base_js): elif op_body.startswith('var c=a[0]'): operation_definitions[op_name] = 2 else: - return 'Unknown op_body: ' + op_body + return f'Unknown op_body: {op_body}' decryption_function = [] for op_with_arg in function_body: @@ -997,7 +997,7 @@ def extract_decryption_function(info, base_js): return 'Could not parse operation with arg' op_name = match.group(2).strip('[].') if op_name not in operation_definitions: - return 'Unknown op_name: ' + str(op_name) + return f'Unknown op_name: {op_name}' op_argument = match.group(3) decryption_function.append([operation_definitions[op_name], int(op_argument)]) @@ -1028,5 +1028,5 @@ 
def decrypt_signatures(info): _operation_2(a, argument) signature = ''.join(a) - format['url'] += '&' + format['sp'] + '=' + signature + format['url'] += f'&{format["sp"]}={signature}' return False