"""Generate ground-truth (GT) buy/sell points (stage 1; input to the stage-0 sim)."""

from __future__ import annotations

import json
from collections import Counter
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
from typing import Any

import pandas as pd

from bithumb.data.candle_loader import load_candles
from bithumb.data.intervals import interval_label
from bithumb.ground_truth.pnl import simulate_gt_pnl
from bithumb.ground_truth.breakout import find_breakout_buy_pivots
from bithumb.ground_truth.divergence import find_divergence_signals
from bithumb.ground_truth.pullback import find_pullback_buy_pivots
from bithumb.ground_truth.zigzag import Pivot, find_zigzag_pivots

# Timestamp format used for every datetime string written into the GT result.
_DATETIME_FMT = "%Y-%m-%d %H:%M:%S"

# Auxiliary buys (pullback / breakout) within this many bars of an existing
# buy marker are dropped.
_NEARBY_TOLERANCE_BARS = 120

# Divergence signals within this many bars of an existing same-side marker
# are dropped.
_DIV_TOLERANCE_BARS = 400

# Result "mode" label per chart tier; unknown tiers fall back to the v3 label.
_MODE_BY_TIER = {
    "v1": "optimal_swing_legs",
    "v2": "optimal_swing_legs_with_pullback",
    "v3": "optimal_swing_legs_with_pullback_breakout_divergence",
}


@dataclass(frozen=True)
class GtParams:
    """Ground-truth generation parameters."""

    # Candle loading (forwarded to load_candles).
    interval_min: int
    lookback_days: int
    # ZigZag pivot detection and leg filtering.
    zigzag_reversal_pct: float
    min_leg_pct: float
    # Pullback buys (forwarded to find_pullback_buy_pivots).
    pullback_min_pct: float = 1.5
    pullback_local_order: int = 10
    # Breakout buys (forwarded to find_breakout_buy_pivots).
    breakout_buffer_pct: float = 0.1
    breakout_consolidation_bars: int = 200
    breakout_min_rally_pct: float = 2.0
    # Divergence signals (forwarded to find_divergence_signals).
    div_local_order: int = 20
    div_min_bars_between: int = 1500
    div_min_rsi_diff: float = 5.0
    div_min_future_move_pct: float = 4.0
    # Chart version: "v1", "v2" or "v3" (case-insensitive).
    chart_tier: str = "v3"


def _tier_flags(tier: str) -> tuple[bool, bool, bool]:
    """Return which auxiliary signals a chart tier includes.

    The result is ``(pullback, breakout, divergence)``.

    v1: ZigZag swings only (at least one buy and one sell per leg)
    v2: swings + pullback
    v3: v2 + breakout + divergence

    Any value other than "v1"/"v2" (case-insensitive) is treated as v3.
    """
    tier = tier.lower()
    if tier == "v1":
        return False, False, False
    if tier == "v2":
        return True, False, False
    return True, True, True


@dataclass
class GtLeg:
    """One buy→sell leg (a maximum swing-profit span)."""

    leg_id: int
    buy_datetime: str
    buy_price: float
    buy_bar_index: int
    sell_datetime: str
    sell_price: float
    sell_bar_index: int
    leg_pct: float
    bars_held: int


