fix(playlists): make playlist parsing robust against filename and formatting issues
All checks were successful
CI / test (push) Successful in 53s
- Use glob lookup to find playlist files even with trailing spaces in filenames
- Sanitize lines (strip whitespace) before JSON parsing to ignore trailing spaces/empty lines
- Handle JSONDecodeError gracefully to prevent 500 errors from corrupt entries
- Return empty list on FileNotFoundError in read_playlist instead of crashing
- Extract _find_playlist_path and _parse_playlist_lines helpers for reuse
This commit is contained in:
@@ -8,6 +8,7 @@ import html
|
|||||||
import gevent
|
import gevent
|
||||||
import urllib
|
import urllib
|
||||||
import math
|
import math
|
||||||
|
import glob
|
||||||
|
|
||||||
import flask
|
import flask
|
||||||
from flask import request
|
from flask import request
|
||||||
@@ -16,11 +17,34 @@ playlists_directory = os.path.join(settings.data_dir, "playlists")
|
|||||||
thumbnails_directory = os.path.join(settings.data_dir, "playlist_thumbnails")
|
thumbnails_directory = os.path.join(settings.data_dir, "playlist_thumbnails")
|
||||||
|
|
||||||
|
|
||||||
|
def _find_playlist_path(name):
    """Find a playlist file robustly, tolerating trailing spaces in filenames.

    Looks for an exact ``<name>.txt`` first; only when that is missing does it
    fall back to a glob for files whose stem strips down to ``name`` (i.e. the
    filename was saved with trailing whitespace). Returns the exact path even
    when no file exists so callers can create it.
    """
    name = name.strip()
    exact_path = os.path.join(playlists_directory, name + ".txt")
    if os.path.exists(exact_path):
        return exact_path
    # glob.escape so names containing *, ?, [ don't act as wildcards.
    pattern = os.path.join(playlists_directory, glob.escape(name) + "*.txt")
    for candidate in glob.glob(pattern):
        stem = os.path.basename(candidate)[:-len(".txt")]
        # Only accept whitespace variants of the same name; a bare prefix
        # glob would wrongly match e.g. "rock" against "rocket.txt".
        if stem.strip() == name:
            return candidate
    return exact_path
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_playlist_lines(data):
|
||||||
|
"""Parse playlist data lines robustly, skipping empty/malformed entries"""
|
||||||
|
videos = []
|
||||||
|
for line in data.splitlines():
|
||||||
|
clean_line = line.strip()
|
||||||
|
if not clean_line:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
videos.append(json.loads(clean_line))
|
||||||
|
except json.decoder.JSONDecodeError:
|
||||||
|
print('Corrupt playlist entry: ' + clean_line)
|
||||||
|
return videos
|
||||||
|
|
||||||
|
|
||||||
def video_ids_in_playlist(name):
    """Return the set of video ids stored in the named playlist.

    Returns an empty set when the playlist file does not exist. Corrupt
    lines are skipped via _parse_playlist_lines instead of raising
    JSONDecodeError here, keeping this consistent with read_playlist.
    """
    try:
        playlist_path = _find_playlist_path(name)
        with open(playlist_path, 'r', encoding='utf-8') as file:
            data = file.read()
    except FileNotFoundError:
        return set()
    # _parse_playlist_lines already strips whitespace and drops blank or
    # malformed lines, so a single corrupt entry can no longer 500 this path.
    return set(video['id'] for video in _parse_playlist_lines(data))
|
||||||
|
|
||||||
@@ -29,7 +53,8 @@ def add_to_playlist(name, video_info_list):
|
|||||||
os.makedirs(playlists_directory, exist_ok=True)
|
os.makedirs(playlists_directory, exist_ok=True)
|
||||||
ids = video_ids_in_playlist(name)
|
ids = video_ids_in_playlist(name)
|
||||||
missing_thumbnails = []
|
missing_thumbnails = []
|
||||||
with open(os.path.join(playlists_directory, name + ".txt"), "a", encoding='utf-8') as file:
|
playlist_path = _find_playlist_path(name)
|
||||||
|
with open(playlist_path, "a", encoding='utf-8') as file:
|
||||||
for info in video_info_list:
|
for info in video_info_list:
|
||||||
id = json.loads(info)['id']
|
id = json.loads(info)['id']
|
||||||
if id not in ids:
|
if id not in ids:
|
||||||
@@ -67,20 +92,14 @@ def add_extra_info_to_videos(videos, playlist_name):
|
|||||||
|
|
||||||
def read_playlist(name):
    '''Returns a list of videos for the given playlist name'''
    path = _find_playlist_path(name)
    try:
        with open(path, 'r', encoding='utf-8') as handle:
            contents = handle.read()
    except FileNotFoundError:
        # A playlist that was never created simply has no videos.
        return []
    return _parse_playlist_lines(contents)
|
||||||
videos_json = data.splitlines()
|
|
||||||
for video_json in videos_json:
|
|
||||||
try:
|
|
||||||
info = json.loads(video_json)
|
|
||||||
videos.append(info)
|
|
||||||
except json.decoder.JSONDecodeError:
|
|
||||||
if not video_json.strip() == '':
|
|
||||||
print('Corrupt playlist video entry: ' + video_json)
|
|
||||||
return videos
|
|
||||||
|
|
||||||
|
|
||||||
def get_local_playlist_videos(name, offset=0, amount=50):
|
def get_local_playlist_videos(name, offset=0, amount=50):
|
||||||
@@ -102,14 +121,21 @@ def get_playlist_names():
|
|||||||
|
|
||||||
def remove_from_playlist(name, video_info_list):
    """Remove the given videos from the named playlist and rewrite its file.

    video_info_list: iterable of JSON strings, each containing an 'id' key.
    A missing playlist file is a no-op (consistent with read_playlist and
    video_ids_in_playlist). Corrupt lines are logged before being dropped
    from the rewritten file, instead of vanishing silently.
    """
    ids = [json.loads(video)['id'] for video in video_info_list]
    playlist_path = _find_playlist_path(name)
    try:
        with open(playlist_path, 'r', encoding='utf-8') as file:
            data = file.read()
    except FileNotFoundError:
        # Nothing to remove from; don't crash the request with a 500.
        return
    kept = []
    for line in data.splitlines():
        entry = line.strip()
        if not entry:
            continue
        try:
            if json.loads(entry)['id'] not in ids:
                kept.append(entry)
        except json.decoder.JSONDecodeError:
            # The rewrite would otherwise destroy this line with no trace;
            # log it, matching _parse_playlist_lines' handling.
            print('Corrupt playlist entry: ' + entry)
    with open(playlist_path, 'w', encoding='utf-8') as file:
        file.write("\n".join(kept) + "\n")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|||||||
Reference in New Issue
Block a user