Coverage for mcpgateway / plugins / framework / external / mcp / server / runtime.py: 100%
153 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""Location: ./mcpgateway/plugins/framework/external/mcp/server/runtime.py
4Copyright 2025
5SPDX-License-Identifier: Apache-2.0
6Authors: Fred Araujo, Teryl Taylor
8MCP Plugin Runtime using FastMCP with SSL/TLS support.
10This runtime does the following:
11- Uses FastMCP from the MCP Python SDK
12- Supports both mTLS and non-mTLS configurations
13- Reads configuration from PLUGINS_SERVER_* environment variables or uses configurations
14 the plugin config.yaml
15- Implements all plugin hook tools (get_plugin_configs, tool_pre_invoke, etc.)
17Examples:
18 Create an SSL-capable FastMCP server:
20 >>> from mcpgateway.plugins.framework.models import MCPServerConfig
21 >>> config = MCPServerConfig(host="localhost", port=8000)
22 >>> server = SSLCapableFastMCP(server_config=config, name="TestServer")
23 >>> server.settings.host
24 'localhost'
25 >>> server.settings.port
26 8000
28 Check SSL configuration returns empty dict when TLS is not configured:
30 >>> from mcpgateway.plugins.framework.models import MCPServerConfig
31 >>> config = MCPServerConfig(host="127.0.0.1", port=8000, tls=None)
32 >>> server = SSLCapableFastMCP(server_config=config, name="NoTLSServer")
33 >>> ssl_config = server._get_ssl_config()
34 >>> ssl_config
35 {}
37 Verify server configuration is accessible:
39 >>> from mcpgateway.plugins.framework.models import MCPServerConfig
40 >>> config = MCPServerConfig(host="localhost", port=9000)
41 >>> server = SSLCapableFastMCP(server_config=config, name="ConfigTest")
42 >>> server.server_config.host
43 'localhost'
44 >>> server.server_config.port
45 9000
47 Settings are properly passed to FastMCP:
49 >>> from mcpgateway.plugins.framework.models import MCPServerConfig
50 >>> config = MCPServerConfig(host="0.0.0.0", port=8080)
51 >>> server = SSLCapableFastMCP(server_config=config, name="SettingsTest")
52 >>> server.settings.host
53 '0.0.0.0'
54 >>> server.settings.port
55 8080
56"""
58# Standard
59import asyncio
60import logging
61import os
62import sys
63from typing import Any, Dict, Literal
65# Third-Party
66from fastapi import Response, status
67from mcp.server.fastmcp import FastMCP
68from mcp.server.transport_security import TransportSecuritySettings
69from prometheus_client import Gauge, generate_latest, REGISTRY
70import uvicorn
72# First-Party
73from mcpgateway.plugins.framework import ExternalPluginServer, MCPServerConfig
74from mcpgateway.plugins.framework.constants import GET_PLUGIN_CONFIG, GET_PLUGIN_CONFIGS, INVOKE_HOOK, MCP_SERVER_INSTRUCTIONS, MCP_SERVER_NAME
75from mcpgateway.plugins.framework.settings import get_transport_settings
77logger = logging.getLogger(__name__)
79SERVER: ExternalPluginServer | None = None
81PLUGIN_INFO = Gauge(
82 "plugin_info",
83 "Plugin server information",
84 ["server_name", "transport", "ssl_enabled"],
85 registry=REGISTRY,
86)
88# Module-level tool functions (extracted for testability)
91async def get_plugin_configs() -> list[dict]:
92 """Get the plugin configurations installed on the server.
94 Returns:
95 JSON string containing list of plugin configuration dictionaries.
97 Raises:
98 RuntimeError: If plugin server not initialized.
100 Examples:
101 Function raises RuntimeError when server is not initialized:
103 >>> import asyncio
104 >>> asyncio.run(get_plugin_configs()) # doctest: +SKIP
105 Traceback (most recent call last):
106 ...
107 RuntimeError: Plugin server not initialized
108 """
109 if not SERVER:
110 raise RuntimeError("Plugin server not initialized")
111 return await SERVER.get_plugin_configs()
114async def get_plugin_config(name: str) -> dict:
115 """Get the plugin configuration for a specific plugin.
117 Args:
118 name: The name of the plugin
120 Returns:
121 JSON string containing plugin configuration dictionary.
123 Raises:
124 RuntimeError: If plugin server not initialized.
126 Examples:
127 Function returns empty dict when result is None:
129 >>> result = None
130 >>> result if result is not None else {}
131 {}
132 """
133 if not SERVER:
134 raise RuntimeError("Plugin server not initialized")
135 result = await SERVER.get_plugin_config(name)
136 if result is None:
137 return {}
138 return result
141async def invoke_hook(hook_type: str, plugin_name: str, payload: Dict[str, Any], context: Dict[str, Any]) -> dict:
142 """Execute a hook for a plugin.
144 Args:
145 hook_type: The name or type of the hook.
146 plugin_name: The name of the plugin to execute
147 payload: The resource payload to be analyzed
148 context: Contextual information
150 Returns:
151 Result dictionary with payload, context and any error information.
153 Raises:
154 RuntimeError: If plugin server not initialized.
156 Examples:
157 Function raises RuntimeError when server is not initialized:
159 >>> import asyncio
160 >>> asyncio.run(invoke_hook("hook", "plugin", {}, {})) # doctest: +SKIP
161 Traceback (most recent call last):
162 ...
163 RuntimeError: Plugin server not initialized
164 """
165 if not SERVER:
166 raise RuntimeError("Plugin server not initialized")
167 return await SERVER.invoke_hook(hook_type, plugin_name, payload, context)
170class SSLCapableFastMCP(FastMCP):
171 """FastMCP server with SSL/TLS support using MCPServerConfig.
173 Examples:
174 Create an SSL-capable FastMCP server:
176 >>> from mcpgateway.plugins.framework.models import MCPServerConfig
177 >>> config = MCPServerConfig(host="127.0.0.1", port=8000)
178 >>> server = SSLCapableFastMCP(server_config=config, name="TestServer")
179 >>> server.settings.host
180 '127.0.0.1'
181 >>> server.settings.port
182 8000
183 """
185 def __init__(self, server_config: MCPServerConfig, *args, **kwargs):
186 """Initialize an SSL capable Fast MCP server.
188 Args:
189 server_config: the MCP server configuration including mTLS information.
190 *args: Additional positional arguments passed to FastMCP.
191 **kwargs: Additional keyword arguments passed to FastMCP.
193 Examples:
194 >>> from mcpgateway.plugins.framework.models import MCPServerConfig
195 >>> config = MCPServerConfig(host="0.0.0.0", port=9000)
196 >>> server = SSLCapableFastMCP(server_config=config, name="PluginServer")
197 >>> server.server_config.host
198 '0.0.0.0'
199 >>> server.server_config.port
200 9000
201 """
202 # Load server config from environment
204 self.server_config = server_config
205 # Override FastMCP settings with our server config
206 if "host" not in kwargs:
207 kwargs["host"] = self.server_config.host
208 if "port" not in kwargs:
209 kwargs["port"] = self.server_config.port
210 if self.server_config.uds and kwargs.get("transport_security") is None:
211 kwargs["transport_security"] = TransportSecuritySettings(
212 enable_dns_rebinding_protection=True,
213 allowed_hosts=[
214 "127.0.0.1",
215 "localhost",
216 "[::1]",
217 "127.0.0.1:*",
218 "localhost:*",
219 "[::1]:*",
220 ],
221 allowed_origins=[
222 "http://127.0.0.1",
223 "http://localhost",
224 "http://[::1]",
225 "http://127.0.0.1:*",
226 "http://localhost:*",
227 "http://[::1]:*",
228 ],
229 )
231 super().__init__(*args, **kwargs)
233 def _get_ssl_config(self) -> dict:
234 """Build SSL configuration for uvicorn from MCPServerConfig.
236 Returns:
237 Dictionary of SSL configuration parameters for uvicorn.
239 Examples:
240 >>> from mcpgateway.plugins.framework.models import MCPServerConfig
241 >>> config = MCPServerConfig(host="127.0.0.1", port=8000, tls=None)
242 >>> server = SSLCapableFastMCP(server_config=config, name="TestServer")
243 >>> ssl_config = server._get_ssl_config()
244 >>> ssl_config
245 {}
246 """
247 ssl_config = {}
249 if self.server_config.tls:
250 tls = self.server_config.tls
251 if tls.keyfile and tls.certfile:
252 ssl_config["ssl_keyfile"] = tls.keyfile
253 ssl_config["ssl_certfile"] = tls.certfile
255 if tls.ca_bundle:
256 ssl_config["ssl_ca_certs"] = tls.ca_bundle
258 ssl_config["ssl_cert_reqs"] = str(tls.ssl_cert_reqs)
260 if tls.keyfile_password:
261 ssl_config["ssl_keyfile_password"] = tls.keyfile_password
263 logger.info("SSL/TLS enabled (mTLS)")
264 logger.info(f" Key: {ssl_config['ssl_keyfile']}")
265 logger.info(f" Cert: {ssl_config['ssl_certfile']}")
266 if "ssl_ca_certs" in ssl_config:
267 logger.info(f" CA: {ssl_config['ssl_ca_certs']}")
268 logger.info(f" Client cert required: {ssl_config['ssl_cert_reqs'] == 2}")
269 else:
270 logger.warning("TLS config present but keyfile/certfile not configured")
271 else:
272 logger.info("SSL/TLS not enabled")
274 return ssl_config
276 async def _start_health_check_server(self, health_port: int) -> None:
277 """Start a simple HTTP-only health check server on a separate port.
279 This allows health checks to work even when the main server uses HTTPS/mTLS.
281 Args:
282 health_port: Port number for the health check server.
284 Examples:
285 Health check endpoint returns expected JSON response:
287 >>> import asyncio
288 >>> from starlette.responses import JSONResponse
289 >>> from starlette.requests import Request
290 >>> async def health_check(_request: Request):
291 ... return JSONResponse({"status": "healthy"})
292 >>> response = asyncio.run(health_check(None))
293 >>> response.status_code
294 200
295 """
296 # Third-Party
297 from starlette.applications import Starlette # pylint: disable=import-outside-toplevel
298 from starlette.requests import Request # pylint: disable=import-outside-toplevel
299 from starlette.routing import Route # pylint: disable=import-outside-toplevel
301 # First-Party
302 from mcpgateway.plugins.framework.utils import ORJSONResponse # pylint: disable=import-outside-toplevel
304 async def health_check(_request: Request):
305 """Health check endpoint for container orchestration.
307 Returns:
308 JSON response with health status.
309 """
310 return ORJSONResponse({"status": "healthy"})
312 async def metrics_endpoint(_request: Request):
313 """Prometheus metrics endpoint.
315 Returns:
316 JSON response with health status.
318 """
319 metrics_data = generate_latest(REGISTRY)
320 return Response(content=metrics_data, media_type="text/plain; version=0.0.4")
322 async def metrics_disabled():
323 """Returns metrics response when metrics collection is disabled.
325 Returns:
326 Response: HTTP 503 response indicating metrics are disabled.
327 """
328 return Response(content='{"error": "Metrics collection is disabled"}', media_type="application/json", status_code=status.HTTP_503_SERVICE_UNAVAILABLE)
330 routes = [
331 Route("/health", health_check, methods=["GET"]),
332 ]
333 enable_metrics = os.getenv("ENABLE_METRICS", "true").lower() == "true"
334 if enable_metrics:
335 routes.append(Route("/metrics/prometheus", metrics_endpoint, methods=["GET"]))
336 else:
337 routes.append(Route("/metrics/prometheus", metrics_disabled, methods=["GET"]))
339 # Create a minimal Starlette app with only the health endpoint
340 health_app = Starlette(routes=routes)
342 logger.info(f"Starting HTTP health check server on {self.settings.host}:{health_port}")
343 config = uvicorn.Config(
344 app=health_app,
345 host=self.settings.host,
346 port=health_port,
347 log_level="warning", # Reduce noise from health checks
348 )
349 server = uvicorn.Server(config)
350 await server.serve()
352 async def run_streamable_http_async(self) -> None:
353 """Run the server using StreamableHTTP transport with optional SSL/TLS.
355 Examples:
356 Server uses configured host and port:
358 >>> from mcpgateway.plugins.framework.models import MCPServerConfig
359 >>> config = MCPServerConfig(host="0.0.0.0", port=9000)
360 >>> server = SSLCapableFastMCP(server_config=config, name="HTTPServer")
361 >>> server.settings.host
362 '0.0.0.0'
363 >>> server.settings.port
364 9000
365 """
366 starlette_app = self.streamable_http_app()
368 # Add health check endpoint to main app
369 # Third-Party
370 from starlette.requests import Request # pylint: disable=import-outside-toplevel
371 from starlette.routing import Route # pylint: disable=import-outside-toplevel
373 # First-Party
374 from mcpgateway.plugins.framework.utils import ORJSONResponse # pylint: disable=import-outside-toplevel
376 async def health_check(_request: Request):
377 """Health check endpoint for container orchestration.
379 Returns:
380 JSON response with health status.
381 """
382 return ORJSONResponse({"status": "healthy"})
384 # Add the health route to the Starlette app
385 starlette_app.routes.append(Route("/health", health_check, methods=["GET"]))
387 async def metrics_endpoint(_request: Request):
388 """Prometheus metrics endpoint.
390 Returns:
391 text response with metrics detail.
392 """
393 metrics_data = generate_latest(REGISTRY)
394 return Response(content=metrics_data, media_type="text/plain; version=0.0.4")
396 async def metrics_disabled():
397 """Returns metrics response when metrics collection is disabled.
399 Returns:
400 Response: HTTP 503 response indicating metrics are disabled.
401 """
402 return Response(content='{"error": "Metrics collection is disabled"}', media_type="application/json", status_code=status.HTTP_503_SERVICE_UNAVAILABLE)
404 # Add the metrics route to the Starlette app
405 enable_metrics = os.getenv("ENABLE_METRICS", "true").lower() == "true"
406 if enable_metrics:
407 starlette_app.routes.append(Route("/metrics/prometheus", metrics_endpoint, methods=["GET"]))
408 else:
409 starlette_app.routes.append(Route("/metrics/prometheus", metrics_disabled, methods=["GET"]))
411 # Build uvicorn config with optional SSL
412 ssl_config = self._get_ssl_config()
413 config_kwargs = {
414 "app": starlette_app,
415 "host": self.settings.host,
416 "port": self.settings.port,
417 "log_level": self.settings.log_level.lower(),
418 }
419 config_kwargs.update(ssl_config)
421 if self.server_config.uds:
422 config_kwargs.pop("host", None)
423 config_kwargs.pop("port", None)
424 config_kwargs["uds"] = self.server_config.uds
425 logger.info(f"Starting plugin server on unix socket {self.server_config.uds}")
426 else:
427 logger.info(f"Starting plugin server on {self.settings.host}:{self.settings.port}")
428 config = uvicorn.Config(**config_kwargs) # type: ignore[arg-type]
429 server = uvicorn.Server(config)
431 # If SSL is enabled, start a separate HTTP health check server
432 if ssl_config and not self.server_config.uds:
433 health_port = self.settings.port + 1000 # Use port+1000 for health checks
434 logger.info(f"SSL enabled - starting separate HTTP health check on port {health_port}")
435 # Run both servers concurrently
436 await asyncio.gather(server.serve(), self._start_health_check_server(health_port))
437 else:
438 # Just run the main server (health check is already on it)
439 await server.serve()
442async def run() -> None:
443 """Run the external plugin server with FastMCP.
445 Supports both stdio and HTTP transports. Auto-detects transport based on stdin
446 (if stdin is not a TTY, uses stdio mode), or you can explicitly set PLUGINS_TRANSPORT.
448 Reads configuration from PLUGINS_SERVER_* environment variables:
449 - PLUGINS_TRANSPORT: Transport type - 'stdio' or 'http' (default: auto-detect)
450 - PLUGINS_SERVER_HOST: Server host (default: 0.0.0.0) - HTTP mode only
451 - PLUGINS_SERVER_PORT: Server port (default: 8000) - HTTP mode only
452 - PLUGINS_SERVER_UDS: Unix domain socket path - HTTP mode only (no TLS)
453 - PLUGINS_SERVER_SSL_ENABLED: Enable SSL/TLS (true/false) - HTTP mode only
454 - PLUGINS_SERVER_SSL_KEYFILE: Path to server private key - HTTP mode only
455 - PLUGINS_SERVER_SSL_CERTFILE: Path to server certificate - HTTP mode only
456 - PLUGINS_SERVER_SSL_CA_CERTS: Path to CA bundle for client verification - HTTP mode only
457 - PLUGINS_SERVER_SSL_CERT_REQS: Client cert requirement (0=NONE, 1=OPTIONAL, 2=REQUIRED) - HTTP mode only
459 Raises:
460 Exception: If plugin server initialization or execution fails.
462 Examples:
463 SERVER module variable starts as None:
465 >>> SERVER is None
466 True
468 FastMCP server names are defined as constants:
470 >>> from mcpgateway.plugins.framework.constants import MCP_SERVER_NAME
471 >>> isinstance(MCP_SERVER_NAME, str)
472 True
473 >>> len(MCP_SERVER_NAME) > 0
474 True
475 """
476 global SERVER # pylint: disable=global-statement
478 # Initialize plugin server
479 SERVER = ExternalPluginServer()
481 if not await SERVER.initialize():
482 logger.error("Failed to initialize plugin server")
483 return
485 # Determine transport type from environment variable or auto-detect
486 # Auto-detect: if stdin is not a TTY (i.e., it's being piped), use stdio mode
487 # First-Party
488 transport = get_transport_settings().transport
489 if transport is None:
490 # Auto-detect based on stdin
491 if not sys.stdin.isatty():
492 transport = "stdio"
493 logger.info("Auto-detected stdio transport (stdin is not a TTY)")
494 else:
495 transport = "http"
496 else:
497 transport = transport.lower()
499 try:
500 if transport == "stdio":
501 # Create basic FastMCP server for stdio (no SSL support needed for stdio)
502 mcp = FastMCP(
503 name=MCP_SERVER_NAME,
504 instructions=MCP_SERVER_INSTRUCTIONS,
505 )
507 # Register module-level tool functions with FastMCP
508 mcp.tool(name=GET_PLUGIN_CONFIGS)(get_plugin_configs)
509 mcp.tool(name=GET_PLUGIN_CONFIG)(get_plugin_config)
510 mcp.tool(name=INVOKE_HOOK)(invoke_hook)
511 # set the plugin_info gauge on startup
512 PLUGIN_INFO.labels(server_name=MCP_SERVER_NAME, transport="stdio", ssl_enabled="false").set(1)
514 # Run with stdio transport
515 logger.info("Starting MCP plugin server with FastMCP (stdio transport)")
516 await mcp.run_stdio_async()
518 else: # http or streamablehttp
519 server_config: MCPServerConfig = SERVER.get_server_config()
520 # Create FastMCP server with SSL support
521 mcp = SSLCapableFastMCP(
522 server_config,
523 name=MCP_SERVER_NAME,
524 instructions=MCP_SERVER_INSTRUCTIONS,
525 )
527 # Register module-level tool functions with FastMCP
528 mcp.tool(name=GET_PLUGIN_CONFIGS)(get_plugin_configs)
529 mcp.tool(name=GET_PLUGIN_CONFIG)(get_plugin_config)
530 mcp.tool(name=INVOKE_HOOK)(invoke_hook)
531 # set the plugin_info gauge on startup
532 ssl_enabled: Literal["true", "false"] = "true" if server_config and server_config.tls is not None else "false"
533 PLUGIN_INFO.labels(server_name=MCP_SERVER_NAME, transport="http", ssl_enabled=ssl_enabled).set(1)
534 if server_config:
535 logger.info(f"Prometheus metrics available at http://{server_config.host}:{server_config.port}/metrics/prometheus")
536 # Run with streamable-http transport
537 logger.info("Starting MCP plugin server with FastMCP (HTTP transport)")
538 await mcp.run_streamable_http_async()
540 except Exception:
541 logger.exception("Caught error while executing plugin server")
542 raise
543 finally:
544 await SERVER.shutdown()
547if __name__ == "__main__":
548 asyncio.run(run())