"""Flask front end backed by the ``akasha`` MongoDB database.

The module builds the Flask ``app`` at import time: it reads
``./config.json``, connects to MongoDB through Flask-PyMongo, and registers
the access-control hook plus the front-page, login, search, database-admin
and task-list views.
"""

import json
from hashlib import scrypt
from hmac import compare_digest
from os import urandom

from bson import ObjectId
from bson.errors import BSONError, InvalidId
# NOTE: from here on the name ``bson`` refers to ``bson.json_util``.
import bson.json_util as bson
from flask import (
    Flask,
    abort,
    redirect,
    render_template,
    request,
    session,
    url_for,
)
from flask_pymongo import PyMongo

# Cost parameters handed to hashlib.scrypt when checking a login password.
SCRYPT_PARAMS = {"n": 16384, "r": 8, "p": 1}

app = Flask(__name__)

with open("./config.json", "r", encoding="utf-8") as file:
    config = json.load(file)
app.config["MONGO_URI"] = f"mongodb://{config['db']}/akasha"
app.config["SECRET_KEY"] = config["fsk"]
mongo = PyMongo(
    app,
    username="index",
    password=config["key"],
    authSource="akasha",
    authMechanism="SCRAM-SHA-256",
)
# Drop the parsed config (it holds the DB password and the session key) from
# the module namespace once it has been consumed.
del config

if mongo.db is None:
    raise RuntimeError("Unable to connect to database.")
db = mongo.db


def partition(predicate, iterable):
    """Split *iterable* into two lists according to *predicate*.

    Returns:
        A ``(falses, trues)`` tuple. Note the order: the items for which
        ``predicate(item)`` is falsy come FIRST, the truthy ones second.
    """
    trues = []
    falses = []
    for item in iterable:
        (trues if predicate(item) else falses).append(item)
    return falses, trues


def _object_id_or_400(oid):
    """Return ``ObjectId(oid)``, aborting with HTTP 400 on a bad id.

    Empty values are rejected explicitly because ``ObjectId(None)`` does not
    fail -- it generates a brand-new id, which would silently match nothing.
    """
    if not oid:
        abort(400)
    try:
        return ObjectId(oid)
    except (InvalidId, TypeError):
        abort(400)


@app.context_processor
def inject_data():
    """Expose the ``quad`` field of the current endpoint's domain as ``hue``.

    Returns an empty context when no ``domains`` document matches the current
    endpoint, instead of failing while a template is being rendered.
    """
    ddata = db.domains.find_one({"id": request.endpoint})
    if ddata is None:
        return {}
    return {"hue": ddata["quad"]}


@app.before_request
def apply_checks():
    """Reject requests the current session may not make.

    Static files are always served. For every other endpoint the matching
    ``domains`` document must exist (otherwise 404) and ``can_access`` must
    allow it for the session's user (otherwise 401).
    """
    # Compare by value: ``is`` on a string literal only works by accident of
    # interning and triggers a SyntaxWarning.
    if request.endpoint == "static":
        return None
    username = session.get("username")
    ddata = db.domains.find_one_or_404({"id": request.endpoint})
    udata = db.users.find_one({"id": username}) if username else None
    if not can_access(ddata, udata):
        abort(401)
    return None


def can_access(ddata, udata) -> bool:
    """Decide whether a user may access a domain.

    Args:
        ddata: A ``domains`` document; its ``quad``, ``public`` and ``id``
            fields are read.
        udata: A ``users`` document (``quad`` and ``perm`` fields are read),
            or a falsy value for an anonymous visitor.

    Returns:
        For a known user: True when the domain's ``quad`` is contained in the
        user's ``quad`` and the domain is either public or listed in the
        user's ``perm``. For an anonymous visitor: True only for public
        domains whose ``quad`` is ``"ade"``.
    """
    if udata:
        if ddata["quad"] not in udata["quad"]:
            return False
        if (not ddata["public"]) and ddata["id"] not in udata["perm"]:
            return False
        return True
    if ddata["quad"] != "ade":
        return False
    if not ddata["public"]:
        return False
    return True


@app.route("/")
def index():
    """Redirect the site root to the ``index_ade`` front page."""
    return redirect(url_for("index_ade"))


@app.route("/ade")
def index_ade():
    """Render the front page for the ``ade`` endpoint."""
    return render_main()


