diff --git a/README.md b/README.md index 4294d18..e38219e 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ - **`filter_model.py`** — `from final_BallFilter import BallFilter` 재노출. - **`train.py` / `valid.py`** — 구간별로 당첨 6개가 모든 필터를 통과한 회차 수 집계. - **`final_filterTest.py`** — `1_FilterTest_25.py`와 동일한 분석·(선택) MC 생존 추정. -- **`final_practice.py`** — `3_Practice_22.py`와 같이 다음 회차용 추천 조합 생성(`final_BallFilter`, 기본은 Monte Carlo로 100개 미만). +- **`final_practice.py`** — `3_Practice_22.py`와 동일 흐름(DataCrawler → 마지막 JSON 회차+1 크롤 → `predict1`+`predict2`). `lotto_history.txt`로 `BallFilter`를 만들고 회차는 `max(no)+1`, `predict2`는 전 조합 순회(시간 매우 김). ## 실행 (miniconda **ncue**) @@ -26,8 +26,7 @@ python valid.py python final_filterTest.py # 특정 회차 생존 조합 수 Monte Carlo 근사 python final_filterTest.py --mc-no 900 --mc-samples 12000 -# 다음 회차 추천(네트워크·텔레그램 없이 로컬만) -python final_practice.py --skip-data-crawl --skip-fetch-next --no-telegram +python final_practice.py ``` 동일 환경을 셸 스크립트로: diff --git a/final_practice.py b/final_practice.py index 74545a0..12ce42b 100644 --- a/final_practice.py +++ b/final_practice.py @@ -1,14 +1,14 @@ -# `3_Practice_22.py` 흐름을 따르되 `final_BallFilter` + `lotto_history.txt`로 다음 회차 후보를 만듭니다. -import argparse +# `3_Practice_22.py`와 동일한 흐름입니다. +# - `resources/lotto_history.txt`(및 크롤 시 `.json`)를 사용합니다. +# - 필터만 `final_BallFilter.BallFilter` + `lotto_history.txt` 로딩으로 교체했습니다. +# - 회차 번호는 JSON의 `getNo(ymd)` 대신 `lotto_history.txt` 최대 회차 + 1 을 사용합니다. import itertools import json import os -import random import time from datetime import datetime, timedelta import pandas as pd -import requests from DataCrawler import DataCrawler from final_BallFilter import BallFilter @@ -24,29 +24,31 @@ except Exception: # pragma: no cover print(msg) +def fetch_lotto_draw_json(drw_no: int): + """동행복권 API 한 건. SSL·POST/GET 재시도는 DataCrawler._fetch_draw와 동일.""" + return DataCrawler()._fetch_draw(int(drw_no)) + + class FinalPractice: + bot = None + def __init__(self, resources_path): - self.resources_path = resources_path self.bot = TelegramBot() - def craw(self, lotto_history_base: str, drw_no=None): - """동행복권 API로 회차별 결과를 `lotto_history.json` / `.txt`에 저장합니다.""" + def craw(self, lottoHistoryFile, drwNo=None): ball = None - if drw_no is not None: - json_fp = open(lotto_history_base + ".json", "a", encoding="utf-8") - text_fp = open(lotto_history_base + ".txt", "a", encoding="utf-8") - url = "https://dhlottery.co.kr/common.do?method=getLottoNumber&drwNo=" + str(drw_no) - res = requests.post(url, timeout=30) - result = res.json() - if result.get("returnValue") != "success": - json_fp.close() - text_fp.close() + if drwNo is not None: + result = fetch_lotto_draw_json(drwNo) + if result is None: + print("경고: 회차 {} API 조회 실패(SSL/네트워크 또는 미추첨).".format(drwNo)) return None - json_fp.write(json.dumps(result, ensure_ascii=False) + "\n") - text_fp.write( + jsonFp = open(lottoHistoryFile + ".json", "a", encoding="utf-8") + textFp = open(lottoHistoryFile + ".txt", "a", encoding="utf-8") + jsonFp.write(json.dumps(result, ensure_ascii=False) + "\n") + textFp.write( "%d,%d,%d,%d,%d,%d,%d,%d\n" % ( - drw_no, + drwNo, result["drwtNo1"], result["drwtNo2"], result["drwtNo3"], @@ -59,7 +61,7 @@ class FinalPractice: print( "%d,%d,%d,%d,%d,%d,%d,%d" % ( - drw_no, + drwNo, result["drwtNo1"], result["drwtNo2"], result["drwtNo3"], @@ -78,20 +80,18 @@ class FinalPractice: result["drwtNo6"], result["bnusNo"], ] - json_fp.close() - text_fp.close() + jsonFp.close() + textFp.close() else: - json_fp = open(lotto_history_base + ".json", "w", encoding="utf-8") - text_fp = open(lotto_history_base + ".txt", "w", encoding="utf-8") + jsonFp = open(lottoHistoryFile + ".json", "w", encoding="utf-8") + textFp = open(lottoHistoryFile + ".txt", "w", encoding="utf-8") idx = 1 while True: - url = "https://dhlottery.co.kr/common.do?method=getLottoNumber&drwNo=" + str(idx) - res = requests.post(url, timeout=30) - result = res.json() - if result.get("returnValue") != "success": + result = fetch_lotto_draw_json(idx) + if result is None or result.get("returnValue") != "success": break - json_fp.write(json.dumps(result, ensure_ascii=False) + "\n") - text_fp.write( + jsonFp.write(json.dumps(result, ensure_ascii=False) + "\n") + textFp.write( "%d,%d,%d,%d,%d,%d,%d,%d\n" % ( idx, @@ -128,73 +128,49 @@ class FinalPractice: ] idx += 1 time.sleep(0.5) - json_fp.close() - text_fp.close() + jsonFp.close() + textFp.close() return ball - def predict_fixed(self, result_list): - """참고 스크립트와 동일하게 고정 6개 조합 1개를 넣습니다.""" - result_list.append([6, 7, 10, 11, 20, 45]) + def predict1(self, result_json): + result_json.append([6, 7, 10, 11, 20, 45]) - def predict_filter_exhaustive(self, ball_filter: BallFilter, df_ball: pd.DataFrame, no: int, result_list): - """전체 8,145,060조합을 순회합니다(시간이 매우 오래 걸릴 수 있음).""" - candidates = list(range(1, 46)) - for idx, ball in enumerate(itertools.combinations(candidates, 6)): - if idx % 1_000_000 == 0: + def predict2(self, resources_path, ymd, result_json): + candidates = [i for i in range(1, 46)] + + lottoHistoryFileName = os.path.join(resources_path, "lotto_history.txt") + df_ball = pd.read_csv(lottoHistoryFileName, header=None) + df_ball.columns = ["no", "b1", "b2", "b3", "b4", "b5", "b6", "bn"] + + no = int(df_ball["no"].max()) + 1 + print("회차: {}".format(no)) + + ballFilter = BallFilter(lottoHistoryFileName) + + nCr = list(itertools.combinations(candidates, 6)) + for idx, ball in enumerate(nCr): + if idx % 1000000 == 0: print(" - {} processed...".format(idx)) ball = list(ball) - filter_type = ball_filter.filter(ball=ball, no=no, until_end=False, df=df_ball) - if len(filter_type) > 0: + filter_type = ballFilter.filter(ball=ball, no=no, until_end=False, df=df_ball) + filter_size = len(filter_type) + if 0 < filter_size: continue - result_list.append(ball) + result_json.append(ball) - def predict_filter_montecarlo( - self, - ball_filter: BallFilter, - df_ball: pd.DataFrame, - no: int, - result_list: list, - max_recommend: int, - max_tries: int, - seed: int, - ): - """무작위 조합으로 필터를 통과하는 조합을 최대 `max_recommend`개까지 수집합니다.""" - rng = random.Random(seed) - pool = list(range(1, 46)) - tries = 0 - seen = set() - while len(result_list) < max_recommend and tries < max_tries: - tries += 1 - ball = sorted(rng.sample(pool, 6)) - key = tuple(ball) - if key in seen: - continue - seen.add(key) - filter_type = ball_filter.filter(ball=ball, no=no, until_end=False, df=df_ball) - if filter_type: - continue - result_list.append(ball) - print( - "Monte Carlo: 수집 {}개 / 시도 {}회 (상한 {}회)".format( - len(result_list), tries, max_tries - ) - ) + p_ball = df_ball[df_ball["no"] == no - 1].values.tolist()[0] + p_no = p_ball[0] + p_ball = p_ball[1:7] + return p_no, p_ball -def load_df(resources_path: str) -> pd.DataFrame: - path = os.path.join(resources_path, "lotto_history.txt") - df_ball = pd.read_csv(path, header=None) - df_ball.columns = ["no", "b1", "b2", "b3", "b4", "b5", "b6", "bn"] - return df_ball +if __name__ == "__main__": + PROJECT_HOME = "." + resources_path = os.path.join(PROJECT_HOME, "resources") + dataCrawler = DataCrawler() + dataCrawler.excute(resources_path) -def next_draw_no(df_ball: pd.DataFrame) -> int: - """파일에 있는 최신 회차의 다음 회차 번호.""" - return int(df_ball["no"].max()) + 1 - - -def weekend_ymd_strings(): - """표시용: 이번 주 토요일(추첨일) YMD, 지난 주 동일.""" today = datetime.today() if today.weekday() == 5: if today.hour > 20: @@ -205,115 +181,52 @@ def weekend_ymd_strings(): this_weekend = today + timedelta(days=(12 - today.weekday())) else: this_weekend = today + timedelta(days=(5 - today.weekday())) + last_weekend = (this_weekend - timedelta(days=7)).strftime("%Y%m%d") ymd = this_weekend.strftime("%Y%m%d") - return ymd, last_weekend + print("ymd: {}".format(ymd)) -def main(): - parser = argparse.ArgumentParser(description="다음 회차 필터 통과 조합 추천 (final_BallFilter)") - parser.add_argument("--project-home", default=".", help="프로젝트 루트") - parser.add_argument( - "--max-total", - type=int, - default=99, - help="추천 조합 총 개수 상한(고정 1개 포함, 기본 99 = 100 미만)", - ) - parser.add_argument("--max-tries", type=int, default=3_000_000, help="Monte Carlo 최대 시도 횟수") - parser.add_argument("--seed", type=int, default=0) - parser.add_argument("--exhaustive", action="store_true", help="전수 탐색(매우 느림)") - parser.add_argument("--skip-data-crawl", action="store_true", help="DataCrawler.excute 생략") - parser.add_argument("--skip-fetch-next", action="store_true", help="다음 회차 API 크롤 생략") - parser.add_argument("--no-telegram", action="store_true", help="텔레그램 전송 생략") - parser.add_argument("--draw-no", type=int, default=None, help="예측 대상 회차(미지정 시 파일 기준 다음 회차)") - args = parser.parse_args() - - project_home = os.path.abspath(args.project_home) - resources_path = os.path.join(project_home, "resources") - os.makedirs(resources_path, exist_ok=True) - - ymd, last_weekend = weekend_ymd_strings() - print("ymd(표시용): {}".format(ymd)) - - if not args.skip_data_crawl: - data_crawler = DataCrawler() - data_crawler.excute(resources_path) - - lotto_history_base = os.path.join(project_home, "resources", "lotto_history") - lotto_json = lotto_history_base + ".json" practice = FinalPractice(resources_path) - if not args.skip_fetch_next and os.path.isfile(lotto_json): - with open(lotto_json, "r", encoding="utf-8") as f: - last_json = None - for line in f: - line = line.strip() - if line: - last_json = json.loads(line) - if last_json is not None: - nxt = int(last_json["drwNo"]) + 1 - practice.craw(lotto_history_base, drw_no=nxt) - df_ball = load_df(resources_path) - no = args.draw_no if args.draw_no is not None else next_draw_no(df_ball) - print("예측 대상 회차 no: {}".format(no)) + lottoHistoryFile = PROJECT_HOME + "/resources/lotto_history" + lottoHistoryFileName = lottoHistoryFile + ".json" + with open(lottoHistoryFileName, "r", encoding="utf-8") as f: + for line in f: + pass + last_json = json.loads(line) - lotto_txt = os.path.join(resources_path, "lotto_history.txt") - ball_filter = BallFilter(lotto_txt) + ball = practice.craw(lottoHistoryFile, drwNo=last_json["drwNo"] + 1) - recommend_path = os.path.join(resources_path, "recommend_ball.final.json") - if os.path.isfile(recommend_path): - with open(recommend_path, "r", encoding="utf-8") as result_fp: - result_json = json.load(result_fp) + recommend_result_file = os.path.join(resources_path, "recommend_ball.final.json") + if os.path.isfile(recommend_result_file): + result_fp = open(recommend_result_file, "r", encoding="utf-8") + result_json = json.load(result_fp) + result_fp.close() result_json[ymd] = [] else: result_json = {ymd: []} - if args.no_telegram: - practice.bot = type("T", (), {"sendMsg": lambda self, m: None})() + practice.predict1(result_json[ymd]) + p_no, p_ball = practice.predict2(resources_path, ymd, result_json[ymd]) - practice.predict_fixed(result_json[ymd]) - mc_cap = max(0, args.max_total - len(result_json[ymd])) - - if args.exhaustive: - practice.predict_filter_exhaustive(ball_filter, df_ball, no, result_json[ymd]) - else: - practice.predict_filter_montecarlo( - ball_filter, - df_ball, - no, - result_json[ymd], - max_recommend=mc_cap, - max_tries=args.max_tries, - seed=args.seed, - ) - - with open(recommend_path, "w", encoding="utf-8") as out_fp: - json.dump(result_json, out_fp, ensure_ascii=False) - - prev_row = df_ball[df_ball["no"] == no - 1] - if prev_row.empty: - p_no, p_ball = no - 1, [] - print("경고: no-1={} 행이 없어 이전 당첨 표시를 건너뜁니다.".format(no - 1)) - else: - p_no = int(prev_row["no"].iloc[0]) - p_ball = [int(prev_row["b{}".format(i)].iloc[0]) for i in range(1, 7)] + with open(recommend_result_file, "w", encoding="utf-8") as outFp: + json.dump(result_json, outFp, ensure_ascii=False) + no_predict = int(p_no) + 1 p_str = "[지난주] {}\n - {} 회차, {}\n[금주] {}\n - {} 회차\n[final_BallFilter]\n".format( - last_weekend, p_no, str(p_ball), ymd, no + last_weekend, p_no, str(p_ball), ymd, no_predict ) for i, ball in enumerate(result_json[ymd]): p_str += " {}. {}\n".format((i + 1), str(ball)) if (i + 1) % 100 == 0: practice.bot.sendMsg("{}".format(p_str)) p_str = "" + if len(result_json[ymd]) % 100 != 0: practice.bot.sendMsg("{}".format(p_str)) size = len(result_json[ymd]) - print("recommend size: {}".format(size)) - print("저장: {}".format(recommend_path)) + print("size: {}".format(size)) + print("done...") - - -if __name__ == "__main__": - main()