From dc88dc49e89d91f1ad9ede9beec12d7f66ed5815 Mon Sep 17 00:00:00 2001 From: Jake Walker Date: Fri, 17 Jan 2025 16:48:51 +0000 Subject: [PATCH] add scripts --- scripts/import-episode.py | 90 ++++++++++++++++++++++++++++++++++++++ scripts/update-pub-date.py | 50 +++++++++++++++++++++ 2 files changed, 140 insertions(+) create mode 100644 scripts/import-episode.py create mode 100644 scripts/update-pub-date.py diff --git a/scripts/import-episode.py b/scripts/import-episode.py new file mode 100644 index 0000000..1664dc7 --- /dev/null +++ b/scripts/import-episode.py @@ -0,0 +1,90 @@ +import argparse +import hashlib +import shutil +import sys +from pathlib import Path + +import ffmpeg +import structlog +from sqlmodel import Session, select + +# Add the src directory to the system path +sys.path.append(str(Path(__file__).resolve().parent.parent / "src")) + +import models as models +from settings import settings + +log = structlog.get_logger() + + +def import_episode(filename: Path, podcast_id: str, process: bool, move: bool = True): + if process: + raise NotImplementedError("Importing with processing is not implemented") + + if filename.suffix != ".m4a" and not process: + log.error("Input file must be in an m4a container if not processing") + return + + with Session(models.engine) as session: + podcast = session.exec( + select(models.Podcast).where(models.Podcast.id == podcast_id) + ).first() + + if podcast is None: + log.error("Failed importing episode, podcast does not exist.") + return + + episode = models.PodcastEpisode( + name=filename.stem, file_size=0, file_hash="", podcast_id=podcast.id + ) + + episode_filename = settings.directory / f"{episode.id}.m4a" + + if move: + log.info("Moving episode to %s...", episode_filename) + shutil.move(filename, episode_filename) + else: + log.info("Copying episode to %s...", episode_filename) + shutil.copyfile(filename, episode_filename) + + probe = ffmpeg.probe(str(episode_filename)) + stream = next( + (stream for stream in probe["streams"] if stream["codec_type"] == "audio"), + None, + ) + + file_hash = hashlib.sha256() + with open(episode_filename, "rb") as f: + for byte_block in iter(lambda: f.read(4096), b""): + file_hash.update(byte_block) + + episode.duration = ( + float(stream["duration"]) + if stream is not None and "duration" in stream + else None + ) + episode.file_hash = file_hash.hexdigest() + episode.file_size = episode_filename.stat().st_size + + session.add(episode) + session.commit() + + log.info("Imported episode as %s", episode.id) + + +def main(): + parser = argparse.ArgumentParser( + prog="import-episode.py", + description="Import an episode", + ) + parser.add_argument("filename") + parser.add_argument("podcast_id") + parser.add_argument("--process", action="store_true") + + args = parser.parse_args() + + import_episode(Path(args.filename), args.podcast_id, args.process) + + +if __name__ == "__main__": + main() diff --git a/scripts/update-pub-date.py b/scripts/update-pub-date.py new file mode 100644 index 0000000..d8c18d2 --- /dev/null +++ b/scripts/update-pub-date.py @@ -0,0 +1,50 @@ +import argparse +import sys +from datetime import datetime +from pathlib import Path + +import structlog +from sqlmodel import Session, select + +# Add the src directory to the system path +sys.path.append(str(Path(__file__).resolve().parent.parent / "src")) + +import models as models + +log = structlog.get_logger() + + +def update_pub_date(episode_id: str, new_date: str): + with Session(models.engine) as session: + episode = session.exec( + select(models.PodcastEpisode).where(models.PodcastEpisode.id == episode_id) + ).first() + + if episode is None: + log.error("Could not find episode") + return + + episode.publish_date = datetime.fromisoformat(new_date) + assert episode.publish_date.tzinfo is not None, "timezone is required" + + session.add(episode) + session.commit() + + log.info("Updated episode", episode.id) + + +def main(): + parser = argparse.ArgumentParser( + prog="update-pub-date.py", + description="Update an episode publish date", + ) + parser.add_argument("episode_id") + parser.add_argument("new_date") + + args = parser.parse_args() + + update_pub_date(args.episode_id, args.new_date) + + +if __name__ == "__main__": + main()