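"""Link board backend.

Stores submitted URLs in PostgreSQL and serves them with Open Graph
metadata that is fetched on demand and cached in process.

Endpoints:
    GET  /       - render the first page of links
    GET  /links  - paginated JSON list with metadata (?limit=&offset=)
    POST /links  - add a URL from a JSON body: {"url": "..."}
"""
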
import os
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from urllib.parse import urlparse

import psycopg2
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from flask import Flask, jsonify, render_template, request

load_dotenv()

app = Flask(__name__, static_folder="static", template_folder="templates")


DEFAULT_DESCRIPTION = "No description"
DEFAULT_IMAGE = "/static/placeholder.svg"
CACHE_TTL_SECONDS = int(os.getenv("CACHE_TTL_SECONDS", "3600"))
FAILED_TTL_SECONDS = int(os.getenv("FAILED_TTL_SECONDS", "300"))
METADATA_CACHE = {}
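# URL-encoded inline SVG, passed to the template as a fallback preview image.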
PLACEHOLDER_DATA_URI = (
    "data:image/svg+xml;utf8,"
    "<svg%20width='640'%20height='360'%20viewBox='0%200%20640%20360'%20fill='none'%20"
    "xmlns='http://www.w3.org/2000/svg'>"
    "<rect%20width='640'%20height='360'%20fill='%23e9ecef'/>"
    "<rect%20x='120'%20y='90'%20width='400'%20height='180'%20rx='16'%20fill='%23dee2e6'/>"
    "<path%20d='M210%20210l60-70%2070%2080%2060-60%2090%2090H210z'%20fill='%23adb5bd'/>"
    "<circle%20cx='260'%20cy='150'%20r='22'%20fill='%23adb5bd'/>"
    "<text%20x='320'%20y='260'%20text-anchor='middle'%20font-size='18'%20"
    "fill='%236c757d'%20font-family='Arial,%20sans-serif'>No%20Image</text>"
    "</svg>"
)
DEFAULT_PAGE_SIZE = int(os.getenv("DEFAULT_PAGE_SIZE", "30"))
MAX_PAGE_SIZE = int(os.getenv("MAX_PAGE_SIZE", "60"))


def get_db_connection():
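    """Open a new psycopg2 connection from the DB_* environment variables."""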
    return psycopg2.connect(
        host=os.getenv("DB_HOST"),
        port=os.getenv("DB_PORT"),
        dbname=os.getenv("DB_NAME"),
        user=os.getenv("DB_USER"),
        password=os.getenv("DB_PASSWORD"),
    )


def normalize_url(raw_url: str) -> str:
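    """Prepend https:// to scheme-less URLs so they can be fetched."""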
    if not raw_url:
        return raw_url
    parsed = urlparse(raw_url)
    if parsed.scheme:
        return raw_url
    return f"https://{raw_url}"


def extract_meta(soup: BeautifulSoup, property_name: str, name: str):
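    """Read a meta tag's content by property=, falling back to a name= lookup."""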
    tag = soup.find("meta", property=property_name)
    if tag and tag.get("content"):
        return tag.get("content").strip()
    tag = soup.find("meta", attrs={"name": name})
    if tag and tag.get("content"):
        return tag.get("content").strip()
    return ""


def extract_fallback_description(soup: BeautifulSoup) -> str:
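    """Use the first <p> of 40+ characters, trimmed to 180, as a description."""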
    for paragraph in soup.find_all("p"):
        text = paragraph.get_text(" ", strip=True)
        if len(text) >= 40:
            return text[:180]
    return ""


def fetch_metadata(url: str):
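    """Fetch title/description/image for a URL with an in-process TTL cache.

    Successful lookups are cached for CACHE_TTL_SECONDS; failures are cached
    for the shorter FAILED_TTL_SECONDS so broken pages get retried sooner.
    """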
    fallback = {
        "title": url,
        "description": DEFAULT_DESCRIPTION,
        "image": DEFAULT_IMAGE,
    }
    cached = METADATA_CACHE.get(url)
    now = time.time()
    if cached and cached["expires_at"] > now:
        return cached["data"]
    try:
        response = requests.get(
            url,
            headers={
                "User-Agent": (
                    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/121.0.0.0 Safari/537.36"
                )
            },
            timeout=6,
        )
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        resolved_url = response.url or url

        title = (
            extract_meta(soup, "og:title", "twitter:title")
            or extract_meta(soup, "twitter:title", "title")
            or (soup.title.string.strip() if soup.title and soup.title.string else "")
            or resolved_url
        )
        description = (
            extract_meta(soup, "og:description", "description")
            or extract_meta(soup, "twitter:description", "description")
        )
        if not description:
            description = extract_fallback_description(soup) or DEFAULT_DESCRIPTION
        image = (
            extract_meta(soup, "og:image", "twitter:image")
            or extract_meta(soup, "twitter:image", "image")
            or DEFAULT_IMAGE
        )
        data = {"title": title, "description": description, "image": image}
        METADATA_CACHE[url] = {
            "data": data,
            "expires_at": now + CACHE_TTL_SECONDS,
            "ok": True,
        }
        return data
    except Exception:
        METADATA_CACHE[url] = {
            "data": fallback,
            "expires_at": now + FAILED_TTL_SECONDS,
            "ok": False,
        }
        return fallback


def _clamp_int(value, default: int, minimum: int, maximum: int) -> int:
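    """Parse value as an int, defaulting on failure and clamping to [minimum, maximum]."""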
    try:
        parsed = int(value)
    except Exception:
        return default
    return max(minimum, min(parsed, maximum))


def fetch_links_page_from_db(limit: int, offset: int):
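    """Fetch up to limit (id, url, created_at) rows, newest first.

    The table name comes from the TABLE environment variable (trusted
    configuration), which is why it is interpolated into the query directly.
    """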
    table = os.getenv("TABLE", "news_link")
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                f"SELECT id, url, created_at FROM {table} ORDER BY created_at DESC OFFSET %s LIMIT %s",
                (offset, limit),
            )
            return cur.fetchall()


@app.get("/")
def index():
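    """Render the home page with the first page of links (metadata left blank)."""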
    links = []
    error_message = ""
    try:
        rows = fetch_links_page_from_db(DEFAULT_PAGE_SIZE, 0)
        for link_id, url, created_at in rows:
            links.append(
                {
                    "id": link_id,
                    "url": url,
                    "created_at": created_at.isoformat()
                    if isinstance(created_at, datetime)
                    else str(created_at),
                    "title": "",
                    "description": "",
                    "image": "",
                }
            )
    except Exception as exc:
        error_message = f"DB query failed: {exc}"
    return render_template(
        "index.html",
        links=links,
        error_message=error_message,
        placeholder_data_uri=PLACEHOLDER_DATA_URI,
        default_image=DEFAULT_IMAGE,
    )


@app.get("/links")
def get_links():
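    """Return one JSON page of links, fetching metadata concurrently.

    Queries limit + 1 rows so has_more can be reported without a COUNT.
    """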
    limit = _clamp_int(
        request.args.get("limit"), DEFAULT_PAGE_SIZE, minimum=1, maximum=MAX_PAGE_SIZE
    )
    offset = _clamp_int(request.args.get("offset"), 0, minimum=0, maximum=10_000_000)

    try:
        rows_plus_one = fetch_links_page_from_db(limit + 1, offset)
    except Exception as exc:
        return jsonify({"error": "DB query failed", "detail": str(exc)}), 500

    has_more = len(rows_plus_one) > limit
    rows = rows_plus_one[:limit]

    urls = [url for _, url, _ in rows]
    metas = []
    if urls:
        with ThreadPoolExecutor(max_workers=min(8, len(urls))) as executor:
            metas = list(executor.map(fetch_metadata, urls))

    results = []
    for (link_id, url, created_at), meta in zip(rows, metas):
        results.append(
            {
                "id": link_id,
                "url": url,
                "created_at": created_at.isoformat()
                if isinstance(created_at, datetime)
                else str(created_at),
                **meta,
            }
        )
    return jsonify(
        {
            "items": results,
            "limit": limit,
            "offset": offset,
            "next_offset": offset + len(results),
            "has_more": has_more,
        }
    )


@app.post("/links")
def add_link():
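    """Validate and insert a submitted URL, then return it with its metadata."""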
    data = request.get_json(silent=True) or {}
    raw_url = (data.get("url") or "").strip()
    if not raw_url:
        return jsonify({"error": "Please enter a URL."}), 400

    url = normalize_url(raw_url)
    table = os.getenv("TABLE", "news_link")
    try:
        with get_db_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    # Write NOW() explicitly so the insert succeeds even when
                    # created_at has no DEFAULT on the table
                    f"INSERT INTO {table} (url, created_at) VALUES (%s, NOW()) RETURNING id, created_at",
                    (url,),
                )
                link_id, created_at = cur.fetchone()
            conn.commit()
    except Exception as exc:
        return jsonify({"error": "DB save failed", "detail": str(exc)}), 500

    meta = fetch_metadata(url)
    return jsonify(
        {
            "id": link_id,
            "url": url,
            "created_at": created_at.isoformat()
            if isinstance(created_at, datetime)
            else str(created_at),
            **meta,
        }
    )


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8021, debug=True)