Files
Bithumb/deepcoin/ground_truth/causal_gt_trades.py
xavis d385456867 hybrid DD tier와 Option C 2차(+1000%) 검증을 추가하고 실거래 사이징을 정합한다.
인과 GT leg 엔진·drawdown tier·train 캘리브레이션, Phase 2 Go/No-Go 및 시뮬 리포트를 반영한다.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-01 16:09:28 +09:00

434 lines
13 KiB
Python

"""
인과적 GT leg 타점 생성 — t 시점까지 데이터만 사용.
GT split_buy_peak_sell 과 동일 구조(분할매수·65/35 매도·leg_id)이나
피벗·leg 종료는 gt_signal_causal 확정 신호만 사용합니다.
"""
from __future__ import annotations
from typing import Any, Literal
import pandas as pd
from config import (
GT_BUY_MIN_BARS,
GT_BUY_MIN_SWING_PCT,
GT_MAX_BUYS_PER_LEG,
GT_MAX_SELLS_PER_LEG,
GT_MIN_SWING_PCT,
GT_PIVOT_ORDER,
GT_SELL_SPLIT_GAP_PCT,
)
from deepcoin.ground_truth.gt_model import leg_entry_weights, leg_exit_weights
from deepcoin.ground_truth.gt_signal_causal import enrich_scan_frame_gt_signals_causal
# Selects which peak-signal column closes a leg:
# "local" -> "gt_peak_local", anything else ("zigzag") -> "gt_peak_zigzag".
PeakMode = Literal["zigzag", "local"]
def _collect_causal_buy_bars(
frame: pd.DataFrame,
start: pd.Timestamp,
end: pd.Timestamp,
*,
min_bars: int,
max_buys: int,
use_local_trough: bool,
bb_max: float,
) -> list[tuple[pd.Timestamp, float]]:
"""
leg 구간 (start, end) 내 인과적 매수 후보 봉.
Args:
frame: gt_buy_signal 등 포함.
start: 이전 매도 시각(미포함).
end: leg 종료 peak 시각(포함).
min_bars: 분할 매수 최소 간격.
max_buys: leg당 최대 매수.
use_local_trough: True면 gt_trough_local+BB, False면 gt_buy_signal.
bb_max: BB %B 상한.
Returns:
(dt, low_price) 리스트 (시간순).
"""
seg = frame[(frame.index > start) & (frame.index <= end)]
if seg.empty:
return []
if use_local_trough:
bb = pd.to_numeric(seg.get("bb_pos"), errors="coerce")
mask = (seg["gt_trough_local"] == 1) & (bb <= bb_max)
else:
mask = seg["gt_buy_signal"] == 1
cands: list[tuple[pd.Timestamp, float, int]] = []
for ts, row in seg[mask].iterrows():
price = float(row["Low"]) if "Low" in row else float(row.get("close", 0))
if price <= 0:
continue
idx = frame.index.get_loc(ts)
if isinstance(idx, slice):
idx = int(idx.start or 0)
cands.append((ts, price, int(idx)))
cands.sort(key=lambda x: x[0])
filtered: list[tuple[pd.Timestamp, float, int]] = []
for ts, price, idx in cands:
if filtered and idx - filtered[-1][2] < min_bars:
if price < filtered[-1][1]:
filtered[-1] = (ts, price, idx)
continue
filtered.append((ts, price, idx))
if len(filtered) > max_buys:
filtered.sort(key=lambda x: x[1])
filtered = sorted(filtered[:max_buys], key=lambda x: x[0])
return [(ts, price) for ts, price, _ in filtered]
def _causal_sell_points(
frame: pd.DataFrame,
peak_ts: pd.Timestamp,
max_splits: int,
*,
peak_signal_col: str = "gt_peak_zigzag",
) -> list[tuple[pd.Timestamp, float, float]]:
"""
인과적 매도: peak 확정봉 + (선택) 직후 확정 peak 1건 분할.
Args:
frame: OHLC + gt peak 컬럼.
peak_ts: leg 종료 peak 시각.
max_splits: 최대 분할(2).
peak_signal_col: 두 번째 분할 탐색 컬럼.
Returns:
(dt, high_price, weight) 리스트.
"""
if peak_ts not in frame.index:
return []
row = frame.loc[peak_ts]
if isinstance(row, pd.DataFrame):
row = row.iloc[-1]
main_price = float(row["High"]) if "High" in row else float(row.get("close", 0))
weights = leg_exit_weights(max_splits if max_splits >= 2 else 1)
if max_splits < 2 or len(weights) < 2:
return [(peak_ts, main_price, 1.0)]
peak_idx = frame.index.get_loc(peak_ts)
if isinstance(peak_idx, slice):
peak_idx = int(peak_idx.start or 0)
seg = frame.iloc[peak_idx + 1 : peak_idx + 81]
second_ts: pd.Timestamp | None = None
second_price = main_price
for ts, srow in seg.iterrows():
if int(srow.get(peak_signal_col, 0)) != 1:
continue
px = float(srow["High"]) if "High" in srow else float(srow.get("close", 0))
gap = abs(px - main_price) / max(main_price, 1e-9) * 100.0
if gap <= GT_SELL_SPLIT_GAP_PCT:
second_ts = ts
second_price = px
break
if second_ts is None:
return [(peak_ts, main_price, 1.0)]
return [
(peak_ts, main_price, weights[0]),
(second_ts, second_price, weights[1]),
]
def _peak_signal_column(peak_mode: PeakMode) -> str:
"""leg 종료 peak 컬럼명."""
return "gt_peak_local" if peak_mode == "local" else "gt_peak_zigzag"
def _filter_peak_times(
frame: pd.DataFrame,
peak_col: str,
min_bars: int,
) -> list[pd.Timestamp]:
"""
peak 후보를 min_bars 간격으로稀疏화 (인과적, 시간순).
Args:
frame: OHLC frame.
peak_col: peak 신호 컬럼.
min_bars: 최소 봉 간격.
Returns:
peak 타임스탬프 리스트.
"""
peaks = frame.index[frame[peak_col] == 1]
if len(peaks) == 0:
return []
kept: list[pd.Timestamp] = []
last_idx = -min_bars
for ts in peaks:
idx = frame.index.get_loc(ts)
if isinstance(idx, slice):
idx = int(idx.start or 0)
if idx - last_idx >= min_bars:
kept.append(ts)
last_idx = int(idx)
return kept
def _precompute_buy_candidates(
frame: pd.DataFrame,
*,
use_local_trough: bool,
bb_max: float,
) -> list[tuple[int, pd.Timestamp, float]]:
"""
전구간 매수 후보 (bar_idx, ts, price).
Args:
frame: enriched frame.
use_local_trough: local trough vs zigzag buy.
bb_max: BB 상한.
Returns:
(idx, ts, price) 리스트.
"""
if use_local_trough:
bb = pd.to_numeric(frame.get("bb_pos"), errors="coerce")
mask = (frame["gt_trough_local"] == 1) & (bb <= bb_max)
else:
mask = frame["gt_buy_signal"] == 1
out: list[tuple[int, pd.Timestamp, float]] = []
for ts in frame.index[mask]:
row = frame.loc[ts]
if isinstance(row, pd.DataFrame):
row = row.iloc[-1]
price = float(row["Low"]) if "Low" in row else float(row.get("close", 0))
if price <= 0:
continue
idx = frame.index.get_loc(ts)
if isinstance(idx, slice):
idx = int(idx.start or 0)
out.append((int(idx), ts, price))
return out
def _buys_in_range(
candidates: list[tuple[int, pd.Timestamp, float]],
start_idx: int,
end_idx: int,
*,
min_bars: int,
max_buys: int,
) -> list[tuple[pd.Timestamp, float]]:
"""start_idx < bar_idx <= end_idx 구간 매수 후보 (min_bars·max_buys 적용)."""
seg = [(i, ts, p) for i, ts, p in candidates if start_idx < i <= end_idx]
if not seg:
return []
filtered: list[tuple[int, pd.Timestamp, float]] = []
for i, ts, p in seg:
if filtered and i - filtered[-1][0] < min_bars:
if p < filtered[-1][2]:
filtered[-1] = (i, ts, p)
continue
filtered.append((i, ts, p))
if len(filtered) > max_buys:
filtered.sort(key=lambda x: x[2])
filtered = sorted(filtered[:max_buys], key=lambda x: x[0])
return [(ts, p) for _, ts, p in filtered]
def build_causal_split_buy_peak_sell_trades(
    df: pd.DataFrame,
    *,
    pivot_order: int = GT_PIVOT_ORDER,
    buy_swing_pct: float = GT_BUY_MIN_SWING_PCT,
    sell_swing_pct: float = GT_MIN_SWING_PCT,
    bb_max: float = 0.65,
    min_leg_pct: float = GT_MIN_SWING_PCT,
    buy_min_bars: int = GT_BUY_MIN_BARS,
    max_buys: int = GT_MAX_BUYS_PER_LEG,
    max_sells: int = GT_MAX_SELLS_PER_LEG,
    use_local_trough: bool = True,
    peak_mode: PeakMode = "local",
    min_bars_between_legs: int = 60,
) -> list[dict[str, Any]]:
    """
    Build the list of split-buy / peak-sell trade dicts, leg by leg.

    Args:
        df: OHLCV frame with a DatetimeIndex (the original docstring describes
            3-minute bars plus ``bb_pos``).
        pivot_order: Forwarded to the causal signal enrichment.
        buy_swing_pct: Forwarded to the causal signal enrichment.
        sell_swing_pct: Forwarded to the causal signal enrichment.
        bb_max: ``bb_pos`` ceiling, used by the enrichment and the buy filter.
        min_leg_pct: Minimum rise (%) from the leg's lowest ``Low`` to the
            peak price for the peak to close a leg.
        buy_min_bars: Minimum bar spacing between split buys.
        max_buys: Maximum number of buys per leg.
        max_sells: Maximum number of sells per leg.
        use_local_trough: Use local-trough buy candidates instead of
            ``gt_buy_signal``.
        peak_mode: ``"zigzag"`` or ``"local"``; selects the leg-ending peak
            column.
        min_bars_between_legs: Minimum bar distance between the previous leg
            end and the next accepted peak.

    Returns:
        Dicts with keys ``dt``, ``action``, ``price``, ``weight``, ``leg_id``;
        each leg contributes its buys followed by its sells.
    """

    def _trade(action: str, dt: pd.Timestamp, price: float, weight: float, leg: int) -> dict[str, Any]:
        # Single place that fixes the serialised shape of a trade record.
        return {
            "dt": dt.strftime("%Y-%m-%d %H:%M:%S"),
            "action": action,
            "price": round(price, 2),
            "weight": round(weight, 4),
            "leg_id": leg,
        }

    frame = enrich_scan_frame_gt_signals_causal(
        df,
        pivot_order=pivot_order,
        buy_swing_pct=buy_swing_pct,
        sell_swing_pct=sell_swing_pct,
        bb_max=bb_max,
    )
    peak_col = _peak_signal_column(peak_mode)
    if peak_col not in frame.columns:
        return []
    peak_times = _filter_peak_times(frame, peak_col, min_bars_between_legs)
    if not peak_times:
        return []
    buy_candidates = _precompute_buy_candidates(
        frame,
        use_local_trough=use_local_trough,
        bb_max=bb_max,
    )

    # The first leg is measured from the position of the first row, which is
    # treated like a previous sell bar (exclusive lower bound).
    prev_sell_idx = 0
    if frame.index.size:
        first_loc = frame.index.get_loc(frame.index[0])
        if isinstance(first_loc, slice):
            prev_sell_idx = int(first_loc.start or 0)
        else:
            prev_sell_idx = int(first_loc)

    trades: list[dict[str, Any]] = []
    leg_id = 0
    # Carried across iterations: only refreshed when the leg slice has rows and
    # a "Low" column, otherwise the previous value is reused.
    leg_trough_price = 0.0

    for peak_ts in peak_times:
        loc = frame.index.get_loc(peak_ts)
        peak_idx = int(loc.start or 0) if isinstance(loc, slice) else loc
        if peak_idx - prev_sell_idx < min_bars_between_legs:
            continue

        peak_row = frame.loc[peak_ts]
        if isinstance(peak_row, pd.DataFrame):
            peak_row = peak_row.iloc[-1]
        if "High" in peak_row:
            peak_price = float(peak_row["High"])
        else:
            peak_price = float(peak_row.get("close", 0))

        leg_slice = frame.iloc[prev_sell_idx + 1 : peak_idx + 1]
        if "Low" in leg_slice.columns and not leg_slice.empty:
            leg_trough_price = float(leg_slice["Low"].astype(float).min())

        if leg_trough_price > 0:
            leg_pct = (peak_price - leg_trough_price) / max(leg_trough_price, 1e-9) * 100.0
        else:
            leg_pct = 0.0
        if leg_pct < min_leg_pct:
            # Rise too small: the leg stays open and extends to the next peak.
            continue

        buys = _buys_in_range(
            buy_candidates,
            prev_sell_idx,
            int(peak_idx),
            min_bars=buy_min_bars,
            max_buys=max_buys,
        )
        if buys:
            entry_weights = leg_entry_weights([low for _, low in buys])
            for (dt, price), w in zip(buys, entry_weights):
                trades.append(_trade("buy", dt, price, w, leg_id))

            sell_pts = _causal_sell_points(
                frame,
                peak_ts,
                max_sells,
                peak_signal_col=peak_col,
            )
            for dt, price, w in sell_pts[:max_sells]:
                trades.append(_trade("sell", dt, price, w, leg_id))
            leg_id += 1

        # With or without buys, this peak becomes the start of the next leg.
        prev_sell_idx = int(peak_idx)
        leg_trough_price = peak_price
    return trades
