# 웹 호출 라이브러리를 호출합니다. import time import requests from DataCrawler import DataCrawler import json import os import pandas as pd import itertools from datetime import datetime, timedelta from TelegramBot import TelegramBot from final_BallFilter import BallFilter COST_PER_GAME = 1000 MAX_BUDGET_KRW = 70000 MAX_GAMES_PER_DRAW = MAX_BUDGET_KRW // COST_PER_GAME class Practice: bot = None preprocessor = None predictor = None extract_count = None def __init__(self, resources_path): self.bot = TelegramBot() return # 로또 당첨 데이터를 수집해서 파일로 저장합니다. # lottoHistoryFile: 로또 당첨 데이터를 저장할 파일 def craw(self, lottoHistoryFile, drwNo=None): ball = None if drwNo != None: # 로또 데이터를 저장할 파일을 선언합니다. jsonFp = open(lottoHistoryFile + ".json", 'a', encoding="utf-8") textFp = open(lottoHistoryFile + ".txt", 'a', encoding="utf-8") url = 'https://dhlottery.co.kr/common.do?method=getLottoNumber&drwNo=' + str(drwNo) # URL을 호출합니다. res = requests.post(url) # 호출한 결과에 대해서 Json 포맷을 가져옵니다. result = res.json() if result['returnValue'] != 'success': return None # 가져온 Json 포맷을 파일로 저장합니다. jsonFp.write(json.dumps(result, ensure_ascii=False) + "\n") textFp.write("%d,%d,%d,%d,%d,%d,%d,%d\n" % (drwNo, result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo'])) print("%d,%d,%d,%d,%d,%d,%d,%d" % (drwNo, result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo'])) ball = [result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']] else: # 로또 데이터를 저장할 파일을 선언합니다. jsonFp = open(lottoHistoryFile + ".json", 'w', encoding="utf-8") textFp = open(lottoHistoryFile + ".txt", 'w', encoding="utf-8") # 1회차부터 지정된 회차까지 로또 당첨 번호를 수집합니다. idx = 1 while True: # 1회차부터 지정된 회차까지의 URL을 생성합니다. url = 'https://dhlottery.co.kr/common.do?method=getLottoNumber&drwNo=' + str(idx) # URL을 호출합니다. res = requests.post(url) # 호출한 결과에 대해서 Json 포맷을 가져옵니다. result = res.json() if result['returnValue'] != 'success': break # 가져온 Json 포맷을 파일로 저장합니다. jsonFp.write(json.dumps(result, ensure_ascii=False) + "\n") textFp.write("%d,%d,%d,%d,%d,%d,%d,%d\n" % (idx, result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo'])) print("%d,%d,%d,%d,%d,%d,%d,%d" % (idx, result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo'])) ball = [result['drwtNo1'], result['drwtNo2'], result['drwtNo3'], result['drwtNo4'], result['drwtNo5'], result['drwtNo6'], result['bnusNo']] idx += 1 time.sleep(0.5) # 저장한 파일을 종료합니다. jsonFp.close() textFp.close() return ball def predict1(self, result_json): result_json.append([6, 7, 10, 11, 20, 45]) result_json.append([5, 12, 16, 27, 39, 45]) result_json.append([5, 15, 18, 29, 36, 41]) result_json.append([1, 17, 20, 25, 36, 45]) result_json.append([6, 15, 20, 23, 37, 43]) result_json.append([8, 15, 19, 23, 38, 41]) result_json.append([3, 14, 20, 27, 35, 45]) result_json.append([5, 11, 19, 24, 40, 45]) result_json.append([5, 9, 20, 25, 32, 37]) result_json.append([2, 13, 19, 27, 40, 43]) result_json.append([4, 13, 17, 28, 39, 43]) return def _can_add_ball(self, ball, fixed_balls, selected_balls, max_overlap): ball_set = set(ball) for fixed_ball in fixed_balls: if len(ball_set & set(fixed_ball)) > max_overlap: return False for selected_ball in selected_balls: if len(ball_set & set(selected_ball)) > max_overlap: return False return True def select_portfolio(self, fixed_balls, candidates, target_count): """ 2차 포트폴리오 선정: - 중복 제거 - 고정수/선정수 간 중복도(겹치는 번호 수) 제약을 단계적으로 완화하며 선택 """ unique_candidates = [] seen = set() fixed_keys = {tuple(sorted(fixed_ball)) for fixed_ball in fixed_balls} for candidate in candidates: key = tuple(sorted(candidate)) if key in seen or key in fixed_keys: continue seen.add(key) unique_candidates.append(list(key)) if target_count <= 0: return [] if len(unique_candidates) <= target_count: return unique_candidates selected = [] selected_keys = set() overlap_stages = [2, 3, 4, 5] for max_overlap in overlap_stages: for candidate in unique_candidates: key = tuple(candidate) if key in selected_keys: continue if self._can_add_ball(candidate, fixed_balls, selected, max_overlap): selected.append(candidate) selected_keys.add(key) if len(selected) >= target_count: return selected # 단계 완화 후에도 부족하면 남은 조합을 순서대로 채움 for candidate in unique_candidates: key = tuple(candidate) if key in selected_keys: continue selected.append(candidate) selected_keys.add(key) if len(selected) >= target_count: break return selected def predict2(self, resources_path, ymd, fixed_balls, max_games_per_draw=MAX_GAMES_PER_DRAW): candidates = [i for i in range(1, 46)] lottoHistoryFileName = os.path.join(resources_path, 'lotto_history.json') ballFilter = BallFilter(lottoHistoryFileName) no = ballFilter.getNextNo(ymd) print("회차: {}".format(no)) lottoHistoryFileName = os.path.join(resources_path, 'lotto_history.txt') df_ball = pd.read_csv(lottoHistoryFileName, header=None) df_ball.columns = ['no', 'b1', 'b2', 'b3', 'b4', 'b5', 'b6', 'bn'] passed_candidates = [] nCr = list(itertools.combinations(candidates, 6)) for idx, ball in enumerate(nCr): if idx % 1000000 == 0: print(" - {} processed, pass: {}".format(idx, len(passed_candidates))) ball = list(ball) filter_type = ballFilter.filter(ball=ball, no=no, until_end=False, df=df_ball) filter_size = len(filter_type) if 0 < filter_size: continue passed_candidates.append(ball) variable_target_count = max(0, max_games_per_draw - len(fixed_balls)) selected_candidates = self.select_portfolio( fixed_balls=fixed_balls, candidates=passed_candidates, target_count=variable_target_count ) p_ball = df_ball[df_ball['no'] == no - 1].values.tolist()[0] p_no = p_ball[0] p_ball = p_ball[1:7] return p_no, p_ball, selected_candidates, len(passed_candidates), variable_target_count if __name__ == '__main__': PROJECT_HOME = '.' resources_path = os.path.join(PROJECT_HOME, 'resources') # 데이터 수집 #dataCrawler = DataCrawler() #dataCrawler.excute(resources_path) today = datetime.today() if today.weekday() == 5: if today.hour > 20: this_weekend = today + timedelta(days=(12 - today.weekday())) else: this_weekend = today + timedelta(days=(5 - today.weekday())) elif today.weekday() == 6: this_weekend = today + timedelta(days=(12 - today.weekday())) else: this_weekend = today + timedelta(days=(5 - today.weekday())) last_weekend = (this_weekend - timedelta(days=7)).strftime('%Y%m%d') ymd = this_weekend.strftime('%Y%m%d') print("ymd: {}".format(ymd)) # 로또 예측 practice = Practice(resources_path) recommend_result_file = os.path.join(resources_path, "recommend_ball.biz_25.json") if os.path.isfile(recommend_result_file): with open(recommend_result_file, "r", encoding="utf-8") as result_fp: result_json = json.load(result_fp) result_json[ymd] = [] else: result_json = {ymd: []} # 매주 고정 fixed_balls = [] practice.predict1(fixed_balls) result_json[ymd].extend(fixed_balls) # 필터 기반 예측 p_no, p_ball, selected_candidates, passed_count, variable_target_count = practice.predict2( resources_path=resources_path, ymd=ymd, fixed_balls=fixed_balls, max_games_per_draw=MAX_GAMES_PER_DRAW ) result_json[ymd].extend(selected_candidates) with open(recommend_result_file, 'w', encoding='utf-8') as outFp: json.dump(result_json, outFp, ensure_ascii=False) total_games = len(result_json[ymd]) total_cost = total_games * COST_PER_GAME p_str = "[지난주] {}\n - {} 회차, {}\n[금주] {}\n - {} 회차\n[모델#25]\n".format(last_weekend, p_no, str(p_ball), ymd, (p_no + 1)) p_str += " - 고정수: {}개\n".format(len(fixed_balls)) p_str += " - 필터 통과 후보: {}개\n".format(passed_count) p_str += " - 추가 선정: {}개 (목표 {}개)\n".format(len(selected_candidates), variable_target_count) p_str += " - 총 추천: {}개, 총 금액: {:,}원 (한도 {:,}원)\n".format(total_games, total_cost, MAX_BUDGET_KRW) for i, ball in enumerate(result_json[ymd]): p_str += " {}. {}\n".format((i+1), str(ball)) if (i+1) % 100 == 0: practice.bot.sendMsg("{}".format(p_str)) p_str = '' if len(result_json[ymd]) % 100 != 0: practice.bot.sendMsg("{}".format(p_str)) print("size: {}".format(total_games)) print("cost: {:,} KRW / limit: {:,} KRW".format(total_cost, MAX_BUDGET_KRW)) # https://youtu.be/QjBsui8Ob14?si=4dC3q8p0Yu5ZWK1K # https://www.youtube.com/watch?v=YwiHaa1KNwA print("done...")