Coverage for mcpgateway / plugins / framework / external / grpc / server / server.py: 100%

88 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/plugins/framework/external/grpc/server/server.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Teryl Taylor 

6 

7gRPC servicer implementations for external plugin server. 

8 

9This module provides gRPC servicer classes that adapt gRPC calls to the 

10ExternalPluginServer, which handles the actual plugin loading and execution. 

11""" 

12# pylint: disable=no-member,no-name-in-module 

13 

14# Standard 

15import logging 

16from typing import Any 

17 

18# Third-Party 

19from google.protobuf import json_format 

20from google.protobuf.struct_pb2 import Struct 

21import grpc 

22 

23# First-Party 

24from mcpgateway.plugins.framework.external.grpc.proto import plugin_service_pb2, plugin_service_pb2_grpc 

25from mcpgateway.plugins.framework.external.mcp.server.server import ExternalPluginServer 

26from mcpgateway.plugins.framework.external.proto_convert import ( 

27 proto_context_to_pydantic, 

28 pydantic_context_to_proto, 

29) 

30from mcpgateway.plugins.framework.models import PluginContext 

31 

32logger = logging.getLogger(__name__) 

33 

34 

class GrpcPluginServicer(plugin_service_pb2_grpc.PluginServiceServicer):
    """gRPC servicer that adapts gRPC calls to ExternalPluginServer.

    This servicer wraps an ExternalPluginServer instance and translates
    between gRPC protocol buffer messages and the Pydantic models used
    by the plugin framework.

    Error reporting differs per RPC: the two config RPCs signal failures
    through the gRPC status code (``INTERNAL``), while ``InvokeHook``
    reports failures in-band through the ``error`` field of its response.

    Examples:
        >>> from mcpgateway.plugins.framework.external.mcp.server.server import ExternalPluginServer
        >>> plugin_server = ExternalPluginServer(config_path="plugins/config.yaml")
        >>> servicer = GrpcPluginServicer(plugin_server)
    """

    def __init__(self, plugin_server: ExternalPluginServer) -> None:
        """Initialize the gRPC servicer with a plugin server.

        Args:
            plugin_server: The ExternalPluginServer instance that handles
                plugin loading and execution.
        """
        self._plugin_server = plugin_server

    async def GetPluginConfig(  # pylint: disable=invalid-overridden-method
        self,
        request: plugin_service_pb2.GetPluginConfigRequest,
        context: grpc.aio.ServicerContext,
    ) -> plugin_service_pb2.GetPluginConfigResponse:
        """Get configuration for a single plugin by name.

        Args:
            request: gRPC request containing the plugin name.
            context: gRPC servicer context.

        Returns:
            Response with ``found=True`` and the plugin configuration, or
            ``found=False`` when the plugin server returns no configuration.
            If the lookup raises, the gRPC status is set to ``INTERNAL`` with
            the exception text as details and ``found=False`` is returned.
        """
        logger.debug("GetPluginConfig called for plugin: %s", request.name)

        try:
            config = await self._plugin_server.get_plugin_config(request.name)

            response = plugin_service_pb2.GetPluginConfigResponse()
            if config:
                response.found = True
                json_format.ParseDict(config, response.config)
            else:
                response.found = False

            return response

        except Exception as e:
            logger.exception("Error in GetPluginConfig: %s", e)
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(str(e))
            return plugin_service_pb2.GetPluginConfigResponse(found=False)

    async def GetPluginConfigs(  # pylint: disable=invalid-overridden-method
        self,
        request: plugin_service_pb2.GetPluginConfigsRequest,
        context: grpc.aio.ServicerContext,
    ) -> plugin_service_pb2.GetPluginConfigsResponse:
        """Get configurations for all plugins on the server.

        Args:
            request: gRPC request (empty).
            context: gRPC servicer context.

        Returns:
            Response containing list of all plugin configurations. If the
            lookup or conversion raises, the gRPC status is set to
            ``INTERNAL`` and an empty response is returned.
        """
        logger.debug("GetPluginConfigs called")

        try:
            configs = await self._plugin_server.get_plugin_configs()

            response = plugin_service_pb2.GetPluginConfigsResponse()
            for config in configs:
                config_struct = Struct()
                json_format.ParseDict(config, config_struct)
                response.configs.append(config_struct)

            return response

        except Exception as e:
            logger.exception("Error in GetPluginConfigs: %s", e)
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(str(e))
            return plugin_service_pb2.GetPluginConfigsResponse()

    async def InvokeHook(  # pylint: disable=invalid-overridden-method
        self,
        request: plugin_service_pb2.InvokeHookRequest,
        context: grpc.aio.ServicerContext,
    ) -> plugin_service_pb2.InvokeHookResponse:
        """Invoke a plugin hook.

        Args:
            request: gRPC request containing hook_type, plugin_name, payload, and context.
            context: gRPC servicer context.

        Returns:
            Response containing the plugin result or error. Failures are
            reported through ``response.error``; the gRPC status code is not
            modified by this method.
        """
        logger.debug(
            "InvokeHook called: hook_type=%s, plugin_name=%s",
            request.hook_type,
            request.plugin_name,
        )

        try:
            # Convert payload Struct to Python dict (still polymorphic)
            payload_dict = json_format.MessageToDict(request.payload)

            # Convert explicit PluginContext proto directly to Pydantic
            context_pydantic = proto_context_to_pydantic(request.context)

            # Invoke the hook using the plugin server (passing Pydantic context directly)
            result = await self._plugin_server.invoke_hook(
                hook_type=request.hook_type,
                plugin_name=request.plugin_name,
                payload=payload_dict,
                context=context_pydantic,
            )

            # Build the response
            response = plugin_service_pb2.InvokeHookResponse(plugin_name=request.plugin_name)

            # An "error" entry takes precedence: result/context are ignored when present.
            if "error" in result:
                error_obj = result["error"]
                # Handle both Pydantic models and dicts
                error_dict = error_obj.model_dump() if hasattr(error_obj, "model_dump") else error_obj
                response.error.CopyFrom(self._dict_to_plugin_error(error_dict))
            else:
                # Convert result to Struct (still polymorphic)
                if "result" in result:
                    json_format.ParseDict(result["result"], response.result)
                # Convert context to explicit proto message
                if "context" in result:
                    ctx = result["context"]
                    # Handle both Pydantic (optimized path) and dict (MCP compat)
                    updated_context = ctx if isinstance(ctx, PluginContext) else PluginContext.model_validate(ctx)
                    response.context.CopyFrom(pydantic_context_to_proto(updated_context))

            return response

        except Exception as e:
            logger.exception("Error in InvokeHook: %s", e)
            response = plugin_service_pb2.InvokeHookResponse(plugin_name=request.plugin_name)
            response.error.message = str(e)
            response.error.plugin_name = request.plugin_name
            response.error.code = "INTERNAL_ERROR"
            response.error.mcp_error_code = -32603
            return response

    def _dict_to_plugin_error(self, error_dict: dict[str, Any]) -> plugin_service_pb2.PluginError:
        """Convert an error dictionary to a PluginError protobuf message.

        Missing keys and keys explicitly set to ``None`` both fall back to
        the same defaults. Falsy non-``None`` values (``""``, ``0``) are
        passed through unchanged.

        Args:
            error_dict: Dictionary containing error information.

        Returns:
            PluginError protobuf message.
        """
        scalar_defaults = {
            "message": "Unknown error",
            "plugin_name": "unknown",
            "code": "",
            "mcp_error_code": -32603,
        }

        error = plugin_service_pb2.PluginError()
        for field_name, fallback in scalar_defaults.items():
            value = error_dict.get(field_name)
            # dict.get(key, default) does not apply the default when the key is
            # present with None (a dumped model may carry explicit None values),
            # and protobuf scalar fields raise TypeError on None. Coalesce here
            # so the plugin's real error is not masked by a conversion failure.
            setattr(error, field_name, fallback if value is None else value)

        details = error_dict.get("details")
        if details:
            json_format.ParseDict(details, error.details)

        return error

215 

216 

class GrpcHealthServicer(plugin_service_pb2_grpc.HealthServicer):
    """Health-check servicer implementing the gRPC health protocol.

    Load balancers and orchestration systems can call this servicer to
    confirm that the plugin server process is up and answering requests.

    Examples:
        >>> servicer = GrpcHealthServicer()
        >>> # Register with gRPC server
    """

    def __init__(self, plugin_server: ExternalPluginServer | None = None) -> None:
        """Create the health servicer.

        Args:
            plugin_server: Optional ExternalPluginServer kept for future
                plugin-level health checks; it is stored but not consulted
                by ``Check``.
        """
        self._plugin_server = plugin_server

    async def Check(  # pylint: disable=invalid-overridden-method
        self,
        request: plugin_service_pb2.HealthCheckRequest,
        context: grpc.aio.ServicerContext,
    ) -> plugin_service_pb2.HealthCheckResponse:
        """Report the serving status of the server.

        Args:
            request: Health check request with optional service name.
            context: gRPC servicer context.

        Returns:
            Health check response whose status is always ``SERVING``.
        """
        # An empty service name means the caller is asking about the server as a whole.
        service_label = request.service or "(overall)"
        logger.debug("Health check called for service: %s", service_label)

        # Being able to answer at all is currently the only health criterion;
        # the stored plugin server is not inspected.
        serving_status = plugin_service_pb2.HealthCheckResponse.SERVING
        return plugin_service_pb2.HealthCheckResponse(status=serving_status)