"""
04-4: Select the final rules with EV / risk filters and generate the report.
"""

from __future__ import annotations

import json
from html import escape
from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd

from config import (
    MATCH_BEST_EFFORT_PER_SIDE,
    MATCH_GT_TOLERANCE_MIN,
    MATCH_HOLDOUT_RATIO,
    MATCH_KIND_PRIORITY,
    MATCH_LABEL_MODE,
    MATCH_MAX_RULES_PER_SIDE,
    MATCH_MAX_VALID_FIRE_RATE,
    MATCH_MIN_EV_VALID,
    MATCH_MIN_FIRES,
    MATCH_MIN_FIRES_HOLDOUT,
    MATCH_MIN_PROFIT_FACTOR,
    MATCH_MONITOR_MAX_PER_SIDE,
    MATCH_TRAIN_RATIO,
)
from deepcoin.ground_truth.ground_truth import load_ground_truth
from deepcoin.paths import resolve_ground_truth_file


def _split_train_valid_holdout(df: pd.DataFrame, dt_col: str = "dt") -> pd.Series:
    """
    Label every row as train / valid / holdout by its timestamp.

    Rows whose timestamp is at or before the (1 - MATCH_HOLDOUT_RATIO)
    quantile are in-sample; the rest are 'holdout'. In-sample rows at or
    before the MATCH_TRAIN_RATIO quantile of the in-sample timestamps are
    'train', the remaining in-sample rows are 'valid'.

    Args:
        df: fire_outcomes frame.
        dt_col: name of the timestamp column.

    Returns:
        Series of 'train' | 'valid' | 'holdout', indexed like ``df``.
    """
    ts = pd.to_datetime(df[dt_col])
    holdout_start = ts.quantile(1.0 - MATCH_HOLDOUT_RATIO)
    in_sample = ts <= holdout_start
    # Fall back to the quantile of all timestamps when nothing is in-sample.
    cutoff = (
        ts[in_sample].quantile(MATCH_TRAIN_RATIO)
        if in_sample.any()
        else ts.quantile(MATCH_TRAIN_RATIO)
    )
    split = np.where(
        in_sample,
        np.where(ts <= cutoff, "train", "valid"),
        "holdout",
    )
    return pd.Series(split, index=df.index)


def _kind_rank(kind: str) -> int:
    """
    Return the priority of ``kind`` (smaller means higher priority).

    Kinds that are not listed in MATCH_KIND_PRIORITY rank after every
    listed kind.
    """
    try:
        return MATCH_KIND_PRIORITY.index(kind)
    except ValueError:
        return len(MATCH_KIND_PRIORITY)


def _rule_metrics(sub: pd.DataFrame) -> dict[str, float | int]:
    """
    Aggregate the outcome metrics of one rule over one split.

    Args:
        sub: subset of fire_outcomes; must contain ``forward_ret_pct``.

    Returns:
        Dict with ``count``, ``ev_pct`` (mean return), ``win_rate`` (share
        of strictly positive returns) and ``profit_factor``. All values
        are zero for an empty subset.
    """
    if sub.empty:
        return {"count": 0, "ev_pct": 0.0, "win_rate": 0.0, "profit_factor": 0.0}
    r = sub["forward_ret_pct"]
    wins = r[r > 0]
    losses = r[r <= 0]
    # Profit factor is gross wins / |gross losses|. When the losses sum to
    # zero (or there are none) the ratio is undefined, so the gross win sum
    # itself is reported instead, or 0.0 when there are no wins either.
    pf = (
        float(wins.sum() / abs(losses.sum()))
        if len(losses) and losses.sum() != 0
        else float(wins.sum())
        if len(wins)
        else 0.0
    )
    return {
        "count": int(len(sub)),
        "ev_pct": round(float(r.mean()), 4),
        "win_rate": round(float((r > 0).mean()), 4),
        "profit_factor": round(pf, 4),
    }


