Coverage for mcpgateway / plugins / framework / external / grpc / server / server.py: 100%
88 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/plugins/framework/external/grpc/server/server.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Teryl Taylor
7gRPC servicer implementations for external plugin server.
9This module provides gRPC servicer classes that adapt gRPC calls to the
10ExternalPluginServer, which handles the actual plugin loading and execution.
11"""
13# pylint: disable=no-member,no-name-in-module
15# Standard
16import logging
17from typing import Any
19# Third-Party
20from google.protobuf import json_format
21from google.protobuf.struct_pb2 import Struct
22import grpc
24# First-Party
25from mcpgateway.plugins.framework.external.grpc.proto import plugin_service_pb2, plugin_service_pb2_grpc
26from mcpgateway.plugins.framework.external.mcp.server.server import ExternalPluginServer
27from mcpgateway.plugins.framework.external.proto_convert import (
28 proto_context_to_pydantic,
29 pydantic_context_to_proto,
30)
31from mcpgateway.plugins.framework.models import PluginContext
33logger = logging.getLogger(__name__)
class GrpcPluginServicer(plugin_service_pb2_grpc.PluginServiceServicer):
    """gRPC servicer that adapts gRPC calls to ExternalPluginServer.

    This servicer wraps an ExternalPluginServer instance and translates
    between gRPC protocol buffer messages and the Pydantic models used
    by the plugin framework.

    Examples:
        >>> from mcpgateway.plugins.framework.external.mcp.server.server import ExternalPluginServer
        >>> plugin_server = ExternalPluginServer(config_path="plugins/config.yaml")
        >>> servicer = GrpcPluginServicer(plugin_server)
    """

    def __init__(self, plugin_server: ExternalPluginServer) -> None:
        """Initialize the gRPC servicer with a plugin server.

        Args:
            plugin_server: The ExternalPluginServer instance that handles
                plugin loading and execution.
        """
        self._plugin_server = plugin_server

    async def GetPluginConfig(  # pylint: disable=invalid-overridden-method
        self,
        request: plugin_service_pb2.GetPluginConfigRequest,
        context: grpc.aio.ServicerContext,
    ) -> plugin_service_pb2.GetPluginConfigResponse:
        """Get configuration for a single plugin by name.

        Any exception raised while fetching or converting the configuration
        is logged, reported to the caller as gRPC status INTERNAL (with the
        exception text as details), and answered with ``found=False``.

        Args:
            request: gRPC request containing the plugin name.
            context: gRPC servicer context.

        Returns:
            Response containing the plugin configuration, or a response with
            ``found=False`` when the plugin server returns a falsy config.
        """
        logger.debug("GetPluginConfig called for plugin: %s", request.name)

        try:
            config = await self._plugin_server.get_plugin_config(request.name)

            response = plugin_service_pb2.GetPluginConfigResponse()
            if config:
                response.found = True
                json_format.ParseDict(config, response.config)
            else:
                response.found = False

            return response

        except Exception as e:
            logger.exception("Error in GetPluginConfig: %s", e)
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(str(e))
            return plugin_service_pb2.GetPluginConfigResponse(found=False)

    async def GetPluginConfigs(  # pylint: disable=invalid-overridden-method
        self,
        request: plugin_service_pb2.GetPluginConfigsRequest,
        context: grpc.aio.ServicerContext,
    ) -> plugin_service_pb2.GetPluginConfigsResponse:
        """Get configurations for all plugins on the server.

        Any exception raised while fetching or converting the configurations
        is logged, reported to the caller as gRPC status INTERNAL (with the
        exception text as details), and answered with an empty response.

        Args:
            request: gRPC request (empty).
            context: gRPC servicer context.

        Returns:
            Response containing list of all plugin configurations.
        """
        logger.debug("GetPluginConfigs called")

        try:
            configs = await self._plugin_server.get_plugin_configs()

            response = plugin_service_pb2.GetPluginConfigsResponse()
            for config in configs:
                # Each configuration travels as its own Struct entry.
                config_struct = Struct()
                json_format.ParseDict(config, config_struct)
                response.configs.append(config_struct)

            return response

        except Exception as e:
            logger.exception("Error in GetPluginConfigs: %s", e)
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(str(e))
            return plugin_service_pb2.GetPluginConfigsResponse()

    async def InvokeHook(  # pylint: disable=invalid-overridden-method
        self,
        request: plugin_service_pb2.InvokeHookRequest,
        context: grpc.aio.ServicerContext,
    ) -> plugin_service_pb2.InvokeHookResponse:
        """Invoke a plugin hook.

        Failures are reported inside the response message rather than through
        the gRPC status: an exception raised here is logged and returned as
        ``response.error`` with code ``INTERNAL_ERROR`` and mcp_error_code
        ``-32603``; the servicer context is left untouched.

        Args:
            request: gRPC request containing hook_type, plugin_name, payload, and context.
            context: gRPC servicer context.

        Returns:
            Response containing the plugin result or error.
        """
        logger.debug(
            "InvokeHook called: hook_type=%s, plugin_name=%s",
            request.hook_type,
            request.plugin_name,
        )

        try:
            # Convert payload Struct to Python dict (still polymorphic)
            payload_dict = json_format.MessageToDict(request.payload)

            # Convert explicit PluginContext proto directly to Pydantic
            context_pydantic = proto_context_to_pydantic(request.context)

            # Invoke the hook using the plugin server (passing Pydantic context directly)
            result = await self._plugin_server.invoke_hook(
                hook_type=request.hook_type,
                plugin_name=request.plugin_name,
                payload=payload_dict,
                context=context_pydantic,
            )

            # Build the response
            response = plugin_service_pb2.InvokeHookResponse(plugin_name=request.plugin_name)

            # Check for error in result
            if "error" in result:
                error_obj = result["error"]
                # Handle both Pydantic models and dicts
                if hasattr(error_obj, "model_dump"):
                    error_dict = error_obj.model_dump()
                else:
                    error_dict = error_obj
                response.error.CopyFrom(self._dict_to_plugin_error(error_dict))
            else:
                # Convert result to Struct (still polymorphic)
                if "result" in result:
                    json_format.ParseDict(result["result"], response.result)
                # Convert context to explicit proto message
                if "context" in result:
                    ctx = result["context"]
                    # Handle both Pydantic (optimized path) and dict (MCP compat)
                    if isinstance(ctx, PluginContext):
                        response.context.CopyFrom(pydantic_context_to_proto(ctx))
                    else:
                        updated_context = PluginContext.model_validate(ctx)
                        response.context.CopyFrom(pydantic_context_to_proto(updated_context))

            return response

        except Exception as e:
            logger.exception("Error in InvokeHook: %s", e)
            response = plugin_service_pb2.InvokeHookResponse(plugin_name=request.plugin_name)
            response.error.message = str(e)
            response.error.plugin_name = request.plugin_name
            response.error.code = "INTERNAL_ERROR"
            response.error.mcp_error_code = -32603
            return response

    def _dict_to_plugin_error(self, error_dict: dict[str, Any]) -> plugin_service_pb2.PluginError:
        """Convert an error dictionary to a PluginError protobuf message.

        Keys that are absent or explicitly ``None`` fall back to defaults.
        ``None`` must be filtered out because protobuf scalar fields reject
        it with a ``TypeError``, which would otherwise replace the plugin's
        real error with a generic internal error in ``InvokeHook``. An
        explicit ``None`` can appear when the dictionary comes from
        ``model_dump()`` on a model with optional fields.

        Args:
            error_dict: Dictionary containing error information. Recognized
                keys: ``message``, ``plugin_name``, ``code``,
                ``mcp_error_code`` and ``details``.

        Returns:
            PluginError protobuf message.
        """

        def _value_or(key: str, default: Any) -> Any:
            """Return error_dict[key], or default when missing or None."""
            value = error_dict.get(key)
            # Compare against None only, so legitimate falsy values
            # (e.g. an mcp_error_code of 0) are preserved.
            return default if value is None else value

        error = plugin_service_pb2.PluginError()
        error.message = _value_or("message", "Unknown error")
        error.plugin_name = _value_or("plugin_name", "unknown")
        error.code = _value_or("code", "")
        error.mcp_error_code = _value_or("mcp_error_code", -32603)

        # Empty or missing details leave the field unset.
        details = error_dict.get("details")
        if details:
            json_format.ParseDict(details, error.details)

        return error
class GrpcHealthServicer(plugin_service_pb2_grpc.HealthServicer):
    """Health-check servicer for the external plugin gRPC server.

    Exposes a ``Check`` endpoint, following the standard gRPC health
    protocol, so that load balancers and orchestration systems can verify
    that the server is operational.

    Examples:
        >>> servicer = GrpcHealthServicer()
        >>> # Register with gRPC server
    """

    def __init__(self, plugin_server: ExternalPluginServer | None = None) -> None:
        """Create the health servicer.

        Args:
            plugin_server: Optional ExternalPluginServer for checking plugin
                health. It is stored but not consulted by ``Check`` yet.
        """
        self._plugin_server = plugin_server

    async def Check(  # pylint: disable=invalid-overridden-method
        self,
        request: plugin_service_pb2.HealthCheckRequest,
        context: grpc.aio.ServicerContext,
    ) -> plugin_service_pb2.HealthCheckResponse:
        """Report the serving status of the server.

        Args:
            request: Health check request with optional service name.
            context: gRPC servicer context.

        Returns:
            Health check response whose status is always SERVING.
        """
        # An empty service name refers to the server as a whole.
        service_label = request.service
        if not service_label:
            service_label = "(overall)"
        logger.debug("Health check called for service: %s", service_label)

        # A running server is reported as healthy unconditionally; the stored
        # plugin server could be consulted here in the future.
        serving_status = plugin_service_pb2.HealthCheckResponse.SERVING
        return plugin_service_pb2.HealthCheckResponse(status=serving_status)