Files
Bithumb/deepcoin/analysis/general_analysis_indicators.py
dsyoon b52d61b777 WLD DeepCoin 단계별 구조 재편 및 설정·문서 통합
로고스/루트 레거시를 제거하고 deepcoin 패키지·scripts 01~05 CLI·docs/reference로
데이터·GT·분석·매칭·운영 단계를 정리했다. config와 .env 기반 설정, trade_anaysis.html 동기화 포함.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-30 22:58:25 +09:00

383 lines
13 KiB
Python

"""
general_analysis 확장 기술적 지표 (추세·모멘텀·변동성·거래량).
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from config import (
BB_PERIOD,
BB_STD,
GA_ADX_PERIOD,
GA_ADX_TREND_THRESHOLD,
GA_AO_FAST,
GA_AO_SLOW,
GA_ATR_PERIOD,
GA_BB_SQUEEZE_QUANTILE,
GA_BB_SQUEEZE_WINDOW,
GA_CCI_PERIOD,
GA_CMF_PERIOD,
GA_DIVERGENCE_LOOKBACK,
GA_DONCHIAN_PERIOD,
GA_EMA_SPANS,
GA_HV_ANNUALIZE_SQRT,
GA_HV_PERCENTILE_WINDOW,
GA_HV_ROLLING_BARS,
GA_KELTNER_ATR_MULT,
GA_LINREG_WINDOW,
GA_MFI_PERIOD,
GA_PSAR_AF_MAX,
GA_PSAR_AF_START,
GA_PSAR_AF_STEP,
GA_ROC_PERIOD,
GA_SMA_PERIODS,
GA_SUPERTREND_ATR_MULT,
GA_VOL_MA_WINDOW,
GA_WILLIAMS_PERIOD,
)
from deepcoin.analysis.general_analysis_core import ga_col
from deepcoin.common.indicators import apply_bar_indicators
def _ema(series: pd.Series, span: int) -> pd.Series:
return series.ewm(span=span, adjust=False).mean()
def _parabolic_sar(
high: np.ndarray,
low: np.ndarray,
af_start: float = GA_PSAR_AF_START,
af_step: float = GA_PSAR_AF_STEP,
af_max: float = GA_PSAR_AF_MAX,
) -> tuple[np.ndarray, np.ndarray]:
"""
Parabolic SAR 시계열.
Returns:
(sar, bull_flag 0/1)
"""
n = len(high)
sar = np.zeros(n)
bull = np.ones(n, dtype=int)
if n < 2:
return sar, bull
is_bull = True
af = af_start
ep = high[0]
sar[0] = low[0]
for i in range(1, n):
prev_sar = sar[i - 1]
if is_bull:
sar[i] = prev_sar + af * (ep - prev_sar)
sar[i] = min(sar[i], low[i - 1], low[i] if i > 0 else low[i - 1])
if low[i] < sar[i]:
is_bull = False
sar[i] = ep
ep = low[i]
af = af_start
else:
if high[i] > ep:
ep = high[i]
af = min(af + af_step, af_max)
else:
sar[i] = prev_sar + af * (ep - prev_sar)
sar[i] = max(sar[i], high[i - 1], high[i] if i > 0 else high[i - 1])
if high[i] > sar[i]:
is_bull = True
sar[i] = ep
ep = high[i]
af = af_start
else:
if low[i] < ep:
ep = low[i]
af = min(af + af_step, af_max)
bull[i] = int(is_bull)
return sar, bull
def general_analysis_apply_indicators(df: pd.DataFrame) -> pd.DataFrame:
"""
기존 apply_bar_indicators 위에 ga_ 접두사 확장 지표를 추가합니다.
Args:
df: OHLCV.
Returns:
ga_* 컬럼이 추가된 DataFrame.
"""
out = apply_bar_indicators(df.copy())
o = out["Open"].astype(float)
h = out["High"].astype(float)
l = out["Low"].astype(float)
c = out["Close"].astype(float)
v = out["Volume"].astype(float)
# --- 추세: MA ---
sma_fast = GA_SMA_PERIODS[0] if GA_SMA_PERIODS else 5
sma_slow = GA_SMA_PERIODS[1] if len(GA_SMA_PERIODS) > 1 else 20
for p in GA_SMA_PERIODS:
ma = c.rolling(p).mean()
out[ga_col(f"sma_{p}")] = ma
out[ga_col(f"close_vs_sma_{p}_pct")] = (c / ma.replace(0, np.nan) - 1) * 100
ema_fast = GA_EMA_SPANS[0] if GA_EMA_SPANS else 12
ema_slow = GA_EMA_SPANS[1] if len(GA_EMA_SPANS) > 1 else 26
out[ga_col(f"ema_{ema_fast}")] = _ema(c, ema_fast)
out[ga_col(f"ema_{ema_slow}")] = _ema(c, ema_slow)
out[ga_col("golden_cross")] = (
(out[ga_col(f"sma_{sma_fast}")] > out[ga_col(f"sma_{sma_slow}")])
& (out[ga_col(f"sma_{sma_fast}")].shift(1) <= out[ga_col(f"sma_{sma_slow}")].shift(1))
).astype(int)
out[ga_col("death_cross")] = (
(out[ga_col(f"sma_{sma_fast}")] < out[ga_col(f"sma_{sma_slow}")])
& (out[ga_col(f"sma_{sma_fast}")].shift(1) >= out[ga_col(f"sma_{sma_slow}")].shift(1))
).astype(int)
# --- ATR / 변동성 ---
tr = pd.concat(
[
h - l,
(h - c.shift(1)).abs(),
(l - c.shift(1)).abs(),
],
axis=1,
).max(axis=1)
atr = tr.rolling(GA_ATR_PERIOD).mean()
out[ga_col("atr_14")] = atr
out[ga_col("atr_pct")] = atr / c.replace(0, np.nan) * 100
out[ga_col("bb_width_pct")] = out.get("BB_Width", (out["Upper"] - out["Lower"]) / out["MA"] * 100)
bw = out[ga_col("bb_width_pct")].astype(float)
out[ga_col("bb_squeeze")] = (
bw < bw.rolling(GA_BB_SQUEEZE_WINDOW).quantile(GA_BB_SQUEEZE_QUANTILE)
).astype(int)
dc = GA_DONCHIAN_PERIOD
out[ga_col("donchian_high_20")] = h.rolling(dc).max()
out[ga_col("donchian_low_20")] = l.rolling(dc).min()
out[ga_col("donchian_pos")] = (c - out[ga_col("donchian_low_20")]) / (
out[ga_col("donchian_high_20")] - out[ga_col("donchian_low_20")]
).replace(0, np.nan)
# --- 모멘텀: CCI, Williams %R ---
tp = (h + l + c) / 3
cci_period = GA_CCI_PERIOD
sma_tp = tp.rolling(cci_period).mean()
mad = tp.rolling(cci_period).apply(lambda x: np.abs(x - x.mean()).mean(), raw=True)
out[ga_col("cci_20")] = (tp - sma_tp) / (0.015 * mad.replace(0, np.nan))
out[ga_col("cci_oversold")] = (out[ga_col("cci_20")] < -100).astype(int)
out[ga_col("cci_overbought")] = (out[ga_col("cci_20")] > 100).astype(int)
hh = h.rolling(GA_WILLIAMS_PERIOD).max()
ll = l.rolling(GA_WILLIAMS_PERIOD).min()
out[ga_col("williams_r")] = (hh - c) / (hh - ll).replace(0, np.nan) * -100
out[ga_col("williams_oversold")] = (out[ga_col("williams_r")] < -80).astype(int)
out[ga_col("williams_overbought")] = (out[ga_col("williams_r")] > -20).astype(int)
div_lb = GA_DIVERGENCE_LOOKBACK
out[ga_col("roc_10")] = (c / c.shift(GA_ROC_PERIOD).replace(0, np.nan) - 1) * 100
# MFI
raw_mf = tp * v
pos_mf = raw_mf.where(tp > tp.shift(1), 0.0).rolling(GA_MFI_PERIOD).sum()
neg_mf = raw_mf.where(tp < tp.shift(1), 0.0).rolling(GA_MFI_PERIOD).sum()
mfr = pos_mf / neg_mf.replace(0, np.nan)
out[ga_col("mfi_14")] = 100 - (100 / (1 + mfr))
# MACD / RSI / Stoch 다이버전스
if "RSI" in out.columns and "macd_hist" in out.columns:
price_up = (c > c.shift(div_lb)).astype(int)
rsi_up = (out["RSI"] > out["RSI"].shift(div_lb)).astype(int)
macd_up = (out["macd_hist"] > out["macd_hist"].shift(div_lb)).astype(int)
out[ga_col("rsi_bull_div")] = ((price_up == 0) & (rsi_up == 1)).astype(int)
out[ga_col("rsi_bear_div")] = ((price_up == 1) & (rsi_up == 0)).astype(int)
out[ga_col("macd_bull_div")] = ((price_up == 0) & (macd_up == 1)).astype(int)
out[ga_col("macd_bear_div")] = ((price_up == 1) & (macd_up == 0)).astype(int)
if "stoch_k" in out.columns:
price_up = (c > c.shift(div_lb)).astype(int)
st_up = (out["stoch_k"] > out["stoch_k"].shift(div_lb)).astype(int)
out[ga_col("stoch_bull_div")] = ((price_up == 0) & (st_up == 1)).astype(int)
out[ga_col("stoch_bear_div")] = ((price_up == 1) & (st_up == 0)).astype(int)
# 봉 간 변화 (타점 Δ와 동일 정의, 전 구간)
if "RSI" in out.columns:
out[ga_col("rsi_delta_1")] = out["RSI"].diff()
if "macd_hist" in out.columns:
out[ga_col("macd_hist_delta_1")] = out["macd_hist"].diff()
if "stoch_k" in out.columns:
out[ga_col("stoch_k_delta_1")] = out["stoch_k"].diff()
# --- 거래량 ---
vol_ma = v.rolling(GA_VOL_MA_WINDOW).mean()
out[ga_col("vol_ma20")] = vol_ma
out[ga_col("vol_ratio")] = v / vol_ma.replace(0, np.nan)
out[ga_col("obv")] = (np.sign(c.diff().fillna(0)) * v).cumsum()
obv = out[ga_col("obv")].astype(float)
out[ga_col("obv_slope_10")] = obv - obv.shift(div_lb)
out[ga_col("obv_bull_div")] = (
(c < c.shift(div_lb)) & (obv > obv.shift(div_lb))
).astype(int)
out[ga_col("obv_bear_div")] = (
(c > c.shift(div_lb)) & (obv < obv.shift(div_lb))
).astype(int)
# CMF
mfv = ((c - l) - (h - c)) / (h - l).replace(0, np.nan) * v
out[ga_col("cmf_20")] = (
mfv.rolling(GA_CMF_PERIOD).sum() / v.rolling(GA_CMF_PERIOD).sum().replace(0, np.nan)
)
# Accumulation/Distribution Line
clv = ((c - l) - (h - c)) / (h - l).replace(0, np.nan)
out[ga_col("ad_line")] = (clv * v).cumsum()
ad = out[ga_col("ad_line")].astype(float)
out[ga_col("ad_slope_10")] = ad - ad.shift(div_lb)
# VWAP 근사 (누적, 세션 리셋 없음)
cum_vp = (tp * v).cumsum()
cum_v = v.cumsum().replace(0, np.nan)
out[ga_col("vwap")] = cum_vp / cum_v
out[ga_col("close_vs_vwap_pct")] = (c / out[ga_col("vwap")] - 1) * 100
# Keltner Channel
k_mid = _ema(c, BB_PERIOD)
out[ga_col("keltner_mid")] = k_mid
out[ga_col("keltner_upper")] = k_mid + GA_KELTNER_ATR_MULT * atr
out[ga_col("keltner_lower")] = k_mid - GA_KELTNER_ATR_MULT * atr
out[ga_col("keltner_pos")] = (c - out[ga_col("keltner_lower")]) / (
out[ga_col("keltner_upper")] - out[ga_col("keltner_lower")]
).replace(0, np.nan)
# Awesome Oscillator: median price SMA5 - SMA34
mp = (h + l) / 2
out[ga_col("ao")] = mp.rolling(GA_AO_FAST).mean() - mp.rolling(GA_AO_SLOW).mean()
out[ga_col("ao_bull")] = (
(out[ga_col("ao")] > 0) & (out[ga_col("ao")].shift(1) <= 0)
).astype(int)
out[ga_col("ao_bear")] = (
(out[ga_col("ao")] < 0) & (out[ga_col("ao")].shift(1) >= 0)
).astype(int)
# Historical Volatility (로그수익 20봉 표준편차, 연율화 계수 1=봉 단위)
log_ret = np.log(c / c.shift(1).replace(0, np.nan))
hv = log_ret.rolling(GA_HV_ROLLING_BARS).std() * GA_HV_ANNUALIZE_SQRT
out[ga_col("hv_20")] = hv
out[ga_col("hv_percentile")] = hv.rolling(GA_HV_PERCENTILE_WINDOW).apply(
lambda x: float((x[:-1] < x[-1]).mean()) if len(x) > 1 and not np.isnan(x[-1]) else 0.5,
raw=True,
)
# Parabolic SAR
sar, psar_bull = _parabolic_sar(h.values, l.values)
out[ga_col("psar")] = sar
out[ga_col("psar_bull")] = psar_bull
ps = pd.Series(psar_bull, index=out.index)
out[ga_col("psar_flip_bull")] = ((ps == 1) & (ps.shift(1) == 0)).astype(int)
out[ga_col("psar_flip_bear")] = ((ps == 0) & (ps.shift(1) == 1)).astype(int)
# ADX
up_move = h.diff()
down_move = -l.diff()
plus_dm = np.where((up_move > down_move) & (up_move > 0), up_move, 0.0)
minus_dm = np.where((down_move > up_move) & (down_move > 0), down_move, 0.0)
atr_safe = atr.replace(0, np.nan)
plus_di = 100 * pd.Series(plus_dm, index=out.index).rolling(GA_ADX_PERIOD).mean() / atr_safe
minus_di = 100 * pd.Series(minus_dm, index=out.index).rolling(GA_ADX_PERIOD).mean() / atr_safe
dx = (plus_di - minus_di).abs() / (plus_di + minus_di).replace(0, np.nan) * 100
out[ga_col("adx_14")] = dx.rolling(GA_ADX_PERIOD).mean()
out[ga_col("plus_di")] = plus_di
out[ga_col("minus_di")] = minus_di
out[ga_col("adx_trending")] = (out[ga_col("adx_14")] > GA_ADX_TREND_THRESHOLD).astype(int)
# Supertrend 방향 (ATR 밴드)
hl2 = (h + l) / 2
upper = hl2 + GA_SUPERTREND_ATR_MULT * atr
lower = hl2 - GA_SUPERTREND_ATR_MULT * atr
out[ga_col("supertrend_bull")] = (c > lower).astype(int)
# Linear regression slope 20
def _lin_slope(y: np.ndarray) -> float:
if len(y) < 2:
return 0.0
x = np.arange(len(y))
coef = np.polyfit(x, y, 1)
return float(coef[0])
out[ga_col("linreg_slope_20")] = c.rolling(GA_LINREG_WINDOW).apply(_lin_slope, raw=True)
def _lin_r2(y: np.ndarray) -> float:
if len(y) < 3:
return 0.0
x = np.arange(len(y))
coef = np.polyfit(x, y, 1)
pred = coef[0] * x + coef[1]
ss_res = ((y - pred) ** 2).sum()
ss_tot = ((y - y.mean()) ** 2).sum()
if ss_tot < 1e-12:
return 0.0
return float(1 - ss_res / ss_tot)
out[ga_col("linreg_r2_20")] = c.rolling(GA_LINREG_WINDOW).apply(_lin_r2, raw=True)
return out
def general_analysis_indicator_columns() -> list[str]:
"""스냅샷용 ga_ 지표 컬럼 목록."""
return [
"sma_5",
"sma_20",
"sma_60",
"close_vs_sma_20_pct",
"golden_cross",
"death_cross",
"atr_14",
"atr_pct",
"bb_squeeze",
"donchian_pos",
"cci_20",
"cci_oversold",
"cci_overbought",
"williams_r",
"williams_oversold",
"williams_overbought",
"roc_10",
"mfi_14",
"rsi_bull_div",
"rsi_bear_div",
"macd_bull_div",
"macd_bear_div",
"stoch_bull_div",
"stoch_bear_div",
"rsi_delta_1",
"macd_hist_delta_1",
"stoch_k_delta_1",
"keltner_pos",
"ao",
"ao_bull",
"ao_bear",
"hv_20",
"hv_percentile",
"ad_line",
"ad_slope_10",
"vol_ratio",
"obv_slope_10",
"obv_bull_div",
"obv_bear_div",
"cmf_20",
"close_vs_vwap_pct",
"adx_14",
"adx_trending",
"supertrend_bull",
"linreg_slope_20",
"linreg_r2_20",
"psar",
"psar_bull",
"psar_flip_bull",
"psar_flip_bear",
]