grooveondemand/groove/shell/interactive_shell.py
2022-12-31 12:43:48 -08:00

239 lines
7.3 KiB
Python

from slugify import slugify
from groove.db.manager import database_manager
from groove.media.scanner import MediaScanner
from groove.media.transcoder import Transcoder
from groove.shell.base import BasePrompt, command
from groove.exceptions import InvalidPathError
from groove import db
from groove.playlist import Playlist
from rich.table import Column
from rich import box
from sqlalchemy import func
class InteractiveShell(BasePrompt):
def __init__(self, manager):
super().__init__(manager=manager)
self._playlist = None
self._completer = None
self._prompt = [
"[help]Groove on Demand interactive shell. Try 'help' for help.[/help]",
"groove"
]
self._subshells = {}
self._register_subshells()
def _register_subshells(self):
for subclass in BasePrompt.__subclasses__():
if subclass.__name__ == self.__class__.__name__:
continue
self._subshells[subclass.__name__] = subclass(manager=self.manager, parent=self)
def _get_stats(self):
playlists = self.manager.session.query(func.count(db.playlist.c.id)).scalar()
entries = self.manager.session.query(func.count(db.entry.c.track)).scalar()
tracks = self.manager.session.query(func.count(db.track.c.relpath)).scalar()
return f"Database contains {playlists} playlists with a total of {entries} entries, from {tracks} known tracks."
@property
def playlist(self):
return self._playlist
@property
def autocomplete_values(self):
return list(self.commands.keys())
def default_completer(self, document, complete_event): # pragma: no cover
def _formatter(row):
self._playlist = Playlist.from_row(row, self.manager.session)
return self.playlist.record.name
return self.manager.fuzzy_table_completer(
db.playlist,
db.playlist.c.name,
_formatter
).get_completions(document, complete_event)
def process(self, cmd, *parts):
if cmd in self.commands:
return self.commands[cmd].handler(self, parts)
name = cmd + ' ' + ' '.join(parts)
self.load([name.strip()])
@command("""
[title]SCANNING YOUR MEDIA[/title]
Use the [b]scan[/b] function to scan your media root for new, changed, and
deleted audio files. This process may take some time if you have a large
library!
Instead of scanning the entire MEDIA_ROOT, you can specify a PATH, which
must be a subdirectory of your MEDIA_ROOT. This is useful to import that
new new.
[title]USAGE[/title]
[link]> scan [PATH][/link]
""")
def scan(self, parts):
"""
Scan your MEDIA_ROOT for changes.
"""
path = ' '.join(parts) if parts else None
try:
scanner = MediaScanner(path=path, db=self.manager.session, console=self.console)
except InvalidPathError as e:
self.console.error(str(e))
return True
scanner.scan()
@command(usage="""
[title]TRANSCODING[/title]
Groove on Demand will stream audio to web clients in the native format of your source media files, but for maximum
portability, performance, and interoperability with reverse proxies, it's a good idea to transcode to .webm first.
Use the [b]transcode[/b] command to cache transcoded copies of every track currently in a playlist that isn't
already in .webm format. Existing cache entries will be skipped. See also [b]cache[/b].
[title]USAGE[/title]
[link]> transcode[/link]
""")
def transcode(self, parts):
"""
Run the transcoder.
"""
tracks = self.manager.session.query(db.track).filter(db.entry.c.track_id == db.track.c.id).all()
transcoder = Transcoder(console=self.console)
transcoder.transcode([track['relpath'] for track in tracks])
@command("""
[title]LISTS FOR THE LIST LOVER[/title]
The [b]list[/b] command will display a summary of all playlists currently stored
in the Groove on Demand database.
[title]USAGE[/title]
[link]> list[/link]
""")
def list(self, parts):
"""
List all playlists.
"""
table = self.console.table(
Column('#', justify='right', width=4),
Column('Name'),
Column('Tracks', justify='right', width=4),
Column('Description'),
Column('Link'),
box=box.HORIZONTALS,
title=' :headphones: Groove on Demand Playlists',
title_justify='left',
caption=self._get_stats(),
caption_justify='right',
expand=True
)
query = self.manager.session.query(db.playlist)
for row in db.windowed_query(query, db.playlist.c.id, 1000):
pl = Playlist.from_row(row, self.manager.session)
table.add_row(
f"[dim]{pl.record.id}[/dim]",
f"[title]{pl.record.name}[/title]",
f"[text]{len(pl.entries)}[/text]",
f"[text]{pl.record.description}[/text]",
f"[link]{pl.url}[/link]",
)
self.console.print(table)
return True
@command(usage="""
[title]LOADING PLAYLISTS[/title]
Use the [b]load[/b] command to load an existing playlist from the database
and start the playlist editor. If the specified playlist does not exist,
it will be created automatically.
Matching playlist names will be suggested as you type; hit <TAB> to accept
the current suggestion, or use the arrow keys to choose a different
suggestion.
[title]USAGE[/title]
[link]> load NAME[/link]
""")
def load(self, parts):
"""
Load the named playlist and create it if it does not exist.
"""
name = ' '.join(parts)
if not name:
return
slug = slugify(name)
self._playlist = Playlist(
slug=slug,
name=name,
session=self.manager.session,
create_ok=True
)
self._subshells['_playlist'].start()
return True
@command(usage="""
[title]DATABASE STATISTICS[/title]
The [b]stats[/b] command displays interesting statistics about the database.
[title]USAGE[/title]
[link]> stats[/link]
""")
def stats(self, parts):
"""
Display database statistics.
"""
self.console.print(self._get_stats())
@command(usage="""
[title]HIT IT AND QUIT IT[/title]
The [b]quit[/b] command exits the Groove on Demand interactive shell.
[title]USAGE[/title]
[link]> quit|^D|<ENTER>[/link]
""")
def quit(self, parts):
"""
Quit Groove on Demand.
"""
raise SystemExit('Find the 1.')
@command(usage="""
[title]HELP FOR THE HELP LORD[/title]
The [b]help[/b] command will print usage information for whatever you're currently
doing. You can also ask for help on any command currently available.
[title]USAGE[/title]
[link]> help [COMMAND][/link]
""")
def help(self, parts):
"""
Display the help message.
"""
super().help(parts)
return True
def start(): # pragma: no cover
with database_manager() as manager:
InteractiveShell(manager).start()