Files
obsidian-rag/python/obsidian_rag/chunker.py
Santhosh Janardhanan 34f3ce97f7 feat(indexer): hierarchical chunking for large sections
- Section-split first for structured notes
- Large sections (>max_section_chars) broken via sliding-window
- Small sections stay intact with heading preserved
- Adds max_section_chars config (default 4000)
- 2 new TDD tests for hierarchical chunking
2026-04-11 23:58:05 -04:00

265 lines
8.4 KiB
Python

"""Markdown parsing, structured + unstructured chunking, metadata enrichment."""
from __future__ import annotations
import re
import hashlib
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING
import frontmatter
if TYPE_CHECKING:
from obsidian_rag.config import ObsidianRagConfig
# ----------------------------------------------------------------------
# Types
# ----------------------------------------------------------------------
@dataclass
class Chunk:
    """A single indexable piece of a markdown note together with its metadata."""

    # --- identity and payload -------------------------------------------
    chunk_id: str
    text: str

    # --- provenance -----------------------------------------------------
    source_file: str
    source_directory: str
    section: str | None  # heading label of the originating section, if any
    date: str | None  # date associated with the note, if any

    # --- enrichment / position (all optional) ---------------------------
    tags: list[str] = field(default_factory=list)  # fresh list per instance
    chunk_index: int = 0
    total_chunks: int = 1

    # --- timestamps -----------------------------------------------------
    modified_at: str | None = None
    indexed_at: str | None = None
# ----------------------------------------------------------------------
# Markdown parsing
# ----------------------------------------------------------------------
def parse_frontmatter(content: str) -> tuple[dict, str]:
    """Separate a note's frontmatter from its body.

    Returns a ``(metadata, body)`` pair. Parsing is best-effort: if anything
    goes wrong the note is treated as having no frontmatter and the original
    ``content`` is handed back untouched as the body.
    """
    try:
        parsed = frontmatter.parse(content)
        raw_meta, raw_body = parsed[0], parsed[1]
        if raw_meta:
            return dict(raw_meta), str(raw_body)
        return {}, str(raw_body)
    except Exception:
        # Deliberate catch-all: a malformed header must not abort indexing.
        return {}, content
def extract_tags(text: str) -> list[str]:
    """Collect every ``#hashtag`` in *text*.

    Tags are lowercased and de-duplicated; the order of first appearance is
    kept. The leading ``#`` is part of each returned tag.
    """
    ordered: dict[str, None] = {}
    for raw_tag in re.findall(r"#[\w-]+", text):
        # dict keys keep insertion order, which gives an ordered "set".
        ordered.setdefault(raw_tag.lower(), None)
    return list(ordered)
def extract_date_from_filename(filepath: Path) -> str | None:
    """Return the first real calendar date embedded in the filename.

    Both ``YYYY-MM-DD`` and ``YYYYMMDD`` are recognised in the file stem
    (the name without its extension); the result is always normalised to
    ``YYYY-MM-DD``.

    Candidates are validated as actual calendar dates, so arbitrary digit
    runs such as ``12345678`` or impossible dates such as ``2024-13-45`` are
    skipped instead of being reported as dates. If an invalid candidate is
    followed by a valid one, the valid one is returned.

    Args:
        filepath: Path of the note; only ``filepath.stem`` is inspected.

    Returns:
        The ISO date string, or ``None`` when the stem contains no valid date.
    """
    # Function-scope import keeps this block self-contained.
    from datetime import date as _date

    for match in re.finditer(r"(\d{4}-\d{2}-\d{2})|(\d{8})", filepath.stem):
        hyphenated, compact = match.group(1), match.group(2)
        if hyphenated:
            candidate = hyphenated
        else:
            # Normalize YYYYMMDD -> YYYY-MM-DD
            candidate = f"{compact[:4]}-{compact[4:6]}-{compact[6:8]}"
        try:
            _date.fromisoformat(candidate)
        except ValueError:
            # Looks like a date but is not one (e.g. month 13); keep looking.
            continue
        return candidate
    return None
def is_structured_note(filepath: Path) -> bool:
    """Heuristic for journal/daily notes.

    A note counts as structured when its file stem contains a hyphenated
    ``YYYY-MM-DD`` date. Only the filename is examined; the note's content
    (e.g. whether it really has section headers) is not checked here.
    """
    return bool(re.search(r"\d{4}-\d{2}-\d{2}", filepath.stem))
