Coverage for mcpgateway / utils / db_isready.py: 100%

93 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3"""Location: ./mcpgateway/utils/db_isready.py 

4Copyright 2025 

5SPDX-License-Identifier: Apache-2.0 

6Authors: Mihai Criveti 

7 

8db_isready - Wait until the configured database is ready 

9========================================================== 

10This helper blocks until the given database (defined by an **SQLAlchemy** URL) 

11successfully answers a trivial round-trip - ``SELECT 1`` - and then returns. 

12It is useful as a container **readiness/health probe** or imported from Python 

13code to delay start-up of services that depend on the DB. 

14 

15Exit codes when executed as a script 

16----------------------------------- 

17* ``0`` - database ready. 

18* ``1`` - all attempts exhausted / timed-out. 

19* ``2`` - :pypi:`SQLAlchemy` is **not** installed. 

20* ``3`` - invalid parameter combination (``max_tries``/``interval``/``timeout``). 

21 

22Features 

23-------- 

24* Accepts **any** SQLAlchemy URL supported by the installed version. 

25* **Exponential backoff with jitter** - prevents thundering herd on reconnect: 

26 - Retry delays: 2s → 4s → 8s → 16s → 30s (capped) → 30s... 

27 - Random jitter of ±25% prevents synchronized reconnection storms 

28 - Default: 30 attempts ≈ 13 minutes of backoff sleep (2+4+8+16 s, then 25 × 30 s, each ±25% jitter) plus per-attempt connect timeouts before giving up 

29* Timing knobs (tries, interval, connect-timeout) configurable through 

30 *environment variables* **or** *CLI flags* - see below. 

31* Works **synchronously** (blocking) or **asynchronously** - simply 

32 ``await wait_for_db_ready()``. 

33* Credentials appearing in log lines are automatically **redacted**. 

34* Depends only on ``sqlalchemy`` (already required by *mcpgateway*). 

35 

36Environment variables 

37--------------------- 

38The script falls back to :pydata:`mcpgateway.config.settings`, but the values 

39below can be overridden via environment variables *or* the corresponding 

40command-line options. 

41 

42+----------------------------+----------------------------------------------+-----------+ 

43| Name | Description | Default | 

44+============================+==============================================+===========+ 

45| ``DATABASE_URL`` | SQLAlchemy connection URL | ``sqlite:///./mcp.db`` | 

46| ``DB_WAIT_MAX_TRIES`` | Maximum attempts before giving up | ``30`` | 

47| ``DB_WAIT_INTERVAL`` | Delay between attempts *(seconds)* | ``2`` | 

48| ``DB_CONNECT_TIMEOUT`` | Per-attempt connect timeout *(seconds)* | ``2`` | 

49| ``DB_MAX_BACKOFF_SECONDS`` | Max backoff cap *(seconds, jitter added)* | ``30`` | 

50| ``LOG_LEVEL`` | Log verbosity when not set via ``--log-level`` | ``INFO`` | 

51+----------------------------+----------------------------------------------+-----------+ 

52 

53Usage examples 

54-------------- 

55Shell :: 

56 

57 python3 db_isready.py 

58 python3 db_isready.py --database-url "sqlite:///./mcp.db" --max-tries 2 --interval 1 --timeout 1 

59 

60Python :: 

61 

62 from mcpgateway.utils.db_isready import wait_for_db_ready 

63 

64 # Synchronous/blocking 

65 wait_for_db_ready(sync=True) 

66 

67 # Asynchronous - from inside a coroutine, run the blocking probe in a worker thread 

68 import asyncio 

69 await asyncio.to_thread(wait_for_db_ready, sync=True) 

70 

71Doctest examples 

72---------------- 

73>>> from mcpgateway.utils.db_isready import wait_for_db_ready 

74>>> import logging 

75>>> class DummyLogger: 

76... def __init__(self): self.infos = [] 

77... def info(self, msg): self.infos.append(msg) 

78... def debug(self, msg): pass 

79... def error(self, msg): pass 

80... @property 

81... def handlers(self): return [True] 

82>>> import sys 

83>>> sys.modules['sqlalchemy'] = type('sqlalchemy', (), { 

84... 'create_engine': lambda *a, **k: type('E', (), {'connect': lambda self: type('C', (), {'execute': lambda self, q: 1, '__enter__': lambda self: self, '__exit__': lambda self, exc_type, exc_val, exc_tb: None})()})(), 

85... 'text': lambda q: q, 

86... 'engine': type('engine', (), {'Engine': object, 'URL': object, 'url': type('url', (), {'make_url': lambda u: type('U', (), {'get_backend_name': lambda self: "sqlite"})()}),}), 

87... 'exc': type('exc', (), {'OperationalError': Exception}) 

88... }) 

89>>> wait_for_db_ready(database_url='sqlite:///./mcp.db', max_tries=1, interval=1, timeout=1, logger=DummyLogger(), sync=True) 

90>>> try: 

91... wait_for_db_ready(database_url='sqlite:///./mcp.db', max_tries=0, interval=1, timeout=1, logger=DummyLogger(), sync=True) 

92... except RuntimeError as e: 

93... print('error') 

94error 

95""" 

