490 lines
20 KiB
Python
490 lines
20 KiB
Python
# 웹 호출 라이브러리를 호출합니다.
|
|
import time
|
|
import requests
|
|
|
|
import json
|
|
import os
|
|
import copy
|
|
import pandas as pd
|
|
import itertools
|
|
from datetime import datetime, timedelta
|
|
from TelegramBot import TelegramBot
|
|
|
|
from filter_model_3 import BallFilter
|
|
|
|
class Practice:
|
|
|
|
bot = None
|
|
preprocessor = None
|
|
predictor = None
|
|
|
|
extract_count = None
|
|
TARGET_MIN_SURVIVORS = 30
|
|
TARGET_MAX_SURVIVORS = 150
|
|
PREDICT_TIMEOUT_SECONDS = 180
|
|
|
|
def __init__(self, resources_path):
|
|
self.bot = TelegramBot()
|
|
self.resources_path = resources_path
|
|
|
|
return
|
|
|
|
# 로또 당첨 데이터를 수집해서 파일로 저장합니다.
|
|
# lottoHistoryFile: 로또 당첨 데이터를 저장할 파일
|
|
def craw(self, lottoHistoryFile, drwNo=None):
|
|
|
|
ball = None
|
|
if drwNo != None:
|
|
# 로또 데이터를 저장할 파일을 선언합니다.
|
|
jsonFp = open(lottoHistoryFile + ".json", 'a', encoding="utf-8")
|
|
textFp = open(lottoHistoryFile + ".txt", 'a', encoding="utf-8")
|
|
|
|
url = 'https://dhlottery.co.kr/common.do?method=getLottoNumber&drwNo=' + str(drwNo)
|
|
# URL을 호출합니다.
|
|
res = requests.post(url)
|
|
# 호출한 결과에 대해서 Json 포맷을 가져옵니다.
|
|
result = res.json()
|
|
|
|
if result['returnValue'] != 'success':
|
|
return None
|
|
|
|
# 가져온 Json 포맷을 파일로 저장합니다.
|
|
jsonFp.write(json.dumps(result, ensure_ascii=False) + "\n")
|
|
textFp.write("%d,%d,%d,%d,%d,%d,%d,%d\n" % (drwNo, result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']))
|
|
print("%d,%d,%d,%d,%d,%d,%d,%d" % (drwNo, result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']))
|
|
|
|
ball = [result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']]
|
|
else:
|
|
# 로또 데이터를 저장할 파일을 선언합니다.
|
|
jsonFp = open(lottoHistoryFile + ".json", 'w', encoding="utf-8")
|
|
textFp = open(lottoHistoryFile + ".txt", 'w', encoding="utf-8")
|
|
|
|
# 1회차부터 지정된 회차까지 로또 당첨 번호를 수집합니다.
|
|
idx = 1
|
|
while True:
|
|
# 1회차부터 지정된 회차까지의 URL을 생성합니다.
|
|
url = 'https://dhlottery.co.kr/common.do?method=getLottoNumber&drwNo=' + str(idx)
|
|
# URL을 호출합니다.
|
|
res = requests.post(url)
|
|
# 호출한 결과에 대해서 Json 포맷을 가져옵니다.
|
|
result = res.json()
|
|
if result['returnValue'] != 'success':
|
|
break
|
|
# 가져온 Json 포맷을 파일로 저장합니다.
|
|
jsonFp.write(json.dumps(result, ensure_ascii=False) + "\n")
|
|
textFp.write("%d,%d,%d,%d,%d,%d,%d,%d\n" % (idx, result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']))
|
|
print("%d,%d,%d,%d,%d,%d,%d,%d" % (idx, result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']))
|
|
ball = [result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']]
|
|
idx += 1
|
|
time.sleep(0.5)
|
|
# 저장한 파일을 종료합니다.
|
|
jsonFp.close()
|
|
textFp.close()
|
|
|
|
return ball
|
|
|
|
def predict1(self, result_json):
|
|
result_json.append([6, 7, 10, 11, 20, 45])
|
|
result_json.append([2, 7, 17, 28, 35, 39])
|
|
result_json.append([6, 10, 19, 25, 33, 35])
|
|
result_json.append([3, 17, 20, 24, 35, 45])
|
|
result_json.append([5, 15, 18, 29, 36, 41])
|
|
result_json.append([6, 15, 20, 23, 37, 43])
|
|
result_json.append([8, 15, 19, 23, 38, 41])
|
|
result_json.append([5, 11, 19, 24, 40, 45])
|
|
result_json.append([9, 16, 18, 23, 35, 43])
|
|
result_json.append([7, 13, 19, 28, 33, 44])
|
|
result_json.append([7, 11, 18, 29, 37, 42])
|
|
return
|
|
|
|
def predict2(self, resources_path, ymd, result_json):
|
|
candidates = [i for i in range(1, 46)]
|
|
lottoHistoryFileName = os.path.join(resources_path, 'lotto_history.json')
|
|
no = BallFilter(lottoHistoryFileName).getNextNo(ymd)
|
|
print("회차: {}".format(no))
|
|
predict_start_ts = time.time()
|
|
deadline_ts = predict_start_ts + self.PREDICT_TIMEOUT_SECONDS
|
|
|
|
lottoHistoryFileName = os.path.join(resources_path, 'lotto_history.txt')
|
|
df_ball = pd.read_csv(lottoHistoryFileName, header=None)
|
|
df_ball.columns = ['no', 'b1', 'b2', 'b3', 'b4', 'b5', 'b6', 'bn']
|
|
|
|
p_ball = df_ball[df_ball['no'] == no - 1].values.tolist()[0]
|
|
p_no = p_ball[0]
|
|
p_ball = sorted(p_ball[1:7])
|
|
|
|
# 기본/강화/완화 단계별 ruleset
|
|
base_ruleset = self._get_base_ruleset()
|
|
tighten_rulesets = [
|
|
self._build_ruleset(
|
|
base_ruleset=base_ruleset,
|
|
enabled_overrides={
|
|
"paper_patterns": True,
|
|
"ban_triples_legacy": True,
|
|
"all_in_previous7": True,
|
|
"previous_neighbors": True,
|
|
},
|
|
allowed_overrides={
|
|
"ac_value": [8, 9],
|
|
"uniq_last_digit_count": [4, 5],
|
|
"even_count": [2, 3, 4],
|
|
},
|
|
),
|
|
self._build_ruleset(
|
|
base_ruleset=base_ruleset,
|
|
enabled_overrides={
|
|
"paper_patterns": True,
|
|
"ban_triples_legacy": True,
|
|
"all_in_previous7": True,
|
|
"previous_neighbors": True,
|
|
},
|
|
allowed_overrides={
|
|
"ac_value": [8, 9],
|
|
"uniq_last_digit_count": [4, 5],
|
|
"even_count": [2, 3, 4],
|
|
"sum": [112, 114, 121, 123, 126, 127, 131, 132, 138, 146, 148],
|
|
"sum_prev_diff": [13, 14, 17, 18, 26, 28, 29, 30, 32, 39, 40],
|
|
},
|
|
),
|
|
]
|
|
relax_rulesets = [
|
|
self._build_ruleset(
|
|
base_ruleset=base_ruleset,
|
|
enabled_overrides={
|
|
"paper_patterns": False,
|
|
"ban_triples_legacy": False,
|
|
},
|
|
),
|
|
self._build_ruleset(
|
|
base_ruleset=base_ruleset,
|
|
enabled_overrides={
|
|
"paper_patterns": False,
|
|
"ban_triples_legacy": False,
|
|
"previous_neighbors": False,
|
|
"all_in_previous7": False,
|
|
},
|
|
),
|
|
self._build_ruleset(
|
|
base_ruleset=base_ruleset,
|
|
enabled_overrides={
|
|
"paper_patterns": False,
|
|
"ban_triples_legacy": False,
|
|
"previous_neighbors": False,
|
|
"all_in_previous7": False,
|
|
"weeks_8_count": False,
|
|
"weeks_12_count": False,
|
|
"weeks_16_count": False,
|
|
"weeks_20_count": False,
|
|
},
|
|
),
|
|
]
|
|
|
|
min_survivors = self.TARGET_MIN_SURVIVORS
|
|
max_survivors = self.TARGET_MAX_SURVIVORS
|
|
chosen = []
|
|
stage_name = "base"
|
|
|
|
current_info = self._collect_candidates(
|
|
candidates=candidates,
|
|
no=no,
|
|
df_ball=df_ball,
|
|
ruleset=base_ruleset,
|
|
stop_when_gt=max_survivors,
|
|
stage_name="base",
|
|
predict_start_ts=predict_start_ts,
|
|
deadline_ts=deadline_ts,
|
|
)
|
|
current = current_info["candidates"]
|
|
if current_info["timed_out"]:
|
|
chosen = self._finalize_on_timeout(current, p_ball, min_survivors, max_survivors)
|
|
stage_name = "base_timeout_fallback"
|
|
print("candidate_stage: {}, survivors: {}".format(stage_name, len(chosen)))
|
|
for ball in chosen:
|
|
result_json.append(ball)
|
|
return p_no, p_ball
|
|
|
|
if min_survivors <= len(current) <= max_survivors:
|
|
chosen = current
|
|
elif len(current) > max_survivors:
|
|
chosen = current
|
|
stage_name = "base_overflow"
|
|
for idx, rs in enumerate(tighten_rulesets, start=1):
|
|
t_info = self._collect_candidates(
|
|
candidates=candidates,
|
|
no=no,
|
|
df_ball=df_ball,
|
|
ruleset=rs,
|
|
stop_when_gt=max_survivors,
|
|
stage_name="tighten_{}".format(idx),
|
|
predict_start_ts=predict_start_ts,
|
|
deadline_ts=deadline_ts,
|
|
)
|
|
t = t_info["candidates"]
|
|
if t_info["timed_out"]:
|
|
chosen = self._finalize_on_timeout(t, p_ball, min_survivors, max_survivors)
|
|
stage_name = "tighten_{}_timeout_fallback".format(idx)
|
|
break
|
|
if min_survivors <= len(t) <= max_survivors:
|
|
chosen = t
|
|
stage_name = "tighten_{}".format(idx)
|
|
break
|
|
if len(t) <= max_survivors:
|
|
chosen = t
|
|
stage_name = "tighten_{}".format(idx)
|
|
if len(chosen) > max_survivors:
|
|
# 상한 가드 강제 적용: 품질 점수 상위 N개만 사용
|
|
full_info = self._collect_candidates(
|
|
candidates=candidates,
|
|
no=no,
|
|
df_ball=df_ball,
|
|
ruleset=tighten_rulesets[-1],
|
|
stop_when_gt=None,
|
|
stage_name="tighten_full_rank",
|
|
predict_start_ts=predict_start_ts,
|
|
deadline_ts=deadline_ts,
|
|
)
|
|
full_for_ranking = full_info["candidates"]
|
|
if full_info["timed_out"]:
|
|
chosen = self._finalize_on_timeout(full_for_ranking, p_ball, min_survivors, max_survivors)
|
|
stage_name = "tighten_rank_timeout_fallback"
|
|
else:
|
|
chosen = self._rank_and_trim(full_for_ranking, p_ball, max_survivors)
|
|
stage_name = "tighten_rank_trim"
|
|
else:
|
|
chosen = current
|
|
stage_name = "base_underflow"
|
|
for idx, rs in enumerate(relax_rulesets, start=1):
|
|
# relax는 하한(min_survivors)만 채우면 충분하므로 조기 종료
|
|
r_info = self._collect_candidates(
|
|
candidates=candidates,
|
|
no=no,
|
|
df_ball=df_ball,
|
|
ruleset=rs,
|
|
stop_when_gt=None,
|
|
stop_when_gte=min_survivors,
|
|
stage_name="relax_{}".format(idx),
|
|
predict_start_ts=predict_start_ts,
|
|
deadline_ts=deadline_ts,
|
|
)
|
|
r = r_info["candidates"]
|
|
chosen = r
|
|
stage_name = "relax_{}".format(idx)
|
|
if r_info["timed_out"]:
|
|
chosen = self._finalize_on_timeout(r, p_ball, min_survivors, max_survivors)
|
|
stage_name = "relax_{}_timeout_fallback".format(idx)
|
|
break
|
|
if len(r) >= min_survivors:
|
|
break
|
|
|
|
if len(chosen) == 0:
|
|
# 0개 생존 방지: 가장 완화된 규칙에서도 0개면 직전 결과와 유사한 조합으로 최소 개수 확보
|
|
stage_name = "relax_zero_fallback"
|
|
chosen = self._fallback_candidates_from_prev(p_ball, min_survivors)
|
|
elif len(chosen) < min_survivors:
|
|
# 하한 가드: 부족분은 완화 후보/고정 후보 기반으로 보강
|
|
stage_name = "{}_fill".format(stage_name)
|
|
fill = self._fallback_candidates_from_prev(p_ball, min_survivors - len(chosen), exclude=set(tuple(x) for x in chosen))
|
|
chosen.extend(fill)
|
|
|
|
print("candidate_stage: {}, survivors: {}".format(stage_name, len(chosen)))
|
|
for ball in chosen:
|
|
result_json.append(ball)
|
|
return p_no, p_ball
|
|
|
|
def _get_base_ruleset(self):
|
|
history_json = os.path.join(self.resources_path, "lotto_history.json")
|
|
base_filter = BallFilter(history_json)
|
|
return copy.deepcopy(base_filter.m1.ruleset)
|
|
|
|
def _build_ruleset(self, base_ruleset, enabled_overrides=None, allowed_overrides=None):
|
|
ruleset = copy.deepcopy(base_ruleset)
|
|
ruleset.setdefault("filters", {})
|
|
enabled_overrides = enabled_overrides or {}
|
|
allowed_overrides = allowed_overrides or {}
|
|
for key, value in enabled_overrides.items():
|
|
ruleset["filters"].setdefault(key, {})
|
|
ruleset["filters"][key]["enabled"] = bool(value)
|
|
for key, values in allowed_overrides.items():
|
|
ruleset["filters"].setdefault(key, {})
|
|
ruleset["filters"][key]["enabled"] = True
|
|
ruleset["filters"][key]["allowed"] = list(values)
|
|
return ruleset
|
|
|
|
def _collect_candidates(
|
|
self,
|
|
candidates,
|
|
no,
|
|
df_ball,
|
|
ruleset,
|
|
stop_when_gt=None,
|
|
stop_when_gte=None,
|
|
stage_name="base",
|
|
predict_start_ts=None,
|
|
deadline_ts=None,
|
|
):
|
|
lottoHistoryFileName = os.path.join(self.resources_path, "lotto_history.json")
|
|
ballFilter = BallFilter(lottoHistoryFileName, ruleset=ruleset)
|
|
result = []
|
|
last_idx = 0
|
|
for idx, ball in enumerate(itertools.combinations(candidates, 6), start=1):
|
|
last_idx = idx
|
|
if deadline_ts is not None and deadline_ts <= time.time():
|
|
elapsed = (time.time() - predict_start_ts) if predict_start_ts is not None else 0.0
|
|
print(" - [{}] timeout after {:,} processed (elapsed: {:.1f}s, survivors: {:,})".format(stage_name, idx, elapsed, len(result)))
|
|
return {
|
|
"candidates": result,
|
|
"timed_out": True,
|
|
"processed": idx,
|
|
}
|
|
if idx % 1000000 == 0:
|
|
elapsed = (time.time() - predict_start_ts) if predict_start_ts is not None else 0.0
|
|
print(" - [{}] {:,} processed... (elapsed: {:.1f}s, survivors: {:,})".format(stage_name, idx, elapsed, len(result)))
|
|
b = list(ball)
|
|
if len(ballFilter.filter(ball=b, no=no, until_end=False, df=df_ball)) == 0:
|
|
result.append(b)
|
|
if stop_when_gt is not None and len(result) > stop_when_gt:
|
|
return {
|
|
"candidates": result,
|
|
"timed_out": False,
|
|
"processed": idx,
|
|
}
|
|
if stop_when_gte is not None and len(result) >= stop_when_gte:
|
|
return {
|
|
"candidates": result,
|
|
"timed_out": False,
|
|
"processed": idx,
|
|
}
|
|
return {
|
|
"candidates": result,
|
|
"timed_out": False,
|
|
"processed": last_idx,
|
|
}
|
|
|
|
def _finalize_on_timeout(self, partial_candidates, prev_ball, min_survivors, max_survivors):
|
|
chosen = list(partial_candidates)
|
|
if len(chosen) > max_survivors:
|
|
chosen = self._rank_and_trim(chosen, prev_ball, max_survivors)
|
|
elif len(chosen) < min_survivors:
|
|
fill = self._fallback_candidates_from_prev(
|
|
prev_ball,
|
|
min_survivors - len(chosen),
|
|
exclude=set(tuple(x) for x in chosen),
|
|
)
|
|
chosen.extend(fill)
|
|
return chosen
|
|
|
|
def _rank_and_trim(self, candidates, prev_ball, limit):
|
|
scored = [(self._score_candidate(ball, prev_ball), ball) for ball in candidates]
|
|
scored.sort(key=lambda x: x[0])
|
|
return [ball for _, ball in scored[:limit]]
|
|
|
|
def _score_candidate(self, ball, prev_ball):
|
|
sum_diff = abs(sum(ball) - sum(prev_ball))
|
|
even_cnt = len([x for x in ball if x % 2 == 0])
|
|
uniq_last = len(set([x % 10 for x in ball]))
|
|
contiguous_penalty = 0
|
|
s = sorted(ball)
|
|
for i in range(1, len(s)):
|
|
if s[i] - s[i - 1] == 1:
|
|
contiguous_penalty += 1
|
|
score = 0
|
|
score += sum_diff
|
|
score += abs(even_cnt - 3) * 2
|
|
score += abs(uniq_last - 5) * 2
|
|
score += contiguous_penalty
|
|
return score
|
|
|
|
def _fallback_candidates_from_prev(self, prev_ball, need_count, exclude=None):
|
|
exclude = exclude or set()
|
|
seed = sorted(prev_ball)
|
|
out = []
|
|
delta_patterns = [
|
|
(0, 0, 0, 0, 0, 0),
|
|
(-1, 0, 0, 0, 0, 1),
|
|
(0, -1, 0, 0, 1, 0),
|
|
(0, 0, -1, 1, 0, 0),
|
|
(-2, 0, 0, 0, 0, 2),
|
|
(0, -2, 0, 0, 2, 0),
|
|
(0, 0, -2, 2, 0, 0),
|
|
(-1, -1, 0, 0, 1, 1),
|
|
(1, 0, -1, 0, 0, 0),
|
|
(0, 1, 0, -1, 0, 0),
|
|
(1, -1, 1, -1, 1, -1),
|
|
(-1, 1, -1, 1, -1, 1),
|
|
]
|
|
shift = 0
|
|
while len(out) < need_count and shift <= 8:
|
|
for delta in delta_patterns:
|
|
cand = [seed[i] + delta[i] for i in range(6)]
|
|
cand = [min(45, max(1, v + shift)) for v in cand]
|
|
cand = sorted(cand)
|
|
if len(set(cand)) != 6:
|
|
continue
|
|
t = tuple(cand)
|
|
if t in exclude:
|
|
continue
|
|
exclude.add(t)
|
|
out.append(cand)
|
|
if len(out) >= need_count:
|
|
break
|
|
shift += 1
|
|
return out
|
|
|
|
if __name__ == '__main__':
|
|
|
|
PROJECT_HOME = '.'
|
|
resources_path = os.path.join(PROJECT_HOME, 'resources')
|
|
|
|
today = datetime.today()
|
|
if today.weekday() == 5:
|
|
if today.hour > 20:
|
|
this_weekend = today + timedelta(days=(12 - today.weekday()))
|
|
else:
|
|
this_weekend = today + timedelta(days=(5 - today.weekday()))
|
|
elif today.weekday() == 6:
|
|
this_weekend = today + timedelta(days=(12 - today.weekday()))
|
|
else:
|
|
this_weekend = today + timedelta(days=(5 - today.weekday()))
|
|
|
|
last_weekend = (this_weekend - timedelta(days=7)).strftime('%Y%m%d')
|
|
ymd = this_weekend.strftime('%Y%m%d')
|
|
|
|
print("ymd: {}".format(ymd))
|
|
|
|
# 로또 예측
|
|
practice = Practice(resources_path)
|
|
|
|
# 데이터 수집
|
|
lottoHistoryFile = PROJECT_HOME + '/resources/lotto_history'
|
|
lottoHistoryFileName = lottoHistoryFile + '.json'
|
|
with open(lottoHistoryFileName, "r", encoding='utf-8') as f:
|
|
for line in f:
|
|
if line != '\n':
|
|
last_json = json.loads(line)
|
|
|
|
#ball = practice.craw(lottoHistoryFile, drwNo=last_json['drwNo'] + 1)
|
|
|
|
result_json = {ymd: []}
|
|
|
|
# 매주 고정
|
|
practice.predict1(result_json[ymd])
|
|
# 필터 기반 예측
|
|
p_no, p_ball = practice.predict2(resources_path, ymd, result_json[ymd])
|
|
|
|
p_str = "[지난주] {}\n - {} 회차, {}\n[금주] {}\n - {} 회차\n[모델#25]\n".format(last_weekend, p_no, str(p_ball), ymd, (p_no + 1))
|
|
for i, ball in enumerate(result_json[ymd]):
|
|
p_str += " {}. {}\n".format((i+1), str(ball))
|
|
if (i+1) % 100 == 0:
|
|
practice.bot.sendMsg("{}".format(p_str))
|
|
p_str = ''
|
|
|
|
if len(result_json[ymd]) % 100 != 0:
|
|
practice.bot.sendMsg("{}".format(p_str))
|
|
|
|
size = len(result_json[ymd])
|
|
print("size: {}".format(size))
|
|
|
|
# https://youtu.be/QjBsui8Ob14?si=4dC3q8p0Yu5ZWK1K
|
|
# https://www.youtube.com/watch?v=YwiHaa1KNwA
|
|
|
|
print("done...") |