Coverage for mcpgateway / plugins / framework / models.py: 100%

572 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/plugins/framework/models.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Teryl Taylor, Mihai Criveti 

6 

7Pydantic models for plugins. 

8This module implements the pydantic models associated with 

9the base plugin layer including configurations, and contexts. 

10""" 

11 

12# Standard 

13from enum import Enum 

14import logging 

15import os 

16from pathlib import Path 

17from typing import Any, Generic, Optional, Self, TypeAlias, TypeVar, Union 

18 

19# Third-Party 

20from pydantic import BaseModel, Field, field_serializer, field_validator, model_validator, PrivateAttr, ValidationInfo 

21 

22# First-Party 

23from mcpgateway.common.models import TransportType 

24from mcpgateway.common.validators import SecurityValidator 

25from mcpgateway.plugins.framework.constants import CMD, CWD, ENV, EXTERNAL_PLUGIN_TYPE, IGNORE_CONFIG_EXTERNAL, PYTHON_SUFFIX, SCRIPT, UDS, URL 

26 

27T = TypeVar("T") 

28 

29 

30class PluginMode(str, Enum): 

31 """Plugin modes of operation. 

32 

33 Attributes: 

34 enforce: enforces the plugin result, and blocks execution when there is an error. 

35 enforce_ignore_error: enforces the plugin result, but allows execution when there is an error. 

36 permissive: audits the result. 

37 disabled: plugin disabled. 

38 

39 Examples: 

40 >>> PluginMode.ENFORCE 

41 <PluginMode.ENFORCE: 'enforce'> 

42 >>> PluginMode.ENFORCE_IGNORE_ERROR 

43 <PluginMode.ENFORCE_IGNORE_ERROR: 'enforce_ignore_error'> 

44 >>> PluginMode.PERMISSIVE.value 

45 'permissive' 

46 >>> PluginMode('disabled') 

47 <PluginMode.DISABLED: 'disabled'> 

48 >>> 'enforce' in [m.value for m in PluginMode] 

49 True 

50 """ 

51 

52 ENFORCE = "enforce" 

53 ENFORCE_IGNORE_ERROR = "enforce_ignore_error" 

54 PERMISSIVE = "permissive" 

55 DISABLED = "disabled" 

56 

57 

58class BaseTemplate(BaseModel): 

59 """Base Template.The ToolTemplate, PromptTemplate and ResourceTemplate could be extended using this 

60 

61 Attributes: 

62 context (Optional[list[str]]): specifies the keys of context to be extracted. The context could be global (shared between the plugins) or 

63 local (shared within the plugin). Example: global.key1. 

64 extensions (Optional[dict[str, Any]]): add custom keys for your specific plugin. Example - 'policy' 

65 key for opa plugin. 

66 

67 Examples: 

68 >>> base = BaseTemplate(context=["global.key1.key2", "local.key1.key2"]) 

69 >>> base.context 

70 ['global.key1.key2', 'local.key1.key2'] 

71 >>> base = BaseTemplate(context=["global.key1.key2"], extensions={"policy" : "sample policy"}) 

72 >>> base.extensions 

73 {'policy': 'sample policy'} 

74 """ 

75 

76 context: Optional[list[str]] = None 

77 extensions: Optional[dict[str, Any]] = None 

78 

79 

80class ToolTemplate(BaseTemplate): 

81 """Tool Template. 

82 

83 Attributes: 

84 tool_name (str): the name of the tool. 

85 fields (Optional[list[str]]): the tool fields that are affected. 

86 result (bool): analyze tool output if true. 

87 

88 Examples: 

89 >>> tool = ToolTemplate(tool_name="my_tool") 

90 >>> tool.tool_name 

91 'my_tool' 

92 >>> tool.result 

93 False 

94 >>> tool2 = ToolTemplate(tool_name="analyzer", fields=["input", "params"], result=True) 

95 >>> tool2.fields 

96 ['input', 'params'] 

97 >>> tool2.result 

98 True 

99 """ 

100 

101 tool_name: str 

102 fields: Optional[list[str]] = None 

103 result: bool = False 

104 

105 

106class PromptTemplate(BaseTemplate): 

107 """Prompt Template. 

108 

109 Attributes: 

110 prompt_name (str): the name of the prompt. 

111 fields (Optional[list[str]]): the prompt fields that are affected. 

112 result (bool): analyze tool output if true. 

113 

114 Examples: 

115 >>> prompt = PromptTemplate(prompt_name="greeting") 

116 >>> prompt.prompt_name 

117 'greeting' 

118 >>> prompt.result 

119 False 

120 >>> prompt2 = PromptTemplate(prompt_name="question", fields=["context"], result=True) 

121 >>> prompt2.fields 

122 ['context'] 

123 """ 

124 

125 prompt_name: str 

126 fields: Optional[list[str]] = None 

127 result: bool = False 

128 

129 

130class ResourceTemplate(BaseTemplate): 

131 """Resource Template. 

132 

133 Attributes: 

134 resource_uri (str): the URI of the resource. 

135 fields (Optional[list[str]]): the resource fields that are affected. 

136 result (bool): analyze resource output if true. 

137 

138 Examples: 

139 >>> resource = ResourceTemplate(resource_uri="file:///data.txt") 

140 >>> resource.resource_uri 

141 'file:///data.txt' 

142 >>> resource.result 

143 False 

144 >>> resource2 = ResourceTemplate(resource_uri="http://api/data", fields=["content"], result=True) 

145 >>> resource2.fields 

146 ['content'] 

147 """ 

148 

149 resource_uri: str 

150 fields: Optional[list[str]] = None 

151 result: bool = False 

152 

153 

154class PluginCondition(BaseModel): 

155 """Conditions for when plugin should execute. 

156 

157 Attributes: 

158 server_ids (Optional[set[str]]): set of server ids. 

159 tenant_ids (Optional[set[str]]): set of tenant ids. 

160 tools (Optional[set[str]]): set of tool names. 

161 prompts (Optional[set[str]]): set of prompt names. 

162 resources (Optional[set[str]]): set of resource URIs. 

163 agents (Optional[set[str]]): set of agent IDs. 

164 user_pattern (Optional[list[str]]): list of user patterns. 

165 content_types (Optional[list[str]]): list of content types. 

166 

167 Examples: 

168 >>> cond = PluginCondition(server_ids={"server1", "server2"}) 

169 >>> "server1" in cond.server_ids 

170 True 

171 >>> cond2 = PluginCondition(tools={"tool1"}, prompts={"prompt1"}) 

172 >>> cond2.tools 

173 {'tool1'} 

174 >>> cond3 = PluginCondition(user_patterns=["admin", "root"]) 

175 >>> len(cond3.user_patterns) 

176 2 

177 """ 

178 

179 server_ids: Optional[set[str]] = None 

180 tenant_ids: Optional[set[str]] = None 

181 tools: Optional[set[str]] = None 

182 prompts: Optional[set[str]] = None 

183 resources: Optional[set[str]] = None 

184 agents: Optional[set[str]] = None 

185 user_patterns: Optional[list[str]] = None 

186 content_types: Optional[list[str]] = None 

187 

188 @field_serializer("server_ids", "tenant_ids", "tools", "prompts", "resources", "agents") 

189 def serialize_set(self, value: set[str] | None) -> list[str] | None: 

190 """Serialize set objects in PluginCondition for MCP. 

191 

192 Args: 

193 value: a set of server ids, tenant ids, tools or prompts. 

194 

195 Returns: 

196 The set as a serializable list. 

197 """ 

198 if value: 

199 values = [] 

200 for key in value: 

201 values.append(key) 

202 return values 

203 return None 

204 

205 

206class AppliedTo(BaseModel): 

207 """What tools/prompts/resources and fields the plugin will be applied to. 

208 

209 Attributes: 

