refactor: GT·시뮬·운영 3축 정리 및 hybrid 실거래 정합

Phase C/dry-run·미사용 모듈·재생성 HTML을 제거하고, 운영 체결을
sim_causal_hybrid와 동일한 hybrid 로직으로 통합한다.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
xavis
2026-06-03 23:50:28 +09:00
parent a16c942be4
commit d7848df6f7
85 changed files with 177180 additions and 196131 deletions

View File

@@ -1,6 +1,7 @@
# Phase 04 — Matching (GT + 전구간 EV)
# Matching — Simulation 축
안2 파이프라인: 03b GT 스냅샷에서 규칙 후보를 만들고, 3분봉 전 구간에서 발화·forward 수익을 검증한 뒤 valid 구간 EV로 최종 규칙을 고릅니다.
03b GT 스냅샷에서 규칙 후보 → 전 구간 인과 스캔 → EV·holdout → `matched_rules.json`.
설계: [docs/reference/ARCHITECTURE.md](../../docs/reference/ARCHITECTURE.md)
## PDCA

View File

@@ -1,177 +0,0 @@
"""
GT 총자산 대비 시뮬/규칙 정확도 측정 (동일 체결·평가 모델).
"""
from __future__ import annotations
from typing import Any
import pandas as pd
from config import GT_INITIAL_CASH_KRW, MATCH_GT_TOLERANCE_MIN, TRADING_FEE_RATE
from deepcoin.ground_truth.ground_truth import simulate_truth_portfolio
from deepcoin.matching.rule_eval import eval_rule_mask
def gt_trades_for_legs(
    trades: list[dict[str, Any]],
    leg_ids: set[int],
) -> list[dict[str, Any]]:
    """
    Keep only the ground-truth trades whose leg belongs to ``leg_ids``.

    Args:
        trades: Ground-truth trade dicts. A trade without a ``leg_id`` key is
            treated as belonging to leg 0.
        leg_ids: Leg ids to keep.

    Returns:
        The matching trade dicts, in their original order.
    """
    selected: list[dict[str, Any]] = []
    for trade in trades:
        # leg_id is coerced with int() before the membership test.
        leg = int(trade.get("leg_id", 0))
        if leg in leg_ids:
            selected.append(trade)
    return selected
def covered_legs_from_fires(
    trades: list[dict[str, Any]],
    fires: pd.DataFrame,
    buy_rule_ids: list[str],
    sell_rule_ids: list[str],
    tolerance_min: int = MATCH_GT_TOLERANCE_MIN,
) -> set[int]:
    """
    Return the leg ids whose every GT buy and sell has a rule fire nearby.

    A leg is covered when each of its buy trades has a fire of one of
    ``buy_rule_ids`` (side "buy") within ``tolerance_min`` minutes, and each of
    its sell trades has a fire of one of ``sell_rule_ids`` (side "sell") within
    the same window. A leg without trades on a side passes that side vacuously.

    Args:
        trades: GT trade dicts; read keys are ``dt``, ``leg_id``, ``action``.
        fires: Rule fires; read columns are ``dt``, ``rule_id``, ``side``.
        buy_rule_ids: Rule ids that count as buy fires.
        sell_rule_ids: Rule ids that count as sell fires.
        tolerance_min: Allowed distance in minutes between a trade and a fire.

    Returns:
        Set of covered leg ids. Empty when there are no fires or no trades.
    """
    # With no trades the frame below has no "dt" column and the lookup would
    # raise KeyError; nothing can be covered in that case anyway.
    if fires.empty or not trades:
        return set()

    tol = pd.Timedelta(minutes=tolerance_min)
    gt_df = pd.DataFrame(trades)
    gt_df["ts"] = pd.to_datetime(gt_df["dt"])

    # Only the timestamps of the relevant fires are needed, so the caller's
    # frame is neither copied nor modified.
    fire_ts = pd.to_datetime(fires["dt"])
    buy_fire_ts = fire_ts[fires["rule_id"].isin(buy_rule_ids) & (fires["side"] == "buy")]
    sell_fire_ts = fire_ts[fires["rule_id"].isin(sell_rule_ids) & (fires["side"] == "sell")]

    def _all_near(points: pd.Series, candidates: pd.Series) -> bool:
        """True when every point has a candidate no further than ``tol`` away."""
        for ts in points:
            if candidates.empty or (candidates - ts).abs().min() > tol:
                return False
        return True

    covered: set[int] = set()
    for lid in gt_df["leg_id"].unique():
        leg = gt_df[gt_df["leg_id"] == lid]
        buys_ok = _all_near(leg.loc[leg["action"] == "buy", "ts"], buy_fire_ts)
        sells_ok = _all_near(leg.loc[leg["action"] == "sell", "ts"], sell_fire_ts)
        if buys_ok and sells_ok:
            covered.add(int(lid))
    return covered
def portfolio_asset_ratio(
    trades: list[dict[str, Any]],
    leg_ids: set[int],
    last_price: float | None,
) -> dict[str, Any]:
    """
    Compare the simulated final asset of a leg subset against the full GT run.

    Both portfolios are produced by ``simulate_truth_portfolio`` with the same
    initial cash, fee rate and mark price, so the ratio isolates the effect of
    dropping legs.

    Args:
        trades: All GT trades.
        leg_ids: Legs to keep in the subset portfolio.
        last_price: Mark price forwarded to the simulator for the final valuation.

    Returns:
        Dict with the full and subset final assets, their ratio (raw and as a
        percentage), the 90% target flag, leg counts/coverage and both PnL
        percentages.
    """
    sim_kwargs = {
        "initial_cash": GT_INITIAL_CASH_KRW,
        "fee_rate": TRADING_FEE_RATE,
        "last_price": last_price,
    }
    full_run = simulate_truth_portfolio(trades, **sim_kwargs)
    subset_run = simulate_truth_portfolio(
        gt_trades_for_legs(trades, leg_ids), **sim_kwargs
    )

    full_asset = float(full_run["final_asset_krw"])
    subset_asset = float(subset_run["final_asset_krw"])
    # A non-positive full asset would make the ratio meaningless; report 0.
    if full_asset > 0:
        share = subset_asset / full_asset
    else:
        share = 0.0

    total_legs = len({int(t.get("leg_id", 0)) for t in trades})
    covered_legs = len(leg_ids)

    return {
        "gt_final_asset_krw": full_asset,
        "subset_final_asset_krw": subset_asset,
        "asset_ratio": round(share, 4),
        "asset_accuracy_pct": round(share * 100.0, 2),
        "target_met_90": share >= 0.9,
        "legs_total": total_legs,
        "legs_covered": covered_legs,
        # max(..., 1) avoids dividing by zero when there are no trades.
        "leg_coverage_ratio": round(covered_legs / max(total_legs, 1), 4),
        "full_pnl_pct": full_run.get("pnl_pct"),
        "subset_pnl_pct": subset_run.get("pnl_pct"),
    }
def evaluate_gt_snapshot_recall(
    trades_df: pd.DataFrame,
    rules: list[dict[str, Any]],
) -> dict[str, Any]:
    """
    Measure, per side, how many GT snapshot rows satisfy at least one rule.

    Each GT row is evaluated on its own (as a one-row frame) against every rule
    of the same side; the row counts as matched when any rule fires (OR).

    Args:
        trades_df: GT snapshot rows; must have an ``action`` column
            ("buy"/"sell") plus whatever columns the rules reference.
        rules: Rule dicts; ``side`` selects the side and ``rule_id`` labels the
            per-rule hit counter.

    Returns:
        ``{"buy": {...}, "sell": {...}}`` where each side holds ``gt_count``,
        ``matched``, ``recall`` and ``per_rule_hits``.
    """
    buy_gt = trades_df[trades_df["action"] == "buy"]
    sell_gt = trades_df[trades_df["action"] == "sell"]
    buy_rules = [r for r in rules if r.get("side") == "buy"]
    sell_rules = [r for r in rules if r.get("side") == "sell"]

    def _side_recall(gt: pd.DataFrame, side_rules: list[dict]) -> dict[str, Any]:
        """Recall summary for one side."""
        if gt.empty or not side_rules:
            # Same keys as the normal path so callers never need to special-case
            # an empty side.
            return {
                "gt_count": int(len(gt)),
                "matched": 0,
                "recall": 0.0,
                "per_rule_hits": {},
            }
        hit = 0
        per_rule: dict[str, int] = {}
        for _, row in gt.iterrows():
            fr = pd.DataFrame([row])
            ok = False
            # No early exit: every matching rule gets its hit counted.
            for rule in side_rules:
                if bool(eval_rule_mask(fr, rule).iloc[0]):
                    ok = True
                    rid = rule["rule_id"]
                    per_rule[rid] = per_rule.get(rid, 0) + 1
            if ok:
                hit += 1
        n = len(gt)
        return {
            "gt_count": n,
            "matched": hit,
            "recall": round(hit / n, 4) if n else 0.0,
            "per_rule_hits": per_rule,
        }

    return {
        "buy": _side_recall(buy_gt, buy_rules),
        "sell": _side_recall(sell_gt, sell_rules),
    }

