Skip to content

run_pipeline()

Overview

The run_pipeline() function is the main entry point for executing the document-to-graph pipeline programmatically.

Function Signature:

def run_pipeline(config: Union[PipelineConfig, Dict[str, Any]]) -> PipelineContext

Returns: PipelineContext object containing the knowledge graph, Pydantic model, and other pipeline results.


Basic Usage

Default Behavior (No File Exports)

from docling_graph import run_pipeline

# Returns data directly - no file exports by default
context = run_pipeline({
    "source": "document.pdf",
    "template": "templates.BillingDocument",
    "backend": "llm",
    "inference": "remote"
})

# Access results
graph = context.knowledge_graph
models = context.extracted_models or []
print(f"Extracted {graph.number_of_nodes()} nodes, {len(models)} model(s)")

With File Exports

from docling_graph import run_pipeline

# Enable file exports with dump_to_disk
context = run_pipeline({
    "source": "document.pdf",
    "template": "templates.BillingDocument",
    "backend": "llm",
    "inference": "remote",
    "dump_to_disk": True,
    "output_dir": "outputs"
})

# Results available both in memory and on disk
graph = context.knowledge_graph
# Files also written to outputs/

With PipelineConfig

from docling_graph import PipelineConfig, run_pipeline

config = PipelineConfig(
    source="document.pdf",
    template="templates.BillingDocument",
    backend="llm",
    inference="remote"
)

# Returns PipelineContext
context = run_pipeline(config)

Parameters

config

Type: PipelineConfig | Dict[str, Any]

Required: Yes

Description: Pipeline configuration as either: - PipelineConfig object (recommended) - Dictionary with configuration keys


Configuration Keys

Required Keys

Key Type Description
source str Path to source document
template str | Type[BaseModel] Pydantic template (dotted path or class)

Optional Keys

Key Type Default Description
backend str "llm" Backend type: "llm" or "vlm"
inference str "local" Inference mode: "local" or "remote"
processing_mode str "many-to-one" Processing strategy
docling_config str "ocr" Docling pipeline: "ocr" or "vision"
use_chunking bool True Enable document chunking
dump_to_disk bool or None None Control file exports (None=auto: CLI=True, API=False)
export_format str "csv" Export format: "csv" or "cypher"
model_override str None Override model name
provider_override str None Override provider name

See PipelineConfig for complete list.


Return Value

Type: PipelineContext

Returns a PipelineContext object containing:

Attribute Type Description
knowledge_graph nx.DiGraph NetworkX directed graph with extracted entities and relationships
extracted_models list[BaseModel] List of validated Pydantic model instances (one or more)
graph_metadata GraphMetadata Graph statistics (node/edge counts, etc.)
docling_document DoclingDocument or None Original Docling document (if available)
config PipelineConfig Pipeline configuration used

Example:

context = run_pipeline(config)

# Access the knowledge graph
graph = context.knowledge_graph
print(f"Nodes: {graph.number_of_nodes()}")
print(f"Edges: {graph.number_of_edges()}")

# Access extracted Pydantic models
for model in context.extracted_models or []:
    print(f"Model type: {type(model).__name__}")


Exceptions

ConfigurationError

Raised when configuration is invalid.

from docling_graph import run_pipeline
from docling_graph.exceptions import ConfigurationError

try:
    run_pipeline({
        "source": "document.pdf",
        "template": "templates.BillingDocument",
        "backend": "invalid"  # Invalid backend
    })
except ConfigurationError as e:
    print(f"Configuration error: {e.message}")
    print(f"Details: {e.details}")

ExtractionError

Raised when document extraction fails.

from docling_graph.exceptions import ExtractionError

try:
    run_pipeline({
        "source": "document.pdf",
        "template": "templates.Missing"  # Template not found
    })
except ExtractionError as e:
    print(f"Extraction failed: {e.message}")

PipelineError

Raised when pipeline execution fails.

from docling_graph.exceptions import PipelineError

try:
    run_pipeline({
        "source": "document.pdf",
        "template": "templates.BillingDocument"
    })
except PipelineError as e:
    print(f"Pipeline error: {e.message}")

Complete Examples

📍 Minimal Configuration (API Mode)

from docling_graph import run_pipeline

# Minimal required configuration - returns data, no file exports
context = run_pipeline({
    "source": "invoice.pdf",
    "template": "templates.BillingDocument"
})

# Access results in memory
graph = context.knowledge_graph
invoice = (context.extracted_models or [None])[0]
print(f"Extracted invoice with {graph.number_of_nodes()} entities")

📍 Remote LLM

import os
from docling_graph import run_pipeline

# Set API key
os.environ["MISTRAL_API_KEY"] = "your-key"

# Configure for remote inference
context = run_pipeline({
    "source": "research.pdf",
    "template": "templates.ScholarlyRheologyPaper",
    "backend": "llm",
    "inference": "remote",
    "provider_override": "mistral",
    "model_override": "mistral-large-latest",
    "processing_mode": "many-to-one",
    "use_chunking": True
})

