Coverage for mcpgateway / plugins / framework / external / mcp / server / runtime.py: 100%

153 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3"""Location: ./mcpgateway/plugins/framework/external/mcp/server/runtime.py 

4Copyright 2025 

5SPDX-License-Identifier: Apache-2.0 

6Authors: Fred Araujo, Teryl Taylor 

7 

8MCP Plugin Runtime using FastMCP with SSL/TLS support. 

9 

10This runtime does the following: 

11- Uses FastMCP from the MCP Python SDK 

12- Supports both mTLS and non-mTLS configurations 

13- Reads configuration from PLUGINS_SERVER_* environment variables or uses configurations 

14 the plugin config.yaml 

15- Implements all plugin hook tools (get_plugin_configs, tool_pre_invoke, etc.) 

16 

17Examples: 

18 Create an SSL-capable FastMCP server: 

19 

20 >>> from mcpgateway.plugins.framework.models import MCPServerConfig 

21 >>> config = MCPServerConfig(host="localhost", port=8000) 

22 >>> server = SSLCapableFastMCP(server_config=config, name="TestServer") 

23 >>> server.settings.host 

24 'localhost' 

25 >>> server.settings.port 

26 8000 

27 

28 Check SSL configuration returns empty dict when TLS is not configured: 

29 

30 >>> from mcpgateway.plugins.framework.models import MCPServerConfig 

31 >>> config = MCPServerConfig(host="127.0.0.1", port=8000, tls=None) 

32 >>> server = SSLCapableFastMCP(server_config=config, name="NoTLSServer") 

33 >>> ssl_config = server._get_ssl_config() 

34 >>> ssl_config 

35 {} 

36 

37 Verify server configuration is accessible: 

38 

39 >>> from mcpgateway.plugins.framework.models import MCPServerConfig 

40 >>> config = MCPServerConfig(host="localhost", port=9000) 

41 >>> server = SSLCapableFastMCP(server_config=config, name="ConfigTest") 

42 >>> server.server_config.host 

43 'localhost' 

44 >>> server.server_config.port 

45 9000 

46 

47 Settings are properly passed to FastMCP: 

48 

49 >>> from mcpgateway.plugins.framework.models import MCPServerConfig 

50 >>> config = MCPServerConfig(host="0.0.0.0", port=8080) 

51 >>> server = SSLCapableFastMCP(server_config=config, name="SettingsTest") 

52 >>> server.settings.host 

53 '0.0.0.0' 

54 >>> server.settings.port 

55 8080 

56""" 

57 

58# Standard 

59import asyncio 

60import logging 

61import os 

62import sys 

63from typing import Any, Dict, Literal 

64 

65# Third-Party 

66from fastapi import Response, status 

67from mcp.server.fastmcp import FastMCP 

68from mcp.server.transport_security import TransportSecuritySettings 

69from prometheus_client import Gauge, generate_latest, REGISTRY 

70import uvicorn 

71 

72# First-Party 

73from mcpgateway.plugins.framework import ExternalPluginServer, MCPServerConfig 

74from mcpgateway.plugins.framework.constants import GET_PLUGIN_CONFIG, GET_PLUGIN_CONFIGS, INVOKE_HOOK, MCP_SERVER_INSTRUCTIONS, MCP_SERVER_NAME 

75from mcpgateway.plugins.framework.settings import get_transport_settings 

76 

77logger = logging.getLogger(__name__) 

78 

79SERVER: ExternalPluginServer | None = None 

80 

81PLUGIN_INFO = Gauge( 

82 "plugin_info", 

83 "Plugin server information", 

84 ["server_name", "transport", "ssl_enabled"], 

85 registry=REGISTRY, 

86) 

87 

88# Module-level tool functions (extracted for testability) 

89 

90 

91async def get_plugin_configs() -> list[dict]: 

92 """Get the plugin configurations installed on the server. 

93 

94 Returns: 

95 JSON string containing list of plugin configuration dictionaries. 

96 

97 Raises: 

98 RuntimeError: If plugin server not initialized. 

99 

100 Examples: 

101 Function raises RuntimeError when server is not initialized: 

102 

103 >>> import asyncio 

104 >>> asyncio.run(get_plugin_configs()) # doctest: +SKIP 

105 Traceback (most recent call last): 

106 ... 

107 RuntimeError: Plugin server not initialized 

108 """ 

109 if not SERVER: 

110 raise RuntimeError("Plugin server not initialized") 

111 return await SERVER.get_plugin_configs() 

112 

113 

114async def get_plugin_config(name: str) -> dict: 

