# clawfort/backend/news_service.py
import asyncio
import hashlib
import json
import logging
import os
import re
import time
from collections.abc import Awaitable, Callable
from io import BytesIO
from urllib.parse import quote_plus
import httpx
from PIL import Image
from backend import config
from backend.database import SessionLocal
from backend.repository import (
create_news,
create_translation,
headline_exists_within_24h,
translation_exists,
)
logger = logging.getLogger(__name__)
# Local image served when an article image cannot be downloaded/optimized.
PLACEHOLDER_IMAGE_PATH = "/static/images/placeholder.png"
# Hosted generic "AI News" placeholder image (last-resort fallback).
GENERIC_AI_FALLBACK_URL = "https://placehold.co/1200x630/0f172a/e2e8f0/png?text=AI+News"
async def call_perplexity_api(query: str) -> dict | None:
    """Fetch news items from the Perplexity chat-completions API.

    Args:
        query: Natural-language news request sent as the user message.

    Returns:
        The parsed JSON response dict.

    Raises:
        httpx.HTTPStatusError: on a non-2xx HTTP status.
    """
    headers = {
        "Authorization": f"Bearer {config.PERPLEXITY_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": config.PERPLEXITY_MODEL,
        "messages": [
            {
                "role": "system",
                "content": (
                    "You are a news aggregator. Return a JSON array of news items. "
                    "Each item must have: headline, summary (2-3 sentences), source_url, "
                    "image_url (a relevant image URL if available), image_credit. "
                    "Return between 3 and 5 items. Respond ONLY with valid JSON array, no markdown."
                ),
            },
            {"role": "user", "content": query},
        ],
        "temperature": 0.3,
    }
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.post(config.PERPLEXITY_API_URL, headers=headers, json=payload)
        # BUG FIX: validate the HTTP status before decoding the body.
        # Error responses are often not JSON, and the old order made
        # response.json() raise a decode error that masked the HTTP failure.
        response.raise_for_status()
        data = response.json()  # decode once (was parsed twice before)
        cost_info = {
            "model": config.PERPLEXITY_MODEL,
            "status": response.status_code,
            "usage": data.get("usage", {}),
        }
        logger.info("Perplexity API cost: %s", json.dumps(cost_info))
        return data
async def call_openrouter_api(query: str) -> dict | None:
    """Fetch news items from the OpenRouter chat-completions API (fallback provider).

    Args:
        query: Natural-language news request sent as the user message.

    Returns:
        The parsed JSON response dict, or None when no API key is configured.

    Raises:
        httpx.HTTPStatusError: on a non-2xx HTTP status.
    """
    if not config.OPENROUTER_API_KEY:
        return None
    headers = {
        "Authorization": f"Bearer {config.OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": config.OPENROUTER_MODEL,
        "messages": [
            {
                "role": "system",
                "content": (
                    "You are a news aggregator. Return a JSON array of news items. "
                    "Each item must have: headline, summary (2-3 sentences), source_url, "
                    "image_url (a relevant image URL if available), image_credit. "
                    "Return between 3 and 5 items. Respond ONLY with valid JSON array, no markdown."
                ),
            },
            {"role": "user", "content": query},
        ],
        "temperature": 0.3,
    }
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.post(config.OPENROUTER_API_URL, headers=headers, json=payload)
        # BUG FIX: check the status before decoding — error bodies may not be
        # JSON, and the old order masked the real HTTP error with a decode error.
        response.raise_for_status()
        data = response.json()  # decode once (was parsed twice before)
        cost_info = {
            "model": config.OPENROUTER_MODEL,
            "status": response.status_code,
            "usage": data.get("usage", {}),
        }
        logger.info("OpenRouter API cost: %s", json.dumps(cost_info))
        return data
async def call_perplexity_translation_api(
    headline: str,
    summary: str,
    language: str,
    tldr_points: list[str] | None = None,
    summary_body: str | None = None,
    source_citation: str | None = None,
) -> dict | None:
    """Ask Perplexity to translate an article's text fields into *language*.

    The article fields are serialized as a JSON user message; optional
    fields default to empty values.

    Returns the raw API response dict; raises httpx.HTTPStatusError on a
    non-2xx status.
    """
    system_prompt = (
        "Translate the given headline and summary to the target language. "
        "Return only valid JSON object with keys: headline, summary. "
        "No markdown, no extra text."
    )
    article_fields = {
        "target_language": language,
        "headline": headline,
        "summary": summary,
        "tldr_points": tldr_points or [],
        "summary_body": summary_body or "",
        "source_citation": source_citation or "",
    }
    request_body = {
        "model": config.PERPLEXITY_MODEL,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": json.dumps(article_fields)},
        ],
        "temperature": 0.1,
    }
    auth_headers = {
        "Authorization": f"Bearer {config.PERPLEXITY_API_KEY}",
        "Content-Type": "application/json",
    }
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.post(
            config.PERPLEXITY_API_URL, headers=auth_headers, json=request_body
        )
        response.raise_for_status()
        return response.json()
