podcast-generator/main.py
Jake Walker dd268a8028
All checks were successful
ci/woodpecker/push/build Pipeline was successful
web app rewrite
2025-01-10 13:30:09 +00:00

285 lines
8 KiB
Python

import urllib.parse
from datetime import timedelta
from pathlib import Path
from typing import Annotated, Optional
import aiofiles
import podgen
import structlog
from fastapi import FastAPI, Form, HTTPException, Request, Response
from fastapi.responses import FileResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
import data
from process import AudioProcessor
from settings import settings
log = structlog.get_logger()
app = FastAPI()
templates = Jinja2Templates(directory="templates")
audio_processor = AudioProcessor()
audio_processor.start_processing()
@app.get("/admin")
def admin_list_feeds(request: Request):
repo = data.load_repository()
return templates.TemplateResponse(
request=request,
name="admin_feeds.html.j2",
context={"repo": repo},
)
@app.get("/admin/{feed_id}")
def admin_list_feed(request: Request, feed_id: str):
repo = data.load_repository()
feed = next(
(podcast for id, podcast in repo.podcasts.items() if id == feed_id), None
)
if feed is None:
raise HTTPException(status_code=404, detail="Podcast not found")
return templates.TemplateResponse(
request=request,
name="admin_feed.html.j2",
context={
"feed": feed,
"id": feed_id,
"feed_uri": urllib.parse.urljoin(str(request.base_url), f"{feed_id}.xml"),
},
)
def finish_processing(
feed_id: str,
episode: data.Episode,
duration: Optional[float],
file_hash: str,
file_size: int,
):
log.info("Saving episode %s", episode.id)
repo = data.load_repository()
episode.duration = duration
episode.file_hash = file_hash
episode.file_size = file_size
repo.podcasts[feed_id].episodes.insert(0, episode)
data.save_repository(repo)
@app.post("/admin/{feed_id}/upload")
async def admin_upload_episode(request: Request, feed_id: str):
repo = data.load_repository()
if feed_id not in repo.podcasts:
raise HTTPException(status_code=404, detail="Podcast not found")
try:
filename = Path(urllib.parse.unquote(request.headers["filename"]))
episode = data.Episode(name=filename.stem, file_size=0, file_hash="")
settings.uploads_directory.mkdir(parents=True, exist_ok=True)
upload_path = settings.uploads_directory / episode.id
async with aiofiles.open(upload_path, "wb") as f:
async for chunk in request.stream():
await f.write(chunk)
audio_processor.add_file(
upload_path,
settings.directory / f"{episode.id}.m4a",
lambda duration, file_hash, file_size: finish_processing(
feed_id, episode, duration, file_hash, file_size
),
)
except Exception as e:
log.error("Failed to upload file", error=e)
raise HTTPException(status_code=500, detail="Something went wrong")
return {"ok": True}
@app.get("/admin/{feed_id}/{episode_id}/delete")
def admin_delete_episode(feed_id: str, episode_id: str):
repo = data.load_repository()
if feed_id not in repo.podcasts:
raise HTTPException(status_code=404, detail="Podcast not found")
(settings.directory / f"{episode_id}.m4a").unlink()
new_feed = repo.podcasts[feed_id]
new_feed.episodes = [
episode for episode in new_feed.episodes if episode.id != episode_id
]
repo.podcasts[feed_id] = new_feed
data.save_repository(repo)
return RedirectResponse(f"/admin/{feed_id}", status_code=303)
@app.get("/admin/{feed_id}/{episode_id}/edit")
def admin_edit_episode(request: Request, feed_id: str, episode_id: str):
repo = data.load_repository()
if feed_id not in repo.podcasts:
raise HTTPException(status_code=404, detail="Podcast not found")
episode = next(
(
episode
for episode in repo.podcasts[feed_id].episodes
if episode.id == episode_id
),
None,
)
if episode is None:
raise HTTPException(status_code=404, detail="Episode not found")
return templates.TemplateResponse(
request=request,
name="admin_episode_edit.html.j2",
context={"episode": episode},
)
@app.post("/admin/{feed_id}/{episode_id}/edit")
def admin_edit_episode_post(
feed_id: str,
episode_id: str,
name: Annotated[str, Form()],
description: Annotated[str, Form()],
):
repo = data.load_repository()
if feed_id not in repo.podcasts:
raise HTTPException(status_code=404, detail="Podcast not found")
episode_idx = next(
(
idx
for idx, episode in enumerate(repo.podcasts[feed_id].episodes)
if episode.id == episode_id
),
None,
)
if episode_idx is None:
raise HTTPException(status_code=404, detail="Episode not found")
if name.strip() != "":
repo.podcasts[feed_id].episodes[episode_idx].name = name
if description.strip() != "":
repo.podcasts[feed_id].episodes[episode_idx].description = description
else:
repo.podcasts[feed_id].episodes[episode_idx].description = None
data.save_repository(repo)
return RedirectResponse(f"/admin/{feed_id}", status_code=303)
@app.get("/admin/{feed_id}/edit")
def admin_edit_feed(request: Request, feed_id: str):
repo = data.load_repository()
if feed_id not in repo.podcasts:
raise HTTPException(status_code=404, detail="Podcast not found")
return templates.TemplateResponse(
request=request,
name="admin_feed_edit.html.j2",
context={"feed": repo.podcasts[feed_id]},
)
@app.post("/admin/{feed_id}/edit")
def admin_edit_feed_post(
feed_id: str,
name: Annotated[str, Form()],
description: Annotated[str, Form()],
):
repo = data.load_repository()
if feed_id not in repo.podcasts:
raise HTTPException(status_code=404, detail="Podcast not found")
new_feed = repo.podcasts[feed_id]
if name.strip() != "":
new_feed.name = name
new_feed.description = description
repo.podcasts[feed_id] = new_feed
data.save_repository(repo)
return RedirectResponse(f"/admin/{feed_id}", status_code=303)
@app.get("/{feed_id}.xml")
def get_feed(request: Request, feed_id: str):
repo = data.load_repository()
feed = next(
(podcast for id, podcast in repo.podcasts.items() if id == feed_id), None
)
if feed is None:
raise HTTPException(status_code=404, detail="Podcast not found")
podcast = podgen.Podcast(
name=feed.name,
description=feed.description,
website=urllib.parse.urljoin(str(request.base_url), feed_id),
explicit=feed.explicit,
feed_url=str(request.url),
)
for episode in feed.episodes:
podcast.add_episode(
podgen.Episode(
id=episode.id,
title=episode.name,
publication_date=episode.publish_date,
media=podgen.Media(
urllib.parse.urljoin(
str(request.base_url), f"{feed_id}/{episode.id}.m4a"
),
episode.file_size,
duration=timedelta(seconds=episode.duration)
if episode.duration is not None
else None,
),
long_summary=episode.description,
)
)
return Response(content=podcast.rss_str(), media_type="application/xml")
@app.get("/{feed_id}/{episode_id}")
def get_episode(feed_id: str, episode_id: str):
episode_id = episode_id.removesuffix(".m4a")
repo = data.load_repository()
feed = next(
(podcast for id, podcast in repo.podcasts.items() if id == feed_id), None
)
if feed is None:
raise HTTPException(status_code=404, detail="Podcast not found")
episode = next(
(episode for episode in feed.episodes if episode.id == episode_id), None
)
if episode is None:
raise HTTPException(status_code=404, detail="Episode not found")
return FileResponse(settings.directory / f"{episode_id}.m4a")