# 255 lines · 7.4 KiB · Python
import json
|
|
from hashlib import scrypt
|
|
from os import urandom
|
|
from hmac import compare_digest
|
|
|
|
from flask import Flask, render_template, request, redirect, url_for, session, abort
|
|
from flask_pymongo import PyMongo
|
|
from bson import ObjectId
|
|
import bson.json_util as bson
|
|
|
|
# Cost parameters passed to hashlib.scrypt when a login password is hashed.
SCRYPT_PARAMS = {"n": 16384, "r": 8, "p": 1}

app = Flask(__name__)

# Settings come from config.json in the working directory; the keys read
# below are "db", "fsk" and "key".
with open("./config.json", encoding="utf-8") as file:
    config = json.load(file)

app.config.update(
    MONGO_URI=f"mongodb://{config['db']}/akasha",
    SECRET_KEY=config["fsk"],
)

mongo = PyMongo(
    app,
    username="index",
    password=config["key"],
    authSource="akasha",
    authMechanism="SCRAM-SHA-256",
)

# The parsed settings are removed from the module namespace once consumed.
del config

if mongo.db is None:
    raise Exception("Unable to connect to database.")

# Shorthand handle used by every view below.
db = mongo.db
|
|
|
|
|
|
def partition(predicate, iterable):
    """Split *iterable* into two lists according to *predicate*.

    Returns a ``(falses, trues)`` tuple. Note the order: items for which
    the predicate is falsy come first. The relative order of items is
    preserved and the predicate is called exactly once per item.
    """
    buckets = ([], [])
    for element in iterable:
        # Index 0 collects rejected items, index 1 the accepted ones.
        buckets[1 if predicate(element) else 0].append(element)
    rejected, accepted = buckets
    return rejected, accepted
|
|
|
|
|
|
@app.context_processor
def inject_data():
    """Expose the current domain's ``quad`` value to templates as ``hue``.

    The domain record is looked up by ``request.endpoint``. When no record
    matches (``find_one`` returns ``None``), nothing is injected instead of
    raising ``TypeError`` while a template is being rendered.
    """
    ddata = db.domains.find_one({"id": request.endpoint})
    if ddata is None:
        return {}
    return {"hue": ddata["quad"]}
|
|
|
|
|
|
@app.before_request
def apply_checks():
    """Gate every non-static request on the domain/user access rules.

    Requests for the ``static`` endpoint pass through untouched. Otherwise
    the domain record whose ``id`` equals the endpoint name must exist (404
    if not), and ``can_access`` must accept it for the session's user (401 if
    not). A session username with no matching user record is treated the same
    as no username at all.
    """
    # Compare by value: `is` on a string literal tests object identity and
    # only works when the interpreter happens to intern both strings.
    if request.endpoint == "static":
        return None

    ddata = db.domains.find_one_or_404({"id": request.endpoint})

    username = session.get("username")
    udata = db.users.find_one({"id": username}) if username else None

    if not can_access(ddata, udata):
        abort(401)
    return None
|
|
|
|
|
|
def can_access(ddata, udata) -> bool:
    """Decide whether a user record may open a domain record.

    With a truthy *udata*: the domain's ``quad`` must be contained in the
    user's ``quad``, and the domain must either be ``public`` or have its
    ``id`` listed in the user's ``perm``.

    With a falsy *udata* (anonymous): only ``public`` domains whose ``quad``
    is ``"ade"`` are allowed.
    """
    if not udata:
        # Anonymous visitors: public "ade" domains only.
        return ddata["quad"] == "ade" and bool(ddata["public"])

    if ddata["quad"] not in udata["quad"]:
        return False

    # Private domains additionally need an explicit per-user permission.
    return bool(ddata["public"]) or ddata["id"] in udata["perm"]
|
|
|
|
|
|
@app.route("/")
def index():
    """Redirect the site root to the ``index_ade`` page.

    The previous ``db.welcomes.find_one`` call was dropped: its result was
    never used, so it only added a database round trip to every hit on ``/``.
    """
    return redirect(url_for("index_ade"))
|
|
|
|
|
|
@app.route("/ade")
def index_ade():
    """Serve ``/ade`` by delegating to ``render_main``."""
    return render_main()
