Coverage for mcpgateway / plugins / framework / external / mcp / server / runtime.py: 100%

152 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3"""Location: ./mcpgateway/plugins/framework/external/mcp/server/runtime.py 

4Copyright 2025 

5SPDX-License-Identifier: Apache-2.0 

6Authors: Fred Araujo, Teryl Taylor 

7 

8MCP Plugin Runtime using FastMCP with SSL/TLS support. 

9 

10This runtime does the following: 

11- Uses FastMCP from the MCP Python SDK 

12- Supports both mTLS and non-mTLS configurations 

13- Reads configuration from PLUGINS_SERVER_* environment variables or from 

14 the plugin config.yaml 

15- Implements all plugin hook tools (get_plugin_configs, tool_pre_invoke, etc.) 

16 

17Examples: 

18 Create an SSL-capable FastMCP server: 

19 

20 >>> from mcpgateway.plugins.framework.models import MCPServerConfig 

21 >>> config = MCPServerConfig(host="localhost", port=8000) 

22 >>> server = SSLCapableFastMCP(server_config=config, name="TestServer") 

23 >>> server.settings.host 

24 'localhost' 

25 >>> server.settings.port 

26 8000 

27 

28 Check SSL configuration returns empty dict when TLS is not configured: 

29 

30 >>> from mcpgateway.plugins.framework.models import MCPServerConfig 

31 >>> config = MCPServerConfig(host="127.0.0.1", port=8000, tls=None) 

32 >>> server = SSLCapableFastMCP(server_config=config, name="NoTLSServer") 

33 >>> ssl_config = server._get_ssl_config() 

34 >>> ssl_config 

35 {} 

36 

37 Verify server configuration is accessible: 

38 

39 >>> from mcpgateway.plugins.framework.models import MCPServerConfig 

40 >>> config = MCPServerConfig(host="localhost", port=9000) 

41 >>> server = SSLCapableFastMCP(server_config=config, name="ConfigTest") 

42 >>> server.server_config.host 

43 'localhost' 

44 >>> server.server_config.port 

45 9000 

46 

47 Settings are properly passed to FastMCP: 

48 

49 >>> from mcpgateway.plugins.framework.models import MCPServerConfig 

50 >>> config = MCPServerConfig(host="0.0.0.0", port=8080) 

51 >>> server = SSLCapableFastMCP(server_config=config, name="SettingsTest") 

52 >>> server.settings.host 

53 '0.0.0.0' 

54 >>> server.settings.port 

55 8080 

56""" 

57 

58# Standard 

59import asyncio 

60import logging 

61import os 

62import sys 

63from typing import Any, Dict, Literal 

64 

65# Third-Party 

66from fastapi import Response, status 

67from mcp.server.fastmcp import FastMCP 

68from mcp.server.transport_security import TransportSecuritySettings 

69from prometheus_client import Gauge, generate_latest, REGISTRY 

70import uvicorn 

71 

72# First-Party 

73from mcpgateway.plugins.framework import ExternalPluginServer, MCPServerConfig 

74from mcpgateway.plugins.framework.constants import GET_PLUGIN_CONFIG, GET_PLUGIN_CONFIGS, INVOKE_HOOK, MCP_SERVER_INSTRUCTIONS, MCP_SERVER_NAME 

75 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# The active plugin server. Stays None until run() assigns it; the tool
# functions below raise RuntimeError while it is unset.
SERVER: ExternalPluginServer | None = None

# Info-style gauge: run() sets it to 1 with labels describing the server.
PLUGIN_INFO = Gauge(
    name="plugin_info",
    documentation="Plugin server information",
    labelnames=["server_name", "transport", "ssl_enabled"],
    registry=REGISTRY,
)

86 

87# Module-level tool functions (extracted for testability) 

88 

89 

async def get_plugin_configs() -> list[dict]:
    """Return the configurations of the plugins installed on the server.

    Delegates to the module-level ``SERVER`` instance.

    Returns:
        List of plugin configuration dictionaries.

    Raises:
        RuntimeError: If plugin server not initialized.

    Examples:
        Function raises RuntimeError when server is not initialized:

        >>> import asyncio
        >>> asyncio.run(get_plugin_configs())  # doctest: +SKIP
        Traceback (most recent call last):
        ...
        RuntimeError: Plugin server not initialized
    """
    if SERVER:
        return await SERVER.get_plugin_configs()
    raise RuntimeError("Plugin server not initialized")

