"""Flask backend: static front-end files, shared links.json API, Auth0 login
bookkeeping in PostgreSQL, and a ``/go`` redirect helper for ncue.net targets.
"""

from __future__ import annotations

import hmac
import json
import logging
import os
import re
import threading
import time
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterator, Optional, Tuple
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse

import jwt
import psycopg2
import psycopg2.pool
import requests
from dotenv import load_dotenv
from flask import Flask, Response, jsonify, redirect, request, send_from_directory
from flask_cors import CORS

load_dotenv()

logger = logging.getLogger(__name__)


def env(name: str, default: str = "") -> str:
    """Return environment variable *name* as a string (or *default*)."""
    return str(os.getenv(name, default))


_IDENT_RE = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")


def safe_ident(s: str) -> str:
    """Validate *s* as a plain SQL identifier and return it stripped.

    The result is interpolated into SQL text (table names cannot be bound as
    parameters), so anything that is not ``[A-Za-z_][A-Za-z0-9_]*`` is rejected.

    Raises:
        RuntimeError: if *s* is not a valid identifier.
    """
    v = str(s or "").strip()
    if not _IDENT_RE.match(v):
        raise RuntimeError("Invalid TABLE identifier")
    return v


def parse_csv(s: str) -> list[str]:
    """Split a comma-separated string into stripped, non-empty items."""
    return [x.strip() for x in str(s or "").split(",") if x.strip()]


def parse_email_csv(s: str) -> list[str]:
    """Like :func:`parse_csv`, with every item lower-cased."""
    return [x.lower() for x in parse_csv(s)]


PORT = int(env("PORT", "8023") or "8023")
DB_HOST = env("DB_HOST", "").strip()
DB_PORT = int(env("DB_PORT", "5432") or "5432")
DB_NAME = env("DB_NAME", "").strip()
DB_USER = env("DB_USER", "").strip()
DB_PASSWORD = env("DB_PASSWORD", "").strip()
DB_SSLMODE = env("DB_SSLMODE", "prefer").strip() or "prefer"
DB_CONNECT_TIMEOUT = int(env("DB_CONNECT_TIMEOUT", "5") or "5")
TABLE = safe_ident(env("TABLE", "ncue_user") or "ncue_user")
CONFIG_TABLE = "ncue_app_config"
CONFIG_TOKEN = env("CONFIG_TOKEN", "").strip()
ADMIN_EMAILS = set(parse_email_csv(env("ADMIN_EMAILS", "")))

# Auth0 config via .env (preferred)
AUTH0_DOMAIN = env("AUTH0_DOMAIN", "").strip()
AUTH0_CLIENT_ID = env("AUTH0_CLIENT_ID", "").strip()
AUTH0_GOOGLE_CONNECTION = env("AUTH0_GOOGLE_CONNECTION", "").strip()

# Optional CORS (for static on different origin)
CORS_ORIGINS = env("CORS_ORIGINS", "*").strip() or "*"

_POOL: Optional[psycopg2.pool.AbstractConnectionPool] = None
# Guards lazy creation of _POOL so concurrent first requests build one pool.
_POOL_LOCK = threading.Lock()


def db_configured() -> bool:
    """Return True when host, database name, user and password are all set."""
    return bool(DB_HOST and DB_NAME and DB_USER and DB_PASSWORD)


def get_pool() -> psycopg2.pool.AbstractConnectionPool:
    """
    Lazy DB pool creation.
    - prevents app import failure (which causes Apache 503) when DB is temporarily unavailable
    - /api/config/auth can still work purely from .env without DB

    Raises:
        RuntimeError: ("db_not_configured") when DB settings are missing.
    """
    global _POOL
    if _POOL is not None:
        return _POOL
    if not db_configured():
        raise RuntimeError("db_not_configured")
    with _POOL_LOCK:
        if _POOL is None:
            # Request handlers may run on several threads, so use the
            # thread-safe pool rather than SimpleConnectionPool.
            _POOL = psycopg2.pool.ThreadedConnectionPool(
                minconn=1,
                maxconn=10,
                host=DB_HOST,
                port=DB_PORT,
                dbname=DB_NAME,
                user=DB_USER,
                password=DB_PASSWORD,
                sslmode=DB_SSLMODE,
                connect_timeout=DB_CONNECT_TIMEOUT,
            )
    return _POOL


