Precompute p_ball to speed up exhaustive filtering, add fixed-ball validation with labeled exceptions, and improve portfolio selection via ymd-seeded shuffle and coverage-aware tie-breaking. Include lotto draw 1225 history update. Co-authored-by: Cursor <cursoragent@cursor.com>
409 lines
16 KiB
Python
409 lines
16 KiB
Python
# 웹 호출 라이브러리를 호출합니다.
|
|
import time
|
|
import requests
|
|
from DataCrawler import DataCrawler
|
|
|
|
import json
|
|
import os
|
|
import random
|
|
import pandas as pd
|
|
import itertools
|
|
from collections import Counter
|
|
from datetime import datetime, timedelta
|
|
from TelegramBot import TelegramBot
|
|
|
|
from final_BallFilter import BallFilter
|
|
|
|
COST_PER_GAME = 1000
|
|
MAX_BUDGET_KRW = 70000
|
|
MAX_GAMES_PER_DRAW = MAX_BUDGET_KRW // COST_PER_GAME
|
|
|
|
class Practice:
|
|
|
|
bot = None
|
|
preprocessor = None
|
|
predictor = None
|
|
|
|
extract_count = None
|
|
|
|
def __init__(self, resources_path):
|
|
self.bot = TelegramBot()
|
|
|
|
return
|
|
|
|
# 로또 당첨 데이터를 수집해서 파일로 저장합니다.
|
|
# lottoHistoryFile: 로또 당첨 데이터를 저장할 파일
|
|
def craw(self, lottoHistoryFile, drwNo=None):
|
|
|
|
ball = None
|
|
if drwNo != None:
|
|
# 로또 데이터를 저장할 파일을 선언합니다.
|
|
jsonFp = open(lottoHistoryFile + ".json", 'a', encoding="utf-8")
|
|
textFp = open(lottoHistoryFile + ".txt", 'a', encoding="utf-8")
|
|
|
|
url = 'https://dhlottery.co.kr/common.do?method=getLottoNumber&drwNo=' + str(drwNo)
|
|
# URL을 호출합니다.
|
|
res = requests.post(url)
|
|
# 호출한 결과에 대해서 Json 포맷을 가져옵니다.
|
|
result = res.json()
|
|
|
|
if result['returnValue'] != 'success':
|
|
return None
|
|
|
|
# 가져온 Json 포맷을 파일로 저장합니다.
|
|
jsonFp.write(json.dumps(result, ensure_ascii=False) + "\n")
|
|
textFp.write("%d,%d,%d,%d,%d,%d,%d,%d\n" % (drwNo, result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']))
|
|
print("%d,%d,%d,%d,%d,%d,%d,%d" % (drwNo, result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']))
|
|
|
|
ball = [result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']]
|
|
else:
|
|
# 로또 데이터를 저장할 파일을 선언합니다.
|
|
jsonFp = open(lottoHistoryFile + ".json", 'w', encoding="utf-8")
|
|
textFp = open(lottoHistoryFile + ".txt", 'w', encoding="utf-8")
|
|
|
|
# 1회차부터 지정된 회차까지 로또 당첨 번호를 수집합니다.
|
|
idx = 1
|
|
while True:
|
|
# 1회차부터 지정된 회차까지의 URL을 생성합니다.
|
|
url = 'https://dhlottery.co.kr/common.do?method=getLottoNumber&drwNo=' + str(idx)
|
|
# URL을 호출합니다.
|
|
res = requests.post(url)
|
|
# 호출한 결과에 대해서 Json 포맷을 가져옵니다.
|
|
result = res.json()
|
|
if result['returnValue'] != 'success':
|
|
break
|
|
# 가져온 Json 포맷을 파일로 저장합니다.
|
|
jsonFp.write(json.dumps(result, ensure_ascii=False) + "\n")
|
|
textFp.write("%d,%d,%d,%d,%d,%d,%d,%d\n" % (idx, result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']))
|
|
print("%d,%d,%d,%d,%d,%d,%d,%d" % (idx, result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']))
|
|
ball = [result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']]
|
|
idx += 1
|
|
time.sleep(0.5)
|
|
# 저장한 파일을 종료합니다.
|
|
jsonFp.close()
|
|
textFp.close()
|
|
|
|
return ball
|
|
|
|
def predict1(self, result_json):
|
|
result_json.append([6, 7, 10, 11, 20, 45])
|
|
result_json.append([5, 12, 16, 27, 39, 45])
|
|
result_json.append([5, 15, 18, 29, 36, 41])
|
|
result_json.append([1, 17, 20, 25, 36, 45])
|
|
result_json.append([6, 15, 20, 23, 37, 43])
|
|
result_json.append([8, 15, 19, 23, 38, 41])
|
|
result_json.append([3, 14, 20, 27, 35, 45])
|
|
result_json.append([5, 11, 19, 24, 40, 45])
|
|
result_json.append([5, 9, 20, 25, 32, 37])
|
|
result_json.append([2, 13, 19, 27, 40, 43])
|
|
result_json.append([4, 13, 17, 28, 39, 43])
|
|
|
|
return
|
|
|
|
def validate_fixed_balls(self, resources_path, ymd, fixed_balls):
|
|
"""
|
|
고정수 BallFilter 통과 여부를 검증한다.
|
|
|
|
Returns:
|
|
dict: total, passed_count, failed_count, draw_no, details
|
|
"""
|
|
lotto_history_json = os.path.join(resources_path, 'lotto_history.json')
|
|
ball_filter = BallFilter(lotto_history_json)
|
|
draw_no = ball_filter.getNextNo(ymd)
|
|
|
|
lotto_history_txt = os.path.join(resources_path, 'lotto_history.txt')
|
|
df_ball = pd.read_csv(lotto_history_txt, header=None)
|
|
df_ball.columns = ['no', 'b1', 'b2', 'b3', 'b4', 'b5', 'b6', 'bn']
|
|
prev_row = df_ball[df_ball['no'] == draw_no - 1].values.tolist()[0]
|
|
p_ball = prev_row[1:7]
|
|
|
|
details = []
|
|
passed_count = 0
|
|
for index, ball in enumerate(fixed_balls):
|
|
filter_type = ball_filter.filter(
|
|
ball=ball, no=draw_no, until_end=False, df=df_ball, p_ball=p_ball
|
|
)
|
|
passed = len(filter_type) == 0
|
|
if passed:
|
|
passed_count += 1
|
|
details.append({
|
|
'index': index + 1,
|
|
'ball': ball,
|
|
'passed': passed,
|
|
'filter_reasons': sorted(filter_type),
|
|
})
|
|
|
|
return {
|
|
'draw_no': draw_no,
|
|
'total': len(fixed_balls),
|
|
'passed_count': passed_count,
|
|
'failed_count': len(fixed_balls) - passed_count,
|
|
'details': details,
|
|
}
|
|
|
|
@staticmethod
|
|
def format_fixed_validation_summary(validation):
|
|
"""고정수 검증 결과를 Telegram/로그용 문자열로 변환한다."""
|
|
lines = [
|
|
" - 고정수 필터 검증: {}/{} 통과".format(
|
|
validation['passed_count'], validation['total']
|
|
)
|
|
]
|
|
if validation['failed_count'] > 0:
|
|
lines.append(
|
|
" - 필터 예외 포함: {}개 (고정수 유지)".format(
|
|
validation['failed_count']
|
|
)
|
|
)
|
|
for item in validation['details']:
|
|
if item['passed']:
|
|
continue
|
|
reason = item['filter_reasons'][0] if item['filter_reasons'] else 'unknown'
|
|
lines.append(
|
|
" * #{} {} -> {}".format(item['index'], item['ball'], reason)
|
|
)
|
|
return "\n".join(lines)
|
|
|
|
def _can_add_ball(self, ball, fixed_balls, selected_balls, max_overlap):
|
|
ball_set = set(ball)
|
|
|
|
for fixed_ball in fixed_balls:
|
|
if len(ball_set & set(fixed_ball)) > max_overlap:
|
|
return False
|
|
|
|
for selected_ball in selected_balls:
|
|
if len(ball_set & set(selected_ball)) > max_overlap:
|
|
return False
|
|
|
|
return True
|
|
|
|
@staticmethod
|
|
def _portfolio_number_counts(fixed_balls, selected_balls):
|
|
"""포트폴리오 내 번호 등장 횟수를 집계한다."""
|
|
counts = Counter()
|
|
for ball in fixed_balls + selected_balls:
|
|
counts.update(ball)
|
|
return counts
|
|
|
|
@staticmethod
|
|
def _coverage_priority(ball, number_counts):
|
|
"""낮을수록 포트폴리오에 덜 등장한 번호 위주 조합이다."""
|
|
return sum(number_counts.get(number, 0) for number in ball)
|
|
|
|
def _pick_best_candidate(self, unique_candidates, selected_keys, fixed_balls, selected, max_overlap):
|
|
"""겹침 제약을 만족하는 후보 중 번호 커버리지가 가장 넓은 조합을 고른다."""
|
|
number_counts = self._portfolio_number_counts(fixed_balls, selected)
|
|
best_candidate = None
|
|
best_score = None
|
|
best_key = None
|
|
|
|
for candidate in unique_candidates:
|
|
key = tuple(candidate)
|
|
if key in selected_keys:
|
|
continue
|
|
if not self._can_add_ball(candidate, fixed_balls, selected, max_overlap):
|
|
continue
|
|
|
|
score = self._coverage_priority(candidate, number_counts)
|
|
if best_candidate is None or score < best_score or (score == best_score and key < best_key):
|
|
best_candidate = candidate
|
|
best_score = score
|
|
best_key = key
|
|
|
|
return best_candidate, best_key
|
|
|
|
def select_portfolio(self, fixed_balls, candidates, target_count, shuffle_seed=None):
|
|
"""
|
|
2차 포트폴리오 선정:
|
|
- 중복 제거
|
|
- shuffle_seed 기반 셔플로 순서 편향 완화
|
|
- 고정수/선정수 간 중복도(겹치는 번호 수) 제약을 단계적으로 완화하며 선택
|
|
- 동률 후보는 번호 커버리지가 넓은 조합 우선
|
|
"""
|
|
unique_candidates = []
|
|
seen = set()
|
|
fixed_keys = {tuple(sorted(fixed_ball)) for fixed_ball in fixed_balls}
|
|
|
|
for candidate in candidates:
|
|
key = tuple(sorted(candidate))
|
|
if key in seen or key in fixed_keys:
|
|
continue
|
|
seen.add(key)
|
|
unique_candidates.append(list(key))
|
|
|
|
if shuffle_seed is not None:
|
|
rng = random.Random(int(shuffle_seed))
|
|
rng.shuffle(unique_candidates)
|
|
|
|
if target_count <= 0:
|
|
return []
|
|
|
|
if len(unique_candidates) <= target_count:
|
|
return unique_candidates
|
|
|
|
selected = []
|
|
selected_keys = set()
|
|
overlap_stages = [2, 3, 4, 5]
|
|
|
|
for max_overlap in overlap_stages:
|
|
while len(selected) < target_count:
|
|
best_candidate, best_key = self._pick_best_candidate(
|
|
unique_candidates, selected_keys, fixed_balls, selected, max_overlap
|
|
)
|
|
if best_candidate is None:
|
|
break
|
|
|
|
selected.append(best_candidate)
|
|
selected_keys.add(best_key)
|
|
|
|
if len(selected) >= target_count:
|
|
return selected
|
|
|
|
while len(selected) < target_count:
|
|
best_candidate, best_key = self._pick_best_candidate(
|
|
unique_candidates, selected_keys, fixed_balls, selected, max_overlap=6
|
|
)
|
|
if best_candidate is None:
|
|
break
|
|
selected.append(best_candidate)
|
|
selected_keys.add(best_key)
|
|
|
|
return selected
|
|
|
|
def predict2(self, resources_path, ymd, fixed_balls, max_games_per_draw=MAX_GAMES_PER_DRAW):
|
|
|
|
candidates = [i for i in range(1, 46)]
|
|
|
|
lottoHistoryFileName = os.path.join(resources_path, 'lotto_history.json')
|
|
ballFilter = BallFilter(lottoHistoryFileName)
|
|
no = ballFilter.getNextNo(ymd)
|
|
print("회차: {}".format(no))
|
|
|
|
lottoHistoryFileName = os.path.join(resources_path, 'lotto_history.txt')
|
|
df_ball = pd.read_csv(lottoHistoryFileName, header=None)
|
|
df_ball.columns = ['no', 'b1', 'b2', 'b3', 'b4', 'b5', 'b6', 'bn']
|
|
|
|
prev_row = df_ball[df_ball['no'] == no - 1].values.tolist()[0]
|
|
p_ball = prev_row[1:7]
|
|
|
|
passed_candidates = []
|
|
for idx, ball in enumerate(itertools.combinations(candidates, 6)):
|
|
|
|
if idx % 1000000 == 0:
|
|
print(" - {} processed, pass: {}".format(idx, len(passed_candidates)))
|
|
ball = list(ball)
|
|
|
|
filter_type = ballFilter.filter(
|
|
ball=ball, no=no, until_end=False, df=df_ball, p_ball=p_ball
|
|
)
|
|
filter_size = len(filter_type)
|
|
|
|
if 0 < filter_size:
|
|
continue
|
|
|
|
passed_candidates.append(ball)
|
|
|
|
variable_target_count = max(0, max_games_per_draw - len(fixed_balls))
|
|
selected_candidates = self.select_portfolio(
|
|
fixed_balls=fixed_balls,
|
|
candidates=passed_candidates,
|
|
target_count=variable_target_count,
|
|
shuffle_seed=ymd,
|
|
)
|
|
|
|
p_no = prev_row[0]
|
|
|
|
return p_no, p_ball, selected_candidates, len(passed_candidates), variable_target_count
|
|
|
|
if __name__ == '__main__':
|
|
|
|
PROJECT_HOME = '.'
|
|
resources_path = os.path.join(PROJECT_HOME, 'resources')
|
|
|
|
# 데이터 수집
|
|
#dataCrawler = DataCrawler()
|
|
#dataCrawler.excute(resources_path)
|
|
|
|
today = datetime.today()
|
|
if today.weekday() == 5:
|
|
if today.hour > 20:
|
|
this_weekend = today + timedelta(days=(12 - today.weekday()))
|
|
else:
|
|
this_weekend = today + timedelta(days=(5 - today.weekday()))
|
|
elif today.weekday() == 6:
|
|
this_weekend = today + timedelta(days=(12 - today.weekday()))
|
|
else:
|
|
this_weekend = today + timedelta(days=(5 - today.weekday()))
|
|
|
|
last_weekend = (this_weekend - timedelta(days=7)).strftime('%Y%m%d')
|
|
ymd = this_weekend.strftime('%Y%m%d')
|
|
|
|
print("ymd: {}".format(ymd))
|
|
|
|
# 로또 예측
|
|
practice = Practice(resources_path)
|
|
|
|
recommend_result_file = os.path.join(resources_path, "recommend_ball.biz_25.json")
|
|
if os.path.isfile(recommend_result_file):
|
|
with open(recommend_result_file, "r", encoding="utf-8") as result_fp:
|
|
result_json = json.load(result_fp)
|
|
result_json[ymd] = []
|
|
else:
|
|
result_json = {ymd: []}
|
|
|
|
# 매주 고정
|
|
fixed_balls = []
|
|
practice.predict1(fixed_balls)
|
|
fixed_validation = practice.validate_fixed_balls(
|
|
resources_path=resources_path,
|
|
ymd=ymd,
|
|
fixed_balls=fixed_balls,
|
|
)
|
|
print(Practice.format_fixed_validation_summary(fixed_validation))
|
|
result_json[ymd].extend(fixed_balls)
|
|
|
|
# 필터 기반 예측
|
|
p_no, p_ball, selected_candidates, passed_count, variable_target_count = practice.predict2(
|
|
resources_path=resources_path,
|
|
ymd=ymd,
|
|
fixed_balls=fixed_balls,
|
|
max_games_per_draw=MAX_GAMES_PER_DRAW
|
|
)
|
|
result_json[ymd].extend(selected_candidates)
|
|
|
|
if '_meta' not in result_json:
|
|
result_json['_meta'] = {}
|
|
result_json['_meta'][ymd] = {
|
|
'fixed_validation': fixed_validation,
|
|
'passed_count': passed_count,
|
|
'selected_count': len(selected_candidates),
|
|
'portfolio_shuffle_seed': ymd,
|
|
}
|
|
|
|
with open(recommend_result_file, 'w', encoding='utf-8') as outFp:
|
|
json.dump(result_json, outFp, ensure_ascii=False)
|
|
|
|
total_games = len(result_json[ymd])
|
|
total_cost = total_games * COST_PER_GAME
|
|
p_str = "[지난주] {}\n - {} 회차, {}\n[금주] {}\n - {} 회차\n[모델#25]\n".format(last_weekend, p_no, str(p_ball), ymd, (p_no + 1))
|
|
p_str += " - 고정수: {}개\n".format(len(fixed_balls))
|
|
p_str += Practice.format_fixed_validation_summary(fixed_validation) + "\n"
|
|
p_str += " - 필터 통과 후보: {}개\n".format(passed_count)
|
|
p_str += " - 추가 선정: {}개 (목표 {}개)\n".format(len(selected_candidates), variable_target_count)
|
|
p_str += " - 총 추천: {}개, 총 금액: {:,}원 (한도 {:,}원)\n".format(total_games, total_cost, MAX_BUDGET_KRW)
|
|
for i, ball in enumerate(result_json[ymd]):
|
|
p_str += " {}. {}\n".format((i+1), str(ball))
|
|
if (i+1) % 100 == 0:
|
|
practice.bot.sendMsg("{}".format(p_str))
|
|
p_str = ''
|
|
|
|
if len(result_json[ymd]) % 100 != 0:
|
|
practice.bot.sendMsg("{}".format(p_str))
|
|
|
|
print("size: {}".format(total_games))
|
|
print("cost: {:,} KRW / limit: {:,} KRW".format(total_cost, MAX_BUDGET_KRW))
|
|
|
|
# https://youtu.be/QjBsui8Ob14?si=4dC3q8p0Yu5ZWK1K
|
|
# https://www.youtube.com/watch?v=YwiHaa1KNwA
|
|
|
|
print("done...") |