Coverage for mcpgateway / plugins / framework / models.py: 99%
545 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/plugins/framework/models.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Teryl Taylor, Mihai Criveti, Fred Araujo
7Pydantic models for plugins.
8This module implements the pydantic models associated with
9the base plugin layer including configurations, and contexts.
10"""
12# Standard
13from enum import Enum
14import logging
15import os
16from pathlib import Path
17from typing import Any, Generic, Optional, Self, TypeVar, Union
19# Third-Party
20from pydantic import BaseModel, ConfigDict, Field, field_serializer, field_validator, model_validator, PrivateAttr, ValidationInfo
22# First-Party
23from mcpgateway.plugins.framework.constants import CMD, CWD, ENV, EXTERNAL_PLUGIN_TYPE, IGNORE_CONFIG_EXTERNAL, PYTHON_SUFFIX, SCRIPT, UDS, URL
24from mcpgateway.plugins.framework.settings import get_client_mtls_settings, get_grpc_client_mtls_settings, get_grpc_server_settings, get_mcp_server_settings, get_transport_settings
25from mcpgateway.plugins.framework.validators import validate_plugin_url
27T = TypeVar("T")
30class TransportType(str, Enum):
31 """Supported transport mechanisms for MCP plugin communication.
33 Attributes:
34 SSE: Server-Sent Events transport.
35 HTTP: Standard HTTP-based transport.
36 STDIO: Standard input/output transport.
37 STREAMABLEHTTP: HTTP transport with streaming.
38 GRPC: gRPC transport for external plugins.
40 Examples:
41 >>> TransportType.SSE
42 <TransportType.SSE: 'SSE'>
43 >>> TransportType.STDIO.value
44 'STDIO'
45 >>> TransportType('STREAMABLEHTTP')
46 <TransportType.STREAMABLEHTTP: 'STREAMABLEHTTP'>
47 """
49 SSE = "SSE"
50 HTTP = "HTTP"
51 STDIO = "STDIO"
52 STREAMABLEHTTP = "STREAMABLEHTTP"
53 GRPC = "GRPC"
56class PluginMode(str, Enum):
57 """Plugin modes of operation.
59 Attributes:
60 enforce: enforces the plugin result, and blocks execution when there is an error.
61 enforce_ignore_error: enforces the plugin result, but allows execution when there is an error.
62 permissive: audits the result.
63 disabled: plugin disabled.
65 Examples:
66 >>> PluginMode.ENFORCE
67 <PluginMode.ENFORCE: 'enforce'>
68 >>> PluginMode.ENFORCE_IGNORE_ERROR
69 <PluginMode.ENFORCE_IGNORE_ERROR: 'enforce_ignore_error'>
70 >>> PluginMode.PERMISSIVE.value
71 'permissive'
72 >>> PluginMode('disabled')
73 <PluginMode.DISABLED: 'disabled'>
74 >>> 'enforce' in [m.value for m in PluginMode]
75 True
76 """
78 ENFORCE = "enforce"
79 ENFORCE_IGNORE_ERROR = "enforce_ignore_error"
80 PERMISSIVE = "permissive"
81 DISABLED = "disabled"
84class BaseTemplate(BaseModel):
85 """Base Template.The ToolTemplate, PromptTemplate and ResourceTemplate could be extended using this
87 Attributes:
88 context (Optional[list[str]]): specifies the keys of context to be extracted. The context could be global (shared between the plugins) or
89 local (shared within the plugin). Example: global.key1.
90 extensions (Optional[dict[str, Any]]): add custom keys for your specific plugin. Example - 'policy'
91 key for opa plugin.
93 Examples:
94 >>> base = BaseTemplate(context=["global.key1.key2", "local.key1.key2"])
95 >>> base.context
96 ['global.key1.key2', 'local.key1.key2']
97 >>> base = BaseTemplate(context=["global.key1.key2"], extensions={"policy" : "sample policy"})
98 >>> base.extensions
99 {'policy': 'sample policy'}
100 """
102 context: Optional[list[str]] = None
103 extensions: Optional[dict[str, Any]] = None
106class ToolTemplate(BaseTemplate):
107 """Tool Template.
109 Attributes:
110 tool_name (str): the name of the tool.
111 fields (Optional[list[str]]): the tool fields that are affected.
112 result (bool): analyze tool output if true.
114 Examples:
115 >>> tool = ToolTemplate(tool_name="my_tool")
116 >>> tool.tool_name
117 'my_tool'
118 >>> tool.result
119 False
120 >>> tool2 = ToolTemplate(tool_name="analyzer", fields=["input", "params"], result=True)
121 >>> tool2.fields
122 ['input', 'params']
123 >>> tool2.result
124 True
125 """
127 tool_name: str
128 fields: Optional[list[str]] = None
129 result: bool = False
132class PromptTemplate(BaseTemplate):
133 """Prompt Template.
135 Attributes:
136 prompt_name (str): the name of the prompt.
137 fields (Optional[list[str]]): the prompt fields that are affected.
138 result (bool): analyze tool output if true.
140 Examples:
141 >>> prompt = PromptTemplate(prompt_name="greeting")
142 >>> prompt.prompt_name
143 'greeting'
144 >>> prompt.result
145 False
146 >>> prompt2 = PromptTemplate(prompt_name="question", fields=["context"], result=True)
147 >>> prompt2.fields
148 ['context']
149 """
151 prompt_name: str
152 fields: Optional[list[str]] = None
153 result: bool = False
156class ResourceTemplate(BaseTemplate):
157 """Resource Template.
159 Attributes:
160 resource_uri (str): the URI of the resource.
161 fields (Optional[list[str]]): the resource fields that are affected.
162 result (bool): analyze resource output if true.
164 Examples:
165 >>> resource = ResourceTemplate(resource_uri="file:///data.txt")
166 >>> resource.resource_uri
167 'file:///data.txt'
168 >>> resource.result
169 False
170 >>> resource2 = ResourceTemplate(resource_uri="http://api/data", fields=["content"], result=True)
171 >>> resource2.fields
172 ['content']
173 """
175 resource_uri: str
176 fields: Optional[list[str]] = None
177 result: bool = False
180class PluginCondition(BaseModel):
181 """Conditions for when plugin should execute.
183 Attributes:
184 server_ids (Optional[set[str]]): set of server ids.
185 tenant_ids (Optional[set[str]]): set of tenant ids.
186 tools (Optional[set[str]]): set of tool names.
187 prompts (Optional[set[str]]): set of prompt names.
188 resources (Optional[set[str]]): set of resource URIs.
189 agents (Optional[set[str]]): set of agent IDs.
190 user_pattern (Optional[list[str]]): list of user patterns.
191 content_types (Optional[list[str]]): list of content types.
193 Examples:
194 >>> cond = PluginCondition(server_ids={"server1", "server2"})
195 >>> "server1" in cond.server_ids
196 True
197 >>> cond2 = PluginCondition(tools={"tool1"}, prompts={"prompt1"})
198 >>> cond2.tools
199 {'tool1'}
200 >>> cond3 = PluginCondition(user_patterns=["admin", "root"])
201 >>> len(cond3.user_patterns)
202 2
203 """
205 server_ids: Optional[set[str]] = None
206 tenant_ids: Optional[set[str]] = None
207 tools: Optional[set[str]] = None
208 prompts: Optional[set[str]] = None
209 resources: Optional[set[str]] = None
210 agents: Optional[set[str]] = None
211 user_patterns: Optional[list[str]] = None
212 content_types: Optional[list[str]] = None
214 @field_serializer("server_ids", "tenant_ids", "tools", "prompts", "resources", "agents")
215 def serialize_set(self, value: set[str] | None) -> list[str] | None:
216 """Serialize set objects in PluginCondition for MCP.
218 Args:
219 value: a set of server ids, tenant ids, tools or prompts.
221 Returns:
222 The set as a serializable list.
223 """
224 if value:
225 values = []
226 for key in value:
227 values.append(key)
228 return values
229 return None
232class AppliedTo(BaseModel):
233 """What tools/prompts/resources and fields the plugin will be applied to.
235 Attributes:
236 tools (Optional[list[ToolTemplate]]): tools and fields to be applied.
237 prompts (Optional[list[PromptTemplate]]): prompts and fields to be applied.
238 resources (Optional[list[ResourceTemplate]]): resources and fields to be applied.
239 global_context (Optional[list[str]]): keys in the context to be applied on globally
240 local_context(Optional[list[str]]): keys in the context to be applied on locally
241 """
243 tools: Optional[list[ToolTemplate]] = None
244 prompts: Optional[list[PromptTemplate]] = None
245 resources: Optional[list[ResourceTemplate]] = None
248class MCPTransportTLSConfigBase(BaseModel):
249 """Base TLS configuration with common fields for both client and server.
251 Attributes:
252 certfile (Optional[str]): Path to the PEM-encoded certificate file.
253 keyfile (Optional[str]): Path to the PEM-encoded private key file.
254 ca_bundle (Optional[str]): Path to a CA bundle file for verification.
255 keyfile_password (Optional[str]): Optional password for encrypted private key.
256 """
258 certfile: Optional[str] = Field(default=None, description="Path to PEM certificate file")
259 keyfile: Optional[str] = Field(default=None, description="Path to PEM private key file")
260 ca_bundle: Optional[str] = Field(default=None, description="Path to CA bundle for verification")
261 keyfile_password: Optional[str] = Field(default=None, description="Password for encrypted private key")
263 @field_validator("ca_bundle", "certfile", "keyfile", mode="after")
264 @classmethod
265 def validate_path(cls, value: Optional[str]) -> Optional[str]:
266 """Expand and validate file paths supplied in TLS configuration.
268 Args:
269 value: File path to validate.
271 Returns:
272 Expanded file path or None if not provided.
274 Raises:
275 ValueError: If file path does not exist.
276 """
278 if not value:
279 return value
280 expanded = Path(value).expanduser()
281 if not expanded.is_file():
282 raise ValueError(f"TLS file path does not exist: {value}")
283 return str(expanded)
285 @model_validator(mode="after")
286 def validate_cert_key(self) -> Self: # pylint: disable=bad-classmethod-argument
287 """Ensure certificate and key options are consistent.
289 Returns:
290 Self after validation.
292 Raises:
293 ValueError: If keyfile is specified without certfile.
294 """
296 if self.keyfile and not self.certfile:
297 raise ValueError("keyfile requires certfile to be specified")
298 return self
301class MCPClientTLSConfig(MCPTransportTLSConfigBase):
302 """Client-side TLS configuration (gateway connecting to plugin).
304 Attributes:
305 verify (bool): Whether to verify the remote server certificate.
306 check_hostname (bool): Enable hostname verification when verify is true.
307 """
309 verify: bool = Field(default=True, description="Verify the upstream server certificate")
310 check_hostname: bool = Field(default=True, description="Enable hostname verification")
312 @classmethod
313 def from_env(cls) -> Optional["MCPClientTLSConfig"]:
314 """Construct client TLS configuration from PLUGINS_CLIENT_* environment variables.
316 Returns:
317 MCPClientTLSConfig instance or None if no environment variables are set.
318 """
319 s = get_client_mtls_settings()
320 data: dict[str, Any] = {}
322 if s.client_mtls_certfile:
323 data["certfile"] = s.client_mtls_certfile
324 if s.client_mtls_keyfile:
325 data["keyfile"] = s.client_mtls_keyfile
326 if s.client_mtls_ca_bundle:
327 data["ca_bundle"] = s.client_mtls_ca_bundle
328 if s.client_mtls_keyfile_password is not None:
329 data["keyfile_password"] = s.client_mtls_keyfile_password.get_secret_value()
330 if s.client_mtls_verify is not None:
331 data["verify"] = s.client_mtls_verify
332 if s.client_mtls_check_hostname is not None:
333 data["check_hostname"] = s.client_mtls_check_hostname
335 if not data:
336 return None
338 return cls(**data)
341class MCPServerTLSConfig(MCPTransportTLSConfigBase):
342 """Server-side TLS configuration (plugin accepting gateway connections).
344 Attributes:
345 ssl_cert_reqs (int): Client certificate requirement (0=NONE, 1=OPTIONAL, 2=REQUIRED).
346 """
348 ssl_cert_reqs: int = Field(default=2, description="Client certificate requirement (0=NONE, 1=OPTIONAL, 2=REQUIRED)")
350 @classmethod
351 def from_env(cls) -> Optional["MCPServerTLSConfig"]:
352 """Construct server TLS configuration from PLUGINS_SERVER_SSL_* environment variables.
354 Returns:
355 MCPServerTLSConfig instance or None if no environment variables are set.
356 """
357 s = get_mcp_server_settings()
358 data: dict[str, Any] = {}
360 if s.server_ssl_keyfile:
361 data["keyfile"] = s.server_ssl_keyfile
362 if s.server_ssl_certfile:
363 data["certfile"] = s.server_ssl_certfile
364 if s.server_ssl_ca_certs:
365 data["ca_bundle"] = s.server_ssl_ca_certs
366 if s.server_ssl_keyfile_password is not None:
367 data["keyfile_password"] = s.server_ssl_keyfile_password.get_secret_value()
368 if s.server_ssl_cert_reqs is not None:
369 data["ssl_cert_reqs"] = s.server_ssl_cert_reqs
371 if not data:
372 return None
374 return cls(**data)
377class MCPServerConfig(BaseModel):
378 """Server-side MCP configuration (plugin running as server).
380 Attributes:
381 host (str): Server host to bind to.
382 port (int): Server port to bind to.
383 uds (Optional[str]): Unix domain socket path for streamable HTTP.
384 tls (Optional[MCPServerTLSConfig]): Server-side TLS configuration.
385 """
387 host: str = Field(default="127.0.0.1", description="Server host to bind to")
388 port: int = Field(default=8000, description="Server port to bind to")
389 uds: Optional[str] = Field(default=None, description="Unix domain socket path for streamable HTTP")
390 tls: Optional[MCPServerTLSConfig] = Field(default=None, description="Server-side TLS configuration")
392 @field_validator("uds", mode="after")
393 @classmethod
394 def validate_uds(cls, uds: str | None) -> str | None:
395 """Validate the Unix domain socket path for security.
397 Args:
398 uds: Unix domain socket path.
400 Returns:
401 The validated canonical uds path or None if none is set.
403 Raises:
404 ValueError: if uds is empty, not absolute, or parent directory is invalid.
405 """
406 if uds is None:
407 return uds
408 if not isinstance(uds, str) or not uds.strip():
409 raise ValueError("MCP server uds must be a non-empty string.")
411 uds_path = Path(uds).expanduser().resolve()
412 if not uds_path.is_absolute():
413 raise ValueError(f"MCP server uds path must be absolute: {uds}")
415 parent_dir = uds_path.parent
416 if not parent_dir.is_dir():
417 raise ValueError(f"MCP server uds parent directory does not exist: {parent_dir}")
419 # Check parent directory permissions for security
420 try:
421 parent_mode = parent_dir.stat().st_mode
422 # Warn if parent directory is world-writable (o+w = 0o002)
423 if parent_mode & 0o002:
424 logging.getLogger(__name__).warning(
425 "MCP server uds parent directory %s is world-writable. This may allow unauthorized socket hijacking. Consider using a directory with restricted permissions (e.g., 0o700).",
426 parent_dir,
427 )
428 except OSError:
429 pass # Best effort - continue if we can't check permissions
431 return str(uds_path)
433 @model_validator(mode="after")
434 def validate_uds_tls(self) -> Self: # pylint: disable=bad-classmethod-argument
435 """Ensure TLS is not configured when using a Unix domain socket.
437 Returns:
438 Self after validation.
440 Raises:
441 ValueError: if tls is set with uds.
442 """
443 if self.uds and self.tls:
444 raise ValueError("TLS configuration is not supported for Unix domain sockets.")
445 return self
447 @classmethod
448 def from_env(cls) -> Optional["MCPServerConfig"]:
449 """Construct server configuration from PLUGINS_SERVER_* environment variables.
451 Returns:
452 MCPServerConfig instance or None if no environment variables are set.
453 """
454 s = get_mcp_server_settings()
455 data: dict[str, Any] = {}
457 if s.server_host:
458 data["host"] = s.server_host
459 if s.server_port is not None:
460 data["port"] = s.server_port
461 if s.server_uds:
462 data["uds"] = s.server_uds
464 # Check if SSL/TLS is enabled
465 if s.server_ssl_enabled:
466 tls_config = MCPServerTLSConfig.from_env()
467 if tls_config:
468 data["tls"] = tls_config
470 if not data:
471 return None
473 return cls(**data)
476class MCPClientConfig(BaseModel):
477 """Client-side MCP configuration (gateway connecting to external plugin).
479 Attributes:
480 proto (TransportType): The MCP transport type. Can be SSE, STDIO, or STREAMABLEHTTP
481 url (Optional[str]): An MCP URL. Only valid when MCP transport type is SSE or STREAMABLEHTTP.
482 script (Optional[str]): The path and name to the STDIO script that runs the plugin server. Only valid for STDIO type.
483 cmd (Optional[list[str]]): Command + args used to start a STDIO MCP server. Only valid for STDIO type.
484 env (Optional[dict[str, str]]): Environment overrides for STDIO server process.
485 cwd (Optional[str]): Working directory for STDIO server process.
486 uds (Optional[str]): Unix domain socket path for streamable HTTP.
487 tls (Optional[MCPClientTLSConfig]): Client-side TLS configuration for mTLS.
488 """
490 proto: TransportType
491 url: Optional[str] = None
492 script: Optional[str] = None
493 cmd: Optional[list[str]] = None
494 env: Optional[dict[str, str]] = None
495 cwd: Optional[str] = None
496 uds: Optional[str] = None
497 tls: Optional[MCPClientTLSConfig] = None
499 @field_validator(URL, mode="after")
500 @classmethod
501 def validate_url(cls, url: str | None) -> str | None:
502 """Validate a MCP url for streamable HTTP connections.
504 Args:
505 url: the url to be validated.
507 Raises:
508 ValueError: if the URL fails validation.
510 Returns:
511 The validated URL or None if none is set.
512 """
513 if url:
514 result = validate_plugin_url(url)
515 return result
516 return url
518 @field_validator(SCRIPT, mode="after")
519 @classmethod
520 def validate_script(cls, script: str | None) -> str | None:
521 """Validate an MCP stdio script.
523 Args:
524 script: the script to be validated.
526 Raises:
527 ValueError: if the script doesn't exist or isn't executable when required.
529 Returns:
530 The validated string or None if none is set.
531 """
532 if script:
533 file_path = Path(script).expanduser()
534 # Allow relative paths; they are resolved at runtime (optionally using cwd).
535 if file_path.is_absolute():
536 if not file_path.is_file():
537 raise ValueError(f"MCP server script {script} does not exist.")
538 # Allow Python (.py) and shell scripts (.sh). Other files must be executable.
539 if file_path.suffix not in {PYTHON_SUFFIX, ".sh"} and not os.access(file_path, os.X_OK):
540 raise ValueError(f"MCP server script {script} must be executable.")
541 return script
543 @field_validator(CMD, mode="after")
544 @classmethod
545 def validate_cmd(cls, cmd: list[str] | None) -> list[str] | None:
546 """Validate an MCP stdio command.
548 Args:
549 cmd: the command to be validated.
551 Raises:
552 ValueError: if cmd is empty or contains empty values.
554 Returns:
555 The validated command list or None if none is set.
556 """
557 if cmd is None:
558 return cmd
559 if not isinstance(cmd, list) or not cmd:
560 raise ValueError("MCP stdio cmd must be a non-empty list.")
561 if not all(isinstance(part, str) and part.strip() for part in cmd):
562 raise ValueError("MCP stdio cmd entries must be non-empty strings.")
563 return cmd
565 @field_validator(ENV, mode="after")
566 @classmethod
567 def validate_env(cls, env: dict[str, str] | None) -> dict[str, str] | None:
568 """Validate environment overrides for MCP stdio.
570 Args:
571 env: Environment overrides to set for the stdio plugin process.
573 Returns:
574 The validated environment dict or None if none is set.
576 Raises:
577 ValueError: if keys/values are invalid or the dict is empty.
578 """
579 if env is None:
580 return env
581 if not isinstance(env, dict) or not env:
582 raise ValueError("MCP stdio env must be a non-empty dict.")
583 for key, value in env.items():
584 if not isinstance(key, str) or not key.strip():
585 raise ValueError("MCP stdio env keys must be non-empty strings.")
586 if not isinstance(value, str):
587 raise ValueError("MCP stdio env values must be strings.")
588 return env
590 @field_validator(CWD, mode="after")
591 @classmethod
592 def validate_cwd(cls, cwd: str | None) -> str | None:
593 """Validate the working directory for MCP stdio.
595 Args:
596 cwd: Working directory for the stdio plugin process.
598 Returns:
599 The validated canonical cwd path or None if none is set.
601 Raises:
602 ValueError: if cwd does not exist or is not a directory.
603 """
604 if not cwd:
605 return cwd
606 cwd_path = Path(cwd).expanduser().resolve()
607 if not cwd_path.is_dir():
608 raise ValueError(f"MCP stdio cwd {cwd} does not exist or is not a directory.")
609 return str(cwd_path)
611 @field_validator(UDS, mode="after")
612 @classmethod
613 def validate_uds(cls, uds: str | None) -> str | None:
614 """Validate a Unix domain socket path for streamable HTTP.
616 Args:
617 uds: Unix domain socket path.
619 Returns:
620 The validated canonical uds path or None if none is set.
622 Raises:
623 ValueError: if uds is empty, not absolute, or parent directory is invalid.
624 """
625 if uds is None:
626 return uds
627 if not isinstance(uds, str) or not uds.strip():
628 raise ValueError("MCP client uds must be a non-empty string.")
630 uds_path = Path(uds).expanduser().resolve()
631 if not uds_path.is_absolute():
632 raise ValueError(f"MCP client uds path must be absolute: {uds}")
634 parent_dir = uds_path.parent
635 if not parent_dir.is_dir():
636 raise ValueError(f"MCP client uds parent directory does not exist: {parent_dir}")
638 # Check parent directory permissions for security
639 try:
640 parent_mode = parent_dir.stat().st_mode
641 # Warn if parent directory is world-writable (o+w = 0o002)
642 if parent_mode & 0o002:
643 logging.getLogger(__name__).warning(
644 "MCP client uds parent directory %s is world-writable. This may allow unauthorized socket hijacking. Consider using a directory with restricted permissions (e.g., 0o700).",
645 parent_dir,
646 )
647 except OSError:
648 pass # Best effort - continue if we can't check permissions
650 return str(uds_path)
652 @model_validator(mode="after")
653 def validate_tls_usage(self) -> Self: # pylint: disable=bad-classmethod-argument
654 """Ensure TLS configuration is only used with HTTP-based transports.
656 Returns:
657 Self after validation.
659 Raises:
660 ValueError: If TLS configuration is used with non-HTTP transports.
661 """
663 if self.tls and self.proto not in (TransportType.SSE, TransportType.STREAMABLEHTTP):
664 raise ValueError("TLS configuration is only valid for HTTP/SSE transports")
665 if self.uds and self.tls:
666 raise ValueError("TLS configuration is not supported for Unix domain sockets.")
667 return self
669 @model_validator(mode="after")
670 def validate_transport_fields(self) -> Self: # pylint: disable=bad-classmethod-argument
671 """Ensure transport-specific fields are only used with matching transports.
673 Returns:
674 Self after validation.
676 Raises:
677 ValueError: if fields are incompatible with the selected transport.
678 """
679 if self.proto == TransportType.STDIO and self.url:
680 raise ValueError("URL is only valid for HTTP/SSE transports")
681 if self.proto != TransportType.STDIO and (self.script or self.cmd or self.env or self.cwd):
682 raise ValueError("script/cmd/env/cwd are only valid for STDIO transport")
683 if self.proto != TransportType.STREAMABLEHTTP and self.uds:
684 raise ValueError("uds is only valid for STREAMABLEHTTP transport")
685 return self
688class GRPCClientTLSConfig(MCPTransportTLSConfigBase):
689 """Client-side gRPC TLS configuration (gateway connecting to plugin).
691 Attributes:
692 verify (bool): Whether to verify the remote server certificate.
693 """
695 verify: bool = Field(default=True, description="Verify the upstream server certificate")
697 @classmethod
698 def from_env(cls) -> Optional["GRPCClientTLSConfig"]:
699 """Construct gRPC client TLS configuration from PLUGINS_GRPC_CLIENT_* environment variables.
701 Returns:
702 GRPCClientTLSConfig instance or None if no environment variables are set.
703 """
704 s = get_grpc_client_mtls_settings()
705 data: dict[str, Any] = {}
707 if s.grpc_client_mtls_certfile:
708 data["certfile"] = s.grpc_client_mtls_certfile
709 if s.grpc_client_mtls_keyfile:
710 data["keyfile"] = s.grpc_client_mtls_keyfile
711 if s.grpc_client_mtls_ca_bundle:
712 data["ca_bundle"] = s.grpc_client_mtls_ca_bundle
713 if s.grpc_client_mtls_keyfile_password is not None:
714 data["keyfile_password"] = s.grpc_client_mtls_keyfile_password.get_secret_value()
715 if s.grpc_client_mtls_verify is not None:
716 data["verify"] = s.grpc_client_mtls_verify
718 if not data:
719 return None
721 return cls(**data)
724class GRPCServerTLSConfig(MCPTransportTLSConfigBase):
725 """Server-side gRPC TLS configuration (plugin accepting gateway connections).
727 Attributes:
728 client_auth (str): Client certificate requirement ('none', 'optional', 'require').
729 """
731 client_auth: str = Field(default="require", description="Client certificate requirement (none, optional, require)")
733 @field_validator("client_auth", mode="after")
734 @classmethod
735 def validate_client_auth(cls, value: str) -> str:
736 """Validate client_auth value.
738 Args:
739 value: Client auth requirement string.
741 Returns:
742 Validated client auth string.
744 Raises:
745 ValueError: If client_auth is not a valid value.
746 """
747 valid_values = {"none", "optional", "require"}
748 if value.lower() not in valid_values:
749 raise ValueError(f"client_auth must be one of {valid_values}, got '{value}'")
750 return value.lower()
752 @classmethod
753 def from_env(cls) -> Optional["GRPCServerTLSConfig"]:
754 """Construct gRPC server TLS configuration from PLUGINS_GRPC_SERVER_SSL_* environment variables.
756 Returns:
757 GRPCServerTLSConfig instance or None if no environment variables are set.
758 """
759 s = get_grpc_server_settings()
760 data: dict[str, Any] = {}
762 if s.grpc_server_ssl_keyfile:
763 data["keyfile"] = s.grpc_server_ssl_keyfile
764 if s.grpc_server_ssl_certfile:
765 data["certfile"] = s.grpc_server_ssl_certfile
766 if s.grpc_server_ssl_ca_certs:
767 data["ca_bundle"] = s.grpc_server_ssl_ca_certs
768 if s.grpc_server_ssl_keyfile_password is not None:
769 data["keyfile_password"] = s.grpc_server_ssl_keyfile_password.get_secret_value()
770 if s.grpc_server_ssl_client_auth:
771 data["client_auth"] = s.grpc_server_ssl_client_auth
773 if not data:
774 return None
776 return cls(**data)
779class GRPCClientConfig(BaseModel):
780 """Client-side gRPC configuration (gateway connecting to external plugin).
782 Attributes:
783 target (Optional[str]): The gRPC target address in host:port format.
784 uds (Optional[str]): Unix domain socket path (alternative to target).
785 tls (Optional[GRPCClientTLSConfig]): Client-side TLS configuration for mTLS.
787 Examples:
788 >>> # TCP connection
789 >>> config = GRPCClientConfig(target="localhost:50051")
790 >>> config.get_target()
791 'localhost:50051'
792 >>> # Unix domain socket connection (path is resolved to canonical form)
793 >>> config = GRPCClientConfig(uds="/tmp/grpc-plugin.sock") # doctest: +SKIP
794 >>> config.get_target() # doctest: +SKIP
795 'unix:///tmp/grpc-plugin.sock'
796 """
798 target: Optional[str] = Field(default=None, description="gRPC target address (host:port)")
799 uds: Optional[str] = Field(default=None, description="Unix domain socket path")
800 tls: Optional[GRPCClientTLSConfig] = None
802 @field_validator("target", mode="after")
803 @classmethod
804 def validate_target(cls, target: str | None) -> str | None:
805 """Validate gRPC target address format.
807 Args:
808 target: The target address to validate.
810 Returns:
811 The validated target address.
813 Raises:
814 ValueError: If target is not in host:port format.
815 """
816 if target is None:
817 return target
818 if not target:
819 raise ValueError("gRPC target address cannot be empty")
820 # Basic validation - should contain host and port
821 if ":" not in target:
822 raise ValueError(f"gRPC target must be in host:port format, got '{target}'")
823 return target
825 @field_validator("uds", mode="after")
826 @classmethod
827 def validate_uds(cls, uds: str | None) -> str | None:
828 """Validate Unix domain socket path for gRPC.
830 Args:
831 uds: Unix domain socket path.
833 Returns:
834 The validated canonical uds path or None if none is set.
836 Raises:
837 ValueError: if uds is empty, not absolute, or parent directory is invalid.
838 """
839 if uds is None:
840 return uds
841 if not isinstance(uds, str) or not uds.strip():
842 raise ValueError("gRPC client uds must be a non-empty string.")
844 uds_path = Path(uds).expanduser().resolve()
845 if not uds_path.is_absolute():
846 raise ValueError(f"gRPC client uds path must be absolute: {uds}")
848 parent_dir = uds_path.parent
849 if not parent_dir.is_dir():
850 raise ValueError(f"gRPC client uds parent directory does not exist: {parent_dir}")
852 # Check parent directory permissions for security
853 try:
854 parent_mode = parent_dir.stat().st_mode
855 if parent_mode & 0o002:
856 logging.getLogger(__name__).warning(
857 "gRPC client uds parent directory %s is world-writable. Consider using a directory with restricted permissions.",
858 parent_dir,
859 )
860 except OSError:
861 pass
863 return str(uds_path)
865 @model_validator(mode="after")
866 def validate_target_or_uds(self) -> Self: # pylint: disable=bad-classmethod-argument
867 """Ensure exactly one of target or uds is configured.
869 Returns:
870 Self after validation.
872 Raises:
873 ValueError: If neither or both target and uds are set.
874 """
875 has_target = self.target is not None
876 has_uds = self.uds is not None
878 if not has_target and not has_uds:
879 raise ValueError("gRPC client must have either 'target' or 'uds' configured")
880 if has_target and has_uds:
881 raise ValueError("gRPC client cannot have both 'target' and 'uds' configured")
882 if has_uds and self.tls:
883 raise ValueError("TLS configuration is not supported for Unix domain sockets")
884 return self
886 def get_target(self) -> str:
887 """Get the gRPC target string for channel creation.
889 Returns:
890 str: The target string, either host:port or unix:///path format.
891 """
892 if self.uds:
893 return f"unix://{self.uds}"
894 return self.target or ""
897class GRPCServerConfig(BaseModel):
898 """Server-side gRPC configuration (plugin running as gRPC server).
900 Attributes:
901 host (str): Server host to bind to.
902 port (int): Server port to bind to.
903 uds (Optional[str]): Unix domain socket path (alternative to host:port).
904 tls (Optional[GRPCServerTLSConfig]): Server-side TLS configuration.
906 Examples:
907 >>> # TCP binding
908 >>> config = GRPCServerConfig(host="0.0.0.0", port=50051)
909 >>> config.get_bind_address()
910 '0.0.0.0:50051'
911 >>> # Unix domain socket binding (path is resolved to canonical form)
912 >>> config = GRPCServerConfig(uds="/tmp/grpc-plugin.sock") # doctest: +SKIP
913 >>> config.get_bind_address() # doctest: +SKIP
914 'unix:///tmp/grpc-plugin.sock'
915 """
917 host: str = Field(default="127.0.0.1", description="Server host to bind to")
918 port: int = Field(default=50051, description="Server port to bind to")
919 uds: Optional[str] = Field(default=None, description="Unix domain socket path")
920 tls: Optional[GRPCServerTLSConfig] = Field(default=None, description="Server-side TLS configuration")
922 @field_validator("uds", mode="after")
923 @classmethod
924 def validate_uds(cls, uds: str | None) -> str | None:
925 """Validate Unix domain socket path for gRPC server.
927 Args:
928 uds: Unix domain socket path.
930 Returns:
931 The validated canonical uds path or None if none is set.
933 Raises:
934 ValueError: if uds is empty, not absolute, or parent directory is invalid.
935 """
936 if uds is None:
937 return uds
938 if not isinstance(uds, str) or not uds.strip():
939 raise ValueError("gRPC server uds must be a non-empty string.")
941 uds_path = Path(uds).expanduser().resolve()
942 if not uds_path.is_absolute():
943 raise ValueError(f"gRPC server uds path must be absolute: {uds}")
945 parent_dir = uds_path.parent
946 if not parent_dir.is_dir():
947 raise ValueError(f"gRPC server uds parent directory does not exist: {parent_dir}")
949 # Check parent directory permissions for security
950 try:
951 parent_mode = parent_dir.stat().st_mode
952 if parent_mode & 0o002:
953 logging.getLogger(__name__).warning(
954 "gRPC server uds parent directory %s is world-writable. Consider using a directory with restricted permissions.",
955 parent_dir,
956 )
957 except OSError:
958 pass
960 return str(uds_path)
962 @model_validator(mode="after")
963 def validate_uds_tls(self) -> Self: # pylint: disable=bad-classmethod-argument
964 """Ensure TLS is not configured when using a Unix domain socket.
966 Returns:
967 Self after validation.
969 Raises:
970 ValueError: if tls is set with uds.
971 """
972 if self.uds and self.tls:
973 raise ValueError("TLS configuration is not supported for Unix domain sockets")
974 return self
976 def get_bind_address(self) -> str:
977 """Get the gRPC bind address string.
979 Returns:
980 str: The bind address, either host:port or unix:///path format.
981 """
982 if self.uds:
983 return f"unix://{self.uds}"
984 return f"{self.host}:{self.port}"
986 @classmethod
987 def from_env(cls) -> Optional["GRPCServerConfig"]:
988 """Construct gRPC server configuration from PLUGINS_GRPC_SERVER_* environment variables.
990 Returns:
991 GRPCServerConfig instance or None if no environment variables are set.
992 """
993 s = get_grpc_server_settings()
994 data: dict[str, Any] = {}
996 if s.grpc_server_host:
997 data["host"] = s.grpc_server_host
998 if s.grpc_server_port is not None:
999 data["port"] = s.grpc_server_port
1000 if s.grpc_server_uds:
1001 data["uds"] = s.grpc_server_uds
1003 # Check if SSL/TLS is enabled
1004 if s.grpc_server_ssl_enabled:
1005 tls_config = GRPCServerTLSConfig.from_env()
1006 if tls_config:
1007 data["tls"] = tls_config
1009 if not data:
1010 return None
1012 return cls(**data)
1015class UnixSocketClientConfig(BaseModel):
1016 """Client-side Unix socket configuration (gateway connecting to external plugin).
1018 Attributes:
1019 path (str): Path to the Unix domain socket file.
1020 reconnect_attempts (int): Number of reconnection attempts on failure.
1021 reconnect_delay (float): Base delay between reconnection attempts (with exponential backoff).
1022 timeout (float): Timeout for read operations in seconds.
1024 Examples:
1025 >>> config = UnixSocketClientConfig(path="/tmp/plugin.sock")
1026 >>> config.path
1027 '/tmp/plugin.sock'
1028 >>> config.reconnect_attempts
1029 3
1030 """
1032 path: str = Field(..., description="Path to the Unix domain socket")
1033 reconnect_attempts: int = Field(default=3, description="Number of reconnection attempts")
1034 reconnect_delay: float = Field(default=0.1, description="Base delay between reconnection attempts (seconds)")
1035 timeout: float = Field(default=30.0, description="Read timeout in seconds")
1037 @field_validator("path", mode="after")
1038 @classmethod
1039 def validate_path(cls, path: str) -> str:
1040 """Validate Unix socket path.
1042 Args:
1043 path: The socket path to validate.
1045 Returns:
1046 The validated path.
1048 Raises:
1049 ValueError: If path is empty or invalid.
1050 """
1051 if not path:
1052 raise ValueError("Unix socket path cannot be empty")
1053 if not path.startswith("/"):
1054 raise ValueError(f"Unix socket path must be absolute, got '{path}'")
1055 return path
1058class UnixSocketServerConfig(BaseModel):
1059 """Server-side Unix socket configuration (plugin running as Unix socket server).
1061 Attributes:
1062 path (str): Path to the Unix domain socket file.
1064 Examples:
1065 >>> config = UnixSocketServerConfig(path="/tmp/plugin.sock")
1066 >>> config.path
1067 '/tmp/plugin.sock'
1068 """
1070 path: str = Field(default="/tmp/mcpgateway-plugins.sock", description="Path to the Unix domain socket") # nosec B108 - configurable default
1072 @classmethod
1073 def from_env(cls) -> Optional["UnixSocketServerConfig"]:
1074 """Construct Unix socket server configuration from environment variables.
1076 Returns:
1077 UnixSocketServerConfig instance or None if no environment variables are set.
1078 """
1079 s = get_transport_settings()
1080 data: dict[str, Any] = {}
1082 if s.unix_socket_path:
1083 data["path"] = s.unix_socket_path
1085 if not data:
1086 return None
1088 return cls(**data)
1091class PluginConfig(BaseModel):
1092 """A plugin configuration.
1094 Attributes:
1095 name (str): The unique name of the plugin.
1096 description (str): A description of the plugin.
1097 author (str): The author of the plugin.
1098 kind (str): The kind or type of plugin. Usually a fully qualified object type.
1099 namespace (str): The namespace where the plugin resides.
1100 version (str): version of the plugin.
1101 hooks (list[str]): a list of the hook points where the plugin will be called. Default: [].
1102 tags (list[str]): a list of tags for making the plugin searchable.
1103 mode (bool): whether the plugin is active.
1104 priority (int): indicates the order in which the plugin is run. Lower = higher priority. Default: 100.
1105 conditions (Optional[list[PluginCondition]]): the conditions on which the plugin is run.
1106 applied_to (Optional[list[AppliedTo]]): the tools, fields, that the plugin is applied to.
1107 config (dict[str, Any]): the plugin specific configurations.
1108 mcp (Optional[MCPClientConfig]): Client-side MCP configuration (gateway connecting to plugin).
1109 grpc (Optional[GRPCClientConfig]): Client-side gRPC configuration (gateway connecting to plugin).
1110 """
1112 name: str
1113 description: Optional[str] = None
1114 author: Optional[str] = None
1115 kind: str
1116 namespace: Optional[str] = None
1117 version: Optional[str] = None
1118 hooks: list[str] = Field(default_factory=list)
1119 tags: list[str] = Field(default_factory=list)
1120 mode: PluginMode = PluginMode.ENFORCE
1121 priority: int = 100 # Lower = higher priority
1122 conditions: list[PluginCondition] = Field(default_factory=list) # When to apply
1123 applied_to: Optional[AppliedTo] = None # Fields to apply to.
1124 config: Optional[dict[str, Any]] = None
1125 mcp: Optional[MCPClientConfig] = None
1126 grpc: Optional[GRPCClientConfig] = None
1127 unix_socket: Optional[UnixSocketClientConfig] = None
1129 @model_validator(mode="after")
1130 def check_url_or_script_filled(self) -> Self: # pylint: disable=bad-classmethod-argument
1131 """Checks to see that at least one of url or script are set depending on MCP server configuration.
1133 Raises:
1134 ValueError: if the script/cmd attribute is not defined with STDIO set, or the URL not defined with HTTP transports.
1136 Returns:
1137 The model after validation.
1138 """
1139 if not self.mcp:
1140 return self
1141 if self.mcp.proto == TransportType.STDIO and not (self.mcp.script or self.mcp.cmd):
1142 raise ValueError(f"Plugin {self.name} has transport type set to STDIO but no script/cmd value")
1143 if self.mcp.proto == TransportType.STDIO and self.mcp.script and self.mcp.cmd:
1144 raise ValueError(f"Plugin {self.name} must set either script or cmd for STDIO, not both")
1145 if self.mcp.proto in (TransportType.STREAMABLEHTTP, TransportType.SSE) and not self.mcp.url:
1146 raise ValueError(f"Plugin {self.name} has transport type set to StreamableHTTP but no url value")
1147 if self.mcp.proto not in (TransportType.SSE, TransportType.STREAMABLEHTTP, TransportType.STDIO):
1148 raise ValueError(f"Plugin {self.name} must set transport type to either SSE or STREAMABLEHTTP or STDIO")
1149 return self
1151 @model_validator(mode="after")
1152 def check_config_and_external(self, info: ValidationInfo) -> Self: # pylint: disable=bad-classmethod-argument
1153 """Checks to see that a plugin's 'config' section is not defined if the kind is 'external'. This is because developers cannot override items in the plugin config section for external plugins.
1155 Args:
1156 info: the contextual information passed into the pydantic model during model validation. Used to determine validation sequence.
1158 Raises:
1159 ValueError: if the script attribute is not defined with STDIO set, or the URL not defined with HTTP transports.
1161 Returns:
1162 The model after validation.
1163 """
1164 ignore_config_external = False
1165 if info and info.context and IGNORE_CONFIG_EXTERNAL in info.context:
1166 ignore_config_external = info.context[IGNORE_CONFIG_EXTERNAL]
1168 if not ignore_config_external and self.config and self.kind == EXTERNAL_PLUGIN_TYPE:
1169 raise ValueError(f"""Cannot have {self.name} plugin defined as 'external' with 'config' set.""" """ 'config' section settings can only be set on the plugin server.""")
1171 # External plugins must have exactly one transport configured (mcp, grpc, or unix_socket)
1172 if self.kind == EXTERNAL_PLUGIN_TYPE:
1173 has_mcp = self.mcp is not None
1174 has_grpc = self.grpc is not None
1175 has_unix = self.unix_socket is not None
1176 transport_count = sum([has_mcp, has_grpc, has_unix])
1178 if transport_count == 0:
1179 raise ValueError(f"External plugin {self.name} must have 'mcp', 'grpc', or 'unix_socket' section configured")
1180 if transport_count > 1:
1181 raise ValueError(f"External plugin {self.name} can only have one transport configured (mcp, grpc, or unix_socket)")
1183 return self
1186class PluginManifest(BaseModel):
1187 """Plugin manifest.
1189 Attributes:
1190 description (str): A description of the plugin.
1191 author (str): The author of the plugin.
1192 version (str): version of the plugin.
1193 tags (list[str]): a list of tags for making the plugin searchable.
1194 available_hooks (list[str]): a list of the hook points where the plugin is callable.
1195 default_config (dict[str, Any]): the default configurations.
1196 """
1198 description: str
1199 author: str
1200 version: str
1201 tags: list[str]
1202 available_hooks: list[str]
1203 default_config: dict[str, Any]
1206class PluginErrorModel(BaseModel):
1207 """A plugin error, used to denote exceptions/errors inside external plugins.
1209 Attributes:
1210 message (str): the reason for the error.
1211 code (str): an error code.
1212 details: (dict[str, Any]): additional error details.
1213 plugin_name (str): the plugin name.
1214 mcp_error_code ([int]): The MCP error code passed back to the client. Defaults to Internal Error.
1215 """
1217 message: str
1218 plugin_name: str
1219 code: Optional[str] = ""
1220 details: Optional[dict[str, Any]] = Field(default_factory=dict)
1221 mcp_error_code: int = -32603
1224class PluginViolation(BaseModel):
1225 """A plugin violation, used to denote policy violations.
1227 Attributes:
1228 reason (str): the reason for the violation.
1229 description (str): a longer description of the violation.
1230 code (str): a violation code.
1231 details: (dict[str, Any]): additional violation details.
1232 _plugin_name (str): the plugin name, private attribute set by the plugin manager.
1233 mcp_error_code(Optional[int]): A valid mcp error code which will be sent back to the client if plugin enabled.
1235 Examples:
1236 >>> violation = PluginViolation(
1237 ... reason="Invalid input",
1238 ... description="The input contains prohibited content",
1239 ... code="PROHIBITED_CONTENT",
1240 ... details={"field": "message", "value": "test"}
1241 ... )
1242 >>> violation.reason
1243 'Invalid input'
1244 >>> violation.code
1245 'PROHIBITED_CONTENT'
1246 >>> violation.plugin_name = "content_filter"
1247 >>> violation.plugin_name
1248 'content_filter'
1249 """
1251 reason: str
1252 description: str
1253 code: str
1254 details: Optional[dict[str, Any]] = Field(default_factory=dict)
1255 _plugin_name: str = PrivateAttr(default="")
1256 mcp_error_code: Optional[int] = None
1258 @property
1259 def plugin_name(self) -> str:
1260 """Getter for the plugin name attribute.
1262 Returns:
1263 The plugin name associated with the violation.
1264 """
1265 return self._plugin_name
1267 @plugin_name.setter
1268 def plugin_name(self, name: str) -> None:
1269 """Setter for the plugin_name attribute.
1271 Args:
1272 name: the plugin name.
1274 Raises:
1275 ValueError: if name is empty or not a string.
1276 """
1277 if not isinstance(name, str) or not name.strip():
1278 raise ValueError("Name must be a non-empty string.")
1279 self._plugin_name = name
1282class PluginSettings(BaseModel):
1283 """Global plugin settings.
1285 Attributes:
1286 parallel_execution_within_band (bool): execute plugins with same priority in parallel.
1287 plugin_timeout (int): timeout value for plugins operations.
1288 fail_on_plugin_error (bool): error when there is a plugin connectivity or ignore.
1289 enable_plugin_api (bool): enable or disable plugins globally.
1290 plugin_health_check_interval (int): health check interval check.
1291 include_user_info (bool): if enabled user info is injected in plugin context
1292 """
1294 parallel_execution_within_band: bool = False
1295 plugin_timeout: int = 30
1296 fail_on_plugin_error: bool = False
1297 enable_plugin_api: bool = False
1298 plugin_health_check_interval: int = 60
1299 include_user_info: bool = False
1302class Config(BaseModel):
1303 """Configurations for plugins.
1305 Attributes:
1306 plugins (Optional[list[PluginConfig]]): the list of plugins to enable.
1307 plugin_dirs (list[str]): The directories in which to look for plugins.
1308 plugin_settings (PluginSettings): global settings for plugins.
1309 server_settings (Optional[MCPServerConfig]): Server-side MCP configuration (when plugins run as server).
1310 grpc_server_settings (Optional[GRPCServerConfig]): Server-side gRPC configuration (when plugins run as gRPC server).
1311 unix_socket_server_settings (Optional[UnixSocketServerConfig]): Server-side Unix socket configuration.
1312 """
1314 plugins: Optional[list[PluginConfig]] = []
1315 plugin_dirs: list[str] = []
1316 plugin_settings: PluginSettings
1317 server_settings: Optional[MCPServerConfig] = None
1318 grpc_server_settings: Optional[GRPCServerConfig] = None
1319 unix_socket_server_settings: Optional[UnixSocketServerConfig] = None
1322class PluginResult(BaseModel, Generic[T]):
1323 """A result of the plugin hook processing. The actual type is dependent on the hook.
1325 Attributes:
1326 continue_processing (bool): Whether to stop processing.
1327 modified_payload (Optional[Any]): The modified payload if the plugin is a transformer.
1328 violation (Optional[PluginViolation]): violation object.
1329 metadata (Optional[dict[str, Any]]): additional metadata.
1331 Examples:
1332 >>> result = PluginResult()
1333 >>> result.continue_processing
1334 True
1335 >>> result.metadata
1336 {}
1337 >>> from mcpgateway.plugins.framework import PluginViolation
1338 >>> violation = PluginViolation(
1339 ... reason="Test", description="Test desc", code="TEST", details={}
1340 ... )
1341 >>> result2 = PluginResult(continue_processing=False, violation=violation)
1342 >>> result2.continue_processing
1343 False
1344 >>> result2.violation.code
1345 'TEST'
1346 >>> r = PluginResult(metadata={"key": "value"})
1347 >>> r.metadata["key"]
1348 'value'
1349 >>> r2 = PluginResult(continue_processing=False)
1350 >>> r2.continue_processing
1351 False
1352 """
1354 continue_processing: bool = True
1355 modified_payload: Optional[T] = None
1356 violation: Optional[PluginViolation] = None
1357 metadata: Optional[dict[str, Any]] = Field(default_factory=dict)
1360class GlobalContext(BaseModel):
1361 """The global context, which shared across all plugins.
1363 Attributes:
1364 request_id (str): ID of the HTTP request.
1365 user (str): user ID associated with the request.
1366 tenant_id (str): tenant ID.
1367 server_id (str): server ID.
1368 metadata (Optional[dict[str,Any]]): a global shared metadata across plugins (Read-only from plugin's perspective).
1369 state (Optional[dict[str,Any]]): a global shared state across plugins.
1371 Examples:
1372 >>> ctx = GlobalContext(request_id="req-123")
1373 >>> ctx.request_id
1374 'req-123'
1375 >>> ctx.user is None
1376 True
1377 >>> ctx2 = GlobalContext(request_id="req-456", user="alice", tenant_id="tenant1")
1378 >>> ctx2.user
1379 'alice'
1380 >>> ctx2.tenant_id
1381 'tenant1'
1382 >>> c = GlobalContext(request_id="123", server_id="srv1")
1383 >>> c.request_id
1384 '123'
1385 >>> c.server_id
1386 'srv1'
1387 """
1389 request_id: str
1390 user: Optional[Union[str, dict[str, Any]]] = None
1391 tenant_id: Optional[str] = None
1392 server_id: Optional[str] = None
1393 state: dict[str, Any] = Field(default_factory=dict)
1394 metadata: dict[str, Any] = Field(default_factory=dict)
1397class PluginContext(BaseModel):
1398 """The plugin's context, which lasts a request lifecycle.
1400 Attributes:
1401 state: the inmemory state of the request.
1402 global_context: the context that is shared across plugins.
1403 metadata: plugin meta data.
1405 Examples:
1406 >>> gctx = GlobalContext(request_id="req-123")
1407 >>> ctx = PluginContext(global_context=gctx)
1408 >>> ctx.global_context.request_id
1409 'req-123'
1410 >>> ctx.global_context.user is None
1411 True
1412 >>> ctx.state["somekey"] = "some value"
1413 >>> ctx.state["somekey"]
1414 'some value'
1415 """
1417 state: dict[str, Any] = Field(default_factory=dict)
1418 global_context: GlobalContext
1419 metadata: dict[str, Any] = Field(default_factory=dict)
1421 def get_state(self, key: str, default: Any = None) -> Any:
1422 """Get value from shared state.
1424 Args:
1425 key: The key to access the shared state.
1426 default: A default value if one doesn't exist.
1428 Returns:
1429 The state value.
1430 """
1431 return self.state.get(key, default)
1433 def set_state(self, key: str, value: Any) -> None:
1434 """Set value in shared state.
1436 Args:
1437 key: the key to add to the state.
1438 value: the value to add to the state.
1439 """
1440 self.state[key] = value
1442 async def cleanup(self) -> None:
1443 """Cleanup context resources."""
1444 self.state.clear()
1445 self.metadata.clear()
1447 def is_empty(self) -> bool:
1448 """Check whether the state and metadata objects are empty.
1450 Returns:
1451 True if the context state and metadata are empty.
1452 """
1453 return not (self.state or self.metadata or self.global_context.state)
1456PluginContextTable = dict[str, PluginContext]
1459class PluginPayload(BaseModel):
1460 """Base class for all hook payloads. Immutable by design.
1462 Frozen payloads prevent in-place mutations by plugins -- attributes
1463 cannot be set directly on the object. Plugins must use
1464 ``model_copy(update=...)`` to create modified payloads and return
1465 modifications via ``PluginResult.modified_payload``.
1467 Examples:
1468 >>> class TestPayload(PluginPayload):
1469 ... name: str
1470 >>> p = TestPayload(name="test")
1471 >>> p.name
1472 'test'
1473 """
1475 model_config = ConfigDict(frozen=True, arbitrary_types_allowed=True)