Coverage for mcpgateway / utils / redis_client.py: 100%
79 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Centralized Redis client factory for consistent configuration.
4This module provides a single source of truth for Redis client creation,
5ensuring all services use the same connection pool and settings.
7Performance: Uses hiredis C parser by default (ADR-026) for up to 83x faster
8response parsing on large responses. Falls back to pure-Python parser if
9hiredis is unavailable or explicitly disabled via REDIS_PARSER setting.
11SPDX-License-Identifier: Apache-2.0
13Usage:
14 from mcpgateway.utils.redis_client import get_redis_client, close_redis_client
16 # In async context:
17 client = await get_redis_client()
18 if client:
19 await client.set("key", "value")
21 # On shutdown:
22 await close_redis_client()
23"""
25# Standard
26import logging
27from typing import Any, Optional
# Module-level cache shared by every caller of get_redis_client().
# _client holds the shared async Redis client (None when absent), and
# _initialized records that get_redis_client() has already resolved once,
# including the cases where it resolved to "no client".
_client: Optional[Any] = None
_initialized: bool = False

# Human-readable description of the parser in use, kept for logging
_parser_info: Optional[str] = None

logger = logging.getLogger(__name__)
def _is_hiredis_available() -> bool:
    """Report whether the optional ``hiredis`` package can be imported.

    Returns:
        bool: True when ``import hiredis`` succeeds, False when it raises
        ImportError.
    """
    try:
        # Third-Party
        import hiredis  # noqa: F401
    except ImportError:
        return False
    else:
        return True
def _get_async_parser_class(parser_setting: str) -> tuple[Any, str]:
    """Resolve the async Redis parser to use for a REDIS_PARSER setting.

    Args:
        parser_setting: One of "auto", "hiredis", or "python". Any value other
            than "python" or "hiredis" is handled as "auto".

    Returns:
        Tuple of (parser_class or None, parser_name). parser_class is None
        whenever the choice is left to redis-py's own auto-detection; only the
        "python" setting returns an explicit class.

    Raises:
        ImportError: If "hiredis" is requested but hiredis cannot be imported.
    """
    if parser_setting == "python":
        # Explicit opt-out of hiredis: pin the pure-Python async parser
        # Third-Party
        from redis._parsers import _AsyncRESP2Parser

        return _AsyncRESP2Parser, "AsyncRESP2Parser (pure-Python)"

    hiredis_present = _is_hiredis_available()

    if parser_setting == "hiredis":
        if hiredis_present:
            # No explicit class here: redis-py picks the hiredis parser itself
            # for async clients, and setting _AsyncHiredisParser explicitly
            # can cause issues.
            return None, "AsyncHiredisParser (C extension)"
        # hiredis was demanded but is missing - fail loudly
        raise ImportError("REDIS_PARSER=hiredis requires hiredis to be installed. " "Install with: pip install 'redis[hiredis]'")

    # "auto" mode: redis-py auto-detects (prefers hiredis if available); the
    # label only records what that detection is expected to pick.
    if hiredis_present:
        label = "AsyncHiredisParser (C extension, auto-detected)"
    else:
        label = "AsyncRESP2Parser (pure-Python, auto-detected)"
    return None, label
async def get_redis_client() -> Optional[Any]:
    """Get or create the shared async Redis client.

    Uses hiredis C parser by default for up to 83x faster response parsing.
    Parser selection controlled by REDIS_PARSER setting (auto/hiredis/python).

    The result of the first call is cached (including a ``None`` result) until
    ``close_redis_client()`` or ``_reset_client()`` clears the cache. A client
    is only published to the module-level cache after it has answered a
    ``ping()``; a client that fails the ping is closed before being discarded
    so its connection pool is released.

    Returns:
        Optional[Redis]: Async Redis client, or None if Redis is disabled/unavailable.

    Examples:
        >>> import asyncio
        >>> # When Redis is disabled
        >>> async def test_disabled():
        ...     from mcpgateway.config import settings
        ...     original = settings.cache_type
        ...     settings.cache_type = "memory"
        ...     from mcpgateway.utils.redis_client import get_redis_client, _reset_client
        ...     _reset_client()
        ...     client = await get_redis_client()
        ...     settings.cache_type = original
        ...     _reset_client()
        ...     return client is None
        >>> asyncio.run(test_disabled())
        True
    """
    global _client, _initialized, _parser_info

    if _initialized:
        return _client

    # First-Party
    from mcpgateway.config import settings

    if settings.cache_type != "redis" or not settings.redis_url:
        logger.info("Redis disabled (cache_type != 'redis' or no redis_url)")
        _initialized = True
        return None

    try:
        # Third-Party
        import redis.asyncio as aioredis
    except ImportError:
        logger.warning("redis.asyncio not available, Redis disabled")
        _initialized = True
        return None

    # Build into a local so the module-level _client never refers to a client
    # that has not yet passed the ping below.
    candidate: Optional[Any] = None
    failed = False
    try:
        # Get parser configuration (ADR-026)
        parser_class, _parser_info = _get_async_parser_class(settings.redis_parser)

        # Build connection kwargs
        connection_kwargs: dict[str, Any] = {
            "decode_responses": settings.redis_decode_responses,
            "max_connections": settings.redis_max_connections,
            "socket_timeout": settings.redis_socket_timeout,
            "socket_connect_timeout": settings.redis_socket_connect_timeout,
            "retry_on_timeout": settings.redis_retry_on_timeout,
            "health_check_interval": settings.redis_health_check_interval,
            "encoding": "utf-8",
            "single_connection_client": False,
        }

        # Only specify parser_class if explicitly set (not auto)
        if parser_class is not None:
            connection_kwargs["parser_class"] = parser_class

        candidate = aioredis.from_url(settings.redis_url, **connection_kwargs)
        await candidate.ping()
        logger.info(
            f"Redis client initialized: parser={_parser_info}, "
            f"pool_size={settings.redis_max_connections}, "
            f"timeout={settings.redis_socket_timeout}s, "
            f"health_check={settings.redis_health_check_interval}s"
        )
    except ImportError as e:
        logger.error(f"Redis parser configuration error: {e}")
        failed = True
    except Exception as e:
        logger.warning(f"Failed to connect to Redis: {e}")
        failed = True

    if failed and candidate is not None:
        # The client was constructed but is unusable: close it (best effort)
        # instead of just dropping the reference, so its pool is released.
        try:
            await candidate.aclose()
        except Exception as e:
            logger.debug(f"Error discarding unusable Redis client: {e}")
        candidate = None

    _client = None if failed else candidate
    _initialized = True
    return _client
async def close_redis_client() -> None:
    """Close the shared Redis client (if any) and clear the cached state.

    Errors raised while closing are logged as warnings and not propagated.
    Afterwards the cached client is None and the initialized flag is False,
    so the next ``get_redis_client()`` call builds a new client.
    """
    global _client, _initialized

    active = _client
    if active:
        try:
            await active.aclose()
        except Exception as exc:
            logger.warning(f"Error closing Redis client: {exc}")
        else:
            logger.info("Redis client closed")

    # Reset the cache whether or not there was a client or the close succeeded
    _client, _initialized = None, False
def get_redis_client_sync() -> Optional[Any]:
    """Return the cached Redis client without creating or awaiting anything.

    Intended for non-async code that only needs to know whether a client
    already exists; it never initializes one.

    Returns:
        Optional[Redis]: The client currently held in the module cache, or
        None when no client is cached.
    """
    cached = _client
    return cached
async def is_redis_available() -> bool:
    """Check whether the shared Redis client exists and answers a ping.

    Returns:
        bool: True if a client is obtained and its ``ping()`` completes
        without raising; False if there is no client or the ping raises.

    Examples:
        >>> import asyncio
        >>> async def test_unavailable():
        ...     from mcpgateway.config import settings
        ...     original = settings.cache_type
        ...     settings.cache_type = "memory"
        ...     from mcpgateway.utils.redis_client import is_redis_available, _reset_client
        ...     _reset_client()
        ...     result = await is_redis_available()
        ...     settings.cache_type = original
        ...     _reset_client()
        ...     return result
        >>> asyncio.run(test_unavailable())
        False
    """
    redis_conn = await get_redis_client()
    if redis_conn:
        try:
            await redis_conn.ping()
        except Exception:
            # Any failure while pinging counts as "not available"
            return False
        return True
    return False
def get_redis_parser_info() -> Optional[str]:
    """Return the description of the Redis parser recorded by this module.

    Returns:
        Optional[str]: The parser description stored during client
        initialization, or None if none has been recorded.
    """
    description = _parser_info
    return description
def _reset_client() -> None:
    """Reset client state (for testing only).

    Clears the cached client, the initialized flag and the recorded parser
    description. The client is not closed; it is only dereferenced.
    """
    global _client, _initialized, _parser_info
    _client, _initialized, _parser_info = None, False, None