rewrite in pyramid

This commit is contained in:
evilchili 2024-01-30 01:25:02 -08:00
parent 8f17ddfb05
commit 5faf5c97c1
7 changed files with 98 additions and 129 deletions

View File

@ -1,4 +0,0 @@
from .manager import db, session
__ALL__ = [db, session]

View File

@ -1,17 +1,20 @@
import logging import logging
import transaction
from ttfrog.db import db, session from ttfrog.db.manager import db
from ttfrog.db import schema
from sqlalchemy.exc import IntegrityError
# move this to json or whatever # move this to json or whatever
data = { data = {
'ancestry': [ 'Ancestry': [
{'name': 'human'}, {'id': 1, 'name': 'human'},
{'name': 'dragonborn'}, {'id': 2, 'name': 'dragonborn'},
{'name': 'tiefling'}, {'id': 3, 'name': 'tiefling'},
], ],
'character': [ 'Character': [
{'name': 'Sabetha', 'ancestry_name': 'tiefling', 'level': 10, 'str': 10, 'dex': 10, 'con': 10, 'int': 10, 'wis': 10, 'cha': 10}, {'id': 1, 'name': 'Sabetha', 'ancestry': 'tiefling', 'level': 10, 'str': 10, 'dex': 10, 'con': 10, 'int': 10, 'wis': 10, 'cha': 10},
] ]
} }
@ -20,23 +23,21 @@ def bootstrap():
""" """
Initialize the database with source data. Idempotent; will skip anything that already exists. Initialize the database with source data. Idempotent; will skip anything that already exists.
""" """
db.init_model() db.init()
for table_name, table in db.tables.items(): for table, records in data.items():
if table_name not in data: model = getattr(schema, table)
logging.debug("No bootstrap data for table {table_name}; skipping.")
continue
for rec in data[table_name]:
stmt = table.insert().values(**rec).prefix_with("OR IGNORE")
result, error = db.execute(stmt)
if error:
raise RuntimeError(error)
rec['id'] = result.inserted_primary_key[0] for rec in records:
if rec['id'] == 0: with transaction.manager as tx:
logging.info(f"Skipped existing {table_name} {rec}") obj = model(**rec)
continue db.session.add(obj)
obj.slug = db.slugify(rec)
if 'slug' in table.columns: try:
rec['slug'] = db.slugify(rec) tx.commit()
db.update(table, **rec) except IntegrityError as e:
logging.info(f"Created {table_name} {rec}") tx.abort()
if 'UNIQUE constraint failed' in str(e):
logging.info(f"Skipping existing {table} {rec}")
continue
raise
logging.info(f"Created {table} {rec}")

View File

@ -1,15 +1,21 @@
import transaction
import base64 import base64
import hashlib import hashlib
import logging import logging
from functools import cached_property from functools import cached_property
from pyramid_sqlalchemy import Session
from pyramid_sqlalchemy import init_sqlalchemy
from pyramid_sqlalchemy import metadata as _metadata
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from ttfrog.path import database from ttfrog.path import database
from ttfrog.db.schema import metadata import ttfrog.db.schema
ttfrog.db.schema
class SQLDatabaseManager: class SQLDatabaseManager:
@ -22,30 +28,34 @@ class SQLDatabaseManager:
@cached_property @cached_property
def engine(self): def engine(self):
return create_engine(self.url, future=True) return create_engine(self.url)
@cached_property @cached_property
def DBSession(self): def session(self):
maker = sessionmaker(bind=self.engine, future=True, autoflush=True) return Session
return scoped_session(maker)
@cached_property
def metadata(self):
return _metadata
@cached_property @cached_property
def tables(self): def tables(self):
return dict((t.name, t) for t in metadata.sorted_tables) return dict((t.name, t) for t in self.metadata.sorted_tables)
def query(self, *args, **kwargs): def query(self, *args, **kwargs):
return self.DBSession.query(*args, **kwargs) return self.session.query(*args, **kwargs)
def execute(self, statement) -> tuple: def execute(self, statement) -> tuple:
logging.debug(statement) logging.info(statement)
result = None result = None
error = None error = None
try: try:
result = self.DBSession.execute(statement) with transaction.manager as tx:
self.DBSession.commit() result = self.session.execute(statement)
tx.commit()
except IntegrityError as exc: except IntegrityError as exc:
logging.error(exc) logging.error(exc)
error = "An error occurred when saving changes." error = "I AM ERROR."
return result, error return result, error
def insert(self, table, **kwargs) -> tuple: def insert(self, table, **kwargs) -> tuple:
@ -57,10 +67,6 @@ class SQLDatabaseManager:
stmt = table.update().values(**kwargs).where(table.columns.id == primary_key) stmt = table.update().values(**kwargs).where(table.columns.id == primary_key)
return self.execute(stmt) return self.execute(stmt)
def init_model(self, engine=None):
metadata.create_all(bind=engine or self.engine)
return self.DBSession
def slugify(self, rec: dict) -> str: def slugify(self, rec: dict) -> str:
""" """
Create a uniquish slug from a dictionary. Create a uniquish slug from a dictionary.
@ -68,6 +74,9 @@ class SQLDatabaseManager:
sha1bytes = hashlib.sha1(str(rec['id']).encode()) sha1bytes = hashlib.sha1(str(rec['id']).encode())
return base64.urlsafe_b64encode(sha1bytes.digest()).decode("ascii")[:10] return base64.urlsafe_b64encode(sha1bytes.digest()).decode("ascii")[:10]
def init(self):
init_sqlalchemy(self.engine)
self.metadata.create_all(self.engine)
def __getattr__(self, name: str): def __getattr__(self, name: str):
try: try:
@ -75,14 +84,5 @@ class SQLDatabaseManager:
except KeyError: except KeyError:
raise AttributeError(f"{self} does not contain the attribute '{name}'.") raise AttributeError(f"{self} does not contain the attribute '{name}'.")
def __enter__(self):
self.init_model(self.engine)
return self
def __exit__(self, exc_type, exc_value, traceback):
if self.DBSession:
self.DBSession.close()
db = SQLDatabaseManager() db = SQLDatabaseManager()
session = db.DBSession

