import argparse import hashlib import shutil import sys from pathlib import Path import ffmpeg import structlog from sqlmodel import Session, select # Add the src directory to the system path sys.path.append(str(Path(__file__).resolve().parent.parent / "src")) import models as models from settings import settings log = structlog.get_logger() def import_episode(filename: Path, podcast_id: str, process: bool, move: bool = True): if process: raise NotImplementedError("Importing with processing is not implemented") if filename.suffix != ".m4a" and not process: log.error("Input file must be in an m4a container if not processing") return with Session(models.engine) as session: podcast = session.exec( select(models.Podcast).where(models.Podcast.id == podcast_id) ).first() if podcast is None: log.error("Failed importing episode, podcast does not exist.") return episode = models.PodcastEpisode( name=filename.stem, file_size=0, file_hash="", podcast_id=podcast.id ) episode_filename = settings.directory / f"{episode.id}.m4a" if move: log.info("Moving episode to %s...", episode_filename) shutil.move(filename, episode_filename) else: log.info("Copying episode to %s...", episode_filename) shutil.copyfile(filename, episode_filename) probe = ffmpeg.probe(str(episode_filename)) stream = next( (stream for stream in probe["streams"] if stream["codec_type"] == "audio"), None, ) file_hash = hashlib.sha256() with open(episode_filename, "rb") as f: for byte_block in iter(lambda: f.read(4096), b""): file_hash.update(byte_block) episode.duration = ( float(stream["duration"]) if stream is not None and "duration" in stream else None ) episode.file_hash = file_hash.hexdigest() episode.file_size = episode_filename.stat().st_size session.add(episode) session.commit() log.info("Imported episode as %s", episode.id) def main(): parser = argparse.ArgumentParser( prog="import-episode.py", description="Import an episode", ) parser.add_argument("filename") parser.add_argument("podcast_id") parser.add_argument("--process", action="store_true") args = parser.parse_args() import_episode(Path(args.filename), args.podcast_id, args.process) if __name__ == "__main__": main()