Files
Bithumb/rule_discovery.py
dsyoon e631a5701f 로고스 전략 FSM을 simulation 기본 실행에 통합한다.
수동 타점(logos_trades.json) 흐름에 맞춘 순차 매매 로직을 추가하고, python simulation.py 실행 시 로고스 백테스트·HTML을 생성한다. 규칙 탐색·BB 안전장치 개선과 함께 reports HTML은 gitignore로 제외한다.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-29 19:07:10 +09:00

860 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
모든 봉·캔들 특징 행렬에서 매수/매도 규칙을 탐색합니다 (인과적 백테스트).
python simulation.py discover
"""
from __future__ import annotations
import json
import random
from dataclasses import asdict, dataclass, field
from pathlib import Path
import numpy as np
import pandas as pd
from candle_features import (
FEATURE_BOOL_COLS,
build_master_feature_matrix,
interval_prefix,
)
from config import (
BUY_COOLDOWN_SEC,
BUY_MAX_BB_POS_CHASE,
DISCOVER_MAX_TRADES,
DISCOVER_TRADE_PENALTY_PCT,
DOWNLOAD_INTERVALS,
ENTRY_INTERVAL,
SELL_COOLDOWN_SEC,
SELL_MIN_BB_POS,
SIGNAL_EDGE_ONLY,
SIM_INITIAL_CASH_KRW,
SIM_MIN_ORDER_KRW,
SYMBOL,
TRADE_MIN_GAP_BARS,
TRADING_FEE_RATE,
)
from strategy import (
SIGNAL_BUY_LOWER,
SIGNAL_SELL_STOP,
SIGNAL_SELL_UPPER,
)
# Default location of the persisted rule set: next to this module.
RULES_FILE = Path(__file__).parent / "discovered_rules.json"
# Features used by the search (boolean columns).
SEARCH_FEATURES: tuple[str, ...] = FEATURE_BOOL_COLS
# Candidate negated conditions, used to block entries when a non-entry
# interval is overheated or falling (see generate_predicate_pool).
NEG_BLOCK_FEATURES: tuple[str, ...] = (
    "cross_up_upper",
    "above_upper",
    "cross_down_lower",
    "shooting_star",
    "ichi_above_cloud",
    "bb_zone_top",
    "bb_pos_high",
)
# Predicates that always block a buy when discovered rules are applied
# (upper-band breakout / overheated).
BUY_SAFETY_BLOCK: tuple[str, ...] = (
    "m3:above_upper",
    "m3:cross_up_upper",
    "m10:above_upper",
    "m10:cross_up_upper",
)
# Features that tend to stay true over many consecutive bars; without edge
# detection they would cause excessive fills.
LEVEL_STATE_FEATURES: tuple[str, ...] = (
    "below_lower",
    "above_upper",
    "inside_band",
    "bb_zone_bottom",
    "bb_zone_top",
    "bb_pos_low",
    "bb_pos_high",
    "ichi_price_above_tenkan",
    "ichi_price_below_tenkan",
)
@dataclass
class DiscoveredRules:
    """Discovered buy/sell rules (combinations of features across intervals).

    Predicates are strings of the form ``"<prefix>:<feature>"`` or
    ``"<prefix>:!<feature>"`` (negated), as parsed by ``predicate_column``.
    """

    # Label for this rule set; also passed to the backtest as config_name.
    name: str = "discovered"
    # Buy predicates that must all hold (one AND group).
    buy_all: list[str] = field(default_factory=list)
    # Additional buy groups; each group is ANDed, groups are ORed together
    # (and with buy_all) by buy_mask.
    buy_any: list[list[str]] = field(default_factory=list)
    # Regular sell predicates, ANDed.
    sell_all: list[str] = field(default_factory=list)
    # Stop-sell predicates, ANDed; not subject to the sell safety filter.
    sell_stop: list[str] = field(default_factory=list)
    # Backtest results filled in by discover_rules.
    train_return_pct: float = 0.0
    test_return_pct: float = 0.0
    full_return_pct: float = 0.0
    trade_count: int = 0
def predicate_column(key: str) -> tuple[str, bool]:
    """Translate a predicate key into ``(column_name, negated)``.

    ``'m60:cross_up_lower'`` becomes ``('m60_cross_up_lower', False)`` and
    ``'d1:!above_upper'`` becomes ``('d1_above_upper', True)``.

    Raises:
        ValueError: if ``key`` has no ``':'`` separator.
    """
    prefix, sep, rest = key.partition(":")
    if not sep:
        raise ValueError(f"잘못된 predicate: {key}")
    if rest.startswith("!"):
        return f"{prefix}_{rest[1:]}", True
    return f"{prefix}_{rest}", False
def _predicate_feature(key: str) -> str:
    """Return only the feature name of a predicate (prefix and ``!`` removed)."""
    feature = key.split(":", 1)[1]
    if feature.startswith("!"):
        feature = feature[1:]
    return feature
def is_level_state_predicate(key: str) -> bool:
    """Tell whether the predicate is a level-state condition.

    Level-state features, once true, tend to remain true over several
    consecutive bars (see ``LEVEL_STATE_FEATURES``).
    """
    feature = _predicate_feature(key)
    return feature in LEVEL_STATE_FEATURES
def is_weak_sell_predicate(key: str) -> bool:
    """Tell whether the predicate is too weak to be a sell condition.

    Negated ``cross_*`` / band-position predicates (e.g. ``!cross_up_upper``,
    ``!below_lower``) are true on most bars and would cause excessive sells.
    """
    _, sep, rest = key.partition(":")
    if not sep or not rest.startswith("!"):
        return False
    feature = rest[1:]
    return feature.startswith("cross_") or feature in (
        "below_lower",
        "above_upper",
        "inside_band",
    )
def is_blocked_buy_predicate(key: str) -> bool:
    """Tell whether a buy predicate is excluded from the search.

    Level-state conditions on the entry-interval bars are not used as buy
    predicates.
    """
    entry_prefix = interval_prefix(ENTRY_INTERVAL)
    if not key.startswith(f"{entry_prefix}:"):
        return False
    return is_level_state_predicate(key)
# Features describing a buy that chases a high (upper zone / overheated);
# excluded from both the search and execution.
CHASE_BUY_FEATURES: tuple[str, ...] = (
    "bb_zone_top",
    "bb_zone_high",
    "bb_pos_high",
    "above_upper",
    "cross_up_upper",
)
# Low-point / rebound buy triggers.
VALUE_BUY_FEATURES: tuple[str, ...] = (
    "cross_up_lower",
    "bb_zone_bottom",
    "bb_zone_low",
    "hammer",
    "bb_pos_low",
    "ichi_tk_cross_up",
    "cross_down_lower",
)
def is_chase_buy_predicate(key: str) -> bool:
    """Tell whether a non-negated predicate names a chase-buy feature.

    Keys without a ``':'`` separator and negated predicates never qualify.
    """
    _, sep, rest = key.partition(":")
    if not sep or rest.startswith("!"):
        return False
    # Not negated here, so ``rest`` is already the bare feature name.
    return rest in CHASE_BUY_FEATURES
def is_value_buy_predicate(key: str) -> bool:
    """Tell whether a non-negated predicate names a low-point/rebound trigger.

    Keys without a ``':'`` separator and negated predicates never qualify.
    """
    _, sep, rest = key.partition(":")
    if not sep or rest.startswith("!"):
        return False
    # Not negated here, so ``rest`` is already the bare feature name.
    return rest in VALUE_BUY_FEATURES
def _entry_bb_pos_col() -> str:
    """Return the matrix column name of the entry interval's ``bb_pos`` value."""
    entry_prefix = interval_prefix(ENTRY_INTERVAL)
    return f"{entry_prefix}_bb_pos"
