Files
obsidian-rag/python/obsidian_rag/audit_logger.py

96 lines
3.0 KiB
Python

"""Audit logging for sensitive data access and system events."""
from __future__ import annotations
import getpass
import json
import socket
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
class AuditLogger:
    """Record sensitive-content access and security events in a JSON log.

    The log file holds a single JSON array; every call appends one entry by
    rewriting the file through a temporary sibling and replacing the
    original in one step.

    NOTE: each write is a read-modify-write cycle with no locking, so
    concurrent writers (threads or processes) can lose entries.
    """

    def __init__(self, log_path: Path):
        """Remember *log_path* and make sure its parent directory exists."""
        self.log_path = log_path
        self.log_path.parent.mkdir(parents=True, exist_ok=True)

    def log_sensitive_access(
        self,
        file_path: str,
        content_type: str,
        action: str,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Append an entry describing an access to sensitive content.

        Args:
            file_path: Path of the accessed file; stored after redaction
                by ``_redact_path``.
            content_type: Caller-supplied label for the kind of content.
            action: Caller-supplied label for what was done.
            metadata: Optional extra JSON-serializable data; stored as an
                empty dict when omitted or empty.
        """
        entry = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'file_path': self._redact_path(file_path),
            'content_type': content_type,
            'action': action,
            'user': self._current_user(),
            'ip_address': self._get_local_ip(),
            'metadata': metadata or {},
        }
        self._write_entry(entry)

    def log_security_event(
        self,
        event_type: str,
        severity: str,
        description: str,
        details: dict[str, Any] | None = None,
    ) -> None:
        """Append an entry describing a security-related event.

        Args:
            event_type: Caller-supplied event category.
            severity: Caller-supplied severity label.
            description: Human-readable description of the event.
            details: Optional extra JSON-serializable data; stored as an
                empty dict when omitted or empty.
        """
        entry = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'event_type': event_type,
            'severity': severity,
            'description': description,
            'user': self._current_user(),
            'ip_address': self._get_local_ip(),
            'details': details or {},
        }
        self._write_entry(entry)

    def _current_user(self) -> str:
        """Return the login name, or ``"unknown"`` when it cannot be found."""
        try:
            return getpass.getuser()
        except (KeyError, OSError, ImportError):
            # getuser() fails when no environment variable names the user
            # and the password database has no entry (e.g. arbitrary-UID
            # containers). An audit entry without a user is better than
            # no audit entry at all.
            return "unknown"

    def _redact_path(self, path: str) -> str:
        """Return *path* with directories hidden if any component is dotted.

        When any path component starts with ``.`` (hidden directories or
        files, ``..``), only the file name is kept, prefixed with ``.../``.
        Other paths are returned unchanged (normalized through ``Path``).
        Input that cannot be turned into a path yields ``"<redacted>"``.
        """
        try:
            p = Path(path)
        except (TypeError, ValueError):
            return "<redacted>"
        if any(part.startswith('.') for part in p.parts):
            return f".../{p.name}"
        return str(p)

    def _get_local_ip(self) -> str:
        """Return the address the local hostname resolves to.

        Falls back to ``"127.0.0.1"`` when the hostname cannot be resolved.
        """
        try:
            return socket.gethostbyname(socket.gethostname())
        except (OSError, UnicodeError):
            # socket.gaierror/herror are OSError subclasses; UnicodeError
            # covers hostnames that cannot be IDNA-encoded.
            return "127.0.0.1"

    def _preserve_unreadable_log(self) -> None:
        """Move an unreadable log aside so its contents are not destroyed."""
        stamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%S%fZ')
        backup = self.log_path.with_name(
            f"{self.log_path.name}.corrupt-{stamp}"
        )
        try:
            self.log_path.replace(backup)
        except OSError:
            # Best effort: if the file cannot be moved, logging continues
            # and the new entry is still recorded.
            pass

    def _load_entries(self) -> list[Any]:
        """Return the entries currently stored in the log file.

        A missing file yields an empty list. A file that cannot be read,
        is not valid UTF-8 JSON, or does not contain a JSON array is moved
        aside via ``_preserve_unreadable_log`` and an empty list is
        returned.
        """
        try:
            loaded = json.loads(self.log_path.read_text(encoding='utf-8'))
        except FileNotFoundError:
            return []
        except (OSError, ValueError):
            # ValueError covers both json.JSONDecodeError and
            # UnicodeDecodeError.
            self._preserve_unreadable_log()
            return []
        if not isinstance(loaded, list):
            self._preserve_unreadable_log()
            return []
        return loaded

    def _write_entry(self, entry: dict[str, Any]) -> None:
        """Append *entry* to the log, replacing the file in one step.

        The whole log is re-serialized into a temporary sibling file that
        is restricted to mode 0o600 before any content is written, then
        moved over the real log.

        Raises:
            TypeError: If *entry* contains values JSON cannot serialize.
            OSError: If the temporary file cannot be written or moved.
        """
        entries = self._load_entries()
        entries.append(entry)
        # Serialize first so a non-serializable entry fails before any
        # file is touched.
        payload = json.dumps(entries, indent=2)

        # Appending to the name (rather than swapping the suffix) keeps the
        # temporary path distinct from the log path whatever its extension.
        tmp_path = self.log_path.with_name(f"{self.log_path.name}.tmp")
        # Create and restrict the file *before* writing, so the audit data
        # is never on disk with wider permissions. The explicit chmod also
        # tightens a stale temp file left behind by an interrupted run.
        tmp_path.touch(mode=0o600, exist_ok=True)
        tmp_path.chmod(0o600)
        tmp_path.write_text(payload, encoding='utf-8')
        # replace() overwrites an existing target on every platform;
        # rename() raises on Windows when the log already exists.
        tmp_path.replace(self.log_path)