import logging
import os
import random
from apscheduler.schedulers.background import BackgroundScheduler
from fastapi import Depends, FastAPI, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from sqlalchemy.orm import Session
from starlette.exceptions import HTTPException as StarletteHTTPException
from backend import config
from backend.database import get_db, init_db
from backend.models import NewsItem
from backend.news_service import scheduled_news_fetch
from backend.repository import (
archive_old_news,
delete_archived_news,
get_latest_news,
get_news_paginated,
get_translation,
normalize_language,
resolve_news_content,
resolve_summary_fields,
)
from backend.schemas import HealthResponse, NewsItemResponse, PaginatedNewsResponse
# Root logging configuration: timestamped, level-tagged records for all modules.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)
# ASGI application object; routes, middleware, and handlers are attached below.
app = FastAPI(title="ClawFort News API", version="0.1.0")
_ERROR_MESSAGES = {
404: [
"This page wandered off to train a tiny model.",
"We looked everywhere, even in the latent space.",
"The link took a creative detour.",
"This route is currently off doing research.",
"The page you asked for is not in this timeline.",
],
500: [
"The server hit a logic knot and needs a quick reset.",
"Our robots dropped a semicolon somewhere important.",
"A background process got stage fright.",
"The AI took an unexpected coffee break.",
"Something internal blinked at the wrong moment.",
],
}
def _render_error_page(status_code: int) -> str:
message = random.choice(_ERROR_MESSAGES.get(status_code, _ERROR_MESSAGES[500]))
return f"""
{status_code} - ClawFort
"""
@app.exception_handler(StarletteHTTPException)
async def http_exception_handler(request: Request, exc: StarletteHTTPException):
    """Answer API routes with JSON; give browsers a friendly HTML page for 404s."""
    is_api_call = request.url.path.startswith("/api/")
    if not is_api_call and exc.status_code == 404:
        return HTMLResponse(_render_error_page(404), status_code=404)
    # Everything else — API paths and non-404 browser errors — gets JSON.
    return JSONResponse(status_code=exc.status_code, content={"detail": exc.detail})
@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception):
    """Log any uncaught error, then reply with JSON (API) or an HTML page (browser)."""
    logger.exception("Unhandled server error: %s", exc)
    if not request.url.path.startswith("/api/"):
        return HTMLResponse(_render_error_page(500), status_code=500)
    return JSONResponse(status_code=500, content={"detail": "Internal Server Error"})
# NOTE(review): allow_origins=["*"] together with allow_credentials=True is very
# permissive; browsers refuse credentialed requests against a literal "*" origin —
# confirm whether credentials are actually needed for this public API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Compress responses larger than 500 bytes.
app.add_middleware(GZipMiddleware, minimum_size=500)
@app.middleware("http")
async def add_cache_headers(request: Request, call_next):
    """Attach path-dependent Cache-Control headers plus nosniff on every response.

    Static assets cache for a week, API responses for a minute (with
    stale-while-revalidate), and the known HTML pages for five minutes.
    Existing headers are never overwritten (setdefault).
    """
    response = await call_next(request)
    headers = response.headers
    path = request.url.path
    if path.startswith("/static/"):
        headers.setdefault("Cache-Control", "public, max-age=604800, immutable")
    elif path.startswith("/api/"):
        headers.setdefault("Cache-Control", "public, max-age=60, stale-while-revalidate=120")
        # API bodies are gzip-negotiated, so caches must key on encoding.
        headers.setdefault("Vary", "Accept-Encoding")
    elif path in ("/", "/terms", "/attribution"):
        headers.setdefault("Cache-Control", "public, max-age=300, stale-while-revalidate=600")
    headers.setdefault("X-Content-Type-Options", "nosniff")
    return response
# Serve bundled static assets from backend/static (cached aggressively by the
# add_cache_headers middleware above).
static_dir = os.path.join(os.path.dirname(__file__), "static")
app.mount("/static", StaticFiles(directory=static_dir), name="static")
# Shared background job scheduler; jobs are registered in the startup handler.
scheduler = BackgroundScheduler()
def nightly_cleanup(purge_after_days: int = 60) -> None:
    """Archive stale news items and permanently delete long-archived ones.

    Runs as a nightly scheduler job, so it opens and closes its own database
    session rather than using the request-scoped dependency.

    Args:
        purge_after_days: How long an item remains archived before it is
            permanently deleted. Defaults to 60 to preserve the previous
            hard-coded behavior.
    """
    # Function-scope import — presumably avoids a circular import at module
    # load time; confirm before hoisting to the top of the file.
    from backend.database import SessionLocal

    db = SessionLocal()
    try:
        archived = archive_old_news(db, config.RETENTION_DAYS)
        deleted = delete_archived_news(db, days_after_archive=purge_after_days)
        logger.info("Nightly cleanup: archived=%d, deleted=%d", archived, deleted)
    finally:
        db.close()
