"""Generate podcast RSS feeds from per-feed directories and keep them updated.

Directory layout used by this module (all paths come from ``Settings``):

* ``<directory>/<feed>/consume``   -- incoming files handed to the audio processor
* ``<directory>/<feed>/episodes``  -- episode audio plus optional ``.txt`` descriptions
* ``<directory>/<feed>/meta.json`` -- feed metadata (see ``PodcastMeta``)
* ``<output_directory>/<feed>/``   -- copies of the episode files
* ``<output_directory>/<feed>.xml`` -- the generated RSS feed
"""

import os
import shutil
import time
import urllib.parse
from datetime import datetime, timezone
from pathlib import Path

import structlog
from podgen import Episode, Media, Podcast
from pydantic import BaseModel, Field
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer

from process import AudioProcessor
from settings import Settings

EXTENSIONS = [".m4a"]
META_FILENAME = "meta.json"
DESCRIPTION_EXTENSION = "txt"

log = structlog.get_logger()


def _creation_timestamp(stat_result: os.stat_result) -> float:
    """Return the file's birth time, falling back to its modification time.

    ``st_birthtime`` is not available on every platform (notably Linux), so
    reading it unconditionally raises ``AttributeError`` there.
    """
    return getattr(stat_result, "st_birthtime", stat_result.st_mtime)


class PodcastMeta(BaseModel):
    """Feed-level metadata stored as JSON in each feed's ``meta.json``."""

    # Passed to ``Podcast(name=...)``.
    name: str
    # Passed to ``Podcast(description=...)``.
    description: str
    # Passed to ``Podcast(explicit=...)``; defaults to True.
    explicit: bool = Field(default=True)


class PodcastGenerator:
    """Build RSS feeds and published episode folders for the configured feeds."""

    def __init__(self, settings: Settings):
        """Store *settings* and make sure the working directories exist."""
        self.settings = settings
        self.setup_directories()

    def setup_directories(self) -> None:
        """Create the output directory and each feed's folders and metadata file.

        For every feed in ``settings.feeds`` this creates the ``consume`` and
        ``episodes`` sub-directories and, if no ``meta.json`` exists yet, writes
        a default one that uses the feed name as both name and description.
        """
        self.settings.output_directory.mkdir(parents=True, exist_ok=True)
        for feed_name in self.settings.feeds:
            feed_dir = self.settings.directory / feed_name
            for subdir in ("consume", "episodes"):
                (feed_dir / subdir).mkdir(parents=True, exist_ok=True)

            meta_path = feed_dir / META_FILENAME
            if not meta_path.is_file():
                default_meta = PodcastMeta(
                    name=feed_name,
                    description=feed_name,
                    explicit=True,
                )
                # JSON is expected to be UTF-8; do not depend on the locale.
                with open(meta_path, "w", encoding="utf-8") as f:
                    f.write(default_meta.model_dump_json())

    def get_feed_meta(self, feed_name: str) -> PodcastMeta:
        """Load and validate the ``meta.json`` of *feed_name*."""
        meta_path = self.settings.directory / feed_name / META_FILENAME
        with open(meta_path, "r", encoding="utf-8") as f:
            return PodcastMeta.model_validate_json(f.read())

    def generate_feed(self, feed_name: str) -> None:
        """Rebuild the published folder and RSS file for *feed_name*.

        The feed's output folder is removed and recreated, every file in the
        feed's ``episodes`` directory whose suffix is in ``EXTENSIONS`` is
        copied there and added as an episode, and the feed is written to
        ``<output_directory>/<feed_name>.xml``.
        """
        log.info("Generating feed for %s", feed_name)
        podcast_meta = self.get_feed_meta(feed_name)
        feed = Podcast(
            name=podcast_meta.name,
            description=podcast_meta.description,
            website=urllib.parse.urljoin(self.settings.url_base, feed_name),
            explicit=podcast_meta.explicit,
            feed_url=urllib.parse.urljoin(self.settings.url_base, f"{feed_name}.xml"),
        )

        output_dir = self.settings.output_directory / feed_name
        feed_episodes_dir = self.settings.directory / feed_name / "episodes"

        # Start from a clean folder so episodes that no longer exist disappear.
        # rmtree ignores errors, so tolerate a folder that could not be
        # (fully) removed instead of failing with FileExistsError.
        shutil.rmtree(output_dir, ignore_errors=True)
        output_dir.mkdir(parents=True, exist_ok=True)

        for file in feed_episodes_dir.glob("*"):
            if file.suffix not in EXTENSIONS:
                continue

            log.debug("Adding episode %s to feed", str(file.name))
            file_stat = file.stat()
            episode = Episode(
                title=file.stem,
                media=Media(
                    urllib.parse.urljoin(
                        self.settings.url_base,
                        urllib.parse.quote(f"{feed_name}/{file.name}"),
                    ),
                    file_stat.st_size,
                ),
                publication_date=datetime.fromtimestamp(
                    _creation_timestamp(file_stat), timezone.utc
                ),
            )

            # An optional "<stem>.txt" next to the audio file holds the summary.
            description_filename = (
                feed_episodes_dir / f"{file.stem}.{DESCRIPTION_EXTENSION}"
            )
            if description_filename.is_file():
                with open(description_filename, "r", encoding="utf-8") as f:
                    episode.long_summary = f.read()

            shutil.copyfile(file, output_dir / file.name)
            feed.add_episode(episode)

        output_feed_file = self.settings.output_directory / f"{feed_name}.xml"
        log.info("Saving feed to %s", output_feed_file)
        # Write UTF-8 explicitly rather than using the locale default encoding.
        # NOTE(review): assumes podgen declares UTF-8 in the XML by default -- confirm.
        with open(output_feed_file, "w", encoding="utf-8") as f:
            feed.rss_file(f)


