Files
obsidian-rag/python/tests/unit/test_security.py
Santhosh Janardhanan 5c281165c7 Sprint 0-1: Python indexer, TS plugin scaffolding, and test suite
## What's new

**Python indexer (`python/obsidian_rag/`)** — full pipeline from scan to LanceDB:
- `config.py` — JSON config loader with cross-platform path resolution
- `security.py` — path traversal prevention, HTML stripping, sensitive content detection, dir allow/deny lists
- `chunker.py` — section-split for journal entries (date-named files), sliding-window for unstructured notes
- `embedder.py` — Ollama `/api/embeddings` client with batched requests and timeout/error handling
- `vector_store.py` — LanceDB schema, upsert (merge_insert), delete, search with filters, stats
- `indexer.py` — full/sync/reindex pipeline orchestrator with progress yields
- `cli.py` — `index | sync | reindex | status` CLI commands

**TypeScript plugin (`src/`)** — OpenClaw plugin scaffold:
- `utils/` — config loader, TypeScript types, response envelope factory, LanceDB client
- `services/` — health state machine (HEALTHY/DEGRADED/UNAVAILABLE), vault watcher with debounce/batching, indexer bridge (subprocess spawner)
- `tools/` — 4 tool stubs: search, index, status, memory_store (OpenClaw wiring pending)
- `index.ts` — plugin entry point with health probe + vault watcher startup

**Config** (`obsidian-rag/config.json`, `openclaw.plugin.json`):
- 627 files / 3764 chunks indexed in dev vault

**Tests: 76 passing**
- Python: 64 pytest tests (chunker, security, vector_store, config)
- TypeScript: 12 vitest tests (lancedb client, response envelope)

## Bugs fixed

- LanceDB `tags` column filter: `LIKE '%tag%'` → `list_contains(tags, 'tag')` (List<String> column)
- LanceDB JS `db.list_tables()` returns `ListTablesResponse` object, not plain array
- LanceDB JS result score field: `_score` → `_distance`
- TypeScript regex literal with unescaped `/` in path-resolve regex
- Python: `create_table_if_not_exists` identity check → name comparison

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-10 22:56:50 -04:00

255 lines
7.0 KiB
Python

