#!/usr/bin/env python3
"""
fixed10.py

요구사항
- "지금까지 당첨되지 않은(=과거 1등 조합으로 나온 적 없는)" 조합만 추천
- 앞으로 10개 조합을 꾸준히 구매할 수 있도록 10개만 출력
- filter_model_1/2/3와 무관한 새로운 최적화 방법

중요한 사실
- 로또는 통계적으로 독립/균등(무작위) 가정이 기본이라 미래 1등을 '예측'할 수는 없습니다.
- 대신 이 코드는 과거 1등 조합들의 전형적 분포(합/홀짝/구간/연속/끝수 등)에
  "가까운" 조합을 찾고, 10개 조합 간 중복(겹침)을 줄이는 방향으로 최적화합니다.

동작 개요
1) 히스토리( resources/lotto_history.txt )로부터 과거 1등 조합 집합을 로드
2) 과거 1등들의 feature 분포를 구축(라플라스 스무딩)
3) 고정 seed로 랜덤 샘플 풀을 생성하고, 분포 적합도 + 제약(겹침/최근회차 유사도 등)으로 스코어링
4) 상위 후보에서 다양성 제약을 만족하도록 greedy하게 10개 선택

사용 예:
    python fixed10.py
    python fixed10.py --history resources/lotto_history.txt --count 10 --seed 42 --pool 300000
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import csv
|
|
import math
|
|
import os
|
|
import random
|
|
from collections import Counter, defaultdict
|
|
from dataclasses import dataclass
|
|
from typing import Dict, Iterable, List, Sequence, Set, Tuple, Optional
|
|
|
|
|
|
# A draw/pick: six distinct lotto numbers in 1..45, kept sorted ascending.
Ball = Tuple[int, int, int, int, int, int]
|
|
|
|
|
|
def parse_history_txt(path: str) -> List[Ball]:
    """
    Parse lotto_history.txt rows of the form: no,b1,b2,b3,b4,b5,b6,bn

    Returns a list of sorted 6-number tuples (Ball), in file order.
    Blank rows, short rows, and rows with non-numeric ball cells
    (e.g. a header line) are skipped instead of aborting the parse.
    """
    balls: List[Ball] = []
    with open(path, "r", encoding="utf-8") as f:
        reader = csv.reader(f)
        for row in reader:
            # tolerate whitespace around cells
            cells = [c.strip() for c in row]
            if len(cells) < 7:
                # blank line or malformed/short row
                continue
            try:
                nums = sorted(int(x) for x in cells[1:7])
            except ValueError:
                # header row or garbage cell -- skip rather than crash
                continue
            balls.append(tuple(nums))  # type: ignore[arg-type]
    return balls
|
|
|
|
|
|
def max_consecutive_len(nums: Sequence[int]) -> int:
    """
    Return the length of the longest run of consecutive integers in *nums*,
    which is assumed to be sorted ascending.

    Returns 0 for an empty sequence (the previous version wrongly returned 1).
    """
    if not nums:
        return 0
    best = 1
    cur = 1
    for prev, nxt in zip(nums, nums[1:]):
        if nxt == prev + 1:
            cur += 1
            if cur > best:
                best = cur
        else:
            # run broken -- restart the counter
            cur = 1
    return best
|
|
|
|
|
|
def decade_bucket(n: int) -> int:
    """Map a lotto number to its decade bucket 0..4
    (1-10, 11-20, 21-30, 31-40, 41-45)."""
    ranges = ((1, 10, 0), (11, 20, 1), (21, 30, 2), (31, 40, 3))
    for lo, hi, bucket in ranges:
        if lo <= n <= hi:
            return bucket
    # anything else (including 41-45) falls into the last bucket
    return 4
|
|
|
|
|
|
@dataclass(frozen=True)
class Features:
    """Summary features of a 6-number combination, used to score how
    "typical" a candidate looks relative to historical winning draws."""

    sum6: int  # sum of the six numbers
    odd: int  # count of odd numbers (0..6)
    low: int  # <=22 -- count of "low" numbers
    max_run: int  # length of the longest consecutive run
    uniq_last_digit: int  # number of distinct last digits (1..6)
    decade_sig: Tuple[int, int, int, int, int]  # counts per bucket
|
|
|
|
|
|
def features_of(ball: Ball) -> Features:
    """Compute the Features summary for a sorted 6-number combination."""
    total = sum(ball)
    odd_count = sum(x & 1 for x in ball)
    low_count = sum(x <= 22 for x in ball)
    run_len = max_consecutive_len(ball)
    distinct_last = len({x % 10 for x in ball})
    bucket_counts = [0] * 5
    for x in ball:
        bucket_counts[decade_bucket(x)] += 1
    return Features(
        sum6=total,
        odd=odd_count,
        low=low_count,
        max_run=run_len,
        uniq_last_digit=distinct_last,
        decade_sig=tuple(bucket_counts),  # type: ignore[arg-type]
    )
|
|
|
|
|
|
class SmoothedDist:
    """
    Discrete distribution with Laplace smoothing:
        P(v) = (count(v) + alpha) / (N + alpha*|V|)
    where V is the observed support.
    """

    def __init__(self, counts: Counter, alpha: float = 1.0):
        self.counts = counts
        self.alpha = float(alpha)
        # total observations; support size floored at 1 so the
        # denominator can never be zero for an empty counter
        self.n = sum(counts.values())
        self.k = max(1, len(counts))

    def logp(self, v) -> float:
        """Log-probability of value *v* under the smoothed distribution."""
        numer = self.counts.get(v, 0) + self.alpha
        denom = self.n + self.alpha * self.k
        return math.log(numer / denom)
|
|
|
|
|
|
def build_feature_dists(history: Sequence[Ball]) -> Dict[str, SmoothedDist]:
    """Build one Laplace-smoothed distribution per feature from the
    historical winning draws."""
    feats = [features_of(b) for b in history]
    feature_names = ("sum6", "odd", "low", "max_run", "uniq_last_digit", "decade_sig")
    return {
        name: SmoothedDist(Counter(getattr(f, name) for f in feats), alpha=1.0)
        for name in feature_names
    }
|
|
|
|
|
|
def overlap(a: Ball, b: Ball) -> int:
    """Number of shared numbers between two combinations."""
    return len(set(a).intersection(b))
|
|
|
|
|
|
def recent_overlap_penalty(
    ball: Ball,
    recent: Sequence[Ball],
    penalty_3: float = 1.0,
    penalty_4plus: float = 6.0,
) -> float:
    """
    Penalize candidates that look too similar to very recent winning draws.
    This does NOT mean such candidates can't win; it's just a diversification
    heuristic.

    Returns *penalty_4plus* if the ball shares >=4 numbers with any recent
    draw, *penalty_3* if the maximum sharing is exactly 3, otherwise 0.0.
    The keyword defaults preserve the previously hard-coded weights
    (6.0 and 1.0), so existing callers see identical behavior.
    """
    mx = 0
    for rb in recent:
        mx = max(mx, overlap(ball, rb))
        if mx >= 4:
            # strong-penalty threshold reached; no need to scan further
            break
    if mx >= 4:
        return penalty_4plus
    if mx == 3:
        return penalty_3
    return 0.0
|
|
|
|
|
|
@dataclass(frozen=True)
class Tuning:
    """Knobs controlling candidate sampling, scoring penalties and
    diversification of the final picks."""

    # sampling / search
    pool: int  # number of random candidate combinations to sample
    top_k: int  # keep only the top-K scored candidates before selection
    # diversification
    recent_window: int  # how many most-recent draws to compare against
    max_pair_overlap: int  # max shared numbers allowed between two chosen picks
    # penalty weights
    recent_penalty_3: float  # penalty when a candidate shares 3 numbers with a recent draw
    recent_penalty_4plus: float  # penalty when it shares >=4 numbers with a recent draw
    max_run_penalty: float  # penalty for a consecutive run of >=4 numbers
    decade_concentration_penalty: float  # penalty when >=5 numbers fall in one decade bucket
|
|
|
|
|
|
# Named tuning presets selectable via --profile; individual knobs can still
# be overridden on the command line.
PRESETS: Dict[str, Tuning] = {
    # balanced: default (used so far) - distribution fit + moderate diversity
    "balanced": Tuning(
        pool=250_000,
        top_k=5_000,
        recent_window=52,
        max_pair_overlap=2,
        recent_penalty_3=1.0,
        recent_penalty_4plus=6.0,
        max_run_penalty=1.5,
        decade_concentration_penalty=2.0,
    ),
    # aggressive: fit candidates more tightly to the historical distribution,
    # avoid recent-draw similarity harder, and be stricter about mutual overlap
    "aggressive": Tuning(
        pool=500_000,
        top_k=7_500,
        recent_window=80,
        max_pair_overlap=1,
        recent_penalty_3=2.0,
        recent_penalty_4plus=10.0,
        max_run_penalty=2.5,
        decade_concentration_penalty=3.0,
    ),
    # conservative: wider (less harsh) candidate pool + relaxed diversity constraints
    "conservative": Tuning(
        pool=150_000,
        top_k=5_000,
        recent_window=26,
        max_pair_overlap=3,
        recent_penalty_3=0.3,
        recent_penalty_4plus=2.0,
        max_run_penalty=0.8,
        decade_concentration_penalty=1.0,
    ),
}
|
|
|
|
|
|
def max_recent_overlap(ball: Ball, recent: Sequence[Ball]) -> int:
    """Largest number of shared numbers between *ball* and any draw in *recent*."""
    best = 0
    for draw in recent:
        shared = overlap(ball, draw)
        if shared > best:
            best = shared
            if best >= 6:
                # full match -- cannot get any higher, stop scanning
                break
    return best
|
|
|
|
|
|
def score_ball(
    ball: Ball,
    dists: Dict[str, SmoothedDist],
    history_set: Set[Ball],
    recent: Sequence[Ball],
    tuning: Tuning,
) -> float:
    """Score a candidate combination: historical-distribution log-likelihood
    minus soft penalties for odd-looking shapes and recent-draw similarity.

    Returns -inf for combinations that already won (hard reject).
    """
    # hard reject: already won in history
    if ball in history_set:
        return float("-inf")

    f = features_of(ball)

    # distribution-fit score (higher is better): sum of per-feature log-probs
    score = (
        dists["sum6"].logp(f.sum6)
        + dists["odd"].logp(f.odd)
        + dists["low"].logp(f.low)
        + dists["max_run"].logp(f.max_run)
        + dists["uniq_last_digit"].logp(f.uniq_last_digit)
        + dists["decade_sig"].logp(f.decade_sig)
    )

    # mild, human-sensible soft constraints
    if f.max_run >= 4:
        # avoid very long consecutive runs
        score -= tuning.max_run_penalty
    if max(f.decade_sig) >= 5:
        # avoid extreme concentration in one decade bucket
        score -= tuning.decade_concentration_penalty

    # diversify away from recent draws (soft)
    worst = max_recent_overlap(ball, recent)
    if worst >= 4:
        score -= tuning.recent_penalty_4plus
    elif worst == 3:
        score -= tuning.recent_penalty_3

    return score
|
|
|
|
|
|
def select_diverse(
    candidates: Sequence[Ball],
    scores: Dict[Ball, float],
    count: int,
    max_pair_overlap: int,
) -> List[Ball]:
    """
    Greedy selection: walk *candidates* (assumed sorted by descending score)
    and keep each one that shares at most *max_pair_overlap* numbers with
    every previously chosen pick, stopping once *count* picks are made.

    NOTE(review): *scores* is currently unused -- ordering is taken from the
    candidate sequence itself; the parameter is kept for interface
    compatibility.
    """
    chosen: List[Ball] = []
    for cand in candidates:
        if len(chosen) >= count:
            break
        if all(overlap(cand, prev) <= max_pair_overlap for prev in chosen):
            chosen.append(cand)
    return chosen
|
|
|
|
|
|
def generate_fixed10(
    history: Sequence[Ball],
    count: int = 10,
    seed: int = 42,
    pool: int = 250_000,
    top_k: int = 5_000,
    recent_window: int = 52,
    max_pair_overlap: int = 2,
    recent_penalty_3: float = 1.0,
    recent_penalty_4plus: float = 6.0,
    max_run_penalty: float = 1.5,
    decade_concentration_penalty: float = 2.0,
) -> List[Ball]:
    """Generate *count* lotto picks: sample a seeded random pool, score each
    candidate against the historical feature distributions, then greedily
    select a diverse subset.

    The RNG is seeded, so results are reproducible for fixed seed/pool.
    Combinations that already appear in *history* are never returned
    (score_ball hard-rejects them with -inf).  May return fewer than
    *count* picks if the overlap constraint cannot be satisfied even
    after progressive relaxation.
    """
    rng = random.Random(seed)
    history_set = set(history)
    dists = build_feature_dists(history)
    # most recent draws, used for the similarity penalty in score_ball
    recent = list(history[-recent_window:]) if len(history) >= recent_window else list(history)

    # bundle all knobs so score_ball sees one object
    tuning = Tuning(
        pool=pool,
        top_k=top_k,
        recent_window=recent_window,
        max_pair_overlap=max_pair_overlap,
        recent_penalty_3=recent_penalty_3,
        recent_penalty_4plus=recent_penalty_4plus,
        max_run_penalty=max_run_penalty,
        decade_concentration_penalty=decade_concentration_penalty,
    )

    scored: List[Tuple[float, Ball]] = []
    seen: Set[Ball] = set()

    # sample pool of unique random combinations and score each one
    for _ in range(pool):
        ball = tuple(sorted(rng.sample(range(1, 46), 6)))  # type: ignore[assignment]
        if ball in seen:
            continue
        seen.add(ball)
        sc = score_ball(ball, dists, history_set, recent, tuning)
        if sc == float("-inf"):
            # hard-rejected: this combination already won in the past
            continue
        scored.append((sc, ball))

    # best-scored candidates first
    scored.sort(key=lambda x: x[0], reverse=True)
    top = [b for _, b in scored[: top_k]]
    scores_map = {b: sc for sc, b in scored[: top_k]}

    chosen = select_diverse(top, scores_map, count=count, max_pair_overlap=max_pair_overlap)

    # If we couldn't pick enough due to overlap constraints, relax progressively.
    if len(chosen) < count:
        for relax in [3, 4, 5]:
            chosen = select_diverse(top, scores_map, count=count, max_pair_overlap=relax)
            if len(chosen) >= count:
                chosen = chosen[:count]
                break

    return chosen
|
|
|
|
|
|
def summarize(picks: Sequence[Ball], recent: Sequence[Ball]) -> Dict[str, object]:
    """Report overlap statistics: pairwise among the picks, and of each pick
    against the recent winning draws."""
    # pairwise overlap histogram among the chosen picks
    pair_hist = Counter()
    for i, a in enumerate(picks):
        for b in picks[i + 1:]:
            pair_hist[overlap(a, b)] += 1
    mx_pair = max(pair_hist, default=0)

    # overlap of each pick with the most recent draws
    recent_hist = Counter(max_recent_overlap(b, recent) for b in picks)
    mx_recent = max(recent_hist, default=0)

    return {
        "max_pair_overlap": mx_pair,
        "pair_overlap_hist": dict(sorted(pair_hist.items())),
        "max_recent_overlap": mx_recent,
        "recent_overlap_hist": dict(sorted(recent_hist.items())),
    }
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments, resolve the tuning preset (with
    per-knob CLI overrides), generate the picks and print them, followed by
    an optional overlap summary."""
    p = argparse.ArgumentParser()
    p.add_argument("--history", default=os.path.join("resources", "lotto_history.txt"))
    p.add_argument("--count", type=int, default=10)
    p.add_argument("--seed", type=int, default=42)
    p.add_argument(
        "--profile",
        choices=sorted(PRESETS.keys()),
        default="balanced",
        help="Tuning preset. You can still override any individual knob below.",
    )
    # every knob below defaults to None, meaning "take it from the preset"
    p.add_argument("--pool", type=int, default=None, help="Number of random candidates to sample.")
    p.add_argument("--top-k", type=int, default=None, help="Keep top-K scored candidates before diversification.")
    p.add_argument("--recent-window", type=int, default=None, help="Recent draw window size for overlap penalty.")
    p.add_argument("--max-pair-overlap", type=int, default=None, help="Max allowed overlap between chosen picks (greedy).")
    p.add_argument("--recent-penalty-3", type=float, default=None, help="Penalty if overlaps 3 with any recent draw.")
    p.add_argument("--recent-penalty-4plus", type=float, default=None, help="Penalty if overlaps >=4 with any recent draw.")
    p.add_argument("--max-run-penalty", type=float, default=None, help="Penalty if max consecutive run >=4.")
    p.add_argument("--decade-concentration-penalty", type=float, default=None, help="Penalty if >=5 numbers in a decade bucket.")
    p.add_argument("--no-report", action="store_true", help="Do not print overlap summary.")
    args = p.parse_args()

    history = parse_history_txt(args.history)
    if not history:
        raise SystemExit(f"History is empty or not readable: {args.history}")

    # preset supplies defaults; any explicitly-passed CLI knob overrides it
    preset = PRESETS[args.profile]
    pool = int(args.pool) if args.pool is not None else preset.pool
    top_k = int(args.top_k) if args.top_k is not None else preset.top_k
    recent_window = int(args.recent_window) if args.recent_window is not None else preset.recent_window
    max_pair_overlap = int(args.max_pair_overlap) if args.max_pair_overlap is not None else preset.max_pair_overlap
    recent_penalty_3 = float(args.recent_penalty_3) if args.recent_penalty_3 is not None else preset.recent_penalty_3
    recent_penalty_4plus = float(args.recent_penalty_4plus) if args.recent_penalty_4plus is not None else preset.recent_penalty_4plus
    max_run_penalty = float(args.max_run_penalty) if args.max_run_penalty is not None else preset.max_run_penalty
    decade_concentration_penalty = float(args.decade_concentration_penalty) if args.decade_concentration_penalty is not None else preset.decade_concentration_penalty

    picks = generate_fixed10(
        history=history,
        count=args.count,
        seed=args.seed,
        pool=pool,
        top_k=top_k,
        recent_window=recent_window,
        max_pair_overlap=max_pair_overlap,
        recent_penalty_3=recent_penalty_3,
        recent_penalty_4plus=recent_penalty_4plus,
        max_run_penalty=max_run_penalty,
        decade_concentration_penalty=decade_concentration_penalty,
    )

    print(f"history draws: {len(history)}")
    print(
        "fixed picks "
        f"(profile={args.profile}, count={len(picks)}, seed={args.seed}, "
        f"pool={pool}, top_k={top_k}, recent_window={recent_window}, max_pair_overlap={max_pair_overlap}):"
    )
    for i, b in enumerate(picks, start=1):
        print(f"{i:2d}. {list(b)}")

    if not args.no_report:
        # same "recent window" definition as inside generate_fixed10
        recent = list(history[-recent_window:]) if len(history) >= recent_window else list(history)
        rep = summarize(picks, recent)
        print("\nsummary:")
        print(f"- max_pair_overlap: {rep['max_pair_overlap']}")
        print(f"- pair_overlap_hist: {rep['pair_overlap_hist']}")
        print(f"- max_recent_overlap: {rep['max_recent_overlap']}")
        print(f"- recent_overlap_hist: {rep['recent_overlap_hist']}")
|
|
|
|
|
|
if __name__ == "__main__":
    # script entry point
    main()
|
|
|