"""Bollinger-band monitor for US stocks, Korean ETFs, and KRW coin markets.

Price data is fetched periodically, Bollinger bands are computed on the
closing prices, and a Telegram message is sent for every monitored symbol.
Messages for symbols whose position inside the band is at or below
``ALERT_THRESHOLD`` are prefixed with a bell emoji.

All tunables (Telegram credentials, band parameters, symbol tables) come
from the project-local ``config`` module.
"""
import asyncio
import json
import time
from datetime import datetime, timedelta
from multiprocessing import Pool

import pandas as pd
import requests
import schedule
import telegram
import yfinance as yf

from config import *  # noqa: F401,F403 - provides the UPPER_CASE settings used below


def send(text):
    """Send ``text`` to the configured Telegram chat.

    A new bot client and a new event loop are created for every call.
    """
    client = telegram.Bot(token=TELEGRAM_BOT_TOKEN)
    asyncio.run(client.send_message(chat_id=TELEGRAM_CHAT_ID, text=text))


def send_telegram_message(message):
    """Deliver ``message`` via :func:`send` running in a worker process.

    The pool is used as a context manager so its worker is torn down once
    the message has been delivered. Creating a pool per call without ever
    closing it would leak worker processes for as long as the scheduler
    runs. A single message only needs a single worker.

    Exceptions raised by :func:`send` in the worker propagate to the caller.
    """
    with Pool(processes=1) as pool:
        pool.map(send, [message])


def calculate_bollinger_bands(data):
    """Add ``MA``, ``STD``, ``Upper`` and ``Lower`` columns to ``data``.

    The moving average and standard deviation are rolling statistics of the
    ``Close`` column over ``BOLLINGER_PERIOD`` rows; the bands sit
    ``BOLLINGER_STD`` standard deviations above and below the average.

    ``data`` is modified in place and also returned.
    """
    data['MA'] = data['Close'].rolling(window=BOLLINGER_PERIOD).mean()
    data['STD'] = data['Close'].rolling(window=BOLLINGER_PERIOD).std()
    data['Upper'] = data['MA'] + (BOLLINGER_STD * data['STD'])
    data['Lower'] = data['MA'] - (BOLLINGER_STD * data['STD'])
    return data


def check_bollinger_bands(symbol, data):
    """Locate the latest close relative to the Bollinger bands.

    Args:
        symbol: Identifier copied into the result.
        data: Frame previously processed by :func:`calculate_bollinger_bands`.

    Returns:
        A dict with ``symbol``, ``price``, ``lower_band`` and ``distance``,
        where ``distance`` is 0.0 at the lower band and 1.0 at the upper
        band. ``None`` is returned when there are fewer than
        ``BOLLINGER_PERIOD`` rows, or when the band width is zero or NaN
        (the position is undefined and would otherwise surface as a NaN/inf
        value that compares as "below threshold").
    """
    if len(data) < BOLLINGER_PERIOD:
        return None

    latest = data.iloc[-1]
    if isinstance(latest['Upper'], float):
        upper_band = latest['Upper']
        lower_band = latest['Lower']
        current_price = latest['Close']
    else:
        # The row cells are not plain floats (e.g. one-element Series when
        # the frame has multi-level columns); unwrap the first element.
        upper_band = latest['Upper'].iloc[0]
        lower_band = latest['Lower'].iloc[0]
        current_price = latest['Close'].iloc[0]

    band_width = upper_band - lower_band
    if pd.isna(band_width) or band_width == 0:
        return None

    distance = (current_price - lower_band) / band_width
    return {
        'symbol': symbol,
        'price': current_price,
        'lower_band': lower_band,
        'distance': distance,
    }


def get_coin_data(symbol, retries=3, timeout=10):
    """Fetch minute candles for ``KRW-<symbol>`` from the Bithumb API.

    Args:
        symbol: Coin ticker inserted into the ``market`` query parameter.
        retries: Maximum number of attempts.
        timeout: Seconds to wait for the HTTP response before the attempt
            is treated as failed. Without a timeout a stalled connection
            would block the scheduler indefinitely.

    Returns:
        A float DataFrame indexed by candle time with ``Open``, ``Close``,
        ``High``, ``Low``, ``Volume`` columns plus a ``datetime`` column
        mirroring the index, or ``None`` when the response lacks candle
        data or every attempt failed.
    """
    for attempt in range(retries):
        try:
            url = ('https://api.bithumb.com/v1/candles/minutes?market=KRW-{}&count=300').format(symbol)
            headers = {"accept": "application/json"}
            response = requests.get(url, headers=headers, timeout=timeout)
            json_data = json.loads(response.text)

            df_temp = pd.DataFrame(json_data)
            # Reverse the row order so the last row is the first API record.
            df_temp = df_temp.sort_index(ascending=False)
            if 'candle_date_time_kst' not in df_temp:
                return None

            data = pd.DataFrame()
            data['datetime'] = pd.to_datetime(df_temp['candle_date_time_kst'], format='%Y-%m-%dT%H:%M:%S')
            data['Open'] = df_temp['opening_price']
            data['Close'] = df_temp['trade_price']
            data['High'] = df_temp['high_price']
            data['Low'] = df_temp['low_price']
            data['Volume'] = df_temp['candle_acc_trade_volume']
            data = data.set_index('datetime')
            data = data.astype(float)
            data["datetime"] = data.index

            if not data.empty:
                return data
            print(f"No data received for {symbol}, attempt {attempt + 1}")
            time.sleep(0.5)
        except Exception as e:
            print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
            if attempt < retries - 1:
                time.sleep(5)
            continue
    return None


