# Source: DeepCoin/stock_monitor.py
# Commit: dsyoon 74c179dd3c "init" (2025-08-06 14:52:26 +09:00)
# Size: 422 lines, 15 KiB, Python

import pandas as pd
from HTS2 import HTS
from dateutil.relativedelta import relativedelta
from datetime import datetime, timedelta
import telegram
import time
import requests
import json
import asyncio
from multiprocessing import Pool
import schedule
from config import *
import FinanceDataReader as fdr
import numpy as np
# Shared HTS client instance; buy_ticker() places its orders through it.
hts = HTS()
# Amount handed to hts.buyCoinMarket() for every buy order
# (presumably KRW -- TODO confirm against the HTS2 module).
BUY_AMOUNT = 10000
def send_coin_msg(text):
    """Send ``text`` to the coin Telegram chat.

    A bot is created from ``COIN_TELEGRAM_BOT_TOKEN`` and its asynchronous
    ``send_message`` call is driven to completion with ``asyncio.run``.

    Args:
        text: Message body delivered to ``COIN_TELEGRAM_CHAT_ID``.
    """
    bot = telegram.Bot(token=COIN_TELEGRAM_BOT_TOKEN)
    pending_send = bot.send_message(chat_id=COIN_TELEGRAM_CHAT_ID, text=text)
    asyncio.run(pending_send)
def send_coin_telegram_message(message_list, header):
    """Send coin alerts to Telegram in batches of 20 messages.

    The messages of each batch are concatenated without a separator (they
    are expected to carry their own trailing punctuation/newline) and the
    first batch is prefixed with ``header`` and a newline. Nothing is sent
    for an empty ``message_list``.

    Args:
        message_list: List of already formatted message strings.
        header: Title line placed at the top of the first batch.
    """
    batch_size = 20
    for start in range(0, len(message_list), batch_size):
        batch = message_list[start:start + batch_size]
        prefix = header + "\n" if start == 0 else ''
        text = prefix + ''.join(batch)
        # Each batch is delivered from a worker process, as before
        # (presumably so asyncio.run() gets a fresh interpreter -- confirm).
        # One worker suffices for one payload, and the context manager
        # releases the pool instead of leaking it.
        with Pool(processes=1) as pool:
            pool.map(send_coin_msg, [text])
def buy_ticker(symbole):
    """Place a market buy for ``symbole`` through the shared ``hts`` client.

    ``BUY_AMOUNT`` is passed as the order amount. Any exception raised by
    the order call is printed and swallowed, so the caller keeps running.

    Args:
        symbole: Ticker symbol handed to ``hts.buyCoinMarket``.
    """
    try:
        hts.buyCoinMarket(symbole, BUY_AMOUNT)
    except Exception as err:
        print(f"Error buying {symbole}: {str(err)}")
def send_stock_msg(text):
    """Send ``text`` to the stock Telegram chat.

    A bot is created from ``STOCK_TELEGRAM_BOT_TOKEN`` and its asynchronous
    ``send_message`` call is driven to completion with ``asyncio.run``.

    Args:
        text: Message body delivered to ``STOCK_TELEGRAM_CHAT_ID``.
    """
    bot = telegram.Bot(token=STOCK_TELEGRAM_BOT_TOKEN)
    pending_send = bot.send_message(chat_id=STOCK_TELEGRAM_CHAT_ID, text=text)
    asyncio.run(pending_send)
def send_stock_telegram_message(message_list, header):
    """Send stock alerts to Telegram in batches of 20 messages.

    The messages of each batch are concatenated without a separator (they
    are expected to carry their own trailing punctuation/newline) and the
    first batch is prefixed with ``header`` and a newline. Nothing is sent
    for an empty ``message_list``.

    Args:
        message_list: List of already formatted message strings.
        header: Title line placed at the top of the first batch.
    """
    batch_size = 20
    for start in range(0, len(message_list), batch_size):
        batch = message_list[start:start + batch_size]
        prefix = header + "\n" if start == 0 else ''
        text = prefix + ''.join(batch)
        # Each batch is delivered from a worker process, as before
        # (presumably so asyncio.run() gets a fresh interpreter -- confirm).
        # One worker suffices for one payload, and the context manager
        # releases the pool instead of leaking it.
        with Pool(processes=1) as pool:
            pool.map(send_stock_msg, [text])
