import hashlib import queue import threading from pathlib import Path from typing import Callable, Optional import ffmpeg import structlog from ffmpeg_normalize import FFmpegNormalize log = structlog.get_logger() class AudioProcessor: def __init__(self): self.queue: queue.Queue[ (Path, Path, Callable[[Optional[float], str, int], None]) ] = queue.Queue() self.is_running = False self.processor_thread: Optional[threading.Thread] = None def add_file( self, input_filename: Path, output_filename: Path, generate_callback: Callable[[Optional[float], str, int], None], ) -> None: self.queue.put((input_filename, output_filename, generate_callback)) log.debug( "Added file for processing", input_filename=input_filename, output_filename=output_filename, ) def start_processing(self) -> None: if self.is_running: return self.is_running = True self.processor_thread = threading.Thread(target=self._process_queue) self.processor_thread.daemon = True self.processor_thread.start() log.info("Started audio processing queue") def stop_processing(self) -> None: self.is_running = False def process_audio( self, input_filename: Path, output_filename: Path, generate_callback: Callable[[Optional[float], str, int], None], ) -> None: log.info( "Processing file", input_filename=input_filename, output_filename=output_filename, ) if not input_filename.is_file(): log.error("Could not process non-file", input_filename=input_filename) return ffmpeg_normalize = FFmpegNormalize( "peak", audio_codec="aac", audio_bitrate="192k", target_level=-0.1 ) ffmpeg_normalize.add_media_file(str(input_filename), str(output_filename)) ffmpeg_normalize.run_normalization() # get duration probe = ffmpeg.probe(str(output_filename)) stream = next( (stream for stream in probe["streams"] if stream["codec_type"] == "audio"), None, ) file_hash = hashlib.sha256() with open(output_filename, "rb") as f: for byte_block in iter(lambda: f.read(4096), b""): file_hash.update(byte_block) input_filename.unlink() generate_callback( float(stream["duration"]) if stream is not None and "duration" in stream else None, file_hash.hexdigest(), output_filename.stat().st_size, ) def _process_queue(self) -> None: while self.is_running: try: (input_filename, output_filename, generate_callback) = self.queue.get( timeout=1.0 ) try: self.process_audio( input_filename, output_filename, generate_callback ) log.info("Finished processing", output_filename=output_filename) except Exception as e: log.error( "Failed processing", input_filename=input_filename, output_filename=output_filename, error=e, ) self.queue.task_done() except queue.Empty: continue