def discover_score(return_pct: float, trade_count: int) -> float:
    """Search objective: return minus a penalty for excessive trading.

    Every trade beyond ``DISCOVER_MAX_TRADES`` costs
    ``DISCOVER_TRADE_PENALTY_PCT`` percentage points.
    """
    over_limit = trade_count - DISCOVER_MAX_TRADES
    if over_limit <= 0:
        return return_pct - 0 * DISCOVER_TRADE_PENALTY_PCT
    return return_pct - over_limit * DISCOVER_TRADE_PENALTY_PCT
def _rising_edge(mask: np.ndarray, i: int) -> bool:
    """Tell whether the condition became true at bar ``i``.

    True when ``mask[i]`` is set and either ``i`` is the first bar or
    ``mask[i - 1]`` is not set.
    """
    if not bool(mask[i]):
        return False
    return i == 0 or not bool(mask[i - 1])
def _trigger_at(mask: np.ndarray, i: int, edge_only: bool = SIGNAL_EDGE_ONLY) -> bool:
    """Tell whether the signal fires at bar ``i``.

    With ``edge_only`` the signal fires only on a rising edge of ``mask``;
    otherwise it fires whenever ``mask[i]`` is set.
    """
    return _rising_edge(mask, i) if edge_only else bool(mask[i])
def _mask_for_keys(matrix: pd.DataFrame, keys: list[str]) -> np.ndarray:
    """Build the per-bar AND mask of the given predicates.

    An empty ``keys`` list yields an all-True mask.  If any predicate refers
    to a column missing from ``matrix`` the result is all-False, regardless of
    negation.  Missing values in a column count as False before negation.
    """
    n = len(matrix)
    combined = np.ones(n, dtype=bool)
    for key in keys:
        col, negated = predicate_column(key)
        if col not in matrix.columns:
            return np.zeros(n, dtype=bool)
        hit = matrix[col].fillna(0).astype(bool).to_numpy()
        combined &= ~hit if negated else hit
    return combined