# Access the knowledge graph
graph = context.knowledge_graph
print(f"Rheology research: {graph.number_of_nodes()} nodes, {graph.number_of_edges()} edges")

📍 Local VLM

from docling_graph import run_pipeline

# VLM for form extraction
context = run_pipeline({
    "source": "form.jpg",
    "template": "templates.IDCard",
    "backend": "vlm",
    "inference": "local",
    "processing_mode": "one-to-one",
    "docling_config": "vision"
})

# Access extracted data (one-to-one returns one model per page; take first)
id_card = (context.extracted_models or [None])[0]
if id_card:
    print(f"Name: {id_card.first_name} {id_card.last_name}")

📍 With Error Handling

from docling_graph import run_pipeline, PipelineContext
from docling_graph.exceptions import (
    ConfigurationError,
    ExtractionError,
    PipelineError
)
import logging
from pathlib import Path

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def process_document(source: str, template: str) -> PipelineContext | None:
    """Process document with comprehensive error handling."""
    try:
        context = run_pipeline({
            "source": source,
            "template": template,
            "backend": "llm",
            "inference": "remote"
        })
        logger.info(f"✅ Successfully processed: {source}")
        logger.info(f"  Nodes: {context.knowledge_graph.number_of_nodes()}")
        return context

    except ConfigurationError as e:
        logger.error(f"Configuration error for {source}: {e.message}")
        if e.details:
            logger.error(f"Details: {e.details}")
        return None

    except ExtractionError as e:
        logger.error(f"Extraction failed for {source}: {e.message}")
        return None

    except PipelineError as e:
        logger.error(f"Pipeline error for {source}: {e.message}")
        return None

    except Exception as e:
        logger.exception(f"Unexpected error for {source}: {e}")
        return None

# Use the function
context = process_document("invoice.pdf", "templates.BillingDocument")
if context:
    print(f"Graph has {context.knowledge_graph.number_of_nodes()} nodes")

📍 Batch Processing (Memory-Efficient)

from pathlib import Path
from docling_graph import run_pipeline

def batch_process(input_dir: str, template: str):
    """Process all PDFs in a directory without disk writes."""
    documents = Path(input_dir).glob("*.pdf")
    results = {"success": [], "failed": []}
    all_graphs = []

    for doc in documents:
        try:
            # Process without file exports
            context = run_pipeline({
                "source": str(doc),
                "template": template
            })

            # Store graph in memory
            all_graphs.append({
                "filename": doc.name,
                "graph": context.knowledge_graph,
                "models": context.extracted_models
            })

            results["success"].append(doc.name)
            print(f"✅ {doc.name}: {context.knowledge_graph.number_of_nodes()} nodes")

        except Exception as e:
            results["failed"].append((doc.name, str(e)))
            print(f"❌ {doc.name}: {e}")

    # Summary
    print(f"\nProcessed: {len(results['success'])} succeeded, {len(results['failed'])} failed")
    return results, all_graphs

# Run batch processing
results, graphs = batch_process("documents/", "templates.BillingDocument")

# Optionally export combined results
if graphs:
    print(f"\nTotal entities across all documents: {sum(g['graph'].number_of_nodes() for g in graphs)}")

Advanced Usage

dump_to_disk Behavior

The dump_to_disk parameter controls file exports:

from docling_graph import run_pipeline

# Default: No file exports (API mode)
context = run_pipeline({
    "source": "document.pdf",
    "template": "templates.BillingDocument"
})
# Returns data in memory only

# Explicit: Disable file exports
context = run_pipeline({
    "source": "document.pdf",
    "template": "templates.BillingDocument",
    "dump_to_disk": False
})
# Returns data only

CLI vs API defaults

CLI mode defaults to dump_to_disk=True, API mode defaults to dump_to_disk=False.

Custom Models Configuration

from docling_graph import run_pipeline

# Override models from config
context = run_pipeline({
    "source": "document.pdf",
    "template": "templates.BillingDocument",
    "backend": "llm",
    "inference": "remote",
    "models": {
        "llm": {
            "remote": {
                "model": "gpt-4o",
                "provider": "openai"
            }
        }
    }
})

# Access results
graph = context.knowledge_graph

Conditional Processing

from pathlib import Path
from docling_graph import run_pipeline

def smart_process(source: str):
    """Choose configuration based on document type."""
    path = Path(source)

    # Determine template and config
    if "invoice" in path.name.lower():
        template = "templates.BillingDocument"
        backend = "vlm"
        processing = "one-to-one"
    elif "research" in path.name.lower():
        template = "templates.ScholarlyRheologyPaper"
        backend = "llm"
        processing = "many-to-one"
    else:
        raise ValueError(f"Unknown document type: {path.name}")

    # Process and return results
    context = run_pipeline({
        "source": source,
        "template": template,
        "backend": backend,
        "processing_mode": processing
    })

    return context

# Use smart processing
invoice_context = smart_process("invoice_001.pdf")
research_context = smart_process("research_paper.pdf")

