researchqa/backend/services/langchain_service.py
2025-10-05 23:22:54 +09:00

276 lines
9.9 KiB
Python

"""
LangChain v0.3 기반 AI 서비스
향후 고도화를 위한 확장 가능한 아키텍처
"""
import os
import logging
from typing import List, Dict, Any, Optional
from datetime import datetime
# LangChain Core
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
from langchain_core.retrievers import BaseRetriever
from langchain_core.language_models import BaseLanguageModel
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
# LangChain Community
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import SentenceTransformerEmbeddings
from langchain_community.llms import Ollama
# LangChain Chains
from langchain.chains import RetrievalQA
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.chains import create_retrieval_chain
# Database
import psycopg2
from psycopg2.extras import RealDictCursor
logger = logging.getLogger(__name__)
class LangChainRAGService:
"""LangChain 기반 RAG 서비스"""
def __init__(self):
self.embeddings: Optional[Embeddings] = None
self.vectorstore: Optional[VectorStore] = None
self.llm: Optional[BaseLanguageModel] = None
self.retriever: Optional[BaseRetriever] = None
self.qa_chain: Optional[Any] = None
self.db_connection = None
def initialize(self):
"""LangChain 컴포넌트 초기화"""
try:
# 임베딩 모델 초기화
self.embeddings = SentenceTransformerEmbeddings(
model_name="jhgan/ko-sroberta-multitask"
)
logger.info("✅ LangChain 임베딩 모델 로드 완료")
# ChromaDB 벡터스토어 초기화
self.vectorstore = Chroma(
persist_directory="./vectordb",
embedding_function=self.embeddings,
collection_name="research_documents"
)
logger.info("✅ LangChain ChromaDB 초기화 완료")
# Ollama LLM 초기화
self.llm = Ollama(
model="qwen3:latest",
base_url="http://localhost:11434"
)
logger.info("✅ LangChain Ollama LLM 초기화 완료")
# 리트리버 초기화
self.retriever = self.vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": 5}
)
logger.info("✅ LangChain 리트리버 초기화 완료")
# RAG 체인 구성
self._setup_rag_chain()
# 데이터베이스 연결
self._setup_database()
logger.info("🚀 LangChain RAG 서비스 초기화 완료")
except Exception as e:
logger.error(f"❌ LangChain 서비스 초기화 실패: {e}")
raise
def _setup_rag_chain(self):
"""RAG 체인 설정"""
try:
# 프롬프트 템플릿
prompt_template = """
다음 문서들을 참고하여 질문에 답변해주세요.
문서들:
{context}
질문: {input}
답변: 문서의 내용을 바탕으로 정확하고 상세하게 답변해주세요.
"""
prompt = PromptTemplate(
template=prompt_template,
input_variables=["context", "input"]
)
# 문서 체인 생성
document_chain = create_stuff_documents_chain(
llm=self.llm,
prompt=prompt
)
# RAG 체인 생성
self.qa_chain = create_retrieval_chain(
retriever=self.retriever,
combine_docs_chain=document_chain
)
logger.info("✅ RAG 체인 설정 완료")
except Exception as e:
logger.error(f"❌ RAG 체인 설정 실패: {e}")
raise
def _setup_database(self):
"""데이터베이스 연결 설정"""
try:
self.db_connection = psycopg2.connect(
host="localhost",
port=5432,
database="researchqa",
user="woonglab",
password="!@#woonglab"
)
self.db_connection.autocommit = True
logger.info("✅ PostgreSQL 연결 완료")
except Exception as e:
logger.error(f"❌ PostgreSQL 연결 실패: {e}")
raise
def add_documents(self, documents: List[Document], metadata: Dict[str, Any] = None):
"""문서를 벡터스토어에 추가"""
try:
if metadata:
for doc in documents:
doc.metadata.update(metadata)
# ChromaDB에 문서 추가
self.vectorstore.add_documents(documents)
logger.info(f"{len(documents)}개 문서 추가 완료")
except Exception as e:
logger.error(f"❌ 문서 추가 실패: {e}")
raise
def search_similar_documents(self, query: str, k: int = 5) -> List[Document]:
"""유사 문서 검색"""
try:
docs = self.vectorstore.similarity_search(query, k=k)
logger.info(f"{len(docs)}개 유사 문서 검색 완료")
return docs
except Exception as e:
logger.error(f"❌ 유사 문서 검색 실패: {e}")
raise
def generate_answer(self, question: str) -> Dict[str, Any]:
"""RAG를 통한 답변 생성"""
try:
# 간단한 유사 문서 검색으로 시작
similar_docs = self.search_similar_documents(question, k=3)
if not similar_docs:
return {
"answer": "죄송합니다. 관련 문서를 찾을 수 없습니다.",
"references": ["문서 없음"],
"source_documents": []
}
# 문서 내용을 기반으로 간단한 답변 생성
context_text = ""
references = []
for i, doc in enumerate(similar_docs):
context_text += f"\n문서 {i+1}:\n{doc.page_content[:500]}...\n"
if hasattr(doc, 'metadata') and doc.metadata:
filename = doc.metadata.get('filename', 'Unknown')
file_id = doc.metadata.get('file_id', 'unknown')
chunk_index = doc.metadata.get('chunk_index', 0)
# 페이지 번호는 청크 인덱스를 기반으로 추정 (실제로는 더 정확한 방법 필요)
page_number = chunk_index + 1
references.append(f"{filename}::{file_id} [p{page_number}]")
# 간단한 답변 생성 (LLM 없이)
answer = f"질문하신 '{question}'에 대한 관련 문서를 찾았습니다.\n\n참조 문서에서 관련 내용을 확인할 수 있습니다."
response = {
"answer": answer,
"references": references,
"source_documents": similar_docs
}
logger.info(f"✅ RAG 답변 생성 완료: {len(references)}개 참조")
return response
except Exception as e:
logger.error(f"❌ RAG 답변 생성 실패: {e}")
# 오류 시 기본 응답 반환
return {
"answer": "죄송합니다. 현재 시스템 오류로 인해 답변을 생성할 수 없습니다.",
"references": ["시스템 오류"],
"source_documents": []
}
def get_collection_info(self) -> Dict[str, Any]:
"""컬렉션 정보 조회"""
try:
# ChromaDB 컬렉션 정보
collection = self.vectorstore._collection
count = collection.count()
return {
"total_documents": count,
"collection_name": "research_documents",
"embedding_model": "jhgan/ko-sroberta-multitask"
}
except Exception as e:
logger.error(f"❌ 컬렉션 정보 조회 실패: {e}")
return {"error": str(e)}
def delete_documents_by_filename(self, filename: str):
"""파일명으로 문서 삭제"""
try:
# 메타데이터로 필터링하여 삭제
collection = self.vectorstore._collection
collection.delete(where={"filename": filename})
logger.info(f"{filename} 관련 문서 삭제 완료")
except Exception as e:
logger.error(f"❌ 문서 삭제 실패: {e}")
raise
def cleanup_database_by_filename(self, filename: str):
"""데이터베이스에서 파일 관련 데이터 정리"""
try:
cursor = self.db_connection.cursor()
# 파일 관련 벡터 데이터 삭제
cursor.execute(
"DELETE FROM file_vectors WHERE filename = %s",
(filename,)
)
# 파일 메타데이터 삭제
cursor.execute(
"DELETE FROM files WHERE filename = %s",
(filename,)
)
cursor.close()
logger.info(f"{filename} 데이터베이스 정리 완료")
except Exception as e:
logger.error(f"❌ 데이터베이스 정리 실패: {e}")
raise
# 전역 서비스 인스턴스
langchain_service = LangChainRAGService()