Files
Bithumb/deepcoin/common/indicators.py
xavis d7848df6f7 refactor: GT·시뮬·운영 3축 정리 및 hybrid 실거래 정합
Phase C/dry-run·미사용 모듈·재생성 HTML을 제거하고, 운영 체결을
sim_causal_hybrid와 동일한 hybrid 로직으로 통합한다.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-03 23:50:28 +09:00

371 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
볼린저 밴드·일목·MACD·스토캐스틱·RSI·이격도 계산 (모든 봉 간격 공용).
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from config import (
BB_PERIOD,
BB_STD,
DISPARITY_OVERBOUGHT,
DISPARITY_OVERSOLD,
DISPARITY_PERIODS,
MACD_FAST,
MACD_SIGNAL,
MACD_SLOW,
RSI_PERIOD,
STOCH_D_PERIOD,
STOCH_K_PERIOD,
STOCH_OVERBOUGHT,
STOCH_OVERSOLD,
STOCH_SMOOTH_K,
TREND_RANGE_MA_GAP_PCT,
)
# Alias for trend labels; the functions in this module return "up", "down" or "range".
Trend = str  # "up" | "down" | "range"
def add_bollinger(
    df: pd.DataFrame,
    period: int = BB_PERIOD,
    std_mult: float = BB_STD,
) -> pd.DataFrame:
    """
    Return a copy of ``df`` with Bollinger Band columns attached.

    Existing ``MA`` / ``Upper`` / ``Lower`` columns are reused as-is; they are
    only computed when missing. ``STD`` is written only when the bands are
    computed here.

    Args:
        df: OHLCV DataFrame (``Close`` is read).
        period: Rolling window for the centre line and standard deviation.
        std_mult: Standard-deviation multiplier for the band offset.

    Returns:
        DataFrame with MA, Upper, Lower, (STD), bb_pos and BB_Width columns.
    """
    result = df.copy()

    if "MA" not in result.columns:
        result["MA"] = result["Close"].rolling(period).mean()

    bands_present = "Upper" in result.columns and "Lower" in result.columns
    if not bands_present:
        rolling_std = result["Close"].rolling(period).std()
        result["STD"] = rolling_std
        result["Upper"] = result["MA"] + std_mult * rolling_std
        result["Lower"] = result["MA"] - std_mult * rolling_std

    # Zero denominators become NaN so the ratios below never divide by zero.
    centre = result["MA"].replace(0, np.nan)
    band_width = (result["Upper"] - result["Lower"]).replace(0, np.nan)

    # Position of the close inside the band, clipped to the [0, 1] range.
    relative_pos = (result["Close"] - result["Lower"]) / band_width
    result["bb_pos"] = relative_pos.clip(0, 1)
    # Band width as a percentage of the centre line.
    result["BB_Width"] = band_width / centre * 100
    return result
def add_macd(
    df: pd.DataFrame,
    fast: int = MACD_FAST,
    slow: int = MACD_SLOW,
    signal_period: int = MACD_SIGNAL,
) -> pd.DataFrame:
    """
    Return a copy of ``df`` with the MACD line, signal line and histogram.

    Args:
        df: OHLCV DataFrame (``Close`` is required).
        fast: Span of the fast EMA.
        slow: Span of the slow EMA.
        signal_period: Span of the EMA applied to the MACD line.

    Returns:
        DataFrame with macd_line, macd_signal and macd_hist columns added.
    """

    def _ema(series: pd.Series, span: int) -> pd.Series:
        # Recursive (adjust=False) exponential moving average.
        return series.ewm(span=span, adjust=False).mean()

    result = df.copy()
    prices = result["Close"].astype(float)

    result["macd_line"] = _ema(prices, fast) - _ema(prices, slow)
    result["macd_signal"] = _ema(result["macd_line"], signal_period)
    result["macd_hist"] = result["macd_line"] - result["macd_signal"]
    return result
def disparity_column(period: int) -> str:
    """Build the disparity column name for a period (e.g. ``disparity_20``)."""
    return "disparity_{}".format(period)
