"""Flask link-board: stores shared URLs in PostgreSQL and serves them with
Open Graph / Twitter-card metadata scraped on demand."""

import os
import re
import time
from concurrent.futures import ThreadPoolExecutor
from contextlib import closing
from datetime import datetime
from urllib.parse import urlparse

import psycopg2
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from flask import (
    Flask,
    jsonify,
    make_response,
    render_template,
    request,
    send_from_directory,
)

load_dotenv()

app = Flask(__name__, static_folder="static", template_folder="templates")

DEFAULT_DESCRIPTION = "설명 없음"
DEFAULT_IMAGE = "/static/placeholder.svg"
CACHE_TTL_SECONDS = int(os.getenv("CACHE_TTL_SECONDS", "3600"))
FAILED_TTL_SECONDS = int(os.getenv("FAILED_TTL_SECONDS", "300"))

# In-process caches. Entries expire by TTL but are never evicted, so memory
# grows with the number of distinct URLs/tables seen — acceptable for a small
# single-process app, but worth bounding if the link count grows large.
METADATA_CACHE = {}
TABLE_COLUMNS_CACHE = {}

# FIX: the original literal concatenated empty strings (the inline SVG markup
# was lost), producing a data URI with no image payload. Rebuilt as a minimal
# percent-encoded placeholder SVG carrying the original "No Image" label.
# NOTE(review): the exact original artwork is unrecoverable — confirm styling.
PLACEHOLDER_DATA_URI = (
    "data:image/svg+xml;utf8,"
    "%3Csvg%20xmlns%3D%27http%3A%2F%2Fwww.w3.org%2F2000%2Fsvg%27"
    "%20width%3D%27400%27%20height%3D%27200%27%3E"
    "%3Crect%20width%3D%27100%25%27%20height%3D%27100%25%27%20fill%3D%27%23e5e7eb%27%2F%3E"
    "%3Ctext%20x%3D%2750%25%27%20y%3D%2750%25%27%20text-anchor%3D%27middle%27"
    "%20dominant-baseline%3D%27middle%27%20fill%3D%27%23666%27%3E"
    "No%20Image"
    "%3C%2Ftext%3E%3C%2Fsvg%3E"
)

DEFAULT_PAGE_SIZE = int(os.getenv("DEFAULT_PAGE_SIZE", "30"))
MAX_PAGE_SIZE = int(os.getenv("MAX_PAGE_SIZE", "60"))
DEFAULT_SCHEMA = os.getenv("DB_SCHEMA", "public")


def _safe_identifier(name: str) -> str:
    """Validate *name* as a plain SQL identifier and return it.

    Schema/table names come from environment variables and cannot be bound as
    query parameters, so this whitelist check is the injection guard.

    Raises:
        ValueError: if *name* is empty or not ``[A-Za-z_][A-Za-z0-9_]*``.
    """
    if not name or not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", name):
        raise ValueError(f"Invalid SQL identifier: {name!r}")
    return name


def _table_ref(schema: str, table: str) -> str:
    """Return a validated ``schema.table`` reference safe to interpolate into SQL."""
    return f"{_safe_identifier(schema)}.{_safe_identifier(table)}"


def get_db_connection():
    """Open a new psycopg2 connection using the DB_* environment variables."""
    return psycopg2.connect(
        host=os.getenv("DB_HOST"),
        port=os.getenv("DB_PORT"),
        dbname=os.getenv("DB_NAME"),
        user=os.getenv("DB_USER"),
        password=os.getenv("DB_PASSWORD"),
    )


def normalize_url(raw_url: str) -> str:
    """Prefix ``https://`` when *raw_url* lacks a scheme; pass through otherwise."""
    if not raw_url:
        return raw_url
    parsed = urlparse(raw_url)
    if parsed.scheme:
        return raw_url
    return f"https://{raw_url}"


def extract_meta(soup: BeautifulSoup, property_name: str, name: str) -> str:
    """Return the first non-empty ``<meta>`` content, trying ``property=`` then ``name=``."""
    tag = soup.find("meta", property=property_name)
    if tag and tag.get("content"):
        return tag.get("content").strip()
    tag = soup.find("meta", attrs={"name": name})
    if tag and tag.get("content"):
        return tag.get("content").strip()
    return ""


def extract_fallback_description(soup: BeautifulSoup) -> str:
    """Fallback description: first ``<p>`` of at least 40 chars, truncated to 180."""
    for paragraph in soup.find_all("p"):
        text = paragraph.get_text(" ", strip=True)
        if len(text) >= 40:
            return text[:180]
    return ""


