Coverage for mcpgateway / plugins / framework / manager.py: 98%

292 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/plugins/framework/manager.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Teryl Taylor, Mihai Criveti, Fred Araujo 

6 

7Plugin manager. 

8Module that manages and calls plugins at hookpoints throughout the gateway. 

9 

10This module provides the core plugin management functionality including: 

11- Plugin lifecycle management (initialization, execution, shutdown) 

12- Timeout protection for plugin execution 

13- Context management with automatic cleanup 

14- Priority-based plugin ordering 

15- Conditional plugin execution based on prompts/servers/tenants 

16 

17Examples: 

18 >>> # Initialize plugin manager with configuration 

19 >>> manager = PluginManager("plugins/config.yaml") 

20 >>> # await manager.initialize() # Called in async context 

21 

22 >>> # Create test payload and context 

23 >>> from mcpgateway.plugins.framework.models import GlobalContext 

24 >>> from mcpgateway.plugins.framework.hooks.prompts import PromptPrehookPayload 

25 >>> payload = PromptPrehookPayload(prompt_id="123", name="test", args={"user": "input"}) 

26 >>> context = GlobalContext(request_id="123") 

27 >>> # result, contexts = await manager.prompt_pre_fetch(payload, context) # Called in async context 

28""" 

29 

30# Standard 

31import asyncio 

32import copy 

33import logging 

34import threading 

35from typing import Any, Optional, Union 

36 

37# Third-Party 

38from pydantic import BaseModel, RootModel 

39 

40# First-Party 

41from mcpgateway.plugins.framework.base import HookRef, Plugin 

42from mcpgateway.plugins.framework.errors import convert_exception_to_error, PluginError, PluginViolationError 

43from mcpgateway.plugins.framework.hooks.policies import apply_policy, DefaultHookPolicy, HookPayloadPolicy 

44from mcpgateway.plugins.framework.loader.config import ConfigLoader 

45from mcpgateway.plugins.framework.loader.plugin import PluginLoader 

46from mcpgateway.plugins.framework.memory import copyonwrite 

47from mcpgateway.plugins.framework.models import Config, GlobalContext, PluginContext, PluginContextTable, PluginErrorModel, PluginMode, PluginPayload, PluginResult 

48from mcpgateway.plugins.framework.observability import current_trace_id, ObservabilityProvider 

49from mcpgateway.plugins.framework.registry import PluginInstanceRegistry 

50from mcpgateway.plugins.framework.settings import settings 

51from mcpgateway.plugins.framework.utils import payload_matches 

52 

53# Use standard logging to avoid circular imports (plugins -> services -> plugins) 

54logger = logging.getLogger(__name__) 

55 

56# Configuration constants 

57DEFAULT_PLUGIN_TIMEOUT = 30 # seconds 

58MAX_PAYLOAD_SIZE = 1_000_000 # 1MB 

59CONTEXT_CLEANUP_INTERVAL = 300 # 5 minutes 

60CONTEXT_MAX_AGE = 3600 # 1 hour 

61HTTP_AUTH_CHECK_PERMISSION_HOOK = "http_auth_check_permission" 

62DECISION_PLUGIN_METADATA_KEY = "_decision_plugin" 

63RESERVED_INTERNAL_METADATA_KEYS = frozenset({DECISION_PLUGIN_METADATA_KEY}) 

64 

65 

66class PluginTimeoutError(Exception): 

67 """Raised when a plugin execution exceeds the timeout limit.""" 

68 

69 

70class PayloadSizeError(ValueError): 

71 """Raised when a payload exceeds the maximum allowed size.""" 

72 

73 

74class PluginExecutor: 

75 """Executes a list of plugins with timeout protection and error handling. 

76 

77 This class manages the execution of plugins in priority order, handling: 

78 - Timeout protection for each plugin 

79 - Context management between plugins 

80 - Error isolation to prevent plugin failures from affecting the gateway 

81 - Metadata aggregation from multiple plugins 

82 

83 Examples: 

84 >>> executor = PluginExecutor() 

85 >>> # In async context: 

86 >>> # result, contexts = await executor.execute( 

87 >>> # plugins=[plugin1, plugin2], 

88 >>> # payload=payload, 

89 >>> # global_context=context, 

90 >>> # plugin_run=pre_prompt_fetch, 

91 >>> # compare=pre_prompt_matches 

92 >>> # ) 

93 """ 

94 

95 def __init__( 

96 self, 

97 config: Optional[Config] = None, 

98 timeout: int = DEFAULT_PLUGIN_TIMEOUT, 

99 observability: Optional[ObservabilityProvider] = None, 

100 hook_policies: Optional[dict[str, HookPayloadPolicy]] = None, 

101 ): 

102 """Initialize the plugin executor. 

103 

104 Args: 

105 config: the plugin manager configuration. 

106 timeout: Maximum execution time per plugin in seconds. 

107 observability: Optional observability provider implementing ObservabilityProvider protocol. 

108 hook_policies: Per-hook-type payload modification policies. 

