feat: add markdown chunker with sliding window and section strategies

This commit is contained in:
2026-04-13 14:06:37 -04:00
parent d16656a473
commit fc9f975fff
3 changed files with 209 additions and 0 deletions

View File

View File

@@ -0,0 +1,168 @@
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional, Dict
import re
import fnmatch
@dataclass
class Chunk:
    """One piece of chunked text together with its provenance metadata.

    Only ``text`` is required; every other field has a neutral default so a
    chunk can be created first and annotated later. Field order is part of
    the positional constructor signature and must not be changed.
    """

    # The chunk's text content.
    text: str

    # Where the chunk came from. ``chunk_file`` stores the vault-relative
    # POSIX path and its first path component ("" for top-level files).
    source_file: str = ""
    source_directory: str = ""

    # Section tag the chunk was found under ("" when not section-based).
    section: str = ""

    # ``YYYY-MM-DD`` string taken from the file name, or "" when absent.
    date: str = ""

    # Hashtags / wikilink targets found in the chunk text. A factory is used
    # so that instances never share one list object.
    tags: List[str] = field(default_factory=list)

    # Position of this chunk within its file, and the file's chunk count.
    chunk_index: int = 0
    total_chunks: int = 0

    # Timestamp supplied by the caller (presumably the file's mtime -- confirm).
    modified_at: float = 0.0

    # Strategy name of the chunking rule that produced this chunk.
    rule_applied: str = ""
@dataclass
class ChunkingRule:
    """Configuration describing how the files matching a pattern are chunked.

    Field order is part of the positional constructor signature.
    """

    # Name of the chunking strategy. ``chunk_file`` treats "section" (with
    # non-empty ``section_tags``) specially; anything else uses the sliding
    # window.
    strategy: str

    # Window size and overlap, both counted in whitespace-separated words.
    chunk_size: int
    chunk_overlap: int

    # Line prefixes that open a new section; only used by the "section"
    # strategy. ``None`` means no tags are configured.
    section_tags: Optional[List[str]] = None
def sliding_window_chunks(text: str, chunk_size: int, chunk_overlap: int) -> List[str]:
    """Split *text* into overlapping windows of whitespace-separated words.

    Args:
        text: Input text. It is tokenised with ``str.split()``, so every run
            of whitespace separates words and is collapsed to a single space
            in the output.
        chunk_size: Maximum number of words per chunk; must be positive.
        chunk_overlap: Number of words each chunk shares with the previous
            one; must be smaller than ``chunk_size``. A negative value is not
            rejected: it makes the stride larger than the window, so the
            words between consecutive windows are skipped.

    Returns:
        The chunks in document order, or ``[]`` when *text* has no words.

    Raises:
        ValueError: If ``chunk_size`` is not positive or ``chunk_overlap`` is
            not smaller than ``chunk_size`` (checked only for non-empty text).
    """
    words = text.split()
    if not words:
        return []

    # Validate explicitly: a zero stride makes range() fail with an opaque
    # message, and a negative stride yields an empty range, which would
    # silently drop the whole text.
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    if chunk_overlap >= chunk_size:
        raise ValueError(
            f"chunk_overlap ({chunk_overlap}) must be smaller than "
            f"chunk_size ({chunk_size})"
        )

    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(words), step):
        end = start + chunk_size
        chunks.append(" ".join(words[start:end]))
        # Stop once a window reaches the last word; later windows would only
        # repeat the tail already emitted.
        if end >= len(words):
            break
    return chunks
def section_based_chunks(
    text: str, section_tags: List[str], chunk_size: int, chunk_overlap: int
) -> List[tuple]:
    """Split *text* into tagged sections and window-chunk each section.

    A line opens a new section when, after stripping surrounding whitespace,
    it starts with one of *section_tags* (plain prefix match; the first tag
    in list order wins). Whatever follows the tag on that line, minus one
    optional leading ``:``, becomes the start of the section body. Following
    lines are stripped and appended to the open section; lines before the
    first section header are discarded.

    Args:
        text: Full document text.
        section_tags: Line prefixes that mark the start of a section.
        chunk_size: Window size in words, forwarded to
            ``sliding_window_chunks``.
        chunk_overlap: Window overlap in words, forwarded likewise.

    Returns:
        ``(chunk_text, tag)`` tuples in document order; ``[]`` when no line
        opens a section.
    """

    def parse_header(line):
        """Return ``(tag, content)`` for the first tag prefixing *line*."""
        for candidate in section_tags:
            if not line.startswith(candidate):
                continue
            rest = line[len(candidate):].strip()
            if rest.startswith(":"):
                rest = rest[1:].strip()
            return candidate, rest
        return None, ""

    collected = []  # finished (tag, body) pairs
    open_tag = None
    body_parts = []

    for raw_line in text.split("\n"):
        line = raw_line.strip()
        tag, content = parse_header(line)
        # Truthiness on purpose: an empty-string tag prefixes every line but
        # never opens a section.
        if tag:
            if open_tag is not None:
                collected.append((open_tag, " ".join(body_parts)))
            open_tag, body_parts = tag, [content]
        elif open_tag is not None:
            body_parts.append(line)

    # Flush the section that was still open at end of input.
    if open_tag is not None:
        collected.append((open_tag, " ".join(body_parts)))

    return [
        (piece, tag)
        for tag, body in collected
        for piece in sliding_window_chunks(body, chunk_size, chunk_overlap)
    ]
