docs
This commit is contained in:
parent
affcf2d7dc
commit
aca24f6a4d
|
@ -9,6 +9,12 @@ logger = logging.getLogger('controller')
|
||||||
|
|
||||||
|
|
||||||
class Controller(threading.Thread):
|
class Controller(threading.Thread):
|
||||||
|
"""
|
||||||
|
A background thread started by the CroakerServer instance that controls a
|
||||||
|
shoutcast source streamer. The primary purpose of this class is to allow
|
||||||
|
the command and control server to interrupt streaming operations to
|
||||||
|
skip to a new track or load a new playlist.
|
||||||
|
"""
|
||||||
def __init__(self, control_queue):
|
def __init__(self, control_queue):
|
||||||
self._streamer_queue = None
|
self._streamer_queue = None
|
||||||
self._control_queue = control_queue
|
self._control_queue = control_queue
|
||||||
|
@ -24,6 +30,18 @@ class Controller(threading.Thread):
|
||||||
self._streamer = AudioStreamer(self._streamer_queue, self.skip_event, self.stop_event)
|
self._streamer = AudioStreamer(self._streamer_queue, self.skip_event, self.stop_event)
|
||||||
return self._streamer
|
return self._streamer
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
if self._streamer:
|
||||||
|
logging.debug("Sending STOP signal to streamer...")
|
||||||
|
self.stop_event.set()
|
||||||
|
self.playlist = None
|
||||||
|
|
||||||
|
def load(self, playlist_name: str):
|
||||||
|
self.playlist = load_playlist(playlist_name)
|
||||||
|
logger.debug(f"Switching to {self.playlist = }")
|
||||||
|
for track in self.playlist.tracks:
|
||||||
|
self._streamer_queue.put(str(track).encode())
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
logger.debug("Starting AudioStreamer...")
|
logger.debug("Starting AudioStreamer...")
|
||||||
self.streamer.start()
|
self.streamer.start()
|
||||||
|
@ -53,15 +71,3 @@ class Controller(threading.Thread):
|
||||||
|
|
||||||
def handle_STOP(self):
|
def handle_STOP(self):
|
||||||
return self.stop()
|
return self.stop()
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
if self._streamer:
|
|
||||||
logging.debug("Sending STOP signal to streamer...")
|
|
||||||
self.stop_event.set()
|
|
||||||
self.playlist = None
|
|
||||||
|
|
||||||
def load(self, playlist_name: str):
|
|
||||||
self.playlist = load_playlist(playlist_name)
|
|
||||||
logger.debug(f"Switching to {self.playlist = }")
|
|
||||||
for track in self.playlist.tracks:
|
|
||||||
self._streamer_queue.put(str(track).encode())
|
|
||||||
|
|
|
@ -14,7 +14,13 @@ logger = logging.getLogger('server')
|
||||||
|
|
||||||
|
|
||||||
class RequestHandler(socketserver.StreamRequestHandler):
|
class RequestHandler(socketserver.StreamRequestHandler):
|
||||||
|
"""
|
||||||
|
Instantiated by the TCPServer when a request is received. Implements the
|
||||||
|
command and control protocol and sends commands to the shoutcast controller
|
||||||
|
on behalf of the user.
|
||||||
|
"""
|
||||||
supported_commands = {
|
supported_commands = {
|
||||||
|
# command # help text
|
||||||
"PLAY": "$PLAYLIST_NAME - Switch to the specified playlist.",
|
"PLAY": "$PLAYLIST_NAME - Switch to the specified playlist.",
|
||||||
"FFWD": " - Skip to the next track in the playlist.",
|
"FFWD": " - Skip to the next track in the playlist.",
|
||||||
"HELP": " - Display command help.",
|
"HELP": " - Display command help.",
|
||||||
|
@ -23,6 +29,16 @@ class RequestHandler(socketserver.StreamRequestHandler):
|
||||||
}
|
}
|
||||||
|
|
||||||
def handle(self):
|
def handle(self):
|
||||||
|
"""
|
||||||
|
Start a command and control session. Commands are read one line at a
|
||||||
|
time; the format is:
|
||||||
|
|
||||||
|
Byte Definition
|
||||||
|
-------------------
|
||||||
|
0-3 Command
|
||||||
|
4 Ignored
|
||||||
|
5+ Arguments
|
||||||
|
"""
|
||||||
while True:
|
while True:
|
||||||
self.data = self.rfile.readline().strip().decode()
|
self.data = self.rfile.readline().strip().decode()
|
||||||
logger.debug(f"{self.data = }")
|
logger.debug(f"{self.data = }")
|
||||||
|
@ -60,6 +76,9 @@ class RequestHandler(socketserver.StreamRequestHandler):
|
||||||
|
|
||||||
|
|
||||||
class CroakerServer(socketserver.TCPServer):
|
class CroakerServer(socketserver.TCPServer):
|
||||||
|
"""
|
||||||
|
A Daemonized TCP Server that also starts a Shoutcast source client.
|
||||||
|
"""
|
||||||
allow_reuse_address = True
|
allow_reuse_address = True
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
@ -67,22 +86,33 @@ class CroakerServer(socketserver.TCPServer):
|
||||||
self._queue = queue.Queue()
|
self._queue = queue.Queue()
|
||||||
self.controller = Controller(self._queue)
|
self.controller = Controller(self._queue)
|
||||||
|
|
||||||
def _pidfile(self, terminate_if_running: bool = True):
|
def _pidfile(self):
|
||||||
return pidfile(path.root() / "croaker.pid", terminate_if_running=terminate_if_running)
|
return pidfile(path.root() / "croaker.pid")
|
||||||
|
|
||||||
def tell_controller(self, msg):
|
def tell_controller(self, msg):
|
||||||
|
"""
|
||||||
|
Enqueue a message for the shoutcast controller.
|
||||||
|
"""
|
||||||
self._queue.put(msg)
|
self._queue.put(msg)
|
||||||
|
|
||||||
def bind_address(self):
|
def bind_address(self):
|
||||||
return (os.environ["HOST"], int(os.environ["PORT"]))
|
return (os.environ["HOST"], int(os.environ["PORT"]))
|
||||||
|
|
||||||
def daemonize(self) -> None:
|
def daemonize(self) -> None:
|
||||||
|
"""
|
||||||
|
Daemonize the current process, start the shoutcast controller
|
||||||
|
background thread and then begin listening for connetions.
|
||||||
|
"""
|
||||||
logger.info(f"Daemonizing controller on {self.bind_address()}; pidfile and output in {path.root()}")
|
logger.info(f"Daemonizing controller on {self.bind_address()}; pidfile and output in {path.root()}")
|
||||||
super().__init__(self.bind_address(), RequestHandler)
|
super().__init__(self.bind_address(), RequestHandler)
|
||||||
|
|
||||||
self._context.pidfile = self._pidfile()
|
self._context.pidfile = self._pidfile()
|
||||||
self._context.stdout = open(path.root() / Path("croaker.out"), "wb")
|
self._context.stdout = open(path.root() / Path("croaker.out"), "wb", buffering=0)
|
||||||
self._context.stderr = open(path.root() / Path("croaker.err"), "wb", buffering=0)
|
self._context.stderr = open(path.root() / Path("croaker.err"), "wb", buffering=0)
|
||||||
|
|
||||||
|
# when open() is called, all open file descriptors will be closed, as
|
||||||
|
# befits a good daemon. However this will also close the socket on
|
||||||
|
# which the TCPServer is listening! So let's keep that one open.
|
||||||
self._context.files_preserve = [self.fileno()]
|
self._context.files_preserve = [self.fileno()]
|
||||||
self._context.open()
|
self._context.open()
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -11,6 +11,10 @@ logger = logging.getLogger('streamer')
|
||||||
|
|
||||||
|
|
||||||
class AudioStreamer(threading.Thread):
|
class AudioStreamer(threading.Thread):
|
||||||
|
"""
|
||||||
|
Receive filenames from the controller thread and stream the contents of
|
||||||
|
those files to the icecast server.
|
||||||
|
"""
|
||||||
def __init__(self, queue, skip_event, stop_event):
|
def __init__(self, queue, skip_event, stop_event):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.queue = queue
|
self.queue = queue
|
||||||
|
|
Loading…
Reference in New Issue
Block a user