View File

@ -9,29 +9,27 @@ from sqlalchemy import CheckConstraint
# from sqlalchemy import PrimaryKeyConstraint # from sqlalchemy import PrimaryKeyConstraint
# from sqlalchemy import DateTime # from sqlalchemy import DateTime
metadata = MetaData() from pyramid_sqlalchemy import BaseObject
Ancestry = Table( class Ancestry(BaseObject):
"ancestry", __tablename__ = "ancestry"
metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("name", String, index=True, unique=True),
Column("slug", String, index=True, unique=True),
Column("description", UnicodeText),
)
Character = Table( id = Column(Integer, primary_key=True, autoincrement=True)
"character", name = Column(String, index=True, unique=True)
metadata, slug = Column(String, index=True, unique=True)
Column("id", Integer, primary_key=True, autoincrement=True),
Column("slug", String, index=True, unique=True),
Column("ancestry_name", Integer, ForeignKey("ancestry.name")), class Character(BaseObject):
Column("name", String), __tablename__ = "character"
Column("level", Integer, CheckConstraint('level > 0 AND level <= 20')),
Column("str", Integer, CheckConstraint('str >=0')), id = Column(Integer, primary_key=True, autoincrement=True)
Column("dex", Integer, CheckConstraint('dex >=0')), slug = Column(String, index=True, unique=True)
Column("con", Integer, CheckConstraint('con >=0')), ancestry = Column(String, ForeignKey("ancestry.name"))
Column("int", Integer, CheckConstraint('int >=0')), name = Column(String)
Column("wis", Integer, CheckConstraint('wis >=0')), level = Column(Integer, CheckConstraint('level > 0 AND level <= 20'))
Column("cha", Integer, CheckConstraint('cha >=0')), str = Column(Integer, CheckConstraint('str >=0'))
) dex = Column(Integer, CheckConstraint('dex >=0'))
con = Column(Integer, CheckConstraint('con >=0'))
int = Column(Integer, CheckConstraint('int >=0'))
wis = Column(Integer, CheckConstraint('wis >=0'))
cha = Column(Integer, CheckConstraint('cha >=0'))

View File

@ -1,62 +1,24 @@
import logging import logging
from tg import MinimalApplicationConfigurator
from tg.configurator.components.statics import StaticsConfigurationComponent
from tg.configurator.components.sqlalchemy import SQLAlchemyConfigurationComponent
from tg.util.bunch import Bunch
from wsgiref.simple_server import make_server from wsgiref.simple_server import make_server
import webhelpers2 from pyramid.config import Configurator
import tw2.core
from ttfrog.webserver.controllers.root import RootController from ttfrog.db.manager import db
from ttfrog.db import db from ttfrog.webserver.routes import routes
import ttfrog.path
def app_globals(): def configuration():
return Bunch config = Configurator(settings={
def application():
"""
Create a TurboGears2 application
"""
config = MinimalApplicationConfigurator()
config.register(StaticsConfigurationComponent)
config.register(SQLAlchemyConfigurationComponent)
config.update_blueprint({
# rendering
'root_controller': RootController(),
'default_renderer': 'jinja',
'renderers': ['jinja'],
'tg.jinja_filters': {},
'auto_reload_templates': True,
# helpers
'app_globals': app_globals,
'helpers': webhelpers2,
'use_toscawidgets2': True,
# assets
'serve_static': True,
'paths': {
'static_files': ttfrog.path.static_files(),
'templates': [ttfrog.path.templates()],
},
# db
'use_sqlalchemy': True,
'sqlalchemy.url': db.url, 'sqlalchemy.url': db.url,
'model': db,
}) })
config.include('pyramid_tm')
# wrap the core wsgi app in a ToscaWidgets2 app config.include('pyramid_sqlalchemy')
return tw2.core.make_middleware(config.make_wsgi_app(), default_engine='jinja') return config
def start(host: str, port: int, debug: bool = False) -> None: def start(host: str, port: int, debug: bool = False) -> None:
logging.debug(f"Configuring webserver with {host=}, {port=}, {debug=}") logging.debug(f"Configuring webserver with {host=}, {port=}, {debug=}")
make_server(host, int(port), application()).serve_forever() config = configuration()
config.include(routes)
config.scan('ttfrog.webserver.views')
make_server(host, int(port), config.make_wsgi_app()).serve_forever()

View File

@ -0,0 +1,2 @@
def routes(config):
config.add_route('index', '/')

10
ttfrog/webserver/views.py Normal file
View File

@ -0,0 +1,10 @@
from pyramid.response import Response
from pyramid.view import view_config
from ttfrog.db.manager import db
from ttfrog.db.schema import Ancestry
@view_config(route_name='index')
def index(request):
ancestries = [a.name for a in db.session.query(Ancestry).all()]
return Response(','.join(ancestries))