Files
prompt/main.py
dsyoon 27540269b7 Initial commit: add FastAPI MVP (모프) and existing web app
Includes FastAPI+Jinja2+HTMX+SQLite implementation with seed categories, plus deployment templates.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-16 17:17:22 +09:00

446 lines
13 KiB
Python

"""
main.py
FastAPI + Jinja2 + HTMX + SQLite(SQLAlchemy)로 만드는 초경량 프롬프트 공유 커뮤니티 "모프(all prompt)".
실행:
python main.py
핵심 UX:
- 회원가입/로그인 없음
- 닉네임은 쿠키에 저장(처음 프롬프트 등록 시 입력)
- 프롬프트: 탐색/등록/복사/좋아요
"""
from __future__ import annotations
import hashlib
import os
import uuid
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Any
import uvicorn
from fastapi import Depends, FastAPI, Form, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from sqlalchemy import func, or_
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from database import get_db, init_db_and_seed
from models import Category, Like, Prompt, User
APP_NAME = "all prompt"
APP_KO_NAME = "모프"
COOKIE_UID = "mopf_uid" # 브라우저별 고유 식별자(익명)
COOKIE_NICKNAME = "mopf_nickname"
# 식별자 해시에 사용하는 salt(로컬 MVP라 간단히). 운영이면 환경변수로 설정 권장.
IDENTIFIER_SALT = os.getenv("MOPF_SALT", "mopf-local-dev-salt")
PAGE_SIZE = 20
@asynccontextmanager
async def lifespan(app: FastAPI):
# migration 도구 없이 create_all + seed
init_db_and_seed()
yield
app = FastAPI(title=f"{APP_NAME} ({APP_KO_NAME})", lifespan=lifespan)
templates = Jinja2Templates(directory="templates")
# static 폴더가 비어 있어도 mount 가능
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.middleware("http")
async def ensure_uid_cookie(request: Request, call_next):
"""
브라우저마다 고유 uid 쿠키를 하나 발급합니다.
- 좋아요 중복 방지(쿠키 기반)
- 익명 닉네임 자동 생성 시에도 사용
"""
uid = request.cookies.get(COOKIE_UID)
response = await call_next(request)
if not uid:
uid = uuid.uuid4().hex
response.set_cookie(
key=COOKIE_UID,
value=uid,
max_age=60 * 60 * 24 * 365 * 5, # 5년
httponly=True,
samesite="lax",
)
return response
def _client_ip(request: Request) -> str:
"""
프록시 뒤일 수 있으니 X-Forwarded-For를 우선 사용합니다.
로컬 개발/소규모 서버용이므로 최소 구현만 합니다.
"""
xff = request.headers.get("x-forwarded-for")
if xff:
return xff.split(",")[0].strip()
if request.client:
return request.client.host
return "0.0.0.0"
def user_identifier(request: Request) -> str:
"""
쿠키 uid 또는 IP를 기반으로 식별자를 만들고, DB에는 해시만 저장합니다.
(요구사항: 쿠키 or IP 해시)
"""
raw = request.cookies.get(COOKIE_UID) or _client_ip(request)
digest = hashlib.sha256(f"{IDENTIFIER_SALT}:{raw}".encode("utf-8")).hexdigest()
return digest
def current_nickname(request: Request) -> str | None:
nick = request.cookies.get(COOKIE_NICKNAME)
if not nick:
return None
nick = nick.strip()
return nick or None
def _get_or_create_user(db: Session, nickname: str) -> None:
"""중복 nick은 unique 제약으로 방어하고, 실패 시 무시."""
if not nickname:
return
if db.query(User.id).filter(User.nickname == nickname).first():
return
db.add(User(nickname=nickname))
try:
db.commit()
except IntegrityError:
db.rollback()
def _like_container_html(prompt_id: int, liked: bool, like_count: int) -> str:
"""
HTMX가 교체할 작은 HTML 조각.
- 템플릿 파일을 추가하지 않기 위해(요구된 템플릿 4개 유지) 서버에서 간단히 생성.
- user 입력이 들어가지 않는 숫자/ID만 포함하므로 XSS 위험이 낮음.
"""
label = "좋아요" if not liked else "좋아요✓"
disabled = "disabled" if liked else ""
btn_classes = (
"px-3 py-1 rounded border text-sm "
+ ("bg-gray-100 text-gray-500 cursor-not-allowed" if liked else "bg-white hover:bg-gray-50")
)
return f"""
<div id="like-{prompt_id}">
<button
class="{btn_classes}"
hx-post="/like/{prompt_id}"
hx-target="#like-{prompt_id}"
hx-swap="outerHTML"
{disabled}
>
{label} <span class="font-semibold">({like_count})</span>
</button>
</div>
""".strip()
def _paginate(page: int) -> tuple[int, int]:
page = max(page, 1)
offset = (page - 1) * PAGE_SIZE
return page, offset
def _list_prompts(
db: Session,
request: Request,
*,
q: str | None,
page: int,
category_id: int | None,
) -> dict[str, Any]:
"""목록/검색 화면에서 공용으로 쓰는 조회 로직."""
page, offset = _paginate(page)
base = db.query(Prompt).join(Category, Prompt.category_id == Category.id)
if category_id:
base = base.filter(Prompt.category_id == category_id)
if q:
# SQLite에서 대소문자 차이를 줄이기 위해 lower+LIKE 사용(초경량 MVP).
q2 = q.strip()
if q2:
like = f"%{q2.lower()}%"
base = base.filter(
or_(
func.lower(Prompt.title).like(like),
func.lower(Prompt.content).like(like),
)
)
total = base.with_entities(func.count(Prompt.id)).scalar() or 0
prompts = (
base.order_by(Prompt.created_at.desc())
.offset(offset)
.limit(PAGE_SIZE)
.all()
)
prompt_ids = [p.id for p in prompts]
like_counts: dict[int, int] = {}
liked_ids: set[int] = set()
if prompt_ids:
like_counts_rows = (
db.query(Like.prompt_id, func.count(Like.id))
.filter(Like.prompt_id.in_(prompt_ids))
.group_by(Like.prompt_id)
.all()
)
like_counts = {pid: int(cnt) for pid, cnt in like_counts_rows}
ident = user_identifier(request)
liked_rows = (
db.query(Like.prompt_id)
.filter(Like.prompt_id.in_(prompt_ids), Like.user_identifier == ident)
.all()
)
liked_ids = {pid for (pid,) in liked_rows}
categories = db.query(Category).order_by(Category.name.asc()).all()
return {
"page": page,
"page_size": PAGE_SIZE,
"total": total,
"prompts": prompts,
"categories": categories,
"like_counts": like_counts,
"liked_ids": liked_ids,
"category_id": category_id,
"q": q or "",
"now": datetime.now(),
}
@app.get("/", response_class=HTMLResponse)
def index(
request: Request,
page: int = 1,
category: int | None = None,
db: Session = Depends(get_db),
):
ctx = _list_prompts(db, request, q=None, page=page, category_id=category)
ctx.update(
{
"request": request,
"app_name": APP_NAME,
"app_ko_name": APP_KO_NAME,
"nickname": current_nickname(request),
}
)
return templates.TemplateResponse("index.html", ctx)
@app.get("/search", response_class=HTMLResponse)
def search(
request: Request,
q: str = "",
page: int = 1,
category: int | None = None,
db: Session = Depends(get_db),
):
q = (q or "").strip()
ctx = _list_prompts(db, request, q=q, page=page, category_id=category)
ctx.update(
{
"request": request,
"app_name": APP_NAME,
"app_ko_name": APP_KO_NAME,
"nickname": current_nickname(request),
}
)
return templates.TemplateResponse("index.html", ctx)
@app.get("/prompt/{id}", response_class=HTMLResponse)
def prompt_detail(
request: Request,
id: int,
db: Session = Depends(get_db),
):
prompt = db.query(Prompt).filter(Prompt.id == id).first()
if not prompt:
raise HTTPException(status_code=404, detail="프롬프트를 찾을 수 없습니다.")
like_count = db.query(func.count(Like.id)).filter(Like.prompt_id == id).scalar() or 0
liked = (
db.query(Like.id)
.filter(Like.prompt_id == id, Like.user_identifier == user_identifier(request))
.first()
is not None
)
categories = db.query(Category).order_by(Category.name.asc()).all()
return templates.TemplateResponse(
"detail.html",
{
"request": request,
"app_name": APP_NAME,
"app_ko_name": APP_KO_NAME,
"nickname": current_nickname(request),
"prompt": prompt,
"categories": categories,
"like_count": like_count,
"liked": liked,
},
)
@app.get("/new", response_class=HTMLResponse)
def new_prompt_form(
request: Request,
db: Session = Depends(get_db),
):
categories = db.query(Category).order_by(Category.name.asc()).all()
return templates.TemplateResponse(
"new.html",
{
"request": request,
"app_name": APP_NAME,
"app_ko_name": APP_KO_NAME,
"nickname": current_nickname(request),
"categories": categories,
"error": None,
},
)
@app.post("/new")
def create_prompt(
request: Request,
title: str = Form(...),
content: str = Form(...),
category_id: int = Form(...),
description: str | None = Form(None),
nickname: str | None = Form(None),
db: Session = Depends(get_db),
):
title = (title or "").strip()
content = (content or "").strip()
description = (description or "").strip() if description is not None else None
if not title or not content:
categories = db.query(Category).order_by(Category.name.asc()).all()
return templates.TemplateResponse(
"new.html",
{
"request": request,
"app_name": APP_NAME,
"app_ko_name": APP_KO_NAME,
"nickname": current_nickname(request),
"categories": categories,
"error": "제목과 프롬프트 내용은 필수입니다.",
},
status_code=400,
)
# 닉네임: 쿠키에 있으면 우선, 없으면 폼 입력, 그래도 없으면 uid 기반 자동 생성
nick = current_nickname(request) or (nickname or "").strip()
uid = request.cookies.get(COOKIE_UID, "")
if not nick:
nick = f"익명-{uid[:6] or 'user'}"
# 카테고리 존재 여부 확인(잘못된 ID 방지)
cat = db.query(Category).filter(Category.id == category_id).first()
if not cat:
raise HTTPException(status_code=400, detail="카테고리가 올바르지 않습니다.")
# users 테이블에도 nickname을 기록(나중에 로그인 붙이기 쉬움)
_get_or_create_user(db, nick)
prompt = Prompt(
title=title,
content=content,
description=description or None,
category_id=category_id,
author_nickname=nick,
copy_count=0,
)
db.add(prompt)
db.commit()
db.refresh(prompt)
resp = RedirectResponse(url=f"/prompt/{prompt.id}", status_code=303)
# 닉네임 쿠키 저장(“로그인 없이 시작”)
if not current_nickname(request) and nick:
resp.set_cookie(
key=COOKIE_NICKNAME,
value=nick,
max_age=60 * 60 * 24 * 365 * 5,
httponly=False, # UI에 표시/폼 기본값에 사용
samesite="lax",
)
return resp
@app.post("/like/{id}", response_class=HTMLResponse)
def like_prompt(
request: Request,
id: int,
db: Session = Depends(get_db),
):
# 프롬프트 존재 확인
prompt_exists = db.query(Prompt.id).filter(Prompt.id == id).first()
if not prompt_exists:
raise HTTPException(status_code=404, detail="프롬프트를 찾을 수 없습니다.")
ident = user_identifier(request)
# 이미 좋아요가 있으면 그대로(중복 방지)
existing = db.query(Like.id).filter(Like.prompt_id == id, Like.user_identifier == ident).first()
if not existing:
db.add(Like(prompt_id=id, user_identifier=ident))
try:
db.commit()
except IntegrityError:
# 유니크 제약으로 중복이 걸릴 수 있으니 안전하게 처리
db.rollback()
like_count = db.query(func.count(Like.id)).filter(Like.prompt_id == id).scalar() or 0
liked = (
db.query(Like.id)
.filter(Like.prompt_id == id, Like.user_identifier == ident)
.first()
is not None
)
return HTMLResponse(_like_container_html(id, liked, int(like_count)))
@app.post("/copy/{id}")
def increment_copy_count(
request: Request,
id: int,
db: Session = Depends(get_db),
):
prompt = db.query(Prompt).filter(Prompt.id == id).first()
if not prompt:
raise HTTPException(status_code=404, detail="프롬프트를 찾을 수 없습니다.")
prompt.copy_count = int(prompt.copy_count or 0) + 1
db.commit()
return JSONResponse({"copy_count": int(prompt.copy_count)})
if __name__ == "__main__":
# 개발 편의상 uvicorn을 코드에서 직접 실행합니다.
# 포트 변경: `PORT=8001 python main.py`
port = int(os.getenv("PORT", "8000"))
uvicorn.run("main:app", host="0.0.0.0", port=port, reload=False)