import os
import shutil
import sqlite3
from contextlib import closing
from datetime import datetime
from math import nan

import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.io as po
from plotly import subplots

from hts.HTS import HTS
from hts.DailyStatus import DailyStatus
from hts.BuySellChecker import BuySellChecker


class Simulation(HTS):
    """Run the envelope buy/sell check over recent daily data and chart it.

    ``simulate`` either charts a single stock interactively or, when no
    stock code is given, walks every stock listed in ``resources/stock.db``
    and writes one HTML chart per stock that shows a recent buy signal.
    """

    buySellChecker = None
    stockPredictor = None
    dailyStatus = None

    # Table queried for the list of (code, name) pairs in batch mode.
    _STOCK_TABLE = 'stock'

    # Marker colour used for points that carry no signal (value < 0).
    _NO_SIGNAL_COLOR = "#ffffff"

    # Columns printed per row by the single-stock debug log, in print order.
    _LOG_COLUMNS = (
        'macd', 'slow_k',
        'gradients_low', 'gradients_avg5', 'gradients_avg20',
        'gradients_avg60',
        'disparity_avg5', 'disparity_avg20', 'disparity_avg60',
        'disparity', 'disparity_type',
    )

    # Indicator fields that getData() fills with -1 for the synthetic
    # "today" row.  Order matters: it is the insertion order of the row dict.
    _PLACEHOLDER_KEYS = (
        "avg3", "avg4", "avg5", "avg6", "avg10", "avg12", "avg20", "avg36",
        "avg40", "avg48", "avg60", "avg120", "avg200", "avg240", "avg300",
        "disparity_avg5", "disparity_avg10", "disparity_avg20",
        "disparity_avg60", "disparity_avg120",
        "bolingerband_upper", "bolingerband_lower", "bolingerband_middle",
        "envelope_upper", "envelope_lower", "envelope_middle",
        "ichimokucloud_changeLine", "ichimokucloud_baseLine",
        "ichimokucloud_leadingSpan1", "ichimokucloud_leadingSpan2",
        "stochastic_fast_k", "stochastic_slow_k", "stochastic_slow_d",
        "rsi", "rsis", "macd", "macds", "macdo",
    )

    def __init__(self, RESOURCE_PATH):
        """Create the checker and daily-status helpers.

        Args:
            RESOURCE_PATH: Directory handed to ``HTS`` and ``DailyStatus``;
                batch output is written below ``<RESOURCE_PATH>/analysis``.
        """
        super().__init__(RESOURCE_PATH)
        self.RESOURCE_PATH = RESOURCE_PATH
        self.buySellChecker = BuySellChecker()
        self.dailyStatus = DailyStatus(RESOURCE_PATH)

    @classmethod
    def _marker_points(cls, values, signal_color):
        """Return ``(y_values, colors)`` for a marker trace.

        Negative entries mean "no signal": they become NaN (so no marker is
        positioned) and get the no-signal colour.  New lists are built; the
        input sequence is left untouched.
        """
        y_values = []
        colors = []
        for value in values:
            if value < 0:
                y_values.append(nan)
                colors.append(cls._NO_SIGNAL_COLOR)
            else:
                y_values.append(value)
                colors.append(signal_color)
        return y_values, colors

    def _build_figure(self, stock_code, given_day, data, bsLine):
        """Build the five-row indicator figure shared by draw() and writeFile().

        Rows, top to bottom: MACD, RSI, stochastic, disparity, candlestick
        (with moving averages, envelope and buy/sell markers).

        Args:
            stock_code: Stock code, used in the figure title.
            given_day: Date string, used in the figure title.
            data: Table indexed by column name; the columns read here are
                'ymd', the OHLC columns, 'avg5/20/60', 'envelope_*', 'macd',
                'macds', 'slow_k', 'slow_d', 'rsi', 'rsis' and
                'disparity_avg5/20/60'.
            bsLine: Mapping with 'buy', 'buy_weight' and 'sell' sequences.
                It is read only; earlier versions overwrote negative entries
                with NaN in place.

        Returns:
            The assembled plotly figure.
        """
        x = data['ymd']
        buy_values = bsLine['buy']
        sell_values = bsLine['sell']

        # Buy markers scale with their weight; points without a signal get
        # size 0 so they stay invisible.
        buy_y, buy_colors = self._marker_points(buy_values, "#B2028C")
        buy_size = [
            0 if value < 0 else 10 + (0.1 * weight)
            for value, weight in zip(buy_values, bsLine['buy_weight'])
        ]
        sell_y, sell_colors = self._marker_points(sell_values, "#00ced1")

        def colored_line(column, name, color):
            return go.Scatter(x=x, y=data[column], name=name, line_color=color)

        def oscillator(column, signal=False):
            # Main line is solid red; its signal line is dash-dotted black.
            if signal:
                style = dict(dash='dashdot', color='black', width=2)
            else:
                style = dict(color='red', width=2)
            return go.Scatter(x=x, y=data[column], line=style, name=column)

        # Set up the traces.
        candle_stick = go.Candlestick(
            x=x,
            open=data['open'], high=data['high'],
            low=data['low'], close=data['close'],
            increasing_line_color='red', decreasing_line_color='blue',
            showlegend=False,
        )
        buy_check = go.Scatter(
            x=x, y=buy_y, mode='markers', name="buy",
            marker=dict(size=buy_size, color=buy_colors, line_width=0),
        )
        sell_check = go.Scatter(
            x=x, y=sell_y, mode='markers', name="sell",
            marker=dict(size=14, color=sell_colors, line_width=0),
        )

        # One entry per subplot row, in top-to-bottom order.
        rows = [
            [oscillator("macd"), oscillator("macds", signal=True)],
            [oscillator("rsi"), oscillator("rsis", signal=True)],
            [oscillator("slow_k"), oscillator("slow_d", signal=True)],
            [
                colored_line("disparity_avg5", "disparity_avg5", '#8F8203'),
                colored_line("disparity_avg20", "disparity_avg20", '#ff00ff'),
                colored_line("disparity_avg60", "disparity_avg60", '#1469F4'),
            ],
            [
                candle_stick,
                colored_line("avg5", "avg5", '#6C2507'),
                colored_line("avg20", "avg20", '#f84c43'),
                colored_line("avg60", "avg60", '#f89543'),
                colored_line("envelope_upper", "upper", '#000000'),
                # Was labelled "upper" (copy-paste slip), which put two
                # identical entries in the legend.
                colored_line("envelope_middle", "middle", '#927786'),
                colored_line("envelope_lower", "lower", '#000000'),
                buy_check,
                sell_check,
            ],
        ]

        # Draw the graph.
        fig = subplots.make_subplots(
            rows=5, cols=1,
            subplot_titles=("MACD", "RSI", "스토캐스틱", '이격도', '캔들'),
            shared_xaxes=True,
            horizontal_spacing=0.03,
            vertical_spacing=0.01,
            row_heights=[200, 200, 200, 200, 750],
        )
        for row_number, traces in enumerate(rows, start=1):
            for trace in traces:
                fig.add_trace(trace, row=row_number, col=1)

        # NaN compares False against 0, so missing values are not counted.
        buy_count = sum(1 for value in buy_values if value > 0)
        sell_count = sum(1 for value in sell_values if value > 0)
        fig.update_layout(
            height=1700,
            title=f"{stock_code}_{given_day}_{buy_count},{sell_count}",
        )
        return fig

    def draw(self, stock_code, given_day, data, bsLine):
        """Show the indicator chart for one stock interactively.

        Does nothing when ``bsLine`` is None.  See ``_build_figure`` for the
        columns and keys that ``data`` and ``bsLine`` must provide.
        """
        if bsLine is None:
            return
        self._build_figure(stock_code, given_day, data, bsLine).show()

    def writeFile(self, dailyDirName, stock_code, stock_name, given_day, data, bsLine):
        """Write the indicator chart for one stock to an HTML file.

        The file is ``<dailyDirName>/<stock_code>_<stock_name>.html`` with
        spaces removed from the stock name; it is not opened afterwards.
        Does nothing when ``bsLine`` is None.
        """
        if bsLine is None:
            return
        fig = self._build_figure(stock_code, given_day, data, bsLine)
        fileName = os.path.join(
            dailyDirName,
            "%s_%s.html" % (stock_code, stock_name.replace(" ", "")),
        )
        po.write_html(fig, file=fileName, auto_open=False)

    def getData(self, today, stock):
        """Append a synthetic row for ``today`` to ``stock['PRICE']``.

        The new row copies open/high/low/close/volume from the current last
        row, so its 'diff' is that row's close minus itself.  Every indicator
        field listed in ``_PLACEHOLDER_KEYS`` is set to -1.

        Args:
            today: Value stored under 'ymd' in the new row.
            stock: Mapping whose 'PRICE' entry is a list of row dicts; it is
                modified in place.

        Raises:
            IndexError: If ``stock['PRICE']`` is empty.
        """
        prices = stock['PRICE']
        last = prices[-1]
        close = last["close"]
        row = {
            "ymd": today,
            "close": close,
            "diff": last["close"] - close,
            "open": last["open"],
            "high": last["high"],
            "low": last["low"],
            "volume": last["volume"],
        }
        row.update(dict.fromkeys(self._PLACEHOLDER_KEYS, -1))
        prices.append(row)

    def _load_analysis(self, stock_code, today, n, analyzed_day=60):
        """Fetch the last ``n`` rows for a stock and return the analysed table.

        A synthetic row for ``today`` is appended before analysis, and the
        first ``analyzed_day`` rows of the analysed result are dropped
        afterwards so only the later rows remain.
        """
        stock = self.dailyStatus.getLastData(stock_code, n)
        self.getData(today, stock)
        data = self.dailyStatus.analyze(stock, analyzed_day)
        # Use only the analysis-window rows (earlier rows are removed).
        data.drop(data.index[:analyzed_day], inplace=True)
        return data

    def _load_stock_list(self):
        """Return the distinct ``(code, name)`` rows from stock.db, by code.

        The database is looked up in ``resources/`` next to this module (not
        under ``RESOURCE_PATH``).  Connection and cursor are closed even when
        the query fails.
        """
        db_path = os.path.join(
            os.path.dirname(__file__), 'resources', 'stock.db')
        query = ('SELECT distinct code, name FROM '
                 + self._STOCK_TABLE + ' order by code')
        with closing(sqlite3.connect(db_path)) as conn:
            with closing(conn.cursor()) as cursor:
                cursor.execute(query)
                return cursor.fetchall()

    def simulate(self, stock_code=None, isRealTime=False):
        """Run the envelope buy/sell check and chart the outcome.

        Args:
            stock_code: When given, analyse only this stock, print a per-row
                indicator log and show the chart interactively.  When None,
                analyse every stock from stock.db and write HTML charts to
                ``<RESOURCE_PATH>/analysis/<YYYYMMDD>/daily`` (that directory
                is wiped first) for stocks whose last two rows contain a
                buy signal.
            isRealTime: Accepted for interface compatibility.
                NOTE(review): it currently has no effect - the history window
                is 200 rows either way and the checker is always called with
                ``isRealTime=False``; confirm whether it should be forwarded.
        """
        n = 200
        today = datetime.today().strftime('%Y%m%d')

        if stock_code is not None:
            data = self._load_analysis(stock_code, today, n)

            # Print logs.  .iloc is used because the leading rows were
            # dropped, so row positions no longer match integer labels.
            for i in range(len(data.index)):
                print(i, data.index[i],
                      *(data[column].iloc[i] for column in self._LOG_COLUMNS))

            bsLine, data = self.buySellChecker.checkTransactionWithEnvelope(
                data, stock_code, isRealTime=False)
            # Draw the graph.
            self.draw(stock_code, today, data, bsLine)
            return

        items = self._load_stock_list()

        # Start from an empty output directory for today's run.
        dailyDirName = os.path.join(
            self.RESOURCE_PATH, 'analysis', today, 'daily')
        if os.path.exists(dailyDirName):
            shutil.rmtree(dailyDirName)
        os.makedirs(dailyDirName, exist_ok=True)

        for idx, (code, stock_name) in enumerate(items):
            print(idx, code, stock_name)
            print("Analysis # :", idx, ", CODE: ", code, ", NAME: ", stock_name)

            data = self._load_analysis(code, today, n)
            bsLine, data = self.buySellChecker.checkTransactionWithEnvelope(
                data, code, isRealTime=False)

            # Only chart stocks with enough rows and a buy signal in the
            # last two entries.
            if bsLine is None or len(data.index) <= 10:
                continue
            if any(value > 0 for value in bsLine['buy'][-2:]):
                self.writeFile(
                    dailyDirName, code, stock_name, today, data, bsLine)


if __name__ == "__main__":
    PROJECT_HOME = os.getcwd()
    RESOURCE_PATH = os.path.join(PROJECT_HOME, "resources")
    simulation = Simulation(RESOURCE_PATH)

    # To inspect individual stocks interactively instead of running the
    # batch, call simulation.simulate(code) per code, e.g. "102950".
    simulation.simulate()
    print("done...")