Files
clawfort/backend/repository.py
2026-02-13 00:49:22 -05:00

211 lines
5.8 KiB
Python

import datetime
import json
from sqlalchemy import and_, desc
from sqlalchemy.orm import Session
from backend.models import NewsItem, NewsTranslation
# Language codes that normalize_language() may return; unsupported input
# falls back to "en".
# NOTE(review): presumably ISO 639-1 codes (English, Tamil, Malayalam) — confirm.
SUPPORTED_LANGUAGES = {"en", "ta", "ml"}
def create_news(
    db: Session,
    headline: str,
    summary: str,
    source_url: str | None = None,
    image_url: str | None = None,
    image_credit: str | None = None,
    tldr_points: list[str] | None = None,
    summary_body: str | None = None,
    source_citation: str | None = None,
    summary_image_url: str | None = None,
    summary_image_credit: str | None = None,
    published_at: datetime.datetime | None = None,
) -> NewsItem:
    """Insert a new ``NewsItem`` row, commit it, and return the refreshed row.

    Args:
        db: Session used to add, commit and refresh the new row.
        headline: Value stored in the item's ``headline`` field.
        summary: Value stored in the item's ``summary`` field.
        source_url: Optional value for ``source_url``.
        image_url: Optional value for ``image_url``.
        image_credit: Optional value for ``image_credit``.
        tldr_points: Optional list of points. It is stored JSON-encoded;
            ``None`` or an empty list is stored as ``None``.
        summary_body: Optional value for ``summary_body``.
        source_citation: Optional value for ``source_citation``.
        summary_image_url: Optional value for ``summary_image_url``.
        summary_image_credit: Optional value for ``summary_image_credit``.
        published_at: Publication timestamp. When omitted, the current UTC
            time is used as a naive ``datetime``.

    Returns:
        The persisted ``NewsItem`` after ``db.refresh``.
    """
    # ``datetime.utcnow()`` is deprecated since Python 3.12. Taking an aware
    # UTC "now" and dropping tzinfo yields the same naive UTC value.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    # The list is serialized to a JSON string for storage.
    encoded_points = json.dumps(tldr_points) if tldr_points else None

    item = NewsItem(
        headline=headline,
        summary=summary,
        source_url=source_url,
        image_url=image_url,
        image_credit=image_credit,
        tldr_points=encoded_points,
        summary_body=summary_body,
        source_citation=source_citation,
        summary_image_url=summary_image_url,
        summary_image_credit=summary_image_credit,
        published_at=published_at or now_utc,
    )
    db.add(item)
    db.commit()
    # Refresh so database-populated attributes are loaded on the instance.
    db.refresh(item)
    return item
def get_recent_news(db: Session, limit: int = 10) -> list[NewsItem]:
    """Return up to ``limit`` non-archived news items, newest first.

    Args:
        db: Session used to run the query.
        limit: Maximum number of rows to return.

    Returns:
        Non-archived ``NewsItem`` rows ordered by ``published_at`` descending.
    """
    active_items = db.query(NewsItem).filter(NewsItem.archived.is_(False))
    newest_first = active_items.order_by(NewsItem.published_at.desc())
    return newest_first.limit(limit).all()
def get_latest_news(db: Session) -> NewsItem | None:
    """Return the non-archived news item with the latest ``published_at``.

    Args:
        db: Session used to run the query.

    Returns:
        The most recently published non-archived ``NewsItem``, or ``None``
        when no such row exists.
    """
    active_items = db.query(NewsItem).filter(NewsItem.archived.is_(False))
    newest_first = active_items.order_by(NewsItem.published_at.desc())
    return newest_first.first()
def create_translation(
    db: Session,
    news_item_id: int,
    language: str,
    headline: str,
    summary: str,
    tldr_points: list[str] | None = None,
    summary_body: str | None = None,
    source_citation: str | None = None,
) -> NewsTranslation:
    """Insert a ``NewsTranslation`` row, commit it, and return the refreshed row.

    Args:
        db: Session used to add, commit and refresh the new row.
        news_item_id: Id of the news item this translation belongs to.
        language: Language code stored on the translation (not validated here).
        headline: Translated headline.
        summary: Translated summary.
        tldr_points: Optional list of points. It is stored JSON-encoded;
            ``None`` or an empty list is stored as ``None``.
        summary_body: Optional translated summary body.
        source_citation: Optional translated source citation.

    Returns:
        The persisted ``NewsTranslation`` after ``db.refresh``.
    """
    # The list is serialized to a JSON string for storage.
    if tldr_points:
        encoded_points = json.dumps(tldr_points)
    else:
        encoded_points = None

    row = NewsTranslation(
        news_item_id=news_item_id,
        language=language,
        headline=headline,
        summary=summary,
        tldr_points=encoded_points,
        summary_body=summary_body,
        source_citation=source_citation,
    )
    db.add(row)
    db.commit()
    db.refresh(row)
    return row
