Files
DeepCoin/stock_simulation.py
dsyoon 4b3e98d100 init
2025-08-07 00:24:21 +09:00

459 lines
21 KiB
Python

import time
from datetime import datetime
from dateutil.relativedelta import relativedelta
import pandas as pd
import yfinance as yf
import matplotlib.pyplot as plt
import matplotlib.dates
import mplcursors
plt.rcParams['font.family'] ='AppleGothic'
plt.rcParams['axes.unicode_minus'] =False
from config import *
from stock_monitor import calculate_technical_indicators, detect_turnaround_signal, get_coin_more_data, check_buy_point
# 비트/알트코인 KRW 마켓 식별: 문자열 "-KRW" 포함 여부로 간단 구분
INTERVAL_MAP = {
60: "60m", # 1시간 (yfinance)
240: "4h", # 4시간 (yfinance)
}
BITHUMB_MAX_COUNT = 3000 # API 최대 3000 캔들
def fetch_price_history(symbol: str, interval_minutes: int, days: int = 30) -> pd.DataFrame:
"""최근 `days`일 데이터(캔들)를 가져온다. 코인(-KRW)은 빗썸, 그 외 yfinance."""
if symbol in KR_COINS:
bong_count = 3000
return get_coin_more_data(symbol, interval_minutes, bong_count=bong_count)
# -------- 주식/ETF/해외코인 (yfinance) --------
if interval_minutes not in INTERVAL_MAP:
raise ValueError("interval must be 60 or 240")
interval_str = INTERVAL_MAP[interval_minutes]
df = yf.download(
tickers=symbol,
period=f"{days}d",
interval=interval_str,
progress=False,
)
if df.empty:
raise RuntimeError("No data fetched. Check symbol or interval support.")
return df
def analyze_bottom_period(symbol: str, interval_minutes: int, days: int = 90):
"""저점 기간(6월 22일~7월 9일) 분석"""
data = fetch_price_history(symbol, interval_minutes, days)
data = calculate_technical_indicators(data)
print(f"데이터 기간: {data.index[0]} ~ {data.index[-1]}")
print(f"총 데이터 수: {len(data)}")
# 저점 기간 필터링 (6월 22일~7월 9일)
bottom_start = pd.Timestamp('2025-06-22')
bottom_end = pd.Timestamp('2025-07-09')
bottom_data = data[(data.index >= bottom_start) & (data.index <= bottom_end)]
if len(bottom_data) == 0:
print("저점 기간 데이터가 없습니다.")
return
print(f"\n저점 기간 데이터: {bottom_data.index[0]} ~ {bottom_data.index[-1]}")
print(f"저점 기간 데이터 수: {len(bottom_data)}")
# 저점 기간의 기술적 지표 분석
print("\n=== 저점 기간 기술적 지표 분석 ===")
# 1. 가격 분석
min_price = bottom_data['Low'].min()
max_price = bottom_data['High'].max()
avg_price = bottom_data['Close'].mean()
print(f"최저가: {min_price:.4f}")
print(f"최고가: {max_price:.4f}")
print(f"평균가: {avg_price:.4f}")
print(f"가격 변동폭: {((max_price - min_price) / min_price * 100):.2f}%")
# 3. 볼린저 밴드 분석
bb_lower_min = bottom_data['Lower'].min()
bb_upper_max = bottom_data['Upper'].max()
print(f"\n볼린저 밴드 분석:")
print(f"하단 밴드 최저: {bb_lower_min:.4f}")
print(f"상단 밴드 최고: {bb_upper_max:.4f}")
# 4. 거래량 분석
volume_avg = bottom_data['Volume'].mean()
volume_max = bottom_data['Volume'].max()
print(f"\n거래량 분석:")
print(f"평균 거래량: {volume_avg:.0f}")
print(f"최대 거래량: {volume_max:.0f}")
# 5. 실제 저점 찾기
actual_bottom_idx = bottom_data['Low'].idxmin()
actual_bottom_price = bottom_data.loc[actual_bottom_idx, 'Low']
actual_bottom_date = actual_bottom_idx
print(f"\n실제 저점:")
print(f"날짜: {actual_bottom_date}")
print(f"가격: {actual_bottom_price:.4f}")
# 실제 저점에서 RSI 출력 제거
# print(f"RSI: {bottom_data.loc[actual_bottom_idx, 'RSI']:.2f}")
print(f"볼린저 하단 대비: {((actual_bottom_price - bottom_data.loc[actual_bottom_idx, 'Lower']) / bottom_data.loc[actual_bottom_idx, 'Lower'] * 100):.2f}%")
# 6. 매수 신호 분석
print(f"\n=== 매수 신호 분석 ===")
# 현재 매수 조건으로 저점 기간에서 매수 신호가 몇 개 발생하는지 확인
alerts = []
debug_info = []
for i in range(len(bottom_data)):
slice_df = data.iloc[:data.index.get_loc(bottom_data.index[i]) + 1]
info = detect_turnaround_signal(symbol, slice_df, interval=interval_minutes)
if info:
debug_info.append({
'date': bottom_data.index[i],
'price': bottom_data['Close'].iloc[i],
'alert': info['alert'],
'details': info['details']
})
if info['alert']:
alerts.append((bottom_data.index[i], bottom_data['Close'].iloc[i]))
print(f"저점 기간 매수 신호 수: {len(alerts)}")
if alerts:
print("매수 신호 발생 시점:")
for date, price in alerts:
print(f" {date}: {price:.4f}")
return bottom_data, alerts
def run_simulation(symbol: str, interval_minutes: int, days: int = 30):
data = fetch_price_history(symbol, interval_minutes)
data = calculate_technical_indicators(data)
data = check_buy_point(data, simulation=True)
print(f"데이터 기간: {data.index[0]} ~ {data.index[-1]}")
print(f"총 데이터 수: {len(data)}")
# 파라미터 후보군 (표준화된 기술적 분석 기준)
param_candidates = [
{'rsi_oversold': 32, 'bb_distance': 0.06, 'near_low_tolerance': 0.01, 'volume_multiplier': 2.0}, # 기본 설정
{'rsi_oversold': 30, 'bb_distance': 0.05, 'near_low_tolerance': 0.008, 'volume_multiplier': 2.5}, # 엄격한 설정
{'rsi_oversold': 28, 'bb_distance': 0.04, 'near_low_tolerance': 0.005, 'volume_multiplier': 3.0}, # 매우 엄격한 설정
]
alerts = []
for params in param_candidates:
alerts.clear()
for i in range(len(data)):
slice_df = data.iloc[: i + 1]
info = detect_turnaround_signal(symbol, slice_df, interval=interval_minutes, params=params)
if info and info['alert']:
alerts.append((slice_df.index[-1], slice_df['Close'].iloc[-1]))
print(f"\n총 매수 신호 수: {len(alerts)}")
# 서브플롯 생성 (가격 + Deviation)
fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True, figsize=(15, 8), height_ratios=[3, 1])
fig.suptitle(f"{symbol} - 시뮬레이션 {interval_minutes}분봉", fontsize=14)
# ----------------- 마우스 휠 확대/축소 -----------------
def on_scroll(event):
# event.button: 'up' -> zoom in, 'down' -> zoom out
if event.inaxes not in [ax1, ax2]:
return
ax = event.inaxes
# x 축만 두 축을 동시에 조정
cur_xlim = ax1.get_xlim()
xdata = event.xdata
if xdata is None:
return
scale_factor = 0.9 if event.button == 'up' else 1/0.9
new_width = (cur_xlim[1] - cur_xlim[0]) * scale_factor
relx = (cur_xlim[1] - xdata) / (cur_xlim[1] - cur_xlim[0])
new_left = xdata - new_width * (1 - relx)
new_right = xdata + new_width * relx
# 데이터 영역 벗어나지 않도록 클램프
xmin, xmax = matplotlib.dates.date2num(data.index[0]), matplotlib.dates.date2num(data.index[-1])
if new_left < xmin:
new_left = xmin
if new_right > xmax:
new_right = xmax
ax1.set_xlim([new_left, new_right])
ax2.set_xlim([new_left, new_right])
# y축은 해당 축만 줌
cur_ylim = ax.get_ylim()
ydata = event.ydata
if ydata is not None:
new_height = (cur_ylim[1] - cur_ylim[0]) * scale_factor
rely = (cur_ylim[1] - ydata) / (cur_ylim[1] - cur_ylim[0])
ax.set_ylim([ydata - new_height * (1 - rely), ydata + new_height * rely])
ax.figure.canvas.draw_idle()
fig.canvas.mpl_connect('scroll_event', on_scroll)
# 메인 차트 (가격, 이동평균선, 볼린저 밴드)
line_close = ax1.plot(data.index, data["Close"], label="종가", color="black", linewidth=1.5)[0]
line_ma5 = ax1.plot(data.index, data["MA5"], label="MA5", color="red", linewidth=1)[0]
line_ma20 = ax1.plot(data.index, data["MA20"], label="MA20", color="blue", linewidth=1)[0]
line_ma40 = ax1.plot(data.index, data["MA40"], label="MA40", color="green", linewidth=1)[0]
line_ma120 = ax1.plot(data.index, data["MA120"], label="MA120", color="purple", linewidth=1)[0]
line_ma200 = ax1.plot(data.index, data["MA200"], label="MA200", color="brown", linewidth=1)[0]
line_ma240 = ax1.plot(data.index, data["MA240"], label="MA240", color="darkred", linewidth=1)[0]
line_ma720 = ax1.plot(data.index, data["MA720"], label="MA720", color="cyan", linewidth=1)[0]
line_ma1440 = ax1.plot(data.index, data["MA1440"], label="MA1440", color="magenta", linewidth=1)[0]
# Bollinger Bands
line_upper = ax1.plot(data.index, data["Upper"], label="볼린저 Upper", color="grey", linestyle="--", linewidth=1)[0]
line_lower = ax1.plot(data.index, data["Lower"], label="볼린저 Lower", color="grey", linestyle="--", linewidth=1)[0]
ax1.fill_between(data.index, data["Lower"], data["Upper"], color="grey", alpha=0.1)
# 매수 신호 표시
scatter_buy = None
if alerts:
times, prices = zip(*alerts)
scatter_buy = ax1.scatter(times, prices, facecolors='none', edgecolors='red', linewidths=2, s=150, zorder=6, label='매수신호')
for time in times:
ax1.axvline(x=time, color='red', linestyle='--', alpha=0.3)
# 매수 포인트 탐지 및 표시
# 'buy_point' 열 추가
data['buy_point'] = 0
data['buy_signal'] = ''
for i in range(1, len(data)):
# 이동평균선 기반 매수 조건
if all(data[f'MA{n}'].iloc[i] < data['MA720'].iloc[i] for n in [5, 20, 40, 120, 200, 240]) and \
all(data[f'MA{n}'].iloc[i] > data[f'MA{n}'].iloc[i-1] for n in [5, 20, 40, 120, 200, 240]) and \
data['MA720'].iloc[i] < data['MA1440'].iloc[i]:
data.at[data.index[i], 'buy_signal'] = 'movingaverage'
data.at[data.index[i], 'buy_point'] = 1
# Deviation40(이격도 40) 기반 매수 조건: 90 이하에서 상승 전환
if data['Deviation40'].iloc[i - 1] < data['Deviation40'].iloc[i] and data['Deviation40'].iloc[i - 1] <= 90:
data.at[data.index[i], 'buy_signal'] = 'deviation40'
data.at[data.index[i], 'buy_point'] = 1
# Deviation240(이격도 240) 기반 매수 조건: 90 이하에서 상승 전환
if data['Deviation240'].iloc[i - 1] < data['Deviation240'].iloc[i] and data['Deviation240'].iloc[i - 1] <= 90:
data.at[data.index[i], 'buy_signal'] = 'deviation240'
data.at[data.index[i], 'buy_point'] = 1
# 매수 포인트를 신호 유형별로 다르게 표시
# 이동평균선 기반 매수 포인트 (빨간 동그라미)
ma_buy_points = data[(data['buy_point'] == 1) & (data['buy_signal'] == 'movingaverage')]
scatter_ma_buy_points = ax1.scatter(ma_buy_points.index, ma_buy_points['Close'], color='red', s=100, zorder=5, label='MA 매수 포인트')
# Deviation40 기반 매수 포인트 (속이 빈 빨간 점선 원)
dev40_buy_points = data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation40')]
scatter_dev40_buy_points = ax1.scatter(dev40_buy_points.index, dev40_buy_points['Close'],
facecolors='none', edgecolors='red', linestyle='--',
linewidth=1, s=150, zorder=5, label='Dev40 매수 포인트')
# Deviation240 기반 매수 포인트 (속이 빈 파란 점선 원)
dev240_buy_points = data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation240')]
scatter_dev240_buy_points = ax1.scatter(dev240_buy_points.index, dev240_buy_points['Close'],
facecolors='none', edgecolors='blue', linestyle='--',
linewidth=1, s=150, zorder=5, label='Dev240 매수 포인트')
# 마우스 오버 기능 추가 (이동평균선 매수 포인트)
if len(ma_buy_points) > 0:
cursor = mplcursors.cursor(scatter_ma_buy_points, hover=True)
cursor.connect("add", lambda sel: sel.annotation.set_text(
f'MA 매수신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}'
))
cursor.connect("remove", lambda sel: sel.annotation.set_visible(False))
# 마우스 오버 기능 추가 (Deviation40 매수 포인트)
if len(dev40_buy_points) > 0:
cursor_dev40 = mplcursors.cursor(scatter_dev40_buy_points, hover=True)
cursor_dev40.connect("add", lambda sel: sel.annotation.set_text(
f'Dev40 매수신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}'
))
cursor_dev40.connect("remove", lambda sel: sel.annotation.set_visible(False))
# 마우스 오버 기능 추가 (Deviation240 매수 포인트)
if len(dev240_buy_points) > 0:
cursor_dev240 = mplcursors.cursor(scatter_dev240_buy_points, hover=True)
cursor_dev240.connect("add", lambda sel: sel.annotation.set_text(
f'Dev240 매수신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}'
))
cursor_dev240.connect("remove", lambda sel: sel.annotation.set_visible(False))
# 매수 신호에도 마우스 오버 기능 추가
if scatter_buy is not None:
cursor2 = mplcursors.cursor(scatter_buy, hover=True)
cursor2.connect("add", lambda sel: sel.annotation.set_text(
f'매수신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}'
))
cursor2.connect("remove", lambda sel: sel.annotation.set_visible(False))
# 모든 봉에 마우스 오버 기능 추가
cursor3 = mplcursors.cursor(line_close, hover=True)
cursor3.connect("add", lambda sel: sel.annotation.set_text(
f'종가\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}'
))
cursor3.connect("remove", lambda sel: sel.annotation.set_visible(False))
ax1.set_ylabel("가격")
# --- 범례 생성 및 인터랙티브 토글 ---
legend = ax1.legend(loc='upper left', fontsize=10)
ax1.grid(True, linestyle='--', alpha=0.5)
# 범례 클릭 시 해당 선 토글 기능
lined = {}
# legend 핸들과 실제 plot 선을 매핑 (생성 순서가 동일하다고 가정)
# Matplotlib 버전에 따라 legend 객체의 핸들 보유 프로퍼티가 다를 수 있음
if hasattr(legend, "legend_handles"):
legend_handles = legend.legend_handles
elif hasattr(legend, "legendHandles"):
legend_handles = legend.legendHandles
else:
# 마지막 방어(일반적으로 선만 리턴)
legend_handles = legend.get_lines()
plot_lines = [line_close, line_ma5, line_ma20, line_ma40, line_ma120,
line_ma200, line_ma240, line_ma720, line_ma1440,
line_upper, line_lower, scatter_ma_buy_points, scatter_dev40_buy_points, scatter_dev240_buy_points]
# 매수신호 scatter가 있으면 포함
if scatter_buy is not None:
plot_lines.append(scatter_buy)
# zip 길이가 짧은 쪽에 맞춰 매핑
for leg_handle, orig in zip(legend_handles, plot_lines):
leg_handle.set_picker(True) # 클릭 이벤트 활성화
lined[leg_handle] = orig
def on_pick(event):
leg_handle = event.artist
orig = lined.get(leg_handle)
if orig is None:
return
vis = not orig.get_visible()
orig.set_visible(vis)
# 범례 아이콘 투명도 조정
leg_handle.set_alpha(1.0 if vis else 0.2)
fig.canvas.draw_idle()
fig.canvas.mpl_connect('pick_event', on_pick)
# Deviation subplot
line_dev5 = ax2.plot(data.index, data['Deviation5'], color='red', label='Dev5')[0]
line_dev20 = ax2.plot(data.index, data['Deviation20'], color='blue', label='Dev20')[0]
line_dev40 = ax2.plot(data.index, data['Deviation40'], color='green', label='Dev40')[0]
line_dev120 = ax2.plot(data.index, data['Deviation120'], color='purple', label='Dev120')[0]
line_dev200 = ax2.plot(data.index, data['Deviation200'], color='brown', label='Dev200')[0]
line_dev240 = ax2.plot(data.index, data['Deviation240'], color='darkred', label='Dev240')[0]
line_dev720 = ax2.plot(data.index, data['Deviation720'], color='cyan', label='Dev720')[0]
line_dev1440 = ax2.plot(data.index, data['Deviation1440'], color='magenta', label='Dev1440')[0]
cursor_dev = mplcursors.cursor([line_dev5, line_dev20, line_dev40, line_dev120, line_dev200, line_dev240, line_dev720, line_dev1440], hover=True)
cursor_dev.connect("add", lambda sel: sel.annotation.set_text(
f"{sel.artist.get_label()}\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime('%Y-%m-%d %H:%M')}\n값: {sel.target[1]:.2f}"
))
line_h90 = ax2.axhline(90, color='red', linestyle='--', linewidth=1, label='90')
line_h95 = ax2.axhline(95, color='green', linestyle='--', linewidth=1, label='93')
ax2.set_ylabel('Deviation %')
legend2 = ax2.legend(loc='upper left', fontsize=9)
# Deviation subplot 범례 클릭 토글 기능
if legend2 is not None:
if hasattr(legend2, "legend_handles"):
legend2_handles = legend2.legend_handles
elif hasattr(legend2, "legendHandles"):
legend2_handles = legend2.legendHandles
else:
legend2_handles = legend2.get_lines()
plot_lines2 = [line_dev5, line_dev20, line_dev40, line_dev120, line_dev200, line_dev240, line_dev720, line_dev1440, line_h90, line_h95]
# 레이블 기준으로 안정적 매핑
for leg_handle in legend2_handles:
label = leg_handle.get_label()
target_line = next((pl for pl in plot_lines2 if pl.get_label() == label), None)
if target_line is not None:
leg_handle.set_picker(True)
lined[leg_handle] = target_line
ax2.grid(True, linestyle='--', alpha=0.3)
plt.tight_layout()
# -------- 확대/축소 및 이동 기능 --------
press = {}
def on_scroll(event):
ax = event.inaxes
if ax is None:
return
x_left, x_right = ax.get_xlim()
x_range = (x_right - x_left)
if event.button == 'up': # zoom in
scale = 0.8
elif event.button == 'down': # zoom out
scale = 1.25
else:
scale = 1.0
new_range = x_range * scale
center = event.xdata if event.xdata is not None else (x_left + x_right) / 2
ax.set_xlim(center - new_range / 2, center + new_range / 2)
# 다른 축들도 동일 적용 (shared x)
for other_ax in fig.axes:
if other_ax is not ax:
other_ax.set_xlim(ax.get_xlim())
fig.canvas.draw_idle()
def on_press(event):
if event.button == 1 and event.inaxes is not None:
press['xpress'] = event.xdata
press['axes'] = event.inaxes
def on_motion(event):
if 'xpress' not in press or press.get('axes') is None or event.inaxes is None:
return
dx = press['xpress'] - event.xdata
for ax in fig.axes:
x_left, x_right = ax.get_xlim()
ax.set_xlim(x_left + dx, x_right + dx)
fig.canvas.draw_idle()
def on_release(event):
press.clear()
fig.canvas.mpl_connect('scroll_event', on_scroll)
fig.canvas.mpl_connect('button_press_event', on_press)
fig.canvas.mpl_connect('motion_notify_event', on_motion)
fig.canvas.mpl_connect('button_release_event', on_release)
plt.show()
if __name__ == "__main__":
interval = 60
days = 90 # 분석 기간을 90일로 늘림 (6월~8월 데이터 포함)
target_coins = ['ADA','APE','ARB','BONK','HBAR','LINK','ONDO','PEPE','SEI','SHIB','STORJ','SUI','TON','TRX','WLD','XLM','XRP']
#target_coins = ['APE']
for symbol in target_coins:
print(f"\n=== {symbol} 저점 기간 분석 시작 ===")
try:
# 저점 기간 분석
bottom_data, alerts = analyze_bottom_period(symbol, interval, days)
# 전체 기간 시뮬레이션
print(f"\n=== {symbol} 전체 기간 시뮬레이션 ===")
run_simulation(symbol, interval, days)
except Exception as e:
print(f"Error analyzing {symbol}: {str(e)}")