Coverage for mcpgateway / plugins / framework / external / grpc / server / server.py: 100%

88 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/plugins/framework/external/grpc/server/server.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Teryl Taylor 

6 

7gRPC servicer implementations for external plugin server. 

8 

9This module provides gRPC servicer classes that adapt gRPC calls to the 

10ExternalPluginServer, which handles the actual plugin loading and execution. 

11""" 

12 

13# pylint: disable=no-member,no-name-in-module 

14 

15# Standard 

16import logging 

17from typing import Any 

18 

19# Third-Party 

20from google.protobuf import json_format 

21from google.protobuf.struct_pb2 import Struct 

22import grpc 

23 

24# First-Party 

25from mcpgateway.plugins.framework.external.grpc.proto import plugin_service_pb2, plugin_service_pb2_grpc 

26from mcpgateway.plugins.framework.external.mcp.server.server import ExternalPluginServer 

27from mcpgateway.plugins.framework.external.proto_convert import ( 

28 proto_context_to_pydantic, 

29 pydantic_context_to_proto, 

30) 

31from mcpgateway.plugins.framework.models import PluginContext 

32 

33logger = logging.getLogger(__name__) 

34 

35 

36class GrpcPluginServicer(plugin_service_pb2_grpc.PluginServiceServicer): 

37 """gRPC servicer that adapts gRPC calls to ExternalPluginServer. 

38 

39 This servicer wraps an ExternalPluginServer instance and translates 

40 between gRPC protocol buffer messages and the Pydantic models used 

41 by the plugin framework. 

42 

43 Examples: 

44 >>> from mcpgateway.plugins.framework.external.mcp.server.server import ExternalPluginServer 

45 >>> plugin_server = ExternalPluginServer(config_path="plugins/config.yaml") 

46 >>> servicer = GrpcPluginServicer(plugin_server) 

47 """ 

48 

49 def __init__(self, plugin_server: ExternalPluginServer) -> None: 

50 """Initialize the gRPC servicer with a plugin server. 

51 

52 Args: 

53 plugin_server: The ExternalPluginServer instance that handles 

54 plugin loading and execution. 

55 """ 

56 self._plugin_server = plugin_server 

57 

58 async def GetPluginConfig( # pylint: disable=invalid-overridden-method 

59 self, 

60 request: plugin_service_pb2.GetPluginConfigRequest, 

61 context: grpc.aio.ServicerContext, 

62 ) -> plugin_service_pb2.GetPluginConfigResponse: 

63 """Get configuration for a single plugin by name. 

64 

65 Args: 

66 request: gRPC request containing the plugin name. 

67 context: gRPC servicer context. 

68 

69 Returns: 

70 Response containing the plugin configuration or empty if not found. 

71 """ 

72 logger.debug("GetPluginConfig called for plugin: %s", request.name) 

73 

74 try: 

75 config = await self._plugin_server.get_plugin_config(request.name) 

76 

77 response = plugin_service_pb2.GetPluginConfigResponse() 

78 if config: 

79 response.found = True 

80 json_format.ParseDict(config, response.config) 

81 else: 

82 response.found = False 

83 

84 return response 

85 

86 except Exception as e: 

87 logger.exception("Error in GetPluginConfig: %s", e) 

88 context.set_code(grpc.StatusCode.INTERNAL) 

89 context.set_details(str(e)) 

90 return plugin_service_pb2.GetPluginConfigResponse(found=False) 

91 

92 async def GetPluginConfigs( # pylint: disable=invalid-overridden-method 

93 self, 

94 request: plugin_service_pb2.GetPluginConfigsRequest, 

95 context: grpc.aio.ServicerContext, 

96 ) -> plugin_service_pb2.GetPluginConfigsResponse: 

97 """Get configurations for all plugins on the server. 

98 

99 Args: 

100 request: gRPC request (empty). 

101 context: gRPC servicer context. 

102 

103 Returns: 

104 Response containing list of all plugin configurations. 

105 """ 

106 logger.debug("GetPluginConfigs called") 

107 

108 try: 

109 configs = await self._plugin_server.get_plugin_configs() 

110 

111 response = plugin_service_pb2.GetPluginConfigsResponse() 

112 for config in configs: 

113 config_struct = Struct() 

114 json_format.ParseDict(config, config_struct) 

115 response.configs.append(config_struct) 

116 

117 return response 

118 

119 except Exception as e: 

120 logger.exception("Error in GetPluginConfigs: %s", e) 

121 context.set_code(grpc.StatusCode.INTERNAL) 

122 context.set_details(str(e)) 

123 return plugin_service_pb2.GetPluginConfigsResponse() 

124 

125 async def InvokeHook( # pylint: disable=invalid-overridden-method 

126 self, 

127 request: plugin_service_pb2.InvokeHookRequest, 

128 context: grpc.aio.ServicerContext, 

129 ) -> plugin_service_pb2.InvokeHookResponse: 

130 """Invoke a plugin hook. 

