# Follows the flow of `3_Practice_22.py`, but builds candidates for the next
# draw using `final_BallFilter` + `lotto_history.txt`.
import argparse
import itertools
import json
import os
import random
import time
from datetime import datetime, timedelta

import pandas as pd
import requests

from DataCrawler import DataCrawler
from final_BallFilter import BallFilter

try:
    from TelegramBot import TelegramBot
except Exception:  # pragma: no cover

    class TelegramBot:
        """Fallback used when `TelegramBot` cannot be imported: prints messages."""

        def __init__(self, enable=True):
            pass

        def sendMsg(self, msg):
            print(msg)


# Endpoint queried once per draw; the draw number is appended to the URL.
_LOTTO_API_URL = "https://dhlottery.co.kr/common.do?method=getLottoNumber&drwNo="

# JSON keys of the six main numbers followed by the bonus number.
_BALL_KEYS = ("drwtNo1", "drwtNo2", "drwtNo3", "drwtNo4", "drwtNo5", "drwtNo6", "bnusNo")

# One history row: draw number, six numbers, bonus number.
_HISTORY_LINE_FORMAT = "%d,%d,%d,%d,%d,%d,%d,%d"


class _SilentBot:
    """Bot stand-in that silently drops every message (used for --no-telegram)."""

    def sendMsg(self, msg):
        return None


def _fetch_draw(drw_no):
    """Request one draw from the API.

    Returns the decoded JSON dict, or None when the response's
    "returnValue" field is anything other than "success".
    """
    res = requests.post(_LOTTO_API_URL + str(drw_no), timeout=30)
    result = res.json()
    if result.get("returnValue") != "success":
        return None
    return result


def _record_draw(json_fp, text_fp, drw_no, result):
    """Append one draw to both history files, echo it, and return its numbers.

    The numbers are extracted before anything is written so that a response
    missing a key does not leave a JSON line without a matching text line.
    """
    ball = [result[key] for key in _BALL_KEYS]
    line = _HISTORY_LINE_FORMAT % (drw_no, *ball)
    json_fp.write(json.dumps(result, ensure_ascii=False) + "\n")
    text_fp.write(line + "\n")
    print(line)
    return ball