def gt_overlap_report(
    fires: pd.DataFrame,
    gt_trades: list[dict[str, Any]],
    tolerance_min: int = MATCH_GT_TOLERANCE_MIN,
) -> dict[str, Any]:
    """
    Compute the share of ground-truth trades that have a rule fire nearby.

    A ground-truth trade counts as matched when at least one fire on the
    same side lies within ±``tolerance_min`` minutes of it.

    Args:
        fires: rule fires with ``side`` and ``dt`` columns.
        gt_trades: ground-truth trades; ``action`` and ``dt`` are read.
        tolerance_min: allowed distance in minutes.

    Returns:
        Dict keyed by 'buy' / 'sell', each holding ``gt_count``,
        ``matched`` and ``recall``.
    """
    tol = pd.Timedelta(minutes=tolerance_min)
    report: dict[str, Any] = {}
    for side in ("buy", "sell"):
        gt_side = [t for t in gt_trades if t.get("action") == side]
        # An empty frame may lack the "side" column, so do not index into it.
        f_side = fires[fires["side"] == side] if not fires.empty else pd.DataFrame()
        if not gt_side or f_side.empty:
            report[side] = {"gt_count": len(gt_side), "matched": 0, "recall": 0.0}
            continue
        fire_ts = pd.to_datetime(f_side["dt"]).sort_values()
        matched = sum(
            1
            for t in gt_side
            if ((fire_ts - pd.Timestamp(t["dt"])).abs() <= tol).any()
        )
        report[side] = {
            "gt_count": len(gt_side),
            "matched": matched,
            "recall": round(matched / len(gt_side), 4) if gt_side else 0.0,
        }
    return report


def _top_by_ev(
    pool: list[dict[str, Any]],
    split: str,
    limit: int,
) -> list[dict[str, Any]]:
    """
    Return the ``limit`` best summaries of ``pool`` ranked by EV on ``split``.

    Ties on EV are broken by kind priority (higher-priority kind first).
    """
    ranked = sorted(
        pool,
        key=lambda x: (
            x["metrics"][split]["ev_pct"],
            -_kind_rank(x.get("kind", "")),
        ),
        reverse=True,
    )
    return ranked[:limit]


