"""FastAPI application: podcast/episode API, RSS feed, media files and SPA shell."""

import shutil
import urllib.parse
import uuid
from datetime import timedelta, timezone
from pathlib import Path
from typing import Annotated, Any, Generator, Optional

import podgen
import structlog
from fastapi import Depends, FastAPI, HTTPException, Request, Response, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.routing import APIRoute
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from PIL import Image
from sqlmodel import Session, and_, desc, select

import models
from auth import CurrentUser, user_token
from episode_file import process_additional_episode_upload
from helpers import render_markdown
from process import AudioProcessor
from settings import AppConfig, settings

# File extensions accepted for podcast cover images (compared lower-cased).
_ALLOWED_IMAGE_SUFFIXES = (".jpg", ".png")

# Number of bytes read per iteration when streaming an uploaded chunk to disk.
_UPLOAD_READ_SIZE = 1024 * 1024


def get_session() -> Generator[Session, Any, None]:
    """Yield a database session bound to ``models.engine`` and close it afterwards."""
    with Session(models.engine) as session:
        yield session


def use_route_names_as_operation_ids(app: FastAPI) -> None:
    """Set each API route's OpenAPI ``operation_id`` to the route's name."""
    for route in app.routes:
        if isinstance(route, APIRoute):
            route.operation_id = route.name


SessionDep = Annotated[Session, Depends(get_session)]
AuthDep = Annotated[CurrentUser, Depends(user_token)]

log = structlog.get_logger()

app = FastAPI(
    title="Podcast Generator",
    swagger_ui_init_oauth={
        "appName": "PodcastGenerator",
        "clientId": settings.oidc_client_id,
        "usePkceWithAuthorizationCodeGrant": True,
    },
)
app.add_middleware(
    CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]
)

templates = Jinja2Templates(directory="src/templates")

audio_processor = AudioProcessor()
audio_processor.start_processing()

app.mount("/static", StaticFiles(directory="dist/static"), name="static")


def _get_podcast_or_404(session: Session, podcast_id: str) -> models.Podcast:
    """Return the podcast with the given id or raise a 404 HTTPException."""
    podcast = session.get(models.Podcast, podcast_id)
    if podcast is None:
        raise HTTPException(status_code=404, detail="Podcast not found")
    return podcast


def _get_episode_or_404(
    session: Session,
    podcast_id: str,
    episode_id: str,
    detail: str = "Episode not found",
) -> models.PodcastEpisode:
    """Return the episode with this id inside this podcast or raise a 404."""
    episode = session.exec(
        select(models.PodcastEpisode).where(
            and_(
                models.PodcastEpisode.id == episode_id,
                models.PodcastEpisode.podcast_id == podcast_id,
            )
        )
    ).first()
    if episode is None:
        raise HTTPException(status_code=404, detail=detail)
    return episode


def _ensure_owner(podcast: models.Podcast, auth: CurrentUser, detail: str) -> None:
    """Raise a 403 HTTPException unless the authenticated user owns ``podcast``."""
    if podcast.owner_id != auth.user_id:
        raise HTTPException(status_code=403, detail=detail)


def _int_header(request: Request, name: str) -> int:
    """Return the named request header as an int; raise a 400 if absent or invalid."""
    raw = request.headers.get(name)
    if raw is None:
        raise HTTPException(400, "Invalid request")
    try:
        return int(raw)
    except ValueError:
        raise HTTPException(400, "Invalid request") from None


@app.get("/api/config", response_model=AppConfig)
def get_app_config():
    """Return the OIDC settings the frontend needs to sign users in."""
    return AppConfig(
        oidc_authority=settings.oidc_authority,
        oidc_client_id=settings.oidc_client_id,
        oidc_scopes=settings.oidc_scopes,
    )


@app.get("/api/user", response_model=CurrentUser)
def read_user(auth: AuthDep):
    """Return the authenticated user."""
    return auth


@app.post("/api/podcasts", response_model=models.PodcastPublic)
def create_podcast(auth: AuthDep, session: SessionDep, podcast: models.PodcastCreate):
    """Create a podcast owned by the authenticated user."""
    db_podcast = models.Podcast.model_validate(podcast)
    db_podcast.owner_id = auth.user_id
    session.add(db_podcast)
    session.commit()
    session.refresh(db_podcast)
    return db_podcast


