feat: add indexer orchestrator with full index, sync, and status

This commit is contained in:
2026-04-13 14:24:33 -04:00
parent ec69aa6f8d
commit ce0414a5ce
2 changed files with 317 additions and 0 deletions

View File

@@ -0,0 +1,162 @@
import fnmatch
import os
from pathlib import Path
from typing import Dict, Iterator, List
from companion.config import Config
from companion.rag.chunker import Chunk, ChunkingRule, chunk_file
from companion.rag.embedder import OllamaEmbedder
from companion.rag.vector_store import VectorStore
class Indexer:
    """Orchestrates vault indexing: discovers files, chunks and embeds them,
    and keeps the vector store in sync with the files on disk."""

    def __init__(self, config: Config, vector_store: VectorStore):
        """Set up paths, per-pattern chunking rules, and the embedder.

        Args:
            config: Application configuration (vault path, indexing rules,
                embedding settings).
            vector_store: Destination store for embedded chunks.
        """
        self.config = config
        self.vector_store = vector_store
        self.vault_path = Path(config.vault.path).resolve()
        self.embedding_config = config.rag.embedding
        self.indexing_config = config.vault.indexing
        self.chunking_rules = self._load_chunking_rules()
        self.embedder = OllamaEmbedder(
            base_url=self.embedding_config.base_url,
            model=self.embedding_config.model,
            batch_size=self.embedding_config.batch_size,
        )

    def _load_chunking_rules(self) -> Dict[str, ChunkingRule]:
        """Build ChunkingRule objects from config, keyed by glob pattern."""
        return {
            pattern: ChunkingRule(
                strategy=rule.strategy,
                chunk_size=rule.chunk_size,
                chunk_overlap=rule.chunk_overlap,
                # Normalize empty/missing tag lists to None.
                section_tags=rule.section_tags or None,
            )
            for pattern, rule in self.config.vault.chunking_rules.items()
        }

    def _should_index(self, relative_path: str) -> bool:
        """Return True if the vault-relative path passes the deny lists and
        matches at least one configured file pattern.

        Deny rules win over allow patterns. Deny patterns are matched against
        both the full relative path and the bare file name.
        """
        path = Path(relative_path)
        # Reject anything located inside a denied directory.
        if any(deny_dir in path.parts for deny_dir in self.indexing_config.deny_dirs):
            return False
        for pattern in self.indexing_config.deny_patterns:
            if fnmatch.fnmatch(relative_path, pattern) or fnmatch.fnmatch(
                path.name, pattern
            ):
                return False
        # Must match an allow pattern (by file name) to be indexed.
        return any(
            fnmatch.fnmatch(path.name, pattern)
            for pattern in self.indexing_config.file_patterns
        )

    def _list_files(self) -> Iterator[Path]:
        """Yield absolute paths of all indexable files under the vault root."""
        for root, _dirs, files in os.walk(self.vault_path):
            for file_name in files:
                file_path = Path(root) / file_name
                try:
                    relative_path = file_path.relative_to(self.vault_path).as_posix()
                except ValueError:
                    # Path does not live under the vault root; skip it.
                    continue
                if self._should_index(relative_path):
                    yield file_path

    def _index_files(self, file_paths: List[Path]) -> None:
        """Chunk, embed, and upsert the given files into the vector store.

        Files that vanish between discovery and processing are skipped
        instead of aborting the whole batch.
        """
        all_chunks: List[Chunk] = []
        for file_path in file_paths:
            try:
                modified_at = file_path.stat().st_mtime
            except OSError:
                # File was deleted or moved after being listed; skip it.
                continue
            all_chunks.extend(
                chunk_file(
                    file_path=file_path,
                    vault_root=self.vault_path,
                    rules=self.chunking_rules,
                    modified_at=modified_at,
                )
            )
        if not all_chunks:
            return
        texts = [chunk.text for chunk in all_chunks]
        embeddings = self.embedder.embed(texts)
        # Deterministic IDs so re-indexing a file overwrites its old chunks.
        ids = [f"{chunk.source_file}::{chunk.chunk_index}" for chunk in all_chunks]
        metadatas = [
            {
                "source_file": chunk.source_file,
                "source_directory": chunk.source_directory,
                "section": chunk.section,
                "date": chunk.date,
                "tags": chunk.tags,
                "chunk_index": chunk.chunk_index,
                "total_chunks": chunk.total_chunks,
                "modified_at": chunk.modified_at,
                "rule_applied": chunk.rule_applied,
            }
            for chunk in all_chunks
        ]
        self.vector_store.upsert(
            ids=ids,
            texts=texts,
            embeddings=embeddings,
            metadatas=metadatas,
        )

    def full_index(self) -> None:
        """Rebuild the index from scratch: drop the table, recreate it, then
        index every indexable file in the vault."""
        try:
            # Drop may fail if the table does not exist yet; best-effort.
            self.vector_store.table.drop()
        except Exception:
            pass
        # NOTE(review): reaches into VectorStore's private helper to recreate
        # the table — consider exposing a public reset() on VectorStore.
        self.vector_store.table = self.vector_store._get_or_create_table()
        self._index_files(list(self._list_files()))

    def sync(self) -> None:
        """Incrementally re-index files whose on-disk mtime is newer than the
        mtime recorded in the store; unchanged files are left alone.

        NOTE(review): files removed from the vault are not purged from the
        store here — only new and modified files are handled.
        """
        file_paths_to_index = []
        for file_path in self._list_files():
            relative_path = file_path.relative_to(self.vault_path).as_posix()
            modified_at = file_path.stat().st_mtime
            # Escape single quotes so paths like "don't.md" neither break nor
            # inject into the SQL-style filter expression.
            escaped_path = relative_path.replace("'", "''")
            results = (
                self.vector_store.table.search()
                .limit(1)
                .where(f"source_file = '{escaped_path}'")
                .to_list()
            )
            needs_index = True
            if results:
                existing_modified_at = results[0].get("modified_at")
                if (
                    existing_modified_at is not None
                    and existing_modified_at >= modified_at
                ):
                    needs_index = False
            if needs_index:
                file_paths_to_index.append(file_path)
                # Remove stale chunks before re-inserting the file's chunks.
                self.vector_store.delete_by_source_file(relative_path)
        self._index_files(file_paths_to_index)

    def status(self) -> Dict[str, int]:
        """Summarize index coverage.

        Returns:
            A dict with the total chunk count, the number of distinct files
            present in the store, and the number of on-disk indexable files
            not yet in the store.
        """
        total_chunks = self.vector_store.count()
        indexed_files = set(
            self.vector_store.table.to_lance().to_table().to_pydict()["source_file"]
        )
        all_files = {
            file_path.relative_to(self.vault_path).as_posix()
            for file_path in self._list_files()
        }
        return {
            "total_chunks": total_chunks,
            "indexed_files": len(indexed_files),
            "unindexed_files": len(all_files - indexed_files),
        }