"""Replay stored intraday data for a list of stocks and plot the result.

For every configured stock/day the script loads bars through ``Stock2Vector``,
runs them through ``BuySellChecker`` and renders the indicators together with
the buy/sell markers as a six-row Plotly figure.
"""
import logging
import os
import sqlite3
from math import nan

import pandas as pd
import plotly.graph_objects as go
from plotly import subplots

from hts.HTS import HTS
from stock.util.Stock2Vector import Stock2Vector
from stock.util.LabelChecker import LabelChecker
from hts.BuySellChecker import BuySellChecker

logger = logging.getLogger(__name__)

# Column dtypes enforced on the frame before plotting.
_COLUMN_DTYPES = {
    'open': 'int', 'high': 'int', 'low': 'int', 'close': 'int', 'volume': 'int',
    'avg5': 'float', 'avg20': 'float', 'avg60': 'float', 'avg120': 'float', 'avg200': 'float',
    'disparity_avg5': 'float', 'disparity_avg20': 'float', 'disparity_avg60': 'float',
    'disparity_avg120': 'float', 'disparity_avg200': 'float',
    'fast_k': 'float', 'slow_k': 'float', 'slow_d': 'float',
    'macd': 'float', 'macds': 'float', 'macdo': 'float',
    'rsi': 'float', 'rsis': 'float',
}

# Line colour of every overlay that can be drawn on top of the candles.
# Only the columns named in _CANDLE_OVERLAYS are actually plotted; add a key
# from this table to that tuple to enable another overlay.
_OVERLAY_COLORS = {
    'avg5': '#F81191', 'avg20': '#097F19', 'avg30': '#097F19', 'avg60': '#671BEA',
    'avg120': '#DFB809', 'avg200': '#000000',
    'upper': '#000000', 'lower': '#000000',
    'changeLine': '#14A200', 'baseLine': 'orange', 'laggingSpan': '#B50ABB',
    'leadingSpan1': 'black', 'leadingSpan2': 'black',
}
_CANDLE_OVERLAYS = ('avg5', 'avg200')

# Disparity columns drawn in the second row, with their line colours.
_DISPARITY_COLORS = {
    'disparity_avg5': '#F81191', 'disparity_avg20': '#097F19', 'disparity_avg30': '#097F19',
    'disparity_avg60': '#671BEA', 'disparity_avg120': '#DFB809', 'disparity_avg200': '#000000',
}

# Hover text of one candle; "<br>" is the line break Plotly renders in hover labels.
_HOVER_TEMPLATE = (
    "{}<br><br>"
    "open: {}<br>close: {}<br>high: {}<br>low: {}<br>volume: {}<br><br>"
    "avg5: {:.2f}<br>avg20: {:.2f}<br>avg60: {:.2f}<br>avg200: {:.2f}<br><br>"
    "d_avg5: {:.6f}<br>d_avg20: {:.6f}<br>d_avg60: {:.6f}<br>d_avg200: {:.6f}<br>"
    "d_avg5_60: {:.6f}<br>d_avg5_200: {:.6f}<br>d_avg200-5: {:.6f}<br><br>"
    "macd: {:.2f}<br>slow_k: {:.2f}<br>rsi: {:.2f}"
)

_HOVER_COLUMNS = (
    'date', 'open', 'close', 'high', 'low', 'volume',
    'avg5', 'avg20', 'avg60', 'avg200',
    'disparity_avg5', 'disparity_avg20', 'disparity_avg60', 'disparity_avg200',
    'macd', 'slow_k', 'rsi',
)


def _tail(values, count):
    """Return the last ``count`` items of ``values`` as a new list."""
    items = list(values)
    # Clamp the start so a short input never turns into a negative slice bound.
    return items[max(len(items) - count, 0):]


def _hover_texts(data):
    """Build one hover label per row of ``data`` from ``_HOVER_TEMPLATE``."""
    texts = []
    for (date, open_, close, high, low, volume,
         avg5, avg20, avg60, avg200,
         d5, d20, d60, d200,
         macd, slow_k, rsi) in zip(*(data[name] for name in _HOVER_COLUMNS)):
        short_term = (d5, d20, d60)
        all_terms = (d5, d20, d60, d200)
        texts.append(_HOVER_TEMPLATE.format(
            date, open_, close, high, low, volume,
            avg5, avg20, avg60, avg200,
            d5, d20, d60, d200,
            max(short_term) - min(short_term),
            max(all_terms) - min(all_terms),
            abs(d200 - d5),
            macd, slow_k, rsi,
        ))
    return texts


