928 lines
30 KiB
Python
928 lines
30 KiB
Python
import asyncio
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import time
|
|
from collections.abc import Awaitable, Callable
|
|
from io import BytesIO
|
|
from urllib.parse import quote_plus
|
|
|
|
import httpx
|
|
from PIL import Image
|
|
|
|
from backend import config
|
|
from backend.database import SessionLocal
|
|
from backend.repository import (
|
|
create_news,
|
|
create_translation,
|
|
headline_exists_within_24h,
|
|
translation_exists,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)

# Local image path used when an article's own image cannot be downloaded.
PLACEHOLDER_IMAGE_PATH = "/static/images/placeholder.png"
# Remote 1200x630 placeholder artwork (AI-themed and finance-themed variants).
GENERIC_AI_FALLBACK_URL = "https://placehold.co/1200x630/0f172a/e2e8f0/png?text=AI+News"
GENERIC_FINANCE_FALLBACK_URL = "https://placehold.co/1200x630/0f172a/e2e8f0/png?text=Market+News"

# Lower-case terms whose presence in a headline marks the story as finance/markets news.
_FINANCE_TOPIC_TERMS = frozenset(
    """
    finance financial market markets stock stocks share shares earnings
    investor investors nasdaq nyse dow s&p bank banking revenue profit
    trading ipo valuation
    """.split()
)

# Subjects that make an image unsuitable for a finance story; checked against
# the candidate image's URL and credit text.
_FINANCE_IMAGE_BLOCKLIST = tuple(
    "cat dog pet lion tiger bird horse portrait selfie wedding food nature-only".split()
)
|
|
|
|
|
|
async def call_perplexity_api(query: str) -> dict | None:
    """Ask Perplexity for a JSON array of news items matching *query*.

    Token usage is logged for every call (including failed ones) before the
    HTTP status is checked.

    Args:
        query: Natural-language news request sent as the user message. It may
            ask for an exact number of items, which overrides the default 3-5.

    Returns:
        The decoded JSON body of a successful chat-completions response.

    Raises:
        httpx.HTTPStatusError: If Perplexity answers with a 4xx/5xx status.
        httpx.RequestError: On network failures or the 30 s timeout.
        ValueError: If a successful response body is not valid JSON.
    """
    headers = {
        "Authorization": f"Bearer {config.PERPLEXITY_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": config.PERPLEXITY_MODEL,
        "messages": [
            {
                "role": "system",
                "content": (
                    "You are a news aggregator. Return a JSON array of news items. "
                    "Each item must have: headline, summary (2-3 sentences), source_url, "
                    "image_url (a relevant image URL if available), image_credit. "
                    "Return between 3 and 5 items unless the request asks for an exact "
                    "number of items. Respond ONLY with valid JSON array, no markdown."
                ),
            },
            {"role": "user", "content": query},
        ],
        "temperature": 0.3,
    }

    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.post(config.PERPLEXITY_API_URL, headers=headers, json=payload)

    # Error responses (e.g. an HTML 502 page) may not be JSON or not an object;
    # that must not mask the HTTP error raised just below.
    try:
        usage = response.json().get("usage", {})
    except (ValueError, AttributeError):
        usage = {}
    cost_info = {
        "model": config.PERPLEXITY_MODEL,
        "status": response.status_code,
        "usage": usage,
    }
    logger.info("Perplexity API cost: %s", json.dumps(cost_info))
    response.raise_for_status()
    return response.json()
|
|
|
|
|
|
async def call_openrouter_api(query: str) -> dict | None:
    """Fallback news source: ask OpenRouter for a JSON array of news items.

    Token usage is logged for every call (including failed ones) before the
    HTTP status is checked.

    Args:
        query: Natural-language news request sent as the user message. It may
            ask for an exact number of items, which overrides the default 3-5.

    Returns:
        The decoded JSON body of a successful response, or ``None`` when
        ``config.OPENROUTER_API_KEY`` is not set (no request is made).

    Raises:
        httpx.HTTPStatusError: If OpenRouter answers with a 4xx/5xx status.
        httpx.RequestError: On network failures or the 30 s timeout.
        ValueError: If a successful response body is not valid JSON.
    """
    if not config.OPENROUTER_API_KEY:
        return None

    headers = {
        "Authorization": f"Bearer {config.OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": config.OPENROUTER_MODEL,
        "messages": [
            {
                "role": "system",
                "content": (
                    "You are a news aggregator. Return a JSON array of news items. "
                    "Each item must have: headline, summary (2-3 sentences), source_url, "
                    "image_url (a relevant image URL if available), image_credit. "
                    "Return between 3 and 5 items unless the request asks for an exact "
                    "number of items. Respond ONLY with valid JSON array, no markdown."
                ),
            },
            {"role": "user", "content": query},
        ],
        "temperature": 0.3,
    }

    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.post(config.OPENROUTER_API_URL, headers=headers, json=payload)

    # Error responses may not be a JSON object; that must not mask the HTTP
    # error raised just below.
    try:
        usage = response.json().get("usage", {})
    except (ValueError, AttributeError):
        usage = {}
    cost_info = {
        "model": config.OPENROUTER_MODEL,
        "status": response.status_code,
        "usage": usage,
    }
    logger.info("OpenRouter API cost: %s", json.dumps(cost_info))
    response.raise_for_status()
    return response.json()
