""" general_analysis 차트·가격 패턴 (반전·지속·박스). """ from __future__ import annotations import numpy as np import pandas as pd from config import GA_PATTERN_TOLERANCE_PCT, GA_PIVOT_ORDER from deepcoin.analysis.general_analysis_core import find_pivots, ga_col, last_row_dict def _pct_diff(a: float, b: float) -> float: return abs(a - b) / max(abs(a), abs(b), 1e-9) * 100 def general_analysis_detect_patterns(win: pd.DataFrame) -> dict[str, int | float | str | None]: """ lookback 윈도우 마지막 봉 기준 패턴 라벨 (0/1 및 요약). Args: win: OHLCV (+ 지표 선택). Returns: ga_pattern_* 키 dict (접두사 없음, ga_col로 감쌈). """ res: dict[str, int | float | str | None] = { "pattern_double_top": 0, "pattern_double_bottom": 0, "pattern_head_shoulders": 0, "pattern_inv_head_shoulders": 0, "pattern_triangle_sym": 0, "pattern_triangle_asc": 0, "pattern_triangle_desc": 0, "pattern_flag_bull": 0, "pattern_flag_bear": 0, "pattern_wedge_rising": 0, "pattern_wedge_falling": 0, "pattern_rectangle": 0, "pattern_channel_up": 0, "pattern_channel_down": 0, "pattern_measured_move": 0, "pattern_rounding_top": 0, "pattern_rounding_bottom": 0, "pattern_gap_up": 0, "pattern_gap_down": 0, "pattern_v_bottom": 0, "pattern_spike_top": 0, "pattern_triple_top": 0, "pattern_triple_bottom": 0, "pattern_cup_handle": 0, "pattern_keystone_bull": 0, "pattern_keystone_bear": 0, "pattern_island_top": 0, "pattern_island_bottom": 0, "pattern_label": "none", } if win is None or len(win) < 20: return res h = win["High"].astype(float).values l = win["Low"].astype(float).values c = win["Close"].astype(float).values peaks, troughs = find_pivots(h, l, order=GA_PIVOT_ORDER) tol = GA_PATTERN_TOLERANCE_PCT if len(peaks) >= 3: p1, p2, p3 = peaks[-3], peaks[-2], peaks[-1] if ( _pct_diff(h[p1], h[p2]) < tol and _pct_diff(h[p2], h[p3]) < tol and p1 < p2 < p3 ): res["pattern_triple_top"] = 1 res["pattern_label"] = "triple_top" if len(troughs) >= 3: t1, t2, t3 = troughs[-3], troughs[-2], troughs[-1] if ( _pct_diff(l[t1], l[t2]) < tol and _pct_diff(l[t2], l[t3]) < tol and t1 < t2 < t3 ): res["pattern_triple_bottom"] = 1 res["pattern_label"] = "triple_bottom" if len(peaks) >= 2: p1, p2 = peaks[-2], peaks[-1] if _pct_diff(h[p1], h[p2]) < tol: res["pattern_double_top"] = 1 if res["pattern_label"] == "none": res["pattern_label"] = "double_top" if len(troughs) >= 2: t1, t2 = troughs[-2], troughs[-1] if _pct_diff(l[t1], l[t2]) < tol: res["pattern_double_bottom"] = 1 res["pattern_label"] = "double_bottom" if len(peaks) >= 3: i, j, k = peaks[-3], peaks[-2], peaks[-1] if h[j] > h[i] and h[j] > h[k] and _pct_diff(h[i], h[k]) < tol * 1.5: res["pattern_head_shoulders"] = 1 res["pattern_label"] = "head_shoulders" if len(troughs) >= 3: i, j, k = troughs[-3], troughs[-2], troughs[-1] if l[j] < l[i] and l[j] < l[k] and _pct_diff(l[i], l[k]) < tol * 1.5: res["pattern_inv_head_shoulders"] = 1 res["pattern_label"] = "inv_head_shoulders" n = len(win) x = np.arange(n) high_slope = np.polyfit(x, h, 1)[0] low_slope = np.polyfit(x, l, 1)[0] if high_slope < 0 and low_slope > 0: res["pattern_triangle_sym"] = 1 if res["pattern_label"] == "none": res["pattern_label"] = "triangle_sym" if high_slope < 0 and low_slope > 0 and low_slope > abs(high_slope) * 0.5: res["pattern_triangle_asc"] = 1 if high_slope < 0 and low_slope < 0 and abs(high_slope) > abs(low_slope) * 0.5: res["pattern_triangle_desc"] = 1 rng_pct = (h.max() - l.min()) / max(c[-1], 1e-9) * 100 if rng_pct < 8 and abs(high_slope) < c[-1] * 0.0001: res["pattern_rectangle"] = 1 if res["pattern_label"] == "none": res["pattern_label"] = "rectangle" leg = max(n // 3, 5) if n > leg * 2: first_move = (c[leg] - c[0]) / max(c[0], 1e-9) * 100 channel = (c[-1] - c[-leg]) / max(c[-leg], 1e-9) * 100 if first_move > 5 and abs(channel) < 3: res["pattern_flag_bull"] = 1 res["pattern_label"] = "flag_bull" if first_move < -5 and abs(channel) < 3: res["pattern_flag_bear"] = 1 res["pattern_label"] = "flag_bear" if high_slope > 0 and low_slope > 0: res["pattern_wedge_rising"] = 1 if high_slope < 0 and low_slope < 0: res["pattern_wedge_falling"] = 1 if high_slope > 0 and low_slope > 0: res["pattern_channel_up"] = 1 if high_slope < 0 and low_slope < 0: res["pattern_channel_down"] = 1 if len(c) >= 15: mid = len(c) // 2 first_half = c[:mid].mean() second_half = c[mid:].mean() if c[0] > c[mid] * 1.08 and c[-1] > c[mid] * 1.05: res["pattern_v_bottom"] = 1 res["pattern_label"] = "v_bottom" if c[0] < c[-1] * 0.92 and c.max() > c[0] * 1.1: res["pattern_spike_top"] = 1 o = win["Open"].astype(float).values gap_ups: list[int] = [] gap_downs: list[int] = [] for i in range(1, min(30, n)): if l[i] > h[i - 1]: res["pattern_gap_up"] = 1 gap_ups.append(i) if h[i] < l[i - 1]: res["pattern_gap_down"] = 1 gap_downs.append(i) for gi in gap_ups: for gd in gap_downs: if gd > gi and h[gi] < l[gd]: res["pattern_island_top"] = 1 res["pattern_label"] = "island_top" if gd > gi and l[gi] > h[gd]: res["pattern_island_bottom"] = 1 res["pattern_label"] = "island_bottom" # 키리스톤: 상단 수평 + 하단 상승(역키리스톤) 또는 하단 수평 + 상단 하락 if abs(high_slope) < c[-1] * 0.00005 and low_slope > 0: res["pattern_keystone_bull"] = 1 if res["pattern_label"] == "none": res["pattern_label"] = "keystone_bull" if abs(low_slope) < c[-1] * 0.00005 and high_slope < 0: res["pattern_keystone_bear"] = 1 if res["pattern_label"] == "none": res["pattern_label"] = "keystone_bear" # 컵앤핸들: 전반 U자 + 후반 15% 소폭 조정 if n >= 40: cup_len = int(n * 0.65) handle_len = max(int(n * 0.15), 5) cup = c[:cup_len] handle = c[-handle_len:] rim = float(max(cup[0], cup[-1])) bottom = float(cup.min()) depth = rim - bottom if depth > rim * 0.08 and float(cup[-1]) > bottom + depth * 0.5: handle_pull = float(handle.max() - handle.min()) if handle_pull < depth * 0.5 and float(c[-1]) >= rim * 0.98: res["pattern_cup_handle"] = 1 res["pattern_label"] = "cup_handle" if len(c) >= 30: ma = pd.Series(c).rolling(10).mean() if float(ma.iloc[-1]) > float(ma.iloc[-15]) > float(ma.iloc[-30]): res["pattern_rounding_bottom"] = 1 if float(ma.iloc[-1]) < float(ma.iloc[-15]) < float(ma.iloc[-30]): res["pattern_rounding_top"] = 1 if len(peaks) >= 2 and len(troughs) >= 2: leg_h = h[peaks[-1]] - l[troughs[-1]] if leg_h > 0 and c[-1] >= l[troughs[-1]] + leg_h * 0.9: res["pattern_measured_move"] = 1 return res def general_analysis_pattern_snapshot(win: pd.DataFrame) -> dict[str, object]: """패턴 dict → ga_pattern_* 컬럼명.""" raw = general_analysis_detect_patterns(win) return {ga_col(k): v for k, v in raw.items()} def general_analysis_pattern_columns() -> list[str]: return [ "pattern_double_top", "pattern_double_bottom", "pattern_head_shoulders", "pattern_inv_head_shoulders", "pattern_triangle_sym", "pattern_triangle_asc", "pattern_triangle_desc", "pattern_flag_bull", "pattern_flag_bear", "pattern_wedge_rising", "pattern_wedge_falling", "pattern_rectangle", "pattern_channel_up", "pattern_channel_down", "pattern_measured_move", "pattern_rounding_top", "pattern_rounding_bottom", "pattern_gap_up", "pattern_gap_down", "pattern_v_bottom", "pattern_spike_top", "pattern_triple_top", "pattern_triple_bottom", "pattern_cup_handle", "pattern_keystone_bull", "pattern_keystone_bear", "pattern_island_top", "pattern_island_bottom", "pattern_label", ] def general_analysis_apply_patterns_to_bars( df: pd.DataFrame, interval: int, tail_rows: int | None = None, ) -> pd.DataFrame: """ lookback 윈도우 패턴 라벨을 봉별 컬럼으로 채움 (최근 tail_rows만, 성능). Args: df: OHLCV (+ 선택적 지표). interval: 분봉 간격. tail_rows: None이면 전체(8천봉 이하) 또는 config tail. Returns: ga_pattern_* 컬럼이 추가된 DataFrame. """ from deepcoin.analysis.general_analysis_config import CONTEXT_TAIL_ROWS, LOOKBACK_BARS out = df.copy() lb = LOOKBACK_BARS.get(interval, 80) keys = [k for k in general_analysis_pattern_columns() if k != "pattern_label"] for k in keys: out[ga_col(k)] = 0 out[ga_col("pattern_label")] = "none" n = len(out) if n < lb + 1: return out if tail_rows is None: tail_rows = CONTEXT_TAIL_ROWS.get(interval, 5000) start = max(lb, n - tail_rows) for i in range(start, n): win = out.iloc[i - lb : i] det = general_analysis_detect_patterns(win) idx = out.index[i] for k, v in det.items(): out.at[idx, ga_col(k)] = v return out