""" 시뮬 sim_causal_hybrid 와 동일 체결 엔진 (build_monitor_hybrid_sized_trades). live(06) plan_live_hit: 발화 이력 → hybrid 배분 → amount_krw·수량 (인과, 현금·보유 제약). """ from __future__ import annotations from dataclasses import dataclass from typing import Any import pandas as pd from config import ( CHART_LOOKBACK_DAYS, GT_INITIAL_CASH_KRW, LIVE_HYBRID_BOOTSTRAP_FIRES, TRADING_FEE_RATE, ) from deepcoin.ground_truth.causal_gt_hybrid import build_monitor_hybrid_sized_trades from deepcoin.ground_truth.gt_allocation import resolve_sell_qty from deepcoin.ground_truth.hybrid_dd_calibrate import load_hybrid_dd_params from deepcoin.matching.load_rules import load_monitor_rules from deepcoin.paths import MATCHING_FIRE_OUTCOMES @dataclass class SimTradeResult: """단일 발화에 대한 시뮬 배분·체결 결과.""" hit: dict[str, Any] amount_krw: float sell_qty: float ok: bool message: str leg_id: int | None = None def bootstrap_monitor_signals_from_outcomes( *, end_dt: str | None = None, lookback_days: int | None = None, ) -> list[dict[str, Any]]: """ 04 fire_outcomes 에서 monitor 규칙 발화를 로드 (시뮬 all_monitor 와 동일 입력). Args: end_dt: 이 시각 이전만 포함 (None=전체). lookback_days: CHART_LOOKBACK_DAYS 대신 사용할 일수. Returns: {dt, rule_id, side, close} 리스트 (시각순). """ import pandas as pd path = MATCHING_FIRE_OUTCOMES if not path.is_file(): return [] monitor_ids = {r["rule_id"] for r in load_monitor_rules()} if not monitor_ids: return [] df = pd.read_csv(path) if df.empty or "rule_id" not in df.columns: return [] sub = df[df["rule_id"].isin(monitor_ids)].copy() if sub.empty: return [] sub["dt"] = sub["dt"].astype(str) if end_dt: sub = sub[sub["dt"] <= str(end_dt)] if lookback_days is not None and lookback_days > 0: end_ts = pd.to_datetime(sub["dt"].max()) if end_dt is None else pd.to_datetime(end_dt) start = end_ts - pd.Timedelta(days=int(lookback_days)) sub = sub[pd.to_datetime(sub["dt"]) >= start] elif lookback_days is None and CHART_LOOKBACK_DAYS > 0: end_ts = pd.to_datetime(sub["dt"].max()) start = end_ts - pd.Timedelta(days=int(CHART_LOOKBACK_DAYS)) sub = sub[pd.to_datetime(sub["dt"]) >= start] rows: list[dict[str, Any]] = [] for _, r in sub.iterrows(): rows.append( { "dt": str(r["dt"]), "rule_id": str(r["rule_id"]), "side": str(r["side"]), "close": float(r["close"]), } ) return sort_hits_sim_order(rows) def merge_signal_histories( *histories: list[dict[str, Any]], ) -> list[dict[str, Any]]: """ 발화 이력 병합 (dt+rule_id+side 기준 중복 제거, 시뮬 정렬). Args: *histories: 신호 dict 리스트들. Returns: 병합·정렬된 리스트. """ seen: set[tuple[str, str, str]] = set() merged: list[dict[str, Any]] = [] for hist in histories: for h in hist: key = hit_key(h) if key in seen: continue seen.add(key) merged.append( { "dt": key[0], "rule_id": key[1], "side": key[2], "close": float(h["close"]), } ) return sort_hits_sim_order(merged) def build_live_signal_history( persisted: list[dict[str, Any]] | None = None, *, bootstrap_fires: bool | None = None, ) -> list[dict[str, Any]]: """ 운영 hybrid 이력: fire_outcomes 부트스트랩 + live 저장분 병합. Args: persisted: live_signal_history.json signals. bootstrap_fires: fire_outcomes 부트스트랩 여부. None이면 config. Returns: sim_causal_hybrid 입력과 동일 형식의 이력. """ use_boot = ( LIVE_HYBRID_BOOTSTRAP_FIRES if bootstrap_fires is None else bootstrap_fires ) parts: list[list[dict[str, Any]]] = [] if use_boot: boot = bootstrap_monitor_signals_from_outcomes() if boot: parts.append(boot) if persisted: parts.append(persisted) if not parts: return [] return merge_signal_histories(*parts) class HybridSimPortfolio: """ hybrid 배분 결과를 현금·보유 수량에 적용 (시뮬·live plan 공용, API 미사용). """ def __init__(self) -> None: """초기 현금만 보유.""" self.cash_krw: float = float(GT_INITIAL_CASH_KRW) self.qty: float = 0.0 self.qty_by_leg: dict[int, float] = {} self.sell_leg: int | None = None self.sell_base_qty: float = 0.0 def apply_buy(self, amount_krw: float, price: float, leg_id: int) -> bool: """ 매수 체결 (가용 현금·수수료 범위). Args: amount_krw: 매수 원화. price: 체결가. leg_id: leg ID. Returns: 체결 성공 여부. """ if amount_krw <= 0 or price <= 0: return False fee = amount_krw * TRADING_FEE_RATE if self.cash_krw < amount_krw + fee: return False self.cash_krw -= amount_krw + fee bought = amount_krw / price self.qty += bought self.qty_by_leg[leg_id] = self.qty_by_leg.get(leg_id, 0.0) + bought self.sell_leg = None self.sell_base_qty = 0.0 return True def apply_sell(self, amount_krw: float, sell_qty: float, price: float, leg_id: int) -> bool: """ 매도 체결 (보유 수량 필요). Args: amount_krw: 매도 원화(총액). sell_qty: 매도 수량. price: 체결가. leg_id: leg ID. Returns: 체결 성공 여부. """ if sell_qty <= 0 or amount_krw <= 0: return False leg_qty = self.qty_by_leg.get(leg_id, 0.0) if leg_qty <= 1e-12: return False fee = amount_krw * TRADING_FEE_RATE self.cash_krw += amount_krw - fee leg_qty -= sell_qty self.qty_by_leg[leg_id] = max(leg_qty, 0.0) self.qty = max(self.qty - sell_qty, 0.0) if self.qty < 1e-12: self.qty = 0.0 if self.qty_by_leg.get(leg_id, 0.0) <= 1e-12: self.qty_by_leg.pop(leg_id, None) self.sell_leg = None self.sell_base_qty = 0.0 return True def hit_key(hit: dict[str, Any]) -> tuple[str, str, str]: """발화 고유 키 (dt, rule_id, side).""" return (str(hit["dt"]), str(hit["rule_id"]), str(hit["side"])) def sort_hits_sim_order(hits: list[dict[str, Any]]) -> list[dict[str, Any]]: """ 시뮬·allocate 순서: 시각순, 동일 시각이면 buy → sell. Args: hits: evaluate_live_rules 발화. Returns: 정렬된 리스트. """ side_rank = {"buy": 0, "sell": 1} def _key(h: dict[str, Any]) -> tuple: return (str(h["dt"]), side_rank.get(str(h["side"]), 9), str(h["rule_id"])) return sorted(hits, key=_key) def _signals_for_hybrid( signal_history: list[dict[str, Any]], *, approved_buy_rules: set[str] | None = None, ) -> list[dict[str, Any]]: """ hybrid 배분용 신호 목록. sim_causal_hybrid 와 동일하려면 approved_buy_rules=None (monitor 전체 발화). 운영에서 추가로 EV/WF 매수만 허용하려면 rule_id 집합을 넘깁니다. Args: signal_history: {dt, rule_id, side, close}. approved_buy_rules: None=필터 없음. set 이면 해당 매수 rule_id 만. Returns: 시뮬 입력 trade dict 리스트. """ out: list[dict[str, Any]] = [] for h in sort_hits_sim_order(signal_history): side = str(h["side"]) rid = str(h["rule_id"]) if ( side == "buy" and approved_buy_rules is not None and rid not in approved_buy_rules ): continue out.append( { "dt": str(h["dt"]), "side": side, "close": float(h["close"]), "rule_id": rid, } ) return out def size_monitor_signals( signal_history: list[dict[str, Any]], ohlc_df: pd.DataFrame, *, approved_buy_rules: set[str] | None = None, ) -> list[dict[str, Any]]: """ 시뮬과 동일 hybrid tier 배분 (amount_krw·weight·leg_id). Args: signal_history: 누적 발화. ohlc_df: 3m OHLC. approved_buy_rules: 매수 허용 규칙. Returns: sized trade dict 리스트 (시각순). """ rows = _signals_for_hybrid(signal_history, approved_buy_rules=approved_buy_rules) if not rows: return [] fires = pd.DataFrame(rows) dd = load_hybrid_dd_params() sized, _stats = build_monitor_hybrid_sized_trades( fires, ohlc_df, enhanced=False, initial_cash=float(GT_INITIAL_CASH_KRW), fee_rate=TRADING_FEE_RATE, dd_large_pct=dd.get("dd_large_pct"), dd_medium_pct=dd.get("dd_medium_pct"), ) return sized def replay_hybrid_signals( signal_history: list[dict[str, Any]], ohlc_df: pd.DataFrame, *, approved_buy_rules: set[str] | None = None, ) -> tuple[HybridSimPortfolio, dict[tuple[str, str, str], SimTradeResult]]: """ 신호 이력 전체를 hybrid 배분·체결 규칙으로 재생 (simulate_portfolio_steps 동일). Args: signal_history: 누적 발화. ohlc_df: 3m OHLC. approved_buy_rules: None=시뮬 동일. set 이면 매수 rule_id 필터. Returns: (portfolio, hit_key → SimTradeResult). """ sized = size_monitor_signals( signal_history, ohlc_df, approved_buy_rules=approved_buy_rules ) portfolio = HybridSimPortfolio() results: dict[tuple[str, str, str], SimTradeResult] = {} fee_rate = TRADING_FEE_RATE current_leg: int | None = None leg_budget = 0.0 for t in sorted(sized, key=lambda x: x["dt"]): action = str(t.get("action", t.get("side", ""))) price = float(t["price"]) if price <= 0: continue dt = str(t["dt"]) rid = str(t.get("rule_id", "")) leg_id = int(t.get("leg_id", 0)) weight = float(t.get("weight", 1.0)) hit = {"dt": dt, "rule_id": rid, "side": action, "close": price} key = hit_key(hit) if action == "buy": ak = t.get("amount_krw") if ak is not None and float(ak) > 0: amount = min( float(ak), max(portfolio.cash_krw / (1.0 + fee_rate), 0.0), ) else: if leg_id != current_leg: current_leg = leg_id leg_budget = portfolio.cash_krw amount = min( leg_budget * weight, max(portfolio.cash_krw / (1.0 + fee_rate), 0.0), ) if amount <= 0: results[key] = SimTradeResult( hit, 0.0, 0.0, False, "시뮬 매수 스킵(현금·tier)" ) continue fee = amount * fee_rate portfolio.cash_krw -= amount + fee bought = amount / price portfolio.qty += bought portfolio.qty_by_leg[leg_id] = ( portfolio.qty_by_leg.get(leg_id, 0.0) + bought ) portfolio.sell_leg = None portfolio.sell_base_qty = 0.0 results[key] = SimTradeResult( hit, amount, 0.0, True, f"sim_buy leg={leg_id} ₩{amount:,.0f}", leg_id=leg_id, ) continue if action == "sell" and portfolio.qty > 0: leg_qty = portfolio.qty_by_leg.get(leg_id, portfolio.qty) if portfolio.sell_leg != leg_id: portfolio.sell_leg = leg_id portfolio.sell_base_qty = leg_qty sell_qty = resolve_sell_qty( t, leg_qty, price, portfolio.sell_base_qty, weight ) if sell_qty <= 0: results[key] = SimTradeResult( hit, 0.0, 0.0, False, "시뮬 매도 스킵" ) continue gross = sell_qty * price fee = gross * fee_rate portfolio.cash_krw += gross - fee leg_qty -= sell_qty portfolio.qty_by_leg[leg_id] = max(leg_qty, 0.0) portfolio.qty = max(portfolio.qty - sell_qty, 0.0) if portfolio.qty < 1e-12: portfolio.qty = 0.0 results[key] = SimTradeResult( hit, gross, sell_qty, True, f"sim_sell qty={sell_qty:.4f} ₩{gross:,.0f}", leg_id=leg_id, ) continue results[key] = SimTradeResult(hit, 0.0, 0.0, False, "보유 없음") return portfolio, results def plan_live_hit( signal_history: list[dict[str, Any]], hit: dict[str, Any], ohlc_df: pd.DataFrame, *, approved_buy_rules: set[str] | None = None, ) -> SimTradeResult: """ live: 누적 이력 + 신규 발화 1건 — replay 와 동일 sell_qty·amount. Args: signal_history: 기존 이력(신규 hit 미포함). hit: 이번 발화. ohlc_df: 3m OHLC. approved_buy_rules: 매수 허용. Returns: SimTradeResult. """ if ohlc_df is None or getattr(ohlc_df, "empty", True): return SimTradeResult(hit, 0.0, 0.0, False, "OHLC 없음") dt, rid, side = hit_key(hit) hist = list(signal_history) if not any( str(s["dt"]) == dt and str(s["rule_id"]) == rid and str(s["side"]) == side for s in hist ): hist.append( { "dt": dt, "rule_id": rid, "side": side, "close": float(hit["close"]), } ) _, results = replay_hybrid_signals( hist, ohlc_df, approved_buy_rules=approved_buy_rules ) res = results.get((dt, rid, side)) if res is not None: return res return SimTradeResult(hit, 0.0, 0.0, False, "시뮬 배분 없음")