import base64
from youtube import (util, yt_data_extract, local_playlist, subscriptions,
    playlist)
from youtube import yt_app
import settings
import urllib
import json
from string import Template
import youtube.proto as proto
import html
import math
import gevent
import re
import cachetools.func
import traceback
import flask
from flask import request

# Request headers impersonating the desktop website client
# (X-YouTube-Client-Name 1 = desktop web)
headers_desktop = (
    ('Accept', '*/*'),
    ('Accept-Language', 'en-US,en;q=0.5'),
    ('X-YouTube-Client-Name', '1'),
    ('X-YouTube-Client-Version', '2.20180830'),
) + util.desktop_ua
# Request headers impersonating the mobile website client
# (X-YouTube-Client-Name 2 = mobile web)
headers_mobile = (
    ('Accept', '*/*'),
    ('Accept-Language', 'en-US,en;q=0.5'),
    ('X-YouTube-Client-Name', '2'),
    ('X-YouTube-Client-Version', '2.20180830'),
) + util.mobile_ua
# Visitor cookies: 'real_cookie' is a captured visitor id, 'generic_cookie'
# a generic one. (Neither is referenced in this file; presumably used by
# callers elsewhere — TODO confirm.)
real_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=8XihrAcN1l4'),)
generic_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=ST1Ti53r4fU'),)

# Sort values for YouTube API (from Invidious): 2=popular, 4=newest, 5=oldest
# include_shorts only applies to tab='videos'; tab='shorts'/'streams' always
# include their own content.
def channel_ctoken_v5(channel_id, page, sort, tab, view=1, include_shorts=True):
    """Build a browse continuation token for the videos/shorts/streams tabs.

    Tab-specific protobuf field numbers (from Invidious source).
    Each tab uses different field numbers in the protobuf structure:
        videos:  110 -> 3 -> 15 -> { 2:{1:UUID}, 4:sort, 8:{1:UUID, 3:sort} }
        shorts:  110 -> 3 -> 10 -> { 2:{1:UUID}, 4:sort, 7:{1:UUID, 3:sort} }
        streams: 110 -> 3 -> 14 -> { 2:{1:UUID}, 5:sort, 8:{1:UUID, 3:sort} }

    page is accepted for signature compatibility with the older ctoken
    builders but is unused: pagination for these tabs is driven by the
    continuation token YouTube returns in each response.
    Returns the token as an ascii str (urlsafe base64).
    """
    tab_config = {
        'videos': {'tab_field': 15, 'sort_field': 4, 'embedded_field': 8},
        'shorts': {'tab_field': 10, 'sort_field': 4, 'embedded_field': 7},
        'streams': {'tab_field': 14, 'sort_field': 5, 'embedded_field': 8},
    }
    config = tab_config.get(tab, tab_config['videos'])
    tab_field = config['tab_field']
    sort_field = config['sort_field']
    embedded_field = config['embedded_field']

    # Map our sort values to YouTube API values
    if tab == 'streams':
        sort_mapping = {'1': 14, '2': 13, '3': 12, '4': 12}
    else:
        sort_mapping = {'1': 2, '2': 5, '3': 4, '4': 4}
    # str() so int sort values (e.g. get_channel_tab's default sort=3) hit
    # the intended mapping instead of silently falling back to newest
    new_sort = sort_mapping.get(str(sort), sort_mapping['3'])

    # UUID placeholder (field 1)
    uuid_str = "00000000-0000-0000-0000-000000000000"

    # Build the tab-level object matching Invidious structure exactly:
    # { 2: embedded{1: UUID}, sort_field: sort_val,
    #   embedded_field: embedded{1: UUID, 3: sort_val} }
    tab_content = (
        proto.string(2, proto.string(1, uuid_str))
        + proto.uint(sort_field, new_sort)
        + proto.string(embedded_field,
                       proto.string(1, uuid_str) + proto.uint(3, new_sort))
    )
    tab_wrapper = proto.string(tab_field, tab_content)
    inner_container = proto.string(3, tab_wrapper)
    outer_container = proto.string(110, inner_container)

    # Add shorts filter when include_shorts=False (field 104, same as
    # playlist.py). This tells YouTube to exclude shorts from the results.
    if not include_shorts:
        outer_container += proto.string(104, proto.uint(2, 1))

    encoded_inner = proto.percent_b64encode(outer_container)
    pointless_nest = proto.string(80226972,
        proto.string(2, channel_id)
        + proto.string(3, encoded_inner)
    )
    return base64.urlsafe_b64encode(pointless_nest).decode('ascii')