def select_matched_rules(
    outcomes: pd.DataFrame,
    candidates: dict[str, Any],
    gt_path: Path | None = None,
) -> dict[str, Any]:
    """
    Select rules by their EV / profit factor on the valid split.

    Each candidate rule is scored on train / valid / holdout. Rules that
    pass the valid criteria are selected (top MATCH_MAX_RULES_PER_SIDE per
    side). When none pass, a best-effort list is built from the rules
    that satisfy only the fire-count and fire-rate limits. Monitor rules
    prefer those that also pass on holdout and otherwise fall back to the
    selected (or best-effort) rules.

    Args:
        outcomes: fire_outcomes frame (``rule_id``, ``side``, ``dt``,
            ``forward_ret_pct`` are read).
        candidates: rule_candidates dict; its ``rules`` list is iterated.
        gt_path: ground-truth JSON path; resolved via
            ``resolve_ground_truth_file`` when omitted.

    Returns:
        Result dict with the selected / best-effort / monitor rules, the
        criteria used, the GT overlap report and all rule summaries. For
        empty ``outcomes`` a minimal dict with empty lists is returned.
    """
    if outcomes.empty:
        return {"selected": [], "rejected": [], "note": "발화 없음"}

    outcomes = outcomes.copy()
    outcomes["split"] = _split_train_valid_holdout(outcomes)

    valid_dt = pd.to_datetime(outcomes.loc[outcomes["split"] == "valid", "dt"])
    # Approximate number of bars spanned by the valid split.
    # NOTE(review): 180 s presumably reflects 3-minute bars — confirm.
    valid_bars = max(
        int((valid_dt.max() - valid_dt.min()).total_seconds() / 180) + 1, 1
    ) if len(valid_dt) > 1 else 1

    gt_file = gt_path or resolve_ground_truth_file()
    gt_data = load_ground_truth(gt_file) or {}
    gt_trades = gt_data.get("trades") or []

    summaries: list[dict[str, Any]] = []
    for rule in candidates.get("rules", []):
        rid = rule["rule_id"]
        sub = outcomes[outcomes["rule_id"] == rid]
        train = sub[sub["split"] == "train"]
        valid = sub[sub["split"] == "valid"]
        holdout = sub[sub["split"] == "holdout"]

        m_all = _rule_metrics(sub)
        m_train = _rule_metrics(train)
        m_valid = _rule_metrics(valid)
        m_holdout = _rule_metrics(holdout)

        fire_rate = m_valid["count"] / valid_bars if valid_bars else 1.0
        pass_valid = (
            m_valid["count"] >= MATCH_MIN_FIRES
            and m_valid["ev_pct"] >= MATCH_MIN_EV_VALID
            and m_valid["profit_factor"] >= MATCH_MIN_PROFIT_FACTOR
            and fire_rate <= MATCH_MAX_VALID_FIRE_RATE
        )
        # Holdout reuses the valid EV / PF thresholds with its own
        # minimum fire count and no fire-rate cap.
        pass_holdout = (
            m_holdout["count"] >= MATCH_MIN_FIRES_HOLDOUT
            and m_holdout["ev_pct"] >= MATCH_MIN_EV_VALID
            and m_holdout["profit_factor"] >= MATCH_MIN_PROFIT_FACTOR
        )
        summaries.append(
            {
                "rule_id": rid,
                "side": rule["side"],
                "kind": rule.get("kind", ""),
                "conditions": rule["conditions"],
                "valid_fire_rate": round(fire_rate, 4),
                "metrics": {
                    "all": m_all,
                    "train": m_train,
                    "valid": m_valid,
                    "holdout": m_holdout,
                },
                "pass_valid": pass_valid,
                "pass_holdout": pass_holdout,
            }
        )

    selected: list[dict[str, Any]] = []
    for side in ("buy", "sell"):
        pool = [s for s in summaries if s["side"] == side and s["pass_valid"]]
        selected.extend(_top_by_ev(pool, "valid", MATCH_MAX_RULES_PER_SIDE))

    best_effort: list[dict[str, Any]] = []
    if not selected:
        # Nothing passed strictly: keep the best rules that at least fire
        # often enough without exceeding the fire-rate cap.
        for side in ("buy", "sell"):
            pool = [
                s
                for s in summaries
                if s["side"] == side
                and s["metrics"]["valid"]["count"] >= MATCH_MIN_FIRES
                and s.get("valid_fire_rate", 1) <= MATCH_MAX_VALID_FIRE_RATE
            ]
            best_effort.extend(
                _top_by_ev(pool, "valid", MATCH_BEST_EFFORT_PER_SIDE)
            )

    rejected = [s for s in summaries if s not in selected and s not in best_effort]

    overlap = gt_overlap_report(
        outcomes[["rule_id", "side", "dt"]].drop_duplicates(),
        gt_trades,
    )

    holdout_passed = [s for s in summaries if s["pass_valid"] and s["pass_holdout"]]
    monitor_rules: list[dict[str, Any]] = []
    for side in ("buy", "sell"):
        pool = [s for s in holdout_passed if s["side"] == side]
        monitor_rules.extend(
            _top_by_ev(pool, "holdout", MATCH_MONITOR_MAX_PER_SIDE)
        )

    if not monitor_rules:
        # No rule survived holdout: monitor the selected rules, or the
        # best-effort ones for a side with no selected rule. Every summary
        # built above carries holdout metrics, so ranking uses them.
        for side in ("buy", "sell"):
            pool = [s for s in selected if s["side"] == side] or [
                s for s in best_effort if s["side"] == side
            ]
            monitor_rules.extend(
                _top_by_ev(pool, "holdout", MATCH_MONITOR_MAX_PER_SIDE)
            )

    active = selected if selected else best_effort
    result = {
        "method": "gt_profile_plus_full_bar_ev_filter",
        "label_mode": MATCH_LABEL_MODE,
        "train_ratio": MATCH_TRAIN_RATIO,
        "holdout_ratio": MATCH_HOLDOUT_RATIO,
        "criteria": {
            "min_fires_valid": MATCH_MIN_FIRES,
            "min_fires_holdout": MATCH_MIN_FIRES_HOLDOUT,
            "min_ev_valid_pct": MATCH_MIN_EV_VALID,
            "min_profit_factor_valid": MATCH_MIN_PROFIT_FACTOR,
            "max_valid_fire_rate": MATCH_MAX_VALID_FIRE_RATE,
        },
        "selected": selected,
        "selected_best_effort": best_effort,
        "holdout_passed": holdout_passed,
        "monitor_rules": monitor_rules,
        "active_rules": active,
        "strict_pass": len(selected) > 0,
        "holdout_pass": len(holdout_passed) > 0,
        "rejected_count": len(rejected),
        "gt_overlap": overlap,
        "valid_bars_approx": valid_bars,
        "all_rule_summaries": summaries,
        "note": (
            "strict EV/PF 통과 규칙 없음 — selected_best_effort는 valid EV 상위(튜닝용)"
            if not selected
            else ""
        ),
    }
    print(
        f"[04-4] 선별: strict {len(selected)}개, holdout통과 {len(holdout_passed)}개, "
        f"05감시 {len(monitor_rules)}개 / 후보 {len(summaries)}개"
    )
    return result