111 

112 

async def get_plugin_config(name: str) -> dict:
    """Return the configuration of a single plugin.

    Delegates to the module-level ``SERVER`` instance.

    Args:
        name: The name of the plugin

    Returns:
        The plugin configuration dictionary, or an empty dict when the
        server returns ``None`` for that name.

    Raises:
        RuntimeError: If plugin server not initialized.

    Examples:
        Function returns empty dict when result is None:

        >>> result = None
        >>> result if result is not None else {}
        {}
    """
    if SERVER:
        plugin_config = await SERVER.get_plugin_config(name)
        # Normalise a missing config to an empty dict so the declared
        # return type always holds.
        return {} if plugin_config is None else plugin_config
    raise RuntimeError("Plugin server not initialized")

138 

139 

async def invoke_hook(hook_type: str, plugin_name: str, payload: Dict[str, Any], context: Dict[str, Any]) -> dict:
    """Run one hook of one plugin through the plugin server.

    Delegates to the module-level ``SERVER`` instance, forwarding all four
    arguments unchanged.

    Args:
        hook_type: The name or type of the hook.
        plugin_name: The name of the plugin to execute
        payload: The resource payload to be analyzed
        context: Contextual information

    Returns:
        Result dictionary with payload, context and any error information.

    Raises:
        RuntimeError: If plugin server not initialized.

    Examples:
        Function raises RuntimeError when server is not initialized:

        >>> import asyncio
        >>> asyncio.run(invoke_hook("hook", "plugin", {}, {}))  # doctest: +SKIP
        Traceback (most recent call last):
        ...
        RuntimeError: Plugin server not initialized
    """
    if SERVER:
        return await SERVER.invoke_hook(hook_type, plugin_name, payload, context)
    raise RuntimeError("Plugin server not initialized")

167 

168 

