Files
DeepCoin/stock_monitor.py
dsyoon b2cc4542a4 init
2025-05-07 12:11:37 +09:00

395 lines
14 KiB
Python

import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
import telegram
import time
import requests
import json
import asyncio
from multiprocessing import Pool
import schedule
from config import *
def send_coin_msg(text):
    """Deliver ``text`` to the coin Telegram chat.

    A bot is created from ``COIN_TELEGRAM_BOT_TOKEN`` and the
    ``send_message`` call is driven to completion with ``asyncio.run``,
    so this function blocks until the send has finished.

    Args:
        text: Message body to post to ``COIN_TELEGRAM_CHAT_ID``.
    """
    bot = telegram.Bot(token=COIN_TELEGRAM_BOT_TOKEN)
    delivery = bot.send_message(chat_id=COIN_TELEGRAM_CHAT_ID, text=text)
    asyncio.run(delivery)
def send_coin_telegram_message(message_list, header):
    """Send coin alerts to Telegram in batches of at most 20 messages.

    Messages are concatenated as-is (no separator is inserted between
    them); the first batch is prefixed with ``header`` and a newline.
    Every batch is delivered by ``send_coin_msg`` running in a worker
    process. Nothing is sent when ``message_list`` is empty.

    Args:
        message_list: List of pre-formatted alert strings.
        header: Title line placed at the top of the first batch.
    """
    if not message_list:
        return

    batch_size = 20
    # Slice by index instead of testing ``i + 1 % 20``: that expression
    # parses as ``i + (1 % 20)``, so a batch boundary was never detected and
    # lists whose length was a multiple of 20 were silently dropped.
    batches = [
        "".join(message_list[start:start + batch_size])
        for start in range(0, len(message_list), batch_size)
    ]
    batches[0] = header + "\n" + batches[0]

    # One worker keeps the batches in order; the context manager releases
    # the pool's processes once every batch has been delivered.
    with Pool(1) as pool:
        pool.map(send_coin_msg, batches)
def send_stock_msg(text):
    """Deliver ``text`` to the stock Telegram chat.

    A bot is created from ``STOCK_TELEGRAM_BOT_TOKEN`` and the
    ``send_message`` call is driven to completion with ``asyncio.run``,
    so this function blocks until the send has finished.

    Args:
        text: Message body to post to ``STOCK_TELEGRAM_CHAT_ID``.
    """
    bot = telegram.Bot(token=STOCK_TELEGRAM_BOT_TOKEN)
    delivery = bot.send_message(chat_id=STOCK_TELEGRAM_CHAT_ID, text=text)
    asyncio.run(delivery)
def send_stock_telegram_message(message_list, header):
    """Send stock alerts to Telegram in batches of at most 20 messages.

    Messages are concatenated as-is (no separator is inserted between
    them); the first batch is prefixed with ``header`` and a newline.
    Every batch is delivered by ``send_stock_msg`` running in a worker
    process. Nothing is sent when ``message_list`` is empty.

    Args:
        message_list: List of pre-formatted alert strings.
        header: Title line placed at the top of the first batch.
    """
    if not message_list:
        return

    batch_size = 20
    # Slice by index instead of testing ``i+1 % 20``: that expression
    # parses as ``i + (1 % 20)``, so a batch boundary was never detected and
    # lists whose length was a multiple of 20 were silently dropped.
    batches = [
        "".join(message_list[start:start + batch_size])
        for start in range(0, len(message_list), batch_size)
    ]
    batches[0] = header + "\n" + batches[0]

    # One worker keeps the batches in order; the context manager releases
    # the pool's processes once every batch has been delivered.
    with Pool(1) as pool:
        pool.map(send_stock_msg, batches)
