Coverage for mcpgateway / utils / redis_client.py: 100%

79 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Centralized Redis client factory for consistent configuration. 

3 

4This module provides a single source of truth for Redis client creation, 

5ensuring all services use the same connection pool and settings. 

6 

7Performance: Uses hiredis C parser by default (ADR-026) for up to 83x faster 

8response parsing on large responses. Falls back to pure-Python parser if 

9hiredis is unavailable or explicitly disabled via REDIS_PARSER setting. 

10 

11SPDX-License-Identifier: Apache-2.0 

12 

13Usage: 

14 from mcpgateway.utils.redis_client import get_redis_client, close_redis_client 

15 

16 # In async context: 

17 client = await get_redis_client() 

18 if client: 

19 await client.set("key", "value") 

20 

21 # On shutdown: 

22 await close_redis_client() 

23""" 

24 

25# Standard 

26import logging 

27from typing import Any, Optional 

28 

29logger = logging.getLogger(__name__) 

30 

31# Track which parser is being used for logging 

32_parser_info: Optional[str] = None 

33 

34_client: Optional[Any] = None 

35_initialized: bool = False 

36 

37 

38def _is_hiredis_available() -> bool: 

39 """Check if hiredis library is available and functional. 

40 

41 Returns: 

42 bool: True if hiredis can be used, False otherwise. 

43 """ 

44 try: 

45 # Third-Party 

46 import hiredis # noqa: F401 

47 

48 return True 

49 except ImportError: 

50 return False 

51 

52 

53def _get_async_parser_class(parser_setting: str) -> tuple[Any, str]: 

54 """Get the appropriate async Redis parser class based on settings. 

55 

56 Args: 

57 parser_setting: One of "auto", "hiredis", or "python" 

58 

59 Returns: 

60 Tuple of (parser_class or None, parser_name) where parser_class is None 

61 for auto-detection (redis-py default behavior) 

62 

63 Raises: 

64 ImportError: If hiredis is required but not available 

65 """ 

66 if parser_setting == "python": 

67 # Force pure-Python async parser 

68 # Third-Party 

69 from redis._parsers import _AsyncRESP2Parser 

70 

71 return _AsyncRESP2Parser, "AsyncRESP2Parser (pure-Python)" 

72 

73 if parser_setting == "hiredis": 

74 # Require hiredis - fail if not available 

75 if not _is_hiredis_available(): 

76 raise ImportError("REDIS_PARSER=hiredis requires hiredis to be installed. " "Install with: pip install 'redis[hiredis]'") 

77 # Don't set parser_class explicitly - let redis-py auto-detect for async 

78 # Setting _AsyncHiredisParser explicitly can cause issues 

79 return None, "AsyncHiredisParser (C extension)" 

80 

81 # "auto" mode - let redis-py auto-detect (prefers hiredis if available) 

82 if _is_hiredis_available(): 

83 return None, "AsyncHiredisParser (C extension, auto-detected)" 

84 return None, "AsyncRESP2Parser (pure-Python, auto-detected)" 

85 

86 

87async def get_redis_client() -> Optional[Any]: 

88 """Get or create the shared async Redis client. 

89 

90 Uses hiredis C parser by default for up to 83x faster response parsing. 

91 Parser selection controlled by REDIS_PARSER setting (auto/hiredis/python). 

92 

93 Returns: 

94 Optional[Redis]: Async Redis client, or None if Redis is disabled/unavailable. 

95 

96 Examples: 

97 >>> import asyncio 

98 >>> # When Redis is disabled 

99 >>> async def test_disabled(): 

100 ... from mcpgateway.config import settings 

101 ... original = settings.cache_type 

102 ... settings.cache_type = "memory" 

103 ... from mcpgateway.utils.redis_client import get_redis_client, _reset_client 

104 ... _reset_client() 

105 ... client = await get_redis_client() 

106 ... settings.cache_type = original 

107 ... _reset_client() 

108 ... return client is None 

109 >>> asyncio.run(test_disabled()) 

110 True 