def parse_translation_response(response: dict) -> dict | None:
content = response.get("choices", [{}])[0].get("message", {}).get("content", "")
content = content.strip()
if content.startswith("```"):
content = content.split("\n", 1)[-1].rsplit("```", 1)[0]
try:
parsed = json.loads(content)
if isinstance(parsed, dict):
headline = str(parsed.get("headline", "")).strip()
summary = str(parsed.get("summary", "")).strip()
if headline and summary:
tldr_points = parsed.get("tldr_points", [])
if not isinstance(tldr_points, list):
tldr_points = []
cleaned_points = [str(p).strip() for p in tldr_points if str(p).strip()]
summary_body = str(parsed.get("summary_body", "")).strip() or None
source_citation = str(parsed.get("source_citation", "")).strip() or None
return {
"headline": headline,
"summary": summary,
"tldr_points": cleaned_points,
"summary_body": summary_body,
"source_citation": source_citation,
}
except json.JSONDecodeError:
logger.error("Failed to parse translation response: %s", content[:200])
return None
async def generate_translations(
    headline: str,
    summary: str,
    tldr_points: list[str] | None = None,
    summary_body: str | None = None,
    source_citation: str | None = None,
) -> dict[str, dict]:
    """Translate an article into Tamil and Malayalam, best-effort.

    Returns a mapping of language code ("ta"/"ml") to parsed translation
    payload. Languages whose translation fails are simply omitted; the
    result is empty when no Perplexity API key is configured.
    """
    results: dict[str, dict] = {}
    if not config.PERPLEXITY_API_KEY:
        return results
    targets = {"ta": "Tamil", "ml": "Malayalam"}
    for code, name in targets.items():
        try:
            raw = await call_perplexity_translation_api(
                headline=headline,
                summary=summary,
                language=name,
                tldr_points=tldr_points,
                summary_body=summary_body,
                source_citation=source_citation,
            )
            if raw:
                parsed = parse_translation_response(raw)
                if parsed:
                    results[code] = parsed
        except Exception:
            # One failed language must not block the others.
            logger.exception("Translation generation failed for %s", code)
    return results
async def call_perplexity_summary_api(
    headline: str, summary: str, source_url: str | None
) -> dict | None:
    """Ask Perplexity for a structured summary artifact (TL;DR bullets, body, citation).

    The desired verbosity is conveyed via config.SUMMARY_LENGTH_SCALE inside
    the user payload. Returns the raw API response dict; raises
    httpx.HTTPStatusError on a non-2xx status.
    """
    system_prompt = (
        "Generate concise structured JSON for UI modal. Return only JSON object with keys: "
        "tldr_points (array of 3 short bullets), summary_body (detailed summary), "
        "source_citation (concise source/citation text). "
        "Always summarize from provided text only. No markdown."
    )
    user_content = json.dumps(
        {
            "headline": headline,
            "summary": summary,
            "source_citation": source_url or "Original source",
            "summary_length_scale": config.SUMMARY_LENGTH_SCALE,
            "summary_length_rule": (
                "1=very short, 2=short, 3=medium, 4=long, 5=very long. "
                "Use more detail as scale increases."
            ),
        }
    )
    request_body = {
        "model": config.PERPLEXITY_MODEL,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_content},
        ],
        "temperature": 0.2,
    }
    auth_headers = {
        "Authorization": f"Bearer {config.PERPLEXITY_API_KEY}",
        "Content-Type": "application/json",
    }
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.post(
            config.PERPLEXITY_API_URL, headers=auth_headers, json=request_body
        )
        response.raise_for_status()
        return response.json()
