"""ClawFort operations CLI.

Provides a ``force-fetch`` command plus an ``admin`` command group for
maintenance tasks: fetching articles, refetching article images, cleaning the
archive, clearing caches/news, rebuilding the site, and regenerating
translations. Exit codes: 0 success, 1 error, 2 blocked (missing --confirm or
invalid input).
"""

import argparse
import asyncio
import datetime
import json
import logging
import os
import re
import sys
import time

from sqlalchemy import and_, desc

from backend import config
from backend.database import SessionLocal, init_db
from backend.models import NewsItem
from backend.news_service import (
    GENERIC_AI_FALLBACK_URL,
    download_and_optimize_image,
    extract_image_keywords,
    fetch_royalty_free_image,
    generate_translations,
    process_and_store_news,
)
from backend.repository import (
    create_translation,
    delete_archived_news,
    get_translation,
    resolve_tldr_points,
)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)


def positive_int(value: str) -> int:
    """argparse type: parse *value* as an int strictly greater than 0.

    Raises:
        argparse.ArgumentTypeError: if *value* is not an integer or is <= 0.
    """
    try:
        parsed = int(value)
    except ValueError as exc:
        raise argparse.ArgumentTypeError("must be an integer") from exc
    if parsed <= 0:
        raise argparse.ArgumentTypeError("must be greater than 0")
    return parsed


def bounded_count(value: str) -> int:
    """argparse type: positive integer capped at 50 (article-count arguments)."""
    parsed = positive_int(value)
    if parsed > 50:
        raise argparse.ArgumentTypeError("must be <= 50")
    return parsed


def print_result(command: str, status: str, **fields: object) -> None:
    """Print a single machine-parsable result line: ``admin:<cmd> status=<s> k=v ...``."""
    parts = [f"admin:{command}", f"status={status}"]
    parts.extend([f"{key}={value}" for key, value in fields.items()])
    print(" ".join(parts))


def require_confirm(args: argparse.Namespace, action: str) -> bool:
    """Return True when ``--confirm`` was passed; otherwise print a blocked result.

    Destructive admin commands call this as a guard before doing any work.
    """
    if getattr(args, "confirm", False):
        return True
    print_result(
        action,
        "blocked",
        reason="missing-confirm",
        hint="rerun with --confirm",
    )
    return False


def build_contextual_query(headline: str, summary: str | None) -> str:
    """Build an image-search query from an article's headline and summary.

    Combines keyword extraction with mood terms inferred from sentiment-laden
    words; falls back to a generic AI query when nothing usable remains.
    """
    headline_query = extract_image_keywords(headline)
    summary_query = extract_image_keywords(summary or "")
    mood_terms: list[str] = []
    text = f"{headline} {summary or ''}".lower()
    # Positive-signal words bias the query toward upbeat imagery.
    if any(word in text for word in ("breakthrough", "launch", "record", "surge", "growth")):
        mood_terms.extend(["innovation", "future"])
    # Negative-signal words bias toward sober/technical imagery.
    if any(word in text for word in ("risk", "lawsuit", "ban", "decline", "drop", "crash")):
        mood_terms.extend(["serious", "technology"])
    combined = " ".join([headline_query, summary_query, " ".join(mood_terms)]).strip()
    cleaned = re.sub(r"\s+", " ", combined).strip()
    if not cleaned:
        return "ai machine learning deep learning"
    return cleaned


def resolve_article_id_from_permalink(value: str | None) -> int | None:
    """Extract an article id from a raw numeric string or a ``?article=<id>`` permalink.

    Returns None when *value* is empty or matches neither form.
    """
    if not value:
        return None
    if value.isdigit():
        return int(value)
    match = re.search(r"(?:\?|&)article=(\d+)", value)
    if match:
        return int(match.group(1))
    return None


def is_unrelated_image_candidate(image_url: str | None, image_credit: str | None) -> bool:
    """Heuristically reject image candidates whose URL/credit mentions animals.

    Off-topic stock results (pets, wildlife) are a known failure mode of the
    image provider; this is a cheap keyword filter, not a classifier.
    """
    text = f"{image_url or ''} {image_credit or ''}".lower()
    blocked = (
        "cat",
        "dog",
        "pet",
        "animal",
        "wildlife",
        "lion",
        "tiger",
        "bird",
        "horse",
    )
    return any(term in text for term in blocked)


