From fc9f975fffafe29e6514ec02ff507307659e6a6a Mon Sep 17 00:00:00 2001
From: Santhosh Janardhanan
Date: Mon, 13 Apr 2026 14:06:37 -0400
Subject: [PATCH] feat: add markdown chunker with sliding window and section
 strategies

---
 src/companion/rag/__init__.py |   0
 src/companion/rag/chunker.py  | 176 ++++++++++++++++++++++++++++++++++
 tests/test_chunker.py         |  41 +++++++++
 3 files changed, 217 insertions(+)
 create mode 100644 src/companion/rag/__init__.py
 create mode 100644 src/companion/rag/chunker.py
 create mode 100644 tests/test_chunker.py

diff --git a/src/companion/rag/__init__.py b/src/companion/rag/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/companion/rag/chunker.py b/src/companion/rag/chunker.py
new file mode 100644
index 0000000..0a98436
--- /dev/null
+++ b/src/companion/rag/chunker.py
@@ -0,0 +1,176 @@
+"""Markdown chunking for RAG ingestion: sliding-window and section strategies."""
+
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import List, Optional, Dict
+import re
+import fnmatch
+
+
+@dataclass
+class Chunk:
+    """One retrieval chunk plus the metadata used for filtering and citation."""
+    text: str
+    source_file: str = ""
+    source_directory: str = ""
+    section: str = ""
+    date: str = ""
+    tags: List[str] = field(default_factory=list)
+    chunk_index: int = 0
+    total_chunks: int = 0
+    modified_at: float = 0.0
+    rule_applied: str = ""
+
+
+@dataclass
+class ChunkingRule:
+    """Chunking config for one path pattern: "sliding_window" or "section" strategy."""
+    strategy: str
+    chunk_size: int
+    chunk_overlap: int
+    section_tags: Optional[List[str]] = None
+
+
+def sliding_window_chunks(text: str, chunk_size: int, chunk_overlap: int) -> List[str]:
+    """Split text into word windows of chunk_size words, overlapping by chunk_overlap."""
+    words = text.split()
+    if not words:
+        return []
+
+    chunks = []
+    step = max(1, chunk_size - chunk_overlap)  # overlap >= size would make step <= 0
+    for i in range(0, len(words), step):
+        chunk_words = words[i : i + chunk_size]
+        chunks.append(" ".join(chunk_words))
+        if i + chunk_size >= len(words):
+            break
+
+    return chunks
+
+
+def section_based_chunks(
+    text: str, section_tags: List[str], chunk_size: int, chunk_overlap: int
+) -> List[tuple]:
+    """Split text into (chunk_text, tag) pairs, one group per section_tags heading."""
+    lines = text.split("\n")
+    sections = []
+    current_tag = None
+    current_text = []
+
+    for line in lines:
+        stripped = line.strip()
+        matched_tag = None
+        content = ""
+        for tag in section_tags:
+            if stripped.startswith(tag + ":"):
+                matched_tag = tag
+                content = stripped[len(tag) + 1 :].strip()
+                break
+            elif stripped.startswith(tag):
+                matched_tag = tag
+                remainder = stripped[len(tag) :].strip()
+                if remainder.startswith(":"):
+                    remainder = remainder[1:].strip()
+                content = remainder
+                break
+
+        if matched_tag:
+            if current_tag is not None:
+                sections.append((current_tag, " ".join(current_text)))
+            current_tag = matched_tag
+            current_text = [content]
+        else:
+            if current_tag is not None:
+                current_text.append(stripped)
+
+    if current_tag is not None:
+        sections.append((current_tag, " ".join(current_text)))
+
+    if not sections:
+        return []
+
+    result = []
+    for tag, section_text in sections:
+        sub_chunks = sliding_window_chunks(section_text, chunk_size, chunk_overlap)
+        for sub_chunk in sub_chunks:
+            result.append((sub_chunk, tag))
+
+    return result
+
+
+def match_chunking_rule(
+    relative_path: str, rules: Dict[str, ChunkingRule]
+) -> Optional[ChunkingRule]:
+    """Return the first non-default rule whose pattern matches, else the default."""
+    for pattern, rule in rules.items():
+        if pattern == "default":
+            continue
+        if fnmatch.fnmatch(relative_path, pattern):
+            return rule
+    return rules.get("default")
+
+
+def chunk_file(
+    file_path: Path,
+    vault_root: Path,
+    rules: Dict[str, ChunkingRule],
+    modified_at: float,
+) -> List[Chunk]:
+    """Chunk one file under vault_root into Chunk objects per its matched rule."""
+    relative_path = file_path.relative_to(vault_root).as_posix()
+    rule = match_chunking_rule(relative_path, rules)
+
+    text = file_path.read_text(encoding="utf-8")
+
+    date_match = re.search(r"(\d{4}-\d{2}-\d{2})", file_path.stem)
+    date = date_match.group(1) if date_match else ""
+
+    source_directory = relative_path.split("/")[0] if "/" in relative_path else ""
+    source_file = relative_path
+
+    if rule.strategy == "section" and rule.section_tags:
+        raw_chunks = section_based_chunks(
+            text, rule.section_tags, rule.chunk_size, rule.chunk_overlap
+        )
+        chunks = []
+        for i, (chunk_text, section) in enumerate(raw_chunks):
+            section_hashtags = re.findall(r"#\w+", chunk_text)
+            section_wikilinks = re.findall(r"\[\[(.*?)\]\]", chunk_text)
+            section_tags = sorted(set(section_hashtags + section_wikilinks))
+
+            chunk = Chunk(
+                text=chunk_text,
+                source_file=source_file,
+                source_directory=source_directory,
+                section=section,
+                date=date,
+                tags=section_tags,
+                chunk_index=i,
+                total_chunks=len(raw_chunks),
+                modified_at=modified_at,
+                rule_applied=rule.strategy,
+            )
+            chunks.append(chunk)
+    else:
+        raw_chunks = sliding_window_chunks(text, rule.chunk_size, rule.chunk_overlap)
+        chunks = []
+        for i, chunk_text in enumerate(raw_chunks):
+            chunk_hashtags = re.findall(r"#\w+", chunk_text)
+            chunk_wikilinks = re.findall(r"\[\[(.*?)\]\]", chunk_text)
+            chunk_tags = sorted(set(chunk_hashtags + chunk_wikilinks))
+
+            chunk = Chunk(
+                text=chunk_text,
+                source_file=source_file,
+                source_directory=source_directory,
+                section="",
+                date=date,
+                tags=chunk_tags,
+                chunk_index=i,
+                total_chunks=len(raw_chunks),
+                modified_at=modified_at,
+                rule_applied=rule.strategy,
+            )
+            chunks.append(chunk)
+
+    return chunks
diff --git a/tests/test_chunker.py b/tests/test_chunker.py
new file mode 100644
index 0000000..131829c
--- /dev/null
+++ b/tests/test_chunker.py
@@ -0,0 +1,41 @@
+from companion.rag.chunker import sliding_window_chunks
+
+
+def test_sliding_window_basic():
+    text = "word " * 100
+    chunks = sliding_window_chunks(text, chunk_size=20, chunk_overlap=5)
+    assert len(chunks) > 1
+    assert len(chunks[0].split()) == 20
+    # overlap check: last 5 words of chunk 0 should appear in chunk 1
+    last_five = chunks[0].split()[-5:]
+    first_chunk1 = chunks[1].split()[:5]
+    assert last_five == first_chunk1
+from companion.rag.chunker import section_based_chunks, chunk_file, ChunkingRule
+import tempfile
+from pathlib import Path
+
+
+def test_section_based_chunks_splits_on_tags():
+    text = "#DayInShort: good day\n#mentalhealth: stressed\n#work: busy"
+    chunks = section_based_chunks(text, ["#DayInShort", "#mentalhealth", "#work"], chunk_size=10, chunk_overlap=2)
+    assert len(chunks) == 3
+    assert chunks[0][1] == "#DayInShort"
+    assert chunks[1][1] == "#mentalhealth"
+    assert chunks[2][1] == "#work"
+
+
+def test_chunk_file_extracts_metadata():
+    with tempfile.TemporaryDirectory() as tmp:
+        vault = Path(tmp)
+        journal = vault / "Journal" / "2026" / "04" / "2026-04-12.md"
+        journal.parent.mkdir(parents=True)
+        journal.write_text("#DayInShort: good day\n#Relations: [[Person/Vinay]] visited.", encoding="utf-8")
+        rules = {
+            "default": ChunkingRule(strategy="sliding_window", chunk_size=500, chunk_overlap=100),
+            "Journal/**": ChunkingRule(strategy="section", chunk_size=300, chunk_overlap=50, section_tags=["#DayInShort", "#Relations"]),
+        }
+        chunks = chunk_file(journal, vault, rules, modified_at=1234567890.0)
+        assert len(chunks) == 2
+        assert chunks[0].source_directory == "Journal"
+        assert chunks[0].date == "2026-04-12"
+        assert "Person/Vinay" in (chunks[1].tags or [])