def parse_summary_response(response: dict) -> dict | None:
content = response.get("choices", [{}])[0].get("message", {}).get("content", "")
content = content.strip()
if content.startswith("```"):
content = content.split("\n", 1)[-1].rsplit("```", 1)[0]
try:
parsed = json.loads(content)
except json.JSONDecodeError:
logger.error("Failed to parse summary response: %s", content[:200])
return None
if not isinstance(parsed, dict):
return None
tldr_points = parsed.get("tldr_points", [])
if not isinstance(tldr_points, list):
tldr_points = []
cleaned_points = [str(point).strip() for point in tldr_points if str(point).strip()]
summary_body = str(parsed.get("summary_body", "")).strip()
source_citation = str(parsed.get("source_citation", "")).strip()
if not cleaned_points and not summary_body:
return None
return {
"tldr_points": cleaned_points[:5],
"summary_body": summary_body or None,
"source_citation": source_citation or None,
}
def build_fallback_summary(summary: str, source_url: str | None) -> dict:
segments = [
s.strip() for s in summary.replace("!", ".").replace("?", ".").split(".") if s.strip()
]
points = segments[:3]
if not points and summary.strip():
points = [summary.strip()[:180]]
return {
"tldr_points": points,
"summary_body": summary,
"source_citation": source_url or "Original source",
}
# Stop words to remove from image search queries: articles, prepositions,
# pronouns, auxiliaries, and generic news verbs that add no visual signal.
_STOP_WORDS = frozenset(
    """
    a an the and or but in on at to for of with by from as is was are were
    been be have has had do does did will would could should may might must
    shall can need it its this that these those i you he she we they what
    which who whom how when where why all each every both few more most
    other some such no nor not only own same so than too very just also now
    here there about after before above below between into through during
    under again further then once announces announced says said reports
    reported reveals revealed launches launched introduces introduced
    """.split()
)
def extract_image_keywords(headline: str) -> str:
    """Reduce a headline to up to five keywords suitable for image search.

    Strips punctuation, lowercases, drops stop words and words of one or two
    characters, and keeps at most the first five survivors. Falls back to a
    generic AI query when nothing usable remains.
    """
    fallback_query = "ai machine learning deep learning"
    if not headline or not headline.strip():
        return fallback_query
    # Keep only word characters and whitespace, then tokenize lowercase.
    tokens = re.sub(r"[^\w\s]", " ", headline).lower().split()
    significant = [t for t in tokens if t not in _STOP_WORDS and len(t) > 2][:5]
    return " ".join(significant) if significant else fallback_query
async def fetch_pixabay_image(query: str) -> tuple[str | None, str | None]:
    """Fetch image from Pixabay API.

    Returns (image_url, credit) on success, or (None, None) when the API key
    is missing, the request fails, or no hits are found.

    BUG FIX: previously this returned GENERIC_AI_FALLBACK_URL on failure,
    which short-circuited the provider chain in fetch_royalty_free_image()
    so later providers (Unsplash, Pexels, Wikimedia, Picsum) were never
    tried. Returning (None, None) matches the other provider fetchers and
    lets the chain fall through; the always-succeeding Picsum provider is
    the designed terminal fallback.
    """
    if not config.PIXABAY_API_KEY:
        return None, None
    try:
        encoded_query = quote_plus(query)
        url = (
            f"https://pixabay.com/api/"
            f"?key={config.PIXABAY_API_KEY}"
            f"&q={encoded_query}"
            f"&image_type=photo&per_page=3&safesearch=true"
        )
        async with httpx.AsyncClient(timeout=15.0) as client:
            response = await client.get(url)
            response.raise_for_status()
            data = response.json()
        hits = data.get("hits", [])
        if hits:
            image_url = hits[0].get("webformatURL")
            user = hits[0].get("user", "Unknown")
            if image_url:
                return str(image_url), f"Photo by {user} on Pixabay"
    except Exception:
        logger.exception("Pixabay image retrieval failed")
    return None, None
async def fetch_unsplash_image(query: str) -> tuple[str | None, str | None]:
    """Search Unsplash for *query*; return (image_url, credit) or (None, None).

    Returns (None, None) when no access key is configured, the request
    fails, or no results come back.
    """
    if not config.UNSPLASH_ACCESS_KEY:
        return None, None
    try:
        search_url = (
            f"https://api.unsplash.com/search/photos?query={quote_plus(query)}&per_page=3"
        )
        auth_headers = {"Authorization": f"Client-ID {config.UNSPLASH_ACCESS_KEY}"}
        async with httpx.AsyncClient(timeout=15.0) as client:
            resp = await client.get(search_url, headers=auth_headers)
            resp.raise_for_status()
            results = resp.json().get("results", [])
        if results:
            top = results[0]
            image_url = top.get("urls", {}).get("regular")
            if image_url:
                author = top.get("user", {}).get("name", "Unknown")
                return str(image_url), f"Photo by {author} on Unsplash"
    except Exception:
        logger.exception("Unsplash image retrieval failed")
    return None, None
