Coverage for mcpgateway/utils/retry_manager.py: 100%

124 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/utils/retry_manager.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Keval Mahajan 

6 

7MCP Gateway Resilient HTTP Client with Retry Logic. 

8This module provides a resilient HTTP client that automatically retries requests 

9in the event of certain errors or status codes. It implements exponential backoff 

10with jitter for retrying requests, making it suitable for use in environments where 

11network reliability is a concern or when dealing with rate-limited APIs. 

12 

13Key Features: 

14- Automatic retry logic for transient failures 

15- Exponential backoff with configurable jitter 

16- Support for HTTP 429 Retry-After headers 

17- Configurable retry policies and delay parameters 

18- Async context manager support for resource cleanup 

19- Standard HTTP methods (GET, POST, PUT, DELETE) 

20- Comprehensive error classification 

21 

22The client distinguishes between retryable and non-retryable errors: 

23 

24Retryable Status Codes: 

25- 429 (Too Many Requests) - with Retry-After header support 

26- 503 (Service Unavailable) 

27- 502 (Bad Gateway) 

28- 504 (Gateway Timeout) 

29- 408 (Request Timeout) 

30 

31Non-Retryable Status Codes: 

32- 400 (Bad Request) 

33- 401 (Unauthorized) 

34- 403 (Forbidden) 

35- 404 (Not Found) 

36- 405 (Method Not Allowed) 

37- 406 (Not Acceptable) 

38 

39Retryable Network Errors: 

40- Connection timeouts 

41- Read timeouts 

42- Network errors 

43 

44Dependencies: 

45- Standard Library: asyncio, logging, random 

46- Third-party: httpx 

47- First-party: mcpgateway.config.settings 

48 

49Example Usage: 

50 Basic usage with default settings: 

51 

52 >>> import asyncio 

53 >>> from mcpgateway.utils.retry_manager import ResilientHttpClient 

54 >>> 

55 >>> # Test client initialization and context manager 

56 >>> async def test_basic_usage(): 

57 ... async with ResilientHttpClient() as client: 

58 ... # Verify client is properly initialized 

59 ... assert client.max_retries > 0 

60 ... assert client.base_backoff > 0 

61 ... assert isinstance(client.client, httpx.AsyncClient) 

62 ... return True 

63 >>> # asyncio.run(test_basic_usage()) # Would return True 

64 

65 Custom configuration: 

66 

67 >>> # Test custom configuration 

68 >>> client = ResilientHttpClient( 

69 ... max_retries=5, 

70 ... base_backoff=2.0, 

71 ... max_delay=120.0 

72 ... ) 

73 >>> client.max_retries 

74 5 

75 >>> client.base_backoff 

76 2.0 

77 >>> client.max_delay 

78 120.0 

79 >>> 

80 >>> # Test client cleanup 

81 >>> async def cleanup_test(): 

82 ... await client.aclose() 

83 ... return True 

84 >>> # asyncio.run(cleanup_test()) # Would properly close the client 

85 

86 Testing retry behavior: 

87 

88 >>> # Test that retryable errors are identified correctly 

89 >>> client = ResilientHttpClient() 

90 >>> client._should_retry(httpx.NetworkError("Network error"), None) 

91 True 

92 >>> 

93 >>> # Test non-retryable status codes 

94 >>> from unittest.mock import Mock 

95 >>> response_404 = Mock() 

96 >>> response_404.status_code = 404 

97 >>> client._should_retry(Exception(), response_404) 

98 False 

99 >>> 

100 >>> # Test retryable status codes 

101 >>> response_503 = Mock() 

102 >>> response_503.status_code = 503 

103 >>> client._should_retry(Exception(), response_503) 

104 True 

105 

106 Testing HTTP methods: 

107 

108 >>> # Verify all HTTP methods are available 

109 >>> client = ResilientHttpClient() 

110 >>> import inspect 

111 >>> all([ 

112 ... inspect.iscoroutinefunction(client.get), 

113 ... inspect.iscoroutinefunction(client.post), 

114 ... inspect.iscoroutinefunction(client.put), 

115 ... inspect.iscoroutinefunction(client.delete) 

116 ... ]) 

117 True 

118 

119 Testing backoff calculation: 

120 

121 >>> # Test exponential backoff calculation 