def normalize_data(data):
    """Add rolling min-max normalized copies of the OHLCV columns.

    For each of ``Open``, ``High``, ``Low``, ``Close`` and ``Volume`` a
    ``<column>_Norm`` column is added, scaled against the minimum and
    maximum of the trailing 20-row window. Rows whose window has a zero
    range get 0.5; rows without a full window stay NaN.

    Args:
        data: DataFrame containing the five OHLCV columns.

    Returns:
        A copy of ``data`` with the ``*_Norm`` columns appended; the input
        frame is left untouched.
    """
    result = data.copy()
    for name in ('Open', 'High', 'Low', 'Close', 'Volume'):
        series = data[name]
        window = series.rolling(window=20)
        low = window.min()
        span = window.max() - low
        scaled = (series - low) / span
        # A flat window would divide by zero; use the midpoint instead.
        result[f'{name}_Norm'] = np.where(span != 0, scaled, 0.5)
    return result
def calculate_technical_indicators(data):
    """Return ``data`` extended with the indicator columns used for signals.

    The frame is first passed through ``normalize_data``; the following
    columns are then added from ``Close``:

    * ``MA5`` ... ``MA1440``: simple moving averages over 5, 20, 40, 120,
      200, 240, 720 and 1440 rows.
    * ``golden_cross``: True on rows where ``MA5`` is above ``MA20`` while
      it was at or below it on the previous row.
    * ``MA``, ``STD``, ``Upper``, ``Lower``: 20-row Bollinger bands with a
      width of two standard deviations.

    Args:
        data: DataFrame with ``Open``, ``High``, ``Low``, ``Close`` and
            ``Volume`` columns.

    Returns:
        The normalized copy of ``data`` with the indicator columns added.
    """
    data = normalize_data(data)
    close = data['Close']

    # Simple moving averages.
    for window in (5, 20, 40, 120, 200, 240, 720, 1440):
        data[f'MA{window}'] = close.rolling(window=window).mean()

    # Golden cross: the short average moves above the long average.
    above_now = data['MA5'] > data['MA20']
    not_above_before = data['MA5'].shift(1) <= data['MA20'].shift(1)
    data['golden_cross'] = above_now & not_above_before

    # Bollinger bands.
    band_window = close.rolling(window=20)
    data['MA'] = band_window.mean()
    data['STD'] = band_window.std()
    data['Upper'] = data['MA'] + (2 * data['STD'])
    data['Lower'] = data['MA'] - (2 * data['STD'])
    return data
