"""
04-1: Generate rule candidates from the GT snapshot (03b).
"""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd

from config import (
    MATCH_INCLUDE_ATOMIC,
    MATCH_INCLUDE_MTF_CROSS,
    MATCH_INCLUDE_WIDE_RULES,
    MATCH_PROFILE_QUANTILE_HI,
    MATCH_PROFILE_QUANTILE_LO,
    MATCH_PROFILE_TIGHT_HI,
    MATCH_PROFILE_TIGHT_LO,
)
from deepcoin.analysis.general_analysis_config import GENERAL_ANALYSIS_INTERVALS
from deepcoin.analysis.general_analysis_core import interval_tf_prefix
from deepcoin.matching.config import (
    ANALYSIS_TRADES_CSV,
    BUY_PROFILE_FEATURES,
    SELL_PROFILE_FEATURES,
)
from deepcoin.matching.gt_mtf_profile import (
    analyze_gt_mtf_profile,
    load_selected_features,
)
from deepcoin.paths import (
    ANALYSIS_GT_CALIBRATION_JSON,
    ANALYSIS_GT_MTF_PROFILE_JSON,
)


def _feature_separation(
    buy: pd.DataFrame,
    sell: pd.DataFrame,
    col: str,
) -> float:
    """
    Compute the separation between the buy and sell GT distributions of a column
    (absolute mean difference divided by the pooled standard deviation).

    Args:
        buy: Rows of buy entries.
        sell: Rows of sell entries.
        col: Column name.

    Returns:
        Separation score. 0.0 when the column is missing from either frame,
        is not numeric in ``buy``, or either side has fewer than 5 valid values.
    """
    # Guard both frames: indexing sell[col] on a missing column would raise KeyError.
    if col not in buy.columns or col not in sell.columns:
        return 0.0
    if not pd.api.types.is_numeric_dtype(buy[col]):
        return 0.0
    a = pd.to_numeric(buy[col], errors="coerce").dropna()
    b = pd.to_numeric(sell[col], errors="coerce").dropna()
    if len(a) < 5 or len(b) < 5:
        return 0.0
    mean_gap = abs(float(a.mean() - b.mean()))
    pooled = np.sqrt((a.var() + b.var()) / 2)
    # With (near-)zero pooled spread, fall back to the raw mean gap instead of dividing.
    if pooled < 1e-9:
        return mean_gap
    return mean_gap / pooled


def _condition_from_series(series: pd.Series, side: str) -> dict[str, Any] | None:
    """
    Extract a single condition from the GT distribution of one column.

    Object/string columns yield an ``eq`` condition on the most frequent value.
    Numeric columns containing only 0/1 yield an ``eq_int`` condition when the
    share of ones is >= 0.55 (value 1) or <= 0.45 (value 0). Any other numeric
    column yields a ``between`` condition bounded by the
    MATCH_PROFILE_QUANTILE_LO / MATCH_PROFILE_QUANTILE_HI quantiles.

    Args:
        series: Values of the entries for the given side.
        side: buy | sell (descriptive only; not used by the logic).

    Returns:
        Condition dict, or None when no condition can be derived.
    """
    col_name = series.name
    if series.dtype == object or series.dtype.name == "string":
        mode = series.dropna().astype(str).mode()
        if mode.empty:
            return None
        return {"col": col_name, "op": "eq", "value": str(mode.iloc[0])}

    s = pd.to_numeric(series, errors="coerce").dropna()
    if len(s) < 10:
        return None

    if set(s.unique()).issubset({0, 1, 0.0, 1.0}):
        frac = float(s.mean())
        if frac >= 0.55:
            return {"col": col_name, "op": "eq_int", "value": 1}
        if frac <= 0.45:
            return {"col": col_name, "op": "eq_int", "value": 0}
        # Roughly balanced flag: not informative enough for a condition.
        return None

    lo = float(s.quantile(MATCH_PROFILE_QUANTILE_LO))
    hi = float(s.quantile(MATCH_PROFILE_QUANTILE_HI))
    if lo >= hi:
        return None
    return {"col": col_name, "op": "between", "lo": lo, "hi": hi}


