Files
DeepCoin/monitor.py
dsyoon 4ac03fd662 init
2025-08-24 17:31:32 +09:00

576 lines
28 KiB
Python

import pandas as pd
from HTS2 import HTS
from dateutil.relativedelta import relativedelta
from datetime import datetime, timedelta
import sqlite3
import telegram
import time
import requests
import json
import asyncio
from multiprocessing import Pool
import FinanceDataReader as fdr
import numpy as np
import os
from config import *
from HTS2 import HTS
class Monitor(HTS):
"""자산(코인/주식/ETF) 모니터링 및 매수 실행 클래스"""
last_signal = None
cooldown_file = None
def __init__(self, cooldown_file='coins_buy_time.json') -> None:
self.hts = HTS()
# 최근 매수 신호 저장용(파일은 [신규] 포맷으로 저장)
self.last_signal: dict[str, str] = {}
if cooldown_file is not None:
self.cooldown_file = cooldown_file
self.buy_cooldown = self._load_buy_cooldown()
# ------------- Persistence -------------
def _load_buy_cooldown(self) -> dict:
"""load trade record file into nested dict {symbol:{'buy':{'datetime':dt,'signal':s},'sell':{...}}}"""
if not os.path.exists(self.cooldown_file):
return {}
try:
with open(self.cooldown_file, 'r', encoding='utf-8') as f:
raw = json.load(f)
except Exception as e:
print(f"Error loading cooldown data: {e}")
return {}
record: dict[str, dict] = {}
for symbol, value in raw.items():
# 신규 포맷: value has 'buy'/'sell'
if isinstance(value, dict) and ('buy' in value or 'sell' in value):
record[symbol] = {}
for side in ['buy', 'sell']:
side_val = value.get(side)
if isinstance(side_val, dict):
dt_iso = side_val.get('datetime')
sig = side_val.get('signal', '')
if dt_iso:
try:
dt_obj = datetime.fromisoformat(dt_iso)
except Exception:
dt_obj = None
else:
dt_obj = None
record[symbol][side] = {'datetime': dt_obj, 'signal': sig}
else:
# 구 포맷 처리 (매수만 기록)
try:
dt_obj = None
sig = ''
if isinstance(value, str):
dt_obj = datetime.fromisoformat(value)
elif isinstance(value, dict):
dt_iso = value.get('datetime')
sig = value.get('signal', '')
if dt_iso:
dt_obj = datetime.fromisoformat(dt_iso)
record.setdefault(symbol, {})['buy'] = {'datetime': dt_obj, 'signal': sig}
except Exception:
continue
# last_signal 채우기 (buy 기준)
for sym, sides in record.items():
if 'buy' in sides and sides['buy'].get('signal'):
self.last_signal[sym] = sides['buy']['signal']
return record
def _save_buy_cooldown(self) -> None:
"""save nested trade record structure"""
try:
data: dict[str, dict] = {}
for symbol, sides in self.buy_cooldown.items():
data[symbol] = {}
for side in ['buy', 'sell']:
info = sides.get(side)
if not info:
continue
dt_obj = info.get('datetime')
sig = info.get('signal', '')
data[symbol][side] = {
'datetime': dt_obj.isoformat() if isinstance(dt_obj, datetime) else '',
'signal': sig,
}
with open(self.cooldown_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"Error saving cooldown data: {e}")
# ------------- Telegram -------------
def _send_coin_msg(self, text: str) -> None:
coin_client = telegram.Bot(token=COIN_TELEGRAM_BOT_TOKEN)
asyncio.run(coin_client.send_message(chat_id=COIN_TELEGRAM_CHAT_ID, text=text))
def _send_stock_msg(self, text: str) -> None:
stock_client = telegram.Bot(token=STOCK_TELEGRAM_BOT_TOKEN)
asyncio.run(stock_client.send_message(chat_id=STOCK_TELEGRAM_CHAT_ID, text=text))
def sendMsg(self, msg):
try:
pool = Pool(12)
pool.map(self._send_coin_msg, [msg])
except Exception as e:
print(f"Error sending Telegram message: {str(e)}")
return
def send_coin_telegram_message(self, message_list: list[str], header: str) -> None:
payload = header + "\n"
for i, message in enumerate(message_list):
payload += message
if i + 1 % 20 == 0:
pool = Pool(12)
pool.map(self._send_coin_msg, [payload])
payload = ''
if len(message_list) % 20 != 0:
pool = Pool(12)
pool.map(self._send_coin_msg, [payload])
def send_stock_telegram_message(self, message_list: list[str], header: str) -> None:
payload = header + "\n"
for i, message in enumerate(message_list):
payload += message + "\n"
if i + 1 % 20 == 0:
pool = Pool(12)
pool.map(self._send_stock_msg, [payload])
payload = ''
if len(message_list) % 20 != 0:
pool = Pool(12)
pool.map(self._send_stock_msg, [payload])
# ------------- Indicators -------------
def normalize_data(self, data: pd.DataFrame) -> pd.DataFrame:
columns_to_normalize = ['Open', 'High', 'Low', 'Close', 'Volume']
normalized_data = data.copy()
for column in columns_to_normalize:
min_val = data[column].rolling(window=20).min()
max_val = data[column].rolling(window=20).max()
denominator = max_val - min_val
normalized_data[f'{column}_Norm'] = np.where(
denominator != 0,
(data[column] - min_val) / denominator,
0.5,
)
return normalized_data
def inverse_data(self, data: pd.DataFrame) -> pd.DataFrame:
"""원본 data 가격 시계를 상하 대칭(글로벌 min/max 기준)으로 반전하여 하락↔상승 트렌드를 뒤집는다."""
price_cols = ['Open', 'High', 'Low', 'Close']
inv = data.copy()
global_min = data[price_cols].min().min()
global_max = data[price_cols].max().max()
# 축 기준은 global_mid = (max+min), so transformed = max+min - price
for col in price_cols:
inv[col] = global_max + global_min - data[col]
# Volume은 그대로 유지
inv['Volume'] = data['Volume']
# 지표 다시 계산
inv = self.normalize_data(inv)
inv['MA5'] = inv['Close'].rolling(window=5).mean()
inv['MA20'] = inv['Close'].rolling(window=20).mean()
inv['MA40'] = inv['Close'].rolling(window=40).mean()
inv['MA120'] = inv['Close'].rolling(window=120).mean()
inv['MA200'] = inv['Close'].rolling(window=200).mean()
inv['MA240'] = inv['Close'].rolling(window=240).mean()
inv['MA720'] = inv['Close'].rolling(window=720).mean()
inv['MA1440'] = inv['Close'].rolling(window=1440).mean()
inv['Deviation5'] = (inv['Close'] / inv['MA5']) * 100
inv['Deviation20'] = (inv['Close'] / inv['MA20']) * 100
inv['Deviation40'] = (inv['Close'] / inv['MA40']) * 100
inv['Deviation120'] = (inv['Close'] / inv['MA120']) * 100
inv['Deviation200'] = (inv['Close'] / inv['MA200']) * 100
inv['Deviation240'] = (inv['Close'] / inv['MA240']) * 100
inv['Deviation720'] = (inv['Close'] / inv['MA720']) * 100
inv['Deviation1440'] = (inv['Close'] / inv['MA1440']) * 100
inv['golden_cross'] = (inv['MA5'] > inv['MA20']) & (inv['MA5'].shift(1) <= inv['MA20'].shift(1))
inv['MA'] = inv['Close'].rolling(window=20).mean()
inv['STD'] = inv['Close'].rolling(window=20).std()
inv['Upper'] = inv['MA'] + (2 * inv['STD'])
inv['Lower'] = inv['MA'] - (2 * inv['STD'])
return inv
def calculate_technical_indicators(self, data: pd.DataFrame) -> pd.DataFrame:
data = self.normalize_data(data)
data['MA5'] = data['Close'].rolling(window=5).mean()
data['MA20'] = data['Close'].rolling(window=20).mean()
data['MA40'] = data['Close'].rolling(window=40).mean()
data['MA120'] = data['Close'].rolling(window=120).mean()
data['MA200'] = data['Close'].rolling(window=200).mean()
data['MA240'] = data['Close'].rolling(window=240).mean()
data['MA720'] = data['Close'].rolling(window=720).mean()
data['MA1440'] = data['Close'].rolling(window=1440).mean()
data['Deviation5'] = (data['Close'] / data['MA5']) * 100
data['Deviation20'] = (data['Close'] / data['MA20']) * 100
data['Deviation40'] = (data['Close'] / data['MA40']) * 100
data['Deviation120'] = (data['Close'] / data['MA120']) * 100
data['Deviation200'] = (data['Close'] / data['MA200']) * 100
data['Deviation240'] = (data['Close'] / data['MA240']) * 100
data['Deviation720'] = (data['Close'] / data['MA720']) * 100
data['Deviation1440'] = (data['Close'] / data['MA1440']) * 100
data['golden_cross'] = (data['MA5'] > data['MA20']) & (data['MA5'].shift(1) <= data['MA20'].shift(1))
data['MA'] = data['Close'].rolling(window=20).mean()
data['STD'] = data['Close'].rolling(window=20).std()
data['Upper'] = data['MA'] + (2 * data['STD'])
data['Lower'] = data['MA'] - (2 * data['STD'])
return data
# ------------- Strategy -------------
def buy_ticker_1h(self, symbol: str, data: pd.DataFrame) -> bool:
try:
# 기존 로직 ---------------------------------------------------
#print('BUY: {}'.format(symbol))
#self.sendMsg('BUY: {}'.format(symbol))
check_5_week_lowest = False
# 5주봉이 20주봉이나 40주봉보다 아래에 있는지 체크
try:
# Convert hourly data to week-based rolling periods (5, 20, 40 weeks)
hours_in_week = 24 * 7 # 168 hours
period_5w = 5 * hours_in_week # 840 hours
period_20w = 20 * hours_in_week # 3,360 hours
period_40w = 40 * hours_in_week # 6,720 hours
if len(data) >= period_40w:
wma5 = data['Close'].rolling(window=period_5w).mean().iloc[-1]
wma20 = data['Close'].rolling(window=period_20w).mean().iloc[-1]
wma40 = data['Close'].rolling(window=period_40w).mean().iloc[-1]
# 5-week MA is the lowest among 5, 20, 40 week MAs
if (wma5 < wma20) and (wma5 < wma40):
check_5_week_lowest = True
except Exception:
# Ignore errors in MA calculation so as not to block trading logic
pass
buy_amount = 6000
current_time = datetime.now()
if data['signal'].iloc[-1] == 'fall_6p':
if data['Close'].iloc[-1] > 100:
buy_amount = 500000
else:
buy_amount = 300000
last_buy_dt = self.buy_cooldown.get(symbol, {}).get('buy', {}).get('datetime')
if last_buy_dt and self.last_signal.get(symbol) == 'fall_6p':
time_diff = current_time - last_buy_dt
if time_diff.total_seconds() < 4000:
print(f"{symbol}: 매수 금지 중 (남은 시간: {600 - time_diff.total_seconds():.0f}초)")
return False
else:
last_buy_dt = self.buy_cooldown.get(symbol, {}).get('buy', {}).get('datetime')
if last_buy_dt:
time_diff = current_time - last_buy_dt
if time_diff.total_seconds() < 1800:
print(f"{symbol}: 매수 금지 중 (남은 시간: {1800 - time_diff.total_seconds():.0f}초)")
return False
buy_amount = 5100
if data['signal'].iloc[-1] == 'movingaverage':
buy_amount = 30000
elif data['signal'].iloc[-1] == 'deviation40':
buy_amount = 50000
elif data['signal'].iloc[-1] == 'deviation240':
buy_amount = 6000
elif data['signal'].iloc[-1] == 'deviation1440':
if symbol in ['BONK', 'PEPE', 'TON']:
buy_amount = 20000
else:
buy_amount = 30000
# heikin_ashi 조건 제거 완료
if data['signal'].iloc[-1] in ['movingaverage', 'deviation40', 'deviation240', 'deviation1440']:
if check_5_week_lowest:
buy_amount *= 4
_ = self.hts.buyCoinMarket(symbol, buy_amount)
if self.cooldown_file is not None:
# 최근 매수 신호를 함께 기록하여 [신규] 포맷으로 저장
try:
self.last_signal[symbol] = str(data['signal'].iloc[-1])
except Exception:
self.last_signal[symbol] = ''
self.buy_cooldown.setdefault(symbol, {})['buy'] = {'datetime': current_time, 'signal': str(data['signal'].iloc[-1])}
# 매수를 저장함
self._save_buy_cooldown()
print(f"{KR_COINS[symbol]} ({symbol}) [{data['signal'].iloc[-1]}], 현재가: {data['Close'].iloc[-1]:.4f}, 20분간 매수 금지 시작")
self.sendMsg("[KRW-COIN]" + "\n" + self.format_message('COIN', symbol, KR_COINS[symbol], data['Close'].iloc[-1], data['signal'].iloc[-1]))
except Exception as e:
print(f"Error buying {symbol}: {str(e)}")
return False
return True
def sell_ticker_1h(self, symbol: str, data: pd.DataFrame, balances) -> bool:
"""Dev40(Deviation40) 매도 조건을 만족할 때만 매도 실행"""
try:
# 최신 캔들의 시그널이 Dev40이 아니면 매도하지 않음
if data['signal'].iloc[-1] != 'deviation40':
return False
current_time = datetime.now()
# 최근 Dev40 매도로부터 20분(1200초) 쿨다운 적용
last_sell_dt = self.buy_cooldown.get(symbol, {}).get('sell', {}).get('datetime')
if last_sell_dt and self.last_signal.get(symbol) == 'deviation40':
time_diff = current_time - last_sell_dt
if time_diff.total_seconds() < 1200:
remain = 1200 - time_diff.total_seconds()
print(f"{symbol}: 매도 금지 중 (남은 시간: {remain:.0f}초)")
return False
# 매도 수량/금액 산정 (예: 50,000 KRW 상당)
sell_amount = balances[symbol]['balance'] * 0.7 # KRW 기준 매도 총액 혹은 수량 설정
# 실제 매도 실행 (HTS API)
_ = self.hts.sellCoinMarket(symbol, 0, sell_amount) # market 매도 (price 파라미터 미사용)
# 쿨다운 및 로그 저장
if self.cooldown_file is not None:
self.last_signal[symbol] = 'deviation40'
self.buy_cooldown.setdefault(symbol, {})['sell'] = {'datetime': current_time, 'signal': 'deviation40'}
self._save_buy_cooldown()
print(f"{KR_COINS[symbol]} ({symbol}) [Dev40 매도], 현재가: {data['Close'].iloc[-1]:.4f}, 20분간 매도 금지 시작")
self.sendMsg("[KRW-COIN]" + "\n" + self.format_message('COIN', symbol, KR_COINS[symbol], data['Close'].iloc[-1], 'Dev40 매도'))
except Exception as e:
print(f"Error selling {symbol}: {str(e)}")
return False
return True
def check_point(self, symbol: str, data: pd.DataFrame, simulation: bool | None = None) -> pd.DataFrame:
data = data.copy()
data['signal'] = ''
data['point'] = 0
if data['point'].iloc[-1] != 1:
for i in range(1, len(data)):
if all(data[f'MA{n}'].iloc[i] < data['MA720'].iloc[i] for n in [5, 20, 40, 120, 200, 240]) and \
all(data[f'MA{n}'].iloc[i] > data[f'MA{n}'].iloc[i - 1] for n in [5, 20, 40, 120, 200, 240]) and \
data['MA720'].iloc[i] < data['MA1440'].iloc[i]:
data.at[data.index[i], 'signal'] = 'movingaverage'
data.at[data.index[i], 'point'] = 1
if not simulation and data['point'][-3:].sum() > 0:
data.at[data.index[-1], 'signal'] = 'movingaverage'
data.at[data.index[-1], 'point'] = 1
if data['Deviation40'].iloc[i - 1] < data['Deviation40'].iloc[i] and data['Deviation40'].iloc[i - 1] <= 90:
data.at[data.index[i], 'signal'] = 'deviation40'
data.at[data.index[i], 'point'] = 1
if not simulation and data['point'][-3:].sum() > 0:
data.at[data.index[-1], 'signal'] = 'deviation40'
data.at[data.index[-1], 'point'] = 1
if symbol not in ['BONK']:
if symbol in ['TRX']:
if data['Deviation240'].iloc[i - 1] < data['Deviation240'].iloc[i] and data['Deviation240'].iloc[i - 1] <= 98:
data.at[data.index[i], 'signal'] = 'deviation240'
data.at[data.index[i], 'point'] = 1
if not simulation and data['point'][-3:].sum() > 0:
data.at[data.index[-1], 'signal'] = 'deviation240'
data.at[data.index[-1], 'point'] = 1
else:
if data['Deviation240'].iloc[i - 1] < data['Deviation240'].iloc[i] and data['Deviation240'].iloc[i - 1] <= 90:
data.at[data.index[i], 'signal'] = 'deviation240'
data.at[data.index[i], 'point'] = 1
if not simulation and data['point'][-3:].sum() > 0:
data.at[data.index[-1], 'signal'] = 'deviation240'
data.at[data.index[-1], 'point'] = 1
if symbol in ['TON']:
if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 89:
data.at[data.index[i], 'signal'] = 'deviation1440'
data.at[data.index[i], 'point'] = 1
if not simulation and data['point'][-3:].sum() > 0:
data.at[data.index[-1], 'signal'] = 'deviation1440'
data.at[data.index[-1], 'point'] = 1
elif symbol in ['XRP']:
if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 90:
data.at[data.index[i], 'signal'] = 'deviation1440'
data.at[data.index[i], 'point'] = 1
if not simulation and data['point'][-3:].sum() > 0:
data.at[data.index[-1], 'signal'] = 'deviation1440'
data.at[data.index[-1], 'point'] = 1
elif symbol in ['BONK']:
if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 76:
data.at[data.index[i], 'signal'] = 'deviation1440'
data.at[data.index[i], 'point'] = 1
if not simulation and data['point'][-3:].sum() > 0:
data.at[data.index[-1], 'signal'] = 'deviation1440'
data.at[data.index[-1], 'point'] = 1
else:
if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 80:
data.at[data.index[i], 'signal'] = 'deviation1440'
data.at[data.index[i], 'point'] = 1
if not simulation and data['point'][-3:].sum() > 0:
data.at[data.index[-1], 'signal'] = 'deviation1440'
data.at[data.index[-1], 'point'] = 1
try:
prev_low = data['Low'].iloc[i - 1]
curr_close = data['Close'].iloc[i]
curr_low = data['Low'].iloc[i]
cond_close_drop = curr_close <= prev_low * 0.94
cond_low_drop = curr_low <= prev_low * 0.94
if cond_close_drop or cond_low_drop:
data.at[data.index[i], 'signal'] = 'fall_6p'
data.at[data.index[i], 'point'] = 1
if not simulation and data['point'][-3:].sum() > 0:
data.at[data.index[-1], 'signal'] = 'fall_6p'
data.at[data.index[-1], 'point'] = 1
except Exception:
pass
return data
# ------------- Formatting -------------
def format_message(self, market_type: str, symbol: str, symbol_name: str, close: float, signal: str) -> str:
message = f"• 매수 [{market_type}] {symbol_name} ({symbol}): {signal} "
message += f"({'$' if market_type == 'US' else ''}{close:.4f})"
return message
def format_ma_message(self, info: dict, market_type: str) -> str:
prefix = '상승 ' if info.get('alert') else ''
message = prefix + f"[{market_type}] {info['name']} ({info['symbol']}) "
message += f"{'$' if market_type == 'US' else ''}({info['price']:.4f}) \n"
return message
# ------------- Data fetch -------------
def get_coin_data(self, symbol: str, interval: int = 60, to: str | None = None, retries: int = 3) -> pd.DataFrame | None:
for attempt in range(retries):
try:
if to is None:
url = ("https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=200").format(interval, symbol)
else:
url = ("https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=200&to={}").format(interval, symbol, to)
headers = {"accept": "application/json"}
response = requests.get(url, headers=headers)
json_data = json.loads(response.text)
df_temp = pd.DataFrame(json_data)
df_temp = df_temp.sort_index(ascending=False)
if 'candle_date_time_kst' not in df_temp:
return None
data = pd.DataFrame()
data['datetime'] = pd.to_datetime(df_temp['candle_date_time_kst'], format='%Y-%m-%dT%H:%M:%S')
data['Open'] = df_temp['opening_price']
data['Close'] = df_temp['trade_price']
data['High'] = df_temp['high_price']
data['Low'] = df_temp['low_price']
data['Volume'] = df_temp['candle_acc_trade_volume']
data = data.set_index('datetime')
data = data.astype(float)
data["datetime"] = data.index
if not data.empty:
return data
print(f"No data received for {symbol}, attempt {attempt + 1}")
time.sleep(0.5)
except Exception as e:
print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
if attempt < retries - 1:
time.sleep(5)
continue
return None
def get_coin_more_data(self, symbol: str, interval: int, bong_count: int = 3000) -> pd.DataFrame:
to = datetime.now()
data: pd.DataFrame | None = None
while data is None or len(data) < bong_count:
if data is None:
data = self.get_coin_data(symbol, interval, to.strftime("%Y-%m-%d %H:%M:%S"))
else:
previous_count = len(data)
df = self.get_coin_data(symbol, interval, to.strftime("%Y-%m-%d %H:%M:%S"))
data = pd.concat([data, df], ignore_index=True)
if previous_count == len(data):
break
time.sleep(0.3)
to = to - relativedelta(minutes=interval * 200)
data = data.set_index('datetime')
data = data.sort_index()
data = data.drop_duplicates(keep='first')
data["datetime"] = data.index
return data
def get_coin_saved_data(self, symbol: str, interval: int, data: pd.DataFrame) -> pd.DataFrame:
conn = sqlite3.connect('coins.db')
cursor = conn.cursor()
for i in range(1, len(data)):
cursor.execute("SELECT * from {}_{} where CODE = ? and ymdhms = ?".format(symbol, str(interval)), (symbol, data['datetime'].iloc[-i].strftime('%Y-%m-%d %H:%M:%S')),)
arr = cursor.fetchone()
if not arr:
cursor.execute(
"INSERT INTO {}_{} (CODE, NAME, ymdhms, ymd, hms, close, open, high, low, volume) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)".format(symbol, interval),
(
symbol,
KR_COINS[symbol],
data['datetime'].iloc[-i].strftime('%Y-%m-%d %H:%M:%S'),
data['datetime'].iloc[-i].strftime('%Y%m%d'),
data['datetime'].iloc[-i].strftime('%H%M%S'),
data['Close'].iloc[-i],
data['Open'].iloc[-i],
data['High'].iloc[-i],
data['Low'].iloc[-i],
data['Volume'].iloc[-i],
),
)
else:
break
cursor.execute("select * from (SELECT Open,Close,High,Low,Volume,ymdhms as datetime from {}_{} order by ymdhms desc limit 7000) subquery order by datetime".format(symbol, str(interval)))
result = cursor.fetchall()
conn.commit()
cursor.close()
conn.close()
df = pd.DataFrame(result)
df.columns = ['Open', 'Close', 'High', 'Low', 'Volume', 'datetime']
df = df.set_index('datetime')
df = df.sort_index()
df['datetime'] = df.index
return df
def get_coin_some_data(self, symbol: str, interval: int) -> pd.DataFrame:
data = self.get_coin_data(symbol, interval)
data_1 = self.get_coin_data(symbol, interval=1)
data_1.at[data_1.index[-1], 'Volume'] = data_1['Volume'].iloc[-1] * 60
saved_data = self.get_coin_saved_data(symbol, interval, data)
data = pd.concat([data, saved_data, data_1.iloc[[-1]]], ignore_index=True)
data['datetime'] = pd.to_datetime(data['datetime'], format='%Y-%m-%d %H:%M:%S')
data = data.set_index('datetime')
data = data.sort_index()
data = data.drop_duplicates(keep='first')
data["datetime"] = data.index
return data
def get_kr_stock_data(self, symbol: str, retries: int = 3) -> pd.DataFrame | None:
for attempt in range(retries):
try:
end = datetime.now()
start = end - timedelta(days=300)
data = fdr.DataReader(symbol, start.strftime('%Y-%m-%d'), end.strftime('%Y-%m-%d'))
if not data.empty:
data = data.rename(columns={
'Open': 'Open',
'High': 'High',
'Low': 'Low',
'Close': 'Close',
'Volume': 'Volume',
})
return data
print(f"No data received for {symbol}, attempt {attempt + 1}")
time.sleep(2)
except Exception as e:
print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
if attempt < retries - 1:
time.sleep(5)
continue
return None