From d08e9060669479d8be2923914503fec15eea72aa Mon Sep 17 00:00:00 2001 From: dsyoon Date: Wed, 8 Apr 2026 19:33:26 +0900 Subject: [PATCH] Add final_practice.py for next-draw recommendations via final_BallFilter - Mirrors 3_Practice_22 flow: DataCrawler, optional API crawl, fixed combo, Monte Carlo filtered samples (default) or exhaustive mode - Caps total recommendations under 100; saves recommend_ball.final.json Made-with: Cursor --- README.md | 3 + final_practice.py | 319 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 322 insertions(+) create mode 100644 final_practice.py diff --git a/README.md b/README.md index f685195..4294d18 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ - **`filter_model.py`** — `from final_BallFilter import BallFilter` 재노출. - **`train.py` / `valid.py`** — 구간별로 당첨 6개가 모든 필터를 통과한 회차 수 집계. - **`final_filterTest.py`** — `1_FilterTest_25.py`와 동일한 분석·(선택) MC 생존 추정. +- **`final_practice.py`** — `3_Practice_22.py`와 같이 다음 회차용 추천 조합 생성(`final_BallFilter`, 기본은 Monte Carlo로 100개 미만). ## 실행 (miniconda **ncue**) @@ -25,6 +26,8 @@ python valid.py python final_filterTest.py # 특정 회차 생존 조합 수 Monte Carlo 근사 python final_filterTest.py --mc-no 900 --mc-samples 12000 +# 다음 회차 추천(네트워크·텔레그램 없이 로컬만) +python final_practice.py --skip-data-crawl --skip-fetch-next --no-telegram ``` 동일 환경을 셸 스크립트로: diff --git a/final_practice.py b/final_practice.py new file mode 100644 index 0000000..03b285d --- /dev/null +++ b/final_practice.py @@ -0,0 +1,319 @@ +# `3_Practice_22.py` 흐름을 따르되 `final_BallFilter` + `lotto_history.txt`로 다음 회차 후보를 만듭니다. +import argparse +import itertools +import json +import os +import random +import time +from datetime import datetime, timedelta + +import pandas as pd +import requests + +from DataCrawler import DataCrawler +from final_BallFilter import BallFilter + +try: + from TelegramBot import TelegramBot +except Exception: # pragma: no cover + class TelegramBot: + def __init__(self, enable=True): + pass + + def sendMsg(self, msg): + print(msg) + + +class FinalPractice: + def __init__(self, resources_path): + self.resources_path = resources_path + self.bot = TelegramBot() + + def craw(self, lotto_history_base: str, drw_no=None): + """동행복권 API로 회차별 결과를 `lotto_history.json` / `.txt`에 저장합니다.""" + ball = None + if drw_no is not None: + json_fp = open(lotto_history_base + ".json", "a", encoding="utf-8") + text_fp = open(lotto_history_base + ".txt", "a", encoding="utf-8") + url = "https://dhlottery.co.kr/common.do?method=getLottoNumber&drwNo=" + str(drw_no) + res = requests.post(url, timeout=30) + result = res.json() + if result.get("returnValue") != "success": + json_fp.close() + text_fp.close() + return None + json_fp.write(json.dumps(result, ensure_ascii=False) + "\n") + text_fp.write( + "%d,%d,%d,%d,%d,%d,%d,%d\n" + % ( + drw_no, + result["drwtNo1"], + result["drwtNo2"], + result["drwtNo3"], + result["drwtNo4"], + result["drwtNo5"], + result["drwtNo6"], + result["bnusNo"], + ) + ) + print( + "%d,%d,%d,%d,%d,%d,%d,%d" + % ( + drw_no, + result["drwtNo1"], + result["drwtNo2"], + result["drwtNo3"], + result["drwtNo4"], + result["drwtNo5"], + result["drwtNo6"], + result["bnusNo"], + ) + ) + ball = [ + result["drwtNo1"], + result["drwtNo2"], + result["drwtNo3"], + result["drwtNo4"], + result["drwtNo5"], + result["drwtNo6"], + result["bnusNo"], + ] + json_fp.close() + text_fp.close() + else: + json_fp = open(lotto_history_base + ".json", "w", encoding="utf-8") + text_fp = open(lotto_history_base + ".txt", "w", encoding="utf-8") + idx = 1 + while True: + url = "https://dhlottery.co.kr/common.do?method=getLottoNumber&drwNo=" + str(idx) + res = requests.post(url, timeout=30) + result = res.json() + if result.get("returnValue") != "success": + break + json_fp.write(json.dumps(result, ensure_ascii=False) + "\n") + text_fp.write( + "%d,%d,%d,%d,%d,%d,%d,%d\n" + % ( + idx, + result["drwtNo1"], + result["drwtNo2"], + result["drwtNo3"], + result["drwtNo4"], + result["drwtNo5"], + result["drwtNo6"], + result["bnusNo"], + ) + ) + print( + "%d,%d,%d,%d,%d,%d,%d,%d" + % ( + idx, + result["drwtNo1"], + result["drwtNo2"], + result["drwtNo3"], + result["drwtNo4"], + result["drwtNo5"], + result["drwtNo6"], + result["bnusNo"], + ) + ) + ball = [ + result["drwtNo1"], + result["drwtNo2"], + result["drwtNo3"], + result["drwtNo4"], + result["drwtNo5"], + result["drwtNo6"], + result["bnusNo"], + ] + idx += 1 + time.sleep(0.5) + json_fp.close() + text_fp.close() + return ball + + def predict_fixed(self, result_list): + """참고 스크립트와 동일하게 고정 6개 조합 1개를 넣습니다.""" + result_list.append([6, 7, 10, 11, 20, 45]) + + def predict_filter_exhaustive(self, ball_filter: BallFilter, df_ball: pd.DataFrame, no: int, result_list): + """전체 8,145,060조합을 순회합니다(시간이 매우 오래 걸릴 수 있음).""" + candidates = list(range(1, 46)) + for idx, ball in enumerate(itertools.combinations(candidates, 6)): + if idx % 1_000_000 == 0: + print(" - {} processed...".format(idx)) + ball = list(ball) + filter_type = ball_filter.filter(ball=ball, no=no, until_end=False, df=df_ball) + if len(filter_type) > 0: + continue + result_list.append(ball) + + def predict_filter_montecarlo( + self, + ball_filter: BallFilter, + df_ball: pd.DataFrame, + no: int, + result_list: list, + max_recommend: int, + max_tries: int, + seed: int, + ): + """무작위 조합으로 필터를 통과하는 조합을 최대 `max_recommend`개까지 수집합니다.""" + rng = random.Random(seed) + pool = list(range(1, 46)) + tries = 0 + seen = set() + while len(result_list) < max_recommend and tries < max_tries: + tries += 1 + ball = sorted(rng.sample(pool, 6)) + key = tuple(ball) + if key in seen: + continue + seen.add(key) + filter_type = ball_filter.filter(ball=ball, no=no, until_end=False, df=df_ball) + if filter_type: + continue + result_list.append(ball) + print( + "Monte Carlo: 수집 {}개 / 시도 {}회 (상한 {}회)".format( + len(result_list), tries, max_tries + ) + ) + + +def load_df(resources_path: str) -> pd.DataFrame: + path = os.path.join(resources_path, "lotto_history.txt") + df_ball = pd.read_csv(path, header=None) + df_ball.columns = ["no", "b1", "b2", "b3", "b4", "b5", "b6", "bn"] + return df_ball + + +def next_draw_no(df_ball: pd.DataFrame) -> int: + """파일에 있는 최신 회차의 다음 회차 번호.""" + return int(df_ball["no"].max()) + 1 + + +def weekend_ymd_strings(): + """표시용: 이번 주 토요일(추첨일) YMD, 지난 주 동일.""" + today = datetime.today() + if today.weekday() == 5: + if today.hour > 20: + this_weekend = today + timedelta(days=(12 - today.weekday())) + else: + this_weekend = today + timedelta(days=(5 - today.weekday())) + elif today.weekday() == 6: + this_weekend = today + timedelta(days=(12 - today.weekday())) + else: + this_weekend = today + timedelta(days=(5 - today.weekday())) + last_weekend = (this_weekend - timedelta(days=7)).strftime("%Y%m%d") + ymd = this_weekend.strftime("%Y%m%d") + return ymd, last_weekend + + +def main(): + parser = argparse.ArgumentParser(description="다음 회차 필터 통과 조합 추천 (final_BallFilter)") + parser.add_argument("--project-home", default=".", help="프로젝트 루트") + parser.add_argument( + "--max-total", + type=int, + default=99, + help="추천 조합 총 개수 상한(고정 1개 포함, 기본 99 = 100 미만)", + ) + parser.add_argument("--max-tries", type=int, default=3_000_000, help="Monte Carlo 최대 시도 횟수") + parser.add_argument("--seed", type=int, default=0) + parser.add_argument("--exhaustive", action="store_true", help="전수 탐색(매우 느림)") + parser.add_argument("--skip-data-crawl", action="store_true", help="DataCrawler.excute 생략") + parser.add_argument("--skip-fetch-next", action="store_true", help="다음 회차 API 크롤 생략") + parser.add_argument("--no-telegram", action="store_true", help="텔레그램 전송 생략") + parser.add_argument("--draw-no", type=int, default=None, help="예측 대상 회차(미지정 시 파일 기준 다음 회차)") + args = parser.parse_args() + + project_home = os.path.abspath(args.project_home) + resources_path = os.path.join(project_home, "resources") + os.makedirs(resources_path, exist_ok=True) + + ymd, last_weekend = weekend_ymd_strings() + print("ymd(표시용): {}".format(ymd)) + + if not args.skip_data_crawl: + data_crawler = DataCrawler() + data_crawler.excute(resources_path) + + lotto_history_base = os.path.join(project_home, "resources", "lotto_history") + lotto_json = lotto_history_base + ".json" + practice = FinalPractice(resources_path) + if not args.skip_fetch_next and os.path.isfile(lotto_json): + with open(lotto_json, "r", encoding="utf-8") as f: + last_json = None + for line in f: + line = line.strip() + if line: + last_json = json.loads(line) + if last_json is not None: + nxt = int(last_json["drwNo"]) + 1 + practice.craw(lotto_history_base, drwNo=nxt) + + df_ball = load_df(resources_path) + no = args.draw_no if args.draw_no is not None else next_draw_no(df_ball) + print("예측 대상 회차 no: {}".format(no)) + + lotto_txt = os.path.join(resources_path, "lotto_history.txt") + ball_filter = BallFilter(lotto_txt) + + recommend_path = os.path.join(resources_path, "recommend_ball.final.json") + if os.path.isfile(recommend_path): + with open(recommend_path, "r", encoding="utf-8") as result_fp: + result_json = json.load(result_fp) + result_json[ymd] = [] + else: + result_json = {ymd: []} + + if args.no_telegram: + practice.bot = type("T", (), {"sendMsg": lambda self, m: None})() + + practice.predict_fixed(result_json[ymd]) + mc_cap = max(0, args.max_total - len(result_json[ymd])) + + if args.exhaustive: + practice.predict_filter_exhaustive(ball_filter, df_ball, no, result_json[ymd]) + else: + practice.predict_filter_montecarlo( + ball_filter, + df_ball, + no, + result_json[ymd], + max_recommend=mc_cap, + max_tries=args.max_tries, + seed=args.seed, + ) + + with open(recommend_path, "w", encoding="utf-8") as out_fp: + json.dump(result_json, out_fp, ensure_ascii=False) + + prev_row = df_ball[df_ball["no"] == no - 1] + if prev_row.empty: + p_no, p_ball = no - 1, [] + print("경고: no-1={} 행이 없어 이전 당첨 표시를 건너뜁니다.".format(no - 1)) + else: + p_no = int(prev_row["no"].iloc[0]) + p_ball = [int(prev_row["b{}".format(i)].iloc[0]) for i in range(1, 7)] + + p_str = "[지난주] {}\n - {} 회차, {}\n[금주] {}\n - {} 회차\n[final_BallFilter]\n".format( + last_weekend, p_no, str(p_ball), ymd, no + ) + for i, ball in enumerate(result_json[ymd]): + p_str += " {}. {}\n".format((i + 1), str(ball)) + if (i + 1) % 100 == 0: + practice.bot.sendMsg("{}".format(p_str)) + p_str = "" + if len(result_json[ymd]) % 100 != 0: + practice.bot.sendMsg("{}".format(p_str)) + + size = len(result_json[ymd]) + print("recommend size: {}".format(size)) + print("저장: {}".format(recommend_path)) + print("done...") + + +if __name__ == "__main__": + main()