Data Flow¶
This page provides a detailed analysis of how data flows through the ai4rag system during optimization, from input documents to an optimal RAG configuration.
Overview¶
ai4rag's data flow consists of four distinct phases. The first three run for each configuration evaluated during hyperparameter optimization; the fourth feeds the resulting score back to select the next configuration:
- Indexing Phase: Documents → Chunks → Embeddings → Vector Store
- Query Phase: Question → Retrieval → Context → Generation → Answer
- Evaluation Phase: Answers + Ground Truth → Metrics → Scores
- Optimization Loop: Scores → Optimizer → Next Configuration
Indexing Phase¶
The indexing phase transforms raw documents into searchable vector embeddings stored in a vector database.
sequenceDiagram
participant Exp as AI4RAGExperiment
participant LC as LangChainChunker
participant EM as LSEmbeddingModel
participant VS as LSVectorStore
participant LSClient as Llama Stack Client
Exp->>Exp: check if collection exists
alt Collection exists
Note over Exp: Reuse existing collection
else New collection needed
Exp->>LC: split_documents(documents)
activate LC
LC->>LC: set document_id if missing
LC->>LC: apply chunking method
LC->>LC: set sequence_number
LC-->>Exp: chunks (with metadata)
deactivate LC
Exp->>VS: add_documents(chunks)
activate VS
VS->>EM: embed_documents([chunk.page_content])
activate EM
loop Batches of 2048 chunks
EM->>LSClient: embeddings.create(batch)
LSClient-->>EM: embeddings
end
EM-->>VS: all embeddings
deactivate EM
loop Batches of 2048 chunks
VS->>LSClient: vector_io.insert(chunks + embeddings)
LSClient-->>VS: success
end
VS-->>Exp: indexing complete
deactivate VS
end
Document Processing¶
Input Documents:
Documents are provided as list[Document] where each Document must have:
Document(
page_content="Document text content...",
metadata={
"document_id": "unique_doc_id", # Required
# other metadata fields...
}
)
If document_id is missing, the chunker auto-generates it via hash(page_content).
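A minimal sketch of that fallback (illustrative; the chunker's actual code may differ):

```python
for doc in documents:
    if "document_id" not in doc.metadata:
        # Fallback described above: derive an ID from the content hash
        doc.metadata["document_id"] = str(hash(doc.page_content))
```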
Chunking¶
LangChainChunker splits documents into smaller chunks:
Step 1: Split Documents
Uses RecursiveCharacterTextSplitter with configured parameters:
chunker = LangChainChunker(
method="recursive",
chunk_size=512, # Max chunk size in characters
chunk_overlap=128, # Overlap between chunks
separators=["\n\n", r"(?<=\. )", "\n", " ", ""] # Split hierarchy
)
Splitting hierarchy:
- Try splitting on double newlines (\n\n)
- If chunks are still too large, try sentence boundaries ((?<=\. ))
- Then fall back to single newlines, spaces, and individual characters
Step 2: Add Metadata
Each chunk receives:
chunk.metadata = {
"document_id": "original_doc_id", # Inherited from parent document
"sequence_number": 1, # Position within document
"start_index": 0, # Character offset in original
# ... original metadata preserved ...
}
Sequence numbering:

- Chunks are sorted by (document_id, start_index)
- Sequence numbers are assigned sequentially per document
- This enables window-based retrieval of adjacent chunks (see the sketch below)
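A minimal sketch of this numbering, assuming every chunk carries document_id and start_index metadata:

```python
from itertools import groupby

# Sort globally, then number chunks within each document
chunks.sort(key=lambda c: (c.metadata["document_id"], c.metadata["start_index"]))
for _, doc_chunks in groupby(chunks, key=lambda c: c.metadata["document_id"]):
    for seq, chunk in enumerate(doc_chunks, start=1):
        chunk.metadata["sequence_number"] = seq
```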
Output:
[
Document(
page_content="First chunk text...",
metadata={"document_id": "doc1", "sequence_number": 1, "start_index": 0}
),
Document(
page_content="Second chunk text (with overlap)...",
metadata={"document_id": "doc1", "sequence_number": 2, "start_index": 384}
),
# ...
]
Embedding¶
LSEmbeddingModel converts chunk text to vector embeddings:
Auto-detection (on first use):
embedding_model = LSEmbeddingModel(
model_id="ollama/nomic-embed-text:latest",
client=llama_stack_client,
params={"embedding_dimension": 768, "context_length": 8192} # Optional
)
If embedding_dimension or context_length are not provided:

- embedding_dimension: detected via a test embedding call (counting the returned dimensions)
- context_length: detected via binary search (64 to 8192 tokens, ~5 API calls); a sketch of this probe follows
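A hedged sketch of the binary-search probe, assuming the model accepts all inputs up to some maximum length (detect_context_length and its candidate grid are illustrative, not the actual LSEmbeddingModel code):

```python
def detect_context_length(embed_fn, candidates=(64, 128, 256, 512, 1024, 2048, 4096, 8192)) -> int:
    """Find the largest candidate input length the embedding API accepts."""
    lo, hi = 0, len(candidates) - 1
    best = candidates[0]
    while lo <= hi:
        mid = (lo + hi) // 2
        try:
            embed_fn("token " * candidates[mid])  # probe the API with a mid-sized input
            best = candidates[mid]
            lo = mid + 1   # accepted: try longer inputs
        except Exception:
            hi = mid - 1   # rejected: try shorter inputs
    return best
```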
Batch Processing:
Embeddings created in batches of 2048 chunks to respect API limits:
def embed_documents(self, texts: list[str]) -> list[list[float]]:
    embeddings = []
    for idx in range(0, len(texts), 2048):  # Llama Stack max batch size
        batch = texts[idx : idx + 2048]
        batch_embeddings = self.client.embeddings.create(
            input=batch,
            model=self.model_id,
        )
        embeddings.extend([data.embedding for data in batch_embeddings.data])
    return embeddings
Output:
[
[0.023, -0.145, 0.678, ...], # 768-dimensional vector for chunk 1
[0.091, -0.023, 0.445, ...], # 768-dimensional vector for chunk 2
# ...
]
Vector Store Insertion¶
LSVectorStore stores chunks and embeddings in Llama Stack vector database:
Collection Creation:
vs = client.vector_stores.create(
extra_body={
"provider_id": "milvus", # Extracted from vector_store_type="ls_milvus"
"embedding_model": "ollama/nomic-embed-text:latest",
"embedding_dimension": 768,
}
)
collection_name = vs.id # Unique collection ID
Batch Insertion:
Chunks inserted in batches of 2048:
chunk_payloads = [
    {
        "content": chunk.page_content,
        "chunk_metadata": chunk.metadata,
        "chunk_id": chunk.metadata["document_id"],
        "embedding_model": embedding_model.model_id,
        "embedding_dimension": 768,
        "embedding": embedding_vector,
    }
    for chunk, embedding_vector in zip(chunks, embeddings)
]
for idx in range(0, len(chunk_payloads), 2048):
    client.vector_io.insert(
        vector_store_id=collection_name,
        chunks=chunk_payloads[idx : idx + 2048],
    )
Collection Reuse¶
Key Optimization: Collections are reused when indexing_params match:
indexing_params = {
"chunking": {
"chunking_method": "recursive",
"chunk_size": 512,
"chunk_overlap": 128,
},
"embedding": {
"model_id": "ollama/nomic-embed-text:latest",
"distance_metric": "cosine",
"embedding_dimension": 768,
"context_length": 8192,
}
}
Reuse Logic:

- Hash indexing_params to generate a canonical representation (see the sketch after this list)
- Check results.collection_names for matching params
- If a match is found, reuse collection_name (skip chunking, embedding, insertion)
- If no match, create a new collection
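A minimal sketch of the canonical hashing, assuming indexing_params is JSON-serializable (the real key derivation in ai4rag may differ):

```python
import hashlib
import json

def indexing_cache_key(indexing_params: dict) -> str:
    # Sorting keys makes the JSON form canonical, so logically equal
    # parameter dicts map to the same cache key.
    canonical = json.dumps(indexing_params, sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```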
Performance Impact:
- Avoids re-chunking and re-embedding when only retrieval/generation params change
- Typical speedup: 50-80% per evaluation when collection reused
Query Phase¶
The query phase retrieves relevant chunks and generates answers for benchmark questions.
sequenceDiagram
participant QR as query_rag()
participant RAG as SimpleRAG
participant Ret as Retriever
participant VS as LSVectorStore
participant EM as LSEmbeddingModel
participant FM as LSFoundationModel
participant LSClient as Llama Stack Client
Note over QR: Parallel execution (ThreadPoolExecutor)
par Question 1
QR->>RAG: generate(question_1)
and Question 2
QR->>RAG: generate(question_2)
and Question N
QR->>RAG: generate(question_N)
end
activate RAG
RAG->>Ret: retrieve(question)
activate Ret
Ret->>EM: embed_query(question)
EM->>LSClient: embeddings.create(question)
LSClient-->>EM: query_embedding
EM-->>Ret: query_embedding
alt search_mode == "vector"
Ret->>VS: search(query_embedding, k, mode="vector")
VS->>LSClient: vector_io.query(vector_store_id, params)
LSClient-->>VS: ranked chunks
else search_mode == "hybrid"
Ret->>VS: search(query_embedding, k, mode="hybrid", ranker_*)
VS->>LSClient: vector_io.query(vector_store_id, params + reranker_params)
LSClient-->>VS: re-ranked chunks (dense + sparse)
end
VS-->>Ret: retrieved documents
Ret-->>RAG: reference_documents
deactivate Ret
RAG->>RAG: format context using context_template_text
RAG->>RAG: format user message using user_message_text
RAG->>FM: chat(messages)
activate FM
FM->>LSClient: chat.completions.create(model, messages, params)
LSClient-->>FM: response
FM-->>RAG: answer
deactivate FM
RAG-->>QR: {"answer": ..., "reference_documents": ..., "question": ...}
deactivate RAG
Parallel Query Execution¶
ThreadPoolExecutor processes benchmark questions concurrently:
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def query_rag(
    rag: BaseRAGTemplate,
    questions: list[str],
    max_threads: int = 10
) -> list[dict]:
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        responses = list(executor.map(
            partial(_generate_response, rag=rag),
            questions
        ))
    return responses
Concurrency:

- Default: 10 concurrent threads
- Balances throughput with server load
- Each thread executes the full retrieval + generation pipeline independently (a usage example follows)
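A brief usage sketch (the rag object is assumed to be an already-constructed SimpleRAG):

```python
questions = ["What is the capital of France?", "Who wrote Hamlet?"]
responses = query_rag(rag, questions, max_threads=10)
print(responses[0]["answer"])
```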
Retrieval¶
Retriever fetches relevant chunks from the vector store:
Step 1: Embed Query
query_embedding = embedding_model.embed_query(question)
# Returns: [0.123, -0.456, 0.789, ...] (same dimension as document embeddings)
Step 2: Vector Search
Vector Mode (semantic search only):
params = {
"max_chunks": 5, # number_of_chunks
"mode": "vector"
}
response = client.vector_io.query(
query=question,
vector_store_id=collection_name,
params=params
)
Returns top-k chunks by cosine similarity (or other distance metric).
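For reference, this is the standard cosine similarity formula (a generic sketch, not ai4rag code):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)
```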
Hybrid Mode (semantic + keyword):
params = {
"max_chunks": 5,
"mode": "hybrid",
"reranker_type": "rrf", # or "weighted", "normalized"
"reranker_params": {
"impact_factor": 60 # ranker_k for RRF
# or "alpha": 0.7 for weighted
}
}
response = client.vector_io.query(...)
Combines dense vector search with sparse keyword search (e.g., BM25), then re-ranks using specified strategy.
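To illustrate the "rrf" strategy: reciprocal rank fusion scores each chunk by summing 1/(k + rank) across the dense and sparse rankings, where impact_factor plays the role of k. This is a generic sketch, not Llama Stack's implementation:

```python
def rrf_fuse(dense_ranking: list[str], sparse_ranking: list[str], k: int = 60) -> list[str]:
    """Fuse two chunk-ID rankings with Reciprocal Rank Fusion."""
    scores: dict[str, float] = {}
    for ranking in (dense_ranking, sparse_ranking):
        for rank, chunk_id in enumerate(ranking, start=1):
            scores[chunk_id] = scores.get(chunk_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)  # highest fused score first
```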
Step 3: Convert to Documents
reference_documents = [
Document(
page_content=chunk.content,
metadata=chunk.chunk_metadata.to_dict()
)
for chunk in response.chunks
]
Context Formatting¶
SimpleRAG formats retrieved chunks into LLM context:
# Default context_template_text: "{document}\n"
context = "\n".join([
foundation_model.context_template_text.format(document=doc.page_content)
for doc in reference_documents
])
Example output (with a custom template like "According to the document: \"{document}\"\n"; the default "{document}\n" would emit each chunk verbatim):
According to the document: "First retrieved chunk text..."
According to the document: "Second retrieved chunk text..."
...
Customization:
foundation_model = LSFoundationModel(
model_id="ollama/llama3.2:3b",
client=client,
context_template_text="Source {document}\n---\n" # Custom format
)
User Message Construction¶
Combine context with question:
# Default user_message_text:
# "{reference_documents}\n\nQuestion: {question}\nAnswer:"
user_message = foundation_model.user_message_text.format(
reference_documents=context,
question=question
)
Example output:
According to the document: "Chunk 1..."
According to the document: "Chunk 2..."
Question: What is the capital of France?
Answer:
Generation¶
LSFoundationModel generates answer via chat completion:
messages = [
{
"role": "system",
"content": foundation_model.system_message_text
# Default: "You are a helpful, respectful and honest assistant..."
},
{
"role": "user",
"content": user_message
}
]
response = client.chat.completions.create(
model=foundation_model.model_id,
messages=messages,
max_completion_tokens=1024,
temperature=0.1
)
answer = response.choices[0].message.content
Output:
{
"answer": "Based on the provided documents, Paris is the capital of France.",
"reference_documents": [Document(...), Document(...), ...],
"question": "What is the capital of France?"
}
Evaluation Phase¶
The evaluation phase compares generated answers against ground truth using unitxt metrics.
sequenceDiagram
participant Exp as AI4RAGExperiment
participant Builder as build_evaluation_data()
participant UE as UnitxtEvaluator
participant Unitxt as unitxt.evaluate()
Exp->>Builder: build_evaluation_data(benchmark_data, inference_response)
activate Builder
loop For each question
Builder->>Builder: extract answer, contexts, context_ids
Builder->>Builder: match with ground_truths, ground_truth_doc_ids
Builder->>Builder: create EvaluationData instance
end
Builder-->>Exp: evaluation_data: list[EvaluationData]
deactivate Builder
Exp->>UE: evaluate_metrics(evaluation_data, metrics)
activate UE
UE->>UE: convert to DataFrame
UE->>Unitxt: evaluate(df, metric_names, compute_conf_intervals=True)
Unitxt-->>UE: scores_df, ci_table
UE->>UE: _handle_ci_calculations(ci_table)
Note over UE: Extract mean, ci_low, ci_high<br/>for each metric
UE->>UE: _handle_questions_scores(scores_df)
Note over UE: Extract per-question scores<br/>for each metric
UE-->>Exp: {"scores": {...}, "question_scores": {...}}
deactivate UE
EvaluationData Assembly¶
build_evaluation_data() combines inference results with benchmark data:
Input: inference_response
[
{
"question": "What is X?",
"answer": "Based on the documents, X is...",
"reference_documents": [Document(...), Document(...)]
},
# ... one per benchmark question
]
Input: benchmark_data
BenchmarkData with:
- questions: ["What is X?", ...]
- correct_answers: [["X is...", "Alternative answer"], ...]
- document_ids: [["doc1", "doc3"], ...] # Ground truth source docs
- questions_ids: ["q0", "q1", ...]
Output: EvaluationData list
[
EvaluationData(
question="What is X?",
answer="Based on the documents, X is...",
contexts=[
"Retrieved chunk 1 text...",
"Retrieved chunk 2 text..."
],
context_ids=["doc1", "doc1"], # Source document IDs
ground_truths=["X is...", "Alternative answer"],
ground_truths_context_ids=["doc1", "doc3"],
question_id="q0"
),
# ...
]
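A hedged sketch of the assembly loop, assuming inference_response preserves benchmark question order (the real build_evaluation_data() may match by question text instead):

```python
evaluation_data = [
    EvaluationData(
        question=response["question"],
        answer=response["answer"],
        contexts=[doc.page_content for doc in response["reference_documents"]],
        context_ids=[doc.metadata["document_id"] for doc in response["reference_documents"]],
        ground_truths=benchmark_data.correct_answers[i],
        ground_truths_context_ids=benchmark_data.document_ids[i],
        question_id=benchmark_data.questions_ids[i],
    )
    for i, response in enumerate(inference_response)
]
```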
Unitxt Evaluation¶
UnitxtEvaluator wraps the unitxt library for metric computation:
Metrics Evaluated:
| ai4rag Metric | Unitxt Metric | Description |
|---|---|---|
| faithfulness | metrics.rag.external_rag.faithfulness | How well is the answer grounded in retrieved context? |
| answer_correctness | metrics.rag.external_rag.answer_correctness | How correct is the answer vs ground truth? |
| context_correctness | metrics.rag.external_rag.context_correctness | How relevant are retrieved docs vs ground truth docs? |
Evaluation Process:
- Convert to DataFrame:
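The conversion itself is not shown in the source; a plausible sketch, assuming EvaluationData fields are accessible as attributes:

```python
import pandas as pd

df = pd.DataFrame([vars(record) for record in evaluation_data])
```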
- Call unitxt.evaluate():
scores_df, ci_table = evaluate(
df,
metric_names=[
"metrics.rag.external_rag.faithfulness",
"metrics.rag.external_rag.answer_correctness",
"metrics.rag.external_rag.context_correctness"
],
compute_conf_intervals=True
)
- Extract Aggregate Scores (mean + confidence intervals):
{
"faithfulness": {
"mean": 0.72,
"ci_low": 0.61,
"ci_high": 0.83
},
"answer_correctness": {
"mean": 0.68,
"ci_low": 0.55,
"ci_high": 0.81
},
"context_correctness": {
"mean": 0.80,
"ci_low": 0.70,
"ci_high": 0.90
}
}
- Extract Per-Question Scores:
{
"faithfulness": {
"q0": 0.71,
"q1": 0.73,
"q2": 0.68,
...
},
"answer_correctness": {
"q0": 0.65,
"q1": 0.70,
...
},
...
}
Optimization Score Calculation¶
The final optimization score is extracted from the aggregate results:
optimization_metric = "faithfulness" # User-configured
optimization_score = result_scores["scores"][optimization_metric]["mean"]
# Example: 0.72
This single scalar value is returned to the optimizer.
Optimization Loop¶
The optimization loop iterates configurations until the evaluation budget is exhausted.
flowchart TD
Start([Start Experiment]) --> MPS{Models Pre-Selection<br/>needed?}
MPS -->|Yes| RunMPS[Run MPS on sample]
RunMPS --> UpdateSS[Update search space<br/>with selected models]
UpdateSS --> InitOpt[Initialize Optimizer]
MPS -->|No| InitOpt
InitOpt --> RandomPhase[Random Phase:<br/>Evaluate n_random_nodes]
RandomPhase --> CheckBudget1{Reached<br/>max_evals?}
CheckBudget1 -->|Yes| ReturnBest
CheckBudget1 -->|No| GAMPhase
GAMPhase[GAM Phase:<br/>Train model on evaluations]
GAMPhase --> Predict[Predict scores for<br/>remaining configs]
Predict --> SelectTop[Select top evals_per_trial<br/>configurations]
SelectTop --> EvalSelected[Evaluate selected configs]
EvalSelected --> CheckBudget2{Reached<br/>max_evals?}
CheckBudget2 -->|No| GAMPhase
CheckBudget2 -->|Yes| ReturnBest
ReturnBest([Return Best Configuration])
Iteration Details¶
Each optimizer iteration follows this flow:
Random Phase (GAMOptimizer only):
def evaluate_initial_random_nodes():
    successful = sum(1 for e in evaluations if e["score"] is not None)
    while successful < n_random_nodes and len(evaluations) < max_evals:
        config = random.choice(unevaluated_combinations)
        score = objective_function(config)  # Calls run_single_evaluation()
        evaluations.append({"config": config, "score": score})
        if score is not None:
            successful += 1
GAM Phase:
def _run_iteration():
# 1. Train GAM
successful_evals = [e for e in evaluations if e["score"] is not None]
X_train = encode_categorical_params(successful_evals)
y_train = [e["score"] for e in successful_evals]
gam = LinearGAM().fit(X_train, y_train)
# 2. Predict
remaining = get_unevaluated_configs()
X_predict = encode_categorical_params(remaining)
predictions = gam.predict(X_predict)
# 3. Select top N
ranked = sorted(
zip(remaining, predictions),
key=lambda x: x[1],
reverse=True # Highest predictions first
)
top_n = ranked[:evals_per_trial]
# 4. Evaluate
for config, predicted_score in top_n:
actual_score = objective_function(config)
evaluations.append({"config": config, "score": actual_score})
Results Caching¶
Two-level caching prevents redundant work:
Level 1: Results Cache (entire evaluation)
# Method on the Results object (shown unqualified here for brevity)
def evaluation_explored_or_cached(indexing_params, rag_params) -> float | None:
    cache_key = hash((indexing_params, rag_params))  # params canonicalized to a hashable form
    if cache_key in cached_evaluations:
        return cached_evaluations[cache_key]["final_score"]
    return None
If exact (indexing_params, rag_params) evaluated before, return cached score immediately.
Level 2: Collection Reuse (indexing only)
# Method on the Results object (shown unqualified here for brevity)
def get_existing_collection(indexing_params) -> str | None:
    cache_key = hash(indexing_params)  # params canonicalized to a hashable form
    if cache_key in existing_collections:
        return existing_collections[cache_key]["collection_name"]
    return None
If indexing_params match but rag_params differ, reuse collection (skip indexing, but re-run retrieval/generation/evaluation).
Cache Hit Rates:
- Full cache hit: ~1% of time (optimizer rarely suggests exact duplicate)
- Collection reuse: ~40-60% of time (common when varying only retrieval/generation params)
Event Streaming¶
Throughout the loop, event handlers receive status updates and results:
on_status_change() calls:
# Chunking
event_handler.on_status_change(
level=LogLevel.INFO,
message="Chunking documents using recursive method...",
step=ExperimentStep.CHUNKING
)
# Embedding
event_handler.on_status_change(
level=LogLevel.INFO,
message="Embedding chunks using ollama/nomic-embed-text:latest...",
step=ExperimentStep.EMBEDDING
)
# Generation
event_handler.on_status_change(
level=LogLevel.INFO,
message="Retrieval and generation using collection 'xyz'...",
step=ExperimentStep.GENERATION
)
# Evaluation
event_handler.on_status_change(
level=LogLevel.INFO,
message="Evaluating RAG Pattern 'Pattern5'...",
step="evaluation"
)
on_pattern_creation() calls:
After each evaluation completes:
event_handler.on_pattern_creation(
payload={
"pattern_name": "Pattern5",
"iteration": 4,
"final_score": 0.72,
"execution_time": 134,
"scores": {...},
"settings": {
"chunking": {...},
"embedding": {...},
"retrieval": {...},
"generation": {...}
}
},
evaluation_results=[
{"question": ..., "answer": ..., "scores": {...}},
...
]
)
Performance Optimizations¶
1. Parallel Query Execution¶
Impact: 5-10x speedup for query phase
Implementation:
with ThreadPoolExecutor(max_workers=10) as executor:
responses = list(executor.map(generate_fn, questions))
Trade-offs:

- More threads = higher throughput but more server load
- 10 threads balances throughput with API rate limits
2. Batch Embedding¶
Impact: 2-3x speedup for indexing phase
Implementation:
for idx in range(0, len(texts), 2048):
batch = texts[idx : idx + 2048]
embeddings.extend(embed_batch(batch))
Constraints:

- Llama Stack max batch size: 2048 chunks
- Smaller batches = more API calls = slower
3. Collection Reuse¶
Impact: 50-80% speedup when indexing params unchanged
Cache Key:
cache_key = hash((
chunking_method,
chunk_size,
chunk_overlap,
embedding_model_id,
embedding_dimension
))
Reuse Rate:

- Typical search spaces: 40-60% reuse
- Search spaces varying only retrieval/generation: 80-95% reuse
4. Results Caching¶
Impact: Instant return for duplicate evaluations
Frequency:

- Random optimizer: ~0-1% hit rate (no duplicates by design)
- GAM optimizer: ~1-5% hit rate (rare duplicate suggestions)
5. Early Stopping (Models Pre-Selection)¶
Impact: 30-70% reduction in total evaluations
Example:

- Without MPS: 10 models × 5 embeddings × 4 configs = 200 evaluations
- With MPS: 3 models × 2 embeddings × 4 configs = 24 evaluations (88% reduction)
Data Transformations Summary¶
Documents → Chunks:
Document(page_content="Long text...", metadata={"document_id": "doc1"})
↓ (LangChainChunker)
[
Document(page_content="Chunk 1", metadata={"document_id": "doc1", "sequence_number": 1}),
Document(page_content="Chunk 2", metadata={"document_id": "doc1", "sequence_number": 2}),
...
]
Chunks → Embeddings:
["Chunk 1 text", "Chunk 2 text", ...]
↓ (LSEmbeddingModel)
[[0.1, -0.2, ...], [0.3, 0.1, ...], ...] # 768-dim vectors
Embeddings → Vector Store:
[{"content": "Chunk 1", "embedding": [0.1, ...], "metadata": {...}}, ...]
↓ (LSVectorStore.add_documents)
Collection "xyz" in Llama Stack vector DB
Question → Query Embedding:
"What is the capital of France?"
↓ (LSEmbeddingModel.embed_query)
[0.05, -0.12, 0.34, ...] # 768-dim vector
Query → Retrieved Chunks:
[0.05, -0.12, ...]
↓ (LSVectorStore.search via vector_io.query)
[
Document(page_content="Paris is the capital...", metadata={...}),
Document(page_content="France's capital city...", metadata={...}),
...
]
Chunks + Question → Answer:
contexts = ["Paris is the capital...", "France's capital city..."]
question = "What is the capital of France?"
↓ (SimpleRAG.generate via LSFoundationModel.chat)
"Based on the provided documents, Paris is the capital of France."
Answer + Ground Truth → Scores:
{
"question": "What is the capital of France?",
"answer": "Paris is the capital of France.",
"ground_truths": ["Paris"],
"contexts": [...],
"ground_truths_context_ids": [...]
}
↓ (UnitxtEvaluator)
{
"faithfulness": {"mean": 0.95, "ci_low": 0.90, "ci_high": 1.0},
"answer_correctness": {"mean": 0.88, "ci_low": 0.75, "ci_high": 1.0},
...
}
Error Handling¶
Failed Iterations:
When an evaluation fails, the optimizer continues:
try:
    score = run_single_evaluation(config)
except AI4RAGError as err:
    exception_handler.handle_exception(err)
    raise FailedIterationError(str(err)) from err
In optimizer:
try:
score = objective_function(config)
except FailedIterationError:
score = None # Don't penalize, just skip
evaluations.append({"config": config, "score": score})
Result: Failed evaluations don't stop optimization but don't contribute to GAM training.
Common Failures:
- IndexingError: Embedding API timeout, vector store connection issue
- GenerationError: LLM API error, rate limit exceeded
- EvaluationError: Malformed unitxt data, metric computation failure
Next Steps¶
- Core Components - Detailed component documentation
- RAG Components - RAG pipeline component details
- Architecture Overview - High-level design