Fresh start on MariaDB Gitea

This commit is contained in:
2026-02-25 19:04:18 +09:00
commit 326b749ca8
20 changed files with 1171 additions and 0 deletions

1
server/__init__.py Normal file
View File

@@ -0,0 +1 @@

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

159
server/db.py Normal file
View File

@@ -0,0 +1,159 @@
import os
from typing import List, Optional, Dict, Any
import psycopg2
import psycopg2.extras
def get_conn():
user = os.getenv("DB_USER")
password = os.getenv("DB_PASSWORD")
if not user or not password:
raise RuntimeError("DB_USER 또는 DB_PASSWORD가 설정되지 않았습니다.")
return psycopg2.connect(
host="ncue.net",
port=5432,
dbname="tts",
user=user,
password=password,
)
def init_db():
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
CREATE TABLE IF NOT EXISTS tts_items (
id SERIAL PRIMARY KEY,
text TEXT NOT NULL,
filename TEXT,
size_bytes BIGINT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
"""
)
cur.execute(
"""
ALTER TABLE tts_items
ADD COLUMN IF NOT EXISTS size_bytes BIGINT;
"""
)
cur.execute(
"""
CREATE INDEX IF NOT EXISTS tts_items_created_at_idx
ON tts_items (created_at DESC);
"""
)
conn.commit()
def create_item(text: str) -> Dict[str, Any]:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(
"""
INSERT INTO tts_items (text)
VALUES (%s)
RETURNING id, created_at;
""",
(text,),
)
row = cur.fetchone()
conn.commit()
return row
def update_filename(tts_id: int, filename: str) -> None:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
UPDATE tts_items
SET filename = %s
WHERE id = %s;
""",
(filename, tts_id),
)
conn.commit()
def update_size_bytes(tts_id: int, size_bytes: int) -> None:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
UPDATE tts_items
SET size_bytes = %s
WHERE id = %s;
""",
(size_bytes, tts_id),
)
conn.commit()
def list_items() -> List[Dict[str, Any]]:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(
"""
SELECT id, created_at, filename, size_bytes
FROM tts_items
ORDER BY created_at DESC;
"""
)
rows = cur.fetchall()
return rows
def get_item(tts_id: int) -> Optional[Dict[str, Any]]:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(
"""
SELECT id, text, filename, size_bytes, created_at
FROM tts_items
WHERE id = %s;
""",
(tts_id,),
)
row = cur.fetchone()
return row
def delete_items(ids: List[int]) -> List[Dict[str, Any]]:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(
"""
SELECT id, filename
FROM tts_items
WHERE id = ANY(%s);
""",
(ids,),
)
rows = cur.fetchall()
cur.execute(
"""
DELETE FROM tts_items
WHERE id = ANY(%s);
""",
(ids,),
)
conn.commit()
return rows
def delete_item_by_id(tts_id: int) -> None:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
DELETE FROM tts_items
WHERE id = %s;
""",
(tts_id,),
)
conn.commit()

213
server/main.py Normal file
View File

