""" Infill Utilities for Batch Gap-Filling Handles gap detection, JSON parsing from LLM output, and text reconstruction. Gap Notation Support: - [GAP:n]: Explicit numbered gaps (preferred) - ___: Underscores (auto-numbered in scan order) FUTURE: Chunking Support ------------------------- For texts exceeding ~2000 tokens (approx 6000 chars), implement per-gap prompting: 1. Split text into chunks preserving gap context (±150 tokens around each gap) 2. Process each gap individually with left/right context 3. Merge results back into full text 4. This avoids context window overflow on smaller models (2k-4k context) Current implementation assumes texts fit within model context window. Add chunking when processing long-form content (articles, full listings). """ import re import json from typing import List, Optional, Tuple from dataclasses import dataclass @dataclass class GapInfo: """Information about a detected gap in text.""" index: int # 1-based index marker: str # Original marker string start: int # Start position in text end: int # End position in text def detect_gaps(text: str, notation: str = "auto") -> List[GapInfo]: """ Detect gaps in text and return their positions. Args: text: Input text with gap markers notation: "auto", "[GAP:n]", or "___" Returns: List of GapInfo objects sorted by position Examples: >>> detect_gaps("Buy this [GAP:1] car with [GAP:2] features") [GapInfo(index=1, marker='[GAP:1]', ...), GapInfo(index=2, marker='[GAP:2]', ...)] >>> detect_gaps("Buy this ___ car with ___ features") [GapInfo(index=1, marker='___', ...), GapInfo(index=2, marker='___', ...)] """ gaps = [] # Pattern for [GAP:n] notation gap_tag_pattern = r'\[GAP:(\d+)\]' # Pattern for underscore notation (3+ underscores) underscore_pattern = r'_{3,}' if notation == "auto": # Try [GAP:n] first, fallback to ___ gap_matches = list(re.finditer(gap_tag_pattern, text)) if gap_matches: notation = "[GAP:n]" else: notation = "___" if notation == "[GAP:n]": for match in re.finditer(gap_tag_pattern, text): gaps.append(GapInfo( index=int(match.group(1)), marker=match.group(0), start=match.start(), end=match.end() )) else: # "___" for i, match in enumerate(re.finditer(underscore_pattern, text), start=1): gaps.append(GapInfo( index=i, marker=match.group(0), start=match.start(), end=match.end() )) # Sort by position (should already be, but ensure) gaps.sort(key=lambda g: g.start) return gaps def parse_infill_json(raw_output: str) -> Optional[dict]: """ Extract and parse JSON from LLM output. Handles common LLM quirks: - JSON wrapped in markdown code blocks - Leading/trailing text before/after JSON - Function-call style wrapper ({"name": "...", "arguments": {...}}) - Double-escaped JSON strings in arguments field - Minor formatting issues Returns: Parsed dict with 'filled_text' and 'gaps' keys, or None if parsing fails """ if not raw_output: return None # Try to extract JSON from markdown code blocks json_block_pattern = r'```(?:json)?\s*([\s\S]*?)\s*```' match = re.search(json_block_pattern, raw_output) if match: raw_output = match.group(1) # Find JSON object boundaries start_idx = raw_output.find('{') if start_idx == -1: return None # Find matching closing brace depth = 0 end_idx = -1 for i, char in enumerate(raw_output[start_idx:], start=start_idx): if char == '{': depth += 1 elif char == '}': depth -= 1 if depth == 0: end_idx = i + 1 break if end_idx == -1: return None json_str = raw_output[start_idx:end_idx] try: parsed = json.loads(json_str) # Handle function-call style wrapper with STRING arguments (double-escaped): # {"name": "fill_in_text", "arguments": "{\"filled_text\": \"...\"}"} if 'arguments' in parsed: args = parsed['arguments'] if isinstance(args, str): try: parsed = json.loads(args) except json.JSONDecodeError: return None elif isinstance(args, dict): parsed = args # Also handle: {"name": "...", "parameters": {...}} if 'parameters' in parsed: params = parsed['parameters'] if isinstance(params, str): try: parsed = json.loads(params) except json.JSONDecodeError: return None elif isinstance(params, dict): parsed = params # Validate required fields if 'filled_text' not in parsed and 'gaps' not in parsed: return None return parsed except json.JSONDecodeError: return None def apply_fills(original_text: str, gaps: List[GapInfo], fills: dict) -> str: """ Apply gap fills to original text. Uses fills from parsed JSON, replacing markers with chosen words. This is a fallback when LLM's 'filled_text' might be corrupted. Args: original_text: Original text with gap markers gaps: Detected gaps from detect_gaps() fills: Dict mapping gap index to fill choice e.g., {1: "excellent", 2: "powerful"} Returns: Text with gaps replaced by fill choices """ if not gaps or not fills: return original_text # Process from end to start to preserve positions result = original_text for gap in reversed(gaps): if gap.index in fills: result = result[:gap.start] + fills[gap.index] + result[gap.end:] return result def build_fills_dict(gaps_list: List[dict]) -> dict: """ Convert gaps list from JSON to fills dict. Args: gaps_list: List of gap dicts from parsed JSON [{"index": 1, "choice": "word"}, ...] Returns: Dict mapping index to choice: {1: "word", ...} """ fills = {} for gap in gaps_list: if 'index' in gap and 'choice' in gap: fills[gap['index']] = gap['choice'] return fills def normalize_gaps_to_tagged(text: str) -> Tuple[str, List[GapInfo]]: """ Normalize any gap notation to [GAP:n] format. Useful for standardizing input before processing. Args: text: Text with any gap notation Returns: Tuple of (normalized_text, gaps) """ gaps = detect_gaps(text, "auto") if not gaps: return text, [] # If already [GAP:n], return as-is if gaps[0].marker.startswith('[GAP:'): return text, gaps # Convert ___ to [GAP:n] result = text for gap in reversed(gaps): new_marker = f"[GAP:{gap.index}]" result = result[:gap.start] + new_marker + result[gap.end:] # Re-detect with new positions return result, detect_gaps(result, "[GAP:n]")