add serato tracklist parsing
This commit is contained in:
parent
80d81712c7
commit
44827ef205
1 changed files with 70 additions and 13 deletions
|
@ -16,6 +16,35 @@ class TrackListItem(TypedDict):
|
||||||
timestamp: Optional[timedelta]
|
timestamp: Optional[timedelta]
|
||||||
|
|
||||||
|
|
||||||
|
def clean_track_title(title: str) -> str:
|
||||||
|
# remove track suffixes
|
||||||
|
new_title = re.sub(
|
||||||
|
r"\s*\((Clean Extended|Clean|Extended.*|Original Mix|Radio Edit|Free .*)\)",
|
||||||
|
"",
|
||||||
|
title,
|
||||||
|
flags=re.IGNORECASE,
|
||||||
|
)
|
||||||
|
new_title = re.sub(
|
||||||
|
r"\s*\[(Free .*|.*bandcamp.*|extended.*)\]",
|
||||||
|
"",
|
||||||
|
new_title,
|
||||||
|
flags=re.IGNORECASE,
|
||||||
|
)
|
||||||
|
|
||||||
|
# remove other parts of title
|
||||||
|
new_title = re.sub(r"out now", "", new_title, flags=re.IGNORECASE)
|
||||||
|
new_title = re.sub(
|
||||||
|
r"^(Premiere|Free\s\w*|premear|\w*\sPremiere):\s",
|
||||||
|
"",
|
||||||
|
new_title,
|
||||||
|
flags=re.IGNORECASE,
|
||||||
|
)
|
||||||
|
|
||||||
|
# remove multiple spaces
|
||||||
|
new_title = re.sub(" +", " ", new_title)
|
||||||
|
return new_title.strip()
|
||||||
|
|
||||||
|
|
||||||
def update_episode_tracklist(
|
def update_episode_tracklist(
|
||||||
episode: PodcastEpisode, track_list: List[TrackListItem]
|
episode: PodcastEpisode, track_list: List[TrackListItem]
|
||||||
) -> Optional[PodcastEpisode]:
|
) -> Optional[PodcastEpisode]:
|
||||||
|
@ -31,16 +60,32 @@ def update_episode_tracklist(
|
||||||
description += f"\n\n{TRACK_LIST_HEADING}\n\n"
|
description += f"\n\n{TRACK_LIST_HEADING}\n\n"
|
||||||
|
|
||||||
sorted_tracks = sorted(track_list, key=lambda x: x["timestamp"].total_seconds())
|
sorted_tracks = sorted(track_list, key=lambda x: x["timestamp"].total_seconds())
|
||||||
|
id_count = 1
|
||||||
|
|
||||||
for i, track in enumerate(sorted_tracks):
|
for i, track in enumerate(sorted_tracks):
|
||||||
description += f"{i + 1}. {track.get('title', 'ID')} _- {track.get('artist', 'ID')} [{str(track.get('timestamp', timedelta(seconds=0)))}]_\n"
|
title_str = track.get("title")
|
||||||
|
artist_str = track.get("artist", "Unknown Artist")
|
||||||
|
|
||||||
|
if title_str is None:
|
||||||
|
title_str = f"ID{id_count}"
|
||||||
|
id_count += 1
|
||||||
|
else:
|
||||||
|
# if the title looks like it contains the artist too, overwrite the existing artist
|
||||||
|
title_segments = title_str.split(" - ")
|
||||||
|
if len(title_segments) == 2:
|
||||||
|
artist_str, title_str = title_segments
|
||||||
|
|
||||||
|
# clean up the title
|
||||||
|
title_str = clean_track_title(title_str)
|
||||||
|
|
||||||
|
description += f"{i + 1}. {title_str} _- {artist_str} [{str(track.get('timestamp', timedelta(seconds=0)))}]_\n"
|
||||||
|
|
||||||
episode.description = description.strip()
|
episode.description = description.strip()
|
||||||
|
|
||||||
return episode
|
return episode
|
||||||
|
|
||||||
|
|
||||||
async def djuced_track_list(
|
async def djuced_tracklist(
|
||||||
episode: PodcastEpisode, file: UploadFile
|
episode: PodcastEpisode, file: UploadFile
|
||||||
) -> Optional[PodcastEpisode]:
|
) -> Optional[PodcastEpisode]:
|
||||||
root = ET.fromstring(await file.read())
|
root = ET.fromstring(await file.read())
|
||||||
|
@ -57,11 +102,6 @@ async def djuced_track_list(
|
||||||
|
|
||||||
intervals = track.findall("interval")
|
intervals = track.findall("interval")
|
||||||
|
|
||||||
# if the title looks like it contains the artist too, overwrite the existing artist
|
|
||||||
title_segments = title.split(" - ")
|
|
||||||
if len(title_segments) == 2:
|
|
||||||
artist, title = title_segments
|
|
||||||
|
|
||||||
if len(intervals) > 0:
|
if len(intervals) > 0:
|
||||||
tracks.append(
|
tracks.append(
|
||||||
{
|
{
|
||||||
|
@ -74,7 +114,7 @@ async def djuced_track_list(
|
||||||
return update_episode_tracklist(episode, tracks)
|
return update_episode_tracklist(episode, tracks)
|
||||||
|
|
||||||
|
|
||||||
async def rekordbox_track_list(
|
async def rekordbox_tracklist(
|
||||||
episode: PodcastEpisode, file: UploadFile
|
episode: PodcastEpisode, file: UploadFile
|
||||||
) -> Optional[PodcastEpisode]:
|
) -> Optional[PodcastEpisode]:
|
||||||
if not file.filename.endswith(".cue"):
|
if not file.filename.endswith(".cue"):
|
||||||
|
@ -89,9 +129,6 @@ async def rekordbox_track_list(
|
||||||
line = line.strip()
|
line = line.strip()
|
||||||
if line.startswith("TITLE"):
|
if line.startswith("TITLE"):
|
||||||
title = re.search(r'"(.*?)"', line).group(1)
|
title = re.search(r'"(.*?)"', line).group(1)
|
||||||
title = re.sub(
|
|
||||||
r"\s*\((Clean Extended|Clean|Extended)\)", "", title
|
|
||||||
) # Remove specific suffixes
|
|
||||||
current_track["title"] = title
|
current_track["title"] = title
|
||||||
elif line.startswith("PERFORMER"):
|
elif line.startswith("PERFORMER"):
|
||||||
current_track["artist"] = re.search(r'"(.*?)"', line).group(1)
|
current_track["artist"] = re.search(r'"(.*?)"', line).group(1)
|
||||||
|
@ -112,14 +149,34 @@ async def rekordbox_track_list(
|
||||||
return update_episode_tracklist(episode, tracks)
|
return update_episode_tracklist(episode, tracks)
|
||||||
|
|
||||||
|
|
||||||
|
async def serato_m3u_tracklist(
|
||||||
|
episode: PodcastEpisode, file: UploadFile
|
||||||
|
) -> Optional[PodcastEpisode]:
|
||||||
|
file_content = (await file.read()).decode("utf-8")
|
||||||
|
|
||||||
|
tracks: List[TrackListItem] = []
|
||||||
|
|
||||||
|
for line in file_content.splitlines():
|
||||||
|
line = line.strip()
|
||||||
|
if line.startswith("#EXTINF:"):
|
||||||
|
parts = ",".join(line.split(",")[1:]).split(" - ")
|
||||||
|
artist = parts[0]
|
||||||
|
title = " - ".join(parts[1:])
|
||||||
|
|
||||||
|
tracks.append(TrackListItem(title=title, artist=artist, timestamp=None))
|
||||||
|
|
||||||
|
return update_episode_tracklist(episode, tracks)
|
||||||
|
|
||||||
|
|
||||||
# list of file processors
|
# list of file processors
|
||||||
# these are processed in order and only run if the file content type matches the first tuple string
|
# these are processed in order and only run if the file content type matches the first tuple string
|
||||||
# the second tuple item is the function to run which should return none if the file was not able to be processed, otherwise a mutated episode object
|
# the second tuple item is the function to run which should return none if the file was not able to be processed, otherwise a mutated episode object
|
||||||
processors: List[
|
processors: List[
|
||||||
Tuple[str, Callable[[PodcastEpisode, UploadFile], Optional[PodcastEpisode]]]
|
Tuple[str, Callable[[PodcastEpisode, UploadFile], Optional[PodcastEpisode]]]
|
||||||
] = [
|
] = [
|
||||||
("text/xml", djuced_track_list),
|
("text/xml", djuced_tracklist),
|
||||||
("application/octet-stream", rekordbox_track_list),
|
("application/octet-stream", rekordbox_tracklist),
|
||||||
|
("audio/mpegurl", serato_m3u_tracklist),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue