Coverage for mcpgateway / plugins / framework / models.py: 99%

545 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/plugins/framework/models.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Teryl Taylor, Mihai Criveti, Fred Araujo 

6 

7Pydantic models for plugins. 

8This module implements the pydantic models associated with 

9the base plugin layer including configurations, and contexts. 

10""" 

11 

12# Standard 

13from enum import Enum 

14import logging 

15import os 

16from pathlib import Path 

17from typing import Any, Generic, Optional, Self, TypeVar, Union 

18 

19# Third-Party 

20from pydantic import BaseModel, ConfigDict, Field, field_serializer, field_validator, model_validator, PrivateAttr, ValidationInfo 

21 

22# First-Party 

23from mcpgateway.plugins.framework.constants import CMD, CWD, ENV, EXTERNAL_PLUGIN_TYPE, IGNORE_CONFIG_EXTERNAL, PYTHON_SUFFIX, SCRIPT, UDS, URL 

24from mcpgateway.plugins.framework.settings import get_client_mtls_settings, get_grpc_client_mtls_settings, get_grpc_server_settings, get_mcp_server_settings, get_transport_settings 

25from mcpgateway.plugins.framework.validators import validate_plugin_url 

26 

27T = TypeVar("T") 

28 

29 

30class TransportType(str, Enum): 

31 """Supported transport mechanisms for MCP plugin communication. 

32 

33 Attributes: 

34 SSE: Server-Sent Events transport. 

35 HTTP: Standard HTTP-based transport. 

36 STDIO: Standard input/output transport. 

37 STREAMABLEHTTP: HTTP transport with streaming. 

38 GRPC: gRPC transport for external plugins. 

39 

40 Examples: 

41 >>> TransportType.SSE 

42 <TransportType.SSE: 'SSE'> 

43 >>> TransportType.STDIO.value 

44 'STDIO' 

45 >>> TransportType('STREAMABLEHTTP') 

46 <TransportType.STREAMABLEHTTP: 'STREAMABLEHTTP'> 

47 """ 

48 

49 SSE = "SSE" 

50 HTTP = "HTTP" 

51 STDIO = "STDIO" 

52 STREAMABLEHTTP = "STREAMABLEHTTP" 

53 GRPC = "GRPC" 

54 

55 

56class PluginMode(str, Enum): 

57 """Plugin modes of operation. 

58 

59 Attributes: 

60 enforce: enforces the plugin result, and blocks execution when there is an error. 

61 enforce_ignore_error: enforces the plugin result, but allows execution when there is an error. 

62 permissive: audits the result. 

63 disabled: plugin disabled. 

64 

65 Examples: 

66 >>> PluginMode.ENFORCE 

67 <PluginMode.ENFORCE: 'enforce'> 

68 >>> PluginMode.ENFORCE_IGNORE_ERROR 

69 <PluginMode.ENFORCE_IGNORE_ERROR: 'enforce_ignore_error'> 

70 >>> PluginMode.PERMISSIVE.value 

71 'permissive' 

72 >>> PluginMode('disabled') 

73 <PluginMode.DISABLED: 'disabled'> 

74 >>> 'enforce' in [m.value for m in PluginMode] 

75 True 

76 """ 

77 

78 ENFORCE = "enforce" 

79 ENFORCE_IGNORE_ERROR = "enforce_ignore_error" 

80 PERMISSIVE = "permissive" 

81 DISABLED = "disabled" 

82 

83 

84class BaseTemplate(BaseModel): 

85 """Base Template.The ToolTemplate, PromptTemplate and ResourceTemplate could be extended using this 

86 

87 Attributes: 

88 context (Optional[list[str]]): specifies the keys of context to be extracted. The context could be global (shared between the plugins) or 

89 local (shared within the plugin). Example: global.key1. 

90 extensions (Optional[dict[str, Any]]): add custom keys for your specific plugin. Example - 'policy' 

91 key for opa plugin. 

92 

93 Examples: 

94 >>> base = BaseTemplate(context=["global.key1.key2", "local.key1.key2"]) 

95 >>> base.context 

96 ['global.key1.key2', 'local.key1.key2'] 

97 >>> base = BaseTemplate(context=["global.key1.key2"], extensions={"policy" : "sample policy"}) 

98 >>> base.extensions 

99 {'policy': 'sample policy'} 

100 """ 

101 

102 context: Optional[list[str]] = None 

103 extensions: Optional[dict[str, Any]] = None 

104 

105 

106class ToolTemplate(BaseTemplate): 

107 """Tool Template. 

108 

109 Attributes: 

110 tool_name (str): the name of the tool. 

111 fields (Optional[list[str]]): the tool fields that are affected. 

112 result (bool): analyze tool output if true. 

113 

114 Examples: 

115 >>> tool = ToolTemplate(tool_name="my_tool") 

116 >>> tool.tool_name 

117 'my_tool' 

118 >>> tool.result 

119 False 

120 >>> tool2 = ToolTemplate(tool_name="analyzer", fields=["input", "params"], result=True) 

121 >>> tool2.fields 

122 ['input', 'params'] 

123 >>> tool2.result 

124 True 

125 """ 

126 

127 tool_name: str 

128 fields: Optional[list[str]] = None 

129 result: bool = False 

130 

131 

132class PromptTemplate(BaseTemplate): 

133 """Prompt Template. 

134 

135 Attributes: 

136 prompt_name (str): the name of the prompt. 

137 fields (Optional[list[str]]): the prompt fields that are affected. 

138 result (bool): analyze tool output if true. 

139 

140 Examples: 

141 >>> prompt = PromptTemplate(prompt_name="greeting") 

142 >>> prompt.prompt_name 

143 'greeting' 

144 >>> prompt.result 

145 False 

146 >>> prompt2 = PromptTemplate(prompt_name="question", fields=["context"], result=True) 

147 >>> prompt2.fields 

148 ['context'] 

149 """ 

150 

151 prompt_name: str 

152 fields: Optional[list[str]] = None 

153 result: bool = False 

154 

155 

156class ResourceTemplate(BaseTemplate): 

157 """Resource Template. 

158 

159 Attributes: 

160 resource_uri (str): the URI of the resource. 

161 fields (Optional[list[str]]): the resource fields that are affected. 

162 result (bool): analyze resource output if true. 

163 

164 Examples: 

165 >>> resource = ResourceTemplate(resource_uri="file:///data.txt") 

166 >>> resource.resource_uri 

167 'file:///data.txt' 

168 >>> resource.result 

169 False 

170 >>> resource2 = ResourceTemplate(resource_uri="http://api/data", fields=["content"], result=True) 

171 >>> resource2.fields 

172 ['content'] 

173 """ 

174 

175 resource_uri: str 

176 fields: Optional[list[str]] = None 

177 result: bool = False 

178 

179 

180class PluginCondition(BaseModel): 

181 """Conditions for when plugin should execute. 

182 

183 Attributes: 

184 server_ids (Optional[set[str]]): set of server ids. 

185 tenant_ids (Optional[set[str]]): set of tenant ids. 

186 tools (Optional[set[str]]): set of tool names. 

187 prompts (Optional[set[str]]): set of prompt names. 

188 resources (Optional[set[str]]): set of resource URIs. 

189 agents (Optional[set[str]]): set of agent IDs. 

190 user_pattern (Optional[list[str]]): list of user patterns. 

191 content_types (Optional[list[str]]): list of content types. 

192 