109 """ 

110 self.timeout = timeout 

111 self.config = config 

112 self.observability = observability 

113 self.hook_policies: dict[str, HookPayloadPolicy] = hook_policies or {} 

114 self.default_hook_policy = DefaultHookPolicy(settings.default_hook_policy) 

115 

116 async def execute( 

117 self, 

118 hook_refs: list[HookRef], 

119 payload: PluginPayload, 

120 global_context: GlobalContext, 

121 hook_type: str, 

122 local_contexts: Optional[PluginContextTable] = None, 

123 violations_as_exceptions: bool = False, 

124 ) -> tuple[PluginResult, PluginContextTable | None]: 

125 """Execute plugins in priority order with timeout protection. 

126 

127 Args: 

128 hook_refs: List of hook references to execute, sorted by priority. 

129 payload: The payload to be processed by plugins. 

130 global_context: Shared context for all plugins containing request metadata. 

131 hook_type: The hook type identifier (e.g., "tool_pre_invoke"). 

132 local_contexts: Optional existing contexts from previous hook executions. 

133 violations_as_exceptions: Raise violations as exceptions rather than as returns. 

134 

135 Returns: 

136 A tuple containing: 

137 - PluginResult with processing status, modified payload, and metadata 

138 - PluginContextTable with updated local contexts for each plugin 

139 

140 Raises: 

141 PayloadSizeError: If the payload exceeds MAX_PAYLOAD_SIZE. 

142 PluginError: If there is an error inside a plugin. 

143 PluginViolationError: If a violation occurs and violation_as_exceptions is set. 

144 

145 Examples: 

146 >>> # Execute plugins with timeout protection 

147 >>> from mcpgateway.plugins.framework.hooks.prompts import PromptHookType 

148 >>> executor = PluginExecutor(timeout=30) 

149 >>> # Assuming you have a registry instance: 

150 >>> # plugins = registry.get_plugins_for_hook(PromptHookType.PROMPT_PRE_FETCH) 

151 >>> # In async context: 

152 >>> # result, contexts = await executor.execute( 

153 >>> # plugins=plugins, 

154 >>> # payload=PromptPrehookPayload(prompt_id="123", name="test", args={}), 

155 >>> # global_context=GlobalContext(request_id="123"), 

156 >>> # plugin_run=pre_prompt_fetch, 

157 >>> # compare=pre_prompt_matches 

158 >>> # ) 

