Add final BallFilter, train/valid scripts, train-derived sum filters

- final_BallFilter: CSV history loader, TRAIN_ALLOW for 6-sum and week diff,
  fix filterOneDigitPattern ball overwrite bug, drop socket call
- final_filter_params: build sum6 and abs_sum_diff from rounds 1-800
- filter_model re-exports BallFilter; train/valid evaluate pass-through counts
- final_filterTest aligned with 1_FilterTest_25 plus optional MC survivors
- README and scripts/run_with_ncue.sh for ncue workflow

Made-with: Cursor
This commit is contained in:
2026-04-08 19:29:10 +09:00
parent 013206ef67
commit 52e8495148
8 changed files with 4639 additions and 725 deletions

View File

@@ -1,50 +1,38 @@
# -*- coding: utf-8 -*-
"""
학습(1~800) / 검증(801~1000) / 테스트(1001~) 구간별 필터 통과(당첨번호가 필터를 통과하는지) 분석.
1_FilterTest_25.py 와 동일한 흐름이며 BallFilter 대신 final_BallFilter.BallFilter 를 사용합니다.
실행: miniconda 환경 ncue 에서 `python final_filterTest.py` (README 참고).
`1_FilterTest_25.py`와 동일한 역할이며 `final_BallFilter.BallFilter` + `lotto_history.txt`를 사용합니다.
"""
from __future__ import annotations
import datetime
import argparse
import itertools
import os
import random
import time
import datetime
import pandas as pd
from final_BallFilter import BallFilter
# PROMPT.txt 기준 구간
TRAIN_NO = (1, 800)
VALID_NO = (801, 1000)
TEST_NO = (1001, 10**9)
class FilterTest:
def __init__(self, resources_path: str):
lotto_json = os.path.join(resources_path, "lotto_history.json")
self.ballFilter = BallFilter(lotto_json)
ballFilter = None
def find_filter_method(self, df_ball, filter_ball=None, no_min=None, no_max=None):
"""no_min~no_max 회차만 역순으로 검사 (None 이면 전체)."""
def __init__(self, resources_path):
lotto_path = os.path.join(resources_path, "lotto_history.txt")
self.ballFilter = BallFilter(lotto_path)
def find_filter_method(self, df_ball, filter_ball=None):
win_count = 0
no_filter_ball = {}
printLog = True
filter_dic = {}
filter_dic_len = {}
filter_dic_1 = {}
filter_dic_2 = {}
idx_list = list(range(len(df_ball) - 1, 19, -1))
for i in idx_list:
no = int(df_ball["no"].iloc[i])
if no_min is not None and no < no_min:
continue
if no_max is not None and no > no_max:
continue
for i in range(len(df_ball) - 1, 19, -1):
no = df_ball["no"].iloc[i]
answer = df_ball[df_ball["no"] == no].values.tolist()[0]
answer = answer[1:7]
answer = sorted(int(x) for x in answer[1:7])
filter_type = self.ballFilter.filter(ball=answer, no=no, until_end=True, df=df_ball)
filter_type = list(filter_type)
@@ -53,13 +41,20 @@ class FilterTest:
if size == 0:
win_count += 1
no_filter_ball[no] = answer
print("\t", no)
elif size == 1:
key = filter_type[0]
filter_dic_1[key] = filter_dic_1.get(key, 0) + 1
if printLog:
print("\t", no, filter_type)
elif size == 2:
key = ",".join(filter_type)
filter_dic_2[key] = filter_dic_2.get(key, 0) + 1
if printLog:
print("\t", no, filter_type)
else:
if printLog:
print("\t", no, filter_type)
if size not in filter_dic_len:
filter_dic_len[size] = []
filter_dic_len[size].append(filter_type)
@@ -67,46 +62,106 @@ class FilterTest:
for f_t in filter_type:
filter_dic[f_t] = filter_dic.get(f_t, 0) + 1
print("\n\t[구간 {}~{}] 필터에 걸리지 않은 회차 (당첨 조합 통과)]".format(no_min, no_max))
print("\tcount: {:,} (통과)".format(len(no_filter_ball)))
for no in sorted(no_filter_ball.keys()):
print("\n\t[필터 개수가 적은 것부터 최적화를 위함]")
sorted_filter_dic_len = sorted(filter_dic_len.keys())
for filter_count in sorted_filter_dic_len:
for filter_type in filter_dic_len[filter_count]:
print("\t\t>{} > {}".format(filter_count, filter_type))
print("\n\t[걸러진 유일 필터]")
sorted_filter_dic_1 = sorted(filter_dic_1.items(), key=lambda x: x[1], reverse=True)
for i in range(len(sorted_filter_dic_1)):
print("\t\t>", sorted_filter_dic_1[i][0], "->", sorted_filter_dic_1[i][1])
print("\n\t[2개 필터에 걸린 경우]")
sorted_filter_dic_2 = sorted(filter_dic_2.items(), key=lambda x: x[1], reverse=True)
for i in range(len(sorted_filter_dic_2)):
print("\t\t>", sorted_filter_dic_2[i][0], "->", sorted_filter_dic_2[i][1])
print("\n\t[Filter 유형 별 걸린 개수]")
sorted_filter_dic = sorted(filter_dic.items(), key=lambda x: x[1], reverse=True)
for i in range(len(sorted_filter_dic)):
print("\t\t>", sorted_filter_dic[i][0], "->", sorted_filter_dic[i][1])
print("\n\t# 필터에 걸리지 않고 당첨된 회차")
print("\tcount: {:,} / total: {:,}".format(len(no_filter_ball), len(df_ball)))
for no in no_filter_ball:
print("\t\t>", no, no_filter_ball[no])
print("\tcount: {:,} / total: {:,}".format(len(no_filter_ball), len(df_ball)))
return win_count, no_filter_ball
return win_count
def report_split(self, df_ball, name: str, lo: int, hi: int):
print("\n" + "=" * 60)
print(" {} | 회차 {} ~ {}".format(name, lo, hi))
print("=" * 60)
t0 = time.time()
wc, _ = self.find_filter_method(df_ball, no_min=lo, no_max=hi)
elapsed = datetime.timedelta(seconds=time.time() - t0)
span = hi - lo + 1
rate = (wc / span * 100) if span else 0
print("\t처리 시간: {}".format(elapsed))
print("\t통과 회차 수: {} / {} ({:.2f}%)".format(wc, span, rate))
if lo >= TRAIN_NO[0] and hi <= TRAIN_NO[1]:
need = max(1, span // 100)
print("\t(참고) 100회당 최소 1회 기준 대략 {}회 이상이면 충족".format(need))
if lo >= VALID_NO[0] and hi <= VALID_NO[1]:
print("\t(참고) 검증 200회 구간에서 최소 3회 이상이면 요구사항 예시 충족")
return wc
def find_final_candidates(self, no, df_ball, filter_ball=None):
final_candidates = []
generation_balls = list(range(1, 46))
nCr = list(itertools.combinations(generation_balls, 6))
for idx, ball in enumerate(nCr):
if idx % 1000000 == 0:
print(" - {} processed...".format(idx))
if filter_ball is not None and 0 < len(set(ball) & set(filter_ball)):
continue
filter_type = self.ballFilter.filter(ball=list(ball), no=no, until_end=False, df=df_ball)
if filter_type:
continue
final_candidates.append(ball)
return final_candidates
def check_filter_method(self, df_ball, p_win_count, filter_ball=None):
win_count = 0
for i in range(len(df_ball) - 1, 0, -1):
no = df_ball["no"].iloc[i]
answer = df_ball[df_ball["no"] == no].values.tolist()[0]
answer = sorted(int(x) for x in answer[1:7])
if filter_ball is not None and len(set(answer) & set(filter_ball)):
continue
filter_type = self.ballFilter.extract_final_candidates(answer, no=no, until_end=True, df=df_ball)
if len(filter_type) == 0:
win_count += 1
print("\t\t>{}. {}".format(no, answer))
print("\n\t> {} / {} p_win_count, {} total".format(win_count, p_win_count, len(df_ball) - 1))
def estimate_survivors_mc(self, no, df_ball, n_samples=8000, seed=0):
"""전수(814만) 대신 무작위 조합으로 생존 비율을 추정해 대략적인 생존 개수를 반환합니다."""
rng = random.Random(seed)
generation_balls = list(range(1, 46))
total = 8145060
hits = 0
for _ in range(n_samples):
ball = sorted(rng.sample(generation_balls, 6))
fts = self.ballFilter.filter(ball=ball, no=no, until_end=False, df=df_ball)
if not fts:
hits += 1
est = int(round(total * (hits / n_samples)))
return est, hits, n_samples
if __name__ == "__main__":
resources_path = os.path.join(os.path.dirname(__file__), "resources")
csv_path = os.path.join(resources_path, "lotto_history.txt")
df_ball = pd.read_csv(csv_path, header=None)
parser = argparse.ArgumentParser()
parser.add_argument("--resources", default="resources")
parser.add_argument("--mc-no", type=int, default=None, help="생존 MC 추정을 할 회차 번호")
parser.add_argument("--mc-samples", type=int, default=8000)
args = parser.parse_args()
resources_path = args.resources
lottoHistoryFileName = os.path.join(resources_path, "lotto_history.txt")
df_ball = pd.read_csv(lottoHistoryFileName, header=None)
df_ball.columns = ["no", "b1", "b2", "b3", "b4", "b5", "b6", "bn"]
ft = FilterTest(resources_path)
filterTest = FilterTest(resources_path)
ft.report_split(df_ball, "학습 TRAIN", TRAIN_NO[0], TRAIN_NO[1])
ft.report_split(df_ball, "검증 VALID", VALID_NO[0], min(VALID_NO[1], int(df_ball["no"].max())))
if int(df_ball["no"].max()) >= TEST_NO[0]:
ft.report_split(
df_ball,
"테스트 TEST",
TEST_NO[0],
int(df_ball["no"].max()),
)
print("STEP #1. 필터 방법 추출")
start = time.time()
win_count = filterTest.find_filter_method(df_ball)
process_time = datetime.timedelta(seconds=time.time() - start)
print("process_time: ", process_time)
if args.mc_no is not None:
est, h, n = filterTest.estimate_survivors_mc(args.mc_no, df_ball, n_samples=args.mc_samples)
print(f"MC 생존 추정 (회차 {args.mc_no}): 약 {est}개 (표본 통과 {h}/{n})")