193 Examples: 

194 >>> cond = PluginCondition(server_ids={"server1", "server2"}) 

195 >>> "server1" in cond.server_ids 

196 True 

197 >>> cond2 = PluginCondition(tools={"tool1"}, prompts={"prompt1"}) 

198 >>> cond2.tools 

199 {'tool1'} 

200 >>> cond3 = PluginCondition(user_patterns=["admin", "root"]) 

201 >>> len(cond3.user_patterns) 

202 2 

203 """ 

204 

205 server_ids: Optional[set[str]] = None 

206 tenant_ids: Optional[set[str]] = None 

207 tools: Optional[set[str]] = None 

208 prompts: Optional[set[str]] = None 

209 resources: Optional[set[str]] = None 

210 agents: Optional[set[str]] = None 

211 user_patterns: Optional[list[str]] = None 

212 content_types: Optional[list[str]] = None 

213 

214 @field_serializer("server_ids", "tenant_ids", "tools", "prompts", "resources", "agents") 

215 def serialize_set(self, value: set[str] | None) -> list[str] | None: 

216 """Serialize set objects in PluginCondition for MCP. 

217 

218 Args: 

219 value: a set of server ids, tenant ids, tools or prompts. 

220 

221 Returns: 

222 The set as a serializable list. 

223 """ 

224 if value: 

225 values = [] 

226 for key in value: 

227 values.append(key) 

228 return values 

229 return None 

230 

231 

232class AppliedTo(BaseModel): 

233 """What tools/prompts/resources and fields the plugin will be applied to. 

234 

235 Attributes: 

236 tools (Optional[list[ToolTemplate]]): tools and fields to be applied. 

237 prompts (Optional[list[PromptTemplate]]): prompts and fields to be applied. 

238 resources (Optional[list[ResourceTemplate]]): resources and fields to be applied. 

239 global_context (Optional[list[str]]): keys in the context to be applied on globally 

240 local_context(Optional[list[str]]): keys in the context to be applied on locally 

241 """ 

242 

243 tools: Optional[list[ToolTemplate]] = None 

244 prompts: Optional[list[PromptTemplate]] = None 

245 resources: Optional[list[ResourceTemplate]] = None 

246 

247 

248class MCPTransportTLSConfigBase(BaseModel): 

249 """Base TLS configuration with common fields for both client and server. 

250 

251 Attributes: 

252 certfile (Optional[str]): Path to the PEM-encoded certificate file. 

253 keyfile (Optional[str]): Path to the PEM-encoded private key file. 

254 ca_bundle (Optional[str]): Path to a CA bundle file for verification. 

255 keyfile_password (Optional[str]): Optional password for encrypted private key. 

256 """ 

257 

258 certfile: Optional[str] = Field(default=None, description="Path to PEM certificate file") 

259 keyfile: Optional[str] = Field(default=None, description="Path to PEM private key file") 

260 ca_bundle: Optional[str] = Field(default=None, description="Path to CA bundle for verification") 

261 keyfile_password: Optional[str] = Field(default=None, description="Password for encrypted private key") 

262 

263 @field_validator("ca_bundle", "certfile", "keyfile", mode="after") 

264 @classmethod 

265 def validate_path(cls, value: Optional[str]) -> Optional[str]: 

266 """Expand and validate file paths supplied in TLS configuration. 

267 

268 Args: 

269 value: File path to validate. 

270 

271 Returns: 

272 Expanded file path or None if not provided. 

273 

274 Raises: 

275 ValueError: If file path does not exist. 

276 """ 

277 

278 if not value: 

279 return value 

280 expanded = Path(value).expanduser() 

281 if not expanded.is_file(): 

282 raise ValueError(f"TLS file path does not exist: {value}") 

283 return str(expanded) 

284 

285 @model_validator(mode="after") 

286 def validate_cert_key(self) -> Self: # pylint: disable=bad-classmethod-argument 

287 """Ensure certificate and key options are consistent. 

288 

289 Returns: 

290 Self after validation. 

291 

292 Raises: 

293 ValueError: If keyfile is specified without certfile. 

294 """ 

295 

296 if self.keyfile and not self.certfile: 

297 raise ValueError("keyfile requires certfile to be specified") 

298 return self 

299 

300 

301class MCPClientTLSConfig(MCPTransportTLSConfigBase): 

302 """Client-side TLS configuration (gateway connecting to plugin). 

303 

304 Attributes: 

305 verify (bool): Whether to verify the remote server certificate. 

306 check_hostname (bool): Enable hostname verification when verify is true. 

307 """ 

308 

309 verify: bool = Field(default=True, description="Verify the upstream server certificate") 

310 check_hostname: bool = Field(default=True, description="Enable hostname verification") 

311 

312 @classmethod 

313 def from_env(cls) -> Optional["MCPClientTLSConfig"]: 

314 """Construct client TLS configuration from PLUGINS_CLIENT_* environment variables. 

315 

316 Returns: 

317 MCPClientTLSConfig instance or None if no environment variables are set. 

318 """ 

319 s = get_client_mtls_settings() 

320 data: dict[str, Any] = {} 

321 

322 if s.client_mtls_certfile: 

323 data["certfile"] = s.client_mtls_certfile 

324 if s.client_mtls_keyfile: 

325 data["keyfile"] = s.client_mtls_keyfile 

326 if s.client_mtls_ca_bundle: 

327 data["ca_bundle"] = s.client_mtls_ca_bundle 

328 if s.client_mtls_keyfile_password is not None: 

329 data["keyfile_password"] = s.client_mtls_keyfile_password.get_secret_value() 

330 if s.client_mtls_verify is not None: 

331 data["verify"] = s.client_mtls_verify 

332 if s.client_mtls_check_hostname is not None: 

333 data["check_hostname"] = s.client_mtls_check_hostname 

334 

335 if not data: 

336 return None 

337 

338 return cls(**data) 

339 

340 

341class MCPServerTLSConfig(MCPTransportTLSConfigBase): 

342 """Server-side TLS configuration (plugin accepting gateway connections). 

343 

344 Attributes: 

345 ssl_cert_reqs (int): Client certificate requirement (0=NONE, 1=OPTIONAL, 2=REQUIRED). 

346 """ 

347 

348 ssl_cert_reqs: int = Field(default=2, description="Client certificate requirement (0=NONE, 1=OPTIONAL, 2=REQUIRED)") 

349 

350 @classmethod 

351 def from_env(cls) -> Optional["MCPServerTLSConfig"]: 

352 """Construct server TLS configuration from PLUGINS_SERVER_SSL_* environment variables. 

353 

354 Returns: 

355 MCPServerTLSConfig instance or None if no environment variables are set. 

356 """ 

357 s = get_mcp_server_settings() 

358 data: dict[str, Any] = {} 

359 

360 if s.server_ssl_keyfile: 

361 data["keyfile"] = s.server_ssl_keyfile 

362 if s.server_ssl_certfile: 

363 data["certfile"] = s.server_ssl_certfile 

364 if s.server_ssl_ca_certs: 

365 data["ca_bundle"] = s.server_ssl_ca_certs 

366 if s.server_ssl_keyfile_password is not None: 

367 data["keyfile_password"] = s.server_ssl_keyfile_password.get_secret_value() 

368 if s.server_ssl_cert_reqs is not None: 

369 data["ssl_cert_reqs"] = s.server_ssl_cert_reqs 

370 

371 if not data: 

372 return None 

373 

374 return cls(**data) 

375 

376 

377class MCPServerConfig(BaseModel): 

378 """Server-side MCP configuration (plugin running as server). 

