from __future__ import annotations import json import os import re import time from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, Optional, Tuple import jwt import psycopg2 import psycopg2.pool import requests from dotenv import load_dotenv from flask import Flask, Response, jsonify, request, send_from_directory from flask_cors import CORS load_dotenv() def env(name: str, default: str = "") -> str: return str(os.getenv(name, default)) _IDENT_RE = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$") def safe_ident(s: str) -> str: v = str(s or "").strip() if not _IDENT_RE.match(v): raise RuntimeError("Invalid TABLE identifier") return v def parse_csv(s: str) -> list[str]: return [x.strip() for x in str(s or "").split(",") if x.strip()] def parse_email_csv(s: str) -> list[str]: return [x.lower() for x in parse_csv(s)] PORT = int(env("PORT", "8023") or "8023") DB_HOST = env("DB_HOST", "").strip() DB_PORT = int(env("DB_PORT", "5432") or "5432") DB_NAME = env("DB_NAME", "").strip() DB_USER = env("DB_USER", "").strip() DB_PASSWORD = env("DB_PASSWORD", "").strip() DB_SSLMODE = env("DB_SSLMODE", "prefer").strip() or "prefer" DB_CONNECT_TIMEOUT = int(env("DB_CONNECT_TIMEOUT", "5") or "5") TABLE = safe_ident(env("TABLE", "ncue_user") or "ncue_user") CONFIG_TABLE = "ncue_app_config" CONFIG_TOKEN = env("CONFIG_TOKEN", "").strip() ADMIN_EMAILS = set(parse_email_csv(env("ADMIN_EMAILS", "dosangyoon@gmail.com,dsyoon@ncue.net"))) # Auth0 config via .env (preferred) AUTH0_DOMAIN = env("AUTH0_DOMAIN", "").strip() AUTH0_CLIENT_ID = env("AUTH0_CLIENT_ID", "").strip() AUTH0_GOOGLE_CONNECTION = env("AUTH0_GOOGLE_CONNECTION", "").strip() # Optional CORS (for static on different origin) CORS_ORIGINS = env("CORS_ORIGINS", "*").strip() or "*" _POOL: Optional[psycopg2.pool.SimpleConnectionPool] = None def db_configured() -> bool: return bool(DB_HOST and DB_NAME and DB_USER and DB_PASSWORD) def get_pool() -> psycopg2.pool.SimpleConnectionPool: """ Lazy DB pool creation. - prevents app import failure (which causes Apache 503) when DB is temporarily unavailable - /api/config/auth can still work purely from .env without DB """ global _POOL if _POOL is not None: return _POOL if not db_configured(): raise RuntimeError("db_not_configured") _POOL = psycopg2.pool.SimpleConnectionPool( minconn=1, maxconn=10, host=DB_HOST, port=DB_PORT, dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, sslmode=DB_SSLMODE, connect_timeout=DB_CONNECT_TIMEOUT, ) return _POOL def db_exec(sql: str, params: Tuple[Any, ...] = ()) -> None: pool = get_pool() conn = pool.getconn() try: conn.autocommit = True with conn.cursor() as cur: cur.execute(sql, params) finally: pool.putconn(conn) def db_one(sql: str, params: Tuple[Any, ...] = ()) -> Optional[tuple]: pool = get_pool() conn = pool.getconn() try: conn.autocommit = True with conn.cursor() as cur: cur.execute(sql, params) row = cur.fetchone() return row finally: pool.putconn(conn) def ensure_user_table() -> None: db_exec( f""" create table if not exists public.{TABLE} ( sub text primary key, email text, name text, picture text, provider text, first_login_at timestamptz, last_login_at timestamptz, last_logout_at timestamptz, can_manage boolean not null default false, created_at timestamptz not null default now(), updated_at timestamptz not null default now() ) """ ) db_exec(f"create index if not exists idx_{TABLE}_email on public.{TABLE} (email)") db_exec(f"alter table public.{TABLE} add column if not exists first_login_at timestamptz") db_exec(f"alter table public.{TABLE} add column if not exists last_logout_at timestamptz") def ensure_config_table() -> None: db_exec( f""" create table if not exists public.{CONFIG_TABLE} ( key text primary key, value jsonb not null, updated_at timestamptz not null default now() ) """ ) def is_admin_email(email: str) -> bool: e = str(email or "").strip().lower() return e in ADMIN_EMAILS def bearer_token() -> str: h = request.headers.get("Authorization", "") m = re.match(r"^Bearer\s+(.+)$", h, flags=re.IGNORECASE) return m.group(1).strip() if m else "" @dataclass(frozen=True) class JwksCacheEntry: jwks_url: str fetched_at: float keys: dict _JWKS_CACHE: Dict[str, JwksCacheEntry] = {} _JWKS_TTL_SECONDS = 60 * 15 def _jwks_url(issuer: str) -> str: iss = issuer.rstrip("/") + "/" return iss + ".well-known/jwks.json" def fetch_jwks(issuer: str) -> dict: url = _jwks_url(issuer) now = time.time() cached = _JWKS_CACHE.get(url) if cached and (now - cached.fetched_at) < _JWKS_TTL_SECONDS: return cached.keys r = requests.get(url, timeout=5) r.raise_for_status() keys = r.json() if not isinstance(keys, dict) or "keys" not in keys: raise RuntimeError("invalid_jwks") _JWKS_CACHE[url] = JwksCacheEntry(jwks_url=url, fetched_at=now, keys=keys) return keys def verify_id_token(id_token: str, issuer: str, audience: str) -> dict: # 1) read header -> kid header = jwt.get_unverified_header(id_token) kid = header.get("kid") if not kid: raise RuntimeError("missing_kid") jwks = fetch_jwks(issuer) key = None for k in jwks.get("keys", []): if k.get("kid") == kid: key = k break if not key: raise RuntimeError("kid_not_found") public_key = jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(key)) payload = jwt.decode( id_token, key=public_key, algorithms=["RS256"], audience=audience, issuer=issuer.rstrip("/") + "/", options={"require": ["exp", "iat", "iss", "aud", "sub"]}, ) if not isinstance(payload, dict): raise RuntimeError("invalid_payload") return payload ROOT_DIR = Path(__file__).resolve().parent app = Flask(__name__) if CORS_ORIGINS: origins = CORS_ORIGINS if CORS_ORIGINS == "*" else [o.strip() for o in CORS_ORIGINS.split(",") if o.strip()] CORS(app, resources={r"/api/*": {"origins": origins}}) ALLOWED_STATIC = { "index.html", "styles.css", "script.js", "links.json", "favicon.ico", } @app.get("/") def home() -> Response: return send_from_directory(ROOT_DIR, "index.html") @app.get("/") def static_files(filename: str) -> Response: # Prevent exposing .env etc. Serve only allowlisted files. if filename not in ALLOWED_STATIC: return jsonify({"ok": False, "error": "not_found"}), 404 return send_from_directory(ROOT_DIR, filename) @app.get("/healthz") def healthz() -> Response: try: if not db_configured(): return jsonify({"ok": False, "error": "db_not_configured"}), 500 row = db_one("select 1 as ok") if not row: return jsonify({"ok": False}), 500 return jsonify({"ok": True}) except Exception: # Keep response minimal but actionable return jsonify({"ok": False, "error": "db_connect_failed"}), 500 @app.post("/api/auth/sync") def api_auth_sync() -> Response: try: if not db_configured(): return jsonify({"ok": False, "error": "db_not_configured"}), 500 ensure_user_table() id_token = bearer_token() if not id_token: return jsonify({"ok": False, "error": "missing_token"}), 401 issuer = str(request.headers.get("X-Auth0-Issuer", "")).strip() audience = str(request.headers.get("X-Auth0-ClientId", "")).strip() if not issuer or not audience: return jsonify({"ok": False, "error": "missing_auth0_headers"}), 400 payload = verify_id_token(id_token, issuer=issuer, audience=audience) sub = str(payload.get("sub") or "").strip() email = (str(payload.get("email")).strip().lower() if payload.get("email") else None) name = (str(payload.get("name")).strip() if payload.get("name") else None) picture = (str(payload.get("picture")).strip() if payload.get("picture") else None) provider = sub.split("|", 1)[0] if "|" in sub else None admin = bool(email and is_admin_email(email)) if not sub: return jsonify({"ok": False, "error": "missing_sub"}), 400 sql = f""" insert into public.{TABLE} (sub, email, name, picture, provider, first_login_at, last_login_at, can_manage, updated_at) values (%s, %s, %s, %s, %s, now(), now(), %s, now()) on conflict (sub) do update set email = excluded.email, name = excluded.name, picture = excluded.picture, provider = excluded.provider, first_login_at = coalesce(public.{TABLE}.first_login_at, excluded.first_login_at), last_login_at = now(), can_manage = (public.{TABLE}.can_manage or %s), updated_at = now() returning can_manage, first_login_at, last_login_at, last_logout_at """ row = db_one(sql, (sub, email, name, picture, provider, admin, admin)) can_manage = bool(row[0]) if row else False user = ( { "can_manage": can_manage, "first_login_at": row[1], "last_login_at": row[2], "last_logout_at": row[3], } if row else None ) return jsonify({"ok": True, "canManage": can_manage, "user": user}) except Exception: return jsonify({"ok": False, "error": "verify_failed"}), 401 @app.post("/api/auth/logout") def api_auth_logout() -> Response: try: if not db_configured(): return jsonify({"ok": False, "error": "db_not_configured"}), 500 ensure_user_table() id_token = bearer_token() if not id_token: return jsonify({"ok": False, "error": "missing_token"}), 401 issuer = str(request.headers.get("X-Auth0-Issuer", "")).strip() audience = str(request.headers.get("X-Auth0-ClientId", "")).strip() if not issuer or not audience: return jsonify({"ok": False, "error": "missing_auth0_headers"}), 400 payload = verify_id_token(id_token, issuer=issuer, audience=audience) sub = str(payload.get("sub") or "").strip() if not sub: return jsonify({"ok": False, "error": "missing_sub"}), 400 sql = f""" update public.{TABLE} set last_logout_at = now(), updated_at = now() where sub = %s returning last_logout_at """ row = db_one(sql, (sub,)) return jsonify({"ok": True, "last_logout_at": row[0] if row else None}) except Exception: return jsonify({"ok": False, "error": "verify_failed"}), 401 @app.get("/api/config/auth") def api_config_auth_get() -> Response: try: # Prefer .env config (no UI needed) if AUTH0_DOMAIN and AUTH0_CLIENT_ID and AUTH0_GOOGLE_CONNECTION: return jsonify( { "ok": True, "value": { "auth0": {"domain": AUTH0_DOMAIN, "clientId": AUTH0_CLIENT_ID}, "connections": {"google": AUTH0_GOOGLE_CONNECTION}, "adminEmails": sorted(list(ADMIN_EMAILS)), }, "updated_at": None, "source": "env", } ) if not db_configured(): return jsonify({"ok": False, "error": "not_set"}), 404 ensure_config_table() row = db_one(f"select value, updated_at from public.{CONFIG_TABLE} where key = %s", ("auth",)) if not row: return jsonify({"ok": False, "error": "not_set"}), 404 value = row[0] or {} if isinstance(value, str): value = json.loads(value) if isinstance(value, dict) and "adminEmails" not in value and isinstance(value.get("allowedEmails"), list): value["adminEmails"] = value.get("allowedEmails") return jsonify({"ok": True, "value": value, "updated_at": row[1], "source": "db"}) except Exception: return jsonify({"ok": False, "error": "server_error"}), 500 @app.post("/api/config/auth") def api_config_auth_post() -> Response: try: if not db_configured(): return jsonify({"ok": False, "error": "db_not_configured"}), 500 ensure_config_table() if not CONFIG_TOKEN: return jsonify({"ok": False, "error": "config_token_not_set"}), 403 token = str(request.headers.get("X-Config-Token", "")).strip() if token != CONFIG_TOKEN: return jsonify({"ok": False, "error": "forbidden"}), 403 body = request.get_json(silent=True) or {} auth0 = body.get("auth0") or {} connections = body.get("connections") or {} admin_emails = body.get("adminEmails") if not isinstance(admin_emails, list): # legacy admin_emails = body.get("allowedEmails") if not isinstance(admin_emails, list): admin_emails = [] domain = str(auth0.get("domain") or "").strip() client_id = str(auth0.get("clientId") or "").strip() google_conn = str(connections.get("google") or "").strip() emails = [str(x).strip().lower() for x in admin_emails if str(x).strip()] if not domain or not client_id or not google_conn: return jsonify({"ok": False, "error": "missing_fields"}), 400 value = {"auth0": {"domain": domain, "clientId": client_id}, "connections": {"google": google_conn}, "adminEmails": emails} sql = f""" insert into public.{CONFIG_TABLE} (key, value, updated_at) values (%s, %s::jsonb, now()) on conflict (key) do update set value = excluded.value, updated_at = now() """ db_exec(sql, ("auth", json.dumps(value))) return jsonify({"ok": True}) except Exception: return jsonify({"ok": False, "error": "server_error"}), 500 if __name__ == "__main__": # Production should run behind a reverse proxy (nginx) or gunicorn. app.run(host="0.0.0.0", port=PORT, debug=False)