diff --git a/final_BallFilter.py b/final_BallFilter.py index 869bcce..c5a5db8 100644 --- a/final_BallFilter.py +++ b/final_BallFilter.py @@ -3812,9 +3812,10 @@ class BallFilter: if len(set_ball & {3, 20, 44}) == 3: return 2928 return None - def extract_final_candidates(self, ball, no=None, until_end=False, df=None): - p_ball = df[df['no'] == no - 1].values.tolist()[0] - p_ball = p_ball[1:7] + def extract_final_candidates(self, ball, no=None, until_end=False, df=None, p_ball=None): + if p_ball is None: + p_ball = df[df['no'] == no - 1].values.tolist()[0] + p_ball = p_ball[1:7] filter_set = set() @@ -4447,7 +4448,9 @@ class BallFilter: return filter_set - def filter(self, ball, no, until_end=False, df=None, filter_ball=None): - filter_type = self.extract_final_candidates(ball=ball, no=no, until_end=until_end, df=df) + def filter(self, ball, no, until_end=False, df=None, filter_ball=None, p_ball=None): + filter_type = self.extract_final_candidates( + ball=ball, no=no, until_end=until_end, df=df, p_ball=p_ball + ) return filter_type \ No newline at end of file diff --git a/final_practice.py b/final_practice.py index 8b0de85..4c0d7e7 100644 --- a/final_practice.py +++ b/final_practice.py @@ -5,8 +5,10 @@ from DataCrawler import DataCrawler import json import os +import random import pandas as pd import itertools +from collections import Counter from datetime import datetime, timedelta from TelegramBot import TelegramBot @@ -98,6 +100,70 @@ class Practice: return + def validate_fixed_balls(self, resources_path, ymd, fixed_balls): + """ + 고정수 BallFilter 통과 여부를 검증한다. + + Returns: + dict: total, passed_count, failed_count, draw_no, details + """ + lotto_history_json = os.path.join(resources_path, 'lotto_history.json') + ball_filter = BallFilter(lotto_history_json) + draw_no = ball_filter.getNextNo(ymd) + + lotto_history_txt = os.path.join(resources_path, 'lotto_history.txt') + df_ball = pd.read_csv(lotto_history_txt, header=None) + df_ball.columns = ['no', 'b1', 'b2', 'b3', 'b4', 'b5', 'b6', 'bn'] + prev_row = df_ball[df_ball['no'] == draw_no - 1].values.tolist()[0] + p_ball = prev_row[1:7] + + details = [] + passed_count = 0 + for index, ball in enumerate(fixed_balls): + filter_type = ball_filter.filter( + ball=ball, no=draw_no, until_end=False, df=df_ball, p_ball=p_ball + ) + passed = len(filter_type) == 0 + if passed: + passed_count += 1 + details.append({ + 'index': index + 1, + 'ball': ball, + 'passed': passed, + 'filter_reasons': sorted(filter_type), + }) + + return { + 'draw_no': draw_no, + 'total': len(fixed_balls), + 'passed_count': passed_count, + 'failed_count': len(fixed_balls) - passed_count, + 'details': details, + } + + @staticmethod + def format_fixed_validation_summary(validation): + """고정수 검증 결과를 Telegram/로그용 문자열로 변환한다.""" + lines = [ + " - 고정수 필터 검증: {}/{} 통과".format( + validation['passed_count'], validation['total'] + ) + ] + if validation['failed_count'] > 0: + lines.append( + " - 필터 예외 포함: {}개 (고정수 유지)".format( + validation['failed_count'] + ) + ) + for item in validation['details']: + if item['passed']: + continue + reason = item['filter_reasons'][0] if item['filter_reasons'] else 'unknown' + lines.append( + " * #{} {} -> {}".format(item['index'], item['ball'], reason) + ) + return "\n".join(lines) + def _can_add_ball(self, ball, fixed_balls, selected_balls, max_overlap): ball_set = set(ball) @@ -111,11 +177,48 @@ class Practice: return True - def select_portfolio(self, fixed_balls, candidates, target_count): + @staticmethod + def _portfolio_number_counts(fixed_balls, selected_balls): + """포트폴리오 내 번호 등장 횟수를 집계한다.""" + counts = Counter() + for ball in fixed_balls + selected_balls: + counts.update(ball) + return counts + + @staticmethod + def _coverage_priority(ball, number_counts): + """낮을수록 포트폴리오에 덜 등장한 번호 위주 조합이다.""" + return sum(number_counts.get(number, 0) for number in ball) + + def _pick_best_candidate(self, unique_candidates, selected_keys, fixed_balls, selected, max_overlap): + """겹침 제약을 만족하는 후보 중 번호 커버리지가 가장 넓은 조합을 고른다.""" + number_counts = self._portfolio_number_counts(fixed_balls, selected) + best_candidate = None + best_score = None + best_key = None + + for candidate in unique_candidates: + key = tuple(candidate) + if key in selected_keys: + continue + if not self._can_add_ball(candidate, fixed_balls, selected, max_overlap): + continue + + score = self._coverage_priority(candidate, number_counts) + if best_candidate is None or score < best_score or (score == best_score and key < best_key): + best_candidate = candidate + best_score = score + best_key = key + + return best_candidate, best_key + + def select_portfolio(self, fixed_balls, candidates, target_count, shuffle_seed=None): """ 2차 포트폴리오 선정: - 중복 제거 + - shuffle_seed 기반 셔플로 순서 편향 완화 - 고정수/선정수 간 중복도(겹치는 번호 수) 제약을 단계적으로 완화하며 선택 + - 동률 후보는 번호 커버리지가 넓은 조합 우선 """ unique_candidates = [] seen = set() @@ -128,6 +231,10 @@ class Practice: seen.add(key) unique_candidates.append(list(key)) + if shuffle_seed is not None: + rng = random.Random(int(shuffle_seed)) + rng.shuffle(unique_candidates) + if target_count <= 0: return [] @@ -139,26 +246,27 @@ class Practice: overlap_stages = [2, 3, 4, 5] for max_overlap in overlap_stages: - for candidate in unique_candidates: - key = tuple(candidate) - if key in selected_keys: - continue + while len(selected) < target_count: + best_candidate, best_key = self._pick_best_candidate( + unique_candidates, selected_keys, fixed_balls, selected, max_overlap + ) + if best_candidate is None: + break - if self._can_add_ball(candidate, fixed_balls, selected, max_overlap): - selected.append(candidate) - selected_keys.add(key) - if len(selected) >= target_count: - return selected + selected.append(best_candidate) + selected_keys.add(best_key) - # 단계 완화 후에도 부족하면 남은 조합을 순서대로 채움 - for candidate in unique_candidates: - key = tuple(candidate) - if key in selected_keys: - continue - selected.append(candidate) - selected_keys.add(key) if len(selected) >= target_count: + return selected + + while len(selected) < target_count: + best_candidate, best_key = self._pick_best_candidate( + unique_candidates, selected_keys, fixed_balls, selected, max_overlap=6 + ) + if best_candidate is None: break + selected.append(best_candidate) + selected_keys.add(best_key) return selected @@ -175,15 +283,19 @@ class Practice: df_ball = pd.read_csv(lottoHistoryFileName, header=None) df_ball.columns = ['no', 'b1', 'b2', 'b3', 'b4', 'b5', 'b6', 'bn'] + prev_row = df_ball[df_ball['no'] == no - 1].values.tolist()[0] + p_ball = prev_row[1:7] + passed_candidates = [] - nCr = list(itertools.combinations(candidates, 6)) - for idx, ball in enumerate(nCr): + for idx, ball in enumerate(itertools.combinations(candidates, 6)): if idx % 1000000 == 0: print(" - {} processed, pass: {}".format(idx, len(passed_candidates))) ball = list(ball) - filter_type = ballFilter.filter(ball=ball, no=no, until_end=False, df=df_ball) + filter_type = ballFilter.filter( + ball=ball, no=no, until_end=False, df=df_ball, p_ball=p_ball + ) filter_size = len(filter_type) if 0 < filter_size: @@ -195,12 +307,11 @@ class Practice: selected_candidates = self.select_portfolio( fixed_balls=fixed_balls, candidates=passed_candidates, - target_count=variable_target_count + target_count=variable_target_count, + shuffle_seed=ymd, ) - p_ball = df_ball[df_ball['no'] == no - 1].values.tolist()[0] - p_no = p_ball[0] - p_ball = p_ball[1:7] + p_no = prev_row[0] return p_no, p_ball, selected_candidates, len(passed_candidates), variable_target_count @@ -243,6 +354,12 @@ if __name__ == '__main__': # 매주 고정 fixed_balls = [] practice.predict1(fixed_balls) + fixed_validation = practice.validate_fixed_balls( + resources_path=resources_path, + ymd=ymd, + fixed_balls=fixed_balls, + ) + print(Practice.format_fixed_validation_summary(fixed_validation)) result_json[ymd].extend(fixed_balls) # 필터 기반 예측 @@ -254,6 +371,15 @@ if __name__ == '__main__': ) result_json[ymd].extend(selected_candidates) + if '_meta' not in result_json: + result_json['_meta'] = {} + result_json['_meta'][ymd] = { + 'fixed_validation': fixed_validation, + 'passed_count': passed_count, + 'selected_count': len(selected_candidates), + 'portfolio_shuffle_seed': ymd, + } + with open(recommend_result_file, 'w', encoding='utf-8') as outFp: json.dump(result_json, outFp, ensure_ascii=False) @@ -261,6 +387,7 @@ if __name__ == '__main__': total_cost = total_games * COST_PER_GAME p_str = "[지난주] {}\n - {} 회차, {}\n[금주] {}\n - {} 회차\n[모델#25]\n".format(last_weekend, p_no, str(p_ball), ymd, (p_no + 1)) p_str += " - 고정수: {}개\n".format(len(fixed_balls)) + p_str += Practice.format_fixed_validation_summary(fixed_validation) + "\n" p_str += " - 필터 통과 후보: {}개\n".format(passed_count) p_str += " - 추가 선정: {}개 (목표 {}개)\n".format(len(selected_candidates), variable_target_count) p_str += " - 총 추천: {}개, 총 금액: {:,}원 (한도 {:,}원)\n".format(total_games, total_cost, MAX_BUDGET_KRW) diff --git a/resources/lotto_history.json b/resources/lotto_history.json index 8ae71b1..600e674 100644 --- a/resources/lotto_history.json +++ b/resources/lotto_history.json @@ -1222,3 +1222,4 @@ {"returnValue": "success", "drwNoDate": "2026-05-02", "drwNo": 1222, "drwtNo1": 4, "drwtNo2": 11, "drwtNo3": 17, "drwtNo4": 22, "drwtNo5": 32, "drwtNo6": 41, "bnusNo": 34} {"returnValue": "success", "drwNoDate": "2026-05-09", "drwNo": 1223, "drwtNo1": 16, "drwtNo2": 18, "drwtNo3": 20, "drwtNo4": 32, "drwtNo5": 33, "drwtNo6": 39, "bnusNo": 26} {"returnValue": "success", "drwNoDate": "2026-05-16", "drwNo": 1224, "drwtNo1": 9, "drwtNo2": 18, "drwtNo3": 21, "drwtNo4": 27, "drwtNo5": 44, "drwtNo6": 45, "bnusNo": 28} +{"returnValue": "success", "drwNoDate": "2026-05-23", "drwNo": 1225, "drwtNo1": 8, "drwtNo2": 9, "drwtNo3": 19, "drwtNo4": 25, "drwtNo5": 41, "drwtNo6": 42, "bnusNo": 33} diff --git a/resources/lotto_history.txt b/resources/lotto_history.txt index dd1eeb5..b37e649 100644 --- a/resources/lotto_history.txt +++ b/resources/lotto_history.txt @@ -1222,3 +1222,4 @@ 1222,4,11,17,22,32,41,34 1223,16,18,20,32,33,39,26 1224,9,18,21,27,44,45,28 +1225,8,9,19,25,41,42,33 \ No newline at end of file