WLD DeepCoin 단계별 구조 재편 및 설정·문서 통합

로고스/루트 레거시를 제거하고 deepcoin 패키지·scripts 01~05 CLI·docs/reference로
데이터·GT·분석·매칭·운영 단계를 정리했다. config와 .env 기반 설정, trade_anaysis.html 동기화 포함.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
2026-05-30 22:58:25 +09:00
parent e631a5701f
commit b52d61b777
76 changed files with 11552 additions and 4567 deletions

View File

@@ -0,0 +1,6 @@
# analysis — 03·03b 기술적 분석
- **03 enrich**: `general_analysis_enrich_runner.py` — 봉 전구간 지표·패턴 → `docs/03_analysis/latest/`
- **03b GT 스냅샷**: `general_analysis_runner.py` — 정답 매수·매도 시점 MTF 상태 → `general_analysis_trades.csv`
실행은 `scripts/03_analyze_enrich.py`, `scripts/03_analyze_trades.py`만 사용합니다.

View File

View File

@@ -0,0 +1,153 @@
"""
general_analysis MTF 합성·정렬 점수.
"""
from __future__ import annotations
from typing import Any
import pandas as pd
from config import (
ALIGN_BB_POS_HIGH,
ALIGN_BB_POS_LOW,
ALIGN_RSI_CONFLICT_TIMING_HIGH,
ALIGN_RSI_CONFLICT_TIMING_LOW,
ALIGN_RSI_CONFLICT_TREND_HIGH,
ALIGN_RSI_CONFLICT_TREND_LOW,
ALIGN_RSI_OVERBOUGHT,
ALIGN_RSI_OVERSOLD,
)
from deepcoin.analysis.general_analysis_config import TIMING_INTERVALS, TREND_INTERVALS
from deepcoin.analysis.general_analysis_core import ga_col, interval_tf_prefix
def general_analysis_mtf_scores(
prefixed_row: dict[str, Any],
) -> dict[str, float | int | str]:
"""
간격 접두사가 붙은 스냅샷 행에서 MTF 합성 점수 계산.
Args:
prefixed_row: m3_ga_rsi 형태 flat dict.
Returns:
ga_align_* 점수.
"""
rsi_oversold = 0
rsi_overbought = 0
trend_up = 0
trend_down = 0
n_timing = 0
n_trend = 0
conflict = 0
for interval in TIMING_INTERVALS:
p = interval_tf_prefix(interval)
rk = f"{p}_RSI"
if rk in prefixed_row and prefixed_row[rk] is not None:
n_timing += 1
rsi = float(prefixed_row[rk])
if rsi < ALIGN_RSI_OVERSOLD:
rsi_oversold += 1
if rsi > ALIGN_RSI_OVERBOUGHT:
rsi_overbought += 1
for interval in TREND_INTERVALS:
p = interval_tf_prefix(interval)
sk = f"{p}_{ga_col('struct_trend')}"
if sk in prefixed_row:
n_trend += 1
t = prefixed_row[sk]
if t == "up":
trend_up += 1
elif t == "down":
trend_down += 1
m3_rsi = prefixed_row.get("m3_RSI")
d1_rsi = prefixed_row.get("d1_RSI")
if m3_rsi is not None and d1_rsi is not None:
if (
float(m3_rsi) < ALIGN_RSI_CONFLICT_TIMING_LOW
and float(d1_rsi) > ALIGN_RSI_CONFLICT_TREND_HIGH
):
conflict = 1
if (
float(m3_rsi) > ALIGN_RSI_CONFLICT_TIMING_HIGH
and float(d1_rsi) < ALIGN_RSI_CONFLICT_TREND_LOW
):
conflict = 1
timing_buy_align = rsi_oversold / max(len(TIMING_INTERVALS), 1)
timing_sell_align = rsi_overbought / max(len(TIMING_INTERVALS), 1)
return {
"ga_align_rsi_oversold_tf": rsi_oversold,
"ga_align_rsi_overbought_tf": rsi_overbought,
"ga_align_trend_up_tf": trend_up,
"ga_align_trend_down_tf": trend_down,
"ga_align_timing_buy_score": round(timing_buy_align, 3),
"ga_align_timing_sell_score": round(timing_sell_align, 3),
"ga_align_trend_score": round(
(trend_up - trend_down) / max(n_trend, 1), 3
),
"ga_align_mtf_conflict": conflict,
}
def general_analysis_mtf_vote_latest(
frames_enriched: dict[int, pd.DataFrame],
) -> dict[str, float | int | str]:
"""
각 TF 최신 완성봉 지표로 TF 가중 투표·필터 점수 산출.
Args:
frames_enriched: interval → enrich된 DataFrame.
Returns:
ga_vote_* 점수 (접두사 없음, ga_col로 감쌀 것).
"""
votes_buy = 0
votes_sell = 0
trend_ok = 0
n = 0
for interval in TIMING_INTERVALS:
df = frames_enriched.get(interval)
if df is None or df.empty:
continue
row = df.iloc[-1]
n += 1
rsi = row.get("RSI")
if rsi is not None and not pd.isna(rsi):
if float(rsi) < ALIGN_RSI_OVERSOLD:
votes_buy += 1
if float(rsi) > ALIGN_RSI_OVERBOUGHT:
votes_sell += 1
bb_pos = row.get("bb_pos")
if bb_pos is not None and float(bb_pos) < ALIGN_BB_POS_LOW:
votes_buy += 1
if bb_pos is not None and float(bb_pos) > ALIGN_BB_POS_HIGH:
votes_sell += 1
for interval in TREND_INTERVALS:
df = frames_enriched.get(interval)
if df is None or df.empty:
continue
row = df.iloc[-1]
st = row.get(ga_col("struct_trend"), "range")
if st == "up":
trend_ok += 1
elif st == "down":
trend_ok -= 1
return {
"vote_timing_buy": votes_buy,
"vote_timing_sell": votes_sell,
"vote_trend_score": trend_ok,
"vote_tf_used": n,
}
def general_analysis_vote_columns() -> list[str]:
return ["vote_timing_buy", "vote_timing_sell", "vote_trend_score", "vote_tf_used"]

View File

