kv-ai/docs/superpowers/plans/2026-04-13-personal-companion-ai-phase1.md


Personal Companion AI — Implementation Plan Phase 1: Vault Indexer + RAG Engine

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Build a fully working vault indexer and RAG engine that can watch, chunk, embed, and search 677+ Obsidian markdown files locally using Ollama and LanceDB.

Architecture: Decoupled Python services. The RAG engine handles markdown chunking (with per-directory rules), embedding via Ollama, and LanceDB storage. The indexer daemon watches the vault filesystem and triggers incremental or full syncs. A simple search CLI proves end-to-end retrieval works.

Tech Stack: Python 3.11+, LanceDB, Ollama (mxbai-embed-large), watchdog for file watching, pytest for testing, pydantic for config.


File Map

| File | Responsibility |
| --- | --- |
| `pyproject.toml` | Python project deps and metadata |
| `config.json` | Runtime configuration (vault path, chunking rules, Ollama settings) |
| `src/config.py` | Load and validate `config.json` into typed Pydantic models |
| `src/rag/chunker.py` | Parse markdown, apply chunking rules (sliding window + section-based), emit chunks with metadata |
| `src/rag/embedder.py` | HTTP client for Ollama embeddings with batching and retries |
| `src/rag/vector_store.py` | LanceDB wrapper: init table, upsert, search, delete by `source_file` |
| `src/rag/indexer.py` | Orchestrate full sync and incremental sync: scan files, chunk, embed, store |
| `src/rag/search.py` | High-level search interface: embed query, run vector + optional keyword hybrid search |
| `src/indexer_daemon/cli.py` | Click/Typer CLI for `index`, `sync`, `reindex`, `status` commands |
| `src/indexer_daemon/watcher.py` | watchdog observer that triggers incremental sync on `.md` changes |
| `tests/test_chunker.py` | Unit tests for all chunking strategies |
| `tests/test_embedder.py` | Mocked tests for the Ollama client |
| `tests/test_vector_store.py` | LanceDB CRUD and search tests |
| `tests/test_indexer.py` | End-to-end sync tests with a temp vault |

Task 1: Project Scaffolding

Files:

  • Create: pyproject.toml

  • Create: config.json

  • Create: .gitignore

  • Step 1: Write pyproject.toml

[project]
name = "companion"
version = "0.1.0"
description = "Personal companion AI with local RAG"
requires-python = ">=3.11"
dependencies = [
    "pydantic>=2.0",
    "lancedb>=0.9.0",
    "pyarrow>=15.0.0",
    "httpx>=0.27.0",
    "watchdog>=4.0.0",
    "typer>=0.12.0",
    "rich>=13.0.0",
    "numpy>=1.26.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0.0",
    "pytest-asyncio>=0.23.0",
    "httpx>=0.27.0",
    "respx>=0.21.0",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
  • Step 2: Write config.json
{
  "companion": {
    "name": "SAN",
    "persona": {
      "role": "companion",
      "tone": "reflective",
      "style": "questioning",
      "boundaries": [
        "does_not_impersonate_user",
        "no_future_predictions",
        "no_medical_or_legal_advice"
      ]
    },
    "memory": {
      "session_turns": 20,
      "persistent_store": "~/.companion/memory.db",
      "summarize_after": 10
    },
    "chat": {
      "streaming": true,
      "max_response_tokens": 2048,
      "default_temperature": 0.7,
      "allow_temperature_override": true
    }
  },
  "vault": {
    "path": "./sample-data/Default",
    "indexing": {
      "auto_sync": true,
      "auto_sync_interval_minutes": 1440,
      "watch_fs_events": true,
      "file_patterns": ["*.md"],
      "deny_dirs": [".obsidian", ".trash", "zzz-Archive", ".git", ".logseq"],
      "deny_patterns": ["*.tmp", "*.bak", "*conflict*", ".*"]
    },
    "chunking_rules": {
      "default": {
        "strategy": "sliding_window",
        "chunk_size": 500,
        "chunk_overlap": 100
      },
      "Journal/**": {
        "strategy": "section",
        "section_tags": ["#DayInShort", "#mentalhealth", "#physicalhealth", "#work", "#finance", "#Relations"],
        "chunk_size": 300,
        "chunk_overlap": 50
      },
      "zzz-Archive/**": {
        "strategy": "sliding_window",
        "chunk_size": 800,
        "chunk_overlap": 150
      }
    }
  },
  "rag": {
    "embedding": {
      "provider": "ollama",
      "model": "mxbai-embed-large",
      "base_url": "http://localhost:11434",
      "dimensions": 1024,
      "batch_size": 32
    },
    "vector_store": {
      "type": "lancedb",
      "path": "./.companion/vectors.lance"
    },
    "search": {
      "default_top_k": 8,
      "max_top_k": 20,
      "similarity_threshold": 0.75,
      "hybrid_search": {
        "enabled": true,
        "keyword_weight": 0.3,
        "semantic_weight": 0.7
      },
      "filters": {
        "date_range_enabled": true,
        "tag_filter_enabled": true,
        "directory_filter_enabled": true
      }
    }
  },
  "model": {
    "inference": {
      "backend": "llama.cpp",
      "model_path": "~/.companion/models/companion-7b-q4.gguf",
      "context_length": 8192,
      "gpu_layers": 35,
      "batch_size": 512,
      "threads": 8
    },
    "fine_tuning": {
      "base_model": "unsloth/Meta-Llama-3.1-8B-Instruct-bnb-4bit",
      "output_dir": "~/.companion/training",
      "lora_rank": 16,
      "lora_alpha": 32,
      "learning_rate": 0.0002,
      "batch_size": 4,
      "gradient_accumulation_steps": 4,
      "num_epochs": 3,
      "warmup_steps": 100,
      "save_steps": 500,
      "eval_steps": 250,
      "training_data_path": "~/.companion/training_data/",
      "validation_split": 0.1
    },
    "retrain_schedule": {
      "auto_reminder": true,
      "default_interval_days": 90,
      "reminder_channels": ["chat_stream", "log"]
    }
  },
  "api": {
    "host": "127.0.0.1",
    "port": 7373,
    "cors_origins": ["http://localhost:5173"],
    "auth": {
      "enabled": false
    }
  },
  "ui": {
    "web": {
      "enabled": true,
      "theme": "obsidian",
      "features": {
        "streaming": true,
        "citations": true,
        "source_preview": true
      }
    },
    "cli": {
      "enabled": true,
      "rich_output": true
    }
  },
  "logging": {
    "level": "INFO",
    "file": "./.companion/logs/companion.log",
    "max_size_mb": 100,
    "backup_count": 5
  },
  "security": {
    "local_only": true,
    "vault_path_traversal_check": true,
    "sensitive_content_detection": true,
    "sensitive_patterns": ["#mentalhealth", "#physicalhealth", "#finance", "#Relations"],
    "require_confirmation_for_external_apis": true
  }
}
  • Step 3: Write .gitignore
__pycache__/
*.py[cod]
*$py.class
*.egg-info/
.pytest_cache/
.mypy_cache/
.venv/
venv/
.companion/
dist/
build/
  • Step 4: Install dependencies

Run:

pip install -e ".[dev]"

Expected: installs all packages without errors.

  • Step 5: Commit
git add pyproject.toml config.json .gitignore
git commit -m "chore: scaffold companion project with deps and config"

Task 2: Configuration Loader

Files:

  • Create: src/config.py

  • Create: tests/test_config.py

  • Step 1: Write failing test for config loading

# tests/test_config.py
import json
import os
import tempfile

from src.config import load_config


def test_load_config_reads_json_and_expands_tilde():
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
        json.dump({
            "vault": {"path": "~/test-vault"},
            "rag": {"vector_store": {"path": "~/.companion/vectors.lance"}}
        }, f)
        path = f.name
    try:
        config = load_config(path)
        assert config.vault.path == os.path.expanduser("~/test-vault")
        assert config.rag.vector_store.path == os.path.expanduser("~/.companion/vectors.lance")
    finally:
        os.unlink(path)

Run:

pytest tests/test_config.py -v

Expected: FAIL — src.config module not found.

  • Step 2: Implement src/config.py
# src/config.py
from __future__ import annotations

import json
import os
from pathlib import Path
from typing import Any

from pydantic import BaseModel, Field


class PersonaConfig(BaseModel):
    role: str
    tone: str
    style: str
    boundaries: list[str]


class MemoryConfig(BaseModel):
    session_turns: int
    persistent_store: str
    summarize_after: int


class ChatConfig(BaseModel):
    streaming: bool
    max_response_tokens: int
    default_temperature: float
    allow_temperature_override: bool


class CompanionConfig(BaseModel):
    name: str
    persona: PersonaConfig
    memory: MemoryConfig
    chat: ChatConfig


class IndexingConfig(BaseModel):
    auto_sync: bool
    auto_sync_interval_minutes: int
    watch_fs_events: bool
    file_patterns: list[str]
    deny_dirs: list[str]
    deny_patterns: list[str]


class ChunkingRule(BaseModel):
    strategy: str
    chunk_size: int
    chunk_overlap: int
    section_tags: list[str] | None = None


class VaultConfig(BaseModel):
    path: str
    indexing: IndexingConfig
    chunking_rules: dict[str, ChunkingRule]


class EmbeddingConfig(BaseModel):
    provider: str
    model: str
    base_url: str
    dimensions: int
    batch_size: int


class VectorStoreConfig(BaseModel):
    type: str
    path: str


class HybridSearchConfig(BaseModel):
    enabled: bool
    keyword_weight: float
    semantic_weight: float


class FiltersConfig(BaseModel):
    date_range_enabled: bool
    tag_filter_enabled: bool
    directory_filter_enabled: bool


class SearchConfig(BaseModel):
    default_top_k: int
    max_top_k: int
    similarity_threshold: float
    hybrid_search: HybridSearchConfig
    filters: FiltersConfig


class RagConfig(BaseModel):
    embedding: EmbeddingConfig
    vector_store: VectorStoreConfig
    search: SearchConfig


class InferenceConfig(BaseModel):
    backend: str
    model_path: str
    context_length: int
    gpu_layers: int
    batch_size: int
    threads: int


class FineTuningConfig(BaseModel):
    base_model: str
    output_dir: str
    lora_rank: int
    lora_alpha: int
    learning_rate: float
    batch_size: int
    gradient_accumulation_steps: int
    num_epochs: int
    warmup_steps: int
    save_steps: int
    eval_steps: int
    training_data_path: str
    validation_split: float


class RetrainScheduleConfig(BaseModel):
    auto_reminder: bool
    default_interval_days: int
    reminder_channels: list[str]


class ModelConfig(BaseModel):
    inference: InferenceConfig
    fine_tuning: FineTuningConfig
    retrain_schedule: RetrainScheduleConfig


class AuthConfig(BaseModel):
    enabled: bool


class ApiConfig(BaseModel):
    host: str
    port: int
    cors_origins: list[str]
    auth: AuthConfig


class WebFeaturesConfig(BaseModel):
    streaming: bool
    citations: bool
    source_preview: bool


class WebConfig(BaseModel):
    enabled: bool
    theme: str
    features: WebFeaturesConfig


class CliConfig(BaseModel):
    enabled: bool
    rich_output: bool


class UiConfig(BaseModel):
    web: WebConfig
    cli: CliConfig


class LoggingConfig(BaseModel):
    level: str
    file: str
    max_size_mb: int
    backup_count: int


class SecurityConfig(BaseModel):
    local_only: bool
    vault_path_traversal_check: bool
    sensitive_content_detection: bool
    sensitive_patterns: list[str]
    require_confirmation_for_external_apis: bool


class Config(BaseModel):
    companion: CompanionConfig
    vault: VaultConfig
    rag: RagConfig
    model: ModelConfig
    api: ApiConfig
    ui: UiConfig
    logging: LoggingConfig
    security: SecurityConfig


def _expand_tilde(obj: Any) -> Any:
    if isinstance(obj, str) and obj.startswith("~/"):
        return os.path.expanduser(obj)
    if isinstance(obj, dict):
        return {k: _expand_tilde(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [_expand_tilde(item) for item in obj]
    return obj


def load_config(path: str | Path = "config.json") -> Config:
    with open(path, "r", encoding="utf-8") as f:
        raw = json.load(f)
    expanded = _expand_tilde(raw)
    return Config.model_validate(expanded)
  • Step 3: Run test

Run:

pytest tests/test_config.py -v

Expected: PASS.
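Worth noting how the expansion recurses: only strings that begin with `~/` are rewritten; dicts and lists are walked depth-first, and everything else passes through untouched. A standalone sketch (copying `_expand_tilde` from the listing above, with illustrative data):

```python
import os
from typing import Any


# Standalone copy of _expand_tilde from src/config.py, to show how
# expansion recurses through nested dicts and lists.
def _expand_tilde(obj: Any) -> Any:
    if isinstance(obj, str) and obj.startswith("~/"):
        return os.path.expanduser(obj)
    if isinstance(obj, dict):
        return {k: _expand_tilde(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [_expand_tilde(item) for item in obj]
    return obj


raw = {
    "vault": {"path": "~/vault"},
    "patterns": ["*.md", "~/extra/notes"],  # only true "~/" prefixes expand
    "name": "no~tilde~here",                # untouched: tilde is not a prefix
}
expanded = _expand_tilde(raw)
print(expanded["vault"]["path"])  # absolute path under the user's home
print(expanded["name"])           # unchanged
```

Because only the `~/` prefix triggers expansion, glob patterns and strings that merely contain a tilde are left alone.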

  • Step 4: Commit
git add src/config.py tests/test_config.py
git commit -m "feat: add typed configuration loader with tilde expansion"

Task 3: Markdown Chunker

Files:

  • Create: src/rag/chunker.py

  • Create: tests/test_chunker.py

  • Step 1: Write failing test for sliding window chunker

# tests/test_chunker.py
from src.rag.chunker import sliding_window_chunks


def test_sliding_window_basic():
    text = "word " * 100
    chunks = sliding_window_chunks(text, chunk_size=20, chunk_overlap=5)
    assert len(chunks) > 1
    assert len(chunks[0].split()) == 20
    # overlap check: last 5 words of chunk 0 should appear in chunk 1
    last_five = chunks[0].split()[-5:]
    first_chunk1 = chunks[1].split()[:5]
    assert last_five == first_chunk1

Run:

pytest tests/test_chunker.py::test_sliding_window_basic -v

Expected: FAIL — src.rag.chunker not found.

  • Step 2: Implement src/rag/chunker.py with sliding window and section chunkers
# src/rag/chunker.py
from __future__ import annotations

import fnmatch
import re
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class Chunk:
    text: str
    source_file: str
    source_directory: str
    section: str | None = None
    date: str | None = None
    tags: list[str] | None = None
    chunk_index: int = 0
    total_chunks: int = 1
    modified_at: float | None = None
    rule_applied: str = "default"


def _extract_tags(text: str) -> list[str]:
    hashtags = re.findall(r"#\w+", text)
    wikilinks = re.findall(r"\[\[([^\]]+)\]\]", text)
    return hashtags + wikilinks


def _extract_headings(text: str) -> list[str]:
    return re.findall(r"^#+\s*(.+)$", text, flags=re.MULTILINE)


def _parse_date_from_filename(filename: str) -> str | None:
    # YYYY-MM-DD or "DD MMM YYYY"
    m = re.search(r"(\d{4}-\d{2}-\d{2})", filename)
    if m:
        return m.group(1)
    m = re.search(r"(\d{2}\s+[A-Za-z]{3}\s+\d{4})", filename)
    if m:
        return m.group(1)
    return None


def sliding_window_chunks(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    words = text.split()
    if len(words) <= chunk_size:
        return [" ".join(words)]
    chunks: list[str] = []
    step = chunk_size - chunk_overlap
    for i in range(0, len(words), step):
        window = words[i : i + chunk_size]
        chunks.append(" ".join(window))
        if i + chunk_size >= len(words):
            break
    return chunks


def section_based_chunks(text: str, section_tags: list[str] | None, chunk_size: int, chunk_overlap: int) -> list[tuple[str, str | None]]:
    """Split by section tags, then apply sliding window within each section."""
    if not section_tags:
        return [(chunk, None) for chunk in sliding_window_chunks(text, chunk_size, chunk_overlap)]

    # Build regex for any of the section tags at start of line
    escaped = [re.escape(tag) for tag in section_tags]
    pattern = re.compile(r"^(" + "|".join(escaped) + r")", flags=re.MULTILINE)
    matches = list(pattern.finditer(text))
    if not matches:
        return [(chunk, None) for chunk in sliding_window_chunks(text, chunk_size, chunk_overlap)]

    sections: list[tuple[str, str | None]] = []
    for i, match in enumerate(matches):
        start = match.start()
        end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
        section_text = text[start:end].strip()
        section_name = match.group(1)
        for chunk in sliding_window_chunks(section_text, chunk_size, chunk_overlap):
            sections.append((chunk, section_name))
    return sections


@dataclass
class ChunkingRule:
    strategy: str  # "sliding_window" | "section"
    chunk_size: int
    chunk_overlap: int
    section_tags: list[str] | None = None


def match_chunking_rule(relative_path: str, rules: dict[str, ChunkingRule]) -> ChunkingRule:
    for pattern, rule in rules.items():
        if pattern == "default":
            continue
        if fnmatch.fnmatch(relative_path, pattern):
            return rule
    return rules.get("default", ChunkingRule(strategy="sliding_window", chunk_size=500, chunk_overlap=100))


def chunk_file(
    file_path: Path,
    vault_root: Path,
    rules: dict[str, ChunkingRule],
    modified_at: float | None = None,
) -> list[Chunk]:
    relative = str(file_path.relative_to(vault_root)).replace("\\", "/")
    source_directory = relative.split("/")[0] if "/" in relative else "."
    text = file_path.read_text(encoding="utf-8")
    rule = match_chunking_rule(relative, rules)

    if rule.strategy == "section":
        raw_chunks = section_based_chunks(text, rule.section_tags, rule.chunk_size, rule.chunk_overlap)
    else:
        raw_chunks = [(chunk, None) for chunk in sliding_window_chunks(text, rule.chunk_size, rule.chunk_overlap)]

    total = len(raw_chunks)
    date = _parse_date_from_filename(file_path.name)
    result: list[Chunk] = []
    for idx, (chunk_text, section_name) in enumerate(raw_chunks):
        tags = _extract_tags(chunk_text)
        result.append(
            Chunk(
                text=chunk_text,
                source_file=relative,
                source_directory=source_directory,
                section=section_name,
                date=date,
                tags=tags,
                chunk_index=idx,
                total_chunks=total,
                modified_at=modified_at,
                rule_applied=rule.strategy,
            )
        )
    return result
  • Step 3: Run sliding window test

Run:

pytest tests/test_chunker.py::test_sliding_window_basic -v

Expected: PASS.
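As a sanity check on the window arithmetic (a standalone copy of `sliding_window_chunks` from the listing above): with 100 words, `chunk_size=20`, and `chunk_overlap=5`, the step is 15, so windows start at 0, 15, 30, 45, 60, 75, and 90, and the final window holds the remaining 10 words.

```python
def sliding_window_chunks(text, chunk_size, chunk_overlap):
    # Standalone copy of the function from src/rag/chunker.py above.
    words = text.split()
    if len(words) <= chunk_size:
        return [" ".join(words)]
    chunks = []
    step = chunk_size - chunk_overlap
    for i in range(0, len(words), step):
        chunks.append(" ".join(words[i : i + chunk_size]))
        if i + chunk_size >= len(words):
            break
    return chunks


text = " ".join(f"w{i}" for i in range(100))  # 100 distinct words
chunks = sliding_window_chunks(text, chunk_size=20, chunk_overlap=5)
print(len(chunks))              # 7 windows, starting at 0, 15, ..., 90
print(len(chunks[-1].split()))  # final partial window holds 10 words
print(chunks[0].split()[-5:] == chunks[1].split()[:5])  # 5-word overlap: True
```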

  • Step 4: Add section-based chunker test
# Append to tests/test_chunker.py
from src.rag.chunker import section_based_chunks, chunk_file, ChunkingRule
import tempfile
from pathlib import Path


def test_section_based_chunks_splits_on_tags():
    text = "#DayInShort: good day\n#mentalhealth: stressed\n#work: busy"
    chunks = section_based_chunks(text, ["#DayInShort", "#mentalhealth", "#work"], chunk_size=10, chunk_overlap=2)
    assert len(chunks) == 3
    assert chunks[0][1] == "#DayInShort"
    assert chunks[1][1] == "#mentalhealth"
    assert chunks[2][1] == "#work"


def test_chunk_file_extracts_metadata():
    with tempfile.TemporaryDirectory() as tmp:
        vault = Path(tmp)
        journal = vault / "Journal" / "2026" / "04" / "2026-04-12.md"
        journal.parent.mkdir(parents=True)
        journal.write_text("#DayInShort: good day\n#Relations: [[Person/Vinay]] visited.", encoding="utf-8")
        rules = {
            "default": ChunkingRule(strategy="sliding_window", chunk_size=500, chunk_overlap=100),
            "Journal/**": ChunkingRule(strategy="section", chunk_size=300, chunk_overlap=50, section_tags=["#DayInShort", "#Relations"]),
        }
        chunks = chunk_file(journal, vault, rules, modified_at=1234567890.0)
        assert len(chunks) == 2
        assert chunks[0].source_directory == "Journal"
        assert chunks[0].date == "2026-04-12"
        assert "Person/Vinay" in (chunks[1].tags or [])

Run:

pytest tests/test_chunker.py -v

Expected: PASS.
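One subtlety in `match_chunking_rule`: `fnmatch` has no special `**` semantics — every `*` already matches across `/` — so a pattern like `Journal/**` happens to match arbitrarily nested files under `Journal/`, which is exactly what the config intends. A quick standalone check:

```python
import fnmatch

# fnmatch's "*" matches any characters including "/", so "Journal/**"
# behaves like "Journal/*" and still matches nested paths.
print(fnmatch.fnmatch("Journal/2026/04/2026-04-12.md", "Journal/**"))  # True
print(fnmatch.fnmatch("Journal/note.md", "Journal/**"))                # True
print(fnmatch.fnmatch("Other/Journal/x.md", "Journal/**"))             # False
```

If gitignore-style `**` semantics (where `*` must not cross `/`) are ever needed, `fnmatch` would not be enough; for this plan's patterns it suffices.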

  • Step 5: Commit
git add src/rag/chunker.py tests/test_chunker.py
git commit -m "feat: add markdown chunker with sliding window and section strategies"

Task 4: Ollama Embedder

Files:

  • Create: src/rag/embedder.py

  • Create: tests/test_embedder.py

  • Step 1: Write failing test for embedder

# tests/test_embedder.py
import pytest
import respx
from httpx import Response

from src.rag.embedder import OllamaEmbedder


@respx.mock
def test_embed_single_text():
    route = respx.post("http://localhost:11434/api/embeddings").mock(
        return_value=Response(200, json={"embedding": [0.1] * 1024})
    )
    embedder = OllamaEmbedder(base_url="http://localhost:11434", model="mxbai-embed-large")
    result = embedder.embed(["hello world"])
    assert len(result) == 1
    assert len(result[0]) == 1024
    assert route.called

Run:

pytest tests/test_embedder.py -v

Expected: FAIL — src.rag.embedder not found.

  • Step 2: Implement src/rag/embedder.py
# src/rag/embedder.py
from __future__ import annotations

import time

import httpx


class OllamaEmbedder:
    def __init__(self, base_url: str, model: str, batch_size: int = 32):
        self.base_url = base_url.rstrip("/")
        self.model = model
        self.batch_size = batch_size

    def embed(self, texts: list[str], retries: int = 3, backoff: float = 1.0) -> list[list[float]]:
        results: list[list[float]] = []
        for i in range(0, len(texts), self.batch_size):
            batch = texts[i : i + self.batch_size]
            batch_results = self._embed_batch(batch, retries, backoff)
            results.extend(batch_results)
        return results

    def _embed_batch(self, texts: list[str], retries: int, backoff: float) -> list[list[float]]:
        last_exception: Exception | None = None
        for attempt in range(retries):
            try:
                response = httpx.post(
                    f"{self.base_url}/api/embeddings",
                    json={"model": self.model, "prompt": texts[0]},
                    timeout=120,
                )
                response.raise_for_status()
                data = response.json()
                embedding = data.get("embedding")
                if not embedding or not isinstance(embedding, list):
                    raise ValueError(f"Invalid response from Ollama: {data}")
                return [embedding]
            except Exception as exc:
                last_exception = exc
                if attempt < retries - 1:
                    time.sleep(backoff * (2 ** attempt))
        raise RuntimeError(f"Ollama embedding failed after {retries} attempts") from last_exception

Note: the version above is broken for batches. Ollama's legacy /api/embeddings endpoint accepts a single prompt per request, so _embed_batch only ever embeds texts[0] and silently drops the rest of the batch. The newer /api/embed endpoint accepts a list of inputs under "input" and returns an "embeddings" array, so the revision below switches to it for true batching.

# src/rag/embedder.py (revised)
from __future__ import annotations

import time

import httpx


class OllamaEmbedder:
    def __init__(self, base_url: str, model: str, batch_size: int = 32):
        self.base_url = base_url.rstrip("/")
        self.model = model
        self.batch_size = batch_size

    def embed(self, texts: list[str], retries: int = 3, backoff: float = 1.0) -> list[list[float]]:
        results: list[list[float]] = []
        for i in range(0, len(texts), self.batch_size):
            batch = texts[i : i + self.batch_size]
            batch_results = self._embed_batch(batch, retries, backoff)
            results.extend(batch_results)
        return results

    def _embed_batch(self, texts: list[str], retries: int, backoff: float) -> list[list[float]]:
        last_exception: Exception | None = None
        for attempt in range(retries):
            try:
                response = httpx.post(
                    f"{self.base_url}/api/embed",
                    json={"model": self.model, "input": texts},
                    timeout=300,
                )
                response.raise_for_status()
                data = response.json()
                embeddings = data.get("embeddings")
                if not embeddings or not isinstance(embeddings, list):
                    raise ValueError(f"Invalid response from Ollama: {data}")
                return embeddings
            except Exception as exc:
                last_exception = exc
                if attempt < retries - 1:
                    time.sleep(backoff * (2 ** attempt))
        raise RuntimeError(f"Ollama embedding failed after {retries} attempts") from last_exception

And update the test:

# tests/test_embedder.py (revised)
import pytest
import respx
from httpx import Response

from src.rag.embedder import OllamaEmbedder


@respx.mock
def test_embed_batch():
    route = respx.post("http://localhost:11434/api/embed").mock(
        return_value=Response(200, json={"embeddings": [[0.1] * 1024, [0.2] * 1024]})
    )
    embedder = OllamaEmbedder(base_url="http://localhost:11434", model="mxbai-embed-large", batch_size=2)
    result = embedder.embed(["hello world", "goodbye world"])
    assert len(result) == 2
    assert len(result[0]) == 1024
    assert result[0][0] == 0.1
    assert result[1][0] == 0.2
    assert route.called
  • Step 3: Write the revised embedder and test

Write src/rag/embedder.py with the revised code above. Write tests/test_embedder.py with the revised test above.

  • Step 4: Run test

Run:

pytest tests/test_embedder.py -v

Expected: PASS.
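With the defaults (`retries=3`, `backoff=1.0`), `_embed_batch` sleeps `backoff * 2**attempt` seconds after each failed attempt except the last, then raises. A small sketch of that schedule (the `backoff_schedule` helper is illustrative, not part of the plan):

```python
def backoff_schedule(retries: int, backoff: float) -> list[float]:
    # Mirrors the sleep logic in OllamaEmbedder._embed_batch: a sleep
    # happens after every failed attempt except the final one.
    return [backoff * (2 ** attempt) for attempt in range(retries - 1)]


print(backoff_schedule(3, 1.0))  # [1.0, 2.0]
print(backoff_schedule(5, 0.5))  # [0.5, 1.0, 2.0, 4.0]
```

So a fully failing batch with defaults costs about 3 seconds of sleep on top of the three request timeouts.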

  • Step 5: Commit
git add src/rag/embedder.py tests/test_embedder.py
git commit -m "feat: add Ollama embedder with batching and retries"

Task 5: LanceDB Vector Store

Files:

  • Create: src/rag/vector_store.py

  • Create: tests/test_vector_store.py

  • Step 1: Write failing test for vector store init and upsert

# tests/test_vector_store.py
import tempfile
from pathlib import Path


from src.rag.vector_store import VectorStore


def test_vector_store_upsert_and_search():
    with tempfile.TemporaryDirectory() as tmp:
        store = VectorStore(uri=tmp, dimensions=4)
        store.upsert(
            ids=["a", "b"],
            texts=["hello world", "goodbye world"],
            embeddings=[[1.0, 0.0, 0.0, 0.0], [0.0, 1.0, 0.0, 0.0]],
            metadatas=[
                {"source_file": "a.md", "source_directory": "docs"},
                {"source_file": "b.md", "source_directory": "docs"},
            ],
        )
        results = store.search(query_vector=[1.0, 0.0, 0.0, 0.0], top_k=1)
        assert len(results) == 1
        assert results[0]["source_file"] == "a.md"

Run:

pytest tests/test_vector_store.py -v

Expected: FAIL — module not found.

  • Step 2: Implement src/rag/vector_store.py
# src/rag/vector_store.py
from __future__ import annotations

import uuid
from pathlib import Path
from typing import Any

import lancedb
import numpy as np
import pyarrow as pa


class VectorStore:
    TABLE_NAME = "chunks"

    def __init__(self, uri: str | Path, dimensions: int):
        self.uri = str(uri)
        self.dimensions = dimensions
        self.db = lancedb.connect(self.uri)
        self.table = self._get_or_create_table()

    def _get_or_create_table(self):
        try:
            return self.db.open_table(self.TABLE_NAME)
        except Exception:
            schema = pa.schema([
                pa.field("id", pa.string()),
                pa.field("text", pa.string()),
                pa.field("vector", pa.list_(pa.float32(), self.dimensions)),
                pa.field("source_file", pa.string()),
                pa.field("source_directory", pa.string()),
                pa.field("section", pa.string(), nullable=True),
                pa.field("date", pa.string(), nullable=True),
                pa.field("tags", pa.list_(pa.string()), nullable=True),
                pa.field("chunk_index", pa.int32()),
                pa.field("total_chunks", pa.int32()),
                pa.field("modified_at", pa.float64(), nullable=True),
                pa.field("rule_applied", pa.string()),
            ])
            return self.db.create_table(self.TABLE_NAME, schema=schema)

    def upsert(
        self,
        ids: list[str],
        texts: list[str],
        embeddings: list[list[float]],
        metadatas: list[dict[str, Any]],
    ) -> None:
        data = []
        for id_, text, vector, meta in zip(ids, texts, embeddings, metadatas):
            row = {
                "id": id_,
                "text": text,
                "vector": np.array(vector, dtype=np.float32),
                "source_file": meta.get("source_file", ""),
                "source_directory": meta.get("source_directory", ""),
                "section": meta.get("section"),
                "date": meta.get("date"),
                "tags": meta.get("tags") or [],
                "chunk_index": meta.get("chunk_index", 0),
                "total_chunks": meta.get("total_chunks", 1),
                "modified_at": meta.get("modified_at"),
                "rule_applied": meta.get("rule_applied", "default"),
            }
            data.append(row)
        self.table.merge_insert("id") \
            .when_matched_update_all() \
            .when_not_matched_insert_all() \
            .execute(data)

    def delete_by_source_file(self, source_file: str) -> None:
        # LanceDB filter expressions are SQL-like: string literals take single quotes
        self.table.delete(f"source_file = '{source_file}'")

    def search(
        self,
        query_vector: list[float],
        top_k: int = 8,
        filters: dict[str, Any] | None = None,
    ) -> list[dict[str, Any]]:
        query = self.table.search(np.array(query_vector, dtype=np.float32))
        if filters:
            expr_parts = []
            for key, value in filters.items():
                if isinstance(value, list):
                    quoted = [f"'{v}'" for v in value]
                    expr_parts.append(f"{key} IN ({', '.join(quoted)})")
                elif isinstance(value, str):
                    expr_parts.append(f"{key} = '{value}'")
                else:
                    expr_parts.append(f"{key} = {value}")
            if expr_parts:
                query = query.where(" AND ".join(expr_parts))
        results = query.limit(top_k).to_list()
        return results

    def count(self) -> int:
        return self.table.count_rows()
  • Step 3: Run test

Run:

pytest tests/test_vector_store.py -v

Expected: PASS.
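The `filters` dict passed to `VectorStore.search` is compiled into a SQL-like `where` expression. A standalone sketch of that compilation (the `build_filter_expr` helper is illustrative only; note that LanceDB's SQL dialect expects single quotes around string literals):

```python
from typing import Any


def build_filter_expr(filters: dict[str, Any]) -> str:
    # Standalone sketch of the expression building inside VectorStore.search.
    # Lists become IN clauses, strings get single-quoted, numbers pass through.
    parts = []
    for key, value in filters.items():
        if isinstance(value, list):
            quoted = ", ".join(f"'{v}'" for v in value)
            parts.append(f"{key} IN ({quoted})")
        elif isinstance(value, str):
            parts.append(f"{key} = '{value}'")
        else:
            parts.append(f"{key} = {value}")
    return " AND ".join(parts)


expr = build_filter_expr({
    "source_directory": "Journal",
    "chunk_index": 0,
    "date": ["2026-04-12", "2026-04-13"],
})
print(expr)
# source_directory = 'Journal' AND chunk_index = 0 AND date IN ('2026-04-12', '2026-04-13')
```

Values are interpolated directly into the expression, so for a local single-user tool this is acceptable; anything exposed to untrusted input would need escaping.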

  • Step 4: Commit
git add src/rag/vector_store.py tests/test_vector_store.py
git commit -m "feat: add LanceDB vector store with upsert, delete, and search"

Task 6: Indexer Orchestrator

Files:

  • Create: src/rag/indexer.py

  • Create: tests/test_indexer.py

  • Step 1: Write failing end-to-end indexer test

# tests/test_indexer.py
import tempfile
from pathlib import Path

from src.config import Config, VaultConfig, IndexingConfig, RagConfig, EmbeddingConfig, VectorStoreConfig, SearchConfig, HybridSearchConfig, FiltersConfig
from src.rag.indexer import Indexer
from src.rag.vector_store import VectorStore


def _make_config(vault_path: Path, vector_store_path: Path) -> Config:
    # Every top-level section of Config is declared required, so bypass
    # validation for the sections this test never exercises.
    return Config.model_construct(
        companion=None,  # not used
        vault=VaultConfig(
            path=str(vault_path),
            indexing=IndexingConfig(
                auto_sync=False,
                auto_sync_interval_minutes=1440,
                watch_fs_events=False,
                file_patterns=["*.md"],
                deny_dirs=[".git"],
                deny_patterns=[".*"],
            ),
            chunking_rules={},
        ),
        rag=RagConfig(
            embedding=EmbeddingConfig(
                provider="ollama",
                model="dummy",
                base_url="http://localhost:11434",
                dimensions=4,
                batch_size=2,
            ),
            vector_store=VectorStoreConfig(type="lancedb", path=str(vector_store_path)),
            search=SearchConfig(
                default_top_k=8,
                max_top_k=20,
                similarity_threshold=0.75,
                hybrid_search=HybridSearchConfig(enabled=False, keyword_weight=0.3, semantic_weight=0.7),
                filters=FiltersConfig(date_range_enabled=True, tag_filter_enabled=True, directory_filter_enabled=True),
            ),
        ),
        model=None,
        api=None,
        ui=None,
        logging=None,
        security=None,
    )


class _FakeEmbedder:
    """Deterministic stand-in so the test needs no live Ollama server."""

    def embed(self, texts, retries=3, backoff=1.0):
        return [[0.1, 0.2, 0.3, 0.4] for _ in texts]


def test_full_index_creates_vectors():
    with tempfile.TemporaryDirectory() as tmp:
        vault = Path(tmp) / "vault"
        vault.mkdir()
        (vault / "hello.md").write_text("hello world", encoding="utf-8")
        vs_path = Path(tmp) / "vectors"
        config = _make_config(vault, vs_path)
        store = VectorStore(uri=vs_path, dimensions=4)
        indexer = Indexer(config, store)
        indexer.embedder = _FakeEmbedder()  # swap in the stub before indexing
        indexer.full_index()
        assert store.count() == 1

Run:

pytest tests/test_indexer.py::test_full_index_creates_vectors -v

Expected: FAIL — src.rag.indexer not found.

  • Step 2: Implement src/rag/indexer.py
# src/rag/indexer.py
from __future__ import annotations

import fnmatch
import os
from pathlib import Path
from typing import Iterable

from src.config import Config
from src.rag.chunker import Chunk, ChunkingRule, chunk_file
from src.rag.embedder import OllamaEmbedder
from src.rag.vector_store import VectorStore


class Indexer:
    def __init__(self, config: Config, vector_store: VectorStore):
        self.config = config
        self.vector_store = vector_store
        self.embedder = OllamaEmbedder(
            base_url=config.rag.embedding.base_url,
            model=config.rag.embedding.model,
            batch_size=config.rag.embedding.batch_size,
        )
        self.vault_path = Path(config.vault.path)
        self._chunking_rules = self._load_chunking_rules()

    def _load_chunking_rules(self) -> dict[str, ChunkingRule]:
        rules = {"default": ChunkingRule(strategy="sliding_window", chunk_size=500, chunk_overlap=100)}
        for pattern, rule in self.config.vault.chunking_rules.items():
            rules[pattern] = ChunkingRule(
                strategy=rule.strategy,
                chunk_size=rule.chunk_size,
                chunk_overlap=rule.chunk_overlap,
                section_tags=rule.section_tags,
            )
        return rules

    def _should_index(self, relative_path: str) -> bool:
        parts = relative_path.split("/")
        for deny in self.config.vault.indexing.deny_dirs:
            if deny in parts:
                return False
        for pattern in self.config.vault.indexing.deny_patterns:
            if fnmatch.fnmatch(Path(relative_path).name, pattern):
                return False
        matched = False
        for pattern in self.config.vault.indexing.file_patterns:
            if fnmatch.fnmatch(Path(relative_path).name, pattern):
                matched = True
                break
        return matched

    def _list_files(self) -> Iterable[Path]:
        for root, dirs, files in os.walk(self.vault_path):
            # prune denied dirs
            dirs[:] = [d for d in dirs if d not in self.config.vault.indexing.deny_dirs]
            for f in files:
                file_path = Path(root) / f
                relative = str(file_path.relative_to(self.vault_path)).replace("\\", "/")
                if self._should_index(relative):
                    yield file_path

    def full_index(self) -> None:
        # Clear existing data for simplicity in full reindex
        try:
            self.vector_store.db.drop_table(VectorStore.TABLE_NAME)
        except Exception:
            pass
        self.vector_store.table = self.vector_store._get_or_create_table()
        self._index_files(self._list_files())

    def sync(self) -> None:
        files_to_process = []
        for file_path in self._list_files():
            relative = str(file_path.relative_to(self.vault_path)).replace("\\", "/")
            mtime = file_path.stat().st_mtime
            # Check if already indexed with the same mtime.
            # LanceDB filters are SQL, so string literals take single quotes
            # (double quotes would be parsed as a column identifier).
            existing = self.vector_store.table.search().where(f"source_file = '{relative}'").limit(1).to_list()
            if not existing or existing[0].get("modified_at") != mtime:
                files_to_process.append(file_path)
        # Delete old entries for files being reprocessed
        for file_path in files_to_process:
            relative = str(file_path.relative_to(self.vault_path)).replace("\\", "/")
            self.vector_store.delete_by_source_file(relative)
        self._index_files(files_to_process)

    def _index_files(self, file_paths: Iterable[Path]) -> None:
        all_chunks: list[Chunk] = []
        for file_path in file_paths:
            mtime = file_path.stat().st_mtime
            chunks = chunk_file(file_path, self.vault_path, self._chunking_rules, modified_at=mtime)
            all_chunks.extend(chunks)
        if not all_chunks:
            return
        texts = [c.text for c in all_chunks]
        embeddings = self.embedder.embed(texts)
        ids = [f"{c.source_file}::{c.chunk_index}" for c in all_chunks]
        metadatas = [
            {
                "source_file": c.source_file,
                "source_directory": c.source_directory,
                "section": c.section,
                "date": c.date,
                "tags": c.tags,
                "chunk_index": c.chunk_index,
                "total_chunks": c.total_chunks,
                "modified_at": c.modified_at,
                "rule_applied": c.rule_applied,
            }
            for c in all_chunks
        ]
        self.vector_store.upsert(ids=ids, texts=texts, embeddings=embeddings, metadatas=metadatas)

    def status(self) -> dict:
        total_docs = self.vector_store.count()
        indexed_files = set()
        try:
            results = self.vector_store.table.to_lance().to_table(columns=["source_file", "modified_at"]).to_pylist()
            for row in results:
                indexed_files.add((row["source_file"], row.get("modified_at")))
        except Exception:
            pass
        unindexed = 0
        for file_path in self._list_files():
            relative = str(file_path.relative_to(self.vault_path)).replace("\\", "/")
            mtime = file_path.stat().st_mtime
            if (relative, mtime) not in indexed_files:
                unindexed += 1
        return {
            "total_chunks": total_docs,
            "indexed_files": len(indexed_files),
            "unindexed_files": unindexed,
        }
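`sync()` above reprocesses a file only when its current `st_mtime` differs from the stored `modified_at`. A standalone sketch of that decision (the `partition_changed` helper is illustrative only, not part of the plan's API):

```python
import tempfile
from pathlib import Path

def partition_changed(files: list[Path], indexed_mtimes: dict[str, float], root: Path):
    """Split files into (changed, unchanged) by comparing st_mtime to the stored value."""
    changed, unchanged = [], []
    for f in files:
        rel = str(f.relative_to(root)).replace("\\", "/")
        if indexed_mtimes.get(rel) != f.stat().st_mtime:
            changed.append(f)  # new file, or mtime drifted since last index
        else:
            unchanged.append(f)
    return changed, unchanged

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    a = root / "a.md"
    a.write_text("alpha")
    b = root / "b.md"
    b.write_text("beta")
    stored = {"a.md": a.stat().st_mtime}  # a.md indexed at its current mtime; b.md never indexed
    changed, unchanged = partition_changed([a, b], stored, root)
    print([p.name for p in changed])  # ['b.md']
```

Note the exact float comparison: any rewrite of a file, even with identical content, changes its mtime and triggers a reindex, which matches the plan's delete-then-reinsert sync flow.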

Note: the test above passes `companion=None`, but every top-level section of `Config` (`companion`, `vault`, `rag`, `model`, `api`, `ui`, `logging`, `security`) is a required Pydantic field, so validation would reject it; the helper must build a full minimal `Config`. There is a second problem: the indexer instantiates `OllamaEmbedder` and calls `embed()`, which would hit a live Ollama server, so the test must patch `OllamaEmbedder`.

Revised test, with a complete config helper and a mocked embedder:

# tests/test_indexer.py (revised)
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch

from src.config import Config, VaultConfig, IndexingConfig, RagConfig, EmbeddingConfig, VectorStoreConfig, SearchConfig, HybridSearchConfig, FiltersConfig
from src.config import (
    CompanionConfig, PersonaConfig, MemoryConfig, ChatConfig,
    ModelConfig, InferenceConfig, FineTuningConfig, RetrainScheduleConfig,
    ApiConfig, AuthConfig, UiConfig, WebConfig, WebFeaturesConfig, CliConfig,
    LoggingConfig, SecurityConfig,
)
from src.rag.indexer import Indexer
from src.rag.vector_store import VectorStore


def _make_config(vault_path: Path, vector_store_path: Path) -> Config:
    return Config(
        companion=CompanionConfig(
            name="SAN",
            persona=PersonaConfig(role="companion", tone="reflective", style="questioning", boundaries=[]),
            memory=MemoryConfig(session_turns=20, persistent_store="", summarize_after=10),
            chat=ChatConfig(streaming=True, max_response_tokens=2048, default_temperature=0.7, allow_temperature_override=True),
        ),
        vault=VaultConfig(
            path=str(vault_path),
            indexing=IndexingConfig(
                auto_sync=False,
                auto_sync_interval_minutes=1440,
                watch_fs_events=False,
                file_patterns=["*.md"],
                deny_dirs=[".git"],
                deny_patterns=[".*"],
            ),
            chunking_rules={},
        ),
        rag=RagConfig(
            embedding=EmbeddingConfig(
                provider="ollama",
                model="dummy",
                base_url="http://localhost:11434",
                dimensions=4,
                batch_size=2,
            ),
            vector_store=VectorStoreConfig(type="lancedb", path=str(vector_store_path)),
            search=SearchConfig(
                default_top_k=8,
                max_top_k=20,
                similarity_threshold=0.75,
                hybrid_search=HybridSearchConfig(enabled=False, keyword_weight=0.3, semantic_weight=0.7),
                filters=FiltersConfig(date_range_enabled=True, tag_filter_enabled=True, directory_filter_enabled=True),
            ),
        ),
        model=ModelConfig(
            inference=InferenceConfig(backend="llama.cpp", model_path="", context_length=8192, gpu_layers=35, batch_size=512, threads=8),
            fine_tuning=FineTuningConfig(base_model="", output_dir="", lora_rank=16, lora_alpha=32, learning_rate=0.0002, batch_size=4, gradient_accumulation_steps=4, num_epochs=3, warmup_steps=100, save_steps=500, eval_steps=250, training_data_path="", validation_split=0.1),
            retrain_schedule=RetrainScheduleConfig(auto_reminder=True, default_interval_days=90, reminder_channels=[]),
        ),
        api=ApiConfig(host="127.0.0.1", port=7373, cors_origins=[], auth=AuthConfig(enabled=False)),
        ui=UiConfig(web=WebConfig(enabled=True, theme="obsidian", features=WebFeaturesConfig(streaming=True, citations=True, source_preview=True)), cli=CliConfig(enabled=True, rich_output=True)),
        logging=LoggingConfig(level="INFO", file="", max_size_mb=100, backup_count=5),
        security=SecurityConfig(local_only=True, vault_path_traversal_check=True, sensitive_content_detection=True, sensitive_patterns=[], require_confirmation_for_external_apis=True),
    )


@patch("src.rag.indexer.OllamaEmbedder")
def test_full_index_creates_vectors(mock_embedder_cls):
    mock_embedder = MagicMock()
    mock_embedder.embed.return_value = [[1.0, 0.0, 0.0, 0.0]]
    mock_embedder_cls.return_value = mock_embedder

    with tempfile.TemporaryDirectory() as tmp:
        vault = Path(tmp) / "vault"
        vault.mkdir()
        (vault / "hello.md").write_text("hello world", encoding="utf-8")
        vs_path = Path(tmp) / "vectors"
        config = _make_config(vault, vs_path)
        store = VectorStore(uri=vs_path, dimensions=4)
        indexer = Indexer(config, store)
        indexer.full_index()
        assert store.count() == 1
  • Step 3: Write the indexer and revised test

Write src/rag/indexer.py and tests/test_indexer.py with the code above.

  • Step 4: Run test

Run:

pytest tests/test_indexer.py -v

Expected: PASS.

  • Step 5: Commit
git add src/rag/indexer.py tests/test_indexer.py
git commit -m "feat: add indexer orchestrator with full index, sync, and status"

Task 7: Search Interface

Files:

  • Create: src/rag/search.py

  • Create: tests/test_search.py

  • Step 1: Write failing search test

# tests/test_search.py
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch

from src.rag.search import SearchEngine
from src.rag.vector_store import VectorStore


@patch("src.rag.search.OllamaEmbedder")
def test_search_returns_results(mock_embedder_cls):
    mock_embedder = MagicMock()
    mock_embedder.embed.return_value = [[1.0, 0.0, 0.0, 0.0]]
    mock_embedder_cls.return_value = mock_embedder

    with tempfile.TemporaryDirectory() as tmp:
        store = VectorStore(uri=tmp, dimensions=4)
        store.upsert(
            ids=["a"],
            texts=["hello world"],
            embeddings=[[1.0, 0.0, 0.0, 0.0]],
            metadatas=[{"source_file": "a.md", "source_directory": "docs"}],
        )
        engine = SearchEngine(
            vector_store=store,
            embedder_base_url="http://localhost:11434",
            embedder_model="dummy",
            default_top_k=5,
            similarity_threshold=0.0,
            hybrid_search_enabled=False,
        )
        results = engine.search("hello")
        assert len(results) == 1
        assert results[0]["source_file"] == "a.md"

Run:

pytest tests/test_search.py -v

Expected: FAIL — src.rag.search not found.

  • Step 2: Implement src/rag/search.py
# src/rag/search.py
from __future__ import annotations

from typing import Any

from src.rag.embedder import OllamaEmbedder
from src.rag.vector_store import VectorStore


class SearchEngine:
    def __init__(
        self,
        vector_store: VectorStore,
        embedder_base_url: str,
        embedder_model: str,
        default_top_k: int,
        similarity_threshold: float,
        hybrid_search_enabled: bool,
        keyword_weight: float = 0.3,
        semantic_weight: float = 0.7,
    ):
        self.vector_store = vector_store
        self.embedder = OllamaEmbedder(base_url=embedder_base_url, model=embedder_model)
        self.default_top_k = default_top_k
        self.similarity_threshold = similarity_threshold
        self.hybrid_search_enabled = hybrid_search_enabled
        self.keyword_weight = keyword_weight
        self.semantic_weight = semantic_weight

    def search(
        self,
        query: str,
        top_k: int | None = None,
        filters: dict[str, Any] | None = None,
    ) -> list[dict[str, Any]]:
        k = top_k or self.default_top_k
        query_embedding = self.embedder.embed([query])[0]
        results = self.vector_store.search(query_embedding, top_k=k, filters=filters)
        if self.similarity_threshold > 0 and results:
            # Config's similarity_threshold is a similarity (higher = closer), while
            # LanceDB reports `_distance` (lower = closer). Assuming the cosine metric,
            # similarity = 1 - _distance, so keep rows at or above the threshold.
            results = [
                r for r in results
                if 1.0 - r.get("_distance", float("inf")) >= self.similarity_threshold
            ]
        return results
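A quick sanity check of the similarity/distance relationship behind the threshold: assuming the table is searched with the cosine metric, `_distance` is `1 - cosine_similarity`, so a similarity threshold of 0.75 corresponds to a distance cutoff of 0.25 (plain-Python sketch, no LanceDB API involved):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

query = [1.0, 0.0, 0.0, 0.0]
near = [0.9, 0.1, 0.0, 0.0]  # nearly parallel: similarity ~0.994
far = [0.0, 1.0, 0.0, 0.0]   # orthogonal: similarity 0.0

for vec in (near, far):
    sim = cosine_similarity(query, vec)
    dist = 1.0 - sim  # what a cosine-metric search would report as `_distance`
    print(f"sim={sim:.3f} dist={dist:.3f} passes_0.75_threshold={sim >= 0.75}")
```

Filtering on raw `_distance <= 0.75` would instead keep anything with similarity down to 0.25, which is why the threshold must be converted rather than compared directly.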
  • Step 3: Run test

Run:

pytest tests/test_search.py -v

Expected: PASS.

  • Step 4: Commit
git add src/rag/search.py tests/test_search.py
git commit -m "feat: add search engine interface with embedding and filtering"

Task 8: Indexer CLI

Files:

  • Create: src/indexer_daemon/cli.py

  • Step 1: Implement CLI without test (simple Typer app)

# src/indexer_daemon/cli.py
from __future__ import annotations

from pathlib import Path

import typer

from src.config import load_config
from src.rag.indexer import Indexer
from src.rag.vector_store import VectorStore

app = typer.Typer(help="Companion vault indexer")


def _get_indexer() -> Indexer:
    config = load_config("config.json")
    store = VectorStore(uri=config.rag.vector_store.path, dimensions=config.rag.embedding.dimensions)
    return Indexer(config, store)


@app.command()
def index() -> None:
    """Run a full index of the vault."""
    indexer = _get_indexer()
    typer.echo("Running full index...")
    indexer.full_index()
    typer.echo(f"Done. Total chunks: {indexer.status()['total_chunks']}")


@app.command()
def sync() -> None:
    """Run an incremental sync."""
    indexer = _get_indexer()
    typer.echo("Running incremental sync...")
    indexer.sync()
    typer.echo(f"Done. Total chunks: {indexer.status()['total_chunks']}")


@app.command()
def reindex() -> None:
    """Force a full reindex (same as index)."""
    index()


@app.command()
def status() -> None:
    """Show indexer status."""
    indexer = _get_indexer()
    s = indexer.status()
    typer.echo(f"Total chunks: {s['total_chunks']}")
    typer.echo(f"Indexed files: {s['indexed_files']}")
    typer.echo(f"Unindexed files: {s['unindexed_files']}")


if __name__ == "__main__":
    app()
  • Step 2: Verify CLI loads

Run:

python -m src.indexer_daemon.cli --help

Expected: Shows Typer help with index, sync, reindex, status commands.

  • Step 3: Commit
git add src/indexer_daemon/cli.py
git commit -m "feat: add indexer CLI with index, sync, reindex, status commands"

Task 9: File System Watcher

Files:

  • Create: src/indexer_daemon/watcher.py

  • Step 1: Implement watcher without test (watchdog integration)

# src/indexer_daemon/watcher.py
from __future__ import annotations

import time
from pathlib import Path

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

from src.config import load_config
from src.rag.indexer import Indexer
from src.rag.vector_store import VectorStore


class VaultEventHandler(FileSystemEventHandler):
    def __init__(self, indexer: Indexer, debounce_seconds: float = 5.0):
        self.indexer = indexer
        self.debounce_seconds = debounce_seconds
        self._last_sync = 0.0

    def on_any_event(self, event):
        if event.is_directory:
            return
        if not event.src_path.endswith(".md"):
            return
        now = time.time()
        # Leading-edge throttle: at most one sync per window; events arriving
        # inside the window are dropped rather than deferred.
        if now - self._last_sync < self.debounce_seconds:
            return
        self._last_sync = now
        try:
            self.indexer.sync()
        except Exception as exc:
            print(f"Sync failed: {exc}")


def start_watcher(config_path: str = "config.json") -> None:
    config = load_config(config_path)
    store = VectorStore(uri=config.rag.vector_store.path, dimensions=config.rag.embedding.dimensions)
    indexer = Indexer(config, store)
    handler = VaultEventHandler(indexer)
    observer = Observer()
    observer.schedule(handler, str(config.vault.path), recursive=True)
    observer.start()
    print(f"Watching {config.vault.path} for changes...")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()


if __name__ == "__main__":
    start_watcher()
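The handler above is strictly a throttle: events arriving inside the window are dropped, so a burst of edits that ends within `debounce_seconds` can leave the final change unsynced until the next event. If that matters, a trailing-edge debounce fires once after the burst goes quiet. A sketch using `threading.Timer` (the `Debouncer` helper is hypothetical, not part of the plan):

```python
import threading
import time

class Debouncer:
    """Run `fn` once, `delay` seconds after the last trigger (trailing edge)."""

    def __init__(self, fn, delay: float):
        self.fn = fn
        self.delay = delay
        self._timer: threading.Timer | None = None
        self._lock = threading.Lock()

    def trigger(self) -> None:
        with self._lock:
            if self._timer is not None:
                self._timer.cancel()  # restart the countdown on every new event
            self._timer = threading.Timer(self.delay, self.fn)
            self._timer.daemon = True
            self._timer.start()

calls = []
debouncer = Debouncer(lambda: calls.append(time.time()), delay=0.25)
for _ in range(5):  # a burst of five rapid events...
    debouncer.trigger()
    time.sleep(0.02)
time.sleep(0.6)     # ...settles into exactly one call after the burst
print(len(calls))   # 1
```

With this variant, `on_any_event` would call `debouncer.trigger()` instead of running `sync()` inline, so the last change in a burst is never lost.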
  • Step 2: Verify watcher module imports cleanly

Run:

python -c "from src.indexer_daemon.watcher import start_watcher; print('OK')"

Expected: Prints OK.

  • Step 3: Commit
git add src/indexer_daemon/watcher.py
git commit -m "feat: add vault file system watcher with throttled sync"

Task 10: Integration Test — End-to-End Sync

Files:

  • Create: tests/test_integration.py

  • Step 1: Write integration test

# tests/test_integration.py
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch

from src.config import Config, VaultConfig, IndexingConfig, RagConfig, EmbeddingConfig, VectorStoreConfig, SearchConfig, HybridSearchConfig, FiltersConfig
from src.config import (
    CompanionConfig, PersonaConfig, MemoryConfig, ChatConfig,
    ModelConfig, InferenceConfig, FineTuningConfig, RetrainScheduleConfig,
    ApiConfig, AuthConfig, UiConfig, WebConfig, WebFeaturesConfig, CliConfig,
    LoggingConfig, SecurityConfig,
)
from src.rag.indexer import Indexer
from src.rag.search import SearchEngine
from src.rag.vector_store import VectorStore


def _make_config(vault_path: Path, vector_store_path: Path) -> Config:
    return Config(
        companion=CompanionConfig(
            name="SAN",
            persona=PersonaConfig(role="companion", tone="reflective", style="questioning", boundaries=[]),
            memory=MemoryConfig(session_turns=20, persistent_store="", summarize_after=10),
            chat=ChatConfig(streaming=True, max_response_tokens=2048, default_temperature=0.7, allow_temperature_override=True),
        ),
        vault=VaultConfig(
            path=str(vault_path),
            indexing=IndexingConfig(
                auto_sync=False,
                auto_sync_interval_minutes=1440,
                watch_fs_events=False,
                file_patterns=["*.md"],
                deny_dirs=[".git"],
                deny_patterns=[".*"],
            ),
            chunking_rules={},
        ),
        rag=RagConfig(
            embedding=EmbeddingConfig(
                provider="ollama",
                model="dummy",
                base_url="http://localhost:11434",
                dimensions=4,
                batch_size=2,
            ),
            vector_store=VectorStoreConfig(type="lancedb", path=str(vector_store_path)),
            search=SearchConfig(
                default_top_k=8,
                max_top_k=20,
                similarity_threshold=0.0,
                hybrid_search=HybridSearchConfig(enabled=False, keyword_weight=0.3, semantic_weight=0.7),
                filters=FiltersConfig(date_range_enabled=True, tag_filter_enabled=True, directory_filter_enabled=True),
            ),
        ),
        model=ModelConfig(
            inference=InferenceConfig(backend="llama.cpp", model_path="", context_length=8192, gpu_layers=35, batch_size=512, threads=8),
            fine_tuning=FineTuningConfig(base_model="", output_dir="", lora_rank=16, lora_alpha=32, learning_rate=0.0002, batch_size=4, gradient_accumulation_steps=4, num_epochs=3, warmup_steps=100, save_steps=500, eval_steps=250, training_data_path="", validation_split=0.1),
            retrain_schedule=RetrainScheduleConfig(auto_reminder=True, default_interval_days=90, reminder_channels=[]),
        ),
        api=ApiConfig(host="127.0.0.1", port=7373, cors_origins=[], auth=AuthConfig(enabled=False)),
        ui=UiConfig(web=WebConfig(enabled=True, theme="obsidian", features=WebFeaturesConfig(streaming=True, citations=True, source_preview=True)), cli=CliConfig(enabled=True, rich_output=True)),
        logging=LoggingConfig(level="INFO", file="", max_size_mb=100, backup_count=5),
        security=SecurityConfig(local_only=True, vault_path_traversal_check=True, sensitive_content_detection=True, sensitive_patterns=[], require_confirmation_for_external_apis=True),
    )


@patch("src.rag.search.OllamaEmbedder")
@patch("src.rag.indexer.OllamaEmbedder")
def test_index_and_search_flow(mock_indexer_embedder, mock_search_embedder):
    mock_embed = MagicMock()
    mock_embed.embed.return_value = [[1.0, 0.0, 0.0, 0.0], [0.0, 1.0, 0.0, 0.0]]
    mock_indexer_embedder.return_value = mock_embed
    mock_search_embedder.return_value = mock_embed

    with tempfile.TemporaryDirectory() as tmp:
        vault = Path(tmp) / "vault"
        vault.mkdir()
        (vault / "note1.md").write_text("hello world", encoding="utf-8")
        (vault / "note2.md").write_text("goodbye world", encoding="utf-8")
        vs_path = Path(tmp) / "vectors"
        config = _make_config(vault, vs_path)
        store = VectorStore(uri=vs_path, dimensions=4)
        indexer = Indexer(config, store)
        indexer.full_index()
        assert store.count() == 2

        engine = SearchEngine(
            vector_store=store,
            embedder_base_url="http://localhost:11434",
            embedder_model="dummy",
            default_top_k=5,
            similarity_threshold=0.0,
            hybrid_search_enabled=False,
        )
        results = engine.search("hello")
        assert len(results) >= 1
        files = {r["source_file"] for r in results}
        assert "note1.md" in files
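Note the argument order in the test above: stacked `@patch` decorators apply bottom-up, so the decorator nearest the function (`src.rag.indexer.OllamaEmbedder`) supplies the first mock argument. A minimal demonstration of that ordering, using stdlib targets:

```python
import os
from unittest.mock import patch

@patch("os.getcwd", return_value="CWD")  # outer decorator -> second mock argument
@patch("os.getpid", return_value=42)     # inner decorator (nearest) -> first mock argument
def demo(mock_getpid, mock_getcwd):
    # The names line up with their patches only because decorators apply bottom-up.
    assert mock_getpid() == 42 and mock_getcwd() == "CWD"
    return os.getpid(), os.getcwd()

print(demo())  # (42, 'CWD')
```

Swapping the parameter names would silently hand each test the wrong mock, so it is worth double-checking whenever two embedder patches are stacked as in the integration test.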
  • Step 2: Run integration test

Run:

pytest tests/test_integration.py -v

Expected: PASS.

  • Step 3: Commit
git add tests/test_integration.py
git commit -m "test: add end-to-end integration test for index and search"

Plan Summary

This plan delivers a working Vault Indexer + RAG Engine with:

  • Typed config loading with tilde expansion
  • Markdown chunking (sliding window + section-based, per-directory rules)
  • Ollama embedder with batching and retries
  • LanceDB vector store with upsert, delete, search
  • Full and incremental indexing with status tracking
  • CLI commands: index, sync, reindex, status
  • File system watcher with debounced auto-sync
  • Search engine interface for query embedding + filtering
  • Full test coverage for chunker, embedder, vector store, indexer, search, and integration

Spec coverage check:

  • Config schema → Task 2
  • Per-directory chunking rules → Task 3
  • Ollama embeddings → Task 4
  • LanceDB vector store → Task 5
  • Full/sync/reindex/status indexing modes → Tasks 5, 6, 8
  • File system watcher → Task 9
  • Search with filters → Task 7
  • Security (deny_dirs, deny_patterns) → Task 6

No placeholders found.

Type consistency verified: VectorStore.TABLE_NAME, ChunkingRule, OllamaEmbedder.embed signatures match across tasks.


Execution Handoff

Plan complete and saved to docs/superpowers/plans/2026-04-13-personal-companion-ai-phase1.md. Two execution options:

1. Subagent-Driven (recommended) — I dispatch a fresh subagent per task, review between tasks, fast iteration

2. Inline Execution — Execute tasks in this session using executing-plans, batch execution with checkpoints for review

Which approach?