This commit is contained in:
dsyoon
2024-03-20 19:51:17 +09:00
parent 027698ed06
commit 8b8b89c0a1
5 changed files with 2 additions and 542 deletions

View File

@@ -1,289 +0,0 @@
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import tqdm
from torch.autograd import Variable
import argparse
import math
import torch.nn.functional as F
checkpointdir = './model/122630'
plotdir = './plot/122630'
torch.random.manual_seed(0)
np.random.seed(0)
parser = argparse.ArgumentParser("Transformer-LSTM")
parser.add_argument("-data_path", type=str, default="./stocks/KOSPI.csv", help="dataset path")
args = parser.parse_args()
time_step = 10
class PositionalEncoding(nn.Module):
# Transformer의 Positional Encoding 정의
def __init__(self, d_model, max_len=500):
super(PositionalEncoding, self).__init__()
pe = torch.zeros(max_len, d_model) # 입력 값의 최대 길이만큼 0인 텐서 값 생성
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term) # sin 주파수 기능 사용
pe[:, 1::2] = torch.cos(position * div_term) # cos 주파수 기능 사용
pe = pe.unsqueeze(0).transpose(0, 1)
self.register_buffer('pe', pe)
def forward(self, x):
return x + self.pe[:x.size(0), :] # 각 입력값마다 positional encoding 진행
class TransAm(nn.Module):
# Transformer Encoder 구조 정의
def __init__(self, feature_size=64, num_layers=6, dropout=0.1):
super(TransAm, self).__init__()
self.model_type = 'Transformer'
self.src_mask = None
self.pos_encoder = PositionalEncoding(feature_size)
# torch.nn 모듈에 있는 encoder 및 decoder 레이어 설정
self.encoder_layer = nn.TransformerEncoderLayer(d_model=feature_size, nhead=8, dropout=dropout)
self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)
self.decoder_layer = nn.TransformerDecoderLayer(d_model=feature_size, nhead=8, dropout=dropout)
self.transformer_decoder = nn.TransformerDecoder(self.decoder_layer, num_layers=num_layers)
self.decoder = nn.Linear(feature_size, 1)
self.init_weights()
# decoder 가중치 초기화
def init_weights(self):
initrange = 0.1
self.decoder.bias.data.zero_()
self.decoder.weight.data.uniform_(-initrange, initrange)
# decoder에서 다음 값 예측 시 sequence의 다음 값을 모르게 하기 위해 마스킹 함수 정의
def _generate_square_subsequent_mask(self, sz):
mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
return mask
# 앞서 정의한 함수를 사용해 Transformer Encoder의 순전파 진행
def forward(self, src):
if self.src_mask is None or self.src_mask.size(0) != len(src):
device = src.device
mask = self._generate_square_subsequent_mask(len(src)).to(device)
self.src_mask = mask # mask를 씌워 Mult-head Attn 수행
src = self.pos_encoder(src)
output = self.transformer_encoder(src)
return output
class AttnDecoder(nn.Module):
# Transformer Decoder 구조 정의
def __init__(self, code_hidden_size, hidden_size, time_step):
super(AttnDecoder, self).__init__()
self.code_hidden_size = code_hidden_size
self.hidden_size = hidden_size
self.T = time_step
# Model, Layer, Activation Fuction 정의
self.attn1 = nn.Linear(in_features=2 * hidden_size, out_features=code_hidden_size)
self.attn2 = nn.Linear(in_features=code_hidden_size, out_features=code_hidden_size)
self.tanh = nn.Tanh()
self.attn3 = nn.Linear(in_features=code_hidden_size, out_features=1)
self.lstm = nn.LSTM(input_size=1, hidden_size=self.hidden_size, num_layers=1)
self.tilde = nn.Linear(in_features=self.code_hidden_size + 1, out_features=1)
self.fc1 = nn.Linear(in_features=code_hidden_size + hidden_size, out_features=hidden_size)
self.fc2 = nn.Linear(in_features=hidden_size, out_features=1)
# 인풋 사이즈의 0값을 갖는 초기 텐서 생성
def init_variable(self, *args):
zero_tensor = torch.zeros(args)
return Variable(zero_tensor)
# hidden layer embedding 순서값을 바꾸어 줌
def embedding_hidden(self, x):
return x.permute(1, 0, 2)
# 앞서 정의한 함수를 사용해 Transformer Decoder의 순전파 진행
def forward(self, h, y_seq):
h_ = h.transpose(0, 1)
batch_size = h.size(0)
d = self.init_variable(1, batch_size, self.hidden_size)
s = self.init_variable(1, batch_size, self.hidden_size)
h_0 = self.init_variable(1, batch_size, self.hidden_size)
h_ = torch.cat((h_0, h_), dim=0)
for t in range(self.T):
x = torch.cat((d, h_[t, :, :].unsqueeze(0)), 2)
h1 = self.attn1(x)
_, states = self.lstm(y_seq[:, t].unsqueeze(0).unsqueeze(2), (h1, s))
d = states[0]
s = states[1]
y_res = self.fc2(self.fc1(torch.cat((d.squeeze(0), h_[-1, :, :]), dim=1)))
return y_res
class StockDataset(Dataset):
def __init__(self, file_path, T=time_step, train_flag=True):
# KOSPI 데이터 불러오기
with open(file_path, "r", encoding="utf-8") as fp:
data_pd = pd.read_csv(fp)
self.train_flag = train_flag # 학습용 데이터를 True로 설정
self.data_train_ratio = 0.9 # 90%를 학습용 데이터로 사용
self.T = T
# 학습용 데이터인 경우
if train_flag:
self.data_len = int(self.data_train_ratio * len(data_pd))
data_all = np.array(data_pd['close']) # KOSPI 종가 데이터 셋 활용
data_all = (data_all - np.mean(data_all)) / np.std(data_all) # 데이터 셋 표준화
self.data = data_all[: self.data_len]
# 평가용 데이터인 경우
else:
self.data_len = int((1 - self.data_train_ratio) * len(data_pd))
data_all = np.array(data_pd['close'])
data_all = (data_all - np.mean(data_all)) / np.std(data_all)
self.data = data_all[-self.data_len:]
print("data len:{}".format(self.data_len)) # 학습 시 학습/평가 데이터 개수 출력
def __len__(self):
return self.data_len - self.T
def __getitem__(self, idx):
return self.data[idx:idx + self.T], self.data[idx + self.T]
def l2_loss(pred, label):
loss = torch.nn.functional.mse_loss(pred, label, size_average=True) # MSE Loss 사용
return loss
def train_once(encoder, decoder, dataloader, encoder_optim, decoder_optim):
# encoder, decoder 각각의 모델을 학습 단계로 설정
encoder.train()
decoder.train()
loader = tqdm.tqdm(dataloader)
loss_epoch = 0
for idx, (data, label) in enumerate(loader):
data_x = data.unsqueeze(2)
data_tran = data_x.transpose(0, 1)
data_x, label, data_y = data_tran.float(), label.float(), data.float()
code_hidden = encoder(data_x) # batch_size(64) 별로 전체 데이터를 나누어서 encoder 학습 진행
code_hidden = code_hidden.transpose(0, 1)
output = decoder(code_hidden, data_y) # batch_size(64) 별로 전체 데이터를 나누어서 decoder 학습 진행
encoder_optim.zero_grad() # epoch 한 번의 학습이 완료되어지면 gradient를 항상 0으로 초기화
decoder_optim.zero_grad()
loss = l2_loss(output.squeeze(1), label) # 손실 함수는 MSE Loss로 설정
loss.backward() # 역전파 진행
encoder_optim.step() # 역전파 단계에서 수집된 변화도로 매개변수 조정
decoder_optim.step()
loss_epoch += loss.detach().item() # 각 epoch 별 loss 출력
loss_epoch /= len(loader)
return loss_epoch
def eval_once(encoder, decoder, dataloader):
# encoder, decoder 각각의 모델을 평가 단계로 설정
encoder.eval()
decoder.eval()
loader = tqdm.tqdm(dataloader)
loss_epoch = 0
preds = []
labels = []
for idx, (data, label) in enumerate(loader):
# data: batch, time x 1
data_x = data.unsqueeze(2)
data_x, label, data_y = data_x.float(), label.float(), data.float()
code_hidden = encoder(data_x) # encoder를 거쳐 code_hidden 출력
output = decoder(code_hidden, data_y).squeeze(1) # decoder를 거쳐 output 출력
loss = l2_loss(output, label) # 손실함수는 MSE Loss로 설정
loss_epoch += loss.detach().item() # 각 epoch 별 loss 출력
preds += (output.detach().tolist()) # 예측값 preds를 리스트에 추가
labels += (label.detach().tolist()) # 정답값 label을 리스트에 추가
preds = torch.Tensor(preds) # 각 예측값과 정답값을 Tensor 형태로 변환
labels = torch.Tensor(labels)
# 각 예측값과 정답값 계산
pred1 = preds[:-1]
pred2 = preds[1:]
pred_ = preds[1:] > preds[:-1]
label1 = labels[:-1]
label2 = labels[1:]
label_ = labels[1:] > labels[:-1]
accuracy = (label_ == pred_).sum() / len(pred1) # 앞서 정의한 예측값과 정답값을 기준으로 accuracy 계산
loss_epoch /= len(loader) # 앞서 정의한 예측값과 정답값을 기준으로 loss 값 계산
return loss_epoch, accuracy
def eval_plot(encoder, decoder, dataloader):
dataloader.shuffle = False # 평가 단계이므로 shuffle=False로 설정
preds = []
labels = []
# encoder, decoder 각각의 모델을 평가 단계로 설정
encoder.eval()
decoder.eval()
loader = tqdm.tqdm(dataloader)
for idx, (data, label) in enumerate(loader):
data_x = data.unsqueeze(2)
data_x, label, data_y = data_x.float(), label.float(), data.float()
code_hidden = encoder(data_x) # encoder를 거쳐 core_hidden 출력
output = decoder(code_hidden, data_y) # decoder를 거쳐 output 출력
preds += (output.detach().tolist()) # 예측값 preds를 리스트에 추가
labels += (label.detach().tolist()) # 정답값 label을 리스트에 추가
fig, ax = plt.subplots()
data_x = list(range(len(preds)))
ax.plot(data_x, preds, label='predict', color='red') # 빨간색으로 예측값 lineplot 생성
ax.plot(data_x, labels, label='ground truth', color='blue') # 파란색으로 정답값 lineplot 생성
plt.legend()
plt.show()
def main():
# train, val 데이터 셋 불러오기
dataset_train = StockDataset(file_path=args.data_path)
dataset_val = StockDataset(file_path=args.data_path, train_flag=False)
train_loader = DataLoader(dataset_train, batch_size=64, shuffle=True)
val_loader = DataLoader(dataset_val, batch_size=64, shuffle=False)
encoder = TransAm() # encoder는 앞서 정의한 TransAM 함수 사용
decoder = AttnDecoder(code_hidden_size=64, hidden_size=64, time_step=time_step) # decoder는 앞서 정의한 AttnDecoder 함수 사용
# 각 encoder, decoder의 optimizer는 Adam으로 설정
encoder_optim = torch.optim.Adam(encoder.parameters(), lr=0.001)
decoder_optim = torch.optim.Adam(decoder.parameters(), lr=0.001)
total_epoch = 101 # 100 epoch까지 checkpoints 및 결과 plot 생성
for epoch_idx in range(total_epoch):
train_loss = train_once(encoder, decoder, train_loader, encoder_optim, decoder_optim)
print("stage: train, epoch:{:5d}, loss:{}".format(epoch_idx, train_loss))
if epoch_idx % 10 == 0: # 10 epoch마다 평가용 데이터로 검증
eval_loss, accuracy = eval_once(encoder, decoder, val_loader) # 평가를 진행해 eval_loss와 accuracy 계산
print("##### stage: test, epoch:{:5d}, loss:{}, accuracy:{}".format(epoch_idx, eval_loss,
accuracy)) # 10번의 step마다 loss 출력
eval_plot(encoder, decoder, val_loader)
torch.save(encoder.state_dict(), "{}/checkpoint_{:0>3}.ckpt".format(checkpointdir, epoch_idx)) # 모델의 학습 가중치를 checkpoints로 저장
plt.savefig("{}/122630{}.png".format(plotdir, epoch_idx)) # 각 figure를 png 형태로 저장
if __name__ == "__main__":
main()

