Files
DeepCoin/stock_monitor.py
dsyoon 0f93538112 init
2025-07-20 19:48:44 +09:00

424 lines
15 KiB
Python

import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
import telegram
import time
import requests
import json
import asyncio
from multiprocessing import Pool
import schedule
from config import *
import FinanceDataReader as fdr
def send_coin_msg(text):
coin_client = telegram.Bot(token=COIN_TELEGRAM_BOT_TOKEN)
asyncio.run(coin_client.send_message(chat_id=COIN_TELEGRAM_CHAT_ID, text=text))
return
def send_coin_telegram_message(message_list, header):
pStr = header + "\n"
for i, message in enumerate(message_list):
pStr += message
if i + 1 % 20 == 0:
pool = Pool(12)
pool.map(send_coin_msg, [pStr])
pStr = ''
if len(message_list) % 20 != 0:
pool = Pool(12)
pool.map(send_coin_msg, [pStr])
return
def send_stock_msg(text):
stock_client = telegram.Bot(token=STOCK_TELEGRAM_BOT_TOKEN)
asyncio.run(stock_client.send_message(chat_id=STOCK_TELEGRAM_CHAT_ID, text=text))
return
def send_stock_telegram_message(message_list, header):
pStr = header+"\n"
for i, message in enumerate(message_list):
pStr += message
if i+1 % 20 == 0:
pool = Pool(12)
pool.map(send_stock_msg, [pStr])
pStr = ''
if len(message_list) % 20 != 0:
pool = Pool(12)
pool.map(send_stock_msg, [pStr])
return
def calculate_bollinger_bands(data):
data['MA'] = data['Close'].rolling(window=BOLLINGER_PERIOD).mean()
data['STD'] = data['Close'].rolling(window=BOLLINGER_PERIOD).std()
data['Upper'] = data['MA'] + (BOLLINGER_STD * data['STD'])
data['Lower'] = data['MA'] - (BOLLINGER_STD * data['STD'])
return data
def calculate_technical_indicators(data):
# 볼린저 밴드 계산
data = calculate_bollinger_bands(data)
# RSI 계산 (14일 기준)
delta = data['Close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
data['RSI'] = 100 - (100 / (1 + rs))
# MACD 계산
exp1 = data['Close'].ewm(span=12, adjust=False).mean()
exp2 = data['Close'].ewm(span=26, adjust=False).mean()
data['MACD'] = exp1 - exp2
data['Signal'] = data['MACD'].ewm(span=9, adjust=False).mean()
# 이동평균선
data['MA5'] = data['Close'].rolling(window=5).mean()
data['MA20'] = data['Close'].rolling(window=20).mean()
data['MA60'] = data['Close'].rolling(window=60).mean()
# 거래량 이동평균
data['Volume_MA5'] = data['Volume'].rolling(window=5).mean()
return data
def check_buy_signals(symbol, data):
if len(data) < 60: # 최소 60일치 데이터 필요
return None
latest = data.iloc[-1]
prev = data.iloc[-2]
# 볼린저 밴드 신호
bb_signal = False
if isinstance(latest['Upper'], float):
upper_band = latest['Upper']
lower_band = latest['Lower']
current_price = latest['Close']
else:
upper_band = latest['Upper'].iloc[0]
lower_band = latest['Lower'].iloc[0]
current_price = latest['Close'].iloc[0]
distance = (current_price - lower_band) / (upper_band - lower_band)
bb_signal = distance < BOLLINGER_THRESHOLD
# U자 반등 후 이전 고점 돌파 여부 계산 (BREAKOUT)
breakout_signal = False
if len(data) >= BREAKOUT_LOOKBACK + 1:
window_close = data['Close'].iloc[-BREAKOUT_LOOKBACK-1:-1]
prev_high = window_close.max()
prev_low = window_close.min()
# 가격이 충분히 내려갔다가(BUY_THRESHOLD 비율) 다시 이전 고점을 돌파하면 breakout으로 간주
if prev_high > 0 and (prev_high - prev_low) / prev_high > BUY_THRESHOLD and current_price > prev_high:
breakout_signal = True
# RSI 과매도 신호 (RSI < 30)
if not isinstance(latest['Upper'], float):
rsi_signal = latest['RSI'].iloc[0] < 30
# MACD 신호 (MACD가 시그널 라인을 상향 돌파)
macd_signal = (prev['MACD'].iloc[0] < prev['Signal'].iloc[0]) and (latest['MACD'].iloc[0] > latest['Signal'].iloc[0])
# 이동평균선 골든크로스 임박 또는 발생
ma_signal = (prev['MA5'].iloc[0] < prev['MA20'].iloc[0]) and (latest['MA5'].iloc[0] >= latest['MA20'].iloc[0])
# 거래량 증가 신호 (5일 평균 대비 150% 이상)
volume_signal = latest['Volume'].iloc[0] > (latest['Volume_MA5'].iloc[0] * 1.5)
# 종합 신호
buy_signals = {
'bb_signal': bb_signal,
'rsi_signal': rsi_signal,
'macd_signal': macd_signal,
'ma_signal': ma_signal,
'volume_signal': volume_signal,
'breakout_signal': breakout_signal
}
# 최소 3개 이상의 신호가 동시에 발생할 때 매수 신호로 간주
signal_count = sum(1 for signal in buy_signals.values() if signal)
return {
'symbol': symbol,
'price': current_price,
'lower_band': lower_band,
'distance': distance,
'rsi': latest['RSI'].iloc[0],
'macd': latest['MACD'].iloc[0],
'signal_line': latest['Signal'].iloc[0],
'buy_signals': buy_signals,
'signal_count': signal_count,
'buy': breakout_signal or ((bb_signal and rsi_signal) or (signal_count >= 2 and (bb_signal or rsi_signal)))
}
else:
rsi_signal = latest['RSI'] < 30
# MACD 신호 (MACD가 시그널 라인을 상향 돌파)
macd_signal = (prev['MACD'] < prev['Signal']) and (latest['MACD'] > latest['Signal'])
# 이동평균선 골든크로스 임박 또는 발생
ma_signal = (prev['MA5'] < prev['MA20']) and (latest['MA5'] >= latest['MA20'])
# 거래량 증가 신호 (5일 평균 대비 150% 이상)
volume_signal = latest['Volume'] > (latest['Volume_MA5'] * 1.5)
# 종합 신호
buy_signals = {
'bb_signal': bb_signal,
'rsi_signal': rsi_signal,
'macd_signal': macd_signal,
'ma_signal': ma_signal,
'volume_signal': volume_signal,
'breakout_signal': breakout_signal
}
# 최소 3개 이상의 신호가 동시에 발생할 때 매수 신호로 간주
signal_count = sum(1 for signal in buy_signals.values() if signal)
return {
'symbol': symbol,
'price': current_price,
'lower_band': lower_band,
'distance': distance,
'rsi': latest['RSI'],
'macd': latest['MACD'],
'signal_line': latest['Signal'],
'buy_signals': buy_signals,
'signal_count': signal_count,
'buy': breakout_signal or ((bb_signal and rsi_signal) or (signal_count >= 2 and (bb_signal or rsi_signal)))
}
def format_message(info, market_type):
message = ""
if info['buy']:
message += '🛒 '
message += f"[{market_type}] {info['name']} ({info['symbol']}) "
message += f"현재가: {'$' if market_type == 'US' else ''}{info['price']:.2f}, "
# 매수 신호 상세 정보
count = 0
if any(info['buy_signals'].values()):
message += "📊신호 ({count}):"
if info['buy_signals']['bb_signal']:
message += "- 볼린저 밴드 하단 근접 (근접도: {:.1f}%),".format(info['distance'] * 100)
count += 1
if info['buy_signals']['rsi_signal']:
message += f"- RSI 과매도 구간 (RSI: {info['rsi']:.1f}),"
count += 1
if info['buy_signals']['macd_signal']:
message += "- MACD 골든크로스,"
count += 1
if info['buy_signals']['ma_signal']:
message += "- 이동평균선 골든크로스,"
count += 1
if info['buy_signals']['volume_signal']:
message += "- 거래량 급증"
count += 1
if info['buy_signals'].get('breakout_signal'):
message += "- U자 반등 돌파"
count += 1
message += "\n"
message = message.replace("{count}", str(count))
return message
def get_coin_data(symbol, retries=3):
for attempt in range(retries):
try:
url = "https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=3000".format(240, symbol)
headers = {"accept": "application/json"}
response = requests.get(url, headers=headers)
json_data = json.loads(response.text)
df_temp = pd.DataFrame(json_data)
df_temp = df_temp.sort_index(ascending=False)
if 'candle_date_time_kst' not in df_temp:
return None
data = pd.DataFrame()
# data.columns = ['datetime', 'open', 'close', 'high', 'low', 'volume']
# data['datetime'] = pd.to_datetime(data_temp['candle_date_time_kst'])
data['datetime'] = pd.to_datetime(df_temp['candle_date_time_kst'], format='%Y-%m-%dT%H:%M:%S')
data['Open'] = df_temp['opening_price']
data['Close'] = df_temp['trade_price']
data['High'] = df_temp['high_price']
data['Low'] = df_temp['low_price']
data['Volume'] = df_temp['candle_acc_trade_volume']
data = data.set_index('datetime')
data = data.astype(float)
data["datetime"] = data.index
if not data.empty:
return data
print(f"No data received for {symbol}, attempt {attempt + 1}")
time.sleep(0.5)
except Exception as e:
print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
if attempt < retries - 1:
time.sleep(5)
continue
return None
def get_kr_stock_data(symbol, retries=3):
for attempt in range(retries):
try:
end = datetime.now()
start = end - timedelta(days=300)
# FinanceDataReader를 사용하여 한국 주식 데이터 가져오기
data = fdr.DataReader(symbol, start.strftime('%Y-%m-%d'), end.strftime('%Y-%m-%d'))
if not data.empty:
# FinanceDataReader의 컬럼명을 yfinance 형식으로 변환
data = data.rename(columns={
'Open': 'Open',
'High': 'High',
'Low': 'Low',
'Close': 'Close',
'Volume': 'Volume'
})
return data
print(f"No data received for {symbol}, attempt {attempt + 1}")
time.sleep(2)
except Exception as e:
print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
if attempt < retries - 1:
time.sleep(5)
continue
return None
def monitor_us_stocks():
message_list = []
print("US Stocks {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
for symbol in US_STOCKS:
data = get_kr_stock_data(symbol)
if data is not None and not data.empty:
try:
data = calculate_technical_indicators(data)
info = check_buy_signals(symbol, data)
info['name'] = US_STOCKS[symbol]
print(f" - {info['name']} ({symbol}): {info['price']:.2f} -> {info['signal_count']}")
if info['buy']:
#if info['buy'] or any(info['buy_signals'].values()):
message_list.append(format_message(info, 'US'))
except Exception as e:
print(f"Error processing data for {symbol}: {str(e)}")
time.sleep(0.5)
if len(message_list) > 0:
try:
send_stock_telegram_message(message_list, header="[US-STOCK]")
except Exception as e:
print(f"Error sending Telegram message: {str(e)}")
return
def monitor_kr_stocks():
message_list = []
print("KR ETFs {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
for symbol in KR_ETFS:
try:
# .KS 접미사 제거
clean_symbol = symbol.replace('.KS', '')
data = get_kr_stock_data(clean_symbol)
if data is not None and not data.empty:
try:
data = calculate_technical_indicators(data)
info = check_buy_signals(symbol, data)
info['name'] = KR_ETFS[symbol]
print(f" - {info['name']} ({symbol}): {info['price']:.2f} -> {info['signal_count']}")
if info['buy']:
message_list.append(format_message(info, 'KR'))
except Exception as e:
print(f"Error processing data for {symbol}: {str(e)}")
else:
print(f"Data for {symbol} is empty or None.")
# 각 심볼 처리 후 1초 대기
time.sleep(1)
except Exception as e:
print(f"Unexpected error processing {symbol}: {str(e)}")
continue
if len(message_list) > 0:
try:
send_stock_telegram_message(message_list, header="[KR-STOCK]")
except Exception as e:
print(f"Error sending Telegram message: {str(e)}")
return
def monitor_coins():
message_list = []
print("KRW COINs {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
for symbol in KR_COINS:
data = get_coin_data(symbol)
if data is not None and not data.empty:
try:
data = calculate_technical_indicators(data)
info = check_buy_signals(symbol, data)
info['name'] = KR_COINS[symbol]
print(f" - {info['name']} ({symbol}): {info['price']:.2f} -> {info['signal_count']}")
if info['buy']:
#if info['buy'] or any(info['buy_signals'].values()):
message_list.append(format_message(info, 'KR'))
except Exception as e:
print(f"Error processing data for {symbol}: {str(e)}")
else:
print(f"Data for {symbol} is empty or None.")
time.sleep(0.5)
if len(message_list) > 0:
try:
send_coin_telegram_message(message_list, header="[KRW-COIN]")
except Exception as e:
print(f"Error sending Telegram message: {str(e)}")
return
def run_schedule():
# 코인 모니터링 스케줄 (매시간 1분, 11분, 21분, 31분, 41분, 51분)
for minute in [4, 34]:
schedule.every().hour.at(f":{minute:02d}").do(monitor_coins)
# 미국 주식 모니터링 스케줄 (매일 저녁 5시 20분)
schedule.every().day.at("16:30").do(monitor_us_stocks)
schedule.every().day.at("23:30").do(monitor_us_stocks)
schedule.every().day.at("05:10").do(monitor_us_stocks)
# 한국 ETF 모니터링 스케줄 (매일 오전 8시)
schedule.every().day.at("18:20").do(monitor_kr_stocks)
schedule.every().day.at("07:10").do(monitor_kr_stocks)
print("Scheduler started. Monitoring will run at specified times.")
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == "__main__":
run_schedule()