115 """Get the plugin configuration for a specific plugin. 

116 

117 Args: 

118 name: The name of the plugin 

119 

120 Returns: 

121 JSON string containing plugin configuration dictionary. 

122 

123 Raises: 

124 RuntimeError: If plugin server not initialized. 

125 

126 Examples: 

127 Function returns empty dict when result is None: 

128 

129 >>> result = None 

130 >>> result if result is not None else {} 

131 {} 

132 """ 

133 if not SERVER: 

134 raise RuntimeError("Plugin server not initialized") 

135 result = await SERVER.get_plugin_config(name) 

136 if result is None: 

137 return {} 

138 return result 

139 

140 

141async def invoke_hook(hook_type: str, plugin_name: str, payload: Dict[str, Any], context: Dict[str, Any]) -> dict: 

142 """Execute a hook for a plugin. 

143 

144 Args: 

145 hook_type: The name or type of the hook. 

146 plugin_name: The name of the plugin to execute 

147 payload: The resource payload to be analyzed 

148 context: Contextual information 

149 

150 Returns: 

151 Result dictionary with payload, context and any error information. 

152 

153 Raises: 

154 RuntimeError: If plugin server not initialized. 

155 

156 Examples: 

157 Function raises RuntimeError when server is not initialized: 

158 

159 >>> import asyncio 

160 >>> asyncio.run(invoke_hook("hook", "plugin", {}, {})) # doctest: +SKIP 

161 Traceback (most recent call last): 

162 ... 

163 RuntimeError: Plugin server not initialized 

164 """ 

165 if not SERVER: 

166 raise RuntimeError("Plugin server not initialized") 

167 return await SERVER.invoke_hook(hook_type, plugin_name, payload, context) 

168 

169 

170class SSLCapableFastMCP(FastMCP): 

171 """FastMCP server with SSL/TLS support using MCPServerConfig. 

172 

173 Examples: 

174 Create an SSL-capable FastMCP server: 

175 

176 >>> from mcpgateway.plugins.framework.models import MCPServerConfig 

177 >>> config = MCPServerConfig(host="127.0.0.1", port=8000) 

178 >>> server = SSLCapableFastMCP(server_config=config, name="TestServer") 

179 >>> server.settings.host 

180 '127.0.0.1' 

181 >>> server.settings.port 

182 8000 

183 """ 

184 

185 def __init__(self, server_config: MCPServerConfig, *args, **kwargs): 

186 """Initialize an SSL capable Fast MCP server. 

187 

188 Args: 

189 server_config: the MCP server configuration including mTLS information. 

190 *args: Additional positional arguments passed to FastMCP. 

191 **kwargs: Additional keyword arguments passed to FastMCP. 

192 

193 Examples: 

194 >>> from mcpgateway.plugins.framework.models import MCPServerConfig 

195 >>> config = MCPServerConfig(host="0.0.0.0", port=9000) 

196 >>> server = SSLCapableFastMCP(server_config=config, name="PluginServer") 

197 >>> server.server_config.host 

198 '0.0.0.0' 

199 >>> server.server_config.port 

200 9000 

201 """ 

202 # Load server config from environment 

203 

204 self.server_config = server_config 

205 # Override FastMCP settings with our server config 

206 if "host" not in kwargs: 

207 kwargs["host"] = self.server_config.host 

208 if "port" not in kwargs: 

209 kwargs["port"] = self.server_config.port 

210 if self.server_config.uds and kwargs.get("transport_security") is None: 

211 kwargs["transport_security"] = TransportSecuritySettings( 

212 enable_dns_rebinding_protection=True, 

213 allowed_hosts=[ 

214 "127.0.0.1", 

215 "localhost", 

216 "[::1]", 

217 "127.0.0.1:*", 

218 "localhost:*", 

219 "[::1]:*", 

220 ], 

221 allowed_origins=[ 

222 "http://127.0.0.1", 

223 "http://localhost", 

224 "http://[::1]", 

225 "http://127.0.0.1:*", 

226 "http://localhost:*", 

227 "http://[::1]:*", 

228 ], 

229 ) 

230 

231 super().__init__(*args, **kwargs) 

232 

233 def _get_ssl_config(self) -> dict: 

234 """Build SSL configuration for uvicorn from MCPServerConfig. 

235 

236 Returns: 

237 Dictionary of SSL configuration parameters for uvicorn. 

238 

239 Examples: 

240 >>> from mcpgateway.plugins.framework.models import MCPServerConfig 

241 >>> config = MCPServerConfig(host="127.0.0.1", port=8000, tls=None) 

242 >>> server = SSLCapableFastMCP(server_config=config, name="TestServer") 

243 >>> ssl_config = server._get_ssl_config() 

244 >>> ssl_config 

245 {} 

246 """ 

