961 lines
38 KiB
Python
961 lines
38 KiB
Python
"""backend.app
|
||
FastAPI 엔트리포인트.
|
||
|
||
• 공통 미들웨어/CORS 및 로거 설정
|
||
• 엔진 레지스트리(engines.__init__)에서 TOOLS 를 가져와 `/tools` API 에 노출
|
||
• 개발챗봇(dev_chatbot) Router 포함 – PDF 업로드·벡터검색 등 전용 API 분리
|
||
• `/chat` 하나의 엔드포인트로 모든 도구 요청을 처리하며,
|
||
- 세션 관리(tool_sessions)
|
||
- 이미지 OCR
|
||
- knowledge_mode(kb_only/hybrid) 분기
|
||
- 각 엔진별 prepare_context 로 컨텍스트 생성
|
||
• 실시간 로그(chat.log) 저장
|
||
|
||
코드를 길게 설명하기보다, 각 섹션(Import / 설정 / 유틸 / Router 등록 / 엔드포인트) 위에
|
||
블록 주석을 배치해 가독성을 높였다.
|
||
"""
|
||
from fastapi import FastAPI, Request, UploadFile, File, Form, HTTPException
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from pydantic import BaseModel
|
||
import uvicorn
|
||
import os
|
||
from datetime import datetime
|
||
from typing import List, Dict, Optional
|
||
from PIL import Image
|
||
import json
|
||
import logging
|
||
import openai
|
||
from dotenv import load_dotenv
|
||
# --- LangChain 1.1+ 호환성 패치 (레거시 경로 alias) ---
|
||
import sys, importlib, types
|
||
try:
|
||
import langchain # 확인용
|
||
mappings = {
|
||
'langchain.docstore.document': 'langchain_community.docstore',
|
||
'langchain.text_splitter': 'langchain_text_splitters',
|
||
'langchain.callbacks': 'langchain_community.callbacks',
|
||
'langchain.callbacks.streaming_stdout': 'langchain_community.callbacks.streaming_stdout',
|
||
'langchain.prompts': 'langchain_core.prompts',
|
||
'langchain.output_parsers.openai_tools': 'langchain_community.output_parsers.openai_tools',
|
||
'langchain.tools': 'langchain_community.tools',
|
||
'langchain.tools.render': 'langchain_community.tools.render',
|
||
'langchain_ollama': None,
|
||
}
|
||
for old, new in mappings.items():
|
||
if old not in sys.modules:
|
||
if new:
|
||
try:
|
||
mod = importlib.import_module(new)
|
||
sys.modules[old] = mod
|
||
except Exception:
|
||
pass
|
||
else:
|
||
import types as _t
|
||
dummy = _t.ModuleType('langchain_ollama')
|
||
class _Dummy:
|
||
def __init__(self,*a,**kw):
|
||
raise ImportError('langchain_ollama removed')
|
||
dummy.OllamaLLM = _Dummy
|
||
dummy.OllamaEmbeddings = _Dummy
|
||
sys.modules['langchain_ollama']=dummy
|
||
# schema.Document alias
|
||
if 'langchain.schema' not in sys.modules:
|
||
try:
|
||
from langchain_core.documents import Document as _Doc
|
||
schema_mod = types.ModuleType('langchain.schema')
|
||
schema_mod.Document = _Doc
|
||
sys.modules['langchain.schema'] = schema_mod
|
||
except Exception:
|
||
pass
|
||
except Exception:
|
||
pass
|
||
import re
|
||
# PDF 관련 로더 미사용
|
||
from fastapi.responses import FileResponse
|
||
from fastapi.responses import StreamingResponse
|
||
import urllib.parse
|
||
import psycopg2
|
||
from psycopg2.extras import RealDictCursor
|
||
import requests
|
||
from bs4 import BeautifulSoup
|
||
import time
|
||
import threading
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
|
||
# GxP 챗봇 제거로 관련 컨트롤러 import 삭제
|
||
from engines.chatgpt_tool.controller.ChatGPTController import router as chatgpt_router
|
||
# .env 파일 로드 (프로젝트 루트에서)
|
||
load_dotenv()
|
||
|
||
# 환경 변수에서 API Key 가져오기
|
||
OPEN_API_KEY = os.getenv("OPENAI_API_KEY", "")
|
||
if not OPEN_API_KEY:
|
||
raise RuntimeError("OPENAI_API_KEY 환경 변수가 설정되어 있지 않습니다. .env 파일을 확인하세요.")
|
||
|
||
openai_client = openai.OpenAI(api_key=OPEN_API_KEY)
|
||
|
||
app = FastAPI(title="엔큐톡 AI 채팅 서버", version="1.0.0")
|
||
|
||
# CORS 설정 (credentials 포함 시 * 를 사용할 수 없음)
|
||
DEV_FRONT_URLS = [
|
||
"http://localhost:5173",
|
||
"http://127.0.0.1:5173",
|
||
"http://ncue.net:5173",
|
||
os.getenv("FRONTEND_ORIGIN", ""),
|
||
]
|
||
app.add_middleware(
|
||
CORSMiddleware,
|
||
allow_origins=[o for o in DEV_FRONT_URLS if o],
|
||
allow_credentials=True,
|
||
allow_methods=["*"],
|
||
allow_headers=["*"],
|
||
)
|
||
|
||
app.include_router(chatgpt_router)
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
# 파일 경로 설정
|
||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||
# 프로젝트 루트 디렉터리 (backend 의 상위)
|
||
ROOT_DIR = os.path.dirname(BASE_DIR)
|
||
|
||
# uploads, chroma_db 를 프로젝트 루트로 이동했으므로 해당 경로를 사용
|
||
UPLOAD_DIR = os.path.join(ROOT_DIR, "uploads")
|
||
os.makedirs(UPLOAD_DIR, exist_ok=True)
|
||
|
||
# 벡터스토어 디렉터리 (chroma_db)
|
||
VECTOR_DIR = os.path.join(ROOT_DIR, "chroma_db")
|
||
os.makedirs(VECTOR_DIR, exist_ok=True)
|
||
|
||
# 도구별 세션 저장소
|
||
tool_sessions: Dict[str, List[Dict]] = {}
|
||
|
||
# circled number map 1-20
|
||
CIRCLED = {1:'①',2:'②',3:'③',4:'④',5:'⑤',6:'⑥',7:'⑦',8:'⑧',9:'⑨',10:'⑩',11:'⑪',12:'⑫',13:'⑬',14:'⑭',15:'⑮',16:'⑯',17:'⑰',18:'⑱',19:'⑲',20:'⑳'}
|
||
|
||
# 답변이 "정보 없음" 류의 부정적 응답인지 판별하는 간단한 휴리스틱
|
||
def is_negative_answer(text: str) -> bool:
|
||
"""사용자 질문에 대한 답변이 관련 정보를 찾지 못했음을 나타내는지 확인"""
|
||
if not text:
|
||
return True
|
||
lowers = text.lower()
|
||
negative_keywords = [
|
||
"없습니다", "없어요", "없다", "찾을 수 없", "파악할 수 없", "모르", "not found",
|
||
"정보가 없습니다", "관련 정보가 없습니다", "않습니다", "죄송", "포함되어 있지", "구체적인 설명", "구체적 설명", "구체적으로 언급"
|
||
]
|
||
return any(kw.lower() in lowers for kw in negative_keywords)
|
||
|
||
# 도구 정의
|
||
from engines import TOOLS
|
||
# dev_chatbot 전용 기능 가져오기
|
||
from engines.dev_chatbot import (
|
||
router as gc_router,
|
||
get_vector_store as gc_get_vector_store,
|
||
prepare_context as gc_prepare_context,
|
||
)
|
||
|
||
# doc_translation 전용 기능 가져오기
|
||
from engines.doc_translation import (
|
||
router as doc_translation_router,
|
||
prepare_context as doc_translation_prepare_context,
|
||
)
|
||
|
||
# (옵션) GxP 챗봇 벡터 DB 서비스 – 현재 모듈이 제거되었을 수 있으므로 더미 처리
|
||
try:
|
||
from engines.chatbot_gxp.service.GxPVectorDBService import GxPVectorDBService
|
||
except ModuleNotFoundError:
|
||
class GxPVectorDBService: # type: ignore
|
||
def __init__(self, *a, **kw):
|
||
pass
|
||
def similarity_search(self, *a, **kw):
|
||
return []
|
||
|
||
# FastAPI 라우터 등록
|
||
app.include_router(gc_router)
|
||
app.include_router(doc_translation_router, prefix="/doc_translation")
|
||
|
||
class ToolInfo(BaseModel):
|
||
id: str
|
||
name: str
|
||
description: str
|
||
|
||
@app.get("/")
|
||
async def root():
|
||
return {"message": "엔큐톡 AI 채팅 서버가 실행 중입니다."}
|
||
|
||
@app.get("/tools", response_model=List[ToolInfo])
|
||
async def get_tools():
|
||
# 프론트엔드 카드 표시 순서를 사용자 요구에 맞춰 개발챗봇 → GxP 챗봇 순으로 고정하고,
|
||
# 나머지 도구들은 기존 등록 순서를 그대로 유지합니다.
|
||
|
||
preferred_order = ["chatgpt", "chatbot_gxp"] # dev_chatbot 은 맨 뒤로 이동
|
||
|
||
# preferred_order 에 명시된 도구를 먼저, 이후 나머지 도구를 추가
|
||
ordered_ids = [tid for tid in preferred_order if tid in TOOLS]
|
||
|
||
# dev_chatbot 을 제외한 나머지 도구들 추가
|
||
ordered_ids += [tid for tid in TOOLS.keys() if tid not in ordered_ids and tid != "dev_chatbot"]
|
||
|
||
# 최종적으로 dev_chatbot 을 맨 뒤에 추가
|
||
if "dev_chatbot" in TOOLS:
|
||
ordered_ids.append("dev_chatbot")
|
||
|
||
return [
|
||
ToolInfo(id=tid, name=TOOLS[tid]["name"], description=TOOLS[tid]["description"])
|
||
for tid in ordered_ids
|
||
]
|
||
|
||
class ChatRequest(BaseModel):
|
||
message: str
|
||
tool_id: str
|
||
session_id: Optional[str] = None
|
||
|
||
class ChatResponse(BaseModel):
|
||
response: str
|
||
status: str = "success"
|
||
session_id: str
|
||
tool_name: str
|
||
|
||
def is_meaningful_text(text):
|
||
import re
|
||
# 한글 자모(ㄱㄴㅏ 등) 단독 포함 시 = 오타/무의미로 간주
|
||
if re.search(r'[\u3130-\u318F]', text):
|
||
return False
|
||
# 영어·한글·숫자만 추출 후 길이 판단
|
||
cleaned = re.sub(r'[^가-힣a-zA-Z0-9]', '', text)
|
||
# 반복문자(같은 글자 4회 이상 반복) 무의미
|
||
if re.search(r'(.)\1{3,}', cleaned):
|
||
return False
|
||
# 안내·설명 패턴 필터
|
||
if re.search(r'(이미지에서 추출|안내|설명|반환|추출하지 못했습니다)', text):
|
||
return False
|
||
# 너무 짧으면 의미 없음 (완성형 기준 8자 이상)
|
||
return len(cleaned) >= 8
|
||
|
||
def extract_ocr_text(image_context):
|
||
# 실제 OCR 결과만 추출
|
||
if image_context.startswith("\n이미지에서 추출한 텍스트:\n"):
|
||
extracted = image_context.split("\n이미지에서 추출한 텍스트:\n", 1)[-1].strip()
|
||
else:
|
||
extracted = image_context.strip()
|
||
# 안내문, 설명문, 특수문자 등 의미 없는 값 필터링
|
||
if not is_meaningful_text(extracted):
|
||
return "[이미지에서 텍스트를 추출하지 못했습니다.]"
|
||
return extracted
|
||
|
||
OCR_COMMANDS = ["텍스트 추출", "텍스트 추출해줘", "텍스트 추출해 주세요", "텍스트 추출해주세요"]
|
||
|
||
# 로그 디렉터리 및 로거 설정
|
||
LOG_DIR = os.path.join(ROOT_DIR, "logs")
|
||
os.makedirs(LOG_DIR, exist_ok=True)
|
||
|
||
LOGGER = logging.getLogger("chat_logger")
|
||
if not LOGGER.handlers:
|
||
LOGGER.setLevel(logging.INFO)
|
||
_fh = logging.FileHandler(os.path.join(LOG_DIR, "chat.log"), encoding="utf-8")
|
||
_fh.setFormatter(logging.Formatter('[%(asctime)s] %(message)s'))
|
||
LOGGER.addHandler(_fh)
|
||
|
||
def _safe_chat_log(payload: dict) -> None:
|
||
"""logs/chat.log 에 JSON 한 줄로 안전하게 기록한다.
|
||
phase(요청/응답), status, reason 등의 필드를 받아도 되고, 없어도 된다.
|
||
"""
|
||
try:
|
||
LOGGER.info(json.dumps(payload, ensure_ascii=False))
|
||
except Exception:
|
||
# 로깅 중 예외는 서비스 흐름에 영향 주지 않음
|
||
pass
|
||
|
||
# ------------------------
|
||
# Auth APIs (login/logout)
|
||
# ------------------------
|
||
|
||
class LoginDTO(BaseModel):
|
||
email: str
|
||
password: str
|
||
|
||
def _get_db_conn():
|
||
# .env 의 DATABASE_URL(예: postgresql://user:pw@host:port/dbname) 사용
|
||
dsn = os.getenv("DATABASE_URL") or os.getenv("POSTGRES_DSN")
|
||
if not dsn:
|
||
raise RuntimeError("DATABASE_URL 환경변수가 필요합니다")
|
||
return psycopg2.connect(dsn)
|
||
|
||
@app.post("/auth/login")
|
||
def auth_login(dto: LoginDTO):
|
||
try:
|
||
with _get_db_conn() as conn:
|
||
with conn.cursor(cursor_factory=RealDictCursor) as cur:
|
||
# id 컬럼 → user_id 로 변경 반영
|
||
cur.execute(
|
||
"SELECT user_id, email FROM users WHERE email=%s AND password = crypt(%s, password)",
|
||
(dto.email, dto.password),
|
||
)
|
||
row = cur.fetchone()
|
||
if not row:
|
||
raise HTTPException(status_code=401, detail="invalid credentials")
|
||
return {"user_id": row["user_id"], "email": row["email"]}
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=str(e))
|
||
|
||
@app.post("/auth/logout")
|
||
def auth_logout():
|
||
return {"ok": True}
|
||
|
||
# ------------------------
|
||
# Community: QnA Board APIs
|
||
# ------------------------
|
||
class QnaCreateDTO(BaseModel):
|
||
title: str
|
||
content: str
|
||
author_id: Optional[str] = None
|
||
author_email: Optional[str] = None
|
||
|
||
@app.get("/community/qna")
|
||
def list_qna():
|
||
with _get_db_conn() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
|
||
cur.execute("SELECT id, title, content, created_at, updated_at, views, author_id FROM qna_board ORDER BY id DESC")
|
||
rows = cur.fetchall() or []
|
||
return rows
|
||
|
||
@app.post("/community/qna")
|
||
def create_qna(dto: QnaCreateDTO):
|
||
with _get_db_conn() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
|
||
# resolve author_no from id/no/email
|
||
author_no = None
|
||
if dto.author_id is not None:
|
||
try:
|
||
num = int(dto.author_id)
|
||
cur.execute("SELECT no FROM users WHERE no=%s", (num,))
|
||
r = cur.fetchone()
|
||
if r:
|
||
author_no = r["no"]
|
||
except Exception:
|
||
cur.execute("SELECT no FROM users WHERE user_id=%s", (dto.author_id,))
|
||
r = cur.fetchone()
|
||
if r:
|
||
author_no = r["no"]
|
||
if author_no is None and dto.author_email:
|
||
cur.execute("SELECT no FROM users WHERE email=%s", (dto.author_email,))
|
||
r = cur.fetchone()
|
||
if r:
|
||
author_no = r["no"]
|
||
if author_no is None:
|
||
# allow anonymous with default author from env
|
||
default_id = os.getenv("DEFAULT_QNA_AUTHOR_ID", "admin")
|
||
default_email = os.getenv("DEFAULT_QNA_AUTHOR_EMAIL")
|
||
if default_email:
|
||
cur.execute("SELECT no FROM users WHERE email=%s", (default_email,))
|
||
r = cur.fetchone()
|
||
if r:
|
||
author_no = r["no"]
|
||
if author_no is None and default_id:
|
||
cur.execute("SELECT no FROM users WHERE user_id=%s", (default_id,))
|
||
r = cur.fetchone()
|
||
if r:
|
||
author_no = r["no"]
|
||
if author_no is None:
|
||
raise HTTPException(status_code=400, detail="author not found; set DEFAULT_QNA_AUTHOR_ID or pass author_email/id")
|
||
cur.execute(
|
||
"INSERT INTO qna_board (title, content, author_id) VALUES (%s, %s, %s) RETURNING id",
|
||
(dto.title, dto.content, author_no),
|
||
)
|
||
new_id = cur.fetchone()["id"]
|
||
conn.commit()
|
||
return {"id": new_id}
|
||
|
||
@app.get("/community/qna/{qid}")
|
||
def get_qna(qid: int, increase: int = 1):
|
||
"""단건 조회. increase=1 일 때 조회수 +1 처리 후 반환."""
|
||
with _get_db_conn() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
|
||
if increase:
|
||
cur.execute("UPDATE qna_board SET views=views+1, updated_at=NOW() WHERE id=%s", (qid,))
|
||
conn.commit()
|
||
cur.execute("SELECT id, title, content, created_at, updated_at, views, author_id FROM qna_board WHERE id=%s", (qid,))
|
||
row = cur.fetchone()
|
||
if not row:
|
||
raise HTTPException(status_code=404, detail="not found")
|
||
return row
|
||
|
||
# ------------------------
|
||
# Community: AI News Board APIs
|
||
# ------------------------
|
||
class AiNewsCreateDTO(BaseModel):
|
||
url: str
|
||
author_id: Optional[str] = None
|
||
author_email: Optional[str] = None
|
||
|
||
def _pick_html_encoding(resp: requests.Response) -> str:
|
||
"""HTML 인코딩 추정 순서(우선순위 높음 → 낮음)
|
||
1) Content-Type 헤더 charset
|
||
2) requests가 파싱한 resp.encoding (단, iso-8859-1 디폴트는 제외)
|
||
3) HTML 내부 <meta charset=...> / <meta http-equiv="content-type" ... charset=...>
|
||
4) resp.apparent_encoding (chardet/charset_normalizer)
|
||
5) utf-8
|
||
"""
|
||
# 1) header charset
|
||
ct = (resp.headers.get("content-type") or "").lower()
|
||
m = re.search(r"charset\s*=\s*([a-z0-9_\-]+)", ct, re.IGNORECASE)
|
||
if m:
|
||
return m.group(1)
|
||
|
||
# 2) requests chosen encoding (but ignore the common default)
|
||
enc = resp.encoding
|
||
if enc and enc.lower() != "iso-8859-1":
|
||
return enc
|
||
|
||
# 3) meta charset sniff (scan first 64KB)
|
||
try:
|
||
head = resp.content[:65536]
|
||
# decode as latin-1 to preserve byte values 0-255 one-to-one
|
||
head_text = head.decode("latin-1", errors="ignore")
|
||
m1 = re.search(r"<meta[^>]+charset\s*=\s*['\"]?\s*([a-z0-9_\-]+)\s*['\"]?", head_text, re.IGNORECASE)
|
||
if m1:
|
||
return m1.group(1)
|
||
m2 = re.search(r"charset\s*=\s*([a-z0-9_\-]+)", head_text, re.IGNORECASE)
|
||
if m2:
|
||
return m2.group(1)
|
||
except Exception:
|
||
pass
|
||
|
||
# 4) heuristic
|
||
try:
|
||
if resp.apparent_encoding:
|
||
return resp.apparent_encoding
|
||
except Exception:
|
||
pass
|
||
|
||
# 5) fallback
|
||
return "utf-8"
|
||
|
||
|
||
def _decode_html(resp: requests.Response) -> str:
|
||
encoding = _pick_html_encoding(resp)
|
||
try:
|
||
return resp.content.decode(encoding or "utf-8", errors="replace")
|
||
except Exception:
|
||
return resp.content.decode("utf-8", errors="replace")
|
||
|
||
def _extract_og(url: str) -> dict:
|
||
meta = {"title": "", "description": "", "image": "", "url": url}
|
||
try:
|
||
# 외부 사이트 응답 지연이 잦아 타임아웃을 짧게 유지(캐시 + 병렬로 커버)
|
||
resp = requests.get(url, timeout=2.5, headers={"User-Agent": "Mozilla/5.0"})
|
||
if resp.ok:
|
||
html = _decode_html(resp)
|
||
soup = BeautifulSoup(html, "html.parser")
|
||
og_title = soup.find("meta", property="og:title")
|
||
og_desc = soup.find("meta", property="og:description")
|
||
og_img = soup.find("meta", property="og:image")
|
||
title_tag = soup.find("title")
|
||
meta["title"] = (og_title["content"].strip() if og_title and og_title.get("content") else (title_tag.text.strip() if title_tag else ""))
|
||
meta["description"] = (og_desc["content"].strip() if og_desc and og_desc.get("content") else "")
|
||
meta["image"] = (og_img["content"].strip() if og_img and og_img.get("content") else "")
|
||
except Exception:
|
||
pass
|
||
return meta
|
||
|
||
# --- OG cache (in-memory) ---
|
||
# 목적: 리스트 로딩 시 매번 외부 사이트를 때려 3~10초 지연되는 문제를 방지.
|
||
# 운영 시 Redis 같은 외부 캐시가 더 좋지만, 우선 체감 개선을 위해 프로세스 메모리 캐시를 사용한다.
|
||
_OG_CACHE_LOCK = threading.Lock()
|
||
_OG_CACHE: Dict[str, Dict[str, object]] = {} # url -> {"ts": float, "meta": dict}
|
||
_OG_CACHE_TTL_SEC = float(os.getenv("OG_CACHE_TTL_SEC", "3600")) # default 1h
|
||
_OG_CACHE_MAX = int(os.getenv("OG_CACHE_MAX", "2000"))
|
||
_OG_CACHE_VERSION = 2
|
||
|
||
def _extract_og_cached(url: str) -> dict:
|
||
now = time.time()
|
||
with _OG_CACHE_LOCK:
|
||
ent = _OG_CACHE.get(url)
|
||
if (
|
||
ent
|
||
and ent.get("v") == _OG_CACHE_VERSION
|
||
and (now - float(ent.get("ts", 0))) < _OG_CACHE_TTL_SEC
|
||
):
|
||
return ent.get("meta") or {"title": "", "description": "", "image": "", "url": url}
|
||
|
||
meta = _extract_og(url)
|
||
|
||
with _OG_CACHE_LOCK:
|
||
_OG_CACHE[url] = {"ts": now, "meta": meta, "v": _OG_CACHE_VERSION}
|
||
# 단순한 크기 제한 (초과 시 오래된 엔트리부터 정리)
|
||
if len(_OG_CACHE) > _OG_CACHE_MAX:
|
||
items = sorted(_OG_CACHE.items(), key=lambda kv: float(kv[1].get("ts", 0)))
|
||
for k, _ in items[: max(1, len(_OG_CACHE) - _OG_CACHE_MAX)]:
|
||
_OG_CACHE.pop(k, None)
|
||
return meta
|
||
|
||
@app.get("/community/ai_news")
|
||
def list_ai_news(offset: int = 0, limit: int = 10):
|
||
with _get_db_conn() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
|
||
cur.execute(
|
||
"SELECT id, url, created_at, updated_at, views, author_id FROM ai_news_board ORDER BY id DESC LIMIT %s OFFSET %s",
|
||
(limit, offset),
|
||
)
|
||
rows = cur.fetchall() or []
|
||
# OG 메타는 외부 요청이므로 병렬 + 캐시로 속도 개선
|
||
metas: Dict[str, dict] = {}
|
||
urls = [r.get("url") for r in rows if r.get("url")]
|
||
if urls:
|
||
# limit이 커져도 서버를 과도하게 압박하지 않도록 상한
|
||
max_workers = min(8, len(urls))
|
||
with ThreadPoolExecutor(max_workers=max_workers) as ex:
|
||
fut_map = {ex.submit(_extract_og_cached, u): u for u in urls}
|
||
for fut in as_completed(fut_map):
|
||
u = fut_map[fut]
|
||
try:
|
||
metas[u] = fut.result() or {"title": "", "description": "", "image": "", "url": u}
|
||
except Exception:
|
||
metas[u] = {"title": "", "description": "", "image": "", "url": u}
|
||
|
||
enriched = []
|
||
for r in rows:
|
||
u = r.get("url")
|
||
r.update({"meta": metas.get(u) if u else {"title": "", "description": "", "image": "", "url": u}})
|
||
enriched.append(r)
|
||
# Frontend infinite-scroll safety:
|
||
# - Return `nextOffset: null` when there is no next page.
|
||
# - Otherwise return the next numeric offset.
|
||
if len(enriched) < limit:
|
||
next_offset = None
|
||
else:
|
||
next_offset = offset + len(enriched)
|
||
return {"items": enriched, "nextOffset": next_offset}
|
||
|
||
@app.post("/community/ai_news")
|
||
def create_ai_news(dto: AiNewsCreateDTO):
|
||
with _get_db_conn() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
|
||
# resolve author
|
||
author_no = None
|
||
if dto.author_id is not None:
|
||
try:
|
||
num = int(dto.author_id)
|
||
cur.execute("SELECT no FROM users WHERE no=%s", (num,))
|
||
r = cur.fetchone()
|
||
if r:
|
||
author_no = r["no"]
|
||
except Exception:
|
||
cur.execute("SELECT no FROM users WHERE user_id=%s", (dto.author_id,))
|
||
r = cur.fetchone()
|
||
if r:
|
||
author_no = r["no"]
|
||
if author_no is None and dto.author_email:
|
||
cur.execute("SELECT no FROM users WHERE email=%s", (dto.author_email,))
|
||
r = cur.fetchone()
|
||
if r:
|
||
author_no = r["no"]
|
||
if author_no is None:
|
||
default_id = os.getenv("DEFAULT_QNA_AUTHOR_ID", "admin")
|
||
cur.execute("SELECT no FROM users WHERE user_id=%s", (default_id,))
|
||
r = cur.fetchone()
|
||
if r:
|
||
author_no = r["no"]
|
||
if author_no is None:
|
||
raise HTTPException(status_code=400, detail="author not found")
|
||
cur.execute("INSERT INTO ai_news_board (url, author_id) VALUES (%s, %s) RETURNING id", (dto.url, author_no))
|
||
new_id = cur.fetchone()["id"]
|
||
conn.commit()
|
||
return {"id": new_id}
|
||
|
||
@app.get("/community/ai_news/{nid}")
|
||
def get_ai_news(nid: int, increase: int = 1):
|
||
with _get_db_conn() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
|
||
if increase:
|
||
cur.execute("UPDATE ai_news_board SET views=views+1, updated_at=NOW() WHERE id=%s", (nid,))
|
||
conn.commit()
|
||
cur.execute("SELECT id, url, created_at, updated_at, views, author_id FROM ai_news_board WHERE id=%s", (nid,))
|
||
row = cur.fetchone()
|
||
if not row:
|
||
raise HTTPException(status_code=404, detail="not found")
|
||
row.update({"meta": _extract_og(row["url"])})
|
||
return row
|
||
|
||
@app.post("/chat", response_model=ChatResponse)
|
||
async def chat_endpoint(
|
||
message: str = Form(...),
|
||
tool_id: str = Form(...),
|
||
session_id: Optional[str] = Form(None),
|
||
image: List[UploadFile] = File(None),
|
||
model: str = Form("gpt-5"),
|
||
ocr_model: str = Form("none"),
|
||
knowledge_mode: str = Form("hybrid"),
|
||
):
|
||
try:
|
||
# 세션 ID를 가장 먼저 확보하여, 모든 분기에서 동일 ID로 로깅되도록 함
|
||
if not session_id:
|
||
# tool_id 가 잘못되었더라도 임시 세션 ID를 생성하여 추적 가능하게 처리
|
||
safe_tool = tool_id if tool_id else "unknown"
|
||
session_id = f"{safe_tool}_{datetime.now().strftime('%Y%m%d%H%M%S%f')}"
|
||
|
||
# 요청 수신 로그 (항상 기록)
|
||
_safe_chat_log({
|
||
"timestamp": datetime.now().isoformat(),
|
||
"phase": "request",
|
||
"tool_id": tool_id,
|
||
"session_id": session_id,
|
||
"model": model,
|
||
"ocr_model": ocr_model,
|
||
"knowledge_mode": knowledge_mode,
|
||
"user_message": message,
|
||
})
|
||
|
||
# ---- 1) 입력 검증: 무의미한 메시지 필터링 ----
|
||
# ChatGPT, 문서번역 도구는 간단 인사말이나 모든 텍스트를 처리해야 하므로 필터를 우회
|
||
if tool_id not in ["chatgpt", "doc_translation"] and not is_meaningful_text(message):
|
||
_safe_chat_log({
|
||
"timestamp": datetime.now().isoformat(),
|
||
"phase": "response",
|
||
"status": "error",
|
||
"reason": "meaningless_text",
|
||
"tool_id": tool_id,
|
||
"session_id": session_id,
|
||
"user_message": message,
|
||
})
|
||
return ChatResponse(
|
||
response="질문이 명확하지 않습니다. 좀 더 구체적인 내용을 입력해 주세요.",
|
||
status="error",
|
||
session_id=session_id or "",
|
||
tool_name=""
|
||
)
|
||
# 도구 ID 검증
|
||
if tool_id not in TOOLS:
|
||
_safe_chat_log({
|
||
"timestamp": datetime.now().isoformat(),
|
||
"phase": "response",
|
||
"status": "error",
|
||
"reason": "invalid_tool_id",
|
||
"tool_id": tool_id,
|
||
"session_id": session_id,
|
||
"user_message": message,
|
||
})
|
||
return ChatResponse(
|
||
response="유효하지 않은 도구입니다.",
|
||
status="error",
|
||
session_id="",
|
||
tool_name=""
|
||
)
|
||
|
||
# ------------------------------------------------------------------
|
||
# chatbot_gxp 은 GxPChatController.chat 로 위임처리 (중복 로직 제거)
|
||
# ------------------------------------------------------------------
|
||
if tool_id == "chatbot_gxp":
|
||
# GxPChatController.chat 은 query, session_id 를 매개변수로 받음
|
||
gxp_resp = await GxPChatController.chat(query=message, session_id=session_id)
|
||
|
||
# 오류 코드 처리
|
||
if gxp_resp.status_code != 200:
|
||
try:
|
||
err_detail = json.loads(gxp_resp.body.decode())
|
||
err_msg = err_detail.get("error", "GxP 챗봇 처리 중 오류가 발생했습니다.")
|
||
except Exception:
|
||
err_msg = "GxP 챗봇 처리 중 오류가 발생했습니다."
|
||
_safe_chat_log({
|
||
"timestamp": datetime.now().isoformat(),
|
||
"phase": "response",
|
||
"status": "error",
|
||
"tool_id": tool_id,
|
||
"session_id": session_id,
|
||
"user_message": message,
|
||
"response": err_msg,
|
||
})
|
||
return ChatResponse(
|
||
response=err_msg,
|
||
status="error",
|
||
session_id=session_id,
|
||
tool_name=TOOLS[tool_id]["name"],
|
||
)
|
||
|
||
# 정상 응답 처리
|
||
data = json.loads(gxp_resp.body.decode()) if isinstance(gxp_resp.body, (bytes, bytearray)) else gxp_resp.body
|
||
answer_text = data.get("answer", "") if isinstance(data, dict) else str(data)
|
||
|
||
_safe_chat_log({
|
||
"timestamp": datetime.now().isoformat(),
|
||
"phase": "response",
|
||
"status": "success",
|
||
"tool_id": tool_id,
|
||
"session_id": session_id,
|
||
"user_message": message,
|
||
"response": answer_text[:1000],
|
||
})
|
||
|
||
return ChatResponse(
|
||
response=answer_text,
|
||
session_id=session_id,
|
||
tool_name=TOOLS[tool_id]["name"],
|
||
)
|
||
|
||
# 세션 초기화
|
||
if session_id not in tool_sessions:
|
||
tool_sessions[session_id] = []
|
||
# 이미지 처리 (생략, 기존 코드 유지)
|
||
image_context = ""
|
||
if image:
|
||
ocr_texts = []
|
||
for img in image:
|
||
filename = f"{datetime.now().strftime('%Y%m%d%H%M%S%f')}_{img.filename}"
|
||
image_path = os.path.join(UPLOAD_DIR, filename)
|
||
with open(image_path, "wb") as f:
|
||
f.write(await img.read())
|
||
try:
|
||
ocr_result = "[OCR 비활성화]"
|
||
if ocr_result:
|
||
ocr_texts.append(ocr_result)
|
||
except Exception as ocr_err:
|
||
ocr_texts.append(f"[OCR 실패: {ocr_err}]")
|
||
if ocr_texts:
|
||
image_context = f"\n이미지에서 추출한 텍스트:\n{chr(10).join(ocr_texts)}\n"
|
||
else:
|
||
image_context = "\n[이미지에서 텍스트를 추출하지 못했습니다.]\n"
|
||
# 대화 기록 구성
|
||
conversation_history = ""
|
||
if tool_sessions[session_id]:
|
||
history = tool_sessions[session_id][-5:]
|
||
conversation_history = "\n".join([
|
||
f"{'사용자' if msg['role'] == 'user' else 'AI'}: {msg['content']}"
|
||
for msg in history
|
||
])
|
||
# 프롬프트 구성
|
||
tool_info = TOOLS[tool_id]
|
||
system_prompt = tool_info["system_prompt"]
|
||
# 벡터 검색 (dev_chatbot 전용)
|
||
context_text = ""
|
||
retrieved_docs = []
|
||
if tool_id == "dev_chatbot":
|
||
context_text, retrieved_docs = gc_prepare_context(message, knowledge_mode, gc_get_vector_store())
|
||
if knowledge_mode == "kb_only" and not context_text:
|
||
return ChatResponse(response="지식베이스에 관련 정보가 없습니다.", session_id=session_id, tool_name=tool_info["name"])
|
||
|
||
# ---- GxP 챗봇 전용 벡터 검색 ----
|
||
if tool_id == "chatbot_gxp":
|
||
gxp_vec_service = GxPVectorDBService()
|
||
retrieved_docs = gxp_vec_service.similarity_search(query=message)
|
||
|
||
if retrieved_docs:
|
||
# 페이지별 최대 2개, 길이 1200자 제한 snippet 생성
|
||
page_groups = {}
|
||
for d in retrieved_docs:
|
||
meta = d.get("metadata", {}) if isinstance(d, dict) else d.metadata
|
||
pg = meta.get("page") or meta.get("page_number", "")
|
||
filename = meta.get("filename") or meta.get("source", "")
|
||
key = (filename, pg)
|
||
page_groups.setdefault(key, []).append(d.get("content") if isinstance(d, dict) else d.page_content)
|
||
selected = list(page_groups.keys())[:2]
|
||
snippets = []
|
||
for (fname, pg) in selected:
|
||
joined = "\n".join(page_groups[(fname, pg)])[:1200]
|
||
snippets.append(f"[출처:{fname} p{pg}]\n{joined}")
|
||
context_text = "\n\n[관련 문서 발췌]\n" + "\n---\n".join(snippets)
|
||
|
||
# ---- 문서번역 전용 처리 ----
|
||
if tool_id == "doc_translation":
|
||
# 실시간 채팅 번역 처리 (모델 선택 지원)
|
||
translated_response = doc_translation_prepare_context(message, model=model)
|
||
_safe_chat_log({
|
||
"timestamp": datetime.now().isoformat(),
|
||
"phase": "response",
|
||
"status": "success",
|
||
"tool_id": tool_id,
|
||
"session_id": session_id,
|
||
"user_message": message,
|
||
"response": translated_response[:1000],
|
||
})
|
||
return ChatResponse(response=translated_response, session_id=session_id, tool_name=tool_info["name"])
|
||
|
||
# kb_only 모드용 추가 지침
|
||
if knowledge_mode == "kb_only":
|
||
system_prompt += "\n\n주의: 반드시 위의 [관련 문서 발췌] 내용에 근거해서만 답변하고, 모르면 모른다고 답해라."
|
||
|
||
full_prompt = f"""{system_prompt}
|
||
|
||
{context_text}
|
||
|
||
{image_context}
|
||
{conversation_history}
|
||
|
||
사용자: {message}
|
||
AI:"""
|
||
response = ""
|
||
|
||
completion = openai_client.chat.completions.create(
|
||
model=model,
|
||
messages=[
|
||
{"role": "system", "content": system_prompt},
|
||
*( [{"role": "user" if msg['role']=="user" else "assistant", "content": msg['content']} for msg in tool_sessions[session_id][-5:]] if tool_sessions[session_id] else [] ),
|
||
{"role": "user", "content": (image_context + message) if image_context else message}
|
||
]
|
||
)
|
||
response = completion.choices[0].message.content.strip()
|
||
|
||
# ----------------------------------------------------------
|
||
# 공통: GxP 챗봇 근거 문서명/페이지 삽입
|
||
# ----------------------------------------------------------
|
||
if tool_id == "chatbot_gxp" and retrieved_docs:
|
||
cites = []
|
||
seen = set()
|
||
for d in retrieved_docs:
|
||
meta = d.get("metadata", {}) if isinstance(d, dict) else d.metadata
|
||
fname = meta.get("filename") or meta.get("source") or meta.get("path")
|
||
pg = meta.get("page") or meta.get("page_number")
|
||
if fname and pg and (fname, pg) not in seen:
|
||
cites.append(f"**(문서명: {fname}, 페이지: {pg})**")
|
||
seen.add((fname, pg))
|
||
if len(cites) >= 3:
|
||
break
|
||
if cites:
|
||
response = response.rstrip() + "\n\n" + "\n".join(cites)
|
||
|
||
# 응답 앞부분의 불필요한 사과 문구 제거 (의미 있는 답변인데 "죄송합니다" 로 시작하는 경우)
|
||
if re.match(r"^죄송합니다[.,]?\s*", response, flags=re.I) and not is_negative_answer(response):
|
||
response = re.sub(r"^죄송합니다[.,]?\s*", "", response, flags=re.I)
|
||
|
||
# 세션에 대화 기록 저장
|
||
tool_sessions[session_id].append({
|
||
"role": "user",
|
||
"content": message,
|
||
"timestamp": datetime.now().isoformat()
|
||
})
|
||
tool_sessions[session_id].append({
|
||
"role": "assistant",
|
||
"content": response,
|
||
"timestamp": datetime.now().isoformat()
|
||
})
|
||
# 세션 크기 제한 (최대 20개 메시지)
|
||
if len(tool_sessions[session_id]) > 20:
|
||
tool_sessions[session_id] = tool_sessions[session_id][-20:]
|
||
# dev_chatbot: 인라인 "출처:" 문구 제거 후 [출처] 블록 추가
|
||
if tool_id == "dev_chatbot" and retrieved_docs and not is_negative_answer(response):
|
||
# 1) 기존 응답에서 "출처:"가 포함된 구문/괄호 제거
|
||
response = re.sub(r"[\(\[]?출처[::][^\]\)\n]*[\)\]]?", "", response)
|
||
# '**출처**:' 형태의 블록 제거
|
||
response = re.sub(r"\*\*출처\*\*[::]?\s*(?:\n|\r|.)*?(?=\n{2,}|$)", "", response, flags=re.I)
|
||
# '**참고 문헌**' 블록 제거
|
||
response = re.sub(r"\*\*참고[^\n]*\n(?:- .*\n?)*", "", response, flags=re.I)
|
||
|
||
# 2) 파일별 페이지 모음 생성
|
||
file_pages = {}
|
||
for d in retrieved_docs:
|
||
src = d.metadata.get("source", "")
|
||
pg_raw = d.metadata.get("page_number", d.metadata.get("page", ""))
|
||
if src and str(pg_raw).isdigit():
|
||
file_pages.setdefault(src, set()).add(int(pg_raw))
|
||
|
||
if file_pages:
|
||
ref_lines = []
|
||
for src, pages in file_pages.items():
|
||
sorted_pages = sorted(pages)
|
||
# timestamp prefix 제거
|
||
display_src = src
|
||
ts_part = src.split("_",1)[0]
|
||
if len(src.split("_",1))>1 and ts_part.isdigit():
|
||
display_src = src.split("_",1)[1]
|
||
page_links = []
|
||
for p in sorted_pages:
|
||
page_num = p
|
||
fname_enc = urllib.parse.quote(src)
|
||
page_links.append(f"[p{page_num}](/pdf?filename={fname_enc}#page={page_num})")
|
||
ref_lines.append(f"{display_src}: " + ", ".join(page_links))
|
||
|
||
# 출처 블록 삽입 (중복 방지)
|
||
response = response.rstrip()
|
||
response += "\n\n[출처]\n" + "\n".join(ref_lines)
|
||
|
||
# 로깅
|
||
try:
|
||
LOGGER.info(json.dumps({
|
||
"timestamp": datetime.now().isoformat(),
|
||
"tool_id": tool_id,
|
||
"tool_name": tool_info["name"],
|
||
"session_id": session_id,
|
||
"model": model,
|
||
"ocr_model": ocr_model,
|
||
"knowledge_mode": knowledge_mode,
|
||
"user_message": message,
|
||
"response": response[:1000] # 응답 길이 제한
|
||
}, ensure_ascii=False))
|
||
except Exception:
|
||
pass
|
||
|
||
# 이미지에서 텍스트 추출만 요청한 경우(프롬프트 없이 OCR 결과만 반환)
|
||
if image and message.strip() in OCR_COMMANDS:
|
||
ocr_only = extract_ocr_text(image_context)
|
||
_safe_chat_log({
|
||
"timestamp": datetime.now().isoformat(),
|
||
"phase": "response",
|
||
"status": "success",
|
||
"tool_id": tool_id,
|
||
"session_id": session_id,
|
||
"user_message": message,
|
||
"response": ocr_only[:1000],
|
||
})
|
||
return ChatResponse(
|
||
response=ocr_only,
|
||
session_id=session_id,
|
||
tool_name=tool_info["name"]
|
||
)
|
||
return ChatResponse(
|
||
response=response,
|
||
session_id=session_id,
|
||
tool_name=tool_info["name"]
|
||
)
|
||
except Exception as e:
|
||
return ChatResponse(
|
||
response=f"오류가 발생했습니다: {str(e)}",
|
||
status="error",
|
||
session_id=session_id if 'session_id' in locals() else "",
|
||
tool_name=TOOLS.get(tool_id, {}).get("name", "") if 'tool_id' in locals() else ""
|
||
)
|
||
|
||
@app.get("/sessions/{tool_id}")
|
||
async def get_sessions(tool_id: str):
|
||
"""특정 도구의 세션 목록을 반환합니다."""
|
||
if tool_id not in TOOLS:
|
||
return {"error": "유효하지 않은 도구입니다."}
|
||
|
||
sessions = [
|
||
{
|
||
"session_id": session_id,
|
||
"message_count": len(messages),
|
||
"last_updated": messages[-1]["timestamp"] if messages else None
|
||
}
|
||
for session_id, messages in tool_sessions.items()
|
||
if session_id.startswith(f"{tool_id}_")
|
||
]
|
||
|
||
return {"sessions": sessions}
|
||
|
||
@app.delete("/sessions/{session_id}")
|
||
async def delete_session(session_id: str):
|
||
"""특정 세션을 삭제합니다."""
|
||
if session_id in tool_sessions:
|
||
del tool_sessions[session_id]
|
||
return {"message": "세션이 삭제되었습니다."}
|
||
return {"error": "세션을 찾을 수 없습니다."}
|
||
|
||
@app.get("/health")
|
||
async def health_check():
|
||
return {"status": "healthy"}
|
||
|
||
if __name__ == "__main__":
|
||
uvicorn.run(app, host="0.0.0.0", port=8010) |