def write_backtest_summary_html(
    matched: dict[str, Any],
    out_path: Path,
) -> Path:
    """
    Write backtest_summary.html for a rule-selection result.

    The page lists the monitor rules (or the selected rules when there are
    no monitor rules) in a table, followed by the GT overlap report as
    pretty-printed JSON. Parent directories are created as needed.

    Args:
        matched: result of ``select_matched_rules``.
        out_path: destination HTML path.

    Returns:
        ``out_path``.
    """
    show = matched.get("monitor_rules") or matched.get("selected") or []
    title = "05 monitor_rules (holdout 우선)"

    rows = []
    for s in show:
        v = s["metrics"]["valid"]
        h = s["metrics"].get("holdout", {})
        rows.append(
            f"<tr><td>{escape(str(s['rule_id']))}</td>"
            f"<td>{escape(str(s['side']))}</td>"
            f"<td>{v['count']}</td><td>{v['ev_pct']}</td>"
            f"<td>{h.get('count', 0)}</td><td>{h.get('ev_pct', 0)}</td>"
            f"<td>{h.get('profit_factor', 0)}</td></tr>"
        )
    # Show the "no passing rules" message only when the table is empty.
    table_body = (
        "\n".join(rows)
        if rows
        else '<tr><td colspan="7">통과 규칙 없음</td></tr>'
    )

    method = escape(str(matched.get("method", "")))
    note = escape(str(matched.get("note", "")))
    gt = matched.get("gt_overlap", {})
    # default=str keeps the report from failing on values json cannot
    # serialize natively (e.g. numpy scalars).
    gt_json = escape(json.dumps(gt, ensure_ascii=False, indent=2, default=str))

    page = f"""<!DOCTYPE html>
<html lang="ko">
<head>
<meta charset="utf-8">
<title>04 Backtest Summary</title>
</head>
<body>
<h1>04 매칭 — {title} (valid 구간)</h1>
<p>방법: {method}</p>
<p>{note}</p>
<h2>선별 규칙</h2>
<table border="1">
<tr><th>rule_id</th><th>side</th><th>valid_n</th><th>valid_ev</th>
<th>holdout_n</th><th>holdout_ev</th><th>holdout_pf</th></tr>
{table_body}
</table>
<h2>GT recall (±{MATCH_GT_TOLERANCE_MIN}분, 전체 발화 기준)</h2>
<pre>{gt_json}</pre>
</body>
</html>
"""
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(page, encoding="utf-8")
    print(f"[04-4] 리포트: {out_path}")
    return out_path