""" 볼린저 밴드·일목·MACD·스토캐스틱·RSI·이격도 계산 (모든 봉 간격 공용). """ from __future__ import annotations import numpy as np import pandas as pd from config import ( BB_PERIOD, BB_STD, DISPARITY_OVERBOUGHT, DISPARITY_OVERSOLD, DISPARITY_PERIODS, MACD_FAST, MACD_SIGNAL, MACD_SLOW, RSI_PERIOD, STOCH_D_PERIOD, STOCH_K_PERIOD, STOCH_OVERBOUGHT, STOCH_OVERSOLD, STOCH_SMOOTH_K, TREND_RANGE_MA_GAP_PCT, ) Trend = str # "up" | "down" | "range" def add_bollinger( df: pd.DataFrame, period: int = BB_PERIOD, std_mult: float = BB_STD, ) -> pd.DataFrame: """ 볼린저 밴드 컬럼을 추가합니다. Args: df: OHLCV DataFrame. period: 중심선 기간. std_mult: 표준편차 배수. Returns: MA, Upper, Lower, STD, bb_pos, BB_Width 가 추가된 DataFrame. """ out = df.copy() if "MA" not in out.columns: out["MA"] = out["Close"].rolling(period).mean() if "Upper" not in out.columns or "Lower" not in out.columns: std = out["Close"].rolling(period).std() out["STD"] = std out["Upper"] = out["MA"] + std_mult * std out["Lower"] = out["MA"] - std_mult * std ma = out["MA"].replace(0, np.nan) band = (out["Upper"] - out["Lower"]).replace(0, np.nan) out["bb_pos"] = ((out["Close"] - out["Lower"]) / band).clip(0, 1) out["BB_Width"] = band / ma * 100 return out def add_macd( df: pd.DataFrame, fast: int = MACD_FAST, slow: int = MACD_SLOW, signal_period: int = MACD_SIGNAL, ) -> pd.DataFrame: """ MACD(12,26,9) 라인·시그널·히스토그램을 추가합니다. Args: df: OHLCV (Close 필요). fast: 단기 EMA 기간. slow: 장기 EMA 기간. signal_period: 시그널 EMA 기간. Returns: macd_line, macd_signal, macd_hist 컬럼이 추가된 DataFrame. """ out = df.copy() close = out["Close"].astype(float) ema_fast = close.ewm(span=fast, adjust=False).mean() ema_slow = close.ewm(span=slow, adjust=False).mean() out["macd_line"] = ema_fast - ema_slow out["macd_signal"] = out["macd_line"].ewm(span=signal_period, adjust=False).mean() out["macd_hist"] = out["macd_line"] - out["macd_signal"] return out def disparity_column(period: int) -> str: """이격도 컬럼명 (예: disparity_20).""" return f"disparity_{period}" def add_disparity( df: pd.DataFrame, periods: tuple[int, ...] | None = None, ) -> pd.DataFrame: """ 이격도 = (종가 / SMA(n)) × 100. 100이면 이평선과 동일 위치. Args: df: OHLCV (Close 필요). periods: SMA 기간 목록. None이면 config.DISPARITY_PERIODS. Returns: disparity_{n} 컬럼이 추가된 DataFrame. """ out = df.copy() close = out["Close"].astype(float) for p in periods or DISPARITY_PERIODS: ma = close.rolling(p).mean() out[disparity_column(p)] = (close / ma.replace(0, np.nan)) * 100.0 return out def disparity_zone(value: float | None) -> str: """이격도 구간 라벨 (oversold / mid / overbought).""" if value is None: return "mid" if value <= DISPARITY_OVERSOLD: return "oversold" if value >= DISPARITY_OVERBOUGHT: return "overbought" return "mid" def add_stochastic( df: pd.DataFrame, k_period: int = STOCH_K_PERIOD, d_period: int = STOCH_D_PERIOD, smooth_k: int = STOCH_SMOOTH_K, ) -> pd.DataFrame: """ 스토캐스틱 %K·%D를 추가합니다 (Slow Stochastic). Args: df: OHLCV (High, Low, Close 필요). k_period: %K lookback. d_period: %D SMA 기간. smooth_k: %K SMA 평활 기간. Returns: stoch_k, stoch_d 컬럼이 추가된 DataFrame. """ out = df.copy() h = out["High"].astype(float) l = out["Low"].astype(float) c = out["Close"].astype(float) lowest = l.rolling(k_period).min() highest = h.rolling(k_period).max() denom = (highest - lowest).replace(0, np.nan) raw_k = ((c - lowest) / denom) * 100.0 out["stoch_k"] = raw_k.rolling(smooth_k).mean() out["stoch_d"] = out["stoch_k"].rolling(d_period).mean() return out def add_ichimoku( df: pd.DataFrame, tenkan: int = 9, kijun: int = 26, senkou_b_period: int = 52, ) -> pd.DataFrame: """ 일목균형표 라인·구름 위치 컬럼 추가 (해당 봉 시점, 미래 데이터 미사용). Returns: ichi_tenkan, ichi_kijun, ichi_span_a, ichi_span_b, ichi_cloud_top, ichi_cloud_bottom """ out = df.copy() h = out["High"].astype(float) l = out["Low"].astype(float) c = out["Close"].astype(float) out["ichi_tenkan"] = (h.rolling(tenkan).max() + l.rolling(tenkan).min()) / 2 out["ichi_kijun"] = (h.rolling(kijun).max() + l.rolling(kijun).min()) / 2 out["ichi_span_a"] = (out["ichi_tenkan"] + out["ichi_kijun"]) / 2 out["ichi_span_b"] = (h.rolling(senkou_b_period).max() + l.rolling(senkou_b_period).min()) / 2 out["ichi_cloud_top"] = np.maximum(out["ichi_span_a"], out["ichi_span_b"]) out["ichi_cloud_bottom"] = np.minimum(out["ichi_span_a"], out["ichi_span_b"]) return out def prepare_entry_df(data: pd.DataFrame) -> pd.DataFrame: """ RSI·거래량 MA·BB 폭 등 보조 컬럼을 추가합니다. Args: data: BB(MA/Upper/Lower)가 계산된 OHLCV. Returns: RSI 등 컬럼이 추가된 DataFrame. """ df = data.copy() delta = df["Close"].diff() gain = delta.where(delta > 0, 0.0).rolling(RSI_PERIOD).mean() loss = (-delta.where(delta < 0, 0.0)).rolling(RSI_PERIOD).mean() rs = gain / loss.replace(0, np.nan) df["RSI"] = 100 - (100 / (1 + rs)) df["VolMA5"] = df["Volume"].rolling(5).mean() if "MA" in df.columns and "Upper" in df.columns and "Lower" in df.columns: ma = df["MA"].replace(0, np.nan) df["BB_Width"] = (df["Upper"] - df["Lower"]) / ma * 100 return df def apply_bar_indicators(df: pd.DataFrame) -> pd.DataFrame: """ 봉 분석·차트용 표준 지표 일괄 적용 (BB, 일목, RSI, MACD, 스토캐스틱, 이격도). Args: df: OHLCV DataFrame (datetime index). Returns: 모든 지표 컬럼이 붙은 DataFrame. """ out = add_bollinger(df) out = add_ichimoku(out) out = prepare_entry_df(out) out = add_disparity(out) out = add_macd(out) out = add_stochastic(out) return out def latest_indicator_snapshot(df: pd.DataFrame) -> dict[str, float | str | None]: """ 최신 봉의 BB·RSI·MACD·스토캐스틱 요약 (모니터·로그용). Args: df: apply_bar_indicators 적용된 DataFrame. Returns: 지표명→값 dict. """ if df.empty: return {} row = df.iloc[-1] def _f(col: str) -> float | None: if col not in row.index or pd.isna(row[col]): return None return round(float(row[col]), 4) macd_hist = _f("macd_hist") stoch_k = _f("stoch_k") stoch_d = _f("stoch_d") stoch_zone = "mid" if stoch_k is not None: if stoch_k <= STOCH_OVERSOLD: stoch_zone = "oversold" elif stoch_k >= STOCH_OVERBOUGHT: stoch_zone = "overbought" macd_state = "neutral" if macd_hist is not None: macd_state = "bull" if macd_hist > 0 else "bear" disp: dict[str, float | None] = {} for p in DISPARITY_PERIODS: col = disparity_column(p) disp[col] = _f(col) primary = disparity_column(DISPARITY_PERIODS[0]) if DISPARITY_PERIODS else None disp_primary = disp.get(primary) if primary else None return { "bb_pos": _f("bb_pos"), "rsi": _f("RSI"), "disparity": disp, "disparity_primary": disp_primary, "disparity_zone": disparity_zone(disp_primary), "macd_line": _f("macd_line"), "macd_signal": _f("macd_signal"), "macd_hist": macd_hist, "macd_state": macd_state, "stoch_k": stoch_k, "stoch_d": stoch_d, "stoch_zone": stoch_zone, } def get_trend(df_1d: pd.DataFrame, df_1h: pd.DataFrame) -> Trend: """ 일봉·1시간봉 기준 추세(up/down/range)를 반환합니다. Args: df_1d: 일봉 OHLCV+지표. df_1h: 1시간봉 OHLCV+지표. Returns: 추세 문자열. """ if len(df_1d) < 20 or len(df_1h) < 40: return "range" d_close = float(df_1d["Close"].iloc[-1]) d_ma20 = float(df_1d["MA20"].iloc[-1]) h_close = float(df_1h["Close"].iloc[-1]) h_ma20 = float(df_1h["MA20"].iloc[-1]) h_ma40 = float(df_1h["MA40"].iloc[-1]) if h_ma40 == 0: return "range" ma_gap_pct = abs(h_ma20 - h_ma40) / h_ma40 * 100 if ma_gap_pct < TREND_RANGE_MA_GAP_PCT: return "range" if d_close > d_ma20 and h_ma20 > h_ma40 and h_close > h_ma20: return "up" if d_close < d_ma20 and h_ma20 < h_ma40 and h_close < h_ma20: return "down" return "range"