|
|
|
|
|
|
async def call_perplexity_translation_api(
    headline: str,
    summary: str,
    language: str,
    tldr_points: list[str] | None = None,
    summary_body: str | None = None,
    source_citation: str | None = None,
) -> dict | None:
    """Ask Perplexity to translate an article's text fields into *language*.

    All text fields are sent (missing optional ones as "" / []), and the model
    is asked to return every one of them translated. parse_translation_response
    reads ``tldr_points``, ``summary_body`` and ``source_citation`` in addition
    to ``headline`` and ``summary``.

    Args:
        headline: Original headline.
        summary: Original short summary.
        language: Human-readable target language name (e.g. "Tamil").
        tldr_points: Optional TL;DR bullets to translate.
        summary_body: Optional long-form summary to translate.
        source_citation: Optional citation text; URLs are to be kept unchanged.

    Returns:
        The decoded JSON body of the chat-completions response.

    Raises:
        httpx.HTTPStatusError: On a 4xx/5xx response.
    """
    headers = {
        "Authorization": f"Bearer {config.PERPLEXITY_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": config.PERPLEXITY_MODEL,
        "messages": [
            {
                "role": "system",
                "content": (
                    "Translate every text field of the given JSON object (headline, summary, "
                    "tldr_points, summary_body, source_citation) into target_language. "
                    "Return only a valid JSON object with keys: headline, summary, "
                    "tldr_points (array of strings), summary_body, source_citation. "
                    "Use an empty string or empty array for fields that are empty in the "
                    "input, and keep URLs unchanged. No markdown, no extra text."
                ),
            },
            {
                "role": "user",
                "content": json.dumps(
                    {
                        "target_language": language,
                        "headline": headline,
                        "summary": summary,
                        "tldr_points": tldr_points or [],
                        "summary_body": summary_body or "",
                        "source_citation": source_citation or "",
                    }
                ),
            },
        ],
        "temperature": 0.1,
    }

    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.post(config.PERPLEXITY_API_URL, headers=headers, json=payload)
        response.raise_for_status()
        return response.json()
