Coverage for mcpgateway / plugins / framework / external / grpc / server / runtime.py: 100%
99 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/plugins/framework/external/grpc/server/runtime.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Teryl Taylor
7gRPC server runtime for external plugins.
9This module provides the entry point for running a gRPC server that exposes
10plugin functionality. It reuses the ExternalPluginServer for plugin loading
11and wraps it with gRPC servicers.
13Usage:
14 python -m mcpgateway.plugins.framework.external.grpc.server.runtime \\
15 --config plugins/config.yaml \\
16 --host 0.0.0.0 \\
17 --port 50051
19Environment Variables:
20 PLUGINS_CONFIG_PATH: Path to plugins configuration file (default: ./resources/plugins/config.yaml)
21 PLUGINS_GRPC_SERVER_HOST: Server host (default: 0.0.0.0)
22 PLUGINS_GRPC_SERVER_PORT: Server port (default: 50051)
23 PLUGINS_GRPC_SERVER_UDS: Unix domain socket path (alternative to host:port)
24 PLUGINS_GRPC_SERVER_SSL_ENABLED: Enable TLS (true/false). Required to enable TLS. Not supported with UDS.
25 PLUGINS_GRPC_SERVER_SSL_CERTFILE: Path to server certificate (required when SSL_ENABLED=true)
26 PLUGINS_GRPC_SERVER_SSL_KEYFILE: Path to server private key (required when SSL_ENABLED=true)
27 PLUGINS_GRPC_SERVER_SSL_CA_CERTS: Path to CA bundle for client verification (for mTLS)
28 PLUGINS_GRPC_SERVER_SSL_CLIENT_AUTH: Client auth requirement (none/optional/require, default: require)
29"""
31# Standard
32import argparse
33import asyncio
34import logging
35import os
36import signal
37import sys
38from typing import Optional
40# Third-Party
41import grpc
43# First-Party
44from mcpgateway.plugins.framework.external.grpc.proto import plugin_service_pb2_grpc
45from mcpgateway.plugins.framework.external.grpc.server.server import GrpcHealthServicer, GrpcPluginServicer
46from mcpgateway.plugins.framework.external.grpc.tls_utils import create_server_credentials
47from mcpgateway.plugins.framework.external.mcp.server.server import ExternalPluginServer
48from mcpgateway.plugins.framework.models import GRPCServerConfig
50logger = logging.getLogger(__name__)
53class GrpcPluginRuntime:
54 """Runtime manager for the gRPC plugin server.
56 This class handles the lifecycle of the gRPC server, including:
57 - Plugin server initialization
58 - gRPC server setup and configuration
59 - TLS/mTLS configuration
60 - Graceful shutdown handling
62 Examples:
63 >>> runtime = GrpcPluginRuntime(config_path="plugins/config.yaml")
64 >>> # In async context:
65 >>> # await runtime.start()
66 """
68 def __init__(
69 self,
70 config_path: Optional[str] = None,
71 host: Optional[str] = None,
72 port: Optional[int] = None,
73 ) -> None:
74 """Initialize the gRPC plugin runtime.
76 Args:
77 config_path: Path to the plugins configuration file.
78 host: Server host to bind to (overrides config/env).
79 port: Server port to bind to (overrides config/env).
80 """
81 self._config_path = config_path
82 self._host_override = host
83 self._port_override = port
84 self._server: Optional[grpc.aio.Server] = None
85 self._plugin_server: Optional[ExternalPluginServer] = None
86 self._shutdown_event = asyncio.Event()
88 async def start(self) -> None:
89 """Start the gRPC plugin server.
91 This method:
92 1. Creates and initializes the ExternalPluginServer
93 2. Creates the gRPC server with servicers
94 3. Configures TLS if enabled
95 4. Starts serving requests
97 Raises:
98 RuntimeError: If server fails to start.
99 """
100 logger.info("Starting gRPC plugin server...")
102 # Create and initialize the plugin server
103 self._plugin_server = ExternalPluginServer(config_path=self._config_path)
104 await self._plugin_server.initialize()
106 # Get server configuration
107 server_config = self._get_server_config()
109 # Determine bind address (UDS takes precedence, then overrides, then config)
110 if server_config.uds:
111 address = server_config.get_bind_address()
112 is_uds = True
113 else:
114 host = self._host_override or server_config.host
115 port = self._port_override or server_config.port
116 address = f"{host}:{port}"
117 is_uds = False
119 # Create gRPC server
120 self._server = grpc.aio.server()
122 # Add servicers
123 plugin_servicer = GrpcPluginServicer(self._plugin_server)
124 health_servicer = GrpcHealthServicer(self._plugin_server)
126 plugin_service_pb2_grpc.add_PluginServiceServicer_to_server(plugin_servicer, self._server)
127 plugin_service_pb2_grpc.add_HealthServicer_to_server(health_servicer, self._server)
129 # Configure address and TLS (TLS not supported for Unix domain sockets)
130 if not is_uds and server_config.tls is not None:
131 credentials = create_server_credentials(server_config.tls)
132 self._server.add_secure_port(address, credentials)
133 logger.info("gRPC server configured with TLS on %s", address)
134 else:
135 self._server.add_insecure_port(address)
136 if is_uds:
137 logger.info("gRPC server configured on Unix socket %s", server_config.uds)
138 else:
139 logger.warning("gRPC server configured WITHOUT TLS on %s - not recommended for production", address)
141 # Start serving
142 await self._server.start()
144 # Set restrictive permissions on Unix socket (owner read/write only)
145 if is_uds and server_config.uds and os.path.exists(server_config.uds):
146 os.chmod(server_config.uds, 0o600)
148 logger.info("gRPC plugin server started on %s", address)
149 logger.info("Loaded %d plugins", len(await self._plugin_server.get_plugin_configs()))
151 # Wait for shutdown signal
152 await self._shutdown_event.wait()
154 async def stop(self) -> None:
155 """Stop the gRPC plugin server gracefully.
157 This method:
158 1. Stops accepting new connections
159 2. Waits for existing connections to complete (with timeout)
160 3. Shuts down the plugin server
161 """
162 logger.info("Stopping gRPC plugin server...")
164 if self._server:
165 # Stop accepting new connections
166 await self._server.stop(grace=5.0)
167 logger.info("gRPC server stopped")
169 if self._plugin_server:
170 await self._plugin_server.shutdown()
171 logger.info("Plugin server shutdown complete")
173 def request_shutdown(self) -> None:
174 """Request the server to shut down."""
175 self._shutdown_event.set()
177 def _get_server_config(self) -> GRPCServerConfig:
178 """Get the gRPC server configuration.
180 Checks the plugin configuration file first, then falls back to
181 environment variables, then uses defaults.
183 Returns:
184 GRPCServerConfig with server settings.
185 """
186 # Check if config has gRPC server settings
187 if self._plugin_server:
188 grpc_config = self._plugin_server.get_grpc_server_config()
189 if grpc_config:
190 return grpc_config
192 # Fall back to environment variables
193 env_config = GRPCServerConfig.from_env()
194 if env_config:
195 return env_config
197 # Use defaults
198 return GRPCServerConfig()
201async def run_server(
202 config_path: Optional[str] = None,
203 host: Optional[str] = None,
204 port: Optional[int] = None,
205) -> None:
206 """Run the gRPC plugin server.
208 Args:
209 config_path: Path to the plugins configuration file.
210 host: Server host to bind to.
211 port: Server port to bind to.
212 """
213 runtime = GrpcPluginRuntime(
214 config_path=config_path,
215 host=host,
216 port=port,
217 )
219 # Set up signal handlers for graceful shutdown
220 loop = asyncio.get_running_loop()
222 def signal_handler() -> None:
223 """Handle SIGINT/SIGTERM by requesting graceful shutdown."""
224 logger.info("Received shutdown signal")
225 runtime.request_shutdown()
227 for sig in (signal.SIGINT, signal.SIGTERM):
228 loop.add_signal_handler(sig, signal_handler)
230 try:
231 await runtime.start()
232 finally:
233 await runtime.stop()
236def main() -> None:
237 """Main entry point for the gRPC plugin server."""
238 parser = argparse.ArgumentParser(
239 description="gRPC server for MCP Gateway external plugins",
240 formatter_class=argparse.RawDescriptionHelpFormatter,
241 epilog="""
242Examples:
243 # Start with default settings
244 python -m mcpgateway.plugins.framework.external.grpc.server.runtime
246 # Start with custom config and port
247 python -m mcpgateway.plugins.framework.external.grpc.server.runtime \\
248 --config plugins/config.yaml --port 50051
250 # Start with TLS enabled (configure via environment variables)
251 PLUGINS_GRPC_SERVER_SSL_ENABLED=true \\
252 PLUGINS_GRPC_SERVER_SSL_CERTFILE=/path/to/server.pem \\
253 PLUGINS_GRPC_SERVER_SSL_KEYFILE=/path/to/server-key.pem \\
254 PLUGINS_GRPC_SERVER_SSL_CA_CERTS=/path/to/ca.pem \\
255 python -m mcpgateway.plugins.framework.external.grpc.server.runtime
256 """,
257 )
259 parser.add_argument(
260 "--config",
261 "-c",
262 type=str,
263 default=os.environ.get("PLUGINS_CONFIG_PATH"),
264 help="Path to plugins configuration file",
265 )
266 parser.add_argument(
267 "--host",
268 "-H",
269 type=str,
270 default=None,
271 help="Server host to bind to (default: 0.0.0.0)",
272 )
273 parser.add_argument(
274 "--port",
275 "-p",
276 type=int,
277 default=None,
278 help="Server port to bind to (default: 50051)",
279 )
280 parser.add_argument(
281 "--log-level",
282 "-l",
283 type=str,
284 default="INFO",
285 choices=["DEBUG", "INFO", "WARNING", "ERROR"],
286 help="Logging level (default: INFO)",
287 )
289 args = parser.parse_args()
291 # Configure logging
292 logging.basicConfig(
293 level=getattr(logging, args.log_level),
294 format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
295 )
297 # Run the server
298 try:
299 asyncio.run(
300 run_server(
301 config_path=args.config,
302 host=args.host,
303 port=args.port,
304 )
305 )
306 except KeyboardInterrupt:
307 logger.info("Server shutdown complete")
308 sys.exit(0)
309 except Exception as e:
310 logger.error("Server failed: %s", e)
311 sys.exit(1)
314if __name__ == "__main__":
315 main()