import base64
import json
import math
import re
import traceback
# Import the submodules explicitly: `import urllib` alone does NOT provide
# urllib.parse / urllib.error, both of which this module uses.
import urllib.error
import urllib.parse

import cachetools.func
import flask
import gevent
from flask import request

import settings
import youtube.proto as proto
from youtube import (util, yt_data_extract, local_playlist, subscriptions,
                     playlist)
from youtube import yt_app


headers_desktop = (
    ('Accept', '*/*'),
    ('Accept-Language', 'en-US,en;q=0.5'),
    ('X-YouTube-Client-Name', '1'),
    ('X-YouTube-Client-Version', '2.20180830'),
) + util.desktop_ua
headers_mobile = (
    ('Accept', '*/*'),
    ('Accept-Language', 'en-US,en;q=0.5'),
    ('X-YouTube-Client-Name', '2'),
    ('X-YouTube-Client-Version', '2.20180830'),
) + util.mobile_ua
real_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=8XihrAcN1l4'),)
generic_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=ST1Ti53r4fU'),)


# Sort values for YouTube API (from Invidious): 2=popular, 4=newest, 5=oldest
# include_shorts only applies to tab='videos'; tab='shorts'/'streams' always
# include their own content.
def channel_ctoken_v5(channel_id, page, sort, tab, view=1, include_shorts=True):
    """Build a browse continuation token for the videos/shorts/streams tabs.

    Mirrors the protobuf structure Invidious uses. `page` and `view` are
    accepted for signature compatibility with the older ctoken builders;
    pagination for these tabs is driven by server-returned ctokens instead.
    Returns the token as a URL-safe base64 string.
    """
    # Tab-specific protobuf field numbers (from Invidious source)
    # Each tab uses different field numbers in the protobuf structure:
    #   videos:  110 -> 3 -> 15 -> { 2:{1:UUID}, 4:sort, 8:{1:UUID, 3:sort} }
    #   shorts:  110 -> 3 -> 10 -> { 2:{1:UUID}, 4:sort, 7:{1:UUID, 3:sort} }
    #   streams: 110 -> 3 -> 14 -> { 2:{1:UUID}, 5:sort, 8:{1:UUID, 3:sort} }
    tab_config = {
        'videos':  {'tab_field': 15, 'sort_field': 4, 'embedded_field': 8},
        'shorts':  {'tab_field': 10, 'sort_field': 4, 'embedded_field': 7},
        'streams': {'tab_field': 14, 'sort_field': 5, 'embedded_field': 8},
    }
    config = tab_config.get(tab, tab_config['videos'])
    tab_field = config['tab_field']
    sort_field = config['sort_field']
    embedded_field = config['embedded_field']

    # Map our sort values to YouTube API values. str(sort) because callers
    # pass either the string from the query args or get_channel_tab's
    # integer default; previously an int sort silently fell back to default.
    if tab == 'streams':
        sort_mapping = {'1': 14, '2': 13, '3': 12, '4': 12}
    else:
        sort_mapping = {'1': 2, '2': 5, '3': 4, '4': 4}
    new_sort = sort_mapping.get(str(sort), sort_mapping['3'])

    # UUID placeholder (field 1)
    uuid_str = "00000000-0000-0000-0000-000000000000"

    # Build the tab-level object matching Invidious structure exactly:
    # { 2: embedded{1: UUID}, sort_field: sort_val,
    #   embedded_field: embedded{1: UUID, 3: sort_val} }
    tab_content = (
        proto.string(2, proto.string(1, uuid_str))
        + proto.uint(sort_field, new_sort)
        + proto.string(embedded_field,
                       proto.string(1, uuid_str) + proto.uint(3, new_sort))
    )

    tab_wrapper = proto.string(tab_field, tab_content)
    inner_container = proto.string(3, tab_wrapper)
    outer_container = proto.string(110, inner_container)

    # Add shorts filter when include_shorts=False (field 104, same as
    # playlist.py). This tells YouTube to exclude shorts from the results.
    if not include_shorts:
        outer_container += proto.string(104, proto.uint(2, 1))

    encoded_inner = proto.percent_b64encode(outer_container)
    pointless_nest = proto.string(80226972,
        proto.string(2, channel_id)
        + proto.string(3, encoded_inner)
    )
    return base64.urlsafe_b64encode(pointless_nest).decode('ascii')


