tabletop-frog/src/ttfrog/db/manager.py

92 lines
2.4 KiB
Python

import base64
import hashlib
import json
import os
from contextlib import contextmanager
from functools import cached_property
import transaction
from pyramid_sqlalchemy import Session, init_sqlalchemy
from pyramid_sqlalchemy import metadata as _metadata
from sqlalchemy import create_engine
import ttfrog.db.schema
from ttfrog.path import database
assert ttfrog.db.schema
class AlchemyEncoder(json.JSONEncoder):
def default(self, obj):
try:
return getattr(obj, "__json__")()
except (AttributeError, NotImplementedError): # pragma: no cover
return super().default(obj)
class SQLDatabaseManager:
"""
A context manager for working with sqllite database.
"""
@cached_property
def url(self):
return os.environ.get("DATABASE_URL", f"sqlite:///{database()}")
@cached_property
def engine(self):
return create_engine(self.url)
@cached_property
def session(self):
return Session
@cached_property
def metadata(self):
return _metadata
@cached_property
def tables(self):
return dict((t.name, t) for t in self.metadata.sorted_tables)
@contextmanager
def transaction(self):
with transaction.manager as tm:
yield tm
try:
tm.commit()
except Exception: # pragam: no cover
tm.abort()
raise
def add_or_update(self, *args, **kwargs):
self.session.add(*args, **kwargs)
self.session.flush()
def query(self, *args, **kwargs):
return self.session.query(*args, **kwargs)
def slugify(self, rec: dict) -> str:
"""
Create a uniquish slug from a dictionary.
"""
sha1bytes = hashlib.sha1(str(rec["id"]).encode())
return base64.urlsafe_b64encode(sha1bytes.digest()).decode("ascii")[:10]
def init(self):
init_sqlalchemy(self.engine)
self.metadata.create_all(self.engine)
def dump(self, names: list = []):
results = {}
for table_name, table in self.tables.items():
if not names or table_name in names:
results[table_name] = [dict(row._mapping) for row in self.query(table).all()]
return json.dumps(results, indent=2, cls=AlchemyEncoder)
def __getattr__(self, name: str):
return self.query(getattr(ttfrog.db.schema, name))
db = SQLDatabaseManager()