diff --git a/tests/test_shorts.py b/tests/test_shorts.py index edf7d73..9a49450 100644 --- a/tests/test_shorts.py +++ b/tests/test_shorts.py @@ -58,6 +58,59 @@ class TestChannelCtokenV5: assert t_shorts != t_streams assert t_videos != t_streams + def test_include_shorts_false_adds_filter(self): + """Test that include_shorts=False adds the shorts filter (field 104).""" + # Token with shorts included (default) + t_with_shorts = self.channel_ctoken_v5('UCtest', '1', '3', 'videos', include_shorts=True) + # Token with shorts excluded + t_without_shorts = self.channel_ctoken_v5('UCtest', '1', '3', 'videos', include_shorts=False) + + # The tokens should be different because of the shorts filter + assert t_with_shorts != t_without_shorts + + # Decode and verify the filter is present + raw_with_shorts = base64.urlsafe_b64decode(t_with_shorts + '==') + raw_without_shorts = base64.urlsafe_b64decode(t_without_shorts + '==') + + # Parse the outer protobuf structure + import youtube.proto as proto + outer_fields_with = list(proto.read_protobuf(raw_with_shorts)) + outer_fields_without = list(proto.read_protobuf(raw_without_shorts)) + + # Field 80226972 contains the inner data + inner_with = [v for _, fn, v in outer_fields_with if fn == 80226972][0] + inner_without = [v for _, fn, v in outer_fields_without if fn == 80226972][0] + + # Parse the inner data - field 3 contains percent-encoded base64 data + inner_fields_with = list(proto.read_protobuf(inner_with)) + inner_fields_without = list(proto.read_protobuf(inner_without)) + + # Get field 3 data (the encoded inner which is percent-encoded base64) + encoded_inner_with = [v for _, fn, v in inner_fields_with if fn == 3][0] + encoded_inner_without = [v for _, fn, v in inner_fields_without if fn == 3][0] + + # The inner without shorts should contain field 104 + # Decode the percent-encoded base64 data + import urllib.parse + decoded_with = urllib.parse.unquote(encoded_inner_with.decode('ascii')) + decoded_without = 
urllib.parse.unquote(encoded_inner_without.decode('ascii')) + + # Decode the base64 data + decoded_with_bytes = base64.urlsafe_b64decode(decoded_with + '==') + decoded_without_bytes = base64.urlsafe_b64decode(decoded_without + '==') + + # Parse the decoded protobuf data + fields_with = list(proto.read_protobuf(decoded_with_bytes)) + fields_without = list(proto.read_protobuf(decoded_without_bytes)) + + field_numbers_with = [fn for _, fn, _ in fields_with] + field_numbers_without = [fn for _, fn, _ in fields_without] + + # The 'with' version should NOT have field 104 + assert 104 not in field_numbers_with + # The 'without' version SHOULD have field 104 + assert 104 in field_numbers_without + # --- shortsLockupViewModel parsing --- diff --git a/youtube/channel.py b/youtube/channel.py index a139bc1..17e46f5 100644 --- a/youtube/channel.py +++ b/youtube/channel.py @@ -33,9 +33,9 @@ headers_mobile = ( real_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=8XihrAcN1l4'),) generic_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=ST1Ti53r4fU'),) -# FIXED 2026: YouTube changed continuation token structure (from Invidious commit a9f8127) # Sort values for YouTube API (from Invidious): 2=popular, 4=newest, 5=oldest -def channel_ctoken_v5(channel_id, page, sort, tab, view=1): +# include_shorts only applies to tab='videos'; tab='shorts'/'streams' always include their own content. 
+def channel_ctoken_v5(channel_id, page, sort, tab, view=1, include_shorts=True): # Tab-specific protobuf field numbers (from Invidious source) # Each tab uses different field numbers in the protobuf structure: # videos: 110 -> 3 -> 15 -> { 2:{1:UUID}, 4:sort, 8:{1:UUID, 3:sort} } @@ -74,6 +74,11 @@ def channel_ctoken_v5(channel_id, page, sort, tab, view=1): inner_container = proto.string(3, tab_wrapper) outer_container = proto.string(110, inner_container) + # Add shorts filter when include_shorts=False (field 104, same as playlist.py) + # This tells YouTube to exclude shorts from the results + if not include_shorts: + outer_container += proto.string(104, proto.uint(2, 1)) + encoded_inner = proto.percent_b64encode(outer_container) pointless_nest = proto.string(80226972, @@ -236,12 +241,12 @@ def channel_ctoken_v1(channel_id, page, sort, tab, view=1): def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1, - ctoken=None, print_status=True): + ctoken=None, print_status=True, include_shorts=True): message = 'Got channel tab' if print_status else None if not ctoken: if tab in ('videos', 'shorts', 'streams'): - ctoken = channel_ctoken_v5(channel_id, page, sort, tab, view) + ctoken = channel_ctoken_v5(channel_id, page, sort, tab, view, include_shorts) else: ctoken = channel_ctoken_v3(channel_id, page, sort, tab, view) ctoken = ctoken.replace('=', '%3D') @@ -295,12 +300,23 @@ def get_number_of_videos_channel(channel_id): response = response.decode('utf-8') - # match = re.search(r'"numVideosText":\s*{\s*"runs":\s*\[{"text":\s*"([\d,]*) videos"', response) - match = re.search(r'"numVideosText".*?([,\d]+)', response) - if match: - return int(match.group(1).replace(',','')) - else: - return 0 + # Try several patterns since YouTube's format changes: + # "numVideosText":{"runs":[{"text":"1,234"},{"text":" videos"}]} + # "stats":[..., {"runs":[{"text":"1,234"},{"text":" videos"}]}] + for pattern in ( + r'"numVideosText".*?"text":\s*"([\d,]+)"', + 
r'"numVideosText".*?([\d,]+)\s*videos?', + r'"numVideosText".*?([,\d]+)', + r'([\d,]+)\s*videos?\s*', + ): + match = re.search(pattern, response) + if match: + try: + return int(match.group(1).replace(',', '')) + except ValueError: + continue + # Fallback: unknown count + return 0 def set_cached_number_of_videos(channel_id, num_videos): @cachetools.cached(number_of_videos_cache) def dummy_func_using_same_cache(channel_id): @@ -425,24 +441,27 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None): page_number = int(request.args.get('page', 1)) # sort 1: views # sort 2: oldest - # sort 4: newest - no shorts (Just a kludge on our end, not internal to yt) + # sort 3: newest (includes shorts, via UU uploads playlist) + # sort 4: newest - no shorts (uses channel Videos tab API directly, like Invidious) default_sort = '3' if settings.include_shorts_in_channel else '4' sort = request.args.get('sort', default_sort) view = request.args.get('view', '1') query = request.args.get('query', '') ctoken = request.args.get('ctoken', '') - include_shorts = (sort != '4') default_params = (page_number == 1 and sort in ('3', '4') and view == '1') - continuation = bool(ctoken) # whether or not we're using a continuation + continuation = bool(ctoken) page_size = 30 - try_channel_api = True polymer_json = None + number_of_videos = 0 + info = None - # Use the special UU playlist which contains all the channel's uploads - if tab == 'videos' and sort in ('3', '4'): + # ------------------------------------------------------------------------- + # sort=3: use UU uploads playlist (includes shorts) + # ------------------------------------------------------------------------- + if tab == 'videos' and sort == '3': if not channel_id: channel_id = get_channel_id(base_url) - if page_number == 1 and include_shorts: + if page_number == 1: tasks = ( gevent.spawn(playlist.playlist_first_page, 'UU' + channel_id[2:], @@ -451,9 +470,6 @@ def get_channel_page_general_url(base_url, tab, 
request, channel_id=None): ) gevent.joinall(tasks) util.check_gevent_exceptions(*tasks) - - # Ignore the metadata for now, it is cached and will be - # recalled later pl_json = tasks[0].value pl_info = yt_data_extract.extract_playlist_info(pl_json) number_of_videos = pl_info['metadata']['video_count'] @@ -464,86 +480,70 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None): else: tasks = ( gevent.spawn(playlist.get_videos, 'UU' + channel_id[2:], - page_number, include_shorts=include_shorts), + page_number, include_shorts=True), gevent.spawn(get_metadata, channel_id), gevent.spawn(get_number_of_videos_channel, channel_id), + gevent.spawn(playlist.playlist_first_page, 'UU' + channel_id[2:], + report_text='Retrieved channel video count'), ) gevent.joinall(tasks) util.check_gevent_exceptions(*tasks) - pl_json = tasks[0].value pl_info = yt_data_extract.extract_playlist_info(pl_json) - number_of_videos = tasks[2].value + first_page_meta = yt_data_extract.extract_playlist_metadata(tasks[3].value) + number_of_videos = (tasks[2].value + or first_page_meta.get('video_count') + or 0) - info = pl_info - info['channel_id'] = channel_id - info['current_tab'] = 'videos' - if info['items']: # Success + if pl_info['items']: + info = pl_info + info['channel_id'] = channel_id + info['current_tab'] = 'videos' page_size = 100 - try_channel_api = False - else: # Try the first-page method next - try_channel_api = True + # else fall through to the channel browse API below - # Use the regular channel API - if tab in ('shorts', 'streams') or (tab=='videos' and try_channel_api): + # ------------------------------------------------------------------------- + # Channel browse API: sort=4 (videos tab, no shorts), shorts, streams, + # or fallback when the UU playlist returned no items. + # Uses channel_ctoken_v5 per-tab tokens, mirroring Invidious's approach. + # Pagination is driven by the continuation token YouTube returns each page. 
+    # -------------------------------------------------------------------------
+    used_channel_api = False
+    # Every 'videos' request still lacking info is served here: sort=4, the
+    # sort=3 UU-playlist fallback, and sort=1/2 (views/oldest). Listing only
+    # sorts 3 and 4 left 1/2 matching no branch of this if/elif chain, so
+    # they fell through to the "Could not retrieve channel data" error.
+    if info is None and tab in ('videos', 'shorts', 'streams'):
         if not channel_id:
             channel_id = get_channel_id(base_url)