379 

380 Attributes: 

381 host (str): Server host to bind to. 

382 port (int): Server port to bind to. 

383 uds (Optional[str]): Unix domain socket path for streamable HTTP. 

384 tls (Optional[MCPServerTLSConfig]): Server-side TLS configuration. 

385 """ 

386 

387 host: str = Field(default="127.0.0.1", description="Server host to bind to") 

388 port: int = Field(default=8000, description="Server port to bind to") 

389 uds: Optional[str] = Field(default=None, description="Unix domain socket path for streamable HTTP") 

390 tls: Optional[MCPServerTLSConfig] = Field(default=None, description="Server-side TLS configuration") 

391 

392 @field_validator("uds", mode="after") 

393 @classmethod 

394 def validate_uds(cls, uds: str | None) -> str | None: 

395 """Validate the Unix domain socket path for security. 

396 

397 Args: 

398 uds: Unix domain socket path. 

399 

400 Returns: 

401 The validated canonical uds path or None if none is set. 

402 

403 Raises: 

404 ValueError: if uds is empty, not absolute, or parent directory is invalid. 

405 """ 

406 if uds is None: 

407 return uds 

408 if not isinstance(uds, str) or not uds.strip(): 

409 raise ValueError("MCP server uds must be a non-empty string.") 

410 

411 uds_path = Path(uds).expanduser().resolve() 

412 if not uds_path.is_absolute(): 

413 raise ValueError(f"MCP server uds path must be absolute: {uds}") 

414 

415 parent_dir = uds_path.parent 

416 if not parent_dir.is_dir(): 

417 raise ValueError(f"MCP server uds parent directory does not exist: {parent_dir}") 

418 

419 # Check parent directory permissions for security 

420 try: 

421 parent_mode = parent_dir.stat().st_mode 

422 # Warn if parent directory is world-writable (o+w = 0o002) 

423 if parent_mode & 0o002: 

424 logging.getLogger(__name__).warning( 

425 "MCP server uds parent directory %s is world-writable. This may allow unauthorized socket hijacking. Consider using a directory with restricted permissions (e.g., 0o700).", 

426 parent_dir, 

427 ) 

428 except OSError: 

429 pass # Best effort - continue if we can't check permissions 

430 

431 return str(uds_path) 

432 

433 @model_validator(mode="after") 

434 def validate_uds_tls(self) -> Self: # pylint: disable=bad-classmethod-argument 

435 """Ensure TLS is not configured when using a Unix domain socket. 

436 

437 Returns: 

438 Self after validation. 

439 

440 Raises: 

441 ValueError: if tls is set with uds. 

442 """ 

443 if self.uds and self.tls: 

444 raise ValueError("TLS configuration is not supported for Unix domain sockets.") 

445 return self 

446 

447 @classmethod 

448 def from_env(cls) -> Optional["MCPServerConfig"]: 

449 """Construct server configuration from PLUGINS_SERVER_* environment variables. 

450 

451 Returns: 

452 MCPServerConfig instance or None if no environment variables are set. 

453 """ 

454 s = get_mcp_server_settings() 

455 data: dict[str, Any] = {} 

456 

457 if s.server_host: 

458 data["host"] = s.server_host 

459 if s.server_port is not None: 

460 data["port"] = s.server_port 

461 if s.server_uds: 

462 data["uds"] = s.server_uds 

463 

464 # Check if SSL/TLS is enabled 

465 if s.server_ssl_enabled: 

466 tls_config = MCPServerTLSConfig.from_env() 

467 if tls_config: 

468 data["tls"] = tls_config 

469 

470 if not data: 

471 return None 

472 

473 return cls(**data) 

474 

475 

476class MCPClientConfig(BaseModel): 

477 """Client-side MCP configuration (gateway connecting to external plugin). 

478 

479 Attributes: 

480 proto (TransportType): The MCP transport type. Can be SSE, STDIO, or STREAMABLEHTTP 

481 url (Optional[str]): An MCP URL. Only valid when MCP transport type is SSE or STREAMABLEHTTP. 

482 script (Optional[str]): The path and name to the STDIO script that runs the plugin server. Only valid for STDIO type. 

483 cmd (Optional[list[str]]): Command + args used to start a STDIO MCP server. Only valid for STDIO type. 

484 env (Optional[dict[str, str]]): Environment overrides for STDIO server process. 

485 cwd (Optional[str]): Working directory for STDIO server process. 

486 uds (Optional[str]): Unix domain socket path for streamable HTTP. 

487 tls (Optional[MCPClientTLSConfig]): Client-side TLS configuration for mTLS. 

488 """ 

489 

490 proto: TransportType 

491 url: Optional[str] = None 

492 script: Optional[str] = None 

493 cmd: Optional[list[str]] = None 

494 env: Optional[dict[str, str]] = None 

495 cwd: Optional[str] = None 

496 uds: Optional[str] = None 

497 tls: Optional[MCPClientTLSConfig] = None 

498 

499 @field_validator(URL, mode="after") 

500 @classmethod 

501 def validate_url(cls, url: str | None) -> str | None: 

502 """Validate a MCP url for streamable HTTP connections. 

503 

504 Args: 

505 url: the url to be validated. 

506 

507 Raises: 

508 ValueError: if the URL fails validation. 

509 

510 Returns: 

511 The validated URL or None if none is set. 

512 """ 

513 if url: 

514 result = validate_plugin_url(url) 

515 return result 

516 return url 

517 

518 @field_validator(SCRIPT, mode="after") 

519 @classmethod 

520 def validate_script(cls, script: str | None) -> str | None: 

521 """Validate an MCP stdio script. 

522 

523 Args: 

524 script: the script to be validated. 

525 

526 Raises: 

527 ValueError: if the script doesn't exist or isn't executable when required. 

528 

529 Returns: 

530 The validated string or None if none is set. 

531 """ 

532 if script: 

533 file_path = Path(script).expanduser() 

534 # Allow relative paths; they are resolved at runtime (optionally using cwd). 

535 if file_path.is_absolute(): 

536 if not file_path.is_file(): 

537 raise ValueError(f"MCP server script {script} does not exist.") 

538 # Allow Python (.py) and shell scripts (.sh). Other files must be executable. 

539 if file_path.suffix not in {PYTHON_SUFFIX, ".sh"} and not os.access(file_path, os.X_OK): 

540 raise ValueError(f"MCP server script {script} must be executable.") 

541 return script 

542 

543 @field_validator(CMD, mode="after") 

544 @classmethod 

545 def validate_cmd(cls, cmd: list[str] | None) -> list[str] | None: 

546 """Validate an MCP stdio command. 

547 

548 Args: 

549 cmd: the command to be validated. 

550 

551 Raises: 

552 ValueError: if cmd is empty or contains empty values. 

553 

554 Returns: 

555 The validated command list or None if none is set. 

556 """ 

557 if cmd is None: 

558 return cmd 

559 if not isinstance(cmd, list) or not cmd: 

560 raise ValueError("MCP stdio cmd must be a non-empty list.") 

561 if not all(isinstance(part, str) and part.strip() for part in cmd): 

562 raise ValueError("MCP stdio cmd entries must be non-empty strings.") 

563 return cmd 

564 

565 @field_validator(ENV, mode="after") 

566 @classmethod 

567 def validate_env(cls, env: dict[str, str] | None) -> dict[str, str] | None: 

568 """Validate environment overrides for MCP stdio. 