def channel_about_ctoken(channel_id):
    """Build the continuation token for a channel's about tab."""
    return proto.make_protobuf(
        ('base64p',
         [
            [2, 80226972,
             [
                [2, 2, channel_id],
                [2, 3,
                 ('base64p',
                  [
                    [2, 110,
                     [
                        [2, 3,
                         [
                            [2, 19,
                             [
                                # targetId-style placeholder; only needs to
                                # be present, not correct
                                [2, 1, b'66b0e9e9-0000-2820-9589-582429a83980'],
                             ]
                            ],
                         ]
                        ],
                     ]
                    ],
                  ]
                 )
                ],
             ]
            ],
         ]
        )
    )


# https://github.com/user234683/youtube-local/issues/151
def channel_ctoken_v4(channel_id, page, sort, tab, view=1):
    """Older videos-tab ctoken format; superseded by channel_ctoken_v5."""
    new_sort = (2 if int(sort) == 1 else 1)
    offset = str(30*(int(page) - 1))
    pointless_nest = proto.string(80226972,
        proto.string(2, channel_id)
        + proto.string(3,
            proto.percent_b64encode(
                proto.string(110,
                    proto.string(3,
                        proto.string(15,
                            proto.string(1,
                                proto.string(1,
                                    proto.unpadded_b64encode(
                                        proto.string(1,
                                            proto.unpadded_b64encode(
                                                proto.string(2,
                                                    b"ST:"
                                                    + proto.unpadded_b64encode(
                                                        proto.string(2, offset)
                                                    )
                                                )
                                            )
                                        )
                                    )
                                )
                                # targetId, just needs to be present but
                                # doesn't need to be correct
                                + proto.string(2,
                                    "63faaff0-0000-23fe-80f0-582429d11c38")
                            )
                            # 1 - newest, 2 - popular
                            + proto.uint(3, new_sort)
                        )
                    )
                )
            )
        )
    )
    return base64.urlsafe_b64encode(pointless_nest).decode('ascii')


# SORT:
#   videos:
#     Newest - 3
#     Last video added - 4
# view:
#   grid: 0 or 1
#   list: 2
def channel_ctoken_v3(channel_id, page, sort, tab, view=1):
    """Legacy ctoken builder, still used for the playlists tab."""
    # page > 1 doesn't work when sorting by oldest
    offset = 30*(int(page) - 1)
    page_token = proto.string(61, proto.unpadded_b64encode(
        proto.string(1, proto.unpadded_b64encode(proto.uint(1, offset)))
    ))
    tab = proto.string(2, tab)
    sort = proto.uint(3, int(sort))
    shelf_view = proto.uint(4, 0)
    view = proto.uint(6, int(view))
    continuation_info = proto.string(3,
        proto.percent_b64encode(tab + sort + shelf_view + view + page_token)
    )
    channel_id = proto.string(2, channel_id)
    pointless_nest = proto.string(80226972, channel_id + continuation_info)
    return base64.urlsafe_b64encode(pointless_nest).decode('ascii')


def channel_ctoken_v2(channel_id, page, sort, tab, view=1):
    """Legacy ctoken builder (kept for reference; unused by current tabs)."""
    # see https://github.com/iv-org/invidious/issues/1319#issuecomment-671732646
    # page > 1 doesn't work when sorting by oldest
    offset = 30*(int(page) - 1)
    schema_number = {
        3: 6307666885028338688,
        2: 17254859483345278706,
        1: 16570086088270825023,
    }[int(sort)]
    page_token = proto.string(61, proto.unpadded_b64encode(proto.string(1,
        proto.uint(1, schema_number) + proto.string(2,
            proto.string(1, proto.unpadded_b64encode(proto.uint(1, offset)))
        )
    )))
    tab = proto.string(2, tab)
    sort = proto.uint(3, int(sort))
    #page = proto.string(15, str(page))
    shelf_view = proto.uint(4, 0)
    view = proto.uint(6, int(view))
    continuation_info = proto.string(
        3,
        proto.percent_b64encode(tab + sort + shelf_view + view + page_token)
    )
    channel_id = proto.string(2, channel_id)
    pointless_nest = proto.string(80226972, channel_id + continuation_info)
    return base64.urlsafe_b64encode(pointless_nest).decode('ascii')


