Coverage for mcpgateway / plugins / framework / external / grpc / server / runtime.py: 100%

99 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/plugins/framework/external/grpc/server/runtime.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Teryl Taylor 

6 

7gRPC server runtime for external plugins. 

8 

9This module provides the entry point for running a gRPC server that exposes 

10plugin functionality. It reuses the ExternalPluginServer for plugin loading 

11and wraps it with gRPC servicers. 

12 

13Usage: 

14 python -m mcpgateway.plugins.framework.external.grpc.server.runtime \\ 

15 --config plugins/config.yaml \\ 

16 --host 0.0.0.0 \\ 

17 --port 50051 

18 

19Environment Variables: 

20 PLUGINS_CONFIG_PATH: Path to plugins configuration file (default: ./resources/plugins/config.yaml) 

21 PLUGINS_GRPC_SERVER_HOST: Server host (default: 0.0.0.0) 

22 PLUGINS_GRPC_SERVER_PORT: Server port (default: 50051) 

23 PLUGINS_GRPC_SERVER_UDS: Unix domain socket path (alternative to host:port) 

24 PLUGINS_GRPC_SERVER_SSL_ENABLED: Enable TLS (true/false). Required to enable TLS. Not supported with UDS. 

25 PLUGINS_GRPC_SERVER_SSL_CERTFILE: Path to server certificate (required when SSL_ENABLED=true) 

26 PLUGINS_GRPC_SERVER_SSL_KEYFILE: Path to server private key (required when SSL_ENABLED=true) 

27 PLUGINS_GRPC_SERVER_SSL_CA_CERTS: Path to CA bundle for client verification (for mTLS) 

28 PLUGINS_GRPC_SERVER_SSL_CLIENT_AUTH: Client auth requirement (none/optional/require, default: require) 