def channel_about_ctoken(channel_id):
    """Build the continuation token for the channel 'about' popup/tab."""
    return proto.make_protobuf(
        ('base64p',
         [
            [2, 80226972,
             [
                [2, 2, channel_id],
                [2, 3,
                 ('base64p',
                  [
                     [2, 110,
                      [
                         [2, 3,
                          [
                             [2, 19,
                              [
                                 # targetId-style UUID; only needs to be present
                                 [2, 1, b'66b0e9e9-0000-2820-9589-582429a83980'],
                              ]
                             ],
                          ]
                         ],
                      ]
                     ],
                  ]
                 )
                ],
             ]
            ],
         ]
        )
    )


# https://github.com/user234683/youtube-local/issues/151
def channel_ctoken_v4(channel_id, page, sort, tab, view=1):
    """Older videos-tab ctoken format; sort: 1 - newest, 2 - popular."""
    new_sort = (2 if int(sort) == 1 else 1)
    offset = str(30*(int(page) - 1))
    pointless_nest = proto.string(80226972,
        proto.string(2, channel_id)
        + proto.string(3,
            proto.percent_b64encode(
                proto.string(110,
                    proto.string(3,
                        proto.string(15,
                            proto.string(1,
                                proto.string(1,
                                    proto.unpadded_b64encode(
                                        proto.string(1,
                                            proto.unpadded_b64encode(
                                                proto.string(2,
                                                    b"ST:"
                                                    + proto.unpadded_b64encode(
                                                        proto.string(2, offset)
                                                    )
                                                )
                                            )
                                        )
                                    )
                                )
                                # targetId, just needs to be present but
                                # doesn't need to be correct
                                + proto.string(
                                    2, "63faaff0-0000-23fe-80f0-582429d11c38")
                            )
                            # 1 - newest, 2 - popular
                            + proto.uint(3, new_sort)
                        )
                    )
                )
            )
        )
    )
    return base64.urlsafe_b64encode(pointless_nest).decode('ascii')


# SORT:
#  videos:
#   Newest - 3
#   Last video added - 4
# view:
#  grid: 0 or 1
#  list: 2
def channel_ctoken_v3(channel_id, page, sort, tab, view=1):
    """Build a generic tab ctoken (v3 format); used for non-video tabs."""
    # page > 1 doesn't work when sorting by oldest
    offset = 30*(int(page) - 1)
    page_token = proto.string(61, proto.unpadded_b64encode(
        proto.string(1, proto.unpadded_b64encode(proto.uint(1, offset)))
    ))

    tab = proto.string(2, tab)
    sort = proto.uint(3, int(sort))
    shelf_view = proto.uint(4, 0)
    view = proto.uint(6, int(view))
    continuation_info = proto.string(3,
        proto.percent_b64encode(tab + sort + shelf_view + view + page_token)
    )

    channel_id = proto.string(2, channel_id)
    pointless_nest = proto.string(80226972, channel_id + continuation_info)
    return base64.urlsafe_b64encode(pointless_nest).decode('ascii')


def channel_ctoken_v2(channel_id, page, sort, tab, view=1):
    """v2 ctoken format with per-sort schema numbers.

    see https://github.com/iv-org/invidious/issues/1319#issuecomment-671732646
    """
    # page > 1 doesn't work when sorting by oldest
    offset = 30*(int(page) - 1)
    schema_number = {
        3: 6307666885028338688,
        2: 17254859483345278706,
        1: 16570086088270825023,
    }[int(sort)]
    page_token = proto.string(61, proto.unpadded_b64encode(proto.string(1,
        proto.uint(1, schema_number) + proto.string(2,
            proto.string(1, proto.unpadded_b64encode(proto.uint(1, offset)))
        )
    )))

    tab = proto.string(2, tab)
    sort = proto.uint(3, int(sort))
    #page = proto.string(15, str(page))
    shelf_view = proto.uint(4, 0)
    view = proto.uint(6, int(view))
    continuation_info = proto.string(
        3, proto.percent_b64encode(tab + sort + shelf_view + view + page_token)
    )

    channel_id = proto.string(2, channel_id)
    pointless_nest = proto.string(80226972, channel_id + continuation_info)
    return base64.urlsafe_b64encode(pointless_nest).decode('ascii')