def fetch_metadata(url: str) -> dict:
    """Fetch OG/Twitter metadata for *url* with an in-process TTL cache.

    Successes are cached for CACHE_TTL_SECONDS; failures are cached (as a
    fallback payload) for the shorter FAILED_TTL_SECONDS so transient errors
    get retried sooner. Dict get/set are atomic under the GIL, which is why
    this is safe to call from the ThreadPoolExecutor in ``get_links``.

    Returns:
        dict with keys ``title``, ``description``, ``image`` — never raises.
    """
    fallback = {
        "title": url,
        "description": DEFAULT_DESCRIPTION,
        "image": DEFAULT_IMAGE,
    }
    now = time.time()
    cached = METADATA_CACHE.get(url)
    if cached and cached["expires_at"] > now:
        return cached["data"]
    try:
        response = requests.get(
            url,
            headers={
                # Some sites refuse non-browser user agents.
                "User-Agent": (
                    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/121.0.0.0 Safari/537.36"
                )
            },
            timeout=6,
        )
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        resolved_url = response.url or url

        title = (
            extract_meta(soup, "og:title", "twitter:title")
            or extract_meta(soup, "twitter:title", "title")
            or (soup.title.string.strip() if soup.title and soup.title.string else "")
            or resolved_url
        )
        description = extract_meta(
            soup, "og:description", "description"
        ) or extract_meta(soup, "twitter:description", "description")
        if not description:
            description = extract_fallback_description(soup) or DEFAULT_DESCRIPTION
        image = (
            extract_meta(soup, "og:image", "twitter:image")
            or extract_meta(soup, "twitter:image", "image")
            or DEFAULT_IMAGE
        )

        data = {"title": title, "description": description, "image": image}
        METADATA_CACHE[url] = {
            "data": data,
            "expires_at": now + CACHE_TTL_SECONDS,
            "ok": True,
        }
        return data
    except Exception:
        # Best effort: a failed scrape must never break the listing.
        METADATA_CACHE[url] = {
            "data": fallback,
            "expires_at": now + FAILED_TTL_SECONDS,
            "ok": False,
        }
        return fallback


def _clamp_int(value, default: int, minimum: int, maximum: int) -> int:
    """Parse *value* as int, falling back to *default*, clamped to [minimum, maximum]."""
    try:
        parsed = int(value)
    except Exception:
        return default
    return max(minimum, min(parsed, maximum))


def get_table_columns(schema: str, table: str) -> set:
    """Return the cached set of column names for ``schema.table``."""
    key = (schema, table)
    cached = TABLE_COLUMNS_CACHE.get(key)
    if cached is not None:
        return cached
    # FIX: psycopg2's `with conn` only manages the transaction; the connection
    # itself was never closed, leaking one connection per cache miss.
    with closing(get_db_connection()) as conn:
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT column_name
                    FROM information_schema.columns
                    WHERE table_schema = %s AND table_name = %s
                    """,
                    (schema, table),
                )
                cols = {row[0] for row in cur.fetchall()}
    TABLE_COLUMNS_CACHE[key] = cols
    return cols


def get_table_columns_info(schema: str, table: str) -> dict:
    """Return cached per-column type info for ``schema.table``.

    Maps column name -> {"data_type", "udt_name", "is_nullable"} from
    information_schema, used by ``add_link`` to adapt INSERTs to the live schema.
    """
    key = ("info", schema, table)
    cached = TABLE_COLUMNS_CACHE.get(key)
    if cached is not None:
        return cached
    # FIX: close the connection (see get_table_columns).
    with closing(get_db_connection()) as conn:
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT column_name, data_type, udt_name, is_nullable
                    FROM information_schema.columns
                    WHERE table_schema = %s AND table_name = %s
                    """,
                    (schema, table),
                )
                info = {
                    row[0]: {
                        "data_type": row[1],
                        "udt_name": row[2],
                        "is_nullable": row[3],
                    }
                    for row in cur.fetchall()
                }
    TABLE_COLUMNS_CACHE[key] = info
    return info


def get_request_identity(req) -> tuple[str | None, str | None]:
    """Return ``(email, ip)`` for the request; either element may be None.

    - An explicit ``ref_type``/``ref`` pair (query string or cookie, supplied
      by the ncue.net/go integration) takes precedence and fills exactly one slot.
    - email: taken from proxy/SSO-injected headers (must contain "@").
    - ip: X-Forwarded-For (first hop), then X-Real-IP, then remote_addr.
    """
    ref_type = (req.args.get("ref_type") or req.cookies.get("ref_type") or "").strip()
    ref = (req.args.get("ref") or req.cookies.get("ref") or "").strip()
    if ref_type in ("email", "ip") and ref:
        if ref_type == "email":
            return ref, None
        return None, ref

    email_headers = [
        "X-User-Email",
        "X-Forwarded-Email",
        "X-Auth-Request-Email",
        "X-Forwarded-User",
        "Remote-User",
        "X-Email",
    ]
    email = None
    for header in email_headers:
        value = (req.headers.get(header) or "").strip()
        if value and "@" in value:
            email = value
            break

    xff = (req.headers.get("X-Forwarded-For") or "").strip()
    if xff:
        ip = xff.split(",")[0].strip()
    else:
        ip = (req.headers.get("X-Real-IP") or "").strip() or (req.remote_addr or "")
    ip = ip.strip() or None
    return email, ip


