114 lines
3.5 KiB
Python
114 lines
3.5 KiB
Python
import hashlib
|
|
import queue
|
|
import threading
|
|
from pathlib import Path
|
|
from typing import Callable, Optional
|
|
|
|
import ffmpeg
|
|
import structlog
|
|
from ffmpeg_normalize import FFmpegNormalize
|
|
|
|
# Module-level structlog logger; AudioProcessor emits all of its log events
# (queueing, start, per-file progress and failures) through it.
log = structlog.get_logger()
|
|
|
|
|
|
class AudioProcessor:
    """Queue-backed background worker that normalizes audio files.

    Jobs are enqueued with :meth:`add_file` and consumed one at a time by a
    single daemon thread started via :meth:`start_processing`.  For every job
    the input file is normalized with ``FFmpegNormalize("ebu",
    audio_codec="aac", audio_bitrate="192k")``, the output is probed for its
    duration and hashed with SHA-256, the input file is deleted, and the
    job's callback is invoked with ``(duration, sha256_hexdigest, size)``.
    """

    # Read size used when hashing the output file.
    _HASH_CHUNK_SIZE = 64 * 1024

    def __init__(self):
        """Create an idle processor with an empty job queue."""
        # Each job is one (input path, output path, callback) tuple.  The
        # annotation sits in function scope, so it is never evaluated at
        # runtime; it exists purely for type checkers and readers.
        self.queue: queue.Queue[
            tuple[Path, Path, Callable[[Optional[float], str, int], None]]
        ] = queue.Queue()
        self.is_running = False
        self.processor_thread: Optional[threading.Thread] = None
        # Stop signal owned by the most recently started worker thread.
        self._stop_event: Optional[threading.Event] = None

    def add_file(
        self,
        input_filename: Path,
        output_filename: Path,
        generate_callback: Callable[[Optional[float], str, int], None],
    ) -> None:
        """Enqueue one file for processing.

        Args:
            input_filename: File to normalize.
            output_filename: Where the normalized file is written.
            generate_callback: Called after successful processing with the
                duration (``None`` when it cannot be determined), the
                SHA-256 hex digest and the size in bytes of the output file.
        """
        self.queue.put((input_filename, output_filename, generate_callback))
        log.debug(
            "Added file for processing",
            input_filename=input_filename,
            output_filename=output_filename,
        )

    def start_processing(self) -> None:
        """Start the background worker thread; no-op if already running."""
        if self.is_running:
            return

        # Every worker gets its own stop event.  A worker that was told to
        # stop keeps seeing its event set even after ``is_running`` flips
        # back to True here, so a quick stop/start cannot leave two workers
        # draining the queue side by side indefinitely.
        stop_event = threading.Event()
        self._stop_event = stop_event
        self.is_running = True

        self.processor_thread = threading.Thread(
            target=self._process_queue, args=(stop_event,)
        )
        self.processor_thread.daemon = True
        self.processor_thread.start()
        log.info("Started audio processing queue")

    def stop_processing(self) -> None:
        """Ask the worker to stop; does not wait for it to finish.

        The worker exits after its current job (or its current queue poll)
        completes.
        """
        self.is_running = False
        if self._stop_event is not None:
            self._stop_event.set()

    def process_audio(
        self,
        input_filename: Path,
        output_filename: Path,
        generate_callback: Callable[[Optional[float], str, int], None],
    ) -> None:
        """Normalize one file, delete the input and report the result.

        If ``input_filename`` is not an existing file, an error is logged and
        the method returns without calling ``generate_callback``.

        Args:
            input_filename: File to normalize; deleted once the output has
                been produced and inspected.
            output_filename: Destination of the normalized file.
            generate_callback: Receives ``(duration, sha256_hexdigest,
                size_in_bytes)`` for the output file.

        Raises:
            Exception: Anything raised by normalization, probing, file
                access or the callback is propagated to the caller.
        """
        log.info(
            "Processing file",
            input_filename=input_filename,
            output_filename=output_filename,
        )

        if not input_filename.is_file():
            log.error("Could not process non-file", input_filename=input_filename)
            return

        ffmpeg_normalize = FFmpegNormalize(
            "ebu", audio_codec="aac", audio_bitrate="192k"
        )
        ffmpeg_normalize.add_media_file(str(input_filename), str(output_filename))
        ffmpeg_normalize.run_normalization()

        # Gather everything that can fail *before* deleting the input, so a
        # failure while inspecting the output never costs us the source file.
        duration = self._probe_duration(output_filename)
        digest = self._sha256_hexdigest(output_filename)
        size = output_filename.stat().st_size

        input_filename.unlink()

        generate_callback(duration, digest, size)

    @staticmethod
    def _probe_duration(media_filename: Path) -> Optional[float]:
        """Return the duration of the first audio stream, or ``None``."""
        probe = ffmpeg.probe(str(media_filename))
        for stream in probe["streams"]:
            if stream["codec_type"] == "audio":
                # Only the first audio stream is considered.
                if "duration" in stream:
                    return float(stream["duration"])
                return None
        return None

    @staticmethod
    def _sha256_hexdigest(filename: Path) -> str:
        """Return the SHA-256 hex digest of the file's contents."""
        file_hash = hashlib.sha256()
        with open(filename, "rb") as f:
            for byte_block in iter(
                lambda: f.read(AudioProcessor._HASH_CHUNK_SIZE), b""
            ):
                file_hash.update(byte_block)
        return file_hash.hexdigest()

    def _process_queue(self, stop_event: Optional[threading.Event] = None) -> None:
        """Worker loop: process queued jobs until asked to stop.

        Args:
            stop_event: Per-worker stop signal.  When omitted, only
                ``self.is_running`` controls the loop.
        """
        while self.is_running:
            if stop_event is not None and stop_event.is_set():
                break

            # Poll with a timeout so a stop request is noticed within about
            # a second even when the queue is empty.
            try:
                job = self.queue.get(timeout=1.0)
            except queue.Empty:
                continue

            input_filename, output_filename, generate_callback = job
            try:
                self.process_audio(input_filename, output_filename, generate_callback)
                # NOTE: also reached when process_audio skipped a missing
                # input, because that case returns without raising.
                log.info("Finished processing", output_filename=output_filename)
            except Exception as e:
                # One bad file must not kill the worker; keep the traceback.
                log.error(
                    "Failed processing",
                    input_filename=input_filename,
                    output_filename=output_filename,
                    error=e,
                    exc_info=True,
                )
            finally:
                self.queue.task_done()
|