Files
DeepLottery/DataCrawler.py
dsyoon 919f2e19bb refactor: apply portfolio cap and align project docs
Keep the fixed 11-number set intact while adding a second-stage portfolio selection that caps final recommendations to the 70,000 KRW budget, and update docs/data/scripts to match the current project structure and runtime flow.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-08 10:37:03 +09:00

402 lines
15 KiB
Python

# 웹 호출 라이브러리를 호출합니다.
import os
import time
import requests
# JSON 포맷을 다루기 위한 라이브러리를 호출합니다.
import json
from datetime import datetime, timedelta
import random
import socket
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Notifications are optional: when the project's TelegramBot module is not
# importable, fall back to a stub with the same two entry points so the
# crawler can run without it.
try:
    from TelegramBot import TelegramBot
except ModuleNotFoundError:

    class TelegramBot:
        """No-op stand-in used when the TelegramBot module is unavailable."""

        def __init__(self, enable=True):
            """Accept the same ``enable`` argument as the stub's callers pass; store nothing."""
            pass

        def sendMsg(self, msg):
            """Discard *msg* and return None."""
            pass
# Endpoint templates tried in order for every draw; "{}" receives the draw number.
_LOTTO_URLS = (
    "https://www.dhlottery.co.kr/common.do?method=getLottoNumber&drwNo={}",
    "https://dhlottery.co.kr/common.do?method=getLottoNumber&drwNo={}",
)
# Tunables, each overridable through an environment variable of the given name.
# Per-request timeout in seconds passed to the HTTP session.
_REQUEST_TIMEOUT = float(os.environ.get("LOTTO_REQUEST_TIMEOUT", "12"))
# Number of full retry rounds attempted for a single draw.
_FETCH_RETRIES_PER_DRAW = int(os.environ.get("LOTTO_FETCH_RETRIES", "3"))
# Base delay in seconds for the exponential backoff between retry rounds.
_BACKOFF_BASE_SECONDS = float(os.environ.get("LOTTO_BACKOFF_BASE", "0.7"))
# Consecutive failed draws after which a sync run stops fetching.
_MAX_CONSECUTIVE_FETCH_FAILURES = int(os.environ.get("LOTTO_MAX_CONSEC_FAIL", "8"))
# Timeout in seconds for the TCP reachability probe against the lottery host.
_CONNECTION_PROBE_TIMEOUT = float(os.environ.get("LOTTO_PROBE_TIMEOUT", "3"))
# Default headers installed on the crawler's HTTP session.
_BROWSER_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
    ),
    "Accept": "application/json, text/javascript, */*; q=0.01",
    "Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7",
    "Referer": "https://www.dhlottery.co.kr/gameResult.do?method=byWin",
    "X-Requested-With": "XMLHttpRequest",
}
def _ssl_verify_arg():
    """Return the value to pass as ``verify=`` for certificate-checked requests.

    Returns:
        The path reported by ``certifi.where()`` when certifi is importable,
        otherwise ``True``.
    """
    try:
        import certifi
    except ImportError:
        return True
    return certifi.where()
