# podcast-generator/main.py
# Last commit: 80085fcad1 ("improvements") by Jake Walker, 2025-01-09 11:28:18 +00:00
# CI: ci/woodpecker/push/build — pipeline successful
import hashlib
import shutil
import time
import urllib.parse
import uuid
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
import ffmpeg
import structlog
from podgen import Episode, Media, Podcast
from pydantic import BaseModel, Field
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from process import AudioProcessor
from settings import Settings
# Audio file extensions recognized as episode media (lowercase, leading dot).
EXTENSIONS = [
    ".aac",
    ".ac3",
    ".aif",
    ".aiff",
    ".ape",
    ".flac",
    ".m4a",
    ".mp3",
    ".ogg",
    ".opus",
    ".ra",
    ".ram",
    ".wav",
    ".wma",
]
# Name of the per-feed metadata file (JSON serialization of PodcastMeta).
META_FILENAME = "meta.json"
# File extension (without dot) of optional per-episode description files.
DESCRIPTION_EXTENSION = "txt"
# Module-level structured logger.
log = structlog.get_logger()
class PodcastMeta(BaseModel):
    """Per-feed configuration, stored as meta.json inside the feed directory."""

    # Podcast title used in the generated RSS feed.
    name: str
    # Podcast description used in the generated RSS feed.
    description: str
    # iTunes "explicit" flag for the feed; defaults to True.
    explicit: bool = Field(default=True)
    # Directory name under the output directory and path segment in public URLs;
    # defaults to a freshly generated random UUID string.
    output_name: str = Field(default_factory=lambda: str(uuid.uuid4()))
