Files
clawfort/backend/cli.py
Santhosh Janardhanan 679561bcdb
Some checks failed
quality-gates / lint-and-test (push) Has been cancelled
quality-gates / security-scan (push) Has been cancelled
First deployment
2026-02-13 09:14:04 -05:00

557 lines
18 KiB
Python

import argparse
import asyncio
import datetime
import json
import logging
import os
import re
import sys
import time
from sqlalchemy import and_, desc
from backend import config
from backend.database import SessionLocal, init_db
from backend.models import NewsItem
from backend.news_service import (
GENERIC_AI_FALLBACK_URL,
download_and_optimize_image,
extract_image_keywords,
fetch_royalty_free_image,
generate_translations,
process_and_store_news,
)
from backend.repository import (
create_translation,
delete_archived_news,
get_translation,
resolve_tldr_points,
)
# Configure root logging once at import time so every CLI run emits
# timestamped, level-tagged records for all backend modules.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
# Module-level logger used by all command handlers below.
logger = logging.getLogger(__name__)
def positive_int(value: str) -> int:
    """argparse ``type=`` callable: parse *value* as a strictly positive int.

    Raises argparse.ArgumentTypeError when the text is not an integer or
    is zero/negative, so argparse reports a clean usage error.
    """
    try:
        number = int(value)
    except ValueError as error:
        raise argparse.ArgumentTypeError("must be an integer") from error
    if number > 0:
        return number
    raise argparse.ArgumentTypeError("must be greater than 0")
def bounded_count(value: str) -> int:
    """argparse ``type=`` callable: positive integer capped at 50 per run."""
    count = positive_int(value)
    if count > 50:
        raise argparse.ArgumentTypeError("must be <= 50")
    return count
def print_result(command: str, status: str, **fields: object) -> None:
    """Print one machine-readable status line: ``admin:<cmd> status=<s> k=v ...``."""
    tokens = [f"admin:{command}", f"status={status}"]
    for key, value in fields.items():
        tokens.append(f"{key}={value}")
    print(" ".join(tokens))
def require_confirm(args: argparse.Namespace, action: str) -> bool:
    """Gate destructive commands behind an explicit ``--confirm`` flag.

    Returns True when the flag is set.  Otherwise prints a "blocked"
    status line with a remediation hint and returns False so the caller
    can abort with a distinct exit code.
    """
    confirmed = getattr(args, "confirm", False)
    if not confirmed:
        print_result(
            action,
            "blocked",
            reason="missing-confirm",
            hint="rerun with --confirm",
        )
        return False
    return True
def build_contextual_query(headline: str, summary: str | None) -> str:
    """Build an image-search query from an article's headline and summary.

    Combines keywords extracted from both texts with mood terms inferred
    from the combined wording; falls back to a generic AI query when
    nothing usable remains after whitespace normalization.
    """
    lowered = f"{headline} {summary or ''}".lower()
    mood: list[str] = []
    # Upbeat stories get forward-looking imagery terms.
    if any(term in lowered for term in ("breakthrough", "launch", "record", "surge", "growth")):
        mood += ["innovation", "future"]
    # Negative stories get more sober imagery terms.
    if any(term in lowered for term in ("risk", "lawsuit", "ban", "decline", "drop", "crash")):
        mood += ["serious", "technology"]
    pieces = [
        extract_image_keywords(headline),
        extract_image_keywords(summary or ""),
        " ".join(mood),
    ]
    query = re.sub(r"\s+", " ", " ".join(pieces).strip()).strip()
    return query or "ai machine learning deep learning"
