
Data Flow

This page provides a detailed analysis of how data flows through the ai4rag system during optimization, from documents to optimal RAG configuration.


Overview

ai4rag's data flow consists of four distinct phases executed for each configuration evaluated during hyperparameter optimization:

  1. Indexing Phase: Documents → Chunks → Embeddings → Vector Store
  2. Query Phase: Question → Retrieval → Context → Generation → Answer
  3. Evaluation Phase: Answers + Ground Truth → Metrics → Scores
  4. Optimization Loop: Scores → Optimizer → Next Configuration

Indexing Phase

The indexing phase transforms raw documents into searchable vector embeddings stored in a vector database.

sequenceDiagram
    participant Exp as AI4RAGExperiment
    participant LC as BaseChunker
    participant EM as OGXEmbeddingModel
    participant VS as OGXVectorStore
    participant OGXClient as OGX Client

    Exp->>Exp: check if collection exists
    alt Collection exists
        Note over Exp: Reuse existing collection
    else New collection needed
        Exp->>LC: split_documents(DoclingDocuments)
        activate LC
        LC->>LC: apply chunking method
        LC->>LC: set document_id and sequence_number
        LC-->>Exp: AI4RAGChunks (with metadata)
        deactivate LC

        Exp->>VS: add_documents(chunks)
        activate VS
        VS->>EM: embed_documents([chunk.text])
        activate EM
        loop Batches of 1024 chunks
            EM->>OGXClient: embeddings.create(batch)
            OGXClient-->>EM: embeddings
        end
        EM-->>VS: all embeddings
        deactivate EM

        loop Batches of 2048 chunks
            VS->>OGXClient: vector_io.insert(chunks + embeddings)
            OGXClient-->>VS: success
        end
        VS-->>Exp: indexing complete
        deactivate VS
    end

Document Processing

Input Documents:

Documents are provided as list[DoclingDocument] (from the docling-core library). Each document is a structured representation of a parsed file, with the document name used as document_id. If the name is not set, a content-based hash is generated instead.

Chunking

The configured chunker (DoclingChunker or LangChainChunker) splits DoclingDocument objects into AI4RAGChunk instances. Both chunkers use tiktoken for token counting.

DoclingChunker (structure-aware): Operates directly on the document structure, preserving headings and tables. Does not support overlap.

LangChainChunker (token-based): Converts each DoclingDocument to markdown internally, then applies RecursiveCharacterTextSplitter with token-based length measurement.

Metadata:

Each chunk receives:

AI4RAGChunk(
    text="Chunk text content...",
    metadata={
        "document_id": "original_doc_id",    # From document name or content hash
        "sequence_number": 1,                 # Position within document
        # chunker-specific keys (e.g. "start_index" for LangChain, "headings" for Docling)
    }
)

Sequence numbering:

  • Sequence numbers are assigned sequentially per document
  • Enables window-based retrieval (adjacent chunks)
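To illustrate why per-document sequence numbers are useful, here is a minimal sketch of window-based expansion around a retrieved chunk. The `Chunk` dataclass and `expand_window` helper are hypothetical stand-ins written for this example; they are not part of ai4rag's API, and the library's own windowing logic may differ.

```python
from dataclasses import dataclass, field


@dataclass
class Chunk:
    """Hypothetical stand-in for AI4RAGChunk (text plus metadata dict)."""
    text: str
    metadata: dict = field(default_factory=dict)


def expand_window(hit: Chunk, all_chunks: list, window: int = 1) -> list:
    """Return the hit plus up to `window` neighbours on each side, in document order."""
    doc_id = hit.metadata["document_id"]
    seq = hit.metadata["sequence_number"]
    neighbours = [
        c for c in all_chunks
        if c.metadata["document_id"] == doc_id
        and abs(c.metadata["sequence_number"] - seq) <= window
    ]
    return sorted(neighbours, key=lambda c: c.metadata["sequence_number"])


# Five chunks from doc1 and one from doc2 (illustrative data only)
chunks = [
    Chunk(f"doc1 chunk {i}", {"document_id": "doc1", "sequence_number": i})
    for i in range(1, 6)
] + [Chunk("doc2 chunk 1", {"document_id": "doc2", "sequence_number": 1})]

# The hit is doc1 chunk 3; its window covers sequence numbers 2 to 4
window_texts = [c.text for c in expand_window(chunks[2], chunks, window=1)]
```

Because the lookup filters on `document_id` first, chunks from other documents that happen to share a sequence number are never pulled into the window.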

Output:

[
    AI4RAGChunk(
        text="First chunk text...",
        metadata={"document_id": "doc1", "sequence_number": 1}
    ),
    AI4RAGChunk(
        text="Second chunk text...",
        metadata={"document_id": "doc1", "sequence_number": 2}
    ),
    # ...
]

Embedding

OGXEmbeddingModel converts chunk text to vector embeddings:

Auto-detection (on first use):

embedding_model = OGXEmbeddingModel(
    model_id="ollama/nomic-embed-text:latest",
    client=ogx_client,
    params={"embedding_dimension": 768, "context_length": 8192}  # Optional
)

If embedding_dimension or context_length is not provided:

  • embedding_dimension: Detected via a test embedding call (counts dimensions)
  • context_length: Detected via binary search (64 to 8192 tokens, ~5 API calls)
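The binary-search idea can be sketched as follows. This is an assumption-laden illustration rather than the library's code: `detect_context_length` and the `accepts` callback are hypothetical names, and `accepts(n)` stands in for "the embedding endpoint accepted an input of n tokens". The sketch probes every integer length; the real implementation may probe a coarser set of candidate lengths to keep the number of calls low.

```python
def detect_context_length(accepts, low: int = 64, high: int = 8192) -> int:
    """Largest n in [low, high] for which accepts(n) is True; 0 if even `low` fails.

    Assumes accepts is monotonic: once a length is rejected, all longer ones are too.
    """
    if not accepts(low):
        return 0
    while low < high:
        mid = (low + high + 1) // 2  # round up so the loop always makes progress
        if accepts(mid):
            low = mid       # mid fits, search the upper half
        else:
            high = mid - 1  # mid is too long, search the lower half
    return low


# Fake endpoint with a 2048-token limit (illustrative only)
detected = detect_context_length(lambda n: n <= 2048)
```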

Batch Processing:

Embeddings are created in batches of 1024 chunks to respect API limits:

def embed_documents(self, texts: list[str]) -> list[list[float]]:
    embeddings = []
    for idx in range(0, len(texts), 1024):
        batch = texts[idx : idx + 1024]
        batch_embeddings = self.client.embeddings.create(
            input=batch,
            model=self.model_id
        )
        embeddings.extend([data.embedding for data in batch_embeddings.data])
    return embeddings

Output:

[
    [0.023, -0.145, 0.678, ...],  # 768-dimensional vector for chunk 1
    [0.091, -0.023, 0.445, ...],  # 768-dimensional vector for chunk 2
    # ...
]

Vector Store Insertion

OGXVectorStore stores chunks and embeddings in the OGX vector database:

Collection Creation:

vs = client.vector_stores.create(
    extra_body={
        "provider_id": "milvus",  # From ogx_vector_io_provider_id="milvus"
        "embedding_model": "ollama/nomic-embed-text:latest",
        "embedding_dimension": 768,
    }
)
collection_name = vs.id  # Unique collection ID

Batch Insertion:

Chunks are inserted in batches of 2048:

# Pair each chunk with its embedding; a separate name avoids shadowing `chunks`
records = [
    {
        "content": chunk.text,
        "chunk_metadata": chunk.metadata,
        "chunk_id": chunk.metadata["document_id"],
        "embedding_model": embedding_model.model_id,
        "embedding_dimension": 768,
        "embedding": embedding_vector,
    }
    for chunk, embedding_vector in zip(chunks, embeddings)
]

# Insert in slices of at most 2048 records
for idx in range(0, len(records), 2048):
    client.vector_io.insert(
        vector_store_id=collection_name,
        chunks=records[idx : idx + 2048]
    )

Collection Reuse

Key Optimization: Collections are reused when indexing_params match:

indexing_params = {
    "chunking": {
        "chunking_method": "recursive",
        "chunk_size": 512,
        "chunk_overlap": 128,
    },
    "embedding": {
        "model_id": "ollama/nomic-embed-text:latest",
        "distance_metric": "cosine",
        "embedding_dimension": 768,
        "context_length": 8192,
    }
}

Reuse Logic:

  1. Build a canonical representation of indexing_params and hash it
  2. Check results.collection_names for matching params
  3. If match found, reuse collection_name (skip chunking, embedding, insertion)
  4. If no match, create new collection
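A minimal sketch of the reuse lookup, assuming the canonical representation is a key-sorted JSON string hashed with SHA-256. The helper names `indexing_key` and `find_collection`, and the choice of JSON plus SHA-256, are assumptions made for illustration; the source does not specify how ai4rag builds its key.

```python
import hashlib
import json
from typing import Optional


def indexing_key(indexing_params: dict) -> str:
    """Hash a canonical (key-sorted) JSON form so key order does not matter."""
    canonical = json.dumps(indexing_params, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def find_collection(indexing_params: dict, known: dict) -> Optional[str]:
    """Return the collection name for matching params, or None if indexing is needed."""
    return known.get(indexing_key(indexing_params))


params_a = {
    "chunking": {"chunking_method": "recursive", "chunk_size": 512, "chunk_overlap": 128},
    "embedding": {"model_id": "ollama/nomic-embed-text:latest", "distance_metric": "cosine"},
}
# Same content, different key order
params_b = {
    "embedding": {"distance_metric": "cosine", "model_id": "ollama/nomic-embed-text:latest"},
    "chunking": {"chunk_overlap": 128, "chunk_size": 512, "chunking_method": "recursive"},
}
# Different chunk size, so a new collection is required
params_c = {**params_a, "chunking": {**params_a["chunking"], "chunk_size": 1024}}

known_collections = {indexing_key(params_a): "collection-xyz"}
reused = find_collection(params_b, known_collections)
missing = find_collection(params_c, known_collections)
```

Sorting keys before hashing is what makes two logically identical configurations map to the same collection regardless of how the dictionaries were built.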

Performance Impact:

  • Avoids re-chunking and re-embedding when only retrieval/generation params change
  • Typical speedup: 50-80% per evaluation when a collection is reused

Query Phase

The query phase retrieves relevant chunks and generates answers for benchmark questions.

sequenceDiagram
    participant QR as query_rag()
    participant RAG as SimpleRAG
    participant Ret as Retriever
    participant VS as OGXVectorStore
    participant EM as OGXEmbeddingModel
    participant FM as OGXFoundationModel
    participant OGXClient as OGX Client

    Note over QR: Parallel execution (ThreadPoolExecutor)
    par Question 1
        QR->>RAG: generate(question_1)
    and Question 2
        QR->>RAG: generate(question_2)
    and Question N
        QR->>RAG: generate(question_N)
    end

    activate RAG
    RAG->>Ret: retrieve(question)
    activate Ret
    Ret->>EM: embed_query(question)
    EM->>OGXClient: embeddings.create(question)
    OGXClient-->>EM: query_embedding
    EM-->>Ret: query_embedding

    alt search_mode == "vector"
        Ret->>VS: search(query_embedding, k, mode="vector")
        VS->>OGXClient: vector_io.query(vector_store_id, params)
        OGXClient-->>VS: ranked chunks
    else search_mode == "hybrid"
        Ret->>VS: search(query_embedding, k, mode="hybrid", ranker_*)
        VS->>OGXClient: vector_io.query(vector_store_id, params + reranker_params)
        OGXClient-->>VS: re-ranked chunks (dense + sparse)
    end

    VS-->>Ret: retrieved documents
    Ret-->>RAG: reference_documents
    deactivate Ret

    RAG->>RAG: format context using context_template_text
    RAG->>RAG: format user message using user_message_text
    RAG->>FM: chat(messages)
    activate FM
    FM->>OGXClient: chat.completions.create(model, messages, params)
    OGXClient-->>FM: response
    FM-->>RAG: answer
    deactivate FM

    RAG-->>QR: {"answer": ..., "reference_documents": ..., "question": ...}
    deactivate RAG

Parallel Query Execution

ThreadPoolExecutor processes benchmark questions concurrently:

from concurrent.futures import ThreadPoolExecutor
from functools import partial

def query_rag(
    rag: BaseRAGTemplate,
    questions: list[str],
    max_threads: int = 10
) -> list[dict]:

    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        responses = list(executor.map(
            partial(_generate_response, rag=rag),
            questions
        ))
    return responses

Concurrency:

  • Default: 10 concurrent threads
  • Balances throughput with server load
  • Each thread executes the full retrieval + generation pipeline independently

Retrieval

Retriever fetches relevant chunks from the vector store:

Step 1: Embed Query

query_embedding = embedding_model.embed_query(question)
# Returns: [0.123, -0.456, 0.789, ...]  (same dimension as document embeddings)

Step 2: Vector Search

Vector Mode (semantic search only):

params = {
    "max_chunks": 5,  # number_of_chunks
    "mode": "vector"
}
response = client.vector_io.query(
    query=question,
    vector_store_id=collection_name,
    params=params
)

Returns top-k chunks by cosine similarity (or other distance metric).
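For intuition about what "top-k by cosine similarity" means, here is a small pure-Python sketch. In ai4rag the ranking happens server-side inside the vector database; the `cosine_similarity` and `top_k` helpers below are hypothetical and only illustrate the ordering rule.

```python
import math


def cosine_similarity(a: list, b: list) -> float:
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(query_vec: list, doc_vecs: dict, k: int) -> list:
    """Return the k (chunk_id, score) pairs with the highest similarity."""
    scored = [(cid, cosine_similarity(query_vec, vec)) for cid, vec in doc_vecs.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Toy 2-dimensional embeddings (illustrative only)
doc_vecs = {"a": [1.0, 0.0], "b": [0.0, 1.0], "c": [1.0, 1.0]}
results = top_k([1.0, 0.1], doc_vecs, k=2)
top_ids = [cid for cid, _ in results]
```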

Hybrid Mode (semantic + keyword):

params = {
    "max_chunks": 5,
    "mode": "hybrid",
    "reranker_type": "rrf",  # or "weighted", "normalized"
    "reranker_params": {
        "impact_factor": 60  # ranker_k for RRF
        # or "alpha": 0.7 for weighted
    }
}
response = client.vector_io.query(...)

Combines dense vector search with sparse keyword search (e.g., BM25), then re-ranks using specified strategy.
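The RRF strategy can be sketched as below, using the standard reciprocal rank fusion formula in which each list contributes 1 / (impact_factor + rank) to a chunk's score. This is an illustration of the general technique, not the server's implementation; the function name and the 1-based rank convention are assumptions, and the actual ranker may differ in details such as tie handling.

```python
def reciprocal_rank_fusion(rankings: list, impact_factor: float = 60.0) -> list:
    """Fuse several ranked lists of chunk IDs into one list of (chunk_id, score)."""
    scores = {}
    for ranking in rankings:
        for rank, chunk_id in enumerate(ranking, start=1):  # ranks start at 1
            scores[chunk_id] = scores.get(chunk_id, 0.0) + 1.0 / (impact_factor + rank)
    return sorted(scores.items(), key=lambda pair: pair[1], reverse=True)


dense = ["c1", "c2", "c3"]   # order from vector search
sparse = ["c3", "c1", "c4"]  # order from keyword search
fused = reciprocal_rank_fusion([dense, sparse])
fused_ids = [cid for cid, _ in fused]
```

Chunks that appear in both lists (c1 and c3 here) accumulate two contributions and rise above chunks found by only one retriever. A larger impact factor flattens the difference between adjacent ranks.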

Step 3: Convert to AI4RAGChunks

reference_documents = [
    AI4RAGChunk(
        text=chunk.content,
        metadata=chunk.chunk_metadata.to_dict()
    )
    for chunk in response.chunks
]

Context Formatting

SimpleRAG formats retrieved chunks into LLM context:

# Default context_template_text: "{document}\n"
context = "\n".join([
    foundation_model.context_template_text.format(document=chunk.text)
    for chunk in reference_documents
])

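Example output (shown for a template that prefixes each chunk with "According to the document:"; the default template above inserts the chunk text as-is):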

According to the document: "First retrieved chunk text..."

According to the document: "Second retrieved chunk text..."

...

Customization:

foundation_model = OGXFoundationModel(
    model_id="ollama/llama3.2:3b",
    client=client,
    context_template_text="Source {document}\n---\n"  # Custom format
)

User Message Construction

Combine context with question:

# Default user_message_text:
# "{reference_documents}\n\nQuestion: {question}\nAnswer:"

user_message = foundation_model.user_message_text.format(
    reference_documents=context,
    question=question
)

Example output:

According to the document: "Chunk 1..."

According to the document: "Chunk 2..."

Question: What is the capital of France?
Answer:
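The two formatting steps can be combined into one runnable sketch. It uses the default template strings quoted on this page; `build_user_message` is a hypothetical helper written for this example and is not a function of the library.

```python
# Default templates as quoted on this page
context_template_text = "{document}\n"
user_message_text = "{reference_documents}\n\nQuestion: {question}\nAnswer:"


def build_user_message(chunk_texts: list, question: str) -> str:
    """Format each chunk with the context template, then fill the user message."""
    context = "\n".join(
        context_template_text.format(document=text) for text in chunk_texts
    )
    return user_message_text.format(reference_documents=context, question=question)


message = build_user_message(
    ["Chunk 1...", "Chunk 2..."],
    "What is the capital of France?",
)
```

With these defaults the chunk texts appear verbatim, separated by blank lines, followed by the question and the trailing "Answer:" cue.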

Generation

OGXFoundationModel generates answer via chat completion:

messages = [
    {
        "role": "system",
        "content": foundation_model.system_message_text
        # Default: "You are a helpful, respectful and honest assistant..."
    },
    {
        "role": "user",
        "content": user_message
    }
]

response = client.chat.completions.create(
    model=foundation_model.model_id,
    messages=messages,
    max_completion_tokens=1024,
    temperature=0.1
)

answer = response.choices[0].message.content

Output:

{
    "answer": "Based on the provided documents, Paris is the capital of France.",
    "reference_documents": [Document(...), Document(...), ...],
    "question": "What is the capital of France?"
}

Evaluation Phase

The evaluation phase compares generated answers against ground truth using unitxt metrics.

sequenceDiagram
    participant Exp as AI4RAGExperiment
    participant Builder as build_evaluation_data()
    participant UE as UnitxtEvaluator
    participant Unitxt as unitxt.evaluate()

    Exp->>Builder: build_evaluation_data(benchmark_data, inference_response)
    activate Builder
    loop For each question
        Builder->>Builder: extract answer, contexts, context_ids
        Builder->>Builder: match with ground_truths, ground_truth_doc_ids
        Builder->>Builder: create EvaluationData instance
    end
    Builder-->>Exp: evaluation_data: list[EvaluationData]
    deactivate Builder

    Exp->>UE: evaluate_metrics(evaluation_data, metrics)
    activate UE
    UE->>UE: convert to DataFrame
    UE->>Unitxt: evaluate(df, metric_names, compute_conf_intervals=True)
    Unitxt-->>UE: scores_df, ci_table

    UE->>UE: _handle_ci_calculations(ci_table)
    Note over UE: Extract mean, ci_low, ci_high<br/>for each metric

    UE->>UE: _handle_questions_scores(scores_df)
    Note over UE: Extract per-question scores<br/>for each metric

    UE-->>Exp: {"scores": {...}, "question_scores": {...}}
    deactivate UE

EvaluationData Assembly

build_evaluation_data() combines inference results with benchmark data:

Input: inference_response

[
    {
        "question": "What is X?",
        "answer": "Based on the documents, X is...",
        "reference_documents": [Document(...), Document(...)]
    },
    # ... one per benchmark question
]

Input: benchmark_data

BenchmarkData with:
- questions: ["What is X?", ...]
- correct_answers: [["X is...", "Alternative answer"], ...]
- document_ids: [["doc1", "doc3"], ...]  # Ground truth source docs
- questions_ids: ["q0", "q1", ...]

Output: EvaluationData list

[
    EvaluationData(
        question="What is X?",
        answer="Based on the documents, X is...",
        contexts=[
            "Retrieved chunk 1 text...",
            "Retrieved chunk 2 text..."
        ],
        context_ids=["doc1", "doc1"],  # Source document IDs
        ground_truths=["X is...", "Alternative answer"],
        ground_truths_context_ids=["doc1", "doc3"],
        question_id="q0"
    ),
    # ...
]
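The assembly step can be sketched as follows. `EvalRecord` is a hypothetical stand-in for EvaluationData, reference documents are modelled as plain dicts, and matching responses to benchmark rows by question text is an assumption; the real build_evaluation_data() may pair them differently.

```python
from dataclasses import dataclass


@dataclass
class EvalRecord:
    """Hypothetical stand-in for EvaluationData."""
    question: str
    answer: str
    contexts: list
    context_ids: list
    ground_truths: list
    ground_truths_context_ids: list
    question_id: str


def assemble(benchmark: dict, responses: list) -> list:
    """Join inference responses with benchmark rows, one record per question."""
    by_question = {r["question"]: r for r in responses}
    rows = zip(
        benchmark["questions"],
        benchmark["correct_answers"],
        benchmark["document_ids"],
        benchmark["questions_ids"],
    )
    records = []
    for question, answers, doc_ids, question_id in rows:
        response = by_question[question]
        docs = response["reference_documents"]
        records.append(EvalRecord(
            question=question,
            answer=response["answer"],
            contexts=[d["text"] for d in docs],
            context_ids=[d["metadata"]["document_id"] for d in docs],
            ground_truths=answers,
            ground_truths_context_ids=doc_ids,
            question_id=question_id,
        ))
    return records


benchmark = {
    "questions": ["What is X?"],
    "correct_answers": [["X is...", "Alternative answer"]],
    "document_ids": [["doc1", "doc3"]],
    "questions_ids": ["q0"],
}
responses = [{
    "question": "What is X?",
    "answer": "Based on the documents, X is...",
    "reference_documents": [
        {"text": "Retrieved chunk 1 text...", "metadata": {"document_id": "doc1"}},
        {"text": "Retrieved chunk 2 text...", "metadata": {"document_id": "doc1"}},
    ],
}]
records = assemble(benchmark, responses)
```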

Unitxt Evaluation

UnitxtEvaluator wraps the unitxt library for metric computation:

Metrics Evaluated:

| ai4rag Metric | Unitxt Metric | Description |
|---|---|---|
| faithfulness | metrics.rag.external_rag.faithfulness | How well is the answer grounded in retrieved context? |
| answer_correctness | metrics.rag.external_rag.answer_correctness | How correct is the answer vs ground truth? |
| context_correctness | metrics.rag.external_rag.context_correctness | How relevant are retrieved docs vs ground truth docs? |

Evaluation Process:

  1. Convert to DataFrame:
df = pd.DataFrame([data.to_dict() for data in evaluation_data])
  2. Call unitxt.evaluate():
scores_df, ci_table = evaluate(
    df,
    metric_names=[
        "metrics.rag.external_rag.faithfulness",
        "metrics.rag.external_rag.answer_correctness",
        "metrics.rag.external_rag.context_correctness"
    ],
    compute_conf_intervals=True
)
  3. Extract Aggregate Scores (mean + confidence intervals):
{
    "faithfulness": {
        "mean": 0.72,
        "ci_low": 0.61,
        "ci_high": 0.83
    },
    "answer_correctness": {
        "mean": 0.68,
        "ci_low": 0.55,
        "ci_high": 0.81
    },
    "context_correctness": {
        "mean": 0.80,
        "ci_low": 0.70,
        "ci_high": 0.90
    }
}
  4. Extract Per-Question Scores:
{
    "faithfulness": {
        "q0": 0.71,
        "q1": 0.73,
        "q2": 0.68,
        ...
    },
    "answer_correctness": {
        "q0": 0.65,
        "q1": 0.70,
        ...
    },
    ...
}

Optimization Score Calculation

The final optimization score is extracted from the aggregate results:

optimization_metric = "faithfulness"  # User-configured
optimization_score = result_scores["scores"][optimization_metric]["mean"]
# Example: 0.72

This single scalar value is returned to the optimizer.


Optimization Loop

The optimization loop iterates configurations until the evaluation budget is exhausted.

flowchart TD
    Start([Start Experiment]) --> MPS{Models Pre-Selection<br/>needed?}

    MPS -->|Yes| RunMPS[Run MPS on sample]
    RunMPS --> UpdateSS[Update search space<br/>with selected models]
    UpdateSS --> InitOpt[Initialize Optimizer]

    MPS -->|No| InitOpt

    InitOpt --> RandomPhase[Random Phase:<br/>Evaluate n_random_nodes]
    RandomPhase --> CheckBudget1{Reached<br/>max_evals?}

    CheckBudget1 -->|Yes| ReturnBest
    CheckBudget1 -->|No| GAMPhase

    GAMPhase[GAM Phase:<br/>Train model on evaluations]
    GAMPhase --> Predict[Predict scores for<br/>remaining configs]
    Predict --> SelectTop[Select top evals_per_trial<br/>configurations]
    SelectTop --> EvalSelected[Evaluate selected configs]

    EvalSelected --> CheckBudget2{Reached<br/>max_evals?}
    CheckBudget2 -->|No| GAMPhase
    CheckBudget2 -->|Yes| ReturnBest

    ReturnBest([Return Best Configuration])

Iteration Details

Each optimizer iteration follows this flow:

Random Phase (GAMOptimizer only):

def evaluate_initial_random_nodes():
    successful = sum(1 for e in evaluations if e["score"] is not None)

    while successful < n_random_nodes and len(evaluations) < max_evals:
        config = random.choice(unevaluated_combinations)
        score = objective_function(config)  # Calls run_single_evaluation()
        evaluations.append({"config": config, "score": score})
        if score is not None:
            successful += 1

GAM Phase:

def _run_iteration():
    # 1. Train GAM
    successful_evals = [e for e in evaluations if e["score"] is not None]
    X_train = encode_categorical_params(successful_evals)
    y_train = [e["score"] for e in successful_evals]
    gam = LinearGAM().fit(X_train, y_train)

    # 2. Predict
    remaining = get_unevaluated_configs()
    X_predict = encode_categorical_params(remaining)
    predictions = gam.predict(X_predict)

    # 3. Select top N
    ranked = sorted(
        zip(remaining, predictions),
        key=lambda x: x[1],
        reverse=True  # Highest predictions first
    )
    top_n = ranked[:evals_per_trial]

    # 4. Evaluate
    for config, predicted_score in top_n:
        actual_score = objective_function(config)
        evaluations.append({"config": config, "score": actual_score})
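The encoding and selection steps around the GAM can be sketched without the model itself. The source does not say how encode_categorical_params works, so the one-hot scheme below is an assumption, and `one_hot_encode` and `select_top` are hypothetical helper names. The predictions are supplied by hand in place of `gam.predict`.

```python
def one_hot_encode(configs: list, search_space: dict) -> list:
    """Turn each config dict into a flat 0/1 vector, one slot per allowed value."""
    rows = []
    for config in configs:
        row = []
        for name in sorted(search_space):  # fixed parameter order for stable columns
            row.extend(1.0 if config[name] == value else 0.0
                       for value in search_space[name])
        rows.append(row)
    return rows


def select_top(remaining: list, predictions: list, evals_per_trial: int) -> list:
    """Pick the configs with the highest predicted scores."""
    ranked = sorted(zip(remaining, predictions), key=lambda pair: pair[1], reverse=True)
    return [config for config, _ in ranked[:evals_per_trial]]


search_space = {"chunk_size": [256, 512], "search_mode": ["vector", "hybrid"]}
remaining = [
    {"chunk_size": 256, "search_mode": "vector"},
    {"chunk_size": 512, "search_mode": "vector"},
    {"chunk_size": 512, "search_mode": "hybrid"},
]
X_predict = one_hot_encode(remaining, search_space)

# Hand-written predictions standing in for the surrogate model's output
chosen = select_top(remaining, [0.2, 0.9, 0.5], evals_per_trial=2)
```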

Results Caching

Two-level caching prevents redundant work:

Level 1: Results Cache (entire evaluation)

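import json

# Method on the results object, shown here as a plain function
def evaluation_explored_or_cached(indexing_params: dict, rag_params: dict) -> float | None:
    # Illustrative key: dicts are unhashable, so a canonical JSON string is used
    cache_key = json.dumps([indexing_params, rag_params], sort_keys=True)
    if cache_key in cached_evaluations:
        return cached_evaluations[cache_key]["final_score"]
    return None

If the exact (indexing_params, rag_params) pair has been evaluated before, the cached score is returned immediately.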

Level 2: Collection Reuse (indexing only)

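import json

# Method on the results object, shown here as a plain function
def get_existing_collection(indexing_params: dict) -> str | None:
    # Illustrative key: dicts are unhashable, so a canonical JSON string is used
    cache_key = json.dumps(indexing_params, sort_keys=True)
    if cache_key in existing_collections:
        return existing_collections[cache_key]["collection_name"]
    return None

If indexing_params match but rag_params differ, the collection is reused: indexing is skipped, but retrieval, generation, and evaluation run again.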

Cache Hit Rates:

  • Full cache hit: ~1% of time (optimizer rarely suggests exact duplicate)
  • Collection reuse: ~40-60% of time (common when varying only retrieval/generation params)
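The order in which the two levels are consulted can be sketched as a single decision function. `plan_evaluation` and the returned labels are hypothetical names for this example, and the JSON-string keys are an assumption about how parameters are compared.

```python
import json


def _key(params) -> str:
    """Canonical string form of a params structure (key order ignored)."""
    return json.dumps(params, sort_keys=True)


def plan_evaluation(indexing_params: dict, rag_params: dict,
                    cached_scores: dict, collections: dict) -> tuple:
    """Decide how much work an evaluation needs: none, query-only, or everything."""
    full_key = _key([indexing_params, rag_params])
    if full_key in cached_scores:                 # Level 1: whole evaluation cached
        return ("cached", cached_scores[full_key])
    name = collections.get(_key(indexing_params))
    if name is not None:                          # Level 2: indexing can be skipped
        return ("reuse_collection", name)
    return ("full_run", None)                     # Nothing cached: index, query, evaluate


idx = {"chunk_size": 512}
cached_scores = {_key([idx, {"number_of_chunks": 5}]): 0.72}
collections = {_key(idx): "collection-xyz"}

hit = plan_evaluation(idx, {"number_of_chunks": 5}, cached_scores, collections)
partial_hit = plan_evaluation(idx, {"number_of_chunks": 3}, cached_scores, collections)
miss = plan_evaluation({"chunk_size": 256}, {"number_of_chunks": 5}, cached_scores, collections)
```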

Event Streaming

Throughout the loop, event handlers receive status updates and results:

on_status_change() calls:

# Chunking
event_handler.on_status_change(
    level=LogLevel.INFO,
    message="Chunking documents using recursive method...",
    step=ExperimentStep.CHUNKING
)

# Embedding
event_handler.on_status_change(
    level=LogLevel.INFO,
    message="Embedding chunks using ollama/nomic-embed-text:latest...",
    step=ExperimentStep.EMBEDDING
)

# Generation
event_handler.on_status_change(
    level=LogLevel.INFO,
    message="Retrieval and generation using collection 'xyz'...",
    step=ExperimentStep.GENERATION
)

# Evaluation
event_handler.on_status_change(
    level=LogLevel.INFO,
    message="Evaluating RAG Pattern 'Pattern5'...",
    step="evaluation"
)

on_pattern_creation() calls:

After each evaluation completes:

event_handler.on_pattern_creation(
    payload={
        "pattern_name": "Pattern5",
        "iteration": 4,
        "final_score": 0.72,
        "execution_time": 134,
        "scores": {...},
        "settings": {
            "chunking": {...},
            "embedding": {...},
            "retrieval": {...},
            "generation": {...}
        }
    },
    evaluation_results=[
        {"question": ..., "answer": ..., "scores": {...}},
        ...
    ]
)

Performance Optimizations

1. Parallel Query Execution

Impact: 5-10x speedup for query phase

Implementation:

with ThreadPoolExecutor(max_workers=10) as executor:
    responses = list(executor.map(generate_fn, questions))

Trade-offs:

  • More threads = higher throughput but more server load
  • 10 threads balances throughput with API rate limits

2. Batch Embedding

Impact: 2-3x speedup for indexing phase

Implementation:

for idx in range(0, len(texts), 1024):
    batch = texts[idx : idx + 1024]
    embeddings.extend(embed_batch(batch))

Constraints:

  • OGX max batch size: 1024 chunks
  • Smaller batches = more API calls = slower

3. Collection Reuse

Impact: 50-80% speedup when indexing params unchanged

Cache Key:

cache_key = hash((
    chunking_method,
    chunk_size,
    chunk_overlap,
    embedding_model_id,
    embedding_dimension
))

Reuse Rate:

  • Typical search spaces: 40-60% reuse
  • Search spaces varying only retrieval/generation: 80-95% reuse

4. Results Caching

Impact: Instant return for duplicate evaluations

Frequency:

  • Random optimizer: ~0-1% hit rate (no duplicates by design)
  • GAM optimizer: ~1-5% hit rate (rare duplicate suggestions)

5. Early Stopping (Models Pre-Selection)

Impact: 30-70% reduction in total evaluations

Example:

  • Without MPS: 10 models × 5 embeddings × 4 configs = 200 evaluations
  • With MPS: 3 models × 2 embeddings × 4 configs = 24 evaluations (88% reduction)
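The arithmetic in this example can be checked with a few lines. `search_space_size` is a hypothetical helper; it counts grid combinations only, and the number of configurations actually evaluated is additionally bounded by max_evals.

```python
from math import prod


def search_space_size(*dimension_sizes: int) -> int:
    """Number of combinations in a full grid over the given dimensions."""
    return prod(dimension_sizes)


without_mps = search_space_size(10, 5, 4)  # models x embeddings x other configs
with_mps = search_space_size(3, 2, 4)      # after pre-selection narrows the models
reduction = 1 - with_mps / without_mps     # fraction of the grid removed
```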


Data Transformations Summary

Documents → Chunks:

DoclingDocument(name="doc1", ...)
↓ (DoclingChunker or LangChainChunker)
[
    AI4RAGChunk(text="Chunk 1", metadata={"document_id": "doc1", "sequence_number": 1}),
    AI4RAGChunk(text="Chunk 2", metadata={"document_id": "doc1", "sequence_number": 2}),
    ...
]

Chunks → Embeddings:

["Chunk 1 text", "Chunk 2 text", ...]
↓ (OGXEmbeddingModel)
[[0.1, -0.2, ...], [0.3, 0.1, ...], ...]  # 768-dim vectors

Embeddings → Vector Store:

[{"content": "Chunk 1", "embedding": [0.1, ...], "metadata": {...}}, ...]
↓ (OGXVectorStore.add_documents)
Collection "xyz" in OGX vector DB

Question → Query Embedding:

"What is the capital of France?"
↓ (OGXEmbeddingModel.embed_query)
[0.05, -0.12, 0.34, ...]  # 768-dim vector

Query → Retrieved Chunks:

[0.05, -0.12, ...]
↓ (OGXVectorStore.search via vector_io.query)
[
    AI4RAGChunk(text="Paris is the capital...", metadata={...}),
    AI4RAGChunk(text="France's capital city...", metadata={...}),
    ...
]

Chunks + Question → Answer:

contexts = ["Paris is the capital...", "France's capital city..."]
question = "What is the capital of France?"
↓ (SimpleRAG.generate via OGXFoundationModel.chat)
"Based on the provided documents, Paris is the capital of France."

Answer + Ground Truth → Scores:

{
    "question": "What is the capital of France?",
    "answer": "Paris is the capital of France.",
    "ground_truths": ["Paris"],
    "contexts": [...],
    "ground_truths_context_ids": [...]
}
↓ (UnitxtEvaluator)
{
    "faithfulness": {"mean": 0.95, "ci_low": 0.90, "ci_high": 1.0},
    "answer_correctness": {"mean": 0.88, "ci_low": 0.75, "ci_high": 1.0},
    ...
}

Error Handling

Failed Iterations:

When an evaluation fails, the optimizer continues:

try:
    score = run_single_evaluation(config)
except AI4RAGError as err:
    exception_handler.handle_exception(err)
    raise FailedIterationError(str(err)) from err

In optimizer:

try:
    score = objective_function(config)
except FailedIterationError:
    score = None  # Don't penalize, just skip

evaluations.append({"config": config, "score": score})

Result: Failed evaluations don't stop optimization but don't contribute to GAM training.
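The two snippets above can be combined into a self-contained sketch that runs end to end. The exception classes here are local stand-ins that mirror the names used on this page, and `run_all` and `flaky` are hypothetical helpers; the real classes and control flow live in ai4rag.

```python
class AI4RAGError(Exception):
    """Local stand-in for the library's base error."""


class FailedIterationError(Exception):
    """Local stand-in: signals that one evaluation failed."""


def objective_function(config: dict, run_single_evaluation) -> float:
    """Wrap a single evaluation and translate library errors into a failed iteration."""
    try:
        return run_single_evaluation(config)
    except AI4RAGError as err:
        raise FailedIterationError(f"Evaluation failed for {config}") from err


def run_all(configs: list, run_single_evaluation) -> list:
    """Evaluate every config; failures are recorded with score None, not re-raised."""
    evaluations = []
    for config in configs:
        try:
            score = objective_function(config, run_single_evaluation)
        except FailedIterationError:
            score = None  # keep going, but remember that this config failed
        evaluations.append({"config": config, "score": score})
    return evaluations


def flaky(config: dict) -> float:
    """Fake evaluation that fails for large chunk sizes (illustrative only)."""
    if config["chunk_size"] > 1024:
        raise AI4RAGError("embedding timeout")
    return 0.5


evaluations = run_all([{"chunk_size": 512}, {"chunk_size": 2048}], flaky)
# Only successful evaluations would be passed on to GAM training
training_set = [e for e in evaluations if e["score"] is not None]
```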

Common Failures:

  • IndexingError: Embedding API timeout, vector store connection issue
  • GenerationError: LLM API error, rate limit exceeded
  • EvaluationError: Malformed unitxt data, metric computation failure

Next Steps