29""" 

30 

31# Standard 

32import argparse 

33import asyncio 

34import logging 

35import os 

36import signal 

37import sys 

38from typing import Optional 

39 

40# Third-Party 

41import grpc 

42 

43# First-Party 

44from mcpgateway.plugins.framework.external.grpc.proto import plugin_service_pb2_grpc 

45from mcpgateway.plugins.framework.external.grpc.server.server import GrpcHealthServicer, GrpcPluginServicer 

46from mcpgateway.plugins.framework.external.grpc.tls_utils import create_server_credentials 

47from mcpgateway.plugins.framework.external.mcp.server.server import ExternalPluginServer 

48from mcpgateway.plugins.framework.models import GRPCServerConfig 

49 

50logger = logging.getLogger(__name__) 

51 

52 

class GrpcPluginRuntime:
    """Runtime manager for the gRPC plugin server.

    This class handles the lifecycle of the gRPC server, including:
    - Plugin server initialization
    - gRPC server setup and configuration
    - TLS/mTLS configuration
    - Graceful shutdown handling

    Examples:
        >>> runtime = GrpcPluginRuntime(config_path="plugins/config.yaml")
        >>> # In async context:
        >>> # await runtime.start()
    """

    def __init__(
        self,
        config_path: Optional[str] = None,
        host: Optional[str] = None,
        port: Optional[int] = None,
    ) -> None:
        """Initialize the gRPC plugin runtime.

        Nothing is started here; the servers are created lazily in ``start()``.

        Args:
            config_path: Path to the plugins configuration file.
            host: Server host to bind to (overrides config/env).
            port: Server port to bind to (overrides config/env).
        """
        self._config_path = config_path
        self._host_override = host
        self._port_override = port
        self._server: Optional[grpc.aio.Server] = None
        self._plugin_server: Optional[ExternalPluginServer] = None
        # Set by request_shutdown(); start() blocks on it after the server is up.
        self._shutdown_event = asyncio.Event()

    async def start(self) -> None:
        """Start the gRPC plugin server and block until shutdown is requested.

        This method:
        1. Creates and initializes the ExternalPluginServer
        2. Creates the gRPC server with servicers
        3. Configures TLS if enabled
        4. Starts serving requests
        5. Waits until ``request_shutdown()`` is called

        Raises:
            RuntimeError: If server fails to start.
        """
        logger.info("Starting gRPC plugin server...")

        # Create and initialize the plugin server
        self._plugin_server = ExternalPluginServer(config_path=self._config_path)
        await self._plugin_server.initialize()

        # Get server configuration
        server_config = self._get_server_config()

        # Determine bind address (UDS takes precedence, then overrides, then config)
        is_uds = bool(server_config.uds)
        if is_uds:
            address = server_config.get_bind_address()
            if self._host_override is not None or self._port_override is not None:
                # Make the precedence visible instead of silently dropping the overrides.
                logger.warning("Ignoring host/port overrides because Unix socket %s is configured", server_config.uds)
        else:
            host = self._host_override or server_config.host
            # Compare against None explicitly: port 0 is falsy but is a valid
            # override meaning "let the OS pick a free port".
            port = self._port_override if self._port_override is not None else server_config.port
            address = f"{host}:{port}"

        # Create gRPC server
        self._server = grpc.aio.server()

        # Add servicers
        plugin_servicer = GrpcPluginServicer(self._plugin_server)
        health_servicer = GrpcHealthServicer(self._plugin_server)

        plugin_service_pb2_grpc.add_PluginServiceServicer_to_server(plugin_servicer, self._server)
        plugin_service_pb2_grpc.add_HealthServicer_to_server(health_servicer, self._server)

        # Configure address and TLS (TLS not supported for Unix domain sockets)
        if not is_uds and server_config.tls is not None:
            credentials = create_server_credentials(server_config.tls)
            self._server.add_secure_port(address, credentials)
            logger.info("gRPC server configured with TLS on %s", address)
        else:
            self._server.add_insecure_port(address)
            if is_uds:
                logger.info("gRPC server configured on Unix socket %s", server_config.uds)
            else:
                logger.warning("gRPC server configured WITHOUT TLS on %s - not recommended for production", address)

        # Start serving
        await self._server.start()

        # Set restrictive permissions on Unix socket (owner read/write only)
        if is_uds and server_config.uds and os.path.exists(server_config.uds):
            os.chmod(server_config.uds, 0o600)

        logger.info("gRPC plugin server started on %s", address)
        logger.info("Loaded %d plugins", len(await self._plugin_server.get_plugin_configs()))

        # Wait for shutdown signal
        await self._shutdown_event.wait()

    async def stop(self) -> None:
        """Stop the gRPC plugin server gracefully.

        This method:
        1. Stops the gRPC server, allowing a 5 second grace period
        2. Shuts down the plugin server

        The plugin server is shut down even if stopping the gRPC server
        raises, so loaded plugins always get a chance to clean up. Either
        step is skipped when the corresponding server was never created.
        """
        logger.info("Stopping gRPC plugin server...")

        try:
            if self._server:
                # Stop accepting new connections
                await self._server.stop(grace=5.0)
                logger.info("gRPC server stopped")
        finally:
            if self._plugin_server:
                await self._plugin_server.shutdown()
                logger.info("Plugin server shutdown complete")

    def request_shutdown(self) -> None:
        """Request the server to shut down by releasing the wait in ``start()``."""
        self._shutdown_event.set()

    def _get_server_config(self) -> GRPCServerConfig:
        """Get the gRPC server configuration.

        Checks the plugin configuration file first, then falls back to
        environment variables, then uses defaults.

        Returns:
            GRPCServerConfig with server settings.
        """
        # Check if config has gRPC server settings
        if self._plugin_server:
            grpc_config = self._plugin_server.get_grpc_server_config()
            if grpc_config:
                return grpc_config

        # Fall back to environment variables
        env_config = GRPCServerConfig.from_env()
        if env_config:
            return env_config

        # Use defaults
        return GRPCServerConfig()

199 

200 

async def run_server(
    config_path: Optional[str] = None,
    host: Optional[str] = None,
    port: Optional[int] = None,
) -> None:
    """Run the gRPC plugin server until SIGINT/SIGTERM is received.

    Installs signal handlers that request a graceful shutdown, starts the
    runtime, and always stops it again on the way out (including when
    startup fails).

    Args:
        config_path: Path to the plugins configuration file.
        host: Server host to bind to.
        port: Server port to bind to.
    """
    runtime = GrpcPluginRuntime(
        config_path=config_path,
        host=host,
        port=port,
    )

    # Set up signal handlers for graceful shutdown
    loop = asyncio.get_running_loop()

    def signal_handler() -> None:
        """Handle SIGINT/SIGTERM by requesting graceful shutdown."""
        logger.info("Received shutdown signal")
        runtime.request_shutdown()

    # Signals installed through signal.signal() mapped to the handler they
    # replaced, so they can be restored once the server has stopped.
    replaced_handlers = {}

    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, signal_handler)
        except NotImplementedError:
            # Some event loops (e.g. on Windows) do not implement
            # add_signal_handler. Fall back to a process-level handler that
            # hands the request over to the loop thread-safely.
            replaced_handlers[sig] = signal.signal(
                sig,
                lambda _signum, _frame: loop.call_soon_threadsafe(signal_handler),
            )

    try:
        await runtime.start()
    finally:
        try:
            await runtime.stop()
        finally:
            for sig, previous in replaced_handlers.items():
                # signal.signal() returns None when the previous handler was
                # not installed from Python; there is nothing to restore then.
                if previous is not None:
                    signal.signal(sig, previous)

234 

235 

def main() -> None:
    """Main entry point for the gRPC plugin server.

    Parses command-line arguments, configures logging, and runs the server
    until it shuts down. Exits with status 0 on KeyboardInterrupt and with
    status 1 when the server fails with any other exception.
    """
    parser = argparse.ArgumentParser(
        description="gRPC server for MCP Gateway external plugins",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Start with default settings
  python -m mcpgateway.plugins.framework.external.grpc.server.runtime

  # Start with custom config and port
  python -m mcpgateway.plugins.framework.external.grpc.server.runtime \\
      --config plugins/config.yaml --port 50051

  # Start with TLS enabled (configure via environment variables)
  PLUGINS_GRPC_SERVER_SSL_ENABLED=true \\
  PLUGINS_GRPC_SERVER_SSL_CERTFILE=/path/to/server.pem \\
  PLUGINS_GRPC_SERVER_SSL_KEYFILE=/path/to/server-key.pem \\
  PLUGINS_GRPC_SERVER_SSL_CA_CERTS=/path/to/ca.pem \\
  python -m mcpgateway.plugins.framework.external.grpc.server.runtime
        """,
    )

    parser.add_argument(
        "--config",
        "-c",
        type=str,
        default=os.environ.get("PLUGINS_CONFIG_PATH"),
        help="Path to plugins configuration file",
    )
    parser.add_argument(
        "--host",
        "-H",
        type=str,
        default=None,
        help="Server host to bind to (default: 0.0.0.0)",
    )
    parser.add_argument(
        "--port",
        "-p",
        type=int,
        default=None,
        help="Server port to bind to (default: 50051)",
    )
    parser.add_argument(
        "--log-level",
        "-l",
        # Normalise case before the choices check so "--log-level debug" works.
        type=str.upper,
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
        help="Logging level (default: INFO)",
    )

    args = parser.parse_args()

    # Configure logging
    logging.basicConfig(
        level=getattr(logging, args.log_level),
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    # Run the server
    try:
        asyncio.run(
            run_server(
                config_path=args.config,
                host=args.host,
                port=args.port,
            )
        )
    except KeyboardInterrupt:
        logger.info("Server shutdown complete")
        sys.exit(0)
    except Exception as e:
        # Top-level boundary: keep the traceback in the log so startup and
        # runtime failures can be diagnosed, then exit non-zero.
        logger.error("Server failed: %s", e, exc_info=True)
        sys.exit(1)

312 

313 

# Script entry point: run the server only when this module is executed directly.
if __name__ == "__main__":
    main()