122 >>> client = ResilientHttpClient(base_backoff=1.0, jitter_max=0.5) 

123 >>> # First retry: 1.0 * (2^0) = 1.0 seconds base 

124 >>> # Second retry: 1.0 * (2^1) = 2.0 seconds base 

125 >>> # Third retry: 1.0 * (2^2) = 4.0 seconds base 

126 >>> client.base_backoff * (2**0) 

127 1.0 

128 >>> client.base_backoff * (2**1) 

129 2.0 

130 >>> client.base_backoff * (2**2) 

131 4.0 

132 

133 Testing error classification: 

134 

135 >>> # Verify error code sets 

136 >>> from mcpgateway.utils.retry_manager import RETRYABLE_STATUS_CODES, NON_RETRYABLE_STATUS_CODES 

137 >>> 429 in RETRYABLE_STATUS_CODES 

138 True 

139 >>> 503 in RETRYABLE_STATUS_CODES 

140 True 

141 >>> 400 in NON_RETRYABLE_STATUS_CODES 

142 True 

143 >>> 404 in NON_RETRYABLE_STATUS_CODES 

144 True 

145 >>> # Ensure no overlap between sets 

146 >>> len(RETRYABLE_STATUS_CODES & NON_RETRYABLE_STATUS_CODES) 

147 0 

