"""Bollinger-band monitor for US stocks and Korean ETFs with Telegram alerts.

For every symbol listed in ``config.US_STOCKS`` and ``config.KR_ETFS`` the
script downloads recent daily prices, computes Bollinger Bands and sends a
Telegram message when the latest close sits at or below ``ALERT_THRESHOLD``
of the band width above the lower band.
"""

import asyncio
import math
import time
from datetime import datetime, timedelta
from multiprocessing import Pool

import pandas as pd
import telegram
import yfinance as yf

from config import *


def send(text):
    """Send ``text`` to ``TELEGRAM_CHAT_ID`` using ``TELEGRAM_BOT_TOKEN``.

    A new bot client is created per call and the ``send_message`` coroutine
    is driven to completion with ``asyncio.run``.
    """
    client = telegram.Bot(token=TELEGRAM_BOT_TOKEN)
    asyncio.run(client.send_message(chat_id=TELEGRAM_CHAT_ID, text=text))


def send_telegram_message(message):
    """Deliver ``message`` via :func:`send` running in a worker process.

    Exceptions raised by :func:`send` in the worker are re-raised here by
    ``Pool.map``.
    """
    # NOTE(review): the worker process presumably isolates ``asyncio.run``
    # from the caller's interpreter state -- confirm before replacing it with
    # a direct call.
    # One worker is enough for one message; the ``with`` block tears the pool
    # down so worker processes are not leaked on every alert.
    with Pool(processes=1) as pool:
        pool.map(send, [message])


def calculate_bollinger_bands(data):
    """Add ``MA``, ``STD``, ``Upper`` and ``Lower`` columns to ``data``.

    The moving average and standard deviation are taken over
    ``BOLLINGER_PERIOD`` rows of ``Close``; the bands lie ``BOLLINGER_STD``
    standard deviations above and below the average.

    ``data`` is modified in place and also returned.
    """
    rolling_close = data['Close'].rolling(window=BOLLINGER_PERIOD)
    data['MA'] = rolling_close.mean()
    data['STD'] = rolling_close.std()
    data['Upper'] = data['MA'] + (BOLLINGER_STD * data['STD'])
    data['Lower'] = data['MA'] - (BOLLINGER_STD * data['STD'])
    return data


def _scalar(value):
    """Return ``value`` as a plain float, unwrapping a one-element Series."""
    # With MultiIndex columns a row lookup such as ``row['Close']`` yields a
    # one-element Series; with flat columns it is already a scalar.
    if isinstance(value, pd.Series):
        value = value.iloc[0]
    return float(value)


def check_bollinger_bands(symbol, data):
    """Locate the latest close relative to the Bollinger Bands.

    Args:
        symbol: Ticker symbol, copied into the result.
        data: Price frame already processed by
            :func:`calculate_bollinger_bands`.

    Returns:
        A dict with ``symbol``, ``price``, ``lower_band`` and ``distance``,
        where ``distance`` is 0.0 at the lower band and 1.0 at the upper
        band. ``None`` when ``data`` has fewer than ``BOLLINGER_PERIOD`` rows
        or the latest row does not give a usable (finite, non-zero-width)
        band.
    """
    if len(data) < BOLLINGER_PERIOD:
        return None

    latest = data.iloc[-1]
    upper_band = _scalar(latest['Upper'])
    lower_band = _scalar(latest['Lower'])
    current_price = _scalar(latest['Close'])

    band_width = upper_band - lower_band
    # A NaN or zero-width band would make the ratio NaN/inf (or raise), so
    # report "no result" instead of a meaningless distance.
    if (
        band_width == 0
        or not math.isfinite(band_width)
        or not math.isfinite(current_price)
    ):
        return None

    return {
        'symbol': symbol,
        'price': current_price,
        'lower_band': lower_band,
        'distance': (current_price - lower_band) / band_width,
    }


