This commit is contained in:
dsyoon
2023-01-15 14:56:34 +09:00
parent 5821f7bfa5
commit 11593dd324
8 changed files with 752 additions and 798 deletions

View File

@@ -2,7 +2,7 @@ import time
import os
from datetime import datetime
from hts.DailyStatus import DailyStatus
from stock.analysis.DailyStatus import DailyStatus
from hts.HTS import HTS
from hts.OrderType import OrderType

View File

@@ -1,4 +1,3 @@
import numpy as np
from math import nan
import pandas as pd
import plotly.graph_objects as go
@@ -9,6 +8,7 @@ from hts.HTS import HTS
from stock.util.Stock2Vector import Stock2Vector
from stock.util.LabelChecker import LabelChecker
from stock.util.StockPredictor import StockPredictor
from stock.analysis.DailyStatus import DailyStatus
from hts.BuySellChecker import BuySellChecker
class Simulation (HTS):
@@ -163,36 +163,24 @@ class Simulation (HTS):
return
def simulate(self, stock_code, today, method="rule"):
def simulate(self, stock_codes:dict=None):
if method == "answer":
#self.labelMaker.makeCandidate(stock_code, today, view=True)
self.labelChecker.showLabels(stock_code, today)
else:
if method == "ml":
LAST_DATA = self.stock2Vector.getLastData(stock_code, today, n=3)
result = self.stock2Vector.getRealTime(stock_code, today, LAST_DATA)
X, Y = self.stock2Vector.getVectorData(result)
predY = self.stockPredictor.predict(X, Y)
predY = np.argmax(predY, axis=1)
# 이동평균, RSI, MACD, 일목균형, 볼린저밴드 상/하단을 계산한다.
data = self.buySellChecker.analyze(result)
# 사야 할 시점과 팔아야 할 시점을 체크한다.
bsLine, data = self.buySellChecker.checkTransactionML(data, stock_code, predY, isRealTime=False)
else:
LAST_DATA = self.stock2Vector.getLastData(stock_code, today)
result = self.stock2Vector.getRealTime(stock_code, today, LAST_DATA)
if stock_codes is not None:
for stock_code in stock_codes:
for given_day in stock_codes[stock_code]:
LAST_DATA = self.stock2Vector.getLastData(stock_code, given_day)
result = self.stock2Vector.getRealTime(stock_code, given_day, LAST_DATA)
# 이동평균, RSI, MACD, 일목균형, 볼린저밴드 상/하단을 계산한다.
data = self.buySellChecker.analyze(result)
# 사야 할 시점과 팔아야 할 시점을 체크한다.
bsLine, data = self.buySellChecker.checkTransaction(data, stock_code, isRealTime=False)
if data is not None:
# 그래프를 그린다.
self.draw(stock_code, today, data, bsLine)
self.draw(stock_code, given_day, data, bsLine)
else:
dailyStatus = DailyStatus(self.RESOURCE_PATH)
dailyStatus.checkEnvelope()
return
@@ -201,6 +189,8 @@ if __name__ == "__main__":
PROJECT_HOME = os.getcwd()
RESOURCE_PATH = os.path.join(PROJECT_HOME, "resources")
simulation = Simulation(RESOURCE_PATH)
# to check bying
stock_codes = {
"252670": [
@@ -208,24 +198,7 @@ if __name__ == "__main__":
'20220908','20220913','20220914','20220915','20220916'
]
}
"""
# 122630
"252670": [
'20220801', '20220802', '20220803', '20220804', '20220805',
'20220808', '20220809', '20220810', '20220811', '20220812',
'20220816', '20220817', '20220818', '20220819', '20220822',
'20220823', '20220824', '20220825', '20220826', '20220829',
'20220830', '20220831',
'20220901', '20220902', '20220905', '20220906','20220907',
'20220908'
],
"""
method = "rule" # "rule", "ml", "answer"
for stock_code in stock_codes:
simulation = Simulation(RESOURCE_PATH)
for given_day in stock_codes[stock_code]:
simulation.simulate(stock_code, given_day, method=method)
#simulation.simulate(stock_codes)
simulation.simulate()
print ("done...")

View File

