Batch Processing¶
Overview¶
Batch processing lets you run the pipeline over many documents efficiently, with progress tracking, error handling, and result aggregation.
Key Features:

- Memory-efficient processing (no file exports by default)
- Parallel processing
- Progress tracking
- Error recovery
- Result aggregation
- Resource management
Basic Batch Processing¶
Simple Loop (Memory-Efficient)¶
from pathlib import Path
from docling_graph import run_pipeline, PipelineConfig
documents = Path("documents").glob("*.pdf")
results = []
for doc in documents:
config = PipelineConfig(
source=str(doc),
template="templates.BillingDocument"
)
try:
# Process without file exports - memory efficient
context = run_pipeline(config)
results.append({
"filename": doc.name,
"nodes": context.knowledge_graph.number_of_nodes(),
"edges": context.knowledge_graph.number_of_edges()
})
print(f"✅ {doc.name}: {results[-1]['nodes']} nodes")
except Exception as e:
print(f"❌ {doc.name}: {e}")
# Summary
print(f"\nTotal documents: {len(results)}")
print(f"Total nodes: {sum(r['nodes'] for r in results)}")
With File Exports¶
from pathlib import Path
from docling_graph import run_pipeline, PipelineConfig
documents = Path("documents").glob("*.pdf")
for doc in documents:
config = PipelineConfig(
source=str(doc),
template="templates.BillingDocument",
dump_to_disk=True, # Enable file exports
output_dir=f"outputs/{doc.stem}"
)
try:
context = run_pipeline(config)
print(f"✅ {doc.name}")
except Exception as e:
print(f"❌ {doc.name}: {e}")
Progress Tracking¶
Using tqdm¶
from pathlib import Path
from docling_graph import run_pipeline, PipelineConfig
from tqdm import tqdm
documents = list(Path("documents").glob("*.pdf"))
for doc in tqdm(documents, desc="Processing"):
config = PipelineConfig(
source=str(doc),
template="templates.BillingDocument"
)
try:
run_pipeline(config)
except Exception as e:
tqdm.write(f"❌ {doc.name}: {e}")
Install tqdm first: pip install tqdm
Error Handling¶
Comprehensive Error Tracking (Memory-Efficient)¶
from pathlib import Path
from docling_graph import run_pipeline, PipelineConfig
from docling_graph.exceptions import DoclingGraphError
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def batch_process(input_dir: str, template: str):
"""Process documents with error tracking - no file exports."""
documents = list(Path(input_dir).glob("*.pdf"))
results = {
"success": [],
"failed": [],
"graphs": []
}
for doc in documents:
try:
config = PipelineConfig(
source=str(doc),
template=template
)
# Process without file exports
context = run_pipeline(config)
# Store results in memory
results["success"].append(doc.name)
results["graphs"].append({
"filename": doc.name,
"graph": context.knowledge_graph,
"model": context.pydantic_model
})
logger.info(f"✅ Success: {doc.name} ({context.knowledge_graph.number_of_nodes()} nodes)")
except DoclingGraphError as e:
results["failed"].append({
"document": doc.name,
"error": e.message,
"details": e.details
})
logger.error(f"❌ Failed: {doc.name} - {e.message}")
except Exception as e:
results["failed"].append({
"document": doc.name,
"error": str(e),
"details": None
})
logger.exception(f"❌ Unexpected error: {doc.name}")
# Summary
total = len(documents)
logger.info(f"\n{'='*50}")
logger.info(f"Total: {total}")
logger.info(f"Success: {len(results['success'])}")
logger.info(f"Failed: {len(results['failed'])}")
logger.info(f"Total nodes: {sum(g['graph'].number_of_nodes() for g in results['graphs'])}")
return results
# Run batch processing
results = batch_process(
input_dir="documents/invoices",
template="templates.billing_document.BillingDocument"
)
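Because the graphs are kept in memory, they can also be merged into one combined graph for cross-document queries. The sketch below assumes context.knowledge_graph is a networkx graph, which the number_of_nodes() / number_of_edges() calls above suggest; adapt it if your graph type differs.

import networkx as nx

# Merge all successfully extracted graphs into a single graph.
# Nodes with the same identifier are merged; attributes from later
# graphs overwrite earlier ones (networkx compose semantics).
if results["graphs"]:
    combined = nx.compose_all([entry["graph"] for entry in results["graphs"]])
    print(f"Combined graph: {combined.number_of_nodes()} nodes, "
          f"{combined.number_of_edges()} edges")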
Parallel Processing¶
Using ThreadPoolExecutor (Memory-Efficient)¶
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from docling_graph import run_pipeline, PipelineConfig
from tqdm import tqdm
def process_document(doc_path: Path, template: str):
"""Process single document without file exports."""
try:
config = PipelineConfig(
source=str(doc_path),
template=template
)
context = run_pipeline(config)
return {
"status": "success",
"document": doc_path.name,
"nodes": context.knowledge_graph.number_of_nodes(),
"edges": context.knowledge_graph.number_of_edges(),
"graph": context.knowledge_graph,
"model": context.pydantic_model
}
except Exception as e:
return {"status": "error", "document": doc_path.name, "error": str(e)}
def parallel_batch_process(
input_dir: str,
template: str,
max_workers: int = 4
):
"""Process documents in parallel - memory efficient."""
documents = list(Path(input_dir).glob("*.pdf"))
results = {"success": [], "failed": [], "graphs": []}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all tasks
futures = {
executor.submit(process_document, doc, template): doc
for doc in documents
}
# Process results as they complete
for future in tqdm(as_completed(futures), total=len(documents), desc="Processing"):
result = future.result()
if result["status"] == "success":
results["success"].append(result["document"])
results["graphs"].append({
"filename": result["document"],
"graph": result["graph"],
"model": result["model"],
"nodes": result["nodes"],
"edges": result["edges"]
})
else:
results["failed"].append({
"document": result["document"],
"error": result["error"]
})
# Summary
total_nodes = sum(g["nodes"] for g in results["graphs"])
total_edges = sum(g["edges"] for g in results["graphs"])
print(f"\nCompleted: {len(results['success'])} succeeded, {len(results['failed'])} failed")
print(f"Total entities: {total_nodes} nodes, {total_edges} edges")
return results
# Run parallel processing
results = parallel_batch_process(
input_dir="documents/invoices",
template="templates.billing_document.BillingDocument",
max_workers=4
)
Result Aggregation¶
Collecting Statistics¶
from pathlib import Path
import json
import pandas as pd
from docling_graph import run_pipeline, PipelineConfig
def batch_with_stats(input_dir: str, template: str):
"""Process documents and collect statistics."""
documents = list(Path(input_dir).glob("*.pdf"))
all_stats = []
for doc in documents:
try:
# Process document
config = PipelineConfig(
source=str(doc),
template=template
)
context = run_pipeline(config)
graph = context.knowledge_graph
stats = {
"document": doc.name,
"status": "success",
"node_count": graph.number_of_nodes(),
"edge_count": graph.number_of_edges(),
}
all_stats.append(stats)
except Exception as e:
all_stats.append({
"document": doc.name,
"status": "error",
"error": str(e)
})
# Create summary DataFrame
df = pd.DataFrame(all_stats)
# Print statistics
print("\n=== Batch Statistics ===")
print(f"Total documents: {len(df)}")
print(f"Successful: {(df['status'] == 'success').sum()}")
print(f"Failed: {(df['status'] == 'error').sum()}")
if 'node_count' in df.columns:
successful = df[df['status'] == 'success']
print(f"\nAverage nodes: {successful['node_count'].mean():.1f}")
print(f"Average edges: {successful['edge_count'].mean():.1f}")
print(f"Average density: {successful['density'].mean():.3f}")
return df
# Run with statistics
df = batch_with_stats(
input_dir="documents/invoices",
template="templates.billing_document.BillingDocument"
)
# Analyze results
print("\nTop 5 documents by node count:")
print(df.nlargest(5, 'node_count')[['document', 'node_count', 'edge_count']])
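The DataFrame also makes it easy to persist the run and to list the failures for follow-up; the file names below are arbitrary.

# Save per-document statistics for later inspection
df.to_csv("batch_stats.csv", index=False)
df.to_json("batch_stats.json", orient="records", indent=2)

# List failed documents, if any
failed = df[df["status"] == "error"]
if not failed.empty:
    print("\nFailed documents:")
    print(failed[["document", "error"]].to_string(index=False))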
Advanced Patterns¶
Pattern 1: Conditional Processing¶
from pathlib import Path
from docling_graph import run_pipeline, PipelineConfig
def smart_batch_process(input_dir: str):
"""Process documents with template selection."""
documents = Path(input_dir).glob("*")
for doc in documents:
# Determine template based on filename
if "invoice" in doc.name.lower():
template = "templates.billing_document.BillingDocument"
backend = "vlm"
elif "research" in doc.name.lower():
template = "templates.rheology_research.ScholarlyRheologyPaper"
backend = "llm"
else:
print(f"⊘ Skipped (unknown type): {doc.name}")
continue
# Process with appropriate config
config = PipelineConfig(
source=str(doc),
template=template,
backend=backend
)
try:
run_pipeline(config)
print(f"✅ {doc.name}")
except Exception as e:
print(f"❌ {doc.name}: {e}")
smart_batch_process("documents/mixed")
Pattern 2: Retry Logic¶
from pathlib import Path
from docling_graph import run_pipeline, PipelineConfig
import time
def process_with_retry(
doc_path: Path,
template: str,
max_retries: int = 3,
delay: int = 5
):
"""Process document with retry logic."""
for attempt in range(1, max_retries + 1):
try:
config = PipelineConfig(
source=str(doc_path),
template=template
)
run_pipeline(config)
return {"status": "success", "attempts": attempt}
except Exception as e:
if attempt < max_retries:
print(f"Attempt {attempt} failed, retrying in {delay}s...")
time.sleep(delay)
else:
return {
"status": "error",
"attempts": attempt,
"error": str(e)
}
def batch_with_retry(input_dir: str, template: str):
"""Batch process with retry logic."""
documents = list(Path(input_dir).glob("*.pdf"))
results = []
for doc in documents:
result = process_with_retry(
doc_path=doc,
template=template,
max_retries=3
)
result["document"] = doc.name
results.append(result)
status = "✅" if result["status"] == "success" else "❌"
print(f"{status} {doc.name} (attempts: {result['attempts']})")
return results
results = batch_with_retry(
input_dir="documents/invoices",
template="templates.billing_document.BillingDocument"
)
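For transient failures such as rate limits or network errors, the fixed delay can be replaced with exponential backoff plus a little jitter. This is a sketch of the same retry loop with a growing delay; the base delay and the 60-second cap are arbitrary choices.

import random
import time
from pathlib import Path

from docling_graph import run_pipeline, PipelineConfig

def process_with_backoff(doc_path: Path, template: str, max_retries: int = 3):
    """Retry with exponential backoff: ~5s, 10s, 20s (plus jitter), capped at 60s."""
    for attempt in range(1, max_retries + 1):
        try:
            config = PipelineConfig(source=str(doc_path), template=template)
            run_pipeline(config)
            return {"status": "success", "attempts": attempt}
        except Exception as e:
            if attempt == max_retries:
                return {"status": "error", "attempts": attempt, "error": str(e)}
            delay = min(5 * 2 ** (attempt - 1), 60) + random.uniform(0, 1)
            print(f"Attempt {attempt} failed, retrying in {delay:.1f}s...")
            time.sleep(delay)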
Pattern 3: Checkpoint and Resume¶
from pathlib import Path
import json
from docling_graph import run_pipeline, PipelineConfig
def batch_with_checkpoint(
input_dir: str,
template: str,
checkpoint_file: str = "checkpoint.json"
):
"""Batch process with checkpoint support."""
documents = list(Path(input_dir).glob("*.pdf"))
checkpoint_path = Path(checkpoint_file)
# Load checkpoint
if checkpoint_path.exists():
with open(checkpoint_path) as f:
checkpoint = json.load(f)
processed = set(checkpoint.get("processed", []))
print(f"Resuming from checkpoint: {len(processed)} already processed")
else:
processed = set()
checkpoint = {"processed": [], "failed": []}
# Process remaining documents
for doc in documents:
if doc.name in processed:
print(f"⊘ Skipped (already processed): {doc.name}")
continue
try:
config = PipelineConfig(
source=str(doc),
template=template
)
run_pipeline(config)
# Update checkpoint
checkpoint["processed"].append(doc.name)
print(f"✅ {doc.name}")
except Exception as e:
checkpoint["failed"].append({
"document": doc.name,
"error": str(e)
})
print(f"❌ {doc.name}: {e}")
# Save checkpoint after each document
with open(checkpoint_path, 'w') as f:
json.dump(checkpoint, f, indent=2)
print(f"\nProcessed: {len(checkpoint['processed'])}")
print(f"Failed: {len(checkpoint['failed'])}")
return checkpoint
# Run with checkpoint
checkpoint = batch_with_checkpoint(
input_dir="documents/invoices",
template="templates.billing_document.BillingDocument"
)
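If the process can be killed mid-write (for example by a job scheduler), the checkpoint file itself can end up truncated. A common safeguard is to write to a temporary file and atomically swap it into place; the helper below is a sketch using only the standard library and can replace the plain json.dump call above.

import json
import os
from pathlib import Path

def save_checkpoint_atomically(checkpoint: dict, checkpoint_file: str = "checkpoint.json"):
    """Write the checkpoint to a temp file, then atomically replace the old one."""
    tmp_path = Path(checkpoint_file).with_suffix(".tmp")
    with open(tmp_path, "w") as f:
        json.dump(checkpoint, f, indent=2)
    os.replace(tmp_path, checkpoint_file)  # atomic rename on POSIX and Windows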
Resource Management¶
Memory Management¶
from pathlib import Path
from docling_graph import run_pipeline, PipelineConfig
import gc
def batch_with_memory_management(
input_dir: str,
template: str,
cleanup_interval: int = 10
):
"""Batch process with memory cleanup."""
documents = list(Path(input_dir).glob("*.pdf"))
for i, doc in enumerate(documents, 1):
config = PipelineConfig(
source=str(doc),
template=template
)
try:
run_pipeline(config)
print(f"✅ {doc.name}")
except Exception as e:
print(f"❌ {doc.name}: {e}")
# Periodic cleanup
if i % cleanup_interval == 0:
gc.collect()
print(f"[Cleanup after {i} documents]")
batch_with_memory_management(
input_dir="documents/large_batch",
template="templates.billing_document.BillingDocument",
cleanup_interval=10
)
Complete Example¶
Production-Ready Batch Processor¶
"""
Production-ready batch processor with all features.
"""
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from docling_graph import run_pipeline, PipelineConfig
from docling_graph.exceptions import DoclingGraphError
import json
import logging
from datetime import datetime
from tqdm import tqdm
import pandas as pd
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class BatchProcessor:
"""Production-ready batch document processor."""
def __init__(
self,
input_dir: str,
template: str,
output_base: str,
max_workers: int = 4,
max_retries: int = 3
):
self.input_dir = Path(input_dir)
self.template = template
self.output_base = Path(output_base)
self.max_workers = max_workers
self.max_retries = max_retries
# Create output directory
self.output_base.mkdir(parents=True, exist_ok=True)
# Initialize checkpoint
self.checkpoint_file = self.output_base / "checkpoint.json"
self.load_checkpoint()
def load_checkpoint(self):
"""Load processing checkpoint."""
if self.checkpoint_file.exists():
with open(self.checkpoint_file) as f:
self.checkpoint = json.load(f)
logger.info(f"Loaded checkpoint: {len(self.checkpoint['processed'])} processed")
else:
self.checkpoint = {
"processed": [],
"failed": [],
"started_at": datetime.now().isoformat()
}
def save_checkpoint(self):
"""Save processing checkpoint."""
with open(self.checkpoint_file, 'w') as f:
json.dump(self.checkpoint, f, indent=2)
def process_document(self, doc_path: Path):
"""Process single document with retry logic."""
# Skip if already processed
if doc_path.name in self.checkpoint["processed"]:
return {"status": "skipped", "document": doc_path.name}
# Retry loop
for attempt in range(1, self.max_retries + 1):
try:
config = PipelineConfig(
source=str(doc_path),
template=self.template
)
context = run_pipeline(config)
graph = context.knowledge_graph
stats = {
"node_count": graph.number_of_nodes(),
"edge_count": graph.number_of_edges(),
}
return {
"status": "success",
"document": doc_path.name,
"attempts": attempt,
**stats
}
except DoclingGraphError as e:
if attempt < self.max_retries:
logger.warning(f"Attempt {attempt} failed for {doc_path.name}, retrying...")
continue
else:
return {
"status": "error",
"document": doc_path.name,
"attempts": attempt,
"error": e.message
}
except Exception as e:
return {
"status": "error",
"document": doc_path.name,
"attempts": attempt,
"error": str(e)
}
def process_batch(self):
"""Process all documents in batch."""
documents = list(self.input_dir.glob("*.pdf"))
logger.info(f"Found {len(documents)} documents to process")
results = []
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {
executor.submit(self.process_document, doc): doc
for doc in documents
}
for future in tqdm(as_completed(futures), total=len(documents), desc="Processing"):
result = future.result()
results.append(result)
# Update checkpoint
if result["status"] == "success":
self.checkpoint["processed"].append(result["document"])
elif result["status"] == "error":
self.checkpoint["failed"].append({
"document": result["document"],
"error": result["error"]
})
self.save_checkpoint()
# Generate summary
self.generate_summary(results)
return results
def generate_summary(self, results):
"""Generate processing summary."""
df = pd.DataFrame(results)
# Save detailed results
summary_file = self.output_base / "batch_results.csv"
df.to_csv(summary_file, index=False)
# Print summary
logger.info("\n" + "="*50)
logger.info("BATCH PROCESSING SUMMARY")
logger.info("="*50)
logger.info(f"Total documents: {len(df)}")
logger.info(f"Successful: {(df['status'] == 'success').sum()}")
logger.info(f"Failed: {(df['status'] == 'error').sum()}")
logger.info(f"Skipped: {(df['status'] == 'skipped').sum()}")
if 'node_count' in df.columns:
successful = df[df['status'] == 'success']
if len(successful) > 0:
logger.info(f"\nAverage nodes: {successful['node_count'].mean():.1f}")
logger.info(f"Average edges: {successful['edge_count'].mean():.1f}")
logger.info(f"Average density: {successful['density'].mean():.3f}")
logger.info(f"\nResults saved to: {summary_file}")
# Usage
if __name__ == "__main__":
processor = BatchProcessor(
input_dir="documents/invoices",
template="templates.billing_document.BillingDocument",
output_base="outputs/production_batch",
max_workers=4,
max_retries=3
)
results = processor.process_batch()
Run the script directly with Python, for example: python batch_processor.py (use whatever filename you saved the script under).
Best Practices¶
👍 Use Progress Tracking¶
# ✅ Good - Visual progress
from tqdm import tqdm
for doc in tqdm(documents, desc="Processing"):
run_pipeline(config)
# ❌ Avoid - No feedback
for doc in documents:
run_pipeline(config)
👍 Implement Error Recovery¶
# ✅ Good - Checkpoint and resume
checkpoint = load_checkpoint()
for doc in documents:
if doc.name not in checkpoint["processed"]:
process(doc)
checkpoint["processed"].append(doc.name)
save_checkpoint(checkpoint)
# ❌ Avoid - Start from scratch on failure
for doc in documents:
process(doc)
👍 Aggregate Results¶
# ✅ Good - Collect statistics
results = []
for doc in documents:
result = process(doc)
results.append(result)
df = pd.DataFrame(results)
df.to_csv("summary.csv")
# ❌ Avoid - No summary
for doc in documents:
process(doc)
Next Steps¶
- Examples → Real-world examples
- Advanced Topics → Custom backends
- API Reference → Complete API docs