"""News aggregation pipeline: fetch AI news via LLM APIs, enrich with images,
generate structured summaries and translations, and persist to the database."""

import asyncio
import hashlib
import json
import logging
import os
import re
import time
from collections.abc import Callable
from io import BytesIO
from urllib.parse import quote_plus

import httpx
from PIL import Image

from backend import config
from backend.database import SessionLocal
from backend.repository import (
    create_news,
    create_translation,
    headline_exists_within_24h,
    translation_exists,
)

logger = logging.getLogger(__name__)

PLACEHOLDER_IMAGE_PATH = "/static/images/placeholder.png"

# Shared system prompt for both news-fetching providers (kept identical so the
# OpenRouter fallback produces the same JSON shape as Perplexity).
_NEWS_SYSTEM_PROMPT = (
    "You are a news aggregator. Return a JSON array of news items. "
    "Each item must have: headline, summary (2-3 sentences), source_url, "
    "image_url (a relevant image URL if available), image_credit. "
    "Return between 3 and 5 items. Respond ONLY with valid JSON array, no markdown."
)


def _safe_usage(response: httpx.Response) -> dict:
    """Extract the `usage` field from an API response body.

    Defensive: error responses may carry a non-JSON body, and a JSONDecodeError
    here would otherwise mask the real HTTP error raised later by
    raise_for_status().
    """
    try:
        return response.json().get("usage", {})
    except ValueError:
        return {}


def _strip_code_fence(content: str) -> str:
    """Strip a surrounding markdown code fence (```...```) from LLM output."""
    content = content.strip()
    if content.startswith("```"):
        content = content.split("\n", 1)[-1].rsplit("```", 1)[0]
    return content


async def call_perplexity_api(query: str) -> dict | None:
    """Call the Perplexity chat API with the news-aggregator prompt.

    Returns the raw response JSON. Raises httpx.HTTPStatusError on non-2xx.
    """
    headers = {
        "Authorization": f"Bearer {config.PERPLEXITY_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": config.PERPLEXITY_MODEL,
        "messages": [
            {"role": "system", "content": _NEWS_SYSTEM_PROMPT},
            {"role": "user", "content": query},
        ],
        "temperature": 0.3,
    }
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.post(config.PERPLEXITY_API_URL, headers=headers, json=payload)
        # Log cost/usage even for failed requests (status is included).
        cost_info = {
            "model": config.PERPLEXITY_MODEL,
            "status": response.status_code,
            "usage": _safe_usage(response),
        }
        logger.info("Perplexity API cost: %s", json.dumps(cost_info))
        response.raise_for_status()
        return response.json()


async def call_openrouter_api(query: str) -> dict | None:
    """Call the OpenRouter chat API (fallback provider).

    Returns None when no API key is configured; otherwise the raw response JSON.
    Raises httpx.HTTPStatusError on non-2xx.
    """
    if not config.OPENROUTER_API_KEY:
        return None
    headers = {
        "Authorization": f"Bearer {config.OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": config.OPENROUTER_MODEL,
        "messages": [
            {"role": "system", "content": _NEWS_SYSTEM_PROMPT},
            {"role": "user", "content": query},
        ],
        "temperature": 0.3,
    }
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.post(config.OPENROUTER_API_URL, headers=headers, json=payload)
        cost_info = {
            "model": config.OPENROUTER_MODEL,
            "status": response.status_code,
            "usage": _safe_usage(response),
        }
        logger.info("OpenRouter API cost: %s", json.dumps(cost_info))
        response.raise_for_status()
        return response.json()