def _unsafe_buy_mask(matrix: pd.DataFrame) -> np.ndarray:
    """Mark bars where buying is blocked because price is near a high.

    A bar is unsafe when any of these holds:
    - a ``BUY_SAFETY_BLOCK`` predicate is true (upper band / upward breakout);
    - Close is within 3% of its 20-bar rolling high and either the 3-minute
      bar is ``bb_pos_low`` with a shooting star, or the 30-minute bar is a
      hammer (both patterns come from cases noted as 5/27 in the original);
    - the entry-interval ``bb_pos`` is at or above ``BUY_MAX_BB_POS_CHASE``;
    - any entry-interval ``CHASE_BUY_FEATURES`` column is true.

    Checks whose columns are absent from ``matrix`` are skipped.
    """
    columns = matrix.columns

    def flag(col: str) -> pd.Series:
        # Boolean view of a feature column; missing values count as False.
        return matrix[col].fillna(0).astype(bool)

    unsafe = np.zeros(len(matrix), dtype=bool)

    for key in BUY_SAFETY_BLOCK:
        col, negated = predicate_column(key)
        if not negated and col in columns:
            unsafe |= flag(col).to_numpy()

    if "Close" in columns:
        close = matrix["Close"].astype(float)
        rolling_high = close.rolling(20, min_periods=5).max()
        near_peak = (close >= rolling_high * 0.97).fillna(False)
        if "m3_bb_pos_low" in columns and "m3_shooting_star" in columns:
            # Top of a spike: low band position plus shooting star.
            spike_top = flag("m3_bb_pos_low") & flag("m3_shooting_star") & near_peak
            unsafe |= spike_top.to_numpy()
        if "m30_hammer" in columns:
            # A 30-minute hammer alone must not trigger a buy at a high.
            unsafe |= (flag("m30_hammer") & near_peak).to_numpy()

    pos_col = _entry_bb_pos_col()
    if pos_col in columns:
        band_pos = matrix[pos_col].fillna(0.5).astype(float).to_numpy()
        unsafe |= band_pos >= BUY_MAX_BB_POS_CHASE

    entry_prefix = interval_prefix(ENTRY_INTERVAL)
    for feat in CHASE_BUY_FEATURES:
        col = f"{entry_prefix}_{feat}"
        if col in columns:
            unsafe |= flag(col).to_numpy()
    return unsafe
def _value_buy_gate_mask(matrix: pd.DataFrame, group: list[str]) -> np.ndarray:
    """Per-group buy gate.

    A bar passes when the entry-interval ``bb_pos`` is below
    ``BUY_MAX_BB_POS_CHASE`` (always, if that column is absent), or when all
    value-buy predicates contained in ``group`` hold on that bar.
    """
    pos_col = _entry_bb_pos_col()
    if pos_col in matrix.columns:
        band_pos = matrix[pos_col].fillna(0.5).astype(float).to_numpy()
        pos_ok = band_pos < BUY_MAX_BB_POS_CHASE
    else:
        pos_ok = np.ones(len(matrix), dtype=bool)

    value_keys = [key for key in group if is_value_buy_predicate(key)]
    if value_keys:
        return pos_ok | _mask_for_keys(matrix, value_keys)
    return pos_ok
def _unsafe_sell_mask(matrix: pd.DataFrame) -> np.ndarray:
    """Mark bars where a regular sell is blocked (low / rebound zone).

    A bar is blocked when the entry-interval ``bb_pos`` is below
    ``SELL_MIN_BB_POS`` or when any entry-interval hammer / bottom-zone /
    low-zone / low-position column is true.  Absent columns are skipped.
    """
    columns = matrix.columns
    blocked = np.zeros(len(matrix), dtype=bool)

    pos_col = _entry_bb_pos_col()
    if pos_col in columns:
        band_pos = matrix[pos_col].fillna(0.5).astype(float).to_numpy()
        blocked |= band_pos < SELL_MIN_BB_POS

    entry_prefix = interval_prefix(ENTRY_INTERVAL)
    low_zone_cols = [
        f"{entry_prefix}_{feat}"
        for feat in ("hammer", "bb_zone_bottom", "bb_zone_low", "bb_pos_low")
    ]
    for col in low_zone_cols:
        if col in columns:
            blocked |= matrix[col].fillna(0).astype(bool).to_numpy()
    return blocked
def buy_mask(matrix: pd.DataFrame, rules: DiscoveredRules) -> np.ndarray:
    """Per-bar buy mask.

    ``buy_all`` and every non-empty ``buy_any`` group are each an AND group;
    the groups are ORed together (``buy_any`` is an extra branch, not an AND
    with ``buy_all``).  Each group is gated by ``_value_buy_gate_mask`` and the
    result is filtered by ``_unsafe_buy_mask``.  Without any group the mask is
    all-False.
    """
    groups = [list(g) for g in [rules.buy_all, *rules.buy_any] if g]
    if not groups:
        return np.zeros(len(matrix), dtype=bool)

    allowed = np.zeros(len(matrix), dtype=bool)
    for group in groups:
        allowed |= _mask_for_keys(matrix, group) & _value_buy_gate_mask(matrix, group)
    return allowed & ~_unsafe_buy_mask(matrix)
def sell_mask(matrix: pd.DataFrame, rules: DiscoveredRules, stop: bool = False) -> np.ndarray:
    """Per-bar sell mask for the regular sell rule or the stop rule.

    Args:
        matrix: Feature matrix, one row per bar.
        rules: Rule set; ``sell_stop`` is used when ``stop`` is true,
            otherwise ``sell_all``.
        stop: Select the stop rule.  Stop signals bypass the sell safety
            filter.

    Returns:
        Boolean array with one entry per bar.  When the selected key list is
        empty the mask is all-False: an empty AND would otherwise be vacuously
        true and signal a sell on every bar, whereas ``buy_mask`` and the
        stop-mask call sites treat "no keys" as "no signal".
    """
    keys = rules.sell_stop if stop else rules.sell_all
    if not keys:
        return np.zeros(len(matrix), dtype=bool)
    raw = _mask_for_keys(matrix, keys)
    if stop:
        return raw
    return raw & ~_unsafe_sell_mask(matrix)
