""" 인과적 GT leg 타점 생성 — t 시점까지 데이터만 사용. GT split_buy_peak_sell 과 동일 구조(분할매수·65/35 매도·leg_id)이나 피벗·leg 종료는 gt_signal_causal 확정 신호만 사용합니다. """ from __future__ import annotations from typing import Any, Literal import pandas as pd from config import ( GT_BUY_MIN_BARS, GT_BUY_MIN_SWING_PCT, GT_MAX_BUYS_PER_LEG, GT_MAX_SELLS_PER_LEG, GT_MIN_SWING_PCT, GT_PIVOT_ORDER, GT_SELL_SPLIT_GAP_PCT, ) from deepcoin.ground_truth.gt_model import leg_entry_weights, leg_exit_weights from deepcoin.ground_truth.gt_signal_causal import enrich_scan_frame_gt_signals_causal PeakMode = Literal["zigzag", "local"] def _collect_causal_buy_bars( frame: pd.DataFrame, start: pd.Timestamp, end: pd.Timestamp, *, min_bars: int, max_buys: int, use_local_trough: bool, bb_max: float, ) -> list[tuple[pd.Timestamp, float]]: """ leg 구간 (start, end) 내 인과적 매수 후보 봉. Args: frame: gt_buy_signal 등 포함. start: 이전 매도 시각(미포함). end: leg 종료 peak 시각(포함). min_bars: 분할 매수 최소 간격. max_buys: leg당 최대 매수. use_local_trough: True면 gt_trough_local+BB, False면 gt_buy_signal. bb_max: BB %B 상한. Returns: (dt, low_price) 리스트 (시간순). """ seg = frame[(frame.index > start) & (frame.index <= end)] if seg.empty: return [] if use_local_trough: bb = pd.to_numeric(seg.get("bb_pos"), errors="coerce") mask = (seg["gt_trough_local"] == 1) & (bb <= bb_max) else: mask = seg["gt_buy_signal"] == 1 cands: list[tuple[pd.Timestamp, float, int]] = [] for ts, row in seg[mask].iterrows(): price = float(row["Low"]) if "Low" in row else float(row.get("close", 0)) if price <= 0: continue idx = frame.index.get_loc(ts) if isinstance(idx, slice): idx = int(idx.start or 0) cands.append((ts, price, int(idx))) cands.sort(key=lambda x: x[0]) filtered: list[tuple[pd.Timestamp, float, int]] = [] for ts, price, idx in cands: if filtered and idx - filtered[-1][2] < min_bars: if price < filtered[-1][1]: filtered[-1] = (ts, price, idx) continue filtered.append((ts, price, idx)) if len(filtered) > max_buys: filtered.sort(key=lambda x: x[1]) filtered = sorted(filtered[:max_buys], key=lambda x: x[0]) return [(ts, price) for ts, price, _ in filtered] def _causal_sell_points( frame: pd.DataFrame, peak_ts: pd.Timestamp, max_splits: int, *, peak_signal_col: str = "gt_peak_zigzag", ) -> list[tuple[pd.Timestamp, float, float]]: """ 인과적 매도: peak 확정봉 + (선택) 직후 확정 peak 1건 분할. Args: frame: OHLC + gt peak 컬럼. peak_ts: leg 종료 peak 시각. max_splits: 최대 분할(2). peak_signal_col: 두 번째 분할 탐색 컬럼. Returns: (dt, high_price, weight) 리스트. """ if peak_ts not in frame.index: return [] row = frame.loc[peak_ts] if isinstance(row, pd.DataFrame): row = row.iloc[-1] main_price = float(row["High"]) if "High" in row else float(row.get("close", 0)) weights = leg_exit_weights(max_splits if max_splits >= 2 else 1) if max_splits < 2 or len(weights) < 2: return [(peak_ts, main_price, 1.0)] peak_idx = frame.index.get_loc(peak_ts) if isinstance(peak_idx, slice): peak_idx = int(peak_idx.start or 0) seg = frame.iloc[peak_idx + 1 : peak_idx + 81] second_ts: pd.Timestamp | None = None second_price = main_price for ts, srow in seg.iterrows(): if int(srow.get(peak_signal_col, 0)) != 1: continue px = float(srow["High"]) if "High" in srow else float(srow.get("close", 0)) gap = abs(px - main_price) / max(main_price, 1e-9) * 100.0 if gap <= GT_SELL_SPLIT_GAP_PCT: second_ts = ts second_price = px break if second_ts is None: return [(peak_ts, main_price, 1.0)] return [ (peak_ts, main_price, weights[0]), (second_ts, second_price, weights[1]), ] def _peak_signal_column(peak_mode: PeakMode) -> str: """leg 종료 peak 컬럼명.""" return "gt_peak_local" if peak_mode == "local" else "gt_peak_zigzag" def _filter_peak_times( frame: pd.DataFrame, peak_col: str, min_bars: int, ) -> list[pd.Timestamp]: """ peak 후보를 min_bars 간격으로稀疏화 (인과적, 시간순). Args: frame: OHLC frame. peak_col: peak 신호 컬럼. min_bars: 최소 봉 간격. Returns: peak 타임스탬프 리스트. """ peaks = frame.index[frame[peak_col] == 1] if len(peaks) == 0: return [] kept: list[pd.Timestamp] = [] last_idx = -min_bars for ts in peaks: idx = frame.index.get_loc(ts) if isinstance(idx, slice): idx = int(idx.start or 0) if idx - last_idx >= min_bars: kept.append(ts) last_idx = int(idx) return kept def _precompute_buy_candidates( frame: pd.DataFrame, *, use_local_trough: bool, bb_max: float, ) -> list[tuple[int, pd.Timestamp, float]]: """ 전구간 매수 후보 (bar_idx, ts, price). Args: frame: enriched frame. use_local_trough: local trough vs zigzag buy. bb_max: BB 상한. Returns: (idx, ts, price) 리스트. """ if use_local_trough: bb = pd.to_numeric(frame.get("bb_pos"), errors="coerce") mask = (frame["gt_trough_local"] == 1) & (bb <= bb_max) else: mask = frame["gt_buy_signal"] == 1 out: list[tuple[int, pd.Timestamp, float]] = [] for ts in frame.index[mask]: row = frame.loc[ts] if isinstance(row, pd.DataFrame): row = row.iloc[-1] price = float(row["Low"]) if "Low" in row else float(row.get("close", 0)) if price <= 0: continue idx = frame.index.get_loc(ts) if isinstance(idx, slice): idx = int(idx.start or 0) out.append((int(idx), ts, price)) return out def _buys_in_range( candidates: list[tuple[int, pd.Timestamp, float]], start_idx: int, end_idx: int, *, min_bars: int, max_buys: int, ) -> list[tuple[pd.Timestamp, float]]: """start_idx < bar_idx <= end_idx 구간 매수 후보 (min_bars·max_buys 적용).""" seg = [(i, ts, p) for i, ts, p in candidates if start_idx < i <= end_idx] if not seg: return [] filtered: list[tuple[int, pd.Timestamp, float]] = [] for i, ts, p in seg: if filtered and i - filtered[-1][0] < min_bars: if p < filtered[-1][2]: filtered[-1] = (i, ts, p) continue filtered.append((i, ts, p)) if len(filtered) > max_buys: filtered.sort(key=lambda x: x[2]) filtered = sorted(filtered[:max_buys], key=lambda x: x[0]) return [(ts, p) for _, ts, p in filtered] def build_causal_split_buy_peak_sell_trades( df: pd.DataFrame, *, pivot_order: int = GT_PIVOT_ORDER, buy_swing_pct: float = GT_BUY_MIN_SWING_PCT, sell_swing_pct: float = GT_MIN_SWING_PCT, bb_max: float = 0.65, min_leg_pct: float = GT_MIN_SWING_PCT, buy_min_bars: int = GT_BUY_MIN_BARS, max_buys: int = GT_MAX_BUYS_PER_LEG, max_sells: int = GT_MAX_SELLS_PER_LEG, use_local_trough: bool = True, peak_mode: PeakMode = "local", min_bars_between_legs: int = 60, ) -> list[dict[str, Any]]: """ 인과적 split_buy_peak_sell trade dict 리스트. Args: df: 3m OHLCV+bb_pos (DatetimeIndex). pivot_order: 피벗 확정 지연. buy_swing_pct: 매수 ZigZag %. sell_swing_pct: 매도 ZigZag %. bb_max: BB %B 상한. min_leg_pct: leg 최소 수익률(%). buy_min_bars: 분할 매수 간격. max_buys: leg당 매수 상한. max_sells: leg당 매도 상한. use_local_trough: local trough 분할매수 사용. peak_mode: zigzag | local (leg 종료 peak). min_bars_between_legs: 연속 leg 종료 최소 간격(봉). Returns: {dt, action, price, weight, leg_id} dict 리스트. """ frame = enrich_scan_frame_gt_signals_causal( df, pivot_order=pivot_order, buy_swing_pct=buy_swing_pct, sell_swing_pct=sell_swing_pct, bb_max=bb_max, ) peak_col = _peak_signal_column(peak_mode) if peak_col not in frame.columns: return [] peak_times = _filter_peak_times(frame, peak_col, min_bars_between_legs) if not peak_times: return [] buy_candidates = _precompute_buy_candidates( frame, use_local_trough=use_local_trough, bb_max=bb_max, ) start_idx = 0 if frame.index.size: loc = frame.index.get_loc(frame.index[0]) start_idx = int(loc.start or 0) if isinstance(loc, slice) else int(loc) peak_signal_col = peak_col trades: list[dict[str, Any]] = [] prev_sell_idx = start_idx leg_id = 0 leg_trough_price = 0.0 for peak_ts in peak_times: peak_idx = frame.index.get_loc(peak_ts) if isinstance(peak_idx, slice): peak_idx = int(peak_idx.start or 0) if peak_idx - prev_sell_idx < min_bars_between_legs: continue prow = frame.loc[peak_ts] if isinstance(prow, pd.DataFrame): prow = prow.iloc[-1] peak_price = float(prow["High"]) if "High" in prow else float(prow.get("close", 0)) seg = frame.iloc[prev_sell_idx + 1 : peak_idx + 1] if not seg.empty and "Low" in seg.columns: leg_trough_price = float(seg["Low"].astype(float).min()) leg_pct = ( (peak_price - leg_trough_price) / max(leg_trough_price, 1e-9) * 100.0 if leg_trough_price > 0 else 0.0 ) if leg_pct < min_leg_pct: continue buys = _buys_in_range( buy_candidates, prev_sell_idx, int(peak_idx), min_bars=buy_min_bars, max_buys=max_buys, ) if not buys: prev_sell_idx = int(peak_idx) leg_trough_price = peak_price continue prices = [p for _, p in buys] weights = leg_entry_weights(prices) for (dt, price), w in zip(buys, weights): trades.append( { "dt": dt.strftime("%Y-%m-%d %H:%M:%S"), "action": "buy", "price": round(price, 2), "weight": round(w, 4), "leg_id": leg_id, } ) sell_pts = _causal_sell_points( frame, peak_ts, max_sells, peak_signal_col=peak_signal_col, ) for dt, price, w in sell_pts[:max_sells]: trades.append( { "dt": dt.strftime("%Y-%m-%d %H:%M:%S"), "action": "sell", "price": round(price, 2), "weight": round(w, 4), "leg_id": leg_id, } ) prev_sell_idx = int(peak_idx) leg_trough_price = peak_price leg_id += 1 return trades def simulate_causal_gt_portfolio( df: pd.DataFrame, *, last_price: float | None = None, **build_kw: Any, ) -> dict[str, Any]: """ 인과 GT 타점 + causal tier 복리 포트폴리오. Args: df: 3m OHLCV. last_price: 미청산 평가 종가. build_kw: build_causal_split_buy_peak_sell_trades 인자. Returns: simulate_portfolio_summary 형식 dict + leg_count, params. """ from deepcoin.ground_truth.gt_allocation import ( allocate_order_amounts_chronological, simulate_portfolio_summary, ) raw = build_causal_split_buy_peak_sell_trades(df, **build_kw) if not raw: return { "pnl_pct": 0.0, "trade_count": 0, "leg_count": 0, "note": "no trades", "sizing_mode": "causal_gt_leg_engine", } sized, alloc_stats = allocate_order_amounts_chronological(raw, causal_tier=True) mark = last_price if mark is None and "close" in df.columns: mark = float(df["close"].iloc[-1]) result = simulate_portfolio_summary( sized, last_price=mark, use_amount_krw=True, ) leg_count = len({t.get("leg_id") for t in raw}) result["leg_count"] = leg_count result["sizing_mode"] = "causal_gt_leg_engine" result["sizing_note"] = ( "인과 GT leg: split_buy + peak_sell, causal tier 복리 (미래 미사용)" ) result["causal_gt_params"] = dict(build_kw) result["alloc_stats"] = alloc_stats return result