# Source file: AssetMonitor/monitor_coin_1min_base.py
# Revision: c45ad151b6 ("init") by dsyoon, 2026-01-28 18:58:33 +09:00
# Size: 394 lines, 15 KiB, Python

from __future__ import annotations
import json
import os
import time
from dataclasses import asdict
from datetime import datetime
from typing import Dict, List, Optional, Tuple
import pandas as pd
from config import (
COIN_TELEGRAM_CHAT_ID,
COIN_TELEGRAM_BOT_TOKEN,
KR_COINS,
)
from monitor_min import Monitor
from simulation_1min import (
DEFAULT_DB_PATH,
DEFAULT_ENTRY_COMBOS,
EntrySignalEngine,
IndicatorBuilder,
MinuteDataLoader,
StrategyOptimizer,
StrategyParams,
generate_entry_combos,
get_tick_size,
load_strategy_mode,
MIN_ORDER_KRW,
INITIAL_CAPITAL,
MAX_POSITION_KRW,
)
class LiveOneMinuteStrategy(Monitor):
    """Base class for live-trading monitors driven by 1-minute candles.

    Each monitoring pass, for every configured symbol:

    1. loads indicator-enriched 1-minute data,
    2. evaluates exit rules for an open position and sells if one fires,
    3. otherwise optimizes strategy parameters and evaluates the entry signal.

    Open positions are persisted as JSON in ``position_file``. Exchange access
    (``self.hts``), messaging (``self.sendMsg``), candle loading
    (``get_coin_some_data`` / ``get_coin_more_data``) and the buy-cooldown
    store (``self.buy_cooldown`` / ``_save_buy_cooldown``) are not defined
    here; presumably they are provided by ``Monitor`` -- confirm against
    ``monitor_min``.
    """

    def __init__(
        self,
        coins: Dict[str, str],
        cooldown_file: str,
        position_file: str,
        entry_max_len: int = 3,
        grid_limit: int = 4,
        random_trials: int = 4,
        sleep_seconds: float = 0.6,
        max_order_krw: int = 200_000,
    ) -> None:
        """Configure the monitor and load any persisted positions.

        Args:
            coins: Mapping whose keys are the symbols to monitor.
            cooldown_file: Passed through to ``Monitor.__init__``.
            position_file: Path of the JSON file holding open positions.
            entry_max_len: Maximum combo length given to
                ``generate_entry_combos`` when the full strategy mode is on.
            grid_limit: ``limit`` forwarded to ``StrategyOptimizer.grid_search``.
            random_trials: ``trials`` forwarded to
                ``StrategyOptimizer.random_search``.
            sleep_seconds: Pause after each symbol inside ``monitor_once``.
            max_order_krw: Upper bound for a single buy order amount.
        """
        super().__init__(cooldown_file)
        self.coins = coins
        self.position_file = position_file
        self.grid_limit = grid_limit
        self.random_trials = random_trials
        self.sleep_seconds = sleep_seconds
        self.max_order_krw = max_order_krw
        self.loader = MinuteDataLoader(DEFAULT_DB_PATH)
        self.indicator_builder = IndicatorBuilder()
        self.use_full = load_strategy_mode()
        self.entry_combos = (
            generate_entry_combos(entry_max_len) if self.use_full else list(DEFAULT_ENTRY_COMBOS)
        )
        self.positions = self._load_positions()

    # ------------------------------------------------------------------ #
    # Position persistence
    # ------------------------------------------------------------------ #
    def _load_positions(self) -> Dict[str, dict]:
        """Read the position file and return its content, or ``{}``.

        A missing file, an unreadable/invalid file, or a top-level JSON value
        that is not an object all yield an empty mapping. Load failures are
        printed so that a corrupt file is not discarded silently.
        """
        if not os.path.exists(self.position_file):
            return {}
        try:
            with open(self.position_file, "r", encoding="utf-8") as f:
                raw = json.load(f)
        except Exception as exc:
            # Best effort: start without positions, but make the problem visible.
            print(f"포지션 파일 로드 실패 ({self.position_file}): {exc}")
            return {}
        return raw if isinstance(raw, dict) else {}

    def _save_positions(self) -> None:
        """Persist ``self.positions`` to ``self.position_file`` as JSON.

        The data is written to a sibling ``.tmp`` file first and then moved
        into place with ``os.replace`` so that a failed dump cannot leave a
        truncated position file behind.
        """
        directory = os.path.dirname(self.position_file)
        # dirname is "" for a bare file name; os.makedirs("") would raise.
        if directory:
            os.makedirs(directory, exist_ok=True)
        tmp_path = f"{self.position_file}.tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(self.positions, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, self.position_file)

    # ------------------------------------------------------------------ #
    # Data / Optimization helpers
    # ------------------------------------------------------------------ #
    def _fetch_enriched_data(self, symbol: str) -> Optional[pd.DataFrame]:
        """Load 1-minute candles for ``symbol`` and add indicators.

        Sources are tried in order until one yields a non-empty frame:

        1. ``self.get_coin_some_data(symbol, 1)`` (per the original note, this
           mixes realtime and stored data),
        2. ``self.loader.load(symbol, limit=9000)`` (DB loader),
        3. ``self.get_coin_more_data(symbol, 1, bong_count=4000)`` (API).

        Returns:
            The enriched frame, or ``None`` when every source fails, when
            enrichment raises, or when 500 rows or fewer remain.
        """

        def normalize(df: Optional[pd.DataFrame]) -> Optional[pd.DataFrame]:
            # Return a copy with a "datetime" column and a fresh RangeIndex.
            if df is None or df.empty:
                return None
            normalized = df.copy()
            if "datetime" not in normalized.columns:
                normalized["datetime"] = normalized.index
            return normalized.reset_index(drop=True)

        source_df = None

        # 1) monitor_min style loader (API + DB mix)
        try:
            source_df = normalize(self.get_coin_some_data(symbol, 1))
        except Exception:
            source_df = None

        # 2) DB MinuteDataLoader
        if source_df is None:
            try:
                source_df = normalize(self.loader.load(symbol, limit=9000))
            except Exception:
                source_df = None

        # 3) API fallback (get_coin_more_data)
        if source_df is None:
            try:
                api_df = self.get_coin_more_data(symbol, 1, bong_count=4000).reset_index()
                api_df.rename(columns={"index": "datetime"}, inplace=True)
                source_df = api_df
            except Exception:
                return None

        if source_df is None or source_df.empty:
            return None

        try:
            enriched = self.indicator_builder.enrich(source_df)
        except Exception:
            return None

        if "datetime" not in enriched.columns and "datetime" in source_df.columns:
            enriched["datetime"] = source_df["datetime"]
        return enriched if len(enriched) > 500 else None

    def _optimize_strategy(self, symbol: str, data: pd.DataFrame) -> StrategyParams:
        """Pick strategy parameters for ``data`` via grid and random search.

        Up to two leading results of each search are collected; the candidate
        with the highest ``metrics["CAGR"]`` wins. A search that raises is
        skipped. When no candidate is available, default ``StrategyParams()``
        are returned. ``symbol`` is currently unused.
        """
        optimizer = StrategyOptimizer(data, INITIAL_CAPITAL, entry_combos=self.entry_combos)
        searches = (
            lambda: optimizer.grid_search(limit=self.grid_limit),
            lambda: optimizer.random_search(trials=self.random_trials),
        )
        best_candidates = []
        for run_search in searches:
            try:
                results = run_search()
                if results:
                    best_candidates.extend(results[:2])
            except Exception:
                # Best effort: a failing search must not block the other one.
                continue

        if not best_candidates:
            # Fall back to default params.
            return StrategyParams()
        best_overall = max(best_candidates, key=lambda r: r.metrics.get("CAGR", float("-inf")))
        return best_overall.params

    # ------------------------------------------------------------------ #
    # Live trading helpers
    # ------------------------------------------------------------------ #
    def _can_buy(self, symbol: str) -> bool:
        """Return ``False`` while ``symbol`` was bought less than 60 s ago."""
        last_buy_dt = self.buy_cooldown.get(symbol, {}).get("buy", {}).get("datetime")
        if isinstance(last_buy_dt, str):
            # NOTE(review): the cooldown store may round-trip through a file as
            # text; accept ISO strings so the cooldown is not silently skipped.
            try:
                last_buy_dt = datetime.fromisoformat(last_buy_dt)
            except ValueError:
                last_buy_dt = None
        if isinstance(last_buy_dt, datetime):
            # Compare in the timestamp's own timezone to avoid naive/aware mixing.
            now = datetime.now(last_buy_dt.tzinfo) if last_buy_dt.tzinfo else datetime.now()
            if (now - last_buy_dt).total_seconds() < 60:  # minimum 1-minute cooldown
                return False
        return True

    def _record_buy(self, symbol: str, signal: str) -> None:
        """Store the current time and ``signal`` as the last buy of ``symbol``."""
        self.buy_cooldown.setdefault(symbol, {})["buy"] = {
            "datetime": datetime.now(),
            "signal": signal,
        }
        self._save_buy_cooldown()

    def _record_sell(self, symbol: str, signal: str) -> None:
        """Store the current time and ``signal`` as the last sell of ``symbol``."""
        self.buy_cooldown.setdefault(symbol, {})["sell"] = {
            "datetime": datetime.now(),
            "signal": signal,
        }
        self._save_buy_cooldown()

    def _determine_buy_amount(self, params: StrategyParams, current_price: float) -> float:
        """Return the order amount for a new entry.

        The amount is ``INITIAL_CAPITAL * params.risk_pct`` raised to at least
        ``MIN_ORDER_KRW`` and then capped by ``self.max_order_krw`` and
        ``MAX_POSITION_KRW``. ``current_price`` is currently unused.
        """
        alloc = max(MIN_ORDER_KRW, INITIAL_CAPITAL * params.risk_pct)
        return min(alloc, self.max_order_krw, MAX_POSITION_KRW)

    def _evaluate_entry(self, data: pd.DataFrame, params: StrategyParams) -> bool:
        """Evaluate the entry signal on the second-to-last row of ``data``.

        Returns ``False`` without evaluating when fewer than
        ``params.ma_slow + 10`` rows are available.
        """
        if len(data) < params.ma_slow + 10:
            return False
        engine = EntrySignalEngine(params)
        return engine.evaluate(data, len(data) - 2)

    def _update_position_record(
        self,
        symbol: str,
        position: dict,
        field: str,
        value,
    ) -> None:
        """Set ``position[field] = value`` and register it under ``symbol``."""
        position[field] = value
        self.positions[symbol] = position

    def _get_recent_timestamp(self, data: pd.DataFrame) -> str:
        """Return the last ``datetime`` value of ``data`` as a string.

        ``pd.Timestamp`` values are rendered with ``isoformat()``; anything
        else goes through ``str()``.
        """
        ts = data["datetime"].iloc[-1]
        return ts.isoformat() if isinstance(ts, pd.Timestamp) else str(ts)

    def _evaluate_exit(
        self,
        symbol: str,
        data: pd.DataFrame,
        position: dict,
    ) -> Tuple[bool, float, str, dict]:
        """Decide whether (and how much of) an open position should be sold.

        Rules are checked on the last row of ``data`` in this order; the first
        match wins: stop loss, take profit, ATR trailing stop, time stop,
        volume drop, reverse signal. Per-position overrides stored in
        ``position`` take precedence over the strategy parameters.

        Args:
            symbol: Symbol of the position (currently unused).
            data: Enriched candle frame; reads ``Low``, ``High``, ``Close``,
                ``datetime`` and, when needed/available, ``atr14``,
                ``macro_trend`` and ``volume_z``.
            position: Persisted position record; ``entry_price`` and ``qty``
                are required.

        Returns:
            ``(should_exit, qty, reason, updates)``. ``updates`` always holds
            the ratcheted ``best_price`` so the caller can persist it. When an
            exit fires, half the position is sold if ``macro_trend >= 0`` and
            all of it otherwise; the whole position is sold instead if the
            remainder would be worth less than ``MIN_ORDER_KRW``.
        """
        params = StrategyParams(**position.get("params", {})) if position.get("params") else StrategyParams()
        engine = EntrySignalEngine(params)
        idx = len(data) - 1
        low = data["Low"].iloc[idx]
        high = data["High"].iloc[idx]
        close = data["Close"].iloc[idx]
        # A missing column means "no macro information": treat it as neutral (0).
        macro_trend = data["macro_trend"].iloc[idx] if "macro_trend" in data.columns else 0

        entry_price = position["entry_price"]
        # float() keeps numpy scalars out of the JSON-persisted record.
        best_price = float(max(position.get("best_price", entry_price), high))
        updates = {"best_price": best_price}
        # Only touch the atr14 column when the position carries no ATR itself.
        atr = position["atr"] if "atr" in position else data["atr14"].iloc[idx]
        trailing_mult = position.get("trailing_mult", params.trailing_atr_mult)
        stop_price = entry_price * (1 - position.get("stop_loss_pct", params.stop_loss_pct))
        take_price = entry_price * (1 + position.get("take_profit_pct", params.take_profit_pct))
        trail_price = best_price - atr * trailing_mult
        time_stop_bars = position.get("time_stop_bars", params.time_stop_bars)
        vol_drop_threshold = position.get("vol_drop_exit_z", params.vol_drop_exit_z)

        sell_reason = ""
        if low <= stop_price:
            sell_reason = "stop_loss"
        elif high >= take_price:
            sell_reason = "take_profit"
        elif low <= trail_price:
            sell_reason = "trailing_stop"
        else:
            # An unknown or unparsable entry time counts as "held too long".
            bars_held = time_stop_bars + 1
            entry_time = position.get("entry_time")
            if entry_time:
                try:
                    entry_dt = datetime.fromisoformat(entry_time)
                    elapsed = (data["datetime"].iloc[idx] - entry_dt).total_seconds()
                    bars_held = max(1, int(elapsed / 60))
                except Exception:
                    bars_held = time_stop_bars + 1

            if bars_held >= time_stop_bars:
                sell_reason = "time_stop"
            elif (
                vol_drop_threshold is not None
                and "volume_z" in data.columns
                and data["volume_z"].iloc[idx] <= vol_drop_threshold
            ):
                sell_reason = "volume_drop"
            else:
                # NOTE(review): this reuses the *entry* engine, so an exit labelled
                # "reverse_signal" fires when the entry signal is true on the
                # previous bar. Confirm this is the intended reversal rule.
                reverse_idx = max(0, len(data) - 2)
                if engine.evaluate(data, reverse_idx):
                    sell_reason = "reverse_signal"

        if not sell_reason:
            return False, 0.0, "", updates

        held_qty = position["qty"]
        sell_ratio = 0.5 if macro_trend >= 0 else 1.0
        qty = max(held_qty * sell_ratio, 1e-8)
        # Do not leave a remainder too small to ever be sold: close it out now.
        if (held_qty - qty) * close < MIN_ORDER_KRW:
            qty = held_qty
        return True, qty, sell_reason, updates

    # ------------------------------------------------------------------ #
    # Main monitoring routine
    # ------------------------------------------------------------------ #
    def monitor_once(self) -> None:
        """Run one monitoring pass over all configured symbols.

        For each symbol the exit logic runs before the entry logic. Any
        exception is reported and confined to that symbol, and the loop always
        sleeps ``self.sleep_seconds`` before moving on.
        """
        for symbol in self.coins:
            try:
                data = self._fetch_enriched_data(symbol)
                if data is None:
                    print(f"{symbol}: 데이터 부족으로 스킵")
                    continue
                # Exit handling first
                self._handle_exit(symbol, data)
                # Entry logic
                self._handle_entry(symbol, data)
                print("{} {} ({})".format(symbol, data['datetime'].iloc[-1], len(data['datetime'])))
            except Exception as exc:
                print(f"[{symbol}] 오류: {exc}")
            finally:
                time.sleep(self.sleep_seconds)

    def _handle_exit(self, symbol: str, data: pd.DataFrame) -> None:
        """Apply exit rules to the open position of ``symbol``, if any.

        Position state is only changed after the sell order went through, and
        the ratcheted ``best_price`` is persisted even when nothing is sold so
        the trailing stop keeps tracking the high-water mark across passes.
        """
        position = self.positions.get(symbol)
        if position is None:
            return

        should_exit, qty, reason, updates = self._evaluate_exit(symbol, data, position)
        sold = False
        if should_exit and qty > 0:
            # ``is False`` keeps overrides that still return None working.
            if self._execute_sell(symbol, qty, data["Close"].iloc[-1], reason) is not False:
                sold = True
                if qty >= position["qty"]:
                    self.positions.pop(symbol, None)
                else:
                    position["qty"] -= qty
                self._record_sell(symbol, reason)

        changed = False
        if symbol in self.positions:
            changed = any(position.get(key) != value for key, value in updates.items())
            position.update(updates)
        if sold or changed:
            self._save_positions()

    def _handle_entry(self, symbol: str, data: pd.DataFrame) -> None:
        """Open a position in ``symbol`` when allowed and the signal fires.

        Parameter optimization only runs when an entry is actually possible
        (no open position and cooldown elapsed). The position is recorded only
        if the buy order did not fail.
        """
        if symbol in self.positions or not self._can_buy(symbol):
            return
        params = self._optimize_strategy(symbol, data)
        if not self._evaluate_entry(data, params):
            return

        last_close = data["Close"].iloc[-1]
        # Plain floats keep the persisted record JSON-serializable.
        entry_price = float(last_close + get_tick_size(last_close))
        buy_amount = self._determine_buy_amount(params, entry_price)
        qty = float(max(buy_amount / entry_price, 0))
        if qty <= 0:
            return
        # ``is False`` keeps overrides that still return None working.
        if self._execute_buy(symbol, buy_amount, entry_price, params) is False:
            return

        # NOTE(review): qty is an estimate from the last close plus one tick;
        # the real fill of the market order may differ.
        self.positions[symbol] = {
            "entry_price": entry_price,
            "qty": qty,
            "atr": float(data["atr14"].iloc[-1]),
            "best_price": entry_price,
            "entry_time": self._get_recent_timestamp(data),
            "stop_loss_pct": params.stop_loss_pct,
            "take_profit_pct": params.take_profit_pct,
            "trailing_mult": params.trailing_atr_mult,
            "time_stop_bars": params.time_stop_bars,
            "vol_drop_exit_z": params.vol_drop_exit_z,
            "params": asdict(params),
        }
        self._record_buy(symbol, "+".join(params.entry_combo))
        self._save_positions()

    def _execute_buy(self, symbol: str, amount_krw: float, entry_price: float, params: StrategyParams) -> bool:
        """Place a market buy for ``amount_krw`` and announce it.

        Returns:
            ``True`` when ``self.hts.buyCoinMarket`` returned without raising,
            ``False`` when it raised. A failing notification is reported but
            does not turn a placed order into a failure.
        """
        # NOTE(review): only exceptions are treated as failure; whether the
        # exchange wrapper can signal rejection via its return value is not
        # visible here.
        try:
            self.hts.buyCoinMarket(symbol, amount_krw)
        except Exception as exc:
            print(f"{symbol} 매수 실패: {exc}")
            return False

        print(
            f"[BUY] {symbol} amount={amount_krw:,.0f}KRW price=₩{entry_price:,.2f} "
            f"strategy={'+'.join(params.entry_combo)}"
        )
        msg = (
            f"[KRW-COIN]\n"
            f"• 매수 {symbol} : {amount_krw:,.0f}원, 가격 ₩{entry_price:,.2f}\n"
            f"• 전략 { '+'.join(params.entry_combo) }"
        )
        try:
            self.sendMsg(msg)
        except Exception as exc:
            print(f"{symbol} 매수 알림 실패: {exc}")
        return True

    def _execute_sell(self, symbol: str, qty: float, current_price: float, reason: str) -> bool:
        """Place a market sell for ``qty`` and announce it.

        Returns:
            ``True`` when ``self.hts.sellCoinMarket`` returned without raising,
            ``False`` when it raised. A failing notification is reported but
            does not turn a placed order into a failure.
        """
        try:
            self.hts.sellCoinMarket(symbol, 0, qty)
        except Exception as exc:
            print(f"{symbol} 매도 실패: {exc}")
            return False

        print(f"[SELL] {symbol} qty={qty:.6f} price=₩{current_price:,.2f} reason={reason}")
        msg = (
            f"[KRW-COIN]\n"
            f"• 매도 {symbol} : 수량 {qty:.6f}, 가격 ₩{current_price:,.2f}\n"
            f"• 사유 {reason}"
        )
        try:
            self.sendMsg(msg)
        except Exception as exc:
            print(f"{symbol} 매도 알림 실패: {exc}")
        return True

    def run_schedule(self, interval_seconds: int = 15) -> None:
        """Call ``monitor_once`` forever, sleeping ``interval_seconds`` between passes."""
        while True:
            self.monitor_once()
            time.sleep(interval_seconds)
def data_timestamp_to_iso(data: pd.DataFrame) -> str:
    """Return the timestamp of the last row of ``data`` as a string.

    The value is taken from the ``datetime`` column when present, otherwise
    from the index. ``pd.Timestamp`` values are rendered with ``isoformat()``;
    any other value goes through ``str()``.

    Raises:
        ValueError: If ``data`` has no rows.
    """
    if len(data) == 0:
        raise ValueError("data has no rows")
    ts = data["datetime"].iloc[-1] if "datetime" in data.columns else data.index[-1]
    return ts.isoformat() if isinstance(ts, pd.Timestamp) else str(ts)