Coverage for mcpgateway / plugins / framework / manager.py: 96%

218 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/plugins/framework/manager.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Teryl Taylor, Mihai Criveti, Fred Araujo 

6 

7Plugin manager. 

8Module that manages and calls plugins at hookpoints throughout the gateway. 

9 

10This module provides the core plugin management functionality including: 

11- Plugin lifecycle management (initialization, execution, shutdown) 

12- Timeout protection for plugin execution 

13- Context management with automatic cleanup 

14- Priority-based plugin ordering 

15- Conditional plugin execution based on prompts/servers/tenants 

16 

17Examples: 

18 >>> # Initialize plugin manager with configuration 

19 >>> manager = PluginManager("plugins/config.yaml") 

20 >>> # await manager.initialize() # Called in async context 

21 

22 >>> # Create test payload and context 

23 >>> from mcpgateway.plugins.framework.models import GlobalContext 

24 >>> from mcpgateway.plugins.framework.hooks.prompts import PromptPrehookPayload 

25 >>> payload = PromptPrehookPayload(prompt_id="123", name="test", args={"user": "input"}) 

26 >>> context = GlobalContext(request_id="123") 

27 >>> # result, contexts = await manager.prompt_pre_fetch(payload, context) # Called in async context 

28""" 

29 

30# Standard 

31import asyncio 

32import logging 

33import threading 

34from typing import Any, Optional, Union 

35 

36# First-Party 

37from mcpgateway.plugins.framework.base import HookRef, Plugin 

38from mcpgateway.plugins.framework.errors import convert_exception_to_error, PluginError, PluginViolationError 

39from mcpgateway.plugins.framework.loader.config import ConfigLoader 

40from mcpgateway.plugins.framework.loader.plugin import PluginLoader 

41from mcpgateway.plugins.framework.memory import copyonwrite 

42from mcpgateway.plugins.framework.models import Config, GlobalContext, PluginContext, PluginContextTable, PluginErrorModel, PluginMode, PluginPayload, PluginResult 

43from mcpgateway.plugins.framework.registry import PluginInstanceRegistry 

44from mcpgateway.plugins.framework.utils import payload_matches 

45 

46# Use standard logging to avoid circular imports (plugins -> services -> plugins) 

47logger = logging.getLogger(__name__) 

48 

49# Configuration constants 

50DEFAULT_PLUGIN_TIMEOUT = 30 # seconds 

51MAX_PAYLOAD_SIZE = 1_000_000 # 1MB 

52CONTEXT_CLEANUP_INTERVAL = 300 # 5 minutes 

53CONTEXT_MAX_AGE = 3600 # 1 hour 

54 

55 

56class PluginTimeoutError(Exception): 

57 """Raised when a plugin execution exceeds the timeout limit.""" 

58 

59 

60class PayloadSizeError(ValueError): 

61 """Raised when a payload exceeds the maximum allowed size.""" 

62 

63 

64class PluginExecutor: 

65 """Executes a list of plugins with timeout protection and error handling. 

66 

67 This class manages the execution of plugins in priority order, handling: 

68 - Timeout protection for each plugin 

69 - Context management between plugins 

70 - Error isolation to prevent plugin failures from affecting the gateway 

71 - Metadata aggregation from multiple plugins 

72 

73 Examples: 

74 >>> executor = PluginExecutor() 

75 >>> # In async context: 

76 >>> # result, contexts = await executor.execute( 

77 >>> # plugins=[plugin1, plugin2], 

78 >>> # payload=payload, 

79 >>> # global_context=context, 

80 >>> # plugin_run=pre_prompt_fetch, 

81 >>> # compare=pre_prompt_matches 

82 >>> # ) 

83 """ 

84 

85 def __init__(self, config: Optional[Config] = None, timeout: int = DEFAULT_PLUGIN_TIMEOUT): 

86 """Initialize the plugin executor. 

87 

88 Args: 

89 timeout: Maximum execution time per plugin in seconds. 

90 config: the plugin manager configuration. 

