"""Replay one trading day for a stock and plot the buy/sell signals with Plotly."""

import os
from math import nan

import pandas as pd
import plotly.graph_objects as go
from plotly import subplots

from hts.HTS import HTS
from stock.util.Stock2Vector import Stock2Vector
from stock.util.StockPredictor import StockPredictor
from hts.BuySellChecker import BuySellChecker


class Simulation(HTS):
    """Run the buy/sell decision logic on one day of data and chart the result.

    The signals come either from the rule-based ``BuySellChecker`` or from the
    ``StockPredictor`` model, depending on the ``method`` passed to
    :meth:`simulate`.
    """

    # Class-level placeholders; both are replaced with real objects in __init__.
    stock2Vector = None
    buySellChecker = None

    # Number of leading signal entries skipped before plotting.
    # NOTE(review): presumably the row count of the previous session that
    # precedes the requested day in the signal lines -- confirm against
    # Stock2Vector.getLastData / getRealTime.
    PREVIOUS_DAY_ROWS = 381

    # Marker colour used for entries that carry no signal (their y is NaN).
    _NO_SIGNAL_COLOR = "#ffffff"
    _BUY_COLOR = "#ff00ff"
    _SELL_COLOR = "#00ced1"

    def __init__(self, RESOURCE_PATH):
        """Create the helper objects used by the simulation.

        Args:
            RESOURCE_PATH: Resource directory, forwarded to ``HTS`` and
                ``Stock2Vector``.
        """
        super().__init__(RESOURCE_PATH)
        self.stock2Vector = Stock2Vector(RESOURCE_PATH)
        self.stockPredictor = StockPredictor()
        self.buySellChecker = BuySellChecker()
        self.RESOURCE_PATH = RESOURCE_PATH
        # self.connect() is not called here (it was disabled in the original).

    @staticmethod
    def _signal_markers(values, signal_color, no_signal_color="#ffffff"):
        """Convert a raw signal line into plottable y-values and marker colours.

        Negative entries are treated as "no signal": their y-value becomes NaN
        and they get ``no_signal_color``. Every other entry keeps its value and
        gets ``signal_color``. New lists are returned; ``values`` itself is
        never modified.
        """
        y_values = []
        colors = []
        for value in values:
            if value < 0:
                y_values.append(nan)
                colors.append(no_signal_color)
            else:
                y_values.append(value)
                colors.append(signal_color)
        return y_values, colors

    def draw(self, stock_code, given_day, data, bsLine):
        """Show a five-row figure: candles, volume, MACD, stochastic and RSI.

        Args:
            stock_code: Stock code string; used in the figure title.
            given_day: Day to plot as a ``YYYYMMDD`` string.
            data: DataFrame whose index is convertible to a ``DatetimeIndex``
                and which holds the ``date``, OHLC, ``volume``, ``avg*``,
                ``upper``/``lower``, ``macd*``, stochastic and RSI columns
                read below.
            bsLine: Mapping with ``'buy'`` and ``'sell'`` sequences; negative
                entries mean "no signal".
        """
        # Drop yesterday's data. The full date is compared (not only the
        # day-of-month) so rows from another month can never slip through.
        row_days = pd.DatetimeIndex(data.index).strftime("%Y%m%d")
        data = data.loc[row_days == given_day]

        # Build the marker values without writing NaN back into bsLine.
        buy_line, buy_colors = self._signal_markers(
            bsLine['buy'][self.PREVIOUS_DAY_ROWS:],
            self._BUY_COLOR,
            self._NO_SIGNAL_COLOR,
        )
        sell_line, sell_colors = self._signal_markers(
            bsLine['sell'][self.PREVIOUS_DAY_ROWS:],
            self._SELL_COLOR,
            self._NO_SIGNAL_COLOR,
        )

        # Create the variables used to configure the graph.
        int_columns = ('open', 'high', 'low', 'close', 'volume')
        float_columns = ('avg3', 'avg5', 'avg10', 'avg20', 'avg30', 'avg60',
                         'fast_k', 'slow_k', 'slow_d', 'rsi', 'rsis')
        data = data.astype({
            **dict.fromkeys(int_columns, 'int'),
            **dict.fromkeys(float_columns, 'float'),
        })
        dates = data['date']

        def overlay(column, color):
            """Coloured line drawn on top of the candle chart."""
            return go.Scatter(x=dates, y=data[column], name=column,
                              line_color=color)

        def indicator(column):
            """Plain line trace for one of the indicator sub-plots."""
            return go.Scatter(x=dates, y=data[column], mode='lines',
                              name=column)

        # Configure the graph.
        buy_check = go.Scatter(
            x=dates, y=buy_line, mode='markers', name="buy",
            marker=dict(size=14, color=buy_colors, line_width=0))
        sell_check = go.Scatter(
            x=dates, y=sell_line, mode='markers', name="sell",
            marker=dict(size=14, color=sell_colors, line_width=0))
        candle_stick = go.Candlestick(
            x=dates, open=data['open'], high=data['high'], low=data['low'],
            close=data['close'],
            increasing_line_color='red', decreasing_line_color='blue')

        # One list of traces per subplot row, in top-to-bottom order.
        rows = [
            [
                candle_stick,
                overlay("upper", '#000000'),
                overlay("lower", '#000000'),
                overlay("avg3", '#1469F4'),
                overlay("avg5", '#089B5B'),
                overlay("avg10", '#ff00ff'),
                overlay("avg20", '#8F8203'),
                overlay("avg30", '#000000'),
                buy_check,
                sell_check,
            ],
            [indicator("volume")],
            [indicator("macd"), indicator("macds"), indicator("macdo")],
            [indicator("slow_k"), indicator("slow_d")],
            [indicator("rsi"), indicator("rsis")],
        ]

        # Draw the graph.
        fig = subplots.make_subplots(
            rows=5, cols=1,
            subplot_titles=('캔들', "거래량", "MACD", "스토캐스틱", "RSI"))
        for row_number, traces in enumerate(rows, start=1):
            for trace in traces:
                fig.add_trace(trace, row=row_number, col=1)

        # The counts in the title cover the whole of bsLine, not only the
        # plotted slice; missing values are mapped to -1 so they never count.
        signals = pd.DataFrame(bsLine).fillna(-1)
        buy_count = len(signals.loc[signals["buy"] > 0])
        sell_count = len(signals.loc[signals["sell"] > 0])
        fig.update_layout(
            height=5000,
            title=f"{stock_code}_{given_day}_{buy_count},{sell_count}")
        fig.show()

    def simulate(self, stock_code, today, method="rule"):
        """Compute buy/sell signals for ``today`` and plot them.

        Args:
            stock_code: Stock code string.
            today: Day to simulate as a ``YYYYMMDD`` string.
            method: ``"ml"`` uses ``StockPredictor``; any other value uses the
                rule-based ``BuySellChecker``.
        """
        if method == "ml":
            last_data = self.stock2Vector.getLastData(stock_code, today, n=10)
            result = self.stock2Vector.getRealTime(stock_code, today, last_data)
            df, minmax_df = self.stock2Vector.preprocessData(result)
            bsLine, data = self.stockPredictor.predict(
                df, minmax_df, isRealTime=False)
        else:
            last_data = self.stock2Vector.getLastData(stock_code, today)
            result = self.stock2Vector.getRealTime(stock_code, today, last_data)
            # Compute moving averages, RSI, MACD, Ichimoku and the upper/lower
            # Bollinger bands.
            data = self.buySellChecker.analyze(result)
            # Check the points at which to buy and to sell.
            bsLine, data = self.buySellChecker.checkTransaction(
                data, stock_code, isRealTime=False)

        if data is not None:
            # Draw the graph.
            self.draw(stock_code, today, data, bsLine)


if __name__ == "__main__":
    PROJECT_HOME = os.path.dirname(__file__)
    RESOURCE_PATH = os.path.join(PROJECT_HOME, "resources")

    # Stock codes to check, each mapped to the days to simulate.
    # Other code used previously: 252670.
    stock_codes = {
        "122630": ['20220803'],
    }

    for stock_code, given_days in stock_codes.items():
        simulation = Simulation(RESOURCE_PATH)
        for given_day in given_days:
            simulation.simulate(stock_code, given_day, method='ml')

    print("done...")