def sanitize_rules(rules: DiscoveredRules) -> DiscoveredRules:
    """Strip chase-buy and weak-sell predicates from ``rules`` in place.

    ``buy_any`` groups that become empty are dropped.  The same (mutated)
    object is returned for convenience.
    """

    def without_chase(preds: list[str]) -> list[str]:
        return [p for p in preds if not is_chase_buy_predicate(p)]

    rules.buy_all = without_chase(rules.buy_all)
    cleaned_groups = [without_chase(group) for group in rules.buy_any]
    rules.buy_any = [group for group in cleaned_groups if group]
    rules.sell_all = [p for p in rules.sell_all if not is_weak_sell_predicate(p)]
    return rules
def _discovery_seed() -> DiscoveredRules:
    """Return the search seed: the lower-band-breakout baseline.

    The ``combination_seed`` rules (with their upper-band chase buys) are
    deliberately not used here.
    """
    seed = _baseline_rules()
    return seed
def generate_predicate_pool(intervals: list[int]) -> list[str]:
    """List the candidate predicates for the search.

    For each interval, every ``SEARCH_FEATURES`` entry is added as a positive
    predicate; for intervals other than ``ENTRY_INTERVAL`` the negated
    ``NEG_BLOCK_FEATURES`` predicates follow.
    """
    pool: list[str] = []
    for interval in intervals:
        prefix = interval_prefix(interval)
        pool.extend(f"{prefix}:{feat}" for feat in SEARCH_FEATURES)
        if interval == ENTRY_INTERVAL:
            continue
        pool.extend(f"{prefix}:!{feat}" for feat in NEG_BLOCK_FEATURES)
    return pool
def list_rule_signal_edges(
    matrix: pd.DataFrame,
    rules: DiscoveredRules,
) -> list[tuple[pd.Timestamp, str]]:
    """List every rising-edge rule signal over the whole period.

    Signals are reported whether or not they would lead to a fill.

    Returns:
        ``(timestamp, action)`` pairs in bar order, where ``action`` is
        ``buy_signal``, ``sell_signal`` or ``sell_stop_signal``.  Stop signals
        are only reported when ``rules.sell_stop`` is non-empty.
    """
    idx = matrix.index
    labelled_masks = [
        ("buy_signal", buy_mask(matrix, rules)),
        ("sell_signal", sell_mask(matrix, rules, stop=False)),
    ]
    if rules.sell_stop:
        labelled_masks.append(("sell_stop_signal", sell_mask(matrix, rules, stop=True)))

    out: list[tuple[pd.Timestamp, str]] = []
    for i in range(len(matrix)):
        for action, mask in labelled_masks:
            if _rising_edge(mask, i):
                out.append((idx[i], action))
    return out
def generate_trade_events(
    matrix: pd.DataFrame,
    rules: DiscoveredRules,
) -> list[tuple[pd.Timestamp, str, str]]:
    """Walk the bars in order and list the fills the rules would produce.

    The walk alternates between flat and holding.  Bars with a non-positive
    or NaN close are skipped, as are bars closer than ``TRADE_MIN_GAP_BARS``
    to the previous fill.  Buys and sells each respect their own cooldown
    (``BUY_COOLDOWN_SEC`` / ``SELL_COOLDOWN_SEC``), measured from the previous
    fill of the same side.

    Returns:
        ``(timestamp, action, signal_name)`` tuples, ``action`` being
        ``"buy"`` or ``"sell"``.
    """
    close = matrix["Close"].astype(float).to_numpy()
    idx = matrix.index
    b_mask = buy_mask(matrix, rules)
    s_mask = sell_mask(matrix, rules, stop=False)
    has_stop = bool(rules.sell_stop)
    stop_mask = (
        sell_mask(matrix, rules, stop=True)
        if has_stop
        else np.zeros(len(matrix), dtype=bool)
    )

    def in_cooldown(ts: pd.Timestamp, last_i: int | None, cooldown_sec: float) -> bool:
        # True while the previous same-side fill is still too recent.
        if last_i is None:
            return False
        return (ts - idx[last_i]).total_seconds() < cooldown_sec

    events: list[tuple[pd.Timestamp, str, str]] = []
    holding = False
    last_buy_i: int | None = None
    last_sell_i: int | None = None
    last_trade_i: int | None = None

    for i in range(len(matrix)):
        price = close[i]
        if price <= 0 or np.isnan(price):
            continue
        if last_trade_i is not None and i - last_trade_i < TRADE_MIN_GAP_BARS:
            continue
        ts = idx[i]

        if holding:
            is_stop = _trigger_at(stop_mask, i) if has_stop else False
            is_sell = _trigger_at(s_mask, i)
            if (is_stop or is_sell) and not in_cooldown(ts, last_sell_i, SELL_COOLDOWN_SEC):
                # A stop takes precedence when both signals fire.
                signal = SIGNAL_SELL_STOP if is_stop else SIGNAL_SELL_UPPER
                events.append((ts, "sell", signal))
                holding = False
                last_sell_i = last_trade_i = i
            # No buy can happen on a bar that started with an open position.
            continue

        if _trigger_at(b_mask, i) and not in_cooldown(ts, last_buy_i, BUY_COOLDOWN_SEC):
            events.append((ts, "buy", SIGNAL_BUY_LOWER))
            holding = True
            last_buy_i = last_trade_i = i
    return events