def get_stock_data(symbol, retries=3):
    """Download roughly 60 days of daily bars for ``symbol`` via yfinance.

    Args:
        symbol: Ticker understood by Yahoo Finance.
        retries: Maximum number of attempts.

    Returns:
        The downloaded DataFrame, or ``None`` when every attempt failed or
        returned no rows.
    """
    for attempt in range(retries):
        try:
            end = datetime.now()
            start = end - timedelta(days=60)
            # NOTE(review): yfinance treats `end` as exclusive, so the bar
            # for the current calendar day is not requested - confirm that
            # this is intended for runs scheduled after the market close.
            data = yf.download(
                symbol,
                start=start.strftime('%Y-%m-%d'),
                end=end.strftime('%Y-%m-%d'),
                interval='1d',
                auto_adjust=True,
                progress=False
            )
            if not data.empty:
                return data
            print(f"No data received for {symbol}, attempt {attempt + 1}")
            time.sleep(0.5)
        except Exception as e:
            print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
            if attempt < retries - 1:
                time.sleep(5)
            continue
    return None


def sendAlertMsg(info, market="US", alert=False):
    """Format ``info`` as a one-line status message and send it to Telegram.

    Args:
        info: Dict with ``name``, ``symbol``, ``price`` and ``distance``.
        market: Market tag shown in square brackets.
        alert: When true, the message is prefixed with a bell emoji.

    Delivery errors are printed and swallowed so one failed message does
    not abort the monitoring loop.
    """
    # The literals below were stored as mojibake (UTF-8 text decoded with a
    # legacy codepage); they are restored to the intended emoji and Korean
    # labels ("current price", "proximity").
    # NOTE(review): the price is always prefixed with "$" and `distance`
    # (a 0..1 fraction) is printed with a "%" suffix - confirm both are
    # intended for the KR/KRW markets.
    message = "🔔" if alert else ""
    message += "[{}] {} ({}) 현재가: ${:.2f}, 근접도: {:.2f}%".format(
        market, info['name'], info['symbol'], info['price'], info['distance']
    )
    try:
        send_telegram_message(message)
    except Exception as e:
        print(f"Error sending Telegram message: {str(e)}")


def _monitor_market(header, market, symbols, fetch):
    """Run one monitoring pass over ``symbols`` (ticker -> display name).

    ``fetch`` loads the price frame for a ticker; ``header`` is printed with
    a timestamp at the start of the pass and ``market`` tags each message.
    """
    print("{} {}".format(header, datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    for symbol in symbols:
        data = fetch(symbol)
        if data is None or data.empty:
            print(f"Data for {symbol} is empty or None.")
        else:
            try:
                data = calculate_bollinger_bands(data)
                info = check_bollinger_bands(symbol, data)
                if info is None:
                    # Too few rows or a degenerate band: nothing to report.
                    print(f"Bollinger position unavailable for {symbol}; skipping.")
                else:
                    info['name'] = symbols[symbol]
                    print(" - {} ({}): {:.2f} ({:.2f})".format(
                        info['name'], symbol, info['price'], info['distance']))
                    # A message is sent for every symbol; only those at or
                    # below the threshold are flagged as alerts.
                    if info['distance'] > ALERT_THRESHOLD:
                        sendAlertMsg(info, market)
                    else:
                        sendAlertMsg(info, market, alert=True)
                        print(f"Alert generated for {symbol}")
            except Exception as e:
                print(f"Error processing data for {symbol}: {str(e)}")
        # Short pause between symbols.
        time.sleep(0.5)


def monitor_us_stocks():
    """Run one monitoring pass over the US stocks in ``US_STOCKS``."""
    _monitor_market("US Stocks", "US", US_STOCKS, get_stock_data)


def monitor_kr_stocks():
    """Run one monitoring pass over the Korean ETFs in ``KR_ETFS``."""
    _monitor_market("KR ETFs", "KR", KR_ETFS, get_stock_data)


def monitor_coins():
    """Run one monitoring pass over the KRW coin markets in ``KR_COINS``."""
    _monitor_market("KRW Coins", "KRW", KR_COINS, get_coin_data)


def run_schedule():
    """Register all monitoring jobs and run the scheduler loop forever."""
    # Coin monitoring: every hour at minutes 1, 11, 21, 31, 41 and 51.
    for minute in [1, 11, 21, 31, 41, 51]:
        schedule.every().hour.at(f":{minute:02d}").do(monitor_coins)

    # US stock monitoring: every day at 17:20.
    schedule.every().day.at("17:20").do(monitor_us_stocks)

    # Korean ETF monitoring: every day at 18:20.
    schedule.every().day.at("18:20").do(monitor_kr_stocks)

    print("Scheduler started. Monitoring will run at specified times.")
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    run_schedule()