Includes FastAPI+Jinja2+HTMX+SQLite implementation with seed categories, plus deployment templates. Co-authored-by: Cursor <cursoragent@cursor.com>
446 lines
13 KiB
Python
446 lines
13 KiB
Python
"""
|
|
main.py

"모프" (all prompt): an ultra-lightweight prompt-sharing community built with
FastAPI + Jinja2 + HTMX + SQLite (SQLAlchemy).

Run:
    python main.py

Core UX:
- No sign-up or login
- The nickname is stored in a cookie (entered when the first prompt is posted)
- Prompts: browse / post / copy / like
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import os
|
|
import uuid
|
|
from contextlib import asynccontextmanager
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
import uvicorn
|
|
from fastapi import Depends, FastAPI, Form, HTTPException, Request
|
|
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.templating import Jinja2Templates
|
|
from sqlalchemy import func, or_
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy.orm import Session
|
|
|
|
from database import get_db, init_db_and_seed
|
|
from models import Category, Like, Prompt, User
|
|
|
|
|
|
# Display names of the service (English / Korean).
APP_NAME = "all prompt"
APP_KO_NAME = "모프"

# Cookie names.
COOKIE_UID = "mopf_uid"  # anonymous, per-browser unique identifier
COOKIE_NICKNAME = "mopf_nickname"

# Salt mixed into the identifier hash. Kept simple because this is a local
# MVP; for production, setting it through the environment is recommended.
IDENTIFIER_SALT = os.environ.get("MOPF_SALT", "mopf-local-dev-salt")

# Number of prompts shown per listing page.
PAGE_SIZE = 20
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifespan context manager handed to the FastAPI application.

    Calls ``init_db_and_seed()`` before yielding control; nothing runs
    after the ``yield``.
    """
    # No migration tool is used: create_all + seed.
    init_db_and_seed()
    yield
|
|
|
|
|
|
# Application object; database initialisation is wired in via `lifespan`.
app = FastAPI(title=f"{APP_NAME} ({APP_KO_NAME})", lifespan=lifespan)
# Jinja2 templates are loaded from the "templates" directory.
templates = Jinja2Templates(directory="templates")

# The static folder can be mounted even when it is empty.
app.mount("/static", StaticFiles(directory="static"), name="static")
|
|
|
|
|
|
@app.middleware("http")
async def ensure_uid_cookie(request: Request, call_next):
    """Issue one unique ``uid`` cookie per browser.

    The cookie is used for:
    - preventing duplicate likes (cookie based), and
    - generating an automatic anonymous nickname.

    The incoming cookie is inspected before the request is processed; a new
    value is attached to the response only when none was sent.
    """
    already_has_uid = bool(request.cookies.get(COOKIE_UID))
    response = await call_next(request)
    if already_has_uid:
        return response

    five_years = 60 * 60 * 24 * 365 * 5
    response.set_cookie(
        key=COOKIE_UID,
        value=uuid.uuid4().hex,
        max_age=five_years,
        httponly=True,
        samesite="lax",
    )
    return response
|
|
|
|
|
|
def _client_ip(request: Request) -> str:
    """Return a best-effort client IP address for ``request``.

    The app may sit behind a proxy, so ``X-Forwarded-For`` is preferred.
    This is a minimal implementation meant for local development / small
    servers: the header value is used as-is and is not validated.

    Args:
        request: Incoming request; only ``headers`` and ``client`` are read.

    Returns:
        The first non-empty entry of ``X-Forwarded-For`` when present,
        otherwise the socket peer host, otherwise ``"0.0.0.0"``.
    """
    forwarded = request.headers.get("x-forwarded-for")
    if forwarded:
        # The left-most entry is the originating client. Skip blank entries
        # (e.g. " , 1.2.3.4") so a malformed header never yields "".
        for hop in forwarded.split(","):
            hop = hop.strip()
            if hop:
                return hop
    if request.client:
        return request.client.host
    return "0.0.0.0"
|
|
|
|
|
|
def user_identifier(request: Request) -> str:
    """Build the anonymous identifier for the caller.

    The raw value is the uid cookie when present, otherwise the client IP.
    Only the salted SHA-256 hex digest of that value is returned, so the
    database stores a hash rather than the raw cookie/IP
    (requirement: cookie or IP hash).
    """
    source = request.cookies.get(COOKIE_UID)
    if not source:
        source = _client_ip(request)
    salted = f"{IDENTIFIER_SALT}:{source}".encode("utf-8")
    return hashlib.sha256(salted).hexdigest()
|
|
|
|
|
|
def current_nickname(request: Request) -> str | None:
    """Return the nickname stored in the cookie.

    Surrounding whitespace is stripped; a missing, empty or whitespace-only
    cookie yields ``None``.
    """
    stored = request.cookies.get(COOKIE_NICKNAME) or ""
    return stored.strip() or None