@app.get("/api/podcasts", response_model=list[models.PodcastPublic])
def read_podcasts(auth: AuthDep, session: SessionDep):
    """List the podcasts owned by the authenticated user."""
    podcasts = session.exec(
        select(models.Podcast).where(models.Podcast.owner_id == auth.user_id)
    ).all()
    return podcasts


@app.get("/api/podcasts/{podcast_id}", response_model=models.PodcastPublic)
def read_podcast(session: SessionDep, podcast_id: str):
    """Return a single podcast, or 404 if it does not exist."""
    return _get_podcast_or_404(session, podcast_id)


@app.patch("/api/podcasts/{podcast_id}", response_model=models.PodcastPublic)
def update_podcast(
    auth: AuthDep,
    session: SessionDep,
    podcast_id: str,
    podcast: models.PodcastUpdate = None,
):
    """Apply the supplied fields to a podcast owned by the authenticated user."""
    db_podcast = _get_podcast_or_404(session, podcast_id)
    _ensure_owner(
        db_podcast, auth, "You do not have permission to update this podcast"
    )
    # The body defaults to None; treat a missing body as "no fields to change"
    # instead of failing on None.model_dump().
    podcast_data = podcast.model_dump(exclude_unset=True) if podcast is not None else {}
    db_podcast.sqlmodel_update(podcast_data)
    session.add(db_podcast)
    session.commit()
    session.refresh(db_podcast)
    return db_podcast


@app.post("/api/podcasts/{podcast_id}/image", response_model=models.PodcastPublic)
def update_podcast_image(
    auth: AuthDep,
    session: SessionDep,
    podcast_id: str,
    image: UploadFile,
):
    """Replace the cover image of a podcast owned by the authenticated user.

    The image must be a square RGB jpg or png between 1400 and 3000 pixels wide.
    An empty upload leaves the podcast unchanged.
    """
    podcast = _get_podcast_or_404(session, podcast_id)
    _ensure_owner(podcast, auth, "You do not have permission to update this podcast")

    # Nothing uploaded (or size unknown/zero): return the podcast untouched.
    if image is None or not image.size:
        return podcast

    # Use the real suffix, lower-cased, so the stored name always ends in an
    # extension that the file-serving route recognises.
    suffix = Path(image.filename or "").suffix.lower()
    if suffix not in _ALLOWED_IMAGE_SUFFIXES:
        raise HTTPException(
            400, detail="The uploaded podcast image must be a jpg or png"
        )

    try:
        im = Image.open(image.file)
    except OSError:
        # Pillow's UnidentifiedImageError is an OSError subclass.
        raise HTTPException(
            400, detail="The uploaded file is not a readable image"
        ) from None

    width, height = im.size
    if width != height or width < 1400 or width > 3000:
        raise HTTPException(
            400,
            detail="The uploaded podcast image must be square and between 1400x1400px and 3000x3000px in size",
        )
    if im.mode != "RGB":
        raise HTTPException(
            400, detail="The uploaded podcast image must be in RGB format"
        )

    previous_filename = podcast.image_filename
    filename = f"img_{uuid.uuid4()}{suffix}"
    im.save(settings.directory / filename)

    podcast.image_filename = filename
    session.add(podcast)
    session.commit()

    # Remove the old cover only after the new one is stored and committed, so a
    # failed save never leaves the podcast pointing at a deleted file.
    if previous_filename is not None:
        (settings.directory / previous_filename).unlink(missing_ok=True)

    session.refresh(podcast)
    return podcast


@app.delete("/api/podcasts/{podcast_id}")
def delete_podcast(auth: AuthDep, session: SessionDep, podcast_id: str):
    """Delete a podcast owned by the authenticated user."""
    podcast = _get_podcast_or_404(session, podcast_id)
    _ensure_owner(podcast, auth, "You do not have permission to delete this podcast")
    session.delete(podcast)
    session.commit()
    return {"ok": True}