class FinalPractice:
    """Collects draw history and builds the list of recommended combinations."""

    def __init__(self, resources_path):
        self.resources_path = resources_path
        self.bot = TelegramBot()

    def craw(self, lotto_history_base: str, drw_no=None):
        """Store draw results in `<lotto_history_base>.json` and `.txt`.

        Args:
            lotto_history_base: Path prefix; ".json" and ".txt" are appended.
            drw_no: When given, only that draw is fetched and appended to the
                existing files. When None, both files are truncated and every
                draw starting from 1 is fetched until the API stops reporting
                success.

        Returns:
            The last recorded draw as `[n1, ..., n6, bonus]`, or None when
            nothing was recorded.
        """
        ball = None
        mode = "a" if drw_no is not None else "w"
        # `with` guarantees both handles are closed even if the request or the
        # JSON decoding raises.
        with open(lotto_history_base + ".json", mode, encoding="utf-8") as json_fp, \
                open(lotto_history_base + ".txt", mode, encoding="utf-8") as text_fp:
            if drw_no is not None:
                result = _fetch_draw(drw_no)
                if result is not None:
                    ball = _record_draw(json_fp, text_fp, drw_no, result)
            else:
                idx = 1
                while True:
                    result = _fetch_draw(idx)
                    if result is None:
                        break
                    ball = _record_draw(json_fp, text_fp, idx, result)
                    idx += 1
                    # Pause between consecutive requests.
                    time.sleep(0.5)
        return ball

    def predict_fixed(self, result_list):
        """Append one fixed six-number combination, as the reference script does."""
        result_list.append([6, 7, 10, 11, 20, 45])

    def predict_filter_exhaustive(self, ball_filter: BallFilter, df_ball: pd.DataFrame, no: int, result_list):
        """Walk all 8,145,060 combinations of 1..45 (may take a very long time).

        Every combination for which `ball_filter.filter` returns an empty
        result is appended to `result_list`.
        """
        candidates = list(range(1, 46))
        for idx, combo in enumerate(itertools.combinations(candidates, 6)):
            if idx % 1_000_000 == 0:
                print(" - {} processed...".format(idx))
            ball = list(combo)
            filter_type = ball_filter.filter(ball=ball, no=no, until_end=False, df=df_ball)
            if len(filter_type) > 0:
                continue
            result_list.append(ball)

    def predict_filter_montecarlo(
        self,
        ball_filter: BallFilter,
        df_ball: pd.DataFrame,
        no: int,
        result_list: list,
        max_recommend: int,
        max_tries: int,
        seed: int,
    ):
        """Sample random combinations and keep those that pass the filter.

        Sampling stops once `result_list` holds `max_recommend` entries in
        total (entries already present count towards the limit) or after
        `max_tries` draws, whichever comes first.

        Args:
            ball_filter: Filter whose `filter(...)` result is falsy for an
                accepted combination.
            df_ball: Draw history passed through to the filter.
            no: Target draw number passed through to the filter.
            result_list: List extended in place with accepted combinations.
            max_recommend: Upper bound on `len(result_list)`.
            max_tries: Upper bound on the number of random samples.
            seed: Seed for the private random generator.
        """
        rng = random.Random(seed)
        pool = list(range(1, 46))
        tries = 0
        # Start from what is already recommended so the same combination is
        # never listed twice.
        seen = {tuple(sorted(existing)) for existing in result_list}
        while len(result_list) < max_recommend and tries < max_tries:
            tries += 1
            ball = sorted(rng.sample(pool, 6))
            key = tuple(ball)
            if key in seen:
                continue
            seen.add(key)
            filter_type = ball_filter.filter(ball=ball, no=no, until_end=False, df=df_ball)
            if filter_type:
                continue
            result_list.append(ball)
        print(
            "Monte Carlo: 수집 {}개 / 시도 {}회 (상한 {}회)".format(
                len(result_list), tries, max_tries
            )
        )


def load_df(resources_path: str) -> pd.DataFrame:
    """Load `lotto_history.txt` from `resources_path` as a headerless CSV."""
    path = os.path.join(resources_path, "lotto_history.txt")
    df_ball = pd.read_csv(path, header=None)
    df_ball.columns = ["no", "b1", "b2", "b3", "b4", "b5", "b6", "bn"]
    return df_ball


def next_draw_no(df_ball: pd.DataFrame) -> int:
    """Return the draw number following the latest one present in the file."""
    return int(df_ball["no"].max()) + 1


def weekend_ymd_strings():
    """Return display strings `(this Saturday, previous Saturday)` as YYYYMMDD.

    On a Saturday after the 20:00 hour (hour > 20) and on Sundays, "this
    Saturday" rolls over to the following week's Saturday.
    """
    today = datetime.today()
    weekday = today.weekday()  # Monday == 0 ... Saturday == 5, Sunday == 6
    rolled_over = weekday == 6 or (weekday == 5 and today.hour > 20)
    offset = (12 if rolled_over else 5) - weekday
    this_weekend = today + timedelta(days=offset)
    last_weekend = (this_weekend - timedelta(days=7)).strftime("%Y%m%d")
    ymd = this_weekend.strftime("%Y%m%d")
    return ymd, last_weekend


