import pandas as pd
from HTS2 import HTS
from dateutil.relativedelta import relativedelta
from datetime import datetime
import sqlite3
import time

try:
    import telegram
except ImportError:
    telegram = None  # type: ignore
import requests
import json
import asyncio
from multiprocessing import Pool
import numpy as np
import os
from config import *
import strategy


class Monitor(HTS):
    """Monitor the WLD coin and execute spot trades.

    Builds on the project's ``HTS`` exchange client and adds:

    * persistence of the last buy/sell time and signal per symbol
      (used for trade cooldowns),
    * Telegram notifications,
    * technical-indicator calculation on OHLCV frames,
    * candle download / caching (Bithumb REST API + local SQLite).
    """

    last_signal = None
    cooldown_file = None

    # Rolling windows for which a moving average and a deviation column exist.
    _MA_WINDOWS = (5, 20, 40, 120, 200, 240, 720, 1440)
    # Number of messages concatenated into one Telegram payload.
    _TELEGRAM_BATCH_SIZE = 20
    # Upper bound (seconds) for a single candle HTTP request.
    _REQUEST_TIMEOUT_SEC = 10

    def __init__(self, cooldown_file='coins_buy_time.json') -> None:
        """Initialise the exchange client and load the trade record.

        Args:
            cooldown_file: Path of the JSON trade-record file. ``None``
                disables both cooldown checks and persistence.
        """
        HTS.__init__(self)
        # Most recent buy signal per symbol (the file itself is written in
        # the nested "new" format, see _save_buy_cooldown).
        self.last_signal: dict[str, str] = {}
        if cooldown_file is not None:
            self.cooldown_file = cooldown_file
            self.buy_cooldown = self._load_buy_cooldown()
        else:
            self.cooldown_file = None
            self.buy_cooldown = {}

    # ------------- Persistence -------------
    def _load_buy_cooldown(self) -> dict:
        """Load the trade-record file into a nested dict.

        Returns:
            ``{symbol: {'buy': {'datetime': dt, 'signal': s}, 'sell': {...}}}``.
            An empty dict when the file is missing, unreadable, or does not
            contain a JSON object.

        Side effects:
            Fills ``self.last_signal`` from every non-empty buy signal.
        """
        if not os.path.exists(self.cooldown_file):
            return {}
        try:
            with open(self.cooldown_file, 'r', encoding='utf-8') as f:
                raw = json.load(f)
            if not isinstance(raw, dict):
                # A list/scalar root would otherwise crash on .items() below.
                raise ValueError("root JSON value is not an object")
        except Exception as e:
            print(f"Error loading cooldown data: {e}")
            return {}

        record: dict[str, dict] = {}
        for symbol, value in raw.items():
            # New format: the value carries 'buy' and/or 'sell' entries.
            if isinstance(value, dict) and ('buy' in value or 'sell' in value):
                record[symbol] = {}
                for side in ('buy', 'sell'):
                    side_val = value.get(side)
                    if not isinstance(side_val, dict):
                        continue
                    dt_obj = None
                    dt_iso = side_val.get('datetime')
                    if dt_iso:
                        try:
                            dt_obj = datetime.fromisoformat(dt_iso)
                        except (TypeError, ValueError):
                            # Unparseable timestamp: keep the entry, no time.
                            dt_obj = None
                    record[symbol][side] = {
                        'datetime': dt_obj,
                        'signal': side_val.get('signal', ''),
                    }
            else:
                # Legacy format (buy only): either a bare ISO string or a
                # flat {'datetime': ..., 'signal': ...} dict.
                try:
                    dt_obj = None
                    sig = ''
                    if isinstance(value, str):
                        dt_obj = datetime.fromisoformat(value)
                    elif isinstance(value, dict):
                        dt_iso = value.get('datetime')
                        sig = value.get('signal', '')
                        if dt_iso:
                            dt_obj = datetime.fromisoformat(dt_iso)
                    record.setdefault(symbol, {})['buy'] = {'datetime': dt_obj, 'signal': sig}
                except (TypeError, ValueError):
                    # Legacy entries with a bad timestamp are skipped entirely.
                    continue

        # Populate last_signal from the buy side.
        for sym, sides in record.items():
            if 'buy' in sides and sides['buy'].get('signal'):
                self.last_signal[sym] = sides['buy']['signal']
        return record

    def _save_buy_cooldown(self) -> None:
        """Write ``self.buy_cooldown`` to ``self.cooldown_file`` as JSON.

        Datetimes are stored as ISO-8601 strings (empty string when absent).
        Best effort: any failure is printed, never raised.
        """
        try:
            data: dict[str, dict] = {}
            for symbol, sides in self.buy_cooldown.items():
                data[symbol] = {}
                for side in ('buy', 'sell'):
                    info = sides.get(side)
                    if not info:
                        continue
                    dt_obj = info.get('datetime')
                    data[symbol][side] = {
                        'datetime': dt_obj.isoformat() if isinstance(dt_obj, datetime) else '',
                        'signal': info.get('signal', ''),
                    }
            with open(self.cooldown_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
        except Exception as e:
            print(f"Error saving cooldown data: {e}")

    # ------------- Telegram -------------
    def _send_coin_msg(self, text: str) -> None:
        """Send ``text`` to the coin Telegram chat (prints instead when the
        ``telegram`` package is not installed)."""
        if telegram is None:
            print(f"[telegram skip] {text}")
            return
        coin_client = telegram.Bot(token=COIN_TELEGRAM_BOT_TOKEN)
        asyncio.run(coin_client.send_message(chat_id=COIN_TELEGRAM_CHAT_ID, text=text))

    def _dispatch_msg(self, text: str) -> None:
        """Run ``_send_coin_msg`` in a short-lived worker process.

        The pool is context-managed so its worker is torn down after the
        send; one worker is enough because exactly one message is mapped.
        """
        with Pool(1) as pool:
            pool.map(self._send_coin_msg, [text])

    def sendMsg(self, msg):
        """Send a single Telegram message; failures are printed, not raised."""
        try:
            self._dispatch_msg(msg)
        except Exception as e:
            print(f"Error sending Telegram message: {str(e)}")
        return

    def send_coin_telegram_message(self, message_list: list[str], header: str) -> None:
        """Send ``message_list`` in batches of 20 messages per payload.

        The first payload starts with ``header`` and a newline; messages are
        concatenated as-is (callers supply their own separators).
        """
        payload = header + "\n"
        for i, message in enumerate(message_list):
            payload += message
            # Parenthesised on purpose: `i + 1 % 20` is `i + (1 % 20)`.
            if (i + 1) % self._TELEGRAM_BATCH_SIZE == 0:
                self._dispatch_msg(payload)
                payload = ''
        # Flush the trailing partial batch, if any.
        if len(message_list) % self._TELEGRAM_BATCH_SIZE != 0:
            self._dispatch_msg(payload)

    # ------------- Indicators -------------
    def normalize_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``data`` with rolling min-max normalised columns.

        For each of Open/High/Low/Close/Volume a ``<col>_Norm`` column is
        added: ``(value - min20) / (max20 - min20)`` over a 20-row rolling
        window, or ``0.5`` where the window's range is exactly zero. Rows
        whose window is incomplete come out as NaN.
        """
        columns_to_normalize = ['Open', 'High', 'Low', 'Close', 'Volume']
        normalized_data = data.copy()
        for column in columns_to_normalize:
            min_val = data[column].rolling(window=20).min()
            max_val = data[column].rolling(window=20).max()
            denominator = max_val - min_val
            normalized_data[f'{column}_Norm'] = np.where(
                denominator != 0,
                (data[column] - min_val) / denominator,
                0.5,
            )
        return normalized_data

    def _add_indicators(self, frame: pd.DataFrame) -> pd.DataFrame:
        """Add MA / deviation / golden-cross / Bollinger columns to ``frame``.

        Mutates and returns ``frame``. Column creation order is: every
        ``MA<n>``, every ``Deviation<n>``, ``golden_cross``, then
        ``MA``/``STD``/``Upper``/``Lower``.
        """
        close = frame['Close']
        for window in self._MA_WINDOWS:
            frame[f'MA{window}'] = close.rolling(window=window).mean()
        for window in self._MA_WINDOWS:
            # Close as a percentage of its moving average.
            frame[f'Deviation{window}'] = (frame['Close'] / frame[f'MA{window}']) * 100
        # True on the row where MA5 crosses above MA20.
        frame['golden_cross'] = (frame['MA5'] > frame['MA20']) & (
            frame['MA5'].shift(1) <= frame['MA20'].shift(1)
        )
        # Bollinger bands: 20-row mean +/- 2 standard deviations.
        frame['MA'] = frame['Close'].rolling(window=20).mean()
        frame['STD'] = frame['Close'].rolling(window=20).std()
        frame['Upper'] = frame['MA'] + (2 * frame['STD'])
        frame['Lower'] = frame['MA'] - (2 * frame['STD'])
        return frame

    def inverse_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Mirror the price series vertically and recompute all indicators.

        Prices are reflected around the midpoint of the global min/max of
        the Open/High/Low/Close columns, so down-trends become up-trends and
        vice versa. Volume is left unchanged. ``data`` is not modified.
        """
        price_cols = ['Open', 'High', 'Low', 'Close']
        inv = data.copy()
        global_min = data[price_cols].min().min()
        global_max = data[price_cols].max().max()
        # Reflection about (max + min) / 2  =>  new = max + min - price.
        for col in price_cols:
            inv[col] = global_max + global_min - data[col]
        # Volume is kept as-is.
        inv['Volume'] = data['Volume']
        # Recompute the indicators on the mirrored prices.
        inv = self.normalize_data(inv)
        return self._add_indicators(inv)

    def calculate_technical_indicators(self, data: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``data`` with normalised and indicator columns."""
        # normalize_data returns a copy, so the caller's frame is untouched.
        return self._add_indicators(self.normalize_data(data))

    # ------------- Strategy (implemented in strategy.py) -------------
    def annotate_signals(self, symbol: str, data: pd.DataFrame, simulation: bool | None = None) -> pd.DataFrame:
        """Delegate to ``strategy.annotate_signals`` with the active config."""
        return strategy.annotate_signals(
            symbol, data, simulation=simulation, config=strategy.ACTIVE_CONFIG
        )

    def _is_in_cooldown(self, symbol: str, side: str) -> bool:
        """Return True when the last ``side`` trade of ``symbol`` is too recent.

        The limit is ``BUY_COOLDOWN_SEC`` for ``side == "buy"`` and
        ``SELL_COOLDOWN_SEC`` otherwise. Always False when persistence is
        disabled or no previous trade time is recorded.
        """
        if self.cooldown_file is None:
            return False
        last_dt = self.buy_cooldown.get(symbol, {}).get(side, {}).get("datetime")
        if not last_dt:
            return False
        limit = BUY_COOLDOWN_SEC if side == "buy" else SELL_COOLDOWN_SEC
        elapsed = (datetime.now() - last_dt).total_seconds()
        if elapsed < limit:
            print(f"{symbol}: {side} 쿨다운 중 (남은 시간: {limit - elapsed:.0f}초)")
            return True
        return False

    def _record_trade(self, symbol: str, side: str, signal: str) -> None:
        """Record a trade (time = now) and persist the trade-record file.

        No-op when persistence is disabled.
        """
        if self.cooldown_file is None:
            return
        current_time = datetime.now()
        self.last_signal[symbol] = signal
        self.buy_cooldown.setdefault(symbol, {})[side] = {
            "datetime": current_time,
            "signal": signal,
        }
        self._save_buy_cooldown()

    def execute_trade_signal(
        self,
        symbol: str,
        trade: strategy.TradeSignal,
        balances: dict | None = None,
    ) -> bool:
        """Execute one spot market buy or sell for a ``TradeSignal``.

        Args:
            symbol: Coin ticker.
            trade: Signal object; ``action``, ``signal``, ``close`` and
                ``trend`` are read.
            balances: Optional ``{currency: {"balance": ...}}`` mapping used
                to size sell orders.

        Returns:
            True when an order was submitted, False when skipped (cooldown,
            nothing to sell) or when any exception occurred.
        """
        try:
            coin_name = KR_COINS.get(symbol, symbol)
            signal_name = trade.signal
            close = trade.close

            if trade.action == "sell":
                if self._is_in_cooldown(symbol, "sell"):
                    return False
                available = 0.0
                if balances and symbol in balances:
                    available = float(balances[symbol].get("balance", 0))
                if available <= 0:
                    print(f"{symbol}: 매도 신호({signal_name}) — 보유 없음, 스킵")
                    return False
                sell_amount = available * strategy.get_sell_ratio(symbol, signal_name)
                if sell_amount <= 0:
                    return False
                self.sellCoinMarket(symbol, 0, sell_amount)
                self._record_trade(symbol, "sell", signal_name)
                print(f"{coin_name} ({symbol}) [매도 {signal_name}] ₩{close:.4f}, 수량 {sell_amount:.6f}")
                self.sendMsg(
                    f"[KRW-COIN]\n• 매도 {coin_name} ({symbol}): {signal_name} ₩{close:.4f}"
                )
                return True

            # Any action other than "sell" is treated as a buy.
            if self._is_in_cooldown(symbol, "buy"):
                return False
            buy_amount = strategy.get_buy_amount(
                symbol, signal_name, close, trend=trade.trend
            )
            if strategy.should_double_buy(symbol, signal_name, pd.DataFrame()):
                buy_amount *= 2
            executed = self.buyCoinMarket(symbol, buy_amount)
            self._record_trade(symbol, "buy", signal_name)
            print(
                f"{coin_name} ({symbol}) [매수 {signal_name}] ₩{close:.4f} "
                f"({buy_amount} KRW, 추세={trade.trend})"
            )
            self.sendMsg(
                self.format_message(
                    symbol, coin_name, close, signal_name, executed or buy_amount
                )
            )
            return True
        except Exception as e:
            print(f"Error trading {symbol}: {str(e)}")
            return False

    def process_wld_mtf(self, symbol: str, balances: dict | None = None) -> None:
        """Evaluate the multi-timeframe strategy for ``symbol`` and trade.

        WLD: trades on the combination of Bollinger-band and Ichimoku
        positions across all candle intervals (1-1440 minutes).

        * ``USE_DISCOVERED_LIVE`` true: discovered_rules.json + combination
          features (``strategy.evaluate_discovered_live``).
        * otherwise: mtf_bb_policy.json BB MTF policy (``strategy.evaluate``).

        Any exception is printed and swallowed so one symbol cannot stop
        the caller's loop.
        """
        from config import USE_DISCOVERED_LIVE
        from mtf_bb import load_frames_from_db, load_policy, print_latest_states
        from candle_features import describe_latest_position

        try:
            frames = load_frames_from_db(self, symbol)
            if not frames:
                print(f"Data for {symbol}: 로드된 봉 없음.")
                return

            # Fall back to the entry interval when a trend frame is missing.
            df_1d = frames.get(TREND_INTERVAL_1D)
            df_1h = frames.get(TREND_INTERVAL_1H)
            if df_1d is None or df_1d.empty:
                df_1d = frames.get(ENTRY_INTERVAL)
            if df_1h is None or df_1h.empty:
                df_1h = frames.get(ENTRY_INTERVAL)

            trend = strategy.get_trend(df_1d, df_1h)
            print(f"{symbol} 추세: {trend}")
            print("--- 봉별 BB·일목 위치 ---")
            for iv in sorted(frames.keys()):
                pos = describe_latest_position(frames[iv], iv)
                print(
                    f" {pos['label']:>6} | BB {pos['bb_zone']} {pos['bb_state']:>16} | "
                    f"일목 {pos['ichi_position']} TK={pos['ichi_tk']}"
                )

            if USE_DISCOVERED_LIVE:
                print("모드: 전봉 BB·일목 조합 (discovered_rules)")
                trade = strategy.evaluate_discovered_live(
                    symbol, frames, df_1d, df_1h, balances or {}
                )
            else:
                policy = load_policy() or strategy.ACTIVE_MTF_POLICY
                cfg = strategy.ACTIVE_CONFIG
                print_latest_states(frames, cfg)
                print(
                    f"MTF 정책: {policy.name} | 매수={policy.buy_interval}분 | "
                    f"매도={policy.sell_interval}분"
                )
                entry = frames.get(ENTRY_INTERVAL)
                trade = strategy.evaluate(
                    symbol,
                    entry if entry is not None else frames[policy.buy_interval],
                    df_1h,
                    df_1d,
                    config=cfg,
                    frames=frames,
                    policy=policy,
                )

            if trade is None:
                print("신호 없음")
                return
            self.execute_trade_signal(symbol, trade, balances=balances)
        except Exception as e:
            print(f"Error processing {symbol}: {str(e)}")

    def process_symbol(
        self,
        symbol: str,
        interval: int | None = None,
        balances: dict | None = None,
        use_inverse: bool = False,
    ) -> None:
        """Backward-compatible entry point: delegates to the MTF strategy.

        ``interval`` and ``use_inverse`` are accepted but ignored.
        """
        self.process_wld_mtf(symbol, balances=balances)

    def load_balances_dict(self) -> dict:
        """Convert ``getBalances()`` rows into a dict keyed by currency.

        Returns:
            ``{currency: {"balance": float, "avg_buy_price": float}}``.
        """
        return {
            row["currency"]: {
                "balance": float(row["balance"]),
                "avg_buy_price": float(row["avg_buy_price"]),
            }
            for row in self.getBalances()
        }

    # ------------- Formatting -------------
    def format_message(
        self, symbol: str, symbol_name: str, close: float, signal: str, buy_amount: float
    ) -> str:
        """Build the buy-notification text.

        Price and amount precision depends on the integer part of ``close``:
        >= 100 unformatted, >= 10 two decimals, >= 1 three decimals,
        otherwise four decimals. A non-empty ``signal`` is appended again
        at the end of the message.
        """
        message = f"[매수] {symbol_name} ({symbol}) [{signal}]: "
        if int(close) >= 100:
            message += f"₩{close}"
            message += f" (₩{buy_amount})"
        elif int(close) >= 10:
            message += f"₩{close:.2f}"
            message += f" (₩{buy_amount:.2f})"
        elif int(close) >= 1:
            message += f"₩{close:.3f}"
            message += f" (₩{buy_amount:.3f})"
        else:
            message += f"₩{close:.4f}"
            message += f" (₩{buy_amount:.4f})"
        if signal != '':
            message += f"[{signal}]"
        return message

    # ------------- Data fetch -------------
    def get_coin_data(self, symbol: str, interval: int = 60, to: str | None = None, retries: int = 3) -> pd.DataFrame | None:
        """Fetch up to 200 candles for ``KRW-<symbol>`` from the Bithumb API.

        Args:
            symbol: Coin ticker.
            interval: Candle size in minutes; 1440 selects the daily endpoint.
            to: Optional end timestamp passed through as the ``to`` query
                parameter.
            retries: Maximum number of attempts.

        Returns:
            A float frame indexed by candle time (oldest first) with columns
            Open/Close/High/Low/Volume plus a ``datetime`` column mirroring
            the index; ``None`` when the response has no candle rows or all
            attempts failed.
        """
        if interval == 1440:
            url = f"https://api.bithumb.com/v1/candles/days?market=KRW-{symbol}&count=200"
        else:
            url = f"https://api.bithumb.com/v1/candles/minutes/{interval}?market=KRW-{symbol}&count=200"
        if to is not None:
            url += f"&to={to}"
        headers = {"accept": "application/json"}

        for attempt in range(retries):
            try:
                # Without a timeout a stalled connection would block forever;
                # a timeout raises and is retried like any other failure.
                response = requests.get(
                    url, headers=headers, timeout=self._REQUEST_TIMEOUT_SEC
                )
                json_data = json.loads(response.text)
                df_temp = pd.DataFrame(json_data)
                # Reverse the response order so the frame ends up oldest-first
                # (assumes the API returns newest-first - TODO confirm).
                df_temp = df_temp.sort_index(ascending=False)
                if 'candle_date_time_kst' not in df_temp:
                    # Not a candle payload (e.g. an error body): give up.
                    return None
                data = pd.DataFrame()
                data['datetime'] = pd.to_datetime(df_temp['candle_date_time_kst'], format='%Y-%m-%dT%H:%M:%S')
                data['Open'] = df_temp['opening_price']
                data['Close'] = df_temp['trade_price']
                data['High'] = df_temp['high_price']
                data['Low'] = df_temp['low_price']
                data['Volume'] = df_temp['candle_acc_trade_volume']
                data = data.set_index('datetime')
                data = data.astype(float)
                data["datetime"] = data.index
                if not data.empty:
                    return data
                print(f"No data received for {symbol}, attempt {attempt + 1}")
                time.sleep(0.5)
            except Exception as e:
                print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
                if attempt < retries - 1:
                    time.sleep(5)
                continue
        return None

    def get_coin_more_data(
        self,
        symbol: str,
        interval: int,
        bong_count: int = 3000,
        verbose: bool = False,
    ) -> pd.DataFrame:
        """Page backwards through the Bithumb API to collect historic candles.

        Requests 200-candle pages, stepping the ``to`` cursor back by
        ``interval * 200`` minutes each time, until ``bong_count`` rows are
        collected or a request yields nothing new.

        Args:
            symbol: Coin ticker.
            interval: Candle size in minutes.
            bong_count: Target number of candles.
            verbose: When True, print collection progress.

        Returns:
            Candles sorted by time with one row per timestamp (empty frame
            when nothing could be fetched).
        """
        to = datetime.now()
        data: pd.DataFrame | None = None
        step = 0
        while data is None or len(data) < bong_count:
            step += 1
            previous_count = 0 if data is None else len(data)
            chunk = self.get_coin_data(symbol, interval, to.strftime("%Y-%m-%d %H:%M:%S"))
            if chunk is not None and not chunk.empty:
                data = chunk if data is None else pd.concat([data, chunk], ignore_index=True)
            # Stop as soon as a page adds nothing - this includes a failed
            # first page, which would otherwise loop forever.
            if data is None or len(data) == previous_count:
                if verbose:
                    print(f" API 추가 데이터 없음 (수집 {previous_count}봉)")
                break
            if verbose and (step == 1 or step % 5 == 0 or len(data) >= bong_count):
                label = "일봉" if interval >= 1440 else f"{interval}분"
                print(f" [{label}] 요청 {step}회 — 누적 {len(data)}/{bong_count}봉")
            time.sleep(0.3)
            to = to - relativedelta(minutes=interval * 200)

        if data is None or data.empty:
            return pd.DataFrame()
        data = data.set_index("datetime")
        # De-duplicate on the timestamp, not on OHLCV values: overlapping
        # pages repeat timestamps, while distinct candles may legitimately
        # share identical prices/volume.
        data = data[~data.index.duplicated(keep="first")]
        data = data.sort_index()
        data["datetime"] = data.index
        return data

    def get_coin_saved_data(
        self, symbol: str, interval: int, data: pd.DataFrame, db_path: str = "coins.db"
    ) -> pd.DataFrame:
        """Merge fresh API candles into the SQLite cache and read it back.

        New rows of ``data`` are inserted newest-first into table
        ``<symbol>_<interval>`` until the first timestamp that is already
        stored; then the latest 7000 stored candles are returned in
        ascending time order. Pre-loading history with downloader.py makes
        long moving averages available.

        NOTE(review): rows already stored are never updated, so a candle
        inserted while still forming keeps its partial values in the DB.

        Args:
            symbol: Coin ticker (also the ``CODE`` column value).
            interval: Candle size in minutes (part of the table name).
            data: Frame with ``datetime`` and OHLCV columns.
            db_path: SQLite database file.

        Returns:
            Frame indexed by ``datetime`` (as stored text) with OHLCV columns
            and a ``datetime`` column; empty (columns only) when the table
            has no rows.
        """
        columns = ["Open", "Close", "High", "Low", "Volume", "datetime"]
        table_name = f"{symbol}_{interval}"
        coin_name = KR_COINS.get(symbol, symbol)

        conn = sqlite3.connect(db_path)
        try:
            cursor = conn.cursor()
            cursor.execute(
                f"CREATE TABLE IF NOT EXISTS {table_name} "
                "(CODE text, NAME text, ymdhms datetime, ymd text, hms text, "
                "Close REAL, Open REAL, High REAL, Low REAL, Volume REAL)"
            )
            cursor.execute(
                f"CREATE INDEX IF NOT EXISTS {table_name}_idx ON {table_name}(CODE, ymdhms)"
            )

            # Walk newest -> oldest (including the oldest row) and stop at
            # the first candle that is already cached.
            for pos in range(len(data) - 1, -1, -1):
                stamp = data["datetime"].iloc[pos]
                ymdhms = stamp.strftime("%Y-%m-%d %H:%M:%S")
                cursor.execute(
                    f"SELECT 1 FROM {table_name} WHERE CODE = ? AND ymdhms = ?",
                    (symbol, ymdhms),
                )
                if cursor.fetchone():
                    break
                cursor.execute(
                    f"INSERT INTO {table_name} "
                    "(CODE, NAME, ymdhms, ymd, hms, Close, Open, High, Low, Volume) "
                    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                    (
                        symbol,
                        coin_name,
                        ymdhms,
                        stamp.strftime("%Y%m%d"),
                        stamp.strftime("%H%M%S"),
                        data["Close"].iloc[pos],
                        data["Open"].iloc[pos],
                        data["High"].iloc[pos],
                        data["Low"].iloc[pos],
                        data["Volume"].iloc[pos],
                    ),
                )

            cursor.execute(
                f"SELECT Open, Close, High, Low, Volume, ymdhms AS datetime "
                f"FROM (SELECT Open, Close, High, Low, Volume, ymdhms "
                f"FROM {table_name} ORDER BY ymdhms DESC LIMIT 7000) "
                f"ORDER BY datetime"
            )
            result = cursor.fetchall()
            conn.commit()
            cursor.close()
        finally:
            # Always release the connection, even when a statement fails
            # (uncommitted inserts are discarded in that case).
            conn.close()

        if not result:
            return pd.DataFrame(columns=columns)
        df = pd.DataFrame(result, columns=columns)
        df = df.set_index("datetime")
        df = df.sort_index()
        df["datetime"] = df.index
        return df

    def get_coin_some_data(self, symbol: str, interval: int) -> pd.DataFrame:
        """Combine API candles, cached candles and the latest 1-minute candle.

        WLD quotes: latest API candles + historic candles from coins.db +
        the single most recent 1-minute candle (its Volume multiplied by
        60). When the DB is empty only the API and 1-minute data are used;
        historic loading is done by running downloader.py.

        Returns:
            Time-sorted frame with one row per timestamp; on a timestamp
            clash the API candle wins over the cached one, which in turn
            wins over the 1-minute candle. Empty frame when the API returns
            nothing for ``interval``.
        """
        data = self.get_coin_data(symbol, interval)
        if data is None or data.empty:
            return pd.DataFrame()

        data_1 = self.get_coin_data(symbol, interval=1)
        has_minute = data_1 is not None and not data_1.empty
        if has_minute:
            data_1 = data_1.copy()
            data_1.at[data_1.index[-1], "Volume"] = data_1["Volume"].iloc[-1] * 60

        saved_data = self.get_coin_saved_data(symbol, interval, data)

        # Order matters: it defines which source wins on duplicate timestamps.
        parts = [data]
        if saved_data is not None and not saved_data.empty:
            parts.append(saved_data)
        if has_minute:
            parts.append(data_1.iloc[[-1]])

        merged = pd.concat(parts, ignore_index=True)
        merged["datetime"] = pd.to_datetime(merged["datetime"], format="%Y-%m-%d %H:%M:%S")
        merged = merged.set_index("datetime")
        # De-duplicate by timestamp before sorting so "first" still means
        # "highest-priority source"; comparing OHLCV values instead would
        # keep two rows for one candle whenever the cached copy is stale.
        merged = merged[~merged.index.duplicated(keep="first")]
        merged = merged.sort_index()
        merged["datetime"] = merged.index
        return merged