def finish_processing(
    podcast_id: str,
    episode: models.PodcastEpisode,
    duration: Optional[float],
    file_hash: str,
    file_size: int,
):
    """Persist an episode together with the metadata of its processed audio.

    Used as the callback handed to ``audio_processor.add_file``; it opens its
    own session because it does not run inside a request's session dependency.
    If the podcast no longer exists the episode is not saved and an error is
    logged.
    """
    with Session(models.engine) as session:
        log.info("Saving episode %s", episode.id)
        podcast = session.get(models.Podcast, podcast_id)
        if podcast is None:
            log.error("Failed processing episode, podcast does not exist.")
            return
        episode.duration = duration
        episode.file_hash = file_hash
        episode.file_size = file_size
        episode.podcast_id = podcast.id
        session.add(episode)
        session.commit()


@app.post("/api/podcasts/{podcast_id}/episodes")
async def admin_upload_episode(
    auth: AuthDep,
    session: SessionDep,
    request: Request,
    podcast_id: str,
    file: UploadFile,
):
    """Receive one chunk of an episode upload; queue processing after the last chunk.

    Chunk metadata is read from the `uploader-file-id`, `uploader-chunks-total`,
    `uploader-chunk-number` and `name` request headers.
    """
    file_id = request.headers.get("uploader-file-id")
    episode_name = request.headers.get("name")
    if file_id is None or episode_name is None:
        raise HTTPException(400, "Invalid request")
    chunks_total = _int_header(request, "uploader-chunks-total")
    chunk_number = _int_header(request, "uploader-chunk-number")

    # Keep only alphanumerics so the id is safe to embed in a file name.
    file_id = "".join(c for c in file_id if c.isalnum())
    if not file_id or not 0 <= chunk_number < chunks_total:
        raise HTTPException(400, "Invalid request")

    podcast = _get_podcast_or_404(session, podcast_id)
    _ensure_owner(
        podcast, auth, "You do not have permission to add episodes to this podcast"
    )

    uploads_directory = settings.uploads_directory
    uploads_directory.mkdir(parents=True, exist_ok=True)

    # Stream the chunk to disk in bounded reads instead of loading it whole.
    with open(uploads_directory / f"{file_id}_{chunk_number}", "wb") as buf:
        while data := await file.read(_UPLOAD_READ_SIZE):
            buf.write(data)

    if chunk_number + 1 != chunks_total:
        return {"message": "Chunk Uploaded"}

    # Verify every chunk is present before consuming any of them, so an
    # incomplete upload is rejected without destroying the chunks received.
    if any(
        not (uploads_directory / f"{file_id}_{index}").is_file()
        for index in range(chunks_total)
    ):
        raise HTTPException(400, "Upload is incomplete, some chunks are missing")

    episode = models.PodcastEpisode(
        name=Path(urllib.parse.unquote(episode_name)).stem,
        file_size=0,
        file_hash="",
    )
    upload_path = uploads_directory / episode.id
    with open(upload_path, "wb") as buf:
        for index in range(chunks_total):
            chunk_path = uploads_directory / f"{file_id}_{index}"
            with open(chunk_path, "rb") as infile:
                shutil.copyfileobj(infile, buf)
            chunk_path.unlink()

    audio_processor.add_file(
        upload_path,
        settings.directory / f"{episode.id}.m4a",
        lambda duration, file_hash, file_size: finish_processing(
            podcast_id, episode, duration, file_hash, file_size
        ),
    )
    return {"message": "File Uploaded"}


@app.get(
    "/api/podcasts/{podcast_id}/episodes",
    response_model=list[models.PodcastEpisodePublic],
)
def read_episodes(session: SessionDep, podcast_id: str):
    """List the episodes of a podcast, ordered by publish date descending."""
    episodes = session.exec(
        select(models.PodcastEpisode)
        .where(models.PodcastEpisode.podcast_id == podcast_id)
        .order_by(desc(models.PodcastEpisode.publish_date))
    ).all()
    return episodes


@app.get(
    "/api/podcasts/{podcast_id}/episodes/{episode_id}",
    response_model=models.PodcastEpisodePublic,
)
def read_episode(session: SessionDep, podcast_id: str, episode_id: str):
    """Return a single episode of a podcast, or 404 if it does not exist."""
    return _get_episode_or_404(session, podcast_id, episode_id)