async def fetch_pexels_image(query: str) -> tuple[str | None, str | None]:
    """Search Pexels for *query*; return (image_url, credit) or (None, None).

    Returns (None, None) when no API key is configured, the request fails,
    or no photos come back.
    """
    if not config.PEXELS_API_KEY:
        return None, None
    try:
        search_url = f"https://api.pexels.com/v1/search?query={quote_plus(query)}&per_page=3"
        auth_headers = {"Authorization": config.PEXELS_API_KEY}
        async with httpx.AsyncClient(timeout=15.0) as client:
            resp = await client.get(search_url, headers=auth_headers)
            resp.raise_for_status()
            photos = resp.json().get("photos", [])
        if photos:
            top = photos[0]
            image_url = top.get("src", {}).get("large")
            if image_url:
                photographer = top.get("photographer", "Unknown")
                return str(image_url), f"Photo by {photographer} on Pexels"
    except Exception:
        logger.exception("Pexels image retrieval failed")
    return None, None
async def fetch_wikimedia_image(query: str) -> tuple[str | None, str | None]:
    """Search Wikimedia Commons (no API key required) for an image.

    Queries the MediaWiki search generator in the File namespace (6) for a
    single match; the query is truncated to 120 characters. Returns
    (image_url, "Wikimedia Commons") or (None, None).
    """
    try:
        search_url = (
            "https://commons.wikimedia.org/w/api.php"
            "?action=query&format=json&generator=search&gsrnamespace=6&gsrlimit=1"
            f"&gsrsearch={quote_plus(query[:120])}&prop=imageinfo&iiprop=url"
        )
        ua_headers = {"User-Agent": "ClawFortBot/1.0 (news image enrichment)"}
        async with httpx.AsyncClient(timeout=15.0, headers=ua_headers) as client:
            resp = await client.get(search_url)
            resp.raise_for_status()
            pages = resp.json().get("query", {}).get("pages", {})
        first_page = next(iter(pages.values()), None)
        if first_page:
            infos = first_page.get("imageinfo", [])
            image_url = infos[0].get("url") if infos else None
            if image_url:
                return str(image_url), "Wikimedia Commons"
    except Exception:
        logger.exception("Wikimedia image retrieval failed")
    return None, None
async def fetch_picsum_image(query: str) -> tuple[str | None, str | None]:
"""Generate deterministic Picsum image URL (always succeeds)."""
seed = hashlib.md5(query.encode("utf-8")).hexdigest()[:12]
return f"https://picsum.photos/seed/{seed}/1200/630", "Picsum Photos"
# Provider registry: maps provider names to (fetch_function, requires_api_key)
# Each value is an (async fetcher, zero-arg availability predicate) pair;
# the predicate reports whether the provider can be used right now.
_PROVIDER_REGISTRY: dict[str, tuple] = {
    "pixabay": (fetch_pixabay_image, lambda: bool(config.PIXABAY_API_KEY)),
    "unsplash": (fetch_unsplash_image, lambda: bool(config.UNSPLASH_ACCESS_KEY)),
    "pexels": (fetch_pexels_image, lambda: bool(config.PEXELS_API_KEY)),
    "wikimedia": (fetch_wikimedia_image, lambda: True),  # No API key required
    "picsum": (fetch_picsum_image, lambda: True),  # Always available
}
def get_enabled_providers() -> list[
    tuple[str, Callable[[str], Awaitable[tuple[str | None, str | None]]]]
]:
    """Resolve the configured image-provider chain into (name, fetcher) pairs.

    Reads config.ROYALTY_IMAGE_PROVIDERS (comma-separated, case-insensitive),
    keeps only names known to _PROVIDER_REGISTRY whose availability predicate
    passes, and preserves the configured order.
    """
    active: list[tuple[str, Callable[[str], Awaitable[tuple[str | None, str | None]]]]] = []
    for raw_name in config.ROYALTY_IMAGE_PROVIDERS.split(","):
        name = raw_name.strip().lower()
        entry = _PROVIDER_REGISTRY.get(name)
        if entry is None:
            continue
        fetch_fn, availability = entry
        if availability():
            active.append((name, fetch_fn))
    return active
