## What's new **Python indexer (`python/obsidian_rag/`)** — full pipeline from scan to LanceDB: - `config.py` — JSON config loader with cross-platform path resolution - `security.py` — path traversal prevention, HTML stripping, sensitive content detection, dir allow/deny lists - `chunker.py` — section-split for journal entries (date-named files), sliding-window for unstructured notes - `embedder.py` — Ollama `/api/embeddings` client with batched requests and timeout/error handling - `vector_store.py` — LanceDB schema, upsert (merge_insert), delete, search with filters, stats - `indexer.py` — full/sync/reindex pipeline orchestrator with progress yields - `cli.py` — `index | sync | reindex | status` CLI commands **TypeScript plugin (`src/`)** — OpenClaw plugin scaffold: - `utils/` — config loader, TypeScript types, response envelope factory, LanceDB client - `services/` — health state machine (HEALTHY/DEGRADED/UNAVAILABLE), vault watcher with debounce/batching, indexer bridge (subprocess spawner) - `tools/` — 4 tool stubs: search, index, status, memory_store (OpenClaw wiring pending) - `index.ts` — plugin entry point with health probe + vault watcher startup **Config** (`obsidian-rag/config.json`, `openclaw.plugin.json`): - 627 files / 3764 chunks indexed in dev vault **Tests: 76 passing** - Python: 64 pytest tests (chunker, security, vector_store, config) - TypeScript: 12 vitest tests (lancedb client, response envelope) ## Bugs fixed - LanceDB `tags` column filter: `LIKE '%tag%'` → `list_contains(tags, 'tag')` (List<String> column) - LanceDB JS `db.list_tables()` returns `ListTablesResponse` object, not plain array - LanceDB JS result score field: `_score` → `_distance` - TypeScript regex literal with unescaped `/` in path-resolve regex - Python: `create_table_if_not_exists` identity check → name comparison Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
164 lines
4.8 KiB
Python
164 lines
4.8 KiB
Python
"""Path traversal prevention, input sanitization, sensitive content detection, directory access control."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
import unicodedata
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING
|
|
|
|
if TYPE_CHECKING:
|
|
from obsidian_rag.config import ObsidianRagConfig
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Path traversal
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
|
def validate_path(requested: Path, vault_root: Path) -> Path:
    """Return the absolute location of ``requested`` inside the vault.

    ``requested`` is joined onto the resolved ``vault_root`` and the result is
    resolved again (following symlinks and collapsing ``..``).  The resolved
    location must be the vault root itself or something beneath it.

    Raises:
        ValueError: if the path cannot be resolved, if it resolves outside the
            vault, or if ``requested`` contains a ``..`` component at all
            (even one that would still land inside the vault).
    """
    root = vault_root.resolve()
    try:
        candidate = (root / requested).resolve()
    except (OSError, ValueError) as exc:
        raise ValueError(f"Cannot resolve path: {requested}") from exc

    # Containment: the candidate is either the vault itself or a descendant.
    inside_vault = candidate == root or root in candidate.parents
    if not inside_vault:
        raise ValueError(f"Path traversal attempt blocked: {requested} resolves outside vault")

    # Belt and braces: refuse any literal ".." component, even a harmless one.
    if any(part == ".." for part in requested.parts):
        raise ValueError(f"Path traversal attempt blocked: {requested}")

    return candidate
|
|
|
|
|
|
def is_symlink_outside_vault(path: Path, vault_root: Path) -> bool:
    """Report whether ``path`` ends up outside the vault once fully resolved.

    Both ``path`` and ``vault_root`` are resolved (symlinks followed).  The
    function does not test whether ``path`` itself is a symlink: any path whose
    resolved location is neither the vault root nor beneath it yields ``True``.
    A path that cannot be resolved is also reported as outside (fail closed).
    """
    try:
        target = path.resolve()
        root = vault_root.resolve()
    except (OSError, ValueError):
        # Unresolvable paths are treated as unsafe.
        return True

    try:
        target.relative_to(root)
    except ValueError:
        return True
    return False
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Input sanitization
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
|
# Anything that looks like an HTML/XML tag: "<" ... ">".
HTML_TAG_RE = re.compile(r"<[^>]+>")
# Fenced code blocks, shortest match between two ``` fences (newlines included).
CODE_BLOCK_RE = re.compile(r"```[\s\S]*?```", re.MULTILINE)
# Any run of whitespace characters.
MULTI_WHITESPACE_RE = re.compile(r"\s+")
# Upper bound, in characters, on the text returned by sanitize_text.
MAX_CHUNK_LEN = 2000


def sanitize_text(raw: str) -> str:
    """Clean raw vault content before it is embedded.

    Steps, in order:

    1. Replace each fenced code block with a single space.
    2. Delete HTML-like tags.
    3. Trim leading and trailing whitespace.
    4. Collapse every internal whitespace run to one space.
    5. Truncate to at most ``MAX_CHUNK_LEN`` characters.
    """
    without_code = CODE_BLOCK_RE.sub(" ", raw)
    without_tags = HTML_TAG_RE.sub("", without_code)
    collapsed = MULTI_WHITESPACE_RE.sub(" ", without_tags.strip())
    # Slicing is a no-op when the text is already short enough.
    return collapsed[:MAX_CHUNK_LEN]
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Sensitive content detection
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
|
def detect_sensitive(
    text: str,
    sensitive_sections: list[str],
    patterns: dict[str, list[str]],
) -> dict[str, bool]:
    """Detect sensitive content categories in text.

    All matching is case-insensitive substring matching.

    Args:
        text: The content to inspect.
        sensitive_sections: Section markers to look for.  Only the markers
            ``#mentalhealth`` and ``#physicalhealth`` influence the result;
            when one of them is listed and occurs in ``text``, ``health`` is
            flagged.
        patterns: Mapping from category name (``"health"``, ``"financial"``,
            ``"relations"``) to substrings whose presence flags that category.
            Missing categories are treated as having no patterns.

    Returns:
        A dict with exactly the keys ``health``, ``financial`` and
        ``relations``, each ``True`` when that category was detected.
    """
    text_lower = text.lower()

    def _mentions_any(candidates: list[str]) -> bool:
        # Skip empty patterns: "" is a substring of every string, so a blank
        # config entry would otherwise flag every piece of text.
        return any(c and c.lower() in text_lower for c in candidates)

    # Section markers that map to the health category.
    health_markers = {"#mentalhealth", "#physicalhealth"}
    health_by_section = any(
        section.lower() in health_markers and section.lower() in text_lower
        for section in sensitive_sections
    )

    return {
        "health": health_by_section or _mentions_any(patterns.get("health", [])),
        "financial": _mentions_any(patterns.get("financial", [])),
        # Previously this key could never become True because no relations
        # pattern list was consulted.
        "relations": _mentions_any(patterns.get("relations", [])),
    }
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Directory access control
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
|
def should_index_dir(
    dir_name: str,
    config: "ObsidianRagConfig",
) -> bool:
    """Decide whether a directory name is eligible for indexing.

    Rules, in order of precedence:

    1. Names beginning with ``.`` are never indexed.
    2. When ``config.indexing.allow_dirs`` is non-empty, a directory is indexed
       only if it is listed there; the deny list is not consulted.
    3. Otherwise a directory is indexed unless it appears in
       ``config.indexing.deny_dirs``.
    """
    if dir_name[:1] == ".":
        return False

    indexing = config.indexing
    allow_list = indexing.allow_dirs
    if allow_list:
        return dir_name in allow_list
    return dir_name not in indexing.deny_dirs
|
|
|
|
|
|
def filter_tags(text: str) -> list[str]:
    """Collect the ``#hashtags`` found in ``text``.

    Tags are lowercased and returned in order of first appearance, with later
    duplicates dropped.  Each tag keeps its leading ``#``.
    """
    ordered: list[str] = []
    seen: set[str] = set()
    for match in re.finditer(r"#\w+", text):
        tag = match.group(0).lower()
        if tag not in seen:
            seen.add(tag)
            ordered.append(tag)
    return ordered