#!/usr/bin/env python3
"""gxp_bulk_ingest.py

Bulk-ingest GxP PDF files into the GxP Vector DB.

Usage::

    python scripts/gxp_bulk_ingest.py [--dir PATH_TO_PDFS]

If no --dir is given, defaults to scripts/gxp/ .

The script will:

1. Recursively scan the directory for *.pdf files.
2. For each file, run the PDF-Plumber extraction via
   GxPDocumentPreprocessingService.
3. Send the extracted result to GxPVectorDBService.construct_vector_db().

This bypasses the HTTP API layer and calls the internal services directly,
so it must be run in the project root (or ensure PYTHONPATH includes the
project root).
"""

# Standard library
from pathlib import Path
import argparse
import sys
from typing import Set

# Third-party (see requirements.txt if missing at runtime)
from langchain_openai import OpenAIEmbeddings

# Make the backend package importable before the internal imports below.
ROOT = Path(__file__).resolve().parents[1]
BACKEND_PATH = ROOT / "backend"
sys.path.append(str(BACKEND_PATH))

# Internal service modules (resolvable only after the sys.path tweak above)
from engines.chatbot_gxp.service.GxPDocumentPreprocessingService import (
    GxPDocumentPreprocessingService,
)
from engines.chatbot_gxp.service.GxPVectorDBService import GxPVectorDBService


# ---------------------------------------------------------------------------
# Utilities
# ---------------------------------------------------------------------------


def ensure_ollama() -> None:
    """Verify the embedding backend is reachable before doing any work.

    Performs a throwaway embedding call; if it fails for any reason the
    script prints a hint and exits with status 1.

    NOTE(review): the messages mention Ollama but the probe instantiates
    OpenAIEmbeddings — presumably an OpenAI-compatible endpoint served by
    Ollama; confirm the configured base URL.
    """
    try:
        _ = OpenAIEmbeddings().embed_query("ping")
    except Exception as exc:  # pylint: disable=broad-except
        # Boundary handler: any connection/auth error is fatal for a bulk
        # job, so report and stop instead of failing per-file later.
        print(
            "[!] Ollama 서버에 연결할 수 없습니다. 'ollama serve'가 실행 중인지 확인하세요.\n",
            f" 상세 오류: {exc}",
            sep="",
        )
        sys.exit(1)


def ingest_pdfs(
    pdf_dir: Path,
    *,
    skip_existing: bool = False,
    reindex_existing: bool = False,
) -> None:
    """Index every PDF under *pdf_dir* into the vector DB.

    Args:
        pdf_dir: Directory scanned recursively for ``*.pdf`` files.
        skip_existing: Leave already-indexed collections untouched.
        reindex_existing: Delete an existing collection, then reindex it.
    """
    pre_service = GxPDocumentPreprocessingService()
    vec_service = GxPVectorDBService()

    # Cache the existing collection names once up front so the per-file
    # existence check does not hit the DB every iteration.
    # NOTE(review): relies on private service helpers (_list_collections,
    # _sanitize_collection_name) — consider promoting them to public API.
    existing_collections: Set[str] = {
        col["name"] for col in vec_service._list_collections()  # type: ignore
    }

    # sorted() makes the processing order deterministic across filesystems.
    pdf_files = sorted(pdf_dir.rglob("*.pdf"))
    if not pdf_files:
        print(f"[!] No PDF files found in {pdf_dir}")
        return

    stats = {"indexed": 0, "skipped": 0, "failed": 0}

    for pdf_path in pdf_files:
        # --dir may point outside the project root; relative_to() raises
        # ValueError in that case, so fall back to the absolute path.
        try:
            rel_path = pdf_path.relative_to(ROOT)
        except ValueError:
            rel_path = pdf_path
        print(f"[+] Processing {rel_path}")
        try:
            # Step 1: preprocessing (PDF-Plumber text extraction)
            doc = pre_service.pdf_plumber_edms_document_text_extraction(str(pdf_path))

            # Step 2: derive the collection name, then decide what to do
            # if a collection with that name already exists.
            raw_name = f"gxp_{doc.get('plant', 'default')}_{doc.get('filename', 'document')}"
            collection_name = vec_service._sanitize_collection_name(raw_name)  # type: ignore

            if collection_name in existing_collections:
                if skip_existing:
                    print(" ↩︎ skip (already indexed)")
                    stats["skipped"] += 1
                    continue
                if reindex_existing:
                    print(" ℹ︎ collection exists → 삭제 후 재인덱싱")
                    vec_service.delete_collection(collection_name)
                    existing_collections.remove(collection_name)

            # Step 3: build the vector DB for this document
            ok = vec_service.construct_vector_db(doc)
            if ok:
                print(" ✔ indexed")
                stats["indexed"] += 1
                existing_collections.add(collection_name)
            else:
                print(" ✖ service returned False")
                stats["failed"] += 1
        except Exception as exc:  # pylint: disable=broad-except
            # One bad file must not abort the whole batch — record and go on.
            print(f" ✖ failed: {exc}")
            stats["failed"] += 1

    # Summary statistics
    print("\n──────── 요약 통계 ────────")
    for k, v in stats.items():
        print(f"{k:8}: {v}")


def main() -> None:
    """CLI entry point: parse arguments, validate the directory, run ingest."""
    parser = argparse.ArgumentParser(
        description="Bulk ingest GxP PDFs into Chroma vector DB",
    )
    parser.add_argument(
        "--dir",
        type=str,
        default=str(ROOT / "scripts" / "gxp"),
        help="Directory containing PDF files (default: scripts/gxp)",
    )
    # --skip-existing and --reindex are mutually exclusive by design:
    # a collection cannot be both skipped and rebuilt.
    excl = parser.add_mutually_exclusive_group()
    excl.add_argument(
        "--skip-existing",
        action="store_true",
        help="Skip PDFs whose collection already exists",
    )
    excl.add_argument(
        "--reindex",
        action="store_true",
        help="Delete existing collection then reindex",
    )
    args = parser.parse_args()

    pdf_dir = Path(args.dir).expanduser().resolve()
    if not pdf_dir.is_dir():
        print(f"Directory not found: {pdf_dir}")
        sys.exit(1)

    ensure_ollama()
    ingest_pdfs(
        pdf_dir,
        skip_existing=args.skip_existing,
        reindex_existing=args.reindex,
    )


if __name__ == "__main__":
    main()