91 """ 

92 self.timeout = timeout 

93 self.config = config 

94 

95 async def execute( 

96 self, 

97 hook_refs: list[HookRef], 

98 payload: PluginPayload, 

99 global_context: GlobalContext, 

100 hook_type: str, 

101 local_contexts: Optional[PluginContextTable] = None, 

102 violations_as_exceptions: bool = False, 

103 ) -> tuple[PluginResult, PluginContextTable | None]: 

104 """Execute plugins in priority order with timeout protection. 

105 

106 Args: 

107 hook_refs: List of hook references to execute, sorted by priority. 

108 payload: The payload to be processed by plugins. 

109 global_context: Shared context for all plugins containing request metadata. 

110 hook_type: The hook type identifier (e.g., "tool_pre_invoke"). 

111 local_contexts: Optional existing contexts from previous hook executions. 

112 violations_as_exceptions: Raise violations as exceptions rather than as returns. 

113 

114 Returns: 

115 A tuple containing: 

116 - PluginResult with processing status, modified payload, and metadata 

117 - PluginContextTable with updated local contexts for each plugin 

118 

119 Raises: 

120 PayloadSizeError: If the payload exceeds MAX_PAYLOAD_SIZE. 

121 PluginError: If there is an error inside a plugin. 

122 PluginViolationError: If a violation occurs and violation_as_exceptions is set. 

123 

124 Examples: 

125 >>> # Execute plugins with timeout protection 

126 >>> from mcpgateway.plugins.framework.hooks.prompts import PromptHookType 

127 >>> executor = PluginExecutor(timeout=30) 

128 >>> # Assuming you have a registry instance: 

129 >>> # plugins = registry.get_plugins_for_hook(PromptHookType.PROMPT_PRE_FETCH) 

130 >>> # In async context: 

131 >>> # result, contexts = await executor.execute( 

132 >>> # plugins=plugins, 

133 >>> # payload=PromptPrehookPayload(prompt_id="123", name="test", args={}), 

134 >>> # global_context=GlobalContext(request_id="123"), 

135 >>> # plugin_run=pre_prompt_fetch, 

136 >>> # compare=pre_prompt_matches 

137 >>> # ) 

138 """ 

139 if not hook_refs: 

140 return (PluginResult(modified_payload=None), None) 

141 

142 # Validate payload size 

143 self._validate_payload_size(payload) 

144 

145 res_local_contexts = {} 

146 combined_metadata: dict[str, Any] = {} 

147 current_payload: PluginPayload | None = None 

148 

149 for hook_ref in hook_refs: 

150 # Skip disabled plugins 

151 if hook_ref.plugin_ref.mode == PluginMode.DISABLED: 151 ↛ 152line 151 didn't jump to line 152 because the condition on line 151 was never true

152 continue 

153 

154 # Check if plugin conditions match current context 

155 if hook_ref.plugin_ref.conditions and not payload_matches(payload, hook_type, hook_ref.plugin_ref.conditions, global_context): 

156 logger.debug("Skipping plugin %s - conditions not met", hook_ref.plugin_ref.name) 

157 continue 

158 

159 tmp_global_context = GlobalContext( 

160 request_id=global_context.request_id, 

161 user=global_context.user, 

162 tenant_id=global_context.tenant_id, 

163 server_id=global_context.server_id, 

164 state={} if not global_context.state else copyonwrite(global_context.state), 

165 metadata={} if not global_context.metadata else copyonwrite(global_context.metadata), 

166 ) 

167 # Get or create local context for this plugin 

168 local_context_key = global_context.request_id + hook_ref.plugin_ref.uuid 

169 if local_contexts and local_context_key in local_contexts: 

170 local_context = local_contexts[local_context_key] 

171 local_context.global_context = tmp_global_context 

172 else: 

173 local_context = PluginContext(global_context=tmp_global_context) 

174 res_local_contexts[local_context_key] = local_context 

175 

176 # Execute plugin with timeout protection 

177 result = await self.execute_plugin( 

178 hook_ref, 

179 current_payload or payload, 

180 local_context, 

181 violations_as_exceptions, 

182 global_context, 

183 combined_metadata, 

184 ) 

185 # Track payload modifications 

186 if result.modified_payload is not None: 

187 current_payload = result.modified_payload 

188 if not result.continue_processing and hook_ref.plugin_ref.plugin.mode == PluginMode.ENFORCE: 

189 return (result, res_local_contexts) 

190 

191 return ( 

192 PluginResult(continue_processing=True, modified_payload=current_payload, violation=None, metadata=combined_metadata), 

193 res_local_contexts, 

194 ) 

195 

196 async def execute_plugin( 

197 self, 

198 hook_ref: HookRef, 

199 payload: PluginPayload, 

200 local_context: PluginContext, 

201 violations_as_exceptions: bool, 

202 global_context: Optional[GlobalContext] = None, 

203 combined_metadata: Optional[dict[str, Any]] = None, 

204 ) -> PluginResult: 

205 """Execute a single plugin with timeout protection. 