|
|
|
|
|
|
def _get_or_create_user(db: Session, nickname: str) -> None:
    """Make sure a ``User`` row exists for ``nickname`` (best effort).

    Does nothing for an empty nickname or when a row with that nickname is
    already present. Duplicate nicknames are guarded by the unique
    constraint: an ``IntegrityError`` on commit is rolled back and ignored.
    """
    if not nickname:
        return

    already_known = db.query(User.id).filter(User.nickname == nickname).first()
    if already_known:
        return

    db.add(User(nickname=nickname))
    try:
        db.commit()
    except IntegrityError:
        db.rollback()
|
|
|
|
|
|
def _like_container_html(prompt_id: int, liked: bool, like_count: int) -> str:
    """Render the small HTML fragment that HTMX swaps in for the like button.

    - Generated on the server so that no extra template file is needed
      (the required set of four templates is kept).
    - Only numbers / IDs are interpolated, never user input, so the XSS
      risk is low.

    Args:
        prompt_id: ID used in the element id and in the ``/like/{id}`` URL.
        liked: Whether the caller already liked the prompt; when true the
            button is rendered disabled with the "liked" label and styling.
        like_count: Number shown in parentheses next to the label.

    Returns:
        The HTML fragment with leading/trailing whitespace stripped.
    """
    if liked:
        label, disabled = "좋아요✓", "disabled"
        state_classes = "bg-gray-100 text-gray-500 cursor-not-allowed"
    else:
        label, disabled = "좋아요", ""
        state_classes = "bg-white hover:bg-gray-50"
    btn_classes = "px-3 py-1 rounded border text-sm " + state_classes

    markup = f"""
    <div id="like-{prompt_id}">
      <button
        class="{btn_classes}"
        hx-post="/like/{prompt_id}"
        hx-target="#like-{prompt_id}"
        hx-swap="outerHTML"
        {disabled}
      >
        {label} <span class="font-semibold">({like_count})</span>
      </button>
    </div>
    """
    return markup.strip()
|
|
|
|
|
|
def _paginate(page: int) -> tuple[int, int]:
    """Clamp ``page`` to a minimum of 1 and compute its row offset.

    Returns:
        ``(page, offset)`` where ``offset`` is ``(page - 1) * PAGE_SIZE``.
    """
    current = 1 if page < 1 else page
    return current, (current - 1) * PAGE_SIZE
|
|
|
|
|
|
def _list_prompts(
    db: Session,
    request: Request,
    *,
    q: str | None,
    page: int,
    category_id: int | None,
) -> dict[str, Any]:
    """Shared query logic for the list and search pages.

    Args:
        db: Database session.
        request: Current request; used to derive the caller's identifier
            so that already-liked prompts can be marked.
        q: Optional search term matched case-insensitively against prompt
            title and content. ``None`` / blank disables the text filter.
        page: 1-based page number (values below 1 are clamped to 1).
        category_id: Optional category filter; a falsy value disables it.

    Returns:
        Template context with the keys ``page``, ``page_size``, ``total``,
        ``prompts``, ``categories``, ``like_counts`` (prompt id -> count),
        ``liked_ids`` (prompt ids liked by the caller), ``category_id``,
        ``q`` and ``now``.
    """
    page, offset = _paginate(page)

    base = db.query(Prompt).join(Category, Prompt.category_id == Category.id)
    if category_id:
        base = base.filter(Prompt.category_id == category_id)

    # lower + LIKE keeps case differences small on SQLite (ultra-light MVP).
    needle = (q or "").strip().lower()
    if needle:
        # Escape LIKE metacharacters so that "%" and "_" typed by the user
        # match those literal characters instead of acting as wildcards.
        escaped = (
            needle.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        )
        pattern = f"%{escaped}%"
        base = base.filter(
            or_(
                func.lower(Prompt.title).like(pattern, escape="\\"),
                func.lower(Prompt.content).like(pattern, escape="\\"),
            )
        )

    total = base.with_entities(func.count(Prompt.id)).scalar() or 0
    prompts = (
        # The id tiebreaker keeps page boundaries stable when several
        # prompts share the same created_at value.
        base.order_by(Prompt.created_at.desc(), Prompt.id.desc())
        .offset(offset)
        .limit(PAGE_SIZE)
        .all()
    )

    prompt_ids = [p.id for p in prompts]

    like_counts: dict[int, int] = {}
    liked_ids: set[int] = set()
    if prompt_ids:
        # One grouped query for the counts of every prompt on this page.
        like_counts_rows = (
            db.query(Like.prompt_id, func.count(Like.id))
            .filter(Like.prompt_id.in_(prompt_ids))
            .group_by(Like.prompt_id)
            .all()
        )
        like_counts = {pid: int(cnt) for pid, cnt in like_counts_rows}

        # Which of these prompts the current caller has already liked.
        ident = user_identifier(request)
        liked_rows = (
            db.query(Like.prompt_id)
            .filter(Like.prompt_id.in_(prompt_ids), Like.user_identifier == ident)
            .all()
        )
        liked_ids = {pid for (pid,) in liked_rows}

    categories = db.query(Category).order_by(Category.name.asc()).all()

    return {
        "page": page,
        "page_size": PAGE_SIZE,
        "total": total,
        "prompts": prompts,
        "categories": categories,
        "like_counts": like_counts,
        "liked_ids": liked_ids,
        "category_id": category_id,
        "q": q or "",
        "now": datetime.now(),
    }
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
def index(
    request: Request,
    page: int = 1,
    category: int | None = None,
    db: Session = Depends(get_db),
):
    """Render the prompt list (no text search), optionally by category."""
    ctx = _list_prompts(db, request, q=None, page=page, category_id=category)
    # Values every page template receives.
    ctx["request"] = request
    ctx["app_name"] = APP_NAME
    ctx["app_ko_name"] = APP_KO_NAME
    ctx["nickname"] = current_nickname(request)
    return templates.TemplateResponse("index.html", ctx)