def check_buy_point(data, simulation=None):
    """Mark buy points in ``data`` and return the evaluated rows.

    A row is a buy point (``buy_point == 1``) when all of these hold:

    * MA5, MA20, MA40, MA120, MA200 and MA240 are each below MA720,
    * each of those six averages is higher than on the previous row,
    * MA720 is below MA1440.

    The first evaluated row has no previous row and is never a buy point.
    Rows with NaN averages never qualify, because NaN comparisons are False.

    Args:
        data: DataFrame carrying the ``MA*`` columns listed above.
        simulation: When truthy, every row of ``data`` is evaluated and the
            ``buy_point`` column is written into ``data`` itself. Otherwise
            only a copy of the last 10 rows is evaluated, and the last row
            is additionally flagged when any earlier row of that window is
            a buy point.

    Returns:
        The evaluated frame with an integer ``buy_point`` column: ``data``
        itself in simulation mode, otherwise a new frame of up to 10 rows.
    """
    short_windows = (5, 20, 40, 120, 200, 240)
    # tail() hands back a slice of the caller's frame; copy it so adding a
    # column neither warns nor depends on pandas' copy/view rules.
    recent_data = data if simulation else data.tail(10).copy()

    short_mas = recent_data[[f'MA{n}' for n in short_windows]]
    below_long = short_mas.lt(recent_data['MA720'], axis=0).all(axis=1)
    # shift(1) leaves NaN in the first row, so it can never count as rising.
    rising = short_mas.gt(short_mas.shift(1)).all(axis=1)
    long_downtrend = recent_data['MA720'] < recent_data['MA1440']
    recent_data['buy_point'] = (below_long & rising & long_downtrend).astype(int)

    if not simulation:
        # A signal anywhere earlier in the window keeps the latest row flagged.
        if recent_data['buy_point'].iloc[-10:-1].sum() > 0:
            flag_col = recent_data.columns.get_loc('buy_point')
            recent_data.iloc[-1, flag_col] = 1
    return recent_data
def format_message(market_type, symbol, symbol_name, close):
    """Build the one-line buy alert for a symbol.

    Args:
        market_type: Market label shown in brackets; ``'US'`` additionally
            puts a ``$`` in front of the price.
        symbol: Ticker symbol shown in parentheses.
        symbol_name: Display name of the instrument.
        close: Current price, rendered with two decimals.

    Returns:
        The alert text, ending in ``", "`` so alerts can be concatenated.
    """
    currency = '$' if market_type == 'US' else ''
    title = f"매수 [{market_type}] {symbol_name} ({symbol}) "
    price = f"현재가: {currency}{close:.2f}, "
    return title + price
def format_ma_message(info, market_type):
    """Build the moving-average alert line for one instrument.

    Args:
        info: Mapping with ``'name'``, ``'symbol'`` and ``'price'``; a truthy
            optional ``'alert'`` entry adds the rising-trend prefix.
        market_type: Market label shown in brackets; ``'US'`` additionally
            puts a ``$`` in front of the price.

    Returns:
        The alert text terminated by a newline.
    """
    if info.get('alert'):
        lead = '상승 '
    else:
        lead = ''
    currency = '$' if market_type == 'US' else ''
    title = lead + f"[{market_type}] {info['name']} ({info['symbol']}) "
    return title + f"현재가: {currency}{info['price']:.2f} \n"
def get_coin_data(symbol, interval=240, to=None, retries=3):
    """Fetch up to 200 minute-candles for ``KRW-<symbol>`` from Bithumb.

    Args:
        symbol: Coin ticker appended to the ``KRW-`` market prefix.
        interval: Candle unit in minutes, placed in the URL path.
        to: Optional upper time bound, sent as the ``to`` query parameter
            (callers in this file pass ``"%Y-%m-%d %H:%M:%S"`` strings).
        retries: Maximum number of attempts.

    Returns:
        A float DataFrame indexed by candle time with ``Open``, ``Close``,
        ``High``, ``Low``, ``Volume`` and a ``datetime`` column mirroring
        the index, with the response's row order reversed. ``None`` when the
        response carries no ``candle_date_time_kst`` field or every attempt
        failed.
    """
    for attempt in range(retries):
        try:
            if to is None:
                url = ("https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=200").format(interval, symbol)
            else:
                url = ("https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=200&to={}").format(interval, symbol, to)
            headers = {"accept": "application/json"}
            # Without a timeout a stalled connection would block the whole
            # monitor; a timeout raises and falls into the retry path below.
            response = requests.get(url, headers=headers, timeout=10)
            json_data = json.loads(response.text)
            df_temp = pd.DataFrame(json_data)
            # Reverse the row order of the response.
            df_temp = df_temp.sort_index(ascending=False)
            if 'candle_date_time_kst' not in df_temp:
                # Not a candle payload (e.g. empty or error response).
                return None
            data = pd.DataFrame()
            data['datetime'] = pd.to_datetime(df_temp['candle_date_time_kst'], format='%Y-%m-%dT%H:%M:%S')
            data['Open'] = df_temp['opening_price']
            data['Close'] = df_temp['trade_price']
            data['High'] = df_temp['high_price']
            data['Low'] = df_temp['low_price']
            data['Volume'] = df_temp['candle_acc_trade_volume']
            data = data.set_index('datetime')
            data = data.astype(float)
            # Keep the timestamp as a column too; callers concatenate pages
            # with ignore_index=True and rebuild the index from it.
            data["datetime"] = data.index
            if not data.empty:
                return data
            print(f"No data received for {symbol}, attempt {attempt + 1}")
            time.sleep(0.5)
        except Exception as e:
            print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
            if attempt < retries - 1:
                time.sleep(5)
    return None