|
|
|
|
|
|
@app.route("/bea")
def index_bea():
    """Serve ``/bea`` by delegating to ``render_main``."""
    return render_main()
|
|
|
|
|
|
@app.route("/cam")
def index_cam():
    """Serve ``/cam`` by delegating to ``render_main``."""
    return render_main()
|
|
|
|
|
|
@app.route("/des")
def index_des():
    """Serve ``/des`` by delegating to ``render_main``."""
    return render_main()
|
|
|
|
|
|
def render_main():
    """Render ``index.html`` for the current endpoint.

    Records the endpoint in ``session["main"]`` (``login`` redirects back to
    it), loads the front-page record for the endpoint, and builds a
    ``{quad: {cat: {name: id}}}`` mapping of every categorised domain that
    ``can_access`` allows for the session's user.
    """
    session["main"] = request.endpoint

    fpdata = db.frontpage.find_one({"id": request.endpoint})

    # `.get` rather than indexing: a visitor who has never posted to /login
    # has no "username" key in the session at all.
    username = session.get("username")
    udata = db.users.find_one({"id": username}) if username else None

    results = {"ade": {}, "bea": {}, "cam": {}, "des": {}}
    # Only domains that carry a category are listed.
    for ddata in db.domains.find({"cat": {"$ne": None}}):
        if not can_access(ddata, udata):
            continue
        by_category = results[ddata["quad"]]
        by_category.setdefault(ddata["cat"], {})[ddata["name"]] = ddata["id"]

    return render_template(
        "index.html",
        fpdata=fpdata,
        domains=results,
        # The endpoint names end in the three-letter quad code.
        quad=(request.endpoint or "ade")[-3:],
    )
|
|
|
|
|
|
@app.post("/login")
def login():
    """Log a user in or out, then redirect to the last main page.

    With a non-empty ``username`` form field, the submitted password is
    hashed with scrypt using the user's stored salt and compared in constant
    time against the stored hash; an unknown user or a mismatch aborts with
    401. With an empty or missing ``username``, the session's username is
    cleared (set to ``None``).

    The redirect target is the endpoint stored in ``session["main"]``,
    defaulting to ``index_ade``.
    """
    username = request.form.get("username")

    if username:
        password = request.form.get("password", "")
        udata = db.users.find_one({"id": username})
        if not udata:
            # abort() raises, so nothing after it would ever run.
            abort(401)

        stored_hash = udata["hash"]
        salt = udata["salt"]
        computed = scrypt(password.encode("utf-8"), salt=salt, **SCRYPT_PARAMS)
        if not compare_digest(stored_hash, computed):
            abort(401)

        session["username"] = username
    else:
        session["username"] = None

    return redirect(url_for(session.get("main", "index_ade")))
|
|
|
|
|
|
@app.route("/search")
def search():
    """Print the submitted search type and query, then redirect to ``index``.

    No search is performed here; the two query-string values are only
    written to stdout.
    """
    params = request.args
    print(params.get("search-type"), params.get("search"))
    return redirect(url_for("index"))
|
|
|
|
|
|
@app.route("/database", methods=["GET", "POST"])
def database():
    """List collections and, on POST, run a filter against one of them.

    POST form fields:
        collection: name of the collection to query (400 if missing).
        query: JSON filter; blank or missing means "match everything".
            Malformed JSON, or JSON that is neither an object nor ``null``,
            aborts with 400.

    Only the ``_id`` and ``id`` fields of matching documents are fetched.
    """
    cnames = db.list_collection_names()
    results = []
    collection = None

    if request.method == "POST":
        collection = request.form.get("collection")
        if not collection:
            abort(400)

        raw_query = request.form.get("query")
        if not raw_query or not raw_query.strip():
            raw_query = "{}"

        try:
            query = json.loads(raw_query)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            abort(400)

        if query is None:
            # JSON `null` was previously forwarded to find() as "no filter".
            query = {}
        if not isinstance(query, dict):
            # A list/number/string filter would make find() raise a 500.
            abort(400)

        # NOTE(review): the filter is user-supplied and passed to find()
        # verbatim, so any query operator in it is honoured. Confirm that
        # access to this endpoint is restricted accordingly.
        results = db[collection].find(query, {"_id": 1, "id": 1})

    return render_template(
        "database.html", cnames=cnames, collection=collection, results=results
    )
