Coverage for mcpgateway / utils / retry_manager.py: 100%
124 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/utils/retry_manager.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Keval Mahajan
7MCP Gateway Resilient HTTP Client with Retry Logic.
8This module provides a resilient HTTP client that automatically retries requests
9in the event of certain errors or status codes. It implements exponential backoff
10with jitter for retrying requests, making it suitable for use in environments where
11network reliability is a concern or when dealing with rate-limited APIs.
13Key Features:
14- Automatic retry logic for transient failures
15- Exponential backoff with configurable jitter
16- Support for HTTP 429 Retry-After headers
17- Configurable retry policies and delay parameters
18- Async context manager support for resource cleanup
19- Standard HTTP methods (GET, POST, PUT, DELETE)
20- Comprehensive error classification
22The client distinguishes between retryable and non-retryable errors:
24Retryable Status Codes:
25- 429 (Too Many Requests) - with Retry-After header support
26- 503 (Service Unavailable)
27- 502 (Bad Gateway)
28- 504 (Gateway Timeout)
29- 408 (Request Timeout)
31Non-Retryable Status Codes:
32- 400 (Bad Request)
33- 401 (Unauthorized)
34- 403 (Forbidden)
35- 404 (Not Found)
36- 405 (Method Not Allowed)
37- 406 (Not Acceptable)
39Retryable Network Errors:
40- Connection timeouts
41- Read timeouts
42- Network errors
44Dependencies:
45- Standard Library: asyncio, logging, random
46- Third-party: httpx
47- First-party: mcpgateway.config.settings
49Example Usage:
50 Basic usage with default settings:
52 >>> import asyncio
53 >>> from mcpgateway.utils.retry_manager import ResilientHttpClient
54 >>>
55 >>> # Test client initialization and context manager
56 >>> async def test_basic_usage():
57 ... async with ResilientHttpClient() as client:
58 ... # Verify client is properly initialized
59 ... assert client.max_retries > 0
60 ... assert client.base_backoff > 0
61 ... assert isinstance(client.client, httpx.AsyncClient)
62 ... return True
63 >>> # asyncio.run(test_basic_usage()) # Would return True
65 Custom configuration:
67 >>> # Test custom configuration
68 >>> client = ResilientHttpClient(
69 ... max_retries=5,
70 ... base_backoff=2.0,
71 ... max_delay=120.0
72 ... )
73 >>> client.max_retries
74 5
75 >>> client.base_backoff
76 2.0
77 >>> client.max_delay
78 120.0
79 >>>
80 >>> # Test client cleanup
81 >>> async def cleanup_test():
82 ... await client.aclose()
83 ... return True
84 >>> # asyncio.run(cleanup_test()) # Would properly close the client
86 Testing retry behavior:
88 >>> # Test that retryable errors are identified correctly
89 >>> client = ResilientHttpClient()
90 >>> client._should_retry(httpx.NetworkError("Network error"), None)
91 True
92 >>>
93 >>> # Test non-retryable status codes
94 >>> from unittest.mock import Mock
95 >>> response_404 = Mock()
96 >>> response_404.status_code = 404
97 >>> client._should_retry(Exception(), response_404)
98 False
99 >>>
100 >>> # Test retryable status codes
101 >>> response_503 = Mock()
102 >>> response_503.status_code = 503
103 >>> client._should_retry(Exception(), response_503)
104 True
106 Testing HTTP methods:
108 >>> # Verify all HTTP methods are available
109 >>> client = ResilientHttpClient()
110 >>> import inspect
111 >>> all([
112 ... inspect.iscoroutinefunction(client.get),
113 ... inspect.iscoroutinefunction(client.post),
114 ... inspect.iscoroutinefunction(client.put),
115 ... inspect.iscoroutinefunction(client.delete)
116 ... ])
117 True
119 Testing backoff calculation:
121 >>> # Test exponential backoff calculation
122 >>> client = ResilientHttpClient(base_backoff=1.0, jitter_max=0.5)
123 >>> # First retry: 1.0 * (2^0) = 1.0 seconds base
124 >>> # Second retry: 1.0 * (2^1) = 2.0 seconds base
125 >>> # Third retry: 1.0 * (2^2) = 4.0 seconds base
126 >>> client.base_backoff * (2**0)
127 1.0
128 >>> client.base_backoff * (2**1)
129 2.0
130 >>> client.base_backoff * (2**2)
131 4.0
133 Testing error classification:
135 >>> # Verify error code sets
136 >>> from mcpgateway.utils.retry_manager import RETRYABLE_STATUS_CODES, NON_RETRYABLE_STATUS_CODES
137 >>> 429 in RETRYABLE_STATUS_CODES
138 True
139 >>> 503 in RETRYABLE_STATUS_CODES
140 True
141 >>> 400 in NON_RETRYABLE_STATUS_CODES
142 True
143 >>> 404 in NON_RETRYABLE_STATUS_CODES
144 True
145 >>> # Ensure no overlap between sets
146 >>> len(RETRYABLE_STATUS_CODES & NON_RETRYABLE_STATUS_CODES)
147 0
148"""
150# Standard
151import asyncio
152from contextlib import asynccontextmanager
153import logging
154import random
155from typing import Any, AsyncContextManager, Dict, Optional
157# Third-Party
158import httpx
160# First-Party
161from mcpgateway.config import settings
# Module-level logger for retry diagnostics
logger = logging.getLogger(__name__)

# Status codes signalling a transient condition — a later attempt may succeed.
RETRYABLE_STATUS_CODES = {
    408,  # Request Timeout
    429,  # Too Many Requests
    502,  # Bad Gateway
    503,  # Service Unavailable
    504,  # Gateway Timeout
}

# Status codes signalling a permanent client-side error — retrying cannot help.
NON_RETRYABLE_STATUS_CODES = {
    400,  # Bad Request
    401,  # Unauthorized
    403,  # Forbidden
    404,  # Not Found
    405,  # Method Not Allowed
    406,  # Not Acceptable
}
class ResilientHttpClient:
    """A resilient HTTP client with automatic retry capabilities.

    This client automatically retries requests in the event of certain errors
    or status codes using exponential backoff with jitter. It is designed to
    handle transient network issues and rate limiting gracefully.

    The retry logic implements:
    - Exponential backoff: delay = base_backoff * (2 ** attempt)
    - Jitter: random additional delay to prevent thundering herd
    - Respect for HTTP 429 Retry-After headers (numeric seconds; an
      HTTP-date value falls back to normal exponential backoff)
    - Maximum delay caps to prevent excessive waiting

    Attributes:
        max_retries: Maximum number of retry attempts
        base_backoff: Base delay in seconds before first retry
        max_delay: Maximum delay between retries in seconds
        jitter_max: Maximum jitter fraction (0-1) to add randomness
        client_args: Additional arguments for httpx.AsyncClient
        client: The underlying httpx.AsyncClient instance

    Examples:
        >>> # Test initialization with custom values
        >>> client = ResilientHttpClient(max_retries=5, base_backoff=2.0)
        >>> client.max_retries
        5
        >>> client.base_backoff
        2.0
        >>> isinstance(client.client, httpx.AsyncClient)
        True

        >>> # Test client_args parameter (limits added by default)
        >>> client = ResilientHttpClient(client_args={"timeout": 30.0})
        >>> client.client_args["timeout"]
        30.0
        >>> "limits" in client.client_args
        True
    """

    def __init__(
        self,
        max_retries: int = settings.retry_max_attempts,
        base_backoff: float = settings.retry_base_delay,
        max_delay: float = settings.retry_max_delay,
        jitter_max: float = settings.retry_jitter_max,
        client_args: Optional[Dict[str, Any]] = None,
    ):
        """Initialize the ResilientHttpClient with configurable retry behavior.

        Args:
            max_retries: Maximum number of retry attempts before giving up
            base_backoff: Base delay in seconds before retrying a request
            max_delay: Maximum backoff delay in seconds
            jitter_max: Maximum jitter fraction (0-1) to add randomness
            client_args: Additional arguments to pass to httpx.AsyncClient

        Examples:
            >>> client = ResilientHttpClient(max_retries=10, base_backoff=5.0)
            >>> client.max_retries
            10
            >>> client.base_backoff
            5.0
            >>> "limits" in ResilientHttpClient(client_args=None).client_args
            True
        """
        self.max_retries = max_retries
        self.base_backoff = base_backoff
        self.max_delay = max_delay
        self.jitter_max = jitter_max
        self.client_args = client_args or {}

        # Add default httpx.Limits if not provided, so connection pooling is
        # always bounded even when the caller passes no client_args.
        if "limits" not in self.client_args:
            self.client_args["limits"] = httpx.Limits(
                max_connections=settings.httpx_max_connections,
                max_keepalive_connections=settings.httpx_max_keepalive_connections,
                keepalive_expiry=settings.httpx_keepalive_expiry,
            )

        self.client = httpx.AsyncClient(**self.client_args)

    async def _sleep_with_jitter(self, base: float, jitter_range: float):
        """Sleep for a period with added jitter to prevent thundering herd.

        The total delay (base plus random jitter) is capped at ``max_delay``.

        Args:
            base: Base sleep time in seconds
            jitter_range: Maximum additional random delay in seconds
        """
        # random.uniform() is safe here as jitter is only used for retry timing, not security
        delay = base + random.uniform(0, jitter_range)  # nosec B311 # noqa: DUO102
        # Ensure delay doesn't exceed the max allowed
        delay = min(delay, self.max_delay)
        await asyncio.sleep(delay)

    async def _backoff_sleep(self, attempt: int) -> float:
        """Sleep for the jittered exponential-backoff delay of ``attempt``.

        Args:
            attempt: Zero-based retry attempt number

        Returns:
            The base (pre-jitter) delay in seconds that was used
        """
        delay = self.base_backoff * (2**attempt)
        jitter = delay * self.jitter_max
        await self._sleep_with_jitter(delay, jitter)
        return delay

    def _should_retry(self, exc: Optional[Exception], response: Optional[httpx.Response]) -> bool:
        """Determine whether a request should be retried.

        Evaluates the exception and response to decide if the request should
        be retried based on error type and HTTP status code. Unknown status
        codes (neither retryable nor non-retryable) are not retried.

        Args:
            exc: Exception raised during the request, or None
            response: HTTP response object if available

        Returns:
            True if the request should be retried, False otherwise

        Examples:
            >>> client = ResilientHttpClient()
            >>> client._should_retry(httpx.ConnectTimeout("Connection timeout"), None)
            True
            >>> client._should_retry(httpx.NetworkError("Network error"), None)
            True
            >>> from unittest.mock import Mock
            >>> response_400 = Mock()
            >>> response_400.status_code = 400
            >>> client._should_retry(None, response_400)
            False
            >>> response_429 = Mock()
            >>> response_429.status_code = 429
            >>> client._should_retry(None, response_429)
            True
            >>> response_418 = Mock()
            >>> response_418.status_code = 418
            >>> client._should_retry(None, response_418)
            False
        """
        if isinstance(exc, (httpx.ConnectTimeout, httpx.ReadTimeout, httpx.NetworkError)):
            return True
        if response:
            if response.status_code in NON_RETRYABLE_STATUS_CODES:
                logger.info(f"Response {response.status_code}: Not retrying.")
                return False
            if response.status_code in RETRYABLE_STATUS_CODES:
                logger.info(f"Response {response.status_code}: Retrying.")
                return True
        return False

    async def request(self, method: str, url: str, **kwargs) -> httpx.Response:
        """Make a resilient HTTP request with automatic retries.

        Performs an HTTP request with automatic retry logic for transient
        failures. Implements exponential backoff with jitter and respects
        numeric HTTP 429 Retry-After headers; a non-numeric (HTTP-date)
        Retry-After falls back to normal exponential backoff instead of
        raising ValueError.

        Args:
            method: HTTP method (GET, POST, PUT, DELETE, etc.)
            url: Target URL for the request
            **kwargs: Additional parameters to pass to httpx.request

        Returns:
            HTTP response object from the last attempt. May be None only if
            ``max_retries`` is 0 (no attempt was ever made).

        Raises:
            Exception: The last retryable exception if all retries fail, or
                any non-retryable exception immediately.
        """
        attempt = 0
        last_exc: Optional[Exception] = None
        response: Optional[httpx.Response] = None

        while attempt < self.max_retries:
            try:
                logger.debug(f"Attempt {attempt + 1} to {method} {url}")
                response = await self.client.request(method, url, **kwargs)

                if response.status_code in NON_RETRYABLE_STATUS_CODES or response.is_success:
                    return response

                # Handle 429 - honour a numeric Retry-After header.
                if response.status_code == 429:
                    retry_after = response.headers.get("Retry-After")
                    if retry_after:
                        try:
                            retry_after_sec = float(retry_after)
                        except ValueError:
                            # Retry-After may legally be an HTTP-date; use
                            # the standard backoff path instead of crashing.
                            retry_after_sec = None
                        if retry_after_sec is not None:
                            logger.info(f"Rate-limited. Retrying after {retry_after_sec}s.")
                            await asyncio.sleep(retry_after_sec)
                            attempt += 1
                            continue

                if not self._should_retry(None, response):
                    return response

            except Exception as exc:
                if not self._should_retry(exc, None):
                    raise
                last_exc = exc
                logger.warning(f"Retrying due to error: {exc}")

            # Jittered exponential backoff before the next attempt.
            delay = await self._backoff_sleep(attempt)
            attempt += 1
            logger.debug(f"Retry scheduled after delay of {delay:.2f} seconds.")

        if last_exc:
            raise last_exc

        logger.error(f"Max retries reached for {url}")
        return response

    async def get(self, url: str, **kwargs):
        """Make a resilient GET request.

        Args:
            url: URL to send the GET request to
            **kwargs: Additional parameters to pass to the request

        Returns:
            HTTP response object from the GET request
        """
        return await self.request("GET", url, **kwargs)

    async def post(self, url: str, **kwargs):
        """Make a resilient POST request.

        Args:
            url: URL to send the POST request to
            **kwargs: Additional parameters to pass to the request

        Returns:
            HTTP response object from the POST request
        """
        return await self.request("POST", url, **kwargs)

    async def put(self, url: str, **kwargs):
        """Make a resilient PUT request.

        Args:
            url: URL to send the PUT request to
            **kwargs: Additional parameters to pass to the request

        Returns:
            HTTP response object from the PUT request
        """
        return await self.request("PUT", url, **kwargs)

    async def delete(self, url: str, **kwargs):
        """Make a resilient DELETE request.

        Args:
            url: URL to send the DELETE request to
            **kwargs: Additional parameters to pass to the request

        Returns:
            HTTP response object from the DELETE request
        """
        return await self.request("DELETE", url, **kwargs)

    @asynccontextmanager
    async def stream(self, method: str, url: str, **kwargs) -> AsyncContextManager[httpx.Response]:
        """Open a resilient streaming HTTP request.

        Retries only while *opening* the stream. Once a response has been
        yielded to the caller, exceptions raised inside the caller's
        ``async with`` body propagate unchanged and are never retried
        (re-opening after a successful yield would violate the context
        manager protocol).

        Args:
            method: HTTP method to use (e.g. "GET", "POST")
            url: URL to send the request to
            **kwargs: Additional parameters to pass to the request

        Yields:
            HTTP response object with streaming capability

        Raises:
            Exception: If a non-retryable error occurs while opening the stream
            RuntimeError: Wrapping the last error, or if the maximum number of
                retries is exceeded without a response being delivered

        Examples:
            >>> client = ResilientHttpClient()
            >>> import contextlib
            >>> isinstance(client.stream("GET", "https://example.com"), contextlib.AbstractAsyncContextManager)
            True
        """
        attempt = 0
        last_exc: Optional[Exception] = None
        delivered = False  # True once a response has been yielded to the caller
        while attempt < self.max_retries:
            try:
                logger.debug("Attempt %d (stream) %s %s", attempt + 1, method, url)
                stream_cm = self.client.stream(method, url, **kwargs)
                async with stream_cm as resp:
                    if not (200 <= resp.status_code < 300 or resp.is_success):
                        if resp.status_code == 429:
                            ra = resp.headers.get("Retry-After")
                            if ra:
                                try:
                                    wait = float(ra)
                                except ValueError:
                                    # HTTP-date Retry-After: fall through to backoff.
                                    wait = None
                                if wait:
                                    logger.info("Rate-limited. Sleeping Retry-After=%s", wait)
                                    await asyncio.sleep(wait)
                                    attempt += 1
                                    continue
                        if not self._should_retry(None, resp):
                            # Give caller the error response once and return.
                            delivered = True
                            yield resp
                            return
                        logger.info("Stream response %s considered retryable; will retry opening.", resp.status_code)
                    else:
                        # Good response -> yield it to caller.
                        delivered = True
                        yield resp
                        return
            except Exception as exc:
                # An exception raised in the caller's body arrives here via the
                # yield; it must propagate untouched, never be retried.
                if delivered:
                    raise
                last_exc = exc
                if not self._should_retry(exc, None):
                    raise
                logger.warning("Error opening stream (will retry): %s", exc)

            # Jittered exponential backoff before retrying the open.
            backoff = await self._backoff_sleep(attempt)
            attempt += 1
            logger.debug("Retrying stream open (attempt %d) after backoff %.2f", attempt + 1, backoff)

        if last_exc:
            # Preserved contract: callers catch RuntimeError around stream opening.
            raise RuntimeError(last_exc)
        raise RuntimeError("Max retries reached opening stream")

    async def aclose(self):
        """Close the underlying HTTP client gracefully.

        Ensures proper cleanup of the httpx.AsyncClient and its resources.
        """
        await self.client.aclose()

    async def __aenter__(self):
        """Asynchronous context manager entry point.

        Returns:
            The client instance for use in ``async with`` statements
        """
        return self

    async def __aexit__(self, *args):
        """Asynchronous context manager exit point.

        Ensures the HTTP client is properly closed after use, even if
        exceptions occur during the context manager block.

        Args:
            *args: Exception information passed by the context manager
                (exc_type, exc_value, traceback) or None values if no exception
        """
        await self.aclose()