Files
DeepLottery/final_practice.py
dsyoon b82b5a58ee perf: filter scan optimization and portfolio selection improvements
Precompute p_ball to speed up exhaustive filtering, add fixed-ball validation with labeled exceptions, and improve portfolio selection via ymd-seeded shuffle and coverage-aware tie-breaking. Include lotto draw 1225 history update.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-27 11:10:37 +09:00

409 lines
16 KiB
Python

# 웹 호출 라이브러리를 호출합니다.
import time
import requests
from DataCrawler import DataCrawler
import json
import os
import random
import pandas as pd
import itertools
from collections import Counter
from datetime import datetime, timedelta
from TelegramBot import TelegramBot
from final_BallFilter import BallFilter
COST_PER_GAME = 1000  # price of one game, in KRW
MAX_BUDGET_KRW = 70000  # spending limit per draw, in KRW
MAX_GAMES_PER_DRAW = MAX_BUDGET_KRW // COST_PER_GAME  # games that fit in the limit


class Practice:
    """Lotto history crawling, fixed-combination checks and portfolio selection."""

    bot = None
    # The three attributes below are never assigned inside this class.
    preprocessor = None
    predictor = None
    extract_count = None

    # Endpoint queried by craw(); the draw number is appended to it.
    _LOTTO_API_URL = 'https://dhlottery.co.kr/common.do?method=getLottoNumber&drwNo='
    # Upper bound for one HTTP request so a stalled connection cannot hang the crawl.
    _REQUEST_TIMEOUT_SECONDS = 10
    # Response fields holding the six main numbers followed by the bonus number.
    _BALL_KEYS = ('drwtNo1', 'drwtNo2', 'drwtNo3', 'drwtNo4', 'drwtNo5', 'drwtNo6', 'bnusNo')
    _HISTORY_COLUMNS = ['no', 'b1', 'b2', 'b3', 'b4', 'b5', 'b6', 'bn']
    # Combinations that predict1() adds every week.
    _FIXED_BALLS = (
        (6, 7, 10, 11, 20, 45),
        (5, 12, 16, 27, 39, 45),
        (5, 15, 18, 29, 36, 41),
        (1, 17, 20, 25, 36, 45),
        (6, 15, 20, 23, 37, 43),
        (8, 15, 19, 23, 38, 41),
        (3, 14, 20, 27, 35, 45),
        (5, 11, 19, 24, 40, 45),
        (5, 9, 20, 25, 32, 37),
        (2, 13, 19, 27, 40, 43),
        (4, 13, 17, 28, 39, 43),
    )

    def __init__(self, resources_path):
        """Create the Telegram bot client.

        Args:
            resources_path: Accepted for interface compatibility; not used here.
        """
        self.bot = TelegramBot()

    @classmethod
    def _fetch_draw(cls, draw_no):
        """Request one draw and return its JSON dict, or None if not 'success'."""
        res = requests.post(
            cls._LOTTO_API_URL + str(draw_no),
            timeout=cls._REQUEST_TIMEOUT_SECONDS,
        )
        result = res.json()
        if result['returnValue'] != 'success':
            return None
        return result

    @classmethod
    def _record_draw(cls, json_fp, text_fp, draw_no, result):
        """Write one draw to both history files, echo it, and return its numbers."""
        json_fp.write(json.dumps(result, ensure_ascii=False) + "\n")
        ball = [result[key] for key in cls._BALL_KEYS]
        row = "%d,%d,%d,%d,%d,%d,%d,%d" % (draw_no, *ball)
        text_fp.write(row + "\n")
        print(row)
        return ball

    def craw(self, lottoHistoryFile, drwNo=None):
        """Collect winning numbers and store them as JSON lines and CSV rows.

        Args:
            lottoHistoryFile: Path prefix; ".json" and ".txt" are appended.
            drwNo: When given, only that draw is fetched and appended to the
                existing files. When None, both files are rewritten and draws
                are fetched from 1 upward until the API stops reporting
                'success'.

        Returns:
            The six numbers plus the bonus number of the last recorded draw,
            or None when no draw was recorded.
        """
        mode = 'a' if drwNo is not None else 'w'
        ball = None
        # The context manager closes both files on every exit path, including
        # the early return for a failed single-draw lookup.
        with open(lottoHistoryFile + ".json", mode, encoding="utf-8") as json_fp, \
                open(lottoHistoryFile + ".txt", mode, encoding="utf-8") as text_fp:
            if drwNo is not None:
                result = self._fetch_draw(drwNo)
                if result is None:
                    return None
                return self._record_draw(json_fp, text_fp, drwNo, result)
            idx = 1
            while True:
                result = self._fetch_draw(idx)
                if result is None:
                    break
                ball = self._record_draw(json_fp, text_fp, idx, result)
                idx += 1
                # Pause between requests while walking the whole history.
                time.sleep(0.5)
        return ball

    def predict1(self, result_json):
        """Append the weekly fixed combinations to ``result_json`` in place."""
        result_json.extend(list(ball) for ball in self._FIXED_BALLS)

    @staticmethod
    def _load_history(resources_path, ymd):
        """Load the filter, the target draw number and the draw history.

        Returns:
            tuple: (ball_filter, draw_no, df_ball, prev_row) where prev_row is
            the history row of draw ``draw_no - 1`` as a plain list.

        Raises:
            IndexError: If draw ``draw_no - 1`` is missing from the history.
        """
        ball_filter = BallFilter(os.path.join(resources_path, 'lotto_history.json'))
        draw_no = ball_filter.getNextNo(ymd)
        history_txt = os.path.join(resources_path, 'lotto_history.txt')
        df_ball = pd.read_csv(history_txt, header=None)
        df_ball.columns = list(Practice._HISTORY_COLUMNS)
        prev_rows = df_ball[df_ball['no'] == draw_no - 1].values.tolist()
        if not prev_rows:
            raise IndexError(
                "draw {} not found in {}".format(draw_no - 1, history_txt)
            )
        return ball_filter, draw_no, df_ball, prev_rows[0]

    def validate_fixed_balls(self, resources_path, ymd, fixed_balls):
        """Check every fixed combination against BallFilter.

        Args:
            resources_path: Directory holding lotto_history.json / .txt.
            ymd: Date string handed to BallFilter.getNextNo.
            fixed_balls: Combinations to check.

        Returns:
            dict: draw_no, total, passed_count, failed_count, details. Each
            details entry holds the 1-based index, the ball, whether it passed
            and the sorted filter reasons.
        """
        ball_filter, draw_no, df_ball, prev_row = self._load_history(resources_path, ymd)
        p_ball = prev_row[1:7]
        details = []
        passed_count = 0
        for index, ball in enumerate(fixed_balls, start=1):
            filter_type = ball_filter.filter(
                ball=ball, no=draw_no, until_end=False, df=df_ball, p_ball=p_ball
            )
            # An empty filter result means the combination passed.
            passed = len(filter_type) == 0
            if passed:
                passed_count += 1
            details.append({
                'index': index,
                'ball': ball,
                'passed': passed,
                'filter_reasons': sorted(filter_type),
            })
        return {
            'draw_no': draw_no,
            'total': len(fixed_balls),
            'passed_count': passed_count,
            'failed_count': len(fixed_balls) - passed_count,
            'details': details,
        }

    @staticmethod
    def format_fixed_validation_summary(validation):
        """Render a validate_fixed_balls() result as text for Telegram/logs."""
        lines = [
            " - 고정수 필터 검증: {}/{} 통과".format(
                validation['passed_count'], validation['total']
            )
        ]
        if validation['failed_count'] > 0:
            lines.append(
                " - 필터 예외 포함: {}개 (고정수 유지)".format(
                    validation['failed_count']
                )
            )
            for item in validation['details']:
                if item['passed']:
                    continue
                # Only the first reason is shown for each failed combination.
                reason = item['filter_reasons'][0] if item['filter_reasons'] else 'unknown'
                lines.append(
                    " * #{} {} -> {}".format(item['index'], item['ball'], reason)
                )
        return "\n".join(lines)

    @staticmethod
    def _within_overlap(ball_set, other_sets, max_overlap):
        """Return True if ``ball_set`` shares at most ``max_overlap`` numbers with each set."""
        return all(len(ball_set & other) <= max_overlap for other in other_sets)

    def _can_add_ball(self, ball, fixed_balls, selected_balls, max_overlap):
        """Return True if ``ball`` respects the overlap limit against the portfolio."""
        others = (set(other) for other in itertools.chain(fixed_balls, selected_balls))
        return self._within_overlap(set(ball), others, max_overlap)

    @staticmethod
    def _portfolio_number_counts(fixed_balls, selected_balls):
        """Count how often each number appears across the portfolio."""
        counts = Counter()
        for ball in itertools.chain(fixed_balls, selected_balls):
            counts.update(ball)
        return counts

    @staticmethod
    def _coverage_priority(ball, number_counts):
        """Sum the portfolio counts of the ball's numbers (lower = less used)."""
        return sum(number_counts.get(number, 0) for number in ball)

    def _pick_best_candidate(self, unique_candidates, selected_keys, fixed_balls, selected, max_overlap):
        """Pick the eligible candidate with the lowest coverage score.

        Candidates already selected or exceeding ``max_overlap`` against any
        fixed/selected combination are skipped. Ties on the score are broken
        by the smallest key.

        Returns:
            tuple: (candidate, key), or (None, None) when nothing is eligible.
        """
        number_counts = self._portfolio_number_counts(fixed_balls, selected)
        # Build the portfolio sets once per pick rather than once per candidate.
        portfolio_sets = [set(ball) for ball in itertools.chain(fixed_balls, selected)]
        best_candidate = None
        best_rank = None
        for candidate in unique_candidates:
            key = tuple(candidate)
            if key in selected_keys:
                continue
            if not self._within_overlap(set(candidate), portfolio_sets, max_overlap):
                continue
            rank = (self._coverage_priority(candidate, number_counts), key)
            if best_rank is None or rank < best_rank:
                best_candidate = candidate
                best_rank = rank
        if best_candidate is None:
            return None, None
        return best_candidate, best_rank[1]

    def select_portfolio(self, fixed_balls, candidates, target_count, shuffle_seed=None):
        """Choose up to ``target_count`` candidates to accompany the fixed combinations.

        Steps:
        - sort each candidate, drop duplicates and anything equal to a fixed
          combination;
        - shuffle with ``random.Random(int(shuffle_seed))`` when a seed is given;
        - if everything fits within ``target_count``, return it all;
        - otherwise pick greedily, relaxing the allowed overlap with the
          portfolio stage by stage (2, 3, 4, 5, then 6 shared numbers) and
          preferring candidates whose numbers are least used so far.

        NOTE(review): ties are broken by the smallest key, so the staged
        selection does not depend on candidate order; the shuffle only changes
        the order of the "everything fits" early return. Confirm this is
        intended.

        Returns:
            list: The selected combinations, each a sorted list.
        """
        fixed_keys = {tuple(sorted(fixed_ball)) for fixed_ball in fixed_balls}
        seen = set()
        unique_candidates = []
        for candidate in candidates:
            key = tuple(sorted(candidate))
            if key in seen or key in fixed_keys:
                continue
            seen.add(key)
            unique_candidates.append(list(key))
        if shuffle_seed is not None:
            random.Random(int(shuffle_seed)).shuffle(unique_candidates)
        if target_count <= 0:
            return []
        if len(unique_candidates) <= target_count:
            return unique_candidates
        selected = []
        selected_keys = set()
        for max_overlap in (2, 3, 4, 5, 6):
            while len(selected) < target_count:
                best_candidate, best_key = self._pick_best_candidate(
                    unique_candidates, selected_keys, fixed_balls, selected, max_overlap
                )
                if best_candidate is None:
                    # Nothing fits at this overlap level; relax it.
                    break
                selected.append(best_candidate)
                selected_keys.add(best_key)
            if len(selected) >= target_count:
                break
        return selected

    def predict2(self, resources_path, ymd, fixed_balls, max_games_per_draw=MAX_GAMES_PER_DRAW):
        """Filter every 6-of-45 combination and select the variable part of the portfolio.

        Args:
            resources_path: Directory holding lotto_history.json / .txt.
            ymd: Date string handed to BallFilter.getNextNo; also the shuffle seed.
            fixed_balls: Combinations already in the portfolio.
            max_games_per_draw: Total games allowed, fixed ones included.

        Returns:
            tuple: (previous draw number, previous draw's six numbers,
            selected combinations, number of combinations that passed the
            filter, number of variable slots targeted).
        """
        ball_filter, no, df_ball, prev_row = self._load_history(resources_path, ymd)
        print("회차: {}".format(no))
        p_ball = prev_row[1:7]
        passed_candidates = []
        for idx, combo in enumerate(itertools.combinations(range(1, 46), 6)):
            if idx % 1000000 == 0:
                print(" - {} processed, pass: {}".format(idx, len(passed_candidates)))
            ball = list(combo)
            filter_type = ball_filter.filter(
                ball=ball, no=no, until_end=False, df=df_ball, p_ball=p_ball
            )
            if len(filter_type) > 0:
                continue
            passed_candidates.append(ball)
        # Slots left once the fixed combinations are counted; never negative.
        variable_target_count = max(0, max_games_per_draw - len(fixed_balls))
        selected_candidates = self.select_portfolio(
            fixed_balls=fixed_balls,
            candidates=passed_candidates,
            target_count=variable_target_count,
            shuffle_seed=ymd,
        )
        p_no = prev_row[0]
        return p_no, p_ball, selected_candidates, len(passed_candidates), variable_target_count
