361 lines
11 KiB
Python
361 lines
11 KiB
Python
import urllib.parse
|
|
import uuid
|
|
from datetime import timedelta
|
|
from pathlib import Path
|
|
from typing import Annotated, Optional
|
|
|
|
import podgen
|
|
import structlog
|
|
from fastapi import FastAPI, Form, HTTPException, Request, Response, UploadFile
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import FileResponse, JSONResponse, RedirectResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from PIL import Image
|
|
|
|
import data
|
|
from process import AudioProcessor
|
|
from settings import settings
|
|
|
|
log = structlog.get_logger()
|
|
|
|
app = FastAPI()
|
|
app.add_middleware(
|
|
CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]
|
|
)
|
|
templates = Jinja2Templates(directory="templates")
|
|
|
|
audio_processor = AudioProcessor()
|
|
audio_processor.start_processing()
|
|
|
|
|
|
@app.get("/admin")
|
|
def admin_list_feeds(request: Request):
|
|
repo = data.load_repository()
|
|
return templates.TemplateResponse(
|
|
request=request,
|
|
name="admin_feeds.html.j2",
|
|
context={"repo": repo},
|
|
)
|
|
|
|
|
|
@app.get("/admin/{feed_id}")
|
|
def admin_list_feed(request: Request, feed_id: str):
|
|
repo = data.load_repository()
|
|
feed = next(
|
|
(podcast for id, podcast in repo.podcasts.items() if id == feed_id), None
|
|
)
|
|
|
|
if feed is None:
|
|
raise HTTPException(status_code=404, detail="Podcast not found")
|
|
|
|
return templates.TemplateResponse(
|
|
request=request,
|
|
name="admin_feed.html.j2",
|
|
context={
|
|
"feed": feed,
|
|
"id": feed_id,
|
|
"feed_uri": urllib.parse.urljoin(str(request.base_url), f"{feed_id}.xml"),
|
|
},
|
|
)
|
|
|
|
|
|
def finish_processing(
|
|
feed_id: str,
|
|
episode: data.Episode,
|
|
duration: Optional[float],
|
|
file_hash: str,
|
|
file_size: int,
|
|
):
|
|
log.info("Saving episode %s", episode.id)
|
|
repo = data.load_repository()
|
|
episode.duration = duration
|
|
episode.file_hash = file_hash
|
|
episode.file_size = file_size
|
|
repo.podcasts[feed_id].episodes.insert(0, episode)
|
|
data.save_repository(repo)
|
|
|
|
|
|
@app.post("/admin/{feed_id}/upload")
|
|
async def admin_upload_episode(request: Request, feed_id: str, file: UploadFile):
|
|
file_id = request.headers.get("uploader-file-id")
|
|
chunks_total = int(request.headers.get("uploader-chunks-total"))
|
|
chunk_number = int(request.headers.get("uploader-chunk-number"))
|
|
episode_name = request.headers.get("name")
|
|
|
|
if file_id is None or episode_name is None:
|
|
raise HTTPException(400, "Invalid request")
|
|
|
|
file_id = "".join(c for c in file_id if c.isalnum()).strip()
|
|
|
|
repo = data.load_repository()
|
|
|
|
if feed_id not in repo.podcasts:
|
|
raise HTTPException(status_code=404, detail="Podcast not found")
|
|
|
|
is_last = (chunk_number + 1) == chunks_total
|
|
|
|
file_name = f"{file_id}_{chunk_number}"
|
|
|
|
settings.uploads_directory.mkdir(parents=True, exist_ok=True)
|
|
|
|
with open(settings.uploads_directory / file_name, "wb") as buf:
|
|
buf.write(await file.read())
|
|
|
|
if is_last:
|
|
episode = data.Episode(
|
|
name=Path(urllib.parse.unquote(episode_name)).stem,
|
|
file_size=0,
|
|
file_hash="",
|
|
)
|
|
upload_path = settings.uploads_directory / episode.id
|
|
with open(upload_path, "wb") as buf:
|
|
chunk = 0
|
|
while chunk < chunks_total:
|
|
chunk_path = settings.uploads_directory / f"{file_id}_{chunk}"
|
|
with open(chunk_path, "rb") as infile:
|
|
buf.write(infile.read())
|
|
infile.close()
|
|
|
|
chunk_path.unlink()
|
|
chunk += 1
|
|
|
|
audio_processor.add_file(
|
|
upload_path,
|
|
settings.directory / f"{episode.id}.m4a",
|
|
lambda duration, file_hash, file_size: finish_processing(
|
|
feed_id, episode, duration, file_hash, file_size
|
|
),
|
|
)
|
|
|
|
return JSONResponse({"message": "File Uploaded"}, status_code=200)
|
|
|
|
return JSONResponse({"message": "Chunk Uploaded"}, status_code=200)
|
|
|
|
|
|
@app.get("/admin/{feed_id}/{episode_id}/delete")
|
|
def admin_delete_episode(feed_id: str, episode_id: str):
|
|
repo = data.load_repository()
|
|
|
|
if feed_id not in repo.podcasts:
|
|
raise HTTPException(status_code=404, detail="Podcast not found")
|
|
|
|
(settings.directory / f"{episode_id}.m4a").unlink()
|
|
|
|
new_feed = repo.podcasts[feed_id]
|
|
new_feed.episodes = [
|
|
episode for episode in new_feed.episodes if episode.id != episode_id
|
|
]
|
|
repo.podcasts[feed_id] = new_feed
|
|
data.save_repository(repo)
|
|
|
|
return RedirectResponse(f"/admin/{feed_id}", status_code=303)
|
|
|
|
|
|
@app.get("/admin/{feed_id}/{episode_id}/edit")
|
|
def admin_edit_episode(request: Request, feed_id: str, episode_id: str):
|
|
repo = data.load_repository()
|
|
|
|
if feed_id not in repo.podcasts:
|
|
raise HTTPException(status_code=404, detail="Podcast not found")
|
|
|
|
episode = next(
|
|
(
|
|
episode
|
|
for episode in repo.podcasts[feed_id].episodes
|
|
if episode.id == episode_id
|
|
),
|
|
None,
|
|
)
|
|
|
|
if episode is None:
|
|
raise HTTPException(status_code=404, detail="Episode not found")
|
|
|
|
return templates.TemplateResponse(
|
|
request=request,
|
|
name="admin_episode_edit.html.j2",
|
|
context={"episode": episode},
|
|
)
|
|
|
|
|
|
@app.post("/admin/{feed_id}/{episode_id}/edit")
|
|
def admin_edit_episode_post(
|
|
feed_id: str,
|
|
episode_id: str,
|
|
name: Annotated[str, Form()],
|
|
description: Annotated[str, Form()],
|
|
):
|
|
repo = data.load_repository()
|
|
|
|
if feed_id not in repo.podcasts:
|
|
raise HTTPException(status_code=404, detail="Podcast not found")
|
|
|
|
episode_idx = next(
|
|
(
|
|
idx
|
|
for idx, episode in enumerate(repo.podcasts[feed_id].episodes)
|
|
if episode.id == episode_id
|
|
),
|
|
None,
|
|
)
|
|
|
|
if episode_idx is None:
|
|
raise HTTPException(status_code=404, detail="Episode not found")
|
|
|
|
if name.strip() != "":
|
|
repo.podcasts[feed_id].episodes[episode_idx].name = name
|
|
|
|
if description.strip() != "":
|
|
repo.podcasts[feed_id].episodes[episode_idx].description = description
|
|
else:
|
|
repo.podcasts[feed_id].episodes[episode_idx].description = None
|
|
|
|
data.save_repository(repo)
|
|
|
|
return RedirectResponse(f"/admin/{feed_id}", status_code=303)
|
|
|
|
|
|
@app.get("/admin/{feed_id}/edit")
|
|
def admin_edit_feed(request: Request, feed_id: str):
|
|
repo = data.load_repository()
|
|
|
|
if feed_id not in repo.podcasts:
|
|
raise HTTPException(status_code=404, detail="Podcast not found")
|
|
|
|
return templates.TemplateResponse(
|
|
request=request,
|
|
name="admin_feed_edit.html.j2",
|
|
context={"feed": repo.podcasts[feed_id]},
|
|
)
|
|
|
|
|
|
@app.post("/admin/{feed_id}/edit")
|
|
def admin_edit_feed_post(
|
|
feed_id: str,
|
|
name: Annotated[str, Form()],
|
|
description: Annotated[str, Form()],
|
|
image: Optional[UploadFile] = None,
|
|
):
|
|
repo = data.load_repository()
|
|
|
|
if feed_id not in repo.podcasts:
|
|
raise HTTPException(status_code=404, detail="Podcast not found")
|
|
|
|
new_feed = repo.podcasts[feed_id]
|
|
|
|
if name.strip() != "":
|
|
new_feed.name = name
|
|
|
|
new_feed.description = description
|
|
|
|
if image is not None and image.size > 0:
|
|
if not (image.filename.endswith(".jpg") or image.filename.endswith(".png")):
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="The uploaded podcast image must be a jpg or png",
|
|
)
|
|
|
|
im = Image.open(image.file)
|
|
|
|
if im.size[0] != im.size[1] or im.size[0] < 1400 or im.size[0] > 3000:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="The uploaded podcast image must be square and between 1400x1400px and 3000x3000px in size",
|
|
)
|
|
|
|
if im.mode != "RGB":
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="The uploaded podcast image must be in RGB format",
|
|
)
|
|
|
|
if new_feed.image_filename is not None:
|
|
(settings.directory / new_feed.image_filename).unlink(missing_ok=True)
|
|
|
|
filename = f"img_{uuid.uuid4()}" + Path(image.filename).suffix
|
|
im.save(settings.directory / filename)
|
|
new_feed.image_filename = filename
|
|
|
|
repo.podcasts[feed_id] = new_feed
|
|
|
|
data.save_repository(repo)
|
|
|
|
return RedirectResponse(f"/admin/{feed_id}", status_code=303)
|
|
|
|
|
|
@app.get("/{feed_id}.xml")
|
|
def get_feed(request: Request, feed_id: str):
|
|
repo = data.load_repository()
|
|
feed = next(
|
|
(podcast for id, podcast in repo.podcasts.items() if id == feed_id), None
|
|
)
|
|
|
|
if feed is None:
|
|
raise HTTPException(status_code=404, detail="Podcast not found")
|
|
|
|
podcast = podgen.Podcast(
|
|
name=feed.name,
|
|
description=feed.description,
|
|
image=urllib.parse.urljoin(
|
|
str(request.base_url),
|
|
f"/{feed_id}/{feed.image_filename}",
|
|
)
|
|
if feed.image_filename is not None
|
|
else None,
|
|
website=urllib.parse.urljoin(str(request.base_url), feed_id),
|
|
explicit=feed.explicit,
|
|
feed_url=str(request.url),
|
|
)
|
|
|
|
for episode in feed.episodes:
|
|
podcast.add_episode(
|
|
podgen.Episode(
|
|
id=episode.id,
|
|
title=episode.name,
|
|
publication_date=episode.publish_date,
|
|
media=podgen.Media(
|
|
urllib.parse.urljoin(
|
|
str(request.base_url), f"{feed_id}/{episode.id}.m4a"
|
|
),
|
|
episode.file_size,
|
|
duration=timedelta(seconds=episode.duration)
|
|
if episode.duration is not None
|
|
else None,
|
|
),
|
|
long_summary=episode.description,
|
|
)
|
|
)
|
|
|
|
return Response(content=podcast.rss_str(), media_type="application/xml")
|
|
|
|
|
|
@app.get("/{feed_id}/{filename}")
|
|
def get_episode_or_cover(feed_id: str, filename: str):
|
|
repo = data.load_repository()
|
|
feed = next(
|
|
(podcast for id, podcast in repo.podcasts.items() if id == feed_id), None
|
|
)
|
|
|
|
if feed is None:
|
|
raise HTTPException(status_code=404, detail="Podcast not found")
|
|
|
|
if filename.endswith(".m4a"):
|
|
episode = next(
|
|
(
|
|
episode
|
|
for episode in feed.episodes
|
|
if episode.id == filename.removesuffix(".m4a")
|
|
),
|
|
None,
|
|
)
|
|
|
|
if episode is None:
|
|
raise HTTPException(status_code=404, detail="Episode not found")
|
|
|
|
return FileResponse(settings.directory / f"{episode.id}.m4a")
|
|
|
|
elif (
|
|
filename.endswith(".jpg") or filename.endswith(".png")
|
|
) and filename == feed.image_filename:
|
|
return FileResponse(settings.directory / feed.image_filename)
|
|
|
|
return HTTPException(status_code=404, detail="File not found")
|