Files
DeepCoin/combination_analyzer.py
dsyoon e218a8ea32 전 봉 BB·일목 조합 분석 및 simulation 단일 실행으로 통합
9개 간격(1~1440분) BB·일목 위치 특징을 3분 타임라인에 맞춰 분석하고,
discover로 매수·매도 규칙을 찾은 뒤 HTML 차트에 해당 체결만 표시한다.
simulation_1h.py를 simulation.py로 변경했으며, 파라미터 없이 실행하면
analyze→discover→차트가 한 번에 수행된다.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-29 01:20:36 +09:00

223 lines
7.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
모든 봉의 BB·일목 위치를 조합해 매수/매도 후보 규칙을 분석합니다.
python simulation.py analyze
"""
from __future__ import annotations
import json
from dataclasses import asdict, dataclass, field
from pathlib import Path
import numpy as np
import pandas as pd
from candle_features import (
FEATURE_BOOL_COLS,
INTERVAL_LABELS,
build_master_feature_matrix,
describe_latest_position,
interval_prefix,
)
from config import ALL_INTERVALS, ENTRY_INTERVAL, SYMBOL
# Default destination of the JSON report: "combination_report.json" in the
# same directory as this module (used as the default `path` of save_report).
REPORT_FILE = Path(__file__).parent / "combination_report.json"
@dataclass
class CombinationReport:
    """Result of the candle-combination analysis.

    Attributes:
        generated_at: Timestamp string of when the report was built.
        intervals_loaded: Candle intervals for which a frame was available.
        latest_positions: One dict per interval describing the most recent
            BB / Ichimoku position.
        buy_recommendations: Human-readable buy hints.
        sell_recommendations: Human-readable sell hints.
        buy_avoid: Human-readable "do not buy" hints.
        top_buy_pairs: Statistics of the best two-condition buy combinations.
        top_sell_pairs: Statistics of sell-side combinations (empty unless the
            caller fills it in).
        suggested_rules: Draft rule set keyed by rule group name.
    """

    # Required fields (positional order is part of the constructor interface).
    generated_at: str
    intervals_loaded: list[int]
    latest_positions: list[dict]

    # Optional fields; each instance gets its own fresh container.
    buy_recommendations: list[str] = field(default_factory=list)
    sell_recommendations: list[str] = field(default_factory=list)
    buy_avoid: list[str] = field(default_factory=list)
    top_buy_pairs: list[dict] = field(default_factory=list)
    top_sell_pairs: list[dict] = field(default_factory=list)
    suggested_rules: dict = field(default_factory=dict)
def _forward_return(close: pd.Series, bars: int = 20) -> pd.Series:
    """Return the percent change from each close to the close ``bars`` rows later.

    Rows without a value ``bars`` steps ahead come out as NaN.
    """
    # A zero close is replaced by NaN so the division yields NaN, not +/-inf.
    denominator = close.replace(0, np.nan)
    later_close = close.shift(-bars)
    return later_close.sub(close).div(denominator).mul(100)
def _predicate_keys(intervals: list[int]) -> list[str]:
    """List every ``"<interval prefix>:<feature>"`` key for the given intervals.

    Keys are grouped by interval in the order given; within an interval they
    follow the order of ``FEATURE_BOOL_COLS``.
    """
    collected: list[str] = []
    for interval in intervals:
        prefix = interval_prefix(interval)
        collected.extend(f"{prefix}:{feature}" for feature in FEATURE_BOOL_COLS)
    return collected
def analyze_forward_edge(
    matrix: pd.DataFrame,
    forward_bars: int = 20,
    min_samples: int = 40,
) -> tuple[list[dict], list[dict], list[dict]]:
    """Measure the forward-return edge of single conditions and condition pairs.

    For each interval-prefixed column of ``matrix``, the mean return
    ``forward_bars`` rows ahead (over rows where the column is truthy) is
    compared with the baseline mean over all rows that have a forward return.
    These statistics are learning hints, not validated trading rules.

    Args:
        matrix: Feature matrix containing a ``Close`` column plus feature
            columns named ``<interval prefix>_<feature>``.
        forward_bars: Look-ahead horizon, in rows of ``matrix``.
        min_samples: Minimum number of rows in which a single condition must
            hold to be reported; pairs need ``min_samples // 2`` rows.

    Returns:
        A 3-tuple ``(buy_top, pair_top, sell_top)``:

        * ``buy_top`` - up to 25 single conditions with a positive edge,
          best first.
        * ``pair_top`` - up to 20 pairs built from the 12 best buy
          conditions (different interval prefixes only), best first.
        * ``sell_top`` - the 15 single conditions with the lowest edge,
          worst first. Not filtered by sign, so it may contain positive
          edges when fewer than 15 conditions underperform.
    """
    close = matrix["Close"].astype(float)
    fwd = _forward_return(close, forward_bars)
    valid = fwd.notna()
    base_mean = float(fwd[valid].mean()) if valid.any() else 0.0

    def _flag(col: str) -> pd.Series:
        # NOTE(review): every prefixed column is coerced to bool, so a
        # non-boolean feature column would count as "true" whenever it is
        # non-zero - confirm the matrix only carries boolean flags here.
        return matrix[col].fillna(0).astype(bool)

    # Build the prefix tuple once; str.startswith accepts a tuple of options.
    prefixes = tuple(f"{interval_prefix(iv)}_" for iv in ALL_INTERVALS)
    cols = [c for c in matrix.columns if c.startswith(prefixes)]

    singles: list[dict] = []
    for col in cols:
        mask = _flag(col) & valid
        n = int(mask.sum())
        if n < min_samples:
            continue
        avg = float(fwd[mask].mean())
        singles.append(
            {
                "key": _col_to_key(col),
                "column": col,
                "count": n,
                "avg_forward_pct": round(avg, 4),
                "edge_vs_base": round(avg - base_mean, 4),
            }
        )

    singles.sort(key=lambda x: x["edge_vs_base"], reverse=True)
    buy_top = [s for s in singles if s["edge_vs_base"] > 0][:25]
    sell_top = sorted(singles, key=lambda x: x["edge_vs_base"])[:15]

    # Only the strongest 12 buy conditions are combined, to bound pair count.
    buy_cols = [s["column"] for s in buy_top[:12]]
    # Compute each candidate's boolean mask once instead of once per pair.
    flags = {col: _flag(col) for col in buy_cols}

    pairs: list[dict] = []
    for i, c1 in enumerate(buy_cols):
        for c2 in buy_cols[i + 1 :]:
            # Skip pairs whose columns share the same leading prefix token
            # (i.e. two conditions from the same interval).
            if c1.split("_")[0] == c2.split("_")[0]:
                continue
            mask = flags[c1] & flags[c2] & valid
            n = int(mask.sum())
            if n < min_samples // 2:
                continue
            avg = float(fwd[mask].mean())
            pairs.append(
                {
                    "keys": [_col_to_key(c1), _col_to_key(c2)],
                    "count": n,
                    "avg_forward_pct": round(avg, 4),
                    "edge_vs_base": round(avg - base_mean, 4),
                }
            )

    pairs.sort(key=lambda x: x["edge_vs_base"], reverse=True)
    return buy_top, pairs[:20], sell_top
def _col_to_key(col: str) -> str:
    """Convert a ``<prefix>_<feature>`` column name into ``<prefix>:<feature>``.

    For example ``m3_cross_up_lower`` becomes ``m3:cross_up_lower`` when
    ``m3`` is one of the ``INTERVAL_LABELS`` values. The first matching prefix
    wins; a column matching no prefix is returned unchanged.
    """
    matched = next(
        (label for label in INTERVAL_LABELS.values() if col.startswith(label + "_")),
        None,
    )
    if matched is None:
        return col
    # Skip the prefix and the single underscore that follows it.
    feature = col[len(matched) + 1 :]
    return matched + ":" + feature
def build_recommendations(
    buy_top: list[dict],
    pair_top: list[dict],
    sell_top: list[dict],
) -> tuple[list[str], list[str], list[str], dict]:
    """Turn edge statistics into human-readable hints and a draft rule set.

    Args:
        buy_top: Single-condition stats, best edge first. Each dict needs
            ``key``, ``count``, ``avg_forward_pct`` and ``edge_vs_base``.
        pair_top: Pair stats, best edge first. Each dict needs ``keys``,
            ``count`` and ``edge_vs_base``.
        sell_top: Single-condition stats, worst edge first. Each dict needs
            ``key`` and ``edge_vs_base``.

    Returns:
        ``(buy_rec, sell_rec, avoid, suggested)``:

        * ``buy_rec`` - lines for the top 8 single conditions followed by the
          top 5 pairs.
        * ``sell_rec`` - lines for those of the 5 worst conditions whose key
          names an upper-band or above-cloud feature.
        * ``avoid`` - lines for those of the 6 worst conditions whose edge is
          below -0.05 %p.
        * ``suggested`` - dict with ``buy_all``, ``buy_any``, ``sell_all`` and
          ``sell_stop`` lists; ``buy_all`` is the best pair (or else the best
          single condition) and ``sell_all`` the first ``cross_up_upper``
          condition in ``sell_top``.
    """
    # Key fragments that mark a condition as a sell candidate.
    sell_markers = ("cross_up_upper", "above_upper", "ichi_above")

    # A ": " separator keeps the key from running into the sample count.
    buy_rec: list[str] = [
        f"{s['key']}: {s['count']}회, {s['avg_forward_pct']:+.2f}% ({s['edge_vs_base']:+.2f}%p)"
        for s in buy_top[:8]
    ]
    buy_rec.extend(
        f"조합 {' + '.join(p['keys'])}: {p['count']}회, {p['edge_vs_base']:+.2f}%p"
        for p in pair_top[:5]
    )

    avoid: list[str] = [
        f"매수 회피: {s['key']} ({s['edge_vs_base']:.2f}%p)"
        for s in sell_top[:6]
        if s["edge_vs_base"] < -0.05
    ]
    sell_rec: list[str] = [
        f"매도 후보: {s['key']}"
        for s in sell_top[:5]
        if any(marker in s["key"] for marker in sell_markers)
    ]

    suggested: dict = {"buy_all": [], "buy_any": [], "sell_all": [], "sell_stop": []}
    if pair_top:
        # Copy so later edits to the rule draft cannot mutate the pair stats.
        suggested["buy_all"] = list(pair_top[0]["keys"])
    elif buy_top:
        suggested["buy_all"] = [buy_top[0]["key"]]

    for s in sell_top:
        if "cross_up_upper" in s.get("key", ""):
            suggested["sell_all"] = [s["key"]]
            break

    return buy_rec, sell_rec, avoid, suggested
def analyze_combinations(frames: dict[int, pd.DataFrame]) -> CombinationReport:
    """Analyse BB / Ichimoku positions across all candles and derive trade hints.

    Prints the latest position per interval, builds the master feature matrix,
    computes forward-edge statistics and recommendations, prints a summary and
    returns everything as a ``CombinationReport``.

    Args:
        frames: Candle frames keyed by interval.

    Returns:
        The populated report. ``top_sell_pairs`` is left at its default
        (empty) value.
    """
    from datetime import datetime

    intervals_present = sorted(frames.keys())

    # Positions follow the order of ALL_INTERVALS, skipping missing intervals.
    latest_positions = []
    for interval in ALL_INTERVALS:
        if interval in frames:
            latest_positions.append(describe_latest_position(frames[interval], interval))

    print("\n=== 봉별 최신 BB·일목 위치 ===")
    for pos in latest_positions:
        bb_part = (
            f" {pos['label']:>6} | BB {pos['bb_zone']:>6} ({pos['bb_pos']:.2f}) {pos['bb_state']:>16} | "
        )
        ichi_part = (
            f"일목 {pos['ichi_position']:>12} TK={pos['ichi_tk']} 구름={pos['ichi_cloud']}"
        )
        print(bb_part + ichi_part)

    # The first 52 rows are dropped - presumably indicator warm-up rows;
    # TODO confirm against build_master_feature_matrix.
    full_matrix = build_master_feature_matrix(frames)
    matrix = full_matrix.iloc[52:].copy()
    print(f"\n특징 행렬: {len(matrix)}× {len(matrix.columns)}")

    edge_stats = analyze_forward_edge(matrix)
    buy_top, pair_top, sell_top = edge_stats
    recommendations = build_recommendations(buy_top, pair_top, sell_top)
    buy_lines, sell_lines, avoid_lines, rule_draft = recommendations

    print("\n=== 매수 유리 조건 (단일·상위) ===")
    for text in buy_lines[:10]:
        print(f" {text}")

    print("\n=== 매수 회피 / 매도 참고 ===")
    for text in [*avoid_lines[:6], *sell_lines[:5]]:
        print(f" {text}")

    timestamp = datetime.now().isoformat(timespec="seconds")
    return CombinationReport(
        generated_at=timestamp,
        intervals_loaded=intervals_present,
        latest_positions=latest_positions,
        buy_recommendations=buy_lines,
        sell_recommendations=sell_lines,
        buy_avoid=avoid_lines,
        top_buy_pairs=pair_top,
        suggested_rules=rule_draft,
    )
def save_report(report: CombinationReport, path: Path = REPORT_FILE) -> None:
    """Write ``report`` to ``path`` as indented UTF-8 JSON and print the path.

    Non-ASCII text is written as-is rather than as ``\\uXXXX`` escapes.
    """
    serialized = json.dumps(asdict(report), ensure_ascii=False, indent=2)
    path.write_text(serialized, encoding="utf-8")
    print(f"\n저장: {path}")
def load_frames(monitor) -> dict[int, pd.DataFrame]:
    """Load the candle frames for ``SYMBOL`` through ``monitor``.

    Thin wrapper around ``mtf_bb.load_frames_from_db``; the import is done at
    call time rather than at module import.
    """
    from mtf_bb import load_frames_from_db

    frames = load_frames_from_db(monitor, SYMBOL)
    return frames