added buffered reads from disk io and transcoding

This commit is contained in:
evilchili 2024-03-17 15:15:41 -07:00
parent 4ee4fb4a73
commit c94fb127ed
5 changed files with 140 additions and 11 deletions

View File

@ -1,6 +1,5 @@
import queue import queue
import logging import logging
import io
import os import os
import threading import threading
from functools import cached_property from functools import cached_property
@ -28,8 +27,7 @@ class AudioStreamer(threading.Thread):
@cached_property @cached_property
def silence(self): def silence(self):
with (Path(__file__).parent / 'silence.mp3').open('rb') as stream: return transcoder.open(Path(__file__).parent / 'silence.mp3', bufsize=2*self.chunk_size)
return io.BytesIO(stream.read())
@cached_property @cached_property
def _shout(self): def _shout(self):
@ -92,13 +90,12 @@ class AudioStreamer(threading.Thread):
logger.debug("Load event cleared.") logger.debug("Load event cleared.")
def _read_chunk(self, filehandle): def _read_chunk(self, filehandle):
chunk = filehandle.read(self.chunk_size) return filehandle.read(self.chunk_size)
return chunk
def play_file(self, track: Path): def play_file(self, track: Path):
logger.debug(f"Streaming {track.stem = }") logger.debug(f"Streaming {track.stem = }")
self._shout.set_metadata({"song": track.stem}) self._shout.set_metadata({"song": track.stem})
with transcoder.open(track) as fh: with transcoder.open(track, bufsize=2*self.chunk_size) as fh:
return self.play_from_stream(fh) return self.play_from_stream(fh)
def play_from_stream(self, stream): def play_from_stream(self, stream):

View File