131 

132 Args: 

133 request: gRPC request containing hook_type, plugin_name, payload, and context. 

134 context: gRPC servicer context. 

135 

136 Returns: 

137 Response containing the plugin result or error. 

138 """ 

139 logger.debug( 

140 "InvokeHook called: hook_type=%s, plugin_name=%s", 

141 request.hook_type, 

142 request.plugin_name, 

143 ) 

144 

145 try: 

146 # Convert payload Struct to Python dict (still polymorphic) 

147 payload_dict = json_format.MessageToDict(request.payload) 

148 

149 # Convert explicit PluginContext proto directly to Pydantic 

150 context_pydantic = proto_context_to_pydantic(request.context) 

151 

152 # Invoke the hook using the plugin server (passing Pydantic context directly) 

153 result = await self._plugin_server.invoke_hook( 

154 hook_type=request.hook_type, 

155 plugin_name=request.plugin_name, 

156 payload=payload_dict, 

157 context=context_pydantic, 

158 ) 

159 

160 # Build the response 

161 response = plugin_service_pb2.InvokeHookResponse(plugin_name=request.plugin_name) 

162 

163 # Check for error in result 

164 if "error" in result: 

165 error_obj = result["error"] 

166 # Handle both Pydantic models and dicts 

167 if hasattr(error_obj, "model_dump"): 

168 error_dict = error_obj.model_dump() 

169 else: 

170 error_dict = error_obj 

171 response.error.CopyFrom(self._dict_to_plugin_error(error_dict)) 

172 else: 

173 # Convert result to Struct (still polymorphic) 

174 if "result" in result: 

175 json_format.ParseDict(result["result"], response.result) 

176 # Convert context to explicit proto message 

177 if "context" in result: 

178 ctx = result["context"] 

179 # Handle both Pydantic (optimized path) and dict (MCP compat) 

180 if isinstance(ctx, PluginContext): 

181 response.context.CopyFrom(pydantic_context_to_proto(ctx)) 

182 else: 

183 updated_context = PluginContext.model_validate(ctx) 

184 response.context.CopyFrom(pydantic_context_to_proto(updated_context)) 

185 

186 return response 

187 

188 except Exception as e: 

189 logger.exception("Error in InvokeHook: %s", e) 

190 response = plugin_service_pb2.InvokeHookResponse(plugin_name=request.plugin_name) 

191 response.error.message = str(e) 

192 response.error.plugin_name = request.plugin_name 

193 response.error.code = "INTERNAL_ERROR" 

194 response.error.mcp_error_code = -32603 

195 return response 

196 

197 def _dict_to_plugin_error(self, error_dict: dict[str, Any]) -> plugin_service_pb2.PluginError: 

198 """Convert an error dictionary to a PluginError protobuf message. 

199 

200 Args: 

201 error_dict: Dictionary containing error information. 

202 

203 Returns: 

204 PluginError protobuf message. 

205 """ 

206 error = plugin_service_pb2.PluginError() 

207 error.message = error_dict.get("message", "Unknown error") 

208 error.plugin_name = error_dict.get("plugin_name", "unknown") 

209 error.code = error_dict.get("code", "") 

210 error.mcp_error_code = error_dict.get("mcp_error_code", -32603) 

211 

212 if "details" in error_dict and error_dict["details"]: 

213 json_format.ParseDict(error_dict["details"], error.details) 

214 

215 return error 

216 

217 

218class GrpcHealthServicer(plugin_service_pb2_grpc.HealthServicer): 

219 """gRPC health check servicer following the standard gRPC health protocol. 

220 

221 This servicer provides health check endpoints that can be used by 

222 load balancers and orchestration systems to verify the server is 

223 operational. 

224 

225 Examples: 

226 >>> servicer = GrpcHealthServicer() 

227 >>> # Register with gRPC server 

228 """ 

229 

230 def __init__(self, plugin_server: ExternalPluginServer | None = None) -> None: 

231 """Initialize the health servicer. 

232 

233 Args: 

234 plugin_server: Optional ExternalPluginServer for checking plugin health. 

235 """ 

236 self._plugin_server = plugin_server 

237 

238 async def Check( # pylint: disable=invalid-overridden-method 

239 self, 

240 request: plugin_service_pb2.HealthCheckRequest, 

241 context: grpc.aio.ServicerContext, 

242 ) -> plugin_service_pb2.HealthCheckResponse: 

243 """Check the health status of the server. 

244 

245 Args: 

246 request: Health check request with optional service name. 

247 context: gRPC servicer context. 

248 

249 Returns: 

250 Health check response with serving status. 

251 """ 

252 logger.debug("Health check called for service: %s", request.service or "(overall)") 

253 

254 # For now, always return SERVING if the server is running 

255 # In the future, could check plugin_server health 

256 return plugin_service_pb2.HealthCheckResponse(status=plugin_service_pb2.HealthCheckResponse.SERVING)