Files
DeepLottery/practice_0.py
dsyoon c611b400ae init
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-25 18:32:11 +09:00

1082 lines
40 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
로또 6/45: 1~N회차(기본 1208) 과거 1등(6개 번호) 기록을 기반으로,
다음 회차(기본 1209)에 대해 “점수 기반 확률(추정)”이 높은 순서대로 상위 K(기본 20) 단일 번호를 추천한다.
⚠️ 중요 고지(프로그램 출력에도 포함):
- 로또는 원칙적으로 독립·무작위 추첨이므로 과거 데이터로 실제 당첨확률을 증가시킨다고 보장할 수 없다.
- 본 프로그램의 “확률”은 통계적 추정/휴리스틱 점수이며, 실험/학습 목적이다.
요구사항 핵심(요약):
- 입력: resources/lotto_history.txt / resources/lotto_history.json (둘 중 하나만 있어도 동작, 둘 다 있으면 교차검증 후 통합)
- 회차별 6개 번호(1~45, 중복 없음)만 확실히 추출하면 됨(보너스 번호는 무시)
- 피처 A: 베타-이항 posterior_mean
- 피처 B: 최근성 가중 출현율(EWMA; 지수감쇠)
- 피처 C: 마지막 출현 이후 갭 기반 약한 페널티(“안 나온지 오래”를 유리하게 가정하지 않도록 중립 수렴)
- 피처 D: 스케일링(z-score) 후 가중합 점수
- 롤링 백테스트 그리드서치로 (wA,wB,wC), half_life, gscale 튜닝
- 출력: 표 + JSON
설계/구현 계획(의사코드):
1) 데이터 로드
- txt/json 각각 로드 시도(없으면 None)
- txt: 정규식으로 정수 추출 → (회차, 번호6) 구성(보너스 등 추가 숫자는 무시/방어)
- json: 가능한 경우 (drwNo, drwtNo1..6) 우선, 아니면 numbers 배열, 그래도 아니면 방어적으로 숫자 추출
2) 정규화/정합성
- normalize_draw(numbers): 1..45 범위만, 중복 제거, 6개면 정렬 후 채택
- 회차 중복/누락/이상치 로그
- 둘 다 있으면 동일 회차 교차검증(번호 불일치 개수 경고), 충돌 시 우선순위(더 많이/더 신뢰되는 쪽)로 통합
3) 행렬화
- 회차 오름차순 정렬된 draws를 만들고, X[t,i]=번호(i+1)가 t회차에 포함되면 1 (t=0..N-1)
4) 피처 계산(훈련 길이 t에 대해)
- A: posterior_mean = (count + alpha) / (t + alpha + beta)
- B: EWMA:
r = exp(-ln2/half_life)
numer_t = sum_{k=0..t-1} r^k * x_{t-1-k}
denom_t = sum_{k=0..t-1} r^k
ewma_freq = numer_t/denom_t
- C: gap_score:
gap = current_draw_no - last_seen_draw_no (없으면 큰 값)
gap_score = -exp(-gap/gscale) # gap 작을수록(최근 출현) 페널티 강함, gap 커질수록 0으로 중립 수렴
- D: z-score로 A,B,C 각각 스케일링 후 score = wA*zA + wB*zB + wC*zC
5) 백테스트 & 튜닝(롤링)
- warmup..(N-1)까지: train[0:t)로 score 산출 → topK 번호 추천 → 실제 draw[t]와 교집합(hit) 계산
- 그리드:
weights: step 0.1, 합=1
half_life: [50,100,150,200,300,400] (+사용자 입력 포함)
gscale: [10,20,30,50] (+사용자 입력 포함)
- 지표:
overall mean hit@K, overall P(hit>=1)
recent window(마지막 valid_last 예측) mean hit@K, P(hit>=1)
objective = 0.7*recent_mean_hit + 0.3*overall_mean_hit (타이브레이크로 hit>=1)
6) 최종 추천
- best params로 전체(1..N) 학습 후 next_draw=N+1 추천 topK 출력(표+JSON)
"""
from __future__ import annotations
import argparse
import json
import math
import os
import re
import sys
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple
try:
import numpy as np # type: ignore
except Exception:
np = None # type: ignore
NUM_MIN = 1
NUM_MAX = 45
NUM_COUNT = 45
def eprint(*args: Any, **kwargs: Any) -> None:
print(*args, file=sys.stderr, **kwargs)
def normalize_draw(numbers: Sequence[int]) -> Optional[Tuple[int, ...]]:
"""번호 리스트에서 1..45 범위의 중복 없는 6개를 추출/정규화(정렬)한다."""
clean = [int(x) for x in numbers if NUM_MIN <= int(x) <= NUM_MAX]
uniq = sorted(set(clean))
if len(uniq) != 6:
return None
return tuple(uniq)
def _extract_ints(s: str) -> List[int]:
return [int(x) for x in re.findall(r"\d+", s)]
def load_history_txt(path: str) -> Dict[int, Tuple[int, ...]]:
"""
txt는 포맷이 확정되지 않았으므로 방어적으로 파싱한다.
- 기본: 라인별로 정수 추출 후, 첫 번째 정수를 회차로 보고 이후 숫자 중 6개를 번호로 구성(보너스 등 추가는 무시)
- 실패 시: 전체 정수 스트림을 블록(8,7) 단위로 재시도
"""
if not os.path.exists(path):
raise FileNotFoundError(path)
by_round: Dict[int, Tuple[int, ...]] = {}
bad_lines = 0
with open(path, "r", encoding="utf-8", errors="ignore") as f:
lines = f.readlines()
for line in lines:
ints = _extract_ints(line)
if len(ints) < 7:
continue
rnd = ints[0]
rest = ints[1:]
# 흔한 케이스: 회차 + 6개 + 보너스(7개)
candidates = []
if len(rest) >= 6:
candidates.append(rest[:6])
candidates.append(rest[-6:])
candidates.append(list(dict.fromkeys([x for x in rest if NUM_MIN <= x <= NUM_MAX]))[:6])
draw = None
for cand in candidates:
draw = normalize_draw(cand)
if draw is not None:
break
if draw is None:
bad_lines += 1
continue
if rnd in by_round and by_round[rnd] != draw:
eprint(f"[WARN][txt] 회차 {rnd} 중복/불일치: {by_round[rnd]} vs {draw} (첫 값 유지)")
continue
by_round[rnd] = draw
# 라인 기반 파싱이 거의 실패하면 전체 스트림 기반으로 복구 시도
if len(by_round) < max(50, len(lines) // 10):
ints_all: List[int] = []
for line in lines:
ints_all.extend(_extract_ints(line))
def try_block(block_size: int) -> Dict[int, Tuple[int, ...]]:
out: Dict[int, Tuple[int, ...]] = {}
if block_size <= 0 or len(ints_all) < block_size:
return out
if len(ints_all) % block_size != 0:
# 나머지가 있어도 앞부분만 시도
limit = len(ints_all) - (len(ints_all) % block_size)
else:
limit = len(ints_all)
ok = 0
for i in range(0, limit, block_size):
block = ints_all[i : i + block_size]
if len(block) < 7:
continue
rnd = block[0]
rest = block[1:]
if len(rest) < 6:
continue
draw = normalize_draw(rest[:6])
if draw is None:
continue
out[rnd] = draw
ok += 1
return out
recovered8 = try_block(8) # (회차 + 6 + 보너스) 가능성
recovered7 = try_block(7) # (회차 + 6) 가능성
recovered = recovered8 if len(recovered8) >= len(recovered7) else recovered7
if len(recovered) > len(by_round):
eprint(f"[INFO][txt] 라인 파싱이 약함 → 스트림 기반 복구 사용: {len(recovered)}회차")
by_round = recovered
if bad_lines:
eprint(f"[WARN][txt] 파싱 실패 라인 수: {bad_lines}")
return by_round
def _coerce_json_obj_to_round_and_numbers(obj: Any) -> Optional[Tuple[int, List[int]]]:
"""JSON 오브젝트에서 (회차, 번호후보들)을 최대한 방어적으로 추출한다."""
if isinstance(obj, dict):
# 1) 대표 구조: drwNo + drwtNo1..6
if "drwNo" in obj:
try:
rnd = int(obj["drwNo"])
except Exception:
rnd = None # type: ignore
nums: List[int] = []
for k in ("drwtNo1", "drwtNo2", "drwtNo3", "drwtNo4", "drwtNo5", "drwtNo6"):
if k in obj:
try:
nums.append(int(obj[k]))
except Exception:
pass
if rnd is not None and len(nums) >= 6:
return rnd, nums[:6]
# 2) 다른 흔한 구조: (round, numbers)
for rk in ("round", "draw", "drawNo", "no", "id"):
if rk in obj:
try:
rnd = int(obj[rk])
except Exception:
continue
nums = []
if "numbers" in obj and isinstance(obj["numbers"], (list, tuple)):
for x in obj["numbers"]:
try:
nums.append(int(x))
except Exception:
pass
# dict 내부에 숫자들이 흩어져 있어도 방어적으로 수집
if len(nums) < 6:
nums = []
for v in obj.values():
if isinstance(v, (int, float)) and NUM_MIN <= int(v) <= NUM_MAX:
nums.append(int(v))
if len(nums) >= 6:
return rnd, nums[:6]
# 3) 최후: dict를 문자열로 만들고 숫자 추출(회차/번호 분리 어려움 → 실패 처리)
return None
if isinstance(obj, (list, tuple)):
# list 자체가 [회차, n1..] 형태일 수도
ints = []
for x in obj:
if isinstance(x, (int, float)):
ints.append(int(x))
elif isinstance(x, str):
ints.extend(_extract_ints(x))
if len(ints) >= 7:
return ints[0], ints[1:7]
return None
def load_history_json(path: str) -> Dict[int, Tuple[int, ...]]:
"""
json은 다음을 우선 시도:
- 라인별 JSON(dict) 파싱 (현재 리소스는 라인-델리미티드 dict 형태)
- 전체 파일 JSON 파싱 (array/object)도 방어적으로 처리
"""
if not os.path.exists(path):
raise FileNotFoundError(path)
by_round: Dict[int, Tuple[int, ...]] = {}
bad = 0
def ingest_obj(obj: Any) -> None:
nonlocal bad, by_round
got = _coerce_json_obj_to_round_and_numbers(obj)
if not got:
bad += 1
return
rnd, nums = got
draw = normalize_draw(nums)
if draw is None:
bad += 1
return
if rnd in by_round and by_round[rnd] != draw:
eprint(f"[WARN][json] 회차 {rnd} 중복/불일치: {by_round[rnd]} vs {draw} (첫 값 유지)")
return
by_round[rnd] = draw
with open(path, "r", encoding="utf-8", errors="ignore") as f:
text = f.read()
# 1) 라인별 JSON 우선 (여기선 거의 확실)
lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
line_success = 0
for ln in lines:
# 단일 JSON object 라인일 가능성
if not (ln.startswith("{") and ln.endswith("}")):
continue
try:
obj = json.loads(ln)
ingest_obj(obj)
line_success += 1
except Exception:
continue
# 2) 라인 방식이 거의 실패하면 전체 JSON 파싱
if line_success < max(50, len(lines) // 10):
try:
obj = json.loads(text)
if isinstance(obj, list):
for item in obj:
ingest_obj(item)
elif isinstance(obj, dict):
# dict 안에 list가 들어있을 수도
if "data" in obj and isinstance(obj["data"], list):
for item in obj["data"]:
ingest_obj(item)
else:
# 값들 중 dict/list를 훑기
for v in obj.values():
if isinstance(v, (list, dict)):
ingest_obj(v)
except Exception:
pass
if bad:
eprint(f"[WARN][json] 파싱 실패/무시 레코드 수(추정): {bad}")
return by_round
def cross_validate_and_merge(
txt_data: Optional[Dict[int, Tuple[int, ...]]],
json_data: Optional[Dict[int, Tuple[int, ...]]],
) -> Dict[int, Tuple[int, ...]]:
"""둘 다 있을 경우 교차검증 후 통합. 충돌 시 더 많은 레코드를 가진 쪽을 우선."""
if not txt_data and not json_data:
return {}
if txt_data and not json_data:
return dict(txt_data)
if json_data and not txt_data:
return dict(json_data)
assert txt_data is not None and json_data is not None
common = sorted(set(txt_data) & set(json_data))
mismatch = 0
for rnd in common:
if txt_data[rnd] != json_data[rnd]:
mismatch += 1
if mismatch <= 20:
eprint(f"[WARN] 교차검증 불일치 회차 {rnd}: txt={txt_data[rnd]} json={json_data[rnd]}")
if mismatch:
eprint(f"[WARN] 교차검증: 공통 {len(common)}개 중 불일치 {mismatch}")
else:
eprint(f"[INFO] 교차검증: 공통 {len(common)}개 모두 일치")
base = json_data if len(json_data) >= len(txt_data) else txt_data
other = txt_data if base is json_data else json_data
merged: Dict[int, Tuple[int, ...]] = dict(base)
added = 0
conflict = 0
for rnd, draw in other.items():
if rnd not in merged:
merged[rnd] = draw
added += 1
else:
if merged[rnd] != draw:
conflict += 1
# 충돌은 base 유지
if added:
eprint(f"[INFO] 통합: 누락 회차 추가 {added}")
if conflict:
eprint(f"[WARN] 통합: 충돌 {conflict}개(우선 데이터 유지)")
return merged
def sanitize_and_sort_draws(
merged: Dict[int, Tuple[int, ...]],
min_round: int = 1,
max_round: Optional[int] = None,
) -> List[Tuple[int, Tuple[int, ...]]]:
"""회차 정렬 + 범위/이상치 제거."""
items = []
dropped = 0
for rnd, draw in merged.items():
if rnd < min_round:
dropped += 1
continue
if max_round is not None and rnd > max_round:
dropped += 1
continue
if draw is None or len(draw) != 6:
dropped += 1
continue
if any((x < NUM_MIN or x > NUM_MAX) for x in draw) or len(set(draw)) != 6:
dropped += 1
continue
items.append((int(rnd), tuple(sorted(draw))))
items.sort(key=lambda x: x[0])
if dropped:
eprint(f"[WARN] 이상치/범위 밖/형식 불량 제거: {dropped}")
return items
def build_indicator_matrix(draws: List[Tuple[int, Tuple[int, ...]]]) -> Tuple["np.ndarray", "np.ndarray"]:
"""draws -> (draw_nos, X) where X[t,i] in {0,1} for number i+1."""
if np is None:
raise RuntimeError("numpy가 필요합니다(성능/구현 단순화를 위해). requirements.txt를 확인하세요.")
draw_nos = np.array([rnd for rnd, _ in draws], dtype=np.int32)
X = np.zeros((len(draws), NUM_COUNT), dtype=np.int8)
for t, (_, nums) in enumerate(draws):
for n in nums:
X[t, n - 1] = 1
return draw_nos, X
def zscore(x: "np.ndarray", eps: float = 1e-12) -> "np.ndarray":
mu = float(np.mean(x))
sd = float(np.std(x))
if sd < eps:
return x * 0.0
return (x - mu) / sd
def zscore_rowwise(M: "np.ndarray", eps: float = 1e-12) -> "np.ndarray":
"""행(row) 단위 z-score. M: (T,45) -> (T,45)"""
mu = np.mean(M, axis=1, keepdims=True)
sd = np.std(M, axis=1, keepdims=True)
out = (M - mu).astype(np.float64, copy=False)
# np.where는 양쪽 branch를 평가해서 RuntimeWarning이 날 수 있으므로 np.divide(where=...) 사용
z = np.zeros_like(out, dtype=np.float64)
np.divide(out, sd, out=z, where=sd >= eps)
return z
@dataclass(frozen=True)
class ScoringParams:
alpha: float = 1.0
beta: float = 1.0
half_life: int = 200
gscale: float = 20.0
wA: float = 0.5
wB: float = 0.4
wC: float = 0.1
def normalized_weights(self) -> "ScoringParams":
s = self.wA + self.wB + self.wC
if s <= 0:
return self
return ScoringParams(
alpha=self.alpha,
beta=self.beta,
half_life=self.half_life,
gscale=self.gscale,
wA=self.wA / s,
wB=self.wB / s,
wC=self.wC / s,
)
def precompute_last_seen(X: "np.ndarray") -> "np.ndarray":
"""last_seen_end[t,i] = t개 훈련(0..t-1 포함) 이후, 번호 i의 마지막 등장 index (없으면 -1). shape (N+1,45)."""
N = X.shape[0]
last_seen_end = np.full((N + 1, NUM_COUNT), -1, dtype=np.int32)
last = np.full((NUM_COUNT,), -1, dtype=np.int32)
last_seen_end[0] = last
for idx in range(N):
hits = np.nonzero(X[idx])[0]
if hits.size:
last[hits] = idx
# idx번째 draw까지 포함한 상태가 train_size=idx+1
last_seen_end[idx + 1] = last
return last_seen_end
def precompute_ewma_numer_denoms(X: "np.ndarray", half_life: int) -> Tuple["np.ndarray", "np.ndarray"]:
"""
EWMA numerator/denominator를 훈련 길이별로 미리 계산.
- numer_end[t,i] = t개 훈련(0..t-1)의 weighted sum
- denom_end[t] = sum_{k=0..t-1} r^k
"""
N = X.shape[0]
hl = max(1, int(half_life))
lam = math.log(2.0) / float(hl)
r = math.exp(-lam) # per-step decay
numer_end = np.zeros((N + 1, NUM_COUNT), dtype=np.float64)
denom_end = np.zeros((N + 1,), dtype=np.float64)
for t in range(1, N + 1):
numer_end[t] = X[t - 1].astype(np.float64) + r * numer_end[t - 1]
denom_end[t] = 1.0 + r * denom_end[t - 1]
denom_end[0] = 0.0
return numer_end, denom_end
def compute_features_at_train_size(
*,
train_size: int,
draw_nos: "np.ndarray",
X: "np.ndarray",
cum_counts: "np.ndarray",
last_seen_end: "np.ndarray",
ewma_numer_end: "np.ndarray",
ewma_denom_end: "np.ndarray",
params: ScoringParams,
) -> Dict[str, "np.ndarray"]:
"""
train_size=t인 시점의 피처를 45개 번호에 대해 계산.
반환:
posterior_mean (A), ewma_freq (B), gap (raw), gap_score (C raw), zA,zB,zC, score
"""
t = int(train_size)
if t <= 0:
raise ValueError("train_size must be >= 1")
# counts over first t draws
counts = cum_counts[t - 1].astype(np.float64)
alpha = float(params.alpha)
beta = float(params.beta)
posterior_mean = (counts + alpha) / (float(t) + alpha + beta)
denom = float(ewma_denom_end[t])
if denom <= 0:
ewma_freq = np.zeros((NUM_COUNT,), dtype=np.float64)
else:
ewma_freq = ewma_numer_end[t] / denom
# gap based on draw numbers (회차) to match "마지막 출현 이후 회차 수"
current_draw_no = int(draw_nos[t - 1])
last_idx = last_seen_end[t].astype(np.int32)
gap = np.zeros((NUM_COUNT,), dtype=np.float64)
seen_mask = last_idx >= 0
if np.any(seen_mask):
last_draw = draw_nos[last_idx.clip(min=0)]
gap[seen_mask] = (current_draw_no - last_draw[seen_mask]).astype(np.float64)
# never seen -> large gap (중립 수렴용)
gap[~seen_mask] = float(current_draw_no + 999)
gscale = float(params.gscale)
if gscale <= 0:
gscale = 20.0
# 최근(갭 짧음)일수록 -1에 가깝게, 오래되면 0으로 수렴 (U자 가정 금지)
gap_score = -np.exp(-gap / gscale)
zA = zscore(posterior_mean)
zB = zscore(ewma_freq)
zC = zscore(gap_score)
w = params.normalized_weights()
score = w.wA * zA + w.wB * zB + w.wC * zC
return {
"posterior_mean": posterior_mean,
"ewma_freq": ewma_freq,
"gap": gap,
"gap_score": gap_score,
"zA": zA,
"zB": zB,
"zC": zC,
"score": score,
}
@dataclass
class BacktestResult:
params: ScoringParams
topk: int
warmup: int
n_eval: int
overall_mean_hit: float
overall_hit1_rate: float
recent_mean_hit: float
recent_hit1_rate: float
objective: float
def _iter_weight_grid(step: float = 0.1) -> Iterable[Tuple[float, float, float]]:
# wA, wB in [0..1], wC=1-wA-wB, step grid
s = float(step)
if s <= 0:
s = 0.1
# to avoid float drift, operate on integer ticks
ticks = int(round(1.0 / s))
for a in range(ticks + 1):
for b in range(ticks + 1 - a):
c = ticks - a - b
yield a / ticks, b / ticks, c / ticks
def backtest_and_tune(
draws: List[Tuple[int, Tuple[int, ...]]],
*,
topk: int = 20,
warmup: int = 300,
valid_last: int = 200,
alpha: float = 1.0,
beta: float = 1.0,
half_life_grid: Sequence[int] = (50, 100, 150, 200, 300, 400),
gscale_grid: Sequence[float] = (10.0, 20.0, 30.0, 50.0),
weight_step: float = 0.1,
) -> BacktestResult:
if np is None:
raise RuntimeError("numpy가 필요합니다.")
if len(draws) < warmup + 2:
raise ValueError(f"데이터가 너무 적습니다: {len(draws)}회차 (warmup={warmup})")
draw_nos, X = build_indicator_matrix(draws)
N = int(X.shape[0])
topk = max(1, min(int(topk), NUM_COUNT))
# 누적 카운트 (N,45)
cum_counts = np.cumsum(X, axis=0, dtype=np.int32)
# counts_end[t] = 0..t-1까지의 카운트 (shape N+1,45)
counts_end = np.zeros((N + 1, NUM_COUNT), dtype=np.int32)
counts_end[1:] = cum_counts
last_seen_end = precompute_last_seen(X)
# gap_end[t,i] = train_size=t 시점의 갭(회차 기준). (shape N+1,45)
gap_end = np.zeros((N + 1, NUM_COUNT), dtype=np.float64)
gap_end[0] = 0.0
for t in range(1, N + 1):
current_draw_no = int(draw_nos[t - 1])
last_idx = last_seen_end[t]
seen = last_idx >= 0
gap = np.empty((NUM_COUNT,), dtype=np.float64)
if np.any(seen):
gap[seen] = (current_draw_no - draw_nos[last_idx[seen]]).astype(np.float64)
gap[~seen] = float(current_draw_no + 999)
gap_end[t] = gap
# 평가 구간: train_size=t로 target index=t 예측
start_t = max(1, int(warmup))
end_t = N - 1
n_eval = end_t - start_t + 1
if n_eval <= 0:
raise ValueError("평가 구간이 비었습니다. warmup을 줄이세요.")
recent_len = max(1, min(int(valid_last), n_eval))
recent_start_t = end_t - recent_len + 1
# 실제 정답 마스크
actual_mask = X.astype(np.int8)
# 그리드 정리
hl_list = sorted(set(int(x) for x in half_life_grid if int(x) > 0))
gs_list = sorted(set(float(x) for x in gscale_grid if float(x) > 0))
weight_list = list(_iter_weight_grid(step=weight_step))
W = np.array(weight_list, dtype=np.float64).T # (3,M)
M = int(W.shape[1])
eprint(
f"[INFO] 튜닝 시작(가속): weights={M} half_life={len(hl_list)} gscale={len(gs_list)} "
f"→ 총 {M*len(hl_list)*len(gs_list)} 조합, eval={n_eval} (recent={recent_len})"
)
# A는 half_life/gscale과 무관 (alpha,beta 고정)
t_vec = np.arange(N + 1, dtype=np.float64).reshape(-1, 1) # (N+1,1)
A_end = (counts_end.astype(np.float64) + float(alpha)) / (t_vec + float(alpha) + float(beta))
zA_end = zscore_rowwise(A_end)
best: Optional[BacktestResult] = None
nums = np.arange(1, NUM_COUNT + 1, dtype=np.int32) # tie-break key
for hl in hl_list:
ewma_numer_end, ewma_denom_end = precompute_ewma_numer_denoms(X, hl)
denom = ewma_denom_end.reshape(-1, 1)
B_end = np.zeros_like(ewma_numer_end, dtype=np.float64)
np.divide(ewma_numer_end, denom, out=B_end, where=denom > 0)
zB_end = zscore_rowwise(B_end)
for gs in gs_list:
gap_score_end = -np.exp(-gap_end / float(gs))
zC_end = zscore_rowwise(gap_score_end)
# 누적 hit/hit1 (M개 weight에 대해 벡터)
total_hit = np.zeros((M,), dtype=np.int32)
total_hit1 = np.zeros((M,), dtype=np.int32)
recent_hit = np.zeros((M,), dtype=np.int32)
recent_hit1 = np.zeros((M,), dtype=np.int32)
for t in range(start_t, end_t + 1):
# z-features at train_size=t : (45,3)
Z = np.stack([zA_end[t], zB_end[t], zC_end[t]], axis=1) # (45,3)
scores = Z @ W # (45,M)
# topK indices per column (order not needed for hit 계산)
picks = np.argpartition(-scores, topk - 1, axis=0)[:topk, :] # (topk,M)
hits = actual_mask[t][picks].sum(axis=0).astype(np.int32) # (M,)
total_hit += hits
total_hit1 += (hits >= 1).astype(np.int32)
if t >= recent_start_t:
recent_hit += hits
recent_hit1 += (hits >= 1).astype(np.int32)
overall_mean_hit = total_hit.astype(np.float64) / float(n_eval)
overall_hit1_rate = total_hit1.astype(np.float64) / float(n_eval)
recent_mean_hit = recent_hit.astype(np.float64) / float(recent_len)
recent_hit1_rate = recent_hit1.astype(np.float64) / float(recent_len)
objective = 0.7 * recent_mean_hit + 0.3 * overall_mean_hit
# (hl,gs)에서 best weight 선택: objective, recent_hit1, overall_mean, overall_hit1
# numpy.lexsort는 마지막 키가 1순위(오름차순) → 음수로 내림차순 효과
order = np.lexsort(
(
-overall_hit1_rate,
-overall_mean_hit,
-recent_hit1_rate,
-objective,
)
)
best_w_idx = int(order[0])
wA, wB, wC = float(W[0, best_w_idx]), float(W[1, best_w_idx]), float(W[2, best_w_idx])
params = ScoringParams(alpha=alpha, beta=beta, half_life=hl, gscale=gs, wA=wA, wB=wB, wC=wC).normalized_weights()
cand = BacktestResult(
params=params,
topk=topk,
warmup=warmup,
n_eval=n_eval,
overall_mean_hit=float(overall_mean_hit[best_w_idx]),
overall_hit1_rate=float(overall_hit1_rate[best_w_idx]),
recent_mean_hit=float(recent_mean_hit[best_w_idx]),
recent_hit1_rate=float(recent_hit1_rate[best_w_idx]),
objective=float(objective[best_w_idx]),
)
if best is None:
best = cand
else:
if (cand.objective > best.objective + 1e-12) or (
abs(cand.objective - best.objective) <= 1e-12
and (
cand.recent_hit1_rate > best.recent_hit1_rate + 1e-12
or (
abs(cand.recent_hit1_rate - best.recent_hit1_rate) <= 1e-12
and (
cand.overall_mean_hit > best.overall_mean_hit + 1e-12
or (
abs(cand.overall_mean_hit - best.overall_mean_hit) <= 1e-12
and cand.overall_hit1_rate > best.overall_hit1_rate + 1e-12
)
)
)
)
):
best = cand
assert best is not None
return best
def _reason_string(zA: float, zB: float, gap: float, gap_score: float, params: ScoringParams) -> str:
parts: List[str] = []
if zB >= 0.7:
parts.append("최근성↑")
elif zB <= -0.7:
parts.append("최근성↓")
if zA >= 0.7:
parts.append("장기빈도↑")
elif zA <= -0.7:
parts.append("장기빈도↓")
# gap_score는 음수(페널티), 0에 가까울수록 중립
if gap_score <= -0.7:
parts.append("최근출현페널티↑")
elif gap_score >= -0.1:
parts.append("갭영향↓(중립)")
else:
parts.append("갭영향(약)")
# 숫자로 한 번 더
parts.append(f"gap={int(round(gap))}")
return ", ".join(parts)
def recommend(
draws: List[Tuple[int, Tuple[int, ...]]],
*,
best: BacktestResult,
target_draw_no: int,
topk: int,
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
if np is None:
raise RuntimeError("numpy가 필요합니다.")
draw_nos, X = build_indicator_matrix(draws)
N = int(X.shape[0])
cum_counts = np.cumsum(X, axis=0, dtype=np.int32)
last_seen_end = precompute_last_seen(X)
ewma_numer_end, ewma_denom_end = precompute_ewma_numer_denoms(X, best.params.half_life)
feats = compute_features_at_train_size(
train_size=N,
draw_nos=draw_nos,
X=X,
cum_counts=cum_counts,
last_seen_end=last_seen_end,
ewma_numer_end=ewma_numer_end,
ewma_denom_end=ewma_denom_end,
params=best.params,
)
score = feats["score"]
nums = np.arange(1, NUM_COUNT + 1, dtype=np.int32)
order = np.lexsort((nums, -score))
order = order[:topk]
recs: List[Dict[str, Any]] = []
for rank, idx in enumerate(order, start=1):
n = int(idx + 1)
rec = {
"rank": rank,
"number": n,
"score": float(score[idx]),
"posterior_mean": float(feats["posterior_mean"][idx]),
"ewma_freq": float(feats["ewma_freq"][idx]),
"gap": float(feats["gap"][idx]),
"reason": _reason_string(
float(feats["zA"][idx]),
float(feats["zB"][idx]),
float(feats["gap"][idx]),
float(feats["gap_score"][idx]),
best.params,
),
"feature_z": {
"A": float(feats["zA"][idx]),
"B": float(feats["zB"][idx]),
"C": float(feats["zC"][idx]),
},
"weights": {"wA": best.params.wA, "wB": best.params.wB, "wC": best.params.wC},
}
recs.append(rec)
out_json = {
"draw": int(target_draw_no),
"topk": int(topk),
"disclaimer": [
"로또는 원칙적으로 독립·무작위 추첨이므로 과거 데이터로 실제 당첨확률을 증가시킨다고 보장할 수 없습니다.",
"본 프로그램의 '확률'은 통계적 추정/휴리스틱 점수이며, 실험/학습 목적입니다.",
],
"best_params": {
"alpha": best.params.alpha,
"beta": best.params.beta,
"half_life": best.params.half_life,
"gscale": best.params.gscale,
"wA": best.params.wA,
"wB": best.params.wB,
"wC": best.params.wC,
},
"backtest": {
"warmup": best.warmup,
"n_eval": best.n_eval,
"overall_mean_hit": best.overall_mean_hit,
"overall_hit>=1_rate": best.overall_hit1_rate,
"recent_mean_hit": best.recent_mean_hit,
"recent_hit>=1_rate": best.recent_hit1_rate,
"objective": best.objective,
},
"recommendations": recs,
}
return recs, out_json
def print_table(recs: List[Dict[str, Any]]) -> None:
# 콘솔 표: rank, number, score, posterior_mean, ewma_freq, last_gap, reason
headers = ["rank", "number", "score", "posterior_mean", "ewma_freq", "gap", "reason"]
rows = []
for r in recs:
rows.append([
str(r["rank"]),
str(r["number"]),
f'{r["score"]:+.6f}',
f'{r["posterior_mean"]:.6f}',
f'{r["ewma_freq"]:.6f}',
f'{r["gap"]:.0f}',
str(r["reason"]),
])
widths = [len(h) for h in headers]
for row in rows:
for i, cell in enumerate(row):
widths[i] = max(widths[i], len(cell))
def fmt_row(row: Sequence[str]) -> str:
return " | ".join(row[i].ljust(widths[i]) for i in range(len(headers)))
sep = "-+-".join("-" * w for w in widths)
print(fmt_row(headers))
print(sep)
for row in rows:
print(fmt_row(row))
def main() -> None:
parser = argparse.ArgumentParser(description="로또 6/45 점수 기반(휴리스틱) 단일 번호 TopK 추천")
parser.add_argument("--topk", type=int, default=20, help="추천할 단일 번호 개수 (기본 20)")
parser.add_argument("--half_life", type=int, default=200, help="EWMA half-life (튜닝 그리드에 포함, 기본 200)")
parser.add_argument("--seed", type=int, default=42, help="재현성용 seed (현재는 타이브레이크에 사용하지 않음, 기본 42)")
parser.add_argument("--warmup", type=int, default=300, help="롤링 검증 워밍업(학습 최소 회차 수), 기본 300")
parser.add_argument("--valid_last", type=int, default=200, help="최근 성능 가중 평가에 쓰는 마지막 예측 개수, 기본 200")
parser.add_argument("--alpha", type=float, default=1.0, help="Beta prior alpha (기본 1)")
parser.add_argument("--beta", type=float, default=1.0, help="Beta prior beta (기본 1)")
parser.add_argument(
"--prior",
type=str,
default="uniform",
choices=["uniform", "near_6_45"],
help="prior 선택: uniform=Beta(1,1), near_6_45=Beta(6,39)",
)
parser.add_argument(
"--max_round",
type=int,
default=1208,
help="사용할 최대 회차 (기본 1208). 데이터가 더 많으면 잘라서 사용.",
)
parser.add_argument(
"--no_tune",
action="store_true",
help="튜닝(그리드서치) 비활성화: 기본 파라미터로만 추천",
)
parser.add_argument(
"--half_life_grid",
type=str,
default="50,100,150,200,300,400",
help="튜닝 half_life 후보(쉼표구분). 기본 '50,100,150,200,300,400'",
)
parser.add_argument(
"--gscale_grid",
type=str,
default="10,20,30,50",
help="튜닝 gscale 후보(쉼표구분). 기본 '10,20,30,50'",
)
parser.add_argument(
"--weight_step",
type=float,
default=0.1,
help="가중치 그리드 step (합=1, 기본 0.1)",
)
args = parser.parse_args()
# seed은 현재 타이브레이크에 랜덤을 쓰지 않지만, 확장 대비로 고정
try:
import random
random.seed(int(args.seed))
except Exception:
pass
print("⚠️ 중요 고지")
print("- 로또는 원칙적으로 독립·무작위 추첨이므로 과거 데이터로 실제 당첨확률을 증가시킨다고 보장할 수 없습니다.")
print("- 본 프로그램의 '확률'은 통계적 추정/휴리스틱 점수이며, 실험/학습 목적입니다.")
print()
# prior shortcut
alpha = float(args.alpha)
beta = float(args.beta)
if args.prior == "near_6_45":
alpha, beta = 6.0, 39.0
eprint("[INFO] prior=near_6_45 → Beta(6,39)")
else:
eprint("[INFO] prior=uniform → Beta(alpha,beta) (기본 1,1)")
base_dir = os.path.dirname(os.path.abspath(__file__))
txt_path = os.path.join(base_dir, "resources", "lotto_history.txt")
json_path = os.path.join(base_dir, "resources", "lotto_history.json")
txt_data = None
json_data = None
try:
txt_data = load_history_txt(txt_path)
eprint(f"[INFO] txt 로드: {len(txt_data)}회차")
except FileNotFoundError:
eprint(f"[INFO] txt 없음: {txt_path}")
except Exception as ex:
eprint(f"[WARN] txt 로드 실패: {ex}")
try:
json_data = load_history_json(json_path)
eprint(f"[INFO] json 로드: {len(json_data)}회차")
except FileNotFoundError:
eprint(f"[INFO] json 없음: {json_path}")
except Exception as ex:
eprint(f"[WARN] json 로드 실패: {ex}")
merged = cross_validate_and_merge(txt_data, json_data)
if not merged:
eprint("[ERROR] 입력 데이터가 없습니다. resources/lotto_history.txt 또는 .json 중 하나가 필요합니다.")
sys.exit(2)
max_round = int(args.max_round) if args.max_round else None
draws = sanitize_and_sort_draws(merged, min_round=1, max_round=max_round)
if not draws:
eprint("[ERROR] 유효한 회차/번호가 없습니다.")
sys.exit(2)
# 연속성/누락 간단 체크
round_list = [rnd for rnd, _ in draws]
if round_list[0] != 1:
eprint(f"[WARN] 시작 회차가 1이 아님: {round_list[0]}")
if max_round is not None and round_list[-1] != max_round:
eprint(f"[WARN] 최대 회차가 {max_round}가 아님: {round_list[-1]}")
missing = []
if round_list:
expected = set(range(round_list[0], round_list[-1] + 1))
missing = sorted(expected - set(round_list))
if missing:
eprint(f"[WARN] 누락 회차 수: {len(missing)} (예: {missing[:10]}{'...' if len(missing)>10 else ''})")
last_round = round_list[-1]
target_draw = last_round + 1
if max_round is not None:
# 사용자가 max_round=1208로 고정했으면 target은 1209가 되도록 맞춤
if last_round != max_round:
eprint(f"[WARN] max_round={max_round}로 잘랐지만 실제 마지막 회차={last_round}. target={target_draw}")
# 튜닝 그리드 파라미터 구성
def parse_num_list(s: str, typ: Any) -> List[Any]:
out = []
for tok in (s or "").split(","):
tok = tok.strip()
if not tok:
continue
try:
out.append(typ(tok))
except Exception:
pass
return out
half_life_grid = parse_num_list(args.half_life_grid, int)
if int(args.half_life) not in set(int(x) for x in half_life_grid):
half_life_grid.append(int(args.half_life))
half_life_grid = sorted(set(int(x) for x in half_life_grid if int(x) > 0))
gscale_grid = parse_num_list(args.gscale_grid, float)
gscale_grid = sorted(set(float(x) for x in gscale_grid if float(x) > 0))
if not gscale_grid:
gscale_grid = [20.0]
topk = int(args.topk)
warmup = int(args.warmup)
valid_last = int(args.valid_last)
if np is None:
eprint("[ERROR] numpy가 설치되어 있지 않아 실행할 수 없습니다. (성능/구현 단순화를 위해 numpy를 사용합니다)")
sys.exit(2)
if args.no_tune:
# 튜닝 없이 기본 값 사용
best = BacktestResult(
params=ScoringParams(
alpha=alpha,
beta=beta,
half_life=int(args.half_life),
gscale=float(gscale_grid[0]),
wA=0.5,
wB=0.4,
wC=0.1,
).normalized_weights(),
topk=topk,
warmup=warmup,
n_eval=0,
overall_mean_hit=float("nan"),
overall_hit1_rate=float("nan"),
recent_mean_hit=float("nan"),
recent_hit1_rate=float("nan"),
objective=float("nan"),
)
eprint("[INFO] --no_tune: 기본 파라미터로 추천만 수행")
else:
best = backtest_and_tune(
draws,
topk=topk,
warmup=warmup,
valid_last=valid_last,
alpha=alpha,
beta=beta,
half_life_grid=half_life_grid,
gscale_grid=gscale_grid,
weight_step=float(args.weight_step),
)
eprint("[INFO] 튜닝 완료")
eprint(
f"[BEST] objective={best.objective:.6f} "
f"recent_mean_hit={best.recent_mean_hit:.4f} recent_hit>=1={best.recent_hit1_rate:.4f} | "
f"overall_mean_hit={best.overall_mean_hit:.4f} overall_hit>=1={best.overall_hit1_rate:.4f} | "
f"half_life={best.params.half_life} gscale={best.params.gscale} "
f"wA={best.params.wA:.1f} wB={best.params.wB:.1f} wC={best.params.wC:.1f} "
f"alpha={best.params.alpha} beta={best.params.beta}"
)
recs, out_json = recommend(draws, best=best, target_draw_no=target_draw, topk=topk)
print(f"## 1209회차 추천(단일 번호 Top{topk})" if target_draw == 1209 else f"## {target_draw}회차 추천(단일 번호 Top{topk})")
print_table(recs)
print()
print("## JSON")
print(json.dumps(out_json, ensure_ascii=False, indent=2, sort_keys=False))
if __name__ == "__main__":
main()