Flask 백엔드 추가(8023) 및 문서 업데이트

- Flask로 /api/* 및 정적 파일 서빙
- Postgres(ncue_user/ncue_app_config) 연동 및 Auth0 ID 토큰 검증
- README/requirements/.gitignore 업데이트

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dsyoon
2026-02-07 21:45:23 +09:00
parent fb2153cbb0
commit 9fe71ad6a4
4 changed files with 465 additions and 16 deletions

5
.gitignore vendored
View File

@@ -4,3 +4,8 @@ node_modules/
.env.* .env.*
!.env.example !.env.example
# Python
.venv/
__pycache__/
*.pyc

View File

@@ -18,12 +18,17 @@ python3 -m http.server 8000
## 서버(Node) + PostgreSQL 사용자 저장 ## 서버(Node) + PostgreSQL 사용자 저장
로그인 후 사용자 정보를 `ncue_user`에 저장하고(Upsert), `can_manage`로 관리 권한을 DB에서 제어하려면 Node 서버로 실행하세요. 로그인 후 사용자 정보를 `ncue_user`에 저장하고(Upsert), 로그인/로그아웃 시간을 기록하며,
`/api/config/auth`로 Auth0 설정을 공유하려면 백엔드 서버가 필요합니다.
1) 의존성 설치 현재 백엔드는 **Python Flask(기본 포트 8023)** 로 제공합니다. (정적 HTML/JS는 그대로 사용 가능)
1) 의존성 설치(Python)
```bash ```bash
npm install python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
``` ```
2) 테이블 생성 2) 테이블 생성
@@ -35,10 +40,16 @@ psql -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d "$DB_NAME" -f db/schema.sql
3) 서버 실행 3) 서버 실행
```bash ```bash
npm start python flask_app.py
``` ```
### 권한 부여 기본적으로 `.env``ADMIN_EMAILS`에 포함된 이메일은 `can_manage=true`로 자동 승격됩니다.
### (선택) 역프록시/분리 배포
- Flask가 정적까지 함께 서빙: `http://ncue.net:8023`로 접속 (same-origin)
- 정적은 별도 호스팅 + API만 Flask: `index.html``window.AUTH_CONFIG.apiBase`에 API 주소를 넣고,
Flask에서는 `CORS_ORIGINS`로 허용 도메인을 지정하세요.
최초 로그인 사용자는 DB에 저장되지만 `can_manage=false`입니다. 관리 권한을 주려면: 최초 로그인 사용자는 DB에 저장되지만 `can_manage=false`입니다. 관리 권한을 주려면:
@@ -48,26 +59,22 @@ update ncue_user set can_manage = true where email = 'me@example.com';
## 로그인(관리 기능 잠금) ## 로그인(관리 기능 잠금)
이 프로젝트는 **정적 사이트**에서 동작하도록, 관리 기능(추가/편집/삭제/가져오기)을 **로그인 후(허용 이메일)** 에만 활성화할 수 있습니다. 이 프로젝트는 **정적 사이트**에서 동작하도록, 관리 기능(추가/편집/삭제/가져오기)을 **로그인 후(관리자 이메일)** 에만 활성화할 수 있습니다.
- **지원 방식**: Auth0 SPA SDK + Auth0 Universal Login - **지원 방식**: Auth0 SPA SDK + Auth0 Universal Login
- **구글/카카오/네이버**: Auth0 대시보드에서 Social/Custom OAuth 연결로 구성합니다. - **구글/카카오/네이버**: Auth0 대시보드에서 Social/Custom OAuth 연결로 구성합니다.
설정 방법: 설정 방법(.env):
1. Auth0에서 **Single Page Application** 생성 1. Auth0에서 **Single Page Application** 생성
2. `index.html``window.AUTH_CONFIG``domain`, `clientId` 입력 2. `.env`에 아래 값을 설정
- `AUTH0_DOMAIN`
- `AUTH0_CLIENT_ID`
- `AUTH0_GOOGLE_CONNECTION` (예: `google-oauth2`)
- `ADMIN_EMAILS` (예: `dosangyoon@gmail.com,dsyoon@ncue.net`)
3. Auth0 Application 설정에서 아래 URL들을 등록 3. Auth0 Application 설정에서 아래 URL들을 등록
- Allowed Callback URLs: 사이트 주소 (예: `https://example.com/`) - Allowed Callback URLs: 사이트 주소 (예: `https://example.com/`)
- Allowed Logout URLs: 사이트 주소 (예: `https://example.com/`) - Allowed Logout URLs: 사이트 주소 (예: `https://example.com/`)
4. `allowedEmails`에 관리 허용 이메일 목록을 입력
팁:
- 서버에 바로 반영하기 전 테스트가 필요하면, 페이지 상단의 **로그인**을 누르면 뜨는 **로그인 설정 모달**에서
`domain/clientId/allowedEmails`를 입력하면 브라우저에 저장되어 즉시 테스트할 수 있습니다.
- Auth0에서 각 소셜 로그인 연결(connection)을 만들었다면, 모달의 connection 이름(예: `google-oauth2`, `kakao`, `naver`)을 입력하면
상단에 **구글/카카오/네이버** 간편 로그인 버튼이 표시됩니다.
## 데이터 저장 ## 데이터 저장

430
flask_app.py Normal file
View File

@@ -0,0 +1,430 @@
from __future__ import annotations
import json
import os
import re
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
import jwt
import psycopg2
import psycopg2.pool
import requests
from dotenv import load_dotenv
from flask import Flask, Response, jsonify, request, send_from_directory
from flask_cors import CORS
load_dotenv()
def env(name: str, default: str = "") -> str:
return str(os.getenv(name, default))
def must(name: str) -> str:
v = env(name).strip()
if not v:
raise RuntimeError(f"Missing env: {name}")
return v
_IDENT_RE = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")
def safe_ident(s: str) -> str:
v = str(s or "").strip()
if not _IDENT_RE.match(v):
raise RuntimeError("Invalid TABLE identifier")
return v
def parse_csv(s: str) -> list[str]:
return [x.strip() for x in str(s or "").split(",") if x.strip()]
def parse_email_csv(s: str) -> list[str]:
return [x.lower() for x in parse_csv(s)]
PORT = int(env("PORT", "8023") or "8023")
DB_HOST = must("DB_HOST")
DB_PORT = int(env("DB_PORT", "5432") or "5432")
DB_NAME = must("DB_NAME")
DB_USER = must("DB_USER")
DB_PASSWORD = must("DB_PASSWORD")
TABLE = safe_ident(env("TABLE", "ncue_user") or "ncue_user")
CONFIG_TABLE = "ncue_app_config"
CONFIG_TOKEN = env("CONFIG_TOKEN", "").strip()
ADMIN_EMAILS = set(parse_email_csv(env("ADMIN_EMAILS", "dosangyoon@gmail.com,dsyoon@ncue.net")))
# Auth0 config via .env (preferred)
AUTH0_DOMAIN = env("AUTH0_DOMAIN", "").strip()
AUTH0_CLIENT_ID = env("AUTH0_CLIENT_ID", "").strip()
AUTH0_GOOGLE_CONNECTION = env("AUTH0_GOOGLE_CONNECTION", "").strip()
# Optional CORS (for static on different origin)
CORS_ORIGINS = env("CORS_ORIGINS", "*").strip() or "*"
POOL: psycopg2.pool.SimpleConnectionPool = psycopg2.pool.SimpleConnectionPool(
minconn=1,
maxconn=10,
host=DB_HOST,
port=DB_PORT,
dbname=DB_NAME,
user=DB_USER,
password=DB_PASSWORD,
sslmode="disable",
)
def db_exec(sql: str, params: Tuple[Any, ...] = ()) -> None:
conn = POOL.getconn()
try:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute(sql, params)
finally:
POOL.putconn(conn)
def db_one(sql: str, params: Tuple[Any, ...] = ()) -> Optional[tuple]:
conn = POOL.getconn()
try:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute(sql, params)
row = cur.fetchone()
return row
finally:
POOL.putconn(conn)
def ensure_user_table() -> None:
db_exec(
f"""
create table if not exists public.{TABLE} (
sub text primary key,
email text,
name text,
picture text,
provider text,
first_login_at timestamptz,
last_login_at timestamptz,
last_logout_at timestamptz,
can_manage boolean not null default false,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
)
"""
)
db_exec(f"create index if not exists idx_{TABLE}_email on public.{TABLE} (email)")
db_exec(f"alter table public.{TABLE} add column if not exists first_login_at timestamptz")
db_exec(f"alter table public.{TABLE} add column if not exists last_logout_at timestamptz")
def ensure_config_table() -> None:
db_exec(
f"""
create table if not exists public.{CONFIG_TABLE} (
key text primary key,
value jsonb not null,
updated_at timestamptz not null default now()
)
"""
)
def is_admin_email(email: str) -> bool:
e = str(email or "").strip().lower()
return e in ADMIN_EMAILS
def bearer_token() -> str:
h = request.headers.get("Authorization", "")
m = re.match(r"^Bearer\s+(.+)$", h, flags=re.IGNORECASE)
return m.group(1).strip() if m else ""
@dataclass(frozen=True)
class JwksCacheEntry:
jwks_url: str
fetched_at: float
keys: dict
_JWKS_CACHE: Dict[str, JwksCacheEntry] = {}
_JWKS_TTL_SECONDS = 60 * 15
def _jwks_url(issuer: str) -> str:
iss = issuer.rstrip("/") + "/"
return iss + ".well-known/jwks.json"
def fetch_jwks(issuer: str) -> dict:
url = _jwks_url(issuer)
now = time.time()
cached = _JWKS_CACHE.get(url)
if cached and (now - cached.fetched_at) < _JWKS_TTL_SECONDS:
return cached.keys
r = requests.get(url, timeout=5)
r.raise_for_status()
keys = r.json()
if not isinstance(keys, dict) or "keys" not in keys:
raise RuntimeError("invalid_jwks")
_JWKS_CACHE[url] = JwksCacheEntry(jwks_url=url, fetched_at=now, keys=keys)
return keys
def verify_id_token(id_token: str, issuer: str, audience: str) -> dict:
# 1) read header -> kid
header = jwt.get_unverified_header(id_token)
kid = header.get("kid")
if not kid:
raise RuntimeError("missing_kid")
jwks = fetch_jwks(issuer)
key = None
for k in jwks.get("keys", []):
if k.get("kid") == kid:
key = k
break
if not key:
raise RuntimeError("kid_not_found")
public_key = jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(key))
payload = jwt.decode(
id_token,
key=public_key,
algorithms=["RS256"],
audience=audience,
issuer=issuer.rstrip("/") + "/",
options={"require": ["exp", "iat", "iss", "aud", "sub"]},
)
if not isinstance(payload, dict):
raise RuntimeError("invalid_payload")
return payload
ROOT_DIR = Path(__file__).resolve().parent
app = Flask(__name__)
if CORS_ORIGINS:
origins = CORS_ORIGINS if CORS_ORIGINS == "*" else [o.strip() for o in CORS_ORIGINS.split(",") if o.strip()]
CORS(app, resources={r"/api/*": {"origins": origins}})
ALLOWED_STATIC = {
"index.html",
"styles.css",
"script.js",
"links.json",
"favicon.ico",
}
@app.get("/")
def home() -> Response:
return send_from_directory(ROOT_DIR, "index.html")
@app.get("/<path:filename>")
def static_files(filename: str) -> Response:
# Prevent exposing .env etc. Serve only allowlisted files.
if filename not in ALLOWED_STATIC:
return jsonify({"ok": False, "error": "not_found"}), 404
return send_from_directory(ROOT_DIR, filename)
@app.get("/healthz")
def healthz() -> Response:
try:
row = db_one("select 1 as ok")
if not row:
return jsonify({"ok": False}), 500
return jsonify({"ok": True})
except Exception:
return jsonify({"ok": False}), 500
@app.post("/api/auth/sync")
def api_auth_sync() -> Response:
try:
ensure_user_table()
id_token = bearer_token()
if not id_token:
return jsonify({"ok": False, "error": "missing_token"}), 401
issuer = str(request.headers.get("X-Auth0-Issuer", "")).strip()
audience = str(request.headers.get("X-Auth0-ClientId", "")).strip()
if not issuer or not audience:
return jsonify({"ok": False, "error": "missing_auth0_headers"}), 400
payload = verify_id_token(id_token, issuer=issuer, audience=audience)
sub = str(payload.get("sub") or "").strip()
email = (str(payload.get("email")).strip().lower() if payload.get("email") else None)
name = (str(payload.get("name")).strip() if payload.get("name") else None)
picture = (str(payload.get("picture")).strip() if payload.get("picture") else None)
provider = sub.split("|", 1)[0] if "|" in sub else None
admin = bool(email and is_admin_email(email))
if not sub:
return jsonify({"ok": False, "error": "missing_sub"}), 400
sql = f"""
insert into public.{TABLE}
(sub, email, name, picture, provider, first_login_at, last_login_at, can_manage, updated_at)
values
(%s, %s, %s, %s, %s, now(), now(), %s, now())
on conflict (sub) do update set
email = excluded.email,
name = excluded.name,
picture = excluded.picture,
provider = excluded.provider,
first_login_at = coalesce(public.{TABLE}.first_login_at, excluded.first_login_at),
last_login_at = now(),
can_manage = (public.{TABLE}.can_manage or %s),
updated_at = now()
returning can_manage, first_login_at, last_login_at, last_logout_at
"""
row = db_one(sql, (sub, email, name, picture, provider, admin, admin))
can_manage = bool(row[0]) if row else False
user = (
{
"can_manage": can_manage,
"first_login_at": row[1],
"last_login_at": row[2],
"last_logout_at": row[3],
}
if row
else None
)
return jsonify({"ok": True, "canManage": can_manage, "user": user})
except Exception:
return jsonify({"ok": False, "error": "verify_failed"}), 401
@app.post("/api/auth/logout")
def api_auth_logout() -> Response:
try:
ensure_user_table()
id_token = bearer_token()
if not id_token:
return jsonify({"ok": False, "error": "missing_token"}), 401
issuer = str(request.headers.get("X-Auth0-Issuer", "")).strip()
audience = str(request.headers.get("X-Auth0-ClientId", "")).strip()
if not issuer or not audience:
return jsonify({"ok": False, "error": "missing_auth0_headers"}), 400
payload = verify_id_token(id_token, issuer=issuer, audience=audience)
sub = str(payload.get("sub") or "").strip()
if not sub:
return jsonify({"ok": False, "error": "missing_sub"}), 400
sql = f"""
update public.{TABLE}
set last_logout_at = now(),
updated_at = now()
where sub = %s
returning last_logout_at
"""
row = db_one(sql, (sub,))
return jsonify({"ok": True, "last_logout_at": row[0] if row else None})
except Exception:
return jsonify({"ok": False, "error": "verify_failed"}), 401
@app.get("/api/config/auth")
def api_config_auth_get() -> Response:
try:
# Prefer .env config (no UI needed)
if AUTH0_DOMAIN and AUTH0_CLIENT_ID and AUTH0_GOOGLE_CONNECTION:
return jsonify(
{
"ok": True,
"value": {
"auth0": {"domain": AUTH0_DOMAIN, "clientId": AUTH0_CLIENT_ID},
"connections": {"google": AUTH0_GOOGLE_CONNECTION},
"adminEmails": sorted(list(ADMIN_EMAILS)),
},
"updated_at": None,
"source": "env",
}
)
ensure_config_table()
row = db_one(f"select value, updated_at from public.{CONFIG_TABLE} where key = %s", ("auth",))
if not row:
return jsonify({"ok": False, "error": "not_set"}), 404
value = row[0] or {}
if isinstance(value, str):
value = json.loads(value)
if isinstance(value, dict) and "adminEmails" not in value and isinstance(value.get("allowedEmails"), list):
value["adminEmails"] = value.get("allowedEmails")
return jsonify({"ok": True, "value": value, "updated_at": row[1], "source": "db"})
except Exception:
return jsonify({"ok": False, "error": "server_error"}), 500
@app.post("/api/config/auth")
def api_config_auth_post() -> Response:
try:
ensure_config_table()
if not CONFIG_TOKEN:
return jsonify({"ok": False, "error": "config_token_not_set"}), 403
token = str(request.headers.get("X-Config-Token", "")).strip()
if token != CONFIG_TOKEN:
return jsonify({"ok": False, "error": "forbidden"}), 403
body = request.get_json(silent=True) or {}
auth0 = body.get("auth0") or {}
connections = body.get("connections") or {}
admin_emails = body.get("adminEmails")
if not isinstance(admin_emails, list):
# legacy
admin_emails = body.get("allowedEmails")
if not isinstance(admin_emails, list):
admin_emails = []
domain = str(auth0.get("domain") or "").strip()
client_id = str(auth0.get("clientId") or "").strip()
google_conn = str(connections.get("google") or "").strip()
emails = [str(x).strip().lower() for x in admin_emails if str(x).strip()]
if not domain or not client_id or not google_conn:
return jsonify({"ok": False, "error": "missing_fields"}), 400
value = {"auth0": {"domain": domain, "clientId": client_id}, "connections": {"google": google_conn}, "adminEmails": emails}
sql = f"""
insert into public.{CONFIG_TABLE} (key, value, updated_at)
values (%s, %s::jsonb, now())
on conflict (key) do update set value = excluded.value, updated_at = now()
"""
db_exec(sql, ("auth", json.dumps(value)))
return jsonify({"ok": True})
except Exception:
return jsonify({"ok": False, "error": "server_error"}), 500
if __name__ == "__main__":
# Production should run behind a reverse proxy (nginx) or gunicorn.
app.run(host="0.0.0.0", port=PORT, debug=False)

7
requirements.txt Normal file
View File

@@ -0,0 +1,7 @@
Flask
python-dotenv
psycopg2-binary
PyJWT[crypto]
requests
flask-cors