#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 로또 6/45: 1~N회차(기본 1208) 과거 1등(6개 번호) 기록을 기반으로, 다음 회차(기본 1209)에 대해 “점수 기반 확률(추정)”이 높은 순서대로 상위 K(기본 20) 단일 번호를 추천한다. ⚠️ 중요 고지(프로그램 출력에도 포함): - 로또는 원칙적으로 독립·무작위 추첨이므로 과거 데이터로 실제 당첨확률을 증가시킨다고 보장할 수 없다. - 본 프로그램의 “확률”은 통계적 추정/휴리스틱 점수이며, 실험/학습 목적이다. 요구사항 핵심(요약): - 입력: resources/lotto_history.txt / resources/lotto_history.json (둘 중 하나만 있어도 동작, 둘 다 있으면 교차검증 후 통합) - 회차별 6개 번호(1~45, 중복 없음)만 확실히 추출하면 됨(보너스 번호는 무시) - 피처 A: 베타-이항 posterior_mean - 피처 B: 최근성 가중 출현율(EWMA; 지수감쇠) - 피처 C: 마지막 출현 이후 갭 기반 약한 페널티(“안 나온지 오래”를 유리하게 가정하지 않도록 중립 수렴) - 피처 D: 스케일링(z-score) 후 가중합 점수 - 롤링 백테스트 그리드서치로 (wA,wB,wC), half_life, gscale 튜닝 - 출력: 표 + JSON 설계/구현 계획(의사코드): 1) 데이터 로드 - txt/json 각각 로드 시도(없으면 None) - txt: 정규식으로 정수 추출 → (회차, 번호6) 구성(보너스 등 추가 숫자는 무시/방어) - json: 가능한 경우 (drwNo, drwtNo1..6) 우선, 아니면 numbers 배열, 그래도 아니면 방어적으로 숫자 추출 2) 정규화/정합성 - normalize_draw(numbers): 1..45 범위만, 중복 제거, 6개면 정렬 후 채택 - 회차 중복/누락/이상치 로그 - 둘 다 있으면 동일 회차 교차검증(번호 불일치 개수 경고), 충돌 시 우선순위(더 많이/더 신뢰되는 쪽)로 통합 3) 행렬화 - 회차 오름차순 정렬된 draws를 만들고, X[t,i]=번호(i+1)가 t회차에 포함되면 1 (t=0..N-1) 4) 피처 계산(훈련 길이 t에 대해) - A: posterior_mean = (count + alpha) / (t + alpha + beta) - B: EWMA: r = exp(-ln2/half_life) numer_t = sum_{k=0..t-1} r^k * x_{t-1-k} denom_t = sum_{k=0..t-1} r^k ewma_freq = numer_t/denom_t - C: gap_score: gap = current_draw_no - last_seen_draw_no (없으면 큰 값) gap_score = -exp(-gap/gscale) # gap 작을수록(최근 출현) 페널티 강함, gap 커질수록 0으로 중립 수렴 - D: z-score로 A,B,C 각각 스케일링 후 score = wA*zA + wB*zB + wC*zC 5) 백테스트 & 튜닝(롤링) - warmup..(N-1)까지: train[0:t)로 score 산출 → topK 번호 추천 → 실제 draw[t]와 교집합(hit) 계산 - 그리드: weights: step 0.1, 합=1 half_life: [50,100,150,200,300,400] (+사용자 입력 포함) gscale: [10,20,30,50] (+사용자 입력 포함) - 지표: overall mean hit@K, overall P(hit>=1) recent window(마지막 valid_last 예측) mean hit@K, P(hit>=1) objective = 0.7*recent_mean_hit + 0.3*overall_mean_hit (타이브레이크로 hit>=1) 6) 최종 추천 - best params로 전체(1..N) 학습 후 next_draw=N+1 추천 topK 출력(표+JSON) """ from __future__ import annotations import argparse import json import math import os import re import sys from dataclasses import dataclass from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple try: import numpy as np # type: ignore except Exception: np = None # type: ignore NUM_MIN = 1 NUM_MAX = 45 NUM_COUNT = 45 def eprint(*args: Any, **kwargs: Any) -> None: print(*args, file=sys.stderr, **kwargs) def normalize_draw(numbers: Sequence[int]) -> Optional[Tuple[int, ...]]: """번호 리스트에서 1..45 범위의 중복 없는 6개를 추출/정규화(정렬)한다.""" clean = [int(x) for x in numbers if NUM_MIN <= int(x) <= NUM_MAX] uniq = sorted(set(clean)) if len(uniq) != 6: return None return tuple(uniq) def _extract_ints(s: str) -> List[int]: return [int(x) for x in re.findall(r"\d+", s)] def load_history_txt(path: str) -> Dict[int, Tuple[int, ...]]: """ txt는 포맷이 확정되지 않았으므로 방어적으로 파싱한다. - 기본: 라인별로 정수 추출 후, 첫 번째 정수를 회차로 보고 이후 숫자 중 6개를 번호로 구성(보너스 등 추가는 무시) - 실패 시: 전체 정수 스트림을 블록(8,7) 단위로 재시도 """ if not os.path.exists(path): raise FileNotFoundError(path) by_round: Dict[int, Tuple[int, ...]] = {} bad_lines = 0 with open(path, "r", encoding="utf-8", errors="ignore") as f: lines = f.readlines() for line in lines: ints = _extract_ints(line) if len(ints) < 7: continue rnd = ints[0] rest = ints[1:] # 흔한 케이스: 회차 + 6개 + 보너스(7개) candidates = [] if len(rest) >= 6: candidates.append(rest[:6]) candidates.append(rest[-6:]) candidates.append(list(dict.fromkeys([x for x in rest if NUM_MIN <= x <= NUM_MAX]))[:6]) draw = None for cand in candidates: draw = normalize_draw(cand) if draw is not None: break if draw is None: bad_lines += 1 continue if rnd in by_round and by_round[rnd] != draw: eprint(f"[WARN][txt] 회차 {rnd} 중복/불일치: {by_round[rnd]} vs {draw} (첫 값 유지)") continue by_round[rnd] = draw # 라인 기반 파싱이 거의 실패하면 전체 스트림 기반으로 복구 시도 if len(by_round) < max(50, len(lines) // 10): ints_all: List[int] = [] for line in lines: ints_all.extend(_extract_ints(line)) def try_block(block_size: int) -> Dict[int, Tuple[int, ...]]: out: Dict[int, Tuple[int, ...]] = {} if block_size <= 0 or len(ints_all) < block_size: return out if len(ints_all) % block_size != 0: # 나머지가 있어도 앞부분만 시도 limit = len(ints_all) - (len(ints_all) % block_size) else: limit = len(ints_all) ok = 0 for i in range(0, limit, block_size): block = ints_all[i : i + block_size] if len(block) < 7: continue rnd = block[0] rest = block[1:] if len(rest) < 6: continue draw = normalize_draw(rest[:6]) if draw is None: continue out[rnd] = draw ok += 1 return out recovered8 = try_block(8) # (회차 + 6 + 보너스) 가능성 recovered7 = try_block(7) # (회차 + 6) 가능성 recovered = recovered8 if len(recovered8) >= len(recovered7) else recovered7 if len(recovered) > len(by_round): eprint(f"[INFO][txt] 라인 파싱이 약함 → 스트림 기반 복구 사용: {len(recovered)}회차") by_round = recovered if bad_lines: eprint(f"[WARN][txt] 파싱 실패 라인 수: {bad_lines}") return by_round def _coerce_json_obj_to_round_and_numbers(obj: Any) -> Optional[Tuple[int, List[int]]]: """JSON 오브젝트에서 (회차, 번호후보들)을 최대한 방어적으로 추출한다.""" if isinstance(obj, dict): # 1) 대표 구조: drwNo + drwtNo1..6 if "drwNo" in obj: try: rnd = int(obj["drwNo"]) except Exception: rnd = None # type: ignore nums: List[int] = [] for k in ("drwtNo1", "drwtNo2", "drwtNo3", "drwtNo4", "drwtNo5", "drwtNo6"): if k in obj: try: nums.append(int(obj[k])) except Exception: pass if rnd is not None and len(nums) >= 6: return rnd, nums[:6] # 2) 다른 흔한 구조: (round, numbers) for rk in ("round", "draw", "drawNo", "no", "id"): if rk in obj: try: rnd = int(obj[rk]) except Exception: continue nums = [] if "numbers" in obj and isinstance(obj["numbers"], (list, tuple)): for x in obj["numbers"]: try: nums.append(int(x)) except Exception: pass # dict 내부에 숫자들이 흩어져 있어도 방어적으로 수집 if len(nums) < 6: nums = [] for v in obj.values(): if isinstance(v, (int, float)) and NUM_MIN <= int(v) <= NUM_MAX: nums.append(int(v)) if len(nums) >= 6: return rnd, nums[:6] # 3) 최후: dict를 문자열로 만들고 숫자 추출(회차/번호 분리 어려움 → 실패 처리) return None if isinstance(obj, (list, tuple)): # list 자체가 [회차, n1..] 형태일 수도 ints = [] for x in obj: if isinstance(x, (int, float)): ints.append(int(x)) elif isinstance(x, str): ints.extend(_extract_ints(x)) if len(ints) >= 7: return ints[0], ints[1:7] return None def load_history_json(path: str) -> Dict[int, Tuple[int, ...]]: """ json은 다음을 우선 시도: - 라인별 JSON(dict) 파싱 (현재 리소스는 라인-델리미티드 dict 형태) - 전체 파일 JSON 파싱 (array/object)도 방어적으로 처리 """ if not os.path.exists(path): raise FileNotFoundError(path) by_round: Dict[int, Tuple[int, ...]] = {} bad = 0 def ingest_obj(obj: Any) -> None: nonlocal bad, by_round got = _coerce_json_obj_to_round_and_numbers(obj) if not got: bad += 1 return rnd, nums = got draw = normalize_draw(nums) if draw is None: bad += 1 return if rnd in by_round and by_round[rnd] != draw: eprint(f"[WARN][json] 회차 {rnd} 중복/불일치: {by_round[rnd]} vs {draw} (첫 값 유지)") return by_round[rnd] = draw with open(path, "r", encoding="utf-8", errors="ignore") as f: text = f.read() # 1) 라인별 JSON 우선 (여기선 거의 확실) lines = [ln.strip() for ln in text.splitlines() if ln.strip()] line_success = 0 for ln in lines: # 단일 JSON object 라인일 가능성 if not (ln.startswith("{") and ln.endswith("}")): continue try: obj = json.loads(ln) ingest_obj(obj) line_success += 1 except Exception: continue # 2) 라인 방식이 거의 실패하면 전체 JSON 파싱 if line_success < max(50, len(lines) // 10): try: obj = json.loads(text) if isinstance(obj, list): for item in obj: ingest_obj(item) elif isinstance(obj, dict): # dict 안에 list가 들어있을 수도 if "data" in obj and isinstance(obj["data"], list): for item in obj["data"]: ingest_obj(item) else: # 값들 중 dict/list를 훑기 for v in obj.values(): if isinstance(v, (list, dict)): ingest_obj(v) except Exception: pass if bad: eprint(f"[WARN][json] 파싱 실패/무시 레코드 수(추정): {bad}") return by_round def cross_validate_and_merge( txt_data: Optional[Dict[int, Tuple[int, ...]]], json_data: Optional[Dict[int, Tuple[int, ...]]], ) -> Dict[int, Tuple[int, ...]]: """둘 다 있을 경우 교차검증 후 통합. 충돌 시 더 많은 레코드를 가진 쪽을 우선.""" if not txt_data and not json_data: return {} if txt_data and not json_data: return dict(txt_data) if json_data and not txt_data: return dict(json_data) assert txt_data is not None and json_data is not None common = sorted(set(txt_data) & set(json_data)) mismatch = 0 for rnd in common: if txt_data[rnd] != json_data[rnd]: mismatch += 1 if mismatch <= 20: eprint(f"[WARN] 교차검증 불일치 회차 {rnd}: txt={txt_data[rnd]} json={json_data[rnd]}") if mismatch: eprint(f"[WARN] 교차검증: 공통 {len(common)}개 중 불일치 {mismatch}개") else: eprint(f"[INFO] 교차검증: 공통 {len(common)}개 모두 일치") base = json_data if len(json_data) >= len(txt_data) else txt_data other = txt_data if base is json_data else json_data merged: Dict[int, Tuple[int, ...]] = dict(base) added = 0 conflict = 0 for rnd, draw in other.items(): if rnd not in merged: merged[rnd] = draw added += 1 else: if merged[rnd] != draw: conflict += 1 # 충돌은 base 유지 if added: eprint(f"[INFO] 통합: 누락 회차 추가 {added}개") if conflict: eprint(f"[WARN] 통합: 충돌 {conflict}개(우선 데이터 유지)") return merged def sanitize_and_sort_draws( merged: Dict[int, Tuple[int, ...]], min_round: int = 1, max_round: Optional[int] = None, ) -> List[Tuple[int, Tuple[int, ...]]]: """회차 정렬 + 범위/이상치 제거.""" items = [] dropped = 0 for rnd, draw in merged.items(): if rnd < min_round: dropped += 1 continue if max_round is not None and rnd > max_round: dropped += 1 continue if draw is None or len(draw) != 6: dropped += 1 continue if any((x < NUM_MIN or x > NUM_MAX) for x in draw) or len(set(draw)) != 6: dropped += 1 continue items.append((int(rnd), tuple(sorted(draw)))) items.sort(key=lambda x: x[0]) if dropped: eprint(f"[WARN] 이상치/범위 밖/형식 불량 제거: {dropped}개") return items def build_indicator_matrix(draws: List[Tuple[int, Tuple[int, ...]]]) -> Tuple["np.ndarray", "np.ndarray"]: """draws -> (draw_nos, X) where X[t,i] in {0,1} for number i+1.""" if np is None: raise RuntimeError("numpy가 필요합니다(성능/구현 단순화를 위해). requirements.txt를 확인하세요.") draw_nos = np.array([rnd for rnd, _ in draws], dtype=np.int32) X = np.zeros((len(draws), NUM_COUNT), dtype=np.int8) for t, (_, nums) in enumerate(draws): for n in nums: X[t, n - 1] = 1 return draw_nos, X def zscore(x: "np.ndarray", eps: float = 1e-12) -> "np.ndarray": mu = float(np.mean(x)) sd = float(np.std(x)) if sd < eps: return x * 0.0 return (x - mu) / sd def zscore_rowwise(M: "np.ndarray", eps: float = 1e-12) -> "np.ndarray": """행(row) 단위 z-score. M: (T,45) -> (T,45)""" mu = np.mean(M, axis=1, keepdims=True) sd = np.std(M, axis=1, keepdims=True) out = (M - mu).astype(np.float64, copy=False) # np.where는 양쪽 branch를 평가해서 RuntimeWarning이 날 수 있으므로 np.divide(where=...) 사용 z = np.zeros_like(out, dtype=np.float64) np.divide(out, sd, out=z, where=sd >= eps) return z @dataclass(frozen=True) class ScoringParams: alpha: float = 1.0 beta: float = 1.0 half_life: int = 200 gscale: float = 20.0 wA: float = 0.5 wB: float = 0.4 wC: float = 0.1 def normalized_weights(self) -> "ScoringParams": s = self.wA + self.wB + self.wC if s <= 0: return self return ScoringParams( alpha=self.alpha, beta=self.beta, half_life=self.half_life, gscale=self.gscale, wA=self.wA / s, wB=self.wB / s, wC=self.wC / s, ) def precompute_last_seen(X: "np.ndarray") -> "np.ndarray": """last_seen_end[t,i] = t개 훈련(0..t-1 포함) 이후, 번호 i의 마지막 등장 index (없으면 -1). shape (N+1,45).""" N = X.shape[0] last_seen_end = np.full((N + 1, NUM_COUNT), -1, dtype=np.int32) last = np.full((NUM_COUNT,), -1, dtype=np.int32) last_seen_end[0] = last for idx in range(N): hits = np.nonzero(X[idx])[0] if hits.size: last[hits] = idx # idx번째 draw까지 포함한 상태가 train_size=idx+1 last_seen_end[idx + 1] = last return last_seen_end def precompute_ewma_numer_denoms(X: "np.ndarray", half_life: int) -> Tuple["np.ndarray", "np.ndarray"]: """ EWMA numerator/denominator를 훈련 길이별로 미리 계산. - numer_end[t,i] = t개 훈련(0..t-1)의 weighted sum - denom_end[t] = sum_{k=0..t-1} r^k """ N = X.shape[0] hl = max(1, int(half_life)) lam = math.log(2.0) / float(hl) r = math.exp(-lam) # per-step decay numer_end = np.zeros((N + 1, NUM_COUNT), dtype=np.float64) denom_end = np.zeros((N + 1,), dtype=np.float64) for t in range(1, N + 1): numer_end[t] = X[t - 1].astype(np.float64) + r * numer_end[t - 1] denom_end[t] = 1.0 + r * denom_end[t - 1] denom_end[0] = 0.0 return numer_end, denom_end def compute_features_at_train_size( *, train_size: int, draw_nos: "np.ndarray", X: "np.ndarray", cum_counts: "np.ndarray", last_seen_end: "np.ndarray", ewma_numer_end: "np.ndarray", ewma_denom_end: "np.ndarray", params: ScoringParams, ) -> Dict[str, "np.ndarray"]: """ train_size=t인 시점의 피처를 45개 번호에 대해 계산. 반환: posterior_mean (A), ewma_freq (B), gap (raw), gap_score (C raw), zA,zB,zC, score """ t = int(train_size) if t <= 0: raise ValueError("train_size must be >= 1") # counts over first t draws counts = cum_counts[t - 1].astype(np.float64) alpha = float(params.alpha) beta = float(params.beta) posterior_mean = (counts + alpha) / (float(t) + alpha + beta) denom = float(ewma_denom_end[t]) if denom <= 0: ewma_freq = np.zeros((NUM_COUNT,), dtype=np.float64) else: ewma_freq = ewma_numer_end[t] / denom # gap based on draw numbers (회차) to match "마지막 출현 이후 회차 수" current_draw_no = int(draw_nos[t - 1]) last_idx = last_seen_end[t].astype(np.int32) gap = np.zeros((NUM_COUNT,), dtype=np.float64) seen_mask = last_idx >= 0 if np.any(seen_mask): last_draw = draw_nos[last_idx.clip(min=0)] gap[seen_mask] = (current_draw_no - last_draw[seen_mask]).astype(np.float64) # never seen -> large gap (중립 수렴용) gap[~seen_mask] = float(current_draw_no + 999) gscale = float(params.gscale) if gscale <= 0: gscale = 20.0 # 최근(갭 짧음)일수록 -1에 가깝게, 오래되면 0으로 수렴 (U자 가정 금지) gap_score = -np.exp(-gap / gscale) zA = zscore(posterior_mean) zB = zscore(ewma_freq) zC = zscore(gap_score) w = params.normalized_weights() score = w.wA * zA + w.wB * zB + w.wC * zC return { "posterior_mean": posterior_mean, "ewma_freq": ewma_freq, "gap": gap, "gap_score": gap_score, "zA": zA, "zB": zB, "zC": zC, "score": score, } @dataclass class BacktestResult: params: ScoringParams topk: int warmup: int n_eval: int overall_mean_hit: float overall_hit1_rate: float recent_mean_hit: float recent_hit1_rate: float objective: float def _iter_weight_grid(step: float = 0.1) -> Iterable[Tuple[float, float, float]]: # wA, wB in [0..1], wC=1-wA-wB, step grid s = float(step) if s <= 0: s = 0.1 # to avoid float drift, operate on integer ticks ticks = int(round(1.0 / s)) for a in range(ticks + 1): for b in range(ticks + 1 - a): c = ticks - a - b yield a / ticks, b / ticks, c / ticks def backtest_and_tune( draws: List[Tuple[int, Tuple[int, ...]]], *, topk: int = 20, warmup: int = 300, valid_last: int = 200, alpha: float = 1.0, beta: float = 1.0, half_life_grid: Sequence[int] = (50, 100, 150, 200, 300, 400), gscale_grid: Sequence[float] = (10.0, 20.0, 30.0, 50.0), weight_step: float = 0.1, ) -> BacktestResult: if np is None: raise RuntimeError("numpy가 필요합니다.") if len(draws) < warmup + 2: raise ValueError(f"데이터가 너무 적습니다: {len(draws)}회차 (warmup={warmup})") draw_nos, X = build_indicator_matrix(draws) N = int(X.shape[0]) topk = max(1, min(int(topk), NUM_COUNT)) # 누적 카운트 (N,45) cum_counts = np.cumsum(X, axis=0, dtype=np.int32) # counts_end[t] = 0..t-1까지의 카운트 (shape N+1,45) counts_end = np.zeros((N + 1, NUM_COUNT), dtype=np.int32) counts_end[1:] = cum_counts last_seen_end = precompute_last_seen(X) # gap_end[t,i] = train_size=t 시점의 갭(회차 기준). (shape N+1,45) gap_end = np.zeros((N + 1, NUM_COUNT), dtype=np.float64) gap_end[0] = 0.0 for t in range(1, N + 1): current_draw_no = int(draw_nos[t - 1]) last_idx = last_seen_end[t] seen = last_idx >= 0 gap = np.empty((NUM_COUNT,), dtype=np.float64) if np.any(seen): gap[seen] = (current_draw_no - draw_nos[last_idx[seen]]).astype(np.float64) gap[~seen] = float(current_draw_no + 999) gap_end[t] = gap # 평가 구간: train_size=t로 target index=t 예측 start_t = max(1, int(warmup)) end_t = N - 1 n_eval = end_t - start_t + 1 if n_eval <= 0: raise ValueError("평가 구간이 비었습니다. warmup을 줄이세요.") recent_len = max(1, min(int(valid_last), n_eval)) recent_start_t = end_t - recent_len + 1 # 실제 정답 마스크 actual_mask = X.astype(np.int8) # 그리드 정리 hl_list = sorted(set(int(x) for x in half_life_grid if int(x) > 0)) gs_list = sorted(set(float(x) for x in gscale_grid if float(x) > 0)) weight_list = list(_iter_weight_grid(step=weight_step)) W = np.array(weight_list, dtype=np.float64).T # (3,M) M = int(W.shape[1]) eprint( f"[INFO] 튜닝 시작(가속): weights={M} half_life={len(hl_list)} gscale={len(gs_list)} " f"→ 총 {M*len(hl_list)*len(gs_list)} 조합, eval={n_eval} (recent={recent_len})" ) # A는 half_life/gscale과 무관 (alpha,beta 고정) t_vec = np.arange(N + 1, dtype=np.float64).reshape(-1, 1) # (N+1,1) A_end = (counts_end.astype(np.float64) + float(alpha)) / (t_vec + float(alpha) + float(beta)) zA_end = zscore_rowwise(A_end) best: Optional[BacktestResult] = None nums = np.arange(1, NUM_COUNT + 1, dtype=np.int32) # tie-break key for hl in hl_list: ewma_numer_end, ewma_denom_end = precompute_ewma_numer_denoms(X, hl) denom = ewma_denom_end.reshape(-1, 1) B_end = np.zeros_like(ewma_numer_end, dtype=np.float64) np.divide(ewma_numer_end, denom, out=B_end, where=denom > 0) zB_end = zscore_rowwise(B_end) for gs in gs_list: gap_score_end = -np.exp(-gap_end / float(gs)) zC_end = zscore_rowwise(gap_score_end) # 누적 hit/hit1 (M개 weight에 대해 벡터) total_hit = np.zeros((M,), dtype=np.int32) total_hit1 = np.zeros((M,), dtype=np.int32) recent_hit = np.zeros((M,), dtype=np.int32) recent_hit1 = np.zeros((M,), dtype=np.int32) for t in range(start_t, end_t + 1): # z-features at train_size=t : (45,3) Z = np.stack([zA_end[t], zB_end[t], zC_end[t]], axis=1) # (45,3) scores = Z @ W # (45,M) # topK indices per column (order not needed for hit 계산) picks = np.argpartition(-scores, topk - 1, axis=0)[:topk, :] # (topk,M) hits = actual_mask[t][picks].sum(axis=0).astype(np.int32) # (M,) total_hit += hits total_hit1 += (hits >= 1).astype(np.int32) if t >= recent_start_t: recent_hit += hits recent_hit1 += (hits >= 1).astype(np.int32) overall_mean_hit = total_hit.astype(np.float64) / float(n_eval) overall_hit1_rate = total_hit1.astype(np.float64) / float(n_eval) recent_mean_hit = recent_hit.astype(np.float64) / float(recent_len) recent_hit1_rate = recent_hit1.astype(np.float64) / float(recent_len) objective = 0.7 * recent_mean_hit + 0.3 * overall_mean_hit # (hl,gs)에서 best weight 선택: objective, recent_hit1, overall_mean, overall_hit1 # numpy.lexsort는 마지막 키가 1순위(오름차순) → 음수로 내림차순 효과 order = np.lexsort( ( -overall_hit1_rate, -overall_mean_hit, -recent_hit1_rate, -objective, ) ) best_w_idx = int(order[0]) wA, wB, wC = float(W[0, best_w_idx]), float(W[1, best_w_idx]), float(W[2, best_w_idx]) params = ScoringParams(alpha=alpha, beta=beta, half_life=hl, gscale=gs, wA=wA, wB=wB, wC=wC).normalized_weights() cand = BacktestResult( params=params, topk=topk, warmup=warmup, n_eval=n_eval, overall_mean_hit=float(overall_mean_hit[best_w_idx]), overall_hit1_rate=float(overall_hit1_rate[best_w_idx]), recent_mean_hit=float(recent_mean_hit[best_w_idx]), recent_hit1_rate=float(recent_hit1_rate[best_w_idx]), objective=float(objective[best_w_idx]), ) if best is None: best = cand else: if (cand.objective > best.objective + 1e-12) or ( abs(cand.objective - best.objective) <= 1e-12 and ( cand.recent_hit1_rate > best.recent_hit1_rate + 1e-12 or ( abs(cand.recent_hit1_rate - best.recent_hit1_rate) <= 1e-12 and ( cand.overall_mean_hit > best.overall_mean_hit + 1e-12 or ( abs(cand.overall_mean_hit - best.overall_mean_hit) <= 1e-12 and cand.overall_hit1_rate > best.overall_hit1_rate + 1e-12 ) ) ) ) ): best = cand assert best is not None return best def _reason_string(zA: float, zB: float, gap: float, gap_score: float, params: ScoringParams) -> str: parts: List[str] = [] if zB >= 0.7: parts.append("최근성↑") elif zB <= -0.7: parts.append("최근성↓") if zA >= 0.7: parts.append("장기빈도↑") elif zA <= -0.7: parts.append("장기빈도↓") # gap_score는 음수(페널티), 0에 가까울수록 중립 if gap_score <= -0.7: parts.append("최근출현페널티↑") elif gap_score >= -0.1: parts.append("갭영향↓(중립)") else: parts.append("갭영향(약)") # 숫자로 한 번 더 parts.append(f"gap={int(round(gap))}") return ", ".join(parts) def recommend( draws: List[Tuple[int, Tuple[int, ...]]], *, best: BacktestResult, target_draw_no: int, topk: int, ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: if np is None: raise RuntimeError("numpy가 필요합니다.") draw_nos, X = build_indicator_matrix(draws) N = int(X.shape[0]) cum_counts = np.cumsum(X, axis=0, dtype=np.int32) last_seen_end = precompute_last_seen(X) ewma_numer_end, ewma_denom_end = precompute_ewma_numer_denoms(X, best.params.half_life) feats = compute_features_at_train_size( train_size=N, draw_nos=draw_nos, X=X, cum_counts=cum_counts, last_seen_end=last_seen_end, ewma_numer_end=ewma_numer_end, ewma_denom_end=ewma_denom_end, params=best.params, ) score = feats["score"] nums = np.arange(1, NUM_COUNT + 1, dtype=np.int32) order = np.lexsort((nums, -score)) order = order[:topk] recs: List[Dict[str, Any]] = [] for rank, idx in enumerate(order, start=1): n = int(idx + 1) rec = { "rank": rank, "number": n, "score": float(score[idx]), "posterior_mean": float(feats["posterior_mean"][idx]), "ewma_freq": float(feats["ewma_freq"][idx]), "gap": float(feats["gap"][idx]), "reason": _reason_string( float(feats["zA"][idx]), float(feats["zB"][idx]), float(feats["gap"][idx]), float(feats["gap_score"][idx]), best.params, ), "feature_z": { "A": float(feats["zA"][idx]), "B": float(feats["zB"][idx]), "C": float(feats["zC"][idx]), }, "weights": {"wA": best.params.wA, "wB": best.params.wB, "wC": best.params.wC}, } recs.append(rec) out_json = { "draw": int(target_draw_no), "topk": int(topk), "disclaimer": [ "로또는 원칙적으로 독립·무작위 추첨이므로 과거 데이터로 실제 당첨확률을 증가시킨다고 보장할 수 없습니다.", "본 프로그램의 '확률'은 통계적 추정/휴리스틱 점수이며, 실험/학습 목적입니다.", ], "best_params": { "alpha": best.params.alpha, "beta": best.params.beta, "half_life": best.params.half_life, "gscale": best.params.gscale, "wA": best.params.wA, "wB": best.params.wB, "wC": best.params.wC, }, "backtest": { "warmup": best.warmup, "n_eval": best.n_eval, "overall_mean_hit": best.overall_mean_hit, "overall_hit>=1_rate": best.overall_hit1_rate, "recent_mean_hit": best.recent_mean_hit, "recent_hit>=1_rate": best.recent_hit1_rate, "objective": best.objective, }, "recommendations": recs, } return recs, out_json def print_table(recs: List[Dict[str, Any]]) -> None: # 콘솔 표: rank, number, score, posterior_mean, ewma_freq, last_gap, reason headers = ["rank", "number", "score", "posterior_mean", "ewma_freq", "gap", "reason"] rows = [] for r in recs: rows.append([ str(r["rank"]), str(r["number"]), f'{r["score"]:+.6f}', f'{r["posterior_mean"]:.6f}', f'{r["ewma_freq"]:.6f}', f'{r["gap"]:.0f}', str(r["reason"]), ]) widths = [len(h) for h in headers] for row in rows: for i, cell in enumerate(row): widths[i] = max(widths[i], len(cell)) def fmt_row(row: Sequence[str]) -> str: return " | ".join(row[i].ljust(widths[i]) for i in range(len(headers))) sep = "-+-".join("-" * w for w in widths) print(fmt_row(headers)) print(sep) for row in rows: print(fmt_row(row)) def main() -> None: parser = argparse.ArgumentParser(description="로또 6/45 점수 기반(휴리스틱) 단일 번호 TopK 추천") parser.add_argument("--topk", type=int, default=20, help="추천할 단일 번호 개수 (기본 20)") parser.add_argument("--half_life", type=int, default=200, help="EWMA half-life (튜닝 그리드에 포함, 기본 200)") parser.add_argument("--seed", type=int, default=42, help="재현성용 seed (현재는 타이브레이크에 사용하지 않음, 기본 42)") parser.add_argument("--warmup", type=int, default=300, help="롤링 검증 워밍업(학습 최소 회차 수), 기본 300") parser.add_argument("--valid_last", type=int, default=200, help="최근 성능 가중 평가에 쓰는 마지막 예측 개수, 기본 200") parser.add_argument("--alpha", type=float, default=1.0, help="Beta prior alpha (기본 1)") parser.add_argument("--beta", type=float, default=1.0, help="Beta prior beta (기본 1)") parser.add_argument( "--prior", type=str, default="uniform", choices=["uniform", "near_6_45"], help="prior 선택: uniform=Beta(1,1), near_6_45=Beta(6,39)", ) parser.add_argument( "--max_round", type=int, default=1208, help="사용할 최대 회차 (기본 1208). 데이터가 더 많으면 잘라서 사용.", ) parser.add_argument( "--no_tune", action="store_true", help="튜닝(그리드서치) 비활성화: 기본 파라미터로만 추천", ) parser.add_argument( "--half_life_grid", type=str, default="50,100,150,200,300,400", help="튜닝 half_life 후보(쉼표구분). 기본 '50,100,150,200,300,400'", ) parser.add_argument( "--gscale_grid", type=str, default="10,20,30,50", help="튜닝 gscale 후보(쉼표구분). 기본 '10,20,30,50'", ) parser.add_argument( "--weight_step", type=float, default=0.1, help="가중치 그리드 step (합=1, 기본 0.1)", ) args = parser.parse_args() # seed은 현재 타이브레이크에 랜덤을 쓰지 않지만, 확장 대비로 고정 try: import random random.seed(int(args.seed)) except Exception: pass print("⚠️ 중요 고지") print("- 로또는 원칙적으로 독립·무작위 추첨이므로 과거 데이터로 실제 당첨확률을 증가시킨다고 보장할 수 없습니다.") print("- 본 프로그램의 '확률'은 통계적 추정/휴리스틱 점수이며, 실험/학습 목적입니다.") print() # prior shortcut alpha = float(args.alpha) beta = float(args.beta) if args.prior == "near_6_45": alpha, beta = 6.0, 39.0 eprint("[INFO] prior=near_6_45 → Beta(6,39)") else: eprint("[INFO] prior=uniform → Beta(alpha,beta) (기본 1,1)") base_dir = os.path.dirname(os.path.abspath(__file__)) txt_path = os.path.join(base_dir, "resources", "lotto_history.txt") json_path = os.path.join(base_dir, "resources", "lotto_history.json") txt_data = None json_data = None try: txt_data = load_history_txt(txt_path) eprint(f"[INFO] txt 로드: {len(txt_data)}회차") except FileNotFoundError: eprint(f"[INFO] txt 없음: {txt_path}") except Exception as ex: eprint(f"[WARN] txt 로드 실패: {ex}") try: json_data = load_history_json(json_path) eprint(f"[INFO] json 로드: {len(json_data)}회차") except FileNotFoundError: eprint(f"[INFO] json 없음: {json_path}") except Exception as ex: eprint(f"[WARN] json 로드 실패: {ex}") merged = cross_validate_and_merge(txt_data, json_data) if not merged: eprint("[ERROR] 입력 데이터가 없습니다. resources/lotto_history.txt 또는 .json 중 하나가 필요합니다.") sys.exit(2) max_round = int(args.max_round) if args.max_round else None draws = sanitize_and_sort_draws(merged, min_round=1, max_round=max_round) if not draws: eprint("[ERROR] 유효한 회차/번호가 없습니다.") sys.exit(2) # 연속성/누락 간단 체크 round_list = [rnd for rnd, _ in draws] if round_list[0] != 1: eprint(f"[WARN] 시작 회차가 1이 아님: {round_list[0]}") if max_round is not None and round_list[-1] != max_round: eprint(f"[WARN] 최대 회차가 {max_round}가 아님: {round_list[-1]}") missing = [] if round_list: expected = set(range(round_list[0], round_list[-1] + 1)) missing = sorted(expected - set(round_list)) if missing: eprint(f"[WARN] 누락 회차 수: {len(missing)} (예: {missing[:10]}{'...' if len(missing)>10 else ''})") last_round = round_list[-1] target_draw = last_round + 1 if max_round is not None: # 사용자가 max_round=1208로 고정했으면 target은 1209가 되도록 맞춤 if last_round != max_round: eprint(f"[WARN] max_round={max_round}로 잘랐지만 실제 마지막 회차={last_round}. target={target_draw}") # 튜닝 그리드 파라미터 구성 def parse_num_list(s: str, typ: Any) -> List[Any]: out = [] for tok in (s or "").split(","): tok = tok.strip() if not tok: continue try: out.append(typ(tok)) except Exception: pass return out half_life_grid = parse_num_list(args.half_life_grid, int) if int(args.half_life) not in set(int(x) for x in half_life_grid): half_life_grid.append(int(args.half_life)) half_life_grid = sorted(set(int(x) for x in half_life_grid if int(x) > 0)) gscale_grid = parse_num_list(args.gscale_grid, float) gscale_grid = sorted(set(float(x) for x in gscale_grid if float(x) > 0)) if not gscale_grid: gscale_grid = [20.0] topk = int(args.topk) warmup = int(args.warmup) valid_last = int(args.valid_last) if np is None: eprint("[ERROR] numpy가 설치되어 있지 않아 실행할 수 없습니다. (성능/구현 단순화를 위해 numpy를 사용합니다)") sys.exit(2) if args.no_tune: # 튜닝 없이 기본 값 사용 best = BacktestResult( params=ScoringParams( alpha=alpha, beta=beta, half_life=int(args.half_life), gscale=float(gscale_grid[0]), wA=0.5, wB=0.4, wC=0.1, ).normalized_weights(), topk=topk, warmup=warmup, n_eval=0, overall_mean_hit=float("nan"), overall_hit1_rate=float("nan"), recent_mean_hit=float("nan"), recent_hit1_rate=float("nan"), objective=float("nan"), ) eprint("[INFO] --no_tune: 기본 파라미터로 추천만 수행") else: best = backtest_and_tune( draws, topk=topk, warmup=warmup, valid_last=valid_last, alpha=alpha, beta=beta, half_life_grid=half_life_grid, gscale_grid=gscale_grid, weight_step=float(args.weight_step), ) eprint("[INFO] 튜닝 완료") eprint( f"[BEST] objective={best.objective:.6f} " f"recent_mean_hit={best.recent_mean_hit:.4f} recent_hit>=1={best.recent_hit1_rate:.4f} | " f"overall_mean_hit={best.overall_mean_hit:.4f} overall_hit>=1={best.overall_hit1_rate:.4f} | " f"half_life={best.params.half_life} gscale={best.params.gscale} " f"wA={best.params.wA:.1f} wB={best.params.wB:.1f} wC={best.params.wC:.1f} " f"alpha={best.params.alpha} beta={best.params.beta}" ) recs, out_json = recommend(draws, best=best, target_draw_no=target_draw, topk=topk) print(f"## 1209회차 추천(단일 번호 Top{topk})" if target_draw == 1209 else f"## {target_draw}회차 추천(단일 번호 Top{topk})") print_table(recs) print() print("## JSON") print(json.dumps(out_json, ensure_ascii=False, indent=2, sort_keys=False)) if __name__ == "__main__": main()