def get_translation(db: Session, news_item_id: int, language: str) -> NewsTranslation | None:
    """Look up the translation of one news item in one language.

    Args:
        db: Session used to run the query.
        news_item_id: Id of the news item.
        language: Language code to match exactly.

    Returns:
        The first matching ``NewsTranslation``, or ``None`` if there is none.
    """
    # Multiple criteria passed to filter() are combined with AND.
    matches = db.query(NewsTranslation).filter(
        NewsTranslation.news_item_id == news_item_id,
        NewsTranslation.language == language,
    )
    return matches.first()
def translation_exists(db: Session, news_item_id: int, language: str) -> bool:
    """Report whether a translation exists for the news item in ``language``.

    Args:
        db: Session used to run the lookup.
        news_item_id: Id of the news item.
        language: Language code to match exactly.

    Returns:
        ``True`` when ``get_translation`` finds a row, ``False`` otherwise.
    """
    found = get_translation(db, news_item_id, language)
    return found is not None
def get_translations_by_article(db: Session, news_item_id: int) -> list[NewsTranslation]:
    """Return every translation of a news item, ordered by language code.

    Args:
        db: Session used to run the query.
        news_item_id: Id of the news item whose translations are wanted.

    Returns:
        All matching ``NewsTranslation`` rows sorted by ``language`` ascending.
    """
    for_article = db.query(NewsTranslation).filter(
        NewsTranslation.news_item_id == news_item_id
    )
    by_language = for_article.order_by(NewsTranslation.language.asc())
    return by_language.all()
def resolve_news_content(item: NewsItem, translation: NewsTranslation | None) -> tuple[str, str]:
    """Pick the headline and summary to use for a news item.

    Args:
        item: The original news item.
        translation: Translation to prefer, or ``None`` to use the original.

    Returns:
        ``(headline, summary)`` taken from ``translation`` when one is given,
        otherwise from ``item``.
    """
    source = item if translation is None else translation
    return source.headline, source.summary
def resolve_tldr_points(item: NewsItem, translation: NewsTranslation | None) -> list[str] | None:
    """Decode the stored TL;DR points for a news item or its translation.

    The raw value is read from ``translation.tldr_points`` when a translation
    is given, otherwise from ``item.tldr_points``. There is no fallback to the
    item when a translation is present but has no points.

    Args:
        item: The original news item.
        translation: Translation to read from, or ``None`` to read ``item``.

    Returns:
        The decoded points as strings, with JSON ``null`` entries and
        blank/whitespace-only entries removed. ``None`` when nothing is
        stored, the stored value cannot be decoded, or the decoded JSON is
        not a list.
    """
    raw = item.tldr_points if translation is None else translation.tldr_points
    if not raw:
        return None

    try:
        parsed = json.loads(raw)
    except (TypeError, ValueError):
        # ValueError covers json.JSONDecodeError. TypeError covers a stored
        # value that is not str/bytes. Both are treated as "no usable points".
        return None

    if not isinstance(parsed, list):
        return None

    points: list[str] = []
    for entry in parsed:
        # A JSON null would otherwise be rendered as the literal text "None".
        if entry is None:
            continue
        text = str(entry)
        if text.strip():
            points.append(text)
    return points
def resolve_summary_fields(
    item: NewsItem, translation: NewsTranslation | None
) -> tuple[list[str] | None, str | None, str | None]:
    """Collect the TL;DR points, summary body and source citation to use.

    Args:
        item: The original news item.
        translation: Translation to prefer, or ``None`` to use the original.

    Returns:
        ``(tldr_points, summary_body, source_citation)``. The points come from
        ``resolve_tldr_points``; the other two fields are read from
        ``translation`` when one is given, otherwise from ``item``.
    """
    points = resolve_tldr_points(item, translation)
    source = item if translation is None else translation
    return points, source.summary_body, source.source_citation
def normalize_language(language: str | None) -> str:
    """Map a requested language tag onto one of ``SUPPORTED_LANGUAGES``.

    Matching is case-insensitive and ignores surrounding whitespace. A
    region- or script-qualified tag such as ``"ta-IN"`` or ``"ml_IN"`` is
    matched on its primary subtag, so it resolves to ``"ta"`` / ``"ml"``
    rather than falling back.

    Args:
        language: Requested language tag, or ``None``.

    Returns:
        A member of ``SUPPORTED_LANGUAGES`` in lower case, or ``"en"`` when
        the input is missing, empty, or not supported.
    """
    if not language:
        return "en"

    tag = language.strip().lower()
    if tag in SUPPORTED_LANGUAGES:
        return tag

    # Accept both "-" and "_" as subtag separators and keep only the
    # primary language subtag.
    primary = tag.replace("_", "-").split("-", 1)[0]
    if primary in SUPPORTED_LANGUAGES:
        return primary

    return "en"
