인과적 GT 신호·복리 배분 시뮬을 도입하고 운영 정합성을 맞춘다.

미래 데이터를 쓰지 않는 causal 신호/tier와 전기간 복리 포트폴리오 비교로 GT 대비 sim_sized 검증 경로를 정리하고, 일한도·매수 상한·live_buy 스케일을 제거한다.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
xavis
2026-05-31 19:50:54 +09:00
parent 5842cc9fa3
commit e68bb44083
16 changed files with 1817 additions and 474 deletions

View File

@@ -33,7 +33,6 @@ from config import (
GT_INITIAL_CASH_KRW,
GT_LARGE_LEG_TOP_PCT,
GT_MIN_ORDER_KRW,
GT_MAX_BUY_ORDER_KRW,
GT_MAX_BUYS_PER_LEG,
GT_MAX_ROUND_TRIPS,
TRADING_FEE_RATE,
@@ -49,6 +48,16 @@ from config import (
from deepcoin.common.indicators import apply_bar_indicators, get_trend
from deepcoin.data.mtf_bb import load_frames_from_db
from deepcoin.ground_truth.gt_allocation import (
allocate_order_amounts_chronological,
resolve_sell_qty as _resolve_sell_qty,
)
from deepcoin.ground_truth.gt_model import (
compute_entry_weights,
leg_entry_weights,
leg_exit_weights,
sell_split_weights,
)
from deepcoin.paths import resolve_ground_truth_file
DEFAULT_OUTPUT = resolve_ground_truth_file()
@@ -476,15 +485,6 @@ def _row_at_ts(df: pd.DataFrame, ts: pd.Timestamp) -> pd.Series:
return row
def _normalize_weights(scores: list[float]) -> list[float]:
"""비중 점수를 합 1로 정규화."""
total = sum(scores)
if total <= 0:
n = len(scores)
return [1.0 / n] * n if n else []
return [s / total for s in scores]
def _collect_buy_troughs(
df: pd.DataFrame,
buy_pivots: list[Pivot],
@@ -525,7 +525,6 @@ def _collect_buy_troughs(
filtered.append(p)
if len(filtered) > max_buys:
# 가격이 낮은(저점) 순으로 max_buys만 유지 후 시간순
filtered.sort(key=lambda x: x.price)
filtered = sorted(filtered[:max_buys], key=lambda x: x.ts)
return filtered
@@ -570,7 +569,8 @@ def _peak_sell_points(
second = max(sub_peaks, key=lambda x: x.price)
if second.ts == main.ts:
return [(main, 1.0)]
return [(main, 0.65), (second, 0.35)]
w = sell_split_weights(2)
return [(main, w[0]), (second, w[1])]
def build_split_buy_peak_sell_trades(
@@ -603,8 +603,11 @@ def build_split_buy_peak_sell_trades(
for leg_id, peak in enumerate(sell_peaks):
troughs = _collect_buy_troughs(df, buy_pivots, prev_sell_ts, peak.ts, buy_min_bars)
if troughs:
scores = [1.0 / max(t.price, 1e-9) for t in troughs]
weights = _normalize_weights(scores)
prices = [
float(_row_at_ts(df, t.ts)["Low"]) if "Low" in _row_at_ts(df, t.ts) else t.price
for t in troughs
]
weights = leg_entry_weights(prices)
for t, w in zip(troughs, weights):
row = _row_at_ts(df, t.ts)
bb_pos, rsi, disp = _bb_context(row)
@@ -672,7 +675,11 @@ def build_split_buy_peak_sell_trades(
)
leg_id = len(sell_peaks)
if troughs:
weights = _normalize_weights([1.0 / max(t.price, 1e-9) for t in troughs])
prices = [
float(_row_at_ts(df, t.ts)["Low"]) if "Low" in _row_at_ts(df, t.ts) else t.price
for t in troughs
]
weights = leg_entry_weights(prices)
leg_buys: list[TradePoint] = []
for t, w in zip(troughs, weights):
row = _row_at_ts(df, t.ts)
@@ -916,7 +923,6 @@ def generate_ground_truth(
else "leg_block"
),
"order_amount_min_krw": GT_MIN_ORDER_KRW,
"order_amount_max_buy_krw": GT_MAX_BUY_ORDER_KRW,
"buy_pct_large_leg": GT_BUY_PCT_LARGE_LEG,
"buy_pct_small_leg": GT_BUY_PCT_SMALL_LEG,
"large_leg_top_pct": GT_LARGE_LEG_TOP_PCT,
@@ -963,13 +969,12 @@ def allocate_gt_order_amounts(
trades: list[dict[str, Any]],
initial_cash: float = GT_INITIAL_CASH_KRW,
min_order_krw: float = GT_MIN_ORDER_KRW,
max_buy_krw: float = GT_MAX_BUY_ORDER_KRW,
fee_rate: float = TRADING_FEE_RATE,
) -> tuple[list[dict[str, Any]], dict[str, Any]]:
"""
GT 각 타점에 amount_krw를 시각순·총자산·비중(최적 매수율)으로 배분합니다.
매수: 총보유자산 × (leg 비중 share × 티어 스케일), 상한=가용 현금.
매수: 목표=총보유자산×(leg 비중 share×티어 스케일), 체결=min(목표, 보유현금/(1+fee)).
leg 상위 GT_LARGE_LEG_TOP_PCT는 GT_BUY_PCT_LARGE_LEG, 그 외는 GT_BUY_PCT_SMALL_LEG.
매도 후 현금 증가분은 다음 매수부터 자동 반영(시각순 복리).
@@ -977,159 +982,18 @@ def allocate_gt_order_amounts(
trades: trade dict 리스트(시각순 정렬 전).
initial_cash: 초기 현금.
min_order_krw: 매수·매도 최소 원화 금액.
max_buy_krw: 매수 1회 상한(가용 현금·비중 배분 후 캡).
fee_rate: 수수료율.
Returns:
(동일 dict 참조, amount_krw 채움), alloc_stats 요약.
"""
from deepcoin.matching.position_sizing import (
compute_buy_amount_krw,
leg_asset_pct_scale,
top_leg_ids_by_forward_return,
return allocate_order_amounts_chronological(
trades,
initial_cash=initial_cash,
min_order_krw=min_order_krw,
fee_rate=fee_rate,
)
chron = sorted(trades, key=lambda x: x["dt"])
large_legs = top_leg_ids_by_forward_return(chron)
leg_buy_idxs: dict[int, list[int]] = {}
leg_sell_idxs: dict[int, list[int]] = {}
for i, t in enumerate(chron):
lid = int(t.get("leg_id", 0))
if t["action"] == "buy":
leg_buy_idxs.setdefault(lid, []).append(i)
elif t["action"] == "sell":
leg_sell_idxs.setdefault(lid, []).append(i)
cash = float(initial_cash)
qty = 0.0
qty_by_leg: dict[int, float] = {}
sell_leg: int | None = None
sell_base_qty = 0.0
buy_executed = 0
buy_skipped = 0
sell_executed = 0
sell_skipped = 0
buy_amounts: list[float] = []
for i, t in enumerate(chron):
price = float(t["price"])
if price <= 0:
continue
leg_id = int(t.get("leg_id", 0))
weight = float(t.get("weight", 1.0))
if t["action"] == "buy":
rem = [j for j in leg_buy_idxs.get(leg_id, []) if j >= i]
w_sum = sum(float(chron[j].get("weight", 1.0)) for j in rem)
w_share = (
weight / w_sum if w_sum > 0 else 1.0 / max(len(rem), 1)
)
scale = leg_asset_pct_scale(leg_id, large_legs)
amount = compute_buy_amount_krw(
cash,
qty,
price,
weight,
w_sum,
asset_pct_scale=scale,
min_order_krw=min_order_krw,
fee_rate=fee_rate,
)
if amount <= 0:
t["amount_krw"] = 0
buy_skipped += 1
continue
t["amount_krw"] = amount
fee = amount * fee_rate
cash -= amount + fee
bought_qty = amount / price
qty += bought_qty
qty_by_leg[leg_id] = qty_by_leg.get(leg_id, 0.0) + bought_qty
buy_executed += 1
buy_amounts.append(amount)
sell_leg = None
elif t["action"] == "sell":
leg_qty = qty_by_leg.get(leg_id, 0.0)
if leg_qty <= 1e-12:
sell_skipped += 1
continue
if sell_leg != leg_id:
sell_leg = leg_id
sell_base_qty = leg_qty
rem_sells = [j for j in leg_sell_idxs.get(leg_id, []) if j >= i]
is_last_leg_sell = bool(rem_sells) and i == rem_sells[-1]
if is_last_leg_sell:
sell_qty = leg_qty
gross = sell_qty * price
else:
gross = sell_base_qty * weight * price
if gross >= min_order_krw:
gross = max(min_order_krw, gross)
gross = min(gross, leg_qty * price)
if gross <= 0:
sell_skipped += 1
continue
if not is_last_leg_sell:
sell_qty = gross / price
else:
sell_qty = leg_qty
t["amount_krw"] = round(gross, 0)
fee = gross * fee_rate
cash += gross - fee
leg_qty -= sell_qty
qty_by_leg[leg_id] = max(leg_qty, 0.0)
qty = max(qty - sell_qty, 0.0)
if qty < 1e-12:
qty = 0.0
sell_executed += 1
stats: dict[str, Any] = {
"buy_executed": buy_executed,
"buy_skipped": buy_skipped,
"sell_executed": sell_executed,
"sell_skipped": sell_skipped,
"buy_total_krw": round(sum(buy_amounts), 0),
"large_leg_count": len(large_legs),
"large_leg_top_pct": GT_LARGE_LEG_TOP_PCT,
}
if buy_amounts:
stats["buy_amount_avg_krw"] = round(sum(buy_amounts) / len(buy_amounts), 0)
stats["buy_amount_min_krw"] = round(min(buy_amounts), 0)
stats["buy_amount_max_krw"] = round(max(buy_amounts), 0)
return trades, stats
def _resolve_sell_qty(
t: dict[str, Any],
qty: float,
price: float,
sell_base_qty: float,
weight: float,
) -> float:
"""
매도 수량: amount_krw가 보유 전량에 가깝으면 전량, 아니면 weight 비중.
Args:
t: trade dict.
qty: 현재 보유 수량.
price: 체결가.
sell_base_qty: leg 첫 매도 시점 보유량.
weight: 매도 비중.
Returns:
매도 수량.
"""
if qty <= 0 or price <= 0:
return 0.0
ak = t.get("amount_krw")
if ak is not None and float(ak) > 0:
gross_cap = float(ak)
if gross_cap >= qty * price * 0.999:
return qty
return min(qty, gross_cap / price)
return min(sell_base_qty * weight, qty)
def _trade_buy_amount(
t: dict[str, Any],

View File

@@ -0,0 +1,402 @@
"""
GT 공통 자본 배분·포트폴리오 시뮬 엔진.
ground_truth.allocate_gt_order_amounts · simulate_truth_portfolio ·
matching/portfolio_sim 이 동일 규칙을 공유합니다.
"""
from __future__ import annotations
from typing import Any, Callable
from config import (
GT_INITIAL_CASH_KRW,
GT_MIN_ORDER_KRW,
TRADING_FEE_RATE,
)
from deepcoin.ground_truth.gt_model import remaining_weight_sum
def resolve_sell_qty(
t: dict[str, Any],
qty: float,
price: float,
sell_base_qty: float,
weight: float,
) -> float:
"""
매도 수량: amount_krw 우선, 없으면 sell_base_qty × weight.
Args:
t: trade dict.
qty: 현재 보유 수량.
price: 체결가.
sell_base_qty: leg 첫 매도 시점 보유량.
weight: 매도 비중.
Returns:
매도 수량.
"""
if qty <= 0 or price <= 0:
return 0.0
ak = t.get("amount_krw")
if ak is not None and float(ak) > 0:
gross_cap = float(ak)
if gross_cap >= qty * price * 0.999:
return qty
return min(qty, gross_cap / price)
return min(sell_base_qty * weight, qty)
def allocate_order_amounts_chronological(
trades: list[dict[str, Any]],
*,
initial_cash: float = GT_INITIAL_CASH_KRW,
min_order_krw: float = GT_MIN_ORDER_KRW,
fee_rate: float = TRADING_FEE_RATE,
large_legs: set[int] | None = None,
asset_pct_scale_fn: Callable[[dict[str, Any]], float] | None = None,
causal_tier: bool = False,
) -> tuple[list[dict[str, Any]], dict[str, Any]]:
"""
시각순·leg 비중·티어 스케일로 amount_krw를 배분합니다.
causal_tier=True: 청산 완료 leg의 realized return 만으로 tier 산정 (인과적).
Args:
trades: trade dict (weight·leg_id·action·price).
initial_cash: 초기 현금.
min_order_krw: 최소 체결 원화.
fee_rate: 수수료율.
large_legs: 대형 leg. None이면 GT trades에서 산출(비인과).
asset_pct_scale_fn: 매수 trade별 tier scale.
causal_tier: 과거 청산 leg 수익률만으로 tier.
Returns:
(amount_krw 채워진 trades, alloc_stats).
"""
from config import GT_LARGE_LEG_TOP_PCT
from deepcoin.matching.position_sizing import (
compute_buy_amount_krw,
large_leg_ids_from_past_returns,
leg_asset_pct_scale,
top_leg_ids_by_forward_return,
)
chron = sorted(trades, key=lambda x: x["dt"])
if large_legs is None and not causal_tier:
large_legs = top_leg_ids_by_forward_return(chron)
elif large_legs is None:
large_legs = set()
leg_buy_idxs: dict[int, list[int]] = {}
leg_sell_idxs: dict[int, list[int]] = {}
for i, t in enumerate(chron):
lid = int(t.get("leg_id", 0))
if t["action"] == "buy":
leg_buy_idxs.setdefault(lid, []).append(i)
elif t["action"] == "sell":
leg_sell_idxs.setdefault(lid, []).append(i)
cash = float(initial_cash)
qty = 0.0
qty_by_leg: dict[int, float] = {}
sell_leg: int | None = None
sell_base_qty = 0.0
buy_executed = 0
buy_skipped = 0
sell_executed = 0
sell_skipped = 0
buy_amounts: list[float] = []
completed_leg_ret: dict[int, float] = {}
leg_cost_krw: dict[int, float] = {}
leg_proceeds_krw: dict[int, float] = {}
for i, t in enumerate(chron):
price = float(t["price"])
if price <= 0:
continue
leg_id = int(t.get("leg_id", 0))
weight = float(t.get("weight", 1.0))
if t["action"] == "buy":
w_sum = remaining_weight_sum(chron, leg_id, i)
if causal_tier:
large_now = large_leg_ids_from_past_returns(
completed_leg_ret, GT_LARGE_LEG_TOP_PCT
)
scale = leg_asset_pct_scale(leg_id, large_now)
elif asset_pct_scale_fn is not None:
scale = asset_pct_scale_fn(t)
else:
scale = leg_asset_pct_scale(leg_id, large_legs)
amount = compute_buy_amount_krw(
cash,
qty,
price,
weight,
w_sum,
asset_pct_scale=scale,
min_order_krw=min_order_krw,
fee_rate=fee_rate,
)
if amount <= 0:
t["amount_krw"] = 0
buy_skipped += 1
continue
t["amount_krw"] = amount
fee = amount * fee_rate
cash -= amount + fee
bought_qty = amount / price
qty += bought_qty
qty_by_leg[leg_id] = qty_by_leg.get(leg_id, 0.0) + bought_qty
leg_cost_krw[leg_id] = leg_cost_krw.get(leg_id, 0.0) + amount + fee
buy_executed += 1
buy_amounts.append(amount)
sell_leg = None
elif t["action"] == "sell":
leg_qty = qty_by_leg.get(leg_id, 0.0)
if leg_qty <= 1e-12:
sell_skipped += 1
continue
if sell_leg != leg_id:
sell_leg = leg_id
sell_base_qty = leg_qty
rem_sells = [j for j in leg_sell_idxs.get(leg_id, []) if j >= i]
is_last_leg_sell = bool(rem_sells) and i == rem_sells[-1]
if is_last_leg_sell:
sell_qty = leg_qty
gross = sell_qty * price
else:
gross = sell_base_qty * weight * price
if gross >= min_order_krw:
gross = max(min_order_krw, gross)
gross = min(gross, leg_qty * price)
if gross <= 0:
sell_skipped += 1
continue
sell_qty = leg_qty if is_last_leg_sell else gross / price
t["amount_krw"] = round(gross, 0)
fee = gross * fee_rate
cash += gross - fee
leg_proceeds_krw[leg_id] = leg_proceeds_krw.get(leg_id, 0.0) + (gross - fee)
leg_qty -= sell_qty
qty_by_leg[leg_id] = max(leg_qty, 0.0)
qty = max(qty - sell_qty, 0.0)
if qty < 1e-12:
qty = 0.0
sell_executed += 1
if causal_tier and leg_qty <= 1e-12:
cost = leg_cost_krw.pop(leg_id, 0.0)
proceeds = leg_proceeds_krw.pop(leg_id, 0.0)
if cost > 0:
completed_leg_ret[leg_id] = (proceeds - cost) / cost * 100.0
stats: dict[str, Any] = {
"buy_executed": buy_executed,
"buy_skipped": buy_skipped,
"sell_executed": sell_executed,
"sell_skipped": sell_skipped,
"buy_total_krw": round(sum(buy_amounts), 0),
"large_leg_count": len(large_legs),
}
if buy_amounts:
stats["buy_amount_avg_krw"] = round(sum(buy_amounts) / len(buy_amounts), 0)
stats["buy_amount_min_krw"] = round(min(buy_amounts), 0)
stats["buy_amount_max_krw"] = round(max(buy_amounts), 0)
return trades, stats
def simulate_portfolio_steps(
trades: list[dict[str, Any]],
*,
initial_cash: float = GT_INITIAL_CASH_KRW,
fee_rate: float = TRADING_FEE_RATE,
use_amount_krw: bool = True,
) -> list[dict[str, Any]]:
"""
체결마다 현금·보유·총평가 스냅샷.
Args:
trades: 시각순 trade dict (amount_krw·weight·leg_id).
initial_cash: 시작 현금.
fee_rate: 수수료율.
use_amount_krw: True면 amount_krw 기준 체결.
Returns:
step dict 리스트.
"""
rows = sorted(trades, key=lambda x: x["dt"])
cash = float(initial_cash)
qty = 0.0
qty_by_leg: dict[int, float] = {}
sell_leg: int | None = None
sell_base_qty = 0.0
leg_budget = 0.0
current_leg: int | None = None
steps: list[dict[str, Any]] = []
for t in rows:
action = t.get("action", t.get("side", ""))
price = float(t["price"])
if price <= 0:
continue
weight = float(t.get("weight", 1.0))
leg_id = int(t.get("leg_id", 0))
if action == "buy":
if use_amount_krw and t.get("amount_krw") is not None and float(t["amount_krw"]) > 0:
amount = min(float(t["amount_krw"]), max(cash / (1.0 + fee_rate), 0.0))
else:
if leg_id != current_leg:
current_leg = leg_id
leg_budget = cash
amount = min(leg_budget * weight, max(cash / (1.0 + fee_rate), 0.0))
if amount <= 0:
continue
fee = amount * fee_rate
cash -= amount + fee
bought = amount / price
qty += bought
qty_by_leg[leg_id] = qty_by_leg.get(leg_id, 0.0) + bought
sell_leg = None
elif action == "sell" and qty > 0:
leg_qty = qty_by_leg.get(leg_id, qty)
if sell_leg != leg_id:
sell_leg = leg_id
sell_base_qty = leg_qty
sell_qty = resolve_sell_qty(t, leg_qty, price, sell_base_qty, weight)
if sell_qty <= 0:
continue
gross = sell_qty * price
fee = gross * fee_rate
cash += gross - fee
leg_qty -= sell_qty
qty_by_leg[leg_id] = max(leg_qty, 0.0)
qty -= sell_qty
if qty < 1e-12:
qty = 0.0
steps.append(
{
"dt": t["dt"],
"action": action,
"price": price,
"weight": weight,
"leg_id": leg_id,
"amount_krw": t.get("amount_krw"),
"cash_krw": round(cash, 0),
"holding_qty": round(qty, 6),
"total_asset_krw": round(cash + qty * price, 0),
}
)
return steps
def compute_drawdown_metrics(steps: list[dict[str, Any]]) -> dict[str, Any]:
"""
equity curve 기준 최대 낙폭·고점 대비 하락.
Args:
steps: simulate_portfolio_steps 결과.
Returns:
max_drawdown_pct, peak_asset_krw, trough_after_peak_krw.
"""
if not steps:
return {
"max_drawdown_pct": 0.0,
"peak_asset_krw": 0.0,
"trough_asset_krw": 0.0,
}
assets = [float(s["total_asset_krw"]) for s in steps]
peak = assets[0]
max_dd = 0.0
peak_at = assets[0]
trough_at = assets[0]
for a in assets:
if a > peak:
peak = a
dd = (peak - a) / peak * 100.0 if peak > 0 else 0.0
if dd > max_dd:
max_dd = dd
peak_at = peak
trough_at = a
return {
"max_drawdown_pct": round(max_dd, 2),
"peak_asset_krw": round(peak_at, 0),
"trough_asset_krw": round(trough_at, 0),
}
def simulate_portfolio_summary(
trades: list[dict[str, Any]],
*,
initial_cash: float = GT_INITIAL_CASH_KRW,
fee_rate: float = TRADING_FEE_RATE,
last_price: float | None = None,
use_amount_krw: bool = True,
) -> dict[str, Any]:
"""
포트폴리오 시뮬 요약 + MDD.
Args:
trades: trade dict 리스트.
initial_cash: 시작 현금.
fee_rate: 수수료율.
last_price: 미청산 평가가.
use_amount_krw: amount_krw 체결 사용.
Returns:
pnl·fee·MDD 포함 dict.
"""
steps = simulate_portfolio_steps(
trades,
initial_cash=initial_cash,
fee_rate=fee_rate,
use_amount_krw=use_amount_krw,
)
if not steps:
return {
"initial_cash_krw": round(initial_cash, 0),
"final_asset_krw": round(initial_cash, 0),
"pnl_krw": 0.0,
"pnl_pct": 0.0,
"trade_count": len(trades),
"max_drawdown_pct": 0.0,
}
last_step = steps[-1]
cash = float(last_step["cash_krw"])
qty = float(last_step["holding_qty"])
mark = float(last_price if last_price is not None else last_step["price"])
holding_value = qty * mark
final_asset = cash + holding_value
pnl = final_asset - initial_cash
pnl_pct = pnl / initial_cash * 100.0 if initial_cash else 0.0
fees = 0.0
for t in sorted(trades, key=lambda x: x["dt"]):
ak = float(t.get("amount_krw") or 0)
if ak <= 0:
continue
fees += ak * fee_rate
dd = compute_drawdown_metrics(steps)
return {
"initial_cash_krw": round(initial_cash, 0),
"final_asset_krw": round(final_asset, 0),
"pnl_krw": round(pnl, 0),
"pnl_pct": round(pnl_pct, 2),
"total_fees_krw": round(fees, 0),
"cash_krw": round(cash, 0),
"holding_qty": round(qty, 6),
"holding_value_krw": round(holding_value, 0),
"mark_price": round(mark, 2),
"fee_rate": fee_rate,
"trade_count": len(trades),
**dd,
}

View File

@@ -0,0 +1,150 @@
"""
GT 체결 amount_krw·총자산 비율 분석 — 시뮬 tier·배분율 최적 추정.
"""
from __future__ import annotations
from typing import Any
from config import (
GT_BUY_PCT_LARGE_LEG,
GT_BUY_PCT_SMALL_LEG,
GT_INITIAL_CASH_KRW,
GT_LARGE_LEG_TOP_PCT,
TRADING_FEE_RATE,
)
from deepcoin.matching.position_sizing import (
leg_asset_pct_scale,
optimal_weight_share,
portfolio_totals,
top_leg_ids_by_forward_return,
)
def analyze_gt_buy_allocation(
trades: list[dict[str, Any]],
*,
initial_cash: float = GT_INITIAL_CASH_KRW,
fee_rate: float = TRADING_FEE_RATE,
) -> dict[str, Any]:
"""
GT 시각순 체결에서 매수별 (실제투입/총자산) 비율을 분석합니다.
Args:
trades: amount_krw·weight·leg_id가 채워진 GT trade dict.
initial_cash: 시작 현금.
fee_rate: 수수료율.
Returns:
leg tier별·전체 배분 통계 및 권장 pct_large/pct_small.
"""
chron = sorted(trades, key=lambda x: x["dt"])
if not chron:
return {"note": "체결 없음"}
large_legs = top_leg_ids_by_forward_return(chron, GT_LARGE_LEG_TOP_PCT)
cash = float(initial_cash)
qty = 0.0
ratios_large: list[float] = []
ratios_small: list[float] = []
ratios_all: list[float] = []
for i, t in enumerate(chron):
price = float(t["price"])
if price <= 0:
continue
leg_id = int(t.get("leg_id", 0))
action = t.get("action", "")
if action == "buy":
w = float(t.get("weight", 1.0))
rem = sum(
float(chron[j].get("weight", 1.0))
for j in range(i, len(chron))
if int(chron[j].get("leg_id", 0)) == leg_id
and chron[j].get("action") == "buy"
)
opt = optimal_weight_share(w, rem) if rem > 0 else 1.0
total_asset, _, _ = portfolio_totals(cash, qty, price)
amount = float(t.get("amount_krw") or 0)
if total_asset > 0 and amount > 0 and opt > 0:
implied = amount / (total_asset * opt)
ratios_all.append(implied)
if leg_id in large_legs:
ratios_large.append(implied)
else:
ratios_small.append(implied)
if amount > 0:
fee = amount * fee_rate
cash -= amount + fee
qty += amount / price
elif action == "sell" and qty > 0:
gross = float(t.get("amount_krw") or qty * price)
cash += gross * (1.0 - fee_rate)
qty = 0.0
def _stats(vals: list[float]) -> dict[str, float]:
if not vals:
return {}
s = sorted(vals)
n = len(s)
return {
"count": n,
"mean": round(sum(s) / n, 4),
"median": round(s[n // 2], 4),
"p25": round(s[max(0, n // 4)], 4),
"p75": round(s[min(n - 1, 3 * n // 4)], 4),
}
st_all = _stats(ratios_all)
st_large = _stats(ratios_large)
st_small = _stats(ratios_small)
rec_large = st_large.get("median", GT_BUY_PCT_LARGE_LEG)
rec_small = st_small.get("median", GT_BUY_PCT_SMALL_LEG)
if not rec_large or rec_large <= 0:
rec_large = GT_BUY_PCT_LARGE_LEG
if not rec_small or rec_small <= 0:
rec_small = GT_BUY_PCT_SMALL_LEG
return {
"large_leg_ids": sorted(large_legs),
"large_leg_count": len(large_legs),
"config_pct_large": GT_BUY_PCT_LARGE_LEG,
"config_pct_small": GT_BUY_PCT_SMALL_LEG,
"observed_implied_scale": {
"all": st_all,
"large_leg": st_large,
"small_leg": st_small,
},
"recommended_pct_large_leg": round(rec_large, 4),
"recommended_pct_small_leg": round(rec_small, 4),
"note": (
"implied_scale = amount / (pre_buy_total_asset × weight_share); "
"시뮬 tier는 GT 분석 median 사용"
),
}
def gt_tier_scale_from_analysis(
leg_id: int,
large_legs: set[int],
analysis: dict[str, Any] | None = None,
) -> float:
"""
GT 분석 권장값 또는 config tier scale.
Args:
leg_id: leg 번호.
large_legs: 상위 leg.
analysis: analyze_gt_buy_allocation 결과.
Returns:
총자산 대비 매수 스케일 (0~1).
"""
if analysis and analysis.get("observed_implied_scale", {}).get("all"):
if leg_id in large_legs:
return float(analysis.get("recommended_pct_large_leg", GT_BUY_PCT_LARGE_LEG))
return float(analysis.get("recommended_pct_small_leg", GT_BUY_PCT_SMALL_LEG))
return leg_asset_pct_scale(leg_id, large_legs)

View File

@@ -1,13 +1,14 @@
"""
Ground Truth 타점·비중·자본 배분 모델 (일반화 명세).
타점 생성(ground_truth.py) 자본 배분(position_sizing.py)의 공통 언어.
타점 생성(ground_truth.py), 자본 배분(gt_allocation.py),
시뮬(position_sizing.py)의 공통 언어.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
from typing import Any, Callable
from config import (
GT_BUY_BB_MAX,
@@ -15,14 +16,130 @@ from config import (
GT_BUY_MIN_SWING_PCT,
GT_BUY_PCT_LARGE_LEG,
GT_BUY_PCT_SMALL_LEG,
GT_BUY_WEIGHT_RULE,
GT_LARGE_LEG_TOP_PCT,
GT_MAX_BUYS_PER_LEG,
GT_MAX_SELLS_PER_LEG,
GT_MIN_ORDER_KRW,
GT_SELL_SPLIT_GAP_PCT,
GT_SELL_SPLIT_WEIGHTS,
GT_SELECTION_MODE,
)
# --- 매수 비중 규칙 (확장 가능) ---
EntryWeightFn = Callable[[list[float]], list[float]]
def normalize_weights(scores: list[float]) -> list[float]:
"""
비중 점수를 합 1로 정규화합니다.
Args:
scores: raw score 리스트.
Returns:
정규화 weight (합 ≈ 1).
"""
if not scores:
return []
total = sum(scores)
if total <= 0:
n = len(scores)
return [1.0 / n] * n
return [s / total for s in scores]
def compute_buy_weights_inverse_price(prices: list[float]) -> list[float]:
"""
저점 매수 비중: score_i = 1/price_i → 합=1 정규화.
Args:
prices: leg 내 매수 후보 가격.
Returns:
weight 리스트 (합 ≈ 1).
"""
if not prices:
return []
scores = [1.0 / max(p, 1e-9) for p in prices]
return normalize_weights(scores)
def compute_buy_weights_equal(_prices: list[float]) -> list[float]:
"""
균등 분할 매수 비중.
Args:
_prices: leg 내 매수 후보 가격(미사용).
Returns:
weight 리스트 (합 = 1).
"""
n = len(_prices)
if n <= 0:
return []
return [1.0 / n] * n
ENTRY_WEIGHT_RULES: dict[str, EntryWeightFn] = {
"inverse_price_normalized": compute_buy_weights_inverse_price,
"equal": compute_buy_weights_equal,
}
def compute_entry_weights(
prices: list[float],
rule: str | None = None,
) -> list[float]:
"""
매수 타점 비중을 규칙명으로 계산합니다.
Args:
prices: 체결 가격 리스트.
rule: `inverse_price_normalized` | `equal`. None이면 config.
Returns:
leg 내 매수 weight (합 ≈ 1).
"""
key = (rule or GT_BUY_WEIGHT_RULE).strip()
fn = ENTRY_WEIGHT_RULES.get(key, compute_buy_weights_inverse_price)
return fn(prices)
def leg_entry_weights(
prices: list[float],
rule: str | None = None,
) -> list[float]:
"""
leg 내 매수 타점 비중 (compute_entry_weights 별칭).
Args:
prices: 체결 가격 리스트.
rule: 비중 규칙 키.
Returns:
weight 리스트 (합 ≈ 1).
"""
return compute_entry_weights(prices, rule)
def leg_exit_weights(
n_sells: int,
exit_spec: GtExitSpec | None = None,
) -> list[float]:
"""
leg 매도 분할 비중.
Args:
n_sells: 매도 횟수.
exit_spec: 매도 명세.
Returns:
weight 리스트 (합 ≈ 1).
"""
return sell_split_weights(n_sells, exit_spec)
@dataclass(frozen=True)
class GtEntrySpec:
@@ -32,14 +149,14 @@ class GtEntrySpec:
Attributes:
pivot_kind: ZigZag 저점(trough).
price_field: 체결가 = 봉 Low.
weight_rule: 저가일수록 큰 비중 (1/price 정규화).
weight_rule: 매수 비중 규칙 키.
max_per_leg: leg당 최대 매수 횟수.
min_bars_gap: 분할 매수 최소 봉 간격.
"""
pivot_kind: str = "trough"
price_field: str = "Low"
weight_rule: str = "inverse_price_normalized"
weight_rule: str = GT_BUY_WEIGHT_RULE
max_per_leg: int = GT_MAX_BUYS_PER_LEG
min_bars_gap: int = GT_BUY_MIN_BARS
bb_filter: str = f"bb_pos <= {GT_BUY_BB_MAX}"
@@ -53,13 +170,13 @@ class GtExitSpec:
Attributes:
pivot_kind: major swing 고점(peak).
price_field: 체결가 = 봉 High.
split_weights: 2회 분할 시 (65%, 35%).
split_weights: N회 분할 시 각 매도 비중 (합=1).
split_gap_pct: 2차 고점 인정 최소 괴리(%).
"""
pivot_kind: str = "peak"
price_field: str = "High"
split_weights: tuple[float, float] = (0.65, 0.35)
split_weights: tuple[float, ...] = GT_SELL_SPLIT_WEIGHTS
split_gap_pct: float = GT_SELL_SPLIT_GAP_PCT
max_per_leg: int = GT_MAX_SELLS_PER_LEG
@@ -78,13 +195,16 @@ class GtCapitalSpec:
min_order_krw: 최소 체결 원화.
"""
buy_formula: str = "min(total_asset * w_share * tier_scale, cash/(1+fee))"
buy_formula: str = (
"target = total_asset * (weight/remaining_weights) * tier_scale; "
"amount = min(target, available_cash/(1+fee))"
)
optimal_buy_rate: str = "weight / sum(remaining_buy_weights_in_leg)"
large_leg_top_pct: float = GT_LARGE_LEG_TOP_PCT
pct_large: float = GT_BUY_PCT_LARGE_LEG
pct_small: float = GT_BUY_PCT_SMALL_LEG
min_order_krw: float = float(GT_MIN_ORDER_KRW)
sell_formula: str = "leg_qty * sell_weight * price (last sell = full leg_qty)"
sell_formula: str = "sell_base_qty * sell_weight * price (last sell = full leg_qty)"
@dataclass
@@ -121,37 +241,74 @@ def default_model() -> GroundTruthModel:
return GroundTruthModel()
def compute_buy_weights_inverse_price(prices: list[float]) -> list[float]:
def sell_split_weights(
n_sells: int,
exit_spec: GtExitSpec | None = None,
) -> list[float]:
"""
저점 비중: score_i = 1/price_i → 합=1 정규화.
leg 비중 (1회=100%, N회=split_weights 정규화).
Args:
prices: leg 내 매수 후보 가격.
n_sells: 매도 횟수(1 이상).
exit_spec: None이면 default.
Returns:
weight 리스트 (합 ≈ 1).
"""
if not prices:
return []
scores = [1.0 / max(p, 1e-9) for p in prices]
s = sum(scores)
return [x / s for x in scores] if s > 0 else [1.0 / len(prices)] * len(prices)
spec = exit_spec or GtExitSpec()
if n_sells <= 1:
return [1.0]
weights = list(spec.split_weights[:n_sells])
if len(weights) < n_sells:
weights.extend([weights[-1]] * (n_sells - len(weights)))
return normalize_weights(weights)
def sell_split_weights(n_sells: int) -> list[float]:
def pair_peak_sell_weights(
n_peaks: int,
exit_spec: GtExitSpec | None = None,
) -> list[tuple[float, float]]:
"""
leg 매도 비중.
고점 피벗 (피벗, weight) 쌍 — 1회 또는 분할.
Args:
n_sells: 매도 횟수(1 또는 2).
n_peaks: 인정된 고점 수 (1 또는 2+).
exit_spec: 매도 명세.
Returns:
weight 리스트.
(weight,) 또는 (w1, w2) 리스트. 호출측에서 피벗과 zip.
"""
spec = GtExitSpec()
if n_sells >= 2:
return list(spec.split_weights)
return [1.0]
if n_peaks <= 1:
return [(1.0,)]
w = sell_split_weights(2, exit_spec)
return [(w[0],), (w[1],)]
def remaining_weight_sum(
trades: list[dict[str, Any]],
leg_id: int,
from_index: int,
) -> float:
"""
leg 내 from_index 이후 남은 매수 weight 합.
Args:
trades: 시각순 trade dict.
leg_id: leg 번호.
from_index: chron 리스트 인덱스.
Returns:
남은 weight 합.
"""
total = 0.0
for j, t in enumerate(trades):
if j < from_index:
continue
if int(t.get("leg_id", 0)) != leg_id:
continue
if t.get("action") == "buy":
total += float(t.get("weight", 1.0))
return total
def model_to_dict(model: GroundTruthModel | None = None) -> dict[str, Any]:
@@ -165,6 +322,12 @@ def model_to_dict(model: GroundTruthModel | None = None) -> dict[str, Any]:
직렬화 dict.
"""
m = model or default_model()
w_rule = m.entry.weight_rule
w_formula = (
"w_i = (1/price_i) / sum(1/price_j)"
if w_rule == "inverse_price_normalized"
else "w_i = 1 / n"
)
return {
"selection_mode": m.selection_mode,
"leg_definition": m.leg_definition,
@@ -172,7 +335,7 @@ def model_to_dict(model: GroundTruthModel | None = None) -> dict[str, Any]:
"pivot": m.entry.pivot_kind,
"price": m.entry.price_field,
"weight_rule": m.entry.weight_rule,
"weight_formula": "w_i = (1/price_i) / sum(1/price_j)",
"weight_formula": w_formula,
"max_buys_per_leg": m.entry.max_per_leg,
"min_bars_between_buys": m.entry.min_bars_gap,
"bb_filter": m.entry.bb_filter,
@@ -181,7 +344,7 @@ def model_to_dict(model: GroundTruthModel | None = None) -> dict[str, Any]:
"pivot": m.exit.pivot_kind,
"price": m.exit.price_field,
"weight_rule": "fixed_split_or_full",
"weights_two_sell": list(m.exit.split_weights),
"weights_split": list(m.exit.split_weights),
"split_gap_pct": m.exit.split_gap_pct,
"max_sells_per_leg": m.exit.max_per_leg,
},
@@ -209,7 +372,7 @@ def summarize_leg_weights(trades: list[dict[str, Any]]) -> dict[str, Any]:
trades: GT trade dict.
Returns:
leg_id → {buy_sum, sell_sum, n_buy, n_sell}.
leg_id → {buy_sum, sell_sum, n_buy, n_sell, valid}.
"""
legs: dict[int, dict[str, Any]] = {}
for t in trades:
@@ -225,4 +388,30 @@ def summarize_leg_weights(trades: list[dict[str, Any]]) -> dict[str, Any]:
elif t.get("action") == "sell":
legs[lid]["sell_sum"] += w
legs[lid]["n_sell"] += 1
for lid, info in legs.items():
buy_ok = abs(info["buy_sum"] - 1.0) < 0.02 or info["n_buy"] == 0
sell_ok = abs(info["sell_sum"] - 1.0) < 0.02 or info["n_sell"] == 0
info["valid"] = buy_ok and sell_ok
info["buy_sum"] = round(info["buy_sum"], 4)
info["sell_sum"] = round(info["sell_sum"], 4)
return legs
def weight_policy_summary(model: GroundTruthModel | None = None) -> dict[str, Any]:
"""
시뮬·리포트용 비중 정책 요약.
Args:
model: GT 모델.
Returns:
entry/exit/capital 요약 dict.
"""
m = model or default_model()
return {
"entry_weight_rule": m.entry.weight_rule,
"exit_split_weights": list(m.exit.split_weights),
"capital_large_pct": m.capital.pct_large,
"capital_small_pct": m.capital.pct_small,
"large_leg_top_pct": m.capital.large_leg_top_pct,
}

View File

@@ -0,0 +1,181 @@
"""
인과적(미래 미사용) GT 스타일 신호 — t봉 시점에 t 이하 데이터만 사용.
ZigZag/국소극값: pivot bar i-order 는 bar i 에서 확정 (i-order..i 구간만 관측).
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from config import (
GT_BUY_BB_MAX,
GT_BUY_MIN_SWING_PCT,
GT_MIN_SWING_PCT,
GT_PIVOT_ORDER,
)
def _confirmed_trough_mask(
low: np.ndarray,
order: int,
) -> np.ndarray:
"""
bar i 에서 i-order 봉이 저점임을 확정 (low[i-order:i+1] 만 사용).
Args:
low: Low 가격 배열.
order: pivot 반경(봉).
Returns:
길이 n, i 에 1이면 i 시점 매수 확인 신호.
"""
n = len(low)
out = np.zeros(n, dtype=np.int8)
for i in range(2 * order, n):
p = i - order
seg = low[p - order : i + 1]
if len(seg) == 0:
continue
if low[p] <= seg.min() + 1e-12:
out[i] = 1
return out
def _confirmed_peak_mask(
high: np.ndarray,
order: int,
) -> np.ndarray:
"""
bar i 에서 i-order 봉이 고점임을 확정.
Args:
high: High 가격 배열.
order: pivot 반경.
Returns:
i 시점 매도 확인 신호.
"""
n = len(high)
out = np.zeros(n, dtype=np.int8)
for i in range(2 * order, n):
p = i - order
seg = high[p - order : i + 1]
if len(seg) == 0:
continue
if high[p] >= seg.max() - 1e-12:
out[i] = 1
return out
def _zigzag_filter_causal(
confirm: np.ndarray,
prices: np.ndarray,
min_swing_pct: float,
kind: str,
) -> np.ndarray:
"""
확정 피벗에 ZigZag 최소 스윙% 필터 (인과적, 순차 갱신).
Args:
confirm: bar i 에 확정 플래그.
prices: pivot 가격 (i-order 위치의 low/high).
pivot_indices: confirm==1 인 bar index.
min_swing_pct: 최소 스윙 %.
kind: trough | peak.
Returns:
zigzag 통과 시점에 1.
"""
n = len(confirm)
out = np.zeros(n, dtype=np.int8)
order = GT_PIVOT_ORDER
last_kind: str | None = None
last_price = 0.0
min_ratio = min_swing_pct / 100.0
for i in range(n):
if confirm[i] != 1:
continue
p = i - order
if p < 0:
continue
price = float(prices[p])
if last_kind is None:
out[i] = 1
last_kind = kind
last_price = price
continue
if kind == last_kind:
if kind == "trough" and price < last_price:
out[i - 1] = 0
out[i] = 1
last_price = price
elif kind == "peak" and price > last_price:
out[i - 1] = 0
out[i] = 1
last_price = price
continue
move = abs(price - last_price) / max(last_price, 1e-9)
if move >= min_ratio:
out[i] = 1
last_kind = kind
last_price = price
return out
def enrich_scan_frame_gt_signals_causal(
frame: pd.DataFrame,
*,
pivot_order: int = GT_PIVOT_ORDER,
buy_swing_pct: float = GT_BUY_MIN_SWING_PCT,
sell_swing_pct: float = GT_MIN_SWING_PCT,
bb_max: float = GT_BUY_BB_MAX,
) -> pd.DataFrame:
"""
인과적 GT 신호 컬럼 (gt_*). t 시점 신호는 데이터 index<=t 만 사용.
Args:
frame: m3 스캔 프레임.
pivot_order: 확정 지연(봉).
buy_swing_pct: 매수 ZigZag 스윙%.
sell_swing_pct: 매도 ZigZag 스윙%.
bb_max: BB 하단 필터.
Returns:
gt_* 컬럼 추가 DataFrame.
"""
out = frame.copy()
if "Low" not in out.columns or "High" not in out.columns:
return out
low = out["Low"].astype(float).values
high = out["High"].astype(float).values
n = len(low)
trough_conf = _confirmed_trough_mask(low, pivot_order)
peak_conf = _confirmed_peak_mask(high, pivot_order)
trough_z = _zigzag_filter_causal(
trough_conf, low, buy_swing_pct, "trough"
)
peak_z = _zigzag_filter_causal(
peak_conf, high, sell_swing_pct, "peak"
)
out["gt_trough_local"] = trough_conf
out["gt_peak_local"] = peak_conf
out["gt_trough_zigzag"] = trough_z
out["gt_peak_zigzag"] = peak_z
bb_ok = pd.Series(True, index=out.index)
if "bb_pos" in out.columns:
bb = pd.to_numeric(out["bb_pos"], errors="coerce")
bb_ok = bb <= bb_max
out["gt_buy_signal"] = (pd.Series(trough_z, index=out.index) == 1) & bb_ok
out["gt_buy_signal"] = out["gt_buy_signal"].astype(int)
out["gt_sell_signal"] = pd.Series(peak_z, index=out.index).astype(int)
out["gt_signal_causal"] = 1
return out

View File

@@ -0,0 +1,197 @@
"""
GT 모델(entry/exit)을 규칙 스캔·발화 형식으로 일반화.
ZigZag trough/peak + BB 필터 등 GT 타점 생성 로직과 동일 파라미터를
rule_eval 스캔 프레임 컬럼(gt_*)으로 노출합니다.
"""
from __future__ import annotations
from typing import Any
import numpy as np
import pandas as pd
from config import (
GT_BUY_BB_MAX,
GT_BUY_MIN_SWING_PCT,
GT_MIN_SWING_PCT,
GT_PIVOT_ORDER,
MATCH_PRIMARY_INTERVAL,
)
from deepcoin.ground_truth.ground_truth import build_zigzag_pivots
def _local_extrema_mask(
series: pd.Series,
order: int,
kind: str,
) -> pd.Series:
"""
국소 극값 boolean 마스크.
Args:
series: 가격 시리즈.
order: 좌우 봉 수.
kind: min | max.
Returns:
boolean Series (index=series.index).
"""
arr = series.astype(float).values
n = len(arr)
out = np.zeros(n, dtype=bool)
if n < 2 * order + 1:
return pd.Series(out, index=series.index)
for i in range(order, n - order):
window = arr[i - order : i + order + 1]
if kind == "min" and arr[i] <= window.min():
out[i] = True
elif kind == "max" and arr[i] >= window.max():
out[i] = True
return pd.Series(out, index=series.index)
def enrich_scan_frame_gt_signals(
frame: pd.DataFrame,
*,
pivot_order: int = GT_PIVOT_ORDER,
buy_swing_pct: float = GT_BUY_MIN_SWING_PCT,
sell_swing_pct: float = GT_MIN_SWING_PCT,
bb_max: float = GT_BUY_BB_MAX,
causal: bool | None = None,
) -> pd.DataFrame:
"""
스캔 프레임에 GT 모델 신호 컬럼을 추가합니다.
GT_SIGNAL_CAUSAL=1 이면 t 시점까지 데이터만 사용 (운영 정합).
Args:
frame: m3 스캔 프레임 (Low, High, bb_pos).
pivot_order: 피벗 반경.
buy_swing_pct: 매수 ZigZag 스윙%.
sell_swing_pct: 매도 ZigZag 스윙%.
bb_max: BB 하단 필터.
causal: None이면 config GT_SIGNAL_CAUSAL.
Returns:
gt_* 컬럼이 추가된 DataFrame.
"""
from config import GT_SIGNAL_CAUSAL
use_causal = GT_SIGNAL_CAUSAL if causal is None else causal
if use_causal:
from deepcoin.ground_truth.gt_signal_causal import (
enrich_scan_frame_gt_signals_causal,
)
return enrich_scan_frame_gt_signals_causal(
frame,
pivot_order=pivot_order,
buy_swing_pct=buy_swing_pct,
sell_swing_pct=sell_swing_pct,
bb_max=bb_max,
)
out = frame.copy()
if "Low" not in out.columns or "High" not in out.columns:
return out
low = out["Low"].astype(float)
high = out["High"].astype(float)
out["gt_trough_local"] = _local_extrema_mask(low, pivot_order, "min").astype(int)
out["gt_peak_local"] = _local_extrema_mask(high, pivot_order, "max").astype(int)
df_ohlc = out[["Low", "High"]].copy()
if "close" in out.columns:
df_ohlc["close"] = out["close"]
df_ohlc.index = out.index
buy_pivots = build_zigzag_pivots(
df_ohlc,
min_swing_pct=buy_swing_pct,
pivot_order=pivot_order,
)
sell_pivots = build_zigzag_pivots(
df_ohlc,
min_swing_pct=sell_swing_pct,
pivot_order=pivot_order,
)
trough_z = pd.Series(0, index=out.index, dtype=int)
for p in buy_pivots:
if p.kind == "trough" and p.ts in trough_z.index:
trough_z.loc[p.ts] = 1
peak_z = pd.Series(0, index=out.index, dtype=int)
for p in sell_pivots:
if p.kind == "peak" and p.ts in peak_z.index:
peak_z.loc[p.ts] = 1
out["gt_trough_zigzag"] = trough_z
out["gt_peak_zigzag"] = peak_z
bb_ok = pd.Series(True, index=out.index)
if "bb_pos" in out.columns:
bb = pd.to_numeric(out["bb_pos"], errors="coerce")
bb_ok = bb <= bb_max
out["gt_buy_signal"] = ((out["gt_trough_zigzag"] == 1) & bb_ok).astype(int)
out["gt_sell_signal"] = (out["gt_peak_zigzag"] == 1).astype(int)
return out
def build_gt_model_rules() -> list[dict[str, Any]]:
"""
GT entry/exit 명세와 동일한 스캔 규칙 후보.
Returns:
rule dict 리스트 (buy 2종 + sell 2종).
"""
return [
{
"rule_id": "gt_model_buy_zigzag_bb",
"side": "buy",
"kind": "gt_model",
"logic": "and",
"conditions": [
{"col": "gt_buy_signal", "op": "eq_int", "value": 1},
],
"gt_spec": "trough_zigzag + bb_pos <= GT_BUY_BB_MAX",
},
{
"rule_id": "gt_model_buy_trough_local",
"side": "buy",
"kind": "gt_model",
"logic": "and",
"conditions": [
{"col": "gt_trough_local", "op": "eq_int", "value": 1},
{"col": "bb_pos", "op": "lte", "value": GT_BUY_BB_MAX},
],
"gt_spec": "local trough + bb filter",
},
{
"rule_id": "gt_model_sell_zigzag_peak",
"side": "sell",
"kind": "gt_model",
"logic": "and",
"conditions": [
{"col": "gt_sell_signal", "op": "eq_int", "value": 1},
],
"gt_spec": "major swing peak (ZigZag)",
},
{
"rule_id": "gt_model_sell_peak_local",
"side": "sell",
"kind": "gt_model",
"logic": "and",
"conditions": [
{"col": "gt_peak_local", "op": "eq_int", "value": 1},
],
"gt_spec": "local high extremum",
},
]
def gt_signal_rule_ids() -> set[str]:
"""GT 일반화 규칙 ID 집합."""
return {r["rule_id"] for r in build_gt_model_rules()}