Coverage for mcpgateway / utils / db_isready.py: 100%
93 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""Location: ./mcpgateway/utils/db_isready.py
4Copyright 2025
5SPDX-License-Identifier: Apache-2.0
6Authors: Mihai Criveti
8db_isready - Wait until the configured database is ready
9==========================================================
10This helper blocks until the given database (defined by an **SQLAlchemy** URL)
11successfully answers a trivial round-trip - ``SELECT 1`` - and then returns.
12It is useful as a container **readiness/health probe** or imported from Python
13code to delay start-up of services that depend on the DB.
15Exit codes when executed as a script
16-----------------------------------
17* ``0`` - database ready.
18* ``1`` - all attempts exhausted / timed-out.
19* ``2`` - :pypi:`SQLAlchemy` is **not** installed.
20* ``3`` - invalid parameter combination (``max_tries``/``interval``/``timeout``).
22Features
23--------
24* Accepts **any** SQLAlchemy URL supported by the installed version.
25* **Exponential backoff with jitter** - prevents thundering herd on reconnect:
26 - Retry delays: 2s → 4s → 8s → 16s → 30s (capped) → 30s...
27 - Random jitter of ±25% prevents synchronized reconnection storms
28 - Default: 30 retries ≈ 5 minutes total wait before giving up
29* Timing knobs (tries, interval, connect-timeout) configurable through
30 *environment variables* **or** *CLI flags* - see below.
 31 * Works **synchronously** (blocking, ``sync=True``) or via an internal
 32 event-loop executor - note ``wait_for_db_ready()`` is *not* a coroutine.
33* Credentials appearing in log lines are automatically **redacted**.
34* Depends only on ``sqlalchemy`` (already required by *mcpgateway*).
36Environment variables
37---------------------
38The script falls back to :pydata:`mcpgateway.config.settings`, but the values
39below can be overridden via environment variables *or* the corresponding
40command-line options.
42+----------------------------+----------------------------------------------+-----------+
43| Name | Description | Default |
44+============================+==============================================+===========+
45| ``DATABASE_URL`` | SQLAlchemy connection URL | ``sqlite:///./mcp.db`` |
46| ``DB_WAIT_MAX_TRIES`` | Maximum attempts before giving up | ``30`` |
47| ``DB_WAIT_INTERVAL`` | Delay between attempts *(seconds)* | ``2`` |
48| ``DB_CONNECT_TIMEOUT`` | Per-attempt connect timeout *(seconds)* | ``2`` |
49| ``DB_MAX_BACKOFF_SECONDS`` | Max backoff cap *(seconds, jitter added)* | ``30`` |
50| ``LOG_LEVEL`` | Log verbosity when not set via ``--log-level`` | ``INFO`` |
51+----------------------------+----------------------------------------------+-----------+
53Usage examples
54--------------
55Shell ::
57 python3 db_isready.py
58 python3 db_isready.py --database-url "sqlite:///./mcp.db" --max-tries 2 --interval 1 --timeout 1
60Python ::
62 from mcpgateway.utils.db_isready import wait_for_db_ready
64 # Synchronous/blocking
65 wait_for_db_ready(sync=True)
 67 # "Asynchronous" flavour - off-loads the probe to an executor; the
 68 # function is not a coroutine, so the call still blocks until ready.
 69 wait_for_db_ready()
