""" 모든 봉의 BB·일목 위치를 조합해 매수/매도 후보 규칙을 분석합니다. python simulation.py analyze """ from __future__ import annotations import json from dataclasses import asdict, dataclass, field from pathlib import Path import numpy as np import pandas as pd from candle_features import ( FEATURE_BOOL_COLS, INTERVAL_LABELS, build_master_feature_matrix, describe_latest_position, interval_prefix, ) from config import ALL_INTERVALS, ENTRY_INTERVAL, SYMBOL REPORT_FILE = Path(__file__).parent / "combination_report.json" @dataclass class CombinationReport: """봉 조합 분석 결과.""" generated_at: str intervals_loaded: list[int] latest_positions: list[dict] buy_recommendations: list[str] = field(default_factory=list) sell_recommendations: list[str] = field(default_factory=list) buy_avoid: list[str] = field(default_factory=list) top_buy_pairs: list[dict] = field(default_factory=list) top_sell_pairs: list[dict] = field(default_factory=list) suggested_rules: dict = field(default_factory=dict) def _forward_return(close: pd.Series, bars: int = 20) -> pd.Series: """N봉 후 수익률 (%).""" future = close.shift(-bars) return (future - close) / close.replace(0, np.nan) * 100 def _predicate_keys(intervals: list[int]) -> list[str]: keys: list[str] = [] for iv in intervals: pfx = interval_prefix(iv) for feat in FEATURE_BOOL_COLS: keys.append(f"{pfx}:{feat}") return keys def analyze_forward_edge( matrix: pd.DataFrame, forward_bars: int = 20, min_samples: int = 40, ) -> tuple[list[dict], list[dict], list[dict]]: """ 단일 조건·2봉 조합별 미래 수익 통계 (학습용 힌트). Returns: (매수 유리 top, 매도/회피 top) """ close = matrix["Close"].astype(float) fwd = _forward_return(close, forward_bars) valid = fwd.notna() base_mean = float(fwd[valid].mean()) if valid.any() else 0.0 singles: list[dict] = [] cols = [c for c in matrix.columns if any(c.startswith(f"{interval_prefix(iv)}_") for iv in ALL_INTERVALS)] for col in cols: mask = matrix[col].fillna(0).astype(bool) & valid n = int(mask.sum()) if n < min_samples: continue avg = float(fwd[mask].mean()) singles.append( { "key": _col_to_key(col), "column": col, "count": n, "avg_forward_pct": round(avg, 4), "edge_vs_base": round(avg - base_mean, 4), } ) singles.sort(key=lambda x: x["edge_vs_base"], reverse=True) buy_top = [s for s in singles if s["edge_vs_base"] > 0][:25] sell_top = sorted(singles, key=lambda x: x["edge_vs_base"])[:15] pairs: list[dict] = [] buy_cols = [s["column"] for s in buy_top[:12]] for i, c1 in enumerate(buy_cols): for c2 in buy_cols[i + 1 :]: if c1.split("_")[0] == c2.split("_")[0]: continue mask = ( matrix[c1].fillna(0).astype(bool) & matrix[c2].fillna(0).astype(bool) & valid ) n = int(mask.sum()) if n < min_samples // 2: continue avg = float(fwd[mask].mean()) pairs.append( { "keys": [_col_to_key(c1), _col_to_key(c2)], "count": n, "avg_forward_pct": round(avg, 4), "edge_vs_base": round(avg - base_mean, 4), } ) pairs.sort(key=lambda x: x["edge_vs_base"], reverse=True) return buy_top, pairs[:20], sell_top def _col_to_key(col: str) -> str: """m3_cross_up_lower -> m3:cross_up_lower.""" for pfx in INTERVAL_LABELS.values(): if col.startswith(f"{pfx}_"): return f"{pfx}:{col[len(pfx) + 1:]}" return col def build_recommendations( buy_top: list[dict], pair_top: list[dict], sell_top: list[dict], ) -> tuple[list[str], list[str], list[str], dict]: """사람이 읽을 수 있는 권장·규칙 초안.""" buy_rec: list[str] = [] sell_rec: list[str] = [] avoid: list[str] = [] for s in buy_top[:8]: buy_rec.append( f"{s['key']} — {s['count']}회, {s['avg_forward_pct']:+.2f}% ({s['edge_vs_base']:+.2f}%p)" ) for p in pair_top[:5]: buy_rec.append( f"조합 {' + '.join(p['keys'])} — {p['count']}회, {p['edge_vs_base']:+.2f}%p" ) for s in sell_top[:6]: if s["edge_vs_base"] < -0.05: avoid.append(f"매수 회피: {s['key']} ({s['edge_vs_base']:.2f}%p)") for s in sell_top[:5]: if "cross_up_upper" in s["key"] or "above_upper" in s["key"] or "ichi_above" in s["key"]: sell_rec.append(f"매도 후보: {s['key']}") suggested: dict = {"buy_all": [], "buy_any": [], "sell_all": [], "sell_stop": []} if pair_top: suggested["buy_all"] = pair_top[0]["keys"] elif buy_top: suggested["buy_all"] = [buy_top[0]["key"]] if sell_top: for s in sell_top: if "cross_up_upper" in s.get("key", ""): suggested["sell_all"] = [s["key"]] break return buy_rec, sell_rec, avoid, suggested def analyze_combinations(frames: dict[int, pd.DataFrame]) -> CombinationReport: """전체 봉 BB·일목 위치 분석 + 조합 매매 힌트.""" from datetime import datetime loaded = sorted(frames.keys()) latest = [describe_latest_position(frames[iv], iv) for iv in ALL_INTERVALS if iv in frames] print("\n=== 봉별 최신 BB·일목 위치 ===") for p in latest: print( f" {p['label']:>6} | BB {p['bb_zone']:>6} ({p['bb_pos']:.2f}) {p['bb_state']:>16} | " f"일목 {p['ichi_position']:>12} TK={p['ichi_tk']} 구름={p['ichi_cloud']}" ) matrix = build_master_feature_matrix(frames).iloc[52:].copy() print(f"\n특징 행렬: {len(matrix)}행 × {len(matrix.columns)}열") buy_top, pair_top, sell_top = analyze_forward_edge(matrix) buy_rec, sell_rec, avoid, suggested = build_recommendations(buy_top, pair_top, sell_top) print("\n=== 매수 유리 조건 (단일·상위) ===") for line in buy_rec[:10]: print(f" {line}") print("\n=== 매수 회피 / 매도 참고 ===") for line in avoid[:6]: print(f" {line}") for line in sell_rec[:5]: print(f" {line}") return CombinationReport( generated_at=datetime.now().isoformat(timespec="seconds"), intervals_loaded=loaded, latest_positions=latest, buy_recommendations=buy_rec, sell_recommendations=sell_rec, buy_avoid=avoid, top_buy_pairs=pair_top, suggested_rules=suggested, ) def save_report(report: CombinationReport, path: Path = REPORT_FILE) -> None: path.write_text(json.dumps(asdict(report), ensure_ascii=False, indent=2), encoding="utf-8") print(f"\n저장: {path}") def load_frames(monitor) -> dict[int, pd.DataFrame]: from mtf_bb import load_frames_from_db return load_frames_from_db(monitor, SYMBOL)