def get_coin_more_data(symbol, interval, bong_count=3000):
    """Fetch at least ``bong_count`` candles by paging backwards in time.

    ``get_coin_data`` is called repeatedly, moving the ``to`` bound back by
    200 candle intervals per page, until enough rows were collected or a
    page comes back empty/``None`` (history exhausted or request failure).

    Args:
        symbol: Coin ticker passed through to ``get_coin_data``.
        interval: Candle unit in minutes.
        bong_count: Number of candles to collect before paging stops.

    Returns:
        A DataFrame sorted by candle time with one row per timestamp and a
        ``datetime`` column mirroring the index, or ``None`` when no page
        could be fetched at all. Fewer than ``bong_count`` rows are returned
        when paging stopped early.
    """
    to = datetime.now()
    pages = []
    fetched = 0
    while fetched < bong_count:
        page = get_coin_data(symbol, interval, to.strftime("%Y-%m-%d %H:%M:%S"))
        if page is None or page.empty:
            # Nothing more to fetch; stop instead of retrying forever.
            break
        pages.append(page)
        fetched += len(page)
        time.sleep(0.3)
        to = to - relativedelta(minutes=interval * 200)

    if not pages:
        return None

    data = pd.concat(pages, ignore_index=True)
    data = data.set_index('datetime')
    data = data.sort_index()
    # De-duplicate on the timestamp: distinct candles may legitimately share
    # identical OHLCV values and must not be dropped.
    data = data[~data.index.duplicated(keep='first')]
    data["datetime"] = data.index
    return data
def get_kr_stock_data(symbol, retries=3, days=300):
    """Download price history for ``symbol`` with FinanceDataReader.

    Args:
        symbol: Ticker passed to ``fdr.DataReader``.
        retries: Maximum number of attempts.
        days: Length of the requested history in calendar days, counted
            back from now.

    Returns:
        The DataFrame returned by ``fdr.DataReader``, or ``None`` when every
        attempt failed or returned an empty frame.

    NOTE(review): ``calculate_technical_indicators`` in this file computes
    rolling means over up to 1440 rows. Assuming at most one row per
    calendar day, the default 300-day history cannot fill the 720- and
    1440-row windows, so those columns stay NaN -- pass a larger ``days``
    when those averages are needed.
    """
    for attempt in range(retries):
        try:
            end = datetime.now()
            start = end - timedelta(days=days)
            data = fdr.DataReader(symbol, start.strftime('%Y-%m-%d'), end.strftime('%Y-%m-%d'))
            if not data.empty:
                return data
            print(f"No data received for {symbol}, attempt {attempt + 1}")
            time.sleep(2)
        except Exception as e:
            print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
            if attempt < retries - 1:
                time.sleep(5)
    return None
