Skip to content

src.database.elastic_client.ElasticsearchClient

Bases: DatabaseAdapter

Asynchronous Elasticsearch client adapter for database operations.

This class implements the DatabaseAdapter interface to provide asynchronous interaction with Elasticsearch. It handles document indexing, searching, index management, and other essential Elasticsearch operations.

The client requires the Elasticsearch endpoint and API key to be set in the environment variables ES_ENDPOINT and ES_API_KEY, respectively.

Attributes:

Name Type Description
client AsyncElasticsearch

Async Elasticsearch client instance configured with the provided endpoint and API key.

Raises:

Type Description
ValueError

If either ES_ENDPOINT or ES_API_KEY environment variables are not set.

Example
# Initialize the client
es_client = ElasticsearchClient()

# Add a document
document = {"title": "Example", "content": "Sample text"}
await es_client.add(document, "my_index")

# Search for documents
query = {"query": {"match": {"content": "sample"}}}
results = await es_client.search(query, "my_index")
Source code in src/database/elastic_client.py
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
class ElasticsearchClient(DatabaseAdapter):
    """Asynchronous Elasticsearch client adapter for database operations.

    This class implements the DatabaseAdapter interface to provide asynchronous
    interaction with Elasticsearch. It handles document indexing, searching,
    index management, and other essential Elasticsearch operations.

    The client requires the Elasticsearch endpoint and API key to be set in the
    environment variables ES_ENDPOINT and ES_API_KEY, respectively.

    The adapter owns the underlying client, so release it with ``close()`` or
    use the adapter as an async context manager.

    Attributes:
        client (AsyncElasticsearch): Async Elasticsearch client instance configured
            with the provided endpoint and API key.

    Raises:
        ValueError: If either ES_ENDPOINT or ES_API_KEY environment variables are not set.

    Example:
        ```python
        # Initialize the client and close it automatically when done
        async with ElasticsearchClient() as es_client:
            # Add a document
            document = {"title": "Example", "content": "Sample text"}
            await es_client.add(document, "my_index")

            # Search for documents
            query = {"query": {"match": {"content": "sample"}}}
            results = await es_client.search(query, "my_index")
        ```
    """

    def __init__(self, verify_certs: bool = True):
        """Initialize the Elasticsearch client with credentials from environment variables.

        Reads ES_ENDPOINT and ES_API_KEY from the environment and builds the
        async client with ``request_timeout=60``.

        Args:
            verify_certs (bool, optional): Forwarded to the client as its
                ``verify_certs`` option. Defaults to True.

        Raises:
            ValueError: If either environment variable is unset or empty.
        """
        es_endpoint = os.getenv("ES_ENDPOINT")
        es_api_key = os.getenv("ES_API_KEY")

        if not es_endpoint or not es_api_key:
            raise ValueError("Elasticsearch endpoint and API key must be provided.")

        self.client = AsyncElasticsearch(
            es_endpoint,
            api_key=es_api_key,
            verify_certs=verify_certs,
            request_timeout=60,
        )

    async def close(self) -> None:
        """Close the underlying Elasticsearch client.

        Example:
            ```python
            client = ElasticsearchClient()
            try:
                await client.add({"title": "Test"}, "my_index")
            finally:
                await client.close()
            ```
        """
        await self.client.close()

    async def __aenter__(self) -> "ElasticsearchClient":
        """Return the adapter itself so it can be used with ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        """Close the client on leaving the ``async with`` block; exceptions propagate."""
        await self.close()

    async def add(self, document: dict, index_name: str):
        """Add a document to the specified Elasticsearch index.

        Args:
            document (dict): Dictionary containing the document data to be indexed.
            index_name (str): Name of the index to add the document to.

        Returns:
            The Elasticsearch response for the indexing request, as returned
            by the client.

        Example:
            ```python
            response = await client.add(
                {"title": "Test", "content": "Content"},
                "my_index"
            )
            ```
        """
        return await self.client.index(index=index_name, body=document)

    async def search(self, query_body: dict, index_name: str, size: int = 5):
        """Search for documents in the specified index.

        Args:
            query_body (dict): Elasticsearch query DSL dictionary. It is not
                modified by this call.
            index_name (str): Name of the index to search in.
            size (int, optional): Maximum number of results to return. Takes
                precedence over any ``"size"`` key inside ``query_body``.
                Defaults to 5.

        Returns:
            The Elasticsearch response containing the search results, as
            returned by the client.

        Example:
            ```python
            query = {
                "query": {
                    "match": {
                        "content": "search text"
                    }
                }
            }
            results = await client.search(query, "my_index", size=10)
            ```
        """
        # Send `size` inside a copy of the body instead of next to it, so the
        # request carries exactly one `size` and the caller's dict can be reused.
        # NOTE(review): elasticsearch-py 8.x is understood to deprecate mixing
        # `body` with per-field keyword arguments - confirm against the pinned
        # client version.
        request_body = {**(query_body or {}), "size": size}
        return await self.client.search(index=index_name, body=request_body)

    async def reset(self, index_name: str) -> None:
        """Delete all documents from the specified index.

        Issues a delete-by-query with a ``match_all`` query; the index itself
        is not deleted by this call.

        Args:
            index_name (str): Name of the index to reset.

        Example:
            ```python
            await client.reset("my_index")
            ```
        """
        await self.client.delete_by_query(
            index=index_name,
            body={"query": {"match_all": {}}},
        )

    async def create_index(self, index_name: str, settings=None, mappings=None) -> None:
        """Create a new Elasticsearch index if it doesn't exist.

        Args:
            index_name (str): Name of the index to create.
            settings (dict, optional): Index settings configuration. Defaults to
                None, which is sent as an empty dict.
            mappings (dict, optional): Index mappings configuration. Defaults to
                None, which is sent as an empty dict.

        Example:
            ```python
            settings = {"number_of_shards": 1}
            mappings = {
                "properties": {
                    "title": {"type": "text"},
                    "content": {"type": "text"}
                }
            }
            await client.create_index("my_index", settings, mappings)
            ```
        """
        # NOTE(review): the existence check and the create call are two separate
        # requests, so a concurrent creator could still make `create` fail -
        # confirm whether callers need that case handled.
        if await self.index_exists(index_name):
            return

        await self.client.indices.create(
            index=index_name,
            body={
                "settings": settings or {},
                "mappings": mappings or {},
            },
        )

    async def index_exists(self, index_name: str) -> bool:
        """Check if an index exists.

        Args:
            index_name (str): Name of the index to check.

        Returns:
            bool: True if index exists, False otherwise.

        Example:
            ```python
            exists = await client.index_exists("my_index")
            if not exists:
                await client.create_index("my_index")
            ```
        """
        # Coerce to a real bool so the declared return type holds even when the
        # client answers with a truthy/falsy response object instead of a bool.
        return bool(await self.client.indices.exists(index=index_name))

__init__(verify_certs=True)

Initialize the Elasticsearch client with credentials from environment variables.

Establishes connection to Elasticsearch using endpoint and API key from environment variables. Sets up the async client with specific configurations for SSL verification and request timeout.

Raises:

Type Description
ValueError

If required environment variables are not set.

Source code in src/database/elastic_client.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def __init__(self, verify_certs: bool = True):
    """Build the async Elasticsearch client from environment credentials.

    The endpoint is read from ES_ENDPOINT and the API key from ES_API_KEY.
    The client is created with ``request_timeout=60``.

    Args:
        verify_certs (bool, optional): Forwarded to the client as its
            ``verify_certs`` option. Defaults to True.

    Raises:
        ValueError: If either environment variable is unset or empty.
    """
    endpoint = os.getenv("ES_ENDPOINT")
    api_key = os.getenv("ES_API_KEY")

    # Both values are mandatory; fail before constructing the client.
    if not (endpoint and api_key):
        raise ValueError("Elasticsearch endpoint and API key must be provided.")

    connection_options = {
        "api_key": api_key,
        "verify_certs": verify_certs,
        "request_timeout": 60,
    }
    self.client = AsyncElasticsearch(endpoint, **connection_options)

add(document, index_name) async

Add a document to the specified Elasticsearch index.

Parameters:

Name Type Description Default
document

Dictionary containing the document data to be indexed.

required
index_name str

Name of the index to add the document to.

required

Returns:

Name Type Description
dict

Elasticsearch response containing the indexing result.

Example
response = await client.add(
    {"title": "Test", "content": "Content"},
    "my_index"
)
Source code in src/database/elastic_client.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
async def add(self, document, index_name):
    """Index a single document into the given Elasticsearch index.

    Args:
        document: Dictionary holding the document data to index.
        index_name (str): Name of the target index.

    Returns:
        The response produced by the client's ``index`` call.

    Example:
        ```python
        response = await client.add(
            {"title": "Test", "content": "Content"},
            "my_index"
        )
        ```
    """
    # The client's reply is handed back to the caller untouched.
    return await self.client.index(body=document, index=index_name)

create_index(index_name, settings=None, mappings=None) async

Create a new Elasticsearch index if it doesn't exist.

Parameters:

Name Type Description Default
index_name str

Name of the index to create.

required
settings dict

Index settings configuration. When omitted (None), an empty dict is sent.

None
mappings dict

Index mappings configuration. When omitted (None), an empty dict is sent.

None
Example
settings = {"number_of_shards": 1}
mappings = {
    "properties": {
        "title": {"type": "text"},
        "content": {"type": "text"}
    }
}
await client.create_index("my_index", settings, mappings)
Source code in src/database/elastic_client.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
async def create_index(self, index_name, settings=None, mappings=None):
    """Create a new Elasticsearch index if it doesn't exist.

    Args:
        index_name (str): Name of the index to create.
        settings (dict, optional): Index settings configuration. Defaults to empty dict.
        mappings (dict, optional): Index mappings configuration. Defaults to empty dict.

    Example:
        ```python
        settings = {"number_of_shards": 1}
        mappings = {
            "properties": {
                "title": {"type": "text"},
                "content": {"type": "text"}
            }
        }
        await client.create_index("my_index", settings, mappings)
        ```
    """
    if not await self.client.indices.exists(index=index_name):
        await self.client.indices.create(index=index_name, body={
            "settings": settings if settings else {},
            "mappings": mappings if mappings else {}
        })

index_exists(index_name) async

Check if an index exists.

Parameters:

Name Type Description Default
index_name str

Name of the index to check.

required

Returns:

Name Type Description
bool bool

True if index exists, False otherwise.

Example
exists = await client.index_exists("my_index")
if not exists:
    await client.create_index("my_index")
Source code in src/database/elastic_client.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
async def index_exists(self, index_name: str) -> bool:
    """Check if an index exists.

    Args:
        index_name (str): Name of the index to check.

    Returns:
        bool: True if index exists, False otherwise.

    Example:
        ```python
        exists = await client.index_exists("my_index")
        if not exists:
            await client.create_index("my_index")
        ```
    """
    # Coerce to a real bool so the declared return type holds even when the
    # client answers with a truthy/falsy response object instead of a bool.
    return bool(await self.client.indices.exists(index=index_name))

reset(index_name) async

Delete all documents from the specified index.

Parameters:

Name Type Description Default
index_name str

Name of the index to reset.

required
Example
await client.reset("my_index")
Source code in src/database/elastic_client.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
async def reset(self, index_name):
    """Remove every document from the given index.

    Runs a delete-by-query with a ``match_all`` query against the index.

    Args:
        index_name (str): Name of the index to empty.

    Example:
        ```python
        await client.reset("my_index")
        ```
    """
    # A match_all query selects every document in the index.
    everything = {"query": {"match_all": {}}}
    await self.client.delete_by_query(body=everything, index=index_name)

search(query_body, index_name, size=5) async

Search for documents in the specified index.

Parameters:

Name Type Description Default
query_body dict

Elasticsearch query DSL dictionary.

required
index_name str

Name of the index to search in.

required
size int

Maximum number of results to return. Defaults to 5.

5

Returns:

Name Type Description
dict

Elasticsearch response containing search results.

Example
query = {
    "query": {
        "match": {
            "content": "search text"
        }
    }
}
results = await client.search(query, "my_index", size=10)
Source code in src/database/elastic_client.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
async def search(self, query_body: dict, index_name: str, size: int = 5):
    """Search for documents in the specified index.

    Args:
        query_body (dict): Elasticsearch query DSL dictionary. It is not
            modified by this call.
        index_name (str): Name of the index to search in.
        size (int, optional): Maximum number of results to return. Takes
            precedence over any ``"size"`` key inside ``query_body``.
            Defaults to 5.

    Returns:
        The Elasticsearch response containing the search results, as
        returned by the client.

    Example:
        ```python
        query = {
            "query": {
                "match": {
                    "content": "search text"
                }
            }
        }
        results = await client.search(query, "my_index", size=10)
        ```
    """
    # Send `size` inside a copy of the body instead of next to it, so the
    # request carries exactly one `size` and the caller's dict can be reused.
    # NOTE(review): elasticsearch-py 8.x is understood to deprecate mixing
    # `body` with per-field keyword arguments - confirm against the pinned
    # client version.
    request_body = {**(query_body or {}), "size": size}
    return await self.client.search(index=index_name, body=request_body)