Files
index/app.py

475 lines
16 KiB
Python

# Icolotl Index Webserver
# Copyright (C) 2026 Morgana <morgana@icolotl.com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import json
from hashlib import scrypt
from os import urandom
from hmac import compare_digest
from colorsys import rgb_to_hsv
from flask import Flask, render_template, request, redirect, url_for, session, abort
from flask_pymongo import PyMongo
from werkzeug.exceptions import HTTPException
from bson import ObjectId
import bson.json_util as bson
import requests
SCRYPT_PARAMS = {"n": 16384, "r": 8, "p": 1}
HEX_CHARS = set("0123456789ABCDEF")
app = Flask(__name__)
with open("./config.json", "r", encoding="utf-8") as file:
config = json.load(file)
app.config["MONGO_URI"] = f"mongodb://{config['db']}/akasha"
app.config["SECRET_KEY"] = config["fsk"]
LIGHT_URL = f"http://192.168.1.10/api/{config['light']}"
mongo = PyMongo(
app,
username="index",
password=config["key"],
authSource="akasha",
authMechanism="SCRAM-SHA-256",
)
del config
if mongo.db is None:
raise Exception("Unable to connect to database.")
db = mongo.db
def partition(predicate, iterable):
trues = []
falses = []
for item in iterable:
(trues if predicate(item) else falses).append(item)
return falses, trues
@app.context_processor
def inject_data():
if request.endpoint:
ddata = db.domains.find_one({"id": request.endpoint})
return {"hue": ddata.get("quad", "ade")}
return {"hue": "ade"}
@app.before_request
def apply_checks():
username = session.get("username")
if request.endpoint == "static":
return
ddata = db.domains.find_one_or_404({"id": request.endpoint})
udata = db.users.find_one({"id": username}) if username else None
if not can_access(ddata, udata):
abort(403 if udata else 401)
def can_access(ddata, udata) -> bool:
if udata:
if ddata["quad"] not in udata["quad"]:
return False
if (not ddata["public"]) and ddata["id"] not in udata["perm"]:
return False
else:
if ddata["quad"] != "ade":
return False
if not ddata["public"]:
return False
return True
@app.route("/")
def index():
db.welcomes.find_one({"id": "index_ade"})
return redirect(url_for("index_ade"))
@app.route("/ade")
def index_ade():
return render_main()
@app.route("/bea")
def index_bea():
return render_main()
@app.route("/cam")
def index_cam():
return render_main()
@app.route("/des")
def index_des():
return render_main()
def render_main():
session["main"] = request.endpoint
fpdata = db.frontpage.find_one({"id": request.endpoint})
dsdata = db.domains.find({"cat": {"$ne": None}})
udata = (
db.users.find_one({"id": session["username"]})
if session.get("username")
else None
)
results = {"ade": {}, "bea": {}, "cam": {}, "des": {}}
for ddata in dsdata:
if not can_access(ddata, udata):
continue
if ddata["cat"] not in results[ddata["quad"]]:
results[ddata["quad"]][ddata["cat"]] = {}
results[ddata["quad"]][ddata["cat"]][ddata["name"]] = ddata["id"]
return render_template(
"index.html",
fpdata=fpdata,
domains=results,
quad=(request.endpoint or "ade")[-3:],
)
@app.route("/help")
def help():
return render_template("help.html")
@app.route("/license")
def license():
return render_template("license.html")
@app.post("/login")
def login():
username = request.form.get("username")
if username:
password = request.form.get("password", "")
udata = db.users.find_one({"id": username})
if not udata:
abort(401)
return
hash = udata["hash"]
salt = udata["salt"]
computed = scrypt(password.encode("utf-8"), salt=salt, **SCRYPT_PARAMS)
if not compare_digest(hash, computed):
abort(401)
return
session["username"] = username
else:
session["username"] = None
return redirect(url_for(session.get("main", "index_ade")))
@app.route("/logout")
def logout():
session["username"] = None
return redirect(url_for("index_ade"))
@app.route("/register", methods=["GET", "POST"])
def register():
if request.method == "POST":
act = request.form.get("action")
match act:
case "create":
username = request.form["username"]
password = request.form["password"]
udata = db.users.find_one({"id": username})
if udata or not username or not password:
abort(400)
salt = urandom(32)
hash = scrypt(password.encode("utf-8"), salt=salt, **SCRYPT_PARAMS)
db.users.insert_one(
{
"id": username,
"hash": hash,
"salt": salt,
"perm": [],
"quad": ["ade"],
}
)
case "edit":
for entry in request.form:
if entry.count("/") == 2:
ptype, pid, uid = entry.split("/")
value = request.form[entry]
if ptype == "perm":
cur_perms = db.users.find_one(
{"_id": ObjectId(uid)}, {"perm": 1}
).get("perm", [])
if value == "True":
if pid not in cur_perms:
cur_perms.append(pid)
else:
if pid in cur_perms:
cur_perms.remove(pid)
db.users.update_one(
{"_id": ObjectId(uid)}, {"$set": {"perm": cur_perms}}
)
if ptype == "quad":
cur_quads = db.users.find_one(
{"_id": ObjectId(uid)}, {"quad": 1}
).get("quad", [])
if value == "True":
if pid not in cur_quads:
cur_quads.append(pid)
else:
if pid in cur_quads:
cur_quads.remove(pid)
db.users.update_one(
{"_id": ObjectId(uid)}, {"$set": {"quad": cur_quads}}
)
udatas = list(db.users.find())
ddatas = sorted(db.domains.find({"public": False}), key=lambda ddata: ddata["id"])
return render_template("register.html", udatas=udatas, ddatas=ddatas)
@app.route("/search")
def search():
abort(500)
# stype = request.args.get("search-type")
# query = request.args.get("search")
# print(stype, query)
# return redirect(url_for("index"))
@app.route("/database", methods=["GET", "POST"])
def database():
cnames = db.list_collection_names()
results = []
collection = None
if request.method == "POST":
collection = request.form.get("collection")
if not collection:
abort(400)
query = request.form.get("query")
if not query or not query.strip():
query = "{}"
try:
query = json.loads(query)
except:
abort(400)
results = list(db[collection].find(query, {"_id": 1, "id": 1}))
results.append({"_id": "new", "id": "new"})
return render_template(
"database.html", cnames=cnames, collection=collection, results=results
)
@app.route("/database/<collection>/<oid>", methods=["GET", "POST"])
def database_edit(collection, oid):
if request.method == "POST":
document = request.form.get("document")
if not document:
abort(400)
document = bson.loads(document)
if document and "_id" in document:
del document["_id"]
try:
if oid == "new":
db[collection].insert_one(document)
elif not document:
db[collection].delete_one({"_id": ObjectId(oid)})
elif oid:
db[collection].replace_one({"_id": ObjectId(oid)}, document)
else:
db[collection].insert_one(document)
except:
abort(500)
return redirect(url_for("database"))
if oid == "new":
name = "New Document"
document = "{}"
else:
result = db[collection].find_one_or_404({"_id": ObjectId(oid)})
name = result["id"]
document = bson.dumps(result, indent=4)
return render_template(
"database_edit.html",
collection=collection,
oid=oid,
name=name,
document=document,
)
@app.route("/tasks", methods=["GET", "POST"])
def tasks():
if request.method == "POST":
act = request.form.get("action", "save")
match act:
case "save":
oid = request.form.get("oid")
if oid:
db.tasks.update_one(
{"_id": ObjectId(oid)},
{
"$set": {
"id": request.form.get("tid") or "000",
"txt": request.form.get("content", ""),
"pre": request.form.get("pre", "").split(),
}
},
)
else:
db.tasks.insert_one(
{
"id": request.form.get("tid") or "000",
"txt": request.form.get("content", ""),
"pre": request.form.get("pre", "").split(),
"done": False,
}
)
case "delete":
oid = request.form.get("oid")
db.tasks.delete_one({"_id": ObjectId(oid)})
case "mark":
oid = request.form.get("oid")
db.tasks.update_one(
{"_id": ObjectId(oid)},
{"$set": {"done": request.form.get("done") == "False"}},
)
tasks = sorted(db.tasks.find(), key=lambda task: task["id"])
other_tasks, complete_tasks = partition(lambda task: task["done"], tasks)
active_tasks, blocked_tasks = partition(
lambda task: any(
any(not ot["done"] for ot in tasks if ot["id"] == pre)
for pre in task["pre"]
),
other_tasks,
)
return render_template(
"tasks.html",
active_tasks=active_tasks,
blocked_tasks=blocked_tasks,
complete_tasks=complete_tasks,
)
@app.route("/lighting", methods=["GET", "POST"])
def lighting():
print(request.form, request.method)
try:
if request.method == "POST":
act = request.form.get("action", "none")
bri = request.form.get("brightness", "0")
light = request.form.get("light", "-1")
try:
bri = int(bri)
except:
bri = -1
try:
light = int(light)
except:
light = -1
if light < 0:
abort(404)
match act:
case "toggle":
if bri > 0:
requests.put(
f"{LIGHT_URL}/lights/{light}/state",
json={"on": False},
timeout=5,
)
else:
requests.put(
f"{LIGHT_URL}/lights/{light}/state",
json={"on": True},
timeout=5,
)
case "bright":
if bri > 0:
requests.put(
f"{LIGHT_URL}/lights/{light}/state",
json={"on": True, "bri": bri},
timeout=5,
)
else:
requests.put(
f"{LIGHT_URL}/lights/{light}/state",
json={"on": False},
timeout=5,
)
case "ctsel":
ct = request.form.get("ct", "366")
try:
ct = int(ct)
except:
ct = 366
print("CT: ", ct)
requests.put(
f"{LIGHT_URL}/lights/{light}/state",
json={"on": True, "ct": ct},
timeout=5,
)
case "hssel":
color = request.form.get("hs", "#FF00AA")
if color[0] == "#":
color = color[1:]
color = color.upper()
if len(color) != 6 or not all(c in HEX_CHARS for c in color):
color = "FF00AA"
rgb = [int(color[i : i + 2], 16) / 255 for i in range(0, 6, 2)]
hsv = rgb_to_hsv(*rgb)
requests.put(
f"{LIGHT_URL}/lights/{light}/state",
json={
"on": True,
"hue": int(hsv[0] * 65535),
"sat": int(hsv[1] * 254),
"bri": int(hsv[2] * 254),
},
timeout=5,
)
res = requests.get(LIGHT_URL, timeout=5)
try:
raw = res.json()
except:
abort(500)
result = {
group: {
"name": raw["groups"][group]["name"],
"lights": {},
}
for group in raw["groups"]
}
result[-1] = {"name": "Ungrouped", "lights": {}}
for light in raw["lights"]:
ldata = raw["lights"][light]
if not ldata["state"]["reachable"]:
continue
if not ldata["state"]["on"]:
color = "#000000"
elif ldata["state"].get("colormode") == "hs":
color = f"hsl({ldata['state']['hue']/65535*360} {ldata['state']['sat']*100/256} {ldata['state']['bri']*50/256})"
else:
color = f"hsl(35 75 {int(ldata["state"]["bri"] / 3)})"
ldata["display"] = color
ungrouped = True
for group in raw["groups"]:
if light in raw["groups"][group]["lights"]:
result[group]["lights"][light] = ldata
ungrouped = False
if ungrouped:
result[-1]["lights"][light] = ldata
return render_template("lighting.html", ldatas=result)
except requests.exceptions.ConnectionError:
abort(500)
@app.errorhandler(HTTPException)
def handle_error(error):
return render_template("error.html", error=error), error.code