import os import re import win32com.client from datetime import datetime #import matplotlib.pyplot as plt #import matplotlib.ticker as ticker #import mplfinance as mpf import pandas as pd #import cufflinks as cf #import plotly.graph_objects as go class HTS: objCpCybos = None objCpCodeMgr = None stock = [] def __init__(self): #self.connect() return def connect(self): # 연결 여부 체크 self.objCpCybos = win32com.client.Dispatch("CpUtil.CpCybos") bConnect = self.objCpCybos.IsConnect if (bConnect == 0): print("PLUS가 정상적으로 연결되지 않음. ") exit() return def all_stocks(self): # 종목코드 리스트 구하기 self.objCpCodeMgr = win32com.client.Dispatch("CpUtil.CpCodeMgr") codeList = self.objCpCodeMgr.GetStockListByMarket(1) # 거래소 codeList2 = self.objCpCodeMgr.GetStockListByMarket(2) # 코스닥 print("거래소 종목코드", len(codeList)) for i, code in enumerate(codeList): secondCode = self.objCpCodeMgr.GetStockSectionKind(code) name = self.objCpCodeMgr.CodeToName(code) stdPrice = self.objCpCodeMgr.GetStockStdPrice(code) print(i, code, secondCode, stdPrice, name) print("코스닥 종목코드", len(codeList2)) for i, code in enumerate(codeList2): secondCode = self.objCpCodeMgr.GetStockSectionKind(code) name = self.objCpCodeMgr.CodeToName(code) stdPrice = self.objCpCodeMgr.GetStockStdPrice(code) print(i, code, secondCode, stdPrice, name) print("거래소 + 코스닥 종목코드 ", len(codeList) + len(codeList2)) return # 차트 데이터 구하기 def getChartData(self, stock_code): # 차트 객체 구하기 objStockChart = win32com.client.Dispatch("CpSysDib.StockChart") objStockChart.SetInputValue(0, 'A'+stock_code) # 종목 코드 - 삼성전자 objStockChart.SetInputValue(1, ord('2')) # 개수로 조회 objStockChart.SetInputValue(4, 100) # 최근 100일 치 objStockChart.SetInputValue(5, [0, 2, 3, 4, 5, 8]) # 날짜,시가,고가,저가,종가,거래량 objStockChart.SetInputValue(6, ord('D')) # '차트 주가 - 일간 차트 요청 objStockChart.SetInputValue(9, ord('1')) # 수정주가 사용 objStockChart.BlockRequest() len = objStockChart.GetHeaderValue(3) print("날짜", "시가", "고가", "저가", "종가", "거래량") print("==============================================-") for i in range(len): day = objStockChart.GetDataValue(0, i) open = objStockChart.GetDataValue(1, i) high = objStockChart.GetDataValue(2, i) low = objStockChart.GetDataValue(3, i) close = objStockChart.GetDataValue(4, i) vol = objStockChart.GetDataValue(5, i) print(day, open, high, low, close, vol) return # 주식 현재가 조회 def currentStock(self, stock_code): # 현재가 객체 구하기 self.objStockMst = win32com.client.Dispatch("DsCbo1.StockMst") self.objStockMst.SetInputValue(0, 'A'+stock_code) # 종목 코드 - 삼성전자 self.objStockMst.BlockRequest() # 현재가 통신 및 통신 에러 처리 rqStatus = self.objStockMst.GetDibStatus() rqRet = self.objStockMst.GetDibMsg1() print("통신상태", rqStatus, rqRet) if rqStatus != 0: exit() # 현재가 정보 조회 code = self.objStockMst.GetHeaderValue(0) # 종목코드 name = self.objStockMst.GetHeaderValue(1) # 종목명 time = self.objStockMst.GetHeaderValue(4) # 시간 cprice = self.objStockMst.GetHeaderValue(11) # 종가 diff = self.objStockMst.GetHeaderValue(12) # 대비 open = self.objStockMst.GetHeaderValue(13) # 시가 high = self.objStockMst.GetHeaderValue(14) # 고가 low = self.objStockMst.GetHeaderValue(15) # 저가 offer = self.objStockMst.GetHeaderValue(16) # 매도호가 bid = self.objStockMst.GetHeaderValue(17) # 매수호가 vol = self.objStockMst.GetHeaderValue(18) # 거래량 vol_value = self.objStockMst.GetHeaderValue(19) # 거래대금 # 예상 체결관련 정보 exFlag = self.objStockMst.GetHeaderValue(58) # 예상체결가 구분 플래그 exPrice = self.objStockMst.GetHeaderValue(55) # 예상체결가 exDiff = self.objStockMst.GetHeaderValue(56) # 예상체결가 전일대비 exVol = self.objStockMst.GetHeaderValue(57) # 예상체결수량 print("코드", code) print("이름", name) print("시간", time) print("종가", cprice) print("대비", diff) print("시가", open) print("고가", high) print("저가", low) print("매도호가", offer) print("매수호가", bid) print("거래량", vol) print("거래대금", vol_value) if (exFlag == ord('0')): print("장 구분값: 동시호가와 장중 이외의 시간") elif (exFlag == ord('1')): print("장 구분값: 동시호가 시간") elif (exFlag == ord('2')): print("장 구분값: 장중 또는 장종료") print("예상체결가 대비 수량") print("예상체결가", exPrice) print("예상체결가 대비", exDiff) print("예상체결수량", exVol) return # 주식 현금 매수주문 def orderToBuy(self, stock_code): # 주문 초기화 objTrade = win32com.client.Dispatch("CpTrade.CpTdUtil") initCheck = objTrade.TradeInit(0) if (initCheck != 0): print("주문 초기화 실패") exit() # 주식 매수 주문 acc = objTrade.AccountNumber[0] # 계좌번호 accFlag = objTrade.GoodsList(acc, 1) # 주식상품 구분 print(acc, accFlag[0]) objStockOrder = win32com.client.Dispatch("CpTrade.CpTd0311") objStockOrder.SetInputValue(0, "2") # 2: 매수 objStockOrder.SetInputValue(1, acc) # 계좌번호 objStockOrder.SetInputValue(2, accFlag[0]) # 상품구분 - 주식 상품 중 첫번째 objStockOrder.SetInputValue(3, "A"+stock_code) # 종목코드 objStockOrder.SetInputValue(4, 10) # 매수수량 10주 objStockOrder.SetInputValue(5, 14100) # 주문단가 - 14,100원 objStockOrder.SetInputValue(7, "0") # 주문 조건 구분 코드, 0: 기본 1: IOC 2:FOK objStockOrder.SetInputValue(8, "01") # 주문호가 구분코드 - 01: 보통 # 매수 주문 요청 objStockOrder.BlockRequest() rqStatus = objStockOrder.GetDibStatus() rqRet = objStockOrder.GetDibMsg1() print("통신상태", rqStatus, rqRet) if rqStatus != 0: exit() return # 주식 현금 매도주문 def orderToSell(self, stock_code): # 주문 초기화 objTrade = win32com.client.Dispatch("CpTrade.CpTdUtil") initCheck = objTrade.TradeInit(0) if (initCheck != 0): print("주문 초기화 실패") exit() # 주식 매도 주문 acc = objTrade.AccountNumber[0] # 계좌번호 accFlag = objTrade.GoodsList(acc, 1) # 주식상품 구분 print(acc, accFlag[0]) objStockOrder = win32com.client.Dispatch("CpTrade.CpTd0311") objStockOrder.SetInputValue(0, "1") # 1: 매도 objStockOrder.SetInputValue(1, acc) # 계좌번호 objStockOrder.SetInputValue(2, accFlag[0]) # 상품구분 - 주식 상품 중 첫번째 objStockOrder.SetInputValue(3, "A"+stock_code) # 종목코드 - A003540 - 대신증권 종목 objStockOrder.SetInputValue(4, 10) # 매도수량 10주 objStockOrder.SetInputValue(5, 14100) # 주문단가 - 14,100원 objStockOrder.SetInputValue(7, "0") # 주문 조건 구분 코드, 0: 기본 1: IOC 2:FOK objStockOrder.SetInputValue(8, "01") # 주문호가 구분코드 - 01: 보통 # 매도 주문 요청 objStockOrder.BlockRequest() rqStatus = objStockOrder.GetDibStatus() rqRet = objStockOrder.GetDibMsg1() print("통신상태", rqStatus, rqRet) if rqStatus != 0: exit() return # 주식 현재가 조회 def printStockData(self, stock_code, day): objCpCybos = win32com.client.Dispatch("CpUtil.CpCybos") bConnect = objCpCybos.IsConnect if (bConnect == 0): print("PLUS가 정상적으로 연결되지 않음. ") exit() # 차트 객체 구하기 objStockChart = win32com.client.Dispatch("CpSysDib.StockChart") objStockChart.SetInputValue(0, 'A' + stock_code) # 종목 코드 objStockChart.SetInputValue(1, ord('2')) # 1: 기간으로 조회, 2: 개수로 조회 objStockChart.SetInputValue(2, day) # 기간 조회 시, 시작일 objStockChart.SetInputValue(3, '20210909') # 기간 조회 시, 종료일 objStockChart.SetInputValue(4, 100000000) # 조회 시 가져오는 Line 개수 objStockChart.SetInputValue(5, [0, 1, 2, 3, 4, 5, 8]) # 날짜,시간,시가,고가,저가,종가,거래량 objStockChart.SetInputValue(6, ord('m')) # '차트 주가 - 월(M), 주(W), 일(D), 시(H), 분(m), 초(S) 차트 요청 objStockChart.SetInputValue(7, 1) objStockChart.SetInputValue(9, ord('1')) # 수정주가 사용 objStockChart.BlockRequest() size = objStockChart.GetHeaderValue(3) # print("날짜", "시간", "시가", "고가", "저가", "종가", "거래량") for i in range(size - 1, -1, -1): day = objStockChart.GetDataValue(0, i) time = objStockChart.GetDataValue(1, i) open = objStockChart.GetDataValue(2, i) high = objStockChart.GetDataValue(3, i) low = objStockChart.GetDataValue(4, i) close = objStockChart.GetDataValue(5, i) vol = objStockChart.GetDataValue(6, i) print(day, time, open, high, low, close, vol) return # 주식 현재가 조회 def getRealTime(self, stock_code, day, result): objCpCybos = win32com.client.Dispatch("CpUtil.CpCybos") bConnect = objCpCybos.IsConnect if (bConnect == 0): print("PLUS가 정상적으로 연결되지 않음. ") exit() # 차트 객체 구하기 objStockChart = win32com.client.Dispatch("CpSysDib.StockChart") objStockChart.SetInputValue(0, 'A'+stock_code) # 종목 코드 objStockChart.SetInputValue(1, ord('2')) # 1: 기간으로 조회, 2: 개수로 조회 objStockChart.SetInputValue(2, day) # 기간 조회 시, 시작일 #objStockChart.SetInputValue(3, '20210915') # 기간 조회 시, 종료일 objStockChart.SetInputValue(4, 100) # 조회 시 가져오는 Line 개수 objStockChart.SetInputValue(5, [0, 1, 2, 3, 4, 5, 8]) # 날짜,시간,시가,고가,저가,종가,거래량 objStockChart.SetInputValue(6, ord('S')) # '차트 주가 - 월(M), 주(W), 일(D), 시(H), 분(m), 초(S) 차트 요청 objStockChart.SetInputValue(7, 1) objStockChart.SetInputValue(9, ord('1')) # 수정주가 사용 objStockChart.BlockRequest() size = objStockChart.GetHeaderValue(3) #print("날짜", "시간", "시가", "고가", "저가", "종가", "거래량") start_time = datetime.strptime(day+" 093000", '%Y%m%d %H%M%S') for i in range(size-1, -1, -1): #day = objStockChart.GetDataValue(0, i) time = datetime.strptime(day+" "+objStockChart.GetDataValue(1, i), '%Y%m%d %H%M%S') if time < start_time: continue #open = objStockChart.GetDataValue(2, i) #high = objStockChart.GetDataValue(3, i) #low = objStockChart.GetDataValue(4, i) close = objStockChart.GetDataValue(5, i) vol = objStockChart.GetDataValue(6, i) #print(day, time, open, high, low, close, vol) if time[i] not in result["check"]: result["check"].add(time) result["time"].append(time) result["close"].append(close) result["vol"].append(vol) return def getCSV(self, fileName, day, result): data = pd.read_csv(fileName) days = data.날짜 time = data.시간 close = data.종가 vol = data.거래량 start_time = datetime.strptime(day + " 093000", '%Y%m%d %H%M%S') for i in range(len(data)): temp = datetime.strptime(str(days[i]) + " " + str(time[i]), '%Y%m%d %H%M%S') if temp < start_time: continue if temp not in result["check"]: result["check"].add(temp) result["time"].append(temp) result["close"].append(close[i]) result["vol"].append(vol[i]) return def analyze(self, result): y_value = result["close"] #x_value = [i for i in range(len(y_value))] vol = result["vol"] df = pd.DataFrame(y_value) point = result["time"] max20 = df.rolling(window=20).mean() stddev20 = df.rolling(window=20).std() upper_df = max20 + (stddev20 * 2) # 상단 볼린저 밴드 lower_df = max20 - (stddev20 * 2) # 하단 볼린저 밴드 window = 60 open = y_value[:len(y_value)-window] + [y_value[len(y_value)-window] for i in range(window)] # 시가 close = [y_value[window-1] for i in range(window-1)] + y_value[window-1:] # 종가 high_df = df.rolling(window=window).max() # 고가 low_df = df.rolling(window=window).min() # 저가 high, low, upper, lower = [], [], [], [] for i in range(len(high_df)): if i < window: high.append(high_df.values[window - 1][0]) low.append(low_df.values[window - 1][0]) upper.append(upper_df.values[window - 1][0]) lower.append(lower_df.values[window - 1][0]) else: high.append(high_df.values[i][0]) low.append(low_df.values[i][0]) upper.append(upper_df.values[i][0]) lower.append(lower_df.values[i][0]) open_temp = [open[i] for i in range(len(open)) if i % window == 0] high_temp = [high[i] for i in range(len(high)) if i % window == 0] low_temp = [low[i] for i in range(len(low)) if i % window == 0] close_temp = [close[i] for i in range(len(close)) if i % window == 0] vol_temp = [vol[i] for i in range(len(vol)) if i % window == 0] point_temp = [point[i] for i in range(len(point)) if i % window == 0] upper_temp = [upper[i] for i in range(len(upper)) if i % window == 0] lower_temp = [lower[i] for i in range(len(lower)) if i % window == 0] temp = {"Open": open_temp, "High": high_temp, "Low": low_temp, "Close": close_temp, "Volume": vol_temp, "Date": point_temp} data = pd.DataFrame(temp) df_final_time = pd.DatetimeIndex(point_temp) data.index = df_final_time return data, upper_temp, lower_temp def buyRealTime(self, stock_code, day): result = {"check": set(), "time": [], "close": [], "vol": []} #self.getRealTime(stock_code, day, result) self.getCSV("data_s_1.csv", day, result) self.getCSV("data_s_2.csv", day, result) data, upper, lower = self.analyze(result) data['Open'] = pd.to_numeric(data['Open']) data['High'] = pd.to_numeric(data['High']) data['Low'] = pd.to_numeric(data['Low']) data['Close'] = pd.to_numeric(data['Close']) data['Volume'] = pd.to_numeric(data['Volume']) fig = plt.figure(figsize=(10, 10)) ax = fig.add_subplot(111) """ # 이동평균선 그리기 ax.plot(upper, label='upper', linewidth=1.5) ax.plot(lower, label='lower', linewidth=1.5) # 참고) https://pypi.org/project/mplfinance/ mc = mpf.make_marketcolors(up='red', down='blue', inherit=True) style_final = mpf.make_mpf_style(marketcolors=mc) mpf.plot(data, ax=ax, style=style_final) plt.grid() plt.show() """ bolinger_upper = go.Scatter(x=data['Date'], y=upper, name="upper", line_color='#8B4513') bolinger_lower = go.Scatter(x=data['Date'], y=lower, name="lower", line_color='#8B4513') candle_stick = go.Candlestick(x=data['Date'], open=data['Open'], high=data['High'], low=data['Low'], close=data['Close']) fig = go.Figure(data=[candle_stick, bolinger_upper, bolinger_lower]) fig.update_layout(title="2x") fig.show() return if __name__ == "__main__": today = datetime.today() PROJECT_HOME = os.path.join(os.path.dirname(os.path.join(os.path.dirname(__file__)))) RESOURCE_DIR = PROJECT_HOME + "/resources/analysis/"+today.strftime("%Y%m%d") stock_code = "252670" day = datetime.today().strftime("%Y%m%d") day = '20210917' hts = HTS() #hts.all_stocks() #hts.getChartData(stock_code) #hts.currentStock(stock_code) hts.printStockData(stock_code, day) #hts.buyRealTime(stock_code, day) print ("done...")