Subscriptions auto-checking system

This commit is contained in:
James Taylor 2019-08-14 00:12:24 -07:00
parent 38792081d4
commit 31a04555ae
2 changed files with 94 additions and 4 deletions

View File

@ -66,6 +66,12 @@ For security reasons, enabling this is not recommended.''',
1 to sort by newest''',
}),
('autocheck_subscriptions', {
'type': bool,
'default': 0,
'comment': '',
}),
('gather_googlevideo_domains', {
'type': bool,
'default': False,

View File

@ -12,6 +12,7 @@ import contextlib
import defusedxml.ElementTree
import urllib
import math
import secrets
import flask
from flask import request
@ -37,8 +38,8 @@ def open_database():
yt_channel_id text UNIQUE NOT NULL,
channel_name text NOT NULL,
time_last_checked integer,
muted integer DEFAULT 0,
upload_frequency integer
next_check_time integer,
muted integer DEFAULT 0
)''')
cursor.execute('''CREATE TABLE IF NOT EXISTS videos (
id integer PRIMARY KEY,
@ -227,6 +228,11 @@ def _channels_with_tag(cursor, tag, order=False, exclude_muted=False, include_mu
return cursor.execute(statement, [tag]).fetchall()
def _schedule_checking(cursor, channel_id, next_check_time):
cursor.execute('''UPDATE subscribed_channels SET next_check_time = ? WHERE yt_channel_id = ?''', [int(next_check_time), channel_id])
def _is_muted(cursor, channel_id):
return bool(cursor.execute('''SELECT muted FROM subscribed_channels WHERE yt_channel_id=?''', [channel_id]).fetchone()[0])
units = {
'year': 31536000, # 365*24*3600
@ -257,6 +263,9 @@ except FileNotFoundError:
existing_thumbnails = set()
# --- Manual checking system. Rate limited in order to support very large numbers of channels to be checked ---
# Auto checking system plugs into this for convenience, though it doesn't really need the rate limiting
check_channels_queue = util.RateLimitedQueue()
checking_channels = set()
@ -273,9 +282,66 @@ def check_channel_worker():
for i in range(0,5):
gevent.spawn(check_channel_worker)
# ----------------------------
# --- Auto checking system ---
if settings.autocheck_subscriptions:
# job application format: dict with keys (channel_id, channel_name, next_check_time)
autocheck_job_application = gevent.queue.Queue() # only really meant to hold 1 item, just reusing gevent's wait and timeout machinery
autocheck_jobs = [] # list of dicts with the keys (channel_id, channel_name, next_check_time). Stores all the channels that need to be autochecked and when to check them
with open_database() as connection:
with connection as cursor:
now = time.time()
for row in cursor.execute('''SELECT yt_channel_id, channel_name, next_check_time FROM subscribed_channels WHERE next_check_time IS NOT NULL AND muted != 1''').fetchall():
if row[2] < now: # expired, check randomly within the 30 minutes
next_check_time = now + 3600*secrets.randbelow(60)/60
row = (row[0], row[1], next_check_time)
_schedule_checking(cursor, row[0], next_check_time)
autocheck_jobs.append({'channel_id': row[0], 'channel_name': row[1], 'next_check_time': row[2]})
def autocheck_dispatcher():
'''Scans the auto_check_list. Sleeps until the earliest job is due, then adds that channel to the checking queue above. Can be sent a new job through autocheck_job_application'''
while True:
if len(autocheck_jobs) == 0:
new_job = autocheck_job_application.get()
autocheck_jobs.append(new_job)
else:
earliest_job_index = min(range(0, len(autocheck_jobs)), key=lambda index: autocheck_jobs[index]['next_check_time']) # https://stackoverflow.com/a/11825864
earliest_job = autocheck_jobs[earliest_job_index]
time_until_earliest_job = earliest_job['next_check_time'] - time.time()
if time_until_earliest_job <= 0:
print('ERROR: autocheck_dispatcher got job scheduled in the past, skipping and rescheduling: ' + earliest_job['channel_id'] + ', ' + earliest_job['channel_name'] + ', ' + str(earliest_job['next_check_time']))
next_check_time = time.time() + 3600*secrets.randbelow(60)/60
with_open_db(_schedule_checking, earliest_job['channel_id'], next_check_time)
autocheck_jobs[earliest_job_index]['next_check_time'] = next_check_time
continue
# make sure it's not muted
if with_open_db(_is_muted, earliest_job['channel_id']):
del autocheck_jobs[earliest_job_index]
continue
try:
new_job = autocheck_job_application.get(timeout = time_until_earliest_job) # sleep for time_until_earliest_job time, but allow to be interrupted by new jobs
except gevent.queue.Empty: # no new jobs, time to execute the earliest job
channel_names[earliest_job['channel_id']] = earliest_job['channel_name']
checking_channels.add(earliest_job['channel_id'])
check_channels_queue.put(earliest_job['channel_id'])
del autocheck_jobs[earliest_job_index]
else: # new job, add it to the list
autocheck_jobs.append(new_job)
gevent.spawn(autocheck_dispatcher)
# ----------------------------
def check_channels_if_necessary(channel_ids):
@ -305,13 +371,31 @@ def _get_upstream_videos(channel_id):
videos.append((channel_id, video_item['id'], video_item['title'], video_item['duration'], video_item['time_published'], video_item['description']))
if len(videos) == 0:
average_upload_period = 4*7*24*3600 # assume 1 month for channel with no videos
elif len(videos) < 5:
average_upload_period = int((time.time() - videos[len(videos)-1][4])/len(videos))
else:
average_upload_period = int((time.time() - videos[4][4])/5) # equivalent to averaging the time between videos for the last 5 videos
# calculate when to check next for auto checking
# add some quantization and randomness to make pattern analysis by Youtube slightly harder
quantized_upload_period = average_upload_period - (average_upload_period % (4*3600)) + 4*3600 # round up to nearest 4 hours
randomized_upload_period = quantized_upload_period*(1 + secrets.randbelow(50)/50*0.5) # randomly between 1x and 1.5x
next_check_delay = randomized_upload_period/5 # check at 5x the channel posting rate. might want to fine tune this number
next_check_time = int(time.time() + next_check_delay)
with open_database() as connection:
with connection as cursor:
cursor.executemany('''INSERT OR IGNORE INTO videos (sql_channel_id, video_id, title, duration, time_published, description)
VALUES ((SELECT id FROM subscribed_channels WHERE yt_channel_id=?), ?, ?, ?, ?, ?)''', videos)
cursor.execute('''UPDATE subscribed_channels
SET time_last_checked = ?
WHERE yt_channel_id=?''', [int(time.time()), channel_id])
SET time_last_checked = ?, next_check_time = ?
WHERE yt_channel_id=?''', [int(time.time()), next_check_time, channel_id])
if settings.autocheck_subscriptions:
if not _is_muted(cursor, channel_id):
autocheck_job_application.put({'channel_id': channel_id, 'channel_name': channel_names[channel_id], 'next_check_time': next_check_time})
def check_all_channels():