class SSLCapableFastMCP(FastMCP):
    """FastMCP server with SSL/TLS support using MCPServerConfig.

    Examples:
        Create an SSL-capable FastMCP server:

        >>> from mcpgateway.plugins.framework.models import MCPServerConfig
        >>> config = MCPServerConfig(host="127.0.0.1", port=8000)
        >>> server = SSLCapableFastMCP(server_config=config, name="TestServer")
        >>> server.settings.host
        '127.0.0.1'
        >>> server.settings.port
        8000
    """

    # Offset added to the main port to obtain the plain-HTTP health check port
    # that is opened when the main server is serving TLS.
    _HEALTH_PORT_OFFSET = 1000

    def __init__(self, server_config: MCPServerConfig, *args, **kwargs):
        """Initialize an SSL capable Fast MCP server.

        Args:
            server_config: the MCP server configuration including mTLS information.
            *args: Additional positional arguments passed to FastMCP.
            **kwargs: Additional keyword arguments passed to FastMCP.

        Examples:
            >>> from mcpgateway.plugins.framework.models import MCPServerConfig
            >>> config = MCPServerConfig(host="0.0.0.0", port=9000)
            >>> server = SSLCapableFastMCP(server_config=config, name="PluginServer")
            >>> server.server_config.host
            '0.0.0.0'
            >>> server.server_config.port
            9000
        """
        self.server_config = server_config

        # Fill in FastMCP's host/port from our config unless the caller
        # passed explicit values.
        if "host" not in kwargs:
            kwargs["host"] = self.server_config.host
        if "port" not in kwargs:
            kwargs["port"] = self.server_config.port

        # When a unix domain socket is configured and the caller did not supply
        # transport security settings, restrict hosts/origins to loopback names.
        if self.server_config.uds and kwargs.get("transport_security") is None:
            kwargs["transport_security"] = TransportSecuritySettings(
                enable_dns_rebinding_protection=True,
                allowed_hosts=[
                    "127.0.0.1",
                    "localhost",
                    "[::1]",
                    "127.0.0.1:*",
                    "localhost:*",
                    "[::1]:*",
                ],
                allowed_origins=[
                    "http://127.0.0.1",
                    "http://localhost",
                    "http://[::1]",
                    "http://127.0.0.1:*",
                    "http://localhost:*",
                    "http://[::1]:*",
                ],
            )

        super().__init__(*args, **kwargs)

    def _get_ssl_config(self) -> dict:
        """Build SSL configuration for uvicorn from MCPServerConfig.

        Returns:
            Dictionary of SSL configuration parameters for uvicorn. Empty when
            no TLS section is configured or when it lacks a keyfile/certfile.

        Examples:
            >>> from mcpgateway.plugins.framework.models import MCPServerConfig
            >>> config = MCPServerConfig(host="127.0.0.1", port=8000, tls=None)
            >>> server = SSLCapableFastMCP(server_config=config, name="TestServer")
            >>> ssl_config = server._get_ssl_config()
            >>> ssl_config
            {}
        """
        tls = self.server_config.tls
        if not tls:
            logger.info("SSL/TLS not enabled")
            return {}

        if not (tls.keyfile and tls.certfile):
            logger.warning("TLS config present but keyfile/certfile not configured")
            return {}

        ssl_config = {
            "ssl_keyfile": tls.keyfile,
            "ssl_certfile": tls.certfile,
        }
        if tls.ca_bundle:
            ssl_config["ssl_ca_certs"] = tls.ca_bundle

        # NOTE(review): the value is kept as a string for backward compatibility
        # with existing callers; uvicorn documents ssl_cert_reqs as an int, so
        # confirm the string form is accepted downstream.
        ssl_config["ssl_cert_reqs"] = str(tls.ssl_cert_reqs)

        if tls.keyfile_password:
            ssl_config["ssl_keyfile_password"] = tls.keyfile_password

        # The stored value is a string, so compare against "2" (REQUIRED);
        # comparing it to the integer 2 would always be False.
        client_cert_required = ssl_config["ssl_cert_reqs"] == "2"

        logger.info("SSL/TLS enabled (mTLS)")
        logger.info(f" Key: {ssl_config['ssl_keyfile']}")
        logger.info(f" Cert: {ssl_config['ssl_certfile']}")
        if "ssl_ca_certs" in ssl_config:
            logger.info(f" CA: {ssl_config['ssl_ca_certs']}")
        logger.info(f" Client cert required: {client_cert_required}")

        return ssl_config

    @staticmethod
    def _build_monitoring_routes() -> list:
        """Create the ``/health`` and ``/metrics/prometheus`` routes.

        Shared by the main server and the HTTP-only health check server so both
        expose identical endpoints. ``ENABLE_METRICS`` is read each time this is
        called; any value other than ``"true"`` (case-insensitive) serves a 503
        on the metrics path.

        Returns:
            List with the health route followed by the metrics route.
        """
        # Third-Party
        from starlette.requests import Request  # pylint: disable=import-outside-toplevel
        from starlette.routing import Route  # pylint: disable=import-outside-toplevel

        # First-Party
        from mcpgateway.utils.orjson_response import ORJSONResponse  # pylint: disable=import-outside-toplevel

        async def health_check(_request: Request):
            """Health check endpoint for container orchestration.

            Returns:
                JSON response with health status.
            """
            return ORJSONResponse({"status": "healthy"})

        async def metrics_endpoint(_request: Request):
            """Prometheus metrics endpoint.

            Returns:
                Text response with the metrics from the default registry.
            """
            metrics_data = generate_latest(REGISTRY)
            return Response(content=metrics_data, media_type="text/plain; version=0.0.4")

        # Route endpoints are invoked with the request, so this handler must
        # accept it even though it is unused.
        async def metrics_disabled(_request: Request):
            """Returns metrics response when metrics collection is disabled.

            Returns:
                Response: HTTP 503 response indicating metrics are disabled.
            """
            return Response(content='{"error": "Metrics collection is disabled"}', media_type="application/json", status_code=status.HTTP_503_SERVICE_UNAVAILABLE)

        enable_metrics = os.getenv("ENABLE_METRICS", "true").lower() == "true"
        metrics_handler = metrics_endpoint if enable_metrics else metrics_disabled

        return [
            Route("/health", health_check, methods=["GET"]),
            Route("/metrics/prometheus", metrics_handler, methods=["GET"]),
        ]

    async def _start_health_check_server(self, health_port: int) -> None:
        """Start a simple HTTP-only health check server on a separate port.

        This allows health checks to work even when the main server uses HTTPS/mTLS.
        The same server also exposes the Prometheus metrics route.

        Args:
            health_port: Port number for the health check server.

        Examples:
            Health check endpoint returns expected JSON response:

            >>> import asyncio
            >>> from starlette.responses import JSONResponse
            >>> from starlette.requests import Request
            >>> async def health_check(_request: Request):
            ...     return JSONResponse({"status": "healthy"})
            >>> response = asyncio.run(health_check(None))
            >>> response.status_code
            200
        """
        # Third-Party
        from starlette.applications import Starlette  # pylint: disable=import-outside-toplevel

        # Minimal Starlette app carrying only the health and metrics endpoints.
        health_app = Starlette(routes=self._build_monitoring_routes())

        logger.info(f"Starting HTTP health check server on {self.settings.host}:{health_port}")
        config = uvicorn.Config(
            app=health_app,
            host=self.settings.host,
            port=health_port,
            log_level="warning",  # Reduce noise from health checks
        )
        server = uvicorn.Server(config)
        await server.serve()

    async def run_streamable_http_async(self) -> None:
        """Run the server using StreamableHTTP transport with optional SSL/TLS.

        Health and metrics routes are added to the main application. When TLS is
        active and no unix domain socket is configured, a second plain-HTTP
        server exposing the same routes is started on ``port + 1000``.

        Examples:
            Server uses configured host and port:

            >>> from mcpgateway.plugins.framework.models import MCPServerConfig
            >>> config = MCPServerConfig(host="0.0.0.0", port=9000)
            >>> server = SSLCapableFastMCP(server_config=config, name="HTTPServer")
            >>> server.settings.host
            '0.0.0.0'
            >>> server.settings.port
            9000
        """
        starlette_app = self.streamable_http_app()

        # Add the health and metrics routes to the main app.
        starlette_app.routes.extend(self._build_monitoring_routes())

        # Build uvicorn config with optional SSL
        ssl_config = self._get_ssl_config()
        config_kwargs: Dict[str, Any] = {
            "app": starlette_app,
            "log_level": self.settings.log_level.lower(),
        }
        config_kwargs.update(ssl_config)

        uds = self.server_config.uds
        if uds:
            # A unix socket replaces host/port binding entirely.
            config_kwargs["uds"] = uds
            logger.info(f"Starting plugin server on unix socket {uds}")
        else:
            config_kwargs["host"] = self.settings.host
            config_kwargs["port"] = self.settings.port
            logger.info(f"Starting plugin server on {self.settings.host}:{self.settings.port}")

        config = uvicorn.Config(**config_kwargs)  # type: ignore[arg-type]
        server = uvicorn.Server(config)

        # If SSL is enabled, start a separate HTTP health check server
        if ssl_config and not uds:
            health_port = self.settings.port + self._HEALTH_PORT_OFFSET
            logger.info(f"SSL enabled - starting separate HTTP health check on port {health_port}")
            # Run both servers concurrently
            await asyncio.gather(server.serve(), self._start_health_check_server(health_port))
        else:
            # Just run the main server (health check is already on it)
            await server.serve()