96 

97# Future 

98from __future__ import annotations 

99 

100# Standard 

101# --------------------------------------------------------------------------- 

102# Standard library imports 

103# --------------------------------------------------------------------------- 

104import argparse 

105import asyncio 

106import logging 

107import os 

108import random 

109import re 

110import sys 

111import time 

112from typing import Any, Dict, Final, Optional 

113 

114# --------------------------------------------------------------------------- 

115# Third-party imports - abort early if SQLAlchemy is missing 

116# --------------------------------------------------------------------------- 

117try: 

118 # Third-Party 

119 from sqlalchemy import create_engine, text 

120 from sqlalchemy.engine import Engine, URL 

121 from sqlalchemy.engine.url import make_url 

122 from sqlalchemy.exc import OperationalError 

123except ImportError: # pragma: no cover - handled at runtime for the CLI 

124 sys.stderr.write("SQLAlchemy not installed - aborting (pip install sqlalchemy)\n") 

125 sys.exit(2) 

126 

127# --------------------------------------------------------------------------- 

128# Optional project settings (silently ignored if mcpgateway package is absent) 

129# --------------------------------------------------------------------------- 

try:
    # First-Party
    from mcpgateway.config import settings
except Exception:  # pragma: no cover - fallback minimal settings

    class _Settings:
        """Stand-in for the project settings, used when importing *mcpgateway* fails.

        Only the two attributes this module reads are provided:
        ``database_url`` and ``log_level``.
        """

        log_level: str = "INFO"
        database_url: str = "sqlite:///./mcp.db"

    settings = _Settings()  # type: ignore

142 

143# --------------------------------------------------------------------------- 

144# Environment variable names 

145# --------------------------------------------------------------------------- 

ENV_DB_URL: Final[str] = "DATABASE_URL"  # SQLAlchemy URL to probe
ENV_MAX_TRIES: Final[str] = "DB_WAIT_MAX_TRIES"  # attempts before giving up
ENV_INTERVAL: Final[str] = "DB_WAIT_INTERVAL"  # base retry delay (seconds)
ENV_TIMEOUT: Final[str] = "DB_CONNECT_TIMEOUT"  # per-attempt connect timeout (seconds)
ENV_MAX_BACKOFF: Final[str] = "DB_MAX_BACKOFF_SECONDS"  # backoff cap (seconds)

# ---------------------------------------------------------------------------
# Defaults - resolved once at import time from the environment (falling back to
# the project settings or a hard-coded value); CLI flags may override them.
# ---------------------------------------------------------------------------
DEFAULT_DB_URL: Final[str] = os.environ.get(ENV_DB_URL, settings.database_url)
DEFAULT_MAX_TRIES: Final[int] = int(os.environ.get(ENV_MAX_TRIES, "30"))
DEFAULT_INTERVAL: Final[float] = float(os.environ.get(ENV_INTERVAL, "2"))
DEFAULT_TIMEOUT: Final[int] = int(os.environ.get(ENV_TIMEOUT, "2"))
DEFAULT_MAX_BACKOFF: Final[float] = float(os.environ.get(ENV_MAX_BACKOFF, "30"))
DEFAULT_LOG_LEVEL: Final[str] = os.environ.get("LOG_LEVEL", settings.log_level).upper()

161 

162# --------------------------------------------------------------------------- 

163# Helpers - sanitising / formatting util functions 

164# --------------------------------------------------------------------------- 

165_CRED_RE: Final[re.Pattern[str]] = re.compile(r"://([^:/?#]+):([^@]+)@") 

166_PWD_RE: Final[re.Pattern[str]] = re.compile(r"(?i)(password|pwd)=([^\s]+)") 

167 

168 

169def _sanitize(txt: str) -> str: 

170 """Hide credentials contained in connection strings or driver errors. 

171 

172 Args: 

173 txt: Arbitrary text that may contain a DB DSN or ``password=...`` 

174 parameter. 

175 

176 Returns: 

177 Same *txt* but with credentials replaced by ``***``. 

178 """ 