def calculate_bollinger_bands(data):
    """Add Bollinger Band columns to ``data`` in place and return it.

    Columns written: ``MA`` (rolling mean of ``Close``), ``STD`` (rolling
    standard deviation of ``Close``), and ``Upper`` / ``Lower`` (the mean
    plus / minus ``BOLLINGER_STD`` standard deviations). The rolling window
    length is ``BOLLINGER_PERIOD``.

    Args:
        data: DataFrame with a ``Close`` column.

    Returns:
        The same DataFrame object, with the four columns added.
    """
    close_window = data['Close'].rolling(window=BOLLINGER_PERIOD)
    data['MA'] = close_window.mean()
    data['STD'] = close_window.std()

    band_offset = BOLLINGER_STD * data['STD']
    data['Upper'] = data['MA'] + band_offset
    data['Lower'] = data['MA'] - band_offset
    return data
def calculate_technical_indicators(data):
    """Add the indicator columns used for buy-signal detection.

    On top of the Bollinger Band columns from ``calculate_bollinger_bands``
    this writes ``RSI``, ``MACD``, ``Signal``, ``MA5``, ``MA20``, ``MA60``
    and ``Volume_MA5`` into ``data``.

    Args:
        data: DataFrame with ``Close`` and ``Volume`` columns.

    Returns:
        The DataFrame with all indicator columns added.
    """
    # Bollinger Bands.
    data = calculate_bollinger_bands(data)

    close = data['Close']

    # RSI from 14-row simple rolling means of gains and losses.
    change = close.diff()
    avg_gain = (change.where(change > 0, 0)).rolling(window=14).mean()
    avg_loss = (-change.where(change < 0, 0)).rolling(window=14).mean()
    relative_strength = avg_gain / avg_loss
    data['RSI'] = 100 - (100 / (1 + relative_strength))

    # MACD: 12-span EMA minus 26-span EMA, with a 9-span EMA signal line.
    fast_ema = close.ewm(span=12, adjust=False).mean()
    slow_ema = close.ewm(span=26, adjust=False).mean()
    data['MACD'] = fast_ema - slow_ema
    data['Signal'] = data['MACD'].ewm(span=9, adjust=False).mean()

    # Simple moving averages of the close.
    for window in (5, 20, 60):
        data[f'MA{window}'] = close.rolling(window=window).mean()

    # Moving average of traded volume.
    data['Volume_MA5'] = data['Volume'].rolling(window=5).mean()
    return data
def _row_value(row, column, is_flat):
    """Return ``row[column]`` as a scalar.

    When ``is_flat`` is False each row entry is itself a Series and its
    first element is used.
    """
    value = row[column]
    return value if is_flat else value.iloc[0]