def backtest_rules(
    matrix: pd.DataFrame,
    rules: DiscoveredRules,
    df_1d: pd.DataFrame,
    df_1h: pd.DataFrame,
    entry_ohlc: pd.DataFrame,
) -> tuple[float, int]:
    """Compute the return of ``rules`` with the simulator's ``run_backtest``.

    The rows of ``entry_ohlc`` matching ``matrix.index`` are annotated with
    the generated trade events (``signal`` / ``point`` / ``action`` /
    ``trend`` columns) and handed to ``run_backtest``.

    Returns:
        ``(total_return_pct, trade_count)`` from the backtest result.
    """
    # Imported here rather than at module level, as in the original.
    from simulation import run_backtest
    import strategy as st

    df = entry_ohlc.loc[matrix.index].copy()
    for column, blank in (("signal", ""), ("point", 0), ("action", ""), ("trend", "")):
        df[column] = blank

    for ts, action, sig in generate_trade_events(matrix, rules):
        if ts in df.index:
            trend_at = st.get_trend_at(df_1d, df_1h, ts)
            df.at[ts, "signal"] = sig
            df.at[ts, "point"] = 1
            df.at[ts, "action"] = action
            df.at[ts, "trend"] = trend_at

    result = run_backtest(df, df_1d, df_1h, config_name=rules.name)
    return result.total_return_pct, result.trade_count
def _evaluate_train(
    train: pd.DataFrame,
    rules: DiscoveredRules,
    df_1d: pd.DataFrame,
    df_1h: pd.DataFrame,
    entry_ohlc: pd.DataFrame,
) -> tuple[float, int, float]:
    """Backtest ``rules`` on the training slice.

    Returns:
        ``(return_pct, trade_count, objective_score)``.
    """
    return_pct, trade_count = backtest_rules(train, rules, df_1d, df_1h, entry_ohlc)
    score = discover_score(return_pct, trade_count)
    return return_pct, trade_count, score
def _baseline_rules() -> DiscoveredRules:
    """Baseline rules: multi-interval lower-band breakout buy, upper-band sell."""
    entry = interval_prefix(ENTRY_INTERVAL)
    hourly = interval_prefix(60)
    daily = interval_prefix(1440)

    buy_conditions = [
        f"{entry}:cross_up_lower",
        f"{hourly}:ichi_tk_bull",
        f"{daily}:!ichi_below_cloud",
    ]
    sell_conditions = [
        f"{entry}:cross_up_upper",
        f"{hourly}:cross_up_upper",
    ]
    return DiscoveredRules(
        name="baseline_bb_mtf",
        buy_all=buy_conditions,
        sell_all=sell_conditions,
        sell_stop=[f"{entry}:cross_down_lower"],
    )
def _seed_from_combination_report() -> DiscoveredRules | None:
    """Build a seed from the rules suggested in ``combination_report.json``.

    The file is looked up next to this module.  Returns ``None`` when the file
    is missing or suggests no ``buy_all`` predicates.  A missing or empty
    ``sell_all`` falls back to the entry-interval ``cross_up_upper``.
    """
    report_path = Path(__file__).parent / "combination_report.json"
    if not report_path.exists():
        return None

    report = json.loads(report_path.read_text(encoding="utf-8"))
    suggested = report.get("suggested_rules") or {}

    buy_all = list(suggested.get("buy_all") or [])
    if not buy_all:
        return None

    buy_any = [list(group) for group in (suggested.get("buy_any") or []) if group]
    sell_all = list(
        suggested.get("sell_all")
        or [f"{interval_prefix(ENTRY_INTERVAL)}:cross_up_upper"]
    )
    sell_stop = list(suggested.get("sell_stop") or [])
    return DiscoveredRules(
        name="combination_seed",
        buy_all=buy_all,
        buy_any=buy_any,
        sell_all=sell_all,
        sell_stop=sell_stop,
    )
def _greedy_toggle_pass(
    train: pd.DataFrame,
    best: DiscoveredRules,
    best_score: float,
    attr: str,
    candidates: list[str],
    limit: int,
    df_1d: pd.DataFrame,
    df_1h: pd.DataFrame,
    entry_ohlc: pd.DataFrame,
) -> tuple[float, bool]:
    """Run one greedy pass over ``candidates`` for the key list ``best.<attr>``.

    Each candidate is toggled (removed if present, appended if absent and the
    list is shorter than ``limit``); a toggle is kept on ``best`` when it
    strictly raises the training score.

    Returns:
        ``(best_score, improved)`` after the pass.
    """
    improved = False
    for pred in candidates:
        current = getattr(best, attr)
        if pred in current:
            trial_keys = [p for p in current if p != pred]
        elif len(current) >= limit:
            continue
        else:
            trial_keys = current + [pred]
        trial = DiscoveredRules(
            name="trial",
            buy_all=best.buy_all,
            buy_any=best.buy_any,
            sell_all=best.sell_all,
            sell_stop=best.sell_stop,
        )
        setattr(trial, attr, trial_keys)
        _, _, score = _evaluate_train(train, trial, df_1d, df_1h, entry_ohlc)
        if score > best_score:
            best_score = score
            setattr(best, attr, trial_keys)
            improved = True
    return best_score, improved