569 

570 Args: 

571 env: Environment overrides to set for the stdio plugin process. 

572 

573 Returns: 

574 The validated environment dict or None if none is set. 

575 

576 Raises: 

577 ValueError: if keys/values are invalid or the dict is empty. 

578 """ 

579 if env is None: 

580 return env 

581 if not isinstance(env, dict) or not env: 

582 raise ValueError("MCP stdio env must be a non-empty dict.") 

583 for key, value in env.items(): 

584 if not isinstance(key, str) or not key.strip(): 

585 raise ValueError("MCP stdio env keys must be non-empty strings.") 

586 if not isinstance(value, str): 

587 raise ValueError("MCP stdio env values must be strings.") 

588 return env 

589 

590 @field_validator(CWD, mode="after") 

591 @classmethod 

592 def validate_cwd(cls, cwd: str | None) -> str | None: 

593 """Validate the working directory for MCP stdio. 

594 

595 Args: 

596 cwd: Working directory for the stdio plugin process. 

597 

598 Returns: 

599 The validated canonical cwd path or None if none is set. 

600 

601 Raises: 

602 ValueError: if cwd does not exist or is not a directory. 

603 """ 

604 if not cwd: 

605 return cwd 

606 cwd_path = Path(cwd).expanduser().resolve() 

607 if not cwd_path.is_dir(): 

608 raise ValueError(f"MCP stdio cwd {cwd} does not exist or is not a directory.") 

609 return str(cwd_path) 

610 

611 @field_validator(UDS, mode="after") 

612 @classmethod 

613 def validate_uds(cls, uds: str | None) -> str | None: 

614 """Validate a Unix domain socket path for streamable HTTP. 

615 

616 Args: 

617 uds: Unix domain socket path. 

618 

619 Returns: 

620 The validated canonical uds path or None if none is set. 

621 

622 Raises: 

623 ValueError: if uds is empty, not absolute, or parent directory is invalid. 

624 """ 

625 if uds is None: 

626 return uds 

627 if not isinstance(uds, str) or not uds.strip(): 

628 raise ValueError("MCP client uds must be a non-empty string.") 

629 

630 uds_path = Path(uds).expanduser().resolve() 

631 if not uds_path.is_absolute(): 

632 raise ValueError(f"MCP client uds path must be absolute: {uds}") 

633 

634 parent_dir = uds_path.parent 

635 if not parent_dir.is_dir(): 

636 raise ValueError(f"MCP client uds parent directory does not exist: {parent_dir}") 

637 

638 # Check parent directory permissions for security 

639 try: 

640 parent_mode = parent_dir.stat().st_mode 

641 # Warn if parent directory is world-writable (o+w = 0o002) 

642 if parent_mode & 0o002: 

643 logging.getLogger(__name__).warning( 

644 "MCP client uds parent directory %s is world-writable. This may allow unauthorized socket hijacking. Consider using a directory with restricted permissions (e.g., 0o700).", 

645 parent_dir, 

646 ) 

647 except OSError: 

648 pass # Best effort - continue if we can't check permissions 

649 

650 return str(uds_path) 

651 

652 @model_validator(mode="after") 

653 def validate_tls_usage(self) -> Self: # pylint: disable=bad-classmethod-argument 

654 """Ensure TLS configuration is only used with HTTP-based transports. 

655 

656 Returns: 

657 Self after validation. 

658 

659 Raises: 

660 ValueError: If TLS configuration is used with non-HTTP transports. 

661 """ 

662 

663 if self.tls and self.proto not in (TransportType.SSE, TransportType.STREAMABLEHTTP): 

664 raise ValueError("TLS configuration is only valid for HTTP/SSE transports") 

665 if self.uds and self.tls: 

666 raise ValueError("TLS configuration is not supported for Unix domain sockets.") 

667 return self 

668 

669 @model_validator(mode="after") 

670 def validate_transport_fields(self) -> Self: # pylint: disable=bad-classmethod-argument 

671 """Ensure transport-specific fields are only used with matching transports. 

672 

673 Returns: 

674 Self after validation. 

675 

676 Raises: 

677 ValueError: if fields are incompatible with the selected transport. 

678 """ 

679 if self.proto == TransportType.STDIO and self.url: 

680 raise ValueError("URL is only valid for HTTP/SSE transports") 

681 if self.proto != TransportType.STDIO and (self.script or self.cmd or self.env or self.cwd): 

682 raise ValueError("script/cmd/env/cwd are only valid for STDIO transport") 

683 if self.proto != TransportType.STREAMABLEHTTP and self.uds: 

684 raise ValueError("uds is only valid for STREAMABLEHTTP transport") 

685 return self 

686 

687 

688class GRPCClientTLSConfig(MCPTransportTLSConfigBase): 

689 """Client-side gRPC TLS configuration (gateway connecting to plugin). 

690 

691 Attributes: 

692 verify (bool): Whether to verify the remote server certificate. 

693 """ 

694 

695 verify: bool = Field(default=True, description="Verify the upstream server certificate") 

696 

697 @classmethod 

698 def from_env(cls) -> Optional["GRPCClientTLSConfig"]: 

699 """Construct gRPC client TLS configuration from PLUGINS_GRPC_CLIENT_* environment variables. 

700 

701 Returns: 

702 GRPCClientTLSConfig instance or None if no environment variables are set. 

703 """ 

704 s = get_grpc_client_mtls_settings() 

705 data: dict[str, Any] = {} 

706 

707 if s.grpc_client_mtls_certfile: 

708 data["certfile"] = s.grpc_client_mtls_certfile 

709 if s.grpc_client_mtls_keyfile: 

710 data["keyfile"] = s.grpc_client_mtls_keyfile 

711 if s.grpc_client_mtls_ca_bundle: 

712 data["ca_bundle"] = s.grpc_client_mtls_ca_bundle 

713 if s.grpc_client_mtls_keyfile_password is not None: 

714 data["keyfile_password"] = s.grpc_client_mtls_keyfile_password.get_secret_value() 

715 if s.grpc_client_mtls_verify is not None: 

716 data["verify"] = s.grpc_client_mtls_verify 

717 

718 if not data: 

719 return None 

720 

721 return cls(**data) 

722 

723 

724class GRPCServerTLSConfig(MCPTransportTLSConfigBase): 

725 """Server-side gRPC TLS configuration (plugin accepting gateway connections). 

726 

727 Attributes: 

728 client_auth (str): Client certificate requirement ('none', 'optional', 'require'). 

729 """ 

730 

731 client_auth: str = Field(default="require", description="Client certificate requirement (none, optional, require)") 

732 

733 @field_validator("client_auth", mode="after") 

734 @classmethod 

735 def validate_client_auth(cls, value: str) -> str: 

736 """Validate client_auth value. 

737 

738 Args: 

739 value: Client auth requirement string. 

740 

741 Returns: 

742 Validated client auth string. 

743 

744 Raises: 

745 ValueError: If client_auth is not a valid value. 

746 """ 

747 valid_values = {"none", "optional", "require"} 

748 if value.lower() not in valid_values: 

749 raise ValueError(f"client_auth must be one of {valid_values}, got '{value}'") 

750 return value.lower() 

751 

752 @classmethod 

753 def from_env(cls) -> Optional["GRPCServerTLSConfig"]: 

754 """Construct gRPC server TLS configuration from PLUGINS_GRPC_SERVER_SSL_* environment variables. 

