Batch Processing¶
Overview¶
Batch processing optimizes extraction by grouping multiple chunks into single LLM calls, reducing API overhead while maximizing context window utilization.
In this guide:

- Why batching matters
- Adaptive batching algorithm
- Context window optimization
- Performance tuning
- Best practices
Why Batching Matters¶
The API Call Problem¶
Without batching:
# 10 chunks = 10 API calls
for chunk in chunks:            # 10 iterations
    model = llm.extract(chunk)  # 10 API calls
    models.append(model)

# Cost: 10 × API_COST
# Time: 10 × LATENCY
With batching:
# 10 chunks = 3 batches = 3 API calls
batches = batcher.batch_chunks(chunks)        # Group into 3 batches
for batch in batches:                         # 3 iterations
    model = llm.extract(batch.combined_text)  # 3 API calls
    models.append(model)

# Cost: 3 × API_COST (70% savings)
# Time: 3 × LATENCY overhead (70% fewer round trips)
ChunkBatcher¶
What is ChunkBatcher?¶
ChunkBatcher intelligently groups chunks to fit within context windows, minimizing API calls while preserving semantic boundaries.
Architecture¶
%%{init: {'theme': 'redux-dark', 'look': 'default', 'layout': 'elk'}}%%
flowchart LR
%% 1. Define Classes
classDef input fill:#E3F2FD,stroke:#90CAF9,color:#0D47A1
classDef config fill:#FFF8E1,stroke:#FFECB3,color:#5D4037
classDef output fill:#E8F5E9,stroke:#A5D6A7,color:#1B5E20
classDef decision fill:#FFE0B2,stroke:#FFB74D,color:#E65100
classDef data fill:#EDE7F6,stroke:#B39DDB,color:#4527A0
classDef operator fill:#F3E5F5,stroke:#CE93D8,color:#6A1B9A
classDef process fill:#ECEFF1,stroke:#B0BEC5,color:#263238
%% 2. Define Nodes
A@{ shape: terminal, label: "10 Chunks" }
B@{ shape: procs, label: "Greedy Packing" }
C@{ shape: doc, label: "5 Candidate Batches" }
D@{ shape: lin-proc, label: "Merge Undersized" }
E@{ shape: doc, label: "3 Final Batches" }
F@{ shape: tag-proc, label: "3 API Calls" }
%% 3. Define Connections
A --> B
B --> C
C --> D
D --> E
E --> F
%% 4. Apply Classes
class A input
class B,D process
class C,E data
class F output
Basic Usage¶
Initialize Batcher¶
from docling_graph.core.extractors import ChunkBatcher
# Create batcher with context constraints
batcher = ChunkBatcher(
    context_limit=8000,            # Total context window
    system_prompt_tokens=500,      # System prompt overhead
    response_buffer_tokens=500,    # Response space
    merge_threshold=0.95           # Merge if <95% utilized (default)
)

# Available for content: 8000 - 500 - 500 = 7000 tokens
Batch Chunks¶
# Batch your chunks
batches = batcher.batch_chunks(chunks)
print(f"Reduced {len(chunks)} chunks to {len(batches)} batches")
# Process batches
for batch in batches:
    print(f"Batch {batch.batch_id}: {batch.chunk_count} chunks")
    print(f"  Tokens: {batch.total_tokens}")
    print(f"  Utilization: {batch.total_tokens / 7000 * 100:.1f}%")
Batching Algorithm¶
Phase 1: Greedy Packing¶
Strategy: Pack chunks sequentially until context limit reached
batches = []
current_batch = []
current_tokens = 0

for chunk in chunks:
    chunk_tokens = estimate_tokens(chunk) + OVERHEAD  # chunk text + separator overhead

    if current_batch and current_tokens + chunk_tokens > available_tokens:
        # Start new batch
        batches.append(current_batch)
        current_batch = [chunk]
        current_tokens = chunk_tokens
    else:
        # Add to current batch
        current_batch.append(chunk)
        current_tokens += chunk_tokens

# Keep the final (possibly undersized) batch
if current_batch:
    batches.append(current_batch)
Result: Candidate batches that each fit within the context window
Phase 2: Merge Undersized¶
Strategy: Combine small batches to improve utilization
threshold = available_tokens * merge_threshold  # e.g., 95% (default)

for batch, next_batch in zip(batches, batches[1:]):
    if batch.total_tokens < threshold:
        # Merge only if the combined batch still fits the context window
        if batch.total_tokens + next_batch.total_tokens <= available_tokens:
            merge_batches(batch, next_batch)
Result: Optimized batches with high utilization
ChunkBatch Object¶
Structure¶
from dataclasses import dataclass
from typing import List

@dataclass
class ChunkBatch:
    batch_id: int             # Batch sequence number
    chunks: List[str]         # Chunk texts
    total_tokens: int         # Estimated tokens
    chunk_indices: List[int]  # Original indices

    @property
    def chunk_count(self) -> int:
        return len(self.chunks)

    @property
    def combined_text(self) -> str:
        # Chunks joined with explicit separators
        return "\n\n---CHUNK BOUNDARY---\n\n".join(self.chunks)
Usage¶
batch = batches[0]
print(f"Batch ID: {batch.batch_id}")
print(f"Chunks: {batch.chunk_count}")
print(f"Tokens: {batch.total_tokens}")
print(f"Indices: {batch.chunk_indices}")
# Get combined text for LLM
combined = batch.combined_text
Token Estimation¶
Estimation Methods¶
1. Heuristic (Default)¶
Pros: Fast, no dependencies
Cons: Approximate (±20% error)
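The library's estimator is internal, but a character-based approximation illustrates the idea; a common rule of thumb for English text is roughly 4 characters per token. The sketch below is illustrative only, not ChunkBatcher's actual heuristic:

# Illustrative sketch - not the library's actual estimator
def estimate_tokens_heuristic(text: str) -> int:
    # ~4 characters per token is a common rule of thumb for English text
    return max(1, len(text) // 4)

print(estimate_tokens_heuristic("Invoice INV-2024-001, total 1,234.56 EUR"))  # ~10 tokens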
2. Custom Tokenizer¶
from transformers import AutoTokenizer
# Accurate token counting
tokenizer = AutoTokenizer.from_pretrained("mistralai/Mistral-7B-v0.1")
def count_tokens(text: str) -> int:
    return len(tokenizer.encode(text))
# Use with batcher
batches = batcher.batch_chunks(chunks, tokenizer_fn=count_tokens)
Pros: Accurate
Cons: Slower, requires tokenizer
Configuration Parameters¶
Context Limit¶
Definition: Total context window size
How to choose:

- Use the model's actual context limit
- Be conservative (leave a buffer)
- Account for prompt overhead
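For example (illustrative values; check your provider's documentation for the model's real window):

# Stay a little below the advertised window to absorb estimation error
MODEL_CONTEXT = 32000                         # advertised context window (example value)
batcher = ChunkBatcher(
    context_limit=int(MODEL_CONTEXT * 0.95),  # ~5% safety margin
    system_prompt_tokens=500,
    response_buffer_tokens=500
)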
System Prompt Tokens¶
Definition: Tokens used by system prompt
Includes:

- Extraction instructions
- Schema definition
- Example format
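If you have the actual system prompt text, you can measure this instead of guessing. The sketch below assumes the count_tokens helper from the Token Estimation section and uses a placeholder prompt string:

system_prompt = "Extract every field defined by the schema below..."  # your real prompt
batcher = ChunkBatcher(
    context_limit=8000,
    system_prompt_tokens=count_tokens(system_prompt),
    response_buffer_tokens=500
)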
Response Buffer Tokens¶
Definition: Space reserved for LLM response
Depends on:

- Schema complexity
- Expected output size
- Safety margin
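One way to size it (illustrative; the 300-token figure is an assumption about one filled-in template serialized as JSON):

expected_output_tokens = 300  # assumed size of one extracted model as JSON
batcher = ChunkBatcher(
    context_limit=8000,
    system_prompt_tokens=500,
    response_buffer_tokens=int(expected_output_tokens * 1.5)  # 50% safety margin -> 450 tokens
)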
Merge Threshold¶
Definition: Minimum utilization before merging (default: 95%)
batcher = ChunkBatcher(
    context_limit=8000,
    merge_threshold=0.95  # Merge if <95% utilized (default)
)
Effects:

- Higher (0.95-0.98): More batches, better fit, less merging
- Lower (0.80-0.90): Fewer batches, more aggressive merging
Default: 95% for all providers. This ensures efficient batching while maintaining good chunk boundaries.
Complete Examples¶
📍 Basic Batching¶
from docling_graph.core.extractors import ChunkBatcher, DocumentChunker, DocumentProcessor
# Convert and chunk document
processor = DocumentProcessor(docling_config="ocr")
document = processor.convert_to_docling_doc("large_document.pdf")
chunker = DocumentChunker(provider="mistral")
chunks = chunker.chunk_document(document)
print(f"Created {len(chunks)} chunks")
# Batch chunks
batcher = ChunkBatcher(
    context_limit=32000,         # Mistral Large
    system_prompt_tokens=500,
    response_buffer_tokens=500,
    merge_threshold=0.95         # Default: 95%
)
batches = batcher.batch_chunks(chunks)
print(f"Reduced to {len(batches)} batches")
print(f"API call reduction: {(1 - len(batches)/len(chunks)) * 100:.0f}%")
📍 With Custom Tokenizer¶
from docling_graph.core.extractors import ChunkBatcher
from transformers import AutoTokenizer
# Load tokenizer
tokenizer = AutoTokenizer.from_pretrained("mistralai/Mistral-7B-Instruct-v0.2")
def count_tokens(text: str) -> int:
    return len(tokenizer.encode(text))
# Batch with accurate token counting
batcher = ChunkBatcher(context_limit=8000)
batches = batcher.batch_chunks(chunks, tokenizer_fn=count_tokens)
print(f"Accurate batching: {len(batches)} batches")
📍 Integration with Extraction¶
from docling_graph.core.extractors.backends import LlmBackend
from docling_graph.llm_clients import get_client
from docling_graph.llm_clients.config import resolve_effective_model_config
# Initialize backend
effective = resolve_effective_model_config("mistral", "mistral-large-latest")
client = get_client("mistral")(model_config=effective)
backend = LlmBackend(llm_client=client)
# Batch chunks
batcher = ChunkBatcher(context_limit=32000)
batches = batcher.batch_chunks(chunks)
# Extract from batches
# InvoiceTemplate is your Pydantic extraction template (defined elsewhere)
models = []
for batch in batches:
    print(f"Processing batch {batch.batch_id} ({batch.chunk_count} chunks)")
    model = backend.extract_from_markdown(
        markdown=batch.combined_text,
        template=InvoiceTemplate,
        context=f"batch {batch.batch_id}",
        is_partial=True
    )
    if model:
        models.append(model)

print(f"Extracted {len(models)} models from {len(batches)} batches")
📍 Automatic Batching in Pipeline¶
from docling_graph import run_pipeline, PipelineConfig
# Batching happens automatically
config = PipelineConfig(
    source="large_document.pdf",
    template="templates.BillingDocument",
    backend="llm",
    inference="remote",
    use_chunking=True  # Enables automatic batching
)
run_pipeline(config)
# ChunkBatcher is used internally to optimize API calls
Performance Optimization¶
Batch Size vs API Calls¶
| Chunks | No Batching | With Batching | Reduction |
|---|---|---|---|
| 10 | 10 calls | 3 calls | 70% |
| 20 | 20 calls | 5 calls | 75% |
| 50 | 50 calls | 12 calls | 76% |
| 100 | 100 calls | 23 calls | 77% |
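The reduction follows from how many chunks fit into one batch. A back-of-envelope check (illustrative numbers, assuming ~1,500 tokens per chunk and ~7,000 usable tokens per batch):

import math

avg_chunk_tokens = 1500      # illustrative average chunk size
available_tokens = 7000      # context_limit - system prompt - response buffer

chunks_per_batch = available_tokens // avg_chunk_tokens   # 4
expected_batches = math.ceil(50 / chunks_per_batch)       # 13, close to the 12 in the table
reduction = (1 - expected_batches / 50) * 100             # ~74%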
Cost Savings¶
# Example: Mistral Large API (illustrative price per call)
API_COST_PER_CALL = 0.002  # USD, input tokens

# Without batching: 50 chunks
cost_without = 50 * API_COST_PER_CALL   # $0.10

# With batching: 12 batches
cost_with = 12 * API_COST_PER_CALL      # $0.024

# Savings: 76%
Context Utilization¶
Measuring Utilization¶
batches = batcher.batch_chunks(chunks)
for batch in batches:
    utilization = batch.total_tokens / batcher.available_tokens * 100
    print(f"Batch {batch.batch_id}: {utilization:.1f}% utilized")
# Average utilization
avg_util = sum(b.total_tokens for b in batches) / (len(batches) * batcher.available_tokens) * 100
print(f"Average utilization: {avg_util:.1f}%")
Optimization Tips¶
# ✅ Good - High utilization (90-95%)
batcher = ChunkBatcher(
    context_limit=8000,
    merge_threshold=0.95  # Default: merge if <95% utilized
)

# ❌ Bad - Low utilization (<70%)
batcher = ChunkBatcher(
    context_limit=8000,
    merge_threshold=0.5   # Too aggressive merging
)
Best Practices¶
👍 Match Context Limit to Model¶
# ✅ Good - Use actual model limits
if model == "mistral-large":
    context_limit = 32000
elif model == "gpt-4-turbo":
    context_limit = 128000
elif model == "llama3.1:8b":
    context_limit = 8000

batcher = ChunkBatcher(context_limit=context_limit)
👍 Leave Adequate Buffer¶
# ✅ Good - Conservative buffers
batcher = ChunkBatcher(
    context_limit=8000,
    system_prompt_tokens=500,   # Adequate
    response_buffer_tokens=500  # Safe margin
)

# ❌ Bad - Insufficient buffer
batcher = ChunkBatcher(
    context_limit=8000,
    system_prompt_tokens=100,   # Too small
    response_buffer_tokens=100  # Risky
)
👍 Use Merge Threshold Wisely¶
# ✅ Good - Default balance (recommended)
batcher = ChunkBatcher(
    merge_threshold=0.95  # Default: 95% - good balance
)

# For many small chunks (more aggressive merging)
batcher = ChunkBatcher(
    merge_threshold=0.85  # More aggressive merging
)

# For few large chunks (less merging)
batcher = ChunkBatcher(
    merge_threshold=0.98  # Minimal merging, better fit
)
👍 Monitor Batch Statistics¶
# ✅ Good - Check batching effectiveness
batches = batcher.batch_chunks(chunks)
reduction = (1 - len(batches) / len(chunks)) * 100
print(f"API call reduction: {reduction:.0f}%")
if reduction < 50:
    print("Warning: Low batching efficiency")
Troubleshooting¶
🐛 Too Many Batches¶
Batching not reducing API calls enough
Solution:
# Lower merge threshold for more aggressive merging
batcher = ChunkBatcher(
    context_limit=8000,
    merge_threshold=0.85  # More aggressive (default is 0.95)
)

# Or increase context limit if model supports it
batcher = ChunkBatcher(
    context_limit=16000  # Use larger context
)
🐛 Batches Too Large¶
Batches exceeding context limit
Solution:
# Increase buffer sizes
batcher = ChunkBatcher(
    context_limit=8000,
    system_prompt_tokens=700,   # More buffer (was 500)
    response_buffer_tokens=700  # More buffer (was 500)
)

# Or pass an accurate tokenizer_fn (see Token Estimation) so token counts aren't underestimated
🐛 Low Utilization¶
Batches not filling context window
Solution:
# Increase merge threshold above the default 0.95
batcher = ChunkBatcher(
    context_limit=8000,
    merge_threshold=0.98  # Less merging, better fit (default is 0.95)
)

# Or use smaller chunks
chunker = DocumentChunker(
    provider="mistral",
    max_tokens=3000  # Smaller chunks (was 4096)
)
Advanced Techniques¶
Custom Batch Processing¶
from docling_graph.core.extractors import ChunkBatcher
def process_with_retry(batches, backend, template):
    """Process batches with retry logic."""
    models = []

    for batch in batches:
        max_retries = 3
        for attempt in range(max_retries):
            try:
                model = backend.extract_from_markdown(
                    markdown=batch.combined_text,
                    template=template,
                    context=f"batch {batch.batch_id}",
                    is_partial=True
                )
                if model:
                    models.append(model)
                break  # Success - stop retrying this batch
            except Exception as e:
                if attempt == max_retries - 1:
                    print(f"Failed batch {batch.batch_id} after {max_retries} attempts: {e}")
                else:
                    print(f"Retry {attempt + 1} for batch {batch.batch_id}")

    return models
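A possible call site, reusing the backend, batches, and InvoiceTemplate from the integration example above:

models = process_with_retry(batches, backend, InvoiceTemplate)
print(f"Extracted {len(models)} models from {len(batches)} batches (with retries)")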
Next Steps¶
Now that you understand batch processing:
- Graph Management → Work with knowledge graphs
- Export Formats → Export graphs
- Visualization → Visualize graphs