import urllib.parse from datetime import timedelta from pathlib import Path from typing import Annotated, Optional import podgen import structlog from fastapi import FastAPI, Form, HTTPException, Request, Response, UploadFile from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, JSONResponse, RedirectResponse from fastapi.templating import Jinja2Templates import data from process import AudioProcessor from settings import settings log = structlog.get_logger() app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"] ) templates = Jinja2Templates(directory="templates") audio_processor = AudioProcessor() audio_processor.start_processing() @app.get("/admin") def admin_list_feeds(request: Request): repo = data.load_repository() return templates.TemplateResponse( request=request, name="admin_feeds.html.j2", context={"repo": repo}, ) @app.get("/admin/{feed_id}") def admin_list_feed(request: Request, feed_id: str): repo = data.load_repository() feed = next( (podcast for id, podcast in repo.podcasts.items() if id == feed_id), None ) if feed is None: raise HTTPException(status_code=404, detail="Podcast not found") return templates.TemplateResponse( request=request, name="admin_feed.html.j2", context={ "feed": feed, "id": feed_id, "feed_uri": urllib.parse.urljoin(str(request.base_url), f"{feed_id}.xml"), }, ) def finish_processing( feed_id: str, episode: data.Episode, duration: Optional[float], file_hash: str, file_size: int, ): log.info("Saving episode %s", episode.id) repo = data.load_repository() episode.duration = duration episode.file_hash = file_hash episode.file_size = file_size repo.podcasts[feed_id].episodes.insert(0, episode) data.save_repository(repo) @app.post("/admin/{feed_id}/upload") async def admin_upload_episode(request: Request, feed_id: str, file: UploadFile): file_id = request.headers.get("uploader-file-id") chunks_total = int(request.headers.get("uploader-chunks-total")) chunk_number = int(request.headers.get("uploader-chunk-number")) episode_name = request.headers.get("name") if file_id is None or episode_name is None: raise HTTPException(400, "Invalid request") file_id = "".join(c for c in file_id if c.isalnum()).strip() repo = data.load_repository() if feed_id not in repo.podcasts: raise HTTPException(status_code=404, detail="Podcast not found") is_last = (chunk_number + 1) == chunks_total file_name = f"{file_id}_{chunk_number}" settings.uploads_directory.mkdir(parents=True, exist_ok=True) with open(settings.uploads_directory / file_name, "wb") as buf: buf.write(await file.read()) if is_last: episode = data.Episode( name=Path(urllib.parse.unquote(episode_name)).stem, file_size=0, file_hash="", ) upload_path = settings.uploads_directory / episode.id with open(upload_path, "wb") as buf: chunk = 0 while chunk < chunks_total: chunk_path = settings.uploads_directory / f"{file_id}_{chunk}" with open(chunk_path, "rb") as infile: buf.write(infile.read()) infile.close() chunk_path.unlink() chunk += 1 audio_processor.add_file( upload_path, settings.directory / f"{episode.id}.m4a", lambda duration, file_hash, file_size: finish_processing( feed_id, episode, duration, file_hash, file_size ), ) return JSONResponse({"message": "File Uploaded"}, status_code=200) return JSONResponse({"message": "Chunk Uploaded"}, status_code=200) @app.get("/admin/{feed_id}/{episode_id}/delete") def admin_delete_episode(feed_id: str, episode_id: str): repo = data.load_repository() if feed_id not in repo.podcasts: raise HTTPException(status_code=404, detail="Podcast not found") (settings.directory / f"{episode_id}.m4a").unlink() new_feed = repo.podcasts[feed_id] new_feed.episodes = [ episode for episode in new_feed.episodes if episode.id != episode_id ] repo.podcasts[feed_id] = new_feed data.save_repository(repo) return RedirectResponse(f"/admin/{feed_id}", status_code=303) @app.get("/admin/{feed_id}/{episode_id}/edit") def admin_edit_episode(request: Request, feed_id: str, episode_id: str): repo = data.load_repository() if feed_id not in repo.podcasts: raise HTTPException(status_code=404, detail="Podcast not found") episode = next( ( episode for episode in repo.podcasts[feed_id].episodes if episode.id == episode_id ), None, ) if episode is None: raise HTTPException(status_code=404, detail="Episode not found") return templates.TemplateResponse( request=request, name="admin_episode_edit.html.j2", context={"episode": episode}, ) @app.post("/admin/{feed_id}/{episode_id}/edit") def admin_edit_episode_post( feed_id: str, episode_id: str, name: Annotated[str, Form()], description: Annotated[str, Form()], ): repo = data.load_repository() if feed_id not in repo.podcasts: raise HTTPException(status_code=404, detail="Podcast not found") episode_idx = next( ( idx for idx, episode in enumerate(repo.podcasts[feed_id].episodes) if episode.id == episode_id ), None, ) if episode_idx is None: raise HTTPException(status_code=404, detail="Episode not found") if name.strip() != "": repo.podcasts[feed_id].episodes[episode_idx].name = name if description.strip() != "": repo.podcasts[feed_id].episodes[episode_idx].description = description else: repo.podcasts[feed_id].episodes[episode_idx].description = None data.save_repository(repo) return RedirectResponse(f"/admin/{feed_id}", status_code=303) @app.get("/admin/{feed_id}/edit") def admin_edit_feed(request: Request, feed_id: str): repo = data.load_repository() if feed_id not in repo.podcasts: raise HTTPException(status_code=404, detail="Podcast not found") return templates.TemplateResponse( request=request, name="admin_feed_edit.html.j2", context={"feed": repo.podcasts[feed_id]}, ) @app.post("/admin/{feed_id}/edit") def admin_edit_feed_post( feed_id: str, name: Annotated[str, Form()], description: Annotated[str, Form()], ): repo = data.load_repository() if feed_id not in repo.podcasts: raise HTTPException(status_code=404, detail="Podcast not found") new_feed = repo.podcasts[feed_id] if name.strip() != "": new_feed.name = name new_feed.description = description repo.podcasts[feed_id] = new_feed data.save_repository(repo) return RedirectResponse(f"/admin/{feed_id}", status_code=303) @app.get("/{feed_id}.xml") def get_feed(request: Request, feed_id: str): repo = data.load_repository() feed = next( (podcast for id, podcast in repo.podcasts.items() if id == feed_id), None ) if feed is None: raise HTTPException(status_code=404, detail="Podcast not found") podcast = podgen.Podcast( name=feed.name, description=feed.description, website=urllib.parse.urljoin(str(request.base_url), feed_id), explicit=feed.explicit, feed_url=str(request.url), ) for episode in feed.episodes: podcast.add_episode( podgen.Episode( id=episode.id, title=episode.name, publication_date=episode.publish_date, media=podgen.Media( urllib.parse.urljoin( str(request.base_url), f"{feed_id}/{episode.id}.m4a" ), episode.file_size, duration=timedelta(seconds=episode.duration) if episode.duration is not None else None, ), long_summary=episode.description, ) ) return Response(content=podcast.rss_str(), media_type="application/xml") @app.get("/{feed_id}/{episode_id}") def get_episode(feed_id: str, episode_id: str): episode_id = episode_id.removesuffix(".m4a") repo = data.load_repository() feed = next( (podcast for id, podcast in repo.podcasts.items() if id == feed_id), None ) if feed is None: raise HTTPException(status_code=404, detail="Podcast not found") episode = next( (episode for episode in feed.episodes if episode.id == episode_id), None ) if episode is None: raise HTTPException(status_code=404, detail="Episode not found") return FileResponse(settings.directory / f"{episode_id}.m4a")