Coverage for mcpgateway / utils / redis_isready.py: 100%

52 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3"""Location: ./mcpgateway/utils/redis_isready.py 

4Copyright 2025 

5SPDX-License-Identifier: Apache-2.0 

6Authors: Reeve Barreto, Mihai Criveti 

7 

8redis_isready - Wait until Redis is ready and accepting connections 

9This helper blocks until the given **Redis** server (defined by a connection URL) 

10successfully responds to a `PING` command. It is intended to delay application startup until Redis is online. 

11 

12It can be used both **synchronously** or **asynchronously**, and will retry 

13connections with a configurable interval and number of attempts. 

14 

15Exit codes when executed as a script 

16----------------------------------- 

17* ``0`` - Redis ready. 

18* ``1`` - all attempts exhausted / timed-out. 

19* ``2`` - :pypi:`redis` is **not** installed. 

20* ``3`` - invalid parameter combination (``max_retries``/``retry_interval_ms``). 

21 

22Features 

23-------- 

24* Supports any valid Redis URL supported by :pypi:`redis`. 

25* **Exponential backoff with jitter** - prevents thundering herd on reconnect: 

26 - Retry delays: 2s → 4s → 8s → 16s → 30s (capped) → 30s... 

27 - Random jitter of ±25% prevents synchronized reconnection storms 

28 - Default: 30 retries ≈ 5 minutes total wait before giving up 

29* Retry settings are configurable via *environment variables*. 

30* Works both **synchronously** (blocking) and **asynchronously**. 

31 

32Environment variables 

33--------------------- 

34These environment variables can be used to configure retry behavior and Redis connection. 

35 

36+-------------------------------+-----------------------------------------------+-----------------------------+ 

37| Name | Description | Default | 

38+===============================+===============================================+=============================+ 

39| ``REDIS_URL`` | Redis connection URL | ``redis://localhost:6379/0``| 

40| ``REDIS_MAX_RETRIES`` | Maximum retry attempts before failing | ``30`` | 

41| ``REDIS_RETRY_INTERVAL_MS`` | Base delay between retries *(milliseconds)* | ``2000`` | 

42| ``REDIS_MAX_BACKOFF_SECONDS`` | Max backoff cap *(seconds, jitter added)* | ``30`` | 

43| ``LOG_LEVEL`` | Log verbosity when not set via ``--log-level``| ``INFO`` | 

44+-------------------------------+-----------------------------------------------+-----------------------------+ 

45 

46Usage examples 

47-------------- 

48Shell :: 

49 

50 python3 redis_isready.py 

51 python3 redis_isready.py --redis-url "redis://localhost:6379/0" 

52 --max-retries 5 --retry-interval-ms 500 

53 

54Python :: 

55 

56 from mcpgateway.utils.redis_isready import wait_for_redis_ready 

57 

58 # Synchronous/blocking 

59 wait_for_redis_ready(sync=True) 

60 

61 # Asynchronous 

62 import asyncio 

63 asyncio.run(wait_for_redis_ready()) 

64 

65Doctest examples 

66---------------- 

67>>> from mcpgateway.utils.redis_isready import wait_for_redis_ready 

68>>> import logging 

69>>> class DummyLogger: 

70... def __init__(self): self.infos = [] 

71... def info(self, msg): self.infos.append(msg) 

72... def debug(self, msg): pass 

73... def error(self, msg): pass 

74... @property 

75... def handlers(self): return [True] 

76>>> def dummy_probe(*args, **kwargs): return None 

77>>> import sys 

78>>> sys.modules['redis'] = type('redis', (), {'Redis': type('Redis', (), {'from_url': lambda url: type('R', (), {'ping': lambda self: True})()})}) 

79>>> wait_for_redis_ready(redis_url='redis://localhost:6379/0', max_retries=1, retry_interval_ms=1, logger=DummyLogger(), sync=True) 

80 

81>>> try: 

82... wait_for_redis_ready(redis_url='redis://localhost:6379/0', max_retries=0, retry_interval_ms=1, logger=DummyLogger(), sync=True) 

83... except RuntimeError as e: 

84... print('error') 

85error 

86""" 

87 

88# Standard 

89import argparse 

90import asyncio 

91import logging 

92import os 

93import random 

94import sys 

95import time 

96from typing import Any, Optional 

97 

98# First-Party 

99# First Party imports 

100from mcpgateway.config import settings 

101 

102# Environment variables 

103REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0") 

104REDIS_MAX_RETRIES = int(os.getenv("REDIS_MAX_RETRIES", "30")) 

105REDIS_RETRY_INTERVAL_MS = int(os.getenv("REDIS_RETRY_INTERVAL_MS", "2000")) 

106REDIS_MAX_BACKOFF_SECONDS = float(os.getenv("REDIS_MAX_BACKOFF_SECONDS", "30")) 

107 

108LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() 

109 

110 

