import os import sqlite3 from datetime import datetime, timedelta import pandas as pd from stock.analysis.Common import Common from stock.analysis.Stochastic import Stochastic from stock.analysis.RSI import RSI from stock.analysis.MACD import MACD from stock.analysis.IchimokuCloud import IchimokuCloud class Stock2Vector: RESOURCE_PATH = None common = None stochastic = None rsi = None macd = None ichimokuCloud = None def __init__(self, RESOURCE_PATH): self.RESOURCE_PATH = RESOURCE_PATH self.common = Common() self.stochastic = Stochastic() self.rsi = RSI() self.macd = MACD() self.ichimokuCloud = IchimokuCloud() return def analyze(self, result): open = result["open"] close = result["close"] high = result["high"] low = result["low"] vol = result["vol"] close_df = pd.DataFrame(close) avg3_list = close_df.rolling(window=3).mean().fillna(close[0]).values.tolist() avg3 = [item[0] for item in avg3_list] avg5_list = close_df.rolling(window=5).mean().fillna(close[0]).values.tolist() avg5 = [item[0] for item in avg5_list] avg10_list = close_df.rolling(window=10).mean().fillna(close[0]).values.tolist() avg10 = [item[0] for item in avg10_list] avg20_list = close_df.rolling(window=20).mean().fillna(close[0]).values.tolist() avg20 = [item[0] for item in avg20_list] avg30_list = close_df.rolling(window=30).mean().fillna(close[0]).values.tolist() avg30 = [item[0] for item in avg30_list] avg60_list = close_df.rolling(window=60).mean().fillna(close[0]).values.tolist() avg60 = [item[0] for item in avg60_list] df = pd.DataFrame(close) max20 = df.rolling(window=20).mean() stddev20 = df.rolling(window=20).std() upper_df = max20 + (stddev20 * 2) # 상단 볼린저 밴드 lower_df = max20 - (stddev20 * 2) # 하단 볼린저 밴드 upper, lower = [], [] for i in range(len(upper_df)): if i < 10: upper.append(upper_df.values[0][0]) lower.append(lower_df.values[0][0]) else: upper.append(upper_df.values[i][0]) lower.append(lower_df.values[i][0]) point_temp = result["time"] STOCK = [] for i in range(len(open)): STOCK.append({'volume': vol[i], 'close': close[i], 'open': open[i], 'high': high[i], 'low': low[i], 'avg3': avg3[i], 'avg5': avg5[i],'avg10': avg10[i],'avg20': avg20[i],'avg30': avg30[i],'avg60': avg60[i]}) # stochastic 계산 stochastic_df = self.stochastic.apply(STOCK, n=30, m=5, t=5) stochastic_df = stochastic_df.fillna(100) fast_k = stochastic_df['fast_k'].values.tolist() slow_k = stochastic_df['slow_k'].values.tolist() slow_d = stochastic_df['slow_d'].values.tolist() # macd 계산 macd_df = self.macd.apply(STOCK, short=12, long=26, t=9) macd_df = macd_df.fillna(100) macd = macd_df['macd'].values.tolist() macds = macd_df['macds'].values.tolist() macdo = macd_df['macdo'].values.tolist() # rsi 계산 rsi_df = self.rsi.apply(STOCK, period=30, window=5) rsi_df = rsi_df.fillna(100) rsi = rsi_df['rsi'].values.tolist() rsis = rsi_df['rsis'].values.tolist() # ichimokuCloud 계산 # ichimokuCloud_df = self.ichimokuCloud.apply(STOCK, c=9, b=26, l=52) # ichimokuCloud_df = rsi_df.fillna(100) # changeLine = rsi_df['changeLine'].values.tolist() # baseLine = rsi_df['baseLine'].values.tolist() # leadingSpan1 = rsi_df['leadingSpan1'].values.tolist() # leadingSpan2 = rsi_df['leadingSpan2'].values.tolist() temp = {"date": point_temp, "open": open, "high": high, "low": low, "close": close, "volume": vol, "upper": upper, "lower": lower, "avg3": avg3, "avg5": avg5, "avg10": avg10, "avg20": avg20, "avg30": avg30, "avg60": avg60, "macd": macd, "macds": macds, "macdo": macdo, "fast_k": fast_k, "slow_k": slow_k, "slow_d": slow_d, "rsi": rsi, "rsis": rsis} data = pd.DataFrame(temp) df_final_time = pd.DatetimeIndex(point_temp) data.index = df_final_time data.fillna(0) return data def getDBData(self, stock_code, lastday, result): tableName = 'hts' conn = sqlite3.connect(os.path.join(self.RESOURCE_PATH, "hts.db")) cursor = conn.cursor() cursor.execute('SELECT ymd, hms, open, high, low, close, volume FROM ' + tableName + ' WHERE CODE=? and ymd=? order by ymd, hms', (stock_code, lastday,)) db_result = cursor.fetchall() for rows in db_result: ymd = rows[0] # hts.날짜 hms = rows[1] # hts.시간 open = rows[2] # hts.시가 high = rows[3] # hts.고가 low = rows[4] # hts.저가 close = rows[5] # hts.종가 vol = rows[6] # hts.거래량 temp = datetime.strptime(str(ymd) + " " + str(hms).zfill(4) + "00", '%Y%m%d %H%M%S') result["time"].append(temp) result["open"].append(int(open)) result["close"].append(int(close)) result["high"].append(int(high)) result["low"].append(int(low)) result["vol"].append(int(vol)) return def vectorize(self, stock_code, given_day): result = {"check": set(), "time": [], "open": [], "close": [], "high": [], "low": [], "vol": []} for i in range(1, 10): last_day = (datetime.strptime(given_day, '%Y%m%d') - timedelta(i)).strftime('%Y%m%d') self.getDBData(stock_code, last_day, result) if len(result['time']) > 0: break self.getDBData(stock_code, given_day, result) # 분석을 통해서 볼린저밴드 상/하단을 계산한다. data = self.analyze(result) return data if __name__ == "__main__": PROJECT_HOME = os.path.join(os.path.dirname(os.path.join(os.path.dirname(os.path.join(os.path.dirname(__file__)))))) RESOURCE_PATH = os.path.join(PROJECT_HOME, "resources") stock2Vector = Stock2Vector(RESOURCE_PATH) # to check bying stock_codes = { # 252670 # 122630 "122630": ['20220725'], } for stock_code in stock_codes: for given_day in stock_codes[stock_code]: stock2Vector.vectorize(stock_code, given_day) print ("done...")