View File

@@ -1,383 +0,0 @@
"""
Ground truth(450타점) vs 규칙 발화·시뮬 결과 비교 리포트.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
from config import MATCH_GT_TOLERANCE_MIN
from deepcoin.ground_truth.ground_truth import load_ground_truth
from deepcoin.matching.select_rules import (
_rule_metrics,
_split_train_valid_holdout,
gt_overlap_report,
)
from deepcoin.paths import (
MATCHING_FIRE_OUTCOMES,
MATCHING_GT_COMPARISON_HTML,
MATCHING_GT_COMPARISON_JSON,
MATCHING_MATCHED_RULES,
MATCHING_SIMULATION_JSON,
resolve_ground_truth_file,
)
def _precision_near_gt(
fire_ts: pd.Series,
gt_ts: pd.Series,
tolerance: pd.Timedelta,
) -> dict[str, Any]:
"""
발화 시각이 GT 타점 ±허용 내인 비율(precision proxy).
Args:
fire_ts: 규칙 발화 시각.
gt_ts: GT 시각.
tolerance: 허용 timedelta.
Returns:
near_count, fire_count, precision.
"""
if fire_ts.empty:
return {"near_count": 0, "fire_count": 0, "precision": 0.0}
gt_sorted = gt_ts.sort_values()
near = 0
for fts in fire_ts:
if (gt_sorted - fts).abs().min() <= tolerance:
near += 1
n = len(fire_ts)
return {
"near_count": near,
"fire_count": n,
"precision": round(near / n, 4) if n else 0.0,
}
def _matched_pairs(
fires: pd.DataFrame,
gt_df: pd.DataFrame,
rule_id: str,
tolerance: pd.Timedelta,
) -> pd.DataFrame:
"""
GT 타점별 가장 가까운 동일 rule·side 발화와 수익률 쌍을 만듭니다.
Args:
fires: fire_outcomes.
gt_df: GT trades DataFrame.
rule_id: 규칙 ID.
tolerance: 매칭 허용.
Returns:
매칭된 행 DataFrame.
"""
sub = fires[fires["rule_id"] == rule_id].copy()
if sub.empty:
return pd.DataFrame()
side = sub["side"].iloc[0]
g = gt_df[gt_df["action"] == side].copy()
g["ts"] = pd.to_datetime(g["dt"])
sub["ts"] = pd.to_datetime(sub["dt"])
rows: list[dict[str, Any]] = []
for _, gt_row in g.iterrows():
gts = pd.Timestamp(gt_row["ts"])
delta = (sub["ts"] - gts).abs()
if delta.empty or delta.min() > tolerance:
continue
idx = delta.idxmin()
fr = sub.loc[idx]
rows.append(
{
"side": side,
"rule_id": rule_id,
"gt_dt": str(gt_row["dt"]),
"fire_dt": str(fr["dt"]),
"delta_min": round(delta.min().total_seconds() / 60, 2),
"gt_forward_pct": float(gt_row.get("forward_return_pct") or 0),
"sim_leg_gt_pct": float(fr["forward_ret_pct"]),
"split": fr.get("split"),
}
)
return pd.DataFrame(rows)
def build_gt_comparison_report(
    outcomes_path: Path | None = None,
    matched_path: Path | None = None,
    gt_path: Path | None = None,
    sim_path: Path | None = None,
    tolerance_min: int = MATCH_GT_TOLERANCE_MIN,
) -> dict[str, Any]:
    """
    Build the ground-truth vs. rule-fire / simulation comparison report.

    Args:
        outcomes_path: Fire-outcomes CSV; defaults to ``MATCHING_FIRE_OUTCOMES``.
        matched_path: Matched-rules JSON; defaults to ``MATCHING_MATCHED_RULES``.
            Optional: a missing file yields empty matched-rule sections.
        gt_path: Ground-truth trades file; defaults to
            ``resolve_ground_truth_file()``.
        sim_path: Simulation report JSON; defaults to
            ``MATCHING_SIMULATION_JSON``. Optional like ``matched_path``.
        tolerance_min: Allowed distance in minutes between a fire and a GT trade.

    Returns:
        Report dict with the GT baseline, overall and per-rule overlap,
        pair alignment statistics, the monitor-rule subset, the simulation
        go/no-go summary and explanatory notes. ``gt_baseline`` holds ``total``,
        ``buy_count``, ``sell_count`` and per-side stats dicts under ``buy`` and
        ``sell``.

    Raises:
        FileNotFoundError: If the fire-outcomes CSV does not exist.
    """
    op = outcomes_path or MATCHING_FIRE_OUTCOMES
    mp = matched_path or MATCHING_MATCHED_RULES
    if not op.is_file():
        raise FileNotFoundError(f"fire_outcomes 없음: {op}")

    outcomes = pd.read_csv(op)
    outcomes["ts"] = pd.to_datetime(outcomes["dt"])
    outcomes["split"] = _split_train_valid_holdout(outcomes)

    matched: dict[str, Any] = {}
    if mp.is_file():
        matched = json.loads(mp.read_text(encoding="utf-8"))

    sim_report: dict[str, Any] = {}
    sp = sim_path or MATCHING_SIMULATION_JSON
    if sp.is_file():
        sim_report = json.loads(sp.read_text(encoding="utf-8"))

    gt_data = load_ground_truth(gt_path or resolve_ground_truth_file()) or {}
    gt_trades = gt_data.get("trades") or []
    gt_df = pd.DataFrame(gt_trades)
    if gt_df.empty:
        # Without trades the frame has no columns at all; give it the two
        # columns used for filtering so the lookups below do not raise KeyError.
        gt_df = pd.DataFrame(columns=["dt", "action"])
    tol = pd.Timedelta(minutes=tolerance_min)

    # Counts live under their own keys: "buy"/"sell" are reserved for the
    # per-side stats dicts assigned just below (the counts used to be stored
    # under the same keys and were silently overwritten).
    gt_baseline: dict[str, Any] = {
        "total": len(gt_df),
        "buy_count": int((gt_df["action"] == "buy").sum()),
        "sell_count": int((gt_df["action"] == "sell").sum()),
    }
    for side in ("buy", "sell"):
        sub = gt_df[gt_df["action"] == side]
        if sub.empty or "forward_return_pct" not in sub.columns:
            gt_baseline[side] = {}
            continue
        r = sub["forward_return_pct"].astype(float)
        gt_baseline[side] = {
            "mean_forward_pct": round(float(r.mean()), 4),
            "median_forward_pct": round(float(r.median()), 4),
            "win_rate": round(float((r > 0).mean()), 4),
            "count": int(len(r)),
        }

    all_fires = outcomes.copy()
    if "rule_id" not in all_fires.columns:
        all_fires["rule_id"] = "all"
    # Fires of different rules at the same (dt, side) count once here.
    overlap_all = gt_overlap_report(
        all_fires.drop_duplicates(subset=["dt", "side"]),
        gt_trades,
        tolerance_min=tolerance_min,
    )

    per_rule: list[dict[str, Any]] = []
    pair_stats: list[dict[str, Any]] = []
    for rid in sorted(outcomes["rule_id"].unique()):
        sub = outcomes[outcomes["rule_id"] == rid]
        side = str(sub["side"].iloc[0])
        gt_side = gt_df[gt_df["action"] == side]
        gt_ts = (
            pd.to_datetime(gt_side["dt"])
            if not gt_side.empty
            else pd.Series(dtype="datetime64[ns]")
        )
        fire_ts = sub["ts"]
        ov = gt_overlap_report(sub, gt_trades, tolerance_min=tolerance_min)
        prec = _precision_near_gt(fire_ts, gt_ts, tol)
        m_all = _rule_metrics(sub)
        m_hold = _rule_metrics(sub[sub["split"] == "holdout"])

        pairs = _matched_pairs(outcomes, gt_df, rid, tol)
        pair_row: dict[str, Any] = {"rule_id": rid, "side": side, "pair_count": len(pairs)}
        # Correlation needs at least two pairs.
        if len(pairs) >= 2:
            corr = pairs["gt_forward_pct"].corr(pairs["sim_leg_gt_pct"])
            pair_row["corr_gt_vs_sim"] = round(float(corr), 4) if pd.notna(corr) else None
            pair_row["mean_abs_diff_pct"] = round(
                float((pairs["gt_forward_pct"] - pairs["sim_leg_gt_pct"]).abs().mean()),
                4,
            )
            pair_row["mean_delta_min"] = round(float(pairs["delta_min"].mean()), 2)
        pair_stats.append(pair_row)

        # Split this rule's fires into those near a GT point and the rest so
        # their EV can be reported separately.
        near_mask = [
            bool(not gt_ts.empty and (gt_ts - fts).abs().min() <= tol)
            for fts in fire_ts
        ]
        sub_near = sub.loc[near_mask] if near_mask else sub.iloc[0:0]
        sub_far = sub.loc[[not x for x in near_mask]] if near_mask else sub

        per_rule.append(
            {
                "rule_id": rid,
                "side": side,
                "fire_count": int(len(sub)),
                "gt_recall": ov.get(side, {}).get("recall", 0),
                "gt_matched": ov.get(side, {}).get("matched", 0),
                "gt_count": ov.get(side, {}).get("gt_count", 0),
                "precision_near_gt": prec["precision"],
                "fires_near_gt": prec["near_count"],
                "sim_ev_all_pct": m_all.get("ev_pct"),
                "sim_ev_near_gt_pct": _rule_metrics(sub_near).get("ev_pct") if len(sub_near) else None,
                "sim_ev_far_gt_pct": _rule_metrics(sub_far).get("ev_pct") if len(sub_far) else None,
                "sim_win_rate": m_all.get("win_rate"),
                "sim_profit_factor": m_all.get("profit_factor"),
                "holdout_ev_pct": m_hold.get("ev_pct"),
                "holdout_count": m_hold.get("count"),
            }
        )

    monitor_ids = [r["rule_id"] for r in matched.get("monitor_rules", [])]
    monitor_summary = [r for r in per_rule if r["rule_id"] in monitor_ids]
    go = sim_report.get("go_no_go", {})

    return {
        "tolerance_min": tolerance_min,
        "label_mode": matched.get("label_mode"),
        "gt_baseline": gt_baseline,
        "gt_overlap_all_fires_dedup": overlap_all,
        "gt_overlap_matched_json": matched.get("gt_overlap"),
        "per_rule": per_rule,
        "pair_alignment": pair_stats,
        "monitor_rules": monitor_summary,
        "simulation_go_no_go": {
            "go": go.get("go"),
            "checks": go.get("checks", []),
            "live_cap_taken_ratio": go.get("live_cap_taken_ratio"),
        },
        "notes": [
            "gt_overlap_matched_json: 04 선별 시 전 규칙 발화 합산(중복 dt 제거 전) 기준.",
            "per_rule.gt_recall: 해당 규칙 발화만으로 GT 타점 커버.",
            "precision_near_gt: 발화 중 GT±tolerance 내 비율(낮을수록 잡음 많음).",
            "gt_forward_pct vs sim_leg_gt_pct: leg_gt 라벨과 GT JSON forward_return_pct 정의 차이 가능.",
        ],
    }
def write_gt_comparison_html(report: dict[str, Any], out_path: Path) -> Path:
    """
    Render the GT comparison report as a standalone HTML page and save it.

    All interpolated values are HTML-escaped (``&``, ``<``, ``>``) so rule ids,
    notes or check messages containing those characters cannot break the markup.

    Args:
        report: Report dict; read keys are ``tolerance_min``, ``label_mode``,
            ``gt_baseline``, ``per_rule``, ``monitor_rules``,
            ``pair_alignment``, ``simulation_go_no_go`` and ``notes``.
        out_path: Destination HTML file; parent directories are created.

    Returns:
        ``out_path``.
    """
    # Local import keeps this block self-contained.
    from html import escape

    def _txt(value: Any) -> str:
        """Stringify and escape a value for use as HTML text content."""
        return escape(str(value), quote=False)

    def _rows(items: list[dict], cols: list[str]) -> str:
        """One <tr> per item with one <td> per column (missing keys -> blank)."""
        return "\n".join(
            "<tr>" + "".join(f"<td>{_txt(it.get(c, ''))}</td>" for c in cols) + "</tr>"
            for it in items
        )

    pr_cols = [
        "rule_id", "side", "fire_count", "gt_recall", "precision_near_gt",
        "sim_ev_all_pct", "sim_ev_near_gt_pct", "sim_ev_far_gt_pct", "holdout_ev_pct",
    ]
    go = report.get("simulation_go_no_go", {})
    go_flag = "GO" if go.get("go") else "NO-GO"
    gb = report.get("gt_baseline", {})

    def _side_stats(side: str) -> dict[str, Any]:
        """Per-side stats dict; anything that is not a dict is treated as empty."""
        value = gb.get(side)
        return value if isinstance(value, dict) else {}

    def _side_total(side: str) -> Any:
        """
        Trade count for the headline.

        Prefers an explicit ``<side>_count`` entry when the baseline carries
        one; otherwise falls back to the ``count`` inside the per-side stats
        dict. The headline used to print the whole stats dict here.
        """
        explicit = gb.get(f"{side}_count")
        if explicit is not None:
            return explicit
        value = gb.get(side)
        if isinstance(value, dict):
            return value.get("count", "")
        return value

    buy_stats = _side_stats("buy")
    sell_stats = _side_stats("sell")
    header_cells = "".join(f"<th>{c}</th>" for c in pr_cols)
    pair_rows = "".join(
        f"<tr><td>{_txt(p['rule_id'])}</td><td>{_txt(p['side'])}</td><td>{_txt(p['pair_count'])}</td>"
        f"<td>{_txt(p.get('corr_gt_vs_sim', ''))}</td><td>{_txt(p.get('mean_abs_diff_pct', ''))}</td>"
        f"<td>{_txt(p.get('mean_delta_min', ''))}</td></tr>"
        for p in report.get("pair_alignment", [])
    )
    notes_html = "".join(f"<li>{_txt(n)}</li>" for n in report.get("notes", []))
    go_json = _txt(json.dumps(go, ensure_ascii=False, indent=2))

    page = f"""<!DOCTYPE html>
