refactor: final_practice mirrors 3_Practice_22 (txt-based next no, exhaustive predict2)

Made-with: Cursor
This commit is contained in:
2026-04-08 19:41:04 +09:00
parent b440ec96c9
commit e31eefef09
2 changed files with 87 additions and 175 deletions

View File

@@ -15,7 +15,7 @@
- **`filter_model.py`** — `from final_BallFilter import BallFilter` 재노출. - **`filter_model.py`** — `from final_BallFilter import BallFilter` 재노출.
- **`train.py` / `valid.py`** — 구간별로 당첨 6개가 모든 필터를 통과한 회차 수 집계. - **`train.py` / `valid.py`** — 구간별로 당첨 6개가 모든 필터를 통과한 회차 수 집계.
- **`final_filterTest.py`** — `1_FilterTest_25.py`와 동일한 분석·(선택) MC 생존 추정. - **`final_filterTest.py`** — `1_FilterTest_25.py`와 동일한 분석·(선택) MC 생존 추정.
- **`final_practice.py`** — `3_Practice_22.py`같이 다음 회차용 추천 조합 생성(`final_BallFilter`, 기본은 Monte Carlo로 100개 미만). - **`final_practice.py`** — `3_Practice_22.py`동일 흐름(DataCrawler → 마지막 JSON 회차+1 크롤 → `predict1`+`predict2`). `lotto_history.txt``BallFilter`를 만들고 회차는 `max(no)+1`, `predict2`는 전 조합 순회(시간 매우 김).
## 실행 (miniconda **ncue**) ## 실행 (miniconda **ncue**)
@@ -26,8 +26,7 @@ python valid.py
python final_filterTest.py python final_filterTest.py
# 특정 회차 생존 조합 수 Monte Carlo 근사 # 특정 회차 생존 조합 수 Monte Carlo 근사
python final_filterTest.py --mc-no 900 --mc-samples 12000 python final_filterTest.py --mc-no 900 --mc-samples 12000
# 다음 회차 추천(네트워크·텔레그램 없이 로컬만) python final_practice.py
python final_practice.py --skip-data-crawl --skip-fetch-next --no-telegram
``` ```
동일 환경을 셸 스크립트로: 동일 환경을 셸 스크립트로:

View File

