Files
AssetMonitor/simulation_1min.py
dsyoon c45ad151b6 init
2026-01-28 18:58:33 +09:00

1057 lines
42 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
simulation_1min.py
-------------------
목표: 1분봉 기반 자동 매수/매도 전략을 최적화하고 시뮬레이션한다.
기능 개요
1) SQLITE DB(resources/coins.db)의 1분봉 데이터 로딩
2) 이동평균/RSI/Bollinger/ATR/거래량 지표 계산
3) 매수 후보 전략(모멘텀, 이동평균 교차, RSI+볼린저, 거래량 스파이크)을 조합
4) 손절·익절·트레일링스탑·시간청산·변동성/거래량 기반 매도 규칙 비교
5) 그리드 서치, 랜덤 서치, 간단한 walk-forward(롤링) 검증
6) 스트레스 테스트(하락장/고변동/저변동)와 결과 리포트 및 Plotly 시각화
7) 실거래 옵션(HTS 주문 Stub)
사용 예시
python simulation_1min.py --symbol BTC --optimize grid random --walk-forward
python simulation_1min.py --symbol XRP --export-html reports/xrp_equity.html --stress-test
"""
from __future__ import annotations
import argparse
import itertools
import json
import math
import os
import random
import sqlite3
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, Iterable, List, Optional, Sequence, Tuple
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
try:
from HTS2 import HTS # 실거래 연동 옵션
except ImportError: # pragma: no cover
HTS = None
# ------------------------------ 유틸 ------------------------------ #
def json_default(obj):
"""numpy, Timestamp 등을 JSON 직렬화할 때 float/str로 변환."""
if isinstance(obj, (np.floating, np.float32, np.float64)):
return float(obj)
if isinstance(obj, (np.integer, np.int32, np.int64)):
return int(obj)
if isinstance(obj, (np.bool_,)):
return bool(obj)
if isinstance(obj, (pd.Timestamp, datetime)):
return obj.isoformat()
raise TypeError(f"Type {type(obj)} not serializable")
def get_tick_size(price: float) -> float:
"""국내 거래소 KRW 호가 단위를 근사."""
if price < 0.1:
return 0.0001
if price < 1:
return 0.001
if price < 10:
return 0.01
if price < 100:
return 0.1
if price < 1_000:
return 1
if price < 10_000:
return 5
if price < 100_000:
return 10
if price < 500_000:
return 50
if price < 1_000_000:
return 100
return 1_000
# ------------------------------ 공통 설정 ------------------------------ #
BASE_DIR = os.path.abspath(os.path.dirname(__file__))
DEFAULT_DB_PATH = os.path.join(BASE_DIR, "resources", "coins.db")
DEFAULT_SYMBOL = "BTC"
FEE_RATE = 0.0004 # 매수/매도 각각 0.04%
SLIPPAGE = 0.0002 # 체결 슬리피지 가정
MIN_ORDER_KRW = 10_000
INITIAL_CAPITAL = 1_000_000
MAX_POSITION_KRW = 1_000_000
RISK_FREE_RATE = 0.0 # 단기 무위험수익률
STRATEGY_MODE_PATH = os.path.join(BASE_DIR, "resources", "strategy_mode.json")
ENTRY_SIGNAL_OPTIONS: Tuple[str, ...] = (
"ma_cross",
"rsi_dip",
"momentum_breakout",
"keltner_reversion",
"volatility_breakout",
)
DEFAULT_ENTRY_COMBOS: List[Tuple[str, ...]] = [
("ma_cross",),
("ma_cross", "rsi_dip"),
("ma_cross", "keltner_reversion"),
("keltner_reversion",),
("momentum_breakout",),
("volatility_breakout", "ma_cross"),
]
# ------------------------------ 데이터 로더 ------------------------------ #
class MinuteDataLoader:
"""SQLite에서 분봉 데이터를 읽어오는 헬퍼.
Example
-------
loader = MinuteDataLoader(DEFAULT_DB_PATH)
df = loader.load("BTC", limit=8000)
"""
def __init__(self, db_path: str) -> None:
self.db_path = db_path
def load(self, symbol: str, limit: int = 8000, interval: int = 1) -> pd.DataFrame:
table_name = f"{symbol}_{interval}"
query = (
f"SELECT ymdhms as datetime, Open, High, Low, Close, Volume "
f"FROM {table_name} ORDER BY datetime DESC LIMIT {limit}"
)
with sqlite3.connect(self.db_path) as conn:
df = pd.read_sql_query(query, conn, parse_dates=["datetime"])
if df.empty:
raise ValueError(f"{table_name} 테이블에서 데이터가 비어있습니다.")
df = df.sort_values("datetime").reset_index(drop=True)
df = df.astype(
{
"Open": float,
"High": float,
"Low": float,
"Close": float,
"Volume": float,
}
)
df["return"] = df["Close"].pct_change().fillna(0.0)
df["minute"] = df["datetime"].dt.minute
return df
# ------------------------------ 지표 도우미 ------------------------------ #
class IndicatorBuilder:
"""가격/거래량 지표 생성기."""
ROLL_LOOKBACKS: Tuple[int, ...] = (30, 45, 60, 90)
REGIME_LOOKBACK: int = 600
@staticmethod
def ema(series: pd.Series, span: int) -> pd.Series:
return series.ewm(span=span, adjust=False).mean()
@staticmethod
def rsi(series: pd.Series, period: int = 14) -> pd.Series:
delta = series.diff()
up = delta.clip(lower=0).rolling(period).mean()
down = -delta.clip(upper=0).rolling(period).mean()
rs = up / down.replace(0, np.nan)
return 100 - (100 / (1 + rs))
@staticmethod
def atr(df: pd.DataFrame, period: int = 14) -> pd.Series:
high_low = df["High"] - df["Low"]
high_close = (df["High"] - df["Close"].shift()).abs()
low_close = (df["Low"] - df["Close"].shift()).abs()
tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
return tr.rolling(period).mean()
def enrich(self, df: pd.DataFrame) -> pd.DataFrame:
out = df.copy()
for window in (3, 5, 8, 13, 21, 34, 55, 89):
out[f"sma_{window}"] = out["Close"].rolling(window).mean()
out[f"ema_{window}"] = self.ema(out["Close"], window)
out["rsi14"] = self.rsi(out["Close"], 14)
out["atr14"] = self.atr(out, 14)
out["atr_pct"] = out["atr14"] / out["Close"]
out["boll_mid"] = out["Close"].rolling(20).mean()
out["boll_std"] = out["Close"].rolling(20).std(ddof=0)
out["boll_up"] = out["boll_mid"] + 2 * out["boll_std"]
out["boll_low"] = out["boll_mid"] - 2 * out["boll_std"]
out["volume_ma50"] = out["Volume"].rolling(50).mean()
out["volume_std50"] = out["Volume"].rolling(50).std(ddof=0)
out["volume_std50"] = out["volume_std50"].replace(0, np.nan)
out["volume_z"] = (out["Volume"] - out["volume_ma50"]) / out["volume_std50"]
out["range_pct"] = (out["High"] - out["Low"]) / out["Close"]
out["ret_5"] = out["Close"].pct_change(5)
out["ret_15"] = out["Close"].pct_change(15)
out["weekday"] = out["datetime"].dt.weekday
out["macro_trend"] = out["ema_34"] - out["ema_89"]
# 켈트너 밴드 및 롤링 고저
out["keltner_mid"] = out["ema_21"]
out["keltner_upper"] = out["keltner_mid"] + out["atr14"] * 1.5
out["keltner_lower"] = out["keltner_mid"] - out["atr14"] * 1.5
for lookback in self.ROLL_LOOKBACKS:
out[f"roll_max_{lookback}"] = out["High"].rolling(lookback).max()
out[f"roll_min_{lookback}"] = out["Low"].rolling(lookback).min()
# 변동성 레짐 계산 (롤링 분위수 기반)
high_q = out["atr_pct"].rolling(self.REGIME_LOOKBACK, min_periods=self.REGIME_LOOKBACK).quantile(0.7)
low_q = out["atr_pct"].rolling(self.REGIME_LOOKBACK, min_periods=self.REGIME_LOOKBACK).quantile(0.3)
out["vol_regime"] = np.where(
out["atr_pct"] >= high_q,
1,
np.where(out["atr_pct"] <= low_q, -1, 0),
)
out.dropna(inplace=True)
return out
# ------------------------------ 전략 파라미터 ------------------------------ #
@dataclass
class StrategyParams:
"""단일 전략 설정 값을 담는 데이터 구조."""
entry_combo: Tuple[str, ...] = ("ma_cross",)
combo_mode: str = "OR"
ma_fast: int = 5
ma_slow: int = 21
rsi_buy: float = 32.0
rsi_exit: float = 68.0
mom_threshold: float = 0.0015
dip_sigma: float = 1.0
volume_z_buy: float = -0.5
volume_z_breakout: float = 0.5
atr_filter: Tuple[float, float] = (0.001, 0.03)
trading_hours: Tuple[int, int] = (0, 23) # 한국시간 기준 시간 필터
max_positions: int = 2
risk_pct: float = 0.2 # 보유 현금 대비
stop_loss_pct: float = 0.006 # 0.6%
take_profit_pct: float = 0.012 # 1.2%
trailing_atr_mult: float = 2.0
time_stop_bars: int = 45
vol_drop_exit_z: float = -1.0
reverse_signal_exit: bool = True
use_dynamic_risk: bool = True
risk_pct_high_vol: float = 0.12
risk_pct_low_vol: float = 0.25
stop_loss_pct_high_vol: float = 0.005
stop_loss_pct_low_vol: float = 0.007
take_profit_pct_high_vol: float = 0.012
take_profit_pct_low_vol: float = 0.009
trailing_atr_mult_high_vol: float = 2.4
trailing_atr_mult_low_vol: float = 1.8
breakout_lookback: int = 60
# ------------------------------ 전략 조합 생성 ------------------------------ #
def generate_entry_combos(max_len: int = 3) -> List[Tuple[str, ...]]:
combos: List[Tuple[str, ...]] = []
for length in range(1, max_len + 1):
combos.extend(itertools.combinations(ENTRY_SIGNAL_OPTIONS, length))
return combos
def load_strategy_mode() -> bool:
try:
with open(STRATEGY_MODE_PATH, "r", encoding="utf-8") as f:
payload = json.load(f)
return bool(payload.get("use_full_search", False))
except FileNotFoundError:
return False
except Exception:
return False
def save_strategy_mode(use_full: bool) -> None:
os.makedirs(os.path.dirname(STRATEGY_MODE_PATH), exist_ok=True)
with open(STRATEGY_MODE_PATH, "w", encoding="utf-8") as f:
json.dump({"use_full_search": bool(use_full)}, f, ensure_ascii=False, indent=2)
# ------------------------------ 매수 신호 생성기 ------------------------------ #
class EntrySignalEngine:
"""여러 매수 규칙을 함수화하여 조합."""
def __init__(self, params: StrategyParams) -> None:
self.params = params
def _ma_cross(self, df: pd.DataFrame, idx: int) -> bool:
fast = df[f"ema_{self.params.ma_fast}"]
slow = df[f"ema_{self.params.ma_slow}"]
if idx == 0:
return False
cross_up = fast.iloc[idx] > slow.iloc[idx] and fast.iloc[idx - 1] <= slow.iloc[idx - 1]
rsi_cond = df["rsi14"].iloc[idx] >= self.params.rsi_buy
volume_cond = df["volume_z"].iloc[idx] > self.params.volume_z_breakout
return bool(cross_up and rsi_cond and volume_cond)
def _rsi_bollinger_dip(self, df: pd.DataFrame, idx: int) -> bool:
price = df["Close"].iloc[idx]
lower_band = df["boll_low"].iloc[idx]
std = df["boll_std"].iloc[idx]
rsi = df["rsi14"].iloc[idx]
vol = df["volume_z"].iloc[idx]
band_touch = price <= lower_band + self.params.dip_sigma * std
rsi_ok = rsi <= self.params.rsi_buy
volume_rebound = vol >= self.params.volume_z_buy
return bool(band_touch and rsi_ok and volume_rebound)
def _momentum_breakout(self, df: pd.DataFrame, idx: int) -> bool:
price = df["Close"].iloc[idx]
upper_band = df["boll_up"].iloc[idx]
ret_5 = df["ret_5"].iloc[idx]
volume = df["volume_z"].iloc[idx]
atr_pct = df["atr_pct"].iloc[idx]
atr_ok = self.params.atr_filter[0] <= atr_pct <= self.params.atr_filter[1]
return bool(price > upper_band and ret_5 > self.params.mom_threshold and volume > self.params.volume_z_breakout and atr_ok)
def _keltner_reversion(self, df: pd.DataFrame, idx: int) -> bool:
if idx == 0 or "keltner_lower" not in df.columns:
return False
price = df["Close"].iloc[idx]
lower_band = df["keltner_lower"].iloc[idx]
trend_ok = df["ema_21"].iloc[idx] > df["ema_55"].iloc[idx]
regime = df["vol_regime"].iloc[idx] if "vol_regime" in df.columns else 0
rsi = df["rsi14"].iloc[idx]
volume_rebound = df["volume_z"].iloc[idx] > self.params.volume_z_buy
return bool(trend_ok and regime <= 0 and price <= lower_band and rsi <= self.params.rsi_buy + 5 and volume_rebound)
def _volatility_breakout(self, df: pd.DataFrame, idx: int) -> bool:
lookback = self.params.breakout_lookback
col = f"roll_max_{lookback}"
if col not in df.columns or idx == 0:
return False
regime = df["vol_regime"].iloc[idx] if "vol_regime" in df.columns else 0
if regime <= 0:
return False
breakout_level = df[col].iloc[idx - 1]
price = df["Close"].iloc[idx]
volume = df["volume_z"].iloc[idx]
momentum = df["ret_15"].iloc[idx]
trend_ok = df["macro_trend"].iloc[idx] > 0
return bool(price > breakout_level and volume > self.params.volume_z_breakout and momentum > self.params.mom_threshold * 0.8 and trend_ok)
def evaluate(self, df: pd.DataFrame, idx: int) -> bool:
hour = df["datetime"].iloc[idx].hour
if not (self.params.trading_hours[0] <= hour <= self.params.trading_hours[1]):
return False
signal_map = {
"ma_cross": self._ma_cross,
"rsi_dip": self._rsi_bollinger_dip,
"momentum_breakout": self._momentum_breakout,
"keltner_reversion": self._keltner_reversion,
"volatility_breakout": self._volatility_breakout,
}
results = []
for name in self.params.entry_combo:
func = signal_map.get(name)
if func is None:
continue
results.append(func(df, idx))
if not results:
return False
if self.params.combo_mode == "AND":
return all(results)
return any(results)
# ------------------------------ 거래 및 백테스트 ------------------------------ #
@dataclass
class Trade:
entry_time: datetime
exit_time: datetime
entry_price: float
exit_price: float
qty: float
pnl: float
return_pct: float
bars_held: int
reason: str
@dataclass
class SimulationResult:
params: StrategyParams
trades: List[Trade]
equity_curve: pd.DataFrame
metrics: Dict[str, float]
price_history: pd.DataFrame
class PortfolioSimulator:
"""포트폴리오(단일 종목) 시뮬레이션 엔진."""
def __init__(self, data: pd.DataFrame, params: StrategyParams, initial_capital: float = INITIAL_CAPITAL) -> None:
self.df = data.reset_index(drop=True)
self.params = params
self.initial_capital = initial_capital
self.signal_engine = EntrySignalEngine(params)
def _current_regime(self, idx: int) -> int:
if "vol_regime" in self.df.columns:
try:
return int(self.df["vol_regime"].iloc[idx])
except Exception:
return 0
return 0
def _resolve_risk_pct(self, regime: int) -> float:
if not self.params.use_dynamic_risk:
return self.params.risk_pct
if regime > 0:
return self.params.risk_pct_high_vol
if regime < 0:
return self.params.risk_pct_low_vol
return self.params.risk_pct
def _resolve_levels(self, regime: int) -> Tuple[float, float, float]:
stop_loss = self.params.stop_loss_pct
take_profit = self.params.take_profit_pct
trailing_mult = self.params.trailing_atr_mult
if regime > 0:
stop_loss = self.params.stop_loss_pct_high_vol
take_profit = self.params.take_profit_pct_high_vol
trailing_mult = self.params.trailing_atr_mult_high_vol
elif regime < 0:
stop_loss = self.params.stop_loss_pct_low_vol
take_profit = self.params.take_profit_pct_low_vol
trailing_mult = self.params.trailing_atr_mult_low_vol
return stop_loss, take_profit, trailing_mult
def _position_size(self, cash: float, price: float, regime: int) -> Tuple[float, float]:
max_alloc = cash * self._resolve_risk_pct(regime)
krw_to_use = max(MIN_ORDER_KRW, max_alloc)
krw_to_use = min(krw_to_use, cash, MAX_POSITION_KRW)
qty = krw_to_use / price if price > 0 else 0.0
if qty <= 0:
return 0.0, cash
return qty, cash - krw_to_use
def _adjust_buy_price(self, price: float) -> float:
tick = get_tick_size(price)
return price + tick
def _adjust_sell_price(self, price: float) -> float:
tick = get_tick_size(price)
adjusted = price - tick
return adjusted if adjusted > 0 else price
def _apply_fees(self, price: float, side: str) -> float:
fee_multiplier = 1 + FEE_RATE + (SLIPPAGE if side == "buy" else -SLIPPAGE)
if side == "buy":
return price * fee_multiplier
return price * (1 - FEE_RATE - SLIPPAGE)
def run(self) -> SimulationResult:
cash = self.initial_capital
positions: List[Dict] = []
equity_curve: List[Tuple[datetime, float]] = []
trades: List[Trade] = []
for i in range(max(self.params.ma_slow, 60), len(self.df) - 1):
current = self.df.iloc[i]
next_candle = self.df.iloc[i + 1]
# 1) 기존 포지션 청산 조건 확인
updated_positions = []
for pos in positions:
exit_reason = ""
entry_price = pos["entry_price"]
trailing_level = pos["trailing"]
best_price = max(pos["best_price"], self.df["High"].iloc[i])
stop_price = entry_price * (1 - pos["stop_loss_pct"])
take_price = entry_price * (1 + pos["take_profit_pct"])
trail_price = best_price - pos["atr"] * pos["trailing_mult"]
if trail_price > trailing_level:
trailing_level = trail_price
executed_exit = False
exit_price = next_candle["Open"]
stop_exec_price = self._adjust_sell_price(stop_price)
take_exec_price = self._adjust_sell_price(take_price)
trail_exec_price = self._adjust_sell_price(trailing_level)
# 봉 내부 시뮬레이션
low = next_candle["Low"]
high = next_candle["High"]
if low <= stop_price:
exit_price = stop_exec_price
exit_reason = "stop_loss"
executed_exit = True
elif high >= take_price:
exit_price = take_exec_price
exit_reason = "take_profit"
executed_exit = True
elif low <= trailing_level:
exit_price = trail_exec_price
exit_reason = "trailing_stop"
executed_exit = True
elif (i - pos["entry_idx"]) >= self.params.time_stop_bars:
exit_price = self._adjust_sell_price(next_candle["Open"])
exit_reason = "time_stop"
executed_exit = True
elif self.params.vol_drop_exit_z is not None and self.df["volume_z"].iloc[i] <= self.params.vol_drop_exit_z:
exit_price = self._adjust_sell_price(next_candle["Open"])
exit_reason = "volume_drop"
executed_exit = True
elif self.params.reverse_signal_exit and self.signal_engine.evaluate(self.df, i):
exit_price = self._adjust_sell_price(next_candle["Open"])
exit_reason = "reverse_signal"
executed_exit = True
if executed_exit:
trend_value = self.df["macro_trend"].iloc[i] if "macro_trend" in self.df.columns else 0.0
sell_ratio = 0.5 if trend_value >= 0 else 1.0
sell_ratio = min(max(sell_ratio, 0.0), 1.0)
qty_to_sell = pos["qty"] * sell_ratio
qty_to_sell = pos["qty"] if qty_to_sell <= 0 else qty_to_sell
exit_price_fee = self._apply_fees(exit_price, "sell")
pnl = (exit_price_fee - entry_price) * qty_to_sell
cash += qty_to_sell * exit_price_fee
trades.append(
Trade(
entry_time=pos["entry_time"],
exit_time=next_candle["datetime"],
entry_price=entry_price,
exit_price=exit_price_fee,
qty=qty_to_sell,
pnl=pnl,
return_pct=pnl / (entry_price * qty_to_sell),
bars_held=i - pos["entry_idx"],
reason=f"{exit_reason}|ratio:{sell_ratio:.2f}",
)
)
remaining_qty = pos["qty"] - qty_to_sell
if remaining_qty > 1e-8:
pos["qty"] = remaining_qty
pos["best_price"] = best_price
pos["trailing"] = trailing_level
updated_positions.append(pos)
else:
pos["best_price"] = best_price
pos["trailing"] = trailing_level
updated_positions.append(pos)
positions = updated_positions
# 2) 신규 진입
atr_pct = self.df["atr_pct"].iloc[i]
regime = self._current_regime(i)
if (
len(positions) < self.params.max_positions
and self.params.atr_filter[0] <= atr_pct <= self.params.atr_filter[1]
and self.signal_engine.evaluate(self.df, i)
):
entry_price = self._adjust_buy_price(next_candle["Open"])
entry_price_fee = self._apply_fees(entry_price, "buy")
qty, cash = self._position_size(cash, entry_price_fee, regime)
if qty > 0:
stop_loss_pct, take_profit_pct, trailing_mult = self._resolve_levels(regime)
positions.append(
{
"entry_price": entry_price_fee,
"entry_time": next_candle["datetime"],
"entry_idx": i + 1,
"qty": qty,
"atr": self.df["atr14"].iloc[i],
"best_price": entry_price_fee,
"trailing": entry_price_fee * (1 - stop_loss_pct),
"stop_loss_pct": stop_loss_pct,
"take_profit_pct": take_profit_pct,
"trailing_mult": trailing_mult,
"regime": regime,
}
)
# 3) 자산 곡선 기록
market_value = sum(pos["qty"] * self.df["Close"].iloc[i] for pos in positions)
equity_curve.append((current["datetime"], cash + market_value))
equity_df = pd.DataFrame(equity_curve, columns=["datetime", "equity"])
final_time = self.df.iloc[-1]["datetime"]
final_price = self.df.iloc[-1]["Close"]
if positions:
for pos in positions:
forced_price = self._adjust_sell_price(final_price)
exit_price_fee = self._apply_fees(forced_price, "sell")
pnl = (exit_price_fee - pos["entry_price"]) * pos["qty"]
cash += pos["qty"] * exit_price_fee
trades.append(
Trade(
entry_time=pos["entry_time"],
exit_time=final_time,
entry_price=pos["entry_price"],
exit_price=exit_price_fee,
qty=pos["qty"],
pnl=pnl,
return_pct=pnl / (pos["entry_price"] * pos["qty"]),
bars_held=len(self.df) - pos["entry_idx"],
reason="forced_exit",
)
)
if equity_df.empty or equity_df["datetime"].iloc[-1] != final_time:
equity_df = pd.concat([equity_df, pd.DataFrame({"datetime": [final_time], "equity": [cash]})], ignore_index=True)
else:
equity_df.loc[equity_df.index[-1], "equity"] = cash
metrics = self._calculate_metrics(equity_df, trades)
price_history = self.df[["datetime", "Open", "High", "Low", "Close"]].set_index("datetime")
return SimulationResult(self.params, trades, equity_df, metrics, price_history)
def _calculate_metrics(self, equity_df: pd.DataFrame, trades: List[Trade]) -> Dict[str, float]:
if equity_df.empty:
return {}
returns = equity_df["equity"].pct_change().dropna()
total_return = equity_df["equity"].iloc[-1] / equity_df["equity"].iloc[0] - 1
duration_minutes = (equity_df["datetime"].iloc[-1] - equity_df["datetime"].iloc[0]).total_seconds() / 60
years = max(duration_minutes / (60 * 24 * 365), 1e-6)
cagr = (1 + total_return) ** (1 / years) - 1
running_max = equity_df["equity"].cummax()
drawdown = equity_df["equity"] / running_max - 1
max_dd = drawdown.min()
sharpe = (returns.mean() - RISK_FREE_RATE / (365 * 24 * 60)) / (returns.std() + 1e-9) * math.sqrt(365 * 24 * 60)
win_trades = [t for t in trades if t.pnl > 0]
loss_trades = [t for t in trades if t.pnl <= 0]
profit_factor = (sum(t.pnl for t in win_trades) / abs(sum(t.pnl for t in loss_trades))) if loss_trades else np.inf
hit_ratio = len(win_trades) / len(trades) if trades else 0.0
return {
"final_equity": equity_df["equity"].iloc[-1],
"total_return": total_return,
"CAGR": cagr,
"max_drawdown": max_dd,
"sharpe": sharpe,
"num_trades": len(trades),
"hit_ratio": hit_ratio,
"profit_factor": profit_factor,
"avg_bars": np.mean([t.bars_held for t in trades]) if trades else 0.0,
}
# ------------------------------ 최적화 도구 ------------------------------ #
class StrategyOptimizer:
"""그리드/랜덤 서치 및 walk-forward 평가."""
def __init__(
self,
data: pd.DataFrame,
initial_capital: float = INITIAL_CAPITAL,
entry_combos: Optional[Sequence[Tuple[str, ...]]] = None,
):
self.data = data
self.initial_capital = initial_capital
self.entry_combos = list(entry_combos) if entry_combos else list(DEFAULT_ENTRY_COMBOS)
def _evaluate(self, params: StrategyParams) -> SimulationResult:
simulator = PortfolioSimulator(self.data, params, self.initial_capital)
return simulator.run()
def grid_search(self, limit: int = 20) -> List[SimulationResult]:
entry_combos = self.entry_combos
combo_modes = ["OR"]
ma_fast_opts = [5, 8]
ma_slow_opts = [34, 55]
stop_opts = [0.004, 0.0055]
tp_opts = [0.01, 0.013]
trailing_opts = [1.8, 2.2]
risk_sets = [
(0.18, 0.12, 0.25),
(0.2, 0.1, 0.3),
]
configs = itertools.product(entry_combos, combo_modes, ma_fast_opts, ma_slow_opts, stop_opts, tp_opts, trailing_opts, risk_sets)
results: List[SimulationResult] = []
for combo, mode, ma_fast, ma_slow, stop_pct, tp_pct, trail_mult, risk_tuple in itertools.islice(configs, limit):
risk_mid, risk_high, risk_low = risk_tuple
params = StrategyParams(
entry_combo=combo,
combo_mode=mode,
ma_fast=ma_fast,
ma_slow=ma_slow,
stop_loss_pct=stop_pct,
take_profit_pct=tp_pct,
trailing_atr_mult=trail_mult,
risk_pct=risk_mid,
risk_pct_high_vol=risk_high,
risk_pct_low_vol=risk_low,
stop_loss_pct_high_vol=stop_pct * 0.8,
stop_loss_pct_low_vol=stop_pct * 1.2,
take_profit_pct_high_vol=tp_pct * 1.2,
take_profit_pct_low_vol=tp_pct * 0.8,
trailing_atr_mult_high_vol=trail_mult * 1.1,
trailing_atr_mult_low_vol=max(1.2, trail_mult * 0.9),
)
results.append(self._evaluate(params))
return sorted(results, key=lambda r: r.metrics.get("sharpe", -np.inf), reverse=True)
def random_search(self, trials: int = 20, seed: int = 42) -> List[SimulationResult]:
random.seed(seed)
results = []
for _ in range(trials):
params = StrategyParams(
entry_combo=random.choice(self.entry_combos),
combo_mode=random.choice(["OR", "AND"]),
ma_fast=random.choice([3, 5, 8]),
ma_slow=random.choice([34, 55]),
rsi_buy=random.uniform(25, 38),
mom_threshold=random.uniform(0.0008, 0.0025),
risk_pct=random.choice([0.16, 0.2, 0.24]),
risk_pct_high_vol=random.uniform(0.08, 0.15),
risk_pct_low_vol=random.uniform(0.22, 0.32),
stop_loss_pct=random.uniform(0.003, 0.01),
stop_loss_pct_high_vol=random.uniform(0.003, 0.007),
stop_loss_pct_low_vol=random.uniform(0.006, 0.012),
take_profit_pct=random.uniform(0.008, 0.02),
take_profit_pct_high_vol=random.uniform(0.012, 0.025),
take_profit_pct_low_vol=random.uniform(0.008, 0.015),
trailing_atr_mult=random.uniform(1.5, 2.5),
trailing_atr_mult_high_vol=random.uniform(2.0, 2.8),
trailing_atr_mult_low_vol=random.uniform(1.2, 2.0),
time_stop_bars=random.choice([30, 45, 60]),
vol_drop_exit_z=random.uniform(-1.5, -0.3),
breakout_lookback=random.choice([30, 45, 60, 90]),
)
results.append(self._evaluate(params))
return sorted(results, key=lambda r: r.metrics.get("CAGR", -np.inf), reverse=True)
def walk_forward(self, train_bars: int = 2000, test_bars: int = 600) -> Dict[str, float]:
cursor = 0
wf_metrics: List[Dict[str, float]] = []
while cursor + train_bars + test_bars < len(self.data):
train_slice = self.data.iloc[cursor : cursor + train_bars].copy()
test_slice = self.data.iloc[cursor + train_bars : cursor + train_bars + test_bars].copy()
# 학습 구간 간이 최적화(랜덤 5번)
temp_optimizer = StrategyOptimizer(train_slice, self.initial_capital)
best_train = temp_optimizer.random_search(trials=5)[0]
# 테스트 구간 성능
test_sim = PortfolioSimulator(test_slice, best_train.params, self.initial_capital).run()
wf_metrics.append(test_sim.metrics)
cursor += test_bars
if not wf_metrics:
return {}
agg = pd.DataFrame(wf_metrics).mean().to_dict()
agg["segments"] = len(wf_metrics)
return agg
def walk_forward_with_params(self, params: StrategyParams, train_bars: int = 2000, test_bars: int = 600) -> Dict[str, float]:
cursor = 0
wf_metrics: List[Dict[str, float]] = []
while cursor + train_bars + test_bars < len(self.data):
test_slice = self.data.iloc[cursor + train_bars : cursor + train_bars + test_bars].copy()
if test_slice.empty:
break
sim = PortfolioSimulator(test_slice, params, self.initial_capital).run()
wf_metrics.append(sim.metrics)
cursor += test_bars
if not wf_metrics:
return {}
agg = pd.DataFrame(wf_metrics).mean().to_dict()
agg["segments"] = len(wf_metrics)
return agg
# ------------------------------ 스트레스 테스트 ------------------------------ #
class StressTester:
"""하락장, 고/저변동 시나리오 재평가."""
def __init__(self, data: pd.DataFrame, params: StrategyParams, initial_capital: float = INITIAL_CAPITAL):
self.data = data
self.params = params
self.initial_capital = initial_capital
def _subset(self, mask: pd.Series) -> Optional[SimulationResult]:
mask = mask.fillna(False)
if not mask.any():
return None
subset = self.data.loc[mask].copy()
if len(subset) < 500:
return None
sim = PortfolioSimulator(subset, self.params, self.initial_capital)
return sim.run()
def run(self) -> Dict[str, Dict[str, float]]:
stress_results = {}
rolling_ret = self.data["Close"].pct_change(300)
bear_mask = rolling_ret < -0.05
bull_mask = rolling_ret > 0.05
high_vol_mask = self.data["atr_pct"] > self.data["atr_pct"].quantile(0.7)
low_vol_mask = self.data["atr_pct"] < self.data["atr_pct"].quantile(0.3)
scenarios = {
"bear_market": bear_mask,
"bull_market": bull_mask,
"high_volatility": high_vol_mask,
"low_volatility": low_vol_mask,
}
for name, mask in scenarios.items():
res = self._subset(mask)
if res:
stress_results[name] = res.metrics
return stress_results
# ------------------------------ 리포팅/시각화 ------------------------------ #
class ReportBuilder:
"""표, 그래프, JSON 저장 등을 담당."""
def __init__(self, output_dir: str = os.path.join(BASE_DIR, "reports")) -> None:
self.output_dir = output_dir
os.makedirs(self.output_dir, exist_ok=True)
def print_table(self, results: Sequence[SimulationResult], title: str) -> None:
rows = []
for rank, res in enumerate(results[:5], start=1):
rows.append(
{
"rank": rank,
"entry_combo": "+".join(res.params.entry_combo),
"mode": res.params.combo_mode,
"sharpe": round(res.metrics.get("sharpe", 0), 3),
"CAGR": f"{res.metrics.get('CAGR', 0)*100:.2f}%",
"maxDD": f"{res.metrics.get('max_drawdown', 0)*100:.2f}%",
"#trades": res.metrics.get("num_trades", 0),
}
)
df = pd.DataFrame(rows)
print(f"\n[{title}]")
# tabulate 미설치 환경을 고려해 to_markdown 대신 문자열 출력
try:
print(df.to_markdown(index=False))
except Exception:
print(df.to_string(index=False))
def plot_equity(self, result: SimulationResult, symbol: str, export_html: Optional[str] = None) -> str:
equity = result.equity_curve
trades = result.trades
fig = make_subplots(
rows=2,
cols=1,
shared_xaxes=True,
vertical_spacing=0.07,
row_heights=[0.7, 0.3],
specs=[[{"secondary_y": True}], [{"secondary_y": False}]],
)
fig.add_trace(
go.Scatter(x=equity["datetime"], y=equity["equity"], name="Equity", line=dict(color="blue")),
row=1,
col=1,
secondary_y=False,
)
dd = equity["equity"] / equity["equity"].cummax() - 1
fig.add_trace(
go.Scatter(x=equity["datetime"], y=dd, name="Drawdown", line=dict(color="red", width=1, dash="dot")),
row=1,
col=1,
secondary_y=True,
)
prices = self._extract_prices(result)
if prices.empty:
prices = result.price_history
fig.add_trace(
go.Candlestick(
x=prices.index,
open=prices["Open"],
high=prices["High"],
low=prices["Low"],
close=prices["Close"],
name="Price",
),
row=2,
col=1,
)
entries_x = [t.entry_time for t in trades]
entries_y = [t.entry_price for t in trades]
exits_x = [t.exit_time for t in trades]
exits_y = [t.exit_price for t in trades]
fig.add_trace(
go.Scatter(x=entries_x, y=entries_y, mode="markers", marker=dict(color="green", size=8), name="Entry"),
row=2,
col=1,
)
fig.add_trace(
go.Scatter(x=exits_x, y=exits_y, mode="markers", marker=dict(color="orange", size=8), name="Exit"),
row=2,
col=1,
)
fig.update_layout(title=f"{symbol} 1분봉 자산곡선/거래포인트", xaxis_rangeslider_visible=False)
if export_html:
output_path = os.path.join(self.output_dir, export_html)
else:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = os.path.join(self.output_dir, f"{symbol}_1min_equity_{timestamp}.html")
fig.write_html(output_path)
return output_path
def _extract_prices(self, result: SimulationResult) -> pd.DataFrame:
return result.price_history.loc[
result.price_history.index.intersection(result.equity_curve["datetime"])
]
def save_json(self, data: Dict, filename: str) -> str:
path = os.path.join(self.output_dir, filename)
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2, default=json_default)
return path
# ------------------------------ 실거래 Stub ------------------------------ #
class LiveTradingExecutor:
"""HTS2 주문 객체를 래핑하여 실거래 연동 옵션 제공."""
def __init__(self, enable: bool = False) -> None:
self.enable = enable and HTS is not None
self.hts = HTS() if self.enable else None
def execute(self, symbol: str, side: str, amount_krw: float) -> None:
if not self.enable or self.hts is None:
print(f"[LIVE_DISABLED] {symbol} {side} {amount_krw:,.0f} KRW")
return
if side == "buy":
self.hts.buyCoinMarket(symbol, amount_krw, None)
else:
self.hts.sellCoinMarket(symbol, 0, amount_krw)
print(f"[LIVE] {symbol} {side} {amount_krw:,.0f} KRW 실행 완료")
# ------------------------------ 실행 진입점 ------------------------------ #
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="1분봉 매매 전략 시뮬레이터")
parser.add_argument("--symbol", default=DEFAULT_SYMBOL, help="코인 심볼 (예: BTC)")
parser.add_argument("--db", default=DEFAULT_DB_PATH, help="SQLite DB 경로")
parser.add_argument("--limit", type=int, default=9000, help="최대 로드 캔들 수")
parser.add_argument("--optimize", nargs="*", choices=["grid", "random"], default=["grid", "random"], help="실행할 최적화 유형")
parser.add_argument("--random-trials", type=int, default=20, help="랜덤 서치 반복 횟수")
parser.add_argument("--walk-forward", action="store_true", help="walk-forward 검증 실행")
parser.add_argument("--stress-test", action="store_true", help="스트레스 테스트 실행")
parser.add_argument("--export-html", default=None, help="시각화 HTML 파일명")
parser.add_argument("--live", action="store_true", help="실거래 모드 (HTS 필요)")
parser.add_argument("--strategy-mode", choices=["auto", "current", "full"], default="auto", help="전략 조합 선택 모드")
parser.add_argument("--entry-max-len", type=int, default=3, help="full 모드에서 사용할 최대 신호 조합 길이")
return parser.parse_args()
def main() -> None:
args = parse_args()
loader = MinuteDataLoader(args.db)
raw_df = loader.load(args.symbol, limit=args.limit)
enriched_df = IndicatorBuilder().enrich(raw_df)
if args.strategy_mode == "current":
use_full_search = False
elif args.strategy_mode == "full":
use_full_search = True
else:
use_full_search = load_strategy_mode()
entry_combos = (
generate_entry_combos(max(1, args.entry_max_len)) if use_full_search else DEFAULT_ENTRY_COMBOS
)
optimizer = StrategyOptimizer(enriched_df, entry_combos=entry_combos)
report = ReportBuilder()
best_candidates: List[SimulationResult] = []
if "grid" in args.optimize:
grid_results = optimizer.grid_search()
report.print_table(grid_results, "Grid Search Top 5")
best_candidates.extend(grid_results[:2])
if "random" in args.optimize:
random_results = optimizer.random_search(trials=args.random_trials)
report.print_table(random_results, "Random Search Top 5")
best_candidates.extend(random_results[:3])
if not best_candidates:
raise RuntimeError("최적화 결과가 없습니다.")
best_overall = max(best_candidates, key=lambda r: r.metrics.get("CAGR", -np.inf))
wf_summary = {}
if args.walk_forward:
wf_candidates: List[Tuple[SimulationResult, Dict[str, float]]] = []
for cand in best_candidates:
wf_metrics = optimizer.walk_forward_with_params(cand.params)
if wf_metrics:
wf_candidates.append((cand, wf_metrics))
if wf_candidates:
best_overall, wf_summary = max(
wf_candidates,
key=lambda item: (
item[1].get("total_return", -np.inf),
item[1].get("sharpe", -np.inf),
),
)
else:
wf_summary = optimizer.walk_forward()
print("\n[Best Strategy]")
print(json.dumps(best_overall.metrics, indent=2, default=json_default))
if wf_summary:
print("\n[Walk-Forward 성능]")
print(json.dumps(wf_summary, indent=2, default=json_default))
stress_summary = {}
if args.stress_test:
stress_summary = StressTester(enriched_df, best_overall.params).run()
print("\n[Stress Test]")
print(json.dumps(stress_summary, indent=2, default=json_default))
html_path = report.plot_equity(best_overall, args.symbol, export_html=args.export_html)
print(f"\nPlot 저장 경로: {html_path}")
summary_payload = {
"best_params": best_overall.params.__dict__,
"best_metrics": best_overall.metrics,
"walk_forward": wf_summary,
"stress_test": stress_summary,
"equity_html": html_path,
}
json_path = report.save_json(summary_payload, f"{args.symbol}_1min_summary.json")
print(f"요약 JSON 저장 경로: {json_path}")
if args.live:
LiveTradingExecutor(enable=True).execute(args.symbol, "buy", MIN_ORDER_KRW)
if __name__ == "__main__":
main()