# ----------------------------------------------------------------------
# Section-split chunker (structured notes)
# ----------------------------------------------------------------------
SECTION_HEADER_RE = re.compile(r"^#{1,3}\s+(.+)$", re.MULTILINE)
# Start (or end) of a fenced code block: at least three backticks or tildes,
# optionally indented by up to three spaces.
_CODE_FENCE_RE = re.compile(r"^ {0,3}(`{3,}|~{3,})")


def split_by_sections(body: str, metadata: dict) -> list[tuple[str | None, str]]:
    """Split a markdown body into ``(section_name, section_content)`` pairs.

    A section starts at every level 1-3 heading (``#``, ``##``, ``###``).
    Deeper headings stay inside the content of the enclosing section. Text
    before the first heading is returned with a section name of ``None``.
    Heading lines themselves are not part of the content, and each content
    string is stripped of surrounding whitespace.

    Lines inside fenced code blocks (``` or ~~~) are never treated as
    headings, so a shell or Python comment such as ``# install deps`` stays
    in the section's content instead of starting a bogus section.

    Args:
        body: Markdown text without frontmatter.
        metadata: Frontmatter mapping; accepted for interface compatibility
            and currently unused.

    Returns:
        The sections in document order. If the body has no lines at all,
        ``[(None, body.strip())]`` is returned.
    """
    sections: list[tuple[str | None, str]] = []
    current_heading: str | None = None
    current_content: list[str] = []
    open_fence: str | None = None  # marker that opened the current code fence

    def flush() -> None:
        # Emit the section collected so far, unless nothing was collected.
        if current_heading is not None or current_content:
            sections.append((current_heading, "".join(current_content).strip()))

    for line in body.splitlines(keepends=True):
        stripped = line.rstrip()
        fence = _CODE_FENCE_RE.match(stripped)

        if open_fence is not None:
            # Inside a code block: everything is content. The block closes on
            # a bare fence of the same character that is at least as long as
            # the opening one.
            if (
                fence is not None
                and fence.group(1)[0] == open_fence[0]
                and len(fence.group(1)) >= len(open_fence)
                and fence.end() == len(stripped)
            ):
                open_fence = None
            current_content.append(line)
            continue

        if fence is not None:
            open_fence = fence.group(1)
            current_content.append(line)
            continue

        header = SECTION_HEADER_RE.match(stripped)
        if header is None:
            current_content.append(line)
            continue

        # New heading: close the previous section and start a fresh one.
        flush()
        current_content = []
        current_heading = header.group(1).strip()

    flush()
    if not sections:
        sections = [(None, body.strip())]
    return sections
# ----------------------------------------------------------------------
# Sliding window chunker (unstructured notes)
# ----------------------------------------------------------------------
def _count_tokens(text: str) -> int:
    """Approximate the token count as the number of whitespace-separated words."""
    words = text.split()
    return len(words)
