Coverage for mcpgateway / plugins / framework / registry.py: 99%

66 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/plugins/framework/registry.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Plugin instance registry. 

8Module that stores plugin instances and manages hook points. 

9""" 

10 

11# Standard 

12from collections import defaultdict 

13import logging 

14from typing import Optional 

15 

16# First-Party 

17from mcpgateway.plugins.framework.base import HookRef, Plugin, PluginRef 

18from mcpgateway.plugins.framework.external.mcp.client import ExternalHookRef 

19 

20# Use standard logging to avoid circular imports (plugins -> services -> plugins) 

21logger = logging.getLogger(__name__) 

22 

23 

24class PluginInstanceRegistry: 

25 """Registry for managing loaded plugins. 

26 

27 Examples: 

28 >>> from mcpgateway.plugins.framework import Plugin, PluginConfig 

29 >>> from mcpgateway.plugins.framework.hooks.prompts import PromptHookType 

30 >>> registry = PluginInstanceRegistry() 

31 >>> config = PluginConfig( 

32 ... name="test", 

33 ... description="Test", 

34 ... author="test", 

35 ... kind="test.Plugin", 

36 ... version="1.0", 

37 ... hooks=[PromptHookType.PROMPT_PRE_FETCH], 

38 ... tags=[] 

39 ... ) 

40 >>> async def prompt_pre_fetch(payload, context): ... 

41 >>> plugin = Plugin(config) 

42 >>> plugin.prompt_pre_fetch = prompt_pre_fetch 

43 >>> registry.register(plugin) 

44 >>> registry.get_plugin("test").name 

45 'test' 

46 >>> len(registry.get_hook_refs_for_hook(PromptHookType.PROMPT_PRE_FETCH)) 

47 1 

48 >>> registry.unregister("test") 

49 >>> registry.get_plugin("test") is None 

50 True 

51 """ 

52 

53 def __init__(self) -> None: 

54 """Initialize a plugin instance registry. 

55 

56 Examples: 

57 >>> registry = PluginInstanceRegistry() 

58 >>> isinstance(registry._plugins, dict) 

59 True 

60 >>> isinstance(registry._hooks, dict) 

61 True 

62 >>> len(registry._plugins) 

63 0 

64 """ 

65 self._plugins: dict[str, PluginRef] = {} 

66 self._hooks: dict[str, list[HookRef]] = defaultdict(list) 

67 self._hooks_by_name: dict[str, dict[str, HookRef]] = {} 

68 self._priority_cache: dict[str, list[HookRef]] = {} 

69 

70 def register(self, plugin: Plugin) -> None: 

71 """Register a plugin instance. 

72 

73 Args: 

74 plugin: plugin to be registered. 

75 

76 Raises: 

77 ValueError: if plugin is already registered. 

78 """ 

79 if plugin.name in self._plugins: 

80 raise ValueError(f"Plugin {plugin.name} already registered") 

81 

82 plugin_ref = PluginRef(plugin) 

83 

84 self._plugins[plugin.name] = plugin_ref 

85 

86 plugin_hooks = {} 

87 

88 # Check if this is an external plugin by looking for the invoke_hook method 

89 # External plugins (MCP, gRPC, Unix socket) use invoke_hook instead of direct hook methods 

90 external = hasattr(plugin, "invoke_hook") and callable(getattr(plugin, "invoke_hook")) 

91 

92 # Register hooks 

93 for hook_type in plugin.hooks: 

94 hook_ref: HookRef 

95 if external: 

96 hook_ref = ExternalHookRef(hook_type, plugin_ref) 

97 else: 

98 hook_ref = HookRef(hook_type, plugin_ref) 

99 self._hooks[hook_type].append(hook_ref) 

100 plugin_hooks[hook_type] = hook_ref 

101 # Invalidate priority cache for this hook 

102 self._priority_cache.pop(hook_type, None) 

103 self._hooks_by_name[plugin.name] = plugin_hooks 

104 

105 logger.info(f"Registered plugin: {plugin.name} with hooks: {list(plugin.hooks)}") 

106 

107 def unregister(self, plugin_name: str) -> None: 

108 """Unregister a plugin given its name. 

