Adding transcoader / recoverable errors

This commit is contained in:
evilchili 2024-03-10 00:05:24 -08:00
parent 912d3fccd7
commit 27164358ae
5 changed files with 118 additions and 43 deletions

View File

@ -29,7 +29,8 @@ class RequestHandler(socketserver.StreamRequestHandler):
"FFWD": " - Skip to the next track in the playlist.", "FFWD": " - Skip to the next track in the playlist.",
"HELP": " - Display command help.", "HELP": " - Display command help.",
"KTHX": " - Close the current connection.", "KTHX": " - Close the current connection.",
"STOP": " - Stop Croaker.", "STOP": " - Stop the current track and stream silence.",
"STFU": " - Terminate the Croaker server."
} }
def handle(self): def handle(self):
@ -45,6 +46,7 @@ class RequestHandler(socketserver.StreamRequestHandler):
""" """
while True: while True:
self.data = self.rfile.readline().strip().decode() self.data = self.rfile.readline().strip().decode()
logger.debug(f"Received: {self.data}")
try: try:
cmd = self.data[0:4].strip().upper() cmd = self.data[0:4].strip().upper()
args = self.data[5:] args = self.data[5:]
@ -86,6 +88,9 @@ class RequestHandler(socketserver.StreamRequestHandler):
return self.send("\n".join(f"{cmd} {txt}" for cmd, txt in self.supported_commands.items())) return self.send("\n".join(f"{cmd} {txt}" for cmd, txt in self.supported_commands.items()))
def handle_STOP(self, args): def handle_STOP(self, args):
return(self.server.stop_event.set())
def handle_STFU(self, args):
self.send("Shutting down.") self.send("Shutting down.")
self.server.stop() self.server.stop()

BIN
croaker/silence.mp3 Normal file

Binary file not shown.

View File

@ -1,4 +1,6 @@
import queue
import logging import logging
import io
import os import os
import threading import threading
from functools import cached_property from functools import cached_property
@ -6,6 +8,8 @@ from pathlib import Path
import shout import shout
from croaker import transcoder
logger = logging.getLogger('streamer') logger = logging.getLogger('streamer')
@ -22,6 +26,11 @@ class AudioStreamer(threading.Thread):
self.load_requested = load_event self.load_requested = load_event
self.chunk_size = chunk_size self.chunk_size = chunk_size
@cached_property
def silence(self):
with (Path(__file__).parent / 'silence.mp3').open('rb') as stream:
return io.BytesIO(stream.read())
@cached_property @cached_property
def _shout(self): def _shout(self):
s = shout.Shout() s = shout.Shout()
@ -37,19 +46,38 @@ class AudioStreamer(threading.Thread):
return s return s
def run(self): def run(self):
logger.debug("Initialized")
self._shout.open() self._shout.open()
while not self.stop_requested.is_set(): logger.debug(f"Connnected to shoutcast server at {self._shout.host}:{self._shout.port}")
self._shout.get_connected() while True:
track = self.queue.get(block=True)
logger.debug(f"Received: {track = }") # If the user said STOP, clear the queue.
if track: if self.stop_requested.is_set():
logger.debug("Stop requested; clearing queue.")
self.clear_queue()
self.stop_requested.clear()
# Check to see if there is a queued request. If there is, play it.
# If there isn't, or if there's a problem playing the request,
# fallback to silence.
not_playing = False
try:
request = self.queue.get(block=False)
logger.debug(f"Received: {request = }")
self.play_file(Path(request.decode()))
except queue.Empty:
logger.debug("Nothing queued; looping silence.")
not_playing = True
except Exception as exc:
logger.error("Caught exception; falling back to silence.", exc_info=exc)
not_playing = True
if not_playing:
try: try:
self.play(Path(track.decode())) self.silence.seek(0, 0)
except shout.ShoutException as e: self._shout.set_metadata({"song": '[NOTHING PLAYING]'})
logger.error("An error occurred while streaming a track.", exc_info=e) self.play_from_stream(self.silence)
self._shout.close() except Exception as exc:
self._shout.open() logger.error("Caught exception trying to loop silence!", exc_info=exc)
self._shout.close() self._shout.close()
def clear_queue(self): def clear_queue(self):
@ -60,37 +88,43 @@ class AudioStreamer(threading.Thread):
self.load_requested.clear() self.load_requested.clear()
logger.debug("Load event cleared.") logger.debug("Load event cleared.")
def play(self, track: Path): def _read_chunk(self, filehandle):
with track.open("rb") as fh: chunk = filehandle.read(self.chunk_size)
self._shout.get_connected() return chunk
logger.debug(f"Streaming {track.stem = }")
self._shout.set_metadata({"song": track.stem})
input_buffer = fh.read(self.chunk_size)
while True:
# To load a playlist, stop streaming the current track and clear the queue def play_file(self, track: Path):
# but do not clear the event. run() will detect it and logger.debug(f"Streaming {track.stem = }")
if self.load_requested.is_set(): self._shout.set_metadata({"song": track.stem})
logger.debug("Load was requested.") with transcoder.open(track) as fh:
self.clear_queue() return self.play_from_stream(fh)
return
# Stop streaming and clear the queue def play_from_stream(self, stream):
if self.stop_requested.is_set(): self._shout.get_connected()
logger.debug("Stop was requested.") input_buffer = self._read_chunk(stream)
self.stop_requested.clear() while True:
return
# Stop streaming and clear the queue # To load a playlist, stop streaming the current track and clear the queue
if self.skip_requested.is_set(): # but do not clear the event. run() will detect it and
logger.debug("Skip was requested.") if self.load_requested.is_set():
self.skip_requested.clear() logger.debug("Load was requested.")
return self.clear_queue()
return
# continue streaming the current track to icecast, until complete # Stop streaming and clear the queue
buf = input_buffer if self.stop_requested.is_set():
input_buffer = fh.read(self.chunk_size) logger.debug("Stop was requested; aborting current stream.")
if len(buf) == 0: return
break
self._shout.send(buf) # Stop streaming and clear the queue
self._shout.sync() if self.skip_requested.is_set():
logger.debug("Skip was requested.")
self.skip_requested.clear()
return
# continue streaming the current track to icecast, until complete
buf = input_buffer
input_buffer = self._read_chunk(stream)
if len(buf) == 0:
break
self._shout.send(buf)
self._shout.sync()

