""" GT 매수/매도 타점 MTF 프로필 분석 (3분~일봉 전 TF). 03b wide CSV에서 간격별·기법별 분포를 비교하고, 04 규칙 후보 생성용 피처 목록을 산출합니다. """ from __future__ import annotations import json from pathlib import Path from typing import Any import numpy as np import pandas as pd from config import ( GENERAL_ANALYSIS_INTERVALS, MATCH_PROFILE_MIN_SAMPLES, MATCH_PROFILE_MIN_SEPARATION, MATCH_PROFILE_TOP_GLOBAL, MATCH_PROFILE_TOP_PER_TF, ) from deepcoin.analysis.general_analysis_config import INTERVAL_PREFIX from deepcoin.analysis.general_analysis_core import interval_tf_prefix from deepcoin.matching.config import ANALYSIS_TRADES_CSV, META_COLS from deepcoin.paths import ANALYSIS_GT_MTF_PROFILE_HTML, ANALYSIS_GT_MTF_PROFILE_JSON def _feature_separation( buy: pd.Series, sell: pd.Series, ) -> float: """ 매수·매도 GT 분포 간 분리도(Cohen 유사). Args: buy: 매수 타점 값. sell: 매도 타점 값. Returns: 분리도(비숫자·표본 부족 시 0). """ a = pd.to_numeric(buy, errors="coerce").dropna() b = pd.to_numeric(sell, errors="coerce").dropna() if len(a) < MATCH_PROFILE_MIN_SAMPLES or len(b) < MATCH_PROFILE_MIN_SAMPLES: return 0.0 pooled = np.sqrt((a.var() + b.var()) / 2) if pooled < 1e-9: return abs(float(a.mean() - b.mean())) return abs(float(a.mean() - b.mean())) / pooled def _numeric_stats(series: pd.Series) -> dict[str, float | int]: """ 숫자 컬럼 요약 통계. Args: series: 한 side GT 값. Returns: count, mean, median, q25, q75, std. """ s = pd.to_numeric(series, errors="coerce").dropna() if s.empty: return {"count": 0} return { "count": int(len(s)), "mean": round(float(s.mean()), 4), "median": round(float(s.median()), 4), "q25": round(float(s.quantile(0.25)), 4), "q75": round(float(s.quantile(0.75)), 4), "std": round(float(s.std()), 4) if len(s) > 1 else 0.0, } def _categorical_stats(series: pd.Series) -> dict[str, Any]: """ 범주형 컬럼 최빈값·비율. Args: series: GT 값. Returns: mode, mode_frac, value_counts 상위 5. """ s = series.dropna().astype(str) if s.empty: return {"count": 0} vc = s.value_counts() mode = str(vc.index[0]) return { "count": int(len(s)), "mode": mode, "mode_frac": round(float(vc.iloc[0] / len(s)), 3), "top": {str(k): int(v) for k, v in vc.head(5).items()}, } def _parse_tf_column(col: str) -> tuple[str, int | None, str]: """ 컬럼명에서 TF 접두사·간격·베이스명 추출. Args: col: 예 m3_ga_rsi, ga_align_timing_buy_score. Returns: (tf_label, interval_minutes|None, base_name). """ if col.startswith("ga_align_"): return ("mtf_align", None, col) prefixes = sorted( set(INTERVAL_PREFIX.values()), key=len, reverse=True, ) for p in prefixes: if col.startswith(f"{p}_"): inv = {v: k for k, v in INTERVAL_PREFIX.items()} return (p, inv.get(p), col[len(p) + 1 :]) return ("other", None, col) def _feature_family(base: str) -> str: """기법군 라벨.""" if base in ("bb_pos", "RSI", "macd_hist", "stoch_k", "stoch_d", "BB_Width"): return "legacy" if base.startswith("ga_align_"): return "mtf_align" if "pattern" in base: return "pattern" if "struct" in base or "elliott" in base or "wyckoff" in base or "fib_" in base: return "wave_structure" if "chart" in base: return "chart" if "volume" in base or "vp_" in base: return "volume" if "harmonic" in base: return "harmonic" if base.startswith("ga_"): return "indicator" return "other" def discover_profile_columns(df: pd.DataFrame) -> list[str]: """ 규칙·프로필 분석 대상 컬럼 목록. Args: df: 03b wide CSV DataFrame. Returns: META 제외·분석 가능 컬럼명. """ meta = set(META_COLS) out: list[str] = [] for col in df.columns: if col in meta: continue if df[col].notna().sum() < MATCH_PROFILE_MIN_SAMPLES: continue if pd.api.types.is_numeric_dtype(df[col]): out.append(col) continue nuniq = df[col].dropna().astype(str).nunique() if 1 < nuniq <= 20: out.append(col) return out def _analyze_one_column( buy: pd.DataFrame, sell: pd.DataFrame, col: str, ) -> dict[str, Any]: """ 단일 컬럼 매수 vs 매도 GT 비교. Args: buy: 매수 행. sell: 매도 행. col: 컬럼명. Returns: 분리도·통계·방향 힌트. """ tf_label, interval, base = _parse_tf_column(col) family = _feature_family(base) row: dict[str, Any] = { "col": col, "tf": tf_label, "interval": interval, "base": base, "family": family, "dtype": "numeric" if pd.api.types.is_numeric_dtype(buy[col]) else "categorical", } if row["dtype"] == "numeric": row["buy"] = _numeric_stats(buy[col]) row["sell"] = _numeric_stats(sell[col]) sep = _feature_separation(buy[col], sell[col]) row["separation"] = round(sep, 4) bm = row["buy"].get("median") sm = row["sell"].get("median") if bm is not None and sm is not None: row["buy_lower_than_sell"] = bm < sm else: row["buy_lower_than_sell"] = None else: row["buy"] = _categorical_stats(buy[col]) row["sell"] = _categorical_stats(sell[col]) row["separation"] = 0.0 if row["buy"].get("mode") and row["sell"].get("mode"): row["modes_differ"] = row["buy"]["mode"] != row["sell"]["mode"] return row def analyze_gt_mtf_profile(df: pd.DataFrame) -> dict[str, Any]: """ 전 TF·전 컬럼 GT 매수/매도 프로필 분석. Args: df: general_analysis_trades.csv. Returns: JSON 직렬화 가능 분석 결과. """ buy = df[df["action"] == "buy"].copy() sell = df[df["action"] == "sell"].copy() cols = discover_profile_columns(df) features: list[dict[str, Any]] = [] for col in cols: features.append(_analyze_one_column(buy, sell, col)) numeric_feats = [f for f in features if f["dtype"] == "numeric"] ranked = sorted(numeric_feats, key=lambda x: x["separation"], reverse=True) by_interval: dict[str, dict[str, Any]] = {} for iv in GENERAL_ANALYSIS_INTERVALS: pfx = interval_tf_prefix(iv) iv_feats = [f for f in numeric_feats if f["tf"] == pfx] iv_ranked = sorted(iv_feats, key=lambda x: x["separation"], reverse=True) buy_favor = [f for f in iv_ranked if f.get("buy_lower_than_sell") is True][:10] sell_favor = [f for f in iv_ranked if f.get("buy_lower_than_sell") is False][:10] by_interval[pfx] = { "interval_minutes": iv, "feature_count": len(iv_feats), "top_separation": [ {"col": x["col"], "separation": x["separation"]} for x in iv_ranked[:15] ], "buy_favor_lower_median": [ {"col": x["col"], "separation": x["separation"]} for x in buy_favor[:8] ], "sell_favor_higher_median": [ {"col": x["col"], "separation": x["separation"]} for x in sell_favor[:8] ], } align_feats = [f for f in features if f["family"] == "mtf_align"] selected_buy = _select_side_features(ranked, "buy") selected_sell = _select_side_features(ranked, "sell") return { "source_rows": int(len(df)), "buy_gt_count": int(len(buy)), "sell_gt_count": int(len(sell)), "columns_analyzed": len(cols), "intervals": list(GENERAL_ANALYSIS_INTERVALS), "config": { "top_per_tf": MATCH_PROFILE_TOP_PER_TF, "top_global": MATCH_PROFILE_TOP_GLOBAL, "min_separation": MATCH_PROFILE_MIN_SEPARATION, "min_samples": MATCH_PROFILE_MIN_SAMPLES, }, "global_top_separation": [ { "col": x["col"], "tf": x["tf"], "family": x["family"], "separation": x["separation"], "buy_median": x["buy"].get("median"), "sell_median": x["sell"].get("median"), } for x in ranked[:40] ], "by_interval": by_interval, "mtf_align": align_feats, "selected_features": { "buy": selected_buy, "sell": selected_sell, }, "features": features, } def _select_side_features( ranked: list[dict[str, Any]], side: str, ) -> list[str]: """ 04 규칙용 피처 목록: TF별 상위 + 글로벌 상위. Args: ranked: separation 내림차순 numeric feature dicts. side: buy | sell. Returns: 컬럼명 리스트(중복 제거, 순서 유지). """ chosen: list[str] = [] seen: set[str] = set() def add(col: str) -> None: if col not in seen: seen.add(col) chosen.append(col) for iv in GENERAL_ANALYSIS_INTERVALS: pfx = interval_tf_prefix(iv) iv_list = [ f for f in ranked if f["tf"] == pfx and f["separation"] >= MATCH_PROFILE_MIN_SEPARATION ] if side == "buy": iv_list.sort( key=lambda x: ( x["separation"], 1 if x.get("buy_lower_than_sell") else 0, ), reverse=True, ) else: iv_list.sort( key=lambda x: ( x["separation"], 1 if x.get("buy_lower_than_sell") is False else 0, ), reverse=True, ) for f in iv_list[:MATCH_PROFILE_TOP_PER_TF]: add(f["col"]) global_list = [f for f in ranked if f["separation"] >= MATCH_PROFILE_MIN_SEPARATION] if side == "buy": global_list.sort( key=lambda x: ( x["separation"], 1 if x.get("buy_lower_than_sell") else 0, ), reverse=True, ) else: global_list.sort( key=lambda x: ( x["separation"], 1 if x.get("buy_lower_than_sell") is False else 0, ), reverse=True, ) for f in global_list[:MATCH_PROFILE_TOP_GLOBAL]: add(f["col"]) for name in ( "ga_align_timing_buy_score", "ga_align_timing_sell_score", "ga_align_trend_score", "ga_align_rsi_oversold_tf", "ga_align_rsi_overbought_tf", "ga_align_mtf_conflict", ): add(name) return chosen def load_selected_features( profile_path: Path | None = None, ) -> tuple[list[str], list[str]]: """ 저장된 프로필 JSON에서 buy/sell 피처 목록 로드. Args: profile_path: gt_mtf_profile.json. Returns: (buy_features, sell_features). 없으면 빈 리스트. """ path = profile_path or ANALYSIS_GT_MTF_PROFILE_JSON if not path.is_file(): return [], [] data = json.loads(path.read_text(encoding="utf-8")) sel = data.get("selected_features") or {} return list(sel.get("buy") or []), list(sel.get("sell") or []) def run_gt_mtf_profile( trades_csv: Path | None = None, *, write_json: bool = True, write_html: bool = True, ) -> dict[str, Any]: """ 03b CSV 분석 후 JSON/HTML 저장. Args: trades_csv: 입력 CSV. write_json: JSON 저장 여부. write_html: HTML 저장 여부. Returns: analyze_gt_mtf_profile 결과. """ path = trades_csv or ANALYSIS_TRADES_CSV if not path.is_file(): raise FileNotFoundError(f"03b CSV 없음: {path}") df = pd.read_csv(path) analysis = analyze_gt_mtf_profile(df) buy_n = len(analysis["selected_features"]["buy"]) sell_n = len(analysis["selected_features"]["sell"]) print( f"[03c] GT MTF 프로필: 분석 {analysis['columns_analyzed']}열 " f"→ 매수 피처 {buy_n}, 매도 피처 {sell_n}" ) if write_json: ANALYSIS_GT_MTF_PROFILE_JSON.parent.mkdir(parents=True, exist_ok=True) ANALYSIS_GT_MTF_PROFILE_JSON.write_text( json.dumps(analysis, ensure_ascii=False, indent=2), encoding="utf-8", ) print(f"[03c] 저장: {ANALYSIS_GT_MTF_PROFILE_JSON}") if write_html: write_gt_mtf_profile_html(analysis, ANALYSIS_GT_MTF_PROFILE_HTML) print(f"[03c] 저장: {ANALYSIS_GT_MTF_PROFILE_HTML}") return analysis def write_gt_mtf_profile_html( analysis: dict[str, Any], html_path: Path, ) -> Path: """ TF별·글로벌 분리도 요약 HTML. Args: analysis: analyze_gt_mtf_profile 결과. html_path: 출력 경로. Returns: html_path. """ html_path.parent.mkdir(parents=True, exist_ok=True) def _rows_interval() -> str: rows = "" for pfx, block in analysis.get("by_interval", {}).items(): top = block.get("top_separation") or [] top_s = ", ".join( f"{t['col'].split('_', 1)[-1][:20]}({t['separation']:.2f})" for t in top[:5] ) or "-" rows += ( f"
매수 GT {analysis['buy_gt_count']}건 · 매도 GT {analysis['sell_gt_count']}건 · 분석 컬럼 {analysis['columns_analyzed']}개 (3,5,10,15,30,60,240,1440분 + MTF 합성)
분리도 = |mean_buy − mean_sell| / pooled_std. TF별·글로벌 상위 피처로 04 규칙 후보를 생성합니다.
| TF | 숫자 피처 수 | 상위 5 (분리도) |
|---|
| 컬럼 | TF | 기법군 | 분리도 | 매수 median | 매도 median |
|---|
매수{buy_feats}
매도{sell_feats}