Coverage for mcpgateway / plugins / framework / external / mcp / server / runtime.py: 100%

163 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3"""Location: ./mcpgateway/plugins/framework/external/mcp/server/runtime.py 

4Copyright 2025 

5SPDX-License-Identifier: Apache-2.0 

6Authors: Fred Araujo, Teryl Taylor 

7 

8MCP Plugin Runtime using FastMCP with SSL/TLS support. 

9 

10This runtime does the following: 

11- Uses FastMCP from the MCP Python SDK 

12- Supports both mTLS and non-mTLS configurations 

13- Reads configuration from PLUGINS_SERVER_* environment variables or falls back to

14 the settings in the plugin config.yaml

15- Implements all plugin hook tools (get_plugin_configs, tool_pre_invoke, etc.) 

16 

17Examples: 

18 Create an SSL-capable FastMCP server: 

19 

20 >>> from mcpgateway.plugins.framework.models import MCPServerConfig 

21 >>> config = MCPServerConfig(host="localhost", port=8000) 

22 >>> server = SSLCapableFastMCP(server_config=config, name="TestServer") 

23 >>> server.settings.host 

24 'localhost' 

25 >>> server.settings.port 

26 8000 

27 

28 Check SSL configuration returns empty dict when TLS is not configured: 

29 

30 >>> from mcpgateway.plugins.framework.models import MCPServerConfig 

31 >>> config = MCPServerConfig(host="127.0.0.1", port=8000, tls=None) 

32 >>> server = SSLCapableFastMCP(server_config=config, name="NoTLSServer") 

33 >>> ssl_config = server._get_ssl_config() 

34 >>> ssl_config 

35 {} 

36 

37 Verify server configuration is accessible: 

38 

39 >>> from mcpgateway.plugins.framework.models import MCPServerConfig 

40 >>> config = MCPServerConfig(host="localhost", port=9000) 

41 >>> server = SSLCapableFastMCP(server_config=config, name="ConfigTest") 

42 >>> server.server_config.host 

43 'localhost' 

44 >>> server.server_config.port 

45 9000 

46 

47 Settings are properly passed to FastMCP: 

48 

49 >>> from mcpgateway.plugins.framework.models import MCPServerConfig 

50 >>> config = MCPServerConfig(host="0.0.0.0", port=8080) 

51 >>> server = SSLCapableFastMCP(server_config=config, name="SettingsTest") 

52 >>> server.settings.host 

53 '0.0.0.0' 

54 >>> server.settings.port 

55 8080 

56""" 

57 

58# Standard 

59import asyncio 

60import logging 

61import os 

62import sys 

63from typing import Any, Dict, Literal 

64 

65# Third-Party 

66from fastapi import Response, status 

67from mcp.server.fastmcp import FastMCP 

68from mcp.server.transport_security import TransportSecuritySettings 

69from prometheus_client import Gauge, generate_latest, REGISTRY 

70import uvicorn 

71 

72# First-Party 

73from mcpgateway.plugins.framework import ExternalPluginServer, MCPServerConfig 

74from mcpgateway.plugins.framework.constants import GET_PLUGIN_CONFIG, GET_PLUGIN_CONFIGS, INVOKE_HOOK, MCP_SERVER_INSTRUCTIONS, MCP_SERVER_NAME 

75from mcpgateway.plugins.framework.settings import get_transport_settings 

76 

# Module logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Process-wide plugin server handle. It is None at import time; run() assigns
# the ExternalPluginServer instance and the tool functions below read it.
SERVER: ExternalPluginServer | None = None

# Gauge registered on the default Prometheus registry. run() sets it to 1 for
# the (server_name, transport, ssl_enabled) label combination in use.
PLUGIN_INFO = Gauge(
    name="plugin_info",
    documentation="Plugin server information",
    labelnames=["server_name", "transport", "ssl_enabled"],
    registry=REGISTRY,
)

87 

88# Module-level tool functions (extracted for testability) 

89 

90 

async def get_plugin_configs() -> list[dict]:
    """Return the configurations of the plugins installed on the server.

    The call is delegated to the module-level ``SERVER`` instance.

    Returns:
        The value produced by ``SERVER.get_plugin_configs()``; annotated as a
        list of plugin configuration dictionaries.

    Raises:
        RuntimeError: If plugin server not initialized.

    Examples:
        Function raises RuntimeError when server is not initialized:

        >>> import asyncio
        >>> asyncio.run(get_plugin_configs())  # doctest: +SKIP
        Traceback (most recent call last):
        ...
        RuntimeError: Plugin server not initialized
    """
    plugin_server = SERVER
    if not plugin_server:
        raise RuntimeError("Plugin server not initialized")
    configs = await plugin_server.get_plugin_configs()
    return configs