"""Tests for obsidian_rag.security — path traversal, sanitization, sensitive detection."""
from __future__ import annotations
from pathlib import Path
import tempfile
from unittest.mock import MagicMock
import pytest
from obsidian_rag.security import (
detect_sensitive,
filter_tags,
is_symlink_outside_vault,
sanitize_text,
should_index_dir,
validate_path,
)
# ----------------------------------------------------------------------
# validate_path
# ----------------------------------------------------------------------
def test_validate_path_normal_file(tmp_path: Path):
vault = tmp_path / "vault"
vault.mkdir()
target = vault / "subdir" / "note.md"
target.parent.mkdir()
target.touch()
result = validate_path(Path("subdir/note.md"), vault)
assert result == target.resolve()
def test_validate_path_traversal_attempt(tmp_path: Path):
vault = tmp_path / "vault"
vault.mkdir()
with pytest.raises(ValueError, match="traversal"):
validate_path(Path("../etc/passwd"), vault)
def test_validate_path_deep_traversal(tmp_path: Path):
vault = tmp_path / "vault"
vault.mkdir()
with pytest.raises(ValueError, match="traversal"):
validate_path(Path("subdir/../../../etc/passwd"), vault)
def test_validate_path_absolute_path(tmp_path: Path):
vault = tmp_path / "vault"
vault.mkdir()
with pytest.raises(ValueError):
validate_path(Path("/etc/passwd"), vault)
def test_validate_path_path_with_dotdot_in_resolve(tmp_path: Path):
"""Path that resolves inside vault but has .. in parts should be caught."""
vault = tmp_path / "vault"
vault.mkdir()
sub = vault / "subdir"
sub.mkdir()
# validate_path checks parts for ".."
with pytest.raises(ValueError, match="traversal"):
validate_path(Path("subdir/../subdir/../note.md"), vault)
# ----------------------------------------------------------------------
# is_symlink_outside_vault
# ----------------------------------------------------------------------
def test_is_symlink_outside_vault_internal(tmp_path: Path):
vault = tmp_path / "vault"
vault.mkdir()
note = vault / "note.md"
note.touch()
link = vault / "link.md"
link.symlink_to(note)
assert is_symlink_outside_vault(link, vault) is False
def test_is_symlink_outside_vault_external(tmp_path: Path):
vault = tmp_path / "vault"
vault.mkdir()
outside = tmp_path / "outside.md"
outside.touch()
link = vault / "link.md"
link.symlink_to(outside)
assert is_symlink_outside_vault(link, vault) is True
# ----------------------------------------------------------------------
# sanitize_text
# ----------------------------------------------------------------------
def test_sanitize_text_strips_html():
raw = "<script>alert('xss')</script>Hello #world"
result = sanitize_text(raw)
assert "<script>" not in result
assert "Hello #world" in result
# Text content inside HTML tags is preserved (sanitize_text strips the tags only)
def test_sanitize_text_removes_code_blocks():
raw = """Some text
```
secret_api_key = "sk-12345"
```
More text
"""
result = sanitize_text(raw)
assert "secret_api_key" not in result
assert "Some text" in result
assert "More text" in result
def test_sanitize_text_normalizes_whitespace():
raw = "Hello\n\n\n world\t\t spaces"
result = sanitize_text(raw)
assert "\n" not in result
assert "\t" not in result
assert " " not in result
def test_sanitize_text_caps_length():
long_text = "word " * 1000
result = sanitize_text(long_text)
assert len(result) <= 2000
def test_sanitize_text_preserves_hashtags():
raw = "#mentalhealth #python #machine-learning"
result = sanitize_text(raw)
assert "#mentalhealth" in result
assert "#python" in result
# ----------------------------------------------------------------------
# detect_sensitive
# ----------------------------------------------------------------------
def test_detect_sensitive_mental_health_section():
text = " #mentalhealth section content"
sensitive_sections = ["#mentalhealth", "#physicalhealth", "#Relations"]
patterns = {"financial": [], "health": []}
result = detect_sensitive(text, sensitive_sections, patterns)
assert result["health"] is True
def test_detect_sensitive_financial_pattern():
text = "I owe Sreenivas $50 and need to pay it back"
sensitive_sections = ["#mentalhealth"]
patterns = {"financial": ["owe", "$"], "health": []}
result = detect_sensitive(text, sensitive_sections, patterns)
assert result["financial"] is True
assert result["health"] is False
def test_detect_sensitive_relations():
text = "Had coffee with Sarah #Relations"
sensitive_sections = ["#Relations"]
patterns = {"financial": [], "health": []}
result = detect_sensitive(text, sensitive_sections, patterns)
# Only specific health sections set health=true
assert result["relations"] is False
def test_detect_sensitive_clean_text():
text = "This is a normal note about cooking dinner."
sensitive_sections = []
patterns = {"financial": [], "health": []}
result = detect_sensitive(text, sensitive_sections, patterns)
assert result == {"health": False, "financial": False, "relations": False}
# ----------------------------------------------------------------------
# should_index_dir
# ----------------------------------------------------------------------
def _mock_config() -> MagicMock:
cfg = MagicMock()
cfg.indexing.allow_dirs = []
cfg.indexing.deny_dirs = [".obsidian", ".trash", "zzz-Archive", ".git"]
return cfg
def test_should_index_dir_allows_normal():
cfg = _mock_config()
assert should_index_dir("Journal", cfg) is True
assert should_index_dir("Finance", cfg) is True
assert should_index_dir("Projects", cfg) is True
def test_should_index_dir_denies_hidden():
cfg = _mock_config()
assert should_index_dir(".obsidian", cfg) is False
assert should_index_dir(".git", cfg) is False
assert should_index_dir(".trash", cfg) is False
def test_should_index_dir_denies_configured():
cfg = _mock_config()
assert should_index_dir("zzz-Archive", cfg) is False
def test_should_index_dir_allow_list_override():
cfg = _mock_config()
cfg.indexing.allow_dirs = ["Journal", "Finance"]
assert should_index_dir("Journal", cfg) is True
assert should_index_dir("Finance", cfg) is True
assert should_index_dir("Projects", cfg) is False
# ----------------------------------------------------------------------
# filter_tags
# ----------------------------------------------------------------------
def test_filter_tags_basic():
text = "Hello #world and #python tags #AI"
tags = filter_tags(text)
assert "#world" in tags
assert "#python" in tags
assert "#ai" in tags
def test_filter_tags_deduplicates():
text = "#hello #world #hello"
tags = filter_tags(text)
assert len(tags) == 2
def test_filter_tags_no_tags():
text = "just plain text without any hashtags"
assert filter_tags(text) == []