def channel_ctoken_v1(channel_id, page, sort, tab, view=1):
    """Oldest ctoken builder (kept for reference; unused by current tabs)."""
    tab = proto.string(2, tab)
    sort = proto.uint(3, int(sort))
    page = proto.string(15, str(page))
    # example with shelves in videos tab:
    # https://www.youtube.com/channel/UCNL1ZadSjHpjm4q9j2sVtOA/videos
    shelf_view = proto.uint(4, 0)
    view = proto.uint(6, int(view))
    continuation_info = proto.string(3,
        proto.percent_b64encode(tab + view + sort + shelf_view + page
                                + proto.uint(23, 0))
    )
    channel_id = proto.string(2, channel_id)
    pointless_nest = proto.string(80226972, channel_id + continuation_info)
    return base64.urlsafe_b64encode(pointless_nest).decode('ascii')


def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1,
                    ctoken=None, print_status=True, include_shorts=True):
    """Fetch one page of a channel tab via the youtubei browse endpoint.

    Builds a ctoken (v5 for videos/shorts/streams, v3 otherwise) unless one
    is supplied, then POSTs it to /youtubei/v1/browse. Returns the raw
    response bytes.
    """
    message = 'Got channel tab' if print_status else None

    if not ctoken:
        if tab in ('videos', 'shorts', 'streams'):
            ctoken = channel_ctoken_v5(channel_id, page, sort, tab, view,
                                       include_shorts)
        else:
            ctoken = channel_ctoken_v3(channel_id, page, sort, tab, view)
        ctoken = ctoken.replace('=', '%3D')

    # Not sure what the purpose of the key is or whether it will change
    # For now it seems to be constant for the API endpoint, not dependent
    # on the browsing session or channel
    key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
    url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key

    data = {
        'context': {
            'client': {
                'hl': 'en',
                'gl': 'US',
                'clientName': 'WEB',
                'clientVersion': '2.20240327.00.00',
            },
        },
        'continuation': ctoken,
    }

    content_type_header = (('Content-Type', 'application/json'),)
    content = util.fetch_url(
        url, headers_desktop + content_type_header,
        data=json.dumps(data), debug_name='channel_tab', report_text=message)

    return content


# cache entries expire after 30 minutes
number_of_videos_cache = cachetools.TTLCache(128, 30*60)
# Cache for continuation tokens (shorts/streams pagination)
continuation_token_cache = cachetools.TTLCache(512, 15*60)


@cachetools.cached(number_of_videos_cache)
def get_number_of_videos_channel(channel_id):
    """Scrape the video count from the channel's UU uploads playlist page.

    Returns 1000 as a pessimistic guess when the page can't be fetched and
    0 when no count can be parsed from the response.
    """
    if channel_id is None:
        return 1000

    # Uploads playlist
    playlist_id = 'UU' + channel_id[2:]
    url = 'https://m.youtube.com/playlist?list=' + playlist_id + '&pbj=1'

    try:
        response = util.fetch_url(url, headers_mobile,
                                  debug_name='number_of_videos',
                                  report_text='Got number of videos')
    except (urllib.error.HTTPError, util.FetchError):
        traceback.print_exc()
        print("Couldn't retrieve number of videos")
        return 1000

    response = response.decode('utf-8')

    # Try several patterns since YouTube's format changes:
    #   "numVideosText":{"runs":[{"text":"1,234"},{"text":" videos"}]}
    #   "stats":[..., {"runs":[{"text":"1,234"},{"text":" videos"}]}]
    for pattern in (
        r'"numVideosText".*?"text":\s*"([\d,]+)"',
        r'"numVideosText".*?([\d,]+)\s*videos?',
        r'"numVideosText".*?([,\d]+)',
        r'([\d,]+)\s*videos?\s*',
    ):
        match = re.search(pattern, response)
        if match:
            try:
                return int(match.group(1).replace(',', ''))
            except ValueError:
                continue

    # Fallback: unknown count
    return 0


