Practice/FilterTest: 인스턴스 상태, 정렬 일관성, 고정 5조합·중복·과거당첨 제외; README 정리

Made-with: Cursor
This commit is contained in:
2026-04-12 11:21:49 +09:00
parent a6b170fefa
commit bd9eea2aee
3 changed files with 244 additions and 205 deletions

View File

@@ -15,7 +15,7 @@
- **`filter_model.py`** — `from final_BallFilter import BallFilter` 재노출.
- **`train.py` / `valid.py`** — 구간별로 당첨 6개가 모든 필터를 통과한 회차 수 집계.
- **`final_filterTest.py`** — `1_FilterTest_25.py`와 동일한 분석·(선택) MC 생존 추정.
- **`final_practice.py`** — `3_Practice_22.py`와 동일 흐름(DataCrawler → 마지막 JSON 회차+1 크롤 → `predict1`+`predict2`). `lotto_history.txt`로 `BallFilter`를 만들고 회차는 `max(no)+1`을 사용하며, `predict2`는 전 조합을 순회(시간 매우 김).
- **`final_Practice.py`** — DataCrawler → 마지막 JSON 회차+1 크롤 → `predict1`+`predict2`. `lotto_history.json`으로 `BallFilter`를 한 번 생성한 뒤 공유. `predict1`은 고정 5조합(기존 1 + 미당첨 4) 중 `hasWon`으로 과거 당첨 조합을 제외해 추가하고, 이어서 `predict2`는 정렬된 6개 조합에서 `seen`(중복)·과거 당첨 조합을 제외한 뒤 필터를 적용.
## 실행 (miniconda **ncue**)

View File

