Coverage for mcpgateway / plugins / framework / external / grpc / server / server.py: 100%
88 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/plugins/framework/external/grpc/server/server.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Teryl Taylor
7gRPC servicer implementations for external plugin server.
9This module provides gRPC servicer classes that adapt gRPC calls to the
10ExternalPluginServer, which handles the actual plugin loading and execution.
11"""
12# pylint: disable=no-member,no-name-in-module
14# Standard
15import logging
16from typing import Any
18# Third-Party
19from google.protobuf import json_format
20from google.protobuf.struct_pb2 import Struct
21import grpc
23# First-Party
24from mcpgateway.plugins.framework.external.grpc.proto import plugin_service_pb2, plugin_service_pb2_grpc
25from mcpgateway.plugins.framework.external.mcp.server.server import ExternalPluginServer
26from mcpgateway.plugins.framework.external.proto_convert import (
27 proto_context_to_pydantic,
28 pydantic_context_to_proto,
29)
30from mcpgateway.plugins.framework.models import PluginContext
# Module-level logger, named after this module; the servicer methods in this file log through it.
32logger = logging.getLogger(__name__)
class GrpcPluginServicer(plugin_service_pb2_grpc.PluginServiceServicer):
    """gRPC servicer that adapts gRPC calls to ExternalPluginServer.

    This servicer wraps an ExternalPluginServer instance and translates
    between gRPC protocol buffer messages and the Pydantic models used
    by the plugin framework.

    Examples:
        >>> from mcpgateway.plugins.framework.external.mcp.server.server import ExternalPluginServer
        >>> plugin_server = ExternalPluginServer(config_path="plugins/config.yaml")
        >>> servicer = GrpcPluginServicer(plugin_server)
    """

    def __init__(self, plugin_server: ExternalPluginServer) -> None:
        """Initialize the gRPC servicer with a plugin server.

        Args:
            plugin_server: The ExternalPluginServer instance that handles
                plugin loading and execution.
        """
        self._plugin_server = plugin_server

    async def GetPluginConfig(  # pylint: disable=invalid-overridden-method
        self,
        request: plugin_service_pb2.GetPluginConfigRequest,
        context: grpc.aio.ServicerContext,
    ) -> plugin_service_pb2.GetPluginConfigResponse:
        """Get configuration for a single plugin by name.

        Any exception is logged, reported on the gRPC context as
        ``StatusCode.INTERNAL`` with the exception text as details, and
        answered with a ``found=False`` response instead of propagating.

        Args:
            request: gRPC request containing the plugin name.
            context: gRPC servicer context.

        Returns:
            Response with ``found=True`` and the configuration when the plugin
            server returns a truthy config, otherwise ``found=False``.
        """
        logger.debug("GetPluginConfig called for plugin: %s", request.name)

        try:
            config = await self._plugin_server.get_plugin_config(request.name)

            response = plugin_service_pb2.GetPluginConfigResponse()
            if config:
                response.found = True
                # Copy the config dict into the response's config message.
                json_format.ParseDict(config, response.config)
            else:
                response.found = False

            return response

        except Exception as e:
            logger.exception("Error in GetPluginConfig: %s", e)
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(str(e))
            return plugin_service_pb2.GetPluginConfigResponse(found=False)

    async def GetPluginConfigs(  # pylint: disable=invalid-overridden-method
        self,
        request: plugin_service_pb2.GetPluginConfigsRequest,
        context: grpc.aio.ServicerContext,
    ) -> plugin_service_pb2.GetPluginConfigsResponse:
        """Get configurations for all plugins on the server.

        Any exception is logged, reported on the gRPC context as
        ``StatusCode.INTERNAL`` with the exception text as details, and
        answered with an empty response instead of propagating.

        Args:
            request: gRPC request (empty).
            context: gRPC servicer context.

        Returns:
            Response containing list of all plugin configurations.
        """
        logger.debug("GetPluginConfigs called")

        try:
            configs = await self._plugin_server.get_plugin_configs()

            response = plugin_service_pb2.GetPluginConfigsResponse()
            for config in configs:
                # Each configuration dict is converted to its own Struct entry.
                config_struct = Struct()
                json_format.ParseDict(config, config_struct)
                response.configs.append(config_struct)

            return response

        except Exception as e:
            logger.exception("Error in GetPluginConfigs: %s", e)
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(str(e))
            return plugin_service_pb2.GetPluginConfigsResponse()

    async def InvokeHook(  # pylint: disable=invalid-overridden-method
        self,
        request: plugin_service_pb2.InvokeHookRequest,
        context: grpc.aio.ServicerContext,
    ) -> plugin_service_pb2.InvokeHookResponse:
        """Invoke a plugin hook.

        Failures are reported inside the response's ``error`` field rather
        than through the gRPC status: either the error returned by the plugin
        server, or an ``INTERNAL_ERROR`` built from an exception raised while
        handling the call.

        Args:
            request: gRPC request containing hook_type, plugin_name, payload, and context.
            context: gRPC servicer context.

        Returns:
            Response containing the plugin result or error.
        """
        logger.debug(
            "InvokeHook called: hook_type=%s, plugin_name=%s",
            request.hook_type,
            request.plugin_name,
        )

        try:
            # Convert payload Struct to Python dict (still polymorphic)
            payload_dict = json_format.MessageToDict(request.payload)

            # Convert explicit PluginContext proto directly to Pydantic
            context_pydantic = proto_context_to_pydantic(request.context)

            # Invoke the hook using the plugin server (passing Pydantic context directly)
            result = await self._plugin_server.invoke_hook(
                hook_type=request.hook_type,
                plugin_name=request.plugin_name,
                payload=payload_dict,
                context=context_pydantic,
            )

            # Build the response
            response = plugin_service_pb2.InvokeHookResponse(plugin_name=request.plugin_name)

            if "error" in result:
                error_obj = result["error"]
                # Handle both Pydantic models and dicts
                error_dict = error_obj.model_dump() if hasattr(error_obj, "model_dump") else error_obj
                response.error.CopyFrom(self._dict_to_plugin_error(error_dict))
            else:
                # Convert result to Struct (still polymorphic)
                if "result" in result:
                    json_format.ParseDict(result["result"], response.result)
                # Convert context to explicit proto message
                if "context" in result:
                    ctx = result["context"]
                    # Accept a PluginContext as-is; validate anything else (e.g. a dict) into one.
                    updated_context = ctx if isinstance(ctx, PluginContext) else PluginContext.model_validate(ctx)
                    response.context.CopyFrom(pydantic_context_to_proto(updated_context))

            return response

        except Exception as e:
            logger.exception("Error in InvokeHook: %s", e)
            response = plugin_service_pb2.InvokeHookResponse(plugin_name=request.plugin_name)
            response.error.message = str(e)
            response.error.plugin_name = request.plugin_name
            response.error.code = "INTERNAL_ERROR"
            # -32603 is the JSON-RPC "Internal error" code.
            response.error.mcp_error_code = -32603
            return response

    def _dict_to_plugin_error(self, error_dict: dict[str, Any]) -> plugin_service_pb2.PluginError:
        """Convert an error dictionary to a PluginError protobuf message.

        Missing keys and keys whose value is ``None`` both fall back to a
        default. Protobuf scalar fields reject ``None`` on assignment, and a
        dumped error model may carry ``None`` for optional fields; without the
        fallback the resulting ``TypeError`` would replace the real plugin
        error with a generic internal error in ``InvokeHook``.

        Args:
            error_dict: Dictionary containing error information.

        Returns:
            PluginError protobuf message.
        """
        error = plugin_service_pb2.PluginError()

        scalar_defaults = (
            ("message", "Unknown error"),
            ("plugin_name", "unknown"),
            ("code", ""),
            ("mcp_error_code", -32603),
        )
        for field_name, fallback in scalar_defaults:
            value = error_dict.get(field_name)
            # Only None/missing triggers the fallback; empty strings and 0 are kept.
            setattr(error, field_name, fallback if value is None else value)

        details = error_dict.get("details")
        if details:
            json_format.ParseDict(details, error.details)

        return error
class GrpcHealthServicer(plugin_service_pb2_grpc.HealthServicer):
    """Health-check servicer for the external plugin gRPC server.

    Exposes a ``Check`` RPC that callers such as load balancers or
    orchestration systems can poll to confirm the server is up.

    Examples:
        >>> servicer = GrpcHealthServicer()
        >>> # Register with gRPC server
    """

    def __init__(self, plugin_server: ExternalPluginServer | None = None) -> None:
        """Store the optional plugin server reference.

        Args:
            plugin_server: ExternalPluginServer kept for possible plugin health
                checks; ``Check`` does not consult it at present.
        """
        self._plugin_server = plugin_server

    async def Check(  # pylint: disable=invalid-overridden-method
        self,
        request: plugin_service_pb2.HealthCheckRequest,
        context: grpc.aio.ServicerContext,
    ) -> plugin_service_pb2.HealthCheckResponse:
        """Report the serving status of the server.

        Args:
            request: Health check request; its ``service`` field may be empty.
            context: gRPC servicer context (unused).

        Returns:
            Health check response whose status is always ``SERVING``.
        """
        # An empty service name is logged as a query about the server overall.
        service_label = request.service or "(overall)"
        logger.debug("Health check called for service: %s", service_label)

        # A running process is treated as healthy; the stored plugin server
        # is not inspected here.
        serving = plugin_service_pb2.HealthCheckResponse.SERVING
        return plugin_service_pb2.HealthCheckResponse(status=serving)