
Comment reply protobuf now requires the channel id of the uploader of the video. Otherwise the endpoint returns 500. Instead of making the protobuf ourselves and passing this data around through query parameters, just use the ctoken provided to us but modify the max_replies field from 10 to 250. Fixes #53 Signed-off-by: Jesús <heckyel@hyperbola.info>
211 lines
6.3 KiB
Python
211 lines
6.3 KiB
Python
from math import ceil
|
|
import base64
|
|
import io
|
|
import traceback
|
|
|
|
|
|
def byte(n):
|
|
return bytes((n,))
|
|
|
|
|
|
def varint_encode(offset):
|
|
'''In this encoding system, for each 8-bit byte, the first bit is 1 if there are more bytes, and 0 is this is the last one.
|
|
The next 7 bits are data. These 7-bit sections represent the data in Little endian order. For example, suppose the data is
|
|
aaaaaaabbbbbbbccccccc (each of these sections is 7 bits). It will be encoded as:
|
|
1ccccccc 1bbbbbbb 0aaaaaaa
|
|
|
|
This encoding is used in youtube parameters to encode offsets and to encode the length for length-prefixed data.
|
|
See https://developers.google.com/protocol-buffers/docs/encoding#varints for more info.'''
|
|
needed_bytes = ceil(offset.bit_length()/7) or 1 # (0).bit_length() returns 0, but we need 1 in that case.
|
|
encoded_bytes = bytearray(needed_bytes)
|
|
for i in range(0, needed_bytes - 1):
|
|
encoded_bytes[i] = (offset & 127) | 128 # 7 least significant bits
|
|
offset = offset >> 7
|
|
encoded_bytes[-1] = offset & 127 # leave first bit as zero for last byte
|
|
|
|
return bytes(encoded_bytes)
|
|
|
|
|
|
def varint_decode(encoded):
|
|
decoded = 0
|
|
for i, byte in enumerate(encoded):
|
|
decoded |= (byte & 127) << 7*i
|
|
|
|
if not (byte & 128):
|
|
break
|
|
return decoded
|
|
|
|
|
|
def string(field_number, data):
|
|
data = as_bytes(data)
|
|
return _proto_field(2, field_number, varint_encode(len(data)) + data)
|
|
|
|
|
|
nested = string
|
|
|
|
|
|
def uint(field_number, value):
|
|
return _proto_field(0, field_number, varint_encode(value))
|
|
|
|
|
|
def _proto_field(wire_type, field_number, data):
|
|
''' See https://developers.google.com/protocol-buffers/docs/encoding#structure '''
|
|
return varint_encode((field_number << 3) | wire_type) + data
|
|
|
|
|
|
def percent_b64encode(data):
|
|
return base64.urlsafe_b64encode(data).replace(b'=', b'%3D')
|
|
|
|
|
|
def unpadded_b64encode(data):
|
|
return base64.urlsafe_b64encode(data).replace(b'=', b'')
|
|
|
|
|
|
def as_bytes(value):
|
|
if isinstance(value, str):
|
|
return value.encode('utf-8')
|
|
return value
|
|
|
|
|
|
def read_varint(data):
|
|
result = 0
|
|
i = 0
|
|
while True:
|
|
try:
|
|
byte = data.read(1)[0]
|
|
except IndexError:
|
|
if i == 0:
|
|
raise EOFError()
|
|
raise Exception('Unterminated varint starting at ' + str(data.tell() - i))
|
|
result |= (byte & 127) << 7*i
|
|
if not byte & 128:
|
|
break
|
|
|
|
i += 1
|
|
return result
|
|
|
|
|
|
def read_group(data, end_sequence):
|
|
start = data.tell()
|
|
index = data.original.find(end_sequence, start)
|
|
if index == -1:
|
|
raise Exception('Unterminated group')
|
|
data.seek(index + len(end_sequence))
|
|
return data.original[start:index]
|
|
|
|
def read_protobuf(data):
|
|
data_original = data
|
|
data = io.BytesIO(data)
|
|
data.original = data_original
|
|
while True:
|
|
try:
|
|
tag = read_varint(data)
|
|
except EOFError:
|
|
break
|
|
wire_type = tag & 7
|
|
field_number = tag >> 3
|
|
|
|
if wire_type == 0:
|
|
value = read_varint(data)
|
|
elif wire_type == 1:
|
|
value = data.read(8)
|
|
elif wire_type == 2:
|
|
length = read_varint(data)
|
|
value = data.read(length)
|
|
elif wire_type == 3:
|
|
end_bytes = encode_varint((field_number << 3) | 4)
|
|
value = read_group(data, end_bytes)
|
|
elif wire_type == 5:
|
|
value = data.read(4)
|
|
else:
|
|
raise Exception("Unknown wire type: " + str(wire_type) + ", Tag: " + bytes_to_hex(succinct_encode(tag)) + ", at position " + str(data.tell()))
|
|
yield (wire_type, field_number, value)
|
|
|
|
|
|
def parse(data, include_wire_type=False):
|
|
'''Returns a dict mapping field numbers to values
|
|
|
|
data is the protobuf structure, which must not be b64-encoded'''
|
|
if include_wire_type:
|
|
return {field_number: [wire_type, value]
|
|
for wire_type, field_number, value in read_protobuf(data)}
|
|
return {field_number: value
|
|
for _, field_number, value in read_protobuf(data)}
|
|
|
|
|
|
base64_enc_funcs = {
|
|
'base64': base64.urlsafe_b64encode,
|
|
'base64s': unpadded_b64encode,
|
|
'base64p': percent_b64encode,
|
|
}
|
|
|
|
|
|
def _make_protobuf(data):
|
|
# must be dict mapping field_number to [wire_type, value]
|
|
if isinstance(data, dict):
|
|
new_data = []
|
|
for field_num, (wire_type, value) in sorted(data.items()):
|
|
new_data.append((wire_type, field_num, value))
|
|
data = new_data
|
|
if isinstance(data, str):
|
|
return data.encode('utf-8')
|
|
elif len(data) == 2 and data[0] in base64_enc_funcs:
|
|
return base64_enc_funcs[data[0]](make_proto(data[1]))
|
|
elif isinstance(data, list):
|
|
result = b''
|
|
for field in data:
|
|
if field[0] == 0:
|
|
result += uint(field[1], field[2])
|
|
elif field[0] == 2:
|
|
result += string(field[1], _make_protobuf(field[2]))
|
|
else:
|
|
raise NotImplementedError('Wire type ' + str(field[0])
|
|
+ ' not implemented')
|
|
return result
|
|
return data
|
|
|
|
|
|
def make_protobuf(data):
|
|
return _make_protobuf(data).decode('ascii')
|
|
|
|
|
|
def _set_protobuf_value(data, *path, value):
|
|
if not path:
|
|
return value
|
|
op = path[0]
|
|
if op in base64_enc_funcs:
|
|
inner_data = b64_to_bytes(data)
|
|
return base64_enc_funcs[op](
|
|
_set_protobuf_value(inner_data, *path[1:], value=value)
|
|
)
|
|
pb_dict = parse(data, include_wire_type=True)
|
|
pb_dict[op][1] = _set_protobuf_value(
|
|
pb_dict[op][1], *path[1:], value=value
|
|
)
|
|
return _make_protobuf(pb_dict)
|
|
|
|
|
|
def set_protobuf_value(data, *path, value):
|
|
'''Set a field's value in a raw protobuf structure
|
|
|
|
path is a list of field numbers and/or base64 encoding directives
|
|
|
|
The directives are
|
|
base64: normal base64 encoding with equal signs padding
|
|
base64s ("stripped"): no padding
|
|
base64p: %3D instead of = for padding
|
|
|
|
return new_protobuf, err'''
|
|
try:
|
|
new_protobuf = _set_protobuf_value(data, *path, value=value)
|
|
return new_protobuf.decode('ascii'), None
|
|
except Exception:
|
|
return None, traceback.format_exc()
|
|
|
|
|
|
def b64_to_bytes(data):
|
|
if isinstance(data, bytes):
|
|
data = data.decode('ascii')
|
|
data = data.replace("%3D", "=")
|
|
return base64.urlsafe_b64decode(data + "="*((4 - len(data) % 4) % 4))
|