Files
DeepStock/hts/BuySellChecker.py
dsyoon 7006be45fa init
2023-11-16 01:26:52 +09:00

299 lines
12 KiB
Python

import pandas as pd
from stock.analysis.Common import Common
from stock.analysis.Stochastic import Stochastic
from stock.analysis.RSI import RSI
from stock.analysis.MACD import MACD
from stock.analysis.IchimokuCloud import IchimokuCloud
class BuySellChecker:
common = None
stochastic = None
rsi = None
macd = None
ichimokuCloud = None
BUY_COUNT = None
def __init__(self):
self.common = Common()
self.stochastic = Stochastic()
self.rsi = RSI()
self.macd = MACD()
self.ichimokuCloud = IchimokuCloud()
self.BUY_COUNT = 0
return
def getBuyPriceAndWeight(self, i, data):
buy, weight, type = -1, -1, ""
"""
# 매수전략 #1: 다이버전스
if data['macd'][i] < 0 and data['open'][i] < data['close'][i]:
if 0 < len(data['rsi'].tolist()[i - 10:i - 5]):
if min(data['rsi'].tolist()[i - 10:i - 5]) < data['rsi'][i - 1]:
if data['low'][i - 1] < min(data['low'].tolist()[i - 10:i - 5]):
weight = 1
buy = data['close'][i]
type = 'Divergence'
"""
high_barrier = 70
low_barrier = 30
Buy_Price=[]
Sell_Price=[]
number=[]
temp01 = []
temp01_id = []
temp02 = []
temp01_id = []
temp01_min_price = []
temp02_min_price = []
temp01_min_rsi = []
temp02_min_rsi = []
n_id=[]
i_id=[]
flag=1
n = 0
# https://superhky.tistory.com/441
find = False
for c in range(i-40, i-1):
if data['rsi'][i-1] > low_barrier and data['rsi'][i] < low_barrier:
for k in range(c, i):
if data['rsi'][k-1] < low_barrier and data['rsi'][k] > low_barrier:
temp01 = data['rsi'].iloc[c:k]
temp01_id = temp01.argmin() + c
temp01_min_rsi = data['rsi'][temp01_id]
temp01_min_price = data['close'][temp01_id]
for m in range(k, i):
if data['rsi'][m-1] < low_barrier and data['rsi'][m] < low_barrier:
for n in range(m, i):
if data['rsi]'][n-1] < low_barrier and data['rsi'][n] < low_barrier:
temp02 = data['rsi'].iloc[m:n]
temp02_id = temp02.argmin() + m
temp02_min_rsi = data['rsi'][temp02_id]
temp02_min_price = data['close'][temp02_id]
if temp01_min_rsi < temp02_min_rsi and temp01_min_price > temp02_min_price and flag == 1:
if c == i-1:
weight = 1
buy = data['close'][i]
type = 'Divergence'
find = True
break
if find: break
if find: break
if find: break
"""
# 매수전략 #3: stochastic + rsi + macd
check = False
if data['slow_k'][i - 1] < data['slow_k'][i] and data['slow_d'][i] < data['slow_k'][i]:
# 과매도 체크
index = -1
for c in range(i - 40, i):
if data['slow_k'][i] < 20:
index = c
check = True
if check:
# 과매도 후 과매수 였는지 체크
check = False
for d in range(index, i):
if 80 < data['slow_k'][d]:
check = True
break
if not check:
# 과매도 후 과매수가 아니라면
if data['rsi'][i - 1] < 50 and 50 < data['rsi'][i]:
if data['macds'][i] < data['macd'][i] < 0:
weight = 1
buy = data['close'][i]
type = 'S+R+M'
"""
return buy, weight, type
def getSellPriceAndWeight(self, i, data):
sell, weight, type = -1, -1, ""
max_value = max(data['macd'].tolist()) * 0.8
if (max_value < data['macd'][i] or 1.9 < data['macds'][i]) and (0 < data['macdo'][i-1] and data['macdo'][i] <= 0):
#if data['macds'][i-1] < data['macd'][i-1] and data['macd'][i] < data['macds'][i]:
weight = 1
sell = data['close'][i]
type = 'method1'
# 매수전략 #2: RSI 과매수에서 데드크로스
if (data['macds'][i - 1] < data['macd'][i - 1] and data['macd'][i] < data['macds'][i]):
if 70 < data['rsi'][i]:
weight = 1
sell = data['close'][i]
type = 'method2'
return sell, weight, type
def checkTransaction(self, data, isRealTime=True):
# 어제 오늘 데이터로 분석
bsLine = {}
if data is not None and 'close' in data.columns:
size = len(data["close"])
if isRealTime:
# isRealTime=True, 실시간 적용
last_index = size - 1
buy, buy_weight, buy_type = self.getBuyPriceAndWeight(last_index, data)
sell, sell_weight, sell_type = self.getSellPriceAndWeight(last_index, data)
bsLine['buy'] = [buy]
bsLine['buy_weight'] = [buy_weight]
bsLine['buy_type'] = [buy_type]
bsLine['sell'] = [sell]
bsLine['sell_weight'] = [sell_weight]
bsLine['sell_type'] = [sell_type]
else:
# Type=False, 시뮬레이션 적용
bsLine['buy'] = [-1 for i in range(size)]
bsLine['buy_weight'] = [-1 for i in range(size)]
bsLine['buy_type'] = ['' for i in range(size)]
bsLine['sell'] = [-1 for i in range(size)]
bsLine['sell_weight'] = [-1 for i in range(size)]
bsLine['sell_type'] = ['' for i in range(size)]
for last_index in range(size):
buy, buy_weight, buy_type = self.getBuyPriceAndWeight(last_index, data)
sell, sell_weight, sell_type = self.getSellPriceAndWeight(last_index, data)
bsLine['buy'][last_index] = buy
bsLine['buy_weight'][last_index] = buy_weight
bsLine['buy_type'][last_index] = buy_type
bsLine['sell'][last_index] = sell
bsLine['sell_weight'][last_index] = sell_weight
bsLine['sell_type'][last_index] = sell_type
else:
bsLine['buy'] = [-1]
bsLine['buy_weight'] = [-1]
bsLine['buy_type'] = ['']
bsLine['sell'] = [-1]
bsLine['sell_weight'] = [-1]
bsLine['sell_type'] = ['']
return bsLine
def analyze(self, result):
# 기본 캔들 정보
open = result["open"]
close = result["close"]
high = result["high"]
low = result["low"]
vol = result["vol"]
# 이동 평균
close_df = pd.DataFrame(close)
avg5_list = close_df.rolling(window=5).mean().fillna(close[0]).values.tolist()
avg5 = [item[0] for item in avg5_list]
avg20_list = close_df.rolling(window=20).mean().fillna(close[0]).values.tolist()
avg20 = [item[0] for item in avg20_list]
avg30_list = close_df.rolling(window=30).mean().fillna(close[0]).values.tolist()
avg30 = [item[0] for item in avg30_list]
avg60_list = close_df.rolling(window=60).mean().fillna(close[0]).values.tolist()
avg60 = [item[0] for item in avg60_list]
avg120_list = close_df.rolling(window=120).mean().fillna(close[0]).values.tolist()
avg120 = [item[0] for item in avg120_list]
avg200_list = close_df.rolling(window=200).mean().fillna(close[0]).values.tolist()
avg200 = [item[0] for item in avg200_list]
open_df = pd.DataFrame(close)
disparity_avg5_list = (open_df / close_df.rolling(window=5).mean()).values.tolist()
disparity_avg5 = [item[0] for item in disparity_avg5_list]
disparity_avg20_list = (open_df / close_df.rolling(window=20).mean()).values.tolist()
disparity_avg20 = [item[0] for item in disparity_avg20_list]
disparity_avg30_list = (open_df / close_df.rolling(window=30).mean()).values.tolist()
disparity_avg30 = [item[0] for item in disparity_avg30_list]
disparity_avg60_list = (open_df / close_df.rolling(window=60).mean()).values.tolist()
disparity_avg60 = [item[0] for item in disparity_avg60_list]
disparity_avg120_list = (open_df / close_df.rolling(window=120).mean()).values.tolist()
disparity_avg120 = [item[0] for item in disparity_avg120_list]
disparity_avg200_list = (open_df / close_df.rolling(window=200).mean()).values.tolist()
disparity_avg200 = [item[0] for item in disparity_avg200_list]
# 볼린져 밴드
df = pd.DataFrame(close)
max20 = df.rolling(window=20).mean()
stddev20 = df.rolling(window=20).std()
upper_df = max20 + (stddev20 * 2) # 상단 볼린저 밴드
lower_df = max20 - (stddev20 * 2) # 하단 볼린저 밴드
upper, lower = [], []
for i in range(len(upper_df)):
if i < 10:
upper.append(upper_df.values[0][0])
lower.append(lower_df.values[0][0])
else:
upper.append(upper_df.values[i][0])
lower.append(lower_df.values[i][0])
point_temp = result["time"]
STOCK = []
for i in range(len(open)):
STOCK.append({'volume': vol[i], 'close': close[i], 'open': open[i], 'high': high[i], 'low': low[i],
'avg5': avg5[i], 'avg20': avg20[i], 'avg30': avg30[i], 'avg60': avg60[i], 'avg120': avg120[i], 'avg200': avg200[i]})
# stochastic
stochastic_df = self.stochastic.apply(STOCK, n=30, m=5, t=5)
fast_k = stochastic_df['fast_k'].values.tolist()
slow_k = stochastic_df['slow_k'].values.tolist()
slow_d = stochastic_df['slow_d'].values.tolist()
# macd
#macd_df = self.macd.apply(STOCK, short=12, long=26, t=9)
macd_df = self.macd.apply(STOCK, short=5, long=20, t=5)
macd = macd_df['macd'].values.tolist()
macds = macd_df['macds'].values.tolist()
macdo = macd_df['macdo'].values.tolist()
# rsi
rsi_df = self.rsi.apply(STOCK, period=30, window=5)
rsi = rsi_df['rsi'].values.tolist()
rsis = rsi_df['rsis'].values.tolist()
# ichimokuCloud
ichimokuCloud_df = self.ichimokuCloud.apply(STOCK, c=9, b=26, l=52)
ichimokuCloud_df = ichimokuCloud_df[:len(ichimokuCloud_df) - 51]
changeLine = ichimokuCloud_df['changeLine'].values.tolist()
baseLine = ichimokuCloud_df['baseLine'].values.tolist()
laggingSpan = ichimokuCloud_df['laggingSpan'].values.tolist()
leadingSpan1 = ichimokuCloud_df['leadingSpan1'].values.tolist()
leadingSpan2 = ichimokuCloud_df['leadingSpan2'].values.tolist()
# 결과
temp = {
"date": point_temp,
"open": open, "high": high, "low": low, "close": close, "volume": vol,
"avg5": avg5, "avg20": avg20, "avg30": avg30, "avg60": avg60, "avg120": avg120, "avg200": avg200,
"disparity_avg5": disparity_avg5, "disparity_avg20": disparity_avg20, "disparity_avg30": disparity_avg30,
"disparity_avg60": disparity_avg60, "disparity_avg120": disparity_avg120, "disparity_avg200": disparity_avg200,
"upper": upper, "lower": lower,
"macd": macd, "macds": macds, "macdo": macdo,
"fast_k": fast_k, "slow_k": slow_k, "slow_d": slow_d,
"rsi": rsi, "rsis": rsis,
"changeLine": changeLine, "baseLine": baseLine, "laggingSpan": laggingSpan, "leadingSpan1": leadingSpan1,
"leadingSpan2": leadingSpan2,
}
data = pd.DataFrame(temp)
df_final_time = pd.DatetimeIndex(point_temp)
data.index = df_final_time
data = data.fillna(-1)
return data