import pandas as pd from HTS2 import HTS from dateutil.relativedelta import relativedelta from datetime import datetime, timedelta import sqlite3 import telegram import time import requests import json import asyncio from multiprocessing import Pool import FinanceDataReader as fdr import numpy as np import os from config import * from HTS2 import HTS class Monitor(HTS): """자산(코인/주식/ETF) 모니터링 및 매수 실행 클래스""" last_signal = None cooldown_file = None def __init__(self, cooldown_file='./resources/coins_buy_time.json') -> None: self.hts = HTS() # 최근 매수 신호 저장용(파일은 [신규] 포맷으로 저장) self.last_signal: dict[str, str] = {} if cooldown_file is not None: self.cooldown_file = cooldown_file self.buy_cooldown = self._load_buy_cooldown() # ------------- Persistence ------------- def _load_buy_cooldown(self) -> dict: """load trade record file into nested dict {symbol:{'buy':{'datetime':dt,'signal':s},'sell':{...}}}""" if not os.path.exists(self.cooldown_file): return {} try: with open(self.cooldown_file, 'r', encoding='utf-8') as f: raw = json.load(f) except Exception as e: print(f"Error loading cooldown data: {e}") return {} record: dict[str, dict] = {} for symbol, value in raw.items(): # 신규 포맷: value has 'buy'/'sell' if isinstance(value, dict) and ('buy' in value or 'sell' in value): record[symbol] = {} for side in ['buy', 'sell']: side_val = value.get(side) if isinstance(side_val, dict): dt_iso = side_val.get('datetime') sig = side_val.get('signal', '') if dt_iso: try: dt_obj = datetime.fromisoformat(dt_iso) except Exception: dt_obj = None else: dt_obj = None record[symbol][side] = {'datetime': dt_obj, 'signal': sig} else: # 구 포맷 처리 (매수만 기록) try: dt_obj = None sig = '' if isinstance(value, str): dt_obj = datetime.fromisoformat(value) elif isinstance(value, dict): dt_iso = value.get('datetime') sig = value.get('signal', '') if dt_iso: dt_obj = datetime.fromisoformat(dt_iso) record.setdefault(symbol, {})['buy'] = {'datetime': dt_obj, 'signal': sig} except Exception: continue # last_signal 채우기 (buy 기준) for sym, sides in record.items(): if 'buy' in sides and sides['buy'].get('signal'): self.last_signal[sym] = sides['buy']['signal'] return record def _save_buy_cooldown(self) -> None: """save nested trade record structure""" try: data: dict[str, dict] = {} for symbol, sides in self.buy_cooldown.items(): data[symbol] = {} for side in ['buy', 'sell']: info = sides.get(side) if not info: continue dt_obj = info.get('datetime') sig = info.get('signal', '') data[symbol][side] = { 'datetime': dt_obj.isoformat() if isinstance(dt_obj, datetime) else '', 'signal': sig, } with open(self.cooldown_file, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) except Exception as e: print(f"Error saving cooldown data: {e}") # ------------- Telegram ------------- def _send_coin_msg(self, text: str) -> None: coin_client = telegram.Bot(token=COIN_TELEGRAM_BOT_TOKEN) asyncio.run(coin_client.send_message(chat_id=COIN_TELEGRAM_CHAT_ID, text=text)) def _send_stock_msg(self, text: str) -> None: stock_client = telegram.Bot(token=STOCK_TELEGRAM_BOT_TOKEN) asyncio.run(stock_client.send_message(chat_id=STOCK_TELEGRAM_CHAT_ID, text=text)) def sendMsg(self, msg): try: pool = Pool(12) pool.map(self._send_coin_msg, [msg]) except Exception as e: print(f"Error sending Telegram message: {str(e)}") return def send_coin_telegram_message(self, message_list: list[str], header: str) -> None: payload = header + "\n" for i, message in enumerate(message_list): payload += message if i + 1 % 20 == 0: pool = Pool(12) pool.map(self._send_coin_msg, [payload]) payload = '' if len(message_list) % 20 != 0: pool = Pool(12) pool.map(self._send_coin_msg, [payload]) def send_stock_telegram_message(self, message_list: list[str], header: str) -> None: payload = header + "\n" for i, message in enumerate(message_list): payload += message + "\n" if i + 1 % 20 == 0: pool = Pool(12) pool.map(self._send_stock_msg, [payload]) payload = '' if len(message_list) % 20 != 0: pool = Pool(12) pool.map(self._send_stock_msg, [payload]) # ------------- Indicators ------------- def normalize_data(self, data: pd.DataFrame) -> pd.DataFrame: columns_to_normalize = ['Open', 'High', 'Low', 'Close', 'Volume'] normalized_data = data.copy() for column in columns_to_normalize: min_val = data[column].rolling(window=20).min() max_val = data[column].rolling(window=20).max() denominator = max_val - min_val normalized_data[f'{column}_Norm'] = np.where( denominator != 0, (data[column] - min_val) / denominator, 0.5, ) return normalized_data def inverse_data(self, data: pd.DataFrame) -> pd.DataFrame: """원본 data 가격 시계를 상하 대칭(글로벌 min/max 기준)으로 반전하여 하락↔상승 트렌드를 뒤집는다.""" price_cols = ['Open', 'High', 'Low', 'Close'] inv = data.copy() global_min = data[price_cols].min().min() global_max = data[price_cols].max().max() # 축 기준은 global_mid = (max+min), so transformed = max+min - price for col in price_cols: inv[col] = global_max + global_min - data[col] # Volume은 그대로 유지 inv['Volume'] = data['Volume'] # 지표 다시 계산 inv = self.normalize_data(inv) inv['MA5'] = inv['Close'].rolling(window=5).mean() inv['MA20'] = inv['Close'].rolling(window=20).mean() inv['MA40'] = inv['Close'].rolling(window=40).mean() inv['MA120'] = inv['Close'].rolling(window=120).mean() inv['MA200'] = inv['Close'].rolling(window=200).mean() inv['MA240'] = inv['Close'].rolling(window=240).mean() inv['MA720'] = inv['Close'].rolling(window=720).mean() inv['MA1440'] = inv['Close'].rolling(window=1440).mean() inv['Deviation5'] = (inv['Close'] / inv['MA5']) * 100 inv['Deviation20'] = (inv['Close'] / inv['MA20']) * 100 inv['Deviation40'] = (inv['Close'] / inv['MA40']) * 100 inv['Deviation120'] = (inv['Close'] / inv['MA120']) * 100 inv['Deviation200'] = (inv['Close'] / inv['MA200']) * 100 inv['Deviation240'] = (inv['Close'] / inv['MA240']) * 100 inv['Deviation720'] = (inv['Close'] / inv['MA720']) * 100 inv['Deviation1440'] = (inv['Close'] / inv['MA1440']) * 100 inv['golden_cross'] = (inv['MA5'] > inv['MA20']) & (inv['MA5'].shift(1) <= inv['MA20'].shift(1)) inv['MA'] = inv['Close'].rolling(window=20).mean() inv['STD'] = inv['Close'].rolling(window=20).std() inv['Upper'] = inv['MA'] + (2 * inv['STD']) inv['Lower'] = inv['MA'] - (2 * inv['STD']) return inv def calculate_technical_indicators(self, data: pd.DataFrame) -> pd.DataFrame: data = self.normalize_data(data) data['MA5'] = data['Close'].rolling(window=5).mean() data['MA20'] = data['Close'].rolling(window=20).mean() data['MA40'] = data['Close'].rolling(window=40).mean() data['MA120'] = data['Close'].rolling(window=120).mean() data['MA200'] = data['Close'].rolling(window=200).mean() data['MA240'] = data['Close'].rolling(window=240).mean() data['MA720'] = data['Close'].rolling(window=720).mean() data['MA1440'] = data['Close'].rolling(window=1440).mean() data['Deviation5'] = (data['Close'] / data['MA5']) * 100 data['Deviation20'] = (data['Close'] / data['MA20']) * 100 data['Deviation40'] = (data['Close'] / data['MA40']) * 100 data['Deviation120'] = (data['Close'] / data['MA120']) * 100 data['Deviation200'] = (data['Close'] / data['MA200']) * 100 data['Deviation240'] = (data['Close'] / data['MA240']) * 100 data['Deviation720'] = (data['Close'] / data['MA720']) * 100 data['Deviation1440'] = (data['Close'] / data['MA1440']) * 100 data['golden_cross'] = (data['MA5'] > data['MA20']) & (data['MA5'].shift(1) <= data['MA20'].shift(1)) data['MA'] = data['Close'].rolling(window=20).mean() data['STD'] = data['Close'].rolling(window=20).std() data['Upper'] = data['MA'] + (2 * data['STD']) data['Lower'] = data['MA'] - (2 * data['STD']) return data # ------------- Strategy ------------- def buy_sell_ticker_1h(self, symbol: str, data: pd.DataFrame, balances=None, is_inverse: bool = False) -> bool: try: # 신호 생성 및 최신 포인트 확인 data = self.annotate_signals(symbol, data) if data['point'].iloc[-1] != 1: return False if is_inverse: # BUY_MINUTE_LIMIT 이내라면 매수하지 않음 current_time = datetime.now() last_buy_dt = self.buy_cooldown.get(symbol, {}).get('sell', {}).get('datetime') if last_buy_dt: time_diff = current_time - last_buy_dt if time_diff.total_seconds() < BUY_MINUTE_LIMIT: print(f"{symbol}: 매수 금지 중 (남은 시간: {BUY_MINUTE_LIMIT - time_diff.total_seconds():.0f}초)") return False # 인버스 데이터: 매수 신호를 매도로 처리 (fall_6p, deviation40 만 허용) # 허용된 인버스 매도 신호만 처리 last_signal = str(data['signal'].iloc[-1]) if 'signal' in data.columns else '' if last_signal not in ['fall_6p', 'deviation40']: return False available_balance = 0 try: if balances and symbol in balances: available_balance = float(balances[symbol].get('balance', 0)) except Exception: available_balance = 0 if available_balance <= 0: return False sell_amount = available_balance * 0.7 """ _ = self.hts.sellCoinMarket(symbol, 0, sell_amount) if self.cooldown_file is not None: try: self.last_signal[symbol] = str(data['signal'].iloc[-1]) except Exception: self.last_signal[symbol] = '' self.buy_cooldown.setdefault(symbol, {})['sell'] = {'datetime': current_time, 'signal': str(data['signal'].iloc[-1])} self._save_buy_cooldown() print(f"{KR_COINS[symbol]} ({symbol}) [{data['signal'].iloc[-1]} 매도], 현재가: {data['Close'].iloc[-1]:.4f}") self.sendMsg("[KRW-COIN]\n" + f"• 매도 [COIN] {KR_COINS[symbol]} ({symbol}): {data['signal'].iloc[-1]} ({'₩'}{data['Close'].iloc[-1]:.4f})") """ return True else: check_5_week_lowest = False # BUY_MINUTE_LIMIT 이내라면 매수하지 않음 current_time = datetime.now() last_buy_dt = self.buy_cooldown.get(symbol, {}).get('buy', {}).get('datetime') if last_buy_dt: time_diff = current_time - last_buy_dt if time_diff.total_seconds() < BUY_MINUTE_LIMIT: print(f"{symbol}: 매수 금지 중 (남은 시간: {BUY_MINUTE_LIMIT - time_diff.total_seconds():.0f}초)") return False try: # 5주봉이 20주봉이나 40주봉보다 아래에 있는지 체크 # Convert hourly data to week-based rolling periods (5, 20, 40 weeks) hours_in_week = 24 * 7 # 168 hours period_5w = 5 * hours_in_week # 840 hours period_20w = 20 * hours_in_week # 3,360 hours period_40w = 40 * hours_in_week # 6,720 hours if len(data) >= period_40w: wma5 = data['Close'].rolling(window=period_5w).mean().iloc[-1] wma20 = data['Close'].rolling(window=period_20w).mean().iloc[-1] wma40 = data['Close'].rolling(window=period_40w).mean().iloc[-1] # 5-week MA is the lowest among 5, 20, 40 week MAs if (wma5 < wma20) and (wma5 < wma40): check_5_week_lowest = True except Exception: # Ignore errors in MA calculation so as not to block trading logic pass # 체크: fall_6p buy_amount = 5100 current_time = datetime.now() if data['signal'].iloc[-1] == 'fall_6p': if data['Close'].iloc[-1] > 100: buy_amount = 500000 else: buy_amount = 300000 #elif data['signal'].iloc[-1] == 'movingaverage': # buy_amount = 10000 elif data['signal'].iloc[-1] == 'deviation40': buy_amount = 7000 elif data['signal'].iloc[-1] == 'deviation240': buy_amount = 6000 elif data['signal'].iloc[-1] == 'deviation1440': if symbol in ['BONK', 'PEPE', 'TON']: buy_amount = 7000 else: buy_amount = 6000 if data['signal'].iloc[-1] in ['movingaverage', 'deviation40', 'deviation240', 'deviation1440']: if check_5_week_lowest: buy_amount *= 2 # 매수를 진행함 buy_amount = self.hts.buyCoinMarket(symbol, buy_amount) # 최근 매수 신호를 함께 기록하여 [신규] 포맷으로 저장 if self.cooldown_file is not None: try: self.last_signal[symbol] = str(data['signal'].iloc[-1]) except Exception: self.last_signal[symbol] = '' self.buy_cooldown.setdefault(symbol, {})['buy'] = {'datetime': current_time, 'signal': str(data['signal'].iloc[-1])} # 매수를 저장함 self._save_buy_cooldown() print(f"{KR_COINS[symbol]} ({symbol}) [{data['signal'].iloc[-1]}], 현재가: {data['Close'].iloc[-1]:.4f}, {int(BUY_MINUTE_LIMIT/60)}분간 매수 금지 시작") self.sendMsg("{}".format(self.format_message(symbol, KR_COINS[symbol], data['Close'].iloc[-1], data['signal'].iloc[-1], buy_amount))) except Exception as e: print(f"Error buying {symbol}: {str(e)}") return False return True def annotate_signals(self, symbol: str, data: pd.DataFrame, simulation: bool | None = None) -> pd.DataFrame: data = data.copy() data['signal'] = '' data['point'] = 0 if data['point'].iloc[-1] != 1: for i in range(1, len(data)): if all(data[f'MA{n}'].iloc[i] < data['MA720'].iloc[i] for n in [5, 20, 40, 120, 200, 240]) and \ all(data[f'MA{n}'].iloc[i] > data[f'MA{n}'].iloc[i - 1] for n in [5, 20, 40, 120, 200, 240]) and \ data['MA720'].iloc[i] < data['MA1440'].iloc[i]: data.at[data.index[i], 'signal'] = 'movingaverage' data.at[data.index[i], 'point'] = 1 if not simulation and data['point'][-3:].sum() > 0: data.at[data.index[-1], 'signal'] = 'movingaverage' data.at[data.index[-1], 'point'] = 1 if data['Deviation40'].iloc[i - 1] < data['Deviation40'].iloc[i] and data['Deviation40'].iloc[i - 1] <= 90: data.at[data.index[i], 'signal'] = 'deviation40' data.at[data.index[i], 'point'] = 1 if not simulation and data['point'][-3:].sum() > 0: data.at[data.index[-1], 'signal'] = 'deviation40' data.at[data.index[-1], 'point'] = 1 if symbol not in ['BONK']: if symbol in ['TRX']: if data['Deviation240'].iloc[i - 1] < data['Deviation240'].iloc[i] and data['Deviation240'].iloc[i - 1] <= 98: data.at[data.index[i], 'signal'] = 'deviation240' data.at[data.index[i], 'point'] = 1 if not simulation and data['point'][-3:].sum() > 0: data.at[data.index[-1], 'signal'] = 'deviation240' data.at[data.index[-1], 'point'] = 1 else: if data['Deviation240'].iloc[i - 1] < data['Deviation240'].iloc[i] and data['Deviation240'].iloc[i - 1] <= 90: data.at[data.index[i], 'signal'] = 'deviation240' data.at[data.index[i], 'point'] = 1 if not simulation and data['point'][-3:].sum() > 0: data.at[data.index[-1], 'signal'] = 'deviation240' data.at[data.index[-1], 'point'] = 1 if symbol in ['TON']: if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 89: data.at[data.index[i], 'signal'] = 'deviation1440' data.at[data.index[i], 'point'] = 1 if not simulation and data['point'][-3:].sum() > 0: data.at[data.index[-1], 'signal'] = 'deviation1440' data.at[data.index[-1], 'point'] = 1 elif symbol in ['XRP']: if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 90: data.at[data.index[i], 'signal'] = 'deviation1440' data.at[data.index[i], 'point'] = 1 if not simulation and data['point'][-3:].sum() > 0: data.at[data.index[-1], 'signal'] = 'deviation1440' data.at[data.index[-1], 'point'] = 1 elif symbol in ['BONK']: if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 76: data.at[data.index[i], 'signal'] = 'deviation1440' data.at[data.index[i], 'point'] = 1 if not simulation and data['point'][-3:].sum() > 0: data.at[data.index[-1], 'signal'] = 'deviation1440' data.at[data.index[-1], 'point'] = 1 else: if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 80: data.at[data.index[i], 'signal'] = 'deviation1440' data.at[data.index[i], 'point'] = 1 if not simulation and data['point'][-3:].sum() > 0: data.at[data.index[-1], 'signal'] = 'deviation1440' data.at[data.index[-1], 'point'] = 1 # Deviation720 상향 돌파 매수 (92, 93) try: prev_d720 = data['Deviation720'].iloc[i - 1] curr_d720 = data['Deviation720'].iloc[i] # 92 상향 돌파 if prev_d720 < 92 and curr_d720 >= 92: data.at[data.index[i], 'signal'] = 'Deviation720' data.at[data.index[i], 'point'] = 1 if not simulation and data['point'][-3:].sum() > 0: data.at[data.index[-1], 'signal'] = 'Deviation720' data.at[data.index[-1], 'point'] = 1 # 93 상향 돌파 if prev_d720 < 93 and curr_d720 >= 93: data.at[data.index[i], 'signal'] = 'Deviation720' data.at[data.index[i], 'point'] = 1 if not simulation and data['point'][-3:].sum() > 0: data.at[data.index[-1], 'signal'] = 'Deviation720' data.at[data.index[-1], 'point'] = 1 except Exception: pass try: prev_low = data['Low'].iloc[i - 1] curr_close = data['Close'].iloc[i] curr_low = data['Low'].iloc[i] cond_close_drop = curr_close <= prev_low * 0.94 cond_low_drop = curr_low <= prev_low * 0.94 if cond_close_drop or cond_low_drop: data.at[data.index[i], 'signal'] = 'fall_6p' data.at[data.index[i], 'point'] = 1 if not simulation and data['point'][-3:].sum() > 0: data.at[data.index[-1], 'signal'] = 'fall_6p' data.at[data.index[-1], 'point'] = 1 except Exception: pass return data # ------------- Formatting ------------- def format_message(self, symbol: str, symbol_name: str, close: float, signal: str, buy_amount: float) -> str: message = f"[매수] {symbol_name} ({symbol}): " if int(close) >= 100: message += f"₩{close}" message += f" (₩{buy_amount})" elif int(close) >= 10: message += f"₩{close:.2f}" message += f" (₩{buy_amount:.2f})" elif int(close) >= 1: message += f"₩{close:.3f}" message += f" (₩{buy_amount:.3f})" else: message += f"₩{close:.4f}" message += f" (₩{buy_amount:.4f})" if signal != '': message += f"[{signal}]" return message def format_ma_message(self, info: dict, market_type: str) -> str: prefix = '상승 ' if info.get('alert') else '' message = prefix + f"[{market_type}] {info['name']} ({info['symbol']}) " message += f"{'$' if market_type == 'US' else '₩'}({info['price']:.4f}) \n" return message # ------------- Data fetch ------------- def get_coin_data(self, symbol: str, interval: int = 60, to: str | None = None, retries: int = 3) -> pd.DataFrame | None: for attempt in range(retries): try: if to is None: if interval == 43200: url = ("https://api.bithumb.com/v1/candles/months?market=KRW-{}&count=200").format(symbol) elif interval == 10080: url = ("https://api.bithumb.com/v1/candles/weeks?market=KRW-{}&count=200").format(symbol) elif interval == 1440: url = ("https://api.bithumb.com/v1/candles/days?market=KRW-{}&count=200").format(symbol) else: url = ("https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=200").format(interval, symbol) else: if interval == 43200: url = ("https://api.bithumb.com/v1/candles/months?market=KRW-{}&count=200&to={}").format(symbol, to) elif interval == 10080: url = ("https://api.bithumb.com/v1/candles/weeks?market=KRW-{}&count=200&to={}").format(symbol, to) elif interval == 1440: url = ("https://api.bithumb.com/v1/candles/days?market=KRW-{}&count=200&to={}").format(symbol, to) else: url = ("https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=200&to={}").format(interval, symbol, to) headers = {"accept": "application/json"} response = requests.get(url, headers=headers) json_data = json.loads(response.text) df_temp = pd.DataFrame(json_data) df_temp = df_temp.sort_index(ascending=False) if 'candle_date_time_kst' not in df_temp: return None data = pd.DataFrame() data['datetime'] = pd.to_datetime(df_temp['candle_date_time_kst'], format='%Y-%m-%dT%H:%M:%S') data['Open'] = df_temp['opening_price'] data['Close'] = df_temp['trade_price'] data['High'] = df_temp['high_price'] data['Low'] = df_temp['low_price'] data['Volume'] = df_temp['candle_acc_trade_volume'] data = data.set_index('datetime') data = data.astype(float) data["datetime"] = data.index if not data.empty: return data print(f"No data received for {symbol}, attempt {attempt + 1}") time.sleep(0.5) except Exception as e: print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}") if attempt < retries - 1: time.sleep(5) continue return None def get_coin_more_data(self, symbol: str, interval: int, bong_count: int = 3000) -> pd.DataFrame: to = datetime.now() data: pd.DataFrame | None = None while data is None or len(data) < bong_count: if data is None: data = self.get_coin_data(symbol, interval, to.strftime("%Y-%m-%d %H:%M:%S")) else: previous_count = len(data) df = self.get_coin_data(symbol, interval, to.strftime("%Y-%m-%d %H:%M:%S")) data = pd.concat([data, df], ignore_index=True) if previous_count == len(data): break time.sleep(0.3) to = to - relativedelta(minutes=interval * 200) data = data.set_index('datetime') data = data.sort_index() data = data.drop_duplicates(keep='first') data["datetime"] = data.index return data def get_coin_saved_data(self, symbol: str, interval: int, data: pd.DataFrame) -> pd.DataFrame: conn = sqlite3.connect('./resources/coins.db') cursor = conn.cursor() for i in range(1, len(data)): cursor.execute("SELECT * from {}_{} where CODE = ? and ymdhms = ?".format(symbol, str(interval)), (symbol, data['datetime'].iloc[-i].strftime('%Y-%m-%d %H:%M:%S')),) arr = cursor.fetchone() if not arr: cursor.execute( "INSERT INTO {}_{} (CODE, NAME, ymdhms, ymd, hms, close, open, high, low, volume) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)".format(symbol, interval), ( symbol, KR_COINS[symbol], data['datetime'].iloc[-i].strftime('%Y-%m-%d %H:%M:%S'), data['datetime'].iloc[-i].strftime('%Y%m%d'), data['datetime'].iloc[-i].strftime('%H%M%S'), data['Close'].iloc[-i], data['Open'].iloc[-i], data['High'].iloc[-i], data['Low'].iloc[-i], data['Volume'].iloc[-i], ), ) else: break cursor.execute("select * from (SELECT Open,Close,High,Low,Volume,ymdhms as datetime from {}_{} order by ymdhms desc limit 7000) subquery order by datetime".format(symbol, str(interval))) result = cursor.fetchall() conn.commit() cursor.close() conn.close() df = pd.DataFrame(result) df.columns = ['Open', 'Close', 'High', 'Low', 'Volume', 'datetime'] df = df.set_index('datetime') df = df.sort_index() df['datetime'] = df.index return df def get_coin_some_data(self, symbol: str, interval: int) -> pd.DataFrame: data = self.get_coin_data(symbol, interval) data_1 = self.get_coin_data(symbol, interval=1) data_1.at[data_1.index[-1], 'Volume'] = data_1['Volume'].iloc[-1] * 60 saved_data = self.get_coin_saved_data(symbol, interval, data) frames = [data] if data_1 is not None and not data_1.empty: frames.append(data_1.iloc[[-1]]) frames.append(saved_data) data = pd.concat(frames, ignore_index=True) data['datetime'] = pd.to_datetime(data['datetime'], format='%Y-%m-%d %H:%M:%S') data = data.set_index('datetime') data = data.sort_index() data = data[~data.index.duplicated(keep='last')] data["datetime"] = data.index return data def get_kr_stock_data(self, symbol: str, retries: int = 3) -> pd.DataFrame | None: for attempt in range(retries): try: end = datetime.now() start = end - timedelta(days=300) data = fdr.DataReader(symbol, start.strftime('%Y-%m-%d'), end.strftime('%Y-%m-%d')) if not data.empty: data = data.rename(columns={ 'Open': 'Open', 'High': 'High', 'Low': 'Low', 'Close': 'Close', 'Volume': 'Volume', }) return data print(f"No data received for {symbol}, attempt {attempt + 1}") time.sleep(2) except Exception as e: print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}") if attempt < retries - 1: time.sleep(5) continue return None