"""빗썸 캔들 역방향 수집.""" from __future__ import annotations import logging from dataclasses import dataclass from datetime import datetime, timedelta from typing import Any from deepcoin.api.bithumb import BithumbCandleClient, parse_kst_datetime from deepcoin.config import Settings from deepcoin.data.candle_store import CandleStore logger = logging.getLogger(__name__) @dataclass(frozen=True) class DownloadResult: """인터벌별 수집 결과.""" interval_min: int mode: str requests: int saved_rows: int reached_target: bool def _candle_rows_from_api( candles: list[dict[str, Any]], ) -> list[tuple[str, float, float, float, float, float]]: """API 응답을 DB upsert 튜플로 변환한다.""" rows: list[tuple[str, float, float, float, float, float]] = [] for candle in candles: ts = candle.get("candle_date_time_kst") or candle.get("candle_date_time_utc") if not ts: continue rows.append( ( str(ts).replace("T", " "), float(candle["opening_price"]), float(candle["high_price"]), float(candle["low_price"]), float(candle["trade_price"]), float(candle.get("candle_acc_trade_volume", 0.0)), ) ) return rows class CandleDownloader: """설정 기반 캔들 다운로더.""" def __init__(self, settings: Settings) -> None: """다운로더를 초기화한다. Args: settings: 애플리케이션 설정. """ self.settings = settings self._client = BithumbCandleClient( base_url=settings.api_url, count=settings.candle_count, sleep_sec=settings.request_sleep_sec, retries=settings.request_retries, ) def download_all( self, store: CandleStore, *, days: int, full: bool = False, ) -> list[DownloadResult]: """모든 인터벌을 수집한다. Args: store: 캔들 저장소. days: 풀 다운 목표 일수. full: True면 목표 일수까지 역방향 풀 다운. Returns: 인터벌별 DownloadResult 리스트. """ results: list[DownloadResult] = [] for interval_min in self.settings.download_intervals: results.append( self._download_interval(store, interval_min, days=days, full=full) ) return results def _download_interval( self, store: CandleStore, interval_min: int, *, days: int, full: bool, ) -> DownloadResult: """단일 인터벌을 수집한다.""" symbol = self.settings.symbol count_before, _, db_max = store.get_range(symbol, interval_min) target_from = datetime.now() - timedelta(days=max(1, days)) if full or db_max is None: mode = "full" stop_at = target_from else: mode = "incremental" if db_max >= datetime.now() - timedelta(minutes=max(interval_min, 1)): return DownloadResult( interval_min=interval_min, mode="uptodate", requests=0, saved_rows=0, reached_target=True, ) stop_at = db_max - timedelta(minutes=interval_min) to_kst: datetime | None = None requests = 0 saved_rows = 0 reached_target = False oldest_seen: datetime | None = None while True: candles = self._client.fetch_candles( self.settings.market, interval_min, to_kst=to_kst, ) requests += 1 if not candles: break rows = _candle_rows_from_api(candles) if not rows: break if mode == "incremental" and db_max is not None: inserted = store.insert_new_rows( symbol, self.settings.coin_name, interval_min, rows, after=db_max, ) else: inserted = store.upsert_rows( symbol, self.settings.coin_name, interval_min, rows, ) saved_rows += inserted batch_oldest = min(parse_kst_datetime(r[0]) for r in rows) if oldest_seen is None or batch_oldest < oldest_seen: oldest_seen = batch_oldest if batch_oldest <= stop_at: reached_target = True break to_kst = batch_oldest if to_kst <= stop_at: reached_target = True break if mode == "full" and oldest_seen is not None and oldest_seen <= target_from: reached_target = True if mode == "incremental" and requests == 0 and count_before > 0: return DownloadResult( interval_min=interval_min, mode="uptodate", requests=0, saved_rows=0, reached_target=True, ) logger.info( "수집 완료 %s_%s mode=%s requests=%s inserted=%s reached=%s", symbol, interval_min, mode, requests, saved_rows, reached_target, ) return DownloadResult( interval_min=interval_min, mode=mode, requests=requests, saved_rows=saved_rows, reached_target=reached_target, )