refactor: DeepCoin 1·2단계 파이프라인으로 구조 재편

레거시 분석·매칭·운영 코드를 정리하고 src/deepcoin 기반으로 재구성한다.
1단계 GT는 2년 스윙·눌림목·돌파·다이버전스 타점을 차트에 표시하고,
2단계는 8개 매매 기법과 GT 정합 평가 스크립트를 추가한다.
.env와 GT JSON 산출물은 추적에서 제외한다.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dsyoon
2026-06-08 23:51:26 +09:00
parent 51f70076fb
commit df3c9aecb9
154 changed files with 4629 additions and 215122 deletions

3
src/deepcoin/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""DeepCoin — 빗썸 암호화폐 데이터 수집·분석."""
__version__ = "0.1.0"

View File

@@ -0,0 +1 @@
"""외부 API 클라이언트."""

134
src/deepcoin/api/bithumb.py Normal file
View File

@@ -0,0 +1,134 @@
"""빗썸 Public REST API — 캔들 조회."""
from __future__ import annotations
import logging
import time
from datetime import datetime
from typing import Any
import requests
from deepcoin.data.intervals import INTERVAL_DAILY, INTERVAL_MONTHLY, INTERVAL_WEEKLY
logger = logging.getLogger(__name__)
# 인터벌(분) → 빗썸 candles API 경로 세그먼트
_CALENDAR_PATHS: dict[int, str] = {
INTERVAL_DAILY: "days",
INTERVAL_WEEKLY: "weeks",
INTERVAL_MONTHLY: "months",
}
KST_FMT = "%Y-%m-%d %H:%M:%S"
KST_FMT_T = "%Y-%m-%dT%H:%M:%S"
def parse_kst_datetime(value: str) -> datetime:
"""KST 캔들 시각 문자열을 datetime으로 변환한다.
Args:
value: `yyyy-MM-dd HH:mm:ss` 또는 ISO 형식.
Returns:
naive datetime (KST 기준).
"""
normalized = value.strip().replace("T", " ")
return datetime.strptime(normalized, KST_FMT)
def format_kst_datetime(dt: datetime) -> str:
"""datetime을 빗썸 `to` 파라미터 형식으로 포맷한다.
Args:
dt: KST 기준 시각.
Returns:
`yyyy-MM-dd HH:mm:ss` 문자열.
"""
return dt.strftime(KST_FMT)
class BithumbCandleClient:
"""빗썸 캔들 API 클라이언트."""
def __init__(
self,
base_url: str = "https://api.bithumb.com",
count: int = 200,
sleep_sec: float = 0.35,
retries: int = 3,
) -> None:
"""클라이언트를 초기화한다.
Args:
base_url: API 베이스 URL.
count: 요청당 캔들 개수 (최대 200).
sleep_sec: 연속 요청 간 대기(초).
retries: 실패 시 재시도 횟수.
"""
self.base_url = base_url.rstrip("/")
self.count = min(max(count, 1), 200)
self.sleep_sec = sleep_sec
self.retries = retries
self._session = requests.Session()
self._session.headers.update({"accept": "application/json"})
def fetch_candles(
self,
market: str,
interval_min: int,
to_kst: datetime | None = None,
) -> list[dict[str, Any]]:
"""캔들 배치를 조회한다 (최신순).
Args:
market: 거래 페어 (예: KRW-WLD).
interval_min: 분 단위. 1440이면 일봉 API 사용.
to_kst: 조회 기준 시각(KST). 해당 시각 캔들은 제외.
Returns:
캔들 dict 리스트. API 오류 시 빈 리스트.
Raises:
requests.RequestException: 재시도 후에도 네트워크 실패.
"""
if interval_min in _CALENDAR_PATHS:
segment = _CALENDAR_PATHS[interval_min]
url = f"{self.base_url}/v1/candles/{segment}"
params: dict[str, Any] = {"market": market, "count": self.count}
else:
url = f"{self.base_url}/v1/candles/minutes/{interval_min}"
params = {"market": market, "count": self.count}
if to_kst is not None:
params["to"] = format_kst_datetime(to_kst)
last_error: Exception | None = None
for attempt in range(1, self.retries + 1):
try:
response = self._session.get(url, params=params, timeout=30)
if response.status_code == 429:
wait = self.sleep_sec * attempt * 3
logger.warning("Rate limit 429 — %ss 대기 후 재시도", wait)
time.sleep(wait)
continue
response.raise_for_status()
payload = response.json()
if isinstance(payload, dict) and "error" in payload:
logger.error("API error: %s", payload["error"])
return []
if not isinstance(payload, list):
logger.error("Unexpected response type: %s", type(payload))
return []
time.sleep(self.sleep_sec)
return payload
except requests.RequestException as exc:
last_error = exc
wait = self.sleep_sec * attempt * 2
logger.warning("Request failed (%s/%s): %s", attempt, self.retries, exc)
time.sleep(wait)
if last_error is not None:
raise last_error
return []

148
src/deepcoin/config.py Normal file
View File

@@ -0,0 +1,148 @@
"""환경 변수 로드 및 애플리케이션 설정."""
from __future__ import annotations
import os
from dataclasses import dataclass
from pathlib import Path
from dotenv import load_dotenv
from deepcoin.data.intervals import DEFAULT_DOWNLOAD_INTERVALS
_PROJECT_ROOT = Path(__file__).resolve().parents[2]
def _parse_int_list(raw: str) -> list[int]:
"""쉼표 구분 정수 목록을 파싱한다.
Args:
raw: 예) "3,5,10,15"
Returns:
정수 리스트. 빈 입력이면 빈 리스트.
"""
if not raw or not raw.strip():
return []
return [int(part.strip()) for part in raw.split(",") if part.strip()]
@dataclass(frozen=True)
class Settings:
"""DeepCoin 실행 설정."""
symbol: str
coin_name: str
api_url: str
candle_count: int
download_intervals: list[int]
download_days: int
db_path: Path
request_sleep_sec: float
request_retries: int
# Ground Truth (1단계)
gt_interval_min: int
gt_lookback_days: int
gt_zigzag_reversal_pct: float
gt_min_leg_pct: float
gt_pullback_min_pct: float
gt_pullback_local_order: int
gt_breakout_buffer_pct: float
gt_breakout_consolidation_bars: int
gt_breakout_min_rally_pct: float
gt_div_local_order: int
gt_div_min_bars_between: int
gt_div_min_rsi_diff: float
gt_div_min_future_move_pct: float
ground_truth_file: Path
ground_truth_chart_file: Path
gt_initial_cash_krw: float
gt_trading_fee_rate: float
# Techniques (2단계)
techniques_dir: Path
analysis_report_json: Path
analysis_report_html: Path
gt_align_tolerance_bars: int
@property
def market(self) -> str:
"""빗썸 마켓 코드 (예: KRW-BTC)."""
return f"KRW-{self.symbol}"
def load_settings(env_path: Path | None = None) -> Settings:
"""`.env`를 로드하고 Settings를 반환한다.
Args:
env_path: `.env` 경로. None이면 프로젝트 루트.
Returns:
Settings 인스턴스.
"""
path = env_path or (_PROJECT_ROOT / ".env")
load_dotenv(path, override=False)
default_intervals = ",".join(str(i) for i in DEFAULT_DOWNLOAD_INTERVALS)
intervals_raw = os.getenv("DOWNLOAD_INTERVALS", default_intervals)
intervals = sorted(set(_parse_int_list(intervals_raw)))
db_raw = os.getenv("DB_PATH", "coins.db")
db_path = Path(db_raw)
if not db_path.is_absolute():
db_path = _PROJECT_ROOT / db_path
gt_file_raw = os.getenv("GROUND_TRUTH_FILE", "data/ground_truth/ground_truth_trades.json")
gt_chart_raw = os.getenv("GROUND_TRUTH_CHART_FILE", "docs/02_ground_truth/ground_truth_chart.html")
gt_file = Path(gt_file_raw)
gt_chart = Path(gt_chart_raw)
if not gt_file.is_absolute():
gt_file = _PROJECT_ROOT / gt_file
if not gt_chart.is_absolute():
gt_chart = _PROJECT_ROOT / gt_chart
tech_dir_raw = os.getenv("TECHNIQUES_DIR", "data/techniques")
tech_dir = Path(tech_dir_raw)
if not tech_dir.is_absolute():
tech_dir = _PROJECT_ROOT / tech_dir
analysis_json_raw = os.getenv("ANALYSIS_REPORT_JSON", "docs/03_analysis/comparison_report.json")
analysis_html_raw = os.getenv("ANALYSIS_REPORT_HTML", "docs/03_analysis/comparison_report.html")
analysis_json = Path(analysis_json_raw)
analysis_html = Path(analysis_html_raw)
if not analysis_json.is_absolute():
analysis_json = _PROJECT_ROOT / analysis_json
if not analysis_html.is_absolute():
analysis_html = _PROJECT_ROOT / analysis_html
return Settings(
symbol=os.getenv("SYMBOL", "BTC").upper(),
coin_name=os.getenv("COIN_NAME", "비트코인"),
api_url=os.getenv("BITHUMB_API_URL", "https://api.bithumb.com").rstrip("/"),
candle_count=int(os.getenv("BITHUMB_API_CANDLE_COUNT", "200")),
download_intervals=intervals,
download_days=int(os.getenv("DOWNLOAD_DAYS", "730")),
db_path=db_path,
request_sleep_sec=float(os.getenv("API_REQUEST_SLEEP_SEC", "0.35")),
request_retries=int(os.getenv("API_REQUEST_RETRIES", "3")),
gt_interval_min=int(os.getenv("GT_INTERVAL_MIN", "240")),
gt_lookback_days=int(os.getenv("GT_LOOKBACK_DAYS", "730")),
gt_zigzag_reversal_pct=float(os.getenv("GT_ZIGZAG_REVERSAL_PCT", "5.0")),
gt_min_leg_pct=float(os.getenv("GT_MIN_LEG_PCT", "3.0")),
gt_pullback_min_pct=float(os.getenv("GT_PULLBACK_MIN_PCT", "1.5")),
gt_pullback_local_order=int(os.getenv("GT_PULLBACK_LOCAL_ORDER", "10")),
gt_breakout_buffer_pct=float(os.getenv("GT_BREAKOUT_BUFFER_PCT", "0.1")),
gt_breakout_consolidation_bars=int(os.getenv("GT_BREAKOUT_CONSOLIDATION_BARS", "200")),
gt_breakout_min_rally_pct=float(os.getenv("GT_BREAKOUT_MIN_RALLY_PCT", "2.0")),
gt_div_local_order=int(os.getenv("GT_DIV_LOCAL_ORDER", "20")),
gt_div_min_bars_between=int(os.getenv("GT_DIV_MIN_BARS_BETWEEN", "1500")),
gt_div_min_rsi_diff=float(os.getenv("GT_DIV_MIN_RSI_DIFF", "5.0")),
gt_div_min_future_move_pct=float(os.getenv("GT_DIV_MIN_FUTURE_MOVE_PCT", "4.0")),
ground_truth_file=gt_file,
ground_truth_chart_file=gt_chart,
gt_initial_cash_krw=float(os.getenv("GT_INITIAL_CASH_KRW", "400000")),
gt_trading_fee_rate=float(os.getenv("GT_TRADING_FEE_RATE", "0.0005")),
techniques_dir=tech_dir,
analysis_report_json=analysis_json,
analysis_report_html=analysis_html,
gt_align_tolerance_bars=int(os.getenv("GT_ALIGN_TOLERANCE_BARS", "480")),
)

View File

@@ -0,0 +1 @@
"""데이터 저장·수집."""

View File

@@ -0,0 +1,70 @@
"""SQLite 캔들 로더."""
from __future__ import annotations
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path
import pandas as pd
from deepcoin.data.candle_store import table_name
def load_candles(
db_path: Path,
symbol: str,
interval_min: int,
lookback_days: int | None = None,
end_dt: datetime | None = None,
) -> pd.DataFrame:
"""DB에서 캔들 DataFrame을 로드한다.
Args:
db_path: SQLite 경로.
symbol: 코인 심볼.
interval_min: 분 단위 인터벌.
lookback_days: 최근 N일만 로드. None이면 전체.
end_dt: 종료 시각(KST). None이면 DB 최신.
Returns:
컬럼: datetime, open, high, low, close, volume (시간 오름차순).
"""
table = table_name(symbol, interval_min)
conn = sqlite3.connect(db_path)
try:
query = f"""
SELECT ymdhms, Open, High, Low, Close, Volume
FROM {table}
WHERE CODE = ?
ORDER BY ymdhms ASC
"""
df = pd.read_sql_query(query, conn, params=(symbol.upper(),))
except sqlite3.OperationalError as exc:
raise ValueError(f"테이블 {table} 없음 — 먼저 download_candles.py 실행") from exc
finally:
conn.close()
if df.empty:
raise ValueError(f"{table} 데이터가 비어 있습니다.")
df = df.rename(
columns={
"ymdhms": "datetime",
"Open": "open",
"High": "high",
"Low": "low",
"Close": "close",
"Volume": "volume",
}
)
df["datetime"] = pd.to_datetime(df["datetime"])
if end_dt is not None:
df = df[df["datetime"] <= end_dt]
if lookback_days is not None and not df.empty:
cutoff = df["datetime"].max() - timedelta(days=lookback_days)
df = df[df["datetime"] >= cutoff]
return df.reset_index(drop=True)

View File

@@ -0,0 +1,216 @@
"""SQLite 캔들 저장소."""
from __future__ import annotations
import logging
import sqlite3
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any
from deepcoin.api.bithumb import parse_kst_datetime
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class CandleRow:
"""DB에 저장할 단일 캔들 행."""
code: str
name: str
ymdhms: str
ymd: str
hms: str
close: float
open: float
high: float
low: float
volume: float
def table_name(symbol: str, interval_min: int) -> str:
"""심볼·인터벌에 대응하는 테이블명을 반환한다.
Args:
symbol: 코인 심볼 (예: WLD).
interval_min: 분 단위 (1440=일봉).
Returns:
테이블명 (예: WLD_60).
"""
return f"{symbol.upper()}_{interval_min}"
def candle_to_row(
candle: dict[str, Any],
symbol: str,
coin_name: str,
) -> CandleRow:
"""빗썸 API 캔들 dict를 CandleRow로 변환한다.
Args:
candle: API 응답 캔들 객체.
symbol: 코인 심볼.
coin_name: 코인 한글명.
Returns:
CandleRow.
"""
kst_raw = candle["candle_date_time_kst"]
dt = parse_kst_datetime(kst_raw)
ymdhms = dt.strftime("%Y-%m-%d %H:%M:%S")
return CandleRow(
code=symbol.upper(),
name=coin_name,
ymdhms=ymdhms,
ymd=dt.strftime("%Y-%m-%d"),
hms=dt.strftime("%H:%M:%S"),
close=float(candle["trade_price"]),
open=float(candle["opening_price"]),
high=float(candle["high_price"]),
low=float(candle["low_price"]),
volume=float(candle["candle_acc_trade_volume"]),
)
class CandleStore:
"""SQLite 기반 캔들 저장소."""
def __init__(self, db_path: Path) -> None:
"""저장소를 연다.
Args:
db_path: SQLite DB 파일 경로.
"""
self.db_path = db_path
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._conn = sqlite3.connect(self.db_path)
self._conn.execute("PRAGMA journal_mode=WAL")
def close(self) -> None:
"""DB 연결을 닫는다."""
self._conn.close()
def ensure_table(self, symbol: str, interval_min: int) -> str:
"""테이블과 인덱스를 보장한다.
Args:
symbol: 코인 심볼.
interval_min: 분 단위.
Returns:
테이블명.
"""
name = table_name(symbol, interval_min)
self._conn.execute(
f"""
CREATE TABLE IF NOT EXISTS {name} (
CODE text,
NAME text,
ymdhms datetime,
ymd text,
hms text,
Close REAL,
Open REAL,
High REAL,
Low REAL,
Volume REAL
)
"""
)
self._conn.execute(
f"CREATE INDEX IF NOT EXISTS {name}_idx ON {name}(CODE, ymdhms)"
)
self._ensure_unique_index(name)
self._conn.commit()
return name
def _ensure_unique_index(self, table: str) -> None:
"""(CODE, ymdhms) 유니크 인덱스를 보장한다. 중복이 있으면 제거 후 생성.
Args:
table: 테이블명.
"""
index_name = f"{table}_uk"
existing = self._conn.execute(
"SELECT name FROM sqlite_master WHERE type='index' AND name=?",
(index_name,),
).fetchone()
if existing:
return
self._conn.execute(
f"""
DELETE FROM {table}
WHERE rowid NOT IN (
SELECT MAX(rowid) FROM {table} GROUP BY CODE, ymdhms
)
"""
)
self._conn.execute(
f"CREATE UNIQUE INDEX IF NOT EXISTS {index_name} ON {table}(CODE, ymdhms)"
)
def upsert_rows(self, table: str, rows: list[CandleRow]) -> int:
"""캔들 행을 upsert한다.
Args:
table: 대상 테이블명.
rows: 저장할 행 목록.
Returns:
저장(갱신)된 행 수.
"""
if not rows:
return 0
sql = f"""
INSERT OR REPLACE INTO {table}
(CODE, NAME, ymdhms, ymd, hms, Close, Open, High, Low, Volume)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
payload = [
(
row.code,
row.name,
row.ymdhms,
row.ymd,
row.hms,
row.close,
row.open,
row.high,
row.low,
row.volume,
)
for row in rows
]
with self._conn:
self._conn.executemany(sql, payload)
return len(rows)
def get_range(self, symbol: str, interval_min: int) -> tuple[int, datetime | None, datetime | None]:
"""저장된 캔들 범위를 조회한다.
Args:
symbol: 코인 심볼.
interval_min: 분 단위.
Returns:
(행 수, 최소 시각, 최대 시각). 데이터 없으면 (0, None, None).
"""
table = table_name(symbol, interval_min)
try:
cursor = self._conn.execute(
f"SELECT COUNT(*), MIN(ymdhms), MAX(ymdhms) FROM {table}"
)
count, min_raw, max_raw = cursor.fetchone()
except sqlite3.OperationalError:
return 0, None, None
if not count:
return 0, None, None
min_dt = datetime.strptime(min_raw, "%Y-%m-%d %H:%M:%S") if min_raw else None
max_dt = datetime.strptime(max_raw, "%Y-%m-%d %H:%M:%S") if max_raw else None
return int(count), min_dt, max_dt