async def refetch_image_for_item(
    item: NewsItem,
    max_attempts: int,
) -> tuple[str | None, str | None, str]:
    """Fetch a fresh, optimized image for one article.

    Tries several query variants with exponential backoff, rejecting unrelated
    and duplicate candidates, then falls back to the generic AI image.

    Returns:
        (local_image_path, image_credit, decision) where decision is one of
        "provider", "fallback", or "none".
    """
    query = build_contextual_query(item.headline, item.summary)
    current_summary_image = item.summary_image_url
    query_variants = [
        f"{query} alternative angle",
        f"{query} concept illustration",
        query,
    ]
    for query_variant in query_variants:
        for attempt in range(max_attempts):
            try:
                image_url, image_credit = await fetch_royalty_free_image(query_variant)
                if not image_url:
                    raise RuntimeError("no-image-url")
                if is_unrelated_image_candidate(image_url, image_credit):
                    logger.info("Rejected unrelated image candidate: %s", image_url)
                    continue
                local_image = await download_and_optimize_image(image_url)
                if not local_image:
                    raise RuntimeError("image-download-or-optimize-failed")
                # Avoid re-installing the image the article already has.
                if current_summary_image and local_image == current_summary_image:
                    logger.info("Rejected duplicate image candidate for article=%s", item.id)
                    continue
                return local_image, image_credit, "provider"
            except Exception:
                # Best-effort retry loop: log at debug so failures are traceable
                # without turning a recoverable miss into noise.
                logger.debug(
                    "Image fetch attempt failed (article=%s, query=%r, attempt=%d)",
                    item.id,
                    query_variant,
                    attempt,
                    exc_info=True,
                )
                if attempt < max_attempts - 1:
                    await asyncio.sleep(2**attempt)  # exponential backoff
    fallback_local = await download_and_optimize_image(GENERIC_AI_FALLBACK_URL)
    if fallback_local and fallback_local != current_summary_image:
        return fallback_local, "AI-themed fallback", "fallback"
    return None, None, "none"


async def refetch_images_for_latest(
    limit: int,
    max_attempts: int,
    dry_run: bool,
    target_article_id: int | None = None,
) -> tuple[int, int]:
    """Refetch images for the latest non-archived articles (or one target article).

    Commits per item so a mid-run failure keeps earlier refreshes. Emits a
    progress line per article.

    Returns:
        (processed_count, refreshed_count).
    """
    db = SessionLocal()
    processed = 0
    refreshed = 0
    try:
        if target_article_id is not None:
            items = (
                db.query(NewsItem)
                .filter(NewsItem.archived.is_(False), NewsItem.id == target_article_id)
                .all()
            )
        else:
            items = (
                db.query(NewsItem)
                .filter(NewsItem.archived.is_(False))
                .order_by(desc(NewsItem.published_at))
                .limit(limit)
                .all()
            )
        total = len(items)
        for idx, item in enumerate(items, start=1):
            processed += 1
            local_image, image_credit, decision = await refetch_image_for_item(
                item=item,
                max_attempts=max_attempts,
            )
            if local_image:
                refreshed += 1
                if not dry_run:
                    item.summary_image_url = local_image
                    # Keep the old credit when the new one is missing.
                    item.summary_image_credit = image_credit or item.summary_image_credit
                    db.commit()
            print_result(
                "refetch-images",
                "progress",
                current=idx,
                total=total,
                refreshed=refreshed,
                article_id=item.id,
                decision=decision,
            )
        return processed, refreshed
    finally:
        db.close()


