Compare commits
3 Commits
c9927656ce
...
0e8fd9a1b0
Author | SHA1 | Date | |
---|---|---|---|
![]() |
0e8fd9a1b0 | ||
![]() |
36006ceeea | ||
![]() |
3854a877bf |
|
@ -19,6 +19,7 @@ pyyaml = "^6.0.2"
|
||||||
nanoid = "^2.0.0"
|
nanoid = "^2.0.0"
|
||||||
# grung-db = {git = "https://git.evilchi.li/evilchili/grung-db.git"}
|
# grung-db = {git = "https://git.evilchi.li/evilchili/grung-db.git"}
|
||||||
grung-db = {git = "file:///home/greg/dev/grung-db/"}
|
grung-db = {git = "file:///home/greg/dev/grung-db/"}
|
||||||
|
flask-session = "^0.8.0"
|
||||||
|
|
||||||
[tool.poetry.group.dev.dependencies]
|
[tool.poetry.group.dev.dependencies]
|
||||||
pytest = "*"
|
pytest = "*"
|
||||||
|
|
|
@ -5,8 +5,8 @@ from types import SimpleNamespace
|
||||||
|
|
||||||
from dotenv import dotenv_values
|
from dotenv import dotenv_values
|
||||||
from flask import Flask
|
from flask import Flask
|
||||||
|
from flask_session import Session
|
||||||
from grung.db import GrungDB
|
from grung.db import GrungDB
|
||||||
from grung.exceptions import UniqueConstraintError
|
|
||||||
from tinydb.storages import MemoryStorage
|
from tinydb.storages import MemoryStorage
|
||||||
|
|
||||||
from ttfrog import schema
|
from ttfrog import schema
|
||||||
|
@ -78,6 +78,7 @@ VIEW_URI=/
|
||||||
config=config_file,
|
config=config_file,
|
||||||
data_root=data_root,
|
data_root=data_root,
|
||||||
database=data_root / f"{self.config.NAME}.json",
|
database=data_root / f"{self.config.NAME}.json",
|
||||||
|
sessions=data_root / "session_cache",
|
||||||
)
|
)
|
||||||
|
|
||||||
def initialize(self, db: GrungDB = None, force: bool = False) -> None:
|
def initialize(self, db: GrungDB = None, force: bool = False) -> None:
|
||||||
|
@ -85,7 +86,9 @@ VIEW_URI=/
|
||||||
Instantiate both the database and the flask application.
|
Instantiate both the database and the flask application.
|
||||||
"""
|
"""
|
||||||
if force or not self._initialized:
|
if force or not self._initialized:
|
||||||
if self.config.IN_MEMORY_DB:
|
if db:
|
||||||
|
self.db = db
|
||||||
|
elif self.config.IN_MEMORY_DB:
|
||||||
self.db = GrungDB.with_schema(schema, storage=MemoryStorage)
|
self.db = GrungDB.with_schema(schema, storage=MemoryStorage)
|
||||||
else:
|
else:
|
||||||
self.db = GrungDB.with_schema(
|
self.db = GrungDB.with_schema(
|
||||||
|
@ -98,6 +101,10 @@ VIEW_URI=/
|
||||||
self.web.config["SECRET_KEY"] = self.config.SECRET_KEY
|
self.web.config["SECRET_KEY"] = self.config.SECRET_KEY
|
||||||
self.web.config["SEND_FILE_MAX_AGE_DEFAULT"] = 0
|
self.web.config["SEND_FILE_MAX_AGE_DEFAULT"] = 0
|
||||||
self.web.config["DEBUG"] = True
|
self.web.config["DEBUG"] = True
|
||||||
|
self.web.config["SESSION_TYPE"] = "filesystem"
|
||||||
|
self.web.config["SESSION_REFRESH_EACH_REQUEST"] = True
|
||||||
|
self.web.config["SESSION_FILE_DIR"] = self.path.sessions
|
||||||
|
Session(self.web)
|
||||||
|
|
||||||
self._initialized = True
|
self._initialized = True
|
||||||
|
|
||||||
|
@ -128,11 +135,28 @@ VIEW_URI=/
|
||||||
self.add_member(npcs, sabetha)
|
self.add_member(npcs, sabetha)
|
||||||
self.add_member(npcs, johns)
|
self.add_member(npcs, johns)
|
||||||
|
|
||||||
|
guest = self.add_member(users, schema.User(name="guest"))
|
||||||
|
|
||||||
# create the admin user and admins group
|
# create the admin user and admins group
|
||||||
admin = self.add_member(users, schema.User(name=self.config.ADMIN_USERNAME, email=self.config.ADMIN_EMAIL))
|
admin = self.add_member(
|
||||||
|
users, schema.User(name=self.config.ADMIN_USERNAME, password="fnord", email=self.config.ADMIN_EMAIL)
|
||||||
|
)
|
||||||
admins = self.db.save(schema.Group(name="administrators"))
|
admins = self.db.save(schema.Group(name="administrators"))
|
||||||
admins = self.add_member(groups, admins)
|
admins = self.add_member(groups, admins)
|
||||||
self.add_member(admins, admin)
|
admin = self.add_member(admins, admin)
|
||||||
|
|
||||||
|
groups.set_permissions(admins, permissions=[
|
||||||
|
schema.Permissions.READ,
|
||||||
|
schema.Permissions.WRITE,
|
||||||
|
schema.Permissions.DELETE
|
||||||
|
], db=self.db)
|
||||||
|
|
||||||
|
users.set_permissions(admins, permissions=[
|
||||||
|
schema.Permissions.READ,
|
||||||
|
schema.Permissions.WRITE,
|
||||||
|
schema.Permissions.DELETE
|
||||||
|
], db=self.db)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
sys.modules[__name__] = ApplicationContext()
|
sys.modules[__name__] = ApplicationContext()
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from functools import cached_property
|
from functools import cached_property
|
||||||
|
|
||||||
|
from flask import g
|
||||||
from grung.types import BackReference, Collection, Pointer, Record
|
from grung.types import BackReference, Collection, Pointer, Record
|
||||||
|
|
||||||
from ttfrog import schema
|
from ttfrog import schema
|
||||||
|
@ -13,6 +14,7 @@ class Form:
|
||||||
"""
|
"""
|
||||||
The base Form controller for the web UI.
|
The base Form controller for the web UI.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
record: Record
|
record: Record
|
||||||
data: field(default_factory=dict)
|
data: field(default_factory=dict)
|
||||||
|
|
||||||
|
@ -24,12 +26,12 @@ class Form:
|
||||||
|
|
||||||
def prepare(self):
|
def prepare(self):
|
||||||
for key, value in self.data.items():
|
for key, value in self.data.items():
|
||||||
|
|
||||||
# filter out fields that cannot be set by the user
|
# filter out fields that cannot be set by the user
|
||||||
if key in self.read_only:
|
if key in self.read_only:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
self.record[key] = value
|
self.record[key] = value
|
||||||
|
self.record.author = g.user
|
||||||
return self.record
|
return self.record
|
||||||
|
|
||||||
|
|
||||||
|
@ -38,6 +40,7 @@ class Page(Form):
|
||||||
"""
|
"""
|
||||||
A form for creating and updating Page records.
|
A form for creating and updating Page records.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
record: schema.Page
|
record: schema.Page
|
||||||
|
|
||||||
@cached_property
|
@cached_property
|
||||||
|
@ -50,6 +53,7 @@ class NPC(Page):
|
||||||
"""
|
"""
|
||||||
A form for creating and updating Page records.
|
A form for creating and updating Page records.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
record: schema.NPC
|
record: schema.NPC
|
||||||
|
|
||||||
|
|
||||||
|
@ -58,6 +62,7 @@ class User(Page):
|
||||||
"""
|
"""
|
||||||
A form for creating and updating Page records.
|
A form for creating and updating Page records.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
record: schema.NPC
|
record: schema.NPC
|
||||||
|
|
||||||
|
|
||||||
|
@ -66,4 +71,5 @@ class Group(Page):
|
||||||
"""
|
"""
|
||||||
A form for creating and updating Page records.
|
A form for creating and updating Page records.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
record: schema.NPC
|
record: schema.NPC
|
||||||
|
|
|
@ -1,5 +1,10 @@
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
from grung.types import BackReference, Collection, Field, Record, Pointer
|
from datetime import datetime
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
from grung.types import BackReference, Collection, DateTime, Field, Password, Pointer, Record, Timestamp
|
||||||
|
from tinydb import Query
|
||||||
|
|
||||||
|
|
||||||
class Page(Record):
|
class Page(Record):
|
||||||
|
@ -18,7 +23,9 @@ class Page(Record):
|
||||||
Collection("members", Page), # The pages that exist below this page's URI
|
Collection("members", Page), # The pages that exist below this page's URI
|
||||||
BackReference("parent", value_type=Page), # The page that exists above this page's URI
|
BackReference("parent", value_type=Page), # The page that exists above this page's URI
|
||||||
Pointer("author", value_type=User), # The last user to touch the page.
|
Pointer("author", value_type=User), # The last user to touch the page.
|
||||||
# DateTime("last_modified"), # The last time the page was modified.
|
DateTime("created"), # When the page was created
|
||||||
|
Timestamp("last_modified"), # The last time the page was modified.
|
||||||
|
Collection("acl", Permissions),
|
||||||
]
|
]
|
||||||
|
|
||||||
def before_insert(self, db):
|
def before_insert(self, db):
|
||||||
|
@ -29,6 +36,10 @@ class Page(Record):
|
||||||
"""
|
"""
|
||||||
super().before_insert(db)
|
super().before_insert(db)
|
||||||
|
|
||||||
|
now = datetime.utcnow()
|
||||||
|
if not self.doc_id and self.created < now:
|
||||||
|
self.created = now
|
||||||
|
|
||||||
if not self.name and not self.title:
|
if not self.name and not self.title:
|
||||||
raise Exception("Must provide either a name or a title!")
|
raise Exception("Must provide either a name or a title!")
|
||||||
if not self.name:
|
if not self.name:
|
||||||
|
@ -44,7 +55,7 @@ class Page(Record):
|
||||||
correct URI. This ensures that if a page is moved from one collection to another, the URI is updated.
|
correct URI. This ensures that if a page is moved from one collection to another, the URI is updated.
|
||||||
"""
|
"""
|
||||||
super().after_insert(db)
|
super().after_insert(db)
|
||||||
if not hasattr(self, 'members'):
|
if not hasattr(self, "members"):
|
||||||
return
|
return
|
||||||
for child in self.members:
|
for child in self.members:
|
||||||
obj = BackReference.dereference(child, db)
|
obj = BackReference.dereference(child, db)
|
||||||
|
@ -57,24 +68,62 @@ class Page(Record):
|
||||||
return page
|
return page
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def set_permissions(self, entity: Entity, permissions: List, db) -> Record:
|
||||||
|
perms = db.save(Permissions(entity=entity, grants="".join(permissions)))
|
||||||
|
self.acl = [entry for entry in self.acl if entry.entity != entity] + [perms]
|
||||||
|
self = db.save(self)
|
||||||
|
return perms
|
||||||
|
|
||||||
class User(Page):
|
|
||||||
|
class Entity(Page):
|
||||||
|
def has_permission(self, record: Record, requested: str, db) -> bool:
|
||||||
|
|
||||||
|
# if there's no ACL at all, the record is world-readable.
|
||||||
|
if not getattr(record, "acl", None):
|
||||||
|
return requested == Permissions.READ
|
||||||
|
|
||||||
|
# Use the grant specific to this entity, if there is one
|
||||||
|
for entry in record.acl:
|
||||||
|
if entry.entity.uid == self.uid:
|
||||||
|
return requested in entry.grants
|
||||||
|
|
||||||
|
for group in db.Group.search(Query()["members"].any([self.reference])):
|
||||||
|
if group.has_permission(record, requested, db):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def can_read(self, record: Record, db):
|
||||||
|
return self.has_permission(record, Permissions.READ, db)
|
||||||
|
|
||||||
|
def can_write(self, record: Record, db):
|
||||||
|
return self.has_permission(record, Permissions.WRITE, db)
|
||||||
|
|
||||||
|
def can_delete(self, record: Record, db):
|
||||||
|
return self.has_permission(record, Permissions.DELETE, db)
|
||||||
|
|
||||||
|
|
||||||
|
class User(Entity):
|
||||||
"""
|
"""
|
||||||
A website user, editable as a wiki page.
|
A website user, editable as a wiki page.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
def check_credentials(self, username: str, password: str) -> bool:
|
||||||
|
return username == self.name and self._metadata.fields["password"].compare(password, self.password)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def fields(cls):
|
def fields(cls):
|
||||||
return [
|
return [
|
||||||
field
|
field
|
||||||
for field in [
|
for field in [
|
||||||
*super().fields(),
|
*super().fields(),
|
||||||
Field("email", unique=True)
|
Field("email", unique=True),
|
||||||
] if field.name != "members"
|
Password("password"),
|
||||||
|
]
|
||||||
|
if field.name != "members"
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
class Group(Page):
|
class Group(Entity):
|
||||||
"""
|
"""
|
||||||
A set of users, editable as a wiki page.
|
A set of users, editable as a wiki page.
|
||||||
"""
|
"""
|
||||||
|
@ -84,3 +133,13 @@ class NPC(Page):
|
||||||
"""
|
"""
|
||||||
An NPC, editable as a wiki page.
|
An NPC, editable as a wiki page.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class Permissions(Record):
|
||||||
|
READ = "r"
|
||||||
|
WRITE = "w"
|
||||||
|
DELETE = "d"
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def fields(cls):
|
||||||
|
return [*super().fields(), Pointer("entity", Entity), Field("grants")]
|
||||||
|
|
|
@ -16,15 +16,22 @@
|
||||||
<span> / <a href="{{ app.config.VIEW_URI }}{{ uri }}">{{ name }}</a></span>
|
<span> / <a href="{{ app.config.VIEW_URI }}{{ uri }}">{{ name }}</a></span>
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
</nav>
|
</nav>
|
||||||
|
{% if session['user_id'] == 1 %}
|
||||||
|
Welcome, {{ user['name'] }}. [ <a href="{{ url_for('login') }}">LOGIN</a> ]
|
||||||
|
{% else %}
|
||||||
|
Welcome, <a href="{{ url_for('User/' + user['name']) }}">{{ user['name'] }}</a>.
|
||||||
|
{% endif %}
|
||||||
<nav>
|
<nav>
|
||||||
Menu:
|
Menu:
|
||||||
<ul>
|
<ul>
|
||||||
{% block menu %}{% endblock %}
|
{% block menu %}{% endblock %}
|
||||||
</ul>
|
</ul>
|
||||||
</nav>
|
</nav>
|
||||||
|
Last Edited By: {{ page.author.name }}
|
||||||
|
<br>
|
||||||
|
|
||||||
<main>
|
<main>
|
||||||
{% for message in get_flashed_messages() %}
|
{% for message in g.messages %}
|
||||||
<div class="alert">
|
<div class="alert">
|
||||||
{{ message }}
|
{{ message }}
|
||||||
</div>
|
</div>
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
from flask import Response, render_template, request
|
from flask import Response, g, redirect, render_template, request, session, url_for
|
||||||
from tinydb import where
|
from tinydb import where
|
||||||
|
|
||||||
from ttfrog import app, schema, forms
|
from ttfrog import app, forms, schema
|
||||||
|
|
||||||
STATIC = ["static"]
|
STATIC = ["static"]
|
||||||
|
|
||||||
|
@ -16,11 +16,11 @@ def relative_uri(path: str = ""):
|
||||||
|
|
||||||
def get_parent(table: str, uri: str):
|
def get_parent(table: str, uri: str):
|
||||||
try:
|
try:
|
||||||
parent_uri = uri.strip("/").split("/", -1)[0]
|
parent_uri = uri.strip("/").rsplit("/", 1)[0]
|
||||||
except IndexError:
|
except IndexError:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
return get_page(parent_uri, table=table if '/' in parent_uri else 'Page', create_okay=False)
|
return get_page(parent_uri, table=table if "/" in parent_uri else "Page", create_okay=False)
|
||||||
|
|
||||||
|
|
||||||
def get_page(path: str = "", table: str = "Page", create_okay: bool = False):
|
def get_page(path: str = "", table: str = "Page", create_okay: bool = False):
|
||||||
|
@ -32,20 +32,28 @@ def get_page(path: str = "", table: str = "Page", create_okay: bool = False):
|
||||||
if table not in app.db.tables():
|
if table not in app.db.tables():
|
||||||
return None
|
return None
|
||||||
|
|
||||||
matches = app.db.table(table).search(where("uri") == uri, recurse=False)
|
page = app.db.table(table).get(where("uri") == uri, recurse=False)
|
||||||
if not matches:
|
if hasattr(page, "acl"):
|
||||||
|
acl = []
|
||||||
|
for pointer in page.acl:
|
||||||
|
table, uid = pointer.split("::")
|
||||||
|
acl += app.db.table(table).search(where("uid") == uid, recurse=False)
|
||||||
|
page.acl = acl
|
||||||
|
|
||||||
|
if not page:
|
||||||
if not create_okay:
|
if not create_okay:
|
||||||
return None
|
return None
|
||||||
return getattr(schema, table)(
|
|
||||||
name=uri.split("/")[-1],
|
|
||||||
body="This page does not exist",
|
|
||||||
parent = get_parent(table, uri)
|
parent = get_parent(table, uri)
|
||||||
)
|
if not g.user.can_read(parent, app.db):
|
||||||
page = matches[0]
|
return None
|
||||||
|
return getattr(schema, table)(name=uri.split("/")[-1], body="This page does not exist", parent=parent)
|
||||||
|
|
||||||
if hasattr(page, 'members'):
|
if not g.user.can_read(page, app.db):
|
||||||
|
return None
|
||||||
|
|
||||||
|
if hasattr(page, "members"):
|
||||||
subpages = []
|
subpages = []
|
||||||
for pointer in matches[0].members:
|
for pointer in page.members:
|
||||||
table, uid = pointer.split("::")
|
table, uid = pointer.split("::")
|
||||||
subpages += app.db.table(table).search(where("uid") == uid, recurse=False)
|
subpages += app.db.table(table).search(where("uid") == uid, recurse=False)
|
||||||
page.members = subpages
|
page.members = subpages
|
||||||
|
@ -56,7 +64,7 @@ def get_page(path: str = "", table: str = "Page", create_okay: bool = False):
|
||||||
def rendered(page: schema.Record, template: str = "page.html"):
|
def rendered(page: schema.Record, template: str = "page.html"):
|
||||||
if not page:
|
if not page:
|
||||||
return Response("Page not found", status=404)
|
return Response("Page not found", status=404)
|
||||||
return render_template(template, page=page, app=app, breadcrumbs=breadcrumbs())
|
return render_template(template, page=page, app=app, breadcrumbs=breadcrumbs(), user=session['user'], g=g)
|
||||||
|
|
||||||
|
|
||||||
def get_static(path):
|
def get_static(path):
|
||||||
|
@ -81,19 +89,62 @@ def index():
|
||||||
return rendered(get_page(create_okay=False))
|
return rendered(get_page(create_okay=False))
|
||||||
|
|
||||||
|
|
||||||
|
def do_login(username: str, password: str) -> bool:
|
||||||
|
"""
|
||||||
|
Update the session with the user record if the credenteials are valid.
|
||||||
|
"""
|
||||||
|
if not (username and password):
|
||||||
|
return False
|
||||||
|
|
||||||
|
user = app.db.User.get(where("name") == "username")
|
||||||
|
if not user:
|
||||||
|
return False
|
||||||
|
|
||||||
|
if not user.check_credentials(username, password):
|
||||||
|
return False
|
||||||
|
|
||||||
|
app.web.logger.debug(f"Session for {user.name} ({user.doc_id}) started.")
|
||||||
|
g.user = user
|
||||||
|
session["user_id"] = g.user.doc_id
|
||||||
|
session["user"] = dict(g.user.serialize())
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
@app.web.route("/login", methods=["GET", "POST"])
|
||||||
|
def login():
|
||||||
|
app.web.session_interface.regenerate(session)
|
||||||
|
|
||||||
|
if request.method == "POST":
|
||||||
|
username = request.form.get("username")
|
||||||
|
password = request.form.get("password")
|
||||||
|
if do_login(username, password):
|
||||||
|
return redirect(url_for("index"))
|
||||||
|
g.messages.append(f"Invalid login for {username}")
|
||||||
|
return rendered(schema.Page(name="Login", title="Please enter your login details"), "login.html")
|
||||||
|
|
||||||
|
|
||||||
|
@app.web.route("/logout")
|
||||||
|
def logout():
|
||||||
|
if "user_id" in session:
|
||||||
|
del session["user_id"]
|
||||||
|
del g.user
|
||||||
|
|
||||||
|
|
||||||
@app.web.route(f"{app.config.VIEW_URI}/<path:table>/<path:path>", methods=["GET"])
|
@app.web.route(f"{app.config.VIEW_URI}/<path:table>/<path:path>", methods=["GET"])
|
||||||
@app.web.route(f"{app.config.VIEW_URI}/<path:path>", methods=["GET"], defaults={'table': 'Page'})
|
@app.web.route(f"{app.config.VIEW_URI}/<path:path>", methods=["GET"], defaults={"table": "Page"})
|
||||||
def view(table, path):
|
def view(table, path):
|
||||||
return rendered(get_page(request.path, table=table, create_okay=True))
|
parent = get_parent(table, relative_uri())
|
||||||
|
print(parent)
|
||||||
|
return rendered(get_page(request.path, table=table, create_okay=(parent and parent.doc_id is not None)))
|
||||||
|
|
||||||
|
|
||||||
@app.web.route(f"{app.config.VIEW_URI}/<path:table>/<path:path>", methods=["POST"])
|
@app.web.route(f"{app.config.VIEW_URI}/<path:table>/<path:path>", methods=["POST"])
|
||||||
@app.web.route(f"{app.config.VIEW_URI}/<path:path>", methods=["POST"], defaults={'table': 'Page'})
|
@app.web.route(f"{app.config.VIEW_URI}/<path:path>", methods=["POST"], defaults={"table": "Page"})
|
||||||
def edit(table, path):
|
def edit(table, path):
|
||||||
uri = relative_uri()
|
uri = relative_uri()
|
||||||
parent = get_parent(table, uri)
|
parent = get_parent(table, uri)
|
||||||
if not parent:
|
if not parent:
|
||||||
return Response(f"Parent for {uri} does not exist.", status=403)
|
return Response("You cannot create a page at this location.", status=403)
|
||||||
|
|
||||||
# get or create the docoument at this uri
|
# get or create the docoument at this uri
|
||||||
page = get_page(uri, table=table, create_okay=True)
|
page = get_page(uri, table=table, create_okay=True)
|
||||||
|
@ -109,6 +160,15 @@ def edit(table, path):
|
||||||
return rendered(app.add_member(parent, save_data))
|
return rendered(app.add_member(parent, save_data))
|
||||||
|
|
||||||
|
|
||||||
|
@app.web.before_request
|
||||||
|
def before_request():
|
||||||
|
g.messages = []
|
||||||
|
user_id = session.get("user_id", 1)
|
||||||
|
g.user = app.db.User.get(doc_id=user_id)
|
||||||
|
session["user_id"] = user_id
|
||||||
|
session["user"] = dict(g.user.serialize())
|
||||||
|
|
||||||
|
|
||||||
@app.web.after_request
|
@app.web.after_request
|
||||||
def add_header(r):
|
def add_header(r):
|
||||||
r.headers["Cache-Control"] = "no-cache, no-store, must-revalidate, public, max-age=0"
|
r.headers["Cache-Control"] = "no-cache, no-store, must-revalidate, public, max-age=0"
|
||||||
|
|
|
@ -2,18 +2,21 @@ import pytest
|
||||||
|
|
||||||
import ttfrog.app
|
import ttfrog.app
|
||||||
from ttfrog import schema
|
from ttfrog import schema
|
||||||
|
from grung.db import GrungDB
|
||||||
|
from tinydb.storages import MemoryStorage
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def app():
|
def app():
|
||||||
|
fixture_db = GrungDB.with_schema(schema, storage=MemoryStorage)
|
||||||
ttfrog.app.load_config(defaults=None, IN_MEMORY_DB=1)
|
ttfrog.app.load_config(defaults=None, IN_MEMORY_DB=1)
|
||||||
ttfrog.app.initialize()
|
ttfrog.app.initialize(db=fixture_db, force=True)
|
||||||
yield ttfrog.app
|
yield ttfrog.app
|
||||||
ttfrog.app.db.close()
|
ttfrog.app.db.truncate()
|
||||||
|
|
||||||
|
|
||||||
def test_create(app):
|
def test_create(app):
|
||||||
user = schema.User(name="john", email="john@foo")
|
user = schema.User(name="john", email="john@foo", password="powerfulCat")
|
||||||
assert user.uid
|
assert user.uid
|
||||||
assert user._metadata.fields["uid"].unique
|
assert user._metadata.fields["uid"].unique
|
||||||
|
|
||||||
|
@ -34,7 +37,29 @@ def test_create(app):
|
||||||
assert after_update == john_something
|
assert after_update == john_something
|
||||||
assert before_update != after_update
|
assert before_update != after_update
|
||||||
|
|
||||||
players = schema.Group(name="players", users=[john_something])
|
|
||||||
players = app.db.save(players)
|
def test_permissions(app):
|
||||||
players.users[0]["name"] = "fnord"
|
john = app.db.save(schema.User(name="john", email="john@foo", password="powerfulCat"))
|
||||||
app.db.save(players)
|
players = app.db.save(schema.Group(name="players", members=[john]))
|
||||||
|
notes = app.db.save(schema.Page(name="notes"))
|
||||||
|
|
||||||
|
# default read-only
|
||||||
|
assert players.can_read(notes, app.db)
|
||||||
|
assert not players.can_write(notes, app.db)
|
||||||
|
assert not players.can_delete(notes, app.db)
|
||||||
|
|
||||||
|
# set to rw, no delete
|
||||||
|
notes.set_permissions(players, [schema.Permissions.READ, schema.Permissions.WRITE], app.db)
|
||||||
|
assert players.can_read(notes, app.db)
|
||||||
|
assert players.can_write(notes, app.db)
|
||||||
|
assert not players.can_delete(notes, app.db)
|
||||||
|
|
||||||
|
# members of the group inherit group permissions
|
||||||
|
assert john.can_read(notes, app.db)
|
||||||
|
assert john.can_write(notes, app.db)
|
||||||
|
assert not john.can_delete(notes, app.db)
|
||||||
|
|
||||||
|
# permissions are the union of user + group permissions
|
||||||
|
notes.set_permissions(john, [schema.Permissions.DELETE], app.db)
|
||||||
|
assert not players.can_delete(notes, app.db)
|
||||||
|
assert john.can_delete(notes, app.db)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user