Coverage for mcpgateway / plugins / framework / external / mcp / server / runtime.py: 100%
163 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""Location: ./mcpgateway/plugins/framework/external/mcp/server/runtime.py
4Copyright 2025
5SPDX-License-Identifier: Apache-2.0
6Authors: Fred Araujo, Teryl Taylor
8MCP Plugin Runtime using FastMCP with SSL/TLS support.
10This runtime does the following:
11- Uses FastMCP from the MCP Python SDK
12- Supports both mTLS and non-mTLS configurations
13- Reads configuration from PLUGINS_SERVER_* environment variables or uses configurations
14 the plugin config.yaml
15- Implements all plugin hook tools (get_plugin_configs, tool_pre_invoke, etc.)
17Examples:
18 Create an SSL-capable FastMCP server:
20 >>> from mcpgateway.plugins.framework.models import MCPServerConfig
21 >>> config = MCPServerConfig(host="localhost", port=8000)
22 >>> server = SSLCapableFastMCP(server_config=config, name="TestServer")
23 >>> server.settings.host
24 'localhost'
25 >>> server.settings.port
26 8000
28 Check SSL configuration returns empty dict when TLS is not configured:
30 >>> from mcpgateway.plugins.framework.models import MCPServerConfig
31 >>> config = MCPServerConfig(host="127.0.0.1", port=8000, tls=None)
32 >>> server = SSLCapableFastMCP(server_config=config, name="NoTLSServer")
33 >>> ssl_config = server._get_ssl_config()
34 >>> ssl_config
35 {}
37 Verify server configuration is accessible:
39 >>> from mcpgateway.plugins.framework.models import MCPServerConfig
40 >>> config = MCPServerConfig(host="localhost", port=9000)
41 >>> server = SSLCapableFastMCP(server_config=config, name="ConfigTest")
42 >>> server.server_config.host
43 'localhost'
44 >>> server.server_config.port
45 9000
47 Settings are properly passed to FastMCP:
49 >>> from mcpgateway.plugins.framework.models import MCPServerConfig
50 >>> config = MCPServerConfig(host="0.0.0.0", port=8080)
51 >>> server = SSLCapableFastMCP(server_config=config, name="SettingsTest")
52 >>> server.settings.host
53 '0.0.0.0'
54 >>> server.settings.port
55 8080
56"""
58# Standard
59import asyncio
60import logging
61import os
62import sys
63from typing import Any, Dict, Literal
65# Third-Party
66from fastapi import Response, status
67from mcp.server.fastmcp import FastMCP
68from mcp.server.transport_security import TransportSecuritySettings
69from prometheus_client import Gauge, generate_latest, REGISTRY
70import uvicorn
72# First-Party
73from mcpgateway.plugins.framework import ExternalPluginServer, MCPServerConfig
74from mcpgateway.plugins.framework.constants import GET_PLUGIN_CONFIG, GET_PLUGIN_CONFIGS, INVOKE_HOOK, MCP_SERVER_INSTRUCTIONS, MCP_SERVER_NAME
75from mcpgateway.plugins.framework.settings import get_transport_settings
77logger = logging.getLogger(__name__)
79SERVER: ExternalPluginServer | None = None
81PLUGIN_INFO = Gauge(
82 "plugin_info",
83 "Plugin server information",
84 ["server_name", "transport", "ssl_enabled"],
85 registry=REGISTRY,
86)
88# Module-level tool functions (extracted for testability)
91async def get_plugin_configs() -> list[dict]:
92 """Get the plugin configurations installed on the server.
94 Returns:
95 JSON string containing list of plugin configuration dictionaries.
97 Raises:
98 RuntimeError: If plugin server not initialized.
100 Examples:
101 Function raises RuntimeError when server is not initialized:
103 >>> import asyncio
104 >>> asyncio.run(get_plugin_configs()) # doctest: +SKIP
105 Traceback (most recent call last):
106 ...
107 RuntimeError: Plugin server not initialized
108 """
109 if not SERVER:
110 raise RuntimeError("Plugin server not initialized")
111 return await SERVER.get_plugin_configs()
114async def get_plugin_config(name: str) -> dict:
115 """Get the plugin configuration for a specific plugin.
117 Args:
118 name: The name of the plugin
120 Returns:
121 JSON string containing plugin configuration dictionary.
123 Raises:
124 RuntimeError: If plugin server not initialized.
126 Examples:
127 Function returns empty dict when result is None:
129 >>> result = None
130 >>> result if result is not None else {}
131 {}
132 """
133 if not SERVER:
134 raise RuntimeError("Plugin server not initialized")
135 result = await SERVER.get_plugin_config(name)
136 if result is None:
137 return {}
138 return result
141async def invoke_hook(hook_type: str, plugin_name: str, payload: Dict[str, Any], context: Dict[str, Any]) -> dict:
142 """Execute a hook for a plugin.
144 Args:
145 hook_type: The name or type of the hook.
146 plugin_name: The name of the plugin to execute
147 payload: The resource payload to be analyzed
148 context: Contextual information
150 Returns:
151 Result dictionary with payload, context and any error information.
153 Raises:
154 RuntimeError: If plugin server not initialized.
156 Examples:
157 Function raises RuntimeError when server is not initialized:
159 >>> import asyncio
160 >>> asyncio.run(invoke_hook("hook", "plugin", {}, {})) # doctest: +SKIP
161 Traceback (most recent call last):
162 ...
163 RuntimeError: Plugin server not initialized
164 """
165 if not SERVER:
166 raise RuntimeError("Plugin server not initialized")
167 return await SERVER.invoke_hook(hook_type, plugin_name, payload, context)
170def _should_trace_plugin_server_path(path: str) -> bool:
171 """Trace streamable MCP traffic while excluding health and metrics endpoints.
173 Args:
174 path: Incoming plugin server request path.
176 Returns:
177 bool: ``True`` when the request path should be traced.
178 """
180 normalized = path.rstrip("/") or "/"
181 return normalized not in {"/health", "/metrics/prometheus"}
184class SSLCapableFastMCP(FastMCP):
185 """FastMCP server with SSL/TLS support using MCPServerConfig.
187 Examples:
188 Create an SSL-capable FastMCP server:
190 >>> from mcpgateway.plugins.framework.models import MCPServerConfig
191 >>> config = MCPServerConfig(host="127.0.0.1", port=8000)
192 >>> server = SSLCapableFastMCP(server_config=config, name="TestServer")
193 >>> server.settings.host
194 '127.0.0.1'
195 >>> server.settings.port
196 8000
197 """
199 def __init__(self, server_config: MCPServerConfig, *args, **kwargs):
200 """Initialize an SSL capable Fast MCP server.
202 Args:
203 server_config: the MCP server configuration including mTLS information.
204 *args: Additional positional arguments passed to FastMCP.
205 **kwargs: Additional keyword arguments passed to FastMCP.
207 Examples:
208 >>> from mcpgateway.plugins.framework.models import MCPServerConfig
209 >>> config = MCPServerConfig(host="0.0.0.0", port=9000)
210 >>> server = SSLCapableFastMCP(server_config=config, name="PluginServer")
211 >>> server.server_config.host
212 '0.0.0.0'
213 >>> server.server_config.port
214 9000
215 """
216 # Load server config from environment
218 self.server_config = server_config
219 # Override FastMCP settings with our server config
220 if "host" not in kwargs:
221 kwargs["host"] = self.server_config.host
222 if "port" not in kwargs:
223 kwargs["port"] = self.server_config.port
224 if self.server_config.uds and kwargs.get("transport_security") is None:
225 kwargs["transport_security"] = TransportSecuritySettings(
226 enable_dns_rebinding_protection=True,
227 allowed_hosts=[
228 "127.0.0.1",
229 "localhost",
230 "[::1]",
231 "127.0.0.1:*",
232 "localhost:*",
233 "[::1]:*",
234 ],
235 allowed_origins=[
236 "http://127.0.0.1",
237 "http://localhost",
238 "http://[::1]",
239 "http://127.0.0.1:*",
240 "http://localhost:*",
241 "http://[::1]:*",
242 ],
243 )
245 super().__init__(*args, **kwargs)
247 def _get_ssl_config(self) -> dict:
248 """Build SSL configuration for uvicorn from MCPServerConfig.
250 Returns:
251 Dictionary of SSL configuration parameters for uvicorn.
253 Examples:
254 >>> from mcpgateway.plugins.framework.models import MCPServerConfig
255 >>> config = MCPServerConfig(host="127.0.0.1", port=8000, tls=None)
256 >>> server = SSLCapableFastMCP(server_config=config, name="TestServer")
257 >>> ssl_config = server._get_ssl_config()
258 >>> ssl_config
259 {}
260 """
261 ssl_config = {}
263 if self.server_config.tls:
264 tls = self.server_config.tls
265 if tls.keyfile and tls.certfile:
266 ssl_config["ssl_keyfile"] = tls.keyfile
267 ssl_config["ssl_certfile"] = tls.certfile
269 if tls.ca_bundle:
270 ssl_config["ssl_ca_certs"] = tls.ca_bundle
272 ssl_config["ssl_cert_reqs"] = str(tls.ssl_cert_reqs)
274 if tls.keyfile_password:
275 ssl_config["ssl_keyfile_password"] = tls.keyfile_password
277 logger.info("SSL/TLS enabled (mTLS)")
278 logger.info(f" Key: {ssl_config['ssl_keyfile']}")
279 logger.info(f" Cert: {ssl_config['ssl_certfile']}")
280 if "ssl_ca_certs" in ssl_config:
281 logger.info(f" CA: {ssl_config['ssl_ca_certs']}")
282 logger.info(f" Client cert required: {ssl_config['ssl_cert_reqs'] == 2}")
283 else:
284 logger.warning("TLS config present but keyfile/certfile not configured")
285 else:
286 logger.info("SSL/TLS not enabled")
288 return ssl_config
290 async def _start_health_check_server(self, health_port: int) -> None:
291 """Start a simple HTTP-only health check server on a separate port.
293 This allows health checks to work even when the main server uses HTTPS/mTLS.
295 Args:
296 health_port: Port number for the health check server.
298 Examples:
299 Health check endpoint returns expected JSON response:
301 >>> import asyncio
302 >>> from starlette.responses import JSONResponse
303 >>> from starlette.requests import Request
304 >>> async def health_check(_request: Request):
305 ... return JSONResponse({"status": "healthy"})
306 >>> response = asyncio.run(health_check(None))
307 >>> response.status_code
308 200
309 """
310 # Third-Party
311 from starlette.applications import Starlette # pylint: disable=import-outside-toplevel
312 from starlette.requests import Request # pylint: disable=import-outside-toplevel
313 from starlette.routing import Route # pylint: disable=import-outside-toplevel
315 # First-Party
316 from mcpgateway.plugins.framework.utils import ORJSONResponse # pylint: disable=import-outside-toplevel
318 async def health_check(_request: Request):
319 """Health check endpoint for container orchestration.
321 Returns:
322 JSON response with health status.
323 """
324 return ORJSONResponse({"status": "healthy"})
326 async def metrics_endpoint(_request: Request):
327 """Prometheus metrics endpoint.
329 Returns:
330 JSON response with health status.
332 """
333 metrics_data = generate_latest(REGISTRY)
334 return Response(content=metrics_data, media_type="text/plain; version=0.0.4")
336 async def metrics_disabled():
337 """Returns metrics response when metrics collection is disabled.
339 Returns:
340 Response: HTTP 503 response indicating metrics are disabled.
341 """
342 return Response(content='{"error": "Metrics collection is disabled"}', media_type="application/json", status_code=status.HTTP_503_SERVICE_UNAVAILABLE)
344 routes = [
345 Route("/health", health_check, methods=["GET"]),
346 ]
347 enable_metrics = os.getenv("ENABLE_METRICS", "true").lower() == "true"
348 if enable_metrics:
349 routes.append(Route("/metrics/prometheus", metrics_endpoint, methods=["GET"]))
350 else:
351 routes.append(Route("/metrics/prometheus", metrics_disabled, methods=["GET"]))
353 # Create a minimal Starlette app with only the health endpoint
354 health_app = Starlette(routes=routes)
356 logger.info(f"Starting HTTP health check server on {self.settings.host}:{health_port}")
357 config = uvicorn.Config(
358 app=health_app,
359 host=self.settings.host,
360 port=health_port,
361 log_level="warning", # Reduce noise from health checks
362 )
363 server = uvicorn.Server(config)
364 await server.serve()
366 async def run_streamable_http_async(self) -> None:
367 """Run the server using StreamableHTTP transport with optional SSL/TLS.
369 Examples:
370 Server uses configured host and port:
372 >>> from mcpgateway.plugins.framework.models import MCPServerConfig
373 >>> config = MCPServerConfig(host="0.0.0.0", port=9000)
374 >>> server = SSLCapableFastMCP(server_config=config, name="HTTPServer")
375 >>> server.settings.host
376 '0.0.0.0'
377 >>> server.settings.port
378 9000
379 """
380 starlette_app = self.streamable_http_app()
382 # Add health check endpoint to main app
383 # Third-Party
384 from starlette.requests import Request # pylint: disable=import-outside-toplevel
385 from starlette.routing import Route # pylint: disable=import-outside-toplevel
387 # First-Party
388 from mcpgateway.plugins.framework.utils import ORJSONResponse # pylint: disable=import-outside-toplevel
390 async def health_check(_request: Request):
391 """Health check endpoint for container orchestration.
393 Returns:
394 JSON response with health status.
395 """
396 return ORJSONResponse({"status": "healthy"})
398 # Add the health route to the Starlette app
399 starlette_app.routes.append(Route("/health", health_check, methods=["GET"]))
401 async def metrics_endpoint(_request: Request):
402 """Prometheus metrics endpoint.
404 Returns:
405 text response with metrics detail.
406 """
407 metrics_data = generate_latest(REGISTRY)
408 return Response(content=metrics_data, media_type="text/plain; version=0.0.4")
410 async def metrics_disabled():
411 """Returns metrics response when metrics collection is disabled.
413 Returns:
414 Response: HTTP 503 response indicating metrics are disabled.
415 """
416 return Response(content='{"error": "Metrics collection is disabled"}', media_type="application/json", status_code=status.HTTP_503_SERVICE_UNAVAILABLE)
418 # Add the metrics route to the Starlette app
419 enable_metrics = os.getenv("ENABLE_METRICS", "true").lower() == "true"
420 if enable_metrics:
421 starlette_app.routes.append(Route("/metrics/prometheus", metrics_endpoint, methods=["GET"]))
422 else:
423 starlette_app.routes.append(Route("/metrics/prometheus", metrics_disabled, methods=["GET"]))
425 app_to_serve: Any = starlette_app
426 if os.getenv("OTEL_ENABLE_OBSERVABILITY", "false").lower() == "true":
427 # Set service name BEFORE importing observability module
428 # The tracer is initialized at import time, so env vars must be set first
429 os.environ.setdefault("OTEL_SERVICE_NAME", MCP_SERVER_NAME)
431 # First-Party
432 from mcpgateway.observability import init_telemetry, OpenTelemetryRequestMiddleware, otel_tracing_enabled # pylint: disable=import-outside-toplevel
434 init_telemetry()
435 if otel_tracing_enabled():
436 app_to_serve = OpenTelemetryRequestMiddleware(starlette_app, should_trace_request_path=_should_trace_plugin_server_path)
438 # Build uvicorn config with optional SSL
439 ssl_config = self._get_ssl_config()
440 config_kwargs = {
441 "app": app_to_serve,
442 "host": self.settings.host,
443 "port": self.settings.port,
444 "log_level": self.settings.log_level.lower(),
445 }
446 config_kwargs.update(ssl_config)
448 if self.server_config.uds:
449 config_kwargs.pop("host", None)
450 config_kwargs.pop("port", None)
451 config_kwargs["uds"] = self.server_config.uds
452 logger.info(f"Starting plugin server on unix socket {self.server_config.uds}")
453 else:
454 logger.info(f"Starting plugin server on {self.settings.host}:{self.settings.port}")
455 config = uvicorn.Config(**config_kwargs) # type: ignore[arg-type]
456 server = uvicorn.Server(config)
458 # If SSL is enabled, start a separate HTTP health check server
459 if ssl_config and not self.server_config.uds:
460 health_port = self.settings.port + 1000 # Use port+1000 for health checks
461 logger.info(f"SSL enabled - starting separate HTTP health check on port {health_port}")
462 # Run both servers concurrently
463 await asyncio.gather(server.serve(), self._start_health_check_server(health_port))
464 else:
465 # Just run the main server (health check is already on it)
466 await server.serve()
469async def run() -> None:
470 """Run the external plugin server with FastMCP.
472 Supports both stdio and HTTP transports. Auto-detects transport based on stdin
473 (if stdin is not a TTY, uses stdio mode), or you can explicitly set PLUGINS_TRANSPORT.
475 Reads configuration from PLUGINS_SERVER_* environment variables:
476 - PLUGINS_TRANSPORT: Transport type - 'stdio' or 'http' (default: auto-detect)
477 - PLUGINS_SERVER_HOST: Server host (default: 0.0.0.0) - HTTP mode only
478 - PLUGINS_SERVER_PORT: Server port (default: 8000) - HTTP mode only
479 - PLUGINS_SERVER_UDS: Unix domain socket path - HTTP mode only (no TLS)
480 - PLUGINS_SERVER_SSL_ENABLED: Enable SSL/TLS (true/false) - HTTP mode only
481 - PLUGINS_SERVER_SSL_KEYFILE: Path to server private key - HTTP mode only
482 - PLUGINS_SERVER_SSL_CERTFILE: Path to server certificate - HTTP mode only
483 - PLUGINS_SERVER_SSL_CA_CERTS: Path to CA bundle for client verification - HTTP mode only
484 - PLUGINS_SERVER_SSL_CERT_REQS: Client cert requirement (0=NONE, 1=OPTIONAL, 2=REQUIRED) - HTTP mode only
486 Raises:
487 Exception: If plugin server initialization or execution fails.
489 Examples:
490 SERVER module variable starts as None:
492 >>> SERVER is None
493 True
495 FastMCP server names are defined as constants:
497 >>> from mcpgateway.plugins.framework.constants import MCP_SERVER_NAME
498 >>> isinstance(MCP_SERVER_NAME, str)
499 True
500 >>> len(MCP_SERVER_NAME) > 0
501 True
502 """
503 global SERVER # pylint: disable=global-statement
505 # Initialize plugin server
506 SERVER = ExternalPluginServer()
508 if not await SERVER.initialize():
509 logger.error("Failed to initialize plugin server")
510 return
512 # Determine transport type from environment variable or auto-detect
513 # Auto-detect: if stdin is not a TTY (i.e., it's being piped), use stdio mode
514 # First-Party
515 transport = get_transport_settings().transport
516 if transport is None:
517 # Auto-detect based on stdin
518 if not sys.stdin.isatty():
519 transport = "stdio"
520 logger.info("Auto-detected stdio transport (stdin is not a TTY)")
521 else:
522 transport = "http"
523 else:
524 transport = transport.lower()
526 try:
527 if transport == "stdio":
528 # Create basic FastMCP server for stdio (no SSL support needed for stdio)
529 mcp = FastMCP(
530 name=MCP_SERVER_NAME,
531 instructions=MCP_SERVER_INSTRUCTIONS,
532 )
534 # Register module-level tool functions with FastMCP
535 mcp.tool(name=GET_PLUGIN_CONFIGS)(get_plugin_configs)
536 mcp.tool(name=GET_PLUGIN_CONFIG)(get_plugin_config)
537 mcp.tool(name=INVOKE_HOOK)(invoke_hook)
538 # set the plugin_info gauge on startup
539 PLUGIN_INFO.labels(server_name=MCP_SERVER_NAME, transport="stdio", ssl_enabled="false").set(1)
541 # Run with stdio transport
542 logger.info("Starting MCP plugin server with FastMCP (stdio transport)")
543 await mcp.run_stdio_async()
545 else: # http or streamablehttp
546 server_config: MCPServerConfig = SERVER.get_server_config()
547 # Create FastMCP server with SSL support
548 mcp = SSLCapableFastMCP(
549 server_config,
550 name=MCP_SERVER_NAME,
551 instructions=MCP_SERVER_INSTRUCTIONS,
552 )
554 # Register module-level tool functions with FastMCP
555 mcp.tool(name=GET_PLUGIN_CONFIGS)(get_plugin_configs)
556 mcp.tool(name=GET_PLUGIN_CONFIG)(get_plugin_config)
557 mcp.tool(name=INVOKE_HOOK)(invoke_hook)
558 # set the plugin_info gauge on startup
559 ssl_enabled: Literal["true", "false"] = "true" if server_config and server_config.tls is not None else "false"
560 PLUGIN_INFO.labels(server_name=MCP_SERVER_NAME, transport="http", ssl_enabled=ssl_enabled).set(1)
561 if server_config:
562 logger.info(f"Prometheus metrics available at http://{server_config.host}:{server_config.port}/metrics/prometheus")
563 # Run with streamable-http transport
564 logger.info("Starting MCP plugin server with FastMCP (HTTP transport)")
565 await mcp.run_streamable_http_async()
567 except Exception:
568 logger.exception("Caught error while executing plugin server")
569 raise
570 finally:
571 await SERVER.shutdown()
574if __name__ == "__main__":
575 asyncio.run(run())