"""Create, visualise and export buy/sell label candidates for minute-bar data."""

import csv
import logging
import os
from math import nan

import pandas as pd
import plotly.graph_objects as go
from plotly import subplots

from hts.HTS import HTS
from hts.BuySellChecker import BuySellChecker


class LabelMaker(HTS):
    """Derive buy/sell markers from price series, plot them and write them out.

    Data access (``getDBData``, ``getYMD``) and ``RESOURCE_PATH`` are not
    defined here; they are presumably provided by the ``HTS`` base class.
    Throughout this class a marker list holds the price at marked positions
    and ``-1`` everywhere else.
    """

    buySellChecker = None

    def __init__(self, RESOURCE_PATH):
        """Initialise the base class and create the ``BuySellChecker`` helper."""
        super().__init__(RESOURCE_PATH)
        self.buySellChecker = BuySellChecker()

    # ------------------------------------------------------------------
    # Label generation
    # ------------------------------------------------------------------
    def checkTransaction(self, data, duration=60):
        """Mark buy points at window lows and sell points at the highs between them.

        Args:
            data: Mapping-like object whose ``"close"`` and ``"high"`` entries
                are integer-indexable, sliceable sequences of equal length.
            duration: Length (in bars) of the sliding window whose minimum
                close is considered as a buy candidate.

        Returns:
            ``(bsLine, data)`` where ``bsLine`` maps ``"buy"``,
            ``"buy_weight"``, ``"sell"`` and ``"sell_weight"`` to lists of
            ``len(data["close"])`` entries (``-1`` = no marker; weights are
            ``1`` at marked positions) and ``data`` is the unchanged input.
        """
        closes = data["close"]
        highs = data["high"]
        size = len(closes)

        bsLine = {
            key: [-1] * size
            for key in ("buy", "buy_weight", "sell", "sell_weight")
        }

        # --- buy side ---
        p_min_price = 999999999
        # The extra "- 10" keeps buys away from the end of the series
        # (original note: so that buying only happens until 3:11).
        for i in range(size - duration - 10):
            # First index holding the lowest close inside the window.
            min_price, min_price_c = 9999999, -1
            for c in range(i, i + duration):
                if closes[c] < min_price:
                    min_price = closes[c]
                    min_price_c = c

            # Index 0 is never accepted as a buy position.
            if min_price_c <= 0:
                continue

            # Reject the candidate if a buy already sits 2..9 bars before it.
            # The lookback is clamped at 0 so early candidates cannot wrap
            # around to the end of the list through negative indexing.
            lookback = range(max(min_price_c - 9, 0), min_price_c - 1)
            if any(bsLine["buy"][j] > 0 for j in lookback):
                continue

            if min_price < p_min_price:
                bsLine["buy"][min_price_c] = min_price
                bsLine["buy_weight"][min_price_c] = 1
                # NOTE(review): the source this was reconstructed from had
                # lost its indentation; this assignment is assumed to belong
                # with the two assignments above (lowest accepted buy so
                # far). Confirm it was not meant to run on every window.
                p_min_price = min_price

        # --- sell side ---
        buy_i = [c for c in range(size) if bsLine["buy"][c] > 0]
        last = len(buy_i) - 1
        for pos, start in enumerate(buy_i):
            if pos < last:
                # Highest high strictly between this buy and the next one.
                nxt = buy_i[pos + 1]
                self._mark_sells(bsLine, highs, start, nxt, nxt)
            else:
                # After the final buy the last 5 bars are left out of the
                # maximum (original note: so that selling only happens until
                # 3:16), but the scan still runs to the end of the series.
                self._mark_sells(
                    bsLine, highs, start, len(highs) - 5, len(highs)
                )

        return bsLine, data

    @staticmethod
    def _preceded_by_buy(bsLine, c):
        """Return True if, scanning back from ``c - 1`` to 1, a buy-only bar
        is met before a sell-only bar; False otherwise."""
        buys, sells = bsLine["buy"], bsLine["sell"]
        for d in range(c - 1, 0, -1):
            if sells[d] > 0 and buys[d] < 0:
                return False
            if sells[d] < 0 and buys[d] > 0:
                return True
        return False

    def _mark_sells(self, bsLine, highs, start, window_stop, scan_stop):
        """Mark sells where ``highs`` equals the max of ``highs[start+1:window_stop]``.

        Positions ``start .. scan_stop - 1`` are scanned; a position is only
        marked when ``_preceded_by_buy`` holds for it. Windows with fewer
        than two bars are ignored.
        """
        boundary = highs[start + 1: window_stop]
        if len(boundary) <= 1:
            return
        max_price = max(boundary)
        for c in range(start, scan_stop):
            if highs[c] == max_price and self._preceded_by_buy(bsLine, c):
                bsLine["sell"][c] = max_price
                bsLine["sell_weight"][c] = 1

    # ------------------------------------------------------------------
    # Plotting
    # ------------------------------------------------------------------
    @staticmethod
    def _marker_series(line, color):
        """Return ``(values, colors)`` for plotting a marker list.

        Negative entries become NaN with a white marker; all other entries
        keep their value and get ``color``. The input list is not modified.
        """
        values = [nan if v < 0 else v for v in line]
        colors = ["#ffffff" if v < 0 else color for v in line]
        return values, colors

    @staticmethod
    def _count_marks(line):
        """Count strictly positive entries (NaN and ``-1`` do not count)."""
        return sum(1 for v in line if v > 0)

    def draw_simple(self, stock_code, given_day, data, bsLine):
        """Show a candlestick chart with buy/sell markers.

        The figure is stored on ``self.fig`` and its buy trace on
        ``self.buy_data``. ``bsLine`` is left untouched.

        Args:
            stock_code: Stock code used in the chart title.
            given_day: Day string used in the chart title.
            data: Object providing ``date``, ``open``, ``high``, ``low`` and
                ``close`` series.
            bsLine: Mapping with ``"buy"`` and ``"sell"`` marker lists.
        """
        buy_line, buy_colors = self._marker_series(bsLine["buy"], "#ff00ff")
        sell_line, sell_colors = self._marker_series(bsLine["sell"], "#00ced1")

        # Build the traces.
        buy_check = go.Scatter(
            x=data["date"], y=buy_line, mode="markers", name="buy",
            marker=dict(size=14, color=buy_colors, line_width=0),
        )
        sell_check = go.Scatter(
            x=data["date"], y=sell_line, mode="markers", name="sell",
            marker=dict(size=14, color=sell_colors, line_width=0),
        )
        candle_stick = go.Candlestick(
            x=data["date"], open=data["open"], high=data["high"],
            low=data["low"], close=data["close"],
            increasing_line_color="red", decreasing_line_color="blue",
        )
        candle_data = [candle_stick, buy_check, sell_check]

        buy_count = self._count_marks(bsLine["buy"])
        sell_count = self._count_marks(bsLine["sell"])

        self.fig = go.FigureWidget(data=candle_data)
        self.buy_data = self.fig.data[1]
        self.fig.update_layout(
            height=1000,
            title=f"{stock_code}_{given_day}_{buy_count},{sell_count}",
        )
        self.fig.show()

    def draw_detail(self, stock_code, given_day, data, bsLine):
        """Show a five-row chart: candles, volume, MACD, stochastic and RSI.

        ``data`` must support ``astype`` with a column mapping and provide
        the columns read below (prices, volume, moving averages,
        ``upper``/``lower``, MACD, stochastic and RSI series). ``bsLine`` is
        left untouched.
        """
        # Normalise column dtypes before plotting.
        data = data.astype({
            "open": "int", "high": "int", "low": "int", "close": "int",
            "volume": "int",
            "avg3": "float", "avg5": "float", "avg10": "float",
            "avg20": "float", "avg30": "float", "avg60": "float",
            "fast_k": "float", "slow_k": "float", "slow_d": "float",
            "rsi": "float", "rsis": "float",
        })

        buy_line, buy_colors = self._marker_series(bsLine["buy"], "#ff00ff")
        sell_line, sell_colors = self._marker_series(bsLine["sell"], "#00ced1")
        dates = data["date"]

        # Build the traces.
        buy_check = go.Scatter(
            x=dates, y=buy_line, mode="markers", name="buy",
            marker=dict(size=14, color=buy_colors, line_width=0),
        )
        sell_check = go.Scatter(
            x=dates, y=sell_line, mode="markers", name="sell",
            marker=dict(size=14, color=sell_colors, line_width=0),
        )
        upper = go.Scatter(x=dates, y=data["upper"], name="upper",
                           line_color="#000000")
        lower = go.Scatter(x=dates, y=data["lower"], name="lower",
                           line_color="#000000")
        avg3 = go.Scatter(x=dates, y=data["avg3"], name="avg3",
                          line_color="#1469F4")
        avg5 = go.Scatter(x=dates, y=data["avg5"], name="avg5",
                          line_color="#089B5B")
        avg10 = go.Scatter(x=dates, y=data["avg10"], name="avg10",
                           line_color="#ff00ff")
        avg20 = go.Scatter(x=dates, y=data["avg20"], name="avg20",
                           line_color="#8F8203")
        avg30 = go.Scatter(x=dates, y=data["avg30"], name="avg30",
                           line_color="#000000")
        candle_stick = go.Candlestick(
            x=dates, open=data["open"], high=data["high"],
            low=data["low"], close=data["close"],
            increasing_line_color="red", decreasing_line_color="blue",
        )
        volume_line = go.Scatter(x=dates, y=data["volume"], mode="lines",
                                 name="volume")
        macd_line = go.Scatter(x=dates, y=data["macd"], mode="lines",
                               name="macd")
        macd_s_line = go.Scatter(x=dates, y=data["macds"], mode="lines",
                                 name="macds")
        macd_o_line = go.Scatter(x=dates, y=data["macdo"], mode="lines",
                                 name="macdo")
        slow_k_line = go.Scatter(x=dates, y=data["slow_k"], mode="lines",
                                 name="slow_k")
        slow_d_line = go.Scatter(x=dates, y=data["slow_d"], mode="lines",
                                 name="slow_d")
        rsi_line = go.Scatter(x=dates, y=data["rsi"], mode="lines",
                              name="rsi")
        rsis_line = go.Scatter(x=dates, y=data["rsis"], mode="lines",
                               name="rsis")

        # One list of traces per subplot row, top to bottom.
        rows = [
            [candle_stick, upper, lower, avg3, avg5, avg10, avg20, avg30,
             buy_check, sell_check],
            [volume_line],
            [macd_line, macd_s_line, macd_o_line],
            [slow_k_line, slow_d_line],
            [rsi_line, rsis_line],
        ]

        fig = subplots.make_subplots(
            rows=5, cols=1,
            subplot_titles=('캔들', "거래량", "MACD", "스토캐스틱", "RSI"),
        )
        for row_no, traces in enumerate(rows, start=1):
            for trace in traces:
                fig.append_trace(trace, row_no, 1)

        buy_count = self._count_marks(bsLine["buy"])
        sell_count = self._count_marks(bsLine["sell"])
        fig.update_layout(
            height=3000,
            title=f"{stock_code}_{given_day}_{buy_count},{sell_count}",
        )
        fig.show()

    # ------------------------------------------------------------------
    # Export
    # ------------------------------------------------------------------
    @staticmethod
    def _write_label_csv(path, dates, prices):
        """Write one ``[date, price]`` row per entry of ``prices`` that is not ``-1``."""
        # newline="" is what the csv module requires of files it writes to;
        # without it rows are separated by blank lines on Windows.
        with open(path, "w", encoding="utf-8", newline="") as outFp:
            writer = csv.writer(outFp)
            for i, price in enumerate(prices):
                if price != -1:
                    writer.writerow([dates[i], price])

    def writeLabelFile(self, stock_code, bsLine, data, ymd):
        """Write sell and buy markers to CSV files under ``<RESOURCE_PATH>/tmp``.

        Creates ``<stock_code>.<ymd>.sell.csv`` and
        ``<stock_code>.<ymd>.buy.csv``; the ``tmp`` directory is created if
        it does not exist.

        Args:
            stock_code: Stock code used in the file names.
            bsLine: Mapping with ``"sell"`` and ``"buy"`` marker lists.
            data: Object whose ``"date"`` entry is indexable by position.
            ymd: Day string used in the file names.
        """
        tmp_dir = os.path.join(self.RESOURCE_PATH, "tmp")
        os.makedirs(tmp_dir, exist_ok=True)

        for side in ("sell", "buy"):
            outFileName = os.path.join(
                tmp_dir, stock_code + "." + ymd + "." + side + ".csv"
            )
            self._write_label_csv(outFileName, data["date"], bsLine[side])

    # ------------------------------------------------------------------
    # High-level entry points
    # ------------------------------------------------------------------
    def makeCandidate(self, stock_code, ymd, view=False):
        """Load one day of data, analyse it and compute label candidates.

        Args:
            stock_code: Stock code passed to ``getDBData``.
            ymd: Day string passed to ``getDBData``.
            view: When true, also show the result with ``draw_simple``.

        Returns:
            The ``(bsLine, data)`` pair produced by ``checkTransaction``.
        """
        result = {
            "check": set(), "time": [], "open": [], "close": [],
            "high": [], "low": [], "vol": [], "label": [],
        }
        self.getDBData(stock_code, ymd, result)
        data = self.buySellChecker.analyze(result)
        bsLine, data = self.checkTransaction(data)

        if view:
            self.draw_simple(stock_code, ymd, data, bsLine)

        return bsLine, data

    def showLabels(self, stock_code, ymd):
        """Plot the labels stored for one stock and day.

        Rows whose ``label`` equals 2 are shown as buys and rows whose
        ``label`` equals 1 as sells, both at the row's close price.
        """
        result = {
            "time": [], "open": [], "close": [],
            "high": [], "low": [], "vol": [], "label": [],
        }
        self.getDBData(stock_code, ymd, result)
        result["date"] = result["time"]

        data = pd.DataFrame(result)
        data.index = pd.DatetimeIndex(result["date"])

        labels = result["label"]
        closes = result["close"]
        size = len(closes)
        bsLine = {
            "buy": [closes[i] if labels[i] == 2 else -1 for i in range(size)],
            "sell": [closes[i] if labels[i] == 1 else -1 for i in range(size)],
        }

        self.draw_simple(stock_code, ymd, data, bsLine)

    def getDate(self, stock_code):
        """Return whatever ``getYMD`` yields for ``stock_code``."""
        return self.getYMD(stock_code)