111 """ 

112 global _client, _initialized, _parser_info 

113 

114 if _initialized: 

115 return _client 

116 

117 # First-Party 

118 from mcpgateway.config import settings 

119 

120 if settings.cache_type != "redis" or not settings.redis_url: 

121 logger.info("Redis disabled (cache_type != 'redis' or no redis_url)") 

122 _initialized = True 

123 return None 

124 

125 try: 

126 # Third-Party 

127 import redis.asyncio as aioredis 

128 except ImportError: 

129 logger.warning("redis.asyncio not available, Redis disabled") 

130 _initialized = True 

131 return None 

132 

133 try: 

134 # Get parser configuration (ADR-026) 

135 parser_class, _parser_info = _get_async_parser_class(settings.redis_parser) 

136 

137 # Build connection kwargs 

138 connection_kwargs: dict[str, Any] = { 

139 "decode_responses": settings.redis_decode_responses, 

140 "max_connections": settings.redis_max_connections, 

141 "socket_timeout": settings.redis_socket_timeout, 

142 "socket_connect_timeout": settings.redis_socket_connect_timeout, 

143 "retry_on_timeout": settings.redis_retry_on_timeout, 

144 "health_check_interval": settings.redis_health_check_interval, 

145 "encoding": "utf-8", 

146 "single_connection_client": False, 

147 } 

148 

149 # Only specify parser_class if explicitly set (not auto) 

150 if parser_class is not None: 

151 connection_kwargs["parser_class"] = parser_class 

152 

153 _client = aioredis.from_url(settings.redis_url, **connection_kwargs) 

154 await _client.ping() 

155 logger.info( 

156 f"Redis client initialized: parser={_parser_info}, " 

157 f"pool_size={settings.redis_max_connections}, " 

158 f"timeout={settings.redis_socket_timeout}s, " 

159 f"health_check={settings.redis_health_check_interval}s" 

160 ) 

161 except ImportError as e: 

162 logger.error(f"Redis parser configuration error: {e}") 

163 _client = None 

164 except Exception as e: 

165 logger.warning(f"Failed to connect to Redis: {e}") 

166 _client = None 

167 

168 _initialized = True 

169 return _client 

170 

171 

172async def close_redis_client() -> None: 

173 """Close the shared Redis client and release connections.""" 

174 global _client, _initialized 

175 

176 if _client: 

177 try: 

178 await _client.aclose() 

179 logger.info("Redis client closed") 

180 except Exception as e: 

181 logger.warning(f"Error closing Redis client: {e}") 

182 

183 _client = None 

184 _initialized = False 

185 

186 

187def get_redis_client_sync() -> Optional[Any]: 

188 """Get cached Redis client synchronously (returns None if not initialized). 

189 

190 This is useful for non-async contexts that need to check if Redis is available, 

191 but should not be used to initialize the client. 

192 

193 Returns: 

194 Optional[Redis]: The cached Redis client, or None if not initialized. 

195 """ 

196 return _client 

197 

198 

199async def is_redis_available() -> bool: 

200 """Check if Redis is available and connected. 

201 

202 Returns: 

203 bool: True if Redis is available and responding to ping. 

204 

205 Examples: 

206 >>> import asyncio 

207 >>> async def test_unavailable(): 

208 ... from mcpgateway.config import settings 

209 ... original = settings.cache_type 

210 ... settings.cache_type = "memory" 

211 ... from mcpgateway.utils.redis_client import is_redis_available, _reset_client 

212 ... _reset_client() 

213 ... result = await is_redis_available() 

214 ... settings.cache_type = original 

215 ... _reset_client() 

216 ... return result 

217 >>> asyncio.run(test_unavailable()) 

218 False 

219 """ 

220 client = await get_redis_client() 

221 if not client: 

222 return False 

223 try: 

224 await client.ping() 

225 return True 

226 except Exception: 

227 return False 

228 

229 

230def get_redis_parser_info() -> Optional[str]: 

231 """Get information about which Redis parser is being used. 

232 

233 Returns: 

234 Optional[str]: Parser description string, or None if Redis not initialized. 

235 """ 

236 return _parser_info 

237 

238 

239def _reset_client() -> None: 

240 """Reset client state (for testing only).""" 

241 global _client, _initialized, _parser_info 

242 _client = None 

243 _initialized = False 

244 _parser_info = None