diff --git a/src/ttfrog/app.py b/src/ttfrog/app.py index e4ea247..341ed24 100644 --- a/src/ttfrog/app.py +++ b/src/ttfrog/app.py @@ -7,6 +7,7 @@ from dotenv import dotenv_values from flask import Flask from flask_session import Session from grung.db import GrungDB +from tinydb import where from tinydb.storages import MemoryStorage from ttfrog import schema @@ -112,50 +113,27 @@ VIEW_URI=/ if not self._initialized: raise ApplicationNotInitializedError("This action requires the application to be initialized.") - def add_member(self, parent: schema.Page, child: schema.Page): - parent.members.append(self.db.save(child)) - parent = self.db.save(parent) - return parent.get_child(child) - - def bootstrap(self): + def authenticate(self, username: str, password: str) -> schema.User: """ - Bootstrap the database entries by populating the first Page, the Admin user and the Admins group. + Returns the User record matching the given username and password """ - self.check_state() + if not (username and password): + self.web.logger.debug("Need both username and password to login") + return None - # create the top-level pages - root = self.db.save(schema.Page(name=self.config.VIEW_URI, title="Home", body="This is the home page")) - users = self.add_member(root, schema.Page(name="User", title="Users", body="users go here.")) - groups = self.add_member(root, schema.Page(name="Group", title="Groups", body="groups go here.")) - npcs = self.add_member(root, schema.Page(name="NPC", title="NPCS!", body="NPCS!")) + user = self.db.User.get(where("name") == username) + if not user: + self.web.logger.debug(f"No user matching {username}") + return None - # create the NPCs - sabetha = self.db.save(schema.NPC(name="Sabetha", body="")) - johns = self.db.save(schema.NPC(name="John", body="")) - self.add_member(npcs, sabetha) - self.add_member(npcs, johns) + if not user.check_credentials(username, password): + self.web.logger.debug(f"Invalid credentials for {username}") + return None - guest = self.add_member(users, schema.User(name="guest")) + return user - # create the admin user and admins group - admin = self.add_member( - users, schema.User(name=self.config.ADMIN_USERNAME, password="fnord", email=self.config.ADMIN_EMAIL) - ) - admins = self.db.save(schema.Group(name="administrators")) - admins = self.add_member(groups, admins) - admin = self.add_member(admins, admin) - - groups.set_permissions( - admins, - permissions=[schema.Permissions.READ, schema.Permissions.WRITE, schema.Permissions.DELETE], - db=self.db, - ) - - users.set_permissions( - admins, - permissions=[schema.Permissions.READ, schema.Permissions.WRITE, schema.Permissions.DELETE], - db=self.db, - ) + def authorize(self, user, record, requested): + return user.has_permission(record, requested) sys.modules[__name__] = ApplicationContext() diff --git a/src/ttfrog/cli.py b/src/ttfrog/cli.py index 86e7ec3..22910c5 100644 --- a/src/ttfrog/cli.py +++ b/src/ttfrog/cli.py @@ -9,6 +9,7 @@ from rich import print from rich.logging import RichHandler import ttfrog.app +from ttfrog.bootstrap import bootstrap main_app = typer.Typer() @@ -52,7 +53,7 @@ def init(context: typer.Context, drop: bool = typer.Option(False, help="Drop tab ttfrog.app.db.drop_tables() ttfrog.app.db.close() ttfrog.app.initialize(force=True) - ttfrog.app.bootstrap() + bootstrap() print(ttfrog.app.db.Page.all()) print(ttfrog.app.db) diff --git a/src/ttfrog/schema.py b/src/ttfrog/schema.py index f9557b9..727197b 100644 --- a/src/ttfrog/schema.py +++ b/src/ttfrog/schema.py @@ -1,10 +1,11 @@ from __future__ import annotations from datetime import datetime +from functools import cache from typing import List from grung.types import BackReference, Collection, DateTime, Field, Password, Pointer, Record, Timestamp -from tinydb import Query +from tinydb import where class Page(Record): @@ -14,19 +15,21 @@ class Page(Record): @classmethod def fields(cls): + # fmt: off return [ - *super().fields(), # Pick up the UID and whatever other non-optional fields exist - Field("uri", unique=True), # The URI for the page, relative to the app's VIEW_URI - Field("name"), # The portion of the URI after the last / - Field("title"), # The page title - Field("body"), # The main content blob of the page - Collection("members", Page), # The pages that exist below this page's URI + *super().fields(), # Pick up the UID and whatever other non-optional fields exist + Field("uri", unique=True), # The URI for the page, relative to the app's VIEW_URI + Field("name"), # The portion of the URI after the last / + Field("title"), # The page title + Field("body"), # The main content blob of the page + Collection("members", Page), # The pages that exist below this page's URI BackReference("parent", value_type=Page), # The page that exists above this page's URI - Pointer("author", value_type=User), # The last user to touch the page. - DateTime("created"), # When the page was created - Timestamp("last_modified"), # The last time the page was modified. - Collection("acl", Permissions), + Pointer("author", value_type=User), # The last user to touch the page. + DateTime("created"), # When the page was created + Timestamp("last_modified"), # The last time the page was modified. + Collection("acl", Permissions), # The access control list ] + # fmt: on def before_insert(self, db): """ @@ -62,62 +65,90 @@ class Page(Record): obj.uri = f"{self.uri}/{obj.name}" child = db.save(obj) + def add_member(self, child: Record): + from ttfrog import app + + app.check_state() + self.members = list(set(self.members + [app.db.save(child)])) + app.db.save(self) + return self.get_child(child) + def get_child(self, obj: Record): for page in self.members: if page.uid == obj.uid: return page return None - def set_permissions(self, entity: Entity, permissions: List, db) -> Record: - perms = db.save(Permissions(entity=entity, grants="".join(permissions))) - self.acl = [entry for entry in self.acl if entry.entity != entity] + [perms] - self = db.save(self) + def set_permissions(self, entity: Entity, permissions: List) -> Record: + from ttfrog import app + + app.check_state() + + perms = app.db.save(Permissions(entity=entity, grants="".join(permissions))) + self.acl = list(set(self.acl + [perms])) + app.db.save(self) return perms + @cache + def get_acl_for_entity(self, entity) -> list: + """ + Search upward through the page hierarchy looking for one with an ACL that either + has a grant for the entity we care about, or at least one group in which the entity is a member. + """ + from ttfrog import app + + app.check_state() + + def find_acl(obj): + if hasattr(obj, "acl"): + # examine each entry in the ACL and see if one refers to the entity we care about + group_grants = [] + for entry in obj.acl: + if type(entry) == str: + entry = app.db.Permissions.get(where("uid") == entry.split("::")[1], recurse=False) + + # grants specific to the entity always take precedence + if entry.entity.uid == entity.uid: + return [entry] + + # keep track of grants to groups containing the entity + elif entity.reference in getattr(entry.entity, "members", []): + group_grants.append(entry) + + # if we found group grants, return them + if group_grants: + return group_grants + + # no ACL on this object, so check its parent, if there is one + if not hasattr(obj, "parent"): + return [] + return find_acl(obj.parent) + + return find_acl(self) + class Entity(Page): - def has_permission(self, record: Record, requested: str, db) -> bool | None: - - # Find a non-empty ACL to use by starting with the requested reecord and traversing - # the hierarchy upwards. If we get to the root and there's no ACL anywhere, default - # to READ permissions. - def find_acl(obj): - if hasattr(obj, 'acl') and obj.acl: - return obj.acl - if not hasattr(obj, "parent"): - return None - return find_acl(obj.parent) - - acl = find_acl(record) - if not acl: - return requested == Permissions.READ - - # Use the grant specific to this entity, if there is one - for entry in record.acl: - if entry.entity.uid == self.uid: - return requested in entry.grants - - # Check for grants for each of the entity's groups, if any - for group in db.Group.search(Query()["members"].any([self.reference])): - if group.has_permission(record, requested, db): + def has_permission(self, record: Record, requested: str) -> bool | None: + for acl in record.get_acl_for_entity(self): + if requested in acl.grants: return True - return False - def can_read(self, record: Record, db): - return self.has_permission(record, Permissions.READ, db) + def can_read(self, record: Record): + return self.has_permission(record, Permissions.READ) - def can_write(self, record: Record, db): - return self.has_permission(record, Permissions.WRITE, db) + def can_write(self, record: Record): + return self.has_permission(record, Permissions.WRITE) - def can_delete(self, record: Record, db): - return self.has_permission(record, Permissions.DELETE, db) + def can_delete(self, record: Record): + return self.has_permission(record, Permissions.DELETE) class User(Entity): """ A website user, editable as a wiki page. """ + def check_credentials(self, username: str, password: str) -> bool: return username == self.name and self._metadata.fields["password"].compare(password, self.password) diff --git a/src/ttfrog/web.py b/src/ttfrog/web.py index f972b50..0f94679 100644 --- a/src/ttfrog/web.py +++ b/src/ttfrog/web.py @@ -33,22 +33,16 @@ def get_page(path: str = "", table: str = "Page", create_okay: bool = False): return None page = app.db.table(table).get(where("uri") == uri, recurse=False) - if hasattr(page, "acl"): - acl = [] - for pointer in page.acl: - table, uid = pointer.split("::") - acl += app.db.table(table).search(where("uid") == uid, recurse=False) - page.acl = acl if not page: if not create_okay: return None parent = get_parent(table, uri) - if not g.user.can_read(parent, app.db): + if not app.authorize(g.user, parent, schema.Permissions.READ): return None return getattr(schema, table)(name=uri.split("/")[-1], body="This page does not exist", parent=parent) - if not g.user.can_read(page, app.db): + if not app.authorize(g.user, page, schema.Permissions.READ): return None if hasattr(page, "members"): @@ -89,37 +83,18 @@ def index(): return rendered(get_page(create_okay=False)) -def do_login(username: str, password: str) -> bool: - """ - Update the session with the user record if the credenteials are valid. - """ - if not (username and password): - app.web.logger.debug("Need both username and password to login") - return False - - user = app.db.User.get(where("name") == username) - if not user: - app.web.logger.debug(f"No user matching {username}") - return False - - if not user.check_credentials(username, password): - app.web.logger.debug(f"Invalid credentials for {username}") - return False - - app.web.logger.debug(f"Session for {user.name} ({user.doc_id}) started.") - g.user = user - session["user_id"] = g.user.doc_id - session["user"] = dict(g.user.serialize()) - return True - - @app.web.route("/login", methods=["GET", "POST"]) def login(): app.web.session_interface.regenerate(session) if request.method == "POST": username = request.form.get("username") password = request.form.get("password") - if do_login(username, password): + user = app.authenticate(username, password) + if user: + g.user = user + session["user_id"] = g.user.doc_id + session["user"] = dict(g.user.serialize()) + app.web.logger.debug(f"Session for {user.name} ({user.doc_id}) started.") return redirect(url_for("index")) g.messages.append(f"Invalid login for {username}") return rendered(schema.Page(name="Login", title="Please enter your login details"), "login.html") @@ -136,7 +111,6 @@ def logout(): @app.web.route(f"{app.config.VIEW_URI}/", methods=["GET"], defaults={"table": "Page"}) def view(table, path): parent = get_parent(table, relative_uri()) - print(parent) return rendered(get_page(request.path, table=table, create_okay=(parent and parent.doc_id is not None))) @@ -150,6 +124,8 @@ def edit(table, path): # get or create the docoument at this uri page = get_page(uri, table=table, create_okay=True) + if not app.authorize(g.user, page, schema.Permissions.WRITE): + return Response("Permission denied.", status=403) save_data = getattr(forms, table)(page, request.form).prepare() # editing existing document @@ -159,7 +135,7 @@ def edit(table, path): return rendered(app.db.save(save_data)) # saving a new document - return rendered(app.add_member(parent, save_data)) + return rendered(parent.add_member(save_data)) @app.web.before_request diff --git a/test/test_db.py b/test/test_db.py index a73f76a..e654881 100644 --- a/test/test_db.py +++ b/test/test_db.py @@ -1,9 +1,10 @@ import pytest +from grung.db import GrungDB +from tinydb import where +from tinydb.storages import MemoryStorage import ttfrog.app from ttfrog import schema -from grung.db import GrungDB -from tinydb.storages import MemoryStorage @pytest.fixture @@ -43,23 +44,38 @@ def test_permissions(app): players = app.db.save(schema.Group(name="players", members=[john])) notes = app.db.save(schema.Page(name="notes")) - # default read-only - assert players.can_read(notes, app.db) - assert not players.can_write(notes, app.db) - assert not players.can_delete(notes, app.db) + # default no access + assert not players.can_read(notes) + assert not players.can_write(notes) + assert not players.can_delete(notes) + assert not john.can_read(notes) + assert not john.can_write(notes) + assert not john.can_delete(notes) # set to rw, no delete - notes.set_permissions(players, [schema.Permissions.READ, schema.Permissions.WRITE], app.db) - assert players.can_read(notes, app.db) - assert players.can_write(notes, app.db) - assert not players.can_delete(notes, app.db) + notes.set_permissions(players, [schema.Permissions.READ, schema.Permissions.WRITE]) + notes = app.db.Page.get(doc_id=notes.doc_id) + + assert players.can_read(notes) + assert players.can_write(notes) + assert not players.can_delete(notes) # members of the group inherit group permissions - assert john.can_read(notes, app.db) - assert john.can_write(notes, app.db) - assert not john.can_delete(notes, app.db) + assert john.can_read(notes) + assert john.can_write(notes) + assert not john.can_delete(notes) # permissions are the union of user + group permissions - notes.set_permissions(john, [schema.Permissions.DELETE], app.db) - assert not players.can_delete(notes, app.db) - assert john.can_delete(notes, app.db) + notes.set_permissions(john, [schema.Permissions.DELETE]) + assert not players.can_delete(notes) + assert john.can_delete(notes) + + +def test_bootstrap(app): + from ttfrog.bootstrap import bootstrap + + bootstrap() + + admins = app.db.Group.get(where("name") == "administrators") + admin = app.db.User.get(where("name") == "admin") + assert admin.reference in admins.members