run_pipeline()¶
Overview¶
The run_pipeline() function is the main entry point for executing the document-to-graph pipeline programmatically.
Function Signature:
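As implied by the Parameters and Return Value sections below:

def run_pipeline(config: PipelineConfig | Dict[str, Any]) -> PipelineContext: ...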
Returns: a PipelineContext object containing the knowledge graph, the extracted Pydantic models, and other pipeline results.
Basic Usage¶
Default Behavior (No File Exports)¶
from docling_graph import run_pipeline

# Returns data directly - no file exports by default
context = run_pipeline({
    "source": "document.pdf",
    "template": "templates.BillingDocument",
    "backend": "llm",
    "inference": "remote",
})

# Access results
graph = context.knowledge_graph
models = context.extracted_models or []
print(f"Extracted {graph.number_of_nodes()} nodes, {len(models)} model(s)")
With File Exports¶
from docling_graph import run_pipeline

# Enable file exports with dump_to_disk
context = run_pipeline({
    "source": "document.pdf",
    "template": "templates.BillingDocument",
    "backend": "llm",
    "inference": "remote",
    "dump_to_disk": True,
    "output_dir": "outputs",
})

# Results available both in memory and on disk
graph = context.knowledge_graph
# Files also written to outputs/
With PipelineConfig¶
from docling_graph import PipelineConfig, run_pipeline

config = PipelineConfig(
    source="document.pdf",
    template="templates.BillingDocument",
    backend="llm",
    inference="remote",
)

# Returns PipelineContext
context = run_pipeline(config)
Parameters¶
config¶
Type: PipelineConfig | Dict[str, Any]
Required: Yes
Description: Pipeline configuration as either:
- PipelineConfig object (recommended)
- Dictionary with configuration keys
Configuration Keys¶
Required Keys¶
| Key | Type | Description |
|---|---|---|
| source | str | Path to the source document |
| template | str or Type[BaseModel] | Pydantic template (dotted path or class) |
Optional Keys¶
| Key | Type | Default | Description |
|---|---|---|---|
| backend | str | "llm" | Backend type: "llm" or "vlm" |
| inference | str | "local" | Inference mode: "local" or "remote" |
| processing_mode | str | "many-to-one" | Processing strategy (e.g., "many-to-one", "one-to-one") |
| docling_config | str | "ocr" | Docling pipeline: "ocr" or "vision" |
| use_chunking | bool | True | Enable document chunking |
| dump_to_disk | bool or None | None | Control file exports (None = auto: True under the CLI, False via the API) |
| export_format | str | "csv" | Export format: "csv" or "cypher" |
| model_override | str | None | Override the model name |
| provider_override | str | None | Override the provider name |
See PipelineConfig for the complete list of configuration keys.
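For illustration, a dictionary configuration combining several of the optional keys above (all values are taken from the tables; adjust to your documents):

from docling_graph import run_pipeline

# Every key below is documented in the tables above
context = run_pipeline({
    "source": "document.pdf",
    "template": "templates.BillingDocument",
    "backend": "llm",                  # "llm" or "vlm"
    "inference": "local",              # "local" or "remote"
    "processing_mode": "many-to-one",
    "docling_config": "ocr",           # "ocr" or "vision"
    "use_chunking": True,
    "dump_to_disk": True,
    "export_format": "cypher",         # "csv" or "cypher"
})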
Return Value¶
Type: PipelineContext
Returns a PipelineContext object containing:
| Attribute | Type | Description |
|---|---|---|
| knowledge_graph | nx.DiGraph | NetworkX directed graph with extracted entities and relationships |
| extracted_models | list[BaseModel] | List of validated Pydantic model instances (one or more) |
| graph_metadata | GraphMetadata | Graph statistics (node/edge counts, etc.) |
| docling_document | DoclingDocument or None | Original Docling document (if available) |
| config | PipelineConfig | Pipeline configuration used |
Example:
context = run_pipeline(config)

# Access the knowledge graph
graph = context.knowledge_graph
print(f"Nodes: {graph.number_of_nodes()}")
print(f"Edges: {graph.number_of_edges()}")

# Access extracted Pydantic models
for model in context.extracted_models or []:
    print(f"Model type: {type(model).__name__}")
Exceptions¶
ConfigurationError¶
Raised when configuration is invalid.
from docling_graph import run_pipeline
from docling_graph.exceptions import ConfigurationError

try:
    run_pipeline({
        "source": "document.pdf",
        "template": "templates.BillingDocument",
        "backend": "invalid",  # Invalid backend
    })
except ConfigurationError as e:
    print(f"Configuration error: {e.message}")
    print(f"Details: {e.details}")
ExtractionError¶
Raised when document extraction fails.
from docling_graph.exceptions import ExtractionError

try:
    run_pipeline({
        "source": "document.pdf",
        "template": "templates.Missing",  # Template not found
    })
except ExtractionError as e:
    print(f"Extraction failed: {e.message}")
PipelineError¶
Raised when pipeline execution fails.
from docling_graph.exceptions import PipelineError

try:
    run_pipeline({
        "source": "document.pdf",
        "template": "templates.BillingDocument",
    })
except PipelineError as e:
    print(f"Pipeline error: {e.message}")
Complete Examples¶
📍 Minimal Configuration (API Mode)¶
from docling_graph import run_pipeline

# Minimal required configuration - returns data, no file exports
context = run_pipeline({
    "source": "invoice.pdf",
    "template": "templates.BillingDocument",
})

# Access results in memory
graph = context.knowledge_graph
invoice = (context.extracted_models or [None])[0]
print(f"Extracted invoice with {graph.number_of_nodes()} entities")
📍 Remote LLM¶
import os

from docling_graph import run_pipeline

# Set API key
os.environ["MISTRAL_API_KEY"] = "your-key"

# Configure for remote inference
context = run_pipeline({
    "source": "research.pdf",
    "template": "templates.ScholarlyRheologyPaper",
    "backend": "llm",
    "inference": "remote",
    "provider_override": "mistral",
    "model_override": "mistral-large-latest",
    "processing_mode": "many-to-one",
    "use_chunking": True,
})

# Access the knowledge graph
graph = context.knowledge_graph
print(f"Rheology research: {graph.number_of_nodes()} nodes, {graph.number_of_edges()} edges")
📍 Local VLM¶
from docling_graph import run_pipeline

# VLM for form extraction
context = run_pipeline({
    "source": "form.jpg",
    "template": "templates.IDCard",
    "backend": "vlm",
    "inference": "local",
    "processing_mode": "one-to-one",
    "docling_config": "vision",
})

# Access extracted data (one-to-one returns one model per page; take the first)
id_card = (context.extracted_models or [None])[0]
if id_card:
    print(f"Name: {id_card.first_name} {id_card.last_name}")
📍 With Error Handling¶
import logging

from docling_graph import run_pipeline, PipelineContext
from docling_graph.exceptions import (
    ConfigurationError,
    ExtractionError,
    PipelineError,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def process_document(source: str, template: str) -> PipelineContext | None:
    """Process a document with comprehensive error handling."""
    try:
        context = run_pipeline({
            "source": source,
            "template": template,
            "backend": "llm",
            "inference": "remote",
        })
        logger.info(f"✅ Successfully processed: {source}")
        logger.info(f"   Nodes: {context.knowledge_graph.number_of_nodes()}")
        return context
    except ConfigurationError as e:
        logger.error(f"Configuration error for {source}: {e.message}")
        if e.details:
            logger.error(f"Details: {e.details}")
        return None
    except ExtractionError as e:
        logger.error(f"Extraction failed for {source}: {e.message}")
        return None
    except PipelineError as e:
        logger.error(f"Pipeline error for {source}: {e.message}")
        return None
    except Exception as e:
        logger.exception(f"Unexpected error for {source}: {e}")
        return None

# Use the function
context = process_document("invoice.pdf", "templates.BillingDocument")
if context:
    print(f"Graph has {context.knowledge_graph.number_of_nodes()} nodes")
📍 Batch Processing (Memory-Efficient)¶
from pathlib import Path

from docling_graph import run_pipeline

def batch_process(input_dir: str, template: str):
    """Process all PDFs in a directory without disk writes."""
    documents = Path(input_dir).glob("*.pdf")
    results = {"success": [], "failed": []}
    all_graphs = []
    for doc in documents:
        try:
            # Process without file exports
            context = run_pipeline({
                "source": str(doc),
                "template": template,
            })
            # Store graph in memory
            all_graphs.append({
                "filename": doc.name,
                "graph": context.knowledge_graph,
                "models": context.extracted_models,
            })
            results["success"].append(doc.name)
            print(f"✅ {doc.name}: {context.knowledge_graph.number_of_nodes()} nodes")
        except Exception as e:
            results["failed"].append((doc.name, str(e)))
            print(f"❌ {doc.name}: {e}")
    # Summary
    print(f"\nProcessed: {len(results['success'])} succeeded, {len(results['failed'])} failed")
    return results, all_graphs

# Run batch processing
results, graphs = batch_process("documents/", "templates.BillingDocument")

# Optionally export combined results
if graphs:
    print(f"\nTotal entities across all documents: {sum(g['graph'].number_of_nodes() for g in graphs)}")
Advanced Usage¶
dump_to_disk Behavior¶
The dump_to_disk parameter controls file exports:
from docling_graph import run_pipeline

# Default: no file exports (API mode)
context = run_pipeline({
    "source": "document.pdf",
    "template": "templates.BillingDocument",
})
# Returns data in memory only

# Explicit: disable file exports
context = run_pipeline({
    "source": "document.pdf",
    "template": "templates.BillingDocument",
    "dump_to_disk": False,
})
# Returns data only
CLI vs API defaults
CLI mode defaults to dump_to_disk=True, API mode defaults to dump_to_disk=False.
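With exports disabled you can still persist results yourself; a minimal sketch using NetworkX's own serialization (GraphML is just one option, and it requires scalar node/edge attribute values):

import networkx as nx

# Write the in-memory graph without the pipeline's exporters
nx.write_graphml(context.knowledge_graph, "graph.graphml")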
Custom Models Configuration¶
from docling_graph import run_pipeline

# Override models from config
context = run_pipeline({
    "source": "document.pdf",
    "template": "templates.BillingDocument",
    "backend": "llm",
    "inference": "remote",
    "models": {
        "llm": {
            "remote": {
                "model": "gpt-4o",
                "provider": "openai",
            }
        }
    },
})

# Access results
graph = context.knowledge_graph
Conditional Processing¶
from pathlib import Path

from docling_graph import run_pipeline

def smart_process(source: str):
    """Choose configuration based on document type."""
    path = Path(source)
    # Determine template and config
    if "invoice" in path.name.lower():
        template = "templates.BillingDocument"
        backend = "vlm"
        processing = "one-to-one"
    elif "research" in path.name.lower():
        template = "templates.ScholarlyRheologyPaper"
        backend = "llm"
        processing = "many-to-one"
    else:
        raise ValueError(f"Unknown document type: {path.name}")
    # Process and return results
    context = run_pipeline({
        "source": source,
        "template": template,
        "backend": backend,
        "processing_mode": processing,
    })
    return context

# Use smart processing
invoice_context = smart_process("invoice_001.pdf")
research_context = smart_process("research_paper.pdf")
print(f"Invoice nodes: {invoice_context.knowledge_graph.number_of_nodes()}")
print(f"Research nodes: {research_context.knowledge_graph.number_of_nodes()}")
Integration Patterns¶
Flask API (Memory-Efficient)¶
import uuid
from pathlib import Path

from flask import Flask, request, jsonify
from werkzeug.utils import secure_filename

from docling_graph import run_pipeline

app = Flask(__name__)

@app.route('/process', methods=['POST'])
def process_endpoint():
    """API endpoint for document processing - returns data without disk writes."""
    file = request.files.get('document')
    template = request.form.get('template', 'templates.BillingDocument')
    if not file:
        return jsonify({"error": "No file provided"}), 400
    # Save temporarily (sanitize the client-supplied filename)
    temp_id = str(uuid.uuid4())
    temp_path = f"temp/{temp_id}_{secure_filename(file.filename)}"
    Path("temp").mkdir(exist_ok=True)
    file.save(temp_path)
    try:
        # Process without file exports (memory-efficient)
        context = run_pipeline({
            "source": temp_path,
            "template": template,
        })
        # Extract data from context
        graph = context.knowledge_graph
        models = context.extracted_models or []
        return jsonify({
            "status": "success",
            "id": temp_id,
            "nodes": graph.number_of_nodes(),
            "edges": graph.number_of_edges(),
            "data": models[0].model_dump() if models else None,
        })
    except Exception as e:
        return jsonify({
            "status": "error",
            "message": str(e),
        }), 500
    finally:
        # Cleanup
        Path(temp_path).unlink(missing_ok=True)

if __name__ == '__main__':
    app.run(debug=True)
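A client-side sketch for exercising this endpoint with the requests library, assuming the Flask dev server's default address:

import requests

# Field names match the endpoint above: 'document' file, 'template' form field
with open("invoice.pdf", "rb") as fh:
    resp = requests.post(
        "http://localhost:5000/process",
        files={"document": fh},
        data={"template": "templates.BillingDocument"},
    )
print(resp.json())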
Celery Task (With Return Data)¶
from celery import Celery

from docling_graph import run_pipeline

# A result backend is required for result.get() below
app = Celery('tasks', broker='redis://localhost:6379', backend='redis://localhost:6379')

@app.task
def process_document_task(source: str, template: str):
    """Async document processing task - returns graph statistics."""
    try:
        context = run_pipeline({
            "source": source,
            "template": template,
        })
        graph = context.knowledge_graph
        return {
            "status": "success",
            "nodes": graph.number_of_nodes(),
            "edges": graph.number_of_edges(),
            "node_types": sorted({data.get('type') for _, data in graph.nodes(data=True) if data.get('type')}),
        }
    except Exception as e:
        return {"status": "error", "message": str(e)}

# Usage
result = process_document_task.delay(
    "document.pdf",
    "templates.BillingDocument",
)

# Get result
data = result.get(timeout=300)
print(f"Processed: {data['nodes']} nodes, {data['edges']} edges")
Airflow Operator (With XCom)¶
from airflow.operators.python import PythonOperator

from docling_graph import run_pipeline

def process_document(**context):
    """Airflow task for document processing - pushes results to XCom."""
    params = context['params']
    # Process and get results
    pipeline_context = run_pipeline({
        "source": params['source'],
        "template": params['template'],
    })
    # Push graph statistics to XCom
    graph = pipeline_context.knowledge_graph
    context['task_instance'].xcom_push(
        key='graph_stats',
        value={
            'nodes': graph.number_of_nodes(),
            'edges': graph.number_of_edges(),
        },
    )
    return "Processing complete"

# In DAG definition
process_task = PythonOperator(
    task_id='process_document',
    python_callable=process_document,
    params={
        'source': 'documents/daily.pdf',
        'template': 'templates.BillingDocument',
    },
)
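A downstream task can then read those statistics back from XCom; a sketch in which report_stats is a hypothetical follow-up task:

def report_stats(**context):
    """Hypothetical task that pulls the stats pushed above."""
    stats = context['task_instance'].xcom_pull(
        task_ids='process_document', key='graph_stats'
    )
    print(f"Nodes: {stats['nodes']}, Edges: {stats['edges']}")

report_task = PythonOperator(
    task_id='report_stats',
    python_callable=report_stats,
)
process_task >> report_task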
Best Practices¶
👍 Use PipelineConfig for Type Safety¶
# ✅ Good - Type-safe with validation
from docling_graph import PipelineConfig, run_pipeline

config = PipelineConfig(
    source="document.pdf",
    template="templates.BillingDocument",
    backend="llm",  # Validated at creation
)
run_pipeline(config)

# ❌ Avoid - No validation until runtime
run_pipeline({
    "source": "document.pdf",
    "template": "templates.BillingDocument",
    "backend": "invalid",  # Error at runtime
})
👍 Handle Errors Explicitly¶
# ✅ Good - Specific error handling
from docling_graph.exceptions import ExtractionError

try:
    run_pipeline(config)
except ExtractionError as e:
    logger.error(f"Extraction failed: {e.message}")
    # Implement retry or fallback

# ❌ Avoid - Silent failures
try:
    run_pipeline(config)
except:
    pass
Troubleshooting¶
🐛 Template Not Found¶
Error: an ExtractionError reporting that the template module cannot be found or imported (for example, templates.BillingDocument does not resolve).
Solution: make sure the directory containing your templates package is on sys.path:
import sys
from pathlib import Path

# Add project root to path
sys.path.append(str(Path.cwd()))

# Now the dotted template path resolves
from docling_graph import run_pipeline

run_pipeline({
    "source": "document.pdf",
    "template": "templates.BillingDocument",
})
🐛 API Key Not Found¶
Error: remote inference fails because the provider API key (for example, MISTRAL_API_KEY) is missing from the environment.
Solution: set the key before running the pipeline:
import os

# Set API key before running
os.environ["MISTRAL_API_KEY"] = "your-key"

from docling_graph import run_pipeline

run_pipeline({
    "source": "document.pdf",
    "template": "templates.BillingDocument",
    "inference": "remote",
})
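To fail fast rather than deep inside the pipeline, you can verify the key up front; a small sketch (hardcoded keys are for illustration only; prefer loading them from your environment or a secret store):

import os

# Verify the provider key before attempting remote inference
if not os.getenv("MISTRAL_API_KEY"):
    raise RuntimeError("MISTRAL_API_KEY is not set; remote inference will fail")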
Next Steps¶
- PipelineConfig → Configuration class
- Programmatic Examples → More examples
- Batch Processing → Batch patterns