755 

756 Returns: 

757 GRPCServerTLSConfig instance or None if no environment variables are set. 

758 """ 

759 s = get_grpc_server_settings() 

760 data: dict[str, Any] = {} 

761 

762 if s.grpc_server_ssl_keyfile: 

763 data["keyfile"] = s.grpc_server_ssl_keyfile 

764 if s.grpc_server_ssl_certfile: 

765 data["certfile"] = s.grpc_server_ssl_certfile 

766 if s.grpc_server_ssl_ca_certs: 

767 data["ca_bundle"] = s.grpc_server_ssl_ca_certs 

768 if s.grpc_server_ssl_keyfile_password is not None: 

769 data["keyfile_password"] = s.grpc_server_ssl_keyfile_password.get_secret_value() 

770 if s.grpc_server_ssl_client_auth: 

771 data["client_auth"] = s.grpc_server_ssl_client_auth 

772 

773 if not data: 

774 return None 

775 

776 return cls(**data) 

777 

778 

779class GRPCClientConfig(BaseModel): 

780 """Client-side gRPC configuration (gateway connecting to external plugin). 

781 

782 Attributes: 

783 target (Optional[str]): The gRPC target address in host:port format. 

784 uds (Optional[str]): Unix domain socket path (alternative to target). 

785 tls (Optional[GRPCClientTLSConfig]): Client-side TLS configuration for mTLS. 

786 

787 Examples: 

788 >>> # TCP connection 

789 >>> config = GRPCClientConfig(target="localhost:50051") 

790 >>> config.get_target() 

791 'localhost:50051' 

792 >>> # Unix domain socket connection (path is resolved to canonical form) 

793 >>> config = GRPCClientConfig(uds="/tmp/grpc-plugin.sock") # doctest: +SKIP 

794 >>> config.get_target() # doctest: +SKIP 

795 'unix:///tmp/grpc-plugin.sock' 

796 """ 

797 

798 target: Optional[str] = Field(default=None, description="gRPC target address (host:port)") 

799 uds: Optional[str] = Field(default=None, description="Unix domain socket path") 

800 tls: Optional[GRPCClientTLSConfig] = None 

801 

802 @field_validator("target", mode="after") 

803 @classmethod 

804 def validate_target(cls, target: str | None) -> str | None: 

805 """Validate gRPC target address format. 

806 

807 Args: 

808 target: The target address to validate. 

809 

810 Returns: 

811 The validated target address. 

812 

813 Raises: 

814 ValueError: If target is not in host:port format. 

815 """ 

816 if target is None: 

817 return target 

818 if not target: 

819 raise ValueError("gRPC target address cannot be empty") 

820 # Basic validation - should contain host and port 

821 if ":" not in target: 

822 raise ValueError(f"gRPC target must be in host:port format, got '{target}'") 

823 return target 

824 

825 @field_validator("uds", mode="after") 

826 @classmethod 

827 def validate_uds(cls, uds: str | None) -> str | None: 

828 """Validate Unix domain socket path for gRPC. 

829 

830 Args: 

831 uds: Unix domain socket path. 

832 

833 Returns: 

834 The validated canonical uds path or None if none is set. 

835 

836 Raises: 

837 ValueError: if uds is empty, not absolute, or parent directory is invalid. 

838 """ 

839 if uds is None: 

840 return uds 

841 if not isinstance(uds, str) or not uds.strip(): 

842 raise ValueError("gRPC client uds must be a non-empty string.") 

843 

844 uds_path = Path(uds).expanduser().resolve() 

845 if not uds_path.is_absolute(): 

846 raise ValueError(f"gRPC client uds path must be absolute: {uds}") 

847 

848 parent_dir = uds_path.parent 

849 if not parent_dir.is_dir(): 

850 raise ValueError(f"gRPC client uds parent directory does not exist: {parent_dir}") 

851 

852 # Check parent directory permissions for security 

853 try: 

854 parent_mode = parent_dir.stat().st_mode 

855 if parent_mode & 0o002: 

856 logging.getLogger(__name__).warning( 

857 "gRPC client uds parent directory %s is world-writable. Consider using a directory with restricted permissions.", 

858 parent_dir, 

859 ) 

860 except OSError: 

861 pass 

862 

863 return str(uds_path) 

864 

865 @model_validator(mode="after") 

866 def validate_target_or_uds(self) -> Self: # pylint: disable=bad-classmethod-argument 

867 """Ensure exactly one of target or uds is configured. 

868 

869 Returns: 

870 Self after validation. 

871 

872 Raises: 

873 ValueError: If neither or both target and uds are set. 

874 """ 

875 has_target = self.target is not None 

876 has_uds = self.uds is not None 

877 

878 if not has_target and not has_uds: 

879 raise ValueError("gRPC client must have either 'target' or 'uds' configured") 

880 if has_target and has_uds: 

881 raise ValueError("gRPC client cannot have both 'target' and 'uds' configured") 

882 if has_uds and self.tls: 

883 raise ValueError("TLS configuration is not supported for Unix domain sockets") 

884 return self 

885 

886 def get_target(self) -> str: 

887 """Get the gRPC target string for channel creation. 

888 

889 Returns: 

890 str: The target string, either host:port or unix:///path format. 

891 """ 

892 if self.uds: 

893 return f"unix://{self.uds}" 

894 return self.target or "" 

895 

896 

897class GRPCServerConfig(BaseModel): 

898 """Server-side gRPC configuration (plugin running as gRPC server). 

899 

900 Attributes: 

901 host (str): Server host to bind to. 

902 port (int): Server port to bind to. 

903 uds (Optional[str]): Unix domain socket path (alternative to host:port). 

904 tls (Optional[GRPCServerTLSConfig]): Server-side TLS configuration. 

905 

906 Examples: 

907 >>> # TCP binding 

908 >>> config = GRPCServerConfig(host="0.0.0.0", port=50051) 

909 >>> config.get_bind_address() 

910 '0.0.0.0:50051' 

911 >>> # Unix domain socket binding (path is resolved to canonical form) 

912 >>> config = GRPCServerConfig(uds="/tmp/grpc-plugin.sock") # doctest: +SKIP 

913 >>> config.get_bind_address() # doctest: +SKIP 

914 'unix:///tmp/grpc-plugin.sock' 

915 """ 

916 

917 host: str = Field(default="127.0.0.1", description="Server host to bind to") 

918 port: int = Field(default=50051, description="Server port to bind to") 

919 uds: Optional[str] = Field(default=None, description="Unix domain socket path") 

920 tls: Optional[GRPCServerTLSConfig] = Field(default=None, description="Server-side TLS configuration") 

921 

922 @field_validator("uds", mode="after") 

923 @classmethod 

924 def validate_uds(cls, uds: str | None) -> str | None: 

925 """Validate Unix domain socket path for gRPC server. 

926 

927 Args: 

928 uds: Unix domain socket path. 

929 

930 Returns: 

931 The validated canonical uds path or None if none is set. 

932 

933 Raises: 

934 ValueError: if uds is empty, not absolute, or parent directory is invalid. 