async def fetch_royalty_free_image(query: str) -> tuple[str | None, str | None]:
    """Resolve a royalty-free image for *query* via the provider chain.

    A configured MCP endpoint is consulted first; otherwise the query is
    distilled into keywords and each enabled provider is tried in configured
    order until one yields a URL. Returns (image_url, credit) or
    (None, None) when every source fails.
    """
    if config.ROYALTY_IMAGE_MCP_ENDPOINT:
        try:
            async with httpx.AsyncClient(timeout=15.0) as client:
                mcp_response = await client.post(
                    config.ROYALTY_IMAGE_MCP_ENDPOINT,
                    json={"query": query},
                )
                mcp_response.raise_for_status()
                body = mcp_response.json()
            found_url = body.get("image_url") or body.get("url")
            if found_url:
                found_credit = body.get("image_credit") or body.get("credit")
                return str(found_url), str(found_credit or "Royalty-free")
        except Exception:
            logger.exception("MCP image retrieval failed")
    # Distill the headline into keywords before hitting stock-photo search.
    keywords = extract_image_keywords(query)
    for provider_name, provider_fetch in get_enabled_providers():
        try:
            image_url, credit = await provider_fetch(keywords)
        except Exception:
            logger.exception("%s image retrieval failed", provider_name.capitalize())
            continue
        if image_url:
            return image_url, credit
    return None, None
def parse_news_response(response: dict) -> list[dict]:
    """Extract the JSON array of news items from an LLM chat response.

    Tolerates a markdown code fence around the payload. Returns [] when the
    content is missing, not valid JSON, or not a list.
    """
    content = response.get("choices", [{}])[0].get("message", {}).get("content", "").strip()
    if content.startswith("```"):
        content = content.split("\n", 1)[-1].rsplit("```", 1)[0]
    try:
        decoded = json.loads(content)
    except json.JSONDecodeError:
        logger.error("Failed to parse news response: %s", content[:200])
        return []
    return decoded if isinstance(decoded, list) else []
async def download_and_optimize_image(image_url: str) -> str | None:
    """Download an image, shrink it to at most 1200px wide, and save it locally.

    Args:
        image_url: Remote image URL; empty values are rejected immediately.

    Returns:
        The site-relative path of the saved JPEG ("/static/images/<md5>.jpg"),
        or None when the URL is empty or any step fails (best-effort).
    """
    if not image_url:
        return None
    try:
        async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client:
            response = await client.get(image_url)
            response.raise_for_status()
            img = Image.open(BytesIO(response.content))
            # Cap width at 1200px, preserving aspect ratio.
            if img.width > 1200:
                ratio = 1200 / img.width
                new_height = int(img.height * ratio)
                img = img.resize((1200, new_height), Image.Resampling.LANCZOS)
            # JPEG cannot store alpha/palette modes; convert before saving.
            if img.mode in ("RGBA", "P"):
                img = img.convert("RGB")
            # Content-addressed filename makes repeat downloads idempotent.
            filename = hashlib.md5(image_url.encode()).hexdigest() + ".jpg"
            filepath = os.path.join(config.STATIC_IMAGES_DIR, filename)
            img.save(filepath, "JPEG", quality=config.IMAGE_QUALITY, optimize=True)
            # BUG FIX: previously returned the literal "/static/images/(unknown)"
            # instead of the actual saved file's public path.
            return f"/static/images/{filename}"
    except Exception:
        logger.exception("Failed to download/optimize image: %s", image_url)
        return None
async def fetch_news_with_retry(
    max_attempts: int = 3, article_count: int | None = None
) -> list[dict]:
    """Fetch the latest AI news, retrying Perplexity then falling back to OpenRouter.

    Args:
        max_attempts: Perplexity attempts, with exponential backoff (1s, 2s, 4s...).
        article_count: When given, clamped to 1..50 and requested explicitly.

    Returns:
        A (possibly empty) list of raw news-item dicts.
    """
    query = "What are the latest AI news from the last hour? Include source URLs and image URLs."
    if article_count is not None:
        bounded = max(1, min(50, int(article_count)))
        query = (
            f"What are the latest AI news from the last hour? Return exactly {bounded} items. "
            "Include source URLs and image URLs."
        )
    for attempt in range(max_attempts):
        try:
            response = await call_perplexity_api(query)
            if response:
                return parse_news_response(response)
        except Exception:
            if attempt + 1 >= max_attempts:
                # BUG FIX: don't sleep the full backoff after the FINAL failed
                # attempt — go straight to the fallback provider instead.
                logger.warning("Perplexity API attempt %d failed", attempt + 1)
                break
            wait = 2**attempt
            logger.warning("Perplexity API attempt %d failed, retrying in %ds", attempt + 1, wait)
            await asyncio.sleep(wait)
    logger.warning("Perplexity API exhausted, trying OpenRouter fallback")
    try:
        response = await call_openrouter_api(query)
        if response:
            return parse_news_response(response)
    except Exception:
        logger.exception("OpenRouter fallback also failed")
    return []
