로고스/루트 레거시를 제거하고 deepcoin 패키지·scripts 01~05 CLI·docs/reference로 데이터·GT·분석·매칭·운영 단계를 정리했다. config와 .env 기반 설정, trade_anaysis.html 동기화 포함. Co-authored-by: Cursor <cursoragent@cursor.com>
303 lines
10 KiB
Python
303 lines
10 KiB
Python
"""
|
|
general_analysis 차트·가격 패턴 (반전·지속·박스).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
from config import GA_PATTERN_TOLERANCE_PCT, GA_PIVOT_ORDER
|
|
from deepcoin.analysis.general_analysis_core import find_pivots, ga_col, last_row_dict
|
|
|
|
|
|
def _pct_diff(a: float, b: float) -> float:
    """Return the absolute gap between ``a`` and ``b`` as a percentage.

    The gap is scaled by the larger magnitude of the two inputs. The scale is
    floored at 1e-9 so that two zeros give 0.0 instead of dividing by zero.
    """
    gap = abs(a - b)
    scale = max(abs(a), abs(b), 1e-9)
    return gap / scale * 100
|
|
|
|
|
|
def general_analysis_detect_patterns(win: pd.DataFrame) -> dict[str, int | float | str | None]:
    """
    Detect chart/price pattern flags (0/1) and a summary label for one lookback window.

    Args:
        win: Lookback window of bars. The "High", "Low" and "Close" columns
            are read; any other columns (e.g. indicators) are ignored.

    Returns:
        Dict keyed by un-prefixed ``pattern_*`` names (callers add the prefix
        via ``ga_col``). Every flag is 0 or 1; ``pattern_label`` is "none" or
        the name of one detected pattern. When ``win`` is None or has fewer
        than 20 rows, all flags are 0 and the label is "none".
    """
    res: dict[str, int | float | str | None] = {
        "pattern_double_top": 0,
        "pattern_double_bottom": 0,
        "pattern_head_shoulders": 0,
        "pattern_inv_head_shoulders": 0,
        "pattern_triangle_sym": 0,
        "pattern_triangle_asc": 0,
        "pattern_triangle_desc": 0,
        "pattern_flag_bull": 0,
        "pattern_flag_bear": 0,
        "pattern_wedge_rising": 0,
        "pattern_wedge_falling": 0,
        "pattern_rectangle": 0,
        "pattern_channel_up": 0,
        "pattern_channel_down": 0,
        "pattern_measured_move": 0,
        "pattern_rounding_top": 0,
        "pattern_rounding_bottom": 0,
        "pattern_gap_up": 0,
        "pattern_gap_down": 0,
        "pattern_v_bottom": 0,
        "pattern_spike_top": 0,
        "pattern_triple_top": 0,
        "pattern_triple_bottom": 0,
        "pattern_cup_handle": 0,
        "pattern_keystone_bull": 0,
        "pattern_keystone_bear": 0,
        "pattern_island_top": 0,
        "pattern_island_bottom": 0,
        "pattern_label": "none",
    }
    # Too little data for any check: return the all-zero defaults.
    if win is None or len(win) < 20:
        return res

    h = win["High"].astype(float).values
    lows = win["Low"].astype(float).values
    c = win["Close"].astype(float).values
    peaks, troughs = find_pivots(h, lows, order=GA_PIVOT_ORDER)
    tol = GA_PATTERN_TOLERANCE_PCT

    # Triple top: each adjacent pair of the last three peaks is within tolerance.
    if len(peaks) >= 3:
        p1, p2, p3 = peaks[-3], peaks[-2], peaks[-1]
        if (
            _pct_diff(h[p1], h[p2]) < tol
            and _pct_diff(h[p2], h[p3]) < tol
            and p1 < p2 < p3
        ):
            res["pattern_triple_top"] = 1
            res["pattern_label"] = "triple_top"

    # Triple bottom: same test on the last three troughs.
    if len(troughs) >= 3:
        t1, t2, t3 = troughs[-3], troughs[-2], troughs[-1]
        if (
            _pct_diff(lows[t1], lows[t2]) < tol
            and _pct_diff(lows[t2], lows[t3]) < tol
            and t1 < t2 < t3
        ):
            res["pattern_triple_bottom"] = 1
            res["pattern_label"] = "triple_bottom"

    # Double top: last two peaks within tolerance. Only labels when nothing
    # else has claimed the label yet.
    if len(peaks) >= 2:
        p1, p2 = peaks[-2], peaks[-1]
        if _pct_diff(h[p1], h[p2]) < tol:
            res["pattern_double_top"] = 1
            if res["pattern_label"] == "none":
                res["pattern_label"] = "double_top"

    # Double bottom: last two troughs within tolerance.
    if len(troughs) >= 2:
        t1, t2 = troughs[-2], troughs[-1]
        if _pct_diff(lows[t1], lows[t2]) < tol:
            res["pattern_double_bottom"] = 1
            # A triple bottom always passes this test too (same last two
            # troughs), so an unconditional assignment here would make the
            # "triple_bottom" label unreachable. Keep an existing triple label.
            if res["pattern_label"] not in ("triple_top", "triple_bottom"):
                res["pattern_label"] = "double_bottom"

    # Head and shoulders: middle peak is the highest, outer peaks roughly level.
    if len(peaks) >= 3:
        i, j, k = peaks[-3], peaks[-2], peaks[-1]
        if h[j] > h[i] and h[j] > h[k] and _pct_diff(h[i], h[k]) < tol * 1.5:
            res["pattern_head_shoulders"] = 1
            res["pattern_label"] = "head_shoulders"

    # Inverse head and shoulders: middle trough is the lowest, outer troughs roughly level.
    if len(troughs) >= 3:
        i, j, k = troughs[-3], troughs[-2], troughs[-1]
        if lows[j] < lows[i] and lows[j] < lows[k] and _pct_diff(lows[i], lows[k]) < tol * 1.5:
            res["pattern_inv_head_shoulders"] = 1
            res["pattern_label"] = "inv_head_shoulders"

    # Linear-fit slopes of the highs and lows over the whole window drive the
    # triangle / rectangle / wedge / channel / keystone checks below.
    n = len(win)
    x = np.arange(n)
    high_slope = np.polyfit(x, h, 1)[0]
    low_slope = np.polyfit(x, lows, 1)[0]

    if high_slope < 0 and low_slope > 0:
        res["pattern_triangle_sym"] = 1
        if res["pattern_label"] == "none":
            res["pattern_label"] = "triangle_sym"
    if high_slope < 0 and low_slope > 0 and low_slope > abs(high_slope) * 0.5:
        res["pattern_triangle_asc"] = 1
    if high_slope < 0 and low_slope < 0 and abs(high_slope) > abs(low_slope) * 0.5:
        res["pattern_triangle_desc"] = 1

    # Rectangle: total high-low range under 8% of the last close and a nearly flat high slope.
    rng_pct = (h.max() - lows.min()) / max(c[-1], 1e-9) * 100
    if rng_pct < 8 and abs(high_slope) < c[-1] * 0.0001:
        res["pattern_rectangle"] = 1
        if res["pattern_label"] == "none":
            res["pattern_label"] = "rectangle"

    # Flags: a move of more than 5% over the first leg followed by a last leg
    # that changed by less than 3%.
    leg = max(n // 3, 5)
    if n > leg * 2:
        first_move = (c[leg] - c[0]) / max(c[0], 1e-9) * 100
        channel = (c[-1] - c[-leg]) / max(c[-leg], 1e-9) * 100
        if first_move > 5 and abs(channel) < 3:
            res["pattern_flag_bull"] = 1
            res["pattern_label"] = "flag_bull"
        if first_move < -5 and abs(channel) < 3:
            res["pattern_flag_bear"] = 1
            res["pattern_label"] = "flag_bear"

    # Wedges and channels share the same slope test in this implementation:
    # both slopes positive -> rising/up, both negative -> falling/down.
    if high_slope > 0 and low_slope > 0:
        res["pattern_wedge_rising"] = 1
        res["pattern_channel_up"] = 1
    if high_slope < 0 and low_slope < 0:
        res["pattern_wedge_falling"] = 1
        res["pattern_channel_down"] = 1

    if len(c) >= 15:
        mid = len(c) // 2
        # V bottom: the mid-window close sits well below both the first and last close.
        if c[0] > c[mid] * 1.08 and c[-1] > c[mid] * 1.05:
            res["pattern_v_bottom"] = 1
            res["pattern_label"] = "v_bottom"
        if c[0] < c[-1] * 0.92 and c.max() > c[0] * 1.1:
            res["pattern_spike_top"] = 1

    # Gaps: only the first (up to) 30 bars of the window are scanned.
    gap_ups: list[int] = []
    gap_downs: list[int] = []
    for bar in range(1, min(30, n)):
        if lows[bar] > h[bar - 1]:
            res["pattern_gap_up"] = 1
            gap_ups.append(bar)
        if h[bar] < lows[bar - 1]:
            res["pattern_gap_down"] = 1
            gap_downs.append(bar)

    # Islands: a gap-up bar followed later by a gap-down bar, classified by
    # how the two bars' highs and lows compare.
    for gi in gap_ups:
        for gd in gap_downs:
            if gd > gi and h[gi] < lows[gd]:
                res["pattern_island_top"] = 1
                res["pattern_label"] = "island_top"
            if gd > gi and lows[gi] > h[gd]:
                res["pattern_island_bottom"] = 1
                res["pattern_label"] = "island_bottom"

    # Keystone: flat top + rising bottom (inverse keystone), or flat bottom + falling top.
    if abs(high_slope) < c[-1] * 0.00005 and low_slope > 0:
        res["pattern_keystone_bull"] = 1
        if res["pattern_label"] == "none":
            res["pattern_label"] = "keystone_bull"
    if abs(low_slope) < c[-1] * 0.00005 and high_slope < 0:
        res["pattern_keystone_bear"] = 1
        if res["pattern_label"] == "none":
            res["pattern_label"] = "keystone_bear"

    # Cup and handle: U shape over the first 65% of the window plus a small
    # pullback over the last 15% (at least 5 bars).
    if n >= 40:
        cup_len = int(n * 0.65)
        handle_len = max(int(n * 0.15), 5)
        cup = c[:cup_len]
        handle = c[-handle_len:]
        rim = float(max(cup[0], cup[-1]))
        bottom = float(cup.min())
        depth = rim - bottom
        if depth > rim * 0.08 and float(cup[-1]) > bottom + depth * 0.5:
            handle_pull = float(handle.max() - handle.min())
            if handle_pull < depth * 0.5 and float(c[-1]) >= rim * 0.98:
                res["pattern_cup_handle"] = 1
                res["pattern_label"] = "cup_handle"

    # Rounding: 10-bar moving average strictly rising / falling across the
    # points 30, 15 and 1 bars from the end. NaN values (MA not yet defined)
    # make both comparisons false.
    if len(c) >= 30:
        ma = pd.Series(c).rolling(10).mean()
        if float(ma.iloc[-1]) > float(ma.iloc[-15]) > float(ma.iloc[-30]):
            res["pattern_rounding_bottom"] = 1
        if float(ma.iloc[-1]) < float(ma.iloc[-15]) < float(ma.iloc[-30]):
            res["pattern_rounding_top"] = 1

    # Measured move: last close has recovered at least 90% of the leg from the
    # last trough up to the last peak.
    if len(peaks) >= 2 and len(troughs) >= 2:
        leg_h = h[peaks[-1]] - lows[troughs[-1]]
        if leg_h > 0 and c[-1] >= lows[troughs[-1]] + leg_h * 0.9:
            res["pattern_measured_move"] = 1

    return res
|
|
|
|
|
|
def general_analysis_pattern_snapshot(win: pd.DataFrame) -> dict[str, object]:
    """Run pattern detection on ``win`` and rename every key through ``ga_col``."""
    snapshot: dict[str, object] = {}
    for name, value in general_analysis_detect_patterns(win).items():
        snapshot[ga_col(name)] = value
    return snapshot
|
|
|
|
|
|
def general_analysis_pattern_columns() -> list[str]:
    """Return a fresh list of the un-prefixed pattern key names in fixed order.

    The order is significant to callers that create columns from it; the last
    entry is ``"pattern_label"``.
    """
    tops_and_bottoms = (
        "pattern_double_top",
        "pattern_double_bottom",
        "pattern_head_shoulders",
        "pattern_inv_head_shoulders",
    )
    triangles = (
        "pattern_triangle_sym",
        "pattern_triangle_asc",
        "pattern_triangle_desc",
    )
    flags_and_wedges = (
        "pattern_flag_bull",
        "pattern_flag_bear",
        "pattern_wedge_rising",
        "pattern_wedge_falling",
    )
    ranges_and_channels = (
        "pattern_rectangle",
        "pattern_channel_up",
        "pattern_channel_down",
    )
    moves_and_rounding = (
        "pattern_measured_move",
        "pattern_rounding_top",
        "pattern_rounding_bottom",
    )
    gaps_and_spikes = (
        "pattern_gap_up",
        "pattern_gap_down",
        "pattern_v_bottom",
        "pattern_spike_top",
    )
    triples_and_cup = (
        "pattern_triple_top",
        "pattern_triple_bottom",
        "pattern_cup_handle",
    )
    keystones_and_islands = (
        "pattern_keystone_bull",
        "pattern_keystone_bear",
        "pattern_island_top",
        "pattern_island_bottom",
    )
    return [
        *tops_and_bottoms,
        *triangles,
        *flags_and_wedges,
        *ranges_and_channels,
        *moves_and_rounding,
        *gaps_and_spikes,
        *triples_and_cup,
        *keystones_and_islands,
        "pattern_label",
    ]
|
|
|
|
|
|
def general_analysis_apply_patterns_to_bars(
    df: pd.DataFrame,
    interval: int,
    tail_rows: int | None = None,
) -> pd.DataFrame:
    """
    Fill per-bar pattern columns from a rolling lookback window.

    Only the most recent ``tail_rows`` bars are evaluated (for performance);
    all other bars keep the defaults (0 for flags, "none" for the label).

    Args:
        df: OHLCV bars (optionally with indicator columns). The input is not
            modified; a copy is returned.
        interval: Minute-bar interval; used as the lookup key into
            ``LOOKBACK_BARS`` (default 80) and ``CONTEXT_TAIL_ROWS``.
        tail_rows: Number of trailing bars to evaluate. ``None`` uses
            ``CONTEXT_TAIL_ROWS[interval]``, falling back to 5000.

    Returns:
        Copy of ``df`` with one ``ga_col(...)``-named column added per pattern key.
    """
    from deepcoin.analysis.general_analysis_config import CONTEXT_TAIL_ROWS, LOOKBACK_BARS

    out = df.copy()
    lb = LOOKBACK_BARS.get(interval, 80)
    keys = [k for k in general_analysis_pattern_columns() if k != "pattern_label"]
    for k in keys:
        out[ga_col(k)] = 0
    out[ga_col("pattern_label")] = "none"

    n = len(out)
    # Not enough history for even one full lookback window.
    if n < lb + 1:
        return out

    if tail_rows is None:
        tail_rows = CONTEXT_TAIL_ROWS.get(interval, 5000)
    start = max(lb, n - tail_rows)

    # Resolve column positions once so results can be written positionally.
    # Label-based ``.at`` writes would hit every row sharing an index label
    # when the index is not unique (e.g. duplicated timestamps).
    col_pos: dict[str, int] = {
        k: out.columns.get_loc(ga_col(k)) for k in general_analysis_pattern_columns()
    }

    for i in range(start, n):
        # The window is the ``lb`` bars preceding bar ``i``; bar ``i`` itself
        # is excluded. NOTE(review): presumably an intentional one-bar lag to
        # avoid using the current bar - confirm.
        win = out.iloc[i - lb : i]
        det = general_analysis_detect_patterns(win)
        for k, v in det.items():
            out.iat[i, col_pos[k]] = v

    return out
|