@@ -0,0 +1,114 @@
"""
general_analysis 캔들·차트 변환 (Heikin-Ashi, 복수봉 패턴).
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from deepcoin.analysis.general_analysis_core import ga_col
def general_analysis_apply_candles(df: pd.DataFrame) -> pd.DataFrame:
"""
단일·복수 봉 캔들 패턴 및 Heikin-Ashi 컬럼 추가.
Args:
df: OHLCV.
Returns:
ga_* 캔들 컬럼이 추가된 DataFrame.
"""
out = df.copy()
o = out["Open"].astype(float)
h = out["High"].astype(float)
l = out["Low"].astype(float)
c = out["Close"].astype(float)
rng = (h - l).replace(0, np.nan)
body = (c - o).abs()
out[ga_col("body_ratio")] = (body / rng).fillna(0).clip(0, 1)
upper_wick = h - np.maximum(o, c)
lower_wick = np.minimum(o, c) - l
out[ga_col("upper_wick_ratio")] = (upper_wick / rng).fillna(0).clip(0, 1)
out[ga_col("lower_wick_ratio")] = (lower_wick / rng).fillna(0).clip(0, 1)
out[ga_col("bullish")] = (c > o).astype(int)
out[ga_col("bearish")] = (c < o).astype(int)
out[ga_col("hammer")] = (
(out[ga_col("lower_wick_ratio")] > 0.45) & (out[ga_col("body_ratio")] < 0.35)
).astype(int)
out[ga_col("shooting_star")] = (
(out[ga_col("upper_wick_ratio")] > 0.45) & (out[ga_col("body_ratio")] < 0.35)
).astype(int)
out[ga_col("doji")] = (out[ga_col("body_ratio")] < 0.1).astype(int)
prev_o, prev_c = o.shift(1), c.shift(1)
out[ga_col("bullish_engulfing")] = (
(c > o) & (prev_c < prev_o) & (c >= prev_o) & (o <= prev_c)
).astype(int)
out[ga_col("bearish_engulfing")] = (
(c < o) & (prev_c > prev_o) & (c <= prev_o) & (o >= prev_c)
).astype(int)
o2, c2 = o.shift(2), c.shift(2)
mid1 = (o.shift(1) + c.shift(1)) / 2
out[ga_col("morning_star")] = (
(c2 < o2)
& (abs(c.shift(1) - o.shift(1)) < rng.shift(1) * 0.15)
& (c > o)
& (c > mid1)
).astype(int)
out[ga_col("evening_star")] = (
(c2 > o2)
& (abs(c.shift(1) - o.shift(1)) < rng.shift(1) * 0.15)
& (c < o)
& (c < mid1)
).astype(int)
out[ga_col("three_white_soldiers")] = (
(c > o)
& (c.shift(1) > o.shift(1))
& (c.shift(2) > o.shift(2))
& (c > c.shift(1))
& (c.shift(1) > c.shift(2))
).astype(int)
out[ga_col("three_black_crows")] = (
(c < o)
& (c.shift(1) < o.shift(1))
& (c.shift(2) < o.shift(2))
& (c < c.shift(1))
& (c.shift(1) < c.shift(2))
).astype(int)
# Heikin-Ashi
ha_close = (o + h + l + c) / 4
ha_open = ha_close.copy()
ha_open.iloc[0] = (o.iloc[0] + c.iloc[0]) / 2
for i in range(1, len(out)):
ha_open.iloc[i] = (ha_open.iloc[i - 1] + ha_close.iloc[i - 1]) / 2
out[ga_col("ha_close")] = ha_close
out[ga_col("ha_open")] = ha_open
out[ga_col("ha_bull")] = (ha_close > ha_open).astype(int)
out[ga_col("ha_trend_up")] = (
(ha_close > ha_close.shift(1)) & (ha_close.shift(1) > ha_close.shift(2))
).astype(int)
return out
def general_analysis_candle_columns() -> list[str]:
return [
"body_ratio",
"hammer",
"shooting_star",
"doji",
"bullish_engulfing",
"bearish_engulfing",
"morning_star",
"evening_star",
"three_white_soldiers",
"three_black_crows",
"ha_bull",
"ha_trend_up",
]

View File

@@ -0,0 +1,159 @@
"""
general_analysis 차트 유형 (캔들·선·바·Renko·P&F).
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from deepcoin.analysis.general_analysis_core import ga_col
def _renko_direction_series(close: pd.Series, brick: pd.Series) -> pd.Series:
"""ATR 기반 브릭 크기로 Renko 방향 (+1/-1/0) 시계열."""
n = len(close)
direction = pd.Series(0, index=close.index, dtype=int)
if n < 2:
return direction
price = float(close.iloc[0])
for i in range(1, n):
b = float(brick.iloc[i]) if not np.isnan(brick.iloc[i]) else float(close.diff().abs().median())
if b < 1e-9:
b = 1e-9
c = float(close.iloc[i])
if c >= price + b:
steps = int((c - price) // b)
direction.iloc[i] = 1
price += steps * b
elif c <= price - b:
steps = int((price - c) // b)
direction.iloc[i] = -1
price -= steps * b
return direction
def general_analysis_apply_chart_bars(df: pd.DataFrame) -> pd.DataFrame:
"""
봉 단위 차트 파생 컬럼 (선 기울기, Renko, P&F).
Args:
df: OHLCV (+ ga_atr_14 권장).
Returns:
ga_chart_* 시계열 컬럼 추가.
"""
out = df.copy()
c = out["Close"].astype(float)
h = out["High"].astype(float)
l = out["Low"].astype(float)
out[ga_col("chart_line_slope_1")] = c.diff()
out[ga_col("chart_bar_range_pct")] = (h - l) / c.replace(0, np.nan) * 100
from config import GA_ATR_PERIOD
brick = (
out[ga_col("atr_14")]
if ga_col("atr_14") in out.columns
else (h - l).rolling(GA_ATR_PERIOD).mean()
)
brick = brick.fillna(c.diff().abs().rolling(20).median()).replace(0, np.nan).bfill()
renko_dir = _renko_direction_series(c, brick)
out[ga_col("chart_renko_dir")] = renko_dir
out[ga_col("chart_renko_up")] = (renko_dir == 1).astype(int)
box = brick.fillna(1.0)
pnf = pd.Series(0, index=out.index, dtype=int)
col = 0
for i in range(1, len(c)):
b = float(box.iloc[i])
move = c.iloc[i] - c.iloc[i - 1]
if move >= b:
col += 1
pnf.iloc[i] = 1
elif move <= -b:
col -= 1
pnf.iloc[i] = -1
out[ga_col("chart_pnf_col")] = pnf
if "Volume" in out.columns:
v = out["Volume"].astype(float)
out[ga_col("chart_vol_spike")] = (v > v.rolling(20).mean() * 1.8).astype(int)
if ga_col("ha_trend_up") in out.columns:
out[ga_col("chart_ha_trend")] = out[ga_col("ha_trend_up")]
elif ga_col("ha_bull") in out.columns:
out[ga_col("chart_ha_trend")] = out[ga_col("ha_bull")]
return out
def general_analysis_chart_metrics(df: pd.DataFrame) -> dict[str, object]:
"""
lookback 구간 차트 요약 (마지막 봉 스냅샷용).
Returns:
ga_chart_* dict.
"""
res: dict[str, object] = {
"chart_type_candle": 1,
"chart_line_slope": 0.0,
"chart_bar_range_pct": 0.0,
"chart_ha_trend": 0,
"chart_renko_brick_up_ratio": 0.5,
"chart_renko_dir": 0,
"chart_pnf_col": 0,
"chart_vol_spike": 0,
}
if df is None or len(df) < 5:
return {ga_col(k): v for k, v in res.items()}
row = df.iloc[-1]
c = df["Close"].astype(float)
res["chart_line_slope"] = float((c.iloc[-1] - c.iloc[0]) / max(len(c) - 1, 1))
res["chart_bar_range_pct"] = float(
(df["High"].iloc[-1] - df["Low"].iloc[-1]) / max(c.iloc[-1], 1e-9) * 100
)
if ga_col("chart_renko_dir") in df.columns:
rd = df[ga_col("chart_renko_dir")].astype(float)
up = (rd == 1).sum()
down = (rd == -1).sum()
res["chart_renko_brick_up_ratio"] = round(up / max(up + down, 1), 3)
res["chart_renko_dir"] = int(rd.iloc[-1])
else:
diff = c.diff().fillna(0)
up = (diff > 0).sum()
down = (diff < 0).sum()
res["chart_renko_brick_up_ratio"] = round(up / max(up + down, 1), 3)
if ga_col("chart_pnf_col") in df.columns:
res["chart_pnf_col"] = int(df[ga_col("chart_pnf_col")].iloc[-1])
if ga_col("chart_ha_trend") in df.columns:
res["chart_ha_trend"] = int(row[ga_col("chart_ha_trend")])
elif ga_col("ha_trend_up") in df.columns:
res["chart_ha_trend"] = int(row[ga_col("ha_trend_up")])
if ga_col("chart_vol_spike") in df.columns:
res["chart_vol_spike"] = int(row[ga_col("chart_vol_spike")])
elif "Volume" in df.columns:
v = df["Volume"].astype(float)
res["chart_vol_spike"] = int(v.iloc[-1] > v.iloc[-20:].mean() * 1.8)
return {ga_col(k): v for k, v in res.items()}
def general_analysis_chart_columns() -> list[str]:
return [
"chart_type_candle",
"chart_line_slope",
"chart_line_slope_1",
"chart_bar_range_pct",
"chart_ha_trend",
"chart_renko_brick_up_ratio",
"chart_renko_dir",
"chart_renko_up",
"chart_pnf_col",
"chart_vol_spike",
]

View File

@@ -0,0 +1,26 @@
"""
general_analysis MTF 설정 (config.py 재노출).
"""
from __future__ import annotations
from config import (
CONTEXT_TAIL_ROWS,
GA_COL_PREFIX,
GENERAL_ANALYSIS_INTERVALS,
GROUND_TRUTH_FILE,
INTERVAL_PREFIX,
LOOKBACK_BARS,
REPORTS_ANALYSIS_CAPABILITY_HTML,
REPORTS_ANALYSIS_LATEST_DIR,
REPORTS_ANALYSIS_REPORT_HTML,
REPORTS_ANALYSIS_TRADES_CSV,
TIMING_INTERVALS,
TREND_INTERVALS,
)
DEFAULT_TRADES_FILE = GROUND_TRUTH_FILE
DEFAULT_OUTPUT_CSV = str(REPORTS_ANALYSIS_TRADES_CSV)
DEFAULT_OUTPUT_HTML = str(REPORTS_ANALYSIS_REPORT_HTML)
DEFAULT_CAPABILITY_HTML = str(REPORTS_ANALYSIS_CAPABILITY_HTML)
DEFAULT_LATEST_DIR = str(REPORTS_ANALYSIS_LATEST_DIR)

View File

@@ -0,0 +1,98 @@
"""
general_analysis lookback 컨텍스트 특징 (패턴·파동·VP·하모닉) 봉별 적용.
"""
from __future__ import annotations
import pandas as pd
from deepcoin.analysis.general_analysis_config import CONTEXT_TAIL_ROWS, LOOKBACK_BARS
from deepcoin.analysis.general_analysis_core import ga_col
from deepcoin.analysis.general_analysis_harmonic import (
general_analysis_harmonic_columns,
general_analysis_harmonic_snapshot,
)
from deepcoin.analysis.general_analysis_patterns import general_analysis_apply_patterns_to_bars
from deepcoin.analysis.general_analysis_volume import (
general_analysis_volume_columns,
general_analysis_volume_snapshot,
)
from deepcoin.analysis.general_analysis_wave import general_analysis_apply_wave_to_bars
def general_analysis_apply_volume_to_bars(
df: pd.DataFrame,
interval: int,
tail_rows: int | None = None,
) -> pd.DataFrame:
"""Volume Profile 컬럼을 최근 봉에 롤링 적용."""
out = df.copy()
for k in general_analysis_volume_columns():
out[ga_col(k)] = 0.0 if k != "vp_in_value_area" else 0
lb = LOOKBACK_BARS.get(interval, 80)
n = len(out)
if n < lb + 1:
return out
if tail_rows is None:
tail_rows = CONTEXT_TAIL_ROWS.get(interval, 5000)
start = max(lb, n - tail_rows)
for i in range(start, n):
snap = general_analysis_volume_snapshot(out.iloc[i - lb : i])
idx = out.index[i]
for k, v in snap.items():
out.at[idx, k] = v
return out
def general_analysis_apply_harmonic_to_bars(
df: pd.DataFrame,
interval: int,
tail_rows: int | None = None,
) -> pd.DataFrame:
"""하모닉 패턴 컬럼 롤링 적용."""
out = df.copy()
for k in general_analysis_harmonic_columns():
default = "none" if k == "harmonic_label" else 0
out[ga_col(k)] = default
lb = LOOKBACK_BARS.get(interval, 80)
n = len(out)
if n < lb + 1:
return out
if tail_rows is None:
tail_rows = CONTEXT_TAIL_ROWS.get(interval, 5000)
start = max(lb, n - tail_rows)
for i in range(start, n):
snap = general_analysis_harmonic_snapshot(out.iloc[i - lb : i])
idx = out.index[i]
for k, v in snap.items():
out.at[idx, k] = v
return out
def general_analysis_apply_context_features(
df: pd.DataFrame,
interval: int,
tail_rows: int | None = None,
) -> pd.DataFrame:
"""
패턴·파동·VP·하모닉 lookback 라벨을 봉 시계열에 병합.
Args:
df: general_analysis_enrich_bars 1단계 결과.
interval: 분봉 간격(분).
tail_rows: 롤링 적용 봉 수 상한.
Returns:
컨텍스트 ga_* 컬럼이 추가된 DataFrame.
"""
out = general_analysis_apply_patterns_to_bars(df, interval, tail_rows)
out = general_analysis_apply_wave_to_bars(out, interval, tail_rows)
out = general_analysis_apply_volume_to_bars(out, interval, tail_rows)
out = general_analysis_apply_harmonic_to_bars(out, interval, tail_rows)
return out

View File

@@ -0,0 +1,92 @@
"""
general_analysis 공통 유틸 (슬라이스·피벗·컬럼 접두사).
"""
from __future__ import annotations
from typing import Any
import numpy as np
import pandas as pd
from deepcoin.analysis.general_analysis_config import GA_COL_PREFIX, LOOKBACK_BARS
def ga_col(name: str) -> str:
"""general_analysis 출력 컬럼명."""
return f"{GA_COL_PREFIX}{name}"
def interval_tf_prefix(interval: int) -> str:
"""간격 접두사 (m3, d1)."""
from deepcoin.analysis.general_analysis_config import INTERVAL_PREFIX
return INTERVAL_PREFIX.get(interval, f"m{interval}")
def prefixed_snapshot(
row: pd.Series,
interval: int,
keys: list[str] | tuple[str, ...],
) -> dict[str, Any]:
"""한 봉의 ga_ 컬럼을 {m3_ga_rsi: ...} 형태로 변환."""
p = interval_tf_prefix(interval)
out: dict[str, Any] = {}
for k in keys:
col = ga_col(k)
if col in row.index:
v = row[col]
out[f"{p}_{col}"] = None if pd.isna(v) else v
return out
def slice_to_timestamp(df: pd.DataFrame, ts: pd.Timestamp) -> pd.DataFrame:
"""타점 시각 이전 완성봉만 (해당 시각 봉 미포함)."""
if df.empty:
return df
if not isinstance(df.index, pd.DatetimeIndex):
df = df.copy()
df.index = pd.to_datetime(df.index)
ts = pd.Timestamp(ts)
if ts.tzinfo is not None and df.index.tz is None:
ts = ts.tz_localize(None)
return df[df.index < ts].copy()
def lookback_slice(df: pd.DataFrame, interval: int, end_ts: pd.Timestamp) -> pd.DataFrame:
"""타점 직전 lookback 구간."""
sliced = slice_to_timestamp(df, end_ts)
n = LOOKBACK_BARS.get(interval, 80)
if len(sliced) > n:
return sliced.iloc[-n:].copy()
return sliced
def find_pivots(
highs: np.ndarray,
lows: np.ndarray,
order: int = 3,
) -> tuple[list[int], list[int]]:
"""국소 고점·저점 인덱스 (양쪽 order 봉보다 극값)."""
peak_idx: list[int] = []
trough_idx: list[int] = []
n = len(highs)
if n < order * 2 + 1:
return peak_idx, trough_idx
for i in range(order, n - order):
if highs[i] >= highs[i - order : i + order + 1].max():
peak_idx.append(i)
if lows[i] <= lows[i - order : i + order + 1].min():
trough_idx.append(i)
return peak_idx, trough_idx
def last_row_dict(df: pd.DataFrame, cols: list[str]) -> dict[str, Any]:
"""마지막 봉의 지정 컬럼 dict."""
if df.empty:
return {ga_col(c): None for c in cols}
row = df.iloc[-1]
return {
ga_col(c): (None if c not in row.index or pd.isna(row[c]) else row[c])
for c in cols
}

View File

@@ -0,0 +1,148 @@
"""
general_analysis 봉 데이터 전구간 enrich + 최신봉·기법 점검 리포트.
python scripts/03_analyze_enrich.py
python scripts/03_analyze_enrich.py --interval 1440
"""
from __future__ import annotations
import argparse
from pathlib import Path
import pandas as pd
from config import CHART_LOOKBACK_DAYS, GA_DEFAULT_TAIL_EXPORT, SYMBOL
from deepcoin.analysis.general_analysis_align import (
general_analysis_mtf_scores,
general_analysis_mtf_vote_latest,
)
from deepcoin.analysis.general_analysis_config import (
DEFAULT_CAPABILITY_HTML,
DEFAULT_LATEST_DIR,
GENERAL_ANALYSIS_INTERVALS,
)
from deepcoin.analysis.general_analysis_core import ga_col, interval_tf_prefix
from deepcoin.analysis.general_analysis_pipeline import general_analysis_enrich_bars
from deepcoin.ops.monitor import Monitor
from deepcoin.data.mtf_bb import load_frames_from_db
def _latest_row_summary(df: pd.DataFrame, prefix: str) -> dict[str, object]:
"""최신 봉의 ga_·핵심 레거시 컬럼 요약."""
if df.empty:
return {}
row = df.iloc[-1]
out: dict[str, object] = {"dt": str(df.index[-1]), "tf": prefix}
for c in df.columns:
if c.startswith("ga_") or c in ("RSI", "bb_pos", "macd_hist", "stoch_k"):
v = row[c]
if pd.isna(v):
continue
out[c] = v
return out
def write_capability_html(
summaries: dict[str, dict[str, object]],
vote: dict[str, object],
path: Path,
) -> None:
"""기법 컬럼 존재 여부·최신값 요약 HTML."""
rows = ""
for tf, snap in summaries.items():
ga_cols = [k for k in snap if str(k).startswith("ga_")]
rows += f"<tr><td>{tf}</td><td>{snap.get('dt','')}</td><td>{len(ga_cols)}</td></tr>"
vote_rows = "".join(f"<li><code>{k}</code>: {v}</li>" for k, v in vote.items())
html = f"""<!DOCTYPE html>
<html lang="ko"><head><meta charset="utf-8"/>
<title>general_analysis 기법 점검</title>
<style>
body {{ font-family: "Malgun Gothic", Arial, sans-serif; margin: 24px; background: #f5f5f5; }}
table {{ border-collapse: collapse; width: 100%; background: #fff; }}
th, td {{ border: 1px solid #e2e8f0; padding: 8px; font-size: 0.88rem; }}
th {{ background: #e2e8f0; }}
</style></head><body>
<h1>general_analysis 기법 점검 ({SYMBOL})</h1>
<p>3분~일봉 enrich 완료. 최신 봉 기준 컬럼 수·MTF 투표.</p>
<h2>간격별 ga_ 컬럼 수</h2>
<table><thead><tr><th>TF</th><th>최신 시각</th><th>ga_ 컬럼 수</th></tr></thead>
<tbody>{rows}</tbody></table>
<h2>MTF 투표 (최신 봉)</h2>
<ul>{vote_rows}</ul>
<p>상세 CSV: <code>{DEFAULT_LATEST_DIR}/</code></p>
</body></html>"""
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(html, encoding="utf-8")
def main() -> None:
parser = argparse.ArgumentParser(description="general_analysis 8TF enrich")
parser.add_argument("--interval", type=int, default=0, help="단일 간격만 (0=전체)")
parser.add_argument(
"--tail-export",
type=int,
default=GA_DEFAULT_TAIL_EXPORT,
help="CSV 저장 최근 N봉",
)
args = parser.parse_args()
from deepcoin.paths import ANALYSIS_CAPABILITY_HTML, ANALYSIS_LATEST_DIR
intervals = (
(args.interval,)
if args.interval > 0
else GENERAL_ANALYSIS_INTERVALS
)
print(f"=== general_analysis enrich {SYMBOL} ===")
mon = Monitor(cooldown_file=None)
frames = load_frames_from_db(mon, SYMBOL, lookback_days=CHART_LOOKBACK_DAYS)
if not frames:
raise RuntimeError("coins.db 데이터 없음")
enriched: dict[int, pd.DataFrame] = {}
summaries: dict[str, dict[str, object]] = {}
out_dir = ANALYSIS_LATEST_DIR
out_dir.mkdir(parents=True, exist_ok=True)
for iv in intervals:
raw = frames.get(iv)
if raw is None or raw.empty:
print(f" skip {iv}: no data")
continue
p = interval_tf_prefix(iv)
print(f" [{p}] enrich {len(raw)} bars...")
ef = general_analysis_enrich_bars(raw, iv, full_context=True)
enriched[iv] = ef
tail = ef.tail(args.tail_export)
csv_path = out_dir / f"{p}_latest.csv"
tail.to_csv(csv_path, encoding="utf-8-sig")
print(f" -> {csv_path} ({len(tail)} rows x {len(tail.columns)} cols)")
summaries[p] = _latest_row_summary(ef, p)
flat_vote: dict[str, object] = {}
if len(enriched) >= 2:
for k, v in general_analysis_mtf_vote_latest(enriched).items():
flat_vote[ga_col(k)] = v
prefixed = {}
for iv, ef in enriched.items():
p = interval_tf_prefix(iv)
row = ef.iloc[-1]
for c in ("RSI", "bb_pos"):
if c in row.index:
prefixed[f"{p}_{c}"] = row[c]
st = row.get(ga_col("struct_trend"))
if st is not None:
prefixed[f"{p}_{ga_col('struct_trend')}"] = st
flat_vote.update(general_analysis_mtf_scores(prefixed))
write_capability_html(summaries, flat_vote, ANALYSIS_CAPABILITY_HTML)
print(f"점검 리포트: {cap_path}")
print("완료.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,72 @@
"""
general_analysis 하모닉 패턴 (Gartley, Bat 근사).
"""
from __future__ import annotations
import numpy as np
from deepcoin.analysis.general_analysis_core import find_pivots, ga_col
def _ratio(a: float, b: float) -> float:
if abs(b) < 1e-12:
return 0.0
return abs(a / b)
def _near(x: float, target: float, tol: float = 0.08) -> bool:
return abs(x - target) <= tol
def general_analysis_harmonic_snapshot(win) -> dict[str, object]:
"""
최근 5개 피벗으로 Gartley·Bat 유사 비율 검사.
Args:
win: OHLCV DataFrame.
Returns:
ga_harmonic_* dict.
"""
res: dict[str, object] = {
"harmonic_gartley": 0,
"harmonic_bat": 0,
"harmonic_label": "none",
}
if win is None or len(win) < 30:
return {ga_col(k): v for k, v in res.items()}
h = win["High"].astype(float).values
l = win["Low"].astype(float).values
peaks, troughs = find_pivots(h, l, order=2)
pivots = sorted([(i, "H", h[i]) for i in peaks] + [(i, "L", l[i]) for i in troughs])
if len(pivots) < 5:
return {ga_col(k): v for k, v in res.items()}
pts = pivots[-5:]
prices = [p[2] for p in pts]
xa = abs(prices[1] - prices[0])
ab = abs(prices[2] - prices[1])
bc = abs(prices[3] - prices[2])
cd = abs(prices[4] - prices[3])
if xa < 1e-9:
return {ga_col(k): v for k, v in res.items()}
r_ab = _ratio(ab, xa)
r_bc = _ratio(bc, ab) if ab > 1e-9 else 0.0
r_cd = _ratio(cd, bc) if bc > 1e-9 else 0.0
if _near(r_ab, 0.618) and 0.35 <= r_bc <= 0.95 and 1.1 <= r_cd <= 1.75:
res["harmonic_gartley"] = 1
res["harmonic_label"] = "gartley"
if _near(r_ab, 0.382) and 0.35 <= r_bc <= 0.95 and 1.5 <= r_cd <= 2.1:
res["harmonic_bat"] = 1
res["harmonic_label"] = "bat"
return {ga_col(k): v for k, v in res.items()}
def general_analysis_harmonic_columns() -> list[str]:
return ["harmonic_gartley", "harmonic_bat", "harmonic_label"]

View File

@@ -0,0 +1,382 @@
"""
general_analysis 확장 기술적 지표 (추세·모멘텀·변동성·거래량).
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from config import (
BB_PERIOD,
BB_STD,
GA_ADX_PERIOD,
GA_ADX_TREND_THRESHOLD,
GA_AO_FAST,
GA_AO_SLOW,
GA_ATR_PERIOD,
GA_BB_SQUEEZE_QUANTILE,
GA_BB_SQUEEZE_WINDOW,
GA_CCI_PERIOD,
GA_CMF_PERIOD,
GA_DIVERGENCE_LOOKBACK,
GA_DONCHIAN_PERIOD,
GA_EMA_SPANS,
GA_HV_ANNUALIZE_SQRT,
GA_HV_PERCENTILE_WINDOW,
GA_HV_ROLLING_BARS,
GA_KELTNER_ATR_MULT,
GA_LINREG_WINDOW,
GA_MFI_PERIOD,
GA_PSAR_AF_MAX,
GA_PSAR_AF_START,
GA_PSAR_AF_STEP,
GA_ROC_PERIOD,
GA_SMA_PERIODS,
GA_SUPERTREND_ATR_MULT,
GA_VOL_MA_WINDOW,
GA_WILLIAMS_PERIOD,
)
from deepcoin.analysis.general_analysis_core import ga_col
from deepcoin.common.indicators import apply_bar_indicators
def _ema(series: pd.Series, span: int) -> pd.Series:
return series.ewm(span=span, adjust=False).mean()
def _parabolic_sar(
high: np.ndarray,
low: np.ndarray,
af_start: float = GA_PSAR_AF_START,
af_step: float = GA_PSAR_AF_STEP,
af_max: float = GA_PSAR_AF_MAX,
) -> tuple[np.ndarray, np.ndarray]:
"""
Parabolic SAR 시계열.
Returns:
(sar, bull_flag 0/1)
"""
n = len(high)
sar = np.zeros(n)
bull = np.ones(n, dtype=int)
if n < 2:
return sar, bull
is_bull = True
af = af_start
ep = high[0]
sar[0] = low[0]
for i in range(1, n):
prev_sar = sar[i - 1]
if is_bull:
sar[i] = prev_sar + af * (ep - prev_sar)
sar[i] = min(sar[i], low[i - 1], low[i] if i > 0 else low[i - 1])
if low[i] < sar[i]:
is_bull = False
sar[i] = ep
ep = low[i]
af = af_start
else:
if high[i] > ep:
ep = high[i]
af = min(af + af_step, af_max)
else:
sar[i] = prev_sar + af * (ep - prev_sar)
sar[i] = max(sar[i], high[i - 1], high[i] if i > 0 else high[i - 1])
if high[i] > sar[i]:
is_bull = True
sar[i] = ep
ep = high[i]
af = af_start
else:
if low[i] < ep:
ep = low[i]
af = min(af + af_step, af_max)
bull[i] = int(is_bull)
return sar, bull
def general_analysis_apply_indicators(df: pd.DataFrame) -> pd.DataFrame:
"""
기존 apply_bar_indicators 위에 ga_ 접두사 확장 지표를 추가합니다.
Args:
df: OHLCV.
Returns:
ga_* 컬럼이 추가된 DataFrame.
"""
out = apply_bar_indicators(df.copy())
o = out["Open"].astype(float)
h = out["High"].astype(float)
l = out["Low"].astype(float)
c = out["Close"].astype(float)
v = out["Volume"].astype(float)
# --- 추세: MA ---
sma_fast = GA_SMA_PERIODS[0] if GA_SMA_PERIODS else 5
sma_slow = GA_SMA_PERIODS[1] if len(GA_SMA_PERIODS) > 1 else 20
for p in GA_SMA_PERIODS:
ma = c.rolling(p).mean()
out[ga_col(f"sma_{p}")] = ma
out[ga_col(f"close_vs_sma_{p}_pct")] = (c / ma.replace(0, np.nan) - 1) * 100
ema_fast = GA_EMA_SPANS[0] if GA_EMA_SPANS else 12
ema_slow = GA_EMA_SPANS[1] if len(GA_EMA_SPANS) > 1 else 26
out[ga_col(f"ema_{ema_fast}")] = _ema(c, ema_fast)
out[ga_col(f"ema_{ema_slow}")] = _ema(c, ema_slow)
out[ga_col("golden_cross")] = (
(out[ga_col(f"sma_{sma_fast}")] > out[ga_col(f"sma_{sma_slow}")])
& (out[ga_col(f"sma_{sma_fast}")].shift(1) <= out[ga_col(f"sma_{sma_slow}")].shift(1))
).astype(int)
out[ga_col("death_cross")] = (
(out[ga_col(f"sma_{sma_fast}")] < out[ga_col(f"sma_{sma_slow}")])
& (out[ga_col(f"sma_{sma_fast}")].shift(1) >= out[ga_col(f"sma_{sma_slow}")].shift(1))
).astype(int)
# --- ATR / 변동성 ---
tr = pd.concat(
[
h - l,
(h - c.shift(1)).abs(),
(l - c.shift(1)).abs(),
],
axis=1,
).max(axis=1)
atr = tr.rolling(GA_ATR_PERIOD).mean()
out[ga_col("atr_14")] = atr
out[ga_col("atr_pct")] = atr / c.replace(0, np.nan) * 100
out[ga_col("bb_width_pct")] = out.get("BB_Width", (out["Upper"] - out["Lower"]) / out["MA"] * 100)
bw = out[ga_col("bb_width_pct")].astype(float)
out[ga_col("bb_squeeze")] = (
bw < bw.rolling(GA_BB_SQUEEZE_WINDOW).quantile(GA_BB_SQUEEZE_QUANTILE)
).astype(int)
dc = GA_DONCHIAN_PERIOD
out[ga_col("donchian_high_20")] = h.rolling(dc).max()
out[ga_col("donchian_low_20")] = l.rolling(dc).min()
out[ga_col("donchian_pos")] = (c - out[ga_col("donchian_low_20")]) / (
out[ga_col("donchian_high_20")] - out[ga_col("donchian_low_20")]
).replace(0, np.nan)
# --- 모멘텀: CCI, Williams %R ---
tp = (h + l + c) / 3
cci_period = GA_CCI_PERIOD
sma_tp = tp.rolling(cci_period).mean()
mad = tp.rolling(cci_period).apply(lambda x: np.abs(x - x.mean()).mean(), raw=True)
out[ga_col("cci_20")] = (tp - sma_tp) / (0.015 * mad.replace(0, np.nan))
out[ga_col("cci_oversold")] = (out[ga_col("cci_20")] < -100).astype(int)
out[ga_col("cci_overbought")] = (out[ga_col("cci_20")] > 100).astype(int)
hh = h.rolling(GA_WILLIAMS_PERIOD).max()
ll = l.rolling(GA_WILLIAMS_PERIOD).min()
out[ga_col("williams_r")] = (hh - c) / (hh - ll).replace(0, np.nan) * -100
out[ga_col("williams_oversold")] = (out[ga_col("williams_r")] < -80).astype(int)
out[ga_col("williams_overbought")] = (out[ga_col("williams_r")] > -20).astype(int)
div_lb = GA_DIVERGENCE_LOOKBACK
out[ga_col("roc_10")] = (c / c.shift(GA_ROC_PERIOD).replace(0, np.nan) - 1) * 100
# MFI
raw_mf = tp * v
pos_mf = raw_mf.where(tp > tp.shift(1), 0.0).rolling(GA_MFI_PERIOD).sum()
neg_mf = raw_mf.where(tp < tp.shift(1), 0.0).rolling(GA_MFI_PERIOD).sum()
mfr = pos_mf / neg_mf.replace(0, np.nan)
out[ga_col("mfi_14")] = 100 - (100 / (1 + mfr))
# MACD / RSI / Stoch 다이버전스
if "RSI" in out.columns and "macd_hist" in out.columns:
price_up = (c > c.shift(div_lb)).astype(int)
rsi_up = (out["RSI"] > out["RSI"].shift(div_lb)).astype(int)
macd_up = (out["macd_hist"] > out["macd_hist"].shift(div_lb)).astype(int)
out[ga_col("rsi_bull_div")] = ((price_up == 0) & (rsi_up == 1)).astype(int)
out[ga_col("rsi_bear_div")] = ((price_up == 1) & (rsi_up == 0)).astype(int)
out[ga_col("macd_bull_div")] = ((price_up == 0) & (macd_up == 1)).astype(int)
out[ga_col("macd_bear_div")] = ((price_up == 1) & (macd_up == 0)).astype(int)
if "stoch_k" in out.columns:
price_up = (c > c.shift(div_lb)).astype(int)
st_up = (out["stoch_k"] > out["stoch_k"].shift(div_lb)).astype(int)
out[ga_col("stoch_bull_div")] = ((price_up == 0) & (st_up == 1)).astype(int)
out[ga_col("stoch_bear_div")] = ((price_up == 1) & (st_up == 0)).astype(int)
# 봉 간 변화 (타점 Δ와 동일 정의, 전 구간)
if "RSI" in out.columns:
out[ga_col("rsi_delta_1")] = out["RSI"].diff()
if "macd_hist" in out.columns:
out[ga_col("macd_hist_delta_1")] = out["macd_hist"].diff()
if "stoch_k" in out.columns:
out[ga_col("stoch_k_delta_1")] = out["stoch_k"].diff()
# --- 거래량 ---
vol_ma = v.rolling(GA_VOL_MA_WINDOW).mean()
out[ga_col("vol_ma20")] = vol_ma
out[ga_col("vol_ratio")] = v / vol_ma.replace(0, np.nan)
out[ga_col("obv")] = (np.sign(c.diff().fillna(0)) * v).cumsum()
obv = out[ga_col("obv")].astype(float)
out[ga_col("obv_slope_10")] = obv - obv.shift(div_lb)
out[ga_col("obv_bull_div")] = (
(c < c.shift(div_lb)) & (obv > obv.shift(div_lb))
).astype(int)
out[ga_col("obv_bear_div")] = (
(c > c.shift(div_lb)) & (obv < obv.shift(div_lb))
).astype(int)
# CMF
mfv = ((c - l) - (h - c)) / (h - l).replace(0, np.nan) * v
out[ga_col("cmf_20")] = (
mfv.rolling(GA_CMF_PERIOD).sum() / v.rolling(GA_CMF_PERIOD).sum().replace(0, np.nan)
)
# Accumulation/Distribution Line
clv = ((c - l) - (h - c)) / (h - l).replace(0, np.nan)
out[ga_col("ad_line")] = (clv * v).cumsum()
ad = out[ga_col("ad_line")].astype(float)
out[ga_col("ad_slope_10")] = ad - ad.shift(div_lb)
# VWAP 근사 (누적, 세션 리셋 없음)
cum_vp = (tp * v).cumsum()
cum_v = v.cumsum().replace(0, np.nan)
out[ga_col("vwap")] = cum_vp / cum_v
out[ga_col("close_vs_vwap_pct")] = (c / out[ga_col("vwap")] - 1) * 100
# Keltner Channel
k_mid = _ema(c, BB_PERIOD)
out[ga_col("keltner_mid")] = k_mid
out[ga_col("keltner_upper")] = k_mid + GA_KELTNER_ATR_MULT * atr
out[ga_col("keltner_lower")] = k_mid - GA_KELTNER_ATR_MULT * atr
out[ga_col("keltner_pos")] = (c - out[ga_col("keltner_lower")]) / (
out[ga_col("keltner_upper")] - out[ga_col("keltner_lower")]
).replace(0, np.nan)
# Awesome Oscillator: median price SMA5 - SMA34
mp = (h + l) / 2
out[ga_col("ao")] = mp.rolling(GA_AO_FAST).mean() - mp.rolling(GA_AO_SLOW).mean()
out[ga_col("ao_bull")] = (
(out[ga_col("ao")] > 0) & (out[ga_col("ao")].shift(1) <= 0)
).astype(int)
out[ga_col("ao_bear")] = (
(out[ga_col("ao")] < 0) & (out[ga_col("ao")].shift(1) >= 0)
).astype(int)
# Historical Volatility (로그수익 20봉 표준편차, 연율화 계수 1=봉 단위)
log_ret = np.log(c / c.shift(1).replace(0, np.nan))
hv = log_ret.rolling(GA_HV_ROLLING_BARS).std() * GA_HV_ANNUALIZE_SQRT
out[ga_col("hv_20")] = hv
out[ga_col("hv_percentile")] = hv.rolling(GA_HV_PERCENTILE_WINDOW).apply(
lambda x: float((x[:-1] < x[-1]).mean()) if len(x) > 1 and not np.isnan(x[-1]) else 0.5,
raw=True,
)
# Parabolic SAR
sar, psar_bull = _parabolic_sar(h.values, l.values)
out[ga_col("psar")] = sar
out[ga_col("psar_bull")] = psar_bull
ps = pd.Series(psar_bull, index=out.index)
out[ga_col("psar_flip_bull")] = ((ps == 1) & (ps.shift(1) == 0)).astype(int)
out[ga_col("psar_flip_bear")] = ((ps == 0) & (ps.shift(1) == 1)).astype(int)
# ADX
up_move = h.diff()
down_move = -l.diff()
plus_dm = np.where((up_move > down_move) & (up_move > 0), up_move, 0.0)
minus_dm = np.where((down_move > up_move) & (down_move > 0), down_move, 0.0)
atr_safe = atr.replace(0, np.nan)
plus_di = 100 * pd.Series(plus_dm, index=out.index).rolling(GA_ADX_PERIOD).mean() / atr_safe
minus_di = 100 * pd.Series(minus_dm, index=out.index).rolling(GA_ADX_PERIOD).mean() / atr_safe
dx = (plus_di - minus_di).abs() / (plus_di + minus_di).replace(0, np.nan) * 100
out[ga_col("adx_14")] = dx.rolling(GA_ADX_PERIOD).mean()
out[ga_col("plus_di")] = plus_di
out[ga_col("minus_di")] = minus_di
out[ga_col("adx_trending")] = (out[ga_col("adx_14")] > GA_ADX_TREND_THRESHOLD).astype(int)
# Supertrend 방향 (ATR 밴드)
hl2 = (h + l) / 2
upper = hl2 + GA_SUPERTREND_ATR_MULT * atr
lower = hl2 - GA_SUPERTREND_ATR_MULT * atr
out[ga_col("supertrend_bull")] = (c > lower).astype(int)
# Linear regression slope 20
def _lin_slope(y: np.ndarray) -> float:
if len(y) < 2:
return 0.0
x = np.arange(len(y))
coef = np.polyfit(x, y, 1)
return float(coef[0])
out[ga_col("linreg_slope_20")] = c.rolling(GA_LINREG_WINDOW).apply(_lin_slope, raw=True)
def _lin_r2(y: np.ndarray) -> float:
if len(y) < 3:
return 0.0
x = np.arange(len(y))
coef = np.polyfit(x, y, 1)
pred = coef[0] * x + coef[1]
ss_res = ((y - pred) ** 2).sum()
ss_tot = ((y - y.mean()) ** 2).sum()
if ss_tot < 1e-12:
return 0.0
return float(1 - ss_res / ss_tot)
out[ga_col("linreg_r2_20")] = c.rolling(GA_LINREG_WINDOW).apply(_lin_r2, raw=True)
return out
def general_analysis_indicator_columns() -> list[str]:
"""스냅샷용 ga_ 지표 컬럼 목록."""
return [
"sma_5",
"sma_20",
"sma_60",
"close_vs_sma_20_pct",
"golden_cross",
"death_cross",
"atr_14",
"atr_pct",
"bb_squeeze",
"donchian_pos",
"cci_20",
"cci_oversold",
"cci_overbought",
"williams_r",
"williams_oversold",
"williams_overbought",
"roc_10",
"mfi_14",
"rsi_bull_div",
"rsi_bear_div",
"macd_bull_div",
"macd_bear_div",
"stoch_bull_div",
"stoch_bear_div",
"rsi_delta_1",
"macd_hist_delta_1",
"stoch_k_delta_1",
"keltner_pos",
"ao",
"ao_bull",
"ao_bear",
"hv_20",
"hv_percentile",
"ad_line",
"ad_slope_10",
"vol_ratio",
"obv_slope_10",
"obv_bull_div",
"obv_bear_div",
"cmf_20",
"close_vs_vwap_pct",
"adx_14",
"adx_trending",
"supertrend_bull",
"linreg_slope_20",
"linreg_r2_20",
"psar",
"psar_bull",
"psar_flip_bull",
"psar_flip_bear",
]

View File

@@ -0,0 +1,302 @@
"""
general_analysis 차트·가격 패턴 (반전·지속·박스).
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from config import GA_PATTERN_TOLERANCE_PCT, GA_PIVOT_ORDER
from deepcoin.analysis.general_analysis_core import find_pivots, ga_col, last_row_dict
def _pct_diff(a: float, b: float) -> float:
return abs(a - b) / max(abs(a), abs(b), 1e-9) * 100
def general_analysis_detect_patterns(win: pd.DataFrame) -> dict[str, int | float | str | None]:
"""
lookback 윈도우 마지막 봉 기준 패턴 라벨 (0/1 및 요약).
Args:
win: OHLCV (+ 지표 선택).
Returns:
ga_pattern_* 키 dict (접두사 없음, ga_col로 감쌈).
"""
res: dict[str, int | float | str | None] = {
"pattern_double_top": 0,
"pattern_double_bottom": 0,
"pattern_head_shoulders": 0,
"pattern_inv_head_shoulders": 0,
"pattern_triangle_sym": 0,
"pattern_triangle_asc": 0,
"pattern_triangle_desc": 0,
"pattern_flag_bull": 0,
"pattern_flag_bear": 0,
"pattern_wedge_rising": 0,
"pattern_wedge_falling": 0,
"pattern_rectangle": 0,
"pattern_channel_up": 0,
"pattern_channel_down": 0,
"pattern_measured_move": 0,
"pattern_rounding_top": 0,
"pattern_rounding_bottom": 0,
"pattern_gap_up": 0,
"pattern_gap_down": 0,
"pattern_v_bottom": 0,
"pattern_spike_top": 0,
"pattern_triple_top": 0,
"pattern_triple_bottom": 0,
"pattern_cup_handle": 0,
"pattern_keystone_bull": 0,
"pattern_keystone_bear": 0,
"pattern_island_top": 0,
"pattern_island_bottom": 0,
"pattern_label": "none",
}
if win is None or len(win) < 20:
return res
h = win["High"].astype(float).values
l = win["Low"].astype(float).values
c = win["Close"].astype(float).values
peaks, troughs = find_pivots(h, l, order=GA_PIVOT_ORDER)
tol = GA_PATTERN_TOLERANCE_PCT
if len(peaks) >= 3:
p1, p2, p3 = peaks[-3], peaks[-2], peaks[-1]
if (
_pct_diff(h[p1], h[p2]) < tol
and _pct_diff(h[p2], h[p3]) < tol
and p1 < p2 < p3
):
res["pattern_triple_top"] = 1
res["pattern_label"] = "triple_top"
if len(troughs) >= 3:
t1, t2, t3 = troughs[-3], troughs[-2], troughs[-1]
if (
_pct_diff(l[t1], l[t2]) < tol
and _pct_diff(l[t2], l[t3]) < tol
and t1 < t2 < t3
):
res["pattern_triple_bottom"] = 1
res["pattern_label"] = "triple_bottom"
if len(peaks) >= 2:
p1, p2 = peaks[-2], peaks[-1]
if _pct_diff(h[p1], h[p2]) < tol:
res["pattern_double_top"] = 1
if res["pattern_label"] == "none":
res["pattern_label"] = "double_top"
if len(troughs) >= 2:
t1, t2 = troughs[-2], troughs[-1]
if _pct_diff(l[t1], l[t2]) < tol:
res["pattern_double_bottom"] = 1
res["pattern_label"] = "double_bottom"
if len(peaks) >= 3:
i, j, k = peaks[-3], peaks[-2], peaks[-1]
if h[j] > h[i] and h[j] > h[k] and _pct_diff(h[i], h[k]) < tol * 1.5:
res["pattern_head_shoulders"] = 1
res["pattern_label"] = "head_shoulders"
if len(troughs) >= 3:
i, j, k = troughs[-3], troughs[-2], troughs[-1]
if l[j] < l[i] and l[j] < l[k] and _pct_diff(l[i], l[k]) < tol * 1.5:
res["pattern_inv_head_shoulders"] = 1
res["pattern_label"] = "inv_head_shoulders"
n = len(win)
x = np.arange(n)
high_slope = np.polyfit(x, h, 1)[0]
low_slope = np.polyfit(x, l, 1)[0]
if high_slope < 0 and low_slope > 0:
res["pattern_triangle_sym"] = 1
if res["pattern_label"] == "none":
res["pattern_label"] = "triangle_sym"
if high_slope < 0 and low_slope > 0 and low_slope > abs(high_slope) * 0.5:
res["pattern_triangle_asc"] = 1
if high_slope < 0 and low_slope < 0 and abs(high_slope) > abs(low_slope) * 0.5:
res["pattern_triangle_desc"] = 1
rng_pct = (h.max() - l.min()) / max(c[-1], 1e-9) * 100
if rng_pct < 8 and abs(high_slope) < c[-1] * 0.0001:
res["pattern_rectangle"] = 1
if res["pattern_label"] == "none":
res["pattern_label"] = "rectangle"
leg = max(n // 3, 5)
if n > leg * 2:
first_move = (c[leg] - c[0]) / max(c[0], 1e-9) * 100
channel = (c[-1] - c[-leg]) / max(c[-leg], 1e-9) * 100
if first_move > 5 and abs(channel) < 3:
res["pattern_flag_bull"] = 1
res["pattern_label"] = "flag_bull"
if first_move < -5 and abs(channel) < 3:
res["pattern_flag_bear"] = 1
res["pattern_label"] = "flag_bear"
if high_slope > 0 and low_slope > 0:
res["pattern_wedge_rising"] = 1
if high_slope < 0 and low_slope < 0:
res["pattern_wedge_falling"] = 1
if high_slope > 0 and low_slope > 0:
res["pattern_channel_up"] = 1
if high_slope < 0 and low_slope < 0:
res["pattern_channel_down"] = 1
if len(c) >= 15:
mid = len(c) // 2
first_half = c[:mid].mean()
second_half = c[mid:].mean()
if c[0] > c[mid] * 1.08 and c[-1] > c[mid] * 1.05:
res["pattern_v_bottom"] = 1
res["pattern_label"] = "v_bottom"
if c[0] < c[-1] * 0.92 and c.max() > c[0] * 1.1:
res["pattern_spike_top"] = 1
o = win["Open"].astype(float).values
gap_ups: list[int] = []
gap_downs: list[int] = []
for i in range(1, min(30, n)):
if l[i] > h[i - 1]:
res["pattern_gap_up"] = 1
gap_ups.append(i)
if h[i] < l[i - 1]:
res["pattern_gap_down"] = 1
gap_downs.append(i)
for gi in gap_ups:
for gd in gap_downs:
if gd > gi and h[gi] < l[gd]:
res["pattern_island_top"] = 1
res["pattern_label"] = "island_top"
if gd > gi and l[gi] > h[gd]:
res["pattern_island_bottom"] = 1
res["pattern_label"] = "island_bottom"
# 키리스톤: 상단 수평 + 하단 상승(역키리스톤) 또는 하단 수평 + 상단 하락
if abs(high_slope) < c[-1] * 0.00005 and low_slope > 0:
res["pattern_keystone_bull"] = 1
if res["pattern_label"] == "none":
res["pattern_label"] = "keystone_bull"
if abs(low_slope) < c[-1] * 0.00005 and high_slope < 0:
res["pattern_keystone_bear"] = 1
if res["pattern_label"] == "none":
res["pattern_label"] = "keystone_bear"
# 컵앤핸들: 전반 U자 + 후반 15% 소폭 조정
if n >= 40:
cup_len = int(n * 0.65)
handle_len = max(int(n * 0.15), 5)
cup = c[:cup_len]
handle = c[-handle_len:]
rim = float(max(cup[0], cup[-1]))
bottom = float(cup.min())
depth = rim - bottom
if depth > rim * 0.08 and float(cup[-1]) > bottom + depth * 0.5:
handle_pull = float(handle.max() - handle.min())
if handle_pull < depth * 0.5 and float(c[-1]) >= rim * 0.98:
res["pattern_cup_handle"] = 1
res["pattern_label"] = "cup_handle"
if len(c) >= 30:
ma = pd.Series(c).rolling(10).mean()
if float(ma.iloc[-1]) > float(ma.iloc[-15]) > float(ma.iloc[-30]):
res["pattern_rounding_bottom"] = 1
if float(ma.iloc[-1]) < float(ma.iloc[-15]) < float(ma.iloc[-30]):
res["pattern_rounding_top"] = 1
if len(peaks) >= 2 and len(troughs) >= 2:
leg_h = h[peaks[-1]] - l[troughs[-1]]
if leg_h > 0 and c[-1] >= l[troughs[-1]] + leg_h * 0.9:
res["pattern_measured_move"] = 1
return res
def general_analysis_pattern_snapshot(win: pd.DataFrame) -> dict[str, object]:
"""패턴 dict → ga_pattern_* 컬럼명."""
raw = general_analysis_detect_patterns(win)
return {ga_col(k): v for k, v in raw.items()}
def general_analysis_pattern_columns() -> list[str]:
return [
"pattern_double_top",
"pattern_double_bottom",
"pattern_head_shoulders",
"pattern_inv_head_shoulders",
"pattern_triangle_sym",
"pattern_triangle_asc",
"pattern_triangle_desc",
"pattern_flag_bull",
"pattern_flag_bear",
"pattern_wedge_rising",
"pattern_wedge_falling",
"pattern_rectangle",
"pattern_channel_up",
"pattern_channel_down",
"pattern_measured_move",
"pattern_rounding_top",
"pattern_rounding_bottom",
"pattern_gap_up",
"pattern_gap_down",
"pattern_v_bottom",
"pattern_spike_top",
"pattern_triple_top",
"pattern_triple_bottom",
"pattern_cup_handle",
"pattern_keystone_bull",
"pattern_keystone_bear",
"pattern_island_top",
"pattern_island_bottom",
"pattern_label",
]
def general_analysis_apply_patterns_to_bars(
df: pd.DataFrame,
interval: int,
tail_rows: int | None = None,
) -> pd.DataFrame:
"""
lookback 윈도우 패턴 라벨을 봉별 컬럼으로 채움 (최근 tail_rows만, 성능).
Args:
df: OHLCV (+ 선택적 지표).
interval: 분봉 간격.
tail_rows: None이면 전체(8천봉 이하) 또는 config tail.
Returns:
ga_pattern_* 컬럼이 추가된 DataFrame.
"""
from deepcoin.analysis.general_analysis_config import CONTEXT_TAIL_ROWS, LOOKBACK_BARS
out = df.copy()
lb = LOOKBACK_BARS.get(interval, 80)
keys = [k for k in general_analysis_pattern_columns() if k != "pattern_label"]
for k in keys:
out[ga_col(k)] = 0
out[ga_col("pattern_label")] = "none"
n = len(out)
if n < lb + 1:
return out
if tail_rows is None:
tail_rows = CONTEXT_TAIL_ROWS.get(interval, 5000)
start = max(lb, n - tail_rows)
for i in range(start, n):
win = out.iloc[i - lb : i]
det = general_analysis_detect_patterns(win)
idx = out.index[i]
for k, v in det.items():
out.at[idx, ga_col(k)] = v
return out

View File

@@ -0,0 +1,154 @@
"""
general_analysis 전체 파이프라인 (지표·캔들·한 봉 특징).
"""
from __future__ import annotations
import pandas as pd
from deepcoin.common.candle_features import compute_bar_features
from deepcoin.analysis.general_analysis_candles import (
general_analysis_apply_candles,
general_analysis_candle_columns,
)
from deepcoin.analysis.general_analysis_chart import (
general_analysis_apply_chart_bars,
general_analysis_chart_columns,
general_analysis_chart_metrics,
)
from deepcoin.analysis.general_analysis_context import general_analysis_apply_context_features
from deepcoin.analysis.general_analysis_harmonic import (
general_analysis_harmonic_columns,
general_analysis_harmonic_snapshot,
)
from deepcoin.analysis.general_analysis_volume import (
general_analysis_volume_columns,
general_analysis_volume_snapshot,
)
from deepcoin.analysis.general_analysis_core import ga_col, lookback_slice
from deepcoin.analysis.general_analysis_indicators import (
general_analysis_apply_indicators,
general_analysis_indicator_columns,
)
from deepcoin.analysis.general_analysis_patterns import (
general_analysis_pattern_columns,
general_analysis_pattern_snapshot,
)
from deepcoin.analysis.general_analysis_wave import (
general_analysis_wave_columns,
general_analysis_wave_snapshot,
)
def general_analysis_enrich_bars(
df: pd.DataFrame,
interval: int | None = None,
*,
full_context: bool = True,
) -> pd.DataFrame:
"""
OHLCV → candle_features + ga 지표 + 캔들 + 차트 + (선택) lookback 컨텍스트.
Args:
df: raw OHLCV.
interval: 분봉 간격. full_context=True일 때 필수.
full_context: 패턴·VP·파동·하모닉 롤링 적용.
Returns:
전체 특징 컬럼 DataFrame.
"""
base = compute_bar_features(df)
out = general_analysis_apply_indicators(base)
out = general_analysis_apply_candles(out)
out = general_analysis_apply_chart_bars(out)
if full_context and interval is not None:
out = general_analysis_apply_context_features(out, interval)
return out
def general_analysis_snapshot_at_bar(
enriched: pd.DataFrame,
ts: pd.Timestamp,
interval: int,
) -> dict[str, object]:
"""
타점 시각 직전 완성봉 + lookback 패턴·파동·차트 메타.
Args:
enriched: general_analysis_enrich_bars 결과.
ts: 타점 시각.
interval: 분봉 간격.
Returns:
flat dict (ga_ 키 + legacy bb/rsi where present).
"""
win = lookback_slice(enriched, interval, ts)
snap: dict[str, object] = {}
if win.empty:
return snap
row = win.iloc[-1]
legacy_cols = [
"bb_pos",
"RSI",
"macd_hist",
"stoch_k",
"stoch_d",
"macd_line",
"macd_signal",
"BB_Width",
]
for c in legacy_cols:
if c in row.index and not pd.isna(row[c]):
snap[c] = float(row[c]) if isinstance(row[c], (int, float)) else row[c]
for c in general_analysis_indicator_columns():
col = ga_col(c)
if col in row.index:
v = row[col]
snap[col] = None if pd.isna(v) else v
for c in general_analysis_candle_columns():
col = ga_col(c)
if col in row.index:
snap[col] = int(row[col]) if not pd.isna(row[col]) else 0
pat_cols = [ga_col(c) for c in general_analysis_pattern_columns()]
if pat_cols and pat_cols[0] in enriched.columns:
for col in pat_cols:
if col in row.index:
snap[col] = row[col]
else:
snap.update(general_analysis_pattern_snapshot(win))
wave_cols = [ga_col(c) for c in general_analysis_wave_columns()]
if wave_cols and wave_cols[0] in enriched.columns:
for col in wave_cols:
if col in row.index:
snap[col] = row[col]
else:
snap.update(general_analysis_wave_snapshot(win))
snap.update(general_analysis_volume_snapshot(win))
snap.update(general_analysis_harmonic_snapshot(win))
snap.update(general_analysis_chart_metrics(win))
return snap
def general_analysis_all_snapshot_keys() -> list[str]:
"""CSV 헤더용 전체 키 목록 (간격 접두사 제외)."""
keys = list(legacy_snapshot_keys())
keys += [ga_col(c) for c in general_analysis_indicator_columns()]
keys += [ga_col(c) for c in general_analysis_candle_columns()]
keys += [ga_col(c) for c in general_analysis_pattern_columns()]
keys += [ga_col(c) for c in general_analysis_wave_columns()]
keys += [ga_col(c) for c in general_analysis_chart_columns()]
keys += [ga_col(c) for c in general_analysis_volume_columns()]
keys += [ga_col(c) for c in general_analysis_harmonic_columns()]
return keys
def legacy_snapshot_keys() -> list[str]:
return ["bb_pos", "RSI", "macd_hist", "stoch_k", "stoch_d"]

View File

@@ -0,0 +1,73 @@
"""
general_analysis HTML 요약 리포트.
"""
from __future__ import annotations
from pathlib import Path
import pandas as pd
from deepcoin.analysis.general_analysis_config import DEFAULT_OUTPUT_CSV, DEFAULT_OUTPUT_HTML
def write_analysis_report(
csv_path: Path | str = DEFAULT_OUTPUT_CSV,
html_path: Path | str = DEFAULT_OUTPUT_HTML,
) -> Path:
"""
스냅샷 CSV를 읽어 모듈별 컬럼 수·샘플 테이블 HTML 생성.
Returns:
HTML 경로.
"""
df = pd.read_csv(csv_path)
html_out = Path(html_path)
html_out.parent.mkdir(parents=True, exist_ok=True)
modules = {
"지표 (ga_)": [c for c in df.columns if "_ga_" in c or c.startswith("ga_")],
"패턴": [c for c in df.columns if "ga_pattern_" in c],
"파동·구조": [c for c in df.columns if "ga_struct_" in c or "ga_elliott" in c or "ga_wyckoff" in c or "ga_fib_" in c],
"차트": [c for c in df.columns if "ga_chart_" in c],
"MTF 합성": [c for c in df.columns if "ga_align_" in c],
"레거시": [c for c in df.columns if c.endswith("_RSI") or c.endswith("_bb_pos")],
}
summary_rows = ""
for name, cols in modules.items():
summary_rows += f"<tr><td>{name}</td><td>{len(cols)}</td></tr>"
sample = df.head(5)[
["dt", "action", "price", "ga_align_timing_buy_score", "ga_align_mtf_conflict", "d1_RSI", "m3_RSI"]
].to_html(index=False, classes="tbl") if "d1_RSI" in df.columns else df.head(3).to_html(index=False)
buy_mean = df[df["action"] == "buy"]["ga_align_timing_buy_score"].mean() if "ga_align_timing_buy_score" in df.columns else 0
sell_mean = df[df["action"] == "sell"]["ga_align_timing_sell_score"].mean() if "ga_align_timing_sell_score" in df.columns else 0
content = f"""<!DOCTYPE html>
<html lang="ko"><head><meta charset="utf-8"/>
<title>general_analysis 실행 리포트</title>
<style>
body {{ font-family: "Malgun Gothic", Arial, sans-serif; margin: 24px; background: #f5f5f5; }}
table.tbl {{ border-collapse: collapse; width: 100%; background: #fff; font-size: 0.85rem; }}
th, td {{ border: 1px solid #e2e8f0; padding: 6px 8px; }}
th {{ background: #e2e8f0; }}
</style></head><body>
<h1>general_analysis 실행 리포트</h1>
<p>타점 {len(df)}건 · 컬럼 {len(df.columns)}개 · CSV: {csv_path}</p>
<h2>모듈별 컬럼 수</h2>
<table class="tbl"><thead><tr><th>모듈</th><th>컬럼 수</th></tr></thead>
<tbody>{summary_rows}</tbody></table>
<h2>MTF 합성 평균</h2>
<ul>
<li>매수 타점 timing_buy_score 평균: {buy_mean:.3f}</li>
<li>매도 타점 timing_sell_score 평균: {sell_mean:.3f}</li>
</ul>
<h2>샘플 5건</h2>
{sample}
<p>전체 데이터: <code>{csv_path}</code></p>
</body></html>"""
html_out.write_text(content, encoding="utf-8")
print(f"리포트: {html_out}")
return html_out

View File

@@ -0,0 +1,71 @@
"""
general_analysis 실행 진입점.
python scripts/03_analyze_trades.py
python scripts/03_analyze_trades.py --limit 20 # 테스트용 타점 수 제한
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from config import CHART_LOOKBACK_DAYS, SYMBOL
from deepcoin.analysis.general_analysis_config import (
DEFAULT_OUTPUT_CSV,
DEFAULT_OUTPUT_HTML,
DEFAULT_TRADES_FILE,
)
from deepcoin.analysis.general_analysis_report import write_analysis_report
from deepcoin.analysis.general_analysis_snapshot import export_trade_snapshots
from deepcoin.ops.monitor import Monitor
from deepcoin.data.mtf_bb import load_frames_from_db
def main() -> None:
"""ground truth 타점 MTF general_analysis 스냅샷 생성."""
parser = argparse.ArgumentParser(description="general_analysis MTF 타점 분석")
parser.add_argument("--limit", type=int, default=0, help="타점 수 제한 (0=전체)")
parser.add_argument("--trades", type=str, default=DEFAULT_TRADES_FILE)
parser.add_argument("--csv", type=str, default=DEFAULT_OUTPUT_CSV)
parser.add_argument("--html", type=str, default=DEFAULT_OUTPUT_HTML)
args = parser.parse_args()
from deepcoin.paths import REPORTS_ANALYSIS
trades_path = Path(args.trades)
data = json.loads(trades_path.read_text(encoding="utf-8"))
trades = data.get("trades") or []
if args.limit > 0:
trades = trades[: args.limit]
print(f"테스트 모드: 타점 {args.limit}건만")
print(f"=== general_analysis {SYMBOL} (lookback {CHART_LOOKBACK_DAYS}일) ===")
mon = Monitor(cooldown_file=None)
frames = load_frames_from_db(mon, SYMBOL, lookback_days=CHART_LOOKBACK_DAYS)
if not frames:
raise RuntimeError("coins.db 데이터 없음")
# limit 시 임시 trades 파일
if args.limit > 0:
tmp = REPORTS_ANALYSIS / "_ga_trades_subset.json"
tmp.parent.mkdir(exist_ok=True)
subset = {**data, "trades": trades}
tmp.write_text(json.dumps(subset, ensure_ascii=False), encoding="utf-8")
trades_path = tmp
from deepcoin.analysis.general_analysis_snapshot import build_trade_mtf_snapshots
csv_path = Path(args.csv)
df = build_trade_mtf_snapshots(frames, trades)
csv_path.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(csv_path, index=False, encoding="utf-8-sig")
print(f"저장: {csv_path} ({len(df)}× {len(df.columns)}열)")
write_analysis_report(csv_path, Path(args.html))
print("완료.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,100 @@
"""
general_analysis ground truth 타점 MTF 스냅샷 생성.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import pandas as pd
from deepcoin.analysis.general_analysis_align import general_analysis_mtf_scores
from deepcoin.analysis.general_analysis_config import (
DEFAULT_OUTPUT_CSV,
DEFAULT_TRADES_FILE,
GENERAL_ANALYSIS_INTERVALS,
)
from deepcoin.analysis.general_analysis_core import interval_tf_prefix
from deepcoin.analysis.general_analysis_pipeline import general_analysis_enrich_bars, general_analysis_snapshot_at_bar
from deepcoin.ground_truth.ground_truth import load_ground_truth
def _prefixed_snap(snap: dict[str, Any], interval: int) -> dict[str, Any]:
p = interval_tf_prefix(interval)
return {f"{p}_{k}": v for k, v in snap.items()}
def build_trade_mtf_snapshots(
frames: dict[int, pd.DataFrame],
trades: list[dict[str, Any]],
) -> pd.DataFrame:
"""
모든 타점에 대해 8개 간격 general_analysis 스냅샷.
Args:
frames: interval → OHLCV.
trades: ground_truth trades.
Returns:
wide DataFrame (1 row per trade).
"""
enriched: dict[int, pd.DataFrame] = {}
for iv in GENERAL_ANALYSIS_INTERVALS:
raw = frames.get(iv)
if raw is None or raw.empty:
continue
print(f" [GA] {interval_tf_prefix(iv)} 봉 지표 계산 ({len(raw)}봉)...")
enriched[iv] = general_analysis_enrich_bars(raw, iv, full_context=True)
rows: list[dict[str, Any]] = []
for i, t in enumerate(sorted(trades, key=lambda x: x["dt"])):
ts = pd.Timestamp(t["dt"])
row: dict[str, Any] = {
"trade_idx": i,
"dt": t["dt"],
"action": t["action"],
"price": t["price"],
"weight": t.get("weight", 1.0),
"leg_id": t.get("leg_id", 0),
"memo": t.get("memo", ""),
}
flat: dict[str, Any] = {}
for iv in GENERAL_ANALYSIS_INTERVALS:
ef = enriched.get(iv)
if ef is None:
continue
snap = general_analysis_snapshot_at_bar(ef, ts, iv)
flat.update(_prefixed_snap(snap, iv))
row.update(flat)
row.update(general_analysis_mtf_scores(flat))
rows.append(row)
if (i + 1) % 50 == 0:
print(f" 타점 스냅샷 {i + 1}/{len(trades)}")
return pd.DataFrame(rows)
def export_trade_snapshots(
frames: dict[int, pd.DataFrame],
trades_path: Path | str = DEFAULT_TRADES_FILE,
output_csv: Path | str = DEFAULT_OUTPUT_CSV,
) -> Path:
"""
CSV로 타점 MTF 스냅샷 저장.
Returns:
저장 경로.
"""
data = load_ground_truth(Path(trades_path))
if not data:
raise FileNotFoundError(f"정답 파일 없음: {trades_path}")
trades = data.get("trades") or []
print(f"타점 {len(trades)}× {len(GENERAL_ANALYSIS_INTERVALS)} TF general_analysis")
df = build_trade_mtf_snapshots(frames, trades)
out = Path(output_csv)
out.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(out, index=False, encoding="utf-8-sig")
print(f"저장: {out} ({len(df)}× {len(df.columns)}열)")
return out

View File

@@ -0,0 +1,92 @@
"""
general_analysis Volume Profile (POC, VAH, VAL).
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from config import GA_VP_BINS, GA_VP_VALUE_AREA_PCT
from deepcoin.analysis.general_analysis_core import ga_col
def general_analysis_volume_profile(
win: pd.DataFrame,
bins: int | None = None,
value_area_pct: float | None = None,
) -> dict[str, float | int]:
"""
lookback 구간 가격-거래량 분포에서 POC·VAH·VAL 계산.
Args:
win: OHLCV.
bins: 가격 구간 수.
value_area_pct: value area 누적 비율 (기본 70%).
Returns:
ga_vp_* 키 dict (접두사 없음).
"""
res: dict[str, float | int] = {
"vp_poc": 0.0,
"vp_vah": 0.0,
"vp_val": 0.0,
"vp_close_vs_poc_pct": 0.0,
"vp_in_value_area": 0,
}
if bins is None:
bins = GA_VP_BINS
if value_area_pct is None:
value_area_pct = GA_VP_VALUE_AREA_PCT
if win is None or len(win) < 10 or "Volume" not in win.columns:
return res
h = win["High"].astype(float).values
l = win["Low"].astype(float).values
c = win["Close"].astype(float).values
v = win["Volume"].astype(float).values
tp = (h + l + c) / 3.0
lo, hi = float(l.min()), float(h.max())
if hi <= lo:
return res
edges = np.linspace(lo, hi, bins + 1)
hist = np.zeros(bins, dtype=float)
for i in range(len(tp)):
idx = int(np.clip(np.digitize(tp[i], edges) - 1, 0, bins - 1))
hist[idx] += v[i]
if hist.sum() <= 0:
return res
poc_idx = int(np.argmax(hist))
poc = float((edges[poc_idx] + edges[poc_idx + 1]) / 2)
res["vp_poc"] = poc
order = np.argsort(hist)[::-1]
cum = 0.0
selected: list[int] = []
total = hist.sum()
for idx in order:
selected.append(int(idx))
cum += hist[idx]
if cum >= total * value_area_pct:
break
sel_min, sel_max = min(selected), max(selected)
res["vp_val"] = float(edges[sel_min])
res["vp_vah"] = float(edges[sel_max + 1])
res["vp_close_vs_poc_pct"] = float((c[-1] / poc - 1) * 100) if poc else 0.0
res["vp_in_value_area"] = int(res["vp_val"] <= c[-1] <= res["vp_vah"])
return res
def general_analysis_volume_columns() -> list[str]:
return ["vp_poc", "vp_vah", "vp_val", "vp_close_vs_poc_pct", "vp_in_value_area"]
def general_analysis_volume_snapshot(win: pd.DataFrame) -> dict[str, object]:
"""Volume profile → ga_vp_*."""
return {ga_col(k): v for k, v in general_analysis_volume_profile(win).items()}

View File

@@ -0,0 +1,205 @@
"""
general_analysis 파동·시장 구조 (다우, 엘리어트 라이트, 피보나치, Wyckoff 태그).
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from deepcoin.analysis.general_analysis_core import find_pivots, ga_col
def _fib_levels(low: float, high: float) -> dict[str, float]:
diff = high - low
return {
"fib_0": low,
"fib_382": low + diff * 0.382,
"fib_500": low + diff * 0.5,
"fib_618": low + diff * 0.618,
"fib_100": high,
"fib_1618": low + diff * 1.618,
}
def general_analysis_wave_snapshot(win: pd.DataFrame) -> dict[str, object]:
"""
lookback 윈도우 마지막 시점 파동·구조 스냅샷.
Args:
win: OHLCV.
Returns:
ga_wave_* / ga_struct_* / ga_fib_* dict.
"""
res: dict[str, object] = {
"struct_trend": "range",
"struct_hh": 0,
"struct_hl": 0,
"struct_lh": 0,
"struct_ll": 0,
"struct_bos_bull": 0,
"struct_bos_bear": 0,
"struct_choch": 0,
"elliott_wave_count": 0,
"elliott_phase": "unknown",
"wyckoff_phase": "unknown",
"fib_near_level": "none",
"ichi_trend": "neutral",
"pitchfork_bias": "neutral",
"pitchfork_dist_pct": 0.0,
"wyckoff_spring": 0,
"wyckoff_utad": 0,
}
if win is None or len(win) < 15:
return {ga_col(k): v for k, v in res.items()}
h = win["High"].astype(float).values
l = win["Low"].astype(float).values
c = win["Close"].astype(float).values
peaks, troughs = find_pivots(h, l, order=2)
# Dow HH/HL/LH/LL
if len(peaks) >= 2 and len(troughs) >= 2:
hh = int(h[peaks[-1]] > h[peaks[-2]])
hl = int(l[troughs[-1]] > l[troughs[-2]])
lh = int(h[peaks[-1]] < h[peaks[-2]])
ll = int(l[troughs[-1]] < l[troughs[-2]])
res["struct_hh"] = hh
res["struct_hl"] = hl
res["struct_lh"] = lh
res["struct_ll"] = ll
if hh and hl:
res["struct_trend"] = "up"
elif lh and ll:
res["struct_trend"] = "down"
if hh and c[-1] > h[peaks[-2]]:
res["struct_bos_bull"] = 1
if ll and c[-1] < l[troughs[-2]]:
res["struct_bos_bear"] = 1
if (hh and ll) or (lh and hl):
res["struct_choch"] = 1
# Elliott lite: pivot count in window
swings = len(peaks) + len(troughs)
res["elliott_wave_count"] = swings
if swings >= 5:
res["elliott_phase"] = "impulse_late"
elif swings >= 3:
res["elliott_phase"] = "corrective"
# Wyckoff lite
vol = win["Volume"].astype(float).values if "Volume" in win.columns else np.ones(len(c))
vol_ma = vol[-20:].mean() if len(vol) >= 20 else vol.mean()
price_range = (h[-20:].max() - l[-20:].min()) / max(c[-1], 1e-9) * 100
if price_range < 6 and vol[-1] < vol_ma * 1.2:
res["wyckoff_phase"] = "accumulation"
elif price_range < 6 and vol[-1] > vol_ma * 1.5 and c[-1] > c[-5]:
res["wyckoff_phase"] = "distribution"
if price_range < 8 and l[-1] < l[-5] and c[-1] > c[-2] and vol[-1] > vol_ma * 1.3:
res["wyckoff_spring"] = 1
if price_range < 8 and h[-1] > h[-5] and c[-1] < c[-2] and vol[-1] > vol_ma * 1.3:
res["wyckoff_utad"] = 1
# Andrews Pitchfork (3피벗 중앙선 대비 종가 위치)
pivots = sorted([(i, h[i]) for i in peaks] + [(i, l[i]) for i in troughs])
if len(pivots) >= 3:
p0, p1, p2 = pivots[-3], pivots[-2], pivots[-1]
y0 = p0[1]
y_mid = (p1[1] + p2[1]) / 2
x0, x2 = p0[0], p2[0]
if x2 != x0:
slope = (y_mid - y0) / (x2 - x0)
y_line = y0 + slope * (len(c) - 1 - x0)
dist_pct = (c[-1] - y_line) / max(c[-1], 1e-9) * 100
res["pitchfork_dist_pct"] = round(float(dist_pct), 3)
if dist_pct > 0.5:
res["pitchfork_bias"] = "above"
elif dist_pct < -0.5:
res["pitchfork_bias"] = "below"
# Fibonacci
hi, lo = float(h.max()), float(l.min())
levels = _fib_levels(lo, hi)
price = float(c[-1])
for name, lvl in levels.items():
if abs(price - lvl) / max(price, 1e-9) * 100 < 1.5:
res["fib_near_level"] = name.replace("fib_", "")
break
if "ichi_cloud_top" in win.columns:
row = win.iloc[-1]
ct = float(row.get("ichi_cloud_top", np.nan))
cb = float(row.get("ichi_cloud_bottom", np.nan))
if not np.isnan(ct) and price > ct:
res["ichi_trend"] = "above_cloud"
elif not np.isnan(cb) and price < cb:
res["ichi_trend"] = "below_cloud"
else:
res["ichi_trend"] = "in_cloud"
return {ga_col(k): v for k, v in res.items()}
def general_analysis_wave_columns() -> list[str]:
return [
"struct_trend",
"struct_hh",
"struct_hl",
"struct_lh",
"struct_ll",
"struct_bos_bull",
"struct_bos_bear",
"struct_choch",
"elliott_wave_count",
"elliott_phase",
"wyckoff_phase",
"fib_near_level",
"ichi_trend",
"pitchfork_bias",
"pitchfork_dist_pct",
"wyckoff_spring",
"wyckoff_utad",
]
def general_analysis_apply_wave_to_bars(
df: pd.DataFrame,
interval: int,
tail_rows: int | None = None,
) -> pd.DataFrame:
"""파동·구조 스냅샷을 최근 봉에 롤링 적용."""
from deepcoin.analysis.general_analysis_config import CONTEXT_TAIL_ROWS, LOOKBACK_BARS
out = df.copy()
lb = LOOKBACK_BARS.get(interval, 80)
for k in general_analysis_wave_columns():
col = ga_col(k)
if k == "struct_trend":
out[col] = "range"
elif k in ("elliott_phase", "wyckoff_phase"):
out[col] = "unknown"
elif k == "fib_near_level":
out[col] = "none"
elif k in ("ichi_trend", "pitchfork_bias"):
out[col] = "neutral"
elif k == "pitchfork_dist_pct":
out[col] = 0.0
else:
out[col] = 0
n = len(out)
if n < lb + 1:
return out
if tail_rows is None:
tail_rows = CONTEXT_TAIL_ROWS.get(interval, 5000)
start = max(lb, n - tail_rows)
for i in range(start, n):
snap = general_analysis_wave_snapshot(out.iloc[i - lb : i])
idx = out.index[i]
for k, v in snap.items():
out.at[idx, k] = v
return out