simplified streamer code

This commit is contained in:
evilchili 2024-04-27 11:47:25 -07:00
parent 7ded43476e
commit f4fa1b1690
2 changed files with 58 additions and 78 deletions

View File

@ -26,7 +26,7 @@ class AudioStreamer(threading.Thread):
self.load_requested = load_event self.load_requested = load_event
self.chunk_size = chunk_size self.chunk_size = chunk_size
@cached_property @property
def silence(self): def silence(self):
return transcoder.open(Path(__file__).parent / "silence.mp3", bufsize=2 * self.chunk_size) return transcoder.open(Path(__file__).parent / "silence.mp3", bufsize=2 * self.chunk_size)
@ -48,82 +48,62 @@ class AudioStreamer(threading.Thread):
self._shout.open() self._shout.open()
logger.debug(f"Connnected to shoutcast server at {self._shout.host}:{self._shout.port}") logger.debug(f"Connnected to shoutcast server at {self._shout.host}:{self._shout.port}")
while True: while True:
self.do_one_loop()
self._shout.close()
def do_one_loop(self):
# If the user said STOP, clear the queue.
if self.stop_requested.is_set():
logger.debug("Stop requested; clearing queue.")
self.clear_queue()
self.stop_requested.clear()
# Check to see if there is a queued request. If there is, play it.
# If there isn't, or if there's a problem playing the request,
# fallback to silence.
not_playing = False
try: try:
request = self.queue.get(block=False) self.stream_queued_audio()
logger.debug(f"Received: {request = }")
self.play_file(Path(request.decode()))
except queue.Empty:
logger.debug("Nothing queued; looping silence.")
not_playing = True
except Exception as exc: except Exception as exc:
logger.error("Caught exception; falling back to silence.", exc_info=exc) logger.error("Caught exception.", exc_info=exc)
not_playing = True self._shout.close()
if not_playing:
try:
self.silence.seek(0, 0)
self._shout.set_metadata({"song": "[NOTHING PLAYING]"})
self.play_from_stream(self.silence)
except Exception as exc: # pragma: no cover
logger.error("Caught exception trying to loop silence!", exc_info=exc)
def clear_queue(self): def clear_queue(self):
logger.debug("Clearing queue...") logger.debug("Clearing queue...")
while not self.queue.empty(): while not self.queue.empty():
track = self.queue.get() self.queue.get()
logger.debug(f"Clearing: {track}")
self.load_requested.clear()
logger.debug("Load event cleared.")
def _read_chunk(self, filehandle): def _read_chunk(self, filehandle):
return filehandle.read(self.chunk_size) return filehandle.read(self.chunk_size)
def play_file(self, track: Path): def queued_audio_source(self):
"""
Return a filehandle to the next queued audio source, or silence if the queue is empty.
"""
try:
track = Path(self.queue.get(block=False).decode())
logger.debug(f"Streaming {track.stem = }") logger.debug(f"Streaming {track.stem = }")
self._shout.set_metadata({"song": track.stem}) self._shout.set_metadata({"song": track.stem})
with transcoder.open(track, bufsize=2 * self.chunk_size) as fh: return transcoder.open(track, bufsize=2 * self.chunk_size)
return self.play_from_stream(fh) except queue.Empty:
logger.debug("Nothing queued; looping silence.")
except Exception as exc:
logger.error("Caught exception; falling back to silence.", exc_info=exc)
self._shout.set_metadata({"song": "[NOTHING PLAYING]"})
return self.silence
def play_from_stream(self, stream): def stream_queued_audio(self):
self._shout.get_connected() with self.queued_audio_source() as stream:
input_buffer = self._read_chunk(stream) buf = self._read_chunk(stream)
while True: while len(buf):
# To load a playlist, stop streaming the current track and clear the queue # stop streaming the current source
# but do not clear the event. run() will detect it and
if self.load_requested.is_set():
logger.debug("Load was requested.")
self.clear_queue()
return
# Stop streaming and clear the queue
if self.stop_requested.is_set():
logger.debug("Stop was requested; aborting current stream.")
return
# Stop streaming and clear the queue
if self.skip_requested.is_set(): if self.skip_requested.is_set():
logger.debug("Skip was requested.") logger.debug("Skip was requested.")
self.skip_requested.clear() self.skip_requested.clear()
return return
# continue streaming the current track to icecast, until complete # clear the queue
buf = input_buffer if self.load_requested.is_set():
logger.debug("Load was requested.")
self.clear_queue()
self.load_requested.clear()
return
# Stop streaming and clear the queue
if self.stop_requested.is_set():
logger.debug("Stop was requested.")
self.clear_queue()
self.stop_requested.clear()
return
# stream buffered audio and refill with the next chunk
input_buffer = self._read_chunk(stream) input_buffer = self._read_chunk(stream)
if len(buf) == 0:
break
self._shout.send(buf) self._shout.send(buf)
self._shout.sync() self._shout.sync()
buf = input_buffer

