# Commit note: Keep the fixed 11-number set intact while adding a second-stage
# portfolio selection that caps final recommendations to the 70,000 KRW budget,
# and update docs/data/scripts to match the current project structure and
# runtime flow.
# Co-authored-by: Cursor <cursoragent@cursor.com>
# File info (from the repository web view): 282 lines, 11 KiB, Python
# 웹 호출 라이브러리를 호출합니다.
|
|
import time
|
|
import requests
|
|
from DataCrawler import DataCrawler
|
|
|
|
import json
|
|
import os
|
|
import pandas as pd
|
|
import itertools
|
|
from datetime import datetime, timedelta
|
|
from TelegramBot import TelegramBot
|
|
|
|
from final_BallFilter import BallFilter
|
|
|
|
# Price of a single game (one 6-number combination), in KRW.
COST_PER_GAME = 1000
# Maximum amount to spend on one draw, in KRW.
MAX_BUDGET_KRW = 70000
# Number of games that fit in the budget for one draw.
MAX_GAMES_PER_DRAW = MAX_BUDGET_KRW // COST_PER_GAME


class Practice:
    """Collect lotto history and build a budget-capped set of recommendations.

    The recommendation for one draw is the fixed number sets from
    ``predict1`` plus the filter-based sets chosen by ``predict2``.
    """

    bot = None
    preprocessor = None
    predictor = None

    extract_count = None

    # Winning-number endpoint; the draw number is appended to this URL.
    _LOTTO_API_URL = 'https://dhlottery.co.kr/common.do?method=getLottoNumber&drwNo='
    # Seconds to wait for the endpoint before giving up instead of hanging.
    _REQUEST_TIMEOUT_SEC = 10

    # Number sets that are recommended every week, unchanged.
    _FIXED_BALLS = (
        (6, 7, 10, 11, 20, 45),
        (5, 12, 16, 27, 39, 45),
        (5, 15, 18, 29, 36, 41),
        (1, 17, 20, 25, 36, 45),
        (6, 15, 20, 23, 37, 43),
        (8, 15, 19, 23, 38, 41),
        (3, 14, 20, 27, 35, 45),
        (5, 11, 19, 24, 40, 45),
        (5, 9, 20, 25, 32, 37),
        (2, 13, 19, 27, 40, 43),
        (4, 13, 17, 28, 39, 43),
    )

    def __init__(self, resources_path):
        """Create the Telegram bot used to send the recommendations.

        Args:
            resources_path: Accepted for interface compatibility; not used
                by the constructor.
        """
        self.bot = TelegramBot()

    def _fetch_draw(self, drw_no):
        """Request one draw and return its decoded JSON, or None on failure.

        "Failure" means the response's ``returnValue`` field is not
        ``'success'``. Network errors and timeouts propagate as exceptions.
        """
        res = requests.post(
            self._LOTTO_API_URL + str(drw_no),
            timeout=self._REQUEST_TIMEOUT_SEC,
        )
        result = res.json()
        if result['returnValue'] != 'success':
            return None
        return result

    def _record_draw(self, jsonFp, textFp, drw_no, result):
        """Write one draw to both history files, echo it, and return its balls."""
        ball = [
            result['drwtNo1'], result['drwtNo2'], result['drwtNo3'],
            result['drwtNo4'], result['drwtNo5'], result['drwtNo6'],
            result['bnusNo'],
        ]
        line = "%d,%d,%d,%d,%d,%d,%d,%d" % (drw_no, *ball)

        jsonFp.write(json.dumps(result, ensure_ascii=False) + "\n")
        textFp.write(line + "\n")
        print(line)
        return ball

    def craw(self, lottoHistoryFile, drwNo=None):
        """Collect lotto winning numbers and store them in files.

        Args:
            lottoHistoryFile: Path prefix of the history files; ``.json`` and
                ``.txt`` are appended to it.
            drwNo: When given, only that draw is fetched and appended to the
                existing files. When None, both files are rewritten with
                every draw from draw 1 up to the last one the endpoint
                reports as successful.

        Returns:
            ``[b1, b2, b3, b4, b5, b6, bonus]`` of the last draw written, or
            None when nothing was fetched successfully.
        """
        single_draw = drwNo is not None
        mode = 'a' if single_draw else 'w'
        ball = None

        # Context managers close both files on every exit path, including the
        # early return for a failed single-draw request.
        with open(lottoHistoryFile + ".json", mode, encoding="utf-8") as jsonFp, \
                open(lottoHistoryFile + ".txt", mode, encoding="utf-8") as textFp:
            if single_draw:
                result = self._fetch_draw(drwNo)
                if result is None:
                    return None
                return self._record_draw(jsonFp, textFp, drwNo, result)

            # Walk forward from draw 1 until the endpoint stops reporting success.
            idx = 1
            while True:
                result = self._fetch_draw(idx)
                if result is None:
                    break
                ball = self._record_draw(jsonFp, textFp, idx, result)
                idx += 1
                # Pause between requests.
                time.sleep(0.5)

        return ball

    def predict1(self, result_json):
        """Append the weekly fixed number sets to ``result_json`` in place.

        Args:
            result_json: List that receives one new 6-number list per fixed set.
        """
        # Fresh lists on every call so callers can mutate them independently.
        result_json.extend(list(ball) for ball in self._FIXED_BALLS)
        return

    def _can_add_ball(self, ball, fixed_balls, selected_balls, max_overlap):
        """Return True if ``ball`` shares at most ``max_overlap`` numbers with
        every set in ``fixed_balls`` and ``selected_balls``."""
        ball_set = set(ball)
        return all(
            len(ball_set & set(other)) <= max_overlap
            for other in itertools.chain(fixed_balls, selected_balls)
        )

    def select_portfolio(self, fixed_balls, candidates, target_count):
        """Second-stage portfolio selection.

        - Drops duplicate candidates and candidates equal to a fixed set.
        - Picks candidates while relaxing, stage by stage, the allowed overlap
          (count of shared numbers) with the fixed and already selected sets.

        Args:
            fixed_balls: Number sets that are always part of the portfolio.
            candidates: Number sets to choose from.
            target_count: Maximum number of candidates to return.

        Returns:
            List of selected candidates, each as a sorted list. All unique
            candidates are returned when there are no more than
            ``target_count`` of them.
        """
        # Nothing can be selected, so skip de-duplicating the candidates.
        if target_count <= 0:
            return []

        fixed_keys = {tuple(sorted(fixed_ball)) for fixed_ball in fixed_balls}
        seen = set()
        unique_candidates = []
        for candidate in candidates:
            key = tuple(sorted(candidate))
            if key in seen or key in fixed_keys:
                continue
            seen.add(key)
            unique_candidates.append(list(key))

        if len(unique_candidates) <= target_count:
            return unique_candidates

        selected = []
        selected_keys = set()

        # Each stage allows one more shared number than the previous one.
        for max_overlap in (2, 3, 4, 5):
            for candidate in unique_candidates:
                key = tuple(candidate)
                if key in selected_keys:
                    continue
                if self._can_add_ball(candidate, fixed_balls, selected, max_overlap):
                    selected.append(candidate)
                    selected_keys.add(key)
                    if len(selected) >= target_count:
                        return selected

        # Still short after the last stage: fill with the rest, in order.
        for candidate in unique_candidates:
            key = tuple(candidate)
            if key in selected_keys:
                continue
            selected.append(candidate)
            selected_keys.add(key)
            if len(selected) >= target_count:
                break

        return selected

    def predict2(self, resources_path, ymd, fixed_balls, max_games_per_draw=MAX_GAMES_PER_DRAW):
        """Filter every 6-of-45 combination and pick a budget-capped portfolio.

        Args:
            resources_path: Directory holding ``lotto_history.json`` and
                ``lotto_history.txt``.
            ymd: Date string passed to ``BallFilter.getNextNo`` to obtain the
                draw number being predicted.
            fixed_balls: Fixed number sets; they count against the game limit.
            max_games_per_draw: Total number of games allowed for the draw.

        Returns:
            Tuple ``(p_no, p_ball, selected_candidates, passed_count,
            variable_target_count)``: previous draw number, its six main
            numbers, the selected sets, how many combinations passed the
            filter, and how many sets were asked for.

        Raises:
            IndexError: The previous draw is missing from ``lotto_history.txt``.
        """
        ballFilter = BallFilter(os.path.join(resources_path, 'lotto_history.json'))
        no = ballFilter.getNextNo(ymd)
        print("회차: {}".format(no))

        history_txt = os.path.join(resources_path, 'lotto_history.txt')
        df_ball = pd.read_csv(history_txt, header=None)
        df_ball.columns = ['no', 'b1', 'b2', 'b3', 'b4', 'b5', 'b6', 'bn']

        # Look up the previous draw before the long filtering pass so stale
        # history fails immediately rather than after all the work.
        previous_rows = df_ball[df_ball['no'] == no - 1].values.tolist()
        if not previous_rows:
            raise IndexError(
                "draw {} not found in {}".format(no - 1, history_txt)
            )
        p_no = previous_rows[0][0]
        p_ball = previous_rows[0][1:7]

        passed_candidates = []
        # Iterate the combinations lazily; there are 8,145,060 of them and
        # they are only needed one at a time.
        for idx, combination in enumerate(itertools.combinations(range(1, 46), 6)):
            if idx % 1000000 == 0:
                print(" - {} processed, pass: {}".format(idx, len(passed_candidates)))
            ball = list(combination)

            # A non-empty result from filter() rejects the combination.
            filter_type = ballFilter.filter(ball=ball, no=no, until_end=False, df=df_ball)
            if len(filter_type) > 0:
                continue

            passed_candidates.append(ball)

        # Games left in the budget once the fixed sets are counted.
        variable_target_count = max(0, max_games_per_draw - len(fixed_balls))
        selected_candidates = self.select_portfolio(
            fixed_balls=fixed_balls,
            candidates=passed_candidates,
            target_count=variable_target_count,
        )

        return p_no, p_ball, selected_candidates, len(passed_candidates), variable_target_count