def channel_ctoken_v1(channel_id, page, sort, tab, view=1):
    """Oldest ctoken format; page is embedded directly (field 15)."""
    tab = proto.string(2, tab)
    sort = proto.uint(3, int(sort))
    page = proto.string(15, str(page))
    # example with shelves in videos tab: https://www.youtube.com/channel/UCNL1ZadSjHpjm4q9j2sVtOA/videos
    shelf_view = proto.uint(4, 0)
    view = proto.uint(6, int(view))
    continuation_info = proto.string(3,
        proto.percent_b64encode(tab + view + sort + shelf_view + page
                                + proto.uint(23, 0))
    )

    channel_id = proto.string(2, channel_id)
    pointless_nest = proto.string(80226972, channel_id + continuation_info)
    return base64.urlsafe_b64encode(pointless_nest).decode('ascii')


def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1,
                    ctoken=None, print_status=True, include_shorts=True):
    """Fetch one page of a channel tab via the youtubei browse endpoint.

    If no ctoken is supplied, one is generated: v5 tokens for the
    videos/shorts/streams tabs, v3 for everything else.
    Returns the raw response bytes from util.fetch_url.
    """
    message = 'Got channel tab' if print_status else None

    if not ctoken:
        if tab in ('videos', 'shorts', 'streams'):
            ctoken = channel_ctoken_v5(channel_id, page, sort, tab, view,
                                       include_shorts)
        else:
            ctoken = channel_ctoken_v3(channel_id, page, sort, tab, view)
        ctoken = ctoken.replace('=', '%3D')

    # Not sure what the purpose of the key is or whether it will change
    # For now it seems to be constant for the API endpoint, not dependent
    # on the browsing session or channel
    key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
    url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
    data = {
        'context': {
            'client': {
                'hl': 'en',
                'gl': 'US',
                'clientName': 'WEB',
                'clientVersion': '2.20240327.00.00',
            },
        },
        'continuation': ctoken,
    }

    content_type_header = (('Content-Type', 'application/json'),)
    content = util.fetch_url(
        url, headers_desktop + content_type_header,
        data=json.dumps(data), debug_name='channel_tab', report_text=message)

    return content


# cache entries expire after 30 minutes
number_of_videos_cache = cachetools.TTLCache(128, 30*60)
# Cache for continuation tokens (shorts/streams pagination)
continuation_token_cache = cachetools.TTLCache(512, 15*60)


@cachetools.cached(number_of_videos_cache)
def get_number_of_videos_channel(channel_id):
    """Return the total video count for a channel (cached, best-effort).

    Scrapes the mobile uploads-playlist page. Returns 1000 when the count
    cannot be retrieved (None channel_id or fetch failure) and 0 when the
    page was fetched but no count could be parsed.
    """
    if channel_id is None:
        return 1000

    # Uploads playlist
    playlist_id = 'UU' + channel_id[2:]
    url = 'https://m.youtube.com/playlist?list=' + playlist_id + '&pbj=1'

    try:
        response = util.fetch_url(url, headers_mobile,
                                  debug_name='number_of_videos',
                                  report_text='Got number of videos')
    except (urllib.error.HTTPError, util.FetchError):
        traceback.print_exc()
        print("Couldn't retrieve number of videos")
        return 1000

    response = response.decode('utf-8')
    # Try several patterns since YouTube's format changes:
    #   "numVideosText":{"runs":[{"text":"1,234"},{"text":" videos"}]}
    #   "stats":[..., {"runs":[{"text":"1,234"},{"text":" videos"}]}]
    for pattern in (
        r'"numVideosText".*?"text":\s*"([\d,]+)"',
        r'"numVideosText".*?([\d,]+)\s*videos?',
        r'"numVideosText".*?([,\d]+)',
        r'([\d,]+)\s*videos?\s*',
    ):
        match = re.search(pattern, response)
        if match:
            try:
                return int(match.group(1).replace(',', ''))
            except ValueError:
                continue
    # Fallback: unknown count
    return 0


