Files
DeepCoin/stock_monitor.py
dsyoon a869e6e0da init
2025-08-09 14:01:16 +09:00

596 lines
24 KiB
Python

import pandas as pd
from HTS2 import HTS
from dateutil.relativedelta import relativedelta
from datetime import datetime, timedelta
import sqlite3
import telegram
import time
import requests
import json
import asyncio
from multiprocessing import Pool
import schedule
from config import *
import FinanceDataReader as fdr
import numpy as np
import os
hts = HTS()
# 매수 금지 시간을 관리하는 JSON 파일 경로
COOLDOWN_FILE = 'coins_buy_time.json'
def load_buy_cooldown():
"""매수 금지 시간을 JSON 파일에서 로드"""
if os.path.exists(COOLDOWN_FILE):
try:
with open(COOLDOWN_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
# 문자열을 datetime 객체로 변환
cooldown = {}
for symbol, time_str in data.items():
cooldown[symbol] = datetime.fromisoformat(time_str)
return cooldown
except Exception as e:
print(f"Error loading cooldown data: {e}")
return {}
return {}
def save_buy_cooldown(cooldown):
"""매수 금지 시간을 JSON 파일에 저장"""
try:
# datetime 객체를 문자열로 변환
data = {}
for symbol, dt in cooldown.items():
data[symbol] = dt.isoformat()
with open(COOLDOWN_FILE, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"Error saving cooldown data: {e}")
# 매수 금지 시간을 추적하는 전역 딕셔너리
buy_cooldown = load_buy_cooldown()
def send_coin_msg(text):
coin_client = telegram.Bot(token=COIN_TELEGRAM_BOT_TOKEN)
asyncio.run(coin_client.send_message(chat_id=COIN_TELEGRAM_CHAT_ID, text=text))
return
def send_coin_telegram_message(message_list, header):
pStr = header + "\n"
for i, message in enumerate(message_list):
pStr += message
if i + 1 % 20 == 0:
pool = Pool(12)
pool.map(send_coin_msg, [pStr])
pStr = ''
if len(message_list) % 20 != 0:
pool = Pool(12)
pool.map(send_coin_msg, [pStr])
return
def send_stock_msg(text):
stock_client = telegram.Bot(token=STOCK_TELEGRAM_BOT_TOKEN)
asyncio.run(stock_client.send_message(chat_id=STOCK_TELEGRAM_CHAT_ID, text=text))
return
def send_stock_telegram_message(message_list, header):
pStr = header + "\n"
for i, message in enumerate(message_list):
pStr += message
if i + 1 % 20 == 0:
pool = Pool(12)
pool.map(send_stock_msg, [pStr])
pStr = ''
if len(message_list) % 20 != 0:
pool = Pool(12)
pool.map(send_stock_msg, [pStr])
return
def normalize_data(data):
"""데이터 정규화 함수 - 모든 코인에 동일하게 적용"""
# Min-Max 정규화를 위한 컬럼
columns_to_normalize = ['Open', 'High', 'Low', 'Close', 'Volume']
normalized_data = data.copy()
# 각 컬럼별 정규화 (20일 롤링 윈도우 사용)
for column in columns_to_normalize:
min_val = data[column].rolling(window=20).min()
max_val = data[column].rolling(window=20).max()
# 0으로 나누기 방지
denominator = max_val - min_val
normalized_data[f'{column}_Norm'] = np.where(
denominator != 0,
(data[column] - min_val) / denominator,
0.5 # 기본값 설정
)
return normalized_data
def calculate_technical_indicators(data):
"""기술적 지표 계산 - 모든 코인에 동일하게 적용"""
# 데이터 정규화
data = normalize_data(data)
# 이동평균선 계산
data['MA5'] = data['Close'].rolling(window=5).mean()
data['MA20'] = data['Close'].rolling(window=20).mean()
data['MA40'] = data['Close'].rolling(window=40).mean()
data['MA120'] = data['Close'].rolling(window=120).mean()
data['MA200'] = data['Close'].rolling(window=200).mean()
data['MA240'] = data['Close'].rolling(window=240).mean()
data['MA720'] = data['Close'].rolling(window=720).mean()
data['MA1440'] = data['Close'].rolling(window=1440).mean()
# --- 이격도(Deviation) 계산 ---
data['Deviation5'] = (data['Close'] / data['MA5']) * 100
data['Deviation20'] = (data['Close'] / data['MA20']) * 100
data['Deviation40'] = (data['Close'] / data['MA40']) * 100
data['Deviation120'] = (data['Close'] / data['MA120']) * 100
data['Deviation200'] = (data['Close'] / data['MA200']) * 100
data['Deviation240'] = (data['Close'] / data['MA240']) * 100
data['Deviation720'] = (data['Close'] / data['MA720']) * 100
data['Deviation1440'] = (data['Close'] / data['MA1440']) * 100
# 매수 타이밍을 이동평균선으로 결정
# 골든크로스: 단기 이동평균선이 장기 이동평균선을 상향 돌파할 때 매수
data['golden_cross'] = (data['MA5'] > data['MA20']) & (data['MA5'].shift(1) <= data['MA20'].shift(1))
# 볼린저 밴드 계산
data['MA'] = data['Close'].rolling(window=20).mean()
data['STD'] = data['Close'].rolling(window=20).std()
data['Upper'] = data['MA'] + (2 * data['STD'])
data['Lower'] = data['MA'] - (2 * data['STD'])
return data
def buy_ticker(symbol, data):
try:
current_time = datetime.now()
if data['buy_signal'].iloc[-1] != 'fall_5p':
# 5%이상 급락일 때는 10분마다 매수
BUY_AMOUNT = 100000
else:
# 매수 금지 시간 확인 (20분)
if symbol in buy_cooldown:
time_diff = current_time - buy_cooldown[symbol]
if time_diff.total_seconds() < 1200: # 20분 = 1200초
print(f"{symbol}: 매수 금지 중 (남은 시간: {1200 - time_diff.total_seconds():.0f}초)")
return False
BUY_AMOUNT = 6000
if data['buy_signal'].iloc[-1] == 'movingaverage':
BUY_AMOUNT = 10000
elif data['buy_signal'].iloc[-1] == 'deviation40':
BUY_AMOUNT = 15000
elif data['buy_signal'].iloc[-1] == 'deviation240':
BUY_AMOUNT = 6000
elif data['buy_signal'].iloc[-1] == 'deviation1440':
if symbol in ['BONK', 'PEPE', 'TON']:
BUY_AMOUNT = 30000
else:
BUY_AMOUNT = 50000
_ = hts.buyCoinMarket(symbol, BUY_AMOUNT)
# 매수 성공 시 금지 시간 설정 및 파일에 저장
buy_cooldown[symbol] = current_time
save_buy_cooldown(buy_cooldown)
print(f"{KR_COINS[symbol]} ({symbol}): {data['Close'].iloc[-1]:.4f}: 매수 완료, 20분간 매수 금지 시작")
try:
pool = Pool(12)
pool.map(send_coin_msg, ["[KRW-COIN]" + "\n" + format_message('COIN', symbol, KR_COINS[symbol], data['Close'].iloc[-1], data['buy_signal'].iloc[-1])])
except Exception as e:
print(f"Error sending Telegram message: {str(e)}")
except Exception as e:
print(f"Error buying {symbol}: {str(e)}")
return False
return True
def check_buy_point(symbol, data, simulation=None):
# 매수 포인트 탐지 및 표시
# 데이터 복사본 생성하여 SettingWithCopyWarning 방지
data = data.copy()
# 'buy_point' 열 초기화
data['buy_signal'] = ''
data['buy_point'] = 0
# FutureWarning 해결
if data['buy_point'].iloc[-1] != 1:
# 코드 계속
for i in range(1, len(data)):
# 이동평균선 기반 매수 조건
if all(data[f'MA{n}'].iloc[i] < data['MA720'].iloc[i] for n in [5, 20, 40, 120, 200, 240]) and \
all(data[f'MA{n}'].iloc[i] > data[f'MA{n}'].iloc[i - 1] for n in [5, 20, 40, 120, 200, 240]) and \
data['MA720'].iloc[i] < data['MA1440'].iloc[i]:
data.at[data.index[i], 'buy_signal'] = 'movingaverage'
data.at[data.index[i], 'buy_point'] = 1
if not simulation:
if data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'movingaverage'
data.at[data.index[-1], 'buy_point'] = 1
# Deviation40(이격도 40) 기반 매수 조건: 90 이하에서 상승 전환
if data['Deviation40'].iloc[i - 1] < data['Deviation40'].iloc[i] and data['Deviation40'].iloc[i - 1] <= 90:
data.at[data.index[i], 'buy_signal'] = 'deviation40'
data.at[data.index[i], 'buy_point'] = 1
if not simulation:
if data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'deviation40'
data.at[data.index[-1], 'buy_point'] = 1
if symbol not in ['BONK']:
# BONK는 240 체크하지 않음
if symbol in ['TRX']:
# Deviation240(이격도 240) 기반 매수 조건: 90 이하에서 상승 전환
if data['Deviation240'].iloc[i - 1] < data['Deviation240'].iloc[i] and data['Deviation240'].iloc[i - 1] <= 98:
data.at[data.index[i], 'buy_signal'] = 'deviation240'
data.at[data.index[i], 'buy_point'] = 1
if not simulation:
if data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'deviation240'
data.at[data.index[-1], 'buy_point'] = 1
else:
# Deviation240(이격도 240) 기반 매수 조건: 90 이하에서 상승 전환
if data['Deviation240'].iloc[i - 1] < data['Deviation240'].iloc[i] and data['Deviation240'].iloc[i - 1] <= 90:
data.at[data.index[i], 'buy_signal'] = 'deviation240'
data.at[data.index[i], 'buy_point'] = 1
if not simulation:
if data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'deviation240'
data.at[data.index[-1], 'buy_point'] = 1
# Deviation240(이격도 240) 기반 매수 조건: 90 이하에서 상승 전환
if symbol in ['TON']:
if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 89:
data.at[data.index[i], 'buy_signal'] = 'deviation1440'
data.at[data.index[i], 'buy_point'] = 1
if not simulation:
if data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'deviation1440'
data.at[data.index[-1], 'buy_point'] = 1
elif symbol in ['XRP']:
if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 90:
data.at[data.index[i], 'buy_signal'] = 'deviation1440'
data.at[data.index[i], 'buy_point'] = 1
if not simulation:
if data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'deviation1440'
data.at[data.index[-1], 'buy_point'] = 1
elif symbol in ['BONK']:
if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 76:
data.at[data.index[i], 'buy_signal'] = 'deviation1440'
data.at[data.index[i], 'buy_point'] = 1
if not simulation:
if data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'deviation1440'
data.at[data.index[-1], 'buy_point'] = 1
else:
if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 80:
data.at[data.index[i], 'buy_signal'] = 'deviation1440'
data.at[data.index[i], 'buy_point'] = 1
if not simulation:
if data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'deviation1440'
data.at[data.index[-1], 'buy_point'] = 1
# 하락 5% 조건: 현재가 또는 현재 봉의 저가가 지난 봉 고가/현재 봉 고가 대비 5% 이상 하락
try:
prev_high = data['High'].iloc[i - 1]
curr_high = data['High'].iloc[i]
curr_close = data['Close'].iloc[i]
curr_low = data['Low'].iloc[i]
cond_close_drop = (curr_close <= prev_high * 0.95) or (curr_close <= curr_high * 0.95)
cond_low_drop = (curr_low <= prev_high * 0.95) or (curr_low <= curr_high * 0.95)
if cond_close_drop or cond_low_drop:
data.at[data.index[i], 'buy_signal'] = 'fall_5p'
data.at[data.index[i], 'buy_point'] = 1
if not simulation:
if data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'fall_5p'
data.at[data.index[-1], 'buy_point'] = 1
except Exception:
pass
return data
def format_message(market_type, symbol, symbol_name, close, buy_signal):
message = f"매수 [{market_type}] {symbol_name} ({symbol}): {buy_signal} "
message += f"현재가: {'$' if market_type == 'US' else ''}{close:.4f}, "
return message
def format_ma_message(info, market_type):
"""MA 알림 메시지 생성"""
prefix = '상승 ' if info.get('alert') else ''
message = prefix + f"[{market_type}] {info['name']} ({info['symbol']}) "
message += f"현재가: {'$' if market_type == 'US' else ''}{info['price']:.4f} \n"
return message
def get_coin_data(symbol, interval=60, to=None, retries=3):
for attempt in range(retries):
try:
#url = "https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=3000".format(interval, symbol)
if to is None:
url = ("https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=200").format(interval, symbol)
else:
url = ("https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=200&to={}").format(interval, symbol, to)
#url = 'https://api.bithumb.com/v1/candles/minutes/60?market=KRW-ADA&count=200'
#url = 'https://api.bithumb.com/v1/candles/minutes/minutes/60?market=KRW-ADA&count=200&to=2025-08-06 10:38:38'
headers = {"accept": "application/json"}
response = requests.get(url, headers=headers)
json_data = json.loads(response.text)
df_temp = pd.DataFrame(json_data)
df_temp = df_temp.sort_index(ascending=False)
if 'candle_date_time_kst' not in df_temp:
return None
data = pd.DataFrame()
# data.columns = ['datetime', 'open', 'close', 'high', 'low', 'volume']
# data['datetime'] = pd.to_datetime(data_temp['candle_date_time_kst'])
data['datetime'] = pd.to_datetime(df_temp['candle_date_time_kst'], format='%Y-%m-%dT%H:%M:%S')
data['Open'] = df_temp['opening_price']
data['Close'] = df_temp['trade_price']
data['High'] = df_temp['high_price']
data['Low'] = df_temp['low_price']
data['Volume'] = df_temp['candle_acc_trade_volume']
data = data.set_index('datetime')
data = data.astype(float)
data["datetime"] = data.index
if not data.empty:
return data
print(f"No data received for {symbol}, attempt {attempt + 1}")
time.sleep(0.5)
except Exception as e:
print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
if attempt < retries - 1:
time.sleep(5)
continue
return None
def get_coin_more_data(symbol, interval, bong_count=3000):
# 코인 데이터 1500개 봉 가져오기
to = datetime.now()
data = None
while data is None or len(data) < bong_count:
if data is None:
data = get_coin_data(symbol, interval, to.strftime("%Y-%m-%d %H:%M:%S"))
else:
p_count = len(data)
df = get_coin_data(symbol, interval, to.strftime("%Y-%m-%d %H:%M:%S"))
data = pd.concat([data, df], ignore_index=True)
if p_count == len(data):
break
time.sleep(0.3)
to = to - relativedelta(minutes=interval * 200)
data = data.set_index('datetime')
data = data.sort_index()
data = data.drop_duplicates(keep='first')
data["datetime"] = data.index
# 코인 데이터 1500개 봉 가져오기
return data
def get_coin_saved_data(symbol, interval, data):
conn = sqlite3.connect('coins.db')
cursor = conn.cursor()
for i in range(1, len(data)):
cursor.execute("SELECT * from " + symbol + " where CODE = ? and ymdhms = ? and interval = ?", (symbol, data['datetime'].iloc[-i].strftime('%Y-%m-%d %H:%M:%S'), interval))
arr = cursor.fetchone()
if not arr:
cursor.execute("INSERT INTO " + symbol + " (interval, CODE, NAME, ymdhms, ymd, hms, close, open, high, low, volume) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", (interval, symbol, KR_COINS[symbol], data['datetime'].iloc[-i].strftime('%Y-%m-%d %H:%M:%S'), data['datetime'].iloc[-i].strftime('%Y%m%d'), data['datetime'].iloc[-i].strftime('%H%M%S'), data['Close'].iloc[-i], data['Open'].iloc[-i], data['High'].iloc[-i], data['Low'].iloc[-i], data['Volume'].iloc[-i]))
else:
break
cursor.execute("select * from (SELECT Open,Close,High,Low,Volume,ymdhms as datetime from " + symbol + " order by ymdhms desc limit 5000) subquery order by datetime")
result = cursor.fetchall()
conn.commit()
cursor.close()
conn.close()
df = pd.DataFrame(result)
df.columns = ['Open','Close','High','Low','Volume','datetime']
df = df.set_index('datetime')
df = df.sort_index()
df['datetime'] = df.index
return df
def get_coin_some_data(symbol, interval):
data = get_coin_data(symbol, interval)
data_1 = get_coin_data(symbol, interval=1)
data_1.at[data_1.index[-1], 'Volume'] = data_1['Volume'].iloc[-1] * 60
saved_data = get_coin_saved_data(symbol, interval, data)
data = pd.concat([data, saved_data, data_1.iloc[[-1]]], ignore_index=True)
#data = pd.concat([data, saved_data], ignore_index=True)
data['datetime'] = pd.to_datetime(data['datetime'], format='%Y-%m-%d %H:%M:%S')
data = data.set_index('datetime')
data = data.sort_index()
data = data.drop_duplicates(keep='first')
data["datetime"] = data.index
return data
def get_kr_stock_data(symbol, retries=3):
for attempt in range(retries):
try:
end = datetime.now()
start = end - timedelta(days=300)
# FinanceDataReader를 사용하여 한국 주식 데이터 가져오기
data = fdr.DataReader(symbol, start.strftime('%Y-%m-%d'), end.strftime('%Y-%m-%d'))
if not data.empty:
# FinanceDataReader의 컬럼명을 yfinance 형식으로 변환
data = data.rename(columns={
'Open': 'Open',
'High': 'High',
'Low': 'Low',
'Close': 'Close',
'Volume': 'Volume'
})
return data
print(f"No data received for {symbol}, attempt {attempt + 1}")
time.sleep(2)
except Exception as e:
print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
if attempt < retries - 1:
time.sleep(5)
continue
return None
def monitor_us_stocks():
message_list = []
print("US Stocks {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
for symbol in US_STOCKS:
data = get_kr_stock_data(symbol)
if data is not None and not data.empty:
try:
data = calculate_technical_indicators(data)
recent_data = check_buy_point(symbol, data) # Changed to check_buy_point
if recent_data['buy_point'].iloc[-1] != 1:
continue
print(f" - {US_STOCKS[symbol]} ({symbol}): {recent_data['Close'].iloc[-1]:.2f}")
message_list.append(format_message('US', symbol, US_STOCKS[symbol], recent_data['Close'].iloc[-1], recent_data['buy_signal'].iloc[-1]))
except Exception as e:
print(f"Error processing data for {symbol}: {str(e)}")
time.sleep(0.5)
if len(message_list) > 0:
try:
send_stock_telegram_message(message_list, header="[US-STOCK]")
except Exception as e:
print(f"Error sending Telegram message: {str(e)}")
return
def monitor_kr_stocks():
message_list = []
print("KR ETFs {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
for symbol in KR_ETFS:
try:
# .KS 접미사 제거
clean_symbol = symbol.replace('.KS', '')
data = get_kr_stock_data(clean_symbol)
if data is not None and not data.empty:
try:
data = calculate_technical_indicators(data)
recent_data = check_buy_point(symbol, data) # Changed to check_buy_point
if recent_data['buy_point'].iloc[-1] != 1:
continue
print(f" - {KR_ETFS[symbol]} ({symbol}): {recent_data['Close'].iloc[-1]:.2f}")
message_list.append(format_message('KR', symbol, KR_ETFS[symbol], recent_data['Close'].iloc[-1], recent_data['buy_signal'].iloc[-1]))
except Exception as e:
print(f"Error processing data for {symbol}: {str(e)}")
else:
print(f"Data for {symbol} is empty or None.")
# 각 심볼 처리 후 1초 대기
time.sleep(1)
except Exception as e:
print(f"Unexpected error processing {symbol}: {str(e)}")
continue
if len(message_list) > 0:
try:
send_stock_telegram_message(message_list, header="[KR-STOCK]")
except Exception as e:
print(f"Error sending Telegram message: {str(e)}")
return
def monitor_coins():
message_list = []
print("KRW COINs {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
for symbol in KR_COINS:
# 1시간
interval = 60
data = get_coin_some_data(symbol, interval)
if data is not None and not data.empty:
try:
data = calculate_technical_indicators(data)
recent_data = check_buy_point(symbol, data) # Changed to check_buy_point
if recent_data['buy_point'].iloc[-1] != 1:
continue
# buy
buy_success = buy_ticker(symbol, recent_data)
if not buy_success:
continue # 매수 금지 중이면 다음 코인으로 넘어감
except Exception as e:
print(f"Error processing data for {symbol}: {str(e)}")
else:
print(f"Data for {symbol} is empty or None.")
time.sleep(0.5)
return
def run_schedule():
# 코인 모니터링 스케줄 (매시간 4분, 14분, 24분, 34분, 44분, 54분)
for minute in [2, 12, 22, 32, 42, 52]:
schedule.every().hour.at(f":{minute:02d}").do(monitor_coins)
# 미국 주식 모니터링 스케줄 (매일 저녁 5시 20분)
schedule.every().day.at("16:30").do(monitor_us_stocks)
schedule.every().day.at("23:30").do(monitor_us_stocks)
schedule.every().day.at("05:10").do(monitor_us_stocks)
# 한국 ETF 모니터링 스케줄 (매일 오전 8시)
schedule.every().day.at("18:20").do(monitor_kr_stocks)
schedule.every().day.at("07:10").do(monitor_kr_stocks)
print("Scheduler started. Monitoring will run at specified times.")
print(f"Loaded cooldown data for {len(buy_cooldown)} coins")
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == "__main__":
run_schedule()