import io
import logging
import os
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import BinaryIO, Iterator, Optional

import ffmpeg

logger = logging.getLogger("transcoder")


@dataclass
class FrameAlignedStream:
    """
    Use ffmpeg to transcode a source audio file to mp3 and iterate over the
    result in frame-aligned chunks. This will ensure that readers will always
    have a full frame of audio data to parse or emit.

    I learned a lot from https://github.com/pylon/streamp3 figuring this
    stuff out!

    Usage:

        >>> stream = FrameAlignedStream(Path('test.flac'))
        >>> for segment in stream:
        ...

    Attributes:
        source_file: Path of the audio file handed to ffmpeg as its input.
        chunk_size: Approximate size, in bytes, of each segment yielded by
            iteration; also used as the buffer size of the ffmpeg pipe.
        bit_rate: Audio bit rate (bits/second) forced on the encoder and
            assumed when computing frame sizes.
        sample_rate: Sample rate (Hz) forced on the encoder and assumed when
            computing frame sizes.
    """

    source_file: Path
    chunk_size: int = 1024
    bit_rate: int = 192000
    sample_rate: int = 44100

    # The running ffmpeg process, if any; managed by _start_transcoder() and
    # _stop_transcoder().
    _transcoder: Optional[subprocess.Popen] = None
    # Optional pre-opened stream; when set, it takes priority over the
    # transcoder's stdout as the source of mp3 data.
    _buffer: Optional[io.BufferedReader] = None

    @property
    def source(self) -> Optional[BinaryIO]:
        """
        Return the stream mp3 data should be read from: the explicit buffer
        if one is set, otherwise the transcoder's stdout, otherwise None.
        """
        if self._buffer is not None:
            return self._buffer
        if self._transcoder is not None:
            return self._transcoder.stdout
        logger.info("Source is empty")
        return None

    @property
    def frames(self) -> Iterator[bytes]:
        """
        Generate complete mp3 frames from the source until it is exhausted.
        """
        while True:
            frame = self._read_one_frame()
            if not frame:
                return
            yield frame

    def _read_one_frame(self) -> Optional[bytes]:
        """
        Read the next full audio frame from the input source and return it.

        Returns:
            The header plus payload of the next frame as a single bytes
            object, or None when there is no source or the source ends
            before a complete frame could be read.
        """
        source = self.source
        if source is None:
            # Nothing to read from (no buffer and no running transcoder).
            return None

        # Step through the source a byte at a time and look for the frame
        # sync: 11 set bits, i.e. 0xFF followed by a byte whose top three
        # bits are all ones.
        window = b""
        while True:
            window += source.read(4 - len(window))
            if len(window) != 4:
                logger.debug("Reached the end of the source stream without finding another framesync.")
                return None
            if window[0] == 0b11111111 and window[1] >> 5 == 0b111:
                break
            logger.debug("Expected a framesync but got %r instead; moving fwd 1 byte.", window)
            window = window[1:]
        header = window

        # Decode the mp3 header. We could derive the bit_rate and sample_rate
        # here if we had the lookup tables etc. from the MPEG spec, but since
        # we control the input, we can rely on them being predefined.
        version_code = (header[1] & 0b00011000) >> 3
        padding_code = (header[2] & 0b00000010) >> 1

        # A version code of 0b11 is treated as version 1 (1152 samples per
        # frame); every other code uses 576 samples per frame.
        samples_per_frame = 1152 if version_code == 0b11 else 576

        # Calculate the size of the whole frame, header included. A padded
        # frame carries one extra byte.
        frame_size = self.bit_rate // 8 * samples_per_frame // self.sample_rate
        if padding_code:
            frame_size += 1

        # Read the rest of the frame from the source.
        remaining = frame_size - len(header)
        frame_data = source.read(remaining)
        if len(frame_data) != remaining:
            logger.debug("Reached the end of the source stream without finding a full frame.")
            return None

        # Return the entire frame.
        return header + frame_data

    def __iter__(self) -> Iterator[bytes]:
        """
        Generate approximately chunk_size segments of audio data by iterating
        over the frames, buffering them, and then yielding several as a
        single bytes object.

        The transcoder is started when iteration begins and is always
        stopped when iteration ends, whether normally or through an error.
        """
        try:
            self._start_transcoder()
            pending = []
            pending_size = 0
            for frame in self.frames:
                # Flush before adding the next frame, so a segment is only
                # emitted once it has reached chunk_size and always ends on
                # a frame boundary.
                if pending_size >= self.chunk_size:
                    yield b"".join(pending)
                    pending = []
                    pending_size = 0
                pending.append(frame)
                pending_size += len(frame)
            if pending:
                yield b"".join(pending)
        finally:
            self._stop_transcoder()

    def _stop_transcoder(self) -> None:
        """
        Kill the ffmpeg process, if one is running, and release its
        resources.
        """
        if self._transcoder:
            logger.debug(f"Killing {self._transcoder = }")
            try:
                self._transcoder.kill()
                # Close our end of the pipe and reap the child so we neither
                # leak a file descriptor nor leave a zombie process behind.
                if self._transcoder.stdout:
                    self._transcoder.stdout.close()
                self._transcoder.wait()
            finally:
                self._transcoder = None
                self._buffer = None

    def _start_transcoder(self) -> None:
        """
        Spawn ffmpeg to transcode source_file to mp3 on its stdout.

        ffmpeg's banner and log output are silenced unless the DEBUG
        environment variable is set to a non-empty value.
        """
        args = [] if os.environ.get("DEBUG") else ["-hide_banner", "-loglevel", "quiet"]

        command = (
            ffmpeg.input(str(self.source_file))
            .output(
                "pipe:",
                map="a",
                format="mp3",
                # no ID3 headers -- saves having to decode them later
                write_xing=0,
                id3v2_version=0,
                # force sample and bit rates
                **{
                    "b:a": self.bit_rate,
                    "ar": self.sample_rate,
                },
            )
            .global_args("-vn", *args)
            .compile()
        )

        self._transcoder = subprocess.Popen(
            command,
            bufsize=self.chunk_size,
            stdout=subprocess.PIPE,
            stdin=subprocess.PIPE,
        )

        # Force close STDIN to prevent ffmpeg from trying to read from it. silly ffmpeg.
        self._transcoder.stdin.close()
        logger.debug(f"Spawned ffmpeg (PID {self._transcoder.pid}): {' '.join(self._transcoder.args)}")