Source code for ibm_watsonx_ai.foundation_models.extensions.rag.vector_stores.adapters.es_adapter

#  -----------------------------------------------------------------------------------------
#  (C) Copyright IBM Corp. 2024-2025.
#  https://opensource.org/licenses/BSD-3-Clause
#  -----------------------------------------------------------------------------------------

import copy
import logging
from typing import Any, cast

from ibm_watsonx_ai import APIClient
from ibm_watsonx_ai.foundation_models.embeddings import BaseEmbeddings
from ibm_watsonx_ai.foundation_models.extensions.rag.vector_stores.langchain_vector_store_adapter import (
    DEFAULT_CHUNK_SEQUENCE_NUMBER_FIELD,
    DEFAULT_DOCUMENT_NAME_FIELD,
    LangChainVectorStoreAdapter,
)
from ibm_watsonx_ai.utils.utils import is_lib_installed
from ibm_watsonx_ai.wml_client_error import (
    MissingExtension,
    VectorStoreSerializationError,
)

if not is_lib_installed(ext := "langchain-elasticsearch"):
    raise MissingExtension(ext, extra_info="rag")
import elasticsearch
from elastic_transport import ConnectionTimeout
from langchain_elasticsearch.vectorstores import ElasticsearchStore

if not is_lib_installed(ext := "langchain-core"):
    raise MissingExtension(ext, extra_info="rag")
from langchain_core.documents import Document

from .es_utils import HybridStrategyElasticsearch

logger = logging.getLogger(__name__)


