diff --git a/src/croaker/streamer.py b/src/croaker/streamer.py index 88aa424..f4bcfd5 100644 --- a/src/croaker/streamer.py +++ b/src/croaker/streamer.py @@ -26,7 +26,7 @@ class AudioStreamer(threading.Thread): self.load_requested = load_event self.chunk_size = chunk_size - @cached_property + @property def silence(self): return transcoder.open(Path(__file__).parent / "silence.mp3", bufsize=2 * self.chunk_size) @@ -48,82 +48,62 @@ class AudioStreamer(threading.Thread): self._shout.open() logger.debug(f"Connnected to shoutcast server at {self._shout.host}:{self._shout.port}") while True: - self.do_one_loop() - self._shout.close() - - def do_one_loop(self): - # If the user said STOP, clear the queue. - if self.stop_requested.is_set(): - logger.debug("Stop requested; clearing queue.") - self.clear_queue() - self.stop_requested.clear() - - # Check to see if there is a queued request. If there is, play it. - # If there isn't, or if there's a problem playing the request, - # fallback to silence. - not_playing = False - try: - request = self.queue.get(block=False) - logger.debug(f"Received: {request = }") - self.play_file(Path(request.decode())) - except queue.Empty: - logger.debug("Nothing queued; looping silence.") - not_playing = True - except Exception as exc: - logger.error("Caught exception; falling back to silence.", exc_info=exc) - not_playing = True - - if not_playing: try: - self.silence.seek(0, 0) - self._shout.set_metadata({"song": "[NOTHING PLAYING]"}) - self.play_from_stream(self.silence) - except Exception as exc: # pragma: no cover - logger.error("Caught exception trying to loop silence!", exc_info=exc) + self.stream_queued_audio() + except Exception as exc: + logger.error("Caught exception.", exc_info=exc) + self._shout.close() def clear_queue(self): logger.debug("Clearing queue...") while not self.queue.empty(): - track = self.queue.get() - logger.debug(f"Clearing: {track}") - self.load_requested.clear() - logger.debug("Load event cleared.") + self.queue.get() def _read_chunk(self, filehandle): return filehandle.read(self.chunk_size) - def play_file(self, track: Path): - logger.debug(f"Streaming {track.stem = }") - self._shout.set_metadata({"song": track.stem}) - with transcoder.open(track, bufsize=2 * self.chunk_size) as fh: - return self.play_from_stream(fh) + def queued_audio_source(self): + """ + Return a filehandle to the next queued audio source, or silence if the queue is empty. + """ + try: + track = Path(self.queue.get(block=False).decode()) + logger.debug(f"Streaming {track.stem = }") + self._shout.set_metadata({"song": track.stem}) + return transcoder.open(track, bufsize=2 * self.chunk_size) + except queue.Empty: + logger.debug("Nothing queued; looping silence.") + except Exception as exc: + logger.error("Caught exception; falling back to silence.", exc_info=exc) + self._shout.set_metadata({"song": "[NOTHING PLAYING]"}) + return self.silence - def play_from_stream(self, stream): - self._shout.get_connected() - input_buffer = self._read_chunk(stream) - while True: - # To load a playlist, stop streaming the current track and clear the queue - # but do not clear the event. run() will detect it and - if self.load_requested.is_set(): - logger.debug("Load was requested.") - self.clear_queue() - return + def stream_queued_audio(self): + with self.queued_audio_source() as stream: + buf = self._read_chunk(stream) + while len(buf): + # stop streaming the current source + if self.skip_requested.is_set(): + logger.debug("Skip was requested.") + self.skip_requested.clear() + return - # Stop streaming and clear the queue - if self.stop_requested.is_set(): - logger.debug("Stop was requested; aborting current stream.") - return + # clear the queue + if self.load_requested.is_set(): + logger.debug("Load was requested.") + self.clear_queue() + self.load_requested.clear() + return - # Stop streaming and clear the queue - if self.skip_requested.is_set(): - logger.debug("Skip was requested.") - self.skip_requested.clear() - return + # Stop streaming and clear the queue + if self.stop_requested.is_set(): + logger.debug("Stop was requested.") + self.clear_queue() + self.stop_requested.clear() + return - # continue streaming the current track to icecast, until complete - buf = input_buffer - input_buffer = self._read_chunk(stream) - if len(buf) == 0: - break - self._shout.send(buf) - self._shout.sync() + # stream buffered audio and refill with the next chunk + input_buffer = self._read_chunk(stream) + self._shout.send(buf) + self._shout.sync() + buf = input_buffer diff --git a/test/test_streamer.py b/test/test_streamer.py index a814443..95b2eed 100644 --- a/test/test_streamer.py +++ b/test/test_streamer.py @@ -62,19 +62,19 @@ def audio_streamer(mock_shout, input_queue, skip_event, stop_event, load_event): def test_streamer_stop(audio_streamer, stop_event, output_stream): stop_event.set() - audio_streamer.do_one_loop() + audio_streamer.stream_queued_audio() assert not stop_event.is_set() def test_streamer_skip(audio_streamer, skip_event, output_stream): skip_event.set() - audio_streamer.do_one_loop() + audio_streamer.stream_queued_audio() assert not skip_event.is_set() def test_streamer_load(audio_streamer, load_event, output_stream): load_event.set() - audio_streamer.do_one_loop() + audio_streamer.stream_queued_audio() assert not load_event.is_set() @@ -88,19 +88,19 @@ def test_clear_queue(audio_streamer, input_queue): def test_streamer_defaults_to_silence(audio_streamer, input_queue, output_stream, silence_bytes): - audio_streamer.do_one_loop() + audio_streamer.stream_queued_audio() track = playlist.Playlist(name="test_playlist").tracks[0] input_queue.put(bytes(track)) - audio_streamer.do_one_loop() - audio_streamer.do_one_loop() + audio_streamer.stream_queued_audio() + audio_streamer.stream_queued_audio() assert get_stream_output(output_stream) == silence_bytes + track.read_bytes() + silence_bytes def test_streamer_plays_silence_on_error(monkeypatch, audio_streamer, input_queue, output_stream, silence_bytes): - monkeypatch.setattr(audio_streamer, "play_file", MagicMock(side_effect=Exception)) + monkeypatch.setattr(audio_streamer.queue, "get", MagicMock(side_effect=Exception)) track = playlist.Playlist(name="test_playlist").tracks[0] input_queue.put(bytes(track)) - audio_streamer.do_one_loop() + audio_streamer.stream_queued_audio() assert get_stream_output(output_stream) == silence_bytes @@ -111,14 +111,14 @@ def test_streamer_plays_from_queue(audio_streamer, input_queue, output_stream): input_queue.put(bytes(track)) expected += track.read_bytes() while not input_queue.empty(): - audio_streamer.do_one_loop() + audio_streamer.stream_queued_audio() assert get_stream_output(output_stream) == expected def test_streamer_handles_stop_interrupt(audio_streamer, output_stream, stop_event): stop_event.set() audio_streamer.silence.seek(0, 0) - audio_streamer.play_from_stream(audio_streamer.silence) + audio_streamer.stream_queued_audio() assert get_stream_output(output_stream) == b"" @@ -127,6 +127,6 @@ def test_streamer_handles_load_interrupt(audio_streamer, input_queue, output_str input_queue.put(bytes(pl.tracks[0])) load_event.set() audio_streamer.silence.seek(0, 0) - audio_streamer.play_from_stream(audio_streamer.silence) + audio_streamer.stream_queued_audio() assert get_stream_output(output_stream) == b"" assert input_queue.empty