import logging
import os
import sqlite3
from math import nan

import pandas as pd
import plotly.graph_objects as go
from plotly import subplots

from hts.HTS import HTS
from stock.util.Stock2Vector import Stock2Vector
from stock.util.LabelChecker import LabelChecker
from hts.BuySellChecker_122630 import BuySellChecker_122630
from hts.BuySellChecker_233740 import BuySellChecker_233740
from hts.BuySellChecker_251340 import BuySellChecker_251340
from hts.BuySellChecker_252670 import BuySellChecker_252670
class Simulation(HTS):
    """Replay stored per-day data through a buy/sell checker and plot the result.

    For every requested day the class loads the data through ``Stock2Vector``,
    lets the stock-specific ``BuySellChecker_*`` analyse it and decide on
    transactions, and finally renders a six-row Plotly figure (volume,
    disparity, stochastic, RSI, MACD and candles with buy/sell markers).
    """

    stock2Vector = None
    buySellChecker = None

    _logger = logging.getLogger(__name__)

    # Stock code -> checker class used for that code.
    _CHECKERS = {
        '122630': BuySellChecker_122630,
        '233740': BuySellChecker_233740,
        '251340': BuySellChecker_251340,
        '252670': BuySellChecker_252670,
    }

    # Column dtypes enforced before plotting; every key must exist in the frame.
    _COLUMN_DTYPES = {
        'open': 'int', 'high': 'int', 'low': 'int', 'close': 'int', 'volume': 'int',
        'avg5': 'float', 'avg20': 'float', 'avg60': 'float', 'avg120': 'float', 'avg200': 'float',
        'disparity_avg5': 'float', 'disparity_avg20': 'float', 'disparity_avg60': 'float',
        'disparity_avg120': 'float', 'disparity_avg200': 'float',
        'fast_k': 'float', 'slow_k': 'float', 'slow_d': 'float',
        'macd': 'float', 'macds': 'float', 'macdo': 'float',
        'rsi': 'float', 'rsis': 'float',
    }

    # Line colour for every overlay that can be drawn on the candle chart.
    _OVERLAY_COLORS = {
        'avg5': '#F81191',
        'avg20': '#097F19',
        'avg30': '#097F19',
        'avg60': '#671BEA',
        'avg120': '#DFB809',
        'avg200': '#000000',
        'upper': '#000000',
        'lower': '#000000',
        'changeLine': '#14A200',
        'baseLine': 'orange',
        'laggingSpan': '#B50ABB',
        'leadingSpan1': 'black',
        'leadingSpan2': 'black',
    }

    # Overlays actually drawn on the candle chart.  Add any key of
    # _OVERLAY_COLORS here to show it; only the listed columns are read.
    _CANDLE_OVERLAYS = ('avg5', 'avg200')

    # (column, colour) of every line in the disparity subplot.
    _DISPARITY_LINES = (
        ('disparity_avg5', '#F81191'),
        ('disparity_avg20', '#097F19'),
        ('disparity_avg30', '#097F19'),
        ('disparity_avg60', '#671BEA'),
        ('disparity_avg120', '#DFB809'),
        ('disparity_avg200', '#000000'),
    )

    # Hover text for one candle.  Plotly renders "<br>" as a line break in
    # hover labels.
    _HOVER_TEMPLATE = "<br>".join((
        "{}",
        "open: {}",
        "close: {}",
        "high: {}",
        "low: {}",
        "volume: {}",
        "avg5: {:.2f}",
        "avg20: {:.2f}",
        "avg60: {:.2f}",
        "avg200: {:.2f}",
        "d_avg5: {:.6f}",
        "d_avg20: {:.6f}",
        "d_avg60: {:.6f}",
        "d_avg200: {:.6f}",
        "d_avg5_60: {:.6f}",
        "d_avg5_200: {:.6f}",
        "d_avg200-5: {:.6f}",
        "macd: {:.2f}",
        "slow_k: {:.2f}",
        "rsi: {:.2f}",
    ))

    def __init__(self, RESOURCE_PATH, stock_code):
        """Create a simulation for one stock code.

        Args:
            RESOURCE_PATH: Directory passed to ``HTS``, ``Stock2Vector`` and
                ``LabelChecker``; ``getLIMITInfo`` also looks for the SQLite
                file in it.
            stock_code: One of the codes in ``_CHECKERS``.  Any other value
                leaves ``buySellChecker`` as ``None`` (a warning is logged).
        """
        super().__init__(RESOURCE_PATH)
        self.RESOURCE_PATH = RESOURCE_PATH

        checker_cls = self._CHECKERS.get(stock_code)
        if checker_cls is None:
            self.buySellChecker = None
            self._logger.warning("No BuySellChecker registered for stock code %r", stock_code)
        else:
            self.buySellChecker = checker_cls()

        # Helper construction stays best-effort, but the failure is logged
        # instead of being swallowed silently.
        try:
            self.stock2Vector = Stock2Vector(RESOURCE_PATH)
            self.labelChecker = LabelChecker(RESOURCE_PATH)
        except Exception:
            self._logger.exception(
                "Could not initialise Stock2Vector/LabelChecker from %r", RESOURCE_PATH)
            # Make sure the attribute exists even when construction failed.
            if not hasattr(self, 'labelChecker'):
                self.labelChecker = None
        # NOTE: self.connect() is intentionally not called here.
        return

    @staticmethod
    def _buy_markers(buy_line, buy_weight_line):
        """Return (y values, colours, sizes) for the buy markers.

        Negative entries mean "no signal": they become NaN with size 0 so
        nothing is drawn.  Other entries are sized by their weight.
        """
        points, colors, sizes = [], [], []
        for value, weight in zip(buy_line, buy_weight_line):
            if value < 0:
                points.append(nan)
                colors.append("#ffffff")
                sizes.append(0)
            else:
                points.append(value)
                colors.append("#0C752E")
                sizes.append(10 + (5 * weight))
        return points, colors, sizes

    @staticmethod
    def _sell_markers(sell_line):
        """Return (y values, colours) for the sell markers; negatives become NaN."""
        points, colors = [], []
        for value in sell_line:
            if value < 0:
                points.append(nan)
                colors.append("#ffffff")
            else:
                points.append(value)
                colors.append("#00ced1")
        return points, colors

    @staticmethod
    def _volume_colors(data):
        """Return one bar colour per row: blue if close < open, red if close > open, else black."""
        colors = []
        for open_price, close_price in zip(data['open'], data['close']):
            if open_price > close_price:
                colors.append("#0000FF")
            elif open_price < close_price:
                colors.append("#FF0000")
            else:
                colors.append("#000000")
        return colors

    @classmethod
    def _hover_texts(cls, data):
        """Return the hover label of every candle in ``data``."""
        columns = ('date', 'open', 'close', 'high', 'low', 'volume',
                   'avg5', 'avg20', 'avg60', 'avg200',
                   'disparity_avg5', 'disparity_avg20', 'disparity_avg60', 'disparity_avg200',
                   'macd', 'slow_k', 'rsi')
        texts = []
        # Iterating the columns avoids integer-position lookups on a
        # datetime-labelled Series.
        for (date, open_price, close_price, high, low, volume,
             avg5, avg20, avg60, avg200,
             d5, d20, d60, d200,
             macd, slow_k, rsi) in zip(*(data[name] for name in columns)):
            texts.append(cls._HOVER_TEMPLATE.format(
                date,
                open_price, close_price, high, low, volume,
                avg5, avg20, avg60, avg200,
                d5, d20, d60, d200,
                max(d5, d20, d60) - min(d5, d20, d60),
                max(d5, d20, d60, d200) - min(d5, d20, d60, d200),
                abs(d200 - d5),
                macd, slow_k, rsi,
            ))
        return texts

    def draw(self, stock_code, given_day, data, bsLine):
        """Plot one day of indicators, candles and buy/sell markers.

        Args:
            stock_code: Code shown in the figure title.
            given_day: Day to plot as a ``YYYYMMDD`` string.
            data: DataFrame whose index is convertible to a DatetimeIndex and
                which has a ``date`` column plus the price/indicator columns
                listed in ``_COLUMN_DTYPES`` and ``disparity_avg30``.
            bsLine: Mapping with equally long ``buy``, ``buy_weight`` and
                ``sell`` sequences (negative value = no signal).  When it is
                ``None`` nothing is drawn.  It is not modified.
        """
        if bsLine is None:
            return

        # Keep only the rows of the requested day.  The full date is compared
        # so that the same day-of-month from another month is not kept.
        day_mask = pd.DatetimeIndex(data.index).strftime('%Y%m%d') == given_day
        data = data.loc[day_mask]

        # The signal lines cover the whole analysed window; keep their tail
        # so that they line up with the remaining rows.
        offset = len(bsLine['buy']) - len(data)
        buy_line = bsLine['buy'][offset:]
        buy_weight_line = bsLine['buy_weight'][offset:]
        sell_line = bsLine['sell'][offset:]

        data = data.astype(self._COLUMN_DTYPES)
        dates = data['date']

        buy_points, buy_colors, buy_sizes = self._buy_markers(buy_line, buy_weight_line)
        sell_points, sell_colors = self._sell_markers(sell_line)

        def plain_line(column, color):
            # Thin line named after its column.
            return go.Scatter(x=dates, y=data[column], name=column, line_color=color)

        def signal_pair(main_column, signal_column):
            # Solid red main line plus dash-dotted black signal line.
            return [
                go.Scatter(x=dates, y=data[main_column],
                           line=dict(color='red', width=2), name=main_column),
                go.Scatter(x=dates, y=data[signal_column],
                           line=dict(dash='dashdot', color='black', width=2), name=signal_column),
            ]

        buy_check = go.Scatter(x=dates, y=buy_points, mode='markers', name="buy",
                               marker=dict(size=buy_sizes, color=buy_colors, line_width=0))
        sell_check = go.Scatter(x=dates, y=sell_points, mode='markers', name="sell",
                                marker=dict(size=14, color=sell_colors, line_width=0))
        candle_stick = go.Candlestick(x=dates,
                                      open=data['open'],
                                      high=data['high'],
                                      low=data['low'],
                                      close=data['close'],
                                      increasing_line_color='red',
                                      decreasing_line_color='blue',
                                      showlegend=False,
                                      text=self._hover_texts(data),
                                      hoverinfo="text")

        volume_data = [go.Bar(x=dates, y=data["volume"],
                              marker_color=self._volume_colors(data), name='volume')]
        disparity_data = [plain_line(column, color) for column, color in self._DISPARITY_LINES]
        stochastic_data = signal_pair('slow_k', 'slow_d')
        rsi_data = signal_pair('rsi', 'rsis')
        macd_data = signal_pair('macd', 'macds')
        macd_data.append(go.Bar(x=dates, y=data["macdo"], marker_color='purple', name='macdo'))
        candle_data = [plain_line(name, self._OVERLAY_COLORS[name]) for name in self._CANDLE_OVERLAYS]
        candle_data += [buy_check, sell_check, candle_stick]

        fig = subplots.make_subplots(
            rows=6, cols=1,
            subplot_titles=("거래량", "이격도", "스토캐스틱", "RSI", "MACD", '캔들'),
            shared_xaxes=True, horizontal_spacing=0.03, vertical_spacing=0.01,
            row_heights=[200, 200, 200, 200, 200, 700]
        )
        rows = (volume_data, disparity_data, stochastic_data, rsi_data, macd_data, candle_data)
        for row_number, traces in enumerate(rows, start=1):
            for trace in traces:
                fig.add_trace(trace, row=row_number, col=1)

        # The title counts the signals of the whole bsLine, not only of the
        # plotted day; missing values count as "no signal".
        signals = pd.DataFrame(bsLine).fillna(-1)
        buy_count = int((signals["buy"] > 0).sum())
        sell_count = int((signals["sell"] > 0).sum())
        fig.update_layout(height=1700,
                          title=f"{stock_code}_{given_day}_{buy_count},{sell_count}")
        fig.show()
        return

    def makeTickData(self, data, mins=30):
        """Aggregate consecutive rows into sliding ``mins``-row bars.

        One bar is produced for every window of ``mins`` consecutive rows
        (windows overlap and advance by one row).

        Args:
            data: Mapping with ``time``, ``open``, ``close``, ``high``, ``low``
                and ``vol`` sequences of equal length that support slicing.
            mins: Number of rows per bar; must be at least 1.

        Returns:
            Dict with ``time``/``open``/``close``/``high``/``low``/``vol``
            lists, a ``check`` set holding the bar times and an empty
            ``label`` list.

        Raises:
            ValueError: If ``mins`` is smaller than 1.
        """
        if mins < 1:
            raise ValueError("mins must be a positive integer, got {!r}".format(mins))

        result = {"check": set(),
                  "time": [],
                  "open": [],
                  "close": [],
                  "high": [],
                  "low": [],
                  "vol": [],
                  "label": []}
        for end in range(mins, len(data['time']) + 1):
            start = end - mins
            last = end - 1
            bar_time = data['time'][last]
            result["check"].add(bar_time)
            result["time"].append(bar_time)
            result["open"].append(data['open'][start])
            result["close"].append(data['close'][last])
            result["high"].append(max(data['high'][start:end]))
            result["low"].append(min(data['low'][start:end]))
            result["vol"].append(sum(data['vol'][start:end]))
        return result

    def getLIMITInfo(self, stock_code, ymd, dbfile_name="stock.db"):
        """Return the mean close of the latest rows on or before ``ymd``.

        Reads the 100 newest rows for ``stock_code`` from the ``stock`` table,
        skips those dated after ``ymd`` and averages the close of the first
        20 remaining rows.

        Args:
            stock_code: Value matched against the ``code`` column.
            ymd: Reference day as a string; stored ``ymd`` values are compared
                to it after their dots are removed.
            dbfile_name: SQLite file name inside ``RESOURCE_PATH``.

        Returns:
            ``{'LIMIT_PRICE': <mean close>}``.

        Raises:
            ValueError: If no stored row is dated on or before ``ymd``.
        """
        conn = sqlite3.connect(os.path.join(self.RESOURCE_PATH, dbfile_name))
        try:
            cursor = conn.cursor()
            try:
                cursor.execute('select ymd, open, close, high, low, volume from stock where code=? order by ymd desc limit ?', (stock_code, 100, ))
                db_result = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            # Release the connection even when the query fails.
            conn.close()

        # Rows arrive newest first: everything from the first row that is not
        # newer than ymd onwards is used.
        first_match = next(
            (position for position, row in enumerate(db_result)
             if row[0].replace('.', '') <= ymd),
            None,
        )
        if first_match is None:
            raise ValueError(
                "no price rows on or before {} for stock code {}".format(ymd, stock_code))

        recent_closes = [row[2] for row in db_result[first_match:first_match + 20]]
        return {'LIMIT_PRICE': sum(recent_closes) / len(recent_closes)}

    def simulate(self, stock, analyzed_day=1000):
        """Run the checker for every day of ``stock`` and plot each result.

        Args:
            stock: Mapping with ``code`` (stock code) and ``ymd`` (iterable of
                ``YYYYMMDD`` day strings).
            analyzed_day: Maximum number of trailing rows of the analysed
                frame that are kept for the transaction check and the plot.

        Raises:
            RuntimeError: If the checker or the data loader was not created
                in ``__init__``.
        """
        if self.buySellChecker is None or self.stock2Vector is None:
            raise RuntimeError(
                "Simulation is not fully initialised: buySellChecker and "
                "stock2Vector must both be available")

        stock_code = stock['code']
        for ymd in stock['ymd']:
            LAST_DATA = self.stock2Vector.getLastData(stock_code, ymd)
            # Data as returned by the loader (1-minute candles per the original
            # comment); makeTickData() can aggregate it into wider bars.
            result = self.stock2Vector.getRealTime(stock_code, ymd, LAST_DATA)

            # analyze() is expected to add the indicator columns draw() needs
            # (moving averages, RSI, MACD, ... per the original comments).
            data = self.buySellChecker.analyze(result)

            # Keep only the trailing analyzed_day rows.  The surplus is
            # clamped at 0 so that a short frame is left untouched.
            surplus = max(len(data) - analyzed_day, 0)
            data.drop(data.index[:surplus], inplace=True)

            INFO = self.getLIMITInfo(stock_code, ymd)
            # Decide where to buy and where to sell.
            bsLine = self.buySellChecker.checkTransaction(stock_code, data, INFO, isRealTime=False)
            # Draw the chart for this day.
            self.draw(stock_code, ymd, data, bsLine)
        return
if __name__ == "__main__":
    # Resources are looked up relative to the current working directory.
    PROJECT_HOME = os.getcwd()
    RESOURCE_PATH = os.path.join(PROJECT_HOME, "resources")

    # Days to replay, as YYYYMMDD strings.
    day_list = [
        '20231016', '20231017', '20231018', '20231019', '20231020',
        '20231023', '20231024', '20231025', '20231026', '20231027',
        '20231030', '20231031', '20231101', '20231102',
    ]

    # Stock whose buy signals are being checked.  The other codes with a
    # registered checker are 122630 (KODEX Leverage), 233740 (KODEX KOSDAQ150
    # Leverage) and 251340 (KODEX KOSDAQ150 Futures Inverse); swap the
    # 'code'/'name' values below to simulate one of them instead.
    stock = {
        'code': '252670',
        'name': 'KODEX 200선물인버스2X',
        'ymd': day_list,
    }

    simulation = Simulation(RESOURCE_PATH, stock['code'])
    simulation.simulate(stock)

    print("done...")