import json from datetime import datetime, timezone from typing import Optional import nanoid from sqlalchemy import Engine from sqlmodel import Field, Relationship, Session, SQLModel, create_engine from settings import settings class Podcast(SQLModel, table=True): id: str = Field(primary_key=True, default_factory=lambda: nanoid.generate()) name: str description: str explicit: bool = Field(default=True) image_filename: Optional[str] = Field(default=None) owner_id: Optional[str] = Field(default=None) episodes: list["PodcastEpisode"] = Relationship(back_populates="podcast") class PodcastEpisode(SQLModel, table=True): id: str = Field(primary_key=True, default_factory=lambda: nanoid.generate()) name: str duration: Optional[float] = Field(default=None) description: Optional[float] = Field(default=None) file_hash: str file_size: int publish_date: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) podcast_id: str = Field(foreign_key="podcast.id") podcast: Podcast = Relationship(back_populates="episodes") def setup_db(engine: Engine): SQLModel.metadata.create_all(engine) # try and migrate old data old_data = settings.directory / "data.json" if old_data.is_file(): try: session = Session(engine) with open(old_data, "r") as f: data = json.load(f) for id, item in data["podcasts"].items(): podcast = Podcast( id=id, name=item.get("name"), description=item.get("description"), explicit=item.get("explicit"), image_filename=item.get("image_filename"), ) session.add(podcast) session.commit() for episode in item["episodes"]: ep = PodcastEpisode.model_validate( {**episode, "podcast_id": podcast.id, "id": nanoid.generate()} ) session.add(ep) session.commit() old_data.unlink() except Exception as ex: print("Failed to migrate old data", ex) engine = create_engine(f"sqlite:///{settings.directory / 'data.db'}")