@@ -1,38 +1,33 @@
"""
`1_FilterTest_25.py`와 동일한 역할이며 `final_BallFilter.BallFilter` + `lotto_history.txt`를 사용합니다.
"""
import argparse
import itertools
import os
import random
import pandas as pd
import itertools
from final_BallFilter import BallFilter
import time
import datetime
import pandas as pd
from final_BallFilter import BallFilter
class FilterTest:
ballFilter = None
def __init__(self, resources_path):
lotto_path = os.path.join(resources_path, "lotto_history.txt")
self.ballFilter = BallFilter(lotto_path)
lottoHistoryFileName = os.path.join(resources_path, 'lotto_history.json')
self.ballFilter = BallFilter(lottoHistoryFileName)
return
def find_filter_method(self, df_ball, filter_ball=None):
win_count = 0
no_filter_ball = {}
printLog = True
filter_dic = {}
filter_dic_len = {}
filter_dic_1 = {}
filter_dic_2 = {}
for i in range(len(df_ball) - 1, 19, -1):
no = df_ball["no"].iloc[i]
answer = df_ball[df_ball["no"] == no].values.tolist()[0]
answer = sorted(int(x) for x in answer[1:7])
for i in range(len(df_ball)-1, 19, -1):
no = df_ball['no'].iloc[i]
answer = df_ball[df_ball['no'] == no].values.tolist()[0]
answer = sorted(answer[1:7])
filter_type = self.ballFilter.filter(ball=answer, no=no, until_end=True, df=df_ball)
filter_type = list(filter_type)
@@ -44,23 +39,36 @@ class FilterTest:
print("\t", no)
elif size == 1:
key = filter_type[0]
filter_dic_1[key] = filter_dic_1.get(key, 0) + 1
if key not in filter_dic_1:
filter_dic_1[key] = 1
else:
filter_dic_1[key] += 1
if printLog:
print("\t", no, filter_type)
elif size == 2:
key = ",".join(filter_type)
filter_dic_2[key] = filter_dic_2.get(key, 0) + 1
key = ','.join(filter_type)
if key not in filter_dic_2:
filter_dic_2[key] = 1
else:
filter_dic_2[key] += 1
if printLog:
print("\t", no, filter_type)
else:
if printLog:
print("\t", no, filter_type)
# 회차별 필터개수가 적은 것을 정렬하기 위함
if size not in filter_dic_len:
filter_dic_len[size] = []
filter_dic_len[size].append(filter_type)
for f_t in filter_type:
filter_dic[f_t] = filter_dic.get(f_t, 0) + 1
if f_t not in filter_dic:
filter_dic[f_t] = 1
else:
filter_dic[f_t] += 1
print("\n\t[필터 개수가 적은 것부터 최적화를 위함]")
sorted_filter_dic_len = sorted(filter_dic_len.keys())
@@ -93,28 +101,37 @@ class FilterTest:
def find_final_candidates(self, no, df_ball, filter_ball=None):
final_candidates = []
generation_balls = list(range(1, 46))
nCr = list(itertools.combinations(generation_balls, 6))
for idx, ball in enumerate(nCr):
if idx % 1000000 == 0:
print(" - {} processed...".format(idx))
if filter_ball is not None and 0 < len(set(ball) & set(filter_ball)):
continue
filter_type = self.ballFilter.filter(ball=list(ball), no=no, until_end=False, df=df_ball)
if filter_type:
ball = sorted(list(ball))
filter_type = self.ballFilter.filter(ball=ball, no=no, until_end=False, df=df_ball)
filter_size = len(filter_type)
if filter_size:
continue
final_candidates.append(ball)
return final_candidates
def check_filter_method(self, df_ball, p_win_count, filter_ball=None):
win_count = 0
for i in range(len(df_ball) - 1, 0, -1):
no = df_ball["no"].iloc[i]
answer = df_ball[df_ball["no"] == no].values.tolist()[0]
answer = sorted(int(x) for x in answer[1:7])
for i in range(len(df_ball)-1, 0, -1):
no = df_ball['no'].iloc[i]
answer = df_ball[df_ball['no'] == no].values.tolist()[0]
answer = sorted(answer[1:7])
if filter_ball is not None and len(set(answer) & set(filter_ball)):
continue
@@ -125,43 +142,74 @@ class FilterTest:
win_count += 1
print("\t\t>{}. {}".format(no, answer))
print("\n\t> {} / {} p_win_count, {} total".format(win_count, p_win_count, len(df_ball) - 1))
print("\n\t> {} / {} p_win_count, {} total".format( win_count, p_win_count, len(df_ball)-1) )
def estimate_survivors_mc(self, no, df_ball, n_samples=8000, seed=0):
    """Estimate how many 6-number combinations pass every filter.

    Rather than enumerating all C(45, 6) = 8,145,060 combinations, this
    draws ``n_samples`` random combinations, counts those for which
    ``self.ballFilter.filter`` returns an empty result, and scales that
    pass ratio up to the full combination count.

    Args:
        no: Draw number forwarded to the filter.
        df_ball: Draw-history DataFrame forwarded to the filter.
        n_samples: Number of random combinations to test; must be positive.
        seed: Seed of the private random generator, so runs are repeatable.

    Returns:
        Tuple ``(est, hits, n_samples)``: the rounded estimate of surviving
        combinations, the number of sampled combinations that passed, and
        the sample size that was used.

    Raises:
        ValueError: If ``n_samples`` is not positive. Previously a zero
            sample size failed with a bare ZeroDivisionError and a negative
            one silently returned a meaningless result.
    """
    if n_samples <= 0:
        raise ValueError("n_samples must be a positive integer, got {!r}".format(n_samples))

    # A dedicated generator keeps the sampling independent of global
    # random state.
    rng = random.Random(seed)
    generation_balls = list(range(1, 46))
    total = 8145060  # C(45, 6): every possible 6-of-45 combination
    hits = 0
    for _ in range(n_samples):
        ball = sorted(rng.sample(generation_balls, 6))
        fts = self.ballFilter.filter(ball=ball, no=no, until_end=False, df=df_ball)
        # An empty filter result is counted as "survived".
        if not fts:
            hits += 1
    est = int(round(total * (hits / n_samples)))
    return est, hits, n_samples
