Coverage for mcpgateway / plugins / framework / models.py: 100%
572 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/plugins/framework/models.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Teryl Taylor, Mihai Criveti
7Pydantic models for plugins.
8This module implements the pydantic models associated with
9the base plugin layer including configurations, and contexts.
10"""
12# Standard
13from enum import Enum
14import logging
15import os
16from pathlib import Path
17from typing import Any, Generic, Optional, Self, TypeAlias, TypeVar, Union
19# Third-Party
20from pydantic import BaseModel, Field, field_serializer, field_validator, model_validator, PrivateAttr, ValidationInfo
22# First-Party
23from mcpgateway.common.models import TransportType
24from mcpgateway.common.validators import SecurityValidator
25from mcpgateway.plugins.framework.constants import CMD, CWD, ENV, EXTERNAL_PLUGIN_TYPE, IGNORE_CONFIG_EXTERNAL, PYTHON_SUFFIX, SCRIPT, UDS, URL
27T = TypeVar("T")
class PluginMode(str, Enum):
    """Operating modes that decide how a plugin's outcome is applied.

    Attributes:
        enforce: the plugin result is enforced; execution is blocked when there is an error.
        enforce_ignore_error: the plugin result is enforced; execution continues when there is an error.
        permissive: the result is only audited.
        disabled: the plugin is switched off.

    Examples:
        >>> PluginMode.ENFORCE
        <PluginMode.ENFORCE: 'enforce'>
        >>> PluginMode.ENFORCE_IGNORE_ERROR
        <PluginMode.ENFORCE_IGNORE_ERROR: 'enforce_ignore_error'>
        >>> PluginMode.PERMISSIVE.value
        'permissive'
        >>> PluginMode('disabled')
        <PluginMode.DISABLED: 'disabled'>
        >>> 'enforce' in [m.value for m in PluginMode]
        True
    """

    ENFORCE = "enforce"
    ENFORCE_IGNORE_ERROR = "enforce_ignore_error"
    PERMISSIVE = "permissive"
    DISABLED = "disabled"
class BaseTemplate(BaseModel):
    """Common base for the tool, prompt and resource templates.

    Attributes:
        context (Optional[list[str]]): keys of the context to extract. A key may address the
            global context (shared between the plugins) or the local context (shared within
            the plugin). Example: global.key1.
        extensions (Optional[dict[str, Any]]): custom, plugin-specific keys. Example: a 'policy'
            key for the opa plugin.

    Examples:
        >>> base = BaseTemplate(context=["global.key1.key2", "local.key1.key2"])
        >>> base.context
        ['global.key1.key2', 'local.key1.key2']
        >>> base = BaseTemplate(context=["global.key1.key2"], extensions={"policy" : "sample policy"})
        >>> base.extensions
        {'policy': 'sample policy'}
    """

    context: Optional[list[str]] = None
    extensions: Optional[dict[str, Any]] = None
class ToolTemplate(BaseTemplate):
    """Template describing a tool a plugin applies to.

    Attributes:
        tool_name (str): name of the tool.
        fields (Optional[list[str]]): tool fields that are affected.
        result (bool): when true, the tool output is analyzed.

    Examples:
        >>> tool = ToolTemplate(tool_name="my_tool")
        >>> tool.tool_name
        'my_tool'
        >>> tool.result
        False
        >>> tool2 = ToolTemplate(tool_name="analyzer", fields=["input", "params"], result=True)
        >>> tool2.fields
        ['input', 'params']
        >>> tool2.result
        True
    """

    tool_name: str
    fields: Optional[list[str]] = None
    result: bool = False
class PromptTemplate(BaseTemplate):
    """Prompt Template.

    Attributes:
        prompt_name (str): the name of the prompt.
        fields (Optional[list[str]]): the prompt fields that are affected.
        result (bool): analyze prompt output if true.

    Examples:
        >>> prompt = PromptTemplate(prompt_name="greeting")
        >>> prompt.prompt_name
        'greeting'
        >>> prompt.result
        False
        >>> prompt2 = PromptTemplate(prompt_name="question", fields=["context"], result=True)
        >>> prompt2.fields
        ['context']
    """

    prompt_name: str
    fields: Optional[list[str]] = None
    result: bool = False
class ResourceTemplate(BaseTemplate):
    """Template describing a resource a plugin applies to.

    Attributes:
        resource_uri (str): URI of the resource.
        fields (Optional[list[str]]): resource fields that are affected.
        result (bool): when true, the resource output is analyzed.

    Examples:
        >>> resource = ResourceTemplate(resource_uri="file:///data.txt")
        >>> resource.resource_uri
        'file:///data.txt'
        >>> resource.result
        False
        >>> resource2 = ResourceTemplate(resource_uri="http://api/data", fields=["content"], result=True)
        >>> resource2.fields
        ['content']
    """

    resource_uri: str
    fields: Optional[list[str]] = None
    result: bool = False
class PluginCondition(BaseModel):
    """Conditions for when plugin should execute.

    Attributes:
        server_ids (Optional[set[str]]): set of server ids.
        tenant_ids (Optional[set[str]]): set of tenant ids.
        tools (Optional[set[str]]): set of tool names.
        prompts (Optional[set[str]]): set of prompt names.
        resources (Optional[set[str]]): set of resource URIs.
        agents (Optional[set[str]]): set of agent IDs.
        user_patterns (Optional[list[str]]): list of user patterns.
        content_types (Optional[list[str]]): list of content types.

    Examples:
        >>> cond = PluginCondition(server_ids={"server1", "server2"})
        >>> "server1" in cond.server_ids
        True
        >>> cond2 = PluginCondition(tools={"tool1"}, prompts={"prompt1"})
        >>> cond2.tools
        {'tool1'}
        >>> cond3 = PluginCondition(user_patterns=["admin", "root"])
        >>> len(cond3.user_patterns)
        2
    """

    server_ids: Optional[set[str]] = None
    tenant_ids: Optional[set[str]] = None
    tools: Optional[set[str]] = None
    prompts: Optional[set[str]] = None
    resources: Optional[set[str]] = None
    agents: Optional[set[str]] = None
    user_patterns: Optional[list[str]] = None
    content_types: Optional[list[str]] = None

    @field_serializer("server_ids", "tenant_ids", "tools", "prompts", "resources", "agents")
    def serialize_set(self, value: set[str] | None) -> list[str] | None:
        """Serialize set objects in PluginCondition for MCP.

        Args:
            value: a set of server ids, tenant ids, tools, prompts, resources or agents.

        Returns:
            The set as a serializable list, or None when the set is unset or empty.
        """
        # An empty set is falsy and therefore serialized as None, same as an unset field.
        return list(value) if value else None
class AppliedTo(BaseModel):
    """What tools/prompts/resources and fields the plugin will be applied to.

    Attributes:
        tools (Optional[list[ToolTemplate]]): tools and fields to be applied.
        prompts (Optional[list[PromptTemplate]]): prompts and fields to be applied.
        resources (Optional[list[ResourceTemplate]]): resources and fields to be applied.
    """

    tools: Optional[list[ToolTemplate]] = None
    prompts: Optional[list[PromptTemplate]] = None
    resources: Optional[list[ResourceTemplate]] = None
class MCPTransportTLSConfigBase(BaseModel):
    """TLS settings shared by the client-side and server-side configurations.

    Attributes:
        certfile (Optional[str]): Path to the PEM-encoded certificate file.
        keyfile (Optional[str]): Path to the PEM-encoded private key file.
        ca_bundle (Optional[str]): Path to a CA bundle file for verification.
        keyfile_password (Optional[str]): Optional password for encrypted private key.
    """

    certfile: Optional[str] = Field(default=None, description="Path to PEM certificate file")
    keyfile: Optional[str] = Field(default=None, description="Path to PEM private key file")
    ca_bundle: Optional[str] = Field(default=None, description="Path to CA bundle for verification")
    keyfile_password: Optional[str] = Field(default=None, description="Password for encrypted private key")

    @field_validator("ca_bundle", "certfile", "keyfile", mode="after")
    @classmethod
    def validate_path(cls, value: Optional[str]) -> Optional[str]:
        """Expand ``~`` in a TLS file path and make sure it points at a file.

        Args:
            value: File path to validate.

        Returns:
            The expanded path as a string, or the value unchanged when it is None or empty.

        Raises:
            ValueError: If the expanded path is not an existing file.
        """
        if value:
            candidate = Path(value).expanduser()
            if candidate.is_file():
                return str(candidate)
            raise ValueError(f"TLS file path does not exist: {value}")
        return value

    @model_validator(mode="after")
    def validate_cert_key(self) -> Self:  # pylint: disable=bad-classmethod-argument
        """Check that a private key is never configured without its certificate.

        Returns:
            Self after validation.

        Raises:
            ValueError: If keyfile is specified without certfile.
        """
        key_without_cert = bool(self.keyfile) and not self.certfile
        if key_without_cert:
            raise ValueError("keyfile requires certfile to be specified")
        return self

    @staticmethod
    def _parse_bool(value: Optional[str]) -> Optional[bool]:
        """Interpret a textual environment value as a boolean.

        Args:
            value: String value to parse as boolean.

        Returns:
            True for 1/true/yes/on, False for 0/false/no/off (case-insensitive,
            surrounding whitespace ignored), or None if value is None.

        Raises:
            ValueError: If value is not a valid boolean string.
        """
        if value is None:
            return None

        token = value.strip().lower()
        truthy = ("1", "true", "yes", "on")
        falsy = ("0", "false", "no", "off")
        if token in truthy:
            return True
        if token in falsy:
            return False
        raise ValueError(f"Invalid boolean value: {value}")
class MCPClientTLSConfig(MCPTransportTLSConfigBase):
    """TLS settings used by the gateway when it connects to a plugin.

    Attributes:
        verify (bool): Whether to verify the remote server certificate.
        check_hostname (bool): Enable hostname verification when verify is true.
    """

    verify: bool = Field(default=True, description="Verify the upstream server certificate")
    check_hostname: bool = Field(default=True, description="Enable hostname verification")

    @classmethod
    def from_env(cls) -> Optional["MCPClientTLSConfig"]:
        """Build the client TLS configuration from PLUGINS_CLIENT_* environment variables.

        Returns:
            MCPClientTLSConfig instance or None if no environment variables are set.
        """
        environ = os.environ
        settings: dict[str, Any] = {}

        # Path-like settings are only picked up when the variable is non-empty.
        path_vars = (
            ("certfile", "PLUGINS_CLIENT_MTLS_CERTFILE"),
            ("keyfile", "PLUGINS_CLIENT_MTLS_KEYFILE"),
            ("ca_bundle", "PLUGINS_CLIENT_MTLS_CA_BUNDLE"),
        )
        for field_name, var_name in path_vars:
            if environ.get(var_name):
                settings[field_name] = environ[var_name]

        # The password is taken whenever the variable exists, even if it is empty.
        if environ.get("PLUGINS_CLIENT_MTLS_KEYFILE_PASSWORD") is not None:
            settings["keyfile_password"] = environ["PLUGINS_CLIENT_MTLS_KEYFILE_PASSWORD"]

        flag_vars = (
            ("verify", "PLUGINS_CLIENT_MTLS_VERIFY"),
            ("check_hostname", "PLUGINS_CLIENT_MTLS_CHECK_HOSTNAME"),
        )
        for field_name, var_name in flag_vars:
            flag = cls._parse_bool(environ.get(var_name))
            if flag is not None:
                settings[field_name] = flag

        return cls(**settings) if settings else None
class MCPServerTLSConfig(MCPTransportTLSConfigBase):
    """Server-side TLS configuration (plugin accepting gateway connections).

    Attributes:
        ssl_cert_reqs (int): Client certificate requirement (0=NONE, 1=OPTIONAL, 2=REQUIRED).
    """

    ssl_cert_reqs: int = Field(default=2, description="Client certificate requirement (0=NONE, 1=OPTIONAL, 2=REQUIRED)")

    @classmethod
    def from_env(cls) -> Optional["MCPServerTLSConfig"]:
        """Construct server TLS configuration from PLUGINS_SERVER_SSL_* environment variables.

        Returns:
            MCPServerTLSConfig instance or None if no environment variables are set.

        Raises:
            ValueError: If PLUGINS_SERVER_SSL_CERT_REQS is not a valid integer.
        """
        env = os.environ
        data: dict[str, Any] = {}

        if env.get("PLUGINS_SERVER_SSL_KEYFILE"):
            data["keyfile"] = env["PLUGINS_SERVER_SSL_KEYFILE"]
        if env.get("PLUGINS_SERVER_SSL_CERTFILE"):
            data["certfile"] = env["PLUGINS_SERVER_SSL_CERTFILE"]
        if env.get("PLUGINS_SERVER_SSL_CA_CERTS"):
            data["ca_bundle"] = env["PLUGINS_SERVER_SSL_CA_CERTS"]
        # The password is taken whenever the variable exists, even if it is empty.
        if env.get("PLUGINS_SERVER_SSL_KEYFILE_PASSWORD") is not None:
            data["keyfile_password"] = env["PLUGINS_SERVER_SSL_KEYFILE_PASSWORD"]

        raw_cert_reqs = env.get("PLUGINS_SERVER_SSL_CERT_REQS")
        if raw_cert_reqs:
            try:
                data["ssl_cert_reqs"] = int(raw_cert_reqs)
            except ValueError as exc:
                # Chain explicitly so the original int() failure stays attached as the cause.
                raise ValueError(f"Invalid PLUGINS_SERVER_SSL_CERT_REQS: {raw_cert_reqs}") from exc

        if not data:
            return None

        return cls(**data)
class MCPServerConfig(BaseModel):
    """Server-side MCP configuration (plugin running as server).

    Attributes:
        host (str): Server host to bind to.
        port (int): Server port to bind to.
        uds (Optional[str]): Unix domain socket path for streamable HTTP.
        tls (Optional[MCPServerTLSConfig]): Server-side TLS configuration.
    """

    host: str = Field(default="127.0.0.1", description="Server host to bind to")
    port: int = Field(default=8000, description="Server port to bind to")
    uds: Optional[str] = Field(default=None, description="Unix domain socket path for streamable HTTP")
    tls: Optional[MCPServerTLSConfig] = Field(default=None, description="Server-side TLS configuration")

    @field_validator("uds", mode="after")
    @classmethod
    def validate_uds(cls, uds: str | None) -> str | None:
        """Validate the Unix domain socket path for security.

        Args:
            uds: Unix domain socket path.

        Returns:
            The validated canonical uds path or None if none is set.

        Raises:
            ValueError: if uds is empty, not absolute, or parent directory is invalid.
        """
        if uds is None:
            return uds
        if not isinstance(uds, str) or not uds.strip():
            raise ValueError("MCP server uds must be a non-empty string.")

        # Check absoluteness BEFORE resolve(): resolve() always returns an absolute path
        # (relative input is anchored at the current working directory), so checking
        # afterwards could never reject a relative path.
        expanded = Path(uds).expanduser()
        if not expanded.is_absolute():
            raise ValueError(f"MCP server uds path must be absolute: {uds}")
        uds_path = expanded.resolve()

        parent_dir = uds_path.parent
        if not parent_dir.is_dir():
            raise ValueError(f"MCP server uds parent directory does not exist: {parent_dir}")

        # Check parent directory permissions for security
        try:
            parent_mode = parent_dir.stat().st_mode
            # Warn if parent directory is world-writable (o+w = 0o002)
            if parent_mode & 0o002:
                logging.getLogger(__name__).warning(
                    "MCP server uds parent directory %s is world-writable. This may allow unauthorized socket hijacking. Consider using a directory with restricted permissions (e.g., 0o700).",
                    parent_dir,
                )
        except OSError:
            pass  # Best effort - continue if we can't check permissions

        return str(uds_path)

    @model_validator(mode="after")
    def validate_uds_tls(self) -> Self:  # pylint: disable=bad-classmethod-argument
        """Ensure TLS is not configured when using a Unix domain socket.

        Returns:
            Self after validation.

        Raises:
            ValueError: if tls is set with uds.
        """
        if self.uds and self.tls:
            raise ValueError("TLS configuration is not supported for Unix domain sockets.")
        return self

    @staticmethod
    def _parse_bool(value: Optional[str]) -> Optional[bool]:
        """Convert a string environment value to boolean.

        Args:
            value: String value to parse as boolean.

        Returns:
            Boolean value or None if value is None.

        Raises:
            ValueError: If value is not a valid boolean string.
        """
        if value is None:
            return None
        normalized = value.strip().lower()
        if normalized in {"1", "true", "yes", "on"}:
            return True
        if normalized in {"0", "false", "no", "off"}:
            return False
        raise ValueError(f"Invalid boolean value: {value}")

    @classmethod
    def from_env(cls) -> Optional["MCPServerConfig"]:
        """Construct server configuration from PLUGINS_SERVER_* environment variables.

        Returns:
            MCPServerConfig instance or None if no environment variables are set.

        Raises:
            ValueError: If PLUGINS_SERVER_PORT is not a valid integer.
        """
        env = os.environ
        data: dict[str, Any] = {}

        if env.get("PLUGINS_SERVER_HOST"):
            data["host"] = env["PLUGINS_SERVER_HOST"]

        raw_port = env.get("PLUGINS_SERVER_PORT")
        if raw_port:
            try:
                data["port"] = int(raw_port)
            except ValueError as exc:
                # Chain explicitly so the original int() failure stays attached as the cause.
                raise ValueError(f"Invalid PLUGINS_SERVER_PORT: {raw_port}") from exc

        if env.get("PLUGINS_SERVER_UDS"):
            data["uds"] = env["PLUGINS_SERVER_UDS"]

        # TLS settings are only loaded when PLUGINS_SERVER_SSL_ENABLED parses as true.
        ssl_enabled = cls._parse_bool(env.get("PLUGINS_SERVER_SSL_ENABLED"))
        if ssl_enabled:
            # from_env() returns None when no TLS variable is set; "tls" then stays unset.
            tls_config = MCPServerTLSConfig.from_env()
            if tls_config:
                data["tls"] = tls_config

        if not data:
            return None

        return cls(**data)
class MCPClientConfig(BaseModel):
    """Client-side MCP configuration (gateway connecting to external plugin).

    Attributes:
        proto (TransportType): The MCP transport type. Can be SSE, STDIO, or STREAMABLEHTTP
        url (Optional[str]): An MCP URL. Only valid when MCP transport type is SSE or STREAMABLEHTTP.
        script (Optional[str]): The path and name to the STDIO script that runs the plugin server. Only valid for STDIO type.
        cmd (Optional[list[str]]): Command + args used to start a STDIO MCP server. Only valid for STDIO type.
        env (Optional[dict[str, str]]): Environment overrides for STDIO server process.
        cwd (Optional[str]): Working directory for STDIO server process.
        uds (Optional[str]): Unix domain socket path for streamable HTTP.
        tls (Optional[MCPClientTLSConfig]): Client-side TLS configuration for mTLS.
    """

    proto: TransportType
    url: Optional[str] = None
    script: Optional[str] = None
    cmd: Optional[list[str]] = None
    env: Optional[dict[str, str]] = None
    cwd: Optional[str] = None
    uds: Optional[str] = None
    tls: Optional[MCPClientTLSConfig] = None

    @field_validator(URL, mode="after")
    @classmethod
    def validate_url(cls, url: str | None) -> str | None:
        """Validate a MCP url for streamable HTTP connections.

        Args:
            url: the url to be validated.

        Raises:
            ValueError: if the URL fails validation.

        Returns:
            The validated URL or None if none is set.
        """
        if url:
            result = SecurityValidator.validate_url(url)
            return result
        return url

    @field_validator(SCRIPT, mode="after")
    @classmethod
    def validate_script(cls, script: str | None) -> str | None:
        """Validate an MCP stdio script.

        Args:
            script: the script to be validated.

        Raises:
            ValueError: if the script doesn't exist or isn't executable when required.

        Returns:
            The validated string or None if none is set.
        """
        if script:
            file_path = Path(script).expanduser()
            # Allow relative paths; they are resolved at runtime (optionally using cwd).
            if file_path.is_absolute():
                if not file_path.is_file():
                    raise ValueError(f"MCP server script {script} does not exist.")
                # Allow Python (.py) and shell scripts (.sh). Other files must be executable.
                if file_path.suffix not in {PYTHON_SUFFIX, ".sh"} and not os.access(file_path, os.X_OK):
                    raise ValueError(f"MCP server script {script} must be executable.")
        return script

    @field_validator(CMD, mode="after")
    @classmethod
    def validate_cmd(cls, cmd: list[str] | None) -> list[str] | None:
        """Validate an MCP stdio command.

        Args:
            cmd: the command to be validated.

        Raises:
            ValueError: if cmd is empty or contains empty values.

        Returns:
            The validated command list or None if none is set.
        """
        if cmd is None:
            return cmd
        if not isinstance(cmd, list) or not cmd:
            raise ValueError("MCP stdio cmd must be a non-empty list.")
        if not all(isinstance(part, str) and part.strip() for part in cmd):
            raise ValueError("MCP stdio cmd entries must be non-empty strings.")
        return cmd

    @field_validator(ENV, mode="after")
    @classmethod
    def validate_env(cls, env: dict[str, str] | None) -> dict[str, str] | None:
        """Validate environment overrides for MCP stdio.

        Args:
            env: Environment overrides to set for the stdio plugin process.

        Returns:
            The validated environment dict or None if none is set.

        Raises:
            ValueError: if keys/values are invalid or the dict is empty.
        """
        if env is None:
            return env
        if not isinstance(env, dict) or not env:
            raise ValueError("MCP stdio env must be a non-empty dict.")
        for key, value in env.items():
            if not isinstance(key, str) or not key.strip():
                raise ValueError("MCP stdio env keys must be non-empty strings.")
            if not isinstance(value, str):
                raise ValueError("MCP stdio env values must be strings.")
        return env

    @field_validator(CWD, mode="after")
    @classmethod
    def validate_cwd(cls, cwd: str | None) -> str | None:
        """Validate the working directory for MCP stdio.

        Args:
            cwd: Working directory for the stdio plugin process.

        Returns:
            The validated canonical cwd path or None if none is set.

        Raises:
            ValueError: if cwd does not exist or is not a directory.
        """
        if not cwd:
            return cwd
        cwd_path = Path(cwd).expanduser().resolve()
        if not cwd_path.is_dir():
            raise ValueError(f"MCP stdio cwd {cwd} does not exist or is not a directory.")
        return str(cwd_path)

    @field_validator(UDS, mode="after")
    @classmethod
    def validate_uds(cls, uds: str | None) -> str | None:
        """Validate a Unix domain socket path for streamable HTTP.

        Args:
            uds: Unix domain socket path.

        Returns:
            The validated canonical uds path or None if none is set.

        Raises:
            ValueError: if uds is empty, not absolute, or parent directory is invalid.
        """
        if uds is None:
            return uds
        if not isinstance(uds, str) or not uds.strip():
            raise ValueError("MCP client uds must be a non-empty string.")

        # Check absoluteness BEFORE resolve(): resolve() always returns an absolute path
        # (relative input is anchored at the current working directory), so checking
        # afterwards could never reject a relative path.
        expanded = Path(uds).expanduser()
        if not expanded.is_absolute():
            raise ValueError(f"MCP client uds path must be absolute: {uds}")
        uds_path = expanded.resolve()

        parent_dir = uds_path.parent
        if not parent_dir.is_dir():
            raise ValueError(f"MCP client uds parent directory does not exist: {parent_dir}")

        # Check parent directory permissions for security
        try:
            parent_mode = parent_dir.stat().st_mode
            # Warn if parent directory is world-writable (o+w = 0o002)
            if parent_mode & 0o002:
                logging.getLogger(__name__).warning(
                    "MCP client uds parent directory %s is world-writable. This may allow unauthorized socket hijacking. Consider using a directory with restricted permissions (e.g., 0o700).",
                    parent_dir,
                )
        except OSError:
            pass  # Best effort - continue if we can't check permissions

        return str(uds_path)

    @model_validator(mode="after")
    def validate_tls_usage(self) -> Self:  # pylint: disable=bad-classmethod-argument
        """Ensure TLS configuration is only used with HTTP-based transports.

        Returns:
            Self after validation.

        Raises:
            ValueError: If TLS configuration is used with non-HTTP transports
                or together with a Unix domain socket.
        """
        if self.tls and self.proto not in (TransportType.SSE, TransportType.STREAMABLEHTTP):
            raise ValueError("TLS configuration is only valid for HTTP/SSE transports")
        if self.uds and self.tls:
            raise ValueError("TLS configuration is not supported for Unix domain sockets.")
        return self

    @model_validator(mode="after")
    def validate_transport_fields(self) -> Self:  # pylint: disable=bad-classmethod-argument
        """Ensure transport-specific fields are only used with matching transports.

        Returns:
            Self after validation.

        Raises:
            ValueError: if fields are incompatible with the selected transport.
        """
        if self.proto == TransportType.STDIO and self.url:
            raise ValueError("URL is only valid for HTTP/SSE transports")
        if self.proto != TransportType.STDIO and (self.script or self.cmd or self.env or self.cwd):
            raise ValueError("script/cmd/env/cwd are only valid for STDIO transport")
        if self.proto != TransportType.STREAMABLEHTTP and self.uds:
            raise ValueError("uds is only valid for STREAMABLEHTTP transport")
        return self
class GRPCClientTLSConfig(MCPTransportTLSConfigBase):
    """TLS settings used by the gateway when it connects to a plugin over gRPC.

    Attributes:
        verify (bool): Whether to verify the remote server certificate.
    """

    verify: bool = Field(default=True, description="Verify the upstream server certificate")

    @classmethod
    def from_env(cls) -> Optional["GRPCClientTLSConfig"]:
        """Build the gRPC client TLS configuration from PLUGINS_GRPC_CLIENT_* environment variables.

        Returns:
            GRPCClientTLSConfig instance or None if no environment variables are set.
        """
        environ = os.environ
        settings: dict[str, Any] = {}

        # Path-like settings are only picked up when the variable is non-empty.
        path_vars = (
            ("certfile", "PLUGINS_GRPC_CLIENT_MTLS_CERTFILE"),
            ("keyfile", "PLUGINS_GRPC_CLIENT_MTLS_KEYFILE"),
            ("ca_bundle", "PLUGINS_GRPC_CLIENT_MTLS_CA_BUNDLE"),
        )
        for field_name, var_name in path_vars:
            if environ.get(var_name):
                settings[field_name] = environ[var_name]

        # The password is taken whenever the variable exists, even if it is empty.
        if environ.get("PLUGINS_GRPC_CLIENT_MTLS_KEYFILE_PASSWORD") is not None:
            settings["keyfile_password"] = environ["PLUGINS_GRPC_CLIENT_MTLS_KEYFILE_PASSWORD"]

        verify_flag = cls._parse_bool(environ.get("PLUGINS_GRPC_CLIENT_MTLS_VERIFY"))
        if verify_flag is not None:
            settings["verify"] = verify_flag

        return cls(**settings) if settings else None
class GRPCServerTLSConfig(MCPTransportTLSConfigBase):
    """TLS settings for a plugin that accepts gateway connections over gRPC.

    Attributes:
        client_auth (str): Client certificate requirement ('none', 'optional', 'require').
    """

    client_auth: str = Field(default="require", description="Client certificate requirement (none, optional, require)")

    @field_validator("client_auth", mode="after")
    @classmethod
    def validate_client_auth(cls, value: str) -> str:
        """Normalize client_auth to lower case and check it is a known option.

        Args:
            value: Client auth requirement string.

        Returns:
            The lower-cased client auth string.

        Raises:
            ValueError: If client_auth is not a valid value.
        """
        valid_values = {"none", "optional", "require"}
        lowered = value.lower()
        if lowered in valid_values:
            return lowered
        raise ValueError(f"client_auth must be one of {valid_values}, got '{value}'")

    @classmethod
    def from_env(cls) -> Optional["GRPCServerTLSConfig"]:
        """Build the gRPC server TLS configuration from PLUGINS_GRPC_SERVER_SSL_* environment variables.

        Returns:
            GRPCServerTLSConfig instance or None if no environment variables are set.
        """
        environ = os.environ
        settings: dict[str, Any] = {}

        # Path-like settings are only picked up when the variable is non-empty.
        path_vars = (
            ("keyfile", "PLUGINS_GRPC_SERVER_SSL_KEYFILE"),
            ("certfile", "PLUGINS_GRPC_SERVER_SSL_CERTFILE"),
            ("ca_bundle", "PLUGINS_GRPC_SERVER_SSL_CA_CERTS"),
        )
        for field_name, var_name in path_vars:
            if environ.get(var_name):
                settings[field_name] = environ[var_name]

        # The password is taken whenever the variable exists, even if it is empty.
        if environ.get("PLUGINS_GRPC_SERVER_SSL_KEYFILE_PASSWORD") is not None:
            settings["keyfile_password"] = environ["PLUGINS_GRPC_SERVER_SSL_KEYFILE_PASSWORD"]

        if environ.get("PLUGINS_GRPC_SERVER_SSL_CLIENT_AUTH"):
            settings["client_auth"] = environ["PLUGINS_GRPC_SERVER_SSL_CLIENT_AUTH"]

        return cls(**settings) if settings else None
class GRPCClientConfig(BaseModel):
    """Client-side gRPC configuration (gateway connecting to external plugin).

    Attributes:
        target (Optional[str]): The gRPC target address in host:port format.
        uds (Optional[str]): Unix domain socket path (alternative to target).
        tls (Optional[GRPCClientTLSConfig]): Client-side TLS configuration for mTLS.

    Examples:
        >>> # TCP connection
        >>> config = GRPCClientConfig(target="localhost:50051")
        >>> config.get_target()
        'localhost:50051'
        >>> # Unix domain socket connection (path is resolved to canonical form)
        >>> config = GRPCClientConfig(uds="/tmp/grpc-plugin.sock")  # doctest: +SKIP
        >>> config.get_target()  # doctest: +SKIP
        'unix:///tmp/grpc-plugin.sock'
    """

    target: Optional[str] = Field(default=None, description="gRPC target address (host:port)")
    uds: Optional[str] = Field(default=None, description="Unix domain socket path")
    tls: Optional[GRPCClientTLSConfig] = None

    @field_validator("target", mode="after")
    @classmethod
    def validate_target(cls, target: str | None) -> str | None:
        """Validate gRPC target address format.

        Args:
            target: The target address to validate.

        Returns:
            The validated target address.

        Raises:
            ValueError: If target is empty or not in host:port format.
        """
        if target is None:
            return target
        if not target:
            raise ValueError("gRPC target address cannot be empty")
        # Basic validation - should contain host and port
        if ":" not in target:
            raise ValueError(f"gRPC target must be in host:port format, got '{target}'")
        return target

    @field_validator("uds", mode="after")
    @classmethod
    def validate_uds(cls, uds: str | None) -> str | None:
        """Validate Unix domain socket path for gRPC.

        Args:
            uds: Unix domain socket path.

        Returns:
            The validated canonical uds path or None if none is set.

        Raises:
            ValueError: if uds is empty, not absolute, or parent directory is invalid.
        """
        if uds is None:
            return uds
        if not isinstance(uds, str) or not uds.strip():
            raise ValueError("gRPC client uds must be a non-empty string.")

        # Check absoluteness BEFORE resolve(): resolve() always returns an absolute path
        # (relative input is anchored at the current working directory), so checking
        # afterwards could never reject a relative path.
        expanded = Path(uds).expanduser()
        if not expanded.is_absolute():
            raise ValueError(f"gRPC client uds path must be absolute: {uds}")
        uds_path = expanded.resolve()

        parent_dir = uds_path.parent
        if not parent_dir.is_dir():
            raise ValueError(f"gRPC client uds parent directory does not exist: {parent_dir}")

        # Check parent directory permissions for security
        try:
            parent_mode = parent_dir.stat().st_mode
            # Warn if parent directory is world-writable (o+w = 0o002)
            if parent_mode & 0o002:
                logging.getLogger(__name__).warning(
                    "gRPC client uds parent directory %s is world-writable. Consider using a directory with restricted permissions.",
                    parent_dir,
                )
        except OSError:
            pass  # Best effort - continue if we can't check permissions

        return str(uds_path)

    @model_validator(mode="after")
    def validate_target_or_uds(self) -> Self:  # pylint: disable=bad-classmethod-argument
        """Ensure exactly one of target or uds is configured.

        Returns:
            Self after validation.

        Raises:
            ValueError: If neither or both target and uds are set, or if tls is combined with uds.
        """
        has_target = self.target is not None
        has_uds = self.uds is not None

        if not has_target and not has_uds:
            raise ValueError("gRPC client must have either 'target' or 'uds' configured")
        if has_target and has_uds:
            raise ValueError("gRPC client cannot have both 'target' and 'uds' configured")
        if has_uds and self.tls:
            raise ValueError("TLS configuration is not supported for Unix domain sockets")
        return self

    def get_target(self) -> str:
        """Get the gRPC target string for channel creation.

        Returns:
            str: The target string, either host:port or unix:///path format.
        """
        if self.uds:
            # uds is stored as an absolute path, so this yields the unix:///path form.
            return f"unix://{self.uds}"
        return self.target or ""
class GRPCServerConfig(BaseModel):
    """Server-side gRPC configuration (plugin running as gRPC server).

    Attributes:
        host (str): Server host to bind to.
        port (int): Server port to bind to.
        uds (Optional[str]): Unix domain socket path (alternative to host:port).
        tls (Optional[GRPCServerTLSConfig]): Server-side TLS configuration.

    Examples:
        >>> # TCP binding
        >>> config = GRPCServerConfig(host="0.0.0.0", port=50051)
        >>> config.get_bind_address()
        '0.0.0.0:50051'
        >>> # Unix domain socket binding (path is resolved to canonical form)
        >>> config = GRPCServerConfig(uds="/tmp/grpc-plugin.sock")  # doctest: +SKIP
        >>> config.get_bind_address()  # doctest: +SKIP
        'unix:///tmp/grpc-plugin.sock'
    """

    host: str = Field(default="127.0.0.1", description="Server host to bind to")
    port: int = Field(default=50051, description="Server port to bind to")
    uds: Optional[str] = Field(default=None, description="Unix domain socket path")
    tls: Optional[GRPCServerTLSConfig] = Field(default=None, description="Server-side TLS configuration")

    @field_validator("uds", mode="after")
    @classmethod
    def validate_uds(cls, uds: str | None) -> str | None:
        """Validate Unix domain socket path for gRPC server.

        The path is expanded (``~``) and resolved before it is returned, so the
        stored value is the canonical form of what was supplied.

        Args:
            uds: Unix domain socket path.

        Returns:
            The validated canonical uds path or None if none is set.

        Raises:
            ValueError: if uds is empty, not absolute, or parent directory is invalid.
        """
        if uds is None:
            return uds
        if not isinstance(uds, str) or not uds.strip():
            raise ValueError("gRPC server uds must be a non-empty string.")

        uds_path = Path(uds).expanduser().resolve()
        # resolve() already yields an absolute path, so this check is only a
        # defensive guard; relative inputs are resolved rather than rejected.
        if not uds_path.is_absolute():
            raise ValueError(f"gRPC server uds path must be absolute: {uds}")

        parent_dir = uds_path.parent
        if not parent_dir.is_dir():
            raise ValueError(f"gRPC server uds parent directory does not exist: {parent_dir}")

        # Check parent directory permissions for security
        try:
            parent_mode = parent_dir.stat().st_mode
            if parent_mode & 0o002:
                logging.getLogger(__name__).warning(
                    "gRPC server uds parent directory %s is world-writable. Consider using a directory with restricted permissions.",
                    parent_dir,
                )
        except OSError:
            # Best-effort advisory check only: failing to stat the directory
            # must not reject an otherwise valid configuration.
            pass

        return str(uds_path)

    @model_validator(mode="after")
    def validate_uds_tls(self) -> Self:  # pylint: disable=bad-classmethod-argument
        """Ensure TLS is not configured when using a Unix domain socket.

        Returns:
            Self after validation.

        Raises:
            ValueError: if tls is set with uds.
        """
        if self.uds and self.tls:
            raise ValueError("TLS configuration is not supported for Unix domain sockets")
        return self

    def get_bind_address(self) -> str:
        """Get the gRPC bind address string.

        Returns:
            str: The bind address, either host:port or unix:///path format.
        """
        if self.uds:
            return f"unix://{self.uds}"
        return f"{self.host}:{self.port}"

    @classmethod
    def from_env(cls) -> Optional["GRPCServerConfig"]:
        """Construct gRPC server configuration from PLUGINS_GRPC_SERVER_* environment variables.

        Only variables that are set to a non-empty value contribute. TLS settings
        are read only when PLUGINS_GRPC_SERVER_SSL_ENABLED is one of
        "1", "true", "yes" or "on" (case-insensitive).

        Returns:
            GRPCServerConfig instance or None if no environment variables are set.

        Raises:
            ValueError: If PLUGINS_GRPC_SERVER_PORT is not a valid integer.
        """
        env = os.environ
        data: dict[str, Any] = {}

        if host := env.get("PLUGINS_GRPC_SERVER_HOST"):
            data["host"] = host
        if port_raw := env.get("PLUGINS_GRPC_SERVER_PORT"):
            try:
                data["port"] = int(port_raw)
            except ValueError as exc:
                # Chain explicitly so the original parse failure is kept as the cause.
                raise ValueError(f"Invalid PLUGINS_GRPC_SERVER_PORT: {port_raw}") from exc
        if uds := env.get("PLUGINS_GRPC_SERVER_UDS"):
            data["uds"] = uds

        # Check if SSL/TLS is enabled
        ssl_enabled_str = env.get("PLUGINS_GRPC_SERVER_SSL_ENABLED", "").lower()
        if ssl_enabled_str in {"1", "true", "yes", "on"}:
            tls_config = GRPCServerTLSConfig.from_env()
            if tls_config:
                data["tls"] = tls_config

        if not data:
            return None

        return cls(**data)
class UnixSocketClientConfig(BaseModel):
    """Client-side Unix socket settings (gateway connecting to an external plugin).

    Attributes:
        path (str): Path to the Unix domain socket file. Required.
        reconnect_attempts (int): Number of reconnection attempts on failure. Default: 3.
        reconnect_delay (float): Base delay between reconnection attempts (with exponential backoff). Default: 0.1.
        timeout (float): Timeout for read operations in seconds. Default: 30.0.

    Examples:
        >>> config = UnixSocketClientConfig(path="/tmp/plugin.sock")
        >>> config.path
        '/tmp/plugin.sock'
        >>> config.reconnect_attempts
        3
    """

    path: str = Field(..., description="Path to the Unix domain socket")
    reconnect_attempts: int = Field(default=3, description="Number of reconnection attempts")
    reconnect_delay: float = Field(default=0.1, description="Base delay between reconnection attempts (seconds)")
    timeout: float = Field(default=30.0, description="Read timeout in seconds")

    @field_validator("path", mode="after")
    @classmethod
    def validate_path(cls, path: str) -> str:
        """Check that the socket path is non-empty and starts with "/".

        Args:
            path: The socket path to validate.

        Returns:
            The path, unchanged, when it is acceptable.

        Raises:
            ValueError: If path is empty or does not start with "/".
        """
        # Accept first; the two failure cases below pick the matching message.
        if path and path.startswith("/"):
            return path
        if not path:
            raise ValueError("Unix socket path cannot be empty")
        raise ValueError(f"Unix socket path must be absolute, got '{path}'")
class UnixSocketServerConfig(BaseModel):
    """Server-side Unix socket settings (plugin running as a Unix socket server).

    Attributes:
        path (str): Path to the Unix domain socket file.
            Default: "/tmp/mcpgateway-plugins.sock".

    Examples:
        >>> config = UnixSocketServerConfig(path="/tmp/plugin.sock")
        >>> config.path
        '/tmp/plugin.sock'
    """

    path: str = Field(default="/tmp/mcpgateway-plugins.sock", description="Path to the Unix domain socket")  # nosec B108 - configurable default

    @classmethod
    def from_env(cls) -> Optional["UnixSocketServerConfig"]:
        """Build the configuration from the UNIX_SOCKET_PATH environment variable.

        Returns:
            UnixSocketServerConfig instance, or None when UNIX_SOCKET_PATH is
            unset or empty.
        """
        socket_path = os.environ.get("UNIX_SOCKET_PATH")
        return cls(path=socket_path) if socket_path else None
class PluginConfig(BaseModel):
    """Configuration entry for a single plugin.

    Attributes:
        name (str): The unique name of the plugin.
        description (Optional[str]): A description of the plugin.
        author (Optional[str]): The author of the plugin.
        kind (str): The kind or type of plugin. Usually a fully qualified object type.
        namespace (Optional[str]): The namespace where the plugin resides.
        version (Optional[str]): Version of the plugin.
        hooks (list[str]): Hook points where the plugin will be called. Default: [].
        tags (list[str]): Tags for making the plugin searchable. Default: [].
        mode (PluginMode): Plugin mode of operation. Default: PluginMode.ENFORCE.
        priority (int): Order in which the plugin is run. Lower = higher priority. Default: 100.
        conditions (list[PluginCondition]): Conditions on which the plugin is run. Default: [].
        applied_to (Optional[AppliedTo]): The tools, fields, that the plugin is applied to.
        config (Optional[dict[str, Any]]): The plugin specific configurations.
        mcp (Optional[MCPClientConfig]): Client-side MCP configuration (gateway connecting to plugin).
        grpc (Optional[GRPCClientConfig]): Client-side gRPC configuration (gateway connecting to plugin).
        unix_socket (Optional[UnixSocketClientConfig]): Client-side Unix socket configuration (gateway connecting to plugin).
    """

    name: str
    description: Optional[str] = None
    author: Optional[str] = None
    kind: str
    namespace: Optional[str] = None
    version: Optional[str] = None
    hooks: list[str] = Field(default_factory=list)
    tags: list[str] = Field(default_factory=list)
    mode: PluginMode = PluginMode.ENFORCE
    priority: int = 100  # Lower = higher priority
    conditions: list[PluginCondition] = Field(default_factory=list)  # When to apply
    applied_to: Optional[AppliedTo] = None  # Fields to apply to.
    config: Optional[dict[str, Any]] = None
    mcp: Optional[MCPClientConfig] = None
    grpc: Optional[GRPCClientConfig] = None
    unix_socket: Optional[UnixSocketClientConfig] = None

    @model_validator(mode="after")
    def check_url_or_script_filled(self) -> Self:  # pylint: disable=bad-classmethod-argument
        """Check that the MCP section carries what its transport type requires.

        Nothing is checked when no mcp section is present. STDIO needs exactly
        one of script/cmd; SSE and StreamableHTTP need a url; any other
        transport type is rejected.

        Raises:
            ValueError: if the script/cmd attribute is not defined (or both are) with STDIO set,
                the URL is not defined with HTTP transports, or the transport type is unsupported.

        Returns:
            The model after validation.
        """
        mcp_cfg = self.mcp
        if not mcp_cfg:
            return self

        transport = mcp_cfg.proto
        if transport == TransportType.STDIO:
            if not (mcp_cfg.script or mcp_cfg.cmd):
                raise ValueError(f"Plugin {self.name} has transport type set to STDIO but no script/cmd value")
            if mcp_cfg.script and mcp_cfg.cmd:
                raise ValueError(f"Plugin {self.name} must set either script or cmd for STDIO, not both")
        elif transport in (TransportType.STREAMABLEHTTP, TransportType.SSE):
            if not mcp_cfg.url:
                raise ValueError(f"Plugin {self.name} has transport type set to StreamableHTTP but no url value")
        else:
            raise ValueError(f"Plugin {self.name} must set transport type to either SSE or STREAMABLEHTTP or STDIO")
        return self

    @model_validator(mode="after")
    def check_config_and_external(self, info: ValidationInfo) -> Self:  # pylint: disable=bad-classmethod-argument
        """Apply the rules specific to plugins whose kind is 'external'.

        An external plugin may not define a 'config' section (developers cannot
        override plugin config items for external plugins) unless the validation
        context sets IGNORE_CONFIG_EXTERNAL, and it must configure exactly one
        transport out of mcp, grpc and unix_socket.

        Args:
            info: the contextual information passed into the pydantic model during model validation. Used to determine validation sequence.

        Raises:
            ValueError: if an external plugin has 'config' set (and the check is not
                ignored), or has zero or more than one transport configured.

        Returns:
            The model after validation.
        """
        # None of the rules below concern non-external plugins.
        if self.kind != EXTERNAL_PLUGIN_TYPE:
            return self

        context = info.context if info else None
        skip_config_check = context[IGNORE_CONFIG_EXTERNAL] if context and IGNORE_CONFIG_EXTERNAL in context else False

        if self.config and not skip_config_check:
            raise ValueError(f"Cannot have {self.name} plugin defined as 'external' with 'config' set. 'config' section settings can only be set on the plugin server.")

        # External plugins must have exactly one transport configured (mcp, grpc, or unix_socket)
        configured = [section for section in (self.mcp, self.grpc, self.unix_socket) if section is not None]
        if not configured:
            raise ValueError(f"External plugin {self.name} must have 'mcp', 'grpc', or 'unix_socket' section configured")
        if len(configured) > 1:
            raise ValueError(f"External plugin {self.name} can only have one transport configured (mcp, grpc, or unix_socket)")

        return self
class PluginManifest(BaseModel):
    """Manifest describing a plugin; every field is required.

    Attributes:
        description (str): A description of the plugin.
        author (str): The author of the plugin.
        version (str): Version of the plugin.
        tags (list[str]): Tags for making the plugin searchable.
        available_hooks (list[str]): Hook points where the plugin is callable.
        default_config (dict[str, Any]): The default configurations.
    """

    description: str
    author: str
    version: str
    tags: list[str]
    available_hooks: list[str]
    default_config: dict[str, Any]
class PluginErrorModel(BaseModel):
    """Error record used to denote exceptions/errors inside external plugins.

    Attributes:
        message (str): The reason for the error.
        plugin_name (str): The plugin name.
        code (Optional[str]): An error code. Default: "".
        details (Optional[dict[str, Any]]): Additional error details. Default: {}.
        mcp_error_code (int): The MCP error code passed back to the client.
            Default: -32603 (Internal Error).
    """

    message: str
    plugin_name: str
    code: Optional[str] = ""
    details: Optional[dict[str, Any]] = Field(default_factory=dict)
    mcp_error_code: int = -32603
class PluginViolation(BaseModel):
    """Policy violation reported by a plugin.

    Attributes:
        reason (str): The reason for the violation.
        description (str): A longer description of the violation.
        code (str): A violation code.
        details (Optional[dict[str, Any]]): Additional violation details. Default: {}.
        _plugin_name (str): The plugin name, private attribute set by the plugin manager.
        mcp_error_code (Optional[int]): A valid mcp error code which will be sent back to the client if plugin enabled.

    Examples:
        >>> violation = PluginViolation(
        ...     reason="Invalid input",
        ...     description="The input contains prohibited content",
        ...     code="PROHIBITED_CONTENT",
        ...     details={"field": "message", "value": "test"}
        ... )
        >>> violation.reason
        'Invalid input'
        >>> violation.code
        'PROHIBITED_CONTENT'
        >>> violation.plugin_name = "content_filter"
        >>> violation.plugin_name
        'content_filter'
    """

    reason: str
    description: str
    code: str
    details: Optional[dict[str, Any]] = Field(default_factory=dict)
    _plugin_name: str = PrivateAttr(default="")
    mcp_error_code: Optional[int] = None

    @property
    def plugin_name(self) -> str:
        """Name of the plugin associated with the violation.

        Returns:
            The stored plugin name ("" until one has been assigned).
        """
        return self._plugin_name

    @plugin_name.setter
    def plugin_name(self, name: str) -> None:
        """Store the plugin name after checking it is a usable string.

        Args:
            name: the plugin name.

        Raises:
            ValueError: if name is not a string or is empty/whitespace only.
        """
        if isinstance(name, str) and name.strip():
            self._plugin_name = name
            return
        raise ValueError("Name must be a non-empty string.")
class PluginSettings(BaseModel):
    """Settings that apply to all plugins.

    Attributes:
        parallel_execution_within_band (bool): Execute plugins with same priority in parallel. Default: False.
        plugin_timeout (int): Timeout value for plugin operations. Default: 30.
        fail_on_plugin_error (bool): Error when there is a plugin connectivity problem, or ignore it. Default: False.
        enable_plugin_api (bool): Enable or disable plugins globally. Default: False.
        plugin_health_check_interval (int): Interval between health checks. Default: 60.
        include_user_info (bool): If enabled, user info is injected in the plugin context. Default: False.
    """

    parallel_execution_within_band: bool = False
    plugin_timeout: int = 30
    fail_on_plugin_error: bool = False
    enable_plugin_api: bool = False
    plugin_health_check_interval: int = 60
    include_user_info: bool = False
class Config(BaseModel):
    """Top-level plugin configuration.

    Attributes:
        plugins (Optional[list[PluginConfig]]): The list of plugins to enable. Default: [].
        plugin_dirs (list[str]): The directories in which to look for plugins. Default: [].
        plugin_settings (PluginSettings): Global settings for plugins. Required.
        server_settings (Optional[MCPServerConfig]): Server-side MCP configuration (when plugins run as server).
        grpc_server_settings (Optional[GRPCServerConfig]): Server-side gRPC configuration (when plugins run as gRPC server).
        unix_socket_server_settings (Optional[UnixSocketServerConfig]): Server-side Unix socket configuration.
    """

    plugins: Optional[list[PluginConfig]] = []
    plugin_dirs: list[str] = []
    plugin_settings: PluginSettings
    server_settings: Optional[MCPServerConfig] = None
    grpc_server_settings: Optional[GRPCServerConfig] = None
    unix_socket_server_settings: Optional[UnixSocketServerConfig] = None
class PluginResult(BaseModel, Generic[T]):
    """Outcome of a plugin hook invocation. The payload type depends on the hook.

    Attributes:
        continue_processing (bool): Whether processing should continue. Default: True.
        modified_payload (Optional[T]): The modified payload if the plugin is a transformer.
        violation (Optional[PluginViolation]): Violation object.
        metadata (Optional[dict[str, Any]]): Additional metadata. Default: {}.

    Examples:
        >>> result = PluginResult()
        >>> result.continue_processing
        True
        >>> result.metadata
        {}
        >>> from mcpgateway.plugins.framework import PluginViolation
        >>> violation = PluginViolation(
        ...     reason="Test", description="Test desc", code="TEST", details={}
        ... )
        >>> result2 = PluginResult(continue_processing=False, violation=violation)
        >>> result2.continue_processing
        False
        >>> result2.violation.code
        'TEST'
        >>> r = PluginResult(metadata={"key": "value"})
        >>> r.metadata["key"]
        'value'
        >>> r2 = PluginResult(continue_processing=False)
        >>> r2.continue_processing
        False
    """

    continue_processing: bool = True
    modified_payload: Optional[T] = None
    violation: Optional[PluginViolation] = None
    metadata: Optional[dict[str, Any]] = Field(default_factory=dict)
class GlobalContext(BaseModel):
    """Context shared across all plugins.

    Attributes:
        request_id (str): ID of the HTTP request. Required.
        user (Optional[Union[str, dict[str, Any]]]): User associated with the request, as an ID string or a dict.
        tenant_id (Optional[str]): Tenant ID.
        server_id (Optional[str]): Server ID.
        state (dict[str, Any]): A global shared state across plugins. Default: {}.
        metadata (dict[str, Any]): A global shared metadata across plugins (Read-only from plugin's perspective). Default: {}.

    Examples:
        >>> ctx = GlobalContext(request_id="req-123")
        >>> ctx.request_id
        'req-123'
        >>> ctx.user is None
        True
        >>> ctx2 = GlobalContext(request_id="req-456", user="alice", tenant_id="tenant1")
        >>> ctx2.user
        'alice'
        >>> ctx2.tenant_id
        'tenant1'
        >>> c = GlobalContext(request_id="123", server_id="srv1")
        >>> c.request_id
        '123'
        >>> c.server_id
        'srv1'
    """

    request_id: str
    user: Optional[Union[str, dict[str, Any]]] = None
    tenant_id: Optional[str] = None
    server_id: Optional[str] = None
    state: dict[str, Any] = Field(default_factory=dict)
    metadata: dict[str, Any] = Field(default_factory=dict)
class PluginContext(BaseModel):
    """Per-plugin context, which lasts a request lifecycle.

    Attributes:
        state: The in-memory state of the request. Default: {}.
        global_context: The context that is shared across plugins. Required.
        metadata: Plugin meta data. Default: {}.

    Examples:
        >>> gctx = GlobalContext(request_id="req-123")
        >>> ctx = PluginContext(global_context=gctx)
        >>> ctx.global_context.request_id
        'req-123'
        >>> ctx.global_context.user is None
        True
        >>> ctx.state["somekey"] = "some value"
        >>> ctx.state["somekey"]
        'some value'
    """

    state: dict[str, Any] = Field(default_factory=dict)
    global_context: GlobalContext
    metadata: dict[str, Any] = Field(default_factory=dict)

    def get_state(self, key: str, default: Any = None) -> Any:
        """Look up a value in this context's state.

        Args:
            key: The key to access the state.
            default: Value returned when the key is absent.

        Returns:
            The stored value, or default when the key is not present.
        """
        current_state = self.state
        return current_state.get(key, default)

    def set_state(self, key: str, value: Any) -> None:
        """Store a value in this context's state.

        Args:
            key: the key to add to the state.
            value: the value to add to the state.
        """
        self.state.update({key: value})

    async def cleanup(self) -> None:
        """Empty this context's state and metadata (the global context is left as is)."""
        for store in (self.state, self.metadata):
            store.clear()

    def is_empty(self) -> bool:
        """Check whether the context holds no data.

        Returns:
            True if the context state, the metadata and the global context
            state are all empty.
        """
        return not self.state and not self.metadata and not self.global_context.state
# Mapping from a string key to a PluginContext.
# NOTE(review): the meaning of the key is not visible in this module — confirm against callers.
PluginContextTable = dict[str, PluginContext]

# Alias of pydantic's BaseModel, used as the type name for plugin payloads.
PluginPayload: TypeAlias = BaseModel