159 """ 

160 if not hook_refs: 

161 return (PluginResult(modified_payload=None), None) 

162 

163 # Validate payload size 

164 self._validate_payload_size(payload) 

165 

166 # Look up the policy for this hook type (may be None) 

167 policy = self.hook_policies.get(hook_type) 

168 

169 res_local_contexts = {} 

170 combined_metadata: dict[str, Any] = {} 

171 current_payload: PluginPayload | None = None 

172 decision_plugin_name: Optional[str] = None 

173 

174 for hook_ref in hook_refs: 

175 # Skip disabled plugins 

176 if hook_ref.plugin_ref.mode == PluginMode.DISABLED: 

177 continue 

178 

179 # Check if plugin conditions match current context 

180 if hook_ref.plugin_ref.conditions and not payload_matches(payload, hook_type, hook_ref.plugin_ref.conditions, global_context): 

181 logger.debug("Skipping plugin %s - conditions not met", hook_ref.plugin_ref.name) 

182 continue 

183 

184 tmp_global_context = GlobalContext( 

185 request_id=global_context.request_id, 

186 user=global_context.user, 

187 tenant_id=global_context.tenant_id, 

188 server_id=global_context.server_id, 

189 state={} if not global_context.state else copyonwrite(global_context.state), 

190 metadata={} if not global_context.metadata else copyonwrite(global_context.metadata), 

191 ) 

192 # Get or create local context for this plugin 

193 local_context_key = global_context.request_id + hook_ref.plugin_ref.uuid 

194 if local_contexts and local_context_key in local_contexts: 

195 local_context = local_contexts[local_context_key] 

196 local_context.global_context = tmp_global_context 

197 else: 

198 local_context = PluginContext(global_context=tmp_global_context) 

199 res_local_contexts[local_context_key] = local_context 

200 

201 # When a policy exists or default=deny is active, deep-copy the 

202 # payload before handing it to the plugin. The plugin operates on 

203 # the copy, so in-place nested mutations (e.g. payload.args[k]=v) 

204 # cannot pollute the live chain. model_copy(deep=True) is used 

205 # for Pydantic models; copy.deepcopy handles plain dicts that 

206 # arrive via cross-type hooks (e.g. http_auth_resolve_user). 

207 effective_payload = current_payload if current_payload is not None else payload 

208 # RootModel subclasses (e.g. HttpHeaderPayload) wrap mutable containers 

209 # that bypass the frozen=True constraint via __setitem__, so they must 

210 # always be deep-copied to prevent in-place corruption of the live chain. 

211 needs_isolation = policy or self.default_hook_policy == DefaultHookPolicy.DENY or isinstance(effective_payload, RootModel) 

212 if needs_isolation: 

213 plugin_input = effective_payload.model_copy(deep=True) if isinstance(effective_payload, BaseModel) else copy.deepcopy(effective_payload) 

214 else: 

215 plugin_input = effective_payload 

216 

217 # Execute plugin with timeout protection 

218 result = await self.execute_plugin( 

219 hook_ref, 

220 plugin_input, 

221 local_context, 

222 violations_as_exceptions, 

223 global_context, 

224 combined_metadata, 

225 ) 

226 

227 # Apply policy-based controlled merge (per-plugin) 

228 if result.modified_payload is not None: 

229 if policy: 

230 if isinstance(result.modified_payload, type(effective_payload)) and isinstance(effective_payload, BaseModel): 

231 # Same-type BaseModel payload — apply field-level policy filtering 

232 filtered = apply_policy( 

233 effective_payload, 

234 result.modified_payload, 

235 policy, 

236 ) 

237 if filtered is not None: 

238 current_payload = filtered 

239 decision_plugin_name = hook_ref.plugin_ref.name 

240 else: 

241 # Cross-type payload (e.g. HTTP hooks returning a different 

242 # result type than the input). Field-level filtering is not 

243 # applicable; the policy's presence authorises the hook. 

244 # Guard: only accept PluginPayload subtypes or dict (used 

245 # by http_auth_resolve_user and similar hooks). 

246 if isinstance(result.modified_payload, (PluginPayload, dict)): 

247 logger.debug( 

248 "Plugin %s returned cross-type payload (%s -> %s) on hook %s; accepting without field filtering", 

249 hook_ref.plugin_ref.name, 

250 type(effective_payload).__name__, 

251 type(result.modified_payload).__name__, 

252 hook_type, 

253 ) 

254 current_payload = result.modified_payload 

255 decision_plugin_name = hook_ref.plugin_ref.name 

256 else: 

257 logger.warning( 

258 "Plugin %s returned unexpected type %s on hook %s; ignoring modification", 

259 hook_ref.plugin_ref.name, 

260 type(result.modified_payload).__name__, 

261 hook_type, 

262 ) 

263 elif self.default_hook_policy == DefaultHookPolicy.ALLOW: 

264 # No explicit policy + default=allow -- accept all modifications 

265 current_payload = result.modified_payload 

266 decision_plugin_name = hook_ref.plugin_ref.name 

267 else: 

268 # No explicit policy + default=deny -- reject all modifications 

269 logger.warning( 

270 "Plugin %s attempted payload modification on hook %s but no policy is defined and default is deny", 

271 hook_ref.plugin_ref.name, 

272 hook_type, 

273 ) 

274 

275 # Both ENFORCE and ENFORCE_IGNORE_ERROR honour continue_processing=False 

276 # and halt the chain. They differ only in error handling: ENFORCE raises 

277 # on plugin errors/timeouts, while ENFORCE_IGNORE_ERROR swallows them and 

278 # lets the chain continue (see execute_plugin exception handlers). 

279 if not result.continue_processing and hook_ref.plugin_ref.mode in (PluginMode.ENFORCE, PluginMode.ENFORCE_IGNORE_ERROR): 

280 if hook_type == HTTP_AUTH_CHECK_PERMISSION_HOOK and decision_plugin_name: 

281 combined_metadata[DECISION_PLUGIN_METADATA_KEY] = decision_plugin_name 

282 return ( 

283 PluginResult( 

284 continue_processing=False, 

285 modified_payload=current_payload, 

286 violation=result.violation, 

287 metadata=combined_metadata, 

288 ), 

289 res_local_contexts, 

290 ) 

291 

292 if hook_type == HTTP_AUTH_CHECK_PERMISSION_HOOK and decision_plugin_name: 

293 combined_metadata[DECISION_PLUGIN_METADATA_KEY] = decision_plugin_name 

294 

295 return (PluginResult(continue_processing=True, modified_payload=current_payload, violation=None, metadata=combined_metadata), res_local_contexts) 

296 

297 async def execute_plugin( 

298 self, 

299 hook_ref: HookRef, 

300 payload: PluginPayload, 

301 local_context: PluginContext, 

302 violations_as_exceptions: bool, 

303 global_context: Optional[GlobalContext] = None, 

304 combined_metadata: Optional[dict[str, Any]] = None, 

305 ) -> PluginResult: 

306 """Execute a single plugin with timeout protection. 

307 

308 Args: 

309 hook_ref: Hooking structure that contains the plugin and hook. 

310 payload: The payload to be processed by plugins. 

311 local_context: local context. 

312 violations_as_exceptions: Raise violations as exceptions rather than as returns. 

313 global_context: Shared context for all plugins containing request metadata. 

314 combined_metadata: combination of the metadata of all plugins. 

315 

316 Returns: 

317 A tuple containing: 

318 - PluginResult with processing status, modified payload, and metadata 

319 - PluginContextTable with updated local contexts for each plugin 

320 

321 Raises: 

322 PayloadSizeError: If the payload exceeds MAX_PAYLOAD_SIZE. 

323 PluginError: If there is an error inside a plugin. 

324 PluginViolationError: If a violation occurs and violation_as_exceptions is set. 