@@ -0,0 +1,213 @@
from pathlib import Path
from typing import List
import logging
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from .db import (
init_db,
create_item,
update_filename,
update_size_bytes,
list_items,
get_item,
delete_items,
delete_item_by_id,
)
from .tts_service import text_to_mp3
BASE_DIR = Path(__file__).resolve().parent
ROOT_DIR = BASE_DIR.parent
CLIENT_DIR = ROOT_DIR / "client"
RESOURCES_DIR = ROOT_DIR / "resources"
# 프로젝트 루트의 .env를 명시적으로 로드
load_dotenv(dotenv_path=ROOT_DIR / ".env")
app = FastAPI()
logger = logging.getLogger("tts")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.mount("/static", StaticFiles(directory=str(CLIENT_DIR / "static")), name="static")
templates = Jinja2Templates(directory=str(CLIENT_DIR / "templates"))
class TtsCreateRequest(BaseModel):
text: str
voice: str | None = None
class TtsDeleteRequest(BaseModel):
ids: List[int]
def format_display_time(dt):
# 한국 표기 형식으로 변환
local_dt = dt.astimezone()
return local_dt.strftime("%Y년 %m월 %d%H:%M:%S")
def ensure_resources_dir():
# mp3 저장 디렉토리 보장
RESOURCES_DIR.mkdir(parents=True, exist_ok=True)
def format_size(bytes_size: int) -> str:
if bytes_size < 1024:
return f"{bytes_size}B"
if bytes_size < 1024 * 1024:
return f"{bytes_size / 1024:.1f}KB"
return f"{bytes_size / (1024 * 1024):.1f}MB"
def get_file_size_display(size_bytes: int | None) -> str | None:
if size_bytes is None:
return None
return format_size(size_bytes)
def get_file_size_bytes(filename: str | None) -> int | None:
if not filename:
return None
file_path = RESOURCES_DIR / filename
if not file_path.exists():
return None
return file_path.stat().st_size
@app.on_event("startup")
def on_startup():
ensure_resources_dir()
init_db()
@app.get("/")
def index(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
@app.get("/api/tts")
def api_list_tts():
rows = list_items()
payload = []
for row in rows:
size_bytes = row.get("size_bytes")
if size_bytes is None and row.get("filename"):
computed = get_file_size_bytes(row["filename"])
if computed is not None:
update_size_bytes(row["id"], computed)
size_bytes = computed
payload.append(
{
"id": row["id"],
"created_at": row["created_at"].isoformat(),
"display_time": format_display_time(row["created_at"]),
"filename": row["filename"],
"size_display": get_file_size_display(size_bytes),
}
)
return payload
@app.post("/api/tts")
def api_create_tts(payload: TtsCreateRequest):
text = (payload.text or "").strip()
voice = (payload.voice or "").strip().lower()
if len(text) < 11:
raise HTTPException(status_code=400, detail="텍스트는 11글자 이상이어야 합니다.")
created = create_item(text)
tts_id = created["id"]
created_at = created["created_at"]
timestamp = created_at.astimezone().strftime("%Y%m%d_%H%M%S")
filename = f"tts_{tts_id}_{timestamp}.mp3"
mp3_path = RESOURCES_DIR / filename
try:
text_to_mp3(text=text, mp3_path=str(mp3_path), voice=voice)
except Exception as exc:
logger.exception("TTS 생성 실패")
delete_item_by_id(tts_id)
raise HTTPException(status_code=500, detail=str(exc)) from exc
size_bytes = get_file_size_bytes(filename)
update_filename(tts_id, filename)
if size_bytes is not None:
update_size_bytes(tts_id, size_bytes)
return {
"id": tts_id,
"created_at": created_at.isoformat(),
"display_time": format_display_time(created_at),
"filename": filename,
"size_display": get_file_size_display(size_bytes),
}
@app.get("/api/tts/{tts_id}")
def api_get_tts(tts_id: int):
row = get_item(tts_id)
if not row:
raise HTTPException(status_code=404, detail="해당 항목이 없습니다.")
return {
"id": row["id"],
"text": row["text"],
"created_at": row["created_at"].isoformat(),
"display_time": format_display_time(row["created_at"]),
"filename": row["filename"],
"download_url": f"/api/tts/{row['id']}/download",
}
@app.get("/api/tts/{tts_id}/download")
def api_download_tts(tts_id: int):
row = get_item(tts_id)
if not row or not row["filename"]:
raise HTTPException(status_code=404, detail="파일이 없습니다.")
file_path = RESOURCES_DIR / row["filename"]
if not file_path.exists():
raise HTTPException(status_code=404, detail="파일이 없습니다.")
return FileResponse(
path=str(file_path),
media_type="audio/mpeg",
filename=row["filename"],
)
@app.delete("/api/tts")
def api_delete_tts(payload: TtsDeleteRequest):
ids = [int(i) for i in payload.ids if isinstance(i, int) or str(i).isdigit()]
if not ids:
raise HTTPException(status_code=400, detail="삭제할 항목이 없습니다.")
deleted_rows = delete_items(ids)
deleted_ids = []
for row in deleted_rows:
deleted_ids.append(row["id"])
if row.get("filename"):
file_path = RESOURCES_DIR / row["filename"]
if file_path.exists():
try:
file_path.unlink()
except OSError:
pass
return {"deleted": deleted_ids}

19
server/run.sh Executable file
View File

@@ -0,0 +1,19 @@
#!/usr/bin/env bash
set -euo pipefail
cd /home/dsyoon/workspace/tts
CONDA_BASE="/home/dsyoon/workspace/miniconda3"
source "${CONDA_BASE}/bin/activate" tts
export LD_LIBRARY_PATH="${CONDA_PREFIX}/lib:${LD_LIBRARY_PATH:-}"
PORT="${PORT:-8019}"
if lsof -ti tcp:"${PORT}" >/dev/null 2>&1; then
echo "Stopping existing server on port ${PORT}..."
lsof -ti tcp:"${PORT}" | xargs -r kill -9
sleep 1
fi
PORT="${PORT}" nohup python -m uvicorn server.main:app --host 0.0.0.0 --port "${PORT}" > server.log 2>&1 &
echo "Server started (PID: $!). Logs: server.log"

285
server/tts_service.py Normal file
View File

@@ -0,0 +1,285 @@
import os
import re
import subprocess
import tempfile
from pathlib import Path
from typing import Optional, Tuple
import pyttsx3
_MMS_CACHE: Optional[Tuple[object, object]] = None
_LETTER_KO = {
"A": "에이",
"B": "",
"C": "",
"D": "",
"E": "",
"F": "에프",
"G": "",
"H": "에이치",
"I": "아이",
"J": "제이",
"K": "케이",
"L": "",
"M": "",
"N": "",
"O": "",
"P": "",
"Q": "",
"R": "",
"S": "에스",
"T": "",
"U": "",
"V": "브이",
"W": "더블유",
"X": "엑스",
"Y": "와이",
"Z": "",
}
_PHRASE_MAP = [
("Automatic Document Feeder", "오토매틱 도큐먼트 피더"),
("Naver Blog", "네이버 블로그"),
("Brother Korea", "브라더 코리아"),
]
_NUM_KO = {
0: "",
1: "",
2: "",
3: "",
4: "",
5: "",
6: "",
7: "",
8: "",
9: "",
}
def _get_mms():
global _MMS_CACHE
if _MMS_CACHE is not None:
return _MMS_CACHE
try:
from transformers import VitsModel, AutoTokenizer
import torch
except Exception as exc:
raise RuntimeError("MMS TTS 사용을 위해 transformers/torch 설치가 필요합니다.") from exc
model_name = os.getenv("MMS_MODEL", "facebook/mms-tts-kor")
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = VitsModel.from_pretrained(model_name)
model.eval()
_MMS_CACHE = (model, tokenizer)
return _MMS_CACHE
def _text_to_wav_mms(text: str, wav_path: str) -> None:
try:
import torch
except Exception as exc:
raise RuntimeError("MMS TTS 사용을 위해 torch/numpy가 정상 설치되어야 합니다.") from exc
try:
import soundfile as sf
except Exception as exc:
raise RuntimeError("MMS TTS 사용을 위해 soundfile 설치가 필요합니다.") from exc
model, tokenizer = _get_mms()
text = text.strip()
if not text:
raise RuntimeError("MMS 입력 텍스트가 비어 있습니다.")
# 한국어 입력은 uroman 전처리가 필요할 수 있음
try:
from uroman import uroman
text = uroman(text)
except Exception:
pass
inputs = tokenizer(text, return_tensors="pt")
if inputs["input_ids"].shape[1] == 0:
raise RuntimeError("MMS 토크나이저 입력이 비어 있습니다.")
with torch.no_grad():
audio = model(**inputs).waveform.squeeze().cpu().numpy()
sample_rate = getattr(model.config, "sampling_rate", 22050)
# MMS 출력은 float이므로 PCM16으로 저장해 왜곡을 줄입니다.
sf.write(wav_path, audio, sample_rate, subtype="PCM_16")
def _select_korean_voice(engine: pyttsx3.Engine, prefer_female: bool = False) -> None:
try:
voices = engine.getProperty("voices") or []
except Exception:
return
def _voice_info(v):
values = []
if getattr(v, "languages", None):
values.extend(v.languages)
if getattr(v, "id", None):
values.append(v.id)
if getattr(v, "name", None):
values.append(v.name)
return " ".join(str(x) for x in values).lower()
def _is_korean(info: str) -> bool:
return "ko" in info or "korean" in info
def _is_female(info: str) -> bool:
return any(token in info for token in ["female", "woman", "girl", "여성", "여자"])
if prefer_female:
for voice in voices:
info = _voice_info(voice)
if _is_korean(info) and _is_female(info):
try:
engine.setProperty("voice", voice.id)
return
except Exception:
continue
for voice in voices:
info = _voice_info(voice)
if _is_korean(info):
try:
engine.setProperty("voice", voice.id)
return
except Exception:
continue
def _spell_abbrev(match: re.Match) -> str:
return " ".join(_LETTER_KO.get(ch, ch) for ch in match.group(0))
def _sino_korean(num: int) -> str:
if num == 0:
return _NUM_KO[0]
parts = []
if num >= 1000:
thousands = num // 1000
if thousands > 1:
parts.append(_NUM_KO[thousands])
parts.append("")
num %= 1000
if num >= 100:
hundreds = num // 100
if hundreds > 1:
parts.append(_NUM_KO[hundreds])
parts.append("")
num %= 100
if num >= 10:
tens = num // 10
if tens > 1:
parts.append(_NUM_KO[tens])
parts.append("")
num %= 10
if num > 0:
parts.append(_NUM_KO[num])
return "".join(parts)
def _replace_numbers(text: str) -> str:
def _year(match: re.Match) -> str:
return f"{_sino_korean(int(match.group(1)))}"
def _month_day(match: re.Match) -> str:
month = _sino_korean(int(match.group(1)))
day = _sino_korean(int(match.group(2)))
return f"{month}{day}"
def _approx(match: re.Match) -> str:
return f"{_sino_korean(int(match.group(1)))}"
def _count(match: re.Match) -> str:
return f"{_sino_korean(int(match.group(1)))}"
text = re.sub(r"\b(\d{4})\s*년\b", _year, text)
text = re.sub(r"\b(\d{1,2})\s*월\s*(\d{1,2})\s*일\b", _month_day, text)
text = re.sub(r"\b(\d+)\s*여\b", _approx, text)
text = re.sub(r"\b(\d+)\s*명\b", _count, text)
return text
def _preprocess_text(text: str) -> str:
# 영어 약어/브랜드 발음 보정
for src, dst in _PHRASE_MAP:
text = re.sub(rf"\b{re.escape(src)}\b", dst, text, flags=re.IGNORECASE)
text = _replace_numbers(text)
text = re.sub(r"\b[A-Z]{2,6}\b", _spell_abbrev, text)
# 괄호/구두점으로 인한 끊김을 완화
text = text.replace("(", " ").replace(")", " ")
return text
def text_to_mp3(text: str, mp3_path: str, voice: Optional[str] = None) -> None:
if not text:
raise RuntimeError("텍스트가 비어 있습니다.")
text = _preprocess_text(text)
mp3_target = Path(mp3_path)
mp3_target.parent.mkdir(parents=True, exist_ok=True)
tts_engine = os.getenv("TTS_ENGINE", "pyttsx3").strip().lower()
voice = (voice or "").strip().lower() or None
wav_fd, wav_path = tempfile.mkstemp(suffix=".wav")
os.close(wav_fd)
try:
if tts_engine == "mms":
_text_to_wav_mms(text, wav_path)
audio_filter = "highpass=f=80,lowpass=f=12000"
else:
engine = pyttsx3.init()
# 음질 개선: 속도/볼륨 조정 및 한국어 음성 우선 선택
try:
# 서버 음성이 늘어지는 현상 완화
engine.setProperty("rate", 210)
engine.setProperty("volume", 1.0)
except Exception:
pass
_select_korean_voice(engine, prefer_female=voice == "female")
# pyttsx3로 wav 생성 후 ffmpeg로 mp3 변환
engine.save_to_file(text, wav_path)
engine.runAndWait()
audio_filter = "loudnorm=I=-16:LRA=11:TP=-1.5,atempo=1.15"
subprocess.run(
[
"ffmpeg",
"-y",
"-i",
wav_path,
"-ac",
"2",
"-ar",
"44100",
"-b:a",
"192k",
"-af",
audio_filter,
str(mp3_target),
],
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
if not mp3_target.exists():
raise RuntimeError("mp3 파일 생성에 실패했습니다.")
except subprocess.CalledProcessError as exc:
raise RuntimeError("ffmpeg 변환에 실패했습니다.") from exc
except OSError as exc:
raise RuntimeError("파일 생성 권한 또는 경로 오류입니다.") from exc
finally:
try:
os.remove(wav_path)
except OSError:
pass