109 

110 Args: 

111 plugin_name: The name of the plugin to unregister. 

112 

113 Returns: 

114 None 

115 """ 

116 if plugin_name not in self._plugins: 

117 return 

118 

119 plugin = self._plugins.pop(plugin_name) 

120 # Remove from hooks 

121 for hook_type in plugin.hooks: 

122 self._hooks[hook_type] = [p for p in self._hooks[hook_type] if p.plugin_ref.name != plugin_name] 

123 self._priority_cache.pop(hook_type, None) 

124 

125 # Remove from hooks by name 

126 self._hooks_by_name.pop(plugin_name, None) 

127 

128 logger.info(f"Unregistered plugin: {plugin_name}") 

129 

130 def get_plugin(self, name: str) -> Optional[PluginRef]: 

131 """Get a plugin by name. 

132 

133 Args: 

134 name: the name of the plugin to return. 

135 

136 Returns: 

137 A plugin. 

138 """ 

139 return self._plugins.get(name) 

140 

141 def get_plugin_hook_by_name(self, name: str, hook_type: str) -> Optional[HookRef]: 

142 """Gets a hook reference for a particular plugin and hook type. 

143 

144 Args: 

145 name: plugin name. 

146 hook_type: the hook type. 

147 

148 Returns: 

149 A hook reference for the plugin or None if not found. 

150 """ 

151 if name in self._hooks_by_name: 151 ↛ 155line 151 didn't jump to line 155 because the condition on line 151 was always true

152 hooks = self._hooks_by_name[name] 

153 if hook_type in hooks: 

154 return hooks[hook_type] 

155 return None 

156 

157 def get_hook_refs_for_hook(self, hook_type: str) -> list[HookRef]: 

158 """Get all plugins for a specific hook, sorted by priority. 

159 

160 Args: 

161 hook_type: the hook type. 

162 

163 Returns: 

164 A list of plugin instances. 

165 """ 

166 if hook_type not in self._priority_cache: 

167 hook_refs = sorted(self._hooks[hook_type], key=lambda p: p.plugin_ref.priority) 

168 self._priority_cache[hook_type] = hook_refs 

169 return self._priority_cache[hook_type] 

170 

171 def get_all_plugins(self) -> list[PluginRef]: 

172 """Get all registered plugin instances. 

173 

174 Returns: 

175 A list of registered plugin instances. 

176 """ 

177 return list(self._plugins.values()) 

178 

179 def has_hooks_for(self, hook_type: str) -> bool: 

180 """Check if there are any hooks registered for a specific hook type. 

181 

182 Args: 

183 hook_type: The type of hook to check for. 

184 

185 Returns: 

186 bool: True if there are hooks registered for the specified type, False otherwise. 

187 """ 

188 return bool(self._hooks.get(hook_type)) 

189 

190 @property 

191 def plugin_count(self) -> int: 

192 """Return the number of plugins registered. 

193 

194 Returns: 

195 The number of plugins registered. 

196 """ 

197 return len(self._plugins) 

198 

199 async def shutdown(self) -> None: 

200 """Shutdown all plugins.""" 

201 # Must cleanup the plugins in reverse of creating them to handle asyncio cleanup issues. 

202 # https://github.com/microsoft/semantic-kernel/issues/12627 

203 for plugin_ref in reversed(self._plugins.values()): 

204 try: 

205 await plugin_ref.plugin.shutdown() 

206 except Exception as e: 

207 logger.error(f"Error shutting down plugin {plugin_ref.plugin.name}: {e}") 

208 self._plugins.clear() 

209 self._hooks.clear() 

210 self._priority_cache.clear()