DB 기반 관리자 권한으로 전환

- ncue_user에 is_admin 추가 및 can_manage 호환 유지
- /api/auth/sync 및 관리 API를 DB is_admin 기반으로 변경
- index.html 폴백에서 관리자 이메일 하드코딩 제거
- .env로 관리자 부트스트랩(ADMIN_EMAILS) 지원

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dsyoon
2026-02-25 21:09:02 +09:00
parent 19c6814d2f
commit c21a7b3739
7 changed files with 190 additions and 114 deletions

View File

@@ -56,8 +56,7 @@ DB_CONNECT_TIMEOUT = int(env("DB_CONNECT_TIMEOUT", "5") or "5")
TABLE = safe_ident(env("TABLE", "ncue_user") or "ncue_user")
CONFIG_TABLE = "ncue_app_config"
CONFIG_TOKEN = env("CONFIG_TOKEN", "").strip()
ADMIN_EMAILS = set(parse_email_csv(env("ADMIN_EMAILS", "dosangyoon@gmail.com,dsyoon@ncue.net")))
ADMIN_EMAILS = set(parse_email_csv(env("ADMIN_EMAILS", "")))
# Auth0 config via .env (preferred)
AUTH0_DOMAIN = env("AUTH0_DOMAIN", "").strip()
@@ -135,6 +134,7 @@ def ensure_user_table() -> None:
first_login_at timestamptz,
last_login_at timestamptz,
last_logout_at timestamptz,
is_admin boolean not null default false,
can_manage boolean not null default false,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
@@ -144,6 +144,11 @@ def ensure_user_table() -> None:
db_exec(f"create index if not exists idx_{TABLE}_email on public.{TABLE} (email)")
db_exec(f"alter table public.{TABLE} add column if not exists first_login_at timestamptz")
db_exec(f"alter table public.{TABLE} add column if not exists last_logout_at timestamptz")
db_exec(f"alter table public.{TABLE} add column if not exists is_admin boolean not null default false")
# Backward-compat: previously can_manage was used as admin flag. Preserve existing admins.
db_exec(f"update public.{TABLE} set is_admin = true where can_manage = true and is_admin = false")
# Keep can_manage consistent with is_admin going forward.
db_exec(f"update public.{TABLE} set can_manage = is_admin where can_manage is distinct from is_admin")
def ensure_config_table() -> None:
@@ -158,20 +163,20 @@ def ensure_config_table() -> None:
)
def is_admin_email(email: str) -> bool:
e = str(email or "").strip().lower()
return e in ADMIN_EMAILS
def bearer_token() -> str:
h = request.headers.get("Authorization", "")
m = re.match(r"^Bearer\s+(.+)$", h, flags=re.IGNORECASE)
return m.group(1).strip() if m else ""
def verify_admin_from_request() -> Tuple[bool, str]:
def is_bootstrap_admin(email: Optional[str]) -> bool:
e = str(email or "").strip().lower()
return bool(e and e in ADMIN_EMAILS)
def verify_user_from_request() -> Tuple[Optional[str], str]:
"""
Returns (is_admin, email_lowercase).
Returns (sub, email_lowercase).
Uses the same headers as /api/auth/sync:
- Authorization: Bearer <id_token>
- X-Auth0-Issuer
@@ -179,16 +184,31 @@ def verify_admin_from_request() -> Tuple[bool, str]:
"""
id_token = bearer_token()
if not id_token:
return (False, "")
return (None, "")
issuer = str(request.headers.get("X-Auth0-Issuer", "")).strip()
audience = str(request.headers.get("X-Auth0-ClientId", "")).strip()
if not issuer or not audience:
return (False, "")
return (None, "")
payload = verify_id_token(id_token, issuer=issuer, audience=audience)
sub = str(payload.get("sub") or "").strip()
email = (str(payload.get("email")).strip().lower() if payload.get("email") else "")
return (bool(email and is_admin_email(email)), email)
return (sub or None, email)
def is_request_admin() -> Tuple[bool, str]:
"""
Returns (is_admin, email_lowercase) using DB-stored flag.
"""
if not db_configured():
return (False, "")
ensure_user_table()
sub, email = verify_user_from_request()
if not sub:
return (False, email)
row = db_one(f"select is_admin from public.{TABLE} where sub = %s", (sub,))
return (bool(row and row[0] is True), email)
def safe_write_json(path: Path, data: Any) -> None:
@@ -388,16 +408,17 @@ def api_auth_sync() -> Response:
name = (str(payload.get("name")).strip() if payload.get("name") else None)
picture = (str(payload.get("picture")).strip() if payload.get("picture") else None)
provider = sub.split("|", 1)[0] if "|" in sub else None
admin = bool(email and is_admin_email(email))
if not sub:
return jsonify({"ok": False, "error": "missing_sub"}), 400
bootstrap_admin = bool(email and is_bootstrap_admin(email))
sql = f"""
insert into public.{TABLE}
(sub, email, name, picture, provider, first_login_at, last_login_at, can_manage, updated_at)
(sub, email, name, picture, provider, first_login_at, last_login_at, is_admin, can_manage, updated_at)
values
(%s, %s, %s, %s, %s, now(), now(), %s, now())
(%s, %s, %s, %s, %s, now(), now(), %s, %s, now())
on conflict (sub) do update set
email = excluded.email,
name = excluded.name,
@@ -405,24 +426,27 @@ def api_auth_sync() -> Response:
provider = excluded.provider,
first_login_at = coalesce(public.{TABLE}.first_login_at, excluded.first_login_at),
last_login_at = now(),
can_manage = (public.{TABLE}.can_manage or %s),
is_admin = (public.{TABLE}.is_admin or public.{TABLE}.can_manage or %s),
can_manage = (public.{TABLE}.is_admin or public.{TABLE}.can_manage or %s),
updated_at = now()
returning can_manage, first_login_at, last_login_at, last_logout_at
returning is_admin, can_manage, first_login_at, last_login_at, last_logout_at
"""
row = db_one(sql, (sub, email, name, picture, provider, admin, admin))
can_manage = bool(row[0]) if row else False
row = db_one(sql, (sub, email, name, picture, provider, bootstrap_admin, bootstrap_admin, bootstrap_admin, bootstrap_admin))
is_admin = bool(row[0]) if row else False
can_manage = bool(row[1]) if row else False
user = (
{
"is_admin": is_admin,
"can_manage": can_manage,
"first_login_at": row[1],
"last_login_at": row[2],
"last_logout_at": row[3],
"first_login_at": row[2],
"last_login_at": row[3],
"last_logout_at": row[4],
}
if row
else None
)
return jsonify({"ok": True, "canManage": can_manage, "user": user})
return jsonify({"ok": True, "canManage": is_admin, "user": user})
except Exception:
return jsonify({"ok": False, "error": "verify_failed"}), 401
@@ -471,7 +495,8 @@ def api_config_auth_get() -> Response:
"value": {
"auth0": {"domain": AUTH0_DOMAIN, "clientId": AUTH0_CLIENT_ID},
"connections": {"google": AUTH0_GOOGLE_CONNECTION},
"adminEmails": sorted(list(ADMIN_EMAILS)),
# Deprecated: admin is stored per-user in DB (ncue_user.is_admin)
"adminEmails": [],
},
"updated_at": None,
"source": "env",
@@ -525,7 +550,7 @@ def api_links_put() -> Response:
- {"links":[...]}
"""
try:
ok_admin, _email = verify_admin_from_request()
ok_admin, _email = is_request_admin()
if not ok_admin:
return jsonify({"ok": False, "error": "forbidden"}), 403
@@ -585,6 +610,65 @@ def api_config_auth_post() -> Response:
return jsonify({"ok": False, "error": "server_error"}), 500
@app.post("/api/admin/users/set_admin")
def api_admin_users_set_admin() -> Response:
"""
CONFIG_TOKEN-protected API to set per-user admin flag in DB.
Body:
- {"sub":"...", "isAdmin": true}
- or {"email":"user@example.com", "isAdmin": true}
"""
try:
if not db_configured():
return jsonify({"ok": False, "error": "db_not_configured"}), 500
ensure_user_table()
if not CONFIG_TOKEN:
return jsonify({"ok": False, "error": "config_token_not_set"}), 403
token = str(request.headers.get("X-Config-Token", "")).strip()
if token != CONFIG_TOKEN:
return jsonify({"ok": False, "error": "forbidden"}), 403
body = request.get_json(silent=True) or {}
sub = str(body.get("sub") or "").strip()
email = str(body.get("email") or "").strip().lower()
is_admin = body.get("isAdmin")
if not isinstance(is_admin, bool):
# accept legacy keys
is_admin = body.get("admin")
if not isinstance(is_admin, bool):
return jsonify({"ok": False, "error": "missing_isAdmin"}), 400
if not sub and not email:
return jsonify({"ok": False, "error": "missing_identifier"}), 400
if sub:
sql = f"""
update public.{TABLE}
set is_admin = %s,
can_manage = %s,
updated_at = now()
where sub = %s
returning sub, email, is_admin
"""
row = db_one(sql, (is_admin, is_admin, sub))
else:
sql = f"""
update public.{TABLE}
set is_admin = %s,
can_manage = %s,
updated_at = now()
where email = %s
returning sub, email, is_admin
"""
row = db_one(sql, (is_admin, is_admin, email))
if not row:
return jsonify({"ok": False, "error": "not_found"}), 404
return jsonify({"ok": True, "user": {"sub": row[0], "email": row[1], "is_admin": bool(row[2])}})
except Exception:
return jsonify({"ok": False, "error": "server_error"}), 500
if __name__ == "__main__":
# Production should run behind a reverse proxy (nginx) or gunicorn.
app.run(host="0.0.0.0", port=PORT, debug=False)