class GeneratorEventHandler(FileSystemEventHandler):
    """Watchdog handler that queues new audio and regenerates affected feeds."""

    def __init__(self, settings: Settings):
        """Create the audio processor and feed generator and start processing."""
        self.settings = settings
        self.audio_processor = AudioProcessor()
        self.generator = PodcastGenerator(settings=settings)
        self.audio_processor.start_processing()
        super().__init__()

    def on_any_event(self, event: FileSystemEvent) -> None:
        """React to a filesystem event under the watched directory.

        * A file created in a feed's ``consume`` directory is handed to the
          audio processor with an ``.m4a`` target in ``episodes``, an empty
          description file is ensured, and the feed is regenerated.
        * Any other event whose path is the feed's ``meta.json`` or lies
          inside its ``episodes`` directory regenerates the feed.
        """
        # watchdog may report the path as bytes; normalise before building a Path.
        src_path = Path(os.fsdecode(event.src_path))

        for feed_name in self.settings.feeds:
            feed_dir = self.settings.directory / feed_name
            feed_consume_dir = feed_dir / "consume"
            feed_meta_path = feed_dir / META_FILENAME
            feed_episodes_dir = feed_dir / "episodes"

            # A file was created in the consume directory.
            if event.event_type == "created":
                # NOTE(review): "created" events outside this feed's consume
                # directory skip the regeneration check below as well;
                # presumably the follow-up "modified" event covers new episode
                # files -- confirm this is intended.
                # Directories dropped into consume are not audio files.
                if src_path.parent != feed_consume_dir or event.is_directory:
                    continue

                output_path = feed_episodes_dir / f"{src_path.stem}.m4a"
                self.audio_processor.add_file(src_path, output_path)

                # Ensure the description file exists without truncating it.
                # Append mode (not Path.touch) leaves an existing file's mtime
                # alone, so it does not emit an extra "modified" event.
                description_path = (
                    output_path.parent
                    / f"{output_path.stem}.{DESCRIPTION_EXTENSION}"
                )
                with open(description_path, "a"):
                    pass

                self.generator.generate_feed(feed_name)

            # A file changed in the episodes directory, or the metadata changed.
            if src_path == feed_meta_path or feed_episodes_dir in src_path.parents:
                self.generator.generate_feed(feed_name)


def main() -> None:
    """Load settings and watch the feed directory until interrupted."""
    settings = Settings()
    log.info("Loaded settings", settings=settings)

    observer = Observer()
    observer.schedule(
        GeneratorEventHandler(settings), settings.directory, recursive=True
    )
    observer.start()
    log.info("Listening for changes at %s...", settings.directory)

    try:
        while True:
            time.sleep(1)
    finally:
        # Always shut the observer thread down, including on Ctrl+C.
        observer.stop()
        observer.join()


if __name__ == "__main__":
    main()