def greedy_search(
    matrix: pd.DataFrame,
    train_end: int,
    pool: list[str],
    seed: DiscoveredRules,
    df_1d: pd.DataFrame,
    df_1h: pd.DataFrame,
    entry_ohlc: pd.DataFrame,
    max_buy: int = 8,
    max_sell: int = 6,
    max_stop: int = 2,
) -> DiscoveredRules:
    """Greedily grow/shrink the buy, sell and stop conditions.

    Starting from a copy of ``seed``, repeated passes toggle predicates in
    ``buy_all``, ``sell_all`` and ``sell_stop`` (in that order) and keep every
    toggle that strictly improves the training-slice objective.  The search
    ends after a full round without improvement.

    Args:
        matrix: Full feature matrix; only the first ``train_end`` rows are used.
        train_end: Number of leading rows forming the training slice.
        pool: Candidate predicates.
        seed: Starting rules; not modified.
        df_1d, df_1h, entry_ohlc: Passed through to the backtest.
        max_buy, max_sell, max_stop: Maximum length of each key list.

    Returns:
        The best rules found, sanitized.
    """
    train = matrix.iloc[:train_end]
    best = DiscoveredRules(
        name=seed.name,
        buy_all=list(seed.buy_all),
        buy_any=[list(g) for g in seed.buy_any],
        sell_all=list(seed.sell_all),
        sell_stop=list(seed.sell_stop),
    )
    _, _, best_score = _evaluate_train(train, best, df_1d, df_1h, entry_ohlc)

    buy_pool = [
        p
        for p in pool
        if not is_blocked_buy_predicate(p) and not is_chase_buy_predicate(p)
    ]
    sell_pool = [p for p in pool if not is_weak_sell_predicate(p)]
    # Stop candidates do not depend on the search state, so build them once.
    stop_pool = [
        p
        for p in pool
        if "cross_down_lower" in p and not is_level_state_predicate(p)
    ]
    passes = (
        ("buy_all", buy_pool, max_buy),
        ("sell_all", sell_pool, max_sell),
        ("sell_stop", stop_pool, max_stop),
    )

    improved = True
    while improved:
        improved = False
        for attr, candidates, limit in passes:
            best_score, changed = _greedy_toggle_pass(
                train, best, best_score, attr, candidates, limit,
                df_1d, df_1h, entry_ohlc,
            )
            improved = improved or changed
    return sanitize_rules(best)
def try_buy_any_branches(
    matrix: pd.DataFrame,
    train_end: int,
    base: DiscoveredRules,
    pool: list[str],
    df_1d: pd.DataFrame,
    df_1h: pd.DataFrame,
    entry_ohlc: pd.DataFrame,
    max_branches: int = 8,
) -> DiscoveredRules:
    """Try adding single-trigger OR branches to the buy rule.

    Candidate triggers are the pool predicates ending in ``cross_up_lower``,
    ``hammer``, ``bb_zone_bottom`` or ``ichi_tk_cross_up``; only the first
    ``max_branches`` are tried.  A branch is kept when it strictly improves
    the training-slice objective.

    Each trial extends the *current* set of buy groups (the AND group from
    ``buy_all`` plus all existing ``buy_any`` groups), so accepting a second
    branch no longer discards the original AND group or earlier branches.
    As before, an accepted result stores every group in ``buy_any`` and leaves
    ``buy_all`` empty.

    Args:
        matrix: Full feature matrix; only the first ``train_end`` rows are used.
        train_end: Number of leading rows forming the training slice.
        base: Starting rules; not modified.
        pool: Candidate predicates.
        df_1d, df_1h, entry_ohlc: Passed through to the backtest.
        max_branches: Number of trigger candidates to try.

    Returns:
        The best rules found, sanitized.
    """
    train = matrix.iloc[:train_end]
    trigger_suffixes = (
        ":cross_up_lower",
        ":hammer",
        ":bb_zone_bottom",
        ":ichi_tk_cross_up",
    )
    triggers = [p for p in pool if p.endswith(trigger_suffixes)]
    best = DiscoveredRules(
        name=base.name,
        buy_all=list(base.buy_all),
        buy_any=[list(g) for g in base.buy_any],
        sell_all=list(base.sell_all),
        sell_stop=list(base.sell_stop),
    )
    _, _, best_score = _evaluate_train(train, best, df_1d, df_1h, entry_ohlc)

    for pred in triggers[:max_branches]:
        if pred in best.buy_all:
            continue
        # All buy groups currently in effect, AND group first.
        groups = [list(g) for g in best.buy_any if g]
        if best.buy_all:
            groups.insert(0, list(best.buy_all))
        if [pred] in groups:
            continue
        groups.append([pred])
        trial = DiscoveredRules(
            name="trial_or",
            buy_all=[],
            buy_any=groups,
            sell_all=list(best.sell_all),
            sell_stop=list(best.sell_stop),
        )
        _, _, score = _evaluate_train(train, trial, df_1d, df_1h, entry_ohlc)
        if score > best_score:
            best_score = score
            best = sanitize_rules(trial)
            best.name = "discovered_or"
    return sanitize_rules(best)