148""" 

149 

150# Standard 

151import asyncio 

152from contextlib import asynccontextmanager 

153import logging 

154import random 

155from typing import Any, AsyncContextManager, Dict, Optional 

156 

157# Third-Party 

158import httpx 

159 

160# First-Party 

161from mcpgateway.config import settings 

162 

# Module-level logger used for retry diagnostics.
logger = logging.getLogger(__name__)

# Status codes treated as transient: a later attempt may succeed.
RETRYABLE_STATUS_CODES = {
    408,  # Request Timeout
    429,  # Too Many Requests (Retry-After header is honoured)
    502,  # Bad Gateway
    503,  # Service Unavailable
    504,  # Gateway Timeout
}

# Status codes treated as permanent client errors: never retried.
NON_RETRYABLE_STATUS_CODES = {
    400,  # Bad Request
    401,  # Unauthorized
    403,  # Forbidden
    404,  # Not Found
    405,  # Method Not Allowed
    406,  # Not Acceptable
}

182 

183 

class ResilientHttpClient:
    """Async HTTP client with automatic retries and jittered exponential backoff.

    Transient failures (connect/read timeouts, network errors, and the status
    codes in ``RETRYABLE_STATUS_CODES``) are retried with::

        delay = min(base_backoff * 2**attempt + uniform(0, jitter), max_delay)

    HTTP 429 responses carrying a numeric ``Retry-After`` header are honoured by
    sleeping for exactly that many seconds instead of the computed backoff.
    Status codes in ``NON_RETRYABLE_STATUS_CODES`` (and successful responses)
    are returned to the caller immediately.

    Attributes:
        max_retries: Maximum number of attempts before giving up.
        base_backoff: Base delay in seconds before the first retry.
        max_delay: Upper bound on any single backoff sleep, in seconds.
        jitter_max: Fraction (0-1) of the delay added as random jitter.
        client_args: Keyword arguments forwarded to ``httpx.AsyncClient``.
        client: The underlying ``httpx.AsyncClient`` instance.
    """

    def __init__(
        self,
        max_retries: int = settings.retry_max_attempts,
        base_backoff: float = settings.retry_base_delay,
        max_delay: float = settings.retry_max_delay,
        jitter_max: float = settings.retry_jitter_max,
        client_args: Optional[Dict[str, Any]] = None,
    ):
        """Initialize the client with configurable retry behavior.

        Args:
            max_retries: Maximum number of retry attempts before giving up.
            base_backoff: Base delay in seconds before retrying a request.
            max_delay: Maximum backoff delay in seconds.
            jitter_max: Maximum jitter fraction (0-1) to add randomness.
            client_args: Additional arguments for ``httpx.AsyncClient``. A
                default ``limits`` entry (connection pooling) is added when the
                caller does not supply one.
        """
        self.max_retries = max_retries
        self.base_backoff = base_backoff
        self.max_delay = max_delay
        self.jitter_max = jitter_max
        # Copy so injecting defaults below never mutates the caller's dict.
        self.client_args = dict(client_args) if client_args else {}

        # Add default httpx.Limits if not provided, for connection pooling.
        if "limits" not in self.client_args:
            self.client_args["limits"] = httpx.Limits(
                max_connections=settings.httpx_max_connections,
                max_keepalive_connections=settings.httpx_max_keepalive_connections,
                keepalive_expiry=settings.httpx_keepalive_expiry,
            )

        self.client = httpx.AsyncClient(**self.client_args)

    async def _sleep_with_jitter(self, base: float, jitter_range: float) -> None:
        """Sleep for ``base`` plus random jitter, capped at ``max_delay``.

        The random component prevents a thundering herd of synchronized
        retries from many clients.

        Args:
            base: Base sleep time in seconds.
            jitter_range: Maximum additional random delay in seconds.
        """
        # random.uniform() is safe here: jitter only spreads retry timing, not security
        delay = base + random.uniform(0, jitter_range)  # nosec B311 # noqa: DUO102
        # Ensure delay doesn't exceed the max allowed
        await asyncio.sleep(min(delay, self.max_delay))

    def _should_retry(self, exc: Optional[Exception], response: Optional[httpx.Response]) -> bool:
        """Decide whether a failed attempt should be retried.

        Args:
            exc: Exception raised during the request, or ``None`` when a
                response was received.
            response: HTTP response object, if one was received.

        Returns:
            True when the failure looks transient (timeout, network error, or
            a status code in ``RETRYABLE_STATUS_CODES``); False otherwise.
        """
        if isinstance(exc, (httpx.ConnectTimeout, httpx.ReadTimeout, httpx.NetworkError)):
            return True
        if response:
            if response.status_code in NON_RETRYABLE_STATUS_CODES:
                logger.info("Response %s: Not retrying.", response.status_code)
                return False
            if response.status_code in RETRYABLE_STATUS_CODES:
                logger.info("Response %s: Retrying.", response.status_code)
                return True
        return False

    async def request(self, method: str, url: str, **kwargs) -> httpx.Response:
        """Make a resilient HTTP request with automatic retries.

        Implements exponential backoff with jitter and honours numeric HTTP
        429 ``Retry-After`` headers. An HTTP-date ``Retry-After`` value (also
        valid per RFC 9110) falls back to the normal backoff schedule instead
        of raising.

        Args:
            method: HTTP method (GET, POST, PUT, DELETE, etc.).
            url: Target URL for the request.
            **kwargs: Additional parameters passed to ``httpx.AsyncClient.request``.

        Returns:
            The last HTTP response received. Note this is ``None`` only in the
            degenerate case ``max_retries == 0``, where no attempt is made.

        Raises:
            Exception: A non-retryable exception propagates immediately; the
                last retryable exception is re-raised once all attempts are
                exhausted.
        """
        attempt = 0
        last_exc: Optional[Exception] = None
        response: Optional[httpx.Response] = None

        while attempt < self.max_retries:
            try:
                logger.debug("Attempt %d to %s %s", attempt + 1, method, url)
                response = await self.client.request(method, url, **kwargs)

                # Success or a permanent client error: return immediately.
                if response.status_code in NON_RETRYABLE_STATUS_CODES or response.is_success:
                    return response

                # Handle 429 - honour a numeric Retry-After header.
                if response.status_code == 429:
                    retry_after = response.headers.get("Retry-After")
                    if retry_after:
                        try:
                            retry_after_sec = float(retry_after)
                        except ValueError:
                            # Retry-After may be an HTTP-date; use backoff below.
                            retry_after_sec = None
                        if retry_after_sec is not None:
                            logger.info("Rate-limited. Retrying after %ss.", retry_after_sec)
                            await asyncio.sleep(retry_after_sec)
                            attempt += 1
                            continue

                if not self._should_retry(None, response):
                    return response

            except Exception as exc:
                if not self._should_retry(exc, None):
                    raise
                last_exc = exc
                logger.warning("Retrying due to error: %s", exc)

            # Exponential backoff with proportional jitter.
            delay = self.base_backoff * (2**attempt)
            jitter = delay * self.jitter_max
            await self._sleep_with_jitter(delay, jitter)
            attempt += 1
            logger.debug("Retry scheduled after delay of %.2f seconds.", delay)

        if last_exc:
            raise last_exc

        logger.error("Max retries reached for %s", url)
        return response

    async def get(self, url: str, **kwargs):
        """Make a resilient GET request.

        Args:
            url: URL to send the GET request to.
            **kwargs: Additional parameters to pass to the request.

        Returns:
            HTTP response object from the GET request.
        """
        return await self.request("GET", url, **kwargs)

    async def post(self, url: str, **kwargs):
        """Make a resilient POST request.

        Args:
            url: URL to send the POST request to.
            **kwargs: Additional parameters to pass to the request.

        Returns:
            HTTP response object from the POST request.
        """
        return await self.request("POST", url, **kwargs)

    async def put(self, url: str, **kwargs):
        """Make a resilient PUT request.

        Args:
            url: URL to send the PUT request to.
            **kwargs: Additional parameters to pass to the request.

        Returns:
            HTTP response object from the PUT request.
        """
        return await self.request("PUT", url, **kwargs)

    async def delete(self, url: str, **kwargs):
        """Make a resilient DELETE request.

        Args:
            url: URL to send the DELETE request to.
            **kwargs: Additional parameters to pass to the request.

        Returns:
            HTTP response object from the DELETE request.
        """
        return await self.request("DELETE", url, **kwargs)

    @asynccontextmanager
    async def stream(self, method: str, url: str, **kwargs) -> AsyncContextManager[httpx.Response]:
        """Open a resilient streaming HTTP request.

        Retries opening the stream with the same policy as :meth:`request`.
        A non-retryable error response is yielded once so the caller can
        inspect it.

        Args:
            method: HTTP method to use (e.g. "GET", "POST").
            url: URL to send the request to.
            **kwargs: Additional parameters passed to ``httpx.AsyncClient.stream``.

        Yields:
            HTTP response object with streaming capability.

        Raises:
            Exception: If a non-retryable error occurs while opening the stream.
            RuntimeError: If the maximum number of retries is exceeded; wraps
                the last exception when one was captured.
        """
        attempt = 0
        last_exc: Optional[Exception] = None
        while attempt < self.max_retries:
            try:
                logger.debug("Attempt %d (stream) %s %s", attempt + 1, method, url)
                stream_cm = self.client.stream(method, url, **kwargs)
                async with stream_cm as resp:
                    if not (200 <= resp.status_code < 300 or resp.is_success):
                        if resp.status_code == 429:
                            ra = resp.headers.get("Retry-After")
                            if ra:
                                try:
                                    wait = float(ra)
                                except ValueError:
                                    # HTTP-date form: fall through to backoff.
                                    wait = None
                                if wait:
                                    logger.info("Rate-limited. Sleeping Retry-After=%s", wait)
                                    await asyncio.sleep(wait)
                                    attempt += 1
                                    continue
                        if not self._should_retry(None, resp):
                            # give caller the error response once and return
                            yield resp
                            return
                        logger.info("Stream response %s considered retryable; will retry opening.", resp.status_code)
                    else:
                        # good response -> yield it to caller
                        yield resp
                        return
            except Exception as exc:
                last_exc = exc
                if not self._should_retry(exc, None):
                    raise
                logger.warning("Error opening stream (will retry): %s", exc)

            backoff = self.base_backoff * (2**attempt)
            jitter_range = backoff * self.jitter_max
            await self._sleep_with_jitter(backoff, jitter_range)

            attempt += 1
            logger.debug("Retrying stream open (attempt %d) after backoff %.2f", attempt + 1, backoff)

        if last_exc:
            raise RuntimeError(last_exc)
        raise RuntimeError("Max retries reached opening stream")

    async def aclose(self):
        """Close the underlying HTTP client gracefully.

        Ensures proper cleanup of the ``httpx.AsyncClient`` and its resources.
        """
        await self.client.aclose()

    async def __aenter__(self):
        """Asynchronous context manager entry point.

        Returns:
            The client instance, for use in ``async with`` statements.
        """
        return self

    async def __aexit__(self, *args):
        """Asynchronous context manager exit point.

        Closes the HTTP client even when the ``async with`` block raised.

        Args:
            *args: Exception information supplied by the context manager
                (exc_type, exc_value, traceback), or ``None`` values.
        """
        await self.aclose()