210 tools (Optional[list[ToolTemplate]]): tools and fields to be applied. 

211 prompts (Optional[list[PromptTemplate]]): prompts and fields to be applied. 

212 resources (Optional[list[ResourceTemplate]]): resources and fields to be applied. 

213 global_context (Optional[list[str]]): keys in the context to be applied on globally 

214 local_context(Optional[list[str]]): keys in the context to be applied on locally 

215 """ 

216 

217 tools: Optional[list[ToolTemplate]] = None 

218 prompts: Optional[list[PromptTemplate]] = None 

219 resources: Optional[list[ResourceTemplate]] = None 

220 

221 

222class MCPTransportTLSConfigBase(BaseModel): 

223 """Base TLS configuration with common fields for both client and server. 

224 

225 Attributes: 

226 certfile (Optional[str]): Path to the PEM-encoded certificate file. 

227 keyfile (Optional[str]): Path to the PEM-encoded private key file. 

228 ca_bundle (Optional[str]): Path to a CA bundle file for verification. 

229 keyfile_password (Optional[str]): Optional password for encrypted private key. 

230 """ 

231 

232 certfile: Optional[str] = Field(default=None, description="Path to PEM certificate file") 

233 keyfile: Optional[str] = Field(default=None, description="Path to PEM private key file") 

234 ca_bundle: Optional[str] = Field(default=None, description="Path to CA bundle for verification") 

235 keyfile_password: Optional[str] = Field(default=None, description="Password for encrypted private key") 

236 

237 @field_validator("ca_bundle", "certfile", "keyfile", mode="after") 

238 @classmethod 

239 def validate_path(cls, value: Optional[str]) -> Optional[str]: 

240 """Expand and validate file paths supplied in TLS configuration. 

241 

242 Args: 

243 value: File path to validate. 

244 

245 Returns: 

246 Expanded file path or None if not provided. 

247 

248 Raises: 

249 ValueError: If file path does not exist. 

250 """ 

251 

252 if not value: 

253 return value 

254 expanded = Path(value).expanduser() 

255 if not expanded.is_file(): 

256 raise ValueError(f"TLS file path does not exist: {value}") 

257 return str(expanded) 

258 

259 @model_validator(mode="after") 

260 def validate_cert_key(self) -> Self: # pylint: disable=bad-classmethod-argument 

261 """Ensure certificate and key options are consistent. 

262 

263 Returns: 

264 Self after validation. 

265 

266 Raises: 

267 ValueError: If keyfile is specified without certfile. 

268 """ 

269 

270 if self.keyfile and not self.certfile: 

271 raise ValueError("keyfile requires certfile to be specified") 

272 return self 

273 

274 @staticmethod 

275 def _parse_bool(value: Optional[str]) -> Optional[bool]: 

276 """Convert a string environment value to boolean. 

277 

278 Args: 

279 value: String value to parse as boolean. 

280 

281 Returns: 

282 Boolean value or None if value is None. 

283 

284 Raises: 

285 ValueError: If value is not a valid boolean string. 

286 """ 

287 

288 if value is None: 

289 return None 

290 normalized = value.strip().lower() 

291 if normalized in {"1", "true", "yes", "on"}: 

292 return True 

293 if normalized in {"0", "false", "no", "off"}: 

294 return False 

295 raise ValueError(f"Invalid boolean value: {value}") 

296 

297 

298class MCPClientTLSConfig(MCPTransportTLSConfigBase): 

299 """Client-side TLS configuration (gateway connecting to plugin). 

300 

301 Attributes: 

302 verify (bool): Whether to verify the remote server certificate. 

303 check_hostname (bool): Enable hostname verification when verify is true. 

304 """ 

305 

306 verify: bool = Field(default=True, description="Verify the upstream server certificate") 

307 check_hostname: bool = Field(default=True, description="Enable hostname verification") 

308 

309 @classmethod 

310 def from_env(cls) -> Optional["MCPClientTLSConfig"]: 

311 """Construct client TLS configuration from PLUGINS_CLIENT_* environment variables. 

312 

313 Returns: 

314 MCPClientTLSConfig instance or None if no environment variables are set. 

315 """ 

316 

317 env = os.environ 

318 data: dict[str, Any] = {} 

319 

320 if env.get("PLUGINS_CLIENT_MTLS_CERTFILE"): 

321 data["certfile"] = env["PLUGINS_CLIENT_MTLS_CERTFILE"] 

322 if env.get("PLUGINS_CLIENT_MTLS_KEYFILE"): 

323 data["keyfile"] = env["PLUGINS_CLIENT_MTLS_KEYFILE"] 

324 if env.get("PLUGINS_CLIENT_MTLS_CA_BUNDLE"): 

325 data["ca_bundle"] = env["PLUGINS_CLIENT_MTLS_CA_BUNDLE"] 

326 if env.get("PLUGINS_CLIENT_MTLS_KEYFILE_PASSWORD") is not None: 

327 data["keyfile_password"] = env["PLUGINS_CLIENT_MTLS_KEYFILE_PASSWORD"] 

328 

329 verify_val = cls._parse_bool(env.get("PLUGINS_CLIENT_MTLS_VERIFY")) 

330 if verify_val is not None: 

331 data["verify"] = verify_val 

332 

333 check_hostname_val = cls._parse_bool(env.get("PLUGINS_CLIENT_MTLS_CHECK_HOSTNAME")) 

334 if check_hostname_val is not None: 

335 data["check_hostname"] = check_hostname_val 

336 

337 if not data: 

338 return None 

339 

340 return cls(**data) 

341 

342 

343class MCPServerTLSConfig(MCPTransportTLSConfigBase): 

344 """Server-side TLS configuration (plugin accepting gateway connections). 

345 

346 Attributes: 

347 ssl_cert_reqs (int): Client certificate requirement (0=NONE, 1=OPTIONAL, 2=REQUIRED). 

348 """ 

349 

350 ssl_cert_reqs: int = Field(default=2, description="Client certificate requirement (0=NONE, 1=OPTIONAL, 2=REQUIRED)") 

351 

352 @classmethod 

353 def from_env(cls) -> Optional["MCPServerTLSConfig"]: 

354 """Construct server TLS configuration from PLUGINS_SERVER_SSL_* environment variables. 

355 

356 Returns: 

357 MCPServerTLSConfig instance or None if no environment variables are set. 

358 

359 Raises: 

360 ValueError: If PLUGINS_SERVER_SSL_CERT_REQS is not a valid integer. 

361 """ 

362 

363 env = os.environ 

364 data: dict[str, Any] = {} 

365 

366 if env.get("PLUGINS_SERVER_SSL_KEYFILE"): 

367 data["keyfile"] = env["PLUGINS_SERVER_SSL_KEYFILE"] 

368 if env.get("PLUGINS_SERVER_SSL_CERTFILE"): 

369 data["certfile"] = env["PLUGINS_SERVER_SSL_CERTFILE"] 

370 if env.get("PLUGINS_SERVER_SSL_CA_CERTS"): 

371 data["ca_bundle"] = env["PLUGINS_SERVER_SSL_CA_CERTS"] 

372 if env.get("PLUGINS_SERVER_SSL_KEYFILE_PASSWORD") is not None: 

373 data["keyfile_password"] = env["PLUGINS_SERVER_SSL_KEYFILE_PASSWORD"] 

374 

375 if env.get("PLUGINS_SERVER_SSL_CERT_REQS"): 

376 try: 

377 data["ssl_cert_reqs"] = int(env["PLUGINS_SERVER_SSL_CERT_REQS"]) 

378 except ValueError: 

379 raise ValueError(f"Invalid PLUGINS_SERVER_SSL_CERT_REQS: {env['PLUGINS_SERVER_SSL_CERT_REQS']}") 

380 

381 if not data: 

382 return None 

383 

384 return cls(**data) 

385 

386 

387class MCPServerConfig(BaseModel): 

388 """Server-side MCP configuration (plugin running as server). 

