This commit is contained in:
dsyoon
2025-12-27 14:06:26 +09:00
parent 23f5388c56
commit 46460b77f8
33 changed files with 4600 additions and 1 deletions

1
backend/__init__.py Normal file
View File

@@ -0,0 +1 @@

852
backend/app.py Normal file
View File

@@ -0,0 +1,852 @@
"""backend.app
FastAPI 엔트리포인트.
• 공통 미들웨어/CORS 및 로거 설정
• 엔진 레지스트리(engines.__init__)에서 TOOLS 를 가져와 `/tools` API 에 노출
• 개발챗봇(dev_chatbot) Router 포함 PDF 업로드·벡터검색 등 전용 API 분리
• `/chat` 하나의 엔드포인트로 모든 도구 요청을 처리하며,
- 세션 관리(tool_sessions)
- 이미지 OCR
- knowledge_mode(kb_only/hybrid) 분기
- 각 엔진별 prepare_context 로 컨텍스트 생성
• 실시간 로그(chat.log) 저장
코드를 길게 설명하기보다, 각 섹션(Import / 설정 / 유틸 / Router 등록 / 엔드포인트) 위에
블록 주석을 배치해 가독성을 높였다.
"""
from fastapi import FastAPI, Request, UploadFile, File, Form, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import uvicorn
import os
from datetime import datetime
from typing import List, Dict, Optional
from PIL import Image
import json
import logging
import openai
from dotenv import load_dotenv
# --- LangChain 1.1+ 호환성 패치 (레거시 경로 alias) ---
import sys, importlib, types
try:
import langchain # 확인용
mappings = {
'langchain.docstore.document': 'langchain_community.docstore',
'langchain.text_splitter': 'langchain_text_splitters',
'langchain.callbacks': 'langchain_community.callbacks',
'langchain.callbacks.streaming_stdout': 'langchain_community.callbacks.streaming_stdout',
'langchain.prompts': 'langchain_core.prompts',
'langchain.output_parsers.openai_tools': 'langchain_community.output_parsers.openai_tools',
'langchain.tools': 'langchain_community.tools',
'langchain.tools.render': 'langchain_community.tools.render',
'langchain_ollama': None,
}
for old, new in mappings.items():
if old not in sys.modules:
if new:
try:
mod = importlib.import_module(new)
sys.modules[old] = mod
except Exception:
pass
else:
import types as _t
dummy = _t.ModuleType('langchain_ollama')
class _Dummy:
def __init__(self,*a,**kw):
raise ImportError('langchain_ollama removed')
dummy.OllamaLLM = _Dummy
dummy.OllamaEmbeddings = _Dummy
sys.modules['langchain_ollama']=dummy
# schema.Document alias
if 'langchain.schema' not in sys.modules:
try:
from langchain_core.documents import Document as _Doc
schema_mod = types.ModuleType('langchain.schema')
schema_mod.Document = _Doc
sys.modules['langchain.schema'] = schema_mod
except Exception:
pass
except Exception:
pass
import re
# PDF 관련 로더 미사용
from fastapi.responses import FileResponse
from fastapi.responses import StreamingResponse
import urllib.parse
import psycopg2
from psycopg2.extras import RealDictCursor
import requests
from bs4 import BeautifulSoup
# GxP 챗봇 제거로 관련 컨트롤러 import 삭제
from engines.chatgpt_tool.controller.ChatGPTController import router as chatgpt_router
# .env 파일 로드 (프로젝트 루트에서)
load_dotenv()
# 환경 변수에서 API Key 가져오기
OPEN_API_KEY = os.getenv("OPENAI_API_KEY", "")
if not OPEN_API_KEY:
raise RuntimeError("OPENAI_API_KEY 환경 변수가 설정되어 있지 않습니다. .env 파일을 확인하세요.")
openai_client = openai.OpenAI(api_key=OPEN_API_KEY)
app = FastAPI(title="엔큐톡 AI 채팅 서버", version="1.0.0")
# CORS 설정 (credentials 포함 시 * 를 사용할 수 없음)
DEV_FRONT_URLS = [
"http://localhost:5173",
"http://127.0.0.1:5173",
"http://ncue.net:5173",
os.getenv("FRONTEND_ORIGIN", ""),
]
app.add_middleware(
CORSMiddleware,
allow_origins=[o for o in DEV_FRONT_URLS if o],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(chatgpt_router)
# 파일 경로 설정
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# 프로젝트 루트 디렉터리 (backend 의 상위)
ROOT_DIR = os.path.dirname(BASE_DIR)
# uploads, chroma_db 를 프로젝트 루트로 이동했으므로 해당 경로를 사용
UPLOAD_DIR = os.path.join(ROOT_DIR, "uploads")
os.makedirs(UPLOAD_DIR, exist_ok=True)
# 벡터스토어 디렉터리 (chroma_db)
VECTOR_DIR = os.path.join(ROOT_DIR, "chroma_db")
os.makedirs(VECTOR_DIR, exist_ok=True)
# 도구별 세션 저장소
tool_sessions: Dict[str, List[Dict]] = {}
# circled number map 1-20
CIRCLED = {1:'',2:'',3:'',4:'',5:'',6:'',7:'',8:'',9:'',10:'',11:'',12:'',13:'',14:'',15:'',16:'',17:'',18:'',19:'',20:''}
# 답변이 "정보 없음" 류의 부정적 응답인지 판별하는 간단한 휴리스틱
def is_negative_answer(text: str) -> bool:
"""사용자 질문에 대한 답변이 관련 정보를 찾지 못했음을 나타내는지 확인"""
if not text:
return True
lowers = text.lower()
negative_keywords = [
"없습니다", "없어요", "없다", "찾을 수 없", "파악할 수 없", "모르", "not found",
"정보가 없습니다", "관련 정보가 없습니다", "않습니다", "죄송", "포함되어 있지", "구체적인 설명", "구체적 설명", "구체적으로 언급"
]
return any(kw.lower() in lowers for kw in negative_keywords)
# 도구 정의
from engines import TOOLS
# dev_chatbot 전용 기능 가져오기
from engines.dev_chatbot import (
router as gc_router,
get_vector_store as gc_get_vector_store,
prepare_context as gc_prepare_context,
)
# doc_translation 전용 기능 가져오기
from engines.doc_translation import (
router as doc_translation_router,
prepare_context as doc_translation_prepare_context,
)
# (옵션) GxP 챗봇 벡터 DB 서비스 현재 모듈이 제거되었을 수 있으므로 더미 처리
try:
from engines.chatbot_gxp.service.GxPVectorDBService import GxPVectorDBService
except ModuleNotFoundError:
class GxPVectorDBService: # type: ignore
def __init__(self, *a, **kw):
pass
def similarity_search(self, *a, **kw):
return []
# FastAPI 라우터 등록
app.include_router(gc_router)
app.include_router(doc_translation_router, prefix="/doc_translation")
class ToolInfo(BaseModel):
id: str
name: str
description: str
@app.get("/")
async def root():
return {"message": "엔큐톡 AI 채팅 서버가 실행 중입니다."}
@app.get("/tools", response_model=List[ToolInfo])
async def get_tools():
# 프론트엔드 카드 표시 순서를 사용자 요구에 맞춰 개발챗봇 → GxP 챗봇 순으로 고정하고,
# 나머지 도구들은 기존 등록 순서를 그대로 유지합니다.
preferred_order = ["chatgpt", "chatbot_gxp"] # dev_chatbot 은 맨 뒤로 이동
# preferred_order 에 명시된 도구를 먼저, 이후 나머지 도구를 추가
ordered_ids = [tid for tid in preferred_order if tid in TOOLS]
# dev_chatbot 을 제외한 나머지 도구들 추가
ordered_ids += [tid for tid in TOOLS.keys() if tid not in ordered_ids and tid != "dev_chatbot"]
# 최종적으로 dev_chatbot 을 맨 뒤에 추가
if "dev_chatbot" in TOOLS:
ordered_ids.append("dev_chatbot")
return [
ToolInfo(id=tid, name=TOOLS[tid]["name"], description=TOOLS[tid]["description"])
for tid in ordered_ids
]
class ChatRequest(BaseModel):
message: str
tool_id: str
session_id: Optional[str] = None
class ChatResponse(BaseModel):
response: str
status: str = "success"
session_id: str
tool_name: str
def is_meaningful_text(text):
import re
# 한글 자모(ㄱㄴㅏ 등) 단독 포함 시 = 오타/무의미로 간주
if re.search(r'[\u3130-\u318F]', text):
return False
# 영어·한글·숫자만 추출 후 길이 판단
cleaned = re.sub(r'[^가-힣a-zA-Z0-9]', '', text)
# 반복문자(같은 글자 4회 이상 반복) 무의미
if re.search(r'(.)\1{3,}', cleaned):
return False
# 안내·설명 패턴 필터
if re.search(r'(이미지에서 추출|안내|설명|반환|추출하지 못했습니다)', text):
return False
# 너무 짧으면 의미 없음 (완성형 기준 8자 이상)
return len(cleaned) >= 8
def extract_ocr_text(image_context):
# 실제 OCR 결과만 추출
if image_context.startswith("\n이미지에서 추출한 텍스트:\n"):
extracted = image_context.split("\n이미지에서 추출한 텍스트:\n", 1)[-1].strip()
else:
extracted = image_context.strip()
# 안내문, 설명문, 특수문자 등 의미 없는 값 필터링
if not is_meaningful_text(extracted):
return "[이미지에서 텍스트를 추출하지 못했습니다.]"
return extracted
OCR_COMMANDS = ["텍스트 추출", "텍스트 추출해줘", "텍스트 추출해 주세요", "텍스트 추출해주세요"]
# 로그 디렉터리 및 로거 설정
LOG_DIR = os.path.join(ROOT_DIR, "logs")
os.makedirs(LOG_DIR, exist_ok=True)
LOGGER = logging.getLogger("chat_logger")
if not LOGGER.handlers:
LOGGER.setLevel(logging.INFO)
_fh = logging.FileHandler(os.path.join(LOG_DIR, "chat.log"), encoding="utf-8")
_fh.setFormatter(logging.Formatter('[%(asctime)s] %(message)s'))
LOGGER.addHandler(_fh)
def _safe_chat_log(payload: dict) -> None:
"""logs/chat.log 에 JSON 한 줄로 안전하게 기록한다.
phase(요청/응답), status, reason 등의 필드를 받아도 되고, 없어도 된다.
"""
try:
LOGGER.info(json.dumps(payload, ensure_ascii=False))
except Exception:
# 로깅 중 예외는 서비스 흐름에 영향 주지 않음
pass
# ------------------------
# Auth APIs (login/logout)
# ------------------------
class LoginDTO(BaseModel):
email: str
password: str
def _get_db_conn():
# .env 의 DATABASE_URL(예: postgresql://user:pw@host:port/dbname) 사용
dsn = os.getenv("DATABASE_URL") or os.getenv("POSTGRES_DSN")
if not dsn:
raise RuntimeError("DATABASE_URL 환경변수가 필요합니다")
return psycopg2.connect(dsn)
@app.post("/auth/login")
def auth_login(dto: LoginDTO):
try:
with _get_db_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
# id 컬럼 → user_id 로 변경 반영
cur.execute(
"SELECT user_id, email FROM users WHERE email=%s AND password = crypt(%s, password)",
(dto.email, dto.password),
)
row = cur.fetchone()
if not row:
raise HTTPException(status_code=401, detail="invalid credentials")
return {"user_id": row["user_id"], "email": row["email"]}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/auth/logout")
def auth_logout():
return {"ok": True}
# ------------------------
# Community: QnA Board APIs
# ------------------------
class QnaCreateDTO(BaseModel):
title: str
content: str
author_id: Optional[str] = None
author_email: Optional[str] = None
@app.get("/community/qna")
def list_qna():
with _get_db_conn() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("SELECT id, title, content, created_at, updated_at, views, author_id FROM qna_board ORDER BY id DESC")
rows = cur.fetchall() or []
return rows
@app.post("/community/qna")
def create_qna(dto: QnaCreateDTO):
with _get_db_conn() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
# resolve author_no from id/no/email
author_no = None
if dto.author_id is not None:
try:
num = int(dto.author_id)
cur.execute("SELECT no FROM users WHERE no=%s", (num,))
r = cur.fetchone()
if r:
author_no = r["no"]
except Exception:
cur.execute("SELECT no FROM users WHERE user_id=%s", (dto.author_id,))
r = cur.fetchone()
if r:
author_no = r["no"]
if author_no is None and dto.author_email:
cur.execute("SELECT no FROM users WHERE email=%s", (dto.author_email,))
r = cur.fetchone()
if r:
author_no = r["no"]
if author_no is None:
# allow anonymous with default author from env
default_id = os.getenv("DEFAULT_QNA_AUTHOR_ID", "admin")
default_email = os.getenv("DEFAULT_QNA_AUTHOR_EMAIL")
if default_email:
cur.execute("SELECT no FROM users WHERE email=%s", (default_email,))
r = cur.fetchone()
if r:
author_no = r["no"]
if author_no is None and default_id:
cur.execute("SELECT no FROM users WHERE user_id=%s", (default_id,))
r = cur.fetchone()
if r:
author_no = r["no"]
if author_no is None:
raise HTTPException(status_code=400, detail="author not found; set DEFAULT_QNA_AUTHOR_ID or pass author_email/id")
cur.execute(
"INSERT INTO qna_board (title, content, author_id) VALUES (%s, %s, %s) RETURNING id",
(dto.title, dto.content, author_no),
)
new_id = cur.fetchone()["id"]
conn.commit()
return {"id": new_id}
@app.get("/community/qna/{qid}")
def get_qna(qid: int, increase: int = 1):
"""단건 조회. increase=1 일 때 조회수 +1 처리 후 반환."""
with _get_db_conn() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
if increase:
cur.execute("UPDATE qna_board SET views=views+1, updated_at=NOW() WHERE id=%s", (qid,))
conn.commit()
cur.execute("SELECT id, title, content, created_at, updated_at, views, author_id FROM qna_board WHERE id=%s", (qid,))
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="not found")
return row
# ------------------------
# Community: AI News Board APIs
# ------------------------
class AiNewsCreateDTO(BaseModel):
url: str
author_id: Optional[str] = None
author_email: Optional[str] = None
def _extract_og(url: str) -> dict:
meta = {"title": "", "description": "", "image": "", "url": url}
try:
resp = requests.get(url, timeout=5, headers={"User-Agent": "Mozilla/5.0"})
if resp.ok:
soup = BeautifulSoup(resp.text, 'html.parser')
og_title = soup.find('meta', property='og:title')
og_desc = soup.find('meta', property='og:description')
og_img = soup.find('meta', property='og:image')
title_tag = soup.find('title')
meta["title"] = (og_title["content"].strip() if og_title and og_title.get("content") else (title_tag.text.strip() if title_tag else ""))
meta["description"] = (og_desc["content"].strip() if og_desc and og_desc.get("content") else "")
meta["image"] = (og_img["content"].strip() if og_img and og_img.get("content") else "")
except Exception:
pass
return meta
@app.get("/community/ai_news")
def list_ai_news(offset: int = 0, limit: int = 10):
with _get_db_conn() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(
"SELECT id, url, created_at, updated_at, views, author_id FROM ai_news_board ORDER BY id DESC LIMIT %s OFFSET %s",
(limit, offset),
)
rows = cur.fetchall() or []
enriched = []
for r in rows:
og = _extract_og(r["url"]) if r.get("url") else {"title":"","description":"","image":"","url":r.get("url")}
r.update({"meta": og})
enriched.append(r)
return {"items": enriched, "nextOffset": offset + len(enriched)}
@app.post("/community/ai_news")
def create_ai_news(dto: AiNewsCreateDTO):
with _get_db_conn() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
# resolve author
author_no = None
if dto.author_id is not None:
try:
num = int(dto.author_id)
cur.execute("SELECT no FROM users WHERE no=%s", (num,))
r = cur.fetchone()
if r:
author_no = r["no"]
except Exception:
cur.execute("SELECT no FROM users WHERE user_id=%s", (dto.author_id,))
r = cur.fetchone()
if r:
author_no = r["no"]
if author_no is None and dto.author_email:
cur.execute("SELECT no FROM users WHERE email=%s", (dto.author_email,))
r = cur.fetchone()
if r:
author_no = r["no"]
if author_no is None:
default_id = os.getenv("DEFAULT_QNA_AUTHOR_ID", "admin")
cur.execute("SELECT no FROM users WHERE user_id=%s", (default_id,))
r = cur.fetchone()
if r:
author_no = r["no"]
if author_no is None:
raise HTTPException(status_code=400, detail="author not found")
cur.execute("INSERT INTO ai_news_board (url, author_id) VALUES (%s, %s) RETURNING id", (dto.url, author_no))
new_id = cur.fetchone()["id"]
conn.commit()
return {"id": new_id}
@app.get("/community/ai_news/{nid}")
def get_ai_news(nid: int, increase: int = 1):
with _get_db_conn() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
if increase:
cur.execute("UPDATE ai_news_board SET views=views+1, updated_at=NOW() WHERE id=%s", (nid,))
conn.commit()
cur.execute("SELECT id, url, created_at, updated_at, views, author_id FROM ai_news_board WHERE id=%s", (nid,))
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="not found")
row.update({"meta": _extract_og(row["url"])})
return row
@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(
message: str = Form(...),
tool_id: str = Form(...),
session_id: Optional[str] = Form(None),
image: List[UploadFile] = File(None),
model: str = Form("gpt-5"),
ocr_model: str = Form("none"),
knowledge_mode: str = Form("hybrid"),
):
try:
# 세션 ID를 가장 먼저 확보하여, 모든 분기에서 동일 ID로 로깅되도록 함
if not session_id:
# tool_id 가 잘못되었더라도 임시 세션 ID를 생성하여 추적 가능하게 처리
safe_tool = tool_id if tool_id else "unknown"
session_id = f"{safe_tool}_{datetime.now().strftime('%Y%m%d%H%M%S%f')}"
# 요청 수신 로그 (항상 기록)
_safe_chat_log({
"timestamp": datetime.now().isoformat(),
"phase": "request",
"tool_id": tool_id,
"session_id": session_id,
"model": model,
"ocr_model": ocr_model,
"knowledge_mode": knowledge_mode,
"user_message": message,
})
# ---- 1) 입력 검증: 무의미한 메시지 필터링 ----
# ChatGPT, 문서번역 도구는 간단 인사말이나 모든 텍스트를 처리해야 하므로 필터를 우회
if tool_id not in ["chatgpt", "doc_translation"] and not is_meaningful_text(message):
_safe_chat_log({
"timestamp": datetime.now().isoformat(),
"phase": "response",
"status": "error",
"reason": "meaningless_text",
"tool_id": tool_id,
"session_id": session_id,
"user_message": message,
})
return ChatResponse(
response="질문이 명확하지 않습니다. 좀 더 구체적인 내용을 입력해 주세요.",
status="error",
session_id=session_id or "",
tool_name=""
)
# 도구 ID 검증
if tool_id not in TOOLS:
_safe_chat_log({
"timestamp": datetime.now().isoformat(),
"phase": "response",
"status": "error",
"reason": "invalid_tool_id",
"tool_id": tool_id,
"session_id": session_id,
"user_message": message,
})
return ChatResponse(
response="유효하지 않은 도구입니다.",
status="error",
session_id="",
tool_name=""
)
# ------------------------------------------------------------------
# chatbot_gxp 은 GxPChatController.chat 로 위임처리 (중복 로직 제거)
# ------------------------------------------------------------------
if tool_id == "chatbot_gxp":
# GxPChatController.chat 은 query, session_id 를 매개변수로 받음
gxp_resp = await GxPChatController.chat(query=message, session_id=session_id)
# 오류 코드 처리
if gxp_resp.status_code != 200:
try:
err_detail = json.loads(gxp_resp.body.decode())
err_msg = err_detail.get("error", "GxP 챗봇 처리 중 오류가 발생했습니다.")
except Exception:
err_msg = "GxP 챗봇 처리 중 오류가 발생했습니다."
_safe_chat_log({
"timestamp": datetime.now().isoformat(),
"phase": "response",
"status": "error",
"tool_id": tool_id,
"session_id": session_id,
"user_message": message,
"response": err_msg,
})
return ChatResponse(
response=err_msg,
status="error",
session_id=session_id,
tool_name=TOOLS[tool_id]["name"],
)
# 정상 응답 처리
data = json.loads(gxp_resp.body.decode()) if isinstance(gxp_resp.body, (bytes, bytearray)) else gxp_resp.body
answer_text = data.get("answer", "") if isinstance(data, dict) else str(data)
_safe_chat_log({
"timestamp": datetime.now().isoformat(),
"phase": "response",
"status": "success",
"tool_id": tool_id,
"session_id": session_id,
"user_message": message,
"response": answer_text[:1000],
})
return ChatResponse(
response=answer_text,
session_id=session_id,
tool_name=TOOLS[tool_id]["name"],
)
# 세션 초기화
if session_id not in tool_sessions:
tool_sessions[session_id] = []
# 이미지 처리 (생략, 기존 코드 유지)
image_context = ""
if image:
ocr_texts = []
for img in image:
filename = f"{datetime.now().strftime('%Y%m%d%H%M%S%f')}_{img.filename}"
image_path = os.path.join(UPLOAD_DIR, filename)
with open(image_path, "wb") as f:
f.write(await img.read())
try:
ocr_result = "[OCR 비활성화]"
if ocr_result:
ocr_texts.append(ocr_result)
except Exception as ocr_err:
ocr_texts.append(f"[OCR 실패: {ocr_err}]")
if ocr_texts:
image_context = f"\n이미지에서 추출한 텍스트:\n{chr(10).join(ocr_texts)}\n"
else:
image_context = "\n[이미지에서 텍스트를 추출하지 못했습니다.]\n"
# 대화 기록 구성
conversation_history = ""
if tool_sessions[session_id]:
history = tool_sessions[session_id][-5:]
conversation_history = "\n".join([
f"{'사용자' if msg['role'] == 'user' else 'AI'}: {msg['content']}"
for msg in history
])
# 프롬프트 구성
tool_info = TOOLS[tool_id]
system_prompt = tool_info["system_prompt"]
# 벡터 검색 (dev_chatbot 전용)
context_text = ""
retrieved_docs = []
if tool_id == "dev_chatbot":
context_text, retrieved_docs = gc_prepare_context(message, knowledge_mode, gc_get_vector_store())
if knowledge_mode == "kb_only" and not context_text:
return ChatResponse(response="지식베이스에 관련 정보가 없습니다.", session_id=session_id, tool_name=tool_info["name"])
# ---- GxP 챗봇 전용 벡터 검색 ----
if tool_id == "chatbot_gxp":
gxp_vec_service = GxPVectorDBService()
retrieved_docs = gxp_vec_service.similarity_search(query=message)
if retrieved_docs:
# 페이지별 최대 2개, 길이 1200자 제한 snippet 생성
page_groups = {}
for d in retrieved_docs:
meta = d.get("metadata", {}) if isinstance(d, dict) else d.metadata
pg = meta.get("page") or meta.get("page_number", "")
filename = meta.get("filename") or meta.get("source", "")
key = (filename, pg)
page_groups.setdefault(key, []).append(d.get("content") if isinstance(d, dict) else d.page_content)
selected = list(page_groups.keys())[:2]
snippets = []
for (fname, pg) in selected:
joined = "\n".join(page_groups[(fname, pg)])[:1200]
snippets.append(f"[출처:{fname} p{pg}]\n{joined}")
context_text = "\n\n[관련 문서 발췌]\n" + "\n---\n".join(snippets)
# ---- 문서번역 전용 처리 ----
if tool_id == "doc_translation":
# 실시간 채팅 번역 처리 (모델 선택 지원)
translated_response = doc_translation_prepare_context(message, model=model)
_safe_chat_log({
"timestamp": datetime.now().isoformat(),
"phase": "response",
"status": "success",
"tool_id": tool_id,
"session_id": session_id,
"user_message": message,
"response": translated_response[:1000],
})
return ChatResponse(response=translated_response, session_id=session_id, tool_name=tool_info["name"])
# kb_only 모드용 추가 지침
if knowledge_mode == "kb_only":
system_prompt += "\n\n주의: 반드시 위의 [관련 문서 발췌] 내용에 근거해서만 답변하고, 모르면 모른다고 답해라."
full_prompt = f"""{system_prompt}
{context_text}
{image_context}
{conversation_history}
사용자: {message}
AI:"""
response = ""
completion = openai_client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": system_prompt},
*( [{"role": "user" if msg['role']=="user" else "assistant", "content": msg['content']} for msg in tool_sessions[session_id][-5:]] if tool_sessions[session_id] else [] ),
{"role": "user", "content": (image_context + message) if image_context else message}
]
)
response = completion.choices[0].message.content.strip()
# ----------------------------------------------------------
# 공통: GxP 챗봇 근거 문서명/페이지 삽입
# ----------------------------------------------------------
if tool_id == "chatbot_gxp" and retrieved_docs:
cites = []
seen = set()
for d in retrieved_docs:
meta = d.get("metadata", {}) if isinstance(d, dict) else d.metadata
fname = meta.get("filename") or meta.get("source") or meta.get("path")
pg = meta.get("page") or meta.get("page_number")
if fname and pg and (fname, pg) not in seen:
cites.append(f"**(문서명: {fname}, 페이지: {pg})**")
seen.add((fname, pg))
if len(cites) >= 3:
break
if cites:
response = response.rstrip() + "\n\n" + "\n".join(cites)
# 응답 앞부분의 불필요한 사과 문구 제거 (의미 있는 답변인데 "죄송합니다" 로 시작하는 경우)
if re.match(r"^죄송합니다[.,]?\s*", response, flags=re.I) and not is_negative_answer(response):
response = re.sub(r"^죄송합니다[.,]?\s*", "", response, flags=re.I)
# 세션에 대화 기록 저장
tool_sessions[session_id].append({
"role": "user",
"content": message,
"timestamp": datetime.now().isoformat()
})
tool_sessions[session_id].append({
"role": "assistant",
"content": response,
"timestamp": datetime.now().isoformat()
})
# 세션 크기 제한 (최대 20개 메시지)
if len(tool_sessions[session_id]) > 20:
tool_sessions[session_id] = tool_sessions[session_id][-20:]
# dev_chatbot: 인라인 "출처:" 문구 제거 후 [출처] 블록 추가
if tool_id == "dev_chatbot" and retrieved_docs and not is_negative_answer(response):
# 1) 기존 응답에서 "출처:"가 포함된 구문/괄호 제거
response = re.sub(r"[\(\[]?출처[:][^\]\)\n]*[\)\]]?", "", response)
# '**출처**:' 형태의 블록 제거
response = re.sub(r"\*\*출처\*\*[:]?\s*(?:\n|\r|.)*?(?=\n{2,}|$)", "", response, flags=re.I)
# '**참고 문헌**' 블록 제거
response = re.sub(r"\*\*참고[^\n]*\n(?:- .*\n?)*", "", response, flags=re.I)
# 2) 파일별 페이지 모음 생성
file_pages = {}
for d in retrieved_docs:
src = d.metadata.get("source", "")
pg_raw = d.metadata.get("page_number", d.metadata.get("page", ""))
if src and str(pg_raw).isdigit():
file_pages.setdefault(src, set()).add(int(pg_raw))
if file_pages:
ref_lines = []
for src, pages in file_pages.items():
sorted_pages = sorted(pages)
# timestamp prefix 제거
display_src = src
ts_part = src.split("_",1)[0]
if len(src.split("_",1))>1 and ts_part.isdigit():
display_src = src.split("_",1)[1]
page_links = []
for p in sorted_pages:
page_num = p
fname_enc = urllib.parse.quote(src)
page_links.append(f"[p{page_num}](/pdf?filename={fname_enc}#page={page_num})")
ref_lines.append(f"{display_src}: " + ", ".join(page_links))
# 출처 블록 삽입 (중복 방지)
response = response.rstrip()
response += "\n\n[출처]\n" + "\n".join(ref_lines)
# 로깅
try:
LOGGER.info(json.dumps({
"timestamp": datetime.now().isoformat(),
"tool_id": tool_id,
"tool_name": tool_info["name"],
"session_id": session_id,
"model": model,
"ocr_model": ocr_model,
"knowledge_mode": knowledge_mode,
"user_message": message,
"response": response[:1000] # 응답 길이 제한
}, ensure_ascii=False))
except Exception:
pass
# 이미지에서 텍스트 추출만 요청한 경우(프롬프트 없이 OCR 결과만 반환)
if image and message.strip() in OCR_COMMANDS:
ocr_only = extract_ocr_text(image_context)
_safe_chat_log({
"timestamp": datetime.now().isoformat(),
"phase": "response",
"status": "success",
"tool_id": tool_id,
"session_id": session_id,
"user_message": message,
"response": ocr_only[:1000],
})
return ChatResponse(
response=ocr_only,
session_id=session_id,
tool_name=tool_info["name"]
)
return ChatResponse(
response=response,
session_id=session_id,
tool_name=tool_info["name"]
)
except Exception as e:
return ChatResponse(
response=f"오류가 발생했습니다: {str(e)}",
status="error",
session_id=session_id if 'session_id' in locals() else "",
tool_name=TOOLS.get(tool_id, {}).get("name", "") if 'tool_id' in locals() else ""
)
@app.get("/sessions/{tool_id}")
async def get_sessions(tool_id: str):
"""특정 도구의 세션 목록을 반환합니다."""
if tool_id not in TOOLS:
return {"error": "유효하지 않은 도구입니다."}
sessions = [
{
"session_id": session_id,
"message_count": len(messages),
"last_updated": messages[-1]["timestamp"] if messages else None
}
for session_id, messages in tool_sessions.items()
if session_id.startswith(f"{tool_id}_")
]
return {"sessions": sessions}
@app.delete("/sessions/{session_id}")
async def delete_session(session_id: str):
"""특정 세션을 삭제합니다."""
if session_id in tool_sessions:
del tool_sessions[session_id]
return {"message": "세션이 삭제되었습니다."}
return {"error": "세션을 찾을 수 없습니다."}
@app.get("/health")
async def health_check():
return {"status": "healthy"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8010)