def random_search_refine(
    matrix: pd.DataFrame,
    train_end: int,
    pool: list[str],
    seed: DiscoveredRules,
    df_1d: pd.DataFrame,
    df_1h: pd.DataFrame,
    entry_ohlc: pd.DataFrame,
    iterations: int = 1200,
    max_buy: int = 6,
    max_sell: int = 5,
) -> DiscoveredRules:
    """Escape local optima with random single-step mutations.

    Each iteration copies the incumbent, applies one random mutation
    (add / drop / swap on ``buy_all``, add / drop on ``sell_all``), sanitizes
    it and keeps it when the training-slice objective strictly improves.
    The generator is seeded with 42, so runs are reproducible.

    Args:
        matrix: Full feature matrix; only the first ``train_end`` rows are used.
        train_end: Number of leading rows forming the training slice.
        pool: Candidate predicates.
        seed: Starting rules.  A copy is refined; the caller's object is not
            modified.
        df_1d, df_1h, entry_ohlc: Passed through to the backtest.
        iterations: Number of mutation attempts.
        max_buy: ``add_buy`` is skipped once ``buy_all`` has this many keys.
        max_sell: ``add_sell`` is skipped once ``sell_all`` has this many keys.

    Returns:
        The best rules found, sanitized and named ``"discovered_refined"``.
    """
    train = matrix.iloc[:train_end]
    # Work on a copy so renaming/sanitizing the result never touches ``seed``.
    best = DiscoveredRules(
        name=seed.name,
        buy_all=list(seed.buy_all),
        buy_any=[list(g) for g in seed.buy_any],
        sell_all=list(seed.sell_all),
        sell_stop=list(seed.sell_stop),
    )
    _, _, best_score = _evaluate_train(train, best, df_1d, df_1h, entry_ohlc)
    rng = random.Random(42)
    buy_pool = [
        p
        for p in pool
        if not is_blocked_buy_predicate(p) and not is_chase_buy_predicate(p)
    ]
    sell_pool = [p for p in pool if not is_weak_sell_predicate(p)]
    for _ in range(iterations):
        trial = DiscoveredRules(
            name="rand",
            buy_all=list(best.buy_all),
            buy_any=[list(g) for g in best.buy_any],
            sell_all=list(best.sell_all),
            sell_stop=list(best.sell_stop),
        )
        action = rng.choice(["add_buy", "drop_buy", "add_sell", "drop_sell", "swap_buy"])
        if action == "add_buy" and len(trial.buy_all) < max_buy and buy_pool:
            p = rng.choice(buy_pool)
            if p not in trial.buy_all:
                trial.buy_all.append(p)
        elif action == "drop_buy" and trial.buy_all:
            trial.buy_all.pop(rng.randrange(len(trial.buy_all)))
        elif action == "add_sell" and len(trial.sell_all) < max_sell and sell_pool:
            p = rng.choice(sell_pool)
            if p not in trial.sell_all:
                trial.sell_all.append(p)
        elif action == "drop_sell" and trial.sell_all:
            trial.sell_all.pop(rng.randrange(len(trial.sell_all)))
        elif action == "swap_buy" and buy_pool:
            if trial.buy_all:
                # Keep this as one statement: the right-hand side is evaluated
                # before the subscript, which fixes the order of RNG draws.
                trial.buy_all[rng.randrange(len(trial.buy_all))] = rng.choice(buy_pool)
        trial = sanitize_rules(trial)
        if (
            trial.buy_all == best.buy_all
            and trial.buy_any == best.buy_any
            and trial.sell_all == best.sell_all
            and trial.sell_stop == best.sell_stop
        ):
            # The mutation was a no-op; an identical rule set cannot score
            # strictly higher, so skip the backtest.
            continue
        _, _, score = _evaluate_train(train, trial, df_1d, df_1h, entry_ohlc)
        if score > best_score:
            best_score = score
            best = trial
    best.name = "discovered_refined"
    return sanitize_rules(best)