def build_parser() -> argparse.ArgumentParser:
    """Build the top-level argument parser with the force-fetch and admin trees."""
    parser = argparse.ArgumentParser(prog="clawfort", description="ClawFort operations CLI")
    subparsers = parser.add_subparsers(dest="command", required=True)

    force_fetch_parser = subparsers.add_parser(
        "force-fetch",
        help="Run one immediate news fetch cycle",
        description="Trigger one immediate news fetch run outside scheduler cadence.",
    )
    force_fetch_parser.set_defaults(handler=handle_force_fetch)

    admin_parser = subparsers.add_parser(
        "admin",
        help="Administrative maintenance commands",
        description="Run admin maintenance and recovery operations.",
    )
    admin_subparsers = admin_parser.add_subparsers(dest="admin_command", required=True)

    fetch_parser = admin_subparsers.add_parser("fetch", help="Fetch n number of articles")
    fetch_parser.add_argument("--count", type=bounded_count, default=5)
    fetch_parser.set_defaults(handler=handle_admin_fetch)

    refetch_parser = admin_subparsers.add_parser(
        "refetch-images",
        help="Refetch and optimize latest article images",
    )
    refetch_parser.add_argument("--limit", type=positive_int, default=30)
    refetch_parser.add_argument(
        "--permalink",
        type=str,
        default="",
        help="Target one article by permalink (for example '/?article=123' or '123')",
    )
    refetch_parser.add_argument("--max-attempts", type=positive_int, default=4)
    refetch_parser.add_argument("--dry-run", action="store_true")
    refetch_parser.set_defaults(handler=handle_admin_refetch_images)

    clean_archive_parser = admin_subparsers.add_parser(
        "clean-archive",
        help="Delete archived items older than retention window",
    )
    clean_archive_parser.add_argument("--days", type=positive_int, default=60)
    clean_archive_parser.add_argument("--confirm", action="store_true")
    clean_archive_parser.add_argument("--dry-run", action="store_true")
    clean_archive_parser.set_defaults(handler=handle_admin_clean_archive)

    clear_cache_parser = admin_subparsers.add_parser(
        "clear-cache", help="Clear optimized image cache"
    )
    clear_cache_parser.add_argument("--confirm", action="store_true")
    clear_cache_parser.add_argument("--dry-run", action="store_true")
    clear_cache_parser.set_defaults(handler=handle_admin_clear_cache)

    clear_news_parser = admin_subparsers.add_parser("clear-news", help="Clear existing news items")
    clear_news_parser.add_argument("--include-archived", action="store_true")
    clear_news_parser.add_argument("--confirm", action="store_true")
    clear_news_parser.add_argument("--dry-run", action="store_true")
    clear_news_parser.set_defaults(handler=handle_admin_clear_news)

    rebuild_parser = admin_subparsers.add_parser(
        "rebuild-site", help="Clear and rebuild site content"
    )
    rebuild_parser.add_argument("--count", type=bounded_count, default=5)
    rebuild_parser.add_argument("--confirm", action="store_true")
    rebuild_parser.add_argument("--dry-run", action="store_true")
    rebuild_parser.set_defaults(handler=handle_admin_rebuild_site)

    regen_parser = admin_subparsers.add_parser(
        "regenerate-translations",
        help="Regenerate translations for existing articles",
    )
    regen_parser.add_argument("--limit", type=positive_int, default=0)
    regen_parser.add_argument("--dry-run", action="store_true")
    regen_parser.set_defaults(handler=handle_admin_regenerate_translations)

    return parser


def validate_runtime() -> None:
    """Raise RuntimeError when no LLM provider API key is configured."""
    if not config.PERPLEXITY_API_KEY and not config.OPENROUTER_API_KEY:
        raise RuntimeError(
            "No provider API key configured. Set PERPLEXITY_API_KEY or OPENROUTER_API_KEY in the environment."
        )


def handle_force_fetch(_: argparse.Namespace) -> int:
    """Run one immediate fetch cycle outside the scheduler. Returns an exit code."""
    start = time.monotonic()
    try:
        validate_runtime()
        os.makedirs("data", exist_ok=True)
        init_db()
        stored_count = asyncio.run(process_and_store_news())
        elapsed = time.monotonic() - start
        print(f"force-fetch succeeded: stored={stored_count} elapsed={elapsed:.1f}s")
        return 0
    except Exception as exc:
        logger.exception("force-fetch failed")
        print(f"force-fetch failed: {exc}", file=sys.stderr)
        print(
            "Check API keys, network connectivity, and provider status, then retry the command.",
            file=sys.stderr,
        )
        return 1