112 

113 

async def get_plugin_config(name: str) -> dict:
    """Return the configuration of a single plugin.

    The lookup is delegated to the module-level ``SERVER`` instance.

    Args:
        name: The name of the plugin

    Returns:
        The value produced by ``SERVER.get_plugin_config(name)``, or an empty
        dictionary when that value is ``None``.

    Raises:
        RuntimeError: If plugin server not initialized.

    Examples:
        A missing (``None``) result is replaced by an empty dict:

        >>> result = None
        >>> {} if result is None else result
        {}
    """
    plugin_server = SERVER
    if not plugin_server:
        raise RuntimeError("Plugin server not initialized")
    found = await plugin_server.get_plugin_config(name)
    return {} if found is None else found

139 

140 

async def invoke_hook(hook_type: str, plugin_name: str, payload: Dict[str, Any], context: Dict[str, Any]) -> dict:
    """Run one hook of one plugin through the module-level ``SERVER``.

    Args:
        hook_type: The name or type of the hook.
        plugin_name: The name of the plugin to execute
        payload: The resource payload to be analyzed
        context: Contextual information

    Returns:
        The value produced by ``SERVER.invoke_hook(...)`` for the given
        arguments; annotated as a result dictionary.

    Raises:
        RuntimeError: If plugin server not initialized.

    Examples:
        Function raises RuntimeError when server is not initialized:

        >>> import asyncio
        >>> asyncio.run(invoke_hook("hook", "plugin", {}, {}))  # doctest: +SKIP
        Traceback (most recent call last):
        ...
        RuntimeError: Plugin server not initialized
    """
    plugin_server = SERVER
    if not plugin_server:
        raise RuntimeError("Plugin server not initialized")
    outcome = await plugin_server.invoke_hook(hook_type, plugin_name, payload, context)
    return outcome

168 

169 

170def _should_trace_plugin_server_path(path: str) -> bool: 

171 """Trace streamable MCP traffic while excluding health and metrics endpoints. 

172 

173 Args: 

174 path: Incoming plugin server request path. 

175 

176 Returns: 

177 bool: ``True`` when the request path should be traced. 

178 """ 

179 

180 normalized = path.rstrip("/") or "/" 

181 return normalized not in {"/health", "/metrics/prometheus"} 

182 

183 