325 """ 

326 try: 

327 # Execute plugin with timeout protection 

328 result = await self._execute_with_timeout(hook_ref, payload, local_context) 

329 # Only merge global state for enforce modes; permissive plugins 

330 # operate on copy-on-write snapshots and should not mutate shared state. 

331 if local_context.global_context and global_context and hook_ref.plugin_ref.mode in (PluginMode.ENFORCE, PluginMode.ENFORCE_IGNORE_ERROR): 

332 global_context.state.update(local_context.global_context.state) 

333 global_context.metadata.update(local_context.global_context.metadata) 

334 # Aggregate metadata from all plugins 

335 if result.metadata and combined_metadata is not None: 

336 combined_metadata.update({k: v for k, v in result.metadata.items() if k not in RESERVED_INTERNAL_METADATA_KEYS}) 

337 

338 # Track payload modifications 

339 # if result.modified_payload is not None: 

340 # current_payload = result.modified_payload 

341 

342 # Set plugin name in violation if present 

343 if result.violation: 

344 result.violation.plugin_name = hook_ref.plugin_ref.plugin.name 

345 

346 # Handle plugin blocking the request 

347 if not result.continue_processing: 

348 if hook_ref.plugin_ref.mode == PluginMode.ENFORCE: 

349 logger.warning("Plugin %s blocked request in enforce mode", hook_ref.plugin_ref.plugin.name) 

350 if violations_as_exceptions: 

351 if result.violation: 

352 plugin_name = result.violation.plugin_name 

353 violation_reason = result.violation.reason 

354 violation_desc = result.violation.description 

355 violation_code = result.violation.code 

356 raise PluginViolationError( 

357 f"{hook_ref.name} blocked by plugin {plugin_name}: {violation_code} - {violation_reason} ({violation_desc})", 

358 violation=result.violation, 

359 ) 

360 raise PluginViolationError(f"{hook_ref.name} blocked by plugin") 

361 return PluginResult( 

362 continue_processing=False, 

363 modified_payload=payload, 

364 violation=result.violation, 

365 metadata=combined_metadata, 

366 ) 

367 if hook_ref.plugin_ref.mode == PluginMode.PERMISSIVE: 

368 logger.warning( 

369 "Plugin %s would block (permissive mode): %s", 

370 hook_ref.plugin_ref.plugin.name, 

371 result.violation.description if result.violation else "No description", 

372 ) 

373 return result 

374 except asyncio.TimeoutError as exc: 

375 logger.error("Plugin %s timed out after %ds", hook_ref.plugin_ref.name, self.timeout) 

376 if (self.config and self.config.plugin_settings.fail_on_plugin_error) or hook_ref.plugin_ref.mode == PluginMode.ENFORCE: 

377 raise PluginError( 

378 error=PluginErrorModel( 

379 message=f"Plugin {hook_ref.plugin_ref.name} exceeded {self.timeout}s timeout", 

380 plugin_name=hook_ref.plugin_ref.name, 

381 ) 

382 ) from exc 

383 # In permissive or enforce_ignore_error mode, continue with next plugin 

384 except PluginViolationError: 

385 raise 

386 except PluginError as pe: 

387 logger.error("Plugin %s failed with error: %s", hook_ref.plugin_ref.name, str(pe)) 

388 if (self.config and self.config.plugin_settings.fail_on_plugin_error) or hook_ref.plugin_ref.mode == PluginMode.ENFORCE: 

389 raise 

390 except Exception as e: 

391 logger.error("Plugin %s failed with error: %s", hook_ref.plugin_ref.name, str(e)) 

392 if (self.config and self.config.plugin_settings.fail_on_plugin_error) or hook_ref.plugin_ref.mode == PluginMode.ENFORCE: 

393 raise PluginError(error=convert_exception_to_error(e, hook_ref.plugin_ref.name)) from e 

394 # In permissive or enforce_ignore_error mode, continue with next plugin 

395 # Return a result indicating processing should continue despite the error 

396 return PluginResult(continue_processing=True) 

397 

398 async def _execute_with_timeout(self, hook_ref: HookRef, payload: PluginPayload, context: PluginContext) -> PluginResult: 

399 """Execute a plugin with timeout protection. 

400 

401 Args: 

402 hook_ref: Reference to the hook and plugin to execute. 

403 payload: Payload to process. 

404 context: Plugin execution context. 

405 

406 Returns: 

407 Result from plugin execution. 

408 

409 Raises: 

410 asyncio.TimeoutError: If plugin exceeds timeout. 

411 asyncio.CancelledError: If plugin execution is cancelled. 

412 Exception: Re-raised from plugin hook execution failures. 

