Coverage for mcpgateway / plugins / framework / external / grpc / server / runtime.py: 100%

101 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

# -*- coding: utf-8 -*-
"""Location: ./mcpgateway/plugins/framework/external/grpc/server/runtime.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Teryl Taylor

gRPC server runtime for external plugins.

This module provides the entry point for running a gRPC server that exposes
plugin functionality. It reuses the ExternalPluginServer for plugin loading
and wraps it with gRPC servicers.

Usage:
    python -m mcpgateway.plugins.framework.external.grpc.server.runtime \\
        --config plugins/config.yaml \\
        --host 0.0.0.0 \\
        --port 50051

Environment Variables:
    PLUGINS_CONFIG_PATH: Path to plugins configuration file (default: ./resources/plugins/config.yaml)
    PLUGINS_GRPC_SERVER_HOST: Server host (default: 0.0.0.0)
    PLUGINS_GRPC_SERVER_PORT: Server port (default: 50051)
    PLUGINS_GRPC_SERVER_UDS: Unix domain socket path (alternative to host:port)
    PLUGINS_GRPC_SERVER_SSL_ENABLED: Enable TLS (true/false). Required to enable TLS. Not supported with UDS.
    PLUGINS_GRPC_SERVER_SSL_CERTFILE: Path to server certificate (required when SSL_ENABLED=true)
    PLUGINS_GRPC_SERVER_SSL_KEYFILE: Path to server private key (required when SSL_ENABLED=true)
    PLUGINS_GRPC_SERVER_SSL_CA_CERTS: Path to CA bundle for client verification (for mTLS)
    PLUGINS_GRPC_SERVER_SSL_CLIENT_AUTH: Client auth requirement (none/optional/require, default: require)
"""

30 

31# Standard 

32import argparse 

33import asyncio 

34import logging 

35import os 

36import signal 

37import sys 

38from typing import Optional 

39 

40# Third-Party 

41import grpc 

42 

43# First-Party 

44from mcpgateway.plugins.framework.external.grpc.proto import plugin_service_pb2_grpc 

45from mcpgateway.plugins.framework.external.grpc.server.server import GrpcHealthServicer, GrpcPluginServicer 

46from mcpgateway.plugins.framework.external.grpc.tls_utils import create_server_credentials 

47from mcpgateway.plugins.framework.external.mcp.server.server import ExternalPluginServer 

48from mcpgateway.plugins.framework.models import GRPCServerConfig 

49from mcpgateway.plugins.framework.settings import get_settings 

50 

51logger = logging.getLogger(__name__) 

52 

53 

class GrpcPluginRuntime:
    """Runtime manager for the gRPC plugin server.

    This class handles the lifecycle of the gRPC server, including:
    - Plugin server initialization
    - gRPC server setup and servicer registration
    - TLS/mTLS configuration
    - Graceful shutdown handling

    Examples:
        >>> runtime = GrpcPluginRuntime(config_path="plugins/config.yaml")
        >>> # In async context:
        >>> # await runtime.start()
    """

    def __init__(
        self,
        config_path: Optional[str] = None,
        host: Optional[str] = None,
        port: Optional[int] = None,
    ) -> None:
        """Initialize the gRPC plugin runtime.

        Nothing is started here; the servers are created in ``start()``.

        Args:
            config_path: Path to the plugins configuration file.
            host: Server host to bind to (overrides config/env).
            port: Server port to bind to (overrides config/env).
        """
        self._config_path = config_path
        self._host_override = host
        self._port_override = port
        self._server: Optional[grpc.aio.Server] = None
        self._plugin_server: Optional[ExternalPluginServer] = None
        # Set by request_shutdown(); start() blocks on it once serving.
        self._shutdown_event = asyncio.Event()

    async def start(self) -> None:
        """Start the gRPC plugin server and block until shutdown is requested.

        This method:
        1. Creates and initializes the ExternalPluginServer
        2. Creates the gRPC server with the plugin and health servicers
        3. Binds a secure port if TLS is configured, otherwise an insecure one
        4. Starts serving requests
        5. Waits until ``request_shutdown()`` is called

        It does not stop the server itself; the caller is expected to call
        ``stop()`` afterwards.

        Raises:
            RuntimeError: If server fails to start.
        """
        logger.info("Starting gRPC plugin server...")

        # Create and initialize the plugin server
        self._plugin_server = ExternalPluginServer(config_path=self._config_path)
        await self._plugin_server.initialize()

        # Get server configuration
        server_config = self._get_server_config()

        # Determine bind address (UDS takes precedence, then overrides, then config)
        if server_config.uds:
            address = server_config.get_bind_address()
            is_uds = True
        else:
            host = self._host_override or server_config.host
            # Test against None rather than truthiness: 0 is a legitimate
            # override (it lets gRPC choose a free port) and must not be
            # silently replaced by the configured port.
            port = self._port_override if self._port_override is not None else server_config.port
            address = f"{host}:{port}"
            is_uds = False

        # Create gRPC server
        self._server = grpc.aio.server()

        # Add servicers
        plugin_servicer = GrpcPluginServicer(self._plugin_server)
        health_servicer = GrpcHealthServicer(self._plugin_server)

        plugin_service_pb2_grpc.add_PluginServiceServicer_to_server(plugin_servicer, self._server)
        plugin_service_pb2_grpc.add_HealthServicer_to_server(health_servicer, self._server)

        # Configure address and TLS (TLS not supported for Unix domain sockets)
        if not is_uds and server_config.tls is not None:
            credentials = create_server_credentials(server_config.tls)
            self._server.add_secure_port(address, credentials)
            logger.info("gRPC server configured with TLS on %s", address)
        else:
            self._server.add_insecure_port(address)
            if is_uds:
                logger.info("gRPC server configured on Unix socket %s", server_config.uds)
            else:
                logger.warning("gRPC server configured WITHOUT TLS on %s - not recommended for production", address)

        # Start serving
        await self._server.start()

        # Set restrictive permissions on Unix socket (owner read/write only)
        if is_uds and server_config.uds and os.path.exists(server_config.uds):
            os.chmod(server_config.uds, 0o600)

        logger.info("gRPC plugin server started on %s", address)
        logger.info("Loaded %d plugins", len(await self._plugin_server.get_plugin_configs()))

        # Wait for shutdown signal
        await self._shutdown_event.wait()

    async def stop(self) -> None:
        """Stop the gRPC plugin server gracefully.

        This method:
        1. Stops the gRPC server with a 5 second grace period
        2. Shuts down the plugin server

        The plugin server is shut down even if stopping the gRPC server
        raises, so plugin resources are not left behind. Either step is
        skipped when the corresponding server was never created.
        """
        logger.info("Stopping gRPC plugin server...")

        try:
            if self._server:
                # Stop accepting new connections
                await self._server.stop(grace=5.0)
                logger.info("gRPC server stopped")
        finally:
            if self._plugin_server:
                await self._plugin_server.shutdown()
                logger.info("Plugin server shutdown complete")

    def request_shutdown(self) -> None:
        """Request the server to shut down by releasing the wait in ``start()``."""
        self._shutdown_event.set()

    def _get_server_config(self) -> GRPCServerConfig:
        """Get the gRPC server configuration.

        Checks the plugin configuration file first, then falls back to
        environment variables, then uses defaults.

        Returns:
            GRPCServerConfig with server settings.
        """
        # Check if config has gRPC server settings
        if self._plugin_server:
            grpc_config = self._plugin_server.get_grpc_server_config()
            if grpc_config:
                return grpc_config

        # Fall back to environment variables
        env_config = GRPCServerConfig.from_env()
        if env_config:
            return env_config

        # Use defaults
        return GRPCServerConfig()