def set_cached_number_of_videos(channel_id, num_videos):
    """Insert num_videos into number_of_videos_cache under channel_id."""
    # Piggybacks on the @cachetools.cached decorator so the cache key is
    # computed exactly the same way as for get_number_of_videos_channel
    @cachetools.cached(number_of_videos_cache)
    def dummy_func_using_same_cache(channel_id):
        return num_videos
    dummy_func_using_same_cache(channel_id)


channel_id_re = re.compile(r'videos\.xml\?channel_id=([a-zA-Z0-9_-]{24})"')


@cachetools.func.lru_cache(maxsize=128)
def get_channel_id(base_url):
    """Resolve a channel URL of any form to its UC… channel id (or None)."""
    # method that gives the smallest possible response at ~4 kb
    # needs to be as fast as possible
    base_url = base_url.replace('https://www', 'https://m')  # avoid redirect
    response = util.fetch_url(base_url + '/about?pbj=1', headers_mobile,
                              debug_name='get_channel_id',
                              report_text='Got channel id').decode('utf-8')
    match = channel_id_re.search(response)
    if match:
        return match.group(1)
    return None


metadata_cache = cachetools.LRUCache(128)


@cachetools.cached(metadata_cache)
def get_metadata(channel_id):
    """Fetch and cache basic channel metadata (name, avatar, etc.)."""
    # Use youtubei browse API to get channel metadata
    polymer_json = util.call_youtube_api('web', 'browse', {
        'browseId': channel_id,
    })
    info = yt_data_extract.extract_channel_info(json.loads(polymer_json),
                                                'about',
                                                continuation=False)
    return extract_metadata_for_caching(info)


def set_cached_metadata(channel_id, metadata):
    """Insert metadata into metadata_cache under channel_id."""
    # Same trick as set_cached_number_of_videos: reuse the decorator so the
    # cache key matches get_metadata's
    @cachetools.cached(metadata_cache)
    def dummy_func_using_same_cache(channel_id):
        return metadata
    dummy_func_using_same_cache(channel_id)


def extract_metadata_for_caching(channel_info):
    """Project the channel-info dict down to the keys worth caching."""
    metadata = {}
    for key in ('approx_subscriber_count', 'short_description',
                'channel_name', 'avatar'):
        metadata[key] = channel_info[key]
    return metadata


def get_number_of_videos_general(base_url):
    """Video count for a channel given any channel URL form."""
    return get_number_of_videos_channel(get_channel_id(base_url))


def get_channel_search_json(channel_id, query, page):
    """Search within a channel; returns the raw youtubei browse response."""
    offset = proto.unpadded_b64encode(proto.uint(3, (page-1)*30))
    params = proto.string(2, 'search') + proto.string(15, offset)
    params = proto.percent_b64encode(params)
    ctoken = (proto.string(2, channel_id) + proto.string(3, params)
              + proto.string(11, query))
    ctoken = base64.urlsafe_b64encode(
        proto.nested(80226972, ctoken)).decode('ascii')

    key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
    url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
    data = {
        'context': {
            'client': {
                'hl': 'en',
                'gl': 'US',
                'clientName': 'WEB',
                'clientVersion': '2.20240327.00.00',
            },
        },
        'continuation': ctoken,
    }

    content_type_header = (('Content-Type', 'application/json'),)
    polymer_json = util.fetch_url(
        url, headers_desktop + content_type_header,
        data=json.dumps(data), debug_name='channel_search')

    return polymer_json