413 """ 

414 # Start observability span if tracing is active 

415 trace_id = current_trace_id.get() 

416 span_id = None 

417 

418 if trace_id and self.observability: 

419 try: 

420 span_id = self.observability.start_span( 

421 trace_id=trace_id, 

422 name=f"plugin.execute.{hook_ref.plugin_ref.name}", 

423 kind="internal", 

424 resource_type="plugin", 

425 resource_name=hook_ref.plugin_ref.name, 

426 attributes={ 

427 "plugin.name": hook_ref.plugin_ref.name, 

428 "plugin.uuid": hook_ref.plugin_ref.uuid, 

429 "plugin.mode": hook_ref.plugin_ref.mode.value if hasattr(hook_ref.plugin_ref.mode, "value") else str(hook_ref.plugin_ref.mode), 

430 "plugin.priority": hook_ref.plugin_ref.priority, 

431 "plugin.timeout": self.timeout, 

432 }, 

433 ) 

434 except Exception as e: 

435 logger.debug("Plugin observability start_span failed: %s", e) 

436 

437 # Execute plugin 

438 try: 

439 result = await asyncio.wait_for(hook_ref.hook(payload, context), timeout=self.timeout) 

440 except Exception: 

441 if span_id is not None: 

442 try: 

443 self.observability.end_span(span_id=span_id, status="error") 

444 except Exception: # nosec B110 

445 pass 

446 raise 

447 

448 # End span with success 

449 if span_id is not None: 

450 try: 

451 self.observability.end_span( 

452 span_id=span_id, 

453 status="ok", 

454 attributes={ 

455 "plugin.had_violation": result.violation is not None, 

456 "plugin.modified_payload": result.modified_payload is not None, 

457 }, 

458 ) 

459 except Exception as e: 

460 logger.debug("Plugin observability end_span failed: %s", e) 

461 

462 return result 

463 

464 def _validate_payload_size(self, payload: Any) -> None: 

465 """Validate that payload doesn't exceed size limits. 

466 

467 Args: 

468 payload: The payload to validate. 

469 

470 Raises: 

471 PayloadSizeError: If payload exceeds MAX_PAYLOAD_SIZE. 

472 """ 

473 # For PromptPrehookPayload, check args size 

474 if hasattr(payload, "args") and payload.args: 

475 total_size = sum(len(str(v)) for v in payload.args.values()) 

476 if total_size > MAX_PAYLOAD_SIZE: 

477 raise PayloadSizeError(f"Payload size {total_size} exceeds limit of {MAX_PAYLOAD_SIZE} bytes") 

478 # For PromptPosthookPayload, check result size 

479 elif hasattr(payload, "result") and payload.result: 

480 # Estimate size of result messages 

481 total_size = len(str(payload.result)) 

482 if total_size > MAX_PAYLOAD_SIZE: 

483 raise PayloadSizeError(f"Result size {total_size} exceeds limit of {MAX_PAYLOAD_SIZE} bytes") 

484 

485 

486class PluginManager: 

487 """Plugin manager for managing the plugin lifecycle. 

488 

489 This class implements a thread-safe Borg singleton pattern to ensure consistent 

490 plugin management across the application. It handles: 

491 - Plugin discovery and loading from configuration 

492 - Plugin lifecycle management (initialization, execution, shutdown) 

493 - Context management with automatic cleanup 

494 - Hook execution orchestration 

495 

496 Thread Safety: 

497 Uses double-checked locking to prevent race conditions when multiple threads 

498 create PluginManager instances simultaneously. The first instance to acquire 

499 the lock loads the configuration; subsequent instances reuse the shared state. 

500 

501 Attributes: 

502 config: The loaded plugin configuration. 

503 plugin_count: Number of currently loaded plugins. 

504 initialized: Whether the manager has been initialized. 

505 

506 Examples: 

507 >>> # Initialize plugin manager 

508 >>> manager = PluginManager("plugins/config.yaml") 

509 >>> # In async context: 

510 >>> # await manager.initialize() 

511 >>> # print(f"Loaded {manager.plugin_count} plugins") 

512 >>> 

513 >>> # Execute prompt hooks 

514 >>> from mcpgateway.plugins.framework.models import GlobalContext 

515 >>> from mcpgateway.plugins.framework.hooks.prompts import PromptPrehookPayload 

516 >>> payload = PromptPrehookPayload(prompt_id="123", name="test", args={}) 

517 >>> context = GlobalContext(request_id="req-123") 

518 >>> # In async context: 

519 >>> # result, contexts = await manager.prompt_pre_fetch(payload, context) 

520 >>> 

521 >>> # Shutdown when done 

522 >>> # await manager.shutdown() 

523 """ 

524 

525 __shared_state: dict[Any, Any] = {} 

526 __lock: threading.Lock = threading.Lock() # Thread safety for synchronous init 

527 _async_lock: asyncio.Lock | None = None # Async lock for initialize/shutdown 

528 _loader: PluginLoader = PluginLoader() 

529 _initialized: bool = False 

530 _registry: PluginInstanceRegistry = PluginInstanceRegistry() 

531 _config: Config | None = None 

532 _config_path: str | None = None 

533 _executor: PluginExecutor | None = None 

534 

535 def __init__( 

536 self, 

537 config: str = "", 

538 timeout: int = DEFAULT_PLUGIN_TIMEOUT, 

539 observability: Optional[ObservabilityProvider] = None, 

540 hook_policies: Optional[dict[str, HookPayloadPolicy]] = None, 

541 ): 

542 """Initialize plugin manager. 

543 

544 PluginManager implements a thread-safe Borg singleton: 

545 - Shared state is initialized only once across all instances. 

546 - Subsequent instantiations reuse same state and skip config reload. 