@contextmanager
def _autocommit_cursor() -> Iterator[Any]:
    """Yield a cursor on a pooled autocommit connection; always return the connection."""
    pool = get_pool()
    conn = pool.getconn()
    try:
        conn.autocommit = True
        with conn.cursor() as cur:
            yield cur
    finally:
        pool.putconn(conn)


def db_exec(sql: str, params: Tuple[Any, ...] = ()) -> None:
    """Execute *sql* with *params* in autocommit mode, discarding any result."""
    with _autocommit_cursor() as cur:
        cur.execute(sql, params)


def db_one(sql: str, params: Tuple[Any, ...] = ()) -> Optional[tuple]:
    """Execute *sql* with *params* and return the first row, or None."""
    with _autocommit_cursor() as cur:
        cur.execute(sql, params)
        return cur.fetchone()


def ensure_user_table() -> None:
    """Create the user table/index if missing and apply the additive migrations.

    NOTE: callers invoke this on every request, so all statements here are
    written to be idempotent.
    """
    db_exec(
        f"""
        create table if not exists public.{TABLE} (
            sub text primary key,
            email text,
            name text,
            picture text,
            provider text,
            first_login_at timestamptz,
            last_login_at timestamptz,
            last_logout_at timestamptz,
            is_admin boolean not null default false,
            can_manage boolean not null default false,
            created_at timestamptz not null default now(),
            updated_at timestamptz not null default now()
        )
        """
    )
    db_exec(f"create index if not exists idx_{TABLE}_email on public.{TABLE} (email)")
    db_exec(f"alter table public.{TABLE} add column if not exists first_login_at timestamptz")
    db_exec(f"alter table public.{TABLE} add column if not exists last_logout_at timestamptz")
    db_exec(f"alter table public.{TABLE} add column if not exists is_admin boolean not null default false")
    # Backward-compat: previously can_manage was used as admin flag. Preserve existing admins.
    db_exec(f"update public.{TABLE} set is_admin = true where can_manage = true and is_admin = false")
    # Keep can_manage consistent with is_admin going forward.
    db_exec(f"update public.{TABLE} set can_manage = is_admin where can_manage is distinct from is_admin")


def ensure_config_table() -> None:
    """Create the key/value config table if it does not exist."""
    db_exec(
        f"""
        create table if not exists public.{CONFIG_TABLE} (
            key text primary key,
            value jsonb not null,
            updated_at timestamptz not null default now()
        )
        """
    )


def bearer_token() -> str:
    """Return the token from the ``Authorization: Bearer ...`` header, or ""."""
    h = request.headers.get("Authorization", "")
    m = re.match(r"^Bearer\s+(.+)$", h, flags=re.IGNORECASE)
    return m.group(1).strip() if m else ""


def is_bootstrap_admin(email: Optional[str]) -> bool:
    """Return True when *email* (case-insensitive) is listed in ADMIN_EMAILS."""
    e = str(email or "").strip().lower()
    return bool(e and e in ADMIN_EMAILS)


def verify_user_from_request() -> Tuple[Optional[str], str]:
    """
    Returns (sub, email_lowercase).
    Uses the same headers as /api/auth/sync:
    - Authorization: Bearer <id_token>
    - X-Auth0-Issuer
    - X-Auth0-ClientId

    Returns (None, "") when the token or either header is missing. Raises
    whatever :func:`verify_id_token` raises when the token does not verify.
    """
    id_token = bearer_token()
    if not id_token:
        return (None, "")
    issuer = str(request.headers.get("X-Auth0-Issuer", "")).strip()
    audience = str(request.headers.get("X-Auth0-ClientId", "")).strip()
    if not issuer or not audience:
        return (None, "")
    payload = verify_id_token(id_token, issuer=issuer, audience=audience)
    sub = str(payload.get("sub") or "").strip()
    email = (str(payload.get("email")).strip().lower() if payload.get("email") else "")
    return (sub or None, email)


def is_request_admin() -> Tuple[bool, str]:
    """
    Returns (is_admin, email_lowercase) using DB-stored flag.

    A token that fails verification is treated as "not admin" rather than
    raised, so callers answer with a 403 instead of a 500.
    """
    if not db_configured():
        return (False, "")
    ensure_user_table()
    try:
        sub, email = verify_user_from_request()
    except (jwt.PyJWTError, RuntimeError, ValueError):
        logger.warning("admin check: id token verification failed", exc_info=True)
        return (False, "")
    if not sub:
        return (False, email)
    row = db_one(f"select is_admin from public.{TABLE} where sub = %s", (sub,))
    return (bool(row and row[0] is True), email)