def check_buy_signals(symbol, data):
    """Evaluate the five buy signals on the most recent row of ``data``.

    Signals: price close to the lower Bollinger Band, RSI below 30, MACD
    crossing above its signal line, MA5 crossing above MA20, and volume
    more than 1.5 times its 5-row average.

    Args:
        symbol: Identifier echoed back in the result.
        data: DataFrame carrying the columns written by
            ``calculate_technical_indicators``.

    Returns:
        None when ``data`` has fewer than 60 rows. Otherwise a dict with
        ``symbol``, ``price``, ``lower_band``, ``distance``, ``rsi``,
        ``macd``, ``signal_line``, ``buy_signals`` (per-signal booleans),
        ``signal_count`` and ``buy`` (True when at least two signals fire).
    """
    if len(data) < 60:  # at least 60 rows of history are required
        return None

    latest = data.iloc[-1]
    prev = data.iloc[-2]

    # Row entries are either plain floats or one-element Series
    # (presumably from multi-level columns returned by yfinance - verify).
    # The check on 'Upper' decides how every field is read.
    is_flat = isinstance(latest['Upper'], float)

    # Bollinger Band signal.
    upper_band = _row_value(latest, 'Upper', is_flat)
    lower_band = _row_value(latest, 'Lower', is_flat)
    current_price = _row_value(latest, 'Close', is_flat)
    band_width = upper_band - lower_band
    if band_width == 0:
        # Collapsed bands give no usable position inside the band; skip the
        # division rather than fail on it.
        distance = float('nan')
        bb_signal = False
    else:
        distance = (current_price - lower_band) / band_width
        bb_signal = distance < BOLLINGER_THRESHOLD

    rsi = _row_value(latest, 'RSI', is_flat)
    macd = _row_value(latest, 'MACD', is_flat)
    signal_line = _row_value(latest, 'Signal', is_flat)

    # RSI oversold signal (RSI < 30).
    rsi_signal = rsi < 30
    # MACD signal: MACD crosses above the signal line.
    macd_signal = (
        _row_value(prev, 'MACD', is_flat) < _row_value(prev, 'Signal', is_flat)
    ) and (macd > signal_line)
    # Moving-average golden cross: MA5 moves from below MA20 to at/above it.
    ma_signal = (
        _row_value(prev, 'MA5', is_flat) < _row_value(prev, 'MA20', is_flat)
    ) and (
        _row_value(latest, 'MA5', is_flat) >= _row_value(latest, 'MA20', is_flat)
    )
    # Volume surge: more than 150% of the 5-row average.
    volume_signal = _row_value(latest, 'Volume', is_flat) > (
        _row_value(latest, 'Volume_MA5', is_flat) * 1.5
    )

    buy_signals = {
        'bb_signal': bb_signal,
        'rsi_signal': rsi_signal,
        'macd_signal': macd_signal,
        'ma_signal': ma_signal,
        'volume_signal': volume_signal
    }
    # A buy is flagged when at least two signals fire together.
    signal_count = sum(1 for signal in buy_signals.values() if signal)
    return {
        'symbol': symbol,
        'price': current_price,
        'lower_band': lower_band,
        'distance': distance,
        'rsi': rsi,
        'macd': macd,
        'signal_line': signal_line,
        'buy_signals': buy_signals,
        'signal_count': signal_count,
        'buy': signal_count >= 2
    }
def format_message(info, market_type):
    """Render one alert line (plus signal details) for Telegram.

    Args:
        info: Result dict from ``check_buy_signals`` with a ``name`` key
            added by the caller.
        market_type: Market tag shown in brackets; ``'US'`` additionally
            prefixes the price with ``$``.

    Returns:
        The formatted message text.
    """
    currency = '$' if market_type == 'US' else ''
    signals = info['buy_signals']

    pieces = []
    if info['buy']:
        pieces.append('🛒 ')
    pieces.append(f"[{market_type}] {info['name']} ({info['symbol']}) ")
    pieces.append(f"현재가: {currency}{info['price']:.2f}\n")

    # Detail entries for the signals that fired, in a fixed order.
    details = []
    if any(signals.values()):
        if signals['bb_signal']:
            details.append(
                "- 볼린저 밴드 하단 근접 (근접도: {:.1f}%),".format(info['distance'] * 100)
            )
        if signals['rsi_signal']:
            details.append(f"- RSI 과매도 구간 (RSI: {info['rsi']:.1f}),")
        if signals['macd_signal']:
            details.append("- MACD 골든크로스,")
        if signals['ma_signal']:
            details.append("- 이동평균선 골든크로스,")
        if signals['volume_signal']:
            details.append("- 거래량 급증")
        pieces.append("📊신호 ({count}):")
        pieces.extend(details)
        pieces.append("\n")

    # The placeholder is filled in on the finished text, once the number of
    # fired signals is known.
    return "".join(pieces).replace("{count}", str(len(details)))