@ -7,7 +7,7 @@ import ffmpeg
logger = logging.getLogger('transcoder') logger = logging.getLogger('transcoder')
def open(infile: Path): def open(infile: Path, bufsize: int = 4096):
""" """
Return a stream of mp3 data for the given path on disk. Return a stream of mp3 data for the given path on disk.
@ -18,7 +18,7 @@ def open(infile: Path):
suffix = infile.suffix.lower() suffix = infile.suffix.lower()
if suffix == '.mp3': if suffix == '.mp3':
logger.debug(f"Not transcoding mp3 {infile = }") logger.debug(f"Not transcoding mp3 {infile = }")
return infile.open('rb') return infile.open('rb', buffering=bufsize)
ffmpeg_args = ( ffmpeg_args = (
ffmpeg ffmpeg
@ -29,7 +29,7 @@ def open(infile: Path):
) )
# Force close STDIN to prevent ffmpeg from trying to read from it. silly ffmpeg. # Force close STDIN to prevent ffmpeg from trying to read from it. silly ffmpeg.
proc = subprocess.Popen(ffmpeg_args, stdout=subprocess.PIPE, stdin=subprocess.PIPE) proc = subprocess.Popen(ffmpeg_args, bufsize=bufsize, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
proc.stdin.close() proc.stdin.close()
logger.debug(f"Spawned ffmpeg (PID {proc.pid}) with args {ffmpeg_args = }") logger.debug(f"Spawned ffmpeg (PID {proc.pid}) with args {ffmpeg_args = }")
return proc.stdout return proc.stdout

View File

@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "croaker" name = "croaker"
version = "0.9.1" version = "0.9.2"
description = "" description = ""
authors = ["evilchili <evilchili@gmail.com>"] authors = ["evilchili <evilchili@gmail.com>"]
readme = "README.md" readme = "README.md"

132
test/test_streamer.py Normal file
View File

@ -0,0 +1,132 @@
from pathlib import Path
from unittest.mock import MagicMock
import io
import queue
import threading
import pytest
import shout
from croaker import streamer, playlist
def get_stream_output(stream):
stream.seek(0, 0)
return stream.read()
@pytest.fixture(scope='session')
def silence_bytes():
return (Path(streamer.__file__).parent / 'silence.mp3').read_bytes()
@pytest.fixture
def output_stream():
return io.BytesIO()
@pytest.fixture
def mock_shout(output_stream, monkeypatch):
def handle_send(buf):
output_stream.write(buf)
mm = MagicMock(spec=shout.Shout, **{
'return_value.send.side_effect': handle_send
})
monkeypatch.setattr('shout.Shout', mm)
return mm
@pytest.fixture
def input_queue():
return queue.Queue()
@pytest.fixture
def skip_event():
return threading.Event()
@pytest.fixture
def stop_event():
return threading.Event()
@pytest.fixture
def load_event():
return threading.Event()
@pytest.fixture
def audio_streamer(mock_shout, input_queue, skip_event, stop_event, load_event):
return streamer.AudioStreamer(input_queue, skip_event, stop_event, load_event)
def test_streamer_stop(audio_streamer, stop_event, output_stream):
stop_event.set()
audio_streamer.do_one_loop()
assert not stop_event.is_set()
def test_streamer_skip(audio_streamer, skip_event, output_stream):
skip_event.set()
audio_streamer.do_one_loop()
assert not skip_event.is_set()
def test_streamer_load(audio_streamer, load_event, output_stream):
load_event.set()
audio_streamer.do_one_loop()
assert not load_event.is_set()
def test_clear_queue(audio_streamer, input_queue):
pl = playlist.Playlist(name='test_playlist')
for track in pl.tracks:
input_queue.put(bytes(track))
assert input_queue.not_empty
audio_streamer.clear_queue()
assert input_queue.empty
def test_streamer_defaults_to_silence(audio_streamer, input_queue, output_stream, silence_bytes):
audio_streamer.do_one_loop()
track = playlist.Playlist(name='test_playlist').tracks[0]
input_queue.put(bytes(track))
audio_streamer.do_one_loop()
audio_streamer.do_one_loop()
assert get_stream_output(output_stream) == silence_bytes + track.read_bytes() + silence_bytes
def test_streamer_plays_silence_on_error(monkeypatch, audio_streamer, input_queue, output_stream, silence_bytes):
monkeypatch.setattr(audio_streamer, 'play_file', MagicMock(side_effect=Exception))
track = playlist.Playlist(name='test_playlist').tracks[0]
input_queue.put(bytes(track))
audio_streamer.do_one_loop()
assert get_stream_output(output_stream) == silence_bytes
def test_streamer_plays_from_queue(audio_streamer, input_queue, output_stream):
pl = playlist.Playlist(name='test_playlist')
expected = b''
for track in pl.tracks:
input_queue.put(bytes(track))
expected += track.read_bytes()
while not input_queue.empty():
audio_streamer.do_one_loop()
assert get_stream_output(output_stream) == expected
def test_streamer_handles_stop_interrupt(audio_streamer, output_stream, stop_event):
stop_event.set()
audio_streamer.silence.seek(0, 0)
audio_streamer.play_from_stream(audio_streamer.silence)
assert get_stream_output(output_stream) == b''
def test_streamer_handles_load_interrupt(audio_streamer, input_queue, output_stream, load_event):
pl = playlist.Playlist(name='test_playlist')
input_queue.put(bytes(pl.tracks[0]))
load_event.set()
audio_streamer.silence.seek(0, 0)
audio_streamer.play_from_stream(audio_streamer.silence)
assert get_stream_output(output_stream) == b''
assert input_queue.empty

View File

@ -8,7 +8,7 @@ from croaker import transcoder
@pytest.mark.parametrize('suffix, expected', [ @pytest.mark.parametrize('suffix, expected', [
('.mp3', b''), ('.mp3', b'_theme.mp3\n'),
('.foo', b'transcoding!\n'), ('.foo', b'transcoding!\n'),
]) ])
def test_transcoder_open(monkeypatch, suffix, expected): def test_transcoder_open(monkeypatch, suffix, expected):