Pipeline Components¶
The ai4rag.components package provides reusable building blocks for RAG pipeline workflows. These functions encapsulate the business logic that was previously inlined in Kubeflow Pipeline components, making it available for use in any context — KFP pipelines, standalone scripts, notebooks, or tests.
Architecture¶
┌──────────────────────────────────────────────┐
│ pipelines-components (KFP wrappers) │
│ ┌─────────┐ ┌──────────┐ ┌──────────────┐ │
│ │ @dsl. │ │ @dsl. │ │ @dsl. │ │
│ │component│ │component │ │ component │ │
│ └────┬────┘ └─────┬────┘ └──────┬───────┘ │
│ │ │ │ │
└───────┼────────────┼─────────────┼───────────┘
│ │ │
┌───────▼────────────▼─────────────▼───────────┐
│ ai4rag (business logic) │
│ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ components/ │ │
│ │ data/ │ │
│ │ optimization/ │ │
│ │ assets_generator/ │ │
│ │ notebook, leaderboard, templates │ │
│ └───────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────┐ │
│ │ core/ — experiment, HPO, search space │ │
│ └──────────────────────────────────────────┘ │
└──────────────────────────────────────────────┘
KFP wrappers handle only artifact I/O (reading dsl.Input[Artifact], writing dsl.Output[Artifact]) and Kubernetes-specific concerns (secrets, resource limits). All business logic lives in ai4rag.
Installation¶
S3 support (boto3), multiprocessing (multiprocess), and text extraction (docling) are all included in the core ai4rag install — no extra dependencies are needed to use pipeline components.
Data Components¶
Document Discovery¶
List and sample documents from an S3-compatible bucket:
from ai4rag.components.data import discover_documents
result = discover_documents(
bucket_name="my-bucket",
prefix="documents/",
sampling_enabled=True,
sampling_max_size_gb=1.0,
)
print(f"Found {result.count} documents ({result.total_size_bytes} bytes)")
result.save("/tmp/discovery_output")
Text Extraction¶
Download documents from S3 and extract text using Docling:
from ai4rag.components.data import extract_text
result = extract_text(
documents=[{"key": "docs/report.pdf", "size_bytes": 1024}],
bucket="my-bucket",
output_dir="/tmp/extracted",
max_extraction_workers=4,
)
print(f"Processed {result.processed_count}/{result.total_documents}")
Document Indexing¶
Chunk, embed, and index documents into a vector store:
from ai4rag.components.data import index_documents
from ai4rag.components import create_ogx_client
client = create_ogx_client(base_url="http://localhost:8321", api_key="key")
total_chunks = index_documents(
extracted_text_dir="/tmp/extracted",
embedding_model_id="ibm/slate-125m-english-rtrvr",
vector_io_provider_id="milvus",
ogx_client=client,
chunking_method="hybrid",
chunk_size=1024,
)
Test Data Loading¶
Load benchmark test data from S3:
from ai4rag.components.data import load_test_data
result = load_test_data(
bucket_name="my-bucket",
key="benchmarks/test_data.json",
benchmark_sample_size=25,
)
print(f"Loaded {result.record_count} records (sampled: {result.sampled})")
Optimization Components¶
Search Space Preparation¶
Build a search space report with model pre-selection:
from ai4rag.components.optimization import prepare_search_space_report
report = prepare_search_space_report(
test_data_path="/tmp/test_data.json",
extracted_text_path="/tmp/extracted/",
ogx_client=client,
embedding_models=["ibm/slate-125m-english-rtrvr"],
generation_models=["ibm/granite-3.1-8b-instruct"],
)
report.save_yaml("/tmp/search_space.yaml")
RAG Optimization¶
Run a full optimization experiment:
from ai4rag.components.optimization import run_rag_optimization
result = run_rag_optimization(
extracted_text_path="/tmp/extracted/",
test_data_path="/tmp/test_data.json",
search_space_report_path="/tmp/search_space.yaml",
output_dir="/tmp/rag_patterns/",
ogx_client=client,
vector_io_provider_id="milvus",
test_data_key="benchmarks/test_data.json",
input_data_key="documents/",
)
print(f"Generated {len(result.patterns)} patterns")
Shared Utilities¶
The ai4rag.components package provides three shared utility modules used across components:
| Module | Function | Purpose |
|---|---|---|
utils.s3 | create_s3_client() | S3 client factory with env-var fallback |
utils.ogx_client | create_ogx_client() | OGX client with SSL self-signed cert fallback |
utils.docling_io | load_docling_documents() | Load DoclingDocument JSON files |
These are importable from ai4rag.components or ai4rag.components.utils:
Design Principles¶
- No KFP types: Functions accept plain Python types (
str,Path,dict) and return frozen dataclasses. - Dependency injection: All functions accept pre-configured clients (S3, OGX) as optional parameters — when omitted, clients are created from environment variables.
- Lazy imports: Heavy optional dependencies (
boto3,multiprocess,docling) are imported only when used. - SSL fallback: All S3 and OGX operations automatically retry with
verify=Falsewhen self-signed certificate errors are detected.