179 

180 redacted = _CRED_RE.sub(r"://\\1:***@", txt) 

181 return _PWD_RE.sub(r"\\1=***", redacted) 

182 

183 

184def _format_target(url: URL) -> str: 

185 """Return a concise *host[:port]/db* representation for logging. 

186 

187 Args: 

188 url: A parsed :class:`sqlalchemy.engine.url.URL` instance. 

189 

190 Returns: 

191 Human-readable connection target string suitable for log messages. 

192 """ 

193 

194 if url.get_backend_name() == "sqlite": 

195 return url.database or "<memory>" 

196 

197 host: str = url.host or "localhost" 

198 port: str = f":{url.port}" if url.port else "" 

199 db: str = f"/{url.database}" if url.database else "" 

200 return f"{host}{port}{db}" 

201 

202 

203# --------------------------------------------------------------------------- 

204# Public API - *wait_for_db_ready* 

205# --------------------------------------------------------------------------- 

206 

207 

def wait_for_db_ready(
    *,
    database_url: str = DEFAULT_DB_URL,
    max_tries: int = DEFAULT_MAX_TRIES,
    interval: float = DEFAULT_INTERVAL,
    timeout: int = DEFAULT_TIMEOUT,
    max_backoff: float = DEFAULT_MAX_BACKOFF,
    logger: Optional[logging.Logger] = None,
    sync: bool = False,
) -> Optional[asyncio.Future[None]]:
    """
    Block until the database replies to ``SELECT 1``.

    With ``sync=True`` the probe always runs in the **current** thread. With
    ``sync=False`` (the default) the behaviour depends on the caller:

    * no event loop is running in this thread - the probe runs inline and the
      call blocks until the database answers (same as ``sync=True``);
    * called from inside a running event loop - the probe is off-loaded to that
      loop's default executor and an :class:`asyncio.Future` is returned, so
      ``await wait_for_db_ready()`` waits without blocking the loop.

    Uses **exponential backoff with jitter** to prevent thundering herd when
    multiple workers attempt to reconnect simultaneously. The delay between
    attempts doubles each time (capped at max_backoff), with ±25% random jitter.

    Example retry progression with interval=2s, max_backoff=30s:
        Attempt 1: 2s, Attempt 2: 4s, Attempt 3: 8s, Attempt 4: 16s,
        Attempt 5+: 30s (capped), each ±25% jitter

    Args:
        database_url: SQLAlchemy URL to probe. Falls back to ``$DATABASE_URL``
            or the project default (usually an on-disk SQLite file).
        max_tries: Total number of connection attempts before giving up.
        interval: Base delay *in seconds* between attempts. Actual delay uses
            exponential backoff: ``min(interval * 2^(attempt-1), max_backoff)``, then ±25% jitter.
        timeout: Per-attempt connection timeout in seconds (passed to the DB
            driver when supported).
        max_backoff: Maximum backoff delay in seconds (default 30). Jitter is applied
            after this cap, so actual sleep can be ±25% of this value.
        logger: Optional custom :class:`logging.Logger`. If omitted, a default
            one named ``"db_isready"`` is lazily configured.
        sync: When *True*, run in the **current** thread even if an event loop
            is running there. Setting this flag from inside a running
            event-loop will block that loop!

    Returns:
        ``None`` once the database answered, when the probe ran inline; or an
        :class:`asyncio.Future` (resolving to ``None``) when the probe was
        scheduled on a running loop's executor - await it to wait for readiness.

    Raises:
        RuntimeError: If *invalid* parameters are supplied (always raised
            immediately) or the database is still unavailable after the
            configured number of attempts (raised when the returned future is
            awaited, if one is returned).

    Doctest:
        >>> from mcpgateway.utils.db_isready import wait_for_db_ready
        >>> import logging
        >>> class DummyLogger:
        ...     def __init__(self): self.infos = []
        ...     def info(self, msg): self.infos.append(msg)
        ...     def debug(self, msg): pass
        ...     def error(self, msg): pass
        ...     @property
        ...     def handlers(self): return [True]
        >>> import sys
        >>> sys.modules['sqlalchemy'] = type('sqlalchemy', (), {
        ...     'create_engine': lambda *a, **k: type('E', (), {'connect': lambda self: type('C', (), {'execute': lambda self, q: 1, '__enter__': lambda self: self, '__exit__': lambda self, exc_type, exc_val, exc_tb: None})()})(),
        ...     'text': lambda q: q,
        ...     'engine': type('engine', (), {'Engine': object, 'URL': object, 'url': type('url', (), {'make_url': lambda u: type('U', (), {'get_backend_name': lambda self: "sqlite"})()}),}),
        ...     'exc': type('exc', (), {'OperationalError': Exception})
        ... })
        >>> wait_for_db_ready(database_url='sqlite:///./mcp.db', max_tries=1, interval=1, timeout=1, logger=DummyLogger(), sync=True)
        >>> try:
        ...     wait_for_db_ready(database_url='sqlite:///./mcp.db', max_tries=0, interval=1, timeout=1, logger=DummyLogger(), sync=True)
        ... except RuntimeError as e:
        ...     print('error')
        error
    """

    log = logger or logging.getLogger("db_isready")
    if not log.handlers:  # basicConfig **once** - respects *log.setLevel* later
        logging.basicConfig(
            level=getattr(logging, DEFAULT_LOG_LEVEL, logging.INFO),
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%dT%H:%M:%S",
        )

    if max_tries < 1 or interval <= 0 or timeout <= 0:
        raise RuntimeError("Invalid max_tries / interval / timeout values")

    url_obj: URL = make_url(database_url)
    backend: str = url_obj.get_backend_name()
    target: str = _format_target(url_obj)

    log.info(f"Probing {backend} at {target} (timeout={timeout}s, interval={interval}s, max_tries={max_tries}, max_backoff={max_backoff}s)")

    connect_args: Dict[str, Any] = {}
    if backend.startswith(("postgresql", "mysql")):
        # Most drivers honour this parameter - harmless for others.
        connect_args["connect_timeout"] = timeout

    engine_kwargs: Dict[str, Any] = {"connect_args": connect_args}
    if backend != "sqlite":
        # SQLite doesn't support pool overflow/timeout parameters; other
        # databases get a single pre-pinged connection and no overflow.
        engine_kwargs.update(pool_pre_ping=True, pool_size=1, max_overflow=0)
    engine: Engine = create_engine(database_url, **engine_kwargs)

    def _probe() -> None:  # noqa: D401 - internal helper
        """Inner synchronous probe running in either the current or a worker thread.

        Returns:
            None - the function exits successfully once the DB answers.

        Raises:
            RuntimeError: Forwarded after exhausting ``max_tries`` attempts.
        """

        start = time.perf_counter()
        for attempt in range(1, max_tries + 1):
            try:
                with engine.connect() as conn:
                    conn.execute(text("SELECT 1"))
                elapsed = time.perf_counter() - start
                log.info(f"Database ready after {elapsed:.2f}s (attempt {attempt})")
                return
            except OperationalError as exc:
                if attempt < max_tries:  # Don't sleep on the last attempt
                    # Exponential backoff: interval * 2^(attempt-1), capped at max_backoff
                    backoff = min(interval * (2 ** (attempt - 1)), max_backoff)
                    # Add jitter (±25%) to prevent thundering herd
                    jitter = backoff * random.uniform(-0.25, 0.25)  # noqa: DUO102 # nosec B311 - timing jitter, not security
                    sleep_time = max(0.1, backoff + jitter)  # Ensure minimum 0.1s
                    log.debug(f"Attempt {attempt}/{max_tries} failed ({_sanitize(str(exc))}) - retrying in {sleep_time:.1f}s")
                    time.sleep(sleep_time)
                else:
                    log.debug(f"Attempt {attempt}/{max_tries} failed ({_sanitize(str(exc))})")
        # Not chained with ``from exc``: only the sanitised driver error is surfaced (logged above).
        raise RuntimeError(f"Database not ready after {max_tries} attempts")

    def _probe_and_dispose() -> None:
        """Run :func:`_probe`, then release the engine whatever the outcome."""
        try:
            _probe()
        finally:
            # Readiness probe should not keep pooled connections open.
            if hasattr(engine, "dispose"):
                engine.dispose()

    if sync:
        _probe_and_dispose()
        return None

    try:
        running_loop: Optional[asyncio.AbstractEventLoop] = asyncio.get_running_loop()
    except RuntimeError:
        running_loop = None

    if running_loop is None:
        # No loop runs in this thread, so the caller blocks either way. Probe
        # inline rather than via asyncio.get_event_loop() + run_until_complete(),
        # which is deprecated when no loop is set (Python 3.12+).
        _probe_and_dispose()
        return None

    # Inside a running loop, run_until_complete() would raise "This event loop
    # is already running"; hand back an awaitable that probes in the executor.
    return running_loop.run_in_executor(None, _probe_and_dispose)