935 """ 

936 if uds is None: 

937 return uds 

938 if not isinstance(uds, str) or not uds.strip(): 

939 raise ValueError("gRPC server uds must be a non-empty string.") 

940 

941 uds_path = Path(uds).expanduser().resolve() 

942 if not uds_path.is_absolute(): 

943 raise ValueError(f"gRPC server uds path must be absolute: {uds}") 

944 

945 parent_dir = uds_path.parent 

946 if not parent_dir.is_dir(): 

947 raise ValueError(f"gRPC server uds parent directory does not exist: {parent_dir}") 

948 

949 # Check parent directory permissions for security 

950 try: 

951 parent_mode = parent_dir.stat().st_mode 

952 if parent_mode & 0o002: 

953 logging.getLogger(__name__).warning( 

954 "gRPC server uds parent directory %s is world-writable. Consider using a directory with restricted permissions.", 

955 parent_dir, 

956 ) 

957 except OSError: 

958 pass 

959 

960 return str(uds_path) 

961 

962 @model_validator(mode="after") 

963 def validate_uds_tls(self) -> Self: # pylint: disable=bad-classmethod-argument 

964 """Ensure TLS is not configured when using a Unix domain socket. 

965 

966 Returns: 

967 Self after validation. 

968 

969 Raises: 

970 ValueError: if tls is set with uds. 

971 """ 

972 if self.uds and self.tls: 

973 raise ValueError("TLS configuration is not supported for Unix domain sockets") 

974 return self 

975 

976 def get_bind_address(self) -> str: 

977 """Get the gRPC bind address string. 

978 

979 Returns: 

980 str: The bind address, either host:port or unix:///path format. 

981 """ 

982 if self.uds: 

983 return f"unix://{self.uds}" 

984 return f"{self.host}:{self.port}" 

985 

986 @classmethod 

987 def from_env(cls) -> Optional["GRPCServerConfig"]: 

988 """Construct gRPC server configuration from PLUGINS_GRPC_SERVER_* environment variables. 

989 

990 Returns: 

991 GRPCServerConfig instance or None if no environment variables are set. 

992 """ 

993 s = get_grpc_server_settings() 

994 data: dict[str, Any] = {} 

995 

996 if s.grpc_server_host: 

997 data["host"] = s.grpc_server_host 

998 if s.grpc_server_port is not None: 

999 data["port"] = s.grpc_server_port 

1000 if s.grpc_server_uds: 

1001 data["uds"] = s.grpc_server_uds 

1002 

1003 # Check if SSL/TLS is enabled 

1004 if s.grpc_server_ssl_enabled: 

1005 tls_config = GRPCServerTLSConfig.from_env() 

1006 if tls_config: 

1007 data["tls"] = tls_config 

1008 

1009 if not data: 

1010 return None 

1011 

1012 return cls(**data) 

1013 

1014 

1015class UnixSocketClientConfig(BaseModel): 

1016 """Client-side Unix socket configuration (gateway connecting to external plugin). 

1017 

1018 Attributes: 

1019 path (str): Path to the Unix domain socket file. 

1020 reconnect_attempts (int): Number of reconnection attempts on failure. 

1021 reconnect_delay (float): Base delay between reconnection attempts (with exponential backoff). 

1022 timeout (float): Timeout for read operations in seconds. 

1023 

1024 Examples: 

1025 >>> config = UnixSocketClientConfig(path="/tmp/plugin.sock") 

1026 >>> config.path 

1027 '/tmp/plugin.sock' 

1028 >>> config.reconnect_attempts 

1029 3 

1030 """ 

1031 

1032 path: str = Field(..., description="Path to the Unix domain socket") 

1033 reconnect_attempts: int = Field(default=3, description="Number of reconnection attempts") 

1034 reconnect_delay: float = Field(default=0.1, description="Base delay between reconnection attempts (seconds)") 

1035 timeout: float = Field(default=30.0, description="Read timeout in seconds") 

1036 

1037 @field_validator("path", mode="after") 

1038 @classmethod 

1039 def validate_path(cls, path: str) -> str: 

1040 """Validate Unix socket path. 

1041 

1042 Args: 

1043 path: The socket path to validate. 

1044 

1045 Returns: 

1046 The validated path. 

1047 

1048 Raises: 

1049 ValueError: If path is empty or invalid. 

1050 """ 

1051 if not path: 

1052 raise ValueError("Unix socket path cannot be empty") 

1053 if not path.startswith("/"): 

1054 raise ValueError(f"Unix socket path must be absolute, got '{path}'") 

1055 return path 

1056 

1057 

1058class UnixSocketServerConfig(BaseModel): 

1059 """Server-side Unix socket configuration (plugin running as Unix socket server). 

1060 

1061 Attributes: 

1062 path (str): Path to the Unix domain socket file. 

1063 

1064 Examples: 

1065 >>> config = UnixSocketServerConfig(path="/tmp/plugin.sock") 

1066 >>> config.path 

1067 '/tmp/plugin.sock' 

1068 """ 

1069 

1070 path: str = Field(default="/tmp/mcpgateway-plugins.sock", description="Path to the Unix domain socket") # nosec B108 - configurable default 

1071 

1072 @classmethod 

1073 def from_env(cls) -> Optional["UnixSocketServerConfig"]: 

1074 """Construct Unix socket server configuration from environment variables. 

1075 

1076 Returns: 

1077 UnixSocketServerConfig instance or None if no environment variables are set. 

1078 """ 

1079 s = get_transport_settings() 

1080 data: dict[str, Any] = {} 

1081 

1082 if s.unix_socket_path: 

1083 data["path"] = s.unix_socket_path 

1084 

1085 if not data: 

1086 return None 

1087 

1088 return cls(**data) 

1089 

1090 

1091class PluginConfig(BaseModel): 

1092 """A plugin configuration. 

1093 

1094 Attributes: 

1095 name (str): The unique name of the plugin. 

1096 description (str): A description of the plugin. 

1097 author (str): The author of the plugin. 

1098 kind (str): The kind or type of plugin. Usually a fully qualified object type. 

1099 namespace (str): The namespace where the plugin resides. 

1100 version (str): version of the plugin. 

1101 hooks (list[str]): a list of the hook points where the plugin will be called. Default: []. 

1102 tags (list[str]): a list of tags for making the plugin searchable. 

1103 mode (bool): whether the plugin is active. 

1104 priority (int): indicates the order in which the plugin is run. Lower = higher priority. Default: 100. 

1105 conditions (Optional[list[PluginCondition]]): the conditions on which the plugin is run. 

1106 applied_to (Optional[list[AppliedTo]]): the tools, fields, that the plugin is applied to. 

1107 config (dict[str, Any]): the plugin specific configurations. 

1108 mcp (Optional[MCPClientConfig]): Client-side MCP configuration (gateway connecting to plugin). 

1109 grpc (Optional[GRPCClientConfig]): Client-side gRPC configuration (gateway connecting to plugin). 

1110 """ 

1111 

1112 name: str 

1113 description: Optional[str] = None 

1114 author: Optional[str] = None 

1115 kind: str 

1116 namespace: Optional[str] = None 

1117 version: Optional[str] = None 

1118 hooks: list[str] = Field(default_factory=list) 

1119 tags: list[str] = Field(default_factory=list) 

1120 mode: PluginMode = PluginMode.ENFORCE 

1121 priority: int = 100 # Lower = higher priority 

1122 conditions: list[PluginCondition] = Field(default_factory=list) # When to apply 

1123 applied_to: Optional[AppliedTo] = None # Fields to apply to. 

1124 config: Optional[dict[str, Any]] = None 

1125 mcp: Optional[MCPClientConfig] = None 

1126 grpc: Optional[GRPCClientConfig] = None 

1127 unix_socket: Optional[UnixSocketClientConfig] = None 

1128 

1129 @model_validator(mode="after") 

1130 def check_url_or_script_filled(self) -> Self: # pylint: disable=bad-classmethod-argument 

1131 """Checks to see that at least one of url or script are set depending on MCP server configuration. 

