Add final_BallFilter, train-based params, ncue run script and README notes

Made-with: Cursor
This commit is contained in:
2026-04-08 19:18:31 +09:00
parent 2bd4ad8fcb
commit 013206ef67
6 changed files with 1006 additions and 0 deletions

View File

@@ -0,0 +1,405 @@
#!/usr/bin/env python3
"""
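Compute the allowed-value sets consumed by final_BallFilter.extract_final_candidates
from the winning numbers of the training range (draws 1-800).
Uses only the standard library. (The original note also mentions pandas/DataFrame
compatibility; no pandas import is visible in this script.)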
"""
from __future__ import annotations
import csv
import re
from collections import defaultdict
from pathlib import Path
# Parent of the directory that contains this script.
ROOT = Path(__file__).resolve().parents[1]
# Draw history, parsed by load_draws() with csv.reader: the first column is the
# draw number and the next six columns are the drawn numbers.
HISTORY = ROOT / "resources" / "lotto_history.txt"
# Python source scanned by parse_pair_triple_rules() for pair/triple rules.
BALLFILTER_SRC = ROOT / "BallFilter_25.py"
# Generated parameter module written by main().
OUT = ROOT / "final_filter_params.py"
# Inclusive range of draw numbers treated as the training set.
TRAIN_LO = 1
TRAIN_HI = 800
# To avoid an overly wide union over the training distribution, each feature's
# allowed set is narrowed to a percentile band over its sorted unique values
# (see pct_band_unique).
# NOTE(review): the original note said values of training draws that fall outside
# the band are re-added (100% training coverage); no such step is visible in this
# script - confirm the intended behaviour.
# A narrower band makes the filter stricter. Balance training vs. validation by
# tuning these values against the results of final_filterTest.py.
PCT_LO = 8
PCT_HI = 92
# Prime numbers among the values listed below (2..43); 1 is in neither set.
PRIME = {2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43}
# Composite numbers from 4 up to 45.
COMPOSITE = {4, 6, 8, 9, 10, 12, 14, 15, 16, 18, 20, 21, 22, 24, 25, 26, 27, 28, 30, 32, 33, 34, 35, 36, 38, 39, 40, 42, 44, 45}
def load_draws(path=None):
    """Load the draw history into a dict keyed by draw number.

    Each non-empty CSV row is read as ``draw_no, n1, ..., n6[, ...]``; only
    columns 1-6 are used as the drawn numbers, and they are stored sorted
    ascending.

    Args:
        path: Optional file to read. Defaults to the module-level ``HISTORY``
            path, which keeps existing ``load_draws()`` callers unchanged.

    Returns:
        ``{draw_no: [six sorted ints]}`` with keys inserted in ascending draw
        order. If a draw number occurs more than once, the last row in the
        file wins.

    Raises:
        ValueError: if a non-empty row contains a non-integer field in
            columns 0-6.
        OSError: if the file cannot be opened.
    """
    source = HISTORY if path is None else path
    rows = []
    with open(source, newline="", encoding="utf-8") as f:
        for fields in csv.reader(f):
            if not fields:
                # csv.reader yields [] for blank lines; skip them.
                continue
            draw_no = int(fields[0])
            balls = sorted(int(x) for x in fields[1:7])
            rows.append((draw_no, balls))
    # Stable sort: for duplicate draw numbers the later row stays last and
    # therefore overwrites the earlier one when the dict is built.
    rows.sort(key=lambda row: row[0])
    return {draw_no: balls for draw_no, balls in rows}
def get_ac(ball):
    """Return the "AC" score of a six-number combination.

    The score is the number of distinct differences ``ball[i] - ball[j]``
    over all index pairs ``j < i`` among the first six entries, minus 5.
    """
    gaps = {
        ball[upper] - ball[lower]
        for upper in range(1, 6)
        for lower in range(upper)
    }
    return len(gaps) - 5
def interval_sum(ball):
    """Return the sum of the five gaps between neighbouring entries 0..5 of ``ball``."""
    gaps = [ball[k + 1] - ball[k] for k in range(5)]
    return sum(gaps)
def first_letter_sum(ball):
    """Return the sum of the leading digits of the entries whose decimal text is two characters long."""
    total = 0
    for number in ball:
        text = str(number)
        if len(text) == 2:
            total += int(text[0])
    return total
def last_letter_sum(ball):
    """Return the sum of the last digits of the one- and two-character entries.

    Entries whose decimal text is longer than two characters are ignored.
    """
    total = 0
    for number in ball:
        text = str(number)
        if len(text) == 2:
            total += int(text[1])
        elif len(text) == 1:
            total += int(text)
    return total
def uniq_end_digits(ball):
    """Return how many distinct values ``b % 10`` occur among the entries of ``ball``."""
    endings = set()
    for number in ball:
        endings.add(number % 10)
    return len(endings)
def high_low(ball):
    """Return ``(low, high)``: how many entries are below 23 and how many are above 23.

    An entry equal to 23 is counted in neither group.
    """
    low = 0
    high = 0
    for number in ball:
        if number < 23:
            low += 1
        elif number > 23:
            high += 1
    return low, high
def section10_count(ball):
    """Return the number of distinct values of ``int(b / 10)`` among the entries of ``ball``."""
    decades = {int(number / 10) for number in ball}
    return len(decades)
def count_mult(ball, m):
    """Return how many entries of ``ball`` are divisible by ``m``."""
    multiples = [number for number in ball if number % m == 0]
    return len(multiples)
def continus_max(ball):
    """Return the length of the longest run of consecutive integers in entries 0..5.

    A run is a stretch where each entry equals the previous one plus 1; the
    result is at least 1.
    """
    longest = 1
    current = 1
    for idx in range(1, 6):
        # Extend the run on a +1 step, otherwise start a new run here.
        current = current + 1 if ball[idx] == ball[idx - 1] + 1 else 1
        if current > longest:
            longest = current
    return longest
def weeks_freq(draws_map, answer, no, week):
    """Count the entries of ``answer`` that appeared in the ``week`` draws before draw ``no``.

    The look-back covers draw numbers ``no - 1`` down to ``no - week``; draw
    numbers missing from ``draws_map`` are skipped. Every entry of ``answer``
    found in that pool counts once.
    """
    recent = set()
    for offset in range(week):
        earlier = no - 1 - offset
        if earlier in draws_map:
            recent.update(draws_map[earlier])
    hits = [number for number in answer if number in recent]
    return len(hits)
def pct_band_unique(values, lo=PCT_LO, hi=PCT_HI):
    """Keep only the unique values that lie inside a percentile band.

    The unique values are sorted and the band limits are picked by index:
    ``int(pct / 100 * (n - 1))`` for ``lo`` and ``hi``. Every unique value
    between those two limits (inclusive) is kept. When there are six or fewer
    unique values they are all kept; an empty input yields an empty set.
    """
    if not values:
        return set()
    uniq = sorted(set(values))
    count = len(uniq)
    if count <= 6:
        return set(uniq)
    span = count - 1
    lower = uniq[int((lo / 100.0) * span)]
    upper = uniq[int((hi / 100.0) * span)]
    kept = set()
    for value in uniq:
        if lower <= value <= upper:
            kept.add(value)
    return kept
def parse_pair_triple_rules():
    """Extract the pair and triple rules from the BALLFILTER_SRC source text.

    The file is scanned for expressions of the form
    ``len(set_ball & {a, b}) == 2`` and ``len(set_ball & {a, b, c}) == 3``.
    A match is kept only when its set literal holds exactly as many
    comma-separated integers as the compared count.

    Returns:
        ``(pairs, triples)``: two lists of frozensets, in order of appearance.
    """
    source = BALLFILTER_SRC.read_text(encoding="utf-8")

    def collect(pattern, size):
        # Gather every matching set literal that has exactly `size` members.
        found = []
        for match in re.finditer(pattern, source):
            numbers = [int(token.strip()) for token in match.group(1).split(",")]
            if len(numbers) == size:
                found.append(frozenset(numbers))
        return found

    pair_rules = collect(r"len\(set_ball & \{([^}]+)\}\) == 2", 2)
    triple_rules = collect(r"len\(set_ball & \{([^}]+)\}\) == 3", 3)
    return pair_rules, triple_rules
# Divisors for the "count of multiples" features (mul<m> / mul<m>_diff).
_MULT_DIVISORS = (3, 4, 5, 6, 7, 8, 9, 10, 11, 13, 17, 19, 23)
# Look-back sizes, in draws, for the recent-overlap features (w<k> / w<k>_diff).
_WEEK_WINDOWS = (8, 12, 16, 20)


def _numeric_keys():
    """Return the numeric feature keys in output order, split around the HL_* lines.

    Returns:
        ``(before_hl, after_hl)``: keys written before and after the
        ``HL_SKIP`` / ``HL_SEEN`` lines of the generated module.
    """
    before_hl = [
        "sum6", "sum6_diff",
        "avg6", "avg6_diff",
        "sum3f", "sum3f_diff",
        "sum3b", "sum3b_diff",
    ]
    after_hl = [
        "go_sum", "go_sum_diff",
        "interval", "interval_diff",
        "first_letter", "first_letter_diff",
        "last_letter", "last_letter_diff",
        "b0", "b0_diff",
        "b5", "b5_diff",
        "uniq_end", "uniq_end_diff",
        "ac", "ac_diff",
    ]
    for m in _MULT_DIVISORS:
        after_hl.extend([f"mul{m}", f"mul{m}_diff"])
    after_hl.extend([
        "prime_n",
        "composite_n", "composite_diff",
        "even_n", "even_diff",
        "sec10", "sec10_diff",
    ])
    for wk in _WEEK_WINDOWS:
        after_hl.extend([f"w{wk}", f"w{wk}_diff"])
    after_hl.append("continus_max")
    return before_hl, after_hl


def _seen_combinations(train_draws):
    """Return every pair and every triple (as frozensets) that co-occurs in a training draw."""
    from itertools import combinations

    pairs = set()
    triples = set()
    for balls in train_draws.values():
        pairs.update(frozenset(combo) for combo in combinations(balls, 2))
        triples.update(frozenset(combo) for combo in combinations(balls, 3))
    return pairs, triples


def _track(sets, key, cur, prev, diff_key=None):
    """Record ``cur`` under ``key`` and ``|cur - prev|`` under the matching diff key."""
    sets[key].add(cur)
    sets[diff_key or key + "_diff"].add(abs(cur - prev))


def _collect_features(draws):
    """Collect the observed feature values over the training draws.

    Only draws that are present together with their immediate predecessor are
    used, because most features are paired with a difference against the
    previous draw.

    Returns:
        ``(sets, flags)``: ``sets`` maps feature key -> set of observed
        values; ``flags`` tells whether any training draw would be rejected by
        the "previous number" / "all in previous 7" filters.
    """
    sets = defaultdict(set)
    flags = {"need_relax_previous": False, "need_relax_prev7": False}

    # Start at TRAIN_LO (never below 2) so the statistics honour the configured
    # training range; with TRAIN_LO = 1 this is the same range as before.
    for no in range(max(TRAIN_LO, 2), TRAIN_HI + 1):
        if no not in draws or (no - 1) not in draws:
            continue
        ball = draws[no]
        p_ball = draws[no - 1]

        _track(sets, "sum6", sum(ball), sum(p_ball))
        _track(sets, "avg6", sum(ball) // 6, sum(p_ball) // 6)
        _track(sets, "sum3f", ball[0] + ball[1] + ball[2], p_ball[0] + p_ball[1] + p_ball[2])
        _track(sets, "sum3b", ball[3] + ball[4] + ball[5], p_ball[3] + p_ball[4] + p_ball[5])
        sets["hl_allowed"].add(high_low(ball))
        _track(sets, "go_sum", ball[0] + ball[5], p_ball[0] + p_ball[5])
        _track(sets, "interval", interval_sum(ball), interval_sum(p_ball))
        _track(sets, "first_letter", first_letter_sum(ball), first_letter_sum(p_ball))
        _track(sets, "last_letter", last_letter_sum(ball), last_letter_sum(p_ball))
        _track(sets, "b0", ball[0], p_ball[0])
        _track(sets, "b5", ball[5], p_ball[5])
        _track(sets, "uniq_end", uniq_end_digits(ball), uniq_end_digits(p_ball))
        _track(sets, "ac", get_ac(ball), get_ac(p_ball))
        for m in _MULT_DIVISORS:
            _track(sets, f"mul{m}", count_mult(ball, m), count_mult(p_ball, m))

        # prime_n has no diff counterpart.
        sets["prime_n"].add(len(set(ball) & PRIME))
        _track(
            sets,
            "composite_n",
            len(set(ball) & COMPOSITE),
            len(set(p_ball) & COMPOSITE),
            diff_key="composite_diff",
        )
        _track(
            sets,
            "even_n",
            sum(1 for b in ball if b % 2 == 0),
            sum(1 for b in p_ball if b % 2 == 0),
            diff_key="even_diff",
        )
        _track(sets, "sec10", section10_count(ball), section10_count(p_ball))

        for wk in _WEEK_WINDOWS:
            # NOTE(review): the previous draw's overlap is also measured from
            # `no`, so its own numbers are inside the window and the value is
            # always len(p_ball). Kept as-is; confirm it matches the consumer.
            _track(
                sets,
                f"w{wk}",
                weeks_freq(draws, ball, no, wk),
                weeks_freq(draws, p_ball, no, wk),
            )
        sets["continus_max"].add(continus_max(ball))

        # filterPreviousNumber (same rule as the original filter): a draw is
        # "bad" when none of its numbers equals or neighbours (+/-1) a number
        # of the previous draw.
        pb_set = set(p_ball)
        if not any(b in pb_set or b - 1 in pb_set or b + 1 in pb_set for b in ball):
            flags["need_relax_previous"] = True

        # filterAllPreivous7: all six numbers already appeared in the seven
        # preceding draws.
        pb7 = set()
        for i in range(no - 1, no - 8, -1):
            if i in draws:
                pb7.update(draws[i])
        if len(set(ball) & pb7) == 6:
            flags["need_relax_prev7"] = True

    return sets, flags


def _render_params(sets, flags, pair_block, triple_block, hl_skip):
    """Render the generated parameter module as text (ends with a newline)."""
    before_hl, after_hl = _numeric_keys()
    lines = [
        "# -*- coding: utf-8 -*-",
        '"""학습 구간 {}~{}회 기준 자동 생성 — tools/compute_final_filter_params.py"""'.format(
            TRAIN_LO, TRAIN_HI
        ),
        "",
        "TRAIN_RANGE = ({}, {})".format(TRAIN_LO, TRAIN_HI),
        "DISABLE_FILTER_PREVIOUS_NUMBER = {}".format(str(flags["need_relax_previous"])),
        "DISABLE_FILTER_ALL_PREVIOUS_7 = {}".format(str(flags["need_relax_prev7"])),
        "",
    ]
    # Remember the ALLOW_* names as they are written, for the frozenset cache below.
    allow_names = []

    def emit_allow(key):
        name = "ALLOW_" + key.upper()
        allow_names.append(name)
        lines.append("{} = {}".format(name, repr(sorted(sets[key]))))

    for key in before_hl:
        emit_allow(key)
    lines.append("HL_SKIP = {}".format(repr(sorted(hl_skip))))
    lines.append("HL_SEEN = {}".format(repr(sorted(sets["hl_allowed"]))))
    for key in after_hl:
        emit_allow(key)
    lines.append("PAIR_BLOCKLIST = {}".format(repr([sorted(list(x)) for x in pair_block])))
    lines.append("TRIPLE_BLOCKLIST = {}".format(repr([sorted(list(x)) for x in triple_block])))
    lines.extend(["", "# frozenset 캐시", ""])
    for name in allow_names:
        short = name.replace("ALLOW_", "", 1)
        lines.append("_F_{} = frozenset({})".format(short, name))
    lines.append("_F_HL_SEEN = frozenset(HL_SEEN)")
    lines.append("")
    return "\n".join(lines) + "\n"


def main():
    """Generate the filter-parameter module from the training draws.

    Steps: load the draw history and the pair/triple rules, drop rules that a
    training draw would violate, collect per-draw feature values, narrow each
    numeric feature set with ``pct_band_unique``, write the result to ``OUT``
    and print a short summary.
    """
    draws = load_draws()
    pair_rules, triple_rules = parse_pair_triple_rules()
    train_draws = {n: draws[n] for n in range(TRAIN_LO, TRAIN_HI + 1) if n in draws}

    # Blocklist: a pair/triple that appeared together in a training draw is
    # removed from the rules, so no training winner is blocked by them.
    train_pairs_seen, train_triples_seen = _seen_combinations(train_draws)
    pair_block = [p for p in pair_rules if p not in train_pairs_seen]
    triple_block = [t for t in triple_rules if t not in train_triples_seen]

    sets, flags_prev = _collect_features(draws)

    # Tighten every numeric feature set to its percentile band. Nothing is
    # re-added afterwards, so a training draw whose value falls outside the
    # band is not guaranteed to pass that filter.
    before_hl, after_hl = _numeric_keys()
    for key in before_hl + after_hl:
        sets[key] = pct_band_unique(sets[key])

    # High/low: (low, high) tuples to skip; this holds all four combinations
    # of 0 and 1. The (low, high) tuples seen in training are emitted
    # separately as HL_SEEN.
    hl_skip = {(l, h) for l in (0, 1) for h in (0, 1)}

    OUT.write_text(
        _render_params(sets, flags_prev, pair_block, triple_block, hl_skip),
        encoding="utf-8",
    )
    print("Wrote", OUT)
    print("pair rules:", len(pair_rules), "-> block", len(pair_block))
    print("triple rules:", len(triple_rules), "-> block", len(triple_block))
    print("DISABLE_FILTER_PREVIOUS_NUMBER", flags_prev["need_relax_previous"])
    print("DISABLE_FILTER_ALL_PREVIOUS_7", flags_prev["need_relax_prev7"])
# Run the generator only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()