return
def validate(self, df_ball, nos=None):
    """Check, draw by draw, whether the winning numbers survive the filters.

    For each draw number in ``nos`` the six winning numbers are read from
    ``df_ball`` (columns 1..6 of the row whose ``no`` matches). All 6-of-45
    combinations are then scanned in lexicographic order. The scan stops at
    the first combination for which
    ``self.ballFilter.extract_final_candidates`` returns an empty result
    and which equals the winning numbers.

    Args:
        df_ball: Draw-history DataFrame with a ``no`` column followed by
            the six drawn numbers.
        nos: Iterable of draw numbers to validate. ``None`` (the default)
            validates nothing; it used to raise ``TypeError``.

    Returns:
        Dict mapping each draw number whose winning combination survived
        to its sorted winning numbers.
    """
    win_history = {}
    if nos is None:
        return win_history

    generation_balls = range(1, 46)
    for no in nos:
        print(no, "processing...")
        answer = df_ball[df_ball['no'] == no].values.tolist()[0]
        answer = sorted(answer[1:7])
        answer_set = set(answer)

        # Iterate lazily: materialising all 8,145,060 tuples for every draw
        # costs a large amount of memory for no benefit.
        for idx, combo in enumerate(itertools.combinations(generation_balls, 6)):
            if idx % 1000000 == 0:
                print(" - {} processed...".format(idx))

            # combinations() over an ascending range is already ascending,
            # so no extra sort is needed.
            ball = list(combo)
            filter_type = self.ballFilter.extract_final_candidates(ball, no=no, until_end=True, df=df_ball)
            if 0 == len(filter_type) and len(answer_set & set(ball)) == 6:
                win_history[no] = answer
                print("win.. no: {}, answer: {}".format(no, str(answer)))
                break
    return win_history
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--resources", default="resources")
parser.add_argument("--mc-no", type=int, default=None, help="생존 MC 추정을 할 회차 번호")
parser.add_argument("--mc-samples", type=int, default=8000)
args = parser.parse_args()
if __name__ == '__main__':
resources_path = args.resources
lottoHistoryFileName = os.path.join(resources_path, "lotto_history.txt")
resources_path = 'resources'
lottoHistoryFileName = os.path.join(resources_path, 'lotto_history.txt')
df_ball = pd.read_csv(lottoHistoryFileName, header=None)
df_ball.columns = ["no", "b1", "b2", "b3", "b4", "b5", "b6", "bn"]
df_ball.columns = ['no', 'b1', 'b2', 'b3', 'b4', 'b5', 'b6', 'bn']
filter_ball=[]
filterTest = FilterTest(resources_path)
print("STEP #1. 필터 방법 추출")
start = time.time()
win_count = filterTest.find_filter_method(df_ball)
win_count = filterTest.find_filter_method(df_ball, filter_ball)
process_time = datetime.timedelta(seconds=time.time() - start)
print("process_time: ", process_time)
if args.mc_no is not None:
est, h, n = filterTest.estimate_survivors_mc(args.mc_no, df_ball, n_samples=args.mc_samples)
print(f"MC 생존 추정 (회차 {args.mc_no}): 약 {est}개 (표본 통과 {h}/{n})")
"""
print("\n\n")
no = df_ball['no'].values[-1]
ball = df_ball[df_ball['no'] == no].values.tolist()[0]
answer = ball[1:7]
print("STEP #0. 최종 후보 선정")
start = time.time()
final_candidates = filterTest.find_final_candidates(no, df_ball, filter_ball=None)
process_time = datetime.timedelta(seconds=time.time() - start)
print("process_time: ", process_time)
print(" > size: {}".format(len(final_candidates)))
file_name = os.path.join(resources_path, 'final_candidates.biz_a1.txt')
with open(file_name, 'w+') as outFp:
for ball in final_candidates:
ball_str = [str(b) for b in answer]
outFp.write("{}\n".format(','.join(ball_str)))
print('{}회, 정답: {}\n'.format(no, str(answer)))
"""
#print("\n\n")
#print("STEP #2. 당첨 회수 확인")
#filterTest.check_filter_method(df_ball, win_count)
# 오리지널 버전 (자질 파일에 고정): 당첨 22개

View File

