perf: filter scan optimization and portfolio selection improvements

Precompute p_ball to speed up exhaustive filtering, add fixed-ball validation with labeled exceptions, and improve portfolio selection via ymd-seeded shuffle and coverage-aware tie-breaking. Include lotto draw 1225 history update.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dsyoon
2026-05-27 11:10:37 +09:00
parent aa0f925d4e
commit b82b5a58ee
4 changed files with 161 additions and 29 deletions

View File

@@ -3812,9 +3812,10 @@ class BallFilter:
if len(set_ball & {3, 20, 44}) == 3: return 2928 if len(set_ball & {3, 20, 44}) == 3: return 2928
return None return None
def extract_final_candidates(self, ball, no=None, until_end=False, df=None): def extract_final_candidates(self, ball, no=None, until_end=False, df=None, p_ball=None):
p_ball = df[df['no'] == no - 1].values.tolist()[0] if p_ball is None:
p_ball = p_ball[1:7] p_ball = df[df['no'] == no - 1].values.tolist()[0]
p_ball = p_ball[1:7]
filter_set = set() filter_set = set()
@@ -4447,7 +4448,9 @@ class BallFilter:
return filter_set return filter_set
def filter(self, ball, no, until_end=False, df=None, filter_ball=None): def filter(self, ball, no, until_end=False, df=None, filter_ball=None, p_ball=None):
filter_type = self.extract_final_candidates(ball=ball, no=no, until_end=until_end, df=df) filter_type = self.extract_final_candidates(
ball=ball, no=no, until_end=until_end, df=df, p_ball=p_ball
)
return filter_type return filter_type

View File

@@ -5,8 +5,10 @@ from DataCrawler import DataCrawler
import json import json
import os import os
import random
import pandas as pd import pandas as pd
import itertools import itertools
from collections import Counter
from datetime import datetime, timedelta from datetime import datetime, timedelta
from TelegramBot import TelegramBot from TelegramBot import TelegramBot
@@ -98,6 +100,70 @@ class Practice:
return return
def validate_fixed_balls(self, resources_path, ymd, fixed_balls):
    """
    Run every fixed combination through BallFilter for the upcoming draw.

    Args:
        resources_path: directory holding 'lotto_history.json' and
            'lotto_history.txt'.
        ymd: date handed to BallFilter.getNextNo to resolve the draw number.
        fixed_balls: sequence of combinations to check.

    Returns:
        dict with the keys draw_no, total, passed_count, failed_count and
        details. Each details entry carries the 1-based index, the ball,
        a passed flag and the sorted filter reasons.
    """
    history_json_path = os.path.join(resources_path, 'lotto_history.json')
    checker = BallFilter(history_json_path)
    draw_no = checker.getNextNo(ymd)

    history_txt_path = os.path.join(resources_path, 'lotto_history.txt')
    df_ball = pd.read_csv(history_txt_path, header=None)
    df_ball.columns = ['no', 'b1', 'b2', 'b3', 'b4', 'b5', 'b6', 'bn']

    # Columns b1..b6 of the row for the draw before draw_no; looked up once
    # and passed to every filter call.
    previous_rows = df_ball[df_ball['no'] == draw_no - 1].values.tolist()
    p_ball = previous_rows[0][1:7]

    details = []
    for position, ball in enumerate(fixed_balls, start=1):
        reasons = checker.filter(
            ball=ball, no=draw_no, until_end=False, df=df_ball, p_ball=p_ball
        )
        # A combination passes when the filter reports nothing for it.
        details.append({
            'index': position,
            'ball': ball,
            'passed': len(reasons) == 0,
            'filter_reasons': sorted(reasons),
        })

    passed_count = sum(1 for entry in details if entry['passed'])
    total = len(fixed_balls)
    return {
        'draw_no': draw_no,
        'total': total,
        'passed_count': passed_count,
        'failed_count': total - passed_count,
        'details': details,
    }
