1237 lines
62 KiB
Python
1237 lines
62 KiB
Python
"""
|
|
LangChain v0.3 기반 AI 서비스
|
|
향후 고도화를 위한 확장 가능한 아키텍처
|
|
"""
|
|
|
|
import os
|
|
import logging
|
|
from typing import List, Dict, Any, Optional
|
|
from datetime import datetime
|
|
from .context_retrieval import ContextRetrieval
|
|
|
|
# LangChain Core
|
|
from langchain_core.documents import Document
|
|
from langchain_core.embeddings import Embeddings
|
|
from langchain_core.vectorstores import VectorStore
|
|
from langchain_core.retrievers import BaseRetriever
|
|
from langchain_core.language_models import BaseLanguageModel
|
|
from langchain_core.prompts import PromptTemplate
|
|
from langchain_core.output_parsers import StrOutputParser
|
|
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
|
|
|
|
# LangChain Community
|
|
from langchain_community.vectorstores import Chroma
|
|
from langchain_community.embeddings import SentenceTransformerEmbeddings
|
|
from langchain_community.llms import Ollama
|
|
|
|
# LangChain Chains
|
|
from langchain.chains import RetrievalQA
|
|
from langchain.chains.combine_documents import create_stuff_documents_chain
|
|
from langchain.chains import create_retrieval_chain
|
|
|
|
# Database
|
|
import psycopg2
|
|
from psycopg2.extras import RealDictCursor
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class LangChainRAGService:
|
|
"""LangChain 기반 RAG 서비스"""
|
|
|
|
def __init__(self):
|
|
self.embeddings: Optional[Embeddings] = None
|
|
self.vectorstore: Optional[VectorStore] = None
|
|
self.llm: Optional[BaseLanguageModel] = None
|
|
self.retriever: Optional[BaseRetriever] = None
|
|
self.vector_retriever: Optional[BaseRetriever] = None
|
|
self.mmr_retriever: Optional[BaseRetriever] = None
|
|
self.qa_chain: Optional[Any] = None
|
|
self.db_connection = None
|
|
self.context_retrieval: Optional[ContextRetrieval] = None
|
|
|
|
def initialize(self):
|
|
"""LangChain 컴포넌트 초기화"""
|
|
try:
|
|
# 임베딩 모델 초기화
|
|
self.embeddings = SentenceTransformerEmbeddings(
|
|
model_name="jhgan/ko-sroberta-multitask"
|
|
)
|
|
logger.info("✅ LangChain 임베딩 모델 로드 완료")
|
|
|
|
# ChromaDB 벡터스토어 초기화 (권한 문제 해결)
|
|
self._initialize_chromadb()
|
|
logger.info("✅ LangChain ChromaDB 초기화 완료")
|
|
|
|
# Ollama LLM 초기화 (temperature 0.3으로 설정)
|
|
self.llm = Ollama(
|
|
model="qwen3:latest",
|
|
base_url="http://localhost:11434",
|
|
temperature=0.3 # 더 일관되고 정확한 답변을 위한 낮은 temperature
|
|
)
|
|
logger.info("✅ LangChain Ollama LLM 초기화 완료")
|
|
|
|
# 리트리버 설정 (ChromaDB 초기화 완료 후에 호출)
|
|
self._setup_retrievers()
|
|
logger.info("✅ LangChain 리트리버 초기화 완료")
|
|
|
|
# RAG 체인 구성
|
|
self._setup_rag_chain()
|
|
|
|
# 데이터베이스 연결
|
|
self._setup_database()
|
|
|
|
# 컨텍스트 검색 시스템 초기화
|
|
self._initialize_context_retrieval()
|
|
|
|
logger.info("🚀 LangChain RAG 서비스 초기화 완료")
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ LangChain 서비스 초기화 실패: {e}")
|
|
raise
|
|
|
|
def _initialize_chromadb(self):
|
|
"""ChromaDB 초기화 (권한 문제 해결)"""
|
|
try:
|
|
import os
|
|
import shutil
|
|
|
|
# vectordb 디렉토리 생성 및 권한 설정
|
|
vectordb_dir = "./vectordb"
|
|
if not os.path.exists(vectordb_dir):
|
|
os.makedirs(vectordb_dir, mode=0o755, exist_ok=True)
|
|
logger.info(f"📁 vectordb 디렉토리 생성: {vectordb_dir}")
|
|
|
|
try:
|
|
# 기존 컬렉션이 있는지 확인
|
|
try:
|
|
temp_client = Chroma(
|
|
persist_directory=vectordb_dir,
|
|
embedding_function=self.embeddings,
|
|
collection_name="research_documents"
|
|
)
|
|
# 컬렉션 접근 테스트
|
|
temp_client._collection.count()
|
|
self.vectorstore = temp_client
|
|
logger.info("✅ 기존 ChromaDB 컬렉션 접근 성공")
|
|
except Exception as e:
|
|
logger.warning(f"⚠️ 기존 컬렉션 접근 실패 (시도 {attempt + 1}/{max_retries}): {e}")
|
|
|
|
if attempt < max_retries - 1:
|
|
# 컬렉션 재생성 시도
|
|
try:
|
|
# 기존 vectordb 디렉토리 삭제 후 재생성
|
|
if os.path.exists(vectordb_dir):
|
|
shutil.rmtree(vectordb_dir)
|
|
os.makedirs(vectordb_dir, mode=0o755, exist_ok=True)
|
|
logger.info(f"🔄 vectordb 디렉토리 재생성: {vectordb_dir}")
|
|
except Exception as cleanup_error:
|
|
logger.error(f"❌ vectordb 디렉토리 재생성 실패: {cleanup_error}")
|
|
else:
|
|
# 마지막 시도에서도 실패하면 vectordb 디렉토리 완전 삭제 후 재생성
|
|
try:
|
|
if os.path.exists(vectordb_dir):
|
|
shutil.rmtree(vectordb_dir)
|
|
os.makedirs(vectordb_dir, mode=0o755, exist_ok=True)
|
|
logger.info(f"🔄 vectordb 디렉토리 완전 재생성: {vectordb_dir}")
|
|
|
|
# 새로운 ChromaDB 클라이언트 생성
|
|
self.vectorstore = Chroma(
|
|
persist_directory=vectordb_dir,
|
|
embedding_function=self.embeddings,
|
|
collection_name="research_documents"
|
|
)
|
|
logger.info("✅ 새로운 ChromaDB 컬렉션 생성 성공")
|
|
except Exception as final_error:
|
|
logger.error(f"❌ ChromaDB 최종 초기화 실패: {final_error}")
|
|
raise
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ ChromaDB 초기화 실패 (시도 {attempt + 1}/{max_retries}): {e}")
|
|
if attempt == max_retries - 1:
|
|
raise
|
|
else:
|
|
# 재시도 전 잠시 대기
|
|
import time
|
|
time.sleep(1)
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ ChromaDB 초기화 실패: {e}")
|
|
raise
|
|
|
|
def _setup_retrievers(self):
|
|
"""리트리버 설정"""
|
|
try:
|
|
logger.info("🔧 리트리버 설정 시작...")
|
|
logger.info(f"🔧 vectorstore 상태: {self.vectorstore is not None}")
|
|
|
|
# 하이브리드 검색을 위한 다중 리트리버 설정
|
|
self.vector_retriever = self.vectorstore.as_retriever(
|
|
search_type="similarity",
|
|
search_kwargs={"k": 10}
|
|
)
|
|
logger.info("🔧 vector_retriever 설정 완료")
|
|
|
|
# MMR (Maximal Marginal Relevance) 리트리버 추가
|
|
self.mmr_retriever = self.vectorstore.as_retriever(
|
|
search_type="mmr",
|
|
search_kwargs={"k": 8, "fetch_k": 15, "lambda_mult": 0.5}
|
|
)
|
|
logger.info("🔧 mmr_retriever 설정 완료")
|
|
|
|
# 기본 리트리버는 벡터 검색 사용
|
|
self.retriever = self.vector_retriever
|
|
logger.info("✅ 리트리버 설정 완료")
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 리트리버 설정 실패: {e}")
|
|
raise
|
|
|
|
def _setup_rag_chain(self):
|
|
"""RAG 체인 설정"""
|
|
try:
|
|
# 개선된 프롬프트 템플릿
|
|
prompt_template = """
|
|
당신은 연구 문서 전문가입니다. 주어진 문서들을 바탕으로 사용자의 질문에 정확하고 상세하게 답변해주세요.
|
|
|
|
**답변 규칙:**
|
|
1. 답변은 반드시 한국어로만 시작하세요
|
|
2. <think> 태그나 영어 사고 과정을 절대 포함하지 마세요
|
|
3. 바로 최종 답변만 제공하세요
|
|
4. 문서의 내용을 정확히 파악하고 요약하여 답변하세요
|
|
5. 구체적인 절차나 방법이 있다면 단계별로 설명하세요
|
|
6. 문서에 없는 내용은 추측하지 말고 "문서에 명시되지 않음"이라고 하세요
|
|
7. **마크다운 형식을 적극적으로 사용하여 구조화된 답변을 제공하세요**
|
|
|
|
**마크다운 구조화 규칙 (중요):**
|
|
- 답변을 여러 섹션으로 나누어 ## 헤딩을 사용하세요
|
|
- 절차나 단계가 있다면 ### 소제목으로 구분하세요
|
|
- 중요한 내용은 **볼드**로 강조하세요
|
|
- 목록이나 절차는 - 또는 1. 2. 3. 형태의 리스트로 작성하세요
|
|
- 주의사항이나 중요 정보는 > 인용문으로 표시하세요
|
|
- 표가 필요한 경우 | 표 형태로 작성하세요
|
|
- 답변을 최소 3-4개 섹션으로 나누어 상세하게 설명하세요
|
|
- 각 섹션은 2-3문장 이상으로 구성하세요
|
|
|
|
|
|
**참조 문서:**
|
|
{context}
|
|
|
|
**사용자 질문:** {input}
|
|
|
|
**답변:** 위 문서의 내용을 바탕으로 질문에 대한 구체적이고 실용적인 답변을 마크다운 형식으로 바로 제공해주세요.
|
|
|
|
**답변 구조 요구사항:**
|
|
- 답변을 최소 3-4개 섹션으로 나누어 작성하세요
|
|
- 각 섹션은 ## 헤딩으로 시작하세요
|
|
- 절차나 단계가 있다면 ### 소제목과 번호 리스트를 사용하세요
|
|
- 중요한 용어나 개념은 **볼드**로 강조하세요
|
|
- 주의사항이나 중요 정보는 > 인용문으로 표시하세요
|
|
- 답변을 충분히 상세하고 길게 작성하세요 (최소 300자 이상)
|
|
|
|
**금지사항:**
|
|
- [문서명](https://example.com/attachment1) 같은 마크다운 링크 형태 절대 사용 금지
|
|
- [문서명](RSC-PM-SOP-0001-F01) 같은 괄호 내 문서 ID 절대 사용 금지
|
|
- example.com 같은 가짜 URL 절대 사용 금지
|
|
- [문서명] 형태의 대괄호 절대 사용 금지
|
|
- 괄호나 추가 정보는 절대 포함하지 마세요
|
|
- 문서명은 일반 텍스트로만 표시하세요 (예: 기밀자료 관리 신청서, 의사결정 프로세스)
|
|
"""
|
|
|
|
prompt = PromptTemplate(
|
|
template=prompt_template,
|
|
input_variables=["context", "input"]
|
|
)
|
|
|
|
# 문서 체인 생성
|
|
document_chain = create_stuff_documents_chain(
|
|
llm=self.llm,
|
|
prompt=prompt
|
|
)
|
|
|
|
# RAG 체인 생성
|
|
self.qa_chain = create_retrieval_chain(
|
|
retriever=self.retriever,
|
|
combine_docs_chain=document_chain
|
|
)
|
|
|
|
logger.info("✅ RAG 체인 설정 완료")
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ RAG 체인 설정 실패: {e}")
|
|
raise
|
|
|
|
|
|
def _setup_database(self):
|
|
"""데이터베이스 연결 설정"""
|
|
try:
|
|
self.db_connection = psycopg2.connect(
|
|
host="localhost",
|
|
port=5432,
|
|
database="researchqa",
|
|
user="woonglab",
|
|
password="!@#woonglab"
|
|
)
|
|
self.db_connection.autocommit = True
|
|
logger.info("✅ PostgreSQL 연결 완료")
|
|
except Exception as e:
|
|
logger.error(f"❌ PostgreSQL 연결 실패: {e}")
|
|
raise
|
|
|
|
def _initialize_context_retrieval(self):
|
|
"""컨텍스트 검색 시스템 초기화"""
|
|
try:
|
|
self.context_retrieval = ContextRetrieval()
|
|
logger.info("✅ 컨텍스트 검색 시스템 초기화 완료")
|
|
|
|
# 기존 벡터스토어에서 문서들을 가져와서 컨텍스트 검색 인덱스 구축
|
|
self._build_context_index()
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 컨텍스트 검색 시스템 초기화 실패: {e}")
|
|
raise
|
|
|
|
def _build_context_index(self):
|
|
"""컨텍스트 검색을 위한 인덱스 구축"""
|
|
try:
|
|
if not self.vectorstore:
|
|
logger.warning("⚠️ 벡터스토어가 초기화되지 않았습니다.")
|
|
return
|
|
|
|
# 벡터스토어에서 모든 문서 가져오기
|
|
all_docs = self.vectorstore.similarity_search("", k=10000) # 모든 문서 검색
|
|
|
|
# 컨텍스트 검색용 문서 형식으로 변환
|
|
context_documents = []
|
|
for doc in all_docs:
|
|
context_documents.append({
|
|
'content': doc.page_content,
|
|
'metadata': doc.metadata,
|
|
'document': doc
|
|
})
|
|
|
|
# 컨텍스트 검색 인덱스 구축
|
|
self.context_retrieval.build_index(context_documents)
|
|
logger.info(f"✅ 컨텍스트 검색 인덱스 구축 완료: {len(context_documents)}개 문서")
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 컨텍스트 검색 인덱스 구축 실패: {e}")
|
|
# 인덱스 구축 실패는 치명적이지 않으므로 경고만 출력
|
|
logger.warning("⚠️ 컨텍스트 검색을 사용하지 않고 기본 검색을 사용합니다.")
|
|
|
|
def _update_context_index(self, new_documents: List[Document]):
|
|
"""새로운 문서로 컨텍스트 검색 인덱스 업데이트"""
|
|
try:
|
|
if not self.context_retrieval:
|
|
return
|
|
|
|
# 새 문서를 컨텍스트 검색용 형식으로 변환
|
|
context_documents = []
|
|
for doc in new_documents:
|
|
context_documents.append({
|
|
'content': doc.page_content,
|
|
'metadata': doc.metadata,
|
|
'document': doc
|
|
})
|
|
|
|
# 기존 인덱스에 새 문서 추가
|
|
if hasattr(self.context_retrieval.context_bm25, 'documents'):
|
|
self.context_retrieval.context_bm25.documents.extend(context_documents)
|
|
# BM25 인덱스 재구축
|
|
self.context_retrieval.context_bm25.build_index(self.context_retrieval.context_bm25.documents)
|
|
logger.info(f"✅ 컨텍스트 검색 인덱스 업데이트 완료: {len(new_documents)}개 문서 추가")
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 컨텍스트 검색 인덱스 업데이트 실패: {e}")
|
|
# 인덱스 업데이트 실패는 치명적이지 않으므로 경고만 출력
|
|
logger.warning("⚠️ 컨텍스트 검색 인덱스 업데이트를 건너뜁니다.")
|
|
|
|
def add_documents(self, documents: List[Document], metadata: Dict[str, Any] = None):
|
|
"""문서를 벡터스토어에 추가 (재시도 로직 포함)"""
|
|
max_retries = 3
|
|
for attempt in range(max_retries):
|
|
try:
|
|
if metadata:
|
|
for doc in documents:
|
|
doc.metadata.update(metadata)
|
|
|
|
# ChromaDB에 문서 추가
|
|
self.vectorstore.add_documents(documents)
|
|
logger.info(f"✅ {len(documents)}개 문서 추가 완료")
|
|
|
|
# 컨텍스트 검색 인덱스 업데이트
|
|
self._update_context_index(documents)
|
|
|
|
return # 성공 시 함수 종료
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 문서 추가 실패 (시도 {attempt + 1}/{max_retries}): {e}")
|
|
|
|
# vectordb 관련 오류인지 확인
|
|
error_message = str(e).lower()
|
|
if any(keyword in error_message for keyword in [
|
|
"readonly database",
|
|
"code: 1032",
|
|
"chromadb",
|
|
"vectorstore",
|
|
"collection",
|
|
"database error"
|
|
]):
|
|
logger.error(f"🔍 벡터DB 관련 오류 감지: {e}")
|
|
|
|
if attempt < max_retries - 1:
|
|
# ChromaDB 재초기화 시도
|
|
try:
|
|
logger.info(f"🔄 ChromaDB 재초기화 시도 (시도 {attempt + 1}/{max_retries})")
|
|
self._initialize_chromadb()
|
|
logger.info("✅ ChromaDB 재초기화 완료")
|
|
except Exception as reinit_error:
|
|
logger.error(f"❌ ChromaDB 재초기화 실패: {reinit_error}")
|
|
else:
|
|
# 마지막 시도에서도 실패하면 사용자 친화적 메시지 반환
|
|
raise Exception("벡터DB에 문제가 있습니다.")
|
|
else:
|
|
# vectordb 관련이 아닌 오류는 즉시 재발생
|
|
raise
|
|
|
|
# 모든 재시도가 실패한 경우
|
|
raise Exception("벡터DB에 문제가 있습니다.")
|
|
|
|
def add_documents_with_metadata(self, documents_with_metadata: List[Dict], additional_metadata: Dict[str, Any]):
|
|
"""메타데이터가 포함된 문서들을 벡터스토어에 추가"""
|
|
try:
|
|
from langchain_core.documents import Document
|
|
|
|
# 메타데이터가 포함된 문서들을 LangChain Document로 변환
|
|
langchain_docs = []
|
|
for doc_data in documents_with_metadata:
|
|
# 기본 메타데이터와 추가 메타데이터 병합
|
|
merged_metadata = {**doc_data.get("metadata", {}), **additional_metadata}
|
|
|
|
langchain_doc = Document(
|
|
page_content=doc_data.get("page_content", ""),
|
|
metadata=merged_metadata
|
|
)
|
|
langchain_docs.append(langchain_doc)
|
|
|
|
# 기존 add_documents 메서드 사용
|
|
self.add_documents(langchain_docs)
|
|
logger.info(f"✅ 메타데이터 포함 {len(langchain_docs)}개 문서 추가 완료")
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 메타데이터 포함 문서 추가 실패: {e}")
|
|
raise
|
|
|
|
def search_similar_documents(self, query: str, k: int = 5) -> List[Document]:
|
|
"""유사 문서 검색"""
|
|
try:
|
|
docs = self.vectorstore.similarity_search(query, k=k)
|
|
logger.info(f"✅ {len(docs)}개 유사 문서 검색 완료")
|
|
return docs
|
|
except Exception as e:
|
|
logger.error(f"❌ 유사 문서 검색 실패: {e}")
|
|
|
|
# vectordb 관련 오류인지 확인하고 사용자 친화적 메시지로 변환
|
|
error_message = str(e).lower()
|
|
if any(keyword in error_message for keyword in [
|
|
"readonly database",
|
|
"code: 1032",
|
|
"chromadb",
|
|
"vectorstore",
|
|
"collection",
|
|
"database error"
|
|
]):
|
|
logger.error(f"🔍 벡터DB 관련 오류 감지: {e}")
|
|
raise Exception("벡터DB에 문제가 있습니다.")
|
|
else:
|
|
raise
|
|
|
|
def _filter_relevant_documents(self, documents: List[Document], question: str) -> List[Document]:
|
|
"""고급 문서 관련성 필터링 (최신 RAG 기법 적용)"""
|
|
if not documents:
|
|
return documents
|
|
|
|
# 질문에서 키워드 추출
|
|
question_keywords = self._extract_keywords(question)
|
|
|
|
# 각 문서의 관련성 점수 계산
|
|
scored_documents = []
|
|
for doc in documents:
|
|
score = self._calculate_advanced_relevance_score(doc, question_keywords, question)
|
|
scored_documents.append((doc, score))
|
|
|
|
# 점수순으로 정렬
|
|
scored_documents.sort(key=lambda x: x[1], reverse=True)
|
|
|
|
# 1단계: 높은 관련성 문서만 선별 (더 완화)
|
|
high_relevance_docs = [doc for doc, score in scored_documents if score > 0.1] # 0.3→0.1
|
|
|
|
# 2단계: 중간 관련성 문서 선별 (더 완화)
|
|
medium_relevance_docs = [doc for doc, score in scored_documents if 0.05 < score <= 0.1] # 0.1→0.05
|
|
|
|
# 3단계: 최종 문서 선택 (다양성 강화)
|
|
final_docs = []
|
|
used_filenames = set()
|
|
used_pages = set() # 페이지 다양성도 고려
|
|
|
|
# 높은 관련성 문서 우선 추가 (더 완화)
|
|
for doc in high_relevance_docs:
|
|
if len(final_docs) >= 15: # 최대 15개로 완화 (12→15)
|
|
break
|
|
filename = doc.metadata.get('filename', '')
|
|
page_key = f"{filename}_{doc.metadata.get('estimated_page', 1)}"
|
|
|
|
# 파일명과 페이지 다양성 모두 고려 (완화)
|
|
if filename not in used_filenames and page_key not in used_pages:
|
|
final_docs.append(doc)
|
|
used_filenames.add(filename)
|
|
used_pages.add(page_key)
|
|
|
|
# 중간 관련성 문서에서 다양성을 위해 추가 선택 (더 완화)
|
|
if len(final_docs) < 10 and medium_relevance_docs: # 최소 10개로 완화 (8→10)
|
|
for doc in medium_relevance_docs:
|
|
if len(final_docs) >= 15: # 최대 15개로 완화 (12→15)
|
|
break
|
|
filename = doc.metadata.get('filename', '')
|
|
page_key = f"{filename}_{doc.metadata.get('estimated_page', 1)}"
|
|
|
|
# 파일명과 페이지 다양성 모두 고려
|
|
if filename not in used_filenames or page_key not in used_pages:
|
|
final_docs.append(doc)
|
|
used_filenames.add(filename)
|
|
used_pages.add(page_key)
|
|
|
|
# 최소 2개 문서 보장 (유지)
|
|
if len(final_docs) < 2:
|
|
for doc, score in scored_documents:
|
|
if len(final_docs) >= 2: # 최소 2개로 50% 감소 (3→2)
|
|
break
|
|
filename = doc.metadata.get('filename', '')
|
|
page_key = f"{filename}_{doc.metadata.get('estimated_page', 1)}"
|
|
|
|
if filename not in used_filenames or page_key not in used_pages:
|
|
final_docs.append(doc)
|
|
used_filenames.add(filename)
|
|
used_pages.add(page_key)
|
|
|
|
logger.info(f"📊 고급 문서 필터링: {len(documents)}개 → {len(final_docs)}개")
|
|
|
|
# 필터링된 문서들의 점수와 파일명 로깅
|
|
for i, (doc, score) in enumerate(scored_documents[:8]):
|
|
# filename 추출 로직 개선
|
|
filename = doc.metadata.get('filename', 'Unknown') if doc.metadata else 'Unknown'
|
|
if filename == 'Unknown' or not filename:
|
|
# source 필드에서 filename 추출 시도
|
|
source = doc.metadata.get('source', '') if doc.metadata else ''
|
|
if source:
|
|
# UUID_filename.pdf 형태에서 filename.pdf 추출
|
|
import re
|
|
match = re.search(r'[a-f0-9-]{36}_(.+)', source)
|
|
if match:
|
|
filename = match.group(1)
|
|
else:
|
|
filename = source
|
|
status = "✅ 선택됨" if doc in final_docs else "❌ 제외됨"
|
|
logger.info(f"📋 문서 {i+1}: {filename} (점수: {score:.2f}) - {status}")
|
|
|
|
return final_docs
|
|
|
|
def _extract_keywords(self, text: str) -> List[str]:
|
|
"""텍스트에서 키워드 추출"""
|
|
import re
|
|
|
|
# 한국어 키워드 추출 (2글자 이상)
|
|
korean_keywords = re.findall(r'[가-힣]{2,}', text)
|
|
|
|
# 영어 키워드 추출 (3글자 이상)
|
|
english_keywords = re.findall(r'[A-Za-z]{3,}', text)
|
|
|
|
# 숫자와 특수문자 제거
|
|
keywords = []
|
|
for keyword in korean_keywords + english_keywords:
|
|
if not re.search(r'[0-9]', keyword) and len(keyword) >= 2:
|
|
keywords.append(keyword.lower())
|
|
|
|
return list(set(keywords)) # 중복 제거
|
|
|
|
def _calculate_advanced_relevance_score(self, doc: Document, question_keywords: List[str], question: str) -> float:
|
|
"""고급 문서 관련성 점수 계산 (최신 RAG 기법 적용)"""
|
|
if not hasattr(doc, 'page_content'):
|
|
return 0.0
|
|
|
|
content = doc.page_content.lower()
|
|
# filename 추출 로직 개선
|
|
filename = doc.metadata.get('filename', '').lower() if doc.metadata else ''
|
|
if not filename:
|
|
# source 필드에서 filename 추출 시도
|
|
source = doc.metadata.get('source', '') if doc.metadata else ''
|
|
if source:
|
|
# UUID_filename.pdf 형태에서 filename.pdf 추출
|
|
import re
|
|
match = re.search(r'[a-f0-9-]{36}_(.+)', source)
|
|
if match:
|
|
filename = match.group(1).lower()
|
|
else:
|
|
filename = source.lower()
|
|
|
|
score = 0.0
|
|
|
|
# 1. 파일명 관련성 (가중치 매우 높음)
|
|
filename_score = 0.0
|
|
for keyword in question_keywords:
|
|
if keyword in filename:
|
|
filename_score += 2.0 # 파일명 매칭에 더 높은 점수
|
|
score += filename_score * 3.0 # 파일명 매칭에 매우 높은 가중치
|
|
|
|
# 2. 내용 관련성 (키워드 밀도 고려)
|
|
content_score = 0.0
|
|
total_keywords = len(question_keywords)
|
|
matched_keywords = 0
|
|
|
|
for keyword in question_keywords:
|
|
if keyword in content:
|
|
matched_keywords += 1
|
|
# 키워드가 여러 번 나타날수록 더 높은 점수
|
|
keyword_count = content.count(keyword)
|
|
content_score += min(keyword_count * 0.5, 2.0) # 최대 2.0점
|
|
|
|
# 키워드 매칭 비율에 따른 보너스
|
|
if total_keywords > 0:
|
|
match_ratio = matched_keywords / total_keywords
|
|
content_score += match_ratio * 2.0 # 매칭 비율에 따른 보너스
|
|
|
|
score += content_score
|
|
|
|
# 3. 구체적 키워드 매칭 (도메인 특화)
|
|
domain_keywords = {
|
|
'연구노트': ['연구노트', '연구노트문서', '노트대출', '노트관리', '연구노트작성'],
|
|
'대출': ['대출', '대출절차', '대출신청', '대출승인', '문서대출'],
|
|
'신청': ['신청', '신청서', '승인절차', '신청절차', '승인신청'],
|
|
'자료실': ['자료실', '연구자료실', '자료실운영', '자료실관리', '자료실접근']
|
|
}
|
|
|
|
for main_keyword, related_keywords in domain_keywords.items():
|
|
if main_keyword in question:
|
|
for related_keyword in related_keywords:
|
|
if related_keyword in content:
|
|
score += 1.5 # 도메인 키워드 매칭에 높은 점수
|
|
if related_keyword in filename:
|
|
score += 2.0 # 파일명에 도메인 키워드가 있으면 더 높은 점수
|
|
|
|
# 4. 문맥적 관련성 (문장 단위 매칭)
|
|
sentences = content.split('.')
|
|
context_score = 0.0
|
|
for sentence in sentences:
|
|
sentence_lower = sentence.lower().strip()
|
|
if len(sentence_lower) > 10: # 의미있는 문장만 고려
|
|
keyword_in_sentence = sum(1 for keyword in question_keywords if keyword in sentence_lower)
|
|
if keyword_in_sentence >= 2: # 한 문장에 2개 이상 키워드가 있으면
|
|
context_score += 1.0
|
|
|
|
score += context_score * 0.8
|
|
|
|
# 5. 관련성 낮은 문서 패턴 제거 (50% 더 엄격하게)
|
|
irrelevant_patterns = [
|
|
'시료', '외부송부', '보고서작성', '기밀자료송부', '안전성시험', '임상시험',
|
|
'참고문헌', '관련문서', '붙임', '목차', '인덱스', 'table of contents',
|
|
'해당사항 없음', '없음', '참조', 'reference', 'attachment',
|
|
'문서 번호', '개정번호', '발효일자', 'format no', 'document no',
|
|
'e-signature', '전자서명', 'written by', 'reviewed by', 'approved by',
|
|
'justification', 'author', 'qa review', 'director approval', 'head approval',
|
|
'의사결정', '결재라인', '승인라인', '프로세스', '송부프로세스',
|
|
'대외성과발표', '승인프로세스', '성과발표', '발표승인', '승인절차'
|
|
]
|
|
|
|
for pattern in irrelevant_patterns:
|
|
if pattern in filename and not any(keyword in question for keyword in ['시료', '송부', '보고서', '기밀', '참고', '관련', '성과', '발표']):
|
|
score -= 3.0 # 관련성 낮은 패턴에 더 큰 페널티 (2.0→3.0)
|
|
|
|
# 6. 목차/인덱스/메타데이터 페이지 필터링 (더 정교하게)
|
|
is_index_page = (
|
|
len(content) < 500 or # 내용이 너무 짧음
|
|
content.count('\n') > content.count(' ') * 0.4 or # 줄바꿈이 많아서 목록 형태
|
|
content.count('dwprnd') + content.count('sop') + content.count('f0') >= 3 or # 문서 번호가 너무 많음
|
|
len([s for s in content.split('.') if len(s.strip()) > 30]) < 3 or # 구체적인 설명이 부족
|
|
# 추가: 목차 특성 패턴 감지
|
|
(content.count('붙임') >= 2 and content.count('f0') >= 2) or # 붙임 목록이 많음
|
|
(content.count('참고문헌') > 0 and content.count('관련문서') > 0) or # 목차 구조
|
|
content.count('해당사항 없음') > 0 or # 목차에서 자주 나타나는 표현
|
|
# 추가: 개정내역/메타데이터 페이지 감지
|
|
(content.count('개정번호') > 0 and content.count('개정항목') > 0) or # 개정내역 테이블
|
|
(content.count('개정사유') > 0 and content.count('발효일자') > 0) or # 개정내역 테이블
|
|
content.count('신규제정') > 0 or # 개정내역에서 자주 나타나는 표현
|
|
content.count('sop 양식 개정') > 0 or # 개정내역에서 자주 나타나는 표현
|
|
content.count('조직변경') > 0 or # 개정내역에서 자주 나타나는 표현
|
|
# 추가: 테이블 형태의 메타데이터 감지
|
|
(content.count('|') > 10 and content.count('개정') > 0) or # 개정내역 테이블
|
|
(content.count('번호') > 0 and content.count('일자') > 0 and content.count('변경') > 0) # 메타데이터 테이블
|
|
)
|
|
|
|
if is_index_page:
|
|
score *= 0.02 # 목차/인덱스 페이지는 점수를 매우 크게 감소
|
|
logger.info(f"📋 목차/인덱스 페이지 감지: {filename} - 점수 대폭 감소")
|
|
|
|
# 7. 질문 특화 검증 (완화된 버전)
|
|
if '연구노트' in question and '대출' in question:
|
|
# 연구노트 대출과 직접 관련된 키워드들이 문서에 있는지 확인
|
|
research_note_loan_keywords = ['연구노트', '대출', '비기밀자료', '기밀자료관리', '외부송부', '신청서', '승인', '문서', '관리', '절차']
|
|
keyword_matches = sum(1 for keyword in research_note_loan_keywords if keyword in content)
|
|
if keyword_matches < 2: # 최소 2개 이상의 관련 키워드가 있으면 포함 (4→2로 완화)
|
|
score *= 0.3 # 관련성이 낮으면 점수를 감소하지만 완전히 제외하지 않음 (0.1→0.3)
|
|
|
|
# 완화: "보고서 작성" 관련 문서도 부분적으로 포함
|
|
if '보고서' in filename and '작성' in filename:
|
|
score *= 0.2 # 보고서 작성 관련 문서도 부분적으로 포함 (0.05→0.2)
|
|
logger.info(f"📋 보고서 작성 문서 - 점수 감소: {filename}")
|
|
|
|
# 완화: "송부 프로세스" 관련 문서도 부분적으로 포함
|
|
if '송부' in filename and '프로세스' in filename:
|
|
score *= 0.2 # 송부 프로세스 관련 문서도 부분적으로 포함 (0.05→0.2)
|
|
logger.info(f"📋 송부 프로세스 문서 - 점수 감소: {filename}")
|
|
|
|
# 완화: "의사결정 프로세스" 관련 문서도 부분적으로 포함
|
|
if '의사결정' in content or '결재라인' in content or '승인라인' in content:
|
|
score *= 0.2 # 의사결정 프로세스 관련 문서도 부분적으로 포함 (0.05→0.2)
|
|
logger.info(f"📋 의사결정 프로세스 문서 - 점수 감소: {filename}")
|
|
|
|
return max(0.0, score) # 음수 점수 방지
|
|
|
|
def _verify_content_relevance(self, content: str, question: str) -> bool:
|
|
"""문서 내용이 질문과 실제로 관련이 있는지 검증 (간소화)"""
|
|
if not content or not question:
|
|
return False
|
|
|
|
content_lower = content.lower()
|
|
question_keywords = self._extract_keywords(question)
|
|
|
|
# 명백히 관련 없는 페이지 패턴들
|
|
irrelevant_patterns = [
|
|
'개정번호', '개정항목', '개정사유', '발효일자', '신규제정',
|
|
'바인더 표지', '보관번호', '보관목록', '관리부서',
|
|
'prepared by', 'format no', 'storage number'
|
|
]
|
|
|
|
if any(pattern in content_lower for pattern in irrelevant_patterns):
|
|
return False
|
|
|
|
# 키워드 매칭 확인 (완화된 버전)
|
|
keyword_matches = sum(1 for keyword in question_keywords if keyword in content_lower)
|
|
|
|
# 최소 1개 이상의 키워드가 매칭되면 관련성이 있다고 판단 (2→1로 완화)
|
|
return keyword_matches >= 1
|
|
|
|
def _verify_answer_document_relevance(self, answer: str, doc_content: str, question: str) -> bool:
|
|
"""답변과 문서 내용의 직접적 연관성 검증"""
|
|
if not answer or not doc_content:
|
|
return False
|
|
|
|
answer_lower = answer.lower()
|
|
doc_content_lower = doc_content.lower()
|
|
|
|
# 답변에서 추출한 핵심 키워드들
|
|
answer_keywords = []
|
|
|
|
# 답변에서 구체적인 정보나 절차를 추출
|
|
import re
|
|
|
|
# 숫자나 코드 패턴 (예: "DWPRND-DT-SOP-001-F07", "201609T001")
|
|
code_patterns = re.findall(r'[A-Z]{2,}[A-Z0-9-]+', answer)
|
|
answer_keywords.extend(code_patterns)
|
|
|
|
# 구체적인 절차나 단계 (예: "신청서 작성", "승인 절차")
|
|
procedure_patterns = re.findall(r'[가-힣]{2,}\s*[가-힣]{2,}', answer)
|
|
answer_keywords.extend(procedure_patterns)
|
|
|
|
# 문서 내용에서 답변의 핵심 키워드가 실제로 언급되는지 확인
|
|
relevance_score = 0
|
|
for keyword in answer_keywords:
|
|
if keyword.lower() in doc_content_lower:
|
|
relevance_score += 1
|
|
|
|
# 답변의 핵심 내용이 문서에 실제로 존재하는지 확인
|
|
answer_sentences = [s.strip() for s in answer.split('.') if len(s.strip()) > 10]
|
|
doc_sentence_matches = 0
|
|
|
|
for sentence in answer_sentences[:3]: # 답변의 처음 3개 문장만 확인
|
|
sentence_keywords = re.findall(r'[가-힣]{2,}', sentence)
|
|
if len(sentence_keywords) >= 2:
|
|
# 문장의 키워드들이 문서에 모두 있는지 확인
|
|
if all(keyword in doc_content_lower for keyword in sentence_keywords[:2]):
|
|
doc_sentence_matches += 1
|
|
|
|
# 관련성 판단: 더 관대하게 판단 (완화)
|
|
return relevance_score >= 0 or doc_sentence_matches >= 0 or len(answer_keywords) > 0
|
|
|
|
def _fallback_hybrid_search(self, question: str) -> List[Document]:
|
|
"""컨텍스트 검색 실패 시 사용할 기본 하이브리드 검색"""
|
|
logger.info(f"🔄 기본 하이브리드 검색 실행: {question}")
|
|
|
|
# 1. 벡터 검색으로 관련 문서 검색
|
|
vector_docs = self.vector_retriever.get_relevant_documents(question)
|
|
logger.info(f"📊 벡터 검색 결과: {len(vector_docs)}개 문서")
|
|
|
|
# 2. MMR 검색으로 다양성 있는 문서 검색
|
|
mmr_docs = self.mmr_retriever.get_relevant_documents(question)
|
|
logger.info(f"📊 MMR 검색 결과: {len(mmr_docs)}개 문서")
|
|
|
|
# 3. 두 검색 결과를 결합하여 중복 제거
|
|
all_docs = vector_docs + mmr_docs
|
|
unique_docs = []
|
|
seen_metadata = set()
|
|
|
|
for doc in all_docs:
|
|
doc_key = f"{doc.metadata.get('filename', '')}_{doc.metadata.get('chunk_index', 0)}"
|
|
if doc_key not in seen_metadata:
|
|
unique_docs.append(doc)
|
|
seen_metadata.add(doc_key)
|
|
|
|
logger.info(f"📊 중복 제거 후 총 {len(unique_docs)}개 문서")
|
|
return unique_docs
|
|
|
|
def generate_answer(self, question: str) -> Dict[str, Any]:
|
|
"""RAG를 통한 답변 생성"""
|
|
try:
|
|
# 컨텍스트 기반 검색으로 더 정확한 문서 검색
|
|
logger.info(f"🤖 컨텍스트 기반 검색으로 문서 검색 시작: {question}")
|
|
|
|
# 1. 컨텍스트 검색 시스템 사용
|
|
if self.context_retrieval:
|
|
try:
|
|
# 컨텍스트 기반 통합 검색 (임베딩 + BM25 + Reranker)
|
|
context_results = self.context_retrieval.search(question, top_k=15)
|
|
|
|
# 컨텍스트 검색 결과를 LangChain Document 형식으로 변환
|
|
unique_docs = []
|
|
for result in context_results:
|
|
if 'document' in result:
|
|
unique_docs.append(result['document'])
|
|
|
|
logger.info(f"📊 컨텍스트 기반 검색 결과: {len(unique_docs)}개 문서")
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 컨텍스트 검색 실패, 기본 검색으로 대체: {e}")
|
|
# 컨텍스트 검색 실패 시 기본 하이브리드 검색 사용
|
|
unique_docs = self._fallback_hybrid_search(question)
|
|
else:
|
|
# 컨텍스트 검색 시스템이 없는 경우 기본 하이브리드 검색 사용
|
|
unique_docs = self._fallback_hybrid_search(question)
|
|
|
|
# 2. RAG 체인을 사용하여 답변 생성
|
|
if self.qa_chain:
|
|
logger.info(f"🤖 LLM을 사용한 RAG 답변 생성 시작: {question}")
|
|
result = self.qa_chain.invoke({"input": question})
|
|
|
|
# LLM이 실제로 사용한 문서 정보 추출
|
|
references = []
|
|
detailed_references = [] # 인라인 링크용 상세 정보
|
|
source_documents = result.get("context", []) # LLM이 실제로 사용한 문서
|
|
|
|
logger.info(f"📋 LLM이 실제로 사용한 문서 수: {len(source_documents)}개")
|
|
for i, doc in enumerate(source_documents):
|
|
# filename 추출 로직 개선
|
|
filename = doc.metadata.get('filename', 'Unknown') if hasattr(doc, 'metadata') and doc.metadata else 'Unknown'
|
|
if filename == 'Unknown' or not filename:
|
|
# source 필드에서 filename 추출 시도
|
|
source = doc.metadata.get('source', '') if hasattr(doc, 'metadata') and doc.metadata else ''
|
|
if source:
|
|
# UUID_filename.pdf 형태에서 filename.pdf 추출
|
|
import re
|
|
match = re.search(r'[a-f0-9-]{36}_(.+)', source)
|
|
if match:
|
|
filename = match.group(1)
|
|
else:
|
|
filename = source
|
|
logger.info(f"📋 문서 {i+1}: {filename}")
|
|
|
|
# LLM이 실제로 사용한 문서에 대해 관련성 재검증 및 필터링
|
|
logger.info(f"📋 LLM이 사용한 문서에 대해 관련성 재검증 시작: {len(source_documents)}개")
|
|
filtered_documents = self._filter_relevant_documents(source_documents, question)
|
|
logger.info(f"📋 관련성 재검증 완료: {len(source_documents)}개 → {len(filtered_documents)}개")
|
|
|
|
# 답변과 문서의 직접적 연관성 추가 검증
|
|
answer = result.get("answer", "")
|
|
final_relevant_documents = []
|
|
|
|
for doc in filtered_documents:
|
|
if hasattr(doc, 'metadata') and doc.metadata:
|
|
filename = doc.metadata.get('filename', 'Unknown')
|
|
file_id = doc.metadata.get('file_id', 'unknown')
|
|
chunk_index = doc.metadata.get('chunk_index', 0)
|
|
page_count = doc.metadata.get('page_count', 0)
|
|
estimated_page = doc.metadata.get('estimated_page', chunk_index + 1)
|
|
|
|
# 실제 페이지 번호가 존재하는지 확인하고 범위 내에서만 참조
|
|
if page_count > 0 and estimated_page <= page_count:
|
|
page_number = estimated_page
|
|
else:
|
|
# 페이지 정보가 없거나 범위를 벗어난 경우 청크 인덱스 사용
|
|
page_number = min(chunk_index + 1, page_count) if page_count > 0 else chunk_index + 1
|
|
|
|
# 답변과 문서 내용의 직접적 연관성 추가 검증 (완화)
|
|
doc_content = doc.page_content if hasattr(doc, 'page_content') else ""
|
|
answer_doc_relevance = self._verify_answer_document_relevance(answer, doc_content, question)
|
|
|
|
# 연관성 검증을 거의 제거 (매우 관대하게 적용)
|
|
if answer_doc_relevance or len(doc_content) > 20: # 내용이 조금만 있어도 포함
|
|
final_relevant_documents.append(doc)
|
|
logger.info(f"📋 답변-문서 연관성 검증 통과: {filename} p{page_number}")
|
|
else:
|
|
logger.info(f"📋 답변-문서 연관성 검증 실패 - 제외됨: {filename} p{page_number}")
|
|
|
|
logger.info(f"📋 최종 관련성 검증 완료: {len(filtered_documents)}개 → {len(final_relevant_documents)}개")
|
|
|
|
# 최종 관련성 검증을 통과한 문서들만 참조 문서로 사용
|
|
for doc in final_relevant_documents:
|
|
if hasattr(doc, 'metadata') and doc.metadata:
|
|
# filename 추출 로직 개선
|
|
filename = doc.metadata.get('filename', 'Unknown')
|
|
if filename == 'Unknown' or not filename:
|
|
# source 필드에서 filename 추출 시도
|
|
source = doc.metadata.get('source', '')
|
|
if source:
|
|
# UUID_filename.pdf 형태에서 filename.pdf 추출
|
|
import re
|
|
match = re.search(r'[a-f0-9-]{36}_(.+)', source)
|
|
if match:
|
|
filename = match.group(1)
|
|
else:
|
|
filename = source
|
|
|
|
file_id = doc.metadata.get('file_id', 'unknown')
|
|
chunk_index = doc.metadata.get('chunk_index', 0)
|
|
page_count = doc.metadata.get('page_count', 0)
|
|
estimated_page = doc.metadata.get('estimated_page', chunk_index + 1)
|
|
|
|
# 실제 페이지 번호가 존재하는지 확인하고 범위 내에서만 참조
|
|
if page_count > 0 and estimated_page <= page_count:
|
|
page_number = estimated_page
|
|
else:
|
|
# 페이지 정보가 없거나 범위를 벗어난 경우 청크 인덱스 사용
|
|
page_number = min(chunk_index + 1, page_count) if page_count > 0 else chunk_index + 1
|
|
|
|
doc_content = doc.page_content if hasattr(doc, 'page_content') else ""
|
|
|
|
# 기존 참조 문서 형식 (하단 참조 문서 섹션용)
|
|
references.append(f"{filename}::{file_id} [p{page_number}]")
|
|
|
|
# 인라인 링크용 상세 정보 (최종 관련성 검증 통과한 문서만)
|
|
detailed_references.append({
|
|
"filename": filename,
|
|
"file_id": file_id,
|
|
"page_number": page_number,
|
|
"chunk_index": chunk_index,
|
|
"content_preview": doc_content[:100] + "..." if len(doc_content) > 100 else doc_content,
|
|
"full_content": doc_content, # 전체 내용 추가
|
|
"is_relevant": True # 최종 검증 통과
|
|
})
|
|
|
|
logger.info(f"📋 최종 참조 문서에 추가: {filename} p{page_number}")
|
|
logger.info(f"📋 detailed_references 길이: {len(detailed_references)}")
|
|
logger.info(f"📋 detailed_references 내용: {detailed_references[-1]}")
|
|
|
|
# <think> 태그 및 영어 사고 과정 제거 (강화된 버전)
|
|
answer = result.get("answer", "답변을 생성할 수 없습니다.")
|
|
import re
|
|
|
|
# <think>부터 </think>까지의 모든 내용 제거 (대소문자 구분 없음)
|
|
answer = re.sub(r'<think>.*?</think>', '', answer, flags=re.DOTALL | re.IGNORECASE).strip()
|
|
# </think> 태그만 있는 경우도 제거
|
|
answer = re.sub(r'</think>', '', answer, flags=re.IGNORECASE).strip()
|
|
# <think> 태그만 있는 경우도 제거
|
|
answer = re.sub(r'<think>', '', answer, flags=re.IGNORECASE).strip()
|
|
|
|
# 영어 사고 과정 완전 제거 (극강화 버전)
|
|
lines = answer.split('\n')
|
|
filtered_lines = []
|
|
korean_found = False
|
|
|
|
for line in lines:
|
|
original_line = line # 원본 줄 보존 (들여쓰기, 공백 등)
|
|
line_stripped = line.strip()
|
|
|
|
# 빈 줄은 그대로 유지 (마크다운 구조 보존)
|
|
if not line_stripped:
|
|
filtered_lines.append(original_line)
|
|
continue
|
|
|
|
# 한국어가 포함된 줄이 나오면 korean_found = True
|
|
if re.search(r'[가-힣]', line_stripped):
|
|
korean_found = True
|
|
|
|
# 한국어가 발견되기 전까지는 모든 영어 줄 무조건 제거
|
|
if not korean_found:
|
|
if re.match(r'^[A-Za-z]', line_stripped):
|
|
continue
|
|
|
|
# 한국어가 발견된 후에도 영어 사고 과정 제거 (극강화)
|
|
if re.match(r'^[A-Za-z]', line_stripped):
|
|
# 모든 영어 패턴 제거 (더 많은 패턴 추가)
|
|
if any(phrase in line_stripped.lower() for phrase in [
|
|
'let me', 'i need to', 'i should', 'i will', 'i can',
|
|
'the answer should', 'make sure', 'avoid any',
|
|
'structure the answer', 'break down', 'organize',
|
|
'tag or any thinking', 'so i need to focus',
|
|
'focus on the main content', 'documents',
|
|
'thinking process', 'internal thought',
|
|
'i need to focus', 'main content', 'tag', 'thinking',
|
|
'tag or any', 'so i need', 'focus on', 'main content of',
|
|
'documents', 'thinking', 'process', 'internal',
|
|
'part. but the user', 'wait, the initial', 'let me check',
|
|
'user also wants', 'answer in markdown', 'instructions say',
|
|
'use markdown for', 'check again', 'but the user also',
|
|
'wants the answer in', 'markdown. wait', 'the initial instructions',
|
|
'say to use markdown', 'for the answer', 'let me check again',
|
|
'part. but the user also wants the answer in markdown',
|
|
'wait, the initial instructions say to use markdown',
|
|
'let me check again', 'but the user also wants',
|
|
'wants the answer in markdown', 'markdown. wait',
|
|
'the initial instructions', 'say to use markdown for the answer'
|
|
]):
|
|
continue
|
|
# 영어 문장이지만 중요한 내용이 아닌 경우 제거
|
|
if len(line_stripped) > 10 and any(word in line_stripped.lower() for word in [
|
|
'thinking', 'process', 'structure', 'format', 'markdown',
|
|
'tag', 'focus', 'content', 'documents', 'internal',
|
|
'answer', 'should', 'make', 'sure', 'avoid',
|
|
'user', 'wants', 'instructions', 'check', 'again',
|
|
'part', 'but', 'let', 'me', 'say', 'use', 'for',
|
|
'wait', 'initial', 'also', 'the', 'in', 'to'
|
|
]):
|
|
continue
|
|
# 짧은 영어 문장도 제거 (사고 과정일 가능성)
|
|
if len(line_stripped) < 200 and not re.search(r'[가-힣]', line_stripped):
|
|
continue
|
|
|
|
# 원본 줄 유지 (들여쓰기, 공백 등 마크다운 구조 보존)
|
|
filtered_lines.append(original_line)
|
|
|
|
answer = '\n'.join(filtered_lines).strip()
|
|
|
|
# 마크다운 구조 보존을 위한 추가 처리
|
|
# 연속된 빈 줄을 하나로 정리하되, 마크다운 구조는 유지
|
|
answer = re.sub(r'\n\s*\n\s*\n+', '\n\n', answer)
|
|
|
|
# 마크다운 문법이 있는지 확인하고, 없다면 강제로 마크다운 형식 적용
|
|
if not re.search(r'^#{1,6}\s|^\*\*|^[-*]\s|^\d+\.\s|^\|', answer, re.MULTILINE):
|
|
# 마크다운 문법이 없으면 첫 번째 줄을 제목으로 만들기
|
|
lines = answer.split('\n')
|
|
if lines and lines[0].strip():
|
|
# 첫 번째 줄이 제목이 될 수 있는 내용인지 확인
|
|
first_line = lines[0].strip()
|
|
if len(first_line) > 5 and not first_line.endswith('.'):
|
|
lines[0] = f"# {first_line}"
|
|
answer = '\n'.join(lines)
|
|
|
|
# 추가 영어 사고 과정 제거 (전체 텍스트 레벨)
|
|
english_phrases_to_remove = [
|
|
"part. but the user also wants the answer in markdown. wait, the initial instructions say to use markdown for the answer. let me check again.",
|
|
"part. but the user also wants the answer in markdown",
|
|
"wait, the initial instructions say to use markdown",
|
|
"let me check again",
|
|
"but the user also wants",
|
|
"wants the answer in markdown",
|
|
"markdown. wait",
|
|
"the initial instructions",
|
|
"say to use markdown for the answer",
|
|
"part. but the user",
|
|
"user also wants",
|
|
"answer in markdown",
|
|
"initial instructions",
|
|
"use markdown",
|
|
"check again"
|
|
]
|
|
|
|
for phrase in english_phrases_to_remove:
|
|
answer = answer.replace(phrase, '').strip()
|
|
answer = answer.replace(phrase.lower(), '').strip()
|
|
answer = answer.replace(phrase.upper(), '').strip()
|
|
answer = answer.replace(phrase.capitalize(), '').strip()
|
|
|
|
# 정규식으로 영어 사고 과정 제거
|
|
import re
|
|
english_thinking_patterns = [
|
|
r'^[A-Za-z].*user.*wants.*answer.*markdown.*$',
|
|
r'^[A-Za-z].*part\.\s*but.*user.*also.*wants.*$',
|
|
r'^[A-Za-z].*wait.*initial.*instructions.*$',
|
|
r'^[A-Za-z].*let.*me.*check.*again.*$'
|
|
]
|
|
|
|
for pattern in english_thinking_patterns:
|
|
answer = re.sub(pattern, '', answer, flags=re.MULTILINE | re.IGNORECASE).strip()
|
|
|
|
# 마크다운 링크 제거 (예: [문서명](https://example.com/attachment1))
|
|
answer = re.sub(r'\[([^\]]+)\]\(https?://[^\)]+\)', r'[\1]', answer)
|
|
answer = re.sub(r'\[([^\]]+)\]\([^\)]*example\.com[^\)]*\)', r'[\1]', answer)
|
|
answer = re.sub(r'\[([^\]]+)\]\([^\)]*attachment\d+[^\)]*\)', r'[\1]', answer)
|
|
|
|
# 괄호 내 문서 ID 제거 (예: [문서명](RSC-PM-SOP-0001-F01))
|
|
answer = re.sub(r'\[([^\]]+)\]\([A-Z0-9-]+\)', r'[\1]', answer)
|
|
answer = re.sub(r'\[([^\]]+)\]\([A-Z0-9-]+-[A-Z0-9-]+\)', r'[\1]', answer)
|
|
|
|
|
|
response = {
|
|
"answer": answer,
|
|
"references": references,
|
|
"detailed_references": detailed_references, # 인라인 링크용 상세 정보
|
|
"source_documents": source_documents
|
|
}
|
|
|
|
logger.info(f"✅ LLM RAG 답변 생성 완료: {len(references)}개 참조")
|
|
return response
|
|
else:
|
|
# RAG 체인이 없는 경우 폴백
|
|
logger.warning("⚠️ RAG 체인이 초기화되지 않음. 폴백 모드로 전환")
|
|
return self._generate_fallback_answer(question)
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ RAG 답변 생성 실패: {e}")
|
|
|
|
# vectordb 관련 오류인지 확인하고 사용자 친화적 메시지로 변환
|
|
error_message = str(e).lower()
|
|
if any(keyword in error_message for keyword in [
|
|
"readonly database",
|
|
"code: 1032",
|
|
"chromadb",
|
|
"vectorstore",
|
|
"collection",
|
|
"database error"
|
|
]):
|
|
logger.error(f"🔍 벡터DB 관련 오류 감지: {e}")
|
|
return {
|
|
"answer": "벡터DB에 문제가 있습니다. 관리자에게 문의해주세요.",
|
|
"references": [],
|
|
"source_documents": [],
|
|
"detailed_references": []
|
|
}
|
|
else:
|
|
# 오류 시 폴백 답변 생성
|
|
return self._generate_fallback_answer(question)
|
|
|
|
def _generate_fallback_answer(self, question: str) -> Dict[str, Any]:
|
|
"""폴백 답변 생성 (LLM 없이)"""
|
|
try:
|
|
# 유사 문서 검색
|
|
similar_docs = self.search_similar_documents(question, k=3)
|
|
|
|
if not similar_docs:
|
|
return {
|
|
"answer": "죄송합니다. 관련 문서를 찾을 수 없습니다. 다른 키워드로 검색해보시거나 문서를 업로드해주세요.",
|
|
"references": [],
|
|
"source_documents": []
|
|
}
|
|
|
|
# 문서 내용 요약
|
|
context_summary = ""
|
|
references = []
|
|
|
|
for i, doc in enumerate(similar_docs):
|
|
# 문서 내용의 핵심 부분 추출 (처음 300자)
|
|
content_preview = doc.page_content[:300].replace('\n', ' ').strip()
|
|
context_summary += f"\n• {content_preview}...\n"
|
|
|
|
if hasattr(doc, 'metadata') and doc.metadata:
|
|
filename = doc.metadata.get('filename', 'Unknown')
|
|
file_id = doc.metadata.get('file_id', 'unknown')
|
|
chunk_index = doc.metadata.get('chunk_index', 0)
|
|
page_number = chunk_index + 1
|
|
references.append(f"{filename}::{file_id} [p{page_number}]")
|
|
|
|
# 간단한 답변 생성
|
|
answer = f"질문하신 '{question}'에 대한 관련 문서를 찾았습니다.\n\n참조 문서에서 관련 내용을 확인할 수 있습니다:\n{context_summary}"
|
|
|
|
response = {
|
|
"answer": answer,
|
|
"references": references,
|
|
"source_documents": similar_docs
|
|
}
|
|
|
|
logger.info(f"✅ 폴백 답변 생성 완료: {len(references)}개 참조")
|
|
return response
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 폴백 답변 생성 실패: {e}")
|
|
|
|
# vectordb 관련 오류인지 확인하고 사용자 친화적 메시지로 변환
|
|
error_message = str(e).lower()
|
|
if any(keyword in error_message for keyword in [
|
|
"readonly database",
|
|
"code: 1032",
|
|
"chromadb",
|
|
"vectorstore",
|
|
"collection",
|
|
"database error"
|
|
]):
|
|
logger.error(f"🔍 벡터DB 관련 오류 감지: {e}")
|
|
return {
|
|
"answer": "벡터DB에 문제가 있습니다. 관리자에게 문의해주세요.",
|
|
"references": [],
|
|
"source_documents": []
|
|
}
|
|
else:
|
|
return {
|
|
"answer": "죄송합니다. 현재 시스템 오류로 인해 답변을 생성할 수 없습니다. 잠시 후 다시 시도해주세요.",
|
|
"references": [],
|
|
"source_documents": []
|
|
}
|
|
|
|
def get_collection_info(self) -> Dict[str, Any]:
|
|
"""컬렉션 정보 조회"""
|
|
try:
|
|
# ChromaDB 컬렉션 정보
|
|
collection = self.vectorstore._collection
|
|
count = collection.count()
|
|
|
|
return {
|
|
"total_documents": count,
|
|
"collection_name": "research_documents",
|
|
"embedding_model": "jhgan/ko-sroberta-multitask"
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 컬렉션 정보 조회 실패: {e}")
|
|
return {"error": str(e)}
|
|
|
|
def delete_documents_by_filename(self, filename: str):
|
|
"""파일명으로 문서 삭제"""
|
|
try:
|
|
# 메타데이터로 필터링하여 삭제
|
|
collection = self.vectorstore._collection
|
|
|
|
# source 필드에서 파일명 추출하여 삭제
|
|
# source는 "UUID_파일명.pdf" 형태
|
|
collection.delete(where={"source": {"$contains": filename}})
|
|
logger.info(f"✅ {filename} 관련 문서 삭제 완료")
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 문서 삭제 실패: {e}")
|
|
# 삭제 실패해도 계속 진행 (파일이 이미 삭제되었을 수 있음)
|
|
logger.warning(f"⚠️ 문서 삭제 실패했지만 계속 진행: {e}")
|
|
|
|
def cleanup_database_by_filename(self, filename: str):
|
|
"""데이터베이스에서 파일 관련 데이터 정리"""
|
|
try:
|
|
cursor = self.db_connection.cursor()
|
|
|
|
# 파일 관련 벡터 데이터 삭제
|
|
cursor.execute(
|
|
"DELETE FROM file_vectors WHERE filename = %s",
|
|
(filename,)
|
|
)
|
|
|
|
# 파일 메타데이터 삭제
|
|
cursor.execute(
|
|
"DELETE FROM files WHERE filename = %s",
|
|
(filename,)
|
|
)
|
|
|
|
cursor.close()
|
|
logger.info(f"✅ {filename} 데이터베이스 정리 완료")
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 데이터베이스 정리 실패: {e}")
|
|
raise
|
|
|
|
|
|
# 전역 서비스 인스턴스
|
|
langchain_service = LangChainRAGService()
|