def monitor_us_stocks():
    """Scan every symbol in ``US_STOCKS`` and send buy alerts via Telegram.

    For each symbol the price history is downloaded, the indicators are
    computed and ``check_buy_point`` is evaluated. Symbols whose latest row
    is flagged are printed and collected, and the collected alerts are sent
    under the ``[US-STOCK]`` header. Per-symbol and send errors are printed
    and do not stop the scan.
    """
    message_list = []
    print("US Stocks {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    for symbol in US_STOCKS:
        data = get_kr_stock_data(symbol)
        if data is not None and not data.empty:
            try:
                data = calculate_technical_indicators(data)
                recent_data = check_buy_point(data)
                if recent_data['buy_point'].iloc[-1] == 1:
                    # Positional access: the frame has a label index, so a
                    # plain [-1] lookup is not a reliable "last row".
                    close = recent_data['Close'].iloc[-1]
                    print(f" - {US_STOCKS[symbol]} ({symbol}): {close:.2f}")
                    message_list.append(format_message('US', symbol, US_STOCKS[symbol], close))
            except Exception as e:
                print(f"Error processing data for {symbol}: {str(e)}")
        # Throttle between symbols, including those without a signal.
        time.sleep(0.5)
    if message_list:
        try:
            send_stock_telegram_message(message_list, header="[US-STOCK]")
        except Exception as e:
            print(f"Error sending Telegram message: {str(e)}")
def monitor_kr_stocks():
    """Scan every symbol in ``KR_ETFS`` and send buy alerts via Telegram.

    The ``.KS`` suffix is stripped before the price history is downloaded.
    Symbols whose latest row is flagged by ``check_buy_point`` are printed
    and collected, and the collected alerts are sent under the
    ``[KR-STOCK]`` header. Per-symbol and send errors are printed and do
    not stop the scan.
    """
    message_list = []
    print("KR ETFs {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    for symbol in KR_ETFS:
        try:
            # Strip the .KS suffix before querying.
            clean_symbol = symbol.replace('.KS', '')
            data = get_kr_stock_data(clean_symbol)
            if data is not None and not data.empty:
                try:
                    data = calculate_technical_indicators(data)
                    recent_data = check_buy_point(data)
                    if recent_data['buy_point'].iloc[-1] == 1:
                        # Positional access: the frame has a label index, so
                        # a plain [-1] lookup is not a reliable "last row".
                        close = recent_data['Close'].iloc[-1]
                        print(f" - {KR_ETFS[symbol]} ({symbol}): {close:.2f}")
                        # The display name comes from KR_ETFS, the mapping
                        # being iterated.
                        message_list.append(format_message('KR', symbol, KR_ETFS[symbol], close))
                except Exception as e:
                    print(f"Error processing data for {symbol}: {str(e)}")
            else:
                print(f"Data for {symbol} is empty or None.")
            # Wait one second after every symbol, with or without a signal.
            time.sleep(1)
        except Exception as e:
            print(f"Unexpected error processing {symbol}: {str(e)}")
            continue
    if message_list:
        try:
            send_stock_telegram_message(message_list, header="[KR-STOCK]")
        except Exception as e:
            print(f"Error sending Telegram message: {str(e)}")
def monitor_coins():
    """Scan every coin in ``KR_COINS``, place buys and send Telegram alerts.

    Per coin, two passes are made:

    1. 1-hour candles: when ``check_buy_point`` flags the latest row, a buy
       alert is queued and ``buy_ticker`` is called. This places a real
       market order through ``hts``.
    2. 4-hour candles (1500 candles, evaluated in simulation mode): when
       the latest row is a buy point, a moving-average alert is queued.

    Queued alerts are sent under the ``[KRW-COIN]`` header. Per-coin and
    send errors are printed and do not stop the scan.
    """
    message_list = []
    print("KRW COINs {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    for symbol in KR_COINS:
        # 1-hour candles
        interval = 60
        data = get_coin_more_data(symbol, interval)
        if data is not None and not data.empty:
            try:
                data = calculate_technical_indicators(data)
                recent_data = check_buy_point(data)
                if recent_data['buy_point'].iloc[-1] != 1:
                    # NOTE(review): this also skips the 4-hour pass for the
                    # coin; kept as in the original flow -- confirm intent.
                    continue
                # Positional access: the frame has a label index, so a
                # plain [-1] lookup is not a reliable "last row".
                close = recent_data['Close'].iloc[-1]
                # Display names come from KR_COINS, the mapping being iterated.
                print(f" - {KR_COINS[symbol]} ({symbol}): {close:.2f}")
                message_list.append(format_message('COIN', symbol, KR_COINS[symbol], close))
                # Place the market buy for the flagged coin.
                buy_ticker(symbol)
            except Exception as e:
                print(f"Error processing data for {symbol}: {str(e)}")
        else:
            print(f"Data for {symbol} is empty or None.")
        time.sleep(0.5)

        # 4-hour candles
        interval = 240
        data = get_coin_more_data(symbol, interval, bong_count=1500)
        if data is not None and not data.empty:
            try:
                data = calculate_technical_indicators(data)
                result = check_buy_point(data, simulation=True)
                if result is None or result.empty:
                    continue
                # check_buy_point returns a DataFrame; summarise its latest
                # row in the dict shape format_ma_message() reads.
                # NOTE(review): message shape inferred from the
                # (info, market_type) call and the 'price'/'buy' keys used
                # here -- confirm against the intended 4-hour alert.
                latest = result.iloc[-1]
                info = {
                    'symbol': symbol,
                    'name': KR_COINS[symbol],
                    'price': float(latest['Close']),
                    'buy': bool(latest['buy_point'] == 1),
                }
                print(f" - {info['name']} ({symbol}): {info['price']:.2f} -> {info['buy']}")
                if info['buy']:
                    message_list.append(format_ma_message(info, 'KR'))
            except Exception as e:
                print(f"Error processing data for {symbol}: {str(e)}")
        else:
            print(f"Data for {symbol} is empty or None.")
        time.sleep(0.5)

    if message_list:
        try:
            send_coin_telegram_message(message_list, header="[KRW-COIN]")
        except Exception as e:
            print(f"Error sending Telegram message: {str(e)}")
# ----------------------
# Turnaround Detector v6
# ----------------------
def detect_turnaround_signal(symbol, data, interval=0, params=None):
    """Placeholder for the moving-average turnaround detector.

    No signal logic is implemented: the function returns ``None`` for every
    input. ``symbol``, ``interval`` and ``params`` are accepted but unused.

    Args:
        symbol: Ticker symbol (unused).
        data: Sized price history; fewer than 7 rows is treated as too
            short to evaluate.
        interval: Candle interval (unused).
        params: Optional detector parameters (unused).

    Returns:
        Always ``None``.
    """
    if len(data) < 7:
        return None
    # TODO: derive the buy signal from the moving averages.
    return None
def run_schedule():
    """Register all monitoring jobs and run the scheduler loop forever.

    * Coins: every hour at minutes 04, 14, 24, 34, 44 and 54.
    * US stocks: daily at 16:30, 23:30 and 05:10.
    * KR ETFs: daily at 18:20 and 07:10.

    Pending jobs are polled once per second; the function never returns.
    """
    coin_minutes = [4, 14, 24, 34, 44, 54]
    for minute in coin_minutes:
        schedule.every().hour.at(f":{minute:02d}").do(monitor_coins)

    for clock in ("16:30", "23:30", "05:10"):
        schedule.every().day.at(clock).do(monitor_us_stocks)

    for clock in ("18:20", "07:10"):
        schedule.every().day.at(clock).do(monitor_kr_stocks)

    print("Scheduler started. Monitoring will run at specified times.")
    while True:
        schedule.run_pending()
        time.sleep(1)
# Script entry point: the recurring scheduler call is commented out, so a
# direct run performs a single coin scan and exits.
if __name__ == "__main__":
    #run_schedule()
    monitor_coins()