547 - Uses double-checked locking to prevent race conditions in multi-threaded environments. 

548 

549 Thread Safety: 

550 The initialization uses a double-checked locking pattern to ensure that 

551 config loading only happens once, even when multiple threads create 

552 PluginManager instances simultaneously. 

553 

554 Args: 

555 config: Path to plugin configuration file (YAML). 

556 timeout: Maximum execution time per plugin in seconds. 

557 observability: Optional observability provider implementing ObservabilityProvider protocol. 

558 hook_policies: Per-hook-type payload modification policies (injected by gateway). 

559 

560 Examples: 

561 >>> # Initialize with configuration file 

562 >>> manager = PluginManager("plugins/config.yaml") 

563 

564 >>> # Initialize with custom timeout 

565 >>> manager = PluginManager("plugins/config.yaml", timeout=60) 

566 """ 

567 self.__dict__ = self.__shared_state 

568 

569 # Only initialize once (first instance when shared state is empty) 

570 # Use lock to prevent race condition in multi-threaded environments 

571 if not self.__shared_state: 

572 with self.__lock: 

573 # Double-check after acquiring lock (another thread may have initialized) 

574 if not self.__shared_state: 

575 if config: 

576 self._config = ConfigLoader.load_config(config) 

577 self._config_path = config 

578 

579 # Update executor with timeout, observability, and policies 

580 self._executor = PluginExecutor( 

581 config=self._config, 

582 timeout=timeout, 

583 observability=observability, 

584 hook_policies=hook_policies, 

585 ) 

586 elif hook_policies: 

587 # Allow hook policies to be injected after initial Borg creation. 

588 # This handles the case where the first PluginManager instantiation 

589 # (e.g. from a service) didn't have policies, but a later one does. 

590 with self.__lock: 

591 executor = self._get_executor() 

592 # Only update timeout if caller provided a non-default value 

593 if timeout != DEFAULT_PLUGIN_TIMEOUT: 

594 executor.timeout = timeout 

595 if not executor.hook_policies: 

596 executor.hook_policies = hook_policies 

597 elif executor.hook_policies != hook_policies: 

598 logger.warning("PluginManager: hook_policies already set; ignoring new policies (call reset() first to replace them)") 

599 if observability and not executor.observability: 

600 executor.observability = observability 

601 elif self._executor is None: 

602 # Defensive initialization for unusual state transitions in tests. 

603 with self.__lock: 

604 if self._executor is None: 

605 self._executor = PluginExecutor(config=self._config, timeout=timeout, observability=observability) 

606 

607 def _get_executor(self) -> PluginExecutor: 

608 """Get plugin executor, creating it lazily if necessary. 

609 

610 Returns: 

611 PluginExecutor: The plugin executor instance. 

612 """ 

613 if self._executor is None: 

614 self._executor = PluginExecutor(config=self._config) 

615 return self._executor 

616 

617 @property 

618 def executor(self) -> PluginExecutor: 

619 """Expose executor for tests and internal callers. 

620 

621 Returns: 

622 PluginExecutor: The plugin executor instance. 

623 """ 

624 return self._get_executor() 

625 

626 @executor.setter 

627 def executor(self, value: PluginExecutor) -> None: 

628 """Set the plugin executor instance. 

629 

630 Args: 

631 value: The plugin executor to assign. 

632 """ 

633 self._executor = value 

634 

635 @classmethod 

636 def reset(cls) -> None: 

637 """Reset the Borg pattern shared state. 

638 

639 This method clears all shared state, allowing a fresh PluginManager 

640 instance to be created with new configuration. Primarily used for testing. 

641 

642 Thread-safe: Uses lock to ensure atomic reset operation. 

643 

644 Examples: 

645 >>> # Between tests, reset shared state 

646 >>> PluginManager.reset() 

647 >>> manager = PluginManager("new_config.yaml") 

648 """ 

649 with cls.__lock: 

650 cls.__shared_state.clear() 

651 cls._initialized = False 

652 cls._config = None 

653 cls._config_path = None 

654 cls._async_lock = None 

655 cls._registry = PluginInstanceRegistry() 

656 cls._executor = None 

657 cls._loader = PluginLoader() 

658 

659 @property 

660 def config(self) -> Config | None: 

661 """Plugin manager configuration. 

662 

663 Returns: 

664 The plugin configuration object or None if not configured. 

665 """ 

666 return self._config 

667 

668 @property 

669 def plugin_count(self) -> int: 

670 """Number of plugins loaded. 

671 

672 Returns: 

673 The number of currently loaded plugins. 

674 """ 

675 return self._registry.plugin_count 

676 

677 @property 

678 def initialized(self) -> bool: 

679 """Plugin manager initialization status. 

680 

681 Returns: 

682 True if the plugin manager has been initialized. 

683 """ 

684 return self._initialized 

685 

686 @property 

687 def observability(self) -> Optional[ObservabilityProvider]: 

688 """Current observability provider. 

689 

690 Returns: 

691 The observability provider or None if not configured. 

692 """ 

693 return self._executor.observability 

694 

695 @observability.setter 

696 def observability(self, provider: Optional[ObservabilityProvider]) -> None: 

697 """Set the observability provider. 

