Files
DeepCoin/stock_monitor.py
dsyoon 1fdbb8c783 init
2025-04-26 19:54:45 +09:00

212 lines
7.5 KiB
Python

import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
import telegram
import time
import requests
import json
import asyncio
from multiprocessing import Pool
import schedule
from config import *
def send(text):
client = telegram.Bot(token=TELEGRAM_BOT_TOKEN)
asyncio.run(client.send_message(chat_id=TELEGRAM_CHAT_ID, text=text))
return
def send_telegram_message(message):
pool = Pool(12)
pool.map(send, [message])
def calculate_bollinger_bands(data):
data['MA'] = data['Close'].rolling(window=BOLLINGER_PERIOD).mean()
data['STD'] = data['Close'].rolling(window=BOLLINGER_PERIOD).std()
data['Upper'] = data['MA'] + (BOLLINGER_STD * data['STD'])
data['Lower'] = data['MA'] - (BOLLINGER_STD * data['STD'])
return data
def check_bollinger_bands(symbol, data):
if len(data) < BOLLINGER_PERIOD:
return None
latest = data.iloc[-1]
if isinstance(latest['Upper'], float):
upper_band = latest['Upper']
lower_band = latest['Lower']
current_price = latest['Close']
else:
upper_band = latest['Upper'].iloc[0]
lower_band = latest['Lower'].iloc[0]
current_price = latest['Close'].iloc[0]
distance = (current_price - lower_band) / (upper_band - lower_band)
return {
'symbol': symbol,
'price': current_price,
'lower_band': lower_band,
'distance': distance
}
def get_coin_data(symbol, retries=3):
for attempt in range(retries):
try:
url = ('https://api.bithumb.com/v1/candles/days?market=KRW-{}&count=100').format(symbol)
headers = {"accept": "application/json"}
response = requests.get(url, headers=headers)
json_data = json.loads(response.text)
df_temp = pd.DataFrame(json_data)
df_temp = df_temp.sort_index(ascending=False)
if 'candle_date_time_kst' not in df_temp:
return None
data = pd.DataFrame()
# data.columns = ['datetime', 'open', 'close', 'high', 'low', 'volume']
# data['datetime'] = pd.to_datetime(data_temp['candle_date_time_kst'])
data['datetime'] = pd.to_datetime(df_temp['candle_date_time_kst'], format='%Y-%m-%dT%H:%M:%S')
data['Open'] = df_temp['opening_price']
data['Close'] = df_temp['trade_price']
data['High'] = df_temp['high_price']
data['Low'] = df_temp['low_price']
data['Volume'] = df_temp['candle_acc_trade_volume']
data = data.set_index('datetime')
data = data.astype(float)
data["datetime"] = data.index
if not data.empty:
return data
print(f"No data received for {symbol}, attempt {attempt + 1}")
time.sleep(0.5)
except Exception as e:
print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
if attempt < retries - 1:
time.sleep(5)
continue
return None
def get_stock_data(symbol, retries=3):
for attempt in range(retries):
try:
end = datetime.now()
start = end - timedelta(days=60)
data = yf.download(
symbol,
start=start.strftime('%Y-%m-%d'),
end=end.strftime('%Y-%m-%d'),
interval='1d',
auto_adjust=True,
progress=False
)
if not data.empty:
return data
print(f"No data received for {symbol}, attempt {attempt + 1}")
time.sleep(0.5)
except Exception as e:
print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
if attempt < retries - 1:
time.sleep(5)
continue
return None
def sendAlertMsg(info, market="US"):
if market == "US":
message = "🔔 [US] {} ({}) 현재가: ${:.2f}, 근접도: {:.2f}%".format(info['name'], info['symbol'], info['price'], info['distance'])
else:
message = "🔔 [KR] {} ({}) 현재가: ₩{:.0f}, 근접도: {:.2f}%".format(info['name'], info['symbol'].replace('.KS', ''), info['price'], info['distance'])
try:
send_telegram_message(message)
except Exception as e:
print(f"Error sending Telegram message: {str(e)}")
return
def monitor_us_stocks():
# 미국 주식 모니터링
print("Monitoring US stocks...")
for symbol in US_STOCKS:
data = get_stock_data(symbol)
if data is not None and not data.empty:
try:
data = calculate_bollinger_bands(data)
info = check_bollinger_bands(symbol, data)
info['name'] = US_STOCKS[symbol]
print(" - {} ({}): {:.2f} ({:.2f})".format(info['name'], symbol, info['price'], info['distance']))
if info['distance'] <= ALERT_THRESHOLD:
sendAlertMsg(info, "US")
print(f"Alert generated for {symbol}")
except Exception as e:
print(f"Error processing data for {symbol}: {str(e)}")
else:
print(f"Data for {symbol} is empty or None.")
time.sleep(0.5)
return
def monitor_kr_stocks():
# 한국 ETF 모니터링
print("\nMonitoring Korean ETFs...")
for symbol in KR_ETFS:
data = get_stock_data(symbol)
if data is not None and not data.empty:
try:
data = calculate_bollinger_bands(data)
info = check_bollinger_bands(symbol, data)
info['name'] = KR_ETFS[symbol]
print(" - {} ({}): {:.2f} ({:.2f})".format(info['name'], symbol, info['price'], info['distance']))
if info['distance'] <= ALERT_THRESHOLD:
sendAlertMsg(info, "KR")
print(f"Alert generated for {symbol}")
except Exception as e:
print(f"Error processing data for {symbol}: {str(e)}")
else:
print(f"Data for {symbol} is empty or None.")
time.sleep(0.5)
return
def monitor_coins():
# 미국 주식 모니터링
print("Monitoring KR Coins...")
for symbol in KR_COINS:
data = get_coin_data(symbol)
if data is not None and not data.empty:
try:
data = calculate_bollinger_bands(data)
info = check_bollinger_bands(symbol, data)
info['name'] = KR_COINS[symbol]
print(" - {} ({}): {:.2f} ({:.2f})".format(info['name'], symbol, info['price'], info['distance']))
if info['distance'] <= ALERT_THRESHOLD:
sendAlertMsg(info, "US")
print(f"Alert generated for {symbol}")
except Exception as e:
print(f"Error processing data for {symbol}: {str(e)}")
else:
print(f"Data for {symbol} is empty or None.")
time.sleep(0.5)
return
def run_schedule():
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == "__main__":
print("Starting monitoring system...")
# 스케줄 설정
schedule.every().hour.at(":01").do(monitor_coins) # 매시 1분에 실행
schedule.every().day.at("17:20").do(monitor_us_stocks) # 매일 저녁 5시 20분에 실행
schedule.every().day.at("08:00").do(monitor_kr_stocks) # 매일 오전 8시에 실행
# 초기 실행
print("Running initial checks...")
monitor_coins()
monitor_us_stocks()
monitor_kr_stocks()
# 스케줄러 실행
print("Starting scheduler...")
run_schedule()