@@ -1,388 +0,0 @@
import numpy as np
from math import nan
import shutil
import sqlite3
import pandas as pd
import plotly.graph_objects as go
from plotly import subplots
import plotly.io as po
import os
from datetime import datetime
from hts.HTS import HTS
from hts.DailyStatus import DailyStatus
from hts.BuySellChecker import BuySellChecker
class Simulation (HTS):
buySellChecker = None
stockPredictor = None
dailyStatus = None
def __init__(self, RESOURCE_PATH):
super().__init__(RESOURCE_PATH)
self.RESOURCE_PATH = RESOURCE_PATH
self.buySellChecker = BuySellChecker()
self.dailyStatus = DailyStatus(RESOURCE_PATH)
return
def draw(self, stock_code, given_day, data, bsLine):
if bsLine is None:
return
# 어제 데이터는 지운다.
buy_line = bsLine['buy']
buy_weight_line = bsLine['buy_weight']
sell_line = bsLine['sell']
buy_size = []
buy_colors = []
for i in range(len(buy_line)):
if buy_line[i] < 0:
buy_colors.append("#ffffff")
buy_line[i] = nan
buy_size.append(0)
else:
buy_colors.append("#B2028C")
buy_size.append(10 + (0.1 * buy_weight_line[i]))
sell_colors = []
for i in range(len(sell_line)):
if sell_line[i] < 0:
sell_colors.append("#ffffff")
sell_line[i] = nan
else:
sell_colors.append("#00ced1")
# 그래프를 설정한다.
buy_check = go.Scatter(x=data['ymd'], y=buy_line, mode='markers', name="buy", marker=dict(size=buy_size, color=buy_colors, line_width=0))
sell_check = go.Scatter(x=data['ymd'], y=sell_line, mode='markers', name="sell", marker=dict(size=14, color=sell_colors, line_width=0))
envelope_upper = go.Scatter(x=data['ymd'], y=data["envelope_upper"], name="upper", line_color='#000000')
envelope_middle = go.Scatter(x=data['ymd'], y=data["envelope_middle"], name="upper", line_color='#927786')
envelope_lower = go.Scatter(x=data['ymd'], y=data["envelope_lower"], name="lower", line_color='#000000')
avg5 = go.Scatter(x=data['ymd'], y=data["avg5"], name="avg5", line_color='#6C2507')
avg20 = go.Scatter(x=data['ymd'], y=data["avg20"], name="avg20", line_color='#f84c43')
avg60 = go.Scatter(x=data['ymd'], y=data["avg60"], name="avg60", line_color='#f89543')
candle_stick = go.Candlestick(x=data['ymd'], open=data['open'], high=data['high'], low=data['low'], close=data['close'], increasing_line_color='red', decreasing_line_color='blue', showlegend=False)
macd_line = go.Scatter(x=data['ymd'], y=data["macd"], line=dict(color='red', width=2), name='macd')
macd_s_line = go.Scatter(x=data['ymd'], y=data["macds"], line=dict(dash='dashdot', color='black', width=2), name='macds')
# fast_k_line = go.Scatter(x=hts['date'], y=hts["fast_k"], mode='lines', name='fast_k')
slow_k_line = go.Scatter(x=data['ymd'], y=data["slow_k"], line=dict(color='red', width=2), name='slow_k')
slow_d_line = go.Scatter(x=data['ymd'], y=data["slow_d"], line=dict(dash='dashdot', color='black', width=2), name='slow_d')
rsi_line = go.Scatter(x=data['ymd'], y=data["rsi"], line=dict(color='red', width=2), name='rsi')
rsis_line = go.Scatter(x=data['ymd'], y=data["rsis"], line=dict(dash='dashdot', color='black', width=2), name='rsis')
disparity_avg5 = go.Scatter(x=data['ymd'], y=data["disparity_avg5"], name="disparity_avg5", line_color='#8F8203')
disparity_avg20 = go.Scatter(x=data['ymd'], y=data["disparity_avg20"], name="disparity_avg20", line_color='#ff00ff')
disparity_avg60 = go.Scatter(x=data['ymd'], y=data["disparity_avg60"], name="disparity_avg60", line_color='#1469F4')
candle_data = [candle_stick, avg5, avg20, avg60, envelope_upper, envelope_middle, envelope_lower, buy_check, sell_check]
disparity_data = [disparity_avg5, disparity_avg20, disparity_avg60]
macd_data = [macd_line, macd_s_line]
stochastic_data = [slow_k_line, slow_d_line]
rsi_data = [rsi_line, rsis_line]
# 그래프를 그린다.
"""
fig = go.Figure(data=candle_data)
fig.update_layout(title=stock_code + "_" + given_day)
fig.show()
"""
fig = subplots.make_subplots(
rows=5, cols=1,
subplot_titles=("MACD", "RSI", "스토캐스틱", '이격도', '캔들'),
#specs=[[{}], [{}], [{}], [{}], [{}], [{}]],
shared_xaxes=True, horizontal_spacing=0.03, vertical_spacing=0.01,
row_heights=[200, 200, 200, 200, 750]
)
for trace in macd_data:
fig.append_trace(trace, 1, 1)
for trace in rsi_data:
fig.append_trace(trace, 2, 1)
for trace in stochastic_data:
fig.append_trace(trace, 3, 1)
for trace in disparity_data:
fig.append_trace(trace, 4, 1)
for trace in candle_data:
fig.append_trace(trace, 5, 1)
#fig.update_xaxes(nticks=5)
#fig.update_layout(height=1800, title=stock_code + "_" + given_day, xaxis_rangeslider_visible=False)
df = pd.DataFrame(bsLine)
df = df.fillna(-1)
buy_count = len(df.loc[df["buy"] > 0])
sell_count = len(df.loc[df["sell"] > 0])
fig.update_layout(height=1700, title=stock_code + "_" + given_day + "_" + str(buy_count)+","+str(sell_count))
#fig.update_layout(title=stock_code + "_" + given_day + "_" + str(buy_count) + "," + str(sell_count))
fig.show()
return
def writeFile(self, dailyDirName, stock_code, stock_name, given_day, data, bsLine):
if bsLine is None:
return
# 어제 데이터는 지운다.
buy_line = bsLine['buy']
buy_weight_line = bsLine['buy_weight']
sell_line = bsLine['sell']
buy_size = []
buy_colors = []
for i in range(len(buy_line)):
if buy_line[i] < 0:
buy_colors.append("#ffffff")
buy_line[i] = nan
buy_size.append(0)
else:
buy_colors.append("#B2028C")
buy_size.append(10 + (0.1 * buy_weight_line[i]))
sell_colors = []
for i in range(len(sell_line)):
if sell_line[i] < 0:
sell_colors.append("#ffffff")
sell_line[i] = nan
else:
sell_colors.append("#00ced1")
# 그래프를 설정한다.
buy_check = go.Scatter(x=data['ymd'], y=buy_line, mode='markers', name="buy", marker=dict(size=buy_size, color=buy_colors, line_width=0))
sell_check = go.Scatter(x=data['ymd'], y=sell_line, mode='markers', name="sell", marker=dict(size=14, color=sell_colors, line_width=0))
envelope_upper = go.Scatter(x=data['ymd'], y=data["envelope_upper"], name="upper", line_color='#000000')
envelope_middle = go.Scatter(x=data['ymd'], y=data["envelope_middle"], name="upper", line_color='#927786')
envelope_lower = go.Scatter(x=data['ymd'], y=data["envelope_lower"], name="lower", line_color='#000000')
avg5 = go.Scatter(x=data['ymd'], y=data["avg5"], name="avg5", line_color='#6C2507')
avg20 = go.Scatter(x=data['ymd'], y=data["avg20"], name="avg20", line_color='#f84c43')
avg60 = go.Scatter(x=data['ymd'], y=data["avg60"], name="avg60", line_color='#f89543')
candle_stick = go.Candlestick(x=data['ymd'], open=data['open'], high=data['high'], low=data['low'], close=data['close'], increasing_line_color='red', decreasing_line_color='blue', showlegend=False)
macd_line = go.Scatter(x=data['ymd'], y=data["macd"], line=dict(color='red', width=2), name='macd')
macd_s_line = go.Scatter(x=data['ymd'], y=data["macds"], line=dict(dash='dashdot', color='black', width=2), name='macds')
# fast_k_line = go.Scatter(x=hts['date'], y=hts["fast_k"], mode='lines', name='fast_k')
slow_k_line = go.Scatter(x=data['ymd'], y=data["slow_k"], line=dict(color='red', width=2), name='slow_k')
slow_d_line = go.Scatter(x=data['ymd'], y=data["slow_d"], line=dict(dash='dashdot', color='black', width=2), name='slow_d')
rsi_line = go.Scatter(x=data['ymd'], y=data["rsi"], line=dict(color='red', width=2), name='rsi')
rsis_line = go.Scatter(x=data['ymd'], y=data["rsis"], line=dict(dash='dashdot', color='black', width=2), name='rsis')
disparity_avg5 = go.Scatter(x=data['ymd'], y=data["disparity_avg5"], name="disparity_avg5", line_color='#8F8203')
disparity_avg20 = go.Scatter(x=data['ymd'], y=data["disparity_avg20"], name="disparity_avg20", line_color='#ff00ff')
disparity_avg60 = go.Scatter(x=data['ymd'], y=data["disparity_avg60"], name="disparity_avg60", line_color='#1469F4')
candle_data = [candle_stick, avg5, avg20, avg60, envelope_upper, envelope_middle, envelope_lower, buy_check, sell_check]
disparity_data = [disparity_avg5, disparity_avg20, disparity_avg60]
macd_data = [macd_line, macd_s_line]
stochastic_data = [slow_k_line, slow_d_line]
rsi_data = [rsi_line, rsis_line]
# 그래프를 그린다.
"""
fig = go.Figure(data=candle_data)
fig.update_layout(title=stock_code + "_" + given_day)
fig.show()
"""
fig = subplots.make_subplots(
rows=5, cols=1,
subplot_titles=("MACD", "RSI", "스토캐스틱", '이격도', '캔들'),
#specs=[[{}], [{}], [{}], [{}], [{}], [{}]],
shared_xaxes=True, horizontal_spacing=0.03, vertical_spacing=0.01,
row_heights=[200, 200, 200, 200, 750]
)
for trace in macd_data:
fig.append_trace(trace, 1, 1)
for trace in rsi_data:
fig.append_trace(trace, 2, 1)
for trace in stochastic_data:
fig.append_trace(trace, 3, 1)
for trace in disparity_data:
fig.append_trace(trace, 4, 1)
for trace in candle_data:
fig.append_trace(trace, 5, 1)
#fig.update_xaxes(nticks=5)
#fig.update_layout(height=1800, title=stock_code + "_" + given_day, xaxis_rangeslider_visible=False)
df = pd.DataFrame(bsLine)
df = df.fillna(-1)
buy_count = len(df.loc[df["buy"] > 0])
sell_count = len(df.loc[df["sell"] > 0])
fig.update_layout(height=1700, title=stock_code + "_" + given_day + "_" + str(buy_count)+","+str(sell_count))
#fig.update_layout(title=stock_code + "_" + given_day + "_" + str(buy_count) + "," + str(sell_count))
#fig.show()
fileName = "%s/%s_%s.html" % (dailyDirName, stock_code, stock_name.replace(" ", ""))
po.write_html(fig, file=fileName, auto_open=False)
return
def getData(self, today, stock):
close = stock['PRICE'][len(stock['PRICE']) - 1]["close"]
open = stock['PRICE'][len(stock['PRICE']) - 1]["open"]
high = stock['PRICE'][len(stock['PRICE']) - 1]["high"]
low = stock['PRICE'][len(stock['PRICE']) - 1]["low"]
volume = stock['PRICE'][len(stock['PRICE']) - 1]["volume"]
stock['PRICE'].append(
{
"ymd": today,
"close": close,
"diff": stock['PRICE'][len(stock['PRICE']) - 1]["close"] - close,
"open": open,
"high": high,
"low": low,
"volume": volume,
"avg3": -1,
"avg4": -1,
"avg5": -1,
"avg6": -1,
"avg10": -1,
"avg12": -1,
"avg20": -1,
"avg36": -1,
"avg40": -1,
"avg48": -1,
"avg60": -1,
"avg120": -1,
"avg200": -1,
"avg240": -1,
"avg300": -1,
"disparity_avg5": -1,
"disparity_avg10": -1,
"disparity_avg20": -1,
"disparity_avg60": -1,
"disparity_avg120": -1,
"bolingerband_upper": -1,
"bolingerband_lower": -1,
"bolingerband_middle": -1,
"envelope_upper": -1,
"envelope_lower": -1,
"envelope_middle": -1,
"ichimokucloud_changeLine": -1,
"ichimokucloud_baseLine": -1,
"ichimokucloud_leadingSpan1": -1,
"ichimokucloud_leadingSpan2": -1,
"stochastic_fast_k": -1,
"stochastic_slow_k": -1,
"stochastic_slow_d": -1,
"rsi": -1,
"rsis": -1,
"macd": -1,
"macds": -1,
"macdo": -1,
}
)
return
def simulate(self, stock_code=None, isRealTime=False):
if not isRealTime:
n = 200
else:
n = 200
if stock_code is not None:
stock = self.dailyStatus.getLastData(stock_code, n)
today = datetime.today().strftime('%Y%m%d')
self.getData(today, stock)
analyzed_day = 60
data = self.dailyStatus.analyze(stock, analyzed_day)
# 분석일 데이터만 활용한다 (이전 데이터는 제거)
data.drop(data.index[:analyzed_day], inplace=True)
# print logs
for i in range(len(data.index)):
print (i, data.index[i], data['macd'][i], data['slow_k'][i], data['gradients_low'][i], data['gradients_avg5'][i], data['gradients_avg20'][i], data['gradients_avg60'][i], data['disparity_avg5'][i], data['disparity_avg20'][i], data['disparity_avg60'][i], data['disparity'][i], data['disparity_type'][i])
bsLine, data = self.buySellChecker.checkTransactionWithEnvelope(data, stock_code, isRealTime=False)
# 그래프를 그린다.
self.draw(stock_code, today, data, bsLine)
else:
stockTableName = 'stock'
PROJECT_HOME = os.path.join(os.path.dirname(__file__))
stockFileName = PROJECT_HOME + '/resources/stock.db'
conn = sqlite3.connect(stockFileName)
cursor = conn.cursor()
cursor.execute('SELECT distinct code, name FROM ' + stockTableName + ' order by code')
items = cursor.fetchall()
cursor.close()
conn.close()
today = datetime.today().strftime('%Y%m%d')
dailyDirName = os.path.join(RESOURCE_PATH, 'analysis', today, 'daily')
if os.path.exists(dailyDirName):
shutil.rmtree(dailyDirName)
dailyDirName = os.path.join(RESOURCE_PATH, 'analysis', today)
if not os.path.isdir(dailyDirName):
os.mkdir(dailyDirName)
dailyDirName = os.path.join(dailyDirName, 'daily')
if not os.path.isdir(dailyDirName):
os.mkdir(dailyDirName)
for idx, item in enumerate(items):
stock_code = item[0]
stock_name = item[1]
print(idx, stock_code, stock_name)
print("Analysis # :", idx, ", CODE: ", stock_code, ", NAME: ", stock_name)
stock = self.dailyStatus.getLastData(stock_code, n)
today = datetime.today().strftime('%Y%m%d')
self.getData(today, stock)
analyzed_day = 60
data = self.dailyStatus.analyze(stock, analyzed_day)
# 분석일 데이터만 활용한다 (이전 데이터는 제거)
data.drop(data.index[:analyzed_day], inplace=True)
# print logs
# for i in range(len(data.index)):
# print (i, data.index[i], data['macd'][i], data['slow_k'][i], data['gradients_low'][i], data['gradients_avg5'][i], data['gradients_avg20'][i], data['gradients_avg60'][i], data['disparity_avg5'][i], data['disparity_avg20'][i], data['disparity_avg60'][i], data['disparity'][i], data['disparity_type'][i])
bsLine, data = self.buySellChecker.checkTransactionWithEnvelope(data, stock_code, isRealTime=False)
# 그래프를 그린다.
if len(data.index) > 10 and max(bsLine['buy'][len(bsLine['buy'])-2:]) > 0:
self.writeFile(dailyDirName, stock_code, stock_name, today, data, bsLine)
return
if __name__ == "__main__":
PROJECT_HOME = os.getcwd()
RESOURCE_PATH = os.path.join(PROJECT_HOME, "resources")
simulation = Simulation(RESOURCE_PATH)
# to check buying
#stock_codes = ["252670", "122630"]
#stock_codes = ["051600", "139130", "066570", "000990"]
stock_codes = ["102950"]
"""
for stock_code in stock_codes:
simulation.simulate(stock_code)
"""
simulation.simulate()
print ("done...")

