Coverage for mcpgateway / plugins / framework / external / grpc / server / runtime.py: 100%
101 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/plugins/framework/external/grpc/server/runtime.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Teryl Taylor
7gRPC server runtime for external plugins.
9This module provides the entry point for running a gRPC server that exposes
10plugin functionality. It reuses the ExternalPluginServer for plugin loading
11and wraps it with gRPC servicers.
13Usage:
14 python -m mcpgateway.plugins.framework.external.grpc.server.runtime \\
15 --config plugins/config.yaml \\
16 --host 0.0.0.0 \\
17 --port 50051
19Environment Variables:
20 PLUGINS_CONFIG_PATH: Path to plugins configuration file (default: ./resources/plugins/config.yaml)
21 PLUGINS_GRPC_SERVER_HOST: Server host (default: 0.0.0.0)
22 PLUGINS_GRPC_SERVER_PORT: Server port (default: 50051)
23 PLUGINS_GRPC_SERVER_UDS: Unix domain socket path (alternative to host:port)
24 PLUGINS_GRPC_SERVER_SSL_ENABLED: Enable TLS (true/false). Required to enable TLS. Not supported with UDS.
25 PLUGINS_GRPC_SERVER_SSL_CERTFILE: Path to server certificate (required when SSL_ENABLED=true)
26 PLUGINS_GRPC_SERVER_SSL_KEYFILE: Path to server private key (required when SSL_ENABLED=true)
27 PLUGINS_GRPC_SERVER_SSL_CA_CERTS: Path to CA bundle for client verification (for mTLS)
28 PLUGINS_GRPC_SERVER_SSL_CLIENT_AUTH: Client auth requirement (none/optional/require, default: require)
29"""
# Standard
import argparse
import asyncio
import logging
import os
import signal
import sys
from typing import Optional, Sequence

# Third-Party
import grpc

# First-Party
from mcpgateway.plugins.framework.external.grpc.proto import plugin_service_pb2_grpc
from mcpgateway.plugins.framework.external.grpc.server.server import GrpcHealthServicer, GrpcPluginServicer
from mcpgateway.plugins.framework.external.grpc.tls_utils import create_server_credentials
from mcpgateway.plugins.framework.external.mcp.server.server import ExternalPluginServer
from mcpgateway.plugins.framework.models import GRPCServerConfig
from mcpgateway.plugins.framework.settings import get_settings
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class GrpcPluginRuntime:
    """Runtime manager for the gRPC plugin server.

    This class handles the lifecycle of the gRPC server, including:
    - Plugin server initialization
    - gRPC server setup and configuration
    - TLS/mTLS configuration
    - Graceful shutdown handling

    Examples:
        >>> runtime = GrpcPluginRuntime(config_path="plugins/config.yaml")
        >>> # In async context:
        >>> # await runtime.start()
    """

    def __init__(
        self,
        config_path: Optional[str] = None,
        host: Optional[str] = None,
        port: Optional[int] = None,
    ) -> None:
        """Initialize the gRPC plugin runtime.

        Nothing is started here; the plugin server and the gRPC server are
        created lazily by :meth:`start`.

        Args:
            config_path: Path to the plugins configuration file.
            host: Server host to bind to (overrides config/env).
            port: Server port to bind to (overrides config/env). ``None``
                means "no override"; any integer, including ``0``, is
                honoured as an explicit override.
        """
        self._config_path = config_path
        self._host_override = host
        self._port_override = port
        self._server: Optional[grpc.aio.Server] = None
        self._plugin_server: Optional[ExternalPluginServer] = None
        # Set by request_shutdown(); start() blocks on it after serving begins.
        self._shutdown_event = asyncio.Event()

    async def start(self) -> None:
        """Start the gRPC plugin server and block until shutdown is requested.

        This method:
        1. Creates and initializes the ExternalPluginServer
        2. Resolves the bind address (Unix socket, or host:port)
        3. Creates the gRPC server and registers the plugin/health servicers
        4. Binds a secure port when TLS is configured, otherwise an insecure one
        5. Starts serving and waits for :meth:`request_shutdown`

        No exception is caught here: anything raised while initializing the
        plugin server, binding the port, or starting the gRPC server
        propagates to the caller.
        """
        logger.info("Starting gRPC plugin server...")

        # Create and initialize the plugin server
        self._plugin_server = ExternalPluginServer(config_path=self._config_path)
        await self._plugin_server.initialize()

        # Get server configuration
        server_config = self._get_server_config()

        # Determine bind address (UDS takes precedence, then overrides, then config)
        is_uds = bool(server_config.uds)
        if is_uds:
            address = server_config.get_bind_address()
        else:
            address = self._resolve_tcp_address(server_config)

        # Create gRPC server
        self._server = grpc.aio.server()

        # Add servicers
        plugin_servicer = GrpcPluginServicer(self._plugin_server)
        health_servicer = GrpcHealthServicer(self._plugin_server)

        plugin_service_pb2_grpc.add_PluginServiceServicer_to_server(plugin_servicer, self._server)
        plugin_service_pb2_grpc.add_HealthServicer_to_server(health_servicer, self._server)

        # Configure address and TLS (TLS not supported for Unix domain sockets)
        if not is_uds and server_config.tls is not None:
            credentials = create_server_credentials(server_config.tls)
            self._server.add_secure_port(address, credentials)
            logger.info("gRPC server configured with TLS on %s", address)
        else:
            self._server.add_insecure_port(address)
            if is_uds:
                logger.info("gRPC server configured on Unix socket %s", server_config.uds)
            else:
                logger.warning("gRPC server configured WITHOUT TLS on %s - not recommended for production", address)

        # Start serving
        await self._server.start()

        # Set restrictive permissions on Unix socket (owner read/write only)
        if is_uds and server_config.uds and os.path.exists(server_config.uds):
            os.chmod(server_config.uds, 0o600)

        logger.info("gRPC plugin server started on %s", address)
        logger.info("Loaded %d plugins", len(await self._plugin_server.get_plugin_configs()))

        # Wait for shutdown signal
        await self._shutdown_event.wait()

    async def stop(self) -> None:
        """Stop the gRPC plugin server gracefully.

        This method:
        1. Stops the gRPC server with a 5 second grace period
        2. Shuts down the plugin server

        The plugin server is shut down even when stopping the gRPC server
        raises, so a failure in step 1 does not leave plugins running. Either
        step is skipped when the corresponding object was never created.
        """
        logger.info("Stopping gRPC plugin server...")

        try:
            if self._server:
                await self._server.stop(grace=5.0)
                logger.info("gRPC server stopped")
        finally:
            if self._plugin_server:
                await self._plugin_server.shutdown()
                logger.info("Plugin server shutdown complete")

    def request_shutdown(self) -> None:
        """Request the server to shut down by releasing the wait in :meth:`start`."""
        self._shutdown_event.set()

    def _resolve_tcp_address(self, server_config: "GRPCServerConfig") -> str:
        """Build the ``host:port`` bind address for a non-UDS server.

        Args:
            server_config: Configuration supplying the fallback host and port.

        Returns:
            The address string, using constructor overrides where given.
        """
        host = self._host_override or server_config.host
        # Compare against None rather than relying on truthiness: port 0 is a
        # valid explicit request and must not fall back to the configured port.
        if self._port_override is not None:
            port = self._port_override
        else:
            port = server_config.port
        return f"{host}:{port}"

    def _get_server_config(self) -> "GRPCServerConfig":
        """Get the gRPC server configuration.

        Checks the plugin configuration file first, then falls back to
        environment variables, then uses defaults.

        Returns:
            GRPCServerConfig with server settings.
        """
        # Check if config has gRPC server settings
        if self._plugin_server:
            grpc_config = self._plugin_server.get_grpc_server_config()
            if grpc_config:
                return grpc_config

        # Fall back to environment variables
        env_config = GRPCServerConfig.from_env()
        if env_config:
            return env_config

        # Use defaults
        return GRPCServerConfig()
async def run_server(
    config_path: Optional[str] = None,
    host: Optional[str] = None,
    port: Optional[int] = None,
) -> None:
    """Run the gRPC plugin server until SIGINT/SIGTERM is received.

    Builds a GrpcPluginRuntime, installs shutdown signal handlers, runs
    ``runtime.start()`` and always calls ``runtime.stop()`` afterwards, whether
    the start call returned normally or raised.

    Args:
        config_path: Path to the plugins configuration file.
        host: Server host to bind to.
        port: Server port to bind to.
    """
    runtime = GrpcPluginRuntime(
        config_path=config_path,
        host=host,
        port=port,
    )

    # Set up signal handlers for graceful shutdown
    loop = asyncio.get_running_loop()

    def signal_handler() -> None:
        """Handle SIGINT/SIGTERM by requesting graceful shutdown."""
        logger.info("Received shutdown signal")
        runtime.request_shutdown()

    def fallback_signal_handler(_signum: int, _frame: object) -> None:
        """Forward a ``signal.signal`` callback onto the event loop."""
        # The handler runs outside the loop's normal callback scheduling, so
        # hand the work over in a thread-safe way instead of touching the
        # asyncio.Event directly.
        loop.call_soon_threadsafe(signal_handler)

    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, signal_handler)
        except NotImplementedError:
            # Event loops without Unix signal support (e.g. on Windows) do not
            # implement add_signal_handler; fall back to the signal module.
            signal.signal(sig, fallback_signal_handler)

    try:
        await runtime.start()
    finally:
        await runtime.stop()
def main(argv: Optional[Sequence[str]] = None) -> None:
    """Main entry point for the gRPC plugin server.

    Parses command-line options, configures logging, and runs the server via
    ``asyncio.run``. The process exits with status 0 on KeyboardInterrupt and
    status 1 when the server raises any other exception.

    Args:
        argv: Command-line arguments to parse, excluding the program name.
            When ``None`` (the default) argparse reads ``sys.argv[1:]``, so
            existing callers and console-script entry points are unaffected.
    """
    parser = argparse.ArgumentParser(
        description="gRPC server for ContextForge external plugins",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Start with default settings
  python -m mcpgateway.plugins.framework.external.grpc.server.runtime

  # Start with custom config and port
  python -m mcpgateway.plugins.framework.external.grpc.server.runtime \\
      --config plugins/config.yaml --port 50051

  # Start with TLS enabled (configure via environment variables)
  PLUGINS_GRPC_SERVER_SSL_ENABLED=true \\
  PLUGINS_GRPC_SERVER_SSL_CERTFILE=/path/to/server.pem \\
  PLUGINS_GRPC_SERVER_SSL_KEYFILE=/path/to/server-key.pem \\
  PLUGINS_GRPC_SERVER_SSL_CA_CERTS=/path/to/ca.pem \\
  python -m mcpgateway.plugins.framework.external.grpc.server.runtime
        """,
    )

    parser.add_argument(
        "--config",
        "-c",
        type=str,
        default=None,
        help="Path to plugins configuration file",
    )
    parser.add_argument(
        "--host",
        "-H",
        type=str,
        default=None,
        help="Server host to bind to (default: 0.0.0.0)",
    )
    parser.add_argument(
        "--port",
        "-p",
        type=int,
        default=None,
        help="Server port to bind to (default: 50051)",
    )
    parser.add_argument(
        "--log-level",
        "-l",
        type=str,
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
        help="Logging level (default: INFO)",
    )

    # argv=None makes argparse fall back to sys.argv[1:].
    args = parser.parse_args(argv)

    # Configure logging
    logging.basicConfig(
        level=getattr(logging, args.log_level),
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    # Run the server; --config wins over the settings-provided path.
    config_path = args.config or get_settings().config_path
    try:
        asyncio.run(
            run_server(
                config_path=config_path,
                host=args.host,
                port=args.port,
            )
        )
    except KeyboardInterrupt:
        logger.info("Server shutdown complete")
        sys.exit(0)
    except Exception as e:
        logger.error("Server failed: %s", e)
        sys.exit(1)
# Run the CLI only when this module is executed as a script (e.g. via
# ``python -m``), not when it is imported.
if __name__ == "__main__":
    main()