if __name__ == '__main__':
    # Weekly run: build the recommendation list for the upcoming Saturday,
    # store it in the recommendation file and send it through Telegram.
    PROJECT_HOME = '.'
    resources_path = os.path.join(PROJECT_HOME, 'resources')
    # Data collection (currently disabled)
    #dataCrawler = DataCrawler()
    #dataCrawler.excute(resources_path)
    today = datetime.today()
    # Days until the coming Saturday (weekday 5); 0 when today is Saturday.
    days_ahead = (5 - today.weekday()) % 7
    if today.weekday() == 5 and today.hour > 20:
        # Saturday after 20:59 targets the following Saturday instead.
        days_ahead = 7
    this_weekend = today + timedelta(days=days_ahead)
    last_weekend = (this_weekend - timedelta(days=7)).strftime('%Y%m%d')
    ymd = this_weekend.strftime('%Y%m%d')
    print("ymd: {}".format(ymd))
    # Lotto prediction
    practice = Practice(resources_path)
    recommend_result_file = os.path.join(resources_path, "recommend_ball.biz_25.json")
    if os.path.isfile(recommend_result_file):
        with open(recommend_result_file, "r", encoding="utf-8") as result_fp:
            result_json = json.load(result_fp)
    else:
        result_json = {}
    # Any earlier entry for this date is replaced.
    result_json[ymd] = []
    # Fixed every week
    fixed_balls = []
    practice.predict1(fixed_balls)
    fixed_validation = practice.validate_fixed_balls(
        resources_path=resources_path,
        ymd=ymd,
        fixed_balls=fixed_balls,
    )
    fixed_summary = Practice.format_fixed_validation_summary(fixed_validation)
    print(fixed_summary)
    result_json[ymd].extend(fixed_balls)
    # Filter-based prediction
    p_no, p_ball, selected_candidates, passed_count, variable_target_count = practice.predict2(
        resources_path=resources_path,
        ymd=ymd,
        fixed_balls=fixed_balls,
        max_games_per_draw=MAX_GAMES_PER_DRAW
    )
    result_json[ymd].extend(selected_candidates)
    result_json.setdefault('_meta', {})[ymd] = {
        'fixed_validation': fixed_validation,
        'passed_count': passed_count,
        'selected_count': len(selected_candidates),
        'portfolio_shuffle_seed': ymd,
    }
    with open(recommend_result_file, 'w', encoding='utf-8') as outFp:
        json.dump(result_json, outFp, ensure_ascii=False)
    total_games = len(result_json[ymd])
    total_cost = total_games * COST_PER_GAME
    # Message parts are buffered and joined so each Telegram message carries
    # at most 100 recommendation lines.
    parts = [
        "[지난주] {}\n - {} 회차, {}\n[금주] {}\n - {} 회차\n[모델#25]\n".format(
            last_weekend, p_no, str(p_ball), ymd, (p_no + 1)
        ),
        " - 고정수: {}\n".format(len(fixed_balls)),
        fixed_summary + "\n",
        " - 필터 통과 후보: {}\n".format(passed_count),
        " - 추가 선정: {}개 (목표 {}개)\n".format(len(selected_candidates), variable_target_count),
        " - 총 추천: {}개, 총 금액: {:,}원 (한도 {:,}원)\n".format(total_games, total_cost, MAX_BUDGET_KRW),
    ]
    for i, ball in enumerate(result_json[ymd], start=1):
        parts.append(" {}. {}\n".format(i, str(ball)))
        if i % 100 == 0:
            practice.bot.sendMsg("".join(parts))
            parts = []
    # Flush whatever is still buffered; this also sends the header when the
    # recommendation list is empty.
    if parts:
        practice.bot.sendMsg("".join(parts))
    print("size: {}".format(total_games))
    print("cost: {:,} KRW / limit: {:,} KRW".format(total_cost, MAX_BUDGET_KRW))
    # https://youtu.be/QjBsui8Ob14?si=4dC3q8p0Yu5ZWK1K
    # https://www.youtube.com/watch?v=YwiHaa1KNwA
    print("done...")