import queue import threading from pathlib import Path from typing import Optional import structlog from ffmpeg_normalize import FFmpegNormalize from settings import Settings DELETE_INPUTS = Settings().delete_consume_files log = structlog.get_logger() class AudioProcessor: def __init__(self): self.queue: queue.Queue[(Path, Path)] = queue.Queue() self.is_running = False self.processor_thread: Optional[threading.Thread] = None def add_file(self, input_filename: Path, output_filename: Path) -> None: self.queue.put((input_filename, output_filename)) log.debug( "Added file for processing", input_filename=input_filename, output_filename=output_filename, ) def start_processing(self) -> None: if self.is_running: return self.is_running = True self.processor_thread = threading.Thread(target=self._process_queue) self.processor_thread.daemon = True self.processor_thread.start() log.info("Started audio processing queue") def stop_processing(self) -> None: self.is_running = False def process_audio(self, input_filename: Path, output_filename: Path) -> None: log.info( "Processing file", input_filename=input_filename, output_filename=output_filename, ) ffmpeg_normalize = FFmpegNormalize( "ebu", audio_codec="aac", audio_bitrate="192k" ) ffmpeg_normalize.add_media_file(str(input_filename), str(output_filename)) ffmpeg_normalize.run_normalization() # delete the original if DELETE_INPUTS: output_filename.unlink() def _process_queue(self) -> None: while self.is_running: try: (input_filename, output_filename) = self.queue.get(timeout=1.0) try: self.process_audio(input_filename, output_filename) log.info("Finished processing", output_filename=output_filename) except Exception as e: log.error( "Failed processing", input_filename=input_filename, output_filename=output_filename, error=e, ) self.queue.task_done() except queue.Empty: continue