Overhaul refresh system, make it asynchronous

parent f5c76462d7
commit 27ee2990e9
@@ -26,7 +26,7 @@ database_path = os.path.join(settings.data_dir, "subscriptions.sqlite")
 def open_database():
     if not os.path.exists(settings.data_dir):
         os.makedirs(settings.data_dir)
-    connection = sqlite3.connect(database_path)
+    connection = sqlite3.connect(database_path, check_same_thread=False)
 
     # Create tables if they don't exist
     try:
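Note on the change above: refresh work now runs in spawned greenlets rather than in the request handler, so a connection returned by open_database() may be created and used in different execution contexts; by default sqlite3 raises ProgrammingError when a connection is touched from a thread other than the one that created it. check_same_thread=False only disables that ownership check; it does not make a connection safe for concurrent use. A minimal sketch of serializing writes, assuming a single shared connection (db_lock and record_check_time are illustrative names, not part of this commit):

    import sqlite3
    import gevent.lock

    connection = sqlite3.connect('subscriptions.sqlite', check_same_thread=False)
    db_lock = gevent.lock.BoundedSemaphore(1)  # hypothetical guard, not in this commit

    def record_check_time(yt_channel_id, timestamp):
        # Assumes the subscribed_channels table from this schema already exists.
        with db_lock:  # one greenlet at a time touches the connection
            connection.execute('UPDATE subscribed_channels SET time_last_checked = ? WHERE yt_channel_id = ?',
                               (timestamp, yt_channel_id))
            connection.commit()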
@@ -172,17 +172,75 @@ def youtube_timestamp_to_posix(dumb_timestamp):
     unit = unit[:-1] # remove s from end
     return now - number*units[unit]
 
 
+try:
+    existing_thumbnails = set(os.path.splitext(name)[0] for name in os.listdir(thumbnails_directory))
+except FileNotFoundError:
+    existing_thumbnails = set()
+
+
+thumbnails_queue = util.RateLimitedQueue()
+check_channels_queue = util.RateLimitedQueue()
+
+
 # Use this to mark a thumbnail acceptable to be retrieved at the request of the browser
 # can't simply check if it's in the queue because items are removed when the download starts, not when it finishes
 downloading_thumbnails = set()
-def download_thumbnails(thumbnails_directory, thumbnails):
-    try:
-        g = gevent.spawn(util.download_thumbnails, thumbnails_directory, thumbnails)
-        g.join()
-    finally:
-        downloading_thumbnails.difference_update(thumbnails)
+
+checking_channels = set()
+
+# Just to use for printing channel checking status to console without opening database
+channel_names = dict()
+
+def download_thumbnail_worker():
+    while True:
+        video_id = thumbnails_queue.get()
+        try:
+            success = util.download_thumbnail(thumbnails_directory, video_id)
+            if success:
+                existing_thumbnails.add(video_id)
+        except Exception:
+            traceback.print_exc()
+        finally:
+            downloading_thumbnails.remove(video_id)
+
+def check_channel_worker():
+    while True:
+        channel_id = check_channels_queue.get()
+        try:
+            _get_upstream_videos(channel_id)
+        finally:
+            checking_channels.remove(channel_id)
+
+for i in range(0,5):
+    gevent.spawn(download_thumbnail_worker)
+    gevent.spawn(check_channel_worker)
+
+
+
+
+
+
+
+def download_thumbnails_if_necessary(thumbnails):
+    for video_id in thumbnails:
+        if video_id not in existing_thumbnails and video_id not in downloading_thumbnails:
+            downloading_thumbnails.add(video_id)
+            thumbnails_queue.put(video_id)
+
+def check_channels_if_necessary(channel_ids):
+    for channel_id in channel_ids:
+        if channel_id not in checking_channels:
+            checking_channels.add(channel_id)
+            check_channels_queue.put(channel_id)
+
+
+
 def _get_upstream_videos(channel_id):
+    try:
+        print("Checking channel: " + channel_names[channel_id])
+    except KeyError:
+        print("Checking channel " + channel_id)
+
     videos = []
 
     json_channel_videos = channel.get_grid_items(channel.get_channel_tab(channel_id)[1]['response'])
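This hunk is the heart of the overhaul: two shared RateLimitedQueue instances feed five thumbnail-downloading and five channel-checking greenlets, while the downloading_thumbnails and checking_channels sets track in-flight work, since (as the comment above notes) an item leaves the queue when processing starts, not when it finishes. The same enqueue-once pattern in a self-contained form, with generic names standing in for the commit's:

    import gevent
    import gevent.queue

    queue = gevent.queue.Queue()
    in_flight = set()  # items queued or currently being worked on

    def enqueue_once(item):
        if item in in_flight:
            return           # duplicate: already queued or being processed
        in_flight.add(item)
        queue.put(item)

    def worker():
        while True:
            item = queue.get()       # blocks while the queue is empty
            try:
                print('processing', item)
            finally:
                in_flight.discard(item)  # only now may the item be enqueued again

    for _ in range(5):
        gevent.spawn(worker)

    for item in ('a', 'b', 'a'):     # the second 'a' is silently dropped
        enqueue_once(item)
    gevent.sleep(1)                  # give the workers a chance to run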
@@ -190,24 +248,57 @@ def _get_upstream_videos(channel_id):
         info = yt_data_extract.renderer_info(json_video['gridVideoRenderer'])
         if 'description' not in info:
             info['description'] = ''
-        info['time_published'] = youtube_timestamp_to_posix(info['published']) - i # subtract a few seconds off the videos so they will be in the right order
-        videos.append(info)
+        try:
+            info['time_published'] = youtube_timestamp_to_posix(info['published']) - i # subtract a few seconds off the videos so they will be in the right order
+        except KeyError:
+            print(info)
+        videos.append((channel_id, info['id'], info['title'], info['duration'], info['time_published'], info['description']))
 
-    try:
-        existing_thumbnails = set(os.path.splitext(name)[0] for name in os.listdir(thumbnails_directory))
-    except FileNotFoundError:
-        existing_thumbnails = set()
-    missing_thumbnails = set(video['id'] for video in videos) - existing_thumbnails
-    downloading_thumbnails.update(missing_thumbnails)
-    gevent.spawn(download_thumbnails, thumbnails_directory, missing_thumbnails)
+    now = time.time()
+    download_thumbnails_if_necessary(video[1] for video in videos if (now - video[4]) < 30*24*3600) # Don't download thumbnails from videos older than a month
 
-    return videos
+    with open_database() as connection:
+        with connection as cursor:
+            cursor.executemany('''INSERT OR IGNORE INTO videos (sql_channel_id, video_id, title, duration, time_published, description)
+                                  VALUES ((SELECT id FROM subscribed_channels WHERE yt_channel_id=?), ?, ?, ?, ?, ?)''', videos)
+            cursor.execute('''UPDATE subscribed_channels
+                              SET time_last_checked = ?
+                              WHERE yt_channel_id=?''', [int(time.time()), channel_id])
 
 
+def check_all_channels():
+    with open_database() as connection:
+        with connection as cursor:
+            channel_id_name_list = cursor.execute('''SELECT yt_channel_id, channel_name FROM subscribed_channels''').fetchall()
+
+    channel_names.update(channel_id_name_list)
+    check_channels_if_necessary([item[0] for item in channel_id_name_list])
 
 
+def check_tags(tags):
+    channel_id_name_list = []
+    with open_database() as connection:
+        with connection as cursor:
+            for tag in tags:
+                channel_id_name_list += cursor.execute('''SELECT yt_channel_id, channel_name
+                                                          FROM subscribed_channels
+                                                          WHERE subscribed_channels.id IN (
+                                                              SELECT tag_associations.sql_channel_id FROM tag_associations WHERE tag=?
+                                                          )''', [tag]).fetchall()
+    channel_names.update(channel_id_name_list)
+    check_channels_if_necessary([item[0] for item in channel_id_name_list])
 
 
+def check_specific_channels(channel_ids):
+    channel_id_name_list = []
+    with open_database() as connection:
+        with connection as cursor:
+            for channel_id in channel_ids:
+                channel_id_name_list += cursor.execute('''SELECT yt_channel_id, channel_name
+                                                          FROM subscribed_channels
+                                                          WHERE yt_channel_id=?''', [channel_id]).fetchall()
+    channel_names.update(channel_id_name_list)
+    check_channels_if_necessary(channel_ids)
 
 
 
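_get_upstream_videos now persists its own results instead of returning them. The insert resolves the external yt_channel_id to the internal row id with an inline subselect, and OR IGNORE makes re-checking a channel idempotent, provided the videos table carries a uniqueness constraint to conflict on. A toy demonstration of that statement shape against a reduced schema (the real tables have more columns):

    import sqlite3

    conn = sqlite3.connect(':memory:')
    conn.executescript('''
        CREATE TABLE subscribed_channels (id INTEGER PRIMARY KEY, yt_channel_id TEXT UNIQUE);
        CREATE TABLE videos (sql_channel_id INTEGER, video_id TEXT UNIQUE, title TEXT);
    ''')
    conn.execute("INSERT INTO subscribed_channels (yt_channel_id) VALUES ('UC_example')")

    rows = [('UC_example', 'vid1', 'first sighting'),
            ('UC_example', 'vid1', 'second check, silently ignored')]
    conn.executemany('''INSERT OR IGNORE INTO videos (sql_channel_id, video_id, title)
                        VALUES ((SELECT id FROM subscribed_channels WHERE yt_channel_id=?), ?, ?)''', rows)
    print(conn.execute('SELECT * FROM videos').fetchall())  # [(1, 'vid1', 'first sighting')]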
@@ -408,15 +499,18 @@ def post_subscriptions_page(env, start_response):
         _unsubscribe(params['channel_id'])
 
     elif action == 'refresh':
-        with open_database() as connection:
-            with connection as cursor:
-                for sql_channel_id, yt_channel_id in cursor.execute('''SELECT id, yt_channel_id FROM subscribed_channels''').fetchall():
-                    db_videos = ( (sql_channel_id, info['id'], info['title'], info['duration'], info['time_published'], info['description']) for info in _get_upstream_videos(yt_channel_id) )
-                    cursor.executemany('''INSERT OR IGNORE INTO videos (sql_channel_id, video_id, title, duration, time_published, description) VALUES (?, ?, ?, ?, ?, ?)''', db_videos)
+        type = params['type'][0]
+        if type == 'all':
+            check_all_channels()
+        elif type == 'tag':
+            check_tags(params['tag_name'])
+        elif type == 'channel':
+            check_specific_channels(params['channel_id'])
+        else:
+            start_response('400 Bad Request', ())
+            return b'400 Bad Request'
 
-                cursor.execute('''UPDATE subscribed_channels SET time_last_checked = ?''', ( int(time.time()), ) )
-
-        start_response('303 See Other', [('Location', util.URL_ORIGIN + '/subscriptions'),] )
+        start_response('204 No Content', ())
         return b''
     else:
         start_response('400 Bad Request', ())
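Behavioral consequence: the refresh POST now only enqueues work and returns immediately with 204 No Content, where it previously blocked until every channel was fetched and then answered with a 303 redirect. A hedged client-side sketch; the origin, port, and path here are assumptions about a local deployment (the real app builds paths from util.URL_ORIGIN):

    import urllib.parse
    import urllib.request

    data = urllib.parse.urlencode({'action': 'refresh', 'type': 'all'}).encode()
    response = urllib.request.urlopen('http://localhost:8080/subscriptions', data=data)
    print(response.status)  # expect 204: the check was queued, not performed inline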
@@ -7,6 +7,8 @@ import re
 import time
 import os
 import gevent
+import gevent.queue
+import gevent.lock
 
 # The trouble with the requests library: It ships its own certificate bundle via certifi
 # instead of using the system certificate store, meaning self-signed certificates
@@ -176,6 +178,53 @@ desktop_ua = (('User-Agent', desktop_user_agent),)
 
 
 
+class RateLimitedQueue(gevent.queue.Queue):
+    ''' Does initial_burst (def. 30) at first, then alternates between waiting waiting_period (def. 5) seconds and doing subsequent_bursts (def. 10) queries. After 5 seconds with nothing left in the queue, resets rate limiting. '''
+
+    def __init__(self, initial_burst=30, waiting_period=5, subsequent_bursts=10):
+        self.initial_burst = initial_burst
+        self.waiting_period = waiting_period
+        self.subsequent_bursts = subsequent_bursts
+
+        self.count_since_last_wait = 0
+        self.surpassed_initial = False
+
+        self.lock = gevent.lock.BoundedSemaphore(1)
+        self.currently_empty = False
+        self.empty_start = 0
+        gevent.queue.Queue.__init__(self)
+
+
+    def get(self):
+        self.lock.acquire() # blocks if another greenlet currently has the lock
+        if self.count_since_last_wait >= self.subsequent_bursts and self.surpassed_initial:
+            gevent.sleep(self.waiting_period)
+            self.count_since_last_wait = 0
+
+        elif self.count_since_last_wait >= self.initial_burst and not self.surpassed_initial:
+            self.surpassed_initial = True
+            gevent.sleep(self.waiting_period)
+            self.count_since_last_wait = 0
+
+        self.count_since_last_wait += 1
+
+        if not self.currently_empty and self.empty():
+            self.currently_empty = True
+            self.empty_start = time.monotonic()
+
+        item = gevent.queue.Queue.get(self) # blocks when nothing left
+
+        if self.currently_empty:
+            if time.monotonic() - self.empty_start >= self.waiting_period:
+                self.count_since_last_wait = 0
+                self.surpassed_initial = False
+
+            self.currently_empty = False
+
+        self.lock.release()
+
+        return item
+
 
 
 def download_thumbnail(save_directory, video_id):
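RateLimitedQueue layers the rate limiting entirely inside get(), so producers are never delayed, only the consumers that drain the queue. A small demonstration with shrunk parameters so the bursts are visible within a few seconds (importing the class as util.RateLimitedQueue assumes this file is importable under the name util):

    import time
    import gevent
    from util import RateLimitedQueue

    q = RateLimitedQueue(initial_burst=3, waiting_period=1, subsequent_bursts=2)
    for i in range(8):
        q.put(i)

    def consumer():
        start = time.monotonic()
        for _ in range(8):
            item = q.get()
            print('%.1fs: got %d' % (time.monotonic() - start, item))
        # expected: 0, 1, 2 at once, then bursts of two (3,4 and 5,6) and
        # finally 7, each burst preceded by roughly a one-second pause

    gevent.spawn(consumer).join()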
@@ -185,14 +234,15 @@ def download_thumbnail(save_directory, video_id):
         thumbnail = fetch_url(url, report_text="Saved thumbnail: " + video_id)
     except urllib.error.HTTPError as e:
         print("Failed to download thumbnail for " + video_id + ": " + str(e))
-        return
+        return False
     try:
         f = open(save_location, 'wb')
     except FileNotFoundError:
-        os.makedirs(save_directory)
+        os.makedirs(save_directory, exist_ok = True)
         f = open(save_location, 'wb')
     f.write(thumbnail)
     f.close()
+    return True
 
 def download_thumbnails(save_directory, ids):
     if not isinstance(ids, (list, tuple)):
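Two small hardening changes here: download_thumbnail now reports success as a boolean, which the subscription workers use to update existing_thumbnails, and exist_ok=True stops makedirs from raising if something else (another worker, an external process) created the directory first. The same idea in isolation, as a generic sketch rather than the commit's code:

    import os

    def save_bytes(directory, filename, data):
        # Idempotent: safe even if the directory already exists or appears
        # between the failed open attempt and this call.
        os.makedirs(directory, exist_ok=True)
        with open(os.path.join(directory, filename), 'wb') as f:
            f.write(data)
        return True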
@@ -56,6 +56,7 @@ $items
         <form method="POST" class="refresh-all">
             <input type="submit" value="Check All">
             <input type="hidden" name="action" value="refresh">
+            <input type="hidden" name="type" value="all">
         </form>
     </div>
 