"""
simulation_1min.py
-------------------
Goal: optimise and simulate an automated buy/sell strategy on 1-minute candles.

Feature overview
1) Load 1-minute candles from a SQLite DB (resources/coins.db)
2) Compute moving-average / RSI / Bollinger / ATR / volume indicators
3) Combine candidate entry strategies (momentum, MA cross, RSI + Bollinger,
   volume spike)
4) Compare exit rules: stop-loss, take-profit, trailing stop, time stop,
   volatility/volume based exits
5) Grid search, random search and a simple rolling walk-forward validation
6) Stress tests (bear / high-volatility / low-volatility), result reports and
   Plotly visualisation
7) Live-trading option (HTS order stub)

Usage examples
    python simulation_1min.py --symbol BTC --optimize grid random --walk-forward
    python simulation_1min.py --symbol XRP --export-html reports/xrp_equity.html --stress-test
"""
from __future__ import annotations

import argparse
import contextlib
import itertools
import json
import math
import os
import random
import re
import sqlite3
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, Iterable, List, Optional, Sequence, Tuple

import numpy as np
import pandas as pd

try:
    import plotly.graph_objects as go
    from plotly.subplots import make_subplots
except ImportError:  # pragma: no cover
    # Plotting is optional for library use; main() still requires plotly.
    go = None
    make_subplots = None

try:
    from HTS2 import HTS  # optional live-trading integration
except ImportError:  # pragma: no cover
    HTS = None


# ------------------------------ Utilities ------------------------------ #
def json_default(obj):
    """Convert numpy scalars and timestamps for ``json.dump(default=...)``.

    Raises:
        TypeError: if ``obj`` is of a type this hook does not know.
    """
    if isinstance(obj, (np.floating, np.float32, np.float64)):
        return float(obj)
    if isinstance(obj, (np.integer, np.int32, np.int64)):
        return int(obj)
    if isinstance(obj, (np.bool_,)):
        return bool(obj)
    if isinstance(obj, (pd.Timestamp, datetime)):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} not serializable")


# (exclusive upper price bound, tick size) pairs in ascending order.
_TICK_TABLE = (
    (0.1, 0.0001),
    (1, 0.001),
    (10, 0.01),
    (100, 0.1),
    (1_000, 1),
    (10_000, 5),
    (100_000, 10),
    (500_000, 50),
    (1_000_000, 100),
)


def get_tick_size(price: float) -> float:
    """Approximate the KRW quote tick size of a domestic exchange."""
    for upper_bound, tick in _TICK_TABLE:
        if price < upper_bound:
            return tick
    return 1_000


# ------------------------------ Shared settings ------------------------------ #
BASE_DIR = os.path.abspath(os.path.dirname(__file__))
DEFAULT_DB_PATH = os.path.join(BASE_DIR, "resources", "coins.db")
DEFAULT_SYMBOL = "BTC"
FEE_RATE = 0.0004  # 0.04% on each of buy and sell
SLIPPAGE = 0.0002  # assumed fill slippage
MIN_ORDER_KRW = 10_000
INITIAL_CAPITAL = 1_000_000
MAX_POSITION_KRW = 1_000_000
RISK_FREE_RATE = 0.0  # short-term risk-free rate
STRATEGY_MODE_PATH = os.path.join(BASE_DIR, "resources", "strategy_mode.json")
ENTRY_SIGNAL_OPTIONS: Tuple[str, ...] = (
    "ma_cross",
    "rsi_dip",
    "momentum_breakout",
    "keltner_reversion",
    "volatility_breakout",
)
DEFAULT_ENTRY_COMBOS: List[Tuple[str, ...]] = [
    ("ma_cross",),
    ("ma_cross", "rsi_dip"),
    ("ma_cross", "keltner_reversion"),
    ("keltner_reversion",),
    ("momentum_breakout",),
    ("volatility_breakout", "ma_cross"),
]


# ------------------------------ Data loader ------------------------------ #
class MinuteDataLoader:
    """Helper that reads minute candles from SQLite.

    Example
    -------
    loader = MinuteDataLoader(DEFAULT_DB_PATH)
    df = loader.load("BTC", limit=8000)
    """

    def __init__(self, db_path: str) -> None:
        self.db_path = db_path

    def load(self, symbol: str, limit: int = 8000, interval: int = 1) -> pd.DataFrame:
        """Load the most recent ``limit`` candles of table ``{symbol}_{interval}``.

        Returns:
            Candles sorted by ascending ``datetime`` with float OHLCV columns
            plus ``return`` (close-to-close change) and ``minute``.

        Raises:
            ValueError: if the table name is not a plain identifier or the
                table holds no rows.
        """
        table_name = f"{symbol}_{interval}"
        # Table names cannot be bound as SQL parameters, so validate and quote.
        if not re.fullmatch(r"\w+", table_name):
            raise ValueError(f"Invalid table name: {table_name!r}")
        query = (
            "SELECT ymdhms as datetime, Open, High, Low, Close, Volume "
            f'FROM "{table_name}" ORDER BY datetime DESC LIMIT ?'
        )
        # sqlite3's own context manager only commits/rolls back; closing()
        # is what actually releases the connection.
        with contextlib.closing(sqlite3.connect(self.db_path)) as conn:
            df = pd.read_sql_query(
                query, conn, params=(int(limit),), parse_dates=["datetime"]
            )
        if df.empty:
            raise ValueError(f"{table_name} 테이블에서 데이터가 비어있습니다.")
        df = df.sort_values("datetime").reset_index(drop=True)
        df = df.astype(
            {
                "Open": float,
                "High": float,
                "Low": float,
                "Close": float,
                "Volume": float,
            }
        )
        df["return"] = df["Close"].pct_change().fillna(0.0)
        df["minute"] = df["datetime"].dt.minute
        return df


