Skip to content

Batch Processing

Overview

Batch processing optimizes extraction by grouping multiple chunks into single LLM calls, reducing API overhead while maximizing context window utilization.

In this guide: - Why batching matters - Adaptive batching algorithm - Context window optimization - Performance tuning - Best practices


Why Batching Matters

The API Call Problem

Without batching:

# 10 chunks = 10 API calls
for chunk in chunks:  # 10 iterations
    model = llm.extract(chunk)  # 10 API calls
    models.append(model)

# Cost: 10 × API_COST
# Time: 10 × LATENCY

With batching:

# 10 chunks = 3 batches = 3 API calls
batches = batcher.batch_chunks(chunks)  # Group into 3 batches
for batch in batches:  # 3 iterations
    model = llm.extract(batch.combined_text)  # 3 API calls
    models.append(model)

# Cost: 3 × API_COST (70% savings)
# Time: 3 × LATENCY (70% faster)

ChunkBatcher

What is ChunkBatcher?

ChunkBatcher intelligently groups chunks to fit within context windows, minimizing API calls while preserving semantic boundaries.

Architecture

%%{init: {'theme': 'redux-dark', 'look': 'default', 'layout': 'elk'}}%%
flowchart LR
    %% 1. Define Classes
    classDef input fill:#E3F2FD,stroke:#90CAF9,color:#0D47A1
    classDef config fill:#FFF8E1,stroke:#FFECB3,color:#5D4037
    classDef output fill:#E8F5E9,stroke:#A5D6A7,color:#1B5E20
    classDef decision fill:#FFE0B2,stroke:#FFB74D,color:#E65100
    classDef data fill:#EDE7F6,stroke:#B39DDB,color:#4527A0
    classDef operator fill:#F3E5F5,stroke:#CE93D8,color:#6A1B9A
    classDef process fill:#ECEFF1,stroke:#B0BEC5,color:#263238

    %% 2. Define Nodes
    A@{ shape: terminal, label: "10 Chunks" }

    B@{ shape: procs, label: "Greedy Packing" }
    C@{ shape: doc, label: "5 Candidate Batches" }

    D@{ shape: lin-proc, label: "Merge Undersized" }
    E@{ shape: doc, label: "3 Final Batches" }

    F@{ shape: tag-proc, label: "3 API Calls" }

    %% 3. Define Connections
    A --> B
    B --> C
    C --> D
    D --> E
    E --> F

    %% 4. Apply Classes
    class A input
    class B,D process
    class C,E data
    class F output

Basic Usage

Initialize Batcher

from docling_graph.core.extractors import ChunkBatcher

# Create batcher with context constraints
batcher = ChunkBatcher(
    context_limit=8000,          # Total context window
    system_prompt_tokens=500,    # System prompt overhead
    response_buffer_tokens=500,  # Response space
    merge_threshold=0.95         # Merge if <95% utilized (default)
)

# Available for content: 8000 - 500 - 500 = 7000 tokens

Batch Chunks

# Batch your chunks
batches = batcher.batch_chunks(chunks)

print(f"Reduced {len(chunks)} chunks to {len(batches)} batches")

# Process batches
for batch in batches:
    print(f"Batch {batch.batch_id}: {batch.chunk_count} chunks")
    print(f"  Tokens: {batch.total_tokens}")
    print(f"  Utilization: {batch.total_tokens / 7000 * 100:.1f}%")

Batching Algorithm

Phase 1: Greedy Packing

Strategy: Pack chunks sequentially until context limit reached

current_batch = []
current_tokens = 0

for chunk in chunks:
    chunk_tokens = estimate_tokens(chunk) + OVERHEAD

    if current_tokens + chunk_tokens > available_tokens:
        # Start new batch
        batches.append(current_batch)
        current_batch = [chunk]
        current_tokens = chunk_tokens
    else:
        # Add to current batch
        current_batch.append(chunk)
        current_tokens += chunk_tokens

Result: Candidate batches that fit context window


Phase 2: Merge Undersized

Strategy: Combine small batches to improve utilization

threshold = available_tokens * merge_threshold  # e.g., 95% (default)

for batch in batches:
    if batch.total_tokens < threshold:
        # Try to merge with next batch
        if can_merge_with_next(batch):
            merge_batches(batch, next_batch)

Result: Optimized batches with high utilization


ChunkBatch Object

Structure

@dataclass
class ChunkBatch:
    batch_id: int              # Batch sequence number
    chunks: List[str]          # Chunk texts
    total_tokens: int          # Estimated tokens
    chunk_indices: List[int]   # Original indices

    @property
    def chunk_count(self) -> int:
        return len(self.chunks)

    @property
    def combined_text(self) -> str:
        # Chunks with separators
        return "\n\n---CHUNK BOUNDARY---\n\n".join(chunks)

Usage

batch = batches[0]

