refactor subshells, add help system

This commit is contained in:
evilchili 2022-12-17 18:02:12 -08:00
parent f526b42d65
commit fe671194a0
14 changed files with 388 additions and 218 deletions

View File

@ -71,6 +71,7 @@ class Console(_Console):
caption_style=background_style,
style=background_style,
)
params['min_width'] = 80
width = os.environ.get('CONSOLE_WIDTH', 'auto')
if width == 'expand':
params['expand'] = True

View File

@ -271,6 +271,10 @@ class Playlist:
def get_or_create(self, create_ok: bool = False) -> Row:
if self._record is None:
self._record = self._get()
if self._record:
self._description = self._record.description
self._name = self._record.name
self._slug = self._record.slug
if not self._record:
if self.deleted:
raise PlaylistValidationError("Object has been deleted.")

View File

@ -1,7 +1,2 @@
from .base import BasePrompt
from .quit import quit
from .help import help
from .browse import browse
from .stats import stats
from .playlist import _playlist
from .load import load

View File

@ -1,8 +1,31 @@
from prompt_toolkit import prompt
from prompt_toolkit.completion import Completer, Completion
from prompt_toolkit import print_formatted_text, HTML
from groove.console import Console
import functools
from collections import namedtuple, defaultdict
from prompt_toolkit.completion import Completer, Completion
from groove.console import Console
from textwrap import dedent
COMMANDS = defaultdict(dict)
Command = namedtuple('Commmand', 'prompt,handler,usage')
def register_command(handler, usage):
prompt = handler.__qualname__.split('.', -1)[0]
cmd = handler.__name__
if cmd not in COMMANDS[prompt]:
COMMANDS[prompt][cmd] = Command(prompt=prompt, handler=handler, usage=usage)
def command(usage):
def decorator(func):
register_command(func, usage)
@functools.wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper
return decorator
class BasePrompt(Completer):
@ -14,26 +37,45 @@ class BasePrompt(Completer):
raise RuntimeError("Must define either a database manager or a parent object.")
self._prompt = ''
self._values = []
self._autocomplete_values = []
self._parent = parent
self._manager = manager
self._console = None
self._theme = None
def _get_help(self, cmd=None):
try:
return dedent(COMMANDS[self.__class__.__name__][cmd].usage)
except KeyError:
return self.usage
def default_completer(self, document, complete_event):
raise NotImplementedError(f"Implement the 'default_completer' method of {self.__class__.__name__}")
@property
def usage(self):
text = dedent("""
[title]GROOVE ON DEMAND INTERACTIVE SHELL[/title]
Available commands are listed below. Try 'help COMMAND' for detailed help.
[title]COMMANDS[/title]
""")
for (name, cmd) in sorted(self.commands.items()):
text += f" [b]{name:10s}[/b] {cmd.handler.__doc__.strip()}\n"
return text
@property
def commands(self):
return COMMANDS[self.__class__.__name__]
@property
def console(self):
if not self._console:
self._console = Console(color_system='truecolor')
return self._console
@property
def usage(self):
return self.__class__.__name__
@property
def help_text(self):
return self.__doc__
@property
def manager(self):
if self._manager:
@ -50,13 +92,13 @@ class BasePrompt(Completer):
return self._prompt
@property
def values(self):
return self._values
def autocomplete_values(self):
return self._autocomplete_values
def get_completions(self, document, complete_event):
word = document.get_word_before_cursor()
found = False
for value in self.values:
for value in self.autocomplete_values:
if word in value:
found = True
yield Completion(value, start_position=-len(word))
@ -68,11 +110,17 @@ class BasePrompt(Completer):
pass
def help(self, parts):
self.console.print(
getattr(self, parts[0]).__doc__ if parts else self.help_text
)
attr = None
if parts:
attr = parts[0]
self.console.print(self._get_help(attr))
return True
def process(self, cmd, *parts):
if cmd in self.commands:
return self.commands[cmd].handler(self, parts)
self.console.error(f"Command {cmd} not understood.")
def start(self, cmd=''):
while True:
if not cmd:
@ -80,12 +128,7 @@ class BasePrompt(Completer):
if not cmd:
return
cmd, *parts = cmd.split()
if not self.process(cmd, *parts):
return
res = self.process(cmd, *parts)
if res is False:
return res
cmd = ''
def default_completer(self, document, complete_event):
raise NotImplementedError()
def process(self, cmd, *parts):
raise NotImplementedError()