+        used_channel_api = True
 
-        # For shorts/streams, use continuation token from cache or request
-        if tab in ('shorts', 'streams'):
-            if ctoken:
-                # Use ctoken directly from request (passed via pagination)
-                polymer_json = util.call_youtube_api('web', 'browse', {
-                    'continuation': ctoken,
-                })
-                continuation = True
-            elif page_number > 1:
-                # For page 2+, get ctoken from cache
-                cache_key = (channel_id, tab, sort, page_number - 1)
-                cached_ctoken = continuation_token_cache.get(cache_key)
-                if cached_ctoken:
-                    polymer_json = util.call_youtube_api('web', 'browse', {
-                        'continuation': cached_ctoken,
-                    })
-                    continuation = True
-                else:
-                    # Fallback: generate fresh ctoken
-                    page_call = (get_channel_tab, channel_id, str(page_number), sort, tab, int(view))
-                    continuation = True
-                    polymer_json = gevent.spawn(*page_call)
-                    polymer_json.join()
-                    if polymer_json.exception:
-                        raise polymer_json.exception
-                    polymer_json = polymer_json.value
+        # Determine what browse call to make
+        if ctoken:
+            browse_call = (util.call_youtube_api, 'web', 'browse',
+                           {'continuation': ctoken})
+            continuation = True
+        elif page_number > 1:
+            cache_key = (channel_id, tab, sort, page_number - 1)
+            cached_ctoken = continuation_token_cache.get(cache_key)
+            if cached_ctoken:
+                browse_call = (util.call_youtube_api, 'web', 'browse',
+                               {'continuation': cached_ctoken})
             else:
-                # Page 1: generate fresh ctoken
-                page_call = (get_channel_tab, channel_id, str(page_number), sort, tab, int(view))
-                continuation = True
-                polymer_json = gevent.spawn(*page_call)
-                polymer_json.join()
-                if polymer_json.exception:
-                    raise polymer_json.exception
-                polymer_json = polymer_json.value
+                # Cache miss — restart from page 1 (better than an error)
+                browse_call = (get_channel_tab, channel_id, '1', sort, tab, int(view))
+            continuation = True
         else:
-            # videos tab - original logic
-            page_call = (get_channel_tab, channel_id, str(page_number), sort,
-                         tab, int(view))
+            browse_call = (get_channel_tab, channel_id, '1', sort, tab, int(view))
             continuation = True
 
-        if tab == 'videos':
-            # Only need video count for the videos tab
-            if channel_id:
-                num_videos_call = (get_number_of_videos_channel, channel_id)
-            else:
-                num_videos_call = (get_number_of_videos_general, base_url)
-            tasks = (
-                gevent.spawn(*num_videos_call),
-                gevent.spawn(*page_call),
-            )
-            gevent.joinall(tasks)
-            util.check_gevent_exceptions(*tasks)
-            number_of_videos, polymer_json = tasks[0].value, tasks[1].value
-        # For shorts/streams, polymer_json is already set above, nothing to do here
+        # Single browse call; number_of_videos is computed from items actually
+        # fetched so we don't mislead the user with a total that includes
+        # shorts (which this branch is explicitly excluding for sort=4).
+ task = gevent.spawn(*browse_call) + task.join() + util.check_gevent_exceptions(task) + polymer_json = task.value elif tab == 'about': # polymer_json = util.fetch_url(base_url + '/about?pbj=1', headers_desktop, debug_name='gen_channel_about') @@ -571,16 +571,16 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None): elif tab == 'search': url = base_url + '/search?pbj=1&query=' + urllib.parse.quote(query, safe='') polymer_json = util.fetch_url(url, headers_desktop, debug_name='gen_channel_search') - elif tab == 'videos': - pass - else: + elif tab != 'videos': flask.abort(404, 'Unknown channel tab: ' + tab) - if polymer_json is not None: + if polymer_json is not None and info is None: info = yt_data_extract.extract_channel_info( json.loads(polymer_json), tab, continuation=continuation ) + if info is None: + return flask.render_template('error.html', error_message='Could not retrieve channel data') if info['error'] is not None: return flask.render_template('error.html', error_message=info['error']) @@ -610,16 +610,40 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None): item.update(additional_info) if tab in ('videos', 'shorts', 'streams'): - if tab in ('shorts', 'streams'): - # For shorts/streams, use ctoken to determine pagination + # For any tab using the channel browse API (sort=4, shorts, streams), + # pagination is driven by the ctoken YouTube returns in the response. + # Cache it so the next page request can use it. + if info.get('ctoken'): + cache_key = (channel_id, tab, sort, page_number) + continuation_token_cache[cache_key] = info['ctoken'] + + # Determine is_last_page and final number_of_pages. + # For channel-API-driven tabs (sort=4, shorts, streams, UU fallback), + # YouTube doesn't give us a reliable total filtered count. 
So instead + # of displaying a misleading number (the total-including-shorts from + # get_number_of_videos_channel), we count only what we've actually + # paged through, and use the ctoken to know whether to show "next". + if used_channel_api: info['is_last_page'] = (info.get('ctoken') is None) - number_of_videos = len(info.get('items', [])) - # Cache the ctoken for next page + items_on_page = len(info.get('items', [])) + items_seen_so_far = (page_number - 1) * page_size + items_on_page + + # Use accumulated count as the displayed total so "N videos" shown + # to the user always matches what they could actually reach. + number_of_videos = items_seen_so_far + + # If there's more content, bump by 1 so the Next-page button exists if info.get('ctoken'): - cache_key = (channel_id, tab, sort, page_number) - continuation_token_cache[cache_key] = info['ctoken'] + number_of_videos = max(number_of_videos, + page_number * page_size + 1) + # For sort=3 via UU playlist (used_channel_api=False), number_of_videos + # was already set from playlist metadata above. + info['number_of_videos'] = number_of_videos - info['number_of_pages'] = math.ceil(number_of_videos/page_size) if number_of_videos else 1 + info['number_of_pages'] = math.ceil(number_of_videos / page_size) if number_of_videos else 1 + # Never show fewer pages than the page the user is actually on + if info['number_of_pages'] < page_number: + info['number_of_pages'] = page_number info['header_playlist_names'] = local_playlist.get_playlist_names() if tab in ('videos', 'shorts', 'streams', 'playlists'): info['current_sort'] = sort