View File

@@ -0,0 +1,28 @@
"""backend.engines package
엔진 레지스트리.
서브패키지를 순회해 각 엔진 모듈이 노출한
`TOOL_ID` 와 `TOOL_INFO` 를 자동 수집하여 `TOOLS` dict 로 제공한다.
외부(backend.app 등)에서는 from engines import TOOLS 로 공통 접근.
"""
from importlib import import_module
import pkgutil
# Dictionary mapping tool_id to its info
TOOLS = {}
# Discover and import all subpackages to collect TOOL_INFO
package_name = __name__
for _, module_name, is_pkg in pkgutil.iter_modules(__path__):
if not is_pkg:
# Skip if it's not a package (we expect dirs)
continue
try:
module = import_module(f"{package_name}.{module_name}")
# Each engine package must expose TOOL_ID and TOOL_INFO
if hasattr(module, "TOOL_ID") and hasattr(module, "TOOL_INFO"):
TOOLS[module.TOOL_ID] = module.TOOL_INFO
except ModuleNotFoundError:
# If submodule import fails, skip silently
continue

View File

@@ -0,0 +1,11 @@
TOOL_ID = "chatgpt"
TOOL_INFO = {
"name": "ChatGPT",
"description": "OpenAI ChatGPT 모델과 대화할 수 있는 도구입니다.",
"system_prompt": (
"You are ChatGPT, a large language model trained by OpenAI. "
"Provide thorough, well-structured answers in Korean using 전문지식과 신뢰할 수 있는 공개 자료를 바탕으로 설명하세요. 필요 시 표/리스트/소제목을 활용해 가독성을 높이십시오. "
"만약 기업의 위치나 연락처 등 요청이 오면 본사·연구소·공장 등 주요 거점을 빠짐없이 요약해 주세요."
)
}

