Coverage for mcpgateway / plugins / framework / external / grpc / client.py: 100%

108 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/plugins/framework/external/grpc/client.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Teryl Taylor 

6 

7External plugin client which connects to a remote server through gRPC. 

8Module that contains plugin gRPC client code to serve external plugins. 

9""" 

10 

11# pylint: disable=no-member,no-name-in-module 

12 

13# Standard 

14import asyncio 

15import logging 

16from typing import Optional 

17 

18# Third-Party 

19from google.protobuf import json_format 

20from google.protobuf.struct_pb2 import Struct 

21import grpc 

22 

23# First-Party 

24from mcpgateway.plugins.framework.base import Plugin 

25from mcpgateway.plugins.framework.constants import IGNORE_CONFIG_EXTERNAL 

26from mcpgateway.plugins.framework.errors import convert_exception_to_error, PluginError 

27from mcpgateway.plugins.framework.external.grpc.proto import plugin_service_pb2, plugin_service_pb2_grpc 

28from mcpgateway.plugins.framework.external.grpc.tls_utils import create_insecure_channel, create_secure_channel 

29from mcpgateway.plugins.framework.external.proto_convert import pydantic_context_to_proto, update_pydantic_context_from_proto 

30from mcpgateway.plugins.framework.hooks.registry import get_hook_registry 

31from mcpgateway.plugins.framework.models import GRPCClientTLSConfig, PluginConfig, PluginContext, PluginErrorModel, PluginPayload, PluginResult 

32 

# Module-level logger, named after this module via ``__name__``; every log
# call in this file goes through it.
logger = logging.getLogger(__name__)

34 

35 

class GrpcExternalPlugin(Plugin):
    """External plugin object that connects to a remote gRPC server.

    This plugin implementation connects to a remote plugin server via gRPC,
    providing a faster binary protocol alternative to the MCP transport.

    Lifecycle: construct with a ``PluginConfig``, ``await initialize()`` to open
    the channel and merge the remote configuration, call ``invoke_hook()`` as
    needed, then ``await shutdown()`` to close the channel.

    Examples:
        >>> from mcpgateway.plugins.framework.models import PluginConfig, GRPCClientConfig
        >>> config = PluginConfig(
        ...     name="MyGrpcPlugin",
        ...     kind="external",
        ...     grpc=GRPCClientConfig(target="localhost:50051")
        ... )
        >>> plugin = GrpcExternalPlugin(config)
        >>> # await plugin.initialize()
    """

    def __init__(self, config: PluginConfig) -> None:
        """Initialize a gRPC external plugin with a configuration.

        No connection is opened here; the channel and stub stay ``None`` until
        ``initialize()`` succeeds.

        Args:
            config: The plugin configuration containing gRPC connection details.
        """
        super().__init__(config)
        self._channel: Optional[grpc.aio.Channel] = None
        self._stub: Optional[plugin_service_pb2_grpc.PluginServiceStub] = None

    async def initialize(self) -> None:
        """Initialize the plugin's connection to the gRPC server.

        This method:
        1. Creates a gRPC channel (secure or insecure based on config)
        2. Creates the service stub
        3. Fetches remote plugin configuration
        4. Merges remote config with local config

        If any step after channel creation fails, the channel is closed and the
        channel/stub references are reset so a failed initialization does not
        leak the connection or leave the plugin half-initialized.

        Raises:
            PluginError: If unable to connect or retrieve plugin configuration.
        """
        if not self._config.grpc:
            raise PluginError(
                error=PluginErrorModel(
                    message="The grpc section must be defined for gRPC external plugin",
                    plugin_name=self.name,
                )
            )

        target = self._config.grpc.get_target()
        # Explicit TLS settings on the plugin config win; otherwise fall back to
        # whatever GRPCClientTLSConfig.from_env() returns (may be falsy).
        tls_config = self._config.grpc.tls or GRPCClientTLSConfig.from_env()
        is_uds = self._config.grpc.uds is not None

        connected = False
        try:
            # Create channel (TLS not supported for Unix domain sockets)
            if tls_config and not is_uds:
                self._channel = create_secure_channel(target, tls_config, self.name)
            else:
                self._channel = create_insecure_channel(target)

            # Create stub
            self._stub = plugin_service_pb2_grpc.PluginServiceStub(self._channel)

            # Verify connection and get remote config
            config = await self._get_plugin_config_with_retry()

            if not config:
                raise PluginError(
                    error=PluginErrorModel(
                        message="Unable to retrieve configuration for external plugin",
                        plugin_name=self.name,
                    )
                )

            # Merge remote config with local config (local takes precedence).
            # dict.update is shallow, so precedence applies per top-level key.
            current_config = self._config.model_dump(exclude_unset=True)
            remote_config = config.model_dump(exclude_unset=True)
            remote_config.update(current_config)

            # NOTE(review): IGNORE_CONFIG_EXTERNAL is passed as validation
            # context; presumably it relaxes checks that only apply to the
            # local "external" stanza - confirm against PluginConfig validators.
            context = {IGNORE_CONFIG_EXTERNAL: True}
            self._config = PluginConfig.model_validate(remote_config, context=context)

            logger.info("Successfully connected to gRPC plugin server at %s for plugin %s", target, self.name)
            connected = True

        except PluginError:
            raise
        except Exception as e:
            logger.exception("Error connecting to gRPC plugin server: %s", e)
            raise PluginError(error=convert_exception_to_error(e, plugin_name=self.name)) from e
        finally:
            # Runs for every failure path (including cancellation): do not keep
            # an open channel around for a plugin that never came up.
            if not connected:
                await self._release_channel()

    async def _release_channel(self) -> None:
        """Close the current channel (if any) and reset channel/stub, never raising.

        Used for best-effort cleanup on error paths, where a failure to close
        must not mask the original exception. Close errors are logged at debug
        level and otherwise ignored.
        """
        channel, self._channel, self._stub = self._channel, None, None
        if channel is None:
            return
        try:
            await channel.close()
        except Exception as close_err:
            logger.debug("Ignoring error while closing gRPC channel for plugin %s: %s", self.name, close_err)

    async def _get_plugin_config_with_retry(self, max_retries: int = 3, base_delay: float = 1.0) -> Optional[PluginConfig]:
        """Retrieve plugin configuration with retry logic.

        Every exception raised by ``_get_plugin_config()`` counts as a failed
        attempt. After a failed attempt that is not the last one, this sleeps
        for ``base_delay * 2**attempt`` seconds (1s then 2s with the defaults).

        Args:
            max_retries: Maximum number of retry attempts.
            base_delay: Base delay between retries (exponential backoff).

        Returns:
            PluginConfig if successful, None if the server reports that no
            configuration was found (or if ``max_retries`` is not positive).

        Raises:
            PluginError: If all retries fail. The last underlying exception is
                attached as ``__cause__``.
        """
        for attempt in range(max_retries):
            try:
                return await self._get_plugin_config()
            except Exception as e:
                logger.warning("Connection attempt %d/%d failed: %s", attempt + 1, max_retries, e)
                if attempt == max_retries - 1:
                    error_msg = f"gRPC plugin '{self.name}' connection failed after {max_retries} attempts"
                    # Chain the last failure so the root cause is not lost.
                    raise PluginError(error=PluginErrorModel(message=error_msg, plugin_name=self.name)) from e
                delay = base_delay * (2**attempt)
                logger.info("Retrying in %ss...", delay)
                await asyncio.sleep(delay)

        return None  # pragma: no cover

    async def _get_plugin_config(self) -> Optional[PluginConfig]:
        """Retrieve plugin configuration from the remote gRPC server.

        Sends a single ``GetPluginConfig`` request for this plugin's name and
        validates the returned config message as a ``PluginConfig``.

        Returns:
            PluginConfig if found, None otherwise.

        Raises:
            PluginError: If the stub is not initialized or the RPC fails.
                Validation errors from ``PluginConfig.model_validate`` are not
                wrapped here and propagate as raised.
        """
        if not self._stub:
            raise PluginError(
                error=PluginErrorModel(
                    message="gRPC stub not initialized",
                    plugin_name=self.name,
                )
            )

        try:
            request = plugin_service_pb2.GetPluginConfigRequest(name=self.name)
            response = await self._stub.GetPluginConfig(request)

            if response.found:
                config_dict = json_format.MessageToDict(response.config)
                return PluginConfig.model_validate(config_dict)

            return None

        except grpc.RpcError as e:
            logger.error("gRPC error getting plugin config: %s", e)
            raise PluginError(error=convert_exception_to_error(e, plugin_name=self.name)) from e

    async def invoke_hook(self, hook_type: str, payload: PluginPayload, context: PluginContext) -> PluginResult:
        """Invoke an external plugin hook using gRPC.

        The payload is sent as a protobuf ``Struct`` and the context as an
        explicit proto message. If the response carries a context, ``context``
        is updated in place before the result is returned.

        Args:
            hook_type: The type of hook invoked (e.g., "tool_pre_invoke").
            payload: The payload to be passed to the hook.
            context: The plugin context passed to the hook.

        Returns:
            The resulting payload from the plugin.

        Raises:
            PluginError: If the hook type is unregistered, the stub is not
                initialized, the server returns an error or an empty response,
                or the call fails for any other reason.
        """
        # Get the result type from the global registry
        registry = get_hook_registry()
        result_type = registry.get_result_type(hook_type)
        if not result_type:
            raise PluginError(
                error=PluginErrorModel(
                    message=f"Hook type '{hook_type}' not registered in hook registry",
                    plugin_name=self.name,
                )
            )

        if not self._stub:
            raise PluginError(
                error=PluginErrorModel(
                    message="gRPC stub not initialized",
                    plugin_name=self.name,
                )
            )

        try:
            # Convert payload to Struct (still polymorphic).
            # NOTE(review): model_dump() yields Python-native values while a
            # Struct only holds JSON-like ones - confirm payloads never carry
            # types such as datetime/bytes, or switch to mode="json".
            payload_struct = Struct()
            json_format.ParseDict(payload.model_dump(), payload_struct)

            # Convert context to explicit proto message (faster than Struct)
            context_proto = pydantic_context_to_proto(context)

            # Create and send request
            request = plugin_service_pb2.InvokeHookRequest(
                hook_type=hook_type,
                plugin_name=self.name,
                payload=payload_struct,
                context=context_proto,
            )

            response = await self._stub.InvokeHook(request)

            # Check for error; an error field with an empty message is ignored.
            if response.HasField("error") and response.error.message:
                error = PluginErrorModel(
                    message=response.error.message,
                    plugin_name=response.error.plugin_name or self.name,
                    code=response.error.code,
                    mcp_error_code=response.error.mcp_error_code,
                )
                if response.error.HasField("details"):
                    error.details = json_format.MessageToDict(response.error.details)
                raise PluginError(error=error)

            # Update context if modified (using explicit proto message)
            if response.HasField("context"):
                update_pydantic_context_from_proto(context, response.context)

            # Parse and return result
            if response.HasField("result"):
                result_dict = json_format.MessageToDict(response.result)
                return result_type.model_validate(result_dict)

            # Neither an error nor a result: the response is unusable.
            raise PluginError(
                error=PluginErrorModel(
                    message="Received invalid response from gRPC plugin server",
                    plugin_name=self.name,
                )
            )

        except PluginError:
            # Already in the framework's error type; do not re-wrap.
            raise
        except grpc.RpcError as e:
            logger.exception("gRPC error invoking hook: %s", e)
            raise PluginError(error=convert_exception_to_error(e, plugin_name=self.name)) from e
        except Exception as e:
            logger.exception("Error invoking gRPC hook: %s", e)
            raise PluginError(error=convert_exception_to_error(e, plugin_name=self.name)) from e

    async def shutdown(self) -> None:
        """Shutdown the gRPC connection and cleanup resources.

        Does nothing when no channel is open. The channel and stub references
        are reset even if closing the channel raises; in that case the
        exception still propagates to the caller.
        """
        channel = self._channel
        if not channel:
            return
        try:
            await channel.close()
        finally:
            # Never keep references to a channel whose close was attempted.
            self._channel = None
            self._stub = None
        logger.info("gRPC channel closed for plugin %s", self.name)

275 self._stub = None 

276 logger.info("gRPC channel closed for plugin %s", self.name)