@@ -1,14 +1,14 @@
# `3_Practice_22.py` 흐름을 따르되 `final_BallFilter` + `lotto_history.txt`로 다음 회차 후보를 만듭니다. # `3_Practice_22.py`와 동일한 흐름입니다.
import argparse # - `resources/lotto_history.txt`(및 크롤 시 `.json`)를 사용합니다.
# - 필터만 `final_BallFilter.BallFilter` + `lotto_history.txt` 로딩으로 교체했습니다.
# - 회차 번호는 JSON의 `getNo(ymd)` 대신 `lotto_history.txt` 최대 회차 + 1 을 사용합니다.
import itertools import itertools
import json import json
import os import os
import random
import time import time
from datetime import datetime, timedelta from datetime import datetime, timedelta
import pandas as pd import pandas as pd
import requests
from DataCrawler import DataCrawler from DataCrawler import DataCrawler
from final_BallFilter import BallFilter from final_BallFilter import BallFilter
@@ -24,29 +24,31 @@ except Exception: # pragma: no cover
print(msg) print(msg)
def fetch_lotto_draw_json(drw_no: int):
"""동행복권 API 한 건. SSL·POST/GET 재시도는 DataCrawler._fetch_draw와 동일."""
return DataCrawler()._fetch_draw(int(drw_no))
class FinalPractice: class FinalPractice:
bot = None
def __init__(self, resources_path): def __init__(self, resources_path):
self.resources_path = resources_path
self.bot = TelegramBot() self.bot = TelegramBot()
def craw(self, lotto_history_base: str, drw_no=None): def craw(self, lottoHistoryFile, drwNo=None):
"""동행복권 API로 회차별 결과를 `lotto_history.json` / `.txt`에 저장합니다."""
ball = None ball = None
if drw_no is not None: if drwNo is not None:
json_fp = open(lotto_history_base + ".json", "a", encoding="utf-8") result = fetch_lotto_draw_json(drwNo)
text_fp = open(lotto_history_base + ".txt", "a", encoding="utf-8") if result is None:
url = "https://dhlottery.co.kr/common.do?method=getLottoNumber&drwNo=" + str(drw_no) print("경고: 회차 {} API 조회 실패(SSL/네트워크 또는 미추첨).".format(drwNo))
res = requests.post(url, timeout=30)
result = res.json()
if result.get("returnValue") != "success":
json_fp.close()
text_fp.close()
return None return None
json_fp.write(json.dumps(result, ensure_ascii=False) + "\n") jsonFp = open(lottoHistoryFile + ".json", "a", encoding="utf-8")
text_fp.write( textFp = open(lottoHistoryFile + ".txt", "a", encoding="utf-8")
jsonFp.write(json.dumps(result, ensure_ascii=False) + "\n")
textFp.write(
"%d,%d,%d,%d,%d,%d,%d,%d\n" "%d,%d,%d,%d,%d,%d,%d,%d\n"
% ( % (
drw_no, drwNo,
result["drwtNo1"], result["drwtNo1"],
result["drwtNo2"], result["drwtNo2"],
result["drwtNo3"], result["drwtNo3"],
@@ -59,7 +61,7 @@ class FinalPractice:
print( print(
"%d,%d,%d,%d,%d,%d,%d,%d" "%d,%d,%d,%d,%d,%d,%d,%d"
% ( % (
drw_no, drwNo,
result["drwtNo1"], result["drwtNo1"],
result["drwtNo2"], result["drwtNo2"],
result["drwtNo3"], result["drwtNo3"],
@@ -78,20 +80,18 @@ class FinalPractice:
result["drwtNo6"], result["drwtNo6"],
result["bnusNo"], result["bnusNo"],
] ]
json_fp.close() jsonFp.close()
text_fp.close() textFp.close()
else: else:
json_fp = open(lotto_history_base + ".json", "w", encoding="utf-8") jsonFp = open(lottoHistoryFile + ".json", "w", encoding="utf-8")
text_fp = open(lotto_history_base + ".txt", "w", encoding="utf-8") textFp = open(lottoHistoryFile + ".txt", "w", encoding="utf-8")
idx = 1 idx = 1
while True: while True:
url = "https://dhlottery.co.kr/common.do?method=getLottoNumber&drwNo=" + str(idx) result = fetch_lotto_draw_json(idx)
res = requests.post(url, timeout=30) if result is None or result.get("returnValue") != "success":
result = res.json()
if result.get("returnValue") != "success":
break break
json_fp.write(json.dumps(result, ensure_ascii=False) + "\n") jsonFp.write(json.dumps(result, ensure_ascii=False) + "\n")
text_fp.write( textFp.write(
"%d,%d,%d,%d,%d,%d,%d,%d\n" "%d,%d,%d,%d,%d,%d,%d,%d\n"
% ( % (
idx, idx,
@@ -128,73 +128,49 @@ class FinalPractice:
] ]
idx += 1 idx += 1
time.sleep(0.5) time.sleep(0.5)
json_fp.close() jsonFp.close()
text_fp.close() textFp.close()
return ball return ball
def predict_fixed(self, result_list): def predict1(self, result_json):
"""참고 스크립트와 동일하게 고정 6개 조합 1개를 넣습니다.""" result_json.append([6, 7, 10, 11, 20, 45])
result_list.append([6, 7, 10, 11, 20, 45])
def predict_filter_exhaustive(self, ball_filter: BallFilter, df_ball: pd.DataFrame, no: int, result_list): def predict2(self, resources_path, ymd, result_json):
"""전체 8,145,060조합을 순회합니다(시간이 매우 오래 걸릴 수 있음).""" candidates = [i for i in range(1, 46)]
candidates = list(range(1, 46))
for idx, ball in enumerate(itertools.combinations(candidates, 6)): lottoHistoryFileName = os.path.join(resources_path, "lotto_history.txt")
if idx % 1_000_000 == 0: df_ball = pd.read_csv(lottoHistoryFileName, header=None)
df_ball.columns = ["no", "b1", "b2", "b3", "b4", "b5", "b6", "bn"]
no = int(df_ball["no"].max()) + 1
print("회차: {}".format(no))
ballFilter = BallFilter(lottoHistoryFileName)
nCr = list(itertools.combinations(candidates, 6))
for idx, ball in enumerate(nCr):
if idx % 1000000 == 0:
print(" - {} processed...".format(idx)) print(" - {} processed...".format(idx))
ball = list(ball) ball = list(ball)
filter_type = ball_filter.filter(ball=ball, no=no, until_end=False, df=df_ball) filter_type = ballFilter.filter(ball=ball, no=no, until_end=False, df=df_ball)
if len(filter_type) > 0: filter_size = len(filter_type)
if 0 < filter_size:
continue continue
result_list.append(ball) result_json.append(ball)
def predict_filter_montecarlo( p_ball = df_ball[df_ball["no"] == no - 1].values.tolist()[0]
self, p_no = p_ball[0]
ball_filter: BallFilter, p_ball = p_ball[1:7]
df_ball: pd.DataFrame, return p_no, p_ball
no: int,
result_list: list,
max_recommend: int,
max_tries: int,
seed: int,
):
"""무작위 조합으로 필터를 통과하는 조합을 최대 `max_recommend`개까지 수집합니다."""
rng = random.Random(seed)
pool = list(range(1, 46))
tries = 0
seen = set()
while len(result_list) < max_recommend and tries < max_tries:
tries += 1
ball = sorted(rng.sample(pool, 6))
key = tuple(ball)
if key in seen:
continue
seen.add(key)
filter_type = ball_filter.filter(ball=ball, no=no, until_end=False, df=df_ball)
if filter_type:
continue
result_list.append(ball)
print(
"Monte Carlo: 수집 {}개 / 시도 {}회 (상한 {}회)".format(
len(result_list), tries, max_tries
)
)
def load_df(resources_path: str) -> pd.DataFrame: if __name__ == "__main__":
path = os.path.join(resources_path, "lotto_history.txt") PROJECT_HOME = "."
df_ball = pd.read_csv(path, header=None) resources_path = os.path.join(PROJECT_HOME, "resources")
df_ball.columns = ["no", "b1", "b2", "b3", "b4", "b5", "b6", "bn"]
return df_ball
dataCrawler = DataCrawler()
dataCrawler.excute(resources_path)
def next_draw_no(df_ball: pd.DataFrame) -> int:
"""파일에 있는 최신 회차의 다음 회차 번호."""
return int(df_ball["no"].max()) + 1
def weekend_ymd_strings():
"""표시용: 이번 주 토요일(추첨일) YMD, 지난 주 동일."""
today = datetime.today() today = datetime.today()
if today.weekday() == 5: if today.weekday() == 5:
if today.hour > 20: if today.hour > 20:
@@ -205,115 +181,52 @@ def weekend_ymd_strings():
this_weekend = today + timedelta(days=(12 - today.weekday())) this_weekend = today + timedelta(days=(12 - today.weekday()))
else: else:
this_weekend = today + timedelta(days=(5 - today.weekday())) this_weekend = today + timedelta(days=(5 - today.weekday()))
last_weekend = (this_weekend - timedelta(days=7)).strftime("%Y%m%d") last_weekend = (this_weekend - timedelta(days=7)).strftime("%Y%m%d")
ymd = this_weekend.strftime("%Y%m%d") ymd = this_weekend.strftime("%Y%m%d")
return ymd, last_weekend
print("ymd: {}".format(ymd))
def main():
parser = argparse.ArgumentParser(description="다음 회차 필터 통과 조합 추천 (final_BallFilter)")
parser.add_argument("--project-home", default=".", help="프로젝트 루트")
parser.add_argument(
"--max-total",
type=int,
default=99,
help="추천 조합 총 개수 상한(고정 1개 포함, 기본 99 = 100 미만)",
)
parser.add_argument("--max-tries", type=int, default=3_000_000, help="Monte Carlo 최대 시도 횟수")
parser.add_argument("--seed", type=int, default=0)
parser.add_argument("--exhaustive", action="store_true", help="전수 탐색(매우 느림)")
parser.add_argument("--skip-data-crawl", action="store_true", help="DataCrawler.excute 생략")
parser.add_argument("--skip-fetch-next", action="store_true", help="다음 회차 API 크롤 생략")
parser.add_argument("--no-telegram", action="store_true", help="텔레그램 전송 생략")
parser.add_argument("--draw-no", type=int, default=None, help="예측 대상 회차(미지정 시 파일 기준 다음 회차)")
args = parser.parse_args()
project_home = os.path.abspath(args.project_home)
resources_path = os.path.join(project_home, "resources")
os.makedirs(resources_path, exist_ok=True)
ymd, last_weekend = weekend_ymd_strings()
print("ymd(표시용): {}".format(ymd))
if not args.skip_data_crawl:
data_crawler = DataCrawler()
data_crawler.excute(resources_path)
lotto_history_base = os.path.join(project_home, "resources", "lotto_history")
lotto_json = lotto_history_base + ".json"
practice = FinalPractice(resources_path) practice = FinalPractice(resources_path)
if not args.skip_fetch_next and os.path.isfile(lotto_json):
with open(lotto_json, "r", encoding="utf-8") as f:
last_json = None
for line in f:
line = line.strip()
if line:
last_json = json.loads(line)
if last_json is not None:
nxt = int(last_json["drwNo"]) + 1
practice.craw(lotto_history_base, drw_no=nxt)
df_ball = load_df(resources_path) lottoHistoryFile = PROJECT_HOME + "/resources/lotto_history"
no = args.draw_no if args.draw_no is not None else next_draw_no(df_ball) lottoHistoryFileName = lottoHistoryFile + ".json"
print("예측 대상 회차 no: {}".format(no)) with open(lottoHistoryFileName, "r", encoding="utf-8") as f:
for line in f:
pass
last_json = json.loads(line)
lotto_txt = os.path.join(resources_path, "lotto_history.txt") ball = practice.craw(lottoHistoryFile, drwNo=last_json["drwNo"] + 1)
ball_filter = BallFilter(lotto_txt)
recommend_path = os.path.join(resources_path, "recommend_ball.final.json") recommend_result_file = os.path.join(resources_path, "recommend_ball.final.json")
if os.path.isfile(recommend_path): if os.path.isfile(recommend_result_file):
with open(recommend_path, "r", encoding="utf-8") as result_fp: result_fp = open(recommend_result_file, "r", encoding="utf-8")
result_json = json.load(result_fp) result_json = json.load(result_fp)
result_fp.close()
result_json[ymd] = [] result_json[ymd] = []
else: else:
result_json = {ymd: []} result_json = {ymd: []}
if args.no_telegram: practice.predict1(result_json[ymd])
practice.bot = type("T", (), {"sendMsg": lambda self, m: None})() p_no, p_ball = practice.predict2(resources_path, ymd, result_json[ymd])
practice.predict_fixed(result_json[ymd]) with open(recommend_result_file, "w", encoding="utf-8") as outFp:
mc_cap = max(0, args.max_total - len(result_json[ymd])) json.dump(result_json, outFp, ensure_ascii=False)
if args.exhaustive:
practice.predict_filter_exhaustive(ball_filter, df_ball, no, result_json[ymd])
else:
practice.predict_filter_montecarlo(
ball_filter,
df_ball,
no,
result_json[ymd],
max_recommend=mc_cap,
max_tries=args.max_tries,
seed=args.seed,
)
with open(recommend_path, "w", encoding="utf-8") as out_fp:
json.dump(result_json, out_fp, ensure_ascii=False)
prev_row = df_ball[df_ball["no"] == no - 1]
if prev_row.empty:
p_no, p_ball = no - 1, []
print("경고: no-1={} 행이 없어 이전 당첨 표시를 건너뜁니다.".format(no - 1))
else:
p_no = int(prev_row["no"].iloc[0])
p_ball = [int(prev_row["b{}".format(i)].iloc[0]) for i in range(1, 7)]
no_predict = int(p_no) + 1
p_str = "[지난주] {}\n - {} 회차, {}\n[금주] {}\n - {} 회차\n[final_BallFilter]\n".format( p_str = "[지난주] {}\n - {} 회차, {}\n[금주] {}\n - {} 회차\n[final_BallFilter]\n".format(
last_weekend, p_no, str(p_ball), ymd, no last_weekend, p_no, str(p_ball), ymd, no_predict
) )
for i, ball in enumerate(result_json[ymd]): for i, ball in enumerate(result_json[ymd]):
p_str += " {}. {}\n".format((i + 1), str(ball)) p_str += " {}. {}\n".format((i + 1), str(ball))
if (i + 1) % 100 == 0: if (i + 1) % 100 == 0:
practice.bot.sendMsg("{}".format(p_str)) practice.bot.sendMsg("{}".format(p_str))
p_str = "" p_str = ""
if len(result_json[ymd]) % 100 != 0: if len(result_json[ymd]) % 100 != 0:
practice.bot.sendMsg("{}".format(p_str)) practice.bot.sendMsg("{}".format(p_str))
size = len(result_json[ymd]) size = len(result_json[ymd])
print("recommend size: {}".format(size)) print("size: {}".format(size))
print("저장: {}".format(recommend_path))
print("done...") print("done...")
if __name__ == "__main__":
main()