podcast-generator/process.py
Jake Walker dd268a8028
All checks were successful
ci/woodpecker/push/build Pipeline was successful
web app rewrite
2025-01-10 13:30:09 +00:00

114 lines
3.5 KiB
Python

import hashlib
import queue
import threading
from pathlib import Path
from typing import Callable, Optional
import ffmpeg
import structlog
from ffmpeg_normalize import FFmpegNormalize
log = structlog.get_logger()
class AudioProcessor:
def __init__(self):
self.queue: queue.Queue[
(Path, Path, Callable[[Optional[float], str, int], None])
] = queue.Queue()
self.is_running = False
self.processor_thread: Optional[threading.Thread] = None
def add_file(
self,
input_filename: Path,
output_filename: Path,
generate_callback: Callable[[Optional[float], str, int], None],
) -> None:
self.queue.put((input_filename, output_filename, generate_callback))
log.debug(
"Added file for processing",
input_filename=input_filename,
output_filename=output_filename,
)
def start_processing(self) -> None:
if self.is_running:
return
self.is_running = True
self.processor_thread = threading.Thread(target=self._process_queue)
self.processor_thread.daemon = True
self.processor_thread.start()
log.info("Started audio processing queue")
def stop_processing(self) -> None:
self.is_running = False
def process_audio(
self,
input_filename: Path,
output_filename: Path,
generate_callback: Callable[[Optional[float], str, int], None],
) -> None:
log.info(
"Processing file",
input_filename=input_filename,
output_filename=output_filename,
)
if not input_filename.is_file():
log.error("Could not process non-file", input_filename=input_filename)
return
ffmpeg_normalize = FFmpegNormalize(
"ebu", audio_codec="aac", audio_bitrate="192k"
)
ffmpeg_normalize.add_media_file(str(input_filename), str(output_filename))
ffmpeg_normalize.run_normalization()
# get duration
probe = ffmpeg.probe(str(output_filename))
stream = next(
(stream for stream in probe["streams"] if stream["codec_type"] == "audio"),
None,
)
file_hash = hashlib.sha256()
with open(output_filename, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
file_hash.update(byte_block)
input_filename.unlink()
generate_callback(
float(stream["duration"])
if stream is not None and "duration" in stream
else None,
file_hash.hexdigest(),
output_filename.stat().st_size,
)
def _process_queue(self) -> None:
while self.is_running:
try:
(input_filename, output_filename, generate_callback) = self.queue.get(
timeout=1.0
)
try:
self.process_audio(
input_filename, output_filename, generate_callback
)
log.info("Finished processing", output_filename=output_filename)
except Exception as e:
log.error(
"Failed processing",
input_filename=input_filename,
output_filename=output_filename,
error=e,
)
self.queue.task_done()
except queue.Empty:
continue