389 

390 Attributes: 

391 host (str): Server host to bind to. 

392 port (int): Server port to bind to. 

393 uds (Optional[str]): Unix domain socket path for streamable HTTP. 

394 tls (Optional[MCPServerTLSConfig]): Server-side TLS configuration. 

395 """ 

396 

397 host: str = Field(default="127.0.0.1", description="Server host to bind to") 

398 port: int = Field(default=8000, description="Server port to bind to") 

399 uds: Optional[str] = Field(default=None, description="Unix domain socket path for streamable HTTP") 

400 tls: Optional[MCPServerTLSConfig] = Field(default=None, description="Server-side TLS configuration") 

401 

402 @field_validator("uds", mode="after") 

403 @classmethod 

404 def validate_uds(cls, uds: str | None) -> str | None: 

405 """Validate the Unix domain socket path for security. 

406 

407 Args: 

408 uds: Unix domain socket path. 

409 

410 Returns: 

411 The validated canonical uds path or None if none is set. 

412 

413 Raises: 

414 ValueError: if uds is empty, not absolute, or parent directory is invalid. 

415 """ 

416 if uds is None: 

417 return uds 

418 if not isinstance(uds, str) or not uds.strip(): 

419 raise ValueError("MCP server uds must be a non-empty string.") 

420 

421 uds_path = Path(uds).expanduser().resolve() 

422 if not uds_path.is_absolute(): 

423 raise ValueError(f"MCP server uds path must be absolute: {uds}") 

424 

425 parent_dir = uds_path.parent 

426 if not parent_dir.is_dir(): 

427 raise ValueError(f"MCP server uds parent directory does not exist: {parent_dir}") 

428 

429 # Check parent directory permissions for security 

430 try: 

431 parent_mode = parent_dir.stat().st_mode 

432 # Warn if parent directory is world-writable (o+w = 0o002) 

433 if parent_mode & 0o002: 

434 logging.getLogger(__name__).warning( 

435 "MCP server uds parent directory %s is world-writable. This may allow unauthorized socket hijacking. Consider using a directory with restricted permissions (e.g., 0o700).", 

436 parent_dir, 

437 ) 

438 except OSError: 

439 pass # Best effort - continue if we can't check permissions 

440 

441 return str(uds_path) 

442 

443 @model_validator(mode="after") 

444 def validate_uds_tls(self) -> Self: # pylint: disable=bad-classmethod-argument 

445 """Ensure TLS is not configured when using a Unix domain socket. 

446 

447 Returns: 

448 Self after validation. 

449 

450 Raises: 

451 ValueError: if tls is set with uds. 

452 """ 

453 if self.uds and self.tls: 

454 raise ValueError("TLS configuration is not supported for Unix domain sockets.") 

455 return self 

456 

457 @staticmethod 

458 def _parse_bool(value: Optional[str]) -> Optional[bool]: 

459 """Convert a string environment value to boolean. 

460 

461 Args: 

462 value: String value to parse as boolean. 

463 

464 Returns: 

465 Boolean value or None if value is None. 

466 

467 Raises: 

468 ValueError: If value is not a valid boolean string. 

469 """ 

470 

471 if value is None: 

472 return None 

473 normalized = value.strip().lower() 

474 if normalized in {"1", "true", "yes", "on"}: 

475 return True 

476 if normalized in {"0", "false", "no", "off"}: 

477 return False 

478 raise ValueError(f"Invalid boolean value: {value}") 

479 

480 @classmethod 

481 def from_env(cls) -> Optional["MCPServerConfig"]: 

482 """Construct server configuration from PLUGINS_SERVER_* environment variables. 

483 

484 Returns: 

485 MCPServerConfig instance or None if no environment variables are set. 

486 

487 Raises: 

488 ValueError: If PLUGINS_SERVER_PORT is not a valid integer. 

489 """ 

490 

491 env = os.environ 

492 data: dict[str, Any] = {} 

493 

494 if env.get("PLUGINS_SERVER_HOST"): 

495 data["host"] = env["PLUGINS_SERVER_HOST"] 

496 if env.get("PLUGINS_SERVER_PORT"): 

497 try: 

498 data["port"] = int(env["PLUGINS_SERVER_PORT"]) 

499 except ValueError: 

500 raise ValueError(f"Invalid PLUGINS_SERVER_PORT: {env['PLUGINS_SERVER_PORT']}") 

501 if env.get("PLUGINS_SERVER_UDS"): 

502 data["uds"] = env["PLUGINS_SERVER_UDS"] 

503 

504 # Check if SSL/TLS is enabled 

505 ssl_enabled = cls._parse_bool(env.get("PLUGINS_SERVER_SSL_ENABLED")) 

506 if ssl_enabled: 

507 # Load TLS configuration 

508 tls_config = MCPServerTLSConfig.from_env() 

509 if tls_config: 

510 data["tls"] = tls_config 

511 

512 if not data: 

513 return None 

514 

515 return cls(**data) 

516 

517 

518class MCPClientConfig(BaseModel): 

519 """Client-side MCP configuration (gateway connecting to external plugin). 

520 

521 Attributes: 

522 proto (TransportType): The MCP transport type. Can be SSE, STDIO, or STREAMABLEHTTP 

523 url (Optional[str]): An MCP URL. Only valid when MCP transport type is SSE or STREAMABLEHTTP. 

524 script (Optional[str]): The path and name to the STDIO script that runs the plugin server. Only valid for STDIO type. 

525 cmd (Optional[list[str]]): Command + args used to start a STDIO MCP server. Only valid for STDIO type. 

526 env (Optional[dict[str, str]]): Environment overrides for STDIO server process. 

527 cwd (Optional[str]): Working directory for STDIO server process. 

528 uds (Optional[str]): Unix domain socket path for streamable HTTP. 

529 tls (Optional[MCPClientTLSConfig]): Client-side TLS configuration for mTLS. 

530 """ 

531 

532 proto: TransportType 

533 url: Optional[str] = None 

534 script: Optional[str] = None 

535 cmd: Optional[list[str]] = None 

536 env: Optional[dict[str, str]] = None 

537 cwd: Optional[str] = None 

538 uds: Optional[str] = None 

539 tls: Optional[MCPClientTLSConfig] = None 

540 

541 @field_validator(URL, mode="after") 

542 @classmethod 

543 def validate_url(cls, url: str | None) -> str | None: 

544 """Validate a MCP url for streamable HTTP connections. 

545 

546 Args: 

547 url: the url to be validated. 

548 

549 Raises: 

550 ValueError: if the URL fails validation. 

551 

552 Returns: 

553 The validated URL or None if none is set. 

554 """ 

555 if url: 

556 result = SecurityValidator.validate_url(url) 

557 return result 

558 return url 

559 

560 @field_validator(SCRIPT, mode="after") 

561 @classmethod 

562 def validate_script(cls, script: str | None) -> str | None: 

563 """Validate an MCP stdio script. 

564 

565 Args: 

566 script: the script to be validated. 

567 

568 Raises: 

569 ValueError: if the script doesn't exist or isn't executable when required. 

570 

571 Returns: 

572 The validated string or None if none is set. 

573 """ 

574 if script: 

575 file_path = Path(script).expanduser() 

576 # Allow relative paths; they are resolved at runtime (optionally using cwd). 

577 if file_path.is_absolute(): 

578 if not file_path.is_file(): 

579 raise ValueError(f"MCP server script {script} does not exist.") 

580 # Allow Python (.py) and shell scripts (.sh). Other files must be executable. 

581 if file_path.suffix not in {PYTHON_SUFFIX, ".sh"} and not os.access(file_path, os.X_OK): 

582 raise ValueError(f"MCP server script {script} must be executable.") 

583 return script 

584 

585 @field_validator(CMD, mode="after") 

586 @classmethod 

587 def validate_cmd(cls, cmd: list[str] | None) -> list[str] | None: 

588 """Validate an MCP stdio command. 

