""" 모든 봉 간격에 대해 BB 위치·캔들 형태(몸통/꼬리/높이) 특징을 계산하고 기준 타임라인(3분)에 맞춰 정렬합니다. """ from __future__ import annotations import numpy as np import pandas as pd from config import ENTRY_INTERVAL from strategy import prepare_entry_df INTERVAL_LABELS: dict[int, str] = { 3: "m3", 10: "m10", 15: "m15", 30: "m30", 60: "m60", 240: "m240", 1440: "d1", } def interval_prefix(interval: int) -> str: """컬럼 접두사 (예: m3, d1).""" return INTERVAL_LABELS.get(interval, f"m{interval}") def compute_bar_features(df: pd.DataFrame) -> pd.DataFrame: """단일 봉 DataFrame에 위치·캔들 높이 특징을 추가합니다.""" out = prepare_entry_df(df.copy()) if len(out) < 2: return out o = out["Open"].astype(float) h = out["High"].astype(float) l = out["Low"].astype(float) c = out["Close"].astype(float) prev_c = c.shift(1) upper = out["Upper"].astype(float) lower = out["Lower"].astype(float) prev_upper = upper.shift(1) prev_lower = lower.shift(1) ma = out["MA"].astype(float) band = (upper - lower).replace(0, np.nan) out["bb_pos"] = ((c - lower) / band).clip(0, 1) out["bb_width_pct"] = ( out["BB_Width"] if "BB_Width" in out.columns else (band / ma.replace(0, np.nan) * 100) ) rng = (h - l).replace(0, np.nan) body = (c - o).abs() out["range_pct"] = (rng / c.replace(0, np.nan)) * 100 out["body_ratio"] = (body / rng).fillna(0).clip(0, 1) out["upper_wick_ratio"] = ((h - np.maximum(o, c)) / rng).fillna(0).clip(0, 1) out["lower_wick_ratio"] = ((np.minimum(o, c) - l) / rng).fillna(0).clip(0, 1) out["ret_pct"] = ((c - prev_c) / prev_c.replace(0, np.nan)) * 100 out["bullish"] = (c > o).astype(int) out["bearish"] = (c < o).astype(int) out["cross_up_lower"] = ((prev_c <= prev_lower) & (c > lower)).astype(int) out["cross_up_upper"] = ((prev_c < prev_upper) & (c >= upper)).astype(int) out["cross_down_lower"] = ((prev_c >= prev_lower) & (c < lower)).astype(int) out["below_lower"] = (c < lower).astype(int) out["above_upper"] = (c > upper).astype(int) out["inside_band"] = ((c >= lower) & (c <= upper)).astype(int) out["bb_pos_low"] = (out["bb_pos"] < 0.2).astype(int) out["bb_pos_high"] = (out["bb_pos"] > 0.8).astype(int) out["body_strong"] = (out["body_ratio"] > 0.55).astype(int) out["body_weak"] = (out["body_ratio"] < 0.25).astype(int) out["hammer"] = ((out["lower_wick_ratio"] > 0.45) & (out["body_ratio"] < 0.35)).astype(int) out["shooting_star"] = ((out["upper_wick_ratio"] > 0.45) & (out["body_ratio"] < 0.35)).astype(int) out["squeeze"] = (out["bb_width_pct"] < 0.8).astype(int) return out FEATURE_BOOL_COLS: tuple[str, ...] = ( "cross_up_lower", "cross_up_upper", "cross_down_lower", "below_lower", "above_upper", "inside_band", "bb_pos_low", "bb_pos_high", "body_strong", "body_weak", "hammer", "shooting_star", "squeeze", "bullish", "bearish", ) def _merge_interval_features( master_index: pd.DatetimeIndex, feat: pd.DataFrame, prefix: str, ) -> pd.DataFrame: """master_index 길이와 동일한 간격 특징만 반환.""" pick = [c for c in FEATURE_BOOL_COLS if c in feat.columns] extra = [c for c in ("bb_pos", "body_ratio", "lower_wick_ratio", "ret_pct") if c in feat.columns] sub = feat[pick + extra].copy() sub.columns = [f"{prefix}_{c}" for c in sub.columns] left = pd.DataFrame({"ts": master_index}) right = sub.reset_index() time_col = right.columns[0] right = right.rename(columns={time_col: "ts"}) merged = pd.merge_asof( left.sort_values("ts"), right.sort_values("ts"), on="ts", direction="backward", ) merged.index = master_index return merged.drop(columns=["ts"]) def build_master_feature_matrix(frames: dict[int, pd.DataFrame]) -> pd.DataFrame: """3분 타임라인에 모든 봉의 위치·캔들 특징을 붙인 행렬 (인덱스 유일).""" entry = frames.get(ENTRY_INTERVAL) if entry is None or entry.empty: raise ValueError("ENTRY_INTERVAL 데이터가 없습니다.") entry_feat = compute_bar_features(entry) entry_feat = entry_feat[~entry_feat.index.duplicated(keep="last")].sort_index() p3 = interval_prefix(ENTRY_INTERVAL) ohlc = ["Open", "High", "Low", "Close", "Volume", "Upper", "Lower", "MA"] master = entry_feat[[c for c in ohlc if c in entry_feat.columns]].copy() for col in FEATURE_BOOL_COLS: if col in entry_feat.columns: master[f"{p3}_{col}"] = entry_feat[col] for col in ("bb_pos", "body_ratio", "lower_wick_ratio", "ret_pct", "bb_width_pct"): if col in entry_feat.columns: master[f"{p3}_{col}"] = entry_feat[col] parts = [master] for interval, df in sorted(frames.items()): if interval == ENTRY_INTERVAL or df is None or df.empty: continue feat = compute_bar_features(df) feat = feat[~feat.index.duplicated(keep="last")].sort_index() prefix = interval_prefix(interval) parts.append(_merge_interval_features(master.index, feat, prefix)) out = pd.concat(parts, axis=1) return out.loc[:, ~out.columns.duplicated()]