35
croaker/transcoder.py Normal file
View File

@ -0,0 +1,35 @@
from pathlib import Path
import subprocess
import logging
import ffmpeg
logger = logging.getLogger('transcoder')
def open(infile: Path):
"""
Return a stream of mp3 data for the given path on disk.
If the requested path is an mp3, return a filehandle on the file. Otherwise,
invoke ffmpeg to tranascode whatever was requested to mp3 format and return
a pipe to ffmpeg's STDOUT.
"""
suffix = infile.suffix.lower()
if suffix == '.mp3':
logger.debug(f"Not transcoding mp3 {infile = }")
return infile.open('rb')
ffmpeg_args = (
ffmpeg
.input(str(infile))
.output('-', format='mp3', q=2)
.global_args('-hide_banner', '-loglevel', 'quiet')
.compile()
)
# Force close STDIN to prevent ffmpeg from trying to read from it. silly ffmpeg.
proc = subprocess.Popen(ffmpeg_args, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
proc.stdin.close()
logger.debug(f"Spawned ffmpeg (PID {proc.pid}) with args {ffmpeg_args = }")
return proc.stdout

View File

@ -9,7 +9,7 @@ packages = [
] ]
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = "^3.8" python = ">=3.10,<4.0"
prompt-toolkit = "^3.0.38" prompt-toolkit = "^3.0.38"
typer = "^0.9.0" typer = "^0.9.0"
python-dotenv = "^0.21.0" python-dotenv = "^0.21.0"
@ -21,6 +21,7 @@ requests = "^2.31.0"
psutil = "^5.9.8" psutil = "^5.9.8"
exscript = "^2.6.28" exscript = "^2.6.28"
python-shout = "^0.2.8" python-shout = "^0.2.8"
ffmpeg-python = "^0.2.0"
[tool.poetry.scripts] [tool.poetry.scripts]
croaker = "croaker.cli:app" croaker = "croaker.cli:app"