"""MTF 상관 리포트 기반 규칙 정의·도출.""" from __future__ import annotations import json from dataclasses import dataclass, field from pathlib import Path from typing import Any, Literal from deepcoin.evaluation.gt_align import GT_SIGNAL_TYPES Operator = Literal["<=", ">="] # BTC 가격 스케일에 민감한 지표는 자동 규칙에서 제외 _EXCLUDED_AUTO_FEATURES: frozenset[str] = frozenset({"macd_hist", "zigzag_leg_pct", "close"}) # 자동 규칙에 사용할 안정 피처 _PREFERRED_FEATURES: tuple[str, ...] = ( "close_vs_ema60_pct", "ema60_slope_5_pct", "rsi14", "bb_position", "atr_pct", ) @dataclass(frozen=True) class MtfRule: """단일 MTF 조건 (신호 유형별).""" signal_type: str timeframe_label: str interval_min: int feature: str operator: Operator threshold: float cohens_d: float positive_mean: float negative_mean: float def to_dict(self) -> dict[str, Any]: """JSON 직렬화 dict.""" return { "signal_type": self.signal_type, "timeframe_label": self.timeframe_label, "interval_min": self.interval_min, "feature": self.feature, "operator": self.operator, "threshold": self.threshold, "cohens_d": self.cohens_d, "positive_mean": self.positive_mean, "negative_mean": self.negative_mean, } @classmethod def from_dict(cls, raw: dict[str, Any]) -> MtfRule: """dict에서 MtfRule 생성.""" return cls( signal_type=str(raw["signal_type"]), timeframe_label=str(raw["timeframe_label"]), interval_min=int(raw["interval_min"]), feature=str(raw["feature"]), operator=raw["operator"], # type: ignore[arg-type] threshold=float(raw["threshold"]), cohens_d=float(raw.get("cohens_d", 0.0)), positive_mean=float(raw.get("positive_mean", 0.0)), negative_mean=float(raw.get("negative_mean", 0.0)), ) @dataclass class MtfRuleSet: """신호 유형별 MTF 규칙 묶음.""" version: str = "v1" min_rules_pass: int = 2 min_cohens_d: float = 1.2 max_rules_per_type: int = 4 rules_by_type: dict[str, list[MtfRule]] = field(default_factory=dict) source_report: str = "" def rules_for(self, signal_type: str) -> list[MtfRule]: """신호 유형에 해당하는 규칙 목록.""" return self.rules_by_type.get(signal_type, []) def to_dict(self) -> 
dict[str, Any]: """JSON 직렬화 dict.""" return { "version": self.version, "min_rules_pass": self.min_rules_pass, "min_cohens_d": self.min_cohens_d, "max_rules_per_type": self.max_rules_per_type, "source_report": self.source_report, "rules_by_type": { st: [r.to_dict() for r in rules] for st, rules in self.rules_by_type.items() }, } @classmethod def from_dict(cls, raw: dict[str, Any]) -> MtfRuleSet: """dict에서 MtfRuleSet 생성.""" rules_by_type: dict[str, list[MtfRule]] = {} for st, items in (raw.get("rules_by_type") or {}).items(): rules_by_type[st] = [MtfRule.from_dict(item) for item in items] return cls( version=str(raw.get("version", "v1")), min_rules_pass=int(raw.get("min_rules_pass", 2)), min_cohens_d=float(raw.get("min_cohens_d", 1.2)), max_rules_per_type=int(raw.get("max_rules_per_type", 4)), rules_by_type=rules_by_type, source_report=str(raw.get("source_report", "")), ) def _label_to_interval(label: str, tf_list: list[dict[str, Any]]) -> int: """TF 라벨 → interval_min.""" for item in tf_list: if item.get("label") == label: return int(item["interval_min"]) raise KeyError(f"unknown timeframe label: {label}") def derive_rules_from_report( report: dict[str, Any], min_cohens_d: float = 1.2, max_rules_per_type: int = 4, min_rules_pass: int = 2, preferred_features: tuple[str, ...] = _PREFERRED_FEATURES, ) -> MtfRuleSet: """MTF 상관 리포트에서 신호 유형별 규칙 후보를 도출한다. 임계값은 GT(양성) 평균과 음성 평균의 중간값으로 설정한다. Args: report: build_mtf_correlation_report 출력 JSON. min_cohens_d: |Cohen's d| 최소값. max_rules_per_type: 유형당 최대 규칙 수. min_rules_pass: 필터 통과에 필요한 최소 충족 규칙 수. preferred_features: 우선 사용 피처. Returns: MtfRuleSet. 
""" analysis = report.get("analysis", {}) tf_list = analysis.get("timeframes") or [] by_type = report.get("by_signal_type") or {} rule_set = MtfRuleSet( min_rules_pass=min_rules_pass, min_cohens_d=min_cohens_d, max_rules_per_type=max_rules_per_type, source_report=str(report.get("generated_at", "")), ) for signal_type in GT_SIGNAL_TYPES: block = by_type.get(signal_type) if not block: continue candidates: list[tuple[float, MtfRule]] = [] for tf_label, tf_data in (block.get("timeframes") or {}).items(): interval_min = _label_to_interval(tf_label, tf_list) numeric = tf_data.get("numeric") or {} for feat_name, summary in numeric.items(): if feat_name in _EXCLUDED_AUTO_FEATURES: continue if feat_name not in preferred_features: continue d = summary.get("cohens_d") if d is None or abs(float(d)) < min_cohens_d: continue pos_mean = float(summary.get("positive_mean", 0.0)) neg_mean = float(summary.get("negative_mean", 0.0)) threshold = round((pos_mean + neg_mean) / 2.0, 4) operator: Operator = "<=" if pos_mean < neg_mean else ">=" rule = MtfRule( signal_type=signal_type, timeframe_label=tf_label, interval_min=interval_min, feature=feat_name, operator=operator, threshold=threshold, cohens_d=float(d), positive_mean=pos_mean, negative_mean=neg_mean, ) candidates.append((abs(float(d)), rule)) candidates.sort(key=lambda x: x[0], reverse=True) seen: set[tuple[str, str]] = set() picked: list[MtfRule] = [] for _, rule in candidates: key = (rule.timeframe_label, rule.feature) if key in seen: continue seen.add(key) picked.append(rule) if len(picked) >= max_rules_per_type: break if picked: rule_set.rules_by_type[signal_type] = picked return rule_set def save_mtf_rules(rule_set: MtfRuleSet, json_path: Path) -> Path: """규칙 JSON 저장.""" json_path.parent.mkdir(parents=True, exist_ok=True) with json_path.open("w", encoding="utf-8") as fp: json.dump(rule_set.to_dict(), fp, ensure_ascii=False, indent=2) return json_path def load_mtf_rules(json_path: Path) -> MtfRuleSet: """규칙 JSON 로드.""" with 
json_path.open(encoding="utf-8") as fp: return MtfRuleSet.from_dict(json.load(fp)) def load_or_derive_mtf_rules( rules_path: Path, report_path: Path, min_cohens_d: float = 1.2, max_rules_per_type: int = 4, min_rules_pass: int = 2, force_derive: bool = False, ) -> MtfRuleSet: """규칙 파일이 있으면 로드, 없거나 force면 리포트에서 재도출.""" if rules_path.exists() and not force_derive: return load_mtf_rules(rules_path) if not report_path.exists(): raise FileNotFoundError( f"MTF 규칙/리포트 없음: rules={rules_path}, report={report_path}. " "먼저 scripts/2_run_mtf_analysis.py 실행" ) with report_path.open(encoding="utf-8") as fp: report = json.load(fp) rule_set = derive_rules_from_report( report, min_cohens_d=min_cohens_d, max_rules_per_type=max_rules_per_type, min_rules_pass=min_rules_pass, ) save_mtf_rules(rule_set, rules_path) return rule_set
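

# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original module.
#
# derive_rules_from_report places each threshold at the midpoint of the
# positive and negative means and picks the operator that points toward the
# positive mean. The helpers below sketch, under that reading, how such a rule
# might be evaluated and how min_rules_pass might be applied as a filter.
# All three helper names are hypothetical, and the (value, operator,
# threshold) tuple layout is an assumption made for this example only; the
# module itself does not define how rules are checked against live features.
# ---------------------------------------------------------------------------


def _example_midpoint_rule(positive_mean: float, negative_mean: float) -> tuple[str, float]:
    """Return (operator, threshold) using the same midpoint logic as the derivation."""
    threshold = round((positive_mean + negative_mean) / 2.0, 4)
    operator = "<=" if positive_mean < negative_mean else ">="
    return operator, threshold


def _example_rule_passes(value: float, operator: str, threshold: float) -> bool:
    """Check one feature value against one rule condition."""
    if operator == "<=":
        return value <= threshold
    if operator == ">=":
        return value >= threshold
    raise ValueError(f"unsupported operator: {operator}")


def _example_passes_filter(
    checks: list[tuple[float, str, float]],
    min_rules_pass: int = 2,
) -> bool:
    """Pass when at least min_rules_pass of the (value, operator, threshold) checks hold."""
    passed = sum(1 for value, op, thr in checks if _example_rule_passes(value, op, thr))
    return passed >= min_rules_pass


# Usage, with made-up numbers:
#   _example_midpoint_rule(30.0, 50.0)          -> ("<=", 40.0)
#   _example_rule_passes(35.0, "<=", 40.0)      -> True
#   _example_passes_filter([(35.0, "<=", 40.0), (0.6, ">=", 0.5)], 2) -> True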