"""ClawFort operations CLI: one-shot fetch runs and admin maintenance commands."""

import argparse
import asyncio
import datetime
import json
import logging
import os
import re
import sys
import time

from sqlalchemy import and_, desc

from backend import config
from backend.database import SessionLocal, init_db
from backend.models import NewsItem
from backend.news_service import (
    download_and_optimize_image,
    extract_image_keywords,
    fetch_royalty_free_image,
    generate_translations,
    process_and_store_news,
)
from backend.repository import (
    create_translation,
    delete_archived_news,
    get_translation,
    resolve_tldr_points,
)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)


def positive_int(value: str) -> int:
    try:
        parsed = int(value)
    except ValueError as exc:
        raise argparse.ArgumentTypeError("must be an integer") from exc
    if parsed <= 0:
        raise argparse.ArgumentTypeError("must be greater than 0")
    return parsed


def bounded_count(value: str) -> int:
    parsed = positive_int(value)
    if parsed > 50:
        raise argparse.ArgumentTypeError("must be <= 50")
    return parsed


def print_result(command: str, status: str, **fields: object) -> None:
    parts = [f"admin:{command}", f"status={status}"]
    parts.extend([f"{key}={value}" for key, value in fields.items()])
    print(" ".join(parts))


def require_confirm(args: argparse.Namespace, action: str) -> bool:
    if getattr(args, "confirm", False):
        return True
    print_result(
        action,
        "blocked",
        reason="missing-confirm",
        hint="rerun with --confirm",
    )
    return False


def build_contextual_query(headline: str, summary: str | None) -> str:
    headline_query = extract_image_keywords(headline)
    summary_query = extract_image_keywords(summary or "")
    mood_terms: list[str] = []
    text = f"{headline} {summary or ''}".lower()
    if any(word in text for word in ("breakthrough", "launch", "record", "surge", "growth")):
        mood_terms.extend(["innovation", "future"])
    if any(word in text for word in ("risk", "lawsuit", "ban", "decline", "drop", "crash")):
        mood_terms.extend(["serious", "technology"])
    combined = " ".join([headline_query, summary_query, " ".join(mood_terms)]).strip()
    cleaned = re.sub(r"\s+", " ", combined).strip()
    if not cleaned:
        return "ai machine learning deep learning"
    return cleaned


async def refetch_images_for_latest(
    limit: int,
    max_attempts: int,
    dry_run: bool,
) -> tuple[int, int]:
    db = SessionLocal()
    processed = 0
    refreshed = 0
    try:
        items = (
            db.query(NewsItem)
            .filter(NewsItem.archived.is_(False))
            .order_by(desc(NewsItem.published_at))
            .limit(limit)
            .all()
        )
        total = len(items)
        for idx, item in enumerate(items, start=1):
            processed += 1
            query = build_contextual_query(item.headline, item.summary)
            image_url: str | None = None
            image_credit: str | None = None
            local_image: str | None = None
            for attempt in range(max_attempts):
                try:
                    image_url, image_credit = await fetch_royalty_free_image(query)
                    if not image_url:
                        raise RuntimeError("no-image-url")
                    local_image = await download_and_optimize_image(image_url)
                    if not local_image:
                        raise RuntimeError("image-download-or-optimize-failed")
                    break
                except Exception:
                    if attempt == max_attempts - 1:
                        logger.exception("Image refetch failed for item=%s after retries", item.id)
                        image_url = None
                        local_image = None
                        break
                    delay = 2**attempt
                    logger.warning(
                        "Refetch retry item=%s attempt=%d delay=%ds",
                        item.id,
                        attempt + 1,
                        delay,
                    )
                    await asyncio.sleep(delay)
            if local_image:
                refreshed += 1
                if not dry_run:
                    item.summary_image_url = local_image
                    item.summary_image_credit = image_credit or item.summary_image_credit
                    db.commit()
            print_result(
                "refetch-images",
                "progress",
                current=idx,
                total=total,
                refreshed=refreshed,
                article_id=item.id,
            )
        return processed, refreshed
    finally:
        db.close()
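
# Retry schedule note: with the default max_attempts=4, a failing item is
# attempted four times with exponential backoff sleeps of 1s, 2s, and 4s
# (2**attempt) between attempts; after the final failure the item keeps
# whatever image it already had.
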
def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="clawfort", description="ClawFort operations CLI")
    subparsers = parser.add_subparsers(dest="command", required=True)

    force_fetch_parser = subparsers.add_parser(
        "force-fetch",
        help="Run one immediate news fetch cycle",
        description="Trigger one immediate news fetch run outside scheduler cadence.",
    )
    force_fetch_parser.set_defaults(handler=handle_force_fetch)

    admin_parser = subparsers.add_parser(
        "admin",
        help="Administrative maintenance commands",
        description="Run admin maintenance and recovery operations.",
    )
    admin_subparsers = admin_parser.add_subparsers(dest="admin_command", required=True)

    fetch_parser = admin_subparsers.add_parser("fetch", help="Fetch a given number of articles")
    fetch_parser.add_argument("--count", type=bounded_count, default=5)
    fetch_parser.set_defaults(handler=handle_admin_fetch)

    refetch_parser = admin_subparsers.add_parser(
        "refetch-images",
        help="Refetch and optimize latest article images",
    )
    refetch_parser.add_argument("--limit", type=positive_int, default=30)
    refetch_parser.add_argument("--max-attempts", type=positive_int, default=4)
    refetch_parser.add_argument("--dry-run", action="store_true")
    refetch_parser.set_defaults(handler=handle_admin_refetch_images)

    clean_archive_parser = admin_subparsers.add_parser(
        "clean-archive",
        help="Delete archived items older than retention window",
    )
    clean_archive_parser.add_argument("--days", type=positive_int, default=60)
    clean_archive_parser.add_argument("--confirm", action="store_true")
    clean_archive_parser.add_argument("--dry-run", action="store_true")
    clean_archive_parser.set_defaults(handler=handle_admin_clean_archive)

    clear_cache_parser = admin_subparsers.add_parser(
        "clear-cache", help="Clear optimized image cache"
    )
    clear_cache_parser.add_argument("--confirm", action="store_true")
    clear_cache_parser.add_argument("--dry-run", action="store_true")
    clear_cache_parser.set_defaults(handler=handle_admin_clear_cache)

    clear_news_parser = admin_subparsers.add_parser("clear-news", help="Clear existing news items")
    clear_news_parser.add_argument("--include-archived", action="store_true")
    clear_news_parser.add_argument("--confirm", action="store_true")
    clear_news_parser.add_argument("--dry-run", action="store_true")
    clear_news_parser.set_defaults(handler=handle_admin_clear_news)

    rebuild_parser = admin_subparsers.add_parser(
        "rebuild-site", help="Clear and rebuild site content"
    )
    rebuild_parser.add_argument("--count", type=bounded_count, default=5)
    rebuild_parser.add_argument("--confirm", action="store_true")
    rebuild_parser.add_argument("--dry-run", action="store_true")
    rebuild_parser.set_defaults(handler=handle_admin_rebuild_site)

    regen_parser = admin_subparsers.add_parser(
        "regenerate-translations",
        help="Regenerate translations for existing articles",
    )
    regen_parser.add_argument("--limit", type=positive_int, default=0)
    regen_parser.add_argument("--dry-run", action="store_true")
    regen_parser.set_defaults(handler=handle_admin_regenerate_translations)

    return parser


def validate_runtime() -> None:
    if not config.PERPLEXITY_API_KEY and not config.OPENROUTER_API_KEY:
        raise RuntimeError(
            "No provider API key configured. Set PERPLEXITY_API_KEY or OPENROUTER_API_KEY in the environment."
        )
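
# Exit-code convention shared by the handlers below:
#   0 = success, 1 = failure, 2 = blocked because --confirm was not supplied.
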
def handle_force_fetch(_: argparse.Namespace) -> int:
    start = time.monotonic()
    try:
        validate_runtime()
        os.makedirs("data", exist_ok=True)
        init_db()
        stored_count = asyncio.run(process_and_store_news())
        elapsed = time.monotonic() - start
        print(f"force-fetch succeeded: stored={stored_count} elapsed={elapsed:.1f}s")
        return 0
    except Exception as exc:
        logger.exception("force-fetch failed")
        print(f"force-fetch failed: {exc}", file=sys.stderr)
        print(
            "Check API keys, network connectivity, and provider status, then retry the command.",
            file=sys.stderr,
        )
        return 1


def handle_admin_fetch(args: argparse.Namespace) -> int:
    start = time.monotonic()
    try:
        validate_runtime()
        init_db()
        stored = asyncio.run(process_and_store_news(article_count=args.count))
        elapsed = time.monotonic() - start
        print_result("fetch", "ok", requested=args.count, stored=stored, elapsed=f"{elapsed:.1f}s")
        return 0
    except Exception:
        logger.exception("admin fetch failed")
        print_result("fetch", "error")
        return 1


def handle_admin_refetch_images(args: argparse.Namespace) -> int:
    start = time.monotonic()
    try:
        init_db()
        processed, refreshed = asyncio.run(
            refetch_images_for_latest(
                limit=min(args.limit, 30),
                max_attempts=args.max_attempts,
                dry_run=args.dry_run,
            )
        )
        elapsed = time.monotonic() - start
        print_result(
            "refetch-images",
            "ok",
            processed=processed,
            refreshed=refreshed,
            dry_run=args.dry_run,
            elapsed=f"{elapsed:.1f}s",
        )
        return 0
    except Exception:
        logger.exception("admin refetch-images failed")
        print_result("refetch-images", "error")
        return 1


def handle_admin_clean_archive(args: argparse.Namespace) -> int:
    if not require_confirm(args, "clean-archive"):
        return 2
    db = SessionLocal()
    try:
        cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=args.days)
        query = db.query(NewsItem).filter(
            and_(NewsItem.archived.is_(True), NewsItem.created_at < cutoff)
        )
        count = query.count()
        if args.dry_run:
            print_result("clean-archive", "ok", dry_run=True, would_delete=count)
            return 0
        deleted = delete_archived_news(db, days_after_archive=args.days)
        print_result("clean-archive", "ok", deleted=deleted)
        return 0
    except Exception:
        logger.exception("admin clean-archive failed")
        print_result("clean-archive", "error")
        return 1
    finally:
        db.close()


def handle_admin_clear_cache(args: argparse.Namespace) -> int:
    if not require_confirm(args, "clear-cache"):
        return 2
    try:
        os.makedirs(config.STATIC_IMAGES_DIR, exist_ok=True)
        files = [
            os.path.join(config.STATIC_IMAGES_DIR, name)
            for name in os.listdir(config.STATIC_IMAGES_DIR)
            if name.lower().endswith((".jpg", ".jpeg", ".png", ".webp"))
        ]
        if args.dry_run:
            print_result("clear-cache", "ok", dry_run=True, would_delete=len(files))
            return 0
        deleted = 0
        for file_path in files:
            try:
                os.remove(file_path)
                deleted += 1
            except OSError:
                logger.warning("Failed to remove cache file: %s", file_path)
        print_result("clear-cache", "ok", deleted=deleted)
        return 0
    except Exception:
        logger.exception("admin clear-cache failed")
        print_result("clear-cache", "error")
        return 1


def handle_admin_clear_news(args: argparse.Namespace) -> int:
    if not require_confirm(args, "clear-news"):
        return 2
    db = SessionLocal()
    try:
        query = db.query(NewsItem)
        if not args.include_archived:
            query = query.filter(NewsItem.archived.is_(False))
        items = query.all()
        if args.dry_run:
            print_result("clear-news", "ok", dry_run=True, would_delete=len(items))
            return 0
        deleted = 0
        for item in items:
            db.delete(item)
            deleted += 1
        db.commit()
        print_result("clear-news", "ok", deleted=deleted)
        return 0
    except Exception:
        db.rollback()
        logger.exception("admin clear-news failed")
        print_result("clear-news", "error")
        return 1
    finally:
        db.close()
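
# Note: clear-news deletes row by row via db.delete() rather than a bulk
# query.delete(); this is slower on large tables but lets SQLAlchemy apply
# ORM-level cascades (e.g. removing dependent translation rows, if the model
# declares such a relationship).
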
def handle_admin_rebuild_site(args: argparse.Namespace) -> int:
    if not require_confirm(args, "rebuild-site"):
        return 2
    if args.dry_run:
        print_result("rebuild-site", "ok", dry_run=True, steps="clear-news,fetch")
        return 0
    clear_result = handle_admin_clear_news(
        argparse.Namespace(include_archived=True, confirm=True, dry_run=False)
    )
    if clear_result != 0:
        print_result("rebuild-site", "error", step="clear-news")
        return clear_result
    fetch_result = handle_admin_fetch(argparse.Namespace(count=args.count))
    if fetch_result != 0:
        print_result("rebuild-site", "error", step="fetch")
        return fetch_result
    print_result("rebuild-site", "ok", count=args.count)
    return 0


def handle_admin_regenerate_translations(args: argparse.Namespace) -> int:
    db = SessionLocal()
    try:
        query = db.query(NewsItem).filter(NewsItem.archived.is_(False)).order_by(desc(NewsItem.id))
        if args.limit and args.limit > 0:
            query = query.limit(args.limit)
        items = query.all()
        regenerated = 0
        for item in items:
            tldr_points = resolve_tldr_points(item, None)
            translations = asyncio.run(
                generate_translations(
                    headline=item.headline,
                    summary=item.summary,
                    tldr_points=tldr_points,
                    summary_body=item.summary_body,
                    source_citation=item.source_citation,
                )
            )
            for language_code, payload in translations.items():
                if args.dry_run:
                    regenerated += 1
                    continue
                existing = get_translation(db, item.id, language_code)
                if existing is None:
                    create_translation(
                        db=db,
                        news_item_id=item.id,
                        language=language_code,
                        headline=payload["headline"],
                        summary=payload["summary"],
                        tldr_points=payload.get("tldr_points"),
                        summary_body=payload.get("summary_body"),
                        source_citation=payload.get("source_citation"),
                    )
                else:
                    existing.headline = payload["headline"]
                    existing.summary = payload["summary"]
                    existing.tldr_points = (
                        json.dumps(payload.get("tldr_points"))
                        if payload.get("tldr_points")
                        else None
                    )
                    existing.summary_body = payload.get("summary_body")
                    existing.source_citation = payload.get("source_citation")
                regenerated += 1
        if not args.dry_run:
            db.commit()
        print_result(
            "regenerate-translations",
            "ok",
            articles=len(items),
            regenerated=regenerated,
            dry_run=args.dry_run,
        )
        return 0
    except Exception:
        db.rollback()
        logger.exception("admin regenerate-translations failed")
        print_result("regenerate-translations", "error")
        return 1
    finally:
        db.close()


def main(argv: list[str] | None = None) -> int:
    parser = build_parser()
    args = parser.parse_args(argv)
    handler = args.handler
    return handler(args)


if __name__ == "__main__":
    raise SystemExit(main())
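
# Example invocations (illustrative; assumes the module is exposed as the
# `clawfort` console script named in prog= above):
#   clawfort force-fetch
#   clawfort admin fetch --count 10
#   clawfort admin refetch-images --limit 20 --dry-run
#   clawfort admin clean-archive --days 90 --confirm
#   clawfort admin rebuild-site --confirm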