Files
DeepStock/hts/BuySellChecker.py
dsyoon dbcadb6080 init
2023-12-13 00:11:37 +09:00

274 lines
13 KiB
Python

import os
from dtw import dtw
import json
import sqlite3
import numpy as np
from datetime import datetime, timedelta
class BuySellChecker():
    """Rule-based buy/sell signal checker for candle data.

    The methods read columns such as ``close``, ``open``, ``ymd``,
    ``trend_avg`` and ``avg5`` ... ``avg480`` from ``data`` and
    ``ymd``/``avg5``/``avg20`` from ``data_signal``. Both are accessed with
    ``[...]``, ``.iloc``, ``.loc`` and ``.columns``, i.e. like pandas
    DataFrames (presumably with a default 0..n-1 integer index, because
    positional indices are also used as labels -- verify against callers).
    """

    # Reference patterns read by findBuyPoint: a mapping with the keys
    # 'min_max' and 'stndardization' (sic), each a list of sequences.
    # Nothing in this class populates it; it must be assigned by the caller.
    PATTERNS = None
    # Directory that contains the 'coins.db' SQLite file used by getMacd.
    RESOURCE_PATH = None

    def __init__(self, RESOURCE_PATH, s):
        """Remember where resources live.

        Args:
            RESOURCE_PATH: directory holding ``coins.db``.
            s: not used by this class; kept so existing callers keep working.
        """
        self.RESOURCE_PATH = RESOURCE_PATH

    def nearDisparity(self, data, i):
        """Return True when ``data['disparity_avg5'][i]`` is within 0.2% of 1.

        NOTE(review): the original repeated this exact test five times; the
        copies may have been meant for other disparity columns -- confirm.
        """
        return bool(0.998 < data['disparity_avg5'][i] < 1.002)

    def cosine_similarity(self, x, y):
        """Return the cosine of the angle between vectors ``x`` and ``y``."""
        return np.dot(x, y) / (np.sqrt(np.dot(x, x)) * np.sqrt(np.dot(y, y)))

    # An earlier findBuyPoint variant (disabled in the original file) min-max
    # normalised the last 180 closes and matched them against PATTERNS with
    # cosine_similarity, using a 0.995 threshold.
    # Reference: https://teddylee777.github.io/pandas/cos-sim-stock/

    def findBuyPoint(self, data, i):
        """Check whether the closes ending at row ``i`` resemble a pattern.

        Each reference pattern in ``PATTERNS['stndardization']`` is compared,
        via DTW (dynamic time warping) with an absolute-difference cost, to
        the z-scored window of the same length that ends at position ``i``.
        Time-series similarity reference:
        https://m.blog.naver.com/happyrachy/221693939341

        Args:
            data: frame with a ``close`` column.
            i: positional index of the current row.

        Returns:
            True as soon as one pattern has a DTW distance below 2, else
            False (also when ``i < 24`` or no patterns have been loaded).
        """
        if i < 24:
            return False
        # Without loaded patterns there is nothing to match against.
        if self.PATTERNS is None:
            return False

        def manhattan_distance(a, b):
            return np.abs(a - b)

        references = self.PATTERNS['stndardization']
        # The pattern count is taken from 'min_max', as in the original code.
        for p in range(len(self.PATTERNS['min_max'])):
            reference = references[p]
            size = len(reference)
            if i - size + 1 < 0:
                # Not enough history for a window of this pattern's length.
                continue
            window = data['close'].iloc[i - size + 1:i + 1]
            pattern = np.array(reference).reshape(-1, 1)
            candidate = np.array((window - window.mean()) / window.std()).reshape(-1, 1)
            distance, _cost, _acc_cost, _path = dtw(pattern, candidate, dist=manhattan_distance)
            if distance < 2:
                return True
        return False

    def getMacd(self, ticker_code, day, mins=1):
        """Load MACD samples for ``day`` and the day before from ``coins.db``.

        Args:
            ticker_code: market code; rows stored with or without the
                ``'KRW-'`` prefix are both matched.
            day: date string in ``%Y%m%d`` format.
            mins: suffix of the table name ``minutely_max_macd_<mins>``. It is
                concatenated into the SQL text, so it must never come from
                untrusted input.

        Returns:
            List of unique ``(datetime, macd, close)`` tuples, in descending
            MACD order as delivered by the query.
        """
        table = 'minutely_max_macd_' + str(mins)
        # Parse the date before opening the database so a malformed ``day``
        # cannot leave a connection behind.
        day1 = (datetime.strptime(day, '%Y%m%d') - timedelta(1)).strftime('%Y%m%d')
        query = ('SELECT ymd, hms, macd, close FROM ' + table +
                 ' WHERE (CODE=? or CODE=?) and (ymd=? or ymd=?) order by macd desc')
        params = (ticker_code, ticker_code.replace('KRW-', ''), day, day1, )

        conn = sqlite3.connect(os.path.join(self.RESOURCE_PATH, 'coins.db'))
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(query, params)
                db_rows = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            conn.close()

        samples = [
            (datetime.strptime(ymd + " " + hms, '%Y%m%d %H%M%S'), macd, close)
            for ymd, hms, macd, close in db_rows
        ]
        # dict.fromkeys removes duplicates while keeping the query order.
        return list(dict.fromkeys(samples))

    @staticmethod
    def _buyCount(max_buy_price, price, slow_k, divisors=(1, 2, 3)):
        """Size an order: budget / (divisor * price), divisor chosen by slow_k.

        ``slow_k`` below 30 selects ``divisors[0]``, below 50 ``divisors[1]``,
        anything else ``divisors[2]``.
        """
        if slow_k < 30:
            divisor = divisors[0]
        elif slow_k < 50:
            divisor = divisors[1]
        else:
            divisor = divisors[2]
        return max_buy_price / (divisor * price)

    def getBuyPriceAndWeight1(self, ticker, MAX_BUY_PRICE, i, data, data_signal, BUY_LIST, isRealTime=True):
        """Decide whether to buy at row ``i``.

        Args:
            ticker: not read by the active rules.
            MAX_BUY_PRICE: budget used to size the order.
            i: index of the current row in ``data``.
            data: candle frame (``ymd``, ``close``, ``trend_avg``, ``avg*``,
                ``slow_k`` columns are read).
            data_signal: signal frame (``ymd``, ``avg5``, ``avg20``).
            BUY_LIST: ``None`` or a dict whose ``'buy_list'`` holds earlier
                buys (dicts with ``buy_ymd`` and ``buy_price``).
            isRealTime: not read by the active rules.

        Returns:
            ``(buy_ymd, buy_price, buy_count, buy_cut, buy_type)``;
            ``(None, -1, -1, -1, '')`` when nothing should be bought.
        """
        buy_ymd, buy_price, buy_count, buy_cut, buy_type = None, -1, -1, -1, ''

        duration = 3
        # The trend test needs `duration` earlier rows; with fewer, a negative
        # slice start would wrap around and look at the wrong rows.
        if i < duration:
            return buy_ymd, buy_price, buy_count, buy_cut, buy_type

        # Index of the latest signal row that is not newer than the current row.
        df_tmp = data_signal['ymd'] <= data['ymd'][i]
        df_signal = data_signal.loc[df_tmp]
        si = len(df_signal) - 1
        if si < 0:
            # No signal row is available yet, so no rule can be evaluated.
            return buy_ymd, buy_price, buy_count, buy_cut, buy_type

        # A MACD pre-filter (comparing the current MACD with the recent
        # maximum, via getMacd outside real-time mode) was disabled in the
        # original file and is not applied.

        # Uptrend: the current trend_avg exceeds the mean of the previous rows.
        if sum(data['trend_avg'].iloc[i - duration:i]) / duration < data['trend_avg'][i]:
            if data_signal['avg20'][si] < data_signal['avg5'][si]:
                # Method 1 (buy on a volume_up breakout, type 'volume_up') was
                # disabled in the original file.

                # Method 2: moving averages stacked in ascending order below
                # the close, with avg240 beneath avg5/avg20/avg60/avg120.
                if (data['avg480'][i] < data['avg120'][i] < data['avg60'][i]
                        < data['avg20'][i] < data['avg5'][i] < data['close'][i]):
                    if data['avg240'][i] < min(data['avg5'][i], data['avg20'][i], data['avg60'][i], data['avg120'][i]):
                        price = data['close'][i]
                        last_buy = None
                        if BUY_LIST is not None and 0 < len(BUY_LIST['buy_list']):
                            last_buy = BUY_LIST['buy_list'][-1]
                        bought_recently = (
                            last_buy is not None
                            and data['ymd'][i] < last_buy['buy_ymd'] + timedelta(minutes=10)
                        )
                        # Within 10 minutes of the previous buy, only add to
                        # the position when the price is above that buy.
                        if not bought_recently or last_buy['buy_price'] < price:
                            buy_price = price
                            buy_type = 'golden'
                            buy_ymd = data['ymd'][i]
                            # NOTE(review): slow_k is read from `data` at the
                            # signal-frame index `si`; confirm whether
                            # data_signal['slow_k'][si] or data['slow_k'][i]
                            # was intended.
                            buy_count = self._buyCount(MAX_BUY_PRICE, price, data['slow_k'][si])
                        return buy_ymd, buy_price, buy_count, buy_cut, buy_type

                # Method 3 ('trend_up').
                # NOTE(review): the innermost test compares avg5 with itself
                # and is never true, so this rule cannot fire; it also never
                # sets buy_price. Left as found until the intended condition
                # is confirmed.
                if (sum(data['avg480'][i - 5:i]) < data['avg480'][i]
                        and sum(data['avg60'][i - 5:i]) < data['avg60'][i]
                        and sum(data['avg20'][i - 5:i]) < data['avg20'][i]
                        and sum(data['avg5'][i - 5:i]) < data['avg5'][i]):
                    if data['avg480'][i] < data['avg20'][i] < data['avg5'][i]:
                        if data['avg60'][i] < data['avg20'][i] < data['avg5'][i]:
                            if data['avg5'][i] < data['avg5'][i]:
                                buy_type = 'trend_up'
                                buy_ymd = data['ymd'][i]
                                buy_count = self._buyCount(
                                    MAX_BUY_PRICE, data['close'][i], data['slow_k'][si],
                                    divisors=(1, 1.5, 2))
                                return buy_ymd, buy_price, buy_count, buy_cut, buy_type
        return buy_ymd, buy_price, buy_count, buy_cut, buy_type

    def getSellPriceAndWeight1(self, ticker, i, data, data_signal, BUY_LIST=None):
        """Decide whether to sell at row ``i``.

        A sell is signalled when the previous and the current candle both
        closed below their open and at least one open buy has type
        ``'golden'``; the whole position (all buy counts) is then sold at the
        current close. (Per the original comment, method-1 buys are sold at
        1% profit elsewhere, in Upbit.py.)

        Args:
            ticker: not read.
            i: index of the current row in ``data``.
            data: candle frame with ``open`` and ``close`` columns.
            data_signal: not read.
            BUY_LIST: ``None`` or a dict whose ``'buy_list'`` holds dicts with
                ``buy_count`` and ``buy_type``.

        Returns:
            ``(sell_price, sell_count)``; ``(-1, -1)`` when nothing is sold.
        """
        sell_price, sell_count = -1, -1
        if BUY_LIST is None or not 0 < len(BUY_LIST['buy_list']):
            return sell_price, sell_count
        if i < 1:
            # The rule needs the previous candle as well.
            return sell_price, sell_count

        two_falling_candles = (
            data['close'][i - 1] < data['open'][i - 1]
            and data['close'][i] < data['open'][i]
        )
        if two_falling_candles:
            golden_count = sum(
                buy['buy_count'] for buy in BUY_LIST['buy_list'] if buy['buy_type'] == 'golden'
            )
            if 0 < golden_count:
                sell_price = data['close'][i]
                sell_count = sum(buy['buy_count'] for buy in BUY_LIST['buy_list'])
        return sell_price, sell_count

    def checkTransaction1(self, ticker, MAX_BUY_PRICE, data, data_signal, BUY_LIST=None, isRealTime=True):
        """Evaluate the sell and buy rules and collect the results.

        The original comment notes the analysis uses yesterday's and today's
        data.

        Args:
            ticker, MAX_BUY_PRICE, data, data_signal: forwarded to
                getSellPriceAndWeight1 / getBuyPriceAndWeight1.
            BUY_LIST: ``None`` or a dict with a ``'buy_list'`` list; every buy
                with a positive price is appended to it (mutated in place).
            isRealTime: True evaluates only the last row and returns
                one-element lists; False (simulation) walks every row and
                returns lists as long as ``data``, where a row is only
                considered for buying when it did not trigger a sell.

        Returns:
            Dict with the keys ``buy_ymd``, ``buy_price``, ``buy_count``,
            ``buy_cut``, ``buy_type``, ``sell_price`` and ``sell_weight``.
            When ``data`` is None or has no ``close`` column, every list
            holds one "nothing to do" value.
        """
        bsLine = {}
        if data is not None and 'close' in data.columns:
            size = len(data["close"])
            if isRealTime:
                # Real-time mode: only the most recent row matters.
                last_index = size - 1
                sell_price, sell_weight = self.getSellPriceAndWeight1(ticker, last_index, data, data_signal, BUY_LIST)
                bsLine['sell_price'] = [sell_price]
                bsLine['sell_weight'] = [sell_weight]
                buy_ymd, buy_price, buy_count, buy_cut, buy_type = self.getBuyPriceAndWeight1(
                    ticker, MAX_BUY_PRICE, last_index, data, data_signal, BUY_LIST, isRealTime)
                bsLine['buy_ymd'] = [buy_ymd]
                bsLine['buy_price'] = [buy_price]
                bsLine['buy_count'] = [buy_count]
                bsLine['buy_cut'] = [buy_cut]
                bsLine['buy_type'] = [buy_type]
                if BUY_LIST is not None and 0 < buy_price:
                    BUY_LIST['buy_list'].append({'buy_ymd': buy_ymd, 'buy_price': buy_price, 'buy_count': buy_count, 'buy_cut': buy_cut, 'buy_type': buy_type})
            else:
                # Simulation mode: replay every row.
                bsLine['buy_ymd'] = [-1] * size
                bsLine['buy_price'] = [-1] * size
                bsLine['buy_count'] = [-1] * size
                bsLine['buy_cut'] = [-1] * size
                bsLine['buy_type'] = [''] * size
                bsLine['sell_price'] = [-1] * size
                bsLine['sell_weight'] = [-1] * size
                for last_index in range(size):
                    sell_price, sell_weight = self.getSellPriceAndWeight1(ticker, last_index, data, data_signal, BUY_LIST)
                    bsLine['sell_price'][last_index] = sell_price
                    bsLine['sell_weight'][last_index] = sell_weight
                    if sell_price < 0:
                        buy_ymd, buy_price, buy_count, buy_cut, buy_type = self.getBuyPriceAndWeight1(
                            ticker, MAX_BUY_PRICE, last_index, data, data_signal, BUY_LIST, isRealTime)
                        if buy_ymd is not None:
                            # Keep the -1 placeholder on rows without a buy.
                            bsLine['buy_ymd'][last_index] = buy_ymd
                        bsLine['buy_price'][last_index] = buy_price
                        bsLine['buy_count'][last_index] = buy_count
                        bsLine['buy_cut'][last_index] = buy_cut
                        bsLine['buy_type'][last_index] = buy_type
                        if BUY_LIST is not None and 0 < buy_price:
                            BUY_LIST['buy_list'].append({'buy_ymd': buy_ymd, 'buy_price': buy_price, 'buy_count': buy_count, 'buy_cut': buy_cut, 'buy_type': buy_type})
        else:
            # No usable data: same shape as a real-time result without a buy.
            bsLine['buy_ymd'] = [None]
            bsLine['buy_price'] = [-1]
            bsLine['buy_count'] = [-1]
            bsLine['buy_cut'] = [-1]
            bsLine['buy_type'] = ['']
            bsLine['sell_price'] = [-1]
            bsLine['sell_weight'] = [-1]
        return bsLine