"""Markdown parsing, structured + unstructured chunking, metadata enrichment.""" from __future__ import annotations import re import unicodedata import hashlib from dataclasses import dataclass, field from pathlib import Path from typing import TYPE_CHECKING import frontmatter if TYPE_CHECKING: from obsidian_rag.config import ObsidianRagConfig # ---------------------------------------------------------------------- # Types # ---------------------------------------------------------------------- @dataclass class Chunk: chunk_id: str text: str source_file: str source_directory: str section: str | None date: str | None tags: list[str] = field(default_factory=list) chunk_index: int = 0 total_chunks: int = 1 modified_at: str | None = None indexed_at: str | None = None # ---------------------------------------------------------------------- # Markdown parsing # ---------------------------------------------------------------------- def parse_frontmatter(content: str) -> tuple[dict, str]: """Parse frontmatter from markdown content. Returns (metadata, body).""" try: post = frontmatter.parse(content) meta = dict(post[0]) if post[0] else {} body = str(post[1]) return meta, body except Exception: return {}, content def extract_tags(text: str) -> list[str]: """Extract all #hashtags from text, deduplicated, lowercased.""" return list(dict.fromkeys(t.lower() for t in re.findall(r"#[\w-]+", text))) def extract_date_from_filename(filepath: Path) -> str | None: """Try to parse an ISO date from a filename (e.g. 2024-01-15.md).""" name = filepath.stem # filename without extension # Match YYYY-MM-DD or YYYYMMDD m = re.search(r"(\d{4}-\d{2}-\d{2})|(\d{4}\d{2}\d{2})", name) if m: date_str = m.group(1) or m.group(2) # Normalize YYYYMMDD → YYYY-MM-DD if len(date_str) == 8: return f"{date_str[:4]}-{date_str[4:6]}-{date_str[6:8]}" return date_str return None def is_structured_note(filepath: Path) -> bool: """Heuristic: journal/daily notes use date-named files with section headers.""" name = filepath.stem date_match = re.search(r"\d{4}-\d{2}-\d{2}", name) return date_match is not None # ---------------------------------------------------------------------- # Section-split chunker (structured notes) # ---------------------------------------------------------------------- SECTION_HEADER_RE = re.compile(r"^#{1,3}\s+(.+)$", re.MULTILINE) def split_by_sections(body: str, metadata: dict) -> list[tuple[str, str]]: """Split markdown body into (section_name, section_content) pairs. If no headers found, returns [(None, body)]. """ sections: list[tuple[str | None, str]] = [] lines = body.splitlines(keepends=True) current_heading: str | None = None current_content: list[str] = [] for line in lines: m = SECTION_HEADER_RE.match(line.rstrip()) if m: # Flush previous section if current_heading is not None or current_content: sections.append((current_heading, "".join(current_content).strip())) current_content = [] current_heading = m.group(1).strip() else: current_content.append(line) # Flush last section if current_heading is not None or current_content: sections.append((current_heading, "".join(current_content).strip())) if not sections: sections = [(None, body.strip())] return sections # ---------------------------------------------------------------------- # Sliding window chunker (unstructured notes) # ---------------------------------------------------------------------- def _count_tokens(text: str) -> int: """Rough token count: split on whitespace, average ~4 chars per token.""" return len(text.split()) def sliding_window_chunks( text: str, chunk_size: int = 500, overlap: int = 100, ) -> list[str]: """Split text into overlapping sliding-window chunks of ~chunk_size tokens. Returns list of chunk strings. """ words = text.split() if not words: return [] chunks: list[str] = [] start = 0 while start < len(words): end = start + chunk_size chunk_words = words[start:end] chunks.append(" ".join(chunk_words)) # Advance by (chunk_size - overlap) advance = chunk_size - overlap if advance <= 0: advance = max(1, chunk_size // 2) start += advance if start >= len(words): break return chunks # ---------------------------------------------------------------------- # Main chunk router # ---------------------------------------------------------------------- def _stable_chunk_id(content_hash: str, chunk_index: int) -> str: """Generate a stable chunk_id from content hash and index.""" raw = f"{content_hash}:{chunk_index}" return hashlib.sha1(raw.encode()).hexdigest()[:12] def chunk_file( filepath: Path, content: str, modified_at: str, config: "ObsidianRagConfig", chunk_id_prefix: str = "", ) -> list[Chunk]: """Parse a markdown file and return a list of Chunks. Uses section-split for structured notes (journal entries with date filenames), sliding window for everything else. """ import uuid vault_path = Path(config.vault_path) rel_path = filepath if filepath.is_absolute() else filepath source_file = str(rel_path) source_directory = rel_path.parts[0] if rel_path.parts else "" metadata, body = parse_frontmatter(content) tags = extract_tags(body) date = extract_date_from_filename(filepath) chunk_size = config.indexing.chunk_size overlap = config.indexing.chunk_overlap # Compute content hash for stable, content-addressable chunk_ids content_hash = hashlib.sha1(body.encode()).hexdigest()[:12] chunks: list[Chunk] = [] if is_structured_note(filepath): # Section-split for journal/daily notes sections = split_by_sections(body, metadata) total = len(sections) for idx, (section, section_text) in enumerate(sections): if not section_text.strip(): continue section_tags = extract_tags(section_text) combined_tags = list(dict.fromkeys([*tags, *section_tags])) chunk_text = section_text chunk = Chunk( chunk_id=f"{chunk_id_prefix}{_stable_chunk_id(content_hash, idx)}", text=chunk_text, source_file=source_file, source_directory=source_directory, section=f"#{section}" if section else None, date=date, tags=combined_tags, chunk_index=idx, total_chunks=total, modified_at=modified_at, ) chunks.append(chunk) else: # Sliding window for unstructured notes text_chunks = sliding_window_chunks(body, chunk_size, overlap) total = len(text_chunks) for idx, text_chunk in enumerate(text_chunks): if not text_chunk.strip(): continue chunk = Chunk( chunk_id=f"{chunk_id_prefix}{_stable_chunk_id(content_hash, idx)}", text=text_chunk, source_file=source_file, source_directory=source_directory, section=None, date=date, tags=tags, chunk_index=idx, total_chunks=total, modified_at=modified_at, ) chunks.append(chunk) return chunks