From e6a2528ce3d7b3f62d918b1ce27b939f0ca2b0eb Mon Sep 17 00:00:00 2001 From: dsyoon Date: Sat, 22 Jul 2023 20:43:36 +0900 Subject: [PATCH 1/2] init --- ...rectionAnalyzer.py => SPCorrelationAnalyzer.py} | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) rename analyzer/{StockPriceDirectionAnalyzer.py => SPCorrelationAnalyzer.py} (92%) diff --git a/analyzer/StockPriceDirectionAnalyzer.py b/analyzer/SPCorrelationAnalyzer.py similarity index 92% rename from analyzer/StockPriceDirectionAnalyzer.py rename to analyzer/SPCorrelationAnalyzer.py index 492e661..ba3924e 100644 --- a/analyzer/StockPriceDirectionAnalyzer.py +++ b/analyzer/SPCorrelationAnalyzer.py @@ -2,7 +2,7 @@ import os import sqlite3 import pandas as pd -class StockPriceDirectionAnalyzer: +class SPCorrelationAnalyzer: stock_info = None @@ -222,8 +222,16 @@ if __name__ == "__main__": RESOURCE_PATH = os.path.join(PROJECT_HOME, 'resources') stockFileName = os.path.join(RESOURCE_PATH, 'stock.db') - stockPriceDirectionAnalyzer = StockPriceDirectionAnalyzer(stockFileName) - corr_scores = stockPriceDirectionAnalyzer.analyze(master_code='122630') + spCorrelationAnalyzer = SPCorrelationAnalyzer(stockFileName) + + # {"stock_code": "122630", "stock_name": "KODEX 레버리지"} + # {"stock_code": "252670", "stock_name": "KODEX 200선물인버스2X"} + # {"stock_code": "^NDX", "stock_name": "NASDAQ 100"} + # {"stock_code": "TQQQ", "stock_name": "ProShares UltraPro QQQ"} + # {"stock_code": "SQQQ", "stock_name": "ProShares UltraPro Short QQQ"} + # {"stock_code": "SOXL", "stock_name": "Direxion Daily Semiconductor Bull 3X Shares"} + # {"stock_code": "SOXS", "stock_name": "Direxion Daily Semiconductor Bear -3X Shares"} + corr_scores = spCorrelationAnalyzer.analyze(master_code='SOXS') corr_scores_list = sorted(corr_scores.items(), key=lambda item: item[1], reverse=True) for item in corr_scores_list: print("%s %4.3f" % (item[0], item[1])) From 11531e4507160ef51dcadd75b5225368e4d453a5 Mon Sep 17 00:00:00 2001 From: dsyoon Date: Sat, 22 Jul 2023 21:12:48 +0900 Subject: [PATCH 2/2] init --- analyzer/122630.py | 289 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 289 insertions(+) create mode 100644 analyzer/122630.py diff --git a/analyzer/122630.py b/analyzer/122630.py new file mode 100644 index 0000000..493e7ee --- /dev/null +++ b/analyzer/122630.py @@ -0,0 +1,289 @@ +import pandas as pd +import numpy as np +import matplotlib.pyplot as plt +import torch +import torch.nn as nn +from torch.utils.data import DataLoader, Dataset + +import tqdm +from torch.autograd import Variable +import argparse +import math +import torch.nn.functional as F + +checkpointdir = './model/122630' +plotdir = './plot/122630' + +torch.random.manual_seed(0) +np.random.seed(0) + +parser = argparse.ArgumentParser("Transformer-LSTM") +parser.add_argument("-data_path", type=str, default="./stocks/KOSPI.csv", help="dataset path") + +args = parser.parse_args() +time_step = 10 + + +class PositionalEncoding(nn.Module): + # Transformer의 Positional Encoding 정의 + def __init__(self, d_model, max_len=500): + super(PositionalEncoding, self).__init__() + pe = torch.zeros(max_len, d_model) # 입력 값의 최대 길이만큼 0인 텐서 값 생성 + position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) + div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) + pe[:, 0::2] = torch.sin(position * div_term) # sin 주파수 기능 사용 + pe[:, 1::2] = torch.cos(position * div_term) # cos 주파수 기능 사용 + pe = pe.unsqueeze(0).transpose(0, 1) + self.register_buffer('pe', pe) + + def forward(self, x): + return x + self.pe[:x.size(0), :] # 각 입력값마다 positional encoding 진행 + + +class TransAm(nn.Module): + # Transformer Encoder 구조 정의 + def __init__(self, feature_size=64, num_layers=6, dropout=0.1): + super(TransAm, self).__init__() + self.model_type = 'Transformer' + + self.src_mask = None + self.pos_encoder = PositionalEncoding(feature_size) + + # torch.nn 모듈에 있는 encoder 및 decoder 레이어 설정 + self.encoder_layer = nn.TransformerEncoderLayer(d_model=feature_size, nhead=8, dropout=dropout) + self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers) + self.decoder_layer = nn.TransformerDecoderLayer(d_model=feature_size, nhead=8, dropout=dropout) + self.transformer_decoder = nn.TransformerDecoder(self.decoder_layer, num_layers=num_layers) + + self.decoder = nn.Linear(feature_size, 1) + self.init_weights() + + # decoder 가중치 초기화 + def init_weights(self): + initrange = 0.1 + self.decoder.bias.data.zero_() + self.decoder.weight.data.uniform_(-initrange, initrange) + + # decoder에서 다음 값 예측 시 sequence의 다음 값을 모르게 하기 위해 마스킹 함수 정의 + def _generate_square_subsequent_mask(self, sz): + mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1) + mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0)) + return mask + + # 앞서 정의한 함수를 사용해 Transformer Encoder의 순전파 진행 + def forward(self, src): + if self.src_mask is None or self.src_mask.size(0) != len(src): + device = src.device + mask = self._generate_square_subsequent_mask(len(src)).to(device) + self.src_mask = mask # mask를 씌워 Mult-head Attn 수행 + src = self.pos_encoder(src) + output = self.transformer_encoder(src) + return output + + +class AttnDecoder(nn.Module): + # Transformer Decoder 구조 정의 + def __init__(self, code_hidden_size, hidden_size, time_step): + super(AttnDecoder, self).__init__() + self.code_hidden_size = code_hidden_size + self.hidden_size = hidden_size + self.T = time_step + + # Model, Layer, Activation Fuction 정의 + self.attn1 = nn.Linear(in_features=2 * hidden_size, out_features=code_hidden_size) + self.attn2 = nn.Linear(in_features=code_hidden_size, out_features=code_hidden_size) + self.tanh = nn.Tanh() + self.attn3 = nn.Linear(in_features=code_hidden_size, out_features=1) + self.lstm = nn.LSTM(input_size=1, hidden_size=self.hidden_size, num_layers=1) + self.tilde = nn.Linear(in_features=self.code_hidden_size + 1, out_features=1) + self.fc1 = nn.Linear(in_features=code_hidden_size + hidden_size, out_features=hidden_size) + self.fc2 = nn.Linear(in_features=hidden_size, out_features=1) + + # 인풋 사이즈의 0값을 갖는 초기 텐서 생성 + def init_variable(self, *args): + zero_tensor = torch.zeros(args) + return Variable(zero_tensor) + + # hidden layer embedding 순서값을 바꾸어 줌 + def embedding_hidden(self, x): + return x.permute(1, 0, 2) + + # 앞서 정의한 함수를 사용해 Transformer Decoder의 순전파 진행 + def forward(self, h, y_seq): + h_ = h.transpose(0, 1) + batch_size = h.size(0) + d = self.init_variable(1, batch_size, self.hidden_size) + s = self.init_variable(1, batch_size, self.hidden_size) + h_0 = self.init_variable(1, batch_size, self.hidden_size) + h_ = torch.cat((h_0, h_), dim=0) + + for t in range(self.T): + x = torch.cat((d, h_[t, :, :].unsqueeze(0)), 2) + h1 = self.attn1(x) + _, states = self.lstm(y_seq[:, t].unsqueeze(0).unsqueeze(2), (h1, s)) + d = states[0] + s = states[1] + y_res = self.fc2(self.fc1(torch.cat((d.squeeze(0), h_[-1, :, :]), dim=1))) + return y_res + + +class StockDataset(Dataset): + def __init__(self, file_path, T=time_step, train_flag=True): + # KOSPI 데이터 불러오기 + with open(file_path, "r", encoding="utf-8") as fp: + data_pd = pd.read_csv(fp) + self.train_flag = train_flag # 학습용 데이터를 True로 설정 + self.data_train_ratio = 0.9 # 90%를 학습용 데이터로 사용 + self.T = T + + # 학습용 데이터인 경우 + if train_flag: + self.data_len = int(self.data_train_ratio * len(data_pd)) + data_all = np.array(data_pd['close']) # KOSPI 종가 데이터 셋 활용 + data_all = (data_all - np.mean(data_all)) / np.std(data_all) # 데이터 셋 표준화 + self.data = data_all[: self.data_len] + + # 평가용 데이터인 경우 + else: + self.data_len = int((1 - self.data_train_ratio) * len(data_pd)) + data_all = np.array(data_pd['close']) + data_all = (data_all - np.mean(data_all)) / np.std(data_all) + self.data = data_all[-self.data_len:] + print("data len:{}".format(self.data_len)) # 학습 시 학습/평가 데이터 개수 출력 + + def __len__(self): + return self.data_len - self.T + + def __getitem__(self, idx): + return self.data[idx:idx + self.T], self.data[idx + self.T] + + +def l2_loss(pred, label): + loss = torch.nn.functional.mse_loss(pred, label, size_average=True) # MSE Loss 사용 + return loss + + +def train_once(encoder, decoder, dataloader, encoder_optim, decoder_optim): + # encoder, decoder 각각의 모델을 학습 단계로 설정 + encoder.train() + decoder.train() + loader = tqdm.tqdm(dataloader) + + loss_epoch = 0 + + for idx, (data, label) in enumerate(loader): + data_x = data.unsqueeze(2) + data_tran = data_x.transpose(0, 1) + data_x, label, data_y = data_tran.float(), label.float(), data.float() + code_hidden = encoder(data_x) # batch_size(64) 별로 전체 데이터를 나누어서 encoder 학습 진행 + code_hidden = code_hidden.transpose(0, 1) + output = decoder(code_hidden, data_y) # batch_size(64) 별로 전체 데이터를 나누어서 decoder 학습 진행 + + encoder_optim.zero_grad() # epoch 한 번의 학습이 완료되어지면 gradient를 항상 0으로 초기화 + decoder_optim.zero_grad() + loss = l2_loss(output.squeeze(1), label) # 손실 함수는 MSE Loss로 설정 + loss.backward() # 역전파 진행 + + encoder_optim.step() # 역전파 단계에서 수집된 변화도로 매개변수 조정 + decoder_optim.step() + loss_epoch += loss.detach().item() # 각 epoch 별 loss 출력 + loss_epoch /= len(loader) + return loss_epoch + + +def eval_once(encoder, decoder, dataloader): + # encoder, decoder 각각의 모델을 평가 단계로 설정 + encoder.eval() + decoder.eval() + loader = tqdm.tqdm(dataloader) + + loss_epoch = 0 + + preds = [] + labels = [] + + for idx, (data, label) in enumerate(loader): + # data: batch, time x 1 + data_x = data.unsqueeze(2) + data_x, label, data_y = data_x.float(), label.float(), data.float() + code_hidden = encoder(data_x) # encoder를 거쳐 code_hidden 출력 + output = decoder(code_hidden, data_y).squeeze(1) # decoder를 거쳐 output 출력 + loss = l2_loss(output, label) # 손실함수는 MSE Loss로 설정 + loss_epoch += loss.detach().item() # 각 epoch 별 loss 출력 + preds += (output.detach().tolist()) # 예측값 preds를 리스트에 추가 + labels += (label.detach().tolist()) # 정답값 label을 리스트에 추가 + + preds = torch.Tensor(preds) # 각 예측값과 정답값을 Tensor 형태로 변환 + labels = torch.Tensor(labels) + + # 각 예측값과 정답값 계산 + pred1 = preds[:-1] + pred2 = preds[1:] + pred_ = preds[1:] > preds[:-1] + label1 = labels[:-1] + label2 = labels[1:] + label_ = labels[1:] > labels[:-1] + + accuracy = (label_ == pred_).sum() / len(pred1) # 앞서 정의한 예측값과 정답값을 기준으로 accuracy 계산 + loss_epoch /= len(loader) # 앞서 정의한 예측값과 정답값을 기준으로 loss 값 계산 + return loss_epoch, accuracy + + +def eval_plot(encoder, decoder, dataloader): + dataloader.shuffle = False # 평가 단계이므로 shuffle=False로 설정 + preds = [] + labels = [] + # encoder, decoder 각각의 모델을 평가 단계로 설정 + encoder.eval() + decoder.eval() + loader = tqdm.tqdm(dataloader) + + for idx, (data, label) in enumerate(loader): + data_x = data.unsqueeze(2) + data_x, label, data_y = data_x.float(), label.float(), data.float() + code_hidden = encoder(data_x) # encoder를 거쳐 core_hidden 출력 + output = decoder(code_hidden, data_y) # decoder를 거쳐 output 출력 + preds += (output.detach().tolist()) # 예측값 preds를 리스트에 추가 + labels += (label.detach().tolist()) # 정답값 label을 리스트에 추가 + + fig, ax = plt.subplots() + data_x = list(range(len(preds))) + + ax.plot(data_x, preds, label='predict', color='red') # 빨간색으로 예측값 lineplot 생성 + ax.plot(data_x, labels, label='ground truth', color='blue') # 파란색으로 정답값 lineplot 생성 + plt.legend() + plt.show() + + +def main(): + # train, val 데이터 셋 불러오기 + + dataset_train = StockDataset(file_path=args.data_path) + dataset_val = StockDataset(file_path=args.data_path, train_flag=False) + train_loader = DataLoader(dataset_train, batch_size=64, shuffle=True) + val_loader = DataLoader(dataset_val, batch_size=64, shuffle=False) + + encoder = TransAm() # encoder는 앞서 정의한 TransAM 함수 사용 + decoder = AttnDecoder(code_hidden_size=64, hidden_size=64, time_step=time_step) # decoder는 앞서 정의한 AttnDecoder 함수 사용 + + # 각 encoder, decoder의 optimizer는 Adam으로 설정 + encoder_optim = torch.optim.Adam(encoder.parameters(), lr=0.001) + decoder_optim = torch.optim.Adam(decoder.parameters(), lr=0.001) + + total_epoch = 101 # 100 epoch까지 checkpoints 및 결과 plot 생성 + + for epoch_idx in range(total_epoch): + train_loss = train_once(encoder, decoder, train_loader, encoder_optim, decoder_optim) + print("stage: train, epoch:{:5d}, loss:{}".format(epoch_idx, train_loss)) + + if epoch_idx % 10 == 0: # 10 epoch마다 평가용 데이터로 검증 + eval_loss, accuracy = eval_once(encoder, decoder, val_loader) # 평가를 진행해 eval_loss와 accuracy 계산 + + print("##### stage: test, epoch:{:5d}, loss:{}, accuracy:{}".format(epoch_idx, eval_loss, + accuracy)) # 10번의 step마다 loss 출력 + eval_plot(encoder, decoder, val_loader) + torch.save(encoder.state_dict(), "{}/checkpoint_{:0>3}.ckpt".format(checkpointdir, epoch_idx)) # 모델의 학습 가중치를 checkpoints로 저장 + plt.savefig("{}/122630{}.png".format(plotdir, epoch_idx)) # 각 figure를 png 형태로 저장 + +if __name__ == "__main__": + main() \ No newline at end of file