206 

207 Args: 

208 hook_ref: Hooking structure that contains the plugin and hook. 

209 payload: The payload to be processed by plugins. 

210 local_context: local context. 

211 violations_as_exceptions: Raise violations as exceptions rather than as returns. 

212 global_context: Shared context for all plugins containing request metadata. 

213 combined_metadata: combination of the metadata of all plugins. 

214 

215 Returns: 

216 A tuple containing: 

217 - PluginResult with processing status, modified payload, and metadata 

218 - PluginContextTable with updated local contexts for each plugin 

219 

220 Raises: 

221 PayloadSizeError: If the payload exceeds MAX_PAYLOAD_SIZE. 

222 PluginError: If there is an error inside a plugin. 

223 PluginViolationError: If a violation occurs and violation_as_exceptions is set. 

224 """ 

225 try: 

226 # Execute plugin with timeout protection 

227 result = await self._execute_with_timeout(hook_ref, payload, local_context) 

228 if local_context.global_context and global_context: 

229 global_context.state.update(local_context.global_context.state) 

230 global_context.metadata.update(local_context.global_context.metadata) 

231 # Aggregate metadata from all plugins 

232 if result.metadata and combined_metadata is not None: 

233 combined_metadata.update(result.metadata) 

234 

235 # Track payload modifications 

236 # if result.modified_payload is not None: 

237 # current_payload = result.modified_payload 

238 

239 # Set plugin name in violation if present 

240 if result.violation: 

241 result.violation.plugin_name = hook_ref.plugin_ref.plugin.name 

242 

243 # Handle plugin blocking the request 

244 if not result.continue_processing: 

245 if hook_ref.plugin_ref.plugin.mode == PluginMode.ENFORCE: 

246 logger.warning("Plugin %s blocked request in enforce mode", hook_ref.plugin_ref.plugin.name) 

247 if violations_as_exceptions: 

248 if result.violation: 248 ↛ 257line 248 didn't jump to line 257 because the condition on line 248 was always true

249 plugin_name = result.violation.plugin_name 

250 violation_reason = result.violation.reason 

251 violation_desc = result.violation.description 

252 violation_code = result.violation.code 

253 raise PluginViolationError( 

254 f"{hook_ref.name} blocked by plugin {plugin_name}: {violation_code} - {violation_reason} ({violation_desc})", 

255 violation=result.violation, 

256 ) 

257 raise PluginViolationError(f"{hook_ref.name} blocked by plugin") 

258 return PluginResult( 

259 continue_processing=False, 

260 modified_payload=payload, 

261 violation=result.violation, 

262 metadata=combined_metadata, 

263 ) 

264 if hook_ref.plugin_ref.plugin.mode == PluginMode.PERMISSIVE: 264 ↛ 270line 264 didn't jump to line 270 because the condition on line 264 was always true

265 logger.warning( 

266 "Plugin %s would block (permissive mode): %s", 

267 hook_ref.plugin_ref.plugin.name, 

268 result.violation.description if result.violation else "No description", 

269 ) 

270 return result 

271 except asyncio.TimeoutError as exc: 

272 logger.error("Plugin %s timed out after %ds", hook_ref.plugin_ref.name, self.timeout) 

273 if (self.config and self.config.plugin_settings.fail_on_plugin_error) or hook_ref.plugin_ref.plugin.mode == PluginMode.ENFORCE: 

274 raise PluginError( 

275 error=PluginErrorModel( 

276 message=f"Plugin {hook_ref.plugin_ref.name} exceeded {self.timeout}s timeout", 

277 plugin_name=hook_ref.plugin_ref.name, 

278 ) 

279 ) from exc 

280 # In permissive or enforce_ignore_error mode, continue with next plugin 

281 except PluginViolationError: 

282 raise 

283 except PluginError as pe: 

284 logger.error("Plugin %s failed with error: %s", hook_ref.plugin_ref.name, str(pe)) 

285 if (self.config and self.config.plugin_settings.fail_on_plugin_error) or hook_ref.plugin_ref.plugin.mode == PluginMode.ENFORCE: 285 ↛ 293line 285 didn't jump to line 293 because the condition on line 285 was always true

286 raise 

287 except Exception as e: 

288 logger.error("Plugin %s failed with error: %s", hook_ref.plugin_ref.name, str(e)) 

289 if (self.config and self.config.plugin_settings.fail_on_plugin_error) or hook_ref.plugin_ref.plugin.mode == PluginMode.ENFORCE: 

290 raise PluginError(error=convert_exception_to_error(e, hook_ref.plugin_ref.name)) from e 

291 # In permissive or enforce_ignore_error mode, continue with next plugin 

292 # Return a result indicating processing should continue despite the error 

293 return PluginResult(continue_processing=True) 

294 

295 async def _execute_with_timeout(self, hook_ref: HookRef, payload: PluginPayload, context: PluginContext) -> PluginResult: 

296 """Execute a plugin with timeout protection. 