class PodcastGenerator:
    """Generates podcast RSS feeds from per-feed episode directories.

    For each configured feed this builds a podgen ``Podcast``, copies the
    episode media into ``settings.output_directory / <output_name>`` and
    writes a ``feed.xml`` alongside it.
    """

    def __init__(self, settings: Settings):
        self.settings = settings
        self.setup_directories()

    def setup_directories(self) -> None:
        """Ensure the output directory and each feed's consume/episodes
        directories exist, seeding a default meta.json for feeds lacking one."""
        self.settings.output_directory.mkdir(parents=True, exist_ok=True)
        for feed_name in self.settings.feeds:
            feed_dir = self.settings.directory / feed_name
            # `subdir` (not `dir`) to avoid shadowing the builtin.
            for subdir in ("consume", "episodes"):
                (feed_dir / subdir).mkdir(parents=True, exist_ok=True)
            meta_filename = feed_dir / META_FILENAME
            if not meta_filename.is_file():
                # Seed a default meta file so the feed works out of the box.
                with open(meta_filename, "w") as f:
                    f.write(
                        PodcastMeta(
                            name=feed_name,
                            description=feed_name,
                            explicit=True,
                        ).model_dump_json()
                    )

    def get_feed_meta(self, feed_name: str) -> PodcastMeta:
        """Read and validate the feed's meta.json into a PodcastMeta."""
        with open(self.settings.directory / feed_name / META_FILENAME, "r") as f:
            return PodcastMeta.model_validate_json(f.read())

    def get_audio_duration(self, filename: Path) -> Optional[timedelta]:
        """Return the duration of the first audio stream in `filename`.

        Returns None when ffmpeg reports no audio stream, or the stream
        carries no "duration" field.
        """
        probe = ffmpeg.probe(str(filename))
        stream = next(
            (s for s in probe["streams"] if s["codec_type"] == "audio"),
            None,
        )
        if stream is None or "duration" not in stream:
            return None
        return timedelta(seconds=float(stream["duration"]))

    @staticmethod
    def _publication_timestamp(file: Path) -> float:
        """Best-effort creation time of `file` as a POSIX timestamp.

        Prefers st_birthtime (only available on some platforms, e.g.
        macOS/BSD), falling back to st_ctime, then to the current time.
        """
        stat = file.stat()
        try:
            return stat.st_birthtime
        except AttributeError:
            return getattr(stat, "st_ctime", datetime.now().timestamp())

    @staticmethod
    def _file_sha256(file: Path) -> str:
        """Hex SHA-256 of the file contents, streamed in 4 KiB chunks.

        Used as a stable episode id that survives renames of the feed dir.
        """
        h = hashlib.sha256()
        with open(file, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                h.update(byte_block)
        return h.hexdigest()

    def generate_all_feeds(self) -> None:
        """Wipe the output directory and regenerate every configured feed."""
        shutil.rmtree(self.settings.output_directory, ignore_errors=True)
        for feed_name in self.settings.feeds:
            self.generate_feed(feed_name)

    def generate_feed(self, feed_name: str) -> None:
        """Build the RSS feed for `feed_name`: copy episode media into the
        per-feed output directory and write feed.xml next to it."""
        log.info("Generating feed for %s", feed_name)
        podcast_meta = self.get_feed_meta(feed_name)
        feed = Podcast(
            name=podcast_meta.name,
            description=podcast_meta.description,
            website=urllib.parse.urljoin(
                self.settings.url_base, podcast_meta.output_name
            ),
            explicit=podcast_meta.explicit,
            feed_url=urllib.parse.urljoin(
                self.settings.url_base, f"{podcast_meta.output_name}/feed.xml"
            ),
        )
        output_dir = self.settings.output_directory / podcast_meta.output_name
        feed_episodes_dir = self.settings.directory / feed_name / "episodes"
        shutil.rmtree(output_dir, ignore_errors=True)
        output_dir.mkdir(parents=True)
        # Sorted so the episode order in the generated feed is deterministic
        # (glob order is filesystem-dependent).
        for file in sorted(feed_episodes_dir.glob("*")):
            # Case-insensitive match so e.g. ".MP3" files are not skipped.
            if file.suffix.lower() not in EXTENSIONS:
                continue
            log.debug("Adding episode %s to feed", str(file.name))
            episode = Episode(
                id=self._file_sha256(file),
                title=file.stem,
                media=Media(
                    urllib.parse.urljoin(
                        self.settings.url_base,
                        # BUG FIX: media files are copied under output_name
                        # (see output_dir above), so the public URL must use
                        # output_name, not feed_name — with the default random
                        # UUID output_name every media link was broken.
                        urllib.parse.quote(
                            f"{podcast_meta.output_name}/{file.name}"
                        ),
                    ),
                    file.stat().st_size,
                    duration=self.get_audio_duration(file),
                ),
                publication_date=datetime.fromtimestamp(
                    self._publication_timestamp(file), timezone.utc
                ),
            )
            description_filename = (
                feed_episodes_dir / f"{file.stem}.{DESCRIPTION_EXTENSION}"
            )
            # Attach an optional long summary from a sibling .txt file.
            if description_filename.is_file():
                with open(description_filename, "r") as f:
                    content = f.read().strip()
                if content:
                    episode.long_summary = content
            shutil.copyfile(file, output_dir / file.name)
            feed.add_episode(episode)
        output_feed_file = output_dir / "feed.xml"
        log.info("Saving feed to %s", output_feed_file)
        with open(output_feed_file, "w") as f:
            feed.rss_file(f)
class GeneratorEventHandler(FileSystemEventHandler):
    """Watchdog handler: queues new consume-directory audio files for
    processing and schedules a debounced regeneration on feed changes."""

    def __init__(self, settings: Settings):
        self.settings = settings
        self.generator = PodcastGenerator(settings=settings)
        # Pass the bound method directly; a lambda wrapper adds nothing.
        self.audio_processor = AudioProcessor(
            generate_callback=self.generator.generate_all_feeds
        )
        # When set, the main loop regenerates all feeds once this deadline
        # passes — acts as a 1-minute debounce over bursts of file events.
        self.generate_time: Optional[datetime] = None
        self.audio_processor.start_processing()
        self.generator.generate_all_feeds()
        super().__init__()

    def on_any_event(self, event):
        src_path = Path(event.src_path)
        for feed_name in self.settings.feeds:
            feed_dir = self.settings.directory / feed_name
            feed_consume_dir = feed_dir / "consume"
            feed_meta_path = feed_dir / META_FILENAME
            feed_episodes_dir = feed_dir / "episodes"
            # A new audio file dropped into this feed's consume directory:
            # queue it for transcoding and touch an empty description file.
            # Hidden files (dotfiles) are ignored.
            if (
                event.event_type == "created"
                and src_path.parent == feed_consume_dir
                and src_path.suffix in EXTENSIONS
                and not src_path.name.startswith(".")
            ):
                output_path = feed_episodes_dir / f"{src_path.stem}.m4a"
                self.audio_processor.add_file(src_path, output_path)
                open(
                    output_path.parent
                    / f"{output_path.stem}.{DESCRIPTION_EXTENSION}",
                    "a",
                ).close()
            # Meta changed, or a file changed inside the episodes directory:
            # schedule a debounced rebuild. BUG FIX: the original used
            # `continue` in the created-branch above, which skipped this check,
            # so "created" events in the episodes directory (e.g. files copied
            # in by hand) never triggered a regeneration.
            if src_path == feed_meta_path or (
                feed_episodes_dir in src_path.parents and not event.is_directory
            ):
                self.generate_time = datetime.now() + timedelta(minutes=1)
if __name__ == "__main__":
    settings = Settings()
    log.info("Loaded settings", settings=settings)
    event_handler = GeneratorEventHandler(settings)
    observer = Observer()
    observer.schedule(event_handler, settings.directory, recursive=True)
    observer.start()
    log.info("Listening for changes at %s...", settings.directory)
    try:
        # Poll once per second; when a debounced regeneration deadline
        # (set by the event handler) has passed, clear it and rebuild.
        while True:
            if (
                event_handler.generate_time is not None
                and datetime.now() >= event_handler.generate_time
            ):
                event_handler.generate_time = None
                event_handler.generator.generate_all_feeds()
            time.sleep(1)
    except KeyboardInterrupt:
        # Exit cleanly on Ctrl+C instead of dumping a traceback.
        pass
    finally:
        observer.stop()
        # BUG FIX: observer.join() was duplicated; joining once is enough.
        observer.join()