import logging
import os
from math import nan

import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly import subplots

from hts.HTS import HTS
from stock.util.Stock2Vector import Stock2Vector
from stock.util.LabelChecker import LabelChecker
from stock.util.StockPredictor import StockPredictor
from hts.BuySellChecker import BuySellChecker

logger = logging.getLogger(__name__)


class Simulation(HTS):
    """Replay one trading day for a stock and chart the buy/sell signals.

    Depending on the chosen method, ``simulate`` either shows stored labels,
    runs the predictor, or runs the rule-based ``BuySellChecker`` and plots
    its result with plotly.
    """

    # Helper objects; they stay ``None`` when their construction fails in
    # ``__init__`` (construction is best-effort).
    stock2Vector = None
    labelChecker = None
    buySellChecker = None
    stockPredictor = None

    # Number of leading ``bsLine`` entries dropped before plotting.  The
    # original code hard-coded 381 next to a "remove yesterday's data"
    # comment, so this is presumably the bar count of the previous day.
    # TODO confirm against BuySellChecker.checkTransaction.
    PREVIOUS_DAY_BARS = 381

    def __init__(self, RESOURCE_PATH):
        """Create the simulation helpers.

        Args:
            RESOURCE_PATH: Path handed to the base class and to the
                Stock2Vector / LabelChecker / StockPredictor constructors.
        """
        super().__init__(RESOURCE_PATH)
        self.RESOURCE_PATH = RESOURCE_PATH
        self.buySellChecker = BuySellChecker()
        try:
            self.stock2Vector = Stock2Vector(RESOURCE_PATH)
            self.labelChecker = LabelChecker(RESOURCE_PATH)
            self.stockPredictor = StockPredictor(RESOURCE_PATH)
        except Exception:
            # Still best-effort (the instance is created either way), but the
            # failure is logged instead of being swallowed silently, and
            # KeyboardInterrupt / SystemExit are no longer caught.
            logger.exception(
                "Could not initialise simulation helpers from %s", RESOURCE_PATH
            )
        # self.connect()  # disabled in the original code

    def draw(self, stock_code, given_day, data, bsLine):
        """Plot candles, indicators and buy/sell markers for one day.

        Args:
            stock_code: Stock code; used only in the figure title.
            given_day: Day as a ``YYYYMMDD`` string; characters 6+ are read
                as the day of month used to filter ``data``.
            data: DataFrame whose index is convertible to a DatetimeIndex and
                which has the ``date``, OHLC, ``volume``, ``upper``/``lower``,
                ``avg*``, ``macd*``, ``slow_k``/``slow_d`` and ``rsi``/``rsis``
                columns read below.
            bsLine: Mapping with ``buy``, ``buy_weight`` and ``sell``
                sequences (negative value = no signal), or ``None`` to skip
                drawing.  It is not modified.
        """
        if bsLine is None:
            return

        # Drop yesterday's rows: keep only rows on the requested day of month.
        data = data.loc[pd.DatetimeIndex(data.index).day == int(given_day[6:])]
        skip = self.PREVIOUS_DAY_BARS
        # Float copies, so writing NaN below neither mutates the caller's
        # arrays nor fails on integer input.
        buy_y = np.array(bsLine['buy'][skip:], dtype=float)
        sell_y = np.array(bsLine['sell'][skip:], dtype=float)
        buy_weights = bsLine['buy_weight'][skip:]

        # Cast the columns used for plotting to numeric types.
        data = data.astype({
            'open': 'int', 'high': 'int', 'low': 'int', 'close': 'int',
            'volume': 'int',
            'avg3': 'float', 'avg6': 'float', 'avg9': 'float',
            'avg12': 'float', 'avg27': 'float', 'avg54': 'float',
            'fast_k': 'float', 'slow_k': 'float', 'slow_d': 'float',
            'rsi': 'float', 'rsis': 'float',
        })

        # Negative entries mean "no signal": blank them out (NaN, white
        # marker, zero size).  Everything else gets a coloured marker; buy
        # markers grow with their weight.
        buy_missing = buy_y < 0
        sell_missing = sell_y < 0
        buy_y[buy_missing] = nan
        sell_y[sell_missing] = nan
        buy_colors = ["#ffffff" if miss else "#B2028C"
                      for miss in buy_missing.tolist()]
        sell_colors = ["#ffffff" if miss else "#00ced1"
                       for miss in sell_missing.tolist()]
        buy_size = [0 if miss else 10 + (5 * buy_weights[i])
                    for i, miss in enumerate(buy_missing.tolist())]

        dates = data['date']

        def series(column, **style):
            """Build a Scatter trace of ``data[column]`` named after the column."""
            return go.Scatter(x=dates, y=data[column], name=column, **style)

        # Build the traces.
        buy_check = go.Scatter(
            x=dates, y=buy_y, mode='markers', name="buy",
            marker=dict(size=buy_size, color=buy_colors, line_width=0))
        sell_check = go.Scatter(
            x=dates, y=sell_y, mode='markers', name="sell",
            marker=dict(size=14, color=sell_colors, line_width=0))
        candle_stick = go.Candlestick(
            x=dates, open=data['open'], high=data['high'],
            low=data['low'], close=data['close'],
            increasing_line_color='red', decreasing_line_color='blue')

        overlay_colors = {
            "upper": '#000000', "lower": '#000000',
            "avg3": '#8F8203', "avg6": '#089B5B', "avg9": '#ff00ff',
            "avg12": '#1469F4', "avg27": '#000000', "avg54": '#008000',
        }
        overlays = [series(column, line_color=color)
                    for column, color in overlay_colors.items()]

        # One list of traces per subplot row, top to bottom.
        panels = [
            [candle_stick, *overlays, buy_check, sell_check],
            [series("volume", mode='lines')],
            [series(column, mode='lines')
             for column in ("macd", "macds", "macdo")],
            [series(column, mode='lines') for column in ("slow_k", "slow_d")],
            [series(column, mode='lines') for column in ("rsi", "rsis")],
        ]

        # Draw the figure.
        fig = subplots.make_subplots(
            rows=5, cols=1,
            subplot_titles=('캔들', "거래량", "MACD", "스토캐스틱", "RSI"))
        for row, traces in enumerate(panels, start=1):
            for trace in traces:
                # add_trace(row=, col=) replaces the deprecated append_trace.
                fig.add_trace(trace, row=row, col=1)

        # NOTE(review): as in the original, the title counts signals over the
        # whole bsLine, including the entries skipped for plotting; confirm
        # that this is intended.
        buy_count = int(np.count_nonzero(
            np.asarray(bsLine['buy'], dtype=float) > 0))
        sell_count = int(np.count_nonzero(
            np.asarray(bsLine['sell'], dtype=float) > 0))
        fig.update_layout(
            height=5000,
            title=f"{stock_code}_{given_day}_{buy_count},{sell_count}")
        fig.show()

    def simulate(self, stock_code, today, method="rule"):
        """Run one simulation for ``stock_code`` on ``today``.

        Args:
            stock_code: Stock code passed through to the helpers.
            today: Day string passed through to the helpers and to ``draw``.
            method: ``"answer"`` shows the stored labels; ``"ml"`` runs the
                predictor (no buy/sell line is produced, so ``draw`` returns
                without plotting); any other value runs the rule-based
                checker and plots its result.
        """
        if method == "answer":
            self.labelChecker.showLabels(stock_code, today)
            return

        if method == "ml":
            last_data = self.stock2Vector.getLastData(stock_code, today, n=3)
            data = self.stock2Vector.getRealTime(stock_code, today, last_data)
            X, Y = self.stock2Vector.getVectorData(data)
            self.stockPredictor.predict(X, Y)
            bs_line = None
        else:
            last_data = self.stock2Vector.getLastData(stock_code, today)
            result = self.stock2Vector.getRealTime(stock_code, today, last_data)
            # Compute moving averages, RSI, MACD, Ichimoku and the
            # upper/lower Bollinger bands.
            data = self.buySellChecker.analyze(result)
            # Check when to buy and when to sell.
            bs_line, data = self.buySellChecker.checkTransaction(
                data, stock_code, isRealTime=False)

        if data is not None:
            # Draw the chart.
            self.draw(stock_code, today, data, bs_line)


if __name__ == "__main__":
    PROJECT_HOME = os.getcwd()
    RESOURCE_PATH = os.path.join(PROJECT_HOME, "resources")

    # Full date range for checking buying; overridden by the short list below.
    stock_codes = {
        "252670": ['20220801', '20220802', '20220803', '20220804', '20220805',
                   '20220808', '20220809', '20220810', '20220811', '20220812',
                   '20220816', '20220817'],
        "122630": ['20220801', '20220802', '20220803', '20220804', '20220805',
                   '20220808', '20220809', '20220810', '20220811', '20220812',
                   '20220816', '20220817'],
    }
    stock_codes = {
        "252670": ['20220817'],
        "122630": ['20220817'],
    }
    method = "rule"  # "rule", "ml", "answer"

    for stock_code in stock_codes:
        # A fresh Simulation per stock code, as in the original.
        simulation = Simulation(RESOURCE_PATH)
        for given_day in stock_codes[stock_code]:
            simulation.simulate(stock_code, given_day, method=method)
    print("done...")