200 

201 

async def run_server(
    config_path: Optional[str] = None,
    host: Optional[str] = None,
    port: Optional[int] = None,
) -> None:
    """Run the gRPC plugin server until a shutdown signal arrives.

    SIGINT and SIGTERM are hooked on the running event loop so that either
    one requests a graceful shutdown. The runtime is always stopped on the
    way out, whether ``start()`` returned or raised.

    Args:
        config_path: Path to the plugins configuration file.
        host: Server host to bind to.
        port: Server port to bind to.
    """
    server_runtime = GrpcPluginRuntime(config_path=config_path, host=host, port=port)

    def _on_shutdown_signal() -> None:
        """Log the signal and ask the runtime to shut down gracefully."""
        logger.info("Received shutdown signal")
        server_runtime.request_shutdown()

    # Register the same callback for both termination signals.
    event_loop = asyncio.get_running_loop()
    event_loop.add_signal_handler(signal.SIGINT, _on_shutdown_signal)
    event_loop.add_signal_handler(signal.SIGTERM, _on_shutdown_signal)

    try:
        await server_runtime.start()
    finally:
        await server_runtime.stop()

235 

236 

def main() -> None:
    """Main entry point for the gRPC plugin server.

    Parses ``--config``, ``--host``, ``--port`` and ``--log-level``, configures
    logging, and runs ``run_server`` to completion. When ``--config`` is not
    given, the path comes from ``get_settings().config_path``.

    Exits with status 0 on ``KeyboardInterrupt`` and with status 1 on any other
    exception raised while running the server. In the latter case the failure
    is logged together with its traceback.
    """
    parser = argparse.ArgumentParser(
        description="gRPC server for ContextForge external plugins",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Start with default settings
  python -m mcpgateway.plugins.framework.external.grpc.server.runtime

  # Start with custom config and port
  python -m mcpgateway.plugins.framework.external.grpc.server.runtime \\
      --config plugins/config.yaml --port 50051

  # Start with TLS enabled (configure via environment variables)
  PLUGINS_GRPC_SERVER_SSL_ENABLED=true \\
  PLUGINS_GRPC_SERVER_SSL_CERTFILE=/path/to/server.pem \\
  PLUGINS_GRPC_SERVER_SSL_KEYFILE=/path/to/server-key.pem \\
  PLUGINS_GRPC_SERVER_SSL_CA_CERTS=/path/to/ca.pem \\
  python -m mcpgateway.plugins.framework.external.grpc.server.runtime
        """,
    )

    parser.add_argument(
        "--config",
        "-c",
        type=str,
        default=None,
        help="Path to plugins configuration file",
    )
    parser.add_argument(
        "--host",
        "-H",
        type=str,
        default=None,
        help="Server host to bind to (default: 0.0.0.0)",
    )
    parser.add_argument(
        "--port",
        "-p",
        type=int,
        default=None,
        help="Server port to bind to (default: 50051)",
    )
    parser.add_argument(
        "--log-level",
        "-l",
        type=str,
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
        help="Logging level (default: INFO)",
    )

    args = parser.parse_args()

    # Configure logging
    logging.basicConfig(
        level=getattr(logging, args.log_level),
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    # Run the server
    config_path = args.config or get_settings().config_path
    try:
        asyncio.run(
            run_server(
                config_path=config_path,
                host=args.host,
                port=args.port,
            )
        )
    except KeyboardInterrupt:
        logger.info("Server shutdown complete")
        sys.exit(0)
    except Exception as e:
        # Top-level boundary: keep the traceback in the log so startup
        # failures can be diagnosed from the log alone.
        logger.exception("Server failed: %s", e)
        sys.exit(1)

314 

315 

# Run the CLI only when executed as a script or via ``python -m``; importing
# the module must not start a server.
if __name__ == "__main__":
    main()