Files
news_link/app.py
dsyoon d5174d5835 feat: 뉴스 링크 저장/조회 웹앱 추가
- Flask 기반 UI 및 /links API 구현
- 30개 단위 페이지네이션 + 무한 스크롤 적용
- 메타데이터(제목/요약/이미지) 추출 및 캐시 적용

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-07 16:28:39 +09:00

264 lines
8.0 KiB
Python

import os
import time
from concurrent.futures import ThreadPoolExecutor
from contextlib import closing
from datetime import datetime
from urllib.parse import urljoin, urlparse

import psycopg2
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from flask import Flask, jsonify, render_template, request
# Load variables from a local .env file so the os.getenv() calls below see them.
load_dotenv()
app = Flask(__name__, static_folder="static", template_folder="templates")
# Fallback description/image used when a page's metadata cannot be extracted.
DEFAULT_DESCRIPTION = "설명 없음"
DEFAULT_IMAGE = "/static/placeholder.svg"
# TTL (seconds) for successfully fetched metadata.
CACHE_TTL_SECONDS = int(os.getenv("CACHE_TTL_SECONDS", "3600"))
# Shorter TTL for failed fetches so transient errors get retried sooner.
FAILED_TTL_SECONDS = int(os.getenv("FAILED_TTL_SECONDS", "300"))
# In-process cache: url -> {"data": dict, "expires_at": float, "ok": bool}.
# NOTE(review): unbounded — entries expire but are never evicted, so memory
# grows with the number of distinct URLs seen by this process.
METADATA_CACHE = {}
# URL-encoded inline SVG used by the frontend as a "No Image" placeholder.
PLACEHOLDER_DATA_URI = (
"data:image/svg+xml;utf8,"
"<svg%20width='640'%20height='360'%20viewBox='0%200%20640%20360'%20fill='none'%20"
"xmlns='http://www.w3.org/2000/svg'>"
"<rect%20width='640'%20height='360'%20fill='%23e9ecef'/>"
"<rect%20x='120'%20y='90'%20width='400'%20height='180'%20rx='16'%20fill='%23dee2e6'/>"
"<path%20d='M210%20210l60-70%2070%2080%2060-60%2090%2090H210z'%20fill='%23adb5bd'/>"
"<circle%20cx='260'%20cy='150'%20r='22'%20fill='%23adb5bd'/>"
"<text%20x='320'%20y='260'%20text-anchor='middle'%20font-size='18'%20"
"fill='%236c757d'%20font-family='Arial,%20sans-serif'>No%20Image</text>"
"</svg>"
)
# Pagination bounds for the /links endpoint (30 per page by default).
DEFAULT_PAGE_SIZE = int(os.getenv("DEFAULT_PAGE_SIZE", "30"))
MAX_PAGE_SIZE = int(os.getenv("MAX_PAGE_SIZE", "60"))
def get_db_connection():
    """Open and return a new PostgreSQL connection from DB_* env vars.

    The caller owns the connection and is responsible for closing it.
    """
    params = {
        "host": os.getenv("DB_HOST"),
        "port": os.getenv("DB_PORT"),
        "dbname": os.getenv("DB_NAME"),
        "user": os.getenv("DB_USER"),
        "password": os.getenv("DB_PASSWORD"),
    }
    return psycopg2.connect(**params)
def normalize_url(raw_url: str) -> str:
    """Ensure *raw_url* carries a scheme, defaulting to https.

    Falsy input and URLs that already have a scheme are returned
    unchanged. Protocol-relative URLs ("//host/path") get only "https:"
    prepended — the original prefixed the full "https://", producing the
    malformed "https:////host/path".
    """
    if not raw_url:
        return raw_url
    # Protocol-relative URL: urlparse reports an empty scheme here, so
    # handle it before the generic fallback to avoid duplicating "//".
    if raw_url.startswith("//"):
        return f"https:{raw_url}"
    if urlparse(raw_url).scheme:
        return raw_url
    return f"https://{raw_url}"