View File

@@ -0,0 +1,161 @@
"""빗썸 캔들 역방향 페이지네이션 다운로더."""
from __future__ import annotations
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any
from deepcoin.api.bithumb import BithumbCandleClient, parse_kst_datetime
from deepcoin.config import Settings
from deepcoin.data.candle_store import CandleStore, candle_to_row
from deepcoin.data.intervals import interval_label
logger = logging.getLogger(__name__)
@dataclass
class DownloadResult:
"""단일 인터벌 다운로드 결과."""
interval_min: int
requests: int
saved_rows: int
oldest_kst: datetime | None
newest_kst: datetime | None
reached_target: bool
class CandleDownloader:
"""지정 기간 캔들을 수집해 SQLite에 저장한다."""
def __init__(self, settings: Settings, client: BithumbCandleClient | None = None) -> None:
"""다운로더를 초기화한다.
Args:
settings: 애플리케이션 설정.
client: API 클라이언트. None이면 기본 생성.
"""
self.settings = settings
self.client = client or BithumbCandleClient(
base_url=settings.api_url,
count=settings.candle_count,
sleep_sec=settings.request_sleep_sec,
retries=settings.request_retries,
)
def download_interval(
self,
store: CandleStore,
interval_min: int,
days: int | None = None,
) -> DownloadResult:
"""한 인터벌의 캔들을 역방향으로 수집한다.
Args:
store: SQLite 저장소.
interval_min: 분 단위 (1440=일봉).
days: 수집 일수. None이면 settings.download_days.
Returns:
DownloadResult.
"""
lookback_days = days if days is not None else self.settings.download_days
target_start = datetime.now() - timedelta(days=lookback_days)
table = store.ensure_table(self.settings.symbol, interval_min)
to_kst: datetime | None = None
requests = 0
saved_rows = 0
oldest_kst: datetime | None = None
newest_kst: datetime | None = None
reached_target = False
seen_oldest: set[str] = set()
logger.info(
"수집 시작: %s %s, 목표=%s 이후",
self.settings.market,
interval_label(interval_min),
target_start.strftime("%Y-%m-%d %H:%M:%S"),
)
while True:
batch = self.client.fetch_candles(
market=self.settings.market,
interval_min=interval_min,
to_kst=to_kst,
)
requests += 1
if not batch:
logger.info("%s: 더 이상 데이터 없음", interval_label(interval_min))
break
rows = [
candle_to_row(c, self.settings.symbol, self.settings.coin_name)
for c in batch
]
saved_rows += store.upsert_rows(table, rows)
batch_oldest = parse_kst_datetime(batch[-1]["candle_date_time_kst"])
batch_newest = parse_kst_datetime(batch[0]["candle_date_time_kst"])
if newest_kst is None or batch_newest > newest_kst:
newest_kst = batch_newest
if oldest_kst is None or batch_oldest < oldest_kst:
oldest_kst = batch_oldest
if batch_oldest <= target_start:
reached_target = True
logger.info(
"%s: 목표 기간 도달 (oldest=%s)",
interval_label(interval_min),
batch_oldest.strftime("%Y-%m-%d %H:%M:%S"),
)
break
oldest_key = batch[-1]["candle_date_time_kst"]
if oldest_key in seen_oldest:
logger.warning("%s: 페이지네이션 정체 — 중단", interval_label(interval_min))
break
seen_oldest.add(oldest_key)
to_kst = batch_oldest
if requests % 20 == 0:
logger.info(
"%s: 진행 중 requests=%s saved=%s oldest=%s",
interval_label(interval_min),
requests,
saved_rows,
batch_oldest.strftime("%Y-%m-%d %H:%M:%S"),
)
return DownloadResult(
interval_min=interval_min,
requests=requests,
saved_rows=saved_rows,
oldest_kst=oldest_kst,
newest_kst=newest_kst,
reached_target=reached_target,
)
def download_all(self, store: CandleStore, days: int | None = None) -> list[DownloadResult]:
"""설정된 모든 인터벌을 수집한다.
Args:
store: SQLite 저장소.
days: 수집 일수.
Returns:
인터벌별 DownloadResult 리스트.
"""
results: list[DownloadResult] = []
for interval in self.settings.download_intervals:
if interval == 1:
logger.warning("1분봉은 장기 수집 시 요청량이 매우 큽니다 — 건너뜁니다.")
continue
result = self.download_interval(store, interval, days=days)
results.append(result)
return results

View File

@@ -0,0 +1,47 @@
"""캔들 인터벌 상수 및 라벨."""
from __future__ import annotations
# 분 단위 인터벌 코드 (DB 테이블명: {SYMBOL}_{코드})
INTERVAL_DAILY = 1440
INTERVAL_WEEKLY = 10080 # 7 * 24 * 60
INTERVAL_MONTHLY = 43200 # 30 * 24 * 60 (월봉 식별용 관례값)
MINUTE_INTERVALS = frozenset({1, 3, 5, 10, 15, 30, 60, 240})
CALENDAR_INTERVALS = frozenset({INTERVAL_DAILY, INTERVAL_WEEKLY, INTERVAL_MONTHLY})
DEFAULT_DOWNLOAD_INTERVALS = [
3, 5, 10, 15, 30, 60, 240,
INTERVAL_DAILY, INTERVAL_WEEKLY, INTERVAL_MONTHLY,
]
INTERVAL_LABELS: dict[int, str] = {
1: "1분",
3: "3분",
5: "5분",
10: "10분",
15: "15분",
30: "30분",
60: "60분",
240: "240분",
INTERVAL_DAILY: "일봉",
INTERVAL_WEEKLY: "주봉",
INTERVAL_MONTHLY: "월봉",
}
def interval_label(interval_min: int) -> str:
"""인터벌 표시명을 반환한다.
Args:
interval_min: 분 단위 인터벌 코드.
Returns:
한글 라벨 (예: 일봉, 60분).
"""
return INTERVAL_LABELS.get(interval_min, f"{interval_min}")
def is_calendar_interval(interval_min: int) -> bool:
"""일/주/월봉 여부."""
return interval_min in CALENDAR_INTERVALS

View File

@@ -0,0 +1,11 @@
"""Ground Truth 정합 평가."""
from deepcoin.evaluation.gt_align import align_with_ground_truth
from deepcoin.evaluation.report import build_comparison_report, render_comparison_html, save_comparison_report
__all__ = [
"align_with_ground_truth",
"build_comparison_report",
"render_comparison_html",
"save_comparison_report",
]

View File

@@ -0,0 +1,223 @@
"""Ground Truth와 기법 신호·레그 정합 평가."""
from __future__ import annotations
from typing import Any
def _bar_distance(a: int, b: int) -> int:
"""두 봉 인덱스 간 절대 거리."""
return abs(a - b)
def _match_signal(
gt_bar: int,
candidates: list[dict[str, Any]],
tolerance_bars: int,
used: set[int],
) -> dict[str, Any] | None:
"""GT 신호에 가장 가까운 기법 신호를 찾는다."""
best: dict[str, Any] | None = None
best_dist = tolerance_bars + 1
best_idx = -1
for idx, candidate in enumerate(candidates):
if idx in used:
continue
pivot_bar = candidate.get("pivot_bar_index")
compare_bar = pivot_bar if pivot_bar is not None else candidate["bar_index"]
dist = _bar_distance(gt_bar, compare_bar)
if dist <= tolerance_bars and dist < best_dist:
best = candidate
best_dist = dist
best_idx = idx
if best is None:
return None
return {
"matched": True,
"gt_bar_index": gt_bar,
"tech_bar_index": best.get("pivot_bar_index") or best["bar_index"],
"signal_bar_index": best["bar_index"],
"bar_offset": best_dist,
"tech_price": best["price"],
"tech_datetime": best["datetime"],
"candidate_index": best_idx,
}
def align_signals(
gt_signals: list[dict[str, Any]],
tech_signals: list[dict[str, Any]],
tolerance_bars: int,
side: str,
) -> dict[str, Any]:
"""GT 신호와 기법 신호의 정합률을 계산한다.
Args:
gt_signals: GT signals 리스트.
tech_signals: 기법 신호 dict 리스트.
tolerance_bars: 허용 봉 오차.
side: buy | sell.
Returns:
정합 메트릭 dict.
"""
gt_filtered = [s for s in gt_signals if s["side"] == side]
tech_filtered = [s for s in tech_signals if s["side"] == side]
used: set[int] = set()
hits: list[dict[str, Any]] = []
misses: list[dict[str, Any]] = []
for gt_sig in gt_filtered:
match = _match_signal(gt_sig["bar_index"], tech_filtered, tolerance_bars, used)
if match:
used.add(match["candidate_index"])
hits.append({**match, "gt_datetime": gt_sig["datetime"], "gt_price": gt_sig["price"]})
else:
misses.append(
{
"gt_bar_index": gt_sig["bar_index"],
"gt_datetime": gt_sig["datetime"],
"gt_price": gt_sig["price"],
}
)
gt_count = len(gt_filtered)
hit_count = len(hits)
recall = hit_count / gt_count if gt_count else 0.0
precision = hit_count / len(tech_filtered) if tech_filtered else 0.0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
avg_offset = sum(h["bar_offset"] for h in hits) / hit_count if hit_count else 0.0
return {
"side": side,
"gt_count": gt_count,
"tech_count": len(tech_filtered),
"hit_count": hit_count,
"miss_count": len(misses),
"recall": round(recall, 4),
"precision": round(precision, 4),
"f1": round(f1, 4),
"avg_bar_offset": round(avg_offset, 1),
"hits": hits,
"misses": misses,
}
def align_legs(
gt_legs: list[dict[str, Any]],
tech_legs: list[dict[str, Any]],
tolerance_bars: int,
) -> dict[str, Any]:
"""GT 레그와 기법 레그의 정합률을 계산한다."""
captured: list[dict[str, Any]] = []
missed: list[dict[str, Any]] = []
used_tech: set[int] = set()
for gt_leg in gt_legs:
best_leg: dict[str, Any] | None = None
best_score = tolerance_bars * 2 + 1
best_id = -1
for tech_leg in tech_legs:
tid = int(tech_leg["leg_id"])
if tid in used_tech:
continue
buy_dist = _bar_distance(gt_leg["buy_bar_index"], tech_leg["buy_bar_index"])
sell_dist = _bar_distance(gt_leg["sell_bar_index"], tech_leg["sell_bar_index"])
score = buy_dist + sell_dist
if buy_dist <= tolerance_bars and sell_dist <= tolerance_bars and score < best_score:
best_leg = tech_leg
best_score = score
best_id = tid
if best_leg:
used_tech.add(best_id)
captured.append(
{
"gt_leg_id": gt_leg["leg_id"],
"tech_leg_id": best_leg["leg_id"],
"gt_buy": gt_leg["buy_datetime"],
"tech_buy": best_leg["buy_datetime"],
"gt_sell": gt_leg["sell_datetime"],
"tech_sell": best_leg["sell_datetime"],
"buy_bar_offset": _bar_distance(gt_leg["buy_bar_index"], best_leg["buy_bar_index"]),
"sell_bar_offset": _bar_distance(gt_leg["sell_bar_index"], best_leg["sell_bar_index"]),
"gt_leg_pct": gt_leg["leg_pct"],
"tech_leg_pct": best_leg["leg_pct"],
}
)
else:
missed.append(
{
"gt_leg_id": gt_leg["leg_id"],
"buy_datetime": gt_leg["buy_datetime"],
"sell_datetime": gt_leg["sell_datetime"],
"leg_pct": gt_leg["leg_pct"],
}
)
gt_count = len(gt_legs)
recall = len(captured) / gt_count if gt_count else 0.0
return {
"gt_leg_count": gt_count,
"tech_leg_count": len(tech_legs),
"captured_count": len(captured),
"missed_count": len(missed),
"leg_recall": round(recall, 4),
"captured": captured,
"missed": missed,
}
def align_with_ground_truth(
gt_result: dict[str, Any],
technique_signals: list[dict[str, Any]],
technique_legs: list[dict[str, Any]],
tolerance_bars: int,
) -> dict[str, Any]:
"""GT 대비 기법 정합 결과 전체를 반환한다."""
gt_signals = gt_result.get("signals", [])
gt_legs = gt_result.get("legs", [])
gt_pnl = gt_result.get("pnl", {})
buy_align = align_signals(gt_signals, technique_signals, tolerance_bars, "buy")
sell_align = align_signals(gt_signals, technique_signals, tolerance_bars, "sell")
leg_align = align_legs(gt_legs, technique_legs, tolerance_bars)
tech_return = 0.0
if technique_legs:
from deepcoin.ground_truth.pnl import simulate_gt_pnl
tech_pnl = simulate_gt_pnl(
technique_legs,
initial_cash_krw=gt_pnl.get("initial_cash_krw", 400_000),
fee_rate=gt_pnl.get("fee_rate", 0.0005),
)
tech_return = tech_pnl["total_return_pct"]
gt_return = gt_pnl.get("total_return_pct", 0.0)
return_capture = tech_return / gt_return if gt_return else 0.0
return {
"tolerance_bars": tolerance_bars,
"buy": buy_align,
"sell": sell_align,
"legs": leg_align,
"gt_return_pct": gt_return,
"tech_return_pct": tech_return,
"return_capture_ratio": round(return_capture, 4),
"score": round(
(
buy_align["recall"] * 0.25
+ sell_align["recall"] * 0.25
+ leg_align["leg_recall"] * 0.35
+ min(return_capture, 1.0) * 0.15
),
4,
),
}

View File

