Coverage for mcpgateway / plugins / framework / manager.py: 96%
218 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/plugins/framework/manager.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Teryl Taylor, Mihai Criveti, Fred Araujo
7Plugin manager.
8Module that manages and calls plugins at hookpoints throughout the gateway.
10This module provides the core plugin management functionality including:
11- Plugin lifecycle management (initialization, execution, shutdown)
12- Timeout protection for plugin execution
13- Context management with automatic cleanup
14- Priority-based plugin ordering
15- Conditional plugin execution based on prompts/servers/tenants
17Examples:
18 >>> # Initialize plugin manager with configuration
19 >>> manager = PluginManager("plugins/config.yaml")
20 >>> # await manager.initialize() # Called in async context
22 >>> # Create test payload and context
23 >>> from mcpgateway.plugins.framework.models import GlobalContext
24 >>> from mcpgateway.plugins.framework.hooks.prompts import PromptPrehookPayload
25 >>> payload = PromptPrehookPayload(prompt_id="123", name="test", args={"user": "input"})
26 >>> context = GlobalContext(request_id="123")
27 >>> # result, contexts = await manager.prompt_pre_fetch(payload, context) # Called in async context
28"""
30# Standard
31import asyncio
32import logging
33import threading
34from typing import Any, Optional, Union
36# First-Party
37from mcpgateway.plugins.framework.base import HookRef, Plugin
38from mcpgateway.plugins.framework.errors import convert_exception_to_error, PluginError, PluginViolationError
39from mcpgateway.plugins.framework.loader.config import ConfigLoader
40from mcpgateway.plugins.framework.loader.plugin import PluginLoader
41from mcpgateway.plugins.framework.memory import copyonwrite
42from mcpgateway.plugins.framework.models import Config, GlobalContext, PluginContext, PluginContextTable, PluginErrorModel, PluginMode, PluginPayload, PluginResult
43from mcpgateway.plugins.framework.registry import PluginInstanceRegistry
44from mcpgateway.plugins.framework.utils import payload_matches
# The standard-library logger is used here (rather than a project logging
# service) to avoid a circular import: plugins -> services -> plugins.
logger = logging.getLogger(__name__)

# Configuration constants

# Per-plugin execution timeout, in seconds.
DEFAULT_PLUGIN_TIMEOUT = 30

# Upper bound on the measured payload size (1MB).
MAX_PAYLOAD_SIZE = 1_000_000

# Context cleanup interval, in seconds (5 minutes).
CONTEXT_CLEANUP_INTERVAL = 5 * 60

# Maximum context age, in seconds (1 hour).
CONTEXT_MAX_AGE = 60 * 60
class PluginTimeoutError(Exception):
    """Error type for a plugin execution that exceeds the timeout limit."""
class PayloadSizeError(ValueError):
    """Error type for a payload that exceeds the maximum allowed size."""
class PluginExecutor:
    """Executes a list of plugins with timeout protection and error handling.

    This class manages the execution of plugins in priority order, handling:
    - Timeout protection for each plugin
    - Context management between plugins
    - Error isolation to prevent plugin failures from affecting the gateway
    - Metadata aggregation from multiple plugins

    Examples:
        >>> executor = PluginExecutor()
        >>> # In async context:
        >>> # result, contexts = await executor.execute(
        >>> #     hook_refs=[hook_ref1, hook_ref2],
        >>> #     payload=payload,
        >>> #     global_context=context,
        >>> #     hook_type="prompt_pre_fetch",
        >>> # )
    """

    def __init__(self, config: Optional[Config] = None, timeout: int = DEFAULT_PLUGIN_TIMEOUT):
        """Initialize the plugin executor.

        Args:
            config: the plugin manager configuration.
            timeout: Maximum execution time per plugin in seconds.
        """
        self.timeout = timeout
        self.config = config

    async def execute(
        self,
        hook_refs: list[HookRef],
        payload: PluginPayload,
        global_context: GlobalContext,
        hook_type: str,
        local_contexts: Optional[PluginContextTable] = None,
        violations_as_exceptions: bool = False,
    ) -> tuple[PluginResult, PluginContextTable | None]:
        """Execute plugins in the given order with timeout protection.

        Each plugin receives the most recent modified payload (or the original
        payload if no plugin has modified it yet) and a private copy of the
        global context. Execution stops early when a plugin in enforce mode
        returns ``continue_processing=False``.

        Args:
            hook_refs: List of hook references to execute, sorted by priority.
            payload: The payload to be processed by plugins.
            global_context: Shared context for all plugins containing request metadata.
            hook_type: The hook type identifier (e.g., "tool_pre_invoke").
            local_contexts: Optional existing contexts from previous hook executions.
            violations_as_exceptions: Raise violations as exceptions rather than as returns.

        Returns:
            A tuple containing:
            - PluginResult with processing status, modified payload, and metadata
            - PluginContextTable with the local contexts of the plugins that ran,
              or None when ``hook_refs`` is empty

        Raises:
            PayloadSizeError: If the payload exceeds MAX_PAYLOAD_SIZE.
            PluginError: If there is an error inside a plugin.
            PluginViolationError: If a violation occurs and violations_as_exceptions is set.

        Examples:
            >>> # Execute plugins with timeout protection
            >>> from mcpgateway.plugins.framework.hooks.prompts import PromptHookType
            >>> executor = PluginExecutor(timeout=30)
            >>> # Assuming you have a registry instance:
            >>> # hook_refs = registry.get_hook_refs_for_hook(hook_type=PromptHookType.PROMPT_PRE_FETCH)
            >>> # In async context:
            >>> # result, contexts = await executor.execute(
            >>> #     hook_refs=hook_refs,
            >>> #     payload=PromptPrehookPayload(prompt_id="123", name="test", args={}),
            >>> #     global_context=GlobalContext(request_id="123"),
            >>> #     hook_type=PromptHookType.PROMPT_PRE_FETCH,
            >>> # )
        """
        if not hook_refs:
            return (PluginResult(modified_payload=None), None)

        # Validate payload size
        self._validate_payload_size(payload)

        res_local_contexts = {}
        combined_metadata: dict[str, Any] = {}
        current_payload: PluginPayload | None = None

        for hook_ref in hook_refs:
            # Skip disabled plugins
            if hook_ref.plugin_ref.mode == PluginMode.DISABLED:
                continue

            # Check if plugin conditions match current context
            if hook_ref.plugin_ref.conditions and not payload_matches(payload, hook_type, hook_ref.plugin_ref.conditions, global_context):
                logger.debug("Skipping plugin %s - conditions not met", hook_ref.plugin_ref.name)
                continue

            # Hand each plugin its own view of the global context; the plugin's
            # state/metadata changes are merged back in execute_plugin().
            tmp_global_context = GlobalContext(
                request_id=global_context.request_id,
                user=global_context.user,
                tenant_id=global_context.tenant_id,
                server_id=global_context.server_id,
                state={} if not global_context.state else copyonwrite(global_context.state),
                metadata={} if not global_context.metadata else copyonwrite(global_context.metadata),
            )

            # Get or create local context for this plugin
            local_context_key = global_context.request_id + hook_ref.plugin_ref.uuid
            if local_contexts and local_context_key in local_contexts:
                local_context = local_contexts[local_context_key]
                local_context.global_context = tmp_global_context
            else:
                local_context = PluginContext(global_context=tmp_global_context)
            res_local_contexts[local_context_key] = local_context

            # Execute plugin with timeout protection. The explicit None test
            # (instead of `current_payload or payload`) keeps a modified
            # payload that happens to be falsy from being silently discarded.
            result = await self.execute_plugin(
                hook_ref,
                payload if current_payload is None else current_payload,
                local_context,
                violations_as_exceptions,
                global_context,
                combined_metadata,
            )

            # Track payload modifications
            if result.modified_payload is not None:
                current_payload = result.modified_payload

            # A blocking result from an enforce-mode plugin ends the chain.
            if not result.continue_processing and hook_ref.plugin_ref.plugin.mode == PluginMode.ENFORCE:
                return (result, res_local_contexts)

        return (
            PluginResult(continue_processing=True, modified_payload=current_payload, violation=None, metadata=combined_metadata),
            res_local_contexts,
        )

    async def execute_plugin(
        self,
        hook_ref: HookRef,
        payload: PluginPayload,
        local_context: PluginContext,
        violations_as_exceptions: bool,
        global_context: Optional[GlobalContext] = None,
        combined_metadata: Optional[dict[str, Any]] = None,
    ) -> PluginResult:
        """Execute a single plugin with timeout protection.

        Args:
            hook_ref: Hooking structure that contains the plugin and hook.
            payload: The payload to be processed by the plugin.
            local_context: local context.
            violations_as_exceptions: Raise violations as exceptions rather than as returns.
            global_context: Shared context for all plugins containing request metadata.
                When given, the plugin's global state and metadata are merged into it.
            combined_metadata: combination of the metadata of all plugins. When given,
                the plugin's result metadata is merged into it.

        Returns:
            The plugin's PluginResult. When an enforce-mode plugin blocks and
            violations are not raised as exceptions, a new blocking PluginResult
            carrying the input payload, the violation and ``combined_metadata``
            is returned instead. When the plugin fails or times out and the
            error is not fatal, ``PluginResult(continue_processing=True)`` is
            returned.

        Raises:
            PluginError: If the plugin fails or times out and either
                ``fail_on_plugin_error`` is configured or the plugin is in enforce mode.
            PluginViolationError: If an enforce-mode plugin blocks and
                violations_as_exceptions is set.
        """
        try:
            # Execute plugin with timeout protection
            result = await self._execute_with_timeout(hook_ref, payload, local_context)

            # Merge the plugin's view of the global context back into the shared one
            if local_context.global_context and global_context:
                global_context.state.update(local_context.global_context.state)
                global_context.metadata.update(local_context.global_context.metadata)

            # Aggregate metadata from all plugins
            if result.metadata and combined_metadata is not None:
                combined_metadata.update(result.metadata)

            # Set plugin name in violation if present
            if result.violation:
                result.violation.plugin_name = hook_ref.plugin_ref.plugin.name

            # Handle plugin blocking the request
            if not result.continue_processing:
                if hook_ref.plugin_ref.plugin.mode == PluginMode.ENFORCE:
                    logger.warning("Plugin %s blocked request in enforce mode", hook_ref.plugin_ref.plugin.name)
                    if violations_as_exceptions:
                        if result.violation:
                            plugin_name = result.violation.plugin_name
                            violation_reason = result.violation.reason
                            violation_desc = result.violation.description
                            violation_code = result.violation.code
                            raise PluginViolationError(
                                f"{hook_ref.name} blocked by plugin {plugin_name}: {violation_code} - {violation_reason} ({violation_desc})",
                                violation=result.violation,
                            )
                        raise PluginViolationError(f"{hook_ref.name} blocked by plugin")
                    return PluginResult(
                        continue_processing=False,
                        modified_payload=payload,
                        violation=result.violation,
                        metadata=combined_metadata,
                    )
                if hook_ref.plugin_ref.plugin.mode == PluginMode.PERMISSIVE:
                    logger.warning(
                        "Plugin %s would block (permissive mode): %s",
                        hook_ref.plugin_ref.plugin.name,
                        result.violation.description if result.violation else "No description",
                    )
            return result
        except asyncio.TimeoutError as exc:
            logger.error("Plugin %s timed out after %ds", hook_ref.plugin_ref.name, self.timeout)
            if self._is_error_fatal(hook_ref):
                raise PluginError(
                    error=PluginErrorModel(
                        message=f"Plugin {hook_ref.plugin_ref.name} exceeded {self.timeout}s timeout",
                        plugin_name=hook_ref.plugin_ref.name,
                    )
                ) from exc
        except PluginViolationError:
            # Violations raised above must reach the caller untouched.
            raise
        except PluginError as pe:
            logger.error("Plugin %s failed with error: %s", hook_ref.plugin_ref.name, str(pe))
            if self._is_error_fatal(hook_ref):
                raise
        except Exception as e:
            logger.error("Plugin %s failed with error: %s", hook_ref.plugin_ref.name, str(e))
            if self._is_error_fatal(hook_ref):
                raise PluginError(error=convert_exception_to_error(e, hook_ref.plugin_ref.name)) from e
        # The error was not fatal for this plugin: report that processing
        # should continue so the next plugin can run.
        return PluginResult(continue_processing=True)

    def _is_error_fatal(self, hook_ref: HookRef) -> bool:
        """Tell whether a plugin error or timeout must be raised to the caller.

        Args:
            hook_ref: Reference to the hook and plugin that failed.

        Returns:
            True when the configuration sets ``plugin_settings.fail_on_plugin_error``
            or the plugin runs in enforce mode.
        """
        if self.config and self.config.plugin_settings.fail_on_plugin_error:
            return True
        return hook_ref.plugin_ref.plugin.mode == PluginMode.ENFORCE

    def _start_plugin_span(self, hook_ref: HookRef) -> Optional[tuple[Any, Any, Any]]:
        """Open an observability span for a plugin execution, if a trace is active.

        Any failure while setting up observability is logged at debug level and
        treated as "no instrumentation"; it never prevents the plugin from running.

        Args:
            hook_ref: Reference to the hook and plugin about to be executed.

        Returns:
            A ``(db, service, span_id)`` tuple when a span was started, or None
            when there is no active trace or the setup failed. The caller owns
            the returned ``db`` session and must close it.
        """
        db = None
        try:
            # First-Party
            # pylint: disable=import-outside-toplevel
            from mcpgateway.db import SessionLocal
            from mcpgateway.services.observability_service import current_trace_id, ObservabilityService

            # pylint: enable=import-outside-toplevel

            trace_id = current_trace_id.get()
            if not trace_id:
                return None

            db = SessionLocal()
            service = ObservabilityService()
            span_id = service.start_span(
                db=db,
                trace_id=trace_id,
                name=f"plugin.execute.{hook_ref.plugin_ref.name}",
                kind="internal",
                resource_type="plugin",
                resource_name=hook_ref.plugin_ref.name,
                attributes={
                    "plugin.name": hook_ref.plugin_ref.name,
                    "plugin.uuid": hook_ref.plugin_ref.uuid,
                    "plugin.mode": hook_ref.plugin_ref.mode.value if hasattr(hook_ref.plugin_ref.mode, "value") else str(hook_ref.plugin_ref.mode),
                    "plugin.priority": hook_ref.plugin_ref.priority,
                    "plugin.timeout": self.timeout,
                },
            )
            return db, service, span_id
        except Exception as e:
            # If observability setup fails, continue without instrumentation
            logger.debug("Plugin observability setup failed: %s", e)
            if db is not None:
                try:
                    db.close()
                except Exception as close_exc:
                    logger.debug("Plugin observability session close failed: %s", close_exc)
            return None

    async def _execute_with_timeout(self, hook_ref: HookRef, payload: PluginPayload, context: PluginContext) -> PluginResult:
        """Execute a plugin with timeout protection.

        The plugin hook is awaited exactly once. Observability is best-effort:
        failures while starting or ending the span are logged and ignored, and
        exceptions raised by the plugin itself (including timeouts) propagate
        to the caller instead of being mistaken for an observability failure
        and triggering a second execution of the hook.

        Args:
            hook_ref: Reference to the hook and plugin to execute.
            payload: Payload to process.
            context: Plugin execution context.

        Returns:
            Result from plugin execution.

        Raises:
            asyncio.TimeoutError: If plugin exceeds timeout.
        """
        span = self._start_plugin_span(hook_ref)
        if span is None:
            # No active trace (or setup failed), execute without instrumentation
            return await asyncio.wait_for(hook_ref.hook(payload, context), timeout=self.timeout)

        db, service, span_id = span
        try:
            # Execute plugin
            result = await asyncio.wait_for(hook_ref.hook(payload, context), timeout=self.timeout)

            # End span with success; a failure here must not affect the result.
            # NOTE(review): when the plugin raises, the span is left open, as
            # before — consider ending it with an error status if
            # ObservabilityService supports one.
            try:
                service.end_span(
                    db=db,
                    span_id=span_id,
                    status="ok",
                    attributes={
                        "plugin.had_violation": result.violation is not None,
                        "plugin.modified_payload": result.modified_payload is not None,
                    },
                )
            except Exception as e:
                logger.debug("Plugin observability span end failed: %s", e)
            return result
        finally:
            # Observability service handles its own commits
            try:
                db.close()
            except Exception as e:
                logger.debug("Plugin observability session close failed: %s", e)

    def _validate_payload_size(self, payload: Any) -> None:
        """Validate that payload doesn't exceed size limits.

        The size is estimated from the length of the string form of the
        payload's ``args`` values, or, when there are no args, of its
        ``result``. Payloads with neither attribute are not checked.

        Args:
            payload: The payload to validate.

        Raises:
            PayloadSizeError: If payload exceeds MAX_PAYLOAD_SIZE.
        """
        # For PromptPrehookPayload, check args size
        if hasattr(payload, "args") and payload.args:
            total_size = sum(len(str(v)) for v in payload.args.values())
            if total_size > MAX_PAYLOAD_SIZE:
                raise PayloadSizeError(f"Payload size {total_size} exceeds limit of {MAX_PAYLOAD_SIZE} bytes")
        # For PromptPosthookPayload, check result size
        elif hasattr(payload, "result") and payload.result:
            # Estimate size of result messages
            total_size = len(str(payload.result))
            if total_size > MAX_PAYLOAD_SIZE:
                raise PayloadSizeError(f"Result size {total_size} exceeds limit of {MAX_PAYLOAD_SIZE} bytes")
class PluginManager:
    """Plugin manager for managing the plugin lifecycle.

    This class implements a thread-safe Borg singleton pattern to ensure consistent
    plugin management across the application. It handles:
    - Plugin discovery and loading from configuration
    - Plugin lifecycle management (initialization, execution, shutdown)
    - Hook execution orchestration

    Thread Safety:
        Uses double-checked locking to prevent race conditions when multiple threads
        create PluginManager instances simultaneously. The first instance to acquire
        the lock loads the configuration; subsequent instances reuse the shared state.

    Attributes:
        config: The loaded plugin configuration.
        plugin_count: Number of currently loaded plugins.
        initialized: Whether the manager has been initialized.

    Examples:
        >>> # Initialize plugin manager
        >>> manager = PluginManager("plugins/config.yaml")
        >>> # In async context:
        >>> # await manager.initialize()
        >>> # print(f"Loaded {manager.plugin_count} plugins")
        >>>
        >>> # Execute prompt hooks
        >>> from mcpgateway.plugins.framework.models import GlobalContext
        >>> from mcpgateway.plugins.framework.hooks.prompts import PromptPrehookPayload
        >>> payload = PromptPrehookPayload(prompt_id="123", name="test", args={})
        >>> context = GlobalContext(request_id="req-123")
        >>> # In async context:
        >>> # result, contexts = await manager.invoke_hook("prompt_pre_fetch", payload, context)
        >>>
        >>> # Shutdown when done
        >>> # await manager.shutdown()
    """

    # State shared by every instance (Borg pattern): each instance's __dict__
    # is rebound to this dictionary in __init__.
    __shared_state: dict[Any, Any] = {}
    __lock: threading.Lock = threading.Lock()  # Thread safety for synchronous init
    _async_lock: asyncio.Lock | None = None  # Async lock for initialize/shutdown
    _loader: PluginLoader = PluginLoader()
    _initialized: bool = False
    _registry: PluginInstanceRegistry = PluginInstanceRegistry()
    _config: Config | None = None
    _config_path: str | None = None
    _executor: PluginExecutor = PluginExecutor()

    def __init__(self, config: str = "", timeout: int = DEFAULT_PLUGIN_TIMEOUT):
        """Initialize plugin manager.

        PluginManager implements a thread-safe Borg singleton:
        - Shared state is initialized only once across all instances.
        - Subsequent instantiations reuse same state and skip config reload.
        - Uses double-checked locking to prevent race conditions in multi-threaded environments.

        Thread Safety:
            The initialization uses a double-checked locking pattern to ensure that
            config loading only happens once, even when multiple threads create
            PluginManager instances simultaneously.

        Args:
            config: Path to plugin configuration file (YAML).
            timeout: Maximum execution time per plugin in seconds.

        Examples:
            >>> # Initialize with configuration file
            >>> manager = PluginManager("plugins/config.yaml")

            >>> # Initialize with custom timeout
            >>> manager = PluginManager("plugins/config.yaml", timeout=60)
        """
        self.__dict__ = self.__shared_state

        # Only initialize once (first instance when shared state is empty)
        # Use lock to prevent race condition in multi-threaded environments
        if not self.__shared_state:
            with self.__lock:
                # Double-check after acquiring lock (another thread may have initialized)
                if not self.__shared_state:
                    if config:
                        self._config = ConfigLoader.load_config(config)
                        self._config_path = config

                    # Update executor configuration and timeout
                    self._executor.config = self._config
                    self._executor.timeout = timeout

    @classmethod
    def reset(cls) -> None:
        """Reset the Borg pattern shared state.

        This method clears all shared state, allowing a fresh PluginManager
        instance to be created with new configuration. Primarily used for testing.

        Thread-safe: Uses lock to ensure atomic reset operation.

        Examples:
            >>> # Between tests, reset shared state
            >>> PluginManager.reset()
            >>> manager = PluginManager("new_config.yaml")
        """
        with cls.__lock:
            cls.__shared_state.clear()
            cls._initialized = False
            cls._config = None
            cls._config_path = None
            cls._async_lock = None
            cls._registry = PluginInstanceRegistry()
            cls._executor = PluginExecutor()
            cls._loader = PluginLoader()

    @property
    def config(self) -> Config | None:
        """Plugin manager configuration.

        Returns:
            The plugin configuration object or None if not configured.
        """
        return self._config

    @property
    def plugin_count(self) -> int:
        """Number of plugins loaded.

        Returns:
            The number of currently loaded plugins.
        """
        return self._registry.plugin_count

    @property
    def initialized(self) -> bool:
        """Plugin manager initialization status.

        Returns:
            True if the plugin manager has been initialized.
        """
        return self._initialized

    def get_plugin(self, name: str) -> Optional[Plugin]:
        """Get a plugin by name.

        Args:
            name: the name of the plugin to return.

        Returns:
            The plugin registered under ``name``, or None if there is none.
        """
        plugin_ref = self._registry.get_plugin(name)
        return plugin_ref.plugin if plugin_ref else None

    def has_hooks_for(self, hook_type: str) -> bool:
        """Check if there are any hooks registered for a specific hook type.

        Args:
            hook_type: The type of hook to check for.

        Returns:
            True if there are hooks registered for the specified type, False otherwise.
        """
        return self._registry.has_hooks_for(hook_type)

    async def initialize(self) -> None:
        """Initialize the plugin manager and load all configured plugins.

        This method:
        1. Reads the plugin configurations from the loaded config
        2. Instantiates each plugin that is not disabled
        3. Registers plugins with the registry

        Calling it again while already initialized is a no-op.

        Thread Safety:
            Uses asyncio.Lock to prevent concurrent initialization from multiple
            coroutines or async tasks.

        Raises:
            RuntimeError: If loading, instantiating or registering any plugin
                fails; the underlying exception is chained as the cause.

        Examples:
            >>> manager = PluginManager("plugins/config.yaml")
            >>> # In async context:
            >>> # await manager.initialize()
            >>> # Manager is now ready to execute plugins
        """
        # Initialize async lock lazily (can't create asyncio.Lock in class definition)
        if self._async_lock is None:
            self._async_lock = asyncio.Lock()

        async with self._async_lock:
            # Double-check after acquiring lock
            if self._initialized:
                logger.debug("Plugin manager already initialized")
                return

            # Defensive cleanup: registry should be empty when not initialized
            if self._registry.plugin_count:
                logger.debug("Plugin registry not empty before initialize; clearing stale plugins")
                await self._registry.shutdown()

            plugins = self._config.plugins if self._config and self._config.plugins else []
            loaded_count = 0

            for plugin_config in plugins:
                try:
                    # Disabled plugins are neither instantiated nor registered
                    if plugin_config.mode != PluginMode.DISABLED:
                        # Fully instantiate enabled plugins
                        plugin = await self._loader.load_and_instantiate_plugin(plugin_config)
                        if plugin:
                            self._registry.register(plugin)
                            loaded_count += 1
                            logger.info("Loaded plugin: %s (mode: %s)", plugin_config.name, plugin_config.mode)
                        else:
                            raise ValueError(f"Unable to instantiate plugin: {plugin_config.name}")
                    else:
                        logger.info("Plugin: %s is disabled. Ignoring.", plugin_config.name)

                except Exception as e:
                    # Clean error message without stack trace spam
                    logger.error("Failed to load plugin %s: %s", plugin_config.name, str(e))
                    # Let it crash gracefully with a clean error
                    raise RuntimeError(f"Plugin initialization failed: {plugin_config.name} - {str(e)}") from e

            self._initialized = True
            logger.info("Plugin manager initialized with %s plugins", loaded_count)

    async def shutdown(self) -> None:
        """Shutdown all plugins and cleanup resources.

        This method:
        1. Shuts down the plugin registry (and with it the registered plugins)
        2. Resets initialization state

        Calling it while not initialized is a no-op.

        Thread Safety:
            Uses asyncio.Lock to prevent concurrent shutdown with initialization
            or with another shutdown call.

        Note: The config is preserved to allow modifying settings and re-initializing.
        To fully reset for a new config, call ``PluginManager.reset()`` first.

        Examples:
            >>> manager = PluginManager("plugins/config.yaml")
            >>> # In async context:
            >>> # await manager.initialize()
            >>> # ... use the manager ...
            >>> # await manager.shutdown()
        """
        # Initialize async lock lazily if needed
        if self._async_lock is None:
            self._async_lock = asyncio.Lock()

        async with self._async_lock:
            if not self._initialized:
                logger.debug("Plugin manager not initialized, nothing to shutdown")
                return

            logger.info("Shutting down plugin manager")

            # Shutdown all plugins
            await self._registry.shutdown()

            # Reset state to allow re-initialization
            self._initialized = False

            logger.info("Plugin manager shutdown complete")

    async def invoke_hook(
        self,
        hook_type: str,
        payload: PluginPayload,
        global_context: GlobalContext,
        local_contexts: Optional[PluginContextTable] = None,
        violations_as_exceptions: bool = False,
    ) -> tuple[PluginResult, PluginContextTable | None]:
        """Invoke a set of plugins configured for the hook point in priority order.

        Args:
            hook_type: The type of hook to execute.
            payload: The plugin payload for which the plugins will analyze and modify.
            global_context: Shared context for all plugins with request metadata.
            local_contexts: Optional existing contexts from previous hook executions.
            violations_as_exceptions: Raise violations as exceptions rather than as returns.

        Returns:
            A tuple containing:
            - PluginResult with processing status and modified payload
            - PluginContextTable with plugin contexts for state management

        Examples:
            >>> manager = PluginManager("plugins/config.yaml")
            >>> # In async context:
            >>> # await manager.initialize()
            >>> # payload = PromptPrehookPayload(prompt_id="123", name="test", args={})
            >>> # context = GlobalContext(request_id="123", server_id="srv1")
            >>> # result, contexts = await manager.invoke_hook("prompt_pre_fetch", payload, context)
            >>> # if result.continue_processing:
            >>> #     # Use modified payload, if any plugin produced one
            >>> #     modified = result.modified_payload
        """
        # Get plugins configured for this hook
        hook_refs = self._registry.get_hook_refs_for_hook(hook_type=hook_type)

        # Execute plugins
        return await self._executor.execute(hook_refs, payload, global_context, hook_type, local_contexts, violations_as_exceptions)

    async def invoke_hook_for_plugin(
        self,
        name: str,
        hook_type: str,
        payload: Union[PluginPayload, dict[str, Any], str],
        context: Union[PluginContext, GlobalContext],
        violations_as_exceptions: bool = False,
        payload_as_json: bool = False,
    ) -> PluginResult:
        """Invoke a specific hook for a single named plugin.

        This method allows direct invocation of a particular plugin's hook by name,
        bypassing the normal priority-ordered execution. Useful for testing individual
        plugins or when specific plugin behavior needs to be triggered independently.

        Args:
            name: The name of the plugin to invoke.
            hook_type: The type of hook to execute (e.g., "prompt_pre_fetch").
            payload: The plugin payload to be processed by the hook.
            context: Plugin execution context (PluginContext) or GlobalContext (will be wrapped).
            violations_as_exceptions: Raise violations as exceptions rather than returns.
            payload_as_json: payload passed in as json rather than pydantic.

        Returns:
            PluginResult with processing status, modified payload, and metadata.

        Raises:
            PluginError: If the plugin or hook type cannot be found in the registry.
            ValueError: If payload type does not match payload_as_json setting.

        Examples:
            >>> manager = PluginManager("plugins/config.yaml")
            >>> # In async context:
            >>> # await manager.initialize()
            >>> # payload = PromptPrehookPayload(name="test", args={})
            >>> # context = PluginContext(global_context=GlobalContext(request_id="123"))
            >>> # result = await manager.invoke_hook_for_plugin(
            >>> #     name="auth_plugin",
            >>> #     hook_type="prompt_pre_fetch",
            >>> #     payload=payload,
            >>> #     context=context
            >>> # )
        """
        # Auto-wrap GlobalContext in PluginContext for convenience
        if isinstance(context, GlobalContext):
            context = PluginContext(global_context=context)

        hook_ref = self._registry.get_plugin_hook_by_name(name, hook_type)
        if not hook_ref:
            raise PluginError(
                error=PluginErrorModel(
                    message=f"Unable to find {hook_type} for plugin {name}. Make sure the plugin is registered.",
                    plugin_name=name,
                )
            )

        if payload_as_json:
            plugin = hook_ref.plugin_ref.plugin
            # When payload_as_json=True, payload should be str or dict
            if isinstance(payload, (str, dict)):
                pydantic_payload = plugin.json_to_payload(hook_type, payload)
                return await self._executor.execute_plugin(hook_ref, pydantic_payload, context, violations_as_exceptions)
            raise ValueError(f"When payload_as_json=True, payload must be str or dict, got {type(payload)}")

        # When payload_as_json=False, payload should already be a PluginPayload
        if not isinstance(payload, PluginPayload):
            raise ValueError(f"When payload_as_json=False, payload must be a PluginPayload, got {type(payload)}")
        return await self._executor.execute_plugin(hook_ref, payload, context, violations_as_exceptions)