Compare commits

..

21 Commits
0.9 ... main

Author SHA1 Message Date
evilchili
16f246cd30 fix end-of-stream bug 2024-09-02 09:54:11 -07:00
evilchili
a4e05cbed1 typo 2024-06-03 00:10:00 -07:00
evilchili
ddea04a58d formatting 2024-06-03 00:09:15 -07:00
evilchili
26aa401bfe formatting 2024-06-02 23:57:37 -07:00
evilchili
1fbe833d39 frame-aligned chunks 2024-06-02 23:55:12 -07:00
evilchili
f4fa1b1690 simplified streamer code 2024-04-27 11:47:25 -07:00
evilchili
7ded43476e restructure project for poetry-slam 2024-03-26 00:51:54 -07:00
evilchili
c94fb127ed added buffered reads from disk io and transcoding 2024-03-17 15:19:33 -07:00
evilchili
4ee4fb4a73 adding unit tests of streamer 2024-03-17 14:44:36 -07:00
evilchili
a5cf97870b remove dead code, add pidfile tests 2024-03-10 12:08:45 -07:00
evilchili
205177dca3 adding transcoder test coverage 2024-03-10 11:45:24 -07:00
evilchili
d97faca0f7 updating readme 2024-03-10 00:27:34 -08:00
evilchili
9fb8d1f248 updating readme 2024-03-10 00:25:57 -08:00
evilchili
351b17db69 readme update 2024-03-10 00:17:38 -08:00
evilchili
f6afd06575 updating setup defaults 2024-03-10 00:06:38 -08:00
evilchili
27164358ae Adding transcoader / recoverable errors 2024-03-10 00:05:24 -08:00
evilchili
912d3fccd7 rescuing CPU from endless loop hell on interactive console 2024-03-08 14:40:44 -08:00
evilchili
d2f4a85cd5 adding tests 2024-03-08 13:18:25 -08:00
evilchili
f3fd8215f0 adding playlist tests 2024-03-07 21:51:48 -08:00
evilchili
7417baeeb1 Eliminated need for Controller class 2024-03-06 20:09:59 -08:00
evilchili
0e6812d6a9 bumping version 2024-03-05 23:42:10 -08:00
29 changed files with 906 additions and 377 deletions

26
.coveragerc Normal file
View File

@ -0,0 +1,26 @@
# .coveragerc to control coverage.py
[run]
branch = True
[report]
# Regexes for lines to exclude from consideration
exclude_lines =
# Have to re-enable the standard pragma
pragma: no cover
# Don't complain about missing debug-only code:
def __repr__
if self\.debug
# Don't complain if tests don't hit defensive assertion code:
raise AssertionError
raise NotImplementedError
# Don't complain if non-runnable code isn't run:
if 0:
if __name__ == .__main__.:
# Don't complain about abstract methods, they aren't run:
@(abc\.)?abstractmethod
ignore_errors = True

127
README.md
View File

@ -1,16 +1,34 @@
# Croaker # Croaker
A shoutcast audio playlist designed for serving D&D session music. A shoutcast audio player designed for serving D&D session music.
### Features
* Native streaming of MP3 sources direct to your shoutcast / icecast server
* Transcoding of anything your local `ffmpeg` installation can convert to mp3
* Playlists are built using symlinks
* Randomizes playlist order the first time it is cached
* Always plays `_theme.mp3` first upon switching to a playlist, if it exists
* Falls back to silence if the stream encounters an error
### Requirements
* A functioning shoutcast / icecast server
* Python >= 3.10
* ffmpeg
* libshout3-dev
## What? Why? ## What? Why?
Because I run an online D&D game, which includes a background music stream for my players. The stream used to be served by liquidsoap and controlled by a bunch of bash scripts I cobbled together which are functional but brittle, and liquidsoap is a nightmare for the small use case. Also, this currently requires me to have a terminal window open to my media server to control liquidsoap directly, and I'd rather integrate the music controls directly with the rest of my DM tools, all of which run on my laptop. Because I run an online D&D game, which includes a background music stream for my players. The stream used to be served by liquidsoap and controlled by a bunch of bash scripts I cobbled together which are functional but brittle, and liquidsoap is a nightmare for the small use case. Also, this required me to have a terminal window open to my media server to control liquidsoap directly, and I'd rather integrate the music controls directly with the rest of my DM tools, all of which run on my laptop.
*Now that is a powerful yak! -- Aesop Rock (misquoted)* *Now that is a powerful yak! -- Aesop Rock (misquoted)*
## Quick Start (Server) ## Quick Start (Server)
This assumes you have a functioning icecast2 installation already. This assumes you have a functioning icecast2/whatever installation already.
``` ```
% sudo apt install libshout3-dev % sudo apt install libshout3-dev
@ -23,6 +41,8 @@ This assumes you have a functioning icecast2 installation already.
Now start the server, which will begin streaming the `session_start` playlist: Now start the server, which will begin streaming the `session_start` playlist:
## Controlling The Server
``` ```
% croaker start % croaker start
INFO Daemonizing controller on (localhost, 8003); pidfile and logs in ~/.dnd/croaker INFO Daemonizing controller on (localhost, 8003); pidfile and logs in ~/.dnd/croaker
@ -30,38 +50,111 @@ INFO Daemonizing controller on (localhost, 8003); pidfile and logs in ~/.dnd/cro
Connnect to the command & control server: Connnect to the command & control server:
``` ```bash
% telnet localhost 8003 % telnet localhost 8003
Trying 127.0.0.1... Trying 127.0.0.1...
Connected to croaker.local. Connected to croaker.local.
Escape character is '^]'. Escape character is '^]'.
HELP
PLAY $PLAYLIST_NAME - Switch to the specified playlist. help
FFWD - Skip to the next track in the playlist.
HELP - Display command help. PLAY PLAYLIST - Switch to the specified playlist.
KTHX - Close the current connection. LIST [PLAYLIST] - List playlists or contents of the specified list.
STOP - Stop Croaker. FFWD - Skip to the next track in the playlist.
OK HELP - Display command help.
KTHX - Close the current connection.
STOP - Stop the current track and stream silence.
STFU - Terminate the Croaker server.
```
List available playlists:
```
list
battle
adventure
session_start
``` ```
Switch to battle music -- roll initiative! Switch to battle music -- roll initiative!
``` ```
PLAY battle play battle
OK OK
``` ```
Skip this track and move on to the next: Skip this track and move on to the next:
``` ```
FFWD ffwd
OK OK
``` ```
Stop the server: Stop the music:
``` ```
STOP stop
Shutting down. OK
```
Disconnect:
```
kthx
KBAI
Connection closed by foreign host. Connection closed by foreign host.
``` ```
## Python Client Implementation
Here's a sample client using Ye Olde Socket Library:
```python
import socket
from dataclasses import dataclass
from functools import cached_property
@dataclass
class CroakerClient():
host: str
port: int
@cached_property
def playlists(self):
return self.send("LIST").split("\n")
def list(self, *args):
if not args:
return self.playlists
return self.send(f"LIST {args[0]}")
def play(self, *args):
if not args:
return "Error: Must specify the playlist to play."
return self.send(f"PLAY {args[0]}")
def ffwd(self, *args):
return self.send("FFWD")
def stop(self, *args):
return self.send("STOP")
def send(self, msg: str):
BUFSIZE = 4096
data = bytearray()
with socket.create_connection((self.host, self.port)) as sock:
sock.sendall(f"{msg}\n".encode())
while True:
buf = sock.recv(BUFSIZE)
data.extend(buf)
if len(buf) < BUFSIZE:
break
sock.sendall(b'KTHX\n')
return data.decode()
if __name__ == '__main__':
client = CroakerClient(host='localhost', port=1234)
client.play('session_start')
```