1132 

1133 Raises: 

1134 ValueError: if the script/cmd attribute is not defined with STDIO set, or the URL not defined with HTTP transports. 

1135 

1136 Returns: 

1137 The model after validation. 

1138 """ 

1139 if not self.mcp: 

1140 return self 

1141 if self.mcp.proto == TransportType.STDIO and not (self.mcp.script or self.mcp.cmd): 

1142 raise ValueError(f"Plugin {self.name} has transport type set to STDIO but no script/cmd value") 

1143 if self.mcp.proto == TransportType.STDIO and self.mcp.script and self.mcp.cmd: 

1144 raise ValueError(f"Plugin {self.name} must set either script or cmd for STDIO, not both") 

1145 if self.mcp.proto in (TransportType.STREAMABLEHTTP, TransportType.SSE) and not self.mcp.url: 

1146 raise ValueError(f"Plugin {self.name} has transport type set to StreamableHTTP but no url value") 

1147 if self.mcp.proto not in (TransportType.SSE, TransportType.STREAMABLEHTTP, TransportType.STDIO): 

1148 raise ValueError(f"Plugin {self.name} must set transport type to either SSE or STREAMABLEHTTP or STDIO") 

1149 return self 

1150 

1151 @model_validator(mode="after") 

1152 def check_config_and_external(self, info: ValidationInfo) -> Self: # pylint: disable=bad-classmethod-argument 

1153 """Checks to see that a plugin's 'config' section is not defined if the kind is 'external'. This is because developers cannot override items in the plugin config section for external plugins. 

1154 

1155 Args: 

1156 info: the contextual information passed into the pydantic model during model validation. Used to determine validation sequence. 

1157 

1158 Raises: 

1159 ValueError: if the script attribute is not defined with STDIO set, or the URL not defined with HTTP transports. 

1160 

1161 Returns: 

1162 The model after validation. 

1163 """ 

1164 ignore_config_external = False 

1165 if info and info.context and IGNORE_CONFIG_EXTERNAL in info.context: 

1166 ignore_config_external = info.context[IGNORE_CONFIG_EXTERNAL] 

1167 

1168 if not ignore_config_external and self.config and self.kind == EXTERNAL_PLUGIN_TYPE: 

1169 raise ValueError(f"""Cannot have {self.name} plugin defined as 'external' with 'config' set.""" """ 'config' section settings can only be set on the plugin server.""") 

1170 

1171 # External plugins must have exactly one transport configured (mcp, grpc, or unix_socket) 

1172 if self.kind == EXTERNAL_PLUGIN_TYPE: 

1173 has_mcp = self.mcp is not None 

1174 has_grpc = self.grpc is not None 

1175 has_unix = self.unix_socket is not None 

1176 transport_count = sum([has_mcp, has_grpc, has_unix]) 

1177 

1178 if transport_count == 0: 

1179 raise ValueError(f"External plugin {self.name} must have 'mcp', 'grpc', or 'unix_socket' section configured") 

1180 if transport_count > 1: 

1181 raise ValueError(f"External plugin {self.name} can only have one transport configured (mcp, grpc, or unix_socket)") 

1182 

1183 return self 

1184 

1185 

1186class PluginManifest(BaseModel): 

1187 """Plugin manifest. 

1188 

1189 Attributes: 

1190 description (str): A description of the plugin. 

1191 author (str): The author of the plugin. 

1192 version (str): version of the plugin. 

1193 tags (list[str]): a list of tags for making the plugin searchable. 

1194 available_hooks (list[str]): a list of the hook points where the plugin is callable. 

1195 default_config (dict[str, Any]): the default configurations. 

1196 """ 

1197 

1198 description: str 

1199 author: str 

1200 version: str 

1201 tags: list[str] 

1202 available_hooks: list[str] 

1203 default_config: dict[str, Any] 

1204 

1205 

1206class PluginErrorModel(BaseModel): 

1207 """A plugin error, used to denote exceptions/errors inside external plugins. 

1208 

1209 Attributes: 

1210 message (str): the reason for the error. 

1211 code (str): an error code. 

1212 details: (dict[str, Any]): additional error details. 

1213 plugin_name (str): the plugin name. 

1214 mcp_error_code ([int]): The MCP error code passed back to the client. Defaults to Internal Error. 

1215 """ 

1216 

1217 message: str 

1218 plugin_name: str 

1219 code: Optional[str] = "" 

1220 details: Optional[dict[str, Any]] = Field(default_factory=dict) 

1221 mcp_error_code: int = -32603 

1222 

1223 

1224class PluginViolation(BaseModel): 

1225 """A plugin violation, used to denote policy violations. 

1226 

1227 Attributes: 

1228 reason (str): the reason for the violation. 

1229 description (str): a longer description of the violation. 

1230 code (str): a violation code. 

1231 details: (dict[str, Any]): additional violation details. 

1232 _plugin_name (str): the plugin name, private attribute set by the plugin manager. 

1233 mcp_error_code(Optional[int]): A valid mcp error code which will be sent back to the client if plugin enabled. 

1234 

1235 Examples: 

1236 >>> violation = PluginViolation( 

1237 ... reason="Invalid input", 

1238 ... description="The input contains prohibited content", 

1239 ... code="PROHIBITED_CONTENT", 

1240 ... details={"field": "message", "value": "test"} 

1241 ... ) 

1242 >>> violation.reason 

1243 'Invalid input' 

1244 >>> violation.code 

1245 'PROHIBITED_CONTENT' 

1246 >>> violation.plugin_name = "content_filter" 

1247 >>> violation.plugin_name 

1248 'content_filter' 

1249 """ 

1250 

1251 reason: str 

1252 description: str 

1253 code: str 

1254 details: Optional[dict[str, Any]] = Field(default_factory=dict) 

1255 _plugin_name: str = PrivateAttr(default="") 

1256 mcp_error_code: Optional[int] = None 

1257 

1258 @property 

1259 def plugin_name(self) -> str: 

1260 """Getter for the plugin name attribute. 

1261 

1262 Returns: 

1263 The plugin name associated with the violation. 

1264 """ 

1265 return self._plugin_name 

1266 

1267 @plugin_name.setter 

1268 def plugin_name(self, name: str) -> None: 

1269 """Setter for the plugin_name attribute. 

1270 

1271 Args: 

1272 name: the plugin name. 

1273 

1274 Raises: 

1275 ValueError: if name is empty or not a string. 

1276 """ 

1277 if not isinstance(name, str) or not name.strip(): 

1278 raise ValueError("Name must be a non-empty string.") 

1279 self._plugin_name = name 

1280 

1281 

1282class PluginSettings(BaseModel): 

1283 """Global plugin settings. 

1284 

1285 Attributes: 

1286 parallel_execution_within_band (bool): execute plugins with same priority in parallel. 

1287 plugin_timeout (int): timeout value for plugins operations. 

1288 fail_on_plugin_error (bool): error when there is a plugin connectivity or ignore. 

