Add whisper_stt CLI with default diarization, Ubuntu README, editor config

- Replace test.py with whisper_stt.py: OpenAI Whisper + default speaker diarization
  via local ./models/pyannote-diarization-3.1; --no-diarize for plain text
- Add requirements-whisper-stt.txt (whisper, pyannote, huggingface_hub, imageio-ffmpeg)
- README: stt conda env, Ubuntu/macOS ffmpeg, CLI usage
- .vscode: Python interpreter /opt/anaconda3/envs/stt; .cursor rule for stt env
- .gitignore: exclude downloaded pyannote snapshot under models/

Made-with: Cursor
This commit is contained in:
dosangyoon
2026-03-23 11:34:46 +09:00
parent 78244da09f
commit 1c25bed926
7 changed files with 412 additions and 98 deletions

294
whisper_stt.py Normal file
View File

@@ -0,0 +1,294 @@
# whisper_stt.py — OpenAI Whisper CLI (m4a/mp3 등 디코딩에 ffmpeg 필요)
# 기본: 화자 구분(pyannote) 켜짐, 모델 ./models/pyannote-diarization-3.1 (로컬 폴더 필수)
# 끄기: --no-diarize
from __future__ import annotations
import argparse
import os
import shutil
import sys
import time
from typing import Any
import whisper
import whisper.audio as whisper_audio
DEFAULT_DIARIZE_MODEL_DIR = "./models/pyannote-diarization-3.1"
def _resolve_ffmpeg_exe() -> str:
"""PATH의 ffmpeg 또는 imageio-ffmpeg 번들 바이너리."""
path = shutil.which("ffmpeg")
if path:
return path
try:
import imageio_ffmpeg
return imageio_ffmpeg.get_ffmpeg_exe()
except ImportError:
pass
print(
"오류: ffmpeg를 찾을 수 없습니다. Whisper는 m4a/mp3 등을 ffmpeg로 디코딩합니다.\n\n"
"설치 방법(택 1):\n"
" • Homebrew: brew install ffmpeg\n"
" • conda: conda install -c conda-forge ffmpeg\n"
" • pip 번들: pip install imageio-ffmpeg\n"
" (이 프로젝트 requirements.txt에 포함되어 있으면 pip install -r requirements.txt)\n",
file=sys.stderr,
)
sys.exit(1)
def _patch_whisper_ffmpeg() -> None:
"""whisper.audio는 명령 이름 'ffmpeg'만 사용하므로, 실제 경로로 치환한다."""
ffmpeg_exe = _resolve_ffmpeg_exe()
_orig_run = whisper_audio.run
def _run(cmd, *args, **kwargs):
if isinstance(cmd, (list, tuple)) and cmd and cmd[0] == "ffmpeg":
cmd = [ffmpeg_exe] + list(cmd[1:])
return _orig_run(cmd, *args, **kwargs)
whisper_audio.run = _run # type: ignore[method-assign]
def _overlap_sec(a0: float, a1: float, b0: float, b1: float) -> float:
return max(0.0, min(a1, b1) - max(a0, b0))
def _assign_speaker(
seg_start: float, seg_end: float, turns: list[tuple[float, float, str]]
) -> str | None:
best: str | None = None
best_ov = 0.0
for t0, t1, sp in turns:
ov = _overlap_sec(seg_start, seg_end, t0, t1)
if ov > best_ov:
best_ov = ov
best = sp
if best is None or best_ov < 0.05:
return None
return best
def _speaker_label_order(turns: list[tuple[float, float, str]]) -> dict[str, str]:
"""다이어리제이션 타임라인 순으로 처음 등장하는 화자 → A, B, C, …"""
order: list[str] = []
for t0, _, sp in sorted(turns, key=lambda x: x[0]):
if sp not in order:
order.append(sp)
def letter(i: int) -> str:
if i < 26:
return chr(ord("A") + i)
return f"SP{i + 1}"
return {sp: letter(i) for i, sp in enumerate(order)}
def _format_diarized_text(
whisper_segments: list[dict[str, Any]],
turns: list[tuple[float, float, str]],
) -> str:
labels = _speaker_label_order(turns)
lines: list[str] = []
current_letter: str | None = None
current_parts: list[str] = []
def flush() -> None:
nonlocal current_letter, current_parts
if current_letter is not None and current_parts:
lines.append(f"{current_letter}: {' '.join(current_parts).strip()}")
current_letter = None
current_parts = []
for seg in whisper_segments:
text = (seg.get("text") or "").strip()
if not text:
continue
start = float(seg["start"])
end = float(seg["end"])
sp = _assign_speaker(start, end, turns)
letter = labels.get(sp, "?") if sp is not None else "?"
if letter == current_letter:
current_parts.append(text)
else:
flush()
current_letter = letter
current_parts = [text]
flush()
return "\n".join(lines)
def _resolve_local_diarize_dir(cli_dir: str | None) -> str:
"""
로컬 pyannote 스냅샷 디렉터리만 사용(허브 자동 다운로드 없음).
우선순위: --diarize-model-dir > WHISPER_DIARIZE_MODEL_DIR > PYANNOTE_MODEL_DIR > 기본값.
"""
if cli_dir is not None:
path = os.path.abspath(os.path.expanduser(cli_dir))
if os.path.isdir(path):
return path
print(
f"오류: --diarize-model-dir 가 가리키는 폴더가 없습니다: {path}",
file=sys.stderr,
)
sys.exit(1)
for cand in (
os.environ.get("WHISPER_DIARIZE_MODEL_DIR"),
os.environ.get("PYANNOTE_MODEL_DIR"),
):
if cand:
path = os.path.abspath(os.path.expanduser(cand))
if os.path.isdir(path):
return path
path = os.path.abspath(os.path.expanduser(DEFAULT_DIARIZE_MODEL_DIR))
if os.path.isdir(path):
return path
print(
f"오류: 화자 분리 모델 폴더가 없습니다: {path}\n\n"
"다음으로 받은 뒤 다시 실행하세요 (한 번만, 약관 동의·hf auth login 필요):\n"
" hf download pyannote/speaker-diarization-3.1 \\\n"
f" --local-dir {DEFAULT_DIARIZE_MODEL_DIR}\n\n"
"화자 구분 없이 Whisper만 쓰려면:\n"
" python whisper_stt.py 입력.m4a 출력.txt --no-diarize\n",
file=sys.stderr,
)
sys.exit(1)
def _run_diarization(audio_path: str, *, diarize_model_dir: str | None) -> list[tuple[float, float, str]]:
try:
import torch # noqa: F401 — device 계산용(아래)
import pyannote.audio # noqa: F401 — 설치 여부 확인
except ImportError:
print(
"오류: pyannote.audio 가 설치되어 있지 않습니다.\n"
" pip install -r requirements-whisper-stt.txt\n",
file=sys.stderr,
)
sys.exit(1)
from pyannote.audio import Pipeline
model_dir = _resolve_local_diarize_dir(diarize_model_dir)
print(f"[4/4] 화자 분리(pyannote) — 로컬 모델: {model_dir}", flush=True)
print("[4/4] 화자 분리 실행 중... (수 분 걸릴 수 있음)", flush=True)
t0 = time.perf_counter()
try:
pipeline = Pipeline.from_pretrained(model_dir)
except Exception as e:
print(
f"오류: pyannote 파이프라인을 불러오지 못했습니다: {e}\n\n"
"모델 파일이 손상되었거나 하위 가중치가 빠졌을 수 있습니다.\n"
"다시 받기:\n"
" hf download pyannote/speaker-diarization-3.1 \\\n"
f" --local-dir {DEFAULT_DIARIZE_MODEL_DIR}\n",
file=sys.stderr,
)
sys.exit(1)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
pipeline.to(device)
diarization = pipeline(audio_path)
turns: list[tuple[float, float, str]] = []
for segment, _, label in diarization.itertracks(yield_label=True):
turns.append((float(segment.start), float(segment.end), str(label)))
turns.sort(key=lambda x: x[0])
print(f"[4/4] 화자 분리 완료 ({time.perf_counter() - t0:.1f}초, 구간 {len(turns)}개)", flush=True)
return turns
def main() -> None:
parser = argparse.ArgumentParser(
description="OpenAI Whisper로 음성을 텍스트로 변환합니다. 기본은 화자 구분(A/B/C) 포함.",
)
parser.add_argument("input_file", help="입력 오디오 파일")
parser.add_argument("output_file", help="출력 .txt 경로")
parser.add_argument(
"--no-diarize",
action="store_true",
help="화자 구분 끄기 (Whisper 전체 텍스트만 저장)",
)
parser.add_argument(
"--diarize-model-dir",
default=None,
metavar="DIR",
help=(
f"pyannote 로컬 스냅샷 폴더 (기본: {DEFAULT_DIARIZE_MODEL_DIR} 또는 "
"WHISPER_DIARIZE_MODEL_DIR / PYANNOTE_MODEL_DIR)"
),
)
args = parser.parse_args()
input_file = os.path.expanduser(args.input_file)
output_file = os.path.expanduser(args.output_file)
if not os.path.exists(input_file):
print(f"오류: 입력 파일이 존재하지 않습니다: {input_file}")
sys.exit(1)
t_all = time.perf_counter()
use_diarize = not args.no_diarize
n_steps = 4 if use_diarize else 3
_patch_whisper_ffmpeg()
print(f"[1/{n_steps}] ffmpeg 준비 완료", flush=True)
print(
f"[2/{n_steps}] Whisper 모델 로드 중... (medium, 한국어)\n"
" 최초 실행 시 가중치 다운로드로 수 분 걸릴 수 있습니다.",
flush=True,
)
t0 = time.perf_counter()
model = whisper.load_model("medium")
print(f"[2/{n_steps}] 모델 로드 완료 ({time.perf_counter() - t0:.1f}초)", flush=True)
print(
f"[3/{n_steps}] 음성 인식 중: {input_file}\n"
" 아래 progress bar가 프레임 단위 진행률(%)과 예상 남은 시간을 표시합니다.\n"
" (직후 잠시 멈춘 것처럼 보이면 오디오 디코딩·멜 스펙트럼 계산 중일 수 있습니다.)",
flush=True,
)
t0 = time.perf_counter()
result = model.transcribe(
input_file,
language="ko",
fp16=False,
verbose=False,
)
print(f"\n[3/{n_steps}] 음성 인식 완료 ({time.perf_counter() - t0:.1f}초)", flush=True)
if use_diarize:
turns = _run_diarization(input_file, diarize_model_dir=args.diarize_model_dir)
body = _format_diarized_text(result["segments"], turns)
body = (
"※ 화자 A, B, C… 는 실제 이름이 아니라, 이 녹음에서 말이 처음 잡힌 순서로 붙인 구분자입니다.\n"
"※ 같은 사람이 여러 구간으로 나뉘면 라벨이 바뀌거나 섞일 수 있으니, 중요한 회의는 검수가 필요합니다.\n\n"
+ body
)
else:
body = result["text"].strip()
print("\n===== 변환 결과 미리보기 =====\n", flush=True)
preview = body[:800] + ("..." if len(body) > 800 else "")
print(preview, flush=True)
with open(output_file, "w", encoding="utf-8") as f:
f.write(body)
if not body.endswith("\n"):
f.write("\n")
print(
f"\n전체 소요: {time.perf_counter() - t_all:.1f}\n변환 완료. 출력 파일: {output_file}",
flush=True,
)
if __name__ == "__main__":
main()