def set_cached_number_of_videos(channel_id, num_videos):
    """Seed number_of_videos_cache with a value obtained elsewhere."""
    @cachetools.cached(number_of_videos_cache)
    def dummy_func_using_same_cache(channel_id):
        return num_videos
    dummy_func_using_same_cache(channel_id)


channel_id_re = re.compile(r'videos\.xml\?channel_id=([a-zA-Z0-9_-]{24})"')


@cachetools.func.lru_cache(maxsize=128)
def get_channel_id(base_url):
    """Resolve a channel URL of any form to its UC… channel id, or None."""
    # method that gives the smallest possible response at ~4 kb
    # needs to be as fast as possible
    base_url = base_url.replace('https://www', 'https://m')  # avoid redirect
    response = util.fetch_url(
        base_url + '/about?pbj=1', headers_mobile,
        debug_name='get_channel_id',
        report_text='Got channel id').decode('utf-8')
    match = channel_id_re.search(response)
    if match:
        return match.group(1)
    return None


metadata_cache = cachetools.LRUCache(128)


@cachetools.cached(metadata_cache)
def get_metadata(channel_id):
    """Fetch and cache basic channel metadata (name, avatar, etc.)."""
    # Use youtubei browse API to get channel metadata
    polymer_json = util.call_youtube_api('web', 'browse', {
        'browseId': channel_id,
    })
    info = yt_data_extract.extract_channel_info(
        json.loads(polymer_json), 'about', continuation=False)
    return extract_metadata_for_caching(info)


def set_cached_metadata(channel_id, metadata):
    """Seed metadata_cache with metadata obtained elsewhere."""
    @cachetools.cached(metadata_cache)
    def dummy_func_using_same_cache(channel_id):
        return metadata
    dummy_func_using_same_cache(channel_id)


def extract_metadata_for_caching(channel_info):
    """Pick out just the channel_info keys worth keeping in the cache."""
    metadata = {}
    for key in ('approx_subscriber_count', 'short_description',
                'channel_name', 'avatar'):
        metadata[key] = channel_info[key]
    return metadata


def get_number_of_videos_general(base_url):
    """Like get_number_of_videos_channel but starting from any channel URL."""
    return get_number_of_videos_channel(get_channel_id(base_url))


def get_channel_search_json(channel_id, query, page):
    """Fetch one page of in-channel search results via the browse API."""
    offset = proto.unpadded_b64encode(proto.uint(3, (page-1)*30))
    params = proto.string(2, 'search') + proto.string(15, offset)
    params = proto.percent_b64encode(params)
    ctoken = (proto.string(2, channel_id) + proto.string(3, params)
              + proto.string(11, query))
    ctoken = base64.urlsafe_b64encode(
        proto.nested(80226972, ctoken)).decode('ascii')

    key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
    url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key

    data = {
        'context': {
            'client': {
                'hl': 'en',
                'gl': 'US',
                'clientName': 'WEB',
                'clientVersion': '2.20240327.00.00',
            },
        },
        'continuation': ctoken,
    }

    content_type_header = (('Content-Type', 'application/json'),)
    polymer_json = util.fetch_url(
        url, headers_desktop + content_type_header,
        data=json.dumps(data), debug_name='channel_search')

    return polymer_json