def _maybe_set_ref_cookies(resp):
    """Persist ``ref_type``/``ref`` query params as cookies so the JS-driven
    /links requests keep the same identity (SameSite=Lax, 30 days)."""
    ref_type = (request.args.get("ref_type") or "").strip()
    ref = (request.args.get("ref") or "").strip()
    if ref_type in ("email", "ip") and ref:
        max_age = 60 * 60 * 24 * 30  # 30 days
        resp.set_cookie("ref_type", ref_type, max_age=max_age, samesite="Lax")
        resp.set_cookie("ref", ref, max_age=max_age, samesite="Lax")
    return resp


def fetch_links_page_from_db(limit: int, offset: int):
    """Return up to *limit* ``(id, url, created_at)`` rows, newest first."""
    table = os.getenv("TABLE", "news_link")
    schema = os.getenv("DB_SCHEMA", DEFAULT_SCHEMA)
    table_ref = _table_ref(schema, table)
    # FIX: close the connection (see get_table_columns).
    with closing(get_db_connection()) as conn:
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    f"SELECT id, url, created_at FROM {table_ref} "
                    f"ORDER BY created_at DESC OFFSET %s LIMIT %s",
                    (offset, limit),
                )
                return cur.fetchall()


@app.get("/")
def index():
    """Render the first page of links server-side; metadata is filled in by
    the client via GET /links."""
    links = []
    error_message = ""
    try:
        rows = fetch_links_page_from_db(DEFAULT_PAGE_SIZE, 0)
        for link_id, url, created_at in rows:
            links.append(
                {
                    "id": link_id,
                    "url": url,
                    "created_at": created_at.isoformat()
                    if isinstance(created_at, datetime)
                    else str(created_at),
                    "title": "",
                    "description": "",
                    "image": "",
                }
            )
    except Exception as exc:
        error_message = f"DB 조회 실패: {exc}"
    resp = make_response(
        render_template(
            "index.html",
            links=links,
            error_message=error_message,
            placeholder_data_uri=PLACEHOLDER_DATA_URI,
            default_image=DEFAULT_IMAGE,
        )
    )
    return _maybe_set_ref_cookies(resp)


@app.get("/favicon.ico")
def favicon():
    """Serve /favicon.ico directly so external sites (e.g. ncue.net) can fetch it."""
    return send_from_directory(
        app.static_folder,
        "favicon.ico",
        mimetype="image/x-icon",
        max_age=60 * 60 * 24 * 7,
    )


@app.get("/links")
def get_links():
    """GET /links: paginated link list with scraped metadata.

    Query params: ``limit`` (1..MAX_PAGE_SIZE), ``offset`` (>= 0). Fetches one
    extra row to compute ``has_more`` without a COUNT query, then scrapes
    metadata for the page concurrently.
    """
    limit = _clamp_int(
        request.args.get("limit"), DEFAULT_PAGE_SIZE, minimum=1, maximum=MAX_PAGE_SIZE
    )
    offset = _clamp_int(request.args.get("offset"), 0, minimum=0, maximum=10_000_000)
    try:
        rows_plus_one = fetch_links_page_from_db(limit + 1, offset)
    except Exception as exc:
        return jsonify({"error": "DB 조회 실패", "detail": str(exc)}), 500

    has_more = len(rows_plus_one) > limit
    rows = rows_plus_one[:limit]

    urls = [url for _, url, _ in rows]
    metas = []
    if urls:
        with ThreadPoolExecutor(max_workers=min(8, len(urls))) as executor:
            metas = list(executor.map(fetch_metadata, urls))

    results = []
    for (link_id, url, created_at), meta in zip(rows, metas):
        results.append(
            {
                "id": link_id,
                "url": url,
                "created_at": created_at.isoformat()
                if isinstance(created_at, datetime)
                else str(created_at),
                **meta,
            }
        )
    return jsonify(
        {
            "items": results,
            "limit": limit,
            "offset": offset,
            "next_offset": offset + len(results),
            "has_more": has_more,
        }
    )