View File

@ -1,26 +0,0 @@
from .base import BasePrompt
from rich.table import Table, Column
from rich import print
from sqlalchemy import func
from groove import db
from groove.playlist import Playlist
class browse(BasePrompt):
"""Browse the playlists."""
def process(self, cmd, *parts):
count = self.parent.manager.session.query(func.count(db.playlist.c.id)).scalar()
print(f"Displaying {count} playlists:")
query = self.parent.manager.session.query(db.playlist)
table = Table(
*[Column(k.name.title()) for k in db.playlist.columns]
)
for row in db.windowed_query(query, db.playlist.c.id, 1000):
columns = tuple(Playlist.from_row(row, self.manager.session).record)[0:-1]
table.add_row(*[str(col) for col in columns])
print()
print(table)
print()

View File

@ -1,26 +0,0 @@
from .base import BasePrompt
from rich import print
import rich.table
class help(BasePrompt):
"""Display help documentation."""
@property
def usage(self):
return "help [COMMAND]"
def process(self, cmd, *parts):
if not parts:
print("Available Commands:")
table = rich.table.Table()
table.add_column("Command", style="yellow", no_wrap=True)
table.add_column("Description")
for name, obj in self.parent.commands.items():
if name.startswith('_'):
continue
table.add_row(getattr(obj, 'usage', name), obj.__doc__)
print(table)
else:
print(f"Help for {' '.join(parts)}:")

View File

