# whisper_stt.py — OpenAI Whisper CLI (m4a/mp3 등 디코딩에 ffmpeg 필요) # 기본: 화자 구분(pyannote) 켜짐, 모델 ./models/pyannote-diarization-3.1 (로컬 폴더 필수) # 끄기: --no-diarize from __future__ import annotations import argparse import os import shutil import sys import time from typing import Any import whisper import whisper.audio as whisper_audio DEFAULT_DIARIZE_MODEL_DIR = "./models/pyannote-diarization-3.1" def _validate_pyannote_snapshot(model_dir: str) -> None: """README만 있거나 중간에 끊긴 다운로드면 config.yaml 이 없다.""" cfg = os.path.join(model_dir, "config.yaml") if os.path.isfile(cfg): return abs_dir = os.path.abspath(model_dir) print( f"오류: pyannote 모델 폴더가 불완전합니다 (config.yaml 없음): {abs_dir}\n" "Hugging Face에서 README만 받아졌거나, 다운로드가 중간에 끊긴 상태일 수 있습니다.\n\n" "프로젝트 루트에서 로그인 후 전체 스냅샷을 다시 받으세요:\n" " hf auth login\n" " hf download pyannote/speaker-diarization-3.1 \\\n" f" --local-dir {DEFAULT_DIARIZE_MODEL_DIR}\n\n" "기존 폴더를 비우고 받으려면(주의: 폴더 안 파일 전부 삭제):\n" f" rm -rf \"{abs_dir}\"/*\n" " hf download pyannote/speaker-diarization-3.1 \\\n" f" --local-dir {DEFAULT_DIARIZE_MODEL_DIR}\n\n" "https://hf.co/pyannote/speaker-diarization-3.1 에서 모델 약관 동의가 되어 있어야 합니다.\n" "화자 구분 없이 Whisper만 쓰려면:\n" " python whisper_stt.py 입력.m4a 출력.txt --no-diarize\n", file=sys.stderr, ) sys.exit(1) def _resolve_ffmpeg_exe() -> str: """PATH의 ffmpeg 또는 imageio-ffmpeg 번들 바이너리.""" path = shutil.which("ffmpeg") if path: return path try: import imageio_ffmpeg return imageio_ffmpeg.get_ffmpeg_exe() except ImportError: pass print( "오류: ffmpeg를 찾을 수 없습니다. Whisper는 m4a/mp3 등을 ffmpeg로 디코딩합니다.\n\n" "설치 방법(택 1):\n" " • Homebrew: brew install ffmpeg\n" " • conda: conda install -c conda-forge ffmpeg\n" " • pip 번들: pip install imageio-ffmpeg\n" " (이 프로젝트 requirements.txt에 포함되어 있으면 pip install -r requirements.txt)\n", file=sys.stderr, ) sys.exit(1) def _patch_whisper_ffmpeg() -> None: """whisper.audio는 명령 이름 'ffmpeg'만 사용하므로, 실제 경로로 치환한다.""" ffmpeg_exe = _resolve_ffmpeg_exe() _orig_run = whisper_audio.run def _run(cmd, *args, **kwargs): if isinstance(cmd, (list, tuple)) and cmd and cmd[0] == "ffmpeg": cmd = [ffmpeg_exe] + list(cmd[1:]) return _orig_run(cmd, *args, **kwargs) whisper_audio.run = _run # type: ignore[method-assign] def _overlap_sec(a0: float, a1: float, b0: float, b1: float) -> float: return max(0.0, min(a1, b1) - max(a0, b0)) def _assign_speaker( seg_start: float, seg_end: float, turns: list[tuple[float, float, str]] ) -> str | None: best: str | None = None best_ov = 0.0 for t0, t1, sp in turns: ov = _overlap_sec(seg_start, seg_end, t0, t1) if ov > best_ov: best_ov = ov best = sp if best is None or best_ov < 0.05: return None return best def _speaker_label_order(turns: list[tuple[float, float, str]]) -> dict[str, str]: """다이어리제이션 타임라인 순으로 처음 등장하는 화자 → A, B, C, …""" order: list[str] = [] for t0, _, sp in sorted(turns, key=lambda x: x[0]): if sp not in order: order.append(sp) def letter(i: int) -> str: if i < 26: return chr(ord("A") + i) return f"SP{i + 1}" return {sp: letter(i) for i, sp in enumerate(order)} def _format_diarized_text( whisper_segments: list[dict[str, Any]], turns: list[tuple[float, float, str]], ) -> str: labels = _speaker_label_order(turns) lines: list[str] = [] current_letter: str | None = None current_parts: list[str] = [] def flush() -> None: nonlocal current_letter, current_parts if current_letter is not None and current_parts: lines.append(f"{current_letter}: {' '.join(current_parts).strip()}") current_letter = None current_parts = [] for seg in whisper_segments: text = (seg.get("text") or "").strip() if not text: continue start = float(seg["start"]) end = float(seg["end"]) sp = _assign_speaker(start, end, turns) letter = labels.get(sp, "?") if sp is not None else "?" if letter == current_letter: current_parts.append(text) else: flush() current_letter = letter current_parts = [text] flush() return "\n".join(lines) def _resolve_local_diarize_dir(cli_dir: str | None) -> str: """ 로컬 pyannote 스냅샷 디렉터리만 사용(허브 자동 다운로드 없음). 우선순위: --diarize-model-dir > WHISPER_DIARIZE_MODEL_DIR > PYANNOTE_MODEL_DIR > 기본값. """ if cli_dir is not None: path = os.path.abspath(os.path.expanduser(cli_dir)) if os.path.isdir(path): return path print( f"오류: --diarize-model-dir 가 가리키는 폴더가 없습니다: {path}", file=sys.stderr, ) sys.exit(1) for cand in ( os.environ.get("WHISPER_DIARIZE_MODEL_DIR"), os.environ.get("PYANNOTE_MODEL_DIR"), ): if cand: path = os.path.abspath(os.path.expanduser(cand)) if os.path.isdir(path): return path path = os.path.abspath(os.path.expanduser(DEFAULT_DIARIZE_MODEL_DIR)) if os.path.isdir(path): return path print( f"오류: 화자 분리 모델 폴더가 없습니다: {path}\n\n" "다음으로 받은 뒤 다시 실행하세요 (한 번만, 약관 동의·hf auth login 필요):\n" " hf download pyannote/speaker-diarization-3.1 \\\n" f" --local-dir {DEFAULT_DIARIZE_MODEL_DIR}\n\n" "화자 구분 없이 Whisper만 쓰려면:\n" " python whisper_stt.py 입력.m4a 출력.txt --no-diarize\n", file=sys.stderr, ) sys.exit(1) def _run_diarization(audio_path: str, *, diarize_model_dir: str | None) -> list[tuple[float, float, str]]: try: import torch # noqa: F401 — device 계산용(아래) import pyannote.audio # noqa: F401 — 설치 여부 확인 except ImportError: print( "오류: pyannote.audio 가 설치되어 있지 않습니다.\n" " pip install -r requirements-whisper-stt.txt\n", file=sys.stderr, ) sys.exit(1) from pyannote.audio import Pipeline model_dir = _resolve_local_diarize_dir(diarize_model_dir) _validate_pyannote_snapshot(model_dir) print(f"[4/4] 화자 분리(pyannote) — 로컬 모델: {model_dir}", flush=True) print("[4/4] 화자 분리 실행 중... (수 분 걸릴 수 있음)", flush=True) t0 = time.perf_counter() try: pipeline = Pipeline.from_pretrained(model_dir) except Exception as e: print( f"오류: pyannote 파이프라인을 불러오지 못했습니다: {e}\n\n" "config.yaml 은 있는데도 실패하면 하위 체크포인트(segmentation·embedding 등)가 빠졌을 수 있습니다.\n" "폴더를 비운 뒤 전체 재다운로드:\n" " hf download pyannote/speaker-diarization-3.1 \\\n" f" --local-dir {DEFAULT_DIARIZE_MODEL_DIR}\n", file=sys.stderr, ) sys.exit(1) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") pipeline.to(device) diarization = pipeline(audio_path) turns: list[tuple[float, float, str]] = [] for segment, _, label in diarization.itertracks(yield_label=True): turns.append((float(segment.start), float(segment.end), str(label))) turns.sort(key=lambda x: x[0]) print(f"[4/4] 화자 분리 완료 ({time.perf_counter() - t0:.1f}초, 구간 {len(turns)}개)", flush=True) return turns def main() -> None: parser = argparse.ArgumentParser( description="OpenAI Whisper로 음성을 텍스트로 변환합니다. 기본은 화자 구분(A/B/C) 포함.", ) parser.add_argument("input_file", help="입력 오디오 파일") parser.add_argument("output_file", help="출력 .txt 경로") parser.add_argument( "--no-diarize", action="store_true", help="화자 구분 끄기 (Whisper 전체 텍스트만 저장)", ) parser.add_argument( "--diarize-model-dir", default=None, metavar="DIR", help=( f"pyannote 로컬 스냅샷 폴더 (기본: {DEFAULT_DIARIZE_MODEL_DIR} 또는 " "WHISPER_DIARIZE_MODEL_DIR / PYANNOTE_MODEL_DIR)" ), ) args = parser.parse_args() input_file = os.path.expanduser(args.input_file) output_file = os.path.expanduser(args.output_file) if not os.path.exists(input_file): print(f"오류: 입력 파일이 존재하지 않습니다: {input_file}") sys.exit(1) t_all = time.perf_counter() use_diarize = not args.no_diarize n_steps = 4 if use_diarize else 3 _patch_whisper_ffmpeg() print(f"[1/{n_steps}] ffmpeg 준비 완료", flush=True) print( f"[2/{n_steps}] Whisper 모델 로드 중... (medium, 한국어)\n" " 최초 실행 시 가중치 다운로드로 수 분 걸릴 수 있습니다.", flush=True, ) t0 = time.perf_counter() model = whisper.load_model("medium") print(f"[2/{n_steps}] 모델 로드 완료 ({time.perf_counter() - t0:.1f}초)", flush=True) print( f"[3/{n_steps}] 음성 인식 중: {input_file}\n" " 아래 progress bar가 프레임 단위 진행률(%)과 예상 남은 시간을 표시합니다.\n" " (직후 잠시 멈춘 것처럼 보이면 오디오 디코딩·멜 스펙트럼 계산 중일 수 있습니다.)", flush=True, ) t0 = time.perf_counter() result = model.transcribe( input_file, language="ko", fp16=False, verbose=False, ) print(f"\n[3/{n_steps}] 음성 인식 완료 ({time.perf_counter() - t0:.1f}초)", flush=True) if use_diarize: turns = _run_diarization(input_file, diarize_model_dir=args.diarize_model_dir) body = _format_diarized_text(result["segments"], turns) body = ( "※ 화자 A, B, C… 는 실제 이름이 아니라, 이 녹음에서 말이 처음 잡힌 순서로 붙인 구분자입니다.\n" "※ 같은 사람이 여러 구간으로 나뉘면 라벨이 바뀌거나 섞일 수 있으니, 중요한 회의는 검수가 필요합니다.\n\n" + body ) else: body = result["text"].strip() print("\n===== 변환 결과 미리보기 =====\n", flush=True) preview = body[:800] + ("..." if len(body) > 800 else "") print(preview, flush=True) with open(output_file, "w", encoding="utf-8") as f: f.write(body) if not body.endswith("\n"): f.write("\n") print( f"\n전체 소요: {time.perf_counter() - t_all:.1f}초\n변환 완료. 출력 파일: {output_file}", flush=True, ) if __name__ == "__main__": main()