Skip to content

Vector Stores API

Milvus (OGX)

ogx

Classes

OGXVectorStore

OGXVectorStore(
    embedding_model: OGXEmbeddingModel,
    client: OgxClient,
    provider_id: str,
    reuse_collection_name: str | None = None,
    distance_metric: str | None = None,
)

Bases: BaseVectorStore

OGX client wrapper used for communication with vector store (single index/collection).

Source code in ai4rag/rag/vector_store/ogx.py
def __init__(
    self,
    embedding_model: OGXEmbeddingModel,
    client: OgxClient,
    provider_id: str,
    reuse_collection_name: str | None = None,
    distance_metric: str | None = None,
):
    """
    Initialize the wrapper and resolve the OGX vector store it talks to.

    Parameters
    ----------
    embedding_model : OGXEmbeddingModel
        Embedding model passed to the base class and to the store initializer.

    client : OgxClient
        OGX client kept on the instance and used for all later requests.

    provider_id : str
        Provider identifier forwarded to ``_initialize_ogx_vector_store``.

    reuse_collection_name : str | None, default=None
        Forwarded to the base class and to ``_initialize_ogx_vector_store``.

    distance_metric : str | None, default=None
        Forwarded to the base class.
    """
    super().__init__(embedding_model, distance_metric, reuse_collection_name)
    self.client = client

    resolved_store = self._initialize_ogx_vector_store(
        client=client,
        embedding_model=embedding_model,
        provider_id=provider_id,
        reuse_collection_name=reuse_collection_name,
    )
    self._ogx_vs = resolved_store
    # The collection name is always the id reported by the resolved store.
    self._collection_name = resolved_store.id
Functions
search
search(
    query: str,
    k: int,
    include_scores: bool = False,
    search_mode: str = "vector",
    ranker_strategy: str | None = None,
    ranker_k: int | None = None,
    ranker_alpha: float | None = None,
    **kwargs
) -> list[AI4RAGChunk] | list[tuple[AI4RAGChunk, float]]

Search for the chunks relevant to the query.

Parameters:

  • query (str) –

    Question / query for which the similarity search will be executed.

  • k (int) –

    Number of chunks to be returned as a result of similarity search

  • include_scores (bool, default: False ) –

    If True, similarity scores will be returned in the response

  • search_mode (str, default: "vector" ) –

    Search mode: "vector" or "hybrid".

  • ranker_strategy (str | None, default: None ) –

    Ranking strategy for hybrid search: "rrf", "weighted", or "normalized". None or an empty string means no ranker (used for non-hybrid modes).

  • ranker_k (int | None, default: None ) –

    Parameter k for the ranking function. None or 0 means not set.

  • ranker_alpha (float | None, default: None ) –

    Alpha parameter for weighted ranking strategy. None or 1 means not set (1 is the vector-only sentinel).

Returns:

  • list[AI4RAGChunk] | list[tuple[AI4RAGChunk, float]]

    List of chunks with or without scores, depending on the input.

Source code in ai4rag/rag/vector_store/ogx.py
def search(
    self,
    query: str,
    k: int,
    include_scores: bool = False,
    search_mode: str = "vector",
    ranker_strategy: str | None = None,
    ranker_k: int | None = None,
    ranker_alpha: float | None = None,
    **kwargs,
) -> list[AI4RAGChunk] | list[tuple[AI4RAGChunk, float]]:
    """
    Query the OGX vector store for the chunks most relevant to ``query``.

    Parameters
    ----------
    query : str
        Question / query for which the similarity search will be executed.

    k : int
        Maximum number of chunks requested from the store.

    include_scores : bool, default=False
        If True, every returned chunk is paired with its score.

    search_mode : str, default="vector"
        Search mode: "vector" or "hybrid".

    ranker_strategy : str | None, default=None
        Ranking strategy for hybrid search: "rrf", "weighted", or "normalized".
        None or an empty string means no ranker; it is ignored unless
        ``search_mode`` is "hybrid".

    ranker_k : int | None, default=None
        Parameter k for the "rrf" ranker. None or a non-positive value means not set.

    ranker_alpha : float | None, default=None
        Alpha parameter for the "weighted" ranker. None or 1 means not set
        (1 is the vector-only sentinel).

    **kwargs
        Accepted for interface compatibility; not used by this method.

    Returns
    -------
    list[AI4RAGChunk] | list[tuple[AI4RAGChunk, float]]
        List of chunks with or without scores, depending on the input.
    """
    self._validate_search_params(search_mode, ranker_strategy, ranker_k, ranker_alpha)

    query_params = {"max_chunks": k, "mode": search_mode}

    if search_mode == "hybrid" and ranker_strategy:
        # Only explicitly set ranker knobs are sent; the options dict may stay empty.
        ranker_options = {}
        if ranker_strategy == "rrf":
            if ranker_k is not None and ranker_k > 0:
                ranker_options["impact_factor"] = ranker_k
        elif ranker_strategy == "weighted":
            if ranker_alpha is not None and ranker_alpha != 1:
                ranker_options["alpha"] = ranker_alpha
        query_params["reranker_type"] = ranker_strategy
        query_params["reranker_params"] = ranker_options

    response = self.client.vector_io.query(
        query=query,
        vector_store_id=self._ogx_vs.id,
        params=query_params,
    )

    def as_chunk(hit):
        # Convert one store hit into the pipeline's chunk type.
        return AI4RAGChunk(text=hit.content, metadata=hit.chunk_metadata.to_dict())

    if not include_scores:
        return [as_chunk(hit) for hit in response.chunks]

    return [(as_chunk(hit), score) for hit, score in zip(response.chunks, response.scores)]
add_documents
add_documents(documents: list[AI4RAGChunk], **kwargs) -> None

Add documents to the collection.

Parameters:

  • documents (list[AI4RAGChunk]) –

    Chunks to add to the collection.

Source code in ai4rag/rag/vector_store/ogx.py
def add_documents(self, documents: list[AI4RAGChunk], **kwargs) -> None:
    """
    Add documents to the collection.

    Chunks are de-duplicated by their text before they are embedded, so
    repeated text is embedded and inserted only once. Each chunk ID is the
    SHA-256 hex digest of the chunk text, which keeps IDs reproducible across
    interpreter runs (the built-in ``hash()`` of a string is salted per process).

    Parameters
    ----------
    documents : list[AI4RAGChunk]
        Chunks to add to the collection. Every chunk's metadata must contain
        a "document_id" entry. An empty list is a no-op.

    **kwargs
        batch_size : int, default=2048
            Maximum number of chunks sent in a single insert request.

    Raises
    ------
    ValueError
        If ``batch_size`` is smaller than 1.
    """
    import hashlib  # local import keeps this block self-contained

    batch_size = kwargs.get("batch_size", 2048)
    if batch_size < 1:
        # A negative step would make range() yield nothing and silently drop every chunk.
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    if not documents:
        return

    # Handle both dict and OGXEmbeddingParams for backward compatibility
    embedding_params = self.embedding_model.params
    if isinstance(embedding_params, dict):
        embedding_dimension = embedding_params["embedding_dimension"]
    else:
        embedding_dimension = embedding_params.embedding_dimension

    seen_ids = set()
    unique_chunks = []

    for doc in documents:
        # Content-derived, process-independent identifier.
        chunk_id = hashlib.sha256(doc.text.encode("utf-8", "surrogatepass")).hexdigest()
        document_id = doc.metadata["document_id"]
        if chunk_id in seen_ids:
            logger.warning(
                "Skipping duplicate chunk_id: %s from document: %s",
                chunk_id,
                document_id,
            )
            continue
        seen_ids.add(chunk_id)
        unique_chunks.append(
            {
                "content": doc.text,
                "chunk_metadata": {
                    "document_id": document_id,
                },
                "metadata": doc.metadata,
                "chunk_id": chunk_id,
                "embedding_model": self.embedding_model.model_id,
                "embedding_dimension": embedding_dimension,
            }
        )

    # Embed only the chunks that will actually be inserted.
    embeddings = self.embedding_model.embed_documents([chunk["content"] for chunk in unique_chunks])

    full_chunks = [chunk | {"embedding": embedding} for chunk, embedding in zip(unique_chunks, embeddings)]

    for idx in range(0, len(full_chunks), batch_size):
        self.client.vector_io.insert(
            vector_store_id=self._ogx_vs.id,
            chunks=full_chunks[idx : idx + batch_size],
        )
clean_collection
clean_collection()

Remove content of the collection and remove vector store instance.

Source code in ai4rag/rag/vector_store/ogx.py
def clean_collection(self):
    """Delete the backing OGX vector store (and with it the collection content) through the client."""
    store_id = self._ogx_vs.id
    self.client.vector_stores.delete(store_id)

ChromaDB

chroma

Classes

ChromaVectorStore

ChromaVectorStore(
    embedding_model: BaseEmbeddingModel,
    reuse_collection_name: str | None = None,
    distance_metric: str = "cosine",
    document_name_field: str = "document_id",
    chunk_sequence_number_field: str = "sequence_number",
    **kwargs
)

Bases: BaseVectorStore

Class representing single index in the chroma vector database.

Internally converts between AI4RAGChunk (pipeline type) and langchain Document (required by langchain_chroma.Chroma).

Parameters:

  • embedding_model (BaseEmbeddingModel) –

    Instance used for embedding documents and user's queries.

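  • reuse_collection_name (str | None, default: None ) –

    Name of the collection backing the vector store. When omitted, the collection_name keyword argument is used, or else a timestamped name of the form ai4rag_YYYYMMDDHHMMSS is generated.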

  • distance_metric (str, default: "cosine" ) –

    Metric that will be used to calculate similarity score between vectors.

  • document_name_field (str, default: "document_id" ) –

    Default document ID field name.

  • chunk_sequence_number_field (str, default: "sequence_number" ) –

    Default chunk sequence number field name.

Source code in ai4rag/rag/vector_store/chroma.py
def __init__(
    self,
    embedding_model: BaseEmbeddingModel,
    reuse_collection_name: str | None = None,
    distance_metric: str = "cosine",
    document_name_field: str = "document_id",
    chunk_sequence_number_field: str = "sequence_number",
    **kwargs,
) -> None:
    """
    Initialize the store and create its Chroma client.

    Parameters
    ----------
    embedding_model : BaseEmbeddingModel
        Forwarded to the base class.

    reuse_collection_name : str | None, default=None
        Collection name to use. When falsy, the ``collection_name`` keyword
        argument is used, or else a timestamped ``ai4rag_YYYYMMDDHHMMSS`` name.

    distance_metric : str, default="cosine"
        Forwarded to the base class.

    document_name_field : str, default="document_id"
        Stored as the document ID field name.

    chunk_sequence_number_field : str, default="sequence_number"
        Stored as the chunk sequence number field name.

    **kwargs
        ``collection_name`` is consumed here; everything else is passed to
        ``_get_chroma_client``.
    """
    super().__init__(
        embedding_model=embedding_model,
        distance_metric=distance_metric,
        reuse_collection_name=reuse_collection_name,
    )
    self._document_name_field = document_name_field
    self._chunk_sequence_number_field = chunk_sequence_number_field

    # Always consume "collection_name": previously it was popped only when
    # reuse_collection_name was falsy, so otherwise it leaked into the client kwargs.
    # NOTE(review): assumes _get_chroma_client reads the name from self._collection_name — confirm.
    # Single quotes inside the f-string keep this parseable before Python 3.12.
    generated_name = f"ai4rag_{datetime.now().strftime('%Y%m%d%H%M%S')}"
    requested_name = kwargs.pop("collection_name", generated_name)
    self._collection_name = reuse_collection_name or requested_name

    self._vector_store = self._get_chroma_client(**kwargs)
Attributes
distance_metric property writable
distance_metric: str

Get used distance metric.

collection_name property
collection_name: str

Dynamically get collection name.

Functions
clear
clear() -> None

Clear the vector store.

Source code in ai4rag/rag/vector_store/chroma.py
def clear(self) -> None:
    """Delete every entry currently held by the vector store; do nothing when it is empty."""
    stored_ids = self._vector_store.get()["ids"]
    if not stored_ids:
        return
    self.delete(stored_ids)
count
count() -> int

Count the number of shards in the vector store.

Returns:

  • int

    Number of shards in the vector store.

Source code in ai4rag/rag/vector_store/chroma.py
def count(self) -> int:
    """Return how many entries the vector store currently holds.

    Returns
    -------
    int
        Number of IDs reported by the underlying vector store.
    """
    stored_ids = self._vector_store.get()["ids"]
    return len(stored_ids)
add_documents
add_documents(documents: list[AI4RAGChunk], **kwargs: Any) -> list[str]

Embed and add chunks to the vector store.

Parameters:

  • documents (list[AI4RAGChunk]) –

    Chunks to be embedded and added to the vector store.

Returns:

  • list[str]

    List of document IDs.

Source code in ai4rag/rag/vector_store/chroma.py
def add_documents(self, documents: list[AI4RAGChunk], **kwargs: Any) -> list[str]:
    """
    Embed and add chunks to the vector store.

    Chunks are sent to the underlying store in batches of at most
    ``max_batch_size``. An empty input returns an empty list without
    calling the store.

    Parameters
    ----------
    documents : list[AI4RAGChunk]
        Chunks to be embedded and added to the vector store.

    **kwargs
        max_batch_size : int, default=2048
            Maximum number of chunks per call to the underlying store.
            Consumed here and not forwarded.
        Any other keyword argument is forwarded to the underlying store's
        ``add_documents``.

    Returns
    -------
    list[str]
        List of document IDs.

    Raises
    ------
    ValueError
        If ``max_batch_size`` is smaller than 1.
    """
    # pop (not get) so the batching option is not forwarded to the underlying store.
    max_batch_size = kwargs.pop("max_batch_size", 2048)
    if max_batch_size < 1:
        # A negative step would make range() yield nothing and silently add no documents.
        raise ValueError(f"max_batch_size must be a positive integer, got {max_batch_size!r}")

    ids, docs = self._process_documents(documents)

    added_ids: list[str] = []
    for batch_start in range(0, len(docs), max_batch_size):
        batch_end = batch_start + max_batch_size
        added_ids.extend(
            self._vector_store.add_documents(
                docs[batch_start:batch_end],
                ids=ids[batch_start:batch_end],
                **kwargs,
            )
        )
    return added_ids
search
search(
    query: str, k: int = 5, include_scores: bool = False, **kwargs: Any
) -> list[AI4RAGChunk] | list[tuple[AI4RAGChunk, float]]

Searches for documents most similar to the query.

The method is designed as a wrapper for respective LangChain VectorStores' similarity search methods. Therefore, additional search parameters passed in kwargs should be consistent with those methods, and can be found in the LangChain documentation.

Parameters:

  • query (str) –

    Query for which grounding documents will be searched for.

  • k (int, default: 5 ) –

    Number of documents to retrieve

  • include_scores (bool, default: False ) –

    Whether similarity scores of found documents should be returned.

Returns:

  • list[AI4RAGChunk] | list[tuple[AI4RAGChunk, float]]

    Found chunks with or without scores.

Source code in ai4rag/rag/vector_store/chroma.py
def search(
    self,
    query: str,
    k: int = 5,
    include_scores: bool = False,
    **kwargs: Any,
) -> list[AI4RAGChunk] | list[tuple[AI4RAGChunk, float]]:
    """Find the chunks most similar to the query.

    This wraps the LangChain vector store's similarity search methods. Extra
    search parameters given in ``kwargs`` are passed through to them, except
    for the names listed in ``_HYBRID_KWARGS``, which are dropped first.
    See the LangChain documentation for the supported parameters.

    Parameters
    ----------
    query : str
        Query for which grounding documents will be searched for.

    k : int, default=5
        Number of documents to retrieve

    include_scores : bool, default=False
        Whether similarity scores of found documents should be returned.

    Returns
    -------
    list[AI4RAGChunk] | list[tuple[AI4RAGChunk, float]]
        Found chunks with or without scores.
    """
    passthrough = {name: value for name, value in kwargs.items() if name not in self._HYBRID_KWARGS}

    if not include_scores:
        hits = self._vector_store.similarity_search(query, k=k, **passthrough)
        return [self._from_langchain_document(hit) for hit in hits]

    scored_hits = self._vector_store.similarity_search_with_score(query, k=k, **passthrough)
    return [(self._from_langchain_document(hit), score) for hit, score in scored_hits]
window_search
window_search(
    query: str, k: int = 5, include_scores: bool = False, window_size: int = 2, **kwargs: Any
) -> list[AI4RAGChunk] | list[tuple[AI4RAGChunk, float]]

Searches for documents most similar to the query and extend a document (a chunk) to its adjacent chunks (if they exist) from the same origin document.

The method is designed as a wrapper for respective LangChain VectorStores' similarity search methods. Therefore, additional search parameters passed in kwargs should be consistent with those methods, and can be found in the LangChain documentation.

Parameters:

  • query (str) –

    Query for which grounding documents will be searched for.

  • k (int, default: 5 ) –

    Number of documents to retrieve

  • include_scores (bool, default: False ) –

    Whether similarity scores of found documents should be returned.

  • window_size (int, default: 2 ) –

    Number of chunks from right and left side of the original chunk.

Returns:

  • list[AI4RAGChunk] | list[tuple[AI4RAGChunk, float]]

    Found chunks with or without scores.

Source code in ai4rag/rag/vector_store/chroma.py
def window_search(
    self,
    query: str,
    k: int = 5,
    include_scores: bool = False,
    window_size: int = 2,
    **kwargs: Any,
) -> list[AI4RAGChunk] | list[tuple[AI4RAGChunk, float]]:
    """
    Run a similarity search and widen every hit with its neighbouring chunks.

    The search itself is delegated to ``search``; each found chunk is then
    passed to ``_window_extend_and_merge`` together with ``window_size``.
    Scores, when requested, are kept as returned by ``search``.

    Additional search parameters passed in ``kwargs`` are forwarded to ``search``
    and should be consistent with the LangChain VectorStores' similarity search
    methods; see the LangChain documentation.

    Parameters
    ----------
    query : str
        Query for which grounding documents will be searched for.

    k : int, default=5
        Number of documents to retrieve

    include_scores : bool, default=False
        Whether similarity scores of found documents should be returned.

    window_size : int, default=2
        Number of chunks from right and left side of the original chunk.
        A value of 0 or less returns the plain search results unchanged.

    Returns
    -------
    list[AI4RAGChunk] | list[tuple[AI4RAGChunk, float]]
        Found chunks with or without scores.
    """
    hits = self.search(query, k, include_scores, **kwargs)

    # Nothing to extend: hand back the plain search results.
    if window_size <= 0:
        return hits

    if include_scores:
        scored_hits = cast(list[tuple[AI4RAGChunk, float]], hits)
        return [(self._window_extend_and_merge(pair[0], window_size), pair[1]) for pair in scored_hits]

    plain_hits = cast(list[AI4RAGChunk], hits)
    return [self._window_extend_and_merge(hit, window_size) for hit in plain_hits]
delete
delete(ids: list[str], **kwargs: Any) -> None

Delete by vector ID or other criteria. For more details see LangChain documentation https://python.langchain.com/api_reference/core/vectorstores/langchain_core.vectorstores.base.VectorStore.html#langchain_core.vectorstores.base.VectorStore

Source code in ai4rag/rag/vector_store/chroma.py
def delete(self, ids: list[str], **kwargs: Any) -> None:
    """Remove entries from the underlying vector store.

    ``ids`` and any extra keyword arguments are handed to the wrapped store's
    ``delete`` method unchanged. For the supported criteria see the LangChain documentation:
    https://python.langchain.com/api_reference/core/vectorstores/langchain_core.vectorstores.base.VectorStore.html#langchain_core.vectorstores.base.VectorStore
    """
    backing_store = self._vector_store
    backing_store.delete(ids, **kwargs)

Functions