247 ssl_config = {} 

248 

249 if self.server_config.tls: 

250 tls = self.server_config.tls 

251 if tls.keyfile and tls.certfile: 

252 ssl_config["ssl_keyfile"] = tls.keyfile 

253 ssl_config["ssl_certfile"] = tls.certfile 

254 

255 if tls.ca_bundle: 

256 ssl_config["ssl_ca_certs"] = tls.ca_bundle 

257 

258 ssl_config["ssl_cert_reqs"] = str(tls.ssl_cert_reqs) 

259 

260 if tls.keyfile_password: 

261 ssl_config["ssl_keyfile_password"] = tls.keyfile_password 

262 

263 logger.info("SSL/TLS enabled (mTLS)") 

264 logger.info(f" Key: {ssl_config['ssl_keyfile']}") 

265 logger.info(f" Cert: {ssl_config['ssl_certfile']}") 

266 if "ssl_ca_certs" in ssl_config: 

267 logger.info(f" CA: {ssl_config['ssl_ca_certs']}") 

268 logger.info(f" Client cert required: {ssl_config['ssl_cert_reqs'] == 2}") 

269 else: 

270 logger.warning("TLS config present but keyfile/certfile not configured") 

271 else: 

272 logger.info("SSL/TLS not enabled") 

273 

274 return ssl_config 

275 

276 async def _start_health_check_server(self, health_port: int) -> None: 

277 """Start a simple HTTP-only health check server on a separate port. 

278 

279 This allows health checks to work even when the main server uses HTTPS/mTLS. 

280 

281 Args: 

282 health_port: Port number for the health check server. 

283 

284 Examples: 

285 Health check endpoint returns expected JSON response: 

286 

287 >>> import asyncio 

288 >>> from starlette.responses import JSONResponse 

289 >>> from starlette.requests import Request 

290 >>> async def health_check(_request: Request): 

291 ... return JSONResponse({"status": "healthy"}) 

292 >>> response = asyncio.run(health_check(None)) 

293 >>> response.status_code 

294 200 

295 """ 

296 # Third-Party 

297 from starlette.applications import Starlette # pylint: disable=import-outside-toplevel 

298 from starlette.requests import Request # pylint: disable=import-outside-toplevel 

299 from starlette.routing import Route # pylint: disable=import-outside-toplevel 

300 

301 # First-Party 

302 from mcpgateway.plugins.framework.utils import ORJSONResponse # pylint: disable=import-outside-toplevel 

303 

304 async def health_check(_request: Request): 

305 """Health check endpoint for container orchestration. 

306 

307 Returns: 

308 JSON response with health status. 

309 """ 

310 return ORJSONResponse({"status": "healthy"}) 

311 

312 async def metrics_endpoint(_request: Request): 

313 """Prometheus metrics endpoint. 

314 

315 Returns: 

316 JSON response with health status. 

317 

318 """ 

319 metrics_data = generate_latest(REGISTRY) 

320 return Response(content=metrics_data, media_type="text/plain; version=0.0.4") 

321 

322 async def metrics_disabled(): 

323 """Returns metrics response when metrics collection is disabled. 

324 

325 Returns: 

326 Response: HTTP 503 response indicating metrics are disabled. 

327 """ 

328 return Response(content='{"error": "Metrics collection is disabled"}', media_type="application/json", status_code=status.HTTP_503_SERVICE_UNAVAILABLE) 

329 

330 routes = [ 

331 Route("/health", health_check, methods=["GET"]), 

332 ] 

333 enable_metrics = os.getenv("ENABLE_METRICS", "true").lower() == "true" 

334 if enable_metrics: 

335 routes.append(Route("/metrics/prometheus", metrics_endpoint, methods=["GET"])) 

336 else: 

337 routes.append(Route("/metrics/prometheus", metrics_disabled, methods=["GET"])) 

338 

339 # Create a minimal Starlette app with only the health endpoint 

340 health_app = Starlette(routes=routes) 

341 

342 logger.info(f"Starting HTTP health check server on {self.settings.host}:{health_port}") 

343 config = uvicorn.Config( 

344 app=health_app, 

345 host=self.settings.host, 

346 port=health_port, 

347 log_level="warning", # Reduce noise from health checks 

348 ) 

349 server = uvicorn.Server(config) 

350 await server.serve() 

351 

352 async def run_streamable_http_async(self) -> None: 