def safe_write_json(path: Path, data: Any) -> None:
    """Write *data* as pretty JSON to *path* via a temp file + rename."""
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + ".tmp")
    tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
    tmp.replace(path)


@dataclass(frozen=True)
class JwksCacheEntry:
    """One cached JWKS document and the time it was fetched."""

    jwks_url: str
    fetched_at: float
    keys: dict


_JWKS_CACHE: Dict[str, JwksCacheEntry] = {}
_JWKS_TTL_SECONDS = 60 * 15


def _jwks_url(issuer: str) -> str:
    """Return the well-known JWKS URL for *issuer*."""
    iss = issuer.rstrip("/") + "/"
    return iss + ".well-known/jwks.json"


def fetch_jwks(issuer: str) -> dict:
    """Return the issuer's JWKS document, cached for ``_JWKS_TTL_SECONDS``.

    Raises:
        requests.RequestException: on network/HTTP failure.
        RuntimeError: ("invalid_jwks") when the document has no "keys" member.
    """
    url = _jwks_url(issuer)
    now = time.time()
    cached = _JWKS_CACHE.get(url)
    if cached and (now - cached.fetched_at) < _JWKS_TTL_SECONDS:
        return cached.keys
    r = requests.get(url, timeout=5)
    r.raise_for_status()
    keys = r.json()
    if not isinstance(keys, dict) or "keys" not in keys:
        raise RuntimeError("invalid_jwks")
    _JWKS_CACHE[url] = JwksCacheEntry(jwks_url=url, fetched_at=now, keys=keys)
    return keys


def _host_of(value: str) -> str:
    """Return the lower-cased hostname of a domain or URL string ("" if none)."""
    v = str(value or "").strip()
    if not v:
        return ""
    if "://" not in v:
        v = "https://" + v
    try:
        return (urlparse(v).hostname or "").lower()
    except ValueError:
        return ""


def _load_auth_config() -> Optional[Tuple[Any, Any, str]]:
    """Return (value, updated_at, source) for the auth config, or None if unset.

    .env values win when domain, client id and Google connection are all set;
    otherwise the "auth" row of the config table is used (when a DB is configured).
    """
    if AUTH0_DOMAIN and AUTH0_CLIENT_ID and AUTH0_GOOGLE_CONNECTION:
        value = {
            "auth0": {"domain": AUTH0_DOMAIN, "clientId": AUTH0_CLIENT_ID},
            "connections": {"google": AUTH0_GOOGLE_CONNECTION},
            # Deprecated: admin is stored per-user in DB (ncue_user.is_admin)
            "adminEmails": [],
        }
        return (value, None, "env")
    if not db_configured():
        return None
    ensure_config_table()
    row = db_one(f"select value, updated_at from public.{CONFIG_TABLE} where key = %s", ("auth",))
    if not row:
        return None
    value = row[0] or {}
    if isinstance(value, str):
        value = json.loads(value)
    # Legacy rows stored the list under "allowedEmails".
    if isinstance(value, dict) and "adminEmails" not in value and isinstance(value.get("allowedEmails"), list):
        value["adminEmails"] = value.get("allowedEmails")
    return (value, row[1], "db")


def _trusted_auth0() -> Tuple[str, str]:
    """Return (issuer_host, client_id) the server is configured to trust.

    Either element is "" when that piece is not configured.
    """
    if AUTH0_DOMAIN and AUTH0_CLIENT_ID:
        return (_host_of(AUTH0_DOMAIN), AUTH0_CLIENT_ID)
    loaded = _load_auth_config()
    value = loaded[0] if loaded else None
    auth0 = value.get("auth0") if isinstance(value, dict) else None
    if not isinstance(auth0, dict):
        return ("", "")
    return (_host_of(str(auth0.get("domain") or "")), str(auth0.get("clientId") or "").strip())