589 

590 Args: 

591 cmd: the command to be validated. 

592 

593 Raises: 

594 ValueError: if cmd is empty or contains empty values. 

595 

596 Returns: 

597 The validated command list or None if none is set. 

598 """ 

599 if cmd is None: 

600 return cmd 

601 if not isinstance(cmd, list) or not cmd: 

602 raise ValueError("MCP stdio cmd must be a non-empty list.") 

603 if not all(isinstance(part, str) and part.strip() for part in cmd): 

604 raise ValueError("MCP stdio cmd entries must be non-empty strings.") 

605 return cmd 

606 

607 @field_validator(ENV, mode="after") 

608 @classmethod 

609 def validate_env(cls, env: dict[str, str] | None) -> dict[str, str] | None: 

610 """Validate environment overrides for MCP stdio. 

611 

612 Args: 

613 env: Environment overrides to set for the stdio plugin process. 

614 

615 Returns: 

616 The validated environment dict or None if none is set. 

617 

618 Raises: 

619 ValueError: if keys/values are invalid or the dict is empty. 

620 """ 

621 if env is None: 

622 return env 

623 if not isinstance(env, dict) or not env: 

624 raise ValueError("MCP stdio env must be a non-empty dict.") 

625 for key, value in env.items(): 

626 if not isinstance(key, str) or not key.strip(): 

627 raise ValueError("MCP stdio env keys must be non-empty strings.") 

628 if not isinstance(value, str): 

629 raise ValueError("MCP stdio env values must be strings.") 

630 return env 

631 

632 @field_validator(CWD, mode="after") 

633 @classmethod 

634 def validate_cwd(cls, cwd: str | None) -> str | None: 

635 """Validate the working directory for MCP stdio. 

636 

637 Args: 

638 cwd: Working directory for the stdio plugin process. 

639 

640 Returns: 

641 The validated canonical cwd path or None if none is set. 

642 

643 Raises: 

644 ValueError: if cwd does not exist or is not a directory. 

645 """ 

646 if not cwd: 

647 return cwd 

648 cwd_path = Path(cwd).expanduser().resolve() 

649 if not cwd_path.is_dir(): 

650 raise ValueError(f"MCP stdio cwd {cwd} does not exist or is not a directory.") 

651 return str(cwd_path) 

652 

653 @field_validator(UDS, mode="after") 

654 @classmethod 

655 def validate_uds(cls, uds: str | None) -> str | None: 

656 """Validate a Unix domain socket path for streamable HTTP. 

657 

658 Args: 

659 uds: Unix domain socket path. 

660 

661 Returns: 

662 The validated canonical uds path or None if none is set. 

663 

664 Raises: 

665 ValueError: if uds is empty, not absolute, or parent directory is invalid. 

666 """ 

667 if uds is None: 

668 return uds 

669 if not isinstance(uds, str) or not uds.strip(): 

670 raise ValueError("MCP client uds must be a non-empty string.") 

671 

672 uds_path = Path(uds).expanduser().resolve() 

673 if not uds_path.is_absolute(): 

674 raise ValueError(f"MCP client uds path must be absolute: {uds}") 

675 

676 parent_dir = uds_path.parent 

677 if not parent_dir.is_dir(): 

678 raise ValueError(f"MCP client uds parent directory does not exist: {parent_dir}") 

679 

680 # Check parent directory permissions for security 

681 try: 

682 parent_mode = parent_dir.stat().st_mode 

683 # Warn if parent directory is world-writable (o+w = 0o002) 

684 if parent_mode & 0o002: 

685 logging.getLogger(__name__).warning( 

686 "MCP client uds parent directory %s is world-writable. This may allow unauthorized socket hijacking. Consider using a directory with restricted permissions (e.g., 0o700).", 

687 parent_dir, 

688 ) 

689 except OSError: 

690 pass # Best effort - continue if we can't check permissions 

691 

692 return str(uds_path) 

693 

694 @model_validator(mode="after") 

695 def validate_tls_usage(self) -> Self: # pylint: disable=bad-classmethod-argument 

696 """Ensure TLS configuration is only used with HTTP-based transports. 

697 

698 Returns: 

699 Self after validation. 

700 

701 Raises: 

702 ValueError: If TLS configuration is used with non-HTTP transports. 

703 """ 

704 

705 if self.tls and self.proto not in (TransportType.SSE, TransportType.STREAMABLEHTTP): 

706 raise ValueError("TLS configuration is only valid for HTTP/SSE transports") 

707 if self.uds and self.tls: 

708 raise ValueError("TLS configuration is not supported for Unix domain sockets.") 

709 return self 

710 

711 @model_validator(mode="after") 

712 def validate_transport_fields(self) -> Self: # pylint: disable=bad-classmethod-argument 

713 """Ensure transport-specific fields are only used with matching transports. 

714 

715 Returns: 

716 Self after validation. 

717 

718 Raises: 

719 ValueError: if fields are incompatible with the selected transport. 

720 """ 

721 if self.proto == TransportType.STDIO and self.url: 

722 raise ValueError("URL is only valid for HTTP/SSE transports") 

723 if self.proto != TransportType.STDIO and (self.script or self.cmd or self.env or self.cwd): 

724 raise ValueError("script/cmd/env/cwd are only valid for STDIO transport") 

725 if self.proto != TransportType.STREAMABLEHTTP and self.uds: 

726 raise ValueError("uds is only valid for STREAMABLEHTTP transport") 

727 return self 

728 

729 

730class GRPCClientTLSConfig(MCPTransportTLSConfigBase): 

731 """Client-side gRPC TLS configuration (gateway connecting to plugin). 

732 

733 Attributes: 

734 verify (bool): Whether to verify the remote server certificate. 

735 """ 

736 

737 verify: bool = Field(default=True, description="Verify the upstream server certificate") 

738 

739 @classmethod 

740 def from_env(cls) -> Optional["GRPCClientTLSConfig"]: 

741 """Construct gRPC client TLS configuration from PLUGINS_GRPC_CLIENT_* environment variables. 

742 

743 Returns: 

744 GRPCClientTLSConfig instance or None if no environment variables are set. 

745 """ 

746 env = os.environ 

747 data: dict[str, Any] = {} 

748 

749 if env.get("PLUGINS_GRPC_CLIENT_MTLS_CERTFILE"): 

750 data["certfile"] = env["PLUGINS_GRPC_CLIENT_MTLS_CERTFILE"] 

751 if env.get("PLUGINS_GRPC_CLIENT_MTLS_KEYFILE"): 

752 data["keyfile"] = env["PLUGINS_GRPC_CLIENT_MTLS_KEYFILE"] 

753 if env.get("PLUGINS_GRPC_CLIENT_MTLS_CA_BUNDLE"): 

754 data["ca_bundle"] = env["PLUGINS_GRPC_CLIENT_MTLS_CA_BUNDLE"] 

755 if env.get("PLUGINS_GRPC_CLIENT_MTLS_KEYFILE_PASSWORD") is not None: 

756 data["keyfile_password"] = env["PLUGINS_GRPC_CLIENT_MTLS_KEYFILE_PASSWORD"] 

757 

758 verify_val = cls._parse_bool(env.get("PLUGINS_GRPC_CLIENT_MTLS_VERIFY")) 

759 if verify_val is not None: 

760 data["verify"] = verify_val 

761 

762 if not data: 

763 return None 

764 

765 return cls(**data) 

766 

767 

768class GRPCServerTLSConfig(MCPTransportTLSConfigBase): 

769 """Server-side gRPC TLS configuration (plugin accepting gateway connections). 

