""" LangChain v0.3 기반 AI 서비스 향후 고도화를 위한 확장 가능한 아키텍처 """ import os import logging from typing import List, Dict, Any, Optional from .context_retrieval import ContextRetrieval # LangChain Core from langchain_core.documents import Document from langchain_core.embeddings import Embeddings from langchain_core.vectorstores import VectorStore from langchain_core.retrievers import BaseRetriever from langchain_core.language_models import BaseLanguageModel from langchain_core.prompts import PromptTemplate # LangChain Community from langchain_community.vectorstores import Chroma from langchain_community.embeddings import SentenceTransformerEmbeddings from langchain_community.llms import Ollama # LangChain Chains from langchain.chains.combine_documents import create_stuff_documents_chain from langchain.chains import create_retrieval_chain # Database import psycopg2 from psycopg2.extras import RealDictCursor logger = logging.getLogger(__name__) class LangChainRAGService: """LangChain 기반 RAG 서비스""" def __init__(self): self.embeddings: Optional[Embeddings] = None self.vectorstore: Optional[VectorStore] = None self.llm: Optional[BaseLanguageModel] = None self.retriever: Optional[BaseRetriever] = None self.vector_retriever: Optional[BaseRetriever] = None self.mmr_retriever: Optional[BaseRetriever] = None self.qa_chain: Optional[Any] = None self.db_connection = None self.context_retrieval: Optional[ContextRetrieval] = None def initialize(self): """LangChain 컴포넌트 초기화""" try: # 임베딩 모델 초기화 self.embeddings = SentenceTransformerEmbeddings( model_name="jhgan/ko-sroberta-multitask" ) logger.info("✅ LangChain 임베딩 모델 로드 완료") # ChromaDB 벡터스토어 초기화 (권한 문제 해결) self._initialize_chromadb() logger.info("✅ LangChain ChromaDB 초기화 완료") # Ollama LLM 초기화 (temperature 0.3으로 설정) self.llm = Ollama( model="qwen3:latest", base_url="http://localhost:11434", temperature=0.3 # 더 일관되고 정확한 답변을 위한 낮은 temperature ) logger.info("✅ LangChain Ollama LLM 초기화 완료") # 리트리버 설정 (ChromaDB 초기화 완료 후에 호출) self._setup_retrievers() logger.info("✅ LangChain 리트리버 초기화 완료") # RAG 체인 구성 self._setup_rag_chain() # 데이터베이스 연결 self._setup_database() # 컨텍스트 검색 시스템 초기화 self._initialize_context_retrieval() logger.info("🚀 LangChain RAG 서비스 초기화 완료") except Exception as e: logger.error(f"❌ LangChain 서비스 초기화 실패: {e}") raise def _initialize_chromadb(self): """ChromaDB 초기화 (권한 문제 해결)""" try: import os import shutil # vectordb 디렉토리 생성 및 권한 설정 vectordb_dir = "./vectordb" if not os.path.exists(vectordb_dir): os.makedirs(vectordb_dir, mode=0o755, exist_ok=True) logger.info(f"📁 vectordb 디렉토리 생성: {vectordb_dir}") try: # 기존 컬렉션이 있는지 확인 try: temp_client = Chroma( persist_directory=vectordb_dir, embedding_function=self.embeddings, collection_name="research_documents" ) # 컬렉션 접근 테스트 temp_client._collection.count() self.vectorstore = temp_client logger.info("✅ 기존 ChromaDB 컬렉션 접근 성공") except Exception as e: logger.warning(f"⚠️ 기존 컬렉션 접근 실패 (시도 {attempt + 1}/{max_retries}): {e}") if attempt < max_retries - 1: # 컬렉션 재생성 시도 try: # 기존 vectordb 디렉토리 삭제 후 재생성 if os.path.exists(vectordb_dir): shutil.rmtree(vectordb_dir) os.makedirs(vectordb_dir, mode=0o755, exist_ok=True) logger.info(f"🔄 vectordb 디렉토리 재생성: {vectordb_dir}") except Exception as cleanup_error: logger.error(f"❌ vectordb 디렉토리 재생성 실패: {cleanup_error}") else: # 마지막 시도에서도 실패하면 vectordb 디렉토리 완전 삭제 후 재생성 try: if os.path.exists(vectordb_dir): shutil.rmtree(vectordb_dir) os.makedirs(vectordb_dir, mode=0o755, exist_ok=True) logger.info(f"🔄 vectordb 디렉토리 완전 재생성: {vectordb_dir}") # 새로운 ChromaDB 클라이언트 생성 self.vectorstore = Chroma( persist_directory=vectordb_dir, embedding_function=self.embeddings, collection_name="research_documents" ) logger.info("✅ 새로운 ChromaDB 컬렉션 생성 성공") except Exception as final_error: logger.error(f"❌ ChromaDB 최종 초기화 실패: {final_error}") raise except Exception as e: logger.error(f"❌ ChromaDB 초기화 실패 (시도 {attempt + 1}/{max_retries}): {e}") if attempt == max_retries - 1: raise else: # 재시도 전 잠시 대기 import time time.sleep(1) except Exception as e: logger.error(f"❌ ChromaDB 초기화 실패: {e}") raise def _setup_retrievers(self): """리트리버 설정""" try: logger.info("🔧 리트리버 설정 시작...") logger.info(f"🔧 vectorstore 상태: {self.vectorstore is not None}") # 하이브리드 검색을 위한 다중 리트리버 설정 self.vector_retriever = self.vectorstore.as_retriever( search_type="similarity", search_kwargs={"k": 10} ) logger.info("🔧 vector_retriever 설정 완료") # MMR (Maximal Marginal Relevance) 리트리버 추가 self.mmr_retriever = self.vectorstore.as_retriever( search_type="mmr", search_kwargs={"k": 8, "fetch_k": 15, "lambda_mult": 0.5} ) logger.info("🔧 mmr_retriever 설정 완료") # 기본 리트리버는 벡터 검색 사용 self.retriever = self.vector_retriever logger.info("✅ 리트리버 설정 완료") except Exception as e: logger.error(f"❌ 리트리버 설정 실패: {e}") raise def _setup_rag_chain(self): """RAG 체인 설정""" try: # 개선된 프롬프트 템플릿 prompt_template = """ 당신은 연구 문서 전문가입니다. 주어진 문서들을 바탕으로 사용자의 질문에 정확하고 상세하게 답변해주세요. **답변 규칙:** 1. 답변은 반드시 한국어로만 시작하세요 2. 태그나 영어 사고 과정을 절대 포함하지 마세요 3. 바로 최종 답변만 제공하세요 4. 문서의 내용을 정확히 파악하고 요약하여 답변하세요 5. 구체적인 절차나 방법이 있다면 단계별로 설명하세요 6. 문서에 없는 내용은 추측하지 말고 "문서에 명시되지 않음"이라고 하세요 7. **마크다운 형식을 적극적으로 사용하여 구조화된 답변을 제공하세요** **마크다운 구조화 규칙 (중요):** - 답변을 여러 섹션으로 나누어 ## 헤딩을 사용하세요 - 절차나 단계가 있다면 ### 소제목으로 구분하세요 - 중요한 내용은 **볼드**로 강조하세요 - 목록이나 절차는 - 또는 1. 2. 3. 형태의 리스트로 작성하세요 - 주의사항이나 중요 정보는 > 인용문으로 표시하세요 - 표가 필요한 경우 | 표 형태로 작성하세요 - 답변을 최소 3-4개 섹션으로 나누어 상세하게 설명하세요 - 각 섹션은 2-3문장 이상으로 구성하세요 **참조 문서:** {context} **사용자 질문:** {input} **답변:** 위 문서의 내용을 바탕으로 질문에 대한 구체적이고 실용적인 답변을 마크다운 형식으로 바로 제공해주세요. **답변 구조 요구사항:** - 답변을 최소 3-4개 섹션으로 나누어 작성하세요 - 각 섹션은 ## 헤딩으로 시작하세요 - 절차나 단계가 있다면 ### 소제목과 번호 리스트를 사용하세요 - 중요한 용어나 개념은 **볼드**로 강조하세요 - 주의사항이나 중요 정보는 > 인용문으로 표시하세요 - 답변을 충분히 상세하고 길게 작성하세요 (최소 300자 이상) **금지사항:** - [문서명](https://example.com/attachment1) 같은 마크다운 링크 형태 절대 사용 금지 - [문서명](RSC-PM-SOP-0001-F01) 같은 괄호 내 문서 ID 절대 사용 금지 - example.com 같은 가짜 URL 절대 사용 금지 - [문서명] 형태의 대괄호 절대 사용 금지 - 괄호나 추가 정보는 절대 포함하지 마세요 - 문서명은 일반 텍스트로만 표시하세요 (예: 기밀자료 관리 신청서, 의사결정 프로세스) """ prompt = PromptTemplate( template=prompt_template, input_variables=["context", "input"] ) # 문서 체인 생성 document_chain = create_stuff_documents_chain( llm=self.llm, prompt=prompt ) # RAG 체인 생성 self.qa_chain = create_retrieval_chain( retriever=self.retriever, combine_docs_chain=document_chain ) logger.info("✅ RAG 체인 설정 완료") except Exception as e: logger.error(f"❌ RAG 체인 설정 실패: {e}") raise def _setup_database(self): """데이터베이스 연결 설정""" try: self.db_connection = psycopg2.connect( host="localhost", port=5432, database="researchqa", user="woonglab", password="!@#woonglab" ) self.db_connection.autocommit = True logger.info("✅ PostgreSQL 연결 완료") except Exception as e: logger.error(f"❌ PostgreSQL 연결 실패: {e}") raise def _initialize_context_retrieval(self): """컨텍스트 검색 시스템 초기화""" try: self.context_retrieval = ContextRetrieval() logger.info("✅ 컨텍스트 검색 시스템 초기화 완료") # 기존 벡터스토어에서 문서들을 가져와서 컨텍스트 검색 인덱스 구축 self._build_context_index() except Exception as e: logger.error(f"❌ 컨텍스트 검색 시스템 초기화 실패: {e}") raise def _build_context_index(self): """컨텍스트 검색을 위한 인덱스 구축""" try: if not self.vectorstore: logger.warning("⚠️ 벡터스토어가 초기화되지 않았습니다.") return # 벡터스토어에서 모든 문서 가져오기 all_docs = self.vectorstore.similarity_search("", k=10000) # 모든 문서 검색 # 컨텍스트 검색용 문서 형식으로 변환 context_documents = [] for doc in all_docs: context_documents.append({ 'content': doc.page_content, 'metadata': doc.metadata, 'document': doc }) # 컨텍스트 검색 인덱스 구축 self.context_retrieval.build_index(context_documents) logger.info(f"✅ 컨텍스트 검색 인덱스 구축 완료: {len(context_documents)}개 문서") except Exception as e: logger.error(f"❌ 컨텍스트 검색 인덱스 구축 실패: {e}") # 인덱스 구축 실패는 치명적이지 않으므로 경고만 출력 logger.warning("⚠️ 컨텍스트 검색을 사용하지 않고 기본 검색을 사용합니다.") def _update_context_index(self, new_documents: List[Document]): """새로운 문서로 컨텍스트 검색 인덱스 업데이트""" try: if not self.context_retrieval: return # 새 문서를 컨텍스트 검색용 형식으로 변환 context_documents = [] for doc in new_documents: context_documents.append({ 'content': doc.page_content, 'metadata': doc.metadata, 'document': doc }) # 기존 인덱스에 새 문서 추가 if hasattr(self.context_retrieval.context_bm25, 'documents'): self.context_retrieval.context_bm25.documents.extend(context_documents) # BM25 인덱스 재구축 self.context_retrieval.context_bm25.build_index(self.context_retrieval.context_bm25.documents) logger.info(f"✅ 컨텍스트 검색 인덱스 업데이트 완료: {len(new_documents)}개 문서 추가") except Exception as e: logger.error(f"❌ 컨텍스트 검색 인덱스 업데이트 실패: {e}") # 인덱스 업데이트 실패는 치명적이지 않으므로 경고만 출력 logger.warning("⚠️ 컨텍스트 검색 인덱스 업데이트를 건너뜁니다.") def add_documents(self, documents: List[Document], metadata: Dict[str, Any] = None): """문서를 벡터스토어에 추가 (재시도 로직 포함)""" max_retries = 3 for attempt in range(max_retries): try: if metadata: for doc in documents: doc.metadata.update(metadata) # ChromaDB에 문서 추가 self.vectorstore.add_documents(documents) logger.info(f"✅ {len(documents)}개 문서 추가 완료") # 컨텍스트 검색 인덱스 업데이트 self._update_context_index(documents) return # 성공 시 함수 종료 except Exception as e: logger.error(f"❌ 문서 추가 실패 (시도 {attempt + 1}/{max_retries}): {e}") # vectordb 관련 오류인지 확인 error_message = str(e).lower() if any(keyword in error_message for keyword in [ "readonly database", "code: 1032", "chromadb", "vectorstore", "collection", "database error" ]): logger.error(f"🔍 벡터DB 관련 오류 감지: {e}") if attempt < max_retries - 1: # ChromaDB 재초기화 시도 try: logger.info(f"🔄 ChromaDB 재초기화 시도 (시도 {attempt + 1}/{max_retries})") self._initialize_chromadb() logger.info("✅ ChromaDB 재초기화 완료") except Exception as reinit_error: logger.error(f"❌ ChromaDB 재초기화 실패: {reinit_error}") else: # 마지막 시도에서도 실패하면 사용자 친화적 메시지 반환 raise Exception("벡터DB에 문제가 있습니다.") else: # vectordb 관련이 아닌 오류는 즉시 재발생 raise # 모든 재시도가 실패한 경우 raise Exception("벡터DB에 문제가 있습니다.") def add_documents_with_metadata(self, documents_with_metadata: List[Dict], additional_metadata: Dict[str, Any]): """메타데이터가 포함된 문서들을 벡터스토어에 추가""" try: from langchain_core.documents import Document # 메타데이터가 포함된 문서들을 LangChain Document로 변환 langchain_docs = [] for doc_data in documents_with_metadata: # 기본 메타데이터와 추가 메타데이터 병합 merged_metadata = {**doc_data.get("metadata", {}), **additional_metadata} langchain_doc = Document( page_content=doc_data.get("page_content", ""), metadata=merged_metadata ) langchain_docs.append(langchain_doc) # 기존 add_documents 메서드 사용 self.add_documents(langchain_docs) logger.info(f"✅ 메타데이터 포함 {len(langchain_docs)}개 문서 추가 완료") except Exception as e: logger.error(f"❌ 메타데이터 포함 문서 추가 실패: {e}") raise def search_similar_documents(self, query: str, k: int = 5) -> List[Document]: """유사 문서 검색""" try: docs = self.vectorstore.similarity_search(query, k=k) logger.info(f"✅ {len(docs)}개 유사 문서 검색 완료") return docs except Exception as e: logger.error(f"❌ 유사 문서 검색 실패: {e}") # vectordb 관련 오류인지 확인하고 사용자 친화적 메시지로 변환 error_message = str(e).lower() if any(keyword in error_message for keyword in [ "readonly database", "code: 1032", "chromadb", "vectorstore", "collection", "database error" ]): logger.error(f"🔍 벡터DB 관련 오류 감지: {e}") raise Exception("벡터DB에 문제가 있습니다.") else: raise def _filter_relevant_documents(self, documents: List[Document], question: str) -> List[Document]: """고급 문서 관련성 필터링 (최신 RAG 기법 적용)""" if not documents: return documents # 질문에서 키워드 추출 question_keywords = self._extract_keywords(question) # 각 문서의 관련성 점수 계산 scored_documents = [] for doc in documents: score = self._calculate_advanced_relevance_score(doc, question_keywords, question) scored_documents.append((doc, score)) # 점수순으로 정렬 scored_documents.sort(key=lambda x: x[1], reverse=True) # 1단계: 높은 관련성 문서만 선별 (더 완화) high_relevance_docs = [doc for doc, score in scored_documents if score > 0.1] # 0.3→0.1 # 2단계: 중간 관련성 문서 선별 (더 완화) medium_relevance_docs = [doc for doc, score in scored_documents if 0.05 < score <= 0.1] # 0.1→0.05 # 3단계: 최종 문서 선택 (다양성 강화) final_docs = [] used_filenames = set() used_pages = set() # 페이지 다양성도 고려 # 높은 관련성 문서 우선 추가 (더 완화) for doc in high_relevance_docs: if len(final_docs) >= 15: # 최대 15개로 완화 (12→15) break filename = doc.metadata.get('filename', '') page_key = f"{filename}_{doc.metadata.get('estimated_page', 1)}" # 파일명과 페이지 다양성 모두 고려 (완화) if filename not in used_filenames and page_key not in used_pages: final_docs.append(doc) used_filenames.add(filename) used_pages.add(page_key) # 중간 관련성 문서에서 다양성을 위해 추가 선택 (더 완화) if len(final_docs) < 10 and medium_relevance_docs: # 최소 10개로 완화 (8→10) for doc in medium_relevance_docs: if len(final_docs) >= 15: # 최대 15개로 완화 (12→15) break filename = doc.metadata.get('filename', '') page_key = f"{filename}_{doc.metadata.get('estimated_page', 1)}" # 파일명과 페이지 다양성 모두 고려 if filename not in used_filenames or page_key not in used_pages: final_docs.append(doc) used_filenames.add(filename) used_pages.add(page_key) # 최소 2개 문서 보장 (유지) if len(final_docs) < 2: for doc, score in scored_documents: if len(final_docs) >= 2: # 최소 2개로 50% 감소 (3→2) break filename = doc.metadata.get('filename', '') page_key = f"{filename}_{doc.metadata.get('estimated_page', 1)}" if filename not in used_filenames or page_key not in used_pages: final_docs.append(doc) used_filenames.add(filename) used_pages.add(page_key) logger.info(f"📊 고급 문서 필터링: {len(documents)}개 → {len(final_docs)}개") # 필터링된 문서들의 점수와 파일명 로깅 for i, (doc, score) in enumerate(scored_documents[:8]): # filename 추출 로직 개선 filename = doc.metadata.get('filename', 'Unknown') if doc.metadata else 'Unknown' if filename == 'Unknown' or not filename: # source 필드에서 filename 추출 시도 source = doc.metadata.get('source', '') if doc.metadata else '' if source: # UUID_filename.pdf 형태에서 filename.pdf 추출 import re match = re.search(r'[a-f0-9-]{36}_(.+)', source) if match: filename = match.group(1) else: filename = source status = "✅ 선택됨" if doc in final_docs else "❌ 제외됨" logger.info(f"📋 문서 {i+1}: {filename} (점수: {score:.2f}) - {status}") return final_docs def _extract_keywords(self, text: str) -> List[str]: """텍스트에서 키워드 추출""" import re # 한국어 키워드 추출 (2글자 이상) korean_keywords = re.findall(r'[가-힣]{2,}', text) # 영어 키워드 추출 (3글자 이상) english_keywords = re.findall(r'[A-Za-z]{3,}', text) # 숫자와 특수문자 제거 keywords = [] for keyword in korean_keywords + english_keywords: if not re.search(r'[0-9]', keyword) and len(keyword) >= 2: keywords.append(keyword.lower()) return list(set(keywords)) # 중복 제거 def _calculate_advanced_relevance_score(self, doc: Document, question_keywords: List[str], question: str) -> float: """고급 문서 관련성 점수 계산 (최신 RAG 기법 적용)""" if not hasattr(doc, 'page_content'): return 0.0 content = doc.page_content.lower() # filename 추출 로직 개선 filename = doc.metadata.get('filename', '').lower() if doc.metadata else '' if not filename: # source 필드에서 filename 추출 시도 source = doc.metadata.get('source', '') if doc.metadata else '' if source: # UUID_filename.pdf 형태에서 filename.pdf 추출 import re match = re.search(r'[a-f0-9-]{36}_(.+)', source) if match: filename = match.group(1).lower() else: filename = source.lower() score = 0.0 # 1. 파일명 관련성 (가중치 매우 높음) filename_score = 0.0 for keyword in question_keywords: if keyword in filename: filename_score += 2.0 # 파일명 매칭에 더 높은 점수 score += filename_score * 3.0 # 파일명 매칭에 매우 높은 가중치 # 2. 내용 관련성 (키워드 밀도 고려) content_score = 0.0 total_keywords = len(question_keywords) matched_keywords = 0 for keyword in question_keywords: if keyword in content: matched_keywords += 1 # 키워드가 여러 번 나타날수록 더 높은 점수 keyword_count = content.count(keyword) content_score += min(keyword_count * 0.5, 2.0) # 최대 2.0점 # 키워드 매칭 비율에 따른 보너스 if total_keywords > 0: match_ratio = matched_keywords / total_keywords content_score += match_ratio * 2.0 # 매칭 비율에 따른 보너스 score += content_score # 3. 구체적 키워드 매칭 (도메인 특화) domain_keywords = { '연구노트': ['연구노트', '연구노트문서', '노트대출', '노트관리', '연구노트작성'], '대출': ['대출', '대출절차', '대출신청', '대출승인', '문서대출'], '신청': ['신청', '신청서', '승인절차', '신청절차', '승인신청'], '자료실': ['자료실', '연구자료실', '자료실운영', '자료실관리', '자료실접근'] } for main_keyword, related_keywords in domain_keywords.items(): if main_keyword in question: for related_keyword in related_keywords: if related_keyword in content: score += 1.5 # 도메인 키워드 매칭에 높은 점수 if related_keyword in filename: score += 2.0 # 파일명에 도메인 키워드가 있으면 더 높은 점수 # 4. 문맥적 관련성 (문장 단위 매칭) sentences = content.split('.') context_score = 0.0 for sentence in sentences: sentence_lower = sentence.lower().strip() if len(sentence_lower) > 10: # 의미있는 문장만 고려 keyword_in_sentence = sum(1 for keyword in question_keywords if keyword in sentence_lower) if keyword_in_sentence >= 2: # 한 문장에 2개 이상 키워드가 있으면 context_score += 1.0 score += context_score * 0.8 # 5. 관련성 낮은 문서 패턴 제거 (50% 더 엄격하게) irrelevant_patterns = [ '시료', '외부송부', '보고서작성', '기밀자료송부', '안전성시험', '임상시험', '참고문헌', '관련문서', '붙임', '목차', '인덱스', 'table of contents', '해당사항 없음', '없음', '참조', 'reference', 'attachment', '문서 번호', '개정번호', '발효일자', 'format no', 'document no', 'e-signature', '전자서명', 'written by', 'reviewed by', 'approved by', 'justification', 'author', 'qa review', 'director approval', 'head approval', '의사결정', '결재라인', '승인라인', '프로세스', '송부프로세스', '대외성과발표', '승인프로세스', '성과발표', '발표승인', '승인절차' ] for pattern in irrelevant_patterns: if pattern in filename and not any(keyword in question for keyword in ['시료', '송부', '보고서', '기밀', '참고', '관련', '성과', '발표']): score -= 3.0 # 관련성 낮은 패턴에 더 큰 페널티 (2.0→3.0) # 6. 목차/인덱스/메타데이터 페이지 필터링 (더 정교하게) is_index_page = ( len(content) < 500 or # 내용이 너무 짧음 content.count('\n') > content.count(' ') * 0.4 or # 줄바꿈이 많아서 목록 형태 content.count('dwprnd') + content.count('sop') + content.count('f0') >= 3 or # 문서 번호가 너무 많음 len([s for s in content.split('.') if len(s.strip()) > 30]) < 3 or # 구체적인 설명이 부족 # 추가: 목차 특성 패턴 감지 (content.count('붙임') >= 2 and content.count('f0') >= 2) or # 붙임 목록이 많음 (content.count('참고문헌') > 0 and content.count('관련문서') > 0) or # 목차 구조 content.count('해당사항 없음') > 0 or # 목차에서 자주 나타나는 표현 # 추가: 개정내역/메타데이터 페이지 감지 (content.count('개정번호') > 0 and content.count('개정항목') > 0) or # 개정내역 테이블 (content.count('개정사유') > 0 and content.count('발효일자') > 0) or # 개정내역 테이블 content.count('신규제정') > 0 or # 개정내역에서 자주 나타나는 표현 content.count('sop 양식 개정') > 0 or # 개정내역에서 자주 나타나는 표현 content.count('조직변경') > 0 or # 개정내역에서 자주 나타나는 표현 # 추가: 테이블 형태의 메타데이터 감지 (content.count('|') > 10 and content.count('개정') > 0) or # 개정내역 테이블 (content.count('번호') > 0 and content.count('일자') > 0 and content.count('변경') > 0) # 메타데이터 테이블 ) if is_index_page: score *= 0.02 # 목차/인덱스 페이지는 점수를 매우 크게 감소 logger.info(f"📋 목차/인덱스 페이지 감지: {filename} - 점수 대폭 감소") # 7. 질문 특화 검증 (완화된 버전) if '연구노트' in question and '대출' in question: # 연구노트 대출과 직접 관련된 키워드들이 문서에 있는지 확인 research_note_loan_keywords = ['연구노트', '대출', '비기밀자료', '기밀자료관리', '외부송부', '신청서', '승인', '문서', '관리', '절차'] keyword_matches = sum(1 for keyword in research_note_loan_keywords if keyword in content) if keyword_matches < 2: # 최소 2개 이상의 관련 키워드가 있으면 포함 (4→2로 완화) score *= 0.3 # 관련성이 낮으면 점수를 감소하지만 완전히 제외하지 않음 (0.1→0.3) # 완화: "보고서 작성" 관련 문서도 부분적으로 포함 if '보고서' in filename and '작성' in filename: score *= 0.2 # 보고서 작성 관련 문서도 부분적으로 포함 (0.05→0.2) logger.info(f"📋 보고서 작성 문서 - 점수 감소: {filename}") # 완화: "송부 프로세스" 관련 문서도 부분적으로 포함 if '송부' in filename and '프로세스' in filename: score *= 0.2 # 송부 프로세스 관련 문서도 부분적으로 포함 (0.05→0.2) logger.info(f"📋 송부 프로세스 문서 - 점수 감소: {filename}") # 완화: "의사결정 프로세스" 관련 문서도 부분적으로 포함 if '의사결정' in content or '결재라인' in content or '승인라인' in content: score *= 0.2 # 의사결정 프로세스 관련 문서도 부분적으로 포함 (0.05→0.2) logger.info(f"📋 의사결정 프로세스 문서 - 점수 감소: {filename}") return max(0.0, score) # 음수 점수 방지 def _verify_answer_document_relevance(self, answer: str, doc_content: str, question: str) -> bool: """답변과 문서 내용의 직접적 연관성 검증""" if not answer or not doc_content: return False answer_lower = answer.lower() doc_content_lower = doc_content.lower() # 답변에서 추출한 핵심 키워드들 answer_keywords = [] # 답변에서 구체적인 정보나 절차를 추출 import re # 숫자나 코드 패턴 (예: "DWPRND-DT-SOP-001-F07", "201609T001") code_patterns = re.findall(r'[A-Z]{2,}[A-Z0-9-]+', answer) answer_keywords.extend(code_patterns) # 구체적인 절차나 단계 (예: "신청서 작성", "승인 절차") procedure_patterns = re.findall(r'[가-힣]{2,}\s*[가-힣]{2,}', answer) answer_keywords.extend(procedure_patterns) # 문서 내용에서 답변의 핵심 키워드가 실제로 언급되는지 확인 relevance_score = 0 for keyword in answer_keywords: if keyword.lower() in doc_content_lower: relevance_score += 1 # 답변의 핵심 내용이 문서에 실제로 존재하는지 확인 answer_sentences = [s.strip() for s in answer.split('.') if len(s.strip()) > 10] doc_sentence_matches = 0 for sentence in answer_sentences[:3]: # 답변의 처음 3개 문장만 확인 sentence_keywords = re.findall(r'[가-힣]{2,}', sentence) if len(sentence_keywords) >= 2: # 문장의 키워드들이 문서에 모두 있는지 확인 if all(keyword in doc_content_lower for keyword in sentence_keywords[:2]): doc_sentence_matches += 1 # 관련성 판단: 더 관대하게 판단 (완화) return relevance_score >= 0 or doc_sentence_matches >= 0 or len(answer_keywords) > 0 def _fallback_hybrid_search(self, question: str) -> List[Document]: """컨텍스트 검색 실패 시 사용할 기본 하이브리드 검색""" logger.info(f"🔄 기본 하이브리드 검색 실행: {question}") # 1. 벡터 검색으로 관련 문서 검색 vector_docs = self.vector_retriever.get_relevant_documents(question) logger.info(f"📊 벡터 검색 결과: {len(vector_docs)}개 문서") # 2. MMR 검색으로 다양성 있는 문서 검색 mmr_docs = self.mmr_retriever.get_relevant_documents(question) logger.info(f"📊 MMR 검색 결과: {len(mmr_docs)}개 문서") # 3. 두 검색 결과를 결합하여 중복 제거 all_docs = vector_docs + mmr_docs unique_docs = [] seen_metadata = set() for doc in all_docs: doc_key = f"{doc.metadata.get('filename', '')}_{doc.metadata.get('chunk_index', 0)}" if doc_key not in seen_metadata: unique_docs.append(doc) seen_metadata.add(doc_key) logger.info(f"📊 중복 제거 후 총 {len(unique_docs)}개 문서") return unique_docs def generate_answer(self, question: str) -> Dict[str, Any]: """RAG를 통한 답변 생성""" try: # 컨텍스트 기반 검색으로 더 정확한 문서 검색 logger.info(f"🤖 컨텍스트 기반 검색으로 문서 검색 시작: {question}") # 1. 컨텍스트 검색 시스템 사용 if self.context_retrieval: try: # 컨텍스트 기반 통합 검색 (임베딩 + BM25 + Reranker) context_results = self.context_retrieval.search(question, top_k=15) # 컨텍스트 검색 결과를 LangChain Document 형식으로 변환 unique_docs = [] for result in context_results: if 'document' in result: unique_docs.append(result['document']) logger.info(f"📊 컨텍스트 기반 검색 결과: {len(unique_docs)}개 문서") except Exception as e: logger.error(f"❌ 컨텍스트 검색 실패, 기본 검색으로 대체: {e}") # 컨텍스트 검색 실패 시 기본 하이브리드 검색 사용 unique_docs = self._fallback_hybrid_search(question) else: # 컨텍스트 검색 시스템이 없는 경우 기본 하이브리드 검색 사용 unique_docs = self._fallback_hybrid_search(question) # 2. RAG 체인을 사용하여 답변 생성 if self.qa_chain: logger.info(f"🤖 LLM을 사용한 RAG 답변 생성 시작: {question}") result = self.qa_chain.invoke({"input": question}) # LLM이 실제로 사용한 문서 정보 추출 references = [] detailed_references = [] # 인라인 링크용 상세 정보 source_documents = result.get("context", []) # LLM이 실제로 사용한 문서 logger.info(f"📋 LLM이 실제로 사용한 문서 수: {len(source_documents)}개") for i, doc in enumerate(source_documents): # filename 추출 로직 개선 filename = doc.metadata.get('filename', 'Unknown') if hasattr(doc, 'metadata') and doc.metadata else 'Unknown' if filename == 'Unknown' or not filename: # source 필드에서 filename 추출 시도 source = doc.metadata.get('source', '') if hasattr(doc, 'metadata') and doc.metadata else '' if source: # UUID_filename.pdf 형태에서 filename.pdf 추출 import re match = re.search(r'[a-f0-9-]{36}_(.+)', source) if match: filename = match.group(1) else: filename = source logger.info(f"📋 문서 {i+1}: {filename}") # LLM이 실제로 사용한 문서에 대해 관련성 재검증 및 필터링 logger.info(f"📋 LLM이 사용한 문서에 대해 관련성 재검증 시작: {len(source_documents)}개") filtered_documents = self._filter_relevant_documents(source_documents, question) logger.info(f"📋 관련성 재검증 완료: {len(source_documents)}개 → {len(filtered_documents)}개") # 답변과 문서의 직접적 연관성 추가 검증 answer = result.get("answer", "") final_relevant_documents = [] for doc in filtered_documents: if hasattr(doc, 'metadata') and doc.metadata: filename = doc.metadata.get('filename', 'Unknown') file_id = doc.metadata.get('file_id', 'unknown') chunk_index = doc.metadata.get('chunk_index', 0) page_count = doc.metadata.get('page_count', 0) estimated_page = doc.metadata.get('estimated_page', chunk_index + 1) # 실제 페이지 번호가 존재하는지 확인하고 범위 내에서만 참조 if page_count > 0 and estimated_page <= page_count: page_number = estimated_page else: # 페이지 정보가 없거나 범위를 벗어난 경우 청크 인덱스 사용 page_number = min(chunk_index + 1, page_count) if page_count > 0 else chunk_index + 1 # 답변과 문서 내용의 직접적 연관성 추가 검증 (완화) doc_content = doc.page_content if hasattr(doc, 'page_content') else "" answer_doc_relevance = self._verify_answer_document_relevance(answer, doc_content, question) # 연관성 검증을 거의 제거 (매우 관대하게 적용) if answer_doc_relevance or len(doc_content) > 20: # 내용이 조금만 있어도 포함 final_relevant_documents.append(doc) logger.info(f"📋 답변-문서 연관성 검증 통과: {filename} p{page_number}") else: logger.info(f"📋 답변-문서 연관성 검증 실패 - 제외됨: {filename} p{page_number}") logger.info(f"📋 최종 관련성 검증 완료: {len(filtered_documents)}개 → {len(final_relevant_documents)}개") # 최종 관련성 검증을 통과한 문서들만 참조 문서로 사용 for doc in final_relevant_documents: if hasattr(doc, 'metadata') and doc.metadata: # filename 추출 로직 개선 filename = doc.metadata.get('filename', 'Unknown') if filename == 'Unknown' or not filename: # source 필드에서 filename 추출 시도 source = doc.metadata.get('source', '') if source: # UUID_filename.pdf 형태에서 filename.pdf 추출 import re match = re.search(r'[a-f0-9-]{36}_(.+)', source) if match: filename = match.group(1) else: filename = source file_id = doc.metadata.get('file_id', 'unknown') chunk_index = doc.metadata.get('chunk_index', 0) page_count = doc.metadata.get('page_count', 0) estimated_page = doc.metadata.get('estimated_page', chunk_index + 1) # 실제 페이지 번호가 존재하는지 확인하고 범위 내에서만 참조 if page_count > 0 and estimated_page <= page_count: page_number = estimated_page else: # 페이지 정보가 없거나 범위를 벗어난 경우 청크 인덱스 사용 page_number = min(chunk_index + 1, page_count) if page_count > 0 else chunk_index + 1 doc_content = doc.page_content if hasattr(doc, 'page_content') else "" # 기존 참조 문서 형식 (하단 참조 문서 섹션용) references.append(f"{filename}::{file_id} [p{page_number}]") # 인라인 링크용 상세 정보 (최종 관련성 검증 통과한 문서만) detailed_references.append({ "filename": filename, "file_id": file_id, "page_number": page_number, "chunk_index": chunk_index, "content_preview": doc_content[:100] + "..." if len(doc_content) > 100 else doc_content, "full_content": doc_content, # 전체 내용 추가 "is_relevant": True # 최종 검증 통과 }) logger.info(f"📋 최종 참조 문서에 추가: {filename} p{page_number}") logger.info(f"📋 detailed_references 길이: {len(detailed_references)}") logger.info(f"📋 detailed_references 내용: {detailed_references[-1]}") # 태그 및 영어 사고 과정 제거 (강화된 버전) answer = result.get("answer", "답변을 생성할 수 없습니다.") import re # 부터 까지의 모든 내용 제거 (대소문자 구분 없음) answer = re.sub(r'.*?', '', answer, flags=re.DOTALL | re.IGNORECASE).strip() # 태그만 있는 경우도 제거 answer = re.sub(r'', '', answer, flags=re.IGNORECASE).strip() # 태그만 있는 경우도 제거 answer = re.sub(r'', '', answer, flags=re.IGNORECASE).strip() # 영어 사고 과정 완전 제거 (극강화 버전) lines = answer.split('\n') filtered_lines = [] korean_found = False for line in lines: original_line = line # 원본 줄 보존 (들여쓰기, 공백 등) line_stripped = line.strip() # 빈 줄은 그대로 유지 (마크다운 구조 보존) if not line_stripped: filtered_lines.append(original_line) continue # 한국어가 포함된 줄이 나오면 korean_found = True if re.search(r'[가-힣]', line_stripped): korean_found = True # 한국어가 발견되기 전까지는 모든 영어 줄 무조건 제거 if not korean_found: if re.match(r'^[A-Za-z]', line_stripped): continue # 한국어가 발견된 후에도 영어 사고 과정 제거 (극강화) if re.match(r'^[A-Za-z]', line_stripped): # 모든 영어 패턴 제거 (더 많은 패턴 추가) if any(phrase in line_stripped.lower() for phrase in [ 'let me', 'i need to', 'i should', 'i will', 'i can', 'the answer should', 'make sure', 'avoid any', 'structure the answer', 'break down', 'organize', 'tag or any thinking', 'so i need to focus', 'focus on the main content', 'documents', 'thinking process', 'internal thought', 'i need to focus', 'main content', 'tag', 'thinking', 'tag or any', 'so i need', 'focus on', 'main content of', 'documents', 'thinking', 'process', 'internal', 'part. but the user', 'wait, the initial', 'let me check', 'user also wants', 'answer in markdown', 'instructions say', 'use markdown for', 'check again', 'but the user also', 'wants the answer in', 'markdown. wait', 'the initial instructions', 'say to use markdown', 'for the answer', 'let me check again', 'part. but the user also wants the answer in markdown', 'wait, the initial instructions say to use markdown', 'let me check again', 'but the user also wants', 'wants the answer in markdown', 'markdown. wait', 'the initial instructions', 'say to use markdown for the answer' ]): continue # 영어 문장이지만 중요한 내용이 아닌 경우 제거 if len(line_stripped) > 10 and any(word in line_stripped.lower() for word in [ 'thinking', 'process', 'structure', 'format', 'markdown', 'tag', 'focus', 'content', 'documents', 'internal', 'answer', 'should', 'make', 'sure', 'avoid', 'user', 'wants', 'instructions', 'check', 'again', 'part', 'but', 'let', 'me', 'say', 'use', 'for', 'wait', 'initial', 'also', 'the', 'in', 'to' ]): continue # 짧은 영어 문장도 제거 (사고 과정일 가능성) if len(line_stripped) < 200 and not re.search(r'[가-힣]', line_stripped): continue # 원본 줄 유지 (들여쓰기, 공백 등 마크다운 구조 보존) filtered_lines.append(original_line) answer = '\n'.join(filtered_lines).strip() # 마크다운 구조 보존을 위한 추가 처리 # 연속된 빈 줄을 하나로 정리하되, 마크다운 구조는 유지 answer = re.sub(r'\n\s*\n\s*\n+', '\n\n', answer) # 마크다운 문법이 있는지 확인하고, 없다면 강제로 마크다운 형식 적용 if not re.search(r'^#{1,6}\s|^\*\*|^[-*]\s|^\d+\.\s|^\|', answer, re.MULTILINE): # 마크다운 문법이 없으면 첫 번째 줄을 제목으로 만들기 lines = answer.split('\n') if lines and lines[0].strip(): # 첫 번째 줄이 제목이 될 수 있는 내용인지 확인 first_line = lines[0].strip() if len(first_line) > 5 and not first_line.endswith('.'): lines[0] = f"# {first_line}" answer = '\n'.join(lines) # 추가 영어 사고 과정 제거 (전체 텍스트 레벨) english_phrases_to_remove = [ "part. but the user also wants the answer in markdown. wait, the initial instructions say to use markdown for the answer. let me check again.", "part. but the user also wants the answer in markdown", "wait, the initial instructions say to use markdown", "let me check again", "but the user also wants", "wants the answer in markdown", "markdown. wait", "the initial instructions", "say to use markdown for the answer", "part. but the user", "user also wants", "answer in markdown", "initial instructions", "use markdown", "check again" ] for phrase in english_phrases_to_remove: answer = answer.replace(phrase, '').strip() answer = answer.replace(phrase.lower(), '').strip() answer = answer.replace(phrase.upper(), '').strip() answer = answer.replace(phrase.capitalize(), '').strip() # 정규식으로 영어 사고 과정 제거 import re english_thinking_patterns = [ r'^[A-Za-z].*user.*wants.*answer.*markdown.*$', r'^[A-Za-z].*part\.\s*but.*user.*also.*wants.*$', r'^[A-Za-z].*wait.*initial.*instructions.*$', r'^[A-Za-z].*let.*me.*check.*again.*$' ] for pattern in english_thinking_patterns: answer = re.sub(pattern, '', answer, flags=re.MULTILINE | re.IGNORECASE).strip() # 마크다운 링크 제거 (예: [문서명](https://example.com/attachment1)) answer = re.sub(r'\[([^\]]+)\]\(https?://[^\)]+\)', r'[\1]', answer) answer = re.sub(r'\[([^\]]+)\]\([^\)]*example\.com[^\)]*\)', r'[\1]', answer) answer = re.sub(r'\[([^\]]+)\]\([^\)]*attachment\d+[^\)]*\)', r'[\1]', answer) # 괄호 내 문서 ID 제거 (예: [문서명](RSC-PM-SOP-0001-F01)) answer = re.sub(r'\[([^\]]+)\]\([A-Z0-9-]+\)', r'[\1]', answer) answer = re.sub(r'\[([^\]]+)\]\([A-Z0-9-]+-[A-Z0-9-]+\)', r'[\1]', answer) response = { "answer": answer, "references": references, "detailed_references": detailed_references, # 인라인 링크용 상세 정보 "source_documents": source_documents } logger.info(f"✅ LLM RAG 답변 생성 완료: {len(references)}개 참조") return response else: # RAG 체인이 없는 경우 폴백 logger.warning("⚠️ RAG 체인이 초기화되지 않음. 폴백 모드로 전환") return self._generate_fallback_answer(question) except Exception as e: logger.error(f"❌ RAG 답변 생성 실패: {e}") # vectordb 관련 오류인지 확인하고 사용자 친화적 메시지로 변환 error_message = str(e).lower() if any(keyword in error_message for keyword in [ "readonly database", "code: 1032", "chromadb", "vectorstore", "collection", "database error" ]): logger.error(f"🔍 벡터DB 관련 오류 감지: {e}") return { "answer": "벡터DB에 문제가 있습니다. 관리자에게 문의해주세요.", "references": [], "source_documents": [], "detailed_references": [] } else: # 오류 시 폴백 답변 생성 return self._generate_fallback_answer(question) def _generate_fallback_answer(self, question: str) -> Dict[str, Any]: """폴백 답변 생성 (LLM 없이)""" try: # 유사 문서 검색 similar_docs = self.search_similar_documents(question, k=3) if not similar_docs: return { "answer": "죄송합니다. 관련 문서를 찾을 수 없습니다. 다른 키워드로 검색해보시거나 문서를 업로드해주세요.", "references": [], "source_documents": [] } # 문서 내용 요약 context_summary = "" references = [] for i, doc in enumerate(similar_docs): # 문서 내용의 핵심 부분 추출 (처음 300자) content_preview = doc.page_content[:300].replace('\n', ' ').strip() context_summary += f"\n• {content_preview}...\n" if hasattr(doc, 'metadata') and doc.metadata: filename = doc.metadata.get('filename', 'Unknown') file_id = doc.metadata.get('file_id', 'unknown') chunk_index = doc.metadata.get('chunk_index', 0) page_number = chunk_index + 1 references.append(f"{filename}::{file_id} [p{page_number}]") # 간단한 답변 생성 answer = f"질문하신 '{question}'에 대한 관련 문서를 찾았습니다.\n\n참조 문서에서 관련 내용을 확인할 수 있습니다:\n{context_summary}" response = { "answer": answer, "references": references, "source_documents": similar_docs } logger.info(f"✅ 폴백 답변 생성 완료: {len(references)}개 참조") return response except Exception as e: logger.error(f"❌ 폴백 답변 생성 실패: {e}") # vectordb 관련 오류인지 확인하고 사용자 친화적 메시지로 변환 error_message = str(e).lower() if any(keyword in error_message for keyword in [ "readonly database", "code: 1032", "chromadb", "vectorstore", "collection", "database error" ]): logger.error(f"🔍 벡터DB 관련 오류 감지: {e}") return { "answer": "벡터DB에 문제가 있습니다. 관리자에게 문의해주세요.", "references": [], "source_documents": [] } else: return { "answer": "죄송합니다. 현재 시스템 오류로 인해 답변을 생성할 수 없습니다. 잠시 후 다시 시도해주세요.", "references": [], "source_documents": [] } def get_collection_info(self) -> Dict[str, Any]: """컬렉션 정보 조회""" try: # ChromaDB 컬렉션 정보 collection = self.vectorstore._collection count = collection.count() return { "total_documents": count, "collection_name": "research_documents", "embedding_model": "jhgan/ko-sroberta-multitask" } except Exception as e: logger.error(f"❌ 컬렉션 정보 조회 실패: {e}") return {"error": str(e)} def delete_documents_by_filename(self, filename: str): """파일명으로 문서 삭제""" try: # 메타데이터로 필터링하여 삭제 collection = self.vectorstore._collection # source 필드에서 파일명 추출하여 삭제 # source는 "UUID_파일명.pdf" 형태 collection.delete(where={"source": {"$contains": filename}}) logger.info(f"✅ {filename} 관련 문서 삭제 완료") except Exception as e: logger.error(f"❌ 문서 삭제 실패: {e}") # 삭제 실패해도 계속 진행 (파일이 이미 삭제되었을 수 있음) logger.warning(f"⚠️ 문서 삭제 실패했지만 계속 진행: {e}") def cleanup_database_by_filename(self, filename: str): """데이터베이스에서 파일 관련 데이터 정리""" try: cursor = self.db_connection.cursor() # 파일 관련 벡터 데이터 삭제 cursor.execute( "DELETE FROM file_vectors WHERE filename = %s", (filename,) ) # 파일 메타데이터 삭제 cursor.execute( "DELETE FROM files WHERE filename = %s", (filename,) ) cursor.close() logger.info(f"✅ {filename} 데이터베이스 정리 완료") except Exception as e: logger.error(f"❌ 데이터베이스 정리 실패: {e}") raise # 전역 서비스 인스턴스 langchain_service = LangChainRAGService()