def _condition_tight(series: pd.Series) -> dict[str, Any] | None:
    """
    Build a narrow-interval condition.

    The bounds are the MATCH_PROFILE_TIGHT_LO / MATCH_PROFILE_TIGHT_HI quantiles
    (described as q35~q65 by the original author; the actual values live in config).

    Args:
        series: Values of a GT subset.

    Returns:
        ``between`` condition, or None when fewer than 10 numeric values are
        available or the interval is empty.
    """
    s = pd.to_numeric(series, errors="coerce").dropna()
    if len(s) < 10:
        return None
    lo = float(s.quantile(MATCH_PROFILE_TIGHT_LO))
    hi = float(s.quantile(MATCH_PROFILE_TIGHT_HI))
    if lo >= hi:
        return None
    return {"col": series.name, "op": "between", "lo": lo, "hi": hi}


def _contrast_conditions(
    buy: pd.DataFrame,
    sell: pd.DataFrame,
    col: str,
    side: str,
) -> list[dict[str, Any]]:
    """
    For a column with large buy/sell GT separation, build the side's tight
    condition AND a contrast condition against the opposite side.

    Args:
        buy: Buy GT.
        sell: Sell GT.
        col: Column name.
        side: buy | sell.

    Returns:
        List of conditions (may be empty).
    """
    # Guard both frames: indexing sell[col] on a missing column would raise KeyError.
    if col not in buy.columns or col not in sell.columns:
        return []
    if not pd.api.types.is_numeric_dtype(buy[col]):
        return []
    b = pd.to_numeric(buy[col], errors="coerce").dropna()
    s = pd.to_numeric(sell[col], errors="coerce").dropna()
    if len(b) < 10 or len(s) < 10:
        return []

    tight = _condition_tight(b if side == "buy" else s)
    if tight is None:
        return []

    conds = [tight]
    # NOTE(review): only the "buy median < sell median" ordering adds a contrast
    # condition. With the opposite ordering just the tight condition is returned,
    # so callers that require >= 2 conditions emit no contrast rule. Confirm that
    # this asymmetry is intended before mirroring it.
    buy_below_sell = float(b.median()) < float(s.median())
    if side == "buy" and buy_below_sell:
        conds.append({"col": col, "op": "lte", "value": float(s.quantile(0.40))})
    elif side == "sell" and buy_below_sell:
        conds.append({"col": col, "op": "gte", "value": float(b.quantile(0.60))})
    return conds