def _require_trusted_issuer(issuer: str, audience: str) -> None:
    """Reject issuer/audience values that do not match the configured tenant.

    The issuer and audience arrive in request headers, i.e. they are chosen by
    the caller. Without this check a caller could name their own issuer, serve
    their own JWKS, and present a self-signed token for any ``sub``.

    Raises:
        RuntimeError: "issuer_not_allowed" or "audience_not_allowed".
    """
    trusted_host, trusted_client_id = _trusted_auth0()
    # NOTE(review): when no Auth0 config exists in .env or the DB, the legacy
    # behaviour (trusting the headers) is kept so existing deployments do not
    # break. Configure AUTH0_DOMAIN / AUTH0_CLIENT_ID to close this gap.
    if trusted_host:
        parsed = urlparse(issuer)
        if parsed.scheme != "https" or (parsed.hostname or "").lower() != trusted_host:
            raise RuntimeError("issuer_not_allowed")
    if trusted_client_id and audience != trusted_client_id:
        raise RuntimeError("audience_not_allowed")


def verify_id_token(id_token: str, issuer: str, audience: str) -> dict:
    """Verify an RS256 ID token against the issuer's JWKS and return its claims.

    Raises:
        RuntimeError: issuer/audience not allowed, missing ``kid``, unknown
            ``kid``, invalid JWKS, or a non-dict payload.
        jwt.PyJWTError: signature/claim validation failure.
        requests.RequestException: JWKS could not be fetched.
    """
    _require_trusted_issuer(issuer, audience)

    # 1) read header -> kid
    header = jwt.get_unverified_header(id_token)
    kid = header.get("kid")
    if not kid:
        raise RuntimeError("missing_kid")

    # 2) pick the matching signing key from the issuer's JWKS
    jwks = fetch_jwks(issuer)
    key = next((k for k in jwks.get("keys", []) if k.get("kid") == kid), None)
    if not key:
        raise RuntimeError("kid_not_found")

    # 3) verify signature and required claims
    public_key = jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(key))
    payload = jwt.decode(
        id_token,
        key=public_key,
        algorithms=["RS256"],
        audience=audience,
        issuer=issuer.rstrip("/") + "/",
        options={"require": ["exp", "iat", "iss", "aud", "sub"]},
    )
    if not isinstance(payload, dict):
        raise RuntimeError("invalid_payload")
    return payload


ROOT_DIR = Path(__file__).resolve().parent
LINKS_FILE = ROOT_DIR / "links.json"

app = Flask(__name__)
if CORS_ORIGINS:
    origins = CORS_ORIGINS if CORS_ORIGINS == "*" else [o.strip() for o in CORS_ORIGINS.split(",") if o.strip()]
    CORS(app, resources={r"/api/*": {"origins": origins}})

ALLOWED_STATIC = {
    "index.html",
    "styles.css",
    "script.js",
    "links.json",
    "favicon.ico",
}


def client_ip() -> str:
    """Return the client IP: first X-Forwarded-For entry, else the socket peer."""
    # Prefer proxy header (Apache ProxyPass sets this)
    xff = str(request.headers.get("X-Forwarded-For", "")).strip()
    if xff:
        return xff.split(",")[0].strip()
    return str(request.remote_addr or "").strip()


def is_http_url(u: str) -> bool:
    """Return True when *u* parses as an http(s) URL with a network location."""
    try:
        p = urlparse(u)
        return p.scheme in ("http", "https") and bool(p.netloc)
    except Exception:
        return False


def is_ncue_host(host: str) -> bool:
    """Return True for ``ncue.net`` and any ``*.ncue.net`` subdomain."""
    h = str(host or "").strip().lower()
    return h == "ncue.net" or h.endswith(".ncue.net")


def add_ref_params(target_url: str, ref_type: str, ref_value: str) -> str:
    """Return *target_url* with ``ref_type`` / ``ref`` query parameters set.

    Existing ``ref_type`` / ``ref`` values are replaced; all other query
    parameters, including repeated keys, are preserved.
    """
    p = urlparse(target_url)
    # Keep a list of pairs (not a dict) so repeated keys such as a=1&a=2 survive.
    pairs = [
        (k, v)
        for k, v in parse_qsl(p.query, keep_blank_values=True)
        if k not in ("ref_type", "ref")
    ]
    # Keep names short + explicit
    pairs.append(("ref_type", ref_type))
    pairs.append(("ref", ref_value))
    new_query = urlencode(pairs)
    return urlunparse((p.scheme, p.netloc, p.path, p.params, new_query, p.fragment))


