Coverage for mcpgateway / plugins / framework / external / mcp / server / runtime.py: 100%
152 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Location: ./mcpgateway/plugins/framework/external/mcp/server/runtime.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Fred Araujo, Teryl Taylor

MCP Plugin Runtime using FastMCP with SSL/TLS support.

This runtime does the following:
- Uses FastMCP from the MCP Python SDK
- Supports both mTLS and non-mTLS configurations
- Reads configuration from PLUGINS_SERVER_* environment variables, or uses the
  configuration in the plugin config.yaml
- Exposes the plugin tools (get_plugin_configs, get_plugin_config, invoke_hook)

Examples:
    Create an SSL-capable FastMCP server:

    >>> from mcpgateway.plugins.framework.models import MCPServerConfig
    >>> config = MCPServerConfig(host="localhost", port=8000)
    >>> server = SSLCapableFastMCP(server_config=config, name="TestServer")
    >>> server.settings.host
    'localhost'
    >>> server.settings.port
    8000

    Check SSL configuration returns empty dict when TLS is not configured:

    >>> from mcpgateway.plugins.framework.models import MCPServerConfig
    >>> config = MCPServerConfig(host="127.0.0.1", port=8000, tls=None)
    >>> server = SSLCapableFastMCP(server_config=config, name="NoTLSServer")
    >>> ssl_config = server._get_ssl_config()
    >>> ssl_config
    {}

    Verify server configuration is accessible:

    >>> from mcpgateway.plugins.framework.models import MCPServerConfig
    >>> config = MCPServerConfig(host="localhost", port=9000)
    >>> server = SSLCapableFastMCP(server_config=config, name="ConfigTest")
    >>> server.server_config.host
    'localhost'
    >>> server.server_config.port
    9000

    Settings are properly passed to FastMCP:

    >>> from mcpgateway.plugins.framework.models import MCPServerConfig
    >>> config = MCPServerConfig(host="0.0.0.0", port=8080)
    >>> server = SSLCapableFastMCP(server_config=config, name="SettingsTest")
    >>> server.settings.host
    '0.0.0.0'
    >>> server.settings.port
    8080
