Keep the fixed 11-number set intact while adding a second-stage portfolio selection that caps final recommendations to the 70,000 KRW budget, and update docs/data/scripts to match the current project structure and runtime flow. Co-authored-by: Cursor <cursoragent@cursor.com>
402 lines
15 KiB
Python
402 lines
15 KiB
Python
# 웹 호출 라이브러리를 호출합니다.
|
|
import os
|
|
import time
|
|
import requests
|
|
# JSON 포맷을 다루기 위한 라이브러리를 호출합니다.
|
|
import json
|
|
from datetime import datetime, timedelta
|
|
import random
|
|
import socket
|
|
|
|
import urllib3
|
|
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
# Use the project's Telegram notifier when it is importable; otherwise fall
# back to a silent stand-in so the crawler can run without it.
try:
    from TelegramBot import TelegramBot
except ModuleNotFoundError:

    class TelegramBot:
        """No-op replacement exposing the constructor and ``sendMsg`` used here."""

        def __init__(self, enable=True):
            """Accept and ignore the ``enable`` flag."""

        def sendMsg(self, msg):
            """Discard ``msg``; always returns None."""
|
|
|
|
# Endpoints tried for a single-draw lookup; "{}" receives the draw number.
_LOTTO_URLS = (
    "https://www.dhlottery.co.kr/common.do?method=getLottoNumber&drwNo={}",
    "https://dhlottery.co.kr/common.do?method=getLottoNumber&drwNo={}",
)

# Tunables, each overridable through an environment variable.
# Per-request timeout in seconds.
_REQUEST_TIMEOUT = float(os.getenv("LOTTO_REQUEST_TIMEOUT", "12"))
# Number of retry rounds attempted for one draw.
_FETCH_RETRIES_PER_DRAW = int(os.getenv("LOTTO_FETCH_RETRIES", "3"))
# Base delay (seconds) of the exponential backoff between retry rounds.
_BACKOFF_BASE_SECONDS = float(os.getenv("LOTTO_BACKOFF_BASE", "0.7"))
# Consecutive failed draws after which a sync run gives up.
_MAX_CONSECUTIVE_FETCH_FAILURES = int(os.getenv("LOTTO_MAX_CONSEC_FAIL", "8"))
# Timeout in seconds for the TCP reachability probe.
_CONNECTION_PROBE_TIMEOUT = float(os.getenv("LOTTO_PROBE_TIMEOUT", "3"))

# Browser-like headers installed on the HTTP session.
_BROWSER_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
    ),
    "Accept": "application/json, text/javascript, */*; q=0.01",
    "Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7",
    "Referer": "https://www.dhlottery.co.kr/gameResult.do?method=byWin",
    "X-Requested-With": "XMLHttpRequest",
}
|
|
|
|
|
|
def _ssl_verify_arg():
    """Return the value to pass as ``verify=`` on HTTPS requests.

    Returns:
        The path reported by ``certifi.where()`` when certifi can be
        imported, otherwise ``True``.
    """
    try:
        import certifi
    except ImportError:
        # certifi is optional; signal "verify with defaults" instead.
        return True
    return certifi.where()
