import logging
import os
from math import nan

import pandas as pd
import plotly.graph_objects as go
from plotly import subplots

from hts.HTS import HTS
from stock.util.Stock2Vector import Stock2Vector
from stock.util.LabelChecker import LabelChecker
from stock.util.StockPredictor import StockPredictor
from hts.BuySellChecker import BuySellChecker
from stock.analysis.StockStatus import StockStatus

logger = logging.getLogger(__name__)

# Number of leading bsLine entries that draw() skips by default.
# NOTE(review): presumably the count of one-minute rows belonging to the
# previous trading day -- confirm against BuySellChecker.checkTransaction.
PREV_DAY_ROWS = 381


class Simulation(HTS):
    """Replay stored intraday data through BuySellChecker and plot the result."""

    # Helpers created in __init__; they stay None when construction fails.
    stock2Vector = None
    labelChecker = None
    buySellChecker = None
    stockPredictor = None

    def __init__(self, RESOURCE_PATH):
        """Create the helper objects used by the simulation.

        Args:
            RESOURCE_PATH: Directory handed to HTS and to every helper.

        Construction of the resource-backed helpers is best-effort: a failure
        is logged and the remaining helpers stay ``None`` instead of aborting.
        """
        super().__init__(RESOURCE_PATH)
        self.RESOURCE_PATH = RESOURCE_PATH
        self.buySellChecker = BuySellChecker()
        try:
            self.stock2Vector = Stock2Vector(RESOURCE_PATH)
            self.labelChecker = LabelChecker(RESOURCE_PATH)
            self.stockPredictor = StockPredictor(RESOURCE_PATH)
        except Exception:
            # Keep the original best-effort behaviour, but leave a trace so a
            # later failure on a None helper can be diagnosed.
            logger.warning(
                "Could not initialise helpers from %s",
                RESOURCE_PATH,
                exc_info=True,
            )

    def draw(self, stock_code, given_day, data, bsLine,
             prev_rows=PREV_DAY_ROWS):
        """Show a six-row plotly figure for one stock and one day.

        Args:
            stock_code: Stock identifier, used in the figure title.
            given_day: Day string; characters from index 6 onward are parsed
                as the day of month used to filter ``data``.
            data: DataFrame whose index is convertible to a DatetimeIndex and
                which holds the price and indicator columns read below.
            bsLine: Mapping with ``'buy'``, ``'buy_weight'`` and ``'sell'``
                sequences; a negative value means "no signal". ``None``
                makes this method a no-op.
            prev_rows: Number of leading ``bsLine`` entries to skip so the
                remaining entries line up with the filtered ``data``.
        """
        if bsLine is None:
            return

        # Drop yesterday's rows: keep only rows whose day matches given_day.
        data = data.loc[
            pd.DatetimeIndex(data.index).day == int(given_day[6:])
        ]

        # Copy into plain lists so positional access is well defined and the
        # caller's bsLine is never modified.
        buy_values = list(bsLine['buy'][prev_rows:])
        buy_weights = list(bsLine['buy_weight'][prev_rows:])
        sell_values = list(bsLine['sell'][prev_rows:])

        # Normalise column types before plotting.
        data = data.astype({
            'open': 'int',
            'high': 'int',
            'low': 'int',
            'close': 'int',
            'volume': 'int',
            'avg3': 'float',
            'avg6': 'float',
            'avg9': 'float',
            'avg12': 'float',
            'avg20': 'float',
            'fast_k': 'float',
            'slow_k': 'float',
            'slow_d': 'float',
            'rsi': 'float',
            'rsis': 'float',
        })

        # Buy markers: hidden (NaN, size 0) when there is no signal,
        # otherwise sized by the signal weight.
        buy_y = []
        buy_size = []
        buy_colors = []
        for i, value in enumerate(buy_values):
            if value < 0:
                buy_y.append(nan)
                buy_size.append(0)
                buy_colors.append("#ffffff")
            else:
                buy_y.append(value)
                buy_size.append(10 + (5 * buy_weights[i]))
                buy_colors.append("#B2028C")

        # Sell markers: hidden (NaN) when there is no signal.
        sell_y = [nan if value < 0 else value for value in sell_values]
        sell_colors = [
            "#ffffff" if value < 0 else "#00ced1" for value in sell_values
        ]

        x = data['date']

        def plain(column, color):
            # Line trace named after its column, drawn in a fixed colour.
            return go.Scatter(
                x=x, y=data[column], name=column, line_color=color
            )

        def solid(column):
            # Primary indicator line: solid red.
            return go.Scatter(
                x=x, y=data[column],
                line=dict(color='red', width=2), name=column,
            )

        def dashed(column):
            # Signal line: dash-dotted black.
            return go.Scatter(
                x=x, y=data[column],
                line=dict(dash='dashdot', color='black', width=2),
                name=column,
            )

        # Build the traces.
        buy_check = go.Scatter(
            x=x, y=buy_y, mode='markers', name="buy",
            marker=dict(size=buy_size, color=buy_colors, line_width=0),
        )
        sell_check = go.Scatter(
            x=x, y=sell_y, mode='markers', name="sell",
            marker=dict(size=14, color=sell_colors, line_width=0),
        )
        candle_stick = go.Candlestick(
            x=x,
            open=data['open'], high=data['high'],
            low=data['low'], close=data['close'],
            increasing_line_color='red',
            decreasing_line_color='blue',
            showlegend=False,
        )

        candle_data = [
            candle_stick,
            plain("upper", '#000000'),
            plain("lower", '#000000'),
            plain("avg3", '#8F8203'),
            plain("avg6", '#089B5B'),
            plain("avg9", '#ff00ff'),
            plain("avg12", '#1469F4'),
            plain("avg20", '#000000'),
            buy_check,
            sell_check,
            plain("laggingSpan", '#B50ABB'),
            plain("changeLine", '#14A200'),
            plain("baseLine", '#CF6E0D'),
        ]
        volume_data = [
            go.Bar(x=x, y=data["volume"], marker_color='red', name='volume'),
        ]
        disparity_data = [
            plain("disparity_avg5", '#8F8203'),
            plain("disparity_avg10", '#089B5B'),
            plain("disparity_avg20", '#ff00ff'),
            plain("disparity_avg60", '#1469F4'),
            plain("disparity_avg120", '#000000'),
        ]
        macd_data = [
            solid("macd"),
            dashed("macds"),
            go.Bar(x=x, y=data["macdo"], marker_color='purple', name='macdo'),
        ]
        stochastic_data = [solid("slow_k"), dashed("slow_d")]
        rsi_data = [solid("rsi"), dashed("rsis")]

        # Draw the figure: one subplot row per indicator group, sharing x.
        fig = subplots.make_subplots(
            rows=6,
            cols=1,
            subplot_titles=("MACD", "스토캐스틱", "RSI", "거래량", "이격도", '캔들'),
            shared_xaxes=True,
            horizontal_spacing=0.03,
            vertical_spacing=0.01,
            row_heights=[200, 200, 200, 200, 200, 700],
        )
        panels = [
            macd_data,
            stochastic_data,
            rsi_data,
            volume_data,
            disparity_data,
            candle_data,
        ]
        for row, traces in enumerate(panels, start=1):
            for trace in traces:
                fig.add_trace(trace, row=row, col=1)

        # Signal counts for the title are taken over the full bsLine,
        # including the leading entries that were skipped for plotting.
        df = pd.DataFrame(bsLine).fillna(-1)
        buy_count = len(df.loc[df["buy"] > 0])
        sell_count = len(df.loc[df["sell"] > 0])
        fig.update_layout(
            height=1700,
            title=f"{stock_code}_{given_day}_{buy_count},{sell_count}",
        )
        fig.show()

    def makeTickData(self, data, mins=30):
        """Aggregate consecutive samples into sliding ``mins``-wide bars.

        One bar is produced for every window ``[i - mins, i)`` with
        ``mins <= i <= len(data['time'])``. Each bar is stamped with the time
        of the last sample in its window.

        Args:
            data: Mapping (or DataFrame) with ``'time'``, ``'open'``,
                ``'close'``, ``'high'`` and ``'low'`` sequences, plus a volume
                sequence under ``'vol'`` or ``'volume'``.
                NOTE(review): the volume key is not visible from this file;
                both spellings are accepted -- confirm against the producer.
            mins: Window width in samples; must be at least 1.

        Returns:
            dict with keys ``check`` (set of bar times), ``time``, ``open``,
            ``close``, ``high``, ``low``, ``vol`` and ``label``. ``label`` is
            left empty.

        Raises:
            ValueError: If ``mins`` is smaller than 1.
        """
        if mins < 1:
            raise ValueError("mins must be at least 1")

        times = list(data['time'])
        opens = list(data['open'])
        closes = list(data['close'])
        highs = list(data['high'])
        lows = list(data['low'])
        volumes = list(data['vol'] if 'vol' in data else data['volume'])

        result = {
            "check": set(),
            "time": [],
            "open": [],
            "close": [],
            "high": [],
            "low": [],
            "vol": [],
            "label": [],
        }
        for end in range(mins, len(times) + 1):
            start = end - mins
            bar_time = times[end - 1]
            result["check"].add(bar_time)
            result["time"].append(bar_time)
            result["open"].append(opens[start])
            result["close"].append(closes[end - 1])
            result["high"].append(max(highs[start:end]))
            result["low"].append(min(lows[start:end]))
            result["vol"].append(sum(volumes[start:end]))
        return result

    def simulate(self, stock_codes: dict = None):
        """Run the buy/sell check for every requested stock and day.

        Args:
            stock_codes: Mapping of stock code to an iterable of day strings.
                When ``None``, ``StockStatus.checkEnvelope()`` is run instead.

        Raises:
            RuntimeError: If ``stock_codes`` is given but the Stock2Vector
                helper could not be created in ``__init__``.
        """
        if stock_codes is None:
            stockStatus = StockStatus(self.RESOURCE_PATH)
            stockStatus.checkEnvelope()
            return

        if self.stock2Vector is None:
            raise RuntimeError(
                "Stock2Vector is not available; check the resource path"
            )

        for stock_code, given_days in stock_codes.items():
            for given_day in given_days:
                last_data = self.stock2Vector.getLastData(
                    stock_code, given_day
                )
                result = self.stock2Vector.getRealTime(
                    stock_code, given_day, last_data
                )
                # Run the indicator analysis on the real-time data.
                data = self.buySellChecker.analyze(result)
                # Mark the points at which to buy and to sell.
                bsLine, data = self.buySellChecker.checkTransaction(
                    data, stock_code, isRealTime=False
                )
                # Plot the result.
                self.draw(stock_code, given_day, data, bsLine)


def main():
    """Simulate a fixed set of stock codes and days from ./resources."""
    PROJECT_HOME = os.getcwd()
    RESOURCE_PATH = os.path.join(PROJECT_HOME, "resources")
    simulation = Simulation(RESOURCE_PATH)

    # Days on which to check buying.
    stock_codes = {
        "252670": [
            '20220901', '20220902', '20220905', '20220906', '20220907',
            '20220908', '20220913', '20220914', '20220915', '20220916',
        ],
        "122630": [
            '20220901', '20220902', '20220905', '20220906', '20220907',
            '20220908', '20220913', '20220914', '20220915', '20220916',
        ],
    }
    simulation.simulate(stock_codes)
    print("done...")


if __name__ == "__main__":
    main()