111def wait_for_redis_ready( 

112 *, 

113 redis_url: str = REDIS_URL, 

114 max_retries: int = REDIS_MAX_RETRIES, 

115 retry_interval_ms: int = REDIS_RETRY_INTERVAL_MS, 

116 max_backoff: float = REDIS_MAX_BACKOFF_SECONDS, 

117 logger: Optional[logging.Logger] = None, 

118 sync: bool = False, 

119) -> None: 

120 """ 

121 Wait until a Redis server is ready to accept connections. 

122 

123 This function attempts to connect to Redis and issue a `PING` command, 

124 retrying if the connection fails. It can run synchronously (blocking) 

125 or asynchronously using an executor. Intended for use during service 

126 startup to ensure Redis is reachable before proceeding. 

127 

128 Uses **exponential backoff with jitter** to prevent thundering herd when 

129 multiple workers attempt to reconnect simultaneously. The delay between 

130 attempts doubles each time (capped at max_backoff), with ±25% random jitter. 

131 

132 Example retry progression with retry_interval_ms=2000, max_backoff=30s: 

133 Attempt 1: 2s, Attempt 2: 4s, Attempt 3: 8s, Attempt 4: 16s, 

134 Attempt 5+: 30s (capped), each ±25% jitter 

135 

136 Args: 

137 redis_url : str 

138 Redis connection URL. Defaults to the value of the `REDIS_URL` environment variable. 

139 max_retries : int 

140 Maximum number of connection attempts before failing. 

141 retry_interval_ms : int 

142 Base delay between retry attempts, in milliseconds. Actual delay uses 

143 exponential backoff: ``min(interval * 2^(attempt-1), max_backoff)``, then ±25% jitter. 

144 max_backoff : float 

145 Maximum backoff delay in seconds (default 30). Jitter is applied after this cap, 

146 so actual sleep can be ±25% of this value. 

147 logger : logging.Logger, optional 

148 Logger instance to use. If not provided, a default logger is configured. 

149 sync : bool 

150 If True, runs the probe synchronously. If False (default), runs it asynchronously. 

151 

152 Raises: 

153 RuntimeError: If Redis does not respond successfully after all retry attempts. 

154 

155 Examples: 

156 >>> from mcpgateway.utils.redis_isready import wait_for_redis_ready 

157 >>> import logging 

158 >>> class DummyLogger: 

159 ... def __init__(self): self.infos = [] 

160 ... def info(self, msg): self.infos.append(msg) 

161 ... def debug(self, msg): pass 

162 ... def error(self, msg): pass 

163 ... @property 

164 ... def handlers(self): return [True] 

165 >>> import sys 

166 >>> sys.modules['redis'] = type('redis', (), {'Redis': type('Redis', (), {'from_url': lambda url: type('R', (), {'ping': lambda self: True})()})}) 

167 >>> wait_for_redis_ready(redis_url='redis://localhost:6379/0', max_retries=1, retry_interval_ms=1, logger=DummyLogger(), sync=True) 

168 >>> try: 

169 ... wait_for_redis_ready(redis_url='redis://localhost:6379/0', max_retries=0, retry_interval_ms=1, logger=DummyLogger(), sync=True) 

170 ... except RuntimeError as e: 

171 ... print('error') 

172 error 

173 """ 

174 log = logger or logging.getLogger("redis_isready") 

175 if not log.handlers: # basicConfig **once** - respects *log.setLevel* later 

176 logging.basicConfig( 

177 level=getattr(logging, LOG_LEVEL, logging.INFO), 

178 format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", 

179 datefmt="%Y-%m-%dT%H:%M:%S", 

180 ) 

181 

182 if max_retries < 1 or retry_interval_ms <= 0: 

183 raise RuntimeError("Invalid max_retries or retry_interval_ms values") 

184 

185 log.info(f"Probing Redis at {redis_url} (interval={retry_interval_ms}ms, max_retries={max_retries}, max_backoff={max_backoff}s)") 

186 

187 def _probe(*_: Any) -> None: 

188 """ 

189 Inner synchronous probe running in either the current or a thread. 

190 

191 Args: 

192 *_: Ignored arguments (for compatibility with run_in_executor). 

193 

194 Returns: 

195 None - the function exits successfully once Redis answers. 

196 

197 Raises: 

198 RuntimeError: Forwarded after exhausting ``max_retries`` attempts. 

199 """ 

200 try: 

201 # Import redis here to avoid dependency issues if not used 

202 # Third-Party 

203 from redis import Redis 

204 except ImportError: # pragma: no cover - handled at runtime for the CLI 

205 sys.stderr.write("redis library not installed - aborting (pip install redis)\n") 

206 sys.exit(2) 

207 

208 redis_client = Redis.from_url(redis_url) 

209 interval_s = retry_interval_ms / 1000.0 # Convert to seconds 

210 for attempt in range(1, max_retries + 1): 

211 try: 

212 redis_client.ping() 

213 log.info(f"Redis ready (attempt {attempt})") 

214 return 

215 except Exception as exc: 

216 if attempt < max_retries: # Don't sleep on the last attempt 

217 # Exponential backoff: interval * 2^(attempt-1), capped at max_backoff 