def extract_meta(soup: BeautifulSoup, property_name: str, name: str):
    """Return stripped content of <meta property=...>, falling back to <meta name=...>.

    Returns "" when neither tag exists or neither carries non-empty content.
    """
    for attrs in ({"property": property_name}, {"name": name}):
        tag = soup.find("meta", attrs=attrs)
        if tag:
            content = tag.get("content")
            if content:
                return content.strip()
    return ""
def extract_fallback_description(soup: BeautifulSoup) -> str:
    """Derive a description from body text when no meta description exists.

    Scans <p> elements in document order and returns the first one whose
    text is at least 40 characters, truncated to 180 characters; "" when
    no paragraph qualifies.
    """
    texts = (p.get_text(" ", strip=True) for p in soup.find_all("p"))
    for text in texts:
        if len(text) >= 40:
            return text[:180]
    return ""
def fetch_metadata(url: str):
    """Fetch Open Graph / Twitter Card metadata for *url*.

    Returns {"title", "description", "image"}. Successful results are
    cached in METADATA_CACHE for CACHE_TTL_SECONDS; failures are cached
    for the shorter FAILED_TTL_SECONDS so transient errors retry sooner.
    Never raises — any failure yields a fallback dict (the URL as title,
    default description/image).
    """
    fallback = {
        "title": url,
        "description": DEFAULT_DESCRIPTION,
        "image": DEFAULT_IMAGE,
    }
    cached = METADATA_CACHE.get(url)
    now = time.time()
    if cached and cached["expires_at"] > now:
        return cached["data"]
    try:
        # Browser-like User-Agent: some news sites reject default clients.
        response = requests.get(
            url,
            headers={
                "User-Agent": (
                    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/121.0.0.0 Safari/537.36"
                )
            },
            timeout=6,
        )
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        # Post-redirect URL: used as the title fallback and as the base
        # for resolving relative og:image values.
        resolved_url = response.url or url
        title = (
            extract_meta(soup, "og:title", "twitter:title")
            or extract_meta(soup, "twitter:title", "title")
            or (soup.title.string.strip() if soup.title and soup.title.string else "")
            or resolved_url
        )
        description = (
            extract_meta(soup, "og:description", "description")
            or extract_meta(soup, "twitter:description", "description")
        )
        if not description:
            description = extract_fallback_description(soup) or DEFAULT_DESCRIPTION
        image = (
            extract_meta(soup, "og:image", "twitter:image")
            or extract_meta(soup, "twitter:image", "image")
        )
        # og:image may be a relative URL; resolve it against the final page
        # URL so the frontend gets an absolute, loadable src.
        image = urljoin(resolved_url, image) if image else DEFAULT_IMAGE
        data = {"title": title, "description": description, "image": image}
        METADATA_CACHE[url] = {
            "data": data,
            "expires_at": now + CACHE_TTL_SECONDS,
            "ok": True,
        }
        return data
    except Exception:
        # Best-effort by design: cache the fallback briefly so a flaky
        # site is not re-fetched on every request.
        METADATA_CACHE[url] = {
            "data": fallback,
            "expires_at": now + FAILED_TTL_SECONDS,
            "ok": False,
        }
        return fallback
def _clamp_int(value, default: int, minimum: int, maximum: int) -> int:
try:
parsed = int(value)
except Exception:
return default
return max(minimum, min(parsed, maximum))
def fetch_links_page_from_db(limit: int, offset: int):
    """Return up to *limit* (id, url, created_at) rows, newest first.

    The table name comes from the TABLE env var (default "news_link").
    NOTE(review): the table name is interpolated into the SQL string —
    acceptable only because it comes from trusted server config, never
    from request input.
    """
    table = os.getenv("TABLE", "news_link")
    # psycopg2's `with conn:` manages only the transaction; it does NOT
    # close the connection, so wrap in closing() to avoid leaking one
    # connection per call.
    with closing(get_db_connection()) as conn:
        with conn, conn.cursor() as cur:
            cur.execute(
                f"SELECT id, url, created_at FROM {table} ORDER BY created_at DESC OFFSET %s LIMIT %s",
                (offset, limit),
            )
            return cur.fetchall()