def match_chunking_rule(
    relative_path: str, rules: Dict[str, ChunkingRule]
) -> Optional[ChunkingRule]:
    """Pick the chunking rule that applies to *relative_path*.

    Patterns are tried in the mapping's iteration order with
    ``fnmatch.fnmatch``; the first match wins. The key ``"default"`` is never
    treated as a pattern and is used only as the fallback.

    Args:
        relative_path: Path of the file relative to the vault root.
        rules: Mapping of glob pattern (or ``"default"``) to rule.

    Returns:
        The first matching rule, else the ``"default"`` rule, else ``None``
        when *rules* has no ``"default"`` entry.
    """
    for pattern, rule in rules.items():
        if pattern != "default" and fnmatch.fnmatch(relative_path, pattern):
            return rule
    # .get() rather than indexing: a missing default yields None, not KeyError.
    return rules.get("default")
_HASHTAG_RE = re.compile(r"#\w+")
_WIKILINK_RE = re.compile(r"\[\[(.*?)\]\]")


def _extract_tags(text: str) -> List[str]:
    """Return the unique hashtags and wikilink targets in *text*, first-seen order."""
    found = _HASHTAG_RE.findall(text) + _WIKILINK_RE.findall(text)
    # dict.fromkeys de-duplicates while keeping insertion order, so the result
    # is stable across runs (set ordering of strings is not).
    return list(dict.fromkeys(found))


def chunk_file(
    file_path: Path,
    vault_root: Path,
    rules: Dict[str, ChunkingRule],
    modified_at: float,
) -> List[Chunk]:
    """Read one file and split it into ``Chunk`` objects.

    The rule is selected from the file's path relative to *vault_root*. With
    the ``"section"`` strategy and non-empty ``section_tags`` the file is
    chunked per section; otherwise a sliding window runs over the whole text.

    Args:
        file_path: File to read (decoded as UTF-8); must be inside
            *vault_root*.
        vault_root: Root directory that relative paths are computed against.
        rules: Mapping of glob pattern (or ``"default"``) to chunking rule.
        modified_at: Timestamp stored unchanged on every chunk.

    Returns:
        The file's chunks in order. May be empty, e.g. for an empty file or
        when the section strategy finds no section header.

    Raises:
        ValueError: If no rule matches and *rules* has no ``"default"``
            entry, or if *file_path* is not under *vault_root* (raised by
            ``Path.relative_to``).
    """
    relative_path = file_path.relative_to(vault_root).as_posix()
    rule = match_chunking_rule(relative_path, rules)
    if rule is None:
        # Fail with a message naming the file instead of an AttributeError
        # on ``None.strategy`` further down.
        raise ValueError(
            f"no chunking rule matches {relative_path!r} and no 'default' rule is defined"
        )

    text = file_path.read_text(encoding="utf-8")

    # An ISO-style date embedded in the file name, if any.
    date_match = re.search(r"(\d{4}-\d{2}-\d{2})", file_path.stem)
    date = date_match.group(1) if date_match else ""

    # First path component, or "" for files directly in the vault root.
    top_level, separator, _ = relative_path.partition("/")
    source_directory = top_level if separator else ""

    # Normalise both strategies to (chunk_text, section) pairs so that the
    # Chunk construction below is written once.
    if rule.strategy == "section" and rule.section_tags:
        pieces = section_based_chunks(
            text, rule.section_tags, rule.chunk_size, rule.chunk_overlap
        )
    else:
        pieces = [
            (chunk_text, "")
            for chunk_text in sliding_window_chunks(
                text, rule.chunk_size, rule.chunk_overlap
            )
        ]

    total = len(pieces)
    return [
        Chunk(
            text=chunk_text,
            source_file=relative_path,
            source_directory=source_directory,
            section=section,
            date=date,
            tags=_extract_tags(chunk_text),
            chunk_index=index,
            total_chunks=total,
            modified_at=modified_at,
            rule_applied=rule.strategy,
        )
        for index, (chunk_text, section) in enumerate(pieces)
    ]