|
|
|
|
|
|
def parse_translation_response(response: dict) -> dict | None:
|
|
content = response.get("choices", [{}])[0].get("message", {}).get("content", "")
|
|
content = content.strip()
|
|
if content.startswith("```"):
|
|
content = content.split("\n", 1)[-1].rsplit("```", 1)[0]
|
|
|
|
try:
|
|
parsed = json.loads(content)
|
|
if isinstance(parsed, dict):
|
|
headline = str(parsed.get("headline", "")).strip()
|
|
summary = str(parsed.get("summary", "")).strip()
|
|
if headline and summary:
|
|
tldr_points = parsed.get("tldr_points", [])
|
|
if not isinstance(tldr_points, list):
|
|
tldr_points = []
|
|
cleaned_points = [str(p).strip() for p in tldr_points if str(p).strip()]
|
|
summary_body = str(parsed.get("summary_body", "")).strip() or None
|
|
source_citation = str(parsed.get("source_citation", "")).strip() or None
|
|
return {
|
|
"headline": headline,
|
|
"summary": summary,
|
|
"tldr_points": cleaned_points,
|
|
"summary_body": summary_body,
|
|
"source_citation": source_citation,
|
|
}
|
|
except json.JSONDecodeError:
|
|
logger.error("Failed to parse translation response: %s", content[:200])
|
|
return None
|
|
|
|
|
|
def validate_translation_quality(
|
|
headline: str, summary: str, language_code: str
|
|
) -> tuple[bool, str | None]:
|
|
text = f"{headline} {summary}".strip()
|
|
if not headline or not summary:
|
|
return False, "empty-content"
|
|
if len(text) < 20:
|
|
return False, "too-short"
|
|
|
|
repeated_runs = re.search(r"(.)\1{6,}", text)
|
|
if repeated_runs:
|
|
return False, "repeated-sequence"
|
|
|
|
lines = [segment.strip() for segment in re.split(r"[.!?]\s+", text) if segment.strip()]
|
|
if lines:
|
|
unique_ratio = len(set(lines)) / len(lines)
|
|
if unique_ratio < 0.4:
|
|
return False, "low-unique-content"
|
|
|
|
if language_code == "ta":
|
|
script_hits = sum(1 for char in text if "\u0b80" <= char <= "\u0bff")
|
|
elif language_code == "ml":
|
|
script_hits = sum(1 for char in text if "\u0d00" <= char <= "\u0d7f")
|
|
else:
|
|
return True, None
|
|
|
|
alpha_hits = sum(1 for char in text if char.isalpha())
|
|
if alpha_hits == 0:
|
|
return False, "no-alpha-content"
|
|
|
|
script_ratio = script_hits / alpha_hits
|
|
if script_ratio < 0.35:
|
|
return False, "script-mismatch"
|
|
|
|
return True, None
|
|
|
|
|
|
async def generate_translations(
    headline: str,
    summary: str,
    tldr_points: list[str] | None = None,
    summary_body: str | None = None,
    source_citation: str | None = None,
) -> dict[str, dict]:
    """Translate an article into Tamil and Malayalam via Perplexity.

    Languages are processed one after another. Each result is parsed and
    passed through validate_translation_quality. A failure or rejection for one
    language is logged and does not affect the other.

    Returns:
        Mapping of language code ("ta", "ml") to the accepted translation dict.
        Empty when ``config.PERPLEXITY_API_KEY`` is not set.
    """
    accepted: dict[str, dict] = {}
    if not config.PERPLEXITY_API_KEY:
        return accepted

    for code, name in (("ta", "Tamil"), ("ml", "Malayalam")):
        try:
            raw = await call_perplexity_translation_api(
                headline=headline,
                summary=summary,
                language=name,
                tldr_points=tldr_points,
                summary_body=summary_body,
                source_citation=source_citation,
            )
            candidate = parse_translation_response(raw) if raw else None
            if not candidate:
                continue

            ok, why = validate_translation_quality(
                candidate["headline"], candidate["summary"], code
            )
            if not ok:
                logger.warning("Translation rejected for %s: %s", code, why)
                continue

            logger.info("Translation accepted for %s", code)
            accepted[code] = candidate
        except Exception:
            logger.exception("Translation generation failed for %s", code)

    return accepted
|
|
|
|
|
|
async def call_perplexity_summary_api(
    headline: str, summary: str, source_url: str | None
) -> dict | None:
    """Request a structured modal summary (TL;DR, body, citation) from Perplexity.

    The request includes ``config.SUMMARY_LENGTH_SCALE`` and a rule mapping it
    (1 = very short ... 5 = very long) so the model can scale the detail.

    Args:
        headline: Article headline.
        summary: Short article summary to expand from.
        source_url: Article URL used as the citation; "Original source" if None.

    Returns:
        The decoded JSON body of the chat-completions response.

    Raises:
        httpx.HTTPStatusError: On a 4xx/5xx response.
    """
    instructions = (
        "Generate concise structured JSON for UI modal. Return only JSON object with keys: "
        "tldr_points (array of 3 short bullets), summary_body (detailed summary), "
        "source_citation (concise source/citation text). "
        "Always summarize from provided text only. No markdown."
    )
    article = {
        "headline": headline,
        "summary": summary,
        "source_citation": source_url or "Original source",
        "summary_length_scale": config.SUMMARY_LENGTH_SCALE,
        "summary_length_rule": (
            "1=very short, 2=short, 3=medium, 4=long, 5=very long. "
            "Use more detail as scale increases."
        ),
    }
    request_body = {
        "model": config.PERPLEXITY_MODEL,
        "messages": [
            {"role": "system", "content": instructions},
            {"role": "user", "content": json.dumps(article)},
        ],
        "temperature": 0.2,
    }
    auth_headers = {
        "Authorization": f"Bearer {config.PERPLEXITY_API_KEY}",
        "Content-Type": "application/json",
    }

    async with httpx.AsyncClient(timeout=30.0) as client:
        reply = await client.post(config.PERPLEXITY_API_URL, headers=auth_headers, json=request_body)
        reply.raise_for_status()
        return reply.json()