def get_coin_data(symbol, retries=3, timeout=10):
    """Fetch 240-minute KRW candles for ``symbol`` from the Bithumb API.

    Args:
        symbol: Coin ticker; the request is made for market ``KRW-<symbol>``.
        retries: Maximum number of attempts.
        timeout: Seconds to wait for the HTTP response before the attempt
            counts as failed.

    Returns:
        A float DataFrame indexed by candle time with columns ``Open``,
        ``Close``, ``High``, ``Low`` and ``Volume``, plus a ``datetime``
        column mirroring the index. None when the response has no
        ``candle_date_time_kst`` field or every attempt fails.
    """
    for attempt in range(retries):
        try:
            url = "https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=3000".format(240, symbol)
            headers = {"accept": "application/json"}
            # Without a timeout a stalled connection would block the caller
            # indefinitely; a timeout error is retried like any other failure.
            response = requests.get(url, headers=headers, timeout=timeout)
            json_data = response.json()
            df_temp = pd.DataFrame(json_data)
            # Reverse the row order (presumably the API lists the newest
            # candle first - verify against the API documentation).
            df_temp = df_temp.sort_index(ascending=False)
            if 'candle_date_time_kst' not in df_temp:
                return None
            data = pd.DataFrame()
            data['datetime'] = pd.to_datetime(df_temp['candle_date_time_kst'], format='%Y-%m-%dT%H:%M:%S')
            data['Open'] = df_temp['opening_price']
            data['Close'] = df_temp['trade_price']
            data['High'] = df_temp['high_price']
            data['Low'] = df_temp['low_price']
            data['Volume'] = df_temp['candle_acc_trade_volume']
            data = data.set_index('datetime')
            data = data.astype(float)
            # Added after the float cast so the timestamps are not converted.
            data["datetime"] = data.index
            if not data.empty:
                return data
            print(f"No data received for {symbol}, attempt {attempt + 1}")
            time.sleep(0.5)
        except Exception as e:
            print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
            if attempt < retries - 1:
                time.sleep(5)
    return None
def get_stock_data(symbol, retries=3):
    """Download daily price bars for ``symbol`` through yfinance.

    The request spans the 300 days up to the current date, with
    ``auto_adjust`` enabled.

    Args:
        symbol: Ticker passed to ``yf.download``.
        retries: Maximum number of attempts.

    Returns:
        The downloaded DataFrame, or None when every attempt came back
        empty or raised.
    """
    for attempt in range(retries):
        attempt_no = attempt + 1
        try:
            today = datetime.now()
            window_start = today - timedelta(days=300)
            frame = yf.download(
                symbol,
                start=window_start.strftime('%Y-%m-%d'),
                end=today.strftime('%Y-%m-%d'),
                interval='1d',
                auto_adjust=True,
                progress=False,
            )
            if frame.empty:
                print(f"No data received for {symbol}, attempt {attempt_no}")
                time.sleep(0.5)
            else:
                return frame
        except Exception as exc:
            print(f"Attempt {attempt_no} failed for {symbol}: {str(exc)}")
            # Back off before retrying, except after the final attempt.
            if attempt_no < retries:
                time.sleep(5)
    return None