770 

771 Attributes: 

772 client_auth (str): Client certificate requirement ('none', 'optional', 'require'). 

773 """ 

774 

775 client_auth: str = Field(default="require", description="Client certificate requirement (none, optional, require)") 

776 

777 @field_validator("client_auth", mode="after") 

778 @classmethod 

779 def validate_client_auth(cls, value: str) -> str: 

780 """Validate client_auth value. 

781 

782 Args: 

783 value: Client auth requirement string. 

784 

785 Returns: 

786 Validated client auth string. 

787 

788 Raises: 

789 ValueError: If client_auth is not a valid value. 

790 """ 

791 valid_values = {"none", "optional", "require"} 

792 if value.lower() not in valid_values: 

793 raise ValueError(f"client_auth must be one of {valid_values}, got '{value}'") 

794 return value.lower() 

795 

796 @classmethod 

797 def from_env(cls) -> Optional["GRPCServerTLSConfig"]: 

798 """Construct gRPC server TLS configuration from PLUGINS_GRPC_SERVER_SSL_* environment variables. 

799 

800 Returns: 

801 GRPCServerTLSConfig instance or None if no environment variables are set. 

802 """ 

803 env = os.environ 

804 data: dict[str, Any] = {} 

805 

806 if env.get("PLUGINS_GRPC_SERVER_SSL_KEYFILE"): 

807 data["keyfile"] = env["PLUGINS_GRPC_SERVER_SSL_KEYFILE"] 

808 if env.get("PLUGINS_GRPC_SERVER_SSL_CERTFILE"): 

809 data["certfile"] = env["PLUGINS_GRPC_SERVER_SSL_CERTFILE"] 

810 if env.get("PLUGINS_GRPC_SERVER_SSL_CA_CERTS"): 

811 data["ca_bundle"] = env["PLUGINS_GRPC_SERVER_SSL_CA_CERTS"] 

812 if env.get("PLUGINS_GRPC_SERVER_SSL_KEYFILE_PASSWORD") is not None: 

813 data["keyfile_password"] = env["PLUGINS_GRPC_SERVER_SSL_KEYFILE_PASSWORD"] 

814 if env.get("PLUGINS_GRPC_SERVER_SSL_CLIENT_AUTH"): 

815 data["client_auth"] = env["PLUGINS_GRPC_SERVER_SSL_CLIENT_AUTH"] 

816 

817 if not data: 

818 return None 

819 

820 return cls(**data) 

821 

822 

823class GRPCClientConfig(BaseModel): 

824 """Client-side gRPC configuration (gateway connecting to external plugin). 

825 

826 Attributes: 

827 target (Optional[str]): The gRPC target address in host:port format. 

828 uds (Optional[str]): Unix domain socket path (alternative to target). 

829 tls (Optional[GRPCClientTLSConfig]): Client-side TLS configuration for mTLS. 

830 

831 Examples: 

832 >>> # TCP connection 

833 >>> config = GRPCClientConfig(target="localhost:50051") 

834 >>> config.get_target() 

835 'localhost:50051' 

836 >>> # Unix domain socket connection (path is resolved to canonical form) 

837 >>> config = GRPCClientConfig(uds="/tmp/grpc-plugin.sock") # doctest: +SKIP 

838 >>> config.get_target() # doctest: +SKIP 

839 'unix:///tmp/grpc-plugin.sock' 

840 """ 

841 

842 target: Optional[str] = Field(default=None, description="gRPC target address (host:port)") 

843 uds: Optional[str] = Field(default=None, description="Unix domain socket path") 

844 tls: Optional[GRPCClientTLSConfig] = None 

845 

846 @field_validator("target", mode="after") 

847 @classmethod 

848 def validate_target(cls, target: str | None) -> str | None: 

849 """Validate gRPC target address format. 

850 

851 Args: 

852 target: The target address to validate. 

853 

854 Returns: 

855 The validated target address. 

856 

857 Raises: 

858 ValueError: If target is not in host:port format. 

859 """ 

860 if target is None: 

861 return target 

862 if not target: 

863 raise ValueError("gRPC target address cannot be empty") 

864 # Basic validation - should contain host and port 

865 if ":" not in target: 

866 raise ValueError(f"gRPC target must be in host:port format, got '{target}'") 

867 return target 

868 

869 @field_validator("uds", mode="after") 

870 @classmethod 

871 def validate_uds(cls, uds: str | None) -> str | None: 

872 """Validate Unix domain socket path for gRPC. 

873 

874 Args: 

875 uds: Unix domain socket path. 

876 

877 Returns: 

878 The validated canonical uds path or None if none is set. 

879 

880 Raises: 

881 ValueError: if uds is empty, not absolute, or parent directory is invalid. 

882 """ 

883 if uds is None: 

884 return uds 

885 if not isinstance(uds, str) or not uds.strip(): 

886 raise ValueError("gRPC client uds must be a non-empty string.") 

887 

888 uds_path = Path(uds).expanduser().resolve() 

889 if not uds_path.is_absolute(): 

890 raise ValueError(f"gRPC client uds path must be absolute: {uds}") 

891 

892 parent_dir = uds_path.parent 

893 if not parent_dir.is_dir(): 

894 raise ValueError(f"gRPC client uds parent directory does not exist: {parent_dir}") 

895 

896 # Check parent directory permissions for security 

897 try: 

898 parent_mode = parent_dir.stat().st_mode 

899 if parent_mode & 0o002: 

900 logging.getLogger(__name__).warning( 

901 "gRPC client uds parent directory %s is world-writable. Consider using a directory with restricted permissions.", 

902 parent_dir, 

903 ) 

904 except OSError: 

905 pass 

906 

907 return str(uds_path) 

908 

909 @model_validator(mode="after") 

910 def validate_target_or_uds(self) -> Self: # pylint: disable=bad-classmethod-argument 

911 """Ensure exactly one of target or uds is configured. 

912 

913 Returns: 

914 Self after validation. 

915 

916 Raises: 

917 ValueError: If neither or both target and uds are set. 

918 """ 

919 has_target = self.target is not None 

920 has_uds = self.uds is not None 

921 

922 if not has_target and not has_uds: 

923 raise ValueError("gRPC client must have either 'target' or 'uds' configured") 

924 if has_target and has_uds: 

925 raise ValueError("gRPC client cannot have both 'target' and 'uds' configured") 

926 if has_uds and self.tls: 

927 raise ValueError("TLS configuration is not supported for Unix domain sockets") 

928 return self 

929 

930 def get_target(self) -> str: 

931 """Get the gRPC target string for channel creation. 

932 

933 Returns: 

934 str: The target string, either host:port or unix:///path format. 

935 """ 

936 if self.uds: 

937 return f"unix://{self.uds}" 

938 return self.target or "" 

939 

940 

941class GRPCServerConfig(BaseModel): 

942 """Server-side gRPC configuration (plugin running as gRPC server). 

943 

944 Attributes: 

945 host (str): Server host to bind to. 

946 port (int): Server port to bind to. 

947 uds (Optional[str]): Unix domain socket path (alternative to host:port). 

948 tls (Optional[GRPCServerTLSConfig]): Server-side TLS configuration. 

949 

950 Examples: 

951 >>> # TCP binding 

952 >>> config = GRPCServerConfig(host="0.0.0.0", port=50051) 

953 >>> config.get_bind_address() 

954 '0.0.0.0:50051' 

955 >>> # Unix domain socket binding (path is resolved to canonical form) 

956 >>> config = GRPCServerConfig(uds="/tmp/grpc-plugin.sock") # doctest: +SKIP 

957 >>> config.get_bind_address() # doctest: +SKIP 

958 'unix:///tmp/grpc-plugin.sock' 