353 """Run the server using StreamableHTTP transport with optional SSL/TLS. 

354 

355 Examples: 

356 Server uses configured host and port: 

357 

358 >>> from mcpgateway.plugins.framework.models import MCPServerConfig 

359 >>> config = MCPServerConfig(host="0.0.0.0", port=9000) 

360 >>> server = SSLCapableFastMCP(server_config=config, name="HTTPServer") 

361 >>> server.settings.host 

362 '0.0.0.0' 

363 >>> server.settings.port 

364 9000 

365 """ 

366 starlette_app = self.streamable_http_app() 

367 

368 # Add health check endpoint to main app 

369 # Third-Party 

370 from starlette.requests import Request # pylint: disable=import-outside-toplevel 

371 from starlette.routing import Route # pylint: disable=import-outside-toplevel 

372 

373 # First-Party 

374 from mcpgateway.plugins.framework.utils import ORJSONResponse # pylint: disable=import-outside-toplevel 

375 

376 async def health_check(_request: Request): 

377 """Health check endpoint for container orchestration. 

378 

379 Returns: 

380 JSON response with health status. 

381 """ 

382 return ORJSONResponse({"status": "healthy"}) 

383 

384 # Add the health route to the Starlette app 

385 starlette_app.routes.append(Route("/health", health_check, methods=["GET"])) 

386 

387 async def metrics_endpoint(_request: Request): 

388 """Prometheus metrics endpoint. 

389 

390 Returns: 

391 text response with metrics detail. 

392 """ 

393 metrics_data = generate_latest(REGISTRY) 

394 return Response(content=metrics_data, media_type="text/plain; version=0.0.4") 

395 

396 async def metrics_disabled(): 

397 """Returns metrics response when metrics collection is disabled. 

398 

399 Returns: 

400 Response: HTTP 503 response indicating metrics are disabled. 

401 """ 

402 return Response(content='{"error": "Metrics collection is disabled"}', media_type="application/json", status_code=status.HTTP_503_SERVICE_UNAVAILABLE) 

403 

404 # Add the metrics route to the Starlette app 

405 enable_metrics = os.getenv("ENABLE_METRICS", "true").lower() == "true" 

406 if enable_metrics: 

407 starlette_app.routes.append(Route("/metrics/prometheus", metrics_endpoint, methods=["GET"])) 

408 else: 

409 starlette_app.routes.append(Route("/metrics/prometheus", metrics_disabled, methods=["GET"])) 

410 

411 # Build uvicorn config with optional SSL 

412 ssl_config = self._get_ssl_config() 

413 config_kwargs = { 

414 "app": starlette_app, 

415 "host": self.settings.host, 

416 "port": self.settings.port, 

417 "log_level": self.settings.log_level.lower(), 

418 } 

419 config_kwargs.update(ssl_config) 

420 

421 if self.server_config.uds: 

422 config_kwargs.pop("host", None) 

423 config_kwargs.pop("port", None) 

424 config_kwargs["uds"] = self.server_config.uds 

425 logger.info(f"Starting plugin server on unix socket {self.server_config.uds}") 

426 else: 

427 logger.info(f"Starting plugin server on {self.settings.host}:{self.settings.port}") 

428 config = uvicorn.Config(**config_kwargs) # type: ignore[arg-type] 

429 server = uvicorn.Server(config) 

430 

431 # If SSL is enabled, start a separate HTTP health check server 

432 if ssl_config and not self.server_config.uds: 

433 health_port = self.settings.port + 1000 # Use port+1000 for health checks 

434 logger.info(f"SSL enabled - starting separate HTTP health check on port {health_port}") 

435 # Run both servers concurrently 

436 await asyncio.gather(server.serve(), self._start_health_check_server(health_port)) 

437 else: 

438 # Just run the main server (health check is already on it) 

439 await server.serve() 

440 

441 

442async def run() -> None: 