218 backoff = min(interval_s * (2 ** (attempt - 1)), max_backoff) 

219 # Add jitter (±25%) to prevent thundering herd 

220 jitter = backoff * random.uniform(-0.25, 0.25) # noqa: DUO102 # nosec B311 - timing jitter, not security 

221 sleep_time = max(0.1, backoff + jitter) # Ensure minimum 0.1s 

222 log.debug(f"Attempt {attempt}/{max_retries} failed ({exc}) - retrying in {sleep_time:.1f}s") 

223 time.sleep(sleep_time) 

224 else: 

225 log.debug(f"Attempt {attempt}/{max_retries} failed ({exc})") 

226 raise RuntimeError(f"Redis not ready after {max_retries} attempts") 

227 

228 if sync: 

229 _probe() 

230 else: 

231 loop = asyncio.get_event_loop() 

232 loop.run_until_complete(loop.run_in_executor(None, _probe)) 

233 

234 

235# --------------------------------------------------------------------------- 

236# CLI helpers 

237# --------------------------------------------------------------------------- 

238 

239 

240def _parse_cli() -> argparse.Namespace: 

241 """Parse command-line arguments for the *redis_isready* CLI wrapper. 

242 

243 Returns: 

244 Parsed :class:`argparse.Namespace` holding all CLI options. 

245 

246 Examples: 

247 >>> import sys 

248 >>> # Save original argv 

249 >>> original_argv = sys.argv 

250 >>> 

251 >>> # Test with default values 

252 >>> sys.argv = ['redis_isready.py'] 

253 >>> args = _parse_cli() 

254 >>> args.redis_url == REDIS_URL 

255 True 

256 >>> args.max_retries == REDIS_MAX_RETRIES 

257 True 

258 >>> args.retry_interval_ms == REDIS_RETRY_INTERVAL_MS 

259 True 

260 >>> args.log_level == LOG_LEVEL 

261 True 

262 >>> 

263 >>> # Test with custom values 

264 >>> sys.argv = ['redis_isready.py', '--redis-url', 'redis://custom:6380/1', 

265 ... '--max-retries', '5', '--retry-interval-ms', '500', 

266 ... '--log-level', 'DEBUG'] 

267 >>> args = _parse_cli() 

268 >>> args.redis_url 

269 'redis://custom:6380/1' 

270 >>> args.max_retries 

271 5 

272 >>> args.retry_interval_ms 

273 500 

274 >>> args.log_level 

275 'DEBUG' 

276 >>> 

277 >>> # Restore original argv 

278 >>> sys.argv = original_argv 

279 """ 

280 

281 parser = argparse.ArgumentParser( 

282 description="Wait until Redis is ready and accepting connections.", 

283 formatter_class=argparse.ArgumentDefaultsHelpFormatter, 

284 ) 

285 parser.add_argument( 

286 "--redis-url", 

287 default=REDIS_URL, 

288 help="Redis connection URL (env REDIS_URL)", 

289 ) 

290 parser.add_argument("--max-retries", type=int, default=REDIS_MAX_RETRIES, help="Maximum connection attempts") 

291 parser.add_argument("--retry-interval-ms", type=int, default=REDIS_RETRY_INTERVAL_MS, help="Delay between attempts in milliseconds") 

292 parser.add_argument("--max-backoff", type=float, default=REDIS_MAX_BACKOFF_SECONDS, help="Maximum backoff delay in seconds (jitter applied after)") 

293 parser.add_argument("--log-level", default=LOG_LEVEL, help="Logging level (DEBUG, INFO, ...)") 

294 return parser.parse_args() 

295 

296 

297def main() -> None: # pragma: no cover 

298 """CLI entry-point. 

299 

300 * Parses command-line options. 

301 * Applies ``--log-level`` to the *redis_isready* logger **before** the first 

302 message is emitted. 

303 * Delegates the actual probing to :func:`wait_for_redis_ready`. 

304 * Exits with: 

305 

306 * ``0`` - Redis became ready. 

307 * ``1`` - connection attempts exhausted. 

308 * ``2`` - redis library missing. 

309 * ``3`` - invalid parameter combination. 

310 """ 

311 cli_args = _parse_cli() 

312 

313 log = logging.getLogger("redis_isready") 

314 log.setLevel(cli_args.log_level.upper()) 

315 

316 try: 

317 wait_for_redis_ready( 

318 redis_url=cli_args.redis_url, 

319 max_retries=cli_args.max_retries, 

320 retry_interval_ms=cli_args.retry_interval_ms, 

321 max_backoff=cli_args.max_backoff, 

322 sync=True, 

323 logger=log, 

324 ) 

325 except RuntimeError as exc: 

326 log.error(f"Redis unavailable: {exc}") 

327 sys.exit(1) 

328 

329 sys.exit(0) 

330 

331 

332if __name__ == "__main__": # pragma: no cover 

333 if settings.cache_type == "redis": 

334 # Ensure Redis is ready before proceeding 

335 main() 

336 else: 

337 # If not using Redis, just exit with success 

338 sys.exit(0)