361 

362 

363# --------------------------------------------------------------------------- 

364# CLI helpers 

365# --------------------------------------------------------------------------- 

366 

367 

def _parse_cli(argv: Optional[list[str]] = None) -> argparse.Namespace:
    """Parse command-line arguments for the *db_isready* CLI wrapper.

    Args:
        argv: Arguments to parse, without the program name. ``None`` (the
            default, used by :func:`main`) makes :mod:`argparse` read
            ``sys.argv[1:]``; pass an explicit list to parse programmatically.

    Returns:
        Parsed :class:`argparse.Namespace` holding all CLI options.

    Examples:
        >>> # Test default values
        >>> args = _parse_cli([])
        >>> args.database_url == DEFAULT_DB_URL
        True
        >>> args.max_tries == DEFAULT_MAX_TRIES
        True
        >>> args.interval == DEFAULT_INTERVAL
        True
        >>> args.timeout == DEFAULT_TIMEOUT
        True
        >>> args.max_backoff == DEFAULT_MAX_BACKOFF
        True
        >>> args.log_level == DEFAULT_LOG_LEVEL
        True

        >>> # Test custom values
        >>> args = _parse_cli(['--database-url', 'postgresql://localhost/test',
        ...                    '--max-tries', '5', '--interval', '1.5', '--timeout', '10',
        ...                    '--log-level', 'DEBUG'])
        >>> args.database_url
        'postgresql://localhost/test'
        >>> args.max_tries
        5
        >>> args.interval
        1.5
        >>> args.timeout
        10
        >>> args.log_level
        'DEBUG'

        >>> # Without an explicit list, sys.argv is parsed
        >>> import sys
        >>> original_argv = sys.argv
        >>> sys.argv = ['db_isready.py', '--max-tries', '7']
        >>> _parse_cli().max_tries
        7
        >>> sys.argv = original_argv
    """

    parser = argparse.ArgumentParser(
        description="Wait until the configured database is ready.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "--database-url",
        default=DEFAULT_DB_URL,
        help="SQLAlchemy URL (env DATABASE_URL)",
    )
    parser.add_argument("--max-tries", type=int, default=DEFAULT_MAX_TRIES, help="Maximum connection attempts")
    # The probe backs off exponentially, so this is only the *first* retry delay.
    parser.add_argument(
        "--interval",
        type=float,
        default=DEFAULT_INTERVAL,
        help="Base delay between attempts in seconds (doubled after each failed attempt, capped by --max-backoff)",
    )
    parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT, help="Per-attempt connect timeout in seconds")
    parser.add_argument("--max-backoff", type=float, default=DEFAULT_MAX_BACKOFF, help="Maximum backoff delay in seconds (jitter applied after)")
    parser.add_argument("--log-level", default=DEFAULT_LOG_LEVEL, help="Logging level (DEBUG, INFO, ...)")
    return parser.parse_args(argv)