@app.patch(
    "/api/podcasts/{podcast_id}/episodes/{episode_id}",
    response_model=models.PodcastEpisodePublic,
)
def update_episode(
    auth: AuthDep,
    session: SessionDep,
    podcast_id: str,
    episode_id: str,
    episode: models.PodcastEpisodeUpdate,
):
    """Apply the supplied fields to an episode whose podcast the user owns."""
    db_episode = _get_episode_or_404(session, podcast_id, episode_id)
    _ensure_owner(
        db_episode.podcast, auth, "You do not have permission to update this episode"
    )
    episode_data = episode.model_dump(exclude_unset=True)
    db_episode.sqlmodel_update(episode_data)
    session.add(db_episode)
    session.commit()
    session.refresh(db_episode)
    return db_episode


@app.post(
    "/api/podcasts/{podcast_id}/episodes/{episode_id}",
    response_model=models.PodcastEpisodePublic,
)
async def episode_additional_upload(
    auth: AuthDep,
    session: SessionDep,
    podcast_id: str,
    episode_id: str,
    file: UploadFile,
):
    """Attach an additional uploaded file to an episode whose podcast the user owns."""
    db_episode = _get_episode_or_404(session, podcast_id, episode_id)
    _ensure_owner(
        db_episode.podcast, auth, "You do not have permission to update this episode"
    )
    new_episode = await process_additional_episode_upload(db_episode, file)
    session.add(new_episode)
    session.commit()
    session.refresh(new_episode)
    return new_episode


@app.delete("/api/podcasts/{podcast_id}/episodes/{episode_id}")
def delete_episode(
    auth: AuthDep, session: SessionDep, podcast_id: str, episode_id: str
):
    """Delete an episode whose podcast the authenticated user owns."""
    episode = _get_episode_or_404(session, podcast_id, episode_id)
    _ensure_owner(
        episode.podcast, auth, "You do not have permission to delete this episode"
    )
    session.delete(episode)
    session.commit()
    return {"ok": True}


@app.get("/{podcast_id}.xml")
def get_podcast_feed(session: SessionDep, request: Request, podcast_id: str):
    """Return the RSS feed of a podcast as XML."""
    podcast = _get_podcast_or_404(session, podcast_id)
    base_url = str(request.base_url)

    image_url = None
    if podcast.image_filename is not None:
        image_url = urllib.parse.urljoin(
            base_url, f"/files/{podcast.id}/{podcast.image_filename}"
        )

    feed = podgen.Podcast(
        name=podcast.name,
        description=podcast.description,
        image=image_url,
        website=urllib.parse.urljoin(base_url, podcast.id),
        explicit=podcast.explicit,
        feed_url=str(request.url),
    )
    for episode in podcast.episodes:
        feed.add_episode(
            podgen.Episode(
                id=episode.id,
                title=episode.name,
                publication_date=episode.publish_date.astimezone(tz=timezone.utc),
                media=podgen.Media(
                    urllib.parse.urljoin(
                        base_url, f"/files/{podcast.id}/{episode.id}.m4a"
                    ),
                    episode.file_size,
                    duration=timedelta(seconds=episode.duration)
                    if episode.duration is not None
                    else None,
                ),
                long_summary=render_markdown(episode.description),
            )
        )
    return Response(content=feed.rss_str(), media_type="application/xml")


@app.get("/files/{podcast_id}/{filename}")
def get_episode_or_cover(session: SessionDep, podcast_id: str, filename: str):
    """Serve an episode's audio file or the podcast's current cover image."""
    podcast = _get_podcast_or_404(session, podcast_id)

    if filename.endswith(".m4a"):
        # Serve by the id stored in the database, never by the raw path segment.
        episode = _get_episode_or_404(
            session,
            podcast_id,
            filename.removesuffix(".m4a"),
            detail="Episode or podcast not found",
        )
        return FileResponse(settings.directory / f"{episode.id}.m4a")

    # Only the podcast's currently registered cover image may be fetched.
    if filename.endswith(_ALLOWED_IMAGE_SUFFIXES) and filename == podcast.image_filename:
        return FileResponse(
            settings.directory / podcast.image_filename,
            headers={"Access-Control-Allow-Origin": "*"},
        )
    raise HTTPException(status_code=404, detail="File not found")


@app.get("/{full_path:path}")
async def serve_app(full_path: str):
    """Serve the frontend entry page for any path outside `api/`."""
    if not full_path.startswith("api/"):
        return FileResponse("dist/index.html")
    raise HTTPException(status_code=404, detail="Not found")


use_route_names_as_operation_ids(app)