Coverage for mcpgateway / plugins / framework / external / grpc / client.py: 100%
108 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/plugins/framework/external/grpc/client.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Teryl Taylor
7External plugin client which connects to a remote server through gRPC.
8Module that contains plugin gRPC client code to serve external plugins.
9"""
11# pylint: disable=no-member,no-name-in-module
13# Standard
14import asyncio
15import logging
16from typing import Optional
18# Third-Party
19from google.protobuf import json_format
20from google.protobuf.struct_pb2 import Struct
21import grpc
23# First-Party
24from mcpgateway.plugins.framework.base import Plugin
25from mcpgateway.plugins.framework.constants import IGNORE_CONFIG_EXTERNAL
26from mcpgateway.plugins.framework.errors import convert_exception_to_error, PluginError
27from mcpgateway.plugins.framework.external.grpc.proto import plugin_service_pb2, plugin_service_pb2_grpc
28from mcpgateway.plugins.framework.external.grpc.tls_utils import create_insecure_channel, create_secure_channel
29from mcpgateway.plugins.framework.external.proto_convert import pydantic_context_to_proto, update_pydantic_context_from_proto
30from mcpgateway.plugins.framework.hooks.registry import get_hook_registry
31from mcpgateway.plugins.framework.models import GRPCClientTLSConfig, PluginConfig, PluginContext, PluginErrorModel, PluginPayload, PluginResult
33logger = logging.getLogger(__name__)
class GrpcExternalPlugin(Plugin):
    """External plugin object that connects to a remote gRPC server.

    This plugin implementation connects to a remote plugin server via gRPC,
    providing a faster binary protocol alternative to the MCP transport.

    Examples:
        >>> from mcpgateway.plugins.framework.models import PluginConfig, GRPCClientConfig
        >>> config = PluginConfig(
        ...     name="MyGrpcPlugin",
        ...     kind="external",
        ...     grpc=GRPCClientConfig(target="localhost:50051")
        ... )
        >>> plugin = GrpcExternalPlugin(config)
        >>> # await plugin.initialize()
    """

    def __init__(self, config: PluginConfig) -> None:
        """Initialize a gRPC external plugin with a configuration.

        No connection is opened here; the channel and stub stay ``None``
        until ``initialize()`` is awaited.

        Args:
            config: The plugin configuration containing gRPC connection details.
        """
        super().__init__(config)
        self._channel: Optional[grpc.aio.Channel] = None
        self._stub: Optional[plugin_service_pb2_grpc.PluginServiceStub] = None

    async def initialize(self) -> None:
        """Initialize the plugin's connection to the gRPC server.

        This method:
        1. Creates a gRPC channel (secure or insecure based on config)
        2. Creates the service stub
        3. Fetches remote plugin configuration
        4. Merges remote config with local config

        If any step after channel creation fails (or the call is cancelled),
        the channel is closed and the stub cleared so that a failed
        initialization does not leak the connection.

        Raises:
            PluginError: If unable to connect or retrieve plugin configuration.
        """
        if not self._config.grpc:
            raise PluginError(
                error=PluginErrorModel(
                    message="The grpc section must be defined for gRPC external plugin",
                    plugin_name=self.name,
                )
            )

        target = self._config.grpc.get_target()
        tls_config = self._config.grpc.tls or GRPCClientTLSConfig.from_env()
        is_uds = self._config.grpc.uds is not None

        connected = False
        try:
            # Create channel (TLS not supported for Unix domain sockets)
            if tls_config and not is_uds:
                self._channel = create_secure_channel(target, tls_config, self.name)
            else:
                self._channel = create_insecure_channel(target)

            # Create stub
            self._stub = plugin_service_pb2_grpc.PluginServiceStub(self._channel)

            # Verify connection and get remote config
            config = await self._get_plugin_config_with_retry()

            if not config:
                raise PluginError(
                    error=PluginErrorModel(
                        message="Unable to retrieve configuration for external plugin",
                        plugin_name=self.name,
                    )
                )

            # Merge remote config with local config (local takes precedence:
            # locally set fields overwrite the remote ones in the merged dict)
            current_config = self._config.model_dump(exclude_unset=True)
            remote_config = config.model_dump(exclude_unset=True)
            remote_config.update(current_config)

            context = {IGNORE_CONFIG_EXTERNAL: True}
            self._config = PluginConfig.model_validate(remote_config, context=context)
            connected = True

            logger.info("Successfully connected to gRPC plugin server at %s for plugin %s", target, self.name)

        except PluginError:
            raise
        except Exception as e:
            logger.exception("Error connecting to gRPC plugin server: %s", e)
            raise PluginError(error=convert_exception_to_error(e, plugin_name=self.name)) from e
        finally:
            # Runs on every failure path, including cancellation, so the
            # half-open channel never outlives a failed initialize().
            if not connected:
                await self._discard_channel()

    async def _discard_channel(self) -> None:
        """Close and forget the current channel, never raising.

        Used on the failure path of ``initialize()``; errors from ``close()``
        are logged at debug level and swallowed so they cannot mask the
        exception that caused the failure.
        """
        channel = self._channel
        self._channel = None
        self._stub = None
        if channel is None:
            return
        try:
            await channel.close()
        except Exception as e:  # pylint: disable=broad-exception-caught
            logger.debug("Ignoring error while closing gRPC channel for plugin %s: %s", self.name, e)

    def _require_stub(self) -> plugin_service_pb2_grpc.PluginServiceStub:
        """Return the service stub or fail if the plugin is not initialized.

        Returns:
            The active ``PluginServiceStub``.

        Raises:
            PluginError: If no stub has been created yet.
        """
        if not self._stub:
            raise PluginError(
                error=PluginErrorModel(
                    message="gRPC stub not initialized",
                    plugin_name=self.name,
                )
            )
        return self._stub

    async def _get_plugin_config_with_retry(self, max_retries: int = 3, base_delay: float = 1.0) -> Optional[PluginConfig]:
        """Retrieve plugin configuration with retry logic.

        Each failed attempt is followed by a sleep of
        ``base_delay * 2**attempt`` seconds, except the last one, which raises.

        Args:
            max_retries: Maximum number of retry attempts.
            base_delay: Base delay between retries (exponential backoff).

        Returns:
            PluginConfig if successful, None if the server reports the plugin
            as not found (or if ``max_retries`` is not positive, in which case
            no attempt is made).

        Raises:
            PluginError: If all retries fail. The last underlying exception is
                attached as ``__cause__``.
        """
        for attempt in range(max_retries):
            try:
                return await self._get_plugin_config()
            except Exception as e:
                logger.warning("Connection attempt %d/%d failed: %s", attempt + 1, max_retries, e)
                if attempt == max_retries - 1:
                    error_msg = f"gRPC plugin '{self.name}' connection failed after {max_retries} attempts"
                    raise PluginError(error=PluginErrorModel(message=error_msg, plugin_name=self.name)) from e
                delay = base_delay * (2**attempt)
                logger.info("Retrying in %ss...", delay)
                await asyncio.sleep(delay)

        return None  # pragma: no cover

    async def _get_plugin_config(self) -> Optional[PluginConfig]:
        """Retrieve plugin configuration from the remote gRPC server.

        Returns:
            PluginConfig if found, None otherwise.

        Raises:
            PluginError: If there is a connection or validation error.
        """
        stub = self._require_stub()

        try:
            request = plugin_service_pb2.GetPluginConfigRequest(name=self.name)
            response = await stub.GetPluginConfig(request)

            if response.found:
                config_dict = json_format.MessageToDict(response.config)
                return PluginConfig.model_validate(config_dict)

            return None

        except grpc.RpcError as e:
            logger.error("gRPC error getting plugin config: %s", e)
            raise PluginError(error=convert_exception_to_error(e, plugin_name=self.name)) from e

    async def invoke_hook(self, hook_type: str, payload: PluginPayload, context: PluginContext) -> PluginResult:
        """Invoke an external plugin hook using gRPC.

        Args:
            hook_type: The type of hook invoked (e.g., "tool_pre_invoke").
            payload: The payload to be passed to the hook.
            context: The plugin context passed to the hook. It is updated in
                place when the response carries a context.

        Returns:
            The resulting payload from the plugin.

        Raises:
            PluginError: If there is an error invoking the hook.
        """
        # Get the result type from the global registry
        registry = get_hook_registry()
        result_type = registry.get_result_type(hook_type)
        if not result_type:
            raise PluginError(
                error=PluginErrorModel(
                    message=f"Hook type '{hook_type}' not registered in hook registry",
                    plugin_name=self.name,
                )
            )

        stub = self._require_stub()

        try:
            # Convert payload to Struct (still polymorphic)
            payload_struct = Struct()
            json_format.ParseDict(payload.model_dump(), payload_struct)

            # Convert context to explicit proto message (faster than Struct)
            context_proto = pydantic_context_to_proto(context)

            # Create and send request
            request = plugin_service_pb2.InvokeHookRequest(
                hook_type=hook_type,
                plugin_name=self.name,
                payload=payload_struct,
                context=context_proto,
            )

            response = await stub.InvokeHook(request)

            # Check for error
            if response.HasField("error") and response.error.message:
                error = PluginErrorModel(
                    message=response.error.message,
                    plugin_name=response.error.plugin_name or self.name,
                    code=response.error.code,
                    mcp_error_code=response.error.mcp_error_code,
                )
                if response.error.HasField("details"):
                    error.details = json_format.MessageToDict(response.error.details)
                raise PluginError(error=error)

            # Update context if modified (using explicit proto message)
            if response.HasField("context"):
                update_pydantic_context_from_proto(context, response.context)

            # Parse and return result
            if response.HasField("result"):
                result_dict = json_format.MessageToDict(response.result)
                return result_type.model_validate(result_dict)

            raise PluginError(
                error=PluginErrorModel(
                    message="Received invalid response from gRPC plugin server",
                    plugin_name=self.name,
                )
            )

        except PluginError:
            # Already in the framework's error shape; do not re-wrap.
            raise
        except grpc.RpcError as e:
            logger.exception("gRPC error invoking hook: %s", e)
            raise PluginError(error=convert_exception_to_error(e, plugin_name=self.name)) from e
        except Exception as e:
            logger.exception("Error invoking gRPC hook: %s", e)
            raise PluginError(error=convert_exception_to_error(e, plugin_name=self.name)) from e

    async def shutdown(self) -> None:
        """Shutdown the gRPC connection and cleanup resources.

        The channel and stub references are cleared even if closing the
        channel raises, so the plugin never keeps a half-closed channel.
        """
        channel = self._channel
        if not channel:
            return
        try:
            await channel.close()
        finally:
            self._channel = None
            self._stub = None
        logger.info("gRPC channel closed for plugin %s", self.name)