Error Handling¶
Overview¶
Handle errors gracefully in docling-graph pipelines with structured exception handling, retry logic, and debugging strategies.
Prerequisites:
- Understanding of Pipeline Architecture
- Familiarity with Python API
- Basic Python exception handling
New: Zero Data Loss
Docling Graph now implements zero data loss - extraction failures return partial models instead of empty results, ensuring you never lose successfully extracted data.
Exception Hierarchy¶
Docling-graph uses a structured exception hierarchy:
DoclingGraphError (base)
├── ConfigurationError # Invalid configuration
├── ClientError # LLM/API client errors
├── ExtractionError # Document extraction failures
├── ValidationError # Data validation failures
├── GraphError # Graph operation failures
└── PipelineError # Pipeline execution failures
Import Exceptions¶
from docling_graph.exceptions import (
DoclingGraphError,
ConfigurationError,
ClientError,
ExtractionError,
ValidationError,
GraphError,
PipelineError
)
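Because every exception above derives from DoclingGraphError, a handler for the base class catches anything the library raises, while more specific handlers can still run first. A minimal sketch (the source file and template path are placeholders):
"""Catch specific exceptions first, then fall back to the base class."""
from docling_graph import run_pipeline, PipelineConfig
from docling_graph.exceptions import ClientError, DoclingGraphError

config = PipelineConfig(
    source="document.pdf",
    template="templates.MyTemplate"
)

try:
    run_pipeline(config)
except ClientError as e:
    # API/client failures get their own handling
    print(f"Client error: {e.message}")
except DoclingGraphError as e:
    # Any other docling-graph failure lands here
    print(f"Pipeline error ({type(e).__name__}): {e.message}")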
Common Error Scenarios¶
1. Configuration Errors¶
"""Handle configuration errors."""
from docling_graph import run_pipeline, PipelineConfig
from docling_graph.exceptions import ConfigurationError
try:
config = PipelineConfig(
source="document.pdf",
template="templates.MyTemplate",
backend="vlm",
inference="remote" # VLM doesn't support remote!
)
run_pipeline(config)
except ConfigurationError as e:
print(f"Configuration error: {e.message}")
print(f"Details: {e.details}")
# Fix: Use local inference with VLM
config = PipelineConfig(
source="document.pdf",
template="templates.MyTemplate",
backend="vlm",
inference="local" # Corrected
)
run_pipeline(config)
2. Client Errors (API)¶
"""Handle API client errors."""
from docling_graph import run_pipeline, PipelineConfig
from docling_graph.exceptions import ClientError
import time
def process_with_retry(source: str, max_retries: int = 3):
    """Process with retry on client errors."""
    for attempt in range(max_retries):
        try:
            config = PipelineConfig(
                source=source,
                template="templates.MyTemplate",
                backend="llm",
                inference="remote"
            )
            run_pipeline(config)
            print("✅ Processing successful")
            return
        except ClientError as e:
            print(f"Attempt {attempt + 1} failed: {e.message}")
            if "authentication" in str(e).lower():
                # Auth error - retrying won't help
                print("Authentication failed. Check API key.")
                raise
            if attempt == max_retries - 1:
                # Last attempt failed
                print("Max retries reached")
                raise
            if "rate limit" in str(e).lower():
                # Rate limit - wait with exponential backoff
                wait_time = 2 ** attempt
                print(f"Rate limited. Waiting {wait_time}s...")
                time.sleep(wait_time)
            else:
                # Other transient error - brief pause, then retry
                time.sleep(1)

# Usage
process_with_retry("document.pdf")
3. Extraction Errors¶
"""Handle extraction errors."""
from docling_graph import run_pipeline, PipelineConfig
from docling_graph.exceptions import ExtractionError
def process_with_fallback(source: str):
"""Process with fallback strategy."""
# Try VLM first (faster)
try:
print("Trying VLM extraction...")
config = PipelineConfig(
source=source,
template="templates.MyTemplate",
backend="vlm",
inference="local"
)
run_pipeline(config)
print("✅ VLM extraction successful")
return
except ExtractionError as e:
print(f"VLM failed: {e.message}")
print("Falling back to LLM...")
# Fallback to LLM
try:
config = PipelineConfig(
source=source,
template="templates.MyTemplate",
backend="llm",
inference="local"
)
run_pipeline(config)
print("✅ LLM extraction successful")
except ExtractionError as e:
print(f"Both methods failed: {e.message}")
print(f"Details: {e.details}")
raise
# Usage
process_with_fallback("document.pdf")
4. Validation Errors¶
"""Handle validation errors."""
from pydantic import BaseModel, Field, ValidationError as PydanticValidationError
from docling_graph import run_pipeline, PipelineConfig
from docling_graph.exceptions import ValidationError
class StrictTemplate(BaseModel):
"""Template with strict validation."""
name: str = Field(..., min_length=1)
age: int = Field(..., ge=0, le=150)
email: str = Field(..., pattern=r'^[\w\.-]+@[\w\.-]+\.\w+$')
def process_with_validation_handling(source: str):
"""Process with validation error handling."""
try:
config = PipelineConfig(
source=source,
template="templates.StrictTemplate"
)
run_pipeline(config)
except ValidationError as e:
print(f"Validation failed: {e.message}")
# Check if it's a Pydantic validation error
if e.cause and isinstance(e.cause, PydanticValidationError):
print("\nValidation errors:")
for error in e.cause.errors():
field = error['loc'][0]
msg = error['msg']
print(f" - {field}: {msg}")
# Option 1: Use more lenient template
print("\nRetrying with lenient template...")
config = PipelineConfig(
source=source,
template="templates.LenientTemplate"
)
run_pipeline(config)
# Usage
process_with_validation_handling("document.pdf")
5. Graph Errors¶
"""Handle graph construction errors."""
from docling_graph import run_pipeline, PipelineConfig
from docling_graph.exceptions import GraphError
def process_with_graph_validation(source: str):
"""Process with graph validation."""
try:
config = PipelineConfig(
source=source,
template="templates.MyTemplate",
export_format="cypher"
)
run_pipeline(config)
except GraphError as e:
print(f"Graph error: {e.message}")
print(f"Details: {e.details}")
# Try alternative export format
print("Trying CSV export instead...")
config = PipelineConfig(
source=source,
template="templates.MyTemplate",
export_format="csv" # Fallback format
)
run_pipeline(config)
# Usage
process_with_graph_validation("document.pdf")
Retry Strategies¶
Exponential Backoff¶
"""Implement exponential backoff for retries."""
import time
from typing import Callable, Any

from docling_graph import run_pipeline, PipelineConfig
from docling_graph.exceptions import ClientError
def retry_with_backoff(
func: Callable,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
backoff_factor: float = 2.0
) -> Any:
"""
Retry function with exponential backoff.
Args:
func: Function to retry
max_retries: Maximum number of retries
base_delay: Initial delay in seconds
max_delay: Maximum delay in seconds
backoff_factor: Multiplier for delay
Returns:
Function result
Raises:
Exception from last attempt
"""
last_exception = None
for attempt in range(max_retries):
try:
return func()
except ClientError as e:
last_exception = e
if attempt == max_retries - 1:
# Last attempt
break
# Calculate delay
delay = min(base_delay * (backoff_factor ** attempt), max_delay)
print(f"Attempt {attempt + 1} failed. Retrying in {delay:.1f}s...")
time.sleep(delay)
# All retries failed
raise last_exception
# Usage
def process_document():
config = PipelineConfig(
source="document.pdf",
template="templates.MyTemplate",
backend="llm",
inference="remote"
)
run_pipeline(config)
retry_with_backoff(process_document, max_retries=3)
Conditional Retry¶
"""Retry only for specific errors."""
from docling_graph.exceptions import ClientError, ConfigurationError
def should_retry(exception: Exception) -> bool:
"""Determine if error is retryable."""
# Don't retry configuration errors
if isinstance(exception, ConfigurationError):
return False
# Retry client errors
if isinstance(exception, ClientError):
error_msg = str(exception).lower()
# Don't retry auth errors
if "authentication" in error_msg or "unauthorized" in error_msg:
return False
# Retry rate limits and timeouts
if "rate limit" in error_msg or "timeout" in error_msg:
return True
# Default: don't retry
return False
def process_with_conditional_retry(source: str, max_retries: int = 3):
"""Process with conditional retry."""
for attempt in range(max_retries):
try:
config = PipelineConfig(
source=source,
template="templates.MyTemplate"
)
run_pipeline(config)
return
except Exception as e:
if not should_retry(e) or attempt == max_retries - 1:
raise
print(f"Retryable error. Attempt {attempt + 2}...")
time.sleep(2 ** attempt)
Logging and Debugging¶
Enable Detailed Logging¶
"""Configure logging for debugging."""
import logging
from docling_graph import run_pipeline, PipelineConfig
# Configure logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('pipeline.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger('docling_graph')
# Run pipeline with logging
try:
config = PipelineConfig(
source="document.pdf",
template="templates.MyTemplate"
)
run_pipeline(config)
except Exception as e:
logger.error(f"Pipeline failed: {e}", exc_info=True)
raise
Debug Mode¶
"""Run pipeline in debug mode."""
from docling_graph import run_pipeline, PipelineConfig
from docling_graph.exceptions import DoclingGraphError
def debug_pipeline(source: str):
"""Run pipeline with detailed error information."""
try:
config = PipelineConfig(
source=source,
template="templates.MyTemplate"
)
run_pipeline(config)
except DoclingGraphError as e:
print("\n" + "="*60)
print("ERROR DETAILS")
print("="*60)
print(f"Type: {type(e).__name__}")
print(f"Message: {e.message}")
if e.details:
print("\nDetails:")
for key, value in e.details.items():
print(f" {key}: {value}")
if e.cause:
print(f"\nCaused by: {type(e.cause).__name__}")
print(f" {e.cause}")
print("="*60)
raise
# Usage
debug_pipeline("document.pdf")
Trace Data for Debugging¶
Trace data provides visibility into pipeline internals for debugging extraction issues:
"""Use trace data to debug extraction failures."""
from docling_graph import run_pipeline, PipelineConfig
def debug_with_trace(source: str):
"""Debug extraction using trace data."""
config = PipelineConfig(
source=source,
template="templates.ComplexTemplate",
debug=True, # Enable debug mode
dump_to_disk=True, # Export for analysis
output_dir="debug_output"
)
context = run_pipeline(config)
# Analyze debug artifacts
from pathlib import Path
import json
debug_dir = Path(context.output_dir) / "debug"
if debug_dir.exists():
# Check for validation errors in atom extractions
atoms_dir = debug_dir / "atoms"
errors = []
for attempt_file in atoms_dir.glob("*_attempt*.json"):
with open(attempt_file) as f:
attempt = json.load(f)
if not attempt['validation_success']:
errors.append(attempt)
if errors:
print(f"❌ Found {len(errors)} validation errors:")
for attempt in errors:
print(f"\n Slot {attempt['slot_id']} (attempt {attempt['attempt']}):")
print(f" Error: {attempt['error']}")
# Check confidence distribution
with open(debug_dir / "atoms_all.jsonl") as f:
atoms = [json.loads(line) for line in f]
confidences = [a['confidence'] for a in atoms]
if confidences:
avg_conf = sum(confidences) / len(confidences)
print(f"\n📊 Confidence Statistics:")
print(f" Total atoms: {len(atoms)}")
print(f" Average confidence: {avg_conf:.2f}")
print(f" Min confidence: {min(confidences):.2f}")
print(f" Max confidence: {max(confidences):.2f}")
# Check for conflicts
with open(debug_dir / "reducer_report.json") as f:
report = json.load(f)
conflicts = report.get('conflicts', [])
if conflicts:
print(f"\n⚠️ Found {len(conflicts)} conflicts:")
for conflict in conflicts:
print(f" Field: {conflict['field_path']}")
print(f" Values: {len(conflict['conflicting_values'])}")
return context
# Usage
context = debug_with_trace("problematic_document.pdf")
See Also: Trace Data Debugging Guide for comprehensive examples.
Automatic Cleanup on Failure¶
When dump_to_disk=True, the pipeline automatically cleans up empty output directories if processing fails:
"""Automatic cleanup of empty directories on failure."""
from docling_graph import run_pipeline, PipelineConfig
from docling_graph.exceptions import PipelineError
config = PipelineConfig(
source="document.pdf",
template="templates.MyTemplate",
dump_to_disk=True,
output_dir="outputs"
)
try:
context = run_pipeline(config)
except PipelineError as e:
# If the pipeline fails before writing any files,
# the empty output directory is automatically removed
print(f"Pipeline failed: {e.message}")
# No empty artifact directories left in outputs/
Cleanup Behavior:
- Empty directories are removed - If the pipeline fails before writing any files, the output directory is automatically deleted
- Partial results are preserved - If any files were written before the failure, the directory is kept
- Only when dump_to_disk=True - Cleanup only runs when file exports are enabled
- Logged for transparency - Cleanup actions are logged for visibility
Example Scenarios:
# Scenario 1: Failure during template loading (before any files)
# → Output directory is removed (empty)
# Scenario 2: Failure during extraction (after docling conversion)
# → Output directory is kept (contains docling/ files)
# Scenario 3: dump_to_disk=False
# → No cleanup needed (no directory created)
This ensures your outputs/ folder stays clean without manual intervention.
Error Recovery Patterns¶
Zero Data Loss¶
Zero data loss ensures extraction failures never result in completely empty results:
"""Handle extraction with zero data loss."""
from docling_graph import run_pipeline, PipelineConfig
from pathlib import Path
import json
def process_with_zero_data_loss(source: str):
"""Process document with zero data loss guarantee."""
config = PipelineConfig(
source=source,
template="templates.BillingDocument",
processing_mode="many-to-one",
output_dir="outputs"
)
try:
results = run_pipeline(config)
# Check result type
if len(results) == 1:
print("✅ Successfully merged into single model")
return {"status": "complete", "models": results}
else:
print(f"⚠ Got {len(results)} partial models (merge failed)")
print(" But data is preserved!")
return {"status": "partial", "models": results}
except Exception as e:
print(f"Pipeline failed: {e}")
# Even on failure, check for partial results
output_dir = Path("outputs")
if output_dir.exists():
# Look for partial model files
model_files = list(output_dir.glob("*.json"))
if model_files:
print(f"✅ Found {len(model_files)} partial model files")
# Load partial models
partial_models = []
for file in model_files:
with open(file) as f:
partial_models.append(json.load(f))
return {"status": "recovered", "models": partial_models}
return {"status": "failed", "models": []}
# Usage
result = process_with_zero_data_loss("invoice.pdf")
if result["status"] == "complete":
# Use merged model
model = result["models"][0]
print(f"Invoice: {model.get('document_no')}")
elif result["status"] == "partial":
# Use partial models
print("Working with partial models:")
for i, model in enumerate(result["models"], 1):
print(f" Model {i}: {model.get('document_no', 'N/A')}")
elif result["status"] == "recovered":
# Recovered from files
print("Recovered partial data from files")
else:
print("No data available")
Partial Model Handling¶
"""Work with partial models when merging fails."""
from docling_graph import run_pipeline, PipelineConfig
from typing import List, Dict, Any
def extract_with_partial_handling(source: str) -> Dict[str, Any]:
"""Extract and handle partial models intelligently."""
config = PipelineConfig(
source=source,
template="templates.BillingDocument",
processing_mode="many-to-one",
llm_consolidation=True # Try LLM consolidation
)
results = run_pipeline(config)
if len(results) == 1:
# Success: single merged model
return {
"status": "merged",
"document_no": results[0].document_no,
"total": results[0].total,
"line_items": len(results[0].line_items or []),
"completeness": 100
}
else:
# Partial: multiple models
print(f"⚠ Merge failed, got {len(results)} partial models")
# Combine data from partial models
combined = {
"status": "partial",
"document_no": None,
"total": None,
"line_items": 0,
"completeness": 0
}
# Extract what we can
for model in results:
if model.document_no and not combined["document_no"]:
combined["document_no"] = model.document_no
if model.total and not combined["total"]:
combined["total"] = model.total
if model.line_items:
combined["line_items"] += len(model.line_items)
# Calculate completeness
fields_found = sum([
bool(combined["document_no"]),
bool(combined["total"]),
bool(combined["line_items"] > 0)
])
combined["completeness"] = int((fields_found / 3) * 100)
return combined
# Usage
result = extract_with_partial_handling("invoice.pdf")
print(f"Status: {result['status']}")
print(f"Invoice: {result['document_no'] or 'N/A'}")
print(f"Total: ${result['total'] or 0}")
print(f"Line items: {result['line_items']}")
print(f"Completeness: {result['completeness']}%")
if result['completeness'] < 100:
print("⚠ Incomplete extraction - consider manual review")
Graceful Degradation¶
"""Degrade gracefully on errors."""
from docling_graph import run_pipeline, PipelineConfig
from docling_graph.exceptions import ExtractionError
def process_with_degradation(source: str):
"""Process with graceful degradation."""
results = {
"success": False,
"method": None,
"output_dir": None,
"models": []
}
# Try best method first
methods = [
("VLM Local", {"backend": "vlm", "inference": "local"}),
("LLM Local", {"backend": "llm", "inference": "local"}),
("LLM Remote", {"backend": "llm", "inference": "remote"})
]
for method_name, config_overrides in methods:
try:
print(f"Trying {method_name}...")
config = PipelineConfig(
source=source,
template="templates.MyTemplate",
**config_overrides
)
models = run_pipeline(config)
results["success"] = True
results["method"] = method_name
results["output_dir"] = config.output_dir
results["models"] = models
print(f"✅ Success with {method_name}")
print(f" Extracted {len(models)} model(s)")
break
except ExtractionError as e:
print(f"❌ {method_name} failed: {e.message}")
continue
if not results["success"]:
print("❌ All methods failed")
return results
Partial Success Handling¶
"""Handle partial extraction success."""
from pathlib import Path
import json
from docling_graph import run_pipeline, PipelineConfig
def process_with_partial_success(source: str):
"""Process and handle partial results."""
try:
config = PipelineConfig(
source=source,
template="templates.MyTemplate",
output_dir="outputs"
)
models = run_pipeline(config)
# Check completeness
if len(models) == 1:
return {
"status": "complete",
"models": models,
"output_dir": config.output_dir
}
else:
return {
"status": "partial",
"models": models,
"output_dir": config.output_dir,
"warning": f"Got {len(models)} partial models instead of 1"
}
except Exception as e:
print(f"Pipeline failed: {e}")
# Check if partial results exist
output_dir = Path("outputs")
if output_dir.exists():
# Check for extracted data
nodes_file = output_dir / "nodes.csv"
if nodes_file.exists():
print("✅ Partial results available in files")
print(f" Nodes: {nodes_file}")
# Use partial results
return {
"status": "recovered",
"models": [],
"output_dir": output_dir
}
return {"status": "failed", "models": [], "output_dir": None}
Validation Strategies¶
Pre-Validation¶
"""Validate before processing."""
from pathlib import Path
from docling_graph import run_pipeline, PipelineConfig
from docling_graph.exceptions import ConfigurationError
def validate_and_process(source: str, template: str):
"""Validate configuration before processing."""
# Validate source
source_path = Path(source)
if not source_path.exists():
raise ConfigurationError(
"Source file not found",
details={"source": source}
)
# Validate template
try:
# Try to import template
module_path, class_name = template.rsplit(".", 1)
import importlib
module = importlib.import_module(module_path)
template_class = getattr(module, class_name)
except Exception as e:
raise ConfigurationError(
"Invalid template",
details={"template": template},
cause=e
)
# Validate file size
size_mb = source_path.stat().st_size / (1024 * 1024)
if size_mb > 100:
print(f"⚠️ Large file: {size_mb:.1f}MB")
# Process
config = PipelineConfig(
source=source,
template=template
)
run_pipeline(config)
Best Practices¶
👍 Use Specific Exceptions¶
# ✅ Good - Catch specific exceptions
try:
run_pipeline(config)
except ClientError as e:
# Handle API errors
pass
except ExtractionError as e:
# Handle extraction errors
pass
# ❌ Avoid - Catch all exceptions
try:
run_pipeline(config)
except Exception:
pass # What went wrong?
👍 Provide Context¶
# ✅ Good - Detailed error context
from docling_graph.exceptions import ExtractionError
try:
result = extract_data(source)
except Exception as e:
raise ExtractionError(
"Failed to extract data",
details={
"source": source,
"template": template.__name__,
"stage": "extraction"
},
cause=e
)
# ❌ Avoid - Generic errors
try:
result = extract_data(source)
except Exception as e:
raise Exception("Extraction failed")
👍 Log Before Raising¶
# ✅ Good - Log then raise
import logging
logger = logging.getLogger(__name__)
try:
run_pipeline(config)
except ExtractionError as e:
logger.error(f"Extraction failed: {e}", exc_info=True)
raise
# ❌ Avoid - Silent failures
try:
run_pipeline(config)
except ExtractionError:
pass # Error lost!
👍 Clean Up Resources¶
# ✅ Good - Always clean up
try:
run_pipeline(config)
finally:
# Clean up even if error occurs
cleanup_resources()
# ❌ Avoid - No cleanup on error
try:
run_pipeline(config)
cleanup_resources() # Not called if error!
except:
pass
Troubleshooting Guide¶
Common Errors¶
| Error | Cause | Solution |
|---|---|---|
| ConfigurationError: VLM backend only supports local inference | VLM with remote inference | Use inference="local" |
| ClientError: API key not found | Missing API key | Set the API key environment variable |
| ExtractionError: Empty extraction result | Poor template | Improve field descriptions |
| ValidationError: Field required | Missing data | Make the field optional |
| GraphError: Invalid graph structure | Bad relationships | Check edge definitions |
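If you want a single place to turn these exceptions into actionable hints, a small lookup works. This is only a sketch - the remediation strings are lifted from the table above and should be adapted to your own templates and environment:
"""Map docling-graph exceptions to the remediation hints above."""
from docling_graph.exceptions import (
    ClientError,
    ConfigurationError,
    ExtractionError,
    GraphError,
    ValidationError,
)

REMEDIATION = {
    ConfigurationError: 'Check the backend/inference combination (e.g. inference="local" for VLM)',
    ClientError: "Set the API key environment variable",
    ExtractionError: "Improve template field descriptions",
    ValidationError: "Make the missing field optional",
    GraphError: "Check edge definitions in the template",
}

def suggest_fix(error: Exception) -> str:
    """Return a remediation hint for a known docling-graph exception."""
    for exc_type, hint in REMEDIATION.items():
        if isinstance(error, exc_type):
            return hint
    return "Unknown error - enable debug logging and inspect pipeline.log"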
Zero Data Loss Best Practices¶
1. Always Check Result Count¶
# ✅ Good - Check if merge succeeded
results = run_pipeline(config)
if len(results) == 1:
# Merged successfully
process_merged_model(results[0])
else:
# Got partial models
process_partial_models(results)
2. Handle Partial Models Gracefully¶
# ✅ Good - Extract what you can from partial models
from typing import List

def get_document_no(models: List) -> str:
    """Get the document number from any model that has it."""
    for model in models:
        if model.document_no:
            return model.document_no
    return "N/A"
3. Log Partial Results¶
# ✅ Good - Log when you get partial results
import logging
logger = logging.getLogger(__name__)
results = run_pipeline(config)
if len(results) > 1:
logger.warning(f"Got {len(results)} partial models instead of 1")
logger.info("Data preserved despite merge failure")
4. Provide User Feedback¶
# ✅ Good - Inform users about partial results
results = run_pipeline(config)
if len(results) == 1:
print("✅ Extraction complete")
else:
print(f"⚠ Extraction partially complete ({len(results)} fragments)")
print(" All data preserved - manual review recommended")
Next Steps¶
- Model Merging → Learn about zero data loss
- Testing → Test error handling
- Exceptions Reference → Full exception API
- Extraction Process → Extraction guide