def _config_token_ok() -> bool:
    """Constant-time comparison of the X-Config-Token header with CONFIG_TOKEN."""
    supplied = str(request.headers.get("X-Config-Token", "")).strip()
    return hmac.compare_digest(supplied.encode("utf-8"), CONFIG_TOKEN.encode("utf-8"))


@app.get("/")
def home() -> Response:
    """Serve the front-end entry page."""
    return send_from_directory(ROOT_DIR, "index.html")


@app.get("/<path:filename>")
def static_files(filename: str) -> Response:
    """Serve an allow-listed static file from the application directory."""
    # Prevent exposing .env etc. Serve only allowlisted files.
    if filename not in ALLOWED_STATIC:
        return jsonify({"ok": False, "error": "not_found"}), 404
    return send_from_directory(ROOT_DIR, filename)


@app.get("/go")
def go() -> Response:
    """
    Redirect helper to pass identity to internal apps.
    - Logged-in user: pass email (from query param)
    - Anonymous user: pass client IP (server-side)
    For safety, only redirects to ncue.net / *.ncue.net targets.
    """
    u = str(request.args.get("u", "")).strip()
    if not u or not is_http_url(u):
        return jsonify({"ok": False, "error": "invalid_url"}), 400
    p = urlparse(u)
    # Backslashes and userinfo ("user@host") are parsed differently by browsers
    # than by urlparse, which would let a foreign host slip past the host check.
    if "\\" in u or "@" in p.netloc:
        return jsonify({"ok": False, "error": "invalid_url"}), 400
    if not is_ncue_host(p.hostname or ""):
        return jsonify({"ok": False, "error": "host_not_allowed"}), 400

    email = str(request.args.get("e", "") or request.args.get("email", "")).strip().lower()
    if email:
        target = add_ref_params(u, "email", email)
    else:
        target = add_ref_params(u, "ip", client_ip())

    resp = redirect(target, code=302)
    resp.headers["Cache-Control"] = "no-store"
    return resp


@app.get("/healthz")
def healthz() -> Response:
    """Report whether a trivial query against the database succeeds."""
    try:
        if not db_configured():
            return jsonify({"ok": False, "error": "db_not_configured"}), 500
        row = db_one("select 1 as ok")
        if not row:
            return jsonify({"ok": False}), 500
        return jsonify({"ok": True})
    except Exception:
        logger.exception("healthz: database check failed")
        # Keep response minimal but actionable
        return jsonify({"ok": False, "error": "db_connect_failed"}), 500


@app.post("/api/auth/sync")
def api_auth_sync() -> Response:
    """Verify the caller's ID token and upsert their user row.

    Emails listed in ADMIN_EMAILS are promoted to admin on sync; an existing
    admin flag is never cleared here.
    """
    try:
        if not db_configured():
            return jsonify({"ok": False, "error": "db_not_configured"}), 500
        ensure_user_table()

        id_token = bearer_token()
        if not id_token:
            return jsonify({"ok": False, "error": "missing_token"}), 401

        issuer = str(request.headers.get("X-Auth0-Issuer", "")).strip()
        audience = str(request.headers.get("X-Auth0-ClientId", "")).strip()
        if not issuer or not audience:
            return jsonify({"ok": False, "error": "missing_auth0_headers"}), 400

        payload = verify_id_token(id_token, issuer=issuer, audience=audience)
        sub = str(payload.get("sub") or "").strip()
        email = (str(payload.get("email")).strip().lower() if payload.get("email") else None)
        name = (str(payload.get("name")).strip() if payload.get("name") else None)
        picture = (str(payload.get("picture")).strip() if payload.get("picture") else None)
        # The part of sub before the first "|" is stored as the provider.
        provider = sub.split("|", 1)[0] if "|" in sub else None

        if not sub:
            return jsonify({"ok": False, "error": "missing_sub"}), 400

        bootstrap_admin = bool(email and is_bootstrap_admin(email))

        sql = f"""
            insert into public.{TABLE}
                (sub, email, name, picture, provider, first_login_at, last_login_at, is_admin, can_manage, updated_at)
            values (%s, %s, %s, %s, %s, now(), now(), %s, %s, now())
            on conflict (sub) do update set
                email = excluded.email,
                name = excluded.name,
                picture = excluded.picture,
                provider = excluded.provider,
                first_login_at = coalesce(public.{TABLE}.first_login_at, excluded.first_login_at),
                last_login_at = now(),
                is_admin = (public.{TABLE}.is_admin or public.{TABLE}.can_manage or %s),
                can_manage = (public.{TABLE}.is_admin or public.{TABLE}.can_manage or %s),
                updated_at = now()
            returning is_admin, can_manage, first_login_at, last_login_at, last_logout_at
        """
        row = db_one(
            sql,
            (sub, email, name, picture, provider, bootstrap_admin, bootstrap_admin, bootstrap_admin, bootstrap_admin),
        )
        is_admin = bool(row[0]) if row else False
        can_manage = bool(row[1]) if row else False
        user = (
            {
                "is_admin": is_admin,
                "can_manage": can_manage,
                "first_login_at": row[2],
                "last_login_at": row[3],
                "last_logout_at": row[4],
            }
            if row
            else None
        )
        return jsonify({"ok": True, "canManage": is_admin, "user": user})
    except Exception:
        logger.exception("auth sync failed")
        return jsonify({"ok": False, "error": "verify_failed"}), 401