@app.route("/bea")
def index_bea():
    """Render the front page for the ``bea`` endpoint."""
    return render_main()


@app.route("/cam")
def index_cam():
    """Render the front page for the ``cam`` endpoint."""
    return render_main()


@app.route("/des")
def index_des():
    """Render the front page for the ``des`` endpoint."""
    return render_main()


def render_main():
    """Render ``index.html`` for the current front-page endpoint.

    Remembers the endpoint in ``session["main"]`` (``login`` redirects back
    to it) and builds a ``quad -> cat -> name -> id`` mapping of every
    categorised domain the current user may access.
    """
    session["main"] = request.endpoint
    fpdata = db.frontpage.find_one({"id": request.endpoint})

    # ``.get``: a visitor who has never posted to /login has no "username"
    # key in the session at all.
    username = session.get("username")
    udata = db.users.find_one({"id": username}) if username else None

    results = {"ade": {}, "bea": {}, "cam": {}, "des": {}}
    for ddata in db.domains.find({"cat": {"$ne": None}}):
        if not can_access(ddata, udata):
            continue
        by_category = results[ddata["quad"]]
        by_category.setdefault(ddata["cat"], {})[ddata["name"]] = ddata["id"]

    return render_template(
        "index.html",
        fpdata=fpdata,
        domains=results,
        # The last three characters of the endpoint name, e.g. "index_bea"
        # -> "bea".
        quad=(request.endpoint or "ade")[-3:],
    )


@app.post("/login")
def login():
    """Log a user in or, when no username is posted, log the session out.

    With a username, the posted password is hashed with scrypt using the
    user's stored salt and compared in constant time against the stored
    hash; an unknown user or a mismatch aborts with 401. Without a username,
    ``session["username"]`` is reset to None. Either way the client is
    redirected to the last front page visited (default ``index_ade``).
    """
    username = request.form.get("username")
    if username:
        password = request.form.get("password", "")
        udata = db.users.find_one({"id": username})
        if not udata:
            abort(401)
        stored_hash = udata["hash"]
        salt = udata["salt"]
        computed = scrypt(password.encode("utf-8"), salt=salt, **SCRYPT_PARAMS)
        if not compare_digest(stored_hash, computed):
            abort(401)
        session["username"] = username
    else:
        session["username"] = None
    return redirect(url_for(session.get("main", "index_ade")))


@app.route("/search")
def search():
    """Placeholder search view: logs the query and redirects to the root."""
    stype = request.args.get("search-type")
    query = request.args.get("search")
    app.logger.debug("search: type=%r query=%r", stype, query)
    return redirect(url_for("index"))


@app.route("/database", methods=["GET", "POST"])
def database():
    """List collections and, on POST, run a JSON filter against one of them.

    The form supplies ``collection`` (required) and ``query`` (a JSON object
    used as the ``find`` filter; blank means ``{}``). A missing collection,
    unparsable JSON, or JSON that is not an object aborts with 400. Only the
    ``_id`` and ``id`` fields of the matches are fetched.
    """
    cnames = db.list_collection_names()
    results = []
    collection = None
    if request.method == "POST":
        collection = request.form.get("collection")
        if not collection:
            abort(400)
        raw_query = request.form.get("query")
        if not raw_query or not raw_query.strip():
            raw_query = "{}"
        try:
            query = json.loads(raw_query)
        except ValueError:  # json.JSONDecodeError is a ValueError
            abort(400)
        # ``find`` needs a mapping; a JSON list/number/string is a client
        # error, not a server fault.
        if not isinstance(query, dict):
            abort(400)
        results = db[collection].find(query, {"_id": 1, "id": 1})
    return render_template(
        "database.html", cnames=cnames, collection=collection, results=results
    )