async def call_perplexity_translation_api(
    headline: str,
    summary: str,
    language: str,
    tldr_points: list[str] | None = None,
    summary_body: str | None = None,
    source_citation: str | None = None,
) -> dict | None:
    """Ask Perplexity to translate an article's fields into `language`.

    The source material is passed as a JSON payload in the user message so the
    model receives it unambiguously. Returns the raw response JSON.
    """
    headers = {
        "Authorization": f"Bearer {config.PERPLEXITY_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": config.PERPLEXITY_MODEL,
        "messages": [
            {
                "role": "system",
                "content": (
                    "Translate the given headline and summary to the target language. "
                    "Return only valid JSON object with keys: headline, summary. "
                    "No markdown, no extra text."
                ),
            },
            {
                "role": "user",
                "content": json.dumps(
                    {
                        "target_language": language,
                        "headline": headline,
                        "summary": summary,
                        "tldr_points": tldr_points or [],
                        "summary_body": summary_body or "",
                        "source_citation": source_citation or "",
                    }
                ),
            },
        ],
        "temperature": 0.1,  # low temperature: translation should be deterministic
    }
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.post(config.PERPLEXITY_API_URL, headers=headers, json=payload)
        response.raise_for_status()
        return response.json()


def parse_translation_response(response: dict) -> dict | None:
    """Parse a translation API response into a normalized dict.

    Returns None when the content is not valid JSON or lacks a non-empty
    headline/summary pair.
    """
    content = response.get("choices", [{}])[0].get("message", {}).get("content", "")
    content = _strip_code_fence(content)
    try:
        parsed = json.loads(content)
        if isinstance(parsed, dict):
            headline = str(parsed.get("headline", "")).strip()
            summary = str(parsed.get("summary", "")).strip()
            if headline and summary:
                tldr_points = parsed.get("tldr_points", [])
                if not isinstance(tldr_points, list):
                    tldr_points = []
                cleaned_points = [str(p).strip() for p in tldr_points if str(p).strip()]
                summary_body = str(parsed.get("summary_body", "")).strip() or None
                source_citation = str(parsed.get("source_citation", "")).strip() or None
                return {
                    "headline": headline,
                    "summary": summary,
                    "tldr_points": cleaned_points,
                    "summary_body": summary_body,
                    "source_citation": source_citation,
                }
    except json.JSONDecodeError:
        logger.error("Failed to parse translation response: %s", content[:200])
    return None


async def generate_translations(
    headline: str,
    summary: str,
    tldr_points: list[str] | None = None,
    summary_body: str | None = None,
    source_citation: str | None = None,
) -> dict[str, dict]:
    """Generate Tamil and Malayalam translations for one article.

    Best-effort: failures for one language are logged and skipped. Returns a
    mapping of language code -> parsed translation payload (possibly empty).
    """
    translations: dict[str, dict] = {}
    language_names = {"ta": "Tamil", "ml": "Malayalam"}
    if not config.PERPLEXITY_API_KEY:
        return translations
    for language_code, language_name in language_names.items():
        try:
            response = await call_perplexity_translation_api(
                headline=headline,
                summary=summary,
                language=language_name,
                tldr_points=tldr_points,
                summary_body=summary_body,
                source_citation=source_citation,
            )
            if response:
                parsed = parse_translation_response(response)
                if parsed:
                    translations[language_code] = parsed
        except Exception:
            logger.exception("Translation generation failed for %s", language_code)
    return translations


async def call_perplexity_summary_api(
    headline: str, summary: str, source_url: str | None
) -> dict | None:
    """Ask Perplexity for a structured summary (TL;DR bullets, body, citation).

    Returns the raw response JSON; raises httpx.HTTPStatusError on non-2xx.
    """
    headers = {
        "Authorization": f"Bearer {config.PERPLEXITY_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": config.PERPLEXITY_MODEL,
        "messages": [
            {
                "role": "system",
                "content": (
                    "Generate concise structured JSON for UI modal. Return only JSON object with keys: "
                    "tldr_points (array of 3 short bullets), summary_body (detailed summary), "
                    "source_citation (concise source/citation text). "
                    "Always summarize from provided text only. No markdown."
                ),
            },
            {
                "role": "user",
                "content": json.dumps(
                    {
                        "headline": headline,
                        "summary": summary,
                        "source_citation": source_url or "Original source",
                        "summary_length_scale": config.SUMMARY_LENGTH_SCALE,
                        "summary_length_rule": (
                            "1=very short, 2=short, 3=medium, 4=long, 5=very long. "
                            "Use more detail as scale increases."
                        ),
                    }
                ),
            },
        ],
        "temperature": 0.2,
    }
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.post(config.PERPLEXITY_API_URL, headers=headers, json=payload)
        response.raise_for_status()
        return response.json()


