From e38edc45facdf8b20777ea74005f549c838f881c Mon Sep 17 00:00:00 2001 From: Jake Walker Date: Sat, 8 Feb 2025 16:58:25 +0000 Subject: [PATCH] Add rekordbox tracklist parsing Co-authored-by: James Walker --- src/episode_file.py | 97 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 83 insertions(+), 14 deletions(-) diff --git a/src/episode_file.py b/src/episode_file.py index 2c0825d..eb3f1be 100644 --- a/src/episode_file.py +++ b/src/episode_file.py @@ -1,11 +1,44 @@ +import re import xml.etree.ElementTree as ET from datetime import timedelta -from typing import Callable, List, Optional, Tuple +from typing import Callable, Final, List, Optional, Tuple, TypedDict from fastapi import HTTPException, UploadFile from models import PodcastEpisode +TRACK_LIST_HEADING: Final[str] = "**Track list**" + + +class TrackListItem(TypedDict): + title: Optional[str] + artist: Optional[str] + timestamp: Optional[timedelta] + + +def update_episode_tracklist( + episode: PodcastEpisode, track_list: List[TrackListItem] +) -> Optional[PodcastEpisode]: + if len(track_list) == 0: + return None + + description = ( + episode.description.split(TRACK_LIST_HEADING)[0].strip() + if episode.description is not None + else "" + ) + + description += f"\n\n{TRACK_LIST_HEADING}\n\n" + + sorted_tracks = sorted(track_list, key=lambda x: x["timestamp"].total_seconds()) + + for i, track in enumerate(sorted_tracks): + description += f"{i + 1}. {track.get('title', 'ID')} _- {track.get('artist', 'ID')} [{str(track.get('timestamp', timedelta(seconds=0)))}]_\n" + + episode.description = description.strip() + + return episode + async def djuced_track_list( episode: PodcastEpisode, file: UploadFile @@ -16,7 +49,7 @@ async def djuced_track_list( if root.tag != "recordEvents": return None - tracks = [] + tracks: List[TrackListItem] = [] for track in root.iter("track"): title = track.get("song") @@ -29,21 +62,54 @@ async def djuced_track_list( if len(title_segments) == 2: artist, title = title_segments - for interval in intervals: - tracks.append((float(interval.get("start")), title, artist)) + if len(intervals) > 0: + tracks.append( + { + "title": title, + "artist": artist, + "timestamp": timedelta(seconds=float(intervals[0].get("start"))), + } + ) - # sort by start time - tracks = sorted(tracks, key=lambda x: x[0], reverse=False) + return update_episode_tracklist(episode, tracks) - # update description - track_list_str = "" - for i, (t, title, artist) in enumerate(tracks): - time = timedelta(seconds=round(t)) - track_list_str += f"{i + 1}. {title} _- {artist} [{time}]_\n" +async def rekordbox_track_list( + episode: PodcastEpisode, file: UploadFile +) -> Optional[PodcastEpisode]: + if not file.filename.endswith(".cue"): + return None - episode.description += "\n\n**Track list**\n\n" + track_list_str - return episode + tracks: List[TrackListItem] = [] + current_track: TrackListItem = {} + + content = (await file.read()).decode("utf-8") + + for line in content.splitlines(): + line = line.strip() + if line.startswith("TITLE"): + title = re.search(r'"(.*?)"', line).group(1) + title = re.sub( + r"\s*\((Clean Extended|Clean|Extended)\)", "", title + ) # Remove specific suffixes + current_track["title"] = title + elif line.startswith("PERFORMER"): + current_track["artist"] = re.search(r'"(.*?)"', line).group(1) + elif line.startswith("INDEX 01"): + time_match = re.search(r"INDEX 01 (\d{2}):(\d{2}):(\d{2})", line) + if time_match: + hours = int(time_match.group(1)) + minutes = int(time_match.group(2)) + seconds = int(time_match.group(3)) + current_track["timestamp"] = timedelta( + hours=hours, minutes=minutes, seconds=seconds + ) + tracks.append( + current_track.copy() + ) # Ensure current track is added properly + current_track = {} + + return update_episode_tracklist(episode, tracks) # list of file processors @@ -51,7 +117,10 @@ async def djuced_track_list( # the second tuple item is the function to run which should return none if the file was not able to be processed, otherwise a mutated episode object processors: List[ Tuple[str, Callable[[PodcastEpisode, UploadFile], Optional[PodcastEpisode]]] -] = [("text/xml", djuced_track_list)] +] = [ + ("text/xml", djuced_track_list), + ("application/octet-stream", rekordbox_track_list), +] async def process_additional_episode_upload(