Coverage for mcpgateway / utils / ssl_context_cache.py: 100%
20 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""SSL context caching utilities for MCP Gateway services.
4This module provides caching for SSL contexts to avoid repeatedly creating
5them for the same CA certificates, improving performance for services that
6make many SSL connections.
7"""
9# Standard
10import hashlib
11import ssl
13# Cache for SSL contexts keyed by CA certificate hash
14_ssl_context_cache: dict[str, ssl.SSLContext] = {}
17def get_cached_ssl_context(ca_certificate: str) -> ssl.SSLContext:
18 """Get or create cached SSL context for a CA certificate.
20 Args:
21 ca_certificate: CA certificate in PEM format (str or bytes)
23 Returns:
24 ssl.SSLContext: Configured SSL context
26 Examples:
27 The actual `ssl.SSLContext.load_verify_locations()` call requires valid PEM
28 data; in doctests we mock it to focus on caching behavior.
30 >>> from unittest.mock import Mock, patch
31 >>> from mcpgateway.utils.ssl_context_cache import clear_ssl_context_cache, get_cached_ssl_context
32 >>> clear_ssl_context_cache()
33 >>> with patch("mcpgateway.utils.ssl_context_cache.ssl.create_default_context") as mock_create:
34 ... ctx = Mock()
35 ... mock_create.return_value = ctx
36 ... a = get_cached_ssl_context("CERTDATA")
37 ... b = get_cached_ssl_context(b"CERTDATA") # same bytes => same cache entry
38 ... (a is ctx, b is ctx, mock_create.call_count)
39 (True, True, 1)
41 Note:
42 The function handles bytes, str, and other types (for test mocks).
43 SSL contexts are cached by the SHA256 hash of the certificate to
44 avoid repeated expensive SSL setup operations.
45 """
46 # Handle bytes, string, or other types (e.g., MagicMock in tests)
47 if isinstance(ca_certificate, bytes):
48 cert_bytes = ca_certificate
49 elif isinstance(ca_certificate, str):
50 cert_bytes = ca_certificate.encode()
51 else:
52 # For non-string/non-bytes (e.g., MagicMock in tests), convert to string first
53 cert_bytes = str(ca_certificate).encode()
55 cert_hash = hashlib.sha256(cert_bytes).hexdigest()
57 if cert_hash in _ssl_context_cache:
58 return _ssl_context_cache[cert_hash]
60 # Create new SSL context
61 ctx = ssl.create_default_context()
62 ctx.load_verify_locations(cadata=ca_certificate)
64 # Cache it (limit cache size)
65 if len(_ssl_context_cache) > 100:
66 _ssl_context_cache.clear()
67 _ssl_context_cache[cert_hash] = ctx
69 return ctx
72def clear_ssl_context_cache() -> None:
73 """Clear the SSL context cache.
75 Call this function:
76 - In test fixtures to ensure test isolation
77 - After CA certificate rotation
78 - When memory pressure requires cache cleanup
80 Examples:
81 >>> from unittest.mock import Mock, patch
82 >>> from mcpgateway.utils.ssl_context_cache import clear_ssl_context_cache, get_cached_ssl_context
83 >>> with patch("mcpgateway.utils.ssl_context_cache.ssl.create_default_context") as mock_create:
84 ... mock_create.return_value = Mock()
85 ... _ = get_cached_ssl_context("CERTDATA")
86 ... clear_ssl_context_cache()
87 ... _ = get_cached_ssl_context("CERTDATA")
88 ... mock_create.call_count
89 2
90 """
91 _ssl_context_cache.clear()