Add rekordbox tracklist parsing
All checks were successful
ci/woodpecker/push/build Pipeline was successful

Co-authored-by: James Walker <james@noreply.git.jakew.me>
This commit is contained in:
Jake Walker 2025-02-08 16:58:25 +00:00
parent 93bde35cbf
commit e38edc45fa
No known key found for this signature in database

View file

@ -1,11 +1,44 @@
import re
import xml.etree.ElementTree as ET
from datetime import timedelta
from typing import Callable, List, Optional, Tuple
from typing import Callable, Final, List, Optional, Tuple, TypedDict
from fastapi import HTTPException, UploadFile
from models import PodcastEpisode
TRACK_LIST_HEADING: Final[str] = "**Track list**"
class TrackListItem(TypedDict):
title: Optional[str]
artist: Optional[str]
timestamp: Optional[timedelta]
def update_episode_tracklist(
episode: PodcastEpisode, track_list: List[TrackListItem]
) -> Optional[PodcastEpisode]:
if len(track_list) == 0:
return None
description = (
episode.description.split(TRACK_LIST_HEADING)[0].strip()
if episode.description is not None
else ""
)
description += f"\n\n{TRACK_LIST_HEADING}\n\n"
sorted_tracks = sorted(track_list, key=lambda x: x["timestamp"].total_seconds())
for i, track in enumerate(sorted_tracks):
description += f"{i + 1}. {track.get('title', 'ID')} _- {track.get('artist', 'ID')} [{str(track.get('timestamp', timedelta(seconds=0)))}]_\n"
episode.description = description.strip()
return episode
async def djuced_track_list(
episode: PodcastEpisode, file: UploadFile
@ -16,7 +49,7 @@ async def djuced_track_list(
if root.tag != "recordEvents":
return None
tracks = []
tracks: List[TrackListItem] = []
for track in root.iter("track"):
title = track.get("song")
@ -29,21 +62,54 @@ async def djuced_track_list(
if len(title_segments) == 2:
artist, title = title_segments
for interval in intervals:
tracks.append((float(interval.get("start")), title, artist))
if len(intervals) > 0:
tracks.append(
{
"title": title,
"artist": artist,
"timestamp": timedelta(seconds=float(intervals[0].get("start"))),
}
)
# sort by start time
tracks = sorted(tracks, key=lambda x: x[0], reverse=False)
return update_episode_tracklist(episode, tracks)
# update description
track_list_str = ""
for i, (t, title, artist) in enumerate(tracks):
time = timedelta(seconds=round(t))
track_list_str += f"{i + 1}. {title} _- {artist} [{time}]_\n"
async def rekordbox_track_list(
episode: PodcastEpisode, file: UploadFile
) -> Optional[PodcastEpisode]:
if not file.filename.endswith(".cue"):
return None
episode.description += "\n\n**Track list**\n\n" + track_list_str
return episode
tracks: List[TrackListItem] = []
current_track: TrackListItem = {}
content = (await file.read()).decode("utf-8")
for line in content.splitlines():
line = line.strip()
if line.startswith("TITLE"):
title = re.search(r'"(.*?)"', line).group(1)
title = re.sub(
r"\s*\((Clean Extended|Clean|Extended)\)", "", title
) # Remove specific suffixes
current_track["title"] = title
elif line.startswith("PERFORMER"):
current_track["artist"] = re.search(r'"(.*?)"', line).group(1)
elif line.startswith("INDEX 01"):
time_match = re.search(r"INDEX 01 (\d{2}):(\d{2}):(\d{2})", line)
if time_match:
hours = int(time_match.group(1))
minutes = int(time_match.group(2))
seconds = int(time_match.group(3))
current_track["timestamp"] = timedelta(
hours=hours, minutes=minutes, seconds=seconds
)
tracks.append(
current_track.copy()
) # Ensure current track is added properly
current_track = {}
return update_episode_tracklist(episode, tracks)
# list of file processors
@ -51,7 +117,10 @@ async def djuced_track_list(
# the second tuple item is the function to run which should return none if the file was not able to be processed, otherwise a mutated episode object
processors: List[
Tuple[str, Callable[[PodcastEpisode, UploadFile], Optional[PodcastEpisode]]]
] = [("text/xml", djuced_track_list)]
] = [
("text/xml", djuced_track_list),
("application/octet-stream", rekordbox_track_list),
]
async def process_additional_episode_upload(