class SSLCapableFastMCP(FastMCP):
    """FastMCP server with SSL/TLS support using MCPServerConfig.

    The supplied ``MCPServerConfig`` is kept on ``self.server_config`` and is used
    to fill in FastMCP's host/port, to build uvicorn's ``ssl_*`` options and to
    decide whether the server binds to a Unix domain socket.

    Examples:
        Create an SSL-capable FastMCP server:

        >>> from mcpgateway.plugins.framework.models import MCPServerConfig
        >>> config = MCPServerConfig(host="127.0.0.1", port=8000)
        >>> server = SSLCapableFastMCP(server_config=config, name="TestServer")
        >>> server.settings.host
        '127.0.0.1'
        >>> server.settings.port
        8000
    """

    def __init__(self, server_config: MCPServerConfig, *args, **kwargs):
        """Initialize an SSL capable Fast MCP server.

        Args:
            server_config: the MCP server configuration including mTLS information.
            *args: Additional positional arguments passed to FastMCP.
            **kwargs: Additional keyword arguments passed to FastMCP.

        Examples:
            >>> from mcpgateway.plugins.framework.models import MCPServerConfig
            >>> config = MCPServerConfig(host="0.0.0.0", port=9000)
            >>> server = SSLCapableFastMCP(server_config=config, name="PluginServer")
            >>> server.server_config.host
            '0.0.0.0'
            >>> server.server_config.port
            9000
        """
        self.server_config = server_config

        # Explicit host/port keyword arguments win; otherwise use the server config.
        kwargs.setdefault("host", self.server_config.host)
        kwargs.setdefault("port", self.server_config.port)

        # When a Unix domain socket is configured and the caller supplied no
        # transport security settings, enable DNS-rebinding protection limited to
        # loopback hosts and origins.
        if self.server_config.uds and kwargs.get("transport_security") is None:
            kwargs["transport_security"] = TransportSecuritySettings(
                enable_dns_rebinding_protection=True,
                allowed_hosts=[
                    "127.0.0.1",
                    "localhost",
                    "[::1]",
                    "127.0.0.1:*",
                    "localhost:*",
                    "[::1]:*",
                ],
                allowed_origins=[
                    "http://127.0.0.1",
                    "http://localhost",
                    "http://[::1]",
                    "http://127.0.0.1:*",
                    "http://localhost:*",
                    "http://[::1]:*",
                ],
            )

        super().__init__(*args, **kwargs)

    def _get_ssl_config(self) -> dict:
        """Build SSL configuration for uvicorn from MCPServerConfig.

        Returns:
            Dictionary of SSL configuration parameters for uvicorn. It is empty
            when no TLS section is configured or when the TLS section lacks a
            keyfile or certfile.

        Examples:
            >>> from mcpgateway.plugins.framework.models import MCPServerConfig
            >>> config = MCPServerConfig(host="127.0.0.1", port=8000, tls=None)
            >>> server = SSLCapableFastMCP(server_config=config, name="TestServer")
            >>> ssl_config = server._get_ssl_config()
            >>> ssl_config
            {}
        """
        ssl_config: dict = {}

        tls = self.server_config.tls
        if not tls:
            logger.info("SSL/TLS not enabled")
            return ssl_config

        if not (tls.keyfile and tls.certfile):
            logger.warning("TLS config present but keyfile/certfile not configured")
            return ssl_config

        ssl_config["ssl_keyfile"] = tls.keyfile
        ssl_config["ssl_certfile"] = tls.certfile

        if tls.ca_bundle:
            ssl_config["ssl_ca_certs"] = tls.ca_bundle

        # NOTE(review): the value is stored as a string, as before. uvicorn
        # documents ssl_cert_reqs as an int - confirm the string form is accepted.
        ssl_config["ssl_cert_reqs"] = str(tls.ssl_cert_reqs)

        if tls.keyfile_password:
            ssl_config["ssl_keyfile_password"] = tls.keyfile_password

        logger.info("SSL/TLS enabled (mTLS)")
        logger.info(f"  Key: {ssl_config['ssl_keyfile']}")
        logger.info(f"  Cert: {ssl_config['ssl_certfile']}")
        if "ssl_ca_certs" in ssl_config:
            logger.info(f"  CA: {ssl_config['ssl_ca_certs']}")
        # The stored value is a string, so compare against "2" (the integer value
        # of ssl.CERT_REQUIRED); comparing against the int 2 was always False.
        logger.info(f"  Client cert required: {ssl_config['ssl_cert_reqs'] == '2'}")

        return ssl_config

    @staticmethod
    def _plugin_health_and_metrics_routes() -> list:
        """Build the ``/health`` and ``/metrics/prometheus`` GET routes.

        The metrics route serves Prometheus text output when the ``ENABLE_METRICS``
        environment variable (default ``"true"``) equals ``"true"`` ignoring case,
        and an HTTP 503 JSON error otherwise.

        Returns:
            List with the health route followed by the metrics route.
        """
        # Third-Party
        from starlette.requests import Request  # pylint: disable=import-outside-toplevel
        from starlette.routing import Route  # pylint: disable=import-outside-toplevel

        # First-Party
        from mcpgateway.plugins.framework.utils import ORJSONResponse  # pylint: disable=import-outside-toplevel

        async def health_check(_request: Request):
            """Health check endpoint for container orchestration.

            Returns:
                JSON response with health status.
            """
            return ORJSONResponse({"status": "healthy"})

        async def metrics_endpoint(_request: Request):
            """Prometheus metrics endpoint.

            Returns:
                Text response with the metrics of the default registry.
            """
            metrics_data = generate_latest(REGISTRY)
            return Response(content=metrics_data, media_type="text/plain; version=0.0.4")

        async def metrics_disabled(_request: Request | None = None):
            """Returns metrics response when metrics collection is disabled.

            The request parameter is required because Starlette passes the request
            to function endpoints; it defaults to ``None`` so that existing direct
            calls without arguments keep working.

            Returns:
                Response: HTTP 503 response indicating metrics are disabled.
            """
            return Response(content='{"error": "Metrics collection is disabled"}', media_type="application/json", status_code=status.HTTP_503_SERVICE_UNAVAILABLE)

        enable_metrics = os.getenv("ENABLE_METRICS", "true").lower() == "true"
        metrics_handler = metrics_endpoint if enable_metrics else metrics_disabled
        return [
            Route("/health", health_check, methods=["GET"]),
            Route("/metrics/prometheus", metrics_handler, methods=["GET"]),
        ]

    async def _start_health_check_server(self, health_port: int) -> None:
        """Start a simple HTTP-only health check server on a separate port.

        This allows health checks to work even when the main server uses HTTPS/mTLS.
        The app also exposes the ``/metrics/prometheus`` route.

        Args:
            health_port: Port number for the health check server.

        Examples:
            Health check endpoint returns expected JSON response:

            >>> import asyncio
            >>> from starlette.responses import JSONResponse
            >>> from starlette.requests import Request
            >>> async def health_check(_request: Request):
            ...     return JSONResponse({"status": "healthy"})
            >>> response = asyncio.run(health_check(None))
            >>> response.status_code
            200
        """
        # Third-Party
        from starlette.applications import Starlette  # pylint: disable=import-outside-toplevel

        # Minimal Starlette app carrying only the health and metrics endpoints
        health_app = Starlette(routes=self._plugin_health_and_metrics_routes())

        logger.info(f"Starting HTTP health check server on {self.settings.host}:{health_port}")
        config = uvicorn.Config(
            app=health_app,
            host=self.settings.host,
            port=health_port,
            log_level="warning",  # Reduce noise from health checks
        )
        server = uvicorn.Server(config)
        await server.serve()

    async def run_streamable_http_async(self) -> None:
        """Run the server using StreamableHTTP transport with optional SSL/TLS.

        Health and metrics routes are appended to the streamable HTTP app. When
        ``OTEL_ENABLE_OBSERVABILITY`` is ``"true"`` and tracing is enabled, the app
        is wrapped in the OpenTelemetry request middleware. The server binds to the
        configured Unix domain socket if there is one, otherwise to host/port. When
        SSL options are present and no Unix socket is used, a plain-HTTP health
        server is started alongside on ``port + 1000``.

        Examples:
            Server uses configured host and port:

            >>> from mcpgateway.plugins.framework.models import MCPServerConfig
            >>> config = MCPServerConfig(host="0.0.0.0", port=9000)
            >>> server = SSLCapableFastMCP(server_config=config, name="HTTPServer")
            >>> server.settings.host
            '0.0.0.0'
            >>> server.settings.port
            9000
        """
        starlette_app = self.streamable_http_app()

        # Add the health route, then the metrics route, to the main app
        starlette_app.routes.extend(self._plugin_health_and_metrics_routes())

        app_to_serve: Any = starlette_app
        if os.getenv("OTEL_ENABLE_OBSERVABILITY", "false").lower() == "true":
            # Set service name BEFORE importing observability module
            # The tracer is initialized at import time, so env vars must be set first
            os.environ.setdefault("OTEL_SERVICE_NAME", MCP_SERVER_NAME)

            # First-Party
            from mcpgateway.observability import init_telemetry, OpenTelemetryRequestMiddleware, otel_tracing_enabled  # pylint: disable=import-outside-toplevel

            init_telemetry()
            if otel_tracing_enabled():
                app_to_serve = OpenTelemetryRequestMiddleware(starlette_app, should_trace_request_path=_should_trace_plugin_server_path)

        # Build uvicorn config with optional SSL
        ssl_config = self._get_ssl_config()
        config_kwargs = {
            "app": app_to_serve,
            "host": self.settings.host,
            "port": self.settings.port,
            "log_level": self.settings.log_level.lower(),
        }
        config_kwargs.update(ssl_config)

        if self.server_config.uds:
            # A Unix domain socket replaces the TCP host/port binding
            config_kwargs.pop("host", None)
            config_kwargs.pop("port", None)
            config_kwargs["uds"] = self.server_config.uds
            logger.info(f"Starting plugin server on unix socket {self.server_config.uds}")
        else:
            logger.info(f"Starting plugin server on {self.settings.host}:{self.settings.port}")
        config = uvicorn.Config(**config_kwargs)  # type: ignore[arg-type]
        server = uvicorn.Server(config)

        # If SSL is enabled, start a separate HTTP health check server
        if ssl_config and not self.server_config.uds:
            health_port = self.settings.port + 1000  # Use port+1000 for health checks
            logger.info(f"SSL enabled - starting separate HTTP health check on port {health_port}")
            # Run both servers concurrently
            await asyncio.gather(server.serve(), self._start_health_check_server(health_port))
        else:
            # Just run the main server (health check is already on it)
            await server.serve()

