153 lines
5.1 KiB
Python
153 lines
5.1 KiB
Python
import gzip
import re
import time
import urllib.parse
import urllib.request

import brotli
import socks, sockshandler

import settings
|
|
|
|
|
|
# Prefix string for YouTube's origin. NOTE(review): the leading "/" makes this
# a site-relative path rather than an absolute URL -- presumably so links are
# routed back through this application's own server; confirm against callers.
URL_ORIGIN = "/https://www.youtube.com"
|
|
|
|
|
|
class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler):
    '''Separate cookiejars for receiving and sending.

    urllib handler that attaches cookies to outgoing requests from one jar
    and stores cookies from incoming responses into another. Either jar may
    be None, in which case the corresponding direction is a no-op. Passing
    the same jar for both gives ordinary send-and-store behaviour.
    '''

    def __init__(self, cookiejar_send=None, cookiejar_receive=None):
        '''
        cookiejar_send: object providing add_cookie_header(request), or None
        cookiejar_receive: object providing extract_cookies(response, request),
            or None
        '''
        self.cookiejar_send = cookiejar_send
        self.cookiejar_receive = cookiejar_receive

    def http_request(self, request):
        '''Add a Cookie header from cookiejar_send (if set); return the request.'''
        if self.cookiejar_send is not None:
            self.cookiejar_send.add_cookie_header(request)
        return request

    def http_response(self, request, response):
        '''Store response cookies into cookiejar_receive (if set); return the response.'''
        if self.cookiejar_receive is not None:
            self.cookiejar_receive.extract_cookies(response, request)
        return response

    # The same hooks apply to https traffic.
    https_request = http_request
    https_response = http_response
|
|
|
|
|
|
def decode_content(content, encoding_header):
    '''Undo the content codings listed in a Content-Encoding header value.

    content: the raw (possibly compressed) body as bytes
    encoding_header: comma-separated list of codings, e.g. 'gzip' or 'gzip, br'

    Codings are listed in the order they were applied, so they are undone in
    reverse. Coding names are matched case-insensitively and 'x-gzip' is
    treated as an alias of 'gzip'. 'identity', empty entries and any
    unrecognized coding leave the content untouched.

    Returns the decoded bytes.
    '''
    # Drop every whitespace character and normalize case before splitting.
    normalized = ''.join(encoding_header.split()).lower()
    for coding in reversed(normalized.split(',')):
        if coding in ('gzip', 'x-gzip'):
            content = gzip.decompress(content)
        elif coding == 'br':
            content = brotli.decompress(content)
        # anything else (including 'identity') is passed through unchanged
    return content
|
|
|
|
def fetch_url(url, headers=(), timeout=15, report_text=None, data=None,
              cookiejar_send=None, cookiejar_receive=None, use_tor=True):
    '''
    Fetch url and return the response body as bytes, with any gzip/br
    Content-Encoding already undone.

    headers: a dict or an iterable of (name, value) pairs. It is copied, so
        the caller's object is not modified. Accept-Encoding is always set to
        'gzip, br', and a generic User-Agent is added when none is supplied.
    timeout: passed through to the opener's open() call.
    report_text: when truthy, a line with this text plus the latency and read
        time is printed.
    data: request body. A str is encoded as ascii, bytes are sent as-is, and
        anything else is passed through urllib.parse.urlencode first.
    use_tor: the request goes through the SOCKS5 proxy at 127.0.0.1:9150 only
        when both this and settings.route_tor are true.

    When cookiejar_send is set to a CookieJar object,
     those cookies will be sent in the request (but cookies in response will not be merged into it)
    When cookiejar_receive is set to a CookieJar object,
     cookies received in the response will be merged into the object (nothing will be sent from it)
    When both are set to the same object, cookies will be sent from the object,
     and response cookies will be merged into it.

    Exceptions raised while opening or reading the url are not caught here.
    '''
    headers = dict(headers)     # Note: Calling dict() on a dict will make a copy
    headers['Accept-Encoding'] = 'gzip, br'

    # prevent python version being leaked by urllib if User-Agent isn't provided
    #  (urllib will use ex. Python-urllib/3.6 otherwise)
    # The check is case-insensitive: urllib normalizes header names, so adding
    # a second spelling here could override the caller's own User-Agent.
    if not any(name.lower() == 'user-agent' for name in headers):
        headers['User-Agent'] = 'Python-urllib'

    if data is not None:
        if isinstance(data, str):
            data = data.encode('ascii')
        elif not isinstance(data, bytes):
            data = urllib.parse.urlencode(data).encode('ascii')

    start_time = time.time()

    req = urllib.request.Request(url, data=data, headers=headers)

    cookie_processor = HTTPAsymmetricCookieProcessor(cookiejar_send=cookiejar_send, cookiejar_receive=cookiejar_receive)

    if use_tor and settings.route_tor:
        opener = urllib.request.build_opener(sockshandler.SocksiPyHandler(socks.PROXY_TYPE_SOCKS5, "127.0.0.1", 9150), cookie_processor)
    else:
        opener = urllib.request.build_opener(cookie_processor)

    # The with-block guarantees the connection is released even if reading
    # the body raises.
    with opener.open(req, timeout=timeout) as response:
        response_time = time.time()

        content = response.read()
        read_finish = time.time()
        content_encoding = response.getheader('Content-Encoding', default='identity')

    if report_text:
        print(report_text, '    Latency:', round(response_time - start_time,3), '    Read time:', round(read_finish - response_time,3))
    content = decode_content(content, content_encoding)
    return content