def post_process_channel_info(info):
    """Rewrite URLs in an extracted channel-info dict for local serving.

    Prefixes avatar/channel/item URLs, fills in missing video/playlist
    thumbnails, and (on the about tab) prefixes YouTube links so they stay
    within this frontend. Mutates info in place.
    """
    info['avatar'] = util.prefix_url(info['avatar'])
    info['channel_url'] = util.prefix_url(info['channel_url'])
    for item in info['items']:
        # Only set thumbnail if YouTube didn't provide one
        if not item.get('thumbnail'):
            if item.get('type') == 'playlist' and item.get('first_video_id'):
                item['thumbnail'] = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(item['first_video_id'])
            elif item.get('type') == 'video' and item.get('id'):
                item['thumbnail'] = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(item['id'])
        util.prefix_urls(item)
        util.add_extra_html_info(item)
    if info['current_tab'] == 'about':
        for i, (text, url) in enumerate(info['links']):
            if isinstance(url, str) and util.YOUTUBE_URL_RE.fullmatch(url):
                info['links'][i] = (text, util.prefix_url(url))


def get_channel_first_page(base_url=None, tab='videos', channel_id=None,
                           sort=None):
    """Fetch the first page of a channel tab via the legacy pbj URL."""
    if channel_id:
        base_url = 'https://www.youtube.com/channel/' + channel_id
    # Build URL with sort parameter
    # YouTube URL sort params: p=popular, dd=newest, lad=newest no shorts
    # Note: 'da' (oldest) was removed by YouTube in January 2026
    url = base_url + '/' + tab + '?pbj=1&view=0'
    if sort:
        # Map sort values to YouTube's URL parameter values
        sort_map = {'3': 'dd', '4': 'lad'}
        url += '&sort=' + sort_map.get(sort, 'dd')
    return util.fetch_url(url, headers_desktop,
                          debug_name='gen_channel_' + tab)


playlist_sort_codes = {'2': "da", '3': "dd", '4': "lad"}


