138 lines
5.2 KiB
Python
138 lines
5.2 KiB
Python
from youtube import util, yt_data_extract, proto, local_playlist
|
|
from youtube import yt_app
|
|
import settings
|
|
|
|
import json
|
|
import urllib
|
|
import base64
|
|
from math import ceil
|
|
import mimetypes
|
|
from flask import request
|
|
import flask
|
|
|
|
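# Layout of the protobuf message encoded into the "sp" search parameter.
# Top-level entries are "name: field number"; the indented entries under
# Sort are its possible values, and those under Filters are the field
# numbers of its nested message.
#
# Sort: 1
#     Upload date: 2
#     View count: 3
#     Rating: 1
#     Relevance: 0
# Offset: 9
# Filters: 2
#     Upload date: 1
#     Type: 2
#     Duration: 3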
|
|
|
|
|
|
# Integer codes for the optional search "feature" filters, keyed by name.
# NOTE(review): presumably these are protobuf field numbers inside the
# filters message; nothing in this part of the file reads them - confirm
# against the caller before relying on that.
features = {
    '4k': 14,
    'hd': 4,
    'hdr': 25,
    'subtitles': 5,
    'creative_commons': 6,
    '3d': 7,
    'live': 8,
    'purchased': 9,
    '360': 15,
    'location': 23,
}
|
|
|
|
def page_number_to_sp_parameter(page, autocorrect, sort, filters):
    """Build the base64 ``sp`` parameter describing a search request.

    The search options are serialised with the project's ``proto`` helpers
    into a single message and then URL-safe base64 encoded.

    Args:
        page: 1-based page number (anything accepted by ``int()``).
        autocorrect: truthy/1 to let the site autocorrect the query, 0 to
            disable it. The message stores the inverse (``1 - autocorrect``)
            in field 1 of nested field 8.
        sort: integer sort code, written to field 1.
        filters: mapping with integer ``'time'``, ``'type'`` and
            ``'duration'`` entries, written to fields 1-3 of nested field 2.

    Returns:
        The encoded message as an ASCII ``str`` (may contain ``=`` padding).
    """
    results_per_page = 20
    result_offset = (int(page) - 1) * results_per_page

    autocorrect_field = proto.nested(8, proto.uint(1, 1 - int(autocorrect)))

    filter_payload = (
        proto.uint(1, filters['time'])
        + proto.uint(2, filters['type'])
        + proto.uint(3, filters['duration'])
    )
    filters_field = proto.nested(2, filter_payload)

    message = proto.uint(1, sort) + filters_field + autocorrect_field
    message = message + proto.uint(9, result_offset)
    # Field 61 is always sent as an empty string.
    # NOTE(review): its purpose is not visible here - confirm before removing.
    message = message + proto.string(61, b'')

    encoded = base64.urlsafe_b64encode(message)
    return encoded.decode('ascii')
|
|
|
|
def get_search_json(query, page, autocorrect, sort, filters):
    """Fetch one page of YouTube search results and return the decoded JSON.

    Args:
        query: the search text; it is URL-quoted before being sent.
        page, autocorrect, sort, filters: forwarded unchanged to
            ``page_number_to_sp_parameter`` to build the ``sp`` parameter.

    Returns:
        Whatever ``json.loads`` produces for the response body.
    """
    search_url = (
        "https://www.youtube.com/results?search_query="
        + urllib.parse.quote_plus(query)
    )
    request_headers = {
        'Host': 'www.youtube.com',
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)',
        'Accept': '*/*',
        'Accept-Language': 'en-US,en;q=0.5',
        'X-YouTube-Client-Name': '1',
        'X-YouTube-Client-Version': '2.20180418',
    }

    sp_parameter = page_number_to_sp_parameter(page, autocorrect, sort, filters)
    # The base64 padding character must be percent-encoded inside the URL.
    search_url = search_url + "&pbj=1&sp=" + sp_parameter.replace("=", "%3D")

    response_body = util.fetch_url(
        search_url,
        headers=request_headers,
        report_text="Got search results",
        debug_name='search_results',
    )
    return json.loads(response_body)
|
|
|
|
|
|
def _int_query_arg(name, default):
    """Return query-string parameter ``name`` parsed as an int.

    ``default`` is used when the parameter is absent. A value that is not a
    valid integer aborts the request with HTTP 400 instead of letting the
    ``ValueError`` surface as a server error.
    """
    try:
        return int(request.args.get(name, default))
    except ValueError:
        flask.abort(400)


@yt_app.route('/search')
def get_search_page():
    """Render the search page for the current request.

    Query-string parameters:
        query: search text (required whenever any parameter is given).
        page: 1-based page number, default 1.
        autocorrect: 1 (default) to allow autocorrection, 0 to disable it.
        sort: integer sort code, default 0.
        time, type, duration: integer filter codes, default 0.

    With no parameters at all the empty search form (``base.html``) is
    rendered. Otherwise the results are fetched with ``get_search_json``,
    correction notices ("did you mean" / "showing results for") are pulled
    out of the result list, and ``search.html`` is rendered.

    Aborts with HTTP 400 when ``query`` is missing, when a numeric parameter
    is not an integer, or when ``page`` is below 1.
    """
    if not request.args:
        return flask.render_template('base.html', title="Search")

    if 'query' not in request.args:
        # ``abort`` is not imported by name in this module; go through
        # the ``flask`` module so this is a 400 rather than a NameError.
        flask.abort(400)

    query = request.args.get("query")
    page = _int_query_arg("page", "1")
    if page < 1:
        # Pages are 1-based; anything lower would yield a negative offset
        # when the page number is converted to a result offset.
        flask.abort(400)
    autocorrect = _int_query_arg("autocorrect", "1")
    sort = _int_query_arg("sort", "0")
    filters = {
        'time': _int_query_arg("time", "0"),
        'type': _int_query_arg("type", "0"),
        'duration': _int_query_arg("duration", "0"),
    }
    search_json = get_search_json(query, page, autocorrect, sort, filters)
    response = search_json[1]['response']

    estimated_results = int(response['estimatedResults'])
    estimated_pages = ceil(estimated_results/20)

    # almost always is the first "section", but if there's an advertisement
    # for a google product like Stadia or Home in the search results, then
    # that becomes the first "section" and the search results are in the
    # second. So just join all of them for resiliency
    results = []
    for section in response['contents']['twoColumnSearchResultsRenderer']['primaryContents']['sectionListRenderer']['contents']:
        results += section['itemSectionRenderer']['contents']

    parsed_results = []
    corrections = {'type': None}
    for renderer in results:
        # Each result is a single-key dict whose key names the renderer kind.
        renderer_type = next(iter(renderer))
        if renderer_type == 'shelfRenderer':
            continue

        if renderer_type == 'didYouMeanRenderer':
            details = renderer[renderer_type]
            # Same request, but with the suggested query substituted in.
            corrected_query_string = request.args.to_dict(flat=False)
            corrected_query_string['query'] = [details['correctedQueryEndpoint']['searchEndpoint']['query']]
            corrected_query_url = util.URL_ORIGIN + '/search?' + urllib.parse.urlencode(corrected_query_string, doseq=True)

            corrections = {
                'type': 'did_you_mean',
                'corrected_query': yt_data_extract.format_text_runs(details['correctedQuery']['runs']),
                'corrected_query_url': corrected_query_url,
            }
            continue

        if renderer_type == 'showingResultsForRenderer':
            details = renderer[renderer_type]
            # Same request, but with autocorrection switched off so the
            # user can insist on the original spelling.
            no_autocorrect_query_string = request.args.to_dict(flat=False)
            no_autocorrect_query_string['autocorrect'] = ['0']
            no_autocorrect_query_url = util.URL_ORIGIN + '/search?' + urllib.parse.urlencode(no_autocorrect_query_string, doseq=True)

            corrections = {
                'type': 'showing_results_for',
                'corrected_query': yt_data_extract.format_text_runs(details['correctedQuery']['runs']),
                'original_query_url': no_autocorrect_query_url,
                'original_query': details['originalQuery']['simpleText'],
            }
            continue

        item = yt_data_extract.parse_info_prepare_for_html(renderer)
        if item['type'] != 'unsupported':
            parsed_results.append(item)

    return flask.render_template(
        'search.html',
        header_playlist_names=local_playlist.get_playlist_names(),
        query=query,
        estimated_results=estimated_results,
        estimated_pages=estimated_pages,
        corrections=corrections,
        results=parsed_results,
        parameters_dictionary=request.args,
    )
|
|
|
|
@yt_app.route('/opensearch.xml')
def get_search_engine_xml():
    """Serve ``youtube/opensearch.xml`` with the configured port filled in.

    Every ``$port_number`` placeholder in the file is replaced by
    ``settings.port_number`` and the result is returned with the
    ``application/xml`` mimetype.
    """
    with open("youtube/opensearch.xml", 'rb') as template_file:
        template = template_file.read()
        port_bytes = str(settings.port_number).encode()
        document = template.replace(b'$port_number', port_bytes)
    return flask.Response(document, mimetype='application/xml')
|