This commit is contained in:
evilchili 2025-10-07 01:18:36 -07:00
parent 582fd2d9a1
commit 6224d5fea4
5 changed files with 138 additions and 136 deletions

View File

@ -7,6 +7,7 @@ from dotenv import dotenv_values
from flask import Flask
from flask_session import Session
from grung.db import GrungDB
from tinydb import where
from tinydb.storages import MemoryStorage
from ttfrog import schema
@ -112,50 +113,27 @@ VIEW_URI=/
if not self._initialized:
raise ApplicationNotInitializedError("This action requires the application to be initialized.")
def add_member(self, parent: schema.Page, child: schema.Page):
parent.members.append(self.db.save(child))
parent = self.db.save(parent)
return parent.get_child(child)
def bootstrap(self):
def authenticate(self, username: str, password: str) -> schema.User:
"""
Bootstrap the database entries by populating the first Page, the Admin user and the Admins group.
Returns the User record matching the given username and password
"""
self.check_state()
if not (username and password):
self.web.logger.debug("Need both username and password to login")
return None
# create the top-level pages
root = self.db.save(schema.Page(name=self.config.VIEW_URI, title="Home", body="This is the home page"))
users = self.add_member(root, schema.Page(name="User", title="Users", body="users go here."))
groups = self.add_member(root, schema.Page(name="Group", title="Groups", body="groups go here."))
npcs = self.add_member(root, schema.Page(name="NPC", title="NPCS!", body="NPCS!"))
user = self.db.User.get(where("name") == username)
if not user:
self.web.logger.debug(f"No user matching {username}")
return None
# create the NPCs
sabetha = self.db.save(schema.NPC(name="Sabetha", body=""))
johns = self.db.save(schema.NPC(name="John", body=""))
self.add_member(npcs, sabetha)
self.add_member(npcs, johns)
if not user.check_credentials(username, password):
self.web.logger.debug(f"Invalid credentials for {username}")
return None
guest = self.add_member(users, schema.User(name="guest"))
return user
# create the admin user and admins group
admin = self.add_member(
users, schema.User(name=self.config.ADMIN_USERNAME, password="fnord", email=self.config.ADMIN_EMAIL)
)
admins = self.db.save(schema.Group(name="administrators"))
admins = self.add_member(groups, admins)
admin = self.add_member(admins, admin)
groups.set_permissions(
admins,
permissions=[schema.Permissions.READ, schema.Permissions.WRITE, schema.Permissions.DELETE],
db=self.db,
)
users.set_permissions(
admins,
permissions=[schema.Permissions.READ, schema.Permissions.WRITE, schema.Permissions.DELETE],
db=self.db,
)
def authorize(self, user, record, requested):
return user.has_permission(record, requested)
sys.modules[__name__] = ApplicationContext()

View File

