192 lines
7.3 KiB
Python
192 lines
7.3 KiB
Python
# 웹 호출 라이브러리를 호출합니다.
|
|
import time
|
|
import requests
|
|
from DataCrawler import DataCrawler
|
|
|
|
import json
|
|
import os
|
|
import pandas as pd
|
|
import itertools
|
|
from datetime import datetime, timedelta
|
|
from TelegramBot import TelegramBot
|
|
|
|
from BallFilter_25 import BallFilter
|
|
|
|
class Practice:
|
|
|
|
bot = None
|
|
preprocessor = None
|
|
predictor = None
|
|
|
|
extract_count = None
|
|
|
|
def __init__(self, resources_path):
|
|
self.bot = TelegramBot()
|
|
|
|
return
|
|
|
|
# 로또 당첨 데이터를 수집해서 파일로 저장합니다.
|
|
# lottoHistoryFile: 로또 당첨 데이터를 저장할 파일
|
|
def craw(self, lottoHistoryFile, drwNo=None):
|
|
|
|
ball = None
|
|
if drwNo != None:
|
|
# 로또 데이터를 저장할 파일을 선언합니다.
|
|
jsonFp = open(lottoHistoryFile + ".json", 'a', encoding="utf-8")
|
|
textFp = open(lottoHistoryFile + ".txt", 'a', encoding="utf-8")
|
|
|
|
url = 'https://dhlottery.co.kr/common.do?method=getLottoNumber&drwNo=' + str(drwNo)
|
|
# URL을 호출합니다.
|
|
res = requests.post(url)
|
|
# 호출한 결과에 대해서 Json 포맷을 가져옵니다.
|
|
result = res.json()
|
|
|
|
if result['returnValue'] != 'success':
|
|
return None
|
|
|
|
# 가져온 Json 포맷을 파일로 저장합니다.
|
|
jsonFp.write(json.dumps(result, ensure_ascii=False) + "\n")
|
|
textFp.write("%d,%d,%d,%d,%d,%d,%d,%d\n" % (drwNo, result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']))
|
|
print("%d,%d,%d,%d,%d,%d,%d,%d" % (drwNo, result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']))
|
|
|
|
ball = [result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']]
|
|
else:
|
|
# 로또 데이터를 저장할 파일을 선언합니다.
|
|
jsonFp = open(lottoHistoryFile + ".json", 'w', encoding="utf-8")
|
|
textFp = open(lottoHistoryFile + ".txt", 'w', encoding="utf-8")
|
|
|
|
# 1회차부터 지정된 회차까지 로또 당첨 번호를 수집합니다.
|
|
idx = 1
|
|
while True:
|
|
# 1회차부터 지정된 회차까지의 URL을 생성합니다.
|
|
url = 'https://dhlottery.co.kr/common.do?method=getLottoNumber&drwNo=' + str(idx)
|
|
# URL을 호출합니다.
|
|
res = requests.post(url)
|
|
# 호출한 결과에 대해서 Json 포맷을 가져옵니다.
|
|
result = res.json()
|
|
if result['returnValue'] != 'success':
|
|
break
|
|
# 가져온 Json 포맷을 파일로 저장합니다.
|
|
jsonFp.write(json.dumps(result, ensure_ascii=False) + "\n")
|
|
textFp.write("%d,%d,%d,%d,%d,%d,%d,%d\n" % (idx, result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']))
|
|
print("%d,%d,%d,%d,%d,%d,%d,%d" % (idx, result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']))
|
|
ball = [result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']]
|
|
idx += 1
|
|
time.sleep(0.5)
|
|
# 저장한 파일을 종료합니다.
|
|
jsonFp.close()
|
|
textFp.close()
|
|
|
|
return ball
|
|
|
|
def predict1(self, result_json):
|
|
result_json.append([6,7,10,11,20,45])
|
|
return
|
|
|
|
def predict2(self, resources_path, ymd, result_json):
|
|
|
|
candidates = [i for i in range(1, 46)]
|
|
|
|
lottoHistoryFileName = os.path.join(resources_path, 'lotto_history.json')
|
|
ballFilter = BallFilter(lottoHistoryFileName)
|
|
no = ballFilter.getNextNo(ymd)
|
|
print("회차: {}".format(no))
|
|
|
|
lottoHistoryFileName = os.path.join(resources_path, 'lotto_history.txt')
|
|
df_ball = pd.read_csv(lottoHistoryFileName, header=None)
|
|
df_ball.columns = ['no', 'b1', 'b2', 'b3', 'b4', 'b5', 'b6', 'bn']
|
|
|
|
#filter_ball=[1,2,4,6,10,11,11,17,18,20,21,22,23,24,26,27,28,30,31,32,33,34,37,38,39,40,42,44]
|
|
nCr = list(itertools.combinations(candidates, 6))
|
|
for idx, ball in enumerate(nCr):
|
|
|
|
if idx % 1000000 == 0:
|
|
print(" - {} processed...".format(idx))
|
|
|
|
ball = list(ball)
|
|
|
|
filter_type = ballFilter.filter(ball=ball, no=no, until_end=False, df=df_ball)
|
|
filter_size = len(filter_type)
|
|
|
|
if 0 < filter_size:
|
|
continue
|
|
|
|
result_json.append(ball)
|
|
|
|
p_ball = df_ball[df_ball['no'] == no - 1].values.tolist()[0]
|
|
p_no = p_ball[0]
|
|
p_ball = p_ball[1:7]
|
|
|
|
return p_no, p_ball
|
|
|
|
if __name__ == '__main__':
|
|
|
|
PROJECT_HOME = '.'
|
|
resources_path = os.path.join(PROJECT_HOME, 'resources')
|
|
|
|
# 데이터 수집
|
|
dataCrawler = DataCrawler()
|
|
dataCrawler.excute(resources_path)
|
|
|
|
today = datetime.today()
|
|
if today.weekday() == 5:
|
|
if today.hour > 20:
|
|
this_weekend = today + timedelta(days=(12 - today.weekday()))
|
|
else:
|
|
this_weekend = today + timedelta(days=(5 - today.weekday()))
|
|
elif today.weekday() == 6:
|
|
this_weekend = today + timedelta(days=(12 - today.weekday()))
|
|
else:
|
|
this_weekend = today + timedelta(days=(5 - today.weekday()))
|
|
|
|
last_weekend = (this_weekend - timedelta(days=7)).strftime('%Y%m%d')
|
|
ymd = this_weekend.strftime('%Y%m%d')
|
|
|
|
print("ymd: {}".format(ymd))
|
|
|
|
# 로또 예측
|
|
practice = Practice(resources_path)
|
|
|
|
# 데이터 수집
|
|
lottoHistoryFile = PROJECT_HOME + '/resources/lotto_history'
|
|
lottoHistoryFileName = lottoHistoryFile + '.json'
|
|
with open(lottoHistoryFileName, "r", encoding='utf-8') as f:
|
|
for line in f:
|
|
if line != '\n':
|
|
last_json = json.loads(line)
|
|
|
|
ball = practice.craw(lottoHistoryFile, drwNo=last_json['drwNo'] + 1)
|
|
|
|
recommend_result_file = os.path.join(resources_path, "recommend_ball.biz_25.json")
|
|
if os.path.isfile(recommend_result_file):
|
|
result_fp = open(recommend_result_file, "r")
|
|
result_json = json.load(result_fp)
|
|
result_json[ymd] = []
|
|
else:
|
|
result_json = {ymd: []}
|
|
|
|
# 매주 고정
|
|
practice.predict1(result_json[ymd])
|
|
# 필터 기반 예측
|
|
p_no, p_ball = practice.predict2(resources_path, ymd, result_json[ymd])
|
|
|
|
with open(recommend_result_file, 'w', encoding='utf-8') as outFp:
|
|
json.dump(result_json, outFp, ensure_ascii=False)
|
|
|
|
p_str = "[지난주] {}\n - {} 회차, {}\n[금주] {}\n - {} 회차\n[모델#25]\n".format(last_weekend, p_no, str(p_ball), ymd, (p_no + 1))
|
|
for i, ball in enumerate(result_json[ymd]):
|
|
p_str += " {}. {}\n".format((i+1), str(ball))
|
|
if (i+1) % 100 == 0:
|
|
practice.bot.sendMsg("{}".format(p_str))
|
|
p_str = ''
|
|
|
|
if len(result_json[ymd]) % 100 != 0:
|
|
practice.bot.sendMsg("{}".format(p_str))
|
|
|
|
size = len(result_json[ymd])
|
|
print("size: {}".format(size))
|
|
|
|
# https://youtu.be/QjBsui8Ob14?si=4dC3q8p0Yu5ZWK1K
|
|
# https://www.youtube.com/watch?v=YwiHaa1KNwA
|
|
|
|
print("done...") |