@@ -1,173 +1,158 @@
# `3_Practice_22.py`와 동일한 흐름입니다.
# - `resources/lotto_history.txt`(및 크롤 시 `.json`)를 사용합니다.
# - 필터만 `final_BallFilter.BallFilter` + `lotto_history.txt` 로딩으로 교체했습니다.
# - 회차 번호는 JSON의 `getNo(ymd)` 대신 `lotto_history.txt` 최대 회차 + 1 을 사용합니다.
import itertools
# 웹 호출 라이브러리를 호출합니다.
import time
import requests
from DataCrawler import DataCrawler
import json
import os
import time
from datetime import datetime, timedelta
import pandas as pd
import itertools
from datetime import datetime, timedelta
from TelegramBot import TelegramBot
from DataCrawler import DataCrawler
from final_BallFilter import BallFilter
try:
from TelegramBot import TelegramBot
except Exception: # pragma: no cover
class TelegramBot:
def __init__(self, enable=True):
pass
def sendMsg(self, msg):
print(msg)
# predict1: the one pre-existing fixed combination plus these four extra
# combinations. Per the author's note, none of the four has ever been drawn
# as a winning 6-number set (checked against resources/lotto_history.json).
_PREDICT1_FIXED_NEVER_DRAWN_EXTRA = (
[2, 4, 7, 17, 18, 39],
[3, 21, 24, 40, 42, 43],
[6, 9, 16, 22, 28, 29],
[12, 17, 19, 26, 40, 42],
)
def fetch_lotto_draw_json(drw_no: int):
"""Fetch a single draw from the Donghang Lottery API.

Delegates to ``DataCrawler()._fetch_draw`` with ``drw_no`` coerced to int.
Per the original note, SSL handling and POST/GET retries are whatever
``DataCrawler._fetch_draw`` implements (not visible here).
"""
return DataCrawler()._fetch_draw(int(drw_no))
class FinalPractice:
bot = None
class Practice:
def __init__(self, resources_path):
self.bot = TelegramBot()
self.preprocessor = None
self.predictor = None
self.extract_count = None
return
# 로또 당첨 데이터를 수집해서 파일로 저장합니다.
# lottoHistoryFile: 로또 당첨 데이터를 저장할 파일
def craw(self, lottoHistoryFile, drwNo=None):
ball = None
if drwNo is not None:
result = fetch_lotto_draw_json(drwNo)
if result is None:
print("경고: 회차 {} API 조회 실패(SSL/네트워크 또는 미추첨).".format(drwNo))
if drwNo != None:
# 로또 데이터를 저장할 파일을 선언합니다.
jsonFp = open(lottoHistoryFile + ".json", 'a', encoding="utf-8")
textFp = open(lottoHistoryFile + ".txt", 'a', encoding="utf-8")
url = 'https://dhlottery.co.kr/common.do?method=getLottoNumber&drwNo=' + str(drwNo)
# URL을 호출합니다.
res = requests.post(url)
# 호출한 결과에 대해서 Json 포맷을 가져옵니다.
result = res.json()
if result['returnValue'] != 'success':
return None
jsonFp = open(lottoHistoryFile + ".json", "a", encoding="utf-8")
textFp = open(lottoHistoryFile + ".txt", "a", encoding="utf-8")
# 가져온 Json 포맷을 파일로 저장합니다.
jsonFp.write(json.dumps(result, ensure_ascii=False) + "\n")
textFp.write(
"%d,%d,%d,%d,%d,%d,%d,%d\n"
% (
drwNo,
result["drwtNo1"],
result["drwtNo2"],
result["drwtNo3"],
result["drwtNo4"],
result["drwtNo5"],
result["drwtNo6"],
result["bnusNo"],
)
)
print(
"%d,%d,%d,%d,%d,%d,%d,%d"
% (
drwNo,
result["drwtNo1"],
result["drwtNo2"],
result["drwtNo3"],
result["drwtNo4"],
result["drwtNo5"],
result["drwtNo6"],
result["bnusNo"],
)
)
ball = [
result["drwtNo1"],
result["drwtNo2"],
result["drwtNo3"],
result["drwtNo4"],
result["drwtNo5"],
result["drwtNo6"],
result["bnusNo"],
]
jsonFp.close()
textFp.close()
textFp.write("%d,%d,%d,%d,%d,%d,%d,%d\n" % (drwNo, result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']))
print("%d,%d,%d,%d,%d,%d,%d,%d" % (drwNo, result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']))
ball = [result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']]
else:
jsonFp = open(lottoHistoryFile + ".json", "w", encoding="utf-8")
textFp = open(lottoHistoryFile + ".txt", "w", encoding="utf-8")
# 로또 데이터를 저장할 파일을 선언합니다.
jsonFp = open(lottoHistoryFile + ".json", 'w', encoding="utf-8")
textFp = open(lottoHistoryFile + ".txt", 'w', encoding="utf-8")
# 1회차부터 지정된 회차까지 로또 당첨 번호를 수집합니다.
idx = 1
while True:
result = fetch_lotto_draw_json(idx)
if result is None or result.get("returnValue") != "success":
# 1회차부터 지정된 회차까지의 URL을 생성합니다.
url = 'https://dhlottery.co.kr/common.do?method=getLottoNumber&drwNo=' + str(idx)
# URL을 호출합니다.
res = requests.post(url)
# 호출한 결과에 대해서 Json 포맷을 가져옵니다.
result = res.json()
if result['returnValue'] != 'success':
break
# 가져온 Json 포맷을 파일로 저장합니다.
jsonFp.write(json.dumps(result, ensure_ascii=False) + "\n")
textFp.write(
"%d,%d,%d,%d,%d,%d,%d,%d\n"
% (
idx,
result["drwtNo1"],
result["drwtNo2"],
result["drwtNo3"],
result["drwtNo4"],
result["drwtNo5"],
result["drwtNo6"],
result["bnusNo"],
)
)
print(
"%d,%d,%d,%d,%d,%d,%d,%d"
% (
idx,
result["drwtNo1"],
result["drwtNo2"],
result["drwtNo3"],
result["drwtNo4"],
result["drwtNo5"],
result["drwtNo6"],
result["bnusNo"],
)
)
ball = [
result["drwtNo1"],
result["drwtNo2"],
result["drwtNo3"],
result["drwtNo4"],
result["drwtNo5"],
result["drwtNo6"],
result["bnusNo"],
]
textFp.write("%d,%d,%d,%d,%d,%d,%d,%d\n" % (idx, result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']))
print("%d,%d,%d,%d,%d,%d,%d,%d" % (idx, result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']))
ball = [result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']]
idx += 1
time.sleep(0.5)
jsonFp.close()
textFp.close()
# 저장한 파일을 종료합니다.
jsonFp.close()
textFp.close()
return ball
def predict1(self, result_json):
result_json.append([6, 7, 10, 11, 20, 45])
def predict1(self, result_json, ball_filter):
    """Append the weekly fixed combinations to ``result_json`` in place.

    The candidates are the long-standing fixed row ``[6, 7, 10, 11, 20, 45]``
    followed by the rows of ``_PREDICT1_FIXED_NEVER_DRAWN_EXTRA``. Each
    candidate is sorted and then skipped when it is already present or when
    ``ball_filter.hasWon`` reports it as a past winning combination.

    Args:
        result_json: List of 6-number lists for the target date; mutated.
        ball_filter: Object exposing ``hasWon(ball)``.

    Returns:
        None. The accepted rows are appended to ``result_json``.
    """
    fixed_rows = [[6, 7, 10, 11, 20, 45]]
    fixed_rows.extend(list(x) for x in _PREDICT1_FIXED_NEVER_DRAWN_EXTRA)

    # Seed with rows that are already present, as predict2 does, so calling
    # this on a non-empty list cannot add a duplicate combination.
    seen = {tuple(sorted(row)) for row in result_json}
    for ball in fixed_rows:
        ball = sorted(ball)
        key = tuple(ball)
        # The duplicate check runs first so hasWon is not queried for rows
        # that would be dropped anyway.
        if key in seen or ball_filter.hasWon(ball):
            continue
        seen.add(key)
        result_json.append(ball)
def predict2(self, resources_path, ymd, result_json, ball_filter=None):
def predict2(self, resources_path, ymd, result_json):
candidates = [i for i in range(1, 46)]
lottoHistoryFileName = os.path.join(resources_path, "lotto_history.txt")
df_ball = pd.read_csv(lottoHistoryFileName, header=None)
df_ball.columns = ["no", "b1", "b2", "b3", "b4", "b5", "b6", "bn"]
no = int(df_ball["no"].max()) + 1
lottoHistoryFileName = os.path.join(resources_path, 'lotto_history.json')
if ball_filter is None:
ball_filter = BallFilter(lottoHistoryFileName)
no = ball_filter.getNextNo(ymd)
print("회차: {}".format(no))
ballFilter = BallFilter(lottoHistoryFileName)
lottoHistoryFileName = os.path.join(resources_path, 'lotto_history.txt')
df_ball = pd.read_csv(lottoHistoryFileName, header=None)
df_ball.columns = ['no', 'b1', 'b2', 'b3', 'b4', 'b5', 'b6', 'bn']
seen = set()
for row in result_json:
seen.add(tuple(sorted(row)))
#filter_ball=[1,2,4,6,10,11,11,17,18,20,21,22,23,24,26,27,28,30,31,32,33,34,37,38,39,40,42,44]
nCr = list(itertools.combinations(candidates, 6))
for idx, ball in enumerate(nCr):
if idx % 1000000 == 0:
print(" - {} processed...".format(idx))
ball = list(ball)
filter_type = ballFilter.filter(ball=ball, no=no, until_end=False, df=df_ball)
ball = sorted(list(ball))
key = tuple(ball)
if key in seen:
continue
if ball_filter.hasWon(ball):
continue
filter_type = ball_filter.filter(ball=ball, no=no, until_end=False, df=df_ball)
filter_size = len(filter_type)
if 0 < filter_size:
continue
result_json.append(ball)
p_ball = df_ball[df_ball["no"] == no - 1].values.tolist()[0]
result_json.append(ball)
seen.add(key)
p_ball = df_ball[df_ball['no'] == no - 1].values.tolist()[0]
p_no = p_ball[0]
p_ball = p_ball[1:7]
return p_no, p_ball
if __name__ == '__main__':
if __name__ == "__main__":
PROJECT_HOME = "."
resources_path = os.path.join(PROJECT_HOME, "resources")
PROJECT_HOME = '.'
resources_path = os.path.join(PROJECT_HOME, 'resources')
# 데이터 수집
dataCrawler = DataCrawler()
dataCrawler.excute(resources_path)
@@ -182,46 +167,49 @@ if __name__ == "__main__":
else:
this_weekend = today + timedelta(days=(5 - today.weekday()))
last_weekend = (this_weekend - timedelta(days=7)).strftime("%Y%m%d")
ymd = this_weekend.strftime("%Y%m%d")
last_weekend = (this_weekend - timedelta(days=7)).strftime('%Y%m%d')
ymd = this_weekend.strftime('%Y%m%d')
print("ymd: {}".format(ymd))
practice = FinalPractice(resources_path)
# 로또 예측
practice = Practice(resources_path)
lottoHistoryFile = PROJECT_HOME + "/resources/lotto_history"
lottoHistoryFileName = lottoHistoryFile + ".json"
with open(lottoHistoryFileName, "r", encoding="utf-8") as f:
# 데이터 수집
lottoHistoryFile = PROJECT_HOME + '/resources/lotto_history'
lottoHistoryFileName = lottoHistoryFile + '.json'
with open(lottoHistoryFileName, "r", encoding='utf-8') as f:
for line in f:
pass
last_json = json.loads(line)
if line != '\n':
last_json = json.loads(line)
ball = practice.craw(lottoHistoryFile, drwNo=last_json["drwNo"] + 1)
ball = practice.craw(lottoHistoryFile, drwNo=last_json['drwNo'] + 1)
recommend_result_file = os.path.join(resources_path, "recommend_ball.final.json")
recommend_result_file = os.path.join(resources_path, "recommend_ball.biz_25.json")
if os.path.isfile(recommend_result_file):
result_fp = open(recommend_result_file, "r", encoding="utf-8")
result_fp = open(recommend_result_file, "r")
result_json = json.load(result_fp)
result_fp.close()
result_json[ymd] = []
else:
result_json = {ymd: []}
practice.predict1(result_json[ymd])
p_no, p_ball = practice.predict2(resources_path, ymd, result_json[ymd])
lotto_json_for_filter = os.path.join(resources_path, 'lotto_history.json')
ball_filter = BallFilter(lotto_json_for_filter)
with open(recommend_result_file, "w", encoding="utf-8") as outFp:
# 매주 고정(과거 당첨 6개 조합 제외·중복 제외는 predict1 내부)
practice.predict1(result_json[ymd], ball_filter)
# 필터 기반 예측
p_no, p_ball = practice.predict2(resources_path, ymd, result_json[ymd], ball_filter)
with open(recommend_result_file, 'w', encoding='utf-8') as outFp:
json.dump(result_json, outFp, ensure_ascii=False)
no_predict = int(p_no) + 1
p_str = "[지난주] {}\n - {} 회차, {}\n[금주] {}\n - {} 회차\n[final_BallFilter]\n".format(
last_weekend, p_no, str(p_ball), ymd, no_predict
)
p_str = "[지난주] {}\n - {} 회차, {}\n[금주] {}\n - {} 회차\n[모델#25]\n".format(last_weekend, p_no, str(p_ball), ymd, (p_no + 1))
for i, ball in enumerate(result_json[ymd]):
p_str += " {}. {}\n".format((i + 1), str(ball))
if (i + 1) % 100 == 0:
p_str += " {}. {}\n".format((i+1), str(ball))
if (i+1) % 100 == 0:
practice.bot.sendMsg("{}".format(p_str))
p_str = ""
p_str = ''
if len(result_json[ymd]) % 100 != 0:
practice.bot.sendMsg("{}".format(p_str))
@@ -229,4 +217,7 @@ if __name__ == "__main__":
size = len(result_json[ymd])
print("size: {}".format(size))
print("done...")
# https://youtu.be/QjBsui8Ob14?si=4dC3q8p0Yu5ZWK1K
# https://www.youtube.com/watch?v=YwiHaa1KNwA
print("done...")