698 

699 Thread-safe: uses lock to prevent races with concurrent readers. 

700 

701 Args: 

702 provider: ObservabilityProvider to inject into the executor. 

703 """ 

704 with self.__lock: 

705 self._executor.observability = provider 

706 

707 def get_plugin(self, name: str) -> Optional[Plugin]: 

708 """Get a plugin by name. 

709 

710 Args: 

711 name: the name of the plugin to return. 

712 

713 Returns: 

714 A plugin. 

715 """ 

716 plugin_ref = self._registry.get_plugin(name) 

717 return plugin_ref.plugin if plugin_ref else None 

718 

719 def has_hooks_for(self, hook_type: str) -> bool: 

720 """Check if there are any hooks registered for a specific hook type. 

721 

722 Args: 

723 hook_type: The type of hook to check for. 

724 

725 Returns: 

726 True if there are hooks registered for the specified type, False otherwise. 

727 """ 

728 return self._registry.has_hooks_for(hook_type) 

729 

730 async def initialize(self) -> None: 

731 """Initialize the plugin manager and load all configured plugins. 

732 

733 This method: 

734 1. Loads plugin configurations from the config file 

735 2. Instantiates each enabled plugin 

736 3. Registers plugins with the registry 

737 4. Validates plugin initialization 

738 

739 Thread Safety: 

740 Uses asyncio.Lock to prevent concurrent initialization from multiple 

741 coroutines or async tasks. Combined with threading.Lock in __init__ 

742 for full multi-threaded safety. 

743 

744 Raises: 

745 RuntimeError: If plugin initialization fails with an exception. 

746 ValueError: If a plugin cannot be initialized or registered. 

747 

748 Examples: 

749 >>> manager = PluginManager("plugins/config.yaml") 

750 >>> # In async context: 

751 >>> # await manager.initialize() 

752 >>> # Manager is now ready to execute plugins 

753 """ 

754 # Initialize async lock lazily (can't create asyncio.Lock in class definition) 

755 with self.__lock: 

756 if self._async_lock is None: 

757 self._async_lock = asyncio.Lock() 

758 

759 async with self._async_lock: 

760 # Double-check after acquiring lock 

761 if self._initialized: 

762 logger.debug("Plugin manager already initialized") 

763 return 

764 

765 # Defensive cleanup: registry should be empty when not initialized 

766 if self._registry.plugin_count: 

767 logger.debug("Plugin registry not empty before initialize; clearing stale plugins") 

768 await self._registry.shutdown() 

769 

770 plugins = self._config.plugins if self._config and self._config.plugins else [] 

771 loaded_count = 0 

772 

773 for plugin_config in plugins: 

774 try: 

775 # For disabled plugins, create a stub plugin without full instantiation 

776 if plugin_config.mode != PluginMode.DISABLED: 

777 # Fully instantiate enabled plugins 

778 plugin = await self._loader.load_and_instantiate_plugin(plugin_config) 

779 if plugin: 

780 self._registry.register(plugin) 

781 loaded_count += 1 

782 logger.info("Loaded plugin: %s (mode: %s)", plugin_config.name, plugin_config.mode) 

783 else: 

784 raise ValueError(f"Unable to instantiate plugin: {plugin_config.name}") 

785 else: 

786 logger.info("Plugin: %s is disabled. Ignoring.", plugin_config.name) 

787 

788 except Exception as e: 

789 # Clean error message without stack trace spam 

790 logger.error("Failed to load plugin %s: {%s}", plugin_config.name, str(e)) 

791 # Let it crash gracefully with a clean error 

792 raise RuntimeError(f"Plugin initialization failed: {plugin_config.name} - {str(e)}") from e 

793 

794 self._initialized = True 

795 logger.info("Plugin manager initialized with %s plugins", loaded_count) 

796 

797 async def shutdown(self) -> None: 

798 """Shutdown all plugins and cleanup resources. 

799 

800 This method: 

801 1. Shuts down all registered plugins 

802 2. Clears the plugin registry 

803 3. Cleans up stored contexts 

804 4. Resets initialization state 

805 

806 Thread Safety: 

807 Uses asyncio.Lock to prevent concurrent shutdown with initialization 

808 or with another shutdown call. 

809 

810 Note: The config is preserved to allow modifying settings and re-initializing. 

811 To fully reset for a new config, create a new PluginManager instance. 

812 

813 Examples: 

814 >>> manager = PluginManager("plugins/config.yaml") 

815 >>> # In async context: 

816 >>> # await manager.initialize() 

817 >>> # ... use the manager ... 

818 >>> # await manager.shutdown() 

819 """ 

820 # Initialize async lock lazily if needed 

821 with self.__lock: 

822 if self._async_lock is None: 

823 self._async_lock = asyncio.Lock() 

824 

825 async with self._async_lock: 

826 if not self._initialized: 

827 logger.debug("Plugin manager not initialized, nothing to shutdown") 

828 return 

829 

830 logger.info("Shutting down plugin manager") 

831 

832 # Shutdown all plugins 

833 await self._registry.shutdown() 

834 

835 # Reset state to allow re-initialization 

836 self._initialized = False 

837 

838 logger.info("Plugin manager shutdown complete") 

839 

840 async def invoke_hook( 

841 self, 

842 hook_type: str, 

843 payload: PluginPayload, 

844 global_context: GlobalContext, 

845 local_contexts: Optional[PluginContextTable] = None, 

846 violations_as_exceptions: bool = False, 

847 ) -> tuple[PluginResult, PluginContextTable | None]: 

848 """Invoke a set of plugins configured for the hook point in priority order. 