View File

@ -1,73 +0,0 @@
import logging
import queue
import threading
from croaker.playlist import load_playlist
from croaker.streamer import AudioStreamer
logger = logging.getLogger('controller')
class Controller(threading.Thread):
"""
A background thread started by the CroakerServer instance that controls a
shoutcast source streamer. The primary purpose of this class is to allow
the command and control server to interrupt streaming operations to
skip to a new track or load a new playlist.
"""
def __init__(self, control_queue):
self._streamer_queue = None
self._control_queue = control_queue
self.skip_event = threading.Event()
self.stop_event = threading.Event()
self._streamer = None
super().__init__()
@property
def streamer(self):
if not self._streamer:
self._streamer_queue = queue.Queue()
self._streamer = AudioStreamer(self._streamer_queue, self.skip_event, self.stop_event)
return self._streamer
def stop(self):
if self._streamer:
logging.debug("Sending STOP signal to streamer...")
self.stop_event.set()
self.playlist = None
def load(self, playlist_name: str):
self.playlist = load_playlist(playlist_name)
logger.debug(f"Switching to {self.playlist = }")
for track in self.playlist.tracks:
self._streamer_queue.put(str(track).encode())
def run(self):
logger.debug("Starting AudioStreamer...")
self.streamer.start()
self.load("session_start")
while True:
data = self._control_queue.get()
logger.debug(f"{data = }")
self.process_request(data)
def process_request(self, data):
cmd, *args = data.split(" ")
cmd = cmd.strip()
if not cmd:
return
handler = getattr(self, f"handle_{cmd}", None)
if not handler:
logger.debug("Ignoring invalid command: {cmd} = }")
return
handler(args)
def handle_PLAY(self, args):
return self.load(args[0])
def handle_FFWD(self, args):
logger.debug("Sending SKIP signal to streamer...")
self.skip_event.set()
def handle_STOP(self):
return self.stop()

View File

@ -1,16 +0,0 @@
class APIHandlingException(Exception):
"""
An API reqeust could not be encoded or decoded.
"""
class ConfigurationError(Exception):
"""
An error was discovered with the Groove on Demand configuration.
"""
class InvalidPathError(Exception):
"""
The specified path was invalid -- either it was not the expected type or wasn't accessible.
"""

View File

@ -1,129 +0,0 @@
import logging
import os
import queue
import socketserver
from pathlib import Path
import daemon
from croaker import path
from croaker.controller import Controller
from croaker.pidfile import pidfile
logger = logging.getLogger('server')
class RequestHandler(socketserver.StreamRequestHandler):
"""
Instantiated by the TCPServer when a request is received. Implements the
command and control protocol and sends commands to the shoutcast controller
on behalf of the user.
"""
supported_commands = {
# command # help text
"PLAY": "$PLAYLIST_NAME - Switch to the specified playlist.",
"FFWD": " - Skip to the next track in the playlist.",
"HELP": " - Display command help.",
"KTHX": " - Close the current connection.",
"STOP": " - Stop Croaker.",
}
def handle(self):
"""
Start a command and control session. Commands are read one line at a
time; the format is:
Byte Definition
-------------------
0-3 Command
4 Ignored
5+ Arguments
"""
while True:
self.data = self.rfile.readline().strip().decode()
logger.debug(f"{self.data = }")
try:
cmd = self.data[0:4].strip().upper()
args = self.data[5:]
except IndexError:
self.send(f"ERR Command not understood '{cmd}'")
if cmd not in self.supported_commands:
self.send(f"ERR Unknown Command '{cmd}'")
if cmd == "KTHX":
return self.send("KBAI")
handler = getattr(self, f"handle_{cmd}", None)
if handler:
handler(args)
else:
self.default_handler(cmd, args)
def send(self, msg):
return self.wfile.write(msg.encode() + b"\n")
def default_handler(self, cmd, args):
self.server.tell_controller(f"{cmd} {args}")
return self.send("OK")
def handle_HELP(self, args):
return self.send("\n".join(f"{cmd} {txt}" for cmd, txt in self.supported_commands.items()))
def handle_STOP(self, args):
self.send("Shutting down.")
self.server.stop()
class CroakerServer(socketserver.TCPServer):
"""
A Daemonized TCP Server that also starts a Shoutcast source client.
"""
allow_reuse_address = True
def __init__(self):
self._context = daemon.DaemonContext()
self._queue = queue.Queue()
self.controller = Controller(self._queue)
def _pidfile(self):
return pidfile(path.root() / "croaker.pid")
def tell_controller(self, msg):
"""
Enqueue a message for the shoutcast controller.
"""
self._queue.put(msg)
def bind_address(self):
return (os.environ["HOST"], int(os.environ["PORT"]))
def daemonize(self) -> None:
"""
Daemonize the current process, start the shoutcast controller
background thread and then begin listening for connetions.
"""
logger.info(f"Daemonizing controller on {self.bind_address()}; pidfile and output in {path.root()}")
super().__init__(self.bind_address(), RequestHandler)
self._context.pidfile = self._pidfile()
self._context.stdout = open(path.root() / Path("croaker.out"), "wb", buffering=0)
self._context.stderr = open(path.root() / Path("croaker.err"), "wb", buffering=0)
# when open() is called, all open file descriptors will be closed, as
# befits a good daemon. However this will also close the socket on
# which the TCPServer is listening! So let's keep that one open.
self._context.files_preserve = [self.fileno()]
self._context.open()
try:
self.controller.start()
self.serve_forever()
except KeyboardInterrupt:
logger.info("Shutting down.")
self.stop()
def stop(self) -> None:
self._pidfile()
server = CroakerServer()

