""" general_analysis 파동·시장 구조 (다우, 엘리어트 라이트, 피보나치, Wyckoff 태그). """ from __future__ import annotations import numpy as np import pandas as pd from deepcoin.analysis.general_analysis_core import find_pivots, ga_col def _fib_levels(low: float, high: float) -> dict[str, float]: diff = high - low return { "fib_0": low, "fib_382": low + diff * 0.382, "fib_500": low + diff * 0.5, "fib_618": low + diff * 0.618, "fib_100": high, "fib_1618": low + diff * 1.618, } def general_analysis_wave_snapshot(win: pd.DataFrame) -> dict[str, object]: """ lookback 윈도우 마지막 시점 파동·구조 스냅샷. Args: win: OHLCV. Returns: ga_wave_* / ga_struct_* / ga_fib_* dict. """ res: dict[str, object] = { "struct_trend": "range", "struct_hh": 0, "struct_hl": 0, "struct_lh": 0, "struct_ll": 0, "struct_bos_bull": 0, "struct_bos_bear": 0, "struct_choch": 0, "elliott_wave_count": 0, "elliott_phase": "unknown", "wyckoff_phase": "unknown", "fib_near_level": "none", "ichi_trend": "neutral", "pitchfork_bias": "neutral", "pitchfork_dist_pct": 0.0, "wyckoff_spring": 0, "wyckoff_utad": 0, } if win is None or len(win) < 15: return {ga_col(k): v for k, v in res.items()} h = win["High"].astype(float).values l = win["Low"].astype(float).values c = win["Close"].astype(float).values peaks, troughs = find_pivots(h, l, order=2) # Dow HH/HL/LH/LL if len(peaks) >= 2 and len(troughs) >= 2: hh = int(h[peaks[-1]] > h[peaks[-2]]) hl = int(l[troughs[-1]] > l[troughs[-2]]) lh = int(h[peaks[-1]] < h[peaks[-2]]) ll = int(l[troughs[-1]] < l[troughs[-2]]) res["struct_hh"] = hh res["struct_hl"] = hl res["struct_lh"] = lh res["struct_ll"] = ll if hh and hl: res["struct_trend"] = "up" elif lh and ll: res["struct_trend"] = "down" if hh and c[-1] > h[peaks[-2]]: res["struct_bos_bull"] = 1 if ll and c[-1] < l[troughs[-2]]: res["struct_bos_bear"] = 1 if (hh and ll) or (lh and hl): res["struct_choch"] = 1 # Elliott lite: pivot count in window swings = len(peaks) + len(troughs) res["elliott_wave_count"] = swings if swings >= 5: res["elliott_phase"] = "impulse_late" elif swings >= 3: res["elliott_phase"] = "corrective" # Wyckoff lite vol = win["Volume"].astype(float).values if "Volume" in win.columns else np.ones(len(c)) vol_ma = vol[-20:].mean() if len(vol) >= 20 else vol.mean() price_range = (h[-20:].max() - l[-20:].min()) / max(c[-1], 1e-9) * 100 if price_range < 6 and vol[-1] < vol_ma * 1.2: res["wyckoff_phase"] = "accumulation" elif price_range < 6 and vol[-1] > vol_ma * 1.5 and c[-1] > c[-5]: res["wyckoff_phase"] = "distribution" if price_range < 8 and l[-1] < l[-5] and c[-1] > c[-2] and vol[-1] > vol_ma * 1.3: res["wyckoff_spring"] = 1 if price_range < 8 and h[-1] > h[-5] and c[-1] < c[-2] and vol[-1] > vol_ma * 1.3: res["wyckoff_utad"] = 1 # Andrews Pitchfork (3피벗 중앙선 대비 종가 위치) pivots = sorted([(i, h[i]) for i in peaks] + [(i, l[i]) for i in troughs]) if len(pivots) >= 3: p0, p1, p2 = pivots[-3], pivots[-2], pivots[-1] y0 = p0[1] y_mid = (p1[1] + p2[1]) / 2 x0, x2 = p0[0], p2[0] if x2 != x0: slope = (y_mid - y0) / (x2 - x0) y_line = y0 + slope * (len(c) - 1 - x0) dist_pct = (c[-1] - y_line) / max(c[-1], 1e-9) * 100 res["pitchfork_dist_pct"] = round(float(dist_pct), 3) if dist_pct > 0.5: res["pitchfork_bias"] = "above" elif dist_pct < -0.5: res["pitchfork_bias"] = "below" # Fibonacci hi, lo = float(h.max()), float(l.min()) levels = _fib_levels(lo, hi) price = float(c[-1]) for name, lvl in levels.items(): if abs(price - lvl) / max(price, 1e-9) * 100 < 1.5: res["fib_near_level"] = name.replace("fib_", "") break if "ichi_cloud_top" in win.columns: row = win.iloc[-1] ct = float(row.get("ichi_cloud_top", np.nan)) cb = float(row.get("ichi_cloud_bottom", np.nan)) if not np.isnan(ct) and price > ct: res["ichi_trend"] = "above_cloud" elif not np.isnan(cb) and price < cb: res["ichi_trend"] = "below_cloud" else: res["ichi_trend"] = "in_cloud" return {ga_col(k): v for k, v in res.items()} def general_analysis_wave_columns() -> list[str]: return [ "struct_trend", "struct_hh", "struct_hl", "struct_lh", "struct_ll", "struct_bos_bull", "struct_bos_bear", "struct_choch", "elliott_wave_count", "elliott_phase", "wyckoff_phase", "fib_near_level", "ichi_trend", "pitchfork_bias", "pitchfork_dist_pct", "wyckoff_spring", "wyckoff_utad", ] def general_analysis_apply_wave_to_bars( df: pd.DataFrame, interval: int, tail_rows: int | None = None, ) -> pd.DataFrame: """파동·구조 스냅샷을 최근 봉에 롤링 적용.""" from deepcoin.analysis.general_analysis_config import CONTEXT_TAIL_ROWS, LOOKBACK_BARS out = df.copy() lb = LOOKBACK_BARS.get(interval, 80) for k in general_analysis_wave_columns(): col = ga_col(k) if k == "struct_trend": out[col] = "range" elif k in ("elliott_phase", "wyckoff_phase"): out[col] = "unknown" elif k == "fib_near_level": out[col] = "none" elif k in ("ichi_trend", "pitchfork_bias"): out[col] = "neutral" elif k == "pitchfork_dist_pct": out[col] = 0.0 else: out[col] = 0 n = len(out) if n < lb + 1: return out if tail_rows is None: tail_rows = CONTEXT_TAIL_ROWS.get(interval, 5000) start = max(lb, n - tail_rows) for i in range(start, n): snap = general_analysis_wave_snapshot(out.iloc[i - lb : i]) idx = out.index[i] for k, v in snap.items(): out.at[idx, k] = v return out