@app.post("/api/auth/logout")
def api_auth_logout() -> Response:
    """Verify the caller's ID token and stamp ``last_logout_at`` on their row."""
    try:
        if not db_configured():
            return jsonify({"ok": False, "error": "db_not_configured"}), 500
        ensure_user_table()

        id_token = bearer_token()
        if not id_token:
            return jsonify({"ok": False, "error": "missing_token"}), 401

        issuer = str(request.headers.get("X-Auth0-Issuer", "")).strip()
        audience = str(request.headers.get("X-Auth0-ClientId", "")).strip()
        if not issuer or not audience:
            return jsonify({"ok": False, "error": "missing_auth0_headers"}), 400

        payload = verify_id_token(id_token, issuer=issuer, audience=audience)
        sub = str(payload.get("sub") or "").strip()
        if not sub:
            return jsonify({"ok": False, "error": "missing_sub"}), 400

        sql = f"""
            update public.{TABLE}
            set last_logout_at = now(), updated_at = now()
            where sub = %s
            returning last_logout_at
        """
        row = db_one(sql, (sub,))
        return jsonify({"ok": True, "last_logout_at": row[0] if row else None})
    except Exception:
        logger.exception("auth logout failed")
        return jsonify({"ok": False, "error": "verify_failed"}), 401


@app.get("/api/config/auth")
def api_config_auth_get() -> Response:
    """Return the Auth0 client configuration from .env (preferred) or the DB."""
    try:
        loaded = _load_auth_config()
        if loaded is None:
            return jsonify({"ok": False, "error": "not_set"}), 404
        value, updated_at, source = loaded
        return jsonify({"ok": True, "value": value, "updated_at": updated_at, "source": source})
    except Exception:
        logger.exception("reading auth config failed")
        return jsonify({"ok": False, "error": "server_error"}), 500


@app.get("/api/links")
def api_links_get() -> Response:
    """
    Shared links source for all browsers.
    Reads from links.json on disk (same directory).
    """
    try:
        if not LINKS_FILE.exists():
            return jsonify({"ok": True, "links": []})
        raw = LINKS_FILE.read_text(encoding="utf-8")
        data = json.loads(raw) if raw.strip() else []
        # Accept either a bare array or {"links": [...]}.
        if isinstance(data, list):
            links = data
        elif isinstance(data, dict):
            links = data.get("links")
        else:
            links = []
        if not isinstance(links, list):
            links = []
        return jsonify({"ok": True, "links": links})
    except Exception:
        logger.exception("reading links.json failed")
        return jsonify({"ok": False, "error": "server_error"}), 500


@app.put("/api/links")
def api_links_put() -> Response:
    """
    Admin-only: overwrite shared links.json with provided array.
    Body can be:
    - JSON array
    - {"links":[...]}
    """
    try:
        ok_admin, _email = is_request_admin()
        if not ok_admin:
            return jsonify({"ok": False, "error": "forbidden"}), 403

        body = request.get_json(silent=True)
        if isinstance(body, list):
            links = body
        elif isinstance(body, dict):
            links = body.get("links")
        else:
            links = None
        if not isinstance(links, list):
            return jsonify({"ok": False, "error": "invalid_body"}), 400

        safe_write_json(LINKS_FILE, links)
        return jsonify({"ok": True, "count": len(links)})
    except Exception:
        logger.exception("writing links.json failed")
        return jsonify({"ok": False, "error": "server_error"}), 500