@@ -0,0 +1,132 @@
"""2단계 기법 비교 리포트 생성."""
from __future__ import annotations
import json
from datetime import datetime
from pathlib import Path
from typing import Any
from deepcoin.techniques.base import TechniqueResult
def build_comparison_report(
results: list[TechniqueResult],
gt_result: dict[str, Any],
symbol: str,
) -> dict[str, Any]:
"""기법 비교 요약 리포트를 생성한다."""
gt_meta = gt_result.get("meta", {})
gt_pnl = gt_result.get("pnl", {})
gt_summary = gt_result.get("summary", {})
rows: list[dict[str, Any]] = []
for result in results:
align = result.alignment or {}
buy = align.get("buy", {})
sell = align.get("sell", {})
legs = align.get("legs", {})
rows.append(
{
"technique_id": result.technique_id,
"technique_name": result.technique_name,
"category": result.category,
"causal": result.causal,
"leg_count": result.summary.get("leg_count", 0),
"tech_return_pct": result.pnl.get("total_return_pct", 0.0),
"buy_recall": buy.get("recall", 0.0),
"sell_recall": sell.get("recall", 0.0),
"leg_recall": legs.get("leg_recall", 0.0),
"return_capture_ratio": align.get("return_capture_ratio", 0.0),
"score": align.get("score", 0.0),
"avg_buy_offset": buy.get("avg_bar_offset", 0.0),
"avg_sell_offset": sell.get("avg_bar_offset", 0.0),
}
)
rows.sort(key=lambda r: r["score"], reverse=True)
return {
"generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"symbol": symbol,
"gt": {
"leg_count": gt_summary.get("leg_count", 0),
"return_pct": gt_pnl.get("total_return_pct", 0.0),
"interval_label": gt_meta.get("interval_label", ""),
"lookback_days": gt_meta.get("lookback_days", 365),
},
"ranking": rows,
}
def save_comparison_report(report: dict[str, Any], json_path: Path) -> Path:
"""비교 리포트 JSON을 저장한다."""
json_path.parent.mkdir(parents=True, exist_ok=True)
with json_path.open("w", encoding="utf-8") as fp:
json.dump(report, fp, ensure_ascii=False, indent=2)
return json_path
def render_comparison_html(report: dict[str, Any], html_path: Path) -> Path:
"""비교 리포트 HTML을 생성한다."""
html_path.parent.mkdir(parents=True, exist_ok=True)
rows = report.get("ranking", [])
gt = report.get("gt", {})
table_rows = ""
for idx, row in enumerate(rows, start=1):
table_rows += f"""
<tr>
<td>{idx}</td>
<td>{row['technique_name']}</td>
<td>{row['category']}</td>
<td>{'Y' if row['causal'] else 'N'}</td>
<td>{row['leg_count']}</td>
<td>{row['tech_return_pct']:+.1f}%</td>
<td>{row['buy_recall']*100:.1f}%</td>
<td>{row['sell_recall']*100:.1f}%</td>
<td>{row['leg_recall']*100:.1f}%</td>
<td>{row['return_capture_ratio']*100:.1f}%</td>
<td><strong>{row['score']*100:.1f}</strong></td>
</tr>"""
html = f"""<!DOCTYPE html>
<html lang="ko">
<head>
<meta charset="UTF-8">
<title>DeepCoin 2단계 — GT 정합 비교</title>
<style>
body {{ font-family: "Malgun Gothic", Arial, sans-serif; margin: 24px; color: #333; background: #f5f5f5; }}
h1 {{ font-size: 20px; margin-bottom: 8px; }}
.meta {{ color: #666; margin-bottom: 20px; font-size: 14px; }}
table {{ border-collapse: collapse; width: 100%; background: #fff; }}
th, td {{ border: 1px solid #ddd; padding: 8px 10px; text-align: center; font-size: 13px; }}
th {{ background: #e8e8e8; }}
tr:nth-child(even) {{ background: #fafafa; }}
</style>
</head>
<body>
<h1>DeepCoin 2단계 — Ground Truth 정합 비교</h1>
<div class="meta">
생성: {report.get('generated_at', '')} |
{report.get('symbol', '')} |
GT: {gt.get('leg_count', 0)}레그, {gt.get('return_pct', 0):+.1f}% |
기간: 최근 {gt.get('lookback_days', 365)}
</div>
<table>
<thead>
<tr>
<th>순위</th><th>기법</th><th>유형</th><th>인과</th><th>레그</th>
<th>수익률</th><th>매수 Recall</th><th>매도 Recall</th><th>레그 Recall</th>
<th>수익 포착</th><th>종합 Score</th>
</tr>
</thead>
<tbody>{table_rows}
</tbody>
</table>
</body>
</html>"""
with html_path.open("w", encoding="utf-8") as fp:
fp.write(html)
return html_path

View File

@@ -0,0 +1 @@
"""Ground Truth — 사후 벤치마크 매수·매도 타점."""

View File

