Coverage for mcpgateway / plugins / framework / external / grpc / client.py: 100%
108 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/plugins/framework/external/grpc/client.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Teryl Taylor
7External plugin client which connects to a remote server through gRPC.
8Module that contains plugin gRPC client code to serve external plugins.
9"""
10# pylint: disable=no-member,no-name-in-module
12# Standard
13import asyncio
14import logging
15from typing import Optional
17# Third-Party
18from google.protobuf import json_format
19from google.protobuf.struct_pb2 import Struct
20import grpc
22# First-Party
23from mcpgateway.plugins.framework.base import Plugin
24from mcpgateway.plugins.framework.constants import IGNORE_CONFIG_EXTERNAL
25from mcpgateway.plugins.framework.errors import convert_exception_to_error, PluginError
26from mcpgateway.plugins.framework.external.grpc.proto import plugin_service_pb2, plugin_service_pb2_grpc
27from mcpgateway.plugins.framework.external.grpc.tls_utils import create_insecure_channel, create_secure_channel
28from mcpgateway.plugins.framework.external.proto_convert import pydantic_context_to_proto, update_pydantic_context_from_proto
29from mcpgateway.plugins.framework.hooks.registry import get_hook_registry
30from mcpgateway.plugins.framework.models import GRPCClientTLSConfig, PluginConfig, PluginContext, PluginErrorModel, PluginPayload, PluginResult
32logger = logging.getLogger(__name__)
35class GrpcExternalPlugin(Plugin):
36 """External plugin object that connects to a remote gRPC server.
38 This plugin implementation connects to a remote plugin server via gRPC,
39 providing a faster binary protocol alternative to the MCP transport.
41 Examples:
42 >>> from mcpgateway.plugins.framework.models import PluginConfig, GRPCClientConfig
43 >>> config = PluginConfig(
44 ... name="MyGrpcPlugin",
45 ... kind="external",
46 ... grpc=GRPCClientConfig(target="localhost:50051")
47 ... )
48 >>> plugin = GrpcExternalPlugin(config)
49 >>> # await plugin.initialize()
50 """
52 def __init__(self, config: PluginConfig) -> None:
53 """Initialize a gRPC external plugin with a configuration.
55 Args:
56 config: The plugin configuration containing gRPC connection details.
57 """
58 super().__init__(config)
59 self._channel: Optional[grpc.aio.Channel] = None
60 self._stub: Optional[plugin_service_pb2_grpc.PluginServiceStub] = None
62 async def initialize(self) -> None:
63 """Initialize the plugin's connection to the gRPC server.
65 This method:
66 1. Creates a gRPC channel (secure or insecure based on config)
67 2. Creates the service stub
68 3. Fetches remote plugin configuration
69 4. Merges remote config with local config
71 Raises:
72 PluginError: If unable to connect or retrieve plugin configuration.
73 """
74 if not self._config.grpc:
75 raise PluginError(
76 error=PluginErrorModel(
77 message="The grpc section must be defined for gRPC external plugin",
78 plugin_name=self.name,
79 )
80 )
82 target = self._config.grpc.get_target()
83 tls_config = self._config.grpc.tls or GRPCClientTLSConfig.from_env()
84 is_uds = self._config.grpc.uds is not None
86 try:
87 # Create channel (TLS not supported for Unix domain sockets)
88 if tls_config and not is_uds:
89 self._channel = create_secure_channel(target, tls_config, self.name)
90 else:
91 self._channel = create_insecure_channel(target)
93 # Create stub
94 self._stub = plugin_service_pb2_grpc.PluginServiceStub(self._channel)
96 # Verify connection and get remote config
97 config = await self._get_plugin_config_with_retry()
99 if not config:
100 raise PluginError(
101 error=PluginErrorModel(
102 message="Unable to retrieve configuration for external plugin",
103 plugin_name=self.name,
104 )
105 )
107 # Merge remote config with local config (local takes precedence)
108 current_config = self._config.model_dump(exclude_unset=True)
109 remote_config = config.model_dump(exclude_unset=True)
110 remote_config.update(current_config)
112 context = {IGNORE_CONFIG_EXTERNAL: True}
113 self._config = PluginConfig.model_validate(remote_config, context=context)
115 logger.info("Successfully connected to gRPC plugin server at %s for plugin %s", target, self.name)
117 except PluginError:
118 raise
119 except Exception as e:
120 logger.exception("Error connecting to gRPC plugin server: %s", e)
121 raise PluginError(error=convert_exception_to_error(e, plugin_name=self.name))
123 async def _get_plugin_config_with_retry(self, max_retries: int = 3, base_delay: float = 1.0) -> Optional[PluginConfig]:
124 """Retrieve plugin configuration with retry logic.
126 Args:
127 max_retries: Maximum number of retry attempts.
128 base_delay: Base delay between retries (exponential backoff).
130 Returns:
131 PluginConfig if successful, None otherwise.
133 Raises:
134 PluginError: If all retries fail.
135 """
136 for attempt in range(max_retries):
137 try:
138 return await self._get_plugin_config()
139 except Exception as e:
140 logger.warning("Connection attempt %d/%d failed: %s", attempt + 1, max_retries, e)
141 if attempt == max_retries - 1:
142 error_msg = f"gRPC plugin '{self.name}' connection failed after {max_retries} attempts"
143 raise PluginError(error=PluginErrorModel(message=error_msg, plugin_name=self.name))
144 delay = base_delay * (2**attempt)
145 logger.info("Retrying in %ss...", delay)
146 await asyncio.sleep(delay)
148 return None # pragma: no cover
150 async def _get_plugin_config(self) -> Optional[PluginConfig]:
151 """Retrieve plugin configuration from the remote gRPC server.
153 Returns:
154 PluginConfig if found, None otherwise.
156 Raises:
157 PluginError: If there is a connection or validation error.
158 """
159 if not self._stub:
160 raise PluginError(
161 error=PluginErrorModel(
162 message="gRPC stub not initialized",
163 plugin_name=self.name,
164 )
165 )
167 try:
168 request = plugin_service_pb2.GetPluginConfigRequest(name=self.name)
169 response = await self._stub.GetPluginConfig(request)
171 if response.found:
172 config_dict = json_format.MessageToDict(response.config)
173 return PluginConfig.model_validate(config_dict)
175 return None
177 except grpc.RpcError as e:
178 logger.error("gRPC error getting plugin config: %s", e)
179 raise PluginError(error=convert_exception_to_error(e, plugin_name=self.name))
181 async def invoke_hook(self, hook_type: str, payload: PluginPayload, context: PluginContext) -> PluginResult:
182 """Invoke an external plugin hook using gRPC.
184 Args:
185 hook_type: The type of hook invoked (e.g., "tool_pre_invoke").
186 payload: The payload to be passed to the hook.
187 context: The plugin context passed to the hook.
189 Returns:
190 The resulting payload from the plugin.
192 Raises:
193 PluginError: If there is an error invoking the hook.
194 """
195 # Get the result type from the global registry
196 registry = get_hook_registry()
197 result_type = registry.get_result_type(hook_type)
198 if not result_type:
199 raise PluginError(
200 error=PluginErrorModel(
201 message=f"Hook type '{hook_type}' not registered in hook registry",
202 plugin_name=self.name,
203 )
204 )
206 if not self._stub:
207 raise PluginError(
208 error=PluginErrorModel(
209 message="gRPC stub not initialized",
210 plugin_name=self.name,
211 )
212 )
214 try:
215 # Convert payload to Struct (still polymorphic)
216 payload_struct = Struct()
217 json_format.ParseDict(payload.model_dump(), payload_struct)
219 # Convert context to explicit proto message (faster than Struct)
220 context_proto = pydantic_context_to_proto(context)
222 # Create and send request
223 request = plugin_service_pb2.InvokeHookRequest(
224 hook_type=hook_type,
225 plugin_name=self.name,
226 payload=payload_struct,
227 context=context_proto,
228 )
230 response = await self._stub.InvokeHook(request)
232 # Check for error
233 if response.HasField("error") and response.error.message:
234 error = PluginErrorModel(
235 message=response.error.message,
236 plugin_name=response.error.plugin_name or self.name,
237 code=response.error.code,
238 mcp_error_code=response.error.mcp_error_code,
239 )
240 if response.error.HasField("details"):
241 error.details = json_format.MessageToDict(response.error.details)
242 raise PluginError(error=error)
244 # Update context if modified (using explicit proto message)
245 if response.HasField("context"):
246 update_pydantic_context_from_proto(context, response.context)
248 # Parse and return result
249 if response.HasField("result"):
250 result_dict = json_format.MessageToDict(response.result)
251 return result_type.model_validate(result_dict)
253 raise PluginError(
254 error=PluginErrorModel(
255 message="Received invalid response from gRPC plugin server",
256 plugin_name=self.name,
257 )
258 )
260 except PluginError:
261 raise
262 except grpc.RpcError as e:
263 logger.exception("gRPC error invoking hook: %s", e)
264 raise PluginError(error=convert_exception_to_error(e, plugin_name=self.name))
265 except Exception as e:
266 logger.exception("Error invoking gRPC hook: %s", e)
267 raise PluginError(error=convert_exception_to_error(e, plugin_name=self.name))
269 async def shutdown(self) -> None:
270 """Shutdown the gRPC connection and cleanup resources."""
271 if self._channel:
272 await self._channel.close()
273 self._channel = None
274 self._stub = None
275 logger.info("gRPC channel closed for plugin %s", self.name)