diff --git a/croaker/cli.py b/croaker/cli.py index bff121d..9888bb6 100644 --- a/croaker/cli.py +++ b/croaker/cli.py @@ -101,10 +101,7 @@ def start( """ Start the Croaker command and control server. """ - if daemonize: - server.daemonize() - else: - server.start() + server.start(daemonize=daemonize) @app.command() diff --git a/croaker/controller.py b/croaker/controller.py deleted file mode 100644 index fd2003d..0000000 --- a/croaker/controller.py +++ /dev/null @@ -1,73 +0,0 @@ -import logging -import queue -import threading - -from croaker.playlist import load_playlist -from croaker.streamer import AudioStreamer - -logger = logging.getLogger('controller') - - -class Controller(threading.Thread): - """ - A background thread started by the CroakerServer instance that controls a - shoutcast source streamer. The primary purpose of this class is to allow - the command and control server to interrupt streaming operations to - skip to a new track or load a new playlist. - """ - def __init__(self, control_queue): - self._streamer_queue = None - self._control_queue = control_queue - self.skip_event = threading.Event() - self.stop_event = threading.Event() - self._streamer = None - super().__init__() - - @property - def streamer(self): - if not self._streamer: - self._streamer_queue = queue.Queue() - self._streamer = AudioStreamer(self._streamer_queue, self.skip_event, self.stop_event) - return self._streamer - - def stop(self): - if self._streamer: - logging.debug("Sending STOP signal to streamer...") - self.stop_event.set() - self.playlist = None - - def load(self, playlist_name: str): - self.playlist = load_playlist(playlist_name) - logger.debug(f"Switching to {self.playlist = }") - for track in self.playlist.tracks: - self._streamer_queue.put(str(track).encode()) - - def run(self): - logger.debug("Starting AudioStreamer...") - self.streamer.start() - self.load("session_start") - while True: - data = self._control_queue.get() - logger.debug(f"{data = }") - self.process_request(data) - - def process_request(self, data): - cmd, *args = data.split(" ") - cmd = cmd.strip() - if not cmd: - return - handler = getattr(self, f"handle_{cmd}", None) - if not handler: - logger.debug("Ignoring invalid command: {cmd} = }") - return - handler(args) - - def handle_PLAY(self, args): - return self.load(args[0]) - - def handle_FFWD(self, args): - logger.debug("Sending SKIP signal to streamer...") - self.skip_event.set() - - def handle_STOP(self): - return self.stop() diff --git a/croaker/playlist.py b/croaker/playlist.py index 64e6c59..23db30e 100644 --- a/croaker/playlist.py +++ b/croaker/playlist.py @@ -13,8 +13,6 @@ logger = logging.getLogger('playlist') playlists = {} -NowPlaying = None - def _stripped(name): name.replace('"', "") @@ -25,16 +23,11 @@ def _stripped(name): @dataclass class Playlist: name: str - position: int = 0 theme: Path = Path("_theme.mp3") - @property - def current(self): - return self.tracks[self.position] - @cached_property def path(self): - return croaker.path.playlist_root() / Path(self.name) + return croaker.path.playlist_root() / self.name @cached_property def tracks(self): @@ -51,13 +44,6 @@ class Playlist: entries += files return entries - def skip(self): - logging.debug(f"Skipping from {self.position} on {self.name}") - if self.position == len(self.tracks) - 1: - self.position = 0 - else: - self.position += 1 - def get_audio_files(self, path: Path = None): if not path: path = self.path diff --git a/croaker/server.py b/croaker/server.py index a5c833c..a341211 100644 --- a/croaker/server.py +++ b/croaker/server.py @@ -2,13 +2,16 @@ import logging import os import queue import socketserver +import threading from pathlib import Path +from time import sleep import daemon from croaker import path -from croaker.controller import Controller from croaker.pidfile import pidfile +from croaker.playlist import load_playlist +from croaker.streamer import AudioStreamer logger = logging.getLogger('server') @@ -16,16 +19,17 @@ logger = logging.getLogger('server') class RequestHandler(socketserver.StreamRequestHandler): """ Instantiated by the TCPServer when a request is received. Implements the - command and control protocol and sends commands to the shoutcast controller - on behalf of the user. + command and control protocol and sends commands to the shoutcast source + client on behalf of the user. """ supported_commands = { - # command # help text - "PLAY": "$PLAYLIST_NAME - Switch to the specified playlist.", - "FFWD": " - Skip to the next track in the playlist.", - "HELP": " - Display command help.", - "KTHX": " - Close the current connection.", - "STOP": " - Stop Croaker.", + # command # help text + "PLAY": "PLAYLIST - Switch to the specified playlist.", + "LIST": "[PLAYLIST] - List playlists or contents of the specified list.", + "FFWD": " - Skip to the next track in the playlist.", + "HELP": " - Display command help.", + "KTHX": " - Close the current connection.", + "STOP": " - Stop Croaker.", } def handle(self): @@ -47,26 +51,35 @@ class RequestHandler(socketserver.StreamRequestHandler): args = self.data[5:] except IndexError: self.send(f"ERR Command not understood '{cmd}'") + continue - if cmd not in self.supported_commands: + if not cmd: + continue + elif cmd not in self.supported_commands: self.send(f"ERR Unknown Command '{cmd}'") - - if cmd == "KTHX": + continue + elif cmd == "KTHX": return self.send("KBAI") handler = getattr(self, f"handle_{cmd}", None) - if handler: - handler(args) - else: - self.default_handler(cmd, args) + if not handler: + self.send(f"ERR No handler for {cmd}.") + handler(args) def send(self, msg): return self.wfile.write(msg.encode() + b"\n") - def default_handler(self, cmd, args): - self.server.tell_controller(f"{cmd} {args}") + def handle_PLAY(self, args): + self.server.load(args) return self.send("OK") + def handle_FFWD(self, args): + self.server.ffwd() + return self.send("OK") + + def handle_LIST(self, args): + return self.send(self.server.list(args)) + def handle_HELP(self, args): return self.send("\n".join(f"{cmd} {txt}" for cmd, txt in self.supported_commands.items())) @@ -84,28 +97,29 @@ class CroakerServer(socketserver.TCPServer): def __init__(self): self._context = daemon.DaemonContext() self._queue = queue.Queue() - self.controller = Controller(self._queue) + self.skip_event = threading.Event() + self.stop_event = threading.Event() + self.load_event = threading.Event() + self._streamer = None + self.playlist = None def _pidfile(self): return pidfile(path.root() / "croaker.pid") - def tell_controller(self, msg): - """ - Enqueue a message for the shoutcast controller. - """ - self._queue.put(msg) + @property + def streamer(self): + if not self._streamer: + self._streamer = AudioStreamer(self._queue, self.skip_event, self.stop_event, self.load_event) + return self._streamer def bind_address(self): return (os.environ["HOST"], int(os.environ["PORT"])) - def daemonize(self) -> None: + def _daemonize(self) -> None: """ - Daemonize the current process, start the shoutcast controller - background thread and then begin listening for connetions. + Daemonize the current process. """ - logger.info(f"Daemonizing controller on {self.bind_address()}; pidfile and output in {path.root()}") - super().__init__(self.bind_address(), RequestHandler) - + logger.info(f"Daemonizing controller; pidfile and output in {path.root()}") self._context.pidfile = self._pidfile() self._context.stdout = open(path.root() / Path("croaker.out"), "wb", buffering=0) self._context.stderr = open(path.root() / Path("croaker.err"), "wb", buffering=0) @@ -115,15 +129,49 @@ class CroakerServer(socketserver.TCPServer): # which the TCPServer is listening! So let's keep that one open. self._context.files_preserve = [self.fileno()] self._context.open() + + def start(self, daemonize: bool = True) -> None: + """ + Start the shoutcast controller background thread, then begin listening for connections. + """ + logger.info(f"Starting controller on {self.bind_address()}.") + super().__init__(self.bind_address(), RequestHandler) + if daemonize: + self._daemonize() try: - self.controller.start() + logger.debug("Starting AudioStreamer...") + self.streamer.start() + self.load("session_start") self.serve_forever() except KeyboardInterrupt: logger.info("Shutting down.") self.stop() - def stop(self) -> None: + def stop(self): self._pidfile() + def ffwd(self): + logger.debug("Sending SKIP signal to streamer...") + self.skip_event.set() + + def clear_queue(self): + logger.debug("Requesting a reload...") + self.streamer.load_requested.set() + sleep(0.5) + + def list(self, playlist_name: str = None): + if playlist_name: + return str(load_playlist(playlist_name)) + return '\n'.join([str(p.name) for p in path.playlist_root().iterdir()]) + + def load(self, playlist_name: str): + logger.debug(f"Switching to {playlist_name = }") + if self.playlist: + self.clear_queue() + self.playlist = load_playlist(playlist_name) + logger.debug(f"Loaded new playlist {self.playlist = }") + for track in self.playlist.tracks: + self._queue.put(str(track).encode()) + server = CroakerServer() diff --git a/croaker/streamer.py b/croaker/streamer.py index 0626897..8ed37b6 100644 --- a/croaker/streamer.py +++ b/croaker/streamer.py @@ -3,7 +3,6 @@ import os import threading from functools import cached_property from pathlib import Path -from time import sleep import shout @@ -15,11 +14,13 @@ class AudioStreamer(threading.Thread): Receive filenames from the controller thread and stream the contents of those files to the icecast server. """ - def __init__(self, queue, skip_event, stop_event): + def __init__(self, queue, skip_event, stop_event, load_event, chunk_size=4096): super().__init__() self.queue = queue self.skip_requested = skip_event self.stop_requested = stop_event + self.load_requested = load_event + self.chunk_size = chunk_size @cached_property def _shout(self): @@ -30,8 +31,8 @@ class AudioStreamer(threading.Thread): s.host = os.environ["ICECAST_HOST"] s.port = int(os.environ["ICECAST_PORT"]) s.password = os.environ["ICECAST_PASSWORD"] - s.protocol = "http" - s.format = "mp3" + s.protocol = os.environ.get("ICECAST_PROTOCOL", "http") + s.format = os.environ.get("ICECAST_FORMAT", "mp3") s.audio_info = {shout.SHOUT_AI_BITRATE: "192", shout.SHOUT_AI_SAMPLERATE: "44100", shout.SHOUT_AI_CHANNELS: "5"} return s @@ -40,29 +41,56 @@ class AudioStreamer(threading.Thread): self._shout.open() while not self.stop_requested.is_set(): self._shout.get_connected() - track = self.queue.get() + track = self.queue.get(block=True) logger.debug(f"Received: {track = }") if track: - self.play(Path(track.decode())) - continue - sleep(1) + try: + self.play(Path(track.decode())) + except shout.ShoutException as e: + logger.error("An error occurred while streaming a track.", exc_info=e) + self._shout.close() + self._shout.open() self._shout.close() + def clear_queue(self): + logger.debug("Clearing queue...") + while not self.queue.empty(): + track = self.queue.get() + logger.debug(f"Clearing: {track}") + self.load_requested.clear() + logger.debug("Load event cleared.") + def play(self, track: Path): with track.open("rb") as fh: self._shout.get_connected() logger.debug(f"Streaming {track.stem = }") self._shout.set_metadata({"song": track.stem}) - input_buffer = fh.read(4096) - while not self.skip_requested.is_set(): + input_buffer = fh.read(self.chunk_size) + while True: + + # To load a playlist, stop streaming the current track and clear the queue + # but do not clear the event. run() will detect it and + if self.load_requested.is_set(): + logger.debug("Load was requested.") + self.clear_queue() + return + + # Stop streaming and clear the queue if self.stop_requested.is_set(): + logger.debug("Stop was requested.") self.stop_requested.clear() return + + # Stop streaming and clear the queue + if self.skip_requested.is_set(): + logger.debug("Skip was requested.") + self.skip_requested.clear() + return + + # continue streaming the current track to icecast, until complete buf = input_buffer - input_buffer = fh.read(4096) + input_buffer = fh.read(self.chunk_size) if len(buf) == 0: break self._shout.send(buf) self._shout.sync() - if self.skip_requested.is_set(): - self.skip_requested.clear() diff --git a/pyproject.toml b/pyproject.toml index f28c191..a6786b3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "croaker" -version = "0.9" +version = "0.9.1" description = "" authors = ["evilchili "] readme = "README.md"