def resolve_article_id_from_permalink(value: str | None) -> int | None:
if not value:
return None
if value.isdigit():
return int(value)
match = re.search(r"(?:\?|&)article=(\d+)", value)
if match:
return int(match.group(1))
return None
def is_unrelated_image_candidate(image_url: str | None, image_credit: str | None) -> bool:
text = f"{image_url or ''} {image_credit or ''}".lower()
blocked = (
"cat",
"dog",
"pet",
"animal",
"wildlife",
"lion",
"tiger",
"bird",
"horse",
)
return any(term in text for term in blocked)
async def refetch_image_for_item(
    item: NewsItem,
    max_attempts: int,
) -> tuple[str | None, str | None, str]:
    """Try to fetch a fresh, relevant image for one article.

    Tries three query variants in order (most specific first), each with
    up to ``max_attempts`` retries and exponential backoff.  Candidates
    are rejected if they look off-topic or duplicate the article's
    current image.  When every provider attempt fails, falls back to the
    generic AI-themed image.

    Returns ``(local_image_path, credit, decision)`` where decision is
    "provider", "fallback", or "none".
    """
    query = build_contextual_query(item.headline, item.summary)
    current_summary_image = item.summary_image_url
    # Most specific variants first; the bare keyword query is last resort.
    query_variants = [
        f"{query} alternative angle",
        f"{query} concept illustration",
        query,
    ]
    for query_variant in query_variants:
        for attempt in range(max_attempts):
            try:
                image_url, image_credit = await fetch_royalty_free_image(query_variant)
                if not image_url:
                    # Raising routes into the backoff handling below.
                    raise RuntimeError("no-image-url")
                if is_unrelated_image_candidate(image_url, image_credit):
                    # Rejection is cheap: move to next attempt without backoff.
                    logger.info("Rejected unrelated image candidate: %s", image_url)
                    continue
                local_image = await download_and_optimize_image(image_url)
                if not local_image:
                    raise RuntimeError("image-download-or-optimize-failed")
                if current_summary_image and local_image == current_summary_image:
                    # Same optimized file as the article already has; try again.
                    logger.info("Rejected duplicate image candidate for article=%s", item.id)
                    continue
                return local_image, image_credit, "provider"
            except Exception:
                # Broad catch is deliberate: any provider/network failure
                # triggers exponential backoff (1s, 2s, 4s, ...) before the
                # next retry; the final attempt of a variant fails silently
                # and the next variant takes over.
                if attempt < max_attempts - 1:
                    delay = 2**attempt
                    await asyncio.sleep(delay)
    # Every variant exhausted: use the generic AI fallback image, unless
    # it is already the article's current image (decision stays "none").
    fallback_local = await download_and_optimize_image(GENERIC_AI_FALLBACK_URL)
    if fallback_local and fallback_local != current_summary_image:
        return fallback_local, "AI-themed fallback", "fallback"
    return None, None, "none"
async def refetch_images_for_latest(
    limit: int,
    max_attempts: int,
    dry_run: bool,
    target_article_id: int | None = None,
) -> tuple[int, int]:
    """Refetch images for the newest non-archived articles.

    When ``target_article_id`` is given only that article is processed;
    otherwise the ``limit`` most recently published articles are.  In
    dry-run mode no database writes happen.  Progress is printed per
    article.  Returns ``(processed, refreshed)`` counts.
    """
    session = SessionLocal()
    processed = 0
    refreshed = 0
    try:
        base_query = session.query(NewsItem).filter(NewsItem.archived.is_(False))
        if target_article_id is None:
            items = (
                base_query
                .order_by(desc(NewsItem.published_at))
                .limit(limit)
                .all()
            )
        else:
            items = base_query.filter(NewsItem.id == target_article_id).all()
        total = len(items)
        for position, article in enumerate(items, start=1):
            processed += 1
            local_image, image_credit, decision = await refetch_image_for_item(
                item=article,
                max_attempts=max_attempts,
            )
            if local_image:
                refreshed += 1
                if not dry_run:
                    article.summary_image_url = local_image
                    article.summary_image_credit = image_credit or article.summary_image_credit
                    # Commit per article so earlier updates survive a later failure.
                    session.commit()
            print_result(
                "refetch-images",
                "progress",
                current=position,
                total=total,
                refreshed=refreshed,
                article_id=article.id,
                decision=decision,
            )
        return processed, refreshed
    finally:
        session.close()