def add_disparity(
    df: pd.DataFrame,
    periods: tuple[int, ...] | None = None,
) -> pd.DataFrame:
    """
    Return a copy of ``df`` with disparity columns.

    Disparity = (close / SMA(n)) * 100; a value of 100 means the close sits
    exactly on the moving average.

    Args:
        df: OHLCV DataFrame (``Close`` is required).
        periods: SMA windows. A falsy value (None or empty) falls back to
            ``DISPARITY_PERIODS``.

    Returns:
        DataFrame with one ``disparity_{n}`` column per window.
    """
    result = df.copy()
    prices = result["Close"].astype(float)
    windows = periods if periods else DISPARITY_PERIODS

    for window in windows:
        # A zero average is turned into NaN to avoid dividing by zero.
        sma = prices.rolling(window).mean().replace(0, np.nan)
        result[disparity_column(window)] = (prices / sma) * 100.0
    return result
def disparity_zone(value: float | None) -> str:
"""이격도 구간 라벨 (oversold / mid / overbought)."""
if value is None:
return "mid"
if value <= DISPARITY_OVERSOLD:
return "oversold"
if value >= DISPARITY_OVERBOUGHT:
return "overbought"
return "mid"
def add_stochastic(
    df: pd.DataFrame,
    k_period: int = STOCH_K_PERIOD,
    d_period: int = STOCH_D_PERIOD,
    smooth_k: int = STOCH_SMOOTH_K,
) -> pd.DataFrame:
    """
    Return a copy of ``df`` with slow-stochastic %K and %D columns.

    Args:
        df: OHLCV DataFrame (``High``, ``Low`` and ``Close`` are required).
        k_period: Look-back window for the raw %K high/low range.
        d_period: SMA window applied to the smoothed %K to get %D.
        smooth_k: SMA window used to smooth the raw %K.

    Returns:
        DataFrame with stoch_k and stoch_d columns added.
    """
    result = df.copy()
    highs, lows, closes = (
        result[name].astype(float) for name in ("High", "Low", "Close")
    )

    window_low = lows.rolling(k_period).min()
    window_high = highs.rolling(k_period).max()
    # A flat window (high == low) yields NaN instead of a division by zero.
    price_range = (window_high - window_low).replace(0, np.nan)

    fast_k = ((closes - window_low) / price_range) * 100.0
    result["stoch_k"] = fast_k.rolling(smooth_k).mean()
    result["stoch_d"] = result["stoch_k"].rolling(d_period).mean()
    return result
def add_ichimoku(
    df: pd.DataFrame,
    tenkan: int = 9,
    kijun: int = 26,
    senkou_b_period: int = 52,
) -> pd.DataFrame:
    """
    Return a copy of ``df`` with Ichimoku lines and cloud bounds.

    All values are computed at the bar itself: the spans are not shifted
    forward, so no future data is used.

    Args:
        df: DataFrame with ``High`` and ``Low`` columns.
        tenkan: Window of the conversion line.
        kijun: Window of the base line.
        senkou_b_period: Window of leading span B.

    Returns:
        DataFrame with ichi_tenkan, ichi_kijun, ichi_span_a, ichi_span_b,
        ichi_cloud_top and ichi_cloud_bottom columns added.
    """
    out = df.copy()
    high = out["High"].astype(float)
    low = out["Low"].astype(float)

    def _midpoint(window: int) -> pd.Series:
        # Midpoint of the highest high and lowest low over the window.
        return (high.rolling(window).max() + low.rolling(window).min()) / 2

    out["ichi_tenkan"] = _midpoint(tenkan)
    out["ichi_kijun"] = _midpoint(kijun)
    out["ichi_span_a"] = (out["ichi_tenkan"] + out["ichi_kijun"]) / 2
    out["ichi_span_b"] = _midpoint(senkou_b_period)
    # np.maximum / np.minimum propagate NaN, so the cloud is undefined until
    # both spans have enough history.
    out["ichi_cloud_top"] = np.maximum(out["ichi_span_a"], out["ichi_span_b"])
    out["ichi_cloud_bottom"] = np.minimum(out["ichi_span_a"], out["ichi_span_b"])
    return out