|
|
|
|
# 로또 데이터를 수집하기 위한 파이썬 클래스를 선언합니다.
|
|
class DataCrawler:
    """Fetch lotto draw results from the dhlottery API and sync local files.

    History is kept as two parallel files sharing one path prefix:
    ``<prefix>.json`` (one JSON object per line) and ``<prefix>.txt``
    (``drwNo,n1,n2,n3,n4,n5,n6,bonus`` per line).
    """

    bot = None

    # Integer fields a record must carry to be written to the txt file.
    _RECORD_INT_FIELDS = (
        "drwNo",
        "drwtNo1",
        "drwtNo2",
        "drwtNo3",
        "drwtNo4",
        "drwtNo5",
        "drwtNo6",
        "bnusNo",
    )

    def __init__(self):
        """Create the notifier and an HTTP session with browser-like headers."""
        self.bot = TelegramBot()
        self._session = requests.Session()
        self._session.headers.update(_BROWSER_HEADERS)
        # Reason for the most recent _fetch_draw failure ("" when none).
        self._last_fetch_error = ""

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    @classmethod
    def _is_valid_record(cls, data):
        """Return True when ``data`` is a success record with all int fields."""
        return (
            isinstance(data, dict)
            and data.get("returnValue") == "success"
            and all(isinstance(data.get(key), int) for key in cls._RECORD_INT_FIELDS)
        )

    @staticmethod
    def _format_txt_line(result, drw_no=None):
        """Build one txt line for ``result``.

        ``drw_no`` overrides the draw number printed in the first column;
        when omitted ``result["drwNo"]`` is used. Raises KeyError when a
        number field is missing.
        """
        if drw_no is None:
            drw_no = result["drwNo"]
        return "%d,%d,%d,%d,%d,%d,%d,%d\n" % (
            drw_no,
            result["drwtNo1"],
            result["drwtNo2"],
            result["drwtNo3"],
            result["drwtNo4"],
            result["drwtNo5"],
            result["drwtNo6"],
            result["bnusNo"],
        )

    @staticmethod
    def _iter_jsonl(json_path):
        """Yield each decodable JSON value from a JSON-lines file.

        Yields nothing when the file is missing or empty; blank and
        undecodable lines are skipped.
        """
        if not os.path.isfile(json_path) or os.path.getsize(json_path) == 0:
            return
        with open(json_path, "r", encoding="utf-8") as fp:
            for line in fp:
                line = line.strip()
                if not line:
                    continue
                try:
                    yield json.loads(line)
                except json.JSONDecodeError:
                    continue

    @staticmethod
    def _replace_file(path, lines):
        """Write ``lines`` to a sibling temp file, then swap it over ``path``.

        The previous content of ``path`` stays intact until the new content
        has been written completely.
        """
        tmp_path = path + ".tmp"
        with open(tmp_path, "w", encoding="utf-8") as fp:
            fp.writelines(lines)
        os.replace(tmp_path, path)

    def _notify(self, msg):
        """Send ``msg`` through the bot; a notifier failure never aborts a run."""
        try:
            self.bot.sendMsg(msg)
        except Exception as ex:
            # Notification is best-effort, but do not hide the failure entirely.
            print("[Lottery Crawler] notification failed: {}".format(ex))

    # ------------------------------------------------------------------
    # Network
    # ------------------------------------------------------------------
    def _can_reach_lottery_host(self):
        """Quickly probe whether a TCP connection to the API host is possible.

        Not a full guarantee, but detects a fully blocked network early so
        the run does not waste time on request timeouts.
        """
        for host in ("www.dhlottery.co.kr", "dhlottery.co.kr"):
            try:
                with socket.create_connection((host, 443), timeout=_CONNECTION_PROBE_TIMEOUT):
                    return True
            except OSError:
                continue
        return False

    def _fetch_draw(self, drw_no):
        """Fetch one draw from the dhlottery API.

        Every retry round tries each URL, each ``verify`` option and both
        HTTP methods. Returns the decoded record on success, otherwise None
        with the last failure reason stored in ``self._last_fetch_error``.
        """
        self._last_fetch_error = ""
        # NOTE(review): the second option retries with TLS verification
        # disabled. Kept as the original fallback; confirm it is acceptable.
        verify_options = (_ssl_verify_arg(), False)
        last_error = "unknown"
        for attempt in range(1, _FETCH_RETRIES_PER_DRAW + 1):
            for raw_url in _LOTTO_URLS:
                url = raw_url.format(int(drw_no))
                for verify in verify_options:
                    for method in ("POST", "GET"):
                        try:
                            res = self._session.request(
                                method,
                                url,
                                timeout=_REQUEST_TIMEOUT,
                                verify=verify,
                            )
                            if res.status_code != 200:
                                last_error = "http {}".format(res.status_code)
                                continue
                            text = res.text.strip()
                            if not text.startswith("{"):
                                last_error = "non-json response"
                                continue
                            result = json.loads(text)
                        except (requests.RequestException, ValueError) as ex:
                            # json.JSONDecodeError is a ValueError subclass.
                            last_error = str(ex)
                            continue
                        if self._is_valid_record(result):
                            return result
                        if isinstance(result, dict) and result.get("returnValue") == "success":
                            # A "success" without the number fields cannot be
                            # stored; treat it as a failed attempt.
                            last_error = "malformed success payload"
                        else:
                            rv = result.get("returnValue") if isinstance(result, dict) else "unknown"
                            last_error = "api returnValue={}".format(rv)
            if attempt < _FETCH_RETRIES_PER_DRAW:
                # Exponential backoff plus jitter to ride out transient congestion.
                delay = _BACKOFF_BASE_SECONDS * (2 ** (attempt - 1)) + random.uniform(0, 0.25)
                time.sleep(delay)
        self._last_fetch_error = last_error
        return None

    # ------------------------------------------------------------------
    # History files
    # ------------------------------------------------------------------
    def _append_draw_files(self, lottoHistoryFile, result):
        """Append one successful record to both ``.json`` and ``.txt``.

        Both lines are built before either file is opened, so a malformed
        record raises KeyError without leaving the two files out of sync.
        """
        txt_line = self._format_txt_line(result)
        json_line = json.dumps(result, ensure_ascii=False) + "\n"
        with open(lottoHistoryFile + ".json", "a", encoding="utf-8") as json_fp:
            json_fp.write(json_line)
        with open(lottoHistoryFile + ".txt", "a", encoding="utf-8") as text_fp:
            text_fp.write(txt_line)

    def _read_last_draw_from_json(self, json_path):
        """Return the drwNo of the last valid record in the file, or None."""
        last_no = None
        for data in self._iter_jsonl(json_path):
            if self._is_valid_record(data):
                last_no = data["drwNo"]
        return last_no

    def _read_draw_map_from_json(self, json_path):
        """Read the whole file into a ``drwNo -> record`` dict.

        Invalid lines are dropped; for duplicated draw numbers the last
        valid record wins.
        """
        return {
            data["drwNo"]: data
            for data in self._iter_jsonl(json_path)
            if self._is_valid_record(data)
        }

    def _write_draw_map_files(self, lottoHistoryFile, draw_map):
        """Regenerate ``.json`` and ``.txt`` in ascending drwNo order.

        All lines are formatted first and each file is swapped in via a temp
        file, so a bad record or a crash cannot truncate existing history.
        """
        json_lines = []
        txt_lines = []
        for drw_no in sorted(draw_map):
            result = draw_map[drw_no]
            json_lines.append(json.dumps(result, ensure_ascii=False) + "\n")
            txt_lines.append(self._format_txt_line(result, drw_no))
        self._replace_file(lottoHistoryFile + ".json", json_lines)
        self._replace_file(lottoHistoryFile + ".txt", txt_lines)

    # ------------------------------------------------------------------
    # Target estimation
    # ------------------------------------------------------------------
    def _get_last_week_draw_date(self, now=None):
        """Return the date of the most recent Saturday draw.

        Example: run on Friday 2026-05-08 -> Saturday 2026-05-02. On a
        Saturday before 20:00 the previous Saturday is returned.

        Args:
            now: reference datetime; defaults to ``datetime.now()``.
                NOTE(review): that default is naive local time; confirm the
                host clock matches the timezone the draw schedule assumes.
        """
        if now is None:
            now = datetime.now()
        days_since_saturday = (now.weekday() - 5) % 7
        latest_saturday = now.date() - timedelta(days=days_since_saturday)
        if now.weekday() == 5 and now.hour < 20:
            latest_saturday -= timedelta(days=7)
        return latest_saturday

    def _estimate_target_draw_no(self, draw_map):
        """Estimate the newest draw number that should exist by now.

        Adds the number of whole weeks between the stored latest draw's
        ``drwNoDate`` and the most recent draw Saturday to the latest stored
        draw number. Returns None for an empty map, and the latest stored
        number when its date is missing, unparsable or not in the past.
        """
        if not draw_map:
            return None
        last_no = max(draw_map)
        try:
            last_date = datetime.strptime(
                draw_map[last_no].get("drwNoDate", ""), "%Y-%m-%d"
            ).date()
        except (TypeError, ValueError):
            # TypeError: drwNoDate is present but not a string (e.g. null).
            return last_no
        week_gap = (self._get_last_week_draw_date() - last_date).days // 7
        return last_no + max(week_gap, 0)

    # ------------------------------------------------------------------
    # Entry points
    # ------------------------------------------------------------------
    def craw(self, lottoHistoryFile, drwNo=None):
        """Collect draw data into ``lottoHistoryFile`` (path without extension).

        With ``drwNo`` given, fetch that draw and append it. Otherwise fetch
        draws 1, 2, 3, ... until the first failure and regenerate both files
        from what was collected.

        Returns:
            True when at least one draw was stored, False otherwise. When
            nothing could be fetched the existing files are left untouched.
        """
        if drwNo is not None:
            result = self._fetch_draw(drwNo)
            if result is None:
                return False
            self._append_draw_files(lottoHistoryFile, result)
            return True

        collected = {}
        try:
            idx = 1
            while True:
                result = self._fetch_draw(idx)
                if result is None:
                    break
                collected[idx] = result
                idx += 1
                time.sleep(0.5)
        finally:
            # Persist partial progress even if the crawl is interrupted.
            if collected:
                self._write_draw_map_files(lottoHistoryFile, collected)
        return bool(collected)

    def _rebuild_full_history(self, lottoHistoryFile):
        """Run a full crawl and report its outcome; always returns True."""
        try:
            rebuilt = self.craw(lottoHistoryFile)
        except Exception as ex:
            # Best-effort as before, but surface the reason.
            print("[Lottery Crawler] full rebuild raised: {}".format(ex))
            rebuilt = False
        if rebuilt:
            self._notify("[Lottery Crawler] full history rebuilt (no valid json).")
        else:
            self._notify(
                "[Lottery Crawler] full history rebuild failed: {}".format(
                    self._last_fetch_error or "unknown"
                )
            )
        # Return value kept from the original contract for this branch.
        return True

    def excute(self, resource_path):
        """Sync ``<resource_path>/lotto_history.*`` up to the latest draw.

        - Detects and back-fills every missing draw in 1..target, not only
          the one after the last stored draw.
        - Drops duplicated/broken lines by regenerating both files.

        Returns:
            False when the lottery host is unreachable, True otherwise
            (including runs where some draws failed to download).
        """
        lottoHistoryFile = os.path.join(resource_path, "lotto_history")
        draw_map = self._read_draw_map_from_json(lottoHistoryFile + ".json")

        # No usable history: collect everything from draw 1.
        if not draw_map:
            return self._rebuild_full_history(lottoHistoryFile)

        target_no = self._estimate_target_draw_no(draw_map)
        if target_no is None:
            target_no = max(draw_map)

        if not self._can_reach_lottery_host():
            msg = "[Lottery Crawler] network blocked: cannot reach dhlottery host."
            print(msg)
            self._notify(msg)
            return False

        missing_nos = [no for no in range(1, target_no + 1) if no not in draw_map]
        added = 0
        failed = []
        aborted_count = 0
        consecutive_failure = 0
        fail_reasons = {}
        for pos, no in enumerate(missing_nos):
            result = self._fetch_draw(no)
            if result is None:
                failed.append(no)
                reason = self._last_fetch_error or "unknown"
                fail_reasons[reason] = fail_reasons.get(reason, 0) + 1
                consecutive_failure += 1
                if consecutive_failure >= _MAX_CONSECUTIVE_FETCH_FAILURES:
                    # missing_nos is ascending, so everything after pos is pending.
                    aborted_count = len(missing_nos) - pos - 1
                    break
                continue
            draw_map[no] = result
            added += 1
            consecutive_failure = 0
            time.sleep(0.2)

        # Always regenerate: this also cleans duplicates and broken lines.
        self._write_draw_map_files(lottoHistoryFile, draw_map)

        last_no = max(draw_map)
        if failed:
            sample = ",".join(str(x) for x in failed[:10])
            reason_items = sorted(fail_reasons.items(), key=lambda x: x[1], reverse=True)
            reason_str = "; ".join(
                "{} x{}".format(reason, count) for reason, count in reason_items[:3]
            )
            if aborted_count:
                reason_str += " | aborted {} pending draws due to consecutive failures".format(
                    aborted_count
                )
            self._notify(
                "[Lottery Crawler] appended {}, failed {} draw(s): {}{} | {}".format(
                    added,
                    len(failed),
                    sample,
                    "..." if len(failed) > 10 else "",
                    reason_str or "no reason",
                )
            )
        elif added == 0:
            self._notify(
                "[Lottery Crawler] up to date (last drwNo={}, target={}).".format(
                    last_no, target_no
                )
            )
        else:
            self._notify(
                "[Lottery Crawler] appended {} draw(s), last drwNo={}, target={}.".format(
                    added, last_no, target_no
                )
            )
        return True

    # Correctly spelled alias; existing callers use the misspelled "excute".
    execute = excute
|
|
|
|
if __name__ == "__main__":
    # Script entry point: sync the history files under ./resources.
    PROJECT_HOME = '.'
    resource_path = os.path.join(PROJECT_HOME, 'resources')
    DataCrawler().excute(resource_path)