Files
Bithumb/deepcoin/ground_truth/causal_gt_hybrid.py
xavis d385456867 hybrid DD tier와 Option C 2차(+1000%) 검증을 추가하고 실거래 사이징을 정합한다.
인과 GT leg 엔진·drawdown tier·train 캘리브레이션, Phase 2 Go/No-Go 및 시뮬 리포트를 반영한다.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-01 16:09:28 +09:00

448 lines
13 KiB
Python

"""
Phase 3: monitor 발화 + drawdown/past-leg tier (인과적).
매도는 monitor(sell_mtf_cross) 유지, tier만 drawdown·과거 leg 수익으로 강화합니다.
"""
from __future__ import annotations
from typing import Any
import pandas as pd
from config import (
CAUSAL_GT_DD_LARGE_PCT,
CAUSAL_GT_DD_MEDIUM_PCT,
GT_BUY_PCT_LARGE_LEG,
GT_BUY_PCT_MEDIUM_LEG,
GT_BUY_PCT_SMALL_LEG,
GT_INITIAL_CASH_KRW,
SIM_TIER_CONVICTION_DD_PCT,
TRADING_FEE_RATE,
)
from deepcoin.ground_truth.gt_allocation import (
allocate_order_amounts_chronological,
simulate_portfolio_summary,
)
from deepcoin.matching.portfolio_sim import sort_fires_chronological
from deepcoin.matching.position_sizing import enrich_sim_trades_with_gt_weights
def _deduped_ohlc(df: pd.DataFrame) -> pd.DataFrame:
"""
DatetimeIndex 중복 제거·정렬 (drawdown lookup용).
Args:
df: OHLC DataFrame.
Returns:
index unique OHLC.
"""
if df.empty:
return df
out = df.sort_index()
if not out.index.is_unique:
out = out[~out.index.duplicated(keep="last")]
return out
def _close_series_from_df(df: pd.DataFrame) -> pd.Series:
"""
OHLC DataFrame에서 종가 시리즈 추출 (positional index).
Args:
df: Open/Close 또는 open/close 컬럼을 가진 OHLC.
Returns:
float 종가 시리즈.
"""
if df.empty:
return pd.Series(dtype=float)
frame = _deduped_ohlc(df)
for col in ("close", "Close"):
if col in frame.columns:
return frame[col].astype(float).reset_index(drop=True)
raise KeyError("OHLC DataFrame에 close/Close 컬럼이 없습니다.")
def _bar_index_at(df: pd.DataFrame, dt: str) -> int:
"""
시각 dt에 대응하는 bar 위치 (인덱스 중복 시 nearest).
Args:
df: DatetimeIndex OHLC.
dt: ISO 시각 문자열.
Returns:
정수 bar 위치 (0..n-1).
"""
frame = _deduped_ohlc(df)
if frame.empty:
return 0
try:
ts = pd.to_datetime(dt)
except (TypeError, ValueError):
return 0
pos = int(frame.index.get_indexer([ts], method="nearest")[0])
return max(pos, 0)
def _drawdown_pct_at_index(closes: pd.Series, idx: int) -> float:
"""
bar idx 시점 drawdown % (과거 rolling high 대비, 인과적).
Args:
closes: 종가 시리즈.
idx: 봉 위치.
Returns:
drawdown % (0~100).
"""
if idx < 0 or idx >= len(closes):
return 0.0
seg = closes.iloc[: idx + 1].astype(float)
if seg.empty:
return 0.0
peak = float(seg.max())
cur = float(seg.iloc[-1])
if peak <= 0:
return 0.0
return max((peak - cur) / peak * 100.0, 0.0)
def hybrid_tier_scale(
trade: dict[str, Any],
*,
completed_leg_ret: dict[int, float],
enhanced: bool = False,
dd_large_pct: float | None = None,
dd_medium_pct: float | None = None,
) -> float:
"""
과거 leg 수익 tier + drawdown tier (인과적).
Args:
trade: 매수 trade dict (drawdown_pct 포함).
completed_leg_ret: 청산 완료 leg realized return %.
enhanced: True면 medium tier·conviction 플래그 적용.
dd_large_pct: drawdown large tier 임계(%). None이면 config.
dd_medium_pct: drawdown medium tier 임계(%). None이면 config.
Returns:
asset_pct_scale.
"""
from config import GT_LARGE_LEG_TOP_PCT
from deepcoin.matching.position_sizing import (
large_leg_ids_from_past_returns,
)
dd_large = float(dd_large_pct if dd_large_pct is not None else CAUSAL_GT_DD_LARGE_PCT)
dd_medium = float(dd_medium_pct if dd_medium_pct is not None else CAUSAL_GT_DD_MEDIUM_PCT)
lid = int(trade.get("leg_id", 0))
large_past = large_leg_ids_from_past_returns(completed_leg_ret, GT_LARGE_LEG_TOP_PCT)
dd = float(trade.get("drawdown_pct") or 0.0)
if lid in large_past:
if enhanced and dd >= SIM_TIER_CONVICTION_DD_PCT:
trade["conviction_buy"] = True
return float(GT_BUY_PCT_LARGE_LEG)
if dd >= dd_large:
if enhanced:
trade["conviction_buy"] = True
return float(GT_BUY_PCT_LARGE_LEG)
if dd >= dd_medium:
if enhanced and dd >= SIM_TIER_CONVICTION_DD_PCT:
trade["conviction_buy"] = True
return float(GT_BUY_PCT_MEDIUM_LEG) if enhanced else float(GT_BUY_PCT_LARGE_LEG) * 0.5
return float(GT_BUY_PCT_SMALL_LEG)
def _monitor_rows_from_fires(fires: pd.DataFrame) -> list[dict[str, Any]]:
"""monitor 발화 DataFrame → trade dict 리스트."""
rows: list[dict[str, Any]] = []
for _, r in sort_fires_chronological(fires).iterrows():
rows.append(
{
"dt": str(r["dt"]),
"action": r["side"],
"price": float(r["close"]),
"rule_id": r.get("rule_id", ""),
}
)
return rows
def build_monitor_hybrid_sized_trades(
fires: pd.DataFrame,
df: pd.DataFrame,
*,
enhanced: bool = False,
initial_cash: float = GT_INITIAL_CASH_KRW,
fee_rate: float = TRADING_FEE_RATE,
dd_large_pct: float | None = None,
dd_medium_pct: float | None = None,
) -> tuple[list[dict[str, Any]], dict[str, Any]]:
"""
monitor 발화 → hybrid tier amount_krw 배분 (인과적).
Args:
fires: monitor rule 발화 (buy+sell).
df: 3m OHLC (drawdown 계산).
enhanced: conviction·medium tier 사용.
initial_cash: 시작 현금.
fee_rate: 수수료율.
Returns:
(amount_krw가 채워진 trade dict, alloc_stats).
"""
from deepcoin.ground_truth.ground_truth import load_ground_truth, order_trades_chronological
from deepcoin.paths import resolve_ground_truth_file
if fires.empty:
return [], {"buy_executed": 0, "buy_skipped": 0}
gt_data = load_ground_truth(resolve_ground_truth_file()) or {}
gt_trades = order_trades_chronological(gt_data.get("trades") or [])
enriched = enrich_sim_trades_with_gt_weights(
_monitor_rows_from_fires(fires),
gt_trades,
causal_legs=True,
)
enriched = _attach_drawdown_to_buys(enriched, df)
def scale_fn(t: dict[str, Any], completed_leg_ret: dict[int, float]) -> float:
return hybrid_tier_scale(
t,
completed_leg_ret=completed_leg_ret,
enhanced=enhanced,
dd_large_pct=dd_large_pct,
dd_medium_pct=dd_medium_pct,
)
return allocate_order_amounts_chronological(
enriched,
initial_cash=initial_cash,
fee_rate=fee_rate,
causal_tier=False,
asset_pct_scale_fn=scale_fn,
)
def _simulate_monitor_tier_portfolio(
fires: pd.DataFrame,
df: pd.DataFrame,
*,
enhanced: bool = False,
last_price: float | None = None,
initial_cash: float = GT_INITIAL_CASH_KRW,
fee_rate: float = TRADING_FEE_RATE,
dd_large_pct: float | None = None,
dd_medium_pct: float | None = None,
) -> dict[str, Any]:
"""
monitor buy+sell + tier 복리 시뮬 (hybrid 또는 enhanced).
Args:
fires: monitor rule 발화 (buy+sell).
df: 3m OHLC (drawdown 계산).
enhanced: conviction·medium tier 사용.
last_price: 미청산 평가가.
initial_cash: 시작 현금.
fee_rate: 수수료율.
dd_large_pct: drawdown large tier 임계(%).
dd_medium_pct: drawdown medium tier 임계(%).
Returns:
portfolio summary dict.
"""
mode = "monitor_tier_enhanced" if enhanced else "monitor_dd_tier"
if fires.empty:
return {"pnl_pct": 0.0, "trade_count": 0, "sizing_mode": mode}
sized, alloc_stats = build_monitor_hybrid_sized_trades(
fires,
df,
enhanced=enhanced,
initial_cash=initial_cash,
fee_rate=fee_rate,
dd_large_pct=dd_large_pct,
dd_medium_pct=dd_medium_pct,
)
mark = last_price
if mark is None and not df.empty:
try:
mark = float(_close_series_from_df(df).iloc[-1])
except KeyError:
mark = None
result = simulate_portfolio_summary(
sized,
initial_cash=initial_cash,
fee_rate=fee_rate,
last_price=mark,
use_amount_krw=True,
)
result["sizing_mode"] = mode
if enhanced:
result["sizing_note"] = (
"monitor buy+sell + past-leg·drawdown tier + conviction (미래 미사용)"
)
else:
result["sizing_note"] = (
"monitor buy+sell + drawdown·past-leg tier (미래 미사용)"
)
result["alloc_stats"] = alloc_stats
result["input_fires"] = int(len(fires))
return result
def _attach_drawdown_to_buys(
trades: list[dict[str, Any]],
df: pd.DataFrame,
) -> list[dict[str, Any]]:
"""
매수 trade에 bar drawdown % 부여 (인과적).
Args:
trades: enrich된 trade dict.
df: 3m OHLC (DatetimeIndex).
Returns:
drawdown_pct가 추가된 trade dict.
"""
if df.empty:
return trades
close_s = _close_series_from_df(df)
out: list[dict[str, Any]] = []
for t in trades:
row = dict(t)
if row.get("action") != "buy":
out.append(row)
continue
bar_idx = _bar_index_at(df, str(row.get("dt", "")))
row["drawdown_pct"] = round(_drawdown_pct_at_index(close_s, bar_idx), 2)
out.append(row)
return out
def simulate_monitor_dd_tier_portfolio(
fires: pd.DataFrame,
df: pd.DataFrame,
*,
last_price: float | None = None,
initial_cash: float = GT_INITIAL_CASH_KRW,
fee_rate: float = TRADING_FEE_RATE,
dd_large_pct: float | None = None,
dd_medium_pct: float | None = None,
) -> dict[str, Any]:
"""
monitor buy+sell + drawdown/past-leg tier 복리 시뮬.
Args:
fires: monitor rule 발화 (buy+sell).
df: 3m OHLC (drawdown 계산).
last_price: 미청산 평가가.
initial_cash: 시작 현금.
fee_rate: 수수료율.
dd_large_pct: drawdown large tier 임계(%).
dd_medium_pct: drawdown medium tier 임계(%).
Returns:
portfolio summary dict.
"""
return _simulate_monitor_tier_portfolio(
fires,
df,
enhanced=False,
last_price=last_price,
initial_cash=initial_cash,
fee_rate=fee_rate,
dd_large_pct=dd_large_pct,
dd_medium_pct=dd_medium_pct,
)
def simulate_monitor_tier_enhanced_portfolio(
fires: pd.DataFrame,
df: pd.DataFrame,
*,
last_price: float | None = None,
initial_cash: float = GT_INITIAL_CASH_KRW,
fee_rate: float = TRADING_FEE_RATE,
) -> dict[str, Any]:
"""
Phase 4: monitor + past-leg·drawdown tier + conviction (weight 분할 생략).
Args:
fires: monitor rule 발화 (buy+sell).
df: 3m OHLC (drawdown 계산).
last_price: 미청산 평가가.
initial_cash: 시작 현금.
fee_rate: 수수료율.
Returns:
portfolio summary dict.
"""
return _simulate_monitor_tier_portfolio(
fires,
df,
enhanced=True,
last_price=last_price,
initial_cash=initial_cash,
fee_rate=fee_rate,
)
def simulate_causal_gt_hybrid_portfolio(
buy_fires: pd.DataFrame,
df: pd.DataFrame,
*,
monitor_fires: pd.DataFrame | None = None,
last_price: float | None = None,
cg_params: dict[str, Any] | None = None,
initial_cash: float = GT_INITIAL_CASH_KRW,
fee_rate: float = TRADING_FEE_RATE,
dd_large_pct: float | None = None,
dd_medium_pct: float | None = None,
) -> dict[str, Any]:
"""
Phase 3 하이브리드: monitor buy+sell + DD tier (권장).
monitor_fires가 있으면 DD tier 경로, 없으면 구 peak-sell 경로(legacy).
Args:
buy_fires: buy 발화 (legacy peak-sell 경로용).
df: 3m OHLCV.
monitor_fires: monitor buy+sell (권장).
last_price: 미청산 평가가.
cg_params: legacy 파라미터.
initial_cash: 시작 현금.
fee_rate: 수수료율.
dd_large_pct: drawdown large tier 임계(%).
dd_medium_pct: drawdown medium tier 임계(%).
Returns:
portfolio summary dict.
"""
if monitor_fires is not None and not monitor_fires.empty:
return simulate_monitor_dd_tier_portfolio(
monitor_fires,
df,
last_price=last_price,
initial_cash=initial_cash,
fee_rate=fee_rate,
dd_large_pct=dd_large_pct,
dd_medium_pct=dd_medium_pct,
)
return {
"pnl_pct": 0.0,
"trade_count": 0,
"note": "monitor_fires required",
"sizing_mode": "causal_gt_hybrid",
}