feat(spot): 2단계 인과 기법 분석 파이프라인 마무리
common/spot/futures 경로 정비, 캔들 데이터 모듈 복원, MTF 규칙 자동 저장 및 2단계 설계·최종 정리 문서를 반영해 3단계 착수 기반을 확정한다. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
185
src/deepcoin/data/downloader.py
Normal file
185
src/deepcoin/data/downloader.py
Normal file
@@ -0,0 +1,185 @@
|
||||
"""빗썸 캔들 역방향 수집."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any
|
||||
|
||||
from deepcoin.api.bithumb import BithumbCandleClient, parse_kst_datetime
|
||||
from deepcoin.config import Settings
|
||||
from deepcoin.data.candle_store import CandleStore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class DownloadResult:
|
||||
"""인터벌별 수집 결과."""
|
||||
|
||||
interval_min: int
|
||||
mode: str
|
||||
requests: int
|
||||
saved_rows: int
|
||||
reached_target: bool
|
||||
|
||||
|
||||
def _candle_rows_from_api(
|
||||
candles: list[dict[str, Any]],
|
||||
) -> list[tuple[str, float, float, float, float, float]]:
|
||||
"""API 응답을 DB upsert 튜플로 변환한다."""
|
||||
rows: list[tuple[str, float, float, float, float, float]] = []
|
||||
for candle in candles:
|
||||
ts = candle.get("candle_date_time_kst") or candle.get("candle_date_time_utc")
|
||||
if not ts:
|
||||
continue
|
||||
rows.append(
|
||||
(
|
||||
str(ts).replace("T", " "),
|
||||
float(candle["opening_price"]),
|
||||
float(candle["high_price"]),
|
||||
float(candle["low_price"]),
|
||||
float(candle["trade_price"]),
|
||||
float(candle.get("candle_acc_trade_volume", 0.0)),
|
||||
)
|
||||
)
|
||||
return rows
|
||||
|
||||
|
||||
class CandleDownloader:
|
||||
"""설정 기반 캔들 다운로더."""
|
||||
|
||||
def __init__(self, settings: Settings) -> None:
|
||||
"""다운로더를 초기화한다.
|
||||
|
||||
Args:
|
||||
settings: 애플리케이션 설정.
|
||||
"""
|
||||
self.settings = settings
|
||||
self._client = BithumbCandleClient(
|
||||
base_url=settings.api_url,
|
||||
count=settings.candle_count,
|
||||
sleep_sec=settings.request_sleep_sec,
|
||||
retries=settings.request_retries,
|
||||
)
|
||||
|
||||
def download_all(
|
||||
self,
|
||||
store: CandleStore,
|
||||
*,
|
||||
days: int,
|
||||
full: bool = False,
|
||||
) -> list[DownloadResult]:
|
||||
"""모든 인터벌을 수집한다.
|
||||
|
||||
Args:
|
||||
store: 캔들 저장소.
|
||||
days: 풀 다운 목표 일수.
|
||||
full: True면 목표 일수까지 역방향 풀 다운.
|
||||
|
||||
Returns:
|
||||
인터벌별 DownloadResult 리스트.
|
||||
"""
|
||||
results: list[DownloadResult] = []
|
||||
for interval_min in self.settings.download_intervals:
|
||||
results.append(
|
||||
self._download_interval(store, interval_min, days=days, full=full)
|
||||
)
|
||||
return results
|
||||
|
||||
def _download_interval(
|
||||
self,
|
||||
store: CandleStore,
|
||||
interval_min: int,
|
||||
*,
|
||||
days: int,
|
||||
full: bool,
|
||||
) -> DownloadResult:
|
||||
"""단일 인터벌을 수집한다."""
|
||||
symbol = self.settings.symbol
|
||||
count_before, _, db_max = store.get_range(symbol, interval_min)
|
||||
target_from = datetime.now() - timedelta(days=max(1, days))
|
||||
|
||||
if full or db_max is None:
|
||||
mode = "full"
|
||||
stop_at = target_from
|
||||
else:
|
||||
mode = "incremental"
|
||||
if db_max >= datetime.now() - timedelta(minutes=max(interval_min, 1)):
|
||||
return DownloadResult(
|
||||
interval_min=interval_min,
|
||||
mode="uptodate",
|
||||
requests=0,
|
||||
saved_rows=0,
|
||||
reached_target=True,
|
||||
)
|
||||
stop_at = db_max - timedelta(minutes=interval_min)
|
||||
|
||||
to_kst: datetime | None = None
|
||||
requests = 0
|
||||
saved_rows = 0
|
||||
reached_target = False
|
||||
oldest_seen: datetime | None = None
|
||||
|
||||
while True:
|
||||
candles = self._client.fetch_candles(
|
||||
self.settings.market,
|
||||
interval_min,
|
||||
to_kst=to_kst,
|
||||
)
|
||||
requests += 1
|
||||
if not candles:
|
||||
break
|
||||
|
||||
rows = _candle_rows_from_api(candles)
|
||||
if not rows:
|
||||
break
|
||||
|
||||
saved_rows += store.upsert_rows(
|
||||
symbol,
|
||||
self.settings.coin_name,
|
||||
interval_min,
|
||||
rows,
|
||||
)
|
||||
|
||||
batch_oldest = min(parse_kst_datetime(r[0]) for r in rows)
|
||||
if oldest_seen is None or batch_oldest < oldest_seen:
|
||||
oldest_seen = batch_oldest
|
||||
|
||||
if batch_oldest <= stop_at:
|
||||
reached_target = True
|
||||
break
|
||||
|
||||
to_kst = batch_oldest
|
||||
if to_kst <= stop_at:
|
||||
reached_target = True
|
||||
break
|
||||
|
||||
if mode == "full" and oldest_seen is not None and oldest_seen <= target_from:
|
||||
reached_target = True
|
||||
if mode == "incremental" and requests == 0 and count_before > 0:
|
||||
return DownloadResult(
|
||||
interval_min=interval_min,
|
||||
mode="uptodate",
|
||||
requests=0,
|
||||
saved_rows=0,
|
||||
reached_target=True,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"수집 완료 %s_%s mode=%s requests=%s saved=%s reached=%s",
|
||||
symbol,
|
||||
interval_min,
|
||||
mode,
|
||||
requests,
|
||||
saved_rows,
|
||||
reached_target,
|
||||
)
|
||||
return DownloadResult(
|
||||
interval_min=interval_min,
|
||||
mode=mode,
|
||||
requests=requests,
|
||||
saved_rows=saved_rows,
|
||||
reached_target=reached_target,
|
||||
)
|
||||
Reference in New Issue
Block a user