[docs] class ElasticsearchVectorStore(LangChainVectorStoreAdapter[ElasticsearchStore]): """Elasticsearch vector store client for a RAG pattern. Instantiates the vector store connection in the watsonx.ai environment and handles the necessary operations. The parameters given by the keyword arguments are used to instantiate the vector store client in their particular constructor. Those parameters might be parsed differently. :param api_client: api client is required if connecting by connection_id, defaults to None :type api_client: APIClient, optional :param connection_id: connection asset ID, defaults to None :type connection_id: str, optional :param vector_store: initialized langchain_elasticsearch vector store, defaults to None :type vector_store: langchain_elasticsearch.ElasticsearchStore, optional :param embeddings: default dense embeddings to be used, defaults to None :type embeddings: BaseEmbeddings, optional :param index_name: name of the vector database index, defaults to None :type index_name: str, optional :param document_name_field: mapping field for document name, defaults to `document_id` :type document_name_field: str, optional :param chunk_sequence_number_field: mapping field for chunk sequence number, defaults to `sequence_number` :type chunk_sequence_number_field: str, optional :param query_field: mapping field for text field :type query_field: str, optional :param kwargs: keyword arguments that will be directly passed to `langchain_elasticsearch.ElasticsearchStore` constructor :type kwargs: Any, optional .. note:: For hybrid search (multi-vector search), if no ranker type is specified in strategy, a `weighted` reranker with default weights equal to 1 is used. For more details, see the `langchain-elasticsearch documentation <https://python.langchain.com/docs/integrations/vectorstores/elasticsearch/>`_ and `Elasticsearch documentation <https://www.elastic.co/guide/en/elasticsearch/reference/current/knn-search.html#_combine_approximate_knn_with_other_features>`_. .. warning:: The default retrieval strategy is the same as in ``langchain_elasticsearch.ElasticsearchStore``, i.e. when no strategy is specified the ``elasticsearch.helpers.vectorstore.DenseVectorStrategy`` will be used (see `langchain-elasticsearch` `documentation <https://python.langchain.com/api_reference/elasticsearch/vectorstores/langchain_elasticsearch.vectorstores.ElasticsearchStore.html#langchain_elasticsearch.vectorstores.ElasticsearchStore>`_). Please note, that this strategy differ from the default one in ``ibm_watsonx.ai.foundation_models.extensions.rag.vector_stores.VectorStore``, where ``elasticsearch.helpers.vectorstore.DenseVectorScriptScoreStrategy`` is used. To ensure the same functionality when migrating from ``VectorStore`` to ``ElasticsearchVectorStore``, you may want to pass ``DenseVectorScriptScoreStrategy(distance=distance_metric)`` explicitly to ``ElasticsearchVectorStore`` constructor. **Example:** To connect, provide the connection asset ID. You can use custom embeddings to add and search documents. .. code-block:: python from ibm_watsonx_ai import APIClient, Credentials from ibm_watsonx_ai.foundation_models.extensions.rag.vector_stores import ( ElasticsearchVectorStore, ) from ibm_watsonx_ai.foundation_models.embeddings import Embeddings credentials = Credentials( api_key=IAM_API_KEY, url="https://us-south.ml.cloud.ibm.com" ) api_client = APIClient(credentials) embedding = Embeddings( model_id=EmbeddingTypes.IBM_SLATE_30M_ENG, api_client=api_client ) vector_store = ElasticsearchVectorStore( api_client, connection_id="***", index_name="my_test_index", embeddings=embedding, ) vector_store.add_documents( [ { "content": "document one content", "metadata": {"url": "ibm.com"}, }, { "content": "document two content", "metadata": {"url": "ibm.com"}, }, ] ) vector_store.search("one", k=1) .. note:: Optionally, like in LangChain, it is possible to use direct credentials to connect to Elastic Cloud. The keyword arguments can be used as direct params to LangChain's ``ElasticsearchStore`` constructor. .. code-block:: python from ibm_watsonx_ai import APIClient, Credentials from ibm_watsonx_ai.foundation_models.extensions.rag.vector_stores import ( ElasticsearchVectorStore, HybridStrategyElasticsearch, RetrievalOptions, ) credentials = Credentials( api_key=IAM_API_KEY, url="https://us-south.ml.cloud.ibm.com" ) api_client = APIClient(credentials) vector_store = ElasticsearchVectorStore( api_client, index_name="my_test_index", strategy=HybridStrategyElasticsearch( retrieval_strategies={ RetrievalOptions.SPARSE: {"model_id": ".elser"} } ), cloud_id="***", api_key=IAM_API_KEY, ) vector_store.add_documents( [ { "content": "document one content", "metadata": {"url": "ibm.com"}, }, { "content": "document two content", "metadata": {"url": "ibm.com"}, }, ] ) vector_store.search("one", k=1) .. note:: To use hybrid search please specify multiple retrieval strategies in HybridStrategyElasticsearch. Example with weighted ranker. .. code-block:: python from ibm_watsonx_ai import APIClient, Credentials from ibm_watsonx_ai.foundation_models.extensions.rag.vector_stores import ( ElasticsearchVectorStore, HybridStrategyElasticsearch, RetrievalOptions, ) credentials = Credentials( api_key=IAM_API_KEY, url="https://us-south.ml.cloud.ibm.com" ) api_client = APIClient(credentials) vector_store = ElasticsearchVectorStore( api_client, connection_id=es_connection_id, index_name="my_test_index", strategy=HybridStrategyElasticsearch( retrieval_strategies={ RetrievalOptions.SPARSE: { "model_id": ".elser", "boost": 0.5, }, RetrievalOptions.BM25: {"boost": 1}, } ), ) vector_store.add_documents( [ { "content": "document one content", "metadata": {"url": "ibm.com"}, }, { "content": "document two content", "metadata": {"url": "ibm.com"}, }, ] ) vector_store.search("one", k=1) Example with rrf ranker: .. code-block:: python from ibm_watsonx_ai import APIClient, Credentials from ibm_watsonx_ai.foundation_models.extensions.rag.vector_stores import ( ElasticsearchVectorStore, HybridStrategyElasticsearch, RetrievalOptions, ) credentials = Credentials(api_key=IAM_API_KEY, url="https://us-south.ml.cloud.ibm.com") api_client = APIClient(credentials) vector_store = ElasticsearchVectorStore( api_client, connection_id=es_connection_id, index_name="my_test_index", strategy=HybridStrategyElasticsearch( retrieval_strategies={ RetrievalOptions.SPARSE: {"model_id": ".elser"}, RetrievalOptions.BM25: {}, }, use_rrf=True rrf_params={"k": 50} ), ) vector_store.add_documents( [ {"content": "document one content", "metadata": {"url": "ibm.com"}}, {"content": "document two content", "metadata": {"url": "ibm.com"}}, ] ) vector_store.search("one", k=1) """ def __init__( self, api_client: APIClient | None = None, *, connection_id: str | None = None, vector_store: ElasticsearchStore | None = None, index_name: str | None = None, embedding: BaseEmbeddings | None = None, document_name_field: str = DEFAULT_DOCUMENT_NAME_FIELD, chunk_sequence_number_field: str = DEFAULT_CHUNK_SEQUENCE_NUMBER_FIELD, query_field: str | None = None, **kwargs: Any, ) -> None: self._connection_id = connection_id self._client = api_client self._document_name_field = document_name_field self._chunk_sequence_number_field = chunk_sequence_number_field self._query_field = query_field self._is_serializable = not bool(vector_store) # used in .to_dict method self._embedding = embedding self._index_name = index_name self._index_properties = kwargs ### # import is not at top-level of file due to circular import from ibm_watsonx_ai.foundation_models.extensions.rag.vector_stores.vector_store_connector import ( # noqa: E501, PLC0415 VectorStoreConnector, VectorStoreDataSourceType, ) if vector_store is None: if self._client is not None and self._connection_id is not None: self._datasource_type, connection_properties = self._connect_by_type( cast(str, self._connection_id) ) else: self._datasource_type, connection_properties = ( VectorStoreDataSourceType.ELASTICSEARCH, {}, ) logger.info("Initializing vector store of type: %s", self._datasource_type) # overwrite text and vector field names set in langchain_elasticsearch if isinstance( (strategy := self._index_properties.get("strategy")), HybridStrategyElasticsearch, ): if "query_field" not in self._index_properties: self._index_properties["query_field"] = strategy._text_field if "vector_query_field" not in self._index_properties: self._index_properties["vector_query_field"] = ( strategy._dense_vector_field ) self._properties = { **connection_properties, **self._index_properties, "embedding": self._embedding, "index_name": index_name, } if self._query_field is not None: self._properties["query_field"] = self._query_field self._properties = VectorStoreConnector( self._properties )._get_elasticsearch_connection_params() vector_store = ElasticsearchStore(**self._properties) else: self._datasource_type = ( VectorStoreConnector.get_type_from_langchain_vector_store(vector_store) ) self._query_field = vector_store.query_field super().__init__( vector_store=vector_store, document_name_field=self._document_name_field, chunk_sequence_number_field=self._chunk_sequence_number_field, )
[docs] def get_client(self) -> ElasticsearchStore: """Get langchain_elasticsearch.ElasticsearchStore instance.""" return super().get_client()
[docs] def clear(self) -> None: """ Clear index by removing all records. """ es_vs = self.get_client()._store es = self.get_client().client try: es.delete_by_query( index=es_vs.index, body={"query": {"match_all": {}}}, refresh=True ) except elasticsearch.NotFoundError: pass
[docs] def count(self) -> int: """ Count number of records in index. """ es = self.get_client().client return es.count(index=self.get_client()._store.index)["count"]
[docs] def add_documents( self, content: list[str] | list[dict] | list[Document], **kwargs: Any ) -> list[str]: """ Embed documents and add to the vectorstore. :param content: Documents to add to the vectorstore. :type content: list[str] | list[dict] | list[langchain_core.documents.Document] :return: List of IDs of the added texts. :rtype: list[str] """ ids, docs = self._process_documents(content) texts = [doc.page_content for doc in docs] metadatas = [doc.metadata for doc in docs] if len(texts) == 0: return [] return self._fallback_add_documents(ids, docs, texts, metadatas, **kwargs)
def _fallback_add_documents( self, ids: list[str], docs: list[Document], texts: list[str], metadatas: list[Any], chunk_size: int = 500, # default set to 500 text_embeddings: list[tuple[str, list[float]]] | None = None, **kwargs: Any, ) -> list[str]: stop_fallback = False bulk_kwargs: dict[str, Any] | None = kwargs.pop("bulk_kwargs", None) if bulk_kwargs is None: bulk_kwargs = {"chunk_size": chunk_size} elif bulk_kwargs is not None and bulk_kwargs.get("chunk_size") is None: bulk_kwargs["chunk_size"] = chunk_size else: stop_fallback = True if self._embedding and not text_embeddings: vectors = self._embedding.embed_documents(texts) # type: ignore[union-attr] text_embeddings = [(text, vector) for text, vector in zip(texts, vectors)] try: if text_embeddings: return self._langchain_vector_store.add_embeddings( text_embeddings=text_embeddings, # type: ignore[arg-type] metadatas=metadatas, ids=ids, bulk_kwargs=bulk_kwargs, **kwargs, ) else: return self._langchain_vector_store.add_texts( texts=texts, metadatas=metadatas, ids=ids, bulk_kwargs=bulk_kwargs, **kwargs, ) except ConnectionTimeout as e: if chunk_size <= 50 or stop_fallback: raise e return self._fallback_add_documents( ids=ids, docs=docs, texts=texts, metadatas=metadatas, chunk_size=50, text_embeddings=text_embeddings, **kwargs, )
[docs] def to_dict(self) -> dict: """Serialize ``ElasticsearchVectorStore`` into a dict that allows reconstruction using the ``from_dict`` class method. :return: dict for the `from_dict` initialization :rtype: dict :raises VectorStoreSerializationError: when instance is not serializable """ if not self._is_serializable: raise VectorStoreSerializationError( "Serialization is not available when passing vector store instance in `ElasticsearchVectorStore` constructor." ) strategy = self._index_properties.get("strategy") if ( strategy is not None and not isinstance( strategy, HybridStrategyElasticsearch, ) ) or ( self._embedding is not None and not isinstance(self._embedding, BaseEmbeddings) ): raise VectorStoreSerializationError( ( "Serialization is allowed only for `HybridStrategyElasticsearch` strategy or when no strategy is provided and " "dense embeddings is an instance of `ibm_watsonx_ai.foundation_models.embeddings.BaseEmbeddings`." ) ) data_dict = { "connection_id": self._connection_id, "embedding": ( self._embedding.to_dict() if self._embedding is not None else None ), "index_name": self._index_name, **self._index_properties, "datasource_type": str(self._datasource_type), "document_name_field": self._document_name_field, "chunk_sequence_number_field": self._chunk_sequence_number_field, } if strategy is not None: data_dict["strategy"] = strategy.to_dict() if self._query_field is not None: data_dict["query_field"] = self._query_field return data_dict
[docs] @classmethod def from_dict( cls, api_client: APIClient | None = None, data: dict | None = None ) -> "ElasticsearchVectorStore": """Creates ``ElasticsearchVectorStore`` using only a primitive data type dict. :param api_client: initialised APIClient used in vector store constructor, defaults to None :type api_client: APIClient, optional :param data: dict in schema like the ``to_dict()`` method :type data: dict :return: reconstructed ElasticsearchVectorStore :rtype: ElasticsearchVectorStore """ d = copy.deepcopy(data) if isinstance(data, dict) else {} # Remove `datasource_type` if present d.pop("datasource_type", None) d["embedding"] = BaseEmbeddings.from_dict( data=d.get("embedding", {}), api_client=api_client ) strategy_dict = d.get("strategy") if strategy_dict is not None: d["strategy"] = HybridStrategyElasticsearch.from_dict(data=strategy_dict) return cls(api_client, **d)
def _get_window_documents( self, doc_id: str, seq_nums_window: list[int] ) -> list[Document]: """ Receives a document ID and a list of chunks' sequence_numbers, and searches the vector store according to the metadata. :param doc_id: ID of document :type doc_id: str :param seq_nums_window: list of sequence numbers :type seq_nums_window: list[int] :return: list of documents from that document with these sequence_numbers :rtype: list[Document] """ query = { "bool": { "must": [ {"term": {f"metadata.{self._document_name_field}.keyword": doc_id}} ], "filter": [ { "range": { f"metadata.{self._chunk_sequence_number_field}": { "gte": seq_nums_window[0], "lte": seq_nums_window[-1], } } } ], } } response = self._langchain_vector_store.search( query="*:*", search_type="similarity", filter=query, k=len(seq_nums_window) ) window_documents = [ Document( page_content=doc.page_content, metadata={ self._chunk_sequence_number_field: doc.metadata[ self._chunk_sequence_number_field ], self._document_name_field: doc.metadata[self._document_name_field], }, ) for doc in response ] return window_documents