Coverage for mcpgateway / plugins / framework / external / grpc / client.py: 100%

108 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/plugins/framework/external/grpc/client.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Teryl Taylor 

6 

7External plugin client which connects to a remote server through gRPC. 

8Module that contains plugin gRPC client code to serve external plugins. 

9""" 

10# pylint: disable=no-member,no-name-in-module 

11 

12# Standard 

13import asyncio 

14import logging 

15from typing import Optional 

16 

17# Third-Party 

18from google.protobuf import json_format 

19from google.protobuf.struct_pb2 import Struct 

20import grpc 

21 

22# First-Party 

23from mcpgateway.plugins.framework.base import Plugin 

24from mcpgateway.plugins.framework.constants import IGNORE_CONFIG_EXTERNAL 

25from mcpgateway.plugins.framework.errors import convert_exception_to_error, PluginError 

26from mcpgateway.plugins.framework.external.grpc.proto import plugin_service_pb2, plugin_service_pb2_grpc 

27from mcpgateway.plugins.framework.external.grpc.tls_utils import create_insecure_channel, create_secure_channel 

28from mcpgateway.plugins.framework.external.proto_convert import pydantic_context_to_proto, update_pydantic_context_from_proto 

29from mcpgateway.plugins.framework.hooks.registry import get_hook_registry 

30from mcpgateway.plugins.framework.models import GRPCClientTLSConfig, PluginConfig, PluginContext, PluginErrorModel, PluginPayload, PluginResult 

31 

32logger = logging.getLogger(__name__) 

33 

34 

class GrpcExternalPlugin(Plugin):
    """External plugin object that connects to a remote gRPC server.

    This plugin implementation connects to a remote plugin server via gRPC,
    providing a faster binary protocol alternative to the MCP transport.

    Examples:
        >>> from mcpgateway.plugins.framework.models import PluginConfig, GRPCClientConfig
        >>> config = PluginConfig(
        ...     name="MyGrpcPlugin",
        ...     kind="external",
        ...     grpc=GRPCClientConfig(target="localhost:50051")
        ... )
        >>> plugin = GrpcExternalPlugin(config)
        >>> # await plugin.initialize()
    """

    def __init__(self, config: PluginConfig) -> None:
        """Initialize a gRPC external plugin with a configuration.

        No connection is opened here; the channel and stub stay ``None``
        until :meth:`initialize` is awaited.

        Args:
            config: The plugin configuration containing gRPC connection details.
        """
        super().__init__(config)
        self._channel: Optional[grpc.aio.Channel] = None
        self._stub: Optional[plugin_service_pb2_grpc.PluginServiceStub] = None

    async def initialize(self) -> None:
        """Initialize the plugin's connection to the gRPC server.

        This method:
        1. Creates a gRPC channel (secure or insecure based on config)
        2. Creates the service stub
        3. Fetches remote plugin configuration
        4. Merges remote config with local config

        If any step after channel creation fails, the channel is closed and
        the stub is cleared so a failed initialization does not leak an open
        connection.

        Raises:
            PluginError: If unable to connect or retrieve plugin configuration.
        """
        if not self._config.grpc:
            raise PluginError(
                error=PluginErrorModel(
                    message="The grpc section must be defined for gRPC external plugin",
                    plugin_name=self.name,
                )
            )

        target = self._config.grpc.get_target()
        tls_config = self._config.grpc.tls or GRPCClientTLSConfig.from_env()
        is_uds = self._config.grpc.uds is not None

        try:
            # Create channel (TLS not supported for Unix domain sockets)
            if tls_config and not is_uds:
                self._channel = create_secure_channel(target, tls_config, self.name)
            else:
                self._channel = create_insecure_channel(target)

            # Create stub
            self._stub = plugin_service_pb2_grpc.PluginServiceStub(self._channel)

            # Verify connection and get remote config
            config = await self._get_plugin_config_with_retry()

            if not config:
                raise PluginError(
                    error=PluginErrorModel(
                        message="Unable to retrieve configuration for external plugin",
                        plugin_name=self.name,
                    )
                )

            # Merge remote config with local config (local takes precedence):
            # the remote dict is the base and every locally-set key overwrites it.
            current_config = self._config.model_dump(exclude_unset=True)
            remote_config = config.model_dump(exclude_unset=True)
            remote_config.update(current_config)

            context = {IGNORE_CONFIG_EXTERNAL: True}
            self._config = PluginConfig.model_validate(remote_config, context=context)

            logger.info("Successfully connected to gRPC plugin server at %s for plugin %s", target, self.name)

        except PluginError:
            # Already in the framework's error type; just release the channel.
            await self._discard_channel()
            raise
        except Exception as e:
            logger.exception("Error connecting to gRPC plugin server: %s", e)
            await self._discard_channel()
            raise PluginError(error=convert_exception_to_error(e, plugin_name=self.name)) from e

    async def _close_channel(self) -> bool:
        """Close the current channel (if any) and clear channel and stub.

        The references are cleared before awaiting ``close()`` so the instance
        never keeps pointing at a channel whose close attempt raised.

        Returns:
            True if a channel existed and ``close()`` was awaited, False otherwise.
        """
        channel = self._channel
        self._channel = None
        self._stub = None
        if channel is None:
            return False
        await channel.close()
        return True

    async def _discard_channel(self) -> None:
        """Best-effort cleanup used on initialization failure.

        Errors raised while closing are logged and swallowed so they cannot
        mask the exception that caused initialization to fail.
        """
        try:
            await self._close_channel()
        except Exception as close_error:  # pylint: disable=broad-exception-caught
            logger.debug("Ignoring error while closing gRPC channel for plugin %s: %s", self.name, close_error)

    def _require_stub(self) -> "plugin_service_pb2_grpc.PluginServiceStub":
        """Return the service stub or fail if the plugin is not initialized.

        Returns:
            The initialized gRPC service stub.

        Raises:
            PluginError: If the stub has not been created yet.
        """
        if not self._stub:
            raise PluginError(
                error=PluginErrorModel(
                    message="gRPC stub not initialized",
                    plugin_name=self.name,
                )
            )
        return self._stub

    async def _get_plugin_config_with_retry(self, max_retries: int = 3, base_delay: float = 1.0) -> Optional[PluginConfig]:
        """Retrieve plugin configuration with retry logic.

        Each failed attempt is followed by a sleep of
        ``base_delay * 2**attempt`` seconds, except the last one, which
        raises instead.

        Args:
            max_retries: Maximum number of retry attempts.
            base_delay: Base delay between retries (exponential backoff).

        Returns:
            PluginConfig if successful, None otherwise.

        Raises:
            PluginError: If all retries fail.
        """
        for attempt in range(max_retries):
            try:
                return await self._get_plugin_config()
            except Exception as e:
                logger.warning("Connection attempt %d/%d failed: %s", attempt + 1, max_retries, e)
                if attempt == max_retries - 1:
                    error_msg = f"gRPC plugin '{self.name}' connection failed after {max_retries} attempts"
                    # Chain the last failure so the root cause is kept on the error.
                    raise PluginError(error=PluginErrorModel(message=error_msg, plugin_name=self.name)) from e
                delay = base_delay * (2**attempt)
                logger.info("Retrying in %ss...", delay)
                await asyncio.sleep(delay)

        # Only reachable when max_retries <= 0 (the loop body never runs).
        return None  # pragma: no cover

    async def _get_plugin_config(self) -> Optional[PluginConfig]:
        """Retrieve plugin configuration from the remote gRPC server.

        Returns:
            PluginConfig if found, None otherwise.

        Raises:
            PluginError: If there is a connection or validation error.
        """
        stub = self._require_stub()

        try:
            request = plugin_service_pb2.GetPluginConfigRequest(name=self.name)
            response = await stub.GetPluginConfig(request)

            if response.found:
                config_dict = json_format.MessageToDict(response.config)
                return PluginConfig.model_validate(config_dict)

            return None

        except grpc.RpcError as e:
            logger.error("gRPC error getting plugin config: %s", e)
            raise PluginError(error=convert_exception_to_error(e, plugin_name=self.name)) from e

    async def invoke_hook(self, hook_type: str, payload: PluginPayload, context: PluginContext) -> PluginResult:
        """Invoke an external plugin hook using gRPC.

        Args:
            hook_type: The type of hook invoked (e.g., "tool_pre_invoke").
            payload: The payload to be passed to the hook.
            context: The plugin context passed to the hook. It is updated in
                place when the response carries a context message.

        Returns:
            The resulting payload from the plugin.

        Raises:
            PluginError: If there is an error invoking the hook.
        """
        # Get the result type from the global registry
        registry = get_hook_registry()
        result_type = registry.get_result_type(hook_type)
        if not result_type:
            raise PluginError(
                error=PluginErrorModel(
                    message=f"Hook type '{hook_type}' not registered in hook registry",
                    plugin_name=self.name,
                )
            )

        stub = self._require_stub()

        try:
            # Convert payload to Struct (still polymorphic)
            payload_struct = Struct()
            json_format.ParseDict(payload.model_dump(), payload_struct)

            # Convert context to explicit proto message (faster than Struct)
            context_proto = pydantic_context_to_proto(context)

            # Create and send request
            request = plugin_service_pb2.InvokeHookRequest(
                hook_type=hook_type,
                plugin_name=self.name,
                payload=payload_struct,
                context=context_proto,
            )

            response = await stub.InvokeHook(request)

            # Check for error reported by the remote plugin
            if response.HasField("error") and response.error.message:
                error = PluginErrorModel(
                    message=response.error.message,
                    plugin_name=response.error.plugin_name or self.name,
                    code=response.error.code,
                    mcp_error_code=response.error.mcp_error_code,
                )
                if response.error.HasField("details"):
                    error.details = json_format.MessageToDict(response.error.details)
                raise PluginError(error=error)

            # Update context if modified (using explicit proto message)
            if response.HasField("context"):
                update_pydantic_context_from_proto(context, response.context)

            # Parse and return result
            if response.HasField("result"):
                result_dict = json_format.MessageToDict(response.result)
                return result_type.model_validate(result_dict)

            # Neither an error nor a result was present in the response.
            raise PluginError(
                error=PluginErrorModel(
                    message="Received invalid response from gRPC plugin server",
                    plugin_name=self.name,
                )
            )

        except PluginError:
            raise
        except grpc.RpcError as e:
            logger.exception("gRPC error invoking hook: %s", e)
            raise PluginError(error=convert_exception_to_error(e, plugin_name=self.name)) from e
        except Exception as e:
            logger.exception("Error invoking gRPC hook: %s", e)
            raise PluginError(error=convert_exception_to_error(e, plugin_name=self.name)) from e

    async def shutdown(self) -> None:
        """Shutdown the gRPC connection and cleanup resources.

        Does nothing when no channel is open. The channel and stub references
        are cleared even if closing the channel raises.
        """
        if await self._close_channel():
            logger.info("gRPC channel closed for plugin %s", self.name)