def discover_rules(frames: dict[int, pd.DataFrame]) -> DiscoveredRules:
    """Run the full rule-discovery pipeline and return the final rule set.

    Stages: build the feature matrix, evaluate the baseline seed, greedy AND
    expansion, buy-side OR branches, random refinement, then train / test /
    full-period backtests whose results are stored on the returned rules.
    Progress is reported with ``print``.

    Args:
        frames: Candle DataFrames keyed by interval; must contain
            ``ENTRY_INTERVAL``.

    Returns:
        The refined rules, renamed ``"discovered_best"``, with the return
        percentages and the full-period trade count filled in.
    """
    print("특징 행렬 생성 (모든 봉·캔들 위치/높이)...")
    from config import ENTRY_INTERVAL, TREND_INTERVAL_1D, TREND_INTERVAL_1H
    entry_raw = frames[ENTRY_INTERVAL]
    # Fall back to the entry-interval candles when a trend frame is missing
    # or empty.
    df_1d = frames.get(TREND_INTERVAL_1D)
    if df_1d is None or df_1d.empty:
        df_1d = entry_raw
    df_1h = frames.get(TREND_INTERVAL_1H)
    if df_1h is None or df_1h.empty:
        df_1h = entry_raw
    matrix = build_master_feature_matrix(frames)
    # Drop the first 21 rows — presumably indicator warm-up; TODO confirm.
    matrix = matrix.iloc[21:].copy()
    entry_ohlc = entry_raw.iloc[21:].loc[matrix.index]
    n = len(matrix)
    # The first 70% of the bars form the training slice, the rest the test slice.
    train_end = int(n * 0.7)
    intervals = sorted(frames.keys())
    pool = generate_predicate_pool(intervals)
    print(f" 샘플 {n}봉 | 학습 {train_end} | predicate 후보 {len(pool)}")
    baseline = _discovery_seed()
    br, bt = backtest_rules(
        matrix.iloc[:train_end], baseline, df_1d, df_1h, entry_ohlc
    )
    print(f" 시드 규칙: {baseline.name} (하단돌파 매수·상단돌파 매도)")
    bf, _ = backtest_rules(matrix, baseline, df_1d, df_1h, entry_ohlc)
    print(f" 기준선: 학습 {br:+.2f}% | 전체 {bf:+.2f}%")
    # Stage 1: greedy AND expansion.
    print("1단계: 탐욕적 AND 확장...")
    g1 = greedy_search(matrix, train_end, pool, baseline, df_1d, df_1h, entry_ohlc)
    r1, _ = backtest_rules(matrix.iloc[:train_end], g1, df_1d, df_1h, entry_ohlc)
    print(f" 학습 {r1:+.2f}% | buy={g1.buy_all} sell={g1.sell_all}")
    # Stage 2: buy-side OR branches using triggers from other intervals.
    print("2단계: 매수 OR 분기(다른 봉 트리거)...")
    g2 = try_buy_any_branches(matrix, train_end, g1, pool, df_1d, df_1h, entry_ohlc)
    r2, _ = backtest_rules(matrix.iloc[:train_end], g2, df_1d, df_1h, entry_ohlc)
    print(f" 학습 {r2:+.2f}%")
    # Stage 3: random refinement of whichever stage had the higher train return.
    print("3단계: 무작위 정밀 탐색...")
    # NOTE(review): the choice compares raw returns, while the searches
    # themselves optimize discover_score — confirm this is intended.
    best = g2 if r2 >= r1 else g1
    g3 = random_search_refine(matrix, train_end, pool, best, df_1d, df_1h, entry_ohlc, iterations=1200)
    g3 = sanitize_rules(g3)
    # Final evaluation on the training slice, the held-out slice and the
    # whole period.
    train_ret, t_cnt = backtest_rules(
        matrix.iloc[:train_end], g3, df_1d, df_1h, entry_ohlc
    )
    test_ret, _ = backtest_rules(
        matrix.iloc[train_end:], g3, df_1d, df_1h, entry_ohlc
    )
    full_ret, full_cnt = backtest_rules(matrix, g3, df_1d, df_1h, entry_ohlc)
    g3.train_return_pct = train_ret
    g3.test_return_pct = test_ret
    g3.full_return_pct = full_ret
    g3.trade_count = full_cnt
    g3.name = "discovered_best"
    print(f"\n최종 규칙 ({g3.name})")
    print(f" 매수 AND: {g3.buy_all}")
    if g3.buy_any:
        print(f" 매수 OR: {g3.buy_any}")
    print(f" 매도 AND: {g3.sell_all}")
    if g3.sell_stop:
        print(f" 손절: {g3.sell_stop}")
    print(
        f" 학습 {train_ret:+.2f}% | 검증 {test_ret:+.2f}% | 전체 {full_ret:+.2f}% ({full_cnt}건)"
    )
    return g3
def save_rules(rules: DiscoveredRules, path: Path = RULES_FILE) -> None:
    """Write ``rules`` to ``path`` as indented UTF-8 JSON (non-ASCII kept as is)."""
    payload = json.dumps(asdict(rules), ensure_ascii=False, indent=2)
    path.write_text(payload, encoding="utf-8")
def rules_have_buy(rules: DiscoveredRules) -> bool:
    """Tell whether the rules contain at least one buy condition.

    True when ``buy_all`` is non-empty or any ``buy_any`` group is non-empty.
    """
    return bool(rules.buy_all) or any(bool(group) for group in rules.buy_any)
def load_rules(path: Path = RULES_FILE) -> DiscoveredRules | None:
    """Load a rule set from ``path``.

    Only keys matching ``DiscoveredRules`` fields are used; other keys in the
    file are ignored.  Returns ``None`` when the file does not exist or the
    loaded rules contain no buy condition.
    """
    if not path.exists():
        return None
    data = json.loads(path.read_text(encoding="utf-8"))
    known_fields = asdict(DiscoveredRules())
    kwargs = {name: data[name] for name in known_fields if name in data}
    rules = DiscoveredRules(**kwargs)
    return rules if rules_have_buy(rules) else None
def load_frames(monitor) -> dict[int, pd.DataFrame]:
    """Load the candle frames for ``SYMBOL`` via ``mtf_bb.load_frames_from_db``.

    ``monitor`` is passed through unchanged to the loader.
    """
    # Imported here rather than at module level, as in the original.
    from mtf_bb import load_frames_from_db

    frames = load_frames_from_db(monitor, SYMBOL)
    return frames