"""Audit logging for sensitive data access and system events."""

from __future__ import annotations

import contextlib
import getpass
import json
import os
import socket
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any


class AuditLogger:
|
|
"""Secure audit logger for sensitive content access."""
|
|
|
|
def __init__(self, log_path: Path):
|
|
self.log_path = log_path
|
|
self.log_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
def log_sensitive_access(
|
|
self,
|
|
file_path: str,
|
|
content_type: str,
|
|
action: str,
|
|
metadata: dict[str, Any] | None = None,
|
|
) -> None:
|
|
"""Log access to sensitive content with redaction."""
|
|
entry = {
|
|
'timestamp': datetime.now(timezone.utc).isoformat(),
|
|
'file_path': self._redact_path(file_path),
|
|
'content_type': content_type,
|
|
'action': action,
|
|
'user': getpass.getuser(),
|
|
'ip_address': self._get_local_ip(),
|
|
'metadata': metadata or {},
|
|
}
|
|
self._write_entry(entry)
|
|
|
|
def log_security_event(
|
|
self,
|
|
event_type: str,
|
|
severity: str,
|
|
description: str,
|
|
details: dict[str, Any] | None = None,
|
|
) -> None:
|
|
"""Log security-related events."""
|
|
entry = {
|
|
'timestamp': datetime.now(timezone.utc).isoformat(),
|
|
'event_type': event_type,
|
|
'severity': severity,
|
|
'description': description,
|
|
'user': getpass.getuser(),
|
|
'ip_address': self._get_local_ip(),
|
|
'details': details or {},
|
|
}
|
|
self._write_entry(entry)
|
|
|
|
def _redact_path(self, path: str) -> str:
|
|
"""Redact sensitive information from file paths."""
|
|
# Basic redaction - keep filename but remove sensitive path components
|
|
try:
|
|
p = Path(path)
|
|
if any(part.startswith('.') for part in p.parts):
|
|
return f".../{p.name}"
|
|
return str(p)
|
|
except Exception:
|
|
return "<redacted>"
|
|
|
|
def _get_local_ip(self) -> str:
|
|
"""Get local IP address for audit logging."""
|
|
try:
|
|
return socket.gethostbyname(socket.gethostname())
|
|
except Exception:
|
|
return "127.0.0.1"
|
|
|
|
def _write_entry(self, entry: dict[str, Any]) -> None:
|
|
"""Atomically append to audit log with secure permissions."""
|
|
# Write to temporary file first
|
|
tmp_path = self.log_path.with_suffix('.tmp')
|
|
|
|
# Read existing entries
|
|
entries = []
|
|
if self.log_path.exists():
|
|
try:
|
|
entries = json.loads(self.log_path.read_text())
|
|
except (json.JSONDecodeError, OSError):
|
|
entries = []
|
|
|
|
# Append new entry
|
|
entries.append(entry)
|
|
|
|
# Write atomically
|
|
tmp_path.write_text(json.dumps(entries, indent=2), encoding='utf-8')
|
|
tmp_path.chmod(0o600) # Restrictive permissions
|
|
tmp_path.rename(self.log_path)
|