From 017989498af2316b17984e373d6b9c0299d0a526 Mon Sep 17 00:00:00 2001 From: dsyoon Date: Sun, 31 Aug 2025 09:31:01 +0900 Subject: [PATCH] init --- monitor.py | 97 ++++---- monitor_coin.py | 18 +- monitor_coin_1h_1.py | 16 +- monitor_coin_1h_2.py | 16 +- requirements.txt | 4 +- simulation_1h.py | 560 +++++++++++++++---------------------------- 6 files changed, 260 insertions(+), 451 deletions(-) diff --git a/monitor.py b/monitor.py index 0a93bb4..bb6f48e 100644 --- a/monitor.py +++ b/monitor.py @@ -225,11 +225,40 @@ class Monitor(HTS): return data # ------------- Strategy ------------- - def buy_ticker_1h(self, symbol: str, data: pd.DataFrame) -> bool: + def buy_sell_ticker_1h(self, symbol: str, data: pd.DataFrame, balances=None, is_inverse: bool = False) -> bool: try: - # 기존 로직 --------------------------------------------------- - #print('BUY: {}'.format(symbol)) - #self.sendMsg('BUY: {}'.format(symbol)) + # 신호 생성 및 최신 포인트 확인 + data = self.annotate_signals(symbol, data) + if data['point'].iloc[-1] != 1: + return False + + # 인버스 데이터: 매수 신호를 매도로 처리 (fall_6p, deviation40 만 허용) + if is_inverse: + # 허용된 인버스 매도 신호만 처리 + last_signal = str(data['signal'].iloc[-1]) if 'signal' in data.columns else '' + if last_signal not in ['fall_6p', 'deviation40']: + return False + current_time = datetime.now() + available_balance = 0 + try: + if balances and symbol in balances: + available_balance = float(balances[symbol].get('balance', 0)) + except Exception: + available_balance = 0 + if available_balance <= 0: + return False + sell_amount = available_balance * 0.7 + _ = self.hts.sellCoinMarket(symbol, 0, sell_amount) + if self.cooldown_file is not None: + try: + self.last_signal[symbol] = str(data['signal'].iloc[-1]) + except Exception: + self.last_signal[symbol] = '' + self.buy_cooldown.setdefault(symbol, {})['sell'] = {'datetime': current_time, 'signal': str(data['signal'].iloc[-1])} + self._save_buy_cooldown() + print(f"{KR_COINS[symbol]} ({symbol}) [{data['signal'].iloc[-1]} 매도], 현재가: {data['Close'].iloc[-1]:.4f}") + self.sendMsg("[KRW-COIN]\n" + f"• 매도 [COIN] {KR_COINS[symbol]} ({symbol}): {data['signal'].iloc[-1]} ({'₩'}{data['Close'].iloc[-1]:.4f})") + return True check_5_week_lowest = False @@ -313,44 +342,7 @@ class Monitor(HTS): return False return True - def sell_ticker_1h(self, symbol: str, data: pd.DataFrame, balances) -> bool: - """Dev40(Deviation40) 매도 조건을 만족할 때만 매도 실행""" - try: - # 최신 캔들의 시그널이 Dev40이 아니면 매도하지 않음 - if data['signal'].iloc[-1] != 'deviation40': - return False - - current_time = datetime.now() - - # 최근 Dev40 매도로부터 20분(1200초) 쿨다운 적용 - last_sell_dt = self.buy_cooldown.get(symbol, {}).get('sell', {}).get('datetime') - if last_sell_dt and self.last_signal.get(symbol) == 'deviation40': - time_diff = current_time - last_sell_dt - if time_diff.total_seconds() < 1200: - remain = 1200 - time_diff.total_seconds() - print(f"{symbol}: 매도 금지 중 (남은 시간: {remain:.0f}초)") - return False - - # 매도 수량/금액 산정 (예: 50,000 KRW 상당) - sell_amount = balances[symbol]['balance'] * 0.7 # KRW 기준 매도 총액 혹은 수량 설정 - - # 실제 매도 실행 (HTS API) - _ = self.hts.sellCoinMarket(symbol, 0, sell_amount) # market 매도 (price 파라미터 미사용) - - # 쿨다운 및 로그 저장 - if self.cooldown_file is not None: - self.last_signal[symbol] = 'deviation40' - self.buy_cooldown.setdefault(symbol, {})['sell'] = {'datetime': current_time, 'signal': 'deviation40'} - self._save_buy_cooldown() - - print(f"{KR_COINS[symbol]} ({symbol}) [Dev40 매도], 현재가: {data['Close'].iloc[-1]:.4f}, 20분간 매도 금지 시작") - self.sendMsg("[KRW-COIN]" + "\n" + self.format_message('COIN', symbol, KR_COINS[symbol], data['Close'].iloc[-1], 'Dev40 매도')) - except Exception as e: - print(f"Error selling {symbol}: {str(e)}") - return False - return True - - def check_point(self, symbol: str, data: pd.DataFrame, simulation: bool | None = None) -> pd.DataFrame: + def annotate_signals(self, symbol: str, data: pd.DataFrame, simulation: bool | None = None) -> pd.DataFrame: data = data.copy() data['signal'] = '' data['point'] = 0 @@ -417,6 +409,27 @@ class Monitor(HTS): data.at[data.index[-1], 'signal'] = 'deviation1440' data.at[data.index[-1], 'point'] = 1 + # Deviation720 상향 돌파 매수 (92, 93) + try: + prev_d720 = data['Deviation720'].iloc[i - 1] + curr_d720 = data['Deviation720'].iloc[i] + # 92 상향 돌파 + if prev_d720 < 92 and curr_d720 >= 92: + data.at[data.index[i], 'signal'] = 'Deviation720' + data.at[data.index[i], 'point'] = 1 + if not simulation and data['point'][-3:].sum() > 0: + data.at[data.index[-1], 'signal'] = 'Deviation720' + data.at[data.index[-1], 'point'] = 1 + # 93 상향 돌파 + if prev_d720 < 93 and curr_d720 >= 93: + data.at[data.index[i], 'signal'] = 'Deviation720' + data.at[data.index[i], 'point'] = 1 + if not simulation and data['point'][-3:].sum() > 0: + data.at[data.index[-1], 'signal'] = 'Deviation720' + data.at[data.index[-1], 'point'] = 1 + except Exception: + pass + try: prev_low = data['Low'].iloc[i - 1] curr_close = data['Close'].iloc[i] diff --git a/monitor_coin.py b/monitor_coin.py index bf8776a..2a4e3da 100644 --- a/monitor_coin.py +++ b/monitor_coin.py @@ -23,21 +23,13 @@ class MonitorCoin (Monitor): if data is not None and not data.empty: try: inverseData= self.inverse_data(data) - recent_inverseData = self.check_point(symbol, inverseData) - if recent_inverseData['point'].iloc[-1] != 1: - continue - sell_success = self.sell_ticker_1h(symbol, recent_inverseData, balances) - if not sell_success: - continue + recent_inverseData = self.annotate_signals(symbol, inverseData) + if not self.buy_sell_ticker_1h(symbol, recent_inverseData, balances=balances, is_inverse=True): + pass data = self.calculate_technical_indicators(data) - recent_data = self.check_point(symbol, data) - - if recent_data['point'].iloc[-1] != 1: - continue - buy_success = self.buy_ticker_1h(symbol, recent_data) - if not buy_success: - continue + recent_data = self.annotate_signals(symbol, data) + _ = self.buy_sell_ticker_1h(symbol, recent_data, balances=None, is_inverse=False) except Exception as e: print(f"Error processing data for {symbol}: {str(e)}") diff --git a/monitor_coin_1h_1.py b/monitor_coin_1h_1.py index 22f0823..e835ef7 100644 --- a/monitor_coin_1h_1.py +++ b/monitor_coin_1h_1.py @@ -24,20 +24,12 @@ class MonitorCoin (Monitor): if data is not None and not data.empty: try: inverseData= self.inverse_data(data) - recent_inverseData = self.check_point(symbol, inverseData) - if recent_inverseData['point'].iloc[-1] != 1: - continue - sell_success = self.sell_ticker_1h(symbol, recent_inverseData, balances) - if not sell_success: - continue + recent_inverseData = self.annotate_signals(symbol, inverseData) + _ = self.buy_sell_ticker_1h(symbol, recent_inverseData, balances=balances, is_inverse=True) data = self.calculate_technical_indicators(data) - recent_data = self.check_point(symbol, data) - if recent_data['point'].iloc[-1] != 1: - continue - buy_success = self.buy_ticker_1h(symbol, recent_data) - if not buy_success: - continue + recent_data = self.annotate_signals(symbol, data) + _ = self.buy_sell_ticker_1h(symbol, recent_data, balances=None, is_inverse=False) except Exception as e: print(f"Error processing data for {symbol}: {str(e)}") else: diff --git a/monitor_coin_1h_2.py b/monitor_coin_1h_2.py index ef0c627..c91a3e0 100644 --- a/monitor_coin_1h_2.py +++ b/monitor_coin_1h_2.py @@ -23,20 +23,12 @@ class MonitorCoin (Monitor): if data is not None and not data.empty: try: inverseData= self.inverse_data(data) - recent_inverseData = self.check_point(symbol, inverseData) - if recent_inverseData['point'].iloc[-1] != 1: - continue - sell_success = self.sell_ticker_1h(symbol, recent_inverseData, balances) - if not sell_success: - continue + recent_inverseData = self.annotate_signals(symbol, inverseData) + _ = self.buy_sell_ticker_1h(symbol, recent_inverseData, balances=balances, is_inverse=True) data = self.calculate_technical_indicators(data) - recent_data = self.check_point(symbol, data) - if recent_data['point'].iloc[-1] != 1: - continue - buy_success = self.buy_ticker_1h(symbol, recent_data) - if not buy_success: - continue + recent_data = self.annotate_signals(symbol, data) + _ = self.buy_sell_ticker_1h(symbol, recent_data, balances=None, is_inverse=False) except Exception as e: print(f"Error processing data for {symbol}: {str(e)}") else: diff --git a/requirements.txt b/requirements.txt index 8bc85de..1e18de3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,4 +9,6 @@ schedule python-dateutil python-telegram-bot finance-datareader -psutil \ No newline at end of file +psutil +mpld3 +plotly \ No newline at end of file diff --git a/simulation_1h.py b/simulation_1h.py index 3b99d5d..50bc64b 100644 --- a/simulation_1h.py +++ b/simulation_1h.py @@ -1,18 +1,190 @@ from dateutil.relativedelta import relativedelta import pandas as pd import yfinance as yf -import matplotlib.pyplot as plt -import matplotlib.dates -import mplcursors -import matplotlib.lines as mlines # 추가: 범례 프록시용 -plt.rcParams['font.family'] ='AppleGothic' -plt.rcParams['axes.unicode_minus'] =False +import plotly.graph_objs as go +from plotly import subplots +import plotly.io as pio +from datetime import datetime +pio.renderers.default = 'browser' from config import * from monitor import Monitor class Simulation: + + def render_plotly(self, symbol: str, interval_minutes: int, data: pd.DataFrame, inverseData: pd.DataFrame) -> None: + fig = subplots.make_subplots( + rows=3, cols=1, + subplot_titles=("캔들", "이격도/거래량", "장기 이격도"), + shared_xaxes=True, horizontal_spacing=0.03, vertical_spacing=0.03, + row_heights=[0.6, 0.2, 0.2] + ) + + # Row 1: 캔들 + 이동평균 + 볼린저 + fig.add_trace(go.Candlestick(x=data.index, open=data['Open'], high=data['High'], low=data['Low'], close=data['Close'], name='캔들'), row=1, col=1) + for ma_col, color in [('MA5','red'),('MA20','blue'),('MA40','green'),('MA120','purple'),('MA200','brown'),('MA240','darkred'),('MA720','cyan'),('MA1440','magenta')]: + if ma_col in data.columns: + fig.add_trace(go.Scatter(x=data.index, y=data[ma_col], name=ma_col, mode='lines', line=dict(color=color, width=1)), row=1, col=1) + if 'Lower' in data.columns and 'Upper' in data.columns: + fig.add_trace(go.Scatter(x=data.index, y=data['Lower'], name='볼린저 하단', mode='lines', line=dict(color='grey', width=1, dash='dot')), row=1, col=1) + fig.add_trace(go.Scatter(x=data.index, y=data['Upper'], name='볼린저 상단', mode='lines', line=dict(color='grey', width=1, dash='dot')), row=1, col=1) + + # 매수 포인트 + for sig, color in [('movingaverage','red'),('deviation40','orange'),('Deviation720','blue'),('deviation1440','purple'),('fall_6p','black')]: + pts = data[(data['point']==1) & (data['signal']==sig)] + if len(pts)>0: + fig.add_trace(go.Scatter(x=pts.index, y=pts['Close'], mode='markers', name=f'{sig} 매수', marker=dict(color=color, size=8, symbol='circle')), row=1, col=1) + + # 매도 포인트: inverseData의 buy 신호 중 fall_6p, deviation40만 일반 그래프 가격축에 매도로 표시 + inv_sell_pts = inverseData[(inverseData['point']==1) & (inverseData['signal'].isin(['deviation40','fall_6p']))] + if len(inv_sell_pts)>0: + idx = inv_sell_pts.index.intersection(data.index) + if len(idx)>0: + fig.add_trace( + go.Scatter( + x=idx, + y=data.loc[idx, 'Close'], + mode='markers', + name='매도', + marker=dict(color='orange', size=10, symbol='triangle-down') + ), + row=1, col=1 + ) + + # Row 2: 이격도 + 거래량 + for dev_col, color, width in [('Deviation5','red',1),('Deviation20','blue',1),('Deviation40','green',2),('Deviation120','purple',1),('Deviation200','brown',1),('Deviation720','darkred',2),('Deviation720','cyan',1),('Deviation1440','magenta',1)]: + if dev_col in data.columns: + fig.add_trace(go.Scatter(x=data.index, y=data[dev_col], name=dev_col, mode='lines', line=dict(color=color, width=width)), row=2, col=1) + if 'Volume' in data.columns: + fig.add_trace(go.Bar(x=data.index, y=data['Volume'], name='거래량', marker_color='lightgray', opacity=0.5), row=2, col=1) + + # Row 3: 장기 이격도 및 기준선 + for dev_col, color in [('Deviation720','darkred'),('Deviation1440','magenta')]: + if dev_col in data.columns: + fig.add_trace(go.Scatter(x=data.index, y=data[dev_col], name=f'{dev_col}(장기)', mode='lines', line=dict(color=color, width=2)), row=3, col=1) + for h, color in [(90,'red'),(95,'green'),(100,'black')]: + fig.add_hline(y=h, line_width=1, line_dash='dash', line_color=color, row=3, col=1) + + # ----------------- 인버스용 트레이스 (초기 숨김) ----------------- + n_orig = len(fig.data) + + # Row 1: 캔들/MA/볼린저 (inverseData) + fig.add_trace(go.Candlestick(x=inverseData.index, open=inverseData['Open'], high=inverseData['High'], low=inverseData['Low'], close=inverseData['Close'], name='캔들(인버스)', showlegend=True, visible=False), row=1, col=1) + for ma_col, color in [('MA5','red'),('MA20','blue'),('MA40','green'),('MA120','purple'),('MA200','brown'),('MA240','darkred'),('MA720','cyan'),('MA1440','magenta')]: + if ma_col in inverseData.columns: + fig.add_trace(go.Scatter(x=inverseData.index, y=inverseData[ma_col], name=f'{ma_col}(인버스)', mode='lines', line=dict(color=color, width=1), showlegend=True, visible=False), row=1, col=1) + if 'Lower' in inverseData.columns and 'Upper' in inverseData.columns: + fig.add_trace(go.Scatter(x=inverseData.index, y=inverseData['Lower'], name='볼린저 하단(인버스)', mode='lines', line=dict(color='grey', width=1, dash='dot'), showlegend=True, visible=False), row=1, col=1) + fig.add_trace(go.Scatter(x=inverseData.index, y=inverseData['Upper'], name='볼린저 상단(인버스)', mode='lines', line=dict(color='grey', width=1, dash='dot'), showlegend=True, visible=False), row=1, col=1) + + # 인버스 매수 포인트: fall_6p, deviation40만 표시 + for sig, color in [('deviation40','orange'),('fall_6p','black')]: + pts_inv = inverseData[(inverseData['point']==1) & (inverseData['signal']==sig)] + if len(pts_inv)>0: + fig.add_trace(go.Scatter(x=pts_inv.index, y=inverseData.loc[pts_inv.index,'Close'], mode='markers', name=f'{sig} 매수(인버스)', marker=dict(color=color, size=8, symbol='circle'), showlegend=True, visible=False), row=1, col=1) + + # 인버스 보기에서의 매도 포인트: 일반 그래프의 매수를 인버스 그래프의 매도로 표시 (모든 매수 신호 반영) + normal_to_inv_sell = data[(data['point']==1)] + if len(normal_to_inv_sell) > 0: + idx2 = normal_to_inv_sell.index.intersection(inverseData.index) + if len(idx2) > 0: + fig.add_trace( + go.Scatter( + x=idx2, + y=inverseData.loc[idx2, 'Close'], + mode='markers', + name='매도(일반→인버스)', + marker=dict(color='orange', size=10, symbol='triangle-down'), + showlegend=True, + visible=False + ), + row=1, col=1 + ) + + # Row 2: 이격도 + 거래량 (inverseData) + for dev_col, color, width in [('Deviation5','red',1),('Deviation20','blue',1),('Deviation40','green',2),('Deviation120','purple',1),('Deviation200','brown',1),('Deviation720','darkred',2),('Deviation720','cyan',1),('Deviation1440','magenta',1)]: + if dev_col in inverseData.columns: + fig.add_trace(go.Scatter(x=inverseData.index, y=inverseData[dev_col], name=f'{dev_col}(인버스)', mode='lines', line=dict(color=color, width=width), showlegend=True, visible=False), row=2, col=1) + if 'Volume' in inverseData.columns: + fig.add_trace(go.Bar(x=inverseData.index, y=inverseData['Volume'], name='거래량(인버스)', marker_color='lightgray', opacity=0.5, showlegend=True, visible=False), row=2, col=1) + + # Row 3: 장기 이격도 (inverseData) + for dev_col, color in [('Deviation720','darkred'),('Deviation1440','magenta')]: + if dev_col in inverseData.columns: + fig.add_trace(go.Scatter(x=inverseData.index, y=inverseData[dev_col], name=f'{dev_col}(장기-인버스)', mode='lines', line=dict(color=color, width=2), showlegend=True, visible=False), row=3, col=1) + + n_total = len(fig.data) + n_inv = n_total - n_orig + visible_orig = [True]*n_orig + [False]*n_inv + visible_inv = [False]*n_orig + [True]*n_inv + legendtitle_orig = {'text': '일반 그래프'} + legendtitle_inv = {'text': '인버스 그래프'} + + fig.update_layout( + height=1000, + margin=dict(t=180, l=40, r=240, b=40), + title=dict( + text=f"{symbol}, {interval_minutes} 분봉, ({datetime.now().strftime('%Y-%m-%d %H:%M:%S')})", + x=0.5, + xanchor='center', + y=0.995, + yanchor='top', + pad=dict(t=10, b=12) + ), + xaxis_rangeslider_visible=False, + xaxis1_rangeslider_visible=False, + xaxis2_rangeslider_visible=False, + legend=dict(orientation='v', yref='paper', yanchor='top', y=1.0, xref='paper', xanchor='left', x=1.02, title=legendtitle_orig), + dragmode='zoom', + updatemenus=[dict( + type='buttons', + direction='left', + x=0.0, + xanchor='left', + y=1.11, + yanchor='top', + pad=dict(t=0, r=10, b=0, l=0), + buttons=[ + dict( + label='홈', + method='update', + args=[ + {'visible': visible_orig}, + { + 'legend': {'title': legendtitle_orig}, + 'xaxis.autorange': True, + 'xaxis2.autorange': True, + 'xaxis3.autorange': True, + 'yaxis.autorange': True, + 'yaxis2.autorange': True, + 'yaxis3.autorange': True, + } + ], + execute=True + ), + dict( + label='인버스', + method='update', + args=[ + {'visible': visible_inv}, + {'legend': {'title': legendtitle_inv, 'orientation': 'v', 'y': 1.0, 'yanchor': 'top', 'x': 1.02, 'xanchor': 'left'}} + ], + args2=[ + {'visible': visible_orig}, + {'legend': {'title': legendtitle_orig, 'orientation': 'v', 'y': 1.0, 'yanchor': 'top', 'x': 1.02, 'xanchor': 'left'}} + ], + execute=True + ), + ] + )] + ) + fig.update_xaxes(title_text='시간', row=3, col=1) + fig.update_yaxes(title_text='가격 (KRW)', row=1, col=1) + fig.update_yaxes(title_text='이격도/거래량', row=2, col=1) + fig.update_yaxes(title_text='장기 이격도', row=3, col=1) + + fig.show(config={'scrollZoom': True, 'displaylogo': False}) def __init__(self) -> None: self.monitor = Monitor() self.INTERVAL_MAP = { @@ -51,7 +223,7 @@ class Simulation: def analyze_bottom_period(self, symbol: str, interval_minutes: int, days: int = 90): data = self.fetch_price_history(symbol, interval_minutes, days) data = self.monitor.calculate_technical_indicators(data) - data = self.monitor.check_point(symbol, data, simulation=True) + data = self.monitor.annotate_signals(symbol, data, simulation=True) print(f"데이터 기간: {data.index[0]} ~ {data.index[-1]}") print(f"총 데이터 수: {len(data)}") bottom_start = pd.Timestamp('2025-06-22') @@ -101,10 +273,10 @@ class Simulation: data = self.fetch_price_history(symbol, interval_minutes) inverseData = self.monitor.inverse_data(data) - inverseData = self.monitor.check_point(symbol, inverseData, simulation=True) + inverseData = self.monitor.annotate_signals(symbol, inverseData, simulation=True) data = self.monitor.calculate_technical_indicators(data) - data = self.monitor.check_point(symbol, data, simulation=True) + data = self.monitor.annotate_signals(symbol, data, simulation=True) print(f"데이터 기간: {data.index[0]} ~ {data.index[-1]}") print(f"총 데이터 수: {len(data)}") alerts = [] @@ -114,371 +286,16 @@ class Simulation: print(f"\n총 매수 신호 수: {len(alerts)}") ma_signals = len(data[(data['point'] == 1) & (data['signal'] == 'movingaverage')]) dev40_signals = len(data[(data['point'] == 1) & (data['signal'] == 'deviation40')]) - dev240_signals = len(data[(data['point'] == 1) & (data['signal'] == 'deviation240')]) + dev240_signals = len(data[(data['point'] == 1) & (data['signal'] == 'Deviation720')]) dev1440_signals = len(data[(data['point'] == 1) & (data['signal'] == 'deviation1440')]) print(f" - MA 신호: {ma_signals}") print(f" - Dev40 신호: {dev40_signals}") print(f" - Dev240 신호: {dev240_signals}") print(f" - Dev1440 신호: {dev1440_signals}") - fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True, figsize=(15, 8), height_ratios=[3, 1]) - fig.suptitle(f"{symbol} - 시뮬레이션 {interval_minutes}분봉", fontsize=14) - - # ----------------- 마우스 휠 확대/축소 ----------------- - def on_scroll(event): - # event.button: 'up' -> zoom in, 'down' -> zoom out - if event.inaxes not in [ax1, ax2]: - return - ax = event.inaxes - # x 축만 두 축을 동시에 조정 - cur_xlim = ax1.get_xlim() - xdata = event.xdata - if xdata is None: - return - scale_factor = 0.9 if event.button == 'up' else 1/0.9 - new_width = (cur_xlim[1] - cur_xlim[0]) * scale_factor - relx = (cur_xlim[1] - xdata) / (cur_xlim[1] - cur_xlim[0]) - new_left = xdata - new_width * (1 - relx) - new_right = xdata + new_width * relx - # 데이터 영역 벗어나지 않도록 클램프 - xmin, xmax = matplotlib.dates.date2num(data.index[0]), matplotlib.dates.date2num(data.index[-1]) - if new_left < xmin: - new_left = xmin - if new_right > xmax: - new_right = xmax - ax1.set_xlim([new_left, new_right]) - ax2.set_xlim([new_left, new_right]) - # y축은 해당 축만 줌 - cur_ylim = ax.get_ylim() - ydata = event.ydata - if ydata is not None: - new_height = (cur_ylim[1] - cur_ylim[0]) * scale_factor - rely = (cur_ylim[1] - ydata) / (cur_ylim[1] - cur_ylim[0]) - ax.set_ylim([ydata - new_height * (1 - rely), ydata + new_height * rely]) - ax.figure.canvas.draw_idle() - - fig.canvas.mpl_connect('scroll_event', on_scroll) - - # 캔들스틱 차트 추가 (matplotlib 기본 기능 사용) - import matplotlib.dates as mdates - - # 캔들스틱 데이터 준비 - ohlc_data = [] - normal_patches = [] # 추가: 일반 캔들 아티스트 목록 - for i, (idx, row) in enumerate(data.iterrows()): - ohlc_data.append([ - mdates.date2num(idx), - row['Open'], - row['High'], - row['Low'], - row['Close'] - ]) - - # 캔들스틱 그리기 (matplotlib 기본 기능으로 구현) - for ohlc in ohlc_data: - date, open_price, high, low, close = ohlc - - # 캔들 색상 결정 - color = 'red' if close >= open_price else 'blue' - - # 캔들 몸통 그리기 - body_height = abs(close - open_price) - body_bottom = min(open_price, close) - rect = plt.Rectangle((date - 0.3, body_bottom), 0.6, body_height, - facecolor=color, edgecolor='black', alpha=0.7) - ax1.add_patch(rect) - normal_patches.append(rect) # 추가: 리스트에 저장 - - # 캔들 심지 그리기 - ax1.plot([date, date], [low, high], color='black', linewidth=1) - normal_patches.append(ax1.lines[-1]) # 추가: 선도 저장 - - # ----------------- 하이킨아시 캔들스틱 (제거됨) ----------------- - - # 메인 차트 (가격, 이동평균선, 볼린저 밴드) - line_close = ax1.plot(data.index, data["Close"], label="종가", color="black", linewidth=1.5, alpha=0.8)[0] - line_ma5 = ax1.plot(data.index, data["MA5"], label="MA5", color="red", linewidth=1)[0] - line_ma20 = ax1.plot(data.index, data["MA20"], label="MA20", color="blue", linewidth=1)[0] - line_ma40 = ax1.plot(data.index, data["MA40"], label="MA40", color="green", linewidth=1)[0] - line_ma120 = ax1.plot(data.index, data["MA120"], label="MA120", color="purple", linewidth=1)[0] - line_ma200 = ax1.plot(data.index, data["MA200"], label="MA200", color="brown", linewidth=1)[0] - line_ma240 = ax1.plot(data.index, data["MA240"], label="MA240", color="darkred", linewidth=1)[0] - line_ma720 = ax1.plot(data.index, data["MA720"], label="MA720", color="cyan", linewidth=1)[0] - line_ma1440 = ax1.plot(data.index, data["MA1440"], label="MA1440", color="magenta", linewidth=1)[0] - - # Bollinger Bands - line_upper = ax1.plot(data.index, data["Upper"], label="볼린저 Upper", color="grey", linestyle="--", linewidth=1)[0] - line_lower = ax1.plot(data.index, data["Lower"], label="볼린저 Lower", color="grey", linestyle="--", linewidth=1)[0] - - ax1.fill_between(data.index, data["Lower"], data["Upper"], color="grey", alpha=0.1) - - # 매수 포인트를 신호 유형별로 다르게 표시 (수직선 포함) - # 이동평균선 기반 매수 포인트 - ma_points = data[(data['point'] == 1) & (data['signal'] == 'movingaverage')] - scatter_ma_points = None - if len(ma_points) > 0: - scatter_ma_points = ax1.scatter(ma_points.index, ma_points['Close'], color='red', s=150, zorder=10, label='MA 매수 포인트', marker='o') - for time in ma_points.index: - ax1.axvline(x=time, color='red', linestyle='-', alpha=0.5, linewidth=1) - - # Deviation40 기반 매수 포인트 - dev40_points = data[(data['point'] == 1) & (data['signal'] == 'deviation40')] - scatter_dev40_points = None - if len(dev40_points) > 0: - scatter_dev40_points = ax1.scatter(dev40_points.index, dev40_points['Close'], - facecolors='none', edgecolors='red', linestyle='--', - linewidth=2, s=200, zorder=10, label='Dev40 매수 포인트') - for time in dev40_points.index: - ax1.axvline(x=time, color='red', linestyle='--', alpha=0.5, linewidth=1) - - # Deviation240 기반 매수 포인트 - dev240_points = data[(data['point'] == 1) & (data['signal'] == 'deviation240')] - scatter_dev240_points = None - if len(dev240_points) > 0: - scatter_dev240_points = ax1.scatter(dev240_points.index, dev240_points['Close'], - facecolors='none', edgecolors='blue', linestyle='--', - linewidth=2, s=200, zorder=10, label='Dev240 매수 포인트') - for time in dev240_points.index: - ax1.axvline(x=time, color='blue', linestyle='--', alpha=0.5, linewidth=1) - - # Deviation1440 기반 매수 포인트 - dev1440_points = data[(data['point'] == 1) & (data['signal'] == 'deviation1440')] - scatter_dev1440_points = None - if len(dev1440_points) > 0: - scatter_dev1440_points = ax1.scatter(dev1440_points.index, dev1440_points['Close'], - facecolors='none', edgecolors='purple', linestyle='--', - linewidth=2, s=200, zorder=10, label='Dev1440 매수 포인트') - for time in dev1440_points.index: - ax1.axvline(x=time, color='purple', linestyle='--', alpha=0.5, linewidth=1) - - # 마우스 오버 기능 추가 (이동평균선 매수 포인트) - if scatter_ma_points is not None: - cursor = mplcursors.cursor(scatter_ma_points, hover=True) - cursor.connect("add", lambda sel: sel.annotation.set_text( - f'MA 매수신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}' - )) - cursor.connect("remove", lambda sel: sel.annotation.set_visible(False)) - - # 마우스 오버 기능 추가 (Deviation40 매수 포인트) - if scatter_dev40_points is not None: - cursor_dev40 = mplcursors.cursor(scatter_dev40_points, hover=True) - cursor_dev40.connect("add", lambda sel: sel.annotation.set_text( - f'Dev40 매수신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}' - )) - cursor_dev40.connect("remove", lambda sel: sel.annotation.set_visible(False)) - - # 마우스 오버 기능 추가 (Deviation240 매수 포인트) - if scatter_dev240_points is not None: - cursor_dev240 = mplcursors.cursor(scatter_dev240_points, hover=True) - cursor_dev240.connect("add", lambda sel: sel.annotation.set_text( - f'Dev240 매수신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}' - )) - cursor_dev240.connect("remove", lambda sel: sel.annotation.set_visible(False)) - - # 마우스 오버 기능 추가 (Deviation1440 매수 포인트) - if scatter_dev1440_points is not None: - cursor_dev1440 = mplcursors.cursor(scatter_dev1440_points, hover=True) - cursor_dev1440.connect("add", lambda sel: sel.annotation.set_text( - f'Dev1440 매수신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}' - )) - cursor_dev1440.connect("remove", lambda sel: sel.annotation.set_visible(False)) - - # 모든 봉에 마우스 오버 기능 추가 - cursor3 = mplcursors.cursor(line_close, hover=True) - cursor3.connect("add", lambda sel: sel.annotation.set_text( - f'종가\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}' - )) - cursor3.connect("remove", lambda sel: sel.annotation.set_visible(False)) - - ax1.set_ylabel("가격 (KRW)") - ax1.set_title(f"{symbol} 차트 - 캔들스틱 & 이동평균선", fontsize=14) - ax1.grid(True, linestyle='--', alpha=0.3) - - # 홈 버튼 추가 (초기화 기능) - home_button = ax1.text(0.02, 0.98, '홈', transform=ax1.transAxes, - bbox=dict(boxstyle="round,pad=0.3", facecolor='lightblue', alpha=0.8), - fontsize=10, ha='left', va='top', picker=True) - # 하이킨아시 토글 버튼 추가 - - # --- 범례 생성 및 인터랙티브 토글 --- - # 캔들 및 지표만 범례에 표시 (일반/하이킨아시 토글은 HA 버튼으로 제공) - legend = ax1.legend(loc='upper left', bbox_to_anchor=(0.02, 0.85), fontsize=10) - - # 범례 클릭 시 해당 선 토글 기능 - lined = {} - if hasattr(legend, "legend_handles"): - legend_handles = legend.legend_handles - elif hasattr(legend, "legendHandles"): - legend_handles = legend.legendHandles - else: - legend_handles = legend.get_lines() - plot_lines = [line_close, line_ma5, line_ma20, line_ma40, line_ma120, - line_ma200, line_ma240, line_ma720, line_ma1440, - line_upper, line_lower] - if scatter_ma_points is not None: - plot_lines.append(scatter_ma_points) - if scatter_dev40_points is not None: - plot_lines.append(scatter_dev40_points) - if scatter_dev240_points is not None: - plot_lines.append(scatter_dev240_points) - if scatter_dev1440_points is not None: - plot_lines.append(scatter_dev1440_points) - - # --------- 매도 포인트(인버스 데이터) 표시 --------- - # Deviation40 기반 매도 포인트 - sell_dev40_points = inverseData[(inverseData['point'] == 1) & (inverseData['signal'] == 'deviation40')] - scatter_sell_dev40_points = None - if len(sell_dev40_points) > 0: - scatter_sell_dev40_points = ax1.scatter(sell_dev40_points.index, - data.loc[sell_dev40_points.index, 'Close'], - facecolors='none', edgecolors='orange', marker='v', linewidth=2, - s=200, zorder=10, label='Dev40 매도 포인트') - for time in sell_dev40_points.index: - ax1.axvline(x=time, color='orange', linestyle='--', alpha=0.5, linewidth=1) - - # -------- 매도 포인트 마우스 오버 -------- - if scatter_sell_dev40_points is not None: - cursor_sell_dev40 = mplcursors.cursor(scatter_sell_dev40_points, hover=True) - cursor_sell_dev40.connect("add", lambda sel: sel.annotation.set_text( - f'Dev40 매도신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}' - )) - cursor_sell_dev40.connect("remove", lambda sel: sel.annotation.set_visible(False)) - - # -------- plot_lines 업데이트 -------- - if scatter_sell_dev40_points is not None: - plot_lines.append(scatter_sell_dev40_points) - - # 기존 범례 제거 후 새로 생성하여 매도 항목 포함 - if legend is not None: - legend.remove() - legend = ax1.legend(loc='upper left', bbox_to_anchor=(0.02, 0.85), fontsize=10) - # 새 legend_handles 추출 - if hasattr(legend, "legend_handles"): - legend_handles = legend.legend_handles - elif hasattr(legend, "legendHandles"): - legend_handles = legend.legendHandles - else: - legend_handles = legend.get_lines() - - # lined 사전에 새 핸들 매핑 추가 - for leg_handle, orig in zip(legend_handles[:len(plot_lines)], plot_lines): - leg_handle.set_picker(True) - lined[leg_handle] = orig - - # 하이킨아시/일반 토글 상태 변수 - - def on_pick(event): - leg_handle = event.artist - # 홈 버튼 동작 - if leg_handle == home_button: - ax1.set_xlim(matplotlib.dates.date2num(data.index[0]), matplotlib.dates.date2num(data.index[-1])) - ax1.set_ylim(data['Low'].min() * 0.99, data['High'].max() * 1.01) - ax2.set_xlim(ax1.get_xlim()) - ax2.set_ylim( - data[['Deviation5', 'Deviation20', 'Deviation40', 'Deviation120', 'Deviation200', 'Deviation240', 'Deviation720', 'Deviation1440']].min().min() * 0.95, - data[['Deviation5', 'Deviation20', 'Deviation40', 'Deviation120', 'Deviation200', 'Deviation240', 'Deviation720', 'Deviation1440']].max().max() * 1.05, - ) - fig.canvas.draw_idle() - return - # 선 토글 처리 - orig = lined.get(leg_handle) - if orig is None: - return - vis = not orig.get_visible() - orig.set_visible(vis) - leg_handle.set_alpha(1.0 if vis else 0.2) - fig.canvas.draw_idle() - - fig.canvas.mpl_connect('pick_event', on_pick) - - # Deviation subplot (이격도 보조지표) - line_dev5 = ax2.plot(data.index, data['Deviation5'], color='red', label='Dev5', linewidth=1)[0] - line_dev20 = ax2.plot(data.index, data['Deviation20'], color='blue', label='Dev20', linewidth=1)[0] - line_dev40 = ax2.plot(data.index, data['Deviation40'], color='green', label='Dev40', linewidth=2)[0] - line_dev120 = ax2.plot(data.index, data['Deviation120'], color='purple', label='Dev120', linewidth=1)[0] - line_dev200 = ax2.plot(data.index, data['Deviation200'], color='brown', label='Dev200', linewidth=1)[0] - line_dev240 = ax2.plot(data.index, data['Deviation240'], color='darkred', label='Dev240', linewidth=2)[0] - line_dev720 = ax2.plot(data.index, data['Deviation720'], color='cyan', label='Dev720', linewidth=1)[0] - line_dev1440 = ax2.plot(data.index, data['Deviation1440'], color='magenta', label='Dev1440', linewidth=1)[0] - - cursor_dev = mplcursors.cursor([line_dev5, line_dev20, line_dev40, line_dev120, line_dev200, line_dev240, line_dev720, line_dev1440], hover=True) - cursor_dev.connect("add", lambda sel: sel.annotation.set_text( - f"{sel.artist.get_label()}\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime('%Y-%m-%d %H:%M')}\n값: {sel.target[1]:.2f}" - )) - line_h90 = ax2.axhline(90, color='red', linestyle='--', linewidth=2, label='90', alpha=0.7) - line_h95 = ax2.axhline(95, color='green', linestyle='--', linewidth=2, label='95', alpha=0.7) - line_h100 = ax2.axhline(100, color='black', linestyle='-', linewidth=1, label='100', alpha=0.5) - ax2.set_ylabel('이격도 (%)') - ax2.set_title('이격도 보조지표', fontsize=12) - legend2 = ax2.legend(loc='upper left', fontsize=9) - - if legend2 is not None: - if hasattr(legend2, "legend_handles"): - legend2_handles = legend2.legend_handles - elif hasattr(legend2, "legendHandles"): - legend2_handles = legend2.legendHandles - else: - legend2_handles = legend2.get_lines() - plot_lines2 = [line_dev5, line_dev20, line_dev40, line_dev120, line_dev200, line_dev240, line_dev720, line_dev1440, line_h90, line_h95] - for leg_handle in legend2_handles: - label = leg_handle.get_label() - target_line = next((pl for pl in plot_lines2 if pl.get_label() == label), None) - if target_line is not None: - leg_handle.set_picker(True) - lined[leg_handle] = target_line - ax2.grid(True, linestyle='--', alpha=0.3) - - plt.tight_layout() - - print("그래프를 표시합니다...") - print(f"매수 포인트 수: MA={len(ma_points)}, Dev40={len(dev40_points)}, Dev240={len(dev240_points)}") - - # -------- 확대/축소 및 이동 기능 -------- - press = {} - - def on_scroll(event): - ax = event.inaxes - if ax is None: - return - x_left, x_right = ax.get_xlim() - x_range = (x_right - x_left) - if event.button == 'up': # zoom in - scale = 0.8 - elif event.button == 'down': # zoom out - scale = 1.25 - else: - scale = 1.0 - new_range = x_range * scale - center = event.xdata if event.xdata is not None else (x_left + x_right) / 2 - ax.set_xlim(center - new_range / 2, center + new_range / 2) - for other_ax in fig.axes: - if other_ax is not ax: - other_ax.set_xlim(ax.get_xlim()) - fig.canvas.draw_idle() - - def on_press(event): - if event.button == 1 and event.inaxes is not None: - press['xpress'] = event.xdata - press['axes'] = event.inaxes - - def on_motion(event): - if 'xpress' not in press or press.get('axes') is None or event.inaxes is None: - return - dx = press['xpress'] - event.xdata - for ax in fig.axes: - x_left, x_right = ax.get_xlim() - ax.set_xlim(x_left + dx, x_right + dx) - fig.canvas.draw_idle() - - def on_release(event): - press.clear() - - fig.canvas.mpl_connect('scroll_event', on_scroll) - fig.canvas.mpl_connect('button_press_event', on_press) - fig.canvas.mpl_connect('motion_notify_event', on_motion) - fig.canvas.mpl_connect('button_release_event', on_release) - - plt.show() + # Plotly 기반 시각화로 전환 + self.render_plotly(symbol, interval_minutes, data, inverseData) + return if __name__ == "__main__": sim = Simulation() @@ -497,14 +314,15 @@ if __name__ == "__main__": data = sim.fetch_price_history(symbol, interval, days) inverseData = sim.monitor.inverse_data(data) - inverseData = sim.monitor.check_point(symbol, inverseData, simulation=True) + inverseData = sim.monitor.annotate_signals(symbol, inverseData, simulation=True) data = sim.monitor.calculate_technical_indicators(data) - data = sim.monitor.check_point(symbol, data, simulation=True) + data = sim.monitor.annotate_signals(symbol, data, simulation=True) + total_signals = len(data[data['point'] == 1]) ma_signals = len(data[(data['point'] == 1) & (data['signal'] == 'movingaverage')]) dev40_signals = len(data[(data['point'] == 1) & (data['signal'] == 'deviation40')]) - dev240_signals = len(data[(data['point'] == 1) & (data['signal'] == 'deviation240')]) + dev240_signals = len(data[(data['point'] == 1) & (data['signal'] == 'Deviation720')]) dev1440_signals = len(data[(data['point'] == 1) & (data['signal'] == 'deviation1440')]) print(f"총 매수 신호: {total_signals}") print(f" - MA 신호: {ma_signals}")