|
|
|
|
|
|
def parse_summary_response(response: dict) -> dict | None:
|
|
content = response.get("choices", [{}])[0].get("message", {}).get("content", "")
|
|
content = content.strip()
|
|
if content.startswith("```"):
|
|
content = content.split("\n", 1)[-1].rsplit("```", 1)[0]
|
|
try:
|
|
parsed = json.loads(content)
|
|
except json.JSONDecodeError:
|
|
logger.error("Failed to parse summary response: %s", content[:200])
|
|
return None
|
|
|
|
if not isinstance(parsed, dict):
|
|
return None
|
|
|
|
tldr_points = parsed.get("tldr_points", [])
|
|
if not isinstance(tldr_points, list):
|
|
tldr_points = []
|
|
cleaned_points = [str(point).strip() for point in tldr_points if str(point).strip()]
|
|
|
|
summary_body = str(parsed.get("summary_body", "")).strip()
|
|
source_citation = str(parsed.get("source_citation", "")).strip()
|
|
|
|
if not cleaned_points and not summary_body:
|
|
return None
|
|
|
|
return {
|
|
"tldr_points": cleaned_points[:5],
|
|
"summary_body": summary_body or None,
|
|
"source_citation": source_citation or None,
|
|
}
|
|
|
|
|
|
def build_fallback_summary(summary: str, source_url: str | None) -> dict:
|
|
segments = [
|
|
s.strip() for s in summary.replace("!", ".").replace("?", ".").split(".") if s.strip()
|
|
]
|
|
points = segments[:3]
|
|
if not points and summary.strip():
|
|
points = [summary.strip()[:180]]
|
|
return {
|
|
"tldr_points": points,
|
|
"summary_body": summary,
|
|
"source_citation": source_url or "Original source",
|
|
}
|
|
|
|
|
|
# Words dropped from headlines before they are used as image-search queries:
# common English function words plus news-reporting verbs ("announces", ...).
_STOP_WORDS = frozenset(
    """
    a an the and or but in on at to for of with by from as
    is was are were been be have has had do does did
    will would could should may might must shall can need
    it its this that these those i you he she we they
    what which who whom how when where why
    all each every both few more most other some such
    no nor not only own same so than too very just also now here there
    about after before above below between into through during under
    again further then once
    announces announced says said reports reported reveals revealed
    launches launched introduces introduced
    """.split()
)
|
|
|
|
|
|
def extract_image_keywords(headline: str) -> str:
    """Turn a news headline into a compact image-search query.

    Punctuation is replaced by spaces and the text is lower-cased. Stop words
    and words of two characters or fewer are dropped. At most the first five
    remaining words are kept.

    Returns:
        The space-joined keywords, or "ai machine learning deep learning" when
        the headline is blank or yields no keywords.
    """
    default_query = "ai machine learning deep learning"
    if not (headline and headline.strip()):
        return default_query

    tokens = re.sub(r"[^\w\s]", " ", headline).lower().split()
    selected: list[str] = []
    for token in tokens:
        if token in _STOP_WORDS or len(token) <= 2:
            continue
        selected.append(token)
        if len(selected) == 5:
            break

    return " ".join(selected) if selected else default_query
|
|
|
|
|
|
async def fetch_pixabay_image(query: str) -> tuple[str | None, str | None]:
    """Search Pixabay for a photo matching *query*.

    Returns:
        ``(image_url, credit)`` for the first hit, or ``(None, None)`` when
        ``config.PIXABAY_API_KEY`` is not set, nothing matches, or the request
        fails (the failure is logged).
    """
    if not config.PIXABAY_API_KEY:
        return None, None

    # httpx URL-encodes the parameters, including the API key.
    params = {
        "key": config.PIXABAY_API_KEY,
        "q": query,
        "image_type": "photo",
        "per_page": 3,
        "safesearch": "true",
    }
    try:
        async with httpx.AsyncClient(timeout=15.0) as client:
            response = await client.get("https://pixabay.com/api/", params=params)
            response.raise_for_status()
            data = response.json()

        hits = data.get("hits", [])
        if hits:
            image_url = hits[0].get("webformatURL")
            user = hits[0].get("user", "Unknown")
            if image_url:
                return str(image_url), f"Photo by {user} on Pixabay"
    except httpx.HTTPStatusError as exc:
        # The API key travels in the query string, and HTTPStatusError's message
        # contains the full request URL; log only the status so the key stays
        # out of the logs.
        logger.error(
            "Pixabay image retrieval failed with HTTP %d", exc.response.status_code
        )
    except Exception:
        logger.exception("Pixabay image retrieval failed")

    return None, None
