import pandas as pd
from HTS2 import HTS
from dateutil.relativedelta import relativedelta
from datetime import datetime, timedelta
import sqlite3
import telegram
import time
import requests
import json
import asyncio
from multiprocessing import Pool
import schedule
from config import *
import FinanceDataReader as fdr
import numpy as np
import os


class Monitor:
    """Monitor assets (coins / stocks / ETFs) and execute buy orders.

    The class fetches candle data, derives moving-average based indicators,
    flags buy points, places market buys through ``HTS`` and reports them
    over Telegram.  A per-symbol cooldown (persisted as JSON) prevents
    repeated buys of the same symbol within a short period.
    """

    # Default path of the cooldown JSON file; ``None`` disables persistence.
    cooldown_file = None

    # Number of messages concatenated into one Telegram payload.
    _CHUNK_SIZE = 20
    # Seconds to wait for the candle HTTP API before treating the attempt as failed.
    _HTTP_TIMEOUT = 10
    # Column layout shared by the candle frames handled in this class.
    _OHLCV_COLUMNS = ['Open', 'Close', 'High', 'Low', 'Volume', 'datetime']
    # Rolling windows used for the MA<n> / Deviation<n> indicator columns.
    _MA_WINDOWS = (5, 20, 40, 120, 200, 240, 720, 1440)
    # Windows compared against MA720 by the 'movingaverage' signal.
    _SHORT_MA_WINDOWS = (5, 20, 40, 120, 200, 240)
    # Buy amount per signal; 'deviation1440' is symbol dependent (see _buy_amount_for).
    _SIGNAL_BUY_AMOUNTS = {
        'fall_5p': 100000,
        'movingaverage': 7000,
        'deviation40': 10000,
        'deviation240': 6000,
    }
    _DEFAULT_BUY_AMOUNT = 5100
    # Upper bound of the previous Deviation1440 value per symbol (default 80).
    _DEVIATION1440_LIMITS = {'TON': 89, 'XRP': 90, 'BONK': 76}

    def __init__(self, cooldown_file='coins_buy_time.json') -> None:
        """Create the trading client and load the persisted cooldown table.

        Args:
            cooldown_file: Path of the JSON file holding the last buy time per
                symbol.  ``None`` keeps the class-level default; when that is
                also ``None`` the cooldown table simply starts empty.
        """
        self.hts = HTS()
        if cooldown_file is not None:
            self.cooldown_file = cooldown_file
        # Always define the attribute so buy_ticker() works without a file.
        self.buy_cooldown = self._load_buy_cooldown()

    # ------------- Persistence -------------
    def _load_buy_cooldown(self) -> dict:
        """Return ``{symbol: datetime}`` read from the cooldown file.

        A missing file or a disabled cooldown file yields an empty dict; any
        other read/parse problem is printed and also yields an empty dict.
        """
        if self.cooldown_file is None:
            return {}
        try:
            with open(self.cooldown_file, 'r', encoding='utf-8') as f:
                raw = json.load(f)
            return {
                symbol: datetime.fromisoformat(time_str)
                for symbol, time_str in raw.items()
            }
        except FileNotFoundError:
            return {}
        except (OSError, ValueError, TypeError, AttributeError) as e:
            print(f"Error loading cooldown data: {e}")
            return {}

    def _save_buy_cooldown(self) -> None:
        """Write the cooldown table to the cooldown file as ISO timestamps.

        Failures are printed, never raised.  Nothing is written when no
        cooldown file is configured.
        """
        if self.cooldown_file is None:
            return
        try:
            # Serialise first so a formatting error cannot truncate the file.
            serialized = json.dumps(
                {symbol: dt.isoformat() for symbol, dt in self.buy_cooldown.items()},
                ensure_ascii=False,
                indent=2,
            )
            with open(self.cooldown_file, 'w', encoding='utf-8') as f:
                f.write(serialized)
        except (OSError, TypeError, ValueError, AttributeError) as e:
            print(f"Error saving cooldown data: {e}")

    # ------------- Telegram -------------
    def _send_coin_msg(self, text: str) -> None:
        """Send ``text`` to the coin Telegram chat."""
        coin_client = telegram.Bot(token=COIN_TELEGRAM_BOT_TOKEN)
        asyncio.run(coin_client.send_message(chat_id=COIN_TELEGRAM_CHAT_ID, text=text))

    def _send_stock_msg(self, text: str) -> None:
        """Send ``text`` to the stock Telegram chat."""
        stock_client = telegram.Bot(token=STOCK_TELEGRAM_BOT_TOKEN)
        asyncio.run(stock_client.send_message(chat_id=STOCK_TELEGRAM_CHAT_ID, text=text))

    def _run_in_worker(self, sender, payload: str) -> None:
        """Call ``sender(payload)`` in a worker process and release the pool.

        The send still happens in a separate process, as before, but a single
        worker is enough for one payload and the context manager guarantees
        the pool is torn down instead of leaking.
        """
        with Pool(processes=1) as pool:
            pool.map(sender, [payload])

    def _send_in_chunks(self, message_list: list[str], header: str, sender) -> None:
        """Send ``message_list`` through ``sender`` in groups of ``_CHUNK_SIZE``.

        ``header`` (followed by a newline) prefixes the first payload only.
        Nothing is sent for an empty ``message_list``.
        """
        payload = header + "\n"
        for start in range(0, len(message_list), self._CHUNK_SIZE):
            payload += "".join(message_list[start:start + self._CHUNK_SIZE])
            self._run_in_worker(sender, payload)
            payload = ''

    def send_coin_telegram_message(self, message_list: list[str], header: str) -> None:
        """Send coin messages to Telegram, 20 messages per payload."""
        self._send_in_chunks(message_list, header, self._send_coin_msg)

    def send_stock_telegram_message(self, message_list: list[str], header: str) -> None:
        """Send stock messages to Telegram, 20 messages per payload."""
        self._send_in_chunks(message_list, header, self._send_stock_msg)

    # ------------- Indicators -------------
    def normalize_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``data`` with 20-row rolling min-max columns added.

        For each of Open/High/Low/Close/Volume a ``<column>_Norm`` column is
        added holding ``(value - rolling_min) / (rolling_max - rolling_min)``;
        rows whose rolling range is exactly zero get ``0.5``.
        """
        normalized_data = data.copy()
        for column in ('Open', 'High', 'Low', 'Close', 'Volume'):
            window = data[column].rolling(window=20)
            min_val = window.min()
            denominator = window.max() - min_val
            normalized_data[f'{column}_Norm'] = np.where(
                denominator != 0,
                (data[column] - min_val) / denominator,
                0.5,
            )
        return normalized_data

    def calculate_technical_indicators(self, data: pd.DataFrame) -> pd.DataFrame:
        """Return ``data`` extended with normalised, MA, deviation and band columns.

        Adds ``MA<n>`` (rolling mean of Close) and ``Deviation<n>``
        (Close / MA<n> * 100) for every window in ``_MA_WINDOWS``, a
        ``golden_cross`` flag (MA5 crossing above MA20), and 20-row
        ``MA`` / ``STD`` / ``Upper`` / ``Lower`` band columns (mean +/- 2 std).
        """
        data = self.normalize_data(data)
        close = data['Close']

        for window in self._MA_WINDOWS:
            data[f'MA{window}'] = close.rolling(window=window).mean()
        for window in self._MA_WINDOWS:
            data[f'Deviation{window}'] = (close / data[f'MA{window}']) * 100

        data['golden_cross'] = (data['MA5'] > data['MA20']) & (
            data['MA5'].shift(1) <= data['MA20'].shift(1)
        )

        data['MA'] = close.rolling(window=20).mean()
        data['STD'] = close.rolling(window=20).std()
        data['Upper'] = data['MA'] + (2 * data['STD'])
        data['Lower'] = data['MA'] - (2 * data['STD'])
        return data

    # ------------- Strategy -------------
    def _buy_amount_for(self, symbol: str, signal) -> int:
        """Return the order amount for ``signal`` on ``symbol``."""
        if signal == 'deviation1440':
            return 30000 if symbol in ('BONK', 'PEPE', 'TON') else 50000
        return self._SIGNAL_BUY_AMOUNTS.get(signal, self._DEFAULT_BUY_AMOUNT)

    def buy_ticker(self, symbol: str, data: pd.DataFrame) -> bool:
        """Place a market buy for ``symbol`` based on the last row's signal.

        The buy is skipped while the symbol is in cooldown: 600 seconds when
        the last ``buy_signal`` is ``'fall_5p'``, 1200 seconds otherwise.
        After a buy the cooldown is stored (when a cooldown file is
        configured) and a Telegram notification is attempted; a notification
        failure is printed and does not affect the result.

        Args:
            symbol: Coin symbol passed to ``HTS.buyCoinMarket``.
            data: Frame whose last row provides ``buy_signal`` and ``Close``.

        Returns:
            ``True`` when the order call completed, ``False`` when the symbol
            is in cooldown or any error occurred (errors are printed).
        """
        try:
            current_time = datetime.now()
            signal = data['buy_signal'].iloc[-1]
            cooldown_seconds = 600 if signal == 'fall_5p' else 1200

            last_buy = self.buy_cooldown.get(symbol)
            if last_buy is not None:
                elapsed = (current_time - last_buy).total_seconds()
                if elapsed < cooldown_seconds:
                    print(f"{symbol}: 매수 금지 중 (남은 시간: {cooldown_seconds - elapsed:.0f}초)")
                    return False

            buy_amount = self._buy_amount_for(symbol, signal)
            _ = self.hts.buyCoinMarket(symbol, buy_amount)

            if self.cooldown_file is not None:
                self.buy_cooldown[symbol] = current_time
                self._save_buy_cooldown()

            # NOTE(review): the log text always says 20 minutes although the
            # 'fall_5p' cooldown above is 10 minutes - confirm intended wording.
            print(f"{KR_COINS[symbol]} ({symbol}) [{data['buy_signal'].iloc[-1]}], 현재가: {data['Close'].iloc[-1]:.4f}, 20분간 매수 금지 시작")

            try:
                self._run_in_worker(
                    self._send_coin_msg,
                    "[KRW-COIN]" + "\n" + self.format_message(
                        'COIN', symbol, KR_COINS[symbol],
                        data['Close'].iloc[-1], data['buy_signal'].iloc[-1],
                    ),
                )
            except Exception as e:
                # Notification is best effort; the order has already been placed.
                print(f"Error sending Telegram message: {str(e)}")
        except Exception as e:
            print(f"Error buying {symbol}: {str(e)}")
            return False
        return True

    def check_buy_point(self, symbol: str, data: pd.DataFrame, simulation: bool | None = None) -> pd.DataFrame:
        """Return a copy of ``data`` with ``buy_signal`` / ``buy_point`` columns.

        Every row from the second one on is tested, in this order, and a later
        match overwrites an earlier label on the same row:

        * ``movingaverage``: MA5..MA240 are all below MA720 and all rising
          versus the previous row, and MA720 is below MA1440.
        * ``deviation40``: Deviation40 rose and its previous value was <= 90.
        * ``deviation240``: same test on Deviation240 with limit 98 for TRX,
          90 otherwise; skipped for BONK.
        * ``deviation1440``: same test on Deviation1440 with limit 89 (TON),
          90 (XRP), 76 (BONK) or 80 (others).
        * ``fall_5p``: Close is at most 95% of the previous row's Low
          (skipped when the frame has no Low/Close columns).

        Unless ``simulation`` is truthy, a signal that fires on one of the
        last three rows is also written onto the final row.

        Args:
            symbol: Coin symbol selecting the per-symbol limits.
            data: Frame produced by ``calculate_technical_indicators``.
            simulation: When truthy, signals stay on the row that fired.

        Returns:
            The annotated copy; ``buy_signal`` is ``''`` and ``buy_point`` is
            ``0`` on rows without a signal.  Frames with fewer than two rows
            are returned without any signal.
        """
        data = data.copy()
        data['buy_signal'] = ''
        data['buy_point'] = 0

        row_count = len(data)
        if row_count < 2:
            return data

        # Pull the columns out once; per-cell .iloc/.at access is very slow.
        ma = {window: data[f'MA{window}'].to_numpy() for window in self._MA_WINDOWS}
        short_windows = self._SHORT_MA_WINDOWS
        dev40 = data['Deviation40'].to_numpy()
        if symbol == 'BONK':
            dev240, dev240_limit = None, None
        else:
            dev240 = data['Deviation240'].to_numpy()
            dev240_limit = 98 if symbol == 'TRX' else 90
        dev1440 = data['Deviation1440'].to_numpy()
        dev1440_limit = self._DEVIATION1440_LIMITS.get(symbol, 80)
        if 'Low' in data.columns and 'Close' in data.columns:
            low = data['Low'].to_numpy()
            close = data['Close'].to_numpy()
        else:
            low = close = None

        signals = [''] * row_count
        points = [0] * row_count
        last = row_count - 1
        # Rows at or after this position belong to the "last three rows".
        recent_start = row_count - 3
        carry_recent = not simulation

        def mark(pos: int, label: str) -> None:
            signals[pos] = label
            points[pos] = 1
            if carry_recent and pos >= recent_start:
                signals[last] = label
                points[last] = 1

        def rebounds(series, pos: int, limit) -> bool:
            previous = series[pos - 1]
            return bool(previous < series[pos] and previous <= limit)

        for i in range(1, row_count):
            if (
                all(ma[w][i] < ma[720][i] for w in short_windows)
                and all(ma[w][i] > ma[w][i - 1] for w in short_windows)
                and ma[720][i] < ma[1440][i]
            ):
                mark(i, 'movingaverage')

            if rebounds(dev40, i, 90):
                mark(i, 'deviation40')

            if dev240 is not None and rebounds(dev240, i, dev240_limit):
                mark(i, 'deviation240')

            if rebounds(dev1440, i, dev1440_limit):
                mark(i, 'deviation1440')

            if low is not None and close[i] <= low[i - 1] * 0.95:
                mark(i, 'fall_5p')

        data['buy_signal'] = signals
        data['buy_point'] = points
        return data

    # ------------- Formatting -------------
    def format_message(self, market_type: str, symbol: str, symbol_name: str, close: float, buy_signal: str) -> str:
        """Return the one-line buy notification text for a symbol.

        The price is prefixed with ``$`` for market type ``'US'`` and ``₩``
        otherwise, and printed with four decimals.
        """
        currency = '$' if market_type == 'US' else '₩'
        return (
            f"매수 [{market_type}] {symbol_name} ({symbol}): {buy_signal} "
            f"현재가: {currency}{close:.4f}, "
        )

    def format_ma_message(self, info: dict, market_type: str) -> str:
        """Return the status line for ``info`` (keys: name, symbol, price, alert).

        The line is prefixed with ``'상승 '`` when ``info['alert']`` is truthy
        and ends with a newline.
        """
        prefix = '상승 ' if info.get('alert') else ''
        currency = '$' if market_type == 'US' else '₩'
        return (
            prefix
            + f"[{market_type}] {info['name']} ({info['symbol']}) "
            + f"현재가: {currency}{info['price']:.4f} \n"
        )

    # ------------- Data fetch -------------
    @staticmethod
    def _index_by_datetime(data: pd.DataFrame) -> pd.DataFrame:
        """Index ``data`` by its ``datetime`` column, sorted, one row per timestamp.

        Duplicates are detected on the timestamp (not on the OHLCV values), so
        distinct candles that happen to share values are kept and repeated
        timestamps are collapsed.  The sort is stable, so for a repeated
        timestamp the row that came first in the input wins.  The index is
        also kept as a ``datetime`` column.
        """
        data = data.set_index('datetime')
        data = data.sort_index(kind='mergesort')
        data = data[~data.index.duplicated(keep='first')].copy()
        data["datetime"] = data.index
        return data

    def get_coin_data(self, symbol: str, interval: int = 60, to: str | None = None, retries: int = 3) -> pd.DataFrame | None:
        """Fetch up to 200 minute-candles for ``KRW-<symbol>`` from Bithumb.

        Args:
            symbol: Coin symbol (market is ``KRW-<symbol>``).
            interval: Candle length in minutes.
            to: Optional upper time bound passed as the ``to`` query value.
            retries: Number of attempts before giving up.

        Returns:
            A float frame indexed by ``datetime`` (ascending) with columns
            Open/Close/High/Low/Volume plus a ``datetime`` column, or ``None``
            when the response has no candle rows or every attempt failed.
        """
        if to is None:
            url = ("https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=200").format(interval, symbol)
        else:
            url = ("https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=200&to={}").format(interval, symbol, to)
        headers = {"accept": "application/json"}

        for attempt in range(retries):
            try:
                # A timeout keeps a stalled connection from hanging the monitor;
                # it surfaces as an exception and goes through the retry path.
                response = requests.get(url, headers=headers, timeout=self._HTTP_TIMEOUT)
                json_data = json.loads(response.text)
                df_temp = pd.DataFrame(json_data)
                # Reverse the response order (newest-first becomes oldest-first).
                df_temp = df_temp.sort_index(ascending=False)
                if 'candle_date_time_kst' not in df_temp:
                    return None

                data = pd.DataFrame()
                data['datetime'] = pd.to_datetime(df_temp['candle_date_time_kst'], format='%Y-%m-%dT%H:%M:%S')
                data['Open'] = df_temp['opening_price']
                data['Close'] = df_temp['trade_price']
                data['High'] = df_temp['high_price']
                data['Low'] = df_temp['low_price']
                data['Volume'] = df_temp['candle_acc_trade_volume']
                data = data.set_index('datetime')
                data = data.astype(float)
                data["datetime"] = data.index

                if not data.empty:
                    return data
                print(f"No data received for {symbol}, attempt {attempt + 1}")
                time.sleep(0.5)
            except Exception as e:
                # Retry boundary around network / parsing failures.
                print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
                if attempt < retries - 1:
                    time.sleep(5)
        return None

    def get_coin_more_data(self, symbol: str, interval: int, bong_count: int = 3000) -> pd.DataFrame:
        """Fetch at least ``bong_count`` candles by paging backwards in time.

        Pages of 200 candles are requested with a ``to`` bound that moves back
        ``interval * 200`` minutes per page.  Paging stops as soon as a page
        comes back empty, so a failing API can no longer cause an endless
        loop.

        Returns:
            The merged candles indexed by ``datetime`` (sorted, one row per
            timestamp); an empty frame with the usual columns when nothing
            could be fetched.
        """
        to = datetime.now()
        pages: list[pd.DataFrame] = []
        fetched = 0
        while fetched < bong_count:
            page = self.get_coin_data(symbol, interval, to.strftime("%Y-%m-%d %H:%M:%S"))
            if page is None or page.empty:
                break
            pages.append(page)
            fetched += len(page)
            time.sleep(0.3)
            to = to - relativedelta(minutes=interval * 200)

        if not pages:
            return pd.DataFrame(columns=self._OHLCV_COLUMNS)
        return self._index_by_datetime(pd.concat(pages, ignore_index=True))

    def get_coin_saved_data(self, symbol: str, interval: int, data: pd.DataFrame) -> pd.DataFrame:
        """Store new candles in ``coins.db`` and return the latest 5000 saved rows.

        Rows of ``data`` are walked newest-first and inserted into the table
        named after ``symbol`` until a row that is already stored is met.

        Args:
            symbol: Coin symbol; also the name of the SQLite table.
            interval: Candle length in minutes, stored with each row.
            data: Candles to persist (needs the ``datetime`` column); ``None``
                or an empty frame skips the insert step.

        Returns:
            The saved candles indexed by ``datetime`` (text timestamps as
            stored), ascending, with a ``datetime`` column.
        """
        # A table name cannot be bound as a parameter, so quote it as an identifier.
        table = '"' + symbol.replace('"', '""') + '"'

        conn = sqlite3.connect('coins.db')
        try:
            cursor = conn.cursor()
            if data is not None and len(data) > 0:
                # NOTE(review): the newest candle may still be forming when it
                # is stored and is not refreshed later - confirm this is intended.
                newest_first = zip(
                    data['datetime'].iloc[::-1],
                    data['Close'].iloc[::-1],
                    data['Open'].iloc[::-1],
                    data['High'].iloc[::-1],
                    data['Low'].iloc[::-1],
                    data['Volume'].iloc[::-1],
                )
                for stamp, close, open_, high, low, volume in newest_first:
                    ymdhms = stamp.strftime('%Y-%m-%d %H:%M:%S')
                    cursor.execute(
                        "SELECT 1 from " + table + " where CODE = ? and ymdhms = ? and interval = ?",
                        (symbol, ymdhms, interval),
                    )
                    if cursor.fetchone():
                        # Everything older is assumed to be stored already.
                        break
                    cursor.execute(
                        "INSERT INTO " + table + " (interval, CODE, NAME, ymdhms, ymd, hms, close, open, high, low, volume) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                        (
                            interval,
                            symbol,
                            KR_COINS[symbol],
                            ymdhms,
                            stamp.strftime('%Y%m%d'),
                            stamp.strftime('%H%M%S'),
                            close,
                            open_,
                            high,
                            low,
                            volume,
                        ),
                    )
                conn.commit()

            cursor.execute(
                "select * from (SELECT Open,Close,High,Low,Volume,ymdhms as datetime from " + table + " order by ymdhms desc limit 5000) subquery order by datetime"
            )
            result = cursor.fetchall()
            cursor.close()
        finally:
            # Closing without commit discards a partially written batch.
            conn.close()

        # Passing the column names up front also works for an empty result.
        df = pd.DataFrame(result, columns=self._OHLCV_COLUMNS)
        df = df.set_index('datetime')
        df = df.sort_index()
        df['datetime'] = df.index
        return df

    def get_coin_some_data(self, symbol: str, interval: int) -> pd.DataFrame:
        """Return recent candles merged from the API, the local DB and the last minute.

        The latest ``interval`` candles are fetched and persisted, combined
        with the saved history, and the newest 1-minute candle (its volume
        multiplied by 60) is appended as the most recent observation.  A
        failed fetch is skipped instead of raising.

        Returns:
            Candles indexed by ``datetime`` (sorted, one row per timestamp;
            freshly fetched rows win over saved ones) with a ``datetime`` column.
        """
        data = self.get_coin_data(symbol, interval)
        data_1 = self.get_coin_data(symbol, interval=1)
        saved_data = self.get_coin_saved_data(symbol, interval, data)

        frames = []
        if data is not None:
            frames.append(data)
        frames.append(saved_data)
        if data_1 is not None and not data_1.empty:
            latest_minute = data_1.iloc[[-1]].copy()
            # NOTE(review): the factor 60 is fixed regardless of ``interval`` -
            # presumably it scales one minute of volume to an hour; confirm.
            latest_minute['Volume'] = latest_minute['Volume'] * 60
            frames.append(latest_minute)

        merged = pd.concat(frames, ignore_index=True)
        merged['datetime'] = pd.to_datetime(merged['datetime'], format='%Y-%m-%d %H:%M:%S')
        return self._index_by_datetime(merged)

    def get_kr_stock_data(self, symbol: str, retries: int = 3) -> pd.DataFrame | None:
        """Fetch roughly the last 300 days of daily data for a Korean stock.

        Args:
            symbol: Ticker passed to ``FinanceDataReader.DataReader``.
            retries: Number of attempts before giving up.

        Returns:
            The frame returned by ``DataReader``, or ``None`` when every
            attempt failed or returned no rows.
        """
        for attempt in range(retries):
            try:
                end = datetime.now()
                start = end - timedelta(days=300)
                data = fdr.DataReader(symbol, start.strftime('%Y-%m-%d'), end.strftime('%Y-%m-%d'))
                if not data.empty:
                    return data
                print(f"No data received for {symbol}, attempt {attempt + 1}")
                time.sleep(2)
            except Exception as e:
                # Retry boundary around the third-party data reader.
                print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
                if attempt < retries - 1:
                    time.sleep(5)
        return None