@ -1,40 +1,48 @@
from slugify import slugify
from groove.db.manager import database_manager
from groove.shell.base import BasePrompt
from groove.shell.base import BasePrompt, command, register_command
from groove import db
from groove.playlist import Playlist
from rich.table import Table, Column
from rich import box
class CommandPrompt(BasePrompt):
from sqlalchemy import func
class InteractiveShell(BasePrompt):
def __init__(self, manager):
super().__init__(manager=manager)
self._playlist = None
self._completer = None
self._commands = None
self._prompt = [
"[help]Groove on Demand interactive shell. Try 'help' for help.[/help]",
"groove"
]
self._subshells = {}
self._register_subshells()
def _register_subshells(self):
for subclass in BasePrompt.__subclasses__():
if subclass.__name__ == self.__class__.__name__:
continue
self._subshells[subclass.__name__] = subclass(manager=self.manager, parent=self)
def _get_stats(self):
playlists = self.manager.session.query(func.count(db.playlist.c.id)).scalar()
entries = self.manager.session.query(func.count(db.entry.c.track)).scalar()
tracks = self.manager.session.query(func.count(db.track.c.relpath)).scalar()
return f"Database contains {playlists} playlists with a total of {entries} entries, from {tracks} known tracks."
@property
def playlist(self):
return self._playlist
@property
def commands(self):
if not self._commands:
self._commands = {}
for cmd in BasePrompt.__subclasses__():
if cmd.__name__ == self.__class__.__name__:
continue
self._commands[cmd.__name__] = cmd(manager=self.manager, parent=self)
return self._commands
@property
def values(self):
return [k for k in self.commands.keys() if not k.startswith('_')]
def autocomplete_values(self):
return list(self.commands.keys())
def default_completer(self, document, complete_event): # pragma: no cover
def _formatter(row):
@ -47,20 +55,133 @@ class CommandPrompt(BasePrompt):
).get_completions(document, complete_event)
def process(self, cmd, *parts):
name = cmd + ' ' + ' '.join(parts)
if cmd in self.commands:
self.commands[cmd].start(name)
return self.commands[cmd].handler(self, parts)
name = cmd + ' ' + ' '.join(parts)
self.load([name.strip()])
@command("""
[title]LISTS FOR THE LIST LOVER[/title]
The [b]list[/b] command will display a summary of all playlists currently stored
in the Groove on Demand database.
[title]USAGE[/title]
[link]> list[/link]
""")
def list(self, parts):
"""
List all playlists.
"""
count = self.manager.session.query(func.count(db.playlist.c.id)).scalar()
table = self.console.table(
Column('#', justify='right', width=4),
Column('Name'),
Column('Tracks', justify='right', width=4),
Column('Description'),
Column('Link'),
box=box.HORIZONTALS,
title=' :headphones: Groove on Demand Playlists',
title_justify='left',
caption=self._get_stats(),
caption_justify='right',
expand=True
)
query = self.manager.session.query(db.playlist)
for row in db.windowed_query(query, db.playlist.c.id, 1000):
pl = Playlist.from_row(row, self.manager.session)
table.add_row(
f"[dim]{pl.record.id}[/dim]",
f"[title]{pl.record.name}[/title]",
f"[text]{len(pl.entries)}[/text]",
f"[text]{pl.record.description}[/text]",
f"[link]{pl.url}[/link]",
)
self.console.print(table)
return True
@command(usage="""
[title]LOADING PLAYLISTS[/title]
Use the [b]load[/b] command to load an existing playlist from the database
and start the playlist editor. If the specified playlist does not exist,
it will be created automatically.
Matching playlist names will be suggested as you type; hit <TAB> to accept
the current suggestion, or use the arrow keys to choose a different
suggestion.
[title]USAGE[/title]
[link]> load NAME[/link]
""")
def load(self, parts):
"""
Load the named playlist and create it if it does not exist.
"""
name = ' '.join(parts)
if not name:
return
slug = slugify(name)
self._playlist = Playlist(
slug=slugify(name),
slug=slug,
name=name,
session=self.manager.session,
create_ok=True
)
self.commands['_playlist'].start()
self._subshells['_playlist'].start()
return True
@command(usage="""
[title]DATABASE STATISTICS[/title]
The [b]stats[/b] command displays interesting statistics about the database.
[title]USAGE[/title]
[link]> stats[/link]
""")
def stats(self, parts):
"""
Display database statistics.
"""
self.console.print(self._get_stats())
@command(usage="""
[title]HIT IT AND QUIT IT[/title]
The [b]quit[/b] command exits the Groove on Demand interactive shell.
[title]USAGE[/title]
[link]> quit|^D|<ENTER>[/link]
""")
def quit(self, parts):
"""
Quit Groove on Demand.
"""
raise SystemExit('Find the 1.')
@command(usage="""
[title]HELP FOR THE HELP LORD[/title]
The [b]help[/b] command will print usage information for whatever you're currently
doing. You can also ask for help on any command currently available.
[title]USAGE[/title]
[link]> help [COMMAND][/link]
""")
def help(self, parts):
"""
Display the help message.
"""
super().help(parts)
return True
def start(): # pragma: no cover
with database_manager() as manager:
CommandPrompt(manager).start()
InteractiveShell(manager).start()

View File

@ -1,28 +0,0 @@
from .base import BasePrompt
from slugify import slugify
from groove.playlist import Playlist
class load(BasePrompt):
"""Create a new playlist."""
@property
def usage(self):
return "load PLAYLIST_NAME"
def process(self, cmd, *parts):
name = ' '.join(parts)
if not name:
print(f"Usage: {self.usage}")
return
slug = slugify(name)
self.parent._playlist = Playlist(
slug=slug,
name=name,
session=self.manager.session,
create_ok=True
)
print(self.parent.playlist.info)
return self.parent.commands['_playlist'].start()

View File