# 로또 데이터를 수집하기 위한 파이썬 클래스를 선언합니다.
class DataCrawler:
    """Download lottery draw results from the dhlottery API and persist them.

    History lives in two sibling files that share one base path:
    ``<base>.json`` (JSON Lines, one API response per draw) and
    ``<base>.txt`` (one ``drwNo,n1,n2,n3,n4,n5,n6,bonus`` row per draw).
    """

    # Class-level default; every instance gets its own bot in __init__.
    bot = None

    # Number fields that follow the draw number in one txt row.
    _NUMBER_KEYS = (
        "drwtNo1",
        "drwtNo2",
        "drwtNo3",
        "drwtNo4",
        "drwtNo5",
        "drwtNo6",
        "bnusNo",
    )
    _TXT_ROW_FORMAT = "%d,%d,%d,%d,%d,%d,%d,%d\n"

    def __init__(self):
        """Create the notifier and an HTTP session with browser-like headers."""
        self.bot = TelegramBot()
        self._session = requests.Session()
        self._session.headers.update(_BROWSER_HEADERS)
        # Reason recorded by the most recent failed _fetch_draw call.
        self._last_fetch_error = ""

    @classmethod
    def _is_complete_record(cls, record):
        """Return True if *record* is a success response carrying every txt-row field."""
        if not isinstance(record, dict):
            return False
        if record.get("returnValue") != "success":
            return False
        for key in ("drwNo",) + cls._NUMBER_KEYS:
            value = record.get(key)
            # bool is an int subclass but is never a legitimate draw number.
            if isinstance(value, bool) or not isinstance(value, int):
                return False
        return True

    @classmethod
    def _format_txt_line(cls, result, drw_no=None):
        """Format one txt row; *drw_no* overrides ``result["drwNo"]`` when given.

        Raises:
            KeyError: if *result* lacks one of the number fields.
        """
        if drw_no is None:
            drw_no = result["drwNo"]
        numbers = tuple(result[key] for key in cls._NUMBER_KEYS)
        return cls._TXT_ROW_FORMAT % ((drw_no,) + numbers)

    def _notify(self, msg):
        """Send *msg* through the bot; a failing notifier never aborts the crawl."""
        try:
            self.bot.sendMsg(msg)
        except Exception:
            # Deliberately best-effort: the crawl outcome must not depend on
            # the notification channel being healthy.
            pass

    def _can_reach_lottery_host(self):
        """Quickly check whether a TCP connection to an API host on port 443 succeeds.

        This is not a full guarantee that requests will work; it only detects
        a completely blocked network early so the run does not wait on
        request timeouts.
        """
        for host in ("www.dhlottery.co.kr", "dhlottery.co.kr"):
            try:
                with socket.create_connection(
                    (host, 443), timeout=_CONNECTION_PROBE_TIMEOUT
                ):
                    return True
            except OSError:
                continue
        return False

    def _request_draw(self, method, url, verify):
        """Issue one HTTP request and return ``(record, None)`` or ``(None, reason)``."""
        try:
            res = self._session.request(
                method,
                url,
                timeout=_REQUEST_TIMEOUT,
                verify=verify,
            )
            if res.status_code != 200:
                return None, "http {}".format(res.status_code)
            text = res.text.strip()
            if not text.startswith("{"):
                return None, "non-json response"
            result = json.loads(text)
        except (requests.RequestException, ValueError) as ex:
            # json.JSONDecodeError is a ValueError subclass.
            return None, str(ex)
        if self._is_complete_record(result):
            return result, None
        if isinstance(result, dict):
            rv = result.get("returnValue")
            if rv == "success":
                # Claimed success but lacks fields the history files need.
                return None, "incomplete api record"
        else:
            rv = "unknown"
        return None, "api returnValue={}".format(rv)

    def _fetch_draw(self, drw_no):
        """Fetch a single draw from the dhlottery API.

        Every retry round tries each URL with certificate verification first
        and then without it, using POST and then GET. Rounds are separated by
        exponential backoff with jitter.

        Returns:
            The response dict on success, otherwise None; in that case the
            last failure reason is stored in ``self._last_fetch_error``.
        """
        self._last_fetch_error = ""
        # NOTE(review): the second option disables TLS certificate checking as
        # a fallback; keep only if that trade-off is intentional.
        verify_options = (_ssl_verify_arg(), False)
        last_error = "unknown"
        for attempt in range(1, _FETCH_RETRIES_PER_DRAW + 1):
            for raw_url in _LOTTO_URLS:
                url = raw_url.format(int(drw_no))
                for verify in verify_options:
                    for method in ("POST", "GET"):
                        result, reason = self._request_draw(method, url, verify)
                        if result is not None:
                            return result
                        last_error = reason
            if attempt < _FETCH_RETRIES_PER_DRAW:
                # Exponential backoff plus jitter to ride out brief congestion.
                delay = _BACKOFF_BASE_SECONDS * (2 ** (attempt - 1)) + random.uniform(
                    0, 0.25
                )
                time.sleep(delay)
        self._last_fetch_error = last_error
        return None

    def _append_draw_files(self, lottoHistoryFile, result):
        """Append one successful response to both the json and txt history files."""
        # Format first so a malformed record fails before either file is touched.
        json_line = json.dumps(result, ensure_ascii=False) + "\n"
        txt_line = self._format_txt_line(result)
        with open(lottoHistoryFile + ".json", "a", encoding="utf-8") as json_fp:
            json_fp.write(json_line)
        with open(lottoHistoryFile + ".txt", "a", encoding="utf-8") as text_fp:
            text_fp.write(txt_line)

    def _read_last_draw_from_json(self, json_path):
        """Return the drwNo of the last parseable JSONL line, or None.

        None is returned when the file is missing or empty, when no line
        parses, or when the last parseable line is not a success record.
        """
        if not os.path.isfile(json_path) or os.path.getsize(json_path) == 0:
            return None
        last_json = None
        with open(json_path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    last_json = json.loads(line)
                except json.JSONDecodeError:
                    continue
        # A line such as "[1, 2]" parses but is not a record; treat it like a
        # non-success record instead of failing on .get().
        if not isinstance(last_json, dict):
            return None
        if last_json.get("returnValue") != "success":
            return None
        return last_json.get("drwNo")

    def _read_draw_map_from_json(self, json_path):
        """Read the whole JSONL file into a ``drwNo -> record`` dict.

        Unparseable, non-success and incomplete lines are dropped; when a draw
        appears more than once the last occurrence wins.
        """
        draw_map = {}
        if not os.path.isfile(json_path) or os.path.getsize(json_path) == 0:
            return draw_map
        with open(json_path, "r", encoding="utf-8") as fp:
            for line in fp:
                line = line.strip()
                if not line:
                    continue
                try:
                    data = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # Incomplete records are skipped so the sync treats that draw
                # as missing and fetches it again.
                if self._is_complete_record(data):
                    draw_map[data["drwNo"]] = data
        return draw_map

    def _write_draw_map_files(self, lottoHistoryFile, draw_map):
        """Regenerate the json and txt files in ascending drwNo order.

        Raises:
            KeyError: if a record lacks a number field. This happens before
                either file is opened, so existing history stays intact.
        """
        json_lines = []
        txt_lines = []
        for drw_no in sorted(draw_map):
            result = draw_map[drw_no]
            json_lines.append(json.dumps(result, ensure_ascii=False) + "\n")
            txt_lines.append(self._format_txt_line(result, drw_no))
        with open(lottoHistoryFile + ".json", "w", encoding="utf-8") as json_fp, open(
            lottoHistoryFile + ".txt", "w", encoding="utf-8"
        ) as text_fp:
            json_fp.writelines(json_lines)
            text_fp.writelines(txt_lines)

    def _get_last_week_draw_date(self, now=None):
        """Return the date of the most recent Saturday whose 20:00 cutoff has passed.

        Example: run on Friday 2026-05-08, the result is Saturday 2026-05-02.

        Args:
            now: datetime to evaluate; defaults to ``datetime.now()``.
        """
        # NOTE(review): the default uses the machine's local clock; the 20:00
        # cutoff presumably refers to Korean time - confirm the host timezone.
        if now is None:
            now = datetime.now()
        days_since_saturday = (now.weekday() - 5) % 7
        latest_saturday = now.date() - timedelta(days=days_since_saturday)
        # On a Saturday before 20:00, target the previous Saturday instead.
        if now.weekday() == 5 and now.hour < 20:
            latest_saturday -= timedelta(days=7)
        return latest_saturday

    def _estimate_target_draw_no(self, draw_map):
        """Estimate the newest draw number this run should hold.

        The newest stored draw's ``drwNoDate`` is compared with the date from
        ``_get_last_week_draw_date``; each whole week between them adds one
        draw.

        Returns:
            None for an empty map, otherwise the target draw number. The
            newest stored number is returned when its date is missing or
            unparseable.
        """
        if not draw_map:
            return None
        last_no = max(draw_map)
        last_date_str = draw_map[last_no].get("drwNoDate", "")
        try:
            last_date = datetime.strptime(last_date_str, "%Y-%m-%d").date()
        except (TypeError, ValueError):
            # TypeError covers a null / non-string drwNoDate.
            return last_no
        target_date = self._get_last_week_draw_date()
        week_gap = (target_date - last_date).days // 7
        if week_gap <= 0:
            return last_no
        return last_no + week_gap

    def craw(self, lottoHistoryFile, drwNo=None):
        """Collect draw results and store them in the history files.

        Args:
            lottoHistoryFile: base path of the history files, without extension.
            drwNo: when given, fetch only that draw and append it; otherwise
                rebuild both files from draw 1 until a fetch fails.

        Returns:
            True when at least one draw was stored, False when the requested
            draw (or draw 1 in a full rebuild) could not be fetched.
        """
        if drwNo is not None:
            result = self._fetch_draw(drwNo)
            if result is None:
                return False
            self._append_draw_files(lottoHistoryFile, result)
            return True

        # Probe draw 1 before opening the files in "w" mode, so an unreachable
        # API cannot wipe whatever history already exists.
        idx = 1
        result = self._fetch_draw(idx)
        if result is None:
            return False
        with open(lottoHistoryFile + ".json", "w", encoding="utf-8") as json_fp, open(
            lottoHistoryFile + ".txt", "w", encoding="utf-8"
        ) as text_fp:
            while result is not None:
                json_fp.write(json.dumps(result, ensure_ascii=False) + "\n")
                text_fp.write(self._format_txt_line(result, idx))
                idx += 1
                time.sleep(0.5)
                result = self._fetch_draw(idx)
        return True

    def _rebuild_full_history(self, lottoHistoryFile):
        """Run a full crawl, notify the outcome and return whether it stored anything."""
        try:
            rebuilt = self.craw(lottoHistoryFile)
            detail = self._last_fetch_error or "unknown"
        except Exception as ex:
            # Entry-point boundary: report the failure instead of crashing.
            rebuilt = False
            detail = str(ex)
        if rebuilt:
            self._notify("[Lottery Crawler] full history rebuilt (no valid json).")
        else:
            self._notify(
                "[Lottery Crawler] full history rebuild failed: {}".format(detail)
            )
        return rebuilt

    def excute(self, resource_path):
        """Synchronise ``<resource_path>/lotto_history.*`` up to last week's draw.

        Rather than only checking the draw after the newest stored one, every
        number from 1 to the target is checked and missing draws are fetched.
        Duplicate and broken lines are dropped by regenerating both files.

        Returns:
            False when the lottery host is unreachable or a full rebuild
            stored nothing; True otherwise, including partial syncs.
        """
        lottoHistoryFile = os.path.join(resource_path, "lotto_history")
        draw_map = self._read_draw_map_from_json(lottoHistoryFile + ".json")

        # No usable history: fall back to a full re-crawl.
        if not draw_map:
            return self._rebuild_full_history(lottoHistoryFile)

        target_no = self._estimate_target_draw_no(draw_map)
        if target_no is None:
            target_no = max(draw_map)

        if not self._can_reach_lottery_host():
            msg = "[Lottery Crawler] network blocked: cannot reach dhlottery host."
            print(msg)
            self._notify(msg)
            return False

        missing_nos = [no for no in range(1, target_no + 1) if no not in draw_map]
        added = 0
        failed = []
        aborted_missing_nos = []
        consecutive_failure = 0
        fail_reasons = {}
        for no in missing_nos:
            result = self._fetch_draw(no)
            if result is None:
                failed.append(no)
                reason = self._last_fetch_error or "unknown"
                fail_reasons[reason] = fail_reasons.get(reason, 0) + 1
                consecutive_failure += 1
                if consecutive_failure >= _MAX_CONSECUTIVE_FETCH_FAILURES:
                    # Give up on the rest; remember how many were left pending.
                    aborted_missing_nos = [x for x in missing_nos if x > no]
                    break
                continue
            draw_map[no] = result
            added += 1
            consecutive_failure = 0
            time.sleep(0.2)

        # Always regenerate: this also drops duplicate and broken lines.
        self._write_draw_map_files(lottoHistoryFile, draw_map)
        last_no = max(draw_map)

        if failed:
            sample = ",".join(str(x) for x in failed[:10])
            reason_items = sorted(
                fail_reasons.items(), key=lambda x: x[1], reverse=True
            )
            reason_str = "; ".join(
                "{} x{}".format(reason, count) for reason, count in reason_items[:3]
            )
            if aborted_missing_nos:
                reason_str += (
                    " | aborted {} pending draws due to consecutive failures".format(
                        len(aborted_missing_nos)
                    )
                )
            self._notify(
                "[Lottery Crawler] appended {}, failed {} draw(s): {}{} | {}".format(
                    added,
                    len(failed),
                    sample,
                    "..." if len(failed) > 10 else "",
                    reason_str or "no reason",
                )
            )
        elif added == 0:
            self._notify(
                "[Lottery Crawler] up to date (last drwNo={}, target={}).".format(
                    last_no, target_no
                )
            )
        else:
            self._notify(
                "[Lottery Crawler] appended {} draw(s), last drwNo={}, target={}.".format(
                    added, last_no, target_no
                )
            )
        return True

    # Correctly spelled alias; existing callers use the misspelled "excute".
    execute = excute
# Script entry point: synchronise the history files under ./resources,
# resolved against the current working directory.
if __name__ == "__main__":
    PROJECT_HOME = '.'
    resource_path = os.path.join(PROJECT_HOME, 'resources')
    # Create the crawler and bring the stored history up to date.
    dataCrawler = DataCrawler()
    dataCrawler.excute(resource_path)