Files
ncuetalk_backend/scripts/gxp_bulk_ingest.py
dsyoon 46460b77f8 init
2025-12-27 14:06:26 +09:00

171 lines
5.1 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""gxp_bulk_ingest.py
Bulk-ingest GxP PDF files into the GxP Vector DB.
Usage::
python scripts/gxp_bulk_ingest.py [--dir PATH_TO_PDFS]
If no --dir given, defaults to scripts/gxp/ .
The script will:
1. Recursively scan the directory for *.pdf files.
2. For each file, run the PDF-Plumber extraction via GxPDocumentPreprocessingService.
3. Send the extracted result to GxPVectorDBService.construct_vector_db().
This bypasses the HTTP API layer and calls the internal services directly, so it must
be run in the project root (or ensure PYTHONPATH includes project root).
"""
# 표준 라이브러리
from pathlib import Path
import argparse
import sys
import os
from typing import Set
# 외부 라이브러리 (런타임에 없으면 requirements.txt 참고)
from langchain_openai import OpenAIEmbeddings
# Ensure backend path importable: the internal service modules live under
# <project>/backend, so that directory must be on sys.path before the
# `engines.*` imports below.  (parents[1] is presumably the project root,
# one level above scripts/ — TODO confirm against the repo layout.)
ROOT = Path(__file__).resolve().parents[1]
BACKEND_PATH = ROOT / "backend"
sys.path.append(str(BACKEND_PATH))
# 내부 서비스 모듈
from engines.chatbot_gxp.service.GxPDocumentPreprocessingService import (
GxPDocumentPreprocessingService,
)
from engines.chatbot_gxp.service.GxPVectorDBService import GxPVectorDBService
# ---------------------------------------------------------------------------
# 유틸리티
# ---------------------------------------------------------------------------
def ensure_ollama() -> None:
    """Verify the embedding backend is reachable before any ingestion work.

    Runs one tiny embedding request as a connectivity probe and terminates
    the process with exit code 1 if it fails.

    NOTE(review): probes via ``OpenAIEmbeddings`` while the error message
    talks about Ollama — presumably an OpenAI-compatible Ollama endpoint is
    configured through the environment; confirm.
    """
    try:
        OpenAIEmbeddings().embed_query("ping")
    except Exception as exc:  # pylint: disable=broad-except
        detail = f" 상세 오류: {exc}"
        print(
            "[!] Ollama 서버에 연결할 수 없습니다. 'ollama serve'가 실행 중인지 확인하세요.\n"
            + detail
        )
        sys.exit(1)
def ingest_pdfs(
    pdf_dir: Path,
    *,
    skip_existing: bool = False,
    reindex_existing: bool = False,
) -> None:
    """Bulk-index every PDF found under *pdf_dir* into the vector DB.

    Args:
        pdf_dir: Directory scanned recursively for ``*.pdf`` files.
        skip_existing: Leave PDFs whose collection already exists untouched.
        reindex_existing: Drop an already-existing collection, then rebuild it.
    """
    pre_service = GxPDocumentPreprocessingService()
    vec_service = GxPVectorDBService()

    # Cache the current collection names once up front so each PDF costs a
    # set lookup instead of a DB round-trip.
    # NOTE(review): relies on private service helpers (_list_collections,
    # _sanitize_collection_name); consider promoting them to public API.
    existing_collections: Set[str] = {
        col["name"] for col in vec_service._list_collections()  # type: ignore
    }

    pdf_files = list(pdf_dir.rglob("*.pdf"))
    if not pdf_files:
        print(f"[!] No PDF files found in {pdf_dir}")
        return

    stats = {"indexed": 0, "skipped": 0, "failed": 0}
    for pdf_path in pdf_files:
        # BUG FIX: Path.relative_to() raises ValueError when --dir points
        # outside the project tree; fall back to the absolute path instead
        # of crashing the whole run before the try block below.
        try:
            rel_path = pdf_path.relative_to(ROOT)
        except ValueError:
            rel_path = pdf_path
        print(f"[+] Processing {rel_path}")
        try:
            # Step 1: text extraction (pdfplumber-based).
            doc = pre_service.pdf_plumber_edms_document_text_extraction(str(pdf_path))

            # Step 2: derive the collection name, then decide what to do if
            # this document was indexed before.
            raw_name = f"gxp_{doc.get('plant', 'default')}_{doc.get('filename', 'document')}"
            collection_name = vec_service._sanitize_collection_name(raw_name)  # type: ignore
            if collection_name in existing_collections:
                if skip_existing:
                    print(" ↩︎ skip (already indexed)")
                    stats["skipped"] += 1
                    continue
                if reindex_existing:
                    print(" collection exists → 삭제 후 재인덱싱")
                    vec_service.delete_collection(collection_name)
                    existing_collections.remove(collection_name)

            # Step 3: build the vector index for this document.
            ok = vec_service.construct_vector_db(doc)
            if ok:
                print(" ✔ indexed")
                stats["indexed"] += 1
                existing_collections.add(collection_name)
            else:
                print(" ✖ service returned False")
                stats["failed"] += 1
        except Exception as exc:  # pylint: disable=broad-except
            print(f" ✖ failed: {exc}")
            stats["failed"] += 1

    # Summary tally.
    print("\n──────── 요약 통계 ────────")
    for k, v in stats.items():
        print(f"{k:8}: {v}")
def main() -> None:
    """CLI entry point: parse arguments, validate the directory, ingest."""
    arg_parser = argparse.ArgumentParser(
        description="Bulk ingest GxP PDFs into Chroma vector DB",
    )
    arg_parser.add_argument(
        "--dir",
        type=str,
        default=str(ROOT / "scripts" / "gxp"),
        help="Directory containing PDF files (default: scripts/gxp)",
    )

    # --skip-existing and --reindex are contradictory; let argparse reject
    # the combination instead of picking a winner silently.
    mode_group = arg_parser.add_mutually_exclusive_group()
    mode_group.add_argument(
        "--skip-existing",
        action="store_true",
        help="Skip PDFs whose collection already exists",
    )
    mode_group.add_argument(
        "--reindex",
        action="store_true",
        help="Delete existing collection then reindex",
    )
    args = arg_parser.parse_args()

    target_dir = Path(args.dir).expanduser().resolve()
    if not target_dir.is_dir():
        print(f"Directory not found: {target_dir}")
        sys.exit(1)

    ensure_ollama()
    ingest_pdfs(
        target_dir,
        skip_existing=args.skip_existing,
        reindex_existing=args.reindex,
    )


if __name__ == "__main__":
    main()