def prepare_entry_df(data: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of ``data`` with RSI, 5-bar volume MA and BB width columns.

    RSI uses simple rolling means of gains and losses over ``RSI_PERIOD``.
    When a window has gains but no losses the RSI is 100; when the window is
    completely flat (no gains and no losses) the RSI stays NaN.

    Args:
        data: OHLCV DataFrame (``Close`` and ``Volume`` are required). If
            ``MA``, ``Upper`` and ``Lower`` are all present, ``BB_Width`` is
            (re)computed from them.

    Returns:
        DataFrame with RSI, VolMA5 and, when possible, BB_Width columns.
    """
    df = data.copy()
    delta = df["Close"].diff()
    avg_gain = delta.where(delta > 0, 0.0).rolling(RSI_PERIOD).mean()
    avg_loss = (-delta.where(delta < 0, 0.0)).rolling(RSI_PERIOD).mean()

    # Zero average loss is masked to NaN to avoid dividing by zero.
    rs = avg_gain / avg_loss.replace(0, np.nan)
    rsi = 100 - (100 / (1 + rs))
    # A window with gains and no losses is the RSI upper bound, not "unknown".
    only_gains = (avg_loss == 0) & (avg_gain > 0)
    df["RSI"] = rsi.mask(only_gains, 100.0)

    df["VolMA5"] = df["Volume"].rolling(5).mean()
    if "MA" in df.columns and "Upper" in df.columns and "Lower" in df.columns:
        ma = df["MA"].replace(0, np.nan)
        df["BB_Width"] = (df["Upper"] - df["Lower"]) / ma * 100
    return df
def apply_bar_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """
    Apply the standard indicator set used for bar analysis and charts.

    The steps run in a fixed order: Bollinger Bands, Ichimoku, the RSI/volume
    helper columns, disparity, MACD and stochastic.

    Args:
        df: OHLCV DataFrame.

    Returns:
        DataFrame carrying every indicator column produced by the steps.
    """
    pipeline = (
        add_bollinger,
        add_ichimoku,
        prepare_entry_df,
        add_disparity,
        add_macd,
        add_stochastic,
    )
    result = df
    for step in pipeline:
        result = step(result)
    return result
def latest_indicator_snapshot(df: pd.DataFrame) -> dict[str, float | str | None]:
"""
최신 봉의 BB·RSI·MACD·스토캐스틱 요약 (모니터·로그용).
Args:
df: apply_bar_indicators 적용된 DataFrame.
Returns:
지표명→값 dict.
"""
if df.empty:
return {}
row = df.iloc[-1]
def _f(col: str) -> float | None:
if col not in row.index or pd.isna(row[col]):
return None
return round(float(row[col]), 4)
macd_hist = _f("macd_hist")
stoch_k = _f("stoch_k")
stoch_d = _f("stoch_d")
stoch_zone = "mid"
if stoch_k is not None:
if stoch_k <= STOCH_OVERSOLD:
stoch_zone = "oversold"
elif stoch_k >= STOCH_OVERBOUGHT:
stoch_zone = "overbought"
macd_state = "neutral"
if macd_hist is not None:
macd_state = "bull" if macd_hist > 0 else "bear"
disp: dict[str, float | None] = {}
for p in DISPARITY_PERIODS:
col = disparity_column(p)
disp[col] = _f(col)
primary = disparity_column(DISPARITY_PERIODS[0]) if DISPARITY_PERIODS else None
disp_primary = disp.get(primary) if primary else None
return {
"bb_pos": _f("bb_pos"),
"rsi": _f("RSI"),
"disparity": disp,
"disparity_primary": disp_primary,
"disparity_zone": disparity_zone(disp_primary),
"macd_line": _f("macd_line"),
"macd_signal": _f("macd_signal"),
"macd_hist": macd_hist,
"macd_state": macd_state,
"stoch_k": stoch_k,
"stoch_d": stoch_d,
"stoch_zone": stoch_zone,
}
def _trend_from_ma20(df: pd.DataFrame) -> Trend:
"""
단일 TF에서 종가·MA20·MA40 관계로 추세를 판정합니다.
Args:
df: OHLCV+지표 DataFrame.
Returns:
up | down | range.
"""
if len(df) < 20:
return "range"
close = float(df["Close"].iloc[-1])
ma20_col = "MA20" if "MA20" in df.columns else "MA"
if ma20_col not in df.columns:
return "range"
ma20 = float(df[ma20_col].iloc[-1])
ma40 = float(df["MA40"].iloc[-1]) if "MA40" in df.columns and len(df) >= 40 else ma20
if ma40 == 0:
return "range"
gap = abs(ma20 - ma40) / ma40 * 100
if gap < TREND_RANGE_MA_GAP_PCT:
return "range"
if close > ma20 and ma20 > ma40:
return "up"
if close < ma20 and ma20 < ma40:
return "down"
return "range"
def get_trend(df_1d: pd.DataFrame, df_1h: pd.DataFrame) -> Trend:
    """
    Classify the trend from the daily and 1-hour frames.

    "up" requires the daily close above the daily MA20, the hourly MA20 above
    the hourly MA40 and the hourly close above the hourly MA20; "down" is the
    mirror image. Everything else is "range", including insufficient history,
    missing moving-average columns, a zero hourly MA40, or an MA20/MA40 gap
    below ``TREND_RANGE_MA_GAP_PCT``.

    Args:
        df_1d: Daily OHLCV DataFrame with an ``MA20`` column.
        df_1h: 1-hour OHLCV DataFrame with ``MA20`` and ``MA40`` columns.

    Returns:
        "up", "down" or "range".
    """
    if len(df_1d) < 20 or len(df_1h) < 40:
        return "range"
    # Without the moving averages no trend can be judged; report "range"
    # instead of raising, as the single-timeframe helper does.
    if "MA20" not in df_1d.columns:
        return "range"
    if "MA20" not in df_1h.columns or "MA40" not in df_1h.columns:
        return "range"

    d_close = float(df_1d["Close"].iloc[-1])
    d_ma20 = float(df_1d["MA20"].iloc[-1])
    h_close = float(df_1h["Close"].iloc[-1])
    h_ma20 = float(df_1h["MA20"].iloc[-1])
    h_ma40 = float(df_1h["MA40"].iloc[-1])
    if h_ma40 == 0:
        return "range"

    # Hourly averages that are too close together count as sideways.
    ma_gap_pct = abs(h_ma20 - h_ma40) / h_ma40 * 100
    if ma_gap_pct < TREND_RANGE_MA_GAP_PCT:
        return "range"

    if d_close > d_ma20 and h_ma20 > h_ma40 and h_close > h_ma20:
        return "up"
    if d_close < d_ma20 and h_ma20 < h_ma40 and h_close < h_ma20:
        return "down"
    return "range"
def get_mtf_trend_summary(
    frames: dict[int, pd.DataFrame],
    interval_keys: tuple[int, ...],
) -> dict[str, str]:
    """
    Summarise the trend of several timeframes.

    Intervals whose frame is missing or empty are left out of the result.

    Args:
        frames: Mapping of interval to an OHLCV DataFrame with indicators.
        interval_keys: Intervals to summarise, in output order.

    Returns:
        Mapping of interval label to "up", "down" or "range".
    """
    # Imported inside the function, as in the original code.
    from deepcoin.data.mtf_bb import interval_label

    summary: dict[str, str] = {}
    for interval in interval_keys:
        frame = frames.get(interval)
        if frame is not None and not frame.empty:
            trend = _trend_from_ma20(frame)
            summary[interval_label(interval)] = trend
    return summary