|
|
|
|
|
|
@app.get("/search", response_class=HTMLResponse)
def search(
    request: Request,
    q: str = "",
    page: int = 1,
    category: int | None = None,
    db: Session = Depends(get_db),
):
    """Render the list page filtered by the (stripped) search term ``q``."""
    term = (q or "").strip()
    listing = _list_prompts(db, request, q=term, page=page, category_id=category)
    context = {
        **listing,
        "request": request,
        "app_name": APP_NAME,
        "app_ko_name": APP_KO_NAME,
        "nickname": current_nickname(request),
    }
    return templates.TemplateResponse("index.html", context)
|
|
|
|
|
|
@app.get("/prompt/{id}", response_class=HTMLResponse)
def prompt_detail(
    request: Request,
    id: int,
    db: Session = Depends(get_db),
):
    """Render the detail page of one prompt.

    Raises:
        HTTPException: 404 when no prompt with the given id exists.
    """
    prompt = db.query(Prompt).filter(Prompt.id == id).first()
    if not prompt:
        raise HTTPException(status_code=404, detail="프롬프트를 찾을 수 없습니다.")

    like_count = db.query(func.count(Like.id)).filter(Like.prompt_id == id).scalar() or 0

    # Has the current caller already liked this prompt?
    ident = user_identifier(request)
    own_like = (
        db.query(Like.id)
        .filter(Like.prompt_id == id, Like.user_identifier == ident)
        .first()
    )
    liked = own_like is not None

    categories = db.query(Category).order_by(Category.name.asc()).all()

    context = {
        "request": request,
        "app_name": APP_NAME,
        "app_ko_name": APP_KO_NAME,
        "nickname": current_nickname(request),
        "prompt": prompt,
        "categories": categories,
        "like_count": like_count,
        "liked": liked,
    }
    return templates.TemplateResponse("detail.html", context)
|
|
|
|
|
|
@app.get("/new", response_class=HTMLResponse)
def new_prompt_form(
    request: Request,
    db: Session = Depends(get_db),
):
    """Render the empty "new prompt" form with the category list."""
    categories = db.query(Category).order_by(Category.name.asc()).all()
    context = {
        "request": request,
        "app_name": APP_NAME,
        "app_ko_name": APP_KO_NAME,
        "nickname": current_nickname(request),
        "categories": categories,
        "error": None,
    }
    return templates.TemplateResponse("new.html", context)
|
|
|
|
|
|
def _nickname_fits_cookie(value: str) -> bool:
    """Return True when ``value`` can be sent in a Latin-1 encoded header."""
    try:
        value.encode("latin-1")
    except UnicodeEncodeError:
        return False
    return True