View File

@@ -0,0 +1,53 @@
from fastapi import APIRouter, Form
from typing import Optional
import os
import openai
from dotenv import load_dotenv
load_dotenv()
OPEN_API_KEY = os.getenv("OPENAI_API_KEY", "")
openai_client = openai.OpenAI(api_key=OPEN_API_KEY)
router = APIRouter(prefix="/chatgpt", tags=["ChatGPT"])
# 모델 매핑 테이블 (프론트 선택값 ➜ OpenAI 모델명)
MODEL_MAP = {
"auto": "gpt-5",
"gpt-5": "gpt-5",
"gpt-5-mini": "gpt-5-mini",
"gpt-5-nano": "gpt-5-nano",
}
SYSTEM_PROMPT = (
"You are ChatGPT, a large language model trained by OpenAI. "
"Provide thorough, well-structured answers in Korean with rich formatting. "
"Always include 적절한 소제목과 불릿, 줄바꿈을 활용하고, 각 소제목 앞에 관련 이모지(예: 🏢 본사, 🧪 연구소, 🏭 공장 등)를 붙여 가독성을 높이십시오. "
"필요하면 표 또는 번호 리스트를 사용하세요. "
"회사의 위치·연락처·교통편을 묻는 질문에는 반드시 본사, 연구소, 공장 등 주요 거점 정보를 빠짐없이 상세히 제공합니다. "
"답변 길이 제한 없이 충분히 상세히 작성하세요."
)
@router.post("/chat")
async def chat_gpt_endpoint(
message: str = Form(...),
model: str = Form("auto"),
session_id: Optional[str] = Form(None)
):
# 모델 매핑
model_name = MODEL_MAP.get(model, "gpt-5")
# 직접 OpenAI ChatCompletion 호출
completion = openai_client.chat.completions.create(
model=model_name,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": message},
]
)
return {
"status": "success",
"response": completion.choices[0].message.content.strip(),
"session_id": session_id or "",
"tool_name": "ChatGPT",
}

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,192 @@
"""backend.engines.dev_chatbot
개발챗봇 엔진 모듈.
구성
1. TOOL_ID / TOOL_INFO 엔진 메타 데이터 (레지스트리에 자동 수집)
2. prepare_context() Question ↔ VectorStore 유사도 검색 후 컨텍스트 생성
3. 벡터/파일 유틸 index_pdf, delete_pdf, get_vector_store
4. FastAPI router PDF 업로드·조회·삭제 등 API
다른 엔진을 만들 때 이 파일을 템플릿으로 삼으면 일관된 구조를 유지할 수 있다.
"""
# General Chatbot engine configuration
TOOL_ID = "dev_chatbot"
TOOL_INFO = {
"name": "개발챗봇",
"description": "일반적인 챗봇입니다.",
"system_prompt": (
"당신은 다목적 AI 어시스턴트입니다. 사용자의 질문에 대해 PDF 문서 분석 및 실시간 웹 검색을 통해 가장 신뢰할 수 있는 정보를 찾아 제공해야 합니다. "
"1) 질문과 관련된 내부·외부 PDF 자료가 있으면 우선적으로 내용을 요약·분석하여 근거와 함께 답변하세요. "
"2) 추론·계산·코드 예시·표·수식이 도움이 되면 적극 활용하세요. "
"3) 답변의 신뢰도를 높이기 위해 항상 출처(페이지 번호, 링크 등)를 명시하고, 확인되지 않은 정보는 가정임을 분명히 밝히세요. "
"4) 모호한 요청이나 추가 정보가 필요한 경우에는 명확한 follow-up 질문을 통해 요구 사항을 파악한 뒤 답변하세요. "
"5) 사실과 다른 정보를 임의로 생성하지 마세요. 필요한 정보가 없을 경우 솔직히 설명하고 가능한 대안을 제시합니다."
"6) 한국어로 대답해주세요."
)
}
from typing import Tuple, List
import os
from langchain.schema import Document
# get_vector_store 함수는 app.py 에 정의되어 주입받는다.
import urllib.parse
from fastapi import APIRouter, UploadFile, File
from fastapi.responses import StreamingResponse
from datetime import datetime
# -------------------------------------------------
# 벡터스토어 및 PDF 색인/삭제 기능
# -------------------------------------------------
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir))
UPLOAD_DIR = os.path.join(ROOT_DIR, "uploads")
os.makedirs(UPLOAD_DIR, exist_ok=True)
# 벡터스토어 기능 비활성화(GPT 전용 모드)
def get_vector_store():
return None
# PDF 처리 기능 비활성화
try:
from langchain.text_splitter import RecursiveCharacterTextSplitter
except ModuleNotFoundError:
from langchain_text_splitters import RecursiveCharacterTextSplitter
try:
import pdfplumber # type: ignore
except ImportError:
pdfplumber = None
def delete_pdf(filename: str):
file_path = os.path.join(UPLOAD_DIR, filename)
if os.path.exists(file_path):
os.remove(file_path)
vectordb = get_vector_store()
try:
vectordb.delete(where={"source": filename, "tool": TOOL_ID})
except Exception:
pass
# -------------------------------------------------
# FastAPI Router (PDF 업로드/조회/삭제)
# -------------------------------------------------
router = APIRouter()
@router.post("/upload_pdf")
async def upload_pdf(files: List[UploadFile] = File(...)):
saved_files = []
for up in files:
if not up.filename.lower().endswith(".pdf"):
continue
filename = f"{datetime.now().strftime('%Y%m%d%H%M%S%f')}_{up.filename}"
file_path = os.path.join(UPLOAD_DIR, filename)
with open(file_path, "wb") as f:
f.write(await up.read())
try:
if pdfplumber is None:
# pdfplumber 미설치 시 인덱싱 건너뜀
return
filename = os.path.basename(file_path)
docs = []
saved_files.append(filename)
except Exception as e:
return {"status": "error", "message": str(e)}
return {"status": "success", "files": saved_files}
@router.get("/files")
async def list_files():
pdfs = [f for f in os.listdir(UPLOAD_DIR) if f.lower().endswith('.pdf')]
return {"files": pdfs}
@router.get("/file_content")
async def get_file_content(filename: str):
file_path = os.path.join(UPLOAD_DIR, filename)
if not os.path.isfile(file_path):
return {"status": "error", "message": "파일을 찾을 수 없습니다."}
try:
# Assuming PDFPlumberLoader is available or needs to be imported
# from langchain.document_loaders import PDFPlumberLoader
# loader = PDFPlumberLoader(file_path)
# docs = loader.load()
# text = "\n\n".join(d.page_content for d in docs)
# return {"status": "success", "content": text}
# Placeholder for actual PDF content extraction if PDFPlumberLoader is not available
return {"status": "success", "content": "PDF content extraction is not implemented in this engine."}
except Exception as e:
return {"status": "error", "message": str(e)}
@router.get("/pdf")
async def serve_pdf(filename: str):
# 1차: uploads 디렉터리
file_path = os.path.join(UPLOAD_DIR, filename)
# 2차: scripts/gxp 원본 디렉터리 (GxP 문서용)
if not os.path.isfile(file_path):
alt_dir = os.path.join(ROOT_DIR, "scripts", "gxp")
file_path = os.path.join(alt_dir, filename)
if not os.path.isfile(file_path):
return {"status":"error","message":"파일을 찾을 수 없습니다."}
file_stream = open(file_path, "rb")
safe_name = urllib.parse.quote(filename)
headers = {"Content-Disposition": f'inline; filename="{safe_name}"'}
return StreamingResponse(file_stream, media_type="application/pdf", headers=headers)
@router.delete("/file")
async def remove_file(filename: str):
try:
delete_pdf(filename)
return {"status": "success"}
except Exception as e:
return {"status": "error", "message": str(e)}
def prepare_context(message: str, knowledge_mode: str, vectordb) -> Tuple[str, List[Document]]:
"""사용자 메시지에 대한 벡터 검색 컨텍스트와 문서 리스트를 반환합니다.
vectordb: langchain_chroma.Chroma 인스턴스 (지연 로드)
"""
context_text = ""
retrieved_docs: List[Document] = []
if not vectordb or vectordb._collection.count() == 0:
return context_text, retrieved_docs
# dev_chatbot 문서만 대상으로 검색
docs_with_scores = vectordb.similarity_search_with_score(
message,
k=4,
filter={"tool": TOOL_ID}
)
score_threshold = 0.9
filtered = [d for d, s in docs_with_scores if s is not None and s < score_threshold]
retrieved_docs = filtered if filtered else [d for d, _ in docs_with_scores[:2]]
if not retrieved_docs:
return context_text, []
# 페이지별 그룹화 후 상위 2페이지만 사용
page_groups = {}
for d in retrieved_docs:
src = d.metadata.get("source", "")
pg = d.metadata.get("page_number", "")
key = (src, pg)
page_groups.setdefault(key, []).append(d.page_content)
selected_pages = list(page_groups.keys())[:2]
snippets = []
for (src, pg) in selected_pages:
texts = page_groups[(src, pg)]
joined = "\n".join(texts)
snippet_txt = joined[:1500]
header = f"[출처:{src} p{pg}]"
snippets.append(header + "\n" + snippet_txt)
context_text = "\n\n[관련 문서 발췌]\n" + "\n---\n".join(snippets)
# retrieved_docs 축소
retrieved_docs = [d for d in retrieved_docs if (d.metadata.get("source",""), d.metadata.get("page_number","")) in selected_pages]
return context_text, retrieved_docs