959 """ 

960 

961 host: str = Field(default="127.0.0.1", description="Server host to bind to") 

962 port: int = Field(default=50051, description="Server port to bind to") 

963 uds: Optional[str] = Field(default=None, description="Unix domain socket path") 

964 tls: Optional[GRPCServerTLSConfig] = Field(default=None, description="Server-side TLS configuration") 

965 

966 @field_validator("uds", mode="after") 

967 @classmethod 

968 def validate_uds(cls, uds: str | None) -> str | None: 

969 """Validate Unix domain socket path for gRPC server. 

970 

971 Args: 

972 uds: Unix domain socket path. 

973 

974 Returns: 

975 The validated canonical uds path or None if none is set. 

976 

977 Raises: 

978 ValueError: if uds is empty, not absolute, or parent directory is invalid. 

979 """ 

980 if uds is None: 

981 return uds 

982 if not isinstance(uds, str) or not uds.strip(): 

983 raise ValueError("gRPC server uds must be a non-empty string.") 

984 

985 uds_path = Path(uds).expanduser().resolve() 

986 if not uds_path.is_absolute(): 

987 raise ValueError(f"gRPC server uds path must be absolute: {uds}") 

988 

989 parent_dir = uds_path.parent 

990 if not parent_dir.is_dir(): 

991 raise ValueError(f"gRPC server uds parent directory does not exist: {parent_dir}") 

992 

993 # Check parent directory permissions for security 

994 try: 

995 parent_mode = parent_dir.stat().st_mode 

996 if parent_mode & 0o002: 

997 logging.getLogger(__name__).warning( 

998 "gRPC server uds parent directory %s is world-writable. Consider using a directory with restricted permissions.", 

999 parent_dir, 

1000 ) 

1001 except OSError: 

1002 pass 

1003 

1004 return str(uds_path) 

1005 

1006 @model_validator(mode="after") 

1007 def validate_uds_tls(self) -> Self: # pylint: disable=bad-classmethod-argument 

1008 """Ensure TLS is not configured when using a Unix domain socket. 

1009 

1010 Returns: 

1011 Self after validation. 

1012 

1013 Raises: 

1014 ValueError: if tls is set with uds. 

1015 """ 

1016 if self.uds and self.tls: 

1017 raise ValueError("TLS configuration is not supported for Unix domain sockets") 

1018 return self 

1019 

1020 def get_bind_address(self) -> str: 

1021 """Get the gRPC bind address string. 

1022 

1023 Returns: 

1024 str: The bind address, either host:port or unix:///path format. 

1025 """ 

1026 if self.uds: 

1027 return f"unix://{self.uds}" 

1028 return f"{self.host}:{self.port}" 

1029 

1030 @classmethod 

1031 def from_env(cls) -> Optional["GRPCServerConfig"]: 

1032 """Construct gRPC server configuration from PLUGINS_GRPC_SERVER_* environment variables. 

1033 

1034 Returns: 

1035 GRPCServerConfig instance or None if no environment variables are set. 

1036 

1037 Raises: 

1038 ValueError: If PLUGINS_GRPC_SERVER_PORT is not a valid integer. 

1039 """ 

1040 env = os.environ 

1041 data: dict[str, Any] = {} 

1042 

1043 if env.get("PLUGINS_GRPC_SERVER_HOST"): 

1044 data["host"] = env["PLUGINS_GRPC_SERVER_HOST"] 

1045 if env.get("PLUGINS_GRPC_SERVER_PORT"): 

1046 try: 

1047 data["port"] = int(env["PLUGINS_GRPC_SERVER_PORT"]) 

1048 except ValueError: 

1049 raise ValueError(f"Invalid PLUGINS_GRPC_SERVER_PORT: {env['PLUGINS_GRPC_SERVER_PORT']}") 

1050 if env.get("PLUGINS_GRPC_SERVER_UDS"): 

1051 data["uds"] = env["PLUGINS_GRPC_SERVER_UDS"] 

1052 

1053 # Check if SSL/TLS is enabled 

1054 ssl_enabled_str = env.get("PLUGINS_GRPC_SERVER_SSL_ENABLED", "").lower() 

1055 if ssl_enabled_str in {"1", "true", "yes", "on"}: 

1056 tls_config = GRPCServerTLSConfig.from_env() 

1057 if tls_config: 

1058 data["tls"] = tls_config 

1059 

1060 if not data: 

1061 return None 

1062 

1063 return cls(**data) 

1064 

1065 

1066class UnixSocketClientConfig(BaseModel): 

1067 """Client-side Unix socket configuration (gateway connecting to external plugin). 

1068 

1069 Attributes: 

1070 path (str): Path to the Unix domain socket file. 

1071 reconnect_attempts (int): Number of reconnection attempts on failure. 

1072 reconnect_delay (float): Base delay between reconnection attempts (with exponential backoff). 

1073 timeout (float): Timeout for read operations in seconds. 

1074 

1075 Examples: 

1076 >>> config = UnixSocketClientConfig(path="/tmp/plugin.sock") 

1077 >>> config.path 

1078 '/tmp/plugin.sock' 

1079 >>> config.reconnect_attempts 

1080 3 

1081 """ 

1082 

1083 path: str = Field(..., description="Path to the Unix domain socket") 

1084 reconnect_attempts: int = Field(default=3, description="Number of reconnection attempts") 

1085 reconnect_delay: float = Field(default=0.1, description="Base delay between reconnection attempts (seconds)") 

1086 timeout: float = Field(default=30.0, description="Read timeout in seconds") 

1087 

1088 @field_validator("path", mode="after") 

1089 @classmethod 

1090 def validate_path(cls, path: str) -> str: 

1091 """Validate Unix socket path. 

1092 

1093 Args: 

1094 path: The socket path to validate. 

1095 

1096 Returns: 

1097 The validated path. 

1098 

1099 Raises: 

1100 ValueError: If path is empty or invalid. 

1101 """ 

1102 if not path: 

1103 raise ValueError("Unix socket path cannot be empty") 

1104 if not path.startswith("/"): 

1105 raise ValueError(f"Unix socket path must be absolute, got '{path}'") 

1106 return path 

1107 

1108 

1109class UnixSocketServerConfig(BaseModel): 

1110 """Server-side Unix socket configuration (plugin running as Unix socket server). 

1111 

1112 Attributes: 

1113 path (str): Path to the Unix domain socket file. 

1114 

1115 Examples: 

1116 >>> config = UnixSocketServerConfig(path="/tmp/plugin.sock") 

1117 >>> config.path 

1118 '/tmp/plugin.sock' 

1119 """ 

1120 

1121 path: str = Field(default="/tmp/mcpgateway-plugins.sock", description="Path to the Unix domain socket") # nosec B108 - configurable default 

1122 

1123 @classmethod 

1124 def from_env(cls) -> Optional["UnixSocketServerConfig"]: 

1125 """Construct Unix socket server configuration from environment variables. 

1126 

1127 Returns: 

1128 UnixSocketServerConfig instance or None if no environment variables are set. 

1129 """ 

1130 env = os.environ 

1131 data: dict[str, Any] = {} 

1132 

1133 if env.get("UNIX_SOCKET_PATH"): 

1134 data["path"] = env["UNIX_SOCKET_PATH"] 

1135 

1136 if not data: 

1137 return None 

1138 

1139 return cls(**data) 

1140 

1141 

1142class PluginConfig(BaseModel): 

1143 """A plugin configuration. 

1144 

1145 Attributes: 

1146 name (str): The unique name of the plugin. 

1147 description (str): A description of the plugin. 

1148 author (str): The author of the plugin. 

1149 kind (str): The kind or type of plugin. Usually a fully qualified object type. 

1150 namespace (str): The namespace where the plugin resides. 

1151 version (str): version of the plugin. 