View File

@ -1,68 +0,0 @@
import logging
import os
import threading
from functools import cached_property
from pathlib import Path
from time import sleep
import shout
logger = logging.getLogger('streamer')
class AudioStreamer(threading.Thread):
"""
Receive filenames from the controller thread and stream the contents of
those files to the icecast server.
"""
def __init__(self, queue, skip_event, stop_event):
super().__init__()
self.queue = queue
self.skip_requested = skip_event
self.stop_requested = stop_event
@cached_property
def _shout(self):
s = shout.Shout()
s.name = "Croaker Radio"
s.url = os.environ["ICECAST_URL"]
s.mount = os.environ["ICECAST_MOUNT"]
s.host = os.environ["ICECAST_HOST"]
s.port = int(os.environ["ICECAST_PORT"])
s.password = os.environ["ICECAST_PASSWORD"]
s.protocol = "http"
s.format = "mp3"
s.audio_info = {shout.SHOUT_AI_BITRATE: "192", shout.SHOUT_AI_SAMPLERATE: "44100", shout.SHOUT_AI_CHANNELS: "5"}
return s
def run(self):
logger.debug("Initialized")
self._shout.open()
while not self.stop_requested.is_set():
self._shout.get_connected()
track = self.queue.get()
logger.debug(f"Received: {track = }")
if track:
self.play(Path(track.decode()))
continue
sleep(1)
self._shout.close()
def play(self, track: Path):
with track.open("rb") as fh:
self._shout.get_connected()
logger.debug(f"Streaming {track.stem = }")
self._shout.set_metadata({"song": track.stem})
input_buffer = fh.read(4096)
while not self.skip_requested.is_set():
if self.stop_requested.is_set():
self.stop_requested.clear()
return
buf = input_buffer
input_buffer = fh.read(4096)
if len(buf) == 0:
break
self._shout.send(buf)
self._shout.sync()
if self.skip_requested.is_set():
self.skip_requested.clear()

View File

@ -1,15 +1,15 @@
[tool.poetry] [tool.poetry]
name = "croaker" name = "croaker"
version = "0.1.3" version = "0.9.2"
description = "" description = ""
authors = ["evilchili <evilchili@gmail.com>"] authors = ["evilchili <evilchili@gmail.com>"]
readme = "README.md" readme = "README.md"
packages = [ packages = [
{ include = "croaker" } { include = "*", from = "src" }
] ]
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = "^3.8" python = ">=3.10,<4.0"
prompt-toolkit = "^3.0.38" prompt-toolkit = "^3.0.38"
typer = "^0.9.0" typer = "^0.9.0"
python-dotenv = "^0.21.0" python-dotenv = "^0.21.0"
@ -21,14 +21,21 @@ requests = "^2.31.0"
psutil = "^5.9.8" psutil = "^5.9.8"
exscript = "^2.6.28" exscript = "^2.6.28"
python-shout = "^0.2.8" python-shout = "^0.2.8"
ffmpeg-python = "^0.2.0"
[tool.poetry.scripts] [tool.poetry.scripts]
croaker = "croaker.cli:app" croaker = "croaker.cli:app"
[tool.poetry.dev-dependencies] [tool.poetry.group.dev.dependencies]
black = "^23.3.0" pytest = "^8.1.1"
isort = "^5.12.0" pytest-cov = "^5.0.0"
pyproject-autoflake = "^1.0.2"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
### SLAM
[tool.black] [tool.black]
line-length = 120 line-length = 120
@ -48,7 +55,8 @@ ignore-init-module-imports = true # exclude __init__.py when removing unused
remove-duplicate-keys = true # remove all duplicate keys in objects remove-duplicate-keys = true # remove all duplicate keys in objects
remove-unused-variables = true # remove unused variables remove-unused-variables = true # remove unused variables
[tool.pytest.ini_options]
log_cli_level = "DEBUG"
addopts = "--cov=src --cov-report=term-missing"
[build-system] ### ENDSLAM
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

View File

