This commit is contained in:
dsyoon
2025-08-07 00:30:09 +09:00
parent 4b3e98d100
commit dc89b306dc
2 changed files with 460 additions and 0 deletions

535
stock_monitor_1.py.bak Normal file
View File

@@ -0,0 +1,535 @@
import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
import telegram
import time
import requests
import json
import asyncio
from multiprocessing import Pool
import schedule
from config import *
import FinanceDataReader as fdr
def send_coin_msg(text):
coin_client = telegram.Bot(token=COIN_TELEGRAM_BOT_TOKEN)
asyncio.run(coin_client.send_message(chat_id=COIN_TELEGRAM_CHAT_ID, text=text))
return
def send_coin_telegram_message(message_list, header):
pStr = header + "\n"
for i, message in enumerate(message_list):
pStr += message
if i + 1 % 20 == 0:
pool = Pool(12)
pool.map(send_coin_msg, [pStr])
pStr = ''
if len(message_list) % 20 != 0:
pool = Pool(12)
pool.map(send_coin_msg, [pStr])
return
def send_stock_msg(text):
stock_client = telegram.Bot(token=STOCK_TELEGRAM_BOT_TOKEN)
asyncio.run(stock_client.send_message(chat_id=STOCK_TELEGRAM_CHAT_ID, text=text))
return
def send_stock_telegram_message(message_list, header):
pStr = header + "\n"
for i, message in enumerate(message_list):
pStr += message
if i + 1 % 20 == 0:
pool = Pool(12)
pool.map(send_stock_msg, [pStr])
pStr = ''
if len(message_list) % 20 != 0:
pool = Pool(12)
pool.map(send_stock_msg, [pStr])
return
def calculate_bollinger_bands(data):
data['MA'] = data['Close'].rolling(window=BOLLINGER_PERIOD).mean()
data['STD'] = data['Close'].rolling(window=BOLLINGER_PERIOD).std()
data['Upper'] = data['MA'] + (BOLLINGER_STD * data['STD'])
data['Lower'] = data['MA'] - (BOLLINGER_STD * data['STD'])
return data
def calculate_technical_indicators(data):
# 볼린저 밴드 계산
data = calculate_bollinger_bands(data)
# RSI 계산 (14일 기준)
delta = data['Close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
data['RSI'] = 100 - (100 / (1 + rs))
# MACD 계산
exp1 = data['Close'].ewm(span=12, adjust=False).mean()
exp2 = data['Close'].ewm(span=26, adjust=False).mean()
data['MACD'] = exp1 - exp2
data['Signal'] = data['MACD'].ewm(span=9, adjust=False).mean()
# 이동평균선
data['MA5'] = data['Close'].rolling(window=5).mean()
data['MA20'] = data['Close'].rolling(window=20).mean()
data['MA40'] = data['Close'].rolling(window=40).mean()
data['MA60'] = data['Close'].rolling(window=60).mean()
# 거래량 이동평균
data['Volume_MA5'] = data['Volume'].rolling(window=5).mean()
return data
def check_ma_alert(symbol, data, interval=0):
"""1시간봉 기준 이동평균선 조건 알림
- 5봉 이동평균(MA5) 상승
- 20봉 이동평균(MA20) 상승
- 40봉 이동평균(MA40)이 하락세에서 상승세로 전환되는 시점 (직전 기울기 < 0 and 현재 기울기 ≥ 0)
"""
# 40 이동평균선의 기울기를 계산하기 위해 최소 41개 캔들이 필요합니다.
if len(data) < 41:
return None
# 이동평균선 값 추출
ma5_current, ma5_prev = data['MA5'].iloc[-1], data['MA5'].iloc[-2]
ma20_current, ma20_prev = data['MA20'].iloc[-1], data['MA20'].iloc[-2]
ma40_current = data['MA40'].iloc[-1]
ma40_prev1, ma40_prev2 = data['MA40'].iloc[-2], data['MA40'].iloc[-3]
# 조건 계산
up5 = ma5_current > ma5_prev
up20 = ma20_current > ma20_prev
slope_prev = ma40_prev1 - ma40_prev2 # 직전 기울기 (음수: 하락)
slope_now = ma40_current - ma40_prev1 # 현재 기울기
turning = (slope_prev < 0) and (slope_now >= 0)
alert = up5 and up20 and turning
return {
'symbol': symbol,
'price': data['Close'].iloc[-1],
'alert': alert,
'details': {
'interval': interval,
'up5': up5,
'up20': up20,
'ma40_turning': turning
}
}
def check_buy_signals(symbol, data):
if len(data) < 60: # 최소 60일치 데이터 필요
return None
latest = data.iloc[-1]
prev = data.iloc[-2]
# 볼린저 밴드 신호
bb_signal = False
if isinstance(latest['Upper'], float):
upper_band = latest['Upper']
lower_band = latest['Lower']
current_price = latest['Close']
else:
upper_band = latest['Upper'].iloc[0]
lower_band = latest['Lower'].iloc[0]
current_price = latest['Close'].iloc[0]
distance = (current_price - lower_band) / (upper_band - lower_band)
bb_signal = distance < BOLLINGER_THRESHOLD
# U자 반등 후 이전 고점 돌파 여부 계산 (BREAKOUT)
breakout_signal = False
if len(data) >= max(BREAKOUT_LOOKBACK, BREAKOUT_WEEK_LOOKBACK) + 1:
# ① U자 형태 확인
window_close = data['Close'].iloc[-BREAKOUT_LOOKBACK - 1:-1]
prev_high = window_close.max()
prev_low = window_close.min()
# ② 1주일(42캔들) 전 가격 대비 5% 이상 상승하지 않았는지 체크
price_week_ago = data['Close'].iloc[-BREAKOUT_WEEK_LOOKBACK - 1]
if price_week_ago > 0:
week_change = (current_price - price_week_ago) / price_week_ago
else:
week_change = 1 # 값이 0이면 조건 불충족 처리
# ③ 조건 종합: U자+돌파 && 주간 상승률 ≤ 5%
if (
prev_high > 0 and (prev_high - prev_low) / prev_high > BUY_THRESHOLD and current_price > prev_high
and week_change <= BREAKOUT_WEEK_LIMIT
):
breakout_signal = True
# 장기간 저항선 돌파 여부 계산 (LONG RESISTANCE BREAKOUT)
long_breakout_signal = False
if len(data) >= RESISTANCE_LOOKBACK + 1:
resistance_level = data['Close'].iloc[-RESISTANCE_LOOKBACK - 1:-1].max()
previous_closes = data['Close'].iloc[-RESISTANCE_LOOKBACK - 1:-1]
# 과거 구간에서 저항선 이상으로 종가가 한번도 올라가지 않은 경우 + 현재 가격이 저항선 돌파
if (previous_closes <= resistance_level * (1 + RESISTANCE_BREAK_THRESHOLD)).all() and \
current_price > resistance_level * (1 + RESISTANCE_BREAK_THRESHOLD):
long_breakout_signal = True
# RSI 과매도 신호 (RSI < 30)
if not isinstance(latest['Upper'], float):
rsi_signal = latest['RSI'].iloc[0] < 30
# MACD 신호 (MACD가 시그널 라인을 상향 돌파)
macd_signal = (prev['MACD'].iloc[0] < prev['Signal'].iloc[0]) and (
latest['MACD'].iloc[0] > latest['Signal'].iloc[0])
# 이동평균선 골든크로스 임박 또는 발생
ma_signal = (prev['MA5'].iloc[0] < prev['MA20'].iloc[0]) and (latest['MA5'].iloc[0] >= latest['MA20'].iloc[0])
# 거래량 증가 신호 (5일 평균 대비 150% 이상)
volume_signal = latest['Volume'].iloc[0] > (latest['Volume_MA5'].iloc[0] * 1.5)
# 종합 신호
buy_signals = {
'bb_signal': bb_signal,
'rsi_signal': rsi_signal,
'macd_signal': macd_signal,
'ma_signal': ma_signal,
'volume_signal': volume_signal,
'breakout_signal': breakout_signal,
'long_breakout_signal': long_breakout_signal
}
# 최소 3개 이상의 신호가 동시에 발생할 때 매수 신호로 간주
signal_count = sum(1 for signal in buy_signals.values() if signal)
return {
'symbol': symbol,
'price': current_price,
'lower_band': lower_band,
'distance': distance,
'rsi': latest['RSI'].iloc[0],
'macd': latest['MACD'].iloc[0],
'signal_line': latest['Signal'].iloc[0],
'buy_signals': buy_signals,
'signal_count': signal_count,
'buy': long_breakout_signal or breakout_signal or (
(bb_signal and rsi_signal) or (signal_count >= 2 and (bb_signal or rsi_signal)))
}
else:
rsi_signal = latest['RSI'] < 30
# MACD 신호 (MACD가 시그널 라인을 상향 돌파)
macd_signal = (prev['MACD'] < prev['Signal']) and (latest['MACD'] > latest['Signal'])
# 이동평균선 골든크로스 임박 또는 발생
ma_signal = (prev['MA5'] < prev['MA20']) and (latest['MA5'] >= latest['MA20'])
# 거래량 증가 신호 (5일 평균 대비 150% 이상)
volume_signal = latest['Volume'] > (latest['Volume_MA5'] * 1.5)
# 종합 신호
buy_signals = {
'bb_signal': bb_signal,
'rsi_signal': rsi_signal,
'macd_signal': macd_signal,
'ma_signal': ma_signal,
'volume_signal': volume_signal,
'breakout_signal': breakout_signal,
'long_breakout_signal': long_breakout_signal
}
# 최소 3개 이상의 신호가 동시에 발생할 때 매수 신호로 간주
signal_count = sum(1 for signal in buy_signals.values() if signal)
return {
'symbol': symbol,
'price': current_price,
'lower_band': lower_band,
'distance': distance,
'rsi': latest['RSI'],
'macd': latest['MACD'],
'signal_line': latest['Signal'],
'buy_signals': buy_signals,
'signal_count': signal_count,
'buy': long_breakout_signal or breakout_signal or (
(bb_signal and rsi_signal) or (signal_count >= 2 and (bb_signal or rsi_signal)))
}
def format_message(info, market_type):
message = ""
if info['buy']:
message += '🛒 '
message += f"[{market_type}] {info['name']} ({info['symbol']}) "
message += f"현재가: {'$' if market_type == 'US' else ''}{info['price']:.2f}, "
# 매수 신호 상세 정보
count = 0
if any(info['buy_signals'].values()):
message += "📊신호 ({count}):"
if info['buy_signals']['bb_signal']:
message += "- 볼린저 밴드 하단 근접 (근접도: {:.1f}%),".format(info['distance'] * 100)
count += 1
if info['buy_signals']['rsi_signal']:
message += f"- RSI 과매도 구간 (RSI: {info['rsi']:.1f}),"
count += 1
if info['buy_signals']['macd_signal']:
message += "- MACD 골든크로스,"
count += 1
if info['buy_signals']['ma_signal']:
message += "- 이동평균선 골든크로스,"
count += 1
if info['buy_signals']['volume_signal']:
message += "- 거래량 급증"
count += 1
if info['buy_signals'].get('breakout_signal'):
message += "- U자 반등 돌파"
count += 1
if info['buy_signals'].get('long_breakout_signal'):
message += "- 장기 저항 돌파"
count += 1
message += "\n"
message = message.replace("{count}", str(count))
return message
def format_ma_message(info, market_type):
"""MA 알림 메시지 생성"""
prefix = '📈 ' if info.get('alert') else ''
message = prefix + f"[{market_type}] {info['name']} ({info['symbol']}) "
message += f"현재가: {'$' if market_type == 'US' else ''}{info['price']:.2f} \n"
return message
def get_coin_data(symbol, interval=240, retries=3):
for attempt in range(retries):
try:
url = "https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=3000".format(interval, symbol)
headers = {"accept": "application/json"}
response = requests.get(url, headers=headers)
json_data = json.loads(response.text)
df_temp = pd.DataFrame(json_data)
df_temp = df_temp.sort_index(ascending=False)
if 'candle_date_time_kst' not in df_temp:
return None
data = pd.DataFrame()
# data.columns = ['datetime', 'open', 'close', 'high', 'low', 'volume']
# data['datetime'] = pd.to_datetime(data_temp['candle_date_time_kst'])
data['datetime'] = pd.to_datetime(df_temp['candle_date_time_kst'], format='%Y-%m-%dT%H:%M:%S')
data['Open'] = df_temp['opening_price']
data['Close'] = df_temp['trade_price']
data['High'] = df_temp['high_price']
data['Low'] = df_temp['low_price']
data['Volume'] = df_temp['candle_acc_trade_volume']
data = data.set_index('datetime')
data = data.astype(float)
data["datetime"] = data.index
if not data.empty:
return data
print(f"No data received for {symbol}, attempt {attempt + 1}")
time.sleep(0.5)
except Exception as e:
print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
if attempt < retries - 1:
time.sleep(5)
continue
return None
def get_kr_stock_data(symbol, retries=3):
for attempt in range(retries):
try:
end = datetime.now()
start = end - timedelta(days=300)
# FinanceDataReader를 사용하여 한국 주식 데이터 가져오기
data = fdr.DataReader(symbol, start.strftime('%Y-%m-%d'), end.strftime('%Y-%m-%d'))
if not data.empty:
# FinanceDataReader의 컬럼명을 yfinance 형식으로 변환
data = data.rename(columns={
'Open': 'Open',
'High': 'High',
'Low': 'Low',
'Close': 'Close',
'Volume': 'Volume'
})
return data
print(f"No data received for {symbol}, attempt {attempt + 1}")
time.sleep(2)
except Exception as e:
print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
if attempt < retries - 1:
time.sleep(5)
continue
return None
def monitor_us_stocks():
message_list = []
print("US Stocks {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
for symbol in US_STOCKS:
data = get_kr_stock_data(symbol)
if data is not None and not data.empty:
try:
data = calculate_technical_indicators(data)
info = check_ma_alert(symbol, data, 0)
if info is None:
continue
info['name'] = US_STOCKS[symbol]
print(f" - {info['name']} ({symbol}): {info['price']:.2f} -> {info['alert']}")
if info['alert']:
message_list.append(format_ma_message(info, 'US'))
except Exception as e:
print(f"Error processing data for {symbol}: {str(e)}")
time.sleep(0.5)
if len(message_list) > 0:
try:
send_stock_telegram_message(message_list, header="[US-STOCK]")
except Exception as e:
print(f"Error sending Telegram message: {str(e)}")
return
def monitor_kr_stocks():
message_list = []
print("KR ETFs {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
for symbol in KR_ETFS:
try:
# .KS 접미사 제거
clean_symbol = symbol.replace('.KS', '')
data = get_kr_stock_data(clean_symbol)
if data is not None and not data.empty:
try:
data = calculate_technical_indicators(data)
info = check_ma_alert(symbol, data, 0)
if info is None:
continue
info['name'] = KR_ETFS[symbol]
print(f" - {info['name']} ({symbol}): {info['price']:.2f} -> {info['alert']}")
if info['alert']:
message_list.append(format_ma_message(info, 'KR'))
except Exception as e:
print(f"Error processing data for {symbol}: {str(e)}")
else:
print(f"Data for {symbol} is empty or None.")
# 각 심볼 처리 후 1초 대기
time.sleep(1)
except Exception as e:
print(f"Unexpected error processing {symbol}: {str(e)}")
continue
if len(message_list) > 0:
try:
send_stock_telegram_message(message_list, header="[KR-STOCK]")
except Exception as e:
print(f"Error sending Telegram message: {str(e)}")
return
def monitor_coins():
message_list = []
print("KRW COINs {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
for symbol in KR_COINS:
# 1시간
interval = 60
data = get_coin_data(symbol, interval=interval)
if data is not None and not data.empty:
try:
data = calculate_technical_indicators(data)
info = check_ma_alert(symbol, data, interval)
if info is None:
continue
info['name'] = KR_COINS[symbol]
print(
f" - {info['name']} ({symbol}): {info['price']:.2f} -> {info['alert']} ({info['details']['interval']})")
if info['alert']:
message_list.append(format_ma_message(info, 'KR'))
except Exception as e:
print(f"Error processing data for {symbol}: {str(e)}")
else:
print(f"Data for {symbol} is empty or None.")
time.sleep(0.5)
# 4시간
interval = 240
data = get_coin_data(symbol, interval=interval)
if data is not None and not data.empty:
try:
data = calculate_technical_indicators(data)
info = check_ma_alert(symbol, data, interval)
if info is None:
continue
info['name'] = KR_COINS[symbol]
print(
f" - {info['name']} ({symbol}): {info['price']:.2f} -> {info['alert']} ({info['details']['interval']})")
if info['alert']:
message_list.append(format_ma_message(info, 'KR'))
except Exception as e:
print(f"Error processing data for {symbol}: {str(e)}")
else:
print(f"Data for {symbol} is empty or None.")
time.sleep(0.5)
if len(message_list) > 0:
try:
send_coin_telegram_message(message_list, header="[KRW-COIN]")
except Exception as e:
print(f"Error sending Telegram message: {str(e)}")
return
def run_schedule():
# 코인 모니터링 스케줄 (매시간 1분, 11분, 21분, 31분, 41분, 51분)
for minute in [4, 14, 24, 34, 44, 54]:
schedule.every().hour.at(f":{minute:02d}").do(monitor_coins)
# 미국 주식 모니터링 스케줄 (매일 저녁 5시 20분)
schedule.every().day.at("16:30").do(monitor_us_stocks)
schedule.every().day.at("23:30").do(monitor_us_stocks)
schedule.every().day.at("05:10").do(monitor_us_stocks)
# 한국 ETF 모니터링 스케줄 (매일 오전 8시)
schedule.every().day.at("18:20").do(monitor_kr_stocks)
schedule.every().day.at("07:10").do(monitor_kr_stocks)
print("Scheduler started. Monitoring will run at specified times.")
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == "__main__":
run_schedule()