# grooveondemand/groove/playlist.py

import logging
import os
from typing import Union, List

from sqlalchemy import func, delete
from sqlalchemy.engine.row import Row
from sqlalchemy.exc import NoResultFound, MultipleResultsFound
from sqlalchemy.orm.session import Session

from groove import db


class Playlist:
    """
    CRUD operations and convenience methods for playlists.

    A Playlist wraps one row of the playlist table, looked up by slug, together with the entry rows that
    reference it. Both the row and the entries are fetched lazily and cached on the instance.
    """

    def __init__(self,
                 slug: str,
                 session: Session,
                 name: str = '',
                 description: str = '',
                 create_ok: bool = True):
        """
        Args:
            slug (str): The slug used to look up the playlist row.
            session (Session): The database session used for every query and statement.
            name (str): The name written to the database by save().
            description (str): The description written to the database by save().
            create_ok (bool): If True, a missing playlist is created the first time its record is requested.
        """
        self._session = session
        self._slug = slug
        self._name = name
        self._description = description
        self._entries = None
        self._record = None
        self._create_ok = create_ok
        self._deleted = False

    @property
    def deleted(self) -> bool:
        """
        True once delete() has removed this playlist from the database.
        """
        return self._deleted

    @property
    def exists(self) -> bool:
        """
        True if a record is cached for this playlist, or if one can be fetched or created because create_ok is set.

        When create_ok is False and nothing has been cached yet, no query is issued and False is returned; call
        load() first to populate the cache.
        """
        if self.deleted:
            logging.debug("Playlist has been deleted.")
            return False
        if self._record:
            return True
        if self._create_ok:
            # Accessing self.record fetches (or creates) the row; coerce so callers always get a real bool.
            return bool(self.record)
        return False

    @property
    def summary(self):
        """
        A one-line description of the playlist: its id, its name, and its URL built from the HOST environment
        variable. Raises KeyError if HOST is not set.
        """
        return ' :: '.join([
            f"[ {self.record.id} ]",
            self.record.name,
            f"http://{os.environ['HOST']}/{self.slug}",
        ])

    @property
    def slug(self) -> str:
        """
        The slug identifying this playlist.
        """
        return self._slug

    @property
    def session(self) -> Session:
        """
        The database session this playlist operates on.
        """
        return self._session

    @property
    def record(self) -> Union[Row, None]:
        """
        Cache the playlist row from the database and return it. Optionally create it if it doesn't exist.
        """
        if not self._record:
            self._record = self.get_or_create()
        return self._record

    @property
    def entries(self) -> Union[List, None]:
        """
        Cache the list of entries on this playlist and return it.

        Returns None if the playlist has no record. Rows combine the columns of the entry and track tables and are
        ordered by the entry's track number.
        """
        # Compare against None so that an empty playlist is cached too instead of being re-queried on every access.
        if self.record and self._entries is None:
            query = self.session.query(
                db.entry,
                db.track
            ).filter(
                (db.playlist.c.id == self.record.id)
            ).filter(
                db.entry.c.playlist_id == db.playlist.c.id
            ).filter(
                db.entry.c.track_id == db.track.c.id
            ).order_by(
                db.entry.c.track
            )
            self._entries = query.all()
        return self._entries

    @property
    def as_dict(self) -> dict:
        """
        Return a dictionary of the playlist and its entries, or an empty dictionary if the playlist doesn't exist.
        """
        if not self.exists:
            return {}
        playlist = dict(self.record)
        playlist['entries'] = [dict(entry) for entry in self.entries]
        return playlist

    @property
    def as_string(self) -> str:
        """
        Return the summary line followed by one line per entry, or an empty string if the playlist doesn't exist.
        """
        if not self.exists:
            return ''
        lines = [self.summary]
        lines.extend(f" - {entry.track} {entry.artist} - {entry.title}" for entry in self.entries)
        return '\n'.join(lines) + '\n'

    def _get_tracks_by_path(self, paths: List[str]) -> List:
        """
        Retrieve tracks from the database that match the specified path fragments. The exceptions NoResultFound and
        MultipleResultsFound are expected in the case of no matches and multiple matches, respectively.
        """
        return [self.session.query(db.track).filter(db.track.c.relpath.ilike(f"%{path}%")).one() for path in paths]

    def add(self, paths: List[str]) -> int:
        """
        Add entries to the playlist. Each path should match one and only one track in the database (case-insensitive).
        If a path doesn't match any track, or if a path matches multiple tracks, nothing is added to the playlist.

        Args:
            paths (list): A list of partial paths to add.

        Returns:
            int: The number of tracks added.
        """
        logging.debug(f"Attempting to add tracks matching: {paths}")
        try:
            return self.create_entries(self._get_tracks_by_path(paths))
        except NoResultFound:
            logging.error("One or more of the specified paths do not match any tracks in the database.")
            return 0
        except MultipleResultsFound:
            logging.error("One or more of the specified paths matches multiple tracks in the database.")
            return 0

    def delete(self) -> Union[int, None]:
        """
        Delete a playlist and its entries from the database, then clear the cached values.

        Returns:
            int: The id of the deleted playlist, or None if there was no record to delete.
        """
        if not self.record:
            return None
        plid = self.record.id
        stmt = delete(db.entry).where(db.entry.c.playlist_id == plid)
        logging.debug(f"Deleting entries associated with playlist {plid}: {stmt}")
        self.session.execute(stmt)
        stmt = delete(db.playlist).where(db.playlist.c.id == plid)
        logging.debug(f"Deleting playlist {plid}: {stmt}")
        self.session.execute(stmt)
        self.session.commit()
        self._record = None
        self._entries = None
        self._deleted = True
        return plid

    def get_or_create(self, create_ok: bool = False) -> Union[Row, None]:
        """
        Look up the playlist row by slug, creating it if it is missing and creation is permitted.

        Args:
            create_ok (bool): Permit creation for this call even if the instance was built with create_ok=False.
                Passing False does not override an instance built with create_ok=True.

        Returns:
            Row: The playlist row, or None if it doesn't exist and creation is not permitted.

        Raises:
            RuntimeError: If the row is missing and this object has already been deleted.
        """
        try:
            return self.session.query(db.playlist).filter(db.playlist.c.slug == self.slug).one()
        except NoResultFound:
            logging.debug(f"Could not find a playlist with slug {self.slug}.")
        if self.deleted:
            raise RuntimeError("Object has been deleted.")
        if self._create_ok or create_ok:
            return self.save()
        return None

    def load(self):
        """
        Fetch the playlist row (creating it only if the instance permits) and cache it.

        Returns:
            Playlist: This instance, so calls can be chained.
        """
        # Keep the result: without it, an instance built with create_ok=False would never report that it exists.
        self._record = self.get_or_create(create_ok=False)
        return self

    def save(self) -> Row:
        """
        Write the slug, name and description to the database and return the stored row.

        If a record is already cached, only that row is updated; otherwise a new row is inserted. The stored row is
        re-read after the commit and cached.
        """
        values = {'slug': self.slug, 'name': self._name, 'description': self._description}
        if self._record:
            # Restrict the UPDATE to this playlist's row; an unrestricted UPDATE would rewrite every playlist.
            plid = self._record.id
            self.session.execute(db.playlist.update().where(db.playlist.c.id == plid).values(**values))
        else:
            results = self.session.execute(db.playlist.insert().values(**values))
            plid = results.inserted_primary_key[0]
        self.session.commit()
        logging.debug(f"Saved playlist {plid} with slug {self.slug}")
        self._record = self.session.query(db.playlist).filter(db.playlist.c.id == plid).one()
        return self._record

    def create_entries(self, tracks: List[Row]) -> int:
        """
        Append a list of tracks to a playlist by populating the entries table with records referencing the playlist and
        the specified tracks.

        Args:
            tracks (list): A list of Row objects from the track table.

        Returns:
            int: The number of tracks added.
        """
        if not tracks:
            # Nothing to insert; skip the database round trip entirely.
            return 0
        plid = self.record.id
        # New entries are numbered after the highest existing track number, starting at 1 for an empty playlist.
        maxtrack = self.session.query(func.max(db.entry.c.track)).filter(
            db.entry.c.playlist_id == plid
        ).one()[0] or 0
        self.session.execute(
            db.entry.insert(),
            [
                {'playlist_id': plid, 'track_id': obj.id, 'track': idx}
                for (idx, obj) in enumerate(tracks, start=maxtrack+1)
            ]
        )
        self.session.commit()
        # Invalidate the cache so the next access to entries re-reads the playlist.
        self._entries = None
        return len(tracks)

    @classmethod
    def from_row(cls, row, session):
        """
        Build a playlist around a row that has already been fetched from the playlist table.

        The row's name and description, when present, are copied onto the instance so that a later save() does not
        overwrite them with empty strings.
        """
        pl = cls(
            slug=row.slug,
            session=session,
            name=getattr(row, 'name', '') or '',
            description=getattr(row, 'description', '') or '',
        )
        pl._record = row
        return pl

    def __repr__(self):
        return self.as_string