443 """Run the external plugin server with FastMCP. 

444 

445 Supports both stdio and HTTP transports. Auto-detects transport based on stdin 

446 (if stdin is not a TTY, uses stdio mode), or you can explicitly set PLUGINS_TRANSPORT. 

447 

448 Reads configuration from PLUGINS_SERVER_* environment variables: 

449 - PLUGINS_TRANSPORT: Transport type - 'stdio' or 'http' (default: auto-detect) 

450 - PLUGINS_SERVER_HOST: Server host (default: 0.0.0.0) - HTTP mode only 

451 - PLUGINS_SERVER_PORT: Server port (default: 8000) - HTTP mode only 

452 - PLUGINS_SERVER_UDS: Unix domain socket path - HTTP mode only (no TLS) 

453 - PLUGINS_SERVER_SSL_ENABLED: Enable SSL/TLS (true/false) - HTTP mode only 

454 - PLUGINS_SERVER_SSL_KEYFILE: Path to server private key - HTTP mode only 

455 - PLUGINS_SERVER_SSL_CERTFILE: Path to server certificate - HTTP mode only 

456 - PLUGINS_SERVER_SSL_CA_CERTS: Path to CA bundle for client verification - HTTP mode only 

457 - PLUGINS_SERVER_SSL_CERT_REQS: Client cert requirement (0=NONE, 1=OPTIONAL, 2=REQUIRED) - HTTP mode only 

458 

459 Raises: 

460 Exception: If plugin server initialization or execution fails. 

461 

462 Examples: 

463 SERVER module variable starts as None: 

464 

465 >>> SERVER is None 

466 True 

467 

468 FastMCP server names are defined as constants: 

469 

470 >>> from mcpgateway.plugins.framework.constants import MCP_SERVER_NAME 

471 >>> isinstance(MCP_SERVER_NAME, str) 

472 True 

473 >>> len(MCP_SERVER_NAME) > 0 

474 True 

475 """ 

476 global SERVER # pylint: disable=global-statement 

477 

478 # Initialize plugin server 

479 SERVER = ExternalPluginServer() 

480 

481 if not await SERVER.initialize(): 

482 logger.error("Failed to initialize plugin server") 

483 return 

484 

485 # Determine transport type from environment variable or auto-detect 

486 # Auto-detect: if stdin is not a TTY (i.e., it's being piped), use stdio mode 

487 # First-Party 

488 transport = get_transport_settings().transport 

489 if transport is None: 

490 # Auto-detect based on stdin 

491 if not sys.stdin.isatty(): 

492 transport = "stdio" 

493 logger.info("Auto-detected stdio transport (stdin is not a TTY)") 

494 else: 

495 transport = "http" 

496 else: 

497 transport = transport.lower() 

498 

499 try: 

500 if transport == "stdio": 

501 # Create basic FastMCP server for stdio (no SSL support needed for stdio) 

502 mcp = FastMCP( 

503 name=MCP_SERVER_NAME, 

504 instructions=MCP_SERVER_INSTRUCTIONS, 

505 ) 

506 

507 # Register module-level tool functions with FastMCP 

508 mcp.tool(name=GET_PLUGIN_CONFIGS)(get_plugin_configs) 

509 mcp.tool(name=GET_PLUGIN_CONFIG)(get_plugin_config) 

510 mcp.tool(name=INVOKE_HOOK)(invoke_hook) 

511 # set the plugin_info gauge on startup 

512 PLUGIN_INFO.labels(server_name=MCP_SERVER_NAME, transport="stdio", ssl_enabled="false").set(1) 

513 

514 # Run with stdio transport 

515 logger.info("Starting MCP plugin server with FastMCP (stdio transport)") 

516 await mcp.run_stdio_async() 

517 

518 else: # http or streamablehttp 

519 server_config: MCPServerConfig = SERVER.get_server_config() 

520 # Create FastMCP server with SSL support 

521 mcp = SSLCapableFastMCP( 

522 server_config, 

523 name=MCP_SERVER_NAME, 

524 instructions=MCP_SERVER_INSTRUCTIONS, 

525 ) 

526 

527 # Register module-level tool functions with FastMCP 

528 mcp.tool(name=GET_PLUGIN_CONFIGS)(get_plugin_configs) 

529 mcp.tool(name=GET_PLUGIN_CONFIG)(get_plugin_config) 

530 mcp.tool(name=INVOKE_HOOK)(invoke_hook) 

531 # set the plugin_info gauge on startup 

532 ssl_enabled: Literal["true", "false"] = "true" if server_config and server_config.tls is not None else "false" 

533 PLUGIN_INFO.labels(server_name=MCP_SERVER_NAME, transport="http", ssl_enabled=ssl_enabled).set(1) 

534 if server_config: 

535 logger.info(f"Prometheus metrics available at http://{server_config.host}:{server_config.port}/metrics/prometheus") 

536 # Run with streamable-http transport 

537 logger.info("Starting MCP plugin server with FastMCP (HTTP transport)") 

538 await mcp.run_streamable_http_async() 

539 

540 except Exception: 

541 logger.exception("Caught error while executing plugin server") 

542 raise 

543 finally: 

544 await SERVER.shutdown() 

545 

546 

547if __name__ == "__main__": 

548 asyncio.run(run())