# youtube.com/[channel_id]/[tab]
# youtube.com/user/[username]/[tab]
# youtube.com/c/[custom]/[tab]
# youtube.com/[custom]/[tab]
def get_channel_page_general_url(base_url, tab, request, channel_id=None):
    """Render a channel tab page for any channel URL form.

    Handles the videos tab (via the UU uploads playlist for sort=3 or the
    channel browse API for sort=4/shorts/streams), plus about, playlists,
    and search tabs. Returns a rendered Flask template.
    """
    page_number = int(request.args.get('page', 1))
    # sort 1: views
    # sort 2: oldest
    # sort 3: newest (includes shorts, via UU uploads playlist)
    # sort 4: newest - no shorts (uses channel Videos tab API directly, like Invidious)
    default_sort = '3' if settings.include_shorts_in_channel else '4'
    sort = request.args.get('sort', default_sort)
    view = request.args.get('view', '1')
    query = request.args.get('query', '')
    ctoken = request.args.get('ctoken', '')
    default_params = (page_number == 1 and sort in ('3', '4') and view == '1')
    continuation = bool(ctoken)

    page_size = 30
    polymer_json = None
    number_of_videos = 0
    info = None

    # -------------------------------------------------------------------------
    # sort=3: use UU uploads playlist (includes shorts)
    # -------------------------------------------------------------------------
    if tab == 'videos' and sort == '3':
        if not channel_id:
            channel_id = get_channel_id(base_url)
        if page_number == 1:
            tasks = (
                gevent.spawn(playlist.playlist_first_page,
                             'UU' + channel_id[2:],
                             report_text='Retrieved channel videos'),
                gevent.spawn(get_metadata, channel_id),
            )
            gevent.joinall(tasks)
            util.check_gevent_exceptions(*tasks)
            pl_json = tasks[0].value
            pl_info = yt_data_extract.extract_playlist_info(pl_json)
            number_of_videos = pl_info['metadata']['video_count']
            if number_of_videos is None:
                number_of_videos = 1000
            else:
                set_cached_number_of_videos(channel_id, number_of_videos)
        else:
            tasks = (
                gevent.spawn(playlist.get_videos, 'UU' + channel_id[2:],
                             page_number, include_shorts=True),
                gevent.spawn(get_metadata, channel_id),
                gevent.spawn(get_number_of_videos_channel, channel_id),
                gevent.spawn(playlist.playlist_first_page,
                             'UU' + channel_id[2:],
                             report_text='Retrieved channel video count'),
            )
            gevent.joinall(tasks)
            util.check_gevent_exceptions(*tasks)
            pl_json = tasks[0].value
            pl_info = yt_data_extract.extract_playlist_info(pl_json)
            first_page_meta = yt_data_extract.extract_playlist_metadata(
                tasks[3].value)
            number_of_videos = (tasks[2].value
                                or first_page_meta.get('video_count')
                                or 0)
        if pl_info['items']:
            info = pl_info
            info['channel_id'] = channel_id
            info['current_tab'] = 'videos'
            page_size = 100
        # else fall through to the channel browse API below

    # -------------------------------------------------------------------------
    # Channel browse API: sort=4 (videos tab, no shorts), shorts, streams,
    # or fallback when the UU playlist returned no items.
    # Uses channel_ctoken_v5 per-tab tokens, mirroring Invidious's approach.
    # Pagination is driven by the continuation token YouTube returns each page.
    # -------------------------------------------------------------------------
    used_channel_api = False
    if info is None and (
            tab in ('shorts', 'streams')
            or (tab == 'videos' and sort == '4')
            or (tab == 'videos' and sort == '3')  # UU-playlist fallback
            ):
        if not channel_id:
            channel_id = get_channel_id(base_url)
        used_channel_api = True

        # Determine what browse call to make
        if ctoken:
            browse_call = (util.call_youtube_api, 'web', 'browse',
                           {'continuation': ctoken})
            continuation = True
        elif page_number > 1:
            cache_key = (channel_id, tab, sort, page_number - 1)
            cached_ctoken = continuation_token_cache.get(cache_key)
            if cached_ctoken:
                browse_call = (util.call_youtube_api, 'web', 'browse',
                               {'continuation': cached_ctoken})
            else:
                # Cache miss — restart from page 1 (better than an error)
                browse_call = (get_channel_tab, channel_id, '1', sort, tab,
                               int(view))
            continuation = True
        else:
            browse_call = (get_channel_tab, channel_id, '1', sort, tab,
                           int(view))
            continuation = True

        # Single browse call; number_of_videos is computed from items actually
        # fetched so we don't mislead the user with a total that includes
        # shorts (which this branch is explicitly excluding for sort=4).
        task = gevent.spawn(*browse_call)
        task.join()
        util.check_gevent_exceptions(task)
        polymer_json = task.value
    elif tab == 'about':
        # polymer_json = util.fetch_url(base_url + '/about?pbj=1', headers_desktop, debug_name='gen_channel_about')
        channel_id = get_channel_id(base_url)
        ctoken = channel_about_ctoken(channel_id)
        polymer_json = util.call_youtube_api('web', 'browse', {
            'continuation': ctoken,
        })
        continuation = True
    elif tab == 'playlists' and page_number == 1:
        # Use youtubei API instead of deprecated pbj=1 format
        if not channel_id:
            channel_id = get_channel_id(base_url)
        ctoken = channel_ctoken_v3(channel_id, page='1', sort=sort,
                                   tab='playlists', view=view)
        polymer_json = util.call_youtube_api('web', 'browse', {
            'continuation': ctoken,
        })
        continuation = True
    elif tab == 'playlists':
        # channel_id may not have been resolved yet for /user/ or /c/ URLs;
        # resolve it here, consistent with the page-1 branch above
        if not channel_id:
            channel_id = get_channel_id(base_url)
        polymer_json = get_channel_tab(channel_id, page_number, sort,
                                       'playlists', view)
        continuation = True
    elif tab == 'search' and channel_id:
        polymer_json = get_channel_search_json(channel_id, query, page_number)
    elif tab == 'search':
        url = (base_url + '/search?pbj=1&query='
               + urllib.parse.quote(query, safe=''))
        polymer_json = util.fetch_url(url, headers_desktop,
                                      debug_name='gen_channel_search')
    elif tab != 'videos':
        flask.abort(404, 'Unknown channel tab: ' + tab)

    if polymer_json is not None and info is None:
        info = yt_data_extract.extract_channel_info(
            json.loads(polymer_json), tab, continuation=continuation
        )

    if info is None:
        return flask.render_template(
            'error.html', error_message='Could not retrieve channel data')

    if info['error'] is not None:
        return flask.render_template('error.html',
                                     error_message=info['error'])

    if channel_id:
        info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id
        info['channel_id'] = channel_id
    else:
        channel_id = info['channel_id']

    # Will have microformat present, cache metadata while we have it
    if (channel_id and default_params
            and tab not in ('videos', 'about')
            and info.get('channel_name') is not None):
        metadata = extract_metadata_for_caching(info)
        set_cached_metadata(channel_id, metadata)
    # Otherwise, populate with our (hopefully cached) metadata
    elif channel_id and info.get('channel_name') is None:
        metadata = get_metadata(channel_id)
        for key, value in metadata.items():
            yt_data_extract.conservative_update(info, key, value)
        # need to add this metadata to the videos/playlists
        additional_info = {
            'author': info['channel_name'],
            'author_id': info['channel_id'],
            'author_url': info['channel_url'],
        }
        for item in info['items']:
            item.update(additional_info)

    if tab in ('videos', 'shorts', 'streams'):
        # For any tab using the channel browse API (sort=4, shorts, streams),
        # pagination is driven by the ctoken YouTube returns in the response.
        # Cache it so the next page request can use it.
        if info.get('ctoken'):
            cache_key = (channel_id, tab, sort, page_number)
            continuation_token_cache[cache_key] = info['ctoken']

        # Determine is_last_page and final number_of_pages.
        # For channel-API-driven tabs (sort=4, shorts, streams, UU fallback),
        # YouTube doesn't give us a reliable total filtered count. So instead
        # of displaying a misleading number (the total-including-shorts from
        # get_number_of_videos_channel), we count only what we've actually
        # paged through, and use the ctoken to know whether to show "next".
        if used_channel_api:
            info['is_last_page'] = (info.get('ctoken') is None)
            items_on_page = len(info.get('items', []))
            items_seen_so_far = (page_number - 1) * page_size + items_on_page
            # Use accumulated count as the displayed total so "N videos" shown
            # to the user always matches what they could actually reach.
            number_of_videos = items_seen_so_far
            # If there's more content, bump by 1 so the Next-page button exists
            if info.get('ctoken'):
                number_of_videos = max(number_of_videos,
                                       page_number * page_size + 1)
        # For sort=3 via UU playlist (used_channel_api=False), number_of_videos
        # was already set from playlist metadata above.
        info['number_of_videos'] = number_of_videos
        info['number_of_pages'] = (math.ceil(number_of_videos / page_size)
                                   if number_of_videos else 1)
        # Never show fewer pages than the page the user is actually on
        if info['number_of_pages'] < page_number:
            info['number_of_pages'] = page_number
        info['header_playlist_names'] = local_playlist.get_playlist_names()
    if tab in ('videos', 'shorts', 'streams', 'playlists'):
        info['current_sort'] = sort
    elif tab == 'search':
        info['search_box_value'] = query
        info['header_playlist_names'] = local_playlist.get_playlist_names()
    if tab in ('search', 'playlists'):
        info['page_number'] = page_number
    info['subscribed'] = subscriptions.is_subscribed(info['channel_id'])

    post_process_channel_info(info)

    return flask.render_template('channel.html',
        parameters_dictionary=request.args,
        **info
    )


# Route rules need <variable> converters matching the view-function
# parameters (channel_id/username/custom/tab); without them Flask would
# call the views with required arguments missing.
@yt_app.route('/channel/<channel_id>/')
@yt_app.route('/channel/<channel_id>/<tab>')
def get_channel_page(channel_id, tab='videos'):
    return get_channel_page_general_url(
        'https://www.youtube.com/channel/' + channel_id, tab, request,
        channel_id)


@yt_app.route('/user/<username>/')
@yt_app.route('/user/<username>/<tab>')
def get_user_page(username, tab='videos'):
    return get_channel_page_general_url(
        'https://www.youtube.com/user/' + username, tab, request)


@yt_app.route('/c/<custom>/')
@yt_app.route('/c/<custom>/<tab>')
def get_custom_c_page(custom, tab='videos'):
    return get_channel_page_general_url(
        'https://www.youtube.com/c/' + custom, tab, request)


@yt_app.route('/<custom>')
@yt_app.route('/<custom>/<tab>')
def get_toplevel_custom_page(custom, tab='videos'):
    return get_channel_page_general_url(
        'https://www.youtube.com/' + custom, tab, request)