|
|
|
|
|
|
async def fetch_unsplash_image(query: str) -> tuple[str | None, str | None]:
    """Search Unsplash for a photo matching *query*.

    Returns:
        ``(image_url, credit)`` using the first result's "regular" rendition, or
        ``(None, None)`` when ``config.UNSPLASH_ACCESS_KEY`` is not set, nothing
        matches, or the request fails (the failure is logged).
    """
    access_key = config.UNSPLASH_ACCESS_KEY
    if not access_key:
        return None, None

    try:
        search_url = "https://api.unsplash.com/search/photos?query={}&per_page=3".format(
            quote_plus(query)
        )
        async with httpx.AsyncClient(timeout=15.0) as client:
            resp = await client.get(
                search_url, headers={"Authorization": f"Client-ID {access_key}"}
            )
            resp.raise_for_status()
            body = resp.json()

        results = body.get("results", [])
        if not results:
            return None, None
        top = results[0]
        image_url = top.get("urls", {}).get("regular")
        author = top.get("user", {}).get("name", "Unknown")
        if not image_url:
            return None, None
        return str(image_url), f"Photo by {author} on Unsplash"
    except Exception:
        logger.exception("Unsplash image retrieval failed")
        return None, None
|
|
|
|
|
|
async def fetch_pexels_image(query: str) -> tuple[str | None, str | None]:
    """Search Pexels for a photo matching *query*.

    Returns:
        ``(image_url, credit)`` using the first result's "large" rendition, or
        ``(None, None)`` when ``config.PEXELS_API_KEY`` is not set, nothing
        matches, or the request fails (the failure is logged).
    """
    api_key = config.PEXELS_API_KEY
    if not api_key:
        return None, None

    try:
        search_url = "https://api.pexels.com/v1/search?query={}&per_page=3".format(
            quote_plus(query)
        )
        async with httpx.AsyncClient(timeout=15.0) as client:
            resp = await client.get(search_url, headers={"Authorization": api_key})
            resp.raise_for_status()
            body = resp.json()

        photos = body.get("photos", [])
        if not photos:
            return None, None
        best = photos[0]
        image_url = best.get("src", {}).get("large")
        photographer = best.get("photographer", "Unknown")
        if not image_url:
            return None, None
        return str(image_url), f"Photo by {photographer} on Pexels"
    except Exception:
        logger.exception("Pexels image retrieval failed")
        return None, None
|
|
|
|
|
|
async def fetch_wikimedia_image(query: str) -> tuple[str | None, str | None]:
    """Look up a file on Wikimedia Commons matching *query* (no API key needed).

    Searches the File namespace (6) for a single result, using at most the
    first 120 characters of *query*, and asks for that file's direct URL.

    Returns:
        ``(url, "Wikimedia Commons")``, or ``(None, None)`` when nothing is
        found or the request fails (the failure is logged).
    """
    try:
        search_url = "".join(
            (
                "https://commons.wikimedia.org/w/api.php",
                "?action=query&format=json&generator=search&gsrnamespace=6&gsrlimit=1",
                f"&gsrsearch={quote_plus(query[:120])}&prop=imageinfo&iiprop=url",
            )
        )
        client_headers = {"User-Agent": "ClawFortBot/1.0 (news image enrichment)"}
        async with httpx.AsyncClient(timeout=15.0, headers=client_headers) as client:
            resp = await client.get(search_url)
            resp.raise_for_status()
            body = resp.json()

        pages = body.get("query", {}).get("pages", {})
        if not pages:
            return None, None
        first_page = next(iter(pages.values()))
        image_infos = first_page.get("imageinfo", [])
        if not image_infos:
            return None, None
        file_url = image_infos[0].get("url")
        if not file_url:
            return None, None
        return str(file_url), "Wikimedia Commons"
    except Exception:
        logger.exception("Wikimedia image retrieval failed")
        return None, None