View File

@ -62,19 +62,19 @@ def audio_streamer(mock_shout, input_queue, skip_event, stop_event, load_event):
def test_streamer_stop(audio_streamer, stop_event, output_stream): def test_streamer_stop(audio_streamer, stop_event, output_stream):
stop_event.set() stop_event.set()
audio_streamer.do_one_loop() audio_streamer.stream_queued_audio()
assert not stop_event.is_set() assert not stop_event.is_set()
def test_streamer_skip(audio_streamer, skip_event, output_stream): def test_streamer_skip(audio_streamer, skip_event, output_stream):
skip_event.set() skip_event.set()
audio_streamer.do_one_loop() audio_streamer.stream_queued_audio()
assert not skip_event.is_set() assert not skip_event.is_set()
def test_streamer_load(audio_streamer, load_event, output_stream): def test_streamer_load(audio_streamer, load_event, output_stream):
load_event.set() load_event.set()
audio_streamer.do_one_loop() audio_streamer.stream_queued_audio()
assert not load_event.is_set() assert not load_event.is_set()
@ -88,19 +88,19 @@ def test_clear_queue(audio_streamer, input_queue):
def test_streamer_defaults_to_silence(audio_streamer, input_queue, output_stream, silence_bytes): def test_streamer_defaults_to_silence(audio_streamer, input_queue, output_stream, silence_bytes):
audio_streamer.do_one_loop() audio_streamer.stream_queued_audio()
track = playlist.Playlist(name="test_playlist").tracks[0] track = playlist.Playlist(name="test_playlist").tracks[0]
input_queue.put(bytes(track)) input_queue.put(bytes(track))
audio_streamer.do_one_loop() audio_streamer.stream_queued_audio()
audio_streamer.do_one_loop() audio_streamer.stream_queued_audio()
assert get_stream_output(output_stream) == silence_bytes + track.read_bytes() + silence_bytes assert get_stream_output(output_stream) == silence_bytes + track.read_bytes() + silence_bytes
def test_streamer_plays_silence_on_error(monkeypatch, audio_streamer, input_queue, output_stream, silence_bytes): def test_streamer_plays_silence_on_error(monkeypatch, audio_streamer, input_queue, output_stream, silence_bytes):
monkeypatch.setattr(audio_streamer, "play_file", MagicMock(side_effect=Exception)) monkeypatch.setattr(audio_streamer.queue, "get", MagicMock(side_effect=Exception))
track = playlist.Playlist(name="test_playlist").tracks[0] track = playlist.Playlist(name="test_playlist").tracks[0]
input_queue.put(bytes(track)) input_queue.put(bytes(track))
audio_streamer.do_one_loop() audio_streamer.stream_queued_audio()
assert get_stream_output(output_stream) == silence_bytes assert get_stream_output(output_stream) == silence_bytes
@ -111,14 +111,14 @@ def test_streamer_plays_from_queue(audio_streamer, input_queue, output_stream):
input_queue.put(bytes(track)) input_queue.put(bytes(track))
expected += track.read_bytes() expected += track.read_bytes()
while not input_queue.empty(): while not input_queue.empty():
audio_streamer.do_one_loop() audio_streamer.stream_queued_audio()
assert get_stream_output(output_stream) == expected assert get_stream_output(output_stream) == expected
def test_streamer_handles_stop_interrupt(audio_streamer, output_stream, stop_event): def test_streamer_handles_stop_interrupt(audio_streamer, output_stream, stop_event):
stop_event.set() stop_event.set()
audio_streamer.silence.seek(0, 0) audio_streamer.silence.seek(0, 0)
audio_streamer.play_from_stream(audio_streamer.silence) audio_streamer.stream_queued_audio()
assert get_stream_output(output_stream) == b"" assert get_stream_output(output_stream) == b""
@ -127,6 +127,6 @@ def test_streamer_handles_load_interrupt(audio_streamer, input_queue, output_str
input_queue.put(bytes(pl.tracks[0])) input_queue.put(bytes(pl.tracks[0]))
load_event.set() load_event.set()
audio_streamer.silence.seek(0, 0) audio_streamer.silence.seek(0, 0)
audio_streamer.play_from_stream(audio_streamer.silence) audio_streamer.stream_queued_audio()
assert get_stream_output(output_stream) == b"" assert get_stream_output(output_stream) == b""
assert input_queue.empty assert input_queue.empty