def get_stock_data(symbol, retries=3):
    """Download roughly the last 60 days of daily prices for ``symbol``.

    Args:
        symbol: Ticker passed to ``yf.download``.
        retries: Maximum number of download attempts.

    Returns:
        The downloaded frame, or ``None`` if every attempt failed or came
        back empty.
    """
    for attempt in range(retries):
        try:
            end = datetime.now()
            start = end - timedelta(days=60)
            # NOTE(review): yfinance documents ``end`` as exclusive, so the
            # current day's bar is likely not included -- confirm intended.
            data = yf.download(
                symbol,
                start=start.strftime('%Y-%m-%d'),
                end=end.strftime('%Y-%m-%d'),
                interval='1d',
                auto_adjust=True,
                progress=False,
            )
            if not data.empty:
                return data
            print(f"No data received for {symbol}, attempt {attempt + 1}")
            delay = 2
        except Exception as e:
            # Best-effort download: report the failure and try again.
            print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
            delay = 5

        # Only wait when another attempt is actually going to follow.
        if attempt < retries - 1:
            time.sleep(delay)
    return None


def sendAlertMsg(info, market="US"):
    """Format an alert for ``info`` and send it over Telegram.

    Args:
        info: Dict with ``name``, ``symbol``, ``price`` and ``distance``.
        market: ``"US"`` selects the dollar format; any other value selects
            the Korean format, which also strips ``.KS`` from the symbol.

    Sending errors are printed rather than raised so that one failed alert
    does not abort the monitoring run.
    """
    # NOTE(review): ``distance`` is a band-width fraction but is printed with
    # a ``%`` suffix without scaling -- confirm whether x100 is intended.
    if market == "US":
        message = "🔔 [US] {} ({}) 현재가: ${:.2f}, 근접도: {:.2f}%".format(
            info['name'], info['symbol'], info['price'], info['distance']
        )
    else:
        message = "🔔 [KR] {} ({}) 현재가: ₩{:.0f}, 근접도: {:.2f}%".format(
            info['name'],
            info['symbol'].replace('.KS', ''),
            info['price'],
            info['distance'],
        )

    try:
        send_telegram_message(message)
    except Exception as e:
        print(f"Error sending Telegram message: {str(e)}")


def _monitor_market(stocks, market):
    """Check every symbol in ``stocks`` and alert on threshold hits.

    Args:
        stocks: Mapping of ticker symbol to display name.
        market: Market tag forwarded to :func:`sendAlertMsg`.
    """
    for symbol, name in stocks.items():
        data = get_stock_data(symbol)
        if data is not None and not data.empty:
            try:
                data = calculate_bollinger_bands(data)
                info = check_bollinger_bands(symbol, data)
                if info is None:
                    # Too little history or an unusable band: nothing to
                    # report for this symbol.
                    print(f"Not enough data to evaluate {symbol}.")
                else:
                    info['name'] = name
                    print(" - {} ({}): {:.2f} ({:.2f})".format(
                        info['name'], symbol, info['price'], info['distance']
                    ))
                    if info['distance'] <= ALERT_THRESHOLD:
                        sendAlertMsg(info, market)
                        print(f"Alert generated for {symbol}")
            except Exception as e:
                # Keep going: a failure on one symbol must not stop the run.
                print(f"Error processing data for {symbol}: {str(e)}")
        else:
            print(f"Data for {symbol} is empty or None.")

        # Short pause between symbols to throttle download requests.
        time.sleep(0.5)


def monitor_stocks():
    """Run one monitoring pass over ``US_STOCKS`` and then ``KR_ETFS``."""
    # Monitor US stocks
    print("Monitoring US stocks...")
    _monitor_market(US_STOCKS, "US")

    # Monitor Korean ETFs
    print("\nMonitoring Korean ETFs...")
    _monitor_market(KR_ETFS, "KR")


if __name__ == "__main__":
    monitor_stocks()