"""
58# Standard
59import asyncio
60import logging
61import os
62import sys
63from typing import Any, Dict, Literal
65# Third-Party
66from fastapi import Response, status
67from mcp.server.fastmcp import FastMCP
68from mcp.server.transport_security import TransportSecuritySettings
69from prometheus_client import Gauge, generate_latest, REGISTRY
70import uvicorn
72# First-Party
73from mcpgateway.plugins.framework import ExternalPluginServer, MCPServerConfig
74from mcpgateway.plugins.framework.constants import GET_PLUGIN_CONFIG, GET_PLUGIN_CONFIGS, INVOKE_HOOK, MCP_SERVER_INSTRUCTIONS, MCP_SERVER_NAME
# Module-scoped logger, named after this module.
logger = logging.getLogger(__name__)

# Process-wide plugin server; stays None until run() assigns it.
SERVER: ExternalPluginServer | None = None

# Gauge labelled with the server name, transport and SSL flag; run() sets it to 1 at startup.
PLUGIN_INFO = Gauge(
    "plugin_info",
    "Plugin server information",
    labelnames=["server_name", "transport", "ssl_enabled"],
    registry=REGISTRY,
)

# Module-level tool functions (extracted for testability)
async def get_plugin_configs() -> list[dict]:
    """Get the plugin configurations installed on the server.

    Returns:
        List of plugin configuration dictionaries, as returned by the plugin server.

    Raises:
        RuntimeError: If plugin server not initialized.

    Examples:
        Function raises RuntimeError when server is not initialized:

        >>> import asyncio
        >>> asyncio.run(get_plugin_configs())  # doctest: +SKIP
        Traceback (most recent call last):
            ...
        RuntimeError: Plugin server not initialized
    """
    # SERVER is only assigned by run(); an explicit None check avoids relying on object truthiness.
    if SERVER is None:
        raise RuntimeError("Plugin server not initialized")
    return await SERVER.get_plugin_configs()
async def get_plugin_config(name: str) -> dict:
    """Get the plugin configuration for a specific plugin.

    Args:
        name: The name of the plugin

    Returns:
        Plugin configuration dictionary, or an empty dict when the plugin
        server returns None for the given name.

    Raises:
        RuntimeError: If plugin server not initialized.

    Examples:
        Function returns empty dict when result is None:

        >>> result = None
        >>> result if result is not None else {}
        {}
    """
    # SERVER is only assigned by run(); an explicit None check avoids relying on object truthiness.
    if SERVER is None:
        raise RuntimeError("Plugin server not initialized")
    result = await SERVER.get_plugin_config(name)
    # Normalise a None result to an empty dict so callers always receive a dict.
    return {} if result is None else result
async def invoke_hook(hook_type: str, plugin_name: str, payload: Dict[str, Any], context: Dict[str, Any]) -> dict:
    """Execute a hook for a plugin.

    Args:
        hook_type: The name or type of the hook.
        plugin_name: The name of the plugin to execute
        payload: The resource payload to be analyzed
        context: Contextual information

    Returns:
        Result dictionary with payload, context and any error information.

    Raises:
        RuntimeError: If plugin server not initialized.

    Examples:
        Function raises RuntimeError when server is not initialized:

        >>> import asyncio
        >>> asyncio.run(invoke_hook("hook", "plugin", {}, {}))  # doctest: +SKIP
        Traceback (most recent call last):
            ...
        RuntimeError: Plugin server not initialized
    """
    # SERVER is only assigned by run(); an explicit None check avoids relying on object truthiness.
    if SERVER is None:
        raise RuntimeError("Plugin server not initialized")
    # Arguments are forwarded to the plugin server unchanged.
    return await SERVER.invoke_hook(hook_type, plugin_name, payload, context)
class SSLCapableFastMCP(FastMCP):
    """FastMCP server with SSL/TLS support using MCPServerConfig.

    Host and port default to the values in the supplied server config, and the
    StreamableHTTP runner adds ``/health`` and ``/metrics/prometheus`` routes
    to the application it serves.

    Examples:
        Create an SSL-capable FastMCP server:

        >>> from mcpgateway.plugins.framework.models import MCPServerConfig
        >>> config = MCPServerConfig(host="127.0.0.1", port=8000)
        >>> server = SSLCapableFastMCP(server_config=config, name="TestServer")
        >>> server.settings.host
        '127.0.0.1'
        >>> server.settings.port
        8000
    """

    def __init__(self, server_config: MCPServerConfig, *args, **kwargs):
        """Initialize an SSL capable Fast MCP server.

        Args:
            server_config: the MCP server configuration including mTLS information.
            *args: Additional positional arguments passed to FastMCP.
            **kwargs: Additional keyword arguments passed to FastMCP.

        Examples:
            >>> from mcpgateway.plugins.framework.models import MCPServerConfig
            >>> config = MCPServerConfig(host="0.0.0.0", port=9000)
            >>> server = SSLCapableFastMCP(server_config=config, name="PluginServer")
            >>> server.server_config.host
            '0.0.0.0'
            >>> server.server_config.port
            9000
        """
        # Keep the caller-supplied config; the SSL and transport helpers read it later.
        self.server_config = server_config

        # Explicit host/port keyword arguments win; otherwise use the server config.
        kwargs.setdefault("host", server_config.host)
        kwargs.setdefault("port", server_config.port)

        # For unix-socket deployments, default to loopback-only transport security
        # unless the caller supplied their own settings.
        if self.server_config.uds and kwargs.get("transport_security") is None:
            kwargs["transport_security"] = TransportSecuritySettings(
                enable_dns_rebinding_protection=True,
                allowed_hosts=[
                    "127.0.0.1",
                    "localhost",
                    "[::1]",
                    "127.0.0.1:*",
                    "localhost:*",
                    "[::1]:*",
                ],
                allowed_origins=[
                    "http://127.0.0.1",
                    "http://localhost",
                    "http://[::1]",
                    "http://127.0.0.1:*",
                    "http://localhost:*",
                    "http://[::1]:*",
                ],
            )

        super().__init__(*args, **kwargs)

    def _get_ssl_config(self) -> Dict[str, Any]:
        """Build SSL configuration for uvicorn from MCPServerConfig.

        Returns:
            Dictionary of SSL configuration parameters for uvicorn. Empty when
            TLS is not configured or when the key/cert pair is incomplete.

        Examples:
            >>> from mcpgateway.plugins.framework.models import MCPServerConfig
            >>> config = MCPServerConfig(host="127.0.0.1", port=8000, tls=None)
            >>> server = SSLCapableFastMCP(server_config=config, name="TestServer")
            >>> ssl_config = server._get_ssl_config()
            >>> ssl_config
            {}
        """
        ssl_config: Dict[str, Any] = {}

        tls = self.server_config.tls
        if not tls:
            logger.info("SSL/TLS not enabled")
            return ssl_config

        if not (tls.keyfile and tls.certfile):
            logger.warning("TLS config present but keyfile/certfile not configured")
            return ssl_config

        ssl_config["ssl_keyfile"] = tls.keyfile
        ssl_config["ssl_certfile"] = tls.certfile

        if tls.ca_bundle:
            ssl_config["ssl_ca_certs"] = tls.ca_bundle

        # NOTE(review): the value is stringified before being handed to uvicorn;
        # confirm uvicorn accepts a string for ssl_cert_reqs.
        ssl_config["ssl_cert_reqs"] = str(tls.ssl_cert_reqs)

        if tls.keyfile_password:
            ssl_config["ssl_keyfile_password"] = tls.keyfile_password

        logger.info("SSL/TLS enabled (mTLS)")
        logger.info(f" Key: {ssl_config['ssl_keyfile']}")
        logger.info(f" Cert: {ssl_config['ssl_certfile']}")
        if "ssl_ca_certs" in ssl_config:
            logger.info(f" CA: {ssl_config['ssl_ca_certs']}")
        # The stored value is a string, so compare against "2" (REQUIRED);
        # comparing against the integer 2 would always report False.
        logger.info(f" Client cert required: {ssl_config['ssl_cert_reqs'] == '2'}")

        return ssl_config

    @staticmethod
    def _build_monitoring_routes() -> list:
        """Build the health and Prometheus metrics routes shared by both HTTP apps.

        The metrics route serves the Prometheus registry when the
        ``ENABLE_METRICS`` environment variable is "true" (the default), and a
        503 JSON error otherwise.

        Returns:
            List of Starlette routes: ``/health`` and ``/metrics/prometheus``.
        """
        # Third-Party
        from starlette.requests import Request  # pylint: disable=import-outside-toplevel
        from starlette.routing import Route  # pylint: disable=import-outside-toplevel

        # First-Party
        from mcpgateway.utils.orjson_response import ORJSONResponse  # pylint: disable=import-outside-toplevel

        async def health_check(_request: Request):
            """Health check endpoint for container orchestration.

            Returns:
                JSON response with health status.
            """
            return ORJSONResponse({"status": "healthy"})

        async def metrics_endpoint(_request: Request):
            """Prometheus metrics endpoint.

            Returns:
                Text response with the current contents of the Prometheus registry.
            """
            metrics_data = generate_latest(REGISTRY)
            return Response(content=metrics_data, media_type="text/plain; version=0.0.4")

        # Route endpoints are called with the request, so this handler must
        # accept it even though it is unused.
        async def metrics_disabled(_request: Request):
            """Returns metrics response when metrics collection is disabled.

            Returns:
                Response: HTTP 503 response indicating metrics are disabled.
            """
            return Response(content='{"error": "Metrics collection is disabled"}', media_type="application/json", status_code=status.HTTP_503_SERVICE_UNAVAILABLE)

        enable_metrics = os.getenv("ENABLE_METRICS", "true").lower() == "true"
        metrics_handler = metrics_endpoint if enable_metrics else metrics_disabled

        return [
            Route("/health", health_check, methods=["GET"]),
            Route("/metrics/prometheus", metrics_handler, methods=["GET"]),
        ]

    async def _start_health_check_server(self, health_port: int) -> None:
        """Start a simple HTTP-only health check server on a separate port.

        This allows health checks to work even when the main server uses HTTPS/mTLS.

        Args:
            health_port: Port number for the health check server.

        Examples:
            Health check endpoint returns expected JSON response:

            >>> import asyncio
            >>> from starlette.responses import JSONResponse
            >>> from starlette.requests import Request
            >>> async def health_check(_request: Request):
            ...     return JSONResponse({"status": "healthy"})
            >>> response = asyncio.run(health_check(None))
            >>> response.status_code
            200
        """
        # Third-Party
        from starlette.applications import Starlette  # pylint: disable=import-outside-toplevel

        # Minimal Starlette app carrying only the health and metrics endpoints.
        health_app = Starlette(routes=self._build_monitoring_routes())

        logger.info(f"Starting HTTP health check server on {self.settings.host}:{health_port}")
        config = uvicorn.Config(
            app=health_app,
            host=self.settings.host,
            port=health_port,
            log_level="warning",  # Reduce noise from health checks
        )
        server = uvicorn.Server(config)
        await server.serve()

    async def run_streamable_http_async(self) -> None:
        """Run the server using StreamableHTTP transport with optional SSL/TLS.

        Health and metrics routes are added to the main application. When an
        SSL configuration is produced and no unix socket is configured, a
        second plain-HTTP server for health checks runs on ``port + 1000``.

        Examples:
            Server uses configured host and port:

            >>> from mcpgateway.plugins.framework.models import MCPServerConfig
            >>> config = MCPServerConfig(host="0.0.0.0", port=9000)
            >>> server = SSLCapableFastMCP(server_config=config, name="HTTPServer")
            >>> server.settings.host
            '0.0.0.0'
            >>> server.settings.port
            9000
        """
        starlette_app = self.streamable_http_app()

        # Add the health and metrics routes to the main app.
        starlette_app.routes.extend(self._build_monitoring_routes())

        # Build uvicorn config with optional SSL
        ssl_config = self._get_ssl_config()
        config_kwargs = {
            "app": starlette_app,
            "host": self.settings.host,
            "port": self.settings.port,
            "log_level": self.settings.log_level.lower(),
        }
        config_kwargs.update(ssl_config)

        if self.server_config.uds:
            # A unix socket replaces the TCP host/port binding.
            config_kwargs.pop("host", None)
            config_kwargs.pop("port", None)
            config_kwargs["uds"] = self.server_config.uds
            logger.info(f"Starting plugin server on unix socket {self.server_config.uds}")
        else:
            logger.info(f"Starting plugin server on {self.settings.host}:{self.settings.port}")

        config = uvicorn.Config(**config_kwargs)  # type: ignore[arg-type]
        server = uvicorn.Server(config)

        # If SSL is enabled, start a separate HTTP health check server
        if ssl_config and not self.server_config.uds:
            health_port = self.settings.port + 1000  # Use port+1000 for health checks
            logger.info(f"SSL enabled - starting separate HTTP health check on port {health_port}")
            # Run both servers concurrently
            await asyncio.gather(server.serve(), self._start_health_check_server(health_port))
        else:
            # Just run the main server (health check is already on it)
            await server.serve()
def _register_plugin_tools(mcp: FastMCP) -> None:
    """Register the module-level plugin tool functions on a FastMCP server.

    Args:
        mcp: The FastMCP server (stdio or HTTP) to register the tools on.
    """
    tools = (
        (GET_PLUGIN_CONFIGS, get_plugin_configs),
        (GET_PLUGIN_CONFIG, get_plugin_config),
        (INVOKE_HOOK, invoke_hook),
    )
    for tool_name, tool_fn in tools:
        mcp.tool(name=tool_name)(tool_fn)


async def run() -> None:
    """Run the external plugin server with FastMCP.

    Supports both stdio and HTTP transports. Auto-detects transport based on stdin
    (if stdin is not a TTY, uses stdio mode), or you can explicitly set PLUGINS_TRANSPORT.

    Reads configuration from PLUGINS_SERVER_* environment variables:
    - PLUGINS_TRANSPORT: Transport type - 'stdio' or 'http' (default: auto-detect)
    - PLUGINS_SERVER_HOST: Server host (default: 0.0.0.0) - HTTP mode only
    - PLUGINS_SERVER_PORT: Server port (default: 8000) - HTTP mode only
    - PLUGINS_SERVER_UDS: Unix domain socket path - HTTP mode only (no TLS)
    - PLUGINS_SERVER_SSL_ENABLED: Enable SSL/TLS (true/false) - HTTP mode only
    - PLUGINS_SERVER_SSL_KEYFILE: Path to server private key - HTTP mode only
    - PLUGINS_SERVER_SSL_CERTFILE: Path to server certificate - HTTP mode only
    - PLUGINS_SERVER_SSL_CA_CERTS: Path to CA bundle for client verification - HTTP mode only
    - PLUGINS_SERVER_SSL_CERT_REQS: Client cert requirement (0=NONE, 1=OPTIONAL, 2=REQUIRED) - HTTP mode only

    Raises:
        Exception: If plugin server initialization or execution fails.

    Examples:
        SERVER module variable starts as None:

        >>> SERVER is None
        True

        FastMCP server names are defined as constants:

        >>> from mcpgateway.plugins.framework.constants import MCP_SERVER_NAME
        >>> isinstance(MCP_SERVER_NAME, str)
        True
        >>> len(MCP_SERVER_NAME) > 0
        True
    """
    global SERVER  # pylint: disable=global-statement

    # Initialize plugin server
    SERVER = ExternalPluginServer()

    if not await SERVER.initialize():
        logger.error("Failed to initialize plugin server")
        return

    # Determine transport type from environment variable or auto-detect
    # Auto-detect: if stdin is not a TTY (i.e., it's being piped), use stdio mode
    transport = os.environ.get("PLUGINS_TRANSPORT", None)
    if transport is None:
        # Auto-detect based on stdin
        if not sys.stdin.isatty():
            transport = "stdio"
            logger.info("Auto-detected stdio transport (stdin is not a TTY)")
        else:
            transport = "http"
    else:
        transport = transport.lower()

    try:
        if transport == "stdio":
            # Create basic FastMCP server for stdio (no SSL support needed for stdio)
            mcp = FastMCP(
                name=MCP_SERVER_NAME,
                instructions=MCP_SERVER_INSTRUCTIONS,
            )

            _register_plugin_tools(mcp)
            # set the plugin_info gauge on startup
            PLUGIN_INFO.labels(server_name=MCP_SERVER_NAME, transport="stdio", ssl_enabled="false").set(1)

            # Run with stdio transport
            logger.info("Starting MCP plugin server with FastMCP (stdio transport)")
            await mcp.run_stdio_async()

        else:  # http or streamablehttp (any value other than "stdio" lands here)
            server_config: MCPServerConfig = SERVER.get_server_config()
            # Create FastMCP server with SSL support
            mcp = SSLCapableFastMCP(
                server_config,
                name=MCP_SERVER_NAME,
                instructions=MCP_SERVER_INSTRUCTIONS,
            )

            _register_plugin_tools(mcp)
            # set the plugin_info gauge on startup
            ssl_enabled: Literal["true", "false"] = "true" if server_config and server_config.tls is not None else "false"
            PLUGIN_INFO.labels(server_name=MCP_SERVER_NAME, transport="http", ssl_enabled=ssl_enabled).set(1)
            if server_config:
                # Advertise the scheme matching the TLS flag instead of always claiming plain HTTP.
                scheme = "https" if ssl_enabled == "true" else "http"
                logger.info(f"Prometheus metrics available at {scheme}://{server_config.host}:{server_config.port}/metrics/prometheus")
            # Run with streamable-http transport
            logger.info("Starting MCP plugin server with FastMCP (HTTP transport)")
            await mcp.run_streamable_http_async()

    except Exception:
        logger.exception("Caught error while executing plugin server")
        raise
    finally:
        # Always shut the plugin server down, whether the transport exited cleanly or raised.
        await SERVER.shutdown()
if __name__ == "__main__":
    # Script entry point: drive the async runner to completion on a fresh event loop.
    asyncio.run(run())