Experiment API

AI4RAGExperiment

AI4RAGExperiment(
    documents: list[Document],
    benchmark_data: DataFrame,
    search_space: AI4RAGSearchSpace,
    vector_store_type: Literal["chroma", "ls_milvus"],
    optimizer_settings: OptimizerSettings,
    event_handler: BaseEventHandler,
    client: LlamaStackClient | Any = None,
    optimization_metric: str = MetricType.FAITHFULNESS,
    **kwargs
)

Class responsible for conducting an AutoRAG experiment, which consists of finding the best hyperparameters for several steps/stages.

AI4RAGExperiment is essentially an orchestrator that optimizes RAG pattern hyperparameters for the desired metric. It requires the user to provide a fully defined search space on which the experiment will be executed.

AI4RAG uses classes inheriting from 'BaseRAGTemplate' as definitions of how to build and use a RAG pattern with the given search space nodes.

Parameters:

  • documents (list[Document | tuple[str, str]]) –

    List of documents to embed in the vector database and use as context in RAG. When given as a list of LangChain Document instances, both the content and the document ID must be provided: Document(page_content=..., metadata={'document_id': 'some_id'}). When given as a list of tuples, each tuple should be (content, document_id).

  • benchmark_data (DataFrame | BenchmarkData) –

    Structure with 3 columns: 'question', 'correct_answers' and - if applicable - 'correct_answer_document_ids'.

  • search_space (AI4RAGSearchSpace) –

    Grid of parameters used during hyperparameter optimization.

  • vector_store_type (Literal['chroma', 'ls_milvus']) –

    Type of vector database that will be used during the experiment.

  • optimizer_settings (OptimizerSettings) –

    Settings for the optimizer to be used during the experiment.

  • client (LlamaStackClient | Any, default: None ) –

    Instance of the llama stack client or other client allowing to communicate with the available vector store providers.

  • event_handler (BaseEventHandler) –

    Instance satisfying BaseEventHandler's interface to stream pattern evaluation results and intermediate status updates. EventHandler is an entrypoint to configure custom logging and assets handling.

  • optimization_metric (str, default: MetricType.FAITHFULNESS ) –

    Metric used to calculate the final score value that will be minimized. Only a single metric is supported in the first release.
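As a rough illustration of the input shapes described above, the sketch below builds documents in the (content, document_id) tuple form and benchmark rows with the three documented column names, using plain Python structures only. The helper `check_benchmark_records` is hypothetical and not part of ai4rag, and storing `correct_answers` as a list of strings is an assumption; in practice the rows would be loaded into a pandas DataFrame before being passed as `benchmark_data`.

```python
# Illustrative only: plain-Python stand-ins for the inputs described above.
# `check_benchmark_records` is a hypothetical helper, not part of ai4rag.

REQUIRED_COLUMNS = ("question", "correct_answers")
OPTIONAL_COLUMNS = ("correct_answer_document_ids",)

# Documents in the tuple form: (content, document_id).
documents = [
    ("Paris is the capital of France.", "doc_1"),
    ("Berlin is the capital of Germany.", "doc_2"),
]

# One dict per benchmark row; keys mirror the documented column names.
# Assumption: answers and document ids are stored as lists of strings.
benchmark_records = [
    {
        "question": "What is the capital of France?",
        "correct_answers": ["Paris"],
        "correct_answer_document_ids": ["doc_1"],
    },
]


def check_benchmark_records(records, known_document_ids):
    """Return a list of human-readable problems found in the records."""
    problems = []
    allowed = set(REQUIRED_COLUMNS) | set(OPTIONAL_COLUMNS)
    for index, row in enumerate(records):
        for column in REQUIRED_COLUMNS:
            if column not in row:
                problems.append(f"row {index}: missing column '{column}'")
        for column in row:
            if column not in allowed:
                problems.append(f"row {index}: unexpected column '{column}'")
        for doc_id in row.get("correct_answer_document_ids", []):
            if doc_id not in known_document_ids:
                problems.append(f"row {index}: unknown document id '{doc_id}'")
    return problems


known_ids = {doc_id for _, doc_id in documents}
problems = check_benchmark_records(benchmark_records, known_ids)
print(problems or "benchmark records look consistent")
```

Checking that every referenced document id exists before starting an experiment is a cheap way to catch mismatches between the benchmark and the document set.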

Other Parameters:

  • job_id (str) –

    Unique identifier for a job.

  • metrics (Sequence[str]) –

    Metrics that will be evaluated during AutoRAG experiment. Not all of these metrics will be used to calculate final score, but they will be included in the evaluation results.

  • evaluator (BaseEvaluator) –

    An implementation of the BaseEvaluator class that AI4RAGExperiment uses to evaluate RAG pattern performance during the optimization process.

  • n_mps_foundation_models (int) –

    Number of foundation models kept for the experiment after pre-selection. In the constructor source below, this value is read from the n_mps_fm keyword argument.

  • n_mps_embedding_models (int) –

    Number of embedding models kept for the experiment after pre-selection. In the constructor source below, this value is read from the n_mps_em keyword argument.

Attributes:

  • results (ExperimentResults) –

    Instance holding information about each iteration during the experiment. It consists of statuses, RAG pattern objects, scores and settings.

Source code in ai4rag/core/experiment/experiment.py
def __init__(
    self,
    documents: list[Document],
    benchmark_data: pd.DataFrame,
    search_space: AI4RAGSearchSpace,
    vector_store_type: Literal["chroma", "ls_milvus"],
    optimizer_settings: OptimizerSettings,
    event_handler: BaseEventHandler,
    client: LlamaStackClient | Any = None,
    optimization_metric: str = MetricType.FAITHFULNESS,
    **kwargs,
):
    self.documents = documents
    self.benchmark_data = BenchmarkData(benchmark_data)
    self.search_space = search_space
    self.vector_store_type = vector_store_type
    self.optimizer_settings = optimizer_settings
    self.event_handler = event_handler
    self.client = client
    self.optimization_metric = optimization_metric

    self.job_id = kwargs.pop("job_id", "ai4rag_job_a0b1c2d3").replace("-", "_")
    self.metrics: Sequence[str] = kwargs.pop(
        "metrics", (MetricType.ANSWER_CORRECTNESS, MetricType.FAITHFULNESS, MetricType.CONTEXT_CORRECTNESS)
    )
    self.evaluator: BaseEvaluator = kwargs.pop(
        "evaluator",
        UnitxtEvaluator(),
    )
    self.n_mps_foundation_models = kwargs.pop("n_mps_fm", ModelsPreSelector.DEFAULT_N_FOUNDATION_MODELS)
    self.n_mps_embedding_models = kwargs.pop("n_mps_em", ModelsPreSelector.DEFAULT_N_EMBEDDING_MODELS)
    self.known_observations: list[dict] | None = kwargs.pop("known_observations", None)

    self.results: ExperimentResults = ExperimentResults()
    self._exception_handler = ExperimentExceptionHandler(self.event_handler)

    if kwargs:
        logger.warning("Unknown parameters: %s", kwargs)
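The way the constructor consumes its keyword arguments can be sketched in isolation. The class below is a hypothetical stand-in, not the real AI4RAGExperiment: it only reproduces the `kwargs.pop` pattern from the listing above, and the numeric defaults are placeholders because the real ones come from ModelsPreSelector. It shows that the pre-selection limits are read from `n_mps_fm` and `n_mps_em`, so passing the attribute name instead ends up among the unknown parameters.

```python
import logging

logger = logging.getLogger("ai4rag_sketch")


class ExperimentOptions:
    """Hypothetical stand-in mirroring how the constructor consumes **kwargs."""

    def __init__(self, **kwargs):
        # Hyphens in job_id are replaced with underscores, as in the listing.
        self.job_id = kwargs.pop("job_id", "ai4rag_job_a0b1c2d3").replace("-", "_")
        # The keyword names (n_mps_fm / n_mps_em) differ from the attribute names.
        # The defaults 3 and 2 are placeholders, not confirmed library values.
        self.n_mps_foundation_models = kwargs.pop("n_mps_fm", 3)
        self.n_mps_embedding_models = kwargs.pop("n_mps_em", 2)
        # Anything still left in kwargs was not recognized; it is only logged.
        self.unknown = dict(kwargs)
        if kwargs:
            logger.warning("Unknown parameters: %s", kwargs)


options = ExperimentOptions(job_id="my-job-01", n_mps_fm=1, n_mps_foundation_models=5)
print(options.job_id, options.n_mps_foundation_models, options.unknown)
```

Because unknown keywords only trigger a warning, a misspelled option silently falls back to its default, so the log is worth checking after construction.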

Attributes

documents property writable

documents: list[Document]

Get the list of documents.

optimization_metric property writable

optimization_metric: str

Get the optimization metric used for the experiment.

benchmark_data property writable

benchmark_data: BenchmarkData

Get benchmark data.

Functions

run_pre_selection

run_pre_selection(
    foundation_models: list[BaseFoundationModel],
    embedding_models: list[BaseEmbeddingModel],
    n_records: int = 5,
    random_seed: int = 17,
) -> dict[str, list[BaseEmbeddingModel | BaseFoundationModel]]

Run models pre-selection using ModelsPreSelector and sample of the data.

Parameters:

  • embedding_models (list[BaseEmbeddingModel]) –

    Embedding models to be considered during pre-selection process.

  • foundation_models (list[BaseFoundationModel]) –

    Foundation models to be evaluated during pre-selection process.

  • n_records (int, default: 5 ) –

    Number of benchmark records to sample for models pre-selection.

  • random_seed (int, default: 17 ) –

    Random seed value used for sampling benchmark data records.

Returns:

  • dict[str, list[BaseEmbeddingModel | BaseFoundationModel]]

    Best embedding models and foundation models found in pre-selection.
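The idea behind pre-selection (score every candidate on a small, reproducible sample, then keep the top few) can be sketched without the library. This is not ModelsPreSelector's implementation, which is not shown in this section: `sample_records` and `select_top` are hypothetical helpers, the scores are made up, and treating a higher score as better is an assumption. Only the returned keys, "foundation_models" and "embedding_models", are taken from the source.

```python
import random


def sample_records(records, n_records=5, random_seed=17):
    """Draw a reproducible sample; return all records if fewer are available."""
    if len(records) <= n_records:
        return list(records)
    # A dedicated Random instance keeps the draw independent of global state.
    return random.Random(random_seed).sample(records, n_records)


def select_top(scores, n):
    """Return the n names with the highest score, best first."""
    ranked = sorted(scores, key=scores.get, reverse=True)
    return ranked[:n]


records = [f"question_{i}" for i in range(20)]
sample = sample_records(records)

# Hypothetical mean scores per model, as if measured on `sample`.
foundation_scores = {"fm_a": 0.61, "fm_b": 0.74, "fm_c": 0.58, "fm_d": 0.70}
embedding_scores = {"em_a": 0.66, "em_b": 0.52, "em_c": 0.71}

selected_models = {
    "foundation_models": select_top(foundation_scores, n=3),
    "embedding_models": select_top(embedding_scores, n=2),
}
print(selected_models)
```

Fixing the seed means two runs over the same benchmark evaluate the candidates on the same questions, which keeps the ranking comparable between runs.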

Source code in ai4rag/core/experiment/experiment.py
def run_pre_selection(
    self,
    foundation_models: list[BaseFoundationModel],
    embedding_models: list[BaseEmbeddingModel],
    n_records: int = 5,
    random_seed: int = 17,
) -> dict[str, list[BaseEmbeddingModel | BaseFoundationModel]]:
    """
    Run models pre-selection using ModelsPreSelector and sample
    of the data.

    Parameters
    ----------
    embedding_models : list[BaseEmbeddingModel]
        Embedding models to be considered during pre-selection process.

    foundation_models : list[BaseFoundationModel]
        Foundation models to be evaluated during pre-selection process.

    n_records : int, default=5
        Amount of records that should be used during models pre-selection.

    random_seed : int, default=17
        Random seed value used for sampling benchmark data records.

    Returns
    -------
    dict[str, list[BaseEmbeddingModel | BaseFoundationModel]]
        Best embedding models and foundation models found in pre-selection.
    """
    _log_start_mps = (
        "Starting models pre-selection with the following "
        f"foundation models: {[str(fm) for fm in foundation_models]} "
        f"and the following embedding models: {[str(em) for em in embedding_models]}."
    )
    logger.info(_log_start_mps)
    self.event_handler.on_status_change(
        level=LogLevel.INFO,
        message=_log_start_mps,
        step=ExperimentStep.MODEL_SELECTION,
    )

    mps = ModelsPreSelector(
        benchmark_data=self.benchmark_data.get_random_sample(n_records=n_records, random_seed=random_seed),
        documents=self.documents.copy(),
        foundation_models=foundation_models,
        embedding_models=embedding_models,
        metric=self.optimization_metric,
    )
    mps.evaluate_patterns()

    selected_models = mps.select_models(
        n_embedding_models=self.n_mps_embedding_models, n_foundation_models=self.n_mps_foundation_models
    )

    logger.info(
        "Models pre-selection has been finished. Selected foundation models: %s and selected embedding models: %s.",
        [str(model) for model in selected_models["foundation_models"]],
        [str(model) for model in selected_models["embedding_models"]],
    )

    return selected_models

run_single_evaluation

run_single_evaluation(rag_params: RAGParamsType) -> float

Evaluate a single RAG configuration and return its score using provided documents.

Parameters:

  • rag_params (RAGParamsType) –

    A dictionary mapping RAG parameter names to their values.

Returns:

  • float

    A single evaluation score obtained by the executed rag pattern.
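One validation step in this method is easy to miss: per the source below, any search mode other than "vector" is rejected when the vector store is chroma. A minimal sketch of that check follows. The exception class is a local stand-in for the ai4rag one, the dictionary key "search_mode" is an assumption (the source reads it through AI4RAGParamNames.SEARCH_MODE), and "hybrid" is only an example of a non-vector mode.

```python
class RAGExperimentError(Exception):
    """Local stand-in for the ai4rag exception of the same name."""


def resolve_search_mode(retrieval_params, vector_store_type):
    """Return the effective search mode or raise if the store cannot serve it."""
    # "search_mode" is an assumed key name used for illustration only.
    search_mode = retrieval_params.get("search_mode", "vector")
    if search_mode != "vector" and vector_store_type == "chroma":
        raise RAGExperimentError(
            f"Search mode '{search_mode}' is not supported with chroma vector store. "
            "Only 'vector' mode is supported for chroma."
        )
    return search_mode


# Missing key falls back to plain vector search.
print(resolve_search_mode({}, "chroma"))
# A non-vector mode is accepted for the other store type.
print(resolve_search_mode({"search_mode": "hybrid"}, "ls_milvus"))

try:
    resolve_search_mode({"search_mode": "hybrid"}, "chroma")
except RAGExperimentError as err:
    print("rejected:", err)
```

A search space that mixes chroma with non-vector search modes will therefore produce failing iterations rather than a silent fallback.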

Source code in ai4rag/core/experiment/experiment.py
def run_single_evaluation(self, rag_params: RAGParamsType) -> float:
    """
    Evaluate a single RAG configuration and return its score using provided documents.

    Parameters
    ----------
    rag_params : RAGParamsType
        A dictionary containing rag parameters as keys and their values.

    Returns
    -------
    float
        A single evaluation score obtained by the executed rag pattern.
    """
    start_time = time.time()

    chunking_params = get_chunking_params(rag_params)
    retrieval_params = get_retrieval_params(rag_params)

    foundation_model = rag_params.get(AI4RAGParamNames.FOUNDATION_MODEL)
    embedding_model = rag_params.get(AI4RAGParamNames.EMBEDDING_MODEL)

    distance_metric = EmbeddingModels.get_distance_metric(embedding_model.model_id)
    embedding_params_dict = (
        asdict(embedding_model.params) if is_dataclass(embedding_model.params) else embedding_model.params
    )
    indexing_params = {
        "chunking": chunking_params,
        "embedding": {
            "model_id": embedding_model.model_id,
            "distance_metric": distance_metric,
            "embedding_params": embedding_params_dict,
        },
    }

    logger.info("Using indexing params: %s", indexing_params)

    retrieval_method = retrieval_params[AI4RAGParamNames.RETRIEVAL_METHOD]
    number_of_chunks = retrieval_params[AI4RAGParamNames.NUMBER_OF_CHUNKS]

    search_mode = retrieval_params.get(AI4RAGParamNames.SEARCH_MODE, "vector")
    if search_mode != "vector" and self.vector_store_type == "chroma":
        raise RAGExperimentError(
            f"Search mode '{search_mode}' is not supported with chroma vector store. "
            "Only 'vector' mode is supported for chroma."
        )

    context_template_text = foundation_model.context_template_text
    system_message_text = foundation_model.system_message_text
    user_message_text = foundation_model.user_message_text

    rag_params = {
        "retrieval": retrieval_params,
        "generation": {
            "model_id": foundation_model.model_id,
            "context_template_text": context_template_text,
            "user_message_text": user_message_text,
            "system_message_text": system_message_text,
        },
    }

    logger.info("Using retrieval and generation params: %s", rag_params)

    result_score = self.results.evaluation_explored_or_cached(
        indexing_params=indexing_params, rag_params=rag_params
    )
    if result_score is not None:
        return result_score

    pattern_name = self._create_pattern_name()
    logger.info("Using name '%s' for the currently evaluated pattern.", pattern_name)

    reuse_collection_name = self._get_reusable_collection_name(indexing_params=indexing_params)

    vector_store = get_vector_store(
        vs_type=self.vector_store_type,
        embedding_model=embedding_model,
        reuse_collection_name=reuse_collection_name,
        client=self.client,
    )

    collection_name = vector_store.collection_name

    if not self._collection_exists(collection_name=collection_name):
        chunking_method = chunking_params.get(AI4RAGParamNames.CHUNKING_METHOD)
        chunk_size = chunking_params.get(AI4RAGParamNames.CHUNK_SIZE)
        chunk_overlap = chunking_params.get(AI4RAGParamNames.CHUNK_OVERLAP)

        chunker = LangChainChunker(method=chunking_method, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
        chunked_documents = chunker.split_documents(self.documents)

        if self.event_handler:
            self.event_handler.on_status_change(
                level=LogLevel.INFO,
                message=(
                    f"Chunking documents using the {chunking_method} method, chunk_size: {chunk_size} "
                    f"and chunk_overlap: {chunk_overlap}."
                ),
                step=ExperimentStep.CHUNKING,
            )

        self.event_handler.on_status_change(
            level=LogLevel.INFO,
            message=(
                f"Embedding chunks using the {embedding_model.model_id} model. "
                f"Building index: {collection_name}."
            ),
            step=ExperimentStep.EMBEDDING,
        )

        try:
            vector_store.add_documents(chunked_documents)
        except Exception as exc:
            raise IndexingError(exc, collection_name, embedding_model.model_id) from exc

    else:
        self.event_handler.on_status_change(
            level=LogLevel.INFO,
            message=f"Using index {collection_name}.",
            step=ExperimentStep.EMBEDDING,
        )

    logger.info("Using retriever with parameters: %s", retrieval_params)

    retriever = Retriever(
        vector_store=vector_store,
        number_of_chunks=number_of_chunks,
        method=retrieval_method,
        search_mode=search_mode,
        ranker_strategy=retrieval_params.get(AI4RAGParamNames.RANKER_STRATEGY),
        ranker_k=retrieval_params.get(AI4RAGParamNames.RANKER_K),
        ranker_alpha=retrieval_params.get(AI4RAGParamNames.RANKER_ALPHA),
    )

    rag_pattern = SimpleRAG(
        foundation_model=foundation_model,
        retriever=retriever,
    )

    _rag_log = (
        f"Retrieval and generation using collection: '{collection_name}' and "
        f"foundation model: '{foundation_model.model_id}'."
    )
    logger.info(_rag_log)
    self.event_handler.on_status_change(
        level=LogLevel.INFO,
        message=_rag_log,
        step=ExperimentStep.GENERATION,
    )

    inference_response = query_rag(
        rag=rag_pattern,
        questions=list(self.benchmark_data.questions),
    )
    result_scores, evaluation_data = self._evaluate_response(
        inference_response=inference_response,
        pattern_name=pattern_name,
    )

    stop_time = time.time()
    execution_time = stop_time - start_time

    result_score = result_scores["scores"][self.optimization_metric]["mean"]

    logger.info("Calculated optimization score for '%s': %s", pattern_name, result_score)

    evaluation_result = EvaluationResult(
        pattern_name=pattern_name,
        collection=collection_name,
        indexing_params=indexing_params,
        rag_params=rag_params,
        scores=result_scores,
        execution_time=execution_time,
        final_score=result_score,
        rag_pattern=rag_pattern,
    )

    evaluation_results_json = self.results.create_evaluation_results_json(
        evaluation_data=evaluation_data, evaluation_result=evaluation_result
    )

    logger.info(
        "Evaluation scores: %s",
        {el.get("question_id"): el.get("scores") for el in evaluation_results_json if isinstance(el, dict)},
    )

    try:
        self._stream_finished_pattern(
            evaluation_result=evaluation_result,
            evaluation_results_json=evaluation_results_json,
        )
    except Exception as exc:
        raise AssetSaveError(exc) from exc

    self.results.add_evaluation(
        evaluation_data=evaluation_data,
        evaluation_result=evaluation_result,
    )

    return result_score
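The listing above returns early when a configuration has already been evaluated, and otherwise reads the final score from `result_scores["scores"][metric]["mean"]`. A minimal sketch of that evaluate-once behavior is shown here. `EvaluationCache` and `evaluate_once` are hypothetical and do not reproduce ExperimentResults; the metric name "faithfulness" and the inner keys "chunk_size" and "number_of_chunks" are assumed for illustration, while the top-level grouping ("chunking", "embedding", "retrieval", "generation") follows the source.

```python
import json


class EvaluationCache:
    """Hypothetical stand-in for the lookup done before each evaluation."""

    def __init__(self):
        self._scores = {}

    @staticmethod
    def _key(indexing_params, rag_params):
        # Serialize with sorted keys so dict ordering does not affect the lookup.
        return json.dumps({"indexing": indexing_params, "rag": rag_params}, sort_keys=True)

    def get(self, indexing_params, rag_params):
        return self._scores.get(self._key(indexing_params, rag_params))

    def add(self, indexing_params, rag_params, score):
        self._scores[self._key(indexing_params, rag_params)] = score


def evaluate_once(cache, indexing_params, rag_params, evaluate, metric="faithfulness"):
    """Return a cached score if present, otherwise evaluate and store it."""
    cached = cache.get(indexing_params, rag_params)
    if cached is not None:
        return cached
    result_scores = evaluate(indexing_params, rag_params)
    # Same access path as in the listing: scores -> metric -> mean.
    score = result_scores["scores"][metric]["mean"]
    cache.add(indexing_params, rag_params, score)
    return score


calls = []


def fake_evaluate(indexing_params, rag_params):
    """Pretend evaluation that records how often it was invoked."""
    calls.append((indexing_params, rag_params))
    return {"scores": {"faithfulness": {"mean": 0.8}}}


cache = EvaluationCache()
indexing = {"chunking": {"chunk_size": 512}, "embedding": {"model_id": "em_a"}}
rag = {"retrieval": {"number_of_chunks": 5}, "generation": {"model_id": "fm_a"}}

first = evaluate_once(cache, indexing, rag, fake_evaluate)
# Same content with a different key order still hits the cache.
reordered = dict(reversed(list(indexing.items())))
second = evaluate_once(cache, reordered, rag, fake_evaluate)
print(first, second, len(calls))
```

Skipping repeated configurations matters because an optimizer may propose the same point more than once, and each real evaluation involves indexing, retrieval and generation.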

search

search(**kwargs) -> None

Prepare and execute experiment to find the best RAG parameters.

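The result of search() can be reviewed via self.results, which stores the result of each evaluation, or via self.event_handler with a custom implementation. In the source below, two keyword arguments are read from kwargs: skip_mps (default False), which skips models pre-selection, and optimizer, an optimizer class used in place of the default GAMOptimizer (intended for testing).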
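The condition that decides whether search() runs models pre-selection can be restated as a small predicate. The function name and argument names below are hypothetical; the logic follows the `if` statement in the source of search(): pre-selection runs when either list of candidate models is longer than its configured limit, unless `skip_mps` is set.

```python
def should_run_pre_selection(
    n_embedding_models,
    n_foundation_models,
    limit_embedding_models,
    limit_foundation_models,
    skip_mps=False,
):
    """Mirror the pre-selection condition used by search()."""
    too_many = (
        n_embedding_models > limit_embedding_models
        or n_foundation_models > limit_foundation_models
    )
    return too_many and not skip_mps


# Four foundation models against a limit of three triggers pre-selection.
print(should_run_pre_selection(2, 4, limit_embedding_models=2, limit_foundation_models=3))
# The same search space with skip_mps=True does not.
print(should_run_pre_selection(2, 4, 2, 3, skip_mps=True))
```

Note that exceeding only one of the two limits is enough, and in that case both the foundation and embedding model parameters of the search space are replaced with the selected models.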

Source code in ai4rag/core/experiment/experiment.py
def search(self, **kwargs) -> None:
    """
    Prepare and execute experiment to find the best RAG parameters.

    Result of the search() can be reviewed via self.results as this object
    stores results of each evaluation or via self.event_handler with custom
    implementation.
    """

    logger.info("Starting RAG optimization process...")

    def objective_function(space: RAGParamsType) -> float | None:
        """Function passed to the optimizer."""
        try:
            return self.run_single_evaluation(space)
        except AI4RAGError as err:
            msg = self._exception_handler.handle_exception(err)
            raise FailedIterationError(msg) from err

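    # MPS - models pre-selection based on sample evaluation.
    # Run if the search space holds more foundation or embedding models than the
    # configured limits (n_mps_foundation_models / n_mps_embedding_models,
    # presumably 3 and 2 by default), unless skip_mps=True is passed to search().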
    foundation_models = list(self.search_space[AI4RAGParamNames.FOUNDATION_MODEL].values)
    embedding_models = list(self.search_space[AI4RAGParamNames.EMBEDDING_MODEL].values)

    if (
        len(embedding_models) > self.n_mps_embedding_models or len(foundation_models) > self.n_mps_foundation_models
    ) and not kwargs.get("skip_mps", False):
        selected_models = self.run_pre_selection(
            foundation_models=foundation_models, embedding_models=embedding_models
        )
        self.search_space[AI4RAGParamNames.FOUNDATION_MODEL] = Parameter(
            name=AI4RAGParamNames.FOUNDATION_MODEL, param_type="C", values=selected_models["foundation_models"]
        )
        self.search_space[AI4RAGParamNames.EMBEDDING_MODEL] = Parameter(
            name=AI4RAGParamNames.EMBEDDING_MODEL, param_type="C", values=selected_models["embedding_models"]
        )

    optimizer_class: type[BaseOptimizer] = kwargs.get("optimizer", GAMOptimizer)

    optimizer_kwargs = {}
    if self.known_observations is not None:
        optimizer_kwargs["known_observations"] = self.known_observations

    # In the search kwargs user may pass different optimizer class for testing purposes
    optimizer = optimizer_class(
        objective_function=objective_function,
        search_space=self.search_space,
        settings=self.optimizer_settings,
        **optimizer_kwargs,
    )
    logger.info(
        "Using optimizer: %s with optimizer settings: %s",
        optimizer_class.__name__,
        self.optimizer_settings.to_dict(),
    )

    try:
        _ = optimizer.search()
    except OptimizationError as err:
        final_error_msg = self._exception_handler.get_final_error_msg()
        raise RAGExperimentError(final_error_msg) from err

    self.event_handler.on_status_change(
        level=LogLevel.INFO,
        message="Experiment optimization process finished.",
    )
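Status updates in the listings above are all sent through `event_handler.on_status_change(level=..., message=..., step=...)`, with `step` omitted in the final call. A minimal sketch of a handler that collects and prints these updates is shown below. It is a plain class for illustration only: a real handler would subclass BaseEventHandler, whose full interface is not shown in this section and may require additional methods, and the string values passed for `level` and `step` stand in for the LogLevel and ExperimentStep members.

```python
class PrintingEventHandler:
    """Hypothetical handler that records and prints status updates."""

    def __init__(self):
        self.messages = []

    def on_status_change(self, level, message, step=None):
        # Keep a structured record so the updates can be inspected afterwards.
        self.messages.append((level, step, message))
        prefix = f"[{level}]" if step is None else f"[{level}][{step}]"
        print(prefix, message)


handler = PrintingEventHandler()
# Plain strings stand in for LogLevel.INFO and ExperimentStep.CHUNKING.
handler.on_status_change(level="INFO", message="Chunking documents.", step="chunking")
# The final update in search() is sent without a step.
handler.on_status_change(level="INFO", message="Experiment optimization process finished.")
```

Giving `step` a default of None keeps the handler compatible with both call shapes that appear in the source.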