@@ -0,0 +1,162 @@
"""돌파 매수 타점 (Ground Truth 보조, 사후 최적)."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Protocol
import pandas as pd
from deepcoin.ground_truth.zigzag import Pivot
class _LegLike(Protocol):
"""레그 최소 인터페이스."""
leg_id: int
buy_bar_index: int
sell_bar_index: int
sell_price: float
@dataclass(frozen=True)
class BreakoutBuy:
"""돌파 매수 타점."""
bar_index: int
price: float
datetime: pd.Timestamp
leg_id: int
resistance_price: float
def find_breakout_buy_pivots(
df: pd.DataFrame,
legs: list[_LegLike],
pullback_buys: list[Pivot],
breakout_buffer_pct: float = 0.1,
min_bars_after_anchor: int = 10,
consolidation_bars: int = 200,
leg_end_margin_bars: int = 80,
min_rally_to_sell_pct: float = 2.0,
) -> list[BreakoutBuy]:
"""각 레그에서 횡보·눌림 후 저항 돌파 시점을 찾는다 (1단계 GT, 미래 허용).
눌림목 이후 구간 고점을 저항으로 보고, 종가가 저항을 돌파한
첫 봉을 레그당 돌파 매수 1개로 표시한다.
Args:
df: OHLCV DataFrame.
legs: ZigZag 스윙 레그.
pullback_buys: 눌림목 Pivot (레그별 매칭).
breakout_buffer_pct: 저항 대비 돌파 확인 버퍼(%).
min_bars_after_anchor: 눌림목/구간 시작 후 최소 대기 봉.
consolidation_bars: 횡보 저항 산정 구간(봉).
leg_end_margin_bars: 매도 직전 제외 봉.
min_rally_to_sell_pct: 돌파 후 레그 고점까지 최소 추가 상승(%).
Returns:
BreakoutBuy 리스트.
"""
if not legs or len(df) < 20:
return []
highs = df["high"].astype(float).values
closes = df["close"].astype(float).values
pullback_by_leg = _map_pullback_to_legs(legs, pullback_buys)
breakouts: list[BreakoutBuy] = []
buffer = breakout_buffer_pct / 100.0
for leg in legs:
anchor = pullback_by_leg.get(leg.leg_id)
if anchor is not None:
zone_start = anchor.bar_index
else:
leg_len = leg.sell_bar_index - leg.buy_bar_index
zone_start = max(
leg.buy_bar_index + 120,
leg.sell_bar_index - int(leg_len * 0.35),
)
zone_end = min(
zone_start + consolidation_bars,
leg.sell_bar_index - leg_end_margin_bars - 5,
)
search_start = zone_start + min_bars_after_anchor
search_end = leg.sell_bar_index - leg_end_margin_bars
if zone_end <= zone_start or search_end <= search_start + 5:
continue
resistance = float(highs[zone_start:zone_end].max())
if resistance <= 0:
continue
breakout = _first_breakout_above(
df=df,
highs=highs,
closes=closes,
search_start=max(search_start, zone_end),
search_end=search_end,
resistance=resistance,
buffer=buffer,
sell_price=float(leg.sell_price),
min_rally_pct=min_rally_to_sell_pct,
)
if breakout is None:
continue
bar_idx, price, resistance = breakout
breakouts.append(
BreakoutBuy(
bar_index=bar_idx,
price=round(price, 2),
datetime=pd.Timestamp(df.iloc[bar_idx]["datetime"]),
leg_id=leg.leg_id,
resistance_price=round(resistance, 2),
)
)
return breakouts
def _map_pullback_to_legs(legs: list[_LegLike], pullback_buys: list[Pivot]) -> dict[int, Pivot]:
"""눌림목을 소속 레그에 매핑한다."""
mapping: dict[int, Pivot] = {}
for pivot in pullback_buys:
for leg in legs:
if leg.buy_bar_index < pivot.bar_index < leg.sell_bar_index:
prev = mapping.get(leg.leg_id)
if prev is None or pivot.bar_index > prev.bar_index:
mapping[leg.leg_id] = pivot
return mapping
def _first_breakout_above(
df: pd.DataFrame,
highs,
closes,
search_start: int,
search_end: int,
resistance: float,
buffer: float,
sell_price: float,
min_rally_pct: float,
) -> tuple[int, float, float] | None:
"""고정 저항선 상향 돌파 첫 봉을 반환한다."""
threshold = resistance * (1 + buffer)
for i in range(search_start, search_end):
close_val = float(closes[i])
if close_val <= threshold:
continue
future_max = float(highs[i : search_end + 1].max())
rally_pct = (future_max - close_val) / close_val * 100.0
sell_rally_pct = (sell_price - close_val) / close_val * 100.0
if rally_pct < min_rally_pct or sell_rally_pct < min_rally_pct:
continue
return i, close_val, resistance
return None

View File

@@ -0,0 +1,500 @@
"""Ground Truth 차트 HTML 생성 (전체 기간 지원)."""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import pandas as pd
from deepcoin.data.candle_loader import load_candles
# 0이면 제한 없이 전체 봉 표시
DEFAULT_MAX_CANDLES = 0
def _data_js_path(html_path: Path) -> Path:
"""HTML과 짝을 이루는 데이터 JS 경로 (file:// 프로토콜 호환)."""
return html_path.with_name("ground_truth_chart_data.js")
def render_ground_truth_chart(
db_path: Path,
symbol: str,
gt_result: dict[str, Any],
output_path: Path,
chart_lookback_days: int | None = None,
max_candles: int = DEFAULT_MAX_CANDLES,
) -> Path:
"""GT 타점이 표시된 HTML 차트를 생성한다.
대용량(3분봉 2년 등)은 종가 라인으로 전체 기간을 표시하고,
JSON 데이터는 별도 파일로 분리한다.
Args:
db_path: SQLite 경로.
symbol: 코인 심볼.
gt_result: build_ground_truth 결과.
output_path: HTML 출력 경로.
chart_lookback_days: 차트에 표시할 일수. None이면 GT lookback과 동일.
max_candles: 0이면 전체, 양수면 최근 N봉만.
Returns:
HTML 저장 경로.
"""
interval_min = gt_result["meta"]["interval_min"]
gt_lookback_days = gt_result["meta"]["lookback_days"]
chart_days = chart_lookback_days if chart_lookback_days is not None else gt_lookback_days
df = load_candles(db_path, symbol, interval_min, lookback_days=chart_days)
if max_candles > 0 and len(df) > max_candles:
df = df.iloc[-max_candles:].reset_index(drop=True)
times = (pd.to_datetime(df["datetime"]).astype("int64") // 10**9).astype(int).tolist()
buy_markers = []
sell_markers = []
for sig in gt_result.get("signals") or []:
ts = int(pd.Timestamp(sig["datetime"]).timestamp())
marker = {
"time": ts,
"price": sig["price"],
"marker_id": sig.get("marker_id", sig.get("leg_id")),
"signal_type": sig.get("signal_type", "swing_low" if sig["side"] == "buy" else "swing_high"),
}
if sig["side"] == "buy":
buy_markers.append(marker)
else:
sell_markers.append(marker)
chart_meta = {
**gt_result["meta"],
"chart_lookback_days": chart_days,
"gt_lookback_days": gt_lookback_days,
"chart_data_from": str(df["datetime"].min()),
"chart_data_to": str(df["datetime"].max()),
"gt_marker_count": len(buy_markers),
}
payload = {
"times": times,
"open": df["open"].astype(float).tolist(),
"high": df["high"].astype(float).tolist(),
"low": df["low"].astype(float).tolist(),
"close": df["close"].astype(float).tolist(),
"buy_markers": buy_markers,
"sell_markers": sell_markers,
"meta": chart_meta,
"bar_count": len(df),
}
output_path.parent.mkdir(parents=True, exist_ok=True)
data_path = _data_js_path(output_path)
with data_path.open("w", encoding="utf-8") as fp:
fp.write("window.CHART_DATA=")
json.dump(payload, fp, ensure_ascii=False, separators=(",", ":"))
fp.write(";")
output_path.write_text(_HTML_TEMPLATE, encoding="utf-8")
return output_path
_HTML_TEMPLATE = """<!DOCTYPE html>
<html lang="ko">
<head>
<meta charset="UTF-8" />
<title>Ground Truth Chart</title>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/uplot@1.6.31/dist/uPlot.min.css" />
<script src="https://cdn.jsdelivr.net/npm/uplot@1.6.31/dist/uPlot.iife.min.js"></script>
<script src="https://unpkg.com/lightweight-charts@4.2.0/dist/lightweight-charts.standalone.production.js"></script>
<script src="ground_truth_chart_data.js"></script>
<style>
body { font-family: "Malgun Gothic", Arial, sans-serif; margin: 0; background: #f5f5f5; color: #333; }
header { padding: 16px 24px; background: #fff; border-bottom: 1px solid #ddd; }
h1 { margin: 0 0 6px; font-size: 20px; }
.meta { font-size: 13px; color: #666; }
.toolbar { padding: 10px 24px; background: #fff; border-bottom: 1px solid #eee; display: flex; gap: 12px; flex-wrap: wrap; align-items: center; }
.toolbar-group { display: flex; gap: 6px; align-items: center; padding-right: 12px; border-right: 1px solid #e0e0e0; }
.toolbar-group:last-of-type { border-right: none; }
.toolbar button { padding: 6px 12px; border: 1px solid #bbb; background: #fff; cursor: pointer; border-radius: 4px; font-size: 13px; white-space: nowrap; }
.toolbar button:hover { background: #f0f4f8; }
.toolbar button.active { background: #1565c0; color: #fff; border-color: #1565c0; }
.toolbar button.home { background: #2e7d32; color: #fff; border-color: #2e7d32; font-weight: bold; }
.toolbar button.home:hover { background: #1b5e20; }
.toolbar .leg-info { font-size: 12px; color: #555; min-width: 90px; }
#status { font-size: 12px; color: #888; margin-left: auto; }
#overview { height: 480px; margin: 12px 24px; background: #fff; border: 1px solid #ddd; }
#detail-wrap { margin: 0 24px 12px; display: none; }
#detail-wrap h2 { font-size: 15px; margin: 0 0 8px; }
#detail { height: 360px; background: #fff; border: 1px solid #ddd; }
</style>
</head>
<body>
<header>
<h1 id="title">Ground Truth Chart</h1>
<div class="meta" id="meta"></div>
</header>
<div class="toolbar">
<div class="toolbar-group">
<button id="btn-home" class="home" title="전체 2년 화면으로 복귀">홈</button>
<button id="btn-prev-leg" title="이전 매수·매도 타점">◀ 이전</button>
<button id="btn-next-leg" title="다음 매수·매도 타점">다음 ▶</button>
<span class="leg-info" id="leg-info">타점 - / -</span>
</div>
<div class="toolbar-group">
<button id="btn-all" class="btn-period active">전체</button>
<button id="btn-365d" class="btn-period">1년</button>
<button id="btn-30d" class="btn-period">30일</button>
<button id="btn-7d" class="btn-period">7일</button>
<button id="btn-3d" class="btn-period">3일</button>
</div>
<div class="toolbar-group">
<button id="btn-zoom-in" title="확대">+ 확대</button>
<button id="btn-zoom-out" title="축소"> 축소</button>
<button id="btn-fit" title="현재 뷰 맞춤">맞춤</button>
</div>
<div class="toolbar-group">
<button id="btn-markers" title="매수·매도 마커 표시/숨김">마커 숨김</button>
<button id="btn-toggle-detail" title="상세 캔들 패널 표시/숨김">상세 패널</button>
</div>
<span id="status">데이터 로딩 중…</span>
</div>
<div id="overview"></div>
<div id="detail-wrap">
<h2 id="detail-title">상세 캔들</h2>
<div id="detail"></div>
</div>
<script>
let DATA = null;
let overviewPlot = null;
let detailChart = null;
let detailSeries = null;
let currentMode = "overview";
let currentLegIdx = 0;
let showMarkers = true;
let detailVisible = false;
let lastDetailStart = 0;
function fmtPrice(v) {
return Math.round(v).toLocaleString("ko-KR");
}
function updateLegInfo() {
const total = DATA.buy_markers.length;
const el = document.getElementById("leg-info");
if (!total) { el.textContent = "타점 없음"; return; }
el.textContent = `타점 ${currentLegIdx + 1} / ${total}`;
}
function drawMarkers(u, buys, sells) {
if (!showMarkers) return;
const ctx = u.ctx;
const drawOne = (m, color, up) => {
const x = u.valToPos(m.time, "x", true);
const y = u.valToPos(m.price, "y", true);
if (x < u.bbox.left || x > u.bbox.left + u.bbox.width) return;
ctx.fillStyle = color;
ctx.beginPath();
const s = 5;
if (up) {
ctx.moveTo(x, y + 10); ctx.lineTo(x - s, y + 18); ctx.lineTo(x + s, y + 18);
} else {
ctx.moveTo(x, y - 10); ctx.lineTo(x - s, y - 18); ctx.lineTo(x + s, y - 18);
}
ctx.closePath(); ctx.fill();
ctx.fillStyle = "#333";
ctx.font = "10px Malgun Gothic, Arial";
let suffix = "";
if (m.signal_type === "pullback") suffix = "*";
else if (m.signal_type === "breakout") suffix = "^";
else if (m.signal_type === "div_bull" || m.signal_type === "div_bear") suffix = "d";
const label = (up ? "B" : "S") + m.marker_id + suffix;
ctx.fillText(label, x + 6, y + (up ? 14 : -12));
};
buys.forEach(m => {
let color = "#2e7d32";
if (m.signal_type === "breakout") color = "#ef6c00";
else if (m.signal_type === "div_bull") color = "#7b1fa2";
drawOne(m, color, true);
});
sells.forEach(m => {
const color = m.signal_type === "div_bear" ? "#7b1fa2" : "#c62828";
drawOne(m, color, false);
});
}
function overviewXRange() {
if (!overviewPlot) return { min: DATA.times[0], max: DATA.times[DATA.times.length - 1] };
const s = overviewPlot.scales.x;
return { min: s.min, max: s.max };
}
function setOverviewXRange(min, max) {
const t0 = DATA.times[0];
const t1 = DATA.times[DATA.times.length - 1];
overviewPlot.setScale("x", {
min: Math.max(t0, min),
max: Math.min(t1, max),
});
}
function fitOverview() {
setOverviewXRange(DATA.times[0], DATA.times[DATA.times.length - 1]);
}
function zoomOverview(factor) {
const { min, max } = overviewXRange();
const mid = (min + max) / 2;
const half = Math.max((max - min) * factor / 2, 3600);
setOverviewXRange(mid - half, mid + half);
}
function buildOverview(keepRange) {
const prev = keepRange ? overviewXRange() : null;
if (overviewPlot) { overviewPlot.destroy(); overviewPlot = null; }
const opts = {
width: document.getElementById("overview").clientWidth,
height: 480,
scales: { x: { time: true } },
axes: [
{},
{ values: (u, vals) => vals.map(v => fmtPrice(v)) },
],
series: [
{},
{ label: "종가", stroke: "#1565c0", width: 1 },
],
cursor: { drag: { x: true, y: false, setScale: true } },
hooks: {
draw: [(u) => drawMarkers(u, DATA.buy_markers, DATA.sell_markers)],
},
};
overviewPlot = new uPlot(opts, [DATA.times, DATA.close], document.getElementById("overview"));
if (prev && keepRange) setOverviewXRange(prev.min, prev.max);
else fitOverview();
}
function sliceLastDays(days) {
const cutoff = DATA.times[DATA.times.length - 1] - days * 86400;
let start = 0;
for (let i = DATA.times.length - 1; i >= 0; i--) {
if (DATA.times[i] < cutoff) { start = i + 1; break; }
}
return { start, end: DATA.times.length };
}
function buildDetailCandles(startIdx, endIdx) {
lastDetailStart = startIdx;
const end = endIdx || DATA.times.length;
document.getElementById("detail-wrap").style.display = detailVisible ? "block" : "none";
const wrap = document.getElementById("detail");
wrap.innerHTML = "";
detailChart = LightweightCharts.createChart(wrap, {
layout: { background: { color: "#fff" }, textColor: "#333" },
grid: { vertLines: { color: "#eee" }, horzLines: { color: "#eee" } },
timeScale: { timeVisible: true, secondsVisible: false },
width: wrap.clientWidth,
height: 360,
});
detailSeries = detailChart.addCandlestickSeries({
upColor: "#c62828", downColor: "#1565c0",
borderUpColor: "#c62828", borderDownColor: "#1565c0",
wickUpColor: "#c62828", wickDownColor: "#1565c0",
});
const candles = [];
for (let i = startIdx; i < end; i++) {
candles.push({
time: DATA.times[i],
open: DATA.open[i], high: DATA.high[i],
low: DATA.low[i], close: DATA.close[i],
});
}
detailSeries.setData(candles);
const t0 = DATA.times[startIdx];
const t1 = DATA.times[end - 1];
const markers = [];
if (showMarkers) {
DATA.buy_markers.forEach(m => {
if (m.time >= t0 && m.time <= t1) markers.push({
time: m.time, position: "belowBar",
color: m.signal_type === "breakout" ? "#ef6c00"
: m.signal_type === "div_bull" ? "#7b1fa2" : "#2e7d32",
shape: "arrowUp",
text: "B" + m.marker_id + (m.signal_type === "pullback" ? "*"
: m.signal_type === "breakout" ? "^" : m.signal_type === "div_bull" ? "d" : ""),
});
});
DATA.sell_markers.forEach(m => {
if (m.time >= t0 && m.time <= t1) markers.push({
time: m.time, position: "aboveBar",
color: m.signal_type === "div_bear" ? "#7b1fa2" : "#c62828",
shape: "arrowDown",
text: "S" + m.marker_id + (m.signal_type === "div_bear" ? "d" : ""),
});
});
}
markers.sort((a, b) => a.time - b.time);
detailSeries.setMarkers(markers);
detailChart.timeScale().fitContent();
}
function setActive(btnId) {
document.querySelectorAll(".btn-period").forEach(b => b.classList.remove("active"));
const el = document.getElementById(btnId);
if (el) el.classList.add("active");
}
function goHome() {
currentMode = "overview";
setActive("btn-all");
document.getElementById("overview").style.display = "block";
if (!overviewPlot) buildOverview(false);
else fitOverview();
document.getElementById("status").textContent =
`전체 ${DATA.bar_count.toLocaleString()}봉 | 드래그=줌, 더블클릭=리셋`;
}
function nearestSellAfter(buyTime) {
let best = null;
for (const s of DATA.sell_markers) {
if (s.time >= buyTime && (!best || s.time < best.time)) best = s;
}
return best || DATA.sell_markers[DATA.sell_markers.length - 1];
}
function jumpToLeg(idx) {
const total = DATA.buy_markers.length;
if (!total) return;
currentLegIdx = Math.max(0, Math.min(idx, total - 1));
updateLegInfo();
const buy = DATA.buy_markers[currentLegIdx];
const sell = nearestSellAfter(buy.time);
const span = sell ? Math.max(sell.time - buy.time, 86400) : 86400 * 3;
const pad = span * 0.4;
const vmin = buy.time - pad;
const vmax = (sell ? sell.time : buy.time) + pad;
currentMode = "overview";
setActive("btn-all");
document.getElementById("overview").style.display = "block";
if (!overviewPlot) buildOverview(false);
setOverviewXRange(vmin, vmax);
let start = 0;
for (let i = 0; i < DATA.times.length; i++) {
if (DATA.times[i] >= vmin) { start = i; break; }
}
let end = DATA.times.length;
for (let i = DATA.times.length - 1; i >= 0; i--) {
if (DATA.times[i] <= vmax) { end = i + 1; break; }
}
const buyLabel = buy.signal_type === "pullback" ? "눌림목 매수"
: buy.signal_type === "breakout" ? "돌파 매수"
: buy.signal_type === "div_bull" ? "다이버전스 매수" : "스윙 매수";
document.getElementById("detail-title").textContent =
`B${buy.marker_id} ${buyLabel} — ${new Date(buy.time * 1000).toLocaleString("ko-KR")}`;
buildDetailCandles(start, end);
const sellText = sell ? ` → 매도 ${fmtPrice(sell.price)}` : "";
document.getElementById("status").textContent =
`B${buy.marker_id} ${buyLabel} ${fmtPrice(buy.price)}${sellText}`;
}
function showPeriod(days, btnId, label) {
currentMode = "detail";
setActive(btnId);
detailVisible = true;
document.getElementById("btn-toggle-detail").textContent = "상세 숨김";
const { start } = sliceLastDays(days);
document.getElementById("detail-title").textContent =
`${label} 3분봉 캔들 (${(DATA.times.length - start).toLocaleString()}봉)`;
buildDetailCandles(start);
document.getElementById("overview").style.display = "block";
if (!overviewPlot) buildOverview(false);
const t0 = DATA.times[start];
setOverviewXRange(t0, DATA.times[DATA.times.length - 1]);
document.getElementById("status").textContent = `${label} 구간 표시`;
}
function applyZoom(factor) {
if (currentMode === "detail" && detailChart) {
const ts = detailChart.timeScale();
const r = ts.getVisibleLogicalRange();
if (!r) return;
const mid = (r.from + r.to) / 2;
const half = Math.max((r.to - r.from) * factor / 2, 10);
ts.setVisibleLogicalRange({ from: mid - half, to: mid + half });
} else if (overviewPlot) {
zoomOverview(factor);
}
}
function applyFit() {
if (currentMode === "detail" && detailChart) {
detailChart.timeScale().fitContent();
} else if (overviewPlot) {
fitOverview();
}
}
function init() {
DATA = window.CHART_DATA;
if (!DATA) throw new Error("ground_truth_chart_data.js 없음");
const m = DATA.meta;
const chartDays = m.chart_lookback_days || m.lookback_days;
const gtDays = m.gt_lookback_days || m.lookback_days;
const chartLabel = chartDays >= 365 ? `${Math.round(chartDays / 365)}년` : `${chartDays}일`;
const gtLabel = gtDays >= 365 ? `${Math.round(gtDays / 365)}년` : `${gtDays}일`;
document.getElementById("title").textContent =
`${m.symbol} Ground Truth (${m.interval_label}) — 차트 ${chartLabel} / GT ${gtLabel}`;
document.getElementById("btn-all").textContent = `전체 ${chartLabel}`;
const chartFrom = m.chart_data_from || m.data_from;
const chartTo = m.chart_data_to || m.data_to;
document.getElementById("meta").textContent =
`차트 ${chartFrom} ~ ${chartTo} (${DATA.bar_count.toLocaleString()}봉) | 매수 ${DATA.buy_markers.length} / 매도 ${DATA.sell_markers.length} (${gtLabel}) | B*=눌림 B^=돌파 Bd/Sd=다이버전스`;
updateLegInfo();
document.getElementById("status").textContent =
`전체 ${DATA.bar_count.toLocaleString()}봉 | 드래그=줌, 더블클릭=리셋`;
buildOverview(false);
document.getElementById("btn-home").onclick = goHome;
document.getElementById("btn-prev-leg").onclick = () => jumpToLeg(currentLegIdx - 1);
document.getElementById("btn-next-leg").onclick = () => jumpToLeg(currentLegIdx + 1);
document.getElementById("btn-all").onclick = goHome;
document.getElementById("btn-365d").onclick = () => showPeriod(365, "btn-365d", "최근 1년");
document.getElementById("btn-30d").onclick = () => showPeriod(30, "btn-30d", "최근 30일");
document.getElementById("btn-7d").onclick = () => showPeriod(7, "btn-7d", "최근 7일");
document.getElementById("btn-3d").onclick = () => showPeriod(3, "btn-3d", "최근 3일");
document.getElementById("btn-zoom-in").onclick = () => applyZoom(0.6);
document.getElementById("btn-zoom-out").onclick = () => applyZoom(1.4);
document.getElementById("btn-fit").onclick = applyFit;
document.getElementById("btn-markers").onclick = () => {
showMarkers = !showMarkers;
document.getElementById("btn-markers").textContent = showMarkers ? "마커 숨김" : "마커 표시";
if (overviewPlot) buildOverview(true);
if (detailChart) buildDetailCandles(lastDetailStart);
};
document.getElementById("btn-toggle-detail").onclick = () => {
detailVisible = !detailVisible;
document.getElementById("detail-wrap").style.display = detailVisible ? "block" : "none";
document.getElementById("btn-toggle-detail").textContent = detailVisible ? "상세 숨김" : "상세 패널";
if (detailVisible && !detailChart) {
const { start } = sliceLastDays(7);
buildDetailCandles(start);
}
};
document.getElementById("overview").addEventListener("dblclick", () => {
if (currentMode === "overview") fitOverview();
});
window.addEventListener("resize", () => {
if (overviewPlot) buildOverview(true);
});
}
try { init(); } catch (err) {
document.getElementById("status").textContent = "데이터 로드 실패: " + err;
}
</script>
</body>
</html>"""

View File

@@ -0,0 +1,303 @@
"""RSI·MACD 다이버전스 매수·매도 타점 (Ground Truth, 사후 검증)."""
from __future__ import annotations
from dataclasses import dataclass
import pandas as pd
from deepcoin.techniques.indicators import macd, rsi
@dataclass(frozen=True)
class DivergenceSignal:
"""다이버전스 신호."""
side: str # buy | sell
bar_index: int
price: float
datetime: pd.Timestamp
indicator: str # rsi | macd_hist
price_prev: float
price_curr: float
ind_prev: float
ind_curr: float
def find_divergence_signals(
df: pd.DataFrame,
local_order: int = 15,
min_bars_between: int = 100,
max_pair_lookback_bars: int = 5000,
rsi_period: int = 14,
min_rsi_diff: float = 2.0,
min_macd_hist_diff: float = 0.0,
min_price_move_pct: float = 1.5,
future_bars: int = 2000,
min_future_move_pct: float = 2.0,
) -> tuple[list[DivergenceSignal], list[DivergenceSignal]]:
"""가격·지표 다이버전스 매수·매도 후보를 찾는다.
- 상승 다이버전스(매수): 가격 LL + RSI/MACD HL
- 하락 다이버전스(매도): 가격 HH + RSI/MACD LH
- 미래 데이터로 이후 유의미한 반등·하락을 사후 검증
Args:
df: OHLCV DataFrame.
local_order: 국소 극값 반경(봉).
min_bars_between: 연속 다이버전스 최소 간격(봉).
max_pair_lookback_bars: 비교할 이전 극값 최대 거리(봉).
rsi_period: RSI 기간.
min_rsi_diff: RSI 다이버전스 최소 차이(포인트).
min_macd_hist_diff: MACD 히스토그램 최소 차이.
min_price_move_pct: 극값 간 최소 가격 변동(%).
future_bars: 사후 검증 구간(봉).
min_future_move_pct: 사후 최소 가격 변동(%).
Returns:
(매수 신호, 매도 신호) 리스트.
"""
if len(df) < local_order * 4 + rsi_period:
return [], []
close = df["close"].astype(float)
low = df["low"].astype(float)
high = df["high"].astype(float)
rsi_vals = rsi(close, period=rsi_period)
_, _, macd_hist = macd(close)
lows = _find_local_extrema(df, low, rsi_vals, macd_hist, local_order, "low")
highs = _find_local_extrema(df, high, rsi_vals, macd_hist, local_order, "high")
bull_rsi = _detect_bullish(
df, lows, rsi_vals, "rsi", min_rsi_diff, min_price_move_pct,
max_pair_lookback_bars, future_bars, min_future_move_pct, high,
)
bear_rsi = _detect_bearish(
df, highs, rsi_vals, "rsi", min_rsi_diff, min_price_move_pct,
max_pair_lookback_bars, future_bars, min_future_move_pct, low,
)
bull_macd: list[DivergenceSignal] = []
bear_macd: list[DivergenceSignal] = []
if min_macd_hist_diff > 0:
bull_macd = _detect_bullish(
df, lows, macd_hist, "macd_hist", min_macd_hist_diff, min_price_move_pct,
max_pair_lookback_bars, future_bars, min_future_move_pct, high,
)
bear_macd = _detect_bearish(
df, highs, macd_hist, "macd_hist", min_macd_hist_diff, min_price_move_pct,
max_pair_lookback_bars, future_bars, min_future_move_pct, low,
)
buys = _dedupe_signals(
_merge_same_bar(bull_rsi + bull_macd), min_bars_between, prefer_lower=True
)
sells = _dedupe_signals(
_merge_same_bar(bear_rsi + bear_macd), min_bars_between, prefer_lower=False
)
return buys, sells
@dataclass
class _ExtremePoint:
"""국소 극값."""
bar_index: int
price: float
rsi: float
macd_hist: float
datetime: pd.Timestamp
def _find_local_extrema(
df: pd.DataFrame,
series: pd.Series,
rsi_vals: pd.Series,
macd_hist: pd.Series,
order: int,
side: str,
) -> list[_ExtremePoint]:
"""국소 저점·고점을 수집한다."""
points: list[_ExtremePoint] = []
values = series.values
for i in range(order, len(df) - order):
window = values[i - order : i + order + 1]
val = float(values[i])
if side == "low" and val > window.min():
continue
if side == "high" and val < window.max():
continue
if pd.isna(rsi_vals.iloc[i]) or pd.isna(macd_hist.iloc[i]):
continue
points.append(
_ExtremePoint(
bar_index=i,
price=val,
rsi=float(rsi_vals.iloc[i]),
macd_hist=float(macd_hist.iloc[i]),
datetime=pd.Timestamp(df.iloc[i]["datetime"]),
)
)
return points
def _detect_bullish(
df: pd.DataFrame,
lows: list[_ExtremePoint],
indicator: pd.Series,
indicator_name: str,
min_ind_diff: float,
min_price_move_pct: float,
max_lookback: int,
future_bars: int,
min_future_move_pct: float,
highs,
) -> list[DivergenceSignal]:
"""상승 다이버전스 매수를 탐지한다."""
signals: list[DivergenceSignal] = []
if len(lows) < 2:
return signals
for idx in range(1, len(lows)):
curr = lows[idx]
for prev in reversed(lows[:idx]):
if curr.bar_index - prev.bar_index > max_lookback:
break
if curr.bar_index - prev.bar_index < 20:
continue
price_move = (prev.price - curr.price) / prev.price * 100.0
if price_move < min_price_move_pct:
continue
ind_prev = float(indicator.iloc[prev.bar_index])
ind_curr = float(indicator.iloc[curr.bar_index])
if pd.isna(ind_prev) or pd.isna(ind_curr):
continue
if not (curr.price < prev.price and ind_curr > ind_prev + min_ind_diff):
continue
end = min(len(df), curr.bar_index + future_bars + 1)
future_high = float(highs[curr.bar_index:end].max())
rally = (future_high - curr.price) / curr.price * 100.0
if rally < min_future_move_pct:
continue
signals.append(
DivergenceSignal(
side="buy",
bar_index=curr.bar_index,
price=round(curr.price, 2),
datetime=curr.datetime,
indicator=indicator_name,
price_prev=round(prev.price, 2),
price_curr=round(curr.price, 2),
ind_prev=round(ind_prev, 4),
ind_curr=round(ind_curr, 4),
)
)
break
return signals
def _detect_bearish(
df: pd.DataFrame,
highs: list[_ExtremePoint],
indicator: pd.Series,
indicator_name: str,
min_ind_diff: float,
min_price_move_pct: float,
max_lookback: int,
future_bars: int,
min_future_move_pct: float,
lows,
) -> list[DivergenceSignal]:
"""하락 다이버전스 매도를 탐지한다."""
signals: list[DivergenceSignal] = []
if len(highs) < 2:
return signals
for idx in range(1, len(highs)):
curr = highs[idx]
for prev in reversed(highs[:idx]):
if curr.bar_index - prev.bar_index > max_lookback:
break
if curr.bar_index - prev.bar_index < 20:
continue
price_move = (curr.price - prev.price) / prev.price * 100.0
if price_move < min_price_move_pct:
continue
ind_prev = float(indicator.iloc[prev.bar_index])
ind_curr = float(indicator.iloc[curr.bar_index])
if pd.isna(ind_prev) or pd.isna(ind_curr):
continue
if not (curr.price > prev.price and ind_curr < ind_prev - min_ind_diff):
continue
end = min(len(df), curr.bar_index + future_bars + 1)
future_low = float(lows[curr.bar_index:end].min())
drop = (curr.price - future_low) / curr.price * 100.0
if drop < min_future_move_pct:
continue
signals.append(
DivergenceSignal(
side="sell",
bar_index=curr.bar_index,
price=round(curr.price, 2),
datetime=curr.datetime,
indicator=indicator_name,
price_prev=round(prev.price, 2),
price_curr=round(curr.price, 2),
ind_prev=round(ind_prev, 4),
ind_curr=round(ind_curr, 4),
)
)
break
return signals
def _merge_same_bar(signals: list[DivergenceSignal]) -> list[DivergenceSignal]:
"""동일 봉·동일 방향 신호를 하나로 합친다."""
by_bar: dict[int, DivergenceSignal] = {}
for signal in signals:
prev = by_bar.get(signal.bar_index)
if prev is None:
by_bar[signal.bar_index] = signal
continue
prev_diff = abs(prev.ind_curr - prev.ind_prev)
curr_diff = abs(signal.ind_curr - signal.ind_prev)
if curr_diff > prev_diff:
by_bar[signal.bar_index] = signal
return sorted(by_bar.values(), key=lambda s: s.bar_index)
def _dedupe_signals(
signals: list[DivergenceSignal],
min_bars: int,
prefer_lower: bool,
) -> list[DivergenceSignal]:
"""근접 신호를 병합한다."""
if not signals:
return []
sorted_signals = sorted(signals, key=lambda s: s.bar_index)
merged: list[DivergenceSignal] = [sorted_signals[0]]
for signal in sorted_signals[1:]:
last = merged[-1]
if signal.bar_index - last.bar_index < min_bars:
if prefer_lower and signal.price < last.price:
merged[-1] = signal
elif not prefer_lower and signal.price > last.price:
merged[-1] = signal
else:
merged.append(signal)
return merged

View File

@@ -0,0 +1,375 @@
"""Ground Truth 매수·매도 타점 생성 (1단계 벤치마크)."""
from __future__ import annotations
import json
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
from typing import Any
import pandas as pd
from deepcoin.data.candle_loader import load_candles
from deepcoin.data.intervals import interval_label
from deepcoin.ground_truth.pnl import simulate_gt_pnl
from deepcoin.ground_truth.breakout import find_breakout_buy_pivots
from deepcoin.ground_truth.divergence import find_divergence_signals
from deepcoin.ground_truth.pullback import find_pullback_buy_pivots
from deepcoin.ground_truth.zigzag import Pivot, find_zigzag_pivots
@dataclass(frozen=True)
class GtParams:
"""Ground Truth 생성 파라미터."""
interval_min: int
lookback_days: int
zigzag_reversal_pct: float
min_leg_pct: float
pullback_min_pct: float = 1.5
pullback_local_order: int = 10
breakout_buffer_pct: float = 0.1
breakout_consolidation_bars: int = 200
breakout_min_rally_pct: float = 2.0
div_local_order: int = 20
div_min_bars_between: int = 1500
div_min_rsi_diff: float = 5.0
div_min_future_move_pct: float = 4.0
@dataclass
class GtLeg:
"""매수→매도 1레그 (최대 스윙 수익 구간)."""
leg_id: int
buy_datetime: str
buy_price: float
buy_bar_index: int
sell_datetime: str
sell_price: float
sell_bar_index: int
leg_pct: float
bars_held: int
def build_ground_truth(
db_path: Path,
symbol: str,
coin_name: str,
params: GtParams,
initial_cash_krw: float = 400_000.0,
fee_rate: float = 0.0005,
) -> dict[str, Any]:
"""최근 1년 구간에서 사후 최적 스윙 레그(1매수·1매도) GT를 생성한다.
미래 데이터를 사용해 ZigZag 스윙 저점 매수·고점 매도 쌍을 찾는다.
1단계 벤치마크: 최대 스윙 수익을 포착하는 타점.
Args:
db_path: SQLite 경로.
symbol: 코인 심볼.
coin_name: 코인 이름.
params: GT 파라미터.
initial_cash_krw: 수익률 계산 초기 자본 (1년 시작 시점).
fee_rate: 거래 수수료율.
Returns:
JSON 직렬화 가능한 GT 결과 dict.
"""
df = load_candles(
db_path=db_path,
symbol=symbol,
interval_min=params.interval_min,
lookback_days=params.lookback_days,
)
pivots = find_zigzag_pivots(df, reversal_pct=params.zigzag_reversal_pct)
legs = _pivots_to_legs(pivots, min_leg_pct=params.min_leg_pct)
leg_dicts = [asdict(leg) for leg in legs]
pullback_buys = find_pullback_buy_pivots(
df,
legs=legs,
min_pullback_pct=params.pullback_min_pct,
local_order=params.pullback_local_order,
)
breakout_buys = find_breakout_buy_pivots(
df,
legs=legs,
pullback_buys=pullback_buys,
breakout_buffer_pct=params.breakout_buffer_pct,
consolidation_bars=params.breakout_consolidation_bars,
min_rally_to_sell_pct=params.breakout_min_rally_pct,
)
div_buys, div_sells = find_divergence_signals(
df,
local_order=params.div_local_order,
min_bars_between=params.div_min_bars_between,
min_rsi_diff=params.div_min_rsi_diff,
min_future_move_pct=params.div_min_future_move_pct,
)
signals = _build_signals(legs, pullback_buys, breakout_buys, div_buys, div_sells)
summary = _summarize(legs, signals)
pnl = simulate_gt_pnl(leg_dicts, initial_cash_krw=initial_cash_krw, fee_rate=fee_rate)
return {
"meta": {
"symbol": symbol.upper(),
"coin_name": coin_name,
"interval_min": params.interval_min,
"interval_label": interval_label(params.interval_min),
"lookback_days": params.lookback_days,
"mode": "optimal_swing_legs_with_pullback_breakout_divergence",
"zigzag_reversal_pct": params.zigzag_reversal_pct,
"min_leg_pct": params.min_leg_pct,
"pullback_min_pct": params.pullback_min_pct,
"initial_cash_krw": initial_cash_krw,
"generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"data_from": str(df["datetime"].min()),
"data_to": str(df["datetime"].max()),
"bar_count": len(df),
"pivot_count": len(pivots),
"pullback_buy_count": len(pullback_buys),
"breakout_buy_count": len(breakout_buys),
"breakout_buffer_pct": params.breakout_buffer_pct,
"divergence_buy_count": len(div_buys),
"divergence_sell_count": len(div_sells),
},
"legs": leg_dicts,
"signals": signals,
"summary": summary,
"pnl": pnl,
}
def save_ground_truth(result: dict[str, Any], output_path: Path) -> Path:
"""GT 결과를 JSON으로 저장한다."""
output_path.parent.mkdir(parents=True, exist_ok=True)
with output_path.open("w", encoding="utf-8") as fp:
json.dump(result, fp, ensure_ascii=False, indent=2)
return output_path
def _pivots_to_legs(pivots: list[Pivot], min_leg_pct: float) -> list[GtLeg]:
"""스윙 저점→고점을 1매수·1매도 레그로 변환한다."""
legs: list[GtLeg] = []
leg_id = 0
i = 0
while i < len(pivots) - 1:
buy_pivot = pivots[i]
sell_pivot = pivots[i + 1]
if buy_pivot.side != "low" or sell_pivot.side != "high":
i += 1
continue
if sell_pivot.bar_index <= buy_pivot.bar_index:
i += 1
continue
leg_pct = (sell_pivot.price - buy_pivot.price) / buy_pivot.price * 100.0
if leg_pct < min_leg_pct:
i += 1
continue
leg_id += 1
legs.append(
GtLeg(
leg_id=leg_id,
buy_datetime=buy_pivot.datetime.strftime("%Y-%m-%d %H:%M:%S"),
buy_price=round(buy_pivot.price, 2),
buy_bar_index=buy_pivot.bar_index,
sell_datetime=sell_pivot.datetime.strftime("%Y-%m-%d %H:%M:%S"),
sell_price=round(sell_pivot.price, 2),
sell_bar_index=sell_pivot.bar_index,
leg_pct=round(leg_pct, 2),
bars_held=sell_pivot.bar_index - buy_pivot.bar_index,
)
)
i += 2
return legs
def _build_signals(
legs: list[GtLeg],
pullback_buys: list[Pivot],
breakout_buys: list,
div_buys: list,
div_sells: list,
) -> list[dict[str, Any]]:
"""스윙·눌림목·돌파·다이버전스 신호를 통합한다."""
signals: list[dict[str, Any]] = []
buy_marker_id = 0
sell_marker_id = 0
existing_buy_bars: set[int] = {leg.buy_bar_index for leg in legs}
existing_sell_bars: set[int] = {leg.sell_bar_index for leg in legs}
nearby_tolerance = 120
for leg in legs:
buy_marker_id += 1
signals.append(
{
"marker_id": buy_marker_id,
"leg_id": leg.leg_id,
"side": "buy",
"signal_type": "swing_low",
"datetime": leg.buy_datetime,
"price": leg.buy_price,
"bar_index": leg.buy_bar_index,
}
)
sell_marker_id += 1
existing_sell_bars.add(leg.sell_bar_index)
signals.append(
{
"marker_id": sell_marker_id,
"leg_id": leg.leg_id,
"side": "sell",
"signal_type": "swing_high",
"datetime": leg.sell_datetime,
"price": leg.sell_price,
"bar_index": leg.sell_bar_index,
"leg_pct": leg.leg_pct,
}
)
for pivot in pullback_buys:
if _is_near_existing_buy(pivot.bar_index, existing_buy_bars, nearby_tolerance):
continue
buy_marker_id += 1
existing_buy_bars.add(pivot.bar_index)
signals.append(
{
"marker_id": buy_marker_id,
"leg_id": None,
"side": "buy",
"signal_type": "pullback",
"datetime": pivot.datetime.strftime("%Y-%m-%d %H:%M:%S"),
"price": round(pivot.price, 2),
"bar_index": pivot.bar_index,
}
)
for breakout in breakout_buys:
if _is_near_existing_buy(breakout.bar_index, existing_buy_bars, nearby_tolerance):
continue
buy_marker_id += 1
existing_buy_bars.add(breakout.bar_index)
signals.append(
{
"marker_id": buy_marker_id,
"leg_id": breakout.leg_id,
"side": "buy",
"signal_type": "breakout",
"datetime": breakout.datetime.strftime("%Y-%m-%d %H:%M:%S"),
"price": breakout.price,
"bar_index": breakout.bar_index,
"resistance_price": breakout.resistance_price,
}
)
div_tolerance = 400
for div in div_buys:
if _is_near_bar(div.bar_index, existing_buy_bars, div_tolerance):
continue
buy_marker_id += 1
existing_buy_bars.add(div.bar_index)
signals.append(_divergence_to_dict(div, buy_marker_id, "div_bull"))
for div in div_sells:
if _is_near_bar(div.bar_index, existing_sell_bars, div_tolerance):
continue
sell_marker_id += 1
existing_sell_bars.add(div.bar_index)
signals.append(_divergence_to_dict(div, sell_marker_id, "div_bear"))
signals.sort(key=lambda s: (s["bar_index"], _signal_sort_key(s)))
return signals
def _divergence_to_dict(div, marker_id: int, signal_type: str) -> dict[str, Any]:
"""DivergenceSignal을 GT signal dict로 변환한다."""
return {
"marker_id": marker_id,
"leg_id": None,
"side": div.side,
"signal_type": signal_type,
"datetime": div.datetime.strftime("%Y-%m-%d %H:%M:%S"),
"price": div.price,
"bar_index": div.bar_index,
"indicator": div.indicator,
"price_prev": div.price_prev,
"ind_prev": div.ind_prev,
"ind_curr": div.ind_curr,
}
def _signal_sort_key(signal: dict[str, Any]) -> int:
"""동일 봉에서 신호 유형 정렬 우선순위."""
order = {
"swing_low": 0,
"pullback": 1,
"breakout": 2,
"div_bull": 3,
"swing_high": 4,
"div_bear": 5,
}
return order.get(signal.get("signal_type", ""), 9)
def _is_near_bar(bar_index: int, existing_bars: set[int], tolerance: int) -> bool:
"""기존 타점과 너무 가까우면 보조 신호를 제외한다."""
for existing in existing_bars:
if abs(bar_index - existing) <= tolerance:
return True
return False
def _is_near_existing_buy(bar_index: int, existing_bars: set[int], tolerance: int) -> bool:
"""기존 매수와 너무 가까우면 보조 매수를 제외한다."""
return _is_near_bar(bar_index, existing_bars, tolerance)
def _summarize(legs: list[GtLeg], signals: list[dict[str, Any]]) -> dict[str, Any]:
"""GT 요약 통계."""
buy_count = sum(1 for s in signals if s["side"] == "buy")
sell_count = sum(1 for s in signals if s["side"] == "sell")
pullback_count = sum(1 for s in signals if s.get("signal_type") == "pullback")
breakout_count = sum(1 for s in signals if s.get("signal_type") == "breakout")
div_buy_count = sum(1 for s in signals if s.get("signal_type") == "div_bull")
div_sell_count = sum(1 for s in signals if s.get("signal_type") == "div_bear")
if not legs:
return {
"leg_count": 0,
"buy_count": buy_count,
"sell_count": sell_count,
"pullback_buy_count": pullback_count,
"breakout_buy_count": breakout_count,
"divergence_buy_count": div_buy_count,
"divergence_sell_count": div_sell_count,
"avg_leg_pct": 0.0,
"median_leg_pct": 0.0,
"max_leg_pct": 0.0,
"min_leg_pct": 0.0,
"avg_bars_held": 0.0,
}
pcts = [leg.leg_pct for leg in legs]
bars = [leg.bars_held for leg in legs]
return {
"leg_count": len(legs),
"buy_count": buy_count,
"sell_count": sell_count,
"pullback_buy_count": pullback_count,
"breakout_buy_count": breakout_count,
"divergence_buy_count": div_buy_count,
"divergence_sell_count": div_sell_count,
"avg_leg_pct": round(sum(pcts) / len(pcts), 2),
"median_leg_pct": round(float(pd.Series(pcts).median()), 2),
"max_leg_pct": round(max(pcts), 2),
"min_leg_pct": round(min(pcts), 2),
"avg_bars_held": round(sum(bars) / len(bars), 1),
}

View File

@@ -0,0 +1,101 @@
"""Ground Truth 기준 초기 자본 누적 수익률 계산."""
from __future__ import annotations
from dataclasses import asdict, dataclass
from typing import Any
@dataclass
class LegPnl:
"""레그별 손익 (매수→매도 1쌍)."""
leg_id: int
buy_datetime: str
sell_datetime: str
buy_price: float
sell_price: float
cash_before: float
cash_after: float
leg_return_pct: float
cumulative_return_pct: float
btc_qty: float
def simulate_gt_pnl(
legs: list[dict[str, Any]],
initial_cash_krw: float = 400_000.0,
fee_rate: float = 0.0005,
min_order_krw: float = 5_000.0,
) -> dict[str, Any]:
"""GT 레그(1매수·1매도)를 순서대로 실행한 누적 수익률.
- 초기 현금 전액 매수 → 전량 매도 반복 (복리)
- 1단계 Ground Truth 벤치마크용
Args:
legs: GT legs 리스트.
initial_cash_krw: 초기 원화.
fee_rate: 편도 수수료율.
min_order_krw: 최소 주문 금액.
Returns:
요약 + 레그별 손익 dict.
"""
cash = float(initial_cash_krw)
leg_pnls: list[LegPnl] = []
skipped = 0
for leg in legs:
buy_price = float(leg["buy_price"])
sell_price = float(leg["sell_price"])
cash_before = cash
if cash < min_order_krw:
skipped += 1
continue
buy_fee = cash * fee_rate
btc_bought = (cash - buy_fee) / buy_price
cash = 0.0
sell_gross = btc_bought * sell_price
sell_fee = sell_gross * fee_rate
cash = sell_gross - sell_fee
leg_return = (cash - cash_before) / cash_before * 100.0
cumulative = (cash - initial_cash_krw) / initial_cash_krw * 100.0
leg_pnls.append(
LegPnl(
leg_id=int(leg["leg_id"]),
buy_datetime=leg["buy_datetime"],
sell_datetime=leg["sell_datetime"],
buy_price=buy_price,
sell_price=sell_price,
cash_before=round(cash_before, 0),
cash_after=round(cash, 0),
leg_return_pct=round(leg_return, 2),
cumulative_return_pct=round(cumulative, 2),
btc_qty=round(btc_bought, 8),
)
)
final_cash = cash
total_return_pct = (final_cash - initial_cash_krw) / initial_cash_krw * 100.0
period_from = leg_pnls[0].buy_datetime if leg_pnls else None
period_to = leg_pnls[-1].sell_datetime if leg_pnls else None
return {
"initial_cash_krw": initial_cash_krw,
"final_cash_krw": round(final_cash, 0),
"total_pnl_krw": round(final_cash - initial_cash_krw, 0),
"total_return_pct": round(total_return_pct, 2),
"fee_rate": fee_rate,
"legs_traded": len(leg_pnls),
"legs_skipped": skipped,
"period_from": period_from,
"period_to": period_to,
"leg_pnls": [asdict(x) for x in leg_pnls],
}

View File

@@ -0,0 +1,118 @@
"""상승 추세 중 눌림목 매수 타점 (Ground Truth 보조)."""
from __future__ import annotations
import pandas as pd
from typing import Protocol
from deepcoin.ground_truth.zigzag import Pivot
class _LegLike(Protocol):
"""레그 최소 인터페이스."""
buy_bar_index: int
sell_bar_index: int
def find_pullback_buy_pivots(
df: pd.DataFrame,
legs: list[_LegLike],
min_pullback_pct: float = 1.5,
local_order: int = 10,
swing_near_bars: int = 120,
leg_end_margin_bars: int = 100,
) -> list[Pivot]:
"""각 스윙 레그 구간에서 돌파 직전 최적 눌림목 1개를 찾는다 (1단계 GT).
빨간 원 유형(9/27, 7/8 등) — 스윙 매수 후·매도 전 구간의
가장 깊은 되돌림 저점을 레그당 1개만 선택한다.
Args:
df: OHLCV DataFrame.
legs: ZigZag 스윙 레그.
min_pullback_pct: 직전 고점 대비 최소 되돌림(%).
local_order: 국소 저점 판별 반경(봉).
swing_near_bars: 스윙 매수 직후 구간 제외(봉).
leg_end_margin_bars: 매도 직전 구간 제외(봉).
Returns:
시간순 Pivot(low) 리스트.
"""
if not legs or len(df) < local_order * 2 + 10:
return []
highs = df["high"].astype(float).values
lows = df["low"].astype(float).values
pullbacks: list[Pivot] = []
for leg in legs:
full_start = leg.buy_bar_index + swing_near_bars
full_end = leg.sell_bar_index - leg_end_margin_bars
if full_end <= full_start + local_order * 2:
continue
leg_len = leg.sell_bar_index - leg.buy_bar_index
pre_breakout_start = leg.sell_bar_index - int(leg_len * 0.35)
start = max(full_start, pre_breakout_start)
end = full_end
best_idx = _best_pullback_in_range(
df, lows, highs, leg.buy_bar_index, start, end, local_order, min_pullback_pct
)
if best_idx is None:
best_idx = _best_pullback_in_range(
df, lows, highs, leg.buy_bar_index, full_start, full_end, local_order, min_pullback_pct
)
if best_idx is None:
continue
pullbacks.append(
Pivot(
bar_index=best_idx,
side="low",
price=float(lows[best_idx]),
datetime=pd.Timestamp(df.iloc[best_idx]["datetime"]),
)
)
return pullbacks
def _best_pullback_in_range(
df: pd.DataFrame,
lows,
highs,
buy_bar_index: int,
start: int,
end: int,
local_order: int,
min_pullback_pct: float,
) -> int | None:
"""구간 내 최대 되돌림 국소 저점 bar_index를 반환한다."""
if end <= start + local_order * 2:
return None
best_idx: int | None = None
best_price = float("inf")
for i in range(max(start, local_order), min(end, len(df) - local_order)):
low_val = float(lows[i])
window = lows[i - local_order : i + local_order + 1]
if low_val > float(window.min()):
continue
region_high = float(highs[buy_bar_index : i + 1].max())
if region_high <= 0:
continue
pullback_pct = (region_high - low_val) / region_high * 100.0
if pullback_pct < min_pullback_pct:
continue
if low_val < best_price:
best_price = low_val
best_idx = i
return best_idx

View File

@@ -0,0 +1,173 @@
"""독립 매수·매도 스윙 타점 탐지 (레그 1:1 강제 없음)."""
from __future__ import annotations
from dataclasses import dataclass
import pandas as pd
from deepcoin.ground_truth.zigzag import Pivot, find_zigzag_pivots
@dataclass(frozen=True)
class SwingSignal:
"""매수 또는 매도 타점."""
signal_id: int
side: str # buy | sell
bar_index: int
price: float
datetime: pd.Timestamp
source: str # minor_zigzag | local_extrema
swing_pct: float
def find_swing_signals(
df: pd.DataFrame,
minor_reversal_pct: float = 2.5,
local_order: int = 20,
min_swing_pct: float = 2.0,
min_bars_between: int = 30,
) -> tuple[list[SwingSignal], list[SwingSignal]]:
"""적절한 매수·매도 타점을 독립적으로 찾는다.
- 소형 ZigZag(되돌림 2.5% 등): 눌림목·반등 고점
- 국소 극값: 횡보 후 돌파 전 저점(빨간 원 구간 유형) 보완
- 매수·매도를 1:1 레그로 묶지 않음
Args:
df: OHLCV DataFrame.
minor_reversal_pct: 소형 ZigZag 되돌림 %.
local_order: 국소 극값 탐지 반경(봉).
min_swing_pct: 국소 극값 인정 최소 변동 %.
min_bars_between: 동일 방향 신호 최소 간격(봉).
Returns:
(매수 신호 리스트, 매도 신호 리스트) — 각각 시간순.
"""
minor_pivots = find_zigzag_pivots(df, reversal_pct=minor_reversal_pct)
local_lows = _find_local_lows(df, order=local_order, min_swing_pct=min_swing_pct)
local_highs = _find_local_highs(df, order=local_order, min_swing_pct=min_swing_pct)
buy_candidates = [p for p in minor_pivots if p.side == "low"]
sell_candidates = [p for p in minor_pivots if p.side == "high"]
buy_candidates.extend(local_lows)
sell_candidates.extend(local_highs)
buys = _to_signals(_dedupe_pivots(buy_candidates, min_bars_between, prefer="low"), "buy")
sells = _to_signals(_dedupe_pivots(sell_candidates, min_bars_between, prefer="high"), "sell")
return buys, sells
def _find_local_lows(
df: pd.DataFrame,
order: int,
min_swing_pct: float,
) -> list[Pivot]:
"""최근 고점 대비 되돌림 후 형성된 국소 저점."""
pivots: list[Pivot] = []
lookback = order * 4
for i in range(order, len(df) - order):
low_val = float(df.iloc[i]["low"])
window = df["low"].iloc[i - order : i + order + 1]
if low_val > window.min():
continue
start = max(0, i - lookback)
recent_high = float(df["high"].iloc[start : i + 1].max())
if recent_high <= 0:
continue
drop_pct = (recent_high - low_val) / recent_high * 100.0
if drop_pct < min_swing_pct:
continue
pivots.append(
Pivot(
bar_index=i,
side="low",
price=low_val,
datetime=pd.Timestamp(df.iloc[i]["datetime"]),
)
)
return pivots
def _find_local_highs(
df: pd.DataFrame,
order: int,
min_swing_pct: float,
) -> list[Pivot]:
"""최근 저점 대비 반등 후 형성된 국소 고점."""
pivots: list[Pivot] = []
lookback = order * 4
for i in range(order, len(df) - order):
high_val = float(df.iloc[i]["high"])
window = df["high"].iloc[i - order : i + order + 1]
if high_val < window.max():
continue
start = max(0, i - lookback)
recent_low = float(df["low"].iloc[start : i + 1].min())
if recent_low <= 0:
continue
rise_pct = (high_val - recent_low) / recent_low * 100.0
if rise_pct < min_swing_pct:
continue
pivots.append(
Pivot(
bar_index=i,
side="high",
price=high_val,
datetime=pd.Timestamp(df.iloc[i]["datetime"]),
)
)
return pivots
def _dedupe_pivots(
pivots: list[Pivot],
min_bars: int,
prefer: str,
) -> list[Pivot]:
"""가까운 동일 방향 피벗을 병합한다."""
if not pivots:
return []
sorted_pivots = sorted(pivots, key=lambda p: p.bar_index)
merged: list[Pivot] = [sorted_pivots[0]]
for pivot in sorted_pivots[1:]:
last = merged[-1]
if pivot.bar_index - last.bar_index < min_bars:
if prefer == "low" and pivot.price < last.price:
merged[-1] = pivot
elif prefer == "high" and pivot.price > last.price:
merged[-1] = pivot
else:
merged.append(pivot)
return merged
def _to_signals(pivots: list[Pivot], side: str) -> list[SwingSignal]:
"""Pivot 리스트를 SwingSignal로 변환한다."""
source = "local_extrema"
signals: list[SwingSignal] = []
for idx, pivot in enumerate(pivots, start=1):
signals.append(
SwingSignal(
signal_id=idx,
side=side,
bar_index=pivot.bar_index,
price=round(pivot.price, 2),
datetime=pivot.datetime,
source=source,
swing_pct=0.0,
)
)
return signals

View File

@@ -0,0 +1,120 @@
"""ZigZag 스윙 피벗 탐지 (Ground Truth용, 미래 데이터 허용)."""
from __future__ import annotations
from dataclasses import dataclass
import pandas as pd
@dataclass(frozen=True)
class Pivot:
"""스윙 고점/저점."""
bar_index: int
side: str # "low" | "high"
price: float
datetime: pd.Timestamp
def find_zigzag_pivots(
df: pd.DataFrame,
reversal_pct: float = 5.0,
) -> list[Pivot]:
"""ZigZag 알고리즘으로 교대 스윙 피벗을 찾는다.
가격이 직전 극값 대비 `reversal_pct`% 이상 되돌림되면 피벗을 확정한다.
Ground Truth 라벨링용이므로 전체 시계열(미래 포함)을 사용한다.
Args:
df: datetime, high, low, close 컬럼 DataFrame.
reversal_pct: 피벗 확정 최소 되돌림 비율(%).
Returns:
시간순 피벗 리스트 (low/high 교대).
"""
if len(df) < 3:
return []
threshold = reversal_pct / 100.0
pivots: list[Pivot] = []
direction: str | None = None
extreme_idx = 0
extreme_price = float(df.iloc[0]["close"])
anchor_price = extreme_price
for i in range(1, len(df)):
high = float(df.iloc[i]["high"])
low = float(df.iloc[i]["low"])
if direction is None:
if high >= anchor_price * (1 + threshold):
direction = "up"
extreme_idx = i
extreme_price = high
elif low <= anchor_price * (1 - threshold):
direction = "down"
extreme_idx = i
extreme_price = low
continue
if direction == "up":
if high >= extreme_price:
extreme_price = high
extreme_idx = i
if low <= extreme_price * (1 - threshold):
pivots.append(_make_pivot(df, extreme_idx, "high", extreme_price))
direction = "down"
extreme_idx = i
extreme_price = low
anchor_price = extreme_price
else:
if low <= extreme_price:
extreme_price = low
extreme_idx = i
if high >= extreme_price * (1 + threshold):
pivots.append(_make_pivot(df, extreme_idx, "low", extreme_price))
direction = "up"
extreme_idx = i
extreme_price = high
anchor_price = extreme_price
# 마지막 미확정 극값도 피벗으로 추가 (방향에 따라)
if direction == "up":
pivots.append(_make_pivot(df, extreme_idx, "high", extreme_price))
elif direction == "down":
pivots.append(_make_pivot(df, extreme_idx, "low", extreme_price))
return _normalize_alternating(pivots)
def _make_pivot(df: pd.DataFrame, idx: int, side: str, price: float) -> Pivot:
"""Pivot 객체를 생성한다."""
return Pivot(
bar_index=idx,
side=side,
price=price,
datetime=pd.Timestamp(df.iloc[idx]["datetime"]),
)
def _normalize_alternating(pivots: list[Pivot]) -> list[Pivot]:
"""인접 동일 side 피벗을 제거하고 low/high 교대를 맞춘다."""
if not pivots:
return []
cleaned: list[Pivot] = [pivots[0]]
for pivot in pivots[1:]:
last = cleaned[-1]
if pivot.side == last.side:
# 같은 방향이면 더 극단적인 가격으로 갱신
if pivot.side == "high" and pivot.price >= last.price:
cleaned[-1] = pivot
elif pivot.side == "low" and pivot.price <= last.price:
cleaned[-1] = pivot
else:
cleaned.append(pivot)
# 첫 피벗이 high면 앞 low가 누락된 경우 — GT에서는 허용
return cleaned

View File

@@ -0,0 +1,11 @@
"""2단계: Ground Truth 정합 매매 기법."""
from deepcoin.techniques.registry import get_all_techniques, list_technique_ids
from deepcoin.techniques.runner import run_all_techniques, run_technique
__all__ = [
"get_all_techniques",
"list_technique_ids",
"run_all_techniques",
"run_technique",
]

View File

@@ -0,0 +1,85 @@
"""매매 기법 공통 인터페이스 및 결과 타입."""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import asdict, dataclass, field
from typing import Any
import pandas as pd
@dataclass
class TechniqueSignal:
"""기법이 생성한 매수·매도 신호."""
side: str # buy | sell
bar_index: int
price: float
datetime: str
pivot_bar_index: int | None = None
confidence: float = 1.0
reason: str = ""
def to_dict(self) -> dict[str, Any]:
"""JSON 직렬화용 dict."""
return asdict(self)
@dataclass
class TechniqueParams:
"""기법 실행 공통 파라미터."""
interval_min: int
lookback_days: int
min_leg_pct: float
initial_cash_krw: float
fee_rate: float
extra: dict[str, Any] = field(default_factory=dict)
@dataclass
class TechniqueResult:
"""기법 실행 결과 (GT JSON과 호환 구조)."""
technique_id: str
technique_name: str
category: str
causal: bool
description: str
params: dict[str, Any]
signals: list[dict[str, Any]]
legs: list[dict[str, Any]]
summary: dict[str, Any]
pnl: dict[str, Any]
alignment: dict[str, Any] | None = None
def to_dict(self) -> dict[str, Any]:
"""JSON 직렬화용 dict."""
return asdict(self)
class BaseTechnique(ABC):
"""Ground Truth 정합을 목표로 하는 매매 기법 베이스."""
technique_id: str
technique_name: str
category: str
causal: bool
description: str
@abstractmethod
def generate_signals(self, df: pd.DataFrame, params: TechniqueParams) -> list[TechniqueSignal]:
"""캔들 DataFrame에서 매수·매도 신호를 생성한다.
Args:
df: datetime, open, high, low, close, volume 컬럼.
params: 공통 실행 파라미터.
Returns:
시간순 신호 리스트.
"""
def default_extra_params(self) -> dict[str, Any]:
"""기법별 기본 추가 파라미터."""
return {}

View File

@@ -0,0 +1,80 @@
"""볼린저 밴드 역추세 매매."""
from __future__ import annotations
import pandas as pd
from deepcoin.techniques.base import BaseTechnique, TechniqueParams, TechniqueSignal
from deepcoin.techniques.indicators import bollinger_bands, ema
class BbReversalTechnique(BaseTechnique):
"""BB 하단 터치 후 반등 매수, 상단 터치 후 하락 매도."""
technique_id = "bb_reversal"
technique_name = "볼린저 역추세"
category = "indicator"
causal = True
description = "BB(20,2) 하단 매수·상단 매도 + EMA 추세 필터"
def default_extra_params(self) -> dict:
return {"window": 20, "num_std": 2.0, "ema_span": 60}
def generate_signals(self, df: pd.DataFrame, params: TechniqueParams) -> list[TechniqueSignal]:
window = int(params.extra.get("window", 20))
num_std = float(params.extra.get("num_std", 2.0))
ema_span = int(params.extra.get("ema_span", 60))
close = df["close"].astype(float)
low = df["low"].astype(float)
high = df["high"].astype(float)
_, upper, lower = bollinger_bands(close, window=window, num_std=num_std)
trend = ema(close, ema_span)
signals: list[TechniqueSignal] = []
in_oversold = False
in_overbought = False
for i in range(window, len(df)):
if pd.isna(lower.iloc[i]) or pd.isna(upper.iloc[i]):
continue
low_i = float(low.iloc[i])
high_i = float(high.iloc[i])
close_i = float(close.iloc[i])
lower_i = float(lower.iloc[i])
upper_i = float(upper.iloc[i])
trend_i = float(trend.iloc[i])
dt = pd.Timestamp(df.iloc[i]["datetime"]).strftime("%Y-%m-%d %H:%M:%S")
if low_i <= lower_i:
in_oversold = True
if in_oversold and close_i > lower_i and close_i >= trend_i * 0.98:
signals.append(
TechniqueSignal(
side="buy",
bar_index=i,
price=round(close_i, 2),
datetime=dt,
confidence=0.7,
reason="bb_lower_bounce",
)
)
in_oversold = False
if high_i >= upper_i:
in_overbought = True
if in_overbought and close_i < upper_i:
signals.append(
TechniqueSignal(
side="sell",
bar_index=i,
price=round(close_i, 2),
datetime=dt,
confidence=0.7,
reason="bb_upper_reject",
)
)
in_overbought = False
return signals

View File

@@ -0,0 +1,76 @@
"""돈치안 채널 스윙."""
from __future__ import annotations
import pandas as pd
from deepcoin.techniques.base import BaseTechnique, TechniqueParams, TechniqueSignal
class DonchianTechnique(BaseTechnique):
"""N봉 최저가 터치 후 반등 매수, N봉 최고가 터치 후 하락 매도."""
technique_id = "donchian"
technique_name = "돈치안 채널"
category = "swing"
causal = True
description = "돈치안(40) 채널 하단 매수·상단 매도"
def default_extra_params(self) -> dict:
return {"window": 40}
def generate_signals(self, df: pd.DataFrame, params: TechniqueParams) -> list[TechniqueSignal]:
window = int(params.extra.get("window", 40))
low = df["low"].astype(float)
high = df["high"].astype(float)
close = df["close"].astype(float)
lower_channel = low.rolling(window=window, min_periods=window).min()
upper_channel = high.rolling(window=window, min_periods=window).max()
signals: list[TechniqueSignal] = []
touched_lower = False
touched_upper = False
for i in range(window, len(df)):
if pd.isna(lower_channel.iloc[i]):
continue
low_i = float(low.iloc[i])
high_i = float(high.iloc[i])
close_i = float(close.iloc[i])
lower_i = float(lower_channel.iloc[i])
upper_i = float(upper_channel.iloc[i])
dt = pd.Timestamp(df.iloc[i]["datetime"]).strftime("%Y-%m-%d %H:%M:%S")
if low_i <= lower_i:
touched_lower = True
if touched_lower and close_i > lower_i * 1.005:
signals.append(
TechniqueSignal(
side="buy",
bar_index=i,
price=round(close_i, 2),
datetime=dt,
confidence=0.65,
reason="donchian_lower_bounce",
)
)
touched_lower = False
if high_i >= upper_i:
touched_upper = True
if touched_upper and close_i < upper_i * 0.995:
signals.append(
TechniqueSignal(
side="sell",
bar_index=i,
price=round(close_i, 2),
datetime=dt,
confidence=0.65,
reason="donchian_upper_reject",
)
)
touched_upper = False
return signals

View File

@@ -0,0 +1,54 @@
"""기술적 지표 계산 (인과 신호용)."""
from __future__ import annotations
import pandas as pd
def ema(series: pd.Series, span: int) -> pd.Series:
"""지수이동평균을 계산한다."""
return series.ewm(span=span, adjust=False).mean()
def sma(series: pd.Series, window: int) -> pd.Series:
"""단순이동평균을 계산한다."""
return series.rolling(window=window, min_periods=window).mean()
def bollinger_bands(
close: pd.Series,
window: int = 20,
num_std: float = 2.0,
) -> tuple[pd.Series, pd.Series, pd.Series]:
"""볼린저 밴드 (중심, 상단, 하단)를 계산한다."""
mid = sma(close, window)
std = close.rolling(window=window, min_periods=window).std()
upper = mid + num_std * std
lower = mid - num_std * std
return mid, upper, lower
def rsi(close: pd.Series, period: int = 14) -> pd.Series:
"""RSI(상대강도지수)를 계산한다."""
delta = close.diff()
gain = delta.clip(lower=0.0)
loss = -delta.clip(upper=0.0)
avg_gain = gain.ewm(alpha=1 / period, min_periods=period, adjust=False).mean()
avg_loss = loss.ewm(alpha=1 / period, min_periods=period, adjust=False).mean()
rs = avg_gain / avg_loss.replace(0, pd.NA)
return 100 - (100 / (1 + rs))
def macd(
close: pd.Series,
fast: int = 12,
slow: int = 26,
signal: int = 9,
) -> tuple[pd.Series, pd.Series, pd.Series]:
"""MACD, 시그널, 히스토그램을 계산한다."""
ema_fast = ema(close, fast)
ema_slow = ema(close, slow)
macd_line = ema_fast - ema_slow
signal_line = ema(macd_line, signal)
hist = macd_line - signal_line
return macd_line, signal_line, hist

View File

@@ -0,0 +1,123 @@
"""신호 → 1매수·1매도 레그 변환."""
from __future__ import annotations
from typing import Any
from deepcoin.techniques.base import TechniqueSignal
def signals_to_legs(
signals: list[TechniqueSignal],
min_leg_pct: float = 3.0,
) -> list[dict[str, Any]]:
"""시간순 신호를 1매수·1매도 레그로 짝짓는다.
포지션 없을 때만 매수, 보유 중일 때만 매도를 인정한다.
Args:
signals: 시간순 TechniqueSignal 리스트.
min_leg_pct: 최소 레그 수익률(%) 필터.
Returns:
GT legs와 동일 스키마의 레그 dict 리스트.
"""
sorted_signals = sorted(signals, key=lambda s: s.bar_index)
legs: list[dict[str, Any]] = []
pending_buy: TechniqueSignal | None = None
leg_id = 0
for signal in sorted_signals:
if signal.side == "buy":
if pending_buy is None:
pending_buy = signal
elif signal.price < pending_buy.price:
pending_buy = signal
continue
if signal.side != "sell" or pending_buy is None:
continue
leg_pct = (signal.price - pending_buy.price) / pending_buy.price * 100.0
if leg_pct < min_leg_pct:
continue
leg_id += 1
legs.append(
{
"leg_id": leg_id,
"buy_datetime": pending_buy.datetime,
"buy_price": round(pending_buy.price, 2),
"buy_bar_index": pending_buy.bar_index,
"sell_datetime": signal.datetime,
"sell_price": round(signal.price, 2),
"sell_bar_index": signal.bar_index,
"leg_pct": round(leg_pct, 2),
"bars_held": signal.bar_index - pending_buy.bar_index,
}
)
pending_buy = None
return legs
def summarize_legs(legs: list[dict[str, Any]]) -> dict[str, Any]:
"""레그 요약 통계를 계산한다."""
if not legs:
return {
"leg_count": 0,
"buy_count": 0,
"sell_count": 0,
"avg_leg_pct": 0.0,
"median_leg_pct": 0.0,
"max_leg_pct": 0.0,
"min_leg_pct": 0.0,
"avg_bars_held": 0.0,
}
pcts = [float(leg["leg_pct"]) for leg in legs]
bars = [int(leg["bars_held"]) for leg in legs]
pcts_sorted = sorted(pcts)
mid = len(pcts_sorted) // 2
median = (
pcts_sorted[mid]
if len(pcts_sorted) % 2 == 1
else (pcts_sorted[mid - 1] + pcts_sorted[mid]) / 2
)
return {
"leg_count": len(legs),
"buy_count": len(legs),
"sell_count": len(legs),
"avg_leg_pct": round(sum(pcts) / len(pcts), 2),
"median_leg_pct": round(median, 2),
"max_leg_pct": round(max(pcts), 2),
"min_leg_pct": round(min(pcts), 2),
"avg_bars_held": round(sum(bars) / len(bars), 1),
}
def legs_to_signal_dicts(legs: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""레그를 GT signals 형식으로 변환한다."""
signals: list[dict[str, Any]] = []
for leg in legs:
signals.append(
{
"leg_id": leg["leg_id"],
"side": "buy",
"datetime": leg["buy_datetime"],
"price": leg["buy_price"],
"bar_index": leg["buy_bar_index"],
}
)
signals.append(
{
"leg_id": leg["leg_id"],
"side": "sell",
"datetime": leg["sell_datetime"],
"price": leg["sell_price"],
"bar_index": leg["sell_bar_index"],
"leg_pct": leg["leg_pct"],
}
)
return signals

View File

@@ -0,0 +1,125 @@
"""국소 극값 기반 매수·매도 (눌림목·반등 고점)."""
from __future__ import annotations
import pandas as pd
from deepcoin.techniques.base import BaseTechnique, TechniqueParams, TechniqueSignal
class LocalExtremaTechnique(BaseTechnique):
"""좌우 N봉 대비 국소 저점·고점에서 신호 (인과: 확정 지연 order봉)."""
technique_id = "local_extrema"
technique_name = "국소 극값"
category = "swing"
causal = True
description = "국소 저점 매수·고점 매도 (눌림목 유형 포착, 9/27 구간 등)"
def default_extra_params(self) -> dict:
return {"order": 20, "min_swing_pct": 2.0, "min_bars_between": 30}
def generate_signals(self, df: pd.DataFrame, params: TechniqueParams) -> list[TechniqueSignal]:
order = int(params.extra.get("order", 20))
min_swing_pct = float(params.extra.get("min_swing_pct", 2.0))
min_bars = int(params.extra.get("min_bars_between", 30))
buys = _find_local_low_signals(df, order=order, min_swing_pct=min_swing_pct)
sells = _find_local_high_signals(df, order=order, min_swing_pct=min_swing_pct)
return _dedupe_signals(buys + sells, min_bars=min_bars)
def _find_local_low_signals(
df: pd.DataFrame,
order: int,
min_swing_pct: float,
) -> list[TechniqueSignal]:
"""국소 저점 매수 신호 (order봉 후 확정)."""
signals: list[TechniqueSignal] = []
lookback = order * 4
for pivot_idx in range(order, len(df) - order):
low_val = float(df.iloc[pivot_idx]["low"])
window = df["low"].iloc[pivot_idx - order : pivot_idx + order + 1]
if low_val > window.min():
continue
start = max(0, pivot_idx - lookback)
recent_high = float(df["high"].iloc[start : pivot_idx + 1].max())
if recent_high <= 0:
continue
drop_pct = (recent_high - low_val) / recent_high * 100.0
if drop_pct < min_swing_pct:
continue
confirm_idx = pivot_idx + order
signals.append(
TechniqueSignal(
side="buy",
bar_index=confirm_idx,
price=round(low_val, 2),
datetime=pd.Timestamp(df.iloc[confirm_idx]["datetime"]).strftime("%Y-%m-%d %H:%M:%S"),
pivot_bar_index=pivot_idx,
confidence=min(1.0, drop_pct / 10.0),
reason="local_low",
)
)
return signals
def _find_local_high_signals(
df: pd.DataFrame,
order: int,
min_swing_pct: float,
) -> list[TechniqueSignal]:
"""국소 고점 매도 신호 (order봉 후 확정)."""
signals: list[TechniqueSignal] = []
lookback = order * 4
for pivot_idx in range(order, len(df) - order):
high_val = float(df.iloc[pivot_idx]["high"])
window = df["high"].iloc[pivot_idx - order : pivot_idx + order + 1]
if high_val < window.max():
continue
start = max(0, pivot_idx - lookback)
recent_low = float(df["low"].iloc[start : pivot_idx + 1].min())
if recent_low <= 0:
continue
rise_pct = (high_val - recent_low) / recent_low * 100.0
if rise_pct < min_swing_pct:
continue
confirm_idx = pivot_idx + order
signals.append(
TechniqueSignal(
side="sell",
bar_index=confirm_idx,
price=round(high_val, 2),
datetime=pd.Timestamp(df.iloc[confirm_idx]["datetime"]).strftime("%Y-%m-%d %H:%M:%S"),
pivot_bar_index=pivot_idx,
confidence=min(1.0, rise_pct / 10.0),
reason="local_high",
)
)
return signals
def _dedupe_signals(signals: list[TechniqueSignal], min_bars: int) -> list[TechniqueSignal]:
"""동일 방향 근접 신호를 병합한다."""
if not signals:
return []
sorted_signals = sorted(signals, key=lambda s: s.bar_index)
merged: list[TechniqueSignal] = [sorted_signals[0]]
for signal in sorted_signals[1:]:
last = merged[-1]
if signal.side == last.side and signal.bar_index - last.bar_index < min_bars:
if signal.side == "buy" and signal.price < last.price:
merged[-1] = signal
elif signal.side == "sell" and signal.price > last.price:
merged[-1] = signal
else:
merged.append(signal)
return sorted(merged, key=lambda s: s.bar_index)

View File

@@ -0,0 +1,68 @@
"""이동평균 골든·데드 크로스."""
from __future__ import annotations
import pandas as pd
from deepcoin.techniques.base import BaseTechnique, TechniqueParams, TechniqueSignal
from deepcoin.techniques.indicators import ema
class MaCrossTechnique(BaseTechnique):
"""단기 EMA가 장기 EMA를 상향 돌파 시 매수, 하향 돌파 시 매도."""
technique_id = "ma_cross"
technique_name = "EMA 크로스"
category = "indicator"
causal = True
description = "EMA(20/60) 골든크로스 매수·데드크로스 매도"
def default_extra_params(self) -> dict:
return {"fast_span": 20, "slow_span": 60}
def generate_signals(self, df: pd.DataFrame, params: TechniqueParams) -> list[TechniqueSignal]:
fast_span = int(params.extra.get("fast_span", 20))
slow_span = int(params.extra.get("slow_span", 60))
close = df["close"].astype(float)
fast = ema(close, fast_span)
slow = ema(close, slow_span)
signals: list[TechniqueSignal] = []
start = max(fast_span, slow_span)
for i in range(start + 1, len(df)):
if pd.isna(fast.iloc[i]) or pd.isna(slow.iloc[i]):
continue
prev_fast = float(fast.iloc[i - 1])
prev_slow = float(slow.iloc[i - 1])
curr_fast = float(fast.iloc[i])
curr_slow = float(slow.iloc[i])
price = float(close.iloc[i])
dt = pd.Timestamp(df.iloc[i]["datetime"]).strftime("%Y-%m-%d %H:%M:%S")
if prev_fast <= prev_slow and curr_fast > curr_slow:
signals.append(
TechniqueSignal(
side="buy",
bar_index=i,
price=round(price, 2),
datetime=dt,
confidence=0.6,
reason="golden_cross",
)
)
elif prev_fast >= prev_slow and curr_fast < curr_slow:
signals.append(
TechniqueSignal(
side="sell",
bar_index=i,
price=round(price, 2),
datetime=dt,
confidence=0.6,
reason="death_cross",
)
)
return signals

View File

@@ -0,0 +1,68 @@
"""MACD 시그널선 크로스."""
from __future__ import annotations
import pandas as pd
from deepcoin.techniques.base import BaseTechnique, TechniqueParams, TechniqueSignal
from deepcoin.techniques.indicators import macd
class MacdCrossTechnique(BaseTechnique):
"""MACD가 시그널선을 상향 돌파 시 매수, 하향 돌파 시 매도."""
technique_id = "macd_cross"
technique_name = "MACD 크로스"
category = "indicator"
causal = True
description = "MACD(12,26,9) 시그널선 골든·데드 크로스"
def default_extra_params(self) -> dict:
return {"fast": 12, "slow": 26, "signal": 9}
def generate_signals(self, df: pd.DataFrame, params: TechniqueParams) -> list[TechniqueSignal]:
fast = int(params.extra.get("fast", 12))
slow = int(params.extra.get("slow", 26))
signal_span = int(params.extra.get("signal", 9))
close = df["close"].astype(float)
macd_line, signal_line, _ = macd(close, fast=fast, slow=slow, signal=signal_span)
signals: list[TechniqueSignal] = []
start = slow + signal_span
for i in range(start + 1, len(df)):
if pd.isna(macd_line.iloc[i]) or pd.isna(signal_line.iloc[i]):
continue
prev_macd = float(macd_line.iloc[i - 1])
prev_sig = float(signal_line.iloc[i - 1])
curr_macd = float(macd_line.iloc[i])
curr_sig = float(signal_line.iloc[i])
price = float(close.iloc[i])
dt = pd.Timestamp(df.iloc[i]["datetime"]).strftime("%Y-%m-%d %H:%M:%S")
if prev_macd <= prev_sig and curr_macd > curr_sig:
signals.append(
TechniqueSignal(
side="buy",
bar_index=i,
price=round(price, 2),
datetime=dt,
confidence=0.6,
reason="macd_bull_cross",
)
)
elif prev_macd >= prev_sig and curr_macd < curr_sig:
signals.append(
TechniqueSignal(
side="sell",
bar_index=i,
price=round(price, 2),
datetime=dt,
confidence=0.6,
reason="macd_bear_cross",
)
)
return signals

View File

@@ -0,0 +1,67 @@
"""소형 ZigZag + 국소 극값 하이브리드."""
from __future__ import annotations
import pandas as pd
from deepcoin.techniques.base import BaseTechnique, TechniqueParams, TechniqueSignal
from deepcoin.techniques.local_extrema import LocalExtremaTechnique
from deepcoin.techniques.zigzag_causal import find_causal_zigzag_signals
class MinorSwingTechnique(BaseTechnique):
"""2.5% 인과 ZigZag와 국소 극값을 결합한 하이브리드."""
technique_id = "minor_swing"
technique_name = "소형 스윙 하이브리드"
category = "hybrid"
causal = True
description = "소형 ZigZag(2.5%) + 국소 극값 — GT 중간 눌림목 보완"
def default_extra_params(self) -> dict:
return {
"reversal_pct": 2.5,
"order": 15,
"min_swing_pct": 2.0,
"min_bars_between": 20,
}
def generate_signals(self, df: pd.DataFrame, params: TechniqueParams) -> list[TechniqueSignal]:
reversal_pct = float(params.extra.get("reversal_pct", 2.5))
zz_signals = find_causal_zigzag_signals(df, reversal_pct=reversal_pct)
local_params = TechniqueParams(
interval_min=params.interval_min,
lookback_days=params.lookback_days,
min_leg_pct=params.min_leg_pct,
initial_cash_krw=params.initial_cash_krw,
fee_rate=params.fee_rate,
extra={
"order": int(params.extra.get("order", 15)),
"min_swing_pct": float(params.extra.get("min_swing_pct", 2.0)),
"min_bars_between": int(params.extra.get("min_bars_between", 20)),
},
)
local_signals = LocalExtremaTechnique().generate_signals(df, local_params)
return _merge_signals(zz_signals + local_signals, min_bars=20)
def _merge_signals(signals: list[TechniqueSignal], min_bars: int) -> list[TechniqueSignal]:
"""방향별 근접 신호를 병합한다."""
if not signals:
return []
sorted_signals = sorted(signals, key=lambda s: s.bar_index)
merged: list[TechniqueSignal] = [sorted_signals[0]]
for signal in sorted_signals[1:]:
last = merged[-1]
if signal.side == last.side and signal.bar_index - last.bar_index < min_bars:
if signal.side == "buy" and signal.price < last.price:
merged[-1] = signal
elif signal.side == "sell" and signal.price > last.price:
merged[-1] = signal
else:
merged.append(signal)
return sorted(merged, key=lambda s: s.bar_index)

View File

@@ -0,0 +1,42 @@
"""매매 기법 레지스트리."""
from __future__ import annotations
from deepcoin.techniques.base import BaseTechnique
from deepcoin.techniques.bb_reversal import BbReversalTechnique
from deepcoin.techniques.donchian import DonchianTechnique
from deepcoin.techniques.local_extrema import LocalExtremaTechnique
from deepcoin.techniques.ma_cross import MaCrossTechnique
from deepcoin.techniques.macd_cross import MacdCrossTechnique
from deepcoin.techniques.minor_swing import MinorSwingTechnique
from deepcoin.techniques.rsi_swing import RsiSwingTechnique
from deepcoin.techniques.zigzag_causal import ZigzagCausalTechnique
_ALL_TECHNIQUES: list[BaseTechnique] = [
ZigzagCausalTechnique(),
MinorSwingTechnique(),
LocalExtremaTechnique(),
BbReversalTechnique(),
MaCrossTechnique(),
RsiSwingTechnique(),
MacdCrossTechnique(),
DonchianTechnique(),
]
def get_all_techniques() -> list[BaseTechnique]:
"""등록된 모든 매매 기법을 반환한다."""
return list(_ALL_TECHNIQUES)
def get_technique(technique_id: str) -> BaseTechnique | None:
"""ID로 기법을 조회한다."""
for technique in _ALL_TECHNIQUES:
if technique.technique_id == technique_id:
return technique
return None
def list_technique_ids() -> list[str]:
"""기법 ID 목록을 반환한다."""
return [t.technique_id for t in _ALL_TECHNIQUES]

View File

@@ -0,0 +1,74 @@
"""RSI 과매도·과매수 스윙."""
from __future__ import annotations
import pandas as pd
from deepcoin.techniques.base import BaseTechnique, TechniqueParams, TechniqueSignal
from deepcoin.techniques.indicators import rsi
class RsiSwingTechnique(BaseTechnique):
"""RSI 30 이탈 후 복귀 시 매수, 70 이탈 후 복귀 시 매도."""
technique_id = "rsi_swing"
technique_name = "RSI 스윙"
category = "indicator"
causal = True
description = "RSI(14) 과매도 반등 매수·과매수 하락 매도"
def default_extra_params(self) -> dict:
return {"period": 14, "oversold": 30.0, "overbought": 70.0}
def generate_signals(self, df: pd.DataFrame, params: TechniqueParams) -> list[TechniqueSignal]:
period = int(params.extra.get("period", 14))
oversold = float(params.extra.get("oversold", 30.0))
overbought = float(params.extra.get("overbought", 70.0))
close = df["close"].astype(float)
rsi_vals = rsi(close, period=period)
signals: list[TechniqueSignal] = []
was_oversold = False
was_overbought = False
for i in range(period + 1, len(df)):
if pd.isna(rsi_vals.iloc[i]):
continue
curr = float(rsi_vals.iloc[i])
prev = float(rsi_vals.iloc[i - 1])
price = float(close.iloc[i])
dt = pd.Timestamp(df.iloc[i]["datetime"]).strftime("%Y-%m-%d %H:%M:%S")
if curr < oversold:
was_oversold = True
if was_oversold and prev < oversold <= curr:
signals.append(
TechniqueSignal(
side="buy",
bar_index=i,
price=round(price, 2),
datetime=dt,
confidence=0.65,
reason="rsi_oversold_exit",
)
)
was_oversold = False
if curr > overbought:
was_overbought = True
if was_overbought and prev > overbought >= curr:
signals.append(
TechniqueSignal(
side="sell",
bar_index=i,
price=round(price, 2),
datetime=dt,
confidence=0.65,
reason="rsi_overbought_exit",
)
)
was_overbought = False
return signals

View File

@@ -0,0 +1,141 @@
"""매매 기법 실행 및 결과 저장."""
from __future__ import annotations
import json
from datetime import datetime
from pathlib import Path
from typing import Any
import pandas as pd
from deepcoin.data.candle_loader import load_candles
from deepcoin.data.intervals import interval_label
from deepcoin.evaluation.gt_align import align_with_ground_truth
from deepcoin.ground_truth.pnl import simulate_gt_pnl
from deepcoin.techniques.base import BaseTechnique, TechniqueParams, TechniqueResult
from deepcoin.techniques.legs import legs_to_signal_dicts, signals_to_legs, summarize_legs
from deepcoin.techniques.registry import get_all_techniques, get_technique
def run_technique(
technique: BaseTechnique,
df: pd.DataFrame,
params: TechniqueParams,
gt_result: dict[str, Any] | None = None,
tolerance_bars: int = 480,
) -> TechniqueResult:
"""단일 기법을 실행하고 GT 정합을 계산한다.
Args:
technique: 실행할 기법.
df: 캔들 DataFrame.
params: 실행 파라미터.
gt_result: Ground Truth JSON dict (정합 평가용).
tolerance_bars: GT 신호 매칭 허용 봉 수.
Returns:
TechniqueResult.
"""
merged_extra = {**technique.default_extra_params(), **params.extra}
run_params = TechniqueParams(
interval_min=params.interval_min,
lookback_days=params.lookback_days,
min_leg_pct=params.min_leg_pct,
initial_cash_krw=params.initial_cash_krw,
fee_rate=params.fee_rate,
extra=merged_extra,
)
raw_signals = technique.generate_signals(df, run_params)
legs = signals_to_legs(raw_signals, min_leg_pct=run_params.min_leg_pct)
summary = summarize_legs(legs)
pnl = simulate_gt_pnl(
legs,
initial_cash_krw=run_params.initial_cash_krw,
fee_rate=run_params.fee_rate,
)
alignment = None
if gt_result is not None:
alignment = align_with_ground_truth(
gt_result=gt_result,
technique_signals=[s.to_dict() for s in raw_signals],
technique_legs=legs,
tolerance_bars=tolerance_bars,
)
return TechniqueResult(
technique_id=technique.technique_id,
technique_name=technique.technique_name,
category=technique.category,
causal=technique.causal,
description=technique.description,
params={
"interval_min": run_params.interval_min,
"lookback_days": run_params.lookback_days,
"min_leg_pct": run_params.min_leg_pct,
"initial_cash_krw": run_params.initial_cash_krw,
"fee_rate": run_params.fee_rate,
**merged_extra,
},
signals=[s.to_dict() for s in raw_signals],
legs=legs,
summary=summary,
pnl=pnl,
alignment=alignment,
)
def run_all_techniques(
db_path: Path,
symbol: str,
params: TechniqueParams,
gt_result: dict[str, Any] | None = None,
tolerance_bars: int = 480,
technique_ids: list[str] | None = None,
) -> list[TechniqueResult]:
"""등록된 기법을 일괄 실행한다."""
df = load_candles(
db_path=db_path,
symbol=symbol,
interval_min=params.interval_min,
lookback_days=params.lookback_days,
)
techniques = get_all_techniques()
if technique_ids:
techniques = [t for t in techniques if t.technique_id in technique_ids]
results: list[TechniqueResult] = []
for technique in techniques:
results.append(
run_technique(
technique=technique,
df=df,
params=params,
gt_result=gt_result,
tolerance_bars=tolerance_bars,
)
)
return results
def save_technique_result(result: TechniqueResult, output_dir: Path) -> Path:
"""기법 결과를 JSON으로 저장한다."""
output_dir.mkdir(parents=True, exist_ok=True)
out_path = output_dir / f"{result.technique_id}.json"
payload = result.to_dict()
payload["meta"] = {
"generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"interval_label": interval_label(int(result.params["interval_min"])),
}
with out_path.open("w", encoding="utf-8") as fp:
json.dump(payload, fp, ensure_ascii=False, indent=2)
return out_path
def load_ground_truth(gt_path: Path) -> dict[str, Any]:
"""Ground Truth JSON을 로드한다."""
with gt_path.open(encoding="utf-8") as fp:
return json.load(fp)

View File

@@ -0,0 +1,112 @@
"""인과 ZigZag — 피벗 확정 시점에 매수·매도 신호."""
from __future__ import annotations
import pandas as pd
from deepcoin.techniques.base import BaseTechnique, TechniqueParams, TechniqueSignal
class ZigzagCausalTechnique(BaseTechnique):
"""되돌림 % 확정 시 피벗을 인정하는 인과 ZigZag 기법.
Ground Truth ZigZag와 동일 임계값을 쓰되, 미래 데이터 없이
피벗이 확정된 봉에서만 신호를 발생시킨다.
"""
technique_id = "zigzag_causal"
technique_name = "인과 ZigZag"
category = "swing"
causal = True
description = "되돌림 % 확정 시 스윙 저점 매수·고점 매도 (GT ZigZag 인과 버전)"
def default_extra_params(self) -> dict:
return {"reversal_pct": 5.0}
def generate_signals(self, df: pd.DataFrame, params: TechniqueParams) -> list[TechniqueSignal]:
reversal_pct = float(params.extra.get("reversal_pct", 5.0))
return find_causal_zigzag_signals(df, reversal_pct=reversal_pct)
def find_causal_zigzag_signals(
df: pd.DataFrame,
reversal_pct: float = 5.0,
) -> list[TechniqueSignal]:
"""인과 ZigZag 신호를 생성한다.
Args:
df: OHLCV DataFrame.
reversal_pct: 피벗 확정 최소 되돌림(%).
Returns:
시간순 신호 리스트.
"""
if len(df) < 3:
return []
threshold = reversal_pct / 100.0
signals: list[TechniqueSignal] = []
direction: str | None = None
extreme_idx = 0
extreme_price = float(df.iloc[0]["close"])
for i in range(1, len(df)):
high = float(df.iloc[i]["high"])
low = float(df.iloc[i]["low"])
if direction is None:
if high >= extreme_price * (1 + threshold):
direction = "up"
extreme_idx = i
extreme_price = high
elif low <= extreme_price * (1 - threshold):
direction = "down"
extreme_idx = i
extreme_price = low
continue
if direction == "up":
if high >= extreme_price:
extreme_price = high
extreme_idx = i
if low <= extreme_price * (1 - threshold):
signals.append(
_make_signal(df, i, extreme_idx, extreme_price, "sell", reversal_pct)
)
direction = "down"
extreme_idx = i
extreme_price = low
else:
if low <= extreme_price:
extreme_price = low
extreme_idx = i
if high >= extreme_price * (1 + threshold):
signals.append(
_make_signal(df, i, extreme_idx, extreme_price, "buy", reversal_pct)
)
direction = "up"
extreme_idx = i
extreme_price = high
return signals
def _make_signal(
df: pd.DataFrame,
confirm_idx: int,
pivot_idx: int,
pivot_price: float,
side: str,
reversal_pct: float,
) -> TechniqueSignal:
"""확정 봉 기준 TechniqueSignal을 생성한다."""
dt = pd.Timestamp(df.iloc[confirm_idx]["datetime"]).strftime("%Y-%m-%d %H:%M:%S")
return TechniqueSignal(
side=side,
bar_index=confirm_idx,
price=round(pivot_price, 2),
datetime=dt,
pivot_bar_index=pivot_idx,
confidence=min(1.0, reversal_pct / 10.0),
reason=f"zigzag_{side}_confirmed",
)