def simulate_causal_gt_portfolio(
    df: pd.DataFrame,
    *,
    last_price: float | None = None,
    **build_kw: Any,
) -> dict[str, Any]:
    """
    Generate causal GT trades, size them, and summarise the portfolio.

    Args:
        df: OHLCV frame passed to ``build_causal_split_buy_peak_sell_trades``.
        last_price: Mark price for open positions; when None, the last value
            of ``df["close"]`` is used if that column exists.
        build_kw: Keyword arguments forwarded to
            ``build_causal_split_buy_peak_sell_trades``.

    Returns:
        The dict returned by ``simulate_portfolio_summary`` extended with
        ``leg_count``, ``sizing_mode``, ``sizing_note``, ``causal_gt_params``
        and ``alloc_stats``. When no trades are produced, a minimal dict with
        zeroed ``pnl_pct`` / ``trade_count`` / ``leg_count``, a ``note``,
        ``sizing_mode`` and ``causal_gt_params`` is returned instead.
    """
    # Imported lazily inside the function, as in the original code.
    from deepcoin.ground_truth.gt_allocation import (
        allocate_order_amounts_chronological,
        simulate_portfolio_summary,
    )

    sizing_mode = "causal_gt_leg_engine"
    params = dict(build_kw)

    raw = build_causal_split_buy_peak_sell_trades(df, **build_kw)
    if not raw:
        return {
            "pnl_pct": 0.0,
            "trade_count": 0,
            "leg_count": 0,
            "note": "no trades",
            "sizing_mode": sizing_mode,
            # Reported on this path too, so both result shapes expose the
            # parameters that produced them.
            "causal_gt_params": params,
        }

    sized, alloc_stats = allocate_order_amounts_chronological(raw, causal_tier=True)

    mark = last_price
    # NOTE(review): the price helpers in this module read "Low"/"High"
    # (capitalised) but the close column is looked up as lowercase "close" --
    # confirm the frame's column naming.
    if mark is None and "close" in df.columns:
        mark = float(df["close"].iloc[-1])

    result = simulate_portfolio_summary(
        sized,
        last_price=mark,
        use_amount_krw=True,
    )
    result["leg_count"] = len({t.get("leg_id") for t in raw})
    result["sizing_mode"] = sizing_mode
    result["sizing_note"] = (
        "인과 GT leg: split_buy + peak_sell, causal tier 복리 (미래 미사용)"
    )
    result["causal_gt_params"] = params
    result["alloc_stats"] = alloc_stats
    return result