async def process_and_store_news(article_count: int | None = None) -> int:
    """Fetch one batch of news, enrich each item, and persist it.

    Pipeline per item: dedupe against the last 24h, localize the article
    image, generate a structured summary (with a local fallback), pick a
    royalty-free summary image, store the news row, then store its
    translations. Returns the number of items stored.
    """
    items = await fetch_news_with_retry(article_count=article_count)
    if not items:
        logger.warning("No news items fetched this cycle")
        return 0
    db = SessionLocal()
    stored = 0
    try:
        for item in items:
            headline = item.get("headline", "").strip()
            summary = item.get("summary", "").strip()
            # Skip malformed items and recent duplicates.
            if not headline or not summary:
                continue
            if headline_exists_within_24h(db, headline):
                logger.debug("Duplicate headline skipped: %s", headline[:80])
                continue
            # Prefer a locally optimized copy of the article image.
            local_image = await download_and_optimize_image(item.get("image_url", ""))
            image_url = local_image or PLACEHOLDER_IMAGE_PATH
            # Structured summary is best-effort; any failure falls through
            # to the locally built fallback artifact below.
            summary_artifact: dict | None = None
            if config.PERPLEXITY_API_KEY:
                try:
                    summary_response = await call_perplexity_summary_api(
                        headline=headline,
                        summary=summary,
                        source_url=item.get("source_url"),
                    )
                    if summary_response:
                        summary_artifact = parse_summary_response(summary_response)
                except Exception:
                    logger.exception("Summary generation failed for article: %s", headline[:80])
            if summary_artifact is None:
                summary_artifact = build_fallback_summary(summary, item.get("source_url"))
            # Royalty-free hero image for the summary view: localize it when
            # possible, else fall back to the article image and its credit.
            summary_image_url, summary_image_credit = await fetch_royalty_free_image(headline)
            summary_local_image = None
            if summary_image_url:
                summary_local_image = await download_and_optimize_image(summary_image_url)
                if summary_local_image:
                    summary_image_url = summary_local_image
            if not summary_image_url:
                summary_image_url = image_url
            if not summary_image_credit:
                summary_image_credit = item.get("image_credit")
            tldr_points = summary_artifact.get("tldr_points") if summary_artifact else None
            summary_body = summary_artifact.get("summary_body") if summary_artifact else None
            source_citation = summary_artifact.get("source_citation") if summary_artifact else None
            created_news_item = create_news(
                db=db,
                headline=headline,
                summary=summary,
                source_url=item.get("source_url"),
                image_url=image_url,
                image_credit=item.get("image_credit"),
                tldr_points=tldr_points,
                summary_body=summary_body,
                source_citation=source_citation,
                summary_image_url=summary_image_url,
                summary_image_credit=summary_image_credit,
            )
            # Translations are stored after the row exists so they can
            # reference its id.
            translations = await generate_translations(
                headline=headline,
                summary=summary,
                tldr_points=tldr_points,
                summary_body=summary_body,
                source_citation=source_citation,
            )
            for language_code, payload in translations.items():
                if translation_exists(db, created_news_item.id, language_code):
                    continue
                create_translation(
                    db=db,
                    news_item_id=created_news_item.id,
                    language=language_code,
                    headline=payload["headline"],
                    summary=payload["summary"],
                    tldr_points=payload.get("tldr_points"),
                    summary_body=payload.get("summary_body"),
                    source_citation=payload.get("source_citation"),
                )
            stored += 1
        logger.info("Stored %d new news items", stored)
    finally:
        # Always release the DB session, even on unexpected failure.
        db.close()
    return stored
def scheduled_news_fetch() -> None:
    """Synchronous scheduler entry point: run one full fetch/store cycle.

    Drives the async pipeline via asyncio.run() and logs item count and
    elapsed wall-clock time.
    """
    started_at = time.monotonic()
    logger.info("Starting scheduled news fetch")
    item_count = asyncio.run(process_and_store_news())
    logger.info(
        "Scheduled news fetch complete: %d items in %.1fs",
        item_count,
        time.monotonic() - started_at,
    )