Files
ncuetalk_backend/backend/app.py
2026-01-17 20:05:36 +09:00

914 lines
37 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""backend.app
FastAPI 엔트리포인트.
• 공통 미들웨어/CORS 및 로거 설정
• 엔진 레지스트리(engines.__init__)에서 TOOLS 를 가져와 `/tools` API 에 노출
• 개발챗봇(dev_chatbot) Router 포함 PDF 업로드·벡터검색 등 전용 API 분리
• `/chat` 하나의 엔드포인트로 모든 도구 요청을 처리하며,
- 세션 관리(tool_sessions)
- 이미지 OCR
- knowledge_mode(kb_only/hybrid) 분기
- 각 엔진별 prepare_context 로 컨텍스트 생성
• 실시간 로그(chat.log) 저장
코드를 길게 설명하기보다, 각 섹션(Import / 설정 / 유틸 / Router 등록 / 엔드포인트) 위에
블록 주석을 배치해 가독성을 높였다.
"""
from fastapi import FastAPI, Request, UploadFile, File, Form, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import uvicorn
import os
from datetime import datetime
from typing import List, Dict, Optional
from PIL import Image
import json
import logging
import openai
from dotenv import load_dotenv
# --- LangChain 1.1+ 호환성 패치 (레거시 경로 alias) ---
import sys, importlib, types
try:
import langchain # 확인용
mappings = {
'langchain.docstore.document': 'langchain_community.docstore',
'langchain.text_splitter': 'langchain_text_splitters',
'langchain.callbacks': 'langchain_community.callbacks',
'langchain.callbacks.streaming_stdout': 'langchain_community.callbacks.streaming_stdout',
'langchain.prompts': 'langchain_core.prompts',
'langchain.output_parsers.openai_tools': 'langchain_community.output_parsers.openai_tools',
'langchain.tools': 'langchain_community.tools',
'langchain.tools.render': 'langchain_community.tools.render',
'langchain_ollama': None,
}
for old, new in mappings.items():
if old not in sys.modules:
if new:
try:
mod = importlib.import_module(new)
sys.modules[old] = mod
except Exception:
pass
else:
import types as _t
dummy = _t.ModuleType('langchain_ollama')
class _Dummy:
def __init__(self,*a,**kw):
raise ImportError('langchain_ollama removed')
dummy.OllamaLLM = _Dummy
dummy.OllamaEmbeddings = _Dummy
sys.modules['langchain_ollama']=dummy
# schema.Document alias
if 'langchain.schema' not in sys.modules:
try:
from langchain_core.documents import Document as _Doc
schema_mod = types.ModuleType('langchain.schema')
schema_mod.Document = _Doc
sys.modules['langchain.schema'] = schema_mod
except Exception:
pass
except Exception:
pass
import re
# PDF 관련 로더 미사용
from fastapi.responses import FileResponse
from fastapi.responses import StreamingResponse
import urllib.parse
import psycopg2
from psycopg2.extras import RealDictCursor
import requests
from bs4 import BeautifulSoup
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
# GxP 챗봇 제거로 관련 컨트롤러 import 삭제
from engines.chatgpt_tool.controller.ChatGPTController import router as chatgpt_router
# .env 파일 로드 (프로젝트 루트에서)
load_dotenv()
# 환경 변수에서 API Key 가져오기
OPEN_API_KEY = os.getenv("OPENAI_API_KEY", "")
if not OPEN_API_KEY:
raise RuntimeError("OPENAI_API_KEY 환경 변수가 설정되어 있지 않습니다. .env 파일을 확인하세요.")
openai_client = openai.OpenAI(api_key=OPEN_API_KEY)
app = FastAPI(title="엔큐톡 AI 채팅 서버", version="1.0.0")
# CORS 설정 (credentials 포함 시 * 를 사용할 수 없음)
DEV_FRONT_URLS = [
"http://localhost:5173",
"http://127.0.0.1:5173",
"http://ncue.net:5173",
os.getenv("FRONTEND_ORIGIN", ""),
]
app.add_middleware(
CORSMiddleware,
allow_origins=[o for o in DEV_FRONT_URLS if o],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(chatgpt_router)
# 파일 경로 설정
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# 프로젝트 루트 디렉터리 (backend 의 상위)
ROOT_DIR = os.path.dirname(BASE_DIR)
# uploads, chroma_db 를 프로젝트 루트로 이동했으므로 해당 경로를 사용
UPLOAD_DIR = os.path.join(ROOT_DIR, "uploads")
os.makedirs(UPLOAD_DIR, exist_ok=True)
# 벡터스토어 디렉터리 (chroma_db)
VECTOR_DIR = os.path.join(ROOT_DIR, "chroma_db")
os.makedirs(VECTOR_DIR, exist_ok=True)
# 도구별 세션 저장소
tool_sessions: Dict[str, List[Dict]] = {}
# circled number map 1-20
CIRCLED = {1:'',2:'',3:'',4:'',5:'',6:'',7:'',8:'',9:'',10:'',11:'',12:'',13:'',14:'',15:'',16:'',17:'',18:'',19:'',20:''}
# 답변이 "정보 없음" 류의 부정적 응답인지 판별하는 간단한 휴리스틱
def is_negative_answer(text: str) -> bool:
"""사용자 질문에 대한 답변이 관련 정보를 찾지 못했음을 나타내는지 확인"""
if not text:
return True
lowers = text.lower()
negative_keywords = [
"없습니다", "없어요", "없다", "찾을 수 없", "파악할 수 없", "모르", "not found",
"정보가 없습니다", "관련 정보가 없습니다", "않습니다", "죄송", "포함되어 있지", "구체적인 설명", "구체적 설명", "구체적으로 언급"
]
return any(kw.lower() in lowers for kw in negative_keywords)
# 도구 정의
from engines import TOOLS
# dev_chatbot 전용 기능 가져오기
from engines.dev_chatbot import (
router as gc_router,
get_vector_store as gc_get_vector_store,
prepare_context as gc_prepare_context,
)
# doc_translation 전용 기능 가져오기
from engines.doc_translation import (
router as doc_translation_router,
prepare_context as doc_translation_prepare_context,
)
# (옵션) GxP 챗봇 벡터 DB 서비스 현재 모듈이 제거되었을 수 있으므로 더미 처리
try:
from engines.chatbot_gxp.service.GxPVectorDBService import GxPVectorDBService
except ModuleNotFoundError:
class GxPVectorDBService: # type: ignore
def __init__(self, *a, **kw):
pass
def similarity_search(self, *a, **kw):
return []
# FastAPI 라우터 등록
app.include_router(gc_router)
app.include_router(doc_translation_router, prefix="/doc_translation")
class ToolInfo(BaseModel):
id: str
name: str
description: str
@app.get("/")
async def root():
return {"message": "엔큐톡 AI 채팅 서버가 실행 중입니다."}
@app.get("/tools", response_model=List[ToolInfo])
async def get_tools():
# 프론트엔드 카드 표시 순서를 사용자 요구에 맞춰 개발챗봇 → GxP 챗봇 순으로 고정하고,
# 나머지 도구들은 기존 등록 순서를 그대로 유지합니다.
preferred_order = ["chatgpt", "chatbot_gxp"] # dev_chatbot 은 맨 뒤로 이동
# preferred_order 에 명시된 도구를 먼저, 이후 나머지 도구를 추가
ordered_ids = [tid for tid in preferred_order if tid in TOOLS]
# dev_chatbot 을 제외한 나머지 도구들 추가
ordered_ids += [tid for tid in TOOLS.keys() if tid not in ordered_ids and tid != "dev_chatbot"]
# 최종적으로 dev_chatbot 을 맨 뒤에 추가
if "dev_chatbot" in TOOLS:
ordered_ids.append("dev_chatbot")
return [
ToolInfo(id=tid, name=TOOLS[tid]["name"], description=TOOLS[tid]["description"])
for tid in ordered_ids
]
class ChatRequest(BaseModel):
message: str
tool_id: str
session_id: Optional[str] = None
class ChatResponse(BaseModel):
response: str
status: str = "success"
session_id: str
tool_name: str
def is_meaningful_text(text):
import re
# 한글 자모(ㄱㄴㅏ 등) 단독 포함 시 = 오타/무의미로 간주
if re.search(r'[\u3130-\u318F]', text):
return False
# 영어·한글·숫자만 추출 후 길이 판단
cleaned = re.sub(r'[^가-힣a-zA-Z0-9]', '', text)
# 반복문자(같은 글자 4회 이상 반복) 무의미
if re.search(r'(.)\1{3,}', cleaned):
return False
# 안내·설명 패턴 필터
if re.search(r'(이미지에서 추출|안내|설명|반환|추출하지 못했습니다)', text):
return False
# 너무 짧으면 의미 없음 (완성형 기준 8자 이상)
return len(cleaned) >= 8
def extract_ocr_text(image_context):
# 실제 OCR 결과만 추출
if image_context.startswith("\n이미지에서 추출한 텍스트:\n"):
extracted = image_context.split("\n이미지에서 추출한 텍스트:\n", 1)[-1].strip()
else:
extracted = image_context.strip()
# 안내문, 설명문, 특수문자 등 의미 없는 값 필터링
if not is_meaningful_text(extracted):
return "[이미지에서 텍스트를 추출하지 못했습니다.]"
return extracted
OCR_COMMANDS = ["텍스트 추출", "텍스트 추출해줘", "텍스트 추출해 주세요", "텍스트 추출해주세요"]
# 로그 디렉터리 및 로거 설정
LOG_DIR = os.path.join(ROOT_DIR, "logs")
os.makedirs(LOG_DIR, exist_ok=True)
LOGGER = logging.getLogger("chat_logger")
if not LOGGER.handlers:
LOGGER.setLevel(logging.INFO)
_fh = logging.FileHandler(os.path.join(LOG_DIR, "chat.log"), encoding="utf-8")
_fh.setFormatter(logging.Formatter('[%(asctime)s] %(message)s'))
LOGGER.addHandler(_fh)
def _safe_chat_log(payload: dict) -> None:
"""logs/chat.log 에 JSON 한 줄로 안전하게 기록한다.
phase(요청/응답), status, reason 등의 필드를 받아도 되고, 없어도 된다.
"""
try:
LOGGER.info(json.dumps(payload, ensure_ascii=False))
except Exception:
# 로깅 중 예외는 서비스 흐름에 영향 주지 않음
pass
# ------------------------
# Auth APIs (login/logout)
# ------------------------
class LoginDTO(BaseModel):
email: str
password: str
def _get_db_conn():
# .env 의 DATABASE_URL(예: postgresql://user:pw@host:port/dbname) 사용
dsn = os.getenv("DATABASE_URL") or os.getenv("POSTGRES_DSN")
if not dsn:
raise RuntimeError("DATABASE_URL 환경변수가 필요합니다")
return psycopg2.connect(dsn)
@app.post("/auth/login")
def auth_login(dto: LoginDTO):
try:
with _get_db_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
# id 컬럼 → user_id 로 변경 반영
cur.execute(
"SELECT user_id, email FROM users WHERE email=%s AND password = crypt(%s, password)",
(dto.email, dto.password),
)
row = cur.fetchone()
if not row:
raise HTTPException(status_code=401, detail="invalid credentials")
return {"user_id": row["user_id"], "email": row["email"]}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/auth/logout")
def auth_logout():
return {"ok": True}
# ------------------------
# Community: QnA Board APIs
# ------------------------
class QnaCreateDTO(BaseModel):
title: str
content: str
author_id: Optional[str] = None
author_email: Optional[str] = None
@app.get("/community/qna")
def list_qna():
with _get_db_conn() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("SELECT id, title, content, created_at, updated_at, views, author_id FROM qna_board ORDER BY id DESC")
rows = cur.fetchall() or []
return rows
@app.post("/community/qna")
def create_qna(dto: QnaCreateDTO):
with _get_db_conn() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
# resolve author_no from id/no/email
author_no = None
if dto.author_id is not None:
try:
num = int(dto.author_id)
cur.execute("SELECT no FROM users WHERE no=%s", (num,))
r = cur.fetchone()
if r:
author_no = r["no"]
except Exception:
cur.execute("SELECT no FROM users WHERE user_id=%s", (dto.author_id,))
r = cur.fetchone()
if r:
author_no = r["no"]
if author_no is None and dto.author_email:
cur.execute("SELECT no FROM users WHERE email=%s", (dto.author_email,))
r = cur.fetchone()
if r:
author_no = r["no"]
if author_no is None:
# allow anonymous with default author from env
default_id = os.getenv("DEFAULT_QNA_AUTHOR_ID", "admin")
default_email = os.getenv("DEFAULT_QNA_AUTHOR_EMAIL")
if default_email:
cur.execute("SELECT no FROM users WHERE email=%s", (default_email,))
r = cur.fetchone()
if r:
author_no = r["no"]
if author_no is None and default_id:
cur.execute("SELECT no FROM users WHERE user_id=%s", (default_id,))
r = cur.fetchone()
if r:
author_no = r["no"]
if author_no is None:
raise HTTPException(status_code=400, detail="author not found; set DEFAULT_QNA_AUTHOR_ID or pass author_email/id")
cur.execute(
"INSERT INTO qna_board (title, content, author_id) VALUES (%s, %s, %s) RETURNING id",
(dto.title, dto.content, author_no),
)
new_id = cur.fetchone()["id"]
conn.commit()
return {"id": new_id}
@app.get("/community/qna/{qid}")
def get_qna(qid: int, increase: int = 1):
"""단건 조회. increase=1 일 때 조회수 +1 처리 후 반환."""
with _get_db_conn() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
if increase:
cur.execute("UPDATE qna_board SET views=views+1, updated_at=NOW() WHERE id=%s", (qid,))
conn.commit()
cur.execute("SELECT id, title, content, created_at, updated_at, views, author_id FROM qna_board WHERE id=%s", (qid,))
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="not found")
return row
# ------------------------
# Community: AI News Board APIs
# ------------------------
class AiNewsCreateDTO(BaseModel):
url: str
author_id: Optional[str] = None
author_email: Optional[str] = None
def _decode_html(resp: requests.Response) -> str:
encoding = resp.encoding
if not encoding or encoding.lower() == "iso-8859-1":
encoding = resp.apparent_encoding
try:
return resp.content.decode(encoding or "utf-8", errors="replace")
except Exception:
return resp.content.decode("utf-8", errors="replace")
def _extract_og(url: str) -> dict:
meta = {"title": "", "description": "", "image": "", "url": url}
try:
# 외부 사이트 응답 지연이 잦아 타임아웃을 짧게 유지(캐시 + 병렬로 커버)
resp = requests.get(url, timeout=2.5, headers={"User-Agent": "Mozilla/5.0"})
if resp.ok:
html = _decode_html(resp)
soup = BeautifulSoup(html, "html.parser")
og_title = soup.find("meta", property="og:title")
og_desc = soup.find("meta", property="og:description")
og_img = soup.find("meta", property="og:image")
title_tag = soup.find("title")
meta["title"] = (og_title["content"].strip() if og_title and og_title.get("content") else (title_tag.text.strip() if title_tag else ""))
meta["description"] = (og_desc["content"].strip() if og_desc and og_desc.get("content") else "")
meta["image"] = (og_img["content"].strip() if og_img and og_img.get("content") else "")
except Exception:
pass
return meta
# --- OG cache (in-memory) ---
# 목적: 리스트 로딩 시 매번 외부 사이트를 때려 3~10초 지연되는 문제를 방지.
# 운영 시 Redis 같은 외부 캐시가 더 좋지만, 우선 체감 개선을 위해 프로세스 메모리 캐시를 사용한다.
_OG_CACHE_LOCK = threading.Lock()
_OG_CACHE: Dict[str, Dict[str, object]] = {} # url -> {"ts": float, "meta": dict}
_OG_CACHE_TTL_SEC = float(os.getenv("OG_CACHE_TTL_SEC", "3600")) # default 1h
_OG_CACHE_MAX = int(os.getenv("OG_CACHE_MAX", "2000"))
def _extract_og_cached(url: str) -> dict:
now = time.time()
with _OG_CACHE_LOCK:
ent = _OG_CACHE.get(url)
if ent and (now - float(ent.get("ts", 0))) < _OG_CACHE_TTL_SEC:
return ent.get("meta") or {"title": "", "description": "", "image": "", "url": url}
meta = _extract_og(url)
with _OG_CACHE_LOCK:
_OG_CACHE[url] = {"ts": now, "meta": meta}
# 단순한 크기 제한 (초과 시 오래된 엔트리부터 정리)
if len(_OG_CACHE) > _OG_CACHE_MAX:
items = sorted(_OG_CACHE.items(), key=lambda kv: float(kv[1].get("ts", 0)))
for k, _ in items[: max(1, len(_OG_CACHE) - _OG_CACHE_MAX)]:
_OG_CACHE.pop(k, None)
return meta
@app.get("/community/ai_news")
def list_ai_news(offset: int = 0, limit: int = 10):
with _get_db_conn() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(
"SELECT id, url, created_at, updated_at, views, author_id FROM ai_news_board ORDER BY id DESC LIMIT %s OFFSET %s",
(limit, offset),
)
rows = cur.fetchall() or []
# OG 메타는 외부 요청이므로 병렬 + 캐시로 속도 개선
metas: Dict[str, dict] = {}
urls = [r.get("url") for r in rows if r.get("url")]
if urls:
# limit이 커져도 서버를 과도하게 압박하지 않도록 상한
max_workers = min(8, len(urls))
with ThreadPoolExecutor(max_workers=max_workers) as ex:
fut_map = {ex.submit(_extract_og_cached, u): u for u in urls}
for fut in as_completed(fut_map):
u = fut_map[fut]
try:
metas[u] = fut.result() or {"title": "", "description": "", "image": "", "url": u}
except Exception:
metas[u] = {"title": "", "description": "", "image": "", "url": u}
enriched = []
for r in rows:
u = r.get("url")
r.update({"meta": metas.get(u) if u else {"title": "", "description": "", "image": "", "url": u}})
enriched.append(r)
# Frontend infinite-scroll safety:
# - Return `nextOffset: null` when there is no next page.
# - Otherwise return the next numeric offset.
if len(enriched) < limit:
next_offset = None
else:
next_offset = offset + len(enriched)
return {"items": enriched, "nextOffset": next_offset}
@app.post("/community/ai_news")
def create_ai_news(dto: AiNewsCreateDTO):
with _get_db_conn() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
# resolve author
author_no = None
if dto.author_id is not None:
try:
num = int(dto.author_id)
cur.execute("SELECT no FROM users WHERE no=%s", (num,))
r = cur.fetchone()
if r:
author_no = r["no"]
except Exception:
cur.execute("SELECT no FROM users WHERE user_id=%s", (dto.author_id,))
r = cur.fetchone()
if r:
author_no = r["no"]
if author_no is None and dto.author_email:
cur.execute("SELECT no FROM users WHERE email=%s", (dto.author_email,))
r = cur.fetchone()
if r:
author_no = r["no"]
if author_no is None:
default_id = os.getenv("DEFAULT_QNA_AUTHOR_ID", "admin")
cur.execute("SELECT no FROM users WHERE user_id=%s", (default_id,))
r = cur.fetchone()
if r:
author_no = r["no"]
if author_no is None:
raise HTTPException(status_code=400, detail="author not found")
cur.execute("INSERT INTO ai_news_board (url, author_id) VALUES (%s, %s) RETURNING id", (dto.url, author_no))
new_id = cur.fetchone()["id"]
conn.commit()
return {"id": new_id}
@app.get("/community/ai_news/{nid}")
def get_ai_news(nid: int, increase: int = 1):
with _get_db_conn() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
if increase:
cur.execute("UPDATE ai_news_board SET views=views+1, updated_at=NOW() WHERE id=%s", (nid,))
conn.commit()
cur.execute("SELECT id, url, created_at, updated_at, views, author_id FROM ai_news_board WHERE id=%s", (nid,))
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="not found")
row.update({"meta": _extract_og(row["url"])})
return row
@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(
message: str = Form(...),
tool_id: str = Form(...),
session_id: Optional[str] = Form(None),
image: List[UploadFile] = File(None),
model: str = Form("gpt-5"),
ocr_model: str = Form("none"),
knowledge_mode: str = Form("hybrid"),
):
try:
# 세션 ID를 가장 먼저 확보하여, 모든 분기에서 동일 ID로 로깅되도록 함
if not session_id:
# tool_id 가 잘못되었더라도 임시 세션 ID를 생성하여 추적 가능하게 처리
safe_tool = tool_id if tool_id else "unknown"
session_id = f"{safe_tool}_{datetime.now().strftime('%Y%m%d%H%M%S%f')}"
# 요청 수신 로그 (항상 기록)
_safe_chat_log({
"timestamp": datetime.now().isoformat(),
"phase": "request",
"tool_id": tool_id,
"session_id": session_id,
"model": model,
"ocr_model": ocr_model,
"knowledge_mode": knowledge_mode,
"user_message": message,
})
# ---- 1) 입력 검증: 무의미한 메시지 필터링 ----
# ChatGPT, 문서번역 도구는 간단 인사말이나 모든 텍스트를 처리해야 하므로 필터를 우회
if tool_id not in ["chatgpt", "doc_translation"] and not is_meaningful_text(message):
_safe_chat_log({
"timestamp": datetime.now().isoformat(),
"phase": "response",
"status": "error",
"reason": "meaningless_text",
"tool_id": tool_id,
"session_id": session_id,
"user_message": message,
})
return ChatResponse(
response="질문이 명확하지 않습니다. 좀 더 구체적인 내용을 입력해 주세요.",
status="error",
session_id=session_id or "",
tool_name=""
)
# 도구 ID 검증
if tool_id not in TOOLS:
_safe_chat_log({
"timestamp": datetime.now().isoformat(),
"phase": "response",
"status": "error",
"reason": "invalid_tool_id",
"tool_id": tool_id,
"session_id": session_id,
"user_message": message,
})
return ChatResponse(
response="유효하지 않은 도구입니다.",
status="error",
session_id="",
tool_name=""
)
# ------------------------------------------------------------------
# chatbot_gxp 은 GxPChatController.chat 로 위임처리 (중복 로직 제거)
# ------------------------------------------------------------------
if tool_id == "chatbot_gxp":
# GxPChatController.chat 은 query, session_id 를 매개변수로 받음
gxp_resp = await GxPChatController.chat(query=message, session_id=session_id)
# 오류 코드 처리
if gxp_resp.status_code != 200:
try:
err_detail = json.loads(gxp_resp.body.decode())
err_msg = err_detail.get("error", "GxP 챗봇 처리 중 오류가 발생했습니다.")
except Exception:
err_msg = "GxP 챗봇 처리 중 오류가 발생했습니다."
_safe_chat_log({
"timestamp": datetime.now().isoformat(),
"phase": "response",
"status": "error",
"tool_id": tool_id,
"session_id": session_id,
"user_message": message,
"response": err_msg,
})
return ChatResponse(
response=err_msg,
status="error",
session_id=session_id,
tool_name=TOOLS[tool_id]["name"],
)
# 정상 응답 처리
data = json.loads(gxp_resp.body.decode()) if isinstance(gxp_resp.body, (bytes, bytearray)) else gxp_resp.body
answer_text = data.get("answer", "") if isinstance(data, dict) else str(data)
_safe_chat_log({
"timestamp": datetime.now().isoformat(),
"phase": "response",
"status": "success",
"tool_id": tool_id,
"session_id": session_id,
"user_message": message,
"response": answer_text[:1000],
})
return ChatResponse(
response=answer_text,
session_id=session_id,
tool_name=TOOLS[tool_id]["name"],
)
# 세션 초기화
if session_id not in tool_sessions:
tool_sessions[session_id] = []
# 이미지 처리 (생략, 기존 코드 유지)
image_context = ""
if image:
ocr_texts = []
for img in image:
filename = f"{datetime.now().strftime('%Y%m%d%H%M%S%f')}_{img.filename}"
image_path = os.path.join(UPLOAD_DIR, filename)
with open(image_path, "wb") as f:
f.write(await img.read())
try:
ocr_result = "[OCR 비활성화]"
if ocr_result:
ocr_texts.append(ocr_result)
except Exception as ocr_err:
ocr_texts.append(f"[OCR 실패: {ocr_err}]")
if ocr_texts:
image_context = f"\n이미지에서 추출한 텍스트:\n{chr(10).join(ocr_texts)}\n"
else:
image_context = "\n[이미지에서 텍스트를 추출하지 못했습니다.]\n"
# 대화 기록 구성
conversation_history = ""
if tool_sessions[session_id]:
history = tool_sessions[session_id][-5:]
conversation_history = "\n".join([
f"{'사용자' if msg['role'] == 'user' else 'AI'}: {msg['content']}"
for msg in history
])
# 프롬프트 구성
tool_info = TOOLS[tool_id]
system_prompt = tool_info["system_prompt"]
# 벡터 검색 (dev_chatbot 전용)
context_text = ""
retrieved_docs = []
if tool_id == "dev_chatbot":
context_text, retrieved_docs = gc_prepare_context(message, knowledge_mode, gc_get_vector_store())
if knowledge_mode == "kb_only" and not context_text:
return ChatResponse(response="지식베이스에 관련 정보가 없습니다.", session_id=session_id, tool_name=tool_info["name"])
# ---- GxP 챗봇 전용 벡터 검색 ----
if tool_id == "chatbot_gxp":
gxp_vec_service = GxPVectorDBService()
retrieved_docs = gxp_vec_service.similarity_search(query=message)
if retrieved_docs:
# 페이지별 최대 2개, 길이 1200자 제한 snippet 생성
page_groups = {}
for d in retrieved_docs:
meta = d.get("metadata", {}) if isinstance(d, dict) else d.metadata
pg = meta.get("page") or meta.get("page_number", "")
filename = meta.get("filename") or meta.get("source", "")
key = (filename, pg)
page_groups.setdefault(key, []).append(d.get("content") if isinstance(d, dict) else d.page_content)
selected = list(page_groups.keys())[:2]
snippets = []
for (fname, pg) in selected:
joined = "\n".join(page_groups[(fname, pg)])[:1200]
snippets.append(f"[출처:{fname} p{pg}]\n{joined}")
context_text = "\n\n[관련 문서 발췌]\n" + "\n---\n".join(snippets)
# ---- 문서번역 전용 처리 ----
if tool_id == "doc_translation":
# 실시간 채팅 번역 처리 (모델 선택 지원)
translated_response = doc_translation_prepare_context(message, model=model)
_safe_chat_log({
"timestamp": datetime.now().isoformat(),
"phase": "response",
"status": "success",
"tool_id": tool_id,
"session_id": session_id,
"user_message": message,
"response": translated_response[:1000],
})
return ChatResponse(response=translated_response, session_id=session_id, tool_name=tool_info["name"])
# kb_only 모드용 추가 지침
if knowledge_mode == "kb_only":
system_prompt += "\n\n주의: 반드시 위의 [관련 문서 발췌] 내용에 근거해서만 답변하고, 모르면 모른다고 답해라."
full_prompt = f"""{system_prompt}
{context_text}
{image_context}
{conversation_history}
사용자: {message}
AI:"""
response = ""
completion = openai_client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": system_prompt},
*( [{"role": "user" if msg['role']=="user" else "assistant", "content": msg['content']} for msg in tool_sessions[session_id][-5:]] if tool_sessions[session_id] else [] ),
{"role": "user", "content": (image_context + message) if image_context else message}
]
)
response = completion.choices[0].message.content.strip()
# ----------------------------------------------------------
# 공통: GxP 챗봇 근거 문서명/페이지 삽입
# ----------------------------------------------------------
if tool_id == "chatbot_gxp" and retrieved_docs:
cites = []
seen = set()
for d in retrieved_docs:
meta = d.get("metadata", {}) if isinstance(d, dict) else d.metadata
fname = meta.get("filename") or meta.get("source") or meta.get("path")
pg = meta.get("page") or meta.get("page_number")
if fname and pg and (fname, pg) not in seen:
cites.append(f"**(문서명: {fname}, 페이지: {pg})**")
seen.add((fname, pg))
if len(cites) >= 3:
break
if cites:
response = response.rstrip() + "\n\n" + "\n".join(cites)
# 응답 앞부분의 불필요한 사과 문구 제거 (의미 있는 답변인데 "죄송합니다" 로 시작하는 경우)
if re.match(r"^죄송합니다[.,]?\s*", response, flags=re.I) and not is_negative_answer(response):
response = re.sub(r"^죄송합니다[.,]?\s*", "", response, flags=re.I)
# 세션에 대화 기록 저장
tool_sessions[session_id].append({
"role": "user",
"content": message,
"timestamp": datetime.now().isoformat()
})
tool_sessions[session_id].append({
"role": "assistant",
"content": response,
"timestamp": datetime.now().isoformat()
})
# 세션 크기 제한 (최대 20개 메시지)
if len(tool_sessions[session_id]) > 20:
tool_sessions[session_id] = tool_sessions[session_id][-20:]
# dev_chatbot: 인라인 "출처:" 문구 제거 후 [출처] 블록 추가
if tool_id == "dev_chatbot" and retrieved_docs and not is_negative_answer(response):
# 1) 기존 응답에서 "출처:"가 포함된 구문/괄호 제거
response = re.sub(r"[\(\[]?출처[:][^\]\)\n]*[\)\]]?", "", response)
# '**출처**:' 형태의 블록 제거
response = re.sub(r"\*\*출처\*\*[:]?\s*(?:\n|\r|.)*?(?=\n{2,}|$)", "", response, flags=re.I)
# '**참고 문헌**' 블록 제거
response = re.sub(r"\*\*참고[^\n]*\n(?:- .*\n?)*", "", response, flags=re.I)
# 2) 파일별 페이지 모음 생성
file_pages = {}
for d in retrieved_docs:
src = d.metadata.get("source", "")
pg_raw = d.metadata.get("page_number", d.metadata.get("page", ""))
if src and str(pg_raw).isdigit():
file_pages.setdefault(src, set()).add(int(pg_raw))
if file_pages:
ref_lines = []
for src, pages in file_pages.items():
sorted_pages = sorted(pages)
# timestamp prefix 제거
display_src = src
ts_part = src.split("_",1)[0]
if len(src.split("_",1))>1 and ts_part.isdigit():
display_src = src.split("_",1)[1]
page_links = []
for p in sorted_pages:
page_num = p
fname_enc = urllib.parse.quote(src)
page_links.append(f"[p{page_num}](/pdf?filename={fname_enc}#page={page_num})")
ref_lines.append(f"{display_src}: " + ", ".join(page_links))
# 출처 블록 삽입 (중복 방지)
response = response.rstrip()
response += "\n\n[출처]\n" + "\n".join(ref_lines)
# 로깅
try:
LOGGER.info(json.dumps({
"timestamp": datetime.now().isoformat(),
"tool_id": tool_id,
"tool_name": tool_info["name"],
"session_id": session_id,
"model": model,
"ocr_model": ocr_model,
"knowledge_mode": knowledge_mode,
"user_message": message,
"response": response[:1000] # 응답 길이 제한
}, ensure_ascii=False))
except Exception:
pass
# 이미지에서 텍스트 추출만 요청한 경우(프롬프트 없이 OCR 결과만 반환)
if image and message.strip() in OCR_COMMANDS:
ocr_only = extract_ocr_text(image_context)
_safe_chat_log({
"timestamp": datetime.now().isoformat(),
"phase": "response",
"status": "success",
"tool_id": tool_id,
"session_id": session_id,
"user_message": message,
"response": ocr_only[:1000],
})
return ChatResponse(
response=ocr_only,
session_id=session_id,
tool_name=tool_info["name"]
)
return ChatResponse(
response=response,
session_id=session_id,
tool_name=tool_info["name"]
)
except Exception as e:
return ChatResponse(
response=f"오류가 발생했습니다: {str(e)}",
status="error",
session_id=session_id if 'session_id' in locals() else "",
tool_name=TOOLS.get(tool_id, {}).get("name", "") if 'tool_id' in locals() else ""
)
@app.get("/sessions/{tool_id}")
async def get_sessions(tool_id: str):
"""특정 도구의 세션 목록을 반환합니다."""
if tool_id not in TOOLS:
return {"error": "유효하지 않은 도구입니다."}
sessions = [
{
"session_id": session_id,
"message_count": len(messages),
"last_updated": messages[-1]["timestamp"] if messages else None
}
for session_id, messages in tool_sessions.items()
if session_id.startswith(f"{tool_id}_")
]
return {"sessions": sessions}
@app.delete("/sessions/{session_id}")
async def delete_session(session_id: str):
"""특정 세션을 삭제합니다."""
if session_id in tool_sessions:
del tool_sessions[session_id]
return {"message": "세션이 삭제되었습니다."}
return {"error": "세션을 찾을 수 없습니다."}
@app.get("/health")
async def health_check():
return {"status": "healthy"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8010)