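"""ClawFort operations CLI.

Illustrative invocations (the ``clawfort`` prefix matches the parser's prog
name below; substitute however this script is actually installed or run):

    clawfort force-fetch
    clawfort admin fetch --count 10
    clawfort admin refetch-images --limit 10 --dry-run
    clawfort admin clean-archive --days 60 --confirm
    clawfort admin regenerate-translations --limit 5
"""
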
import argparse
import asyncio
import datetime
import json
import logging
import os
import re
import sys
import time

from sqlalchemy import and_, desc

from backend import config
from backend.database import SessionLocal, init_db
from backend.models import NewsItem
from backend.news_service import (
    GENERIC_AI_FALLBACK_URL,
    download_and_optimize_image,
    extract_image_keywords,
    fetch_royalty_free_image,
    generate_translations,
    process_and_store_news,
)
from backend.repository import (
    create_translation,
    delete_archived_news,
    get_translation,
    resolve_tldr_points,
)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)


def positive_int(value: str) -> int:
    try:
        parsed = int(value)
    except ValueError as exc:
        raise argparse.ArgumentTypeError("must be an integer") from exc
    if parsed <= 0:
        raise argparse.ArgumentTypeError("must be greater than 0")
    return parsed


def bounded_count(value: str) -> int:
    parsed = positive_int(value)
    if parsed > 50:
        raise argparse.ArgumentTypeError("must be <= 50")
    return parsed


def print_result(command: str, status: str, **fields: object) -> None:
    parts = [f"admin:{command}", f"status={status}"]
    parts.extend([f"{key}={value}" for key, value in fields.items()])
    print(" ".join(parts))


def require_confirm(args: argparse.Namespace, action: str) -> bool:
    if getattr(args, "confirm", False):
        return True
    print_result(
        action,
        "blocked",
        reason="missing-confirm",
        hint="rerun with --confirm",
    )
    return False


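# Example of the status-line format produced by print_result (derived from
# the f-strings above):
#
#     print_result("fetch", "ok", requested=5, stored=4)
#     # -> admin:fetch status=ok requested=5 stored=4
#
# Commands gated by require_confirm print a blocked line when --confirm is
# missing, e.g. "admin:clear-news status=blocked reason=missing-confirm ...".

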
def build_contextual_query(headline: str, summary: str | None) -> str:
    """Build an image-search query from headline/summary keywords plus mood terms."""
    headline_query = extract_image_keywords(headline)
    summary_query = extract_image_keywords(summary or "")

    mood_terms: list[str] = []
    text = f"{headline} {summary or ''}".lower()
    if any(word in text for word in ("breakthrough", "launch", "record", "surge", "growth")):
        mood_terms.extend(["innovation", "future"])
    if any(word in text for word in ("risk", "lawsuit", "ban", "decline", "drop", "crash")):
        mood_terms.extend(["serious", "technology"])

    combined = " ".join([headline_query, summary_query, " ".join(mood_terms)]).strip()
    cleaned = re.sub(r"\s+", " ", combined).strip()
    if not cleaned:
        return "ai machine learning deep learning"
    return cleaned


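# Sketch of expected behavior (illustrative: the keyword portion depends on
# extract_image_keywords from backend.news_service, not shown here):
#
#     build_contextual_query("Startup launches new AI chip", None)
#     # -> "<headline keywords> innovation future"  ("launches" contains "launch")
#
#     build_contextual_query("", None)
#     # -> "ai machine learning deep learning"  (assuming empty keywords -> fallback)

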
def resolve_article_id_from_permalink(value: str | None) -> int | None:
    if not value:
        return None
    if value.isdigit():
        return int(value)
    match = re.search(r"(?:\?|&)article=(\d+)", value)
    if match:
        return int(match.group(1))
    return None


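# Accepted permalink shapes, per the checks above:
#
#     resolve_article_id_from_permalink("123")               # -> 123
#     resolve_article_id_from_permalink("/?article=123")     # -> 123
#     resolve_article_id_from_permalink("/p?x=1&article=7")  # -> 7
#     resolve_article_id_from_permalink("/article/123")      # -> None

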
def is_unrelated_image_candidate(image_url: str | None, image_credit: str | None) -> bool:
    text = f"{image_url or ''} {image_credit or ''}".lower()
    blocked = (
        "cat",
        "dog",
        "pet",
        "animal",
        "wildlife",
        "lion",
        "tiger",
        "bird",
        "horse",
    )
    return any(term in text for term in blocked)


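# Note: this is a plain substring match, so it can false-positive on words
# that merely contain a blocked term ("cat" is inside "application"):
#
#     is_unrelated_image_candidate("https://img.example/tiger.jpg", None)  # -> True
#     is_unrelated_image_candidate(None, "Photo credit: wildlife stock")   # -> True

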
async def refetch_image_for_item(
    item: NewsItem,
    max_attempts: int,
) -> tuple[str | None, str | None, str]:
    """Try provider images across query variants, then fall back to a generic image.

    Returns (local_image, image_credit, decision), where decision is one of
    "provider", "fallback", or "none".
    """
    query = build_contextual_query(item.headline, item.summary)
    current_summary_image = item.summary_image_url
    query_variants = [
        f"{query} alternative angle",
        f"{query} concept illustration",
        query,
    ]

    for query_variant in query_variants:
        for attempt in range(max_attempts):
            try:
                image_url, image_credit = await fetch_royalty_free_image(query_variant)
                if not image_url:
                    raise RuntimeError("no-image-url")
                if is_unrelated_image_candidate(image_url, image_credit):
                    logger.info("Rejected unrelated image candidate: %s", image_url)
                    continue
                local_image = await download_and_optimize_image(image_url)
                if not local_image:
                    raise RuntimeError("image-download-or-optimize-failed")
                if current_summary_image and local_image == current_summary_image:
                    logger.info("Rejected duplicate image candidate for article=%s", item.id)
                    continue
                return local_image, image_credit, "provider"
            except Exception:
                # Exponential backoff between failed attempts (1s, 2s, 4s, ...);
                # after the final attempt, move on to the next query variant.
                if attempt < max_attempts - 1:
                    delay = 2**attempt
                    await asyncio.sleep(delay)

    fallback_local = await download_and_optimize_image(GENERIC_AI_FALLBACK_URL)
    if fallback_local and fallback_local != current_summary_image:
        return fallback_local, "AI-themed fallback", "fallback"
    return None, None, "none"


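# Minimal usage sketch (illustrative; `item` would come from a live session):
#
#     async def _refresh_one(item: NewsItem) -> None:
#         local, credit, decision = await refetch_image_for_item(item, max_attempts=3)
#         if decision != "none":
#             logger.info("picked %s (%s) via %s", local, credit, decision)
#
# With max_attempts=3, failed attempts for a variant back off 1s then 2s
# before the next variant is tried.

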
async def refetch_images_for_latest(
    limit: int,
    max_attempts: int,
    dry_run: bool,
    target_article_id: int | None = None,
) -> tuple[int, int]:
    """Refetch images for the latest non-archived items; returns (processed, refreshed)."""
    db = SessionLocal()
    processed = 0
    refreshed = 0

    try:
        if target_article_id is not None:
            items = (
                db.query(NewsItem)
                .filter(NewsItem.archived.is_(False), NewsItem.id == target_article_id)
                .all()
            )
        else:
            items = (
                db.query(NewsItem)
                .filter(NewsItem.archived.is_(False))
                .order_by(desc(NewsItem.published_at))
                .limit(limit)
                .all()
            )

        total = len(items)
        for idx, item in enumerate(items, start=1):
            processed += 1
            local_image, image_credit, decision = await refetch_image_for_item(
                item=item,
                max_attempts=max_attempts,
            )

            if local_image:
                refreshed += 1
                if not dry_run:
                    item.summary_image_url = local_image
                    item.summary_image_credit = image_credit or item.summary_image_credit
                    db.commit()

            print_result(
                "refetch-images",
                "progress",
                current=idx,
                total=total,
                refreshed=refreshed,
                article_id=item.id,
                decision=decision,
            )

        return processed, refreshed
    finally:
        db.close()


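# Illustrative direct call (the refetch-images handler below wraps this in
# asyncio.run with the CLI arguments):
#
#     processed, refreshed = asyncio.run(
#         refetch_images_for_latest(limit=10, max_attempts=3, dry_run=True)
#     )

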
def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="clawfort", description="ClawFort operations CLI")
    subparsers = parser.add_subparsers(dest="command", required=True)

    force_fetch_parser = subparsers.add_parser(
        "force-fetch",
        help="Run one immediate news fetch cycle",
        description="Trigger one immediate news fetch run outside scheduler cadence.",
    )
    force_fetch_parser.set_defaults(handler=handle_force_fetch)

    admin_parser = subparsers.add_parser(
        "admin",
        help="Administrative maintenance commands",
        description="Run admin maintenance and recovery operations.",
    )
    admin_subparsers = admin_parser.add_subparsers(dest="admin_command", required=True)

    fetch_parser = admin_subparsers.add_parser("fetch", help="Fetch a given number of articles")
    fetch_parser.add_argument("--count", type=bounded_count, default=5)
    fetch_parser.set_defaults(handler=handle_admin_fetch)

    refetch_parser = admin_subparsers.add_parser(
        "refetch-images",
        help="Refetch and optimize latest article images",
    )
    refetch_parser.add_argument("--limit", type=positive_int, default=30)
    refetch_parser.add_argument(
        "--permalink",
        type=str,
        default="",
        help="Target one article by permalink (for example '/?article=123' or '123')",
    )
    refetch_parser.add_argument("--max-attempts", type=positive_int, default=4)
    refetch_parser.add_argument("--dry-run", action="store_true")
    refetch_parser.set_defaults(handler=handle_admin_refetch_images)

    clean_archive_parser = admin_subparsers.add_parser(
        "clean-archive",
        help="Delete archived items older than retention window",
    )
    clean_archive_parser.add_argument("--days", type=positive_int, default=60)
    clean_archive_parser.add_argument("--confirm", action="store_true")
    clean_archive_parser.add_argument("--dry-run", action="store_true")
    clean_archive_parser.set_defaults(handler=handle_admin_clean_archive)

    clear_cache_parser = admin_subparsers.add_parser(
        "clear-cache", help="Clear optimized image cache"
    )
    clear_cache_parser.add_argument("--confirm", action="store_true")
    clear_cache_parser.add_argument("--dry-run", action="store_true")
    clear_cache_parser.set_defaults(handler=handle_admin_clear_cache)

    clear_news_parser = admin_subparsers.add_parser("clear-news", help="Clear existing news items")
    clear_news_parser.add_argument("--include-archived", action="store_true")
    clear_news_parser.add_argument("--confirm", action="store_true")
    clear_news_parser.add_argument("--dry-run", action="store_true")
    clear_news_parser.set_defaults(handler=handle_admin_clear_news)

    rebuild_parser = admin_subparsers.add_parser(
        "rebuild-site", help="Clear and rebuild site content"
    )
    rebuild_parser.add_argument("--count", type=bounded_count, default=5)
    rebuild_parser.add_argument("--confirm", action="store_true")
    rebuild_parser.add_argument("--dry-run", action="store_true")
    rebuild_parser.set_defaults(handler=handle_admin_rebuild_site)

    regen_parser = admin_subparsers.add_parser(
        "regenerate-translations",
        help="Regenerate translations for existing articles",
    )
    regen_parser.add_argument("--limit", type=positive_int, default=0)
    regen_parser.add_argument("--dry-run", action="store_true")
    regen_parser.set_defaults(handler=handle_admin_regenerate_translations)

    return parser


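# Illustrative programmatic use of the parser; handlers are attached with
# set_defaults, so dispatch in main() is uniform:
#
#     args = build_parser().parse_args(["admin", "fetch", "--count", "3"])
#     exit_code = args.handler(args)

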
def validate_runtime() -> None:
    if not config.PERPLEXITY_API_KEY and not config.OPENROUTER_API_KEY:
        raise RuntimeError(
            "No provider API key configured. Set PERPLEXITY_API_KEY or OPENROUTER_API_KEY in the environment."
        )


def handle_force_fetch(_: argparse.Namespace) -> int:
    start = time.monotonic()

    try:
        validate_runtime()
        os.makedirs("data", exist_ok=True)
        init_db()

        stored_count = asyncio.run(process_and_store_news())
        elapsed = time.monotonic() - start

        print(f"force-fetch succeeded: stored={stored_count} elapsed={elapsed:.1f}s")
        return 0
    except Exception as exc:
        logger.exception("force-fetch failed")
        print(f"force-fetch failed: {exc}", file=sys.stderr)
        print(
            "Check API keys, network connectivity, and provider status, then retry the command.",
            file=sys.stderr,
        )
        return 1


def handle_admin_fetch(args: argparse.Namespace) -> int:
    start = time.monotonic()
    try:
        validate_runtime()
        init_db()
        stored = asyncio.run(process_and_store_news(article_count=args.count))
        elapsed = time.monotonic() - start
        print_result("fetch", "ok", requested=args.count, stored=stored, elapsed=f"{elapsed:.1f}s")
        return 0
    except Exception:
        logger.exception("admin fetch failed")
        print_result("fetch", "error")
        return 1


def handle_admin_refetch_images(args: argparse.Namespace) -> int:
    start = time.monotonic()
    try:
        init_db()
        target_article_id = resolve_article_id_from_permalink(args.permalink)
        if args.permalink and target_article_id is None:
            print_result(
                "refetch-images",
                "blocked",
                reason="invalid-permalink",
                hint="use '/?article=<id>' or raw numeric id",
            )
            return 2

        processed, refreshed = asyncio.run(
            refetch_images_for_latest(
                limit=min(args.limit, 30),
                max_attempts=args.max_attempts,
                dry_run=args.dry_run,
                target_article_id=target_article_id,
            )
        )
        elapsed = time.monotonic() - start
        print_result(
            "refetch-images",
            "ok",
            processed=processed,
            refreshed=refreshed,
            target_article_id=target_article_id,
            dry_run=args.dry_run,
            elapsed=f"{elapsed:.1f}s",
        )
        return 0
    except Exception:
        logger.exception("admin refetch-images failed")
        print_result("refetch-images", "error")
        return 1


def handle_admin_clean_archive(args: argparse.Namespace) -> int:
    if not require_confirm(args, "clean-archive"):
        return 2

    db = SessionLocal()
    try:
        cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=args.days)
        query = db.query(NewsItem).filter(
            and_(NewsItem.archived.is_(True), NewsItem.created_at < cutoff)
        )
        count = query.count()
        if args.dry_run:
            print_result("clean-archive", "ok", dry_run=True, would_delete=count)
            return 0
        deleted = delete_archived_news(db, days_after_archive=args.days)
        print_result("clean-archive", "ok", deleted=deleted)
        return 0
    except Exception:
        logger.exception("admin clean-archive failed")
        print_result("clean-archive", "error")
        return 1
    finally:
        db.close()


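# Typical workflow for the destructive admin commands: preview, then delete.
# Note that the confirm gate runs before the dry-run check, so --dry-run
# still requires --confirm here (command prefix illustrative):
#
#     clawfort admin clean-archive --days 60 --confirm --dry-run
#     clawfort admin clean-archive --days 60 --confirm

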
def handle_admin_clear_cache(args: argparse.Namespace) -> int:
    if not require_confirm(args, "clear-cache"):
        return 2

    try:
        os.makedirs(config.STATIC_IMAGES_DIR, exist_ok=True)
        files = [
            os.path.join(config.STATIC_IMAGES_DIR, name)
            for name in os.listdir(config.STATIC_IMAGES_DIR)
            if name.lower().endswith((".jpg", ".jpeg", ".png", ".webp"))
        ]
        if args.dry_run:
            print_result("clear-cache", "ok", dry_run=True, would_delete=len(files))
            return 0
        deleted = 0
        for file_path in files:
            try:
                os.remove(file_path)
                deleted += 1
            except OSError:
                logger.warning("Failed to remove cache file: %s", file_path)
        print_result("clear-cache", "ok", deleted=deleted)
        return 0
    except Exception:
        logger.exception("admin clear-cache failed")
        print_result("clear-cache", "error")
        return 1


def handle_admin_clear_news(args: argparse.Namespace) -> int:
    if not require_confirm(args, "clear-news"):
        return 2

    db = SessionLocal()
    try:
        query = db.query(NewsItem)
        if not args.include_archived:
            query = query.filter(NewsItem.archived.is_(False))
        items = query.all()
        if args.dry_run:
            print_result("clear-news", "ok", dry_run=True, would_delete=len(items))
            return 0
        deleted = 0
        for item in items:
            db.delete(item)
            deleted += 1
        db.commit()
        print_result("clear-news", "ok", deleted=deleted)
        return 0
    except Exception:
        db.rollback()
        logger.exception("admin clear-news failed")
        print_result("clear-news", "error")
        return 1
    finally:
        db.close()


def handle_admin_rebuild_site(args: argparse.Namespace) -> int:
    if not require_confirm(args, "rebuild-site"):
        return 2
    if args.dry_run:
        print_result("rebuild-site", "ok", dry_run=True, steps="clear-news,fetch")
        return 0

    clear_result = handle_admin_clear_news(
        argparse.Namespace(include_archived=True, confirm=True, dry_run=False)
    )
    if clear_result != 0:
        print_result("rebuild-site", "error", step="clear-news")
        return clear_result

    fetch_result = handle_admin_fetch(argparse.Namespace(count=args.count))
    if fetch_result != 0:
        print_result("rebuild-site", "error", step="fetch")
        return fetch_result

    print_result("rebuild-site", "ok", count=args.count)
    return 0


def handle_admin_regenerate_translations(args: argparse.Namespace) -> int:
    db = SessionLocal()
    try:
        query = db.query(NewsItem).filter(NewsItem.archived.is_(False)).order_by(desc(NewsItem.id))
        if args.limit and args.limit > 0:
            query = query.limit(args.limit)
        items = query.all()

        regenerated = 0
        for item in items:
            tldr_points = resolve_tldr_points(item, None)
            translations = asyncio.run(
                generate_translations(
                    headline=item.headline,
                    summary=item.summary,
                    tldr_points=tldr_points,
                    summary_body=item.summary_body,
                    source_citation=item.source_citation,
                )
            )
            for language_code, payload in translations.items():
                if args.dry_run:
                    regenerated += 1
                    continue
                existing = get_translation(db, item.id, language_code)
                if existing is None:
                    create_translation(
                        db=db,
                        news_item_id=item.id,
                        language=language_code,
                        headline=payload["headline"],
                        summary=payload["summary"],
                        tldr_points=payload.get("tldr_points"),
                        summary_body=payload.get("summary_body"),
                        source_citation=payload.get("source_citation"),
                    )
                else:
                    existing.headline = payload["headline"]
                    existing.summary = payload["summary"]
                    existing.tldr_points = (
                        json.dumps(payload.get("tldr_points"))
                        if payload.get("tldr_points")
                        else None
                    )
                    existing.summary_body = payload.get("summary_body")
                    existing.source_citation = payload.get("source_citation")
                regenerated += 1
            if not args.dry_run:
                db.commit()

        print_result(
            "regenerate-translations",
            "ok",
            articles=len(items),
            regenerated=regenerated,
            dry_run=args.dry_run,
        )
        return 0
    except Exception:
        db.rollback()
        logger.exception("admin regenerate-translations failed")
        print_result("regenerate-translations", "error")
        return 1
    finally:
        db.close()


def main(argv: list[str] | None = None) -> int:
    parser = build_parser()
    args = parser.parse_args(argv)
    handler = args.handler
    return handler(args)


if __name__ == "__main__":
    raise SystemExit(main())