<html lang="ko"><head><meta charset="utf-8"/>
<title>GT vs Simulation Comparison</title>
<style>
body {{ font-family: "Malgun Gothic", Arial, sans-serif; margin: 24px; max-width: 1100px; }}
table {{ border-collapse: collapse; width: 100%; margin: 12px 0; font-size: 0.9rem; }}
th, td {{ border: 1px solid #ccc; padding: 6px 8px; text-align: right; }}
th {{ background: #e2e8f0; text-align: center; }}
td:first-child, th:first-child {{ text-align: left; }}
h2 {{ margin-top: 28px; }}
.warn {{ color: #b45309; }}
</style></head><body>
<h1>Ground Truth vs 규칙·시뮬 비교</h1>
<p>허용 오차: ±{_txt(report.get('tolerance_min'))}분 · 라벨: {_txt(report.get('label_mode'))}</p>
<p><strong>시뮬 Go/No-Go: {go_flag}</strong></p>
<h2>GT 기준선 (forward_return_pct)</h2>
<p>총 {_txt(gb.get('total'))}건 (매수 {_txt(_side_total('buy'))} / 매도 {_txt(_side_total('sell'))})</p>
<table>
<thead><tr><th>구분</th><th>건수</th><th>평균 forward%</th><th>중앙값</th><th>승률</th></tr></thead>
<tbody>
<tr><td>매수 GT</td><td>{_txt(buy_stats.get('count', ''))}</td>
<td>{_txt(buy_stats.get('mean_forward_pct', ''))}</td>
<td>{_txt(buy_stats.get('median_forward_pct', ''))}</td>
<td>{_txt(buy_stats.get('win_rate', ''))}</td></tr>
<tr><td>매도 GT</td><td>{_txt(sell_stats.get('count', ''))}</td>
<td>{_txt(sell_stats.get('mean_forward_pct', ''))}</td>
<td>{_txt(sell_stats.get('median_forward_pct', ''))}</td>
<td>{_txt(sell_stats.get('win_rate', ''))}</td></tr>
</tbody></table>
<h2>규칙별 GT recall / precision / EV</h2>
<table>
<thead><tr>{header_cells}</tr></thead>
<tbody>{_rows(report.get('per_rule', []), pr_cols)}</tbody>
</table>
<h2>monitor_rules (실감시·시뮬 대상)</h2>
<table>
<thead><tr>{header_cells}</tr></thead>
<tbody>{_rows(report.get('monitor_rules', []), pr_cols)}</tbody>
</table>
<h2>GT발화 수익률 정렬 (±{_txt(report.get('tolerance_min'))}분)</h2>
<table>
<thead><tr><th>rule</th><th>side</th><th>pairs</th><th>corr</th><th>mean|diff|%</th><th>mean Δmin</th></tr></thead>
<tbody>
{pair_rows}
</tbody></table>
<h2>시뮬 검증 (monitor)</h2>
<pre>{go_json}</pre>
<h2>참고</h2>
<ul>
{notes_html}
</ul>
</body></html>"""
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(page, encoding="utf-8")
    return out_path
def run_gt_comparison_report(
    outcomes_path: Path | None = None,
    matched_path: Path | None = None,
) -> dict[str, Any]:
    """
    Build the GT comparison report, persist it as JSON and HTML, and print a summary.

    Args:
        outcomes_path: Fire-outcomes CSV, forwarded to
            ``build_gt_comparison_report``.
        matched_path: Matched-rules JSON, forwarded likewise.

    Returns:
        The report dict that was written.
    """
    report = build_gt_comparison_report(outcomes_path, matched_path)

    json_target = MATCHING_GT_COMPARISON_JSON
    html_target = MATCHING_GT_COMPARISON_HTML
    json_target.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(report, ensure_ascii=False, indent=2)
    json_target.write_text(serialized, encoding="utf-8")
    write_gt_comparison_html(report, html_target)

    for saved in (json_target, html_target):
        print(f"[GT비교] 저장: {saved}")

    # One console line per monitored rule.
    for m in report.get("monitor_rules", []):
        print(
            f" {m['rule_id']}: recall={m['gt_recall']:.1%} prec={m['precision_near_gt']:.1%} "
            f"fires={m['fire_count']} EV={m['sim_ev_all_pct']}% holdout={m['holdout_ev_pct']}%"
        )

    verdict = "GO" if report.get("simulation_go_no_go", {}).get("go") else "NO-GO"
    print(f"[GT비교] 시뮬 연동: {verdict}")
    return report

View File

@@ -1,539 +0,0 @@
"""
GT 타점 MTF 프로필 반복 보강 — 스냅샷 recall·총자산 비율 90% 목표.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
from config import (
GENERAL_ANALYSIS_INTERVALS,
MATCH_PROFILE_MIN_SAMPLES,
MATCH_PROFILE_MIN_SEPARATION,
)
from deepcoin.analysis.general_analysis_core import interval_tf_prefix
from deepcoin.matching.config import ANALYSIS_TRADES_CSV
from deepcoin.matching.gt_asset_calibration import (
evaluate_gt_snapshot_recall,
portfolio_asset_ratio,
)
from deepcoin.matching.gt_mtf_profile import (
analyze_gt_mtf_profile,
discover_profile_columns,
)
from deepcoin.matching.profile_rules import (
_condition_from_series,
_feature_separation,
build_rule_candidates,
)
from deepcoin.matching.rule_eval import eval_rule_mask
from deepcoin.paths import (
ANALYSIS_GT_CALIBRATION_JSON,
ANALYSIS_GT_MTF_PROFILE_JSON,
resolve_ground_truth_file,
)
from deepcoin.ground_truth.ground_truth import load_ground_truth
def _condition_or_group(
series: pd.Series,
side: str,
quantile_lo: float = 0.15,
quantile_hi: float = 0.85,
) -> dict[str, Any] | None:
"""
한 컬럼 GT 분포에서 between 조건.
Args:
series: side GT 값.
side: buy | sell.
quantile_lo: 하한 분위.
quantile_hi: 상한 분위.
Returns:
조건 dict.
"""
col_name = series.name
if series.dtype == object or not pd.api.types.is_numeric_dtype(series):
mode = series.dropna().astype(str).mode()
if mode.empty:
return None
return {"col": col_name, "op": "eq", "value": str(mode.iloc[0])}
s = pd.to_numeric(series, errors="coerce").dropna()
if len(s) < MATCH_PROFILE_MIN_SAMPLES:
return None
lo = float(s.quantile(quantile_lo))
hi = float(s.quantile(quantile_hi))
if lo >= hi:
return None
return {"col": col_name, "op": "between", "lo": lo, "hi": hi}
def build_or_tf_rules(
    buy: pd.DataFrame,
    sell: pd.DataFrame,
    ranked_cols: list[str],
    *,
    per_tf: int = 4,
) -> list[dict[str, Any]]:
    """
    Build one OR-compound rule per side and timeframe.

    For every interval in ``GENERAL_ANALYSIS_INTERVALS`` the columns carrying
    that timeframe's prefix are ranked by buy/sell separation, the top
    ``per_tf`` are turned into 20%-80% quantile conditions, and a rule that
    fires when any of them holds is emitted. The ``m240`` prefix is excluded
    and at least two conditions are required.

    Args:
        buy: GT buy rows.
        sell: GT sell rows.
        ranked_cols: Candidate columns.
        per_tf: Maximum number of OR conditions per timeframe.

    Returns:
        List of rule dicts of kind ``or_tf``.
    """
    frames = {"buy": buy, "sell": sell}
    built: list[dict[str, Any]] = []

    for side in ("buy", "sell"):
        frame = frames[side]
        for interval in GENERAL_ANALYSIS_INTERVALS:
            prefix = interval_tf_prefix(interval)
            marker = f"{prefix}_"
            candidates = [
                name
                for name in ranked_cols
                if name.startswith(marker) and name in frame.columns
            ]
            # Strongest buy/sell separation first (stable for ties).
            candidates.sort(
                key=lambda name: _feature_separation(buy, sell, name),
                reverse=True,
            )

            conditions: list[dict[str, Any]] = []
            for name in candidates[:per_tf]:
                cond = _condition_or_group(frame[name], side, 0.20, 0.80)
                if cond:
                    conditions.append(cond)

            if len(conditions) < 2 or prefix == "m240":
                continue
            built.append(
                {
                    "rule_id": f"{side}_or_{prefix}",
                    "side": side,
                    "kind": "or_tf",
                    "logic": "or",
                    "conditions": conditions,
                }
            )
    return built
def build_unmatched_atomic_rules(
    trades_df: pd.DataFrame,
    rules: list[dict[str, Any]],
    side: str,
    *,
    max_new: int = 12,
) -> list[dict[str, Any]]:
    """
    Propose single-condition rules for GT rows no existing rule matches.

    GT rows of ``side`` that satisfy none of the side's rules are collected;
    numeric profile columns are then scored by how far the unmatched rows sit
    from the opposite side (standardised mean difference), and the best
    columns not already used by a rule of this side become new atomic rules.

    Args:
        trades_df: GT snapshot rows with an ``action`` column.
        rules: Existing rules; only those of ``side`` are evaluated.
        side: "buy" or "sell".
        max_new: Maximum number of rules to return. Up to ``max_new * 3``
            top-scoring columns are examined to find them.

    Returns:
        New rule dicts of kind ``calibration_atomic``; empty when every GT row
        is already matched.
    """
    gt = trades_df[trades_df["action"] == side]
    side_rules = [r for r in rules if r.get("side") == side]

    unmatched_idx = [
        idx
        for idx, row in gt.iterrows()
        if not any(
            bool(eval_rule_mask(pd.DataFrame([row]), r).iloc[0]) for r in side_rules
        )
    ]
    if not unmatched_idx:
        return []

    unmatched = gt.loc[unmatched_idx]
    # Rows of the opposite side are the contrast population.
    other = trades_df[trades_df["action"] == ("sell" if side == "buy" else "buy")]

    scores: list[tuple[float, str]] = []
    for col in discover_profile_columns(trades_df):
        if col not in unmatched.columns:
            continue
        if not pd.api.types.is_numeric_dtype(unmatched[col]):
            continue
        u = pd.to_numeric(unmatched[col], errors="coerce").dropna()
        o = pd.to_numeric(other[col], errors="coerce").dropna()
        # Too few samples give an unreliable separation estimate.
        if len(u) < 3 or len(o) < 5:
            continue
        # Standardised mean difference; the epsilon guards a zero pooled std.
        sep = abs(float(u.mean() - o.mean())) / (np.sqrt((u.var() + o.var()) / 2) + 1e-9)
        scores.append((sep, col))
    scores.sort(reverse=True)

    existing_cols = {
        c["col"]
        for r in rules
        if r.get("side") == side
        for c in r.get("conditions", [])
    }
    new_rules: list[dict[str, Any]] = []
    for sep, col in scores[: max_new * 3]:
        if col in existing_cols:
            continue
        # Calibration accepts half of the regular separation threshold.
        if sep < MATCH_PROFILE_MIN_SEPARATION * 0.5:
            continue
        cond = _condition_from_series(unmatched[col], side)
        if cond is None:
            # Fall back to a wide 10%-90% band.
            cond = _condition_or_group(unmatched[col], side, 0.10, 0.90)
        if cond is None:
            continue
        new_rules.append(
            {
                "rule_id": f"{side}_cal_{col}",
                "side": side,
                "kind": "calibration_atomic",
                "logic": "and",
                "conditions": [cond],
                "profile_col": col,
                "calibration_sep": round(sep, 4),
            }
        )
        existing_cols.add(col)
        if len(new_rules) >= max_new:
            break
    return new_rules
def _feature_separation_df(
buy: pd.DataFrame,
sell: pd.DataFrame,
col: str,
) -> float:
"""DataFrame 컬럼 분리도."""
if col not in buy.columns:
return 0.0
a = pd.to_numeric(buy[col], errors="coerce").dropna()
b = pd.to_numeric(sell[col], errors="coerce").dropna()
if len(a) < 5 or len(b) < 5:
return 0.0
pooled = np.sqrt((a.var() + b.var()) / 2)
if pooled < 1e-9:
return abs(float(a.mean() - b.mean()))
return abs(float(a.mean() - b.mean())) / pooled
def _side_snapshot_matches(
    df: pd.DataFrame,
    rules: list[dict[str, Any]],
    dt: Any,
    side: str,
) -> bool:
    """True when the snapshot row at (dt, side) exists and satisfies any rule of that side."""
    rows = df[(df["dt"] == dt) & (df["action"] == side)]
    if rows.empty:
        return False
    frame = pd.DataFrame([rows.iloc[0]])
    return any(
        bool(eval_rule_mask(frame, r).iloc[0])
        for r in rules
        if r.get("side") == side
    )


def _legs_fully_matched(
    df: pd.DataFrame,
    rules: list[dict[str, Any]],
    gt_trades: list[dict[str, Any]],
    leg_ids: Any = None,
) -> set[int]:
    """Leg ids whose every GT buy and sell has a rule-matching snapshot row.

    ``leg_ids`` restricts the scan; None scans every leg id present in
    ``gt_trades``. No trades yields an empty set.
    """
    if not gt_trades:
        return set()
    gt_df = pd.DataFrame(gt_trades)
    candidates = gt_df["leg_id"].unique() if leg_ids is None else leg_ids
    matched: set[int] = set()
    for lid in candidates:
        leg = gt_df[gt_df["leg_id"] == lid]
        if all(
            _side_snapshot_matches(df, rules, dt, side)
            for side in ("buy", "sell")
            for dt in leg.loc[leg["action"] == side, "dt"]
        ):
            matched.add(int(lid))
    return matched


def run_profile_calibration_loop(
    trades_csv: Path | None = None,
    *,
    target_recall: float = 0.90,
    target_asset_ratio: float = 0.90,
    max_iterations: int = 5,
) -> dict[str, Any]:
    """
    Iteratively grow the rule set until recall and asset-ratio targets are met.

    Starting from the profile rule candidates plus per-timeframe OR rules, each
    iteration measures snapshot recall and the asset ratio of the fully
    matched legs, remembers the rule set with the best asset ratio, and adds
    atomic rules for still-unmatched GT rows. The best rule set is then
    de-duplicated by ``rule_id``, reduced greedily, re-evaluated and written
    to ``ANALYSIS_GT_CALIBRATION_JSON``. The MTF profile analysis is written to
    ``ANALYSIS_GT_MTF_PROFILE_JSON`` as a side effect.

    Args:
        trades_csv: GT snapshot CSV; defaults to ``ANALYSIS_TRADES_CSV``.
        target_recall: Snapshot recall required on both sides.
        target_asset_ratio: Required subset/full final-asset ratio.
        max_iterations: Maximum number of growth iterations.

    Returns:
        Calibration report with the targets, the per-iteration history, the
        final metrics and the calibrated rules.
    """
    path = trades_csv or ANALYSIS_TRADES_CSV
    df = pd.read_csv(path)
    buy = df[df["action"] == "buy"]
    sell = df[df["action"] == "sell"]

    analysis = analyze_gt_mtf_profile(df)
    ANALYSIS_GT_MTF_PROFILE_JSON.parent.mkdir(parents=True, exist_ok=True)
    ANALYSIS_GT_MTF_PROFILE_JSON.write_text(
        json.dumps(analysis, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )

    # Rank numeric features by the analysis' separation score (first entry per
    # column wins); columns without one are scored directly, and only then.
    top_sep: dict[str, Any] = {}
    for entry in analysis["global_top_separation"]:
        top_sep.setdefault(entry["col"], entry["separation"])
    numeric_ranked = sorted(
        [f["col"] for f in analysis["features"] if f["dtype"] == "numeric"],
        key=lambda c: top_sep[c] if c in top_sep else _feature_separation_df(buy, sell, c),
        reverse=True,
    )

    base = build_rule_candidates(path)
    rules: list[dict[str, Any]] = list(base.get("rules", []))
    for r in rules:
        r.setdefault("logic", "and")
    rules.extend(build_or_tf_rules(buy, sell, numeric_ranked[:80]))

    history: list[dict[str, Any]] = []
    best_rules: list[dict[str, Any]] = list(rules)
    best_asset_ratio = -1.0

    gt_data = load_ground_truth(resolve_ground_truth_file()) or {}
    gt_trades = gt_data.get("trades") or []
    mark = (gt_data.get("summary") or {}).get("mark_price")
    # Legs having at least one buy or sell trade; constant across iterations.
    traded_legs = {
        int(t["leg_id"]) for t in gt_trades if t["action"] in ("buy", "sell")
    }

    for it in range(max_iterations):
        recall = evaluate_gt_snapshot_recall(df, rules)
        buy_rec = recall["buy"]["recall"]
        sell_rec = recall["sell"]["recall"]

        included_legs = _legs_fully_matched(df, rules, gt_trades, traded_legs)
        asset = portfolio_asset_ratio(gt_trades, included_legs, mark)
        history.append(
            {
                "iteration": it,
                "rule_count": len(rules),
                "buy_recall": buy_rec,
                "sell_recall": sell_rec,
                **asset,
            }
        )
        print(
            f"[cal {it}] rules={len(rules)} "
            f"buy_rec={buy_rec:.2%} sell_rec={sell_rec:.2%} "
            f"asset_ratio={asset['asset_ratio']:.2%} legs={asset['legs_covered']}/{asset['legs_total']}"
        )

        if asset["asset_ratio"] > best_asset_ratio:
            best_asset_ratio = asset["asset_ratio"]
            best_rules = list(rules)
        if (
            buy_rec >= target_recall
            and sell_rec >= target_recall
            and asset["asset_ratio"] >= target_asset_ratio
        ):
            break

        # Add atomic rules only for the sides still below the recall target.
        added = 0
        for side in ("buy", "sell"):
            if recall[side]["recall"] >= target_recall:
                continue
            new_rules = build_unmatched_atomic_rules(df, rules, side, max_new=15)
            rules.extend(new_rules)
            added += len(new_rules)
        if added == 0:
            # Nothing new was found: widen the search on both sides.
            rules.extend(build_or_tf_rules(buy, sell, numeric_ranked[:120]))
            for side in ("buy", "sell"):
                rules.extend(
                    build_unmatched_atomic_rules(df, rules, side, max_new=20)
                )
        # Safety cap on the size of the rule set.
        if len(rules) > 200:
            break

    # De-duplicate the best rule set by rule_id, keeping first occurrences.
    deduped: list[dict[str, Any]] = []
    seen_rid: set[str] = set()
    for r in best_rules:
        rid = r.get("rule_id", "")
        if rid in seen_rid:
            continue
        seen_rid.add(rid)
        deduped.append(r)

    # The reported final metrics describe the greedily reduced set only, so
    # they are computed once, after the reduction.
    calibrated = _greedy_recall_cover(df, deduped, target_recall=target_recall)
    final_recall = evaluate_gt_snapshot_recall(df, calibrated)
    final_asset = portfolio_asset_ratio(
        gt_trades, _legs_fully_matched(df, calibrated, gt_trades), mark
    )

    out = {
        "target_recall": target_recall,
        "target_asset_ratio": target_asset_ratio,
        "iterations": history,
        "final": {
            # Size of the grown rule set before de-duplication and reduction.
            "rule_count": len(rules),
            "snapshot_recall": final_recall,
            "portfolio": final_asset,
            "targets_met": (
                final_recall["buy"]["recall"] >= target_recall
                and final_recall["sell"]["recall"] >= target_recall
                and final_asset["asset_ratio"] >= target_asset_ratio
            ),
            "rule_count_after_greedy": len(calibrated),
        },
        "calibrated_rules": calibrated,
    }

    ANALYSIS_GT_CALIBRATION_JSON.parent.mkdir(parents=True, exist_ok=True)
    ANALYSIS_GT_CALIBRATION_JSON.write_text(
        json.dumps(out, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    return out
def _greedy_recall_cover(
    trades_df: pd.DataFrame,
    rules: list[dict[str, Any]],
    *,
    target_recall: float = 0.90,
    max_per_side: int = 40,
) -> list[dict[str, Any]]:
    """
    Shrink the rule set greedily while reaching the recall target per side.

    Rules of the structural kinds (compound, contrast, MTF-cross, OR-per-TF)
    are always kept. For each side, the remaining rules are added one at a
    time, always taking the rule that matches the most still-uncovered GT
    rows, until the side's recall reaches ``target_recall``, no rule adds
    coverage, or ``max_per_side`` rules were picked.

    Args:
        trades_df: GT snapshot rows with an ``action`` column.
        rules: All candidate rules.
        target_recall: Recall at which picking stops for a side.
        max_per_side: Maximum number of greedily picked rules per side.

    Returns:
        The always-kept rules followed by the picked rules.
    """
    keep_kinds = {
        "compound_tight",
        "compound",
        "contrast",
        "mtf_cross",
        "or_tf",
    }
    kept = [r for r in rules if r.get("kind") in keep_kinds]
    pool = [r for r in rules if r not in kept]

    for side in ("buy", "sell"):
        gt = trades_df[trades_df["action"] == side]
        if gt.empty:
            continue
        side_kept = [r for r in kept if r.get("side") == side]
        side_pool = [r for r in pool if r.get("side") == side]

        # One-row frames are built once per GT row and reused for every rule.
        frames = {idx: pd.DataFrame([row]) for idx, row in gt.iterrows()}

        # Rows already matched by the always-kept rules are covered from the
        # start; otherwise the first pick is scored against rows that need no
        # rule and one can be added even though the target is already met.
        uncovered = {
            idx
            for idx, frame in frames.items()
            if not any(bool(eval_rule_mask(frame, r).iloc[0]) for r in side_kept)
        }

        picked: list[dict[str, Any]] = []
        while (
            uncovered
            and len(picked) < max_per_side
            and 1.0 - len(uncovered) / len(gt) < target_recall
        ):
            best_rule = None
            best_hits: set[Any] = set()
            for rule in side_pool:
                if rule in picked:
                    continue
                hits = {
                    idx
                    for idx in uncovered
                    if bool(eval_rule_mask(frames[idx], rule).iloc[0])
                }
                # Strict ">" keeps the earliest rule on ties.
                if len(hits) > len(best_hits):
                    best_rule, best_hits = rule, hits
            if best_rule is None:
                # No remaining rule matches any uncovered row.
                break
            picked.append(best_rule)
            uncovered -= best_hits
        kept.extend(picked)
    return kept

View File

@@ -1,214 +0,0 @@
"""
실거래 매수 사이징 — 시뮬(sim_tier_enhanced)과 동일 인과 tier·weight 정책.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import pandas as pd
from config import (
GT_SIGNAL_CAUSAL,
TRADING_FEE_RATE,
)
from deepcoin.ground_truth.causal_gt_hybrid import (
_attach_drawdown_to_buys,
_bar_index_at,
_close_series_from_df,
_drawdown_pct_at_index,
hybrid_tier_scale,
)
from deepcoin.ground_truth.gt_model import leg_entry_weights, remaining_weight_sum
from deepcoin.matching.position_sizing import compute_buy_amount_krw
from deepcoin.paths import OPS_STATE_DIR
LIVE_SIZING_STATE_JSON = OPS_STATE_DIR / "live_sizing_state.json"
class LivePositionState:
    """
    Tracks the open leg, past leg returns and buy weights for live sizing.

    The state is persisted as JSON so sizing decisions survive restarts. A
    "leg" starts with the first buy while flat and ends with a full close.
    """

    def __init__(self) -> None:
        """Create an empty (flat) position state."""
        # Id of the current leg, or of the last one when flat.
        self.current_leg_id: int = 0
        # Fills of the open leg: dicts with "dt", "price" and "amount_krw".
        self.open_buys: list[dict[str, Any]] = []
        # Realised return (%) of each completed leg, keyed by leg id.
        self.completed_leg_ret: dict[int, float] = {}
        # KRW spent (including fees) and received (net of fees) in the open leg.
        self.leg_cost_krw: float = 0.0
        self.leg_proceeds_krw: float = 0.0

    @classmethod
    def load(cls, path: Path | None = None) -> LivePositionState:
        """
        Restore the state from disk.

        Args:
            path: JSON file; defaults to ``LIVE_SIZING_STATE_JSON``.

        Returns:
            The restored state, or a fresh empty state when the file is
            missing, unreadable or malformed.
        """
        p = path or LIVE_SIZING_STATE_JSON
        st = cls()
        if not p.is_file():
            return st
        try:
            data = json.loads(p.read_text(encoding="utf-8"))
            # Parse everything before touching ``st`` so a malformed field
            # cannot leave a half-populated state behind.
            current_leg_id = int(data.get("current_leg_id") or 0)
            open_buys = list(data.get("open_buys") or [])
            # JSON object keys are strings; restore the integer leg ids.
            completed_leg_ret = {
                int(k): float(v)
                for k, v in (data.get("completed_leg_ret") or {}).items()
            }
            leg_cost_krw = float(data.get("leg_cost_krw") or 0.0)
            leg_proceeds_krw = float(data.get("leg_proceeds_krw") or 0.0)
        except (OSError, ValueError, TypeError, AttributeError):
            # ValueError also covers JSON and text decoding errors.
            return st
        st.current_leg_id = current_leg_id
        st.open_buys = open_buys
        st.completed_leg_ret = completed_leg_ret
        st.leg_cost_krw = leg_cost_krw
        st.leg_proceeds_krw = leg_proceeds_krw
        return st

    def save(self, path: Path | None = None) -> None:
        """
        Write the state to disk as JSON.

        Args:
            path: JSON file; defaults to ``LIVE_SIZING_STATE_JSON``. Parent
                directories are created.
        """
        p = path or LIVE_SIZING_STATE_JSON
        p.parent.mkdir(parents=True, exist_ok=True)
        payload = {
            "current_leg_id": self.current_leg_id,
            "open_buys": self.open_buys,
            "completed_leg_ret": self.completed_leg_ret,
            # KRW amounts are stored rounded to whole won.
            "leg_cost_krw": round(self.leg_cost_krw, 0),
            "leg_proceeds_krw": round(self.leg_proceeds_krw, 0),
        }
        p.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")

    def _start_new_leg_if_needed(self) -> None:
        """Open a new leg (next id, zeroed totals) when no buy is open."""
        if not self.open_buys:
            self.current_leg_id += 1
            self.leg_cost_krw = 0.0
            self.leg_proceeds_krw = 0.0

    def record_buy(self, dt: str, price: float, amount_krw: float, fee: float) -> None:
        """
        Record a filled buy.

        Args:
            dt: Fill time.
            price: Fill price.
            amount_krw: KRW spent on the buy.
            fee: Fee paid.
        """
        self._start_new_leg_if_needed()
        self.open_buys.append({"dt": dt, "price": price, "amount_krw": amount_krw})
        self.leg_cost_krw += amount_krw + fee

    def record_sell(self, amount_krw: float, fee: float, *, full_close: bool) -> None:
        """
        Record a filled sell.

        Args:
            amount_krw: Gross KRW received.
            fee: Fee paid.
            full_close: Whether the sell closes the whole leg.
        """
        self.leg_proceeds_krw += amount_krw - fee
        if not full_close:
            return
        # The return is only defined when something was actually spent.
        if self.leg_cost_krw > 0:
            ret_pct = (self.leg_proceeds_krw - self.leg_cost_krw) / self.leg_cost_krw * 100.0
            self.completed_leg_ret[self.current_leg_id] = ret_pct
        # A full close always ends the leg, so stale fills or totals cannot
        # leak into the next one.
        self.open_buys = []
        self.leg_cost_krw = 0.0
        self.leg_proceeds_krw = 0.0

    def plan_buy_amount_krw(
        self,
        dt: str,
        price: float,
        cash: float,
        qty: float,
        df: pd.DataFrame | None = None,
        *,
        enhanced: bool = True,
        fee_rate: float = TRADING_FEE_RATE,
    ) -> float:
        """
        Compute the KRW amount for a prospective buy without changing state.

        Args:
            dt: Signal time.
            price: Price used for the plan.
            cash: Available KRW.
            qty: Currently held quantity.
            df: OHLC frame used to attach drawdown information; optional.
            enhanced: Forwarded to ``hybrid_tier_scale``.
            fee_rate: Trading fee rate.

        Returns:
            KRW amount to buy, as returned by ``compute_buy_amount_krw``.
        """
        # Planning must not advance the leg counter: record_buy() opens the
        # leg when the order actually fills. Opening it here as well made
        # every plan-then-fill consume two leg ids and let repeated unfilled
        # plans drift the counter.
        leg_id = self.current_leg_id if self.open_buys else self.current_leg_id + 1

        prices = [float(b["price"]) for b in self.open_buys] + [price]
        weights = leg_entry_weights(prices)
        idx = len(self.open_buys)
        weight = float(weights[idx])
        w_sum = float(sum(weights[idx:]))

        trade: dict[str, Any] = {
            "dt": dt,
            "action": "buy",
            "price": price,
            "leg_id": leg_id,
            "weight": round(weight, 4),
        }
        if df is not None and not df.empty:
            attached = _attach_drawdown_to_buys([trade], df)
            if attached:
                trade = attached[0]

        # NOTE(review): imported here rather than at module top in the
        # original, presumably to avoid a circular import; confirm before moving.
        from deepcoin.ground_truth.hybrid_dd_calibrate import load_hybrid_dd_params

        dd_params = load_hybrid_dd_params()
        scale = hybrid_tier_scale(
            trade,
            completed_leg_ret=self.completed_leg_ret,
            enhanced=enhanced,
            dd_large_pct=dd_params.get("dd_large_pct"),
            dd_medium_pct=dd_params.get("dd_medium_pct"),
        )
        return compute_buy_amount_krw(
            cash,
            qty,
            price,
            weight,
            w_sum,
            asset_pct_scale=scale,
            fee_rate=fee_rate,
            ignore_weight_split=bool(trade.get("conviction_buy")),
        )
def drawdown_pct_from_df(df: pd.DataFrame, dt: str) -> float:
    """
    Return the drawdown (%) at the bar matching ``dt`` (causal, per the original note).

    Args:
        df: OHLC frame with a DatetimeIndex.
        dt: Timestamp string identifying the bar.

    Returns:
        Drawdown in percent; ``0.0`` when ``df`` is empty.
    """
    if not df.empty:
        closes = _close_series_from_df(df)
        return _drawdown_pct_at_index(closes, _bar_index_at(df, dt))
    return 0.0
def live_sizing_enabled() -> bool:
    """Report whether live sizing should follow the simulation's causal tiers (truthiness of GT_SIGNAL_CAUSAL)."""
    return True if GT_SIGNAL_CAUSAL else False

View File

@@ -1,44 +0,0 @@
"""
04단계: GT 프로필 + 전구간 EV 필터 매칭 파이프라인.
"""
from __future__ import annotations
from pathlib import Path
from deepcoin.matching.pipeline import run_matching_pipeline
from deepcoin.paths import ANALYSIS_TRADES_CSV, REPORTS_ANALYSIS, REPORTS_MATCHING
def run_match(
    phase: str = "all",
    trades_csv: Path | None = None,
) -> None:
    """
    Run the phase-04 matching pipeline.

    Args:
        phase: Stage to run: all | profile | scan | label | select.
        trades_csv: Optional path to the 03b trades CSV; defaults to
            ``ANALYSIS_TRADES_CSV``.

    Raises:
        FileNotFoundError: If the trades CSV does not exist.
    """
    # The report directory is created before the input check, as in the original.
    REPORTS_MATCHING.mkdir(parents=True, exist_ok=True)
    source_csv = ANALYSIS_TRADES_CSV if not trades_csv else trades_csv
    if source_csv.is_file():
        run_matching_pipeline(phase=phase, trades_csv=source_csv)
        return
    raise FileNotFoundError(
        f"03b CSV 없음: {source_csv}\n python scripts/03_analyze_trades.py 먼저 실행"
    )
def run_match_stub() -> Path:
    """Backward-compatibility shim: print usage hints and return the matching reports directory."""
    hints = (
        "=== Phase 04 Matching ===",
        " 전체 파이프라인: python scripts/04_match_rules.py",
        " 단계별: --phase profile|scan|label|select",
        f" analysis csv: {ANALYSIS_TRADES_CSV}",
        f" output dir: {REPORTS_MATCHING}",
    )
    for hint in hints:
        print(hint)
    return REPORTS_MATCHING
if __name__ == "__main__":
run_match()

View File

@@ -217,72 +217,6 @@ def nearest_gt_leg_id(
return best_buy if best_buy is not None else best_any
# Memo of the last approved rule-id set computed by load_ev_wf_approved_rule_ids.
_APPROVED_RULES_CACHE: set[str] | None = None
# (matched_path, outcomes_path) the memo was computed from; None = unknown origin.
_APPROVED_RULES_CACHE_KEY: tuple[str, str] | None = None


def load_ev_wf_approved_rule_ids(
    matched_path: Path | None = None,
    outcomes_path: Path | None = None,
) -> set[str]:
    """
    Return the rule ids whose go/no-go check passed.

    Per the original note the go/no-go combines holdout EV / PF, walk-forward
    and fee-stress criteria; here that is whatever ``evaluate_go_no_go``
    reports under ``checks`` with a truthy ``pass``.

    The result is memoised per (matched_path, outcomes_path) pair, so a call
    with different paths is recomputed instead of returning a stale set.
    Callers always receive a set that does not alias the memo.

    Args:
        matched_path: matched_rules.json; defaults to ``MATCHING_MATCHED_RULES``.
        outcomes_path: fire_outcomes.csv; defaults to ``MATCHING_FIRE_OUTCOMES``.

    Returns:
        The approved rule ids. When there are no monitor rules or the outcomes
        file is missing, all monitor rule ids are returned (not memoised).
        When no rule passes, all monitor rule ids are returned as a fallback.
    """
    global _APPROVED_RULES_CACHE, _APPROVED_RULES_CACHE_KEY

    mp = matched_path or MATCHING_MATCHED_RULES
    op = outcomes_path or MATCHING_FIRE_OUTCOMES
    cache_key = (str(mp), str(op))
    # A memo with an unknown key (e.g. seeded externally) is trusted as before.
    if _APPROVED_RULES_CACHE is not None and (
        _APPROVED_RULES_CACHE_KEY is None or _APPROVED_RULES_CACHE_KEY == cache_key
    ):
        return set(_APPROVED_RULES_CACHE)

    from config import SIM_FEE_STRESS_MULT
    from deepcoin.matching.select_rules import _rule_metrics, _split_train_valid_holdout
    from deepcoin.matching.simulation import (
        _fee_adjust_ret,
        evaluate_go_no_go,
        simulate_live_order_cap,
        walk_forward_by_month,
        walk_forward_summary,
    )

    matched = load_matched_rules(mp)
    rules = matched.get("monitor_rules") or []
    monitor_ids = {r["rule_id"] for r in rules}
    if not rules or not op.is_file():
        return monitor_ids

    import pandas as pd

    outcomes = pd.read_csv(op)
    outcomes["split"] = _split_train_valid_holdout(outcomes)
    wf_sum = walk_forward_summary(walk_forward_by_month(outcomes))

    # Per-rule metrics recomputed on fee-stressed forward returns.
    fee_stress: dict[str, Any] = {}
    for rid in outcomes["rule_id"].unique():
        sub = outcomes[outcomes["rule_id"] == rid]
        adj = _fee_adjust_ret(sub["forward_ret_pct"], SIM_FEE_STRESS_MULT)
        fee_stress[rid] = _rule_metrics(sub.assign(forward_ret_pct=adj))

    live_cap = simulate_live_order_cap(
        outcomes, rule_ids=monitor_ids, holdout_only=True
    )
    go = evaluate_go_no_go(matched, wf_sum, fee_stress, live_cap)
    passed = {c["rule_id"] for c in go.get("checks", []) if c.get("pass")}

    approved = passed or monitor_ids
    # Store a copy so later mutation of the returned set cannot corrupt the memo.
    _APPROVED_RULES_CACHE = set(approved)
    _APPROVED_RULES_CACHE_KEY = cache_key
    return approved
def load_gt_allocation_analysis(
gt_trades: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
@@ -328,8 +262,6 @@ def gt_tier_scale_for_trade(
"""
GT leg tier 배분 스케일 (분석 권장값 또는 config).
시뮬은 live_buy_asset_pct_scale 대신 GT와 동일 tier 정책을 사용합니다.
Args:
trade: {dt, leg_id?, action, ...}.
gt_trades: GT trades (leg 매칭).
@@ -349,37 +281,6 @@ def gt_tier_scale_for_trade(
return gt_tier_scale_from_analysis(int(lid), large_legs, analysis)
def live_buy_asset_pct_scale(
    rule_id: str,
    dt: str,
    gt_trades: list[dict[str, Any]],
    *,
    approved_rules: set[str],
    large_legs: set[int],
) -> float:
    """
    Pick the live-trading buy tier for a rule firing.

    Args:
        rule_id: Id of the rule that fired.
        dt: Fill time, matched to a GT leg via ``nearest_gt_leg_id``.
        gt_trades: GT trades used for the leg lookup.
        approved_rules: Rule ids that passed approval.
        large_legs: Leg ids treated as top (large) legs.

    Returns:
        ``LIVE_BUY_PCT_LARGE`` when the rule is approved and the nearest GT leg
        is a large leg; otherwise ``LIVE_BUY_PCT_SMALL``.
    """
    from config import LIVE_BUY_PCT_LARGE, LIVE_BUY_PCT_SMALL

    # The leg lookup is only performed for approved rules.
    if rule_id in approved_rules:
        leg = nearest_gt_leg_id(dt, gt_trades)
        if leg is not None and leg in large_legs:
            return float(LIVE_BUY_PCT_LARGE)
    return float(LIVE_BUY_PCT_SMALL)
def enrich_sim_trades_with_gt_weights(
trades: list[dict[str, Any]],
gt_trades: list[dict[str, Any]],
@@ -504,65 +405,6 @@ def attach_gt_model_amounts(
return enriched
def plan_open_position_buy(
    open_buys: list[dict[str, Any]],
    candidate: dict[str, Any],
    cash: float,
    qty: float,
    gt_trades: list[dict[str, Any]] | None = None,
    *,
    large_legs: set[int],
    analysis: dict[str, Any] | None = None,
    fee_rate: float = TRADING_FEE_RATE,
) -> float:
    """
    Plan the KRW amount of the next buy inside a still-open position.

    Args:
        open_buys: Buys already filled in the current position.
        candidate: The buy being considered; ``dt`` and ``price`` are read,
            and ``leg_id`` is used when present.
        cash: Cash on hand.
        qty: Quantity held.
        gt_trades: GT trades for leg matching; loaded via
            ``load_sizing_context_from_gt`` when ``None``.
        large_legs: Leg ids treated as top (large) legs.
        analysis: GT allocation analysis; loaded via
            ``load_gt_allocation_analysis`` when ``None``.
        fee_rate: Trading fee rate.

    Returns:
        The planned buy amount in KRW as returned by ``compute_buy_amount_krw``.
    """
    from deepcoin.ground_truth.gt_model import leg_entry_weights

    if gt_trades is None:
        gt_trades, _second, _third = load_sizing_context_from_gt()
    if analysis is None:
        analysis = load_gt_allocation_analysis(gt_trades)

    # Entry-price ladder: the already filled buys followed by the candidate.
    ladder = [float(filled["price"]) for filled in open_buys]
    candidate_price = float(candidate["price"])
    ladder.append(candidate_price)

    entry_weights = leg_entry_weights(ladder)
    slot = len(open_buys)
    slot_weight = entry_weights[slot]
    remaining_weight = sum(entry_weights[slot:])

    # Work on a copy so the caller's candidate dict is left untouched.
    enriched = {**candidate}
    if "leg_id" not in enriched:
        enriched["leg_id"] = nearest_gt_leg_id(str(candidate["dt"]), gt_trades)

    tier_scale = gt_tier_scale_for_trade(
        enriched,
        gt_trades,
        large_legs,
        analysis=analysis,
    )
    return compute_buy_amount_krw(
        cash,
        qty,
        candidate_price,
        slot_weight,
        remaining_weight,
        asset_pct_scale=tier_scale,
        fee_rate=fee_rate,
    )
def attach_dynamic_buy_amounts(
trades: list[dict[str, Any]],
*,

View File

@@ -131,7 +131,8 @@ def build_mtf_scan_frame(
if raw is None or raw.empty:
raise RuntimeError(f"주간격 {primary}분 데이터 없음")
print(f"[04b] Phase A: 8TF enrich (스캔용)...")
n_tf = len(GENERAL_ANALYSIS_INTERVALS)
print(f"[04b] Phase A: {n_tf}TF enrich (스캔용, 주·월봉 포함)...")
enriched: dict[int, pd.DataFrame] = {}
for iv in GENERAL_ANALYSIS_INTERVALS:
r = frames.get(iv)

View File

@@ -1,5 +1,5 @@
"""
1단계: walk-forward·민감도·실거래 한도 가정 시뮬·Go/No-Go 리포트.
Simulation: walk-forward·민감도·Go/No-Go·portfolio_compare 리포트.
"""
from __future__ import annotations
@@ -762,19 +762,6 @@ def build_simulation_report(
if ANALYSIS_GT_CALIBRATION_JSON.is_file():
cal = json.loads(ANALYSIS_GT_CALIBRATION_JSON.read_text(encoding="utf-8"))
gt_portfolio = cal.get("final", {})
else:
from deepcoin.matching.gt_asset_calibration import (
portfolio_asset_ratio,
)
gt_data_cal = load_ground_truth(resolve_ground_truth_file()) or {}
trades = gt_data_cal.get("trades") or []
mark_cal = (gt_data_cal.get("summary") or {}).get("mark_price")
if trades:
gt_portfolio = {
"portfolio": portfolio_asset_ratio(trades, set(), mark_cal),
"note": "캘리브레이션 미실행 — scripts/04_calibrate_gt_assets.py",
}
summaries = matched.get("all_rule_summaries") or matched.get("monitor_rules") or []
leg_weight_check = summarize_leg_weights(gt_trades) if gt_trades else {}