|
|
|
|
|
|
async def fetch_picsum_image(query: str) -> tuple[str | None, str | None]:
|
|
"""Generate deterministic Picsum image URL (always succeeds)."""
|
|
seed = hashlib.md5(query.encode("utf-8")).hexdigest()[:12]
|
|
return f"https://picsum.photos/seed/{seed}/1200/630", "Picsum Photos"
|
|
|
|
|
|
# Image provider registry, keyed by the names that may appear in
# config.ROYALTY_IMAGE_PROVIDERS. Each value is (fetch_function, is_available):
# is_available() reports whether the provider can be used right now, i.e. its
# API key is configured. Wikimedia and Picsum need no key and are always available.
_PROVIDER_REGISTRY: dict[
    str,
    tuple[
        Callable[[str], Awaitable[tuple[str | None, str | None]]],
        Callable[[], bool],
    ],
] = dict(
    pixabay=(fetch_pixabay_image, lambda: bool(config.PIXABAY_API_KEY)),
    unsplash=(fetch_unsplash_image, lambda: bool(config.UNSPLASH_ACCESS_KEY)),
    pexels=(fetch_pexels_image, lambda: bool(config.PEXELS_API_KEY)),
    wikimedia=(fetch_wikimedia_image, lambda: True),
    picsum=(fetch_picsum_image, lambda: True),
)
|
|
|
|
|
|
def get_enabled_providers() -> list[
    tuple[str, Callable[[str], Awaitable[tuple[str | None, str | None]]]]
]:
    """Resolve config.ROYALTY_IMAGE_PROVIDERS into usable image providers.

    The setting is a comma-separated list of provider names. Names are matched
    case-insensitively after trimming whitespace, and the configured order is
    preserved. Unknown names and providers whose availability check fails are
    skipped.

    Returns:
        ``(name, fetch_function)`` pairs in configured order.
    """
    requested = (
        token.strip().lower() for token in config.ROYALTY_IMAGE_PROVIDERS.split(",")
    )
    providers = []
    for name in requested:
        entry = _PROVIDER_REGISTRY.get(name)
        if entry is None:
            continue
        fetch_fn, is_available = entry
        if is_available():
            providers.append((name, fetch_fn))
    return providers
|
|
|
|
|
|
async def fetch_royalty_free_image(query: str) -> tuple[str | None, str | None]:
|
|
"""Fetch royalty-free image using provider chain with fallback."""
|
|
|
|
def is_finance_story(text: str) -> bool:
|
|
lowered = (text or "").lower()
|
|
return any(term in lowered for term in _FINANCE_TOPIC_TERMS)
|
|
|
|
def is_finance_safe_image(image_url: str, credit: str | None) -> bool:
|
|
haystack = f"{image_url or ''} {credit or ''}".lower()
|
|
return not any(term in haystack for term in _FINANCE_IMAGE_BLOCKLIST)
|
|
|
|
# MCP endpoint takes highest priority if configured
|
|
if config.ROYALTY_IMAGE_MCP_ENDPOINT:
|
|
try:
|
|
async with httpx.AsyncClient(timeout=15.0) as client:
|
|
response = await client.post(
|
|
config.ROYALTY_IMAGE_MCP_ENDPOINT,
|
|
json={"query": query},
|
|
)
|
|
response.raise_for_status()
|
|
payload = response.json()
|
|
image_url = payload.get("image_url") or payload.get("url")
|
|
image_credit = payload.get("image_credit") or payload.get("credit")
|
|
if image_url:
|
|
return str(image_url), str(image_credit or "Royalty-free")
|
|
except Exception:
|
|
logger.exception("MCP image retrieval failed")
|
|
|
|
# Extract keywords for better image search
|
|
refined_query = extract_image_keywords(query)
|
|
finance_story = is_finance_story(query)
|
|
query_variants = [refined_query]
|
|
if finance_story:
|
|
query_variants = [
|
|
f"{refined_query} stock market trading chart finance business",
|
|
refined_query,
|
|
]
|
|
|
|
# Try each enabled provider in order
|
|
for query_variant in query_variants:
|
|
for provider_name, fetch_fn in get_enabled_providers():
|
|
try:
|
|
image_url, credit = await fetch_fn(query_variant)
|
|
if not image_url:
|
|
continue
|
|
if finance_story and not is_finance_safe_image(image_url, credit):
|
|
logger.info(
|
|
"Rejected non-finance-safe image from %s for query '%s': %s",
|
|
provider_name,
|
|
query_variant,
|
|
image_url,
|
|
)
|
|
continue
|
|
return image_url, credit
|
|
except Exception:
|
|
logger.exception("%s image retrieval failed", provider_name.capitalize())
|
|
|
|
if finance_story:
|
|
return GENERIC_FINANCE_FALLBACK_URL, "Finance-safe fallback"
|
|
|
|
return None, None
|
|
|
|
|
|
def parse_news_response(response: dict) -> list[dict]:
    """Extract the list of news items from a chat-completions response.

    The message content is expected to be a JSON array (optionally wrapped in
    a ```-fenced code block).

    Args:
        response: Decoded JSON body returned by the chat-completions endpoint.

    Returns:
        The object (dict) entries of that array, in order; non-object entries
        are dropped. Returns ``[]`` if the content is missing, not valid JSON
        (logged), or not an array.
    """
    # Tolerate an empty ``choices`` list or a null ``content``.
    choices = response.get("choices") or [{}]
    message = choices[0].get("message") or {}
    content = (message.get("content") or "").strip()
    if content.startswith("```"):
        # Drop the opening fence line (e.g. ```json) and the closing fence.
        content = content.split("\n", 1)[-1].rsplit("```", 1)[0]

    try:
        items = json.loads(content)
    except json.JSONDecodeError:
        logger.error("Failed to parse news response: %s", content[:200])
        return []

    if not isinstance(items, list):
        return []
    # Callers treat every item as a mapping (item.get(...)); drop anything else
    # so one malformed entry cannot abort the whole batch.
    return [item for item in items if isinstance(item, dict)]