print(f"Batch ID: {batch.batch_id}")
print(f"Chunks: {batch.chunk_count}")
print(f"Tokens: {batch.total_tokens}")
print(f"Indices: {batch.chunk_indices}")

# Get combined text for LLM
combined = batch.combined_text

Token Estimation

Estimation Methods

1. Heuristic (Default)

# Fast but approximate
tokens = len(text) // 4  # ~4 chars per token

Pros: Fast, no dependencies
Cons: Approximate (±20% error)

2. Custom Tokenizer

from transformers import AutoTokenizer

# Accurate token counting
tokenizer = AutoTokenizer.from_pretrained("mistralai/Mistral-7B-v0.1")

def count_tokens(text: str) -> int:
    return len(tokenizer.encode(text))

# Use with batcher
batches = batcher.batch_chunks(chunks, tokenizer_fn=count_tokens)

Pros: Accurate
Cons: Slower, requires tokenizer


Configuration Parameters

Context Limit

Definition: Total context window size

batcher = ChunkBatcher(
    context_limit=8000  # Mistral: 32K, GPT-4: 128K, Llama: 8K
)

How to choose: - Use model's actual context limit - Be conservative (leave buffer) - Account for prompt overhead


System Prompt Tokens

Definition: Tokens used by system prompt

batcher = ChunkBatcher(
    context_limit=8000,
    system_prompt_tokens=500  # Typical: 300-700
)

Includes: - Extraction instructions - Schema definition - Example format


Response Buffer Tokens

Definition: Space reserved for LLM response

batcher = ChunkBatcher(
    context_limit=8000,
    response_buffer_tokens=500  # Typical: 500-1000
)

Depends on: - Schema complexity - Expected output size - Safety margin


Merge Threshold

Definition: Minimum utilization before merging (default: 95%)

batcher = ChunkBatcher(
    context_limit=8000,
    merge_threshold=0.95  # Merge if <95% utilized (default)
)

Effects: - Higher (0.95-0.98): More batches, better fit, less merging - Lower (0.80-0.90): Fewer batches, more aggressive merging

Default: 95% for all providers. This ensures efficient batching while maintaining good chunk boundaries.


Complete Examples

📍 Basic Batching

from docling_graph.core.extractors import ChunkBatcher, DocumentChunker, DocumentProcessor

# Convert and chunk document
processor = DocumentProcessor(docling_config="ocr")
document = processor.convert_to_docling_doc("large_document.pdf")

chunker = DocumentChunker(provider="mistral")
chunks = chunker.chunk_document(document)

print(f"Created {len(chunks)} chunks")

# Batch chunks
batcher = ChunkBatcher(
    context_limit=32000,  # Mistral Large
    system_prompt_tokens=500,
    response_buffer_tokens=500,
    merge_threshold=0.95  # Default: 95%
)

batches = batcher.batch_chunks(chunks)

print(f"Reduced to {len(batches)} batches")
print(f"API call reduction: {(1 - len(batches)/len(chunks)) * 100:.0f}%")

📍 With Custom Tokenizer

from docling_graph.core.extractors import ChunkBatcher
from transformers import AutoTokenizer

# Load tokenizer
tokenizer = AutoTokenizer.from_pretrained("mistralai/Mistral-7B-Instruct-v0.2")

def count_tokens(text: str) -> int:
    return len(tokenizer.encode(text))

# Batch with accurate token counting
batcher = ChunkBatcher(context_limit=8000)
batches = batcher.batch_chunks(chunks, tokenizer_fn=count_tokens)

print(f"Accurate batching: {len(batches)} batches")

📍 Integration with Extraction

from docling_graph.core.extractors.backends import LlmBackend
from docling_graph.llm_clients import get_client
from docling_graph.llm_clients.config import resolve_effective_model_config

# Initialize backend
effective = resolve_effective_model_config("mistral", "mistral-large-latest")
client = get_client("mistral")(model_config=effective)
backend = LlmBackend(llm_client=client)

# Batch chunks
batcher = ChunkBatcher(context_limit=32000)
batches = batcher.batch_chunks(chunks)

# Extract from batches
models = []
for batch in batches:
    print(f"Processing batch {batch.batch_id} ({batch.chunk_count} chunks)")

    model = backend.extract_from_markdown(
        markdown=batch.combined_text,
        template=InvoiceTemplate,
        context=f"batch {batch.batch_id}",
        is_partial=True
    )

    if model:
        models.append(model)

print(f"Extracted {len(models)} models from {len(batches)} batches")

📍 Automatic Batching in Pipeline

from docling_graph import run_pipeline, PipelineConfig

# Batching happens automatically
config = PipelineConfig(
    source="large_document.pdf",
    template="templates.BillingDocument",
    backend="llm",
    inference="remote",
    use_chunking=True  # Enables automatic batching
)

run_pipeline(config)

# ChunkBatcher is used internally to optimize API calls

Performance Optimization

Batch Size vs API Calls