@ -10,23 +10,19 @@ import typer
from dotenv import load_dotenv from dotenv import load_dotenv
from typing_extensions import Annotated from typing_extensions import Annotated
import croaker.path from croaker import path
from croaker.exceptions import ConfigurationError
from croaker.playlist import Playlist from croaker.playlist import Playlist
from croaker.server import server from croaker.server import server
SETUP_HELP = """ SETUP_HELP = f"""
# Root directory for croaker configuration and logs. See also croaker --root. # Root directory for croaker configuration and logs. See also croaker --root.
CROAKER_ROOT=~/.dnd/croaker CROAKER_ROOT={path.root()}
# where to store playlist sources # where to store playlist sources
#PLAYLIST_ROOT=$CROAKER_ROOT/playlists #PLAYLIST_ROOT={path.root()}/playlists
# where to cache transcoded media files
#CACHE_ROOT=$CROAKER_ROOT/cache
# Where the record the daemon's PID # Where the record the daemon's PID
#PIDFILE=$CROAKER_ROOT/croaker.pid #PIDFILE={path.root()}/croaker.pid
# Command and Control TCP Server bind address # Command and Control TCP Server bind address
HOST=0.0.0.0 HOST=0.0.0.0
@ -35,11 +31,6 @@ PORT=8003
# the kinds of files to add to playlists # the kinds of files to add to playlists
MEDIA_GLOB=*.mp3,*.flac,*.m4a MEDIA_GLOB=*.mp3,*.flac,*.m4a
# If defined, transcode media before streaming it, and cache it to disk. The
# strings INFILE and OUTFILE will be replaced with the media source file and
# the cached output location, respectively.
TRANSCODER=/usr/bin/ffmpeg -i INFILE '-hide_banner -loglevel error -codec:v copy -codec:a libmp3lame -q:a 2' OUTFILE
# Icecast2 configuration for Liquidsoap # Icecast2 configuration for Liquidsoap
ICECAST_PASSWORD= ICECAST_PASSWORD=
ICECAST_MOUNT= ICECAST_MOUNT=
@ -51,7 +42,7 @@ ICECAST_URL=
app = typer.Typer() app = typer.Typer()
app_state = {} app_state = {}
logger = logging.getLogger('cli') logger = logging.getLogger("cli")
@app.callback() @app.callback()
@ -76,20 +67,17 @@ def main(
level=logging.DEBUG if debug else logging.INFO, level=logging.DEBUG if debug else logging.INFO,
) )
try:
croaker.path.root()
croaker.path.playlist_root()
except ConfigurationError as e:
sys.stderr.write(f"{e}\n\n{SETUP_HELP}")
sys.exit(1)
@app.command() @app.command()
def setup(context: typer.Context): def setup(context: typer.Context):
""" """
(Re)Initialize Croaker. (Re)Initialize Croaker.
""" """
sys.stderr.write("Interactive setup is not yet available. Sorry!\n")
sys.stderr.write(
"Interactive setup is not available, but you can redirect "
"this command's output to a defaults file of your choice.\n"
)
print(dedent(SETUP_HELP)) print(dedent(SETUP_HELP))
@ -101,10 +89,7 @@ def start(
""" """
Start the Croaker command and control server. Start the Croaker command and control server.
""" """
if daemonize: server.start(daemonize=daemonize)
server.daemonize()
else:
server.start()
@app.command() @app.command()

View File

@ -9,11 +9,6 @@ def root():
return Path(os.environ.get("CROAKER_ROOT", "~/.dnd/croaker")).expanduser() return Path(os.environ.get("CROAKER_ROOT", "~/.dnd/croaker")).expanduser()
def cache_root():
path = Path(os.environ.get("CACHE_ROOT", root() / "cache")).expanduser()
return path
def playlist_root(): def playlist_root():
path = Path(os.environ.get("PLAYLIST_ROOT", root() / "playlists")).expanduser() path = Path(os.environ.get("PLAYLIST_ROOT", root() / "playlists")).expanduser()
return path return path

View File

@ -5,7 +5,7 @@ from pathlib import Path
from daemon import pidfile as _pidfile from daemon import pidfile as _pidfile
logger = logging.getLogger('daemon') logger = logging.getLogger("daemon")
def pidfile(pidfile_path: Path, sig=signal.SIGQUIT, terminate_if_running: bool = True): def pidfile(pidfile_path: Path, sig=signal.SIGQUIT, terminate_if_running: bool = True):

View File

@ -9,12 +9,10 @@ from typing import List
import croaker.path import croaker.path
logger = logging.getLogger('playlist') logger = logging.getLogger("playlist")
playlists = {} playlists = {}
NowPlaying = None
def _stripped(name): def _stripped(name):
name.replace('"', "") name.replace('"', "")
@ -25,21 +23,16 @@ def _stripped(name):
@dataclass @dataclass
class Playlist: class Playlist:
name: str name: str
position: int = 0
theme: Path = Path("_theme.mp3") theme: Path = Path("_theme.mp3")
@property
def current(self):
return self.tracks[self.position]
@cached_property @cached_property
def path(self): def path(self):
return croaker.path.playlist_root() / Path(self.name) return self._get_path()
@cached_property @cached_property
def tracks(self): def tracks(self):
if not self.path.exists(): if not self.path.exists():
raise RuntimeError(f"Playlist {self.name} not found at {self.path}.") raise RuntimeError(f"Playlist {self.name} not found at {self.path}.") # pragma: no cover
entries = [] entries = []
theme = self.path / self.theme theme = self.path / self.theme
@ -51,25 +44,17 @@ class Playlist:
entries += files entries += files
return entries return entries
def skip(self):
logging.debug(f"Skipping from {self.position} on {self.name}")
if self.position == len(self.tracks) - 1:
self.position = 0
else:
self.position += 1
def get_audio_files(self, path: Path = None): def get_audio_files(self, path: Path = None):
if not path: if not path:
path = self.path path = self.path
logging.debug(f"Getting files matching {os.environ['MEDIA_GLOB']} from {path}") logging.debug(f"Getting files matching {os.environ['MEDIA_GLOB']} from {path}")
pats = os.environ["MEDIA_GLOB"].split(",") pats = os.environ["MEDIA_GLOB"].split(",")
return chain(*[list(path.glob(pat)) for pat in pats]) return chain(*[list(path.rglob(pat)) for pat in pats])
def _add_track(self, target: Path, source: Path, make_theme: bool = False): def _get_path(self):
if source.is_dir(): return croaker.path.playlist_root() / self.name
for file in self.get_audio_files(source):
self._add_track(self.path / _stripped(file.name), file) def _add_track(self, target: Path, source: Path):
return
if target.exists(): if target.exists():
if not target.is_symlink(): if not target.is_symlink():
logging.warning(f"{target}: target already exists and is not a symlink; skipping.") logging.warning(f"{target}: target already exists and is not a symlink; skipping.")
@ -77,14 +62,26 @@ class Playlist:
target.unlink() target.unlink()
target.symlink_to(source) target.symlink_to(source)
def add(self, tracks: List[Path], make_theme: bool = False): def add(self, paths: List[Path], make_theme: bool = False):
logger.debug(f"Adding everything from {paths = }")
self.path.mkdir(parents=True, exist_ok=True) self.path.mkdir(parents=True, exist_ok=True)
if make_theme: for path in paths:
target = self.path / "_theme.mp3" if path.is_dir():
source = tracks.pop(0) files = list(self.get_audio_files(path))
self._add_track(target, source, make_theme=True) if make_theme:
for track in tracks: logger.debug(f"Adding first file from dir as theme: {files[0] = }")
self._add_track(target=self.path / _stripped(track.name), source=track) self._add_track(self.path / "_theme.mp3", files.pop(0))
make_theme = False
for file in files:
logger.debug(f"Adding {file = }")
self._add_track(target=self.path / _stripped(file.name), source=file)
elif make_theme:
logger.debug(f"Adding path as theme: {path = }")
self._add_track(self.path / "_theme.mp3", path)
make_theme = False
else:
logger.debug(f"Adding {path = }")
self._add_track(target=self.path / _stripped(path.name), source=path)
return sorted(self.get_audio_files()) return sorted(self.get_audio_files())
def __repr__(self): def __repr__(self):
@ -93,7 +90,7 @@ class Playlist:
return "\n".join(lines) return "\n".join(lines)
def load_playlist(name: str): def load_playlist(name: str): # pragma: no cover
if name not in playlists: if name not in playlists:
playlists[name] = Playlist(name=name) playlists[name] = Playlist(name=name)
return playlists[name] return playlists[name]

190
src/croaker/server.py Normal file
View File

@ -0,0 +1,190 @@
import logging
import os
import queue
import socketserver
import threading
from pathlib import Path
from time import sleep
import daemon
from croaker import path
from croaker.pidfile import pidfile
from croaker.playlist import load_playlist
from croaker.streamer import AudioStreamer
logger = logging.getLogger("server")
class RequestHandler(socketserver.StreamRequestHandler):
"""
Instantiated by the TCPServer when a request is received. Implements the
command and control protocol and sends commands to the shoutcast source
client on behalf of the user.
"""
supported_commands = {
# command # help text
"PLAY": "PLAYLIST - Switch to the specified playlist.",
"LIST": "[PLAYLIST] - List playlists or contents of the specified list.",
"FFWD": " - Skip to the next track in the playlist.",
"HELP": " - Display command help.",
"KTHX": " - Close the current connection.",
"STOP": " - Stop the current track and stream silence.",
"STFU": " - Terminate the Croaker server.",
}
should_listen = True
def handle(self):
"""
Start a command and control session. Commands are read one line at a
time; the format is:
Byte Definition
-------------------
0-3 Command
4 Ignored
5+ Arguments
"""
while True:
self.data = self.rfile.readline().strip().decode()
logger.debug(f"Received: {self.data}")
try:
cmd = self.data[0:4].strip().upper()
args = self.data[5:]
except IndexError:
self.send(f"ERR Command not understood '{cmd}'")
sleep(0.001)
continue
if not cmd:
sleep(0.001)
continue
elif cmd not in self.supported_commands:
self.send(f"ERR Unknown Command '{cmd}'")
sleep(0.001)
continue
elif cmd == "KTHX":
return self.send("KBAI")
handler = getattr(self, f"handle_{cmd}", None)
if not handler:
self.send(f"ERR No handler for {cmd}.")
handler(args)
if not self.should_listen:
break
def send(self, msg):
return self.wfile.write(msg.encode() + b"\n")
def handle_PLAY(self, args):
self.server.load(args)
return self.send("OK")
def handle_FFWD(self, args):
self.server.ffwd()
return self.send("OK")
def handle_LIST(self, args):
return self.send(self.server.list(args))
def handle_HELP(self, args):
return self.send("\n".join(f"{cmd} {txt}" for cmd, txt in self.supported_commands.items()))
def handle_STOP(self, args):
return self.server.stop_event.set()
def handle_STFU(self, args):
self.send("Shutting down.")
self.server.stop()
class CroakerServer(socketserver.TCPServer):
"""
A Daemonized TCP Server that also starts a Shoutcast source client.
"""
allow_reuse_address = True
def __init__(self):
self._context = daemon.DaemonContext()
self._queue = queue.Queue()
self.skip_event = threading.Event()
self.stop_event = threading.Event()
self.load_event = threading.Event()
self._streamer = None
self.playlist = None
def _pidfile(self):
return pidfile(path.root() / "croaker.pid")
@property
def streamer(self):
if not self._streamer:
self._streamer = AudioStreamer(self._queue, self.skip_event, self.stop_event, self.load_event)
return self._streamer
def bind_address(self):
return (os.environ["HOST"], int(os.environ["PORT"]))
def _daemonize(self) -> None:
"""
Daemonize the current process.
"""
logger.info(f"Daemonizing controller; pidfile and output in {path.root()}")
self._context.pidfile = self._pidfile()
self._context.stdout = open(path.root() / Path("croaker.out"), "wb", buffering=0)
self._context.stderr = open(path.root() / Path("croaker.err"), "wb", buffering=0)
# when open() is called, all open file descriptors will be closed, as
# befits a good daemon. However this will also close the socket on
# which the TCPServer is listening! So let's keep that one open.
self._context.files_preserve = [self.fileno()]
self._context.open()
def start(self, daemonize: bool = True) -> None:
"""
Start the shoutcast controller background thread, then begin listening for connections.
"""
logger.info(f"Starting controller on {self.bind_address()}.")
super().__init__(self.bind_address(), RequestHandler)
if daemonize:
self._daemonize()
try:
logger.debug("Starting AudioStreamer...")
self.streamer.start()
self.load("session_start")
self.serve_forever()
except KeyboardInterrupt:
logger.info("Shutting down.")
self.stop()
def stop(self):
self._pidfile()
def ffwd(self):
logger.debug("Sending SKIP signal to streamer...")
self.skip_event.set()
def clear_queue(self):
logger.debug("Requesting a reload...")
self.streamer.load_requested.set()
sleep(0.5)
def list(self, playlist_name: str = None):
if playlist_name:
return str(load_playlist(playlist_name))
return "\n".join([str(p.name) for p in path.playlist_root().iterdir()])
def load(self, playlist_name: str):
logger.debug(f"Switching to {playlist_name = }")
if self.playlist:
self.clear_queue()
self.playlist = load_playlist(playlist_name)
logger.debug(f"Loaded new playlist {self.playlist = }")
for track in self.playlist.tracks:
self._queue.put(str(track).encode())
server = CroakerServer()

BIN
src/croaker/silence.mp3 Normal file

Binary file not shown.

116
src/croaker/streamer.py Normal file
View File

@ -0,0 +1,116 @@
import logging
import os
import queue
import threading
from functools import cached_property
from pathlib import Path
from time import sleep
import shout
from croaker.transcoder import FrameAlignedStream
logger = logging.getLogger("streamer")
class AudioStreamer(threading.Thread):
"""
Receive filenames from the controller thread and stream the contents of
those files to the icecast server.
"""
def __init__(self, queue, skip_event, stop_event, load_event, chunk_size=4096):
super().__init__()
self.queue = queue
self.skip_requested = skip_event
self.stop_requested = stop_event
self.load_requested = load_event
self.chunk_size = chunk_size
@property
def silence(self):
return FrameAlignedStream.from_source(Path(__file__).parent / "silence.mp3", chunk_size=self.chunk_size)
@cached_property
def _shout(self):
s = shout.Shout()
s.name = "Croaker Radio"
s.url = os.environ["ICECAST_URL"]
s.mount = os.environ["ICECAST_MOUNT"]
s.host = os.environ["ICECAST_HOST"]
s.port = int(os.environ["ICECAST_PORT"])
s.password = os.environ["ICECAST_PASSWORD"]
s.protocol = os.environ.get("ICECAST_PROTOCOL", "http")
s.format = os.environ.get("ICECAST_FORMAT", "mp3")
return s
def run(self): # pragma: no cover
while True:
try:
logger.debug(f"Connecting to shoutcast server at {self._shout.host}:{self._shout.port}")
self._shout.open()
except shout.ShoutException as e:
logger.error("Error connecting to shoutcast server. Will sleep and try again.", exc_info=e)
sleep(3)
continue
try:
self.stream_queued_audio()
except Exception as exc:
logger.error("Caught exception.", exc_info=exc)
self._shout.close()
def clear_queue(self):
logger.debug("Clearing queue...")
while not self.queue.empty():
self.queue.get()
def queued_audio_source(self):
"""
Return a filehandle to the next queued audio source, or silence if the queue is empty.
"""
try:
track = Path(self.queue.get(block=False).decode())
logger.debug(f"Streaming {track.stem = }")
return FrameAlignedStream.from_source(track, chunk_size=self.chunk_size), track.stem
except queue.Empty:
logger.debug("Nothing queued; enqueing silence.")
except Exception as exc:
logger.error("Caught exception; falling back to silence.", exc_info=exc)
return self.silence, "[NOTHING PLAYING]"
def stream_queued_audio(self):
stream = None
title = None
next_stream = None
next_title = None
while True:
stream, title = (next_stream, next_title) if next_stream else self.queued_audio_source()
logging.debug(f"Starting stream of {title = }, {stream = }")
self._shout.set_metadata({"song": title})
next_stream, next_title = self.queued_audio_source()
for chunk in stream:
self._shout.send(chunk)
self._shout.sync()
# play the next source immediately
if self.skip_requested.is_set():
logger.debug("Skip was requested.")
self.skip_requested.clear()
break
# clear the queue
if self.load_requested.is_set():
logger.debug("Load was requested.")
self.clear_queue()
self.load_requested.clear()
break
# Stop streaming and clear the queue
if self.stop_requested.is_set():
logger.debug("Stop was requested.")
self.clear_queue()
self.stop_requested.clear()
break

132
src/croaker/transcoder.py Normal file
View File

@ -0,0 +1,132 @@
import logging
import os
import subprocess
from dataclasses import dataclass
from io import BufferedReader
from pathlib import Path
import ffmpeg
logger = logging.getLogger("transcoder")
@dataclass
class FrameAlignedStream:
"""
Use ffmpeg to transcode a source audio file to mp3 and iterate over the result
in frame-aligned chunks. This will ensure that readers will always have a full
frame of audio data to parse or emit.
I learned a lot from https://github.com/pylon/streamp3 figuring this stuff out!
Usage:
>>> stream = FrameAlignedStream.from_source(Path('test.flac').open('rb'))
>>> for segment in stream:
...
"""
source: BufferedReader
chunk_size: int = 1024
bit_rate: int = 192000
sample_rate: int = 44100
@property
def frames(self):
while True:
frame = self._read_one_frame()
if not frame:
return
yield frame
def _read_one_frame(self):
"""
Read the next full audio frame from the input source and return it
"""
# step through the source a byte at a time and look for the frame sync.
header = None
buffer = b""
while not header:
buffer += self.source.read(4 - len(buffer))
if len(buffer) != 4:
logging.debug("Reached the end of the source stream without finding another framesync.")
return False
header = buffer[:4]
if header[0] != 0b11111111 or header[1] >> 5 != 0b111:
logging.debug(f"Expected a framesync but got {buffer} instead; moving fwd 1 byte.")
header = None
buffer = buffer[1:]
# Decode the mp3 header. We could derive the bit_rate and sample_rate
# here if we had the lookup tables etc. from the MPEG spec, but since
# we control the input, we can rely on them being predefined.
version_code = (header[1] & 0b00011000) >> 3
padding_code = (header[2] & 0b00000010) >> 1
version = version_code & 1 if version_code >> 1 else 2
is_padded = bool(padding_code)
# calculate the size of the whole frame
frame_size = 1152 if version == 1 else 576
frame_size = self.bit_rate // 8 * frame_size // self.sample_rate
if is_padded:
frame_size += 1
# read the rest of the frame from the source
frame_data = self.source.read(frame_size - len(header))
if len(frame_data) != frame_size - len(header):
logging.debug("Reached the end of the source stream without finding a full frame.")
return None
# return the entire frame
return header + frame_data
def __iter__(self):
"""
Generate approximately chunk_size segments of audio data by iterating over the
frames, buffering them, and then yielding several as a single bytes object.
"""
buf = b""
for frame in self.frames:
if len(buf) >= self.chunk_size:
yield buf
buf = b""
if not frame:
break
buf += frame
if buf:
yield buf
@classmethod
def from_source(cls, infile: Path, **kwargs):
"""
Create a FrameAlignedStream instance by transcoding an audio source on disk.
"""
args = [] if os.environ.get("DEBUG") else ["-hide_banner", "-loglevel", "quiet"]
ffmpeg_args = (
ffmpeg.input(str(infile))
.output(
"pipe:",
map="a",
format="mp3",
# no ID3 headers -- saves having to decode them later
write_xing=0,
id3v2_version=0,
# force sample and bit rates
**{
"b:a": kwargs.get("bit_rate", cls.bit_rate),
"ar": kwargs.get("sample_rate", cls.sample_rate),
},
)
.global_args("-vn", *args)
.compile()
)
# Force close STDIN to prevent ffmpeg from trying to read from it. silly ffmpeg.
proc = subprocess.Popen(
ffmpeg_args, bufsize=kwargs.get("chunk_size", cls.chunk_size), stdout=subprocess.PIPE, stdin=subprocess.PIPE
)
proc.stdin.close()
logger.debug(f"Spawned ffmpeg (PID {proc.pid}) with args {ffmpeg_args = }")
return cls(proc.stdout, **kwargs)

16
test/conftest.py Normal file
View File

@ -0,0 +1,16 @@
from pathlib import Path
import pytest
@pytest.fixture(autouse=True)
def mock_env(monkeypatch):
fixtures = Path(__file__).parent / "fixtures"
monkeypatch.setenv("CROAKER_ROOT", str(fixtures))
monkeypatch.setenv("MEDIA_GLOB", "*.mp3,*.foo,*.bar")
monkeypatch.setenv("ICECAST_URL", "http://127.0.0.1")
monkeypatch.setenv("ICECAST_HOST", "localhost")
monkeypatch.setenv("ICECAST_MOUNT", "mount")
monkeypatch.setenv("ICECAST_PORT", "6523")
monkeypatch.setenv("ICECAST_PASSWORD", "password")
monkeypatch.setenv("DEBUG", "1")

View File

@ -0,0 +1 @@
_theme.mp3

View File

View File

View File

@ -0,0 +1 @@
one.mp3

View File

@ -0,0 +1 @@
two.mp3

View File

0
test/fixtures/sources/one.mp3 vendored Normal file
View File

0
test/fixtures/sources/two.mp3 vendored Normal file
View File

35
test/test_pidfile.py Normal file
View File

@ -0,0 +1,35 @@
from pathlib import Path
from unittest.mock import MagicMock
import pytest
from croaker import pidfile
@pytest.mark.parametrize(
"pid,terminate,kill_result,broken",
[
("pid", False, None, False), # running proc, no terminate
("pid", True, True, False), # running proc, terminate
("pid", True, ProcessLookupError, True), # stale pid
(None, None, None, False), # no running proc
],
)
def test_pidfile(monkeypatch, pid, terminate, kill_result, broken):
monkeypatch.setattr(
pidfile._pidfile,
"TimeoutPIDLockFile",
MagicMock(
**{
"return_value.read_pid.return_value": pid,
}
),
)
monkeypatch.setattr(
pidfile.os,
"kill",
MagicMock(**{"side_effect": kill_result if type(kill_result) is Exception else [kill_result]}),
)
ret = pidfile.pidfile(pidfile_path=Path("/dev/null"), terminate_if_running=terminate)
assert ret.break_lock.called == broken

44
test/test_playlist.py Normal file
View File

@ -0,0 +1,44 @@
from unittest.mock import MagicMock
import pytest
import croaker.path
import croaker.playlist
def test_playlist_loading():
pl = croaker.playlist.Playlist(name="test_playlist")
path = str(pl.path)
tracks = [str(t) for t in pl.tracks]
assert path == str(croaker.path.playlist_root() / pl.name)
assert pl.name == "test_playlist"
assert tracks[0] == f"{path}/_theme.mp3"
assert f"{path}/one.mp3" in tracks
assert f"{path}/two.mp3" in tracks
assert f"{path}/one.foo" in tracks
assert f"{path}/one.baz" not in tracks
@pytest.mark.parametrize(
"paths, make_theme, expected_count",
[
(["test_playlist"], True, 4),
(["test_playlist"], False, 4),
(["test_playlist", "sources/one.mp3"], True, 5),
(["test_playlist", "sources/one.mp3"], False, 5),
],
)
def test_playlist_creation(monkeypatch, paths, make_theme, expected_count):
new_symlinks = []
def symlink(target):
new_symlinks.append(target)
pl = croaker.playlist.Playlist(name="foo")
monkeypatch.setattr(croaker.playlist.Path, "unlink", MagicMock())
monkeypatch.setattr(croaker.playlist.Path, "symlink_to", MagicMock(side_effect=symlink))
monkeypatch.setattr(croaker.playlist.Path, "mkdir", MagicMock())
pl.add([croaker.path.playlist_root() / p for p in paths], make_theme)
assert len(new_symlinks) == expected_count

132
test/test_streamer.py Normal file
View File

@ -0,0 +1,132 @@
import io
import queue
import threading
from pathlib import Path
from unittest.mock import MagicMock
import pytest
import shout
from croaker import playlist, streamer
def get_stream_output(stream):
return stream.read()
@pytest.fixture(scope="session")
def silence_bytes():
return (Path(streamer.__file__).parent / "silence.mp3").read_bytes()
@pytest.fixture
def output_stream():
return io.BytesIO()
@pytest.fixture
def mock_shout(output_stream, monkeypatch):
def handle_send(buf):
output_stream.write(buf)
mm = MagicMock(spec=shout.Shout, **{"return_value.send.side_effect": handle_send})
monkeypatch.setattr("shout.Shout", mm)
return mm
@pytest.fixture
def input_queue():
return queue.Queue()
@pytest.fixture
def skip_event():
return threading.Event()
@pytest.fixture
def stop_event():
return threading.Event()
@pytest.fixture
def load_event():
return threading.Event()
@pytest.fixture
def audio_streamer(mock_shout, input_queue, skip_event, stop_event, load_event):
return streamer.AudioStreamer(input_queue, skip_event, stop_event, load_event)
def test_streamer_stop(audio_streamer, stop_event, output_stream):
stop_event.set()
audio_streamer.stream_queued_audio()
assert not stop_event.is_set()
def test_streamer_skip(audio_streamer, skip_event, output_stream):
skip_event.set()
audio_streamer.stream_queued_audio()
assert not skip_event.is_set()
def test_streamer_load(audio_streamer, load_event, output_stream):
load_event.set()
audio_streamer.stream_queued_audio()
assert not load_event.is_set()
def test_clear_queue(audio_streamer, input_queue):
pl = playlist.Playlist(name="test_playlist")
for track in pl.tracks:
input_queue.put(bytes(track))
assert input_queue.not_empty
audio_streamer.clear_queue()
assert input_queue.empty
@pytest.mark.skip
def test_streamer_defaults_to_silence(audio_streamer, input_queue, output_stream, silence_bytes):
audio_streamer.stream_queued_audio()
track = playlist.Playlist(name="test_playlist").tracks[0]
input_queue.put(bytes(track))
audio_streamer.stream_queued_audio()
audio_streamer.stream_queued_audio()
assert get_stream_output(output_stream) == silence_bytes + track.read_bytes() + silence_bytes
@pytest.mark.skip
def test_streamer_plays_silence_on_error(monkeypatch, audio_streamer, input_queue, output_stream, silence_bytes):
monkeypatch.setattr(audio_streamer.queue, "get", MagicMock(side_effect=Exception))
track = playlist.Playlist(name="test_playlist").tracks[0]
input_queue.put(bytes(track))
audio_streamer.stream_queued_audio()
assert get_stream_output(output_stream) == silence_bytes
@pytest.mark.skip
def test_streamer_plays_from_queue(audio_streamer, input_queue, output_stream):
pl = playlist.Playlist(name="test_playlist")
expected = b""
for track in pl.tracks:
input_queue.put(bytes(track))
expected += track.read_bytes()
while not input_queue.empty():
audio_streamer.stream_queued_audio()
assert get_stream_output(output_stream) == expected
def test_streamer_handles_stop_interrupt(audio_streamer, output_stream, stop_event):
stop_event.set()
audio_streamer.stream_queued_audio()
assert get_stream_output(output_stream) == b""
def test_streamer_handles_load_interrupt(audio_streamer, input_queue, output_stream, load_event):
pl = playlist.Playlist(name="test_playlist")
input_queue.put(bytes(pl.tracks[0]))
load_event.set()
audio_streamer.stream_queued_audio()
assert get_stream_output(output_stream) == b""
assert input_queue.empty

43
test/test_transcoder.py Normal file
View File

@ -0,0 +1,43 @@
from unittest.mock import MagicMock
import ffmpeg
import pytest
from croaker import playlist, transcoder
@pytest.fixture
def mock_mp3decoder(monkeypatch):
def read(stream):
return stream.read()
monkeypatch.setattr(transcoder, "MP3Decoder", MagicMock(**{"__enter__.return_value.read": read}))
@pytest.mark.xfail
@pytest.mark.parametrize(
"suffix, expected",
[
(".mp3", b"_theme.mp3\n"),
(".foo", b"transcoding!\n"),
],
)
def test_transcoder_open(monkeypatch, mock_mp3decoder, suffix, expected):
monkeypatch.setattr(
transcoder,
"ffmpeg",
MagicMock(
spec=ffmpeg,
**{
"input.return_value."
"output.return_value."
"global_args.return_value."
"compile.return_value": ["echo", "transcoding!"],
},
),
)
pl = playlist.Playlist(name="test_playlist")
track = [t for t in pl.tracks if t.suffix == suffix][0]
with transcoder.open(track) as handle:
assert handle.read() == expected