def _volume_colors(data):
    """Colour each volume bar: blue when close < open, red when close > open, else black."""
    colors = []
    for open_, close in zip(data['open'], data['close']):
        if open_ > close:
            colors.append("#0000FF")
        elif open_ < close:
            colors.append("#FF0000")
        else:
            colors.append("#000000")
    return colors


class Simulation (HTS):
    """Runs the buy/sell analysis on stored data and plots it."""

    stock2Vector = None
    labelChecker = None
    buySellChecker = None

    def __init__(self, RESOURCE_PATH):
        """Create the analysis helpers.

        Args:
            RESOURCE_PATH: directory handed to ``HTS``, ``Stock2Vector`` and
                ``LabelChecker``.
        """
        super().__init__(RESOURCE_PATH)
        self.RESOURCE_PATH = RESOURCE_PATH
        self.buySellChecker = BuySellChecker()
        try:
            self.stock2Vector = Stock2Vector(RESOURCE_PATH)
            self.labelChecker = LabelChecker(RESOURCE_PATH)
        except Exception:
            # Still best effort (the instance is usable without these helpers),
            # but the failure is now reported instead of silently discarded.
            logger.exception("could not initialise Stock2Vector/LabelChecker from %s", RESOURCE_PATH)

    def draw(self, stock_code, given_day, data, bsLine):
        """Show the indicator/candle figure for one stock and one day.

        Args:
            stock_code: stock identifier, used in the figure title.
            given_day: day to plot as a ``YYYYMMDD`` string.
            data: frame whose index is convertible to a ``DatetimeIndex`` and
                which carries a ``date`` column plus the price and indicator
                columns plotted below.
            bsLine: mapping with ``buy``, ``buy_weight`` and ``sell`` sequences;
                the last ``len(data)`` entries are plotted and entries below
                zero are not drawn. Nothing is drawn when it is ``None``.
        """
        if bsLine is None:
            return

        # Keep only the rows of the requested day. The complete date is compared
        # so the same day number in another month can never slip through.
        on_given_day = pd.DatetimeIndex(data.index).strftime("%Y%m%d") == given_day
        data = data.loc[on_given_day].astype(_COLUMN_DTYPES)
        row_count = len(data)
        x = data['date']

        # Marker series are built as fresh lists, so bsLine itself is left untouched.
        buy_y, buy_sizes, buy_colors = [], [], []
        for level, weight in zip(_tail(bsLine['buy'], row_count),
                                 _tail(bsLine['buy_weight'], row_count)):
            if level < 0:
                buy_y.append(nan)
                buy_sizes.append(0)
                buy_colors.append("#ffffff")
            else:
                buy_y.append(level)
                buy_sizes.append(10 + (5 * weight))
                buy_colors.append("#0C752E")

        sell_y, sell_colors = [], []
        for level in _tail(bsLine['sell'], row_count):
            if level < 0:
                sell_y.append(nan)
                sell_colors.append("#ffffff")
            else:
                sell_y.append(level)
                sell_colors.append("#00ced1")

        # Configure the traces.
        solid = dict(color='red', width=2)
        dashed = dict(dash='dashdot', color='black', width=2)

        buy_check = go.Scatter(x=x, y=buy_y, mode='markers', name="buy",
                               marker=dict(size=buy_sizes, color=buy_colors, line_width=0))
        sell_check = go.Scatter(x=x, y=sell_y, mode='markers', name="sell",
                                marker=dict(size=14, color=sell_colors, line_width=0))
        candle_stick = go.Candlestick(x=x,
                                      open=data['open'], high=data['high'],
                                      low=data['low'], close=data['close'],
                                      increasing_line_color='red', decreasing_line_color='blue',
                                      showlegend=False, text=_hover_texts(data), hoverinfo="text")
        overlays = [go.Scatter(x=x, y=data[name], name=name, line_color=_OVERLAY_COLORS[name])
                    for name in _CANDLE_OVERLAYS]

        # One list of traces per subplot row, top to bottom.
        rows = [
            [go.Bar(x=x, y=data["volume"], marker_color=_volume_colors(data), name='volume')],
            [go.Scatter(x=x, y=data[name], name=name, line_color=color)
             for name, color in _DISPARITY_COLORS.items()],
            [go.Scatter(x=x, y=data["slow_k"], line=solid, name='slow_k'),
             go.Scatter(x=x, y=data["slow_d"], line=dashed, name='slow_d')],
            [go.Scatter(x=x, y=data["rsi"], line=solid, name='rsi'),
             go.Scatter(x=x, y=data["rsis"], line=dashed, name='rsis')],
            [go.Scatter(x=x, y=data["macd"], line=solid, name='macd'),
             go.Scatter(x=x, y=data["macds"], line=dashed, name='macds'),
             go.Bar(x=x, y=data["macdo"], marker_color='purple', name='macdo')],
            overlays + [buy_check, sell_check, candle_stick],
        ]

        # Draw the figure.
        fig = subplots.make_subplots(
            rows=6, cols=1,
            subplot_titles=("거래량", "이격도", "스토캐스틱", "RSI", "MACD", '캔들'),
            shared_xaxes=True,
            horizontal_spacing=0.03,
            vertical_spacing=0.01,
            row_heights=[200, 200, 200, 200, 200, 700]
        )
        for row_number, traces in enumerate(rows, start=1):
            for trace in traces:
                fig.add_trace(trace, row=row_number, col=1)

        # The counts in the title cover every entry of bsLine, not only the plotted tail.
        signals = pd.DataFrame(bsLine).fillna(-1)
        buy_count = int((signals["buy"] > 0).sum())
        sell_count = int((signals["sell"] > 0).sum())
        fig.update_layout(height=1700, title=f"{stock_code}_{given_day}_{buy_count},{sell_count}")
        fig.show()

    def makeTickData(self, data, mins=30):
        """Aggregate consecutive bars into sliding windows of ``mins`` bars.

        One output entry is produced for every window end position, so the
        windows overlap.

        Args:
            data: mapping with ``time``, ``open``, ``close``, ``high``, ``low``
                and ``vol`` sequences of equal length.
            mins: number of input bars per window.

        Returns:
            A dict with ``time``/``open``/``close``/``high``/``low``/``vol``
            lists, a ``check`` set holding the window end times, and a
            ``label`` list that is left empty.

        Raises:
            ValueError: if ``mins`` is smaller than 1.
        """
        if mins < 1:
            raise ValueError("mins must be a positive integer")

        result = {"check": set(), "time": [], "open": [], "close": [],
                  "high": [], "low": [], "vol": [], "label": []}
        for end in range(mins, len(data['time']) + 1):
            start = end - mins
            closing_time = data['time'][end - 1]
            result["check"].add(closing_time)
            result["time"].append(closing_time)
            result["open"].append(data['open'][start])
            result["close"].append(data['close'][end - 1])
            result["high"].append(max(data['high'][start:end]))
            result["low"].append(min(data['low'][start:end]))
            result["vol"].append(sum(data['vol'][start:end]))
        return result

    def simulate(self, stock, analyzed_day=1000):
        """Analyse and plot every day listed for one stock.

        Args:
            stock: mapping with the stock ``code`` and a ``ymd`` list of
                ``YYYYMMDD`` day strings.
            analyzed_day: maximum number of most recent rows that are kept
                for the buy/sell check and the plot.

        Raises:
            RuntimeError: if ``Stock2Vector`` could not be created in ``__init__``.
        """
        if self.stock2Vector is None:
            raise RuntimeError("Stock2Vector is not available; see the initialisation log")

        stock_code = stock['code']
        for ymd in stock['ymd']:
            last_data = self.stock2Vector.getLastData(stock_code, ymd)
            # Bars as delivered by getRealTime; makeTickData() can aggregate
            # them into wider windows (e.g. mins=5 or mins=30) if required.
            result = self.stock2Vector.getRealTime(stock_code, ymd, last_data)

            data = self.buySellChecker.analyze(result)
            # Keep only the newest analyzed_day rows. Nothing is dropped when
            # fewer rows exist (a negative slice bound would remove valid rows).
            surplus = len(data) - analyzed_day
            if surplus > 0:
                data.drop(data.index[:surplus], inplace=True)

            # Determine when to buy and when to sell.
            bsLine = self.buySellChecker.checkTransaction(data, isRealTime=False)

            # Draw the chart.
            self.draw(stock_code, ymd, data, bsLine)


def main():
    """Simulate the configured stocks for the configured days."""
    PROJECT_HOME = os.getcwd()
    RESOURCE_PATH = os.path.join(PROJECT_HOME, "resources")

    day_list = ['20231113']

    stocks = [
        {'code': '252670', 'name': 'KODEX 200선물인버스2X', 'ymd': day_list},
        {'code': '122630', 'name': 'KODEX 레버리지', 'ymd': day_list},
        {'code': '233740', 'name': 'KODEX 코스닥150레버리지', 'ymd': day_list},
        {'code': '251340', 'name': 'KODEX 코스닥150선물인버스', 'ymd': day_list}
    ]

    for stock in stocks:
        simulation = Simulation(RESOURCE_PATH)
        simulation.simulate(stock)

    print ("done...")


if __name__ == "__main__":
    main()