Coverage for mcpgateway / utils / ssl_context_cache.py: 100%

20 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""SSL context caching utilities for MCP Gateway services. 

3 

4This module provides caching for SSL contexts to avoid repeatedly creating 

5them for the same CA certificates, improving performance for services that 

6make many SSL connections. 

7""" 

8 

9# Standard 

10import hashlib 

11import ssl 

12 

13# Cache for SSL contexts keyed by CA certificate hash 

14_ssl_context_cache: dict[str, ssl.SSLContext] = {} 

15 

16 

17def get_cached_ssl_context(ca_certificate: str) -> ssl.SSLContext: 

18 """Get or create cached SSL context for a CA certificate. 

19 

20 Args: 

21 ca_certificate: CA certificate in PEM format (str or bytes) 

22 

23 Returns: 

24 ssl.SSLContext: Configured SSL context 

25 

26 Examples: 

27 The actual `ssl.SSLContext.load_verify_locations()` call requires valid PEM 

28 data; in doctests we mock it to focus on caching behavior. 

29 

30 >>> from unittest.mock import Mock, patch 

31 >>> from mcpgateway.utils.ssl_context_cache import clear_ssl_context_cache, get_cached_ssl_context 

32 >>> clear_ssl_context_cache() 

33 >>> with patch("mcpgateway.utils.ssl_context_cache.ssl.create_default_context") as mock_create: 

34 ... ctx = Mock() 

35 ... mock_create.return_value = ctx 

36 ... a = get_cached_ssl_context("CERTDATA") 

37 ... b = get_cached_ssl_context(b"CERTDATA") # same bytes => same cache entry 

38 ... (a is ctx, b is ctx, mock_create.call_count) 

39 (True, True, 1) 

40 

41 Note: 

42 The function handles bytes, str, and other types (for test mocks). 

43 SSL contexts are cached by the SHA256 hash of the certificate to 

44 avoid repeated expensive SSL setup operations. 

45 """ 

46 # Handle bytes, string, or other types (e.g., MagicMock in tests) 

47 if isinstance(ca_certificate, bytes): 

48 cert_bytes = ca_certificate 

49 elif isinstance(ca_certificate, str): 

50 cert_bytes = ca_certificate.encode() 

51 else: 

52 # For non-string/non-bytes (e.g., MagicMock in tests), convert to string first 

53 cert_bytes = str(ca_certificate).encode() 

54 

55 cert_hash = hashlib.sha256(cert_bytes).hexdigest() 

56 

57 if cert_hash in _ssl_context_cache: 

58 return _ssl_context_cache[cert_hash] 

59 

60 # Create new SSL context 

61 ctx = ssl.create_default_context() 

62 ctx.load_verify_locations(cadata=ca_certificate) 

63 

64 # Cache it (limit cache size) 

65 if len(_ssl_context_cache) > 100: 

66 _ssl_context_cache.clear() 

67 _ssl_context_cache[cert_hash] = ctx 

68 

69 return ctx 

70 

71 

72def clear_ssl_context_cache() -> None: 

73 """Clear the SSL context cache. 

74 

75 Call this function: 

76 - In test fixtures to ensure test isolation 

77 - After CA certificate rotation 

78 - When memory pressure requires cache cleanup 

79 

80 Examples: 

81 >>> from unittest.mock import Mock, patch 

82 >>> from mcpgateway.utils.ssl_context_cache import clear_ssl_context_cache, get_cached_ssl_context 

83 >>> with patch("mcpgateway.utils.ssl_context_cache.ssl.create_default_context") as mock_create: 

84 ... mock_create.return_value = Mock() 

85 ... _ = get_cached_ssl_context("CERTDATA") 

86 ... clear_ssl_context_cache() 

87 ... _ = get_cached_ssl_context("CERTDATA") 

88 ... mock_create.call_count 

89 2 

90 """ 

91 _ssl_context_cache.clear()