From 27164358ae64eb4b89056f242612dc65dbd62298 Mon Sep 17 00:00:00 2001 From: evilchili Date: Sun, 10 Mar 2024 00:05:24 -0800 Subject: [PATCH] Adding transcoader / recoverable errors --- croaker/server.py | 7 ++- croaker/silence.mp3 | Bin 0 -> 12292 bytes croaker/streamer.py | 116 +++++++++++++++++++++++++++--------------- croaker/transcoder.py | 35 +++++++++++++ pyproject.toml | 3 +- 5 files changed, 118 insertions(+), 43 deletions(-) create mode 100644 croaker/silence.mp3 create mode 100644 croaker/transcoder.py diff --git a/croaker/server.py b/croaker/server.py index e13d570..8488073 100644 --- a/croaker/server.py +++ b/croaker/server.py @@ -29,7 +29,8 @@ class RequestHandler(socketserver.StreamRequestHandler): "FFWD": " - Skip to the next track in the playlist.", "HELP": " - Display command help.", "KTHX": " - Close the current connection.", - "STOP": " - Stop Croaker.", + "STOP": " - Stop the current track and stream silence.", + "STFU": " - Terminate the Croaker server." } def handle(self): @@ -45,6 +46,7 @@ class RequestHandler(socketserver.StreamRequestHandler): """ while True: self.data = self.rfile.readline().strip().decode() + logger.debug(f"Received: {self.data}") try: cmd = self.data[0:4].strip().upper() args = self.data[5:] @@ -86,6 +88,9 @@ class RequestHandler(socketserver.StreamRequestHandler): return self.send("\n".join(f"{cmd} {txt}" for cmd, txt in self.supported_commands.items())) def handle_STOP(self, args): + return(self.server.stop_event.set()) + + def handle_STFU(self, args): self.send("Shutting down.") self.server.stop() diff --git a/croaker/silence.mp3 b/croaker/silence.mp3 new file mode 100644 index 0000000000000000000000000000000000000000..7d528758ef20f0fb104725be9df9b85b67052af3 GIT binary patch literal 12292 zcmeZtF=k-^0WL5JCjS2pKyYxdJoD1>fl~ZHT*APh|AB#ni9gj{Td-vL%GGPv zZ`rnE*PeZcj~+jH`rL&p*KXXtd;js%7q8yD`}Fnu&)40A0<}#Yz&?0DX#SgvG!KgiG$slwz9bj$0p$8KN zpSW-vEqBlZZg7R$X#5QB_@P&MO|Nt?-0Y!O{LsrDdc_aD{4;6~dF9)13%B8xu7{g{ zhFf|Y_0MRz0~)^`Zuyj6{uyrejFtzZsGg|JD7eAxnMqaoLxAI`P`G;QV in(}Z%Pk&hWpmoF)kGo(o7hQ;s_RuTbhMPTfNe2MnBpGr5 literal 0 HcmV?d00001 diff --git a/croaker/streamer.py b/croaker/streamer.py index 8ed37b6..f8b9d10 100644 --- a/croaker/streamer.py +++ b/croaker/streamer.py @@ -1,4 +1,6 @@ +import queue import logging +import io import os import threading from functools import cached_property @@ -6,6 +8,8 @@ from pathlib import Path import shout +from croaker import transcoder + logger = logging.getLogger('streamer') @@ -22,6 +26,11 @@ class AudioStreamer(threading.Thread): self.load_requested = load_event self.chunk_size = chunk_size + @cached_property + def silence(self): + with (Path(__file__).parent / 'silence.mp3').open('rb') as stream: + return io.BytesIO(stream.read()) + @cached_property def _shout(self): s = shout.Shout() @@ -37,19 +46,38 @@ class AudioStreamer(threading.Thread): return s def run(self): - logger.debug("Initialized") self._shout.open() - while not self.stop_requested.is_set(): - self._shout.get_connected() - track = self.queue.get(block=True) - logger.debug(f"Received: {track = }") - if track: + logger.debug(f"Connnected to shoutcast server at {self._shout.host}:{self._shout.port}") + while True: + + # If the user said STOP, clear the queue. + if self.stop_requested.is_set(): + logger.debug("Stop requested; clearing queue.") + self.clear_queue() + self.stop_requested.clear() + + # Check to see if there is a queued request. If there is, play it. + # If there isn't, or if there's a problem playing the request, + # fallback to silence. + not_playing = False + try: + request = self.queue.get(block=False) + logger.debug(f"Received: {request = }") + self.play_file(Path(request.decode())) + except queue.Empty: + logger.debug("Nothing queued; looping silence.") + not_playing = True + except Exception as exc: + logger.error("Caught exception; falling back to silence.", exc_info=exc) + not_playing = True + + if not_playing: try: - self.play(Path(track.decode())) - except shout.ShoutException as e: - logger.error("An error occurred while streaming a track.", exc_info=e) - self._shout.close() - self._shout.open() + self.silence.seek(0, 0) + self._shout.set_metadata({"song": '[NOTHING PLAYING]'}) + self.play_from_stream(self.silence) + except Exception as exc: + logger.error("Caught exception trying to loop silence!", exc_info=exc) self._shout.close() def clear_queue(self): @@ -60,37 +88,43 @@ class AudioStreamer(threading.Thread): self.load_requested.clear() logger.debug("Load event cleared.") - def play(self, track: Path): - with track.open("rb") as fh: - self._shout.get_connected() - logger.debug(f"Streaming {track.stem = }") - self._shout.set_metadata({"song": track.stem}) - input_buffer = fh.read(self.chunk_size) - while True: + def _read_chunk(self, filehandle): + chunk = filehandle.read(self.chunk_size) + return chunk - # To load a playlist, stop streaming the current track and clear the queue - # but do not clear the event. run() will detect it and - if self.load_requested.is_set(): - logger.debug("Load was requested.") - self.clear_queue() - return + def play_file(self, track: Path): + logger.debug(f"Streaming {track.stem = }") + self._shout.set_metadata({"song": track.stem}) + with transcoder.open(track) as fh: + return self.play_from_stream(fh) - # Stop streaming and clear the queue - if self.stop_requested.is_set(): - logger.debug("Stop was requested.") - self.stop_requested.clear() - return + def play_from_stream(self, stream): + self._shout.get_connected() + input_buffer = self._read_chunk(stream) + while True: - # Stop streaming and clear the queue - if self.skip_requested.is_set(): - logger.debug("Skip was requested.") - self.skip_requested.clear() - return + # To load a playlist, stop streaming the current track and clear the queue + # but do not clear the event. run() will detect it and + if self.load_requested.is_set(): + logger.debug("Load was requested.") + self.clear_queue() + return - # continue streaming the current track to icecast, until complete - buf = input_buffer - input_buffer = fh.read(self.chunk_size) - if len(buf) == 0: - break - self._shout.send(buf) - self._shout.sync() + # Stop streaming and clear the queue + if self.stop_requested.is_set(): + logger.debug("Stop was requested; aborting current stream.") + return + + # Stop streaming and clear the queue + if self.skip_requested.is_set(): + logger.debug("Skip was requested.") + self.skip_requested.clear() + return + + # continue streaming the current track to icecast, until complete + buf = input_buffer + input_buffer = self._read_chunk(stream) + if len(buf) == 0: + break + self._shout.send(buf) + self._shout.sync() diff --git a/croaker/transcoder.py b/croaker/transcoder.py new file mode 100644 index 0000000..253cb61 --- /dev/null +++ b/croaker/transcoder.py @@ -0,0 +1,35 @@ +from pathlib import Path +import subprocess +import logging + +import ffmpeg + +logger = logging.getLogger('transcoder') + + +def open(infile: Path): + """ + Return a stream of mp3 data for the given path on disk. + + If the requested path is an mp3, return a filehandle on the file. Otherwise, + invoke ffmpeg to tranascode whatever was requested to mp3 format and return + a pipe to ffmpeg's STDOUT. + """ + suffix = infile.suffix.lower() + if suffix == '.mp3': + logger.debug(f"Not transcoding mp3 {infile = }") + return infile.open('rb') + + ffmpeg_args = ( + ffmpeg + .input(str(infile)) + .output('-', format='mp3', q=2) + .global_args('-hide_banner', '-loglevel', 'quiet') + .compile() + ) + + # Force close STDIN to prevent ffmpeg from trying to read from it. silly ffmpeg. + proc = subprocess.Popen(ffmpeg_args, stdout=subprocess.PIPE, stdin=subprocess.PIPE) + proc.stdin.close() + logger.debug(f"Spawned ffmpeg (PID {proc.pid}) with args {ffmpeg_args = }") + return proc.stdout diff --git a/pyproject.toml b/pyproject.toml index 4a55a3a..03936c3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,7 @@ packages = [ ] [tool.poetry.dependencies] -python = "^3.8" +python = ">=3.10,<4.0" prompt-toolkit = "^3.0.38" typer = "^0.9.0" python-dotenv = "^0.21.0" @@ -21,6 +21,7 @@ requests = "^2.31.0" psutil = "^5.9.8" exscript = "^2.6.28" python-shout = "^0.2.8" +ffmpeg-python = "^0.2.0" [tool.poetry.scripts] croaker = "croaker.cli:app"