print(f"Invoice nodes: {invoice_context.knowledge_graph.number_of_nodes()}")
print(f"Research nodes: {research_context.knowledge_graph.number_of_nodes()}")

Integration Patterns

Flask API (Memory-Efficient)

from flask import Flask, request, jsonify
from docling_graph import run_pipeline
from pathlib import Path
import uuid

app = Flask(__name__)

@app.route('/process', methods=['POST'])
def process_endpoint():
    """API endpoint for document processing - returns data without disk writes."""
    file = request.files.get('document')
    template = request.form.get('template', 'templates.BillingDocument')

    if not file:
        return jsonify({"error": "No file provided"}), 400

    # Save temporarily
    temp_id = str(uuid.uuid4())
    temp_path = f"temp/{temp_id}_{file.filename}"
    Path("temp").mkdir(exist_ok=True)
    file.save(temp_path)

    try:
        # Process without file exports (memory-efficient)
        context = run_pipeline({
            "source": temp_path,
            "template": template
        })

        # Extract data from context
        graph = context.knowledge_graph
        models = context.extracted_models or []

        return jsonify({
            "status": "success",
            "id": temp_id,
            "nodes": graph.number_of_nodes(),
            "edges": graph.number_of_edges(),
            "data": models[0].model_dump() if models else None
        })

    except Exception as e:
        return jsonify({
            "status": "error",
            "message": str(e)
        }), 500

    finally:
        # Cleanup
        Path(temp_path).unlink(missing_ok=True)

if __name__ == '__main__':
    app.run(debug=True)

Celery Task (With Return Data)

from celery import Celery
from docling_graph import run_pipeline
from pathlib import Path

app = Celery('tasks', broker='redis://localhost:6379')

@app.task
def process_document_task(source: str, template: str):
    """Async document processing task - returns graph statistics."""
    try:
        context = run_pipeline({
            "source": source,
            "template": template
        })

        graph = context.knowledge_graph
        return {
            "status": "success",
            "nodes": graph.number_of_nodes(),
            "edges": graph.number_of_edges(),
            "node_types": list(set(data.get('type') for _, data in graph.nodes(data=True)))
        }
    except Exception as e:
        return {"status": "error", "message": str(e)}

# Usage
result = process_document_task.delay(
    "document.pdf",
    "templates.BillingDocument"
)
# Get result
data = result.get(timeout=300)
print(f"Processed: {data['nodes']} nodes, {data['edges']} edges")

Airflow Operator (With XCom)

from airflow.operators.python import PythonOperator
from docling_graph import run_pipeline

def process_document(**context):
    """Airflow task for document processing - pushes results to XCom."""
    params = context['params']

    # Process and get results
    pipeline_context = run_pipeline({
        "source": params['source'],
        "template": params['template']
    })

    # Push graph statistics to XCom
    graph = pipeline_context.knowledge_graph
    context['task_instance'].xcom_push(
        key='graph_stats',
        value={
            'nodes': graph.number_of_nodes(),
            'edges': graph.number_of_edges()
        }
    )

    return "Processing complete"

# In DAG definition
process_task = PythonOperator(
    task_id='process_document',
    python_callable=process_document,
    params={
        'source': 'documents/daily.pdf',
        'template': 'templates.BillingDocument'
    }
)

Best Practices

👍 Use PipelineConfig for Type Safety

# ✅ Good - Type-safe with validation
from docling_graph import PipelineConfig, run_pipeline

config = PipelineConfig(
    source="document.pdf",
    template="templates.BillingDocument",
    backend="llm"  # Validated at creation
)
run_pipeline(config)

# ❌ Avoid - No validation until runtime
run_pipeline({
    "source": "document.pdf",
    "template": "templates.BillingDocument",
    "backend": "invalid"  # Error at runtime
})

👍 Handle Errors Explicitly

# ✅ Good - Specific error handling
from docling_graph.exceptions import ExtractionError

try:
    run_pipeline(config)
except ExtractionError as e:
    logger.error(f"Extraction failed: {e.message}")
    # Implement retry or fallback

# ❌ Avoid - Silent failures
try:
    run_pipeline(config)
except:
    pass

Troubleshooting

🐛 Template Not Found

Error:

ModuleNotFoundError: No module named 'templates'

Solution:

import sys
from pathlib import Path

# Add project root to path
sys.path.append(str(Path.cwd()))

# Now import works
from docling_graph import run_pipeline

run_pipeline({
    "source": "document.pdf",
    "template": "templates.BillingDocument"
})

🐛 API Key Not Found

Error:

ConfigurationError: API key not found for provider: mistral

Solution:

import os

# Set API key before running
os.environ["MISTRAL_API_KEY"] = "your-key"

from docling_graph import run_pipeline

run_pipeline({
    "source": "document.pdf",
    "template": "templates.BillingDocument",
    "inference": "remote"
})


Next Steps

  1. PipelineConfig → - Configuration class
  2. Programmatic Examples → - More examples
  3. Batch Processing → - Batch patterns