def get_news_paginated(
    db: Session, cursor: int | None = None, limit: int = 10, exclude_id: int | None = None
) -> list[NewsItem]:
    """Return one page of non-archived news items, highest id first.

    Args:
        db: Session used to run the query.
        cursor: When given, only items with ``id`` strictly below it are
            returned.
        limit: Maximum number of rows to return.
        exclude_id: When given, the item with this id is left out.

    Returns:
        Up to ``limit`` ``NewsItem`` rows ordered by ``id`` descending.
    """
    # Criteria passed together to filter() are combined with AND.
    criteria = [NewsItem.archived.is_(False)]
    if exclude_id is not None:
        criteria.append(NewsItem.id != exclude_id)
    if cursor is not None:
        criteria.append(NewsItem.id < cursor)

    page = db.query(NewsItem).filter(*criteria)
    return page.order_by(NewsItem.id.desc()).limit(limit).all()
def headline_exists_within_24h(db: Session, headline: str) -> bool:
    """Report whether an item with this exact headline was created recently.

    The check matches ``headline`` exactly and does not filter on
    ``archived``, so archived rows count as well.

    Args:
        db: Session used to run the query.
        headline: Headline to look for.

    Returns:
        ``True`` when a ``NewsItem`` with the same headline has
        ``created_at`` within the last 24 hours, ``False`` otherwise.
    """
    # ``datetime.utcnow()`` is deprecated since Python 3.12. Taking an aware
    # UTC "now" and dropping tzinfo yields the same naive UTC value.
    # NOTE(review): assumes created_at holds naive UTC timestamps — confirm.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    cutoff = now_utc - datetime.timedelta(hours=24)

    # Only existence matters, so select the id instead of the whole entity.
    match = (
        db.query(NewsItem.id)
        .filter(
            NewsItem.headline == headline,
            NewsItem.created_at >= cutoff,
        )
        .first()
    )
    return match is not None
def archive_old_news(db: Session, retention_days: int = 30) -> int:
    """Mark non-archived news items older than the retention window as archived.

    Args:
        db: Session used to run the bulk update and commit it.
        retention_days: Items whose ``created_at`` is more than this many
            days in the past are archived.

    Returns:
        The row count reported by the bulk update.
    """
    # ``datetime.utcnow()`` is deprecated since Python 3.12. Taking an aware
    # UTC "now" and dropping tzinfo yields the same naive UTC value.
    # NOTE(review): assumes created_at holds naive UTC timestamps — confirm.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    cutoff = now_utc - datetime.timedelta(days=retention_days)

    stale_items = db.query(NewsItem).filter(
        NewsItem.created_at < cutoff,
        NewsItem.archived.is_(False),
    )
    archived_count = stale_items.update({"archived": True})
    db.commit()
    return archived_count
def delete_archived_news(db: Session, days_after_archive: int = 60) -> int:
    """Permanently delete archived news items past the cutoff.

    Despite the parameter name, the cutoff is measured from ``created_at``;
    no archive timestamp is consulted here.

    Translations that reference the affected items are deleted first, in the
    same transaction. A bulk ``Query.delete()`` does not apply ORM
    relationship cascades, so without this step translation rows could be
    orphaned or could block the delete through a foreign-key constraint.

    Args:
        db: Session used to run the bulk deletes and commit them.
        days_after_archive: Archived items whose ``created_at`` is more than
            this many days in the past are deleted.

    Returns:
        The number of ``NewsItem`` rows reported deleted. Translation rows
        are not counted.
    """
    # ``datetime.utcnow()`` is deprecated since Python 3.12. Taking an aware
    # UTC "now" and dropping tzinfo yields the same naive UTC value.
    # NOTE(review): assumes created_at holds naive UTC timestamps — confirm.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    cutoff = now_utc - datetime.timedelta(days=days_after_archive)

    expired = (
        NewsItem.archived.is_(True),
        NewsItem.created_at < cutoff,
    )

    # Remove dependent translations first, selecting them via a subquery.
    # The IN-subquery criteria cannot be evaluated in Python, so session
    # synchronization is disabled; the commit below expires loaded state.
    expired_ids = db.query(NewsItem.id).filter(*expired)
    db.query(NewsTranslation).filter(
        NewsTranslation.news_item_id.in_(expired_ids)
    ).delete(synchronize_session=False)

    deleted_count = db.query(NewsItem).filter(*expired).delete()
    db.commit()
    return deleted_count