def main():
    """Command-line entry point: refresh history, build recommendations, report."""
    parser = argparse.ArgumentParser(description="다음 회차 필터 통과 조합 추천 (final_BallFilter)")
    parser.add_argument("--project-home", default=".", help="프로젝트 루트")
    parser.add_argument(
        "--max-total",
        type=int,
        default=99,
        help="추천 조합 총 개수 상한(고정 1개 포함, 기본 99 = 100 미만)",
    )
    parser.add_argument("--max-tries", type=int, default=3_000_000, help="Monte Carlo 최대 시도 횟수")
    parser.add_argument("--seed", type=int, default=0)
    parser.add_argument("--exhaustive", action="store_true", help="전수 탐색(매우 느림)")
    parser.add_argument("--skip-data-crawl", action="store_true", help="DataCrawler.excute 생략")
    parser.add_argument("--skip-fetch-next", action="store_true", help="다음 회차 API 크롤 생략")
    parser.add_argument("--no-telegram", action="store_true", help="텔레그램 전송 생략")
    parser.add_argument("--draw-no", type=int, default=None, help="예측 대상 회차(미지정 시 파일 기준 다음 회차)")
    args = parser.parse_args()

    project_home = os.path.abspath(args.project_home)
    resources_path = os.path.join(project_home, "resources")
    os.makedirs(resources_path, exist_ok=True)

    ymd, last_weekend = weekend_ymd_strings()
    print("ymd(표시용): {}".format(ymd))

    if not args.skip_data_crawl:
        data_crawler = DataCrawler()
        data_crawler.excute(resources_path)

    lotto_history_base = os.path.join(resources_path, "lotto_history")
    lotto_json = lotto_history_base + ".json"

    practice = FinalPractice(resources_path)

    if not args.skip_fetch_next and os.path.isfile(lotto_json):
        # Only the last non-empty line matters, so remember it and decode once.
        last_line = None
        with open(lotto_json, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if line:
                    last_line = line
        # The history file is closed before `craw` reopens it for appending.
        if last_line is not None:
            nxt = int(json.loads(last_line)["drwNo"]) + 1
            practice.craw(lotto_history_base, drw_no=nxt)

    df_ball = load_df(resources_path)
    no = args.draw_no if args.draw_no is not None else next_draw_no(df_ball)
    print("예측 대상 회차 no: {}".format(no))

    lotto_txt = os.path.join(resources_path, "lotto_history.txt")
    ball_filter = BallFilter(lotto_txt)

    recommend_path = os.path.join(resources_path, "recommend_ball.final.json")
    if os.path.isfile(recommend_path):
        with open(recommend_path, "r", encoding="utf-8") as result_fp:
            result_json = json.load(result_fp)
        # Recommendations for this date are always rebuilt from scratch.
        result_json[ymd] = []
    else:
        result_json = {ymd: []}
    recommended = result_json[ymd]

    if args.no_telegram:
        practice.bot = _SilentBot()

    practice.predict_fixed(recommended)

    if args.exhaustive:
        practice.predict_filter_exhaustive(ball_filter, df_ball, no, recommended)
    else:
        # The sampler compares the TOTAL list length against `max_recommend`,
        # so pass the total cap itself; subtracting the fixed entry here as
        # well would leave the list one short of --max-total.
        practice.predict_filter_montecarlo(
            ball_filter,
            df_ball,
            no,
            recommended,
            max_recommend=max(0, args.max_total),
            max_tries=args.max_tries,
            seed=args.seed,
        )

    with open(recommend_path, "w", encoding="utf-8") as out_fp:
        json.dump(result_json, out_fp, ensure_ascii=False)

    prev_row = df_ball[df_ball["no"] == no - 1]
    if prev_row.empty:
        p_no, p_ball = no - 1, []
        print("경고: no-1={} 행이 없어 이전 당첨 표시를 건너뜁니다.".format(no - 1))
    else:
        p_no = int(prev_row["no"].iloc[0])
        p_ball = [int(prev_row["b{}".format(i)].iloc[0]) for i in range(1, 7)]

    p_str = "[지난주] {}\n - {} 회차, {}\n[금주] {}\n - {} 회차\n[final_BallFilter]\n".format(
        last_weekend, p_no, str(p_ball), ymd, no
    )
    # Send the report in chunks of 100 entries per message.
    for i, ball in enumerate(recommended):
        p_str += " {}. {}\n".format((i + 1), str(ball))
        if (i + 1) % 100 == 0:
            practice.bot.sendMsg(p_str)
            p_str = ""
    if len(recommended) % 100 != 0:
        practice.bot.sendMsg(p_str)

    size = len(recommended)
    print("recommend size: {}".format(size))
    print("저장: {}".format(recommend_path))
    print("done...")


if __name__ == "__main__":
    main()