Compare commits

..

No commits in common. "0e8fd9a1b087521efcaa1681396171281f2c7368" and "c9927656ce7523c9f3dab060ac9d7be560e87b81" have entirely different histories.

7 changed files with 46 additions and 228 deletions

View File

@ -19,7 +19,6 @@ pyyaml = "^6.0.2"
nanoid = "^2.0.0"
# grung-db = {git = "https://git.evilchi.li/evilchili/grung-db.git"}
grung-db = {git = "file:///home/greg/dev/grung-db/"}
flask-session = "^0.8.0"
[tool.poetry.group.dev.dependencies]
pytest = "*"

View File

@ -5,8 +5,8 @@ from types import SimpleNamespace
from dotenv import dotenv_values
from flask import Flask
from flask_session import Session
from grung.db import GrungDB
from grung.exceptions import UniqueConstraintError
from tinydb.storages import MemoryStorage
from ttfrog import schema
@ -78,7 +78,6 @@ VIEW_URI=/
config=config_file,
data_root=data_root,
database=data_root / f"{self.config.NAME}.json",
sessions=data_root / "session_cache",
)
def initialize(self, db: GrungDB = None, force: bool = False) -> None:
@ -86,9 +85,7 @@ VIEW_URI=/
Instantiate both the database and the flask application.
"""
if force or not self._initialized:
if db:
self.db = db
elif self.config.IN_MEMORY_DB:
if self.config.IN_MEMORY_DB:
self.db = GrungDB.with_schema(schema, storage=MemoryStorage)
else:
self.db = GrungDB.with_schema(
@ -101,10 +98,6 @@ VIEW_URI=/
self.web.config["SECRET_KEY"] = self.config.SECRET_KEY
self.web.config["SEND_FILE_MAX_AGE_DEFAULT"] = 0
self.web.config["DEBUG"] = True
self.web.config["SESSION_TYPE"] = "filesystem"
self.web.config["SESSION_REFRESH_EACH_REQUEST"] = True
self.web.config["SESSION_FILE_DIR"] = self.path.sessions
Session(self.web)
self._initialized = True
@ -135,28 +128,11 @@ VIEW_URI=/
self.add_member(npcs, sabetha)
self.add_member(npcs, johns)
guest = self.add_member(users, schema.User(name="guest"))
# create the admin user and admins group
admin = self.add_member(
users, schema.User(name=self.config.ADMIN_USERNAME, password="fnord", email=self.config.ADMIN_EMAIL)
)
admin = self.add_member(users, schema.User(name=self.config.ADMIN_USERNAME, email=self.config.ADMIN_EMAIL))
admins = self.db.save(schema.Group(name="administrators"))
admins = self.add_member(groups, admins)
admin = self.add_member(admins, admin)
groups.set_permissions(admins, permissions=[
schema.Permissions.READ,
schema.Permissions.WRITE,
schema.Permissions.DELETE
], db=self.db)
users.set_permissions(admins, permissions=[
schema.Permissions.READ,
schema.Permissions.WRITE,
schema.Permissions.DELETE
], db=self.db)
self.add_member(admins, admin)
sys.modules[__name__] = ApplicationContext()

View File

@ -1,7 +1,6 @@
from dataclasses import dataclass, field
from functools import cached_property
from flask import g
from grung.types import BackReference, Collection, Pointer, Record
from ttfrog import schema
@ -14,7 +13,6 @@ class Form:
"""
The base Form controller for the web UI.
"""
record: Record
data: field(default_factory=dict)
@ -26,12 +24,12 @@ class Form:
def prepare(self):
for key, value in self.data.items():
# filter out fields that cannot be set by the user
if key in self.read_only:
continue
self.record[key] = value
self.record.author = g.user
return self.record
@ -40,7 +38,6 @@ class Page(Form):
"""
A form for creating and updating Page records.
"""
record: schema.Page
@cached_property
@ -53,7 +50,6 @@ class NPC(Page):
"""
A form for creating and updating Page records.
"""
record: schema.NPC
@ -62,7 +58,6 @@ class User(Page):
"""
A form for creating and updating Page records.
"""
record: schema.NPC
@ -71,5 +66,4 @@ class Group(Page):
"""
A form for creating and updating Page records.
"""
record: schema.NPC

View File

@ -1,10 +1,5 @@
from __future__ import annotations
from datetime import datetime
from typing import List
from grung.types import BackReference, Collection, DateTime, Field, Password, Pointer, Record, Timestamp
from tinydb import Query
from grung.types import BackReference, Collection, Field, Record, Pointer
class Page(Record):
@ -23,9 +18,7 @@ class Page(Record):
Collection("members", Page), # The pages that exist below this page's URI
BackReference("parent", value_type=Page), # The page that exists above this page's URI
Pointer("author", value_type=User), # The last user to touch the page.
DateTime("created"), # When the page was created
Timestamp("last_modified"), # The last time the page was modified.
Collection("acl", Permissions),
# DateTime("last_modified"), # The last time the page was modified.
]
def before_insert(self, db):
@ -36,10 +29,6 @@ class Page(Record):
"""
super().before_insert(db)
now = datetime.utcnow()
if not self.doc_id and self.created < now:
self.created = now
if not self.name and not self.title:
raise Exception("Must provide either a name or a title!")
if not self.name:
@ -55,7 +44,7 @@ class Page(Record):
correct URI. This ensures that if a page is moved from one collection to another, the URI is updated.
"""
super().after_insert(db)
if not hasattr(self, "members"):
if not hasattr(self, 'members'):
return
for child in self.members:
obj = BackReference.dereference(child, db)
@ -68,62 +57,24 @@ class Page(Record):
return page
return None
def set_permissions(self, entity: Entity, permissions: List, db) -> Record:
perms = db.save(Permissions(entity=entity, grants="".join(permissions)))
self.acl = [entry for entry in self.acl if entry.entity != entity] + [perms]
self = db.save(self)
return perms
class Entity(Page):
def has_permission(self, record: Record, requested: str, db) -> bool:
# if there's no ACL at all, the record is world-readable.
if not getattr(record, "acl", None):
return requested == Permissions.READ
# Use the grant specific to this entity, if there is one
for entry in record.acl:
if entry.entity.uid == self.uid:
return requested in entry.grants
for group in db.Group.search(Query()["members"].any([self.reference])):
if group.has_permission(record, requested, db):
return True
return False
def can_read(self, record: Record, db):
return self.has_permission(record, Permissions.READ, db)
def can_write(self, record: Record, db):
return self.has_permission(record, Permissions.WRITE, db)
def can_delete(self, record: Record, db):
return self.has_permission(record, Permissions.DELETE, db)
class User(Entity):
class User(Page):
"""
A website user, editable as a wiki page.
"""
def check_credentials(self, username: str, password: str) -> bool:
return username == self.name and self._metadata.fields["password"].compare(password, self.password)
@classmethod
def fields(cls):
return [
field
for field in [
*super().fields(),
Field("email", unique=True),
Password("password"),
]
if field.name != "members"
Field("email", unique=True)
] if field.name != "members"
]
class Group(Entity):
class Group(Page):
"""
A set of users, editable as a wiki page.
"""
@ -133,13 +84,3 @@ class NPC(Page):
"""
An NPC, editable as a wiki page.
"""
class Permissions(Record):
READ = "r"
WRITE = "w"
DELETE = "d"
@classmethod
def fields(cls):
return [*super().fields(), Pointer("entity", Entity), Field("grants")]

View File

@ -16,22 +16,15 @@
<span>&nbsp; / <a href="{{ app.config.VIEW_URI }}{{ uri }}">{{ name }}</a></span>
{% endfor %}
</nav>
{% if session['user_id'] == 1 %}
Welcome, {{ user['name'] }}. [ <a href="{{ url_for('login') }}">LOGIN</a> ]
{% else %}
Welcome, <a href="{{ url_for('User/' + user['name']) }}">{{ user['name'] }}</a>.
{% endif %}
<nav>
Menu:
<ul>
{% block menu %}{% endblock %}
</ul>
</nav>
Last Edited By: {{ page.author.name }}
<br>
<main>
{% for message in g.messages %}
{% for message in get_flashed_messages() %}
<div class="alert">
{{ message }}
</div>

View File

@ -1,7 +1,7 @@
from flask import Response, g, redirect, render_template, request, session, url_for
from flask import Response, render_template, request
from tinydb import where
from ttfrog import app, forms, schema
from ttfrog import app, schema, forms
STATIC = ["static"]
@ -16,11 +16,11 @@ def relative_uri(path: str = ""):
def get_parent(table: str, uri: str):
try:
parent_uri = uri.strip("/").rsplit("/", 1)[0]
parent_uri = uri.strip("/").split("/", -1)[0]
except IndexError:
return None
return get_page(parent_uri, table=table if "/" in parent_uri else "Page", create_okay=False)
return get_page(parent_uri, table=table if '/' in parent_uri else 'Page', create_okay=False)
def get_page(path: str = "", table: str = "Page", create_okay: bool = False):
@ -32,28 +32,20 @@ def get_page(path: str = "", table: str = "Page", create_okay: bool = False):
if table not in app.db.tables():
return None
page = app.db.table(table).get(where("uri") == uri, recurse=False)
if hasattr(page, "acl"):
acl = []
for pointer in page.acl:
table, uid = pointer.split("::")
acl += app.db.table(table).search(where("uid") == uid, recurse=False)
page.acl = acl
if not page:
matches = app.db.table(table).search(where("uri") == uri, recurse=False)
if not matches:
if not create_okay:
return None
parent = get_parent(table, uri)
if not g.user.can_read(parent, app.db):
return None
return getattr(schema, table)(name=uri.split("/")[-1], body="This page does not exist", parent=parent)
return getattr(schema, table)(
name=uri.split("/")[-1],
body="This page does not exist",
parent=get_parent(table, uri)
)
page = matches[0]
if not g.user.can_read(page, app.db):
return None
if hasattr(page, "members"):
if hasattr(page, 'members'):
subpages = []
for pointer in page.members:
for pointer in matches[0].members:
table, uid = pointer.split("::")
subpages += app.db.table(table).search(where("uid") == uid, recurse=False)
page.members = subpages
@ -64,7 +56,7 @@ def get_page(path: str = "", table: str = "Page", create_okay: bool = False):
def rendered(page: schema.Record, template: str = "page.html"):
if not page:
return Response("Page not found", status=404)
return render_template(template, page=page, app=app, breadcrumbs=breadcrumbs(), user=session['user'], g=g)
return render_template(template, page=page, app=app, breadcrumbs=breadcrumbs())
def get_static(path):
@ -89,62 +81,19 @@ def index():
return rendered(get_page(create_okay=False))
def do_login(username: str, password: str) -> bool:
"""
Update the session with the user record if the credenteials are valid.
"""
if not (username and password):
return False
user = app.db.User.get(where("name") == "username")
if not user:
return False
if not user.check_credentials(username, password):
return False
app.web.logger.debug(f"Session for {user.name} ({user.doc_id}) started.")
g.user = user
session["user_id"] = g.user.doc_id
session["user"] = dict(g.user.serialize())
return True
@app.web.route("/login", methods=["GET", "POST"])
def login():
app.web.session_interface.regenerate(session)
if request.method == "POST":
username = request.form.get("username")
password = request.form.get("password")
if do_login(username, password):
return redirect(url_for("index"))
g.messages.append(f"Invalid login for {username}")
return rendered(schema.Page(name="Login", title="Please enter your login details"), "login.html")
@app.web.route("/logout")
def logout():
if "user_id" in session:
del session["user_id"]
del g.user
@app.web.route(f"{app.config.VIEW_URI}/<path:table>/<path:path>", methods=["GET"])
@app.web.route(f"{app.config.VIEW_URI}/<path:path>", methods=["GET"], defaults={"table": "Page"})
@app.web.route(f"{app.config.VIEW_URI}/<path:path>", methods=["GET"], defaults={'table': 'Page'})
def view(table, path):
parent = get_parent(table, relative_uri())
print(parent)
return rendered(get_page(request.path, table=table, create_okay=(parent and parent.doc_id is not None)))
return rendered(get_page(request.path, table=table, create_okay=True))
@app.web.route(f"{app.config.VIEW_URI}/<path:table>/<path:path>", methods=["POST"])
@app.web.route(f"{app.config.VIEW_URI}/<path:path>", methods=["POST"], defaults={"table": "Page"})
@app.web.route(f"{app.config.VIEW_URI}/<path:path>", methods=["POST"], defaults={'table': 'Page'})
def edit(table, path):
uri = relative_uri()
parent = get_parent(table, uri)
if not parent:
return Response("You cannot create a page at this location.", status=403)
return Response(f"Parent for {uri} does not exist.", status=403)
# get or create the docoument at this uri
page = get_page(uri, table=table, create_okay=True)
@ -160,15 +109,6 @@ def edit(table, path):
return rendered(app.add_member(parent, save_data))
@app.web.before_request
def before_request():
g.messages = []
user_id = session.get("user_id", 1)
g.user = app.db.User.get(doc_id=user_id)
session["user_id"] = user_id
session["user"] = dict(g.user.serialize())
@app.web.after_request
def add_header(r):
r.headers["Cache-Control"] = "no-cache, no-store, must-revalidate, public, max-age=0"

View File

@ -2,21 +2,18 @@ import pytest
import ttfrog.app
from ttfrog import schema
from grung.db import GrungDB
from tinydb.storages import MemoryStorage
@pytest.fixture
def app():
fixture_db = GrungDB.with_schema(schema, storage=MemoryStorage)
ttfrog.app.load_config(defaults=None, IN_MEMORY_DB=1)
ttfrog.app.initialize(db=fixture_db, force=True)
ttfrog.app.initialize()
yield ttfrog.app
ttfrog.app.db.truncate()
ttfrog.app.db.close()
def test_create(app):
user = schema.User(name="john", email="john@foo", password="powerfulCat")
user = schema.User(name="john", email="john@foo")
assert user.uid
assert user._metadata.fields["uid"].unique
@ -37,29 +34,7 @@ def test_create(app):
assert after_update == john_something
assert before_update != after_update
def test_permissions(app):
john = app.db.save(schema.User(name="john", email="john@foo", password="powerfulCat"))
players = app.db.save(schema.Group(name="players", members=[john]))
notes = app.db.save(schema.Page(name="notes"))
# default read-only
assert players.can_read(notes, app.db)
assert not players.can_write(notes, app.db)
assert not players.can_delete(notes, app.db)
# set to rw, no delete
notes.set_permissions(players, [schema.Permissions.READ, schema.Permissions.WRITE], app.db)
assert players.can_read(notes, app.db)
assert players.can_write(notes, app.db)
assert not players.can_delete(notes, app.db)
# members of the group inherit group permissions
assert john.can_read(notes, app.db)
assert john.can_write(notes, app.db)
assert not john.can_delete(notes, app.db)
# permissions are the union of user + group permissions
notes.set_permissions(john, [schema.Permissions.DELETE], app.db)
assert not players.can_delete(notes, app.db)
assert john.can_delete(notes, app.db)
players = schema.Group(name="players", users=[john_something])
players = app.db.save(players)
players.users[0]["name"] = "fnord"
app.db.save(players)