489 lines
17 KiB
Python
489 lines
17 KiB
Python
"""
|
|
LangChain v0.3 기반 연구QA 챗봇 API
|
|
향후 고도화를 위한 확장 가능한 아키텍처
|
|
"""
|
|
|
|
from fastapi import FastAPI, HTTPException, Depends, UploadFile, File, Form
|
|
from fastapi.responses import FileResponse
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from pydantic import BaseModel
|
|
from typing import List, Optional
|
|
from contextlib import asynccontextmanager
|
|
import os
|
|
import uuid
|
|
import shutil
|
|
from datetime import datetime
|
|
import json
|
|
import logging
|
|
import psycopg2
|
|
from psycopg2.extras import RealDictCursor
|
|
|
|
# LangChain 서비스 임포트
|
|
from services.langchain_service import langchain_service
|
|
from parser.pdf.MainParser import PDFParser
|
|
|
|
# 로깅 설정
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Pydantic 모델들
|
|
class ChatRequest(BaseModel):
|
|
message: str
|
|
user_id: Optional[str] = None
|
|
|
|
class ChatResponse(BaseModel):
|
|
response: str
|
|
sources: List[str]
|
|
detailed_references: Optional[List[dict]] = []
|
|
timestamp: str
|
|
|
|
class FileUploadResponse(BaseModel):
|
|
message: str
|
|
file_id: str
|
|
filename: str
|
|
status: str
|
|
|
|
class FileListResponse(BaseModel):
|
|
files: List[dict]
|
|
total: int
|
|
|
|
# FastAPI 앱 생성
|
|
@asynccontextmanager
|
|
async def lifespan(app: FastAPI):
|
|
"""앱 시작/종료 시 실행"""
|
|
# 시작 시
|
|
logger.info("🚀 LangChain 기반 연구QA 챗봇 시작")
|
|
try:
|
|
langchain_service.initialize()
|
|
logger.info("✅ LangChain 서비스 초기화 완료")
|
|
except Exception as e:
|
|
logger.error(f"❌ LangChain 서비스 초기화 실패: {e}")
|
|
raise
|
|
|
|
yield
|
|
|
|
# 종료 시
|
|
logger.info("🛑 LangChain 기반 연구QA 챗봇 종료")
|
|
|
|
app = FastAPI(
|
|
title="연구QA Chatbot API",
|
|
description="LangChain v0.3 기반 고성능 PDF 파싱과 벡터 검색을 활용한 연구 질의응답 시스템",
|
|
version="2.0.0",
|
|
lifespan=lifespan
|
|
)
|
|
|
|
# CORS 설정
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["http://localhost:3000", "http://127.0.0.1:3000"],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
# 보안 설정
|
|
security = HTTPBearer(auto_error=False)
|
|
|
|
def get_db_connection():
|
|
"""PostgreSQL 데이터베이스 연결"""
|
|
try:
|
|
connection = psycopg2.connect(
|
|
host="localhost",
|
|
port=5432,
|
|
database="researchqa",
|
|
user="woonglab",
|
|
password="!@#woonglab"
|
|
)
|
|
connection.autocommit = True
|
|
return connection
|
|
except Exception as e:
|
|
logger.error(f"PostgreSQL 연결 실패: {e}")
|
|
raise HTTPException(status_code=500, detail="데이터베이스 연결 실패")
|
|
|
|
# API 엔드포인트들
|
|
@app.get("/")
|
|
async def root():
|
|
"""루트 엔드포인트"""
|
|
return {
|
|
"message": "LangChain 기반 연구QA 챗봇 API",
|
|
"version": "2.0.0",
|
|
"status": "running"
|
|
}
|
|
|
|
@app.get("/health")
|
|
async def health_check():
|
|
"""헬스 체크"""
|
|
try:
|
|
# LangChain 서비스 상태 확인
|
|
collection_info = langchain_service.get_collection_info()
|
|
|
|
return {
|
|
"status": "healthy",
|
|
"langchain_service": "active",
|
|
"collection_info": collection_info,
|
|
"timestamp": datetime.now().isoformat()
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"헬스 체크 실패: {e}")
|
|
raise HTTPException(status_code=500, detail=f"서비스 상태 불량: {e}")
|
|
|
|
@app.post("/chat", response_model=ChatResponse)
|
|
async def chat(request: ChatRequest):
|
|
"""LangChain RAG 기반 채팅"""
|
|
try:
|
|
logger.info(f"💬 채팅 요청: {request.message}")
|
|
|
|
# LangChain RAG를 통한 답변 생성
|
|
result = langchain_service.generate_answer(request.message)
|
|
|
|
response = ChatResponse(
|
|
response=result["answer"],
|
|
sources=result["references"],
|
|
detailed_references=result.get("detailed_references", []),
|
|
timestamp=datetime.now().isoformat()
|
|
)
|
|
|
|
logger.info(f"✅ 답변 생성 완료: {len(result['references'])}개 참조")
|
|
logger.info(f"📋 전달할 sources: {result['references']}")
|
|
logger.info(f"📋 전달할 detailed_references: {len(result.get('detailed_references', []))}개")
|
|
return response
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 채팅 처리 실패: {e}")
|
|
raise HTTPException(status_code=500, detail=f"채팅 처리 실패: {e}")
|
|
|
|
@app.post("/upload", response_model=FileUploadResponse)
|
|
async def upload_file(file: UploadFile = File(...)):
|
|
"""PDF 파일 업로드 및 LangChain 처리"""
|
|
try:
|
|
# 파일 유효성 검사
|
|
if not file.filename.lower().endswith('.pdf'):
|
|
raise HTTPException(status_code=400, detail="PDF 파일만 업로드 가능합니다")
|
|
|
|
# 파일 ID 생성 (UUID)
|
|
file_id = str(uuid.uuid4())
|
|
filename = file.filename
|
|
|
|
logger.info(f"📄 파일 업로드 시작: {filename}")
|
|
|
|
# 파일 저장
|
|
upload_dir = "uploads"
|
|
os.makedirs(upload_dir, exist_ok=True)
|
|
file_path = os.path.join(upload_dir, f"{file_id}_{filename}")
|
|
|
|
with open(file_path, "wb") as buffer:
|
|
shutil.copyfileobj(file.file, buffer)
|
|
|
|
# PDF 파싱 (단순한 방식)
|
|
parser = PDFParser()
|
|
result = parser.process_pdf(file_path)
|
|
|
|
# 중단 체크는 프론트엔드 AbortController로 처리
|
|
|
|
|
|
if not result["success"]:
|
|
raise HTTPException(status_code=400, detail=f"PDF 파싱 실패: {result.get('error', 'Unknown error')}")
|
|
|
|
# 데이터베이스에 먼저 저장하여 정수 ID 획득
|
|
db_conn = get_db_connection()
|
|
cursor = db_conn.cursor()
|
|
|
|
cursor.execute("""
|
|
INSERT INTO uploaded_file (filename, file_path, status, upload_dt)
|
|
VALUES (%s, %s, %s, %s) RETURNING id
|
|
""", (filename, file_path, "processing", datetime.now()))
|
|
|
|
# 삽입된 레코드의 정수 ID 가져오기
|
|
db_file_id = cursor.fetchone()[0]
|
|
cursor.close()
|
|
db_conn.close()
|
|
|
|
# 청크가 생성되었는지 확인
|
|
if not result["chunks"] or len(result["chunks"]) == 0:
|
|
raise HTTPException(status_code=400, detail="PDF에서 텍스트를 추출할 수 없습니다. 빈 문서이거나 텍스트가 없는 이미지 파일일 수 있습니다.")
|
|
|
|
# 중단 체크는 프론트엔드 AbortController로 처리
|
|
|
|
# 메타데이터가 포함된 청크가 있는 경우 사용
|
|
if "chunks_with_metadata" in result and result["chunks_with_metadata"]:
|
|
# 메타데이터가 포함된 청크를 벡터스토어에 추가 (정수 ID 사용)
|
|
additional_metadata = {
|
|
"file_id": str(db_file_id), # 정수 ID를 문자열로 저장
|
|
"upload_time": datetime.now().isoformat(),
|
|
"total_chunks": len(result["chunks"]),
|
|
"page_count": result["page_count"]
|
|
}
|
|
# 벡터스토어에 추가
|
|
langchain_service.add_documents_with_metadata(result["chunks_with_metadata"], additional_metadata)
|
|
|
|
# 중단 체크는 프론트엔드 AbortController로 처리
|
|
else:
|
|
# 기존 방식으로 폴백
|
|
from langchain_core.documents import Document
|
|
langchain_docs = []
|
|
|
|
# 청크별로 문서 생성
|
|
for i, chunk in enumerate(result["chunks"]):
|
|
# 청크가 속한 페이지 번호 계산 (간단한 추정)
|
|
estimated_page = min(i + 1, result["page_count"]) if result["page_count"] > 0 else 1
|
|
|
|
langchain_doc = Document(
|
|
page_content=chunk,
|
|
metadata={
|
|
"filename": filename,
|
|
"chunk_index": i,
|
|
"file_id": str(db_file_id), # 정수 ID를 문자열로 저장
|
|
"upload_time": datetime.now().isoformat(),
|
|
"total_chunks": len(result["chunks"]),
|
|
"page_count": result["page_count"],
|
|
"estimated_page": estimated_page
|
|
}
|
|
)
|
|
langchain_docs.append(langchain_doc)
|
|
|
|
# LangChain 벡터스토어에 추가
|
|
langchain_service.add_documents(langchain_docs)
|
|
|
|
# 중단 체크는 프론트엔드 AbortController로 처리
|
|
|
|
# 데이터베이스 상태를 'processed'로 업데이트
|
|
db_conn = get_db_connection()
|
|
cursor = db_conn.cursor()
|
|
|
|
cursor.execute("""
|
|
UPDATE uploaded_file SET status = %s WHERE id = %s
|
|
""", ("processed", db_file_id))
|
|
|
|
cursor.close()
|
|
db_conn.close()
|
|
|
|
logger.info(f"✅ 파일 업로드 완료: {filename} (DB ID: {db_file_id}, UUID: {file_id})")
|
|
|
|
return FileUploadResponse(
|
|
message=f"파일 업로드 및 처리 완료: {len(result['chunks'])}개 청크",
|
|
file_id=str(db_file_id), # 정수 ID를 문자열로 반환
|
|
filename=filename,
|
|
status="success"
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 파일 업로드 실패: {e}")
|
|
|
|
# vectordb 관련 오류인지 확인
|
|
error_message = str(e).lower()
|
|
if any(keyword in error_message for keyword in [
|
|
"readonly database",
|
|
"code: 1032",
|
|
"chromadb",
|
|
"vectorstore",
|
|
"collection",
|
|
"database error"
|
|
]):
|
|
logger.error(f"🔍 벡터DB 관련 오류 감지: {e}")
|
|
raise HTTPException(status_code=500, detail="벡터DB에 문제가 있습니다.")
|
|
else:
|
|
raise HTTPException(status_code=500, detail=f"파일 업로드 실패: {e}")
|
|
|
|
@app.get("/files", response_model=FileListResponse)
|
|
async def get_files(search: str = None):
|
|
"""업로드된 파일 목록 조회 (검색 기능 포함)"""
|
|
try:
|
|
db_conn = get_db_connection()
|
|
cursor = db_conn.cursor(cursor_factory=RealDictCursor)
|
|
|
|
if search and search.strip():
|
|
# 검색어가 있는 경우 파일명으로 검색
|
|
search_term = f"%{search.strip()}%"
|
|
cursor.execute("""
|
|
SELECT id, filename, upload_dt as upload_time, status
|
|
FROM uploaded_file
|
|
WHERE filename ILIKE %s
|
|
ORDER BY upload_dt DESC
|
|
""", (search_term,))
|
|
else:
|
|
# 검색어가 없는 경우 전체 목록 조회
|
|
cursor.execute("""
|
|
SELECT id, filename, upload_dt as upload_time, status
|
|
FROM uploaded_file
|
|
ORDER BY upload_dt DESC
|
|
""")
|
|
|
|
files = cursor.fetchall()
|
|
cursor.close()
|
|
|
|
return FileListResponse(
|
|
files=[dict(file) for file in files],
|
|
total=len(files)
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 파일 목록 조회 실패: {e}")
|
|
raise HTTPException(status_code=500, detail=f"파일 목록 조회 실패: {e}")
|
|
|
|
@app.delete("/files/{file_id}")
|
|
async def delete_file(file_id: str):
|
|
"""파일 삭제"""
|
|
try:
|
|
db_conn = get_db_connection()
|
|
cursor = db_conn.cursor()
|
|
|
|
# 파일 정보 조회
|
|
cursor.execute("SELECT filename FROM uploaded_file WHERE id = %s", (file_id,))
|
|
result = cursor.fetchone()
|
|
|
|
if not result:
|
|
raise HTTPException(status_code=404, detail="파일을 찾을 수 없습니다")
|
|
|
|
filename = result[0]
|
|
|
|
# LangChain 벡터스토어에서 삭제
|
|
langchain_service.delete_documents_by_filename(filename)
|
|
|
|
# 데이터베이스에서 삭제
|
|
cursor.execute("DELETE FROM uploaded_file WHERE id = %s", (file_id,))
|
|
|
|
# 실제 파일 삭제
|
|
try:
|
|
os.remove(f"uploads/{file_id}_{filename}")
|
|
except FileNotFoundError:
|
|
pass
|
|
|
|
cursor.close()
|
|
|
|
logger.info(f"✅ 파일 삭제 완료: {filename}")
|
|
|
|
return {"message": f"파일 삭제 완료: {filename}"}
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 파일 삭제 실패: {e}")
|
|
raise HTTPException(status_code=500, detail=f"파일 삭제 실패: {e}")
|
|
|
|
@app.get("/pdf/{file_id}/view")
|
|
@app.head("/pdf/{file_id}/view")
|
|
async def view_pdf(file_id: str):
|
|
"""PDF 파일 뷰어"""
|
|
try:
|
|
logger.info(f"📄 PDF 뷰어 요청: file_id={file_id}")
|
|
|
|
db_conn = get_db_connection()
|
|
cursor = db_conn.cursor()
|
|
|
|
# 모든 파일 조회하여 file_id 매칭
|
|
cursor.execute("SELECT id, filename, file_path FROM uploaded_file")
|
|
all_files = cursor.fetchall()
|
|
|
|
result = None
|
|
for file_row in all_files:
|
|
db_id, filename, file_path = file_row
|
|
|
|
# 정수 ID로 매칭 시도
|
|
try:
|
|
if int(file_id) == db_id:
|
|
result = (filename, file_path)
|
|
logger.info(f"📄 정수 ID로 매칭 성공: {filename}")
|
|
break
|
|
except ValueError:
|
|
pass
|
|
|
|
# UUID로 매칭 시도 (file_path에서 UUID 추출)
|
|
import re
|
|
uuid_pattern = r'([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})'
|
|
file_uuid_match = re.search(uuid_pattern, file_path, re.IGNORECASE)
|
|
|
|
if file_uuid_match:
|
|
file_uuid = file_uuid_match.group(1)
|
|
if file_id.lower() == file_uuid.lower():
|
|
result = (filename, file_path)
|
|
logger.info(f"📄 UUID로 매칭 성공: {filename} (UUID: {file_uuid})")
|
|
break
|
|
|
|
# 부분 매칭도 시도 (file_id가 file_path에 포함된 경우)
|
|
if file_id in file_path:
|
|
result = (filename, file_path)
|
|
logger.info(f"📄 부분 매칭 성공: {filename}")
|
|
break
|
|
|
|
if not result:
|
|
logger.error(f"❌ 파일을 찾을 수 없음: file_id={file_id}")
|
|
raise HTTPException(status_code=404, detail=f"파일을 찾을 수 없습니다: {file_id}")
|
|
|
|
filename, file_path = result
|
|
|
|
# 절대 경로로 변환
|
|
if not os.path.isabs(file_path):
|
|
file_path = os.path.abspath(file_path)
|
|
|
|
if not os.path.exists(file_path):
|
|
logger.error(f"❌ 파일이 존재하지 않음: {file_path}")
|
|
raise HTTPException(status_code=404, detail="파일이 존재하지 않습니다")
|
|
|
|
logger.info(f"✅ PDF 파일 반환: {filename} -> {file_path}")
|
|
cursor.close()
|
|
|
|
return FileResponse(
|
|
path=file_path,
|
|
media_type="application/pdf",
|
|
filename=filename
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ PDF 뷰어 실패: {e}")
|
|
raise HTTPException(status_code=500, detail=f"PDF 뷰어 실패: {e}")
|
|
|
|
@app.get("/search")
|
|
async def search_documents(query: str, limit: int = 5):
|
|
"""문서 검색"""
|
|
try:
|
|
# LangChain 유사 문서 검색
|
|
documents = langchain_service.search_similar_documents(query, k=limit)
|
|
|
|
results = []
|
|
for doc in documents:
|
|
results.append({
|
|
"content": doc.page_content[:200] + "...",
|
|
"metadata": doc.metadata,
|
|
"score": getattr(doc, 'score', 0.0)
|
|
})
|
|
|
|
return {
|
|
"query": query,
|
|
"results": results,
|
|
"total": len(results)
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 문서 검색 실패: {e}")
|
|
raise HTTPException(status_code=500, detail=f"문서 검색 실패: {e}")
|
|
|
|
@app.get("/stats")
|
|
async def get_stats():
|
|
"""시스템 통계"""
|
|
try:
|
|
# LangChain 컬렉션 정보
|
|
collection_info = langchain_service.get_collection_info()
|
|
|
|
# 데이터베이스 통계
|
|
db_conn = get_db_connection()
|
|
cursor = db_conn.cursor()
|
|
|
|
cursor.execute("SELECT COUNT(*) FROM uploaded_file")
|
|
file_count = cursor.fetchone()[0]
|
|
|
|
cursor.close()
|
|
|
|
return {
|
|
"langchain_stats": collection_info,
|
|
"database_stats": {
|
|
"total_files": file_count
|
|
},
|
|
"timestamp": datetime.now().isoformat()
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 통계 조회 실패: {e}")
|
|
raise HTTPException(status_code=500, detail=f"통계 조회 실패: {e}")
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
uvicorn.run(app, host="0.0.0.0", port=8000) |