Compare commits

..

3 Commits

Author SHA1 Message Date
evilchili
0e8fd9a1b0 Implement ACLs 2025-10-05 00:15:37 -07:00
evilchili
36006ceeea Adding sessions 2025-10-04 10:48:18 -07:00
evilchili
3854a877bf only allow page creation if parent exists 2025-10-04 09:00:50 -07:00
7 changed files with 228 additions and 46 deletions

View File

@ -19,6 +19,7 @@ pyyaml = "^6.0.2"
nanoid = "^2.0.0"
# grung-db = {git = "https://git.evilchi.li/evilchili/grung-db.git"}
grung-db = {git = "file:///home/greg/dev/grung-db/"}
flask-session = "^0.8.0"
[tool.poetry.group.dev.dependencies]
pytest = "*"

View File

@ -5,8 +5,8 @@ from types import SimpleNamespace
from dotenv import dotenv_values
from flask import Flask
from flask_session import Session
from grung.db import GrungDB
from grung.exceptions import UniqueConstraintError
from tinydb.storages import MemoryStorage
from ttfrog import schema
@ -78,6 +78,7 @@ VIEW_URI=/
config=config_file,
data_root=data_root,
database=data_root / f"{self.config.NAME}.json",
sessions=data_root / "session_cache",
)
def initialize(self, db: GrungDB = None, force: bool = False) -> None:
@ -85,7 +86,9 @@ VIEW_URI=/
Instantiate both the database and the flask application.
"""
if force or not self._initialized:
if self.config.IN_MEMORY_DB:
if db:
self.db = db
elif self.config.IN_MEMORY_DB:
self.db = GrungDB.with_schema(schema, storage=MemoryStorage)
else:
self.db = GrungDB.with_schema(
@ -98,6 +101,10 @@ VIEW_URI=/
self.web.config["SECRET_KEY"] = self.config.SECRET_KEY
self.web.config["SEND_FILE_MAX_AGE_DEFAULT"] = 0
self.web.config["DEBUG"] = True
self.web.config["SESSION_TYPE"] = "filesystem"
self.web.config["SESSION_REFRESH_EACH_REQUEST"] = True
self.web.config["SESSION_FILE_DIR"] = self.path.sessions
Session(self.web)
self._initialized = True
@ -128,11 +135,28 @@ VIEW_URI=/
self.add_member(npcs, sabetha)
self.add_member(npcs, johns)
guest = self.add_member(users, schema.User(name="guest"))
# create the admin user and admins group
admin = self.add_member(users, schema.User(name=self.config.ADMIN_USERNAME, email=self.config.ADMIN_EMAIL))
admin = self.add_member(
users, schema.User(name=self.config.ADMIN_USERNAME, password="fnord", email=self.config.ADMIN_EMAIL)
)
admins = self.db.save(schema.Group(name="administrators"))
admins = self.add_member(groups, admins)
self.add_member(admins, admin)
admin = self.add_member(admins, admin)
groups.set_permissions(admins, permissions=[
schema.Permissions.READ,
schema.Permissions.WRITE,
schema.Permissions.DELETE
], db=self.db)
users.set_permissions(admins, permissions=[
schema.Permissions.READ,
schema.Permissions.WRITE,
schema.Permissions.DELETE
], db=self.db)
sys.modules[__name__] = ApplicationContext()

View File

@ -1,6 +1,7 @@
from dataclasses import dataclass, field
from functools import cached_property
from flask import g
from grung.types import BackReference, Collection, Pointer, Record
from ttfrog import schema
@ -13,6 +14,7 @@ class Form:
"""
The base Form controller for the web UI.
"""
record: Record
data: field(default_factory=dict)
@ -24,12 +26,12 @@ class Form:
def prepare(self):
for key, value in self.data.items():
# filter out fields that cannot be set by the user
if key in self.read_only:
continue
self.record[key] = value
self.record.author = g.user
return self.record
@ -38,6 +40,7 @@ class Page(Form):
"""
A form for creating and updating Page records.
"""
record: schema.Page
@cached_property
@ -50,6 +53,7 @@ class NPC(Page):
"""
A form for creating and updating Page records.
"""
record: schema.NPC
@ -58,6 +62,7 @@ class User(Page):
"""
A form for creating and updating Page records.
"""
record: schema.NPC
@ -66,4 +71,5 @@ class Group(Page):
"""
A form for creating and updating Page records.
"""
record: schema.NPC

View File

@ -1,5 +1,10 @@
from __future__ import annotations
from grung.types import BackReference, Collection, Field, Record, Pointer
from datetime import datetime
from typing import List
from grung.types import BackReference, Collection, DateTime, Field, Password, Pointer, Record, Timestamp
from tinydb import Query
class Page(Record):
@ -18,7 +23,9 @@ class Page(Record):
Collection("members", Page), # The pages that exist below this page's URI
BackReference("parent", value_type=Page), # The page that exists above this page's URI
Pointer("author", value_type=User), # The last user to touch the page.
# DateTime("last_modified"), # The last time the page was modified.
DateTime("created"), # When the page was created
Timestamp("last_modified"), # The last time the page was modified.
Collection("acl", Permissions),
]
def before_insert(self, db):
@ -29,6 +36,10 @@ class Page(Record):
"""
super().before_insert(db)
now = datetime.utcnow()
if not self.doc_id and self.created < now:
self.created = now
if not self.name and not self.title:
raise Exception("Must provide either a name or a title!")
if not self.name:
@ -44,7 +55,7 @@ class Page(Record):
correct URI. This ensures that if a page is moved from one collection to another, the URI is updated.
"""
super().after_insert(db)
if not hasattr(self, 'members'):
if not hasattr(self, "members"):
return
for child in self.members:
obj = BackReference.dereference(child, db)
@ -57,24 +68,62 @@ class Page(Record):
return page
return None
def set_permissions(self, entity: Entity, permissions: List, db) -> Record:
perms = db.save(Permissions(entity=entity, grants="".join(permissions)))
self.acl = [entry for entry in self.acl if entry.entity != entity] + [perms]
self = db.save(self)
return perms
class User(Page):
class Entity(Page):
def has_permission(self, record: Record, requested: str, db) -> bool:
# if there's no ACL at all, the record is world-readable.
if not getattr(record, "acl", None):
return requested == Permissions.READ
# Use the grant specific to this entity, if there is one
for entry in record.acl:
if entry.entity.uid == self.uid:
return requested in entry.grants
for group in db.Group.search(Query()["members"].any([self.reference])):
if group.has_permission(record, requested, db):
return True
return False
def can_read(self, record: Record, db):
return self.has_permission(record, Permissions.READ, db)
def can_write(self, record: Record, db):
return self.has_permission(record, Permissions.WRITE, db)
def can_delete(self, record: Record, db):
return self.has_permission(record, Permissions.DELETE, db)
class User(Entity):
"""
A website user, editable as a wiki page.
"""
def check_credentials(self, username: str, password: str) -> bool:
return username == self.name and self._metadata.fields["password"].compare(password, self.password)
@classmethod
def fields(cls):
return [
field
for field in [
*super().fields(),
Field("email", unique=True)
] if field.name != "members"
Field("email", unique=True),
Password("password"),
]
if field.name != "members"
]
class Group(Page):
class Group(Entity):
"""
A set of users, editable as a wiki page.
"""
@ -84,3 +133,13 @@ class NPC(Page):
"""
An NPC, editable as a wiki page.
"""
class Permissions(Record):
READ = "r"
WRITE = "w"
DELETE = "d"
@classmethod
def fields(cls):
return [*super().fields(), Pointer("entity", Entity), Field("grants")]

View File

@ -16,15 +16,22 @@
<span>&nbsp; / <a href="{{ app.config.VIEW_URI }}{{ uri }}">{{ name }}</a></span>
{% endfor %}
</nav>
{% if session['user_id'] == 1 %}
Welcome, {{ user['name'] }}. [ <a href="{{ url_for('login') }}">LOGIN</a> ]
{% else %}
Welcome, <a href="{{ url_for('User/' + user['name']) }}">{{ user['name'] }}</a>.
{% endif %}
<nav>
Menu:
<ul>
{% block menu %}{% endblock %}
</ul>
</nav>
Last Edited By: {{ page.author.name }}
<br>
<main>
{% for message in get_flashed_messages() %}
{% for message in g.messages %}
<div class="alert">
{{ message }}
</div>

View File

@ -1,7 +1,7 @@
from flask import Response, render_template, request
from flask import Response, g, redirect, render_template, request, session, url_for
from tinydb import where
from ttfrog import app, schema, forms
from ttfrog import app, forms, schema
STATIC = ["static"]
@ -16,11 +16,11 @@ def relative_uri(path: str = ""):
def get_parent(table: str, uri: str):
try:
parent_uri = uri.strip("/").split("/", -1)[0]
parent_uri = uri.strip("/").rsplit("/", 1)[0]
except IndexError:
return None
return get_page(parent_uri, table=table if '/' in parent_uri else 'Page', create_okay=False)
return get_page(parent_uri, table=table if "/" in parent_uri else "Page", create_okay=False)
def get_page(path: str = "", table: str = "Page", create_okay: bool = False):
@ -32,20 +32,28 @@ def get_page(path: str = "", table: str = "Page", create_okay: bool = False):
if table not in app.db.tables():
return None
matches = app.db.table(table).search(where("uri") == uri, recurse=False)
if not matches:
page = app.db.table(table).get(where("uri") == uri, recurse=False)
if hasattr(page, "acl"):
acl = []
for pointer in page.acl:
table, uid = pointer.split("::")
acl += app.db.table(table).search(where("uid") == uid, recurse=False)
page.acl = acl
if not page:
if not create_okay:
return None
return getattr(schema, table)(
name=uri.split("/")[-1],
body="This page does not exist",
parent = get_parent(table, uri)
)
page = matches[0]
if not g.user.can_read(parent, app.db):
return None
return getattr(schema, table)(name=uri.split("/")[-1], body="This page does not exist", parent=parent)
if hasattr(page, 'members'):
if not g.user.can_read(page, app.db):
return None
if hasattr(page, "members"):
subpages = []
for pointer in matches[0].members:
for pointer in page.members:
table, uid = pointer.split("::")
subpages += app.db.table(table).search(where("uid") == uid, recurse=False)
page.members = subpages
@ -56,7 +64,7 @@ def get_page(path: str = "", table: str = "Page", create_okay: bool = False):
def rendered(page: schema.Record, template: str = "page.html"):
if not page:
return Response("Page not found", status=404)
return render_template(template, page=page, app=app, breadcrumbs=breadcrumbs())
return render_template(template, page=page, app=app, breadcrumbs=breadcrumbs(), user=session['user'], g=g)
def get_static(path):
@ -81,19 +89,62 @@ def index():
return rendered(get_page(create_okay=False))
def do_login(username: str, password: str) -> bool:
"""
Update the session with the user record if the credenteials are valid.
"""
if not (username and password):
return False
user = app.db.User.get(where("name") == "username")
if not user:
return False
if not user.check_credentials(username, password):
return False
app.web.logger.debug(f"Session for {user.name} ({user.doc_id}) started.")
g.user = user
session["user_id"] = g.user.doc_id
session["user"] = dict(g.user.serialize())
return True
@app.web.route("/login", methods=["GET", "POST"])
def login():
app.web.session_interface.regenerate(session)
if request.method == "POST":
username = request.form.get("username")
password = request.form.get("password")
if do_login(username, password):
return redirect(url_for("index"))
g.messages.append(f"Invalid login for {username}")
return rendered(schema.Page(name="Login", title="Please enter your login details"), "login.html")
@app.web.route("/logout")
def logout():
if "user_id" in session:
del session["user_id"]
del g.user
@app.web.route(f"{app.config.VIEW_URI}/<path:table>/<path:path>", methods=["GET"])
@app.web.route(f"{app.config.VIEW_URI}/<path:path>", methods=["GET"], defaults={'table': 'Page'})
@app.web.route(f"{app.config.VIEW_URI}/<path:path>", methods=["GET"], defaults={"table": "Page"})
def view(table, path):
return rendered(get_page(request.path, table=table, create_okay=True))
parent = get_parent(table, relative_uri())
print(parent)
return rendered(get_page(request.path, table=table, create_okay=(parent and parent.doc_id is not None)))
@app.web.route(f"{app.config.VIEW_URI}/<path:table>/<path:path>", methods=["POST"])
@app.web.route(f"{app.config.VIEW_URI}/<path:path>", methods=["POST"], defaults={'table': 'Page'})
@app.web.route(f"{app.config.VIEW_URI}/<path:path>", methods=["POST"], defaults={"table": "Page"})
def edit(table, path):
uri = relative_uri()
parent = get_parent(table, uri)
if not parent:
return Response(f"Parent for {uri} does not exist.", status=403)
return Response("You cannot create a page at this location.", status=403)
# get or create the docoument at this uri
page = get_page(uri, table=table, create_okay=True)
@ -109,6 +160,15 @@ def edit(table, path):
return rendered(app.add_member(parent, save_data))
@app.web.before_request
def before_request():
g.messages = []
user_id = session.get("user_id", 1)
g.user = app.db.User.get(doc_id=user_id)
session["user_id"] = user_id
session["user"] = dict(g.user.serialize())
@app.web.after_request
def add_header(r):
r.headers["Cache-Control"] = "no-cache, no-store, must-revalidate, public, max-age=0"

View File

@ -2,18 +2,21 @@ import pytest
import ttfrog.app
from ttfrog import schema
from grung.db import GrungDB
from tinydb.storages import MemoryStorage
@pytest.fixture
def app():
fixture_db = GrungDB.with_schema(schema, storage=MemoryStorage)
ttfrog.app.load_config(defaults=None, IN_MEMORY_DB=1)
ttfrog.app.initialize()
ttfrog.app.initialize(db=fixture_db, force=True)
yield ttfrog.app
ttfrog.app.db.close()
ttfrog.app.db.truncate()
def test_create(app):
user = schema.User(name="john", email="john@foo")
user = schema.User(name="john", email="john@foo", password="powerfulCat")
assert user.uid
assert user._metadata.fields["uid"].unique
@ -34,7 +37,29 @@ def test_create(app):
assert after_update == john_something
assert before_update != after_update
players = schema.Group(name="players", users=[john_something])
players = app.db.save(players)
players.users[0]["name"] = "fnord"
app.db.save(players)
def test_permissions(app):
john = app.db.save(schema.User(name="john", email="john@foo", password="powerfulCat"))
players = app.db.save(schema.Group(name="players", members=[john]))
notes = app.db.save(schema.Page(name="notes"))
# default read-only
assert players.can_read(notes, app.db)
assert not players.can_write(notes, app.db)
assert not players.can_delete(notes, app.db)
# set to rw, no delete
notes.set_permissions(players, [schema.Permissions.READ, schema.Permissions.WRITE], app.db)
assert players.can_read(notes, app.db)
assert players.can_write(notes, app.db)
assert not players.can_delete(notes, app.db)
# members of the group inherit group permissions
assert john.can_read(notes, app.db)
assert john.can_write(notes, app.db)
assert not john.can_delete(notes, app.db)
# permissions are the union of user + group permissions
notes.set_permissions(john, [schema.Permissions.DELETE], app.db)
assert not players.can_delete(notes, app.db)
assert john.can_delete(notes, app.db)