@app.on_event("startup")
async def startup_event() -> None:
    """Initialize storage and start background jobs when the app boots.

    NOTE(review): ``@app.on_event`` is deprecated in recent FastAPI releases
    in favor of lifespan handlers; it still works, but worth migrating.
    """
    if not config.PERPLEXITY_API_KEY:
        # Warn loudly but keep serving: read endpoints work without fetching.
        logger.error("PERPLEXITY_API_KEY is not set — news fetching will fail")
    os.makedirs("data", exist_ok=True)  # relative to CWD — presumably the DB data dir; confirm deploy working directory
    init_db()
    logger.info("Database initialized")
    # Hourly news fetch plus a 03:00 cleanup job (see nightly_cleanup above).
    scheduler.add_job(scheduled_news_fetch, "interval", hours=1, id="news_fetch")
    scheduler.add_job(nightly_cleanup, "cron", hour=3, minute=0, id="nightly_cleanup")
    scheduler.start()
    logger.info("Scheduler started: hourly news fetch + nightly cleanup")
@app.on_event("shutdown")
async def shutdown_event() -> None:
    """Stop the background scheduler without waiting for in-flight jobs."""
    scheduler.shutdown(wait=False)
    logger.info("Scheduler shut down")
@app.get("/api/news", response_model=PaginatedNewsResponse)
def api_get_news(
    cursor: int | None = Query(None, description="Cursor for pagination (last item ID)"),
    limit: int = Query(10, ge=1, le=50),
    exclude_hero: int | None = Query(None, description="Hero item ID to exclude from feed"),
    language: str = Query("en", description="Language code: en, ta, ml"),
    db: Session = Depends(get_db),
) -> PaginatedNewsResponse:
    """Cursor-paginated news feed, optionally translated and excluding a hero item."""
    lang = normalize_language(language)
    # Fetch one row beyond the page size to detect whether another page exists.
    fetched = get_news_paginated(db, cursor=cursor, limit=limit + 1, exclude_id=exclude_hero)
    has_more = len(fetched) > limit
    page = fetched[:limit] if has_more else fetched
    next_cursor = page[-1].id if page and has_more else None

    def build(item: NewsItem) -> NewsItemResponse:
        # Only non-English requests need a translation lookup.
        translation = get_translation(db, item.id, lang) if lang != "en" else None
        headline, summary = resolve_news_content(item, translation)
        tldr_points, summary_body, source_citation = resolve_summary_fields(item, translation)
        return NewsItemResponse(
            id=item.id,
            headline=headline,
            summary=summary,
            source_url=item.source_url,
            image_url=item.image_url,
            image_credit=item.image_credit,
            tldr_points=tldr_points,
            summary_body=summary_body,
            source_citation=source_citation,
            summary_image_url=item.summary_image_url,
            summary_image_credit=item.summary_image_credit,
            published_at=item.published_at,
            created_at=item.created_at,
            language=lang if translation is not None else "en",
        )

    return PaginatedNewsResponse(
        items=[build(item) for item in page],
        next_cursor=next_cursor,
        has_more=has_more,
    )
@app.get("/api/news/latest", response_model=NewsItemResponse | None)
def api_get_latest_news(
    language: str = Query("en", description="Language code: en, ta, ml"),
    db: Session = Depends(get_db),
) -> NewsItemResponse | None:
    """Return the newest news item (translated when available), or None if empty."""
    lang = normalize_language(language)
    item = get_latest_news(db)
    if not item:
        return None
    # Only non-English requests need a translation lookup.
    translation = get_translation(db, item.id, lang) if lang != "en" else None
    headline, summary = resolve_news_content(item, translation)
    tldr_points, summary_body, source_citation = resolve_summary_fields(item, translation)
    payload = dict(
        id=item.id,
        headline=headline,
        summary=summary,
        source_url=item.source_url,
        image_url=item.image_url,
        image_credit=item.image_credit,
        tldr_points=tldr_points,
        summary_body=summary_body,
        source_citation=source_citation,
        summary_image_url=item.summary_image_url,
        summary_image_credit=item.summary_image_credit,
        published_at=item.published_at,
        created_at=item.created_at,
        language=lang if translation is not None else "en",
    )
    return NewsItemResponse(**payload)
@app.get("/api/health", response_model=HealthResponse)
def api_health(db: Session = Depends(get_db)) -> HealthResponse:
    """Liveness check reporting the app version and the count of active news items."""
    count = db.query(NewsItem).filter(NewsItem.archived.is_(False)).count()
    # Use the single source of truth for the version (FastAPI app metadata)
    # instead of duplicating the "0.1.0" literal here.
    return HealthResponse(status="ok", version=app.version, news_count=count)
# Directory holding the prebuilt frontend, one level above this package.
frontend_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "frontend")
@app.get("/")
async def serve_frontend() -> FileResponse:
    """Serve the frontend entry page."""
    return FileResponse(os.path.join(frontend_dir, "index.html"))
@app.get("/terms")
async def serve_terms() -> FileResponse:
    """Serve the static terms-of-service page."""
    return FileResponse(os.path.join(frontend_dir, "terms.html"))
@app.get("/attribution")
async def serve_attribution() -> FileResponse:
    """Serve the static attribution/credits page."""
    return FileResponse(os.path.join(frontend_dir, "attribution.html"))
@app.get("/config")
async def serve_config() -> dict:
    """Expose non-secret frontend configuration (analytics, links, languages)."""
    return dict(
        umami_script_url=config.UMAMI_SCRIPT_URL,
        umami_website_id=config.UMAMI_WEBSITE_ID,
        github_repo_url=config.GITHUB_REPO_URL,
        contact_email=config.CONTACT_EMAIL,
        supported_languages=config.SUPPORTED_LANGUAGES,
        default_language="en",
    )