View File

@@ -7,6 +7,7 @@ from stock.crawler.FnGuideCrawler import FnGuideCrawler
from stock.crawler.MetaCrawler import MetaCrawler
from stock.crawler.StockCrawler import StockCrawler
from stock.analysis.AnalyzerSqlite import AnalyzerSqlite
from stock.analysis.DailyStatus import DailyStatus
today = datetime.now().strftime("%Y-%m-%d")
@@ -16,7 +17,8 @@ PROJECT_HOME = os.getcwd()
START_DATE = "2000.01.01"
start = time.time()
stockFileName = PROJECT_HOME + '/resources/stock.db'
RESOURCE_PATH = os.path.join(PROJECT_HOME, 'resources')
stockFileName = os.path.join(RESOURCE_PATH, 'stock.db')
week = datetime.today().weekday()
@@ -71,9 +73,13 @@ if week in (0, 1, 2, 3, 4): # 0:월, 1:화, 2:수, 3:목, 4:금, 5:토, 6:일
if os.path.isdir(outPath):
shutil.rmtree(outPath)
os.mkdir(outPath)
print("print to Html...")
analyzerSqlite.findCandidate(outPath)
print("print to Html...")
# envelopes를 이용한 daily check
dailyStatus = DailyStatus(RESOURCE_PATH)
dailyStatus.checkEnvelope()
analyzerSqlite.findCandidate(outPath)
print("time : %6.2f", (time.time() - start))
print ("done...")

View File

@@ -1071,12 +1071,27 @@ class BuySellChecker:
bsLine['sell'] = [-1 for i in range(size)]
bsLine['sell_weight'] = [-1 for i in range(size)]
gap_interval = 60
gap_state = False
for i in range(size):
if isRealTime:
if i < size - 1:
continue
if i > 10:
# 만약 전일 저가와 오늘 종의 차이가 1만원이 넘으면 향후 60일은 분석하지 않는다.
if data['low'][i-1] - data['high'][i] > 10000:
gap_state = True
gap_interval -= 1
continue
if gap_state:
if gap_interval <= 0:
gap_state = False
gap_interval = 60
else:
gap_interval -= 1
continue
if data['disparity'][i] < 2:
check = True
for l in range(i-3, i):

View File

