Coverage for mcpgateway / plugins / framework / external / mcp / server / server.py: 97%
55 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/plugins/framework/external/mcp/server/server.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Fred Araujo, Teryl Taylor
7Module that contains plugin MCP server code to serve external plugins.
9Examples:
10 Create an external plugin server with a configuration file:
12 >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml")
13 >>> server is not None
14 True
15 >>> isinstance(server._config_path, str)
16 True
18 Get server configuration with defaults:
20 >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml")
21 >>> config = server.get_server_config()
22 >>> config.host == '127.0.0.1'
23 True
24 >>> config.port == 8000
25 True
27 Verify plugin manager is initialized:
29 >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml")
30 >>> server._plugin_manager is not None
31 True
32 >>> server._config is not None
33 True
35 Multiple servers can be created:
37 >>> server1 = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml")
38 >>> server2 = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_multiple_plugins_filter.yaml")
39 >>> server1._config_path != server2._config_path
40 True
42 Configuration is loaded from file:
44 >>> import asyncio
45 >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml")
46 >>> plugins = asyncio.run(server.get_plugin_configs())
47 >>> isinstance(plugins, list)
48 True
49 >>> len(plugins) >= 1
50 True
52 Server configuration defaults are sensible:
54 >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml")
55 >>> config = server.get_server_config()
56 >>> isinstance(config.host, str)
57 True
58 >>> isinstance(config.port, int)
59 True
60 >>> config.port > 0
61 True
62"""
64# Standard
65import logging
66import os
67from typing import Any, Dict, TypeVar
69# Third-Party
70from pydantic import BaseModel
72# First-Party
73from mcpgateway.plugins.framework.constants import CONTEXT, ERROR, PLUGIN_NAME, RESULT
74from mcpgateway.plugins.framework.errors import convert_exception_to_error, PluginError
75from mcpgateway.plugins.framework.loader.config import ConfigLoader
76from mcpgateway.plugins.framework.manager import PluginManager
77from mcpgateway.plugins.framework.models import GRPCServerConfig, MCPServerConfig, PluginContext
# Generic type variable restricted to pydantic ``BaseModel`` subclasses.
P = TypeVar("P", bound=BaseModel)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class ExternalPluginServer:
    """Server-side facade that exposes plugin configuration and hook invocation for external plugins."""

    def __init__(self, config_path: str | None = None) -> None:
        """Build the server, loading the plugin configuration and creating the plugin manager.

        Args:
            config_path: Path of the plugin configuration file. When given (and non-empty),
                it takes precedence over the PLUGINS_CONFIG_PATH environment variable; when
                neither is set, ``./resources/plugins/config.yaml`` is used.

        Examples:
            >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml")
            >>> server is not None
            True
        """
        resolved_path = config_path
        if not resolved_path:
            # No explicit path: consult the environment, then fall back to the bundled default location.
            default_path = os.path.join(".", "resources", "plugins", "config.yaml")
            resolved_path = os.environ.get("PLUGINS_CONFIG_PATH", default_path)

        self._config_path = resolved_path
        self._config = ConfigLoader.load_config(resolved_path, use_jinja=False)
        self._plugin_manager = PluginManager(resolved_path)

    async def get_plugin_configs(self) -> list[dict]:
        """Dump every configured plugin to a dictionary.

        Returns:
            One dictionary per configured plugin; an empty list when none are configured.

        Examples:
            >>> import asyncio
            >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml")
            >>> plugins = asyncio.run(server.get_plugin_configs())
            >>> len(plugins) > 0
            True

            Returns empty list when no plugins configured:

            >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml")
            >>> server._config.plugins = None
            >>> plugins = asyncio.run(server.get_plugin_configs())
            >>> plugins
            []

            Each plugin config is a dictionary:

            >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml")
            >>> plugins = asyncio.run(server.get_plugin_configs())
            >>> all(isinstance(p, dict) for p in plugins)
            True
        """
        # ``plugins`` may be None or empty; both cases yield an empty result.
        return [entry.model_dump() for entry in self._config.plugins or []]

    async def get_plugin_config(self, name: str) -> dict | None:
        """Look up a single plugin configuration by name, ignoring case.

        Args:
            name: Name of the plugin whose configuration is requested.

        Returns:
            The dumped configuration of the first plugin whose name matches, or None if there is no match.

        Examples:
            >>> import asyncio
            >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml")
            >>> c = asyncio.run(server.get_plugin_config(name = "ReplaceBadWordsPlugin"))
            >>> c is not None
            True
            >>> c["name"] == "ReplaceBadWordsPlugin"
            True

            Returns None when plugin not found:

            >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml")
            >>> c = asyncio.run(server.get_plugin_config(name="NonExistentPlugin"))
            >>> c is None
            True

            Case-insensitive plugin name lookup:

            >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml")
            >>> c1 = asyncio.run(server.get_plugin_config(name="ReplaceBadWordsPlugin"))
            >>> c2 = asyncio.run(server.get_plugin_config(name="replacebadwordsplugin"))
            >>> c1 == c2
            True
        """
        configured = self._config.plugins
        if not configured:
            return None

        wanted = name.lower()
        for candidate in configured:
            if candidate.name.lower() == wanted:
                return candidate.model_dump()
        return None

    async def invoke_hook(self, hook_type: str, plugin_name: str, payload: Dict[str, Any], context: Dict[str, Any] | PluginContext) -> dict:
        """Run one hook of one plugin and package the outcome in a dictionary.

        Failures are not propagated: a PluginError contributes its ``error`` attribute, and any
        other Exception is logged and converted, with the value stored under the ERROR key.

        Args:
            hook_type: The type of hook function to be invoked.
            plugin_name: The name of the plugin to execute.
            payload: The hook payload, passed to the plugin manager as a dictionary.
            context: The contextual and state information required for the execution of the hook.
                Can be a dict (for MCP transport) or PluginContext (for gRPC/Unix socket).

        Returns:
            A dictionary that always holds the plugin name and, depending on the outcome, the
            dumped hook result, the (non-empty) context, or an error.

        Examples:
            >>> import asyncio
            >>> import os
            >>> os.environ["PYTHONPATH"] = "."
            >>> from mcpgateway.plugins.framework import GlobalContext, Plugin, PromptHookType, PromptPrehookPayload, PluginContext, PromptPrehookResult, PluginManager
            >>> PluginManager.reset()
            >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml")
            >>> payload = PromptPrehookPayload(prompt_id="123", name="test_prompt", args={"user": "This is a crap app"})
            >>> context = PluginContext(global_context=GlobalContext(request_id="1", server_id="2"))
            >>> initialized = asyncio.run(server.initialize())
            >>> initialized
            True
            >>> result = asyncio.run(server.invoke_hook(PromptHookType.PROMPT_PRE_FETCH, "ReplaceBadWordsPlugin", payload.model_dump(), context.model_dump()))
            >>> result is not None
            True
            >>> result["result"]["continue_processing"]
            True
            >>> "yikes" in result["result"]["modified_payload"]["args"]["user"]
            True
        """
        response: dict[str, Any] = {PLUGIN_NAME: plugin_name}
        try:
            # Remember whether the caller handed us a model, so the context is echoed back in the same form.
            if isinstance(context, PluginContext):
                received_model = True
                plugin_context = context
            else:
                received_model = False
                plugin_context = PluginContext.model_validate(context)

            hook_result = await self._plugin_manager.invoke_hook_for_plugin(plugin_name, hook_type, payload, plugin_context, payload_as_json=True)

            response[RESULT] = hook_result.model_dump()
            if not plugin_context.is_empty():
                # A model input gets the model back unchanged (no extra serialization); a dict input gets a dict.
                response[CONTEXT] = plugin_context if received_model else plugin_context.model_dump()
        except PluginError as plugin_err:
            response[ERROR] = plugin_err.error
        except Exception as exc:
            logger.exception(exc)
            response[ERROR] = convert_exception_to_error(exc, plugin_name=plugin_name).model_dump()
        return response

    async def initialize(self) -> bool:
        """Initialize the underlying plugin manager.

        Returns:
            A boolean indicating the initialization status reported by the plugin manager.

        Examples:
            >>> import asyncio
            >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml")
            >>> result = asyncio.run(server.initialize())
            >>> result
            True
            >>> asyncio.run(server.shutdown())
        """
        manager = self._plugin_manager
        await manager.initialize()
        return manager.initialized

    async def shutdown(self) -> None:
        """Shut down the plugin manager, but only if it reports itself as initialized.

        Examples:
            >>> import asyncio
            >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml")
            >>> asyncio.run(server.initialize())
            True
            >>> asyncio.run(server.shutdown())
        """
        manager = self._plugin_manager
        if not manager.initialized:
            return
        await manager.shutdown()

    def get_server_config(self) -> MCPServerConfig:
        """Return the configuration for the plugin server.

        The first truthy candidate wins, in this order: the ``server_settings`` of the loaded
        configuration, ``MCPServerConfig.from_env()``, and finally a default ``MCPServerConfig()``.

        Returns:
            The selected server configuration.

        Examples:
            >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml")
            >>> config = server.get_server_config()
            >>> isinstance(config, MCPServerConfig)
            True
            >>> config.host
            '127.0.0.1'
            >>> config.port
            8000
        """
        from_file = self._config.server_settings
        if from_file:
            return from_file

        from_environment = MCPServerConfig.from_env()
        if from_environment:
            return from_environment

        return MCPServerConfig()

    def get_grpc_server_config(self) -> GRPCServerConfig | None:
        """Return the gRPC server configuration if defined.

        Returns:
            The ``grpc_server_settings`` value of the loaded configuration, or None if not defined.

        Examples:
            >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml")
            >>> config = server.get_grpc_server_config()
            >>> config is None or isinstance(config, GRPCServerConfig)
            True
        """
        grpc_settings = self._config.grpc_server_settings
        return grpc_settings