Files
DeepStock/analyzer/122630.py
dsyoon 11531e4507 init
2023-07-22 21:12:48 +09:00

289 lines
12 KiB
Python

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import tqdm
from torch.autograd import Variable
import argparse
import math
import torch.nn.functional as F
checkpointdir = './model/122630'
plotdir = './plot/122630'
torch.random.manual_seed(0)
np.random.seed(0)
parser = argparse.ArgumentParser("Transformer-LSTM")
parser.add_argument("-data_path", type=str, default="./stocks/KOSPI.csv", help="dataset path")
args = parser.parse_args()
time_step = 10
class PositionalEncoding(nn.Module):
# Transformer의 Positional Encoding 정의
def __init__(self, d_model, max_len=500):
super(PositionalEncoding, self).__init__()
pe = torch.zeros(max_len, d_model) # 입력 값의 최대 길이만큼 0인 텐서 값 생성
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term) # sin 주파수 기능 사용
pe[:, 1::2] = torch.cos(position * div_term) # cos 주파수 기능 사용
pe = pe.unsqueeze(0).transpose(0, 1)
self.register_buffer('pe', pe)
def forward(self, x):
return x + self.pe[:x.size(0), :] # 각 입력값마다 positional encoding 진행
class TransAm(nn.Module):
# Transformer Encoder 구조 정의
def __init__(self, feature_size=64, num_layers=6, dropout=0.1):
super(TransAm, self).__init__()
self.model_type = 'Transformer'
self.src_mask = None
self.pos_encoder = PositionalEncoding(feature_size)
# torch.nn 모듈에 있는 encoder 및 decoder 레이어 설정
self.encoder_layer = nn.TransformerEncoderLayer(d_model=feature_size, nhead=8, dropout=dropout)
self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)
self.decoder_layer = nn.TransformerDecoderLayer(d_model=feature_size, nhead=8, dropout=dropout)
self.transformer_decoder = nn.TransformerDecoder(self.decoder_layer, num_layers=num_layers)
self.decoder = nn.Linear(feature_size, 1)
self.init_weights()
# decoder 가중치 초기화
def init_weights(self):
initrange = 0.1
self.decoder.bias.data.zero_()
self.decoder.weight.data.uniform_(-initrange, initrange)
# decoder에서 다음 값 예측 시 sequence의 다음 값을 모르게 하기 위해 마스킹 함수 정의
def _generate_square_subsequent_mask(self, sz):
mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
return mask
# 앞서 정의한 함수를 사용해 Transformer Encoder의 순전파 진행
def forward(self, src):
if self.src_mask is None or self.src_mask.size(0) != len(src):
device = src.device
mask = self._generate_square_subsequent_mask(len(src)).to(device)
self.src_mask = mask # mask를 씌워 Mult-head Attn 수행
src = self.pos_encoder(src)
output = self.transformer_encoder(src)
return output
class AttnDecoder(nn.Module):
# Transformer Decoder 구조 정의
def __init__(self, code_hidden_size, hidden_size, time_step):
super(AttnDecoder, self).__init__()
self.code_hidden_size = code_hidden_size
self.hidden_size = hidden_size
self.T = time_step
# Model, Layer, Activation Fuction 정의
self.attn1 = nn.Linear(in_features=2 * hidden_size, out_features=code_hidden_size)
self.attn2 = nn.Linear(in_features=code_hidden_size, out_features=code_hidden_size)
self.tanh = nn.Tanh()
self.attn3 = nn.Linear(in_features=code_hidden_size, out_features=1)
self.lstm = nn.LSTM(input_size=1, hidden_size=self.hidden_size, num_layers=1)
self.tilde = nn.Linear(in_features=self.code_hidden_size + 1, out_features=1)
self.fc1 = nn.Linear(in_features=code_hidden_size + hidden_size, out_features=hidden_size)
self.fc2 = nn.Linear(in_features=hidden_size, out_features=1)
# 인풋 사이즈의 0값을 갖는 초기 텐서 생성
def init_variable(self, *args):
zero_tensor = torch.zeros(args)
return Variable(zero_tensor)
# hidden layer embedding 순서값을 바꾸어 줌
def embedding_hidden(self, x):
return x.permute(1, 0, 2)
# 앞서 정의한 함수를 사용해 Transformer Decoder의 순전파 진행
def forward(self, h, y_seq):
h_ = h.transpose(0, 1)
batch_size = h.size(0)
d = self.init_variable(1, batch_size, self.hidden_size)
s = self.init_variable(1, batch_size, self.hidden_size)
h_0 = self.init_variable(1, batch_size, self.hidden_size)
h_ = torch.cat((h_0, h_), dim=0)
for t in range(self.T):
x = torch.cat((d, h_[t, :, :].unsqueeze(0)), 2)
h1 = self.attn1(x)
_, states = self.lstm(y_seq[:, t].unsqueeze(0).unsqueeze(2), (h1, s))
d = states[0]
s = states[1]
y_res = self.fc2(self.fc1(torch.cat((d.squeeze(0), h_[-1, :, :]), dim=1)))
return y_res
class StockDataset(Dataset):
def __init__(self, file_path, T=time_step, train_flag=True):
# KOSPI 데이터 불러오기
with open(file_path, "r", encoding="utf-8") as fp:
data_pd = pd.read_csv(fp)
self.train_flag = train_flag # 학습용 데이터를 True로 설정
self.data_train_ratio = 0.9 # 90%를 학습용 데이터로 사용
self.T = T
# 학습용 데이터인 경우
if train_flag:
self.data_len = int(self.data_train_ratio * len(data_pd))
data_all = np.array(data_pd['close']) # KOSPI 종가 데이터 셋 활용
data_all = (data_all - np.mean(data_all)) / np.std(data_all) # 데이터 셋 표준화
self.data = data_all[: self.data_len]
# 평가용 데이터인 경우
else:
self.data_len = int((1 - self.data_train_ratio) * len(data_pd))
data_all = np.array(data_pd['close'])
data_all = (data_all - np.mean(data_all)) / np.std(data_all)
self.data = data_all[-self.data_len:]
print("data len:{}".format(self.data_len)) # 학습 시 학습/평가 데이터 개수 출력
def __len__(self):
return self.data_len - self.T
def __getitem__(self, idx):
return self.data[idx:idx + self.T], self.data[idx + self.T]
def l2_loss(pred, label):
loss = torch.nn.functional.mse_loss(pred, label, size_average=True) # MSE Loss 사용
return loss
def train_once(encoder, decoder, dataloader, encoder_optim, decoder_optim):
# encoder, decoder 각각의 모델을 학습 단계로 설정
encoder.train()
decoder.train()
loader = tqdm.tqdm(dataloader)
loss_epoch = 0
for idx, (data, label) in enumerate(loader):
data_x = data.unsqueeze(2)
data_tran = data_x.transpose(0, 1)
data_x, label, data_y = data_tran.float(), label.float(), data.float()
code_hidden = encoder(data_x) # batch_size(64) 별로 전체 데이터를 나누어서 encoder 학습 진행
code_hidden = code_hidden.transpose(0, 1)
output = decoder(code_hidden, data_y) # batch_size(64) 별로 전체 데이터를 나누어서 decoder 학습 진행
encoder_optim.zero_grad() # epoch 한 번의 학습이 완료되어지면 gradient를 항상 0으로 초기화
decoder_optim.zero_grad()
loss = l2_loss(output.squeeze(1), label) # 손실 함수는 MSE Loss로 설정
loss.backward() # 역전파 진행
encoder_optim.step() # 역전파 단계에서 수집된 변화도로 매개변수 조정
decoder_optim.step()
loss_epoch += loss.detach().item() # 각 epoch 별 loss 출력
loss_epoch /= len(loader)
return loss_epoch
def eval_once(encoder, decoder, dataloader):
# encoder, decoder 각각의 모델을 평가 단계로 설정
encoder.eval()
decoder.eval()
loader = tqdm.tqdm(dataloader)
loss_epoch = 0
preds = []
labels = []
for idx, (data, label) in enumerate(loader):
# data: batch, time x 1
data_x = data.unsqueeze(2)
data_x, label, data_y = data_x.float(), label.float(), data.float()
code_hidden = encoder(data_x) # encoder를 거쳐 code_hidden 출력
output = decoder(code_hidden, data_y).squeeze(1) # decoder를 거쳐 output 출력
loss = l2_loss(output, label) # 손실함수는 MSE Loss로 설정
loss_epoch += loss.detach().item() # 각 epoch 별 loss 출력
preds += (output.detach().tolist()) # 예측값 preds를 리스트에 추가
labels += (label.detach().tolist()) # 정답값 label을 리스트에 추가
preds = torch.Tensor(preds) # 각 예측값과 정답값을 Tensor 형태로 변환
labels = torch.Tensor(labels)
# 각 예측값과 정답값 계산
pred1 = preds[:-1]
pred2 = preds[1:]
pred_ = preds[1:] > preds[:-1]
label1 = labels[:-1]
label2 = labels[1:]
label_ = labels[1:] > labels[:-1]
accuracy = (label_ == pred_).sum() / len(pred1) # 앞서 정의한 예측값과 정답값을 기준으로 accuracy 계산
loss_epoch /= len(loader) # 앞서 정의한 예측값과 정답값을 기준으로 loss 값 계산
return loss_epoch, accuracy
def eval_plot(encoder, decoder, dataloader):
dataloader.shuffle = False # 평가 단계이므로 shuffle=False로 설정
preds = []
labels = []
# encoder, decoder 각각의 모델을 평가 단계로 설정
encoder.eval()
decoder.eval()
loader = tqdm.tqdm(dataloader)
for idx, (data, label) in enumerate(loader):
data_x = data.unsqueeze(2)
data_x, label, data_y = data_x.float(), label.float(), data.float()
code_hidden = encoder(data_x) # encoder를 거쳐 core_hidden 출력
output = decoder(code_hidden, data_y) # decoder를 거쳐 output 출력
preds += (output.detach().tolist()) # 예측값 preds를 리스트에 추가
labels += (label.detach().tolist()) # 정답값 label을 리스트에 추가
fig, ax = plt.subplots()
data_x = list(range(len(preds)))
ax.plot(data_x, preds, label='predict', color='red') # 빨간색으로 예측값 lineplot 생성
ax.plot(data_x, labels, label='ground truth', color='blue') # 파란색으로 정답값 lineplot 생성
plt.legend()
plt.show()
def main():
# train, val 데이터 셋 불러오기
dataset_train = StockDataset(file_path=args.data_path)
dataset_val = StockDataset(file_path=args.data_path, train_flag=False)
train_loader = DataLoader(dataset_train, batch_size=64, shuffle=True)
val_loader = DataLoader(dataset_val, batch_size=64, shuffle=False)
encoder = TransAm() # encoder는 앞서 정의한 TransAM 함수 사용
decoder = AttnDecoder(code_hidden_size=64, hidden_size=64, time_step=time_step) # decoder는 앞서 정의한 AttnDecoder 함수 사용
# 각 encoder, decoder의 optimizer는 Adam으로 설정
encoder_optim = torch.optim.Adam(encoder.parameters(), lr=0.001)
decoder_optim = torch.optim.Adam(decoder.parameters(), lr=0.001)
total_epoch = 101 # 100 epoch까지 checkpoints 및 결과 plot 생성
for epoch_idx in range(total_epoch):
train_loss = train_once(encoder, decoder, train_loader, encoder_optim, decoder_optim)
print("stage: train, epoch:{:5d}, loss:{}".format(epoch_idx, train_loss))
if epoch_idx % 10 == 0: # 10 epoch마다 평가용 데이터로 검증
eval_loss, accuracy = eval_once(encoder, decoder, val_loader) # 평가를 진행해 eval_loss와 accuracy 계산
print("##### stage: test, epoch:{:5d}, loss:{}, accuracy:{}".format(epoch_idx, eval_loss,
accuracy)) # 10번의 step마다 loss 출력
eval_plot(encoder, decoder, val_loader)
torch.save(encoder.state_dict(), "{}/checkpoint_{:0>3}.ckpt".format(checkpointdir, epoch_idx)) # 모델의 학습 가중치를 checkpoints로 저장
plt.savefig("{}/122630{}.png".format(plotdir, epoch_idx)) # 각 figure를 png 형태로 저장
if __name__ == "__main__":
main()