Files
DeepStock/Simulation.py
dosangyoon d230a51652 init
2022-07-29 17:22:23 +09:00

232 lines
9.3 KiB
Python

from math import nan
from datetime import datetime, timedelta
import csv
import pandas as pd
import plotly.graph_objects as go
from plotly import subplots
import sqlite3
import os
from hts.BuySellChecker import BuySellChecker
class Simulation:
buySellChecker = None
stock_code = None
def __init__(self, RESOURCE_PATH, stock_code):
self.buySellChecker = BuySellChecker()
self.stock_code = stock_code
self.RESOURCE_PATH = RESOURCE_PATH
#self.connect()
return
def getCSV(self, fileName, given_day, result):
with open(fileName, 'r', encoding='utf-8') as infp:
reader = csv.reader(infp)
next(reader)
for rows in reader:
days = rows[0] # hts.날짜
time = rows[1] # hts.시간
open_v = rows[2] # hts.시가
high = rows[3] # hts.고가
low = rows[4] # hts.저가
close = rows[5] # hts.종가
vol = rows[6] # hts.거래량
start_time = datetime.strptime(given_day + " 090000", '%Y%m%d %H%M%S')
temp = datetime.strptime(str(days) + " " + str(time).zfill(4)+"00", '%Y%m%d %H%M%S')
#if temp < start_time:
# continue
result["time"].append(temp)
result["open"].append(int(open_v))
result["close"].append(int(close))
result["high"].append(int(high))
result["low"].append(int(low))
result["vol"].append(int(vol))
return
def getDBData(self, stock_code, lastday, result):
tableName = 'hts'
conn = sqlite3.connect(os.path.join(self.RESOURCE_PATH, "hts.db"))
cursor = conn.cursor()
cursor.execute('SELECT ymd, hms, open, high, low, close, volume FROM ' + tableName + ' WHERE CODE=? and ymd=? order by ymd, hms', (stock_code, lastday,))
db_result = cursor.fetchall()
for rows in db_result:
ymd = rows[0] # hts.날짜
hms = rows[1] # hts.시간
open = rows[2] # hts.시가
high = rows[3] # hts.고가
low = rows[4] # hts.저가
close = rows[5] # hts.종가
vol = rows[6] # hts.거래량
temp = datetime.strptime(str(ymd) + " " + str(hms).zfill(4) + "00", '%Y%m%d %H%M%S')
result["time"].append(temp)
result["open"].append(int(open))
result["close"].append(int(close))
result["high"].append(int(high))
result["low"].append(int(low))
result["vol"].append(int(vol))
return
def draw(self, stock_code, given_day, data, bsLine):
# 어제 데이터는 지운다.
data = data.loc[pd.DatetimeIndex(data.index).day == int(given_day[6:])]
buy_line = bsLine['buy'][381:]
sell_line = bsLine['sell'][381:]
#buy_line = bsLine['buy']
#sell_line = bsLine['sell']
# 그래프 설정을 위한 변수를 생성한다.
data = data.astype({'open': 'int',
'high': 'int',
'low': 'int',
'close': 'int',
'volume': 'int',
'avg3': 'float',
'avg5': 'float',
'avg10': 'float',
'avg20': 'float',
'avg30': 'float',
'avg60': 'float',
'fast_k': 'float',
'slow_k': 'float',
'slow_d': 'float',
'rsi': 'float',
'rsis': 'float'
})
buy_colors = []
for i in range(len(buy_line)):
if buy_line[i] < 0:
buy_colors.append("#ffffff")
buy_line[i] = nan
else:
buy_colors.append("#ff00ff")
sell_colors = []
for i in range(len(sell_line)):
if sell_line[i] < 0:
sell_colors.append("#ffffff")
sell_line[i] = nan
else:
sell_colors.append("#00ced1")
# 그래프를 설정한다.
buy_check = go.Scatter(x=data['date'], y=buy_line, mode='markers', name="buy", marker=dict(size=14, color=buy_colors, line_width=0))
sell_check = go.Scatter(x=data['date'], y=sell_line, mode='markers', name="sell", marker=dict(size=14, color=sell_colors, line_width=0))
upper = go.Scatter(x=data['date'], y=data["upper"], name="upper", line_color='#000000')
lower = go.Scatter(x=data['date'], y=data["lower"], name="lower", line_color='#000000')
avg3 = go.Scatter(x=data['date'], y=data["avg3"], name="avg3", line_color='#1469F4')
avg5 = go.Scatter(x=data['date'], y=data["avg5"], name="avg5", line_color='#089B5B')
avg10 = go.Scatter(x=data['date'], y=data["avg10"], name="avg10", line_color='#ff00ff')
avg20 = go.Scatter(x=data['date'], y=data["avg20"], name="avg20", line_color='#8F8203')
avg30 = go.Scatter(x=data['date'], y=data["avg30"], name="avg30", line_color='#000000')
#avg60 = go.Scatter(x=hts['date'], y=hts["avg60"], name="avg60", line_color='#008000')
candle_stick = go.Candlestick(x=data['date'], open=data['open'], high=data['high'], low=data['low'], close=data['close'], increasing_line_color='red', decreasing_line_color='blue')
volume_line = go.Scatter(x=data['date'], y=data["volume"], mode='lines', name='volume')
#fast_k_line = go.Scatter(x=hts['date'], y=hts["fast_k"], mode='lines', name='fast_k')
macd_line = go.Scatter(x=data['date'], y=data["macd"], mode='lines', name='macd')
macd_s_line = go.Scatter(x=data['date'], y=data["macds"], mode='lines', name='macds')
macd_o_line = go.Scatter(x=data['date'], y=data["macdo"], mode='lines', name='macdo')
slow_k_line = go.Scatter(x=data['date'], y=data["slow_k"], mode='lines', name='slow_k')
slow_d_line = go.Scatter(x=data['date'], y=data["slow_d"], mode='lines', name='slow_d')
rsi_line = go.Scatter(x=data['date'], y=data["rsi"], mode='lines', name='rsi')
rsis_line = go.Scatter(x=data['date'], y=data["rsis"], mode='lines', name='rsis')
candle_data = [candle_stick, upper, lower, avg3, avg5, avg10, avg20, avg30, buy_check, sell_check]
volume_data = [volume_line]
macd_data = [macd_line, macd_s_line, macd_o_line]
stochastic_data = [slow_k_line, slow_d_line]
rsi_data = [rsi_line, rsis_line]
# 그래프를 그린다.
"""
fig = go.Figure(hts=candle_data)
fig.update_layout(title=stock_code + "_" + given_day)
fig.show()
"""
fig = subplots.make_subplots(rows=5, cols=1, subplot_titles=('캔들', "거래량", "MACD", "스토캐스틱", "RSI"))
for trace in candle_data:
fig.append_trace(trace, 1, 1)
for trace in volume_data:
fig.append_trace(trace, 2, 1)
for trace in macd_data:
fig.append_trace(trace, 3, 1)
for trace in stochastic_data:
fig.append_trace(trace, 4, 1)
for trace in rsi_data:
fig.append_trace(trace, 5, 1)
#fig.update_xaxes(nticks=5)
#fig.update_layout(height=1800, title=stock_code + "_" + given_day, xaxis_rangeslider_visible=False)
df = pd.DataFrame(bsLine)
df = df.fillna(-1)
buy_count = len(df.loc[df["buy"] > 0])
sell_count = len(df.loc[df["sell"] > 0])
fig.update_layout(height=5000, title=stock_code + "_" + given_day + "_" + str(buy_count)+","+str(sell_count))
fig.show()
return
def simulate(self, today):
result = {"check": set(),
"time": [],
"open": [],
"close": [],
"high": [],
"low": [],
"vol": []}
# 데이터를 가지고 온다.
#self.getCSV(path + "/" + self.stock_code + "_" + last_day + ".csv", last_day, result)
#self.getCSV(path + "/" + self.stock_code + "_" + today + ".csv", last_day, result)
for i in range(1, 10):
last_day = (datetime.strptime(today, '%Y%m%d') - timedelta(i)).strftime('%Y%m%d')
self.getDBData(self.stock_code, last_day, result)
if len(result['time']) > 0:
break
self.getDBData(self.stock_code, today, result)
# 분석을 통해서 볼린저밴드 상/하단을 계산한다.
data = self.buySellChecker.analyze(result)
# 사야 할 시점과 팔아야 할 시점을 체크한다.
bsLine = self.buySellChecker.checkTransaction(data, self.stock_code)
# 그래프를 그린다.
self.draw(self.stock_code, today, data, bsLine)
return
if __name__ == "__main__":
PROJECT_HOME = os.path.join(os.path.dirname(__file__))
RESOURCE_PATH = os.path.join(PROJECT_HOME, "resources")
# to check bying
stock_codes = {
# 252670
# 122630
"122630": ['20220725'],
}
for stock_code in stock_codes:
simulation = Simulation(RESOURCE_PATH, stock_code)
for given_day in stock_codes[stock_code]:
simulation.simulate(given_day)
print ("done...")