From c94fb127edf683f01b4be2a722f400bea382836c Mon Sep 17 00:00:00 2001 From: evilchili Date: Sun, 17 Mar 2024 15:15:41 -0700 Subject: [PATCH] added buffered reads from disk io and transcoding --- croaker/streamer.py | 9 +-- croaker/transcoder.py | 6 +- pyproject.toml | 2 +- test/test_streamer.py | 132 ++++++++++++++++++++++++++++++++++++++++ test/test_transcoder.py | 2 +- 5 files changed, 140 insertions(+), 11 deletions(-) create mode 100644 test/test_streamer.py diff --git a/croaker/streamer.py b/croaker/streamer.py index f0126c7..327a0b0 100644 --- a/croaker/streamer.py +++ b/croaker/streamer.py @@ -1,6 +1,5 @@ import queue import logging -import io import os import threading from functools import cached_property @@ -28,8 +27,7 @@ class AudioStreamer(threading.Thread): @cached_property def silence(self): - with (Path(__file__).parent / 'silence.mp3').open('rb') as stream: - return io.BytesIO(stream.read()) + return transcoder.open(Path(__file__).parent / 'silence.mp3', bufsize=2*self.chunk_size) @cached_property def _shout(self): @@ -92,13 +90,12 @@ class AudioStreamer(threading.Thread): logger.debug("Load event cleared.") def _read_chunk(self, filehandle): - chunk = filehandle.read(self.chunk_size) - return chunk + return filehandle.read(self.chunk_size) def play_file(self, track: Path): logger.debug(f"Streaming {track.stem = }") self._shout.set_metadata({"song": track.stem}) - with transcoder.open(track) as fh: + with transcoder.open(track, bufsize=2*self.chunk_size) as fh: return self.play_from_stream(fh) def play_from_stream(self, stream): diff --git a/croaker/transcoder.py b/croaker/transcoder.py index 253cb61..c14e988 100644 --- a/croaker/transcoder.py +++ b/croaker/transcoder.py @@ -7,7 +7,7 @@ import ffmpeg logger = logging.getLogger('transcoder') -def open(infile: Path): +def open(infile: Path, bufsize: int = 4096): """ Return a stream of mp3 data for the given path on disk. @@ -18,7 +18,7 @@ def open(infile: Path): suffix = infile.suffix.lower() if suffix == '.mp3': logger.debug(f"Not transcoding mp3 {infile = }") - return infile.open('rb') + return infile.open('rb', buffering=bufsize) ffmpeg_args = ( ffmpeg @@ -29,7 +29,7 @@ def open(infile: Path): ) # Force close STDIN to prevent ffmpeg from trying to read from it. silly ffmpeg. - proc = subprocess.Popen(ffmpeg_args, stdout=subprocess.PIPE, stdin=subprocess.PIPE) + proc = subprocess.Popen(ffmpeg_args, bufsize=bufsize, stdout=subprocess.PIPE, stdin=subprocess.PIPE) proc.stdin.close() logger.debug(f"Spawned ffmpeg (PID {proc.pid}) with args {ffmpeg_args = }") return proc.stdout diff --git a/pyproject.toml b/pyproject.toml index 03936c3..15141dd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "croaker" -version = "0.9.1" +version = "0.9.2" description = "" authors = ["evilchili "] readme = "README.md" diff --git a/test/test_streamer.py b/test/test_streamer.py new file mode 100644 index 0000000..f6a7cfa --- /dev/null +++ b/test/test_streamer.py @@ -0,0 +1,132 @@ +from pathlib import Path +from unittest.mock import MagicMock + +import io +import queue +import threading + +import pytest +import shout + +from croaker import streamer, playlist + + +def get_stream_output(stream): + stream.seek(0, 0) + return stream.read() + + +@pytest.fixture(scope='session') +def silence_bytes(): + return (Path(streamer.__file__).parent / 'silence.mp3').read_bytes() + + +@pytest.fixture +def output_stream(): + return io.BytesIO() + + +@pytest.fixture +def mock_shout(output_stream, monkeypatch): + def handle_send(buf): + output_stream.write(buf) + mm = MagicMock(spec=shout.Shout, **{ + 'return_value.send.side_effect': handle_send + }) + monkeypatch.setattr('shout.Shout', mm) + return mm + + +@pytest.fixture +def input_queue(): + return queue.Queue() + +@pytest.fixture +def skip_event(): + return threading.Event() + + +@pytest.fixture +def stop_event(): + return threading.Event() + + +@pytest.fixture +def load_event(): + return threading.Event() + + +@pytest.fixture +def audio_streamer(mock_shout, input_queue, skip_event, stop_event, load_event): + return streamer.AudioStreamer(input_queue, skip_event, stop_event, load_event) + + +def test_streamer_stop(audio_streamer, stop_event, output_stream): + stop_event.set() + audio_streamer.do_one_loop() + assert not stop_event.is_set() + + +def test_streamer_skip(audio_streamer, skip_event, output_stream): + skip_event.set() + audio_streamer.do_one_loop() + assert not skip_event.is_set() + + +def test_streamer_load(audio_streamer, load_event, output_stream): + load_event.set() + audio_streamer.do_one_loop() + assert not load_event.is_set() + + +def test_clear_queue(audio_streamer, input_queue): + pl = playlist.Playlist(name='test_playlist') + for track in pl.tracks: + input_queue.put(bytes(track)) + assert input_queue.not_empty + audio_streamer.clear_queue() + assert input_queue.empty + + +def test_streamer_defaults_to_silence(audio_streamer, input_queue, output_stream, silence_bytes): + audio_streamer.do_one_loop() + track = playlist.Playlist(name='test_playlist').tracks[0] + input_queue.put(bytes(track)) + audio_streamer.do_one_loop() + audio_streamer.do_one_loop() + assert get_stream_output(output_stream) == silence_bytes + track.read_bytes() + silence_bytes + + +def test_streamer_plays_silence_on_error(monkeypatch, audio_streamer, input_queue, output_stream, silence_bytes): + monkeypatch.setattr(audio_streamer, 'play_file', MagicMock(side_effect=Exception)) + track = playlist.Playlist(name='test_playlist').tracks[0] + input_queue.put(bytes(track)) + audio_streamer.do_one_loop() + assert get_stream_output(output_stream) == silence_bytes + +def test_streamer_plays_from_queue(audio_streamer, input_queue, output_stream): + pl = playlist.Playlist(name='test_playlist') + expected = b'' + for track in pl.tracks: + input_queue.put(bytes(track)) + expected += track.read_bytes() + while not input_queue.empty(): + audio_streamer.do_one_loop() + assert get_stream_output(output_stream) == expected + + +def test_streamer_handles_stop_interrupt(audio_streamer, output_stream, stop_event): + stop_event.set() + audio_streamer.silence.seek(0, 0) + audio_streamer.play_from_stream(audio_streamer.silence) + assert get_stream_output(output_stream) == b'' + + +def test_streamer_handles_load_interrupt(audio_streamer, input_queue, output_stream, load_event): + pl = playlist.Playlist(name='test_playlist') + input_queue.put(bytes(pl.tracks[0])) + load_event.set() + audio_streamer.silence.seek(0, 0) + audio_streamer.play_from_stream(audio_streamer.silence) + assert get_stream_output(output_stream) == b'' + assert input_queue.empty diff --git a/test/test_transcoder.py b/test/test_transcoder.py index a098c11..b71078b 100644 --- a/test/test_transcoder.py +++ b/test/test_transcoder.py @@ -8,7 +8,7 @@ from croaker import transcoder @pytest.mark.parametrize('suffix, expected', [ - ('.mp3', b''), + ('.mp3', b'_theme.mp3\n'), ('.foo', b'transcoding!\n'), ]) def test_transcoder_open(monkeypatch, suffix, expected):