def parse_summary_response(response: dict) -> dict | None:
    """Parse a structured-summary API response.

    Returns None when the content is invalid JSON, is not an object, or carries
    neither bullet points nor a summary body.
    """
    content = response.get("choices", [{}])[0].get("message", {}).get("content", "")
    content = _strip_code_fence(content)
    try:
        parsed = json.loads(content)
    except json.JSONDecodeError:
        logger.error("Failed to parse summary response: %s", content[:200])
        return None
    if not isinstance(parsed, dict):
        return None
    tldr_points = parsed.get("tldr_points", [])
    if not isinstance(tldr_points, list):
        tldr_points = []
    cleaned_points = [str(point).strip() for point in tldr_points if str(point).strip()]
    summary_body = str(parsed.get("summary_body", "")).strip()
    source_citation = str(parsed.get("source_citation", "")).strip()
    if not cleaned_points and not summary_body:
        return None
    return {
        "tldr_points": cleaned_points[:5],  # cap bullets for the UI modal
        "summary_body": summary_body or None,
        "source_citation": source_citation or None,
    }


def build_fallback_summary(summary: str, source_url: str | None) -> dict:
    """Build a structured summary locally when the LLM summary is unavailable.

    Splits the plain summary into sentences for up to 3 TL;DR bullets.
    """
    segments = [
        s.strip() for s in summary.replace("!", ".").replace("?", ".").split(".") if s.strip()
    ]
    points = segments[:3]
    if not points and summary.strip():
        points = [summary.strip()[:180]]  # no sentence breaks: truncate raw text
    return {
        "tldr_points": points,
        "summary_body": summary,
        "source_citation": source_url or "Original source",
    }


# Stop words to remove from image search queries
_STOP_WORDS = frozenset(
    [
        "a", "an", "the", "and", "or", "but", "in", "on", "at", "to", "for",
        "of", "with", "by", "from", "as", "is", "was", "are", "were", "been",
        "be", "have", "has", "had", "do", "does", "did", "will", "would",
        "could", "should", "may", "might", "must", "shall", "can", "need",
        "it", "its", "this", "that", "these", "those", "i", "you", "he",
        "she", "we", "they", "what", "which", "who", "whom", "how", "when",
        "where", "why", "all", "each", "every", "both", "few", "more",
        "most", "other", "some", "such", "no", "nor", "not", "only", "own",
        "same", "so", "than", "too", "very", "just", "also", "now", "here",
        "there", "about", "after", "before", "above", "below", "between",
        "into", "through", "during", "under", "again", "further", "then",
        "once", "announces", "announced", "says", "said", "reports",
        "reported", "reveals", "revealed", "launches", "launched",
        "introduces", "introduced",
    ]
)


def extract_image_keywords(headline: str) -> str:
    """Extract relevant keywords from headline for image search.

    - Removes stop words (articles, prepositions, common verbs)
    - Limits to max 5 significant words
    - Handles edge cases (empty, only stop words, special characters)
    """
    if not headline or not headline.strip():
        return "news technology"
    # Normalize: remove special characters, keep alphanumeric and spaces
    cleaned = re.sub(r"[^\w\s]", " ", headline)
    # Split into words and lowercase
    words = cleaned.lower().split()
    # Filter out stop words and very short words
    keywords = [w for w in words if w not in _STOP_WORDS and len(w) > 2]
    # Limit to first 5 significant keywords
    keywords = keywords[:5]
    if not keywords:
        return "news technology"
    return " ".join(keywords)


async def fetch_pixabay_image(query: str) -> tuple[str | None, str | None]:
    """Fetch image from Pixabay API. Returns (url, credit) or (None, None)."""
    if not config.PIXABAY_API_KEY:
        return None, None
    try:
        encoded_query = quote_plus(query)
        url = (
            f"https://pixabay.com/api/"
            f"?key={config.PIXABAY_API_KEY}"
            f"&q={encoded_query}"
            f"&image_type=photo&per_page=3&safesearch=true"
        )
        async with httpx.AsyncClient(timeout=15.0) as client:
            response = await client.get(url)
            response.raise_for_status()
            data = response.json()
        hits = data.get("hits", [])
        if hits:
            image_url = hits[0].get("webformatURL")
            user = hits[0].get("user", "Unknown")
            if image_url:
                return str(image_url), f"Photo by {user} on Pixabay"
    except Exception:
        logger.exception("Pixabay image retrieval failed")
    return None, None