@app.post("/new")
def create_prompt(
    request: Request,
    title: str = Form(...),
    content: str = Form(...),
    category_id: int = Form(...),
    description: str | None = Form(None),
    nickname: str | None = Form(None),
    db: Session = Depends(get_db),
):
    """Create a prompt from the submitted form and redirect to its page.

    Args:
        request: Current request (cookies supply nickname and uid).
        title: Prompt title; required after stripping whitespace.
        content: Prompt body; required after stripping whitespace.
        category_id: ID of an existing category.
        description: Optional description; blank is stored as ``None``.
        nickname: Nickname typed into the form; used only when no nickname
            cookie is present.
        db: Database session.

    Returns:
        A 303 redirect to ``/prompt/{id}`` on success, or the form
        re-rendered with status 400 when title or content is blank.

    Raises:
        HTTPException: 400 when ``category_id`` does not exist.
    """
    title = (title or "").strip()
    content = (content or "").strip()
    description = (description or "").strip() or None
    cookie_nick = current_nickname(request)

    if not title or not content:
        categories = db.query(Category).order_by(Category.name.asc()).all()
        return templates.TemplateResponse(
            "new.html",
            {
                "request": request,
                "app_name": APP_NAME,
                "app_ko_name": APP_KO_NAME,
                "nickname": cookie_nick,
                "categories": categories,
                "error": "제목과 프롬프트 내용은 필수입니다.",
            },
            status_code=400,
        )

    # Nickname: the cookie wins, then the form input, and finally an
    # automatic one derived from the uid cookie.
    nick = cookie_nick or (nickname or "").strip()
    if not nick:
        uid = request.cookies.get(COOKIE_UID, "")
        nick = f"익명-{uid[:6] or 'user'}"

    # Make sure the category exists (guards against a bogus ID).
    cat = db.query(Category).filter(Category.id == category_id).first()
    if not cat:
        raise HTTPException(status_code=400, detail="카테고리가 올바르지 않습니다.")

    # Record the nickname in the users table too (eases adding login later).
    _get_or_create_user(db, nick)

    prompt = Prompt(
        title=title,
        content=content,
        description=description,
        category_id=category_id,
        author_nickname=nick,
        copy_count=0,
    )
    db.add(prompt)
    db.commit()
    db.refresh(prompt)

    resp = RedirectResponse(url=f"/prompt/{prompt.id}", status_code=303)
    # Remember the nickname in a cookie ("start without logging in").
    # Response headers are encoded as Latin-1, so a nickname outside that
    # range (e.g. Hangul, including the automatic "익명-…" one) would raise
    # UnicodeEncodeError here, after the prompt is already committed.
    # Such nicknames are therefore not stored in the cookie.
    # TODO(review): to remember them as well, percent-encode the value here
    # and decode it wherever the nickname cookie is read.
    if not cookie_nick and _nickname_fits_cookie(nick):
        resp.set_cookie(
            key=COOKIE_NICKNAME,
            value=nick,
            max_age=60 * 60 * 24 * 365 * 5,
            httponly=False,  # shown in the UI / used as the form default
            samesite="lax",
        )
    return resp
|
|
|
|
|
|
@app.post("/like/{id}", response_class=HTMLResponse)
def like_prompt(
    request: Request,
    id: int,
    db: Session = Depends(get_db),
):
    """Register a like from the caller and return the refreshed button HTML.

    Liking twice is a no-op: an existing like is left as it is, and an
    ``IntegrityError`` raised on commit is rolled back.

    Raises:
        HTTPException: 404 when no prompt with the given id exists.
    """
    # Make sure the prompt exists.
    if not db.query(Prompt.id).filter(Prompt.id == id).first():
        raise HTTPException(status_code=404, detail="프롬프트를 찾을 수 없습니다.")

    ident = user_identifier(request)
    own_like = db.query(Like.id).filter(
        Like.prompt_id == id, Like.user_identifier == ident
    )

    # Only insert when the caller has not liked this prompt yet.
    if not own_like.first():
        db.add(Like(prompt_id=id, user_identifier=ident))
        try:
            db.commit()
        except IntegrityError:
            # The unique constraint may still reject a duplicate; handle it
            # safely.
            db.rollback()

    like_count = db.query(func.count(Like.id)).filter(Like.prompt_id == id).scalar() or 0
    liked = own_like.first() is not None
    return HTMLResponse(_like_container_html(id, liked, int(like_count)))
|
|
|
|
|
|
@app.post("/copy/{id}")
def increment_copy_count(
    request: Request,
    id: int,
    db: Session = Depends(get_db),
):
    """Increase the copy counter of a prompt by one.

    Returns:
        JSON ``{"copy_count": <new value>}``.

    Raises:
        HTTPException: 404 when no prompt with the given id exists.
    """
    # Increment inside a single UPDATE statement so that concurrent requests
    # cannot overwrite each other's count (a Python-side read-modify-write
    # would lose updates). COALESCE treats a NULL counter as 0.
    updated_rows = (
        db.query(Prompt)
        .filter(Prompt.id == id)
        .update(
            {Prompt.copy_count: func.coalesce(Prompt.copy_count, 0) + 1},
            synchronize_session=False,
        )
    )
    if not updated_rows:
        raise HTTPException(status_code=404, detail="프롬프트를 찾을 수 없습니다.")
    db.commit()

    new_count = db.query(Prompt.copy_count).filter(Prompt.id == id).scalar()
    return JSONResponse({"copy_count": int(new_count or 0)})
|
|
|
|
|
|
if __name__ == "__main__":
    # For development convenience uvicorn is started directly from code.
    # Change the port with: `PORT=8001 python main.py`
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=int(os.getenv("PORT", "8000")),
        reload=False,
    )
|
|
|