def post_process_channel_info(info):
    """Rewrite URLs in extracted channel info to go through this server."""
    info['avatar'] = util.prefix_url(info['avatar'])
    info['channel_url'] = util.prefix_url(info['channel_url'])
    for item in info['items']:
        # Only set thumbnail if YouTube didn't provide one
        if not item.get('thumbnail'):
            if item.get('type') == 'playlist' and item.get('first_video_id'):
                item['thumbnail'] = (
                    "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(
                        item['first_video_id']))
            elif item.get('type') == 'video' and item.get('id'):
                item['thumbnail'] = (
                    "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(
                        item['id']))
        util.prefix_urls(item)
        util.add_extra_html_info(item)
    if info['current_tab'] == 'about':
        for i, (text, url) in enumerate(info['links']):
            if isinstance(url, str) and util.YOUTUBE_URL_RE.fullmatch(url):
                info['links'][i] = (text, util.prefix_url(url))


def get_channel_first_page(base_url=None, tab='videos', channel_id=None,
                           sort=None):
    """Fetch the first page of a channel tab using the old pbj=1 URL form."""
    if channel_id:
        base_url = 'https://www.youtube.com/channel/' + channel_id
    # Build URL with sort parameter
    # YouTube URL sort params: p=popular, dd=newest, lad=newest no shorts
    # Note: 'da' (oldest) was removed by YouTube in January 2026
    url = base_url + '/' + tab + '?pbj=1&view=0'
    if sort:
        # Map sort values to YouTube's URL parameter values
        sort_map = {'3': 'dd', '4': 'lad'}
        url += '&sort=' + sort_map.get(sort, 'dd')
    return util.fetch_url(url, headers_desktop,
                          debug_name='gen_channel_' + tab)


playlist_sort_codes = {'2': "da", '3': "dd", '4': "lad"}