1152 hooks (list[str]): a list of the hook points where the plugin will be called. Default: []. 

1153 tags (list[str]): a list of tags for making the plugin searchable. 

1154 mode (bool): whether the plugin is active. 

1155 priority (int): indicates the order in which the plugin is run. Lower = higher priority. Default: 100. 

1156 conditions (Optional[list[PluginCondition]]): the conditions on which the plugin is run. 

1157 applied_to (Optional[list[AppliedTo]]): the tools, fields, that the plugin is applied to. 

1158 config (dict[str, Any]): the plugin specific configurations. 

1159 mcp (Optional[MCPClientConfig]): Client-side MCP configuration (gateway connecting to plugin). 

1160 grpc (Optional[GRPCClientConfig]): Client-side gRPC configuration (gateway connecting to plugin). 

1161 """ 

1162 

1163 name: str 

1164 description: Optional[str] = None 

1165 author: Optional[str] = None 

1166 kind: str 

1167 namespace: Optional[str] = None 

1168 version: Optional[str] = None 

1169 hooks: list[str] = Field(default_factory=list) 

1170 tags: list[str] = Field(default_factory=list) 

1171 mode: PluginMode = PluginMode.ENFORCE 

1172 priority: int = 100 # Lower = higher priority 

1173 conditions: list[PluginCondition] = Field(default_factory=list) # When to apply 

1174 applied_to: Optional[AppliedTo] = None # Fields to apply to. 

1175 config: Optional[dict[str, Any]] = None 

1176 mcp: Optional[MCPClientConfig] = None 

1177 grpc: Optional[GRPCClientConfig] = None 

1178 unix_socket: Optional[UnixSocketClientConfig] = None 

1179 

1180 @model_validator(mode="after") 

1181 def check_url_or_script_filled(self) -> Self: # pylint: disable=bad-classmethod-argument 

1182 """Checks to see that at least one of url or script are set depending on MCP server configuration. 

1183 

1184 Raises: 

1185 ValueError: if the script/cmd attribute is not defined with STDIO set, or the URL not defined with HTTP transports. 

1186 

1187 Returns: 

1188 The model after validation. 

1189 """ 

1190 if not self.mcp: 

1191 return self 

1192 if self.mcp.proto == TransportType.STDIO and not (self.mcp.script or self.mcp.cmd): 

1193 raise ValueError(f"Plugin {self.name} has transport type set to STDIO but no script/cmd value") 

1194 if self.mcp.proto == TransportType.STDIO and self.mcp.script and self.mcp.cmd: 

1195 raise ValueError(f"Plugin {self.name} must set either script or cmd for STDIO, not both") 

1196 if self.mcp.proto in (TransportType.STREAMABLEHTTP, TransportType.SSE) and not self.mcp.url: 

1197 raise ValueError(f"Plugin {self.name} has transport type set to StreamableHTTP but no url value") 

1198 if self.mcp.proto not in (TransportType.SSE, TransportType.STREAMABLEHTTP, TransportType.STDIO): 

1199 raise ValueError(f"Plugin {self.name} must set transport type to either SSE or STREAMABLEHTTP or STDIO") 

1200 return self 

1201 

1202 @model_validator(mode="after") 

1203 def check_config_and_external(self, info: ValidationInfo) -> Self: # pylint: disable=bad-classmethod-argument 

1204 """Checks to see that a plugin's 'config' section is not defined if the kind is 'external'. This is because developers cannot override items in the plugin config section for external plugins. 

1205 

1206 Args: 

1207 info: the contextual information passed into the pydantic model during model validation. Used to determine validation sequence. 

1208 

1209 Raises: 

1210 ValueError: if the script attribute is not defined with STDIO set, or the URL not defined with HTTP transports. 

1211 

1212 Returns: 

1213 The model after validation. 

1214 """ 

1215 ignore_config_external = False 

1216 if info and info.context and IGNORE_CONFIG_EXTERNAL in info.context: 

1217 ignore_config_external = info.context[IGNORE_CONFIG_EXTERNAL] 

1218 

1219 if not ignore_config_external and self.config and self.kind == EXTERNAL_PLUGIN_TYPE: 

1220 raise ValueError(f"""Cannot have {self.name} plugin defined as 'external' with 'config' set.""" """ 'config' section settings can only be set on the plugin server.""") 

1221 

1222 # External plugins must have exactly one transport configured (mcp, grpc, or unix_socket) 

1223 if self.kind == EXTERNAL_PLUGIN_TYPE: 

1224 has_mcp = self.mcp is not None 

1225 has_grpc = self.grpc is not None 

1226 has_unix = self.unix_socket is not None 

1227 transport_count = sum([has_mcp, has_grpc, has_unix]) 

1228 

1229 if transport_count == 0: 

1230 raise ValueError(f"External plugin {self.name} must have 'mcp', 'grpc', or 'unix_socket' section configured") 

1231 if transport_count > 1: 

1232 raise ValueError(f"External plugin {self.name} can only have one transport configured (mcp, grpc, or unix_socket)") 

1233 

1234 return self 

1235 

1236 

1237class PluginManifest(BaseModel): 

1238 """Plugin manifest. 

1239 

1240 Attributes: 

1241 description (str): A description of the plugin. 

1242 author (str): The author of the plugin. 

1243 version (str): version of the plugin. 

1244 tags (list[str]): a list of tags for making the plugin searchable. 

1245 available_hooks (list[str]): a list of the hook points where the plugin is callable. 

1246 default_config (dict[str, Any]): the default configurations. 

1247 """ 

1248 

1249 description: str 

1250 author: str 

1251 version: str 

1252 tags: list[str] 

1253 available_hooks: list[str] 

1254 default_config: dict[str, Any] 

1255 

1256 

1257class PluginErrorModel(BaseModel): 

1258 """A plugin error, used to denote exceptions/errors inside external plugins. 

1259 

1260 Attributes: 

1261 message (str): the reason for the error. 

1262 code (str): an error code. 

1263 details: (dict[str, Any]): additional error details. 

1264 plugin_name (str): the plugin name. 

1265 mcp_error_code ([int]): The MCP error code passed back to the client. Defaults to Internal Error. 

1266 """ 

1267 

1268 message: str 

1269 plugin_name: str 

1270 code: Optional[str] = "" 

1271 details: Optional[dict[str, Any]] = Field(default_factory=dict) 

1272 mcp_error_code: int = -32603 

1273 

1274 

1275class PluginViolation(BaseModel): 

1276 """A plugin violation, used to denote policy violations. 

1277 

1278 Attributes: 

1279 reason (str): the reason for the violation. 

1280 description (str): a longer description of the violation. 

1281 code (str): a violation code. 

1282 details: (dict[str, Any]): additional violation details. 

1283 _plugin_name (str): the plugin name, private attribute set by the plugin manager. 

1284 mcp_error_code(Optional[int]): A valid mcp error code which will be sent back to the client if plugin enabled. 

1285 

1286 Examples: 

1287 >>> violation = PluginViolation( 

1288 ... reason="Invalid input", 

1289 ... description="The input contains prohibited content", 

1290 ... code="PROHIBITED_CONTENT", 

1291 ... details={"field": "message", "value": "test"} 

1292 ... ) 

1293 >>> violation.reason 

1294 'Invalid input' 

1295 >>> violation.code 

1296 'PROHIBITED_CONTENT' 

1297 >>> violation.plugin_name = "content_filter" 

1298 >>> violation.plugin_name 

1299 'content_filter' 

1300 """ 

1301 

1302 reason: str 

1303 description: str 

1304 code: str 

1305 details: Optional[dict[str, Any]] = Field(default_factory=dict) 

1306 _plugin_name: str = PrivateAttr(default="") 

1307 mcp_error_code: Optional[int] = None 

1308 

1309 @property 

1310 def plugin_name(self) -> str: 

1311 """Getter for the plugin name attribute. 

