""" LangChain v0.3 기반 AI 서비스 향후 고도화를 위한 확장 가능한 아키텍처 """ import os import logging from typing import List, Dict, Any, Optional from datetime import datetime # LangChain Core from langchain_core.documents import Document from langchain_core.embeddings import Embeddings from langchain_core.vectorstores import VectorStore from langchain_core.retrievers import BaseRetriever from langchain_core.language_models import BaseLanguageModel from langchain_core.prompts import PromptTemplate from langchain_core.output_parsers import StrOutputParser from langchain_core.runnables import RunnablePassthrough, RunnableParallel # LangChain Community from langchain_community.vectorstores import Chroma from langchain_community.embeddings import SentenceTransformerEmbeddings from langchain_community.llms import Ollama # LangChain Chains from langchain.chains import RetrievalQA from langchain.chains.combine_documents import create_stuff_documents_chain from langchain.chains import create_retrieval_chain # Database import psycopg2 from psycopg2.extras import RealDictCursor logger = logging.getLogger(__name__) class LangChainRAGService: """LangChain 기반 RAG 서비스""" def __init__(self): self.embeddings: Optional[Embeddings] = None self.vectorstore: Optional[VectorStore] = None self.llm: Optional[BaseLanguageModel] = None self.retriever: Optional[BaseRetriever] = None self.qa_chain: Optional[Any] = None self.db_connection = None def initialize(self): """LangChain 컴포넌트 초기화""" try: # 임베딩 모델 초기화 self.embeddings = SentenceTransformerEmbeddings( model_name="jhgan/ko-sroberta-multitask" ) logger.info("✅ LangChain 임베딩 모델 로드 완료") # ChromaDB 벡터스토어 초기화 self.vectorstore = Chroma( persist_directory="./vectordb", embedding_function=self.embeddings, collection_name="research_documents" ) logger.info("✅ LangChain ChromaDB 초기화 완료") # Ollama LLM 초기화 self.llm = Ollama( model="qwen3:latest", base_url="http://localhost:11434" ) logger.info("✅ LangChain Ollama LLM 초기화 완료") # 리트리버 초기화 self.retriever = self.vectorstore.as_retriever( search_type="similarity", search_kwargs={"k": 5} ) logger.info("✅ LangChain 리트리버 초기화 완료") # RAG 체인 구성 self._setup_rag_chain() # 데이터베이스 연결 self._setup_database() logger.info("🚀 LangChain RAG 서비스 초기화 완료") except Exception as e: logger.error(f"❌ LangChain 서비스 초기화 실패: {e}") raise def _setup_rag_chain(self): """RAG 체인 설정""" try: # 개선된 프롬프트 템플릿 prompt_template = """ 당신은 연구 문서 전문가입니다. 주어진 문서들을 바탕으로 사용자의 질문에 정확하고 상세하게 답변해주세요. **절대 지켜야 할 답변 규칙:** 1. 답변은 반드시 한국어로만 시작하세요 2. 영어 단어나 문장을 절대 사용하지 마세요 3. 부터 까지의 모든 내용을 절대 포함하지 마세요 4. 사고 과정, 분석 과정, 고민 과정을 전혀 보여주지 마세요 5. 바로 최종 답변만 제공하세요 6. 문서의 내용을 정확히 파악하고 요약하여 답변하세요 7. 구체적인 절차나 방법이 있다면 단계별로 설명하세요 8. 중요한 정보나 주의사항이 있다면 강조하세요 9. 문서에 없는 내용은 추측하지 말고 "문서에 명시되지 않음"이라고 하세요 10. 마크다운 형식을 사용하여 구조화된 답변을 제공하세요 **참조 문서:** {context} **사용자 질문:** {input} **답변:** 위 문서의 내용을 바탕으로 질문에 대한 구체적이고 실용적인 답변을 마크다운 형식으로 바로 제공해주세요. 반드시 한국어로 시작하고, 영어나 태그는 절대 포함하지 마세요. """ prompt = PromptTemplate( template=prompt_template, input_variables=["context", "input"] ) # 문서 체인 생성 document_chain = create_stuff_documents_chain( llm=self.llm, prompt=prompt ) # RAG 체인 생성 self.qa_chain = create_retrieval_chain( retriever=self.retriever, combine_docs_chain=document_chain ) logger.info("✅ RAG 체인 설정 완료") except Exception as e: logger.error(f"❌ RAG 체인 설정 실패: {e}") raise def _setup_database(self): """데이터베이스 연결 설정""" try: self.db_connection = psycopg2.connect( host="localhost", port=5432, database="researchqa", user="woonglab", password="!@#woonglab" ) self.db_connection.autocommit = True logger.info("✅ PostgreSQL 연결 완료") except Exception as e: logger.error(f"❌ PostgreSQL 연결 실패: {e}") raise def add_documents(self, documents: List[Document], metadata: Dict[str, Any] = None): """문서를 벡터스토어에 추가""" try: if metadata: for doc in documents: doc.metadata.update(metadata) # ChromaDB에 문서 추가 self.vectorstore.add_documents(documents) logger.info(f"✅ {len(documents)}개 문서 추가 완료") except Exception as e: logger.error(f"❌ 문서 추가 실패: {e}") raise def search_similar_documents(self, query: str, k: int = 5) -> List[Document]: """유사 문서 검색""" try: docs = self.vectorstore.similarity_search(query, k=k) logger.info(f"✅ {len(docs)}개 유사 문서 검색 완료") return docs except Exception as e: logger.error(f"❌ 유사 문서 검색 실패: {e}") raise def generate_answer(self, question: str) -> Dict[str, Any]: """RAG를 통한 답변 생성""" try: # RAG 체인을 사용하여 답변 생성 if self.qa_chain: logger.info(f"🤖 LLM을 사용한 RAG 답변 생성 시작: {question}") result = self.qa_chain.invoke({"input": question}) # 참조 문서 정보 추출 references = [] source_documents = result.get("context", []) for doc in source_documents: if hasattr(doc, 'metadata') and doc.metadata: filename = doc.metadata.get('filename', 'Unknown') file_id = doc.metadata.get('file_id', 'unknown') chunk_index = doc.metadata.get('chunk_index', 0) page_number = chunk_index + 1 references.append(f"{filename}::{file_id} [p{page_number}]") # 태그 및 영어 사고 과정 제거 (강화된 버전) answer = result.get("answer", "답변을 생성할 수 없습니다.") import re # 부터 까지의 모든 내용 제거 (대소문자 구분 없음) answer = re.sub(r'.*?', '', answer, flags=re.DOTALL | re.IGNORECASE).strip() # 태그만 있는 경우도 제거 answer = re.sub(r'', '', answer, flags=re.IGNORECASE).strip() # 태그만 있는 경우도 제거 answer = re.sub(r'', '', answer, flags=re.IGNORECASE).strip() # 영어 사고 과정 완전 제거 (극강화 버전) lines = answer.split('\n') filtered_lines = [] korean_found = False for line in lines: line = line.strip() if not line: filtered_lines.append(line) continue # 한국어가 포함된 줄이 나오면 korean_found = True if re.search(r'[가-힣]', line): korean_found = True # 한국어가 발견되기 전까지는 모든 영어 줄 무조건 제거 if not korean_found: if re.match(r'^[A-Za-z]', line): continue # 한국어가 발견된 후에도 영어 사고 과정 제거 if re.match(r'^[A-Za-z]', line): # 모든 영어 패턴 제거 (더 많은 패턴 추가) if any(phrase in line.lower() for phrase in [ 'let me', 'i need to', 'i should', 'i will', 'i can', 'the answer should', 'make sure', 'avoid any', 'structure the answer', 'break down', 'organize', 'tag or any thinking', 'so i need to focus', 'focus on the main content', 'documents', 'thinking process', 'internal thought', 'i need to focus', 'main content', 'tag', 'thinking', 'tag or any', 'so i need', 'focus on', 'main content of', 'documents', 'thinking', 'process', 'internal' ]): continue # 영어 문장이지만 중요한 내용이 아닌 경우 제거 if len(line) > 10 and any(word in line.lower() for word in [ 'thinking', 'process', 'structure', 'format', 'markdown', 'tag', 'focus', 'content', 'documents', 'internal', 'answer', 'should', 'make', 'sure', 'avoid' ]): continue # 짧은 영어 문장도 제거 (사고 과정일 가능성) if len(line) < 200 and not re.search(r'[가-힣]', line): continue filtered_lines.append(line) answer = '\n'.join(filtered_lines).strip() response = { "answer": answer, "references": references, "source_documents": source_documents } logger.info(f"✅ LLM RAG 답변 생성 완료: {len(references)}개 참조") return response else: # RAG 체인이 없는 경우 폴백 logger.warning("⚠️ RAG 체인이 초기화되지 않음. 폴백 모드로 전환") return self._generate_fallback_answer(question) except Exception as e: logger.error(f"❌ RAG 답변 생성 실패: {e}") # 오류 시 폴백 답변 생성 return self._generate_fallback_answer(question) def _generate_fallback_answer(self, question: str) -> Dict[str, Any]: """폴백 답변 생성 (LLM 없이)""" try: # 유사 문서 검색 similar_docs = self.search_similar_documents(question, k=3) if not similar_docs: return { "answer": "죄송합니다. 관련 문서를 찾을 수 없습니다. 다른 키워드로 검색해보시거나 문서를 업로드해주세요.", "references": [], "source_documents": [] } # 문서 내용 요약 context_summary = "" references = [] for i, doc in enumerate(similar_docs): # 문서 내용의 핵심 부분 추출 (처음 300자) content_preview = doc.page_content[:300].replace('\n', ' ').strip() context_summary += f"\n• {content_preview}...\n" if hasattr(doc, 'metadata') and doc.metadata: filename = doc.metadata.get('filename', 'Unknown') file_id = doc.metadata.get('file_id', 'unknown') chunk_index = doc.metadata.get('chunk_index', 0) page_number = chunk_index + 1 references.append(f"{filename}::{file_id} [p{page_number}]") # 간단한 답변 생성 answer = f"질문하신 '{question}'에 대한 관련 문서를 찾았습니다.\n\n참조 문서에서 관련 내용을 확인할 수 있습니다:\n{context_summary}" response = { "answer": answer, "references": references, "source_documents": similar_docs } logger.info(f"✅ 폴백 답변 생성 완료: {len(references)}개 참조") return response except Exception as e: logger.error(f"❌ 폴백 답변 생성 실패: {e}") return { "answer": "죄송합니다. 현재 시스템 오류로 인해 답변을 생성할 수 없습니다. 잠시 후 다시 시도해주세요.", "references": [], "source_documents": [] } def get_collection_info(self) -> Dict[str, Any]: """컬렉션 정보 조회""" try: # ChromaDB 컬렉션 정보 collection = self.vectorstore._collection count = collection.count() return { "total_documents": count, "collection_name": "research_documents", "embedding_model": "jhgan/ko-sroberta-multitask" } except Exception as e: logger.error(f"❌ 컬렉션 정보 조회 실패: {e}") return {"error": str(e)} def delete_documents_by_filename(self, filename: str): """파일명으로 문서 삭제""" try: # 메타데이터로 필터링하여 삭제 collection = self.vectorstore._collection collection.delete(where={"filename": filename}) logger.info(f"✅ {filename} 관련 문서 삭제 완료") except Exception as e: logger.error(f"❌ 문서 삭제 실패: {e}") raise def cleanup_database_by_filename(self, filename: str): """데이터베이스에서 파일 관련 데이터 정리""" try: cursor = self.db_connection.cursor() # 파일 관련 벡터 데이터 삭제 cursor.execute( "DELETE FROM file_vectors WHERE filename = %s", (filename,) ) # 파일 메타데이터 삭제 cursor.execute( "DELETE FROM files WHERE filename = %s", (filename,) ) cursor.close() logger.info(f"✅ {filename} 데이터베이스 정리 완료") except Exception as e: logger.error(f"❌ 데이터베이스 정리 실패: {e}") raise # 전역 서비스 인스턴스 langchain_service = LangChainRAGService()