|
|
|
|
|
|
def _optimize_image_bytes(content: bytes, filepath: str) -> None:
    """Decode *content*, cap width at 1200 px, and save it as JPEG at *filepath*."""
    img = Image.open(BytesIO(content))

    if img.width > 1200:
        ratio = 1200 / img.width
        img = img.resize((1200, int(img.height * ratio)), Image.Resampling.LANCZOS)

    # Normalise to RGB (greyscale "L" is kept). PIL cannot write modes such as
    # RGBA, P or LA as JPEG, so converting only RGBA/P let others fail on save.
    if img.mode not in ("RGB", "L"):
        img = img.convert("RGB")

    img.save(filepath, "JPEG", quality=config.IMAGE_QUALITY, optimize=True)


async def download_and_optimize_image(image_url: str) -> str | None:
    """Download *image_url*, downscale/convert it to JPEG, and store it locally.

    The file name is the MD5 of the URL, written under
    ``config.STATIC_IMAGES_DIR``, so the same URL always maps to the same file.

    Args:
        image_url: Remote image URL; redirects are followed.

    Returns:
        The public path ``"/static/images/<md5>.jpg"``, or ``None`` when
        *image_url* is empty or downloading, decoding or saving fails (logged).
    """
    if not image_url:
        return None

    try:
        async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client:
            response = await client.get(image_url)
            response.raise_for_status()

        filename = hashlib.md5(image_url.encode()).hexdigest() + ".jpg"
        filepath = os.path.join(config.STATIC_IMAGES_DIR, filename)

        # Decoding, resizing and writing are CPU/disk-bound; run them in a
        # worker thread so they do not block the event loop.
        await asyncio.to_thread(_optimize_image_bytes, response.content, filepath)
        return f"/static/images/{filename}"
    except Exception:
        logger.exception("Failed to download/optimize image: %s", image_url)
        return None
|
|
|
|
|
|
async def fetch_news_with_retry(
    max_attempts: int = 3, article_count: int | None = None
) -> list[dict]:
    """Fetch the latest AI news items, retrying Perplexity, then OpenRouter.

    Perplexity is tried up to *max_attempts* times, waiting 2**attempt seconds
    (1 s, 2 s, ...) between attempts, with no wait after the last one. A
    request error and a response that yields no parseable items both count as
    failed attempts. A malformed reply therefore no longer skips the remaining
    retries and the fallback. If every attempt fails, OpenRouter is tried once.

    Args:
        max_attempts: Number of Perplexity attempts before falling back.
        article_count: If given, request exactly this many items (clamped to 1-50).

    Returns:
        The parsed news items, or ``[]`` if every source failed.
    """
    query = "What are the latest AI news from the last hour? Include source URLs and image URLs."
    if article_count is not None:
        bounded = max(1, min(50, int(article_count)))
        query = (
            f"What are the latest AI news from the last hour? Return exactly {bounded} items. "
            "Include source URLs and image URLs."
        )

    for attempt in range(max_attempts):
        try:
            response = await call_perplexity_api(query)
            items = parse_news_response(response) if response else []
            if items:
                return items
            logger.warning("Perplexity API attempt %d returned no news items", attempt + 1)
        except Exception:
            logger.warning("Perplexity API attempt %d failed", attempt + 1, exc_info=True)

        if attempt + 1 < max_attempts:
            wait = 2**attempt
            logger.info("Retrying Perplexity API in %ds", wait)
            await asyncio.sleep(wait)

    logger.warning("Perplexity API exhausted, trying OpenRouter fallback")
    try:
        response = await call_openrouter_api(query)
        if response:
            return parse_news_response(response)
    except Exception:
        logger.exception("OpenRouter fallback also failed")

    return []
|
|
|
|
|
|
def _clean_field(item: dict, key: str) -> str:
    """Return item[key] as a stripped string; a missing key or JSON null gives ""."""
    value = item.get(key)
    return "" if value is None else str(value).strip()