|
|
|
|
|
|
@app.route("/database/<collection>/<oid>", methods=["GET", "POST"])
def database_edit(collection, oid):
    """Show, replace or delete a single document addressed by ObjectId.

    GET renders ``database_edit.html`` with the document serialised through
    ``bson.json_util``. POST reads the ``document`` form field: after any
    ``_id`` key is stripped, an empty document deletes the stored one and a
    non-empty document replaces it; the client is then redirected to
    ``database``.

    Aborts with 404 when ``oid`` is not a valid ObjectId or no document
    matches, 400 when the submitted document is missing, unparsable or not a
    JSON object, and 500 when the database operation fails.

    The former insert branch was removed: the route always supplies a
    non-empty ``oid``, so that branch could never run.
    """
    # Imported locally because the module-level name `bson` is bound to
    # `bson.json_util`, which hides the package's `errors` submodule.
    from bson.errors import BSONError

    # Validate once up front so a malformed id is a 404, not an unhandled 500.
    if not ObjectId.is_valid(oid):
        abort(404)
    object_id = ObjectId(oid)

    if request.method == "POST":
        raw_document = request.form.get("document")
        if not raw_document:
            abort(400)

        try:
            document = bson.loads(raw_document)
        except (ValueError, TypeError, BSONError):
            abort(400)

        # Falsy values keep their existing meaning ("delete"); anything else
        # must be an object to be usable as a replacement document.
        if document and not isinstance(document, dict):
            abort(400)

        if document and "_id" in document:
            del document["_id"]

        try:
            if not document:
                db[collection].delete_one({"_id": object_id})
                print("DELETED")
            else:
                db[collection].replace_one({"_id": object_id}, document)
                print("REPLACED")
        except Exception:
            # Request boundary: record the failure instead of hiding it.
            app.logger.exception(
                "database_edit failed for %s/%s", collection, oid
            )
            abort(500)

        return redirect(url_for("database"))

    result = db[collection].find_one_or_404({"_id": object_id})
    # Fall back to the ObjectId string for documents without an "id" field.
    name = result.get("id", oid)
    document = bson.dumps(result, indent=4)

    return render_template(
        "database_edit.html",
        collection=collection,
        oid=oid,
        name=name,
        document=document,
    )
|
|
|
|
|
|
@app.route("/tasks", methods=["GET", "POST"])
|
|
def tasks():
|
|
print(request.form, request.method)
|
|
if request.method == "POST":
|
|
act = request.form.get("action", "save")
|
|
match act:
|
|
case "save":
|
|
oid = request.form.get("oid")
|
|
if oid:
|
|
db.tasks.update_one(
|
|
{"_id": ObjectId(oid)},
|
|
{
|
|
"$set": {
|
|
"id": request.form.get("tid") or "000",
|
|
"txt": request.form.get("content", ""),
|
|
"pre": request.form.get("pre", "").split(),
|
|
}
|
|
},
|
|
)
|
|
else:
|
|
db.tasks.insert_one(
|
|
{
|
|
"id": request.form.get("tid") or "000",
|
|
"txt": request.form.get("content", ""),
|
|
"pre": request.form.get("pre", "").split(),
|
|
"done": False,
|
|
}
|
|
)
|
|
case "delete":
|
|
oid = request.form.get("oid")
|
|
db.tasks.delete_one({"_id": ObjectId(oid)})
|
|
case "mark":
|
|
oid = request.form.get("oid")
|
|
db.tasks.update_one(
|
|
{"_id": ObjectId(oid)},
|
|
{"$set": {"done": request.form.get("done") == "False"}},
|
|
)
|
|
tasks = sorted(db.tasks.find(), key=lambda task: task["id"])
|
|
other_tasks, complete_tasks = partition(lambda task: task["done"], tasks)
|
|
active_tasks, blocked_tasks = partition(
|
|
lambda task: any(
|
|
any(not ot["done"] for ot in tasks if ot["id"] == pre)
|
|
for pre in task["pre"]
|
|
),
|
|
other_tasks,
|
|
)
|
|
return render_template(
|
|
"tasks.html",
|
|
active_tasks=active_tasks,
|
|
blocked_tasks=blocked_tasks,
|
|
complete_tasks=complete_tasks,
|
|
)
|