76 lines
2.4 KiB
Python
76 lines
2.4 KiB
Python
import xml.etree.ElementTree as ET
|
|
from datetime import timedelta
|
|
from typing import Callable, List, Optional, Tuple
|
|
|
|
from fastapi import HTTPException, UploadFile
|
|
|
|
from models import PodcastEpisode
|
|
|
|
|
|
async def djuced_track_list(
|
|
episode: PodcastEpisode, file: UploadFile
|
|
) -> Optional[PodcastEpisode]:
|
|
root = ET.fromstring(await file.read())
|
|
|
|
# if this doesn't look like a djuced track list xml, return nothing
|
|
if root.tag != "recordEvents":
|
|
return None
|
|
|
|
tracks = []
|
|
|
|
for track in root.iter("track"):
|
|
title = track.get("song")
|
|
artist = track.get("artist")
|
|
|
|
intervals = track.findall("interval")
|
|
|
|
# if the title looks like it contains the artist too, overwrite the existing artist
|
|
title_segments = title.split(" - ")
|
|
if len(title_segments) == 2:
|
|
artist, title = title_segments
|
|
|
|
for interval in intervals:
|
|
tracks.append((float(interval.get("start")), title, artist))
|
|
|
|
# sort by start time
|
|
tracks = sorted(tracks, key=lambda x: x[0], reverse=False)
|
|
|
|
# update description
|
|
track_list_str = ""
|
|
|
|
for i, (t, title, artist) in enumerate(tracks):
|
|
time = timedelta(seconds=round(t))
|
|
track_list_str += f"{i + 1}. {title} _- {artist} [{time}]_\n"
|
|
|
|
episode.description += "\n\n**Track list**\n\n" + track_list_str
|
|
return episode
|
|
|
|
|
|
# list of file processors
|
|
# these are processed in order and only run if the file content type matches the first tuple string
|
|
# the second tuple item is the function to run which should return none if the file was not able to be processed, otherwise a mutated episode object
|
|
processors: List[
|
|
Tuple[str, Callable[[PodcastEpisode, UploadFile], Optional[PodcastEpisode]]]
|
|
] = [("text/xml", djuced_track_list)]
|
|
|
|
|
|
async def process_additional_episode_upload(
|
|
episode: PodcastEpisode, file: UploadFile
|
|
) -> PodcastEpisode:
|
|
for processor_content_type, do_process in processors:
|
|
if processor_content_type != file.content_type:
|
|
continue
|
|
|
|
try:
|
|
result = await do_process(episode, file)
|
|
|
|
if result is not None:
|
|
return result
|
|
except Exception as e:
|
|
print(f"Failed to process using {do_process.__name__}: {e}")
|
|
continue
|
|
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Unable to process additional episode upload ({file.content_type})",
|
|
)
|