Coverage for mcpgateway / plugins / framework / manager.py: 98%
292 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/plugins/framework/manager.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Teryl Taylor, Mihai Criveti, Fred Araujo
7Plugin manager.
8Module that manages and calls plugins at hookpoints throughout the gateway.
10This module provides the core plugin management functionality including:
11- Plugin lifecycle management (initialization, execution, shutdown)
12- Timeout protection for plugin execution
13- Context management with automatic cleanup
14- Priority-based plugin ordering
15- Conditional plugin execution based on prompts/servers/tenants
17Examples:
18 >>> # Initialize plugin manager with configuration
19 >>> manager = PluginManager("plugins/config.yaml")
20 >>> # await manager.initialize() # Called in async context
22 >>> # Create test payload and context
23 >>> from mcpgateway.plugins.framework.models import GlobalContext
24 >>> from mcpgateway.plugins.framework.hooks.prompts import PromptPrehookPayload
25 >>> payload = PromptPrehookPayload(prompt_id="123", name="test", args={"user": "input"})
26 >>> context = GlobalContext(request_id="123")
27 >>> # result, contexts = await manager.prompt_pre_fetch(payload, context) # Called in async context
28"""
30# Standard
31import asyncio
32import copy
33import logging
34import threading
35from typing import Any, Optional, Union
37# Third-Party
38from pydantic import BaseModel, RootModel
40# First-Party
41from mcpgateway.plugins.framework.base import HookRef, Plugin
42from mcpgateway.plugins.framework.errors import convert_exception_to_error, PluginError, PluginViolationError
43from mcpgateway.plugins.framework.hooks.policies import apply_policy, DefaultHookPolicy, HookPayloadPolicy
44from mcpgateway.plugins.framework.loader.config import ConfigLoader
45from mcpgateway.plugins.framework.loader.plugin import PluginLoader
46from mcpgateway.plugins.framework.memory import copyonwrite
47from mcpgateway.plugins.framework.models import Config, GlobalContext, PluginContext, PluginContextTable, PluginErrorModel, PluginMode, PluginPayload, PluginResult
48from mcpgateway.plugins.framework.observability import current_trace_id, ObservabilityProvider
49from mcpgateway.plugins.framework.registry import PluginInstanceRegistry
50from mcpgateway.plugins.framework.settings import settings
51from mcpgateway.plugins.framework.utils import payload_matches
53# Use standard logging to avoid circular imports (plugins -> services -> plugins)
54logger = logging.getLogger(__name__)
56# Configuration constants
57DEFAULT_PLUGIN_TIMEOUT = 30 # seconds
58MAX_PAYLOAD_SIZE = 1_000_000 # 1MB
59CONTEXT_CLEANUP_INTERVAL = 300 # 5 minutes
60CONTEXT_MAX_AGE = 3600 # 1 hour
61HTTP_AUTH_CHECK_PERMISSION_HOOK = "http_auth_check_permission"
62DECISION_PLUGIN_METADATA_KEY = "_decision_plugin"
63RESERVED_INTERNAL_METADATA_KEYS = frozenset({DECISION_PLUGIN_METADATA_KEY})
66class PluginTimeoutError(Exception):
67 """Raised when a plugin execution exceeds the timeout limit."""
70class PayloadSizeError(ValueError):
71 """Raised when a payload exceeds the maximum allowed size."""
74class PluginExecutor:
75 """Executes a list of plugins with timeout protection and error handling.
77 This class manages the execution of plugins in priority order, handling:
78 - Timeout protection for each plugin
79 - Context management between plugins
80 - Error isolation to prevent plugin failures from affecting the gateway
81 - Metadata aggregation from multiple plugins
83 Examples:
84 >>> executor = PluginExecutor()
85 >>> # In async context:
86 >>> # result, contexts = await executor.execute(
87 >>> # plugins=[plugin1, plugin2],
88 >>> # payload=payload,
89 >>> # global_context=context,
90 >>> # plugin_run=pre_prompt_fetch,
91 >>> # compare=pre_prompt_matches
92 >>> # )
93 """
95 def __init__(
96 self,
97 config: Optional[Config] = None,
98 timeout: int = DEFAULT_PLUGIN_TIMEOUT,
99 observability: Optional[ObservabilityProvider] = None,
100 hook_policies: Optional[dict[str, HookPayloadPolicy]] = None,
101 ):
102 """Initialize the plugin executor.
104 Args:
105 config: the plugin manager configuration.
106 timeout: Maximum execution time per plugin in seconds.
107 observability: Optional observability provider implementing ObservabilityProvider protocol.
108 hook_policies: Per-hook-type payload modification policies.
109 """
110 self.timeout = timeout
111 self.config = config
112 self.observability = observability
113 self.hook_policies: dict[str, HookPayloadPolicy] = hook_policies or {}
114 self.default_hook_policy = DefaultHookPolicy(settings.default_hook_policy)
116 async def execute(
117 self,
118 hook_refs: list[HookRef],
119 payload: PluginPayload,
120 global_context: GlobalContext,
121 hook_type: str,
122 local_contexts: Optional[PluginContextTable] = None,
123 violations_as_exceptions: bool = False,
124 ) -> tuple[PluginResult, PluginContextTable | None]:
125 """Execute plugins in priority order with timeout protection.
127 Args:
128 hook_refs: List of hook references to execute, sorted by priority.
129 payload: The payload to be processed by plugins.
130 global_context: Shared context for all plugins containing request metadata.
131 hook_type: The hook type identifier (e.g., "tool_pre_invoke").
132 local_contexts: Optional existing contexts from previous hook executions.
133 violations_as_exceptions: Raise violations as exceptions rather than as returns.
135 Returns:
136 A tuple containing:
137 - PluginResult with processing status, modified payload, and metadata
138 - PluginContextTable with updated local contexts for each plugin
140 Raises:
141 PayloadSizeError: If the payload exceeds MAX_PAYLOAD_SIZE.
142 PluginError: If there is an error inside a plugin.
143 PluginViolationError: If a violation occurs and violation_as_exceptions is set.
145 Examples:
146 >>> # Execute plugins with timeout protection
147 >>> from mcpgateway.plugins.framework.hooks.prompts import PromptHookType
148 >>> executor = PluginExecutor(timeout=30)
149 >>> # Assuming you have a registry instance:
150 >>> # plugins = registry.get_plugins_for_hook(PromptHookType.PROMPT_PRE_FETCH)
151 >>> # In async context:
152 >>> # result, contexts = await executor.execute(
153 >>> # plugins=plugins,
154 >>> # payload=PromptPrehookPayload(prompt_id="123", name="test", args={}),
155 >>> # global_context=GlobalContext(request_id="123"),
156 >>> # plugin_run=pre_prompt_fetch,
157 >>> # compare=pre_prompt_matches
158 >>> # )
159 """
160 if not hook_refs:
161 return (PluginResult(modified_payload=None), None)
163 # Validate payload size
164 self._validate_payload_size(payload)
166 # Look up the policy for this hook type (may be None)
167 policy = self.hook_policies.get(hook_type)
169 res_local_contexts = {}
170 combined_metadata: dict[str, Any] = {}
171 current_payload: PluginPayload | None = None
172 decision_plugin_name: Optional[str] = None
174 for hook_ref in hook_refs:
175 # Skip disabled plugins
176 if hook_ref.plugin_ref.mode == PluginMode.DISABLED:
177 continue
179 # Check if plugin conditions match current context
180 if hook_ref.plugin_ref.conditions and not payload_matches(payload, hook_type, hook_ref.plugin_ref.conditions, global_context):
181 logger.debug("Skipping plugin %s - conditions not met", hook_ref.plugin_ref.name)
182 continue
184 tmp_global_context = GlobalContext(
185 request_id=global_context.request_id,
186 user=global_context.user,
187 tenant_id=global_context.tenant_id,
188 server_id=global_context.server_id,
189 state={} if not global_context.state else copyonwrite(global_context.state),
190 metadata={} if not global_context.metadata else copyonwrite(global_context.metadata),
191 )
192 # Get or create local context for this plugin
193 local_context_key = global_context.request_id + hook_ref.plugin_ref.uuid
194 if local_contexts and local_context_key in local_contexts:
195 local_context = local_contexts[local_context_key]
196 local_context.global_context = tmp_global_context
197 else:
198 local_context = PluginContext(global_context=tmp_global_context)
199 res_local_contexts[local_context_key] = local_context
201 # When a policy exists or default=deny is active, deep-copy the
202 # payload before handing it to the plugin. The plugin operates on
203 # the copy, so in-place nested mutations (e.g. payload.args[k]=v)
204 # cannot pollute the live chain. model_copy(deep=True) is used
205 # for Pydantic models; copy.deepcopy handles plain dicts that
206 # arrive via cross-type hooks (e.g. http_auth_resolve_user).
207 effective_payload = current_payload if current_payload is not None else payload
208 # RootModel subclasses (e.g. HttpHeaderPayload) wrap mutable containers
209 # that bypass the frozen=True constraint via __setitem__, so they must
210 # always be deep-copied to prevent in-place corruption of the live chain.
211 needs_isolation = policy or self.default_hook_policy == DefaultHookPolicy.DENY or isinstance(effective_payload, RootModel)
212 if needs_isolation:
213 plugin_input = effective_payload.model_copy(deep=True) if isinstance(effective_payload, BaseModel) else copy.deepcopy(effective_payload)
214 else:
215 plugin_input = effective_payload
217 # Execute plugin with timeout protection
218 result = await self.execute_plugin(
219 hook_ref,
220 plugin_input,
221 local_context,
222 violations_as_exceptions,
223 global_context,
224 combined_metadata,
225 )
227 # Apply policy-based controlled merge (per-plugin)
228 if result.modified_payload is not None:
229 if policy:
230 if isinstance(result.modified_payload, type(effective_payload)) and isinstance(effective_payload, BaseModel):
231 # Same-type BaseModel payload — apply field-level policy filtering
232 filtered = apply_policy(
233 effective_payload,
234 result.modified_payload,
235 policy,
236 )
237 if filtered is not None:
238 current_payload = filtered
239 decision_plugin_name = hook_ref.plugin_ref.name
240 else:
241 # Cross-type payload (e.g. HTTP hooks returning a different
242 # result type than the input). Field-level filtering is not
243 # applicable; the policy's presence authorises the hook.
244 # Guard: only accept PluginPayload subtypes or dict (used
245 # by http_auth_resolve_user and similar hooks).
246 if isinstance(result.modified_payload, (PluginPayload, dict)):
247 logger.debug(
248 "Plugin %s returned cross-type payload (%s -> %s) on hook %s; accepting without field filtering",
249 hook_ref.plugin_ref.name,
250 type(effective_payload).__name__,
251 type(result.modified_payload).__name__,
252 hook_type,
253 )
254 current_payload = result.modified_payload
255 decision_plugin_name = hook_ref.plugin_ref.name
256 else:
257 logger.warning(
258 "Plugin %s returned unexpected type %s on hook %s; ignoring modification",
259 hook_ref.plugin_ref.name,
260 type(result.modified_payload).__name__,
261 hook_type,
262 )
263 elif self.default_hook_policy == DefaultHookPolicy.ALLOW:
264 # No explicit policy + default=allow -- accept all modifications
265 current_payload = result.modified_payload
266 decision_plugin_name = hook_ref.plugin_ref.name
267 else:
268 # No explicit policy + default=deny -- reject all modifications
269 logger.warning(
270 "Plugin %s attempted payload modification on hook %s but no policy is defined and default is deny",
271 hook_ref.plugin_ref.name,
272 hook_type,
273 )
275 # Both ENFORCE and ENFORCE_IGNORE_ERROR honour continue_processing=False
276 # and halt the chain. They differ only in error handling: ENFORCE raises
277 # on plugin errors/timeouts, while ENFORCE_IGNORE_ERROR swallows them and
278 # lets the chain continue (see execute_plugin exception handlers).
279 if not result.continue_processing and hook_ref.plugin_ref.mode in (PluginMode.ENFORCE, PluginMode.ENFORCE_IGNORE_ERROR):
280 if hook_type == HTTP_AUTH_CHECK_PERMISSION_HOOK and decision_plugin_name:
281 combined_metadata[DECISION_PLUGIN_METADATA_KEY] = decision_plugin_name
282 return (
283 PluginResult(
284 continue_processing=False,
285 modified_payload=current_payload,
286 violation=result.violation,
287 metadata=combined_metadata,
288 ),
289 res_local_contexts,
290 )
292 if hook_type == HTTP_AUTH_CHECK_PERMISSION_HOOK and decision_plugin_name:
293 combined_metadata[DECISION_PLUGIN_METADATA_KEY] = decision_plugin_name
295 return (PluginResult(continue_processing=True, modified_payload=current_payload, violation=None, metadata=combined_metadata), res_local_contexts)
297 async def execute_plugin(
298 self,
299 hook_ref: HookRef,
300 payload: PluginPayload,
301 local_context: PluginContext,
302 violations_as_exceptions: bool,
303 global_context: Optional[GlobalContext] = None,
304 combined_metadata: Optional[dict[str, Any]] = None,
305 ) -> PluginResult:
306 """Execute a single plugin with timeout protection.
308 Args:
309 hook_ref: Hooking structure that contains the plugin and hook.
310 payload: The payload to be processed by plugins.
311 local_context: local context.
312 violations_as_exceptions: Raise violations as exceptions rather than as returns.
313 global_context: Shared context for all plugins containing request metadata.
314 combined_metadata: combination of the metadata of all plugins.
316 Returns:
317 A tuple containing:
318 - PluginResult with processing status, modified payload, and metadata
319 - PluginContextTable with updated local contexts for each plugin
321 Raises:
322 PayloadSizeError: If the payload exceeds MAX_PAYLOAD_SIZE.
323 PluginError: If there is an error inside a plugin.
324 PluginViolationError: If a violation occurs and violation_as_exceptions is set.
325 """
326 try:
327 # Execute plugin with timeout protection
328 result = await self._execute_with_timeout(hook_ref, payload, local_context)
329 # Only merge global state for enforce modes; permissive plugins
330 # operate on copy-on-write snapshots and should not mutate shared state.
331 if local_context.global_context and global_context and hook_ref.plugin_ref.mode in (PluginMode.ENFORCE, PluginMode.ENFORCE_IGNORE_ERROR):
332 global_context.state.update(local_context.global_context.state)
333 global_context.metadata.update(local_context.global_context.metadata)
334 # Aggregate metadata from all plugins
335 if result.metadata and combined_metadata is not None:
336 combined_metadata.update({k: v for k, v in result.metadata.items() if k not in RESERVED_INTERNAL_METADATA_KEYS})
338 # Track payload modifications
339 # if result.modified_payload is not None:
340 # current_payload = result.modified_payload
342 # Set plugin name in violation if present
343 if result.violation:
344 result.violation.plugin_name = hook_ref.plugin_ref.plugin.name
346 # Handle plugin blocking the request
347 if not result.continue_processing:
348 if hook_ref.plugin_ref.mode == PluginMode.ENFORCE:
349 logger.warning("Plugin %s blocked request in enforce mode", hook_ref.plugin_ref.plugin.name)
350 if violations_as_exceptions:
351 if result.violation:
352 plugin_name = result.violation.plugin_name
353 violation_reason = result.violation.reason
354 violation_desc = result.violation.description
355 violation_code = result.violation.code
356 raise PluginViolationError(
357 f"{hook_ref.name} blocked by plugin {plugin_name}: {violation_code} - {violation_reason} ({violation_desc})",
358 violation=result.violation,
359 )
360 raise PluginViolationError(f"{hook_ref.name} blocked by plugin")
361 return PluginResult(
362 continue_processing=False,
363 modified_payload=payload,
364 violation=result.violation,
365 metadata=combined_metadata,
366 )
367 if hook_ref.plugin_ref.mode == PluginMode.PERMISSIVE:
368 logger.warning(
369 "Plugin %s would block (permissive mode): %s",
370 hook_ref.plugin_ref.plugin.name,
371 result.violation.description if result.violation else "No description",
372 )
373 return result
374 except asyncio.TimeoutError as exc:
375 logger.error("Plugin %s timed out after %ds", hook_ref.plugin_ref.name, self.timeout)
376 if (self.config and self.config.plugin_settings.fail_on_plugin_error) or hook_ref.plugin_ref.mode == PluginMode.ENFORCE:
377 raise PluginError(
378 error=PluginErrorModel(
379 message=f"Plugin {hook_ref.plugin_ref.name} exceeded {self.timeout}s timeout",
380 plugin_name=hook_ref.plugin_ref.name,
381 )
382 ) from exc
383 # In permissive or enforce_ignore_error mode, continue with next plugin
384 except PluginViolationError:
385 raise
386 except PluginError as pe:
387 logger.error("Plugin %s failed with error: %s", hook_ref.plugin_ref.name, str(pe))
388 if (self.config and self.config.plugin_settings.fail_on_plugin_error) or hook_ref.plugin_ref.mode == PluginMode.ENFORCE:
389 raise
390 except Exception as e:
391 logger.error("Plugin %s failed with error: %s", hook_ref.plugin_ref.name, str(e))
392 if (self.config and self.config.plugin_settings.fail_on_plugin_error) or hook_ref.plugin_ref.mode == PluginMode.ENFORCE:
393 raise PluginError(error=convert_exception_to_error(e, hook_ref.plugin_ref.name)) from e
394 # In permissive or enforce_ignore_error mode, continue with next plugin
395 # Return a result indicating processing should continue despite the error
396 return PluginResult(continue_processing=True)
398 async def _execute_with_timeout(self, hook_ref: HookRef, payload: PluginPayload, context: PluginContext) -> PluginResult:
399 """Execute a plugin with timeout protection.
401 Args:
402 hook_ref: Reference to the hook and plugin to execute.
403 payload: Payload to process.
404 context: Plugin execution context.
406 Returns:
407 Result from plugin execution.
409 Raises:
410 asyncio.TimeoutError: If plugin exceeds timeout.
411 asyncio.CancelledError: If plugin execution is cancelled.
412 Exception: Re-raised from plugin hook execution failures.
413 """
414 # Start observability span if tracing is active
415 trace_id = current_trace_id.get()
416 span_id = None
418 if trace_id and self.observability:
419 try:
420 span_id = self.observability.start_span(
421 trace_id=trace_id,
422 name=f"plugin.execute.{hook_ref.plugin_ref.name}",
423 kind="internal",
424 resource_type="plugin",
425 resource_name=hook_ref.plugin_ref.name,
426 attributes={
427 "plugin.name": hook_ref.plugin_ref.name,
428 "plugin.uuid": hook_ref.plugin_ref.uuid,
429 "plugin.mode": hook_ref.plugin_ref.mode.value if hasattr(hook_ref.plugin_ref.mode, "value") else str(hook_ref.plugin_ref.mode),
430 "plugin.priority": hook_ref.plugin_ref.priority,
431 "plugin.timeout": self.timeout,
432 },
433 )
434 except Exception as e:
435 logger.debug("Plugin observability start_span failed: %s", e)
437 # Execute plugin
438 try:
439 result = await asyncio.wait_for(hook_ref.hook(payload, context), timeout=self.timeout)
440 except Exception:
441 if span_id is not None:
442 try:
443 self.observability.end_span(span_id=span_id, status="error")
444 except Exception: # nosec B110
445 pass
446 raise
448 # End span with success
449 if span_id is not None:
450 try:
451 self.observability.end_span(
452 span_id=span_id,
453 status="ok",
454 attributes={
455 "plugin.had_violation": result.violation is not None,
456 "plugin.modified_payload": result.modified_payload is not None,
457 },
458 )
459 except Exception as e:
460 logger.debug("Plugin observability end_span failed: %s", e)
462 return result
464 def _validate_payload_size(self, payload: Any) -> None:
465 """Validate that payload doesn't exceed size limits.
467 Args:
468 payload: The payload to validate.
470 Raises:
471 PayloadSizeError: If payload exceeds MAX_PAYLOAD_SIZE.
472 """
473 # For PromptPrehookPayload, check args size
474 if hasattr(payload, "args") and payload.args:
475 total_size = sum(len(str(v)) for v in payload.args.values())
476 if total_size > MAX_PAYLOAD_SIZE:
477 raise PayloadSizeError(f"Payload size {total_size} exceeds limit of {MAX_PAYLOAD_SIZE} bytes")
478 # For PromptPosthookPayload, check result size
479 elif hasattr(payload, "result") and payload.result:
480 # Estimate size of result messages
481 total_size = len(str(payload.result))
482 if total_size > MAX_PAYLOAD_SIZE:
483 raise PayloadSizeError(f"Result size {total_size} exceeds limit of {MAX_PAYLOAD_SIZE} bytes")
486class PluginManager:
487 """Plugin manager for managing the plugin lifecycle.
489 This class implements a thread-safe Borg singleton pattern to ensure consistent
490 plugin management across the application. It handles:
491 - Plugin discovery and loading from configuration
492 - Plugin lifecycle management (initialization, execution, shutdown)
493 - Context management with automatic cleanup
494 - Hook execution orchestration
496 Thread Safety:
497 Uses double-checked locking to prevent race conditions when multiple threads
498 create PluginManager instances simultaneously. The first instance to acquire
499 the lock loads the configuration; subsequent instances reuse the shared state.
501 Attributes:
502 config: The loaded plugin configuration.
503 plugin_count: Number of currently loaded plugins.
504 initialized: Whether the manager has been initialized.
506 Examples:
507 >>> # Initialize plugin manager
508 >>> manager = PluginManager("plugins/config.yaml")
509 >>> # In async context:
510 >>> # await manager.initialize()
511 >>> # print(f"Loaded {manager.plugin_count} plugins")
512 >>>
513 >>> # Execute prompt hooks
514 >>> from mcpgateway.plugins.framework.models import GlobalContext
515 >>> from mcpgateway.plugins.framework.hooks.prompts import PromptPrehookPayload
516 >>> payload = PromptPrehookPayload(prompt_id="123", name="test", args={})
517 >>> context = GlobalContext(request_id="req-123")
518 >>> # In async context:
519 >>> # result, contexts = await manager.prompt_pre_fetch(payload, context)
520 >>>
521 >>> # Shutdown when done
522 >>> # await manager.shutdown()
523 """
525 __shared_state: dict[Any, Any] = {}
526 __lock: threading.Lock = threading.Lock() # Thread safety for synchronous init
527 _async_lock: asyncio.Lock | None = None # Async lock for initialize/shutdown
528 _loader: PluginLoader = PluginLoader()
529 _initialized: bool = False
530 _registry: PluginInstanceRegistry = PluginInstanceRegistry()
531 _config: Config | None = None
532 _config_path: str | None = None
533 _executor: PluginExecutor | None = None
535 def __init__(
536 self,
537 config: str = "",
538 timeout: int = DEFAULT_PLUGIN_TIMEOUT,
539 observability: Optional[ObservabilityProvider] = None,
540 hook_policies: Optional[dict[str, HookPayloadPolicy]] = None,
541 ):
542 """Initialize plugin manager.
544 PluginManager implements a thread-safe Borg singleton:
545 - Shared state is initialized only once across all instances.
546 - Subsequent instantiations reuse same state and skip config reload.
547 - Uses double-checked locking to prevent race conditions in multi-threaded environments.
549 Thread Safety:
550 The initialization uses a double-checked locking pattern to ensure that
551 config loading only happens once, even when multiple threads create
552 PluginManager instances simultaneously.
554 Args:
555 config: Path to plugin configuration file (YAML).
556 timeout: Maximum execution time per plugin in seconds.
557 observability: Optional observability provider implementing ObservabilityProvider protocol.
558 hook_policies: Per-hook-type payload modification policies (injected by gateway).
560 Examples:
561 >>> # Initialize with configuration file
562 >>> manager = PluginManager("plugins/config.yaml")
564 >>> # Initialize with custom timeout
565 >>> manager = PluginManager("plugins/config.yaml", timeout=60)
566 """
567 self.__dict__ = self.__shared_state
569 # Only initialize once (first instance when shared state is empty)
570 # Use lock to prevent race condition in multi-threaded environments
571 if not self.__shared_state:
572 with self.__lock:
573 # Double-check after acquiring lock (another thread may have initialized)
574 if not self.__shared_state:
575 if config:
576 self._config = ConfigLoader.load_config(config)
577 self._config_path = config
579 # Update executor with timeout, observability, and policies
580 self._executor = PluginExecutor(
581 config=self._config,
582 timeout=timeout,
583 observability=observability,
584 hook_policies=hook_policies,
585 )
586 elif hook_policies:
587 # Allow hook policies to be injected after initial Borg creation.
588 # This handles the case where the first PluginManager instantiation
589 # (e.g. from a service) didn't have policies, but a later one does.
590 with self.__lock:
591 executor = self._get_executor()
592 # Only update timeout if caller provided a non-default value
593 if timeout != DEFAULT_PLUGIN_TIMEOUT:
594 executor.timeout = timeout
595 if not executor.hook_policies:
596 executor.hook_policies = hook_policies
597 elif executor.hook_policies != hook_policies:
598 logger.warning("PluginManager: hook_policies already set; ignoring new policies (call reset() first to replace them)")
599 if observability and not executor.observability:
600 executor.observability = observability
601 elif self._executor is None:
602 # Defensive initialization for unusual state transitions in tests.
603 with self.__lock:
604 if self._executor is None:
605 self._executor = PluginExecutor(config=self._config, timeout=timeout, observability=observability)
607 def _get_executor(self) -> PluginExecutor:
608 """Get plugin executor, creating it lazily if necessary.
610 Returns:
611 PluginExecutor: The plugin executor instance.
612 """
613 if self._executor is None:
614 self._executor = PluginExecutor(config=self._config)
615 return self._executor
617 @property
618 def executor(self) -> PluginExecutor:
619 """Expose executor for tests and internal callers.
621 Returns:
622 PluginExecutor: The plugin executor instance.
623 """
624 return self._get_executor()
626 @executor.setter
627 def executor(self, value: PluginExecutor) -> None:
628 """Set the plugin executor instance.
630 Args:
631 value: The plugin executor to assign.
632 """
633 self._executor = value
635 @classmethod
636 def reset(cls) -> None:
637 """Reset the Borg pattern shared state.
639 This method clears all shared state, allowing a fresh PluginManager
640 instance to be created with new configuration. Primarily used for testing.
642 Thread-safe: Uses lock to ensure atomic reset operation.
644 Examples:
645 >>> # Between tests, reset shared state
646 >>> PluginManager.reset()
647 >>> manager = PluginManager("new_config.yaml")
648 """
649 with cls.__lock:
650 cls.__shared_state.clear()
651 cls._initialized = False
652 cls._config = None
653 cls._config_path = None
654 cls._async_lock = None
655 cls._registry = PluginInstanceRegistry()
656 cls._executor = None
657 cls._loader = PluginLoader()
659 @property
660 def config(self) -> Config | None:
661 """Plugin manager configuration.
663 Returns:
664 The plugin configuration object or None if not configured.
665 """
666 return self._config
668 @property
669 def plugin_count(self) -> int:
670 """Number of plugins loaded.
672 Returns:
673 The number of currently loaded plugins.
674 """
675 return self._registry.plugin_count
677 @property
678 def initialized(self) -> bool:
679 """Plugin manager initialization status.
681 Returns:
682 True if the plugin manager has been initialized.
683 """
684 return self._initialized
686 @property
687 def observability(self) -> Optional[ObservabilityProvider]:
688 """Current observability provider.
690 Returns:
691 The observability provider or None if not configured.
692 """
693 return self._executor.observability
695 @observability.setter
696 def observability(self, provider: Optional[ObservabilityProvider]) -> None:
697 """Set the observability provider.
699 Thread-safe: uses lock to prevent races with concurrent readers.
701 Args:
702 provider: ObservabilityProvider to inject into the executor.
703 """
704 with self.__lock:
705 self._executor.observability = provider
707 def get_plugin(self, name: str) -> Optional[Plugin]:
708 """Get a plugin by name.
710 Args:
711 name: the name of the plugin to return.
713 Returns:
714 A plugin.
715 """
716 plugin_ref = self._registry.get_plugin(name)
717 return plugin_ref.plugin if plugin_ref else None
719 def has_hooks_for(self, hook_type: str) -> bool:
720 """Check if there are any hooks registered for a specific hook type.
722 Args:
723 hook_type: The type of hook to check for.
725 Returns:
726 True if there are hooks registered for the specified type, False otherwise.
727 """
728 return self._registry.has_hooks_for(hook_type)
730 async def initialize(self) -> None:
731 """Initialize the plugin manager and load all configured plugins.
733 This method:
734 1. Loads plugin configurations from the config file
735 2. Instantiates each enabled plugin
736 3. Registers plugins with the registry
737 4. Validates plugin initialization
739 Thread Safety:
740 Uses asyncio.Lock to prevent concurrent initialization from multiple
741 coroutines or async tasks. Combined with threading.Lock in __init__
742 for full multi-threaded safety.
744 Raises:
745 RuntimeError: If plugin initialization fails with an exception.
746 ValueError: If a plugin cannot be initialized or registered.
748 Examples:
749 >>> manager = PluginManager("plugins/config.yaml")
750 >>> # In async context:
751 >>> # await manager.initialize()
752 >>> # Manager is now ready to execute plugins
753 """
754 # Initialize async lock lazily (can't create asyncio.Lock in class definition)
755 with self.__lock:
756 if self._async_lock is None:
757 self._async_lock = asyncio.Lock()
759 async with self._async_lock:
760 # Double-check after acquiring lock
761 if self._initialized:
762 logger.debug("Plugin manager already initialized")
763 return
765 # Defensive cleanup: registry should be empty when not initialized
766 if self._registry.plugin_count:
767 logger.debug("Plugin registry not empty before initialize; clearing stale plugins")
768 await self._registry.shutdown()
770 plugins = self._config.plugins if self._config and self._config.plugins else []
771 loaded_count = 0
773 for plugin_config in plugins:
774 try:
775 # For disabled plugins, create a stub plugin without full instantiation
776 if plugin_config.mode != PluginMode.DISABLED:
777 # Fully instantiate enabled plugins
778 plugin = await self._loader.load_and_instantiate_plugin(plugin_config)
779 if plugin:
780 self._registry.register(plugin)
781 loaded_count += 1
782 logger.info("Loaded plugin: %s (mode: %s)", plugin_config.name, plugin_config.mode)
783 else:
784 raise ValueError(f"Unable to instantiate plugin: {plugin_config.name}")
785 else:
786 logger.info("Plugin: %s is disabled. Ignoring.", plugin_config.name)
788 except Exception as e:
789 # Clean error message without stack trace spam
790 logger.error("Failed to load plugin %s: {%s}", plugin_config.name, str(e))
791 # Let it crash gracefully with a clean error
792 raise RuntimeError(f"Plugin initialization failed: {plugin_config.name} - {str(e)}") from e
794 self._initialized = True
795 logger.info("Plugin manager initialized with %s plugins", loaded_count)
797 async def shutdown(self) -> None:
798 """Shutdown all plugins and cleanup resources.
800 This method:
801 1. Shuts down all registered plugins
802 2. Clears the plugin registry
803 3. Cleans up stored contexts
804 4. Resets initialization state
806 Thread Safety:
807 Uses asyncio.Lock to prevent concurrent shutdown with initialization
808 or with another shutdown call.
810 Note: The config is preserved to allow modifying settings and re-initializing.
811 To fully reset for a new config, create a new PluginManager instance.
813 Examples:
814 >>> manager = PluginManager("plugins/config.yaml")
815 >>> # In async context:
816 >>> # await manager.initialize()
817 >>> # ... use the manager ...
818 >>> # await manager.shutdown()
819 """
820 # Initialize async lock lazily if needed
821 with self.__lock:
822 if self._async_lock is None:
823 self._async_lock = asyncio.Lock()
825 async with self._async_lock:
826 if not self._initialized:
827 logger.debug("Plugin manager not initialized, nothing to shutdown")
828 return
830 logger.info("Shutting down plugin manager")
832 # Shutdown all plugins
833 await self._registry.shutdown()
835 # Reset state to allow re-initialization
836 self._initialized = False
838 logger.info("Plugin manager shutdown complete")
840 async def invoke_hook(
841 self,
842 hook_type: str,
843 payload: PluginPayload,
844 global_context: GlobalContext,
845 local_contexts: Optional[PluginContextTable] = None,
846 violations_as_exceptions: bool = False,
847 ) -> tuple[PluginResult, PluginContextTable | None]:
848 """Invoke a set of plugins configured for the hook point in priority order.
850 Args:
851 hook_type: The type of hook to execute.
852 payload: The plugin payload for which the plugins will analyze and modify.
853 global_context: Shared context for all plugins with request metadata.
854 local_contexts: Optional existing contexts from previous hook executions.
855 violations_as_exceptions: Raise violations as exceptions rather than as returns.
857 Returns:
858 A tuple containing:
859 - PluginResult with processing status and modified payload
860 - PluginContextTable with plugin contexts for state management
862 Examples:
863 >>> manager = PluginManager("plugins/config.yaml")
864 >>> # In async context:
865 >>> # await manager.initialize()
866 >>> # payload = ResourcePreFetchPayload("file:///data.txt")
867 >>> # context = GlobalContext(request_id="123", server_id="srv1")
868 >>> # result, contexts = await manager.resource_pre_fetch(payload, context)
869 >>> # if result.continue_processing:
870 >>> # # Use modified payload
871 >>> # uri = result.modified_payload.uri
872 """
873 # Get plugins configured for this hook
874 hook_refs = self._registry.get_hook_refs_for_hook(hook_type=hook_type)
876 # Execute plugins
877 result = await self._get_executor().execute(hook_refs, payload, global_context, hook_type, local_contexts, violations_as_exceptions)
879 return result
881 async def invoke_hook_for_plugin(
882 self,
883 name: str,
884 hook_type: str,
885 payload: Union[PluginPayload, dict[str, Any], str],
886 context: Union[PluginContext, GlobalContext],
887 violations_as_exceptions: bool = False,
888 payload_as_json: bool = False,
889 ) -> PluginResult:
890 """Invoke a specific hook for a single named plugin.
892 This method allows direct invocation of a particular plugin's hook by name,
893 bypassing the normal priority-ordered execution. Useful for testing individual
894 plugins or when specific plugin behavior needs to be triggered independently.
896 Args:
897 name: The name of the plugin to invoke.
898 hook_type: The type of hook to execute (e.g., "prompt_pre_fetch").
899 payload: The plugin payload to be processed by the hook.
900 context: Plugin execution context (PluginContext) or GlobalContext (will be wrapped).
901 violations_as_exceptions: Raise violations as exceptions rather than returns.
902 payload_as_json: payload passed in as json rather than pydantic.
904 Returns:
905 PluginResult with processing status, modified payload, and metadata.
907 Raises:
908 PluginError: If the plugin or hook type cannot be found in the registry.
909 ValueError: If payload type does not match payload_as_json setting.
911 Examples:
912 >>> manager = PluginManager("plugins/config.yaml")
913 >>> # In async context:
914 >>> # await manager.initialize()
915 >>> # payload = PromptPrehookPayload(name="test", args={})
916 >>> # context = PluginContext(global_context=GlobalContext(request_id="123"))
917 >>> # result = await manager.invoke_hook_for_plugin(
918 >>> # name="auth_plugin",
919 >>> # hook_type="prompt_pre_fetch",
920 >>> # payload=payload,
921 >>> # context=context
922 >>> # )
923 """
924 # Auto-wrap GlobalContext in PluginContext for convenience
925 if isinstance(context, GlobalContext):
926 context = PluginContext(global_context=context)
928 hook_ref = self._registry.get_plugin_hook_by_name(name, hook_type)
929 if not hook_ref:
930 raise PluginError(
931 error=PluginErrorModel(
932 message=f"Unable to find {hook_type} for plugin {name}. Make sure the plugin is registered.",
933 plugin_name=name,
934 )
935 )
936 if payload_as_json:
937 plugin = hook_ref.plugin_ref.plugin
938 # When payload_as_json=True, payload should be str or dict
939 if isinstance(payload, (str, dict)):
940 pydantic_payload = plugin.json_to_payload(hook_type, payload)
941 return await self._get_executor().execute_plugin(hook_ref, pydantic_payload, context, violations_as_exceptions)
942 raise ValueError(f"When payload_as_json=True, payload must be str or dict, got {type(payload)}")
943 # When payload_as_json=False, payload should already be a PluginPayload
944 if not isinstance(payload, PluginPayload):
945 raise ValueError(f"When payload_as_json=False, payload must be a PluginPayload, got {type(payload)}")
946 return await self._get_executor().execute_plugin(hook_ref, payload, context, violations_as_exceptions)