def build_parser() -> argparse.ArgumentParser:
    """Assemble the ``clawfort`` argument parser with all subcommands.

    Every subcommand binds its handler function via ``set_defaults`` so
    ``main`` can dispatch without a command-name switch.
    """
    parser = argparse.ArgumentParser(prog="clawfort", description="ClawFort operations CLI")
    commands = parser.add_subparsers(dest="command", required=True)

    # clawfort force-fetch
    force_fetch = commands.add_parser(
        "force-fetch",
        help="Run one immediate news fetch cycle",
        description="Trigger one immediate news fetch run outside scheduler cadence.",
    )
    force_fetch.set_defaults(handler=handle_force_fetch)

    # clawfort admin <subcommand>
    admin = commands.add_parser(
        "admin",
        help="Administrative maintenance commands",
        description="Run admin maintenance and recovery operations.",
    )
    admin_commands = admin.add_subparsers(dest="admin_command", required=True)

    fetch = admin_commands.add_parser("fetch", help="Fetch n number of articles")
    fetch.add_argument("--count", type=bounded_count, default=5)
    fetch.set_defaults(handler=handle_admin_fetch)

    refetch = admin_commands.add_parser(
        "refetch-images",
        help="Refetch and optimize latest article images",
    )
    refetch.add_argument("--limit", type=positive_int, default=30)
    refetch.add_argument(
        "--permalink",
        type=str,
        default="",
        help="Target one article by permalink (for example '/?article=123' or '123')",
    )
    refetch.add_argument("--max-attempts", type=positive_int, default=4)
    refetch.add_argument("--dry-run", action="store_true")
    refetch.set_defaults(handler=handle_admin_refetch_images)

    clean_archive = admin_commands.add_parser(
        "clean-archive",
        help="Delete archived items older than retention window",
    )
    clean_archive.add_argument("--days", type=positive_int, default=60)
    clean_archive.add_argument("--confirm", action="store_true")
    clean_archive.add_argument("--dry-run", action="store_true")
    clean_archive.set_defaults(handler=handle_admin_clean_archive)

    clear_cache = admin_commands.add_parser(
        "clear-cache", help="Clear optimized image cache"
    )
    clear_cache.add_argument("--confirm", action="store_true")
    clear_cache.add_argument("--dry-run", action="store_true")
    clear_cache.set_defaults(handler=handle_admin_clear_cache)

    clear_news = admin_commands.add_parser("clear-news", help="Clear existing news items")
    clear_news.add_argument("--include-archived", action="store_true")
    clear_news.add_argument("--confirm", action="store_true")
    clear_news.add_argument("--dry-run", action="store_true")
    clear_news.set_defaults(handler=handle_admin_clear_news)

    rebuild = admin_commands.add_parser(
        "rebuild-site", help="Clear and rebuild site content"
    )
    rebuild.add_argument("--count", type=bounded_count, default=5)
    rebuild.add_argument("--confirm", action="store_true")
    rebuild.add_argument("--dry-run", action="store_true")
    rebuild.set_defaults(handler=handle_admin_rebuild_site)

    regen = admin_commands.add_parser(
        "regenerate-translations",
        help="Regenerate translations for existing articles",
    )
    regen.add_argument("--limit", type=positive_int, default=0)
    regen.add_argument("--dry-run", action="store_true")
    regen.set_defaults(handler=handle_admin_regenerate_translations)

    return parser
def validate_runtime() -> None:
    """Fail fast when no LLM provider API key is configured."""
    has_provider_key = bool(config.PERPLEXITY_API_KEY) or bool(config.OPENROUTER_API_KEY)
    if not has_provider_key:
        raise RuntimeError(
            "No provider API key configured. Set PERPLEXITY_API_KEY or OPENROUTER_API_KEY in the environment."
        )
def handle_force_fetch(_: argparse.Namespace) -> int:
    """Run one immediate news fetch cycle; returns a process exit code.

    Prints a human-readable success line, or failure details plus a
    remediation hint on stderr.
    """
    started = time.monotonic()
    try:
        validate_runtime()
        os.makedirs("data", exist_ok=True)
        init_db()
        stored_count = asyncio.run(process_and_store_news())
        elapsed = time.monotonic() - started
        print(f"force-fetch succeeded: stored={stored_count} elapsed={elapsed:.1f}s")
        return 0
    except Exception as exc:
        logger.exception("force-fetch failed")
        print(f"force-fetch failed: {exc}", file=sys.stderr)
        print(
            "Check API keys, network connectivity, and provider status, then retry the command.",
            file=sys.stderr,
        )
        return 1
def handle_admin_fetch(args: argparse.Namespace) -> int:
    """Fetch and store ``--count`` articles; print status, return exit code."""
    started = time.monotonic()
    try:
        validate_runtime()
        init_db()
        stored = asyncio.run(process_and_store_news(article_count=args.count))
        print_result(
            "fetch",
            "ok",
            requested=args.count,
            stored=stored,
            elapsed=f"{time.monotonic() - started:.1f}s",
        )
        return 0
    except Exception:
        logger.exception("admin fetch failed")
        print_result("fetch", "error")
        return 1
