Update channel to new ctoken format

Huge thanks to @michaelweiser

Different sortings still don't work for videos and playlists
This commit is contained in:
Jesus E 2023-05-28 21:04:36 -04:00
parent 7b60751e99
commit 68752000f0
No known key found for this signature in database
GPG Key ID: 159C8F8BC9AED8B6
4 changed files with 70 additions and 17 deletions

View File

@ -31,6 +31,47 @@ headers_mobile = (
real_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=8XihrAcN1l4'),) real_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=8XihrAcN1l4'),)
generic_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=ST1Ti53r4fU'),) generic_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=ST1Ti53r4fU'),)
# https://github.com/user234683/youtube-local/issues/151
def channel_ctoken_v4(channel_id, page, sort, tab, view=1):
new_sort = (2 if sort == 1 else 1)
offset = str(30*(int(page) - 1))
pointless_nest = proto.string(80226972,
proto.string(2, channel_id)
+ proto.string(3,
proto.percent_b64encode(
proto.string(110,
proto.string(3,
proto.string(15,
proto.string(1,
proto.string(1,
proto.unpadded_b64encode(
proto.string(1,
proto.unpadded_b64encode(
proto.string(2,
b"ST:"
+ proto.unpadded_b64encode(
proto.string(2, offset)
)
)
)
)
)
)
# targetId, just needs to be present but
# doesn't need to be correct
+ proto.string(2, "63faaff0-0000-23fe-80f0-582429d11c38")
)
# 1 - newest, 2 - popular
+ proto.uint(3, new_sort)
)
)
)
)
)
)
return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
# SORT: # SORT:
# videos: # videos:
# Popular - 1 # Popular - 1
@ -75,11 +116,11 @@ def channel_ctoken_v2(channel_id, page, sort, tab, view=1):
2: 17254859483345278706, 2: 17254859483345278706,
1: 16570086088270825023, 1: 16570086088270825023,
}[int(sort)] }[int(sort)]
page_token = proto.string(61, proto.unpadded_b64encode( page_token = proto.string(61, proto.unpadded_b64encode(proto.string(1,
proto.string(1, proto.uint(1, schema_number) + proto.string( proto.uint(1, schema_number) + proto.string(2,
2,
proto.string(1, proto.unpadded_b64encode(proto.uint(1,offset))) proto.string(1, proto.unpadded_b64encode(proto.uint(1,offset)))
)))) )
)))
tab = proto.string(2, tab) tab = proto.string(2, tab)
sort = proto.uint(3, int(sort)) sort = proto.uint(3, int(sort))
@ -118,8 +159,9 @@ def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1,
message = 'Got channel tab' if print_status else None message = 'Got channel tab' if print_status else None
if not ctoken: if not ctoken:
ctoken = channel_ctoken_v3(channel_id, page, sort, tab, view) ctoken = channel_ctoken_v4(channel_id, page, sort, tab, view)
ctoken = ctoken.replace('=', '%3D') ctoken = ctoken.replace('=', '%3D')
# Not sure what the purpose of the key is or whether it will change # Not sure what the purpose of the key is or whether it will change
# For now it seems to be constant for the API endpoint, not dependent # For now it seems to be constant for the API endpoint, not dependent
# on the browsing session or channel # on the browsing session or channel
@ -182,8 +224,7 @@ def get_channel_id(base_url):
# method that gives the smallest possible response at ~4 kb # method that gives the smallest possible response at ~4 kb
# needs to be as fast as possible # needs to be as fast as possible
base_url = base_url.replace('https://www', 'https://m') # avoid redirect base_url = base_url.replace('https://www', 'https://m') # avoid redirect
response = util.fetch_url( response = util.fetch_url(base_url + '/about?pbj=1', headers_mobile,
base_url + '/about?pbj=1', headers_mobile,
debug_name='get_channel_id', report_text='Got channel id').decode('utf-8') debug_name='get_channel_id', report_text='Got channel id').decode('utf-8')
match = channel_id_re.search(response) match = channel_id_re.search(response)
if match: if match:
@ -260,6 +301,7 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
query = request.args.get('query', '') query = request.args.get('query', '')
ctoken = request.args.get('ctoken', '') ctoken = request.args.get('ctoken', '')
default_params = (page_number == 1 and sort == '3' and view == '1') default_params = (page_number == 1 and sort == '3' and view == '1')
continuation = bool(ctoken) # whether or not we're using a continuation
if tab == 'videos' and channel_id and not default_params: if tab == 'videos' and channel_id and not default_params:
tasks = ( tasks = (
@ -270,6 +312,7 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
gevent.joinall(tasks) gevent.joinall(tasks)
util.check_gevent_exceptions(*tasks) util.check_gevent_exceptions(*tasks)
number_of_videos, polymer_json = tasks[0].value, tasks[1].value number_of_videos, polymer_json = tasks[0].value, tasks[1].value
continuation = True
elif tab == 'videos': elif tab == 'videos':
if channel_id: if channel_id:
num_videos_call = (get_number_of_videos_channel, channel_id) num_videos_call = (get_number_of_videos_channel, channel_id)
@ -289,6 +332,7 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
elif tab == 'playlists': elif tab == 'playlists':
polymer_json = get_channel_tab(channel_id, page_number, sort, polymer_json = get_channel_tab(channel_id, page_number, sort,
'playlists', view) 'playlists', view)
continuation = True
elif tab == 'search' and channel_id: elif tab == 'search' and channel_id:
polymer_json = get_channel_search_json(channel_id, query, page_number) polymer_json = get_channel_search_json(channel_id, query, page_number)
elif tab == 'search': elif tab == 'search':
@ -297,7 +341,9 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
else: else:
flask.abort(404, 'Unknown channel tab: ' + tab) flask.abort(404, 'Unknown channel tab: ' + tab)
info = yt_data_extract.extract_channel_info(json.loads(polymer_json), tab)
info = yt_data_extract.extract_channel_info(json.loads(polymer_json), tab,
continuation=continuation)
if info['error'] is not None: if info['error'] is not None:
return flask.render_template('error.html', error_message=info['error']) return flask.render_template('error.html', error_message=info['error'])
@ -308,6 +354,8 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
info['header_playlist_names'] = local_playlist.get_playlist_names() info['header_playlist_names'] = local_playlist.get_playlist_names()
if tab in ('videos', 'playlists'): if tab in ('videos', 'playlists'):
info['current_sort'] = sort info['current_sort'] = sort
info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id
info['channel_id'] = channel_id
elif tab == 'search': elif tab == 'search':
info['search_box_value'] = query info['search_box_value'] = query
info['header_playlist_names'] = local_playlist.get_playlist_names() info['header_playlist_names'] = local_playlist.get_playlist_names()

View File

@ -1,7 +1,7 @@
{% if current_tab == 'search' %} {% if current_tab == 'search' %}
{% set page_title = search_box_value + ' - Page ' + page_number|string %} {% set page_title = search_box_value + ' - Page ' + page_number|string %}
{% else %} {% else %}
{% set page_title = channel_name + ' - Channel' %} {% set page_title = channel_name|string + ' - Channel' %}
{% endif %} {% endif %}
{% extends "base.html" %} {% extends "base.html" %}

View File

@ -542,8 +542,12 @@ def extract_items(response, item_types=_item_types,
item_types=item_types) item_types=item_types)
if items: if items:
break break
elif 'onResponseReceivedEndpoints' in response: elif ('onResponseReceivedEndpoints' in response
for endpoint in response.get('onResponseReceivedEndpoints', []): or 'onResponseReceivedActions' in response):
for endpoint in multi_get(response,
'onResponseReceivedEndpoints',
'onResponseReceivedActions',
[]):
items, ctoken = extract_items_from_renderer_list( items, ctoken = extract_items_from_renderer_list(
multi_deep_get( multi_deep_get(
endpoint, endpoint,

View File

@ -9,7 +9,7 @@ import re
import urllib import urllib
from math import ceil from math import ceil
def extract_channel_info(polymer_json, tab): def extract_channel_info(polymer_json, tab, continuation=False):
response, err = extract_response(polymer_json) response, err = extract_response(polymer_json)
if err: if err:
return {'error': err} return {'error': err}
@ -23,7 +23,8 @@ def extract_channel_info(polymer_json, tab):
# channel doesn't exist or was terminated # channel doesn't exist or was terminated
# example terminated channel: https://www.youtube.com/channel/UCnKJeK_r90jDdIuzHXC0Org # example terminated channel: https://www.youtube.com/channel/UCnKJeK_r90jDdIuzHXC0Org
if not metadata: # metadata and microformat are not present for continuation requests
if not metadata and not continuation:
if response.get('alerts'): if response.get('alerts'):
error_string = ' '.join( error_string = ' '.join(
extract_str(deep_get(alert, 'alertRenderer', 'text'), default='') extract_str(deep_get(alert, 'alertRenderer', 'text'), default='')
@ -44,7 +45,7 @@ def extract_channel_info(polymer_json, tab):
info['approx_subscriber_count'] = extract_approx_int(deep_get(response, info['approx_subscriber_count'] = extract_approx_int(deep_get(response,
'header', 'c4TabbedHeaderRenderer', 'subscriberCountText')) 'header', 'c4TabbedHeaderRenderer', 'subscriberCountText'))
# stuff from microformat (info given by youtube for every page on channel) # stuff from microformat (info given by youtube for first page on channel)
info['short_description'] = metadata.get('description') info['short_description'] = metadata.get('description')
if info['short_description'] and len(info['short_description']) > 730: if info['short_description'] and len(info['short_description']) > 730:
info['short_description'] = info['short_description'][0:730] + '...' info['short_description'] = info['short_description'][0:730] + '...'
@ -69,8 +70,8 @@ def extract_channel_info(polymer_json, tab):
info['ctoken'] = None info['ctoken'] = None
# empty channel # empty channel
if 'contents' not in response and 'continuationContents' not in response: #if 'contents' not in response and 'continuationContents' not in response:
return info # return info
if tab in ('videos', 'playlists', 'search'): if tab in ('videos', 'playlists', 'search'):
items, ctoken = extract_items(response) items, ctoken = extract_items(response)