133 lines
4.4 KiB
Python
133 lines
4.4 KiB
Python
import logging
|
|
import os
|
|
import subprocess
|
|
from dataclasses import dataclass
|
|
from io import BufferedReader
|
|
from pathlib import Path
|
|
|
|
import ffmpeg
|
|
|
|
logger = logging.getLogger("transcoder")


@dataclass
class FrameAlignedStream:
    """
    Use ffmpeg to transcode a source audio file to mp3 and iterate over the result
    in frame-aligned chunks. This will ensure that readers will always have a full
    frame of audio data to parse or emit.

    I learned a lot from https://github.com/pylon/streamp3 figuring this stuff out!

    Attributes:
        source: binary stream of mp3 data to read frames from.
        chunk_size: minimum number of bytes to accumulate before a chunk is
            yielded by iteration (chunks are whole frames, so they may be larger).
        bit_rate: bit rate (bits per second) assumed when computing frame sizes.
        sample_rate: sample rate (Hz) assumed when computing frame sizes.

    Usage:

        >>> stream = FrameAlignedStream.from_source(Path('test.flac'))
        >>> for segment in stream:
        ...
    """

    source: BufferedReader
    chunk_size: int = 1024
    bit_rate: int = 192000
    sample_rate: int = 44100

    @property
    def frames(self):
        """
        Generate complete mp3 frames (header included) as bytes objects, one at
        a time, until the source is exhausted or ends with a partial frame.
        """
        while True:
            frame = self._read_one_frame()
            if frame is None:
                return
            yield frame

    def _read_one_frame(self):
        """
        Read the next full audio frame from the input source and return it.

        Returns the frame (4-byte header plus payload) as bytes, or None when the
        source ends before another frame sync or before a full frame is read.
        """

        # step through the source a byte at a time and look for the frame sync.
        header = None
        buffer = b""
        while not header:
            # top the sliding window back up to 4 bytes
            buffer += self.source.read(4 - len(buffer))
            if len(buffer) != 4:
                logger.debug("Reached the end of the source stream without finding another framesync.")
                # None (not False) so that `frames` recognises the end of the stream.
                return None
            header = buffer[:4]
            # the frame sync is 11 set bits: all of byte 0 and the top 3 bits of byte 1
            if header[0] != 0b11111111 or header[1] >> 5 != 0b111:
                logger.debug("Expected a framesync but got %s instead; moving fwd 1 byte.", buffer)
                header = None
                buffer = buffer[1:]

        # Decode the mp3 header. We could derive the bit_rate and sample_rate
        # here if we had the lookup tables etc. from the MPEG spec, but since
        # we control the input, we can rely on them being predefined.
        version_code = (header[1] & 0b00011000) >> 3
        padding_code = (header[2] & 0b00000010) >> 1
        # 0b11 -> 1, 0b10 -> 0, anything else -> 2; only `version == 1` matters below.
        version = version_code & 1 if version_code >> 1 else 2
        is_padded = bool(padding_code)

        # calculate the size of the whole frame
        samples_per_frame = 1152 if version == 1 else 576
        frame_size = self.bit_rate // 8 * samples_per_frame // self.sample_rate
        if is_padded:
            frame_size += 1

        # read the rest of the frame from the source
        remaining = frame_size - len(header)
        frame_data = self.source.read(remaining)
        if len(frame_data) != remaining:
            logger.debug("Reached the end of the source stream without finding a full frame.")
            return None

        # return the entire frame
        return header + frame_data

    def __iter__(self):
        """
        Generate approximately chunk_size segments of audio data by iterating over the
        frames, buffering them, and then yielding several as a single bytes object.

        A chunk is emitted once at least chunk_size bytes have been buffered, so
        every chunk consists of whole frames; the final chunk may be shorter.
        """
        pending = []
        pending_size = 0
        for frame in self.frames:
            # flush before adding the next frame; never emit an empty chunk
            if pending and pending_size >= self.chunk_size:
                yield b"".join(pending)
                pending = []
                pending_size = 0
            pending.append(frame)
            pending_size += len(frame)
        if pending:
            yield b"".join(pending)

    @classmethod
    def from_source(cls, infile: Path, **kwargs):
        """
        Create a FrameAlignedStream instance by transcoding an audio source on disk.

        Args:
            infile: path of the audio file handed to ffmpeg as its input.
            **kwargs: optional chunk_size, bit_rate and sample_rate overrides; they
                configure both the ffmpeg invocation and the returned instance.

        Returns:
            A FrameAlignedStream reading from the stdout pipe of the spawned ffmpeg.
        """

        args = [] if os.environ.get("DEBUG") else ["-hide_banner", "-loglevel", "quiet"]
        ffmpeg_args = (
            ffmpeg.input(str(infile))
            .output(
                "pipe:",
                map="a",
                format="mp3",
                # no ID3 headers -- saves having to decode them later
                write_xing=0,
                id3v2_version=0,
                # force sample and bit rates
                **{
                    "b:a": kwargs.get("bit_rate", cls.bit_rate),
                    "ar": kwargs.get("sample_rate", cls.sample_rate),
                },
            )
            .global_args("-vn", *args)
            .compile()
        )

        # Force close STDIN to prevent ffmpeg from trying to read from it. silly ffmpeg.
        proc = subprocess.Popen(
            ffmpeg_args,
            bufsize=kwargs.get("chunk_size", cls.chunk_size),
            stdout=subprocess.PIPE,
            stdin=subprocess.PIPE,
        )
        proc.stdin.close()
        logger.debug(f"Spawned ffmpeg (PID {proc.pid}) with args {ffmpeg_args = }")
        # NOTE(review): the Popen handle is dropped here, so the child process is
        # never waited on by this class -- confirm whether callers need to reap it.
        return cls(proc.stdout, **kwargs)