Chunks No Batching With Batching Reduction
10 10 calls 3 calls 70%
20 20 calls 5 calls 75%
50 50 calls 12 calls 76%
100 100 calls 23 calls 77%

Cost Savings

# Example: Mistral Large API
API_COST_PER_CALL = $0.002  # Input tokens

# Without batching: 50 chunks
cost_without = 50 * API_COST_PER_CALL = $0.10

# With batching: 12 batches
cost_with = 12 * API_COST_PER_CALL = $0.024

# Savings: 76%

Context Utilization

Measuring Utilization

batches = batcher.batch_chunks(chunks)

for batch in batches:
    utilization = batch.total_tokens / batcher.available_tokens * 100
    print(f"Batch {batch.batch_id}: {utilization:.1f}% utilized")

# Average utilization
avg_util = sum(b.total_tokens for b in batches) / (len(batches) * batcher.available_tokens) * 100
print(f"Average utilization: {avg_util:.1f}%")

Optimization Tips

# ✅ Good - High utilization (90-95%)
batcher = ChunkBatcher(
    context_limit=8000,
    merge_threshold=0.95  # Default: merge if <95% utilized
)

# ❌ Bad - Low utilization (<70%)
batcher = ChunkBatcher(
    context_limit=8000,
    merge_threshold=0.5  # Too aggressive merging
)

Best Practices

👍 Match Context Limit to Model

# ✅ Good - Use actual model limits
if model == "mistral-large":
    context_limit = 32000
elif model == "gpt-4-turbo":
    context_limit = 128000
elif model == "llama3.1:8b":
    context_limit = 8000

batcher = ChunkBatcher(context_limit=context_limit)

👍 Leave Adequate Buffer

# ✅ Good - Conservative buffers
batcher = ChunkBatcher(
    context_limit=8000,
    system_prompt_tokens=500,  # Adequate
    response_buffer_tokens=500  # Safe margin
)

# ❌ Bad - Insufficient buffer
batcher = ChunkBatcher(
    context_limit=8000,
    system_prompt_tokens=100,  # Too small
    response_buffer_tokens=100  # Risky
)

👍 Use Merge Threshold Wisely

# ✅ Good - Default balance (recommended)
batcher = ChunkBatcher(
    merge_threshold=0.95  # Default: 95% - good balance
)

# For many small chunks (more aggressive merging)
batcher = ChunkBatcher(
    merge_threshold=0.85  # More aggressive merging
)

# For few large chunks (less merging)
batcher = ChunkBatcher(
    merge_threshold=0.98  # Minimal merging, better fit
)

👍 Monitor Batch Statistics

# ✅ Good - Check batching effectiveness
batches = batcher.batch_chunks(chunks)

reduction = (1 - len(batches) / len(chunks)) * 100
print(f"API call reduction: {reduction:.0f}%")

if reduction < 50:
    print("Warning: Low batching efficiency")

Troubleshooting

🐛 Too Many Batches

Batching not reducing API calls enough

Solution:

# Lower merge threshold for more aggressive merging
batcher = ChunkBatcher(
    context_limit=8000,
    merge_threshold=0.85  # More aggressive (default is 0.95)
)

# Or increase context limit if model supports it
batcher = ChunkBatcher(
    context_limit=16000  # Use larger context
)

🐛 Batches Too Large

Batches exceeding context limit

Solution:

# Increase buffer sizes
batcher = ChunkBatcher(
    context_limit=8000,
    system_prompt_tokens=700,  # More buffer (was 500)
    response_buffer_tokens=700  # More buffer (was 500)
)

🐛 Low Utilization

Batches not filling context window

Solution:

# Increase merge threshold (closer to default 0.95)
batcher = ChunkBatcher(
    context_limit=8000,
    merge_threshold=0.98  # Less merging, better fit (default is 0.95)
)

# Or use smaller chunks
chunker = DocumentChunker(
    provider="mistral",
    max_tokens=3000  # Smaller chunks (was 4096)
)


Advanced Techniques

Custom Batch Processing

from docling_graph.core.extractors import ChunkBatcher

def process_with_retry(batches, backend, template):
    """Process batches with retry logic."""
    models = []

    for batch in batches:
        max_retries = 3
        for attempt in range(max_retries):
            try:
                model = backend.extract_from_markdown(
                    markdown=batch.combined_text,
                    template=template,
                    context=f"batch {batch.batch_id}",
                    is_partial=True
                )

                if model:
                    models.append(model)
                    break

            except Exception as e:
                if attempt == max_retries - 1:
                    print(f"Failed batch {batch.batch_id} after {max_retries} attempts")
                else:
                    print(f"Retry {attempt + 1} for batch {batch.batch_id}")

    return models

Next Steps

Now that you understand batch processing:

  1. Graph Management → - Work with knowledge graphs
  2. Export Formats → - Export graphs
  3. Visualization → - Visualize graphs