Files
DeepStock/stockpredictor/train/PricePredictor.py
dsyoon 890418a3ae init
2021-02-16 04:29:48 +09:00

158 lines
5.6 KiB
Python

import os
import json
import sqlite3
import numpy as np
from keras.models import Sequential
from keras.layers import Dense
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.layers import LSTM
class PricePredictor:
    """Train and evaluate a single-layer LSTM that predicts one stock series.

    Each stock is a list of daily rows ``[open, close, high, low, volume]``
    normalised into ``[.., 1]`` by :meth:`getNormalPrice` and
    :meth:`getNormalVolume`.  ``TYPE`` selects which of those five columns
    is used as the prediction target.
    """

    # Fraction of the sliding-window samples used for training; the
    # remainder is used for validation.
    trainingRate = 0.7
    TYPE = 0  # OPEN = 0, CLOSE = 1, HIGH = 2, LOW = 3, VOLUME = 4
    # Divisors used to normalise prices and volumes; larger inputs clip to 1.
    MAX_VALUE = 500000
    MAX_VOLUME_VALUE = 300000000

    # Checkpoint file name for every value ``TYPE`` may take.
    _CHECKPOINT_NAMES = {
        0: 'open_checkpoint.h5',
        1: 'close_checkpoint.h5',
        2: 'high_checkpoint.h5',
        3: 'low_checkpoint.h5',
        4: 'volume_checkpoint.h5',
    }

    def getNormalPrice(self, value):
        """Return ``value / MAX_VALUE``, clipped to at most 1."""
        return min(value / self.MAX_VALUE, 1)

    def getNormalVolume(self, value):
        """Return ``value / MAX_VOLUME_VALUE``, clipped to at most 1."""
        return min(value / self.MAX_VOLUME_VALUE, 1)

    def read(self, inFileName):
        """Load every row of the ``stock`` table as a normalised series.

        The third column of each row is parsed as a JSON list of objects
        with ``open``, ``close``, ``high``, ``low`` and ``volume`` keys.

        Args:
            inFileName: Path of the SQLite database file.

        Returns:
            A list with one entry per table row (in rowid order); each entry
            is a list of ``[open, close, high, low, volume]`` rows.
        """
        conn = sqlite3.connect(inFileName)
        try:
            cursor = conn.cursor()
            try:
                # One ordered scan instead of probing rowid 1, 2, 3, ...:
                # probing stops at the first gap left by a deleted row and
                # silently drops everything after it.
                cursor.execute('SELECT * FROM stock ORDER BY rowid')
                stocks = []
                for row in cursor:
                    prices = json.loads(row[2])
                    stocks.append([
                        [
                            self.getNormalPrice(price['open']),
                            self.getNormalPrice(price['close']),
                            self.getNormalPrice(price['high']),
                            self.getNormalPrice(price['low']),
                            self.getNormalVolume(price['volume']),
                        ]
                        for price in prices
                    ])
            finally:
                cursor.close()
        finally:
            # Release the database handle even when a row fails to parse.
            conn.close()
        return stocks

    def make_dataset(self, stocks, window_size=5):
        """Build sliding-window samples and split them into train/valid/test.

        For every stock, each run of ``window_size`` consecutive rows is a
        feature and column ``TYPE`` of the following row is its label.
        Samples are split in order: the first ``trainingRate`` share is the
        training set and the rest is the validation set.  The test sample is
        the very last sample (it is also the last validation sample).

        Args:
            stocks: Series as returned by :meth:`read`.
            window_size: Number of consecutive rows per feature.

        Returns:
            ``(train_feature, train_label, valid_feature, valid_label,
            test_feature, test_label)`` as NumPy arrays.

        Raises:
            ValueError: If no stock has more than ``window_size`` rows, so
                no sample can be built.
        """
        feature = []
        label = []
        for stock in stocks:
            for start in range(len(stock) - window_size):
                end = start + window_size
                feature.append(stock[start:end])
                label.append([stock[end][self.TYPE]])

        if not feature:
            raise ValueError(
                'no training samples: every stock has at most '
                '%d rows (window_size)' % window_size)

        index = int(len(feature) * self.trainingRate)
        return (
            np.array(feature[:index]),
            np.array(label[:index]),
            np.array(feature[index:]),
            np.array(label[index:]),
            np.array(feature[-1]),
            np.array(label[-1]),
        )

    def _checkpoint_path(self, model_path):
        """Return the checkpoint file path for the current ``TYPE``."""
        try:
            name = self._CHECKPOINT_NAMES[self.TYPE]
        except KeyError:
            raise ValueError('unsupported TYPE: %r' % (self.TYPE,))
        return os.path.join(model_path, name)

    def _label_scale(self):
        """Return the divisor that was used to normalise the target column."""
        return self.MAX_VOLUME_VALUE if self.TYPE == 4 else self.MAX_VALUE

    def _build_model(self, train_feature):
        """Create and compile the LSTM sized for ``train_feature`` samples."""
        model = Sequential()
        model.add(LSTM(32,
                       input_shape=(train_feature.shape[1],
                                    train_feature.shape[2]),
                       activation='relu',
                       return_sequences=False))
        model.add(Dense(1))
        model.compile(loss='mean_squared_error', optimizer='adam')
        return model

    def _report_prediction(self, model, test_feature, test_label):
        """Predict the test sample and print it, normalised and rescaled."""
        # The model expects a batch axis, so wrap the single sample.
        prediction = model.predict(np.array([test_feature]))
        scale = self._label_scale()
        print(prediction[0], test_label)
        print(prediction[0] * scale, test_label * scale)

    def train(self, inFileName, model_path):
        """Train the model on the database and checkpoint the best weights.

        Training stops early once ``val_loss`` has not improved for five
        epochs; the best-scoring weights are written to the checkpoint file
        for the current ``TYPE`` inside ``model_path``.  Finally the
        prediction for the test sample is printed.

        Args:
            inFileName: Path of the SQLite database file.
            model_path: Directory that receives the checkpoint file.

        Raises:
            ValueError: If ``TYPE`` is unsupported or no sample can be built.
        """
        # Resolve the checkpoint first so a bad TYPE fails before any work.
        filename = self._checkpoint_path(model_path)
        stocks = self.read(inFileName)
        (train_feature, train_label, valid_feature, valid_label,
         test_feature, test_label) = self.make_dataset(stocks)
        print(train_feature.shape)
        print(train_label.shape)

        model = self._build_model(train_feature)
        if model_path:
            # The checkpoint callback does not create missing directories.
            os.makedirs(model_path, exist_ok=True)
        early_stop = EarlyStopping(monitor='val_loss', patience=5)
        checkpoint = ModelCheckpoint(filename, monitor='val_loss', verbose=1,
                                     save_best_only=True, mode='auto')
        model.fit(train_feature, train_label,
                  epochs=1000,
                  batch_size=100000,
                  validation_data=(valid_feature, valid_label),
                  callbacks=[early_stop, checkpoint])

        self._report_prediction(model, test_feature, test_label)

    def predict(self, inFileName, model_path):
        """Load checkpointed weights and print the test-sample prediction.

        Args:
            inFileName: Path of the SQLite database file.
            model_path: Directory that holds the checkpoint file written by
                :meth:`train`.

        Raises:
            ValueError: If ``TYPE`` is unsupported or no sample can be built.
        """
        filename = self._checkpoint_path(model_path)
        stocks = self.read(inFileName)
        (train_feature, _train_label, _valid_feature, _valid_label,
         test_feature, test_label) = self.make_dataset(stocks)

        # The architecture must match the one used in train() for the
        # saved weights to load.
        model = self._build_model(train_feature)
        model.load_weights(filename)

        self._report_prediction(model, test_feature, test_label)
if __name__ == "__main__":
    # Script entry point: train a model from the bundled database.
    # Paths are relative to the current working directory.
    PROJECT_HOME = "../.."
    database_file = "/".join([PROJECT_HOME, "resources", "stock.db"])
    checkpoint_dir = "/".join([PROJECT_HOME, "resources", "model"])

    predictor = PricePredictor()
    predictor.train(database_file, checkpoint_dir)
    # To evaluate previously saved weights instead of training:
    # predictor.predict(database_file, checkpoint_dir)