def monitor_us_stocks():
    """Scan every symbol in ``US_STOCKS`` and alert on buy signals.

    For each symbol the price history is downloaded, indicators are
    computed and the buy signals evaluated. Symbols flagged as a buy are
    formatted and sent together as one Telegram notification headed
    ``[US-STOCK]``. Failures for a single symbol are logged and do not stop
    the scan.
    """
    message_list = []
    print("US Stocks {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    for symbol in US_STOCKS:
        data = get_stock_data(symbol)
        if data is not None and not data.empty:
            try:
                data = calculate_technical_indicators(data)
                info = check_buy_signals(symbol, data)
                if info is None:
                    # check_buy_signals returns None when the history is too
                    # short; report that instead of failing on info['name'].
                    print(f"Not enough data for {symbol}, skipping.")
                else:
                    info['name'] = US_STOCKS[symbol]
                    print(f" - {info['name']} ({symbol}): {info['price']:.2f} -> {info['signal_count']}")
                    if info['buy']:
                        message_list.append(format_message(info, 'US'))
            except Exception as e:
                print(f"Error processing data for {symbol}: {str(e)}")
        else:
            print(f"Data for {symbol} is empty or None.")
        # Short pause between symbols.
        time.sleep(0.5)
    if message_list:
        try:
            send_stock_telegram_message(message_list, header="[US-STOCK]")
        except Exception as e:
            print(f"Error sending Telegram message: {str(e)}")
def monitor_kr_stocks():
    """Scan every symbol in ``KR_ETFS`` and alert on buy signals.

    For each symbol the price history is downloaded, indicators are
    computed and the buy signals evaluated. Symbols flagged as a buy are
    formatted and sent together as one Telegram notification headed
    ``[KR-STOCK]``. Failures for a single symbol are logged and do not stop
    the scan.
    """
    message_list = []
    print("KR ETFs {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    for symbol in KR_ETFS:
        data = get_stock_data(symbol)
        if data is not None and not data.empty:
            try:
                data = calculate_technical_indicators(data)
                info = check_buy_signals(symbol, data)
                if info is None:
                    # check_buy_signals returns None when the history is too
                    # short; report that instead of failing on info['name'].
                    print(f"Not enough data for {symbol}, skipping.")
                else:
                    info['name'] = KR_ETFS[symbol]
                    print(f" - {info['name']} ({symbol}): {info['price']:.2f} -> {info['signal_count']}")
                    if info['buy']:
                        message_list.append(format_message(info, 'KR'))
            except Exception as e:
                print(f"Error processing data for {symbol}: {str(e)}")
        else:
            print(f"Data for {symbol} is empty or None.")
        # Short pause between symbols.
        time.sleep(0.5)
    if message_list:
        try:
            send_stock_telegram_message(message_list, header="[KR-STOCK]")
        except Exception as e:
            print(f"Error sending Telegram message: {str(e)}")
def monitor_coins():
    """Scan every symbol in ``KR_COINS`` and alert on buy signals.

    For each symbol the candle history is fetched, indicators are computed
    and the buy signals evaluated. Symbols flagged as a buy are formatted
    and sent together as one Telegram notification headed ``[KRW-COIN]``.
    Failures for a single symbol are logged and do not stop the scan.
    """
    message_list = []
    print("KRW COINs {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    for symbol in KR_COINS:
        data = get_coin_data(symbol)
        if data is not None and not data.empty:
            try:
                data = calculate_technical_indicators(data)
                info = check_buy_signals(symbol, data)
                if info is None:
                    # check_buy_signals returns None when the history is too
                    # short; report that instead of failing on info['name'].
                    print(f"Not enough data for {symbol}, skipping.")
                else:
                    info['name'] = KR_COINS[symbol]
                    print(f" - {info['name']} ({symbol}): {info['price']:.2f} -> {info['signal_count']}")
                    if info['buy']:
                        message_list.append(format_message(info, 'KR'))
            except Exception as e:
                print(f"Error processing data for {symbol}: {str(e)}")
        else:
            print(f"Data for {symbol} is empty or None.")
        # Short pause between symbols.
        time.sleep(0.5)
    if message_list:
        try:
            send_coin_telegram_message(message_list, header="[KRW-COIN]")
        except Exception as e:
            print(f"Error sending Telegram message: {str(e)}")
def run_schedule():
    """Register all monitoring jobs and run the scheduler loop forever.

    Jobs registered:
      * ``monitor_coins`` every hour at minutes :04 and :34,
      * ``monitor_us_stocks`` every day at 16:30, 23:30 and 05:10,
      * ``monitor_kr_stocks`` every day at 18:20 and 07:10.

    This function never returns; it polls for due jobs once per second.
    """
    # Coin monitoring: twice per hour.
    for minute in (4, 34):
        schedule.every().hour.at(f":{minute:02d}").do(monitor_coins)

    # Daily jobs, registered in the order listed.
    daily_jobs = (
        (monitor_us_stocks, ("16:30", "23:30", "05:10")),
        (monitor_kr_stocks, ("18:20", "07:10")),
    )
    for job, run_times in daily_jobs:
        for run_time in run_times:
            schedule.every().day.at(run_time).do(job)

    print("Scheduler started. Monitoring will run at specified times.")
    while True:
        schedule.run_pending()
        time.sleep(1)
if __name__ == "__main__":
    # Start the blocking scheduler loop only when run as a script.
    run_schedule()