obsidian-rag/python/obsidian_rag/security.py
Santhosh Janardhanan 5c281165c7 Sprint 0-1: Python indexer, TS plugin scaffolding, and test suite
## What's new

**Python indexer (`python/obsidian_rag/`)** — full pipeline from scan to LanceDB:
- `config.py` — JSON config loader with cross-platform path resolution
- `security.py` — path traversal prevention, HTML stripping, sensitive content detection, dir allow/deny lists
- `chunker.py` — section-split for journal entries (date-named files), sliding-window for unstructured notes
- `embedder.py` — Ollama `/api/embeddings` client with batched requests and timeout/error handling
- `vector_store.py` — LanceDB schema, upsert (merge_insert), delete, search with filters, stats
- `indexer.py` — full/sync/reindex pipeline orchestrator with progress yields
- `cli.py` — `index | sync | reindex | status` CLI commands
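
As a rough illustration of the sliding-window strategy listed for `chunker.py`, here is a minimal sketch. The function name `sliding_window_chunks` and the `window`/`overlap` word counts are assumptions made for this example; the commit message does not state the actual chunk sizes or API.

```python
def sliding_window_chunks(text: str, window: int = 200, overlap: int = 50) -> list[str]:
    """Split text into overlapping word windows (hypothetical helper)."""
    if window <= 0 or not 0 <= overlap < window:
        raise ValueError("need window > 0 and 0 <= overlap < window")
    words = text.split()
    step = window - overlap  # how far each window advances
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + window]))
        if start + window >= len(words):
            break  # this window already reaches the end of the text
    return chunks


# Tiny window sizes so the overlap is easy to see.
chunks = sliding_window_chunks("a b c d e f g h i j", window=4, overlap=2)
```

With these toy parameters each chunk shares its last two words with the start of the next one, which is the property that keeps context from being cut at chunk boundaries.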

**TypeScript plugin (`src/`)** — OpenClaw plugin scaffold:
- `utils/` — config loader, TypeScript types, response envelope factory, LanceDB client
- `services/` — health state machine (HEALTHY/DEGRADED/UNAVAILABLE), vault watcher with debounce/batching, indexer bridge (subprocess spawner)
- `tools/` — 4 tool stubs: search, index, status, memory_store (OpenClaw wiring pending)
- `index.ts` — plugin entry point with health probe + vault watcher startup

**Config** (`obsidian-rag/config.json`, `openclaw.plugin.json`):
- 627 files / 3764 chunks indexed in dev vault

**Tests: 76 passing**
- Python: 64 pytest tests (chunker, security, vector_store, config)
- TypeScript: 12 vitest tests (lancedb client, response envelope)

## Bugs fixed

- LanceDB `tags` column filter: `LIKE '%tag%'` → `list_contains(tags, 'tag')` (List<String> column)
- LanceDB JS `db.list_tables()` returns `ListTablesResponse` object, not plain array
- LanceDB JS result score field: `_score` → `_distance`
- TypeScript regex literal with unescaped `/` in path-resolve regex
- Python: `create_table_if_not_exists` identity check → name comparison

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-10 22:56:50 -04:00


"""Path traversal prevention, input sanitization, sensitive content detection, directory access control."""

from __future__ import annotations

import re
import unicodedata
from pathlib import Path
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from obsidian_rag.config import ObsidianRagConfig


# ----------------------------------------------------------------------
# Path traversal
# ----------------------------------------------------------------------


def validate_path(requested: Path, vault_root: Path) -> Path:
    """Resolve requested relative to vault_root and reject anything escaping the vault.

    Raises ValueError on traversal attempts.
    """
    # Resolve both to absolute paths
    vault = vault_root.resolve()
    try:
        resolved = (vault / requested).resolve()
    except (OSError, ValueError) as e:
        raise ValueError(f"Cannot resolve path: {requested}") from e

    # Check the resolved path is under vault
    try:
        resolved.relative_to(vault)
    except ValueError:
        raise ValueError(
            f"Path traversal attempt blocked: {requested} resolves outside vault"
        ) from None

    # Also reject any ".." component, even if the path resolves inside the vault
    if ".." in requested.parts:
        raise ValueError(f"Path traversal attempt blocked: {requested}")

    return resolved


def is_symlink_outside_vault(path: Path, vault_root: Path) -> bool:
    """Return True if path resolves (following symlinks) to a location outside the vault.

    Paths that cannot be resolved are treated as outside.
    """
    try:
        resolved = path.resolve()
        vault = vault_root.resolve()
    except (OSError, ValueError):
        return True

    # relative_to raises ValueError when resolved is not under vault
    try:
        resolved.relative_to(vault)
    except ValueError:
        return True
    return False


# ----------------------------------------------------------------------
# Input sanitization
# ----------------------------------------------------------------------

HTML_TAG_RE = re.compile(r"<[^>]+>")
CODE_BLOCK_RE = re.compile(r"```[\s\S]*?```", re.MULTILINE)
MULTI_WHITESPACE_RE = re.compile(r"\s+")
MAX_CHUNK_LEN = 2000


def sanitize_text(raw: str) -> str:
    """Sanitize raw vault content before embedding.

    - Strip HTML tags (prevent XSS)
    - Remove fenced code blocks
    - Normalize whitespace
    - Cap length at MAX_CHUNK_LEN chars
    """
    # Remove fenced code blocks
    text = CODE_BLOCK_RE.sub(" ", raw)
    # Strip HTML tags
    text = HTML_TAG_RE.sub("", text)
    # Remove leading/trailing whitespace
    text = text.strip()
    # Normalize internal whitespace
    text = MULTI_WHITESPACE_RE.sub(" ", text)
    # Cap length
    if len(text) > MAX_CHUNK_LEN:
        text = text[:MAX_CHUNK_LEN]
    return text


# ----------------------------------------------------------------------
# Sensitive content detection
# ----------------------------------------------------------------------


def detect_sensitive(
    text: str,
    sensitive_sections: list[str],
    patterns: dict[str, list[str]],
) -> dict[str, bool]:
    """Detect sensitive content categories in text.

    Returns dict with keys: health, financial, relations.
    Only health and financial are set here; relations is always False.
    """
    text_lower = text.lower()
    result: dict[str, bool] = {
        "health": False,
        "financial": False,
        "relations": False,
    }

    # A sensitive section found in the text flags health only when it is
    # one of the health tags
    health_sections = ("#mentalhealth", "#physicalhealth")
    for section in sensitive_sections:
        section_lower = section.lower()
        if section_lower in text_lower and section_lower in health_sections:
            result["health"] = True

    # Pattern matching (case-insensitive substring match)
    if any(pat.lower() in text_lower for pat in patterns.get("financial", [])):
        result["financial"] = True
    if any(pat.lower() in text_lower for pat in patterns.get("health", [])):
        result["health"] = True

    return result


# ----------------------------------------------------------------------
# Directory access control
# ----------------------------------------------------------------------


def should_index_dir(
    dir_name: str,
    config: "ObsidianRagConfig",
) -> bool:
    """Apply deny/allow list rules to a directory.

    Hidden dirs (starting with '.') are always rejected.
    If allow_dirs is non-empty, only those dirs are allowed and deny_dirs is not consulted.
    Otherwise, any dir listed in deny_dirs is rejected.
    """
    # Always reject hidden directories
    if dir_name.startswith("."):
        return False

    # If allow list is set, only those dirs are allowed
    if config.indexing.allow_dirs:
        return dir_name in config.indexing.allow_dirs

    # Otherwise reject any deny-listed directory
    deny = config.indexing.deny_dirs
    return dir_name not in deny


def filter_tags(text: str) -> list[str]:
    """Extract all #hashtags from text, lowercased and deduplicated."""
    return list(dict.fromkeys(tag.lower() for tag in re.findall(r"#\w+", text)))
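
A minimal, self-contained sketch of the containment check that `validate_path` relies on, run against a throwaway directory. `is_inside` is a hypothetical helper written for this example, not part of the module, and the file names are made up. It also shows that an absolute `requested` path replaces the vault root when joined with `/`, which is why the `relative_to` check matters.

```python
import tempfile
from pathlib import Path


def is_inside(requested: str, vault_root: Path) -> bool:
    """Return True if requested, joined onto vault_root, stays inside it."""
    vault = vault_root.resolve()
    # Joining an absolute path discards the left-hand side, so this can leave the vault.
    resolved = (vault / requested).resolve()
    try:
        resolved.relative_to(vault)
    except ValueError:
        return False
    return True


with tempfile.TemporaryDirectory() as tmp:
    vault = Path(tmp) / "vault"
    (vault / "notes").mkdir(parents=True)

    inside = is_inside("notes/today.md", vault)   # stays under the vault
    escaped = is_inside("../outside.md", vault)   # climbs out with ".."
    absolute = is_inside("/etc/passwd", vault)    # absolute path ignores the vault root
```

Because the comparison is done on resolved paths, the files themselves do not need to exist for the check to work.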