""" 모든 봉(1~1440분)에 BB·일목 위치·캔들 형태 특징을 계산하고 기준 타임라인(3분)에 맞춰 정렬합니다. """ from __future__ import annotations import numpy as np import pandas as pd from config import ALL_INTERVALS, ENTRY_INTERVAL from indicators import add_bollinger, add_ichimoku from strategy import prepare_entry_df INTERVAL_LABELS: dict[int, str] = { 1: "m1", 3: "m3", 5: "m5", 10: "m10", 15: "m15", 30: "m30", 60: "m60", 240: "m240", 1440: "d1", } def interval_prefix(interval: int) -> str: """컬럼 접두사 (예: m3, d1).""" return INTERVAL_LABELS.get(interval, f"m{interval}") def interval_display(interval: int) -> str: if interval >= 1440: return "일봉" return f"{interval}분" # BB 위치 (밴드 내 %B 구간) BB_ZONE_FEATURES: tuple[str, ...] = ( "bb_zone_bottom", "bb_zone_low", "bb_zone_mid", "bb_zone_high", "bb_zone_top", ) # 일목 위치 ICHI_FEATURES: tuple[str, ...] = ( "ichi_above_cloud", "ichi_below_cloud", "ichi_in_cloud", "ichi_cloud_bull", "ichi_cloud_bear", "ichi_tk_bull", "ichi_tk_bear", "ichi_price_above_tenkan", "ichi_price_below_kijun", "ichi_tk_cross_up", "ichi_tk_cross_down", ) # BB 이벤트·캔들 형태 BB_EVENT_FEATURES: tuple[str, ...] = ( "cross_up_lower", "cross_up_upper", "cross_down_lower", "below_lower", "above_upper", "inside_band", "bb_pos_low", "bb_pos_high", "squeeze", ) CANDLE_SHAPE_FEATURES: tuple[str, ...] = ( "body_strong", "body_weak", "hammer", "shooting_star", "bullish", "bearish", ) FEATURE_BOOL_COLS: tuple[str, ...] = ( BB_EVENT_FEATURES + BB_ZONE_FEATURES + ICHI_FEATURES + CANDLE_SHAPE_FEATURES ) def compute_bar_features(df: pd.DataFrame) -> pd.DataFrame: """단일 봉 DataFrame에 BB·일목·캔들 위치 특징을 추가합니다.""" out = add_bollinger(add_ichimoku(prepare_entry_df(df.copy()))) if len(out) < 2: return out o = out["Open"].astype(float) h = out["High"].astype(float) l = out["Low"].astype(float) c = out["Close"].astype(float) prev_c = c.shift(1) upper = out["Upper"].astype(float) lower = out["Lower"].astype(float) prev_upper = upper.shift(1) prev_lower = lower.shift(1) rng = (h - l).replace(0, np.nan) body = (c - o).abs() out["range_pct"] = (rng / c.replace(0, np.nan)) * 100 out["body_ratio"] = (body / rng).fillna(0).clip(0, 1) out["upper_wick_ratio"] = ((h - np.maximum(o, c)) / rng).fillna(0).clip(0, 1) out["lower_wick_ratio"] = ((np.minimum(o, c) - l) / rng).fillna(0).clip(0, 1) out["ret_pct"] = ((c - prev_c) / prev_c.replace(0, np.nan)) * 100 pos = out["bb_pos"].astype(float) out["bb_zone_bottom"] = (pos < 0.15).astype(int) out["bb_zone_low"] = ((pos >= 0.15) & (pos < 0.35)).astype(int) out["bb_zone_mid"] = ((pos >= 0.35) & (pos < 0.65)).astype(int) out["bb_zone_high"] = ((pos >= 0.65) & (pos < 0.85)).astype(int) out["bb_zone_top"] = (pos >= 0.85).astype(int) out["cross_up_lower"] = ((prev_c <= prev_lower) & (c > lower)).astype(int) out["cross_up_upper"] = ((prev_c < prev_upper) & (c >= upper)).astype(int) out["cross_down_lower"] = ((prev_c >= prev_lower) & (c < lower)).astype(int) out["below_lower"] = (c < lower).astype(int) out["above_upper"] = (c > upper).astype(int) out["inside_band"] = ((c >= lower) & (c <= upper)).astype(int) out["bb_pos_low"] = (pos < 0.2).astype(int) out["bb_pos_high"] = (pos > 0.8).astype(int) out["squeeze"] = (out["BB_Width"] < 0.8).astype(int) ct = out["ichi_cloud_top"].astype(float) cb = out["ichi_cloud_bottom"].astype(float) ten = out["ichi_tenkan"].astype(float) kij = out["ichi_kijun"].astype(float) prev_ten = ten.shift(1) prev_kij = kij.shift(1) out["ichi_above_cloud"] = (c > ct).astype(int) out["ichi_below_cloud"] = (c < cb).astype(int) out["ichi_in_cloud"] = ((c >= cb) & (c <= ct)).astype(int) out["ichi_cloud_bull"] = (out["ichi_span_a"] > out["ichi_span_b"]).astype(int) out["ichi_cloud_bear"] = (out["ichi_span_a"] < out["ichi_span_b"]).astype(int) out["ichi_tk_bull"] = (ten > kij).astype(int) out["ichi_tk_bear"] = (ten < kij).astype(int) out["ichi_price_above_tenkan"] = (c > ten).astype(int) out["ichi_price_below_kijun"] = (c < kij).astype(int) out["ichi_tk_cross_up"] = ((prev_ten <= prev_kij) & (ten > kij)).astype(int) out["ichi_tk_cross_down"] = ((prev_ten >= prev_kij) & (ten < kij)).astype(int) out["body_strong"] = (out["body_ratio"] > 0.55).astype(int) out["body_weak"] = (out["body_ratio"] < 0.25).astype(int) out["hammer"] = ((out["lower_wick_ratio"] > 0.45) & (out["body_ratio"] < 0.35)).astype(int) out["shooting_star"] = ((out["upper_wick_ratio"] > 0.45) & (out["body_ratio"] < 0.35)).astype(int) out["bullish"] = (c > o).astype(int) out["bearish"] = (c < o).astype(int) return out def describe_latest_position(df: pd.DataFrame, interval: int) -> dict: """한 봉의 최신 BB·일목 위치 요약.""" feat = compute_bar_features(df) if feat.empty: return {"interval": interval, "label": interval_display(interval)} row = feat.iloc[-1] pos = float(row.get("bb_pos", 0.5)) bb_zone = "mid" for z in BB_ZONE_FEATURES: if int(row.get(z, 0)) == 1: bb_zone = z.replace("bb_zone_", "") break ichi_pos = "in_cloud" if int(row.get("ichi_above_cloud", 0)): ichi_pos = "above_cloud" elif int(row.get("ichi_below_cloud", 0)): ichi_pos = "below_cloud" return { "interval": interval, "label": interval_display(interval), "close": float(row["Close"]), "bb_pos": round(pos, 3), "bb_zone": bb_zone, "bb_state": _bb_event_label(row), "ichi_position": ichi_pos, "ichi_tk": "bull" if int(row.get("ichi_tk_bull", 0)) else "bear", "ichi_cloud": "bull" if int(row.get("ichi_cloud_bull", 0)) else "bear", } def _bb_event_label(row: pd.Series) -> str: for name in ( "cross_up_lower", "cross_up_upper", "cross_down_lower", "below_lower", "above_upper", "squeeze", "inside_band", ): if int(row.get(name, 0)) == 1: return name return "neutral" def _merge_interval_features( master_index: pd.DatetimeIndex, feat: pd.DataFrame, prefix: str, ) -> pd.DataFrame: """master_index 길이와 동일한 간격 특징만 반환.""" pick = [c for c in FEATURE_BOOL_COLS if c in feat.columns] extra = [ c for c in ("bb_pos", "body_ratio", "lower_wick_ratio", "ret_pct", "bb_width_pct") if c in feat.columns ] if "bb_width_pct" not in feat.columns and "BB_Width" in feat.columns: feat = feat.copy() feat["bb_width_pct"] = feat["BB_Width"] extra.append("bb_width_pct") sub = feat[pick + extra].copy() sub.columns = [f"{prefix}_{c}" for c in sub.columns] left = pd.DataFrame({"ts": master_index}) right = sub.reset_index() time_col = right.columns[0] right = right.rename(columns={time_col: "ts"}) merged = pd.merge_asof( left.sort_values("ts"), right.sort_values("ts"), on="ts", direction="backward", ) merged.index = master_index return merged.drop(columns=["ts"]) def build_master_feature_matrix(frames: dict[int, pd.DataFrame]) -> pd.DataFrame: """3분 타임라인에 모든 봉의 BB·일목·캔들 특징을 붙인 행렬.""" entry = frames.get(ENTRY_INTERVAL) if entry is None or entry.empty: raise ValueError(f"{ENTRY_INTERVAL}분봉(ENTRY_INTERVAL) 데이터가 없습니다.") entry_feat = compute_bar_features(entry) entry_feat = entry_feat[~entry_feat.index.duplicated(keep="last")].sort_index() p0 = interval_prefix(ENTRY_INTERVAL) ohlc = ["Open", "High", "Low", "Close", "Volume", "Upper", "Lower", "MA"] master = entry_feat[[c for c in ohlc if c in entry_feat.columns]].copy() for col in FEATURE_BOOL_COLS: if col in entry_feat.columns: master[f"{p0}_{col}"] = entry_feat[col] for col in ("bb_pos", "body_ratio", "lower_wick_ratio", "ret_pct", "bb_width_pct", "BB_Width"): if col in entry_feat.columns: master[f"{p0}_{col}"] = entry_feat[col] for interval in ALL_INTERVALS: if interval == ENTRY_INTERVAL: continue df = frames.get(interval) if df is None or df.empty: continue feat = compute_bar_features(df) feat = feat[~feat.index.duplicated(keep="last")].sort_index() prefix = interval_prefix(interval) merged = _merge_interval_features(master.index, feat, prefix) master = pd.concat([master, merged], axis=1) return master.loc[:, ~master.columns.duplicated()]