+''')
+
+def renderer_html(renderer, additional_info={}, current_query_string=''):
+ type = list(renderer.keys())[0]
+ renderer = renderer[type]
+ if type in ('videoRenderer', 'playlistRenderer', 'radioRenderer', 'compactVideoRenderer', 'compactPlaylistRenderer', 'compactRadioRenderer', 'gridVideoRenderer', 'gridPlaylistRenderer', 'gridRadioRenderer'):
+ info = renderer_info(renderer)
+ info.update(additional_info)
+ if type == 'compactVideoRenderer':
+ return video_item_html(info, small_video_item_template)
+ if type in ('compactPlaylistRenderer', 'compactRadioRenderer'):
+ return playlist_item_html(info, small_playlist_item_template)
+ if type in ('videoRenderer', 'gridVideoRenderer'):
+ return video_item_html(info, medium_video_item_template)
+ if type in ('playlistRenderer', 'gridPlaylistRenderer', 'radioRenderer', 'gridRadioRenderer'):
+ return playlist_item_html(info, medium_playlist_item_template)
+
+ if type == 'channelRenderer':
+ info = renderer_info(renderer)
+ html_ready = get_html_ready(info)
+ html_ready['url'] = URL_ORIGIN + "/channel/" + html_ready['id']
+ return medium_channel_item_template.substitute(html_ready)
+
+ if type == 'movieRenderer':
+ return ''
+ print(renderer)
+ raise NotImplementedError('Unknown renderer type: ' + type)
+
+
+# Renderer types encountered in result listings:
+# 'videoRenderer'
+# 'playlistRenderer'
+# 'channelRenderer'
+# 'radioRenderer'
+# 'gridVideoRenderer'
+# 'gridPlaylistRenderer'
+#
+# 'didYouMeanRenderer'
+# 'showingResultsForRenderer'
diff --git a/youtube/opensearch.xml b/youtube/opensearch.xml
index 1764138..c9de40c 100644
--- a/youtube/opensearch.xml
+++ b/youtube/opensearch.xml
@@ -1,11 +1,11 @@
-<OpenSearchDescription xmlns="http://a9.com/-/spec/opensearch/1.1/" xmlns:moz="http://www.mozilla.org/2006/browser/search/">
-<ShortName>Youtube local</ShortName>
-<Description>no CIA shit in the background</Description>
-<InputEncoding>UTF-8</InputEncoding>
-<Image height="16" width="16" type="image/x-icon">data:image/x-icon;base64,R0lGODlhEAAQAJECAP8AAAAAAP///wAAACH5BAEAAAIALAAAAAAQABAAAAIplI+py+0NogQuyBDEnEd2kHkfFWUamEzmpZSfmaIHPHrRguUm/fT+UwAAOw==</Image>
-<Url type="text/html" method="GET" template="http://localhost/youtube.com/search?query={searchTerms}"/>
-<moz:SearchForm>http://localhost/youtube.com/search</moz:SearchForm>
-</OpenSearchDescription>
+<OpenSearchDescription xmlns="http://a9.com/-/spec/opensearch/1.1/" xmlns:moz="http://www.mozilla.org/2006/browser/search/">
+<ShortName>Youtube local</ShortName>
+<Description>no CIA shit in the background</Description>
+<InputEncoding>UTF-8</InputEncoding>
+<Image height="16" width="16" type="image/x-icon">data:image/x-icon;base64,R0lGODlhEAAQAJECAP8AAAAAAP///wAAACH5BAEAAAIALAAAAAAQABAAAAIplI+py+0NogQuyBDEnEd2kHkfFWUamEzmpZSfmaIHPHrRguUm/fT+UwAAOw==</Image>
+<Url type="text/html" method="GET" template="http://localhost/youtube.com/search?query={searchTerms}"/>
+<moz:SearchForm>http://localhost/youtube.com/search</moz:SearchForm>
+</OpenSearchDescription>
\ No newline at end of file
diff --git a/youtube/playlist.py b/youtube/playlist.py
index fc09191..592d1b4 100644
--- a/youtube/playlist.py
+++ b/youtube/playlist.py
@@ -1,243 +1,243 @@
-import base64
-import youtube.common as common
-import urllib
-import json
-from string import Template
-import youtube.proto as proto
-import gevent
-import math
-
-with open("yt_playlist_template.html", "r") as file:
- yt_playlist_template = Template(file.read())
-
-
-
-
-
-
-def youtube_obfuscated_endian(offset):
- if offset < 128:
- return bytes((offset,))
- first_byte = 255 & offset
- second_byte = 255 & (offset >> 7)
- second_byte = second_byte | 1
-
- # The next 2 bytes encode the offset in little endian order,
- # BUT, it's done in a strange way. The least significant bit (LSB) of the second byte is not part
- # of the offset. Instead, to get the number which the two bytes encode, that LSB
- # of the second byte is combined with the most significant bit (MSB) of the first byte
- # in a logical AND. Replace the two bits with the result of the AND to get the two little endian
- # bytes that represent the offset.
-
- return bytes((first_byte, second_byte))
-
-
-
-# just some garbage that's required, don't know what it means, if it means anything.
-ctoken_header = b'\xe2\xa9\x85\xb2\x02' # e2 a9 85 b2 02
-
-def byte(x):
- return bytes((x,))
-
-# TL;DR: the offset is hidden inside 3 nested base 64 encodes with random junk data added on the side periodically
-def create_ctoken(playlist_id, offset):
- obfuscated_offset = b'\x08' + youtube_obfuscated_endian(offset) # 0x08 slapped on for no apparent reason
- obfuscated_offset = b'PT:' + base64.urlsafe_b64encode(obfuscated_offset).replace(b'=', b'')
- obfuscated_offset = b'z' + byte(len(obfuscated_offset)) + obfuscated_offset
- obfuscated_offset = base64.urlsafe_b64encode(obfuscated_offset).replace(b'=', b'%3D')
-
- playlist_bytes = b'VL' + bytes(playlist_id, 'ascii')
- main_info = b'\x12' + byte(len(playlist_bytes)) + playlist_bytes + b'\x1a' + byte(len(obfuscated_offset)) + obfuscated_offset
-
- ctoken = base64.urlsafe_b64encode(ctoken_header + byte(len(main_info)) + main_info)
-
- return ctoken.decode('ascii')
-
-def playlist_ctoken(playlist_id, offset):
-
- offset = proto.uint(1, offset)
- # this is just obfuscation as far as I can tell. It doesn't even follow protobuf
- offset = b'PT:' + proto.unpadded_b64encode(offset)
- offset = proto.string(15, offset)
-
- continuation_info = proto.string( 3, proto.percent_b64encode(offset) )
-
- playlist_id = proto.string(2, 'VL' + playlist_id )
- pointless_nest = proto.string(80226972, playlist_id + continuation_info)
-
- return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
-
-# initial request types:
-# polymer_json: https://m.youtube.com/playlist?list=PLv3TTBr1W_9tppikBxAE_G6qjWdBljBHJ&pbj=1&lact=0
-# ajax json: https://m.youtube.com/playlist?list=PLv3TTBr1W_9tppikBxAE_G6qjWdBljBHJ&pbj=1&lact=0 with header X-YouTube-Client-Version: 1.20180418
-
-
-# continuation request types:
-# polymer_json: https://m.youtube.com/playlist?&ctoken=[...]&pbj=1
-# ajax json: https://m.youtube.com/playlist?action_continuation=1&ajax=1&ctoken=[...]
-
-
-headers_1 = (
- ('Accept', '*/*'),
- ('Accept-Language', 'en-US,en;q=0.5'),
- ('X-YouTube-Client-Name', '1'),
- ('X-YouTube-Client-Version', '2.20180614'),
-)
-
-def playlist_first_page(playlist_id):
- url = 'https://m.youtube.com/playlist?list=' + playlist_id + '&ajax=1&disable_polymer=true'
- content = common.fetch_url(url, common.mobile_ua + headers_1)
- if content[0:4] == b")]}'":
- content = content[4:]
- content = json.loads(common.uppercase_escape(content.decode('utf-8')))
- return content
-
-ajax_info_dispatch = {
- 'view_count_text': ('views', common.get_text),
- 'num_videos_text': ('size', lambda node: common.get_text(node).split(' ')[0]),
- 'thumbnail': ('thumbnail', lambda node: node.url),
- 'title': ('title', common.get_text),
- 'owner_text': ('author', common.get_text),
- 'owner_endpoint': ('author_url', lambda node: node.url),
- 'description': ('description', common.get_formatted_text),
-
-}
-def metadata_info(ajax_json):
- info = {}
- try:
- for key, node in ajax_json.items():
- try:
- simple_key, function = dispatch[key]
- except KeyError:
- continue
- info[simple_key] = function(node)
- return info
- except (KeyError,IndexError):
- print(ajax_json)
- raise
-
-
-
-
-#https://m.youtube.com/playlist?itct=CBMQybcCIhMIptj9xJaJ2wIV2JKcCh3Idwu-&ctoken=4qmFsgI2EiRWTFBMT3kwajlBdmxWWlB0bzZJa2pLZnB1MFNjeC0tN1BHVEMaDmVnWlFWRHBEUWxFJTNE&pbj=1
-def get_videos_ajax(playlist_id, page):
-
- url = "https://m.youtube.com/playlist?action_continuation=1&ajax=1&ctoken=" + playlist_ctoken(playlist_id, (int(page)-1)*20)
- headers = {
- 'User-Agent': ' Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1',
- 'Accept': '*/*',
- 'Accept-Language': 'en-US,en;q=0.5',
- 'X-YouTube-Client-Name': '2',
- 'X-YouTube-Client-Version': '1.20180508',
- }
- print("Sending playlist ajax request")
- content = common.fetch_url(url, headers)
- with open('playlist_debug', 'wb') as f:
- f.write(content)
- content = content[4:]
- print("Finished recieving playlist response")
-
- info = json.loads(common.uppercase_escape(content.decode('utf-8')))
- return info
-
-def get_playlist_videos(ajax_json):
- videos = []
- #info = get_bloated_playlist_videos(playlist_id, page)
- #print(info)
- video_list = ajax_json['content']['continuation_contents']['contents']
-
-
- for video_json_crap in video_list:
- try:
- videos.append({
- "title": video_json_crap["title"]['runs'][0]['text'],
- "id": video_json_crap["video_id"],
- "views": "",
- "duration": common.default_multi_get(video_json_crap, 'length', 'runs', 0, 'text', default=''), # livestreams dont have a length
- "author": video_json_crap['short_byline']['runs'][0]['text'],
- "author_url": '',
- "published": '',
- 'playlist_index': '',
-
- })
- except (KeyError, IndexError):
- print(video_json_crap)
- raise
- return videos
-
-def get_playlist_videos_format2(playlist_id, page):
- videos = []
- info = get_bloated_playlist_videos(playlist_id, page)
- video_list = info['response']['continuationContents']['playlistVideoListContinuation']['contents']
-
- for video_json_crap in video_list:
-
- video_json_crap = video_json_crap['videoRenderer']
-
- try:
- videos.append({
- "title": video_json_crap["title"]['runs'][0]['text'],
- "video_id": video_json_crap["videoId"],
- "views": "",
- "duration": common.default_multi_get(video_json_crap, 'lengthText', 'runs', 0, 'text', default=''), # livestreams dont have a length
- "uploader": video_json_crap['shortBylineText']['runs'][0]['text'],
- "uploader_url": common.ORIGIN_URL + video_json_crap['shortBylineText']['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'],
- "published": common.default_multi_get(video_json_crap, 'publishedTimeText', 'simpleText', default=''),
- 'playlist_index': video_json_crap['index']['runs'][0]['text'],
-
- })
- except (KeyError, IndexError):
- print(video_json_crap)
- raise
- return videos
-
-
-def playlist_videos_html(ajax_json):
- result = ''
- for info in get_playlist_videos(ajax_json):
- result += common.small_video_item_html(info)
- return result
-
-playlist_stat_template = Template('''
-$stat''')
-def get_playlist_page(query_string):
- parameters = urllib.parse.parse_qs(query_string)
- playlist_id = parameters['list'][0]
- page = parameters.get("page", "1")[0]
- if page == "1":
- first_page_json = playlist_first_page(playlist_id)
- this_page_json = first_page_json
- else:
- tasks = (
- gevent.spawn(playlist_first_page, playlist_id ),
- gevent.spawn(get_videos_ajax, playlist_id, page)
- )
- gevent.joinall(tasks)
- first_page_json, this_page_json = tasks[0].value, tasks[1].value
-
- try:
- video_list = this_page_json['content']['section_list']['contents'][0]['contents'][0]['contents']
- except KeyError:
- video_list = this_page_json['content']['continuation_contents']['contents']
- videos_html = ''
- for video_json in video_list:
- info = common.ajax_info(video_json)
- videos_html += common.video_item_html(info, common.small_video_item_template)
-
-
- metadata = common.ajax_info(first_page_json['content']['playlist_header'])
- video_count = int(metadata['size'].replace(',', ''))
- page_buttons = common.page_buttons_html(int(page), math.ceil(video_count/20), common.URL_ORIGIN + "/playlist", query_string)
-
- html_ready = common.get_html_ready(metadata)
- html_ready['page_title'] = html_ready['title'] + ' - Page ' + str(page)
-
- stats = ''
- stats += playlist_stat_template.substitute(stat=html_ready['size'] + ' videos')
- stats += playlist_stat_template.substitute(stat=html_ready['views'])
- return yt_playlist_template.substitute(
- videos = videos_html,
- page_buttons = page_buttons,
- stats = stats,
- **html_ready
+import base64
+import youtube.common as common
+import urllib.parse
+import json
+from string import Template
+import youtube.proto as proto
+import gevent
+import math
+
+with open("yt_playlist_template.html", "r") as file:
+ yt_playlist_template = Template(file.read())
+
+
+
+
+
+
+def youtube_obfuscated_endian(offset):
+ if offset < 128:
+ return bytes((offset,))
+ first_byte = 255 & offset
+ second_byte = 255 & (offset >> 7)
+ second_byte = second_byte | 1
+
+ # The next 2 bytes encode the offset in little endian order,
+ # BUT, it's done in a strange way. The least significant bit (LSB) of the second byte is not part
+ # of the offset. Instead, to get the number which the two bytes encode, that LSB
+ # of the second byte is combined with the most significant bit (MSB) of the first byte
+ # in a logical AND. Replace the two bits with the result of the AND to get the two little endian
+ # bytes that represent the offset.
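+
+ # For example, offset 200 gives first_byte = 0xc8 and second_byte = 0x01,
+ # so the function returns b'\xc8\x01'.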
+
+ return bytes((first_byte, second_byte))
+
+
+
+# Required header at the start of the ctoken. It decodes as the protobuf tag
+# for field 80226972 with wire type 2, i.e. the same field that
+# playlist_ctoken below supplies via proto.string.
+ctoken_header = b'\xe2\xa9\x85\xb2\x02' # e2 a9 85 b2 02
+
+def byte(x):
+ return bytes((x,))
+
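+# Note: create_ctoken below builds the token by hand; playlist_ctoken further
+# down does the same job with the proto helpers, and is the one that
+# get_videos_ajax actually calls.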
+# TL;DR: the offset is hidden inside 3 nested base 64 encodes with random junk data added on the side periodically
+def create_ctoken(playlist_id, offset):
+ obfuscated_offset = b'\x08' + youtube_obfuscated_endian(offset) # 0x08 slapped on for no apparent reason
+ obfuscated_offset = b'PT:' + base64.urlsafe_b64encode(obfuscated_offset).replace(b'=', b'')
+ obfuscated_offset = b'z' + byte(len(obfuscated_offset)) + obfuscated_offset
+ obfuscated_offset = base64.urlsafe_b64encode(obfuscated_offset).replace(b'=', b'%3D')
+
+ playlist_bytes = b'VL' + bytes(playlist_id, 'ascii')
+ main_info = b'\x12' + byte(len(playlist_bytes)) + playlist_bytes + b'\x1a' + byte(len(obfuscated_offset)) + obfuscated_offset
+
+ ctoken = base64.urlsafe_b64encode(ctoken_header + byte(len(main_info)) + main_info)
+
+ return ctoken.decode('ascii')
+
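+# For example, an offset of 40 produces the token for page 3, since
+# get_videos_ajax below computes the offset as (page - 1)*20.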
+def playlist_ctoken(playlist_id, offset):
+
+ offset = proto.uint(1, offset)
+ # this is just obfuscation as far as I can tell. It doesn't even follow protobuf
+ offset = b'PT:' + proto.unpadded_b64encode(offset)
+ offset = proto.string(15, offset)
+
+ continuation_info = proto.string( 3, proto.percent_b64encode(offset) )
+
+ playlist_id = proto.string(2, 'VL' + playlist_id )
+ pointless_nest = proto.string(80226972, playlist_id + continuation_info)
+
+ return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
+
+# initial request types:
+# polymer_json: https://m.youtube.com/playlist?list=PLv3TTBr1W_9tppikBxAE_G6qjWdBljBHJ&pbj=1&lact=0
+# ajax json: https://m.youtube.com/playlist?list=PLv3TTBr1W_9tppikBxAE_G6qjWdBljBHJ&pbj=1&lact=0 with header X-YouTube-Client-Version: 1.20180418
+
+
+# continuation request types:
+# polymer_json: https://m.youtube.com/playlist?&ctoken=[...]&pbj=1
+# ajax json: https://m.youtube.com/playlist?action_continuation=1&ajax=1&ctoken=[...]
+
+
+headers_1 = (
+ ('Accept', '*/*'),
+ ('Accept-Language', 'en-US,en;q=0.5'),
+ ('X-YouTube-Client-Name', '1'),
+ ('X-YouTube-Client-Version', '2.20180614'),
+)
+
+def playlist_first_page(playlist_id):
+ url = 'https://m.youtube.com/playlist?list=' + playlist_id + '&ajax=1&disable_polymer=true'
+ content = common.fetch_url(url, common.mobile_ua + headers_1)
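+ # Responses may begin with the anti-JSON-hijacking prefix )]}', which must be stripped.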
+ if content[0:4] == b")]}'":
+ content = content[4:]
+ content = json.loads(common.uppercase_escape(content.decode('utf-8')))
+ return content
+
+ajax_info_dispatch = {
+ 'view_count_text': ('views', common.get_text),
+ 'num_videos_text': ('size', lambda node: common.get_text(node).split(' ')[0]),
+ 'thumbnail': ('thumbnail', lambda node: node.url),
+ 'title': ('title', common.get_text),
+ 'owner_text': ('author', common.get_text),
+ 'owner_endpoint': ('author_url', lambda node: node.url),
+ 'description': ('description', common.get_formatted_text),
+
+}
+def metadata_info(ajax_json):
+ info = {}
+ try:
+ for key, node in ajax_json.items():
+ try:
+ simple_key, function = ajax_info_dispatch[key]
+ except KeyError:
+ continue
+ info[simple_key] = function(node)
+ return info
+ except (KeyError,IndexError):
+ print(ajax_json)
+ raise
+
+
+
+
+#https://m.youtube.com/playlist?itct=CBMQybcCIhMIptj9xJaJ2wIV2JKcCh3Idwu-&ctoken=4qmFsgI2EiRWTFBMT3kwajlBdmxWWlB0bzZJa2pLZnB1MFNjeC0tN1BHVEMaDmVnWlFWRHBEUWxFJTNE&pbj=1
+def get_videos_ajax(playlist_id, page):
+
+ url = "https://m.youtube.com/playlist?action_continuation=1&ajax=1&ctoken=" + playlist_ctoken(playlist_id, (int(page)-1)*20)
+ headers = {
+ 'User-Agent': ' Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1',
+ 'Accept': '*/*',
+ 'Accept-Language': 'en-US,en;q=0.5',
+ 'X-YouTube-Client-Name': '2',
+ 'X-YouTube-Client-Version': '1.20180508',
+ }
+ print("Sending playlist ajax request")
+ content = common.fetch_url(url, headers)
+ with open('playlist_debug', 'wb') as f:
+ f.write(content)
+ content = content[4:]
+ print("Finished recieving playlist response")
+
+ info = json.loads(common.uppercase_escape(content.decode('utf-8')))
+ return info
+
+def get_playlist_videos(ajax_json):
+ videos = []
+ #info = get_bloated_playlist_videos(playlist_id, page)
+ #print(info)
+ video_list = ajax_json['content']['continuation_contents']['contents']
+
+
+ for video_json_crap in video_list:
+ try:
+ videos.append({
+ "title": video_json_crap["title"]['runs'][0]['text'],
+ "id": video_json_crap["video_id"],
+ "views": "",
+ "duration": common.default_multi_get(video_json_crap, 'length', 'runs', 0, 'text', default=''), # livestreams dont have a length
+ "author": video_json_crap['short_byline']['runs'][0]['text'],
+ "author_url": '',
+ "published": '',
+ 'playlist_index': '',
+
+ })
+ except (KeyError, IndexError):
+ print(video_json_crap)
+ raise
+ return videos
+
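+# Note: this polymer-format variant relies on get_bloated_playlist_videos,
+# which is not defined in this module; it appears to be unused.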
+def get_playlist_videos_format2(playlist_id, page):
+ videos = []
+ info = get_bloated_playlist_videos(playlist_id, page)
+ video_list = info['response']['continuationContents']['playlistVideoListContinuation']['contents']
+
+ for video_json_crap in video_list:
+
+ video_json_crap = video_json_crap['videoRenderer']
+
+ try:
+ videos.append({
+ "title": video_json_crap["title"]['runs'][0]['text'],
+ "video_id": video_json_crap["videoId"],
+ "views": "",
+ "duration": common.default_multi_get(video_json_crap, 'lengthText', 'runs', 0, 'text', default=''), # livestreams dont have a length
+ "uploader": video_json_crap['shortBylineText']['runs'][0]['text'],
+ "uploader_url": common.ORIGIN_URL + video_json_crap['shortBylineText']['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'],
+ "published": common.default_multi_get(video_json_crap, 'publishedTimeText', 'simpleText', default=''),
+ 'playlist_index': video_json_crap['index']['runs'][0]['text'],
+
+ })
+ except (KeyError, IndexError):
+ print(video_json_crap)
+ raise
+ return videos
+
+
+def playlist_videos_html(ajax_json):
+ result = ''
+ for info in get_playlist_videos(ajax_json):
+ result += common.small_video_item_html(info)
+ return result
+
+playlist_stat_template = Template('''
+$stat''')
+def get_playlist_page(query_string):
+ parameters = urllib.parse.parse_qs(query_string)
+ playlist_id = parameters['list'][0]
+ page = parameters.get("page", ["1"])[0] # parse_qs values are lists
+ if page == "1":
+ first_page_json = playlist_first_page(playlist_id)
+ this_page_json = first_page_json
+ else:
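+ # Fetch the playlist metadata (first page) and the requested page concurrently.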
+ tasks = (
+ gevent.spawn(playlist_first_page, playlist_id ),
+ gevent.spawn(get_videos_ajax, playlist_id, page)
+ )
+ gevent.joinall(tasks)
+ first_page_json, this_page_json = tasks[0].value, tasks[1].value
+
+ try:
+ video_list = this_page_json['content']['section_list']['contents'][0]['contents'][0]['contents']
+ except KeyError:
+ video_list = this_page_json['content']['continuation_contents']['contents']
+ videos_html = ''
+ for video_json in video_list:
+ info = common.ajax_info(video_json)
+ videos_html += common.video_item_html(info, common.small_video_item_template)
+
+
+ metadata = common.ajax_info(first_page_json['content']['playlist_header'])
+ video_count = int(metadata['size'].replace(',', ''))
+ page_buttons = common.page_buttons_html(int(page), math.ceil(video_count/20), common.URL_ORIGIN + "/playlist", query_string)
+
+ html_ready = common.get_html_ready(metadata)
+ html_ready['page_title'] = html_ready['title'] + ' - Page ' + str(page)
+
+ stats = ''
+ stats += playlist_stat_template.substitute(stat=html_ready['size'] + ' videos')
+ stats += playlist_stat_template.substitute(stat=html_ready['views'])
+ return yt_playlist_template.substitute(
+ videos = videos_html,
+ page_buttons = page_buttons,
+ stats = stats,
+ **html_ready
)
\ No newline at end of file
diff --git a/youtube/proto.py b/youtube/proto.py
index 9f9dbcc..6230e51 100644
--- a/youtube/proto.py
+++ b/youtube/proto.py
@@ -1,65 +1,65 @@
-from math import ceil
-import base64
-
-def byte(n):
- return bytes((n,))
-
-
-def varint_encode(offset):
- '''In this encoding system, for each 8-bit byte, the first bit is 1 if there are more bytes, and 0 is this is the last one.
- The next 7 bits are data. These 7-bit sections represent the data in Little endian order. For example, suppose the data is
- aaaaaaabbbbbbbccccccc (each of these sections is 7 bits). It will be encoded as:
- 1ccccccc 1bbbbbbb 0aaaaaaa
-
- This encoding is used in youtube parameters to encode offsets and to encode the length for length-prefixed data.
- See https://developers.google.com/protocol-buffers/docs/encoding#varints for more info.'''
- needed_bytes = ceil(offset.bit_length()/7) or 1 # (0).bit_length() returns 0, but we need 1 in that case.
- encoded_bytes = bytearray(needed_bytes)
- for i in range(0, needed_bytes - 1):
- encoded_bytes[i] = (offset & 127) | 128 # 7 least significant bits
- offset = offset >> 7
- encoded_bytes[-1] = offset & 127 # leave first bit as zero for last byte
-
- return bytes(encoded_bytes)
-
-
-def varint_decode(encoded):
- decoded = 0
- for i, byte in enumerate(encoded):
- decoded |= (byte & 127) << 7*i
-
- if not (byte & 128):
- break
- return decoded
-
-
-def string(field_number, data):
- data = as_bytes(data)
- return _proto_field(2, field_number, varint_encode(len(data)) + data)
-nested = string
-
-def uint(field_number, value):
- return _proto_field(0, field_number, varint_encode(value))
-
-
-
-
-def _proto_field(wire_type, field_number, data):
- ''' See https://developers.google.com/protocol-buffers/docs/encoding#structure '''
- return varint_encode( (field_number << 3) | wire_type) + data
-
-
-
-def percent_b64encode(data):
- return base64.urlsafe_b64encode(data).replace(b'=', b'%3D')
-
-
-def unpadded_b64encode(data):
- return base64.urlsafe_b64encode(data).replace(b'=', b'')
-
-def as_bytes(value):
- if isinstance(value, str):
- return value.encode('ascii')
- return value
-
+from math import ceil
+import base64
+
+def byte(n):
+ return bytes((n,))
+
+
+def varint_encode(offset):
+ '''In this encoding system, for each 8-bit byte, the first bit is 1 if there are more bytes, and 0 if this is the last one.
+ The next 7 bits are data. These 7-bit sections represent the data in Little endian order. For example, suppose the data is
+ aaaaaaabbbbbbbccccccc (each of these sections is 7 bits). It will be encoded as:
+ 1ccccccc 1bbbbbbb 0aaaaaaa
+
+ This encoding is used in youtube parameters to encode offsets and to encode the length for length-prefixed data.
+ See https://developers.google.com/protocol-buffers/docs/encoding#varints for more info.'''
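+ # For example, varint_encode(300) == b'\xac\x02' and varint_decode(b'\xac\x02') == 300.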
+ needed_bytes = ceil(offset.bit_length()/7) or 1 # (0).bit_length() returns 0, but we need 1 in that case.
+ encoded_bytes = bytearray(needed_bytes)
+ for i in range(0, needed_bytes - 1):
+ encoded_bytes[i] = (offset & 127) | 128 # 7 least significant bits
+ offset = offset >> 7
+ encoded_bytes[-1] = offset & 127 # leave first bit as zero for last byte
+
+ return bytes(encoded_bytes)
+
+
+def varint_decode(encoded):
+ decoded = 0
+ for i, byte in enumerate(encoded):
+ decoded |= (byte & 127) << 7*i
+
+ if not (byte & 128):
+ break
+ return decoded
+
+
+def string(field_number, data):
+ data = as_bytes(data)
+ return _proto_field(2, field_number, varint_encode(len(data)) + data)
+nested = string
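+# For example, string(2, b'VL') == b'\x12\x02VL': the 0x12 header is
+# (field number 2 << 3) | wire type 2, followed by the varint length.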
+
+def uint(field_number, value):
+ return _proto_field(0, field_number, varint_encode(value))
+
+
+
+
+def _proto_field(wire_type, field_number, data):
+ ''' See https://developers.google.com/protocol-buffers/docs/encoding#structure '''
+ return varint_encode( (field_number << 3) | wire_type) + data
+
+
+
+def percent_b64encode(data):
+ return base64.urlsafe_b64encode(data).replace(b'=', b'%3D')
+
+
+def unpadded_b64encode(data):
+ return base64.urlsafe_b64encode(data).replace(b'=', b'')
+
+def as_bytes(value):
+ if isinstance(value, str):
+ return value.encode('ascii')
+ return value
+
\ No newline at end of file
diff --git a/youtube/search.py b/youtube/search.py
index 5268dbe..5982d9b 100644
--- a/youtube/search.py
+++ b/youtube/search.py
@@ -1,231 +1,231 @@
-import json
-import urllib
-import html
-from string import Template
-import base64
-from math import ceil
-from youtube.common import default_multi_get, get_thumbnail_url, URL_ORIGIN
-import youtube.common as common
-
-with open("yt_search_results_template.html", "r") as file:
- yt_search_results_template = file.read()
-
-with open("yt_search_template.html", "r") as file:
- yt_search_template = file.read()
-
-page_button_template = Template('''$page''')
-current_page_button_template = Template('''$page''')
-
-
-
-# Sort: 1
- # Upload date: 2
- # View count: 3
- # Rating: 1
-# Offset: 9
-# Filters: 2
- # Upload date: 1
- # Type: 2
- # Duration: 3
-
-
-features = {
- '4k': 14,
- 'hd': 4,
- 'hdr': 25,
- 'subtitles': 5,
- 'creative_commons': 6,
- '3d': 7,
- 'live': 8,
- 'purchased': 9,
- '360': 15,
- 'location': 23,
-}
-
-def page_number_to_sp_parameter(page):
- offset = (int(page) - 1)*20 # 20 results per page
- first_byte = 255 & offset
- second_byte = 255 & (offset >> 7)
- second_byte = second_byte | 1
-
- # 0b01001000 is required, and is always the same.
- # The next 2 bytes encode the offset in little endian order,
- # BUT, it's done in a strange way. The least significant bit (LSB) of the second byte is not part
- # of the offset. Instead, to get the number which the two bytes encode, that LSB
- # of the second byte is combined with the most significant bit (MSB) of the first byte
- # in a logical AND. Replace the two bits with the result of the AND to get the two little endian
- # bytes that represent the offset.
- # I figured this out by trial and error on the sp parameter. I don't know why it's done like this;
- # perhaps it's just obfuscation.
- param_bytes = bytes((0b01001000, first_byte, second_byte))
- param_encoded = urllib.parse.quote(base64.urlsafe_b64encode(param_bytes))
- return param_encoded
-
-def get_search_json(query, page):
- url = "https://www.youtube.com/results?search_query=" + urllib.parse.quote_plus(query)
- headers = {
- 'Host': 'www.youtube.com',
- 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)',
- 'Accept': '*/*',
- 'Accept-Language': 'en-US,en;q=0.5',
- 'X-YouTube-Client-Name': '1',
- 'X-YouTube-Client-Version': '2.20180418',
- }
- url += "&pbj=1&sp=" + page_number_to_sp_parameter(page)
- content = common.fetch_url(url, headers=headers)
- info = json.loads(content)
- return info
-
-"""def get_search_info(query, page):
- result_info = dict()
- info = get_bloated_search_info(query, page)
-
- estimated_results = int(info[1]['response']['estimatedResults'])
- estimated_pages = ceil(estimated_results/20)
- result_info['estimated_results'] = estimated_results
- result_info['estimated_pages'] = estimated_pages
-
- result_info['results'] = []
- # this is what you get when you hire H-1B's
- video_list = info[1]['response']['contents']['twoColumnSearchResultsRenderer']['primaryContents']['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents']
-
-
- for video_json_crap in video_list:
- # they have a dictionary whose only content is another dictionary...
- try:
- type = list(video_json_crap.keys())[0]
- except KeyError:
- continue #channelRenderer or playlistRenderer
- '''description = ""
- for text_run in video_json_crap["descriptionSnippet"]["runs"]:
- if text_run.get("bold", False):
- description += "" + html.escape'''
- try:
- result_info['results'].append({
- "title": video_json_crap["title"]["simpleText"],
- "video_id": video_json_crap["videoId"],
- "description": video_json_crap.get("descriptionSnippet",dict()).get('runs',[]), # a list of text runs (formmated), rather than plain text
- "thumbnail": get_thumbnail_url(video_json_crap["videoId"]),
- "views_text": video_json_crap['viewCountText'].get('simpleText', None) or video_json_crap['viewCountText']['runs'][0]['text'],
- "length_text": default_multi_get(video_json_crap, 'lengthText', 'simpleText', default=''), # livestreams dont have a length
- "uploader": video_json_crap['longBylineText']['runs'][0]['text'],
- "uploader_url": URL_ORIGIN + video_json_crap['longBylineText']['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'],
- "published_time_text": default_multi_get(video_json_crap, 'publishedTimeText', 'simpleText', default=''),
-
- })
- except KeyError:
- print(video_json_crap)
- raise
- return result_info"""
-
-
-def page_buttons_html(page_start, page_end, current_page, query):
- result = ""
- for page in range(page_start, page_end+1):
- if page == current_page:
- template = current_page_button_template
- else:
- template = page_button_template
- result += template.substitute(page=page, href=URL_ORIGIN + "/search?query=" + urllib.parse.quote_plus(query) + "&page=" + str(page))
- return result
-
-showing_results_for = Template('''
-
+import json
+import urllib.parse
+import html
+from string import Template
+import base64
+from math import ceil
+from youtube.common import default_multi_get, get_thumbnail_url, URL_ORIGIN
+import youtube.common as common
+
+with open("yt_search_results_template.html", "r") as file:
+ yt_search_results_template = file.read()
+
+with open("yt_search_template.html", "r") as file:
+ yt_search_template = file.read()
+
+page_button_template = Template('''$page''')
+current_page_button_template = Template('''$page''')
+
+
+
+# Sort: 1
+ # Upload date: 2
+ # View count: 3
+ # Rating: 1
+# Offset: 9
+# Filters: 2
+ # Upload date: 1
+ # Type: 2
+ # Duration: 3
+
+
+features = {
+ '4k': 14,
+ 'hd': 4,
+ 'hdr': 25,
+ 'subtitles': 5,
+ 'creative_commons': 6,
+ '3d': 7,
+ 'live': 8,
+ 'purchased': 9,
+ '360': 15,
+ 'location': 23,
+}
+
+def page_number_to_sp_parameter(page):
+ offset = (int(page) - 1)*20 # 20 results per page
+ first_byte = 255 & offset
+ second_byte = 255 & (offset >> 7)
+ second_byte = second_byte | 1
+
+ # 0b01001000 is the protobuf tag for field 9 (the offset field listed above), and is always the same.
+ # The next 2 bytes encode the offset in little endian order,
+ # BUT, it's done in a strange way. The least significant bit (LSB) of the second byte is not part
+ # of the offset. Instead, to get the number which the two bytes encode, that LSB
+ # of the second byte is combined with the most significant bit (MSB) of the first byte
+ # in a logical AND. Replace the two bits with the result of the AND to get the two little endian
+ # bytes that represent the offset.
+ # I figured this out by trial and error on the sp parameter. I don't know why it's done like this;
+ # perhaps it's just obfuscation.
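+ # For example, page 2 gives offset 20, so param_bytes is b'\x48\x14\x01',
+ # which encodes to the sp value 'SBQB'.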
+ param_bytes = bytes((0b01001000, first_byte, second_byte))
+ param_encoded = urllib.parse.quote(base64.urlsafe_b64encode(param_bytes))
+ return param_encoded
+
+def get_search_json(query, page):
+ url = "https://www.youtube.com/results?search_query=" + urllib.parse.quote_plus(query)
+ headers = {
+ 'Host': 'www.youtube.com',
+ 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)',
+ 'Accept': '*/*',
+ 'Accept-Language': 'en-US,en;q=0.5',
+ 'X-YouTube-Client-Name': '1',
+ 'X-YouTube-Client-Version': '2.20180418',
+ }
+ url += "&pbj=1&sp=" + page_number_to_sp_parameter(page)
+ content = common.fetch_url(url, headers=headers)
+ info = json.loads(content)
+ return info
+
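+# The pbj response is a JSON list in which index 1 holds the 'response'
+# object (see the disabled get_search_info below).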
+"""def get_search_info(query, page):
+ result_info = dict()
+ info = get_bloated_search_info(query, page)
+
+ estimated_results = int(info[1]['response']['estimatedResults'])
+ estimated_pages = ceil(estimated_results/20)
+ result_info['estimated_results'] = estimated_results
+ result_info['estimated_pages'] = estimated_pages
+
+ result_info['results'] = []
+ # this is what you get when you hire H-1B's
+ video_list = info[1]['response']['contents']['twoColumnSearchResultsRenderer']['primaryContents']['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents']
+
+
+ for video_json_crap in video_list:
+ # they have a dictionary whose only content is another dictionary...
+ try:
+ type = list(video_json_crap.keys())[0]
+ except KeyError:
+ continue #channelRenderer or playlistRenderer
+ '''description = ""
+ for text_run in video_json_crap["descriptionSnippet"]["runs"]:
+ if text_run.get("bold", False):
+ description += "" + html.escape'''
+ try:
+ result_info['results'].append({
+ "title": video_json_crap["title"]["simpleText"],
+ "video_id": video_json_crap["videoId"],
+ "description": video_json_crap.get("descriptionSnippet",dict()).get('runs',[]), # a list of text runs (formmated), rather than plain text
+ "thumbnail": get_thumbnail_url(video_json_crap["videoId"]),
+ "views_text": video_json_crap['viewCountText'].get('simpleText', None) or video_json_crap['viewCountText']['runs'][0]['text'],
+ "length_text": default_multi_get(video_json_crap, 'lengthText', 'simpleText', default=''), # livestreams dont have a length
+ "uploader": video_json_crap['longBylineText']['runs'][0]['text'],
+ "uploader_url": URL_ORIGIN + video_json_crap['longBylineText']['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'],
+ "published_time_text": default_multi_get(video_json_crap, 'publishedTimeText', 'simpleText', default=''),
+
+ })
+ except KeyError:
+ print(video_json_crap)
+ raise
+ return result_info"""
+
+
+def page_buttons_html(page_start, page_end, current_page, query):
+ result = ""
+ for page in range(page_start, page_end+1):
+ if page == current_page:
+ template = current_page_button_template
+ else:
+ template = page_button_template
+ result += template.substitute(page=page, href=URL_ORIGIN + "/search?query=" + urllib.parse.quote_plus(query) + "&page=" + str(page))
+ return result
+
+showing_results_for = Template('''
+