import yfinance as yf import pandas as pd from datetime import datetime, timedelta import telegram import time import requests import json import asyncio from multiprocessing import Pool from config import * def send(text): client = telegram.Bot(token=TELEGRAM_BOT_TOKEN) asyncio.run(client.send_message(chat_id=TELEGRAM_CHAT_ID, text=text)) return def send_telegram_message(message): pool = Pool(12) pool.map(send, [message]) def calculate_bollinger_bands(data): data['MA'] = data['Close'].rolling(window=BOLLINGER_PERIOD).mean() data['STD'] = data['Close'].rolling(window=BOLLINGER_PERIOD).std() data['Upper'] = data['MA'] + (BOLLINGER_STD * data['STD']) data['Lower'] = data['MA'] - (BOLLINGER_STD * data['STD']) return data def check_bollinger_bands(symbol, data): if len(data) < BOLLINGER_PERIOD: return None latest = data.iloc[-1] if isinstance(latest['Upper'], float): upper_band = latest['Upper'] lower_band = latest['Lower'] current_price = latest['Close'] else: upper_band = latest['Upper'].iloc[0] lower_band = latest['Lower'].iloc[0] current_price = latest['Close'].iloc[0] distance = (current_price - lower_band) / (upper_band - lower_band) return { 'symbol': symbol, 'price': current_price, 'lower_band': lower_band, 'distance': distance } def get_coin_data(symbol, retries=3): for attempt in range(retries): try: url = ('https://api.bithumb.com/v1/candles/days?market=KRW-{}&count=100').format(symbol) headers = {"accept": "application/json"} response = requests.get(url, headers=headers) json_data = json.loads(response.text) df_temp = pd.DataFrame(json_data) df_temp = df_temp.sort_index(ascending=False) if 'candle_date_time_kst' not in df_temp: return None data = pd.DataFrame() # data.columns = ['datetime', 'open', 'close', 'high', 'low', 'volume'] # data['datetime'] = pd.to_datetime(data_temp['candle_date_time_kst']) data['datetime'] = pd.to_datetime(df_temp['candle_date_time_kst'], format='%Y-%m-%dT%H:%M:%S') data['Open'] = df_temp['opening_price'] data['Close'] = df_temp['trade_price'] data['High'] = df_temp['high_price'] data['Low'] = df_temp['low_price'] data['Volume'] = df_temp['candle_acc_trade_volume'] data = data.set_index('datetime') data = data.astype(float) data["datetime"] = data.index if not data.empty: return data print(f"No data received for {symbol}, attempt {attempt + 1}") time.sleep(0.5) except Exception as e: print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}") if attempt < retries - 1: time.sleep(5) continue return None def get_stock_data(symbol, retries=3): for attempt in range(retries): try: end = datetime.now() start = end - timedelta(days=60) data = yf.download( symbol, start=start.strftime('%Y-%m-%d'), end=end.strftime('%Y-%m-%d'), interval='1d', auto_adjust=True, progress=False ) if not data.empty: return data print(f"No data received for {symbol}, attempt {attempt + 1}") time.sleep(0.5) except Exception as e: print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}") if attempt < retries - 1: time.sleep(5) continue return None def sendAlertMsg(info, market="US"): if market == "US": message = "🔔 [US] {} ({}) 현재가: ${:.2f}, 근접도: {:.2f}%".format(info['name'], info['symbol'], info['price'], info['distance']) else: message = "🔔 [KR] {} ({}) 현재가: ₩{:.0f}, 근접도: {:.2f}%".format(info['name'], info['symbol'].replace('.KS', ''), info['price'], info['distance']) try: send_telegram_message(message) except Exception as e: print(f"Error sending Telegram message: {str(e)}") return def monitor_stocks(): # 미국 주식 모니터링 print("Monitoring US stocks...") for symbol in US_STOCKS: data = get_stock_data(symbol) if data is not None and not data.empty: try: data = calculate_bollinger_bands(data) info = check_bollinger_bands(symbol, data) info['name'] = US_STOCKS[symbol] print(" - {} ({}): {:.2f} ({:.2f})".format(info['name'], symbol, info['price'], info['distance'])) if info['distance'] <= ALERT_THRESHOLD: sendAlertMsg(info, "US") print(f"Alert generated for {symbol}") except Exception as e: print(f"Error processing data for {symbol}: {str(e)}") else: print(f"Data for {symbol} is empty or None.") time.sleep(0.5) # 한국 ETF 모니터링 print("\nMonitoring Korean ETFs...") for symbol in KR_ETFS: data = get_stock_data(symbol) if data is not None and not data.empty: try: data = calculate_bollinger_bands(data) info = check_bollinger_bands(symbol, data) info['name'] = KR_ETFS[symbol] print(" - {} ({}): {:.2f} ({:.2f})".format(info['name'], symbol, info['price'], info['distance'])) if info['distance'] <= ALERT_THRESHOLD: sendAlertMsg(info, "KR") print(f"Alert generated for {symbol}") except Exception as e: print(f"Error processing data for {symbol}: {str(e)}") else: print(f"Data for {symbol} is empty or None.") time.sleep(0.5) return def monitor_coins(): # 미국 주식 모니터링 print("Monitoring KR Coins...") for symbol in KR_COINS: data = get_coin_data(symbol) if data is not None and not data.empty: try: data = calculate_bollinger_bands(data) info = check_bollinger_bands(symbol, data) info['name'] = KR_COINS[symbol] print(" - {} ({}): {:.2f} ({:.2f})".format(info['name'], symbol, info['price'], info['distance'])) if info['distance'] <= ALERT_THRESHOLD: sendAlertMsg(info, "US") print(f"Alert generated for {symbol}") except Exception as e: print(f"Error processing data for {symbol}: {str(e)}") else: print(f"Data for {symbol} is empty or None.") time.sleep(0.5) return if __name__ == "__main__": monitor_coins() monitor_stocks()