async def fetch_unsplash_image(query: str) -> tuple[str | None, str | None]:
    """Fetch image from Unsplash API. Returns (url, credit) or (None, None)."""
    if not config.UNSPLASH_ACCESS_KEY:
        return None, None
    try:
        encoded_query = quote_plus(query)
        url = f"https://api.unsplash.com/search/photos?query={encoded_query}&per_page=3"
        headers = {"Authorization": f"Client-ID {config.UNSPLASH_ACCESS_KEY}"}
        async with httpx.AsyncClient(timeout=15.0) as client:
            response = await client.get(url, headers=headers)
            response.raise_for_status()
            data = response.json()
        results = data.get("results", [])
        if results:
            image_url = results[0].get("urls", {}).get("regular")
            user_name = results[0].get("user", {}).get("name", "Unknown")
            if image_url:
                return str(image_url), f"Photo by {user_name} on Unsplash"
    except Exception:
        logger.exception("Unsplash image retrieval failed")
    return None, None


async def fetch_pexels_image(query: str) -> tuple[str | None, str | None]:
    """Fetch image from Pexels API. Returns (url, credit) or (None, None)."""
    if not config.PEXELS_API_KEY:
        return None, None
    try:
        encoded_query = quote_plus(query)
        url = f"https://api.pexels.com/v1/search?query={encoded_query}&per_page=3"
        headers = {"Authorization": config.PEXELS_API_KEY}
        async with httpx.AsyncClient(timeout=15.0) as client:
            response = await client.get(url, headers=headers)
            response.raise_for_status()
            data = response.json()
        photos = data.get("photos", [])
        if photos:
            image_url = photos[0].get("src", {}).get("large")
            photographer = photos[0].get("photographer", "Unknown")
            if image_url:
                return str(image_url), f"Photo by {photographer} on Pexels"
    except Exception:
        logger.exception("Pexels image retrieval failed")
    return None, None


async def fetch_wikimedia_image(query: str) -> tuple[str | None, str | None]:
    """Fetch image from Wikimedia Commons (no API key required)."""
    try:
        encoded_query = quote_plus(query[:120])  # Commons search rejects very long queries
        search_url = (
            "https://commons.wikimedia.org/w/api.php"
            "?action=query&format=json&generator=search&gsrnamespace=6&gsrlimit=1"
            f"&gsrsearch={encoded_query}&prop=imageinfo&iiprop=url"
        )
        async with httpx.AsyncClient(
            timeout=15.0,
            headers={"User-Agent": "ClawFortBot/1.0 (news image enrichment)"},
        ) as client:
            response = await client.get(search_url)
            response.raise_for_status()
            data = response.json()
        pages = data.get("query", {}).get("pages", {})
        if pages:
            first_page = next(iter(pages.values()))
            infos = first_page.get("imageinfo", [])
            if infos:
                url = infos[0].get("url")
                if url:
                    return str(url), "Wikimedia Commons"
    except Exception:
        logger.exception("Wikimedia image retrieval failed")
    return None, None


async def fetch_picsum_image(query: str) -> tuple[str | None, str | None]:
    """Generate deterministic Picsum image URL (always succeeds)."""
    seed = hashlib.md5(query.encode("utf-8")).hexdigest()[:12]
    return f"https://picsum.photos/seed/{seed}/1200/630", "Picsum Photos"


# Provider registry: maps provider names to (fetch_function, is_enabled_check)
_PROVIDER_REGISTRY: dict[str, tuple[Callable, Callable[[], bool]]] = {
    "pixabay": (fetch_pixabay_image, lambda: bool(config.PIXABAY_API_KEY)),
    "unsplash": (fetch_unsplash_image, lambda: bool(config.UNSPLASH_ACCESS_KEY)),
    "pexels": (fetch_pexels_image, lambda: bool(config.PEXELS_API_KEY)),
    "wikimedia": (fetch_wikimedia_image, lambda: True),  # No API key required
    "picsum": (fetch_picsum_image, lambda: True),  # Always available
}