def handle_admin_refetch_images(args: argparse.Namespace) -> int:
    """Refetch images for recent articles, or one article via ``--permalink``.

    Returns 0 on success, 1 on failure, 2 when the permalink is invalid.
    """
    started = time.monotonic()
    try:
        init_db()
        target_article_id = resolve_article_id_from_permalink(args.permalink)
        if args.permalink and target_article_id is None:
            print_result(
                "refetch-images",
                "blocked",
                reason="invalid-permalink",
                hint="use '/?article=<id>' or raw numeric id",
            )
            return 2
        # Cap the batch at 30 regardless of the requested --limit.
        processed, refreshed = asyncio.run(
            refetch_images_for_latest(
                limit=min(args.limit, 30),
                max_attempts=args.max_attempts,
                dry_run=args.dry_run,
                target_article_id=target_article_id,
            )
        )
        print_result(
            "refetch-images",
            "ok",
            processed=processed,
            refreshed=refreshed,
            target_article_id=target_article_id,
            dry_run=args.dry_run,
            elapsed=f"{time.monotonic() - started:.1f}s",
        )
        return 0
    except Exception:
        logger.exception("admin refetch-images failed")
        print_result("refetch-images", "error")
        return 1
def handle_admin_clean_archive(args: argparse.Namespace) -> int:
    """Delete archived news items older than ``--days`` (default 60).

    Requires ``--confirm``; ``--dry-run`` only reports how many rows
    would be removed.  Returns 0 on success, 1 on failure, 2 when
    confirmation is missing.
    """
    if not require_confirm(args, "clean-archive"):
        return 2
    db = SessionLocal()
    try:
        # NOTE(review): utcnow() is naive and deprecated in Python 3.12+;
        # consider datetime.now(datetime.UTC) once the created_at column's
        # timezone handling is confirmed.
        cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=args.days)
        # NOTE(review): the dry-run count filters on created_at, while the
        # real deletion delegates to delete_archived_news(days_after_archive=...)
        # which may key off a different timestamp — verify the two agree.
        query = db.query(NewsItem).filter(
            and_(NewsItem.archived.is_(True), NewsItem.created_at < cutoff)
        )
        count = query.count()
        if args.dry_run:
            print_result("clean-archive", "ok", dry_run=True, would_delete=count)
            return 0
        deleted = delete_archived_news(db, days_after_archive=args.days)
        print_result("clean-archive", "ok", deleted=deleted)
        return 0
    except Exception:
        logger.exception("admin clean-archive failed")
        print_result("clean-archive", "error")
        return 1
    finally:
        db.close()
def handle_admin_clear_cache(args: argparse.Namespace) -> int:
    """Delete optimized image files from the static images directory.

    Requires ``--confirm``; ``--dry-run`` reports the count without
    deleting.  Individual removal failures are logged and skipped.
    """
    if not require_confirm(args, "clear-cache"):
        return 2
    image_suffixes = (".jpg", ".jpeg", ".png", ".webp")
    try:
        os.makedirs(config.STATIC_IMAGES_DIR, exist_ok=True)
        cache_files = []
        for name in os.listdir(config.STATIC_IMAGES_DIR):
            if name.lower().endswith(image_suffixes):
                cache_files.append(os.path.join(config.STATIC_IMAGES_DIR, name))
        if args.dry_run:
            print_result("clear-cache", "ok", dry_run=True, would_delete=len(cache_files))
            return 0
        removed = 0
        for path in cache_files:
            try:
                os.remove(path)
            except OSError:
                logger.warning("Failed to remove cache file: %s", path)
            else:
                removed += 1
        print_result("clear-cache", "ok", deleted=removed)
        return 0
    except Exception:
        logger.exception("admin clear-cache failed")
        print_result("clear-cache", "error")
        return 1
def handle_admin_clear_news(args: argparse.Namespace) -> int:
    """Delete news items (non-archived by default; all with --include-archived).

    Requires ``--confirm``; ``--dry-run`` only reports the count.
    Rolls back and returns 1 on any failure.
    """
    if not require_confirm(args, "clear-news"):
        return 2
    session = SessionLocal()
    try:
        selection = session.query(NewsItem)
        if not args.include_archived:
            selection = selection.filter(NewsItem.archived.is_(False))
        targets = selection.all()
        if args.dry_run:
            print_result("clear-news", "ok", dry_run=True, would_delete=len(targets))
            return 0
        removed = 0
        # Delete through the ORM one row at a time, as the original did.
        for target in targets:
            session.delete(target)
            removed += 1
        session.commit()
        print_result("clear-news", "ok", deleted=removed)
        return 0
    except Exception:
        session.rollback()
        logger.exception("admin clear-news failed")
        print_result("clear-news", "error")
        return 1
    finally:
        session.close()