297 

298 Args: 

299 hook_ref: Reference to the hook and plugin to execute. 

300 payload: Payload to process. 

301 context: Plugin execution context. 

302 

303 Returns: 

304 Result from plugin execution. 

305 

306 Raises: 

307 asyncio.TimeoutError: If plugin exceeds timeout. 

308 """ 

309 # Add observability tracing for plugin execution 

310 try: 

311 # First-Party 

312 # pylint: disable=import-outside-toplevel 

313 from mcpgateway.db import SessionLocal 

314 from mcpgateway.services.observability_service import current_trace_id, ObservabilityService 

315 

316 # pylint: enable=import-outside-toplevel 

317 

318 trace_id = current_trace_id.get() 

319 if trace_id: 

320 db = SessionLocal() 

321 try: 

322 service = ObservabilityService() 

323 span_id = service.start_span( 

324 db=db, 

325 trace_id=trace_id, 

326 name=f"plugin.execute.{hook_ref.plugin_ref.name}", 

327 kind="internal", 

328 resource_type="plugin", 

329 resource_name=hook_ref.plugin_ref.name, 

330 attributes={ 

331 "plugin.name": hook_ref.plugin_ref.name, 

332 "plugin.uuid": hook_ref.plugin_ref.uuid, 

333 "plugin.mode": hook_ref.plugin_ref.mode.value if hasattr(hook_ref.plugin_ref.mode, "value") else str(hook_ref.plugin_ref.mode), 

334 "plugin.priority": hook_ref.plugin_ref.priority, 

335 "plugin.timeout": self.timeout, 

336 }, 

337 ) 

338 

339 # Execute plugin 

340 result = await asyncio.wait_for(hook_ref.hook(payload, context), timeout=self.timeout) 

341 

342 # End span with success 

343 service.end_span( 

344 db=db, 

345 span_id=span_id, 

346 status="ok", 

347 attributes={ 

348 "plugin.had_violation": result.violation is not None, 

349 "plugin.modified_payload": result.modified_payload is not None, 

350 }, 

351 ) 

352 return result 

353 finally: 

354 db.close() # Observability service handles its own commits 

355 else: 

356 # No active trace, execute without instrumentation 

357 return await asyncio.wait_for(hook_ref.hook(payload, context), timeout=self.timeout) 

358 

359 except Exception as e: 

360 # If observability setup fails, continue without instrumentation 

361 logger.debug("Plugin observability setup failed: %s", e) 

362 return await asyncio.wait_for(hook_ref.hook(payload, context), timeout=self.timeout) 

363 

364 def _validate_payload_size(self, payload: Any) -> None: 

365 """Validate that payload doesn't exceed size limits. 

