diff --git a/README.md b/README.md index e5085b1..3a14a14 100644 --- a/README.md +++ b/README.md @@ -114,7 +114,7 @@ export STOCK_TELEGRAM_CHAT_ID="" ## 사용 방법 ```bash -$ python stock_monitor.py +$ python monitor_coin.py ``` 스크립트가 백그라운드에서 무한 루프로 동작하며 지정된 시간마다 텔레그램 알림을 전송합니다. diff --git a/stock_downloader.py b/downloader.py similarity index 97% rename from stock_downloader.py rename to downloader.py index 0db1a9c..874b5d3 100644 --- a/stock_downloader.py +++ b/downloader.py @@ -1,6 +1,6 @@ from HTS2 import HTS import sqlite3 -from stock_monitor import get_coin_more_data +from monitor_coin import get_coin_more_data from config import * hts = HTS() diff --git a/monitor.py b/monitor.py new file mode 100644 index 0000000..a614f96 --- /dev/null +++ b/monitor.py @@ -0,0 +1,407 @@ +import pandas as pd +from HTS2 import HTS +from dateutil.relativedelta import relativedelta +from datetime import datetime, timedelta +import sqlite3 +import telegram +import time +import requests +import json +import asyncio +from multiprocessing import Pool +import schedule +from config import * +import FinanceDataReader as fdr +import numpy as np +import os + + +class Monitor: + """자산(코인/주식/ETF) 모니터링 및 매수 실행 클래스""" + + cooldown_file = None + + def __init__(self, cooldown_file='coins_buy_time.json') -> None: + self.hts = HTS() + if cooldown_file is not None: + self.cooldown_file = cooldown_file + self.buy_cooldown = self._load_buy_cooldown() + + # ------------- Persistence ------------- + def _load_buy_cooldown(self) -> dict: + if os.path.exists(self.cooldown_file): + try: + with open(self.cooldown_file, 'r', encoding='utf-8') as f: + data = json.load(f) + cooldown: dict[str, datetime] = {} + for symbol, time_str in data.items(): + cooldown[symbol] = datetime.fromisoformat(time_str) + return cooldown + except Exception as e: + print(f"Error loading cooldown data: {e}") + return {} + return {} + + def _save_buy_cooldown(self) -> None: + try: + data: dict[str, str] = {} + for symbol, dt in self.buy_cooldown.items(): + data[symbol] = dt.isoformat() + with open(self.cooldown_file, 'w', encoding='utf-8') as f: + json.dump(data, f, ensure_ascii=False, indent=2) + except Exception as e: + print(f"Error saving cooldown data: {e}") + + # ------------- Telegram ------------- + def _send_coin_msg(self, text: str) -> None: + coin_client = telegram.Bot(token=COIN_TELEGRAM_BOT_TOKEN) + asyncio.run(coin_client.send_message(chat_id=COIN_TELEGRAM_CHAT_ID, text=text)) + + def _send_stock_msg(self, text: str) -> None: + stock_client = telegram.Bot(token=STOCK_TELEGRAM_BOT_TOKEN) + asyncio.run(stock_client.send_message(chat_id=STOCK_TELEGRAM_CHAT_ID, text=text)) + + def send_coin_telegram_message(self, message_list: list[str], header: str) -> None: + payload = header + "\n" + for i, message in enumerate(message_list): + payload += message + if i + 1 % 20 == 0: + pool = Pool(12) + pool.map(self._send_coin_msg, [payload]) + payload = '' + if len(message_list) % 20 != 0: + pool = Pool(12) + pool.map(self._send_coin_msg, [payload]) + + def send_stock_telegram_message(self, message_list: list[str], header: str) -> None: + payload = header + "\n" + for i, message in enumerate(message_list): + payload += message + if i + 1 % 20 == 0: + pool = Pool(12) + pool.map(self._send_stock_msg, [payload]) + payload = '' + if len(message_list) % 20 != 0: + pool = Pool(12) + pool.map(self._send_stock_msg, [payload]) + + # ------------- Indicators ------------- + def normalize_data(self, data: pd.DataFrame) -> pd.DataFrame: + columns_to_normalize = ['Open', 'High', 'Low', 'Close', 'Volume'] + normalized_data = data.copy() + for column in columns_to_normalize: + min_val = data[column].rolling(window=20).min() + max_val = data[column].rolling(window=20).max() + denominator = max_val - min_val + normalized_data[f'{column}_Norm'] = np.where( + denominator != 0, + (data[column] - min_val) / denominator, + 0.5, + ) + return normalized_data + + def calculate_technical_indicators(self, data: pd.DataFrame) -> pd.DataFrame: + data = self.normalize_data(data) + data['MA5'] = data['Close'].rolling(window=5).mean() + data['MA20'] = data['Close'].rolling(window=20).mean() + data['MA40'] = data['Close'].rolling(window=40).mean() + data['MA120'] = data['Close'].rolling(window=120).mean() + data['MA200'] = data['Close'].rolling(window=200).mean() + data['MA240'] = data['Close'].rolling(window=240).mean() + data['MA720'] = data['Close'].rolling(window=720).mean() + data['MA1440'] = data['Close'].rolling(window=1440).mean() + data['Deviation5'] = (data['Close'] / data['MA5']) * 100 + data['Deviation20'] = (data['Close'] / data['MA20']) * 100 + data['Deviation40'] = (data['Close'] / data['MA40']) * 100 + data['Deviation120'] = (data['Close'] / data['MA120']) * 100 + data['Deviation200'] = (data['Close'] / data['MA200']) * 100 + data['Deviation240'] = (data['Close'] / data['MA240']) * 100 + data['Deviation720'] = (data['Close'] / data['MA720']) * 100 + data['Deviation1440'] = (data['Close'] / data['MA1440']) * 100 + data['golden_cross'] = (data['MA5'] > data['MA20']) & (data['MA5'].shift(1) <= data['MA20'].shift(1)) + data['MA'] = data['Close'].rolling(window=20).mean() + data['STD'] = data['Close'].rolling(window=20).std() + data['Upper'] = data['MA'] + (2 * data['STD']) + data['Lower'] = data['MA'] - (2 * data['STD']) + return data + + # ------------- Strategy ------------- + def buy_ticker(self, symbol: str, data: pd.DataFrame) -> bool: + try: + current_time = datetime.now() + if data['buy_signal'].iloc[-1] != 'fall_5p': + buy_amount = 100000 + + if symbol in self.buy_cooldown: + time_diff = current_time - self.buy_cooldown[symbol] + if time_diff.total_seconds() < 600: + print(f"{symbol}: 매수 금지 중 (남은 시간: {600 - time_diff.total_seconds():.0f}초)") + return False + else: + if symbol in self.buy_cooldown: + time_diff = current_time - self.buy_cooldown[symbol] + if time_diff.total_seconds() < 1200: + print(f"{symbol}: 매수 금지 중 (남은 시간: {1200 - time_diff.total_seconds():.0f}초)") + return False + + buy_amount = 5100 + if data['buy_signal'].iloc[-1] == 'movingaverage': + buy_amount = 7000 + elif data['buy_signal'].iloc[-1] == 'deviation40': + buy_amount = 10000 + elif data['buy_signal'].iloc[-1] == 'deviation240': + buy_amount = 6000 + elif data['buy_signal'].iloc[-1] == 'deviation1440': + if symbol in ['BONK', 'PEPE', 'TON']: + buy_amount = 30000 + else: + buy_amount = 50000 + + _ = self.hts.buyCoinMarket(symbol, buy_amount) + + if self.cooldown_file is not None: + self.buy_cooldown[symbol] = current_time + self._save_buy_cooldown() + + print(f"{KR_COINS[symbol]} ({symbol}): {data['Close'].iloc[-1]:.4f}: 매수 완료, 20분간 매수 금지 시작") + try: + pool = Pool(12) + pool.map(self._send_coin_msg, [ + "[KRW-COIN]" + "\n" + self.format_message('COIN', symbol, KR_COINS[symbol], data['Close'].iloc[-1], data['buy_signal'].iloc[-1]) + ]) + except Exception as e: + print(f"Error sending Telegram message: {str(e)}") + except Exception as e: + print(f"Error buying {symbol}: {str(e)}") + return False + return True + + def check_buy_point(self, symbol: str, data: pd.DataFrame, simulation: bool | None = None) -> pd.DataFrame: + data = data.copy() + data['buy_signal'] = '' + data['buy_point'] = 0 + if data['buy_point'].iloc[-1] != 1: + for i in range(1, len(data)): + if all(data[f'MA{n}'].iloc[i] < data['MA720'].iloc[i] for n in [5, 20, 40, 120, 200, 240]) and \ + all(data[f'MA{n}'].iloc[i] > data[f'MA{n}'].iloc[i - 1] for n in [5, 20, 40, 120, 200, 240]) and \ + data['MA720'].iloc[i] < data['MA1440'].iloc[i]: + data.at[data.index[i], 'buy_signal'] = 'movingaverage' + data.at[data.index[i], 'buy_point'] = 1 + if not simulation and data['buy_point'][-3:].sum() > 0: + data.at[data.index[-1], 'buy_signal'] = 'movingaverage' + data.at[data.index[-1], 'buy_point'] = 1 + + if data['Deviation40'].iloc[i - 1] < data['Deviation40'].iloc[i] and data['Deviation40'].iloc[i - 1] <= 90: + data.at[data.index[i], 'buy_signal'] = 'deviation40' + data.at[data.index[i], 'buy_point'] = 1 + if not simulation and data['buy_point'][-3:].sum() > 0: + data.at[data.index[-1], 'buy_signal'] = 'deviation40' + data.at[data.index[-1], 'buy_point'] = 1 + + if symbol not in ['BONK']: + if symbol in ['TRX']: + if data['Deviation240'].iloc[i - 1] < data['Deviation240'].iloc[i] and data['Deviation240'].iloc[i - 1] <= 98: + data.at[data.index[i], 'buy_signal'] = 'deviation240' + data.at[data.index[i], 'buy_point'] = 1 + if not simulation and data['buy_point'][-3:].sum() > 0: + data.at[data.index[-1], 'buy_signal'] = 'deviation240' + data.at[data.index[-1], 'buy_point'] = 1 + else: + if data['Deviation240'].iloc[i - 1] < data['Deviation240'].iloc[i] and data['Deviation240'].iloc[i - 1] <= 90: + data.at[data.index[i], 'buy_signal'] = 'deviation240' + data.at[data.index[i], 'buy_point'] = 1 + if not simulation and data['buy_point'][-3:].sum() > 0: + data.at[data.index[-1], 'buy_signal'] = 'deviation240' + data.at[data.index[-1], 'buy_point'] = 1 + + if symbol in ['TON']: + if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 89: + data.at[data.index[i], 'buy_signal'] = 'deviation1440' + data.at[data.index[i], 'buy_point'] = 1 + if not simulation and data['buy_point'][-3:].sum() > 0: + data.at[data.index[-1], 'buy_signal'] = 'deviation1440' + data.at[data.index[-1], 'buy_point'] = 1 + elif symbol in ['XRP']: + if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 90: + data.at[data.index[i], 'buy_signal'] = 'deviation1440' + data.at[data.index[i], 'buy_point'] = 1 + if not simulation and data['buy_point'][-3:].sum() > 0: + data.at[data.index[-1], 'buy_signal'] = 'deviation1440' + data.at[data.index[-1], 'buy_point'] = 1 + elif symbol in ['BONK']: + if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 76: + data.at[data.index[i], 'buy_signal'] = 'deviation1440' + data.at[data.index[i], 'buy_point'] = 1 + if not simulation and data['buy_point'][-3:].sum() > 0: + data.at[data.index[-1], 'buy_signal'] = 'deviation1440' + data.at[data.index[-1], 'buy_point'] = 1 + else: + if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 80: + data.at[data.index[i], 'buy_signal'] = 'deviation1440' + data.at[data.index[i], 'buy_point'] = 1 + if not simulation and data['buy_point'][-3:].sum() > 0: + data.at[data.index[-1], 'buy_signal'] = 'deviation1440' + data.at[data.index[-1], 'buy_point'] = 1 + + try: + prev_low = data['Low'].iloc[i - 1] + curr_close = data['Close'].iloc[i] + cond_close_drop = curr_close <= prev_low * 0.95 + if cond_close_drop: + data.at[data.index[i], 'buy_signal'] = 'fall_5p' + data.at[data.index[i], 'buy_point'] = 1 + if not simulation and data['buy_point'][-3:].sum() > 0: + data.at[data.index[-1], 'buy_signal'] = 'fall_5p' + data.at[data.index[-1], 'buy_point'] = 1 + except Exception: + pass + return data + + # ------------- Formatting ------------- + def format_message(self, market_type: str, symbol: str, symbol_name: str, close: float, buy_signal: str) -> str: + message = f"매수 [{market_type}] {symbol_name} ({symbol}): {buy_signal} " + message += f"현재가: {'$' if market_type == 'US' else '₩'}{close:.4f}, " + return message + + def format_ma_message(self, info: dict, market_type: str) -> str: + prefix = '상승 ' if info.get('alert') else '' + message = prefix + f"[{market_type}] {info['name']} ({info['symbol']}) " + message += f"현재가: {'$' if market_type == 'US' else '₩'}{info['price']:.4f} \n" + return message + + # ------------- Data fetch ------------- + def get_coin_data(self, symbol: str, interval: int = 60, to: str | None = None, retries: int = 3) -> pd.DataFrame | None: + for attempt in range(retries): + try: + if to is None: + url = ("https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=200").format(interval, symbol) + else: + url = ("https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=200&to={}").format(interval, symbol, to) + headers = {"accept": "application/json"} + response = requests.get(url, headers=headers) + json_data = json.loads(response.text) + df_temp = pd.DataFrame(json_data) + df_temp = df_temp.sort_index(ascending=False) + if 'candle_date_time_kst' not in df_temp: + return None + data = pd.DataFrame() + data['datetime'] = pd.to_datetime(df_temp['candle_date_time_kst'], format='%Y-%m-%dT%H:%M:%S') + data['Open'] = df_temp['opening_price'] + data['Close'] = df_temp['trade_price'] + data['High'] = df_temp['high_price'] + data['Low'] = df_temp['low_price'] + data['Volume'] = df_temp['candle_acc_trade_volume'] + data = data.set_index('datetime') + data = data.astype(float) + data["datetime"] = data.index + if not data.empty: + return data + print(f"No data received for {symbol}, attempt {attempt + 1}") + time.sleep(0.5) + except Exception as e: + print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}") + if attempt < retries - 1: + time.sleep(5) + continue + return None + + def get_coin_more_data(self, symbol: str, interval: int, bong_count: int = 3000) -> pd.DataFrame: + to = datetime.now() + data: pd.DataFrame | None = None + while data is None or len(data) < bong_count: + if data is None: + data = self.get_coin_data(symbol, interval, to.strftime("%Y-%m-%d %H:%M:%S")) + else: + previous_count = len(data) + df = self.get_coin_data(symbol, interval, to.strftime("%Y-%m-%d %H:%M:%S")) + data = pd.concat([data, df], ignore_index=True) + if previous_count == len(data): + break + time.sleep(0.3) + to = to - relativedelta(minutes=interval * 200) + data = data.set_index('datetime') + data = data.sort_index() + data = data.drop_duplicates(keep='first') + data["datetime"] = data.index + return data + + def get_coin_saved_data(self, symbol: str, interval: int, data: pd.DataFrame) -> pd.DataFrame: + conn = sqlite3.connect('coins.db') + cursor = conn.cursor() + for i in range(1, len(data)): + cursor.execute( + "SELECT * from " + symbol + " where CODE = ? and ymdhms = ? and interval = ?", + (symbol, data['datetime'].iloc[-i].strftime('%Y-%m-%d %H:%M:%S'), interval), + ) + arr = cursor.fetchone() + if not arr: + cursor.execute( + "INSERT INTO " + symbol + " (interval, CODE, NAME, ymdhms, ymd, hms, close, open, high, low, volume) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ( + interval, + symbol, + KR_COINS[symbol], + data['datetime'].iloc[-i].strftime('%Y-%m-%d %H:%M:%S'), + data['datetime'].iloc[-i].strftime('%Y%m%d'), + data['datetime'].iloc[-i].strftime('%H%M%S'), + data['Close'].iloc[-i], + data['Open'].iloc[-i], + data['High'].iloc[-i], + data['Low'].iloc[-i], + data['Volume'].iloc[-i], + ), + ) + else: + break + cursor.execute( + "select * from (SELECT Open,Close,High,Low,Volume,ymdhms as datetime from " + + symbol + + " order by ymdhms desc limit 5000) subquery order by datetime" + ) + result = cursor.fetchall() + conn.commit() + cursor.close() + conn.close() + df = pd.DataFrame(result) + df.columns = ['Open', 'Close', 'High', 'Low', 'Volume', 'datetime'] + df = df.set_index('datetime') + df = df.sort_index() + df['datetime'] = df.index + return df + + def get_coin_some_data(self, symbol: str, interval: int) -> pd.DataFrame: + data = self.get_coin_data(symbol, interval) + data_1 = self.get_coin_data(symbol, interval=1) + data_1.at[data_1.index[-1], 'Volume'] = data_1['Volume'].iloc[-1] * 60 + saved_data = self.get_coin_saved_data(symbol, interval, data) + data = pd.concat([data, saved_data, data_1.iloc[[-1]]], ignore_index=True) + data['datetime'] = pd.to_datetime(data['datetime'], format='%Y-%m-%d %H:%M:%S') + data = data.set_index('datetime') + data = data.sort_index() + data = data.drop_duplicates(keep='first') + data["datetime"] = data.index + return data + + def get_kr_stock_data(self, symbol: str, retries: int = 3) -> pd.DataFrame | None: + for attempt in range(retries): + try: + end = datetime.now() + start = end - timedelta(days=300) + data = fdr.DataReader(symbol, start.strftime('%Y-%m-%d'), end.strftime('%Y-%m-%d')) + if not data.empty: + data = data.rename(columns={ + 'Open': 'Open', + 'High': 'High', + 'Low': 'Low', + 'Close': 'Close', + 'Volume': 'Volume', + }) + return data + print(f"No data received for {symbol}, attempt {attempt + 1}") + time.sleep(2) + except Exception as e: + print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}") + if attempt < retries - 1: + time.sleep(5) + continue + return None diff --git a/monitor_coin.py b/monitor_coin.py new file mode 100644 index 0000000..016eb95 --- /dev/null +++ b/monitor_coin.py @@ -0,0 +1,164 @@ +import pandas as pd +from dateutil.relativedelta import relativedelta +from datetime import datetime +import sqlite3 +import time +import requests +import json +from config import * + +from monitor import Monitor + +class MonitorCoin (Monitor): + """자산(코인/주식/ETF) 모니터링 및 매수 실행 클래스""" + + def __init__(self, cooldown_file: str = 'coins_buy_time.json') -> None: + super().__init__(cooldown_file) + + # ------------- Data fetch ------------- + def get_coin_data(self, symbol: str, interval: int = 60, to: str | None = None, retries: int = 3) -> pd.DataFrame | None: + for attempt in range(retries): + try: + if to is None: + url = ("https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=200").format(interval, symbol) + else: + url = ("https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=200&to={}").format(interval, symbol, to) + headers = {"accept": "application/json"} + response = requests.get(url, headers=headers) + json_data = json.loads(response.text) + df_temp = pd.DataFrame(json_data) + df_temp = df_temp.sort_index(ascending=False) + if 'candle_date_time_kst' not in df_temp: + return None + data = pd.DataFrame() + data['datetime'] = pd.to_datetime(df_temp['candle_date_time_kst'], format='%Y-%m-%dT%H:%M:%S') + data['Open'] = df_temp['opening_price'] + data['Close'] = df_temp['trade_price'] + data['High'] = df_temp['high_price'] + data['Low'] = df_temp['low_price'] + data['Volume'] = df_temp['candle_acc_trade_volume'] + data = data.set_index('datetime') + data = data.astype(float) + data["datetime"] = data.index + if not data.empty: + return data + print(f"No data received for {symbol}, attempt {attempt + 1}") + time.sleep(0.5) + except Exception as e: + print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}") + if attempt < retries - 1: + time.sleep(5) + continue + return None + + def get_coin_more_data(self, symbol: str, interval: int, bong_count: int = 3000) -> pd.DataFrame: + to = datetime.now() + data: pd.DataFrame | None = None + while data is None or len(data) < bong_count: + if data is None: + data = self.get_coin_data(symbol, interval, to.strftime("%Y-%m-%d %H:%M:%S")) + else: + previous_count = len(data) + df = self.get_coin_data(symbol, interval, to.strftime("%Y-%m-%d %H:%M:%S")) + data = pd.concat([data, df], ignore_index=True) + if previous_count == len(data): + break + time.sleep(0.3) + to = to - relativedelta(minutes=interval * 200) + data = data.set_index('datetime') + data = data.sort_index() + data = data.drop_duplicates(keep='first') + data["datetime"] = data.index + return data + + def get_coin_saved_data(self, symbol: str, interval: int, data: pd.DataFrame) -> pd.DataFrame: + conn = sqlite3.connect('coins.db') + cursor = conn.cursor() + for i in range(1, len(data)): + cursor.execute( + "SELECT * from " + symbol + " where CODE = ? and ymdhms = ? and interval = ?", + (symbol, data['datetime'].iloc[-i].strftime('%Y-%m-%d %H:%M:%S'), interval), + ) + arr = cursor.fetchone() + if not arr: + cursor.execute( + "INSERT INTO " + symbol + " (interval, CODE, NAME, ymdhms, ymd, hms, close, open, high, low, volume) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ( + interval, + symbol, + KR_COINS[symbol], + data['datetime'].iloc[-i].strftime('%Y-%m-%d %H:%M:%S'), + data['datetime'].iloc[-i].strftime('%Y%m%d'), + data['datetime'].iloc[-i].strftime('%H%M%S'), + data['Close'].iloc[-i], + data['Open'].iloc[-i], + data['High'].iloc[-i], + data['Low'].iloc[-i], + data['Volume'].iloc[-i], + ), + ) + else: + break + cursor.execute( + "select * from (SELECT Open,Close,High,Low,Volume,ymdhms as datetime from " + + symbol + + " order by ymdhms desc limit 5000) subquery order by datetime" + ) + result = cursor.fetchall() + conn.commit() + cursor.close() + conn.close() + df = pd.DataFrame(result) + df.columns = ['Open', 'Close', 'High', 'Low', 'Volume', 'datetime'] + df = df.set_index('datetime') + df = df.sort_index() + df['datetime'] = df.index + return df + + def get_coin_some_data(self, symbol: str, interval: int) -> pd.DataFrame: + data = self.get_coin_data(symbol, interval) + data_1 = self.get_coin_data(symbol, interval=1) + data_1.at[data_1.index[-1], 'Volume'] = data_1['Volume'].iloc[-1] * 60 + saved_data = self.get_coin_saved_data(symbol, interval, data) + data = pd.concat([data, saved_data, data_1.iloc[[-1]]], ignore_index=True) + data['datetime'] = pd.to_datetime(data['datetime'], format='%Y-%m-%d %H:%M:%S') + data = data.set_index('datetime') + data = data.sort_index() + data = data.drop_duplicates(keep='first') + data["datetime"] = data.index + return data + + def monitor_coins(self) -> None: + print("KRW COINs {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + for symbol in KR_COINS: + interval = 60 + data = self.get_coin_some_data(symbol, interval) + if data is not None and not data.empty: + try: + data = self.calculate_technical_indicators(data) + recent_data = self.check_buy_point(symbol, data) + if recent_data['buy_point'].iloc[-1] != 1: + continue + buy_success = self.buy_ticker(symbol, recent_data) + if not buy_success: + continue + except Exception as e: + print(f"Error processing data for {symbol}: {str(e)}") + else: + print(f"Data for {symbol} is empty or None.") + + time.sleep(0.5) + + print("\t-{}".format(','.join(KR_COINS.keys()))) + return + # ------------- Scheduler ------------- + def run_schedule(self) -> None: + + while True: + self.monitor_coins() + time.sleep(10) + +if __name__ == "__main__": + KR_COINS.keys() + + MonitorCoin().run_schedule() diff --git a/monitor_stock.py b/monitor_stock.py new file mode 100644 index 0000000..af5cc37 --- /dev/null +++ b/monitor_stock.py @@ -0,0 +1,113 @@ +import pandas as pd +from datetime import datetime, timedelta +import time +import schedule +from config import * +import FinanceDataReader as fdr + +from monitor import Monitor + +class MonitorStock (Monitor): + """자산(코인/주식/ETF) 모니터링 및 매수 실행 클래스""" + + def __init__(self) -> None: + super().__init__(None) + + + def get_kr_stock_data(self, symbol: str, retries: int = 3) -> pd.DataFrame | None: + for attempt in range(retries): + try: + end = datetime.now() + start = end - timedelta(days=300) + data = fdr.DataReader(symbol, start.strftime('%Y-%m-%d'), end.strftime('%Y-%m-%d')) + if not data.empty: + data = data.rename(columns={ + 'Open': 'Open', + 'High': 'High', + 'Low': 'Low', + 'Close': 'Close', + 'Volume': 'Volume', + }) + return data + print(f"No data received for {symbol}, attempt {attempt + 1}") + time.sleep(2) + except Exception as e: + print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}") + if attempt < retries - 1: + time.sleep(5) + continue + return None + + # ------------- Monitors ------------- + def monitor_us_stocks(self) -> None: + message_list: list[str] = [] + print("US Stocks {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + for symbol in US_STOCKS: + data = self.get_kr_stock_data(symbol) + if data is not None and not data.empty: + try: + data = self.calculate_technical_indicators(data) + recent_data = self.check_buy_point(symbol, data) + if recent_data['buy_point'].iloc[-1] != 1: + continue + print(f" - {US_STOCKS[symbol]} ({symbol}): {recent_data['Close'].iloc[-1]:.2f}") + message_list.append( + self.format_message('US', symbol, US_STOCKS[symbol], recent_data['Close'].iloc[-1], recent_data['buy_signal'].iloc[-1]) + ) + except Exception as e: + print(f"Error processing data for {symbol}: {str(e)}") + time.sleep(0.5) + if len(message_list) > 0: + try: + self.send_stock_telegram_message(message_list, header="[US-STOCK]") + except Exception as e: + print(f"Error sending Telegram message: {str(e)}") + + def monitor_kr_stocks(self) -> None: + message_list: list[str] = [] + print("KR ETFs {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + for symbol in KR_ETFS: + try: + clean_symbol = symbol.replace('.KS', '') + data = self.get_kr_stock_data(clean_symbol) + if data is not None and not data.empty: + try: + data = self.calculate_technical_indicators(data) + recent_data = self.check_buy_point(symbol, data) + if recent_data['buy_point'].iloc[-1] != 1: + continue + print(f" - {KR_ETFS[symbol]} ({symbol}): {recent_data['Close'].iloc[-1]:.2f}") + message_list.append( + self.format_message('KR', symbol, KR_ETFS[symbol], recent_data['Close'].iloc[-1], recent_data['buy_signal'].iloc[-1]) + ) + except Exception as e: + print(f"Error processing data for {symbol}: {str(e)}") + else: + print(f"Data for {symbol} is empty or None.") + time.sleep(1) + except Exception as e: + print(f"Unexpected error processing {symbol}: {str(e)}") + continue + if len(message_list) > 0: + try: + self.send_stock_telegram_message(message_list, header="[KR-STOCK]") + except Exception as e: + print(f"Error sending Telegram message: {str(e)}") + + # ------------- Scheduler ------------- + def run_schedule(self) -> None: + schedule.every().day.at("16:30").do(self.monitor_us_stocks) + schedule.every().day.at("23:30").do(self.monitor_us_stocks) + schedule.every().day.at("23:30").do(self.monitor_us_stocks) + schedule.every().day.at("05:10").do(self.monitor_us_stocks) + schedule.every().day.at("18:20").do(self.monitor_kr_stocks) + schedule.every().day.at("07:10").do(self.monitor_kr_stocks) + print("Scheduler started. Stock Monitoring will run at specified times.") + print(f"Loaded cooldown data for {len(self.buy_cooldown)} coins") + while True: + schedule.run_pending() + time.sleep(1) + + +if __name__ == "__main__": + MonitorStock().run_schedule() diff --git a/simulation.py b/simulation.py new file mode 100644 index 0000000..8f717f2 --- /dev/null +++ b/simulation.py @@ -0,0 +1,458 @@ +from dateutil.relativedelta import relativedelta +import pandas as pd +import yfinance as yf +import matplotlib.pyplot as plt +import matplotlib.dates +import mplcursors +plt.rcParams['font.family'] ='AppleGothic' +plt.rcParams['axes.unicode_minus'] =False + +from config import * +from monitor import Monitor + + +class Simulation: + def __init__(self) -> None: + self.monitor = Monitor() + self.INTERVAL_MAP = { + 60: "60m", + 240: "4h", + } + + def detect_turnaround_signal(self, symbol, data, interval=0, params=None): + if len(data) < 7: + return None + current_data = data.iloc[-1] + if current_data.get('buy_point', 0) == 1: + return { + 'alert': True, + 'details': f"매수신호: {current_data.get('buy_signal', 'unknown')}" + } + return {'alert': False, 'details': "매수신호 없음"} + + def fetch_price_history(self, symbol: str, interval_minutes: int, days: int = 30) -> pd.DataFrame: + if symbol in KR_COINS: + bong_count = 3000 + return self.monitor.get_coin_more_data(symbol, interval_minutes, bong_count=bong_count) + if interval_minutes not in self.INTERVAL_MAP: + raise ValueError("interval must be 60 or 240") + interval_str = self.INTERVAL_MAP[interval_minutes] + df = yf.download( + tickers=symbol, + period=f"{days}d", + interval=interval_str, + progress=False, + ) + if df.empty: + raise RuntimeError("No data fetched. Check symbol or interval support.") + return df + + def analyze_bottom_period(self, symbol: str, interval_minutes: int, days: int = 90): + data = self.fetch_price_history(symbol, interval_minutes, days) + data = self.monitor.calculate_technical_indicators(data) + data = self.monitor.check_buy_point(symbol, data, simulation=True) + print(f"데이터 기간: {data.index[0]} ~ {data.index[-1]}") + print(f"총 데이터 수: {len(data)}") + bottom_start = pd.Timestamp('2025-06-22') + bottom_end = pd.Timestamp('2025-07-09') + bottom_data = data[(data.index >= bottom_start) & (data.index <= bottom_end)] + if len(bottom_data) == 0: + print("저점 기간 데이터가 없습니다.") + return None, [] + print(f"\n저점 기간 데이터: {bottom_data.index[0]} ~ {bottom_data.index[-1]}") + print(f"저점 기간 데이터 수: {len(bottom_data)}") + print("\n=== 저점 기간 기술적 지표 분석 ===") + min_price = bottom_data['Low'].min() + max_price = bottom_data['High'].max() + avg_price = bottom_data['Close'].mean() + print(f"최저가: {min_price:.4f}") + print(f"최고가: {max_price:.4f}") + print(f"평균가: {avg_price:.4f}") + print(f"가격 변동폭: {((max_price - min_price) / min_price * 100):.2f}%") + bb_lower_min = bottom_data['Lower'].min() + bb_upper_max = bottom_data['Upper'].max() + print(f"\n볼린저 밴드 분석:") + print(f"하단 밴드 최저: {bb_lower_min:.4f}") + print(f"상단 밴드 최고: {bb_upper_max:.4f}") + volume_avg = bottom_data['Volume'].mean() + volume_max = bottom_data['Volume'].max() + print(f"\n거래량 분석:") + print(f"평균 거래량: {volume_avg:.0f}") + print(f"최대 거래량: {volume_max:.0f}") + actual_bottom_idx = bottom_data['Low'].idxmin() + actual_bottom_price = bottom_data.loc[actual_bottom_idx, 'Low'] + actual_bottom_date = actual_bottom_idx + print(f"\n실제 저점:") + print(f"날짜: {actual_bottom_date}") + print(f"가격: {actual_bottom_price:.4f}") + print(f"볼린저 하단 대비: {((actual_bottom_price - bottom_data.loc[actual_bottom_idx, 'Lower']) / bottom_data.loc[actual_bottom_idx, 'Lower'] * 100):.2f}%") + print(f"\n=== 매수 신호 분석 ===") + bottom_alerts = bottom_data[bottom_data['buy_point'] == 1] + alerts = [(idx, row['Close']) for idx, row in bottom_alerts.iterrows()] + print(f"저점 기간 매수 신호 수: {len(alerts)}") + if alerts: + print("매수 신호 발생 시점:") + for date, price in alerts: + print(f" {date}: {price:.4f}") + return bottom_data, alerts + + def run_simulation(self, symbol: str, interval_minutes: int, days: int = 30): + data = self.fetch_price_history(symbol, interval_minutes) + data = self.monitor.calculate_technical_indicators(data) + data = self.monitor.check_buy_point(symbol, data, simulation=True) + print(f"데이터 기간: {data.index[0]} ~ {data.index[-1]}") + print(f"총 데이터 수: {len(data)}") + alerts = [] + for i in range(len(data)): + if data['buy_point'].iloc[i] == 1: + alerts.append((data.index[i], data['Close'].iloc[i])) + print(f"\n총 매수 신호 수: {len(alerts)}") + ma_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'movingaverage')]) + dev40_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation40')]) + dev240_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation240')]) + dev1440_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation1440')]) + print(f" - MA 신호: {ma_signals}") + print(f" - Dev40 신호: {dev40_signals}") + print(f" - Dev240 신호: {dev240_signals}") + print(f" - Dev1440 신호: {dev1440_signals}") + fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True, figsize=(15, 8), height_ratios=[3, 1]) + fig.suptitle(f"{symbol} - 시뮬레이션 {interval_minutes}분봉", fontsize=14) + + # ----------------- 마우스 휠 확대/축소 ----------------- + def on_scroll(event): + # event.button: 'up' -> zoom in, 'down' -> zoom out + if event.inaxes not in [ax1, ax2]: + return + ax = event.inaxes + # x 축만 두 축을 동시에 조정 + cur_xlim = ax1.get_xlim() + xdata = event.xdata + if xdata is None: + return + scale_factor = 0.9 if event.button == 'up' else 1/0.9 + new_width = (cur_xlim[1] - cur_xlim[0]) * scale_factor + relx = (cur_xlim[1] - xdata) / (cur_xlim[1] - cur_xlim[0]) + new_left = xdata - new_width * (1 - relx) + new_right = xdata + new_width * relx + # 데이터 영역 벗어나지 않도록 클램프 + xmin, xmax = matplotlib.dates.date2num(data.index[0]), matplotlib.dates.date2num(data.index[-1]) + if new_left < xmin: + new_left = xmin + if new_right > xmax: + new_right = xmax + ax1.set_xlim([new_left, new_right]) + ax2.set_xlim([new_left, new_right]) + # y축은 해당 축만 줌 + cur_ylim = ax.get_ylim() + ydata = event.ydata + if ydata is not None: + new_height = (cur_ylim[1] - cur_ylim[0]) * scale_factor + rely = (cur_ylim[1] - ydata) / (cur_ylim[1] - cur_ylim[0]) + ax.set_ylim([ydata - new_height * (1 - rely), ydata + new_height * rely]) + ax.figure.canvas.draw_idle() + + fig.canvas.mpl_connect('scroll_event', on_scroll) + + # 캔들스틱 차트 추가 (matplotlib 기본 기능 사용) + import matplotlib.dates as mdates + + # 캔들스틱 데이터 준비 + ohlc_data = [] + for i, (idx, row) in enumerate(data.iterrows()): + ohlc_data.append([ + mdates.date2num(idx), + row['Open'], + row['High'], + row['Low'], + row['Close'] + ]) + + # 캔들스틱 그리기 (matplotlib 기본 기능으로 구현) + for ohlc in ohlc_data: + date, open_price, high, low, close = ohlc + + # 캔들 색상 결정 + color = 'red' if close >= open_price else 'blue' + + # 캔들 몸통 그리기 + body_height = abs(close - open_price) + body_bottom = min(open_price, close) + rect = plt.Rectangle((date - 0.3, body_bottom), 0.6, body_height, + facecolor=color, edgecolor='black', alpha=0.7) + ax1.add_patch(rect) + + # 캔들 심지 그리기 + ax1.plot([date, date], [low, high], color='black', linewidth=1) + + # 메인 차트 (가격, 이동평균선, 볼린저 밴드) + line_close = ax1.plot(data.index, data["Close"], label="종가", color="black", linewidth=1.5, alpha=0.8)[0] + line_ma5 = ax1.plot(data.index, data["MA5"], label="MA5", color="red", linewidth=1)[0] + line_ma20 = ax1.plot(data.index, data["MA20"], label="MA20", color="blue", linewidth=1)[0] + line_ma40 = ax1.plot(data.index, data["MA40"], label="MA40", color="green", linewidth=1)[0] + line_ma120 = ax1.plot(data.index, data["MA120"], label="MA120", color="purple", linewidth=1)[0] + line_ma200 = ax1.plot(data.index, data["MA200"], label="MA200", color="brown", linewidth=1)[0] + line_ma240 = ax1.plot(data.index, data["MA240"], label="MA240", color="darkred", linewidth=1)[0] + line_ma720 = ax1.plot(data.index, data["MA720"], label="MA720", color="cyan", linewidth=1)[0] + line_ma1440 = ax1.plot(data.index, data["MA1440"], label="MA1440", color="magenta", linewidth=1)[0] + + # Bollinger Bands + line_upper = ax1.plot(data.index, data["Upper"], label="볼린저 Upper", color="grey", linestyle="--", linewidth=1)[0] + line_lower = ax1.plot(data.index, data["Lower"], label="볼린저 Lower", color="grey", linestyle="--", linewidth=1)[0] + + ax1.fill_between(data.index, data["Lower"], data["Upper"], color="grey", alpha=0.1) + + # 매수 포인트를 신호 유형별로 다르게 표시 (수직선 포함) + # 이동평균선 기반 매수 포인트 + ma_buy_points = data[(data['buy_point'] == 1) & (data['buy_signal'] == 'movingaverage')] + scatter_ma_buy_points = None + if len(ma_buy_points) > 0: + scatter_ma_buy_points = ax1.scatter(ma_buy_points.index, ma_buy_points['Close'], color='red', s=150, zorder=10, label='MA 매수 포인트', marker='o') + for time in ma_buy_points.index: + ax1.axvline(x=time, color='red', linestyle='-', alpha=0.5, linewidth=1) + + # Deviation40 기반 매수 포인트 + dev40_buy_points = data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation40')] + scatter_dev40_buy_points = None + if len(dev40_buy_points) > 0: + scatter_dev40_buy_points = ax1.scatter(dev40_buy_points.index, dev40_buy_points['Close'], + facecolors='none', edgecolors='red', linestyle='--', + linewidth=2, s=200, zorder=10, label='Dev40 매수 포인트') + for time in dev40_buy_points.index: + ax1.axvline(x=time, color='red', linestyle='--', alpha=0.5, linewidth=1) + + # Deviation240 기반 매수 포인트 + dev240_buy_points = data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation240')] + scatter_dev240_buy_points = None + if len(dev240_buy_points) > 0: + scatter_dev240_buy_points = ax1.scatter(dev240_buy_points.index, dev240_buy_points['Close'], + facecolors='none', edgecolors='blue', linestyle='--', + linewidth=2, s=200, zorder=10, label='Dev240 매수 포인트') + for time in dev240_buy_points.index: + ax1.axvline(x=time, color='blue', linestyle='--', alpha=0.5, linewidth=1) + + # Deviation1440 기반 매수 포인트 + dev1440_buy_points = data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation1440')] + scatter_dev1440_buy_points = None + if len(dev1440_buy_points) > 0: + scatter_dev1440_buy_points = ax1.scatter(dev1440_buy_points.index, dev1440_buy_points['Close'], + facecolors='none', edgecolors='purple', linestyle='--', + linewidth=2, s=200, zorder=10, label='Dev1440 매수 포인트') + for time in dev1440_buy_points.index: + ax1.axvline(x=time, color='purple', linestyle='--', alpha=0.5, linewidth=1) + + # 마우스 오버 기능 추가 (이동평균선 매수 포인트) + if scatter_ma_buy_points is not None: + cursor = mplcursors.cursor(scatter_ma_buy_points, hover=True) + cursor.connect("add", lambda sel: sel.annotation.set_text( + f'MA 매수신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}' + )) + cursor.connect("remove", lambda sel: sel.annotation.set_visible(False)) + + # 마우스 오버 기능 추가 (Deviation40 매수 포인트) + if scatter_dev40_buy_points is not None: + cursor_dev40 = mplcursors.cursor(scatter_dev40_buy_points, hover=True) + cursor_dev40.connect("add", lambda sel: sel.annotation.set_text( + f'Dev40 매수신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}' + )) + cursor_dev40.connect("remove", lambda sel: sel.annotation.set_visible(False)) + + # 마우스 오버 기능 추가 (Deviation240 매수 포인트) + if scatter_dev240_buy_points is not None: + cursor_dev240 = mplcursors.cursor(scatter_dev240_buy_points, hover=True) + cursor_dev240.connect("add", lambda sel: sel.annotation.set_text( + f'Dev240 매수신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}' + )) + cursor_dev240.connect("remove", lambda sel: sel.annotation.set_visible(False)) + + # 마우스 오버 기능 추가 (Deviation1440 매수 포인트) + if scatter_dev1440_buy_points is not None: + cursor_dev1440 = mplcursors.cursor(scatter_dev1440_buy_points, hover=True) + cursor_dev1440.connect("add", lambda sel: sel.annotation.set_text( + f'Dev1440 매수신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}' + )) + cursor_dev1440.connect("remove", lambda sel: sel.annotation.set_visible(False)) + + # 모든 봉에 마우스 오버 기능 추가 + cursor3 = mplcursors.cursor(line_close, hover=True) + cursor3.connect("add", lambda sel: sel.annotation.set_text( + f'종가\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}' + )) + cursor3.connect("remove", lambda sel: sel.annotation.set_visible(False)) + + ax1.set_ylabel("가격 (KRW)") + ax1.set_title(f"{symbol} 차트 - 캔들스틱 & 이동평균선", fontsize=14) + ax1.grid(True, linestyle='--', alpha=0.3) + + # 홈 버튼 추가 (초기화 기능) + home_button = ax1.text(0.02, 0.98, '홈', transform=ax1.transAxes, + bbox=dict(boxstyle="round,pad=0.3", facecolor='lightblue', alpha=0.8), + fontsize=10, ha='left', va='top', picker=True) + + # --- 범례 생성 및 인터랙티브 토글 --- + legend = ax1.legend(loc='upper left', bbox_to_anchor=(0.02, 0.85), fontsize=10) + + # 범례 클릭 시 해당 선 토글 기능 + lined = {} + if hasattr(legend, "legend_handles"): + legend_handles = legend.legend_handles + elif hasattr(legend, "legendHandles"): + legend_handles = legend.legendHandles + else: + legend_handles = legend.get_lines() + plot_lines = [line_close, line_ma5, line_ma20, line_ma40, line_ma120, + line_ma200, line_ma240, line_ma720, line_ma1440, + line_upper, line_lower] + if scatter_ma_buy_points is not None: + plot_lines.append(scatter_ma_buy_points) + if scatter_dev40_buy_points is not None: + plot_lines.append(scatter_dev40_buy_points) + if scatter_dev240_buy_points is not None: + plot_lines.append(scatter_dev240_buy_points) + if scatter_dev1440_buy_points is not None: + plot_lines.append(scatter_dev1440_buy_points) + + for leg_handle, orig in zip(legend_handles, plot_lines): + leg_handle.set_picker(True) + lined[leg_handle] = orig + + def on_pick(event): + leg_handle = event.artist + if leg_handle == home_button: + ax1.set_xlim(matplotlib.dates.date2num(data.index[0]), matplotlib.dates.date2num(data.index[-1])) + ax1.set_ylim(data['Low'].min() * 0.99, data['High'].max() * 1.01) + ax2.set_xlim(ax1.get_xlim()) + ax2.set_ylim( + data[['Deviation5', 'Deviation20', 'Deviation40', 'Deviation120', 'Deviation200', 'Deviation240', 'Deviation720', 'Deviation1440']].min().min() * 0.95, + data[['Deviation5', 'Deviation20', 'Deviation40', 'Deviation120', 'Deviation200', 'Deviation240', 'Deviation720', 'Deviation1440']].max().max() * 1.05, + ) + fig.canvas.draw_idle() + return + orig = lined.get(leg_handle) + if orig is None: + return + vis = not orig.get_visible() + orig.set_visible(vis) + leg_handle.set_alpha(1.0 if vis else 0.2) + fig.canvas.draw_idle() + + fig.canvas.mpl_connect('pick_event', on_pick) + + # Deviation subplot (이격도 보조지표) + line_dev5 = ax2.plot(data.index, data['Deviation5'], color='red', label='Dev5', linewidth=1)[0] + line_dev20 = ax2.plot(data.index, data['Deviation20'], color='blue', label='Dev20', linewidth=1)[0] + line_dev40 = ax2.plot(data.index, data['Deviation40'], color='green', label='Dev40', linewidth=2)[0] + line_dev120 = ax2.plot(data.index, data['Deviation120'], color='purple', label='Dev120', linewidth=1)[0] + line_dev200 = ax2.plot(data.index, data['Deviation200'], color='brown', label='Dev200', linewidth=1)[0] + line_dev240 = ax2.plot(data.index, data['Deviation240'], color='darkred', label='Dev240', linewidth=2)[0] + line_dev720 = ax2.plot(data.index, data['Deviation720'], color='cyan', label='Dev720', linewidth=1)[0] + line_dev1440 = ax2.plot(data.index, data['Deviation1440'], color='magenta', label='Dev1440', linewidth=1)[0] + + cursor_dev = mplcursors.cursor([line_dev5, line_dev20, line_dev40, line_dev120, line_dev200, line_dev240, line_dev720, line_dev1440], hover=True) + cursor_dev.connect("add", lambda sel: sel.annotation.set_text( + f"{sel.artist.get_label()}\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime('%Y-%m-%d %H:%M')}\n값: {sel.target[1]:.2f}" + )) + line_h90 = ax2.axhline(90, color='red', linestyle='--', linewidth=2, label='90', alpha=0.7) + line_h95 = ax2.axhline(95, color='green', linestyle='--', linewidth=2, label='95', alpha=0.7) + line_h100 = ax2.axhline(100, color='black', linestyle='-', linewidth=1, label='100', alpha=0.5) + ax2.set_ylabel('이격도 (%)') + ax2.set_title('이격도 보조지표', fontsize=12) + legend2 = ax2.legend(loc='upper left', fontsize=9) + + if legend2 is not None: + if hasattr(legend2, "legend_handles"): + legend2_handles = legend2.legend_handles + elif hasattr(legend2, "legendHandles"): + legend2_handles = legend2.legendHandles + else: + legend2_handles = legend2.get_lines() + plot_lines2 = [line_dev5, line_dev20, line_dev40, line_dev120, line_dev200, line_dev240, line_dev720, line_dev1440, line_h90, line_h95] + for leg_handle in legend2_handles: + label = leg_handle.get_label() + target_line = next((pl for pl in plot_lines2 if pl.get_label() == label), None) + if target_line is not None: + leg_handle.set_picker(True) + lined[leg_handle] = target_line + ax2.grid(True, linestyle='--', alpha=0.3) + + plt.tight_layout() + + print("그래프를 표시합니다...") + print(f"매수 포인트 수: MA={len(ma_buy_points)}, Dev40={len(dev40_buy_points)}, Dev240={len(dev240_buy_points)}") + + # -------- 확대/축소 및 이동 기능 -------- + press = {} + + def on_scroll(event): + ax = event.inaxes + if ax is None: + return + x_left, x_right = ax.get_xlim() + x_range = (x_right - x_left) + if event.button == 'up': # zoom in + scale = 0.8 + elif event.button == 'down': # zoom out + scale = 1.25 + else: + scale = 1.0 + new_range = x_range * scale + center = event.xdata if event.xdata is not None else (x_left + x_right) / 2 + ax.set_xlim(center - new_range / 2, center + new_range / 2) + for other_ax in fig.axes: + if other_ax is not ax: + other_ax.set_xlim(ax.get_xlim()) + fig.canvas.draw_idle() + + def on_press(event): + if event.button == 1 and event.inaxes is not None: + press['xpress'] = event.xdata + press['axes'] = event.inaxes + + def on_motion(event): + if 'xpress' not in press or press.get('axes') is None or event.inaxes is None: + return + dx = press['xpress'] - event.xdata + for ax in fig.axes: + x_left, x_right = ax.get_xlim() + ax.set_xlim(x_left + dx, x_right + dx) + fig.canvas.draw_idle() + + def on_release(event): + press.clear() + + fig.canvas.mpl_connect('scroll_event', on_scroll) + fig.canvas.mpl_connect('button_press_event', on_press) + fig.canvas.mpl_connect('motion_notify_event', on_motion) + fig.canvas.mpl_connect('button_release_event', on_release) + + plt.show() + + +if __name__ == "__main__": + sim = Simulation() + interval = 60 + days = 90 + target_coins = ['POL'] + show_graphs = True + for symbol in target_coins: + print(f"\n=== {symbol} 저점 기간 분석 시작 ===") + try: + bottom_data, alerts = sim.analyze_bottom_period(symbol, interval, days) + print(f"\n=== {symbol} 전체 기간 시뮬레이션 ===") + if show_graphs: + sim.run_simulation(symbol, interval, days) + else: + data = sim.fetch_price_history(symbol, interval, days) + data = sim.monitor.calculate_technical_indicators(data) + data = sim.monitor.check_buy_point(symbol, data, simulation=True) + total_buy_signals = len(data[data['buy_point'] == 1]) + ma_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'movingaverage')]) + dev40_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation40')]) + dev240_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation240')]) + dev1440_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation1440')]) + print(f"총 매수 신호: {total_buy_signals}") + print(f" - MA 신호: {ma_signals}") + print(f" - Dev40 신호: {dev40_signals}") + print(f" - Dev240 신호: {dev240_signals}") + print(f" - Dev1440 신호: {dev1440_signals}") + except Exception as e: + print(f"Error analyzing {symbol}: {str(e)}") diff --git a/stock_monitor.py b/stock_monitor.py deleted file mode 100644 index a55b19c..0000000 --- a/stock_monitor.py +++ /dev/null @@ -1,592 +0,0 @@ -import pandas as pd -from HTS2 import HTS -from dateutil.relativedelta import relativedelta -from datetime import datetime, timedelta -import sqlite3 -import telegram -import time -import requests -import json -import asyncio -from multiprocessing import Pool -import schedule -from config import * -import FinanceDataReader as fdr -import numpy as np -import os - -hts = HTS() - -# 매수 금지 시간을 관리하는 JSON 파일 경로 -COOLDOWN_FILE = 'coins_buy_time.json' - -def load_buy_cooldown(): - """매수 금지 시간을 JSON 파일에서 로드""" - if os.path.exists(COOLDOWN_FILE): - try: - with open(COOLDOWN_FILE, 'r', encoding='utf-8') as f: - data = json.load(f) - # 문자열을 datetime 객체로 변환 - cooldown = {} - for symbol, time_str in data.items(): - cooldown[symbol] = datetime.fromisoformat(time_str) - return cooldown - except Exception as e: - print(f"Error loading cooldown data: {e}") - return {} - return {} - -def save_buy_cooldown(cooldown): - """매수 금지 시간을 JSON 파일에 저장""" - try: - # datetime 객체를 문자열로 변환 - data = {} - for symbol, dt in cooldown.items(): - data[symbol] = dt.isoformat() - - with open(COOLDOWN_FILE, 'w', encoding='utf-8') as f: - json.dump(data, f, ensure_ascii=False, indent=2) - except Exception as e: - print(f"Error saving cooldown data: {e}") - -# 매수 금지 시간을 추적하는 전역 딕셔너리 -buy_cooldown = load_buy_cooldown() - -def send_coin_msg(text): - coin_client = telegram.Bot(token=COIN_TELEGRAM_BOT_TOKEN) - asyncio.run(coin_client.send_message(chat_id=COIN_TELEGRAM_CHAT_ID, text=text)) - return - - -def send_coin_telegram_message(message_list, header): - pStr = header + "\n" - for i, message in enumerate(message_list): - pStr += message - - if i + 1 % 20 == 0: - pool = Pool(12) - pool.map(send_coin_msg, [pStr]) - pStr = '' - - if len(message_list) % 20 != 0: - pool = Pool(12) - pool.map(send_coin_msg, [pStr]) - - return - - -def send_stock_msg(text): - stock_client = telegram.Bot(token=STOCK_TELEGRAM_BOT_TOKEN) - asyncio.run(stock_client.send_message(chat_id=STOCK_TELEGRAM_CHAT_ID, text=text)) - return - - -def send_stock_telegram_message(message_list, header): - pStr = header + "\n" - for i, message in enumerate(message_list): - pStr += message - - if i + 1 % 20 == 0: - pool = Pool(12) - pool.map(send_stock_msg, [pStr]) - pStr = '' - - if len(message_list) % 20 != 0: - pool = Pool(12) - pool.map(send_stock_msg, [pStr]) - - return - - -def normalize_data(data): - """데이터 정규화 함수 - 모든 코인에 동일하게 적용""" - # Min-Max 정규화를 위한 컬럼 - columns_to_normalize = ['Open', 'High', 'Low', 'Close', 'Volume'] - - normalized_data = data.copy() - - # 각 컬럼별 정규화 (20일 롤링 윈도우 사용) - for column in columns_to_normalize: - min_val = data[column].rolling(window=20).min() - max_val = data[column].rolling(window=20).max() - # 0으로 나누기 방지 - denominator = max_val - min_val - normalized_data[f'{column}_Norm'] = np.where( - denominator != 0, - (data[column] - min_val) / denominator, - 0.5 # 기본값 설정 - ) - - return normalized_data - -def calculate_technical_indicators(data): - """기술적 지표 계산 - 모든 코인에 동일하게 적용""" - # 데이터 정규화 - data = normalize_data(data) - - # 이동평균선 계산 - data['MA5'] = data['Close'].rolling(window=5).mean() - data['MA20'] = data['Close'].rolling(window=20).mean() - data['MA40'] = data['Close'].rolling(window=40).mean() - data['MA120'] = data['Close'].rolling(window=120).mean() - data['MA200'] = data['Close'].rolling(window=200).mean() - data['MA240'] = data['Close'].rolling(window=240).mean() - data['MA720'] = data['Close'].rolling(window=720).mean() - data['MA1440'] = data['Close'].rolling(window=1440).mean() - - # --- 이격도(Deviation) 계산 --- - data['Deviation5'] = (data['Close'] / data['MA5']) * 100 - data['Deviation20'] = (data['Close'] / data['MA20']) * 100 - data['Deviation40'] = (data['Close'] / data['MA40']) * 100 - data['Deviation120'] = (data['Close'] / data['MA120']) * 100 - data['Deviation200'] = (data['Close'] / data['MA200']) * 100 - data['Deviation240'] = (data['Close'] / data['MA240']) * 100 - data['Deviation720'] = (data['Close'] / data['MA720']) * 100 - data['Deviation1440'] = (data['Close'] / data['MA1440']) * 100 - - # 매수 타이밍을 이동평균선으로 결정 - # 골든크로스: 단기 이동평균선이 장기 이동평균선을 상향 돌파할 때 매수 - data['golden_cross'] = (data['MA5'] > data['MA20']) & (data['MA5'].shift(1) <= data['MA20'].shift(1)) - - # 볼린저 밴드 계산 - data['MA'] = data['Close'].rolling(window=20).mean() - data['STD'] = data['Close'].rolling(window=20).std() - data['Upper'] = data['MA'] + (2 * data['STD']) - data['Lower'] = data['MA'] - (2 * data['STD']) - - return data - - -def buy_ticker(symbol, data): - try: - current_time = datetime.now() - - if data['buy_signal'].iloc[-1] != 'fall_5p': - # 5%이상 급락일 때는 10분마다 매수 - BUY_AMOUNT = 100000 - else: - # 매수 금지 시간 확인 (20분) - if symbol in buy_cooldown: - time_diff = current_time - buy_cooldown[symbol] - if time_diff.total_seconds() < 1200: # 20분 = 1200초 - print(f"{symbol}: 매수 금지 중 (남은 시간: {1200 - time_diff.total_seconds():.0f}초)") - return False - - BUY_AMOUNT = 5100 - - if data['buy_signal'].iloc[-1] == 'movingaverage': - BUY_AMOUNT = 7000 - elif data['buy_signal'].iloc[-1] == 'deviation40': - BUY_AMOUNT = 10000 - elif data['buy_signal'].iloc[-1] == 'deviation240': - BUY_AMOUNT = 6000 - elif data['buy_signal'].iloc[-1] == 'deviation1440': - if symbol in ['BONK', 'PEPE', 'TON']: - BUY_AMOUNT = 30000 - else: - BUY_AMOUNT = 50000 - - _ = hts.buyCoinMarket(symbol, BUY_AMOUNT) - - # 매수 성공 시 금지 시간 설정 및 파일에 저장 - buy_cooldown[symbol] = current_time - save_buy_cooldown(buy_cooldown) - print(f"{KR_COINS[symbol]} ({symbol}): {data['Close'].iloc[-1]:.4f}: 매수 완료, 20분간 매수 금지 시작") - - try: - pool = Pool(12) - pool.map(send_coin_msg, ["[KRW-COIN]" + "\n" + format_message('COIN', symbol, KR_COINS[symbol], data['Close'].iloc[-1], data['buy_signal'].iloc[-1])]) - except Exception as e: - print(f"Error sending Telegram message: {str(e)}") - - except Exception as e: - print(f"Error buying {symbol}: {str(e)}") - return False - - return True - -def check_buy_point(symbol, data, simulation=None): - - # 매수 포인트 탐지 및 표시 - # 데이터 복사본 생성하여 SettingWithCopyWarning 방지 - data = data.copy() - # 'buy_point' 열 초기화 - data['buy_signal'] = '' - data['buy_point'] = 0 - - # FutureWarning 해결 - if data['buy_point'].iloc[-1] != 1: - # 코드 계속 - for i in range(1, len(data)): - # 이동평균선 기반 매수 조건 - if all(data[f'MA{n}'].iloc[i] < data['MA720'].iloc[i] for n in [5, 20, 40, 120, 200, 240]) and \ - all(data[f'MA{n}'].iloc[i] > data[f'MA{n}'].iloc[i - 1] for n in [5, 20, 40, 120, 200, 240]) and \ - data['MA720'].iloc[i] < data['MA1440'].iloc[i]: - data.at[data.index[i], 'buy_signal'] = 'movingaverage' - data.at[data.index[i], 'buy_point'] = 1 - if not simulation: - if data['buy_point'][-3:].sum() > 0: - data.at[data.index[-1], 'buy_signal'] = 'movingaverage' - data.at[data.index[-1], 'buy_point'] = 1 - - # Deviation40(이격도 40) 기반 매수 조건: 90 이하에서 상승 전환 - if data['Deviation40'].iloc[i - 1] < data['Deviation40'].iloc[i] and data['Deviation40'].iloc[i - 1] <= 90: - data.at[data.index[i], 'buy_signal'] = 'deviation40' - data.at[data.index[i], 'buy_point'] = 1 - if not simulation: - if data['buy_point'][-3:].sum() > 0: - data.at[data.index[-1], 'buy_signal'] = 'deviation40' - data.at[data.index[-1], 'buy_point'] = 1 - - if symbol not in ['BONK']: - # BONK는 240 체크하지 않음 - - if symbol in ['TRX']: - # Deviation240(이격도 240) 기반 매수 조건: 90 이하에서 상승 전환 - if data['Deviation240'].iloc[i - 1] < data['Deviation240'].iloc[i] and data['Deviation240'].iloc[i - 1] <= 98: - data.at[data.index[i], 'buy_signal'] = 'deviation240' - data.at[data.index[i], 'buy_point'] = 1 - if not simulation: - if data['buy_point'][-3:].sum() > 0: - data.at[data.index[-1], 'buy_signal'] = 'deviation240' - data.at[data.index[-1], 'buy_point'] = 1 - else: - # Deviation240(이격도 240) 기반 매수 조건: 90 이하에서 상승 전환 - if data['Deviation240'].iloc[i - 1] < data['Deviation240'].iloc[i] and data['Deviation240'].iloc[i - 1] <= 90: - data.at[data.index[i], 'buy_signal'] = 'deviation240' - data.at[data.index[i], 'buy_point'] = 1 - if not simulation: - if data['buy_point'][-3:].sum() > 0: - data.at[data.index[-1], 'buy_signal'] = 'deviation240' - data.at[data.index[-1], 'buy_point'] = 1 - - # Deviation240(이격도 240) 기반 매수 조건: 90 이하에서 상승 전환 - if symbol in ['TON']: - if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 89: - data.at[data.index[i], 'buy_signal'] = 'deviation1440' - data.at[data.index[i], 'buy_point'] = 1 - if not simulation: - if data['buy_point'][-3:].sum() > 0: - data.at[data.index[-1], 'buy_signal'] = 'deviation1440' - data.at[data.index[-1], 'buy_point'] = 1 - elif symbol in ['XRP']: - if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 90: - data.at[data.index[i], 'buy_signal'] = 'deviation1440' - data.at[data.index[i], 'buy_point'] = 1 - if not simulation: - if data['buy_point'][-3:].sum() > 0: - data.at[data.index[-1], 'buy_signal'] = 'deviation1440' - data.at[data.index[-1], 'buy_point'] = 1 - elif symbol in ['BONK']: - if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 76: - data.at[data.index[i], 'buy_signal'] = 'deviation1440' - data.at[data.index[i], 'buy_point'] = 1 - if not simulation: - if data['buy_point'][-3:].sum() > 0: - data.at[data.index[-1], 'buy_signal'] = 'deviation1440' - data.at[data.index[-1], 'buy_point'] = 1 - else: - if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 80: - data.at[data.index[i], 'buy_signal'] = 'deviation1440' - data.at[data.index[i], 'buy_point'] = 1 - if not simulation: - if data['buy_point'][-3:].sum() > 0: - data.at[data.index[-1], 'buy_signal'] = 'deviation1440' - data.at[data.index[-1], 'buy_point'] = 1 - - # 하락 5% 조건: 현재가 또는 현재 봉의 저가가 지난 봉 고가/현재 봉 고가 대비 5% 이상 하락 - try: - prev_low = data['Low'].iloc[i - 1] - curr_close = data['Close'].iloc[i] - - cond_close_drop = curr_close <= prev_low * 0.95 - - if cond_close_drop: - data.at[data.index[i], 'buy_signal'] = 'fall_5p' - data.at[data.index[i], 'buy_point'] = 1 - if not simulation: - if data['buy_point'][-3:].sum() > 0: - data.at[data.index[-1], 'buy_signal'] = 'fall_5p' - data.at[data.index[-1], 'buy_point'] = 1 - except Exception: - pass - - return data - -def format_message(market_type, symbol, symbol_name, close, buy_signal): - message = f"매수 [{market_type}] {symbol_name} ({symbol}): {buy_signal} " - message += f"현재가: {'$' if market_type == 'US' else '₩'}{close:.4f}, " - return message - - -def format_ma_message(info, market_type): - """MA 알림 메시지 생성""" - prefix = '상승 ' if info.get('alert') else '' - message = prefix + f"[{market_type}] {info['name']} ({info['symbol']}) " - message += f"현재가: {'$' if market_type == 'US' else '₩'}{info['price']:.4f} \n" - return message - - -def get_coin_data(symbol, interval=60, to=None, retries=3): - for attempt in range(retries): - try: - #url = "https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=3000".format(interval, symbol) - if to is None: - url = ("https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=200").format(interval, symbol) - else: - url = ("https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=200&to={}").format(interval, symbol, to) - - - #url = 'https://api.bithumb.com/v1/candles/minutes/60?market=KRW-ADA&count=200' - #url = 'https://api.bithumb.com/v1/candles/minutes/minutes/60?market=KRW-ADA&count=200&to=2025-08-06 10:38:38' - headers = {"accept": "application/json"} - response = requests.get(url, headers=headers) - json_data = json.loads(response.text) - df_temp = pd.DataFrame(json_data) - df_temp = df_temp.sort_index(ascending=False) - if 'candle_date_time_kst' not in df_temp: - return None - - data = pd.DataFrame() - # data.columns = ['datetime', 'open', 'close', 'high', 'low', 'volume'] - # data['datetime'] = pd.to_datetime(data_temp['candle_date_time_kst']) - data['datetime'] = pd.to_datetime(df_temp['candle_date_time_kst'], format='%Y-%m-%dT%H:%M:%S') - data['Open'] = df_temp['opening_price'] - data['Close'] = df_temp['trade_price'] - data['High'] = df_temp['high_price'] - data['Low'] = df_temp['low_price'] - data['Volume'] = df_temp['candle_acc_trade_volume'] - data = data.set_index('datetime') - data = data.astype(float) - data["datetime"] = data.index - - if not data.empty: - return data - - print(f"No data received for {symbol}, attempt {attempt + 1}") - time.sleep(0.5) - except Exception as e: - print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}") - if attempt < retries - 1: - time.sleep(5) - continue - return None - -def get_coin_more_data(symbol, interval, bong_count=3000): - # 코인 데이터 1500개 봉 가져오기 - to = datetime.now() - data = None - while data is None or len(data) < bong_count: - if data is None: - data = get_coin_data(symbol, interval, to.strftime("%Y-%m-%d %H:%M:%S")) - else: - p_count = len(data) - df = get_coin_data(symbol, interval, to.strftime("%Y-%m-%d %H:%M:%S")) - data = pd.concat([data, df], ignore_index=True) - if p_count == len(data): - break - time.sleep(0.3) - to = to - relativedelta(minutes=interval * 200) - - data = data.set_index('datetime') - data = data.sort_index() - data = data.drop_duplicates(keep='first') - data["datetime"] = data.index - # 코인 데이터 1500개 봉 가져오기 - - return data - -def get_coin_saved_data(symbol, interval, data): - conn = sqlite3.connect('coins.db') - cursor = conn.cursor() - - for i in range(1, len(data)): - cursor.execute("SELECT * from " + symbol + " where CODE = ? and ymdhms = ? and interval = ?", (symbol, data['datetime'].iloc[-i].strftime('%Y-%m-%d %H:%M:%S'), interval)) - arr = cursor.fetchone() - if not arr: - cursor.execute("INSERT INTO " + symbol + " (interval, CODE, NAME, ymdhms, ymd, hms, close, open, high, low, volume) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", (interval, symbol, KR_COINS[symbol], data['datetime'].iloc[-i].strftime('%Y-%m-%d %H:%M:%S'), data['datetime'].iloc[-i].strftime('%Y%m%d'), data['datetime'].iloc[-i].strftime('%H%M%S'), data['Close'].iloc[-i], data['Open'].iloc[-i], data['High'].iloc[-i], data['Low'].iloc[-i], data['Volume'].iloc[-i])) - else: - break - - cursor.execute("select * from (SELECT Open,Close,High,Low,Volume,ymdhms as datetime from " + symbol + " order by ymdhms desc limit 5000) subquery order by datetime") - result = cursor.fetchall() - conn.commit() - cursor.close() - conn.close() - - df = pd.DataFrame(result) - df.columns = ['Open','Close','High','Low','Volume','datetime'] - df = df.set_index('datetime') - df = df.sort_index() - df['datetime'] = df.index - return df - -def get_coin_some_data(symbol, interval): - data = get_coin_data(symbol, interval) - data_1 = get_coin_data(symbol, interval=1) - data_1.at[data_1.index[-1], 'Volume'] = data_1['Volume'].iloc[-1] * 60 - - saved_data = get_coin_saved_data(symbol, interval, data) - - data = pd.concat([data, saved_data, data_1.iloc[[-1]]], ignore_index=True) - #data = pd.concat([data, saved_data], ignore_index=True) - data['datetime'] = pd.to_datetime(data['datetime'], format='%Y-%m-%d %H:%M:%S') - data = data.set_index('datetime') - data = data.sort_index() - data = data.drop_duplicates(keep='first') - data["datetime"] = data.index - - return data - - -def get_kr_stock_data(symbol, retries=3): - for attempt in range(retries): - try: - end = datetime.now() - start = end - timedelta(days=300) - - # FinanceDataReader를 사용하여 한국 주식 데이터 가져오기 - data = fdr.DataReader(symbol, start.strftime('%Y-%m-%d'), end.strftime('%Y-%m-%d')) - - if not data.empty: - # FinanceDataReader의 컬럼명을 yfinance 형식으로 변환 - data = data.rename(columns={ - 'Open': 'Open', - 'High': 'High', - 'Low': 'Low', - 'Close': 'Close', - 'Volume': 'Volume' - }) - return data - - print(f"No data received for {symbol}, attempt {attempt + 1}") - time.sleep(2) - except Exception as e: - print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}") - if attempt < retries - 1: - time.sleep(5) - continue - return None - - -def monitor_us_stocks(): - message_list = [] - print("US Stocks {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) - - for symbol in US_STOCKS: - data = get_kr_stock_data(symbol) - if data is not None and not data.empty: - try: - data = calculate_technical_indicators(data) - recent_data = check_buy_point(symbol, data) # Changed to check_buy_point - if recent_data['buy_point'].iloc[-1] != 1: - continue - print(f" - {US_STOCKS[symbol]} ({symbol}): {recent_data['Close'].iloc[-1]:.2f}") - message_list.append(format_message('US', symbol, US_STOCKS[symbol], recent_data['Close'].iloc[-1], recent_data['buy_signal'].iloc[-1])) - except Exception as e: - print(f"Error processing data for {symbol}: {str(e)}") - time.sleep(0.5) - - if len(message_list) > 0: - try: - send_stock_telegram_message(message_list, header="[US-STOCK]") - except Exception as e: - print(f"Error sending Telegram message: {str(e)}") - - return - - -def monitor_kr_stocks(): - message_list = [] - print("KR ETFs {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) - - for symbol in KR_ETFS: - try: - # .KS 접미사 제거 - clean_symbol = symbol.replace('.KS', '') - data = get_kr_stock_data(clean_symbol) - - if data is not None and not data.empty: - try: - data = calculate_technical_indicators(data) - recent_data = check_buy_point(symbol, data) # Changed to check_buy_point - if recent_data['buy_point'].iloc[-1] != 1: - continue - print(f" - {KR_ETFS[symbol]} ({symbol}): {recent_data['Close'].iloc[-1]:.2f}") - message_list.append(format_message('KR', symbol, KR_ETFS[symbol], recent_data['Close'].iloc[-1], recent_data['buy_signal'].iloc[-1])) - - except Exception as e: - print(f"Error processing data for {symbol}: {str(e)}") - else: - print(f"Data for {symbol} is empty or None.") - - # 각 심볼 처리 후 1초 대기 - time.sleep(1) - - except Exception as e: - print(f"Unexpected error processing {symbol}: {str(e)}") - continue - - if len(message_list) > 0: - try: - send_stock_telegram_message(message_list, header="[KR-STOCK]") - except Exception as e: - print(f"Error sending Telegram message: {str(e)}") - - return - - -def monitor_coins(): - message_list = [] - print("KRW COINs {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) - - for symbol in KR_COINS: - - # 1시간 - interval = 60 - data = get_coin_some_data(symbol, interval) - - if data is not None and not data.empty: - try: - data = calculate_technical_indicators(data) - recent_data = check_buy_point(symbol, data) # Changed to check_buy_point - if recent_data['buy_point'].iloc[-1] != 1: - continue - - # buy - buy_success = buy_ticker(symbol, recent_data) - if not buy_success: - continue # 매수 금지 중이면 다음 코인으로 넘어감 - - except Exception as e: - print(f"Error processing data for {symbol}: {str(e)}") - else: - print(f"Data for {symbol} is empty or None.") - time.sleep(0.5) - - return - -def run_schedule(): - - # 코인 모니터링 스케줄 (매시간 4분, 14분, 24분, 34분, 44분, 54분) - for minute in [2, 12, 22, 32, 42, 52]: - schedule.every().hour.at(f":{minute:02d}").do(monitor_coins) - - # 미국 주식 모니터링 스케줄 (매일 저녁 5시 20분) - schedule.every().day.at("16:30").do(monitor_us_stocks) - schedule.every().day.at("23:30").do(monitor_us_stocks) - schedule.every().day.at("05:10").do(monitor_us_stocks) - - # 한국 ETF 모니터링 스케줄 (매일 오전 8시) - schedule.every().day.at("18:20").do(monitor_kr_stocks) - schedule.every().day.at("07:10").do(monitor_kr_stocks) - - print("Scheduler started. Monitoring will run at specified times.") - print(f"Loaded cooldown data for {len(buy_cooldown)} coins") - - while True: - schedule.run_pending() - time.sleep(1) - -if __name__ == "__main__": - run_schedule() diff --git a/stock_simulation.py b/stock_simulation.py deleted file mode 100644 index 1f0bf54..0000000 --- a/stock_simulation.py +++ /dev/null @@ -1,542 +0,0 @@ -from dateutil.relativedelta import relativedelta -import pandas as pd -import yfinance as yf -import matplotlib.pyplot as plt -import matplotlib.dates -import mplcursors -plt.rcParams['font.family'] ='AppleGothic' -plt.rcParams['axes.unicode_minus'] =False - -from config import * -from stock_monitor import calculate_technical_indicators, get_coin_more_data, check_buy_point - -def detect_turnaround_signal(symbol, data, interval=0, params=None): - """매수 신호 감지 함수 - stock_simulation.py에서 사용""" - if len(data) < 7: - return None - - # 현재 매수 조건 확인 - current_data = data.iloc[-1] - - # 매수 신호가 있는지 확인 - if current_data.get('buy_point', 0) == 1: - return { - 'alert': True, - 'details': f"매수신호: {current_data.get('buy_signal', 'unknown')}" - } - - return { - 'alert': False, - 'details': "매수신호 없음" - } - -# 비트/알트코인 KRW 마켓 식별: 문자열 "-KRW" 포함 여부로 간단 구분 - -INTERVAL_MAP = { - 60: "60m", # 1시간 (yfinance) - 240: "4h", # 4시간 (yfinance) -} - -BITHUMB_MAX_COUNT = 3000 # API 최대 3000 캔들 - -def fetch_price_history(symbol: str, interval_minutes: int, days: int = 30) -> pd.DataFrame: - - """최근 `days`일 데이터(캔들)를 가져온다. 코인(-KRW)은 빗썸, 그 외 yfinance.""" - if symbol in KR_COINS: - bong_count = 3000 - return get_coin_more_data(symbol, interval_minutes, bong_count=bong_count) - - # -------- 주식/ETF/해외코인 (yfinance) -------- - if interval_minutes not in INTERVAL_MAP: - raise ValueError("interval must be 60 or 240") - - interval_str = INTERVAL_MAP[interval_minutes] - - df = yf.download( - tickers=symbol, - period=f"{days}d", - interval=interval_str, - progress=False, - ) - - if df.empty: - raise RuntimeError("No data fetched. Check symbol or interval support.") - - return df - - -def analyze_bottom_period(symbol: str, interval_minutes: int, days: int = 90): - """저점 기간(6월 22일~7월 9일) 분석 - 최적화된 버전""" - data = fetch_price_history(symbol, interval_minutes, days) - data = calculate_technical_indicators(data) - data = check_buy_point(symbol, data, simulation=True) # 한 번만 계산 - - print(f"데이터 기간: {data.index[0]} ~ {data.index[-1]}") - print(f"총 데이터 수: {len(data)}") - - # 저점 기간 필터링 (6월 22일~7월 9일) - bottom_start = pd.Timestamp('2025-06-22') - bottom_end = pd.Timestamp('2025-07-09') - - bottom_data = data[(data.index >= bottom_start) & (data.index <= bottom_end)] - - if len(bottom_data) == 0: - print("저점 기간 데이터가 없습니다.") - return None, [] - - print(f"\n저점 기간 데이터: {bottom_data.index[0]} ~ {bottom_data.index[-1]}") - print(f"저점 기간 데이터 수: {len(bottom_data)}") - - # 저점 기간의 기술적 지표 분석 - print("\n=== 저점 기간 기술적 지표 분석 ===") - - # 1. 가격 분석 - min_price = bottom_data['Low'].min() - max_price = bottom_data['High'].max() - avg_price = bottom_data['Close'].mean() - - print(f"최저가: {min_price:.4f}") - print(f"최고가: {max_price:.4f}") - print(f"평균가: {avg_price:.4f}") - print(f"가격 변동폭: {((max_price - min_price) / min_price * 100):.2f}%") - - # 3. 볼린저 밴드 분석 - bb_lower_min = bottom_data['Lower'].min() - bb_upper_max = bottom_data['Upper'].max() - - print(f"\n볼린저 밴드 분석:") - print(f"하단 밴드 최저: {bb_lower_min:.4f}") - print(f"상단 밴드 최고: {bb_upper_max:.4f}") - - # 4. 거래량 분석 - volume_avg = bottom_data['Volume'].mean() - volume_max = bottom_data['Volume'].max() - - print(f"\n거래량 분석:") - print(f"평균 거래량: {volume_avg:.0f}") - print(f"최대 거래량: {volume_max:.0f}") - - # 5. 실제 저점 찾기 - actual_bottom_idx = bottom_data['Low'].idxmin() - actual_bottom_price = bottom_data.loc[actual_bottom_idx, 'Low'] - actual_bottom_date = actual_bottom_idx - - print(f"\n실제 저점:") - print(f"날짜: {actual_bottom_date}") - print(f"가격: {actual_bottom_price:.4f}") - print(f"볼린저 하단 대비: {((actual_bottom_price - bottom_data.loc[actual_bottom_idx, 'Lower']) / bottom_data.loc[actual_bottom_idx, 'Lower'] * 100):.2f}%") - - # 6. 매수 신호 분석 - 최적화된 버전 - print(f"\n=== 매수 신호 분석 ===") - - # 이미 계산된 매수 포인트에서 저점 기간만 필터링 - bottom_alerts = bottom_data[bottom_data['buy_point'] == 1] - alerts = [(idx, row['Close']) for idx, row in bottom_alerts.iterrows()] - - print(f"저점 기간 매수 신호 수: {len(alerts)}") - - if alerts: - print("매수 신호 발생 시점:") - for date, price in alerts: - print(f" {date}: {price:.4f}") - - return bottom_data, alerts - -def run_simulation(symbol: str, interval_minutes: int, days: int = 30): - data = fetch_price_history(symbol, interval_minutes) - data = calculate_technical_indicators(data) - data = check_buy_point(symbol, data, simulation=True) - - print(f"데이터 기간: {data.index[0]} ~ {data.index[-1]}") - print(f"총 데이터 수: {len(data)}") - - # check_buy_point에서 이미 계산된 매수 포인트를 사용 - alerts = [] - for i in range(len(data)): - if data['buy_point'].iloc[i] == 1: - alerts.append((data.index[i], data['Close'].iloc[i])) - - print(f"\n총 매수 신호 수: {len(alerts)}") - - # 매수 신호 유형별 통계 출력 - ma_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'movingaverage')]) - dev40_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation40')]) - dev240_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation240')]) - dev1440_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation1440')]) - - print(f" - MA 신호: {ma_signals}") - print(f" - Dev40 신호: {dev40_signals}") - print(f" - Dev240 신호: {dev240_signals}") - print(f" - Dev1440 신호: {dev1440_signals}") - - # 서브플롯 생성 (가격 + Deviation) - fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True, figsize=(15, 8), height_ratios=[3, 1]) - fig.suptitle(f"{symbol} - 시뮬레이션 {interval_minutes}분봉", fontsize=14) - - # ----------------- 마우스 휠 확대/축소 ----------------- - def on_scroll(event): - # event.button: 'up' -> zoom in, 'down' -> zoom out - if event.inaxes not in [ax1, ax2]: - return - ax = event.inaxes - # x 축만 두 축을 동시에 조정 - cur_xlim = ax1.get_xlim() - xdata = event.xdata - if xdata is None: - return - scale_factor = 0.9 if event.button == 'up' else 1/0.9 - new_width = (cur_xlim[1] - cur_xlim[0]) * scale_factor - relx = (cur_xlim[1] - xdata) / (cur_xlim[1] - cur_xlim[0]) - new_left = xdata - new_width * (1 - relx) - new_right = xdata + new_width * relx - # 데이터 영역 벗어나지 않도록 클램프 - xmin, xmax = matplotlib.dates.date2num(data.index[0]), matplotlib.dates.date2num(data.index[-1]) - if new_left < xmin: - new_left = xmin - if new_right > xmax: - new_right = xmax - ax1.set_xlim([new_left, new_right]) - ax2.set_xlim([new_left, new_right]) - # y축은 해당 축만 줌 - cur_ylim = ax.get_ylim() - ydata = event.ydata - if ydata is not None: - new_height = (cur_ylim[1] - cur_ylim[0]) * scale_factor - rely = (cur_ylim[1] - ydata) / (cur_ylim[1] - cur_ylim[0]) - ax.set_ylim([ydata - new_height * (1 - rely), ydata + new_height * rely]) - ax.figure.canvas.draw_idle() - - fig.canvas.mpl_connect('scroll_event', on_scroll) - - # 캔들스틱 차트 추가 (matplotlib 기본 기능 사용) - import matplotlib.dates as mdates - - # 캔들스틱 데이터 준비 - ohlc_data = [] - for i, (idx, row) in enumerate(data.iterrows()): - ohlc_data.append([ - mdates.date2num(idx), - row['Open'], - row['High'], - row['Low'], - row['Close'] - ]) - - # 캔들스틱 그리기 (matplotlib 기본 기능으로 구현) - for ohlc in ohlc_data: - date, open_price, high, low, close = ohlc - - # 캔들 색상 결정 - color = 'red' if close >= open_price else 'blue' - - # 캔들 몸통 그리기 - body_height = abs(close - open_price) - body_bottom = min(open_price, close) - rect = plt.Rectangle((date - 0.3, body_bottom), 0.6, body_height, - facecolor=color, edgecolor='black', alpha=0.7) - ax1.add_patch(rect) - - # 캔들 심지 그리기 - ax1.plot([date, date], [low, high], color='black', linewidth=1) - - # 메인 차트 (가격, 이동평균선, 볼린저 밴드) - line_close = ax1.plot(data.index, data["Close"], label="종가", color="black", linewidth=1.5, alpha=0.8)[0] - line_ma5 = ax1.plot(data.index, data["MA5"], label="MA5", color="red", linewidth=1)[0] - line_ma20 = ax1.plot(data.index, data["MA20"], label="MA20", color="blue", linewidth=1)[0] - line_ma40 = ax1.plot(data.index, data["MA40"], label="MA40", color="green", linewidth=1)[0] - line_ma120 = ax1.plot(data.index, data["MA120"], label="MA120", color="purple", linewidth=1)[0] - line_ma200 = ax1.plot(data.index, data["MA200"], label="MA200", color="brown", linewidth=1)[0] - line_ma240 = ax1.plot(data.index, data["MA240"], label="MA240", color="darkred", linewidth=1)[0] - line_ma720 = ax1.plot(data.index, data["MA720"], label="MA720", color="cyan", linewidth=1)[0] - line_ma1440 = ax1.plot(data.index, data["MA1440"], label="MA1440", color="magenta", linewidth=1)[0] - - # Bollinger Bands - line_upper = ax1.plot(data.index, data["Upper"], label="볼린저 Upper", color="grey", linestyle="--", linewidth=1)[0] - line_lower = ax1.plot(data.index, data["Lower"], label="볼린저 Lower", color="grey", linestyle="--", linewidth=1)[0] - - ax1.fill_between(data.index, data["Lower"], data["Upper"], color="grey", alpha=0.1) - - # 매수 포인트를 신호 유형별로 다르게 표시 (수직선 포함) - # 이동평균선 기반 매수 포인트 - ma_buy_points = data[(data['buy_point'] == 1) & (data['buy_signal'] == 'movingaverage')] - scatter_ma_buy_points = None - if len(ma_buy_points) > 0: - scatter_ma_buy_points = ax1.scatter(ma_buy_points.index, ma_buy_points['Close'], color='red', s=150, zorder=10, label='MA 매수 포인트', marker='o') - for time in ma_buy_points.index: - ax1.axvline(x=time, color='red', linestyle='-', alpha=0.5, linewidth=1) - - # Deviation40 기반 매수 포인트 - dev40_buy_points = data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation40')] - scatter_dev40_buy_points = None - if len(dev40_buy_points) > 0: - scatter_dev40_buy_points = ax1.scatter(dev40_buy_points.index, dev40_buy_points['Close'], - facecolors='none', edgecolors='red', linestyle='--', - linewidth=2, s=200, zorder=10, label='Dev40 매수 포인트') - for time in dev40_buy_points.index: - ax1.axvline(x=time, color='red', linestyle='--', alpha=0.5, linewidth=1) - - # Deviation240 기반 매수 포인트 - dev240_buy_points = data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation240')] - scatter_dev240_buy_points = None - if len(dev240_buy_points) > 0: - scatter_dev240_buy_points = ax1.scatter(dev240_buy_points.index, dev240_buy_points['Close'], - facecolors='none', edgecolors='blue', linestyle='--', - linewidth=2, s=200, zorder=10, label='Dev240 매수 포인트') - for time in dev240_buy_points.index: - ax1.axvline(x=time, color='blue', linestyle='--', alpha=0.5, linewidth=1) - - # Deviation1440 기반 매수 포인트 - dev1440_buy_points = data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation1440')] - scatter_dev1440_buy_points = None - if len(dev1440_buy_points) > 0: - scatter_dev1440_buy_points = ax1.scatter(dev1440_buy_points.index, dev1440_buy_points['Close'], - facecolors='none', edgecolors='purple', linestyle='--', - linewidth=2, s=200, zorder=10, label='Dev1440 매수 포인트') - for time in dev1440_buy_points.index: - ax1.axvline(x=time, color='purple', linestyle='--', alpha=0.5, linewidth=1) - - - - # 마우스 오버 기능 추가 (이동평균선 매수 포인트) - if scatter_ma_buy_points is not None: - cursor = mplcursors.cursor(scatter_ma_buy_points, hover=True) - cursor.connect("add", lambda sel: sel.annotation.set_text( - f'MA 매수신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}' - )) - cursor.connect("remove", lambda sel: sel.annotation.set_visible(False)) - - # 마우스 오버 기능 추가 (Deviation40 매수 포인트) - if scatter_dev40_buy_points is not None: - cursor_dev40 = mplcursors.cursor(scatter_dev40_buy_points, hover=True) - cursor_dev40.connect("add", lambda sel: sel.annotation.set_text( - f'Dev40 매수신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}' - )) - cursor_dev40.connect("remove", lambda sel: sel.annotation.set_visible(False)) - - # 마우스 오버 기능 추가 (Deviation240 매수 포인트) - if scatter_dev240_buy_points is not None: - cursor_dev240 = mplcursors.cursor(scatter_dev240_buy_points, hover=True) - cursor_dev240.connect("add", lambda sel: sel.annotation.set_text( - f'Dev240 매수신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}' - )) - cursor_dev240.connect("remove", lambda sel: sel.annotation.set_visible(False)) - - # 마우스 오버 기능 추가 (Deviation1440 매수 포인트) - if scatter_dev1440_buy_points is not None: - cursor_dev1440 = mplcursors.cursor(scatter_dev1440_buy_points, hover=True) - cursor_dev1440.connect("add", lambda sel: sel.annotation.set_text( - f'Dev1440 매수신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}' - )) - cursor_dev1440.connect("remove", lambda sel: sel.annotation.set_visible(False)) - - - - # 모든 봉에 마우스 오버 기능 추가 - cursor3 = mplcursors.cursor(line_close, hover=True) - cursor3.connect("add", lambda sel: sel.annotation.set_text( - f'종가\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}' - )) - cursor3.connect("remove", lambda sel: sel.annotation.set_visible(False)) - - ax1.set_ylabel("가격 (KRW)") - ax1.set_title(f"{symbol} 차트 - 캔들스틱 & 이동평균선", fontsize=14) - ax1.grid(True, linestyle='--', alpha=0.3) - - # 홈 버튼 추가 (초기화 기능) - home_button = ax1.text(0.02, 0.98, '홈', transform=ax1.transAxes, - bbox=dict(boxstyle="round,pad=0.3", facecolor='lightblue', alpha=0.8), - fontsize=10, ha='left', va='top', picker=True) - - # --- 범례 생성 및 인터랙티브 토글 --- - legend = ax1.legend(loc='upper left', bbox_to_anchor=(0.02, 0.85), fontsize=10) - - # 범례 클릭 시 해당 선 토글 기능 - lined = {} - # legend 핸들과 실제 plot 선을 매핑 (생성 순서가 동일하다고 가정) - # Matplotlib 버전에 따라 legend 객체의 핸들 보유 프로퍼티가 다를 수 있음 - if hasattr(legend, "legend_handles"): - legend_handles = legend.legend_handles - elif hasattr(legend, "legendHandles"): - legend_handles = legend.legendHandles - else: - # 마지막 방어(일반적으로 선만 리턴) - legend_handles = legend.get_lines() - plot_lines = [line_close, line_ma5, line_ma20, line_ma40, line_ma120, - line_ma200, line_ma240, line_ma720, line_ma1440, - line_upper, line_lower] - - # None이 아닌 scatter plot만 추가 - if scatter_ma_buy_points is not None: - plot_lines.append(scatter_ma_buy_points) - if scatter_dev40_buy_points is not None: - plot_lines.append(scatter_dev40_buy_points) - if scatter_dev240_buy_points is not None: - plot_lines.append(scatter_dev240_buy_points) - if scatter_dev1440_buy_points is not None: - plot_lines.append(scatter_dev1440_buy_points) - - # zip 길이가 짧은 쪽에 맞춰 매핑 - for leg_handle, orig in zip(legend_handles, plot_lines): - leg_handle.set_picker(True) # 클릭 이벤트 활성화 - lined[leg_handle] = orig - - def on_pick(event): - leg_handle = event.artist - - # 홈 버튼 클릭 처리 - if leg_handle == home_button: - # 그래프 초기화 - ax1.set_xlim(matplotlib.dates.date2num(data.index[0]), matplotlib.dates.date2num(data.index[-1])) - ax1.set_ylim(data['Low'].min() * 0.99, data['High'].max() * 1.01) - ax2.set_xlim(ax1.get_xlim()) - ax2.set_ylim(data[['Deviation5', 'Deviation20', 'Deviation40', 'Deviation120', 'Deviation200', 'Deviation240', 'Deviation720', 'Deviation1440']].min().min() * 0.95, - data[['Deviation5', 'Deviation20', 'Deviation40', 'Deviation120', 'Deviation200', 'Deviation240', 'Deviation720', 'Deviation1440']].max().max() * 1.05) - fig.canvas.draw_idle() - return - - # 기존 범례 클릭 처리 - orig = lined.get(leg_handle) - if orig is None: - return - vis = not orig.get_visible() - orig.set_visible(vis) - # 범례 아이콘 투명도 조정 - leg_handle.set_alpha(1.0 if vis else 0.2) - fig.canvas.draw_idle() - - fig.canvas.mpl_connect('pick_event', on_pick) - - # Deviation subplot (이격도 보조지표) - line_dev5 = ax2.plot(data.index, data['Deviation5'], color='red', label='Dev5', linewidth=1)[0] - line_dev20 = ax2.plot(data.index, data['Deviation20'], color='blue', label='Dev20', linewidth=1)[0] - line_dev40 = ax2.plot(data.index, data['Deviation40'], color='green', label='Dev40', linewidth=2)[0] - line_dev120 = ax2.plot(data.index, data['Deviation120'], color='purple', label='Dev120', linewidth=1)[0] - line_dev200 = ax2.plot(data.index, data['Deviation200'], color='brown', label='Dev200', linewidth=1)[0] - line_dev240 = ax2.plot(data.index, data['Deviation240'], color='darkred', label='Dev240', linewidth=2)[0] - line_dev720 = ax2.plot(data.index, data['Deviation720'], color='cyan', label='Dev720', linewidth=1)[0] - line_dev1440 = ax2.plot(data.index, data['Deviation1440'], color='magenta', label='Dev1440', linewidth=1)[0] - - cursor_dev = mplcursors.cursor([line_dev5, line_dev20, line_dev40, line_dev120, line_dev200, line_dev240, line_dev720, line_dev1440], hover=True) - cursor_dev.connect("add", lambda sel: sel.annotation.set_text( - f"{sel.artist.get_label()}\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime('%Y-%m-%d %H:%M')}\n값: {sel.target[1]:.2f}" - )) - # 이격도 기준선 추가 - line_h90 = ax2.axhline(90, color='red', linestyle='--', linewidth=2, label='90', alpha=0.7) - line_h95 = ax2.axhline(95, color='green', linestyle='--', linewidth=2, label='95', alpha=0.7) - line_h100 = ax2.axhline(100, color='black', linestyle='-', linewidth=1, label='100', alpha=0.5) - ax2.set_ylabel('이격도 (%)') - ax2.set_title('이격도 보조지표', fontsize=12) - legend2 = ax2.legend(loc='upper left', fontsize=9) - - # Deviation subplot 범례 클릭 토글 기능 - if legend2 is not None: - if hasattr(legend2, "legend_handles"): - legend2_handles = legend2.legend_handles - elif hasattr(legend2, "legendHandles"): - legend2_handles = legend2.legendHandles - else: - legend2_handles = legend2.get_lines() - plot_lines2 = [line_dev5, line_dev20, line_dev40, line_dev120, line_dev200, line_dev240, line_dev720, line_dev1440, line_h90, line_h95] - # 레이블 기준으로 안정적 매핑 - for leg_handle in legend2_handles: - label = leg_handle.get_label() - target_line = next((pl for pl in plot_lines2 if pl.get_label() == label), None) - if target_line is not None: - leg_handle.set_picker(True) - lined[leg_handle] = target_line - ax2.grid(True, linestyle='--', alpha=0.3) - - plt.tight_layout() - - print("그래프를 표시합니다...") - print(f"매수 포인트 수: MA={len(ma_buy_points)}, Dev40={len(dev40_buy_points)}, Dev240={len(dev240_buy_points)}") - - # -------- 확대/축소 및 이동 기능 -------- - press = {} - - def on_scroll(event): - ax = event.inaxes - if ax is None: - return - x_left, x_right = ax.get_xlim() - x_range = (x_right - x_left) - if event.button == 'up': # zoom in - scale = 0.8 - elif event.button == 'down': # zoom out - scale = 1.25 - else: - scale = 1.0 - new_range = x_range * scale - center = event.xdata if event.xdata is not None else (x_left + x_right) / 2 - ax.set_xlim(center - new_range / 2, center + new_range / 2) - # 다른 축들도 동일 적용 (shared x) - for other_ax in fig.axes: - if other_ax is not ax: - other_ax.set_xlim(ax.get_xlim()) - fig.canvas.draw_idle() - - def on_press(event): - if event.button == 1 and event.inaxes is not None: - press['xpress'] = event.xdata - press['axes'] = event.inaxes - - def on_motion(event): - if 'xpress' not in press or press.get('axes') is None or event.inaxes is None: - return - dx = press['xpress'] - event.xdata - for ax in fig.axes: - x_left, x_right = ax.get_xlim() - ax.set_xlim(x_left + dx, x_right + dx) - fig.canvas.draw_idle() - - def on_release(event): - press.clear() - - fig.canvas.mpl_connect('scroll_event', on_scroll) - fig.canvas.mpl_connect('button_press_event', on_press) - fig.canvas.mpl_connect('motion_notify_event', on_motion) - fig.canvas.mpl_connect('button_release_event', on_release) - - plt.show() - - -if __name__ == "__main__": - interval = 60 - days = 90 # 분석 기간을 90일로 늘림 (6월~8월 데이터 포함) - #target_coins = ['ADA','APE','ARB','BONK','HBAR','LINK','ONDO','PEPE','POL','SEI','SHIB','STORJ','SUI','TON','TRX','WLD','XLM','XRP'] - target_coins = ['POL'] - - # 그래프 표시 여부 설정 (성능 향상을 위해 기본값은 False) - show_graphs = True # True로 설정하면 각 코인마다 그래프 표시 - - for symbol in target_coins: - print(f"\n=== {symbol} 저점 기간 분석 시작 ===") - try: - # 저점 기간 분석 - bottom_data, alerts = analyze_bottom_period(symbol, interval, days) - - # 전체 기간 시뮬레이션 (그래프 표시 옵션 추가) - print(f"\n=== {symbol} 전체 기간 시뮬레이션 ===") - if show_graphs: - run_simulation(symbol, interval, days) - else: - # 그래프 없이 빠른 분석만 수행 - data = fetch_price_history(symbol, interval, days) - data = calculate_technical_indicators(data) - data = check_buy_point(symbol, data, simulation=True) - - # 매수 신호 통계만 출력 - total_buy_signals = len(data[data['buy_point'] == 1]) - ma_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'movingaverage')]) - dev40_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation40')]) - dev240_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation240')]) - dev1440_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation1440')]) - - print(f"총 매수 신호: {total_buy_signals}") - print(f" - MA 신호: {ma_signals}") - print(f" - Dev40 신호: {dev40_signals}") - print(f" - Dev240 신호: {dev240_signals}") - print(f" - Dev1440 신호: {dev1440_signals}") - - except Exception as e: - print(f"Error analyzing {symbol}: {str(e)}")