diff --git a/.env.example b/.env.example index dbe1be0..0b533cd 100644 --- a/.env.example +++ b/.env.example @@ -90,6 +90,8 @@ OPS_SYNC_CANDLES=true # 비우면 DOWNLOAD_INTERVALS 전체 증분 sync # OPS_SYNC_INTERVALS= OPS_SIGNAL_TAIL_BARS=800 +# live 2층 방어: 최근 N봉(3분봉 기준) 미체결 신호 재시도 (10=30분) +OPS_CATCHUP_BARS=10 # OPS_PERSIST_SIGNAL_CACHE=false OPS_STATE_JSON=data/spot/operations/fractal_ops_state.json OPS_REPORT_JSON=docs/spot/3_operations/fractal_ops_report.json diff --git a/src/bithumb/config.py b/src/bithumb/config.py index da73a78..c0f7f6c 100644 --- a/src/bithumb/config.py +++ b/src/bithumb/config.py @@ -111,6 +111,7 @@ class Settings: ops_sync_intervals: list[int] ops_signal_tail_bars: int ops_persist_signal_cache: bool + ops_catchup_bars: int telegram_bot_token: str telegram_chat_id: str ops_telegram_enabled: bool @@ -296,6 +297,7 @@ def load_settings(env_path: Path | None = None) -> Settings: ops_signal_tail_bars=int(os.getenv("OPS_SIGNAL_TAIL_BARS", "800")), ops_persist_signal_cache=os.getenv("OPS_PERSIST_SIGNAL_CACHE", "false").strip().lower() in ("1", "true", "yes", "on"), + ops_catchup_bars=int(os.getenv("OPS_CATCHUP_BARS", "10")), telegram_bot_token=os.getenv("COIN_TELEGRAM_BOT_TOKEN", "").strip(), telegram_chat_id=os.getenv("COIN_TELEGRAM_CHAT_ID", "").strip(), ops_telegram_enabled=_parse_ops_telegram_enabled( diff --git a/src/bithumb/operations/runner.py b/src/bithumb/operations/runner.py index 8861919..1149140 100644 --- a/src/bithumb/operations/runner.py +++ b/src/bithumb/operations/runner.py @@ -38,28 +38,150 @@ def sync_candles_if_enabled(settings: Settings) -> list[Any]: return sync_ops_candles(settings) +def _parse_signal_dt(value: str) -> datetime: + """신호 datetime 파싱.""" + return datetime.strptime(value, "%Y-%m-%d %H:%M:%S") + + def _signals_on_bar(signals: list[dict[str, Any]], bar_index: int) -> list[dict[str, Any]]: """특정 bar_index 신호만 반환.""" return [s for s in signals if int(s.get("bar_index", -1)) == bar_index] -def _pending_bar_indices( +def _reconcile_processed_cursor( + state: dict[str, Any], + signals: list[dict[str, Any]], +) -> None: + """tail/lookback 롤링 후 bar 커서가 신호보다 앞서 있으면 보정한다.""" + last_dt = state.get("last_processed_datetime") + if not last_dt or not signals: + return + last_bar = int(state.get("last_processed_bar_index", -1)) + max_bar_at_or_before = max( + ( + int(sig.get("bar_index", -1)) + for sig in signals + if str(sig.get("datetime", "")) <= str(last_dt) + ), + default=-1, + ) + if last_bar > max_bar_at_or_before: + logger.warning( + "bar 커서 보정: %s → %s (last_dt=%s)", + last_bar, + max_bar_at_or_before, + last_dt, + ) + state["last_processed_bar_index"] = max_bar_at_or_before + + +def _pending_signals_for_ops( kept: list[dict[str, Any]], + *, + last_processed_datetime: str | None, last_bar_index: int, latest_bar_index: int, -) -> list[int]: - """체결 대상 bar_index 목록 (최초 실행은 최신 봉만).""" - if last_bar_index < 0: - return [latest_bar_index] if any( - int(s.get("bar_index", -1)) == latest_bar_index for s in kept - ) else [] - return sorted( - { - int(s.get("bar_index", -1)) - for s in kept - if int(s.get("bar_index", -1)) > last_bar_index - } +) -> list[dict[str, Any]]: + """아직 체결하지 않은 신호 목록 (datetime 우선, 시간순). + + live tick은 최신 봉만이 아니라 마지막 처리 시각 이후 신호를 모두 처리한다. + """ + last_dt = _parse_signal_dt(last_processed_datetime) if last_processed_datetime else None + pending: list[dict[str, Any]] = [] + for sig in kept: + bar_idx = int(sig.get("bar_index", -1)) + if bar_idx > latest_bar_index: + continue + sig_dt = _parse_signal_dt(str(sig["datetime"])) + if last_dt is not None: + if sig_dt <= last_dt: + continue + elif last_bar_index >= 0: + if bar_idx <= last_bar_index: + continue + elif bar_idx != latest_bar_index: + continue + pending.append(sig) + pending.sort(key=lambda s: (_parse_signal_dt(str(s["datetime"])), str(s.get("side", "")))) + return pending + + +def _signal_key(sig: dict[str, Any]) -> tuple[str, str]: + """신호 고유 키 (datetime, side).""" + return (str(sig["datetime"]), str(sig["side"])) + + +def _needs_catchup_execution( + sig: dict[str, Any], + trade_history: list[dict[str, Any]], +) -> bool: + """최근 N봉 방어 대상인지 (성공·expected_skip 제외, 실패·미시도 포함).""" + dt, side = _signal_key(sig) + for record in trade_history: + if str(record.get("datetime")) != dt or str(record.get("side")) != side: + continue + trade = record.get("trade") or {} + if trade.get("executed"): + return False + if trade.get("expected_skip"): + return False + return True + return True + + +def _catchup_signals_for_ops( + kept: list[dict[str, Any]], + *, + latest_bar_index: int, + catchup_bars: int, + trade_history: list[dict[str, Any]], +) -> list[dict[str, Any]]: + """최근 N봉(신호 interval) 안에서 아직 성공 체결되지 않은 신호.""" + if catchup_bars <= 0: + return [] + min_bar = max(0, latest_bar_index - catchup_bars + 1) + catchup: list[dict[str, Any]] = [] + for sig in kept: + bar_idx = int(sig.get("bar_index", -1)) + if bar_idx < min_bar or bar_idx > latest_bar_index: + continue + if not _needs_catchup_execution(sig, trade_history): + continue + catchup.append(sig) + catchup.sort( + key=lambda s: (_parse_signal_dt(str(s["datetime"])), str(s.get("side", ""))) ) + return catchup + + +def _merge_pending_signals(*groups: list[dict[str, Any]]) -> list[dict[str, Any]]: + """여러 pending 목록을 datetime·side 기준 병합 (중복 제거, 시간순).""" + merged: list[dict[str, Any]] = [] + seen: set[tuple[str, str]] = set() + for group in groups: + for sig in group: + key = _signal_key(sig) + if key in seen: + continue + seen.add(key) + merged.append(sig) + merged.sort( + key=lambda s: (_parse_signal_dt(str(s["datetime"])), str(s.get("side", ""))) + ) + return merged + + +def _pending_bar_indices(pending_signals: list[dict[str, Any]]) -> list[int]: + """pending 신호의 bar_index 목록 (시간순, 중복 제거).""" + seen: set[int] = set() + ordered: list[int] = [] + for sig in pending_signals: + bar_idx = int(sig.get("bar_index", -1)) + if bar_idx in seen: + continue + seen.add(bar_idx) + ordered.append(bar_idx) + return ordered def _cluster_pending(pending: list[dict[str, Any]]) -> list[tuple[str, list[dict[str, Any]]]]: @@ -127,10 +249,11 @@ class OperationsRunner: latest_bar = int(len(df) - 1) gen = generate_raw_signals(self.settings, df=df, use_cache=True) - # tick: 최신 봉 후보만 MTF 평가 (전기간 MTF는 백테스트 전용) + filtered = filter_signals_for_ops(self.settings, gen["raw_signals"]) + all_kept = filtered["kept"] bar_candidates = _signals_on_bar(gen["raw_signals"], latest_bar) - filtered = filter_signals_for_ops(self.settings, bar_candidates) - kept = filtered["kept"] + + _reconcile_processed_cursor(self.state, gen["raw_signals"]) reset_daily_trade_count(self.state) if self.settings.ops_mode == "live" and isinstance(self.executor, LiveExecutor): @@ -146,13 +269,28 @@ class OperationsRunner: self._notify_ops_error("portfolio_sync", exc) last_bar = int(self.state.get("last_processed_bar_index", -1)) - target_bars = _pending_bar_indices(kept, last_bar, latest_bar) + last_dt = self.state.get("last_processed_datetime") + pending_signals = _pending_signals_for_ops( + all_kept, + last_processed_datetime=last_dt, + last_bar_index=last_bar, + latest_bar_index=latest_bar, + ) + catchup_signals = _catchup_signals_for_ops( + all_kept, + latest_bar_index=latest_bar, + catchup_bars=self.settings.ops_catchup_bars, + trade_history=self.state.get("trade_history") or [], + ) + catchup_keys = {_signal_key(s) for s in catchup_signals} + pending_signals = _merge_pending_signals(pending_signals, catchup_signals) + target_bars = _pending_bar_indices(pending_signals) executions: list[dict[str, Any]] = [] max_daily = self.settings.ops_daily_max_trades for bar_idx in target_bars: - bar_signals = _signals_on_bar(kept, bar_idx) + bar_signals = _signals_on_bar(pending_signals, bar_idx) clusters = _cluster_pending(bar_signals) for side, cluster in clusters: if self.state["trades_today_count"] >= max_daily: @@ -161,7 +299,7 @@ class OperationsRunner: cluster_size = len(cluster) for sig in cluster: full_sig = next( - (k for k in kept if k["datetime"] == sig["datetime"]), + (k for k in all_kept if k["datetime"] == sig["datetime"]), sig, ) try: @@ -196,6 +334,7 @@ class OperationsRunner: "signal_type": full_sig.get("signal_type"), "price": full_sig["price"], "bar_index": bar_idx, + "catchup": _signal_key(full_sig) in catchup_keys, "trade": trade.to_dict(), "mtf_filter": full_sig.get("mtf_filter"), } @@ -231,9 +370,12 @@ class OperationsRunner: pipeline = { "technique_id": gen["technique_id"], "raw_count": len(bar_candidates), - "kept_count": len(kept), + "kept_count": len(all_kept), "rejected_count": len(filtered["rejected"]), "latest_bar_index": latest_bar, + "pending_signal_count": len(pending_signals), + "catchup_signal_count": len(catchup_signals), + "catchup_bars": self.settings.ops_catchup_bars, } now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") @@ -260,6 +402,9 @@ class OperationsRunner: "raw_signals": pipeline["raw_count"], "filtered_signals": pipeline["kept_count"], "pending_bars": target_bars, + "pending_signal_count": pipeline["pending_signal_count"], + "catchup_signal_count": pipeline["catchup_signal_count"], + "catchup_bars": pipeline["catchup_bars"], "latest_bar_candidates": pipeline["raw_count"], "executions": executions, "portfolio": self.state["portfolio"], diff --git a/src/bithumb/operations/signal_pipeline.py b/src/bithumb/operations/signal_pipeline.py index 607813a..6edbc63 100644 --- a/src/bithumb/operations/signal_pipeline.py +++ b/src/bithumb/operations/signal_pipeline.py @@ -67,6 +67,28 @@ def _merge_tail_signals( return merged +def _reindex_signals_to_df( + signals: list[dict[str, Any]], + df: pd.DataFrame, +) -> list[dict[str, Any]]: + """신호 bar_index를 현재 df 행 위치에 맞게 재부여한다. + + lookback_days 롤링 윈도우로 앞쪽 봉이 빠지면 동일 datetime의 bar_index가 + 변하므로, 캐시 신호는 tick마다 datetime 기준으로 재정렬해야 한다. + """ + if df.empty or not signals: + return signals + dt_to_idx = {str(row["datetime"]): int(i) for i, row in df.iterrows()} + reindexed: list[dict[str, Any]] = [] + for signal in signals: + row = dict(signal) + idx = dt_to_idx.get(str(row.get("datetime", ""))) + if idx is not None: + row["bar_index"] = idx + reindexed.append(row) + return reindexed + + def _load_technique_cached(cache_path: Path) -> TechniqueResult: """대용량 기법 JSON을 mtime 기준으로 메모리 캐시한다 (fractal_swing 등).""" path = Path(cache_path) @@ -225,6 +247,7 @@ def generate_raw_signals( len(cached.signals), len(raw_signals), ) + raw_signals = _reindex_signals_to_df(raw_signals, df) raw_signals = _apply_ops_min_score( cached.technique_id, raw_signals, @@ -250,11 +273,12 @@ def generate_raw_signals( technique = get_technique(settings.ops_technique_id) params_obj = build_technique_params(settings) result = run_technique(technique, df, params_obj, gt_result=None) + signals = _reindex_signals_to_df(result.signals, df) return { "technique_id": result.technique_id, "technique_name": result.technique_name, "params": result.params, - "raw_signals": result.signals, + "raw_signals": signals, "data_end": data_end, "last_price": last_price, "from_cache": False,