428 

429 

def main() -> None:  # pragma: no cover
    """CLI entry-point.

    * Parses command-line options.
    * Applies ``--log-level`` to the *db_isready* logger **before** the first
      message is emitted.
    * Rejects invalid ``--max-tries`` / ``--interval`` / ``--timeout`` values
      up front, using the same rule as :func:`wait_for_db_ready`.
    * Delegates the actual probing to :func:`wait_for_db_ready`.
    * Exits with:

      * ``0`` - database became ready.
      * ``1`` - connection attempts exhausted.
      * ``2`` - SQLAlchemy missing (handled on import).
      * ``3`` - invalid parameter combination.
    """
    cli_args = _parse_cli()

    log = logging.getLogger("db_isready")
    log.setLevel(cli_args.log_level.upper())

    # wait_for_db_ready() reports bad knobs with the same RuntimeError it uses for
    # "database unavailable", so validate here to honour the documented exit code 3.
    if cli_args.max_tries < 1 or cli_args.interval <= 0 or cli_args.timeout <= 0:
        log.error(
            f"Invalid parameters: max_tries={cli_args.max_tries}, interval={cli_args.interval}, "
            f"timeout={cli_args.timeout} (need max_tries >= 1, interval > 0, timeout > 0)"
        )
        sys.exit(3)

    try:
        wait_for_db_ready(
            database_url=cli_args.database_url,
            max_tries=cli_args.max_tries,
            interval=cli_args.interval,
            timeout=cli_args.timeout,
            max_backoff=cli_args.max_backoff,
            sync=True,
            logger=log,
        )
    except RuntimeError as exc:
        log.error(f"Database unavailable: {exc}")
        sys.exit(1)

    sys.exit(0)


if __name__ == "__main__":  # pragma: no cover
    main()