849 

850 Args: 

851 hook_type: The type of hook to execute. 

852 payload: The plugin payload for which the plugins will analyze and modify. 

853 global_context: Shared context for all plugins with request metadata. 

854 local_contexts: Optional existing contexts from previous hook executions. 

855 violations_as_exceptions: Raise violations as exceptions rather than as returns. 

856 

857 Returns: 

858 A tuple containing: 

859 - PluginResult with processing status and modified payload 

860 - PluginContextTable with plugin contexts for state management 

861 

862 Examples: 

863 >>> manager = PluginManager("plugins/config.yaml") 

864 >>> # In async context: 

865 >>> # await manager.initialize() 

866 >>> # payload = ResourcePreFetchPayload("file:///data.txt") 

867 >>> # context = GlobalContext(request_id="123", server_id="srv1") 

868 >>> # result, contexts = await manager.resource_pre_fetch(payload, context) 

869 >>> # if result.continue_processing: 

870 >>> # # Use modified payload 

871 >>> # uri = result.modified_payload.uri 

872 """ 

873 # Get plugins configured for this hook 

874 hook_refs = self._registry.get_hook_refs_for_hook(hook_type=hook_type) 

875 

876 # Execute plugins 

877 result = await self._get_executor().execute(hook_refs, payload, global_context, hook_type, local_contexts, violations_as_exceptions) 

878 

879 return result 

880 

881 async def invoke_hook_for_plugin( 

882 self, 

883 name: str, 

884 hook_type: str, 

885 payload: Union[PluginPayload, dict[str, Any], str], 

886 context: Union[PluginContext, GlobalContext], 

887 violations_as_exceptions: bool = False, 

888 payload_as_json: bool = False, 

889 ) -> PluginResult: 

890 """Invoke a specific hook for a single named plugin. 

891 

892 This method allows direct invocation of a particular plugin's hook by name, 

893 bypassing the normal priority-ordered execution. Useful for testing individual 

894 plugins or when specific plugin behavior needs to be triggered independently. 

895 

896 Args: 

897 name: The name of the plugin to invoke. 

898 hook_type: The type of hook to execute (e.g., "prompt_pre_fetch"). 

899 payload: The plugin payload to be processed by the hook. 

900 context: Plugin execution context (PluginContext) or GlobalContext (will be wrapped). 

901 violations_as_exceptions: Raise violations as exceptions rather than returns. 

902 payload_as_json: payload passed in as json rather than pydantic. 

903 

904 Returns: 

905 PluginResult with processing status, modified payload, and metadata. 

906 

907 Raises: 

908 PluginError: If the plugin or hook type cannot be found in the registry. 

909 ValueError: If payload type does not match payload_as_json setting. 

910 

911 Examples: 

912 >>> manager = PluginManager("plugins/config.yaml") 

913 >>> # In async context: 

914 >>> # await manager.initialize() 

915 >>> # payload = PromptPrehookPayload(name="test", args={}) 

916 >>> # context = PluginContext(global_context=GlobalContext(request_id="123")) 

917 >>> # result = await manager.invoke_hook_for_plugin( 

918 >>> # name="auth_plugin", 

919 >>> # hook_type="prompt_pre_fetch", 

920 >>> # payload=payload, 

921 >>> # context=context 

922 >>> # ) 

923 """ 

924 # Auto-wrap GlobalContext in PluginContext for convenience 

925 if isinstance(context, GlobalContext): 

926 context = PluginContext(global_context=context) 

927 

928 hook_ref = self._registry.get_plugin_hook_by_name(name, hook_type) 

929 if not hook_ref: 

930 raise PluginError( 

931 error=PluginErrorModel( 

932 message=f"Unable to find {hook_type} for plugin {name}. Make sure the plugin is registered.", 

933 plugin_name=name, 

934 ) 

935 ) 

936 if payload_as_json: 

937 plugin = hook_ref.plugin_ref.plugin 

938 # When payload_as_json=True, payload should be str or dict 

939 if isinstance(payload, (str, dict)): 

940 pydantic_payload = plugin.json_to_payload(hook_type, payload) 

941 return await self._get_executor().execute_plugin(hook_ref, pydantic_payload, context, violations_as_exceptions) 

942 raise ValueError(f"When payload_as_json=True, payload must be str or dict, got {type(payload)}") 

943 # When payload_as_json=False, payload should already be a PluginPayload 

944 if not isinstance(payload, PluginPayload): 

945 raise ValueError(f"When payload_as_json=False, payload must be a PluginPayload, got {type(payload)}") 

946 return await self._get_executor().execute_plugin(hook_ref, payload, context, violations_as_exceptions)