@app.get("/")
def index():
    """Render the landing page with the first page of saved links.

    Metadata fields are sent blank; the client hydrates them via /links.
    A DB failure is shown as an error banner instead of a 500 page.
    """
    links = []
    error_message = ""
    try:
        for link_id, url, created_at in fetch_links_page_from_db(DEFAULT_PAGE_SIZE, 0):
            if isinstance(created_at, datetime):
                created = created_at.isoformat()
            else:
                created = str(created_at)
            links.append(
                {
                    "id": link_id,
                    "url": url,
                    "created_at": created,
                    "title": "",
                    "description": "",
                    "image": "",
                }
            )
    except Exception as exc:
        error_message = f"DB 조회 실패: {exc}"
    return render_template(
        "index.html",
        links=links,
        error_message=error_message,
        placeholder_data_uri=PLACEHOLDER_DATA_URI,
        default_image=DEFAULT_IMAGE,
    )
@app.get("/links")
def get_links():
    """JSON pagination endpoint: ?limit=&offset= -> items, has_more, next_offset."""
    limit = _clamp_int(
        request.args.get("limit"), DEFAULT_PAGE_SIZE, minimum=1, maximum=MAX_PAGE_SIZE
    )
    offset = _clamp_int(request.args.get("offset"), 0, minimum=0, maximum=10_000_000)
    # Fetch one extra row purely to learn whether another page exists.
    try:
        rows_plus_one = fetch_links_page_from_db(limit + 1, offset)
    except Exception as exc:
        return jsonify({"error": "DB 조회 실패", "detail": str(exc)}), 500
    has_more = len(rows_plus_one) > limit
    rows = rows_plus_one[:limit]
    # Metadata fetches are network-bound, so resolve them concurrently.
    metas = []
    if rows:
        page_urls = [row[1] for row in rows]
        with ThreadPoolExecutor(max_workers=min(8, len(page_urls))) as executor:
            metas = list(executor.map(fetch_metadata, page_urls))
    results = []
    for (link_id, url, created_at), meta in zip(rows, metas):
        created = (
            created_at.isoformat()
            if isinstance(created_at, datetime)
            else str(created_at)
        )
        results.append({"id": link_id, "url": url, "created_at": created, **meta})
    return jsonify(
        {
            "items": results,
            "limit": limit,
            "offset": offset,
            "next_offset": offset + len(results),
            "has_more": has_more,
        }
    )
@app.post("/links")
def add_link():
    """Persist a submitted URL and return the new row with page metadata.

    Body: {"url": "..."}; the URL gets an https:// scheme if missing.
    Responses: 400 on empty input, 500 on DB failure, otherwise the new
    (id, url, created_at) merged with title/description/image.
    """
    data = request.get_json(silent=True) or {}
    raw_url = (data.get("url") or "").strip()
    if not raw_url:
        return jsonify({"error": "URL을 입력해주세요."}), 400
    url = normalize_url(raw_url)
    table = os.getenv("TABLE", "news_link")
    try:
        # closing() is required: psycopg2's `with conn:` commits/rolls back
        # the transaction but leaves the connection open, leaking one
        # connection per POST.
        with closing(get_db_connection()) as conn:
            with conn, conn.cursor() as cur:
                cur.execute(
                    f"INSERT INTO {table} (url) VALUES (%s) RETURNING id, created_at",
                    (url,),
                )
                link_id, created_at = cur.fetchone()
            # `with conn:` commits on successful exit — the original's
            # explicit conn.commit() was redundant.
    except Exception as exc:
        return jsonify({"error": "DB 저장 실패", "detail": str(exc)}), 500
    meta = fetch_metadata(url)
    return jsonify(
        {
            "id": link_id,
            "url": url,
            "created_at": created_at.isoformat()
            if isinstance(created_at, datetime)
            else str(created_at),
            **meta,
        }
    )
if __name__ == "__main__":
    # Dev-only entry point: debug=True enables the Werkzeug debugger/reloader
    # and must not run in production; 0.0.0.0 binds all network interfaces.
    app.run(host="0.0.0.0", port=8021, debug=True)