Files
index/app.py
2026-04-13 11:01:20 -05:00

193 lines
5.2 KiB
Python

import json
from hashlib import scrypt
from os import urandom
from hmac import compare_digest
from flask import Flask, render_template, request, redirect, url_for, session, abort
from flask_pymongo import PyMongo
from bson import ObjectId
import bson.json_util as bson
SCRYPT_PARAMS = {"n": 16384, "r": 8, "p": 1}
app = Flask(__name__)
with open("./config.json", "r", encoding="utf-8") as file:
config = json.load(file)
app.config["MONGO_URI"] = f"mongodb://{config['db']}/akasha"
app.config["SECRET_KEY"] = config["fsk"]
mongo = PyMongo(
app,
username="index",
password=config["key"],
authSource="akasha",
authMechanism="SCRAM-SHA-256",
)
del config
if mongo.db is None:
raise Exception("Unable to connect to database.")
db = mongo.db
@app.context_processor
def inject_data():
ddata = db.domains.find_one({"id": request.endpoint})
return dict(hue=ddata["quad"])
@app.before_request
def apply_checks():
username = session.get("username")
if request.endpoint is "static":
return
ddata = db.domains.find_one_or_404({"id": request.endpoint})
udata = db.users.find_one({"id": username}) if username else None
if not can_access(ddata, udata):
abort(401)
def can_access(ddata, udata) -> bool:
if udata:
if ddata["quad"] not in udata["quad"]:
return False
if (not ddata["public"]) and ddata["id"] not in udata["perm"]:
return False
else:
if ddata["quad"] != "ade":
return False
if not ddata["public"]:
return False
return True
@app.route("/")
def index():
db.welcomes.find_one({"id": "index_ade"})
return redirect(url_for("index_ade"))
@app.route("/ade")
def index_ade():
return render_main()
@app.route("/bea")
def index_bea():
return render_main()
@app.route("/cam")
def index_cam():
return render_main()
@app.route("/des")
def index_des():
return render_main()
def render_main():
session["main"] = request.endpoint
wdata = db.welcomes.find_one({"id": request.endpoint})
dsdata = db.domains.find({"cat": {"$ne": None}})
udata = (
db.users.find_one({"id": session["username"]}) if session["username"] else None
)
results = {"ade": {}, "bea": {}, "cam": {}, "des": {}}
for ddata in dsdata:
if not can_access(ddata, udata):
continue
if ddata["cat"] not in results[ddata["quad"]]:
results[ddata["quad"]][ddata["cat"]] = {}
results[ddata["quad"]][ddata["cat"]][ddata["name"]] = ddata["id"]
return render_template(
"index.html",
wdata=wdata,
domains=results,
quad=(request.endpoint or "ade")[-3:],
)
@app.post("/login")
def login():
username = request.form.get("username")
if username:
password = request.form.get("password", "")
udata = db.users.find_one({"id": username})
if not udata:
abort(401)
return
hash = udata["hash"]
salt = udata["salt"]
computed = scrypt(password.encode("utf-8"), salt=salt, **SCRYPT_PARAMS)
if not compare_digest(hash, computed):
abort(401)
return
session["username"] = username
else:
session["username"] = None
return redirect(url_for(session.get("main", "index_ade")))
@app.route("/search")
def search():
stype = request.args.get("search-type")
query = request.args.get("search")
print(stype, query)
return redirect(url_for("index"))
@app.route("/database", methods=["GET", "POST"])
def database():
cnames = db.list_collection_names()
results = []
collection = None
if request.method == "POST":
collection = request.form.get("collection")
if not collection:
abort(400)
query = request.form.get("query")
if not query or not query.strip():
query = "{}"
try:
query = json.loads(query)
except:
abort(400)
results = db[collection].find(query, {"_id": 1, "id": 1})
return render_template(
"database.html", cnames=cnames, collection=collection, results=results
)
@app.route("/database/<collection>/<oid>", methods=["GET", "POST"])
def database_edit(collection, oid):
if request.method == "POST":
document = request.form.get("document")
if not document:
abort(400)
document = bson.loads(document)
if document and "_id" in document:
del document["_id"]
try:
if not document:
db[collection].delete_one({"_id": ObjectId(oid)})
print("DELETED")
elif oid:
db[collection].replace_one({"_id": ObjectId(oid)}, document)
print("REPLACED")
else:
db[collection].insert_one(document)
print("INSERTED")
except:
abort(500)
return redirect(url_for("database"))
result = db[collection].find_one_or_404({"_id": ObjectId(oid)})
name = result["id"]
document = bson.dumps(result, indent=4)
return render_template(
"database_edit.html",
collection=collection,
oid=oid,
name=name,
document=document,
)