|
|
|
|
if __name__ == '__main__':

    PROJECT_HOME = '.'
    resources_path = os.path.join(PROJECT_HOME, 'resources')

    # Data collection (currently disabled)
    #dataCrawler = DataCrawler()
    #dataCrawler.excute(resources_path)

    # Pick the target Saturday (weekday 5). On Sunday, or on Saturday from
    # 21:00 onward, the target is the following Saturday; otherwise it is the
    # Saturday of the current week (today itself on a Saturday).
    today = datetime.today()
    weekday = today.weekday()
    roll_to_next_week = weekday == 6 or (weekday == 5 and today.hour > 20)
    days_ahead = (12 if roll_to_next_week else 5) - weekday
    this_weekend = today + timedelta(days=days_ahead)

    last_weekend = (this_weekend - timedelta(days=7)).strftime('%Y%m%d')
    ymd = this_weekend.strftime('%Y%m%d')

    print("ymd: {}".format(ymd))

    # Lotto prediction
    practice = Practice(resources_path)

    # Load earlier recommendations if the result file exists; the entry for
    # the target date always starts empty.
    recommend_result_file = os.path.join(resources_path, "recommend_ball.biz_25.json")
    if os.path.isfile(recommend_result_file):
        with open(recommend_result_file, "r", encoding="utf-8") as result_fp:
            result_json = json.load(result_fp)
    else:
        result_json = {}
    result_json[ymd] = []

    # Fixed every week
    fixed_balls = []
    practice.predict1(fixed_balls)
    result_json[ymd].extend(fixed_balls)

    # Filter-based prediction
    prediction = practice.predict2(
        resources_path=resources_path,
        ymd=ymd,
        fixed_balls=fixed_balls,
        max_games_per_draw=MAX_GAMES_PER_DRAW
    )
    p_no, p_ball, selected_candidates, passed_count, variable_target_count = prediction
    result_json[ymd].extend(selected_candidates)

    with open(recommend_result_file, 'w', encoding='utf-8') as outFp:
        json.dump(result_json, outFp, ensure_ascii=False)

    recommendations = result_json[ymd]
    total_games = len(recommendations)
    total_cost = total_games * COST_PER_GAME

    # Summary header, followed by one numbered line per recommended set.
    p_str = "".join([
        "[지난주] {}\n - {} 회차, {}\n[금주] {}\n - {} 회차\n[모델#25]\n".format(last_weekend, p_no, str(p_ball), ymd, (p_no + 1)),
        " - 고정수: {}개\n".format(len(fixed_balls)),
        " - 필터 통과 후보: {}개\n".format(passed_count),
        " - 추가 선정: {}개 (목표 {}개)\n".format(len(selected_candidates), variable_target_count),
        " - 총 추천: {}개, 총 금액: {:,}원 (한도 {:,}원)\n".format(total_games, total_cost, MAX_BUDGET_KRW),
    ])
    for position, ball in enumerate(recommendations, start=1):
        p_str += " {}. {}\n".format(position, str(ball))
        # Send the message in chunks of 100 sets.
        if position % 100 == 0:
            practice.bot.sendMsg("{}".format(p_str))
            p_str = ''

    # Send whatever is left after the last full chunk.
    if total_games % 100 != 0:
        practice.bot.sendMsg("{}".format(p_str))

    print("size: {}".format(total_games))
    print("cost: {:,} KRW / limit: {:,} KRW".format(total_cost, MAX_BUDGET_KRW))

    # https://youtu.be/QjBsui8Ob14?si=4dC3q8p0Yu5ZWK1K
    # https://www.youtube.com/watch?v=YwiHaa1KNwA

    print("done...")