366 

367 Args: 

368 payload: The payload to validate. 

369 

370 Raises: 

371 PayloadSizeError: If payload exceeds MAX_PAYLOAD_SIZE. 

372 """ 

373 # For PromptPrehookPayload, check args size 

374 if hasattr(payload, "args") and payload.args: 

375 total_size = sum(len(str(v)) for v in payload.args.values()) 

376 if total_size > MAX_PAYLOAD_SIZE: 

377 raise PayloadSizeError(f"Payload size {total_size} exceeds limit of {MAX_PAYLOAD_SIZE} bytes") 

378 # For PromptPosthookPayload, check result size 

379 elif hasattr(payload, "result") and payload.result: 

380 # Estimate size of result messages 

381 total_size = len(str(payload.result)) 

382 if total_size > MAX_PAYLOAD_SIZE: 

383 raise PayloadSizeError(f"Result size {total_size} exceeds limit of {MAX_PAYLOAD_SIZE} bytes") 

384 

385 

386class PluginManager: 

387 """Plugin manager for managing the plugin lifecycle. 

388 

389 This class implements a thread-safe Borg singleton pattern to ensure consistent 

390 plugin management across the application. It handles: 

391 - Plugin discovery and loading from configuration 

392 - Plugin lifecycle management (initialization, execution, shutdown) 

393 - Context management with automatic cleanup 

394 - Hook execution orchestration 

395 

396 Thread Safety: 

397 Uses double-checked locking to prevent race conditions when multiple threads 

398 create PluginManager instances simultaneously. The first instance to acquire 

399 the lock loads the configuration; subsequent instances reuse the shared state. 

400 

401 Attributes: 

402 config: The loaded plugin configuration. 

403 plugin_count: Number of currently loaded plugins. 

404 initialized: Whether the manager has been initialized. 

405 

406 Examples: 

407 >>> # Initialize plugin manager 

408 >>> manager = PluginManager("plugins/config.yaml") 

409 >>> # In async context: 

410 >>> # await manager.initialize() 

411 >>> # print(f"Loaded {manager.plugin_count} plugins") 

412 >>> 

413 >>> # Execute prompt hooks 

414 >>> from mcpgateway.plugins.framework.models import GlobalContext 

415 >>> from mcpgateway.plugins.framework.hooks.prompts import PromptPrehookPayload 

416 >>> payload = PromptPrehookPayload(prompt_id="123", name="test", args={}) 

417 >>> context = GlobalContext(request_id="req-123") 

418 >>> # In async context: 

419 >>> # result, contexts = await manager.prompt_pre_fetch(payload, context) 

420 >>> 

421 >>> # Shutdown when done 

422 >>> # await manager.shutdown() 

423 """ 

424 

425 __shared_state: dict[Any, Any] = {} 

426 __lock: threading.Lock = threading.Lock() # Thread safety for synchronous init 

427 _async_lock: asyncio.Lock | None = None # Async lock for initialize/shutdown 

428 _loader: PluginLoader = PluginLoader() 

429 _initialized: bool = False 

430 _registry: PluginInstanceRegistry = PluginInstanceRegistry() 

431 _config: Config | None = None 

432 _config_path: str | None = None 

433 _executor: PluginExecutor = PluginExecutor() 

434 

435 def __init__(self, config: str = "", timeout: int = DEFAULT_PLUGIN_TIMEOUT): 

436 """Initialize plugin manager. 

437 

438 PluginManager implements a thread-safe Borg singleton: 

439 - Shared state is initialized only once across all instances. 

440 - Subsequent instantiations reuse same state and skip config reload. 

441 - Uses double-checked locking to prevent race conditions in multi-threaded environments. 

442 

443 Thread Safety: 

444 The initialization uses a double-checked locking pattern to ensure that 

445 config loading only happens once, even when multiple threads create 

446 PluginManager instances simultaneously. 

447 

448 Args: 

449 config: Path to plugin configuration file (YAML). 

450 timeout: Maximum execution time per plugin in seconds. 

451 

452 Examples: 