View File

@@ -1,250 +0,0 @@
import os
import json
import sqlite3
import pandas as pd
class SPCorrelationAnalyzer:
stock_info = None
def __init__(self, stockFileName):
self.stock_info = {}
self.stockFileName = stockFileName
return
def open(self):
self.conn = sqlite3.connect(self.stockFileName)
self.cursor = self.conn.cursor()
def close(self):
self.cursor.close()
self.conn.close()
return
def get_all_stock_code(self):
self.open()
sql = "SELECT distinct code, name FROM stock"
self.cursor.execute(sql)
rows = self.cursor.fetchall()
for row in rows:
self.stock_info[row[0]] = row[1]
self.close()
return [row[0] for row in rows]
def load(self, master_code, limit_count=2000):
self.open()
sql = "SELECT ymd, close FROM stock where CODE=? order by ymd"
self.cursor.execute(sql, (master_code, ))
master_data = self.cursor.fetchall()
self.close()
master_data = master_data[-limit_count:]
return master_data
def getClosePrice(self, stock_codes, first_day, limit_count=2000):
data = {}
self.open()
# 신용잔고 누적
sql = "SELECT ymd, dep2_1 FROM meta_3 where ymd>=? order by ymd"
self.cursor.execute(sql, (first_day, ))
rows = self.cursor.fetchall()
if len(rows) >=limit_count:
#data['dep2_1'] = [(rows[i][0], rows[i][1]-rows[i-1][1]) for i in range(1, len(rows))]
data['dep2_1'] = [(rows[i][0], rows[i][1]) for i in range(len(rows))]
data['dep2_1_diff'] = [(rows[i][0], rows[i][1] - rows[i - 1][1]) for i in range(1, len(rows))]
self.stock_info['dep2_1'] = '신용잔고 누적'
self.stock_info['dep2_1_diff'] = '신용잔고 누적 차이'
# 투자자별 매매 동향
sql = "SELECT ymd, pri, fori, ins, ins0, ins1, ins2, ins3, ins4, ins5, cor FROM meta_2 where ymd>=? order by ymd"
self.cursor.execute(sql, (first_day,))
rows = self.cursor.fetchall()
if len(rows) >= limit_count:
data['pri'] = [(rows[i][0], rows[i][1]) for i in range(len(rows))]
data['fori'] = [(rows[i][0], rows[i][2]) for i in range(len(rows))]
data['ins'] = [(rows[i][0], rows[i][3]) for i in range(len(rows))]
data['ins0'] = [(rows[i][0], rows[i][4]) for i in range(len(rows))]
data['ins1'] = [(rows[i][0], rows[i][5]) for i in range(len(rows))]
data['ins2'] = [(rows[i][0], rows[i][6]) for i in range(len(rows))]
data['ins3'] = [(rows[i][0], rows[i][7]) for i in range(len(rows))]
data['ins4'] = [(rows[i][0], rows[i][8]) for i in range(len(rows))]
data['ins5'] = [(rows[i][0], rows[i][9]) for i in range(len(rows))]
data['cor'] = [(rows[i][0], rows[i][10]) for i in range(len(rows))]
self.stock_info['pri'] = '개인'
self.stock_info['fori'] = '외국인'
self.stock_info['ins'] = '기관합'
self.stock_info['ins0'] = '금융투자'
self.stock_info['ins1'] = '보험'
self.stock_info['ins2'] = '투신 (사모)'
self.stock_info['ins3'] = '은행'
self.stock_info['ins4'] = '기타금융기관'
self.stock_info['ins5'] = '연기금 등'
self.stock_info['cor'] = '기타법인'
# 환율
sql = "SELECT distinct code FROM meta_1"
self.cursor.execute(sql)
rows = self.cursor.fetchall()
exchange_codes = [row[0] for row in rows]
for exchange_code in exchange_codes:
sql = "SELECT ymd, price FROM meta_1 where code=? and ymd>=? order by ymd"
self.cursor.execute(sql, (exchange_code, first_day, ))
rows = self.cursor.fetchall()
if len(rows) >= limit_count:
data[exchange_code] = rows
self.stock_info[exchange_code] = exchange_code
# 원자재
sql = "SELECT distinct code FROM meta_5"
self.cursor.execute(sql)
rows = self.cursor.fetchall()
meterial_codes = [row[0] for row in rows]
for meterial_code in meterial_codes:
sql = "SELECT ymd, close FROM meta_5 where code=? and ymd>=? order by ymd"
self.cursor.execute(sql, (meterial_code, first_day, ))
rows = self.cursor.fetchall()
if len(rows) >= limit_count:
data[meterial_code] = rows
self.stock_info[meterial_code] = meterial_code
# 종목 종가
for stock_code in stock_codes:
sql = "SELECT ymd, close FROM stock where CODE=? and ymd>=? order by ymd"
self.cursor.execute(sql, (stock_code, first_day, ))
rows = self.cursor.fetchall()
if len(rows) >=limit_count:
data[stock_code] = rows
self.close()
return data
def debug(self, master_data, data):
master_days = [item[0] for item in master_data]
trimedData = {}
for i, stock_code in enumerate(data):
stock_data = data[stock_code]
stock_days = [item[0] for item in stock_data]
if len(master_days) < len(stock_days):
diff = set(stock_days) - set(master_days)
else:
diff = set(master_days) - set(stock_days)
tmp = []
for item in stock_data:
if item[0] not in diff:
tmp.append((item[0], item[1]))
if len(tmp) == len(master_data):
trimedData[stock_code] = tmp
return trimedData
def trim(self, master_data, data):
master_days = [item[0] for item in master_data]
trimedData = {}
for i, stock_code in enumerate(data):
stock_data = data[stock_code]
stock_days = [item[0] for item in stock_data]
intersection = set(stock_days) & set(master_days)
tmp = []
for item in stock_data:
if item[0] in intersection:
tmp.append((item[0], item[1]))
trimedData[stock_code] = tmp
return trimedData
def analyzeCorRelation(self, master_data, trimedData):
corr_scores = {}
master_days = [item[0] for item in master_data]
master_set = {}
for item in master_data:
master_set[item[0]] = item[1]
for stock_code in trimedData:
stock_data = trimedData[stock_code]
stock_days = [item[0] for item in stock_data]
stock_set = {}
for item in stock_data:
stock_set[item[0]] = item[1]
intersection = sorted(list(set(stock_days) & set(master_days)))
master_list = []
stock_list = []
for day in intersection:
master_list.append(master_set[day])
stock_list.append(stock_set[day])
lst = [master_list[3:], stock_list[:-3]]
df = pd.DataFrame(lst).T
corr = df.corr(method='pearson')
corr_scores[stock_code + "_" + self.stock_info[stock_code]] = corr.at[0, 1]
return corr_scores
def analyze(self, master_code, limit_count=1501):
stock_codes = self.get_all_stock_code()
master_data = self.load(master_code, limit_count)
first_day = master_data[0][0]
data = self.getClosePrice(stock_codes, first_day, limit_count)
#debugData = self.debug(master_data, data)
trimedData = self.trim(master_data, data)
corr_scores = self.analyzeCorRelation(master_data, trimedData)
return corr_scores
if __name__ == "__main__":
PROJECT_HOME = "."
RESOURCE_PATH = os.path.join(PROJECT_HOME, 'resources')
stockFileName = os.path.join(RESOURCE_PATH, 'stock.db')
spCorrelationAnalyzer = SPCorrelationAnalyzer(stockFileName)
inputs = [
{"stock_code": "122630", "stock_name": "KODEX 레버리지", "corr": []},
{"stock_code": "252670", "stock_name": "KODEX 200선물인버스2X", "corr": []},
{"stock_code": "^NDX", "stock_name": "NASDAQ 100", "corr": []},
{"stock_code": "TQQQ", "stock_name": "ProShares UltraPro QQQ", "corr": []},
{"stock_code": "SQQQ", "stock_name": "ProShares UltraPro Short QQQ", "corr": []},
{"stock_code": "SOXL", "stock_name": "Direxion Daily Semiconductor Bull 3X Shares", "corr": []},
{"stock_code": "SOXS", "stock_name": "Direxion Daily Semiconductor Bear -3X Shares", "corr": []}
]
for input in inputs:
corr_scores = spCorrelationAnalyzer.analyze(master_code=input["stock_code"])
corr_scores_list = sorted(corr_scores.items(), key=lambda item: item[1], reverse=True)
for item in corr_scores_list:
input["corr"].append({item[0]: item[1]})
print("%s,%s,%4.3f" % (input["stock_code"], item[0], item[1]))
outFileName = os.path.join(PROJECT_HOME, 'analyzer/corr.json')
with open(outFileName, 'w', encoding='utf-8') as file:
json.dump(inputs, file, indent="\t", ensure_ascii=False)
print('done...')

View File

@@ -22,4 +22,3 @@ pybithumb
ccxt
slack-sdk
scikit-learn
ta-lib

View File

@@ -13,7 +13,7 @@ import talib
import pandas as pd
from datetime import datetime, timedelta
from JSDPattern import JSDPattern
from stock.analysis.JSDPattern import JSDPattern
class JSDPattern_realtime (JSDPattern):

View File

@@ -15,7 +15,7 @@ from datetime import datetime, timedelta
from stock.analysis.IchimokuCloud import IchimokuCloud
from sklearn.preprocessing import StandardScaler
from JSDPattern import JSDPattern
from stock.analysis.JSDPattern import JSDPattern
class JSDPattern_simulation (JSDPattern):