Files
Bithumb/deepcoin/ground_truth/ground_truth.py
xavis 91c9338651 fix: GT 중복 인덱스 Series 오류 수정 및 파이프라인 산출물 갱신
봉 데이터 갱신 후 02~04 재실행. BB 저점 수집 시 _row_at_ts로 스칼라 추출.
hybrid Go/No-Go 및 matched_rules·fire_outcomes 동기화.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-04 08:41:33 +09:00

1394 lines
45 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
차트 조회 구간(기본 1년) 3분봉에서 최적 매수·매도 타점(정답 라벨)을 생성합니다.
방법:
1) ZigZag 피벗(스윙 고저) 추출
2) split_buy_peak_sell: 저점 분할 매수 + 고점 1~2회 매도 (비중=삼각형 크기)
3) ground_truth_trades.json 저장
실행:
python scripts/02_ground_truth.py
python scripts/05_chart_truth.py
"""
from __future__ import annotations
import json
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
from config import (
CHART_LOOKBACK_DAYS,
ENTRY_INTERVAL,
GROUND_TRUTH_FILE,
GT_BUY_BB_MAX,
GT_BUY_MIN_BARS,
GT_BUY_MIN_SWING_PCT,
GT_BUY_PCT_LARGE_LEG,
GT_BUY_PCT_SMALL_LEG,
GT_INITIAL_CASH_KRW,
GT_LARGE_LEG_TOP_PCT,
GT_MIN_ORDER_KRW,
GT_MAX_BUYS_PER_LEG,
GT_MAX_ROUND_TRIPS,
TRADING_FEE_RATE,
GT_MAX_SELLS_PER_LEG,
GT_MIN_BARS_BETWEEN,
GT_MIN_LEG_PCT,
GT_MIN_SWING_PCT,
GT_PIVOT_ORDER,
GT_SELECTION_MODE,
GT_SELL_SPLIT_GAP_PCT,
SYMBOL,
)
from deepcoin.common.indicators import apply_bar_indicators, get_trend
from deepcoin.data.mtf_bb import load_frames_from_db
from deepcoin.ground_truth.gt_allocation import (
allocate_order_amounts_chronological,
resolve_sell_qty as _resolve_sell_qty,
)
from deepcoin.ground_truth.gt_model import (
compute_entry_weights,
leg_entry_weights,
leg_exit_weights,
sell_split_weights,
)
from deepcoin.paths import resolve_ground_truth_file
# Default path of the ground-truth JSON; resolved once, at import time, by resolve_ground_truth_file().
DEFAULT_OUTPUT = resolve_ground_truth_file()
@dataclass
class Pivot:
    """A single swing pivot (local low or local high) on the bar series."""

    # Integer bar position of the pivot inside the source frame.
    idx: int
    # Index label (timestamp) of the pivot bar.
    ts: pd.Timestamp
    # Pivot direction: "trough" | "peak".
    kind: str  # "trough" | "peak"
    # Pivot price; builders in this module pass the bar Low for troughs and the bar High for peaks.
    price: float
@dataclass
class TradePoint:
    """One ground-truth trade point (a labelled buy or sell)."""

    # Execution time as a string; builders in this module format it as "%Y-%m-%d %H:%M:%S".
    dt: str
    # "buy" or "sell".
    action: str
    # Execution price (builders round it to 2 decimals).
    price: float
    # Human-readable note describing the trade point.
    memo: str
    # Share of the leg assigned to this trade point.
    weight: float = 1.0
    # Order amount in KRW; left as None until an allocation step fills it in.
    amount_krw: float | None = None
    # Identifier of the buy/sell leg this point belongs to.
    leg_id: int = 0
    # Bollinger-band position of the bar, when available.
    bb_pos: float | None = None
    # RSI of the bar, when available.
    rsi: float | None = None
    # Kind of the originating pivot ("trough" / "peak"), or "" when unset.
    pivot_kind: str = ""
    # Return in percent: buy -> up to the following sell, sell -> versus the preceding buy/leg average.
    forward_return_pct: float | None = None
def _local_extrema_indices(arr: np.ndarray, order: int, kind: str) -> np.ndarray:
"""
order 양옆 구간에서 국소 최소/최대 인덱스를 반환합니다.
Args:
arr: 가격 배열.
order: 좌우 봉 수.
kind: "min" 또는 "max".
Returns:
인덱스 ndarray.
"""
n = len(arr)
if n < 2 * order + 1:
return np.array([], dtype=int)
out: list[int] = []
for i in range(order, n - order):
window = arr[i - order : i + order + 1]
if kind == "min" and arr[i] <= window.min():
out.append(i)
elif kind == "max" and arr[i] >= window.max():
out.append(i)
return np.array(out, dtype=int)
def build_zigzag_pivots(
    df: pd.DataFrame,
    min_swing_pct: float = GT_MIN_SWING_PCT,
    pivot_order: int = GT_PIVOT_ORDER,
) -> list[Pivot]:
    """
    Build alternating swing-low / swing-high pivots in ZigZag fashion.

    Args:
        df: OHLCV frame; reads the "Low" and "High" columns and uses the index as timestamps.
        min_swing_pct: Minimum price move (%) from the last pivot needed to confirm an opposite pivot.
        pivot_order: Radius (bars) used when searching local extrema.

    Returns:
        Pivots in bar order.
    """
    lows = df["Low"].astype(float).values
    highs = df["High"].astype(float).values
    stamps = df.index
    threshold = min_swing_pct / 100.0

    # Troughs are listed first; the stable sort keeps a trough ahead of a peak on the same bar.
    extrema: list[tuple[int, str, float]] = [
        (int(i), "trough", float(lows[i]))
        for i in _local_extrema_indices(lows, pivot_order, "min")
    ]
    extrema += [
        (int(i), "peak", float(highs[i]))
        for i in _local_extrema_indices(highs, pivot_order, "max")
    ]
    extrema.sort(key=lambda item: item[0])

    zigzag: list[Pivot] = []
    for bar, kind, price in extrema:
        if not zigzag:
            zigzag.append(Pivot(bar, stamps[bar], kind, price))
            continue
        anchor = zigzag[-1]
        if kind == anchor.kind:
            # Same direction as the last pivot: keep only the more extreme of the two.
            if kind == "trough":
                more_extreme = price < anchor.price
            else:
                more_extreme = kind == "peak" and price > anchor.price
            if more_extreme:
                zigzag[-1] = Pivot(bar, stamps[bar], kind, price)
            continue
        # Opposite direction: accept only when the swing is large enough.
        swing = abs(price - anchor.price) / max(anchor.price, 1e-9)
        if swing >= threshold:
            zigzag.append(Pivot(bar, stamps[bar], kind, price))
    return zigzag
def _select_optimal_chain(
    pivots: list[Pivot],
    min_bars: int = GT_MIN_BARS_BETWEEN,
    max_round_trips: int = GT_MAX_ROUND_TRIPS,
    mode: str = GT_SELECTION_MODE,
) -> list[Pivot]:
    """
    Select the ground-truth buy/sell chain from ZigZag pivots.

    Modes:
        "zigzag": spacing/alternation filter only.
        "major_swings": delegate to ``_select_major_swings`` with ``GT_MIN_LEG_PCT``.
        anything else ("max_profit"): weighted interval scheduling over every
        profitable trough->peak pair, maximising the summed leg return (%).

    Args:
        pivots: ZigZag pivots in bar order.
        min_bars: Minimum number of bars between a buy and its sell.
        max_round_trips: Maximum number of round trips returned.
        mode: Selection mode (see above).

    Returns:
        Flat list ``[buy, sell, buy, sell, ...]`` in bar order. When the optimal
        chain is longer than ``max_round_trips`` only its most recent legs are kept.
    """
    from bisect import bisect_left

    if len(pivots) < 2:
        return []
    if mode == "zigzag":
        return _filter_alternating_pivots(pivots, min_bars, max_round_trips * 2)
    if mode == "major_swings":
        return _select_major_swings(
            pivots,
            min_bars=min_bars,
            max_round_trips=max_round_trips,
            min_leg_pct=GT_MIN_LEG_PCT,
        )

    # Every profitable (trough, later peak) pair is a candidate interval.
    intervals: list[tuple[int, int, float, Pivot, Pivot]] = []
    for i, buy_p in enumerate(pivots):
        if buy_p.kind != "trough":
            continue
        for sell_p in pivots[i + 1 :]:
            if sell_p.kind != "peak":
                continue
            if sell_p.idx - buy_p.idx < min_bars:
                continue
            if sell_p.price <= buy_p.price:
                continue
            profit = (sell_p.price - buy_p.price) / max(buy_p.price, 1e-9) * 100.0
            intervals.append((buy_p.idx, sell_p.idx, profit, buy_p, sell_p))
    if not intervals:
        return _filter_alternating_pivots(pivots, min_bars, max_round_trips * 2)

    intervals.sort(key=lambda iv: iv[1])
    count = len(intervals)
    sell_bars = [iv[1] for iv in intervals]
    # prev_free[k]: last interval before k whose sell bar is strictly before k's buy bar (-1 if none).
    prev_free = [
        bisect_left(sell_bars, iv[0], 0, k) - 1 for k, iv in enumerate(intervals)
    ]

    # best[k] = maximum total profit achievable with the first k intervals.
    # The previous recurrence compared "profit + dp[prev]" against "profit" itself,
    # so every interval was always taken and better earlier chains were missed.
    best = [0.0] * (count + 1)
    take = [False] * count
    for k, (_, _, profit, _, _) in enumerate(intervals):
        with_k = profit + best[prev_free[k] + 1]
        if with_k >= best[k]:
            best[k + 1] = with_k
            take[k] = True
        else:
            best[k + 1] = best[k]

    # Walk back from the end; the cap therefore keeps the latest legs of the chain.
    chain: list[tuple[int, int, float, Pivot, Pivot]] = []
    k = count - 1
    while k >= 0 and len(chain) < max_round_trips:
        if take[k]:
            chain.append(intervals[k])
            k = prev_free[k]
        else:
            k -= 1
    chain.reverse()

    result: list[Pivot] = []
    for _, _, _, bp, sp in chain:
        result.extend([bp, sp])
    return result
def _select_major_swings(
    pivots: list[Pivot],
    min_bars: int,
    max_round_trips: int,
    min_leg_pct: float,
) -> list[Pivot]:
    """
    Keep only trough->peak legs of the alternating ZigZag chain whose return is
    at least ``min_leg_pct``.

    When no leg qualifies the threshold is relaxed once per retry
    (``max(min_leg_pct * 0.6, 3.0)``); the retry stops as soon as the threshold
    can no longer decrease, in which case an empty list is returned.

    When more legs than ``max_round_trips`` qualify:
        * ``CHART_LOOKBACK_DAYS >= 300``: every non-overlapping leg is kept in
          time order (the cap is intentionally ignored).
        * otherwise: a maximum-profit non-overlapping chain is computed and its
          most recent ``max_round_trips`` legs are kept.

    Args:
        pivots: ZigZag pivots.
        min_bars: Minimum number of bars between consecutive pivots.
        max_round_trips: Maximum number of round trips.
        min_leg_pct: Minimum return (%) of a single leg.

    Returns:
        Flat list ``[buy, sell, ...]`` in time order.
    """
    from bisect import bisect_left

    chain = _filter_alternating_pivots(pivots, min_bars, len(pivots))
    if len(chain) < 2:
        return chain

    legs: list[tuple[float, Pivot, Pivot, int, int]] = []
    i = 0
    while i < len(chain) - 1:
        buy_p = chain[i]
        sell_p = chain[i + 1]
        if buy_p.kind == "trough" and sell_p.kind == "peak":
            profit = (sell_p.price - buy_p.price) / max(buy_p.price, 1e-9) * 100.0
            if profit >= min_leg_pct:
                legs.append((profit, buy_p, sell_p, buy_p.idx, sell_p.idx))
            i += 2
        else:
            i += 1

    if not legs:
        relaxed = max(min_leg_pct * 0.6, 3.0)
        # Once the floor is reached the threshold stops decreasing; retrying
        # would recurse forever, so give up instead.
        if relaxed >= min_leg_pct:
            return []
        return _select_major_swings(
            pivots,
            min_bars,
            max_round_trips,
            min_leg_pct=relaxed,
        )

    if len(legs) <= max_round_trips:
        out: list[Pivot] = []
        for _, bp, sp, _, _ in legs:
            out.extend([bp, sp])
        return out

    # One-year labels: use every non-overlapping leg in time order so that the
    # round-trip cap does not drop whole months.
    if CHART_LOOKBACK_DAYS >= 300:
        kept: list[tuple[float, Pivot, Pivot, int, int]] = []
        last_sell_bar = -1
        for pr, bp, sp, lb, sb in sorted(legs, key=lambda x: x[3]):
            if lb > last_sell_bar:
                kept.append((pr, bp, sp, lb, sb))
                last_sell_bar = sb
        result: list[Pivot] = []
        for _pr, bp, sp, _lb, _sb in kept:
            result.extend([bp, sp])
        return result

    intervals = [(lb, sb, pr, bp, sp) for pr, bp, sp, lb, sb in legs]
    intervals.sort(key=lambda x: x[1])
    count = len(intervals)
    sell_bars = [iv[1] for iv in intervals]
    # prev_free[k]: last interval before k whose sell bar is strictly before k's buy bar (-1 if none).
    prev_free = [
        bisect_left(sell_bars, iv[0], 0, k) - 1 for k, iv in enumerate(intervals)
    ]

    # best[k] = maximum total profit achievable with the first k intervals
    # (standard weighted interval scheduling recurrence).
    best = [0.0] * (count + 1)
    take = [False] * count
    for k, (_, _, profit, _, _) in enumerate(intervals):
        with_k = profit + best[prev_free[k] + 1]
        if with_k >= best[k]:
            best[k + 1] = with_k
            take[k] = True
        else:
            best[k + 1] = best[k]

    chosen: list[tuple[int, int, float, Pivot, Pivot]] = []
    k = count - 1
    while k >= 0 and len(chosen) < max_round_trips:
        if take[k]:
            chosen.append(intervals[k])
            k = prev_free[k]
        else:
            k -= 1
    chosen.reverse()

    result = []
    for _lb, _sb, _pr, bp, sp in chosen:
        result.extend([bp, sp])
    return result
def _filter_alternating_pivots(
pivots: list[Pivot],
min_bars: int,
max_points: int,
) -> list[Pivot]:
"""ZigZag 피벗을 간격·교대 규칙으로만 줄입니다."""
filtered: list[Pivot] = []
for p in pivots:
if filtered and p.idx - filtered[-1].idx < min_bars:
continue
if filtered and p.kind == filtered[-1].kind:
if p.kind == "trough" and p.price < filtered[-1].price:
filtered[-1] = p
elif p.kind == "peak" and p.price > filtered[-1].price:
filtered[-1] = p
continue
filtered.append(p)
if filtered and filtered[0].kind == "peak":
filtered = filtered[1:]
if filtered and filtered[-1].kind == "trough":
filtered = filtered[:-1]
return filtered[:max_points]
def _bb_context(row: pd.Series) -> tuple[float | None, float | None, float | None]:
    """
    Extract the bar's Bollinger position, RSI and disparity.

    The disparity column is looked up for period 20 when that period is
    configured in ``DISPARITY_PERIODS``, otherwise for the first configured period.

    Returns:
        ``(bb_pos, rsi, disparity)``; each item is None when the column is
        missing or NaN. bb_pos is rounded to 3 decimals, the others to 1.
    """
    from config import DISPARITY_PERIODS
    from deepcoin.common.indicators import disparity_column

    def _rounded(column: str, digits: int) -> float | None:
        # Missing column and NaN value are both reported as None.
        if column in row.index and pd.notna(row[column]):
            return round(float(row[column]), digits)
        return None

    bb_value = _rounded("bb_pos", 3)
    rsi_value = _rounded("RSI", 1)
    period = 20 if 20 in DISPARITY_PERIODS else DISPARITY_PERIODS[0]
    disparity_value = _rounded(disparity_column(period), 1)
    return bb_value, rsi_value, disparity_value
def _memo_for_trade(
action: str,
pivot: Pivot,
bb_pos: float | None,
rsi: float | None,
disparity: float | None,
forward_pct: float | None,
) -> str:
"""타점 해석 메모."""
zone = "중단"
if bb_pos is not None:
if bb_pos < 0.25:
zone = "밴드 하단"
elif bb_pos > 0.75:
zone = "밴드 상단"
parts = [
f"ZigZag {pivot.kind}",
zone,
]
if rsi is not None:
parts.append(f"RSI {rsi}")
if disparity is not None:
parts.append(f"D.I.{disparity}")
if forward_pct is not None and action == "buy":
parts.append(f"다음 매도까지 +{forward_pct:.1f}%")
elif forward_pct is not None and action == "sell":
parts.append(f"직전 매수 대비 +{forward_pct:.1f}%")
return " · ".join(parts)
def _bar_index(df: pd.DataFrame, ts: pd.Timestamp) -> int:
"""타임스탬프의 정수 봉 위치."""
loc = df.index.get_loc(ts if ts in df.index else df.index[df.index.get_indexer([ts], method="nearest")[0]])
if isinstance(loc, slice):
return int(loc.start or 0)
if hasattr(loc, "__len__") and not isinstance(loc, int):
return int(loc[-1])
return int(loc)
def _row_at_ts(df: pd.DataFrame, ts: pd.Timestamp) -> pd.Series:
"""타임스탬프에 해당하는 봉 1행."""
loc = ts if ts in df.index else df.index[df.index.get_indexer([ts], method="nearest")[0]]
row = df.loc[loc]
if isinstance(row, pd.DataFrame):
row = row.iloc[-1]
return row
def _collect_buy_troughs(
    df: pd.DataFrame,
    buy_pivots: list[Pivot],
    start: pd.Timestamp,
    end: pd.Timestamp,
    min_bars: int,
    max_buys: int = GT_MAX_BUYS_PER_LEG,
) -> list[Pivot]:
    """
    Collect the ZigZag troughs strictly between ``start`` and ``end``.

    When at least one trough was found and the frame has a "bb_pos" column, the
    lowest-Low bar among those with ``bb_pos <= GT_BUY_BB_MAX`` in the window is
    added as one extra trough, provided it is at least ``min_bars`` away from
    every collected trough. Troughs closer than ``min_bars`` to the previous one
    are merged (the cheaper one wins) and at most ``max_buys`` cheapest troughs
    are returned, in time order.
    """
    picked: list[Pivot] = [p for p in buy_pivots if start < p.ts < end]

    if picked and "bb_pos" in df.columns:
        window = df[(df.index > start) & (df.index < end)]
        if not window.empty and "bb_pos" in window.columns:
            low_band = window[window["bb_pos"] <= GT_BUY_BB_MAX]
            if not low_band.empty:
                low_ts = low_band["Low"].astype(float).idxmin()
                low_idx = _bar_index(df, low_ts)
                if all(abs(low_idx - p.idx) >= min_bars for p in picked):
                    # _row_at_ts yields a scalar Low even when the label is duplicated.
                    band_row = _row_at_ts(low_band, low_ts)
                    picked.append(
                        Pivot(low_idx, low_ts, "trough", float(band_row["Low"]))
                    )

    picked.sort(key=lambda p: p.ts)

    spaced: list[Pivot] = []
    for candidate in picked:
        if spaced and candidate.idx - spaced[-1].idx < min_bars:
            # Too close to the previous buy: keep whichever is cheaper.
            if candidate.price < spaced[-1].price:
                spaced[-1] = candidate
            continue
        spaced.append(candidate)

    if len(spaced) > max_buys:
        cheapest = sorted(spaced, key=lambda p: p.price)[:max_buys]
        spaced = sorted(cheapest, key=lambda p: p.ts)
    return spaced
def _peak_sell_points(
    df: pd.DataFrame,
    peak: Pivot,
    max_splits: int,
    split_gap_pct: float,
) -> list[tuple[Pivot, float]]:
    """
    Decide whether the sell at ``peak`` is a single sell or a two-way split.

    A second sell point is searched in the 80 bars starting at the peak: a bar
    whose High is the maximum of its +-2 bar neighbourhood and lies within
    ``split_gap_pct`` percent of the main sell price. The highest such bar
    becomes the second sell.

    Returns:
        ``[(pivot, weight), ...]``: one entry with weight 1.0, or two entries
        weighted by ``sell_split_weights(2)``.
    """
    peak_row = _row_at_ts(df, peak.ts)
    main_price = float(peak_row["High"]) if "High" in peak_row else peak.price
    main = Pivot(peak.idx, peak.ts, "peak", main_price)
    unsplit = [(main, 1.0)]

    if max_splits < 2:
        return unsplit
    window = df.iloc[peak.idx : peak.idx + 80]
    if len(window) < 5:
        return unsplit

    highs = window["High"].astype(float).values
    nearby: list[Pivot] = []
    for offset in range(2, len(window) - 2):
        if highs[offset] < highs[offset - 2 : offset + 3].max():
            continue
        level = float(highs[offset])
        gap_pct = abs(level - main_price) / max(main_price, 1e-9) * 100
        if gap_pct <= split_gap_pct:
            nearby.append(Pivot(peak.idx + offset, window.index[offset], "peak", level))

    if not nearby:
        return unsplit
    second = max(nearby, key=lambda p: p.price)
    if second.ts == main.ts:
        return unsplit
    shares = sell_split_weights(2)
    return [(main, shares[0]), (second, shares[1])]
def build_split_buy_peak_sell_trades(
    df: pd.DataFrame,
    raw_pivots: list[Pivot],
    sell_peaks: list[Pivot],
    buy_min_bars: int = GT_BUY_MIN_BARS,
) -> list[TradePoint]:
    """
    Build ground-truth trade points: split buys at troughs, one or two sells at each peak.

    For every sell peak (one leg per peak, in time order) the troughs between
    the previous peak and this peak become weighted buys, followed by the sell
    point(s) of the peak. After the last peak, remaining troughs form one more
    leg that is closed at the final bar's Close.

    Args:
        df: 3-minute bars including indicator columns.
        raw_pivots: Accepted for interface compatibility; buy troughs are
            recomputed here with ``GT_BUY_MIN_SWING_PCT``.
        sell_peaks: Peaks at which to sell.
        buy_min_bars: Minimum spacing (bars) between split buys.

    Returns:
        List of TradePoint.
    """
    stamp_fmt = "%Y-%m-%d %H:%M:%S"

    def _rows_and_lows(troughs: list[Pivot]) -> tuple[list[pd.Series], list[float]]:
        # Bar row and buy price (bar Low, else the pivot price) for each trough.
        rows = [_row_at_ts(df, t.ts) for t in troughs]
        lows = [
            float(row["Low"]) if "Low" in row else t.price
            for row, t in zip(rows, troughs)
        ]
        return rows, lows

    trough_pivots = [
        p
        for p in build_zigzag_pivots(
            df, min_swing_pct=GT_BUY_MIN_SWING_PCT, pivot_order=GT_PIVOT_ORDER
        )
        if p.kind == "trough"
    ]
    ordered_peaks = sorted(sell_peaks, key=lambda p: p.ts)

    trades: list[TradePoint] = []
    window_start = df.index[0]
    for leg_id, peak in enumerate(ordered_peaks):
        troughs = _collect_buy_troughs(
            df, trough_pivots, window_start, peak.ts, buy_min_bars
        )
        current_buys: list[TradePoint] = []
        if troughs:
            rows, lows = _rows_and_lows(troughs)
            shares = leg_entry_weights(lows)
            for t, row, price, w in zip(troughs, rows, lows, shares):
                bb_pos, rsi, _ = _bb_context(row)
                pct = (peak.price - price) / max(price, 1e-9) * 100.0
                current_buys.append(
                    TradePoint(
                        dt=t.ts.strftime(stamp_fmt),
                        action="buy",
                        price=round(price, 2),
                        weight=round(w, 3),
                        leg_id=leg_id,
                        memo=(
                            f"저점 분할 매수 · 비중 {w*100:.0f}% · {len(troughs)}"
                            f"· BB하단 · leg#{leg_id}"
                        ),
                        bb_pos=bb_pos,
                        rsi=rsi,
                        pivot_kind="trough",
                        forward_return_pct=round(pct, 2),
                    )
                )
            trades.extend(current_buys)

        sell_pts = _peak_sell_points(
            df, peak, GT_MAX_SELLS_PER_LEG, GT_SELL_SPLIT_GAP_PCT
        )
        # Weighted average entry price of this leg's buys (0.0 when the leg has none).
        weighted_cost = sum(b.price * b.weight for b in current_buys)
        weight_total = sum(b.weight for b in current_buys)
        leg_avg = weighted_cost / max(weight_total, 1e-9)

        n_sell = len(sell_pts)
        for sp, w in sell_pts:
            row = _row_at_ts(df, sp.ts)
            bb_pos, rsi, _ = _bb_context(row)
            price = float(row["High"]) if "High" in row else sp.price
            ret = (price - leg_avg) / max(leg_avg, 1e-9) * 100.0 if leg_avg > 0 else None
            trades.append(
                TradePoint(
                    dt=sp.ts.strftime(stamp_fmt),
                    action="sell",
                    price=round(price, 2),
                    weight=round(w, 3),
                    leg_id=leg_id,
                    memo=(
                        f"고점 매도 · 비중 {w*100:.0f}% · "
                        f"{'분할' if n_sell > 1 else '1회'} · leg#{leg_id}"
                    ),
                    bb_pos=bb_pos,
                    rsi=rsi,
                    pivot_kind="peak",
                    forward_return_pct=round(ret, 2) if ret is not None else None,
                )
            )
        window_start = peak.ts

    if not ordered_peaks:
        return trades

    # After the last sell until the end of the period: split buys, then close the
    # same leg at the final bar so the portfolio ends flat.
    end_ts = df.index[-1]
    troughs = _collect_buy_troughs(
        df, trough_pivots, ordered_peaks[-1].ts, end_ts, buy_min_bars
    )
    leg_id = len(ordered_peaks)
    if not troughs:
        return trades

    rows, lows = _rows_and_lows(troughs)
    shares = leg_entry_weights(lows)
    leg_buys: list[TradePoint] = []
    for t, row, price, w in zip(troughs, rows, lows, shares):
        bb_pos, rsi, _ = _bb_context(row)
        leg_buys.append(
            TradePoint(
                dt=t.ts.strftime(stamp_fmt),
                action="buy",
                price=round(price, 2),
                weight=round(w, 3),
                leg_id=leg_id,
                memo=f"저점 분할 매수 · 비중 {w*100:.0f}% · leg#{leg_id}(기간말)",
                bb_pos=bb_pos,
                rsi=rsi,
                pivot_kind="trough",
            )
        )
    trades.extend(leg_buys)

    leg_avg = sum(x.price * x.weight for x in leg_buys) / max(
        sum(x.weight for x in leg_buys), 1e-9
    )
    end_row = df.loc[end_ts]
    if isinstance(end_row, pd.DataFrame):
        end_row = end_row.iloc[-1]
    end_price = float(end_row["Close"])
    bb_pos, rsi, _ = _bb_context(end_row)
    ret = (end_price - leg_avg) / max(leg_avg, 1e-9) * 100.0 if leg_avg > 0 else None
    trades.append(
        TradePoint(
            dt=end_ts.strftime(stamp_fmt),
            action="sell",
            price=round(end_price, 2),
            weight=1.0,
            leg_id=leg_id,
            memo=f"기간말 잔여 청산 · leg#{leg_id}",
            bb_pos=bb_pos,
            rsi=rsi,
            pivot_kind="peak",
            forward_return_pct=round(ret, 2) if ret is not None else None,
        )
    )
    # Back-fill each closing-leg buy with its return to the final close.
    for b in leg_buys:
        if b.forward_return_pct is None and ret is not None:
            b.forward_return_pct = round(
                (end_price - b.price) / max(b.price, 1e-9) * 100.0, 2
            )
    return trades
def pivots_to_trades(
    pivots: list[Pivot],
    df: pd.DataFrame,
) -> list[TradePoint]:
    """
    Convert selected pivots into buy/sell ground-truth trade points.

    Troughs become buys at the bar Low, every other pivot a sell at the bar High.

    Args:
        pivots: Selected pivots.
        df: 3-minute bars including indicator columns.

    Returns:
        List of TradePoint, one per pivot.
    """
    trades: list[TradePoint] = []
    open_buy_price: float | None = None
    for pos, pivot in enumerate(pivots):
        row = _row_at_ts(df, pivot.ts)
        bb_pos, rsi, disp = _bb_context(row)
        forward_pct: float | None = None

        if pivot.kind == "trough":
            action = "buy"
            price = float(row["Low"]) if "Low" in row else pivot.price
            follower = pivots[pos + 1] if pos + 1 < len(pivots) else None
            if follower is not None and follower.kind == "peak":
                forward_pct = (follower.price - price) / max(price, 1e-9) * 100.0
            open_buy_price = price
        else:
            action = "sell"
            price = float(row["High"]) if "High" in row else pivot.price
            if open_buy_price:
                forward_pct = (price - open_buy_price) / max(open_buy_price, 1e-9) * 100.0
            open_buy_price = None

        rounded_pct = round(forward_pct, 2) if forward_pct is not None else None
        trades.append(
            TradePoint(
                dt=pivot.ts.strftime("%Y-%m-%d %H:%M:%S"),
                action=action,
                price=round(price, 2),
                weight=1.0,
                memo=_memo_for_trade(action, pivot, bb_pos, rsi, disp, forward_pct),
                bb_pos=bb_pos,
                rsi=rsi,
                pivot_kind=pivot.kind,
                forward_return_pct=rounded_pct,
            )
        )
    return trades
def generate_ground_truth(
    df_3m: pd.DataFrame,
    df_1d: pd.DataFrame | None = None,
    df_1h: pd.DataFrame | None = None,
    min_swing_pct: float = GT_MIN_SWING_PCT,
    pivot_order: int = GT_PIVOT_ORDER,
    min_bars: int = GT_MIN_BARS_BETWEEN,
    max_round_trips: int = GT_MAX_ROUND_TRIPS,
    selection_mode: str = GT_SELECTION_MODE,
) -> dict[str, Any]:
    """
    Build the ground-truth trade JSON structure from 3-minute bars.

    Args:
        df_3m: 3-minute OHLCV.
        df_1d: Daily bars (used only for the trend label, optional).
        df_1h: Hourly bars (used only for the trend label, optional).
        min_swing_pct: ZigZag minimum swing (%).
        pivot_order: Local-extremum radius.
        min_bars: Minimum number of bars between fills.
        max_round_trips: Maximum number of round trips.
        selection_mode: "split_buy_peak_sell" selects major-swing peaks and
            builds split buys; any other value is passed to ``_select_optimal_chain``.

    Returns:
        Dict to be stored as the ground-truth JSON.

    Raises:
        ValueError: When the indicator frame is empty (other errors may come
            from the portfolio validation step).
    """
    df = apply_bar_indicators(df_3m.sort_index().copy())
    if df.empty:
        raise ValueError("3분봉 데이터가 비어 있습니다.")

    raw_pivots = build_zigzag_pivots(df, min_swing_pct=min_swing_pct, pivot_order=pivot_order)

    split_mode = selection_mode == "split_buy_peak_sell"
    selected = _select_optimal_chain(
        raw_pivots,
        min_bars=min_bars,
        max_round_trips=max_round_trips,
        mode="major_swings" if split_mode else selection_mode,
    )
    if split_mode:
        sell_peaks = [p for p in selected if p.kind == "peak"]
        trades = build_split_buy_peak_sell_trades(df, raw_pivots, sell_peaks)
        method = "split_buy_at_troughs + peak_sell_1or2"
    else:
        trades = pivots_to_trades(selected, df)
        method = "zigzag_pivot + max_profit_chain"

    # The trend label needs both higher time frames; otherwise it stays "range".
    if df_1d is not None and df_1h is not None:
        trend = get_trend(df_1d, df_1h)
    else:
        trend = "range"

    sells = [t for t in trades if t.action == "sell"]
    round_trips = len({t.leg_id for t in sells})
    buy_count = sum(1 for t in trades if t.action == "buy")
    sell_count = len(sells)
    total_ret = sum(t.forward_return_pct or 0.0 for t in sells)

    trade_dicts = order_trades_leg_block(trades)
    trade_dicts, alloc_stats = allocate_gt_order_amounts(
        trade_dicts,
        initial_cash=GT_INITIAL_CASH_KRW,
        min_order_krw=GT_MIN_ORDER_KRW,
        fee_rate=TRADING_FEE_RATE,
    )

    last_close = float(df["Close"].iloc[-1])
    # Marked to the final close (open holdings included) ...
    pnl = simulate_truth_portfolio(
        trade_dicts,
        initial_cash=GT_INITIAL_CASH_KRW,
        fee_rate=TRADING_FEE_RATE,
        last_price=last_close,
    )
    # ... versus cash only (last_price=None values open holdings at zero).
    pnl_realized = simulate_truth_portfolio(
        trade_dicts,
        initial_cash=GT_INITIAL_CASH_KRW,
        fee_rate=TRADING_FEE_RATE,
        last_price=None,
    )
    _validate_leg_portfolio(trade_dicts, last_close)

    from deepcoin.ground_truth.gt_model import default_model, model_to_dict

    gt_model = model_to_dict(default_model())

    has_amounts = any(float(t.get("amount_krw") or 0) > 0 for t in trade_dicts)
    unrealized = round(
        float(pnl.get("pnl_krw", 0)) - float(pnl_realized.get("pnl_krw", 0)), 0
    )

    params = {
        "min_swing_pct": min_swing_pct,
        "pivot_order": pivot_order,
        "min_bars_between": min_bars,
        "max_round_trips": max_round_trips,
        "selection_mode": selection_mode,
        "buy_min_swing_pct": GT_BUY_MIN_SWING_PCT,
        "buy_bb_max": GT_BUY_BB_MAX,
        "max_sells_per_leg": GT_MAX_SELLS_PER_LEG,
    }
    # Key order is part of the stored JSON layout; later ** spreads may override earlier keys.
    summary = {
        "pivot_candidates": len(raw_pivots),
        "sell_peaks": sum(1 for p in selected if p.kind == "peak"),
        "trade_count": len(trades),
        "buy_count": buy_count,
        "sell_count": sell_count,
        "round_trips": round_trips,
        "sum_sell_leg_return_pct": round(total_ret, 2),
        **pnl,
        "realized_final_asset_krw": pnl_realized.get("final_asset_krw"),
        "realized_pnl_krw": pnl_realized.get("pnl_krw"),
        "realized_pnl_pct": pnl_realized.get("pnl_pct"),
        "unrealized_pnl_krw": unrealized,
        "execution_order": "chronological" if has_amounts else "leg_block",
        "order_amount_min_krw": GT_MIN_ORDER_KRW,
        "buy_pct_large_leg": GT_BUY_PCT_LARGE_LEG,
        "buy_pct_small_leg": GT_BUY_PCT_SMALL_LEG,
        "large_leg_top_pct": GT_LARGE_LEG_TOP_PCT,
        **alloc_stats,
    }
    note = (
        "저점 분할 매수(비중=삼각형), 고점 1~2회 매도. "
        "매수=총자산×최적비중×티어(상위 leg 대형·그 외 소형), "
        f"현금 한도·최소 ₩{GT_MIN_ORDER_KRW:,}. "
        "체결 순서=chronological. summary.pnl_pct는 미청산 포함 종가 평가."
    )
    return {
        "name": "ground_truth_split_buy_peak_sell",
        "model": gt_model,
        "method": method,
        "symbol": SYMBOL,
        "interval_min": ENTRY_INTERVAL,
        "lookback_days": CHART_LOOKBACK_DAYS,
        "period_start": str(df.index[0]),
        "period_end": str(df.index[-1]),
        "trend_at_end": trend,
        "params": params,
        "summary": summary,
        "note": note,
        "trades": trade_dicts,
    }
def _validate_leg_portfolio(
    trade_dicts: list[dict[str, Any]],
    last_close: float,
) -> None:
    """
    Check that the simulated portfolio ends without open holdings.

    Args:
        trade_dicts: Trade dicts to simulate.
        last_close: Close of the final bar, used for the mark-to-close run.

    Raises:
        ValueError: When more than 1e-2 units are still held after the last
            simulated step, or after the mark-to-close simulation.
    """
    steps = simulate_truth_portfolio_steps(trade_dicts)
    if not steps:
        return

    leftover = steps[-1]["holding_qty"]
    if float(leftover) > 1e-2:
        raise ValueError(
            f"최종 보유 잔존 qty={leftover} — 기간말 청산 누락 가능"
        )

    marked = simulate_truth_portfolio(trade_dicts, last_price=last_close)
    if float(marked.get("holding_qty", 0)) > 1e-2:
        raise ValueError("종가 평가 후에도 미청산 보유가 남음")
def allocate_gt_order_amounts(
    trades: list[dict[str, Any]],
    initial_cash: float = GT_INITIAL_CASH_KRW,
    min_order_krw: float = GT_MIN_ORDER_KRW,
    fee_rate: float = TRADING_FEE_RATE,
) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """
    Assign ``amount_krw`` to every ground-truth trade point.

    Thin wrapper: the allocation itself is performed by
    ``allocate_order_amounts_chronological``; this function only forwards its
    arguments and returns that function's result unchanged.

    Args:
        trades: Trade dicts (not necessarily time-ordered yet).
        initial_cash: Starting cash.
        min_order_krw: Minimum KRW amount of a buy/sell order.
        fee_rate: Fee rate.

    Returns:
        ``(trades, alloc_stats)`` as produced by the delegate.
    """
    allocation = allocate_order_amounts_chronological(
        trades,
        initial_cash=initial_cash,
        min_order_krw=min_order_krw,
        fee_rate=fee_rate,
    )
    return allocation
def _trade_buy_amount(
t: dict[str, Any],
cash: float,
leg_budget: float,
current_leg: int | None,
leg_id: int,
fee_rate: float,
) -> tuple[float, float, int | None]:
"""
매수 체결 원화: amount_krw 우선, 없으면 leg_budget*weight.
Returns:
(amount, new_leg_budget, new_current_leg).
"""
weight = float(t.get("weight", 1.0))
if t.get("amount_krw") is not None and float(t["amount_krw"]) > 0:
amount = min(float(t["amount_krw"]), max(cash / (1.0 + fee_rate), 0.0))
return amount, leg_budget, current_leg
if leg_id != current_leg:
current_leg = leg_id
leg_budget = cash
amount = leg_budget * weight
return amount, leg_budget, current_leg
def order_trades_leg_block(
    trades: list[TradePoint] | list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """
    Order trades leg by leg: all buys of a leg, then all its sells.

    Args:
        trades: TradePoint objects or dicts.

    Returns:
        Dicts sorted by ``(leg_id, buy-before-sell, dt)``.
    """
    records = [item if isinstance(item, dict) else asdict(item) for item in trades]
    records.sort(
        key=lambda rec: (
            int(rec.get("leg_id", 0)),
            int(rec.get("action") != "buy"),
            rec["dt"],
        )
    )
    return records
def order_trades_chronological(
    trades: list[TradePoint] | list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """
    Return the trades as dicts sorted by their ``dt`` string.

    Args:
        trades: TradePoint objects or dicts.

    Returns:
        Dicts in ascending ``dt`` order (stable for equal timestamps).
    """
    records = [item if isinstance(item, dict) else asdict(item) for item in trades]
    records.sort(key=lambda rec: rec["dt"])
    return records
def _truth_simulation_rows(
    trades: list[dict[str, Any]] | list[TradePoint],
    *,
    chronological: bool = False,
) -> list[dict[str, Any]]:
    """
    Normalise trades into the fill order used by the portfolio simulation.

    Time order is used when ``chronological`` is True or when any trade carries
    a positive ``amount_krw``; otherwise the leg-block order is used.

    Args:
        trades: JSON trade dicts or TradePoint objects.
        chronological: Force time order.

    Returns:
        List of dict rows.
    """
    rows = [item if isinstance(item, dict) else asdict(item) for item in trades]
    if not chronological and not any(
        float(row.get("amount_krw") or 0) > 0 for row in rows
    ):
        return order_trades_leg_block(trades)
    return sorted(rows, key=lambda row: row["dt"])
def simulate_truth_portfolio_steps(
    trades: list[dict[str, Any]] | list[TradePoint],
    initial_cash: float = GT_INITIAL_CASH_KRW,
    fee_rate: float = TRADING_FEE_RATE,
) -> list[dict[str, Any]]:
    """
    Replay the trades and return a cash/holdings snapshot after each fill.

    Args:
        trades: JSON trade dicts or TradePoint objects.
        initial_cash: Starting KRW.
        fee_rate: Fee rate applied to both buys and sells.

    Returns:
        One dict per processed trade, in simulation order, with cash_krw,
        holding_qty and total_asset_krw (cash + holdings valued at that
        trade's price). Buys/sells that resolve to a non-positive size are skipped.
    """
    ordered = _truth_simulation_rows(trades)
    cash = float(initial_cash)
    held = 0.0
    leg_budget = 0.0
    buy_leg: int | None = None
    sell_leg: int | None = None
    sell_base = 0.0
    snapshots: list[dict[str, Any]] = []

    for trade in ordered:
        action = trade["action"]
        price = float(trade["price"])
        weight = float(trade.get("weight", 1.0))
        leg_id = int(trade.get("leg_id", 0))

        if action == "buy":
            if leg_id != buy_leg:
                # New leg: budget is the cash on hand; forget the previous sell leg.
                buy_leg = leg_id
                leg_budget = cash
                sell_leg = None
            amount, leg_budget, buy_leg = _trade_buy_amount(
                trade, cash, leg_budget, buy_leg, leg_id, fee_rate
            )
            if amount <= 0:
                continue
            fee = amount * fee_rate
            spend = amount + fee
            if spend > cash:
                # Not enough cash for amount + fee: shrink so that both fit.
                amount = max(cash / (1.0 + fee_rate), 0.0)
                fee = amount * fee_rate
                spend = amount + fee
            cash -= spend
            if price > 0:
                held += amount / price
        elif action == "sell" and held > 0:
            if leg_id != sell_leg:
                # First sell of a leg fixes the base quantity for split-sell weights.
                sell_leg = leg_id
                sell_base = held
            sold = _resolve_sell_qty(trade, held, price, sell_base, weight)
            if sold <= 0:
                continue
            gross = sold * price
            fee = gross * fee_rate
            cash += gross - fee
            held -= sold
            if held < 1e-12:
                held = 0.0

        snapshots.append(
            {
                "dt": trade["dt"],
                "action": action,
                "price": price,
                "weight": weight,
                "amount_krw": trade.get("amount_krw"),
                "leg_id": leg_id,
                "cash_krw": round(cash, 0),
                "holding_qty": round(held, 4),
                "total_asset_krw": round(cash + held * price, 0),
            }
        )
    return snapshots
def simulate_truth_portfolio(
    trades: list[dict[str, Any]] | list[TradePoint],
    initial_cash: float = GT_INITIAL_CASH_KRW,
    fee_rate: float = TRADING_FEE_RATE,
    last_price: float | None = None,
) -> dict[str, Any]:
    """
    Replay the split buys/sells and compute profit from initial vs. final total value.

    - Initial total value = ``initial_cash`` (all cash, no holdings).
    - Every buy/sell updates cash and held quantity (split weights applied).
    - Final total value = cash + held quantity x ``last_price``.
    - Profit = final total value - initial total value.
    - Return (%) = profit / initial total value x 100.

    Split sells: the quantity held at the first sell of a leg is remembered and
    passed to ``_resolve_sell_qty`` as the base for every sell of that leg
    (presumably so weights such as 0.65 + 0.35 apply to that base rather than
    to the shrinking remainder; see that helper).

    Args:
        trades: JSON trade dicts or TradePoint objects.
        initial_cash: Starting KRW.
        fee_rate: Fee rate applied to each buy and each sell.
        last_price: Price used to value open holdings. When None, open holdings
            are valued at 0 (cash-only / realized result) and ``mark_price`` is None.

    Returns:
        Dict with initial_cash_krw, final_asset_krw, pnl_krw, pnl_pct,
        total_fees_krw, cash_krw, holding_qty, holding_value_krw, mark_price, fee_rate.
    """
    rows = _truth_simulation_rows(trades)
    cash = float(initial_cash)
    qty = 0.0
    total_fees = 0.0
    leg_budget = 0.0
    current_leg: int | None = None
    sell_leg: int | None = None
    sell_base_qty = 0.0

    for t in rows:
        action = t["action"]
        price = float(t["price"])
        weight = float(t.get("weight", 1.0))
        leg_id = int(t.get("leg_id", 0))

        if action == "buy":
            if leg_id != current_leg:
                # New leg: budget is the cash on hand; forget the previous sell leg.
                current_leg = leg_id
                leg_budget = cash
                sell_leg = None
            amount, leg_budget, current_leg = _trade_buy_amount(
                t, cash, leg_budget, current_leg, leg_id, fee_rate
            )
            if amount <= 0:
                continue
            fee = amount * fee_rate
            spend = amount + fee
            if spend > cash:
                # Not enough cash for amount + fee: shrink so that both fit.
                amount = max(cash / (1.0 + fee_rate), 0.0)
                fee = amount * fee_rate
                spend = amount + fee
            cash -= spend
            total_fees += fee
            if price > 0:
                qty += amount / price
        elif action == "sell" and qty > 0:
            if leg_id != sell_leg:
                sell_leg = leg_id
                sell_base_qty = qty
            sell_qty = _resolve_sell_qty(t, qty, price, sell_base_qty, weight)
            if sell_qty <= 0:
                continue
            gross = sell_qty * price
            fee = gross * fee_rate
            cash += gross - fee
            total_fees += fee
            qty -= sell_qty
            if qty < 1e-12:
                qty = 0.0

    if last_price is None:
        mark_price = None
        holding_value = 0.0
    else:
        mark_price = float(last_price)
        holding_value = qty * mark_price

    final_asset = cash + holding_value
    pnl_krw = final_asset - initial_cash
    pnl_pct = pnl_krw / initial_cash * 100.0 if initial_cash else 0.0
    return {
        "initial_cash_krw": round(initial_cash, 0),
        "final_asset_krw": round(final_asset, 0),
        "pnl_krw": round(pnl_krw, 0),
        "pnl_pct": round(pnl_pct, 2),
        "total_fees_krw": round(total_fees, 0),
        "cash_krw": round(cash, 0),
        "holding_qty": round(qty, 6),
        "holding_value_krw": round(holding_value, 0),
        "mark_price": round(mark_price, 2) if mark_price is not None else None,
        "fee_rate": fee_rate,
    }
def save_ground_truth(data: dict[str, Any], path: Path = DEFAULT_OUTPUT) -> Path:
    """Write the ground-truth dict to ``path`` as UTF-8 JSON and return the path."""
    payload = json.dumps(data, ensure_ascii=False, indent=2)
    path.write_text(payload, encoding="utf-8")
    return path
def load_ground_truth(path: Path = DEFAULT_OUTPUT) -> dict[str, Any] | None:
    """Load the ground-truth JSON from ``path``; return None when the file does not exist."""
    if path.exists():
        raw = path.read_text(encoding="utf-8")
        return json.loads(raw)
    return None
def _report_month_gaps(trades: list[dict[str, Any]]) -> list[str]:
"""거래가 없는 연속 월(YYYY-MM) 목록."""
if not trades:
return []
from collections import Counter
months = sorted({t["dt"][:7] for t in trades})
gaps: list[str] = []
y1, m1 = map(int, months[0].split("-"))
for label in months[1:]:
y2, m2 = map(int, label.split("-"))
gap = (y2 - y1) * 12 + (m2 - m1)
if gap > 1:
gaps.append(f"{months[months.index(label) - 1]}{label} ({gap - 1}개월 공백)")
y1, m1 = y2, m2
return gaps
def print_ground_truth_report(data: dict[str, Any]) -> None:
    """
    Print a terminal summary of a ground-truth dict.

    Shows the period, pivot/buy/sell counts, the portfolio result (when
    ``summary.initial_cash_krw`` is present), parameters, trades per month,
    month gaps and the trade list (first 20 and last 10 when there are more than 40).

    Args:
        data: Dict as produced by ``generate_ground_truth``.
    """
    from collections import Counter

    s = data.get("summary", {})
    trades = data.get("trades") or []

    print(f"\n[정답 타점] {data.get('period_start')} ~ {data.get('period_end')}")
    print(
        f" 피벗 {s.get('pivot_candidates')} | 매수 {s.get('buy_count')} / 매도 {s.get('sell_count')} "
        f"| leg {s.get('round_trips')}"
    )
    print(f" 매도 수익 합(참고): {s.get('sum_sell_leg_return_pct')}%")

    if s.get("initial_cash_krw"):
        # Fields are explicitly separated; they used to be printed fused together.
        print(
            f" 포트폴리오: 초기 ₩{s['initial_cash_krw']:,.0f} → "
            f"총보유자산 ₩{s['final_asset_krw']:,.0f} | "
            f"초기 대비 {s['pnl_pct']:+.2f}% | "
            f"수수료 ₩{s['total_fees_krw']:,.0f}"
        )
        if (s.get("holding_qty") or 0) > 0:
            # mark_price may be stored as None; fall back to 0 for formatting.
            print(
                f" 미청산: {s['holding_qty']} "
                f"(평가 ₩{s['holding_value_krw']:,.0f}, 종가 ₩{(s.get('mark_price') or 0):,.0f})"
            )
        elif s.get("execution_order"):
            print(f" 체결 순서: {s['execution_order']} (leg별 매수→매도)")

    print(f" 파라미터: {data.get('params')}")

    by_month = Counter(t["dt"][:7] for t in trades)
    print(f" 월별 타점: {', '.join(f'{m}({c})' for m, c in sorted(by_month.items()))}")

    gaps = _report_month_gaps(trades)
    if gaps:
        print(f" 경고 — 거래 공백 월: {'; '.join(gaps)}")
    else:
        print(" 월별 공백: 없음 (연속 커버)")

    show = trades if len(trades) <= 40 else trades[:20] + trades[-10:]
    if len(trades) > 40:
        print(f" (타점 {len(trades)}건 — 앞 20·뒤 10건만 표시)")
    for t in show:
        mark = "매수" if t["action"] == "buy" else "매도"
        w = float(t.get("weight", 1.0))
        ret = t.get("forward_return_pct")
        # "+" format flag prints the real sign (a literal "+" produced "+-1.2%").
        ret_s = f" ({ret:+}%)" if ret is not None else ""
        print(
            f" [{mark}] {t['dt'][:16]} ₩{t['price']:,.0f} "
            f"비중{w*100:.0f}%{ret_s} {t.get('memo', '')}"
        )
def run_from_db(monitor=None, output: Path = DEFAULT_OUTPUT) -> dict[str, Any]:
    """
    Load the last ``CHART_LOOKBACK_DAYS`` days of bars, then generate, save and
    print the ground truth.

    Args:
        monitor: Monitor instance; a new one is created when falsy.
        output: Destination path of the JSON.

    Returns:
        The generated dict.

    Raises:
        RuntimeError: When no 3-minute bars are available.
    """
    from config import TREND_INTERVAL_1D, TREND_INTERVAL_1H
    from deepcoin.ops.monitor import Monitor

    mon = monitor or Monitor(cooldown_file=None)
    print(f"정답 생성: 최근 {CHART_LOOKBACK_DAYS}일 3분봉")
    frames = load_frames_from_db(mon, SYMBOL, lookback_days=CHART_LOOKBACK_DAYS)

    df_3m = frames.get(ENTRY_INTERVAL)
    if df_3m is None or df_3m.empty:
        raise RuntimeError("3분봉 없음. python scripts/01_download.py 실행 후 재시도.")

    def _frame_or_entry(interval) -> pd.DataFrame:
        # A missing or empty higher time frame falls back to the 3-minute frame.
        candidate = frames.get(interval)
        if candidate is None or candidate.empty:
            return df_3m
        return candidate

    df_1d = _frame_or_entry(TREND_INTERVAL_1D)
    df_1h = _frame_or_entry(TREND_INTERVAL_1H)

    data = generate_ground_truth(df_3m, df_1d, df_1h)
    save_ground_truth(data, output)
    print(f"저장: {output}")
    print_ground_truth_report(data)
    return data
def main() -> None:
    """CLI entry point: generate and store the ground-truth JSON via run_from_db()."""
    run_from_db()
# Run the generator only when the module is executed as a script.
if __name__ == "__main__":
    main()