289 lines
12 KiB
Python
289 lines
12 KiB
Python
import pandas as pd
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
import torch
|
|
import torch.nn as nn
|
|
from torch.utils.data import DataLoader, Dataset
|
|
|
|
import tqdm
|
|
from torch.autograd import Variable
|
|
import argparse
|
|
import math
|
|
import torch.nn.functional as F
|
|
|
|
checkpointdir = './model/122630'
|
|
plotdir = './plot/122630'
|
|
|
|
torch.random.manual_seed(0)
|
|
np.random.seed(0)
|
|
|
|
parser = argparse.ArgumentParser("Transformer-LSTM")
|
|
parser.add_argument("-data_path", type=str, default="./stocks/KOSPI.csv", help="dataset path")
|
|
|
|
args = parser.parse_args()
|
|
time_step = 10
|
|
|
|
|
|
class PositionalEncoding(nn.Module):
|
|
# Transformer의 Positional Encoding 정의
|
|
def __init__(self, d_model, max_len=500):
|
|
super(PositionalEncoding, self).__init__()
|
|
pe = torch.zeros(max_len, d_model) # 입력 값의 최대 길이만큼 0인 텐서 값 생성
|
|
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
|
|
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
|
|
pe[:, 0::2] = torch.sin(position * div_term) # sin 주파수 기능 사용
|
|
pe[:, 1::2] = torch.cos(position * div_term) # cos 주파수 기능 사용
|
|
pe = pe.unsqueeze(0).transpose(0, 1)
|
|
self.register_buffer('pe', pe)
|
|
|
|
def forward(self, x):
|
|
return x + self.pe[:x.size(0), :] # 각 입력값마다 positional encoding 진행
|
|
|
|
|
|
class TransAm(nn.Module):
|
|
# Transformer Encoder 구조 정의
|
|
def __init__(self, feature_size=64, num_layers=6, dropout=0.1):
|
|
super(TransAm, self).__init__()
|
|
self.model_type = 'Transformer'
|
|
|
|
self.src_mask = None
|
|
self.pos_encoder = PositionalEncoding(feature_size)
|
|
|
|
# torch.nn 모듈에 있는 encoder 및 decoder 레이어 설정
|
|
self.encoder_layer = nn.TransformerEncoderLayer(d_model=feature_size, nhead=8, dropout=dropout)
|
|
self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)
|
|
self.decoder_layer = nn.TransformerDecoderLayer(d_model=feature_size, nhead=8, dropout=dropout)
|
|
self.transformer_decoder = nn.TransformerDecoder(self.decoder_layer, num_layers=num_layers)
|
|
|
|
self.decoder = nn.Linear(feature_size, 1)
|
|
self.init_weights()
|
|
|
|
# decoder 가중치 초기화
|
|
def init_weights(self):
|
|
initrange = 0.1
|
|
self.decoder.bias.data.zero_()
|
|
self.decoder.weight.data.uniform_(-initrange, initrange)
|
|
|
|
# decoder에서 다음 값 예측 시 sequence의 다음 값을 모르게 하기 위해 마스킹 함수 정의
|
|
def _generate_square_subsequent_mask(self, sz):
|
|
mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
|
|
mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
|
|
return mask
|
|
|
|
# 앞서 정의한 함수를 사용해 Transformer Encoder의 순전파 진행
|
|
def forward(self, src):
|
|
if self.src_mask is None or self.src_mask.size(0) != len(src):
|
|
device = src.device
|
|
mask = self._generate_square_subsequent_mask(len(src)).to(device)
|
|
self.src_mask = mask # mask를 씌워 Mult-head Attn 수행
|
|
src = self.pos_encoder(src)
|
|
output = self.transformer_encoder(src)
|
|
return output
|
|
|
|
|
|
class AttnDecoder(nn.Module):
|
|
# Transformer Decoder 구조 정의
|
|
def __init__(self, code_hidden_size, hidden_size, time_step):
|
|
super(AttnDecoder, self).__init__()
|
|
self.code_hidden_size = code_hidden_size
|
|
self.hidden_size = hidden_size
|
|
self.T = time_step
|
|
|
|
# Model, Layer, Activation Fuction 정의
|
|
self.attn1 = nn.Linear(in_features=2 * hidden_size, out_features=code_hidden_size)
|
|
self.attn2 = nn.Linear(in_features=code_hidden_size, out_features=code_hidden_size)
|
|
self.tanh = nn.Tanh()
|
|
self.attn3 = nn.Linear(in_features=code_hidden_size, out_features=1)
|
|
self.lstm = nn.LSTM(input_size=1, hidden_size=self.hidden_size, num_layers=1)
|
|
self.tilde = nn.Linear(in_features=self.code_hidden_size + 1, out_features=1)
|
|
self.fc1 = nn.Linear(in_features=code_hidden_size + hidden_size, out_features=hidden_size)
|
|
self.fc2 = nn.Linear(in_features=hidden_size, out_features=1)
|
|
|
|
# 인풋 사이즈의 0값을 갖는 초기 텐서 생성
|
|
def init_variable(self, *args):
|
|
zero_tensor = torch.zeros(args)
|
|
return Variable(zero_tensor)
|
|
|
|
# hidden layer embedding 순서값을 바꾸어 줌
|
|
def embedding_hidden(self, x):
|
|
return x.permute(1, 0, 2)
|
|
|
|
# 앞서 정의한 함수를 사용해 Transformer Decoder의 순전파 진행
|
|
def forward(self, h, y_seq):
|
|
h_ = h.transpose(0, 1)
|
|
batch_size = h.size(0)
|
|
d = self.init_variable(1, batch_size, self.hidden_size)
|
|
s = self.init_variable(1, batch_size, self.hidden_size)
|
|
h_0 = self.init_variable(1, batch_size, self.hidden_size)
|
|
h_ = torch.cat((h_0, h_), dim=0)
|
|
|
|
for t in range(self.T):
|
|
x = torch.cat((d, h_[t, :, :].unsqueeze(0)), 2)
|
|
h1 = self.attn1(x)
|
|
_, states = self.lstm(y_seq[:, t].unsqueeze(0).unsqueeze(2), (h1, s))
|
|
d = states[0]
|
|
s = states[1]
|
|
y_res = self.fc2(self.fc1(torch.cat((d.squeeze(0), h_[-1, :, :]), dim=1)))
|
|
return y_res
|
|
|
|
|
|
class StockDataset(Dataset):
|
|
def __init__(self, file_path, T=time_step, train_flag=True):
|
|
# KOSPI 데이터 불러오기
|
|
with open(file_path, "r", encoding="utf-8") as fp:
|
|
data_pd = pd.read_csv(fp)
|
|
self.train_flag = train_flag # 학습용 데이터를 True로 설정
|
|
self.data_train_ratio = 0.9 # 90%를 학습용 데이터로 사용
|
|
self.T = T
|
|
|
|
# 학습용 데이터인 경우
|
|
if train_flag:
|
|
self.data_len = int(self.data_train_ratio * len(data_pd))
|
|
data_all = np.array(data_pd['close']) # KOSPI 종가 데이터 셋 활용
|
|
data_all = (data_all - np.mean(data_all)) / np.std(data_all) # 데이터 셋 표준화
|
|
self.data = data_all[: self.data_len]
|
|
|
|
# 평가용 데이터인 경우
|
|
else:
|
|
self.data_len = int((1 - self.data_train_ratio) * len(data_pd))
|
|
data_all = np.array(data_pd['close'])
|
|
data_all = (data_all - np.mean(data_all)) / np.std(data_all)
|
|
self.data = data_all[-self.data_len:]
|
|
print("data len:{}".format(self.data_len)) # 학습 시 학습/평가 데이터 개수 출력
|
|
|
|
def __len__(self):
|
|
return self.data_len - self.T
|
|
|
|
def __getitem__(self, idx):
|
|
return self.data[idx:idx + self.T], self.data[idx + self.T]
|
|
|
|
|
|
def l2_loss(pred, label):
|
|
loss = torch.nn.functional.mse_loss(pred, label, size_average=True) # MSE Loss 사용
|
|
return loss
|
|
|
|
|
|
def train_once(encoder, decoder, dataloader, encoder_optim, decoder_optim):
|
|
# encoder, decoder 각각의 모델을 학습 단계로 설정
|
|
encoder.train()
|
|
decoder.train()
|
|
loader = tqdm.tqdm(dataloader)
|
|
|
|
loss_epoch = 0
|
|
|
|
for idx, (data, label) in enumerate(loader):
|
|
data_x = data.unsqueeze(2)
|
|
data_tran = data_x.transpose(0, 1)
|
|
data_x, label, data_y = data_tran.float(), label.float(), data.float()
|
|
code_hidden = encoder(data_x) # batch_size(64) 별로 전체 데이터를 나누어서 encoder 학습 진행
|
|
code_hidden = code_hidden.transpose(0, 1)
|
|
output = decoder(code_hidden, data_y) # batch_size(64) 별로 전체 데이터를 나누어서 decoder 학습 진행
|
|
|
|
encoder_optim.zero_grad() # epoch 한 번의 학습이 완료되어지면 gradient를 항상 0으로 초기화
|
|
decoder_optim.zero_grad()
|
|
loss = l2_loss(output.squeeze(1), label) # 손실 함수는 MSE Loss로 설정
|
|
loss.backward() # 역전파 진행
|
|
|
|
encoder_optim.step() # 역전파 단계에서 수집된 변화도로 매개변수 조정
|
|
decoder_optim.step()
|
|
loss_epoch += loss.detach().item() # 각 epoch 별 loss 출력
|
|
loss_epoch /= len(loader)
|
|
return loss_epoch
|
|
|
|
|
|
def eval_once(encoder, decoder, dataloader):
|
|
# encoder, decoder 각각의 모델을 평가 단계로 설정
|
|
encoder.eval()
|
|
decoder.eval()
|
|
loader = tqdm.tqdm(dataloader)
|
|
|
|
loss_epoch = 0
|
|
|
|
preds = []
|
|
labels = []
|
|
|
|
for idx, (data, label) in enumerate(loader):
|
|
# data: batch, time x 1
|
|
data_x = data.unsqueeze(2)
|
|
data_x, label, data_y = data_x.float(), label.float(), data.float()
|
|
code_hidden = encoder(data_x) # encoder를 거쳐 code_hidden 출력
|
|
output = decoder(code_hidden, data_y).squeeze(1) # decoder를 거쳐 output 출력
|
|
loss = l2_loss(output, label) # 손실함수는 MSE Loss로 설정
|
|
loss_epoch += loss.detach().item() # 각 epoch 별 loss 출력
|
|
preds += (output.detach().tolist()) # 예측값 preds를 리스트에 추가
|
|
labels += (label.detach().tolist()) # 정답값 label을 리스트에 추가
|
|
|
|
preds = torch.Tensor(preds) # 각 예측값과 정답값을 Tensor 형태로 변환
|
|
labels = torch.Tensor(labels)
|
|
|
|
# 각 예측값과 정답값 계산
|
|
pred1 = preds[:-1]
|
|
pred2 = preds[1:]
|
|
pred_ = preds[1:] > preds[:-1]
|
|
label1 = labels[:-1]
|
|
label2 = labels[1:]
|
|
label_ = labels[1:] > labels[:-1]
|
|
|
|
accuracy = (label_ == pred_).sum() / len(pred1) # 앞서 정의한 예측값과 정답값을 기준으로 accuracy 계산
|
|
loss_epoch /= len(loader) # 앞서 정의한 예측값과 정답값을 기준으로 loss 값 계산
|
|
return loss_epoch, accuracy
|
|
|
|
|
|
def eval_plot(encoder, decoder, dataloader):
|
|
dataloader.shuffle = False # 평가 단계이므로 shuffle=False로 설정
|
|
preds = []
|
|
labels = []
|
|
# encoder, decoder 각각의 모델을 평가 단계로 설정
|
|
encoder.eval()
|
|
decoder.eval()
|
|
loader = tqdm.tqdm(dataloader)
|
|
|
|
for idx, (data, label) in enumerate(loader):
|
|
data_x = data.unsqueeze(2)
|
|
data_x, label, data_y = data_x.float(), label.float(), data.float()
|
|
code_hidden = encoder(data_x) # encoder를 거쳐 core_hidden 출력
|
|
output = decoder(code_hidden, data_y) # decoder를 거쳐 output 출력
|
|
preds += (output.detach().tolist()) # 예측값 preds를 리스트에 추가
|
|
labels += (label.detach().tolist()) # 정답값 label을 리스트에 추가
|
|
|
|
fig, ax = plt.subplots()
|
|
data_x = list(range(len(preds)))
|
|
|
|
ax.plot(data_x, preds, label='predict', color='red') # 빨간색으로 예측값 lineplot 생성
|
|
ax.plot(data_x, labels, label='ground truth', color='blue') # 파란색으로 정답값 lineplot 생성
|
|
plt.legend()
|
|
plt.show()
|
|
|
|
|
|
def main():
|
|
# train, val 데이터 셋 불러오기
|
|
|
|
dataset_train = StockDataset(file_path=args.data_path)
|
|
dataset_val = StockDataset(file_path=args.data_path, train_flag=False)
|
|
train_loader = DataLoader(dataset_train, batch_size=64, shuffle=True)
|
|
val_loader = DataLoader(dataset_val, batch_size=64, shuffle=False)
|
|
|
|
encoder = TransAm() # encoder는 앞서 정의한 TransAM 함수 사용
|
|
decoder = AttnDecoder(code_hidden_size=64, hidden_size=64, time_step=time_step) # decoder는 앞서 정의한 AttnDecoder 함수 사용
|
|
|
|
# 각 encoder, decoder의 optimizer는 Adam으로 설정
|
|
encoder_optim = torch.optim.Adam(encoder.parameters(), lr=0.001)
|
|
decoder_optim = torch.optim.Adam(decoder.parameters(), lr=0.001)
|
|
|
|
total_epoch = 101 # 100 epoch까지 checkpoints 및 결과 plot 생성
|
|
|
|
for epoch_idx in range(total_epoch):
|
|
train_loss = train_once(encoder, decoder, train_loader, encoder_optim, decoder_optim)
|
|
print("stage: train, epoch:{:5d}, loss:{}".format(epoch_idx, train_loss))
|
|
|
|
if epoch_idx % 10 == 0: # 10 epoch마다 평가용 데이터로 검증
|
|
eval_loss, accuracy = eval_once(encoder, decoder, val_loader) # 평가를 진행해 eval_loss와 accuracy 계산
|
|
|
|
print("##### stage: test, epoch:{:5d}, loss:{}, accuracy:{}".format(epoch_idx, eval_loss,
|
|
accuracy)) # 10번의 step마다 loss 출력
|
|
eval_plot(encoder, decoder, val_loader)
|
|
torch.save(encoder.state_dict(), "{}/checkpoint_{:0>3}.ckpt".format(checkpointdir, epoch_idx)) # 모델의 학습 가중치를 checkpoints로 저장
|
|
plt.savefig("{}/122630{}.png".format(plotdir, epoch_idx)) # 각 figure를 png 형태로 저장
|
|
|
|
if __name__ == "__main__":
|
|
main() |