@staticmethod
def format_fixed_validation_summary(validation):
    """
    Render a fixed-ball validation result as text for Telegram/log output.

    Args:
        validation: dict with 'passed_count', 'total', 'failed_count' and
            'details' (entries with 'index', 'ball', 'passed',
            'filter_reasons').

    Returns:
        Newline-joined summary. When failed_count is positive, one extra
        line is added per failed entry showing its first filter reason
        ('unknown' when the entry has no reasons).
    """
    summary = [
        " - 고정수 필터 검증: {}/{} 통과".format(
            validation['passed_count'], validation['total']
        )
    ]
    if validation['failed_count'] > 0:
        summary.append(
            " - 필터 예외 포함: {}개 (고정수 유지)".format(validation['failed_count'])
        )
        for entry in validation['details']:
            if not entry['passed']:
                reasons = entry['filter_reasons']
                # Only the first reason is shown to keep the message short.
                first_reason = reasons[0] if reasons else 'unknown'
                summary.append(
                    " * #{} {} -> {}".format(entry['index'], entry['ball'], first_reason)
                )
    return "\n".join(summary)
def _can_add_ball(self, ball, fixed_balls, selected_balls, max_overlap): def _can_add_ball(self, ball, fixed_balls, selected_balls, max_overlap):
ball_set = set(ball) ball_set = set(ball)
@@ -111,11 +177,48 @@ class Practice:
return True return True
def select_portfolio(self, fixed_balls, candidates, target_count): @staticmethod
def _portfolio_number_counts(fixed_balls, selected_balls):
    """Tally how often each number appears across the fixed and selected combinations."""
    return Counter(
        number
        for ball in fixed_balls + selected_balls
        for number in ball
    )
@staticmethod
def _coverage_priority(ball, number_counts):
    """
    Sum the portfolio appearance counts of the numbers in ball.

    A lower value means the combination is made of numbers that appear
    less often in the portfolio so far.
    """
    total = 0
    for number in ball:
        total += number_counts.get(number, 0)
    return total
def _pick_best_candidate(self, unique_candidates, selected_keys, fixed_balls, selected, max_overlap):
    """
    Pick the next combination to add to the portfolio.

    Among candidates that are not yet selected and that _can_add_ball
    accepts for max_overlap, the one with the lowest _coverage_priority
    score wins. Ties go to the candidate that comes first in
    unique_candidates, so the caller's ordering (for example a seeded
    shuffle) decides between equally scored candidates.

    Args:
        unique_candidates: iterable of candidate combinations.
        selected_keys: set of tuple keys of combinations already chosen.
        fixed_balls: fixed combinations already in the portfolio.
        selected: combinations chosen so far.
        max_overlap: overlap limit forwarded to _can_add_ball.

    Returns:
        (candidate, key) for the winner, or (None, None) when no
        candidate is eligible.
    """
    number_counts = self._portfolio_number_counts(fixed_balls, selected)
    best_candidate = None
    best_score = None
    best_key = None

    for candidate in unique_candidates:
        key = tuple(candidate)
        if key in selected_keys:
            continue
        if not self._can_add_ball(candidate, fixed_balls, selected, max_overlap):
            continue

        score = self._coverage_priority(candidate, number_counts)
        # Strictly-better only: breaking ties by the smallest key would make
        # the result independent of candidate order and cancel any shuffle
        # the caller applied.
        if best_candidate is None or score < best_score:
            best_candidate = candidate
            best_score = score
            best_key = key

    return best_candidate, best_key