71Doctest examples
72----------------
73>>> from mcpgateway.utils.db_isready import wait_for_db_ready
74>>> import logging
75>>> class DummyLogger:
76... def __init__(self): self.infos = []
77... def info(self, msg): self.infos.append(msg)
78... def debug(self, msg): pass
79... def error(self, msg): pass
80... @property
81... def handlers(self): return [True]
82>>> import sys
83>>> sys.modules['sqlalchemy'] = type('sqlalchemy', (), {
84... 'create_engine': lambda *a, **k: type('E', (), {'connect': lambda self: type('C', (), {'execute': lambda self, q: 1, '__enter__': lambda self: self, '__exit__': lambda self, exc_type, exc_val, exc_tb: None})()})(),
85... 'text': lambda q: q,
86... 'engine': type('engine', (), {'Engine': object, 'URL': object, 'url': type('url', (), {'make_url': lambda u: type('U', (), {'get_backend_name': lambda self: "sqlite"})()}),}),
87... 'exc': type('exc', (), {'OperationalError': Exception})
88... })
89>>> wait_for_db_ready(database_url='sqlite:///./mcp.db', max_tries=1, interval=1, timeout=1, logger=DummyLogger(), sync=True)
90>>> try:
91... wait_for_db_ready(database_url='sqlite:///./mcp.db', max_tries=0, interval=1, timeout=1, logger=DummyLogger(), sync=True)
92... except RuntimeError as e:
93... print('error')
94error
95"""
97# Future
98from __future__ import annotations
100# Standard
101# ---------------------------------------------------------------------------
102# Standard library imports
103# ---------------------------------------------------------------------------
104import argparse
105import asyncio
106import logging
107import os
108import random
109import re
110import sys
111import time
112from typing import Any, Dict, Final, Optional
114# ---------------------------------------------------------------------------
115# Third-party imports - abort early if SQLAlchemy is missing
116# ---------------------------------------------------------------------------
117try:
118 # Third-Party
119 from sqlalchemy import create_engine, text
120 from sqlalchemy.engine import Engine, URL
121 from sqlalchemy.engine.url import make_url
122 from sqlalchemy.exc import OperationalError
123except ImportError: # pragma: no cover - handled at runtime for the CLI
124 sys.stderr.write("SQLAlchemy not installed - aborting (pip install sqlalchemy)\n")
125 sys.exit(2)
127# ---------------------------------------------------------------------------
128# Optional project settings (silently ignored if mcpgateway package is absent)
129# ---------------------------------------------------------------------------
130try:
131 # First-Party
132 from mcpgateway.config import settings
133except Exception: # pragma: no cover - fallback minimal settings
135 class _Settings:
136 """Fallback dummy settings when *mcpgateway* is not import-able."""
138 database_url: str = "sqlite:///./mcp.db"
139 log_level: str = "INFO"
141 settings = _Settings() # type: ignore
143# ---------------------------------------------------------------------------
144# Environment variable names
145# ---------------------------------------------------------------------------
146ENV_DB_URL: Final[str] = "DATABASE_URL"
147ENV_MAX_TRIES: Final[str] = "DB_WAIT_MAX_TRIES"
148ENV_INTERVAL: Final[str] = "DB_WAIT_INTERVAL"
149ENV_TIMEOUT: Final[str] = "DB_CONNECT_TIMEOUT"
150ENV_MAX_BACKOFF: Final[str] = "DB_MAX_BACKOFF_SECONDS"
152# ---------------------------------------------------------------------------
153# Defaults - overridable via env-vars or CLI flags
154# ---------------------------------------------------------------------------
155DEFAULT_DB_URL: Final[str] = os.getenv(ENV_DB_URL, settings.database_url)
156DEFAULT_MAX_TRIES: Final[int] = int(os.getenv(ENV_MAX_TRIES, "30"))
157DEFAULT_INTERVAL: Final[float] = float(os.getenv(ENV_INTERVAL, "2"))
158DEFAULT_TIMEOUT: Final[int] = int(os.getenv(ENV_TIMEOUT, "2"))
159DEFAULT_MAX_BACKOFF: Final[float] = float(os.getenv(ENV_MAX_BACKOFF, "30"))
160DEFAULT_LOG_LEVEL: Final[str] = os.getenv("LOG_LEVEL", settings.log_level).upper()
162# ---------------------------------------------------------------------------
163# Helpers - sanitising / formatting util functions
164# ---------------------------------------------------------------------------
165_CRED_RE: Final[re.Pattern[str]] = re.compile(r"://([^:/?#]+):([^@]+)@")
166_PWD_RE: Final[re.Pattern[str]] = re.compile(r"(?i)(password|pwd)=([^\s]+)")
169def _sanitize(txt: str) -> str:
170 """Hide credentials contained in connection strings or driver errors.
172 Args:
173 txt: Arbitrary text that may contain a DB DSN or ``password=...``
174 parameter.
176 Returns:
177 Same *txt* but with credentials replaced by ``***``.
178 """
180 redacted = _CRED_RE.sub(r"://\\1:***@", txt)
181 return _PWD_RE.sub(r"\\1=***", redacted)
184def _format_target(url: URL) -> str:
185 """Return a concise *host[:port]/db* representation for logging.
187 Args:
188 url: A parsed :class:`sqlalchemy.engine.url.URL` instance.
190 Returns:
191 Human-readable connection target string suitable for log messages.
192 """
194 if url.get_backend_name() == "sqlite":
195 return url.database or "<memory>"
197 host: str = url.host or "localhost"
198 port: str = f":{url.port}" if url.port else ""
199 db: str = f"/{url.database}" if url.database else ""
200 return f"{host}{port}{db}"
203# ---------------------------------------------------------------------------
204# Public API - *wait_for_db_ready*
205# ---------------------------------------------------------------------------
def wait_for_db_ready(
    *,
    database_url: str = DEFAULT_DB_URL,
    max_tries: int = DEFAULT_MAX_TRIES,
    interval: float = DEFAULT_INTERVAL,
    timeout: int = DEFAULT_TIMEOUT,
    max_backoff: float = DEFAULT_MAX_BACKOFF,
    logger: Optional[logging.Logger] = None,
    sync: bool = False,
) -> None:
    """Block until the database replies to ``SELECT 1``.

    The probe runs on the current thread when ``sync=True``, or is off-loaded
    to an event loop's default executor when ``sync=False``. Either way the
    call itself blocks the caller until the database answers (this function
    is not a coroutine).

    Uses **exponential backoff with jitter** to prevent thundering herd when
    multiple workers attempt to reconnect simultaneously. The delay between
    attempts doubles each time (capped at max_backoff), with ±25% random jitter.

    Example retry progression with interval=2s, max_backoff=30s:
        Attempt 1: 2s, Attempt 2: 4s, Attempt 3: 8s, Attempt 4: 16s,
        Attempt 5+: 30s (capped), each ±25% jitter

    Args:
        database_url: SQLAlchemy URL to probe. Falls back to ``$DATABASE_URL``
            or the project default (usually an on-disk SQLite file).
        max_tries: Total number of connection attempts before giving up.
        interval: Base delay *in seconds* between attempts. Actual delay uses
            exponential backoff: ``min(interval * 2^(attempt-1), max_backoff)``, then ±25% jitter.
        timeout: Per-attempt connection timeout in seconds (passed to the DB
            driver when supported).
        max_backoff: Maximum backoff delay in seconds (default 30). Jitter is applied
            after this cap, so actual sleep can be ±25% of this value.
        logger: Optional custom :class:`logging.Logger`. If omitted, a default
            one named ``"db_isready"`` is lazily configured.
        sync: When *True*, run in the **current** thread instead of scheduling
            the probe inside an executor. Setting this flag from inside a
            running event-loop will block that loop!

    Raises:
        RuntimeError: If *invalid* parameters are supplied or the database is
            still unavailable after the configured number of attempts.

    Doctest:
        >>> from mcpgateway.utils.db_isready import wait_for_db_ready
        >>> import logging
        >>> class DummyLogger:
        ...     def __init__(self): self.infos = []
        ...     def info(self, msg): self.infos.append(msg)
        ...     def debug(self, msg): pass
        ...     def error(self, msg): pass
        ...     @property
        ...     def handlers(self): return [True]
        >>> import sys
        >>> sys.modules['sqlalchemy'] = type('sqlalchemy', (), {
        ...     'create_engine': lambda *a, **k: type('E', (), {'connect': lambda self: type('C', (), {'execute': lambda self, q: 1, '__enter__': lambda self: self, '__exit__': lambda self, exc_type, exc_val, exc_tb: None})()})(),
        ...     'text': lambda q: q,
        ...     'engine': type('engine', (), {'Engine': object, 'URL': object, 'url': type('url', (), {'make_url': lambda u: type('U', (), {'get_backend_name': lambda self: "sqlite"})()}),}),
        ...     'exc': type('exc', (), {'OperationalError': Exception})
        ... })
        >>> wait_for_db_ready(database_url='sqlite:///./mcp.db', max_tries=1, interval=1, timeout=1, logger=DummyLogger(), sync=True)
        >>> try:
        ...     wait_for_db_ready(database_url='sqlite:///./mcp.db', max_tries=0, interval=1, timeout=1, logger=DummyLogger(), sync=True)
        ... except RuntimeError as e:
        ...     print('error')
        error
    """
    log = logger or logging.getLogger("db_isready")
    if not log.handlers:  # basicConfig **once** - respects *log.setLevel* later
        logging.basicConfig(
            level=getattr(logging, DEFAULT_LOG_LEVEL, logging.INFO),
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%dT%H:%M:%S",
        )

    if max_tries < 1 or interval <= 0 or timeout <= 0:
        raise RuntimeError("Invalid max_tries / interval / timeout values")

    url_obj: URL = make_url(database_url)
    backend: str = url_obj.get_backend_name()
    target: str = _format_target(url_obj)

    log.info(f"Probing {backend} at {target} (timeout={timeout}s, interval={interval}s, max_tries={max_tries}, max_backoff={max_backoff}s)")

    connect_args: Dict[str, Any] = {}
    if backend.startswith(("postgresql", "mysql")):
        # Most drivers honour this parameter - harmless for others.
        connect_args["connect_timeout"] = timeout

    if backend == "sqlite":
        # SQLite doesn't support pool overflow/timeout parameters
        engine: Engine = create_engine(
            database_url,
            connect_args=connect_args,
        )
    else:
        # Other databases support full pooling configuration
        engine: Engine = create_engine(
            database_url,
            pool_pre_ping=True,
            pool_size=1,
            max_overflow=0,
            connect_args=connect_args,
        )

    def _probe() -> None:  # noqa: D401 - internal helper
        """Inner synchronous probe running in either the current or a thread.

        Returns:
            None - the function exits successfully once the DB answers.

        Raises:
            RuntimeError: Forwarded after exhausting ``max_tries`` attempts.
        """
        start = time.perf_counter()
        for attempt in range(1, max_tries + 1):
            try:
                with engine.connect() as conn:
                    conn.execute(text("SELECT 1"))
                elapsed = time.perf_counter() - start
                log.info(f"Database ready after {elapsed:.2f}s (attempt {attempt})")
                return
            except OperationalError as exc:
                if attempt < max_tries:  # Don't sleep on the last attempt
                    # Exponential backoff: interval * 2^(attempt-1), capped at max_backoff
                    backoff = min(interval * (2 ** (attempt - 1)), max_backoff)
                    # Add jitter (±25%) to prevent thundering herd
                    jitter = backoff * random.uniform(-0.25, 0.25)  # noqa: DUO102 # nosec B311 - timing jitter, not security
                    sleep_time = max(0.1, backoff + jitter)  # Ensure minimum 0.1s
                    log.debug(f"Attempt {attempt}/{max_tries} failed ({_sanitize(str(exc))}) - retrying in {sleep_time:.1f}s")
                    time.sleep(sleep_time)
                else:
                    log.debug(f"Attempt {attempt}/{max_tries} failed ({_sanitize(str(exc))})")
        raise RuntimeError(f"Database not ready after {max_tries} attempts")

    if sync:
        try:
            _probe()
        finally:
            # Readiness probe should not keep pooled connections open.
            if hasattr(engine, "dispose"):
                engine.dispose()
    else:
        # BUGFIX: asyncio.get_event_loop() is deprecated when no loop is
        # running and raises RuntimeError in non-main threads (Python 3.10+).
        # Fall back to creating (and installing) a fresh loop in that case.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        try:
            # Off-load to default executor to avoid blocking the event-loop.
            loop.run_until_complete(loop.run_in_executor(None, _probe))
        finally:
            if hasattr(engine, "dispose"):
                engine.dispose()
363# ---------------------------------------------------------------------------
364# CLI helpers
365# ---------------------------------------------------------------------------
def _parse_cli() -> argparse.Namespace:
    """Parse command-line arguments for the *db_isready* CLI wrapper.

    All defaults mirror the module-level ``DEFAULT_*`` constants, which in
    turn honour the corresponding environment variables.

    Returns:
        Parsed :class:`argparse.Namespace` holding all CLI options.
    """
    parser = argparse.ArgumentParser(
        description="Wait until the configured database is ready.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    # Table-driven registration keeps flag/type/default/help in one place.
    option_table = (
        ("--database-url", {"default": DEFAULT_DB_URL, "help": "SQLAlchemy URL (env DATABASE_URL)"}),
        ("--max-tries", {"type": int, "default": DEFAULT_MAX_TRIES, "help": "Maximum connection attempts"}),
        ("--interval", {"type": float, "default": DEFAULT_INTERVAL, "help": "Delay between attempts in seconds"}),
        ("--timeout", {"type": int, "default": DEFAULT_TIMEOUT, "help": "Per-attempt connect timeout in seconds"}),
        ("--max-backoff", {"type": float, "default": DEFAULT_MAX_BACKOFF, "help": "Maximum backoff delay in seconds (jitter applied after)"}),
        ("--log-level", {"default": DEFAULT_LOG_LEVEL, "help": "Logging level (DEBUG, INFO, ...)"}),
    )
    for flag, kwargs in option_table:
        parser.add_argument(flag, **kwargs)
    return parser.parse_args()
def main() -> None:  # pragma: no cover
    """CLI entry-point.

    * Parses command-line options.
    * Applies ``--log-level`` to the *db_isready* logger **before** the first
      message is emitted.
    * Delegates the actual probing to :func:`wait_for_db_ready`.
    * Exits with:

      * ``0`` - database became ready.
      * ``1`` - connection attempts exhausted.
      * ``2`` - SQLAlchemy missing (handled on import).
      * ``3`` - invalid parameter combination.
    """
    args = _parse_cli()

    # Configure verbosity before any probe message is logged.
    log = logging.getLogger("db_isready")
    log.setLevel(args.log_level.upper())

    try:
        wait_for_db_ready(
            database_url=args.database_url,
            max_tries=args.max_tries,
            interval=args.interval,
            timeout=args.timeout,
            max_backoff=args.max_backoff,
            sync=True,
            logger=log,
        )
    except RuntimeError as exc:
        log.error(f"Database unavailable: {exc}")
        sys.exit(1)

    sys.exit(0)
466if __name__ == "__main__": # pragma: no cover
467 main()