Files
home/flask_app.py
dsyoon f2119ed8da README에 conda/systemd/Apache 설치 문서 추가
- Miniconda(ncue) 기반 설치/실행 및 systemd 예시 추가
- Apache ProxyPass 설정 및 503 트러블슈팅 절차 정리
- Flask DB pool lazy 생성으로 임포트 단계 장애(503) 완화

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-08 09:05:47 +09:00

454 lines
14 KiB
Python

from __future__ import annotations
import json
import os
import re
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
import jwt
import psycopg2
import psycopg2.pool
import requests
from dotenv import load_dotenv
from flask import Flask, Response, jsonify, request, send_from_directory
from flask_cors import CORS
load_dotenv()
def env(name: str, default: str = "") -> str:
return str(os.getenv(name, default))
_IDENT_RE = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")
def safe_ident(s: str) -> str:
v = str(s or "").strip()
if not _IDENT_RE.match(v):
raise RuntimeError("Invalid TABLE identifier")
return v
def parse_csv(s: str) -> list[str]:
return [x.strip() for x in str(s or "").split(",") if x.strip()]
def parse_email_csv(s: str) -> list[str]:
return [x.lower() for x in parse_csv(s)]
PORT = int(env("PORT", "8023") or "8023")
DB_HOST = env("DB_HOST", "").strip()
DB_PORT = int(env("DB_PORT", "5432") or "5432")
DB_NAME = env("DB_NAME", "").strip()
DB_USER = env("DB_USER", "").strip()
DB_PASSWORD = env("DB_PASSWORD", "").strip()
TABLE = safe_ident(env("TABLE", "ncue_user") or "ncue_user")
CONFIG_TABLE = "ncue_app_config"
CONFIG_TOKEN = env("CONFIG_TOKEN", "").strip()
ADMIN_EMAILS = set(parse_email_csv(env("ADMIN_EMAILS", "dosangyoon@gmail.com,dsyoon@ncue.net")))
# Auth0 config via .env (preferred)
AUTH0_DOMAIN = env("AUTH0_DOMAIN", "").strip()
AUTH0_CLIENT_ID = env("AUTH0_CLIENT_ID", "").strip()
AUTH0_GOOGLE_CONNECTION = env("AUTH0_GOOGLE_CONNECTION", "").strip()
# Optional CORS (for static on different origin)
CORS_ORIGINS = env("CORS_ORIGINS", "*").strip() or "*"
_POOL: Optional[psycopg2.pool.SimpleConnectionPool] = None
def db_configured() -> bool:
return bool(DB_HOST and DB_NAME and DB_USER and DB_PASSWORD)
def get_pool() -> psycopg2.pool.SimpleConnectionPool:
"""
Lazy DB pool creation.
- prevents app import failure (which causes Apache 503) when DB is temporarily unavailable
- /api/config/auth can still work purely from .env without DB
"""
global _POOL
if _POOL is not None:
return _POOL
if not db_configured():
raise RuntimeError("db_not_configured")
_POOL = psycopg2.pool.SimpleConnectionPool(
minconn=1,
maxconn=10,
host=DB_HOST,
port=DB_PORT,
dbname=DB_NAME,
user=DB_USER,
password=DB_PASSWORD,
sslmode="disable",
)
return _POOL
def db_exec(sql: str, params: Tuple[Any, ...] = ()) -> None:
pool = get_pool()
conn = pool.getconn()
try:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute(sql, params)
finally:
pool.putconn(conn)
def db_one(sql: str, params: Tuple[Any, ...] = ()) -> Optional[tuple]:
pool = get_pool()
conn = pool.getconn()
try:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute(sql, params)
row = cur.fetchone()
return row
finally:
pool.putconn(conn)
def ensure_user_table() -> None:
db_exec(
f"""
create table if not exists public.{TABLE} (
sub text primary key,
email text,
name text,
picture text,
provider text,
first_login_at timestamptz,
last_login_at timestamptz,
last_logout_at timestamptz,
can_manage boolean not null default false,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
)
"""
)
db_exec(f"create index if not exists idx_{TABLE}_email on public.{TABLE} (email)")
db_exec(f"alter table public.{TABLE} add column if not exists first_login_at timestamptz")
db_exec(f"alter table public.{TABLE} add column if not exists last_logout_at timestamptz")
def ensure_config_table() -> None:
db_exec(
f"""
create table if not exists public.{CONFIG_TABLE} (
key text primary key,
value jsonb not null,
updated_at timestamptz not null default now()
)
"""
)
def is_admin_email(email: str) -> bool:
e = str(email or "").strip().lower()
return e in ADMIN_EMAILS
def bearer_token() -> str:
h = request.headers.get("Authorization", "")
m = re.match(r"^Bearer\s+(.+)$", h, flags=re.IGNORECASE)
return m.group(1).strip() if m else ""
@dataclass(frozen=True)
class JwksCacheEntry:
jwks_url: str
fetched_at: float
keys: dict
_JWKS_CACHE: Dict[str, JwksCacheEntry] = {}
_JWKS_TTL_SECONDS = 60 * 15
def _jwks_url(issuer: str) -> str:
iss = issuer.rstrip("/") + "/"
return iss + ".well-known/jwks.json"
def fetch_jwks(issuer: str) -> dict:
url = _jwks_url(issuer)
now = time.time()
cached = _JWKS_CACHE.get(url)
if cached and (now - cached.fetched_at) < _JWKS_TTL_SECONDS:
return cached.keys
r = requests.get(url, timeout=5)
r.raise_for_status()
keys = r.json()
if not isinstance(keys, dict) or "keys" not in keys:
raise RuntimeError("invalid_jwks")
_JWKS_CACHE[url] = JwksCacheEntry(jwks_url=url, fetched_at=now, keys=keys)
return keys
def verify_id_token(id_token: str, issuer: str, audience: str) -> dict:
# 1) read header -> kid
header = jwt.get_unverified_header(id_token)
kid = header.get("kid")
if not kid:
raise RuntimeError("missing_kid")
jwks = fetch_jwks(issuer)
key = None
for k in jwks.get("keys", []):
if k.get("kid") == kid:
key = k
break
if not key:
raise RuntimeError("kid_not_found")
public_key = jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(key))
payload = jwt.decode(
id_token,
key=public_key,
algorithms=["RS256"],
audience=audience,
issuer=issuer.rstrip("/") + "/",
options={"require": ["exp", "iat", "iss", "aud", "sub"]},
)
if not isinstance(payload, dict):
raise RuntimeError("invalid_payload")
return payload
ROOT_DIR = Path(__file__).resolve().parent
app = Flask(__name__)
if CORS_ORIGINS:
origins = CORS_ORIGINS if CORS_ORIGINS == "*" else [o.strip() for o in CORS_ORIGINS.split(",") if o.strip()]
CORS(app, resources={r"/api/*": {"origins": origins}})
ALLOWED_STATIC = {
"index.html",
"styles.css",
"script.js",
"links.json",
"favicon.ico",
}
@app.get("/")
def home() -> Response:
return send_from_directory(ROOT_DIR, "index.html")
@app.get("/<path:filename>")
def static_files(filename: str) -> Response:
# Prevent exposing .env etc. Serve only allowlisted files.
if filename not in ALLOWED_STATIC:
return jsonify({"ok": False, "error": "not_found"}), 404
return send_from_directory(ROOT_DIR, filename)
@app.get("/healthz")
def healthz() -> Response:
try:
if not db_configured():
return jsonify({"ok": False, "error": "db_not_configured"}), 500
row = db_one("select 1 as ok")
if not row:
return jsonify({"ok": False}), 500
return jsonify({"ok": True})
except Exception:
return jsonify({"ok": False}), 500
@app.post("/api/auth/sync")
def api_auth_sync() -> Response:
try:
if not db_configured():
return jsonify({"ok": False, "error": "db_not_configured"}), 500
ensure_user_table()
id_token = bearer_token()
if not id_token:
return jsonify({"ok": False, "error": "missing_token"}), 401
issuer = str(request.headers.get("X-Auth0-Issuer", "")).strip()
audience = str(request.headers.get("X-Auth0-ClientId", "")).strip()
if not issuer or not audience:
return jsonify({"ok": False, "error": "missing_auth0_headers"}), 400
payload = verify_id_token(id_token, issuer=issuer, audience=audience)
sub = str(payload.get("sub") or "").strip()
email = (str(payload.get("email")).strip().lower() if payload.get("email") else None)
name = (str(payload.get("name")).strip() if payload.get("name") else None)
picture = (str(payload.get("picture")).strip() if payload.get("picture") else None)
provider = sub.split("|", 1)[0] if "|" in sub else None
admin = bool(email and is_admin_email(email))
if not sub:
return jsonify({"ok": False, "error": "missing_sub"}), 400
sql = f"""
insert into public.{TABLE}
(sub, email, name, picture, provider, first_login_at, last_login_at, can_manage, updated_at)
values
(%s, %s, %s, %s, %s, now(), now(), %s, now())
on conflict (sub) do update set
email = excluded.email,
name = excluded.name,
picture = excluded.picture,
provider = excluded.provider,
first_login_at = coalesce(public.{TABLE}.first_login_at, excluded.first_login_at),
last_login_at = now(),
can_manage = (public.{TABLE}.can_manage or %s),
updated_at = now()
returning can_manage, first_login_at, last_login_at, last_logout_at
"""
row = db_one(sql, (sub, email, name, picture, provider, admin, admin))
can_manage = bool(row[0]) if row else False
user = (
{
"can_manage": can_manage,
"first_login_at": row[1],
"last_login_at": row[2],
"last_logout_at": row[3],
}
if row
else None
)
return jsonify({"ok": True, "canManage": can_manage, "user": user})
except Exception:
return jsonify({"ok": False, "error": "verify_failed"}), 401
@app.post("/api/auth/logout")
def api_auth_logout() -> Response:
try:
if not db_configured():
return jsonify({"ok": False, "error": "db_not_configured"}), 500
ensure_user_table()
id_token = bearer_token()
if not id_token:
return jsonify({"ok": False, "error": "missing_token"}), 401
issuer = str(request.headers.get("X-Auth0-Issuer", "")).strip()
audience = str(request.headers.get("X-Auth0-ClientId", "")).strip()
if not issuer or not audience:
return jsonify({"ok": False, "error": "missing_auth0_headers"}), 400
payload = verify_id_token(id_token, issuer=issuer, audience=audience)
sub = str(payload.get("sub") or "").strip()
if not sub:
return jsonify({"ok": False, "error": "missing_sub"}), 400
sql = f"""
update public.{TABLE}
set last_logout_at = now(),
updated_at = now()
where sub = %s
returning last_logout_at
"""
row = db_one(sql, (sub,))
return jsonify({"ok": True, "last_logout_at": row[0] if row else None})
except Exception:
return jsonify({"ok": False, "error": "verify_failed"}), 401
@app.get("/api/config/auth")
def api_config_auth_get() -> Response:
try:
# Prefer .env config (no UI needed)
if AUTH0_DOMAIN and AUTH0_CLIENT_ID and AUTH0_GOOGLE_CONNECTION:
return jsonify(
{
"ok": True,
"value": {
"auth0": {"domain": AUTH0_DOMAIN, "clientId": AUTH0_CLIENT_ID},
"connections": {"google": AUTH0_GOOGLE_CONNECTION},
"adminEmails": sorted(list(ADMIN_EMAILS)),
},
"updated_at": None,
"source": "env",
}
)
if not db_configured():
return jsonify({"ok": False, "error": "not_set"}), 404
ensure_config_table()
row = db_one(f"select value, updated_at from public.{CONFIG_TABLE} where key = %s", ("auth",))
if not row:
return jsonify({"ok": False, "error": "not_set"}), 404
value = row[0] or {}
if isinstance(value, str):
value = json.loads(value)
if isinstance(value, dict) and "adminEmails" not in value and isinstance(value.get("allowedEmails"), list):
value["adminEmails"] = value.get("allowedEmails")
return jsonify({"ok": True, "value": value, "updated_at": row[1], "source": "db"})
except Exception:
return jsonify({"ok": False, "error": "server_error"}), 500
@app.post("/api/config/auth")
def api_config_auth_post() -> Response:
try:
if not db_configured():
return jsonify({"ok": False, "error": "db_not_configured"}), 500
ensure_config_table()
if not CONFIG_TOKEN:
return jsonify({"ok": False, "error": "config_token_not_set"}), 403
token = str(request.headers.get("X-Config-Token", "")).strip()
if token != CONFIG_TOKEN:
return jsonify({"ok": False, "error": "forbidden"}), 403
body = request.get_json(silent=True) or {}
auth0 = body.get("auth0") or {}
connections = body.get("connections") or {}
admin_emails = body.get("adminEmails")
if not isinstance(admin_emails, list):
# legacy
admin_emails = body.get("allowedEmails")
if not isinstance(admin_emails, list):
admin_emails = []
domain = str(auth0.get("domain") or "").strip()
client_id = str(auth0.get("clientId") or "").strip()
google_conn = str(connections.get("google") or "").strip()
emails = [str(x).strip().lower() for x in admin_emails if str(x).strip()]
if not domain or not client_id or not google_conn:
return jsonify({"ok": False, "error": "missing_fields"}), 400
value = {"auth0": {"domain": domain, "clientId": client_id}, "connections": {"google": google_conn}, "adminEmails": emails}
sql = f"""
insert into public.{CONFIG_TABLE} (key, value, updated_at)
values (%s, %s::jsonb, now())
on conflict (key) do update set value = excluded.value, updated_at = now()
"""
db_exec(sql, ("auth", json.dumps(value)))
return jsonify({"ok": True})
except Exception:
return jsonify({"ok": False, "error": "server_error"}), 500
if __name__ == "__main__":
# Production should run behind a reverse proxy (nginx) or gunicorn.
app.run(host="0.0.0.0", port=PORT, debug=False)