""" 시뮬 sim_causal_hybrid 와 동일 체결 엔진 (build_monitor_hybrid_sized_trades). dry-run·live(06) 모두 발화 이력 → hybrid 배분 → amount_krw·수량 적용. """ from __future__ import annotations from dataclasses import dataclass from typing import Any import pandas as pd from config import GT_INITIAL_CASH_KRW, TRADING_FEE_RATE from deepcoin.ground_truth.causal_gt_hybrid import build_monitor_hybrid_sized_trades from deepcoin.ground_truth.hybrid_dd_calibrate import load_hybrid_dd_params from deepcoin.ops.paper_portfolio import PaperPortfolio @dataclass class SimTradeResult: """단일 발화에 대한 시뮬 배분·체결 결과.""" hit: dict[str, Any] amount_krw: float sell_qty: float ok: bool message: str leg_id: int | None = None def hit_key(hit: dict[str, Any]) -> tuple[str, str, str]: """발화 고유 키 (dt, rule_id, side).""" return (str(hit["dt"]), str(hit["rule_id"]), str(hit["side"])) def sort_hits_sim_order(hits: list[dict[str, Any]]) -> list[dict[str, Any]]: """ 시뮬·allocate 순서: 시각순, 동일 시각이면 buy → sell. Args: hits: evaluate_live_rules 발화. Returns: 정렬된 리스트. """ side_rank = {"buy": 0, "sell": 1} def _key(h: dict[str, Any]) -> tuple: return (str(h["dt"]), side_rank.get(str(h["side"]), 9), str(h["rule_id"])) return sorted(hits, key=_key) def _signals_for_hybrid( signal_history: list[dict[str, Any]], *, approved_buy_rules: set[str] | None, ) -> list[dict[str, Any]]: """ hybrid 배분용 신호 목록 (EV/WF 미통과 매수 제외). Args: signal_history: {dt, rule_id, side, close}. approved_buy_rules: 허용 매수 rule_id. Returns: 시뮬 입력 trade dict 리스트. """ out: list[dict[str, Any]] = [] for h in sort_hits_sim_order(signal_history): side = str(h["side"]) rid = str(h["rule_id"]) if side == "buy" and approved_buy_rules is not None and rid not in approved_buy_rules: continue out.append( { "dt": str(h["dt"]), "side": side, "close": float(h["close"]), "rule_id": rid, } ) return out def size_monitor_signals( signal_history: list[dict[str, Any]], ohlc_df: pd.DataFrame, *, approved_buy_rules: set[str] | None = None, ) -> list[dict[str, Any]]: """ 시뮬과 동일 hybrid tier 배분 (amount_krw·weight·leg_id). Args: signal_history: 누적 발화. ohlc_df: 3m OHLC. approved_buy_rules: 매수 허용 규칙. Returns: sized trade dict 리스트 (시각순). """ rows = _signals_for_hybrid(signal_history, approved_buy_rules=approved_buy_rules) if not rows: return [] fires = pd.DataFrame(rows) dd = load_hybrid_dd_params() sized, _stats = build_monitor_hybrid_sized_trades( fires, ohlc_df, enhanced=False, initial_cash=float(GT_INITIAL_CASH_KRW), fee_rate=TRADING_FEE_RATE, dd_large_pct=dd.get("dd_large_pct"), dd_medium_pct=dd.get("dd_medium_pct"), ) return sized def _find_sized_trade(sized: list[dict[str, Any]], hit: dict[str, Any]) -> dict[str, Any] | None: """sized 목록에서 발화 1건 조회.""" dt, rid, side = hit_key(hit) for t in sized: action = str(t.get("action", t.get("side", ""))) if str(t.get("dt")) == dt and str(t.get("rule_id", "")) == rid and action == side: return t return None def replay_paper_portfolio( signal_history: list[dict[str, Any]], ohlc_df: pd.DataFrame, *, approved_buy_rules: set[str] | None = None, ) -> tuple[PaperPortfolio, dict[tuple[str, str, str], SimTradeResult]]: """ 신호 이력 전체를 시뮬 엔진으로 재생 → 모의 계좌(GT_INITIAL_CASH_KRW) 상태. Args: signal_history: Phase C 누적 발화. ohlc_df: 3m OHLC. approved_buy_rules: EV/WF 통과 매수 규칙. Returns: (portfolio, hit_key → SimTradeResult). """ sized = size_monitor_signals( signal_history, ohlc_df, approved_buy_rules=approved_buy_rules ) paper = PaperPortfolio() paper.cash_krw = float(GT_INITIAL_CASH_KRW) paper.qty = 0.0 paper.qty_by_leg = {} results: dict[tuple[str, str, str], SimTradeResult] = {} leg_sell_idxs: dict[int, list[int]] = {} for i, t in enumerate(sized): lid = int(t.get("leg_id", 0)) if str(t.get("action", t.get("side"))) == "sell": leg_sell_idxs.setdefault(lid, []).append(i) sell_leg: int | None = None sell_base_qty = 0.0 for i, t in enumerate(sized): side = str(t.get("action", t.get("side", ""))) price = float(t["price"]) dt = str(t["dt"]) rid = str(t.get("rule_id", "")) leg_id = int(t.get("leg_id", 0)) hit = {"dt": dt, "rule_id": rid, "side": side, "close": price} key = hit_key(hit) amount = float(t.get("amount_krw") or 0) if side == "buy": if amount <= 0: results[key] = SimTradeResult( hit, 0.0, 0.0, False, "시뮬 매수 스킵(현금·tier)" ) continue ok = paper.apply_buy(amount, price, leg_id) msg = f"paper_buy sim leg={leg_id} ₩{amount:,.0f}" if ok else "paper_buy 실패" results[key] = SimTradeResult( hit, amount, 0.0, ok, msg, leg_id=leg_id ) sell_leg = None continue leg_qty = paper.qty_by_leg.get(leg_id, 0.0) if leg_qty <= 1e-12: results[key] = SimTradeResult(hit, 0.0, 0.0, False, "모의 보유 없음") continue if amount <= 0: results[key] = SimTradeResult(hit, 0.0, 0.0, False, "시뮬 매도 스킵") continue if sell_leg != leg_id: sell_leg = leg_id sell_base_qty = leg_qty rem = [j for j in leg_sell_idxs.get(leg_id, []) if j >= i] is_last = bool(rem) and i == rem[-1] sell_qty = leg_qty if is_last else amount / price if price > 0 else 0.0 ok = paper.apply_sell(amount, sell_qty, price, leg_id) msg = f"paper_sell sim qty={sell_qty:.4f} ₩{amount:,.0f}" if ok else "paper_sell 실패" results[key] = SimTradeResult( hit, amount, sell_qty, ok, msg, leg_id=leg_id ) return paper, results def plan_live_hit( signal_history: list[dict[str, Any]], hit: dict[str, Any], ohlc_df: pd.DataFrame, *, approved_buy_rules: set[str] | None = None, ) -> SimTradeResult: """ live: 누적 이력 + 신규 발화 1건 — replay 와 동일 sell_qty·amount. Args: signal_history: 기존 이력(신규 hit 미포함). hit: 이번 발화. ohlc_df: 3m OHLC. approved_buy_rules: 매수 허용. Returns: SimTradeResult (dry-run replay_paper_portfolio 와 동일). """ if ohlc_df is None or getattr(ohlc_df, "empty", True): return SimTradeResult(hit, 0.0, 0.0, False, "OHLC 없음") dt, rid, side = hit_key(hit) hist = list(signal_history) if not any( str(s["dt"]) == dt and str(s["rule_id"]) == rid and str(s["side"]) == side for s in hist ): hist.append( { "dt": dt, "rule_id": rid, "side": side, "close": float(hit["close"]), } ) _, results = replay_paper_portfolio( hist, ohlc_df, approved_buy_rules=approved_buy_rules ) res = results.get((dt, rid, side)) if res is not None: return res return SimTradeResult(hit, 0.0, 0.0, False, "시뮬 배분 없음")