Coverage for mcpgateway / plugins / framework / external / grpc / tls_utils.py: 100%

44 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/plugins/framework/external/grpc/tls_utils.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Teryl Taylor 

6 

7gRPC TLS credential utilities for external plugin transport. 

8 

9This module provides helper functions for creating gRPC channel and server 

10credentials with TLS/mTLS support. 

11""" 

12 

13# Standard 

14import logging 

15from typing import Optional 

16 

17# Third-Party 

18import grpc 

19 

20# First-Party 

21from mcpgateway.plugins.framework.models import GRPCClientTLSConfig, GRPCServerTLSConfig 

22 

23logger = logging.getLogger(__name__) 

24 

25 

def _read_file(path: str) -> bytes:
    """Return the raw contents of a file.

    The file is opened in binary mode, so the data is returned exactly as
    stored on disk with no decoding or newline translation.

    Args:
        path: Path to the file to read.

    Returns:
        The complete file contents as bytes.

    Raises:
        FileNotFoundError: If the file does not exist.
        OSError: If the file cannot be opened or read.
    """
    with open(path, mode="rb") as stream:
        payload = stream.read()
    return payload

41 

42 

def create_client_credentials(tls_config: GRPCClientTLSConfig, plugin_name: str = "unknown") -> grpc.ChannelCredentials:
    """Create gRPC channel credentials for client connections.

    Builds SSL channel credentials from the certificate files referenced by
    ``tls_config``:

    - ``ca_bundle`` supplies the root certificates used to verify the server.
    - ``certfile`` together with ``keyfile`` supplies the client identity for
      mTLS. If only one of the two is set, no client certificate is presented
      and a warning is logged.
    - ``verify=False`` discards the configured CA bundle and passes
      ``root_certificates=None``. grpc-python has no "skip verification"
      option: with ``None`` gRPC loads its default root certificates, so the
      server certificate is still validated against those defaults.

    Args:
        tls_config: TLS configuration containing certificate paths and options.
        plugin_name: Name of the plugin for logging purposes.

    Returns:
        gRPC ChannelCredentials configured for TLS/mTLS.

    Raises:
        FileNotFoundError: If a configured certificate file is not found.
        OSError: If a configured certificate file cannot be read.

    Examples:
        >>> from mcpgateway.plugins.framework.models import GRPCClientTLSConfig
        >>> config = GRPCClientTLSConfig(  # doctest: +SKIP
        ...     ca_bundle="/path/to/ca.pem",
        ...     certfile="/path/to/client.pem",
        ...     keyfile="/path/to/client-key.pem",
        ...     verify=True
        ... )
        >>> creds = create_client_credentials(config, "my_plugin")  # doctest: +SKIP
    """
    root_certificates: Optional[bytes] = None
    private_key: Optional[bytes] = None
    certificate_chain: Optional[bytes] = None

    # Load CA bundle for server verification. This is read even when
    # verification is "disabled" below so that a bad path is still reported.
    if tls_config.ca_bundle:
        logger.debug("Loading CA bundle for plugin %s: %s", plugin_name, tls_config.ca_bundle)
        root_certificates = _read_file(tls_config.ca_bundle)

    # Load client certificate for mTLS (needs both the certificate and its key)
    if tls_config.certfile and tls_config.keyfile:
        logger.debug("Loading client certificate for plugin %s: %s", plugin_name, tls_config.certfile)
        certificate_chain = _read_file(tls_config.certfile)
        private_key = _read_file(tls_config.keyfile)
    elif tls_config.certfile or tls_config.keyfile:
        # Previously a half-configured identity was dropped silently; surface it.
        logger.warning(
            "Incomplete client certificate configuration for plugin %s: both certfile and keyfile are required for mTLS; no client certificate will be presented",
            plugin_name,
        )

    # Handle verification setting
    if not tls_config.verify:
        logger.warning("TLS verification disabled for plugin %s - not recommended for production", plugin_name)
        # NOTE: grpc-python has no direct "skip verify" option. Dropping the
        # configured CA bundle does NOT disable validation: with
        # root_certificates=None gRPC falls back to its default root
        # certificates and still verifies the server certificate.
        root_certificates = None

    return grpc.ssl_channel_credentials(
        root_certificates=root_certificates,
        private_key=private_key,
        certificate_chain=certificate_chain,
    )

106 

107 

def create_server_credentials(tls_config: GRPCServerTLSConfig) -> grpc.ServerCredentials:
    """Build gRPC server credentials from a TLS configuration.

    The server certificate and private key are mandatory. A CA bundle, when
    configured, is passed to gRPC as the root certificates for verifying
    client certificates. The ``client_auth`` setting is compared
    case-insensitively: ``"none"`` and ``"optional"`` do not require a client
    certificate at the gRPC level, while every other value (including
    unrecognised ones) requires one.

    Args:
        tls_config: TLS configuration containing certificate paths and client auth settings.

    Returns:
        gRPC ServerCredentials configured for TLS/mTLS.

    Raises:
        FileNotFoundError: If certificate files are not found.
        ValueError: If the server certificate or private key is not provided.

    Examples:
        >>> from mcpgateway.plugins.framework.models import GRPCServerTLSConfig
        >>> config = GRPCServerTLSConfig(  # doctest: +SKIP
        ...     certfile="/path/to/server.pem",
        ...     keyfile="/path/to/server-key.pem",
        ...     ca_bundle="/path/to/ca.pem",
        ...     client_auth="require"
        ... )
        >>> creds = create_server_credentials(config)  # doctest: +SKIP
    """
    # Guard clause: the server identity needs both halves.
    if not (tls_config.certfile and tls_config.keyfile):
        raise ValueError("Server certificate (certfile) and private key (keyfile) are required for gRPC TLS")

    logger.debug("Loading server certificate: %s", tls_config.certfile)
    cert_chain_pem = _read_file(tls_config.certfile)
    key_pem = _read_file(tls_config.keyfile)

    # Optional CA bundle used to verify client certificates
    client_ca_pem: Optional[bytes] = None
    if tls_config.ca_bundle:
        logger.debug("Loading CA bundle for client verification: %s", tls_config.ca_bundle)
        client_ca_pem = _read_file(tls_config.ca_bundle)

    # gRPC only takes a boolean here, so "optional" is treated like "none";
    # anything else falls through to requiring a client certificate.
    must_authenticate = tls_config.client_auth.lower() not in ("none", "optional")

    logger.info(
        "Creating gRPC server credentials with client_auth=%s (require_client_auth=%s)",
        tls_config.client_auth,
        must_authenticate,
    )

    server_identity = (key_pem, cert_chain_pem)
    return grpc.ssl_server_credentials(
        private_key_certificate_chain_pairs=[server_identity],
        root_certificates=client_ca_pem,
        require_client_auth=must_authenticate,
    )

168 

169 

def create_insecure_channel(target: str) -> grpc.aio.Channel:
    """Open an async gRPC channel without TLS.

    A warning is logged on every call because the channel is unencrypted.

    Args:
        target: The target address in host:port format.

    Returns:
        An insecure async gRPC channel.

    Note:
        Intended for development/testing only; production deployments
        should always use TLS.
    """
    logger.warning("Creating insecure gRPC channel to %s - not recommended for production", target)
    plaintext_channel = grpc.aio.insecure_channel(target)
    return plaintext_channel

185 

186 

def create_secure_channel(target: str, tls_config: GRPCClientTLSConfig, plugin_name: str = "unknown") -> grpc.aio.Channel:
    """Open an async gRPC channel protected by TLS.

    The channel credentials are built by ``create_client_credentials`` from
    ``tls_config`` before the channel is created.

    Args:
        target: The target address in host:port format.
        tls_config: TLS configuration for the channel.
        plugin_name: Name of the plugin for logging purposes.

    Returns:
        A secure async gRPC channel with TLS credentials.
    """
    channel_credentials = create_client_credentials(tls_config, plugin_name)
    logger.info("Creating secure gRPC channel to %s for plugin %s", target, plugin_name)
    tls_channel = grpc.aio.secure_channel(target, channel_credentials)
    return tls_channel