Coverage for mcpgateway / plugins / framework / external / mcp / server / server.py: 97%

55 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/plugins/framework/external/mcp/server/server.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Fred Araujo, Teryl Taylor 

6 

7Module that contains plugin MCP server code to serve external plugins. 

8 

9Examples: 

10 Create an external plugin server with a configuration file: 

11 

12 >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml") 

13 >>> server is not None 

14 True 

15 >>> isinstance(server._config_path, str) 

16 True 

17 

18 Get server configuration with defaults: 

19 

20 >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml") 

21 >>> config = server.get_server_config() 

22 >>> config.host == '127.0.0.1' 

23 True 

24 >>> config.port == 8000 

25 True 

26 

27 Verify plugin manager is initialized: 

28 

29 >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml") 

30 >>> server._plugin_manager is not None 

31 True 

32 >>> server._config is not None 

33 True 

34 

35 Multiple servers can be created: 

36 

37 >>> server1 = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml") 

38 >>> server2 = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_multiple_plugins_filter.yaml") 

39 >>> server1._config_path != server2._config_path 

40 True 

41 

42 Configuration is loaded from file: 

43 

44 >>> import asyncio 

45 >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml") 

46 >>> plugins = asyncio.run(server.get_plugin_configs()) 

47 >>> isinstance(plugins, list) 

48 True 

49 >>> len(plugins) >= 1 

50 True 

51 

52 Server configuration defaults are sensible: 

53 

54 >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml") 

55 >>> config = server.get_server_config() 

56 >>> isinstance(config.host, str) 

57 True 

58 >>> isinstance(config.port, int) 

59 True 

60 >>> config.port > 0 

61 True 

62""" 

63 

64# Standard 

65import logging 

66import os 

67from typing import Any, Dict, TypeVar 

68 

69# Third-Party 

70from pydantic import BaseModel 

71 

72# First-Party 

73from mcpgateway.plugins.framework.constants import CONTEXT, ERROR, PLUGIN_NAME, RESULT 

74from mcpgateway.plugins.framework.errors import convert_exception_to_error, PluginError 

75from mcpgateway.plugins.framework.loader.config import ConfigLoader 

76from mcpgateway.plugins.framework.manager import PluginManager 

77from mcpgateway.plugins.framework.models import GRPCServerConfig, MCPServerConfig, PluginContext 

78 

79P = TypeVar("P", bound=BaseModel) 

80 

81logger = logging.getLogger(__name__) 

82 

83 

84class ExternalPluginServer: 

85 """External plugin server, providing methods for invoking plugin hooks.""" 

86 

87 def __init__(self, config_path: str | None = None) -> None: 

88 """Create an external plugin server. 

89 

90 Args: 

91 config_path: The configuration file path for loading plugins. 

92 If set, this attribute overrides the value in PLUGINS_CONFIG_PATH. 

93 

94 Examples: 

95 >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml") 

96 >>> server is not None 

97 True 

98 """ 

99 self._config_path = config_path or os.environ.get("PLUGINS_CONFIG_PATH", os.path.join(".", "resources", "plugins", "config.yaml")) 

100 self._config = ConfigLoader.load_config(self._config_path, use_jinja=False) 

101 self._plugin_manager = PluginManager(self._config_path) 

102 

103 async def get_plugin_configs(self) -> list[dict]: 

104 """Return a list of plugin configurations for plugins currently installed on the MCP server. 

105 

106 Returns: 

107 A list of plugin configurations. 

108 

109 Examples: 

110 >>> import asyncio 

111 >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml") 

112 >>> plugins = asyncio.run(server.get_plugin_configs()) 

113 >>> len(plugins) > 0 

114 True 

115 

116 Returns empty list when no plugins configured: 

117 

118 >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml") 

119 >>> server._config.plugins = None 

120 >>> plugins = asyncio.run(server.get_plugin_configs()) 

121 >>> plugins 

122 [] 

123 

124 Each plugin config is a dictionary: 

125 

126 >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml") 

127 >>> plugins = asyncio.run(server.get_plugin_configs()) 

128 >>> all(isinstance(p, dict) for p in plugins) 

129 True 

130 """ 

131 plugins: list[dict] = [] 

132 if self._config.plugins: 

133 for plug in self._config.plugins: 

134 plugins.append(plug.model_dump()) 

135 return plugins 

136 

137 async def get_plugin_config(self, name: str) -> dict | None: 

138 """Return a plugin configuration give a plugin name. 

139 

140 Args: 

141 name: The name of the plugin of which to return the plugin configuration. 

142 

143 Returns: 

144 A plugin configuration dict, or None if not found. 

145 

146 Examples: 

147 >>> import asyncio 

148 >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml") 

149 >>> c = asyncio.run(server.get_plugin_config(name = "ReplaceBadWordsPlugin")) 

150 >>> c is not None 

151 True 

152 >>> c["name"] == "ReplaceBadWordsPlugin" 

153 True 

154 

155 Returns None when plugin not found: 

156 

157 >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml") 

158 >>> c = asyncio.run(server.get_plugin_config(name="NonExistentPlugin")) 

159 >>> c is None 

160 True 

161 

162 Case-insensitive plugin name lookup: 

163 

164 >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml") 

165 >>> c1 = asyncio.run(server.get_plugin_config(name="ReplaceBadWordsPlugin")) 