@app.post("/api/config/auth")
def api_config_auth_post() -> Response:
    """CONFIG_TOKEN-protected API to store the Auth0 client configuration in the DB."""
    try:
        if not db_configured():
            return jsonify({"ok": False, "error": "db_not_configured"}), 500
        ensure_config_table()

        if not CONFIG_TOKEN:
            return jsonify({"ok": False, "error": "config_token_not_set"}), 403
        if not _config_token_ok():
            return jsonify({"ok": False, "error": "forbidden"}), 403

        # Anything that is not a JSON object is treated as empty so it fails
        # validation below with a 400 instead of raising.
        body = request.get_json(silent=True)
        if not isinstance(body, dict):
            body = {}
        auth0 = body.get("auth0")
        if not isinstance(auth0, dict):
            auth0 = {}
        connections = body.get("connections")
        if not isinstance(connections, dict):
            connections = {}
        admin_emails = body.get("adminEmails")
        if not isinstance(admin_emails, list):
            # legacy
            admin_emails = body.get("allowedEmails")
        if not isinstance(admin_emails, list):
            admin_emails = []

        domain = str(auth0.get("domain") or "").strip()
        client_id = str(auth0.get("clientId") or "").strip()
        google_conn = str(connections.get("google") or "").strip()
        emails = [str(x).strip().lower() for x in admin_emails if str(x).strip()]

        if not domain or not client_id or not google_conn:
            return jsonify({"ok": False, "error": "missing_fields"}), 400

        value = {
            "auth0": {"domain": domain, "clientId": client_id},
            "connections": {"google": google_conn},
            "adminEmails": emails,
        }
        sql = f"""
            insert into public.{CONFIG_TABLE} (key, value, updated_at)
            values (%s, %s::jsonb, now())
            on conflict (key) do update set value = excluded.value, updated_at = now()
        """
        db_exec(sql, ("auth", json.dumps(value)))
        return jsonify({"ok": True})
    except Exception:
        logger.exception("storing auth config failed")
        return jsonify({"ok": False, "error": "server_error"}), 500


@app.post("/api/admin/users/set_admin")
def api_admin_users_set_admin() -> Response:
    """
    CONFIG_TOKEN-protected API to set per-user admin flag in DB.
    Body:
    - {"sub":"...", "isAdmin": true}
    - or {"email":"user@example.com", "isAdmin": true}
    """
    try:
        if not db_configured():
            return jsonify({"ok": False, "error": "db_not_configured"}), 500
        ensure_user_table()

        if not CONFIG_TOKEN:
            return jsonify({"ok": False, "error": "config_token_not_set"}), 403
        if not _config_token_ok():
            return jsonify({"ok": False, "error": "forbidden"}), 403

        # A non-object body is treated as empty so validation answers with 400.
        body = request.get_json(silent=True)
        if not isinstance(body, dict):
            body = {}
        sub = str(body.get("sub") or "").strip()
        email = str(body.get("email") or "").strip().lower()
        is_admin = body.get("isAdmin")
        if not isinstance(is_admin, bool):
            # accept legacy keys
            is_admin = body.get("admin")
        if not isinstance(is_admin, bool):
            return jsonify({"ok": False, "error": "missing_isAdmin"}), 400
        if not sub and not email:
            return jsonify({"ok": False, "error": "missing_identifier"}), 400

        if sub:
            sql = f"""
                update public.{TABLE}
                set is_admin = %s, can_manage = %s, updated_at = now()
                where sub = %s
                returning sub, email, is_admin
            """
            row = db_one(sql, (is_admin, is_admin, sub))
        else:
            sql = f"""
                update public.{TABLE}
                set is_admin = %s, can_manage = %s, updated_at = now()
                where email = %s
                returning sub, email, is_admin
            """
            row = db_one(sql, (is_admin, is_admin, email))

        if not row:
            return jsonify({"ok": False, "error": "not_found"}), 404
        return jsonify({"ok": True, "user": {"sub": row[0], "email": row[1], "is_admin": bool(row[2])}})
    except Exception:
        logger.exception("set_admin failed")
        return jsonify({"ok": False, "error": "server_error"}), 500


if __name__ == "__main__":
    # Production should run behind a reverse proxy (nginx) or gunicorn.
    app.run(host="0.0.0.0", port=PORT, debug=False)