def _resolve_profile_features(
    trades_csv: Path,
    df: pd.DataFrame,
) -> tuple[list[str], list[str], dict[str, Any] | None]:
    """
    Refresh the 03c profile JSON if needed and return the buy/sell feature lists.

    The profile is regenerated when the JSON file is missing or older than the
    trades CSV. Empty selected-feature lists fall back to BUY_PROFILE_FEATURES /
    SELL_PROFILE_FEATURES.

    Args:
        trades_csv: Path to the 03b CSV.
        df: DataFrame loaded from the same CSV.

    Returns:
        (buy_features, sell_features, profile_analysis or None). The analysis is
        None when the existing profile JSON was reused.
    """
    profile_path = ANALYSIS_GT_MTF_PROFILE_JSON
    need_run = not profile_path.is_file()
    if not need_run and profile_path.stat().st_mtime < trades_csv.stat().st_mtime:
        need_run = True

    analysis: dict[str, Any] | None = None
    if need_run:
        analysis = analyze_gt_mtf_profile(df)
        profile_path.parent.mkdir(parents=True, exist_ok=True)
        profile_path.write_text(
            json.dumps(analysis, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
        # Imported lazily, only on the regeneration path (kept as in the original).
        from deepcoin.matching.gt_mtf_profile import write_gt_mtf_profile_html
        from deepcoin.paths import ANALYSIS_GT_MTF_PROFILE_HTML

        write_gt_mtf_profile_html(analysis, ANALYSIS_GT_MTF_PROFILE_HTML)
        print(f"[04-1] 03c GT MTF 프로필 갱신: {profile_path}")

    buy_f, sell_f = load_selected_features(profile_path)
    if not buy_f:
        buy_f = list(BUY_PROFILE_FEATURES)
    if not sell_f:
        sell_f = list(SELL_PROFILE_FEATURES)
    return buy_f, sell_f, analysis


def _mtf_cross_conditions(
    buy: pd.DataFrame,
    sell: pd.DataFrame,
    features: list[str],
    side: str,
) -> list[dict[str, Any]]:
    """
    AND together the condition of the top-separation column of each timeframe
    (cross-TF compound).

    Args:
        buy: Buy GT.
        sell: Sell GT.
        features: Candidate columns.
        side: buy | sell.

    Returns:
        List of conditions (only meaningful when it holds two or more).
    """
    subset = buy if side == "buy" else sell
    conds: list[dict[str, Any]] = []
    for iv in GENERAL_ANALYSIS_INTERVALS:
        pfx = interval_tf_prefix(iv)
        iv_feats = [f for f in features if f.startswith(f"{pfx}_") and f in subset.columns]
        if not iv_feats:
            continue
        best = max(iv_feats, key=lambda c: _feature_separation(buy, sell, c))
        cond = _condition_from_series(subset[best], side)
        if cond:
            conds.append(cond)
    return conds


def _apply_calibration_rules(rules: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """
    Apply the calibration JSON (if present) to the generated rules.

    When the calibration reports ``final.targets_met`` and has calibrated rules,
    those rules replace the generated ones. Otherwise calibrated rules whose
    ``rule_id`` is not already present are merged into ``rules``.

    Args:
        rules: Generated rule candidates (extended in place in the merge case).

    Returns:
        The resulting rule list.
    """
    if not ANALYSIS_GT_CALIBRATION_JSON.is_file():
        return rules

    cal = json.loads(ANALYSIS_GT_CALIBRATION_JSON.read_text(encoding="utf-8"))
    cal_rules = cal.get("calibrated_rules") or []
    # `or {}` also covers an explicit `"final": null` in the JSON.
    targets_met = (cal.get("final") or {}).get("targets_met")

    if targets_met and cal_rules:
        replaced: list[dict[str, Any]] = []
        for cr in cal_rules:
            cr.setdefault("logic", "and")
            replaced.append(cr)
        print(f"[04-1] 캘리브레이션 규칙 적용(90% 달성) → {len(replaced)}개")
        return replaced

    seen_ids = {r["rule_id"] for r in rules}
    for cr in cal_rules:
        rule_id = cr.get("rule_id")
        if rule_id in seen_ids:
            continue
        cr.setdefault("logic", "and")
        rules.append(cr)
        # A rule without an id cannot be de-duplicated; keep it (as the replace
        # branch does) instead of raising KeyError on cr["rule_id"].
        if rule_id is not None:
            seen_ids.add(rule_id)
    print(f"[04-1] 캘리브레이션 규칙 병합 → 총 {len(rules)}개")
    return rules


def build_rule_candidates(
    trades_csv: Path | None = None,
) -> dict[str, Any]:
    """
    Generate buy/sell rule candidates from the 03b CSV and the 03c MTF profile.

    Per side this emits (subject to the MATCH_INCLUDE_* switches) atomic rules,
    a top-3 compound rule, a tight compound rule, a contrast rule, a cross-TF
    rule and wide rules, then applies the calibration JSON when it exists.

    Args:
        trades_csv: Path to general_analysis_trades.csv; defaults to
            ANALYSIS_TRADES_CSV.

    Returns:
        Dict with rule_candidates metadata and the ``rules`` list.

    Raises:
        FileNotFoundError: If the trades CSV does not exist.
    """
    path = trades_csv or ANALYSIS_TRADES_CSV
    if not path.is_file():
        raise FileNotFoundError(f"03b CSV 없음: {path} — scripts/03_analyze_trades.py 먼저 실행")

    df = pd.read_csv(path)
    buy = df[df["action"] == "buy"].copy()
    sell = df[df["action"] == "sell"].copy()
    buy_features, sell_features, _ = _resolve_profile_features(path, df)

    skip_cols = {
        "ga_align_trend_score",  # distribution is so wide it fires over the whole range
    }

    rules: list[dict[str, Any]] = []
    rid = 0  # running counter shared by both sides for atomic rule ids

    for side, subset, features in (
        ("buy", buy, buy_features),
        ("sell", sell, sell_features),
    ):
        if MATCH_INCLUDE_ATOMIC:
            for feat in features:
                if feat not in df.columns or feat in skip_cols:
                    continue
                cond = _condition_from_series(subset[feat], side)
                if cond is None:
                    continue
                rules.append(
                    {
                        "rule_id": f"{side}_a{rid:03d}_{feat}",
                        "side": side,
                        "kind": "atomic",
                        "conditions": [cond],
                        "profile_col": feat,
                    }
                )
                rid += 1

        # Rank available features by buy/sell separation, strongest first.
        ranked = sorted(
            [f for f in features if f in df.columns],
            key=lambda c: _feature_separation(buy, sell, c),
            reverse=True,
        )
        ranked_top = ranked[:5]

        compound_conds: list[dict[str, Any]] = []
        for feat in ranked_top[:3]:
            cond = _condition_from_series(subset[feat], side)
            if cond:
                compound_conds.append(cond)
        if len(compound_conds) >= 2:
            rules.append(
                {
                    "rule_id": f"{side}_compound_top3",
                    "side": side,
                    "kind": "compound",
                    "conditions": compound_conds,
                    "profile_cols": ranked_top[:3],
                }
            )

        tight_conds: list[dict[str, Any]] = []
        for feat in ranked_top[:4]:
            if feat not in subset.columns:
                continue
            tc = _condition_tight(subset[feat])
            if tc:
                tight_conds.append(tc)
        if len(tight_conds) >= 2:
            rules.append(
                {
                    "rule_id": f"{side}_compound_tight",
                    "side": side,
                    "kind": "compound_tight",
                    "conditions": tight_conds,
                }
            )

        if ranked_top:
            c0 = ranked_top[0]
            contrast = _contrast_conditions(buy, sell, c0, side)
            if len(contrast) >= 2:
                rules.append(
                    {
                        "rule_id": f"{side}_contrast_{c0}",
                        "side": side,
                        "kind": "contrast",
                        "conditions": contrast,
                    }
                )

        if MATCH_INCLUDE_MTF_CROSS:
            cross = _mtf_cross_conditions(buy, sell, features, side)
            if len(cross) >= 3:
                rules.append(
                    {
                        "rule_id": f"{side}_mtf_cross_all_tf",
                        "side": side,
                        "kind": "mtf_cross",
                        "conditions": cross,
                    }
                )

        if MATCH_INCLUDE_WIDE_RULES:
            for feat in ranked_top[:2]:
                if feat not in subset.columns:
                    continue
                s = pd.to_numeric(subset[feat], errors="coerce").dropna()
                if len(s) < 10:
                    continue
                lo, hi = float(s.quantile(0.10)), float(s.quantile(0.90))
                if lo < hi:
                    rules.append(
                        {
                            "rule_id": f"{side}_wide_{feat}",
                            "side": side,
                            "kind": "wide",
                            "conditions": [
                                {"col": feat, "op": "between", "lo": lo, "hi": hi}
                            ],
                        }
                    )

    rules = _apply_calibration_rules(rules)

    out = {
        "source": str(path),
        "profile_json": str(ANALYSIS_GT_MTF_PROFILE_JSON),
        "calibration_json": str(ANALYSIS_GT_CALIBRATION_JSON),
        "buy_profile_features": buy_features[:50],
        "sell_profile_features": sell_features[:50],
        "buy_gt_count": int(len(buy)),
        "sell_gt_count": int(len(sell)),
        "rule_count": len(rules),
        "rules": rules,
    }
    print(
        f"[04-1] 규칙 후보 {len(rules)}개 "
        f"(매수 GT {len(buy)}, 매도 GT {len(sell)})"
    )
    return out


def save_rule_candidates(
    data: dict[str, Any],
    out_path: Path,
) -> Path:
    """
    Save rule_candidates.json.

    Args:
        data: Result of build_rule_candidates.
        out_path: Output path (parent directories are created if missing).

    Returns:
        out_path.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"[04-1] 저장: {out_path}")
    return out_path