|
|
|
|
# Ready-made User-Agent strings, plus one-item header tuples of
# (name, value) pairs built from them.
mobile_user_agent = (
    'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) '
    'AppleWebKit/603.1.30 (KHTML, like Gecko) '
    'Version/10.0 Mobile/14E304 Safari/602.1'
)
desktop_user_agent = (
    'Mozilla/5.0 (Windows NT 6.1; rv:52.0) '
    'Gecko/20100101 Firefox/52.0'
)

mobile_ua = (('User-Agent', mobile_user_agent),)
desktop_ua = (('User-Agent', desktop_user_agent),)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def dict_add(*dicts):
    '''Merge every later dictionary into the first one, in place, and return it.

    Later dictionaries win on duplicate keys. The first argument is mutated;
    the others are left alone. Calling with no arguments raises IndexError.
    '''
    for position, extra in enumerate(dicts):
        if position:    # position 0 is the merge target itself
            dicts[0].update(extra)
    return dicts[0]
|
|
|
|
def video_id(url):
    '''Return the first value of the "v" query parameter of url.

    Raises KeyError when the url has no "v" parameter.
    '''
    query_fields = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
    first_value = query_fields['v'][0]
    return first_value
|
|
|
|
def default_multi_get(object, *keys, default):
    ''' Like dict.get(), but for nested dictionaries/sequences, supporting keys or indices.

    Each entry in keys is applied in turn with subscripting (object[key]).
    default is keyword-only and is returned as soon as any lookup raises
    IndexError or KeyError; other exception types are not caught. With no
    keys, object itself is returned. '''
    current = object
    for key in keys:
        try:
            current = current[key]
        except (IndexError, KeyError):
            return default
    return current
|
|
|
|
|
|
# default, sddefault, mqdefault, hqdefault, hq720
# (presumably the thumbnail variants available; this function uses mqdefault)
def get_thumbnail_url(video_id):
    '''Return the site-relative path of the mqdefault thumbnail for video_id (a str).'''
    pieces = ("/i.ytimg.com/vi/", video_id, "/mqdefault.jpg")
    return "".join(pieces)
|
|
|
|
def seconds_to_timestamp(seconds):
    '''Format a duration in seconds as "M:SS", or "H:MM:SS" when hours are non-zero.

    seconds is converted with int() first, so numeric strings and floats
    (truncated) are accepted.
    '''
    total = int(seconds)
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)

    if hours == 0:
        # minutes are not zero-padded when they are the leading field
        return '%d:%02d' % (minutes, secs)
    return '%d:%02d:%02d' % (hours, minutes, secs)
|
|
|
|
|
|
|
|
def update_query_string(query_string, items):
    '''Return query_string re-encoded with the entries of items added or replaced.

    The existing string is parsed with urllib.parse.parse_qs, so every
    existing value becomes a list; items is applied on top with dict.update,
    and the result is encoded with doseq=True so list values expand into
    repeated parameters.
    '''
    merged = urllib.parse.parse_qs(query_string)
    for name, value in dict(items).items():
        merged[name] = value
    encoded = urllib.parse.urlencode(merged, doseq=True)
    return encoded
|
|
|
|
|
|
|
|
def uppercase_escape(s):
    '''Replace each literal \\UXXXXXXXX escape (8 hex digits) in s with the character it names.'''
    def _to_character(match):
        # group(1) is the 8-digit hexadecimal code point
        return chr(int(match.group(1), 16))

    return re.sub(r'\\U([0-9a-fA-F]{8})', _to_character, s)