1289 enable_plugin_api (bool): enable or disable plugins globally. 

1290 plugin_health_check_interval (int): health check interval check. 

1291 include_user_info (bool): if enabled user info is injected in plugin context 

1292 """ 

1293 

1294 parallel_execution_within_band: bool = False 

1295 plugin_timeout: int = 30 

1296 fail_on_plugin_error: bool = False 

1297 enable_plugin_api: bool = False 

1298 plugin_health_check_interval: int = 60 

1299 include_user_info: bool = False 

1300 

1301 

1302class Config(BaseModel): 

1303 """Configurations for plugins. 

1304 

1305 Attributes: 

1306 plugins (Optional[list[PluginConfig]]): the list of plugins to enable. 

1307 plugin_dirs (list[str]): The directories in which to look for plugins. 

1308 plugin_settings (PluginSettings): global settings for plugins. 

1309 server_settings (Optional[MCPServerConfig]): Server-side MCP configuration (when plugins run as server). 

1310 grpc_server_settings (Optional[GRPCServerConfig]): Server-side gRPC configuration (when plugins run as gRPC server). 

1311 unix_socket_server_settings (Optional[UnixSocketServerConfig]): Server-side Unix socket configuration. 

1312 """ 

1313 

1314 plugins: Optional[list[PluginConfig]] = [] 

1315 plugin_dirs: list[str] = [] 

1316 plugin_settings: PluginSettings 

1317 server_settings: Optional[MCPServerConfig] = None 

1318 grpc_server_settings: Optional[GRPCServerConfig] = None 

1319 unix_socket_server_settings: Optional[UnixSocketServerConfig] = None 

1320 

1321 

1322class PluginResult(BaseModel, Generic[T]): 

1323 """A result of the plugin hook processing. The actual type is dependent on the hook. 

1324 

1325 Attributes: 

1326 continue_processing (bool): Whether to stop processing. 

1327 modified_payload (Optional[Any]): The modified payload if the plugin is a transformer. 

1328 violation (Optional[PluginViolation]): violation object. 

1329 metadata (Optional[dict[str, Any]]): additional metadata. 

1330 

1331 Examples: 

1332 >>> result = PluginResult() 

1333 >>> result.continue_processing 

1334 True 

1335 >>> result.metadata 

1336 {} 

1337 >>> from mcpgateway.plugins.framework import PluginViolation 

1338 >>> violation = PluginViolation( 

1339 ... reason="Test", description="Test desc", code="TEST", details={} 

1340 ... ) 

1341 >>> result2 = PluginResult(continue_processing=False, violation=violation) 

1342 >>> result2.continue_processing 

1343 False 

1344 >>> result2.violation.code 

1345 'TEST' 

1346 >>> r = PluginResult(metadata={"key": "value"}) 

1347 >>> r.metadata["key"] 

1348 'value' 

1349 >>> r2 = PluginResult(continue_processing=False) 

1350 >>> r2.continue_processing 

1351 False 

1352 """ 

1353 

1354 continue_processing: bool = True 

1355 modified_payload: Optional[T] = None 

1356 violation: Optional[PluginViolation] = None 

1357 metadata: Optional[dict[str, Any]] = Field(default_factory=dict) 

1358 

1359 

1360class GlobalContext(BaseModel): 

1361 """The global context, which shared across all plugins. 

1362 

1363 Attributes: 

1364 request_id (str): ID of the HTTP request. 

1365 user (str): user ID associated with the request. 

1366 tenant_id (str): tenant ID. 

1367 server_id (str): server ID. 

1368 metadata (Optional[dict[str,Any]]): a global shared metadata across plugins (Read-only from plugin's perspective). 

1369 state (Optional[dict[str,Any]]): a global shared state across plugins. 

1370 

1371 Examples: 

1372 >>> ctx = GlobalContext(request_id="req-123") 

1373 >>> ctx.request_id 

1374 'req-123' 

1375 >>> ctx.user is None 

1376 True 

1377 >>> ctx2 = GlobalContext(request_id="req-456", user="alice", tenant_id="tenant1") 

1378 >>> ctx2.user 

1379 'alice' 

1380 >>> ctx2.tenant_id 

1381 'tenant1' 

1382 >>> c = GlobalContext(request_id="123", server_id="srv1") 

1383 >>> c.request_id 

1384 '123' 

1385 >>> c.server_id 

1386 'srv1' 

1387 """ 

1388 

1389 request_id: str 

1390 user: Optional[Union[str, dict[str, Any]]] = None 

1391 tenant_id: Optional[str] = None 

1392 server_id: Optional[str] = None 

1393 state: dict[str, Any] = Field(default_factory=dict) 

1394 metadata: dict[str, Any] = Field(default_factory=dict) 

1395 

1396 

1397class PluginContext(BaseModel): 

1398 """The plugin's context, which lasts a request lifecycle. 

1399 

1400 Attributes: 

1401 state: the inmemory state of the request. 

1402 global_context: the context that is shared across plugins. 

1403 metadata: plugin meta data. 

1404 

1405 Examples: 

1406 >>> gctx = GlobalContext(request_id="req-123") 

1407 >>> ctx = PluginContext(global_context=gctx) 

1408 >>> ctx.global_context.request_id 

1409 'req-123' 

1410 >>> ctx.global_context.user is None 

1411 True 

1412 >>> ctx.state["somekey"] = "some value" 

1413 >>> ctx.state["somekey"] 

1414 'some value' 

1415 """ 

1416 

1417 state: dict[str, Any] = Field(default_factory=dict) 

1418 global_context: GlobalContext 

1419 metadata: dict[str, Any] = Field(default_factory=dict) 

1420 

1421 def get_state(self, key: str, default: Any = None) -> Any: 

1422 """Get value from shared state. 

1423 

1424 Args: 

1425 key: The key to access the shared state. 

1426 default: A default value if one doesn't exist. 

1427 

1428 Returns: 

1429 The state value. 

1430 """ 

1431 return self.state.get(key, default) 

1432 

1433 def set_state(self, key: str, value: Any) -> None: 

1434 """Set value in shared state. 

1435 

1436 Args: 

1437 key: the key to add to the state. 

1438 value: the value to add to the state. 

1439 """ 

1440 self.state[key] = value 

1441 

1442 async def cleanup(self) -> None: 

1443 """Cleanup context resources.""" 

1444 self.state.clear() 

1445 self.metadata.clear() 

1446 

1447 def is_empty(self) -> bool: 

1448 """Check whether the state and metadata objects are empty. 

1449 

1450 Returns: 

1451 True if the context state and metadata are empty. 

1452 """ 

1453 return not (self.state or self.metadata or self.global_context.state) 

1454 

1455 

1456PluginContextTable = dict[str, PluginContext] 

1457 

1458 

1459class PluginPayload(BaseModel): 

1460 """Base class for all hook payloads. Immutable by design. 

1461 

1462 Frozen payloads prevent in-place mutations by plugins -- attributes 

1463 cannot be set directly on the object. Plugins must use 

1464 ``model_copy(update=...)`` to create modified payloads and return 

1465 modifications via ``PluginResult.modified_payload``. 

1466 

1467 Examples: 

1468 >>> class TestPayload(PluginPayload): 

1469 ... name: str 

1470 >>> p = TestPayload(name="test") 

1471 >>> p.name 

1472 'test' 

1473 """ 

1474 

1475 model_config = ConfigDict(frozen=True, arbitrary_types_allowed=True)