# youtube.com/[channel_id]/[tab]
# youtube.com/user/[username]/[tab]
# youtube.com/c/[custom]/[tab]
# youtube.com/[custom]/[tab]
def get_channel_page_general_url(base_url, tab, request, channel_id=None):
    """Render a channel page for any channel URL form and tab.

    `tab` is one of videos/shorts/streams/playlists/about/search. Returns a
    rendered channel.html (or error.html) Flask response.
    """
    page_number = int(request.args.get('page', 1))
    # sort 1: views
    # sort 2: oldest
    # sort 3: newest (includes shorts, via UU uploads playlist)
    # sort 4: newest - no shorts (uses channel Videos tab API directly,
    #         like Invidious)
    default_sort = '3' if settings.include_shorts_in_channel else '4'
    sort = request.args.get('sort', default_sort)
    view = request.args.get('view', '1')
    query = request.args.get('query', '')
    ctoken = request.args.get('ctoken', '')
    default_params = (page_number == 1
                      and sort in ('3', '4')
                      and view == '1')
    continuation = bool(ctoken)

    page_size = 30
    polymer_json = None
    number_of_videos = 0
    info = None

    # -------------------------------------------------------------------
    # sort=3: use UU uploads playlist (includes shorts)
    # -------------------------------------------------------------------
    if tab == 'videos' and sort == '3':
        if not channel_id:
            channel_id = get_channel_id(base_url)
        if page_number == 1:
            tasks = (
                gevent.spawn(playlist.playlist_first_page,
                             'UU' + channel_id[2:],
                             report_text='Retrieved channel videos'),
                gevent.spawn(get_metadata, channel_id),
            )
            gevent.joinall(tasks)
            util.check_gevent_exceptions(*tasks)
            pl_json = tasks[0].value
            pl_info = yt_data_extract.extract_playlist_info(pl_json)
            number_of_videos = pl_info['metadata']['video_count']
            if number_of_videos is None:
                number_of_videos = 1000
            else:
                set_cached_number_of_videos(channel_id, number_of_videos)
        else:
            tasks = (
                gevent.spawn(playlist.get_videos, 'UU' + channel_id[2:],
                             page_number, include_shorts=True),
                gevent.spawn(get_metadata, channel_id),
                gevent.spawn(get_number_of_videos_channel, channel_id),
                gevent.spawn(playlist.playlist_first_page,
                             'UU' + channel_id[2:],
                             report_text='Retrieved channel video count'),
            )
            gevent.joinall(tasks)
            util.check_gevent_exceptions(*tasks)
            pl_json = tasks[0].value
            pl_info = yt_data_extract.extract_playlist_info(pl_json)
            first_page_meta = yt_data_extract.extract_playlist_metadata(
                tasks[3].value)
            number_of_videos = (tasks[2].value
                                or first_page_meta.get('video_count')
                                or 0)

        if pl_info['items']:
            info = pl_info
            info['channel_id'] = channel_id
            info['current_tab'] = 'videos'
            page_size = 100
        # else fall through to the channel browse API below

    # -------------------------------------------------------------------
    # Channel browse API: sort=4 (videos tab, no shorts), shorts, streams,
    # or fallback when the UU playlist returned no items.
    # Uses channel_ctoken_v5 per-tab tokens, mirroring Invidious's approach.
    # Pagination is driven by the continuation token YouTube returns each
    # page.
    # -------------------------------------------------------------------
    used_channel_api = False
    if info is None and (
            tab in ('shorts', 'streams')
            or (tab == 'videos' and sort == '4')
            or (tab == 'videos' and sort == '3')  # UU-playlist fallback
            ):
        if not channel_id:
            channel_id = get_channel_id(base_url)
        used_channel_api = True

        # Determine what browse call to make
        if ctoken:
            browse_call = (util.call_youtube_api, 'web', 'browse',
                           {'continuation': ctoken})
            continuation = True
        elif page_number > 1:
            cache_key = (channel_id, tab, sort, page_number - 1)
            cached_ctoken = continuation_token_cache.get(cache_key)
            if cached_ctoken:
                browse_call = (util.call_youtube_api, 'web', 'browse',
                               {'continuation': cached_ctoken})
            else:
                # Cache miss — restart from page 1 (better than an error)
                browse_call = (get_channel_tab, channel_id, '1', sort, tab,
                               int(view))
            continuation = True
        else:
            browse_call = (get_channel_tab, channel_id, '1', sort, tab,
                           int(view))
            continuation = True

        # Single browse call; number_of_videos is computed from items
        # actually fetched so we don't mislead the user with a total that
        # includes shorts (which this branch is explicitly excluding for
        # sort=4).
        task = gevent.spawn(*browse_call)
        task.join()
        util.check_gevent_exceptions(task)
        polymer_json = task.value
    elif tab == 'about':
        # polymer_json = util.fetch_url(base_url + '/about?pbj=1',
        #     headers_desktop, debug_name='gen_channel_about')
        channel_id = get_channel_id(base_url)
        ctoken = channel_about_ctoken(channel_id)
        polymer_json = util.call_youtube_api('web', 'browse', {
            'continuation': ctoken,
        })
        continuation = True
    elif tab == 'playlists' and page_number == 1:
        # Use youtubei API instead of deprecated pbj=1 format
        if not channel_id:
            channel_id = get_channel_id(base_url)
        ctoken = channel_ctoken_v3(channel_id, page='1', sort=sort,
                                   tab='playlists', view=view)
        polymer_json = util.call_youtube_api('web', 'browse', {
            'continuation': ctoken,
        })
        continuation = True
    elif tab == 'playlists':
        polymer_json = get_channel_tab(channel_id, page_number, sort,
                                       'playlists', view)
        continuation = True
    elif tab == 'search' and channel_id:
        polymer_json = get_channel_search_json(channel_id, query, page_number)
    elif tab == 'search':
        url = (base_url + '/search?pbj=1&query='
               + urllib.parse.quote(query, safe=''))
        polymer_json = util.fetch_url(url, headers_desktop,
                                      debug_name='gen_channel_search')
    elif tab != 'videos':
        flask.abort(404, 'Unknown channel tab: ' + tab)

    if polymer_json is not None and info is None:
        info = yt_data_extract.extract_channel_info(
            json.loads(polymer_json), tab, continuation=continuation
        )

    if info is None:
        return flask.render_template(
            'error.html', error_message='Could not retrieve channel data')
    if info['error'] is not None:
        return flask.render_template('error.html',
                                     error_message=info['error'])

    if channel_id:
        info['channel_url'] = ('https://www.youtube.com/channel/'
                               + channel_id)
        info['channel_id'] = channel_id
    else:
        channel_id = info['channel_id']

    # Will have microformat present, cache metadata while we have it
    if (channel_id and default_params and tab not in ('videos', 'about')
            and info.get('channel_name') is not None):
        metadata = extract_metadata_for_caching(info)
        set_cached_metadata(channel_id, metadata)
    # Otherwise, populate with our (hopefully cached) metadata
    elif channel_id and info.get('channel_name') is None:
        metadata = get_metadata(channel_id)
        for key, value in metadata.items():
            yt_data_extract.conservative_update(info, key, value)

    # need to add this metadata to the videos/playlists
    additional_info = {
        'author': info['channel_name'],
        'author_id': info['channel_id'],
        'author_url': info['channel_url'],
    }
    for item in info['items']:
        item.update(additional_info)

    if tab in ('videos', 'shorts', 'streams'):
        # For any tab using the channel browse API (sort=4, shorts,
        # streams), pagination is driven by the ctoken YouTube returns in
        # the response. Cache it so the next page request can use it.
        if info.get('ctoken'):
            cache_key = (channel_id, tab, sort, page_number)
            continuation_token_cache[cache_key] = info['ctoken']

        # Determine is_last_page and final number_of_pages.
        # For channel-API-driven tabs (sort=4, shorts, streams, UU
        # fallback), YouTube doesn't give us a reliable total filtered
        # count. So instead of displaying a misleading number (the
        # total-including-shorts from get_number_of_videos_channel), we
        # count only what we've actually paged through, and use the ctoken
        # to know whether to show "next".
        if used_channel_api:
            info['is_last_page'] = (info.get('ctoken') is None)
            items_on_page = len(info.get('items', []))
            items_seen_so_far = (page_number - 1)*page_size + items_on_page
            # Use accumulated count as the displayed total so "N videos"
            # shown to the user always matches what they could actually
            # reach.
            number_of_videos = items_seen_so_far
            # If there's more content, bump by 1 so the Next-page button
            # exists
            if info.get('ctoken'):
                number_of_videos = max(number_of_videos,
                                       page_number*page_size + 1)
        # For sort=3 via UU playlist (used_channel_api=False),
        # number_of_videos was already set from playlist metadata above.
        info['number_of_videos'] = number_of_videos
        info['number_of_pages'] = (math.ceil(number_of_videos / page_size)
                                   if number_of_videos else 1)
        # Never show fewer pages than the page the user is actually on
        if info['number_of_pages'] < page_number:
            info['number_of_pages'] = page_number
        info['header_playlist_names'] = local_playlist.get_playlist_names()
    if tab in ('videos', 'shorts', 'streams', 'playlists'):
        info['current_sort'] = sort
    elif tab == 'search':
        info['search_box_value'] = query
        info['header_playlist_names'] = local_playlist.get_playlist_names()
    if tab in ('search', 'playlists'):
        info['page_number'] = page_number
    info['subscribed'] = subscriptions.is_subscribed(info['channel_id'])

    post_process_channel_info(info)

    return flask.render_template('channel.html',
        parameters_dictionary=request.args,
        **info
    )


@yt_app.route('/channel/<channel_id>/')
@yt_app.route('/channel/<channel_id>/<tab>')
def get_channel_page(channel_id, tab='videos'):
    return get_channel_page_general_url(
        'https://www.youtube.com/channel/' + channel_id, tab, request,
        channel_id)


@yt_app.route('/user/<username>/')
@yt_app.route('/user/<username>/<tab>')
def get_user_page(username, tab='videos'):
    return get_channel_page_general_url(
        'https://www.youtube.com/user/' + username, tab, request)


@yt_app.route('/c/<custom>/')
@yt_app.route('/c/<custom>/<tab>')
def get_custom_c_page(custom, tab='videos'):
    return get_channel_page_general_url(
        'https://www.youtube.com/c/' + custom, tab, request)


@yt_app.route('/<custom>')
@yt_app.route('/<custom>/<tab>')
def get_toplevel_custom_page(custom, tab='videos'):
    return get_channel_page_general_url(
        'https://www.youtube.com/' + custom, tab, request)