Eliminated need for Controller class

This commit is contained in:
evilchili 2024-03-06 17:04:08 -08:00
parent 0e6812d6a9
commit 7417baeeb1
6 changed files with 124 additions and 138 deletions

View File

@ -101,10 +101,7 @@ def start(
"""
Start the Croaker command and control server.
"""
if daemonize:
server.daemonize()
else:
server.start()
server.start(daemonize=daemonize)
@app.command()

View File

@ -1,73 +0,0 @@
import logging
import queue
import threading
from croaker.playlist import load_playlist
from croaker.streamer import AudioStreamer
logger = logging.getLogger('controller')
class Controller(threading.Thread):
"""
A background thread started by the CroakerServer instance that controls a
shoutcast source streamer. The primary purpose of this class is to allow
the command and control server to interrupt streaming operations to
skip to a new track or load a new playlist.
"""
def __init__(self, control_queue):
self._streamer_queue = None
self._control_queue = control_queue
self.skip_event = threading.Event()
self.stop_event = threading.Event()
self._streamer = None
super().__init__()
@property
def streamer(self):
if not self._streamer:
self._streamer_queue = queue.Queue()
self._streamer = AudioStreamer(self._streamer_queue, self.skip_event, self.stop_event)
return self._streamer
def stop(self):
if self._streamer:
logging.debug("Sending STOP signal to streamer...")
self.stop_event.set()
self.playlist = None
def load(self, playlist_name: str):
self.playlist = load_playlist(playlist_name)
logger.debug(f"Switching to {self.playlist = }")
for track in self.playlist.tracks:
self._streamer_queue.put(str(track).encode())
def run(self):
logger.debug("Starting AudioStreamer...")
self.streamer.start()
self.load("session_start")
while True:
data = self._control_queue.get()
logger.debug(f"{data = }")
self.process_request(data)
def process_request(self, data):
cmd, *args = data.split(" ")
cmd = cmd.strip()
if not cmd:
return
handler = getattr(self, f"handle_{cmd}", None)
if not handler:
logger.debug("Ignoring invalid command: {cmd} = }")
return
handler(args)
def handle_PLAY(self, args):
return self.load(args[0])
def handle_FFWD(self, args):
logger.debug("Sending SKIP signal to streamer...")
self.skip_event.set()
def handle_STOP(self):
return self.stop()

View File

@ -13,8 +13,6 @@ logger = logging.getLogger('playlist')
playlists = {}
NowPlaying = None
def _stripped(name):
name.replace('"', "")
@ -25,16 +23,11 @@ def _stripped(name):
@dataclass
class Playlist:
name: str
position: int = 0
theme: Path = Path("_theme.mp3")
@property
def current(self):
return self.tracks[self.position]
@cached_property
def path(self):
return croaker.path.playlist_root() / Path(self.name)
return croaker.path.playlist_root() / self.name
@cached_property
def tracks(self):
@ -51,13 +44,6 @@ class Playlist:
entries += files
return entries
def skip(self):
logging.debug(f"Skipping from {self.position} on {self.name}")
if self.position == len(self.tracks) - 1:
self.position = 0
else:
self.position += 1
def get_audio_files(self, path: Path = None):
if not path:
path = self.path

View File

@ -2,13 +2,16 @@ import logging
import os
import queue
import socketserver
import threading
from pathlib import Path
from time import sleep
import daemon
from croaker import path
from croaker.controller import Controller
from croaker.pidfile import pidfile
from croaker.playlist import load_playlist
from croaker.streamer import AudioStreamer
logger = logging.getLogger('server')
@ -16,12 +19,13 @@ logger = logging.getLogger('server')
class RequestHandler(socketserver.StreamRequestHandler):
"""
Instantiated by the TCPServer when a request is received. Implements the
command and control protocol and sends commands to the shoutcast controller
on behalf of the user.
command and control protocol and sends commands to the shoutcast source
client on behalf of the user.
"""
supported_commands = {
# command # help text
"PLAY": "$PLAYLIST_NAME - Switch to the specified playlist.",
"PLAY": "PLAYLIST - Switch to the specified playlist.",
"LIST": "[PLAYLIST] - List playlists or contents of the specified list.",
"FFWD": " - Skip to the next track in the playlist.",
"HELP": " - Display command help.",
"KTHX": " - Close the current connection.",
@ -47,26 +51,35 @@ class RequestHandler(socketserver.StreamRequestHandler):
args = self.data[5:]
except IndexError:
self.send(f"ERR Command not understood '{cmd}'")
continue
if cmd not in self.supported_commands:
if not cmd:
continue
elif cmd not in self.supported_commands:
self.send(f"ERR Unknown Command '{cmd}'")
if cmd == "KTHX":
continue
elif cmd == "KTHX":
return self.send("KBAI")
handler = getattr(self, f"handle_{cmd}", None)
if handler:
if not handler:
self.send(f"ERR No handler for {cmd}.")
handler(args)
else:
self.default_handler(cmd, args)
def send(self, msg):
return self.wfile.write(msg.encode() + b"\n")
def default_handler(self, cmd, args):
self.server.tell_controller(f"{cmd} {args}")
def handle_PLAY(self, args):
self.server.load(args)
return self.send("OK")
def handle_FFWD(self, args):
self.server.ffwd()
return self.send("OK")
def handle_LIST(self, args):
return self.send(self.server.list(args))
def handle_HELP(self, args):
return self.send("\n".join(f"{cmd} {txt}" for cmd, txt in self.supported_commands.items()))
@ -84,28 +97,29 @@ class CroakerServer(socketserver.TCPServer):
def __init__(self):
self._context = daemon.DaemonContext()
self._queue = queue.Queue()
self.controller = Controller(self._queue)
self.skip_event = threading.Event()
self.stop_event = threading.Event()
self.load_event = threading.Event()
self._streamer = None
self.playlist = None
def _pidfile(self):
return pidfile(path.root() / "croaker.pid")
def tell_controller(self, msg):
"""
Enqueue a message for the shoutcast controller.
"""
self._queue.put(msg)
@property
def streamer(self):
if not self._streamer:
self._streamer = AudioStreamer(self._queue, self.skip_event, self.stop_event, self.load_event)
return self._streamer
def bind_address(self):
return (os.environ["HOST"], int(os.environ["PORT"]))
def daemonize(self) -> None:
def _daemonize(self) -> None:
"""
Daemonize the current process, start the shoutcast controller
background thread and then begin listening for connetions.
Daemonize the current process.
"""
logger.info(f"Daemonizing controller on {self.bind_address()}; pidfile and output in {path.root()}")
super().__init__(self.bind_address(), RequestHandler)
logger.info(f"Daemonizing controller; pidfile and output in {path.root()}")
self._context.pidfile = self._pidfile()
self._context.stdout = open(path.root() / Path("croaker.out"), "wb", buffering=0)
self._context.stderr = open(path.root() / Path("croaker.err"), "wb", buffering=0)
@ -115,15 +129,49 @@ class CroakerServer(socketserver.TCPServer):
# which the TCPServer is listening! So let's keep that one open.
self._context.files_preserve = [self.fileno()]
self._context.open()
def start(self, daemonize: bool = True) -> None:
"""
Start the shoutcast controller background thread, then begin listening for connections.
"""
logger.info(f"Starting controller on {self.bind_address()}.")
super().__init__(self.bind_address(), RequestHandler)
if daemonize:
self._daemonize()
try:
self.controller.start()
logger.debug("Starting AudioStreamer...")
self.streamer.start()
self.load("session_start")
self.serve_forever()
except KeyboardInterrupt:
logger.info("Shutting down.")
self.stop()
def stop(self) -> None:
def stop(self):
self._pidfile()
def ffwd(self):
logger.debug("Sending SKIP signal to streamer...")
self.skip_event.set()
def clear_queue(self):
logger.debug("Requesting a reload...")
self.streamer.load_requested.set()
sleep(0.5)
def list(self, playlist_name: str = None):
if playlist_name:
return str(load_playlist(playlist_name))
return '\n'.join([str(p.name) for p in path.playlist_root().iterdir()])
def load(self, playlist_name: str):
logger.debug(f"Switching to {playlist_name = }")
if self.playlist:
self.clear_queue()
self.playlist = load_playlist(playlist_name)
logger.debug(f"Loaded new playlist {self.playlist = }")
for track in self.playlist.tracks:
self._queue.put(str(track).encode())
server = CroakerServer()

View File

@ -3,7 +3,6 @@ import os
import threading
from functools import cached_property
from pathlib import Path
from time import sleep
import shout
@ -15,11 +14,13 @@ class AudioStreamer(threading.Thread):
Receive filenames from the controller thread and stream the contents of
those files to the icecast server.
"""
def __init__(self, queue, skip_event, stop_event):
def __init__(self, queue, skip_event, stop_event, load_event, chunk_size=4096):
super().__init__()
self.queue = queue
self.skip_requested = skip_event
self.stop_requested = stop_event
self.load_requested = load_event
self.chunk_size = chunk_size
@cached_property
def _shout(self):
@ -30,8 +31,8 @@ class AudioStreamer(threading.Thread):
s.host = os.environ["ICECAST_HOST"]
s.port = int(os.environ["ICECAST_PORT"])
s.password = os.environ["ICECAST_PASSWORD"]
s.protocol = "http"
s.format = "mp3"
s.protocol = os.environ.get("ICECAST_PROTOCOL", "http")
s.format = os.environ.get("ICECAST_FORMAT", "mp3")
s.audio_info = {shout.SHOUT_AI_BITRATE: "192", shout.SHOUT_AI_SAMPLERATE: "44100", shout.SHOUT_AI_CHANNELS: "5"}
return s
@ -40,29 +41,56 @@ class AudioStreamer(threading.Thread):
self._shout.open()
while not self.stop_requested.is_set():
self._shout.get_connected()
track = self.queue.get()
track = self.queue.get(block=True)
logger.debug(f"Received: {track = }")
if track:
try:
self.play(Path(track.decode()))
continue
sleep(1)
except shout.ShoutException as e:
logger.error("An error occurred while streaming a track.", exc_info=e)
self._shout.close()
self._shout.open()
self._shout.close()
def clear_queue(self):
logger.debug("Clearing queue...")
while not self.queue.empty():
track = self.queue.get()
logger.debug(f"Clearing: {track}")
self.load_requested.clear()
logger.debug("Load event cleared.")
def play(self, track: Path):
with track.open("rb") as fh:
self._shout.get_connected()
logger.debug(f"Streaming {track.stem = }")
self._shout.set_metadata({"song": track.stem})
input_buffer = fh.read(4096)
while not self.skip_requested.is_set():
input_buffer = fh.read(self.chunk_size)
while True:
# To load a playlist, stop streaming the current track and clear the queue
# but do not clear the event. run() will detect it and
if self.load_requested.is_set():
logger.debug("Load was requested.")
self.clear_queue()
return
# Stop streaming and clear the queue
if self.stop_requested.is_set():
logger.debug("Stop was requested.")
self.stop_requested.clear()
return
# Stop streaming and clear the queue
if self.skip_requested.is_set():
logger.debug("Skip was requested.")
self.skip_requested.clear()
return
# continue streaming the current track to icecast, until complete
buf = input_buffer
input_buffer = fh.read(4096)
input_buffer = fh.read(self.chunk_size)
if len(buf) == 0:
break
self._shout.send(buf)
self._shout.sync()
if self.skip_requested.is_set():
self.skip_requested.clear()

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "croaker"
version = "0.9"
version = "0.9.1"
description = ""
authors = ["evilchili <evilchili@gmail.com>"]
readme = "README.md"