View File

@@ -0,0 +1,346 @@
"""backend.engines.doc_translation
문서번역 엔진 모듈.
MS Word 문서를 업로드하여 한글을 영어로 번역하는 기능을 제공합니다.
"""
import os
import shutil
import re
from datetime import datetime
from typing import List
from fastapi import APIRouter, UploadFile, File, HTTPException, Form
from fastapi.responses import FileResponse
from docx import Document as DocxDocument
import openai
from dotenv import load_dotenv
# .env 파일 로드
load_dotenv()
TOOL_ID = "doc_translation"
TOOL_INFO = {
"name": "문서번역",
"description": "MS Word 문서를 업로드하면 한글을 영어로 번역합니다.",
"system_prompt": (
"당신은 전문 번역가입니다. 한국어 문서를 영어로 정확하고 자연스럽게 번역해주세요. "
"번역할 때는 원문의 의미와 뉘앙스를 최대한 보존하면서도 영어로 자연스럽게 표현해주세요. "
"대상 문장 다음 줄에 번역 문장을 제공해주세요."
)
}
# 업로드 디렉토리 설정 - 프로젝트 루트/uploads (개발챗봇과 동일)
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir))
UPLOAD_DIR = os.path.join(ROOT_DIR, "uploads")
os.makedirs(UPLOAD_DIR, exist_ok=True)
# OpenAI API 키 설정
openai.api_key = os.getenv("OPENAI_API_KEY")
def translate_text_with_gpt(text: str) -> str:
"""ChatGPT를 사용하여 텍스트를 한글에서 영어로 번역 (채팅용)"""
try:
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
response = client.chat.completions.create(
model="gpt-5",
messages=[
{
"role": "system",
"content": TOOL_INFO["system_prompt"]
},
{
"role": "user",
"content": f"다음 한국어 텍스트를 영어로 번역해주세요:\n\n{text}"
}
]
)
return response.choices[0].message.content
except Exception as e:
print(f"번역 오류: {e}")
return f"번역 오류가 발생했습니다: {str(e)}"
# (Ollama 내부 모델 번역 기능 제거)
def translate_paragraph(paragraph: str) -> str:
"""단일 단락을 번역"""
try:
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
response = client.chat.completions.create(
model="gpt-5",
messages=[
{
"role": "system",
"content": "당신은 전문 번역가입니다. 한국어 문단을 영어로 정확하고 자연스럽게 번역해주세요. 번역문만 제공하세요."
},
{
"role": "user",
"content": f"다음 한국어 문단을 영어로 번역해주세요:\n\n{paragraph}"
}
]
)
return response.choices[0].message.content.strip()
except Exception as e:
print(f"단락 번역 오류: {e}")
return f"[번역 오류: {str(e)}]"
def extract_paragraphs_from_docx(file_path: str) -> List[str]:
"""Word 문서에서 단락별 텍스트 추출"""
try:
doc = DocxDocument(file_path)
paragraphs = []
for paragraph in doc.paragraphs:
if paragraph.text.strip():
paragraphs.append(paragraph.text.strip())
return paragraphs
except Exception as e:
raise HTTPException(status_code=400, detail=f"Word 파일 읽기 오류: {str(e)}")
def create_bilingual_docx(original_paragraphs: List[str], translated_paragraphs: List[str], output_path: str):
"""원본과 번역이 번갈아 나타나는 이중언어 Word 문서 생성"""
try:
doc = DocxDocument()
# 각 단락별로 원본과 번역을 번갈아 추가
for i, (original, translated) in enumerate(zip(original_paragraphs, translated_paragraphs)):
# 원본 단락 추가 (굵은 글씨로)
original_para = doc.add_paragraph()
original_run = original_para.add_run(original)
original_run.bold = True
# 번역 단락 추가 (일반 글씨로)
translated_para = doc.add_paragraph(translated)
# 단락 사이에 여백 추가 (마지막 단락이 아닌 경우)
if i < len(original_paragraphs) - 1:
doc.add_paragraph("") # 빈 줄 추가
doc.save(output_path)
except Exception as e:
raise HTTPException(status_code=500, detail=f"이중언어 파일 생성 오류: {str(e)}")
def prepare_context(question: str, vector_store=None, model: str = "external") -> str:
"""채팅용 컨텍스트 준비 (실시간 번역)"""
if not question.strip():
return ""
# 모델 선택에 따른 번역 처리
translated = translate_text_with_gpt(question)
return translated
# FastAPI Router
router = APIRouter()
@router.post("/upload_doc")
async def upload_document(files: List[UploadFile] = File(...)):
"""MS Word 문서 업로드 및 번역 처리"""
if not files:
raise HTTPException(status_code=400, detail="파일이 업로드되지 않았습니다.")
processed_files = []
for file in files:
# MS Word 파일 확인
if not (file.filename.lower().endswith('.doc') or file.filename.lower().endswith('.docx')):
raise HTTPException(status_code=400, detail="MS Word 파일(.doc, .docx)만 업로드 가능합니다.")
# 파일명 생성: [doc_translation]_admin_YYYYMMDDHHMMSSMMM_원본파일명
timestamp = datetime.now().strftime('%Y%m%d%H%M%S%f')[:-3] # 밀리초 3자리까지
original_name = os.path.splitext(file.filename)[0]
extension = os.path.splitext(file.filename)[1]
uploaded_filename = f"[{TOOL_ID}]_admin_{timestamp}_{original_name}{extension}"
uploaded_path = os.path.join(UPLOAD_DIR, uploaded_filename)
# 파일 저장
try:
with open(uploaded_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# 단락별 텍스트 추출
paragraphs = extract_paragraphs_from_docx(uploaded_path)
if not paragraphs:
raise HTTPException(status_code=400, detail="문서에 번역할 텍스트가 없습니다.")
# 각 단락을 개별적으로 번역
translated_paragraphs = []
for paragraph in paragraphs:
translated = translate_paragraph(paragraph)
translated_paragraphs.append(translated)
# 이중언어 문서 저장 (원본 + 번역)
result_filename = f"[{TOOL_ID}]_admin_{timestamp}_{original_name}_결과{extension}"
result_path = os.path.join(UPLOAD_DIR, result_filename)
create_bilingual_docx(paragraphs, translated_paragraphs, result_path)
processed_files.append({
"original_filename": uploaded_filename,
"result_filename": result_filename,
"status": "success"
})
except Exception as e:
# 업로드된 파일이 있다면 삭제
if os.path.exists(uploaded_path):
os.remove(uploaded_path)
raise HTTPException(status_code=500, detail=f"파일 처리 오류: {str(e)}")
return {"status": "success", "files": processed_files}
@router.get("/files")
async def list_files():
"""업로드된 파일 목록 조회"""
try:
files = []
for filename in os.listdir(UPLOAD_DIR):
if filename.startswith(f"[{TOOL_ID}]") and not filename.endswith("_결과.doc") and not filename.endswith("_결과.docx"):
file_path = os.path.join(UPLOAD_DIR, filename)
stat = os.stat(file_path)
# 결과 파일 존재 여부 확인
# 파일명에서 확장자 분리
base_name, ext = os.path.splitext(filename)
result_filename = f"{base_name}_결과{ext}"
result_exists = os.path.exists(os.path.join(UPLOAD_DIR, result_filename))
files.append({
"filename": filename,
"upload_time": datetime.fromtimestamp(stat.st_ctime).isoformat(),
"size": stat.st_size,
"has_result": result_exists,
"result_filename": result_filename if result_exists else None
})
# 업로드 시간 순으로 정렬 (최신순)
files.sort(key=lambda x: x["upload_time"], reverse=True)
return files
except Exception as e:
raise HTTPException(status_code=500, detail=f"파일 목록 조회 오류: {str(e)}")
@router.get("/download/{filename}")
async def download_file(filename: str):
"""번역된 파일 다운로드"""
file_path = os.path.join(UPLOAD_DIR, filename)
if not os.path.exists(file_path):
raise HTTPException(status_code=404, detail="파일을 찾을 수 없습니다.")
return FileResponse(
path=file_path,
filename=filename,
media_type='application/vnd.openxmlformats-officedocument.wordprocessingml.document'
)
@router.post("/translate_chat")
async def translate_chat(message: str = Form(...)):
"""실시간 채팅 번역"""
if not message.strip():
raise HTTPException(status_code=400, detail="번역할 메시지가 없습니다.")
try:
translated = translate_text_with_gpt(message)
return {"translated_text": translated}
except Exception as e:
raise HTTPException(status_code=500, detail=f"번역 오류: {str(e)}")
@router.delete("/files/{filename}")
async def delete_file(filename: str):
"""파일 삭제 (서버 저장 파일명으로)"""
file_path = os.path.join(UPLOAD_DIR, filename)
if os.path.exists(file_path):
os.remove(file_path)
# 해당하는 결과 파일도 삭제
if not filename.endswith("_결과.doc") and not filename.endswith("_결과.docx"):
base_name, ext = os.path.splitext(filename)
result_filename = f"{base_name}_결과{ext}"
result_path = os.path.join(UPLOAD_DIR, result_filename)
if os.path.exists(result_path):
os.remove(result_path)
return {"status": "success", "message": "파일이 삭제되었습니다."}
@router.delete("/delete_by_original_name/{original_filename}")
async def delete_file_by_original_name(original_filename: str):
"""파일 삭제 (원본 파일명으로) - 외부 접근용"""
try:
# 업로드된 파일 중에서 원본 파일명과 일치하는 파일 찾기
deleted_files = []
for filename in os.listdir(UPLOAD_DIR):
if filename.startswith(f"[{TOOL_ID}]"):
# 실제 파일명 추출
pattern = r'^\[doc_translation\]_admin_\d{14,17}_(.+)$'
match = re.match(pattern, filename)
if match and match.group(1) == original_filename:
# 원본 파일 삭제
file_path = os.path.join(UPLOAD_DIR, filename)
if os.path.exists(file_path):
os.remove(file_path)
deleted_files.append(filename)
# 결과 파일도 삭제
if not filename.endswith("_결과.doc") and not filename.endswith("_결과.docx"):
base_name, ext = os.path.splitext(filename)
result_filename = f"{base_name}_결과{ext}"
result_path = os.path.join(UPLOAD_DIR, result_filename)
if os.path.exists(result_path):
os.remove(result_path)
deleted_files.append(result_filename)
if deleted_files:
return {"status": "success", "message": f"파일이 삭제되었습니다.", "deleted_files": deleted_files}
else:
return {"status": "error", "message": "해당하는 파일을 찾을 수 없습니다."}
except Exception as e:
raise HTTPException(status_code=500, detail=f"파일 삭제 오류: {str(e)}")
@router.get("/list_files")
async def list_all_files():
"""모든 파일 목록 조회 (원본 파일명으로) - 외부 접근용"""
try:
files = []
for filename in os.listdir(UPLOAD_DIR):
if filename.startswith(f"[{TOOL_ID}]") and not filename.endswith("_결과.doc") and not filename.endswith("_결과.docx"):
# 실제 파일명 추출
pattern = r'^\[doc_translation\]_admin_\d{14,17}_(.+)$'
match = re.match(pattern, filename)
if match:
original_name = match.group(1)
file_path = os.path.join(UPLOAD_DIR, filename)
stat = os.stat(file_path)
# 결과 파일 존재 여부 확인
base_name, ext = os.path.splitext(filename)
result_filename = f"{base_name}_결과{ext}"
result_exists = os.path.exists(os.path.join(UPLOAD_DIR, result_filename))
files.append({
"original_filename": original_name,
"server_filename": filename,
"upload_time": datetime.fromtimestamp(stat.st_ctime).isoformat(),
"size": stat.st_size,
"has_result": result_exists,
"result_filename": result_filename if result_exists else None
})
# 업로드 시간 순으로 정렬 (최신순)
files.sort(key=lambda x: x["upload_time"], reverse=True)
return {"status": "success", "files": files}
except Exception as e:
raise HTTPException(status_code=500, detail=f"파일 목록 조회 오류: {str(e)}")