Coverage for mcpgateway / plugins / framework / registry.py: 99%
66 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/plugins/framework/registry.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Plugin instance registry.
8Module that stores plugin instances and manages hook points.
9"""
11# Standard
12from collections import defaultdict
13import logging
14from typing import Optional
16# First-Party
17from mcpgateway.plugins.framework.base import HookRef, Plugin, PluginRef
18from mcpgateway.plugins.framework.external.mcp.client import ExternalHookRef
20# Use standard logging to avoid circular imports (plugins -> services -> plugins)
21logger = logging.getLogger(__name__)
24class PluginInstanceRegistry:
25 """Registry for managing loaded plugins.
27 Examples:
28 >>> from mcpgateway.plugins.framework import Plugin, PluginConfig
29 >>> from mcpgateway.plugins.framework.hooks.prompts import PromptHookType
30 >>> registry = PluginInstanceRegistry()
31 >>> config = PluginConfig(
32 ... name="test",
33 ... description="Test",
34 ... author="test",
35 ... kind="test.Plugin",
36 ... version="1.0",
37 ... hooks=[PromptHookType.PROMPT_PRE_FETCH],
38 ... tags=[]
39 ... )
40 >>> async def prompt_pre_fetch(payload, context): ...
41 >>> plugin = Plugin(config)
42 >>> plugin.prompt_pre_fetch = prompt_pre_fetch
43 >>> registry.register(plugin)
44 >>> registry.get_plugin("test").name
45 'test'
46 >>> len(registry.get_hook_refs_for_hook(PromptHookType.PROMPT_PRE_FETCH))
47 1
48 >>> registry.unregister("test")
49 >>> registry.get_plugin("test") is None
50 True
51 """
53 def __init__(self) -> None:
54 """Initialize a plugin instance registry.
56 Examples:
57 >>> registry = PluginInstanceRegistry()
58 >>> isinstance(registry._plugins, dict)
59 True
60 >>> isinstance(registry._hooks, dict)
61 True
62 >>> len(registry._plugins)
63 0
64 """
65 self._plugins: dict[str, PluginRef] = {}
66 self._hooks: dict[str, list[HookRef]] = defaultdict(list)
67 self._hooks_by_name: dict[str, dict[str, HookRef]] = {}
68 self._priority_cache: dict[str, list[HookRef]] = {}
70 def register(self, plugin: Plugin) -> None:
71 """Register a plugin instance.
73 Args:
74 plugin: plugin to be registered.
76 Raises:
77 ValueError: if plugin is already registered.
78 """
79 if plugin.name in self._plugins:
80 raise ValueError(f"Plugin {plugin.name} already registered")
82 plugin_ref = PluginRef(plugin)
84 self._plugins[plugin.name] = plugin_ref
86 plugin_hooks = {}
88 # Check if this is an external plugin by looking for the invoke_hook method
89 # External plugins (MCP, gRPC, Unix socket) use invoke_hook instead of direct hook methods
90 external = hasattr(plugin, "invoke_hook") and callable(getattr(plugin, "invoke_hook"))
92 # Register hooks
93 for hook_type in plugin.hooks:
94 hook_ref: HookRef
95 if external:
96 hook_ref = ExternalHookRef(hook_type, plugin_ref)
97 else:
98 hook_ref = HookRef(hook_type, plugin_ref)
99 self._hooks[hook_type].append(hook_ref)
100 plugin_hooks[hook_type] = hook_ref
101 # Invalidate priority cache for this hook
102 self._priority_cache.pop(hook_type, None)
103 self._hooks_by_name[plugin.name] = plugin_hooks
105 logger.info(f"Registered plugin: {plugin.name} with hooks: {list(plugin.hooks)}")
107 def unregister(self, plugin_name: str) -> None:
108 """Unregister a plugin given its name.
110 Args:
111 plugin_name: The name of the plugin to unregister.
113 Returns:
114 None
115 """
116 if plugin_name not in self._plugins:
117 return
119 plugin = self._plugins.pop(plugin_name)
120 # Remove from hooks
121 for hook_type in plugin.hooks:
122 self._hooks[hook_type] = [p for p in self._hooks[hook_type] if p.plugin_ref.name != plugin_name]
123 self._priority_cache.pop(hook_type, None)
125 # Remove from hooks by name
126 self._hooks_by_name.pop(plugin_name, None)
128 logger.info(f"Unregistered plugin: {plugin_name}")
130 def get_plugin(self, name: str) -> Optional[PluginRef]:
131 """Get a plugin by name.
133 Args:
134 name: the name of the plugin to return.
136 Returns:
137 A plugin.
138 """
139 return self._plugins.get(name)
141 def get_plugin_hook_by_name(self, name: str, hook_type: str) -> Optional[HookRef]:
142 """Gets a hook reference for a particular plugin and hook type.
144 Args:
145 name: plugin name.
146 hook_type: the hook type.
148 Returns:
149 A hook reference for the plugin or None if not found.
150 """
151 if name in self._hooks_by_name: 151 ↛ 155line 151 didn't jump to line 155 because the condition on line 151 was always true
152 hooks = self._hooks_by_name[name]
153 if hook_type in hooks:
154 return hooks[hook_type]
155 return None
157 def get_hook_refs_for_hook(self, hook_type: str) -> list[HookRef]:
158 """Get all plugins for a specific hook, sorted by priority.
160 Args:
161 hook_type: the hook type.
163 Returns:
164 A list of plugin instances.
165 """
166 if hook_type not in self._priority_cache:
167 hook_refs = sorted(self._hooks[hook_type], key=lambda p: p.plugin_ref.priority)
168 self._priority_cache[hook_type] = hook_refs
169 return self._priority_cache[hook_type]
171 def get_all_plugins(self) -> list[PluginRef]:
172 """Get all registered plugin instances.
174 Returns:
175 A list of registered plugin instances.
176 """
177 return list(self._plugins.values())
179 def has_hooks_for(self, hook_type: str) -> bool:
180 """Check if there are any hooks registered for a specific hook type.
182 Args:
183 hook_type: The type of hook to check for.
185 Returns:
186 bool: True if there are hooks registered for the specified type, False otherwise.
187 """
188 return bool(self._hooks.get(hook_type))
190 @property
191 def plugin_count(self) -> int:
192 """Return the number of plugins registered.
194 Returns:
195 The number of plugins registered.
196 """
197 return len(self._plugins)
199 async def shutdown(self) -> None:
200 """Shutdown all plugins."""
201 # Must cleanup the plugins in reverse of creating them to handle asyncio cleanup issues.
202 # https://github.com/microsoft/semantic-kernel/issues/12627
203 for plugin_ref in reversed(self._plugins.values()):
204 try:
205 await plugin_ref.plugin.shutdown()
206 except Exception as e:
207 logger.error(f"Error shutting down plugin {plugin_ref.plugin.name}: {e}")
208 self._plugins.clear()
209 self._hooks.clear()
210 self._priority_cache.clear()