1312 

1313 Returns: 

1314 The plugin name associated with the violation. 

1315 """ 

1316 return self._plugin_name 

1317 

1318 @plugin_name.setter 

1319 def plugin_name(self, name: str) -> None: 

1320 """Setter for the plugin_name attribute. 

1321 

1322 Args: 

1323 name: the plugin name. 

1324 

1325 Raises: 

1326 ValueError: if name is empty or not a string. 

1327 """ 

1328 if not isinstance(name, str) or not name.strip(): 

1329 raise ValueError("Name must be a non-empty string.") 

1330 self._plugin_name = name 

1331 

1332 

1333class PluginSettings(BaseModel): 

1334 """Global plugin settings. 

1335 

1336 Attributes: 

1337 parallel_execution_within_band (bool): execute plugins with same priority in parallel. 

1338 plugin_timeout (int): timeout value for plugins operations. 

1339 fail_on_plugin_error (bool): error when there is a plugin connectivity or ignore. 

1340 enable_plugin_api (bool): enable or disable plugins globally. 

1341 plugin_health_check_interval (int): health check interval check. 

1342 include_user_info (bool): if enabled user info is injected in plugin context 

1343 """ 

1344 

1345 parallel_execution_within_band: bool = False 

1346 plugin_timeout: int = 30 

1347 fail_on_plugin_error: bool = False 

1348 enable_plugin_api: bool = False 

1349 plugin_health_check_interval: int = 60 

1350 include_user_info: bool = False 

1351 

1352 

1353class Config(BaseModel): 

1354 """Configurations for plugins. 

1355 

1356 Attributes: 

1357 plugins (Optional[list[PluginConfig]]): the list of plugins to enable. 

1358 plugin_dirs (list[str]): The directories in which to look for plugins. 

1359 plugin_settings (PluginSettings): global settings for plugins. 

1360 server_settings (Optional[MCPServerConfig]): Server-side MCP configuration (when plugins run as server). 

1361 grpc_server_settings (Optional[GRPCServerConfig]): Server-side gRPC configuration (when plugins run as gRPC server). 

1362 unix_socket_server_settings (Optional[UnixSocketServerConfig]): Server-side Unix socket configuration. 

1363 """ 

1364 

1365 plugins: Optional[list[PluginConfig]] = [] 

1366 plugin_dirs: list[str] = [] 

1367 plugin_settings: PluginSettings 

1368 server_settings: Optional[MCPServerConfig] = None 

1369 grpc_server_settings: Optional[GRPCServerConfig] = None 

1370 unix_socket_server_settings: Optional[UnixSocketServerConfig] = None 

1371 

1372 

1373class PluginResult(BaseModel, Generic[T]): 

1374 """A result of the plugin hook processing. The actual type is dependent on the hook. 

1375 

1376 Attributes: 

1377 continue_processing (bool): Whether to stop processing. 

1378 modified_payload (Optional[Any]): The modified payload if the plugin is a transformer. 

1379 violation (Optional[PluginViolation]): violation object. 

1380 metadata (Optional[dict[str, Any]]): additional metadata. 

1381 

1382 Examples: 

1383 >>> result = PluginResult() 

1384 >>> result.continue_processing 

1385 True 

1386 >>> result.metadata 

1387 {} 

1388 >>> from mcpgateway.plugins.framework import PluginViolation 

1389 >>> violation = PluginViolation( 

1390 ... reason="Test", description="Test desc", code="TEST", details={} 

1391 ... ) 

1392 >>> result2 = PluginResult(continue_processing=False, violation=violation) 

1393 >>> result2.continue_processing 

1394 False 

1395 >>> result2.violation.code 

1396 'TEST' 

1397 >>> r = PluginResult(metadata={"key": "value"}) 

1398 >>> r.metadata["key"] 

1399 'value' 

1400 >>> r2 = PluginResult(continue_processing=False) 

1401 >>> r2.continue_processing 

1402 False 

1403 """ 

1404 

1405 continue_processing: bool = True 

1406 modified_payload: Optional[T] = None 

1407 violation: Optional[PluginViolation] = None 

1408 metadata: Optional[dict[str, Any]] = Field(default_factory=dict) 

1409 

1410 

1411class GlobalContext(BaseModel): 

1412 """The global context, which shared across all plugins. 

1413 

1414 Attributes: 

1415 request_id (str): ID of the HTTP request. 

1416 user (str): user ID associated with the request. 

1417 tenant_id (str): tenant ID. 

1418 server_id (str): server ID. 

1419 metadata (Optional[dict[str,Any]]): a global shared metadata across plugins (Read-only from plugin's perspective). 

1420 state (Optional[dict[str,Any]]): a global shared state across plugins. 

1421 

1422 Examples: 

1423 >>> ctx = GlobalContext(request_id="req-123") 

1424 >>> ctx.request_id 

1425 'req-123' 

1426 >>> ctx.user is None 

1427 True 

1428 >>> ctx2 = GlobalContext(request_id="req-456", user="alice", tenant_id="tenant1") 

1429 >>> ctx2.user 

1430 'alice' 

1431 >>> ctx2.tenant_id 

1432 'tenant1' 

1433 >>> c = GlobalContext(request_id="123", server_id="srv1") 

1434 >>> c.request_id 

1435 '123' 

1436 >>> c.server_id 

1437 'srv1' 

1438 """ 

1439 

1440 request_id: str 

1441 user: Optional[Union[str, dict[str, Any]]] = None 

1442 tenant_id: Optional[str] = None 

1443 server_id: Optional[str] = None 

1444 state: dict[str, Any] = Field(default_factory=dict) 

1445 metadata: dict[str, Any] = Field(default_factory=dict) 

1446 

1447 

1448class PluginContext(BaseModel): 

1449 """The plugin's context, which lasts a request lifecycle. 

1450 

1451 Attributes: 

1452 state: the inmemory state of the request. 

1453 global_context: the context that is shared across plugins. 

1454 metadata: plugin meta data. 

1455 

1456 Examples: 

1457 >>> gctx = GlobalContext(request_id="req-123") 

1458 >>> ctx = PluginContext(global_context=gctx) 

1459 >>> ctx.global_context.request_id 

1460 'req-123' 

1461 >>> ctx.global_context.user is None 

1462 True 

1463 >>> ctx.state["somekey"] = "some value" 

1464 >>> ctx.state["somekey"] 

1465 'some value' 

1466 """ 

1467 

1468 state: dict[str, Any] = Field(default_factory=dict) 

1469 global_context: GlobalContext 

1470 metadata: dict[str, Any] = Field(default_factory=dict) 

1471 

1472 def get_state(self, key: str, default: Any = None) -> Any: 

1473 """Get value from shared state. 

1474 

1475 Args: 

1476 key: The key to access the shared state. 

1477 default: A default value if one doesn't exist. 

1478 

1479 Returns: 

1480 The state value. 

1481 """ 

1482 return self.state.get(key, default) 

1483 

1484 def set_state(self, key: str, value: Any) -> None: 

1485 """Set value in shared state. 

1486 

1487 Args: 

1488 key: the key to add to the state. 

1489 value: the value to add to the state. 

1490 """ 

1491 self.state[key] = value 

1492 

1493 async def cleanup(self) -> None: 

1494 """Cleanup context resources.""" 

1495 self.state.clear() 

1496 self.metadata.clear() 

1497 

1498 def is_empty(self) -> bool: 

1499 """Check whether the state and metadata objects are empty. 

1500 

1501 Returns: 

1502 True if the context state and metadata are empty. 

1503 """ 

1504 return not (self.state or self.metadata or self.global_context.state) 

1505 

1506 

1507PluginContextTable = dict[str, PluginContext] 

1508 

1509PluginPayload: TypeAlias = BaseModel