import datetime from sqlalchemy import and_, desc from sqlalchemy.orm import Session from backend.models import NewsItem, NewsTranslation SUPPORTED_LANGUAGES = {"en", "ta", "ml"} def create_news( db: Session, headline: str, summary: str, source_url: str | None = None, image_url: str | None = None, image_credit: str | None = None, published_at: datetime.datetime | None = None, ) -> NewsItem: item = NewsItem( headline=headline, summary=summary, source_url=source_url, image_url=image_url, image_credit=image_credit, published_at=published_at or datetime.datetime.utcnow(), ) db.add(item) db.commit() db.refresh(item) return item def get_recent_news(db: Session, limit: int = 10) -> list[NewsItem]: return ( db.query(NewsItem) .filter(NewsItem.archived.is_(False)) .order_by(desc(NewsItem.published_at)) .limit(limit) .all() ) def get_latest_news(db: Session) -> NewsItem | None: return ( db.query(NewsItem) .filter(NewsItem.archived.is_(False)) .order_by(desc(NewsItem.published_at)) .first() ) def create_translation( db: Session, news_item_id: int, language: str, headline: str, summary: str, ) -> NewsTranslation: translation = NewsTranslation( news_item_id=news_item_id, language=language, headline=headline, summary=summary, ) db.add(translation) db.commit() db.refresh(translation) return translation def get_translation(db: Session, news_item_id: int, language: str) -> NewsTranslation | None: return ( db.query(NewsTranslation) .filter( and_( NewsTranslation.news_item_id == news_item_id, NewsTranslation.language == language, ) ) .first() ) def translation_exists(db: Session, news_item_id: int, language: str) -> bool: return get_translation(db, news_item_id, language) is not None def get_translations_by_article(db: Session, news_item_id: int) -> list[NewsTranslation]: return ( db.query(NewsTranslation) .filter(NewsTranslation.news_item_id == news_item_id) .order_by(NewsTranslation.language.asc()) .all() ) def resolve_news_content(item: NewsItem, translation: NewsTranslation | None) -> tuple[str, str]: if translation is None: return item.headline, item.summary return translation.headline, translation.summary def normalize_language(language: str | None) -> str: if not language: return "en" lower = language.lower() if lower not in SUPPORTED_LANGUAGES: return "en" return lower def get_news_paginated( db: Session, cursor: int | None = None, limit: int = 10, exclude_id: int | None = None ) -> list[NewsItem]: query = db.query(NewsItem).filter(NewsItem.archived.is_(False)) if exclude_id is not None: query = query.filter(NewsItem.id != exclude_id) if cursor is not None: query = query.filter(NewsItem.id < cursor) return query.order_by(desc(NewsItem.id)).limit(limit).all() def headline_exists_within_24h(db: Session, headline: str) -> bool: cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours=24) return ( db.query(NewsItem) .filter( and_( NewsItem.headline == headline, NewsItem.created_at >= cutoff, ) ) .first() is not None ) def archive_old_news(db: Session, retention_days: int = 30) -> int: cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=retention_days) count = ( db.query(NewsItem) .filter( and_( NewsItem.created_at < cutoff, NewsItem.archived.is_(False), ) ) .update({"archived": True}) ) db.commit() return count def delete_archived_news(db: Session, days_after_archive: int = 60) -> int: cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=days_after_archive) count = ( db.query(NewsItem) .filter( and_( NewsItem.archived.is_(True), NewsItem.created_at < cutoff, ) ) .delete() ) db.commit() return count