def get_enabled_providers() -> list[tuple[str, Callable]]:
    """Get ordered list of enabled providers based on config and available API keys."""
    provider_names = [
        p.strip().lower() for p in config.ROYALTY_IMAGE_PROVIDERS.split(",") if p.strip()
    ]
    enabled = []
    for name in provider_names:
        if name in _PROVIDER_REGISTRY:
            fetch_fn, is_enabled = _PROVIDER_REGISTRY[name]
            if is_enabled():
                enabled.append((name, fetch_fn))
    return enabled


async def fetch_royalty_free_image(query: str) -> tuple[str | None, str | None]:
    """Fetch royalty-free image using provider chain with fallback."""
    # MCP endpoint takes highest priority if configured
    if config.ROYALTY_IMAGE_MCP_ENDPOINT:
        try:
            async with httpx.AsyncClient(timeout=15.0) as client:
                response = await client.post(
                    config.ROYALTY_IMAGE_MCP_ENDPOINT,
                    json={"query": query},
                )
                response.raise_for_status()
                payload = response.json()
            image_url = payload.get("image_url") or payload.get("url")
            image_credit = payload.get("image_credit") or payload.get("credit")
            if image_url:
                return str(image_url), str(image_credit or "Royalty-free")
        except Exception:
            logger.exception("MCP image retrieval failed")
    # Extract keywords for better image search
    refined_query = extract_image_keywords(query)
    # Try each enabled provider in order
    for provider_name, fetch_fn in get_enabled_providers():
        try:
            image_url, credit = await fetch_fn(refined_query)
            if image_url:
                return image_url, credit
        except Exception:
            logger.exception("%s image retrieval failed", provider_name.capitalize())
    return None, None


def parse_news_response(response: dict) -> list[dict]:
    """Parse the news-aggregator LLM response into a list of item dicts.

    Returns an empty list when the content is not a valid JSON array.
    """
    content = response.get("choices", [{}])[0].get("message", {}).get("content", "")
    content = _strip_code_fence(content)
    try:
        items = json.loads(content)
        if isinstance(items, list):
            return items
    except json.JSONDecodeError:
        logger.error("Failed to parse news response: %s", content[:200])
    return []


async def download_and_optimize_image(image_url: str) -> str | None:
    """Download an image, resize/re-encode it, and store it locally.

    Returns the local static path for the saved JPEG, or None on any failure.
    """
    if not image_url:
        return None
    try:
        async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client:
            response = await client.get(image_url)
            response.raise_for_status()
        img = Image.open(BytesIO(response.content))
        # Downscale to max width 1200 keeping aspect ratio
        if img.width > 1200:
            ratio = 1200 / img.width
            new_height = int(img.height * ratio)
            img = img.resize((1200, new_height), Image.Resampling.LANCZOS)
        # JPEG cannot store alpha/palette modes
        if img.mode in ("RGBA", "P"):
            img = img.convert("RGB")
        filename = hashlib.md5(image_url.encode()).hexdigest() + ".jpg"
        filepath = os.path.join(config.STATIC_IMAGES_DIR, filename)
        img.save(filepath, "JPEG", quality=config.IMAGE_QUALITY, optimize=True)
        # BUGFIX: previously returned a literal placeholder instead of the
        # actual saved filename, breaking every stored image path.
        return f"/static/images/{filename}"
    except Exception:
        logger.exception("Failed to download/optimize image: %s", image_url)
        return None


async def fetch_news_with_retry(max_attempts: int = 3) -> list[dict]:
    """Fetch news via Perplexity with exponential backoff; fall back to OpenRouter.

    Returns a (possibly empty) list of parsed news item dicts.
    """
    query = "What are the latest AI news from the last hour? Include source URLs and image URLs."
    for attempt in range(max_attempts):
        try:
            response = await call_perplexity_api(query)
            if response:
                return parse_news_response(response)
        except Exception:
            wait = 2**attempt
            logger.warning("Perplexity API attempt %d failed, retrying in %ds", attempt + 1, wait)
            # Don't sleep after the final attempt — go straight to the fallback.
            if attempt + 1 < max_attempts:
                await asyncio.sleep(wait)
    logger.warning("Perplexity API exhausted, trying OpenRouter fallback")
    try:
        response = await call_openrouter_api(query)
        if response:
            return parse_news_response(response)
    except Exception:
        logger.exception("OpenRouter fallback also failed")
    return []


