import json
import os
import sqlite3
from contextlib import closing

import numpy as np
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.layers import LSTM, Dense
from keras.models import Sequential


class PricePredictor:
    """Train and run a small LSTM that predicts one price field of a stock.

    Price history is read from the ``stock`` table of a SQLite database,
    normalized, cut into fixed-size sliding windows and fed to an
    ``LSTM(32) -> Dense(1)`` network trained with mean squared error.
    """

    # Fraction of the generated windows used for training; the remainder is
    # used for validation.
    trainingRate = 0.7
    # Index of the field to predict inside each normalized sample.
    TYPE = 0  # OPEN = 0, CLOSE = 1, HIGH = 2, LOW = 3, VOLUME = 4
    # Divisors used for normalization; ratios above 1 are capped at 1.
    MAX_VALUE = 500000
    MAX_VOLUME_VALUE = 300000000

    # Checkpoint file name for every supported TYPE.
    _CHECKPOINT_NAMES = {
        0: 'open_checkpoint.h5',
        1: 'close_checkpoint.h5',
        2: 'high_checkpoint.h5',
        3: 'low_checkpoint.h5',
        4: 'volume_checkpoint.h5',
    }
    _VOLUME_TYPE = 4

    def getNormalPrice(self, value):
        """Return ``value / MAX_VALUE``, capped at 1."""
        value = value / self.MAX_VALUE
        if value > 1:
            value = 1
        return value

    def getNormalVolume(self, value):
        """Return ``value / MAX_VOLUME_VALUE``, capped at 1."""
        value = value / self.MAX_VOLUME_VALUE
        if value > 1:
            value = 1
        return value

    def read(self, inFileName):
        """Load and normalize every stock stored in the SQLite file.

        Each row of the ``stock`` table is expected to hold, in its third
        column, a JSON array of objects with the keys ``open``, ``close``,
        ``high``, ``low`` and ``volume``.

        Args:
            inFileName: Path of the SQLite database file.

        Returns:
            A list with one entry per table row (in rowid order); each entry
            is a list of ``[open, close, high, low, volume]`` normalized
            samples.
        """
        stocks = []
        # closing() guarantees the cursor and the connection are released
        # even when a row contains malformed JSON or misses a key.
        with closing(sqlite3.connect(inFileName)) as conn, \
                closing(conn.cursor()) as cursor:
            # One ordered scan instead of probing rowid = 1, 2, 3, ...:
            # probing stops at the first gap in the rowids (for example
            # after a DELETE) and silently ignores every later row.
            cursor.execute('SELECT * FROM stock ORDER BY rowid')
            for row in cursor:
                prices = json.loads(row[2])
                stocks.append([
                    [
                        self.getNormalPrice(price['open']),
                        self.getNormalPrice(price['close']),
                        self.getNormalPrice(price['high']),
                        self.getNormalPrice(price['low']),
                        self.getNormalVolume(price['volume']),
                    ]
                    for price in prices
                ])
        return stocks

    def make_dataset(self, stocks, window_size=5):
        """Cut the stocks into sliding windows and split them into sets.

        For every stock, each run of ``window_size`` consecutive samples is
        a feature and the ``TYPE`` field of the sample right after it is the
        label. Stocks with ``window_size`` samples or fewer contribute
        nothing.

        Args:
            stocks: Normalized stocks as returned by :meth:`read`.
            window_size: Number of consecutive samples per feature window.

        Returns:
            A 6-tuple of numpy arrays ``(train_feature, train_label,
            valid_feature, valid_label, test_feature, test_label)``. The
            first ``trainingRate`` share of the windows is the training set,
            the rest is the validation set. The test pair is the very last
            window and its label.

        Raises:
            ValueError: If no stock is long enough to produce a window.
        """
        feature = []
        label = []
        for stock in stocks:
            for start in range(len(stock) - window_size):
                end = start + window_size
                feature.append(stock[start:end])
                label.append([stock[end][self.TYPE]])

        if not feature:
            raise ValueError(
                'no stock has more than %d samples; cannot build a dataset'
                % window_size
            )

        index = int(len(feature) * self.trainingRate)
        # NOTE: the test pair is the last generated window, so it is also
        # the last element of the validation set.
        return (
            np.array(feature[:index]),
            np.array(label[:index]),
            np.array(feature[index:]),
            np.array(label[index:]),
            np.array(feature[-1]),
            np.array(label[-1]),
        )

    def _checkpoint_path(self, model_path):
        """Return the checkpoint file path for the configured ``TYPE``."""
        try:
            name = self._CHECKPOINT_NAMES[self.TYPE]
        except KeyError:
            raise ValueError('unsupported TYPE: %r' % (self.TYPE,)) from None
        return os.path.join(model_path, name)

    def _label_scale(self):
        """Return the divisor that was used to normalize the label field."""
        if self.TYPE == self._VOLUME_TYPE:
            return self.MAX_VOLUME_VALUE
        return self.MAX_VALUE

    def _build_model(self, train_feature):
        """Build and compile the network sized after ``train_feature``."""
        model = Sequential()
        model.add(LSTM(32,
                       input_shape=(train_feature.shape[1],
                                    train_feature.shape[2]),
                       activation='relu',
                       return_sequences=False))
        model.add(Dense(1))
        model.compile(loss='mean_squared_error', optimizer='adam')
        return model

    def _report_prediction(self, model, test_feature, test_label):
        """Print the prediction for the test window, raw and de-normalized."""
        # The model expects a batch dimension, so wrap the single window.
        prediction = model.predict(np.array([test_feature]))
        scale = self._label_scale()
        print(prediction[0], test_label)
        print(prediction[0] * scale, test_label * scale)

    def train(self, inFileName, model_path):
        """Train the model and save the best weights under ``model_path``.

        Training runs for at most 1000 epochs and stops early once
        ``val_loss`` has not improved for 5 epochs. Only the checkpoint with
        the best ``val_loss`` is kept. The prediction for the test window
        is printed at the end.

        Args:
            inFileName: Path of the SQLite database file.
            model_path: Directory receiving the checkpoint file; it is
                created when missing.

        Raises:
            ValueError: If ``TYPE`` has no checkpoint name or the database
                yields no training window.
        """
        # Resolve the path first so a bad TYPE fails before any heavy work.
        filename = self._checkpoint_path(model_path)
        stocks = self.read(inFileName)
        (train_feature, train_label,
         valid_feature, valid_label,
         test_feature, test_label) = self.make_dataset(stocks)
        print(train_feature.shape)
        print(train_label.shape)

        model = self._build_model(train_feature)
        if model_path:
            os.makedirs(model_path, exist_ok=True)
        early_stop = EarlyStopping(monitor='val_loss', patience=5)
        checkpoint = ModelCheckpoint(filename, monitor='val_loss', verbose=1,
                                     save_best_only=True, mode='auto')
        model.fit(train_feature, train_label,
                  epochs=1000,
                  batch_size=100000,
                  validation_data=(valid_feature, valid_label),
                  callbacks=[early_stop, checkpoint])

        self._report_prediction(model, test_feature, test_label)

    def predict(self, inFileName, model_path):
        """Load the saved weights and print the prediction for the test window.

        Args:
            inFileName: Path of the SQLite database file.
            model_path: Directory containing the checkpoint written by
                :meth:`train`.

        Raises:
            ValueError: If ``TYPE`` has no checkpoint name or the database
                yields no window.
        """
        filename = self._checkpoint_path(model_path)
        stocks = self.read(inFileName)
        (train_feature, _train_label,
         _valid_feature, _valid_label,
         test_feature, test_label) = self.make_dataset(stocks)

        # The dataset is rebuilt only to recover the input shape and the
        # test window; no training happens here.
        model = self._build_model(train_feature)
        model.load_weights(filename)

        self._report_prediction(model, test_feature, test_label)


if __name__ == "__main__":
    PROJECT_HOME = "../.."
    inFileName = PROJECT_HOME + '/resources/stock.db'
    model_path = PROJECT_HOME + '/resources/model'

    pricePredictor = PricePredictor()
    pricePredictor.train(inFileName, model_path)
    #pricePredictor.predict(inFileName, model_path)