def select_portfolio(self, fixed_balls, candidates, target_count, shuffle_seed=None):
""" """
2차 포트폴리오 선정: 2차 포트폴리오 선정:
- 중복 제거 - 중복 제거
- shuffle_seed 기반 셔플로 순서 편향 완화
- 고정수/선정수 간 중복도(겹치는 번호 수) 제약을 단계적으로 완화하며 선택 - 고정수/선정수 간 중복도(겹치는 번호 수) 제약을 단계적으로 완화하며 선택
- 동률 후보는 번호 커버리지가 넓은 조합 우선
""" """
unique_candidates = [] unique_candidates = []
seen = set() seen = set()
@@ -128,6 +231,10 @@ class Practice:
seen.add(key) seen.add(key)
unique_candidates.append(list(key)) unique_candidates.append(list(key))
if shuffle_seed is not None:
rng = random.Random(int(shuffle_seed))
rng.shuffle(unique_candidates)
if target_count <= 0: if target_count <= 0:
return [] return []
@@ -139,26 +246,27 @@ class Practice:
overlap_stages = [2, 3, 4, 5] overlap_stages = [2, 3, 4, 5]
for max_overlap in overlap_stages: for max_overlap in overlap_stages:
for candidate in unique_candidates: while len(selected) < target_count:
key = tuple(candidate) best_candidate, best_key = self._pick_best_candidate(
if key in selected_keys: unique_candidates, selected_keys, fixed_balls, selected, max_overlap
continue )
if best_candidate is None:
break
if self._can_add_ball(candidate, fixed_balls, selected, max_overlap): selected.append(best_candidate)
selected.append(candidate) selected_keys.add(best_key)
selected_keys.add(key)
if len(selected) >= target_count:
return selected
# 단계 완화 후에도 부족하면 남은 조합을 순서대로 채움
for candidate in unique_candidates:
key = tuple(candidate)
if key in selected_keys:
continue
selected.append(candidate)
selected_keys.add(key)
if len(selected) >= target_count: if len(selected) >= target_count:
return selected
while len(selected) < target_count:
best_candidate, best_key = self._pick_best_candidate(
unique_candidates, selected_keys, fixed_balls, selected, max_overlap=6
)
if best_candidate is None:
break break
selected.append(best_candidate)
selected_keys.add(best_key)
return selected return selected
@@ -175,15 +283,19 @@ class Practice:
df_ball = pd.read_csv(lottoHistoryFileName, header=None) df_ball = pd.read_csv(lottoHistoryFileName, header=None)
df_ball.columns = ['no', 'b1', 'b2', 'b3', 'b4', 'b5', 'b6', 'bn'] df_ball.columns = ['no', 'b1', 'b2', 'b3', 'b4', 'b5', 'b6', 'bn']
prev_row = df_ball[df_ball['no'] == no - 1].values.tolist()[0]
p_ball = prev_row[1:7]
passed_candidates = [] passed_candidates = []
nCr = list(itertools.combinations(candidates, 6)) for idx, ball in enumerate(itertools.combinations(candidates, 6)):
for idx, ball in enumerate(nCr):
if idx % 1000000 == 0: if idx % 1000000 == 0:
print(" - {} processed, pass: {}".format(idx, len(passed_candidates))) print(" - {} processed, pass: {}".format(idx, len(passed_candidates)))
ball = list(ball) ball = list(ball)
filter_type = ballFilter.filter(ball=ball, no=no, until_end=False, df=df_ball) filter_type = ballFilter.filter(
ball=ball, no=no, until_end=False, df=df_ball, p_ball=p_ball
)
filter_size = len(filter_type) filter_size = len(filter_type)
if 0 < filter_size: if 0 < filter_size:
@@ -195,12 +307,11 @@ class Practice:
selected_candidates = self.select_portfolio( selected_candidates = self.select_portfolio(
fixed_balls=fixed_balls, fixed_balls=fixed_balls,
candidates=passed_candidates, candidates=passed_candidates,
target_count=variable_target_count target_count=variable_target_count,
shuffle_seed=ymd,
) )
p_ball = df_ball[df_ball['no'] == no - 1].values.tolist()[0] p_no = prev_row[0]
p_no = p_ball[0]
p_ball = p_ball[1:7]
return p_no, p_ball, selected_candidates, len(passed_candidates), variable_target_count return p_no, p_ball, selected_candidates, len(passed_candidates), variable_target_count
@@ -243,6 +354,12 @@ if __name__ == '__main__':
# 매주 고정 # 매주 고정
fixed_balls = [] fixed_balls = []
practice.predict1(fixed_balls) practice.predict1(fixed_balls)
fixed_validation = practice.validate_fixed_balls(
resources_path=resources_path,
ymd=ymd,
fixed_balls=fixed_balls,
)
print(Practice.format_fixed_validation_summary(fixed_validation))
result_json[ymd].extend(fixed_balls) result_json[ymd].extend(fixed_balls)
# 필터 기반 예측 # 필터 기반 예측
@@ -254,6 +371,15 @@ if __name__ == '__main__':
) )
result_json[ymd].extend(selected_candidates) result_json[ymd].extend(selected_candidates)
if '_meta' not in result_json:
result_json['_meta'] = {}
result_json['_meta'][ymd] = {
'fixed_validation': fixed_validation,
'passed_count': passed_count,
'selected_count': len(selected_candidates),
'portfolio_shuffle_seed': ymd,
}
with open(recommend_result_file, 'w', encoding='utf-8') as outFp: with open(recommend_result_file, 'w', encoding='utf-8') as outFp:
json.dump(result_json, outFp, ensure_ascii=False) json.dump(result_json, outFp, ensure_ascii=False)
@@ -261,6 +387,7 @@ if __name__ == '__main__':
total_cost = total_games * COST_PER_GAME total_cost = total_games * COST_PER_GAME
p_str = "[지난주] {}\n - {} 회차, {}\n[금주] {}\n - {} 회차\n[모델#25]\n".format(last_weekend, p_no, str(p_ball), ymd, (p_no + 1)) p_str = "[지난주] {}\n - {} 회차, {}\n[금주] {}\n - {} 회차\n[모델#25]\n".format(last_weekend, p_no, str(p_ball), ymd, (p_no + 1))
p_str += " - 고정수: {}\n".format(len(fixed_balls)) p_str += " - 고정수: {}\n".format(len(fixed_balls))
p_str += Practice.format_fixed_validation_summary(fixed_validation) + "\n"
p_str += " - 필터 통과 후보: {}\n".format(passed_count) p_str += " - 필터 통과 후보: {}\n".format(passed_count)
p_str += " - 추가 선정: {}개 (목표 {}개)\n".format(len(selected_candidates), variable_target_count) p_str += " - 추가 선정: {}개 (목표 {}개)\n".format(len(selected_candidates), variable_target_count)
p_str += " - 총 추천: {}개, 총 금액: {:,}원 (한도 {:,}원)\n".format(total_games, total_cost, MAX_BUDGET_KRW) p_str += " - 총 추천: {}개, 총 금액: {:,}원 (한도 {:,}원)\n".format(total_games, total_cost, MAX_BUDGET_KRW)

