파이프라인 산출물(data/, docs/)을 Git 추적에서 제외하고 히스토리를 단일 커밋으로 재구성해 저장소 용량을 경량화한다. Co-authored-by: Cursor <cursoragent@cursor.com>
135 lines
4.2 KiB
Python
135 lines
4.2 KiB
Python
"""빗썸 Public REST API — 캔들 조회."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
import requests
|
|
|
|
from deepcoin.data.intervals import INTERVAL_DAILY, INTERVAL_MONTHLY, INTERVAL_WEEKLY
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# 인터벌(분) → 빗썸 candles API 경로 세그먼트
|
|
_CALENDAR_PATHS: dict[int, str] = {
|
|
INTERVAL_DAILY: "days",
|
|
INTERVAL_WEEKLY: "weeks",
|
|
INTERVAL_MONTHLY: "months",
|
|
}
|
|
|
|
KST_FMT = "%Y-%m-%d %H:%M:%S"
|
|
KST_FMT_T = "%Y-%m-%dT%H:%M:%S"
|
|
|
|
|
|
def parse_kst_datetime(value: str) -> datetime:
|
|
"""KST 캔들 시각 문자열을 datetime으로 변환한다.
|
|
|
|
Args:
|
|
value: `yyyy-MM-dd HH:mm:ss` 또는 ISO 형식.
|
|
|
|
Returns:
|
|
naive datetime (KST 기준).
|
|
"""
|
|
normalized = value.strip().replace("T", " ")
|
|
return datetime.strptime(normalized, KST_FMT)
|
|
|
|
|
|
def format_kst_datetime(dt: datetime) -> str:
|
|
"""datetime을 빗썸 `to` 파라미터 형식으로 포맷한다.
|
|
|
|
Args:
|
|
dt: KST 기준 시각.
|
|
|
|
Returns:
|
|
`yyyy-MM-dd HH:mm:ss` 문자열.
|
|
"""
|
|
return dt.strftime(KST_FMT)
|
|
|
|
|
|
class BithumbCandleClient:
|
|
"""빗썸 캔들 API 클라이언트."""
|
|
|
|
def __init__(
|
|
self,
|
|
base_url: str = "https://api.bithumb.com",
|
|
count: int = 200,
|
|
sleep_sec: float = 0.35,
|
|
retries: int = 3,
|
|
) -> None:
|
|
"""클라이언트를 초기화한다.
|
|
|
|
Args:
|
|
base_url: API 베이스 URL.
|
|
count: 요청당 캔들 개수 (최대 200).
|
|
sleep_sec: 연속 요청 간 대기(초).
|
|
retries: 실패 시 재시도 횟수.
|
|
"""
|
|
self.base_url = base_url.rstrip("/")
|
|
self.count = min(max(count, 1), 200)
|
|
self.sleep_sec = sleep_sec
|
|
self.retries = retries
|
|
self._session = requests.Session()
|
|
self._session.headers.update({"accept": "application/json"})
|
|
|
|
def fetch_candles(
|
|
self,
|
|
market: str,
|
|
interval_min: int,
|
|
to_kst: datetime | None = None,
|
|
) -> list[dict[str, Any]]:
|
|
"""캔들 배치를 조회한다 (최신순).
|
|
|
|
Args:
|
|
market: 거래 페어 (예: KRW-WLD).
|
|
interval_min: 분 단위. 1440이면 일봉 API 사용.
|
|
to_kst: 조회 기준 시각(KST). 해당 시각 캔들은 제외.
|
|
|
|
Returns:
|
|
캔들 dict 리스트. API 오류 시 빈 리스트.
|
|
|
|
Raises:
|
|
requests.RequestException: 재시도 후에도 네트워크 실패.
|
|
"""
|
|
if interval_min in _CALENDAR_PATHS:
|
|
segment = _CALENDAR_PATHS[interval_min]
|
|
url = f"{self.base_url}/v1/candles/{segment}"
|
|
params: dict[str, Any] = {"market": market, "count": self.count}
|
|
else:
|
|
url = f"{self.base_url}/v1/candles/minutes/{interval_min}"
|
|
params = {"market": market, "count": self.count}
|
|
|
|
if to_kst is not None:
|
|
params["to"] = format_kst_datetime(to_kst)
|
|
|
|
last_error: Exception | None = None
|
|
for attempt in range(1, self.retries + 1):
|
|
try:
|
|
response = self._session.get(url, params=params, timeout=30)
|
|
if response.status_code == 429:
|
|
wait = self.sleep_sec * attempt * 3
|
|
logger.warning("Rate limit 429 — %ss 대기 후 재시도", wait)
|
|
time.sleep(wait)
|
|
continue
|
|
response.raise_for_status()
|
|
payload = response.json()
|
|
if isinstance(payload, dict) and "error" in payload:
|
|
logger.error("API error: %s", payload["error"])
|
|
return []
|
|
if not isinstance(payload, list):
|
|
logger.error("Unexpected response type: %s", type(payload))
|
|
return []
|
|
time.sleep(self.sleep_sec)
|
|
return payload
|
|
except requests.RequestException as exc:
|
|
last_error = exc
|
|
wait = self.sleep_sec * attempt * 2
|
|
logger.warning("Request failed (%s/%s): %s", attempt, self.retries, exc)
|
|
time.sleep(wait)
|
|
|
|
if last_error is not None:
|
|
raise last_error
|
|
return []
|