@ -1,4 +1,4 @@
from .base import BasePrompt
from .base import BasePrompt, command
import os
@ -18,10 +18,10 @@ class _playlist(BasePrompt):
def __init__(self, parent, manager=None):
super().__init__(manager=manager, parent=parent)
self._parent = parent
self._commands = None
self._console = parent.console
@property
def help_text(self):
def usage(self):
synopsis = (
f"You are currently editing the [b]{self.parent.playlist.name}[/b]"
f" playlist. From this prompt you can quickly append new tracks "
@ -33,7 +33,7 @@ class _playlist(BasePrompt):
try:
width = int(os.environ.get('CONSOLE_WIDTH', '80'))
except ValueError:
except ValueError: # pragma: no cover
width = 80
synopsis = '\n '.join(wrap(synopsis, width=width))
@ -54,7 +54,7 @@ class _playlist(BasePrompt):
[b]delete[/b] Delete the playlist
[b]help[/b] This message
Try 'help command' for command-specific help.[/help]
Try 'help COMMAND' for command-specific help.[/help]
""")
@property
@ -65,43 +65,50 @@ class _playlist(BasePrompt):
f"[prompt]{self.parent.playlist.slug}[/prompt]",
]
@property
def values(self):
return self.commands.keys()
def start(self):
self.show()
super().start()
@property
def commands(self):
if not self._commands:
self._commands = {
'delete': self.delete,
'add': self.add,
'show': self.show,
'edit': self.edit,
'help': self.help
}
return self._commands
@command("""
[title]EDITING A PLAYLIST[/title]
def process(self, cmd, *parts):
if cmd in self.commands:
return True if self.commands[cmd](parts) else False
self.parent.console.error(f"Command not understood: {cmd}")
return True
Use the [b]edit[/b] commmand to edit a YAML-formatted versin of the playlist
in your external editor as specified by the $EDITOR environment variable.
You can use this feature to rename a playlist, change its description, and
delete or reorder the playlist's entries. Save and exit the file when you
are finished editing, and the playlist will be updated with your changes.
To abort the edit session, exit your editor without saving the file.
[title]USAGE[/title]
[link]playlist> edit[/link]
""")
def edit(self, *parts):
try:
self.parent.playlist.edit()
except PlaylistValidationError as e:
self.parent.console.error(f"Changes were not saved: {e}")
except PlaylistValidationError as e: # pragma: no cover
self.console.error(f"Changes were not saved: {e}")
else:
self.show()
return True
@command("""
[title]VIEWS FOR THE VIEWMASTER[/title]
Use the [b]show[/b] command to display the contents of the current playlist.
[title]USAGE[/title]
[link]playlist> show[/link]
""")
def show(self, *parts):
pl = self.parent.playlist
title = f"\n [b]:headphones: {pl.name}[/b]"
if pl.description:
title += f"\n [italic]{pl.description}[/italic]\n"
table = self.parent.console.table(
table = self.console.table(
Column('#', justify='right', width=4),
Column('Artist', justify='left'),
Column('Title', justify='left'),
@ -117,16 +124,34 @@ class _playlist(BasePrompt):
f"[artist]{entry.artist}[/artist]",
f"[title]{entry.title}[/title]"
)
self.parent.console.print(table)
self.console.print(table)
return True
@command("""
[title]ADDING TRACKS TO A PLAYLIST[/title]
Use the [b]add[/b] command to interactively add one or more tracks from
your media sources to the current playlist. At the prompt, start typing the
name of an artist, album, or song title; matches from the file names in
your library will be suggested automatically. To accept a match, hit <TAB>,
or use the arrow keys to choose a different suggestion.
Hit <ENTER> to add your selected track to the current playlist. You can
then add another track, or hit <ENTER> again to return to the playlist
editor.
[title]USAGE[/title]
[link]playlist> add[/link]
[link] ?> PATHNAME
""")
def add(self, *parts):
self.parent.console.print(
self.console.print(
"Add tracks one at a time by title. Hit Enter to finish."
)
added = False
while True:
text = self.parent.console.prompt(
text = self.console.prompt(
[' ?'],
completer=self.manager.fuzzy_table_completer(
db.track,
@ -147,26 +172,50 @@ class _playlist(BasePrompt):
try:
track = sess.query(db.track).filter(db.track.c.relpath == text).one()
self.parent.playlist.create_entries([track])
except NoResultFound:
self.parent.console.error("No match for '{text}'")
except NoResultFound: # pragma: no cover
self.console.error("No match for '{text}'")
return
return text
def delete(self, *parts):
res = self.parent.console.prompt([
@command("""
[title]DELETING A PLAYLIST[/title]
Use the [b]delete[/b] command to delete the current playlist. You will be
prompted to type DELETE, to ensure you really mean it. If you
enter anything other than DELETE, the delete request will
be aborted.
[danger]Deleting a playlist cannot be undone![/danger]
[title]USAGE[/title]
[link]playlist> delete[/link]
Type DELETE to permanently delete the playlist.
[link]DELETE playlist> DELETE
""")
def delete(self, parts):
res = self.console.prompt([
f"[error]Type [b]DELETE[/b] to permanently delete the playlist "
f'"{self.parent.playlist.record.name}".[/error]',
f"[prompt]DELETE {self.parent.playlist.slug}[/prompt]",
])
if res != 'DELETE':
self.parent.console.error("Delete aborted. No changes have been made.")
self.console.error("Delete aborted. No changes have been made.")
return True
self.parent.playlist.delete()
self.parent.console.print("Deleted the playlist.")
self.console.print("Deleted the playlist.")
self.parent._playlist = None
return False
def start(self):
self.show()
super().start()
@command("""
[title]HELP![/title]
The [b]help[/b] command will print usage information for whatever you're currently
doing. You can also ask for help on any command currently available.
[title]USAGE[/title]
[link]> help [COMMAND][/link]
""")
def help(self, parts):
super().help(parts)

View File

@ -1,8 +0,0 @@
from .base import BasePrompt
class quit(BasePrompt):
"""Exit the interactive shell."""
def process(self, cmd, *parts):
raise SystemExit()

View File

@ -1,16 +0,0 @@
from .base import BasePrompt
from sqlalchemy import func
from rich import print
from groove import db
class stats(BasePrompt):
def process(self, cmd, *parts):
sess = self.parent.manager.session
playlists = sess.query(func.count(db.playlist.c.id)).scalar()
entries = sess.query(func.count(db.entry.c.track)).scalar()
tracks = sess.query(func.count(db.track.c.relpath)).scalar()
print(f"Database contains {playlists} playlists with a total of {entries} entries, from {tracks} known tracks.")

View File

@ -6,6 +6,8 @@ from unittest.mock import MagicMock
from groove import playlist, editor
from groove.exceptions import PlaylistValidationError, TrackNotFoundError
from yaml.scanner import ScannerError
def test_create(empty_playlist):
assert empty_playlist.record.id
@ -174,6 +176,32 @@ def test_edit(monkeypatch, edits, expected, empty_playlist):
assert empty_playlist.name == expected
@pytest.mark.parametrize('error', [TypeError, ScannerError, TrackNotFoundError])
def test_edit_errors(monkeypatch, error, empty_playlist):
monkeypatch.setattr('groove.playlist.Playlist.from_yaml', MagicMock(
side_effect=error
))
with pytest.raises(PlaylistValidationError):
empty_playlist.edit()
@pytest.mark.parametrize('error', [IOError, OSError, FileNotFoundError])
def test_edit_errors_in_editor(monkeypatch, error, empty_playlist):
monkeypatch.setattr('groove.editor.subprocess.check_call', MagicMock(
side_effect=error
))
with pytest.raises(RuntimeError):
empty_playlist.edit()
def test_edit_yaml_error_in_editor(monkeypatch, empty_playlist):
monkeypatch.setattr('groove.editor.PlaylistEditor.read', MagicMock(
side_effect=ScannerError
))
with pytest.raises(PlaylistValidationError):
empty_playlist.edit()
@pytest.mark.parametrize('slug', [None, ''])
def test_save_no_slug(slug, empty_playlist):
empty_playlist._slug = slug

View File

@ -9,7 +9,7 @@ from unittest.mock import MagicMock
def cmd_prompt(monkeypatch, in_memory_engine, db):
with database_manager() as manager:
manager._session = db
yield interactive_shell.CommandPrompt(manager)
yield interactive_shell.InteractiveShell(manager)
def response_factory(responses):
@ -35,30 +35,15 @@ def test_quit(monkeypatch, capsys, cmd_prompt, inputs, expected):
cmd_prompt.start()
def test_browse(monkeypatch, capsys, cmd_prompt):
monkeypatch.setattr('groove.console.Console.prompt', response_factory(['browse']))
def test_list(monkeypatch, capsys, cmd_prompt):
monkeypatch.setattr('groove.console.Console.prompt', response_factory(['list']))
cmd_prompt.start()
output = capsys.readouterr()
assert 'Displaying 4 playlists' in output.out
assert 'playlist one' in output.out
assert 'the first one' in output.out
assert 'playlist-one' in output.out
assert 'the second one' in output.out
assert 'the threerd one' in output.out
assert 'empty playlist' in output.out
@pytest.mark.parametrize('inputs, expected', [
(['help'], ['Available Commands', ' help ', ' stats ', ' browse ']),
(['help browse'], ['Help for browse']),
])
def test_help(monkeypatch, capsys, cmd_prompt, inputs, expected):
monkeypatch.setattr('groove.console.Console.prompt', response_factory(inputs))
@pytest.mark.parametrize('inputs', ['help', 'help list'])
def test_help(monkeypatch, capsys, cmd_prompt, inputs):
monkeypatch.setattr('groove.console.Console.prompt', response_factory([inputs]))
cmd_prompt.start()
output = capsys.readouterr()
for txt in expected:
assert txt in output.out
assert cmd_prompt.__doc__ == cmd_prompt.help_text
@pytest.mark.parametrize('inputs, expected', [
@ -74,4 +59,51 @@ def test_load(monkeypatch, caplog, cmd_prompt, inputs, expected):
def test_values(cmd_prompt):
for cmd in [cmd for cmd in cmd_prompt.commands.keys() if not cmd.startswith('_')]:
assert cmd in cmd_prompt.values
assert cmd in cmd_prompt.autocomplete_values
def test_playlist_usage(monkeypatch, cmd_prompt):
monkeypatch.setattr('groove.console.Console.prompt', response_factory([
'load new playlist',
'help'
]))
cmd_prompt.start()
def test_playliest_edit(monkeypatch, cmd_prompt):
monkeypatch.setattr('groove.console.Console.prompt', response_factory([
'load new playlist',
'edit'
]))
cmd_prompt.start()
def test_playlist_show(monkeypatch, cmd_prompt):
monkeypatch.setattr('groove.console.Console.prompt', response_factory([
'load playlist one',
'show'
]))
cmd_prompt.start()
def test_playlist_add(monkeypatch, cmd_prompt):
monkeypatch.setattr('groove.console.Console.prompt', response_factory([
'load playlist one',
'add',
'',
'add',
'UNKLE/Psyence Fiction/01 Guns Blazing (Drums of Death, Part 1).flac',
'',
]))
cmd_prompt.start()
def test_playlist_delete(monkeypatch, cmd_prompt):
monkeypatch.setattr('groove.console.Console.prompt', response_factory([
'load playlist one',
'delete',
'',
'delete',
'DELETE',
]))
cmd_prompt.start()

View File

@ -18,3 +18,4 @@ help = #999999
background = #001321
error = #FF8888
danger = #FF8888