def handle_admin_fetch(args: argparse.Namespace) -> int:
    """Fetch ``args.count`` articles and store them. Returns an exit code."""
    start = time.monotonic()
    try:
        validate_runtime()
        init_db()
        stored = asyncio.run(process_and_store_news(article_count=args.count))
        elapsed = time.monotonic() - start
        print_result("fetch", "ok", requested=args.count, stored=stored, elapsed=f"{elapsed:.1f}s")
        return 0
    except Exception:
        logger.exception("admin fetch failed")
        print_result("fetch", "error")
        return 1


def handle_admin_refetch_images(args: argparse.Namespace) -> int:
    """Refetch images for latest articles or one permalink target. Returns an exit code."""
    start = time.monotonic()
    try:
        init_db()
        target_article_id = resolve_article_id_from_permalink(args.permalink)
        if args.permalink and target_article_id is None:
            print_result(
                "refetch-images",
                "blocked",
                reason="invalid-permalink",
                hint="use '/?article=' or raw numeric id",
            )
            return 2
        processed, refreshed = asyncio.run(
            refetch_images_for_latest(
                # Hard cap at 30 regardless of the requested --limit.
                limit=min(args.limit, 30),
                max_attempts=args.max_attempts,
                dry_run=args.dry_run,
                target_article_id=target_article_id,
            )
        )
        elapsed = time.monotonic() - start
        print_result(
            "refetch-images",
            "ok",
            processed=processed,
            refreshed=refreshed,
            target_article_id=target_article_id,
            dry_run=args.dry_run,
            elapsed=f"{elapsed:.1f}s",
        )
        return 0
    except Exception:
        logger.exception("admin refetch-images failed")
        print_result("refetch-images", "error")
        return 1


def handle_admin_clean_archive(args: argparse.Namespace) -> int:
    """Delete archived items older than ``args.days``; requires --confirm."""
    if not require_confirm(args, "clean-archive"):
        return 2
    db = SessionLocal()
    try:
        # Naive UTC cutoff to match the (presumably naive) created_at column;
        # utcnow() is deprecated since Python 3.12, so derive it from an aware now.
        cutoff = datetime.datetime.now(datetime.timezone.utc).replace(
            tzinfo=None
        ) - datetime.timedelta(days=args.days)
        query = db.query(NewsItem).filter(
            and_(NewsItem.archived.is_(True), NewsItem.created_at < cutoff)
        )
        count = query.count()
        if args.dry_run:
            print_result("clean-archive", "ok", dry_run=True, would_delete=count)
            return 0
        deleted = delete_archived_news(db, days_after_archive=args.days)
        print_result("clean-archive", "ok", deleted=deleted)
        return 0
    except Exception:
        logger.exception("admin clean-archive failed")
        print_result("clean-archive", "error")
        return 1
    finally:
        db.close()


def handle_admin_clear_cache(args: argparse.Namespace) -> int:
    """Delete optimized image files from the static image cache; requires --confirm."""
    if not require_confirm(args, "clear-cache"):
        return 2
    try:
        os.makedirs(config.STATIC_IMAGES_DIR, exist_ok=True)
        files = [
            os.path.join(config.STATIC_IMAGES_DIR, name)
            for name in os.listdir(config.STATIC_IMAGES_DIR)
            if name.lower().endswith((".jpg", ".jpeg", ".png", ".webp"))
        ]
        if args.dry_run:
            print_result("clear-cache", "ok", dry_run=True, would_delete=len(files))
            return 0
        deleted = 0
        for file_path in files:
            try:
                os.remove(file_path)
                deleted += 1
            except OSError:
                # Best effort: report but keep deleting the rest.
                logger.warning("Failed to remove cache file: %s", file_path)
        print_result("clear-cache", "ok", deleted=deleted)
        return 0
    except Exception:
        logger.exception("admin clear-cache failed")
        print_result("clear-cache", "error")
        return 1


