from __future__ import annotations import json import os import time from dataclasses import asdict from datetime import datetime from typing import Dict, List, Optional, Tuple import pandas as pd from config import ( COIN_TELEGRAM_CHAT_ID, COIN_TELEGRAM_BOT_TOKEN, KR_COINS, ) from monitor_min import Monitor from simulation_1min import ( DEFAULT_DB_PATH, DEFAULT_ENTRY_COMBOS, EntrySignalEngine, IndicatorBuilder, MinuteDataLoader, StrategyOptimizer, StrategyParams, generate_entry_combos, get_tick_size, load_strategy_mode, MIN_ORDER_KRW, INITIAL_CAPITAL, MAX_POSITION_KRW, ) class LiveOneMinuteStrategy(Monitor): """공통 1분봉 실전 매매 모니터 베이스 클래스.""" def __init__( self, coins: Dict[str, str], cooldown_file: str, position_file: str, entry_max_len: int = 3, grid_limit: int = 4, random_trials: int = 4, sleep_seconds: float = 0.6, max_order_krw: int = 200_000, ) -> None: super().__init__(cooldown_file) self.coins = coins self.position_file = position_file self.grid_limit = grid_limit self.random_trials = random_trials self.sleep_seconds = sleep_seconds self.max_order_krw = max_order_krw self.loader = MinuteDataLoader(DEFAULT_DB_PATH) self.indicator_builder = IndicatorBuilder() self.use_full = load_strategy_mode() self.entry_combos = ( generate_entry_combos(entry_max_len) if self.use_full else list(DEFAULT_ENTRY_COMBOS) ) self.positions = self._load_positions() # ------------------------------------------------------------------ # # Position persistence # ------------------------------------------------------------------ # def _load_positions(self) -> Dict[str, dict]: if not os.path.exists(self.position_file): return {} try: with open(self.position_file, "r", encoding="utf-8") as f: raw = json.load(f) except Exception: return {} return raw if isinstance(raw, dict) else {} def _save_positions(self) -> None: os.makedirs(os.path.dirname(self.position_file), exist_ok=True) with open(self.position_file, "w", encoding="utf-8") as f: json.dump(self.positions, f, ensure_ascii=False, indent=2) # ------------------------------------------------------------------ # # Data / Optimization helpers # ------------------------------------------------------------------ # def _fetch_enriched_data(self, symbol: str) -> Optional[pd.DataFrame]: """ 모니터링 전용 1분봉 데이터 로딩. 1) 실시간 + 저장 데이터를 혼합하는 monitor_min.get_coin_some_data를 우선 사용 2) 실패 시 DB MinuteDataLoader → API fallback 순으로 보완 """ def normalize(df: Optional[pd.DataFrame]) -> Optional[pd.DataFrame]: if df is None or df.empty: return None normalized = df.copy() if "datetime" not in normalized.columns: normalized["datetime"] = normalized.index normalized = normalized.reset_index(drop=True) return normalized source_df = None # 1) monitor_min 방식 (API + DB 혼합) try: mix_df = self.get_coin_some_data(symbol, 1) source_df = normalize(mix_df) except Exception: source_df = None # 2) DB MinuteDataLoader if source_df is None: try: loader_df = self.loader.load(symbol, limit=9000) source_df = normalize(loader_df) except Exception: source_df = None # 3) API fallback (get_coin_more_data) if source_df is None: try: api_df = self.get_coin_more_data(symbol, 1, bong_count=4000).reset_index() api_df.rename(columns={"index": "datetime"}, inplace=True) source_df = api_df except Exception: return None if source_df is None or source_df.empty: return None try: enriched = self.indicator_builder.enrich(source_df) except Exception: return None if "datetime" not in enriched.columns and "datetime" in source_df.columns: enriched["datetime"] = source_df["datetime"] return enriched if len(enriched) > 500 else None def _optimize_strategy(self, symbol: str, data: pd.DataFrame) -> StrategyParams: optimizer = StrategyOptimizer(data, INITIAL_CAPITAL, entry_combos=self.entry_combos) best_candidates = [] try: grid = optimizer.grid_search(limit=self.grid_limit) if grid: best_candidates.extend(grid[:2]) except Exception: pass try: rand = optimizer.random_search(trials=self.random_trials) if rand: best_candidates.extend(rand[:2]) except Exception: pass if not best_candidates: # fallback to default params return StrategyParams() best_overall = max(best_candidates, key=lambda r: r.metrics.get("CAGR", float("-inf"))) return best_overall.params # ------------------------------------------------------------------ # # Live trading helpers # ------------------------------------------------------------------ # def _can_buy(self, symbol: str) -> bool: last_buy_dt = self.buy_cooldown.get(symbol, {}).get("buy", {}).get("datetime") if last_buy_dt and isinstance(last_buy_dt, datetime): diff = datetime.now() - last_buy_dt if diff.total_seconds() < 60: # 최소 1분 쿨다운 return False return True def _record_buy(self, symbol: str, signal: str) -> None: self.buy_cooldown.setdefault(symbol, {})["buy"] = { "datetime": datetime.now(), "signal": signal, } self._save_buy_cooldown() def _record_sell(self, symbol: str, signal: str) -> None: self.buy_cooldown.setdefault(symbol, {})["sell"] = { "datetime": datetime.now(), "signal": signal, } self._save_buy_cooldown() def _determine_buy_amount(self, params: StrategyParams, current_price: float) -> float: alloc = max(MIN_ORDER_KRW, INITIAL_CAPITAL * params.risk_pct) alloc = min(alloc, self.max_order_krw, MAX_POSITION_KRW) return alloc def _evaluate_entry(self, data: pd.DataFrame, params: StrategyParams) -> bool: if len(data) < params.ma_slow + 10: return False engine = EntrySignalEngine(params) entry_idx = len(data) - 2 return engine.evaluate(data, entry_idx) def _update_position_record( self, symbol: str, position: dict, field: str, value, ) -> None: position[field] = value self.positions[symbol] = position def _get_recent_timestamp(self, data: pd.DataFrame) -> str: ts = data["datetime"].iloc[-1] return ts.isoformat() if isinstance(ts, pd.Timestamp) else str(ts) def _evaluate_exit( self, symbol: str, data: pd.DataFrame, position: dict, ) -> Tuple[bool, float, str, dict]: params = StrategyParams(**position.get("params", {})) if position.get("params") else StrategyParams() engine = EntrySignalEngine(params) idx = len(data) - 1 low = data["Low"].iloc[idx] high = data["High"].iloc[idx] close = data["Close"].iloc[idx] macro_trend = data.get("macro_trend", pd.Series([0])).iloc[idx] entry_price = position["entry_price"] best_price = max(position.get("best_price", entry_price), high) atr = position.get("atr", data["atr14"].iloc[idx]) trailing_mult = position.get("trailing_mult", params.trailing_atr_mult) stop_price = entry_price * (1 - position.get("stop_loss_pct", params.stop_loss_pct)) take_price = entry_price * (1 + position.get("take_profit_pct", params.take_profit_pct)) trail_price = best_price - atr * trailing_mult time_stop_bars = position.get("time_stop_bars", params.time_stop_bars) vol_drop_threshold = position.get("vol_drop_exit_z", params.vol_drop_exit_z) sell_reason = "" exec_price = close executed = False if low <= stop_price: exec_price = stop_price - get_tick_size(stop_price) sell_reason = "stop_loss" executed = True elif high >= take_price: exec_price = take_price - get_tick_size(take_price) sell_reason = "take_profit" executed = True elif low <= trail_price: exec_price = trail_price - get_tick_size(trail_price) sell_reason = "trailing_stop" executed = True else: entry_time = position.get("entry_time") if entry_time: try: entry_dt = datetime.fromisoformat(entry_time) bars_held = max( 1, int((data["datetime"].iloc[idx] - entry_dt).total_seconds() / 60), ) except Exception: bars_held = time_stop_bars + 1 else: bars_held = time_stop_bars + 1 if bars_held >= time_stop_bars: sell_reason = "time_stop" executed = True elif ( vol_drop_threshold is not None and data.get("volume_z") is not None and data["volume_z"].iloc[idx] <= vol_drop_threshold ): sell_reason = "volume_drop" executed = True else: reverse_idx = max(0, len(data) - 2) if engine.evaluate(data, reverse_idx): sell_reason = "reverse_signal" executed = True if executed: sell_ratio = 0.5 if macro_trend >= 0 else 1.0 qty = position["qty"] * sell_ratio qty = max(qty, 1e-8) return True, qty, sell_reason, {"best_price": best_price} return False, 0.0, "", {"best_price": best_price} # ------------------------------------------------------------------ # # Main monitoring routine # ------------------------------------------------------------------ # def monitor_once(self) -> None: balances = {} try: for bal in self.getBalances(): balances[bal["currency"]] = float(bal.get("balance", 0)) except Exception: pass for symbol in self.coins.keys(): try: data = self._fetch_enriched_data(symbol) if data is None: print(f"{symbol}: 데이터 부족으로 스킵") continue params = self._optimize_strategy(symbol, data) # Exit handling first if symbol in self.positions: should_exit, qty, reason, updates = self._evaluate_exit(symbol, data, self.positions[symbol]) if should_exit and qty > 0: self._execute_sell(symbol, qty, data["Close"].iloc[-1], reason) if qty >= self.positions[symbol]["qty"]: self.positions.pop(symbol, None) else: self.positions[symbol]["qty"] -= qty self.positions[symbol].update(updates) self._record_sell(symbol, reason) self._save_positions() # Entry logic if symbol not in self.positions and self._can_buy(symbol): if self._evaluate_entry(data, params): entry_price = data["Close"].iloc[-1] + get_tick_size(data["Close"].iloc[-1]) buy_amount = self._determine_buy_amount(params, entry_price) qty = max(buy_amount / entry_price, 0) if qty <= 0: continue self._execute_buy(symbol, buy_amount, entry_price, params) position_payload = { "entry_price": entry_price, "qty": qty, "atr": data["atr14"].iloc[-1], "best_price": entry_price, "entry_time": self._get_recent_timestamp(data), "stop_loss_pct": params.stop_loss_pct, "take_profit_pct": params.take_profit_pct, "trailing_mult": params.trailing_atr_mult, "time_stop_bars": params.time_stop_bars, "vol_drop_exit_z": params.vol_drop_exit_z, "params": asdict(params), } self.positions[symbol] = position_payload self._record_buy(symbol, "+".join(params.entry_combo)) self._save_positions() print("{} {} ({})".format(symbol, data['datetime'].iloc[-1], len(data['datetime']))) except Exception as exc: print(f"[{symbol}] 오류: {exc}") finally: time.sleep(self.sleep_seconds) def _execute_buy(self, symbol: str, amount_krw: float, entry_price: float, params: StrategyParams) -> None: try: self.hts.buyCoinMarket(symbol, amount_krw) print( f"[BUY] {symbol} amount={amount_krw:,.0f}KRW price=₩{entry_price:,.2f} " f"strategy={'+'.join(params.entry_combo)}" ) msg = ( f"[KRW-COIN]\n" f"• 매수 {symbol} : {amount_krw:,.0f}원, 가격 ₩{entry_price:,.2f}\n" f"• 전략 { '+'.join(params.entry_combo) }" ) self.sendMsg(msg) except Exception as exc: print(f"{symbol} 매수 실패: {exc}") def _execute_sell(self, symbol: str, qty: float, current_price: float, reason: str) -> None: try: self.hts.sellCoinMarket(symbol, 0, qty) print(f"[SELL] {symbol} qty={qty:.6f} price=₩{current_price:,.2f} reason={reason}") msg = ( f"[KRW-COIN]\n" f"• 매도 {symbol} : 수량 {qty:.6f}, 가격 ₩{current_price:,.2f}\n" f"• 사유 {reason}" ) self.sendMsg(msg) except Exception as exc: print(f"{symbol} 매도 실패: {exc}") def run_schedule(self, interval_seconds: int = 15) -> None: while True: self.monitor_once() time.sleep(interval_seconds) def data_timestamp_to_iso(data: pd.DataFrame) -> str: pass