import json from hashlib import scrypt from os import urandom from hmac import compare_digest from colorsys import rgb_to_hsv from flask import Flask, render_template, request, redirect, url_for, session, abort from flask_pymongo import PyMongo from bson import ObjectId import bson.json_util as bson import requests SCRYPT_PARAMS = {"n": 16384, "r": 8, "p": 1} HEX_CHARS = set("0123456789ABCDEF") app = Flask(__name__) with open("./config.json", "r", encoding="utf-8") as file: config = json.load(file) app.config["MONGO_URI"] = f"mongodb://{config['db']}/akasha" app.config["SECRET_KEY"] = config["fsk"] LIGHT_URL = f"http://192.168.1.10/api/{config['light']}" mongo = PyMongo( app, username="index", password=config["key"], authSource="akasha", authMechanism="SCRAM-SHA-256", ) del config if mongo.db is None: raise Exception("Unable to connect to database.") db = mongo.db def partition(predicate, iterable): trues = [] falses = [] for item in iterable: (trues if predicate(item) else falses).append(item) return falses, trues @app.context_processor def inject_data(): ddata = db.domains.find_one({"id": request.endpoint}) return dict(hue=ddata["quad"]) @app.before_request def apply_checks(): username = session.get("username") if request.endpoint == "static": return ddata = db.domains.find_one_or_404({"id": request.endpoint}) udata = db.users.find_one({"id": username}) if username else None if not can_access(ddata, udata): abort(401) def can_access(ddata, udata) -> bool: if udata: if ddata["quad"] not in udata["quad"]: return False if (not ddata["public"]) and ddata["id"] not in udata["perm"]: return False else: if ddata["quad"] != "ade": return False if not ddata["public"]: return False return True @app.route("/") def index(): db.welcomes.find_one({"id": "index_ade"}) return redirect(url_for("index_ade")) @app.route("/ade") def index_ade(): return render_main() @app.route("/bea") def index_bea(): return render_main() @app.route("/cam") def index_cam(): return render_main() @app.route("/des") def index_des(): return render_main() def render_main(): session["main"] = request.endpoint fpdata = db.frontpage.find_one({"id": request.endpoint}) dsdata = db.domains.find({"cat": {"$ne": None}}) udata = ( db.users.find_one({"id": session["username"]}) if session.get("username") else None ) results = {"ade": {}, "bea": {}, "cam": {}, "des": {}} for ddata in dsdata: if not can_access(ddata, udata): continue if ddata["cat"] not in results[ddata["quad"]]: results[ddata["quad"]][ddata["cat"]] = {} results[ddata["quad"]][ddata["cat"]][ddata["name"]] = ddata["id"] return render_template( "index.html", fpdata=fpdata, domains=results, quad=(request.endpoint or "ade")[-3:], ) @app.route("/help") def help(): return render_template("help.html") @app.post("/login") def login(): username = request.form.get("username") if username: password = request.form.get("password", "") udata = db.users.find_one({"id": username}) if not udata: abort(401) return hash = udata["hash"] salt = udata["salt"] computed = scrypt(password.encode("utf-8"), salt=salt, **SCRYPT_PARAMS) if not compare_digest(hash, computed): abort(401) return session["username"] = username else: session["username"] = None return redirect(url_for(session.get("main", "index_ade"))) @app.route("/logout") def logout(): session["username"] = None return redirect(url_for("index_ade")) @app.route("/register", methods=["GET", "POST"]) def register(): if request.method == "POST": act = request.form.get("action") match act: case "create": username = request.form["username"] password = request.form["password"] udata = db.users.find_one({"id": username}) if udata or not username or not password: abort(400) salt = urandom(32) hash = scrypt(password.encode("utf-8"), salt=salt, **SCRYPT_PARAMS) db.users.insert_one( { "id": username, "hash": hash, "salt": salt, "perm": [], "quad": ["ade"], } ) case "edit": for entry in request.form: if entry.count("/") == 2: ptype, pid, uid = entry.split("/") value = request.form[entry] if ptype == "perm": cur_perms = db.users.find_one( {"_id": ObjectId(uid)}, {"perm": 1} ).get("perm", []) if value == "True": if pid not in cur_perms: cur_perms.append(pid) else: if pid in cur_perms: cur_perms.remove(pid) db.users.update_one( {"_id": ObjectId(uid)}, {"$set": {"perm": cur_perms}} ) if ptype == "quad": cur_quads = db.users.find_one( {"_id": ObjectId(uid)}, {"quad": 1} ).get("quad", []) if value == "True": if pid not in cur_quads: cur_quads.append(pid) else: if pid in cur_quads: cur_quads.remove(pid) db.users.update_one( {"_id": ObjectId(uid)}, {"$set": {"quad": cur_quads}} ) udatas = list(db.users.find()) ddatas = sorted(db.domains.find({"public": False}), key=lambda ddata: ddata["id"]) return render_template("register.html", udatas=udatas, ddatas=ddatas) @app.route("/search") def search(): abort(500) # stype = request.args.get("search-type") # query = request.args.get("search") # print(stype, query) # return redirect(url_for("index")) @app.route("/database", methods=["GET", "POST"]) def database(): cnames = db.list_collection_names() results = [] collection = None if request.method == "POST": collection = request.form.get("collection") if not collection: abort(400) query = request.form.get("query") if not query or not query.strip(): query = "{}" try: query = json.loads(query) except: abort(400) results = list(db[collection].find(query, {"_id": 1, "id": 1})) results.append({"_id": "new", "id": "new"}) return render_template( "database.html", cnames=cnames, collection=collection, results=results ) @app.route("/database//", methods=["GET", "POST"]) def database_edit(collection, oid): if request.method == "POST": document = request.form.get("document") if not document: abort(400) document = bson.loads(document) if document and "_id" in document: del document["_id"] try: if oid == "new": db[collection].insert_one(document) elif not document: db[collection].delete_one({"_id": ObjectId(oid)}) elif oid: db[collection].replace_one({"_id": ObjectId(oid)}, document) else: db[collection].insert_one(document) except: abort(500) return redirect(url_for("database")) if oid == "new": name = "New Document" document = "{}" else: result = db[collection].find_one_or_404({"_id": ObjectId(oid)}) name = result["id"] document = bson.dumps(result, indent=4) return render_template( "database_edit.html", collection=collection, oid=oid, name=name, document=document, ) @app.route("/tasks", methods=["GET", "POST"]) def tasks(): if request.method == "POST": act = request.form.get("action", "save") match act: case "save": oid = request.form.get("oid") if oid: db.tasks.update_one( {"_id": ObjectId(oid)}, { "$set": { "id": request.form.get("tid") or "000", "txt": request.form.get("content", ""), "pre": request.form.get("pre", "").split(), } }, ) else: db.tasks.insert_one( { "id": request.form.get("tid") or "000", "txt": request.form.get("content", ""), "pre": request.form.get("pre", "").split(), "done": False, } ) case "delete": oid = request.form.get("oid") db.tasks.delete_one({"_id": ObjectId(oid)}) case "mark": oid = request.form.get("oid") db.tasks.update_one( {"_id": ObjectId(oid)}, {"$set": {"done": request.form.get("done") == "False"}}, ) tasks = sorted(db.tasks.find(), key=lambda task: task["id"]) other_tasks, complete_tasks = partition(lambda task: task["done"], tasks) active_tasks, blocked_tasks = partition( lambda task: any( any(not ot["done"] for ot in tasks if ot["id"] == pre) for pre in task["pre"] ), other_tasks, ) return render_template( "tasks.html", active_tasks=active_tasks, blocked_tasks=blocked_tasks, complete_tasks=complete_tasks, ) @app.route("/lighting", methods=["GET", "POST"]) def lighting(): print(request.form, request.method) try: if request.method == "POST": act = request.form.get("action", "none") bri = request.form.get("brightness", "0") light = request.form.get("light", "-1") try: bri = int(bri) except: bri = -1 try: light = int(light) except: light = -1 if light < 0: abort(404) match act: case "toggle": if bri > 0: requests.put( f"{LIGHT_URL}/lights/{light}/state", json={"on": False}, timeout=5, ) else: requests.put( f"{LIGHT_URL}/lights/{light}/state", json={"on": True}, timeout=5, ) case "bright": if bri > 0: requests.put( f"{LIGHT_URL}/lights/{light}/state", json={"on": True, "bri": bri}, timeout=5, ) else: requests.put( f"{LIGHT_URL}/lights/{light}/state", json={"on": False}, timeout=5, ) case "ctsel": ct = request.form.get("ct", "366") try: ct = int(ct) except: ct = 366 print("CT: ", ct) requests.put( f"{LIGHT_URL}/lights/{light}/state", json={"on": True, "ct": ct}, timeout=5, ) case "hssel": color = request.form.get("hs", "#FF00AA") if color[0] == "#": color = color[1:] color = color.upper() if len(color) != 6 or not all(c in HEX_CHARS for c in color): color = "FF00AA" rgb = [int(color[i : i + 2], 16) / 255 for i in range(0, 6, 2)] hsv = rgb_to_hsv(*rgb) requests.put( f"{LIGHT_URL}/lights/{light}/state", json={ "on": True, "hue": int(hsv[0] * 65535), "sat": int(hsv[1] * 254), "bri": int(hsv[2] * 254), }, timeout=5, ) res = requests.get(LIGHT_URL, timeout=5) try: raw = res.json() except: abort(500) result = { group: { "name": raw["groups"][group]["name"], "lights": {}, } for group in raw["groups"] } result[-1] = {"name": "Ungrouped", "lights": {}} for light in raw["lights"]: ldata = raw["lights"][light] if not ldata["state"]["reachable"]: continue if not ldata["state"]["on"]: color = "#000000" elif ldata["state"].get("colormode") == "hs": color = f"hsl({ldata['state']['hue']/65535*360} {ldata['state']['sat']*100/256} {ldata['state']['bri']*50/256})" else: color = f"hsl(35 75 {int(ldata["state"]["bri"] / 3)})" ldata["display"] = color ungrouped = True for group in raw["groups"]: if light in raw["groups"][group]["lights"]: result[group]["lights"][light] = ldata ungrouped = False if ungrouped: result[-1]["lights"][light] = ldata return render_template("lighting.html", ldatas=result) except requests.exceptions.ConnectionError: abort(500)