def build_ground_truth(
    db_path: Path,
    symbol: str,
    coin_name: str,
    params: GtParams,
    initial_cash_krw: float = 400_000.0,
    fee_rate: float = 0.0005,
) -> dict[str, Any]:
    """Build the hindsight-optimal swing-leg (one buy, one sell) ground truth.

    Candles are loaded for the last ``params.lookback_days`` days, and future
    data is deliberately used to find ZigZag swing-low buy / swing-high sell
    pairs. This is the stage-1 benchmark: the points that capture the maximum
    swing profit. Depending on ``params.chart_tier``, pullback, breakout and
    divergence signals are added on top of the swing legs.

    Args:
        db_path: SQLite path.
        symbol: Coin symbol.
        coin_name: Coin name.
        params: GT parameters.
        initial_cash_krw: Initial capital used for the PnL simulation.
        fee_rate: Trading fee rate.

    Returns:
        A JSON-serializable GT result dict with the keys ``meta``, ``legs``,
        ``signals``, ``summary`` and ``pnl``.
    """
    df = load_candles(
        db_path=db_path,
        symbol=symbol,
        interval_min=params.interval_min,
        lookback_days=params.lookback_days,
    )
    pivots = find_zigzag_pivots(df, reversal_pct=params.zigzag_reversal_pct)
    legs = _pivots_to_legs(pivots, min_leg_pct=params.min_leg_pct)
    leg_dicts = [asdict(leg) for leg in legs]

    tier = params.chart_tier.lower()
    include_pullback, include_breakout, include_divergence = _tier_flags(tier)

    pullback_buys: list[Pivot] = []
    if include_pullback:
        pullback_buys = find_pullback_buy_pivots(
            df,
            legs=legs,
            min_pullback_pct=params.pullback_min_pct,
            local_order=params.pullback_local_order,
        )

    breakout_buys: list[Any] = []
    if include_breakout:
        breakout_buys = find_breakout_buy_pivots(
            df,
            legs=legs,
            pullback_buys=pullback_buys,
            breakout_buffer_pct=params.breakout_buffer_pct,
            consolidation_bars=params.breakout_consolidation_bars,
            min_rally_to_sell_pct=params.breakout_min_rally_pct,
        )

    div_buys: list[Any] = []
    div_sells: list[Any] = []
    if include_divergence:
        div_buys, div_sells = find_divergence_signals(
            df,
            local_order=params.div_local_order,
            min_bars_between=params.div_min_bars_between,
            min_rsi_diff=params.div_min_rsi_diff,
            min_future_move_pct=params.div_min_future_move_pct,
        )

    # Unknown tiers behave like v3 (see _tier_flags), so they get the v3 label;
    # meta["chart_tier"] still echoes the caller's (lower-cased) value.
    mode = _MODE_BY_TIER.get(tier, _MODE_BY_TIER["v3"])

    signals = _build_signals(legs, pullback_buys, breakout_buys, div_buys, div_sells)
    summary = _summarize(legs, signals)
    pnl = simulate_gt_pnl(leg_dicts, initial_cash_krw=initial_cash_krw, fee_rate=fee_rate)

    return {
        "meta": {
            "symbol": symbol.upper(),
            "coin_name": coin_name,
            "interval_min": params.interval_min,
            "interval_label": interval_label(params.interval_min),
            "lookback_days": params.lookback_days,
            "chart_tier": tier,
            "mode": mode,
            "zigzag_reversal_pct": params.zigzag_reversal_pct,
            "min_leg_pct": params.min_leg_pct,
            "pullback_min_pct": params.pullback_min_pct,
            "initial_cash_krw": initial_cash_krw,
            "generated_at": datetime.now().strftime(_DATETIME_FMT),
            "data_from": str(df["datetime"].min()),
            "data_to": str(df["datetime"].max()),
            "bar_count": len(df),
            "pivot_count": len(pivots),
            "pullback_buy_count": len(pullback_buys),
            "breakout_buy_count": len(breakout_buys),
            "breakout_buffer_pct": params.breakout_buffer_pct,
            "divergence_buy_count": len(div_buys),
            "divergence_sell_count": len(div_sells),
        },
        "legs": leg_dicts,
        "signals": signals,
        "summary": summary,
        "pnl": pnl,
    }