@@ -1,356 +0,0 @@
import os.path
import pandas as pd
import platform
if platform.system().lower().find("window") >= 0 and platform.architecture()[0] != "64bit" :
import win32com.client
import sqlite3
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from stock.analysis.AnalyzerSqlite import AnalyzerSqlite
class DailyStatus:
tableName = None
dbFileName = None
RESOURCE_PATH = None
analyzerSqlite = None
def __init__(self, RESOURCE_PATH):
self.RESOURCE_PATH = RESOURCE_PATH
self.tableName = 'stock'
self.dbFileName = "stock.db"
self.analyzerSqlite = AnalyzerSqlite(os.path.join(self.RESOURCE_PATH, self.dbFileName))
return
def getDBData(self, stock_code, day, result):
conn = sqlite3.connect(os.path.join(self.RESOURCE_PATH, self.dbFileName))
cursor = conn.cursor()
cursor.execute('SELECT ymd, close, open, high, low, envelope_upper, envelope_lower, envelope_middle, rsi, rsis, macd, macds, stochastic_slow_k, stochastic_slow_d FROM ' + self.tableName + ' WHERE CODE=? and ymd=? order by ymd', (stock_code, day,))
db_result = cursor.fetchall()
for rows in db_result:
ymd = rows[0]
close = rows[1]
open = rows[2]
high = rows[3]
low = rows[4]
envelope_upper = rows[5]
envelope_lower = rows[6]
envelope_middle = rows[7]
rsi = 0 if rows[8] is None else rows[8]
rsis = 0 if rows[9] is None else rows[9]
macd = rows[10]
macds = rows[11]
stochastic_slow_k = 0 if rows[12] is None else rows[12]
stochastic_slow_d = 0 if rows[13] is None else rows[13]
result["ymd"].append(ymd)
result["open"].append(int(open))
result["close"].append(int(close))
result["high"].append(int(high))
result["low"].append(int(low))
result["envelope_upper"].append(int(envelope_upper))
result["envelope_lower"].append(int(envelope_lower))
result["envelope_middle"].append(int(envelope_middle))
result["rsi"].append(int(rsi))
result["rsis"].append(int(rsis))
result["macd"].append(int(macd))
result["macds"].append(int(macds))
result["slow_k"].append(int(stochastic_slow_k))
result["slow_d"].append(int(stochastic_slow_d))
return
def isValidYMD(self, stock_code, day):
conn = sqlite3.connect(os.path.join(self.RESOURCE_PATH, self.dbFileName))
cursor = conn.cursor()
cursor.execute('SELECT ymd, count(*) as cnt FROM ' + self.tableName + ' WHERE CODE=? and ymd=?', (stock_code, day,))
db_result = cursor.fetchone()
if db_result[1] > 0:
return True
return False
def getLastData(self, stock_code, limit=350):
stockTableName = 'stock'
conn = sqlite3.connect(os.path.join(self.RESOURCE_PATH, self.dbFileName))
cursor = conn.cursor()
stock = {"CODE": stock_code, "NAME": "", "PRICE": []}
sql = 'SELECT ymd, close, diff, open, high, low, volume FROM ' + stockTableName + ' where CODE=? order by ymd desc '
sql += ' limit ' + str(limit)
cursor.execute(sql, (stock['CODE'],))
items = cursor.fetchall()
items_reverse = reversed(items)
for item in items_reverse:
stock['PRICE'].append(
{
"ymd": item[0],
"close": item[1],
"diff": item[2],
"open": item[3],
"high": item[4],
"low": item[5],
"volume": item[6],
"avg3": -1,
"avg4": -1,
"avg5": -1,
"avg6": -1,
"avg10": -1,
"avg12": -1,
"avg20": -1,
"avg36": -1,
"avg40": -1,
"avg48": -1,
"avg60": -1,
"avg120": -1,
"avg200": -1,
"avg240": -1,
"avg300": -1,
"disparity_avg5": -1,
"disparity_avg10": -1,
"disparity_avg20": -1,
"disparity_avg60": -1,
"disparity_avg120": -1,
"bolingerband_upper": -1,
"bolingerband_lower": -1,
"bolingerband_middle": -1,
"envelope_upper": -1,
"envelope_lower": -1,
"envelope_middle": -1,
"ichimokucloud_changeLine": -1,
"ichimokucloud_baseLine": -1,
"ichimokucloud_leadingSpan1": -1,
"ichimokucloud_leadingSpan2": -1,
"stochastic_fast_k": -1,
"stochastic_slow_k": -1,
"stochastic_slow_d": -1,
"rsi": -1,
"rsis": -1,
"macd": -1,
"macds": -1,
"macdo": -1,
}
)
conn.commit()
cursor.close()
conn.close()
return stock
def analyze (self, stock, days=120):
stock["PRICE"] = sorted(stock["PRICE"], key=lambda x: x['ymd'])
self.analyzerSqlite.get_moving_average(stock["PRICE"])
# 이동 평균을 이용한 이격도 계산
self.analyzerSqlite.get_disparity(stock["PRICE"])
self.analyzerSqlite.ichimokuCloud.analyze(stock)
self.analyzerSqlite.stochastic.analyze(stock)
self.analyzerSqlite.bolingerBand.analyze(stock)
self.analyzerSqlite.envelope.analyze(stock)
self.analyzerSqlite.rsi.analyze(stock)
self.analyzerSqlite.macd.analyze(stock)
result = {
"ymd": [],
"open": [],
"close": [],
"high": [],
"low": [],
"avg3": [],
"avg4": [],
"avg5": [],
"avg6": [],
"avg10": [],
"avg12": [],
"avg20": [],
"avg36": [],
"avg40": [],
"avg48": [],
"avg60": [],
"avg120": [],
"avg200": [],
"avg240": [],
"avg300": [],
"disparity_avg5": [],
"disparity_avg20": [],
"disparity_avg60": [],
"disparity_avg120": [],
"disparity": [],
"disparity_type": [],
"envelope_upper": [],
"envelope_lower": [],
"envelope_middle": [],
"rsi": [],
"rsis": [],
"macd": [],
"macds": [],
"slow_k": [],
"slow_d": [],
"buy": [],
"sell": [],
}
for item in stock['PRICE']:
result["ymd"].append(item['ymd'])
result["open"].append(item['open'])
result["close"].append(item['close'])
result["high"].append(item['high'])
result["low"].append(item['low'])
result["avg3"].append(item['avg3'])
result["avg4"].append(item['avg4'])
result["avg5"].append(item['avg5'])
result["avg6"].append(item['avg6'])
result["avg10"].append(item['avg10'])
result["avg12"].append(item['avg12'])
result["avg20"].append(item['avg20'])
result["avg36"].append(item['avg36'])
result["avg40"].append(item['avg40'])
result["avg48"].append(item['avg48'])
result["avg60"].append(item['avg60'])
result["avg120"].append(item['avg120'])
result["avg200"].append(item['avg200'])
result["avg240"].append(item['avg240'])
result["avg300"].append(item['avg300'])
result["disparity_avg5"].append(item['disparity_avg5'])
result["disparity_avg20"].append(item['disparity_avg20'])
result["disparity_avg60"].append(item['disparity_avg60'])
result["disparity_avg120"].append(item['disparity_avg120'])
result['disparity'].append(max(item['disparity_avg5'], item['disparity_avg20'], item['disparity_avg60']) - min(item['disparity_avg5'], item['disparity_avg20'], item['disparity_avg60']))
if item['disparity_avg60'] < item['disparity_avg20'] < item['disparity_avg5']:
result['disparity_type'].append(1)
elif item['disparity_avg5'] < item['disparity_avg20'] < item['disparity_avg60']:
result['disparity_type'].append(-1)
else:
result['disparity_type'].append(0)
result["envelope_upper"].append(item['envelope_upper'])
result["envelope_lower"].append(item['envelope_lower'])
result["envelope_middle"].append(item['envelope_middle'])
result["rsi"].append(item['rsi'])
result["rsis"].append(item['rsis'])
result["macd"].append(item['macd'])
result["macds"].append(item['macds'])
result["slow_k"].append(item['stochastic_slow_k'])
result["slow_d"].append(item['stochastic_slow_d'])
result["buy"].append(-1)
result["sell"].append(-1)
data = pd.DataFrame(result)
df_final_time = pd.DatetimeIndex(result['ymd'])
data.index = df_final_time
data = data.astype(
{
'open': 'int',
'high': 'int',
'low': 'int',
'close': 'int',
'avg3': 'float',
'avg4': 'float',
'avg5': 'float',
'avg6': 'float',
'avg10': 'float',
'avg12': 'float',
'avg20': 'float',
'avg36': 'float',
'avg40': 'float',
'avg48': 'float',
'avg60': 'float',
'avg120': 'float',
'avg200': 'float',
'avg240': 'float',
'avg300': 'float',
'disparity_avg5': 'float',
'disparity_avg20': 'float',
'disparity_avg60': 'float',
'disparity_avg120': 'float',
'buy': 'int',
'sell': 'int',
'slow_k': 'float',
'slow_d': 'float',
'macd': 'float',
'macds': 'float',
'envelope_upper': 'float',
'envelope_lower': 'float',
'envelope_middle': 'float',
'rsi': 'float',
'rsis': 'float'
}
)
scaler = StandardScaler()
low_df = pd.DataFrame(data['low'])
low_df.index = [c for c in range(len(low_df))]
low_std = scaler.fit_transform(data['low'].values.reshape(-1, 1))
low_std = pd.DataFrame(low_std, columns=['low_std'])
min_df = pd.DataFrame({'open': data['open'].to_list(), 'close': data['close'].to_list()})
min_df['min_std'] = min_df.min(axis=1)
min_df.index = [c for c in range(len(min_df))]
min_std = scaler.fit_transform(min_df['min_std'].values.reshape(-1, 1))
min_std = pd.DataFrame(min_std, columns=['min_std'])
line_fitter = LinearRegression()
size = len(data["close"])
gradients_low = []
gradients_avg5 = []
gradients_avg20 = []
gradients_avg60 = []
for i in range(size):
coef_low = -999
coef_avg5 = -999
coef_avg20 = -999
coef_avg60 = -999
if i > 0:
l = days if i >= days else i
x = pd.DataFrame([c for c in range(i - l, i + 1)])
y = pd.DataFrame(low_std.values.tolist()[i - l:i + 1])
line_fitter.fit(x.values.reshape(-1, 1), y)
coef_low = line_fitter.coef_[0][0]
l = 5 if i >= 5 else i
x = pd.DataFrame([c for c in range(i - l, i + 1)])
y = pd.DataFrame(min_std.values.tolist()[i - l:i + 1])
line_fitter.fit(x.values.reshape(-1, 1), y)
coef_avg5 = line_fitter.coef_[0][0]
l = 20 if i >= 20 else i
x = pd.DataFrame([c for c in range(i - l, i + 1)])
y = pd.DataFrame(min_std.values.tolist()[i - l:i + 1])
line_fitter.fit(x.values.reshape(-1, 1), y)
coef_avg20 = line_fitter.coef_[0][0]
l = 60 if i >= 60 else i
x = pd.DataFrame([c for c in range(i - l, i + 1)])
y = pd.DataFrame(min_std.values.tolist()[i - l:i + 1])
line_fitter.fit(x.values.reshape(-1, 1), y)
coef_avg60 = line_fitter.coef_[0][0]
gradients_low.append(coef_low)
gradients_avg5.append(coef_avg5)
gradients_avg20.append(coef_avg20)
gradients_avg60.append(coef_avg60)
gradients_low_df = pd.DataFrame(gradients_low, columns=['gradients_low'])
gradients_avg5_df = pd.DataFrame(gradients_avg5, columns=['gradients_avg5'])
gradients_avg20_df = pd.DataFrame(gradients_avg20, columns=['gradients_avg20'])
gradients_avg60_df = pd.DataFrame(gradients_avg60, columns=['gradients_avg60'])
gradients_low_df.index = df_final_time
gradients_avg5_df.index = df_final_time
gradients_avg20_df.index = df_final_time
gradients_avg60_df.index = df_final_time
data = data.merge(gradients_low_df, left_index=True, right_index=True)
data = data.merge(gradients_avg5_df, left_index=True, right_index=True)
data = data.merge(gradients_avg20_df, left_index=True, right_index=True)
data = data.merge(gradients_avg60_df, left_index=True, right_index=True)
return data