467 

468 

async def run() -> None:
    """Start the external plugin server on top of FastMCP.

    The transport is taken from ``get_transport_settings().transport``. When that
    is unset it is auto-detected from stdin: a non-TTY stdin selects stdio, a TTY
    selects HTTP. Any transport value other than ``stdio`` runs the HTTP server.

    Reads configuration from PLUGINS_SERVER_* environment variables:
        - PLUGINS_TRANSPORT: Transport type - 'stdio' or 'http' (default: auto-detect)
        - PLUGINS_SERVER_HOST: Server host (default: 0.0.0.0) - HTTP mode only
        - PLUGINS_SERVER_PORT: Server port (default: 8000) - HTTP mode only
        - PLUGINS_SERVER_UDS: Unix domain socket path - HTTP mode only (no TLS)
        - PLUGINS_SERVER_SSL_ENABLED: Enable SSL/TLS (true/false) - HTTP mode only
        - PLUGINS_SERVER_SSL_KEYFILE: Path to server private key - HTTP mode only
        - PLUGINS_SERVER_SSL_CERTFILE: Path to server certificate - HTTP mode only
        - PLUGINS_SERVER_SSL_CA_CERTS: Path to CA bundle for client verification - HTTP mode only
        - PLUGINS_SERVER_SSL_CERT_REQS: Client cert requirement (0=NONE, 1=OPTIONAL, 2=REQUIRED) - HTTP mode only

    Raises:
        Exception: If plugin server initialization or execution fails.

    Examples:
        SERVER module variable starts as None:

        >>> SERVER is None
        True

        FastMCP server names are defined as constants:

        >>> from mcpgateway.plugins.framework.constants import MCP_SERVER_NAME
        >>> isinstance(MCP_SERVER_NAME, str)
        True
        >>> len(MCP_SERVER_NAME) > 0
        True
    """
    global SERVER  # pylint: disable=global-statement

    # Bring up the plugin server; give up quietly if it reports failure.
    SERVER = ExternalPluginServer()
    initialized = await SERVER.initialize()
    if not initialized:
        logger.error("Failed to initialize plugin server")
        return

    # An explicit setting wins; otherwise a piped (non-TTY) stdin means stdio.
    configured_transport = get_transport_settings().transport
    if configured_transport is not None:
        transport = configured_transport.lower()
    elif sys.stdin.isatty():
        transport = "http"
    else:
        transport = "stdio"
        logger.info("Auto-detected stdio transport (stdin is not a TTY)")

    # Tool name -> module-level coroutine, registered in this order on either server.
    tool_table = (
        (GET_PLUGIN_CONFIGS, get_plugin_configs),
        (GET_PLUGIN_CONFIG, get_plugin_config),
        (INVOKE_HOOK, invoke_hook),
    )

    try:
        if transport != "stdio":  # http or streamablehttp
            server_config: MCPServerConfig = SERVER.get_server_config()
            # FastMCP server with SSL support
            mcp = SSLCapableFastMCP(
                server_config,
                name=MCP_SERVER_NAME,
                instructions=MCP_SERVER_INSTRUCTIONS,
            )
            for tool_name, tool_fn in tool_table:
                mcp.tool(name=tool_name)(tool_fn)

            # Publish the plugin_info gauge on startup
            ssl_enabled: Literal["true", "false"] = "false"
            if server_config and server_config.tls is not None:
                ssl_enabled = "true"
            PLUGIN_INFO.labels(server_name=MCP_SERVER_NAME, transport="http", ssl_enabled=ssl_enabled).set(1)
            if server_config:
                logger.info(f"Prometheus metrics available at http://{server_config.host}:{server_config.port}/metrics/prometheus")

            logger.info("Starting MCP plugin server with FastMCP (HTTP transport)")
            await mcp.run_streamable_http_async()
        else:
            # Plain FastMCP server; stdio needs no SSL support
            mcp = FastMCP(
                name=MCP_SERVER_NAME,
                instructions=MCP_SERVER_INSTRUCTIONS,
            )
            for tool_name, tool_fn in tool_table:
                mcp.tool(name=tool_name)(tool_fn)

            # Publish the plugin_info gauge on startup
            PLUGIN_INFO.labels(server_name=MCP_SERVER_NAME, transport="stdio", ssl_enabled="false").set(1)

            logger.info("Starting MCP plugin server with FastMCP (stdio transport)")
            await mcp.run_stdio_async()
    except Exception:
        logger.exception("Caught error while executing plugin server")
        raise
    finally:
        # Always shut the plugin server down, whether the transport exited or failed.
        await SERVER.shutdown()

572 

573 

# Script entry point: drive the run() coroutine to completion on a new event loop.
if __name__ == "__main__":
    asyncio.run(run())