add scripts

This commit is contained in:
Jake Walker 2025-01-17 16:48:51 +00:00
parent 4e89158d96
commit dc88dc49e8
2 changed files with 140 additions and 0 deletions

90
scripts/import-episode.py Normal file
View file

@ -0,0 +1,90 @@
import argparse
import hashlib
import shutil
import sys
from pathlib import Path
import ffmpeg
import structlog
from sqlmodel import Session, select
# Add the src directory to the system path
sys.path.append(str(Path(__file__).resolve().parent.parent / "src"))
import models as models
from settings import settings
log = structlog.get_logger()
def import_episode(filename: Path, podcast_id: str, process: bool, move: bool = True):
if process:
raise NotImplementedError("Importing with processing is not implemented")
if filename.suffix != ".m4a" and not process:
log.error("Input file must be in an m4a container if not processing")
return
with Session(models.engine) as session:
podcast = session.exec(
select(models.Podcast).where(models.Podcast.id == podcast_id)
).first()
if podcast is None:
log.error("Failed importing episode, podcast does not exist.")
return
episode = models.PodcastEpisode(
name=filename.stem, file_size=0, file_hash="", podcast_id=podcast.id
)
episode_filename = settings.directory / f"{episode.id}.m4a"
if move:
log.info("Moving episode to %s...", episode_filename)
shutil.move(filename, episode_filename)
else:
log.info("Copying episode to %s...", episode_filename)
shutil.copyfile(filename, episode_filename)
probe = ffmpeg.probe(str(episode_filename))
stream = next(
(stream for stream in probe["streams"] if stream["codec_type"] == "audio"),
None,
)
file_hash = hashlib.sha256()
with open(episode_filename, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
file_hash.update(byte_block)
episode.duration = (
float(stream["duration"])
if stream is not None and "duration" in stream
else None
)
episode.file_hash = file_hash.hexdigest()
episode.file_size = episode_filename.stat().st_size
session.add(episode)
session.commit()
log.info("Imported episode as %s", episode.id)
def main():
parser = argparse.ArgumentParser(
prog="import-episode.py",
description="Import an episode",
)
parser.add_argument("filename")
parser.add_argument("podcast_id")
parser.add_argument("--process", action="store_true")
args = parser.parse_args()
import_episode(Path(args.filename), args.podcast_id, args.process)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,50 @@
import argparse
import sys
from datetime import datetime
from pathlib import Path
import structlog
from sqlmodel import Session, select
# Add the src directory to the system path
sys.path.append(str(Path(__file__).resolve().parent.parent / "src"))
import models as models
log = structlog.get_logger()
def update_pub_date(episode_id: str, new_date: str):
with Session(models.engine) as session:
episode = session.exec(
select(models.PodcastEpisode).where(models.PodcastEpisode.id == episode_id)
).first()
if episode is None:
log.error("Could not find episode")
return
episode.publish_date = datetime.fromisoformat(new_date)
assert episode.publish_date.tzinfo is not None, "timezone is required"
session.add(episode)
session.commit()
log.info("Updated episode", episode.id)
def main():
parser = argparse.ArgumentParser(
prog="update-pub-date.py",
description="Update an episode publish date",
)
parser.add_argument("episode_id")
parser.add_argument("new_date")
args = parser.parse_args()
update_pub_date(args.episode_id, args.new_date)
if __name__ == "__main__":
main()