From 5f8ab8fe25803193511a0f42f703154c03dcd650 Mon Sep 17 00:00:00 2001 From: evilchili Date: Thu, 24 Nov 2022 13:45:09 -0800 Subject: [PATCH] WIP adding CRUD operations for playlists --- groove/cli.py | 70 ++++++++++++++++++++++++++- groove/helper.py | 49 ------------------- groove/playlist.py | 104 +++++++++++++++++++++++++++++++++++++++++ groove/webserver.py | 12 +++-- pyproject.toml | 1 + test/test_playlists.py | 12 +++++ 6 files changed, 193 insertions(+), 55 deletions(-) delete mode 100644 groove/helper.py create mode 100644 groove/playlist.py create mode 100644 test/test_playlists.py diff --git a/groove/cli.py b/groove/cli.py index bb7c4f4..5c5e784 100644 --- a/groove/cli.py +++ b/groove/cli.py @@ -3,15 +3,20 @@ import os import typer from pathlib import Path -from typing import Optional +from typing import Optional, List from dotenv import load_dotenv +from slugify import slugify +from pprint import pprint from groove import webserver +from groove.playlist import Playlist from groove.db.manager import database_manager from groove.db.scanner import media_scanner +playlist_app = typer.Typer() app = typer.Typer() +app.add_typer(playlist_app, name='playlist', help='Manage playlists.') def initialize(): @@ -21,6 +26,68 @@ def initialize(): level=logging.DEBUG if debug else logging.INFO) +@playlist_app.command() +def delete( + name: str = typer.Argument( + ..., + help="The name of the playlist to create." + ), + no_dry_run: bool = typer.Option( + False, + help="If True, actually delete the playlist, Otherwise, show what would be deleted." + ) +): + """ + Delete a playlist + """ + initialize() + with database_manager() as manager: + pl = Playlist(slug=slugify(name), connection=manager.session, create_if_not_exists=False) + if not pl.exists: + logging.info(f"No playlist named '{name}' could be found.") + return + + if no_dry_run is False: + print(f"Would delete playlist {pl.record.id}, which contains {len(pl.entries)} tracks.") + return + deleted_playlist = pl.delete() + print(f"Playlist {deleted_playlist} deleted.") + + +@playlist_app.command() +def add( + name: str = typer.Argument( + ..., + help="The name of the playlist to create." + ), + tracks: List[str] = typer.Option( + None, + help="A list of tracks to add to the playlist." + ), + exists_ok: bool = typer.Option( + True, + help="If True, it is okay if the playlist already exists." + ), + multiples_ok: bool = typer.Option( + False, + help="If True, the same track can be added to the playlist multiple times." + ) +): + """ + Create a new playlist with the specified name, unless it already exists. + """ + initialize() + with database_manager() as manager: + pl = Playlist(slug=slugify(name), connection=manager.session, create_if_not_exists=True) + if pl.exists: + if not exists_ok: + raise RuntimeError(f"Playlist with slug {pl.slug} already exists!") + logging.debug(pl.as_dict) + if tracks: + pl.add(tracks) + pprint(pl.as_dict) + + @app.command() def scan( root: Optional[Path] = typer.Option( @@ -41,6 +108,7 @@ def scan( count = scanner.scan() logging.info(f"Imported {count} new tracks from {root}.") + @app.command() def server( host: str = typer.Argument( diff --git a/groove/helper.py b/groove/helper.py deleted file mode 100644 index bc67023..0000000 --- a/groove/helper.py +++ /dev/null @@ -1,49 +0,0 @@ -import json -import logging - -from bottle import HTTPResponse - -from sqlalchemy import bindparam - -from groove import db - - -class PlaylistDatabaseHelper: - """ - Convenience class for database interactions. - """ - def __init__(self, connection): - self._conn = connection - - @property - def conn(self): - return self._conn - - def playlist(self, slug: str) -> dict: - """ - Retrieve a playlist and its entries by its slug. - """ - playlist = {} - - query = db.playlist.select(db.playlist.c.slug == bindparam('slug')) - logging.debug(f"playlist: '{slug}' requested. Query: {query}") - results = self.conn.execute(str(query), {'slug': slug}).fetchone() - if not results: - return playlist - - playlist = results - query = db.entry.select(db.entry.c.playlist_id == bindparam('playlist_id')) - logging.debug(f"Retrieving playlist entries. Query: {query}") - entries = self.conn.execute(str(query), {'playlist_id': playlist['id']}).fetchall() - - playlist = dict(playlist) - playlist['entries'] = [dict(entry) for entry in entries] - return playlist - - def json_response(self, playlist: dict, status: int = 200) -> HTTPResponse: - """ - Create an application/json HTTPResponse object out of a playlist and its entries. - """ - response = json.dumps(playlist) - logging.debug(response) - return HTTPResponse(status=status, content_type='application/json', body=response) diff --git a/groove/playlist.py b/groove/playlist.py new file mode 100644 index 0000000..6d62bdc --- /dev/null +++ b/groove/playlist.py @@ -0,0 +1,104 @@ +from groove import db +from sqlalchemy import func, delete +from sqlalchemy.exc import NoResultFound +import logging + + +class Playlist: + """ + CRUD operations and convenience methods for playlists. + """ + def __init__(self, slug, connection, create_if_not_exists=False): + self._conn = connection + self._slug = slug + self._record = None + self._entries = None + self._create_if_not_exists = create_if_not_exists + + @property + def exists(self): + return self.record is not None + + @property + def slug(self): + return self._slug + + @property + def conn(self): + return self._conn + + @property + def record(self): + if not self._record: + try: + self._record = self.conn.query(db.playlist).filter(db.playlist.c.slug == self.slug).one() + logging.debug(f"Retrieved playlist {self._record.id}") + except NoResultFound: + pass + if self._create_if_not_exists: + self._record = self._create() + if not self._record: + raise RuntimeError(f"Tried to create a playlist but couldn't read it back using slug {self.slug}") + return self._record + + @property + def entries(self): + if not self._entries: + self._entries = self.conn.query( + db.entry, + db.track + ).filter( + (db.playlist.c.id == self.record.id) + ).filter( + db.entry.c.playlist_id == db.playlist.c.id + ).filter( + db.entry.c.track_id == db.track.c.id + ).all() + return self._entries + + @property + def as_dict(self) -> dict: + """ + Retrieve a playlist and its entries by its slug. + """ + playlist = {} + playlist = dict(self.record) + playlist['entries'] = [dict(entry) for entry in self.entries] + return playlist + + def add(self, paths) -> int: + return self._create_entries(self._get_tracks_by_path(paths)) + + def delete(self): + plid = self.record.id + stmt = delete(db.entry).where(db.entry.c.playlist_id == plid) + logging.debug(f"Deleting entries associated with playlist {plid}: {stmt}") + self.conn.execute(stmt) + stmt = delete(db.playlist).where(db.playlist.c.id == plid) + logging.debug(f"Deleting playlist {plid}: {stmt}") + self.conn.execute(stmt) + self.conn.commit() + return plid + + def _get_tracks_by_path(self, paths): + return [self.conn.query(db.track).filter(db.track.c.relpath.ilike(f"%{path}%")).one() for path in paths] + + def _create_entries(self, tracks): + + maxtrack = self.conn.query(func.max(db.entry.c.track)).filter_by(playlist_id=self.record.id).one()[0] + self.conn.execute( + db.entry.insert(), + [ + {'playlist_id': self.record.id, 'track_id': obj.id, 'track': idx} + for (idx, obj) in enumerate(tracks, start=maxtrack+1) + ] + ) + self.conn.commit() + return len(tracks) + + def _create(self): + stmt = db.playlist.insert({'slug': self.slug}) + results = self.conn.execute(stmt) + self.conn.commit() + logging.debug(f"Created new playlist {results.inserted_primary_key} with slug {self.slug}") + return self.conn.query(db.playlist).filter(db.playlist.c.id == results.inserted_primary_key).one() diff --git a/groove/webserver.py b/groove/webserver.py index b6032ae..3867e27 100644 --- a/groove/webserver.py +++ b/groove/webserver.py @@ -1,3 +1,4 @@ +import json import logging import os @@ -6,7 +7,7 @@ from bottle import HTTPResponse from bottle.ext import sqlite from groove.auth import is_authenticated -from groove.helper import PlaylistDatabaseHelper +from groove.playlist import Playlist server = bottle.Bottle() @@ -44,8 +45,9 @@ def get_playlist(slug, db): Retrieve a playlist and its entries by a slug. """ logging.debug(f"Looking up playlist: {slug}...") - pldb = PlaylistDatabaseHelper(connection=db) - playlist = pldb.playlist(slug) - if not playlist: + playlist = Playlist(slug=slug, conn=db) + if not playlist.exists: return HTTPResponse(status=404, body="Not found") - return pldb.json_response(playlist) + response = json.dumps(playlist.as_dict) + logging.debug(response) + return HTTPResponse(status=200, content_type='application/json', body=response) diff --git a/pyproject.toml b/pyproject.toml index b9cdcbb..ed6ce0f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ python-dotenv = "^0.21.0" Paste = "^3.5.2" bottle-sqlite = "^0.2.0" SQLAlchemy = "^1.4.44" +python-slugify = "^7.0.0" [tool.poetry.dev-dependencies] pytest = "^7.2.0" diff --git a/test/test_playlists.py b/test/test_playlists.py new file mode 100644 index 0000000..3b047bb --- /dev/null +++ b/test/test_playlists.py @@ -0,0 +1,12 @@ + + +def test_create_playlist(): + pass + + +def test_update_playlist(): + pass + + +def delete_playlist(): + pass