439 

440 

def _register_plugin_tools(mcp: FastMCP) -> None:
    """Register the module-level tool functions on a FastMCP instance."""
    tool_table = (
        (GET_PLUGIN_CONFIGS, get_plugin_configs),
        (GET_PLUGIN_CONFIG, get_plugin_config),
        (INVOKE_HOOK, invoke_hook),
    )
    for tool_name, handler in tool_table:
        mcp.tool(name=tool_name)(handler)


def _resolve_transport() -> str:
    """Pick the transport from PLUGINS_TRANSPORT, or auto-detect it from stdin."""
    requested = os.environ.get("PLUGINS_TRANSPORT", None)
    if requested is not None:
        return requested.lower()

    # Auto-detect: a piped (non-TTY) stdin means stdio mode.
    if sys.stdin.isatty():
        return "http"
    logger.info("Auto-detected stdio transport (stdin is not a TTY)")
    return "stdio"


async def run() -> None:
    """Run the external plugin server with FastMCP.

    Supports both stdio and HTTP transports. Auto-detects transport based on stdin
    (if stdin is not a TTY, uses stdio mode), or you can explicitly set PLUGINS_TRANSPORT.
    Any transport value other than 'stdio' selects the HTTP server.

    Reads configuration from PLUGINS_SERVER_* environment variables:
        - PLUGINS_TRANSPORT: Transport type - 'stdio' or 'http' (default: auto-detect)
        - PLUGINS_SERVER_HOST: Server host (default: 0.0.0.0) - HTTP mode only
        - PLUGINS_SERVER_PORT: Server port (default: 8000) - HTTP mode only
        - PLUGINS_SERVER_UDS: Unix domain socket path - HTTP mode only (no TLS)
        - PLUGINS_SERVER_SSL_ENABLED: Enable SSL/TLS (true/false) - HTTP mode only
        - PLUGINS_SERVER_SSL_KEYFILE: Path to server private key - HTTP mode only
        - PLUGINS_SERVER_SSL_CERTFILE: Path to server certificate - HTTP mode only
        - PLUGINS_SERVER_SSL_CA_CERTS: Path to CA bundle for client verification - HTTP mode only
        - PLUGINS_SERVER_SSL_CERT_REQS: Client cert requirement (0=NONE, 1=OPTIONAL, 2=REQUIRED) - HTTP mode only

    Raises:
        Exception: If plugin server initialization or execution fails.

    Examples:
        SERVER module variable starts as None:

        >>> SERVER is None
        True

        FastMCP server names are defined as constants:

        >>> from mcpgateway.plugins.framework.constants import MCP_SERVER_NAME
        >>> isinstance(MCP_SERVER_NAME, str)
        True
        >>> len(MCP_SERVER_NAME) > 0
        True
    """
    global SERVER  # pylint: disable=global-statement

    # Bring up the plugin server first; nothing else can work without it.
    SERVER = ExternalPluginServer()
    initialized = await SERVER.initialize()
    if not initialized:
        logger.error("Failed to initialize plugin server")
        return

    transport = _resolve_transport()

    try:
        if transport == "stdio":
            # Plain FastMCP is enough here: stdio needs no SSL support.
            mcp = FastMCP(
                name=MCP_SERVER_NAME,
                instructions=MCP_SERVER_INSTRUCTIONS,
            )
            _register_plugin_tools(mcp)

            # Publish the plugin_info gauge on startup.
            PLUGIN_INFO.labels(server_name=MCP_SERVER_NAME, transport="stdio", ssl_enabled="false").set(1)

            logger.info("Starting MCP plugin server with FastMCP (stdio transport)")
            await mcp.run_stdio_async()
        else:  # http or streamablehttp
            server_config: MCPServerConfig = SERVER.get_server_config()
            mcp = SSLCapableFastMCP(
                server_config,
                name=MCP_SERVER_NAME,
                instructions=MCP_SERVER_INSTRUCTIONS,
            )
            _register_plugin_tools(mcp)

            # Publish the plugin_info gauge on startup.
            ssl_enabled: Literal["true", "false"]
            if server_config and server_config.tls is not None:
                ssl_enabled = "true"
            else:
                ssl_enabled = "false"
            PLUGIN_INFO.labels(server_name=MCP_SERVER_NAME, transport="http", ssl_enabled=ssl_enabled).set(1)

            if server_config:
                logger.info(f"Prometheus metrics available at http://{server_config.host}:{server_config.port}/metrics/prometheus")

            logger.info("Starting MCP plugin server with FastMCP (HTTP transport)")
            await mcp.run_streamable_http_async()
    except Exception:
        logger.exception("Caught error while executing plugin server")
        raise
    finally:
        # Always release plugin server resources, on success or failure.
        await SERVER.shutdown()

543 

544 

# Script entry point: when executed directly, drive the async run() coroutine
# to completion on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(run())