def sliding_window_chunks(
    text: str,
    chunk_size: int = 500,
    overlap: int = 100,
) -> list[str]:
    """Split text into overlapping windows of at most ``chunk_size`` words.

    Words are whitespace-separated; each chunk is its words re-joined with
    single spaces, so original line breaks and spacing are not preserved.
    Consecutive windows share ``overlap`` words.

    Windowing stops as soon as a window reaches the end of the text, so no
    trailing chunk is emitted that consists only of words already contained
    in the previous chunk.

    Args:
        text: Text to split.
        chunk_size: Maximum number of words per chunk; must be positive.
        overlap: Number of words shared by neighbouring chunks. Negative
            values are treated as 0 (otherwise words between windows would be
            skipped). If it is not smaller than ``chunk_size`` the window
            advances by half a chunk (at least one word) instead.

    Returns:
        The chunk strings in order; an empty list for blank text.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    words = text.split()
    if not words:
        return []

    # The step is loop-invariant, so compute it once.
    step = chunk_size - max(0, overlap)
    if step <= 0:
        step = max(1, chunk_size // 2)

    chunks: list[str] = []
    for start in range(0, len(words), step):
        end = start + chunk_size
        chunks.append(" ".join(words[start:end]))
        if end >= len(words):
            # This window already covers the tail; another one would be
            # nothing but overlap.
            break
    return chunks
# ----------------------------------------------------------------------
# Main chunk router
# ----------------------------------------------------------------------
def _stable_chunk_id(content_hash: str, chunk_index: int) -> str:
    """Derive a short, deterministic id from a content hash and a chunk index.

    The id is the first 12 hex characters of the SHA-1 of
    ``"<content_hash>:<chunk_index>"``.
    """
    seed = "{}:{}".format(content_hash, chunk_index)
    digest = hashlib.sha1(seed.encode())
    return digest.hexdigest()[:12]
def chunk_file(
    filepath: Path,
    content: str,
    modified_at: str,
    config: "ObsidianRagConfig",
    chunk_id_prefix: str = "",
) -> list[Chunk]:
    """Parse a markdown file and return its chunks.

    Notes whose filename carries a ``YYYY-MM-DD`` date are split by section
    heading; a section longer than ``config.indexing.max_section_chars``
    characters is further broken up with the sliding-window chunker. All other
    notes are chunked with the sliding window directly.

    Chunk ids are derived from a hash of the note body (frontmatter excluded),
    so they stay stable as long as the body does not change.

    Index fields:
        * unstructured note: ``chunk_index`` is the window index and
          ``total_chunks`` the number of windows.
        * intact section: ``chunk_index`` is the section index and
          ``total_chunks`` the number of sections found (empty sections are
          counted even though no chunk is emitted for them).
        * window of a large section: ``chunk_index`` / ``total_chunks`` are
          relative to that section, and the id gets a ``_<window>`` suffix.

    Args:
        filepath: Path of the note, used as given for ``source_file``.
        content: Raw file content including any frontmatter.
        modified_at: Modification timestamp stored on every chunk.
        config: Configuration; ``indexing.chunk_size``,
            ``indexing.chunk_overlap`` and (for structured notes)
            ``indexing.max_section_chars`` are read.
        chunk_id_prefix: Optional prefix prepended to every chunk id.

    Returns:
        The chunks in document order; sections or windows that are blank are
        skipped.
    """
    source_file = str(filepath)
    path_parts = filepath.parts
    # The first path component is the directory only when there is more than
    # one component; a bare filename lives in no directory.
    # NOTE(review): for an absolute path this is the filesystem anchor (e.g.
    # "/"); callers presumably pass vault-relative paths - confirm.
    source_directory = path_parts[0] if len(path_parts) > 1 else ""

    metadata, body = parse_frontmatter(content)
    tags = extract_tags(body)
    date = extract_date_from_filename(filepath)
    indexing = config.indexing
    chunk_size = indexing.chunk_size
    overlap = indexing.chunk_overlap
    # Compute content hash for stable, content-addressable chunk_ids
    content_hash = hashlib.sha1(body.encode()).hexdigest()[:12]

    def make_chunk(
        id_suffix: str,
        text: str,
        section: str | None,
        chunk_tags: list[str],
        index: int,
        total: int,
    ) -> Chunk:
        # Single place where the per-file fields are attached to a chunk.
        return Chunk(
            chunk_id=f"{chunk_id_prefix}{id_suffix}",
            text=text,
            source_file=source_file,
            source_directory=source_directory,
            section=section,
            date=date,
            tags=list(chunk_tags),  # own copy: chunks must not share a list
            chunk_index=index,
            total_chunks=total,
            modified_at=modified_at,
        )

    chunks: list[Chunk] = []

    if not is_structured_note(filepath):
        # Sliding window for unstructured notes
        windows = sliding_window_chunks(body, chunk_size, overlap)
        for idx, window in enumerate(windows):
            if not window.strip():
                continue
            chunks.append(
                make_chunk(
                    _stable_chunk_id(content_hash, idx),
                    window,
                    None,
                    tags,
                    idx,
                    len(windows),
                )
            )
        return chunks

    sections = split_by_sections(body, metadata)
    section_count = len(sections)
    for idx, (section, section_text) in enumerate(sections):
        if not section_text.strip():
            continue
        section_tags = list(dict.fromkeys([*tags, *extract_tags(section_text)]))
        heading = f"#{section}" if section else None
        base_id = _stable_chunk_id(content_hash, idx)

        if len(section_text) > indexing.max_section_chars:
            # Oversized section: window it, keeping the heading on each piece.
            pieces = sliding_window_chunks(section_text, chunk_size, overlap)
            for sub_idx, piece in enumerate(pieces):
                chunks.append(
                    make_chunk(
                        f"{base_id}_{sub_idx}",
                        piece,
                        heading,
                        section_tags,
                        sub_idx,
                        len(pieces),
                    )
                )
        else:
            chunks.append(
                make_chunk(base_id, section_text, heading, section_tags, idx, section_count)
            )
    return chunks