@ -9,6 +9,7 @@ from rich import print
from rich.logging import RichHandler
import ttfrog.app
from ttfrog.bootstrap import bootstrap
main_app = typer.Typer()
@ -52,7 +53,7 @@ def init(context: typer.Context, drop: bool = typer.Option(False, help="Drop tab
ttfrog.app.db.drop_tables()
ttfrog.app.db.close()
ttfrog.app.initialize(force=True)
ttfrog.app.bootstrap()
bootstrap()
print(ttfrog.app.db.Page.all())
print(ttfrog.app.db)

View File

@ -1,10 +1,11 @@
from __future__ import annotations
from datetime import datetime
from functools import cache
from typing import List
from grung.types import BackReference, Collection, DateTime, Field, Password, Pointer, Record, Timestamp
from tinydb import Query
from tinydb import where
class Page(Record):
@ -14,6 +15,7 @@ class Page(Record):
@classmethod
def fields(cls):
# fmt: off
return [
*super().fields(), # Pick up the UID and whatever other non-optional fields exist
Field("uri", unique=True), # The URI for the page, relative to the app's VIEW_URI
@ -25,8 +27,9 @@ class Page(Record):
Pointer("author", value_type=User), # The last user to touch the page.
DateTime("created"), # When the page was created
Timestamp("last_modified"), # The last time the page was modified.
Collection("acl", Permissions),
Collection("acl", Permissions), # The access control list
]
# fmt: on
def before_insert(self, db):
"""
@ -62,62 +65,90 @@ class Page(Record):
obj.uri = f"{self.uri}/{obj.name}"
child = db.save(obj)
def add_member(self, child: Record):
from ttfrog import app
app.check_state()
self.members = list(set(self.members + [app.db.save(child)]))
app.db.save(self)
return self.get_child(child)
def get_child(self, obj: Record):
for page in self.members:
if page.uid == obj.uid:
return page
return None
def set_permissions(self, entity: Entity, permissions: List, db) -> Record:
perms = db.save(Permissions(entity=entity, grants="".join(permissions)))
self.acl = [entry for entry in self.acl if entry.entity != entity] + [perms]
self = db.save(self)
def set_permissions(self, entity: Entity, permissions: List) -> Record:
from ttfrog import app
app.check_state()
perms = app.db.save(Permissions(entity=entity, grants="".join(permissions)))
self.acl = list(set(self.acl + [perms]))
app.db.save(self)
return perms
@cache
def get_acl_for_entity(self, entity) -> list:
"""
Search upward through the page hierarchy looking for one with an ACL that either
has a grant for the entity we care about, or at least one group in which the entity is a member.
"""
from ttfrog import app
app.check_state()
def find_acl(obj):
if hasattr(obj, "acl"):
# examine each entry in the ACL and see if one refers to the entity we care about
group_grants = []
for entry in obj.acl:
if type(entry) == str:
entry = app.db.Permissions.get(where("uid") == entry.split("::")[1], recurse=False)
# grants specific to the entity always take precedence
if entry.entity.uid == entity.uid:
return [entry]
# keep track of grants to groups containing the entity
elif entity.reference in getattr(entry.entity, "members", []):
group_grants.append(entry)
# if we found group grants, return them
if group_grants:
return group_grants
# no ACL on this object, so check its parent, if there is one
if not hasattr(obj, "parent"):
return []
return find_acl(obj.parent)
return find_acl(self)
class Entity(Page):
def has_permission(self, record: Record, requested: str, db) -> bool | None:
# Find a non-empty ACL to use by starting with the requested reecord and traversing
# the hierarchy upwards. If we get to the root and there's no ACL anywhere, default
# to READ permissions.
def find_acl(obj):
if hasattr(obj, 'acl') and obj.acl:
return obj.acl
if not hasattr(obj, "parent"):
return None
return find_acl(obj.parent)
acl = find_acl(record)
if not acl:
return requested == Permissions.READ
# Use the grant specific to this entity, if there is one
for entry in record.acl:
if entry.entity.uid == self.uid:
return requested in entry.grants
# Check for grants for each of the entity's groups, if any
for group in db.Group.search(Query()["members"].any([self.reference])):
if group.has_permission(record, requested, db):
def has_permission(self, record: Record, requested: str) -> bool | None:
for acl in record.get_acl_for_entity(self):
if requested in acl.grants:
return True
return False
def can_read(self, record: Record, db):
return self.has_permission(record, Permissions.READ, db)
def can_read(self, record: Record):
return self.has_permission(record, Permissions.READ)
def can_write(self, record: Record, db):
return self.has_permission(record, Permissions.WRITE, db)
def can_write(self, record: Record):
return self.has_permission(record, Permissions.WRITE)
def can_delete(self, record: Record, db):
return self.has_permission(record, Permissions.DELETE, db)
def can_delete(self, record: Record):
return self.has_permission(record, Permissions.DELETE)
class User(Entity):
"""
A website user, editable as a wiki page.
"""
def check_credentials(self, username: str, password: str) -> bool:
return username == self.name and self._metadata.fields["password"].compare(password, self.password)

View File

@ -33,22 +33,16 @@ def get_page(path: str = "", table: str = "Page", create_okay: bool = False):
return None
page = app.db.table(table).get(where("uri") == uri, recurse=False)
if hasattr(page, "acl"):
acl = []
for pointer in page.acl:
table, uid = pointer.split("::")
acl += app.db.table(table).search(where("uid") == uid, recurse=False)
page.acl = acl
if not page:
if not create_okay:
return None
parent = get_parent(table, uri)
if not g.user.can_read(parent, app.db):
if not app.authorize(g.user, parent, schema.Permissions.READ):
return None
return getattr(schema, table)(name=uri.split("/")[-1], body="This page does not exist", parent=parent)
if not g.user.can_read(page, app.db):
if not app.authorize(g.user, page, schema.Permissions.READ):
return None
if hasattr(page, "members"):
@ -89,37 +83,18 @@ def index():
return rendered(get_page(create_okay=False))
def do_login(username: str, password: str) -> bool:
"""
Update the session with the user record if the credenteials are valid.
"""
if not (username and password):
app.web.logger.debug("Need both username and password to login")
return False
user = app.db.User.get(where("name") == username)
if not user:
app.web.logger.debug(f"No user matching {username}")
return False
if not user.check_credentials(username, password):
app.web.logger.debug(f"Invalid credentials for {username}")
return False
app.web.logger.debug(f"Session for {user.name} ({user.doc_id}) started.")
g.user = user
session["user_id"] = g.user.doc_id
session["user"] = dict(g.user.serialize())
return True
@app.web.route("/login", methods=["GET", "POST"])
def login():
app.web.session_interface.regenerate(session)
if request.method == "POST":
username = request.form.get("username")
password = request.form.get("password")
if do_login(username, password):
user = app.authenticate(username, password)
if user:
g.user = user
session["user_id"] = g.user.doc_id
session["user"] = dict(g.user.serialize())
app.web.logger.debug(f"Session for {user.name} ({user.doc_id}) started.")
return redirect(url_for("index"))
g.messages.append(f"Invalid login for {username}")
return rendered(schema.Page(name="Login", title="Please enter your login details"), "login.html")
@ -136,7 +111,6 @@ def logout():
@app.web.route(f"{app.config.VIEW_URI}/<path:path>", methods=["GET"], defaults={"table": "Page"})
def view(table, path):
parent = get_parent(table, relative_uri())
print(parent)
return rendered(get_page(request.path, table=table, create_okay=(parent and parent.doc_id is not None)))
@ -150,6 +124,8 @@ def edit(table, path):
# get or create the docoument at this uri
page = get_page(uri, table=table, create_okay=True)
if not app.authorize(g.user, page, schema.Permissions.WRITE):
return Response("Permission denied.", status=403)
save_data = getattr(forms, table)(page, request.form).prepare()
# editing existing document
@ -159,7 +135,7 @@ def edit(table, path):
return rendered(app.db.save(save_data))
# saving a new document
return rendered(app.add_member(parent, save_data))
return rendered(parent.add_member(save_data))
@app.web.before_request

View File

@ -1,9 +1,10 @@
import pytest
from grung.db import GrungDB
from tinydb import where
from tinydb.storages import MemoryStorage
import ttfrog.app
from ttfrog import schema
from grung.db import GrungDB
from tinydb.storages import MemoryStorage
@pytest.fixture
@ -43,23 +44,38 @@ def test_permissions(app):
players = app.db.save(schema.Group(name="players", members=[john]))
notes = app.db.save(schema.Page(name="notes"))
# default read-only
assert players.can_read(notes, app.db)
assert not players.can_write(notes, app.db)
assert not players.can_delete(notes, app.db)
# default no access
assert not players.can_read(notes)
assert not players.can_write(notes)
assert not players.can_delete(notes)
assert not john.can_read(notes)
assert not john.can_write(notes)
assert not john.can_delete(notes)
# set to rw, no delete
notes.set_permissions(players, [schema.Permissions.READ, schema.Permissions.WRITE], app.db)
assert players.can_read(notes, app.db)
assert players.can_write(notes, app.db)
assert not players.can_delete(notes, app.db)
notes.set_permissions(players, [schema.Permissions.READ, schema.Permissions.WRITE])
notes = app.db.Page.get(doc_id=notes.doc_id)
assert players.can_read(notes)
assert players.can_write(notes)
assert not players.can_delete(notes)
# members of the group inherit group permissions
assert john.can_read(notes, app.db)
assert john.can_write(notes, app.db)
assert not john.can_delete(notes, app.db)
assert john.can_read(notes)
assert john.can_write(notes)
assert not john.can_delete(notes)
# permissions are the union of user + group permissions
notes.set_permissions(john, [schema.Permissions.DELETE], app.db)
assert not players.can_delete(notes, app.db)
assert john.can_delete(notes, app.db)
notes.set_permissions(john, [schema.Permissions.DELETE])
assert not players.can_delete(notes)
assert john.can_delete(notes)
def test_bootstrap(app):
from ttfrog.bootstrap import bootstrap
bootstrap()
admins = app.db.Group.get(where("name") == "administrators")
admin = app.db.User.get(where("name") == "admin")
assert admin.reference in admins.members