def main():
    """Script entry point: store and show sample labels, or rebuild labels."""
    # Three directory levels above this file.
    PROJECT_HOME = os.path.dirname(
        os.path.dirname(os.path.dirname(__file__))
    )
    RESOURCE_PATH = os.path.join(PROJECT_HOME, "resources")
    db_filename = os.path.join(RESOURCE_PATH, "hts.db")

    labelMaker = LabelMaker(RESOURCE_PATH)

    view = True
    stock_code = "122630"
    ymd = "20220811"

    if view:
        # Hand-picked sample labels for the day above (HHMM strings).
        buy_times = [
            "0905", "0910", "0911", "1030", "1034", "1109", "1110", "1111",
            "1207", "1211", "1228", "1229", "1230", "1231", "1232", "1249",
            "1303", "1304", "1420", "1421", "1436", "1437", "1438",
        ]
        sell_times = [
            "0949", "1000", "1001", "1016", "1017", "1045", "1049", "1126",
            "1127", "1315", "1316", "1317", "1458", "1459", "1500", "1501",
        ]
        # Label value 2 is plotted as a buy and 1 as a sell by showLabels.
        for hhmm in buy_times:
            labelMaker.makeLabel(db_filename, stock_code, ymd, hhmm, 2)
        for hhmm in sell_times:
            labelMaker.makeLabel(db_filename, stock_code, ymd, hhmm, 1)

        labelMaker.showLabels(stock_code, ymd)
    else:
        stock_codes = ["252670", "122630"]
        for stock_code in stock_codes:
            ymds = labelMaker.getDate(stock_code)
            for ymd in ymds:
                logging.info("%s %s", stock_code, ymd)
                bsLine, data = labelMaker.makeCandidate(stock_code, ymd)
                labelMaker.updateLabel(
                    db_filename, stock_code, bsLine, data, ymd
                )


if __name__ == "__main__":
    main()