# NOTE(review): the rule previously read "/database//", which declares no URL
# variables even though the view requires ``collection`` and ``oid`` (every
# request raised TypeError). The placeholders appear to have been stripped;
# "<collection>/<oid>" is the presumed original -- confirm against the
# templates' url_for calls.
@app.route("/database/<collection>/<oid>", methods=["GET", "POST"])
def database_edit(collection, oid):
    """Show one document for editing, or apply a posted edit.

    GET renders ``database_edit.html`` with the document serialised by
    ``bson.json_util.dumps`` (404 if it does not exist).

    POST reads the ``document`` form field (400 if empty or unparsable),
    drops any ``_id`` key, then:

    * deletes the stored document when the payload is empty,
    * replaces the stored document when ``oid`` is given,
    * inserts the payload as a new document otherwise,

    and redirects to the ``database`` view. A malformed ``oid`` aborts with
    400; a failed write is logged and aborts with 500.
    """
    if request.method == "POST":
        raw_document = request.form.get("document")
        if not raw_document:
            abort(400)
        try:
            document = bson.loads(raw_document)
        except (ValueError, TypeError, BSONError):
            abort(400)
        # A non-empty payload must be a JSON object to be stored.
        if document and not isinstance(document, dict):
            abort(400)
        if document and "_id" in document:
            del document["_id"]

        # Resolve the id before the try block so that a 400 raised here is
        # not swallowed by the broad handler below.
        needs_id = bool(oid) or not document
        object_id = _object_id_or_400(oid) if needs_id else None
        try:
            if not document:
                db[collection].delete_one({"_id": object_id})
                app.logger.info("database_edit: DELETED %s/%s", collection, oid)
            elif object_id is not None:
                db[collection].replace_one({"_id": object_id}, document)
                app.logger.info("database_edit: REPLACED %s/%s", collection, oid)
            else:
                db[collection].insert_one(document)
                app.logger.info("database_edit: INSERTED into %s", collection)
        except Exception:
            app.logger.exception("database_edit: write to %r failed", collection)
            abort(500)
        return redirect(url_for("database"))

    result = db[collection].find_one_or_404({"_id": _object_id_or_400(oid)})
    # Not every document is guaranteed to carry an "id" field.
    name = result.get("id")
    document = bson.dumps(result, indent=4)
    return render_template(
        "database_edit.html",
        collection=collection,
        oid=oid,
        name=name,
        document=document,
    )


def _task_fields_from_form():
    """Build the editable task fields (``id``, ``txt``, ``pre``) from the form."""
    return {
        "id": request.form.get("tid") or "000",
        "txt": request.form.get("content", ""),
        # Whitespace-separated ids of prerequisite tasks.
        "pre": request.form.get("pre", "").split(),
    }


@app.route("/tasks", methods=["GET", "POST"])
def tasks():
    """Apply a posted task action, then render the task lists.

    POST ``action`` values (default ``"save"``):

    * ``save``   -- update the task named by ``oid``, or insert a new,
      not-yet-done task when no ``oid`` is given;
    * ``delete`` -- delete the task named by ``oid``;
    * ``mark``   -- set ``done`` to True when the posted ``done`` field is
      the string ``"False"``, otherwise to False.

    Unknown actions are ignored. A missing or malformed ``oid`` where one is
    required aborts with 400.

    The template receives tasks sorted by ``id`` and split into complete,
    blocked (not done, with at least one prerequisite id belonging to a task
    that is not done) and active (the remaining not-done tasks).
    """
    app.logger.debug("tasks: %s %s", request.method, request.form)
    if request.method == "POST":
        action = request.form.get("action", "save")
        oid = request.form.get("oid")
        if action == "save":
            fields = _task_fields_from_form()
            if oid:
                db.tasks.update_one(
                    {"_id": _object_id_or_400(oid)}, {"$set": fields}
                )
            else:
                db.tasks.insert_one({**fields, "done": False})
        elif action == "delete":
            db.tasks.delete_one({"_id": _object_id_or_400(oid)})
        elif action == "mark":
            db.tasks.update_one(
                {"_id": _object_id_or_400(oid)},
                {"$set": {"done": request.form.get("done") == "False"}},
            )

    all_tasks = sorted(db.tasks.find(), key=lambda task: task["id"])
    # partition returns (falses, trues): not-done tasks first.
    other_tasks, complete_tasks = partition(lambda task: task["done"], all_tasks)
    # Ids of every task that is not done; a prerequisite naming one of these
    # blocks the dependent task.
    unfinished_ids = {task["id"] for task in other_tasks}
    active_tasks, blocked_tasks = partition(
        lambda task: any(pre in unfinished_ids for pre in task["pre"]),
        other_tasks,
    )
    return render_template(
        "tasks.html",
        active_tasks=active_tasks,
        blocked_tasks=blocked_tasks,
        complete_tasks=complete_tasks,
    )