def save_ground_truth(result: dict[str, Any], output_path: Path) -> Path:
    """Save a GT result as UTF-8 JSON, creating parent directories as needed.

    Args:
        result: GT result dict (as returned by ``build_ground_truth``).
        output_path: Destination file path.

    Returns:
        ``output_path``, unchanged.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("w", encoding="utf-8") as fp:
        json.dump(result, fp, ensure_ascii=False, indent=2)
    return output_path


def _pivots_to_legs(pivots: list[Pivot], min_leg_pct: float) -> list[GtLeg]:
    """Convert adjacent swing-low → swing-high pivots into one-buy/one-sell legs.

    A pair ``(pivots[i], pivots[i + 1])`` becomes a leg only when it is a
    "low" followed by a later "high", the buy price is positive, and the gain
    is at least ``min_leg_pct`` percent. Accepted pairs consume both pivots;
    rejected pairs advance by a single pivot.

    Args:
        pivots: ZigZag pivots in chronological order.
        min_leg_pct: Minimum percentage gain for a leg to be kept.

    Returns:
        Legs numbered from 1 in order of appearance.
    """
    legs: list[GtLeg] = []
    i = 0
    while i < len(pivots) - 1:
        buy_pivot = pivots[i]
        sell_pivot = pivots[i + 1]

        is_low_to_high = buy_pivot.side == "low" and sell_pivot.side == "high"
        if (
            not is_low_to_high
            or sell_pivot.bar_index <= buy_pivot.bar_index
            # A non-positive buy price would make the percentage below raise
            # ZeroDivisionError or evaluate to inf / a sign-flipped value.
            or buy_pivot.price <= 0
        ):
            i += 1
            continue

        leg_pct = (sell_pivot.price - buy_pivot.price) / buy_pivot.price * 100.0
        if leg_pct < min_leg_pct:
            i += 1
            continue

        legs.append(
            GtLeg(
                leg_id=len(legs) + 1,
                buy_datetime=buy_pivot.datetime.strftime(_DATETIME_FMT),
                buy_price=round(buy_pivot.price, 2),
                buy_bar_index=buy_pivot.bar_index,
                sell_datetime=sell_pivot.datetime.strftime(_DATETIME_FMT),
                sell_price=round(sell_pivot.price, 2),
                sell_bar_index=sell_pivot.bar_index,
                leg_pct=round(leg_pct, 2),
                bars_held=sell_pivot.bar_index - buy_pivot.bar_index,
            )
        )
        # Both pivots of the pair are consumed by this leg.
        i += 2
    return legs


def _build_signals(
    legs: list[GtLeg],
    pullback_buys: list[Pivot],
    breakout_buys: list,
    div_buys: list,
    div_sells: list,
    *,
    nearby_tolerance: int = _NEARBY_TOLERANCE_BARS,
    div_tolerance: int = _DIV_TOLERANCE_BARS,
) -> list[dict[str, Any]]:
    """Merge swing, pullback, breakout and divergence signals into one list.

    Swing signals are always kept. Auxiliary signals are dropped when they lie
    within the tolerance of an already accepted same-side marker; each accepted
    auxiliary signal is itself taken into account for later ones. Buy and sell
    markers are numbered independently, starting at 1.

    Args:
        legs: Swing legs; each yields one "swing_low" buy and one "swing_high" sell.
        pullback_buys: Candidate pullback buy pivots.
        breakout_buys: Candidate breakout buy signals.
        div_buys: Candidate bullish divergence signals.
        div_sells: Candidate bearish divergence signals.
        nearby_tolerance: Bar distance within which pullback/breakout buys are
            considered duplicates of an existing buy.
        div_tolerance: Bar distance within which divergence signals are
            considered duplicates of an existing same-side marker.

    Returns:
        Signal dicts sorted by bar index, then by signal-type priority.
    """
    signals: list[dict[str, Any]] = []
    buy_marker_id = 0
    sell_marker_id = 0
    existing_buy_bars: set[int] = {leg.buy_bar_index for leg in legs}
    existing_sell_bars: set[int] = {leg.sell_bar_index for leg in legs}

    for leg in legs:
        buy_marker_id += 1
        signals.append(
            {
                "marker_id": buy_marker_id,
                "leg_id": leg.leg_id,
                "side": "buy",
                "signal_type": "swing_low",
                "datetime": leg.buy_datetime,
                "price": leg.buy_price,
                "bar_index": leg.buy_bar_index,
            }
        )
        sell_marker_id += 1
        signals.append(
            {
                "marker_id": sell_marker_id,
                "leg_id": leg.leg_id,
                "side": "sell",
                "signal_type": "swing_high",
                "datetime": leg.sell_datetime,
                "price": leg.sell_price,
                "bar_index": leg.sell_bar_index,
                "leg_pct": leg.leg_pct,
            }
        )

    for pivot in pullback_buys:
        if _is_near_existing_buy(pivot.bar_index, existing_buy_bars, nearby_tolerance):
            continue
        buy_marker_id += 1
        existing_buy_bars.add(pivot.bar_index)
        signals.append(
            {
                "marker_id": buy_marker_id,
                "leg_id": None,
                "side": "buy",
                "signal_type": "pullback",
                "datetime": pivot.datetime.strftime(_DATETIME_FMT),
                "price": round(pivot.price, 2),
                "bar_index": pivot.bar_index,
            }
        )

    for breakout in breakout_buys:
        if _is_near_existing_buy(breakout.bar_index, existing_buy_bars, nearby_tolerance):
            continue
        buy_marker_id += 1
        existing_buy_bars.add(breakout.bar_index)
        signals.append(
            {
                "marker_id": buy_marker_id,
                "leg_id": breakout.leg_id,
                "side": "buy",
                "signal_type": "breakout",
                "datetime": breakout.datetime.strftime(_DATETIME_FMT),
                "price": breakout.price,
                "bar_index": breakout.bar_index,
                "resistance_price": breakout.resistance_price,
            }
        )

    for div in div_buys:
        if _is_near_bar(div.bar_index, existing_buy_bars, div_tolerance):
            continue
        buy_marker_id += 1
        existing_buy_bars.add(div.bar_index)
        signals.append(_divergence_to_dict(div, buy_marker_id, "div_bull"))

    for div in div_sells:
        if _is_near_bar(div.bar_index, existing_sell_bars, div_tolerance):
            continue
        sell_marker_id += 1
        existing_sell_bars.add(div.bar_index)
        signals.append(_divergence_to_dict(div, sell_marker_id, "div_bear"))

    signals.sort(key=lambda s: (s["bar_index"], _signal_sort_key(s)))
    return signals


def _divergence_to_dict(div, marker_id: int, signal_type: str) -> dict[str, Any]:
    """Convert a divergence signal object into a GT signal dict.

    Args:
        div: Divergence signal; must expose ``side``, ``datetime``, ``price``,
            ``bar_index``, ``indicator``, ``price_prev``, ``ind_prev`` and
            ``ind_curr``.
        marker_id: Marker number to assign.
        signal_type: Signal type label (e.g. "div_bull" or "div_bear").

    Returns:
        The signal dict; ``leg_id`` is always ``None``.
    """
    return {
        "marker_id": marker_id,
        "leg_id": None,
        "side": div.side,
        "signal_type": signal_type,
        "datetime": div.datetime.strftime(_DATETIME_FMT),
        "price": div.price,
        "bar_index": div.bar_index,
        "indicator": div.indicator,
        "price_prev": div.price_prev,
        "ind_prev": div.ind_prev,
        "ind_curr": div.ind_curr,
    }


def _signal_sort_key(signal: dict[str, Any]) -> int:
    """Return the ordering priority of a signal type within the same bar.

    Unknown or missing signal types sort last (priority 9).
    """
    order = {
        "swing_low": 0,
        "pullback": 1,
        "breakout": 2,
        "div_bull": 3,
        "swing_high": 4,
        "div_bear": 5,
    }
    return order.get(signal.get("signal_type", ""), 9)


def _is_near_bar(bar_index: int, existing_bars: set[int], tolerance: int) -> bool:
    """Return True when ``bar_index`` is within ``tolerance`` bars of any existing bar.

    Used to drop auxiliary signals that sit too close to an existing marker.
    """
    return any(abs(bar_index - existing) <= tolerance for existing in existing_bars)


def _is_near_existing_buy(bar_index: int, existing_bars: set[int], tolerance: int) -> bool:
    """Return True when an auxiliary buy is too close to an existing buy marker."""
    return _is_near_bar(bar_index, existing_bars, tolerance)


def _summarize(legs: list[GtLeg], signals: list[dict[str, Any]]) -> dict[str, Any]:
    """Compute GT summary statistics.

    Args:
        legs: Swing legs.
        signals: Merged signal dicts; each must have a "side" key.

    Returns:
        Counts per side and per auxiliary signal type, plus leg statistics
        (all 0.0 when there are no legs).
    """
    side_counts = Counter(s["side"] for s in signals)
    type_counts = Counter(s.get("signal_type") for s in signals)

    summary: dict[str, Any] = {
        "leg_count": len(legs),
        "buy_count": side_counts["buy"],
        "sell_count": side_counts["sell"],
        "pullback_buy_count": type_counts["pullback"],
        "breakout_buy_count": type_counts["breakout"],
        "divergence_buy_count": type_counts["div_bull"],
        "divergence_sell_count": type_counts["div_bear"],
        "avg_leg_pct": 0.0,
        "median_leg_pct": 0.0,
        "max_leg_pct": 0.0,
        "min_leg_pct": 0.0,
        "avg_bars_held": 0.0,
    }
    if not legs:
        return summary

    pcts = [leg.leg_pct for leg in legs]
    bars = [leg.bars_held for leg in legs]
    summary.update(
        {
            "avg_leg_pct": round(sum(pcts) / len(pcts), 2),
            "median_leg_pct": round(float(pd.Series(pcts).median()), 2),
            "max_leg_pct": round(max(pcts), 2),
            "min_leg_pct": round(min(pcts), 2),
            "avg_bars_held": round(sum(bars) / len(bars), 1),
        }
    )
    return summary