@app.post("/links")
def add_link():
    """POST /links: insert a URL, adapting the INSERT to the live table schema.

    Body: JSON ``{"url": "..."}``. Optional columns (created_at, updated_at,
    author_id, author_email, author_ip) are filled only when present in the
    table; author_id handling depends on its column type.
    """
    data = request.get_json(silent=True) or {}
    raw_url = (data.get("url") or "").strip()
    if not raw_url:
        return jsonify({"error": "URL을 입력해주세요."}), 400
    url = normalize_url(raw_url)

    table = os.getenv("TABLE", "news_link")
    schema = os.getenv("DB_SCHEMA", DEFAULT_SCHEMA)
    table_ref = _table_ref(schema, table)
    try:
        cols = get_table_columns(schema, table)
        cols_info = get_table_columns_info(schema, table)
        email, ip = get_request_identity(request)
        identity = email or ip  # prefer email, fall back to IP

        insert_cols = ["url"]
        insert_vals_sql = ["%s"]
        insert_params = [url]

        # Production-schema compatibility: created_at/updated_at/author_id
        # may exist with NOT NULL constraints.
        if "created_at" in cols:
            insert_cols.append("created_at")
            insert_vals_sql.append("NOW()")
        if "updated_at" in cols:
            insert_cols.append("updated_at")
            insert_vals_sql.append("NOW()")
        if "author_id" in cols:
            author_col = cols_info.get("author_id", {})
            data_type = (author_col.get("data_type") or "").lower()
            udt = (author_col.get("udt_name") or "").lower()
            if data_type in ("text", "character varying", "character"):
                # 1) Text-like author_id: store the email/IP string directly.
                insert_cols.append("author_id")
                insert_vals_sql.append("%s")
                insert_params.append(identity or "unknown")
            elif udt in ("int2", "int4", "int8") or data_type in (
                "smallint",
                "integer",
                "bigint",
                "numeric",
            ):
                # 2) Numeric author_id: a string cannot be stored, so use the
                #    configured DEFAULT_AUTHOR_ID and record the email/IP in
                #    dedicated columns when the schema has them.
                raw_author_id = os.getenv("DEFAULT_AUTHOR_ID")
                if raw_author_id is None or str(raw_author_id).strip() == "":
                    return (
                        jsonify(
                            {
                                "error": "DB 저장 실패",
                                "detail": "author_id가 정수 NOT NULL입니다. .env에 DEFAULT_AUTHOR_ID(정수)를 설정하거나, author_id 타입을 text로 변경하세요.",
                            }
                        ),
                        500,
                    )
                try:
                    author_id_int = int(raw_author_id)
                except Exception:
                    return (
                        jsonify(
                            {
                                "error": "DB 저장 실패",
                                "detail": f"DEFAULT_AUTHOR_ID는 정수여야 합니다: {raw_author_id!r}",
                            }
                        ),
                        500,
                    )
                insert_cols.append("author_id")
                insert_vals_sql.append("%s")
                insert_params.append(author_id_int)
                if "author_email" in cols and email:
                    insert_cols.append("author_email")
                    insert_vals_sql.append("%s")
                    insert_params.append(email)
                if "author_ip" in cols and ip:
                    insert_cols.append("author_ip")
                    insert_vals_sql.append("%s")
                    insert_params.append(ip)
            else:
                # 3) Other types (uuid etc.): attempt the string; failures
                #    surface through the error detail below.
                insert_cols.append("author_id")
                insert_vals_sql.append("%s")
                insert_params.append(identity or "unknown")

        # FIX: close the connection when done; `with conn` alone only manages
        # the transaction (commit on success replaces the old explicit commit).
        with closing(get_db_connection()) as conn:
            with conn:
                with conn.cursor() as cur:
                    cur.execute(
                        f"INSERT INTO {table_ref} ({', '.join(insert_cols)}) "
                        f"VALUES ({', '.join(insert_vals_sql)}) RETURNING id, created_at",
                        tuple(insert_params),
                    )
                    link_id, created_at = cur.fetchone()
    except Exception as exc:
        return jsonify({"error": "DB 저장 실패", "detail": str(exc)}), 500

    meta = fetch_metadata(url)
    return jsonify(
        {
            "id": link_id,
            "url": url,
            "created_at": created_at.isoformat()
            if isinstance(created_at, datetime)
            else str(created_at),
            **meta,
        }
    )


if __name__ == "__main__":
    # NOTE(review): debug=True must not be enabled in production — it exposes
    # the Werkzeug debugger. Consider gating it on an environment variable.
    app.run(host="0.0.0.0", port=8021, debug=True)