async def _build_summary_artifact(
    headline: str, summary: str, source_url: str | None
) -> dict:
    """Return the modal summary artifact: API-generated if possible, else the local fallback."""
    summary_artifact: dict | None = None
    if config.PERPLEXITY_API_KEY:
        try:
            summary_response = await call_perplexity_summary_api(
                headline=headline,
                summary=summary,
                source_url=source_url,
            )
            if summary_response:
                summary_artifact = parse_summary_response(summary_response)
        except Exception:
            logger.exception("Summary generation failed for article: %s", headline[:80])

    if summary_artifact is None:
        summary_artifact = build_fallback_summary(summary, source_url)
    return summary_artifact


async def _resolve_summary_image(
    headline: str, fallback_url: str, fallback_credit: str | None
) -> tuple[str, str | None]:
    """Pick the modal image: a royalty-free match (stored locally if possible), else the article image."""
    image_url, image_credit = await fetch_royalty_free_image(headline)
    if image_url:
        local_copy = await download_and_optimize_image(image_url)
        if local_copy:
            image_url = local_copy
    return image_url or fallback_url, image_credit or fallback_credit


def _store_translations(db, news_item_id, translations: dict[str, dict]) -> None:
    """Persist each translation unless one already exists for that language."""
    for language_code, payload in translations.items():
        if translation_exists(db, news_item_id, language_code):
            continue
        create_translation(
            db=db,
            news_item_id=news_item_id,
            language=language_code,
            headline=payload["headline"],
            summary=payload["summary"],
            tldr_points=payload.get("tldr_points"),
            summary_body=payload.get("summary_body"),
            source_citation=payload.get("source_citation"),
        )


async def process_and_store_news(article_count: int | None = None) -> int:
    """Fetch fresh AI news and persist new articles with summaries, images and translations.

    Items are skipped when they are not objects, when the headline or summary
    is empty (JSON null counts as empty), or when the headline was already
    stored within 24 hours. For each remaining item:
      - download the article image, falling back to PLACEHOLDER_IMAGE_PATH;
      - build a summary artifact and pick a summary image;
      - store the article;
      - store whatever translations generate_translations returns.

    Args:
        article_count: Optional number of items to request from the news API.

    Returns:
        Number of articles stored in this run.
    """
    items = await fetch_news_with_retry(article_count=article_count)
    if not items:
        logger.warning("No news items fetched this cycle")
        return 0

    db = SessionLocal()
    stored = 0
    try:
        for item in items:
            if not isinstance(item, dict):
                continue
            headline = _clean_field(item, "headline")
            summary = _clean_field(item, "summary")
            if not headline or not summary:
                continue

            if headline_exists_within_24h(db, headline):
                logger.debug("Duplicate headline skipped: %s", headline[:80])
                continue

            source_url = item.get("source_url")
            local_image = await download_and_optimize_image(item.get("image_url", ""))
            image_url = local_image or PLACEHOLDER_IMAGE_PATH

            summary_artifact = await _build_summary_artifact(headline, summary, source_url)
            summary_image_url, summary_image_credit = await _resolve_summary_image(
                headline, image_url, item.get("image_credit")
            )

            tldr_points = summary_artifact.get("tldr_points")
            summary_body = summary_artifact.get("summary_body")
            source_citation = summary_artifact.get("source_citation")

            created_news_item = create_news(
                db=db,
                headline=headline,
                summary=summary,
                source_url=source_url,
                image_url=image_url,
                image_credit=item.get("image_credit"),
                tldr_points=tldr_points,
                summary_body=summary_body,
                source_citation=source_citation,
                summary_image_url=summary_image_url,
                summary_image_credit=summary_image_credit,
            )

            translations = await generate_translations(
                headline=headline,
                summary=summary,
                tldr_points=tldr_points,
                summary_body=summary_body,
                source_citation=source_citation,
            )
            _store_translations(db, created_news_item.id, translations)

            stored += 1

        logger.info("Stored %d new news items", stored)
    finally:
        db.close()

    return stored
|
|
|
|
|
|
def scheduled_news_fetch() -> None:
    """Synchronous entry point (presumably called by a scheduler) for one fetch cycle.

    Runs process_and_store_news() in a fresh event loop via asyncio.run, then
    logs how many items were stored and how long the cycle took.
    """
    started_at = time.monotonic()
    logger.info("Starting scheduled news fetch")
    stored_count = asyncio.run(process_and_store_news())
    logger.info(
        "Scheduled news fetch complete: %d items in %.1fs",
        stored_count,
        time.monotonic() - started_at,
    )
|