166 >>> c2 = asyncio.run(server.get_plugin_config(name="replacebadwordsplugin")) 

167 >>> c1 == c2 

168 True 

169 """ 

170 if self._config.plugins: 

171 for plug in self._config.plugins: 

172 if plug.name.lower() == name.lower(): 

173 return plug.model_dump() 

174 return None 

175 

176 async def invoke_hook(self, hook_type: str, plugin_name: str, payload: Dict[str, Any], context: Dict[str, Any] | PluginContext) -> dict: 

177 """Invoke a plugin hook. 

178 

179 Args: 

180 hook_type: The type of hook function to be invoked. 

181 plugin_name: The name of the plugin to execute. 

182 payload: The prompt name and arguments to be analyzed. 

183 context: The contextual and state information required for the execution of the hook. 

184 Can be a dict (for MCP transport) or PluginContext (for gRPC/Unix socket). 

185 

186 Raises: 

187 ValueError: If unable to retrieve a plugin. 

188 

189 Returns: 

190 The transformed or filtered response from the plugin hook. 

191 

192 Examples: 

193 >>> import asyncio 

194 >>> import os 

195 >>> os.environ["PYTHONPATH"] = "." 

196 >>> from mcpgateway.plugins.framework import GlobalContext, Plugin, PromptHookType, PromptPrehookPayload, PluginContext, PromptPrehookResult, PluginManager 

197 >>> PluginManager.reset() 

198 >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml") 

199 >>> payload = PromptPrehookPayload(prompt_id="123", name="test_prompt", args={"user": "This is a crap app"}) 

200 >>> context = PluginContext(global_context=GlobalContext(request_id="1", server_id="2")) 

201 >>> initialized = asyncio.run(server.initialize()) 

202 >>> initialized 

203 True 

204 >>> result = asyncio.run(server.invoke_hook(PromptHookType.PROMPT_PRE_FETCH, "ReplaceBadWordsPlugin", payload.model_dump(), context.model_dump())) 

205 >>> result is not None 

206 True 

207 >>> result["result"]["continue_processing"] 

208 True 

209 >>> "yikes" in result["result"]["modified_payload"]["args"]["user"] 

210 True 

211 """ 

212 result_payload: dict[str, Any] = {PLUGIN_NAME: plugin_name} 

213 try: 

214 # Track if input was Pydantic (for optimized response path) 

215 context_is_pydantic = isinstance(context, PluginContext) 

216 _context = context if context_is_pydantic else PluginContext.model_validate(context) 

217 

218 result = await self._plugin_manager.invoke_hook_for_plugin(plugin_name, hook_type, payload, _context, payload_as_json=True) 

219 

220 result_payload[RESULT] = result.model_dump() 

221 if not _context.is_empty(): 221 ↛ 223line 221 didn't jump to line 223 because the condition on line 221 was never true

222 # Return Pydantic directly if input was Pydantic (avoids extra serialization) 

223 result_payload[CONTEXT] = _context if context_is_pydantic else _context.model_dump() 

224 return result_payload 

225 except PluginError as pe: 

226 result_payload[ERROR] = pe.error 

227 return result_payload 

228 except Exception as ex: 

229 logger.exception(ex) 

230 result_payload[ERROR] = convert_exception_to_error(ex, plugin_name=plugin_name).model_dump() 

231 return result_payload 

232 

233 async def initialize(self) -> bool: 

234 """Initialize the plugin server. 

235 

236 Returns: 

237 A boolean indicating the intialization status of the server. 

238 

239 Examples: 

240 >>> import asyncio 

241 >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml") 

242 >>> result = asyncio.run(server.initialize()) 

243 >>> result 

244 True 

245 >>> asyncio.run(server.shutdown()) 

246 """ 

247 await self._plugin_manager.initialize() 

248 return self._plugin_manager.initialized 

249 

250 async def shutdown(self) -> None: 

251 """Shutdown the plugin server. 

252 

253 Examples: 

254 >>> import asyncio 

255 >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml") 

256 >>> asyncio.run(server.initialize()) 

257 True 

258 >>> asyncio.run(server.shutdown()) 

259 """ 

260 if self._plugin_manager.initialized: 

261 await self._plugin_manager.shutdown() 

262 

263 def get_server_config(self) -> MCPServerConfig: 

264 """Return the configuration for the plugin server. 

265 

266 Returns: 

267 A server configuration including host, port, and TLS information. 

268 

269 Examples: 

270 >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml") 

271 >>> config = server.get_server_config() 

272 >>> isinstance(config, MCPServerConfig) 

273 True 

274 >>> config.host 

275 '127.0.0.1' 

276 >>> config.port 

277 8000 

278 """ 

279 return self._config.server_settings or MCPServerConfig.from_env() or MCPServerConfig() 

280 

281 def get_grpc_server_config(self) -> GRPCServerConfig | None: 

282 """Return the gRPC server configuration if defined. 

283 

284 Returns: 

285 The gRPC server configuration from the config file, or None if not defined. 

286 

287 Examples: 

288 >>> server = ExternalPluginServer(config_path="./tests/unit/mcpgateway/plugins/fixtures/configs/valid_single_plugin.yaml") 

289 >>> config = server.get_grpc_server_config() 

290 >>> config is None or isinstance(config, GRPCServerConfig) 

291 True 

292 """ 

293 return self._config.grpc_server_settings