def handle_admin_clear_news(args: argparse.Namespace) -> int:
    """Delete news items (optionally including archived ones); requires --confirm."""
    if not require_confirm(args, "clear-news"):
        return 2
    db = SessionLocal()
    try:
        query = db.query(NewsItem)
        if not args.include_archived:
            query = query.filter(NewsItem.archived.is_(False))
        items = query.all()
        if args.dry_run:
            print_result("clear-news", "ok", dry_run=True, would_delete=len(items))
            return 0
        deleted = 0
        # Per-object delete (not bulk) so ORM cascades fire for related rows.
        for item in items:
            db.delete(item)
            deleted += 1
        db.commit()
        print_result("clear-news", "ok", deleted=deleted)
        return 0
    except Exception:
        db.rollback()
        logger.exception("admin clear-news failed")
        print_result("clear-news", "error")
        return 1
    finally:
        db.close()


def handle_admin_rebuild_site(args: argparse.Namespace) -> int:
    """Clear all news (including archived) then fetch fresh content; requires --confirm."""
    if not require_confirm(args, "rebuild-site"):
        return 2
    if args.dry_run:
        print_result("rebuild-site", "ok", dry_run=True, steps="clear-news,fetch")
        return 0
    # Delegate to the other admin handlers with synthetic, pre-confirmed args.
    clear_result = handle_admin_clear_news(
        argparse.Namespace(include_archived=True, confirm=True, dry_run=False)
    )
    if clear_result != 0:
        print_result("rebuild-site", "error", step="clear-news")
        return clear_result
    fetch_result = handle_admin_fetch(argparse.Namespace(count=args.count))
    if fetch_result != 0:
        print_result("rebuild-site", "error", step="fetch")
        return fetch_result
    print_result("rebuild-site", "ok", count=args.count)
    return 0


def handle_admin_regenerate_translations(args: argparse.Namespace) -> int:
    """Regenerate translations for non-archived articles (newest first).

    ``--limit 0`` (the default) means no limit. In dry-run mode translations
    are generated and counted but not persisted.
    """
    db = SessionLocal()
    try:
        query = db.query(NewsItem).filter(NewsItem.archived.is_(False)).order_by(desc(NewsItem.id))
        if args.limit and args.limit > 0:
            query = query.limit(args.limit)
        items = query.all()
        regenerated = 0
        for item in items:
            tldr_points = resolve_tldr_points(item, None)
            translations = asyncio.run(
                generate_translations(
                    headline=item.headline,
                    summary=item.summary,
                    tldr_points=tldr_points,
                    summary_body=item.summary_body,
                    source_citation=item.source_citation,
                )
            )
            for language_code, payload in translations.items():
                if args.dry_run:
                    regenerated += 1
                    continue
                existing = get_translation(db, item.id, language_code)
                if existing is None:
                    create_translation(
                        db=db,
                        news_item_id=item.id,
                        language=language_code,
                        headline=payload["headline"],
                        summary=payload["summary"],
                        tldr_points=payload.get("tldr_points"),
                        summary_body=payload.get("summary_body"),
                        source_citation=payload.get("source_citation"),
                    )
                else:
                    existing.headline = payload["headline"]
                    existing.summary = payload["summary"]
                    # Stored as a JSON string column; None when absent.
                    existing.tldr_points = (
                        json.dumps(payload.get("tldr_points"))
                        if payload.get("tldr_points")
                        else None
                    )
                    existing.summary_body = payload.get("summary_body")
                    existing.source_citation = payload.get("source_citation")
                regenerated += 1
        if not args.dry_run:
            db.commit()
        print_result(
            "regenerate-translations",
            "ok",
            articles=len(items),
            regenerated=regenerated,
            dry_run=args.dry_run,
        )
        return 0
    except Exception:
        db.rollback()
        logger.exception("admin regenerate-translations failed")
        print_result("regenerate-translations", "error")
        return 1
    finally:
        db.close()


def main(argv: list[str] | None = None) -> int:
    """Parse arguments and dispatch to the selected subcommand handler."""
    parser = build_parser()
    args = parser.parse_args(argv)
    handler = args.handler
    return handler(args)


if __name__ == "__main__":
    raise SystemExit(main())