453 >>> # Initialize with configuration file 

454 >>> manager = PluginManager("plugins/config.yaml") 

455 

456 >>> # Initialize with custom timeout 

457 >>> manager = PluginManager("plugins/config.yaml", timeout=60) 

458 """ 

459 self.__dict__ = self.__shared_state 

460 

461 # Only initialize once (first instance when shared state is empty) 

462 # Use lock to prevent race condition in multi-threaded environments 

463 if not self.__shared_state: 

464 with self.__lock: 

465 # Double-check after acquiring lock (another thread may have initialized) 

466 if not self.__shared_state: 

467 if config: 

468 self._config = ConfigLoader.load_config(config) 

469 self._config_path = config 

470 

471 # Update executor timeouts 

472 self._executor.config = self._config 

473 self._executor.timeout = timeout 

474 

475 @classmethod 

476 def reset(cls) -> None: 

477 """Reset the Borg pattern shared state. 

478 

479 This method clears all shared state, allowing a fresh PluginManager 

480 instance to be created with new configuration. Primarily used for testing. 

481 

482 Thread-safe: Uses lock to ensure atomic reset operation. 

483 

484 Examples: 

485 >>> # Between tests, reset shared state 

486 >>> PluginManager.reset() 

487 >>> manager = PluginManager("new_config.yaml") 

488 """ 

489 with cls.__lock: 

490 cls.__shared_state.clear() 

491 cls._initialized = False 

492 cls._config = None 

493 cls._config_path = None 

494 cls._async_lock = None 

495 cls._registry = PluginInstanceRegistry() 

496 cls._executor = PluginExecutor() 

497 cls._loader = PluginLoader() 

498 

499 @property 

500 def config(self) -> Config | None: 

501 """Plugin manager configuration. 

502 

503 Returns: 

504 The plugin configuration object or None if not configured. 

505 """ 

506 return self._config 

507 

508 @property 

509 def plugin_count(self) -> int: 

510 """Number of plugins loaded. 

511 

512 Returns: 

513 The number of currently loaded plugins. 

514 """ 

515 return self._registry.plugin_count 

516 

517 @property 

518 def initialized(self) -> bool: 

519 """Plugin manager initialization status. 

520 

521 Returns: 

522 True if the plugin manager has been initialized. 

523 """ 

524 return self._initialized 

525 

526 def get_plugin(self, name: str) -> Optional[Plugin]: 

527 """Get a plugin by name. 

528 

529 Args: 

530 name: the name of the plugin to return. 

531 

532 Returns: 

533 A plugin. 

534 """ 

535 plugin_ref = self._registry.get_plugin(name) 

536 return plugin_ref.plugin if plugin_ref else None 

537 

538 def has_hooks_for(self, hook_type: str) -> bool: 

539 """Check if there are any hooks registered for a specific hook type. 

540 

541 Args: 

542 hook_type: The type of hook to check for. 

543 

544 Returns: 

545 True if there are hooks registered for the specified type, False otherwise. 

546 """ 

547 return self._registry.has_hooks_for(hook_type) 

548 

549 async def initialize(self) -> None: 

550 """Initialize the plugin manager and load all configured plugins. 

551 

552 This method: 

553 1. Loads plugin configurations from the config file 

554 2. Instantiates each enabled plugin 

555 3. Registers plugins with the registry 

556 4. Validates plugin initialization 

557 

558 Thread Safety: 

559 Uses asyncio.Lock to prevent concurrent initialization from multiple 

560 coroutines or async tasks. Combined with threading.Lock in __init__ 

561 for full multi-threaded safety. 

562 

563 Raises: 

564 RuntimeError: If plugin initialization fails with an exception. 

565 ValueError: If a plugin cannot be initialized or registered. 

566 

567 Examples: 

568 >>> manager = PluginManager("plugins/config.yaml") 

569 >>> # In async context: 

570 >>> # await manager.initialize() 

571 >>> # Manager is now ready to execute plugins 

