Add final_practice.py for next-draw recommendations via final_BallFilter

- Mirrors 3_Practice_22 flow: DataCrawler, optional API crawl, fixed combo,
  Monte Carlo filtered samples (default) or exhaustive mode
- Caps total recommendations under 100; saves recommend_ball.final.json

Made-with: Cursor
This commit is contained in:
2026-04-08 19:33:26 +09:00
parent 52e8495148
commit d08e906066
2 changed files with 322 additions and 0 deletions

319
final_practice.py Normal file
View File

@@ -0,0 +1,319 @@
# `3_Practice_22.py` 흐름을 따르되 `final_BallFilter` + `lotto_history.txt`로 다음 회차 후보를 만듭니다.
import argparse
import itertools
import json
import os
import random
import time
from datetime import datetime, timedelta
import pandas as pd
import requests
from DataCrawler import DataCrawler
from final_BallFilter import BallFilter
try:
from TelegramBot import TelegramBot
except Exception: # pragma: no cover
class TelegramBot:
def __init__(self, enable=True):
pass
def sendMsg(self, msg):
print(msg)
class FinalPractice:
def __init__(self, resources_path):
self.resources_path = resources_path
self.bot = TelegramBot()
def craw(self, lotto_history_base: str, drw_no=None):
"""동행복권 API로 회차별 결과를 `lotto_history.json` / `.txt`에 저장합니다."""
ball = None
if drw_no is not None:
json_fp = open(lotto_history_base + ".json", "a", encoding="utf-8")
text_fp = open(lotto_history_base + ".txt", "a", encoding="utf-8")
url = "https://dhlottery.co.kr/common.do?method=getLottoNumber&drwNo=" + str(drw_no)
res = requests.post(url, timeout=30)
result = res.json()
if result.get("returnValue") != "success":
json_fp.close()
text_fp.close()
return None
json_fp.write(json.dumps(result, ensure_ascii=False) + "\n")
text_fp.write(
"%d,%d,%d,%d,%d,%d,%d,%d\n"
% (
drw_no,
result["drwtNo1"],
result["drwtNo2"],
result["drwtNo3"],
result["drwtNo4"],
result["drwtNo5"],
result["drwtNo6"],
result["bnusNo"],
)
)
print(
"%d,%d,%d,%d,%d,%d,%d,%d"
% (
drw_no,
result["drwtNo1"],
result["drwtNo2"],
result["drwtNo3"],
result["drwtNo4"],
result["drwtNo5"],
result["drwtNo6"],
result["bnusNo"],
)
)
ball = [
result["drwtNo1"],
result["drwtNo2"],
result["drwtNo3"],
result["drwtNo4"],
result["drwtNo5"],
result["drwtNo6"],
result["bnusNo"],
]
json_fp.close()
text_fp.close()
else:
json_fp = open(lotto_history_base + ".json", "w", encoding="utf-8")
text_fp = open(lotto_history_base + ".txt", "w", encoding="utf-8")
idx = 1
while True:
url = "https://dhlottery.co.kr/common.do?method=getLottoNumber&drwNo=" + str(idx)
res = requests.post(url, timeout=30)
result = res.json()
if result.get("returnValue") != "success":
break
json_fp.write(json.dumps(result, ensure_ascii=False) + "\n")
text_fp.write(
"%d,%d,%d,%d,%d,%d,%d,%d\n"
% (
idx,
result["drwtNo1"],
result["drwtNo2"],
result["drwtNo3"],
result["drwtNo4"],
result["drwtNo5"],
result["drwtNo6"],
result["bnusNo"],
)
)
print(
"%d,%d,%d,%d,%d,%d,%d,%d"
% (
idx,
result["drwtNo1"],
result["drwtNo2"],
result["drwtNo3"],
result["drwtNo4"],
result["drwtNo5"],
result["drwtNo6"],
result["bnusNo"],
)
)
ball = [
result["drwtNo1"],
result["drwtNo2"],
result["drwtNo3"],
result["drwtNo4"],
result["drwtNo5"],
result["drwtNo6"],
result["bnusNo"],
]
idx += 1
time.sleep(0.5)
json_fp.close()
text_fp.close()
return ball
def predict_fixed(self, result_list):
"""참고 스크립트와 동일하게 고정 6개 조합 1개를 넣습니다."""
result_list.append([6, 7, 10, 11, 20, 45])
def predict_filter_exhaustive(self, ball_filter: BallFilter, df_ball: pd.DataFrame, no: int, result_list):
"""전체 8,145,060조합을 순회합니다(시간이 매우 오래 걸릴 수 있음)."""
candidates = list(range(1, 46))
for idx, ball in enumerate(itertools.combinations(candidates, 6)):
if idx % 1_000_000 == 0:
print(" - {} processed...".format(idx))
ball = list(ball)
filter_type = ball_filter.filter(ball=ball, no=no, until_end=False, df=df_ball)
if len(filter_type) > 0:
continue
result_list.append(ball)
def predict_filter_montecarlo(
self,
ball_filter: BallFilter,
df_ball: pd.DataFrame,
no: int,
result_list: list,
max_recommend: int,
max_tries: int,
seed: int,
):
"""무작위 조합으로 필터를 통과하는 조합을 최대 `max_recommend`개까지 수집합니다."""
rng = random.Random(seed)
pool = list(range(1, 46))
tries = 0
seen = set()
while len(result_list) < max_recommend and tries < max_tries:
tries += 1
ball = sorted(rng.sample(pool, 6))
key = tuple(ball)
if key in seen:
continue
seen.add(key)
filter_type = ball_filter.filter(ball=ball, no=no, until_end=False, df=df_ball)
if filter_type:
continue
result_list.append(ball)
print(
"Monte Carlo: 수집 {}개 / 시도 {}회 (상한 {}회)".format(
len(result_list), tries, max_tries
)
)
def load_df(resources_path: str) -> pd.DataFrame:
path = os.path.join(resources_path, "lotto_history.txt")
df_ball = pd.read_csv(path, header=None)
df_ball.columns = ["no", "b1", "b2", "b3", "b4", "b5", "b6", "bn"]
return df_ball
def next_draw_no(df_ball: pd.DataFrame) -> int:
"""파일에 있는 최신 회차의 다음 회차 번호."""
return int(df_ball["no"].max()) + 1
def weekend_ymd_strings():
"""표시용: 이번 주 토요일(추첨일) YMD, 지난 주 동일."""
today = datetime.today()
if today.weekday() == 5:
if today.hour > 20:
this_weekend = today + timedelta(days=(12 - today.weekday()))
else:
this_weekend = today + timedelta(days=(5 - today.weekday()))
elif today.weekday() == 6:
this_weekend = today + timedelta(days=(12 - today.weekday()))
else:
this_weekend = today + timedelta(days=(5 - today.weekday()))
last_weekend = (this_weekend - timedelta(days=7)).strftime("%Y%m%d")
ymd = this_weekend.strftime("%Y%m%d")
return ymd, last_weekend
def main():
parser = argparse.ArgumentParser(description="다음 회차 필터 통과 조합 추천 (final_BallFilter)")
parser.add_argument("--project-home", default=".", help="프로젝트 루트")
parser.add_argument(
"--max-total",
type=int,
default=99,
help="추천 조합 총 개수 상한(고정 1개 포함, 기본 99 = 100 미만)",
)
parser.add_argument("--max-tries", type=int, default=3_000_000, help="Monte Carlo 최대 시도 횟수")
parser.add_argument("--seed", type=int, default=0)
parser.add_argument("--exhaustive", action="store_true", help="전수 탐색(매우 느림)")
parser.add_argument("--skip-data-crawl", action="store_true", help="DataCrawler.excute 생략")
parser.add_argument("--skip-fetch-next", action="store_true", help="다음 회차 API 크롤 생략")
parser.add_argument("--no-telegram", action="store_true", help="텔레그램 전송 생략")
parser.add_argument("--draw-no", type=int, default=None, help="예측 대상 회차(미지정 시 파일 기준 다음 회차)")
args = parser.parse_args()
project_home = os.path.abspath(args.project_home)
resources_path = os.path.join(project_home, "resources")
os.makedirs(resources_path, exist_ok=True)
ymd, last_weekend = weekend_ymd_strings()
print("ymd(표시용): {}".format(ymd))
if not args.skip_data_crawl:
data_crawler = DataCrawler()
data_crawler.excute(resources_path)
lotto_history_base = os.path.join(project_home, "resources", "lotto_history")
lotto_json = lotto_history_base + ".json"
practice = FinalPractice(resources_path)
if not args.skip_fetch_next and os.path.isfile(lotto_json):
with open(lotto_json, "r", encoding="utf-8") as f:
last_json = None
for line in f:
line = line.strip()
if line:
last_json = json.loads(line)
if last_json is not None:
nxt = int(last_json["drwNo"]) + 1
practice.craw(lotto_history_base, drwNo=nxt)
df_ball = load_df(resources_path)
no = args.draw_no if args.draw_no is not None else next_draw_no(df_ball)
print("예측 대상 회차 no: {}".format(no))
lotto_txt = os.path.join(resources_path, "lotto_history.txt")
ball_filter = BallFilter(lotto_txt)
recommend_path = os.path.join(resources_path, "recommend_ball.final.json")
if os.path.isfile(recommend_path):
with open(recommend_path, "r", encoding="utf-8") as result_fp:
result_json = json.load(result_fp)
result_json[ymd] = []
else:
result_json = {ymd: []}
if args.no_telegram:
practice.bot = type("T", (), {"sendMsg": lambda self, m: None})()
practice.predict_fixed(result_json[ymd])
mc_cap = max(0, args.max_total - len(result_json[ymd]))
if args.exhaustive:
practice.predict_filter_exhaustive(ball_filter, df_ball, no, result_json[ymd])
else:
practice.predict_filter_montecarlo(
ball_filter,
df_ball,
no,
result_json[ymd],
max_recommend=mc_cap,
max_tries=args.max_tries,
seed=args.seed,
)
with open(recommend_path, "w", encoding="utf-8") as out_fp:
json.dump(result_json, out_fp, ensure_ascii=False)
prev_row = df_ball[df_ball["no"] == no - 1]
if prev_row.empty:
p_no, p_ball = no - 1, []
print("경고: no-1={} 행이 없어 이전 당첨 표시를 건너뜁니다.".format(no - 1))
else:
p_no = int(prev_row["no"].iloc[0])
p_ball = [int(prev_row["b{}".format(i)].iloc[0]) for i in range(1, 7)]
p_str = "[지난주] {}\n - {} 회차, {}\n[금주] {}\n - {} 회차\n[final_BallFilter]\n".format(
last_weekend, p_no, str(p_ball), ymd, no
)
for i, ball in enumerate(result_json[ymd]):
p_str += " {}. {}\n".format((i + 1), str(ball))
if (i + 1) % 100 == 0:
practice.bot.sendMsg("{}".format(p_str))
p_str = ""
if len(result_json[ymd]) % 100 != 0:
practice.bot.sendMsg("{}".format(p_str))
size = len(result_json[ymd])
print("recommend size: {}".format(size))
print("저장: {}".format(recommend_path))
print("done...")
if __name__ == "__main__":
main()