# ------------------------------ Indicator helpers ------------------------------ #
class IndicatorBuilder:
    """Price/volume indicator generator."""

    ROLL_LOOKBACKS: Tuple[int, ...] = (30, 45, 60, 90)
    REGIME_LOOKBACK: int = 600

    @staticmethod
    def ema(series: pd.Series, span: int) -> pd.Series:
        """Exponential moving average with ``adjust=False``."""
        return series.ewm(span=span, adjust=False).mean()

    @staticmethod
    def rsi(series: pd.Series, period: int = 14) -> pd.Series:
        """RSI using simple rolling means of gains and losses.

        A window with gains and no losses yields 100. A completely flat
        window (no gains, no losses) stays NaN.
        """
        delta = series.diff()
        gains = delta.clip(lower=0).rolling(period).mean()
        losses = (-delta.clip(upper=0)).rolling(period).mean()
        rs = gains / losses.replace(0, np.nan)
        values = 100 - (100 / (1 + rs))
        # Zero average loss would otherwise give NaN and the row would be
        # dropped later by enrich(); the conventional value is 100.
        no_loss = (losses == 0) & (gains > 0)
        return values.mask(no_loss, 100.0)

    @staticmethod
    def atr(df: pd.DataFrame, period: int = 14) -> pd.Series:
        """Average true range as a simple rolling mean of the true range."""
        prev_close = df["Close"].shift()
        high_low = df["High"] - df["Low"]
        high_close = (df["High"] - prev_close).abs()
        low_close = (df["Low"] - prev_close).abs()
        tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
        return tr.rolling(period).mean()

    def enrich(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``df`` with all indicator columns added.

        Rows containing any NaN (indicator warm-up) are dropped.
        """
        out = df.copy()
        for window in (3, 5, 8, 13, 21, 34, 55, 89):
            out[f"sma_{window}"] = out["Close"].rolling(window).mean()
            out[f"ema_{window}"] = self.ema(out["Close"], window)
        out["rsi14"] = self.rsi(out["Close"], 14)
        out["atr14"] = self.atr(out, 14)
        out["atr_pct"] = out["atr14"] / out["Close"]
        out["boll_mid"] = out["Close"].rolling(20).mean()
        out["boll_std"] = out["Close"].rolling(20).std(ddof=0)
        out["boll_up"] = out["boll_mid"] + 2 * out["boll_std"]
        out["boll_low"] = out["boll_mid"] - 2 * out["boll_std"]
        out["volume_ma50"] = out["Volume"].rolling(50).mean()
        out["volume_std50"] = out["Volume"].rolling(50).std(ddof=0)
        # Avoid dividing by zero in the z-score below.
        out["volume_std50"] = out["volume_std50"].replace(0, np.nan)
        out["volume_z"] = (out["Volume"] - out["volume_ma50"]) / out["volume_std50"]
        out["range_pct"] = (out["High"] - out["Low"]) / out["Close"]
        out["ret_5"] = out["Close"].pct_change(5)
        out["ret_15"] = out["Close"].pct_change(15)
        out["weekday"] = out["datetime"].dt.weekday
        out["macro_trend"] = out["ema_34"] - out["ema_89"]

        # Keltner bands and rolling highs/lows
        out["keltner_mid"] = out["ema_21"]
        out["keltner_upper"] = out["keltner_mid"] + out["atr14"] * 1.5
        out["keltner_lower"] = out["keltner_mid"] - out["atr14"] * 1.5
        for lookback in self.ROLL_LOOKBACKS:
            out[f"roll_max_{lookback}"] = out["High"].rolling(lookback).max()
            out[f"roll_min_{lookback}"] = out["Low"].rolling(lookback).min()

        # Volatility regime from rolling quantiles: 1 high, -1 low, 0 neutral.
        # While the quantile window is still warming up the comparisons are
        # False, so those rows fall into regime 0.
        rolling_atr = out["atr_pct"].rolling(
            self.REGIME_LOOKBACK, min_periods=self.REGIME_LOOKBACK
        )
        high_q = rolling_atr.quantile(0.7)
        low_q = rolling_atr.quantile(0.3)
        out["vol_regime"] = np.where(
            out["atr_pct"] >= high_q,
            1,
            np.where(out["atr_pct"] <= low_q, -1, 0),
        )
        out.dropna(inplace=True)
        return out


# ------------------------------ Strategy parameters ------------------------------ #
@dataclass
class StrategyParams:
    """Data structure holding the settings of a single strategy."""

    entry_combo: Tuple[str, ...] = ("ma_cross",)
    combo_mode: str = "OR"
    ma_fast: int = 5
    ma_slow: int = 21
    rsi_buy: float = 32.0
    rsi_exit: float = 68.0
    mom_threshold: float = 0.0015
    dip_sigma: float = 1.0
    volume_z_buy: float = -0.5
    volume_z_breakout: float = 0.5
    atr_filter: Tuple[float, float] = (0.001, 0.03)
    trading_hours: Tuple[int, int] = (0, 23)  # hour filter, Korean time
    max_positions: int = 2
    risk_pct: float = 0.2  # fraction of available cash
    stop_loss_pct: float = 0.006  # 0.6%
    take_profit_pct: float = 0.012  # 1.2%
    trailing_atr_mult: float = 2.0
    time_stop_bars: int = 45
    vol_drop_exit_z: float = -1.0
    reverse_signal_exit: bool = True
    use_dynamic_risk: bool = True
    risk_pct_high_vol: float = 0.12
    risk_pct_low_vol: float = 0.25
    stop_loss_pct_high_vol: float = 0.005
    stop_loss_pct_low_vol: float = 0.007
    take_profit_pct_high_vol: float = 0.012
    take_profit_pct_low_vol: float = 0.009
    trailing_atr_mult_high_vol: float = 2.4
    trailing_atr_mult_low_vol: float = 1.8
    breakout_lookback: int = 60


# ------------------------------ Entry combo generation ------------------------------ #
def generate_entry_combos(max_len: int = 3) -> List[Tuple[str, ...]]:
    """Return every combination of entry signals of length 1..``max_len``."""
    combos: List[Tuple[str, ...]] = []
    for length in range(1, max_len + 1):
        combos.extend(itertools.combinations(ENTRY_SIGNAL_OPTIONS, length))
    return combos


def load_strategy_mode() -> bool:
    """Read the persisted ``use_full_search`` flag.

    Best effort: returns ``False`` when the file is missing, unreadable,
    not valid JSON, or not a JSON object.
    """
    try:
        with open(STRATEGY_MODE_PATH, "r", encoding="utf-8") as f:
            payload = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return False
    if not isinstance(payload, dict):
        return False
    return bool(payload.get("use_full_search", False))


def save_strategy_mode(use_full: bool) -> None:
    """Persist the ``use_full_search`` flag, creating the directory if needed."""
    os.makedirs(os.path.dirname(STRATEGY_MODE_PATH), exist_ok=True)
    with open(STRATEGY_MODE_PATH, "w", encoding="utf-8") as f:
        json.dump({"use_full_search": bool(use_full)}, f, ensure_ascii=False, indent=2)


# ------------------------------ Entry signal engine ------------------------------ #
class EntrySignalEngine:
    """Entry rules as individual functions, combined with AND / OR."""

    def __init__(self, params: StrategyParams) -> None:
        self.params = params
        # Built once; evaluate() runs for every bar of every simulation.
        self._signal_map = {
            "ma_cross": self._ma_cross,
            "rsi_dip": self._rsi_bollinger_dip,
            "momentum_breakout": self._momentum_breakout,
            "keltner_reversion": self._keltner_reversion,
            "volatility_breakout": self._volatility_breakout,
        }

    def _ma_cross(self, df: pd.DataFrame, idx: int) -> bool:
        """Fast EMA crosses above slow EMA with RSI and volume confirmation."""
        if idx == 0:
            return False
        fast = df[f"ema_{self.params.ma_fast}"]
        slow = df[f"ema_{self.params.ma_slow}"]
        cross_up = (
            fast.iloc[idx] > slow.iloc[idx]
            and fast.iloc[idx - 1] <= slow.iloc[idx - 1]
        )
        rsi_cond = df["rsi14"].iloc[idx] >= self.params.rsi_buy
        volume_cond = df["volume_z"].iloc[idx] > self.params.volume_z_breakout
        return bool(cross_up and rsi_cond and volume_cond)

    def _rsi_bollinger_dip(self, df: pd.DataFrame, idx: int) -> bool:
        """Price near the lower Bollinger band with a low RSI."""
        price = df["Close"].iloc[idx]
        lower_band = df["boll_low"].iloc[idx]
        std = df["boll_std"].iloc[idx]
        rsi = df["rsi14"].iloc[idx]
        vol = df["volume_z"].iloc[idx]
        band_touch = price <= lower_band + self.params.dip_sigma * std
        rsi_ok = rsi <= self.params.rsi_buy
        volume_rebound = vol >= self.params.volume_z_buy
        return bool(band_touch and rsi_ok and volume_rebound)

    def _momentum_breakout(self, df: pd.DataFrame, idx: int) -> bool:
        """Close above the upper Bollinger band with momentum and volume."""
        price = df["Close"].iloc[idx]
        upper_band = df["boll_up"].iloc[idx]
        ret_5 = df["ret_5"].iloc[idx]
        volume = df["volume_z"].iloc[idx]
        atr_pct = df["atr_pct"].iloc[idx]
        atr_ok = self.params.atr_filter[0] <= atr_pct <= self.params.atr_filter[1]
        return bool(
            price > upper_band
            and ret_5 > self.params.mom_threshold
            and volume > self.params.volume_z_breakout
            and atr_ok
        )

    def _keltner_reversion(self, df: pd.DataFrame, idx: int) -> bool:
        """Dip to the lower Keltner band in an up-trend, non-high-vol regime."""
        if idx == 0 or "keltner_lower" not in df.columns:
            return False
        price = df["Close"].iloc[idx]
        lower_band = df["keltner_lower"].iloc[idx]
        trend_ok = df["ema_21"].iloc[idx] > df["ema_55"].iloc[idx]
        regime = df["vol_regime"].iloc[idx] if "vol_regime" in df.columns else 0
        rsi = df["rsi14"].iloc[idx]
        volume_rebound = df["volume_z"].iloc[idx] > self.params.volume_z_buy
        return bool(
            trend_ok
            and regime <= 0
            and price <= lower_band
            and rsi <= self.params.rsi_buy + 5
            and volume_rebound
        )

    def _volatility_breakout(self, df: pd.DataFrame, idx: int) -> bool:
        """Close above the previous bar's rolling high in a high-vol regime.

        Returns ``False`` when no ``roll_max_<breakout_lookback>`` column
        exists for the configured lookback.
        """
        col = f"roll_max_{self.params.breakout_lookback}"
        if col not in df.columns or idx == 0:
            return False
        regime = df["vol_regime"].iloc[idx] if "vol_regime" in df.columns else 0
        if regime <= 0:
            return False
        breakout_level = df[col].iloc[idx - 1]
        price = df["Close"].iloc[idx]
        volume = df["volume_z"].iloc[idx]
        momentum = df["ret_15"].iloc[idx]
        trend_ok = df["macro_trend"].iloc[idx] > 0
        return bool(
            price > breakout_level
            and volume > self.params.volume_z_breakout
            and momentum > self.params.mom_threshold * 0.8
            and trend_ok
        )

    def evaluate(self, df: pd.DataFrame, idx: int) -> bool:
        """Return whether the configured entry combo fires at row ``idx``.

        Bars outside ``trading_hours`` never fire. Unknown signal names are
        ignored. ``combo_mode == "AND"`` requires every signal; any other
        mode requires at least one.
        """
        hour = df["datetime"].iloc[idx].hour
        first_hour, last_hour = self.params.trading_hours[0], self.params.trading_hours[1]
        if not (first_hour <= hour <= last_hour):
            return False
        results = []
        for name in self.params.entry_combo:
            func = self._signal_map.get(name)
            if func is None:
                continue
            results.append(func(df, idx))
        if not results:
            return False
        if self.params.combo_mode == "AND":
            return all(results)
        return any(results)


# ------------------------------ Trades and backtest ------------------------------ #
@dataclass
class Trade:
    """One (possibly partial) closed trade."""

    entry_time: datetime
    exit_time: datetime
    entry_price: float
    exit_price: float
    qty: float
    pnl: float
    return_pct: float
    bars_held: int
    reason: str


@dataclass
class SimulationResult:
    """Outcome of one simulation run."""

    params: StrategyParams
    trades: List[Trade]
    equity_curve: pd.DataFrame
    metrics: Dict[str, float]
    price_history: pd.DataFrame


class PortfolioSimulator:
    """Portfolio (single symbol) simulation engine."""

    def __init__(
        self,
        data: pd.DataFrame,
        params: StrategyParams,
        initial_capital: float = INITIAL_CAPITAL,
    ) -> None:
        self.df = data.reset_index(drop=True)
        self.params = params
        self.initial_capital = initial_capital
        self.signal_engine = EntrySignalEngine(params)

    def _current_regime(self, idx: int) -> int:
        """Volatility regime at ``idx``; 0 if the column is absent or invalid."""
        if "vol_regime" not in self.df.columns:
            return 0
        try:
            return int(self.df["vol_regime"].iloc[idx])
        except (ValueError, TypeError):
            # e.g. NaN cannot be converted to int
            return 0

    def _resolve_risk_pct(self, regime: int) -> float:
        """Cash fraction to allocate, adjusted by regime when dynamic risk is on."""
        if not self.params.use_dynamic_risk:
            return self.params.risk_pct
        if regime > 0:
            return self.params.risk_pct_high_vol
        if regime < 0:
            return self.params.risk_pct_low_vol
        return self.params.risk_pct

    def _resolve_levels(self, regime: int) -> Tuple[float, float, float]:
        """Return ``(stop_loss_pct, take_profit_pct, trailing_mult)`` for a regime."""
        if regime > 0:
            return (
                self.params.stop_loss_pct_high_vol,
                self.params.take_profit_pct_high_vol,
                self.params.trailing_atr_mult_high_vol,
            )
        if regime < 0:
            return (
                self.params.stop_loss_pct_low_vol,
                self.params.take_profit_pct_low_vol,
                self.params.trailing_atr_mult_low_vol,
            )
        return (
            self.params.stop_loss_pct,
            self.params.take_profit_pct,
            self.params.trailing_atr_mult,
        )

    def _position_size(self, cash: float, price: float, regime: int) -> Tuple[float, float]:
        """Return ``(qty, remaining_cash)`` for a new entry.

        No order is sized (``qty == 0.0``, cash unchanged) when the price is
        not positive or the cash cannot cover ``MIN_ORDER_KRW``.
        """
        if not price > 0 or cash < MIN_ORDER_KRW:
            return 0.0, cash
        max_alloc = cash * self._resolve_risk_pct(regime)
        krw_to_use = max(MIN_ORDER_KRW, max_alloc)
        krw_to_use = min(krw_to_use, cash, MAX_POSITION_KRW)
        qty = krw_to_use / price
        if qty <= 0:
            return 0.0, cash
        return qty, cash - krw_to_use

    def _adjust_buy_price(self, price: float) -> float:
        """Buy one tick above the reference price."""
        return price + get_tick_size(price)

    def _adjust_sell_price(self, price: float) -> float:
        """Sell one tick below the reference price, never at or below zero."""
        adjusted = price - get_tick_size(price)
        return adjusted if adjusted > 0 else price

    def _apply_fees(self, price: float, side: str) -> float:
        """Fold fee and slippage into the price (worse for the trader)."""
        if side == "buy":
            return price * (1 + FEE_RATE + SLIPPAGE)
        return price * (1 - FEE_RATE - SLIPPAGE)

    def run(self) -> SimulationResult:
        """Simulate the strategy bar by bar.

        Signals are read on bar ``i``; fills happen on bar ``i + 1``.
        Positions still open after the last bar are force-closed at the
        final close.
        """
        cash = self.initial_capital
        positions: List[Dict] = []
        equity_curve: List[Tuple[datetime, float]] = []
        trades: List[Trade] = []

        for i in range(max(self.params.ma_slow, 60), len(self.df) - 1):
            current = self.df.iloc[i]
            next_candle = self.df.iloc[i + 1]

            # 1) Check exit conditions of existing positions
            updated_positions = []
            for pos in positions:
                exit_reason = ""
                entry_price = pos["entry_price"]
                trailing_level = pos["trailing"]
                best_price = max(pos["best_price"], self.df["High"].iloc[i])
                stop_price = entry_price * (1 - pos["stop_loss_pct"])
                take_price = entry_price * (1 + pos["take_profit_pct"])
                trail_price = best_price - pos["atr"] * pos["trailing_mult"]
                # The trailing level only ever ratchets upward.
                if trail_price > trailing_level:
                    trailing_level = trail_price

                executed_exit = False
                exit_price = next_candle["Open"]
                stop_exec_price = self._adjust_sell_price(stop_price)
                take_exec_price = self._adjust_sell_price(take_price)
                trail_exec_price = self._adjust_sell_price(trailing_level)

                # Intra-bar simulation; the stop is checked first, so it wins
                # when one bar touches both stop and target.
                low = next_candle["Low"]
                high = next_candle["High"]
                if low <= stop_price:
                    exit_price = stop_exec_price
                    exit_reason = "stop_loss"
                    executed_exit = True
                elif high >= take_price:
                    exit_price = take_exec_price
                    exit_reason = "take_profit"
                    executed_exit = True
                elif low <= trailing_level:
                    exit_price = trail_exec_price
                    exit_reason = "trailing_stop"
                    executed_exit = True
                elif (i - pos["entry_idx"]) >= self.params.time_stop_bars:
                    exit_price = self._adjust_sell_price(next_candle["Open"])
                    exit_reason = "time_stop"
                    executed_exit = True
                elif (
                    self.params.vol_drop_exit_z is not None
                    and self.df["volume_z"].iloc[i] <= self.params.vol_drop_exit_z
                ):
                    exit_price = self._adjust_sell_price(next_candle["Open"])
                    exit_reason = "volume_drop"
                    executed_exit = True
                elif self.params.reverse_signal_exit and self.signal_engine.evaluate(self.df, i):
                    # NOTE(review): this reuses the *entry* signal as the
                    # "reverse" exit trigger - confirm that is intended.
                    exit_price = self._adjust_sell_price(next_candle["Open"])
                    exit_reason = "reverse_signal"
                    executed_exit = True

                if executed_exit:
                    trend_value = (
                        self.df["macro_trend"].iloc[i]
                        if "macro_trend" in self.df.columns
                        else 0.0
                    )
                    # Sell half while the macro trend is non-negative,
                    # everything otherwise.
                    sell_ratio = 0.5 if trend_value >= 0 else 1.0
                    sell_ratio = min(max(sell_ratio, 0.0), 1.0)
                    qty_to_sell = pos["qty"] * sell_ratio
                    qty_to_sell = pos["qty"] if qty_to_sell <= 0 else qty_to_sell
                    exit_price_fee = self._apply_fees(exit_price, "sell")
                    pnl = (exit_price_fee - entry_price) * qty_to_sell
                    cash += qty_to_sell * exit_price_fee
                    trades.append(
                        Trade(
                            entry_time=pos["entry_time"],
                            exit_time=next_candle["datetime"],
                            entry_price=entry_price,
                            exit_price=exit_price_fee,
                            qty=qty_to_sell,
                            pnl=pnl,
                            return_pct=pnl / (entry_price * qty_to_sell),
                            bars_held=i - pos["entry_idx"],
                            reason=f"{exit_reason}|ratio:{sell_ratio:.2f}",
                        )
                    )
                    remaining_qty = pos["qty"] - qty_to_sell
                    if remaining_qty > 1e-8:
                        pos["qty"] = remaining_qty
                        pos["best_price"] = best_price
                        pos["trailing"] = trailing_level
                        updated_positions.append(pos)
                else:
                    pos["best_price"] = best_price
                    pos["trailing"] = trailing_level
                    updated_positions.append(pos)
            positions = updated_positions

            # 2) New entry
            atr_pct = self.df["atr_pct"].iloc[i]
            regime = self._current_regime(i)
            if (
                len(positions) < self.params.max_positions
                and self.params.atr_filter[0] <= atr_pct <= self.params.atr_filter[1]
                and self.signal_engine.evaluate(self.df, i)
            ):
                entry_price = self._adjust_buy_price(next_candle["Open"])
                entry_price_fee = self._apply_fees(entry_price, "buy")
                qty, cash = self._position_size(cash, entry_price_fee, regime)
                if qty > 0:
                    stop_loss_pct, take_profit_pct, trailing_mult = self._resolve_levels(regime)
                    positions.append(
                        {
                            "entry_price": entry_price_fee,
                            "entry_time": next_candle["datetime"],
                            "entry_idx": i + 1,
                            "qty": qty,
                            "atr": self.df["atr14"].iloc[i],
                            "best_price": entry_price_fee,
                            "trailing": entry_price_fee * (1 - stop_loss_pct),
                            "stop_loss_pct": stop_loss_pct,
                            "take_profit_pct": take_profit_pct,
                            "trailing_mult": trailing_mult,
                            "regime": regime,
                        }
                    )

            # 3) Record the equity curve
            market_value = sum(pos["qty"] * self.df["Close"].iloc[i] for pos in positions)
            equity_curve.append((current["datetime"], cash + market_value))

        equity_df = pd.DataFrame(equity_curve, columns=["datetime", "equity"])
        final_time = self.df.iloc[-1]["datetime"]
        final_price = self.df.iloc[-1]["Close"]

        if positions:
            # Force-close whatever is still open at the final close.
            for pos in positions:
                forced_price = self._adjust_sell_price(final_price)
                exit_price_fee = self._apply_fees(forced_price, "sell")
                pnl = (exit_price_fee - pos["entry_price"]) * pos["qty"]
                cash += pos["qty"] * exit_price_fee
                trades.append(
                    Trade(
                        entry_time=pos["entry_time"],
                        exit_time=final_time,
                        entry_price=pos["entry_price"],
                        exit_price=exit_price_fee,
                        qty=pos["qty"],
                        pnl=pnl,
                        return_pct=pnl / (pos["entry_price"] * pos["qty"]),
                        bars_held=len(self.df) - pos["entry_idx"],
                        reason="forced_exit",
                    )
                )

        # Make the curve end on the final bar with the fully liquidated cash.
        if equity_df.empty or equity_df["datetime"].iloc[-1] != final_time:
            equity_df = pd.concat(
                [equity_df, pd.DataFrame({"datetime": [final_time], "equity": [cash]})],
                ignore_index=True,
            )
        else:
            equity_df.loc[equity_df.index[-1], "equity"] = cash

        metrics = self._calculate_metrics(equity_df, trades)
        price_history = self.df[["datetime", "Open", "High", "Low", "Close"]].set_index("datetime")
        return SimulationResult(self.params, trades, equity_df, metrics, price_history)

    def _calculate_metrics(self, equity_df: pd.DataFrame, trades: List[Trade]) -> Dict[str, float]:
        """Summarise an equity curve and trade list into performance metrics.

        ``profit_factor`` is gross profit / gross loss; with zero gross loss
        it is ``inf`` when there is profit and ``0.0`` otherwise (including
        the no-trade case).
        """
        if equity_df.empty:
            return {}
        equity = equity_df["equity"]
        returns = equity.pct_change().dropna()
        total_return = equity.iloc[-1] / equity.iloc[0] - 1
        duration_minutes = (
            equity_df["datetime"].iloc[-1] - equity_df["datetime"].iloc[0]
        ).total_seconds() / 60
        years = max(duration_minutes / (60 * 24 * 365), 1e-6)
        cagr = (1 + total_return) ** (1 / years) - 1
        running_max = equity.cummax()
        drawdown = equity / running_max - 1
        max_dd = drawdown.min()
        # Per-minute returns annualised with minutes per year.
        minutes_per_year = 365 * 24 * 60
        sharpe = (
            (returns.mean() - RISK_FREE_RATE / minutes_per_year)
            / (returns.std() + 1e-9)
            * math.sqrt(minutes_per_year)
        )

        win_trades = [t for t in trades if t.pnl > 0]
        loss_trades = [t for t in trades if t.pnl <= 0]
        gross_profit = sum(t.pnl for t in win_trades)
        gross_loss = abs(sum(t.pnl for t in loss_trades))
        if gross_loss > 0:
            profit_factor = gross_profit / gross_loss
        else:
            profit_factor = np.inf if gross_profit > 0 else 0.0
        hit_ratio = len(win_trades) / len(trades) if trades else 0.0
        return {
            "final_equity": equity.iloc[-1],
            "total_return": total_return,
            "CAGR": cagr,
            "max_drawdown": max_dd,
            "sharpe": sharpe,
            "num_trades": len(trades),
            "hit_ratio": hit_ratio,
            "profit_factor": profit_factor,
            "avg_bars": np.mean([t.bars_held for t in trades]) if trades else 0.0,
        }


# ------------------------------ Optimisation tools ------------------------------ #
class StrategyOptimizer:
    """Grid / random search and walk-forward evaluation."""

    def __init__(
        self,
        data: pd.DataFrame,
        initial_capital: float = INITIAL_CAPITAL,
        entry_combos: Optional[Sequence[Tuple[str, ...]]] = None,
    ):
        self.data = data
        self.initial_capital = initial_capital
        self.entry_combos = list(entry_combos) if entry_combos else list(DEFAULT_ENTRY_COMBOS)

    def _evaluate(self, params: StrategyParams) -> SimulationResult:
        """Run one simulation over the optimiser's data."""
        return PortfolioSimulator(self.data, params, self.initial_capital).run()

    def grid_search(self, limit: int = 20) -> List[SimulationResult]:
        """Evaluate up to ``limit`` grid points, best Sharpe first.

        When the grid is larger than ``limit`` the evaluated points are
        spread evenly over the whole grid, so every entry combo gets a
        chance instead of only the first one in product order.
        """
        combo_modes = ["OR"]
        ma_fast_opts = [5, 8]
        ma_slow_opts = [34, 55]
        stop_opts = [0.004, 0.0055]
        tp_opts = [0.01, 0.013]
        trailing_opts = [1.8, 2.2]
        # (neutral, high-vol, low-vol) risk fractions
        risk_sets = [
            (0.18, 0.12, 0.25),
            (0.2, 0.1, 0.3),
        ]
        configs = list(
            itertools.product(
                self.entry_combos,
                combo_modes,
                ma_fast_opts,
                ma_slow_opts,
                stop_opts,
                tp_opts,
                trailing_opts,
                risk_sets,
            )
        )
        if limit is not None and len(configs) > limit:
            if limit <= 0:
                configs = []
            else:
                picks = np.linspace(0, len(configs) - 1, num=limit).round().astype(int)
                # dict.fromkeys de-duplicates while keeping order.
                configs = [configs[j] for j in dict.fromkeys(picks.tolist())]

        results: List[SimulationResult] = []
        for combo, mode, ma_fast, ma_slow, stop_pct, tp_pct, trail_mult, risk_tuple in configs:
            risk_mid, risk_high, risk_low = risk_tuple
            params = StrategyParams(
                entry_combo=combo,
                combo_mode=mode,
                ma_fast=ma_fast,
                ma_slow=ma_slow,
                stop_loss_pct=stop_pct,
                take_profit_pct=tp_pct,
                trailing_atr_mult=trail_mult,
                risk_pct=risk_mid,
                risk_pct_high_vol=risk_high,
                risk_pct_low_vol=risk_low,
                stop_loss_pct_high_vol=stop_pct * 0.8,
                stop_loss_pct_low_vol=stop_pct * 1.2,
                take_profit_pct_high_vol=tp_pct * 1.2,
                take_profit_pct_low_vol=tp_pct * 0.8,
                trailing_atr_mult_high_vol=trail_mult * 1.1,
                trailing_atr_mult_low_vol=max(1.2, trail_mult * 0.9),
            )
            results.append(self._evaluate(params))
        return sorted(results, key=lambda r: r.metrics.get("sharpe", -np.inf), reverse=True)

    def random_search(self, trials: int = 20, seed: int = 42) -> List[SimulationResult]:
        """Evaluate ``trials`` random parameter sets, best CAGR first.

        Uses a private ``random.Random(seed)`` so the global random state is
        left untouched; the draw sequence for a given seed matches what
        seeding the global module would produce.
        """
        rng = random.Random(seed)
        results = []
        for _ in range(trials):
            # Keyword order is the draw order; keep it for reproducibility.
            params = StrategyParams(
                entry_combo=rng.choice(self.entry_combos),
                combo_mode=rng.choice(["OR", "AND"]),
                ma_fast=rng.choice([3, 5, 8]),
                ma_slow=rng.choice([34, 55]),
                rsi_buy=rng.uniform(25, 38),
                mom_threshold=rng.uniform(0.0008, 0.0025),
                risk_pct=rng.choice([0.16, 0.2, 0.24]),
                risk_pct_high_vol=rng.uniform(0.08, 0.15),
                risk_pct_low_vol=rng.uniform(0.22, 0.32),
                stop_loss_pct=rng.uniform(0.003, 0.01),
                stop_loss_pct_high_vol=rng.uniform(0.003, 0.007),
                stop_loss_pct_low_vol=rng.uniform(0.006, 0.012),
                take_profit_pct=rng.uniform(0.008, 0.02),
                take_profit_pct_high_vol=rng.uniform(0.012, 0.025),
                take_profit_pct_low_vol=rng.uniform(0.008, 0.015),
                trailing_atr_mult=rng.uniform(1.5, 2.5),
                trailing_atr_mult_high_vol=rng.uniform(2.0, 2.8),
                trailing_atr_mult_low_vol=rng.uniform(1.2, 2.0),
                time_stop_bars=rng.choice([30, 45, 60]),
                vol_drop_exit_z=rng.uniform(-1.5, -0.3),
                breakout_lookback=rng.choice([30, 45, 60, 90]),
            )
            results.append(self._evaluate(params))
        return sorted(results, key=lambda r: r.metrics.get("CAGR", -np.inf), reverse=True)

    def walk_forward(self, train_bars: int = 2000, test_bars: int = 600) -> Dict[str, float]:
        """Rolling walk-forward: optimise on each train window, score the next test window.

        Returns:
            Mean of the per-segment test metrics plus ``segments``; an empty
            dict when the data is too short for one segment.
        """
        cursor = 0
        wf_metrics: List[Dict[str, float]] = []
        # <= so a final segment that fits exactly is still evaluated.
        while cursor + train_bars + test_bars <= len(self.data):
            train_end = cursor + train_bars
            train_slice = self.data.iloc[cursor:train_end].copy()
            test_slice = self.data.iloc[train_end : train_end + test_bars].copy()
            # Quick optimisation on the train window (5 random trials)
            temp_optimizer = StrategyOptimizer(
                train_slice, self.initial_capital, entry_combos=self.entry_combos
            )
            best_train = temp_optimizer.random_search(trials=5)[0]
            # Performance on the test window
            test_sim = PortfolioSimulator(test_slice, best_train.params, self.initial_capital).run()
            wf_metrics.append(test_sim.metrics)
            cursor += test_bars
        if not wf_metrics:
            return {}
        agg = pd.DataFrame(wf_metrics).mean().to_dict()
        agg["segments"] = len(wf_metrics)
        return agg

    def walk_forward_with_params(
        self,
        params: StrategyParams,
        train_bars: int = 2000,
        test_bars: int = 600,
    ) -> Dict[str, float]:
        """Score fixed ``params`` on the same rolling test windows as ``walk_forward``.

        Returns:
            Mean of the per-segment metrics plus ``segments``; an empty dict
            when the data is too short for one segment.
        """
        cursor = 0
        wf_metrics: List[Dict[str, float]] = []
        # <= so a final segment that fits exactly is still evaluated.
        while cursor + train_bars + test_bars <= len(self.data):
            test_start = cursor + train_bars
            test_slice = self.data.iloc[test_start : test_start + test_bars].copy()
            sim = PortfolioSimulator(test_slice, params, self.initial_capital).run()
            wf_metrics.append(sim.metrics)
            cursor += test_bars
        if not wf_metrics:
            return {}
        agg = pd.DataFrame(wf_metrics).mean().to_dict()
        agg["segments"] = len(wf_metrics)
        return agg


# ------------------------------ Stress tests ------------------------------ #
class StressTester:
    """Re-evaluate a strategy on bear / bull / high-vol / low-vol scenarios."""

    def __init__(
        self,
        data: pd.DataFrame,
        params: StrategyParams,
        initial_capital: float = INITIAL_CAPITAL,
    ):
        self.data = data
        self.params = params
        self.initial_capital = initial_capital

    def _subset(self, mask: pd.Series) -> Optional[SimulationResult]:
        """Simulate on the rows selected by ``mask``; ``None`` if under 500 rows.

        NOTE(review): the selected rows need not be contiguous in time, so
        the simulator treats separated bars as adjacent - confirm intended.
        """
        mask = mask.fillna(False)
        if not mask.any():
            return None
        subset = self.data.loc[mask].copy()
        if len(subset) < 500:
            return None
        return PortfolioSimulator(subset, self.params, self.initial_capital).run()

    def run(self) -> Dict[str, Dict[str, float]]:
        """Return metrics per scenario name, skipping scenarios with too little data."""
        rolling_ret = self.data["Close"].pct_change(300)
        atr_pct = self.data["atr_pct"]
        scenarios = {
            "bear_market": rolling_ret < -0.05,
            "bull_market": rolling_ret > 0.05,
            "high_volatility": atr_pct > atr_pct.quantile(0.7),
            "low_volatility": atr_pct < atr_pct.quantile(0.3),
        }
        stress_results = {}
        for name, mask in scenarios.items():
            res = self._subset(mask)
            if res:
                stress_results[name] = res.metrics
        return stress_results


# ------------------------------ Reporting / visualisation ------------------------------ #
class ReportBuilder:
    """Tables, charts and JSON output."""

    def __init__(self, output_dir: str = os.path.join(BASE_DIR, "reports")) -> None:
        self.output_dir = output_dir
        os.makedirs(self.output_dir, exist_ok=True)

    def print_table(self, results: Sequence[SimulationResult], title: str) -> None:
        """Print the top five results as a table."""
        rows = []
        for rank, res in enumerate(results[:5], start=1):
            rows.append(
                {
                    "rank": rank,
                    "entry_combo": "+".join(res.params.entry_combo),
                    "mode": res.params.combo_mode,
                    "sharpe": round(res.metrics.get("sharpe", 0), 3),
                    "CAGR": f"{res.metrics.get('CAGR', 0)*100:.2f}%",
                    "maxDD": f"{res.metrics.get('max_drawdown', 0)*100:.2f}%",
                    "#trades": res.metrics.get("num_trades", 0),
                }
            )
        df = pd.DataFrame(rows)
        print(f"\n[{title}]")
        # to_markdown needs the optional tabulate package; fall back to a
        # plain string table where it is not installed.
        try:
            print(df.to_markdown(index=False))
        except ImportError:
            print(df.to_string(index=False))

    def plot_equity(
        self,
        result: SimulationResult,
        symbol: str,
        export_html: Optional[str] = None,
    ) -> str:
        """Write an HTML chart of equity, drawdown, candles and trade markers.

        Args:
            result: simulation to plot.
            symbol: used in the title and the default file name.
            export_html: file name (optionally with sub-directories) placed
                under ``output_dir``; a timestamped name is used when omitted.

        Returns:
            Path of the written HTML file.

        Raises:
            ImportError: if plotly is not installed.
        """
        if go is None or make_subplots is None:
            raise ImportError("plotly is required to plot the equity curve")
        equity = result.equity_curve
        trades = result.trades
        fig = make_subplots(
            rows=2,
            cols=1,
            shared_xaxes=True,
            vertical_spacing=0.07,
            row_heights=[0.7, 0.3],
            specs=[[{"secondary_y": True}], [{"secondary_y": False}]],
        )
        fig.add_trace(
            go.Scatter(x=equity["datetime"], y=equity["equity"], name="Equity", line=dict(color="blue")),
            row=1,
            col=1,
            secondary_y=False,
        )
        dd = equity["equity"] / equity["equity"].cummax() - 1
        fig.add_trace(
            go.Scatter(x=equity["datetime"], y=dd, name="Drawdown", line=dict(color="red", width=1, dash="dot")),
            row=1,
            col=1,
            secondary_y=True,
        )
        prices = self._extract_prices(result)
        if prices.empty:
            prices = result.price_history
        fig.add_trace(
            go.Candlestick(
                x=prices.index,
                open=prices["Open"],
                high=prices["High"],
                low=prices["Low"],
                close=prices["Close"],
                name="Price",
            ),
            row=2,
            col=1,
        )
        entries_x = [t.entry_time for t in trades]
        entries_y = [t.entry_price for t in trades]
        exits_x = [t.exit_time for t in trades]
        exits_y = [t.exit_price for t in trades]
        fig.add_trace(
            go.Scatter(x=entries_x, y=entries_y, mode="markers", marker=dict(color="green", size=8), name="Entry"),
            row=2,
            col=1,
        )
        fig.add_trace(
            go.Scatter(x=exits_x, y=exits_y, mode="markers", marker=dict(color="orange", size=8), name="Exit"),
            row=2,
            col=1,
        )
        fig.update_layout(title=f"{symbol} 1분봉 자산곡선/거래포인트", xaxis_rangeslider_visible=False)
        if export_html:
            output_path = os.path.join(self.output_dir, export_html)
        else:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_path = os.path.join(self.output_dir, f"{symbol}_1min_equity_{timestamp}.html")
        # export_html may contain sub-directories (e.g. "reports/x.html").
        parent_dir = os.path.dirname(output_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        fig.write_html(output_path)
        return output_path

    def _extract_prices(self, result: SimulationResult) -> pd.DataFrame:
        """Price rows whose timestamp also appears in the equity curve."""
        shared_times = result.price_history.index.intersection(result.equity_curve["datetime"])
        return result.price_history.loc[shared_times]

    def save_json(self, data: Dict, filename: str) -> str:
        """Write ``data`` as JSON under ``output_dir`` and return the path."""
        path = os.path.join(self.output_dir, filename)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2, default=json_default)
        return path


# ------------------------------ Live-trading stub ------------------------------ #
class LiveTradingExecutor:
    """Wraps the HTS2 order object to offer an optional live-trading hook."""

    def __init__(self, enable: bool = False) -> None:
        # Live mode is only really enabled when HTS2 could be imported.
        self.enable = enable and HTS is not None
        self.hts = HTS() if self.enable else None

    def execute(self, symbol: str, side: str, amount_krw: float) -> None:
        """Place a market order, or just log it when live trading is disabled."""
        if not self.enable or self.hts is None:
            print(f"[LIVE_DISABLED] {symbol} {side} {amount_krw:,.0f} KRW")
            return
        if side == "buy":
            self.hts.buyCoinMarket(symbol, amount_krw, None)
        else:
            # NOTE(review): argument meaning depends on the HTS2 API, which
            # is not visible here - confirm the (symbol, 0, amount) order.
            self.hts.sellCoinMarket(symbol, 0, amount_krw)
        print(f"[LIVE] {symbol} {side} {amount_krw:,.0f} KRW 실행 완료")


# ------------------------------ Entry point ------------------------------ #
def parse_args() -> argparse.Namespace:
    """Parse the command-line options."""
    parser = argparse.ArgumentParser(description="1분봉 매매 전략 시뮬레이터")
    parser.add_argument("--symbol", default=DEFAULT_SYMBOL, help="코인 심볼 (예: BTC)")
    parser.add_argument("--db", default=DEFAULT_DB_PATH, help="SQLite DB 경로")
    parser.add_argument("--limit", type=int, default=9000, help="최대 로드 캔들 수")
    parser.add_argument(
        "--optimize",
        nargs="*",
        choices=["grid", "random"],
        default=["grid", "random"],
        help="실행할 최적화 유형",
    )
    parser.add_argument("--random-trials", type=int, default=20, help="랜덤 서치 반복 횟수")
    parser.add_argument("--walk-forward", action="store_true", help="walk-forward 검증 실행")
    parser.add_argument("--stress-test", action="store_true", help="스트레스 테스트 실행")
    parser.add_argument("--export-html", default=None, help="시각화 HTML 파일명")
    parser.add_argument("--live", action="store_true", help="실거래 모드 (HTS 필요)")
    parser.add_argument(
        "--strategy-mode",
        choices=["auto", "current", "full"],
        default="auto",
        help="전략 조합 선택 모드",
    )
    parser.add_argument("--entry-max-len", type=int, default=3, help="full 모드에서 사용할 최대 신호 조합 길이")
    return parser.parse_args()


def main() -> None:
    """Load data, optimise, validate, report and optionally place a live order."""
    args = parse_args()
    # The run always ends with a plot, so fail before the expensive work.
    if go is None or make_subplots is None:
        raise ImportError("plotly is required to run the simulator script")

    loader = MinuteDataLoader(args.db)
    raw_df = loader.load(args.symbol, limit=args.limit)
    enriched_df = IndicatorBuilder().enrich(raw_df)

    if args.strategy_mode == "current":
        use_full_search = False
    elif args.strategy_mode == "full":
        use_full_search = True
    else:
        use_full_search = load_strategy_mode()
    entry_combos = (
        generate_entry_combos(max(1, args.entry_max_len))
        if use_full_search
        else DEFAULT_ENTRY_COMBOS
    )

    optimizer = StrategyOptimizer(enriched_df, entry_combos=entry_combos)
    report = ReportBuilder()
    best_candidates: List[SimulationResult] = []

    if "grid" in args.optimize:
        grid_results = optimizer.grid_search()
        report.print_table(grid_results, "Grid Search Top 5")
        best_candidates.extend(grid_results[:2])
    if "random" in args.optimize:
        random_results = optimizer.random_search(trials=args.random_trials)
        report.print_table(random_results, "Random Search Top 5")
        best_candidates.extend(random_results[:3])
    if not best_candidates:
        raise RuntimeError("최적화 결과가 없습니다.")

    best_overall = max(best_candidates, key=lambda r: r.metrics.get("CAGR", -np.inf))

    wf_summary = {}
    if args.walk_forward:
        # Prefer the candidate that holds up best out of sample.
        wf_candidates: List[Tuple[SimulationResult, Dict[str, float]]] = []
        for cand in best_candidates:
            wf_metrics = optimizer.walk_forward_with_params(cand.params)
            if wf_metrics:
                wf_candidates.append((cand, wf_metrics))
        if wf_candidates:
            best_overall, wf_summary = max(
                wf_candidates,
                key=lambda item: (
                    item[1].get("total_return", -np.inf),
                    item[1].get("sharpe", -np.inf),
                ),
            )
        else:
            wf_summary = optimizer.walk_forward()

    print("\n[Best Strategy]")
    print(json.dumps(best_overall.metrics, indent=2, default=json_default))
    if wf_summary:
        print("\n[Walk-Forward 성능]")
        print(json.dumps(wf_summary, indent=2, default=json_default))

    stress_summary = {}
    if args.stress_test:
        stress_summary = StressTester(enriched_df, best_overall.params).run()
        print("\n[Stress Test]")
        print(json.dumps(stress_summary, indent=2, default=json_default))

    html_path = report.plot_equity(best_overall, args.symbol, export_html=args.export_html)
    print(f"\nPlot 저장 경로: {html_path}")

    summary_payload = {
        "best_params": best_overall.params.__dict__,
        "best_metrics": best_overall.metrics,
        "walk_forward": wf_summary,
        "stress_test": stress_summary,
        "equity_html": html_path,
    }
    json_path = report.save_json(summary_payload, f"{args.symbol}_1min_summary.json")
    print(f"요약 JSON 저장 경로: {json_path}")

    if args.live:
        LiveTradingExecutor(enable=True).execute(args.symbol, "buy", MIN_ORDER_KRW)


if __name__ == "__main__":
    main()