This commit is contained in:
Jesús 2020-12-21 21:44:14 -05:00
parent dd2d856514
commit 90f8317b36
No known key found for this signature in database
GPG Key ID: F6EE7BC59A315766

View File

@ -104,6 +104,7 @@ from math import ceil
import base64 import base64
import io import io
def byte(n): def byte(n):
return bytes((n,)) return bytes((n,))
@ -116,12 +117,13 @@ def varint_encode(offset):
This encoding is used in youtube parameters to encode offsets and to encode the length for length-prefixed data. This encoding is used in youtube parameters to encode offsets and to encode the length for length-prefixed data.
See https://developers.google.com/protocol-buffers/docs/encoding#varints for more info.''' See https://developers.google.com/protocol-buffers/docs/encoding#varints for more info.'''
needed_bytes = ceil(offset.bit_length()/7) or 1 # (0).bit_length() returns 0, but we need 1 in that case. # (0).bit_length() returns 0, but we need 1 in that case.
needed_bytes = ceil(offset.bit_length()/7) or 1
encoded_bytes = bytearray(needed_bytes) encoded_bytes = bytearray(needed_bytes)
for i in range(0, needed_bytes - 1): for i in range(0, needed_bytes - 1):
encoded_bytes[i] = (offset & 127) | 128 # 7 least significant bits encoded_bytes[i] = (offset & 127) | 128 # 7 least significant bits
offset = offset >> 7 offset = offset >> 7
encoded_bytes[-1] = offset & 127 # leave first bit as zero for last byte encoded_bytes[-1] = offset & 127 # leave first bit as zero for last byte
return bytes(encoded_bytes) return bytes(encoded_bytes)
@ -139,15 +141,18 @@ def varint_decode(encoded):
def string(field_number, data): def string(field_number, data):
data = as_bytes(data) data = as_bytes(data)
return _proto_field(2, field_number, varint_encode(len(data)) + data) return _proto_field(2, field_number, varint_encode(len(data)) + data)
nested = string nested = string
def uint(field_number, value): def uint(field_number, value):
return _proto_field(0, field_number, varint_encode(value)) return _proto_field(0, field_number, varint_encode(value))
def _proto_field(wire_type, field_number, data): def _proto_field(wire_type, field_number, data):
''' See https://developers.google.com/protocol-buffers/docs/encoding#structure ''' ''' See https://developers.google.com/protocol-buffers/docs/encoding#structure '''
return varint_encode( (field_number << 3) | wire_type) + data return varint_encode((field_number << 3) | wire_type) + data
def percent_b64encode(data): def percent_b64encode(data):
@ -173,7 +178,9 @@ def read_varint(data):
except IndexError: except IndexError:
if i == 0: if i == 0:
raise EOFError() raise EOFError()
raise Exception('Unterminated varint starting at ' + str(data.tell() - i)) raise Exception(
'Unterminated varint starting at ' + str(data.tell() - i)
)
result |= (byte & 127) << 7*i result |= (byte & 127) << 7*i
if not byte & 128: if not byte & 128:
break break
@ -192,14 +199,17 @@ def read_group(data, end_sequence):
def parse(data): def parse(data):
return {field_number: value for _, field_number, value in read_protobuf(data)} return {
field_number: value for _,
field_number, value in read_protobuf(data)
}
def b64_to_bytes(data): def b64_to_bytes(data):
if isinstance(data, bytes): if isinstance(data, bytes):
data = data.decode('ascii') data = data.decode('ascii')
data = data.replace("%3D", "=") data = data.replace("%3D", "=")
return base64.urlsafe_b64decode(data + "="*((4 - len(data)%4)%4) ) return base64.urlsafe_b64decode(data + "="*((4 - len(data) % 4) % 4))
# -------------------------------------------------------------------- # --------------------------------------------------------------------
@ -209,30 +219,41 @@ dec = b64_to_bytes
def enc(t): def enc(t):
return base64.urlsafe_b64encode(t).decode('ascii') return base64.urlsafe_b64encode(t).decode('ascii')
def uenc(t): def uenc(t):
return enc(t).replace("=", "%3D") return enc(t).replace("=", "%3D")
def b64_to_ascii(t): def b64_to_ascii(t):
return base64.urlsafe_b64decode(t).decode('ascii', errors='replace') return base64.urlsafe_b64decode(t).decode('ascii', errors='replace')
def b64_to_bin(t): def b64_to_bin(t):
decoded = base64.urlsafe_b64decode(t) decoded = base64.urlsafe_b64decode(t)
#print(len(decoded)*8) # print(len(decoded)*8)
return " ".join(["{:08b}".format(x) for x in decoded]) return " ".join(["{:08b}".format(x) for x in decoded])
def bytes_to_bin(t): def bytes_to_bin(t):
return " ".join(["{:08b}".format(x) for x in t]) return " ".join(["{:08b}".format(x) for x in t])
def bin_to_bytes(t): def bin_to_bytes(t):
return int(t, 2).to_bytes((len(t) + 7) // 8, 'big') return int(t, 2).to_bytes((len(t) + 7) // 8, 'big')
def bytes_to_hex(t): def bytes_to_hex(t):
return ' '.join(hex(n)[2:].zfill(2) for n in t) return ' '.join(hex(n)[2:].zfill(2) for n in t)
tohex = bytes_to_hex tohex = bytes_to_hex
fromhex = bytes.fromhex fromhex = bytes.fromhex
def aligned_ascii(data): def aligned_ascii(data):
return ' '.join(' ' + chr(n) if n in range(32,128) else ' _' for n in data) return ' '.join(' ' + chr(n) if n in range(
32, 128) else ' _' for n in data)
def parse_protobuf(data, mutable=False, spec=()): def parse_protobuf(data, mutable=False, spec=()):
data_original = data data_original = data
@ -245,7 +266,7 @@ def parse_protobuf(data, mutable=False, spec=()):
break break
wire_type = tag & 7 wire_type = tag & 7
field_number = tag >> 3 field_number = tag >> 3
if wire_type == 0: if wire_type == 0:
value = read_varint(data) value = read_varint(data)
elif wire_type == 1: elif wire_type == 1:
@ -265,9 +286,11 @@ def parse_protobuf(data, mutable=False, spec=()):
else: else:
yield (wire_type, field_number, value) yield (wire_type, field_number, value)
def pb(data, mutable=False): def pb(data, mutable=False):
return list(parse_protobuf(data, mutable=mutable)) return list(parse_protobuf(data, mutable=mutable))
def make_proto(fields): def make_proto(fields):
if len(fields) == 2 and fields[0] == 'base64': if len(fields) == 2 and fields[0] == 'base64':
return enc(make_proto(fields[1])) return enc(make_proto(fields[1]))
@ -312,6 +335,7 @@ _b32rev = None
bytes_types = (bytes, bytearray) # Types acceptable as binary data bytes_types = (bytes, bytearray) # Types acceptable as binary data
def _bytes_from_decode_data(s): def _bytes_from_decode_data(s):
if isinstance(s, str): if isinstance(s, str):
try: try:
@ -327,7 +351,6 @@ def _bytes_from_decode_data(s):
"string, not %r" % s.__class__.__name__) from None "string, not %r" % s.__class__.__name__) from None
def b32decode(s, casefold=False, map01=None): def b32decode(s, casefold=False, map01=None):
"""Decode the Base32 encoded bytes-like object or ASCII string s. """Decode the Base32 encoded bytes-like object or ASCII string s.
@ -397,10 +420,12 @@ def b32decode(s, casefold=False, map01=None):
raise binascii.Error('Incorrect padding') raise binascii.Error('Incorrect padding')
return bytes(decoded) return bytes(decoded)
def dec32(data): def dec32(data):
if isinstance(data, bytes): if isinstance(data, bytes):
data = data.decode('ascii') data = data.decode('ascii')
return b32decode(data + "="*((8 - len(data)%8)%8)) return b32decode(data + "="*((8 - len(data) % 8) % 8))
def recursive_pb(data, filt=True): def recursive_pb(data, filt=True):
b64 = False b64 = False
@ -415,7 +440,7 @@ def recursive_pb(data, filt=True):
return data return data
try: try:
result = pb(data, mutable=True) result = pb(data, mutable=True)
except Exception as e: except Exception as e:
return data return data
for tuple in result: for tuple in result:
@ -429,10 +454,10 @@ def recursive_pb(data, filt=True):
return result return result
def indent_lines(lines, indent): def indent_lines(lines, indent):
return re.sub(r'^', ' '*indent, lines, flags=re.MULTILINE) return re.sub(r'^', ' '*indent, lines, flags=re.MULTILINE)
def _pp(obj, indent): # not my best work def _pp(obj, indent): # not my best work
if isinstance(obj, tuple): if isinstance(obj, tuple):
if len(obj) == 3: # (wire_type, field_number, data) if len(obj) == 3: # (wire_type, field_number, data)
@ -443,15 +468,13 @@ def _pp(obj, indent): # not my best work
+ ')') + ')')
elif isinstance(obj, list): elif isinstance(obj, list):
# [wire_type, field_number, data] # [wire_type, field_number, data]
if (len(obj) == 3 if (len(obj) == 3 and not any(
and not any(isinstance(x, (list, tuple)) for x in obj) isinstance(x, (list, tuple)) for x in obj)):
):
return obj.__repr__() return obj.__repr__()
# [wire_type, field_number, [...]] # [wire_type, field_number, [...]]
elif (len(obj) == 3 elif (len(obj) == 3 and not any(
and not any(isinstance(x, (list, tuple)) for x in obj[0:2]) isinstance(x, (list, tuple)) for x in obj[0:2])):
):
return ('[' + obj[0].__repr__() + ', ' + obj[1].__repr__() + ',\n' return ('[' + obj[0].__repr__() + ', ' + obj[1].__repr__() + ',\n'
+ indent_lines(_pp(obj[2], indent), indent) + '\n' + indent_lines(_pp(obj[2], indent), indent) + '\n'
+ ']') + ']')
@ -464,6 +487,7 @@ def _pp(obj, indent): # not my best work
else: else:
return obj.__repr__() return obj.__repr__()
def pp(obj, indent=1): def pp(obj, indent=1):
'''Pretty prints the recursive pb structure''' '''Pretty prints the recursive pb structure'''
print(_pp(obj, indent)) print(_pp(obj, indent))
@ -484,5 +508,3 @@ mobile_headers = (
('X-YouTube-Client-Name', '2'), ('X-YouTube-Client-Name', '2'),
('X-YouTube-Client-Version', '2.20180830'), ('X-YouTube-Client-Version', '2.20180830'),
) + (('User-Agent', mobile_user_agent),) ) + (('User-Agent', mobile_user_agent),)