View File

@@ -1222,3 +1222,4 @@
{"returnValue": "success", "drwNoDate": "2026-05-02", "drwNo": 1222, "drwtNo1": 4, "drwtNo2": 11, "drwtNo3": 17, "drwtNo4": 22, "drwtNo5": 32, "drwtNo6": 41, "bnusNo": 34} {"returnValue": "success", "drwNoDate": "2026-05-02", "drwNo": 1222, "drwtNo1": 4, "drwtNo2": 11, "drwtNo3": 17, "drwtNo4": 22, "drwtNo5": 32, "drwtNo6": 41, "bnusNo": 34}
{"returnValue": "success", "drwNoDate": "2026-05-09", "drwNo": 1223, "drwtNo1": 16, "drwtNo2": 18, "drwtNo3": 20, "drwtNo4": 32, "drwtNo5": 33, "drwtNo6": 39, "bnusNo": 26} {"returnValue": "success", "drwNoDate": "2026-05-09", "drwNo": 1223, "drwtNo1": 16, "drwtNo2": 18, "drwtNo3": 20, "drwtNo4": 32, "drwtNo5": 33, "drwtNo6": 39, "bnusNo": 26}
{"returnValue": "success", "drwNoDate": "2026-05-16", "drwNo": 1224, "drwtNo1": 9, "drwtNo2": 18, "drwtNo3": 21, "drwtNo4": 27, "drwtNo5": 44, "drwtNo6": 45, "bnusNo": 28} {"returnValue": "success", "drwNoDate": "2026-05-16", "drwNo": 1224, "drwtNo1": 9, "drwtNo2": 18, "drwtNo3": 21, "drwtNo4": 27, "drwtNo5": 44, "drwtNo6": 45, "bnusNo": 28}
{"returnValue": "success", "drwNoDate": "2026-05-23", "drwNo": 1225, "drwtNo1": 8, "drwtNo2": 9, "drwtNo3": 19, "drwtNo4": 25, "drwtNo5": 41, "drwtNo6": 42, "bnusNo": 33}

View File

@@ -1222,3 +1222,4 @@
1222,4,11,17,22,32,41,34 1222,4,11,17,22,32,41,34
1223,16,18,20,32,33,39,26 1223,16,18,20,32,33,39,26
1224,9,18,21,27,44,45,28 1224,9,18,21,27,44,45,28
1225,8,9,19,25,41,42,33