import pandas as pd from HTS2 import HTS from dateutil.relativedelta import relativedelta from datetime import datetime, timedelta import telegram import time import requests import json import asyncio from multiprocessing import Pool import schedule from config import * import FinanceDataReader as fdr import numpy as np hts = HTS() def send_coin_msg(text): coin_client = telegram.Bot(token=COIN_TELEGRAM_BOT_TOKEN) asyncio.run(coin_client.send_message(chat_id=COIN_TELEGRAM_CHAT_ID, text=text)) return def send_coin_telegram_message(message_list, header): pStr = header + "\n" for i, message in enumerate(message_list): pStr += message if i + 1 % 20 == 0: pool = Pool(12) pool.map(send_coin_msg, [pStr]) pStr = '' if len(message_list) % 20 != 0: pool = Pool(12) pool.map(send_coin_msg, [pStr]) return def buy_ticker(symbole, data): try: BUY_AMOUNT = 5000 if data['buy_signal'].iloc[-1] == 'movingaverage': BUY_AMOUNT = 50000 elif data['buy_signal'].iloc[-1] == 'deviation40': BUY_AMOUNT = 6000 elif data['buy_signal'].iloc[-1] == 'deviation240': BUY_AMOUNT = 5000 _ = hts.buyCoinMarket(symbole, BUY_AMOUNT) except Exception as e: print(f"Error buying {symbole}: {str(e)}") return def send_stock_msg(text): stock_client = telegram.Bot(token=STOCK_TELEGRAM_BOT_TOKEN) asyncio.run(stock_client.send_message(chat_id=STOCK_TELEGRAM_CHAT_ID, text=text)) return def send_stock_telegram_message(message_list, header): pStr = header + "\n" for i, message in enumerate(message_list): pStr += message if i + 1 % 20 == 0: pool = Pool(12) pool.map(send_stock_msg, [pStr]) pStr = '' if len(message_list) % 20 != 0: pool = Pool(12) pool.map(send_stock_msg, [pStr]) return def normalize_data(data): """데이터 정규화 함수 - 모든 코인에 동일하게 적용""" # Min-Max 정규화를 위한 컬럼 columns_to_normalize = ['Open', 'High', 'Low', 'Close', 'Volume'] normalized_data = data.copy() # 각 컬럼별 정규화 (20일 롤링 윈도우 사용) for column in columns_to_normalize: min_val = data[column].rolling(window=20).min() max_val = data[column].rolling(window=20).max() # 0으로 나누기 방지 denominator = max_val - min_val normalized_data[f'{column}_Norm'] = np.where( denominator != 0, (data[column] - min_val) / denominator, 0.5 # 기본값 설정 ) return normalized_data def calculate_technical_indicators(data): """기술적 지표 계산 - 모든 코인에 동일하게 적용""" # 데이터 정규화 data = normalize_data(data) # 이동평균선 계산 data['MA5'] = data['Close'].rolling(window=5).mean() data['MA20'] = data['Close'].rolling(window=20).mean() data['MA40'] = data['Close'].rolling(window=40).mean() data['MA120'] = data['Close'].rolling(window=120).mean() data['MA200'] = data['Close'].rolling(window=200).mean() data['MA240'] = data['Close'].rolling(window=240).mean() data['MA720'] = data['Close'].rolling(window=720).mean() data['MA1440'] = data['Close'].rolling(window=1440).mean() # --- 이격도(Deviation) 계산 --- data['Deviation5'] = (data['Close'] / data['MA5']) * 100 data['Deviation20'] = (data['Close'] / data['MA20']) * 100 data['Deviation40'] = (data['Close'] / data['MA40']) * 100 data['Deviation120'] = (data['Close'] / data['MA120']) * 100 data['Deviation200'] = (data['Close'] / data['MA200']) * 100 data['Deviation240'] = (data['Close'] / data['MA240']) * 100 data['Deviation720'] = (data['Close'] / data['MA720']) * 100 data['Deviation1440'] = (data['Close'] / data['MA1440']) * 100 # 매수 타이밍을 이동평균선으로 결정 # 골든크로스: 단기 이동평균선이 장기 이동평균선을 상향 돌파할 때 매수 data['golden_cross'] = (data['MA5'] > data['MA20']) & (data['MA5'].shift(1) <= data['MA20'].shift(1)) # 볼린저 밴드 계산 data['MA'] = data['Close'].rolling(window=20).mean() data['STD'] = data['Close'].rolling(window=20).std() data['Upper'] = data['MA'] + (2 * data['STD']) data['Lower'] = data['MA'] - (2 * data['STD']) return data def check_buy_point(data, simulation=None): """ # 매수 포인트 탐지 및 표시 if simulation: recent_data = data else: # recent_data의 복사본 생성 recent_data = data.tail(10).copy() # 'buy_point' 열 초기화 recent_data['buy_point'] = 0 # FutureWarning 해결 if recent_data['buy_point'].iloc[-1] != 1: # 코드 계속 for i in range(1, len(recent_data)): if all(recent_data[f'MA{n}'].iloc[i] < recent_data['MA720'].iloc[i] for n in [5, 20, 40, 120, 200, 240]) and \ all(recent_data[f'MA{n}'].iloc[i] > recent_data[f'MA{n}'].iloc[i-1] for n in [5, 20, 40, 120, 200, 240]) and \ recent_data['MA720'].iloc[i] < recent_data['MA1440'].iloc[i]: recent_data.at[recent_data.index[i], 'buy_point'] = 1 if not simulation: if recent_data['buy_point'][-10:-1].sum() > 0: recent_data.at[recent_data.index[-1], 'buy_point'] = 1 return recent_data """ # 매수 포인트 탐지 및 표시 # 'buy_point' 열 초기화 data['buy_signal'] = '' data['buy_point'] = 0 # FutureWarning 해결 if data['buy_point'].iloc[-1] != 1: # 코드 계속 for i in range(1, len(data)): # 이동평균선 기반 매수 조건 if all(data[f'MA{n}'].iloc[i] < data['MA720'].iloc[i] for n in [5, 20, 40, 120, 200, 240]) and \ all(data[f'MA{n}'].iloc[i] > data[f'MA{n}'].iloc[i - 1] for n in [5, 20, 40, 120, 200, 240]) and \ data['MA720'].iloc[i] < data['MA1440'].iloc[i]: data.at[data.index[i], 'buy_signal'] = 'movingaverage' data.at[data.index[i], 'buy_point'] = 1 if not simulation: if data['buy_point'][-10:-1].sum() > 0: data.at[data.index[-1], 'buy_signal'] = 'movingaverage' data.at[data.index[-1], 'buy_point'] = 1 # Deviation40(이격도 40) 기반 매수 조건: 90 이하에서 상승 전환 if data['Deviation40'].iloc[i - 1] < data['Deviation40'].iloc[i] and data['Deviation40'].iloc[i - 1] <= 90: data.at[data.index[i], 'buy_signal'] = 'deviation40' data.at[data.index[i], 'buy_point'] = 1 if not simulation: if data['buy_point'][-10:-1].sum() > 0: data.at[data.index[-1], 'buy_signal'] = 'deviation40' data.at[data.index[-1], 'buy_point'] = 1 # Deviation240(이격도 240) 기반 매수 조건: 90 이하에서 상승 전환 if data['Deviation240'].iloc[i - 1] < data['Deviation240'].iloc[i] and data['Deviation240'].iloc[i - 1] <= 90: data.at[data.index[i], 'buy_signal'] = 'deviation240' data.at[data.index[i], 'buy_point'] = 1 if not simulation: if data['buy_point'][-10:-1].sum() > 0: data.at[data.index[-1], 'buy_signal'] = 'deviation240' data.at[data.index[-1], 'buy_point'] = 1 return data def format_message(market_type, symbol, symbol_name, close, buy_signal): message = f"매수 [{market_type}] {symbol_name} ({symbol}): {buy_signal} " message += f"현재가: {'$' if market_type == 'US' else '₩'}{close:.2f}, " return message def format_ma_message(info, market_type): """MA 알림 메시지 생성""" prefix = '상승 ' if info.get('alert') else '' message = prefix + f"[{market_type}] {info['name']} ({info['symbol']}) " message += f"현재가: {'$' if market_type == 'US' else '₩'}{info['price']:.2f} \n" return message def get_coin_data(symbol, interval=240, to=None, retries=3): for attempt in range(retries): try: #url = "https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=3000".format(interval, symbol) if to is None: url = ("https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=200").format(interval, symbol) else: url = ("https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=200&to={}").format(interval, symbol, to) #url = 'https://api.bithumb.com/v1/candles/minutes/60?market=KRW-ADA&count=200' #url = 'https://api.bithumb.com/v1/candles/minutes/minutes/60?market=KRW-ADA&count=200&to=2025-08-06 10:38:38' headers = {"accept": "application/json"} response = requests.get(url, headers=headers) json_data = json.loads(response.text) df_temp = pd.DataFrame(json_data) df_temp = df_temp.sort_index(ascending=False) if 'candle_date_time_kst' not in df_temp: return None data = pd.DataFrame() # data.columns = ['datetime', 'open', 'close', 'high', 'low', 'volume'] # data['datetime'] = pd.to_datetime(data_temp['candle_date_time_kst']) data['datetime'] = pd.to_datetime(df_temp['candle_date_time_kst'], format='%Y-%m-%dT%H:%M:%S') data['Open'] = df_temp['opening_price'] data['Close'] = df_temp['trade_price'] data['High'] = df_temp['high_price'] data['Low'] = df_temp['low_price'] data['Volume'] = df_temp['candle_acc_trade_volume'] data = data.set_index('datetime') data = data.astype(float) data["datetime"] = data.index if not data.empty: return data print(f"No data received for {symbol}, attempt {attempt + 1}") time.sleep(0.5) except Exception as e: print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}") if attempt < retries - 1: time.sleep(5) continue return None def get_coin_more_data(symbol, interval, bong_count=3000): # 코인 데이터 1500개 봉 가져오기 to = datetime.now() data = None while data is None or len(data) < bong_count: if data is None: data = get_coin_data(symbol, interval, to.strftime("%Y-%m-%d %H:%M:%S")) else: df = get_coin_data(symbol, interval, to.strftime("%Y-%m-%d %H:%M:%S")) data = pd.concat([data, df], ignore_index=True) time.sleep(0.3) to = to - relativedelta(minutes=interval * 200) data = data.set_index('datetime') data = data.sort_index() data = data.drop_duplicates(keep='first') data["datetime"] = data.index # 코인 데이터 1500개 봉 가져오기 return data def get_kr_stock_data(symbol, retries=3): for attempt in range(retries): try: end = datetime.now() start = end - timedelta(days=300) # FinanceDataReader를 사용하여 한국 주식 데이터 가져오기 data = fdr.DataReader(symbol, start.strftime('%Y-%m-%d'), end.strftime('%Y-%m-%d')) if not data.empty: # FinanceDataReader의 컬럼명을 yfinance 형식으로 변환 data = data.rename(columns={ 'Open': 'Open', 'High': 'High', 'Low': 'Low', 'Close': 'Close', 'Volume': 'Volume' }) return data print(f"No data received for {symbol}, attempt {attempt + 1}") time.sleep(2) except Exception as e: print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}") if attempt < retries - 1: time.sleep(5) continue return None def monitor_us_stocks(): message_list = [] print("US Stocks {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) for symbol in US_STOCKS: data = get_kr_stock_data(symbol) if data is not None and not data.empty: try: data = calculate_technical_indicators(data) recent_data = check_buy_point(data) # Changed to check_buy_point if recent_data['buy_point'].iloc[-1] != 1: continue print(f" - {US_STOCKS[symbol]} ({symbol}): {recent_data['Close'][-1]:.2f}") message_list.append(format_message('US', symbol, US_STOCKS[symbol], recent_data['Close'][-1], recent_data['buy_signal'][-1])) except Exception as e: print(f"Error processing data for {symbol}: {str(e)}") time.sleep(0.5) if len(message_list) > 0: try: send_stock_telegram_message(message_list, header="[US-STOCK]") except Exception as e: print(f"Error sending Telegram message: {str(e)}") return def monitor_kr_stocks(): message_list = [] print("KR ETFs {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) for symbol in KR_ETFS: try: # .KS 접미사 제거 clean_symbol = symbol.replace('.KS', '') data = get_kr_stock_data(clean_symbol) if data is not None and not data.empty: try: data = calculate_technical_indicators(data) recent_data = check_buy_point(data) # Changed to check_buy_point if recent_data['buy_point'].iloc[-1] != 1: continue print(f" - {KR_ETFS[symbol]} ({symbol}): {recent_data['Close'][-1]:.2f}") message_list.append(format_message('KR', symbol, US_STOCKS[symbol], recent_data['Close'][-1])) except Exception as e: print(f"Error processing data for {symbol}: {str(e)}") else: print(f"Data for {symbol} is empty or None.") # 각 심볼 처리 후 1초 대기 time.sleep(1) except Exception as e: print(f"Unexpected error processing {symbol}: {str(e)}") continue if len(message_list) > 0: try: send_stock_telegram_message(message_list, header="[KR-STOCK]") except Exception as e: print(f"Error sending Telegram message: {str(e)}") return def monitor_coins(): message_list = [] print("KRW COINs {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) for symbol in KR_COINS: # 1시간 interval = 60 data = get_coin_more_data(symbol, interval) if data is not None and not data.empty: try: data = calculate_technical_indicators(data) recent_data = check_buy_point(data) # Changed to check_buy_point if recent_data['buy_point'].iloc[-1] != 1: continue print(f" - {KR_ETFS[symbol]} ({symbol}): {recent_data['Close'][-1]:.2f}") message_list.append(format_message('COIN', symbol, US_STOCKS[symbol], recent_data['Close'][-1], recent_data['buy_signal'][-1])) # buy buy_ticker(symbol, recent_data) except Exception as e: print(f"Error processing data for {symbol}: {str(e)}") else: print(f"Data for {symbol} is empty or None.") time.sleep(0.5) if len(message_list) > 0: try: # send message send_coin_telegram_message(message_list, header="[KRW-COIN]") except Exception as e: print(f"Error sending Telegram message: {str(e)}") return # ---------------------- # Turnaround Detector v6 # ---------------------- def detect_turnaround_signal(symbol, data, interval=0, params=None): if len(data) < 7: return None # 이동평균을 기반으로 매수 신호 결정 cur = data.iloc[-1] prev = data.iloc[-2] return None def run_schedule(): # 코인 모니터링 스케줄 (매시간 1분, 11분, 21분, 31분, 41분, 51분) for minute in [4, 14, 24, 34, 44, 54]: schedule.every().hour.at(f":{minute:02d}").do(monitor_coins) # 미국 주식 모니터링 스케줄 (매일 저녁 5시 20분) schedule.every().day.at("16:30").do(monitor_us_stocks) schedule.every().day.at("23:30").do(monitor_us_stocks) schedule.every().day.at("05:10").do(monitor_us_stocks) # 한국 ETF 모니터링 스케줄 (매일 오전 8시) schedule.every().day.at("18:20").do(monitor_kr_stocks) schedule.every().day.at("07:10").do(monitor_kr_stocks) print("Scheduler started. Monitoring will run at specified times.") while True: schedule.run_pending() time.sleep(1) if __name__ == "__main__": run_schedule() #monitor_coins()