View File

@@ -13,7 +13,7 @@ rc('font', family='AppleGothic')
plt.rcParams['axes.unicode_minus'] = False
import plotly.graph_objs as go
from plotly import tools, subplots
from plotly import subplots
import plotly.io as po
from stock.analysis.Common import Common
@@ -1027,7 +1027,8 @@ if __name__ == "__main__":
start = time.time()
PROJECT_HOME = os.path.join(os.path.dirname(os.path.join(os.path.dirname(os.path.join(os.path.dirname(__file__))))))
stockFileName = PROJECT_HOME + '/resources/stock.db'
RESOURCE_PATH = os.path.join(PROJECT_HOME, 'resources')
stockFileName = os.path.join(RESOURCE_PATH, 'stock.db')
analyzer = AnalyzerSqlite(stockFileName)
#analyzer.analyzeDaily()

View File

@@ -0,0 +1,703 @@
import os.path
import pandas as pd
import platform
if platform.system().lower().find("window") >= 0 and platform.architecture()[0] != "64bit" :
import win32com.client
import sqlite3
import shutil
from math import nan
import plotly.graph_objects as go
from plotly import subplots
import plotly.io as po
from datetime import datetime
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from hts.HTS import HTS
from hts.BuySellChecker import BuySellChecker
from stock.analysis.AnalyzerSqlite import AnalyzerSqlite
class DailyStatus (HTS):
tableName = None
dbFileName = None
RESOURCE_PATH = None
analyzerSqlite = None
def __init__(self, RESOURCE_PATH):
super().__init__(RESOURCE_PATH)
self.RESOURCE_PATH = RESOURCE_PATH
self.tableName = 'stock'
self.dbFileName = "stock.db"
self.analyzerSqlite = AnalyzerSqlite(os.path.join(self.RESOURCE_PATH, self.dbFileName))
self.buySellChecker = BuySellChecker()
return
def getDBData(self, stock_code, day, result):
conn = sqlite3.connect(os.path.join(self.RESOURCE_PATH, self.dbFileName))
cursor = conn.cursor()
cursor.execute('SELECT ymd, close, open, high, low, envelope_upper, envelope_lower, envelope_middle, rsi, rsis, macd, macds, stochastic_slow_k, stochastic_slow_d FROM ' + self.tableName + ' WHERE CODE=? and ymd=? order by ymd', (stock_code, day,))
db_result = cursor.fetchall()
for rows in db_result:
ymd = rows[0]
close = rows[1]
open = rows[2]
high = rows[3]
low = rows[4]
envelope_upper = rows[5]
envelope_lower = rows[6]
envelope_middle = rows[7]
rsi = 0 if rows[8] is None else rows[8]
rsis = 0 if rows[9] is None else rows[9]
macd = rows[10]
macds = rows[11]
stochastic_slow_k = 0 if rows[12] is None else rows[12]
stochastic_slow_d = 0 if rows[13] is None else rows[13]
result["ymd"].append(ymd)
result["open"].append(int(open))
result["close"].append(int(close))
result["high"].append(int(high))
result["low"].append(int(low))
result["envelope_upper"].append(int(envelope_upper))
result["envelope_lower"].append(int(envelope_lower))
result["envelope_middle"].append(int(envelope_middle))
result["rsi"].append(int(rsi))
result["rsis"].append(int(rsis))
result["macd"].append(int(macd))
result["macds"].append(int(macds))
result["slow_k"].append(int(stochastic_slow_k))
result["slow_d"].append(int(stochastic_slow_d))
return
def isValidYMD(self, stock_code, day):
conn = sqlite3.connect(os.path.join(self.RESOURCE_PATH, self.dbFileName))
cursor = conn.cursor()
cursor.execute('SELECT ymd, count(*) as cnt FROM ' + self.tableName + ' WHERE CODE=? and ymd=?', (stock_code, day,))
db_result = cursor.fetchone()
if db_result[1] > 0:
return True
return False
def getLastData(self, stock_code, limit=350):
stockTableName = 'stock'
conn = sqlite3.connect(os.path.join(self.RESOURCE_PATH, self.dbFileName))
cursor = conn.cursor()
stock = {"CODE": stock_code, "NAME": "", "PRICE": []}
sql = 'SELECT ymd, close, diff, open, high, low, volume FROM ' + stockTableName + ' where CODE=? order by ymd desc '
sql += ' limit ' + str(limit)
cursor.execute(sql, (stock['CODE'],))
items = cursor.fetchall()
items_reverse = reversed(items)
for item in items_reverse:
stock['PRICE'].append(
{
"ymd": item[0],
"close": item[1],
"diff": item[2],
"open": item[3],
"high": item[4],
"low": item[5],
"volume": item[6],
"avg3": -1,
"avg4": -1,
"avg5": -1,
"avg6": -1,
"avg10": -1,
"avg12": -1,
"avg20": -1,
"avg36": -1,
"avg40": -1,
"avg48": -1,
"avg60": -1,
"avg120": -1,
"avg200": -1,
"avg240": -1,
"avg300": -1,
"disparity_avg5": -1,
"disparity_avg10": -1,
"disparity_avg20": -1,
"disparity_avg60": -1,
"disparity_avg120": -1,
"bolingerband_upper": -1,
"bolingerband_lower": -1,
"bolingerband_middle": -1,
"envelope_upper": -1,
"envelope_lower": -1,
"envelope_middle": -1,
"ichimokucloud_changeLine": -1,
"ichimokucloud_baseLine": -1,
"ichimokucloud_leadingSpan1": -1,
"ichimokucloud_leadingSpan2": -1,
"stochastic_fast_k": -1,
"stochastic_slow_k": -1,
"stochastic_slow_d": -1,
"rsi": -1,
"rsis": -1,
"macd": -1,
"macds": -1,
"macdo": -1,
}
)
conn.commit()
cursor.close()
conn.close()
return stock
def analyze (self, stock, days=120):
stock["PRICE"] = sorted(stock["PRICE"], key=lambda x: x['ymd'])
self.analyzerSqlite.get_moving_average(stock["PRICE"])
# 이동 평균을 이용한 이격도 계산
self.analyzerSqlite.get_disparity(stock["PRICE"])
self.analyzerSqlite.ichimokuCloud.analyze(stock)
self.analyzerSqlite.stochastic.analyze(stock)
self.analyzerSqlite.bolingerBand.analyze(stock)
self.analyzerSqlite.envelope.analyze(stock)
self.analyzerSqlite.rsi.analyze(stock)
self.analyzerSqlite.macd.analyze(stock)
result = {
"ymd": [],
"open": [],
"close": [],
"high": [],
"low": [],
"avg3": [],
"avg4": [],
"avg5": [],
"avg6": [],
"avg10": [],
"avg12": [],
"avg20": [],
"avg36": [],
"avg40": [],
"avg48": [],
"avg60": [],
"avg120": [],
"avg200": [],
"avg240": [],
"avg300": [],
"disparity_avg5": [],
"disparity_avg20": [],
"disparity_avg60": [],
"disparity_avg120": [],
"disparity": [],
"disparity_type": [],
"envelope_upper": [],
"envelope_lower": [],
"envelope_middle": [],
"rsi": [],
"rsis": [],
"macd": [],
"macds": [],
"slow_k": [],
"slow_d": [],
"buy": [],
"sell": [],
}
for item in stock['PRICE']:
result["ymd"].append(item['ymd'])
result["open"].append(item['open'])
result["close"].append(item['close'])
result["high"].append(item['high'])
result["low"].append(item['low'])
result["avg3"].append(item['avg3'])
result["avg4"].append(item['avg4'])
result["avg5"].append(item['avg5'])
result["avg6"].append(item['avg6'])
result["avg10"].append(item['avg10'])
result["avg12"].append(item['avg12'])
result["avg20"].append(item['avg20'])
result["avg36"].append(item['avg36'])
result["avg40"].append(item['avg40'])
result["avg48"].append(item['avg48'])
result["avg60"].append(item['avg60'])
result["avg120"].append(item['avg120'])
result["avg200"].append(item['avg200'])
result["avg240"].append(item['avg240'])
result["avg300"].append(item['avg300'])
result["disparity_avg5"].append(item['disparity_avg5'])
result["disparity_avg20"].append(item['disparity_avg20'])
result["disparity_avg60"].append(item['disparity_avg60'])
result["disparity_avg120"].append(item['disparity_avg120'])
result['disparity'].append(max(item['disparity_avg5'], item['disparity_avg20'], item['disparity_avg60']) - min(item['disparity_avg5'], item['disparity_avg20'], item['disparity_avg60']))
if item['disparity_avg60'] < item['disparity_avg20'] < item['disparity_avg5']:
result['disparity_type'].append(1)
elif item['disparity_avg5'] < item['disparity_avg20'] < item['disparity_avg60']:
result['disparity_type'].append(-1)
else:
result['disparity_type'].append(0)
result["envelope_upper"].append(item['envelope_upper'])
result["envelope_lower"].append(item['envelope_lower'])
result["envelope_middle"].append(item['envelope_middle'])
result["rsi"].append(item['rsi'])
result["rsis"].append(item['rsis'])
result["macd"].append(item['macd'])
result["macds"].append(item['macds'])
result["slow_k"].append(item['stochastic_slow_k'])
result["slow_d"].append(item['stochastic_slow_d'])
result["buy"].append(-1)
result["sell"].append(-1)
data = pd.DataFrame(result)
df_final_time = pd.DatetimeIndex(result['ymd'])
data.index = df_final_time
data = data.astype(
{
'open': 'int',
'high': 'int',
'low': 'int',
'close': 'int',
'avg3': 'float',
'avg4': 'float',
'avg5': 'float',
'avg6': 'float',
'avg10': 'float',
'avg12': 'float',
'avg20': 'float',
'avg36': 'float',
'avg40': 'float',
'avg48': 'float',
'avg60': 'float',
'avg120': 'float',
'avg200': 'float',
'avg240': 'float',
'avg300': 'float',
'disparity_avg5': 'float',
'disparity_avg20': 'float',
'disparity_avg60': 'float',
'disparity_avg120': 'float',
'buy': 'int',
'sell': 'int',
'slow_k': 'float',
'slow_d': 'float',
'macd': 'float',
'macds': 'float',
'envelope_upper': 'float',
'envelope_lower': 'float',
'envelope_middle': 'float',
'rsi': 'float',
'rsis': 'float'
}
)
scaler = StandardScaler()
low_df = pd.DataFrame(data['low'])
low_df.index = [c for c in range(len(low_df))]
low_std = scaler.fit_transform(data['low'].values.reshape(-1, 1))
low_std = pd.DataFrame(low_std, columns=['low_std'])
min_df = pd.DataFrame({'open': data['open'].to_list(), 'close': data['close'].to_list()})
min_df['min_std'] = min_df.min(axis=1)
min_df.index = [c for c in range(len(min_df))]
min_std = scaler.fit_transform(min_df['min_std'].values.reshape(-1, 1))
min_std = pd.DataFrame(min_std, columns=['min_std'])
line_fitter = LinearRegression()
size = len(data["close"])
gradients_low = []
gradients_avg5 = []
gradients_avg20 = []
gradients_avg60 = []
for i in range(size):
coef_low = -999
coef_avg5 = -999
coef_avg20 = -999
coef_avg60 = -999
if i > 0:
l = days if i >= days else i
x = pd.DataFrame([c for c in range(i - l, i + 1)])
y = pd.DataFrame(low_std.values.tolist()[i - l:i + 1])
line_fitter.fit(x.values.reshape(-1, 1), y)
coef_low = line_fitter.coef_[0][0]
l = 5 if i >= 5 else i
x = pd.DataFrame([c for c in range(i - l, i + 1)])
y = pd.DataFrame(min_std.values.tolist()[i - l:i + 1])
line_fitter.fit(x.values.reshape(-1, 1), y)
coef_avg5 = line_fitter.coef_[0][0]
l = 20 if i >= 20 else i
x = pd.DataFrame([c for c in range(i - l, i + 1)])
y = pd.DataFrame(min_std.values.tolist()[i - l:i + 1])
line_fitter.fit(x.values.reshape(-1, 1), y)
coef_avg20 = line_fitter.coef_[0][0]
l = 60 if i >= 60 else i
x = pd.DataFrame([c for c in range(i - l, i + 1)])
y = pd.DataFrame(min_std.values.tolist()[i - l:i + 1])
line_fitter.fit(x.values.reshape(-1, 1), y)
coef_avg60 = line_fitter.coef_[0][0]
gradients_low.append(coef_low)
gradients_avg5.append(coef_avg5)
gradients_avg20.append(coef_avg20)
gradients_avg60.append(coef_avg60)
gradients_low_df = pd.DataFrame(gradients_low, columns=['gradients_low'])
gradients_avg5_df = pd.DataFrame(gradients_avg5, columns=['gradients_avg5'])
gradients_avg20_df = pd.DataFrame(gradients_avg20, columns=['gradients_avg20'])
gradients_avg60_df = pd.DataFrame(gradients_avg60, columns=['gradients_avg60'])
gradients_low_df.index = df_final_time
gradients_avg5_df.index = df_final_time
gradients_avg20_df.index = df_final_time
gradients_avg60_df.index = df_final_time
data = data.merge(gradients_low_df, left_index=True, right_index=True)
data = data.merge(gradients_avg5_df, left_index=True, right_index=True)
data = data.merge(gradients_avg20_df, left_index=True, right_index=True)
data = data.merge(gradients_avg60_df, left_index=True, right_index=True)
return data
def draw(self, stock_code, given_day, data, bsLine):
if bsLine is None:
return
# 어제 데이터는 지운다.
buy_line = bsLine['buy']
buy_weight_line = bsLine['buy_weight']
sell_line = bsLine['sell']
buy_size = []
buy_colors = []
for i in range(len(buy_line)):
if buy_line[i] < 0:
buy_colors.append("#ffffff")
buy_line[i] = nan
buy_size.append(0)
else:
buy_colors.append("#B2028C")
buy_size.append(10 + (0.1 * buy_weight_line[i]))
sell_colors = []
for i in range(len(sell_line)):
if sell_line[i] < 0:
sell_colors.append("#ffffff")
sell_line[i] = nan
else:
sell_colors.append("#00ced1")
# 그래프를 설정한다.
buy_check = go.Scatter(x=data['ymd'], y=buy_line, mode='markers', name="buy", marker=dict(size=buy_size, color=buy_colors, line_width=0))
sell_check = go.Scatter(x=data['ymd'], y=sell_line, mode='markers', name="sell", marker=dict(size=14, color=sell_colors, line_width=0))
envelope_upper = go.Scatter(x=data['ymd'], y=data["envelope_upper"], name="upper", line_color='#000000')
envelope_middle = go.Scatter(x=data['ymd'], y=data["envelope_middle"], name="upper", line_color='#927786')
envelope_lower = go.Scatter(x=data['ymd'], y=data["envelope_lower"], name="lower", line_color='#000000')
avg5 = go.Scatter(x=data['ymd'], y=data["avg5"], name="avg5", line_color='#6C2507')
avg20 = go.Scatter(x=data['ymd'], y=data["avg20"], name="avg20", line_color='#f84c43')
avg60 = go.Scatter(x=data['ymd'], y=data["avg60"], name="avg60", line_color='#f89543')
candle_stick = go.Candlestick(x=data['ymd'], open=data['open'], high=data['high'], low=data['low'], close=data['close'], increasing_line_color='red', decreasing_line_color='blue', showlegend=False)
macd_line = go.Scatter(x=data['ymd'], y=data["macd"], line=dict(color='red', width=2), name='macd')
macd_s_line = go.Scatter(x=data['ymd'], y=data["macds"], line=dict(dash='dashdot', color='black', width=2), name='macds')
# fast_k_line = go.Scatter(x=hts['date'], y=hts["fast_k"], mode='lines', name='fast_k')
slow_k_line = go.Scatter(x=data['ymd'], y=data["slow_k"], line=dict(color='red', width=2), name='slow_k')
slow_d_line = go.Scatter(x=data['ymd'], y=data["slow_d"], line=dict(dash='dashdot', color='black', width=2), name='slow_d')
rsi_line = go.Scatter(x=data['ymd'], y=data["rsi"], line=dict(color='red', width=2), name='rsi')
rsis_line = go.Scatter(x=data['ymd'], y=data["rsis"], line=dict(dash='dashdot', color='black', width=2), name='rsis')
disparity_avg5 = go.Scatter(x=data['ymd'], y=data["disparity_avg5"], name="disparity_avg5", line_color='#8F8203')
disparity_avg20 = go.Scatter(x=data['ymd'], y=data["disparity_avg20"], name="disparity_avg20", line_color='#ff00ff')
disparity_avg60 = go.Scatter(x=data['ymd'], y=data["disparity_avg60"], name="disparity_avg60", line_color='#1469F4')
candle_data = [candle_stick, avg5, avg20, avg60, envelope_upper, envelope_middle, envelope_lower, buy_check, sell_check]
disparity_data = [disparity_avg5, disparity_avg20, disparity_avg60]
macd_data = [macd_line, macd_s_line]
stochastic_data = [slow_k_line, slow_d_line]
rsi_data = [rsi_line, rsis_line]
# 그래프를 그린다.
"""
fig = go.Figure(data=candle_data)
fig.update_layout(title=stock_code + "_" + given_day)
fig.show()
"""
fig = subplots.make_subplots(
rows=5, cols=1,
subplot_titles=("MACD", "RSI", "스토캐스틱", '이격도', '캔들'),
#specs=[[{}], [{}], [{}], [{}], [{}], [{}]],
shared_xaxes=True, horizontal_spacing=0.03, vertical_spacing=0.01,
row_heights=[200, 200, 200, 200, 750]
)
for trace in macd_data:
fig.append_trace(trace, 1, 1)
for trace in rsi_data:
fig.append_trace(trace, 2, 1)
for trace in stochastic_data:
fig.append_trace(trace, 3, 1)
for trace in disparity_data:
fig.append_trace(trace, 4, 1)
for trace in candle_data:
fig.append_trace(trace, 5, 1)
#fig.update_xaxes(nticks=5)
#fig.update_layout(height=1800, title=stock_code + "_" + given_day, xaxis_rangeslider_visible=False)
df = pd.DataFrame(bsLine)
df = df.fillna(-1)
buy_count = len(df.loc[df["buy"] > 0])
sell_count = len(df.loc[df["sell"] > 0])
fig.update_layout(height=1700, title=stock_code + "_" + given_day + "_" + str(buy_count)+","+str(sell_count))
#fig.update_layout(title=stock_code + "_" + given_day + "_" + str(buy_count) + "," + str(sell_count))
fig.show()
return
def writeFile(self, dailyDirName, stock_code, stock_name, given_day, data, bsLine):
if bsLine is None:
return
# 어제 데이터는 지운다.
buy_line = bsLine['buy']
buy_weight_line = bsLine['buy_weight']
sell_line = bsLine['sell']
buy_size = []
buy_colors = []
for i in range(len(buy_line)):
if buy_line[i] < 0:
buy_colors.append("#ffffff")
buy_line[i] = nan
buy_size.append(0)
else:
buy_colors.append("#B2028C")
buy_size.append(10 + (0.1 * buy_weight_line[i]))
sell_colors = []
for i in range(len(sell_line)):
if sell_line[i] < 0:
sell_colors.append("#ffffff")
sell_line[i] = nan
else:
sell_colors.append("#00ced1")
# 그래프를 설정한다.
buy_check = go.Scatter(x=data['ymd'], y=buy_line, mode='markers', name="buy", marker=dict(size=buy_size, color=buy_colors, line_width=0))
sell_check = go.Scatter(x=data['ymd'], y=sell_line, mode='markers', name="sell", marker=dict(size=14, color=sell_colors, line_width=0))
envelope_upper = go.Scatter(x=data['ymd'], y=data["envelope_upper"], name="upper", line_color='#000000')
envelope_middle = go.Scatter(x=data['ymd'], y=data["envelope_middle"], name="upper", line_color='#927786')
envelope_lower = go.Scatter(x=data['ymd'], y=data["envelope_lower"], name="lower", line_color='#000000')
avg5 = go.Scatter(x=data['ymd'], y=data["avg5"], name="avg5", line_color='#6C2507')
avg20 = go.Scatter(x=data['ymd'], y=data["avg20"], name="avg20", line_color='#f84c43')
avg60 = go.Scatter(x=data['ymd'], y=data["avg60"], name="avg60", line_color='#f89543')
candle_stick = go.Candlestick(x=data['ymd'], open=data['open'], high=data['high'], low=data['low'], close=data['close'], increasing_line_color='red', decreasing_line_color='blue', showlegend=False)
macd_line = go.Scatter(x=data['ymd'], y=data["macd"], line=dict(color='red', width=2), name='macd')
macd_s_line = go.Scatter(x=data['ymd'], y=data["macds"], line=dict(dash='dashdot', color='black', width=2), name='macds')
# fast_k_line = go.Scatter(x=hts['date'], y=hts["fast_k"], mode='lines', name='fast_k')
slow_k_line = go.Scatter(x=data['ymd'], y=data["slow_k"], line=dict(color='red', width=2), name='slow_k')
slow_d_line = go.Scatter(x=data['ymd'], y=data["slow_d"], line=dict(dash='dashdot', color='black', width=2), name='slow_d')
rsi_line = go.Scatter(x=data['ymd'], y=data["rsi"], line=dict(color='red', width=2), name='rsi')
rsis_line = go.Scatter(x=data['ymd'], y=data["rsis"], line=dict(dash='dashdot', color='black', width=2), name='rsis')
disparity_avg5 = go.Scatter(x=data['ymd'], y=data["disparity_avg5"], name="disparity_avg5", line_color='#8F8203')
disparity_avg20 = go.Scatter(x=data['ymd'], y=data["disparity_avg20"], name="disparity_avg20", line_color='#ff00ff')
disparity_avg60 = go.Scatter(x=data['ymd'], y=data["disparity_avg60"], name="disparity_avg60", line_color='#1469F4')
candle_data = [candle_stick, avg5, avg20, avg60, envelope_upper, envelope_middle, envelope_lower, buy_check, sell_check]
disparity_data = [disparity_avg5, disparity_avg20, disparity_avg60]
macd_data = [macd_line, macd_s_line]
stochastic_data = [slow_k_line, slow_d_line]
rsi_data = [rsi_line, rsis_line]
# 그래프를 그린다.
"""
fig = go.Figure(data=candle_data)
fig.update_layout(title=stock_code + "_" + given_day)
fig.show()
"""
fig = subplots.make_subplots(
rows=5, cols=1,
subplot_titles=("MACD", "RSI", "스토캐스틱", '이격도', '캔들'),
#specs=[[{}], [{}], [{}], [{}], [{}], [{}]],
shared_xaxes=True, horizontal_spacing=0.03, vertical_spacing=0.01,
row_heights=[200, 200, 200, 200, 750]
)
for trace in macd_data:
fig.append_trace(trace, 1, 1)
for trace in rsi_data:
fig.append_trace(trace, 2, 1)
for trace in stochastic_data:
fig.append_trace(trace, 3, 1)
for trace in disparity_data:
fig.append_trace(trace, 4, 1)
for trace in candle_data:
fig.append_trace(trace, 5, 1)
df = pd.DataFrame(bsLine)
df = df.fillna(-1)
buy_count = len(df.loc[df["buy"] > 0])
sell_count = len(df.loc[df["sell"] > 0])
fig.update_layout(height=1700, title=stock_code + "_" + given_day + "_" + str(buy_count)+","+str(sell_count))
title = "%s (%s) 차트 (<a href=\"https://alphasquare.co.kr/home/stock/financial-information?code=%s\">URL1</a>, <a href=\"https://www.tradingview.com/chart/jJ8zOXz0/?symbol=KRX:%s\">URL2</a>)" % (stock_name, stock_code, stock_code, stock_code)
fig['layout'].update(title=title)
fileName = "%s/%s_%s.html" % (dailyDirName, stock_code, stock_name.replace(" ", ""))
po.write_html(fig, file=fileName, auto_open=False)
return
def getData(self, today, stock):
close = stock['PRICE'][len(stock['PRICE']) - 1]["close"]
open = stock['PRICE'][len(stock['PRICE']) - 1]["open"]
high = stock['PRICE'][len(stock['PRICE']) - 1]["high"]
low = stock['PRICE'][len(stock['PRICE']) - 1]["low"]
volume = stock['PRICE'][len(stock['PRICE']) - 1]["volume"]
stock['PRICE'].append(
{
"ymd": today,
"close": close,
"diff": stock['PRICE'][len(stock['PRICE']) - 1]["close"] - close,
"open": open,
"high": high,
"low": low,
"volume": volume,
"avg3": -1,
"avg4": -1,
"avg5": -1,
"avg6": -1,
"avg10": -1,
"avg12": -1,
"avg20": -1,
"avg36": -1,
"avg40": -1,
"avg48": -1,
"avg60": -1,
"avg120": -1,
"avg200": -1,
"avg240": -1,
"avg300": -1,
"disparity_avg5": -1,
"disparity_avg10": -1,
"disparity_avg20": -1,
"disparity_avg60": -1,
"disparity_avg120": -1,
"bolingerband_upper": -1,
"bolingerband_lower": -1,
"bolingerband_middle": -1,
"envelope_upper": -1,
"envelope_lower": -1,
"envelope_middle": -1,
"ichimokucloud_changeLine": -1,
"ichimokucloud_baseLine": -1,
"ichimokucloud_leadingSpan1": -1,
"ichimokucloud_leadingSpan2": -1,
"stochastic_fast_k": -1,
"stochastic_slow_k": -1,
"stochastic_slow_d": -1,
"rsi": -1,
"rsis": -1,
"macd": -1,
"macds": -1,
"macdo": -1,
}
)
return
def checkEnvelope(self, stock_codes:list=None, isRealTime=False):
if not isRealTime:
n = 200
else:
n = 200
if stock_codes is not None:
for stock_code in stock_codes:
stock = self.getLastData(stock_code, n)
today = datetime.today().strftime('%Y%m%d')
self.getData(today, stock)
analyzed_day = 60
data = self.analyze(stock, analyzed_day)
# 분석일 데이터만 활용한다 (이전 데이터는 제거)
data.drop(data.index[:analyzed_day], inplace=True)
# print logs
for i in range(len(data.index)):
print (i, data.index[i], data['macd'][i], data['slow_k'][i], data['gradients_low'][i], data['gradients_avg5'][i], data['gradients_avg20'][i], data['gradients_avg60'][i], data['disparity_avg5'][i], data['disparity_avg20'][i], data['disparity_avg60'][i], data['disparity'][i], data['disparity_type'][i])
bsLine, data = self.buySellChecker.checkTransactionWithEnvelope(data, stock_code, isRealTime=False)
# 그래프를 그린다.
self.draw(stock_code, today, data, bsLine)
else:
stockTableName = 'stock'
conn = sqlite3.connect(os.path.join(self.RESOURCE_PATH, self.dbFileName))
cursor = conn.cursor()
cursor.execute('SELECT distinct code, name FROM ' + stockTableName + ' order by code')
items = cursor.fetchall()
cursor.close()
conn.close()
today = datetime.today().strftime('%Y%m%d')
dailyDirName = os.path.join(self.RESOURCE_PATH, 'analysis', today, 'daily')
if os.path.exists(dailyDirName):
shutil.rmtree(dailyDirName)
dailyDirName = os.path.join(self.RESOURCE_PATH, 'analysis', today)
if not os.path.isdir(dailyDirName):
os.mkdir(dailyDirName)
dailyDirName = os.path.join(dailyDirName, 'daily')
if not os.path.isdir(dailyDirName):
os.mkdir(dailyDirName)
for idx, item in enumerate(items):
stock_code = item[0]
stock_name = item[1]
print(idx, stock_code, stock_name)
print("Analysis # :", idx, ", CODE: ", stock_code, ", NAME: ", stock_name)
stock = self.getLastData(stock_code, n)
today = datetime.today().strftime('%Y%m%d')
self.getData(today, stock)
analyzed_day = 60
data = self.analyze(stock, analyzed_day)
# 분석일 데이터만 활용한다 (이전 데이터는 제거)
data.drop(data.index[:analyzed_day], inplace=True)
# print logs
# for i in range(len(data.index)):
# print (i, data.index[i], data['macd'][i], data['slow_k'][i], data['gradients_low'][i], data['gradients_avg5'][i], data['gradients_avg20'][i], data['gradients_avg60'][i], data['disparity_avg5'][i], data['disparity_avg20'][i], data['disparity_avg60'][i], data['disparity'][i], data['disparity_type'][i])
bsLine, data = self.buySellChecker.checkTransactionWithEnvelope(data, stock_code, isRealTime=False)
# 그래프를 그린다.
if len(data.index) > 10 and max(bsLine['buy'][len(bsLine['buy'])-2:]) > 0:
self.writeFile(dailyDirName, stock_code, stock_name, today, data, bsLine)
return