Coverage for mcpgateway / plugins / framework / models.py: 99%
551 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/plugins/framework/models.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Teryl Taylor, Mihai Criveti, Fred Araujo
7Pydantic models for plugins.
8This module implements the pydantic models associated with
9the base plugin layer including configurations, and contexts.
10"""
12# Standard
13from enum import Enum
14import logging
15import os
16from pathlib import Path
17from typing import Any, Generic, Optional, Self, TypeVar, Union
19# Third-Party
20from pydantic import BaseModel, ConfigDict, Field, field_serializer, field_validator, model_validator, PrivateAttr, ValidationInfo
22# First-Party
23from mcpgateway.plugins.framework.constants import CMD, CWD, ENV, EXTERNAL_PLUGIN_TYPE, IGNORE_CONFIG_EXTERNAL, PYTHON_SUFFIX, SCRIPT, UDS, URL
24from mcpgateway.plugins.framework.settings import get_client_mtls_settings, get_grpc_client_mtls_settings, get_grpc_server_settings, get_mcp_server_settings, get_transport_settings
25from mcpgateway.plugins.framework.validators import validate_plugin_url
# Module-level generic type variable (its users are not visible in this part of the file).
T = TypeVar("T")
class TransportType(str, Enum):
    """Supported transport mechanisms for MCP plugin communication.

    Each member is also a ``str`` (the class mixes in ``str``), and every
    member's value is identical to its name.

    Attributes:
        SSE: Server-Sent Events transport.
        HTTP: Standard HTTP-based transport.
        STDIO: Standard input/output transport.
        STREAMABLEHTTP: HTTP transport with streaming.
        GRPC: gRPC transport for external plugins.

    Examples:
        >>> TransportType.SSE
        <TransportType.SSE: 'SSE'>
        >>> TransportType.STDIO.value
        'STDIO'
        >>> TransportType('STREAMABLEHTTP')
        <TransportType.STREAMABLEHTTP: 'STREAMABLEHTTP'>
    """

    SSE = "SSE"
    HTTP = "HTTP"
    STDIO = "STDIO"
    STREAMABLEHTTP = "STREAMABLEHTTP"
    GRPC = "GRPC"
class PluginMode(str, Enum):
    """Plugin modes of operation.

    Each member is also a ``str`` (the class mixes in ``str``); the values
    are the lower-case spellings of the member names.

    Attributes:
        ENFORCE: enforces the plugin result, and blocks execution when there is an error.
        ENFORCE_IGNORE_ERROR: enforces the plugin result, but allows execution when there is an error.
        PERMISSIVE: audits the result.
        DISABLED: plugin disabled.

    Examples:
        >>> PluginMode.ENFORCE
        <PluginMode.ENFORCE: 'enforce'>
        >>> PluginMode.ENFORCE_IGNORE_ERROR
        <PluginMode.ENFORCE_IGNORE_ERROR: 'enforce_ignore_error'>
        >>> PluginMode.PERMISSIVE.value
        'permissive'
        >>> PluginMode('disabled')
        <PluginMode.DISABLED: 'disabled'>
        >>> 'enforce' in [m.value for m in PluginMode]
        True
    """

    ENFORCE = "enforce"
    ENFORCE_IGNORE_ERROR = "enforce_ignore_error"
    PERMISSIVE = "permissive"
    DISABLED = "disabled"
class BaseTemplate(BaseModel):
    """Common base for the tool, prompt and resource templates.

    ToolTemplate, PromptTemplate and ResourceTemplate extend this model.

    Attributes:
        context (Optional[list[str]]): keys of the context to extract. A key may
            refer to the global context (shared between the plugins) or the local
            context (shared within the plugin). Example: global.key1.
        extensions (Optional[dict[str, Any]]): custom keys for a specific plugin.
            Example: a 'policy' key for the opa plugin.

    Examples:
        >>> base = BaseTemplate(context=["global.key1.key2", "local.key1.key2"])
        >>> base.context
        ['global.key1.key2', 'local.key1.key2']
        >>> base = BaseTemplate(context=["global.key1.key2"], extensions={"policy" : "sample policy"})
        >>> base.extensions
        {'policy': 'sample policy'}
    """

    context: list[str] | None = None
    extensions: dict[str, Any] | None = None
class ToolTemplate(BaseTemplate):
    """Template describing which tool, and which of its fields, a plugin targets.

    Attributes:
        tool_name (str): the name of the tool (required).
        fields (Optional[list[str]]): the tool fields that are affected.
        result (bool): analyze tool output if true; defaults to False.

    Examples:
        >>> tool = ToolTemplate(tool_name="my_tool")
        >>> tool.tool_name
        'my_tool'
        >>> tool.result
        False
        >>> tool2 = ToolTemplate(tool_name="analyzer", fields=["input", "params"], result=True)
        >>> tool2.fields
        ['input', 'params']
        >>> tool2.result
        True
    """

    tool_name: str
    fields: list[str] | None = None
    result: bool = False
class PromptTemplate(BaseTemplate):
    """Template describing which prompt, and which of its fields, a plugin targets.

    Attributes:
        prompt_name (str): the name of the prompt (required).
        fields (Optional[list[str]]): the prompt fields that are affected.
        result (bool): analyze the output if true; defaults to False.

    Examples:
        >>> prompt = PromptTemplate(prompt_name="greeting")
        >>> prompt.prompt_name
        'greeting'
        >>> prompt.result
        False
        >>> prompt2 = PromptTemplate(prompt_name="question", fields=["context"], result=True)
        >>> prompt2.fields
        ['context']
    """

    prompt_name: str
    fields: list[str] | None = None
    result: bool = False
class ResourceTemplate(BaseTemplate):
    """Template describing which resource, and which of its fields, a plugin targets.

    Attributes:
        resource_uri (str): the URI of the resource (required).
        fields (Optional[list[str]]): the resource fields that are affected.
        result (bool): analyze resource output if true; defaults to False.

    Examples:
        >>> resource = ResourceTemplate(resource_uri="file:///data.txt")
        >>> resource.resource_uri
        'file:///data.txt'
        >>> resource.result
        False
        >>> resource2 = ResourceTemplate(resource_uri="http://api/data", fields=["content"], result=True)
        >>> resource2.fields
        ['content']
    """

    resource_uri: str
    fields: list[str] | None = None
    result: bool = False
class PluginCondition(BaseModel):
    """Conditions for when plugin should execute.

    Attributes:
        server_ids (Optional[set[str]]): set of server ids.
        tenant_ids (Optional[set[str]]): set of tenant ids.
        tools (Optional[set[str]]): set of tool names.
        prompts (Optional[set[str]]): set of prompt names.
        resources (Optional[set[str]]): set of resource URIs.
        agents (Optional[set[str]]): set of agent IDs.
        user_patterns (Optional[list[str]]): list of user patterns.
        content_types (Optional[list[str]]): list of content types.

    Examples:
        >>> cond = PluginCondition(server_ids={"server1", "server2"})
        >>> "server1" in cond.server_ids
        True
        >>> cond2 = PluginCondition(tools={"tool1"}, prompts={"prompt1"})
        >>> cond2.tools
        {'tool1'}
        >>> cond3 = PluginCondition(user_patterns=["admin", "root"])
        >>> len(cond3.user_patterns)
        2
    """

    server_ids: set[str] | None = None
    tenant_ids: set[str] | None = None
    tools: set[str] | None = None
    prompts: set[str] | None = None
    resources: set[str] | None = None
    agents: set[str] | None = None
    user_patterns: list[str] | None = None
    content_types: list[str] | None = None

    @field_serializer("server_ids", "tenant_ids", "tools", "prompts", "resources", "agents")
    def serialize_set(self, value: set[str] | None) -> list[str] | None:
        """Serialize the set-valued fields of PluginCondition as lists for MCP.

        Args:
            value: a set of server ids, tenant ids, tool names, prompt names,
                resource URIs or agent IDs.

        Returns:
            The members of the set as a list (in set iteration order), or None
            when the value is None or an empty set.
        """
        # An empty set is deliberately reported as None, same as an unset field.
        if value:
            return list(value)
        return None
class AppliedTo(BaseModel):
    """What tools/prompts/resources and fields the plugin will be applied to.

    Attributes:
        tools (Optional[list[ToolTemplate]]): tools and fields to be applied.
        prompts (Optional[list[PromptTemplate]]): prompts and fields to be applied.
        resources (Optional[list[ResourceTemplate]]): resources and fields to be applied.
    """

    tools: list[ToolTemplate] | None = None
    prompts: list[PromptTemplate] | None = None
    resources: list[ResourceTemplate] | None = None
class MCPTransportTLSConfigBase(BaseModel):
    """TLS settings shared by the client-side and server-side TLS models.

    Attributes:
        certfile (Optional[str]): Path to the PEM-encoded certificate file.
        keyfile (Optional[str]): Path to the PEM-encoded private key file.
        ca_bundle (Optional[str]): Path to a CA bundle file for verification.
        keyfile_password (Optional[str]): Optional password for encrypted private key.
    """

    certfile: str | None = Field(default=None, description="Path to PEM certificate file")
    keyfile: str | None = Field(default=None, description="Path to PEM private key file")
    ca_bundle: str | None = Field(default=None, description="Path to CA bundle for verification")
    keyfile_password: str | None = Field(default=None, description="Password for encrypted private key")

    @field_validator("ca_bundle", "certfile", "keyfile", mode="after")
    @classmethod
    def validate_path(cls, value: Optional[str]) -> Optional[str]:
        """Expand ``~`` in a TLS file path and check that the file exists.

        Args:
            value: File path to validate.

        Returns:
            The user-expanded path as a string, or the value unchanged when it
            is None or empty.

        Raises:
            ValueError: If the expanded path is not an existing file.
        """
        if value:
            tls_file = Path(value).expanduser()
            if tls_file.is_file():
                return str(tls_file)
            raise ValueError(f"TLS file path does not exist: {value}")
        return value

    @model_validator(mode="after")
    def validate_cert_key(self) -> Self:  # pylint: disable=bad-classmethod-argument
        """Reject a private key that is configured without a certificate.

        Returns:
            Self after validation.

        Raises:
            ValueError: If keyfile is specified without certfile.
        """
        key_without_cert = bool(self.keyfile) and not self.certfile
        if key_without_cert:
            raise ValueError("keyfile requires certfile to be specified")
        return self
class MCPClientTLSConfig(MCPTransportTLSConfigBase):
    """TLS options used by the gateway when it connects to a plugin.

    Attributes:
        verify (bool): Whether to verify the remote server certificate.
        check_hostname (bool): Enable hostname verification when verify is true.
    """

    verify: bool = Field(default=True, description="Verify the upstream server certificate")
    check_hostname: bool = Field(default=True, description="Enable hostname verification")

    @classmethod
    def from_env(cls) -> Optional["MCPClientTLSConfig"]:
        """Build the client TLS configuration from PLUGINS_CLIENT_* environment variables.

        The values are read through ``get_client_mtls_settings()``.

        Returns:
            MCPClientTLSConfig instance or None if no environment variables are set.
        """
        settings = get_client_mtls_settings()
        overrides: dict[str, Any] = {}

        # File paths only count when they are non-empty.
        path_settings = (
            ("certfile", settings.client_mtls_certfile),
            ("keyfile", settings.client_mtls_keyfile),
            ("ca_bundle", settings.client_mtls_ca_bundle),
        )
        for field_name, configured_path in path_settings:
            if configured_path:
                overrides[field_name] = configured_path

        secret = settings.client_mtls_keyfile_password
        if secret is not None:
            overrides["keyfile_password"] = secret.get_secret_value()

        # Flags are tested against None so that an explicit False is kept.
        flag_settings = (
            ("verify", settings.client_mtls_verify),
            ("check_hostname", settings.client_mtls_check_hostname),
        )
        for field_name, flag in flag_settings:
            if flag is not None:
                overrides[field_name] = flag

        return cls(**overrides) if overrides else None
class MCPServerTLSConfig(MCPTransportTLSConfigBase):
    """TLS options used by a plugin server that accepts gateway connections.

    Attributes:
        ssl_cert_reqs (int): Client certificate requirement (0=NONE, 1=OPTIONAL, 2=REQUIRED).
    """

    ssl_cert_reqs: int = Field(default=2, description="Client certificate requirement (0=NONE, 1=OPTIONAL, 2=REQUIRED)")

    @classmethod
    def from_env(cls) -> Optional["MCPServerTLSConfig"]:
        """Build the server TLS configuration from PLUGINS_SERVER_SSL_* environment variables.

        The values are read through ``get_mcp_server_settings()``.

        Returns:
            MCPServerTLSConfig instance or None if no environment variables are set.
        """
        settings = get_mcp_server_settings()
        overrides: dict[str, Any] = {}

        # File paths only count when they are non-empty.
        path_settings = (
            ("keyfile", settings.server_ssl_keyfile),
            ("certfile", settings.server_ssl_certfile),
            ("ca_bundle", settings.server_ssl_ca_certs),
        )
        for field_name, configured_path in path_settings:
            if configured_path:
                overrides[field_name] = configured_path

        secret = settings.server_ssl_keyfile_password
        if secret is not None:
            overrides["keyfile_password"] = secret.get_secret_value()

        # Tested against None so that an explicit 0 (NONE) is kept.
        cert_reqs = settings.server_ssl_cert_reqs
        if cert_reqs is not None:
            overrides["ssl_cert_reqs"] = cert_reqs

        return cls(**overrides) if overrides else None
class MCPServerConfig(BaseModel):
    """Server-side MCP configuration (plugin running as server).

    Attributes:
        host (str): Server host to bind to.
        port (int): Server port to bind to.
        uds (Optional[str]): Unix domain socket path for streamable HTTP.
        tls (Optional[MCPServerTLSConfig]): Server-side TLS configuration.
    """

    host: str = Field(default="127.0.0.1", description="Server host to bind to")
    port: int = Field(default=8000, description="Server port to bind to")
    uds: str | None = Field(default=None, description="Unix domain socket path for streamable HTTP")
    tls: MCPServerTLSConfig | None = Field(default=None, description="Server-side TLS configuration")

    @field_validator("uds", mode="after")
    @classmethod
    def validate_uds(cls, uds: str | None) -> str | None:
        """Validate the Unix domain socket path for security.

        Args:
            uds: Unix domain socket path.

        Returns:
            The validated canonical uds path or None if none is set.

        Raises:
            ValueError: if uds is empty, not absolute, or parent directory is invalid.
        """
        if uds is None:
            return uds
        if not isinstance(uds, str) or not uds.strip():
            raise ValueError("MCP server uds must be a non-empty string.")

        # Path.resolve() always returns an absolute path, so absoluteness has to
        # be checked on the user-expanded path *before* resolving; otherwise a
        # relative path would silently be anchored at the current directory.
        expanded = Path(uds).expanduser()
        if not expanded.is_absolute():
            raise ValueError(f"MCP server uds path must be absolute: {uds}")
        uds_path = expanded.resolve()

        parent_dir = uds_path.parent
        if not parent_dir.is_dir():
            raise ValueError(f"MCP server uds parent directory does not exist: {parent_dir}")

        # Check parent directory permissions for security
        try:
            parent_mode = parent_dir.stat().st_mode
            # Warn if parent directory is world-writable (o+w = 0o002)
            if parent_mode & 0o002:
                logging.getLogger(__name__).warning(
                    "MCP server uds parent directory %s is world-writable. This may allow unauthorized socket hijacking. Consider using a directory with restricted permissions (e.g., 0o700).",
                    parent_dir,
                )
        except OSError:
            pass  # Best effort - continue if we can't check permissions

        return str(uds_path)

    @model_validator(mode="after")
    def validate_uds_tls(self) -> Self:  # pylint: disable=bad-classmethod-argument
        """Ensure TLS is not configured when using a Unix domain socket.

        Returns:
            Self after validation.

        Raises:
            ValueError: if tls is set with uds.
        """
        if self.uds and self.tls:
            raise ValueError("TLS configuration is not supported for Unix domain sockets.")
        return self

    @classmethod
    def from_env(cls) -> Optional["MCPServerConfig"]:
        """Construct server configuration from PLUGINS_SERVER_* environment variables.

        The values are read through ``get_mcp_server_settings()``. TLS settings
        are only consulted when ``server_ssl_enabled`` is truthy.

        Returns:
            MCPServerConfig instance or None if no environment variables are set.
        """
        s = get_mcp_server_settings()
        data: dict[str, Any] = {}

        if s.server_host:
            data["host"] = s.server_host
        if s.server_port is not None:
            data["port"] = s.server_port
        if s.server_uds:
            data["uds"] = s.server_uds

        # Check if SSL/TLS is enabled
        if s.server_ssl_enabled:
            tls_config = MCPServerTLSConfig.from_env()
            if tls_config:
                data["tls"] = tls_config

        if not data:
            return None

        return cls(**data)
class MCPClientConfig(BaseModel):
    """Client-side MCP configuration (gateway connecting to external plugin).

    Attributes:
        proto (TransportType): The MCP transport type. Can be SSE, STDIO, or STREAMABLEHTTP
        url (Optional[str]): An MCP URL. Only valid when MCP transport type is SSE or STREAMABLEHTTP.
        script (Optional[str]): The path and name to the STDIO script that runs the plugin server. Only valid for STDIO type.
        cmd (Optional[list[str]]): Command + args used to start a STDIO MCP server. Only valid for STDIO type.
        env (Optional[dict[str, str]]): Environment overrides for STDIO server process.
        cwd (Optional[str]): Working directory for STDIO server process.
        uds (Optional[str]): Unix domain socket path for streamable HTTP.
        tls (Optional[MCPClientTLSConfig]): Client-side TLS configuration for mTLS.
        reconnect_attempts (int): Number of reconnection attempts on failure.
        reconnect_delay (float): Base delay between reconnection attempts (seconds).
    """

    proto: TransportType
    url: str | None = None
    script: str | None = None
    cmd: list[str] | None = None
    env: dict[str, str] | None = None
    cwd: str | None = None
    uds: str | None = None
    tls: MCPClientTLSConfig | None = None
    reconnect_attempts: int = Field(default=3, description="Number of reconnection attempts on failure")
    reconnect_delay: float = Field(default=0.1, description="Base delay between reconnection attempts (seconds)")

    @field_validator(URL, mode="after")
    @classmethod
    def validate_url(cls, url: str | None) -> str | None:
        """Validate a MCP url for streamable HTTP connections.

        Args:
            url: the url to be validated.

        Raises:
            ValueError: if the URL fails validation.

        Returns:
            The result of ``validate_plugin_url`` for a non-empty URL, otherwise
            the value unchanged (None or empty).
        """
        if url:
            return validate_plugin_url(url)
        return url

    @field_validator(SCRIPT, mode="after")
    @classmethod
    def validate_script(cls, script: str | None) -> str | None:
        """Validate an MCP stdio script.

        Only absolute paths (after ``~`` expansion) are checked here; relative
        paths are accepted as-is.

        Args:
            script: the script to be validated.

        Raises:
            ValueError: if an absolute script path doesn't exist or isn't executable when required.

        Returns:
            The script string exactly as supplied, or None if none is set.
        """
        if script:
            file_path = Path(script).expanduser()
            # Allow relative paths; they are resolved at runtime (optionally using cwd).
            if file_path.is_absolute():
                if not file_path.is_file():
                    raise ValueError(f"MCP server script {script} does not exist.")
                # Allow Python (.py) and shell scripts (.sh). Other files must be executable.
                if file_path.suffix not in {PYTHON_SUFFIX, ".sh"} and not os.access(file_path, os.X_OK):
                    raise ValueError(f"MCP server script {script} must be executable.")
        return script

    @field_validator(CMD, mode="after")
    @classmethod
    def validate_cmd(cls, cmd: list[str] | None) -> list[str] | None:
        """Validate an MCP stdio command.

        Args:
            cmd: the command to be validated.

        Raises:
            ValueError: if cmd is empty or contains empty values.

        Returns:
            The validated command list or None if none is set.
        """
        if cmd is None:
            return cmd
        if not isinstance(cmd, list) or not cmd:
            raise ValueError("MCP stdio cmd must be a non-empty list.")
        if not all(isinstance(part, str) and part.strip() for part in cmd):
            raise ValueError("MCP stdio cmd entries must be non-empty strings.")
        return cmd

    @field_validator(ENV, mode="after")
    @classmethod
    def validate_env(cls, env: dict[str, str] | None) -> dict[str, str] | None:
        """Validate environment overrides for MCP stdio.

        Args:
            env: Environment overrides to set for the stdio plugin process.

        Returns:
            The validated environment dict or None if none is set.

        Raises:
            ValueError: if keys/values are invalid or the dict is empty.
        """
        if env is None:
            return env
        if not isinstance(env, dict) or not env:
            raise ValueError("MCP stdio env must be a non-empty dict.")
        for key, value in env.items():
            if not isinstance(key, str) or not key.strip():
                raise ValueError("MCP stdio env keys must be non-empty strings.")
            if not isinstance(value, str):
                raise ValueError("MCP stdio env values must be strings.")
        return env

    @field_validator(CWD, mode="after")
    @classmethod
    def validate_cwd(cls, cwd: str | None) -> str | None:
        """Validate the working directory for MCP stdio.

        Args:
            cwd: Working directory for the stdio plugin process.

        Returns:
            The validated canonical cwd path or None if none is set.

        Raises:
            ValueError: if cwd does not exist or is not a directory.
        """
        if not cwd:
            return cwd
        cwd_path = Path(cwd).expanduser().resolve()
        if not cwd_path.is_dir():
            raise ValueError(f"MCP stdio cwd {cwd} does not exist or is not a directory.")
        return str(cwd_path)

    @field_validator(UDS, mode="after")
    @classmethod
    def validate_uds(cls, uds: str | None) -> str | None:
        """Validate a Unix domain socket path for streamable HTTP.

        Args:
            uds: Unix domain socket path.

        Returns:
            The validated canonical uds path or None if none is set.

        Raises:
            ValueError: if uds is empty, not absolute, or parent directory is invalid.
        """
        if uds is None:
            return uds
        if not isinstance(uds, str) or not uds.strip():
            raise ValueError("MCP client uds must be a non-empty string.")

        # Path.resolve() always returns an absolute path, so absoluteness has to
        # be checked on the user-expanded path *before* resolving; otherwise a
        # relative path would silently be anchored at the current directory.
        expanded = Path(uds).expanduser()
        if not expanded.is_absolute():
            raise ValueError(f"MCP client uds path must be absolute: {uds}")
        uds_path = expanded.resolve()

        parent_dir = uds_path.parent
        if not parent_dir.is_dir():
            raise ValueError(f"MCP client uds parent directory does not exist: {parent_dir}")

        # Check parent directory permissions for security
        try:
            parent_mode = parent_dir.stat().st_mode
            # Warn if parent directory is world-writable (o+w = 0o002)
            if parent_mode & 0o002:
                logging.getLogger(__name__).warning(
                    "MCP client uds parent directory %s is world-writable. This may allow unauthorized socket hijacking. Consider using a directory with restricted permissions (e.g., 0o700).",
                    parent_dir,
                )
        except OSError:
            pass  # Best effort - continue if we can't check permissions

        return str(uds_path)

    @model_validator(mode="after")
    def validate_tls_usage(self) -> Self:  # pylint: disable=bad-classmethod-argument
        """Ensure TLS configuration is only used with HTTP-based transports.

        TLS is accepted only for the SSE and STREAMABLEHTTP transports, and
        never together with a Unix domain socket.

        Returns:
            Self after validation.

        Raises:
            ValueError: If TLS configuration is used with non-HTTP transports
                or together with uds.
        """
        if self.tls and self.proto not in (TransportType.SSE, TransportType.STREAMABLEHTTP):
            raise ValueError("TLS configuration is only valid for HTTP/SSE transports")
        if self.uds and self.tls:
            raise ValueError("TLS configuration is not supported for Unix domain sockets.")
        return self

    @model_validator(mode="after")
    def validate_transport_fields(self) -> Self:  # pylint: disable=bad-classmethod-argument
        """Ensure transport-specific fields are only used with matching transports.

        Returns:
            Self after validation.

        Raises:
            ValueError: if fields are incompatible with the selected transport.
        """
        if self.proto == TransportType.STDIO and self.url:
            raise ValueError("URL is only valid for HTTP/SSE transports")
        if self.proto != TransportType.STDIO and (self.script or self.cmd or self.env or self.cwd):
            raise ValueError("script/cmd/env/cwd are only valid for STDIO transport")
        if self.proto != TransportType.STREAMABLEHTTP and self.uds:
            raise ValueError("uds is only valid for STREAMABLEHTTP transport")
        return self
class GRPCClientTLSConfig(MCPTransportTLSConfigBase):
    """TLS options used by the gateway when it connects to a plugin over gRPC.

    Attributes:
        verify (bool): Whether to verify the remote server certificate.
    """

    verify: bool = Field(default=True, description="Verify the upstream server certificate")

    @classmethod
    def from_env(cls) -> Optional["GRPCClientTLSConfig"]:
        """Build the gRPC client TLS configuration from PLUGINS_GRPC_CLIENT_* environment variables.

        The values are read through ``get_grpc_client_mtls_settings()``.

        Returns:
            GRPCClientTLSConfig instance or None if no environment variables are set.
        """
        settings = get_grpc_client_mtls_settings()
        overrides: dict[str, Any] = {}

        # File paths only count when they are non-empty.
        path_settings = (
            ("certfile", settings.grpc_client_mtls_certfile),
            ("keyfile", settings.grpc_client_mtls_keyfile),
            ("ca_bundle", settings.grpc_client_mtls_ca_bundle),
        )
        for field_name, configured_path in path_settings:
            if configured_path:
                overrides[field_name] = configured_path

        secret = settings.grpc_client_mtls_keyfile_password
        if secret is not None:
            overrides["keyfile_password"] = secret.get_secret_value()

        # Tested against None so that an explicit False is kept.
        verify_flag = settings.grpc_client_mtls_verify
        if verify_flag is not None:
            overrides["verify"] = verify_flag

        return cls(**overrides) if overrides else None
class GRPCServerTLSConfig(MCPTransportTLSConfigBase):
    """TLS options used by a gRPC plugin server that accepts gateway connections.

    Attributes:
        client_auth (str): Client certificate requirement ('none', 'optional', 'require').
    """

    client_auth: str = Field(default="require", description="Client certificate requirement (none, optional, require)")

    @field_validator("client_auth", mode="after")
    @classmethod
    def validate_client_auth(cls, value: str) -> str:
        """Check client_auth case-insensitively and normalise it to lower case.

        Args:
            value: Client auth requirement string.

        Returns:
            The lower-cased client auth string.

        Raises:
            ValueError: If client_auth is not a valid value.
        """
        allowed = {"none", "optional", "require"}
        normalized = value.lower()
        if normalized in allowed:
            return normalized
        raise ValueError(f"client_auth must be one of {allowed}, got '{value}'")

    @classmethod
    def from_env(cls) -> Optional["GRPCServerTLSConfig"]:
        """Build the gRPC server TLS configuration from PLUGINS_GRPC_SERVER_SSL_* environment variables.

        The values are read through ``get_grpc_server_settings()``.

        Returns:
            GRPCServerTLSConfig instance or None if no environment variables are set.
        """
        settings = get_grpc_server_settings()
        overrides: dict[str, Any] = {}

        # File paths only count when they are non-empty.
        path_settings = (
            ("keyfile", settings.grpc_server_ssl_keyfile),
            ("certfile", settings.grpc_server_ssl_certfile),
            ("ca_bundle", settings.grpc_server_ssl_ca_certs),
        )
        for field_name, configured_path in path_settings:
            if configured_path:
                overrides[field_name] = configured_path

        secret = settings.grpc_server_ssl_keyfile_password
        if secret is not None:
            overrides["keyfile_password"] = secret.get_secret_value()

        client_auth = settings.grpc_server_ssl_client_auth
        if client_auth:
            overrides["client_auth"] = client_auth

        return cls(**overrides) if overrides else None
class GRPCClientConfig(BaseModel):
    """Client-side gRPC configuration (gateway connecting to external plugin).

    Exactly one of ``target`` and ``uds`` must be set.

    Attributes:
        target (Optional[str]): The gRPC target address in host:port format.
        uds (Optional[str]): Unix domain socket path (alternative to target).
        tls (Optional[GRPCClientTLSConfig]): Client-side TLS configuration for mTLS.

    Examples:
        >>> # TCP connection
        >>> config = GRPCClientConfig(target="localhost:50051")
        >>> config.get_target()
        'localhost:50051'
        >>> # Unix domain socket connection (path is resolved to canonical form)
        >>> config = GRPCClientConfig(uds="/tmp/grpc-plugin.sock")  # doctest: +SKIP
        >>> config.get_target()  # doctest: +SKIP
        'unix:///tmp/grpc-plugin.sock'
    """

    target: str | None = Field(default=None, description="gRPC target address (host:port)")
    uds: str | None = Field(default=None, description="Unix domain socket path")
    tls: GRPCClientTLSConfig | None = None

    @field_validator("target", mode="after")
    @classmethod
    def validate_target(cls, target: str | None) -> str | None:
        """Validate gRPC target address format.

        Only a basic check is made: the address must be non-empty and contain
        a ``:`` separator.

        Args:
            target: The target address to validate.

        Returns:
            The validated target address, or None if none is set.

        Raises:
            ValueError: If target is empty or not in host:port format.
        """
        if target is None:
            return target
        if not target:
            raise ValueError("gRPC target address cannot be empty")
        # Basic validation - should contain host and port
        if ":" not in target:
            raise ValueError(f"gRPC target must be in host:port format, got '{target}'")
        return target

    @field_validator("uds", mode="after")
    @classmethod
    def validate_uds(cls, uds: str | None) -> str | None:
        """Validate Unix domain socket path for gRPC.

        Args:
            uds: Unix domain socket path.

        Returns:
            The validated canonical uds path or None if none is set.

        Raises:
            ValueError: if uds is empty, not absolute, or parent directory is invalid.
        """
        if uds is None:
            return uds
        if not isinstance(uds, str) or not uds.strip():
            raise ValueError("gRPC client uds must be a non-empty string.")

        # Path.resolve() always returns an absolute path, so absoluteness has to
        # be checked on the user-expanded path *before* resolving; otherwise a
        # relative path would silently be anchored at the current directory.
        expanded = Path(uds).expanduser()
        if not expanded.is_absolute():
            raise ValueError(f"gRPC client uds path must be absolute: {uds}")
        uds_path = expanded.resolve()

        parent_dir = uds_path.parent
        if not parent_dir.is_dir():
            raise ValueError(f"gRPC client uds parent directory does not exist: {parent_dir}")

        # Check parent directory permissions for security
        try:
            parent_mode = parent_dir.stat().st_mode
            # Warn if parent directory is world-writable (o+w = 0o002)
            if parent_mode & 0o002:
                logging.getLogger(__name__).warning(
                    "gRPC client uds parent directory %s is world-writable. Consider using a directory with restricted permissions.",
                    parent_dir,
                )
        except OSError:
            pass  # Best effort - continue if we can't check permissions

        return str(uds_path)

    @model_validator(mode="after")
    def validate_target_or_uds(self) -> Self:  # pylint: disable=bad-classmethod-argument
        """Ensure exactly one of target or uds is configured.

        Also rejects TLS settings when a Unix domain socket is used.

        Returns:
            Self after validation.

        Raises:
            ValueError: If neither or both target and uds are set, or if tls
                is combined with uds.
        """
        has_target = self.target is not None
        has_uds = self.uds is not None

        if not has_target and not has_uds:
            raise ValueError("gRPC client must have either 'target' or 'uds' configured")
        if has_target and has_uds:
            raise ValueError("gRPC client cannot have both 'target' and 'uds' configured")
        if has_uds and self.tls:
            raise ValueError("TLS configuration is not supported for Unix domain sockets")
        return self

    def get_target(self) -> str:
        """Get the gRPC target string for channel creation.

        Returns:
            str: The target string, either host:port or unix:///path format.
            An empty string is returned if neither uds nor target is set.
        """
        if self.uds:
            return f"unix://{self.uds}"
        return self.target or ""
class GRPCServerConfig(BaseModel):
    """Server-side gRPC configuration (plugin running as gRPC server).

    Attributes:
        host (str): Server host to bind to.
        port (int): Server port to bind to.
        uds (Optional[str]): Unix domain socket path (alternative to host:port).
        tls (Optional[GRPCServerTLSConfig]): Server-side TLS configuration.

    Examples:
        >>> # TCP binding
        >>> config = GRPCServerConfig(host="0.0.0.0", port=50051)
        >>> config.get_bind_address()
        '0.0.0.0:50051'
        >>> # Unix domain socket binding (path is resolved to canonical form)
        >>> config = GRPCServerConfig(uds="/tmp/grpc-plugin.sock")  # doctest: +SKIP
        >>> config.get_bind_address()  # doctest: +SKIP
        'unix:///tmp/grpc-plugin.sock'
    """

    host: str = Field(default="127.0.0.1", description="Server host to bind to")
    port: int = Field(default=50051, description="Server port to bind to")
    uds: str | None = Field(default=None, description="Unix domain socket path")
    tls: GRPCServerTLSConfig | None = Field(default=None, description="Server-side TLS configuration")

    @field_validator("uds", mode="after")
    @classmethod
    def validate_uds(cls, uds: str | None) -> str | None:
        """Validate Unix domain socket path for gRPC server.

        Args:
            uds: Unix domain socket path.

        Returns:
            The validated canonical uds path or None if none is set.

        Raises:
            ValueError: if uds is empty, not absolute, or parent directory is invalid.
        """
        if uds is None:
            return uds
        if not isinstance(uds, str) or not uds.strip():
            raise ValueError("gRPC server uds must be a non-empty string.")

        # Path.resolve() always returns an absolute path, so absoluteness has to
        # be checked on the user-expanded path *before* resolving; otherwise a
        # relative path would silently be anchored at the current directory.
        expanded = Path(uds).expanduser()
        if not expanded.is_absolute():
            raise ValueError(f"gRPC server uds path must be absolute: {uds}")
        uds_path = expanded.resolve()

        parent_dir = uds_path.parent
        if not parent_dir.is_dir():
            raise ValueError(f"gRPC server uds parent directory does not exist: {parent_dir}")

        # Check parent directory permissions for security
        try:
            parent_mode = parent_dir.stat().st_mode
            # Warn if parent directory is world-writable (o+w = 0o002)
            if parent_mode & 0o002:
                logging.getLogger(__name__).warning(
                    "gRPC server uds parent directory %s is world-writable. Consider using a directory with restricted permissions.",
                    parent_dir,
                )
        except OSError:
            pass  # Best effort - continue if we can't check permissions

        return str(uds_path)

    @model_validator(mode="after")
    def validate_uds_tls(self) -> Self:  # pylint: disable=bad-classmethod-argument
        """Ensure TLS is not configured when using a Unix domain socket.

        Returns:
            Self after validation.

        Raises:
            ValueError: if tls is set with uds.
        """
        if self.uds and self.tls:
            raise ValueError("TLS configuration is not supported for Unix domain sockets")
        return self

    def get_bind_address(self) -> str:
        """Get the gRPC bind address string.

        Returns:
            str: The bind address, either host:port or unix:///path format.
        """
        if self.uds:
            return f"unix://{self.uds}"
        return f"{self.host}:{self.port}"

    @classmethod
    def from_env(cls) -> Optional["GRPCServerConfig"]:
        """Construct gRPC server configuration from PLUGINS_GRPC_SERVER_* environment variables.

        The values are read through ``get_grpc_server_settings()``. TLS settings
        are only consulted when ``grpc_server_ssl_enabled`` is truthy.

        Returns:
            GRPCServerConfig instance or None if no environment variables are set.
        """
        s = get_grpc_server_settings()
        data: dict[str, Any] = {}

        if s.grpc_server_host:
            data["host"] = s.grpc_server_host
        if s.grpc_server_port is not None:
            data["port"] = s.grpc_server_port
        if s.grpc_server_uds:
            data["uds"] = s.grpc_server_uds

        # Check if SSL/TLS is enabled
        if s.grpc_server_ssl_enabled:
            tls_config = GRPCServerTLSConfig.from_env()
            if tls_config:
                data["tls"] = tls_config

        if not data:
            return None

        return cls(**data)
class UnixSocketClientConfig(BaseModel):
    """Client-side Unix socket configuration (gateway connecting to external plugin).

    Attributes:
        path (str): Absolute path to the Unix domain socket file (required).
        reconnect_attempts (int): Number of reconnection attempts on failure.
        reconnect_delay (float): Base delay between reconnection attempts (with exponential backoff).
        timeout (float): Timeout for read operations in seconds.

    Examples:
        >>> config = UnixSocketClientConfig(path="/tmp/plugin.sock")
        >>> config.path
        '/tmp/plugin.sock'
        >>> config.reconnect_attempts
        3
    """

    path: str = Field(..., description="Path to the Unix domain socket")
    reconnect_attempts: int = Field(default=3, description="Number of reconnection attempts")
    reconnect_delay: float = Field(default=0.1, description="Base delay between reconnection attempts (seconds)")
    timeout: float = Field(default=30.0, description="Read timeout in seconds")

    @field_validator("path", mode="after")
    @classmethod
    def validate_path(cls, path: str) -> str:
        """Check that the socket path is non-empty and starts with ``/``.

        Args:
            path: The socket path to validate.

        Returns:
            The path, unchanged, when it is acceptable.

        Raises:
            ValueError: If path is empty or does not start with ``/``.
        """
        # Happy path first: anything starting with "/" is accepted as-is.
        if path.startswith("/"):
            return path
        # An empty string gets its own, more specific message.
        if path:
            raise ValueError(f"Unix socket path must be absolute, got '{path}'")
        raise ValueError("Unix socket path cannot be empty")
class UnixSocketServerConfig(BaseModel):
    """Server-side Unix socket configuration (plugin running as Unix socket server).

    Attributes:
        path (str): Path to the Unix domain socket file.

    Examples:
        >>> config = UnixSocketServerConfig(path="/tmp/plugin.sock")
        >>> config.path
        '/tmp/plugin.sock'
    """

    path: str = Field(default="/tmp/mcpgateway-plugins.sock", description="Path to the Unix domain socket")  # nosec B108 - configurable default

    @classmethod
    def from_env(cls) -> Optional["UnixSocketServerConfig"]:
        """Build the server configuration from the transport settings.

        The socket path is read from ``get_transport_settings().unix_socket_path``
        (presumably environment-backed; see that helper for the variable name).

        Returns:
            UnixSocketServerConfig instance, or None when no socket path is set.
        """
        socket_path = get_transport_settings().unix_socket_path
        if not socket_path:
            return None
        return cls(path=socket_path)
class PluginConfig(BaseModel):
    """A plugin configuration.

    Attributes:
        name (str): The unique name of the plugin.
        description (Optional[str]): A description of the plugin.
        author (Optional[str]): The author of the plugin.
        kind (str): The kind or type of plugin. Usually a fully qualified object type.
        namespace (Optional[str]): The namespace where the plugin resides.
        version (Optional[str]): Version of the plugin.
        hooks (list[str]): Hook points where the plugin will be called. Default: [].
        tags (list[str]): Tags for making the plugin searchable. Default: [].
        mode (PluginMode): The plugin's operating mode. Default: PluginMode.ENFORCE.
        priority (int): Order in which the plugin is run. Lower = higher priority. Default: 100.
        conditions (list[PluginCondition]): Conditions on which the plugin is run. Default: [].
        applied_to (Optional[AppliedTo]): The tools/fields that the plugin is applied to.
        config (Optional[dict[str, Any]]): Plugin specific configuration.
        mcp (Optional[MCPClientConfig]): Client-side MCP configuration (gateway connecting to plugin).
        grpc (Optional[GRPCClientConfig]): Client-side gRPC configuration (gateway connecting to plugin).
        unix_socket (Optional[UnixSocketClientConfig]): Client-side Unix socket configuration (gateway connecting to plugin).
    """

    name: str
    description: Optional[str] = None
    author: Optional[str] = None
    kind: str
    namespace: Optional[str] = None
    version: Optional[str] = None
    hooks: list[str] = Field(default_factory=list)
    tags: list[str] = Field(default_factory=list)
    mode: PluginMode = PluginMode.ENFORCE
    priority: int = 100  # Lower = higher priority
    conditions: list[PluginCondition] = Field(default_factory=list)  # When to apply
    applied_to: Optional[AppliedTo] = None  # Fields to apply to.
    config: Optional[dict[str, Any]] = None
    mcp: Optional[MCPClientConfig] = None
    grpc: Optional[GRPCClientConfig] = None
    unix_socket: Optional[UnixSocketClientConfig] = None

    @model_validator(mode="after")
    def check_url_or_script_filled(self) -> Self:  # pylint: disable=bad-classmethod-argument
        """Verify the ``mcp`` section carries what its transport type needs.

        Nothing is checked when no ``mcp`` section is present. STDIO requires
        exactly one of ``script``/``cmd``; SSE and STREAMABLEHTTP require a
        ``url``; every other transport type is rejected.

        Raises:
            ValueError: if STDIO has neither or both of script/cmd, if an HTTP
                transport has no url, or if the transport type is unsupported.

        Returns:
            The model after validation.
        """
        if not self.mcp:
            return self

        mcp_cfg = self.mcp
        transport = mcp_cfg.proto

        if transport == TransportType.STDIO:
            if not (mcp_cfg.script or mcp_cfg.cmd):
                raise ValueError(f"Plugin {self.name} has transport type set to STDIO but no script/cmd value")
            if mcp_cfg.script and mcp_cfg.cmd:
                raise ValueError(f"Plugin {self.name} must set either script or cmd for STDIO, not both")
        elif transport in (TransportType.STREAMABLEHTTP, TransportType.SSE):
            # NOTE: the message names StreamableHTTP even when the transport is SSE.
            if not mcp_cfg.url:
                raise ValueError(f"Plugin {self.name} has transport type set to StreamableHTTP but no url value")
        else:
            raise ValueError(f"Plugin {self.name} must set transport type to either SSE or STREAMABLEHTTP or STDIO")
        return self

    @model_validator(mode="after")
    def check_config_and_external(self, info: ValidationInfo) -> Self:  # pylint: disable=bad-classmethod-argument
        """Enforce the rules specific to plugins whose kind is 'external'.

        An external plugin may not define a 'config' section, since those
        settings can only be set on the plugin server. This check is skipped
        when the validation context carries a truthy IGNORE_CONFIG_EXTERNAL
        entry. An external plugin must also configure exactly one transport
        out of mcp, grpc and unix_socket.

        Args:
            info: the contextual information passed into the pydantic model during model validation; its context may hold the IGNORE_CONFIG_EXTERNAL flag.

        Raises:
            ValueError: if an external plugin sets 'config' (and the check is not ignored), or has zero or more than one transport section.

        Returns:
            The model after validation.
        """
        context = info.context if info else None
        skip_config_check = False
        if context and IGNORE_CONFIG_EXTERNAL in context:
            skip_config_check = context[IGNORE_CONFIG_EXTERNAL]

        if not skip_config_check and self.config and self.kind == EXTERNAL_PLUGIN_TYPE:
            raise ValueError(f"Cannot have {self.name} plugin defined as 'external' with 'config' set." " 'config' section settings can only be set on the plugin server.")

        if self.kind != EXTERNAL_PLUGIN_TYPE:
            return self

        # External plugins must have exactly one transport configured (mcp, grpc, or unix_socket)
        configured = [section for section in (self.mcp, self.grpc, self.unix_socket) if section is not None]
        if not configured:
            raise ValueError(f"External plugin {self.name} must have 'mcp', 'grpc', or 'unix_socket' section configured")
        if len(configured) > 1:
            raise ValueError(f"External plugin {self.name} can only have one transport configured (mcp, grpc, or unix_socket)")

        return self
class PluginManifest(BaseModel):
    """Plugin manifest describing a plugin and its defaults.

    All fields are required.

    Attributes:
        description (str): A description of the plugin.
        author (str): The author of the plugin.
        version (str): Version of the plugin.
        tags (list[str]): Tags for making the plugin searchable.
        available_hooks (list[str]): Hook points where the plugin is callable.
        default_config (dict[str, Any]): The default configuration.
    """

    description: str
    author: str
    version: str
    tags: list[str]
    available_hooks: list[str]
    default_config: dict[str, Any]
class PluginErrorModel(BaseModel):
    """A plugin error, used to denote exceptions/errors inside external plugins.

    Attributes:
        message (str): The reason for the error.
        plugin_name (str): The plugin name.
        code (Optional[str]): An error code. Defaults to an empty string.
        details (Optional[dict[str, Any]]): Additional error details. Defaults to an empty dict.
        mcp_error_code (int): The MCP error code passed back to the client. Defaults to -32603 (Internal Error).
    """

    message: str
    plugin_name: str
    code: Optional[str] = ""
    details: Optional[dict[str, Any]] = Field(default_factory=dict)
    mcp_error_code: int = -32603
class PluginViolation(BaseModel):
    """A plugin violation, used to denote policy violations.

    Attributes:
        reason (str): The reason for the violation.
        description (str): A longer description of the violation.
        code (str): A violation code.
        details (Optional[dict[str, Any]]): Additional violation details. Defaults to an empty dict.
        _plugin_name (str): The plugin name, private attribute set by the plugin manager; exposed through the ``plugin_name`` property.
        mcp_error_code (Optional[int]): A valid mcp error code which will be sent back to the client if plugin enabled.
        http_status_code (Optional[int]): HTTP status code to return (e.g., 429 for rate limiting).
        http_headers (Optional[dict[str, str]]): HTTP headers to include in the response.

    Examples:
        >>> violation = PluginViolation(
        ...     reason="Invalid input",
        ...     description="The input contains prohibited content",
        ...     code="PROHIBITED_CONTENT",
        ...     details={"field": "message", "value": "test"}
        ... )
        >>> violation.reason
        'Invalid input'
        >>> violation.code
        'PROHIBITED_CONTENT'
        >>> violation.plugin_name = "content_filter"
        >>> violation.plugin_name
        'content_filter'
    """

    reason: str
    description: str
    code: str
    details: Optional[dict[str, Any]] = Field(default_factory=dict)
    _plugin_name: str = PrivateAttr(default="")
    mcp_error_code: Optional[int] = None
    http_status_code: Optional[int] = None
    http_headers: Optional[dict[str, str]] = None

    @property
    def plugin_name(self) -> str:
        """Name of the plugin that produced this violation.

        Returns:
            The stored plugin name (an empty string until one is assigned).
        """
        return self._plugin_name

    @plugin_name.setter
    def plugin_name(self, name: str) -> None:
        """Store the plugin name after validating it.

        Args:
            name: the plugin name.

        Raises:
            ValueError: if name is not a string or is empty/whitespace-only.
        """
        is_usable = isinstance(name, str) and bool(name.strip())
        if not is_usable:
            raise ValueError("Name must be a non-empty string.")
        self._plugin_name = name
class PluginSettings(BaseModel):
    """Global plugin settings.

    Attributes:
        parallel_execution_within_band (bool): Execute plugins with the same priority in parallel. Default: False.
        plugin_timeout (int): Timeout value for plugin operations (presumably seconds). Default: 30.
        fail_on_plugin_error (bool): Raise an error on plugin connectivity problems instead of ignoring them. Default: False.
        enable_plugin_api (bool): Enable or disable plugins globally. Default: False.
        plugin_health_check_interval (int): Interval between health checks (presumably seconds). Default: 60.
        include_user_info (bool): If enabled, user info is injected into the plugin context. Default: False.
    """

    parallel_execution_within_band: bool = False
    plugin_timeout: int = 30
    fail_on_plugin_error: bool = False
    enable_plugin_api: bool = False
    plugin_health_check_interval: int = 60
    include_user_info: bool = False
class Config(BaseModel):
    """Top-level configuration for the plugin framework.

    Attributes:
        plugins (Optional[list[PluginConfig]]): The list of plugins to enable. Default: [].
        plugin_dirs (list[str]): The directories in which to look for plugins. Default: [].
        plugin_settings (PluginSettings): Global settings for plugins (required).
        server_settings (Optional[MCPServerConfig]): Server-side MCP configuration (when plugins run as server).
        grpc_server_settings (Optional[GRPCServerConfig]): Server-side gRPC configuration (when plugins run as gRPC server).
        unix_socket_server_settings (Optional[UnixSocketServerConfig]): Server-side Unix socket configuration.
    """

    plugins: Optional[list[PluginConfig]] = []
    plugin_dirs: list[str] = []
    plugin_settings: PluginSettings
    server_settings: Optional[MCPServerConfig] = None
    grpc_server_settings: Optional[GRPCServerConfig] = None
    unix_socket_server_settings: Optional[UnixSocketServerConfig] = None
class PluginResult(BaseModel, Generic[T]):
    """A result of the plugin hook processing. The actual type is dependent on the hook.

    Attributes:
        continue_processing (bool): Whether processing should continue (True, the default) or stop (False).
        modified_payload (Optional[T]): The modified payload if the plugin is a transformer.
        violation (Optional[PluginViolation]): Violation object.
        metadata (Optional[dict[str, Any]]): Additional metadata. Defaults to an empty dict.
        http_headers (Optional[dict[str, str]]): HTTP headers to include in successful responses.
        retry_delay_ms (int): Milliseconds the gateway should wait before retrying the tool call.
            0 (default) means no retry. Set by retry_with_backoff plugin to request
            a delayed re-execution of the tool.

    Examples:
        >>> result = PluginResult()
        >>> result.continue_processing
        True
        >>> result.metadata
        {}
        >>> from mcpgateway.plugins.framework import PluginViolation
        >>> violation = PluginViolation(
        ...     reason="Test", description="Test desc", code="TEST", details={}
        ... )
        >>> result2 = PluginResult(continue_processing=False, violation=violation)
        >>> result2.continue_processing
        False
        >>> result2.violation.code
        'TEST'
        >>> r = PluginResult(metadata={"key": "value"})
        >>> r.metadata["key"]
        'value'
        >>> r2 = PluginResult(continue_processing=False)
        >>> r2.continue_processing
        False
        >>> r3 = PluginResult(retry_delay_ms=500)
        >>> r3.retry_delay_ms
        500
    """

    continue_processing: bool = True
    modified_payload: Optional[T] = None
    violation: Optional[PluginViolation] = None
    metadata: Optional[dict[str, Any]] = Field(default_factory=dict)
    http_headers: Optional[dict[str, str]] = None
    retry_delay_ms: int = 0
class GlobalContext(BaseModel):
    """The global context, which is shared across all plugins.

    Attributes:
        request_id (str): ID of the HTTP request (required).
        user (Optional[Union[str, dict[str, Any]]]): User associated with the request, either as a string or as a mapping.
        tenant_id (Optional[str]): Tenant ID.
        server_id (Optional[str]): Server ID.
        state (dict[str, Any]): A global shared state across plugins. Defaults to an empty dict.
        metadata (dict[str, Any]): A global shared metadata across plugins (Read-only from plugin's perspective). Defaults to an empty dict.

    Examples:
        >>> ctx = GlobalContext(request_id="req-123")
        >>> ctx.request_id
        'req-123'
        >>> ctx.user is None
        True
        >>> ctx2 = GlobalContext(request_id="req-456", user="alice", tenant_id="tenant1")
        >>> ctx2.user
        'alice'
        >>> ctx2.tenant_id
        'tenant1'
        >>> c = GlobalContext(request_id="123", server_id="srv1")
        >>> c.request_id
        '123'
        >>> c.server_id
        'srv1'
    """

    request_id: str
    user: Optional[Union[str, dict[str, Any]]] = None
    tenant_id: Optional[str] = None
    server_id: Optional[str] = None
    state: dict[str, Any] = Field(default_factory=dict)
    metadata: dict[str, Any] = Field(default_factory=dict)
class PluginContext(BaseModel):
    """The plugin's context, which lasts a request lifecycle.

    Attributes:
        state: The in-memory state of the request.
        global_context: The context that is shared across plugins.
        metadata: Plugin metadata.

    Examples:
        >>> gctx = GlobalContext(request_id="req-123")
        >>> ctx = PluginContext(global_context=gctx)
        >>> ctx.global_context.request_id
        'req-123'
        >>> ctx.global_context.user is None
        True
        >>> ctx.state["somekey"] = "some value"
        >>> ctx.state["somekey"]
        'some value'
    """

    state: dict[str, Any] = Field(default_factory=dict)
    global_context: GlobalContext
    metadata: dict[str, Any] = Field(default_factory=dict)

    def get_state(self, key: str, default: Any = None) -> Any:
        """Look up a value in this context's state.

        Args:
            key: The key to look up in the state.
            default: Value returned when the key is absent.

        Returns:
            The stored value, or ``default`` if the key is not present.
        """
        current_state = self.state
        return current_state.get(key, default)

    def set_state(self, key: str, value: Any) -> None:
        """Store a value in this context's state.

        Args:
            key: the key to add to the state.
            value: the value to add to the state.
        """
        self.state[key] = value

    async def cleanup(self) -> None:
        """Empty this context's state and metadata (the global context is left untouched)."""
        for store in (self.state, self.metadata):
            store.clear()

    def is_empty(self) -> bool:
        """Check whether the context holds no data.

        Returns:
            True only if this context's state and metadata, and the global
            context's state, are all empty.
        """
        return not self.state and not self.metadata and not self.global_context.state
# Type alias: a dict mapping string keys to PluginContext instances.
# NOTE(review): what the keys identify is not visible in this section — confirm against callers.
PluginContextTable = dict[str, PluginContext]
1476class PluginPayload(BaseModel):
1477 """Base class for all hook payloads. Immutable by design.
1479 Frozen payloads prevent in-place mutations by plugins -- attributes
1480 cannot be set directly on the object. Plugins must use
1481 ``model_copy(update=...)`` to create modified payloads and return
1482 modifications via ``PluginResult.modified_payload``.
1484 Examples:
1485 >>> class TestPayload(PluginPayload):
1486 ... name: str
1487 >>> p = TestPayload(name="test")
1488 >>> p.name
1489 'test'
1490 """
1492 model_config = ConfigDict(frozen=True, arbitrary_types_allowed=True)