572 """ 

573 # Initialize async lock lazily (can't create asyncio.Lock in class definition) 

574 if self._async_lock is None: 

575 self._async_lock = asyncio.Lock() 

576 

577 async with self._async_lock: 

578 # Double-check after acquiring lock 

579 if self._initialized: 

580 logger.debug("Plugin manager already initialized") 

581 return 

582 

583 # Defensive cleanup: registry should be empty when not initialized 

584 if self._registry.plugin_count: 584 ↛ 585line 584 didn't jump to line 585 because the condition on line 584 was never true

585 logger.debug("Plugin registry not empty before initialize; clearing stale plugins") 

586 await self._registry.shutdown() 

587 

588 plugins = self._config.plugins if self._config and self._config.plugins else [] 

589 loaded_count = 0 

590 

591 for plugin_config in plugins: 

592 try: 

593 # For disabled plugins, create a stub plugin without full instantiation 

594 if plugin_config.mode != PluginMode.DISABLED: 

595 # Fully instantiate enabled plugins 

596 plugin = await self._loader.load_and_instantiate_plugin(plugin_config) 

597 if plugin: 

598 self._registry.register(plugin) 

599 loaded_count += 1 

600 logger.info("Loaded plugin: %s (mode: %s)", plugin_config.name, plugin_config.mode) 

601 else: 

602 raise ValueError(f"Unable to instantiate plugin: {plugin_config.name}") 

603 else: 

604 logger.info("Plugin: %s is disabled. Ignoring.", plugin_config.name) 

605 

606 except Exception as e: 

607 # Clean error message without stack trace spam 

608 logger.error("Failed to load plugin %s: {%s}", plugin_config.name, str(e)) 

609 # Let it crash gracefully with a clean error 

610 raise RuntimeError(f"Plugin initialization failed: {plugin_config.name} - {str(e)}") from e 

611 

612 self._initialized = True 

613 logger.info("Plugin manager initialized with %s plugins", loaded_count) 

614 

615 async def shutdown(self) -> None: 

616 """Shutdown all plugins and cleanup resources. 

617 

618 This method: 

619 1. Shuts down all registered plugins 

620 2. Clears the plugin registry 

621 3. Cleans up stored contexts 

622 4. Resets initialization state 

623 

624 Thread Safety: 

625 Uses asyncio.Lock to prevent concurrent shutdown with initialization 

626 or with another shutdown call. 

627 

628 Note: The config is preserved to allow modifying settings and re-initializing. 

629 To fully reset for a new config, create a new PluginManager instance. 

630 

631 Examples: 

632 >>> manager = PluginManager("plugins/config.yaml") 

633 >>> # In async context: 

634 >>> # await manager.initialize() 

635 >>> # ... use the manager ... 

636 >>> # await manager.shutdown() 

637 """ 

638 # Initialize async lock lazily if needed 

639 if self._async_lock is None: 639 ↛ 640line 639 didn't jump to line 640 because the condition on line 639 was never true

640 self._async_lock = asyncio.Lock() 

641 

642 async with self._async_lock: 

643 if not self._initialized: 

644 logger.debug("Plugin manager not initialized, nothing to shutdown") 

645 return 

646 

647 logger.info("Shutting down plugin manager") 

648 

649 # Shutdown all plugins 

650 await self._registry.shutdown() 

651 

652 # Reset state to allow re-initialization 

653 self._initialized = False 

654 

655 logger.info("Plugin manager shutdown complete") 

656 

657 async def invoke_hook( 

658 self, 

659 hook_type: str, 

660 payload: PluginPayload, 

661 global_context: GlobalContext, 

662 local_contexts: Optional[PluginContextTable] = None, 

663 violations_as_exceptions: bool = False, 

664 ) -> tuple[PluginResult, PluginContextTable | None]: 

665 """Invoke a set of plugins configured for the hook point in priority order. 

666 

667 Args: 

668 hook_type: The type of hook to execute. 

669 payload: The plugin payload for which the plugins will analyze and modify. 

670 global_context: Shared context for all plugins with request metadata. 

671 local_contexts: Optional existing contexts from previous hook executions. 

672 violations_as_exceptions: Raise violations as exceptions rather than as returns. 

673 

674 Returns: 

675 A tuple containing: 

676 - PluginResult with processing status and modified payload 

677 - PluginContextTable with plugin contexts for state management 