async def process_and_store_news() -> int:
    """Fetch, enrich, and persist one cycle of news items.

    Returns the number of items stored. A failure on one item is logged and
    skipped so the rest of the batch still gets processed.
    """
    items = await fetch_news_with_retry()
    if not items:
        logger.warning("No news items fetched this cycle")
        return 0
    db = SessionLocal()
    stored = 0
    try:
        for item in items:
            try:
                # Guard against malformed LLM output (e.g. a list of strings).
                if not isinstance(item, dict):
                    continue
                stored += await _process_single_item(db, item)
            except Exception:
                logger.exception("Failed to process news item; skipping")
        logger.info("Stored %d new news items", stored)
    finally:
        db.close()
    return stored


async def _process_single_item(db, item: dict) -> int:
    """Enrich and store a single news item. Returns 1 if stored, else 0."""
    headline = item.get("headline", "").strip()
    summary = item.get("summary", "").strip()
    if not headline or not summary:
        return 0
    if headline_exists_within_24h(db, headline):
        logger.debug("Duplicate headline skipped: %s", headline[:80])
        return 0
    local_image = await download_and_optimize_image(item.get("image_url", ""))
    image_url = local_image or PLACEHOLDER_IMAGE_PATH
    # Structured summary: LLM-generated when possible, local fallback otherwise.
    summary_artifact: dict | None = None
    if config.PERPLEXITY_API_KEY:
        try:
            summary_response = await call_perplexity_summary_api(
                headline=headline,
                summary=summary,
                source_url=item.get("source_url"),
            )
            if summary_response:
                summary_artifact = parse_summary_response(summary_response)
        except Exception:
            logger.exception("Summary generation failed for article: %s", headline[:80])
    if summary_artifact is None:
        summary_artifact = build_fallback_summary(summary, item.get("source_url"))
    # Royalty-free image for the summary modal, localized when downloadable.
    summary_image_url, summary_image_credit = await fetch_royalty_free_image(headline)
    if summary_image_url:
        summary_local_image = await download_and_optimize_image(summary_image_url)
        if summary_local_image:
            summary_image_url = summary_local_image
    if not summary_image_url:
        summary_image_url = image_url
    if not summary_image_credit:
        summary_image_credit = item.get("image_credit")
    tldr_points = summary_artifact.get("tldr_points") if summary_artifact else None
    summary_body = summary_artifact.get("summary_body") if summary_artifact else None
    source_citation = summary_artifact.get("source_citation") if summary_artifact else None
    created_news_item = create_news(
        db=db,
        headline=headline,
        summary=summary,
        source_url=item.get("source_url"),
        image_url=image_url,
        image_credit=item.get("image_credit"),
        tldr_points=tldr_points,
        summary_body=summary_body,
        source_citation=source_citation,
        summary_image_url=summary_image_url,
        summary_image_credit=summary_image_credit,
    )
    translations = await generate_translations(
        headline=headline,
        summary=summary,
        tldr_points=tldr_points,
        summary_body=summary_body,
        source_citation=source_citation,
    )
    for language_code, payload in translations.items():
        if translation_exists(db, created_news_item.id, language_code):
            continue
        create_translation(
            db=db,
            news_item_id=created_news_item.id,
            language=language_code,
            headline=payload["headline"],
            summary=payload["summary"],
            tldr_points=payload.get("tldr_points"),
            summary_body=payload.get("summary_body"),
            source_citation=payload.get("source_citation"),
        )
    return 1


def scheduled_news_fetch() -> None:
    """Synchronous entry point for the scheduler: run one fetch cycle."""
    start = time.monotonic()
    logger.info("Starting scheduled news fetch")
    count = asyncio.run(process_and_store_news())
    elapsed = time.monotonic() - start
    logger.info("Scheduled news fetch complete: %d items in %.1fs", count, elapsed)