Coverage for mcpgateway / plugins / framework / models.py: 99%

551 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/plugins/framework/models.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Teryl Taylor, Mihai Criveti, Fred Araujo 

6 

7Pydantic models for plugins. 

8This module implements the pydantic models associated with 

9the base plugin layer including configurations, and contexts. 

10""" 

11 

12# Standard 

13from enum import Enum 

14import logging 

15import os 

16from pathlib import Path 

17from typing import Any, Generic, Optional, Self, TypeVar, Union 

18 

19# Third-Party 

20from pydantic import BaseModel, ConfigDict, Field, field_serializer, field_validator, model_validator, PrivateAttr, ValidationInfo 

21 

22# First-Party 

23from mcpgateway.plugins.framework.constants import CMD, CWD, ENV, EXTERNAL_PLUGIN_TYPE, IGNORE_CONFIG_EXTERNAL, PYTHON_SUFFIX, SCRIPT, UDS, URL 

24from mcpgateway.plugins.framework.settings import get_client_mtls_settings, get_grpc_client_mtls_settings, get_grpc_server_settings, get_mcp_server_settings, get_transport_settings 

25from mcpgateway.plugins.framework.validators import validate_plugin_url 

26 

27T = TypeVar("T") 

28 

29 

class TransportType(str, Enum):
    """Transport protocols a plugin connection may use.

    The enum mixes in ``str`` and every member's value is identical to its
    name, so members compare equal to the corresponding plain string.

    Attributes:
        SSE: Server-Sent Events transport.
        HTTP: Standard HTTP-based transport.
        STDIO: Standard input/output transport.
        STREAMABLEHTTP: HTTP transport with streaming.
        GRPC: gRPC transport for external plugins.

    Examples:
        >>> TransportType.SSE
        <TransportType.SSE: 'SSE'>
        >>> TransportType.STDIO.value
        'STDIO'
        >>> TransportType('STREAMABLEHTTP')
        <TransportType.STREAMABLEHTTP: 'STREAMABLEHTTP'>
    """

    # Declaration order is the iteration order; keep it stable.
    SSE = "SSE"
    HTTP = "HTTP"
    STDIO = "STDIO"
    STREAMABLEHTTP = "STREAMABLEHTTP"
    GRPC = "GRPC"

54 

55 

class PluginMode(str, Enum):
    """Operating modes that control how a plugin's outcome is applied.

    Attributes:
        enforce: enforces the plugin result, and blocks execution when there is an error.
        enforce_ignore_error: enforces the plugin result, but allows execution when there is an error.
        permissive: audits the result.
        disabled: plugin disabled.

    Examples:
        >>> PluginMode.ENFORCE
        <PluginMode.ENFORCE: 'enforce'>
        >>> PluginMode.ENFORCE_IGNORE_ERROR
        <PluginMode.ENFORCE_IGNORE_ERROR: 'enforce_ignore_error'>
        >>> PluginMode.PERMISSIVE.value
        'permissive'
        >>> PluginMode('disabled')
        <PluginMode.DISABLED: 'disabled'>
        >>> 'enforce' in [m.value for m in PluginMode]
        True
    """

    # Values are the lowercase strings used to look members up by value.
    ENFORCE = "enforce"
    ENFORCE_IGNORE_ERROR = "enforce_ignore_error"
    PERMISSIVE = "permissive"
    DISABLED = "disabled"

82 

83 

class BaseTemplate(BaseModel):
    """Common base for ToolTemplate, PromptTemplate and ResourceTemplate.

    Attributes:
        context (Optional[list[str]]): keys of the context to be extracted. A key may refer to
            the global context (shared between plugins) or the local context (shared within
            the plugin). Example: global.key1.
        extensions (Optional[dict[str, Any]]): custom keys for a specific plugin.
            Example: a 'policy' key for the opa plugin.

    Examples:
        >>> base = BaseTemplate(context=["global.key1.key2", "local.key1.key2"])
        >>> base.context
        ['global.key1.key2', 'local.key1.key2']
        >>> base = BaseTemplate(context=["global.key1.key2"], extensions={"policy" : "sample policy"})
        >>> base.extensions
        {'policy': 'sample policy'}
    """

    # Both fields are optional and default to None (nothing selected / no extras).
    context: Optional[list[str]] = None
    extensions: Optional[dict[str, Any]] = None

104 

105 

class ToolTemplate(BaseTemplate):
    """Template selecting a tool and the parts of it a plugin applies to.

    Attributes:
        tool_name (str): the name of the tool (required).
        fields (Optional[list[str]]): the tool fields that are affected.
        result (bool): analyze tool output if true; defaults to False.

    Examples:
        >>> tool = ToolTemplate(tool_name="my_tool")
        >>> tool.tool_name
        'my_tool'
        >>> tool.result
        False
        >>> tool2 = ToolTemplate(tool_name="analyzer", fields=["input", "params"], result=True)
        >>> tool2.fields
        ['input', 'params']
        >>> tool2.result
        True
    """

    # Only tool_name is mandatory; the rest fall back to their defaults.
    tool_name: str
    fields: Optional[list[str]] = None
    result: bool = False

130 

131 

class PromptTemplate(BaseTemplate):
    """Template selecting a prompt and the parts of it a plugin applies to.

    Attributes:
        prompt_name (str): the name of the prompt (required).
        fields (Optional[list[str]]): the prompt fields that are affected.
        result (bool): analyze prompt output if true; defaults to False.

    Examples:
        >>> prompt = PromptTemplate(prompt_name="greeting")
        >>> prompt.prompt_name
        'greeting'
        >>> prompt.result
        False
        >>> prompt2 = PromptTemplate(prompt_name="question", fields=["context"], result=True)
        >>> prompt2.fields
        ['context']
    """

    # Only prompt_name is mandatory; the rest fall back to their defaults.
    prompt_name: str
    fields: Optional[list[str]] = None
    result: bool = False

154 

155 

class ResourceTemplate(BaseTemplate):
    """Template selecting a resource and the parts of it a plugin applies to.

    Attributes:
        resource_uri (str): the URI of the resource (required).
        fields (Optional[list[str]]): the resource fields that are affected.
        result (bool): analyze resource output if true; defaults to False.

    Examples:
        >>> resource = ResourceTemplate(resource_uri="file:///data.txt")
        >>> resource.resource_uri
        'file:///data.txt'
        >>> resource.result
        False
        >>> resource2 = ResourceTemplate(resource_uri="http://api/data", fields=["content"], result=True)
        >>> resource2.fields
        ['content']
    """

    # Only resource_uri is mandatory; the rest fall back to their defaults.
    resource_uri: str
    fields: Optional[list[str]] = None
    result: bool = False

178 

179 

class PluginCondition(BaseModel):
    """Conditions for when plugin should execute.

    Attributes:
        server_ids (Optional[set[str]]): set of server ids.
        tenant_ids (Optional[set[str]]): set of tenant ids.
        tools (Optional[set[str]]): set of tool names.
        prompts (Optional[set[str]]): set of prompt names.
        resources (Optional[set[str]]): set of resource URIs.
        agents (Optional[set[str]]): set of agent IDs.
        user_patterns (Optional[list[str]]): list of user patterns.
        content_types (Optional[list[str]]): list of content types.

    Examples:
        >>> cond = PluginCondition(server_ids={"server1", "server2"})
        >>> "server1" in cond.server_ids
        True
        >>> cond2 = PluginCondition(tools={"tool1"}, prompts={"prompt1"})
        >>> cond2.tools
        {'tool1'}
        >>> cond3 = PluginCondition(user_patterns=["admin", "root"])
        >>> len(cond3.user_patterns)
        2
    """

    server_ids: Optional[set[str]] = None
    tenant_ids: Optional[set[str]] = None
    tools: Optional[set[str]] = None
    prompts: Optional[set[str]] = None
    resources: Optional[set[str]] = None
    agents: Optional[set[str]] = None
    user_patterns: Optional[list[str]] = None
    content_types: Optional[list[str]] = None

    @field_serializer("server_ids", "tenant_ids", "tools", "prompts", "resources", "agents")
    def serialize_set(self, value: set[str] | None) -> list[str] | None:
        """Serialize the set-typed fields of PluginCondition as lists.

        Args:
            value: the set stored in one of the decorated fields, or None.

        Returns:
            The members of the set as a list (in the set's iteration order),
            or None when the value is None or an empty set.
        """
        # An empty set is falsy and therefore serialized as None, exactly
        # like an unset field.
        return list(value) if value else None

230 

231 

class AppliedTo(BaseModel):
    """Selects the tools, prompts and resources (and their fields) a plugin is applied to.

    Attributes:
        tools (Optional[list[ToolTemplate]]): tools and fields to be applied.
        prompts (Optional[list[PromptTemplate]]): prompts and fields to be applied.
        resources (Optional[list[ResourceTemplate]]): resources and fields to be applied.
    """

    # Every selector is optional and defaults to None.
    tools: Optional[list[ToolTemplate]] = None
    prompts: Optional[list[PromptTemplate]] = None
    resources: Optional[list[ResourceTemplate]] = None

246 

247 

class MCPTransportTLSConfigBase(BaseModel):
    """TLS settings shared by the client-side and server-side configurations.

    Attributes:
        certfile (Optional[str]): Path to the PEM-encoded certificate file.
        keyfile (Optional[str]): Path to the PEM-encoded private key file.
        ca_bundle (Optional[str]): Path to a CA bundle file for verification.
        keyfile_password (Optional[str]): Optional password for encrypted private key.
    """

    certfile: Optional[str] = Field(default=None, description="Path to PEM certificate file")
    keyfile: Optional[str] = Field(default=None, description="Path to PEM private key file")
    ca_bundle: Optional[str] = Field(default=None, description="Path to CA bundle for verification")
    keyfile_password: Optional[str] = Field(default=None, description="Password for encrypted private key")

    @field_validator("ca_bundle", "certfile", "keyfile", mode="after")
    @classmethod
    def validate_path(cls, value: Optional[str]) -> Optional[str]:
        """Expand ``~`` in a TLS file path and require it to be an existing file.

        Args:
            value: File path to validate.

        Returns:
            The user-expanded path as a string, or the value unchanged when it
            is None or empty.

        Raises:
            ValueError: If the expanded path is not an existing regular file.
        """
        if value:
            candidate = Path(value).expanduser()
            if candidate.is_file():
                return str(candidate)
            raise ValueError(f"TLS file path does not exist: {value}")
        # None / empty string: nothing to validate.
        return value

    @model_validator(mode="after")
    def validate_cert_key(self) -> Self:  # pylint: disable=bad-classmethod-argument
        """Reject a private key that is configured without a certificate.

        Returns:
            Self after validation.

        Raises:
            ValueError: If keyfile is specified without certfile.
        """
        key_without_cert = bool(self.keyfile) and not self.certfile
        if key_without_cert:
            raise ValueError("keyfile requires certfile to be specified")
        return self

299 

300 

class MCPClientTLSConfig(MCPTransportTLSConfigBase):
    """TLS settings used by the gateway when it connects to a plugin.

    Attributes:
        verify (bool): Whether to verify the remote server certificate.
        check_hostname (bool): Enable hostname verification when verify is true.
    """

    verify: bool = Field(default=True, description="Verify the upstream server certificate")
    check_hostname: bool = Field(default=True, description="Enable hostname verification")

    @classmethod
    def from_env(cls) -> Optional["MCPClientTLSConfig"]:
        """Build the client TLS configuration from the client mTLS settings.

        The values come from ``get_client_mtls_settings()`` (the
        PLUGINS_CLIENT_* environment variables).

        Returns:
            MCPClientTLSConfig instance or None if no setting is present.
        """
        settings = get_client_mtls_settings()

        # Path-like settings are forwarded only when non-empty.
        path_settings = (
            ("certfile", settings.client_mtls_certfile),
            ("keyfile", settings.client_mtls_keyfile),
            ("ca_bundle", settings.client_mtls_ca_bundle),
        )
        kwargs: dict[str, Any] = {name: raw for name, raw in path_settings if raw}

        secret = settings.client_mtls_keyfile_password
        if secret is not None:
            kwargs["keyfile_password"] = secret.get_secret_value()

        # Flags are forwarded whenever they are set, including an explicit False.
        flag_settings = (
            ("verify", settings.client_mtls_verify),
            ("check_hostname", settings.client_mtls_check_hostname),
        )
        for name, flag in flag_settings:
            if flag is not None:
                kwargs[name] = flag

        return cls(**kwargs) if kwargs else None

339 

340 

class MCPServerTLSConfig(MCPTransportTLSConfigBase):
    """TLS settings used by a plugin server that accepts gateway connections.

    Attributes:
        ssl_cert_reqs (int): Client certificate requirement (0=NONE, 1=OPTIONAL, 2=REQUIRED).
    """

    ssl_cert_reqs: int = Field(default=2, description="Client certificate requirement (0=NONE, 1=OPTIONAL, 2=REQUIRED)")

    @classmethod
    def from_env(cls) -> Optional["MCPServerTLSConfig"]:
        """Build the server TLS configuration from the MCP server settings.

        The values come from ``get_mcp_server_settings()`` (the
        PLUGINS_SERVER_SSL_* environment variables).

        Returns:
            MCPServerTLSConfig instance or None if no setting is present.
        """
        settings = get_mcp_server_settings()

        # Path-like settings are forwarded only when non-empty.
        path_settings = (
            ("keyfile", settings.server_ssl_keyfile),
            ("certfile", settings.server_ssl_certfile),
            ("ca_bundle", settings.server_ssl_ca_certs),
        )
        kwargs: dict[str, Any] = {name: raw for name, raw in path_settings if raw}

        secret = settings.server_ssl_keyfile_password
        if secret is not None:
            kwargs["keyfile_password"] = secret.get_secret_value()

        # Forwarded whenever set, so an explicit 0 (NONE) is honoured.
        cert_reqs = settings.server_ssl_cert_reqs
        if cert_reqs is not None:
            kwargs["ssl_cert_reqs"] = cert_reqs

        return cls(**kwargs) if kwargs else None

375 

376 

class MCPServerConfig(BaseModel):
    """Server-side MCP configuration (plugin running as server).

    Attributes:
        host (str): Server host to bind to.
        port (int): Server port to bind to.
        uds (Optional[str]): Unix domain socket path for streamable HTTP.
        tls (Optional[MCPServerTLSConfig]): Server-side TLS configuration.
    """

    host: str = Field(default="127.0.0.1", description="Server host to bind to")
    port: int = Field(default=8000, description="Server port to bind to")
    uds: Optional[str] = Field(default=None, description="Unix domain socket path for streamable HTTP")
    tls: Optional[MCPServerTLSConfig] = Field(default=None, description="Server-side TLS configuration")

    @field_validator("uds", mode="after")
    @classmethod
    def validate_uds(cls, uds: str | None) -> str | None:
        """Validate the Unix domain socket path for security.

        Args:
            uds: Unix domain socket path.

        Returns:
            The validated canonical uds path or None if none is set.

        Raises:
            ValueError: if uds is empty, not absolute, or parent directory is invalid.
        """
        if uds is None:
            return uds
        if not isinstance(uds, str) or not uds.strip():
            raise ValueError("MCP server uds must be a non-empty string.")

        # The absolute-path check must run BEFORE resolve(): resolve() always
        # yields an absolute path (relative input is anchored at the current
        # working directory), which would make this check unreachable.
        expanded = Path(uds).expanduser()
        if not expanded.is_absolute():
            raise ValueError(f"MCP server uds path must be absolute: {uds}")
        uds_path = expanded.resolve()

        parent_dir = uds_path.parent
        if not parent_dir.is_dir():
            raise ValueError(f"MCP server uds parent directory does not exist: {parent_dir}")

        # Check parent directory permissions for security
        try:
            parent_mode = parent_dir.stat().st_mode
            # Warn if parent directory is world-writable (o+w = 0o002)
            if parent_mode & 0o002:
                logging.getLogger(__name__).warning(
                    "MCP server uds parent directory %s is world-writable. This may allow unauthorized socket hijacking. Consider using a directory with restricted permissions (e.g., 0o700).",
                    parent_dir,
                )
        except OSError:
            pass  # Best effort - continue if we can't check permissions

        return str(uds_path)

    @model_validator(mode="after")
    def validate_uds_tls(self) -> Self:  # pylint: disable=bad-classmethod-argument
        """Ensure TLS is not configured when using a Unix domain socket.

        Returns:
            Self after validation.

        Raises:
            ValueError: if tls is set with uds.
        """
        if self.uds and self.tls:
            raise ValueError("TLS configuration is not supported for Unix domain sockets.")
        return self

    @classmethod
    def from_env(cls) -> Optional["MCPServerConfig"]:
        """Construct server configuration from PLUGINS_SERVER_* environment variables.

        Returns:
            MCPServerConfig instance or None if no environment variables are set.
        """
        s = get_mcp_server_settings()
        data: dict[str, Any] = {}

        if s.server_host:
            data["host"] = s.server_host
        # The port is forwarded whenever it is set, not only when truthy.
        if s.server_port is not None:
            data["port"] = s.server_port
        if s.server_uds:
            data["uds"] = s.server_uds

        # TLS settings are only consulted when SSL/TLS is enabled.
        if s.server_ssl_enabled:
            tls_config = MCPServerTLSConfig.from_env()
            if tls_config:
                data["tls"] = tls_config

        if not data:
            return None

        return cls(**data)

474 

475 

class MCPClientConfig(BaseModel):
    """Client-side MCP configuration (gateway connecting to external plugin).

    Attributes:
        proto (TransportType): The MCP transport type. Can be SSE, STDIO, or STREAMABLEHTTP
        url (Optional[str]): An MCP URL. Only valid when MCP transport type is SSE or STREAMABLEHTTP.
        script (Optional[str]): The path and name to the STDIO script that runs the plugin server. Only valid for STDIO type.
        cmd (Optional[list[str]]): Command + args used to start a STDIO MCP server. Only valid for STDIO type.
        env (Optional[dict[str, str]]): Environment overrides for STDIO server process.
        cwd (Optional[str]): Working directory for STDIO server process.
        uds (Optional[str]): Unix domain socket path for streamable HTTP.
        tls (Optional[MCPClientTLSConfig]): Client-side TLS configuration for mTLS.
        reconnect_attempts (int): Number of reconnection attempts on failure.
        reconnect_delay (float): Base delay between reconnection attempts (seconds).
    """

    proto: TransportType
    url: Optional[str] = None
    script: Optional[str] = None
    cmd: Optional[list[str]] = None
    env: Optional[dict[str, str]] = None
    cwd: Optional[str] = None
    uds: Optional[str] = None
    tls: Optional[MCPClientTLSConfig] = None
    reconnect_attempts: int = Field(default=3, description="Number of reconnection attempts on failure")
    reconnect_delay: float = Field(default=0.1, description="Base delay between reconnection attempts (seconds)")

    @field_validator(URL, mode="after")
    @classmethod
    def validate_url(cls, url: str | None) -> str | None:
        """Validate a MCP url for streamable HTTP connections.

        Args:
            url: the url to be validated.

        Raises:
            ValueError: if the URL fails validation.

        Returns:
            The validated URL or None if none is set.
        """
        # None and the empty string are passed through unvalidated.
        return validate_plugin_url(url) if url else url

    @field_validator(SCRIPT, mode="after")
    @classmethod
    def validate_script(cls, script: str | None) -> str | None:
        """Validate an MCP stdio script.

        Args:
            script: the script to be validated.

        Raises:
            ValueError: if the script doesn't exist or isn't executable when required.

        Returns:
            The validated string or None if none is set.
        """
        if script:
            file_path = Path(script).expanduser()
            # Allow relative paths; they are resolved at runtime (optionally using cwd).
            if file_path.is_absolute():
                if not file_path.is_file():
                    raise ValueError(f"MCP server script {script} does not exist.")
                # Allow Python (.py) and shell scripts (.sh). Other files must be executable.
                if file_path.suffix not in {PYTHON_SUFFIX, ".sh"} and not os.access(file_path, os.X_OK):
                    raise ValueError(f"MCP server script {script} must be executable.")
        return script

    @field_validator(CMD, mode="after")
    @classmethod
    def validate_cmd(cls, cmd: list[str] | None) -> list[str] | None:
        """Validate an MCP stdio command.

        Args:
            cmd: the command to be validated.

        Raises:
            ValueError: if cmd is empty or contains empty values.

        Returns:
            The validated command list or None if none is set.
        """
        if cmd is None:
            return cmd
        if not isinstance(cmd, list) or not cmd:
            raise ValueError("MCP stdio cmd must be a non-empty list.")
        # Whitespace-only entries count as empty.
        if not all(isinstance(part, str) and part.strip() for part in cmd):
            raise ValueError("MCP stdio cmd entries must be non-empty strings.")
        return cmd

    @field_validator(ENV, mode="after")
    @classmethod
    def validate_env(cls, env: dict[str, str] | None) -> dict[str, str] | None:
        """Validate environment overrides for MCP stdio.

        Args:
            env: Environment overrides to set for the stdio plugin process.

        Returns:
            The validated environment dict or None if none is set.

        Raises:
            ValueError: if keys/values are invalid or the dict is empty.
        """
        if env is None:
            return env
        if not isinstance(env, dict) or not env:
            raise ValueError("MCP stdio env must be a non-empty dict.")
        for key, value in env.items():
            if not isinstance(key, str) or not key.strip():
                raise ValueError("MCP stdio env keys must be non-empty strings.")
            # Values may be empty strings, but must be strings.
            if not isinstance(value, str):
                raise ValueError("MCP stdio env values must be strings.")
        return env

    @field_validator(CWD, mode="after")
    @classmethod
    def validate_cwd(cls, cwd: str | None) -> str | None:
        """Validate the working directory for MCP stdio.

        Args:
            cwd: Working directory for the stdio plugin process.

        Returns:
            The validated canonical cwd path or None if none is set.

        Raises:
            ValueError: if cwd does not exist or is not a directory.
        """
        if not cwd:
            return cwd
        cwd_path = Path(cwd).expanduser().resolve()
        if not cwd_path.is_dir():
            raise ValueError(f"MCP stdio cwd {cwd} does not exist or is not a directory.")
        return str(cwd_path)

    @field_validator(UDS, mode="after")
    @classmethod
    def validate_uds(cls, uds: str | None) -> str | None:
        """Validate a Unix domain socket path for streamable HTTP.

        Args:
            uds: Unix domain socket path.

        Returns:
            The validated canonical uds path or None if none is set.

        Raises:
            ValueError: if uds is empty, not absolute, or parent directory is invalid.
        """
        if uds is None:
            return uds
        if not isinstance(uds, str) or not uds.strip():
            raise ValueError("MCP client uds must be a non-empty string.")

        # The absolute-path check must run BEFORE resolve(): resolve() always
        # yields an absolute path (relative input is anchored at the current
        # working directory), which would make this check unreachable.
        expanded = Path(uds).expanduser()
        if not expanded.is_absolute():
            raise ValueError(f"MCP client uds path must be absolute: {uds}")
        uds_path = expanded.resolve()

        parent_dir = uds_path.parent
        if not parent_dir.is_dir():
            raise ValueError(f"MCP client uds parent directory does not exist: {parent_dir}")

        # Check parent directory permissions for security
        try:
            parent_mode = parent_dir.stat().st_mode
            # Warn if parent directory is world-writable (o+w = 0o002)
            if parent_mode & 0o002:
                logging.getLogger(__name__).warning(
                    "MCP client uds parent directory %s is world-writable. This may allow unauthorized socket hijacking. Consider using a directory with restricted permissions (e.g., 0o700).",
                    parent_dir,
                )
        except OSError:
            pass  # Best effort - continue if we can't check permissions

        return str(uds_path)

    @model_validator(mode="after")
    def validate_tls_usage(self) -> Self:  # pylint: disable=bad-classmethod-argument
        """Ensure TLS configuration is only used with HTTP-based transports.

        Returns:
            Self after validation.

        Raises:
            ValueError: If TLS configuration is used with non-HTTP transports.
        """
        if self.tls and self.proto not in (TransportType.SSE, TransportType.STREAMABLEHTTP):
            raise ValueError("TLS configuration is only valid for HTTP/SSE transports")
        if self.uds and self.tls:
            raise ValueError("TLS configuration is not supported for Unix domain sockets.")
        return self

    @model_validator(mode="after")
    def validate_transport_fields(self) -> Self:  # pylint: disable=bad-classmethod-argument
        """Ensure transport-specific fields are only used with matching transports.

        Returns:
            Self after validation.

        Raises:
            ValueError: if fields are incompatible with the selected transport.
        """
        if self.proto == TransportType.STDIO and self.url:
            raise ValueError("URL is only valid for HTTP/SSE transports")
        if self.proto != TransportType.STDIO and (self.script or self.cmd or self.env or self.cwd):
            raise ValueError("script/cmd/env/cwd are only valid for STDIO transport")
        if self.proto != TransportType.STREAMABLEHTTP and self.uds:
            raise ValueError("uds is only valid for STREAMABLEHTTP transport")
        return self

690 

691 

class GRPCClientTLSConfig(MCPTransportTLSConfigBase):
    """TLS settings used by the gateway when it connects to a gRPC plugin.

    Attributes:
        verify (bool): Whether to verify the remote server certificate.
    """

    verify: bool = Field(default=True, description="Verify the upstream server certificate")

    @classmethod
    def from_env(cls) -> Optional["GRPCClientTLSConfig"]:
        """Build the gRPC client TLS configuration from the gRPC client mTLS settings.

        The values come from ``get_grpc_client_mtls_settings()`` (the
        PLUGINS_GRPC_CLIENT_* environment variables).

        Returns:
            GRPCClientTLSConfig instance or None if no setting is present.
        """
        settings = get_grpc_client_mtls_settings()

        # Path-like settings are forwarded only when non-empty.
        path_settings = (
            ("certfile", settings.grpc_client_mtls_certfile),
            ("keyfile", settings.grpc_client_mtls_keyfile),
            ("ca_bundle", settings.grpc_client_mtls_ca_bundle),
        )
        kwargs: dict[str, Any] = {name: raw for name, raw in path_settings if raw}

        secret = settings.grpc_client_mtls_keyfile_password
        if secret is not None:
            kwargs["keyfile_password"] = secret.get_secret_value()

        # Forwarded whenever set, so an explicit False is honoured.
        verify_flag = settings.grpc_client_mtls_verify
        if verify_flag is not None:
            kwargs["verify"] = verify_flag

        return cls(**kwargs) if kwargs else None

726 

727 

class GRPCServerTLSConfig(MCPTransportTLSConfigBase):
    """TLS settings used by a gRPC plugin server that accepts gateway connections.

    Attributes:
        client_auth (str): Client certificate requirement ('none', 'optional', 'require').
    """

    client_auth: str = Field(default="require", description="Client certificate requirement (none, optional, require)")

    @field_validator("client_auth", mode="after")
    @classmethod
    def validate_client_auth(cls, value: str) -> str:
        """Check client_auth case-insensitively and normalise it to lowercase.

        Args:
            value: Client auth requirement string.

        Returns:
            The lowercased client auth string.

        Raises:
            ValueError: If client_auth is not a valid value.
        """
        normalized = value.lower()
        valid_values = {"none", "optional", "require"}
        if normalized in valid_values:
            return normalized
        # The error reports the value as supplied, not the lowercased form.
        raise ValueError(f"client_auth must be one of {valid_values}, got '{value}'")

    @classmethod
    def from_env(cls) -> Optional["GRPCServerTLSConfig"]:
        """Build the gRPC server TLS configuration from the gRPC server settings.

        The values come from ``get_grpc_server_settings()`` (the
        PLUGINS_GRPC_SERVER_SSL_* environment variables).

        Returns:
            GRPCServerTLSConfig instance or None if no setting is present.
        """
        settings = get_grpc_server_settings()
        kwargs: dict[str, Any] = {}

        # Path-like settings are forwarded only when non-empty.
        for name, raw in (
            ("keyfile", settings.grpc_server_ssl_keyfile),
            ("certfile", settings.grpc_server_ssl_certfile),
            ("ca_bundle", settings.grpc_server_ssl_ca_certs),
        ):
            if raw:
                kwargs[name] = raw

        secret = settings.grpc_server_ssl_keyfile_password
        if secret is not None:
            kwargs["keyfile_password"] = secret.get_secret_value()

        auth_mode = settings.grpc_server_ssl_client_auth
        if auth_mode:
            kwargs["client_auth"] = auth_mode

        return cls(**kwargs) if kwargs else None

781 

782 

783class GRPCClientConfig(BaseModel): 

784 """Client-side gRPC configuration (gateway connecting to external plugin). 

785 

786 Attributes: 

787 target (Optional[str]): The gRPC target address in host:port format. 

788 uds (Optional[str]): Unix domain socket path (alternative to target). 

789 tls (Optional[GRPCClientTLSConfig]): Client-side TLS configuration for mTLS. 

790 

791 Examples: 

792 >>> # TCP connection 

793 >>> config = GRPCClientConfig(target="localhost:50051") 

794 >>> config.get_target() 

795 'localhost:50051' 

796 >>> # Unix domain socket connection (path is resolved to canonical form) 

797 >>> config = GRPCClientConfig(uds="/tmp/grpc-plugin.sock") # doctest: +SKIP 

798 >>> config.get_target() # doctest: +SKIP 

799 'unix:///tmp/grpc-plugin.sock' 

800 """ 

801 

802 target: Optional[str] = Field(default=None, description="gRPC target address (host:port)") 

803 uds: Optional[str] = Field(default=None, description="Unix domain socket path") 

804 tls: Optional[GRPCClientTLSConfig] = None 

805 

806 @field_validator("target", mode="after") 

807 @classmethod 

808 def validate_target(cls, target: str | None) -> str | None: 

809 """Validate gRPC target address format. 

810 

811 Args: 

812 target: The target address to validate. 

813 

814 Returns: 

815 The validated target address. 

816 

817 Raises: 

818 ValueError: If target is not in host:port format. 

819 """ 

820 if target is None: 

821 return target 

822 if not target: 

823 raise ValueError("gRPC target address cannot be empty") 

824 # Basic validation - should contain host and port 

825 if ":" not in target: 

826 raise ValueError(f"gRPC target must be in host:port format, got '{target}'") 

827 return target 

828 

829 @field_validator("uds", mode="after") 

830 @classmethod 

831 def validate_uds(cls, uds: str | None) -> str | None: 

832 """Validate Unix domain socket path for gRPC. 

833 

834 Args: 

835 uds: Unix domain socket path. 

836 

837 Returns: 

838 The validated canonical uds path or None if none is set. 

839 

840 Raises: 

841 ValueError: if uds is empty, not absolute, or parent directory is invalid. 

842 """ 

843 if uds is None: 

844 return uds 

845 if not isinstance(uds, str) or not uds.strip(): 

846 raise ValueError("gRPC client uds must be a non-empty string.") 

847 

848 uds_path = Path(uds).expanduser().resolve() 

849 if not uds_path.is_absolute(): 

850 raise ValueError(f"gRPC client uds path must be absolute: {uds}") 

851 

852 parent_dir = uds_path.parent 

853 if not parent_dir.is_dir(): 

854 raise ValueError(f"gRPC client uds parent directory does not exist: {parent_dir}") 

855 

856 # Check parent directory permissions for security 

857 try: 

858 parent_mode = parent_dir.stat().st_mode 

859 if parent_mode & 0o002: 

860 logging.getLogger(__name__).warning( 

861 "gRPC client uds parent directory %s is world-writable. Consider using a directory with restricted permissions.", 

862 parent_dir, 

863 ) 

864 except OSError: 

865 pass 

866 

867 return str(uds_path) 

868 

@model_validator(mode="after")
def validate_target_or_uds(self) -> Self:  # pylint: disable=bad-classmethod-argument
    """Check that the client addresses its server through exactly one of target or uds.

    TLS settings are additionally rejected when a Unix domain socket is used.

    Returns:
        Self after validation.

    Raises:
        ValueError: If neither or both of target and uds are set, or if tls is
            configured together with uds.
    """
    target_set = self.target is not None
    uds_set = self.uds is not None

    # The first two conditions are mutually exclusive, so their order is irrelevant.
    if target_set and uds_set:
        raise ValueError("gRPC client cannot have both 'target' and 'uds' configured")
    if not (target_set or uds_set):
        raise ValueError("gRPC client must have either 'target' or 'uds' configured")
    if uds_set and self.tls:
        raise ValueError("TLS configuration is not supported for Unix domain sockets")
    return self

889 

def get_target(self) -> str:
    """Build the target string used to create the gRPC channel.

    Returns:
        str: ``unix://`` followed by the uds path when a Unix domain socket is
        configured, otherwise the configured target (empty string if unset).
    """
    if not self.uds:
        return self.target or ""
    return f"unix://{self.uds}"

899 

900 

class GRPCServerConfig(BaseModel):
    """Server-side gRPC configuration (plugin running as gRPC server).

    Attributes:
        host (str): Server host to bind to. Default: "127.0.0.1".
        port (int): Server port to bind to. Default: 50051.
        uds (Optional[str]): Unix domain socket path (alternative to host:port).
        tls (Optional[GRPCServerTLSConfig]): Server-side TLS configuration. Not allowed together with uds.

    Examples:
        >>> # TCP binding
        >>> config = GRPCServerConfig(host="0.0.0.0", port=50051)
        >>> config.get_bind_address()
        '0.0.0.0:50051'
        >>> # Unix domain socket binding (path is resolved to canonical form)
        >>> config = GRPCServerConfig(uds="/tmp/grpc-plugin.sock")  # doctest: +SKIP
        >>> config.get_bind_address()  # doctest: +SKIP
        'unix:///tmp/grpc-plugin.sock'
    """

    host: str = Field(default="127.0.0.1", description="Server host to bind to")
    port: int = Field(default=50051, description="Server port to bind to")
    uds: Optional[str] = Field(default=None, description="Unix domain socket path")
    tls: Optional[GRPCServerTLSConfig] = Field(default=None, description="Server-side TLS configuration")

    @field_validator("uds", mode="after")
    @classmethod
    def validate_uds(cls, uds: str | None) -> str | None:
        """Validate and canonicalize the Unix domain socket path for the gRPC server.

        Args:
            uds: Unix domain socket path, or None when no socket is configured.

        Returns:
            The canonical (user-expanded and resolved) uds path, or None if none is set.

        Raises:
            ValueError: if uds is empty, not absolute, or its parent directory does not exist.
        """
        if uds is None:
            return uds
        if not isinstance(uds, str) or not uds.strip():
            raise ValueError("gRPC server uds must be a non-empty string.")

        # The absoluteness check has to happen BEFORE resolve(): resolve() anchors a
        # relative path to the current working directory, so a check performed on the
        # resolved path can never fail and relative paths would be accepted silently.
        expanded_path = Path(uds).expanduser()
        if not expanded_path.is_absolute():
            raise ValueError(f"gRPC server uds path must be absolute: {uds}")
        uds_path = expanded_path.resolve()

        parent_dir = uds_path.parent
        if not parent_dir.is_dir():
            raise ValueError(f"gRPC server uds parent directory does not exist: {parent_dir}")

        # Check parent directory permissions for security. This is advisory only:
        # if the directory cannot be stat'ed the path is still accepted.
        try:
            parent_mode = parent_dir.stat().st_mode
        except OSError:
            return str(uds_path)

        if parent_mode & 0o002:
            logging.getLogger(__name__).warning(
                "gRPC server uds parent directory %s is world-writable. Consider using a directory with restricted permissions.",
                parent_dir,
            )

        return str(uds_path)

    @model_validator(mode="after")
    def validate_uds_tls(self) -> Self:  # pylint: disable=bad-classmethod-argument
        """Ensure TLS is not configured when using a Unix domain socket.

        Returns:
            Self after validation.

        Raises:
            ValueError: if tls is set with uds.
        """
        if self.uds and self.tls:
            raise ValueError("TLS configuration is not supported for Unix domain sockets")
        return self

    def get_bind_address(self) -> str:
        """Get the gRPC bind address string.

        Returns:
            str: ``unix://`` followed by the uds path when a Unix domain socket is
            configured, otherwise ``host:port``.
        """
        if self.uds:
            return f"unix://{self.uds}"
        return f"{self.host}:{self.port}"

    @classmethod
    def from_env(cls) -> Optional["GRPCServerConfig"]:
        """Construct gRPC server configuration from PLUGINS_GRPC_SERVER_* environment variables.

        Values are read through ``get_grpc_server_settings()``; only settings that
        are present are passed on, so model defaults apply to the rest.

        Returns:
            GRPCServerConfig instance or None if no environment variables are set.
        """
        settings = get_grpc_server_settings()
        data: dict[str, Any] = {}

        if settings.grpc_server_host:
            data["host"] = settings.grpc_server_host
        # Compared against None (rather than truthiness) so that a port of 0 is kept.
        if settings.grpc_server_port is not None:
            data["port"] = settings.grpc_server_port
        if settings.grpc_server_uds:
            data["uds"] = settings.grpc_server_uds

        # TLS is only attached when SSL is enabled and a TLS config can be built.
        if settings.grpc_server_ssl_enabled:
            tls_config = GRPCServerTLSConfig.from_env()
            if tls_config:
                data["tls"] = tls_config

        return cls(**data) if data else None

1017 

1018 

class UnixSocketClientConfig(BaseModel):
    """Unix socket settings used by the gateway (client side) to reach an external plugin.

    Attributes:
        path (str): Absolute path to the Unix domain socket file (required).
        reconnect_attempts (int): Number of reconnection attempts on failure. Default: 3.
        reconnect_delay (float): Base delay between reconnection attempts in seconds. Default: 0.1.
        timeout (float): Read timeout in seconds. Default: 30.0.

    Examples:
        >>> config = UnixSocketClientConfig(path="/tmp/plugin.sock")
        >>> config.path
        '/tmp/plugin.sock'
        >>> config.reconnect_attempts
        3
    """

    path: str = Field(..., description="Path to the Unix domain socket")
    reconnect_attempts: int = Field(default=3, description="Number of reconnection attempts")
    reconnect_delay: float = Field(default=0.1, description="Base delay between reconnection attempts (seconds)")
    timeout: float = Field(default=30.0, description="Read timeout in seconds")

    @field_validator("path", mode="after")
    @classmethod
    def validate_path(cls, path: str) -> str:
        """Check that the socket path is non-empty and starts with ``/``.

        Args:
            path: The socket path to validate.

        Returns:
            The path, unchanged, when it is valid.

        Raises:
            ValueError: If path is empty or does not start with ``/``.
        """
        # Happy path first: anything starting with "/" is accepted as-is.
        if path.startswith("/"):
            return path
        if not path:
            raise ValueError("Unix socket path cannot be empty")
        raise ValueError(f"Unix socket path must be absolute, got '{path}'")

1060 

1061 

class UnixSocketServerConfig(BaseModel):
    """Unix socket settings for a plugin that runs as a Unix socket server.

    Attributes:
        path (str): Path to the Unix domain socket file. Default: "/tmp/mcpgateway-plugins.sock".

    Examples:
        >>> config = UnixSocketServerConfig(path="/tmp/plugin.sock")
        >>> config.path
        '/tmp/plugin.sock'
    """

    path: str = Field(default="/tmp/mcpgateway-plugins.sock", description="Path to the Unix domain socket")  # nosec B108 - configurable default

    @classmethod
    def from_env(cls) -> Optional["UnixSocketServerConfig"]:
        """Build the server configuration from the transport settings.

        The socket path is read from ``get_transport_settings().unix_socket_path``.

        Returns:
            UnixSocketServerConfig instance, or None when no socket path is set.
        """
        socket_path = get_transport_settings().unix_socket_path
        if not socket_path:
            return None
        return cls(path=socket_path)

1093 

1094 

class PluginConfig(BaseModel):
    """A plugin configuration.

    Attributes:
        name (str): The unique name of the plugin.
        description (Optional[str]): A description of the plugin.
        author (Optional[str]): The author of the plugin.
        kind (str): The kind or type of plugin. Usually a fully qualified object type.
        namespace (Optional[str]): The namespace where the plugin resides.
        version (Optional[str]): version of the plugin.
        hooks (list[str]): a list of the hook points where the plugin will be called. Default: [].
        tags (list[str]): a list of tags for making the plugin searchable. Default: [].
        mode (PluginMode): the mode the plugin runs in. Default: PluginMode.ENFORCE.
        priority (int): indicates the order in which the plugin is run. Lower = higher priority. Default: 100.
        conditions (list[PluginCondition]): the conditions on which the plugin is run. Default: [].
        applied_to (Optional[AppliedTo]): the tools, fields, that the plugin is applied to.
        config (Optional[dict[str, Any]]): the plugin specific configurations.
        mcp (Optional[MCPClientConfig]): Client-side MCP configuration (gateway connecting to plugin).
        grpc (Optional[GRPCClientConfig]): Client-side gRPC configuration (gateway connecting to plugin).
        unix_socket (Optional[UnixSocketClientConfig]): Client-side Unix socket configuration (gateway connecting to plugin).
    """

    name: str
    description: Optional[str] = None
    author: Optional[str] = None
    kind: str
    namespace: Optional[str] = None
    version: Optional[str] = None
    hooks: list[str] = Field(default_factory=list)
    tags: list[str] = Field(default_factory=list)
    mode: PluginMode = PluginMode.ENFORCE
    priority: int = 100  # Lower = higher priority
    conditions: list[PluginCondition] = Field(default_factory=list)  # When to apply
    applied_to: Optional[AppliedTo] = None  # Fields to apply to.
    config: Optional[dict[str, Any]] = None
    mcp: Optional[MCPClientConfig] = None
    grpc: Optional[GRPCClientConfig] = None
    unix_socket: Optional[UnixSocketClientConfig] = None

    @model_validator(mode="after")
    def check_url_or_script_filled(self) -> Self:  # pylint: disable=bad-classmethod-argument
        """Checks that the MCP section carries the fields its transport type requires.

        Nothing is checked when no ``mcp`` section is configured.

        Raises:
            ValueError: if STDIO is set without exactly one of script/cmd, if SSE or
                STREAMABLEHTTP is set without a url, or if the transport type is not
                one of SSE, STREAMABLEHTTP or STDIO.

        Returns:
            The model after validation.
        """
        if not self.mcp:
            return self
        if self.mcp.proto == TransportType.STDIO and not (self.mcp.script or self.mcp.cmd):
            raise ValueError(f"Plugin {self.name} has transport type set to STDIO but no script/cmd value")
        if self.mcp.proto == TransportType.STDIO and self.mcp.script and self.mcp.cmd:
            raise ValueError(f"Plugin {self.name} must set either script or cmd for STDIO, not both")
        if self.mcp.proto in (TransportType.STREAMABLEHTTP, TransportType.SSE) and not self.mcp.url:
            # Name the transport that is actually configured; the message used to say
            # "StreamableHTTP" even for SSE. The StreamableHTTP wording is unchanged.
            transport_label = "SSE" if self.mcp.proto == TransportType.SSE else "StreamableHTTP"
            raise ValueError(f"Plugin {self.name} has transport type set to {transport_label} but no url value")
        if self.mcp.proto not in (TransportType.SSE, TransportType.STREAMABLEHTTP, TransportType.STDIO):
            raise ValueError(f"Plugin {self.name} must set transport type to either SSE or STREAMABLEHTTP or STDIO")
        return self

    @model_validator(mode="after")
    def check_config_and_external(self, info: ValidationInfo) -> Self:  # pylint: disable=bad-classmethod-argument
        """Checks to see that a plugin's 'config' section is not defined if the kind is 'external'. This is because developers cannot override items in the plugin config section for external plugins.

        Also checks that an external plugin configures exactly one transport.

        Args:
            info: the contextual information passed into the pydantic model during model validation. Used to determine validation sequence.

        Raises:
            ValueError: if an external plugin has a 'config' section (unless the validation
                context sets IGNORE_CONFIG_EXTERNAL to a truthy value), or if an external
                plugin does not configure exactly one of mcp, grpc, or unix_socket.

        Returns:
            The model after validation.
        """
        ignore_config_external = False
        if info and info.context and IGNORE_CONFIG_EXTERNAL in info.context:
            ignore_config_external = info.context[IGNORE_CONFIG_EXTERNAL]

        if not ignore_config_external and self.config and self.kind == EXTERNAL_PLUGIN_TYPE:
            raise ValueError(f"Cannot have {self.name} plugin defined as 'external' with 'config' set. 'config' section settings can only be set on the plugin server.")

        # External plugins must have exactly one transport configured (mcp, grpc, or unix_socket)
        if self.kind == EXTERNAL_PLUGIN_TYPE:
            has_mcp = self.mcp is not None
            has_grpc = self.grpc is not None
            has_unix = self.unix_socket is not None
            transport_count = sum([has_mcp, has_grpc, has_unix])

            if transport_count == 0:
                raise ValueError(f"External plugin {self.name} must have 'mcp', 'grpc', or 'unix_socket' section configured")
            if transport_count > 1:
                raise ValueError(f"External plugin {self.name} can only have one transport configured (mcp, grpc, or unix_socket)")

        return self

1188 

1189 

class PluginManifest(BaseModel):
    """Manifest describing a plugin. Every field is required.

    Attributes:
        description (str): A description of the plugin.
        author (str): The author of the plugin.
        version (str): The plugin version.
        tags (list[str]): Tags that make the plugin searchable.
        available_hooks (list[str]): Hook points at which the plugin is callable.
        default_config (dict[str, Any]): The default configuration values.
    """

    description: str
    author: str
    version: str
    tags: list[str]
    available_hooks: list[str]
    default_config: dict[str, Any]

1208 

1209 

class PluginErrorModel(BaseModel):
    """Error record used to denote exceptions/errors inside external plugins.

    Attributes:
        message (str): The reason for the error.
        plugin_name (str): The plugin name.
        code (Optional[str]): An error code. Default: "".
        details (Optional[dict[str, Any]]): Additional error details. Default: {}.
        mcp_error_code (int): The MCP error code passed back to the client. Default: -32603 (Internal Error).
    """

    message: str
    plugin_name: str
    code: Optional[str] = ""
    details: Optional[dict[str, Any]] = Field(default_factory=dict)
    mcp_error_code: int = -32603

1226 

1227 

class PluginViolation(BaseModel):
    """Record of a policy violation reported by a plugin.

    Attributes:
        reason (str): the reason for the violation.
        description (str): a longer description of the violation.
        code (str): a violation code.
        details (Optional[dict[str, Any]]): additional violation details. Default: {}.
        _plugin_name (str): the plugin name, private attribute set by the plugin manager.
        mcp_error_code (Optional[int]): A valid mcp error code which will be sent back to the client if plugin enabled.
        http_status_code (Optional[int]): HTTP status code to return (e.g., 429 for rate limiting).
        http_headers (Optional[dict[str, str]]): HTTP headers to include in the response.

    Examples:
        >>> violation = PluginViolation(
        ...     reason="Invalid input",
        ...     description="The input contains prohibited content",
        ...     code="PROHIBITED_CONTENT",
        ...     details={"field": "message", "value": "test"}
        ... )
        >>> violation.reason
        'Invalid input'
        >>> violation.code
        'PROHIBITED_CONTENT'
        >>> violation.plugin_name = "content_filter"
        >>> violation.plugin_name
        'content_filter'
    """

    reason: str
    description: str
    code: str
    details: Optional[dict[str, Any]] = Field(default_factory=dict)
    _plugin_name: str = PrivateAttr(default="")
    mcp_error_code: Optional[int] = None
    http_status_code: Optional[int] = None
    http_headers: Optional[dict[str, str]] = None

    @property
    def plugin_name(self) -> str:
        """Name of the plugin associated with this violation.

        Returns:
            The value of the private ``_plugin_name`` attribute ("" until it is set).
        """
        return self._plugin_name

    @plugin_name.setter
    def plugin_name(self, name: str) -> None:
        """Store the plugin name after checking it is a non-blank string.

        Args:
            name: the plugin name.

        Raises:
            ValueError: if name is not a string or contains only whitespace.
        """
        if isinstance(name, str) and name.strip():
            self._plugin_name = name
            return
        raise ValueError("Name must be a non-empty string.")

1288 

1289 

class PluginSettings(BaseModel):
    """Settings that apply to all plugins.

    Attributes:
        parallel_execution_within_band (bool): execute plugins with same priority in parallel. Default: False.
        plugin_timeout (int): timeout value for plugin operations. Default: 30.
        fail_on_plugin_error (bool): error when there is a plugin connectivity problem, or ignore it. Default: False.
        enable_plugin_api (bool): enable or disable plugins globally. Default: False.
        plugin_health_check_interval (int): interval between health checks. Default: 60.
        include_user_info (bool): if enabled, user info is injected in the plugin context. Default: False.
    """

    parallel_execution_within_band: bool = False
    plugin_timeout: int = 30
    fail_on_plugin_error: bool = False
    enable_plugin_api: bool = False
    plugin_health_check_interval: int = 60
    include_user_info: bool = False

1308 

1309 

class Config(BaseModel):
    """Top-level plugin framework configuration.

    Attributes:
        plugins (Optional[list[PluginConfig]]): the list of plugins to enable. Default: [].
        plugin_dirs (list[str]): The directories in which to look for plugins. Default: [].
        plugin_settings (PluginSettings): global settings for plugins (required).
        server_settings (Optional[MCPServerConfig]): Server-side MCP configuration (when plugins run as server).
        grpc_server_settings (Optional[GRPCServerConfig]): Server-side gRPC configuration (when plugins run as gRPC server).
        unix_socket_server_settings (Optional[UnixSocketServerConfig]): Server-side Unix socket configuration.
    """

    plugins: Optional[list[PluginConfig]] = []
    plugin_dirs: list[str] = []
    plugin_settings: PluginSettings
    server_settings: Optional[MCPServerConfig] = None
    grpc_server_settings: Optional[GRPCServerConfig] = None
    unix_socket_server_settings: Optional[UnixSocketServerConfig] = None

1328 

1329 

class PluginResult(BaseModel, Generic[T]):
    """Outcome of running a plugin hook. The payload type ``T`` depends on the hook.

    Attributes:
        continue_processing (bool): Whether processing should continue. Default: True.
        modified_payload (Optional[T]): The modified payload if the plugin is a transformer.
        violation (Optional[PluginViolation]): violation object.
        metadata (Optional[dict[str, Any]]): additional metadata. Default: {}.
        http_headers (Optional[dict[str, str]]): HTTP headers to include in successful responses.
        retry_delay_ms (int): Milliseconds the gateway should wait before retrying the tool call.
            0 (default) means no retry. Set by retry_with_backoff plugin to request
            a delayed re-execution of the tool.

    Examples:
        >>> result = PluginResult()
        >>> result.continue_processing
        True
        >>> result.metadata
        {}
        >>> from mcpgateway.plugins.framework import PluginViolation
        >>> violation = PluginViolation(
        ...     reason="Test", description="Test desc", code="TEST", details={}
        ... )
        >>> result2 = PluginResult(continue_processing=False, violation=violation)
        >>> result2.continue_processing
        False
        >>> result2.violation.code
        'TEST'
        >>> r = PluginResult(metadata={"key": "value"})
        >>> r.metadata["key"]
        'value'
        >>> r2 = PluginResult(continue_processing=False)
        >>> r2.continue_processing
        False
        >>> r3 = PluginResult(retry_delay_ms=500)
        >>> r3.retry_delay_ms
        500
    """

    continue_processing: bool = True
    modified_payload: Optional[T] = None
    violation: Optional[PluginViolation] = None
    metadata: Optional[dict[str, Any]] = Field(default_factory=dict)
    http_headers: Optional[dict[str, str]] = None
    retry_delay_ms: int = 0

1375 

1376 

class GlobalContext(BaseModel):
    """Context shared across all plugins.

    Attributes:
        request_id (str): ID of the HTTP request (required).
        user (Optional[Union[str, dict[str, Any]]]): user associated with the request, as a string or a dict.
        tenant_id (Optional[str]): tenant ID.
        server_id (Optional[str]): server ID.
        state (dict[str, Any]): a global shared state across plugins. Default: {}.
        metadata (dict[str, Any]): a global shared metadata across plugins, documented as
            read-only from a plugin's perspective (not enforced by this model). Default: {}.

    Examples:
        >>> ctx = GlobalContext(request_id="req-123")
        >>> ctx.request_id
        'req-123'
        >>> ctx.user is None
        True
        >>> ctx2 = GlobalContext(request_id="req-456", user="alice", tenant_id="tenant1")
        >>> ctx2.user
        'alice'
        >>> ctx2.tenant_id
        'tenant1'
        >>> c = GlobalContext(request_id="123", server_id="srv1")
        >>> c.request_id
        '123'
        >>> c.server_id
        'srv1'
    """

    request_id: str
    user: Optional[Union[str, dict[str, Any]]] = None
    tenant_id: Optional[str] = None
    server_id: Optional[str] = None
    state: dict[str, Any] = Field(default_factory=dict)
    metadata: dict[str, Any] = Field(default_factory=dict)

1412 

1413 

class PluginContext(BaseModel):
    """Per-plugin context, which lasts a request lifecycle.

    Attributes:
        state: the in-memory state of the request. Default: {}.
        global_context: the context that is shared across plugins (required).
        metadata: plugin metadata. Default: {}.

    Examples:
        >>> gctx = GlobalContext(request_id="req-123")
        >>> ctx = PluginContext(global_context=gctx)
        >>> ctx.global_context.request_id
        'req-123'
        >>> ctx.global_context.user is None
        True
        >>> ctx.state["somekey"] = "some value"
        >>> ctx.state["somekey"]
        'some value'
    """

    state: dict[str, Any] = Field(default_factory=dict)
    global_context: GlobalContext
    metadata: dict[str, Any] = Field(default_factory=dict)

    def get_state(self, key: str, default: Any = None) -> Any:
        """Look up a value in this context's state.

        Args:
            key: The key to look up in the state.
            default: Value returned when the key is not present.

        Returns:
            The stored value, or ``default`` if the key is missing.
        """
        return self.state.get(key, default)

    def set_state(self, key: str, value: Any) -> None:
        """Store a value in this context's state.

        Args:
            key: the key to add to the state.
            value: the value to store under that key.
        """
        self.state[key] = value

    async def cleanup(self) -> None:
        """Clear this context's state and metadata dictionaries."""
        self.state.clear()
        self.metadata.clear()

    def is_empty(self) -> bool:
        """Check whether the context holds no data.

        Returns:
            True if this context's state and metadata, and the global context's
            state, are all empty.
        """
        return not self.state and not self.metadata and not self.global_context.state

1471 

1472 

# Type alias for a table of plugin contexts: string keys mapped to PluginContext instances.
PluginContextTable = dict[str, PluginContext]

1474 

1475 

1476class PluginPayload(BaseModel): 

1477 """Base class for all hook payloads. Immutable by design. 

1478 

1479 Frozen payloads prevent in-place mutations by plugins -- attributes 

1480 cannot be set directly on the object. Plugins must use 

1481 ``model_copy(update=...)`` to create modified payloads and return 

1482 modifications via ``PluginResult.modified_payload``. 

1483 

1484 Examples: 

1485 >>> class TestPayload(PluginPayload): 

1486 ... name: str 

1487 >>> p = TestPayload(name="test") 

1488 >>> p.name 

1489 'test' 

1490 """ 

1491 

1492 model_config = ConfigDict(frozen=True, arbitrary_types_allowed=True)