678 

679 Examples: 

680 >>> manager = PluginManager("plugins/config.yaml") 

681 >>> # In async context: 

682 >>> # await manager.initialize() 

683 >>> # payload = ResourcePreFetchPayload("file:///data.txt") 

684 >>> # context = GlobalContext(request_id="123", server_id="srv1") 

685 >>> # result, contexts = await manager.resource_pre_fetch(payload, context) 

686 >>> # if result.continue_processing: 

687 >>> # # Use modified payload 

688 >>> # uri = result.modified_payload.uri 

689 """ 

690 # Get plugins configured for this hook 

691 hook_refs = self._registry.get_hook_refs_for_hook(hook_type=hook_type) 

692 

693 # Execute plugins 

694 result = await self._executor.execute(hook_refs, payload, global_context, hook_type, local_contexts, violations_as_exceptions) 

695 

696 return result 

697 

698 async def invoke_hook_for_plugin( 

699 self, 

700 name: str, 

701 hook_type: str, 

702 payload: Union[PluginPayload, dict[str, Any], str], 

703 context: Union[PluginContext, GlobalContext], 

704 violations_as_exceptions: bool = False, 

705 payload_as_json: bool = False, 

706 ) -> PluginResult: 

707 """Invoke a specific hook for a single named plugin. 

708 

709 This method allows direct invocation of a particular plugin's hook by name, 

710 bypassing the normal priority-ordered execution. Useful for testing individual 

711 plugins or when specific plugin behavior needs to be triggered independently. 

712 

713 Args: 

714 name: The name of the plugin to invoke. 

715 hook_type: The type of hook to execute (e.g., "prompt_pre_fetch"). 

716 payload: The plugin payload to be processed by the hook. 

717 context: Plugin execution context (PluginContext) or GlobalContext (will be wrapped). 

718 violations_as_exceptions: Raise violations as exceptions rather than returns. 

719 payload_as_json: payload passed in as json rather than pydantic. 

720 

721 Returns: 

722 PluginResult with processing status, modified payload, and metadata. 

723 

724 Raises: 

725 PluginError: If the plugin or hook type cannot be found in the registry. 

726 ValueError: If payload type does not match payload_as_json setting. 

727 

728 Examples: 

729 >>> manager = PluginManager("plugins/config.yaml") 

730 >>> # In async context: 

731 >>> # await manager.initialize() 

732 >>> # payload = PromptPrehookPayload(name="test", args={}) 

733 >>> # context = PluginContext(global_context=GlobalContext(request_id="123")) 

734 >>> # result = await manager.invoke_hook_for_plugin( 

735 >>> # name="auth_plugin", 

736 >>> # hook_type="prompt_pre_fetch", 

737 >>> # payload=payload, 

738 >>> # context=context 

739 >>> # ) 

740 """ 

741 # Auto-wrap GlobalContext in PluginContext for convenience 

742 if isinstance(context, GlobalContext): 

743 context = PluginContext(global_context=context) 

744 

745 hook_ref = self._registry.get_plugin_hook_by_name(name, hook_type) 

746 if not hook_ref: 

747 raise PluginError( 

748 error=PluginErrorModel( 

749 message=f"Unable to find {hook_type} for plugin {name}. Make sure the plugin is registered.", 

750 plugin_name=name, 

751 ) 

752 ) 

753 if payload_as_json: 

754 plugin = hook_ref.plugin_ref.plugin 

755 # When payload_as_json=True, payload should be str or dict 

756 if isinstance(payload, (str, dict)): 

757 pydantic_payload = plugin.json_to_payload(hook_type, payload) 

758 return await self._executor.execute_plugin(hook_ref, pydantic_payload, context, violations_as_exceptions) 

759 raise ValueError(f"When payload_as_json=True, payload must be str or dict, got {type(payload)}") 

760 # When payload_as_json=False, payload should already be a PluginPayload 

761 if not isinstance(payload, PluginPayload): 

762 raise ValueError(f"When payload_as_json=False, payload must be a PluginPayload, got {type(payload)}") 

763 return await self._executor.execute_plugin(hook_ref, payload, context, violations_as_exceptions)