dnd-music-console/croaker/streamer.py
2024-03-06 20:09:59 -08:00

97 lines
3.4 KiB
Python

import logging
import os
import threading
from functools import cached_property
from pathlib import Path
import shout
logger = logging.getLogger('streamer')
class AudioStreamer(threading.Thread):
"""
Receive filenames from the controller thread and stream the contents of
those files to the icecast server.
"""
def __init__(self, queue, skip_event, stop_event, load_event, chunk_size=4096):
super().__init__()
self.queue = queue
self.skip_requested = skip_event
self.stop_requested = stop_event
self.load_requested = load_event
self.chunk_size = chunk_size
@cached_property
def _shout(self):
s = shout.Shout()
s.name = "Croaker Radio"
s.url = os.environ["ICECAST_URL"]
s.mount = os.environ["ICECAST_MOUNT"]
s.host = os.environ["ICECAST_HOST"]
s.port = int(os.environ["ICECAST_PORT"])
s.password = os.environ["ICECAST_PASSWORD"]
s.protocol = os.environ.get("ICECAST_PROTOCOL", "http")
s.format = os.environ.get("ICECAST_FORMAT", "mp3")
s.audio_info = {shout.SHOUT_AI_BITRATE: "192", shout.SHOUT_AI_SAMPLERATE: "44100", shout.SHOUT_AI_CHANNELS: "5"}
return s
def run(self):
logger.debug("Initialized")
self._shout.open()
while not self.stop_requested.is_set():
self._shout.get_connected()
track = self.queue.get(block=True)
logger.debug(f"Received: {track = }")
if track:
try:
self.play(Path(track.decode()))
except shout.ShoutException as e:
logger.error("An error occurred while streaming a track.", exc_info=e)
self._shout.close()
self._shout.open()
self._shout.close()
def clear_queue(self):
logger.debug("Clearing queue...")
while not self.queue.empty():
track = self.queue.get()
logger.debug(f"Clearing: {track}")
self.load_requested.clear()
logger.debug("Load event cleared.")
def play(self, track: Path):
with track.open("rb") as fh:
self._shout.get_connected()
logger.debug(f"Streaming {track.stem = }")
self._shout.set_metadata({"song": track.stem})
input_buffer = fh.read(self.chunk_size)
while True:
# To load a playlist, stop streaming the current track and clear the queue
# but do not clear the event. run() will detect it and
if self.load_requested.is_set():
logger.debug("Load was requested.")
self.clear_queue()
return
# Stop streaming and clear the queue
if self.stop_requested.is_set():
logger.debug("Stop was requested.")
self.stop_requested.clear()
return
# Stop streaming and clear the queue
if self.skip_requested.is_set():
logger.debug("Skip was requested.")
self.skip_requested.clear()
return
# continue streaming the current track to icecast, until complete
buf = input_buffer
input_buffer = fh.read(self.chunk_size)
if len(buf) == 0:
break
self._shout.send(buf)
self._shout.sync()