def handle_admin_rebuild_site(args: argparse.Namespace) -> int:
    """Rebuild the site: wipe all news (archived included), then fetch fresh.

    Requires ``--confirm``; ``--dry-run`` only lists the planned steps.
    Propagates the failing step's exit code.
    """
    if not require_confirm(args, "rebuild-site"):
        return 2
    if args.dry_run:
        print_result("rebuild-site", "ok", dry_run=True, steps="clear-news,fetch")
        return 0
    # Step 1: clear everything, reusing the clear-news handler with a
    # synthetic pre-confirmed namespace.
    clear_args = argparse.Namespace(include_archived=True, confirm=True, dry_run=False)
    clear_status = handle_admin_clear_news(clear_args)
    if clear_status != 0:
        print_result("rebuild-site", "error", step="clear-news")
        return clear_status
    # Step 2: fetch a fresh batch of articles.
    fetch_status = handle_admin_fetch(argparse.Namespace(count=args.count))
    if fetch_status != 0:
        print_result("rebuild-site", "error", step="fetch")
        return fetch_status
    print_result("rebuild-site", "ok", count=args.count)
    return 0
def handle_admin_regenerate_translations(args: argparse.Namespace) -> int:
    """Regenerate translations for non-archived articles.

    ``--limit`` restricts the run to the N newest articles (0 = all);
    ``--dry-run`` counts the translations that would be written without
    touching the database.  Returns 0 on success, 1 on failure.
    """
    db = SessionLocal()
    try:
        # Newest articles first; a limit of 0 means "no limit".
        query = db.query(NewsItem).filter(NewsItem.archived.is_(False)).order_by(desc(NewsItem.id))
        if args.limit and args.limit > 0:
            query = query.limit(args.limit)
        items = query.all()
        regenerated = 0
        for item in items:
            tldr_points = resolve_tldr_points(item, None)
            # One fresh event loop per article; translations maps language
            # code -> payload dict with headline/summary plus optional keys.
            translations = asyncio.run(
                generate_translations(
                    headline=item.headline,
                    summary=item.summary,
                    tldr_points=tldr_points,
                    summary_body=item.summary_body,
                    source_citation=item.source_citation,
                )
            )
            for language_code, payload in translations.items():
                if args.dry_run:
                    regenerated += 1
                    continue
                # Upsert: insert a new translation row or overwrite in place.
                existing = get_translation(db, item.id, language_code)
                if existing is None:
                    create_translation(
                        db=db,
                        news_item_id=item.id,
                        language=language_code,
                        headline=payload["headline"],
                        summary=payload["summary"],
                        tldr_points=payload.get("tldr_points"),
                        summary_body=payload.get("summary_body"),
                        source_citation=payload.get("source_citation"),
                    )
                else:
                    existing.headline = payload["headline"]
                    existing.summary = payload["summary"]
                    # NOTE(review): this update path JSON-encodes tldr_points
                    # while the create path passes them raw — presumably
                    # create_translation serializes internally; confirm the
                    # stored formats match.
                    existing.tldr_points = (
                        json.dumps(payload.get("tldr_points"))
                        if payload.get("tldr_points")
                        else None
                    )
                    existing.summary_body = payload.get("summary_body")
                    existing.source_citation = payload.get("source_citation")
                regenerated += 1
        # Single commit covering every article processed in this run.
        if not args.dry_run:
            db.commit()
        print_result(
            "regenerate-translations",
            "ok",
            articles=len(items),
            regenerated=regenerated,
            dry_run=args.dry_run,
        )
        return 0
    except Exception:
        db.rollback()
        logger.exception("admin regenerate-translations failed")
        print_result("regenerate-translations", "error")
        return 1
    finally:
        db.close()
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: parse arguments and dispatch to the bound handler."""
    arguments = build_parser().parse_args(argv)
    return arguments.handler(arguments)


if __name__ == "__main__":
    raise SystemExit(main())