fix(ops): live 신호 누락 방어 — bar 재정렬·datetime catch-up·N봉 재시도

lookback 롤링으로 bar_index가 밀려 매도/매수가 스킵되던 문제를 tick마다 재정렬하고,
last_processed_datetime 기반 catch-up과 OPS_CATCHUP_BARS(10) 2층 방어를 추가한다.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dsyoon
2026-06-14 13:46:31 +09:00
parent be9ea53875
commit ae536ff66c
4 changed files with 194 additions and 21 deletions

View File

@@ -90,6 +90,8 @@ OPS_SYNC_CANDLES=true
# 비우면 DOWNLOAD_INTERVALS 전체 증분 sync
# OPS_SYNC_INTERVALS=
OPS_SIGNAL_TAIL_BARS=800
# live 2층 방어: 최근 N봉(3분봉 기준) 미체결 신호 재시도 (10=30분)
OPS_CATCHUP_BARS=10
# OPS_PERSIST_SIGNAL_CACHE=false
OPS_STATE_JSON=data/spot/operations/fractal_ops_state.json
OPS_REPORT_JSON=docs/spot/3_operations/fractal_ops_report.json

View File

@@ -111,6 +111,7 @@ class Settings:
ops_sync_intervals: list[int]
ops_signal_tail_bars: int
ops_persist_signal_cache: bool
ops_catchup_bars: int
telegram_bot_token: str
telegram_chat_id: str
ops_telegram_enabled: bool
@@ -296,6 +297,7 @@ def load_settings(env_path: Path | None = None) -> Settings:
ops_signal_tail_bars=int(os.getenv("OPS_SIGNAL_TAIL_BARS", "800")),
ops_persist_signal_cache=os.getenv("OPS_PERSIST_SIGNAL_CACHE", "false").strip().lower()
in ("1", "true", "yes", "on"),
ops_catchup_bars=int(os.getenv("OPS_CATCHUP_BARS", "10")),
telegram_bot_token=os.getenv("COIN_TELEGRAM_BOT_TOKEN", "").strip(),
telegram_chat_id=os.getenv("COIN_TELEGRAM_CHAT_ID", "").strip(),
ops_telegram_enabled=_parse_ops_telegram_enabled(

View File

@@ -38,28 +38,150 @@ def sync_candles_if_enabled(settings: Settings) -> list[Any]:
return sync_ops_candles(settings)
def _parse_signal_dt(value: str) -> datetime:
"""신호 datetime 파싱."""
return datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
def _signals_on_bar(signals: list[dict[str, Any]], bar_index: int) -> list[dict[str, Any]]:
"""특정 bar_index 신호만 반환."""
return [s for s in signals if int(s.get("bar_index", -1)) == bar_index]
def _pending_bar_indices(
def _reconcile_processed_cursor(
state: dict[str, Any],
signals: list[dict[str, Any]],
) -> None:
"""tail/lookback 롤링 후 bar 커서가 신호보다 앞서 있으면 보정한다."""
last_dt = state.get("last_processed_datetime")
if not last_dt or not signals:
return
last_bar = int(state.get("last_processed_bar_index", -1))
max_bar_at_or_before = max(
(
int(sig.get("bar_index", -1))
for sig in signals
if str(sig.get("datetime", "")) <= str(last_dt)
),
default=-1,
)
if last_bar > max_bar_at_or_before:
logger.warning(
"bar 커서 보정: %s%s (last_dt=%s)",
last_bar,
max_bar_at_or_before,
last_dt,
)
state["last_processed_bar_index"] = max_bar_at_or_before
def _pending_signals_for_ops(
kept: list[dict[str, Any]],
*,
last_processed_datetime: str | None,
last_bar_index: int,
latest_bar_index: int,
) -> list[int]:
"""체결 대상 bar_index 목록 (최초 실행은 최신 봉만)."""
if last_bar_index < 0:
return [latest_bar_index] if any(
int(s.get("bar_index", -1)) == latest_bar_index for s in kept
) else []
return sorted(
{
int(s.get("bar_index", -1))
for s in kept
if int(s.get("bar_index", -1)) > last_bar_index
}
) -> list[dict[str, Any]]:
"""아직 체결하지 않은 신호 목록 (datetime 우선, 시간순).
live tick은 최신 봉만이 아니라 마지막 처리 시각 이후 신호를 모두 처리한다.
"""
last_dt = _parse_signal_dt(last_processed_datetime) if last_processed_datetime else None
pending: list[dict[str, Any]] = []
for sig in kept:
bar_idx = int(sig.get("bar_index", -1))
if bar_idx > latest_bar_index:
continue
sig_dt = _parse_signal_dt(str(sig["datetime"]))
if last_dt is not None:
if sig_dt <= last_dt:
continue
elif last_bar_index >= 0:
if bar_idx <= last_bar_index:
continue
elif bar_idx != latest_bar_index:
continue
pending.append(sig)
pending.sort(key=lambda s: (_parse_signal_dt(str(s["datetime"])), str(s.get("side", ""))))
return pending
def _signal_key(sig: dict[str, Any]) -> tuple[str, str]:
"""신호 고유 키 (datetime, side)."""
return (str(sig["datetime"]), str(sig["side"]))
def _needs_catchup_execution(
sig: dict[str, Any],
trade_history: list[dict[str, Any]],
) -> bool:
"""최근 N봉 방어 대상인지 (성공·expected_skip 제외, 실패·미시도 포함)."""
dt, side = _signal_key(sig)
for record in trade_history:
if str(record.get("datetime")) != dt or str(record.get("side")) != side:
continue
trade = record.get("trade") or {}
if trade.get("executed"):
return False
if trade.get("expected_skip"):
return False
return True
return True
def _catchup_signals_for_ops(
kept: list[dict[str, Any]],
*,
latest_bar_index: int,
catchup_bars: int,
trade_history: list[dict[str, Any]],
) -> list[dict[str, Any]]:
"""최근 N봉(신호 interval) 안에서 아직 성공 체결되지 않은 신호."""
if catchup_bars <= 0:
return []
min_bar = max(0, latest_bar_index - catchup_bars + 1)
catchup: list[dict[str, Any]] = []
for sig in kept:
bar_idx = int(sig.get("bar_index", -1))
if bar_idx < min_bar or bar_idx > latest_bar_index:
continue
if not _needs_catchup_execution(sig, trade_history):
continue
catchup.append(sig)
catchup.sort(
key=lambda s: (_parse_signal_dt(str(s["datetime"])), str(s.get("side", "")))
)
return catchup
def _merge_pending_signals(*groups: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""여러 pending 목록을 datetime·side 기준 병합 (중복 제거, 시간순)."""
merged: list[dict[str, Any]] = []
seen: set[tuple[str, str]] = set()
for group in groups:
for sig in group:
key = _signal_key(sig)
if key in seen:
continue
seen.add(key)
merged.append(sig)
merged.sort(
key=lambda s: (_parse_signal_dt(str(s["datetime"])), str(s.get("side", "")))
)
return merged
def _pending_bar_indices(pending_signals: list[dict[str, Any]]) -> list[int]:
"""pending 신호의 bar_index 목록 (시간순, 중복 제거)."""
seen: set[int] = set()
ordered: list[int] = []
for sig in pending_signals:
bar_idx = int(sig.get("bar_index", -1))
if bar_idx in seen:
continue
seen.add(bar_idx)
ordered.append(bar_idx)
return ordered
def _cluster_pending(pending: list[dict[str, Any]]) -> list[tuple[str, list[dict[str, Any]]]]:
@@ -127,10 +249,11 @@ class OperationsRunner:
latest_bar = int(len(df) - 1)
gen = generate_raw_signals(self.settings, df=df, use_cache=True)
# tick: 최신 봉 후보만 MTF 평가 (전기간 MTF는 백테스트 전용)
filtered = filter_signals_for_ops(self.settings, gen["raw_signals"])
all_kept = filtered["kept"]
bar_candidates = _signals_on_bar(gen["raw_signals"], latest_bar)
filtered = filter_signals_for_ops(self.settings, bar_candidates)
kept = filtered["kept"]
_reconcile_processed_cursor(self.state, gen["raw_signals"])
reset_daily_trade_count(self.state)
if self.settings.ops_mode == "live" and isinstance(self.executor, LiveExecutor):
@@ -146,13 +269,28 @@ class OperationsRunner:
self._notify_ops_error("portfolio_sync", exc)
last_bar = int(self.state.get("last_processed_bar_index", -1))
target_bars = _pending_bar_indices(kept, last_bar, latest_bar)
last_dt = self.state.get("last_processed_datetime")
pending_signals = _pending_signals_for_ops(
all_kept,
last_processed_datetime=last_dt,
last_bar_index=last_bar,
latest_bar_index=latest_bar,
)
catchup_signals = _catchup_signals_for_ops(
all_kept,
latest_bar_index=latest_bar,
catchup_bars=self.settings.ops_catchup_bars,
trade_history=self.state.get("trade_history") or [],
)
catchup_keys = {_signal_key(s) for s in catchup_signals}
pending_signals = _merge_pending_signals(pending_signals, catchup_signals)
target_bars = _pending_bar_indices(pending_signals)
executions: list[dict[str, Any]] = []
max_daily = self.settings.ops_daily_max_trades
for bar_idx in target_bars:
bar_signals = _signals_on_bar(kept, bar_idx)
bar_signals = _signals_on_bar(pending_signals, bar_idx)
clusters = _cluster_pending(bar_signals)
for side, cluster in clusters:
if self.state["trades_today_count"] >= max_daily:
@@ -161,7 +299,7 @@ class OperationsRunner:
cluster_size = len(cluster)
for sig in cluster:
full_sig = next(
(k for k in kept if k["datetime"] == sig["datetime"]),
(k for k in all_kept if k["datetime"] == sig["datetime"]),
sig,
)
try:
@@ -196,6 +334,7 @@ class OperationsRunner:
"signal_type": full_sig.get("signal_type"),
"price": full_sig["price"],
"bar_index": bar_idx,
"catchup": _signal_key(full_sig) in catchup_keys,
"trade": trade.to_dict(),
"mtf_filter": full_sig.get("mtf_filter"),
}
@@ -231,9 +370,12 @@ class OperationsRunner:
pipeline = {
"technique_id": gen["technique_id"],
"raw_count": len(bar_candidates),
"kept_count": len(kept),
"kept_count": len(all_kept),
"rejected_count": len(filtered["rejected"]),
"latest_bar_index": latest_bar,
"pending_signal_count": len(pending_signals),
"catchup_signal_count": len(catchup_signals),
"catchup_bars": self.settings.ops_catchup_bars,
}
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
@@ -260,6 +402,9 @@ class OperationsRunner:
"raw_signals": pipeline["raw_count"],
"filtered_signals": pipeline["kept_count"],
"pending_bars": target_bars,
"pending_signal_count": pipeline["pending_signal_count"],
"catchup_signal_count": pipeline["catchup_signal_count"],
"catchup_bars": pipeline["catchup_bars"],
"latest_bar_candidates": pipeline["raw_count"],
"executions": executions,
"portfolio": self.state["portfolio"],

View File

@@ -67,6 +67,28 @@ def _merge_tail_signals(
return merged
def _reindex_signals_to_df(
signals: list[dict[str, Any]],
df: pd.DataFrame,
) -> list[dict[str, Any]]:
"""신호 bar_index를 현재 df 행 위치에 맞게 재부여한다.
lookback_days 롤링 윈도우로 앞쪽 봉이 빠지면 동일 datetime의 bar_index가
변하므로, 캐시 신호는 tick마다 datetime 기준으로 재정렬해야 한다.
"""
if df.empty or not signals:
return signals
dt_to_idx = {str(row["datetime"]): int(i) for i, row in df.iterrows()}
reindexed: list[dict[str, Any]] = []
for signal in signals:
row = dict(signal)
idx = dt_to_idx.get(str(row.get("datetime", "")))
if idx is not None:
row["bar_index"] = idx
reindexed.append(row)
return reindexed
def _load_technique_cached(cache_path: Path) -> TechniqueResult:
"""대용량 기법 JSON을 mtime 기준으로 메모리 캐시한다 (fractal_swing 등)."""
path = Path(cache_path)
@@ -225,6 +247,7 @@ def generate_raw_signals(
len(cached.signals),
len(raw_signals),
)
raw_signals = _reindex_signals_to_df(raw_signals, df)
raw_signals = _apply_ops_min_score(
cached.technique_id,
raw_signals,
@@ -250,11 +273,12 @@ def generate_raw_signals(
technique = get_technique(settings.ops_technique_id)
params_obj = build_technique_params(settings)
result = run_technique(technique, df, params_obj, gt_result=None)
signals = _reindex_signals_to_df(result.signals, df)
return {
"technique_id": result.technique_id,
"technique_name": result.technique_name,
"params": result.params,
"raw_signals": result.signals,
"raw_signals": signals,
"data_end": data_end,
"last_price": last_price,
"from_cache": False,