Coverage for mcpgateway / schemas.py: 99%
2747 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/schemas.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7MCP Gateway Schema Definitions.
8This module provides Pydantic models for request/response validation in the MCP Gateway.
9It implements schemas for:
10- Tool registration and invocation
11- Resource management and subscriptions
12- Prompt templates and arguments
13- Gateway federation
14- RPC message formats
15- Event messages
16- Admin interface
18The schemas ensure proper validation according to the MCP specification while adding
19gateway-specific extensions for federation support.
20"""
22# Standard
23import base64
24from datetime import datetime, timezone
25from enum import Enum
26import logging
27import re
28from typing import Any, Dict, List, Literal, Optional, Pattern, Self, Union
29from urllib.parse import urlparse
31# Third-Party
32import orjson
33from pydantic import AnyHttpUrl, BaseModel, ConfigDict, EmailStr, Field, field_serializer, field_validator, model_validator, SecretStr, ValidationInfo
35# First-Party
36from mcpgateway.common.models import Annotations, ImageContent
37from mcpgateway.common.models import Prompt as MCPPrompt
38from mcpgateway.common.models import Resource as MCPResource
39from mcpgateway.common.models import ResourceContent, TextContent
40from mcpgateway.common.models import Tool as MCPTool
41from mcpgateway.common.validators import SecurityValidator
42from mcpgateway.config import settings
43from mcpgateway.utils.base_models import BaseModelWithConfigDict
44from mcpgateway.utils.services_auth import decode_auth, encode_auth
45from mcpgateway.validation.tags import validate_tags_field
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# ----------------------------------------------------------------------------
# Static regular expressions, compiled a single time at import
# ----------------------------------------------------------------------------
# Only patterns that never change belong here. Anything derived from
# settings.* or SecurityValidator.* is deliberately left out, because tests
# monkeypatch those attributes at runtime and a pattern compiled here would
# not pick up the override.
_HOSTNAME_RE: Pattern[str] = re.compile(r"^(https?://)?([a-zA-Z0-9.-]+)(:[0-9]+)?$")
_SLUG_RE: Pattern[str] = re.compile(r"^[a-z0-9-]+$")
def encode_datetime(v: datetime) -> str:
    """
    Render a datetime as its ISO 8601 string.

    Args:
        v (datetime): The datetime to serialize.

    Returns:
        str: The result of ``v.isoformat()``.

    Examples:
        >>> from datetime import datetime, timezone
        >>> encode_datetime(datetime(2023, 5, 22, 14, 30, 0))
        '2023-05-22T14:30:00'
        >>> encode_datetime(datetime(2024, 12, 25, 9, 15, 30))
        '2024-12-25T09:15:30'
        >>> encode_datetime(datetime(2025, 1, 1, 0, 0, 0))
        '2025-01-01T00:00:00'
        >>> # Timezone-aware values keep their UTC offset
        >>> encode_datetime(datetime(2023, 6, 15, 12, 0, 0, tzinfo=timezone.utc))
        '2023-06-15T12:00:00+00:00'
        >>> # Non-zero microseconds are included
        >>> encode_datetime(datetime(2023, 7, 20, 16, 45, 30, 123456))
        '2023-07-20T16:45:30.123456'
    """
    iso_text = v.isoformat()
    return iso_text
89# --- Metrics Schemas ---
class ToolMetrics(BaseModelWithConfigDict):
    """
    Execution counters and response-time statistics recorded for a tool.

    Attributes:
        total_executions (int): How many times the tool was invoked.
        successful_executions (int): Invocations that succeeded.
        failed_executions (int): Invocations that failed.
        failure_rate (float): Failed invocations divided by total invocations.
        min_response_time (Optional[float]): Shortest response time, in seconds.
        max_response_time (Optional[float]): Longest response time, in seconds.
        avg_response_time (Optional[float]): Average response time, in seconds.
        last_execution_time (Optional[datetime]): When the tool was last invoked.

    Examples:
        >>> from datetime import datetime
        >>> metrics = ToolMetrics(
        ...     total_executions=100,
        ...     successful_executions=95,
        ...     failed_executions=5,
        ...     failure_rate=0.05,
        ...     min_response_time=0.1,
        ...     max_response_time=2.5,
        ...     avg_response_time=0.8
        ... )
        >>> metrics.total_executions
        100
        >>> metrics.failure_rate
        0.05
        >>> metrics.successful_executions + metrics.failed_executions == metrics.total_executions
        True
        >>> # The timing fields are optional and default to None
        >>> minimal_metrics = ToolMetrics(
        ...     total_executions=10,
        ...     successful_executions=8,
        ...     failed_executions=2,
        ...     failure_rate=0.2
        ... )
        >>> minimal_metrics.min_response_time is None
        True
        >>> # Dumping the model yields a plain dict
        >>> data = metrics.model_dump()
        >>> isinstance(data, dict)
        True
        >>> data['total_executions']
        100
    """

    # Required counters
    total_executions: int = Field(..., description="Total number of tool invocations")
    successful_executions: int = Field(..., description="Number of successful tool invocations")
    failed_executions: int = Field(..., description="Number of failed tool invocations")
    failure_rate: float = Field(..., description="Failure rate (failed invocations / total invocations)")

    # Optional timing data
    min_response_time: Optional[float] = Field(default=None, description="Minimum response time in seconds")
    max_response_time: Optional[float] = Field(default=None, description="Maximum response time in seconds")
    avg_response_time: Optional[float] = Field(default=None, description="Average response time in seconds")
    last_execution_time: Optional[datetime] = Field(default=None, description="Timestamp of the most recent invocation")
class ResourceMetrics(BaseModelWithConfigDict):
    """
    Execution counters and response-time statistics recorded for a resource.

    Attributes:
        total_executions (int): How many times the resource was invoked.
        successful_executions (int): Invocations that succeeded.
        failed_executions (int): Invocations that failed.
        failure_rate (float): Failed invocations divided by total invocations.
        min_response_time (Optional[float]): Shortest response time, in seconds.
        max_response_time (Optional[float]): Longest response time, in seconds.
        avg_response_time (Optional[float]): Average response time, in seconds.
        last_execution_time (Optional[datetime]): When the resource was last invoked.
    """

    # Required counters
    total_executions: int = Field(..., description="Total number of resource invocations")
    successful_executions: int = Field(..., description="Number of successful resource invocations")
    failed_executions: int = Field(..., description="Number of failed resource invocations")
    failure_rate: float = Field(..., description="Failure rate (failed invocations / total invocations)")

    # Optional timing data
    min_response_time: Optional[float] = Field(default=None, description="Minimum response time in seconds")
    max_response_time: Optional[float] = Field(default=None, description="Maximum response time in seconds")
    avg_response_time: Optional[float] = Field(default=None, description="Average response time in seconds")
    last_execution_time: Optional[datetime] = Field(default=None, description="Timestamp of the most recent invocation")
class ServerMetrics(BaseModelWithConfigDict):
    """
    Execution counters and response-time statistics recorded for a server.

    Attributes:
        total_executions (int): How many times the server was invoked.
        successful_executions (int): Invocations that succeeded.
        failed_executions (int): Invocations that failed.
        failure_rate (float): Failed invocations divided by total invocations.
        min_response_time (Optional[float]): Shortest response time, in seconds.
        max_response_time (Optional[float]): Longest response time, in seconds.
        avg_response_time (Optional[float]): Average response time, in seconds.
        last_execution_time (Optional[datetime]): When the server was last invoked.
    """

    # Required counters
    total_executions: int = Field(..., description="Total number of server invocations")
    successful_executions: int = Field(..., description="Number of successful server invocations")
    failed_executions: int = Field(..., description="Number of failed server invocations")
    failure_rate: float = Field(..., description="Failure rate (failed invocations / total invocations)")

    # Optional timing data
    min_response_time: Optional[float] = Field(default=None, description="Minimum response time in seconds")
    max_response_time: Optional[float] = Field(default=None, description="Maximum response time in seconds")
    avg_response_time: Optional[float] = Field(default=None, description="Average response time in seconds")
    last_execution_time: Optional[datetime] = Field(default=None, description="Timestamp of the most recent invocation")
class PromptMetrics(BaseModelWithConfigDict):
    """
    Execution counters and response-time statistics recorded for a prompt.

    Attributes:
        total_executions (int): How many times the prompt was invoked.
        successful_executions (int): Invocations that succeeded.
        failed_executions (int): Invocations that failed.
        failure_rate (float): Failed invocations divided by total invocations.
        min_response_time (Optional[float]): Shortest response time, in seconds.
        max_response_time (Optional[float]): Longest response time, in seconds.
        avg_response_time (Optional[float]): Average response time, in seconds.
        last_execution_time (Optional[datetime]): When the prompt was last invoked.
    """

    # Required counters
    total_executions: int = Field(..., description="Total number of prompt invocations")
    successful_executions: int = Field(..., description="Number of successful prompt invocations")
    failed_executions: int = Field(..., description="Number of failed prompt invocations")
    failure_rate: float = Field(..., description="Failure rate (failed invocations / total invocations)")

    # Optional timing data
    min_response_time: Optional[float] = Field(default=None, description="Minimum response time in seconds")
    max_response_time: Optional[float] = Field(default=None, description="Maximum response time in seconds")
    avg_response_time: Optional[float] = Field(default=None, description="Average response time in seconds")
    last_execution_time: Optional[datetime] = Field(default=None, description="Timestamp of the most recent invocation")
class A2AAgentMetrics(BaseModelWithConfigDict):
    """
    Interaction counters and response-time statistics recorded for an A2A agent.

    Attributes:
        total_executions (int): How many interactions the agent had.
        successful_executions (int): Interactions that succeeded.
        failed_executions (int): Interactions that failed.
        failure_rate (float): Failed interactions divided by total interactions.
        min_response_time (Optional[float]): Shortest response time, in seconds.
        max_response_time (Optional[float]): Longest response time, in seconds.
        avg_response_time (Optional[float]): Average response time, in seconds.
        last_execution_time (Optional[datetime]): When the most recent interaction happened.
    """

    # Required counters
    total_executions: int = Field(..., description="Total number of agent interactions")
    successful_executions: int = Field(..., description="Number of successful agent interactions")
    failed_executions: int = Field(..., description="Number of failed agent interactions")
    failure_rate: float = Field(..., description="Failure rate (failed interactions / total interactions)")

    # Optional timing data
    min_response_time: Optional[float] = Field(default=None, description="Minimum response time in seconds")
    max_response_time: Optional[float] = Field(default=None, description="Maximum response time in seconds")
    avg_response_time: Optional[float] = Field(default=None, description="Average response time in seconds")
    last_execution_time: Optional[datetime] = Field(default=None, description="Timestamp of the most recent interaction")
250# --- JSON Path API modifier Schema ---
class JsonPathModifier(BaseModelWithConfigDict):
    """Schema describing a JSONPath query.

    Holds an optional JSONPath expression together with an optional field
    mapping; both default to None.
    """

    jsonpath: Optional[str] = Field(default=None, description="JSONPath expression for querying JSON data.")
    mapping: Optional[Dict[str, str]] = Field(default=None, description="Mapping of fields from original data to output.")
263# --- Tool Schemas ---
264# Authentication model
class AuthenticationValues(BaseModelWithConfigDict):
    """Schema carrying authentication data for every supported auth type.

    ``auth_type`` and ``auth_value`` hold the type and its encoded value; the
    remaining fields carry the individual credential parts.
    """

    auth_type: Optional[str] = Field(default=None, description="Type of authentication: basic, bearer, headers or None")
    auth_value: Optional[str] = Field(default=None, description="Encoded Authentication values")

    # The fields below are only used when reading / viewing a tool
    username: Optional[str] = Field(default="", description="Username for basic authentication")
    password: Optional[str] = Field(default="", description="Password for basic authentication")
    token: Optional[str] = Field(default="", description="Bearer token for authentication")
    auth_header_key: Optional[str] = Field(default="", description="Key for custom headers authentication")
    auth_header_value: Optional[str] = Field(default="", description="Value for custom headers authentication")
281class ToolCreate(BaseModel):
282 """
283 Represents the configuration for creating a tool with various attributes and settings.
285 Attributes:
286 model_config (ConfigDict): Configuration for the model.
287 name (str): Unique name for the tool.
288 url (Optional[Union[str, AnyHttpUrl]]): Tool endpoint URL.
289 description (Optional[str]): Tool description.
290 integration_type (Literal["REST", "MCP", "A2A"]): Tool integration type - REST for individual endpoints, MCP for gateway-discovered tools, A2A for A2A agents.
291 request_type (Literal["GET", "POST", "PUT", "DELETE", "PATCH", "SSE", "STDIO", "STREAMABLEHTTP"]): HTTP method (REST, A2A) or transport (MCP) used for invoking the tool.
292 headers (Optional[Dict[str, str]]): Additional headers to send when invoking the tool.
293 input_schema (Optional[Dict[str, Any]]): JSON Schema for validating tool parameters. Alias 'inputSchema'.
294 output_schema (Optional[Dict[str, Any]]): JSON Schema for validating tool output. Alias 'outputSchema'.
295 annotations (Optional[Dict[str, Any]]): Tool annotations for behavior hints such as title, readOnlyHint, destructiveHint, idempotentHint, openWorldHint.
296 jsonpath_filter (Optional[str]): JSON modification filter.
297 auth (Optional[AuthenticationValues]): Authentication credentials (Basic or Bearer Token or custom headers) if required.
298 gateway_id (Optional[str]): ID of the gateway for the tool.
299 """
301 model_config = ConfigDict(str_strip_whitespace=True, populate_by_name=True)
302 allow_auto: bool = False # Internal flag to allow system-initiated A2A tool creation
304 name: str = Field(..., description="Unique name for the tool")
305 displayName: Optional[str] = Field(None, description="Display name for the tool (shown in UI)") # noqa: N815
306 url: Optional[Union[str, AnyHttpUrl]] = Field(None, description="Tool endpoint URL")
307 description: Optional[str] = Field(None, description="Tool description")
308 integration_type: Literal["REST", "MCP", "A2A"] = Field("REST", description="'REST' for individual endpoints, 'MCP' for gateway-discovered tools, 'A2A' for A2A agents")
309 request_type: Literal["GET", "POST", "PUT", "DELETE", "PATCH", "SSE", "STDIO", "STREAMABLEHTTP"] = Field("SSE", description="HTTP method to be used for invoking the tool")
310 headers: Optional[Dict[str, str]] = Field(None, description="Additional headers to send when invoking the tool")
311 input_schema: Optional[Dict[str, Any]] = Field(default_factory=lambda: {"type": "object", "properties": {}}, description="JSON Schema for validating tool parameters", alias="inputSchema")
312 output_schema: Optional[Dict[str, Any]] = Field(default=None, description="JSON Schema for validating tool output", alias="outputSchema")
313 annotations: Optional[Dict[str, Any]] = Field(
314 default_factory=dict,
315 description="Tool annotations for behavior hints (title, readOnlyHint, destructiveHint, idempotentHint, openWorldHint)",
316 )
317 jsonpath_filter: Optional[str] = Field(default="", description="JSON modification filter")
318 auth: Optional[AuthenticationValues] = Field(None, description="Authentication credentials (Basic or Bearer Token or custom headers) if required")
319 gateway_id: Optional[str] = Field(None, description="id of gateway for the tool")
320 tags: Optional[List[str]] = Field(default_factory=list, description="Tags for categorizing the tool")
322 # Team scoping fields
323 team_id: Optional[str] = Field(None, description="Team ID for resource organization")
324 owner_email: Optional[str] = Field(None, description="Email of the tool owner")
325 visibility: Optional[str] = Field(default="public", description="Visibility level (private, team, public)")
327 # Passthrough REST fields
328 base_url: Optional[str] = Field(None, description="Base URL for REST passthrough")
329 path_template: Optional[str] = Field(None, description="Path template for REST passthrough")
330 query_mapping: Optional[Dict[str, Any]] = Field(None, description="Query mapping for REST passthrough")
331 header_mapping: Optional[Dict[str, Any]] = Field(None, description="Header mapping for REST passthrough")
332 timeout_ms: Optional[int] = Field(default=None, description="Timeout in milliseconds for REST passthrough (20000 if integration_type='REST', else None)")
333 expose_passthrough: Optional[bool] = Field(True, description="Expose passthrough endpoint for this tool")
334 allowlist: Optional[List[str]] = Field(None, description="Allowed upstream hosts/schemes for passthrough")
335 plugin_chain_pre: Optional[List[str]] = Field(None, description="Pre-plugin chain for passthrough")
336 plugin_chain_post: Optional[List[str]] = Field(None, description="Post-plugin chain for passthrough")
338 @field_validator("tags")
339 @classmethod
340 def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
341 """Validate and normalize tags.
343 Args:
344 v: Optional list of tag strings to validate
346 Returns:
347 List of validated tag strings
348 """
349 return validate_tags_field(v)
351 @field_validator("name")
352 @classmethod
353 def validate_name(cls, v: str) -> str:
354 """Ensure tool names follow MCP naming conventions
356 Args:
357 v (str): Value to validate
359 Returns:
360 str: Value if validated as safe
362 Raises:
363 ValueError: When displayName contains unsafe content or exceeds length limits
365 Examples:
366 >>> from mcpgateway.schemas import ToolCreate
367 >>> ToolCreate.validate_name('valid_tool')
368 'valid_tool'
369 >>> ToolCreate.validate_name('Invalid Tool!')
370 Traceback (most recent call last):
371 ...
372 ValueError: ...
373 """
374 return SecurityValidator.validate_tool_name(v)
376 @field_validator("url")
377 @classmethod
378 def validate_url(cls, v: Optional[str]) -> Optional[str]:
379 """Validate URL format and ensure safe display
381 Args:
382 v (Optional[str]): Value to validate
384 Returns:
385 Optional[str]: Value if validated as safe
387 Raises:
388 ValueError: When displayName contains unsafe content or exceeds length limits
390 Examples:
391 >>> from mcpgateway.schemas import ToolCreate
392 >>> ToolCreate.validate_url('https://example.com')
393 'https://example.com'
394 >>> ToolCreate.validate_url('ftp://example.com')
395 Traceback (most recent call last):
396 ...
397 ValueError: ...
398 """
399 if v is None:
400 return v
401 return SecurityValidator.validate_url(v, "Tool URL")
403 @field_validator("description")
404 @classmethod
405 def validate_description(cls, v: Optional[str]) -> Optional[str]:
406 """Ensure descriptions display safely, truncate if too long
408 Args:
409 v (str): Value to validate
411 Returns:
412 str: Value if validated as safe and truncated if too long
414 Raises:
415 ValueError: When value is unsafe
417 Examples:
418 >>> from mcpgateway.schemas import ToolCreate
419 >>> ToolCreate.validate_description('A safe description')
420 'A safe description'
421 >>> ToolCreate.validate_description(None) # Test None case
422 >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
423 >>> truncated = ToolCreate.validate_description(long_desc)
424 >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
425 0
426 >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
427 True
428 """
429 if v is None:
430 return v
432 # Note: backticks (`) are allowed as they are commonly used in Markdown
433 # for inline code examples in tool descriptions
434 forbidden_patterns = ["&&", ";", "||", "$(", "|", "> ", "< "]
435 for pat in forbidden_patterns:
436 if pat in v:
437 raise ValueError(f"Description contains unsafe characters: '{pat}'")
439 if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
440 # Truncate the description to the maximum allowed length
441 truncated = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
442 logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
443 return SecurityValidator.sanitize_display_text(truncated, "Description")
444 return SecurityValidator.sanitize_display_text(v, "Description")
446 @field_validator("displayName")
447 @classmethod
448 def validate_display_name(cls, v: Optional[str]) -> Optional[str]:
449 """Ensure display names display safely
451 Args:
452 v (str): Value to validate
454 Returns:
455 str: Value if validated as safe
457 Raises:
458 ValueError: When displayName contains unsafe content or exceeds length limits
460 Examples:
461 >>> from mcpgateway.schemas import ToolCreate
462 >>> ToolCreate.validate_display_name('My Custom Tool')
463 'My Custom Tool'
464 >>> ToolCreate.validate_display_name('<script>alert("xss")</script>')
465 Traceback (most recent call last):
466 ...
467 ValueError: ...
468 """
469 if v is None:
470 return v
471 if len(v) > SecurityValidator.MAX_NAME_LENGTH:
472 raise ValueError(f"Display name exceeds maximum length of {SecurityValidator.MAX_NAME_LENGTH}")
473 return SecurityValidator.sanitize_display_text(v, "Display name")
475 @field_validator("headers", "input_schema", "annotations")
476 @classmethod
477 def validate_json_fields(cls, v: Dict[str, Any]) -> Dict[str, Any]:
478 """Validate JSON structure depth
480 Args:
481 v (dict): Value to validate
483 Returns:
484 dict: Value if validated as safe
486 Examples:
487 >>> from mcpgateway.schemas import ToolCreate
488 >>> ToolCreate.validate_json_fields({'a': 1})
489 {'a': 1}
490 >>> # Test depth within limit (11 levels, default limit is 30)
491 >>> ToolCreate.validate_json_fields({'a': {'b': {'c': {'d': {'e': {'f': {'g': {'h': {'i': {'j': {'k': 1}}}}}}}}}}})
492 {'a': {'b': {'c': {'d': {'e': {'f': {'g': {'h': {'i': {'j': {'k': 1}}}}}}}}}}}
493 >>> # Test exceeding depth limit (31 levels)
494 >>> deep_31 = {'1': {'2': {'3': {'4': {'5': {'6': {'7': {'8': {'9': {'10': {'11': {'12': {'13': {'14': {'15': {'16': {'17': {'18': {'19': {'20': {'21': {'22': {'23': {'24': {'25': {'26': {'27': {'28': {'29': {'30': {'31': 'too deep'}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}
495 >>> ToolCreate.validate_json_fields(deep_31)
496 Traceback (most recent call last):
497 ...
498 ValueError: ...
499 """
500 SecurityValidator.validate_json_depth(v)
501 return v
503 @field_validator("request_type")
504 @classmethod
505 def validate_request_type(cls, v: str, info: ValidationInfo) -> str:
506 """Validate request type based on integration type (REST, MCP, A2A)
508 Args:
509 v (str): Value to validate
510 info (ValidationInfo): Values used for validation
512 Returns:
513 str: Value if validated as safe
515 Raises:
516 ValueError: When value is unsafe
518 Examples:
519 >>> from pydantic import ValidationInfo
520 >>> # REST integration types with valid methods
521 >>> info_rest = type('obj', (object,), {'data': {'integration_type': 'REST'}})
522 >>> ToolCreate.validate_request_type('POST', info_rest)
523 'POST'
524 >>> ToolCreate.validate_request_type('GET', info_rest)
525 'GET'
526 >>> # MCP integration types with valid transports
527 >>> info_mcp = type('obj', (object,), {'data': {'integration_type': 'MCP'}})
528 >>> ToolCreate.validate_request_type('SSE', info_mcp)
529 'SSE'
530 >>> ToolCreate.validate_request_type('STDIO', info_mcp)
531 'STDIO'
532 >>> # A2A integration type with valid method
533 >>> info_a2a = type('obj', (object,), {'data': {'integration_type': 'A2A'}})
534 >>> ToolCreate.validate_request_type('POST', info_a2a)
535 'POST'
536 >>> # Invalid REST type
537 >>> try:
538 ... ToolCreate.validate_request_type('SSE', info_rest)
539 ... except ValueError as e:
540 ... "not allowed for REST" in str(e)
541 True
542 >>> # Invalid MCP type
543 >>> try:
544 ... ToolCreate.validate_request_type('POST', info_mcp)
545 ... except ValueError as e:
546 ... "not allowed for MCP" in str(e)
547 True
548 >>> # Invalid A2A type
549 >>> try:
550 ... ToolCreate.validate_request_type('GET', info_a2a)
551 ... except ValueError as e:
552 ... "not allowed for A2A" in str(e)
553 True
554 >>> # Invalid integration type
555 >>> info_invalid = type('obj', (object,), {'data': {'integration_type': 'INVALID'}})
556 >>> try:
557 ... ToolCreate.validate_request_type('GET', info_invalid)
558 ... except ValueError as e:
559 ... "Unknown integration type" in str(e)
560 True
561 """
563 integration_type = info.data.get("integration_type")
565 if integration_type not in ["REST", "MCP", "A2A"]:
566 raise ValueError(f"Unknown integration type: {integration_type}")
568 if integration_type == "REST":
569 allowed = ["GET", "POST", "PUT", "DELETE", "PATCH"]
570 if v not in allowed:
571 raise ValueError(f"Request type '{v}' not allowed for REST. Only {allowed} methods are accepted.")
572 elif integration_type == "MCP":
573 allowed = ["SSE", "STDIO", "STREAMABLEHTTP"]
574 if v not in allowed:
575 raise ValueError(f"Request type '{v}' not allowed for MCP. Only {allowed} transports are accepted.")
576 elif integration_type == "A2A": 576 ↛ 580line 576 didn't jump to line 580 because the condition on line 576 was always true
577 allowed = ["POST"]
578 if v not in allowed:
579 raise ValueError(f"Request type '{v}' not allowed for A2A. Only {allowed} methods are accepted.")
580 return v
582 @model_validator(mode="before")
583 @classmethod
584 def assemble_auth(cls, values: Dict[str, Any]) -> Dict[str, Any]:
585 """
586 Assemble authentication information from separate keys if provided.
588 Looks for keys "auth_type", "auth_username", "auth_password", "auth_token", "auth_header_key" and "auth_header_value".
589 Constructs the "auth" field as a dictionary suitable for BasicAuth or BearerTokenAuth or HeadersAuth.
591 Args:
592 values: Dict with authentication information
594 Returns:
595 Dict: Reformatedd values dict
597 Examples:
598 >>> # Test basic auth
599 >>> values = {'auth_type': 'basic', 'auth_username': 'user', 'auth_password': 'pass'}
600 >>> result = ToolCreate.assemble_auth(values)
601 >>> 'auth' in result
602 True
603 >>> result['auth']['auth_type']
604 'basic'
606 >>> # Test bearer auth
607 >>> values = {'auth_type': 'bearer', 'auth_token': 'mytoken'}
608 >>> result = ToolCreate.assemble_auth(values)
609 >>> result['auth']['auth_type']
610 'bearer'
612 >>> # Test authheaders
613 >>> values = {'auth_type': 'authheaders', 'auth_header_key': 'X-API-Key', 'auth_header_value': 'secret'}
614 >>> result = ToolCreate.assemble_auth(values)
615 >>> result['auth']['auth_type']
616 'authheaders'
618 >>> # Test no auth type
619 >>> values = {'name': 'test'}
620 >>> result = ToolCreate.assemble_auth(values)
621 >>> 'auth' in result
622 False
623 """
624 logger.debug(
625 "Assembling auth in ToolCreate with raw values",
626 extra={
627 "auth_type": values.get("auth_type"),
628 "auth_username": values.get("auth_username"),
629 "auth_password": values.get("auth_password"),
630 "auth_token": values.get("auth_token"),
631 "auth_header_key": values.get("auth_header_key"),
632 "auth_header_value": values.get("auth_header_value"),
633 },
634 )
636 auth_type = values.get("auth_type")
637 if auth_type and auth_type.lower() != "one_time_auth":
638 if auth_type.lower() == "basic":
639 creds = base64.b64encode(f"{values.get('auth_username', '')}:{values.get('auth_password', '')}".encode("utf-8")).decode()
640 encoded_auth = encode_auth({"Authorization": f"Basic {creds}"})
641 values["auth"] = {"auth_type": "basic", "auth_value": encoded_auth}
642 elif auth_type.lower() == "bearer":
643 encoded_auth = encode_auth({"Authorization": f"Bearer {values.get('auth_token', '')}"})
644 values["auth"] = {"auth_type": "bearer", "auth_value": encoded_auth}
645 elif auth_type.lower() == "authheaders":
646 header_key = values.get("auth_header_key", "")
647 header_value = values.get("auth_header_value", "")
648 if header_key and header_value:
649 encoded_auth = encode_auth({header_key: header_value})
650 values["auth"] = {"auth_type": "authheaders", "auth_value": encoded_auth}
651 else:
652 # Don't encode empty headers - leave auth empty
653 values["auth"] = {"auth_type": "authheaders", "auth_value": None}
654 return values
656 @model_validator(mode="before")
657 @classmethod
658 def prevent_manual_mcp_creation(cls, values: Dict[str, Any]) -> Dict[str, Any]:
659 """
660 Prevent manual creation of MCP tools via API.
662 MCP tools should only be created by the gateway service when discovering
663 tools from MCP servers. Users should add MCP servers via the Gateways interface.
665 Args:
666 values: The input values
668 Returns:
669 Dict[str, Any]: The validated values
671 Raises:
672 ValueError: If attempting to manually create MCP integration type
673 """
674 integration_type = values.get("integration_type")
675 allow_auto = values.get("allow_auto", False)
676 if integration_type == "MCP":
677 raise ValueError("Cannot manually create MCP tools. Add MCP servers via the Gateways interface - tools will be auto-discovered and registered with integration_type='MCP'.")
678 if integration_type == "A2A" and not allow_auto:
679 raise ValueError("Cannot manually create A2A tools. Add A2A agents via the A2A interface - tools will be auto-created when agents are associated with servers.")
680 return values
682 @model_validator(mode="before")
683 @classmethod
684 def enforce_passthrough_fields_for_rest(cls, values: Dict[str, Any]) -> Dict[str, Any]:
685 """
686 Enforce that passthrough REST fields are only set for integration_type 'REST'.
687 If any passthrough field is set for non-REST, raise ValueError.
689 Args:
690 values (Dict[str, Any]): The input values to validate.
692 Returns:
693 Dict[str, Any]: The validated values.
695 Raises:
696 ValueError: If passthrough fields are set for non-REST integration_type.
697 """
698 passthrough_fields = ["base_url", "path_template", "query_mapping", "header_mapping", "timeout_ms", "expose_passthrough", "allowlist", "plugin_chain_pre", "plugin_chain_post"]
699 integration_type = values.get("integration_type")
700 if integration_type != "REST":
701 for field in passthrough_fields:
702 if field in values and values[field] not in (None, [], {}):
703 raise ValueError(f"Field '{field}' is only allowed for integration_type 'REST'.")
704 return values
@model_validator(mode="before")
@classmethod
def extract_base_url_and_path_template(cls, values: dict) -> dict:
    """
    Derive 'base_url' and 'path_template' from 'url' for REST tools.

    Applies only when integration_type is 'REST' and a truthy 'url' is given.
    The URL is split with urlparse; '<scheme>://<netloc>' becomes the base URL
    and the path (normalized to start with exactly one '/') becomes the path
    template. Values the caller already supplied for either key are kept.

    Args:
        values (dict): The raw input values, before field validation.

    Returns:
        dict: The same mapping, with base_url / path_template filled in where applicable.
    """
    if values.get("integration_type") != "REST":
        # Non-REST tools have no passthrough URL parts to derive.
        return values

    raw_url = values.get("url")
    if not raw_url:
        return values

    parts = urlparse(str(raw_url))
    derived_path = parts.path
    if derived_path:
        # Collapse any run of leading slashes into a single one.
        derived_path = "/" + derived_path.lstrip("/")

    # Only fill in what the caller did not provide explicitly.
    if not values.get("base_url"):
        values["base_url"] = f"{parts.scheme}://{parts.netloc}"
    if not values.get("path_template"):
        values["path_template"] = derived_path
    return values
@field_validator("base_url")
@classmethod
def validate_base_url(cls, v):
    """
    Check that base_url, when given, parses to a URL with both scheme and netloc.

    Args:
        v (str): The base_url value to validate; None is passed through.

    Returns:
        str: The value exactly as received.

    Raises:
        ValueError: If the parsed value lacks a scheme or a network location.
    """
    if v is not None:
        parts = urlparse(str(v))
        if not (parts.scheme and parts.netloc):
            raise ValueError("base_url must be a valid URL with scheme and netloc")
    return v
@field_validator("path_template")
@classmethod
def validate_path_template(cls, v):
    """
    Check that a non-empty path_template begins with '/'.

    Args:
        v (str): The path_template value to validate; falsy values are passed through.

    Returns:
        str: The value exactly as received.

    Raises:
        ValueError: If the value is non-empty and does not start with '/'.
    """
    if not v:
        # None and "" are accepted as "no template".
        return v
    if str(v).startswith("/"):
        return v
    raise ValueError("path_template must start with '/'")
@field_validator("timeout_ms")
@classmethod
def validate_timeout_ms(cls, v):
    """
    Check that timeout_ms, when given, is strictly greater than zero.

    Args:
        v (int): The timeout_ms value to validate; None is passed through.

    Returns:
        int: The value exactly as received.

    Raises:
        ValueError: If the value is zero or negative.
    """
    if v is None:
        return v
    if v <= 0:
        raise ValueError("timeout_ms must be a positive integer")
    return v
@field_validator("allowlist")
@classmethod
def validate_allowlist(cls, v):
    """
    Check that allowlist is a list of strings accepted by the module's hostname pattern.

    Entries are checked in order; the first offending entry determines the error.

    Args:
        v (List[str]): The allowlist to validate; None is passed through.

    Returns:
        List[str]: The list exactly as received, or None.

    Raises:
        ValueError: If the value is not a list, an entry is not a str, or an
            entry does not match _HOSTNAME_RE.
    """
    if v is None:
        return None
    if not isinstance(v, list):
        raise ValueError("allowlist must be a list of host/scheme strings")

    for host in v:
        if not isinstance(host, str):
            raise ValueError(f"Invalid type in allowlist: {host} (must be str)")
        # _HOSTNAME_RE is the module-level precompiled pattern.
        if _HOSTNAME_RE.match(host) is None:
            raise ValueError(f"Invalid host/scheme in allowlist: {host}")
    return v
@field_validator("plugin_chain_pre", "plugin_chain_post")
@classmethod
def validate_plugin_chain(cls, v):
    """
    Check that every plugin named in a pre/post chain is one of the known plugins.

    Args:
        v (List[str]): The plugin chain to validate; None is passed through.

    Returns:
        List[str]: The chain exactly as received.

    Raises:
        ValueError: If any entry is not in the allowed plugin set.
    """
    if v is None:
        return v

    allowed_plugins = {
        "deny_filter",
        "rate_limit",
        "pii_filter",
        "response_shape",
        "regex_filter",
        "resource_filter",
    }
    for plugin in v:
        if plugin in allowed_plugins:
            continue
        raise ValueError(f"Unknown plugin: {plugin}")
    return v
@model_validator(mode="after")
def handle_timeout_ms_defaults(self):
    """Fill in the default timeout for REST passthrough tools.

    timeout_ms is set to 20000 only when it is still None, integration_type is
    'REST', and expose_passthrough is truthy (treated as True when the
    attribute is absent).

    Returns:
        self: The model instance, with timeout_ms possibly defaulted.
    """
    if self.timeout_ms is not None:
        # An explicit timeout always wins.
        return self
    if self.integration_type != "REST":
        return self
    if getattr(self, "expose_passthrough", True):
        self.timeout_ms = 20000
    return self
class ToolUpdate(BaseModelWithConfigDict):
    """Schema for updating an existing tool.

    Similar to ToolCreate but all fields are optional to allow partial updates.
    """

    name: Optional[str] = Field(None, description="Unique name for the tool")
    displayName: Optional[str] = Field(None, description="Display name for the tool (shown in UI)")  # noqa: N815
    custom_name: Optional[str] = Field(None, description="Custom name for the tool")
    url: Optional[Union[str, AnyHttpUrl]] = Field(None, description="Tool endpoint URL")
    description: Optional[str] = Field(None, description="Tool description")
    integration_type: Optional[Literal["REST", "MCP", "A2A"]] = Field(None, description="Tool integration type")
    request_type: Optional[Literal["GET", "POST", "PUT", "DELETE", "PATCH"]] = Field(None, description="HTTP method to be used for invoking the tool")
    headers: Optional[Dict[str, str]] = Field(None, description="Additional headers to send when invoking the tool")
    input_schema: Optional[Dict[str, Any]] = Field(None, description="JSON Schema for validating tool parameters")
    output_schema: Optional[Dict[str, Any]] = Field(None, description="JSON Schema for validating tool output")
    annotations: Optional[Dict[str, Any]] = Field(None, description="Tool annotations for behavior hints")
    jsonpath_filter: Optional[str] = Field(None, description="JSON path filter for rpc tool calls")
    auth: Optional[AuthenticationValues] = Field(None, description="Authentication credentials (Basic or Bearer Token or custom headers) if required")
    gateway_id: Optional[str] = Field(None, description="id of gateway for the tool")
    tags: Optional[List[str]] = Field(None, description="Tags for categorizing the tool")
    # NOTE(review): unlike the other update fields, visibility and expose_passthrough
    # default to non-None values. An update that omits them still carries
    # "public" / True unless the caller only applies fields that were explicitly
    # set - confirm against the update service before relying on these defaults.
    visibility: Optional[str] = Field(default="public", description="Visibility level: private, team, or public")

    # Passthrough REST fields
    base_url: Optional[str] = Field(None, description="Base URL for REST passthrough")
    path_template: Optional[str] = Field(None, description="Path template for REST passthrough")
    query_mapping: Optional[Dict[str, Any]] = Field(None, description="Query mapping for REST passthrough")
    header_mapping: Optional[Dict[str, Any]] = Field(None, description="Header mapping for REST passthrough")
    timeout_ms: Optional[int] = Field(default=None, description="Timeout in milliseconds for REST passthrough (20000 if integration_type='REST', else None)")
    expose_passthrough: Optional[bool] = Field(True, description="Expose passthrough endpoint for this tool")
    allowlist: Optional[List[str]] = Field(None, description="Allowed upstream hosts/schemes for passthrough")
    plugin_chain_pre: Optional[List[str]] = Field(None, description="Pre-plugin chain for passthrough")
    plugin_chain_post: Optional[List[str]] = Field(None, description="Post-plugin chain for passthrough")

    @field_validator("tags")
    @classmethod
    def validate_tags(cls, v: Optional[List[str]]) -> Optional[List[str]]:
        """Validate and normalize tags.

        Args:
            v: Optional list of tag strings to validate

        Returns:
            The result of validate_tags_field for the given value
        """
        return validate_tags_field(v)

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: Optional[str]) -> Optional[str]:
        """Ensure tool names follow MCP naming conventions

        Args:
            v (Optional[str]): Value to validate; an explicit None means "not updated"

        Returns:
            Optional[str]: Value if validated as safe, or None
        """
        # The field is Optional: an explicit null must be accepted just like an
        # omitted field, matching the other validators on this model.
        if v is None:
            return v
        return SecurityValidator.validate_tool_name(v)

    @field_validator("custom_name")
    @classmethod
    def validate_custom_name(cls, v: Optional[str]) -> Optional[str]:
        """Ensure custom tool names follow MCP naming conventions

        Args:
            v (Optional[str]): Value to validate; an explicit None means "not updated"

        Returns:
            Optional[str]: Value if validated as safe, or None
        """
        if v is None:
            return v
        return SecurityValidator.validate_tool_name(v)

    @field_validator("url")
    @classmethod
    def validate_url(cls, v: Optional[str]) -> Optional[str]:
        """Validate URL format and ensure safe display

        Args:
            v (Optional[str]): Value to validate

        Returns:
            Optional[str]: Value if validated as safe
        """
        if v is None:
            return v
        return SecurityValidator.validate_url(v, "Tool URL")

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Ensure descriptions display safely, truncating over-long values

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe

        Raises:
            ValueError: When value is unsafe

        Examples:
            >>> from mcpgateway.schemas import ToolUpdate
            >>> ToolUpdate.validate_description('A safe description')
            'A safe description'
            >>> ToolUpdate.validate_description(None)  # Test None case
            >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
            >>> truncated = ToolUpdate.validate_description(long_desc)
            >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
            0
            >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
            True
        """
        if v is None:
            return v
        if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
            # Truncate rather than reject, then sanitize the shortened text.
            v = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
            logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
        return SecurityValidator.sanitize_display_text(v, "Description")

    @field_validator("headers", "input_schema", "annotations")
    @classmethod
    def validate_json_fields(cls, v: Dict[str, Any]) -> Dict[str, Any]:
        """Validate JSON structure depth

        Args:
            v (dict): Value to validate

        Returns:
            dict: Value if validated as safe
        """
        SecurityValidator.validate_json_depth(v)
        return v

    @field_validator("request_type")
    @classmethod
    def validate_request_type(cls, v: Optional[str], info: ValidationInfo) -> Optional[str]:
        """Validate request type based on integration type

        Args:
            v (Optional[str]): Value to validate; an explicit None means "not updated"
            info (ValidationInfo): Validation context with other field values

        Returns:
            Optional[str]: Value if allowed for the integration type, or None

        Raises:
            ValueError: When the integration type is unknown or the request type
                is not allowed for it
        """
        if v is None:
            return v

        # In a partial update integration_type is usually omitted. Pydantic then
        # stores its default (None) in info.data, so a plain
        # .get("integration_type", "REST") would yield None and reject every
        # request type. Treat a missing or None integration type as REST.
        integration_type = info.data.get("integration_type") or "REST"

        allowed_by_type = {
            "REST": ["GET", "POST", "PUT", "DELETE", "PATCH"],
            "MCP": ["SSE", "STDIO", "STREAMABLEHTTP"],
            "A2A": ["POST"],  # A2A agents typically use POST
        }
        allowed = allowed_by_type.get(integration_type)
        if allowed is None:
            raise ValueError(f"Unknown integration type: {integration_type}")

        if v not in allowed:
            raise ValueError(f"Request type '{v}' not allowed for {integration_type} integration")
        return v

    @model_validator(mode="before")
    @classmethod
    def assemble_auth(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """
        Assemble authentication information from separate keys if provided.

        Looks for keys "auth_type", "auth_username", "auth_password", "auth_token", "auth_header_key" and "auth_header_value".
        Constructs the "auth" field as a dictionary suitable for BasicAuth or BearerTokenAuth or HeadersAuth.
        Nothing is assembled when auth_type is missing/empty or is "one_time_auth".

        Args:
            values: Dict with authentication information

        Returns:
            Dict: Reformatted values dict
        """
        # Secrets must never reach the logs: record only whether each one was
        # supplied, not its value.
        logger.debug(
            "Assembling auth in ToolUpdate",
            extra={
                "auth_type": values.get("auth_type"),
                "auth_username": values.get("auth_username"),
                "auth_password": "***" if values.get("auth_password") else None,
                "auth_token": "***" if values.get("auth_token") else None,
                "auth_header_key": values.get("auth_header_key"),
                "auth_header_value": "***" if values.get("auth_header_value") else None,
            },
        )

        auth_type = values.get("auth_type")
        if not auth_type:
            return values

        auth_kind = auth_type.lower()
        if auth_kind == "one_time_auth":
            return values

        if auth_kind == "basic":
            creds = base64.b64encode(f"{values.get('auth_username', '')}:{values.get('auth_password', '')}".encode("utf-8")).decode()
            encoded_auth = encode_auth({"Authorization": f"Basic {creds}"})
            values["auth"] = {"auth_type": "basic", "auth_value": encoded_auth}
        elif auth_kind == "bearer":
            encoded_auth = encode_auth({"Authorization": f"Bearer {values.get('auth_token', '')}"})
            values["auth"] = {"auth_type": "bearer", "auth_value": encoded_auth}
        elif auth_kind == "authheaders":
            header_key = values.get("auth_header_key", "")
            header_value = values.get("auth_header_value", "")
            if header_key and header_value:
                encoded_auth = encode_auth({header_key: header_value})
                values["auth"] = {"auth_type": "authheaders", "auth_value": encoded_auth}
            else:
                # Don't encode empty headers - leave auth empty
                values["auth"] = {"auth_type": "authheaders", "auth_value": None}
        return values

    @field_validator("displayName")
    @classmethod
    def validate_display_name(cls, v: Optional[str]) -> Optional[str]:
        """Ensure display names display safely

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe

        Raises:
            ValueError: When displayName contains unsafe content or exceeds length limits

        Examples:
            >>> from mcpgateway.schemas import ToolUpdate
            >>> ToolUpdate.validate_display_name('My Custom Tool')
            'My Custom Tool'
            >>> ToolUpdate.validate_display_name('<script>alert("xss")</script>')
            Traceback (most recent call last):
                ...
            ValueError: ...
        """
        if v is None:
            return v
        if len(v) > SecurityValidator.MAX_NAME_LENGTH:
            raise ValueError(f"Display name exceeds maximum length of {SecurityValidator.MAX_NAME_LENGTH}")
        return SecurityValidator.sanitize_display_text(v, "Display name")

    @model_validator(mode="before")
    @classmethod
    def prevent_manual_mcp_update(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """
        Prevent updating tools to the MCP or A2A integration type via API.

        MCP tools should only be managed by the gateway service and A2A tools by
        the A2A service. Users should not be able to switch a tool to either
        type manually.

        Args:
            values: The input values

        Returns:
            Dict[str, Any]: The validated values

        Raises:
            ValueError: If attempting to update to MCP or A2A integration type
        """
        integration_type = values.get("integration_type")
        if integration_type == "MCP":
            raise ValueError("Cannot update tools to MCP integration type. MCP tools are managed by the gateway service.")
        if integration_type == "A2A":
            raise ValueError("Cannot update tools to A2A integration type. A2A tools are managed by the A2A service.")
        return values

    @model_validator(mode="before")
    @classmethod
    def extract_base_url_and_path_template(cls, values: dict) -> dict:
        """
        If 'integration_type' is 'REST' and 'url' is provided, extract 'base_url' and 'path_template'.
        Ensures path_template starts with a single '/'. Values the caller already
        supplied for either key are kept.

        Args:
            values (dict): The input values to process.

        Returns:
            dict: The updated values with base_url and path_template if applicable.
        """
        integration_type = values.get("integration_type")
        url = values.get("url")
        if integration_type == "REST" and url:
            parsed = urlparse(str(url))
            path_template = parsed.path
            if path_template:
                # Collapse any run of leading slashes into exactly one.
                path_template = "/" + path_template.lstrip("/")
            if not values.get("base_url"):
                values["base_url"] = f"{parsed.scheme}://{parsed.netloc}"
            if not values.get("path_template"):
                values["path_template"] = path_template
        return values

    @field_validator("base_url")
    @classmethod
    def validate_base_url(cls, v):
        """
        Validate that base_url is a valid URL with scheme and netloc.

        Args:
            v (str): The base_url value to validate.

        Returns:
            str: The validated base_url value.

        Raises:
            ValueError: If base_url is not a valid URL.
        """
        if v is None:
            return v
        parsed = urlparse(str(v))
        if not parsed.scheme or not parsed.netloc:
            raise ValueError("base_url must be a valid URL with scheme and netloc")
        return v

    @field_validator("path_template")
    @classmethod
    def validate_path_template(cls, v):
        """
        Validate that path_template starts with '/'.

        Args:
            v (str): The path_template value to validate.

        Returns:
            str: The validated path_template value.

        Raises:
            ValueError: If path_template does not start with '/'.
        """
        if v and not str(v).startswith("/"):
            raise ValueError("path_template must start with '/'")
        return v

    @field_validator("timeout_ms")
    @classmethod
    def validate_timeout_ms(cls, v):
        """
        Validate that timeout_ms is a positive integer.

        Args:
            v (int): The timeout_ms value to validate.

        Returns:
            int: The validated timeout_ms value.

        Raises:
            ValueError: If timeout_ms is not a positive integer.
        """
        if v is not None and v <= 0:
            raise ValueError("timeout_ms must be a positive integer")
        return v

    @field_validator("allowlist")
    @classmethod
    def validate_allowlist(cls, v):
        """
        Validate that allowlist is a list and each entry is a valid host or scheme string.

        Args:
            v (List[str]): The allowlist to validate.

        Returns:
            List[str]: The validated allowlist.

        Raises:
            ValueError: If allowlist is not a list or any entry is not a valid host/scheme string.
        """
        if v is None:
            return None
        if not isinstance(v, list):
            raise ValueError("allowlist must be a list of host/scheme strings")
        # Uses precompiled regex for hostname validation
        for host in v:
            if not isinstance(host, str):
                raise ValueError(f"Invalid type in allowlist: {host} (must be str)")
            if not _HOSTNAME_RE.match(host):
                raise ValueError(f"Invalid host/scheme in allowlist: {host}")
        return v

    @field_validator("plugin_chain_pre", "plugin_chain_post")
    @classmethod
    def validate_plugin_chain(cls, v):
        """
        Validate that each plugin in the chain is allowed.

        Args:
            v (List[str]): The plugin chain to validate.

        Returns:
            List[str]: The validated plugin chain.

        Raises:
            ValueError: If any plugin is not in the allowed set.
        """
        allowed_plugins = {"deny_filter", "rate_limit", "pii_filter", "response_shape", "regex_filter", "resource_filter"}
        if v is not None:
            for plugin in v:
                if plugin not in allowed_plugins:
                    raise ValueError(f"Unknown plugin: {plugin}")
        return v
class ToolRead(BaseModelWithConfigDict):
    """Read-side representation of a tool.

    Carries the tool definition together with server-managed data:
    - Database ID, original/custom names and their slugs
    - Creation/update timestamps
    - enabled / reachable status flags
    - Gateway ID and slug for federation
    - Execution count and aggregated invocation metrics
    - Request type and authentication settings
    - Audit metadata, team scoping and REST passthrough settings
    """

    # Core tool definition; fields without a default are required.
    id: str
    original_name: str
    url: Optional[str]
    description: Optional[str]
    request_type: str
    integration_type: str
    headers: Optional[Dict[str, str]]
    input_schema: Dict[str, Any]
    output_schema: Optional[Dict[str, Any]] = None
    annotations: Optional[Dict[str, Any]]
    jsonpath_filter: Optional[str]
    auth: Optional[AuthenticationValues]
    created_at: datetime
    updated_at: datetime
    enabled: bool
    reachable: bool
    gateway_id: Optional[str]
    execution_count: Optional[int] = None
    metrics: Optional[ToolMetrics] = None
    name: str
    displayName: Optional[str] = Field(default=None, description="Display name for the tool (shown in UI)")  # noqa: N815
    gateway_slug: str
    custom_name: str
    custom_name_slug: str
    tags: List[Dict[str, str]] = Field(default_factory=list, description="Tags for categorizing the tool")

    # Audit metadata: who created the entity and how
    created_by: Optional[str] = Field(default=None, description="Username who created this entity")
    created_from_ip: Optional[str] = Field(default=None, description="IP address of creator")
    created_via: Optional[str] = Field(default=None, description="Creation method: ui|api|import|federation")
    created_user_agent: Optional[str] = Field(default=None, description="User agent of creation request")

    # Audit metadata: who last modified the entity and how
    modified_by: Optional[str] = Field(default=None, description="Username who last modified this entity")
    modified_from_ip: Optional[str] = Field(default=None, description="IP address of last modifier")
    modified_via: Optional[str] = Field(default=None, description="Modification method")
    modified_user_agent: Optional[str] = Field(default=None, description="User agent of modification request")

    # Provenance and versioning
    import_batch_id: Optional[str] = Field(default=None, description="UUID of bulk import batch")
    federation_source: Optional[str] = Field(default=None, description="Source gateway for federated entities")
    version: Optional[int] = Field(default=1, description="Entity version for change tracking")

    # Team scoping
    team_id: Optional[str] = Field(default=None, description="ID of the team that owns this resource")
    team: Optional[str] = Field(default=None, description="Name of the team that owns this resource")
    owner_email: Optional[str] = Field(default=None, description="Email of the user who owns this resource")
    visibility: Optional[str] = Field("public", description="Visibility level: private, team, or public")

    # REST passthrough settings
    base_url: Optional[str] = Field(default=None, description="Base URL for REST passthrough")
    path_template: Optional[str] = Field(default=None, description="Path template for REST passthrough")
    query_mapping: Optional[Dict[str, Any]] = Field(default=None, description="Query mapping for REST passthrough")
    header_mapping: Optional[Dict[str, Any]] = Field(default=None, description="Header mapping for REST passthrough")
    timeout_ms: Optional[int] = Field(default=20000, description="Timeout in milliseconds for REST passthrough")
    expose_passthrough: Optional[bool] = Field(default=True, description="Expose passthrough endpoint for this tool")
    allowlist: Optional[List[str]] = Field(default=None, description="Allowed upstream hosts/schemes for passthrough")
    plugin_chain_pre: Optional[List[str]] = Field(default=None, description="Pre-plugin chain for passthrough")
    plugin_chain_post: Optional[List[str]] = Field(default=None, description="Post-plugin chain for passthrough")

    # MCP protocol extension; populated from the "_meta" key via the field alias
    meta: Optional[Dict[str, Any]] = Field(default=None, alias="_meta", description="Optional metadata for protocol extension")
class ToolInvocation(BaseModelWithConfigDict):
    """Schema for tool invocation requests.

    Validates an invocation request so that the tool name follows MCP (Model
    Context Protocol) naming conventions and the arguments are not nested deeply
    enough to cause a DoS. Both checks are delegated to SecurityValidator.

    Captures:
        - Tool name to invoke (validated for safety and MCP compliance)
        - Arguments matching tool's input schema (validated for depth limits)

    Validation Rules:
        - Tool names must start with a letter, number, or underscore and contain only
          letters, numbers, periods, underscores, hyphens, and slashes (per SEP-986)
        - Tool names cannot contain HTML special characters (<, >, ", ')
        - Arguments are validated against the configured maximum nesting depth

    Attributes:
        name (str): Name of the tool to invoke. Must follow MCP naming conventions.
        arguments (Dict[str, Any]): Arguments to pass to the tool. Must match the
                                   tool's input schema and not exceed depth limits.

    Examples:
        >>> from pydantic import ValidationError
        >>> # Valid tool invocation
        >>> tool_inv = ToolInvocation(name="get_weather", arguments={"city": "London"})
        >>> tool_inv.name
        'get_weather'
        >>> tool_inv.arguments
        {'city': 'London'}

        >>> # Valid tool name with underscores and numbers
        >>> tool_inv = ToolInvocation(name="tool_v2_beta", arguments={})
        >>> tool_inv.name
        'tool_v2_beta'

        >>> # Invalid: Tool name with special characters
        >>> try:
        ...     ToolInvocation(name="tool-name!", arguments={})
        ... except ValidationError as e:
        ...     print("Validation failed: Special characters not allowed")
        Validation failed: Special characters not allowed

        >>> # Invalid: XSS attempt in tool name
        >>> try:
        ...     ToolInvocation(name="<script>alert('XSS')</script>", arguments={})
        ... except ValidationError as e:
        ...     print("Validation failed: HTML tags not allowed")
        Validation failed: HTML tags not allowed

        >>> # Valid: Tool name starting with number (per MCP spec)
        >>> tool_num = ToolInvocation(name="123_tool", arguments={})
        >>> tool_num.name
        '123_tool'

        >>> # Valid: Tool name starting with underscore (per MCP spec)
        >>> tool_underscore = ToolInvocation(name="_5gpt_query", arguments={})
        >>> tool_underscore.name
        '_5gpt_query'

        >>> # Invalid: Tool name starting with hyphen
        >>> try:
        ...     ToolInvocation(name="-invalid_tool", arguments={})
        ... except ValidationError as e:
        ...     print("Validation failed: Must start with letter, number, or underscore")
        Validation failed: Must start with letter, number, or underscore

        >>> # Valid: Complex but not too deep arguments
        >>> args = {"level1": {"level2": {"level3": {"data": "value"}}}}
        >>> tool_inv = ToolInvocation(name="process_data", arguments=args)
        >>> tool_inv.arguments["level1"]["level2"]["level3"]["data"]
        'value'

        >>> # Invalid: Arguments too deeply nested (>30 levels)
        >>> deep_args = {"a": {"b": {"c": {"d": {"e": {"f": {"g": {"h": {"i": {"j": {"k": {"l": {"m": {"n": {"o": {"p": {"q": {"r": {"s": {"t": {"u": {"v": {"w": {"x": {"y": {"z": {"aa": {"bb": {"cc": {"dd": {"ee": "too deep"}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}
        >>> try:
        ...     ToolInvocation(name="process_data", arguments=deep_args)
        ... except ValidationError as e:
        ...     print("Validation failed: Exceeds maximum depth")
        Validation failed: Exceeds maximum depth

        >>> # Edge case: Empty tool name
        >>> try:
        ...     ToolInvocation(name="", arguments={})
        ... except ValidationError as e:
        ...     print("Validation failed: Name cannot be empty")
        Validation failed: Name cannot be empty

        >>> # Valid: Tool name made of letters and underscores
        >>> tool_inv = ToolInvocation(name="get_user_info", arguments={"id": 123})
        >>> tool_inv.name
        'get_user_info'

        >>> # Arguments with various types
        >>> args = {
        ...     "string": "value",
        ...     "number": 42,
        ...     "boolean": True,
        ...     "array": [1, 2, 3],
        ...     "nested": {"key": "value"}
        ... }
        >>> tool_inv = ToolInvocation(name="complex_tool", arguments=args)
        >>> tool_inv.arguments["number"]
        42
    """

    name: str = Field(..., description="Name of tool to invoke")
    arguments: Dict[str, Any] = Field(default_factory=dict, description="Arguments matching tool's input schema")

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Ensure tool names follow MCP naming conventions.

        The check itself is performed by SecurityValidator.validate_tool_name.
        Per the class-level rules and examples, an accepted name:
        - Is not empty
        - Starts with a letter, number, or underscore
        - Contains only letters, numbers, periods, underscores, hyphens, and slashes
        - Does not contain HTML special characters that could cause XSS

        Args:
            v (str): Tool name to validate

        Returns:
            str: The value returned by the validator for an accepted name

        Raises:
            ValueError: If the tool name violates any validation rules
        """
        checked_name = SecurityValidator.validate_tool_name(v)
        return checked_name

    @field_validator("arguments")
    @classmethod
    def validate_arguments(cls, v: Dict[str, Any]) -> Dict[str, Any]:
        """Validate arguments structure depth to prevent DoS attacks.

        Delegates to SecurityValidator.validate_json_depth so that argument
        payloads with excessive nesting are rejected before they can cause
        performance issues or stack overflow.

        Args:
            v (dict): Arguments dictionary to validate

        Returns:
            dict: The arguments exactly as received, if within depth limits

        Raises:
            ValueError: If the arguments exceed the maximum allowed depth
        """
        # The depth check only raises on failure; the payload itself is untouched.
        SecurityValidator.validate_json_depth(v)
        return v
class ToolResult(BaseModelWithConfigDict):
    """Result of a tool invocation.

    Holds:
    - content: one or more text/image content items (required)
    - structured_content: optional structured payload
    - is_error: error flag, False unless set
    - error_message: optional error text
    """

    content: List[Union[TextContent, ImageContent]]
    structured_content: Optional[Dict[str, Any]] = Field(default=None)
    is_error: bool = Field(default=False)
    error_message: Optional[str] = Field(default=None)
1514class ResourceCreate(BaseModel):
1515 """
1516 Schema for creating a new resource.
1518 Attributes:
1519 model_config (ConfigDict): Configuration for the model.
1520 uri (str): Unique URI for the resource.
1521 name (str): Human-readable name for the resource.
1522 description (Optional[str]): Optional description of the resource.
1523 mime_type (Optional[str]): Optional MIME type of the resource.
1524 uri_template (Optional[str]): Optional URI template for parameterized resources.
1525 content (Union[str, bytes]): Content of the resource, which can be text or binary.
1526 """
1528 model_config = ConfigDict(str_strip_whitespace=True, populate_by_name=True)
1530 uri: str = Field(..., description="Unique URI for the resource")
1531 name: str = Field(..., description="Human-readable resource name")
1532 description: Optional[str] = Field(None, description="Resource description")
1533 mime_type: Optional[str] = Field(None, alias="mimeType", description="Resource MIME type")
1534 uri_template: Optional[str] = Field(None, description="URI template for parameterized resources")
1535 content: Union[str, bytes] = Field(..., description="Resource content (text or binary)")
1536 tags: Optional[List[str]] = Field(default_factory=list, description="Tags for categorizing the resource")
1538 # Team scoping fields
1539 team_id: Optional[str] = Field(None, description="Team ID for resource organization")
1540 owner_email: Optional[str] = Field(None, description="Email of the resource owner")
1541 visibility: Optional[str] = Field(default="public", description="Visibility level (private, team, public)")
1542 gateway_id: Optional[str] = Field(None, description="ID of the gateway for the resource")
@field_validator("tags")
@classmethod
def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
    """Validate and normalize the resource's tags.

    The work is delegated to validate_tags_field.

    Args:
        v: Optional list of tag strings to validate

    Returns:
        List of validated tag strings
    """
    normalized_tags = validate_tags_field(v)
    return normalized_tags
@field_validator("uri")
@classmethod
def validate_uri(cls, v: str) -> str:
    """Validate the resource URI via SecurityValidator.validate_uri

    Args:
        v (str): Value to validate

    Returns:
        str: Value if validated as safe
    """
    checked_uri = SecurityValidator.validate_uri(v, "Resource URI")
    return checked_uri
@field_validator("name")
@classmethod
def validate_name(cls, v: str) -> str:
    """Validate the resource name via SecurityValidator.validate_name

    Args:
        v (str): Value to validate

    Returns:
        str: Value if validated as safe
    """
    checked_name = SecurityValidator.validate_name(v, "Resource name")
    return checked_name
@field_validator("description")
@classmethod
def validate_description(cls, v: Optional[str]) -> Optional[str]:
    """Sanitize the description for display, shortening over-long text first.

    Args:
        v (Optional[str]): Description to validate

    Returns:
        Optional[str]: None when no description was given; otherwise the text,
        cut to ``SecurityValidator.MAX_DESCRIPTION_LENGTH`` characters when it
        is longer than that, after ``SecurityValidator.sanitize_display_text``

    Raises:
        ValueError: When value is unsafe

    Examples:
        >>> from mcpgateway.schemas import ResourceCreate
        >>> ResourceCreate.validate_description('A safe description')
        'A safe description'
        >>> ResourceCreate.validate_description(None)  # Test None case
        >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
        >>> truncated = ResourceCreate.validate_description(long_desc)
        >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
        0
        >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
        True
    """
    if v is None:
        return None

    candidate = v
    if len(candidate) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
        # Over-long descriptions are shortened (and logged) rather than rejected.
        candidate = candidate[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
        logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")

    # Single exit: both the truncated and the untouched text are sanitized the same way.
    return SecurityValidator.sanitize_display_text(candidate, "Description")
@field_validator("mime_type")
@classmethod
def validate_mime_type(cls, v: Optional[str]) -> Optional[str]:
    """Check the MIME type with the shared security validator.

    Args:
        v (Optional[str]): MIME type to validate

    Returns:
        Optional[str]: None when no MIME type was given, otherwise the result
        of ``SecurityValidator.validate_mime_type``
    """
    return None if v is None else SecurityValidator.validate_mime_type(v)
@field_validator("content")
@classmethod
def validate_content(cls, v: Optional[Union[str, bytes]]) -> Optional[Union[str, bytes]]:
    """Validate content size and safety.

    The checks run in this order: length against
    ``SecurityValidator.MAX_CONTENT_LENGTH`` (``len`` of the str or bytes
    value), UTF-8 decodability for bytes, then a case-insensitive search for
    ``SecurityValidator.DANGEROUS_HTML_PATTERN`` in the text form.

    Args:
        v (Optional[Union[str, bytes]]): Value to validate

    Returns:
        Optional[Union[str, bytes]]: The original value, unchanged, when it
        passes every check (None is returned as-is)

    Raises:
        ValueError: When the content is too long, is bytes that cannot be
            decoded as UTF-8, or matches the dangerous-HTML pattern
    """
    if v is None:
        return v

    if len(v) > SecurityValidator.MAX_CONTENT_LENGTH:
        raise ValueError(f"Content exceeds maximum length of {SecurityValidator.MAX_CONTENT_LENGTH}")

    if isinstance(v, bytes):
        try:
            text = v.decode("utf-8")
        except UnicodeDecodeError as exc:
            # Chain explicitly so the decode failure is reported as the cause.
            raise ValueError("Content must be UTF-8 decodable") from exc
    else:
        text = v

    # Runtime pattern matching (not precompiled to allow test monkeypatching)
    if re.search(SecurityValidator.DANGEROUS_HTML_PATTERN, text, re.IGNORECASE):
        raise ValueError("Content contains HTML tags that may cause display issues")

    return v
class ResourceUpdate(BaseModelWithConfigDict):
    """Schema for updating an existing resource.

    Similar to ResourceCreate but URI is not required and all fields are optional.
    The name, description, mime_type and content validators return None
    unchanged, so an explicit null is handled the same way as an omitted field.
    """

    uri: Optional[str] = Field(None, description="Unique URI for the resource")
    name: Optional[str] = Field(None, description="Human-readable resource name")
    description: Optional[str] = Field(None, description="Resource description")
    mime_type: Optional[str] = Field(None, description="Resource MIME type")
    uri_template: Optional[str] = Field(None, description="URI template for parameterized resources")
    content: Optional[Union[str, bytes]] = Field(None, description="Resource content (text or binary)")
    tags: Optional[List[str]] = Field(None, description="Tags for categorizing the resource")

    # Team scoping fields
    team_id: Optional[str] = Field(None, description="Team ID for resource organization")
    owner_email: Optional[str] = Field(None, description="Email of the resource owner")
    visibility: Optional[str] = Field(None, description="Visibility level (private, team, public)")

    @field_validator("tags")
    @classmethod
    def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
        """Validate and normalize tags.

        Args:
            v: Optional list of tag strings to validate

        Returns:
            The value produced by ``validate_tags_field`` for the given input
        """
        # NOTE(review): confirm what validate_tags_field returns for None.
        return validate_tags_field(v)

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: Optional[str]) -> Optional[str]:
        """Validate resource name

        Args:
            v (Optional[str]): Value to validate

        Returns:
            Optional[str]: None when no name was given, otherwise the result of
            ``SecurityValidator.validate_name``
        """
        # The field is Optional: an explicit null means "leave the name alone",
        # matching the None handling of the other validators in this class.
        if v is None:
            return v
        return SecurityValidator.validate_name(v, "Resource name")

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Ensure descriptions display safely, truncate if too long

        Args:
            v (Optional[str]): Value to validate

        Returns:
            Optional[str]: Value if validated as safe and truncated if too long

        Raises:
            ValueError: When value is unsafe

        Examples:
            >>> from mcpgateway.schemas import ResourceUpdate
            >>> ResourceUpdate.validate_description('A safe description')
            'A safe description'
            >>> ResourceUpdate.validate_description(None)  # Test None case
            >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
            >>> truncated = ResourceUpdate.validate_description(long_desc)
            >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
            0
            >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
            True
        """
        if v is None:
            return v
        if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
            # Truncate the description to the maximum allowed length
            v = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
            logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
        return SecurityValidator.sanitize_display_text(v, "Description")

    @field_validator("mime_type")
    @classmethod
    def validate_mime_type(cls, v: Optional[str]) -> Optional[str]:
        """Validate MIME type format

        Args:
            v (Optional[str]): Value to validate

        Returns:
            Optional[str]: Value if validated as safe
        """
        if v is None:
            return v
        return SecurityValidator.validate_mime_type(v)

    @field_validator("content")
    @classmethod
    def validate_content(cls, v: Optional[Union[str, bytes]]) -> Optional[Union[str, bytes]]:
        """Validate content size and safety

        Args:
            v (Optional[Union[str, bytes]]): Value to validate

        Returns:
            Optional[Union[str, bytes]]: The original value when it passes every check

        Raises:
            ValueError: When the content is too long, is bytes that cannot be
                decoded as UTF-8, or matches the dangerous-HTML pattern
        """
        if v is None:
            return v

        if len(v) > SecurityValidator.MAX_CONTENT_LENGTH:
            raise ValueError(f"Content exceeds maximum length of {SecurityValidator.MAX_CONTENT_LENGTH}")

        if isinstance(v, bytes):
            try:
                text = v.decode("utf-8")
            except UnicodeDecodeError as exc:
                # Chain explicitly so the decode failure is reported as the cause.
                raise ValueError("Content must be UTF-8 decodable") from exc
        else:
            text = v

        # Runtime pattern matching (not precompiled to allow test monkeypatching)
        if re.search(SecurityValidator.DANGEROUS_HTML_PATTERN, text, re.IGNORECASE):
            raise ValueError("Content contains HTML tags that may cause display issues")

        return v
class ResourceRead(BaseModelWithConfigDict):
    """Response schema describing a stored resource.

    Besides the resource's own fields it carries:
    - Database ID
    - Content size
    - Creation/update timestamps
    - Enabled flag
    - Metrics: aggregated metrics for the resource invocations (optional)
    - Audit, team-scoping and MCP protocol metadata
    """

    # Core resource fields. Those without a default must be supplied, even if only as None.
    id: str = Field(description="Unique ID of the resource")
    uri: str
    name: str
    description: Optional[str]
    mime_type: Optional[str]
    uri_template: Optional[str] = Field(default=None, description="URI template for parameterized resources")
    size: Optional[int]
    created_at: datetime
    updated_at: datetime
    enabled: bool
    metrics: Optional[ResourceMetrics] = Field(default=None, description="Resource metrics (may be None in list operations)")
    tags: List[str] = Field(default_factory=list, description="Tags for categorizing the resource")

    # Audit metadata: who created the entity and how
    created_by: Optional[str] = Field(default=None, description="Username who created this entity")
    created_from_ip: Optional[str] = Field(default=None, description="IP address of creator")
    created_via: Optional[str] = Field(default=None, description="Creation method: ui|api|import|federation")
    created_user_agent: Optional[str] = Field(default=None, description="User agent of creation request")

    # Audit metadata: who last modified the entity and how
    modified_by: Optional[str] = Field(default=None, description="Username who last modified this entity")
    modified_from_ip: Optional[str] = Field(default=None, description="IP address of last modifier")
    modified_via: Optional[str] = Field(default=None, description="Modification method")
    modified_user_agent: Optional[str] = Field(default=None, description="User agent of modification request")

    # Provenance and versioning
    import_batch_id: Optional[str] = Field(default=None, description="UUID of bulk import batch")
    federation_source: Optional[str] = Field(default=None, description="Source gateway for federated entities")
    version: Optional[int] = Field(default=1, description="Entity version for change tracking")

    # Team scoping fields
    team_id: Optional[str] = Field(default=None, description="ID of the team that owns this resource")
    team: Optional[str] = Field(default=None, description="Name of the team that owns this resource")
    owner_email: Optional[str] = Field(default=None, description="Email of the user who owns this resource")
    visibility: Optional[str] = Field(default="public", description="Visibility level: private, team, or public")

    # MCP protocol fields
    title: Optional[str] = Field(default=None, description="Human-readable title for the resource")
    annotations: Optional[Annotations] = Field(default=None, description="Optional annotations for client rendering hints")
    meta: Optional[Dict[str, Any]] = Field(default=None, alias="_meta", description="Optional metadata for protocol extension")
class ResourceSubscription(BaseModelWithConfigDict):
    """Schema for resource subscriptions.

    A subscription pairs the URI of a resource with the identifier of the
    subscriber. Both values are checked by ``SecurityValidator``: the URI by
    ``validate_uri`` and the subscriber ID by ``validate_identifier``.

    Validation rules as documented for this schema (the checks themselves are
    implemented in ``SecurityValidator``):
    - URIs cannot contain HTML special characters (<, >, ", ', backslash)
    - URIs cannot contain directory traversal sequences (..)
    - URIs must contain only safe characters (alphanumeric, _, -, :, /, ?, =, &, %)
    - Subscriber IDs must contain only alphanumeric characters, underscores, hyphens, and dots
    - Both fields have maximum length limits (255 characters)

    Attributes:
        uri (str): URI of the resource to subscribe to. Must be a safe, valid URI.
        subscriber_id (str): Unique identifier for the subscriber. Must follow
            identifier naming conventions.

    Examples:
        >>> from pydantic import ValidationError
        >>> # Valid subscription
        >>> sub = ResourceSubscription(uri="/api/v1/users/123", subscriber_id="client_001")
        >>> sub.uri
        '/api/v1/users/123'
        >>> sub.subscriber_id
        'client_001'

        >>> # Valid URI with query parameters
        >>> sub = ResourceSubscription(uri="/data?type=json&limit=10", subscriber_id="app.service.1")
        >>> sub.uri
        '/data?type=json&limit=10'

        >>> # Valid subscriber ID with dots (common for service names)
        >>> sub = ResourceSubscription(uri="/events", subscriber_id="com.example.service")
        >>> sub.subscriber_id
        'com.example.service'

        >>> # Invalid: XSS attempt in URI
        >>> try:
        ...     ResourceSubscription(uri="<script>alert('XSS')</script>", subscriber_id="sub1")
        ... except ValidationError as e:
        ...     print("Validation failed: HTML characters not allowed")
        Validation failed: HTML characters not allowed

        >>> # Invalid: Directory traversal in URI
        >>> try:
        ...     ResourceSubscription(uri="/api/../../../etc/passwd", subscriber_id="sub1")
        ... except ValidationError as e:
        ...     print("Validation failed: Directory traversal detected")
        Validation failed: Directory traversal detected

        >>> # Invalid: SQL injection attempt in URI
        >>> try:
        ...     ResourceSubscription(uri="/users'; DROP TABLE users;--", subscriber_id="sub1")
        ... except ValidationError as e:
        ...     print("Validation failed: Invalid characters in URI")
        Validation failed: Invalid characters in URI

        >>> # Invalid: Special characters in subscriber ID
        >>> try:
        ...     ResourceSubscription(uri="/api/data", subscriber_id="sub@123!")
        ... except ValidationError as e:
        ...     print("Validation failed: Invalid subscriber ID format")
        Validation failed: Invalid subscriber ID format

        >>> # Invalid: Empty URI
        >>> try:
        ...     ResourceSubscription(uri="", subscriber_id="sub1")
        ... except ValidationError as e:
        ...     print("Validation failed: URI cannot be empty")
        Validation failed: URI cannot be empty

        >>> # Invalid: Empty subscriber ID
        >>> try:
        ...     ResourceSubscription(uri="/api/data", subscriber_id="")
        ... except ValidationError as e:
        ...     print("Validation failed: Subscriber ID cannot be empty")
        Validation failed: Subscriber ID cannot be empty

        >>> # Valid: Complex but safe URI
        >>> sub = ResourceSubscription(
        ...     uri="/api/v2/resources/category:items/filter?status=active&limit=50",
        ...     subscriber_id="monitor-service-01"
        ... )
        >>> sub.uri
        '/api/v2/resources/category:items/filter?status=active&limit=50'

        >>> # Edge case: a 255-character URI is accepted
        >>> long_uri = "/" + "a" * 254  # 255 characters in total
        >>> sub = ResourceSubscription(uri=long_uri, subscriber_id="sub1")
        >>> len(sub.uri)
        255

        >>> # Invalid: Quotes in URI (could break out of attributes)
        >>> try:
        ...     ResourceSubscription(uri='/api/data"onclick="alert(1)', subscriber_id="sub1")
        ... except ValidationError as e:
        ...     print("Validation failed: Quotes not allowed in URI")
        Validation failed: Quotes not allowed in URI
    """

    uri: str = Field(..., description="URI of resource to subscribe to")
    subscriber_id: str = Field(..., description="Unique subscriber identifier")

    @field_validator("uri")
    @classmethod
    def validate_uri(cls, v: str) -> str:
        """Check the subscribed URI with ``SecurityValidator.validate_uri``.

        The schema documents that an accepted URI is non-empty, free of HTML
        special characters and directory traversal sequences (..), limited to
        the allowed URI characters, and within the maximum length. Those
        checks are performed by the security validator, not here.

        Args:
            v (str): URI to validate

        Returns:
            str: The value returned by ``SecurityValidator.validate_uri``

        Raises:
            ValueError: If the URI contains dangerous patterns or invalid characters
        """
        checked_uri = SecurityValidator.validate_uri(v, "Resource URI")
        return checked_uri

    @field_validator("subscriber_id")
    @classmethod
    def validate_subscriber_id(cls, v: str) -> str:
        """Check the subscriber ID with ``SecurityValidator.validate_identifier``.

        The schema documents that an accepted ID is non-empty, made only of
        alphanumeric characters, underscores, hyphens and dots, and within the
        maximum length. Those checks are performed by the security validator,
        not here.

        Args:
            v (str): Subscriber ID to validate

        Returns:
            str: The value returned by ``SecurityValidator.validate_identifier``

        Raises:
            ValueError: If the subscriber ID violates naming conventions
        """
        checked_id = SecurityValidator.validate_identifier(v, "Subscriber ID")
        return checked_id
class ResourceNotification(BaseModelWithConfigDict):
    """Schema for resource update notifications.

    Carries:
    - The URI of the resource
    - The updated content
    - The time of the update (defaults to the current UTC time)
    """

    uri: str
    content: ResourceContent
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    @field_serializer("timestamp")
    def serialize_timestamp(self, dt: datetime) -> str:
        """Render ``timestamp`` as an ISO 8601 string in UTC with a ``Z`` suffix.

        Args:
            dt (datetime): The datetime object to serialize.

        Returns:
            str: ISO 8601 text for the UTC-converted datetime, with the
            "+00:00" offset replaced by "Z".
        """
        # NOTE(review): astimezone() treats a naive datetime as local time;
        # the default factory above always produces an aware UTC value.
        utc_moment = dt.astimezone(timezone.utc)
        iso_text = utc_moment.isoformat()
        return iso_text.replace("+00:00", "Z")
2038# --- Prompt Schemas ---
class PromptArgument(BaseModelWithConfigDict):
    """Schema for a single prompt template argument.

    Fields:
    - name: argument name (required)
    - description: optional description
    - required: whether the argument must be supplied (defaults to False)
    """

    name: str = Field(default=..., description="Argument name")
    description: Optional[str] = Field(default=None, description="Argument description")
    required: bool = Field(False, description="Whether argument is required")

    # Use base config; example metadata removed to avoid config merging type issues in static checks
class PromptCreate(BaseModelWithConfigDict):
    """
    Schema for creating a new prompt.

    Attributes:
        model_config (ConfigDict): Base configuration plus whitespace stripping for strings.
        name (str): Unique name for the prompt.
        custom_name (Optional[str]): Custom prompt name used for MCP invocation.
        display_name (Optional[str]): Display name for the prompt (shown in UI).
        description (Optional[str]): Optional description of the prompt.
        template (str): Template text for the prompt.
        arguments (List[PromptArgument]): List of arguments for the template.
        tags (Optional[List[str]]): Tags for categorizing the prompt.
        team_id (Optional[str]): Team ID for resource organization.
        owner_email (Optional[str]): Email of the prompt owner.
        visibility (Optional[str]): Visibility level, "public" by default.
        gateway_id (Optional[str]): ID of the gateway for the prompt.
    """

    model_config = ConfigDict(**dict(BaseModelWithConfigDict.model_config), str_strip_whitespace=True)

    name: str = Field(..., description="Unique name for the prompt")
    custom_name: Optional[str] = Field(None, description="Custom prompt name used for MCP invocation")
    display_name: Optional[str] = Field(None, description="Display name for the prompt (shown in UI)")
    description: Optional[str] = Field(None, description="Prompt description")
    template: str = Field(..., description="Prompt template text")
    arguments: List[PromptArgument] = Field(default_factory=list, description="List of arguments for the template")
    tags: Optional[List[str]] = Field(default_factory=list, description="Tags for categorizing the prompt")

    # Team scoping fields
    team_id: Optional[str] = Field(None, description="Team ID for resource organization")
    owner_email: Optional[str] = Field(None, description="Email of the prompt owner")
    visibility: Optional[str] = Field(default="public", description="Visibility level (private, team, public)")
    gateway_id: Optional[str] = Field(None, description="ID of the gateway for the prompt")

    @field_validator("tags")
    @classmethod
    def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
        """Validate and normalize tags.

        Args:
            v: Optional list of tag strings to validate

        Returns:
            The value produced by ``validate_tags_field`` for the given input
        """
        return validate_tags_field(v)

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Ensure prompt names display correctly in UI

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe
        """
        return SecurityValidator.validate_name(v, "Prompt name")

    @field_validator("custom_name")
    @classmethod
    def validate_custom_name(cls, v: Optional[str]) -> Optional[str]:
        """Ensure custom prompt names follow MCP naming conventions.

        Args:
            v: Custom prompt name to validate.

        Returns:
            The validated custom name or None.
        """
        if v is None:
            return v
        return SecurityValidator.validate_name(v, "Prompt name")

    @field_validator("display_name")
    @classmethod
    def validate_display_name(cls, v: Optional[str]) -> Optional[str]:
        """Ensure display names render safely in UI.

        Args:
            v: Display name to validate.

        Returns:
            The validated display name or None.
        """
        if v is None:
            return v
        return SecurityValidator.sanitize_display_text(v, "Prompt display name")

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Ensure descriptions display safely, truncate if too long

        Args:
            v (Optional[str]): Value to validate

        Returns:
            Optional[str]: Value if validated as safe and truncated if too long

        Raises:
            ValueError: When value is unsafe

        Examples:
            >>> from mcpgateway.schemas import PromptCreate
            >>> PromptCreate.validate_description('A safe description')
            'A safe description'
            >>> PromptCreate.validate_description(None)  # Test None case
            >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
            >>> truncated = PromptCreate.validate_description(long_desc)
            >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
            0
            >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
            True
        """
        if v is None:
            return v
        if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
            # Truncate the description to the maximum allowed length
            v = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
            logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
        return SecurityValidator.sanitize_display_text(v, "Description")

    @field_validator("template")
    @classmethod
    def validate_template(cls, v: str) -> str:
        """Validate template content for safe display

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe
        """
        return SecurityValidator.validate_template(v)

    @field_validator("arguments")
    @classmethod
    def validate_arguments(cls, v: List[PromptArgument]) -> List[PromptArgument]:
        """Ensure the argument list stays within structural complexity limits

        The field is declared as ``List[PromptArgument]``, so this validator
        receives a list of argument models rather than a dict.

        Args:
            v (List[PromptArgument]): Value to validate

        Returns:
            List[PromptArgument]: The same list once
            ``SecurityValidator.validate_json_depth`` has accepted it
        """
        SecurityValidator.validate_json_depth(v)
        return v
class PromptExecuteArgs(BaseModel):
    """
    Schema for the arguments passed when executing a prompt.

    Attributes:
        args (Dict[str, str]): Arguments for prompt execution; empty by default.
    """

    model_config = ConfigDict(str_strip_whitespace=True)

    args: Dict[str, str] = Field(default_factory=dict, description="Arguments for prompt execution")

    @field_validator("args")
    @classmethod
    def validate_args(cls, v: dict) -> dict:
        """Run every argument value through the XSS check.

        Args:
            v (dict): Mapping of argument names to values

        Returns:
            dict: The same mapping once every value has passed
            ``SecurityValidator.validate_no_xss``
        """
        # Only the values are checked; keys are not inspected here.
        for _arg_name, arg_value in v.items():
            SecurityValidator.validate_no_xss(arg_value, "Prompt execution arguments")
        return v
class PromptUpdate(BaseModelWithConfigDict):
    """Schema for updating an existing prompt.

    Similar to PromptCreate but all fields are optional to allow partial updates.
    The name, custom_name, display_name, description and template validators
    return None unchanged, so an explicit null is handled the same way as an
    omitted field.
    """

    name: Optional[str] = Field(None, description="Unique name for the prompt")
    custom_name: Optional[str] = Field(None, description="Custom prompt name used for MCP invocation")
    display_name: Optional[str] = Field(None, description="Display name for the prompt (shown in UI)")
    description: Optional[str] = Field(None, description="Prompt description")
    template: Optional[str] = Field(None, description="Prompt template text")
    arguments: Optional[List[PromptArgument]] = Field(None, description="List of arguments for the template")

    tags: Optional[List[str]] = Field(None, description="Tags for categorizing the prompt")

    # Team scoping fields
    team_id: Optional[str] = Field(None, description="Team ID for resource organization")
    owner_email: Optional[str] = Field(None, description="Email of the prompt owner")
    visibility: Optional[str] = Field(None, description="Visibility level (private, team, public)")

    @field_validator("tags")
    @classmethod
    def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
        """Validate and normalize tags.

        Args:
            v: Optional list of tag strings to validate

        Returns:
            The value produced by ``validate_tags_field`` for the given input
        """
        # NOTE(review): confirm what validate_tags_field returns for None.
        return validate_tags_field(v)

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: Optional[str]) -> Optional[str]:
        """Ensure prompt names display correctly in UI

        Args:
            v (Optional[str]): Value to validate

        Returns:
            Optional[str]: None when no name was given, otherwise the result of
            ``SecurityValidator.validate_name``
        """
        # The field is Optional: an explicit null means "leave the name alone",
        # matching the None handling of validate_custom_name below.
        if v is None:
            return v
        return SecurityValidator.validate_name(v, "Prompt name")

    @field_validator("custom_name")
    @classmethod
    def validate_custom_name(cls, v: Optional[str]) -> Optional[str]:
        """Ensure custom prompt names follow MCP naming conventions.

        Args:
            v: Custom prompt name to validate.

        Returns:
            The validated custom name or None.
        """
        if v is None:
            return v
        return SecurityValidator.validate_name(v, "Prompt name")

    @field_validator("display_name")
    @classmethod
    def validate_display_name(cls, v: Optional[str]) -> Optional[str]:
        """Ensure display names render safely in UI.

        Args:
            v: Display name to validate.

        Returns:
            The validated display name or None.
        """
        if v is None:
            return v
        return SecurityValidator.sanitize_display_text(v, "Prompt display name")

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Ensure descriptions display safely, truncate if too long

        Args:
            v (Optional[str]): Value to validate

        Returns:
            Optional[str]: Value if validated as safe and truncated if too long

        Raises:
            ValueError: When value is unsafe

        Examples:
            >>> from mcpgateway.schemas import PromptUpdate
            >>> PromptUpdate.validate_description('A safe description')
            'A safe description'
            >>> PromptUpdate.validate_description(None)  # Test None case
            >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
            >>> truncated = PromptUpdate.validate_description(long_desc)
            >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
            0
            >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
            True
        """
        if v is None:
            return v
        if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
            # Truncate the description to the maximum allowed length
            v = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
            logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
        return SecurityValidator.sanitize_display_text(v, "Description")

    @field_validator("template")
    @classmethod
    def validate_template(cls, v: Optional[str]) -> Optional[str]:
        """Validate template content for safe display

        Args:
            v (Optional[str]): Value to validate

        Returns:
            Optional[str]: None when no template was given, otherwise the
            result of ``SecurityValidator.validate_template``
        """
        # Optional field: skip the template checks when nothing was supplied.
        if v is None:
            return v
        return SecurityValidator.validate_template(v)

    @field_validator("arguments")
    @classmethod
    def validate_arguments(cls, v: Optional[List[PromptArgument]]) -> Optional[List[PromptArgument]]:
        """Ensure the argument list stays within structural complexity limits

        The field is declared as ``Optional[List[PromptArgument]]``, so this
        validator receives a list of argument models (or None), not a dict.

        Args:
            v (Optional[List[PromptArgument]]): Value to validate

        Returns:
            Optional[List[PromptArgument]]: The same value once
            ``SecurityValidator.validate_json_depth`` has accepted it
        """
        SecurityValidator.validate_json_depth(v)
        return v
class PromptRead(BaseModelWithConfigDict):
    """Response schema describing a stored prompt.

    Besides the prompt's own fields it carries:
    - Database ID
    - Creation/update timestamps
    - Enabled flag
    - Metrics: aggregated metrics for the prompt invocations (optional)
    - Audit, team-scoping and MCP protocol metadata
    """

    # Identity and naming
    id: str = Field(description="Unique ID of the prompt")
    name: str
    original_name: str
    custom_name: str
    custom_name_slug: str
    display_name: Optional[str] = Field(default=None, description="Display name for the prompt (shown in UI)")
    gateway_slug: Optional[str] = None

    # Prompt content. Fields without a default must be supplied, even if only as None.
    description: Optional[str]
    template: str
    arguments: List[PromptArgument]
    created_at: datetime
    updated_at: datetime
    # is_active: bool
    enabled: bool
    tags: List[Dict[str, str]] = Field(default_factory=list, description="Tags for categorizing the prompt")
    metrics: Optional[PromptMetrics] = Field(default=None, description="Prompt metrics (may be None in list operations)")

    # Audit metadata: who created the entity and how
    created_by: Optional[str] = Field(default=None, description="Username who created this entity")
    created_from_ip: Optional[str] = Field(default=None, description="IP address of creator")
    created_via: Optional[str] = Field(default=None, description="Creation method: ui|api|import|federation")
    created_user_agent: Optional[str] = Field(default=None, description="User agent of creation request")

    # Audit metadata: who last modified the entity and how
    modified_by: Optional[str] = Field(default=None, description="Username who last modified this entity")
    modified_from_ip: Optional[str] = Field(default=None, description="IP address of last modifier")
    modified_via: Optional[str] = Field(default=None, description="Modification method")
    modified_user_agent: Optional[str] = Field(default=None, description="User agent of modification request")

    # Provenance and versioning
    import_batch_id: Optional[str] = Field(default=None, description="UUID of bulk import batch")
    federation_source: Optional[str] = Field(default=None, description="Source gateway for federated entities")
    version: Optional[int] = Field(default=1, description="Entity version for change tracking")

    # Team scoping fields
    team_id: Optional[str] = Field(default=None, description="ID of the team that owns this resource")
    team: Optional[str] = Field(default=None, description="Name of the team that owns this resource")
    owner_email: Optional[str] = Field(default=None, description="Email of the user who owns this resource")
    visibility: Optional[str] = Field(default="public", description="Visibility level: private, team, or public")

    # MCP protocol fields
    title: Optional[str] = Field(default=None, description="Human-readable title for the prompt")
    meta: Optional[Dict[str, Any]] = Field(default=None, alias="_meta", description="Optional metadata for protocol extension")
class PromptInvocation(BaseModelWithConfigDict):
    """Request schema for invoking a prompt.

    Fields:
    - name: the prompt to use (required)
    - arguments: string values for template rendering (empty by default)
    """

    name: str = Field(default=..., description="Name of prompt to use")
    arguments: Dict[str, str] = Field(
        default_factory=dict,
        description="Arguments for template rendering",
    )
2436# --- Global Config Schemas ---
class GlobalConfigUpdate(BaseModel):
    """Request schema for changing the global configuration.

    Attributes:
        passthrough_headers (Optional[List[str]]): Headers allowed to be passed
            through globally; None by default.
    """

    passthrough_headers: Optional[List[str]] = Field(
        None,
        description="List of headers allowed to be passed through globally",
    )
class GlobalConfigRead(BaseModel):
    """Response schema exposing the global configuration.

    Attributes:
        passthrough_headers (Optional[List[str]]): Headers allowed to be passed
            through globally; None by default.
    """

    passthrough_headers: Optional[List[str]] = Field(
        None,
        description="List of headers allowed to be passed through globally",
    )
2457# --- Gateway Schemas ---
2460# --- Transport Type ---
class TransportType(str, Enum):
    """
    Closed set of transport mechanisms used for communication between components.

    Each member is also a ``str`` whose value equals its name, so members
    compare equal to the matching plain string.

    Attributes:
        SSE (str): Server-Sent Events transport.
        HTTP (str): Standard HTTP-based transport.
        STDIO (str): Standard input/output transport.
        STREAMABLEHTTP (str): HTTP transport with streaming.
    """

    # Member order is part of the enum's iteration order; keep it stable.
    SSE = "SSE"
    HTTP = "HTTP"
    STDIO = "STDIO"
    STREAMABLEHTTP = "STREAMABLEHTTP"
class GatewayCreate(BaseModel):
    """
    Schema for creating a new gateway.

    Besides per-field validation, the model derives ``auth_value`` from the
    credential fields selected by ``auth_type`` and, for ``query_param`` auth,
    enforces the feature flag and host allowlist read from ``settings``.

    Attributes:
        model_config (ConfigDict): Configuration for the model (strips surrounding whitespace from strings).
        name (str): Unique name for the gateway.
        url (Union[str, AnyHttpUrl]): Gateway endpoint URL.
        description (Optional[str]): Optional description of the gateway.
        transport (str): Transport used by the MCP server, default is "SSE".
        passthrough_headers (Optional[List[str]]): Headers allowed to be passed through from client to target.
        auth_type (Optional[str]): Type of authentication. The values handled by this model are
            "basic", "bearer", "oauth", "authheaders", "one_time_auth" and "query_param"; ``None`` or "" means no auth.
        auth_username (Optional[str]): Username for basic authentication.
        auth_password (Optional[str]): Password for basic authentication.
        auth_token (Optional[str]): Token for bearer authentication.
        auth_header_key (Optional[str]): Key for custom headers authentication (legacy single-header form).
        auth_header_value (Optional[str]): Value for custom headers authentication (legacy single-header form).
        auth_headers (Optional[List[Dict[str, str]]]): List of custom headers for authentication.
        oauth_config (Optional[Dict[str, Any]]): OAuth 2.0 configuration.
        auth_query_param_key (Optional[str]): Query parameter name used for query-parameter authentication.
        auth_query_param_value (Optional[SecretStr]): Query parameter value (API key).
        auth_value (Optional[str]): Alias for authentication value, used for better access post-validation.
        one_time_auth (Optional[bool]): Whether the authentication should be used once and not stored.
        tags (Optional[List[Union[str, Dict[str, str]]]]): Tags for categorizing the gateway.
        team_id (Optional[str]): Team ID this gateway belongs to.
        owner_email (Optional[str]): Email of the gateway owner.
        visibility (Optional[str]): Gateway visibility: private, team, or public.
        ca_certificate (Optional[str]): Custom CA certificate for TLS verification.
        ca_certificate_sig (Optional[str]): Signature of the custom CA certificate.
        signing_algorithm (Optional[str]): Algorithm used for signing the CA certificate.
        refresh_interval_seconds (Optional[int]): Per-gateway refresh interval in seconds (minimum 60).
    """

    model_config = ConfigDict(str_strip_whitespace=True)

    name: str = Field(..., description="Unique name for the gateway")
    url: Union[str, AnyHttpUrl] = Field(..., description="Gateway endpoint URL")
    description: Optional[str] = Field(None, description="Gateway description")
    transport: str = Field(default="SSE", description="Transport used by MCP server: SSE or STREAMABLEHTTP")
    passthrough_headers: Optional[List[str]] = Field(default=None, description="List of headers allowed to be passed through from client to target")

    # Authorizations
    auth_type: Optional[str] = Field(None, description="Type of authentication: basic, bearer, headers, oauth, query_param, or none")
    # Fields for various types of authentication
    auth_username: Optional[str] = Field(None, description="Username for basic authentication")
    auth_password: Optional[str] = Field(None, description="Password for basic authentication")
    auth_token: Optional[str] = Field(None, description="Token for bearer authentication")
    auth_header_key: Optional[str] = Field(None, description="Key for custom headers authentication")
    auth_header_value: Optional[str] = Field(None, description="Value for custom headers authentication")
    auth_headers: Optional[List[Dict[str, str]]] = Field(None, description="List of custom headers for authentication")

    # OAuth 2.0 configuration
    oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration including grant_type, client_id, encrypted client_secret, URLs, and scopes")

    # Query Parameter Authentication (INSECURE)
    auth_query_param_key: Optional[str] = Field(
        None,
        description="Query parameter name for authentication (e.g., 'api_key', 'tavilyApiKey')",
        pattern=r"^[a-zA-Z_][a-zA-Z0-9_\-]*$",
    )
    auth_query_param_value: Optional[SecretStr] = Field(
        None,
        description="Query parameter value (API key). Stored encrypted.",
    )

    # Adding `auth_value` as an alias for better access post-validation.
    # validate_default=True makes create_auth_value run even when the caller omits auth_value.
    auth_value: Optional[str] = Field(None, validate_default=True)

    # One time auth - do not store the auth in gateway flag
    one_time_auth: Optional[bool] = Field(default=False, description="The authentication should be used only once and not stored in the gateway")

    tags: Optional[List[Union[str, Dict[str, str]]]] = Field(default_factory=list, description="Tags for categorizing the gateway")

    # Team scoping fields for resource organization
    team_id: Optional[str] = Field(None, description="Team ID this gateway belongs to")
    owner_email: Optional[str] = Field(None, description="Email of the gateway owner")
    visibility: Optional[str] = Field(default="public", description="Gateway visibility: private, team, or public")

    # CA certificate
    ca_certificate: Optional[str] = Field(None, description="Custom CA certificate for TLS verification")
    ca_certificate_sig: Optional[str] = Field(None, description="Signature of the custom CA certificate for integrity verification")
    signing_algorithm: Optional[str] = Field("ed25519", description="Algorithm used for signing the CA certificate")

    # Per-gateway refresh configuration
    refresh_interval_seconds: Optional[int] = Field(None, ge=60, description="Per-gateway refresh interval in seconds (minimum 60); uses global default if not set")

    @field_validator("tags")
    @classmethod
    def validate_tags(cls, v: Optional[List[Union[str, Dict[str, str]]]]) -> List[str]:
        """Validate and normalize tags.

        Args:
            v: Optional list of tags to validate

        Returns:
            The result of ``validate_tags_field`` for the given tags
        """
        return validate_tags_field(v)

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Validate gateway name

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe
        """
        return SecurityValidator.validate_name(v, "Gateway name")

    @field_validator("url")
    @classmethod
    def validate_url(cls, v: str) -> str:
        """Validate gateway URL

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe
        """
        return SecurityValidator.validate_url(v, "Gateway URL")

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Ensure descriptions display safely, truncate if too long

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe and truncated if too long

        Raises:
            ValueError: When value is unsafe

        Examples:
            >>> from mcpgateway.schemas import GatewayCreate
            >>> GatewayCreate.validate_description('A safe description')
            'A safe description'
            >>> GatewayCreate.validate_description(None)  # Test None case
            >>> long_desc = 'x' * (SecurityValidator.MAX_DESCRIPTION_LENGTH + 10)
            >>> truncated = GatewayCreate.validate_description(long_desc)
            >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
            0
            >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
            True
        """
        if v is None:
            return v
        max_length = SecurityValidator.MAX_DESCRIPTION_LENGTH
        if len(v) > max_length:
            # Truncate first, then sanitize the shortened text
            v = v[:max_length]
            logger.info(f"Description too long, truncated to {max_length} characters.")
        return SecurityValidator.sanitize_display_text(v, "Description")

    @field_validator("auth_value", mode="before")
    @classmethod
    def create_auth_value(cls, v, info):
        """
        This validator will run before the model is fully instantiated (mode="before")
        It will process the auth fields based on auth_type and generate auth_value.

        The credential fields are read from ``info.data``, which only contains
        fields declared (and validated) before ``auth_value``.

        Args:
            v: Raw ``auth_value`` input (the field default when the caller supplied none)
            info: ValidationInfo containing auth_type and the credential fields

        Returns:
            str: ``v`` unchanged when no auth_type is set, otherwise the generated auth value
        """
        auth_type = info.data.get("auth_type")
        if auth_type is None or auth_type == "":
            return v  # If no auth_type is provided, no need to create auth_value

        # Process the auth fields and generate auth_value based on auth_type
        return cls._process_auth_fields(info)

    @field_validator("transport")
    @classmethod
    def validate_transport(cls, v: str) -> str:
        """
        Validates that the given transport value is one of the supported TransportType values.

        Args:
            v (str): The transport value to validate.

        Returns:
            str: The validated transport value if it is valid.

        Raises:
            ValueError: If the provided value is not a valid transport type.

        Valid transport types are defined in the TransportType enum:
            - SSE
            - HTTP
            - STDIO
            - STREAMABLEHTTP
        """
        allowed = [t.value for t in TransportType]
        if v not in allowed:
            raise ValueError(f"Invalid transport type: {v}. Must be one of: {', '.join(allowed)}")
        return v

    @staticmethod
    def _encode_multi_headers(auth_headers: List[Any]) -> Optional[str]:
        """Validate a multi-header list and encode it with ``encode_auth``.

        Entries that are not dicts or that have no key are skipped. When a key
        occurs more than once, the last value wins and a warning is logged.

        Args:
            auth_headers: List of ``{"key": ..., "value": ...}`` entries

        Returns:
            Encoded auth string for the collected headers

        Raises:
            ValueError: If a key has an invalid format, no usable header remains,
                or more than 100 distinct headers are supplied
        """
        header_dict: Dict[str, str] = {}
        duplicate_keys = set()

        for header in auth_headers:
            if not isinstance(header, dict):
                continue

            key = header.get("key")
            # Skip headers without keys
            if not key:
                continue

            # Track duplicate keys (last value wins)
            if key in header_dict:
                duplicate_keys.add(key)

            # Validate header key format (basic HTTP header validation); spaces are ignored by the check
            if not all(c.isalnum() or c in "-_" for c in key.replace(" ", "")):
                raise ValueError(f"Invalid header key format: '{key}'. Header keys should contain only alphanumeric characters, hyphens, and underscores.")

            # Store header (empty values are allowed)
            header_dict[key] = header.get("value", "")

        # Ensure at least one valid header
        if not header_dict:
            raise ValueError("For 'headers' auth, at least one valid header with a key must be provided.")

        # Warn about duplicate keys; sorted so the log line is deterministic
        if duplicate_keys:
            logger.warning(f"Duplicate header keys detected (last value used): {', '.join(sorted(duplicate_keys))}")

        # Check for excessive headers (prevent abuse)
        if len(header_dict) > 100:
            raise ValueError("Maximum of 100 headers allowed per gateway.")

        return encode_auth(header_dict)

    @staticmethod
    def _process_auth_fields(info: ValidationInfo) -> Optional[str]:
        """
        Processes the input authentication fields and returns the correct auth_value.
        This method is called based on the selected auth_type.

        Args:
            info: ValidationInfo containing auth fields

        Returns:
            Encoded auth string, or None for auth types that do not use auth_value
            ("oauth", "one_time_auth", "query_param")

        Raises:
            ValueError: If auth_type is invalid or the fields it requires are missing
        """
        data = info.data
        auth_type = data.get("auth_type")

        if auth_type == "basic":
            # For basic authentication, both username and password must be present
            username = data.get("auth_username")
            password = data.get("auth_password")
            if not username or not password:
                raise ValueError("For 'basic' auth, both 'auth_username' and 'auth_password' must be provided.")

            creds = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode()
            return encode_auth({"Authorization": f"Basic {creds}"})

        if auth_type == "bearer":
            # For bearer authentication, only token is required
            token = data.get("auth_token")
            if not token:
                raise ValueError("For 'bearer' auth, 'auth_token' must be provided.")

            return encode_auth({"Authorization": f"Bearer {token}"})

        if auth_type in ("oauth", "one_time_auth", "query_param"):
            # None of these store anything in auth_value:
            # - oauth is configured through the oauth_config field
            # - one-time auth needs no stored value
            # - query_param is validated by the model validator below
            return None

        if auth_type == "authheaders":
            # Support both new multi-headers format and legacy single header format
            auth_headers = data.get("auth_headers")
            if auth_headers and isinstance(auth_headers, list):
                return GatewayCreate._encode_multi_headers(auth_headers)

            # Legacy single header format (backward compatibility)
            header_key = data.get("auth_header_key")
            header_value = data.get("auth_header_value")
            if not header_key or not header_value:
                raise ValueError("For 'headers' auth, either 'auth_headers' list or both 'auth_header_key' and 'auth_header_value' must be provided.")

            return encode_auth({header_key: header_value})

        # List the values this method actually accepts ("authheaders", not "headers")
        raise ValueError("Invalid 'auth_type'. Must be one of: basic, bearer, oauth, authheaders, one_time_auth, or query_param.")

    @model_validator(mode="after")
    def validate_query_param_auth(self) -> "GatewayCreate":
        """Validate query parameter authentication configuration.

        Does nothing unless ``auth_type`` is "query_param".

        Returns:
            GatewayCreate: The validated instance.

        Raises:
            ValueError: If query param auth is disabled, a required field is missing,
                or the host is not in the allowlist.
        """
        if self.auth_type != "query_param":
            return self

        # Check feature flag
        if not settings.insecure_allow_queryparam_auth:
            raise ValueError("Query parameter authentication is disabled. " "Set INSECURE_ALLOW_QUERYPARAM_AUTH=true to enable. " "WARNING: API keys in URLs may appear in proxy logs.")

        # Check required fields
        if not self.auth_query_param_key:
            raise ValueError("auth_query_param_key is required when auth_type is 'query_param'")
        if not self.auth_query_param_value:
            raise ValueError("auth_query_param_value is required when auth_type is 'query_param'")

        # Check host allowlist (if configured)
        allowed_hosts = settings.insecure_queryparam_auth_allowed_hosts
        if allowed_hosts:
            # urlparse().hostname extracts the host properly (handles IPv6, ports, userinfo)
            hostname = (urlparse(str(self.url)).hostname or "").lower()
            if hostname not in allowed_hosts:
                allowed = ", ".join(allowed_hosts)
                raise ValueError(f"Host '{hostname}' is not in the allowed hosts for query parameter auth. " f"Allowed hosts: {allowed}")

        return self
class GatewayUpdate(BaseModelWithConfigDict):
    """Schema for updating an existing federation gateway.

    Similar to GatewayCreate but all fields are optional to allow partial updates.

    As in GatewayCreate, ``auth_value`` is derived from the credential fields
    selected by ``auth_type``. The values of ``auth_type`` handled here are
    "basic", "bearer", "oauth", "authheaders", "one_time_auth" and "query_param".
    """

    # NOTE(review): unlike GatewayCreate, no validator here checks `transport`
    # against TransportType — confirm the service layer validates it.
    name: Optional[str] = Field(None, description="Unique name for the gateway")
    url: Optional[Union[str, AnyHttpUrl]] = Field(None, description="Gateway endpoint URL")
    description: Optional[str] = Field(None, description="Gateway description")
    transport: Optional[str] = Field(None, description="Transport used by MCP server: SSE or STREAMABLEHTTP")

    passthrough_headers: Optional[List[str]] = Field(default=None, description="List of headers allowed to be passed through from client to target")

    # Authorizations
    auth_type: Optional[str] = Field(None, description="auth_type: basic, bearer, headers or None")
    auth_username: Optional[str] = Field(None, description="username for basic authentication")
    auth_password: Optional[str] = Field(None, description="password for basic authentication")
    auth_token: Optional[str] = Field(None, description="token for bearer authentication")
    auth_header_key: Optional[str] = Field(None, description="key for custom headers authentication")
    auth_header_value: Optional[str] = Field(None, description="value for custom headers authentication")
    auth_headers: Optional[List[Dict[str, str]]] = Field(None, description="List of custom headers for authentication")

    # Adding `auth_value` as an alias for better access post-validation.
    # validate_default=True makes create_auth_value run even when the caller omits auth_value.
    auth_value: Optional[str] = Field(None, validate_default=True)

    # OAuth 2.0 configuration
    oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration including grant_type, client_id, encrypted client_secret, URLs, and scopes")

    # Query Parameter Authentication (INSECURE)
    auth_query_param_key: Optional[str] = Field(
        None,
        description="Query parameter name for authentication",
        pattern=r"^[a-zA-Z_][a-zA-Z0-9_\-]*$",
    )
    auth_query_param_value: Optional[SecretStr] = Field(
        None,
        description="Query parameter value (API key)",
    )

    # One time auth - do not store the auth in gateway flag
    one_time_auth: Optional[bool] = Field(default=False, description="The authentication should be used only once and not stored in the gateway")

    tags: Optional[List[Union[str, Dict[str, str]]]] = Field(None, description="Tags for categorizing the gateway")

    # Team scoping fields for resource organization
    team_id: Optional[str] = Field(None, description="Team ID this gateway belongs to")
    owner_email: Optional[str] = Field(None, description="Email of the gateway owner")
    visibility: Optional[str] = Field(None, description="Gateway visibility: private, team, or public")

    # Per-gateway refresh configuration
    refresh_interval_seconds: Optional[int] = Field(None, ge=60, description="Per-gateway refresh interval in seconds (minimum 60); uses global default if not set")

    @field_validator("tags")
    @classmethod
    def validate_tags(cls, v: Optional[List[Union[str, Dict[str, str]]]]) -> List[str]:
        """Validate and normalize tags.

        Args:
            v: Optional list of tags to validate

        Returns:
            The result of ``validate_tags_field`` for the given tags
        """
        return validate_tags_field(v)

    @field_validator("name", mode="before")
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Validate gateway name

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe
        """
        # NOTE(review): runs in "before" mode, so an explicitly passed None (or any
        # non-string input) reaches SecurityValidator as-is — confirm it copes with that.
        return SecurityValidator.validate_name(v, "Gateway name")

    @field_validator("url", mode="before")
    @classmethod
    def validate_url(cls, v: str) -> str:
        """Validate gateway URL

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe
        """
        # NOTE(review): "before" mode — the raw input is handed to SecurityValidator unchanged.
        return SecurityValidator.validate_url(v, "Gateway URL")

    @field_validator("description", mode="before")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Ensure descriptions display safely, truncate if too long

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe and truncated if too long

        Raises:
            ValueError: When value is unsafe

        Examples:
            >>> from mcpgateway.schemas import GatewayUpdate
            >>> GatewayUpdate.validate_description('A safe description')
            'A safe description'
            >>> GatewayUpdate.validate_description(None)  # Test None case
            >>> long_desc = 'x' * (SecurityValidator.MAX_DESCRIPTION_LENGTH + 10)
            >>> truncated = GatewayUpdate.validate_description(long_desc)
            >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
            0
            >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
            True
        """
        if v is None:
            return v
        max_length = SecurityValidator.MAX_DESCRIPTION_LENGTH
        if len(v) > max_length:
            # Truncate first, then sanitize the shortened text
            v = v[:max_length]
            logger.info(f"Description too long, truncated to {max_length} characters.")
        return SecurityValidator.sanitize_display_text(v, "Description")

    @field_validator("auth_value", mode="before")
    @classmethod
    def create_auth_value(cls, v, info):
        """
        This validator will run before the model is fully instantiated (mode="before")
        It will process the auth fields based on auth_type and generate auth_value.

        The credential fields are read from ``info.data``, which only contains
        fields declared (and validated) before ``auth_value``.

        Args:
            v: Raw ``auth_value`` input (the field default when the caller supplied none)
            info: ValidationInfo containing auth_type and the credential fields

        Returns:
            str: ``v`` unchanged when no auth_type is set, otherwise the generated auth value
        """
        auth_type = info.data.get("auth_type")
        if auth_type is None or auth_type == "":
            return v  # If no auth_type is provided, no need to create auth_value

        # Process the auth fields and generate auth_value based on auth_type
        return cls._process_auth_fields(info)

    @staticmethod
    def _encode_multi_headers(auth_headers: List[Any]) -> Optional[str]:
        """Validate a multi-header list and encode it with ``encode_auth``.

        Entries that are not dicts or that have no key are skipped. When a key
        occurs more than once, the last value wins and a warning is logged.

        Args:
            auth_headers: List of ``{"key": ..., "value": ...}`` entries

        Returns:
            Encoded auth string for the collected headers

        Raises:
            ValueError: If a key has an invalid format, no usable header remains,
                or more than 100 distinct headers are supplied
        """
        header_dict: Dict[str, str] = {}
        duplicate_keys = set()

        for header in auth_headers:
            if not isinstance(header, dict):
                continue

            key = header.get("key")
            # Skip headers without keys
            if not key:
                continue

            # Track duplicate keys (last value wins)
            if key in header_dict:
                duplicate_keys.add(key)

            # Validate header key format (basic HTTP header validation); spaces are ignored by the check
            if not all(c.isalnum() or c in "-_" for c in key.replace(" ", "")):
                raise ValueError(f"Invalid header key format: '{key}'. Header keys should contain only alphanumeric characters, hyphens, and underscores.")

            # Store header (empty values are allowed)
            header_dict[key] = header.get("value", "")

        # Ensure at least one valid header
        if not header_dict:
            raise ValueError("For 'headers' auth, at least one valid header with a key must be provided.")

        # Warn about duplicate keys; sorted so the log line is deterministic
        if duplicate_keys:
            logger.warning(f"Duplicate header keys detected (last value used): {', '.join(sorted(duplicate_keys))}")

        # Check for excessive headers (prevent abuse)
        if len(header_dict) > 100:
            raise ValueError("Maximum of 100 headers allowed per gateway.")

        return encode_auth(header_dict)

    @staticmethod
    def _process_auth_fields(info: ValidationInfo) -> Optional[str]:
        """
        Processes the input authentication fields and returns the correct auth_value.
        This method is called based on the selected auth_type.

        Args:
            info: ValidationInfo containing auth fields

        Returns:
            Encoded auth string, or None for auth types that do not use auth_value
            ("oauth", "one_time_auth", "query_param")

        Raises:
            ValueError: If auth type is invalid or the fields it requires are missing
        """
        data = info.data
        auth_type = data.get("auth_type")

        if auth_type == "basic":
            # For basic authentication, both username and password must be present
            username = data.get("auth_username")
            password = data.get("auth_password")
            if not username or not password:
                raise ValueError("For 'basic' auth, both 'auth_username' and 'auth_password' must be provided.")

            creds = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode()
            return encode_auth({"Authorization": f"Basic {creds}"})

        if auth_type == "bearer":
            # For bearer authentication, only token is required
            token = data.get("auth_token")
            if not token:
                raise ValueError("For 'bearer' auth, 'auth_token' must be provided.")

            return encode_auth({"Authorization": f"Bearer {token}"})

        if auth_type in ("oauth", "one_time_auth", "query_param"):
            # None of these store anything in auth_value:
            # - oauth is configured through the oauth_config field
            # - one-time auth needs no stored value
            # - query_param is validated by the model validator below
            return None

        if auth_type == "authheaders":
            # Support both new multi-headers format and legacy single header format
            auth_headers = data.get("auth_headers")
            if auth_headers and isinstance(auth_headers, list):
                return GatewayUpdate._encode_multi_headers(auth_headers)

            # Legacy single header format (backward compatibility)
            header_key = data.get("auth_header_key")
            header_value = data.get("auth_header_value")
            if not header_key or not header_value:
                raise ValueError("For 'headers' auth, either 'auth_headers' list or both 'auth_header_key' and 'auth_header_value' must be provided.")

            return encode_auth({header_key: header_value})

        # List the values this method actually accepts ("authheaders", not "headers")
        raise ValueError("Invalid 'auth_type'. Must be one of: basic, bearer, oauth, authheaders, one_time_auth, or query_param.")

    @model_validator(mode="after")
    def validate_query_param_auth(self) -> "GatewayUpdate":
        """Validate query parameter authentication configuration.

        NOTE: This only runs when auth_type is explicitly set to "query_param".
        Service-layer enforcement in update_gateway() handles the case where
        auth_type is omitted but the existing gateway uses query_param auth.

        Returns:
            GatewayUpdate: The validated instance.

        Raises:
            ValueError: If required fields are missing when setting query_param auth.
        """
        if self.auth_type != "query_param":
            return self

        # Validate fields are provided when explicitly setting query_param auth
        # Feature flag/allowlist check happens in service layer (has access to existing gateway)
        if not self.auth_query_param_key:
            raise ValueError("auth_query_param_key is required when setting auth_type to 'query_param'")
        if not self.auth_query_param_value:
            raise ValueError("auth_query_param_value is required when setting auth_type to 'query_param'")

        return self
3101# ---------------------------------------------------------------------------
3102# OAuth config masking helper (used by GatewayRead.masked / A2AAgentRead.masked)
3103# ---------------------------------------------------------------------------
3104_SENSITIVE_OAUTH_KEYS = frozenset(
3105 {
3106 "client_secret",
3107 "password",
3108 "refresh_token",
3109 "access_token",
3110 "id_token",
3111 "token",
3112 "secret",
3113 "private_key",
3114 }
3115)
3118def _mask_oauth_config(oauth_config: Any) -> Any:
3119 """Recursively mask sensitive keys inside an ``oauth_config`` dict.
3121 Args:
3122 oauth_config: The oauth_config value to mask (dict, list, or scalar).
3124 Returns:
3125 The masked copy with sensitive values replaced.
3126 """
3127 if isinstance(oauth_config, dict):
3128 out: Dict[str, Any] = {}
3129 for k, v in oauth_config.items():
3130 if isinstance(k, str) and k.lower() in _SENSITIVE_OAUTH_KEYS:
3131 out[k] = settings.masked_auth_value if v else v
3132 else:
3133 out[k] = _mask_oauth_config(v)
3134 return out
3135 if isinstance(oauth_config, list):
3136 return [_mask_oauth_config(x) for x in oauth_config]
3137 return oauth_config
3140class GatewayRead(BaseModelWithConfigDict):
3141 """Schema for reading gateway information.
3143 Includes all gateway fields plus:
3144 - Database ID
3145 - Capabilities dictionary
3146 - Creation/update timestamps
3147 - enabled status
3148 - reachable status
3149 - Last seen timestamp
3150 - Authentication type: basic, bearer, headers, oauth
3151 - Authentication value: username/password or token or custom headers
3152 - OAuth configuration for OAuth 2.0 authentication
3154 Auto Populated fields:
3155 - Authentication username: for basic auth
3156 - Authentication password: for basic auth
3157 - Authentication token: for bearer auth
3158 - Authentication header key: for headers auth
3159 - Authentication header value: for headers auth
3160 """
3162 id: Optional[str] = Field(None, description="Unique ID of the gateway")
3163 name: str = Field(..., description="Unique name for the gateway")
3164 url: str = Field(..., description="Gateway endpoint URL")
3165 description: Optional[str] = Field(None, description="Gateway description")
3166 transport: str = Field(default="SSE", description="Transport used by MCP server: SSE or STREAMABLEHTTP")
3167 capabilities: Dict[str, Any] = Field(default_factory=dict, description="Gateway capabilities")
3168 created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), description="Creation timestamp")
3169 updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), description="Last update timestamp")
3170 enabled: bool = Field(default=True, description="Is the gateway enabled?")
3171 reachable: bool = Field(default=True, description="Is the gateway reachable/online?")
3173 last_seen: Optional[datetime] = Field(default_factory=lambda: datetime.now(timezone.utc), description="Last seen timestamp")
3175 passthrough_headers: Optional[List[str]] = Field(default=None, description="List of headers allowed to be passed through from client to target")
3176 # Authorizations
3177 auth_type: Optional[str] = Field(None, description="auth_type: basic, bearer, headers, oauth, query_param, or None")
3178 auth_value: Optional[str] = Field(None, description="auth value: username/password or token or custom headers")
3179 auth_headers: Optional[List[Dict[str, str]]] = Field(default=None, description="List of custom headers for authentication")
3180 auth_headers_unmasked: Optional[List[Dict[str, str]]] = Field(default=None, description="Unmasked custom headers for administrative views")
3182 # OAuth 2.0 configuration
3183 oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration including grant_type, client_id, encrypted client_secret, URLs, and scopes")
3185 # Query Parameter Authentication (masked for security)
3186 auth_query_param_key: Optional[str] = Field(
3187 None,
3188 description="Query parameter name for authentication",
3189 )
3190 auth_query_param_value_masked: Optional[str] = Field(
3191 None,
3192 description="Masked indicator if query param auth is configured",
3193 )
3195 # auth_value will populate the following fields
3196 auth_username: Optional[str] = Field(None, description="username for basic authentication")
3197 auth_password: Optional[str] = Field(None, description="password for basic authentication")
3198 auth_token: Optional[str] = Field(None, description="token for bearer authentication")
3199 auth_header_key: Optional[str] = Field(None, description="key for custom headers authentication")
3200 auth_header_value: Optional[str] = Field(None, description="vallue for custom headers authentication")
3201 tags: List[Dict[str, str]] = Field(default_factory=list, description="Tags for categorizing the gateway")
3203 auth_password_unmasked: Optional[str] = Field(default=None, description="Unmasked password for basic authentication")
3204 auth_token_unmasked: Optional[str] = Field(default=None, description="Unmasked bearer token for authentication")
3205 auth_header_value_unmasked: Optional[str] = Field(default=None, description="Unmasked single custom header value")
3207 # Team scoping fields for resource organization
3208 team_id: Optional[str] = Field(None, description="Team ID this gateway belongs to")
3209 team: Optional[str] = Field(None, description="Name of the team that owns this resource")
3210 owner_email: Optional[str] = Field(None, description="Email of the gateway owner")
3211 visibility: Optional[str] = Field(default="public", description="Gateway visibility: private, team, or public")
3213 # Comprehensive metadata for audit tracking
3214 created_by: Optional[str] = Field(None, description="Username who created this entity")
3215 created_from_ip: Optional[str] = Field(None, description="IP address of creator")
3216 created_via: Optional[str] = Field(None, description="Creation method: ui|api|import|federation")
3217 created_user_agent: Optional[str] = Field(None, description="User agent of creation request")
3219 modified_by: Optional[str] = Field(None, description="Username who last modified this entity")
3220 modified_from_ip: Optional[str] = Field(None, description="IP address of last modifier")
3221 modified_via: Optional[str] = Field(None, description="Modification method")
3222 modified_user_agent: Optional[str] = Field(None, description="User agent of modification request")
3224 import_batch_id: Optional[str] = Field(None, description="UUID of bulk import batch")
3225 federation_source: Optional[str] = Field(None, description="Source gateway for federated entities")
3226 version: Optional[int] = Field(1, description="Entity version for change tracking")
3228 slug: Optional[str] = Field(None, description="Slug for gateway endpoint URL")
3230 # Per-gateway refresh configuration
3231 refresh_interval_seconds: Optional[int] = Field(None, description="Per-gateway refresh interval in seconds")
3232 last_refresh_at: Optional[datetime] = Field(None, description="Timestamp of last successful refresh")
@model_validator(mode="before")
@classmethod
def _mask_query_param_auth(cls, data: Any) -> Any:
    """Expose query-param auth as a key name plus a masked placeholder.

    Looks for a non-empty ``auth_query_params`` dict on the raw input and, when
    found, sets ``auth_query_param_key`` to its first key and
    ``auth_query_param_value_masked`` to ``settings.masked_auth_value``.

    Args:
        data: The raw data (dict or ORM model) to process.

    Returns:
        Any: A dict input is updated in place and returned. An object with a
        non-empty ``auth_query_params`` dict is converted to a new dict built
        from its table columns (plus ``team`` when present). Anything else is
        returned unchanged.
    """
    # Dict input: annotate the dict itself
    if isinstance(data, dict):
        params = data.get("auth_query_params")
        if params and isinstance(params, dict):
            param_name = next(iter(params), None)
            if param_name:
                data["auth_query_param_key"] = param_name
                data["auth_query_param_value_masked"] = settings.masked_auth_value
        return data

    # Anything without the attribute is passed through untouched
    if not hasattr(data, "auth_query_params"):
        return data

    params = getattr(data, "auth_query_params", None)
    if not (params and isinstance(params, dict)):
        return data

    # ORM input: copy the table columns into a dict so the extra keys can be added
    data_dict = {}
    for column in data.__table__.columns:
        data_dict[column.name] = getattr(data, column.name)

    # 'team' is not a table column (added dynamically), so carry it over explicitly
    if hasattr(data, "team"):
        data_dict["team"] = getattr(data, "team")

    param_name = next(iter(params), None)
    if param_name:
        data_dict["auth_query_param_key"] = param_name
        data_dict["auth_query_param_value_masked"] = settings.masked_auth_value
    return data_dict
3275 # This will be the main method to automatically populate fields
@model_validator(mode="after")
def _populate_auth(self) -> Self:
    """Populate display auth fields from ``auth_type`` and the encoded ``auth_value``.

    The stored value is decoded with ``decode_auth`` and unpacked into the
    individual fields that match the authentication type:

    - basic: ``auth_username`` / ``auth_password`` from the Authorization header
    - bearer: ``auth_token`` from the Bearer Authorization header
    - authheaders: ``auth_headers`` (all pairs) plus ``auth_header_key`` /
      ``auth_header_value`` (first pair, kept for backward compatibility)

    The matching ``*_unmasked`` fields are filled with the same values.
    Nothing is populated when the value is already masked, when it is empty,
    or when ``auth_type`` is ``oauth``, ``one_time_auth`` or ``query_param``.

    Basic credentials are split on the FIRST colon only, so a password may
    itself contain ``:``.

    Returns:
        Self: The instance with the relevant authentication fields populated.

    Raises:
        ValueError: If the authentication data is malformed:
            - Basic auth with a missing/ill-formed Authorization header, or
              without both a username and a password
            - Bearer auth with a missing/ill-formed Authorization header
            - Custom headers that are not a dict with at least one pair

    Examples:
        >>> # Basic auth example
        >>> string_bytes = "admin:secret".encode("utf-8")
        >>> encoded_auth = base64.urlsafe_b64encode(string_bytes).decode("utf-8")
        >>> values = GatewayRead.model_construct(
        ...     auth_type="basic",
        ...     auth_value=encode_auth({"Authorization": f"Basic {encoded_auth}"})
        ... )
        >>> values = GatewayRead._populate_auth(values)
        >>> values.auth_username
        'admin'
        >>> values.auth_password
        'secret'

        >>> # Basic auth with a colon inside the password
        >>> string_bytes = "admin:se:cret".encode("utf-8")
        >>> encoded_auth = base64.urlsafe_b64encode(string_bytes).decode("utf-8")
        >>> values = GatewayRead.model_construct(
        ...     auth_type="basic",
        ...     auth_value=encode_auth({"Authorization": f"Basic {encoded_auth}"})
        ... )
        >>> values = GatewayRead._populate_auth(values)
        >>> values.auth_password
        'se:cret'

        >>> # Bearer auth example
        >>> values = GatewayRead.model_construct(
        ...     auth_type="bearer",
        ...     auth_value=encode_auth({"Authorization": "Bearer mytoken123"})
        ... )
        >>> values = GatewayRead._populate_auth(values)
        >>> values.auth_token
        'mytoken123'

        >>> # Custom headers example
        >>> values = GatewayRead.model_construct(
        ...     auth_type='authheaders',
        ...     auth_value=encode_auth({"X-API-Key": "abc123"})
        ... )
        >>> values = GatewayRead._populate_auth(values)
        >>> values.auth_header_key
        'X-API-Key'
        >>> values.auth_header_value
        'abc123'
    """
    auth_type = self.auth_type
    auth_value_encoded = self.auth_value

    # An already-masked value cannot be decoded; leave the instance as is.
    if auth_value_encoded == settings.masked_auth_value:
        return self

    # These types keep their credentials elsewhere: oauth uses oauth_config,
    # one_time_auth stores no auth_value, and query_param is handled by the
    # "before" validator.
    if auth_type in ("oauth", "one_time_auth", "query_param"):
        return self

    # If no encoded value is present, nothing to populate.
    if not auth_value_encoded:
        return self

    auth_value = decode_auth(auth_value_encoded)
    if auth_type == "basic":
        auth = auth_value.get("Authorization")
        if not (isinstance(auth, str) and auth.startswith("Basic ")):
            raise ValueError("basic auth requires an Authorization header of the form 'Basic <base64>'")
        credentials = base64.urlsafe_b64decode(auth.removeprefix("Basic ")).decode("utf-8")
        # Split on the first colon only: the password may contain ':' itself.
        # A missing colon leaves the password empty and is rejected below.
        u, _, p = credentials.partition(":")
        if not u or not p:
            raise ValueError("basic auth requires both username and password")
        self.auth_username, self.auth_password = u, p
        self.auth_password_unmasked = p

    elif auth_type == "bearer":
        auth = auth_value.get("Authorization")
        if not (isinstance(auth, str) and auth.startswith("Bearer ")):
            raise ValueError("bearer auth requires an Authorization header of the form 'Bearer <token>'")
        self.auth_token = auth.removeprefix("Bearer ")
        self.auth_token_unmasked = self.auth_token

    elif auth_type == "authheaders":
        if not isinstance(auth_value, dict) or len(auth_value) == 0:
            raise ValueError("authheaders requires at least one key/value pair")
        # Two independent lists so masking one never aliases the other.
        self.auth_headers = [{"key": str(key), "value": "" if value is None else str(value)} for key, value in auth_value.items()]
        self.auth_headers_unmasked = [{"key": str(key), "value": "" if value is None else str(value)} for key, value in auth_value.items()]
        # For backward compatibility, populate first header in key/value fields.
        k, v = next(iter(auth_value.items()))
        self.auth_header_key, self.auth_header_value = k, v
        self.auth_header_value_unmasked = v

    return self
def masked(self) -> "GatewayRead":
    """Build a copy of this gateway with every credential field hidden.

    The instance is dumped to a dict, sensitive entries are replaced with the
    ``settings.masked_auth_value`` placeholder, and the dict is validated back
    into a new ``GatewayRead``.

    Returns:
        GatewayRead: A new instance in which:
            - ``auth_value`` is replaced by the placeholder when it is set and
              not already the placeholder;
            - ``auth_password``, ``auth_token`` and ``auth_header_value`` are
              the placeholder when set, otherwise ``None``;
            - every non-empty value inside ``auth_headers`` is the placeholder;
            - ``oauth_config`` has been passed through ``_mask_oauth_config``;
            - all ``*_unmasked`` fields are ``None``.
        Fields unrelated to authentication are carried over unchanged.
    """
    placeholder = settings.masked_auth_value
    payload = self.model_dump()

    # Only mask if auth_value is present and not already masked
    current_value = payload.get("auth_value")
    if current_value and current_value != placeholder:
        payload["auth_value"] = placeholder

    for secret_field in ("auth_password", "auth_token", "auth_header_value"):
        payload[secret_field] = placeholder if payload.get(secret_field) else None

    headers = payload.get("auth_headers")
    if headers:
        hidden_headers = []
        for header in headers:
            raw_value = header.get("value")
            hidden_headers.append({"key": header.get("key"), "value": placeholder if raw_value else raw_value})
        payload["auth_headers"] = hidden_headers

    # Mask sensitive keys inside oauth_config (e.g. password, client_secret)
    oauth_config = payload.get("oauth_config")
    if oauth_config:
        payload["oauth_config"] = _mask_oauth_config(oauth_config)

    # SECURITY: Never expose unmasked credentials in API responses
    payload.update(
        auth_password_unmasked=None,
        auth_token_unmasked=None,
        auth_header_value_unmasked=None,
        auth_headers_unmasked=None,
    )
    return GatewayRead.model_validate(payload)
class GatewayRefreshResponse(BaseModelWithConfigDict):
    """Result payload returned by the manual gateway refresh API.

    Reports how many tools, resources and prompts were added, updated and
    removed, together with any validation errors, the elapsed time and the
    completion timestamp.
    """

    # Identity and overall outcome
    gateway_id: str = Field(..., description="ID of the refreshed gateway")
    success: bool = Field(True, description="Whether the refresh operation was successful")
    error: Optional[str] = Field(default=None, description="Error message if the refresh failed")

    # Per-entity change counters
    tools_added: int = Field(0, description="Number of tools added")
    tools_updated: int = Field(0, description="Number of tools updated")
    tools_removed: int = Field(0, description="Number of tools removed")
    resources_added: int = Field(0, description="Number of resources added")
    resources_updated: int = Field(0, description="Number of resources updated")
    resources_removed: int = Field(0, description="Number of resources removed")
    prompts_added: int = Field(0, description="Number of prompts added")
    prompts_updated: int = Field(0, description="Number of prompts updated")
    prompts_removed: int = Field(0, description="Number of prompts removed")

    # Diagnostics and timing
    validation_errors: List[str] = Field(default_factory=list, description="List of validation errors encountered")
    duration_ms: float = Field(..., description="Duration of the refresh operation in milliseconds")
    refreshed_at: datetime = Field(..., description="Timestamp when the refresh completed")
class FederatedTool(BaseModelWithConfigDict):
    """A tool definition paired with the federated gateway that provides it.

    Attributes:
        tool: The MCP tool definition.
        gateway_id: ID of the source gateway.
        gateway_name: Name of the source gateway.
        gateway_url: URL of the source gateway.
    """

    tool: MCPTool
    gateway_id: str
    gateway_name: str
    gateway_url: str
class FederatedResource(BaseModelWithConfigDict):
    """A resource definition paired with the federated gateway that provides it.

    Attributes:
        resource: The MCP resource definition.
        gateway_id: ID of the source gateway.
        gateway_name: Name of the source gateway.
        gateway_url: URL of the source gateway.
    """

    resource: MCPResource
    gateway_id: str
    gateway_name: str
    gateway_url: str
class FederatedPrompt(BaseModelWithConfigDict):
    """A prompt definition paired with the federated gateway that provides it.

    Attributes:
        prompt: The MCP prompt definition.
        gateway_id: ID of the source gateway.
        gateway_name: Name of the source gateway.
        gateway_url: URL of the source gateway.
    """

    prompt: MCPPrompt
    gateway_id: str
    gateway_name: str
    gateway_url: str
3512# --- RPC Schemas ---
class RPCRequest(BaseModel):
    """Validated JSON-RPC 2.0 request as accepted by the gateway.

    Input values are hidden from validation error messages
    (``hide_input_in_errors=True``).
    """

    model_config = ConfigDict(hide_input_in_errors=True)

    jsonrpc: Literal["2.0"]
    method: str
    params: Optional[Dict[str, Any]] = None
    id: Optional[Union[int, str]] = None

    @field_validator("method")
    @classmethod
    def validate_method(cls, v: str) -> str:
        """Check that the method name is safe, well-formed and not too long.

        Args:
            v (str): Method name to validate.

        Returns:
            str: The unchanged method name once it passes every check.

        Raises:
            ValueError: When the name does not match
                ``settings.validation_tool_method_pattern`` or is longer than
                ``settings.validation_max_method_length``. The XSS check is
                delegated to ``SecurityValidator.validate_no_xss``.
        """
        SecurityValidator.validate_no_xss(v, "RPC method name")
        # The pattern is looked up on every call (not precompiled) so tests
        # can monkeypatch the setting.
        matches_pattern = re.match(settings.validation_tool_method_pattern, v)
        if not matches_pattern:
            raise ValueError("Invalid method name format")
        too_long = len(v) > settings.validation_max_method_length
        if too_long:
            raise ValueError("Method name too long")
        return v

    @field_validator("params")
    @classmethod
    def validate_params(cls, v: Optional[Union[Dict, List]]) -> Optional[Union[Dict, List]]:
        """Enforce size and nesting-depth limits on the RPC parameters.

        Args:
            v (Union[dict, list]): Parameters to validate; ``None`` is accepted as is.

        Returns:
            Union[dict, list]: The unchanged parameters once they pass every check.

        Raises:
            ValueError: When the orjson-serialized parameters are larger than
                ``settings.validation_max_rpc_param_size`` bytes. The depth
                check is delegated to ``SecurityValidator.validate_json_depth``.
        """
        if v is not None:
            # Size is measured on the serialized form (MCP recommends max 256KB for params)
            serialized_size = len(orjson.dumps(v))
            if serialized_size > settings.validation_max_rpc_param_size:
                raise ValueError(f"Parameters exceed maximum size of {settings.validation_max_rpc_param_size} bytes")

            # Check depth
            SecurityValidator.validate_json_depth(v)
        return v
class RPCResponse(BaseModelWithConfigDict):
    """JSON-RPC 2.0 response envelope.

    Attributes:
        jsonrpc: Protocol version; must be the literal "2.0".
        result: Result payload, if any.
        error: Error object, if any.
        id: Request identifier (integer or string), if any.
    """

    jsonrpc: Literal["2.0"]
    result: Optional[Any] = None
    error: Optional[Dict[str, Any]] = None
    id: Optional[Union[int, str]] = None
3587# --- Event and Admin Schemas ---
class EventMessage(BaseModelWithConfigDict):
    """Schema for SSE event messages.

    Attributes:
        type: Event type.
        data: Event payload.
        timestamp: Event time; defaults to the current UTC time at creation.
    """

    type: str = Field(..., description="Event type (tool_added, resource_updated, etc)")
    data: Dict[str, Any] = Field(..., description="Event payload")
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    @field_serializer("timestamp")
    def serialize_timestamp(self, dt: datetime) -> str:
        """Render ``timestamp`` as an ISO 8601 string in UTC.

        The datetime is converted to UTC with ``astimezone`` and formatted
        with ``isoformat``; the "+00:00" offset is then written as "Z".

        Args:
            dt (datetime): The datetime object to serialize.

        Returns:
            str: ISO 8601 formatted string in UTC, ending with 'Z'.
        """
        as_utc = dt.astimezone(timezone.utc)
        iso_text = as_utc.isoformat()
        return iso_text.replace("+00:00", "Z")
class AdminToolCreate(BaseModelWithConfigDict):
    """Schema for creating tools via admin UI.

    ``headers`` and ``input_schema`` arrive as JSON strings and are parsed by
    ``validate_json``.

    Attributes:
        name: Tool name.
        url: Tool URL.
        description: Optional description.
        integration_type: Integration type; defaults to "MCP".
        headers: Headers as a JSON string.
        input_schema: Input schema as a JSON string.
    """

    name: str
    url: str
    description: Optional[str] = None
    integration_type: str = "MCP"
    headers: Optional[str] = None  # JSON string
    input_schema: Optional[str] = None  # JSON string

    @field_validator("headers", "input_schema")
    @classmethod
    def validate_json(cls, v: Optional[str]) -> Optional[Dict[str, Any]]:
        """Parse a JSON string input.

        Args:
            v: Input string; ``None`` and the empty string mean "not provided".

        Returns:
            dict: The value parsed by ``orjson.loads``, or ``None`` when ``v``
            is empty. Note that this is the parsed value, not the original
            string.

        Raises:
            ValueError: When ``v`` is not valid JSON; the underlying decode
                error is attached as the cause.
        """
        if not v:
            return None
        try:
            return orjson.loads(v)
        except orjson.JSONDecodeError as exc:
            # Chain the decode error so the parse position is not lost.
            raise ValueError("Invalid JSON") from exc
class AdminGatewayCreate(BaseModelWithConfigDict):
    """Schema for creating gateways via admin UI.

    Attributes:
        name: Gateway name.
        url: Endpoint URL.
        description: Optional description.
    """

    name: str
    url: str
    description: Optional[str] = None
3672# --- New Schemas for Status Toggle Operations ---
class StatusToggleRequest(BaseModelWithConfigDict):
    """Request body for switching an item between active and inactive."""

    # Required flag: True activates the item, False deactivates it.
    activate: bool = Field(..., description="Whether to activate (true) or deactivate (false) the item")
class StatusToggleResponse(BaseModelWithConfigDict):
    """Response body returned by status toggle operations.

    Attributes:
        id: Integer ID of the item.
        name: Name of the item.
        is_active: Active flag of the item.
        message: Success message.
    """

    id: int
    name: str
    is_active: bool
    message: str = Field(..., description="Success message")
3690# --- Optional Filter Parameters for Listing Operations ---
class ListFilters(BaseModelWithConfigDict):
    """Optional filters accepted by list operations."""

    # Inactive items are left out unless this is set to True.
    include_inactive: bool = Field(default=False, description="Whether to include inactive items in the results")
3699# --- Server Schemas ---
class ServerCreate(BaseModel):
    """Schema for creating a new server.

    String inputs have surrounding whitespace stripped
    (``str_strip_whitespace=True``).

    Attributes:
        model_config (ConfigDict): Model configuration (whitespace stripping).
        id (Optional[str]): Custom UUID for the server.
        name (str): The server's name.
        description (Optional[str]): Optional description of the server.
        icon (Optional[str]): Optional URL for the server's icon.
        tags (Optional[List[str]]): Tags for categorizing the server.
        associated_tools (Optional[List[str]]): Tool IDs; a comma-separated string is also accepted.
        associated_resources (Optional[List[str]]): Resource IDs; a comma-separated string is also accepted.
        associated_prompts (Optional[List[str]]): Prompt IDs; a comma-separated string is also accepted.
        associated_a2a_agents (Optional[List[str]]): A2A agent IDs; a comma-separated string is also accepted.
        team_id (Optional[str]): Team ID for resource organization.
        owner_email (Optional[str]): Email of the server owner.
        visibility (Optional[str]): One of private, team, public.
        oauth_enabled (bool): Whether OAuth 2.0 is enabled for MCP clients.
        oauth_config (Optional[Dict[str, Any]]): OAuth 2.0 configuration.
    """

    model_config = ConfigDict(str_strip_whitespace=True)

    # Core server fields
    id: Optional[str] = Field(None, description="Custom UUID for the server (if not provided, one will be generated)")
    name: str = Field(..., description="The server's name")
    description: Optional[str] = Field(None, description="Server description")
    icon: Optional[str] = Field(None, description="URL for the server's icon")
    tags: Optional[List[str]] = Field(default_factory=list, description="Tags for categorizing the server")

    # Associated entities
    associated_tools: Optional[List[str]] = Field(None, description="Comma-separated tool IDs")
    associated_resources: Optional[List[str]] = Field(None, description="Comma-separated resource IDs")
    associated_prompts: Optional[List[str]] = Field(None, description="Comma-separated prompt IDs")
    associated_a2a_agents: Optional[List[str]] = Field(None, description="Comma-separated A2A agent IDs")

    # Team scoping fields
    team_id: Optional[str] = Field(None, description="Team ID for resource organization")
    owner_email: Optional[str] = Field(None, description="Email of the server owner")
    visibility: Optional[str] = Field(default="public", description="Visibility level (private, team, public)")

    # OAuth 2.0 configuration for RFC 9728 Protected Resource Metadata
    oauth_enabled: bool = Field(False, description="Enable OAuth 2.0 for MCP client authentication")
    oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration (authorization_server, scopes_supported, etc.)")

    @field_validator("tags")
    @classmethod
    def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
        """Validate and normalize tags via ``validate_tags_field``.

        Args:
            v: Optional list of tag strings to validate

        Returns:
            List of validated tag strings
        """
        return validate_tags_field(v)

    @field_validator("id")
    @classmethod
    def validate_id(cls, v: Optional[str]) -> Optional[str]:
        """Validate the server ID as a UUID.

        Args:
            v (str): Value to validate; ``None`` is passed through.

        Returns:
            str: The value returned by ``SecurityValidator.validate_uuid``.

        Raises:
            ValueError: When the value is not a valid UUID.

        Examples:
            >>> from mcpgateway.schemas import ServerCreate
            >>> ServerCreate.validate_id('550e8400-e29b-41d4-a716-446655440000')
            '550e8400e29b41d4a716446655440000'
            >>> ServerCreate.validate_id('invalid-uuid')
            Traceback (most recent call last):
                ...
            ValueError: ...
        """
        if v is not None:
            return SecurityValidator.validate_uuid(v, "Server ID")
        return v

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Validate the server name via ``SecurityValidator.validate_name``.

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe
        """
        return SecurityValidator.validate_name(v, "Server name")

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Truncate over-long descriptions, then sanitize them for display.

        Args:
            v (str): Value to validate; ``None`` is passed through.

        Returns:
            str: The description, cut to
            ``SecurityValidator.MAX_DESCRIPTION_LENGTH`` characters if needed
            and passed through ``SecurityValidator.sanitize_display_text``.

        Raises:
            ValueError: When value is unsafe

        Examples:
            >>> from mcpgateway.schemas import ServerCreate
            >>> ServerCreate.validate_description('A safe description')
            'A safe description'
            >>> ServerCreate.validate_description(None)  # Test None case
            >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
            >>> truncated = ServerCreate.validate_description(long_desc)
            >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
            0
            >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
            True
        """
        if v is None:
            return v
        text = v
        if len(text) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
            # Keep only the leading part that fits the maximum allowed length
            text = text[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
            logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
        return SecurityValidator.sanitize_display_text(text, "Description")

    @field_validator("icon")
    @classmethod
    def validate_icon(cls, v: Optional[str]) -> Optional[str]:
        """Validate the icon URL via ``SecurityValidator.validate_url``.

        Args:
            v (str): Value to validate; ``None`` and "" are passed through.

        Returns:
            str: Value if validated as safe
        """
        if v is not None and v != "":
            return SecurityValidator.validate_url(v, "Icon URL")
        return v

    @field_validator("associated_tools", "associated_resources", "associated_prompts", "associated_a2a_agents", mode="before")
    @classmethod
    def split_comma_separated(cls, v):
        """Turn a comma-separated string into a list of trimmed, non-empty items.

        Args:
            v: Raw input; anything that is not a string is returned unchanged.

        Returns:
            list: The trimmed, non-empty pieces of the input string
        """
        if not isinstance(v, str):
            return v
        trimmed = (piece.strip() for piece in v.split(","))
        return [piece for piece in trimmed if piece]

    @field_validator("visibility")
    @classmethod
    def validate_visibility(cls, v: str) -> str:
        """Validate visibility level.

        Args:
            v: Visibility value to validate

        Returns:
            Validated visibility value

        Raises:
            ValueError: If visibility is not private, team or public
        """
        if v in ("private", "team", "public"):
            return v
        raise ValueError("Visibility must be one of: private, team, public")

    @field_validator("team_id")
    @classmethod
    def validate_team_id(cls, v: Optional[str]) -> Optional[str]:
        """Validate the team ID as a UUID.

        Args:
            v: Team ID to validate; ``None`` is passed through.

        Returns:
            The value returned by ``SecurityValidator.validate_uuid``
        """
        if v is None:
            return v
        return SecurityValidator.validate_uuid(v, "team_id")
class ServerUpdate(BaseModelWithConfigDict):
    """Schema for updating an existing server.

    Every field defaults to ``None`` so that partial updates are possible.
    """

    # Core server fields
    id: Optional[str] = Field(None, description="Custom UUID for the server")
    name: Optional[str] = Field(None, description="The server's name")
    description: Optional[str] = Field(None, description="Server description")
    icon: Optional[str] = Field(None, description="URL for the server's icon")
    tags: Optional[List[str]] = Field(None, description="Tags for categorizing the server")

    # Team scoping fields
    team_id: Optional[str] = Field(None, description="Team ID for resource organization")
    owner_email: Optional[str] = Field(None, description="Email of the server owner")
    visibility: Optional[str] = Field(None, description="Visibility level (private, team, public)")

    # OAuth 2.0 configuration for RFC 9728 Protected Resource Metadata
    oauth_enabled: Optional[bool] = Field(None, description="Enable OAuth 2.0 for MCP client authentication")
    oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration (authorization_server, scopes_supported, etc.)")

    # Associated entities
    associated_tools: Optional[List[str]] = Field(None, description="Comma-separated tool IDs")
    associated_resources: Optional[List[str]] = Field(None, description="Comma-separated resource IDs")
    associated_prompts: Optional[List[str]] = Field(None, description="Comma-separated prompt IDs")
    associated_a2a_agents: Optional[List[str]] = Field(None, description="Comma-separated A2A agent IDs")

    @field_validator("tags")
    @classmethod
    def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
        """Validate and normalize tags via ``validate_tags_field``.

        Args:
            v: Optional list of tag strings to validate

        Returns:
            List of validated tag strings
        """
        return validate_tags_field(v)

    @field_validator("id")
    @classmethod
    def validate_id(cls, v: Optional[str]) -> Optional[str]:
        """Validate the server ID as a UUID.

        Args:
            v (str): Value to validate; ``None`` is passed through.

        Returns:
            str: The value returned by ``SecurityValidator.validate_uuid``.

        Raises:
            ValueError: When the value is not a valid UUID.

        Examples:
            >>> from mcpgateway.schemas import ServerUpdate
            >>> ServerUpdate.validate_id('550e8400-e29b-41d4-a716-446655440000')
            '550e8400e29b41d4a716446655440000'
            >>> ServerUpdate.validate_id('invalid-uuid')
            Traceback (most recent call last):
                ...
            ValueError: ...
        """
        if v is not None:
            return SecurityValidator.validate_uuid(v, "Server ID")
        return v

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Validate the server name via ``SecurityValidator.validate_name``.

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe
        """
        return SecurityValidator.validate_name(v, "Server name")

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Truncate over-long descriptions, then sanitize them for display.

        Args:
            v (str): Value to validate; ``None`` is passed through.

        Returns:
            str: The description, cut to
            ``SecurityValidator.MAX_DESCRIPTION_LENGTH`` characters if needed
            and passed through ``SecurityValidator.sanitize_display_text``.

        Raises:
            ValueError: When value is unsafe

        Examples:
            >>> from mcpgateway.schemas import ServerUpdate
            >>> ServerUpdate.validate_description('A safe description')
            'A safe description'
            >>> ServerUpdate.validate_description(None)  # Test None case
            >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
            >>> truncated = ServerUpdate.validate_description(long_desc)
            >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
            0
            >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
            True
        """
        if v is None:
            return v
        text = v
        if len(text) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
            # Keep only the leading part that fits the maximum allowed length
            text = text[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
            logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
        return SecurityValidator.sanitize_display_text(text, "Description")

    @field_validator("icon")
    @classmethod
    def validate_icon(cls, v: Optional[str]) -> Optional[str]:
        """Validate the icon URL via ``SecurityValidator.validate_url``.

        Args:
            v (str): Value to validate; ``None`` and "" are passed through.

        Returns:
            str: Value if validated as safe
        """
        if v is not None and v != "":
            return SecurityValidator.validate_url(v, "Icon URL")
        return v

    @field_validator("associated_tools", "associated_resources", "associated_prompts", "associated_a2a_agents", mode="before")
    @classmethod
    def split_comma_separated(cls, v):
        """Turn a comma-separated string into a list of trimmed, non-empty items.

        Args:
            v: Raw input; anything that is not a string is returned unchanged.

        Returns:
            list: The trimmed, non-empty pieces of the input string
        """
        if not isinstance(v, str):
            return v
        trimmed = (piece.strip() for piece in v.split(","))
        return [piece for piece in trimmed if piece]
class ServerRead(BaseModelWithConfigDict):
    """Schema for reading server information.

    Includes the server fields plus:
    - Database ID
    - IDs of associated tools, resources, prompts and A2A agents
    - Creation/update timestamps
    - Enabled status
    - Metrics: Aggregated metrics for the server invocations.
    - Audit metadata, team scoping and OAuth configuration
    """

    id: str
    name: str
    description: Optional[str]
    icon: Optional[str]
    created_at: datetime
    updated_at: datetime
    # is_active: bool
    enabled: bool
    associated_tools: List[str] = []
    associated_resources: List[str] = []
    associated_prompts: List[str] = []
    associated_a2a_agents: List[str] = []
    metrics: Optional[ServerMetrics] = Field(None, description="Server metrics (may be None in list operations)")
    tags: List[Dict[str, str]] = Field(default_factory=list, description="Tags for categorizing the server")

    # Comprehensive metadata for audit tracking
    created_by: Optional[str] = Field(None, description="Username who created this entity")
    created_from_ip: Optional[str] = Field(None, description="IP address of creator")
    created_via: Optional[str] = Field(None, description="Creation method: ui|api|import|federation")
    created_user_agent: Optional[str] = Field(None, description="User agent of creation request")

    modified_by: Optional[str] = Field(None, description="Username who last modified this entity")
    modified_from_ip: Optional[str] = Field(None, description="IP address of last modifier")
    modified_via: Optional[str] = Field(None, description="Modification method")
    modified_user_agent: Optional[str] = Field(None, description="User agent of modification request")

    import_batch_id: Optional[str] = Field(None, description="UUID of bulk import batch")
    federation_source: Optional[str] = Field(None, description="Source gateway for federated entities")
    version: Optional[int] = Field(1, description="Entity version for change tracking")

    # Team scoping fields
    team_id: Optional[str] = Field(None, description="ID of the team that owns this resource")
    team: Optional[str] = Field(None, description="Name of the team that owns this resource")
    owner_email: Optional[str] = Field(None, description="Email of the user who owns this resource")
    visibility: Optional[str] = Field(default="public", description="Visibility level: private, team, or public")

    # OAuth 2.0 configuration for RFC 9728 Protected Resource Metadata
    oauth_enabled: bool = Field(False, description="Whether OAuth 2.0 is enabled for MCP client authentication")
    oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration (authorization_server, scopes_supported, etc.)")

    @model_validator(mode="before")
    @classmethod
    def populate_associated_ids(cls, values):
        """Replace associated objects with their ``id`` before validation.

        For each of ``associated_tools``, ``associated_resources``,
        ``associated_prompts`` and ``associated_a2a_agents``, every item that
        has an ``id`` attribute is replaced by that id; items without one
        (e.g. plain strings) are kept as they are.

        Args:
            values: The raw input: a dict, or an object whose attributes can
                be read with ``vars()``.

        Returns:
            dict: A copy of the input with the associated lists reduced to
            ids, or the original ``values`` when it is neither a dict nor an
            object with a ``__dict__``.
        """
        # Normalize to a mutable dict without touching the caller's object
        if isinstance(values, dict):
            data = dict(values)
        else:
            try:
                data = dict(vars(values))
            except TypeError:
                # vars() raises TypeError for objects without __dict__;
                # hand those back untouched.
                return values

        for field_name in ("associated_tools", "associated_resources", "associated_prompts", "associated_a2a_agents"):
            related = data.get(field_name)
            if related:
                data[field_name] = [getattr(item, "id", item) for item in related]
        return data
class GatewayTestRequest(BaseModelWithConfigDict):
    """Request payload for a gateway connectivity test.

    Carries the HTTP method, the base URL and path to call, plus optional
    headers, an optional body and the content type of that body.
    """

    # Required fields: Field() without a default marks the field as required.
    method: str = Field(description="HTTP method to test (GET, POST, etc.)")
    base_url: AnyHttpUrl = Field(description="Base URL of the gateway to test")
    path: str = Field(description="Path to append to the base URL")

    # Optional request details.
    headers: Optional[Dict[str, str]] = Field(
        default=None,
        description="Optional headers for the request",
    )
    body: Optional[Union[str, Dict[str, Any]]] = Field(
        default=None,
        description="Optional body for the request, can be a string or JSON object",
    )
    content_type: Optional[str] = Field(
        default="application/json",
        description="Content type for the request body",
    )
class GatewayTestResponse(BaseModelWithConfigDict):
    """Result of a gateway test request.

    Holds:
    - the HTTP status code
    - the latency in milliseconds
    - an optional response body, either a string or a JSON object
    """

    status_code: int = Field(description="HTTP status code returned by the gateway")
    latency_ms: int = Field(description="Latency of the request in milliseconds")
    body: Optional[Union[str, Dict[str, Any]]] = Field(
        default=None,
        description="Response body, can be a string or JSON object",
    )
class TaggedEntity(BaseModelWithConfigDict):
    """Compact view of an entity that carries a tag."""

    id: str = Field(description="The entity's ID")
    name: str = Field(description="The entity's name")
    type: str = Field(description="The entity type (tool, resource, prompt, server, gateway)")
    description: Optional[str] = Field(default=None, description="The entity's description")
class TagStats(BaseModelWithConfigDict):
    """Per-tag usage counters, broken down by entity type.

    Every counter defaults to 0.
    """

    tools: int = Field(0, description="Number of tools with this tag")
    resources: int = Field(0, description="Number of resources with this tag")
    prompts: int = Field(0, description="Number of prompts with this tag")
    servers: int = Field(0, description="Number of servers with this tag")
    gateways: int = Field(0, description="Number of gateways with this tag")
    total: int = Field(0, description="Total occurrences of this tag")
class TagInfo(BaseModelWithConfigDict):
    """Details about one tag: its name, its statistics and the tagged entities."""

    name: str = Field(description="The tag name")
    stats: TagStats = Field(description="Statistics for this tag")
    # default_factory gives every instance its own fresh list.
    entities: Optional[List[TaggedEntity]] = Field(
        default_factory=list,
        description="Entities that have this tag",
    )
class TopPerformer(BaseModelWithConfigDict):
    """Performance summary for a single top-performing entity.

    Wraps the metrics reported for entities such as prompts, resources, servers, or tools:
    execution count, average response time, success rate, and last execution timestamp.

    Attributes:
        id (Union[str, int]): Unique identifier for the entity.
        name (str): Name of the entity (e.g., prompt name, resource URI, server name, or tool name).
        execution_count (int): Total number of executions for the entity.
        avg_response_time (Optional[float]): Average response time in seconds, or None if no metrics.
        success_rate (Optional[float]): Success rate percentage, or None if no metrics.
        last_execution (Optional[datetime]): Timestamp of the last execution, or None if no metrics.
    """

    # Identity and mandatory counter.
    id: Union[str, int] = Field(description="Entity ID")
    name: str = Field(description="Entity name")
    execution_count: int = Field(description="Number of executions")

    # Metrics default to None.
    avg_response_time: Optional[float] = Field(default=None, description="Average response time in seconds")
    success_rate: Optional[float] = Field(default=None, description="Success rate percentage")
    last_execution: Optional[datetime] = Field(default=None, description="Timestamp of last execution")
4204# --- A2A Agent Schemas ---
class A2AAgentCreate(BaseModel):
    """
    Schema for creating a new A2A (Agent-to-Agent) compatible agent.

    ``auth_value`` is not normally supplied by the caller: when ``auth_type`` is
    set, it is derived during validation from the credential fields (see
    ``create_auth_value``).

    Attributes:
        model_config (ConfigDict): Configuration for the model.
        name (str): Unique name for the agent.
        slug (Optional[str]): Optional slug for the agent (auto-generated if not provided).
        description (Optional[str]): Optional description of the agent.
        endpoint_url (str): URL endpoint for the agent.
        agent_type (str): Type of agent (e.g., "openai", "anthropic", "custom").
        protocol_version (str): A2A protocol version supported.
        capabilities (Dict[str, Any]): Agent capabilities and features.
        config (Dict[str, Any]): Agent-specific configuration parameters.
        passthrough_headers (Optional[List[str]]): Headers allowed to be passed through from client to target.
        auth_type (Optional[str]): Type of authentication. The values accepted by validation are
            "basic", "bearer", "oauth", "authheaders", "one_time_auth" and "query_param".
        auth_username (Optional[str]): Username for basic authentication.
        auth_password (Optional[str]): Password for basic authentication.
        auth_token (Optional[str]): Token for bearer authentication.
        auth_header_key (Optional[str]): Key for custom headers authentication.
        auth_header_value (Optional[str]): Value for custom headers authentication.
        auth_headers (Optional[List[Dict[str, str]]]): List of custom headers for authentication.
        oauth_config (Optional[Dict[str, Any]]): OAuth 2.0 configuration.
        auth_query_param_key (Optional[str]): Query parameter name for authentication.
        auth_query_param_value (Optional[SecretStr]): Query parameter value (API key).
        auth_value (Optional[str]): Alias for authentication value, used for better access post-validation.
        tags (List[str]): Tags for categorizing the agent.
        team_id (Optional[str]): Team ID for resource organization.
        owner_email (Optional[str]): Email of the agent owner.
        visibility (str): Visibility level ("private", "team", "public").
    """

    model_config = ConfigDict(str_strip_whitespace=True)

    name: str = Field(..., description="Unique name for the agent")
    slug: Optional[str] = Field(None, description="Optional slug for the agent (auto-generated if not provided)")
    description: Optional[str] = Field(None, description="Agent description")
    endpoint_url: str = Field(..., description="URL endpoint for the agent")
    agent_type: str = Field(default="generic", description="Type of agent (e.g., 'openai', 'anthropic', 'custom')")
    protocol_version: str = Field(default="1.0", description="A2A protocol version supported")
    capabilities: Dict[str, Any] = Field(default_factory=dict, description="Agent capabilities and features")
    config: Dict[str, Any] = Field(default_factory=dict, description="Agent-specific configuration parameters")
    passthrough_headers: Optional[List[str]] = Field(default=None, description="List of headers allowed to be passed through from client to target")
    # Authorizations
    auth_type: Optional[str] = Field(None, description="Type of authentication: basic, bearer, headers, oauth, query_param, or none")
    # Fields for various types of authentication
    auth_username: Optional[str] = Field(None, description="Username for basic authentication")
    auth_password: Optional[str] = Field(None, description="Password for basic authentication")
    auth_token: Optional[str] = Field(None, description="Token for bearer authentication")
    auth_header_key: Optional[str] = Field(None, description="Key for custom headers authentication")
    auth_header_value: Optional[str] = Field(None, description="Value for custom headers authentication")
    auth_headers: Optional[List[Dict[str, str]]] = Field(None, description="List of custom headers for authentication")

    # OAuth 2.0 configuration
    oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration including grant_type, client_id, encrypted client_secret, URLs, and scopes")

    # Query Parameter Authentication (CWE-598 security concern - use only when required by upstream)
    auth_query_param_key: Optional[str] = Field(
        None,
        description="Query parameter name for authentication (e.g., 'tavilyApiKey')",
    )
    auth_query_param_value: Optional[SecretStr] = Field(
        None,
        description="Query parameter value (API key) - will be encrypted at rest",
    )

    # Adding `auth_value` as an alias for better access post-validation.
    # NOTE: this field must stay declared AFTER auth_type and the credential
    # fields above: its validator reads them from ``info.data``, which only
    # holds fields that were validated earlier. ``validate_default=True`` makes
    # the validator run even when the caller does not send ``auth_value``.
    auth_value: Optional[str] = Field(None, validate_default=True)
    tags: List[str] = Field(default_factory=list, description="Tags for categorizing the agent")

    # Team scoping fields
    team_id: Optional[str] = Field(None, description="Team ID for resource organization")
    owner_email: Optional[str] = Field(None, description="Email of the agent owner")
    visibility: Optional[str] = Field(default="public", description="Visibility level (private, team, public)")

    @field_validator("tags")
    @classmethod
    def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
        """Validate and normalize tags.

        Args:
            v: Optional list of tag strings to validate

        Returns:
            List of validated tag strings
        """
        return validate_tags_field(v)

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Validate agent name

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe
        """
        return SecurityValidator.validate_name(v, "A2A Agent name")

    @field_validator("endpoint_url")
    @classmethod
    def validate_endpoint_url(cls, v: str) -> str:
        """Validate agent endpoint URL

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe
        """
        return SecurityValidator.validate_url(v, "Agent endpoint URL")

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Ensure descriptions display safely, truncate if too long

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe and truncated if too long

        Raises:
            ValueError: When value is unsafe

        Examples:
            >>> from mcpgateway.schemas import A2AAgentCreate
            >>> A2AAgentCreate.validate_description('A safe description')
            'A safe description'
            >>> A2AAgentCreate.validate_description(None)  # Test None case
            >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
            >>> truncated = A2AAgentCreate.validate_description(long_desc)
            >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
            0
            >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
            True
        """
        if v is None:
            return v
        max_length = SecurityValidator.MAX_DESCRIPTION_LENGTH
        if len(v) > max_length:
            # Over-long descriptions are truncated (not rejected) before sanitizing.
            v = v[:max_length]
            logger.info("Description too long, truncated to %s characters.", max_length)
        return SecurityValidator.sanitize_display_text(v, "Description")

    @field_validator("capabilities", "config")
    @classmethod
    def validate_json_fields(cls, v: Dict[str, Any]) -> Dict[str, Any]:
        """Validate JSON structure depth

        Args:
            v (dict): Value to validate

        Returns:
            dict: Value if validated as safe
        """
        SecurityValidator.validate_json_depth(v)
        return v

    @field_validator("visibility")
    @classmethod
    def validate_visibility(cls, v: str) -> str:
        """Validate visibility level.

        Args:
            v: Visibility value to validate

        Returns:
            Validated visibility value

        Raises:
            ValueError: If visibility is invalid
        """
        if v not in ["private", "team", "public"]:
            raise ValueError("Visibility must be one of: private, team, public")
        return v

    @field_validator("team_id")
    @classmethod
    def validate_team_id(cls, v: Optional[str]) -> Optional[str]:
        """Validate team ID format.

        Args:
            v: Team ID to validate

        Returns:
            Validated team ID
        """
        if v is not None:
            return SecurityValidator.validate_uuid(v, "team_id")
        return v

    @field_validator("auth_value", mode="before")
    @classmethod
    def create_auth_value(cls, v, info):
        """
        Derive ``auth_value`` from ``auth_type`` and the credential fields.

        Runs in "before" mode on the ``auth_value`` field. When no ``auth_type``
        was provided the incoming value is returned unchanged; otherwise the
        incoming value is discarded and replaced by the result of
        ``_process_auth_fields``.

        Args:
            v: Raw incoming value for ``auth_value`` (may be None)
            info: ValidationInfo containing auth_type and the credential fields

        Returns:
            str: Auth value
        """
        auth_type = info.data.get("auth_type")

        if (auth_type is None) or (auth_type == ""):
            return v  # If no auth_type is provided, no need to create auth_value

        # Process the auth fields and generate auth_value based on auth_type
        return cls._process_auth_fields(info)

    @staticmethod
    def _process_auth_fields(info: ValidationInfo) -> Optional[str]:
        """
        Processes the input authentication fields and returns the correct auth_value.
        This method is called based on the selected auth_type.

        Args:
            info: ValidationInfo containing auth fields

        Returns:
            Encoded auth string or None

        Raises:
            ValueError: If auth_type is invalid or its required credential fields are missing
        """
        data = info.data
        auth_type = data.get("auth_type")

        if auth_type == "basic":
            # For basic authentication, both username and password must be present
            username = data.get("auth_username")
            password = data.get("auth_password")

            if not username or not password:
                raise ValueError("For 'basic' auth, both 'auth_username' and 'auth_password' must be provided.")

            creds = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode()
            return encode_auth({"Authorization": f"Basic {creds}"})

        if auth_type == "bearer":
            # For bearer authentication, only token is required
            token = data.get("auth_token")

            if not token:
                raise ValueError("For 'bearer' auth, 'auth_token' must be provided.")

            return encode_auth({"Authorization": f"Bearer {token}"})

        if auth_type == "oauth":
            # For OAuth authentication, we don't encode anything here
            # The OAuth configuration is handled separately in the oauth_config field
            return None

        if auth_type == "authheaders":
            # Support both new multi-headers format and legacy single header format
            auth_headers = data.get("auth_headers")
            if auth_headers and isinstance(auth_headers, list):
                # New multi-headers format with enhanced validation
                header_dict = {}
                duplicate_keys = set()

                for header in auth_headers:
                    if not isinstance(header, dict):
                        continue

                    key = header.get("key")
                    value = header.get("value", "")

                    # Skip headers without keys
                    if not key:
                        continue

                    # Track duplicate keys (last value wins)
                    if key in header_dict:
                        duplicate_keys.add(key)

                    # Validate header key format (basic HTTP header validation);
                    # spaces are ignored by this check.
                    if not all(c.isalnum() or c in "-_" for c in key.replace(" ", "")):
                        raise ValueError(f"Invalid header key format: '{key}'. Header keys should contain only alphanumeric characters, hyphens, and underscores.")

                    # Store header (empty values are allowed)
                    header_dict[key] = value

                # Ensure at least one valid header
                if not header_dict:
                    raise ValueError("For 'headers' auth, at least one valid header with a key must be provided.")

                # Duplicates are tolerated but logged; sorted for stable log output
                if duplicate_keys:
                    logger.warning("Duplicate header keys detected (last value used): %s", ", ".join(sorted(duplicate_keys)))

                # Check for excessive headers (prevent abuse)
                if len(header_dict) > 100:
                    raise ValueError("Maximum of 100 headers allowed per gateway.")

                return encode_auth(header_dict)

            # Legacy single header format (backward compatibility)
            header_key = data.get("auth_header_key")
            header_value = data.get("auth_header_value")

            if not header_key or not header_value:
                raise ValueError("For 'headers' auth, either 'auth_headers' list or both 'auth_header_key' and 'auth_header_value' must be provided.")

            return encode_auth({header_key: header_value})

        if auth_type == "one_time_auth":
            # One-time auth does not require encoding here
            return None

        if auth_type == "query_param":
            # Query param auth doesn't use auth_value field
            # Validation is handled by model_validator
            return None

        # The listed values must match the literals accepted above, so the
        # message never points the caller at a value that is rejected again.
        raise ValueError("Invalid 'auth_type'. Must be one of: basic, bearer, oauth, authheaders, one_time_auth, or query_param.")

    @model_validator(mode="after")
    def validate_query_param_auth(self) -> "A2AAgentCreate":
        """Validate query parameter authentication configuration.

        Only applies when ``auth_type`` is "query_param"; any other auth type
        passes through untouched.

        Returns:
            A2AAgentCreate: The validated instance.

        Raises:
            ValueError: If query param auth is disabled, a required field is
                missing, or host is not in allowlist.
        """
        if self.auth_type != "query_param":
            return self

        # Check feature flag
        if not settings.insecure_allow_queryparam_auth:
            raise ValueError(
                "Query parameter authentication is disabled. "
                "Set INSECURE_ALLOW_QUERYPARAM_AUTH=true to enable. "
                "WARNING: API keys in URLs may appear in proxy logs."
            )

        # Check required fields
        if not self.auth_query_param_key:
            raise ValueError("auth_query_param_key is required when auth_type is 'query_param'")
        if not self.auth_query_param_value:
            raise ValueError("auth_query_param_value is required when auth_type is 'query_param'")

        # Check host allowlist (if configured)
        allowed_hosts = settings.insecure_queryparam_auth_allowed_hosts
        if allowed_hosts:
            parsed = urlparse(str(self.endpoint_url))
            # Extract hostname properly (handles IPv6, ports, userinfo)
            hostname = parsed.hostname or parsed.netloc.split("@")[-1].split(":")[0]

            if hostname.lower() not in allowed_hosts:
                allowed = ", ".join(allowed_hosts)
                raise ValueError(f"Host '{hostname}' is not in the allowed hosts for query parameter auth. Allowed hosts: {allowed}")

        return self
class A2AAgentUpdate(BaseModelWithConfigDict):
    """Schema for updating an existing A2A agent.

    Similar to A2AAgentCreate but all fields are optional to allow partial updates.

    When ``auth_type`` is set, ``auth_value`` is derived during validation
    from the credential fields (see ``create_auth_value``).
    """

    name: Optional[str] = Field(None, description="Unique name for the agent")
    description: Optional[str] = Field(None, description="Agent description")
    endpoint_url: Optional[str] = Field(None, description="URL endpoint for the agent")
    agent_type: Optional[str] = Field(None, description="Type of agent")
    protocol_version: Optional[str] = Field(None, description="A2A protocol version supported")
    capabilities: Optional[Dict[str, Any]] = Field(None, description="Agent capabilities and features")
    config: Optional[Dict[str, Any]] = Field(None, description="Agent-specific configuration parameters")
    passthrough_headers: Optional[List[str]] = Field(default=None, description="List of headers allowed to be passed through from client to target")
    auth_type: Optional[str] = Field(None, description="Type of authentication")
    auth_username: Optional[str] = Field(None, description="username for basic authentication")
    auth_password: Optional[str] = Field(None, description="password for basic authentication")
    auth_token: Optional[str] = Field(None, description="token for bearer authentication")
    auth_header_key: Optional[str] = Field(None, description="key for custom headers authentication")
    auth_header_value: Optional[str] = Field(None, description="value for custom headers authentication")
    auth_headers: Optional[List[Dict[str, str]]] = Field(None, description="List of custom headers for authentication")

    # Adding `auth_value` as an alias for better access post-validation.
    # NOTE: this field must stay declared AFTER auth_type and the credential
    # fields above: its validator reads them from ``info.data``, which only
    # holds fields that were validated earlier. ``validate_default=True`` makes
    # the validator run even when the caller does not send ``auth_value``.
    auth_value: Optional[str] = Field(None, validate_default=True)

    # OAuth 2.0 configuration
    oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration including grant_type, client_id, encrypted client_secret, URLs, and scopes")

    # Query Parameter Authentication (CWE-598 security concern - use only when required by upstream)
    auth_query_param_key: Optional[str] = Field(
        None,
        description="Query parameter name for authentication (e.g., 'tavilyApiKey')",
    )
    auth_query_param_value: Optional[SecretStr] = Field(
        None,
        description="Query parameter value (API key) - will be encrypted at rest",
    )

    tags: Optional[List[str]] = Field(None, description="Tags for categorizing the agent")

    # Team scoping fields
    team_id: Optional[str] = Field(None, description="Team ID for resource organization")
    owner_email: Optional[str] = Field(None, description="Email of the agent owner")
    visibility: Optional[str] = Field(None, description="Visibility level (private, team, public)")

    @field_validator("tags")
    @classmethod
    def validate_tags(cls, v: Optional[List[str]]) -> Optional[List[str]]:
        """Validate and normalize tags.

        Args:
            v: Optional list of tag strings to validate

        Returns:
            List of validated tag strings or None if input is None
        """
        if v is None:
            return None
        return validate_tags_field(v)

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Validate agent name

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe
        """
        return SecurityValidator.validate_name(v, "A2A Agent name")

    @field_validator("endpoint_url")
    @classmethod
    def validate_endpoint_url(cls, v: str) -> str:
        """Validate agent endpoint URL

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe
        """
        return SecurityValidator.validate_url(v, "Agent endpoint URL")

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Ensure descriptions display safely, truncate if too long

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe and truncated if too long

        Raises:
            ValueError: When value is unsafe

        Examples:
            >>> from mcpgateway.schemas import A2AAgentUpdate
            >>> A2AAgentUpdate.validate_description('A safe description')
            'A safe description'
            >>> A2AAgentUpdate.validate_description(None)  # Test None case
            >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
            >>> truncated = A2AAgentUpdate.validate_description(long_desc)
            >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
            0
            >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
            True
        """
        if v is None:
            return v
        max_length = SecurityValidator.MAX_DESCRIPTION_LENGTH
        if len(v) > max_length:
            # Over-long descriptions are truncated (not rejected) before sanitizing.
            v = v[:max_length]
            logger.info("Description too long, truncated to %s characters.", max_length)
        return SecurityValidator.sanitize_display_text(v, "Description")

    @field_validator("capabilities", "config")
    @classmethod
    def validate_json_fields(cls, v: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        """Validate JSON structure depth

        Args:
            v (dict): Value to validate

        Returns:
            dict: Value if validated as safe
        """
        if v is None:
            return v
        SecurityValidator.validate_json_depth(v)
        return v

    @field_validator("visibility")
    @classmethod
    def validate_visibility(cls, v: Optional[str]) -> Optional[str]:
        """Validate visibility level.

        Args:
            v: Visibility value to validate

        Returns:
            Validated visibility value

        Raises:
            ValueError: If visibility is invalid
        """
        if v is not None and v not in ["private", "team", "public"]:
            raise ValueError("Visibility must be one of: private, team, public")
        return v

    @field_validator("team_id")
    @classmethod
    def validate_team_id(cls, v: Optional[str]) -> Optional[str]:
        """Validate team ID format.

        Args:
            v: Team ID to validate

        Returns:
            Validated team ID
        """
        if v is not None:
            return SecurityValidator.validate_uuid(v, "team_id")
        return v

    @field_validator("auth_value", mode="before")
    @classmethod
    def create_auth_value(cls, v, info):
        """
        Derive ``auth_value`` from ``auth_type`` and the credential fields.

        Runs in "before" mode on the ``auth_value`` field. When no ``auth_type``
        was provided the incoming value is returned unchanged; otherwise the
        incoming value is discarded and replaced by the result of
        ``_process_auth_fields``.

        Args:
            v: Raw incoming value for ``auth_value`` (may be None)
            info: ValidationInfo containing auth_type and the credential fields

        Returns:
            str: Auth value, or the unchanged input when no auth_type is set
        """
        auth_type = info.data.get("auth_type")

        if (auth_type is None) or (auth_type == ""):
            return v  # If no auth_type is provided, no need to create auth_value

        # Process the auth fields and generate auth_value based on auth_type
        return cls._process_auth_fields(info)

    @staticmethod
    def _process_auth_fields(info: ValidationInfo) -> Optional[str]:
        """
        Processes the input authentication fields and returns the correct auth_value.
        This method is called based on the selected auth_type.

        Args:
            info: ValidationInfo containing auth fields

        Returns:
            Encoded auth string or None

        Raises:
            ValueError: If auth type is invalid or its required credential fields are missing
        """
        data = info.data
        auth_type = data.get("auth_type")

        if auth_type == "basic":
            # For basic authentication, both username and password must be present
            username = data.get("auth_username")
            password = data.get("auth_password")
            if not username or not password:
                raise ValueError("For 'basic' auth, both 'auth_username' and 'auth_password' must be provided.")

            creds = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode()
            return encode_auth({"Authorization": f"Basic {creds}"})

        if auth_type == "bearer":
            # For bearer authentication, only token is required
            token = data.get("auth_token")

            if not token:
                raise ValueError("For 'bearer' auth, 'auth_token' must be provided.")

            return encode_auth({"Authorization": f"Bearer {token}"})

        if auth_type == "oauth":
            # For OAuth authentication, we don't encode anything here
            # The OAuth configuration is handled separately in the oauth_config field
            return None

        if auth_type == "authheaders":
            # Support both new multi-headers format and legacy single header format
            auth_headers = data.get("auth_headers")
            if auth_headers and isinstance(auth_headers, list):
                # New multi-headers format with enhanced validation
                header_dict = {}
                duplicate_keys = set()

                for header in auth_headers:
                    if not isinstance(header, dict):
                        continue

                    key = header.get("key")
                    value = header.get("value", "")

                    # Skip headers without keys
                    if not key:
                        continue

                    # Track duplicate keys (last value wins)
                    if key in header_dict:
                        duplicate_keys.add(key)

                    # Validate header key format (basic HTTP header validation);
                    # spaces are ignored by this check.
                    if not all(c.isalnum() or c in "-_" for c in key.replace(" ", "")):
                        raise ValueError(f"Invalid header key format: '{key}'. Header keys should contain only alphanumeric characters, hyphens, and underscores.")

                    # Store header (empty values are allowed)
                    header_dict[key] = value

                # Ensure at least one valid header
                if not header_dict:
                    raise ValueError("For 'headers' auth, at least one valid header with a key must be provided.")

                # Duplicates are tolerated but logged; sorted for stable log output
                if duplicate_keys:
                    logger.warning("Duplicate header keys detected (last value used): %s", ", ".join(sorted(duplicate_keys)))

                # Check for excessive headers (prevent abuse)
                if len(header_dict) > 100:
                    raise ValueError("Maximum of 100 headers allowed per gateway.")

                return encode_auth(header_dict)

            # Legacy single header format (backward compatibility)
            header_key = data.get("auth_header_key")
            header_value = data.get("auth_header_value")

            if not header_key or not header_value:
                raise ValueError("For 'headers' auth, either 'auth_headers' list or both 'auth_header_key' and 'auth_header_value' must be provided.")

            return encode_auth({header_key: header_value})

        if auth_type == "one_time_auth":
            # One-time auth does not require encoding here
            return None

        if auth_type == "query_param":
            # Query param auth doesn't use auth_value field
            # Validation is handled by model_validator
            return None

        # The listed values must match the literals accepted above, so the
        # message never points the caller at a value that is rejected again.
        raise ValueError("Invalid 'auth_type'. Must be one of: basic, bearer, oauth, authheaders, one_time_auth, or query_param.")

    @model_validator(mode="after")
    def validate_query_param_auth(self) -> "A2AAgentUpdate":
        """Validate query parameter authentication configuration.

        NOTE: This only runs when auth_type is explicitly set to "query_param".
        Service-layer enforcement handles the case where auth_type is omitted
        but the existing agent uses query_param auth.

        Returns:
            A2AAgentUpdate: The validated instance.

        Raises:
            ValueError: If required fields are missing when setting query_param auth.
        """
        if self.auth_type != "query_param":
            return self

        # Validate fields are provided when explicitly setting query_param auth
        # Feature flag/allowlist check happens in service layer (has access to existing agent)
        if not self.auth_query_param_key:
            raise ValueError("auth_query_param_key is required when setting auth_type to 'query_param'")
        if not self.auth_query_param_value:
            raise ValueError("auth_query_param_value is required when setting auth_type to 'query_param'")

        return self
class A2AAgentRead(BaseModelWithConfigDict):
    """Schema for reading A2A agent information.

    Includes all agent fields plus:
    - Database ID
    - Slug
    - Creation/update timestamps
    - Enabled/reachable status
    - Metrics
    - Authentication type: basic, bearer, headers, oauth, query_param
    - Authentication value: username/password or token or custom headers
    - OAuth configuration for OAuth 2.0 authentication
    - Query parameter authentication (key name and masked value)

    Auto Populated fields:
    - Authentication username: for basic auth
    - Authentication password: for basic auth
    - Authentication token: for bearer auth
    - Authentication header key: for headers auth
    - Authentication header value: for headers auth
    - Query param key: for query_param auth
    - Query param value (masked): for query_param auth
    """

    id: Optional[str] = Field(None, description="Unique ID of the a2a agent")
    name: str = Field(..., description="Unique name for the a2a agent")
    slug: Optional[str] = Field(None, description="Slug for a2a agent endpoint URL")
    description: Optional[str] = Field(None, description="a2a agent description")
    endpoint_url: str = Field(..., description="a2a agent endpoint URL")
    agent_type: str
    protocol_version: str
    capabilities: Dict[str, Any]
    config: Dict[str, Any]
    enabled: bool
    reachable: bool
    created_at: datetime
    updated_at: datetime
    last_interaction: Optional[datetime]
    tags: List[Dict[str, str]] = Field(default_factory=list, description="Tags for categorizing the agent")
    metrics: Optional[A2AAgentMetrics] = Field(None, description="Agent metrics (may be None in list operations)")
    passthrough_headers: Optional[List[str]] = Field(default=None, description="List of headers allowed to be passed through from client to target")
    # Authorizations
    auth_type: Optional[str] = Field(None, description="auth_type: basic, bearer, headers, oauth, query_param, or None")
    auth_value: Optional[str] = Field(None, description="auth value: username/password or token or custom headers")

    # OAuth 2.0 configuration
    oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration including grant_type, client_id, encrypted client_secret, URLs, and scopes")

    # auth_value will populate the following fields
    auth_username: Optional[str] = Field(None, description="username for basic authentication")
    auth_password: Optional[str] = Field(None, description="password for basic authentication")
    auth_token: Optional[str] = Field(None, description="token for bearer authentication")
    auth_header_key: Optional[str] = Field(None, description="key for custom headers authentication")
    auth_header_value: Optional[str] = Field(None, description="vallue for custom headers authentication")

    # Query Parameter Authentication (masked for security)
    auth_query_param_key: Optional[str] = Field(
        None,
        description="Query parameter name for authentication",
    )
    auth_query_param_value_masked: Optional[str] = Field(
        None,
        description="Masked query parameter value (actual value is encrypted at rest)",
    )

    # Comprehensive metadata for audit tracking
    created_by: Optional[str] = Field(None, description="Username who created this entity")
    created_from_ip: Optional[str] = Field(None, description="IP address of creator")
    created_via: Optional[str] = Field(None, description="Creation method: ui|api|import|federation")
    created_user_agent: Optional[str] = Field(None, description="User agent of creation request")

    modified_by: Optional[str] = Field(None, description="Username who last modified this entity")
    modified_from_ip: Optional[str] = Field(None, description="IP address of last modifier")
    modified_via: Optional[str] = Field(None, description="Modification method")
    modified_user_agent: Optional[str] = Field(None, description="User agent of modification request")

    import_batch_id: Optional[str] = Field(None, description="UUID of bulk import batch")
    federation_source: Optional[str] = Field(None, description="Source gateway for federated entities")
    version: Optional[int] = Field(1, description="Entity version for change tracking")

    # Team scoping fields
    team_id: Optional[str] = Field(None, description="ID of the team that owns this resource")
    team: Optional[str] = Field(None, description="Name of the team that owns this resource")
    owner_email: Optional[str] = Field(None, description="Email of the user who owns this resource")
    visibility: Optional[str] = Field(default="public", description="Visibility level: private, team, or public")

    @model_validator(mode="before")
    @classmethod
    def _mask_query_param_auth(cls, data: Any) -> Any:
        """Mask query param auth value when constructing from DB model.

        This extracts auth_query_params from the raw data (DB model or dict)
        and populates the masked fields for display. Only the name of the
        first query parameter is surfaced; its value is always replaced by
        ``settings.masked_auth_value``.

        The input is never modified: when masked fields have to be added, a
        new dict is returned instead.

        Args:
            data: The raw data (dict or ORM model) to process.

        Returns:
            Any: The processed data with masked query param values, or the
            original ``data`` object when there is nothing to mask.
        """
        # Handle dict input
        if isinstance(data, dict):
            auth_query_params = data.get("auth_query_params")
            if auth_query_params and isinstance(auth_query_params, dict):
                # Extract the param key name and set masked value
                first_key = next(iter(auth_query_params), None)
                if first_key:
                    # Build a copy so the caller's dict is left untouched
                    return {
                        **data,
                        "auth_query_param_key": first_key,
                        "auth_query_param_value_masked": settings.masked_auth_value,
                    }
        # Handle ORM model input (has auth_query_params attribute)
        elif hasattr(data, "auth_query_params"):
            auth_query_params = getattr(data, "auth_query_params", None)
            if auth_query_params and isinstance(auth_query_params, dict):
                # Convert ORM to dict for modification, preserving all attributes
                # Start with table columns
                data_dict = {c.name: getattr(data, c.name) for c in data.__table__.columns}
                # Preserve dynamically added attributes like 'team' (from relationships)
                for attr in ["team"]:
                    if hasattr(data, attr):
                        data_dict[attr] = getattr(data, attr)
                first_key = next(iter(auth_query_params), None)
                if first_key:
                    data_dict["auth_query_param_key"] = first_key
                    data_dict["auth_query_param_value_masked"] = settings.masked_auth_value
                return data_dict
        return data

    # This will be the main method to automatically populate fields
    @model_validator(mode="after")
    def _populate_auth(self) -> Self:
        """Populate authentication fields based on auth_type and encoded auth_value.

        This post-validation method decodes the stored authentication value and
        populates the appropriate authentication fields (username/password, token,
        or custom headers) based on the authentication type. It ensures the
        authentication data is properly formatted and accessible through individual
        fields for display purposes.

        The method decodes three authentication types:
        - basic: Extracts username and password from Authorization header
        - bearer: Extracts token from Bearer Authorization header
        - authheaders: Extracts the first custom header key/value pair

        Nothing is decoded when auth_value is already the masked placeholder,
        when it is empty, or when auth_type is "oauth", "one_time_auth" or
        "query_param".

        Returns:
            Self: The instance with populated authentication fields:
                - For basic: auth_username and auth_password
                - For bearer: auth_token
                - For authheaders: auth_header_key and auth_header_value

        Raises:
            ValueError: If the authentication data is malformed:
                - Basic auth missing or improperly formatted Authorization header
                - Basic auth missing username or password
                - Bearer auth missing or improperly formatted Authorization header
                - Custom headers without at least one key/value pair

        Examples:
            >>> # Basic auth example
            >>> string_bytes = "admin:secret".encode("utf-8")
            >>> encoded_auth = base64.urlsafe_b64encode(string_bytes).decode("utf-8")
            >>> values = A2AAgentRead.model_construct(
            ...     auth_type="basic",
            ...     auth_value=encode_auth({"Authorization": f"Basic {encoded_auth}"})
            ... )
            >>> values = A2AAgentRead._populate_auth(values)
            >>> values.auth_username
            'admin'
            >>> values.auth_password
            'secret'

            >>> # Basic auth passwords may themselves contain colons
            >>> encoded_auth = base64.urlsafe_b64encode(b"admin:se:cret").decode("utf-8")
            >>> values = A2AAgentRead.model_construct(
            ...     auth_type="basic",
            ...     auth_value=encode_auth({"Authorization": f"Basic {encoded_auth}"})
            ... )
            >>> values = A2AAgentRead._populate_auth(values)
            >>> values.auth_username
            'admin'
            >>> values.auth_password
            'se:cret'

            >>> # Bearer auth example
            >>> values = A2AAgentRead.model_construct(
            ...     auth_type="bearer",
            ...     auth_value=encode_auth({"Authorization": "Bearer mytoken123"})
            ... )
            >>> values = A2AAgentRead._populate_auth(values)
            >>> values.auth_token
            'mytoken123'

            >>> # Custom headers example
            >>> values = A2AAgentRead.model_construct(
            ...     auth_type='authheaders',
            ...     auth_value=encode_auth({"X-API-Key": "abc123"})
            ... )
            >>> values = A2AAgentRead._populate_auth(values)
            >>> values.auth_header_key
            'X-API-Key'
            >>> values.auth_header_value
            'abc123'
        """
        auth_type = self.auth_type
        auth_value_encoded = self.auth_value
        # Skip validation logic if masked value
        if auth_value_encoded == settings.masked_auth_value:
            return self

        # Handle OAuth authentication (no auth_value to decode)
        if auth_type == "oauth":
            # OAuth gateways don't have traditional auth_value to decode
            # They use oauth_config instead
            return self

        if auth_type == "one_time_auth":
            return self

        if auth_type == "query_param":
            # Query param auth is handled by the before validator
            # (auth_query_params from DB model is processed there)
            return self

        # If no encoded value is present, nothing to populate
        if not auth_value_encoded:
            return self

        auth_value = decode_auth(auth_value_encoded)
        if auth_type == "basic":
            auth = auth_value.get("Authorization")
            if not (isinstance(auth, str) and auth.startswith("Basic ")):
                raise ValueError("basic auth requires an Authorization header of the form 'Basic <base64>'")
            auth = auth.removeprefix("Basic ")
            # Split on the FIRST colon only: the username cannot contain a colon
            # but the password can, so a plain split(":") would fail to unpack.
            u, _, p = base64.urlsafe_b64decode(auth).decode("utf-8").partition(":")
            if not u or not p:
                raise ValueError("basic auth requires both username and password")
            self.auth_username, self.auth_password = u, p

        elif auth_type == "bearer":
            auth = auth_value.get("Authorization")
            if not (isinstance(auth, str) and auth.startswith("Bearer ")):
                raise ValueError("bearer auth requires an Authorization header of the form 'Bearer <token>'")
            self.auth_token = auth.removeprefix("Bearer ")

        elif auth_type == "authheaders":
            # For backward compatibility, populate first header in key/value fields
            if len(auth_value) == 0:
                raise ValueError("authheaders requires at least one key/value pair")
            k, v = next(iter(auth_value.items()))
            self.auth_header_key, self.auth_header_value = k, v
        return self

    def masked(self) -> "A2AAgentRead":
        """
        Return a masked version of the model instance with sensitive authentication fields hidden.

        This method creates a dictionary representation of the model data and replaces sensitive fields
        such as `auth_value`, `auth_password`, `auth_token`, and `auth_header_value` with a masked
        placeholder value defined in `settings.masked_auth_value`. Masking is only applied if the fields
        are present and not already masked.

        Args:
            None

        Returns:
            A2AAgentRead: A new instance of the A2AAgentRead model with sensitive authentication-related fields
            masked to prevent exposure of sensitive information.

        Notes:
            - The `auth_value` field is only masked if it exists and its value is different from the masking
              placeholder.
            - Other sensitive fields (`auth_password`, `auth_token`, `auth_header_value`) are masked if present.
            - Sensitive keys inside `oauth_config` are masked via `_mask_oauth_config`.
            - Fields not related to authentication remain unmodified.
        """
        masked_data = self.model_dump()

        # Only mask if auth_value is present and not already masked
        if masked_data.get("auth_value") and masked_data["auth_value"] != settings.masked_auth_value:
            masked_data["auth_value"] = settings.masked_auth_value

        masked_data["auth_password"] = settings.masked_auth_value if masked_data.get("auth_password") else None
        masked_data["auth_token"] = settings.masked_auth_value if masked_data.get("auth_token") else None
        masked_data["auth_header_value"] = settings.masked_auth_value if masked_data.get("auth_header_value") else None

        # Mask sensitive keys inside oauth_config (e.g. password, client_secret)
        if masked_data.get("oauth_config"):
            masked_data["oauth_config"] = _mask_oauth_config(masked_data["oauth_config"])

        return A2AAgentRead.model_validate(masked_data)
class A2AAgentInvocation(BaseModelWithConfigDict):
    """Request payload used to invoke an A2A agent.

    Carries:
    - The name of the agent to call
    - A free-form parameter mapping for the interaction
    - The interaction type (query, execute, etc.)
    """

    agent_name: str = Field(..., description="Name of the A2A agent to invoke")
    parameters: Dict[str, Any] = Field(default_factory=dict, description="Parameters for agent interaction")
    interaction_type: str = Field(default="query", description="Type of interaction (query, execute, etc.)")

    @field_validator("agent_name")
    @classmethod
    def validate_agent_name(cls, v: str) -> str:
        """Check the agent name with the shared name validator.

        Args:
            v (str): Agent name supplied by the caller

        Returns:
            str: Whatever SecurityValidator.validate_name returns for the name
        """
        checked_name = SecurityValidator.validate_name(v, "Agent name")
        return checked_name

    @field_validator("parameters")
    @classmethod
    def validate_parameters(cls, v: Dict[str, Any]) -> Dict[str, Any]:
        """Run the JSON depth check on the parameters mapping.

        Args:
            v (dict): Parameters dictionary supplied by the caller

        Returns:
            dict: The same dictionary, unchanged, once the depth check passes

        Raises:
            ValueError: If the parameters exceed the maximum allowed depth
        """
        # The validator is called for its side effect (raising); the input is returned as-is
        SecurityValidator.validate_json_depth(v)
        return v
5214# ---------------------------------------------------------------------------
5215# Email-Based Authentication Schemas
5216# ---------------------------------------------------------------------------
class EmailLoginRequest(BaseModel):
    """Credentials submitted to the email login endpoint.

    String values have surrounding whitespace stripped before validation
    (``str_strip_whitespace=True``).

    Attributes:
        email: Email address identifying the user
        password: The user's password (must not be empty)

    Examples:
        >>> request = EmailLoginRequest(email="user@example.com", password="secret123")
        >>> request.email
        'user@example.com'
        >>> request.password
        'secret123'
    """

    model_config = ConfigDict(str_strip_whitespace=True)

    email: EmailStr = Field(
        ...,
        description="User's email address",
    )
    password: str = Field(
        ...,
        min_length=1,
        description="User's password",
    )
class PublicRegistrationRequest(BaseModel):
    """Payload for public self-registration: minimal fields, password mandatory.

    Unknown fields are rejected (extra="forbid"), which prevents clients from
    submitting admin-only fields such as is_admin or is_active. String values
    have surrounding whitespace stripped before validation.

    Attributes:
        email: Email address of the new user
        password: Password for the new account (required, min 8 chars)
        full_name: Optional display name (max 255 chars)

    Examples:
        >>> request = PublicRegistrationRequest(
        ...     email="new@example.com",
        ...     password="secure123",
        ...     full_name="New User"
        ... )
        >>> request.email
        'new@example.com'
        >>> request.full_name
        'New User'
    """

    model_config = ConfigDict(str_strip_whitespace=True, extra="forbid")

    email: EmailStr = Field(
        ...,
        description="User's email address",
    )
    password: str = Field(
        ...,
        min_length=8,
        description="User's password",
    )
    full_name: Optional[str] = Field(
        default=None,
        max_length=255,
        description="User's full name",
    )
class AdminCreateUserRequest(BaseModel):
    """Payload an administrator sends to create a user: all fields, password mandatory.

    String values have surrounding whitespace stripped before validation.

    Attributes:
        email: Email address of the new user
        password: Password for the new account (required, min 8 chars)
        full_name: Optional display name (max 255 chars)
        is_admin: Grant admin privileges to the user (default: False)
        is_active: Whether the account starts active (default: True)
        password_change_required: Force a password change on next login (default: False)

    Examples:
        >>> request = AdminCreateUserRequest(
        ...     email="new@example.com",
        ...     password="secure123",
        ...     full_name="New User"
        ... )
        >>> request.email
        'new@example.com'
        >>> request.full_name
        'New User'
        >>> request.is_admin
        False
        >>> request.is_active
        True
        >>> request.password_change_required
        False
    """

    model_config = ConfigDict(str_strip_whitespace=True)

    # Identity and credentials
    email: EmailStr = Field(..., description="User's email address")
    password: str = Field(..., min_length=8, description="User's password")
    full_name: Optional[str] = Field(default=None, max_length=255, description="User's full name")

    # Account flags
    is_admin: bool = Field(default=False, description="Grant admin privileges to user")
    is_active: bool = Field(default=True, description="Whether user account is active")
    password_change_required: bool = Field(default=False, description="Whether user must change password on next login")
# Deprecated alias of AdminCreateUserRequest (same class object under an older name).
# Use AdminCreateUserRequest or PublicRegistrationRequest instead.
EmailRegistrationRequest = AdminCreateUserRequest
class ChangePasswordRequest(BaseModel):
    """Payload for a user changing their own password.

    String values have surrounding whitespace stripped before validation.

    Attributes:
        old_password: Current password, used for verification (must not be empty)
        new_password: Replacement password (min 8 chars)

    Examples:
        >>> request = ChangePasswordRequest(
        ...     old_password="old_secret",
        ...     new_password="new_secure_password"
        ... )
        >>> request.old_password
        'old_secret'
        >>> request.new_password
        'new_secure_password'
    """

    model_config = ConfigDict(str_strip_whitespace=True)

    old_password: str = Field(
        ...,
        min_length=1,
        description="Current password",
    )
    new_password: str = Field(
        ...,
        min_length=8,
        description="New password",
    )

    @field_validator("new_password")
    @classmethod
    def validate_new_password(cls, v: str) -> str:
        """Check that the new password is at least 8 characters long.

        Args:
            v: Candidate new password

        Returns:
            str: The password, unchanged, when it is long enough

        Raises:
            ValueError: If the password is shorter than 8 characters
        """
        if len(v) >= 8:
            return v
        raise ValueError("New password must be at least 8 characters long")
class EmailUserResponse(BaseModel):
    """User details returned by the API.

    The model can be populated from attribute-bearing objects
    (``from_attributes=True``).

    Attributes:
        email: User's email address
        full_name: User's full name
        is_admin: Whether user has admin privileges
        is_active: Whether account is active
        auth_provider: Authentication provider used
        created_at: Account creation timestamp
        last_login: Last successful login timestamp
        email_verified: Whether email is verified
        password_change_required: Whether user must change password on next login

    Examples:
        >>> user = EmailUserResponse(
        ...     email="user@example.com",
        ...     full_name="Test User",
        ...     is_admin=False,
        ...     is_active=True,
        ...     auth_provider="local",
        ...     created_at=datetime.now(),
        ...     last_login=None,
        ...     email_verified=False
        ... )
        >>> user.email
        'user@example.com'
        >>> user.is_admin
        False
    """

    model_config = ConfigDict(from_attributes=True)

    email: str = Field(..., description="User's email address")
    full_name: Optional[str] = Field(default=None, description="User's full name")
    is_admin: bool = Field(..., description="Whether user has admin privileges")
    is_active: bool = Field(..., description="Whether account is active")
    auth_provider: str = Field(..., description="Authentication provider")
    created_at: datetime = Field(..., description="Account creation timestamp")
    last_login: Optional[datetime] = Field(default=None, description="Last successful login")
    email_verified: bool = Field(default=False, description="Whether email is verified")
    password_change_required: bool = Field(default=False, description="Whether user must change password on next login")

    @classmethod
    def from_email_user(cls, user) -> "EmailUserResponse":
        """Build a response from an EmailUser model instance.

        Every field is copied from the attribute of the same name, except
        ``email_verified``, which is taken from ``user.is_email_verified()``.

        Args:
            user: EmailUser model instance

        Returns:
            EmailUserResponse: Response schema instance
        """
        field_values = {
            "email": user.email,
            "full_name": user.full_name,
            "is_admin": user.is_admin,
            "is_active": user.is_active,
            "auth_provider": user.auth_provider,
            "created_at": user.created_at,
            "last_login": user.last_login,
            # Verification status is exposed by the model as a method, not an attribute
            "email_verified": user.is_email_verified(),
            "password_change_required": user.password_change_required,
        }
        return cls(**field_values)
class AuthenticationResponse(BaseModel):
    """Payload returned after a successful authentication.

    Attributes:
        access_token: JWT access token
        token_type: Token type (defaults to 'bearer')
        expires_in: Token expiration in seconds
        user: Information about the authenticated user

    Examples:
        >>> from datetime import datetime
        >>> response = AuthenticationResponse(
        ...     access_token="jwt.token.here",
        ...     token_type="bearer",
        ...     expires_in=3600,
        ...     user=EmailUserResponse(
        ...         email="user@example.com",
        ...         full_name="Test User",
        ...         is_admin=False,
        ...         is_active=True,
        ...         auth_provider="local",
        ...         created_at=datetime.now(),
        ...         last_login=None,
        ...         email_verified=False
        ...     )
        ... )
        >>> response.token_type
        'bearer'
        >>> response.user.email
        'user@example.com'
    """

    access_token: str = Field(
        ...,
        description="JWT access token",
    )
    token_type: str = Field(
        default="bearer",
        description="Token type",
    )
    expires_in: int = Field(
        ...,
        description="Token expiration in seconds",
    )
    user: EmailUserResponse = Field(
        ...,
        description="User information",
    )
class AuthEventResponse(BaseModel):
    """A single authentication event as returned by the API.

    The model can be populated from attribute-bearing objects
    (``from_attributes=True``).

    Attributes:
        id: Event ID
        timestamp: Event timestamp
        user_email: User's email address (optional)
        event_type: Type of authentication event
        success: Whether the event was successful
        ip_address: Client IP address (optional)
        failure_reason: Reason for failure (optional)

    Examples:
        >>> from datetime import datetime
        >>> event = AuthEventResponse(
        ...     id=1,
        ...     timestamp=datetime.now(),
        ...     user_email="user@example.com",
        ...     event_type="login",
        ...     success=True,
        ...     ip_address="192.168.1.1",
        ...     failure_reason=None
        ... )
        >>> event.event_type
        'login'
        >>> event.success
        True
    """

    model_config = ConfigDict(from_attributes=True)

    # Required fields
    id: int = Field(..., description="Event ID")
    timestamp: datetime = Field(..., description="Event timestamp")
    user_email: Optional[str] = Field(default=None, description="User's email address")
    event_type: str = Field(..., description="Type of authentication event")
    success: bool = Field(..., description="Whether the event was successful")
    # Optional context
    ip_address: Optional[str] = Field(default=None, description="Client IP address")
    failure_reason: Optional[str] = Field(default=None, description="Reason for failure")
class UserListResponse(BaseModel):
    """A page of users together with its paging information.

    Attributes:
        users: The users in this page
        total_count: Total number of users
        limit: The limit that was requested
        offset: The offset that was requested

    Examples:
        >>> user_list = UserListResponse(
        ...     users=[],
        ...     total_count=0,
        ...     limit=10,
        ...     offset=0
        ... )
        >>> user_list.total_count
        0
        >>> len(user_list.users)
        0
    """

    users: list[EmailUserResponse] = Field(
        ...,
        description="List of users",
    )
    total_count: int = Field(
        ...,
        description="Total number of users",
    )
    limit: int = Field(
        ...,
        description="Request limit",
    )
    offset: int = Field(
        ...,
        description="Request offset",
    )
class AdminUserUpdateRequest(BaseModel):
    """Payload an administrator sends to update a user.

    Every field is optional and defaults to None. String values have
    surrounding whitespace stripped before validation.

    Attributes:
        full_name: User's full name (max 255 chars)
        is_admin: Whether user has admin privileges
        is_active: Whether account is active
        password_change_required: Whether user must change password on next login
        password: New password, min 8 chars (admin can reset without old password)

    Examples:
        >>> request = AdminUserUpdateRequest(
        ...     full_name="Updated Name",
        ...     is_admin=True,
        ...     is_active=True
        ... )
        >>> request.full_name
        'Updated Name'
        >>> request.is_admin
        True
    """

    model_config = ConfigDict(str_strip_whitespace=True)

    full_name: Optional[str] = Field(default=None, max_length=255, description="User's full name")
    is_admin: Optional[bool] = Field(default=None, description="Whether user has admin privileges")
    is_active: Optional[bool] = Field(default=None, description="Whether account is active")
    password_change_required: Optional[bool] = Field(default=None, description="Whether user must change password on next login")
    password: Optional[str] = Field(default=None, min_length=8, description="New password (admin reset)")
class ErrorResponse(BaseModel):
    """Common shape of an error payload.

    Attributes:
        error: Error type
        message: Human-readable error message
        details: Optional dictionary with additional error details

    Examples:
        >>> error = ErrorResponse(
        ...     error="authentication_failed",
        ...     message="Invalid email or password",
        ...     details=None
        ... )
        >>> error.error
        'authentication_failed'
        >>> error.message
        'Invalid email or password'
    """

    error: str = Field(
        ...,
        description="Error type",
    )
    message: str = Field(
        ...,
        description="Human-readable error message",
    )
    details: Optional[dict] = Field(
        default=None,
        description="Additional error details",
    )
class SuccessResponse(BaseModel):
    """Common shape of a success payload.

    Attributes:
        success: Operation success status (defaults to True)
        message: Human-readable success message

    Examples:
        >>> response = SuccessResponse(
        ...     success=True,
        ...     message="Password changed successfully"
        ... )
        >>> response.success
        True
        >>> response.message
        'Password changed successfully'
    """

    success: bool = Field(
        default=True,
        description="Operation success status",
    )
    message: str = Field(
        ...,
        description="Human-readable success message",
    )
5605# ---------------------------------------------------------------------------
5606# Team Management Schemas
5607# ---------------------------------------------------------------------------
class TeamCreateRequest(BaseModel):
    """Payload for creating a team.

    Attributes:
        name: Team display name
        slug: URL-friendly team identifier (optional, auto-generated if not provided)
        description: Team description
        visibility: Team visibility level ("private" or "public", default "private")
        max_members: Maximum number of members allowed

    Examples:
        >>> request = TeamCreateRequest(
        ...     name="Engineering Team",
        ...     description="Software development team"
        ... )
        >>> request.name
        'Engineering Team'
        >>> request.visibility
        'private'
        >>> request.slug is None
        True
        >>>
        >>> # Test with all fields
        >>> full_request = TeamCreateRequest(
        ...     name="DevOps Team",
        ...     slug="devops-team",
        ...     description="Infrastructure and deployment team",
        ...     visibility="public",
        ...     max_members=50
        ... )
        >>> full_request.slug
        'devops-team'
        >>> full_request.max_members
        50
        >>> full_request.visibility
        'public'
        >>>
        >>> # Test validation
        >>> try:
        ...     TeamCreateRequest(name=" ", description="test")
        ... except ValueError as e:
        ...     "empty" in str(e).lower()
        True
        >>>
        >>> # Test slug validation
        >>> try:
        ...     TeamCreateRequest(name="Test", slug="Invalid_Slug")
        ... except ValueError:
        ...     True
        True
        >>>
        >>> # Test valid slug patterns
        >>> valid_slug = TeamCreateRequest(name="Test", slug="valid-slug-123")
        >>> valid_slug.slug
        'valid-slug-123'
    """

    name: str = Field(..., min_length=1, max_length=255, description="Team display name")
    slug: Optional[str] = Field(default=None, min_length=2, max_length=255, pattern="^[a-z0-9-]+$", description="URL-friendly team identifier")
    description: Optional[str] = Field(default=None, max_length=1000, description="Team description")
    visibility: Literal["private", "public"] = Field(default="private", description="Team visibility level")
    max_members: Optional[int] = Field(default=None, description="Maximum number of team members")

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Strip the team name and check it against the name and security rules.

        Args:
            v: Team name supplied by the caller

        Returns:
            str: The name with surrounding whitespace removed

        Raises:
            ValueError: If the name is blank, contains characters outside the
                configured name pattern, or matches a script pattern
        """
        cleaned = v.strip()
        if not cleaned:
            raise ValueError("Team name cannot be empty")
        # Strict validation: only alphanumeric, underscore, period, dash, and spaces
        if re.match(settings.validation_name_pattern, cleaned) is None:
            raise ValueError("Team name can only contain letters, numbers, spaces, underscores, periods, and dashes")
        SecurityValidator.validate_no_xss(cleaned, "Team name")
        if re.search(SecurityValidator.DANGEROUS_JS_PATTERN, cleaned, re.IGNORECASE):
            raise ValueError("Team name contains script patterns that may cause security issues")
        return cleaned

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Strip the team description and screen it for XSS/script patterns.

        Args:
            v: Team description supplied by the caller

        Returns:
            Optional[str]: The stripped description, or None when it is missing or blank

        Raises:
            ValueError: If description contains dangerous patterns
        """
        if v is None:
            return None
        cleaned = v.strip()
        if not cleaned:
            # A whitespace-only description is treated as "no description"
            return None
        SecurityValidator.validate_no_xss(cleaned, "Team description")
        if re.search(SecurityValidator.DANGEROUS_JS_PATTERN, cleaned, re.IGNORECASE):
            raise ValueError("Team description contains script patterns that may cause security issues")
        return cleaned

    @field_validator("slug")
    @classmethod
    def validate_slug(cls, v: Optional[str]) -> Optional[str]:
        """Normalise the team slug and check its format.

        Args:
            v: Team slug supplied by the caller

        Returns:
            Optional[str]: The stripped, lower-cased slug, or None when no slug was given

        Raises:
            ValueError: If slug format is invalid
        """
        if v is None:
            return v
        normalized = v.strip().lower()
        # Uses precompiled regex for slug validation
        if _SLUG_RE.match(normalized) is None:
            raise ValueError("Slug must contain only lowercase letters, numbers, and hyphens")
        has_edge_hyphen = normalized.startswith("-") or normalized.endswith("-")
        if has_edge_hyphen:
            raise ValueError("Slug cannot start or end with hyphens")
        return normalized
class TeamUpdateRequest(BaseModel):
    """Payload for updating a team; every field is optional.

    Attributes:
        name: Team display name
        description: Team description
        visibility: Team visibility level ("private" or "public")
        max_members: Maximum number of members allowed

    Examples:
        >>> request = TeamUpdateRequest(
        ...     name="Updated Engineering Team",
        ...     description="Updated description"
        ... )
        >>> request.name
        'Updated Engineering Team'
    """

    name: Optional[str] = Field(default=None, min_length=1, max_length=255, description="Team display name")
    description: Optional[str] = Field(default=None, max_length=1000, description="Team description")
    visibility: Optional[Literal["private", "public"]] = Field(default=None, description="Team visibility level")
    max_members: Optional[int] = Field(default=None, description="Maximum number of team members")

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: Optional[str]) -> Optional[str]:
        """Strip the team name and check it against the name and security rules.

        Args:
            v: Team name supplied by the caller, or None when not being updated

        Returns:
            Optional[str]: The name with surrounding whitespace removed, or None

        Raises:
            ValueError: If the name is blank, contains characters outside the
                configured name pattern, or matches a script pattern
        """
        if v is None:
            return v
        cleaned = v.strip()
        if not cleaned:
            raise ValueError("Team name cannot be empty")
        # Strict validation: only alphanumeric, underscore, period, dash, and spaces
        if re.match(settings.validation_name_pattern, cleaned) is None:
            raise ValueError("Team name can only contain letters, numbers, spaces, underscores, periods, and dashes")
        SecurityValidator.validate_no_xss(cleaned, "Team name")
        if re.search(SecurityValidator.DANGEROUS_JS_PATTERN, cleaned, re.IGNORECASE):
            raise ValueError("Team name contains script patterns that may cause security issues")
        return cleaned

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Strip the team description and screen it for XSS/script patterns.

        Args:
            v: Team description supplied by the caller

        Returns:
            Optional[str]: The stripped description, or None when it is missing or blank

        Raises:
            ValueError: If description contains dangerous patterns
        """
        if v is None:
            return None
        cleaned = v.strip()
        if not cleaned:
            # A whitespace-only description is treated as "no description"
            return None
        SecurityValidator.validate_no_xss(cleaned, "Team description")
        if re.search(SecurityValidator.DANGEROUS_JS_PATTERN, cleaned, re.IGNORECASE):
            raise ValueError("Team description contains script patterns that may cause security issues")
        return cleaned
class TeamResponse(BaseModel):
    """Response payload describing a single team.

    Attributes:
        id: Team UUID
        name: Team display name
        slug: URL-friendly team identifier
        description: Optional team description
        created_by: Email of team creator
        is_personal: Whether this is a personal team
        visibility: Team visibility level (must be supplied, may be None)
        max_members: Maximum number of members allowed, if set
        member_count: Current number of team members
        created_at: Team creation timestamp
        updated_at: Last update timestamp
        is_active: Whether the team is active

    Examples:
        >>> team = TeamResponse(
        ...     id="team-123",
        ...     name="Engineering Team",
        ...     slug="engineering-team",
        ...     created_by="admin@example.com",
        ...     is_personal=False,
        ...     visibility="private",
        ...     member_count=5,
        ...     created_at=datetime.now(timezone.utc),
        ...     updated_at=datetime.now(timezone.utc),
        ...     is_active=True
        ... )
        >>> team.name
        'Engineering Team'
    """

    id: str = Field(description="Team UUID")
    name: str = Field(description="Team display name")
    slug: str = Field(description="URL-friendly team identifier")
    description: Optional[str] = Field(default=None, description="Team description")
    created_by: str = Field(description="Email of team creator")
    is_personal: bool = Field(description="Whether this is a personal team")
    # Kept explicitly required: the value may be None but the key must be present.
    visibility: Optional[str] = Field(..., description="Team visibility level")
    max_members: Optional[int] = Field(default=None, description="Maximum number of members allowed")
    member_count: int = Field(description="Current number of team members")
    created_at: datetime = Field(description="Team creation timestamp")
    updated_at: datetime = Field(description="Last update timestamp")
    is_active: bool = Field(description="Whether the team is active")
class TeamMemberResponse(BaseModel):
    """Response payload describing one team membership.

    Attributes:
        id: Member UUID
        team_id: Team UUID
        user_email: Member email address
        role: Member role in the team
        joined_at: When the member joined
        invited_by: Email of user who invited this member, if any
        is_active: Whether the membership is active

    Examples:
        >>> member = TeamMemberResponse(
        ...     id="member-123",
        ...     team_id="team-123",
        ...     user_email="user@example.com",
        ...     role="member",
        ...     joined_at=datetime.now(timezone.utc),
        ...     is_active=True
        ... )
        >>> member.role
        'member'
    """

    id: str = Field(description="Member UUID")
    team_id: str = Field(description="Team UUID")
    user_email: str = Field(description="Member email address")
    role: str = Field(description="Member role in the team")
    joined_at: datetime = Field(description="When the member joined")
    invited_by: Optional[str] = Field(default=None, description="Email of user who invited this member")
    is_active: bool = Field(description="Whether the membership is active")
class PaginatedTeamMembersResponse(BaseModel):
    """One page of team members plus an optional continuation cursor.

    Attributes:
        members: List of team members
        next_cursor: Optional cursor for next page of results; populated
            through the ``nextCursor`` alias

    Examples:
        >>> member1 = TeamMemberResponse(
        ...     id="member-1",
        ...     team_id="team-123",
        ...     user_email="user1@example.com",
        ...     role="member",
        ...     joined_at=datetime.now(timezone.utc),
        ...     is_active=True
        ... )
        >>> member2 = TeamMemberResponse(
        ...     id="member-2",
        ...     team_id="team-123",
        ...     user_email="user2@example.com",
        ...     role="member",
        ...     joined_at=datetime.now(timezone.utc),
        ...     is_active=True
        ... )
        >>> response = PaginatedTeamMembersResponse(
        ...     members=[member1, member2],
        ...     nextCursor="cursor-token-123"
        ... )
        >>> len(response.members)
        2
    """

    members: List[TeamMemberResponse] = Field(description="List of team members")
    next_cursor: Optional[str] = Field(default=None, alias="nextCursor", description="Cursor for next page of results")
class TeamInviteRequest(BaseModel):
    """Request body for inviting a user to a team.

    Attributes:
        email: Email address of user to invite
        role: Role to assign to the user; ``"owner"`` or ``"member"``,
            defaulting to ``"member"``

    Examples:
        >>> invite = TeamInviteRequest(
        ...     email="newuser@example.com",
        ...     role="member"
        ... )
        >>> invite.email
        'newuser@example.com'
    """

    email: EmailStr = Field(description="Email address of user to invite")
    role: Literal["owner", "member"] = Field(default="member", description="Role to assign to the user")
class TeamInvitationResponse(BaseModel):
    """Response payload describing a team invitation.

    Attributes:
        id: Invitation UUID
        team_id: Team UUID
        team_name: Team display name
        email: Email address of invited user
        role: Role the user will have when they accept
        invited_by: Email of user who sent the invitation
        invited_at: When the invitation was sent
        expires_at: When the invitation expires
        token: Invitation token
        is_active: Whether the invitation is active
        is_expired: Whether the invitation has expired

    Examples:
        >>> invitation = TeamInvitationResponse(
        ...     id="invite-123",
        ...     team_id="team-123",
        ...     team_name="Engineering Team",
        ...     email="newuser@example.com",
        ...     role="member",
        ...     invited_by="admin@example.com",
        ...     invited_at=datetime.now(timezone.utc),
        ...     expires_at=datetime.now(timezone.utc),
        ...     token="invitation-token",
        ...     is_active=True,
        ...     is_expired=False
        ... )
        >>> invitation.role
        'member'
    """

    id: str = Field(description="Invitation UUID")
    team_id: str = Field(description="Team UUID")
    team_name: str = Field(description="Team display name")
    email: str = Field(description="Email address of invited user")
    role: str = Field(description="Role the user will have when they accept")
    invited_by: str = Field(description="Email of user who sent the invitation")
    invited_at: datetime = Field(description="When the invitation was sent")
    expires_at: datetime = Field(description="When the invitation expires")
    token: str = Field(description="Invitation token")
    is_active: bool = Field(description="Whether the invitation is active")
    is_expired: bool = Field(description="Whether the invitation has expired")
class TeamMemberUpdateRequest(BaseModel):
    """Request body for changing a team member's role.

    Attributes:
        role: New role for the team member; ``"owner"`` or ``"member"``

    Examples:
        >>> update = TeamMemberUpdateRequest(role="member")
        >>> update.role
        'member'
    """

    role: Literal["owner", "member"] = Field(description="New role for the team member")
class TeamListResponse(BaseModel):
    """Response payload wrapping a list of teams and its total count.

    Attributes:
        teams: List of teams
        total: Total number of teams

    Examples:
        >>> response = TeamListResponse(teams=[], total=0)
        >>> response.total
        0
    """

    teams: List[TeamResponse] = Field(description="List of teams")
    total: int = Field(description="Total number of teams")
class TeamDiscoveryResponse(BaseModel):
    """Limited public-team metadata exposed for discovery.

    Carries only a reduced set of team fields rather than the full team
    record.

    Attributes:
        id: Team ID
        name: Team name
        description: Optional team description
        member_count: Number of members
        created_at: Team creation timestamp
        is_joinable: Whether the current user can join this team
    """

    id: str = Field(description="Team ID")
    name: str = Field(description="Team name")
    description: Optional[str] = Field(default=None, description="Team description")
    member_count: int = Field(description="Number of team members")
    created_at: datetime = Field(description="Team creation timestamp")
    is_joinable: bool = Field(description="Whether the current user can join this team")
class TeamJoinRequest(BaseModel):
    """Request body for asking to join a public team.

    Attributes:
        message: Optional message to team owners (at most 500 characters)
    """

    message: Optional[str] = Field(default=None, max_length=500, description="Optional message to team owners")
class TeamJoinRequestResponse(BaseModel):
    """Response payload describing a team join request.

    Attributes:
        id: Join request ID
        team_id: Target team ID
        team_name: Target team name
        user_email: Requesting user email
        message: Optional request message
        status: Request status (pending, approved, rejected)
        requested_at: Request timestamp
        expires_at: Request expiration timestamp
    """

    id: str = Field(description="Join request ID")
    team_id: str = Field(description="Target team ID")
    team_name: str = Field(description="Target team name")
    user_email: str = Field(description="Requesting user email")
    message: Optional[str] = Field(default=None, description="Request message")
    status: str = Field(description="Request status")
    requested_at: datetime = Field(description="Request timestamp")
    expires_at: datetime = Field(description="Request expiration")
6091# API Token Management Schemas
class TokenScopeRequest(BaseModel):
    """Scoping configuration attached to an API token.

    Attributes:
        server_id: Optional server ID limitation
        permissions: List of permission scopes
        ip_restrictions: List of IP address/CIDR restrictions
        time_restrictions: Time-based access limitations
        usage_limits: Rate limiting and quota settings

    Examples:
        >>> scope = TokenScopeRequest(
        ...     server_id="server-123",
        ...     permissions=["tools.read", "resources.read"],
        ...     ip_restrictions=["192.168.1.0/24"]
        ... )
        >>> scope.server_id
        'server-123'
    """

    server_id: Optional[str] = Field(default=None, description="Limit token to specific server")
    permissions: List[str] = Field(default_factory=list, description="Permission scopes")
    ip_restrictions: List[str] = Field(default_factory=list, description="IP address restrictions")
    time_restrictions: Dict[str, Any] = Field(default_factory=dict, description="Time-based restrictions")
    usage_limits: Dict[str, Any] = Field(default_factory=dict, description="Usage limits and quotas")

    @field_validator("ip_restrictions")
    @classmethod
    def validate_ip_restrictions(cls, v: List[str]) -> List[str]:
        """Check each entry is a parseable IP address or CIDR network.

        Entries are stripped of surrounding whitespace; entries that are
        blank after stripping are dropped. An entry containing ``/`` is
        parsed as a network (non-strict), anything else as a single address.

        Args:
            v: List of IP address or CIDR strings to validate.

        Returns:
            List of validated IP/CIDR strings with whitespace stripped.
            A falsy input is returned unchanged.

        Raises:
            ValueError: If any IP address or CIDR notation is invalid.

        Examples:
            >>> TokenScopeRequest.validate_ip_restrictions(["192.168.1.0/24"])
            ['192.168.1.0/24']
            >>> TokenScopeRequest.validate_ip_restrictions(["10.0.0.1"])
            ['10.0.0.1']
        """
        # Standard
        import ipaddress  # pylint: disable=import-outside-toplevel

        if not v:
            return v

        accepted = []
        for raw in v:
            entry = raw.strip()
            if not entry:
                continue
            try:
                if "/" in entry:
                    # CIDR notation; host bits are tolerated (strict=False)
                    ipaddress.ip_network(entry, strict=False)
                else:
                    ipaddress.ip_address(entry)
            except ValueError as exc:
                raise ValueError(f"Invalid IP address or CIDR notation '{entry}': {exc}") from exc
            accepted.append(entry)
        return accepted

    @field_validator("permissions")
    @classmethod
    def validate_permissions(cls, v: List[str]) -> List[str]:
        """Check each permission is ``resource.action`` or the ``*`` wildcard.

        Entries are stripped of surrounding whitespace; entries that are
        blank after stripping are dropped.

        Args:
            v: List of permission strings to validate.

        Returns:
            List of validated permission strings with whitespace stripped.
            A falsy input is returned unchanged.

        Raises:
            ValueError: If any permission does not match 'resource.action' format or '*'.

        Examples:
            >>> TokenScopeRequest.validate_permissions(["tools.read", "resources.write"])
            ['tools.read', 'resources.write']
            >>> TokenScopeRequest.validate_permissions(["*"])
            ['*']
        """
        if not v:
            return v

        # resource.action, each part starting with a letter (alphanumeric with underscores)
        scope_re = re.compile(r"^[a-zA-Z][a-zA-Z0-9_]*\.[a-zA-Z][a-zA-Z0-9_]*$")

        accepted = []
        for raw in v:
            perm = raw.strip()
            if not perm:
                continue
            # The bare wildcard bypasses the format check.
            if perm != "*" and scope_re.match(perm) is None:
                raise ValueError(f"Invalid permission format '{perm}'. Use 'resource.action' format (e.g., 'tools.read') or '*' for full access")
            accepted.append(perm)
        return accepted
class TokenCreateRequest(BaseModel):
    """Request body for creating a new API token.

    Attributes:
        name: Human-readable token name (1-255 characters)
        description: Optional token description (at most 1000 characters)
        expires_in_days: Optional expiry in days (>= 1 when given)
        scope: Optional token scoping configuration
        tags: Optional organizational tags
        team_id: Optional team ID for team-scoped tokens
        is_active: Token active status (defaults to True)

    Examples:
        >>> request = TokenCreateRequest(
        ...     name="Production Access",
        ...     description="Read-only production access",
        ...     expires_in_days=30,
        ...     tags=["production", "readonly"]
        ... )
        >>> request.name
        'Production Access'
    """

    name: str = Field(min_length=1, max_length=255, description="Human-readable token name")
    description: Optional[str] = Field(default=None, max_length=1000, description="Token description")
    expires_in_days: Optional[int] = Field(default=None, ge=1, description="Expiry in days (must be >= 1 if specified)")
    scope: Optional[TokenScopeRequest] = Field(default=None, description="Token scoping configuration")
    tags: List[str] = Field(default_factory=list, description="Organizational tags")
    team_id: Optional[str] = Field(default=None, description="Team ID for team-scoped tokens")
    is_active: bool = Field(default=True, description="Token active status")
class TokenUpdateRequest(BaseModel):
    """Request body for updating an existing API token.

    Every field is optional and defaults to None.

    Attributes:
        name: New token name (1-255 characters)
        description: New token description (at most 1000 characters)
        scope: New token scoping configuration
        tags: New organizational tags
        is_active: New token active status

    Examples:
        >>> request = TokenUpdateRequest(
        ...     name="Updated Token Name",
        ...     description="Updated description"
        ... )
        >>> request.name
        'Updated Token Name'
    """

    name: Optional[str] = Field(default=None, min_length=1, max_length=255, description="New token name")
    description: Optional[str] = Field(default=None, max_length=1000, description="New token description")
    scope: Optional[TokenScopeRequest] = Field(default=None, description="New token scoping configuration")
    tags: Optional[List[str]] = Field(default=None, description="New organizational tags")
    is_active: Optional[bool] = Field(default=None, description="New token active status")
class TokenResponse(BaseModel):
    """Response payload describing an API token.

    The model is configured with ``from_attributes=True``.

    Attributes:
        id: Token ID
        name: Token name
        description: Token description
        user_email: Token creator's email
        team_id: Team ID for team-scoped tokens
        server_id: Server scope limitation
        resource_scopes: Permission scopes
        ip_restrictions: IP restrictions
        time_restrictions: Time-based restrictions
        usage_limits: Usage limits
        created_at: Creation timestamp
        expires_at: Expiry timestamp
        last_used: Last usage timestamp
        is_active: Active status
        is_revoked: Whether token is revoked (defaults to False)
        revoked_at: Revocation timestamp
        revoked_by: Email of user who revoked token
        revocation_reason: Reason for revocation
        tags: Organizational tags

    Examples:
        >>> from datetime import datetime
        >>> token = TokenResponse(
        ...     id="token-123",
        ...     name="Test Token",
        ...     description="Test description",
        ...     user_email="test@example.com",
        ...     server_id=None,
        ...     resource_scopes=["tools.read"],
        ...     ip_restrictions=[],
        ...     time_restrictions={},
        ...     usage_limits={},
        ...     created_at=datetime.now(),
        ...     expires_at=None,
        ...     last_used=None,
        ...     is_active=True,
        ...     tags=[]
        ... )
        >>> token.name
        'Test Token'
    """

    model_config = ConfigDict(from_attributes=True)

    id: str = Field(description="Token ID")
    name: str = Field(description="Token name")
    description: Optional[str] = Field(default=None, description="Token description")
    user_email: str = Field(description="Token creator's email")
    team_id: Optional[str] = Field(default=None, description="Team ID for team-scoped tokens")
    server_id: Optional[str] = Field(default=None, description="Server scope limitation")
    resource_scopes: List[str] = Field(description="Permission scopes")
    ip_restrictions: List[str] = Field(description="IP restrictions")
    time_restrictions: Dict[str, Any] = Field(description="Time-based restrictions")
    usage_limits: Dict[str, Any] = Field(description="Usage limits")
    created_at: datetime = Field(description="Creation timestamp")
    expires_at: Optional[datetime] = Field(default=None, description="Expiry timestamp")
    last_used: Optional[datetime] = Field(default=None, description="Last usage timestamp")
    is_active: bool = Field(description="Active status")
    is_revoked: bool = Field(default=False, description="Whether token is revoked")
    revoked_at: Optional[datetime] = Field(default=None, description="Revocation timestamp")
    revoked_by: Optional[str] = Field(default=None, description="Email of user who revoked token")
    revocation_reason: Optional[str] = Field(default=None, description="Reason for revocation")
    tags: List[str] = Field(description="Organizational tags")
class TokenCreateResponse(BaseModel):
    """Response payload returned when a token is created.

    Attributes:
        token: Token information
        access_token: The actual token string

    Examples:
        >>> from datetime import datetime
        >>> token_info = TokenResponse(
        ...     id="token-123", name="Test Token", description=None,
        ...     user_email="test@example.com", server_id=None, resource_scopes=[], ip_restrictions=[],
        ...     time_restrictions={}, usage_limits={}, created_at=datetime.now(),
        ...     expires_at=None, last_used=None, is_active=True, tags=[]
        ... )
        >>> response = TokenCreateResponse(
        ...     token=token_info,
        ...     access_token="abc123xyz"
        ... )
        >>> response.access_token
        'abc123xyz'
    """

    token: TokenResponse = Field(description="Token information")
    access_token: str = Field(description="The actual token string")
class TokenListResponse(BaseModel):
    """Response payload for a token listing.

    Attributes:
        tokens: List of tokens
        total: Total number of tokens
        limit: Request limit
        offset: Request offset

    Examples:
        >>> response = TokenListResponse(
        ...     tokens=[],
        ...     total=0,
        ...     limit=10,
        ...     offset=0
        ... )
        >>> response.total
        0
    """

    tokens: List[TokenResponse] = Field(description="List of tokens")
    total: int = Field(description="Total number of tokens")
    limit: int = Field(description="Request limit")
    offset: int = Field(description="Request offset")
class TokenRevokeRequest(BaseModel):
    """Request body for revoking a token.

    Attributes:
        reason: Optional reason for revocation (at most 255 characters)

    Examples:
        >>> request = TokenRevokeRequest(reason="Security incident")
        >>> request.reason
        'Security incident'
    """

    reason: Optional[str] = Field(default=None, max_length=255, description="Reason for revocation")
class TokenUsageStatsResponse(BaseModel):
    """Response payload carrying usage statistics for a token.

    Attributes:
        period_days: Number of days analyzed
        total_requests: Total number of requests
        successful_requests: Number of successful requests
        blocked_requests: Number of blocked requests
        success_rate: Success rate as a fraction (0-1)
        average_response_time_ms: Average response time in milliseconds
        top_endpoints: Most accessed endpoints as (endpoint, count) pairs

    Examples:
        >>> stats = TokenUsageStatsResponse(
        ...     period_days=30,
        ...     total_requests=100,
        ...     successful_requests=95,
        ...     blocked_requests=5,
        ...     success_rate=0.95,
        ...     average_response_time_ms=150.5,
        ...     top_endpoints=[("/tools", 50), ("/resources", 30)]
        ... )
        >>> stats.success_rate
        0.95
    """

    period_days: int = Field(description="Number of days analyzed")
    total_requests: int = Field(description="Total number of requests")
    successful_requests: int = Field(description="Number of successful requests")
    blocked_requests: int = Field(description="Number of blocked requests")
    success_rate: float = Field(description="Success rate (0-1)")
    average_response_time_ms: float = Field(description="Average response time in milliseconds")
    top_endpoints: List[tuple[str, int]] = Field(description="Most accessed endpoints with counts")
6429# ===== RBAC Schemas =====
class RoleCreateRequest(BaseModel):
    """Request body for creating a new role.

    Attributes:
        name: Unique role name (at most 255 characters)
        description: Optional role description
        scope: Role scope; one of global, team, personal
        permissions: List of permission strings
        inherits_from: Optional parent role ID
        is_system_role: Whether this is a system role (defaults to False)

    Examples:
        >>> request = RoleCreateRequest(
        ...     name="team_admin",
        ...     description="Team administrator with member management",
        ...     scope="team",
        ...     permissions=["teams.manage_members", "resources.create"]
        ... )
        >>> request.name
        'team_admin'
    """

    name: str = Field(max_length=255, description="Unique role name")
    description: Optional[str] = Field(default=None, description="Role description")
    scope: str = Field(pattern="^(global|team|personal)$", description="Role scope")
    permissions: List[str] = Field(description="List of permission strings")
    inherits_from: Optional[str] = Field(default=None, description="Parent role ID for inheritance")
    is_system_role: Optional[bool] = Field(default=False, description="Whether this is a system role")
class RoleUpdateRequest(BaseModel):
    """Request body for updating an existing role.

    Every field is optional and defaults to None.

    Attributes:
        name: Optional new name (at most 255 characters)
        description: Optional new description
        permissions: Optional new permissions list
        inherits_from: Optional new parent role
        is_active: Optional active status

    Examples:
        >>> request = RoleUpdateRequest(
        ...     description="Updated role description",
        ...     permissions=["new.permission"]
        ... )
        >>> request.description
        'Updated role description'
    """

    name: Optional[str] = Field(default=None, max_length=255, description="Role name")
    description: Optional[str] = Field(default=None, description="Role description")
    permissions: Optional[List[str]] = Field(default=None, description="List of permission strings")
    inherits_from: Optional[str] = Field(default=None, description="Parent role ID for inheritance")
    is_active: Optional[bool] = Field(default=None, description="Whether role is active")
class RoleResponse(BaseModel):
    """Response payload describing a role.

    The model is configured with ``from_attributes=True``.

    Attributes:
        id: Role identifier
        name: Role name
        description: Role description
        scope: Role scope
        permissions: Direct permissions
        effective_permissions: All permissions including inherited
        inherits_from: Parent role ID
        created_by: Creator email
        is_system_role: Whether system role
        is_active: Whether role is active
        created_at: Creation timestamp
        updated_at: Update timestamp

    Examples:
        >>> role = RoleResponse(
        ...     id="role-123",
        ...     name="admin",
        ...     scope="global",
        ...     permissions=["*"],
        ...     effective_permissions=["*"],
        ...     created_by="admin@example.com",
        ...     is_system_role=True,
        ...     is_active=True,
        ...     created_at=datetime.now(),
        ...     updated_at=datetime.now()
        ... )
        >>> role.name
        'admin'
    """

    model_config = ConfigDict(from_attributes=True)

    id: str = Field(description="Role identifier")
    name: str = Field(description="Role name")
    description: Optional[str] = Field(default=None, description="Role description")
    scope: str = Field(description="Role scope")
    permissions: List[str] = Field(description="Direct permissions")
    effective_permissions: Optional[List[str]] = Field(default=None, description="All permissions including inherited")
    inherits_from: Optional[str] = Field(default=None, description="Parent role ID")
    created_by: str = Field(description="Creator email")
    is_system_role: bool = Field(description="Whether system role")
    is_active: bool = Field(description="Whether role is active")
    created_at: datetime = Field(description="Creation timestamp")
    updated_at: datetime = Field(description="Update timestamp")
class UserRoleAssignRequest(BaseModel):
    """Request body for assigning a role to a user.

    Attributes:
        role_id: Role to assign
        scope: Assignment scope; one of global, team, personal
        scope_id: Team ID if team-scoped
        expires_at: Optional expiration timestamp

    Examples:
        >>> request = UserRoleAssignRequest(
        ...     role_id="role-123",
        ...     scope="team",
        ...     scope_id="team-456"
        ... )
        >>> request.scope
        'team'
    """

    role_id: str = Field(description="Role ID to assign")
    scope: str = Field(pattern="^(global|team|personal)$", description="Assignment scope")
    scope_id: Optional[str] = Field(default=None, description="Team ID if team-scoped")
    expires_at: Optional[datetime] = Field(default=None, description="Optional expiration timestamp")
class UserRoleResponse(BaseModel):
    """Response payload describing a user's role assignment.

    The model is configured with ``from_attributes=True``.

    Attributes:
        id: Assignment identifier
        user_email: User email
        role_id: Role identifier
        role_name: Role name for convenience
        scope: Assignment scope
        scope_id: Team ID if applicable
        granted_by: Who granted the role
        granted_at: When role was granted
        expires_at: Optional expiration
        is_active: Whether assignment is active

    Examples:
        >>> user_role = UserRoleResponse(
        ...     id="assignment-123",
        ...     user_email="user@example.com",
        ...     role_id="role-456",
        ...     role_name="team_admin",
        ...     scope="team",
        ...     scope_id="team-789",
        ...     granted_by="admin@example.com",
        ...     granted_at=datetime.now(),
        ...     is_active=True
        ... )
        >>> user_role.scope
        'team'
    """

    model_config = ConfigDict(from_attributes=True)

    id: str = Field(description="Assignment identifier")
    user_email: str = Field(description="User email")
    role_id: str = Field(description="Role identifier")
    role_name: Optional[str] = Field(default=None, description="Role name for convenience")
    scope: str = Field(description="Assignment scope")
    scope_id: Optional[str] = Field(default=None, description="Team ID if applicable")
    granted_by: str = Field(description="Who granted the role")
    granted_at: datetime = Field(description="When role was granted")
    expires_at: Optional[datetime] = Field(default=None, description="Optional expiration")
    is_active: bool = Field(description="Whether assignment is active")
class PermissionCheckRequest(BaseModel):
    """Request body for checking whether a user holds a permission.

    Attributes:
        user_email: User to check
        permission: Permission to verify
        resource_type: Optional resource type
        resource_id: Optional resource ID
        team_id: Optional team context

    Examples:
        >>> request = PermissionCheckRequest(
        ...     user_email="user@example.com",
        ...     permission="tools.create",
        ...     resource_type="tools"
        ... )
        >>> request.permission
        'tools.create'
    """

    user_email: str = Field(description="User email to check")
    permission: str = Field(description="Permission to verify")
    resource_type: Optional[str] = Field(default=None, description="Resource type")
    resource_id: Optional[str] = Field(default=None, description="Resource ID")
    team_id: Optional[str] = Field(default=None, description="Team context")
class PermissionCheckResponse(BaseModel):
    """Response payload reporting the outcome of a permission check.

    Attributes:
        user_email: User checked
        permission: Permission checked
        granted: Whether permission was granted
        checked_at: When check was performed
        checked_by: Who performed the check

    Examples:
        >>> response = PermissionCheckResponse(
        ...     user_email="user@example.com",
        ...     permission="tools.create",
        ...     granted=True,
        ...     checked_at=datetime.now(),
        ...     checked_by="admin@example.com"
        ... )
        >>> response.granted
        True
    """

    user_email: str = Field(description="User email checked")
    permission: str = Field(description="Permission checked")
    granted: bool = Field(description="Whether permission was granted")
    checked_at: datetime = Field(description="When check was performed")
    checked_by: str = Field(description="Who performed the check")
class PermissionListResponse(BaseModel):
    """Response payload listing the available permissions.

    Attributes:
        all_permissions: List of all available permissions
        permissions_by_resource: Permissions grouped by resource type
        total_count: Total number of permissions

    Examples:
        >>> response = PermissionListResponse(
        ...     all_permissions=["users.create", "tools.read"],
        ...     permissions_by_resource={"users": ["users.create"], "tools": ["tools.read"]},
        ...     total_count=2
        ... )
        >>> response.total_count
        2
    """

    all_permissions: List[str] = Field(description="All available permissions")
    permissions_by_resource: Dict[str, List[str]] = Field(description="Permissions by resource type")
    total_count: int = Field(description="Total number of permissions")
6687# ==============================================================================
6688# SSO Authentication Schemas
6689# ==============================================================================
class SSOProviderResponse(BaseModelWithConfigDict):
    """Response payload describing an SSO provider.

    Attributes:
        id: Provider identifier (e.g., 'github', 'google')
        name: Provider name
        display_name: Human-readable display name
        provider_type: Type of provider ('oauth2', 'oidc'), if given
        is_enabled: Whether provider is currently enabled, if given
        authorization_url: OAuth authorization URL (optional)

    Examples:
        >>> provider = SSOProviderResponse(
        ...     id="github",
        ...     name="github",
        ...     display_name="GitHub",
        ...     provider_type="oauth2",
        ...     is_enabled=True
        ... )
        >>> provider.id
        'github'
    """

    id: str = Field(description="Provider identifier")
    name: str = Field(description="Provider name")
    display_name: str = Field(description="Human-readable display name")
    provider_type: Optional[str] = Field(default=None, description="Provider type (oauth2, oidc)")
    is_enabled: Optional[bool] = Field(default=None, description="Whether provider is enabled")
    authorization_url: Optional[str] = Field(default=None, description="OAuth authorization URL")
class SSOLoginResponse(BaseModelWithConfigDict):
    """Payload returned when an SSO login is initiated.

    Both fields are required.

    Attributes:
        authorization_url: URL to redirect user for authentication
        state: CSRF state parameter for validation

    Examples:
        >>> login = SSOLoginResponse(
        ...     authorization_url="https://github.com/login/oauth/authorize?...",
        ...     state="csrf-token-123"
        ... )
        >>> "github.com" in login.authorization_url
        True
    """

    authorization_url: str = Field(description="OAuth authorization URL")
    state: str = Field(description="CSRF state parameter")
class SSOCallbackResponse(BaseModelWithConfigDict):
    """Payload returned from the SSO authentication callback.

    ``token_type`` falls back to ``"bearer"`` when omitted; every other field
    is required.

    Attributes:
        access_token: JWT access token for authenticated user
        token_type: Token type (defaults to 'bearer')
        expires_in: Token expiration time in seconds
        user: User information from SSO provider

    Examples:
        >>> callback = SSOCallbackResponse(
        ...     access_token="jwt.token.here",
        ...     token_type="bearer",
        ...     expires_in=3600,
        ...     user={"email": "user@example.com", "full_name": "User"}
        ... )
        >>> callback.token_type
        'bearer'
    """

    access_token: str = Field(description="JWT access token")
    token_type: str = Field(default="bearer", description="Token type")
    expires_in: int = Field(description="Token expiration in seconds")
    user: Dict[str, Any] = Field(description="User information")
6769# gRPC Service schemas
class GrpcServiceCreate(BaseModel):
    """Schema for creating a new gRPC service.

    ``name`` and ``target`` are required. Three fields are validated on input:
    the name is checked by ``SecurityValidator.validate_name``, the target must
    have non-empty text on both sides of its last colon, and a description
    longer than ``SecurityValidator.MAX_DESCRIPTION_LENGTH`` is truncated to
    that length before being sanitized.
    """

    name: str = Field(..., min_length=1, max_length=255, description="Unique name for the gRPC service")
    target: str = Field(..., description="gRPC server target address (host:port)")
    description: Optional[str] = Field(None, description="Description of the gRPC service")
    reflection_enabled: bool = Field(default=True, description="Enable gRPC server reflection")
    tls_enabled: bool = Field(default=False, description="Enable TLS for gRPC connection")
    tls_cert_path: Optional[str] = Field(None, description="Path to TLS certificate file")
    tls_key_path: Optional[str] = Field(None, description="Path to TLS key file")
    grpc_metadata: Dict[str, str] = Field(default_factory=dict, description="gRPC metadata headers")
    tags: List[str] = Field(default_factory=list, description="Tags for categorization")

    # Team scoping fields
    team_id: Optional[str] = Field(None, description="ID of the team that owns this resource")
    owner_email: Optional[str] = Field(None, description="Email of the user who owns this resource")
    visibility: str = Field(default="public", description="Visibility level: private, team, or public")

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Validate service name.

        Args:
            v: Service name to validate

        Returns:
            Validated service name
        """
        return SecurityValidator.validate_name(v, "gRPC service name")

    @field_validator("target")
    @classmethod
    def validate_target(cls, v: str) -> str:
        """Validate target address format (host:port).

        A bare colon check would accept values such as ``":"``, ``"host:"`` or
        ``":50051"``; both sides of the separator must contain text.

        Args:
            v: Target address to validate

        Returns:
            Validated target address (returned unchanged)

        Raises:
            ValueError: If target is not in host:port format
        """
        # Split on the LAST colon so bracketed IPv6 hosts ("[::1]:50051") and
        # scheme-prefixed targets keep their colons inside the host part.
        host, separator, port = v.rpartition(":")
        if not separator or not host.strip() or not port.strip():
            raise ValueError("Target must be in host:port format")
        return v

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Validate description.

        Args:
            v: Description to validate

        Returns:
            Validated and sanitized description, or None when no description was given
        """
        if v is None:
            return None
        max_length = SecurityValidator.MAX_DESCRIPTION_LENGTH
        if len(v) > max_length:
            # Over-long descriptions are truncated rather than rejected.
            logger.info("Description too long, truncated to %s characters.", max_length)
            v = v[:max_length]
        return SecurityValidator.sanitize_display_text(v, "Description")
class GrpcServiceUpdate(BaseModel):
    """Schema for updating an existing gRPC service.

    Every field is optional and defaults to ``None``. When ``name``,
    ``target`` or ``description`` is supplied it is validated: the name by
    ``SecurityValidator.validate_name``, the target by requiring non-empty
    text on both sides of its last colon, and the description by truncating it
    to ``SecurityValidator.MAX_DESCRIPTION_LENGTH`` and sanitizing it.
    """

    name: Optional[str] = Field(None, min_length=1, max_length=255, description="Service name")
    target: Optional[str] = Field(None, description="gRPC server target address")
    description: Optional[str] = Field(None, description="Service description")
    reflection_enabled: Optional[bool] = Field(None, description="Enable server reflection")
    tls_enabled: Optional[bool] = Field(None, description="Enable TLS")
    tls_cert_path: Optional[str] = Field(None, description="TLS certificate path")
    tls_key_path: Optional[str] = Field(None, description="TLS key path")
    grpc_metadata: Optional[Dict[str, str]] = Field(None, description="gRPC metadata headers")
    tags: Optional[List[str]] = Field(None, description="Service tags")
    visibility: Optional[str] = Field(None, description="Visibility level")

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: Optional[str]) -> Optional[str]:
        """Validate service name.

        Args:
            v: Service name to validate

        Returns:
            Validated service name or None
        """
        if v is None:
            return None
        return SecurityValidator.validate_name(v, "gRPC service name")

    @field_validator("target")
    @classmethod
    def validate_target(cls, v: Optional[str]) -> Optional[str]:
        """Validate target address.

        A bare colon check would accept values such as ``":"``, ``"host:"`` or
        ``":50051"``; both sides of the separator must contain text.

        Args:
            v: Target address to validate

        Returns:
            Validated target address (returned unchanged) or None

        Raises:
            ValueError: If target is not in host:port format
        """
        if v is None:
            return None
        # Split on the LAST colon so bracketed IPv6 hosts ("[::1]:50051") and
        # scheme-prefixed targets keep their colons inside the host part.
        host, separator, port = v.rpartition(":")
        if not separator or not host.strip() or not port.strip():
            raise ValueError("Target must be in host:port format")
        return v

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Validate description.

        Args:
            v: Description to validate

        Returns:
            Validated and sanitized description, or None when no description was given
        """
        if v is None:
            return None
        max_length = SecurityValidator.MAX_DESCRIPTION_LENGTH
        if len(v) > max_length:
            # Over-long descriptions are truncated rather than rejected.
            logger.info("Description too long, truncated to %s characters.", max_length)
            v = v[:max_length]
        return SecurityValidator.sanitize_display_text(v, "Description")
class GrpcServiceRead(BaseModel):
    """Read model for a registered gRPC service.

    ``from_attributes`` is enabled, so instances can be built from objects
    exposing the same attribute names as well as from mappings.
    """

    model_config = ConfigDict(from_attributes=True)

    # Identity
    id: str = Field(description="Unique service identifier")
    name: str = Field(description="Service name")
    slug: str = Field(description="URL-safe slug")
    target: str = Field(description="gRPC server target (host:port)")
    description: Optional[str] = Field(default=None, description="Service description")

    # Connection configuration
    reflection_enabled: bool = Field(description="Reflection enabled")
    tls_enabled: bool = Field(description="TLS enabled")
    tls_cert_path: Optional[str] = Field(default=None, description="TLS certificate path")
    tls_key_path: Optional[str] = Field(default=None, description="TLS key path")
    grpc_metadata: Dict[str, str] = Field(default_factory=dict, description="gRPC metadata")

    # Current status
    enabled: bool = Field(description="Service enabled")
    reachable: bool = Field(description="Service reachable")

    # Reflection / discovery results
    service_count: int = Field(default=0, description="Number of gRPC services discovered")
    method_count: int = Field(default=0, description="Number of methods discovered")
    discovered_services: Dict[str, Any] = Field(default_factory=dict, description="Discovered service descriptors")
    last_reflection: Optional[datetime] = Field(default=None, description="Last reflection timestamp")

    # Tags
    tags: List[str] = Field(default_factory=list, description="Service tags")

    # Audit timestamps
    created_at: datetime = Field(description="Creation timestamp")
    updated_at: datetime = Field(description="Last update timestamp")

    # Team scoping
    team_id: Optional[str] = Field(default=None, description="Team ID")
    owner_email: Optional[str] = Field(default=None, description="Owner email")
    visibility: str = Field(default="public", description="Visibility level")
6951# Plugin-related schemas
class PluginSummary(BaseModel):
    """Condensed plugin record used in list views.

    ``name``, ``mode``, ``priority`` and ``status`` are required; the other
    fields carry the defaults shown on each declaration.
    """

    name: str = Field(description="Unique plugin name")
    description: str = Field(default="", description="Plugin description")
    author: str = Field(default="Unknown", description="Plugin author")
    version: str = Field(default="0.0.0", description="Plugin version")
    mode: str = Field(description="Plugin mode: enforce, permissive, or disabled")
    priority: int = Field(description="Plugin execution priority (lower = higher priority)")
    hooks: List[str] = Field(default_factory=list, description="Hook points where plugin executes")
    tags: List[str] = Field(default_factory=list, description="Plugin tags for categorization")
    status: str = Field(description="Plugin status: enabled or disabled")
    config_summary: Dict[str, Any] = Field(default_factory=dict, description="Summary of plugin configuration")
class PluginDetail(PluginSummary):
    """Full plugin record: every ``PluginSummary`` field plus configuration details.

    All fields added here are optional and have defaults.
    """

    kind: str = Field(default="", description="Plugin type or class")
    namespace: Optional[str] = Field(default=None, description="Plugin namespace")
    conditions: List[Any] = Field(default_factory=list, description="Conditions for plugin execution")
    config: Dict[str, Any] = Field(default_factory=dict, description="Full plugin configuration")
    manifest: Optional[Dict[str, Any]] = Field(default=None, description="Plugin manifest information")
class PluginListResponse(BaseModel):
    """Envelope returned by the plugin list endpoint.

    ``plugins`` and ``total`` are required; the two counters default to 0.
    """

    plugins: List[PluginSummary] = Field(description="List of plugins")
    total: int = Field(description="Total number of plugins")
    enabled_count: int = Field(default=0, description="Number of enabled plugins")
    disabled_count: int = Field(default=0, description="Number of disabled plugins")
class PluginStatsResponse(BaseModel):
    """Envelope returned by the plugin statistics endpoint.

    The three totals are required; the per-hook and per-mode breakdowns
    default to empty dictionaries.
    """

    total_plugins: int = Field(description="Total number of plugins")
    enabled_plugins: int = Field(description="Number of enabled plugins")
    disabled_plugins: int = Field(description="Number of disabled plugins")
    plugins_by_hook: Dict[str, int] = Field(default_factory=dict, description="Plugin count by hook type")
    plugins_by_mode: Dict[str, int] = Field(default_factory=dict, description="Plugin count by mode")
6998# MCP Server Catalog Schemas
class CatalogServer(BaseModel):
    """One entry of the MCP server catalog.

    Identity and descriptive fields (``id`` through ``description``) are
    required; flags, tags and optional URLs have defaults.
    """

    # Required descriptive fields
    id: str = Field(description="Unique identifier for the catalog server")
    name: str = Field(description="Display name of the server")
    category: str = Field(description="Server category (e.g., Project Management, Software Development)")
    url: str = Field(description="Server endpoint URL")
    auth_type: str = Field(description="Authentication type (e.g., OAuth2.1, API Key, Open)")
    provider: str = Field(description="Provider/vendor name")
    description: str = Field(description="Server description")

    # Optional metadata
    requires_api_key: bool = Field(default=False, description="Whether API key is required")
    secure: bool = Field(default=False, description="Whether additional security is required")
    tags: List[str] = Field(default_factory=list, description="Tags for categorization")
    transport: Optional[str] = Field(default=None, description="Transport type: SSE, STREAMABLEHTTP, or WEBSOCKET")
    logo_url: Optional[str] = Field(default=None, description="URL to server logo/icon")
    documentation_url: Optional[str] = Field(default=None, description="URL to server documentation")

    # Registration / availability state
    is_registered: bool = Field(default=False, description="Whether server is already registered")
    is_available: bool = Field(default=True, description="Whether server is currently available")
    requires_oauth_config: bool = Field(default=False, description="Whether server is registered but needs OAuth configuration")
class CatalogServerRegisterRequest(BaseModel):
    """Request body for registering a catalog server.

    Only ``server_id`` is required; the custom name and credentials are optional.
    """

    server_id: str = Field(description="Catalog server ID to register")
    name: Optional[str] = Field(default=None, description="Optional custom name for the server")
    api_key: Optional[str] = Field(default=None, description="API key if required")
    oauth_credentials: Optional[Dict[str, Any]] = Field(default=None, description="OAuth credentials if required")
class CatalogServerRegisterResponse(BaseModel):
    """Result of a catalog server registration attempt.

    ``error`` defaults to ``None`` and ``oauth_required`` to ``False``.
    """

    success: bool = Field(description="Whether registration was successful")
    server_id: str = Field(description="ID of the registered server in the system")
    message: str = Field(description="Status message")
    error: Optional[str] = Field(default=None, description="Error message if registration failed")
    oauth_required: bool = Field(default=False, description="Whether OAuth configuration is required before activation")
class CatalogServerStatusRequest(BaseModel):
    """Request body for a catalog server status check; carries only the server ID."""

    server_id: str = Field(description="Catalog server ID to check")
class CatalogServerStatusResponse(BaseModel):
    """Result of a catalog server status check.

    The server ID and the two boolean flags are required; the timing and error
    details default to ``None``.
    """

    server_id: str = Field(description="Catalog server ID")
    is_available: bool = Field(description="Whether server is reachable")
    is_registered: bool = Field(description="Whether server is registered")
    last_checked: Optional[datetime] = Field(default=None, description="Last health check timestamp")
    response_time_ms: Optional[float] = Field(default=None, description="Response time in milliseconds")
    error: Optional[str] = Field(default=None, description="Error message if check failed")
class CatalogListRequest(BaseModel):
    """Request to list catalog servers.

    All filters are optional. ``limit`` must be at least 1 and ``offset`` must
    not be negative, so a malformed pagination window is rejected at
    validation time instead of reaching the listing logic.
    """

    # Filters
    category: Optional[str] = Field(None, description="Filter by category")
    auth_type: Optional[str] = Field(None, description="Filter by auth type")
    provider: Optional[str] = Field(None, description="Filter by provider")
    search: Optional[str] = Field(None, description="Search term for name/description")
    tags: Optional[List[str]] = Field(None, description="Filter by tags")
    show_registered_only: bool = Field(default=False, description="Show only registered servers")
    show_available_only: bool = Field(default=True, description="Show only available servers")

    # Pagination window (bounded below, like the other paginated request schemas in this module)
    limit: int = Field(default=100, ge=1, description="Maximum number of results")
    offset: int = Field(default=0, ge=0, description="Offset for pagination")
class CatalogListResponse(BaseModel):
    """Catalog listing result together with the available filter values.

    Everything except ``all_tags`` (which defaults to an empty list) is required.
    """

    servers: List[CatalogServer] = Field(description="List of catalog servers")
    total: int = Field(description="Total number of matching servers")
    categories: List[str] = Field(description="Available categories")
    auth_types: List[str] = Field(description="Available auth types")
    providers: List[str] = Field(description="Available providers")
    all_tags: List[str] = Field(default_factory=list, description="All available tags")
class CatalogBulkRegisterRequest(BaseModel):
    """Request body for registering several catalog servers at once.

    ``skip_errors`` defaults to ``True``.
    """

    server_ids: List[str] = Field(description="List of catalog server IDs to register")
    skip_errors: bool = Field(default=True, description="Continue on error")
class CatalogBulkRegisterResponse(BaseModel):
    """Outcome of a bulk catalog registration; all four fields are required."""

    successful: List[str] = Field(description="Successfully registered server IDs")
    failed: List[Dict[str, str]] = Field(description="Failed registrations with error messages")
    total_attempted: int = Field(description="Total servers attempted")
    total_successful: int = Field(description="Total successful registrations")
7099# ===================================
7100# Pagination Schemas
7101# ===================================
class PaginationMeta(BaseModel):
    """Counts and position information for a paginated result.

    ``page`` and ``per_page`` must be at least 1; ``total_items`` and
    ``total_pages`` must be non-negative. The cursor fields default to ``None``.

    Attributes:
        page: Current page number (1-indexed)
        per_page: Items per page
        total_items: Total number of items across all pages
        total_pages: Total number of pages
        has_next: Whether there is a next page
        has_prev: Whether there is a previous page
        next_cursor: Cursor for next page (cursor-based only)
        prev_cursor: Cursor for previous page (cursor-based only)

    Examples:
        >>> meta = PaginationMeta(
        ...     page=2,
        ...     per_page=50,
        ...     total_items=250,
        ...     total_pages=5,
        ...     has_next=True,
        ...     has_prev=True
        ... )
        >>> meta.page
        2
        >>> meta.total_pages
        5
    """

    page: int = Field(ge=1, description="Current page number (1-indexed)")
    per_page: int = Field(ge=1, description="Items per page")
    total_items: int = Field(ge=0, description="Total number of items")
    total_pages: int = Field(ge=0, description="Total number of pages")
    has_next: bool = Field(description="Whether there is a next page")
    has_prev: bool = Field(description="Whether there is a previous page")
    next_cursor: Optional[str] = Field(default=None, description="Cursor for next page (cursor-based only)")
    prev_cursor: Optional[str] = Field(default=None, description="Cursor for previous page (cursor-based only)")
class PaginationLinks(BaseModel):
    """URLs for moving between pages of a paginated result.

    ``self``, ``first`` and ``last`` are required; ``next`` and ``prev``
    default to ``None``.

    Attributes:
        self: Current page URL
        first: First page URL
        last: Last page URL
        next: Next page URL (None if no next page)
        prev: Previous page URL (None if no previous page)

    Examples:
        >>> links = PaginationLinks(
        ...     self="/admin/tools?page=2&per_page=50",
        ...     first="/admin/tools?page=1&per_page=50",
        ...     last="/admin/tools?page=5&per_page=50",
        ...     next="/admin/tools?page=3&per_page=50",
        ...     prev="/admin/tools?page=1&per_page=50"
        ... )
        >>> links.self
        '/admin/tools?page=2&per_page=50'
    """

    self: str = Field(description="Current page URL")
    first: str = Field(description="First page URL")
    last: str = Field(description="Last page URL")
    next: Optional[str] = Field(default=None, description="Next page URL")
    prev: Optional[str] = Field(default=None, description="Previous page URL")
class PaginatedResponse(BaseModel):
    """Generic envelope for one page of results.

    The page's items live in ``data`` (a list of arbitrary values), alongside
    the pagination metadata and, optionally, navigation links.

    Attributes:
        data: List of items for the current page
        pagination: Pagination metadata (counts, page info)
        links: Navigation links (optional)

    Examples:
        >>> response = PaginatedResponse(
        ...     data=[],
        ...     pagination=PaginationMeta(
        ...         page=1, per_page=50, total_items=0,
        ...         total_pages=0, has_next=False, has_prev=False
        ...     ),
        ...     links=None
        ... )
        >>> response.pagination.page
        1
    """

    data: List[Any] = Field(description="List of items")
    pagination: PaginationMeta = Field(description="Pagination metadata")
    links: Optional[PaginationLinks] = Field(default=None, description="Navigation links")
class PaginationParams(BaseModel):
    """Shared query parameters for paginated endpoints.

    ``page`` starts at 1, ``per_page`` is limited to the range 1-500, and
    ``sort_order`` must match ``asc`` or ``desc`` when given.

    Attributes:
        page: Page number (1-indexed)
        per_page: Items per page
        cursor: Cursor for cursor-based pagination
        sort_by: Field to sort by
        sort_order: Sort order (asc/desc)

    Examples:
        >>> params = PaginationParams(page=1, per_page=50)
        >>> params.page
        1
        >>> params.sort_order
        'desc'
    """

    page: int = Field(default=1, ge=1, description="Page number (1-indexed)")
    per_page: int = Field(default=50, ge=1, le=500, description="Items per page (max 500)")
    cursor: Optional[str] = Field(default=None, description="Cursor for cursor-based pagination")
    sort_by: Optional[str] = Field(default="created_at", description="Sort field")
    sort_order: Optional[str] = Field(default="desc", pattern="^(asc|desc)$", description="Sort order")
7226# ============================================================================
7227# Cursor Pagination Response Schemas (for main API endpoints)
7228# ============================================================================
class CursorPaginatedToolsResponse(BaseModel):
    """One cursor-paginated page of tools.

    ``next_cursor`` is exposed under the alias ``nextCursor`` and defaults to ``None``.
    """

    tools: List["ToolRead"] = Field(description="List of tools for this page")
    next_cursor: Optional[str] = Field(default=None, alias="nextCursor", description="Cursor for the next page, null if no more pages")
class CursorPaginatedServersResponse(BaseModel):
    """One cursor-paginated page of servers.

    ``next_cursor`` is exposed under the alias ``nextCursor`` and defaults to ``None``.
    """

    servers: List["ServerRead"] = Field(description="List of servers for this page")
    next_cursor: Optional[str] = Field(default=None, alias="nextCursor", description="Cursor for the next page, null if no more pages")
class CursorPaginatedGatewaysResponse(BaseModel):
    """One cursor-paginated page of gateways.

    ``next_cursor`` is exposed under the alias ``nextCursor`` and defaults to ``None``.
    """

    gateways: List["GatewayRead"] = Field(description="List of gateways for this page")
    next_cursor: Optional[str] = Field(default=None, alias="nextCursor", description="Cursor for the next page, null if no more pages")
class CursorPaginatedResourcesResponse(BaseModel):
    """One cursor-paginated page of resources.

    ``next_cursor`` is exposed under the alias ``nextCursor`` and defaults to ``None``.
    """

    resources: List["ResourceRead"] = Field(description="List of resources for this page")
    next_cursor: Optional[str] = Field(default=None, alias="nextCursor", description="Cursor for the next page, null if no more pages")
class CursorPaginatedPromptsResponse(BaseModel):
    """One cursor-paginated page of prompts.

    ``next_cursor`` is exposed under the alias ``nextCursor`` and defaults to ``None``.
    """

    prompts: List["PromptRead"] = Field(description="List of prompts for this page")
    next_cursor: Optional[str] = Field(default=None, alias="nextCursor", description="Cursor for the next page, null if no more pages")
class CursorPaginatedA2AAgentsResponse(BaseModel):
    """One cursor-paginated page of A2A agents.

    ``next_cursor`` is exposed under the alias ``nextCursor`` and defaults to ``None``.
    """

    agents: List["A2AAgentRead"] = Field(description="List of A2A agents for this page")
    next_cursor: Optional[str] = Field(default=None, alias="nextCursor", description="Cursor for the next page, null if no more pages")
class CursorPaginatedTeamsResponse(BaseModel):
    """One cursor-paginated page of teams.

    ``next_cursor`` is exposed under the alias ``nextCursor`` and defaults to ``None``.
    """

    teams: List["TeamResponse"] = Field(description="List of teams for this page")
    next_cursor: Optional[str] = Field(default=None, alias="nextCursor", description="Cursor for the next page, null if no more pages")
class CursorPaginatedUsersResponse(BaseModel):
    """One cursor-paginated page of users.

    ``next_cursor`` is exposed under the alias ``nextCursor`` and defaults to ``None``.
    """

    users: List["EmailUserResponse"] = Field(description="List of users for this page")
    next_cursor: Optional[str] = Field(default=None, alias="nextCursor", description="Cursor for the next page, null if no more pages")
7287# ============================================================================
7288# Observability Schemas (OpenTelemetry-style traces, spans, events, metrics)
7289# ============================================================================
class ObservabilityTraceBase(BaseModel):
    """Fields shared by the create and read schemas for observability traces.

    Only ``name`` and ``start_time`` are required. ``status`` defaults to
    ``"unset"`` and the two attribute mappings default to empty dictionaries.
    """

    # Core trace data
    name: str = Field(description="Trace name (e.g., 'POST /tools/invoke')")
    start_time: datetime = Field(description="Trace start timestamp")
    end_time: Optional[datetime] = Field(default=None, description="Trace end timestamp")
    duration_ms: Optional[float] = Field(default=None, description="Total duration in milliseconds")
    status: str = Field(default="unset", description="Trace status (unset, ok, error)")
    status_message: Optional[str] = Field(default=None, description="Status message or error description")

    # HTTP request context
    http_method: Optional[str] = Field(default=None, description="HTTP method")
    http_url: Optional[str] = Field(default=None, description="HTTP URL")
    http_status_code: Optional[int] = Field(default=None, description="HTTP status code")

    # Client context
    user_email: Optional[str] = Field(default=None, description="User email")
    user_agent: Optional[str] = Field(default=None, description="User agent string")
    ip_address: Optional[str] = Field(default=None, description="Client IP address")

    # Free-form attributes
    attributes: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional trace attributes")
    resource_attributes: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Resource attributes")
class ObservabilityTraceCreate(ObservabilityTraceBase):
    """Input schema for creating a trace; adds an optional ``trace_id`` to the base fields."""

    trace_id: Optional[str] = Field(default=None, description="Trace ID (generated if not provided)")
class ObservabilityTraceUpdate(BaseModel):
    """Input schema for updating a trace.

    Every field is optional and defaults to ``None``.
    """

    end_time: Optional[datetime] = Field(default=None)
    duration_ms: Optional[float] = Field(default=None)
    status: Optional[str] = Field(default=None)
    status_message: Optional[str] = Field(default=None)
    http_status_code: Optional[int] = Field(default=None)
    attributes: Optional[Dict[str, Any]] = Field(default=None)
class ObservabilityTraceRead(ObservabilityTraceBase):
    """Output schema for a stored trace.

    Adds the required ``trace_id`` and ``created_at`` to the base fields and
    enables ``from_attributes`` so instances can be built from attribute-bearing objects.
    """

    model_config = ConfigDict(from_attributes=True)

    trace_id: str = Field(description="Trace ID")
    created_at: datetime = Field(description="Creation timestamp")
class ObservabilitySpanBase(BaseModel):
    """Fields shared by the create and read schemas for observability spans.

    ``trace_id``, ``name`` and ``start_time`` are required. ``kind`` defaults
    to ``"internal"``, ``status`` to ``"unset"`` and ``attributes`` to an
    empty dictionary.
    """

    # Position within the trace
    trace_id: str = Field(description="Parent trace ID")
    parent_span_id: Optional[str] = Field(default=None, description="Parent span ID (for nested spans)")

    # Core span data
    name: str = Field(description="Span name (e.g., 'database_query', 'tool_invocation')")
    kind: str = Field(default="internal", description="Span kind (internal, server, client, producer, consumer)")
    start_time: datetime = Field(description="Span start timestamp")
    end_time: Optional[datetime] = Field(default=None, description="Span end timestamp")
    duration_ms: Optional[float] = Field(default=None, description="Span duration in milliseconds")
    status: str = Field(default="unset", description="Span status (unset, ok, error)")
    status_message: Optional[str] = Field(default=None, description="Status message")
    attributes: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Span attributes")

    # Resource the span relates to
    resource_name: Optional[str] = Field(default=None, description="Resource name")
    resource_type: Optional[str] = Field(default=None, description="Resource type (tool, resource, prompt, gateway, a2a_agent)")
    resource_id: Optional[str] = Field(default=None, description="Resource ID")
class ObservabilitySpanCreate(ObservabilitySpanBase):
    """Input schema for creating a span; adds an optional ``span_id`` to the base fields."""

    span_id: Optional[str] = Field(default=None, description="Span ID (generated if not provided)")
class ObservabilitySpanUpdate(BaseModel):
    """Input schema for updating a span.

    Every field is optional and defaults to ``None``.
    """

    end_time: Optional[datetime] = Field(default=None)
    duration_ms: Optional[float] = Field(default=None)
    status: Optional[str] = Field(default=None)
    status_message: Optional[str] = Field(default=None)
    attributes: Optional[Dict[str, Any]] = Field(default=None)
class ObservabilitySpanRead(ObservabilitySpanBase):
    """Output schema for a stored span.

    Adds the required ``span_id`` and ``created_at`` to the base fields and
    enables ``from_attributes`` so instances can be built from attribute-bearing objects.
    """

    model_config = ConfigDict(from_attributes=True)

    span_id: str = Field(description="Span ID")
    created_at: datetime = Field(description="Creation timestamp")
class ObservabilityEventBase(BaseModel):
    """Fields shared by the create and read schemas for observability events.

    ``span_id``, ``name`` and ``timestamp`` are required; ``attributes``
    defaults to an empty dictionary and the remaining fields to ``None``.
    """

    span_id: str = Field(description="Parent span ID")
    name: str = Field(description="Event name (e.g., 'exception', 'log', 'checkpoint')")
    timestamp: datetime = Field(description="Event timestamp")
    attributes: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Event attributes")
    severity: Optional[str] = Field(default=None, description="Log severity (debug, info, warning, error, critical)")
    message: Optional[str] = Field(default=None, description="Event message")

    # Exception details
    exception_type: Optional[str] = Field(default=None, description="Exception class name")
    exception_message: Optional[str] = Field(default=None, description="Exception message")
    exception_stacktrace: Optional[str] = Field(default=None, description="Exception stacktrace")
class ObservabilityEventCreate(ObservabilityEventBase):
    """Input schema for creating an event; identical to ``ObservabilityEventBase`` with no extra fields."""
class ObservabilityEventRead(ObservabilityEventBase):
    """Output schema for a stored event.

    Adds the required integer ``id`` and ``created_at`` to the base fields and
    enables ``from_attributes`` so instances can be built from attribute-bearing objects.
    """

    model_config = ConfigDict(from_attributes=True)

    id: int = Field(description="Event ID")
    created_at: datetime = Field(description="Creation timestamp")
class ObservabilityMetricBase(BaseModel):
    """Fields shared by the create and read schemas for observability metrics.

    ``name``, ``metric_type``, ``value`` and ``timestamp`` are required;
    ``attributes`` defaults to an empty dictionary and the remaining fields to ``None``.
    """

    name: str = Field(description="Metric name (e.g., 'http.request.duration', 'tool.invocation.count')")
    metric_type: str = Field(description="Metric type (counter, gauge, histogram)")
    value: float = Field(description="Metric value")
    timestamp: datetime = Field(description="Metric timestamp")
    unit: Optional[str] = Field(default=None, description="Metric unit (ms, count, bytes, etc.)")
    attributes: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Metric attributes/labels")

    # Optional links to a resource and a trace
    resource_type: Optional[str] = Field(default=None, description="Resource type")
    resource_id: Optional[str] = Field(default=None, description="Resource ID")
    trace_id: Optional[str] = Field(default=None, description="Associated trace ID")
class ObservabilityMetricCreate(ObservabilityMetricBase):
    """Input schema for creating a metric; identical to ``ObservabilityMetricBase`` with no extra fields."""
class ObservabilityMetricRead(ObservabilityMetricBase):
    """Output schema for a stored metric.

    Adds the required integer ``id`` and ``created_at`` to the base fields and
    enables ``from_attributes`` so instances can be built from attribute-bearing objects.
    """

    model_config = ConfigDict(from_attributes=True)

    id: int = Field(description="Metric ID")
    created_at: datetime = Field(description="Creation timestamp")
class ObservabilityTraceWithSpans(ObservabilityTraceRead):
    """Trace read schema extended with its spans (an empty list by default)."""

    spans: List[ObservabilitySpanRead] = Field(description="List of spans in this trace", default_factory=list)
class ObservabilitySpanWithEvents(ObservabilitySpanRead):
    """Span read schema extended with its events (an empty list by default)."""

    events: List[ObservabilityEventRead] = Field(description="List of events in this span", default_factory=list)
class ObservabilityQueryParams(BaseModel):
    """Filters and paging window for observability queries.

    Every filter is optional. ``limit`` is restricted to 1-1000 (default 100)
    and ``offset`` must be non-negative (default 0).
    """

    # Time window
    start_time: Optional[datetime] = Field(default=None, description="Filter traces/spans/metrics after this time")
    end_time: Optional[datetime] = Field(default=None, description="Filter traces/spans/metrics before this time")

    # Attribute filters
    status: Optional[str] = Field(default=None, description="Filter by status (ok, error, unset)")
    http_status_code: Optional[int] = Field(default=None, description="Filter by HTTP status code")
    user_email: Optional[str] = Field(default=None, description="Filter by user email")
    resource_type: Optional[str] = Field(default=None, description="Filter by resource type")
    resource_name: Optional[str] = Field(default=None, description="Filter by resource name")
    trace_id: Optional[str] = Field(default=None, description="Filter by trace ID")

    # Paging
    limit: int = Field(default=100, ge=1, le=1000, description="Maximum number of results")
    offset: int = Field(default=0, ge=0, description="Result offset for pagination")
7461# --- Performance Monitoring Schemas ---
class WorkerMetrics(BaseModel):
    """Resource metrics reported for one worker process.

    Attributes:
        pid: Process ID (required).
        cpu_percent: CPU utilization percentage (required).
        memory_rss_mb: Resident Set Size memory in MB (required).
        memory_vms_mb: Virtual Memory Size in MB (required).
        threads: Number of threads (required).
        connections: Number of network connections; default 0.
        open_fds: Number of open file descriptors, if known.
        status: Worker status; default "running".
        create_time: Worker start time, if known.
        uptime_seconds: Worker uptime in seconds, if known.
    """

    # Required measurements
    pid: int = Field(description="Process ID")
    cpu_percent: float = Field(description="CPU utilization percentage")
    memory_rss_mb: float = Field(description="Resident Set Size memory in MB")
    memory_vms_mb: float = Field(description="Virtual Memory Size in MB")
    threads: int = Field(description="Number of threads")

    # Optional / defaulted measurements
    connections: int = Field(default=0, description="Number of network connections")
    open_fds: Optional[int] = Field(default=None, description="Number of open file descriptors")
    status: str = Field(default="running", description="Worker status")
    create_time: Optional[datetime] = Field(default=None, description="Worker start time")
    uptime_seconds: Optional[int] = Field(default=None, description="Worker uptime in seconds")
class SystemMetricsSchema(BaseModel):
    """Host-level resource metrics: CPU, memory, disk, network and boot time.

    Attributes:
        cpu_percent: Total CPU utilization percentage (required).
        cpu_count: Number of logical CPU cores (required).
        cpu_freq_mhz: Current CPU frequency in MHz, if available.
        load_avg_1m: 1-minute load average, if available.
        load_avg_5m: 5-minute load average, if available.
        load_avg_15m: 15-minute load average, if available.
        memory_total_mb: Total physical memory in MB (required).
        memory_used_mb: Used physical memory in MB (required).
        memory_available_mb: Available memory in MB (required).
        memory_percent: Memory utilization percentage (required).
        swap_total_mb: Total swap space in MB; default 0.
        swap_used_mb: Used swap space in MB; default 0.
        disk_total_gb: Total disk space in GB (required).
        disk_used_gb: Used disk space in GB (required).
        disk_percent: Disk utilization percentage (required).
        network_bytes_sent: Total network bytes sent; default 0.
        network_bytes_recv: Total network bytes received; default 0.
        network_connections: Active network connections; default 0.
        boot_time: System boot time, if available.
    """

    # --- CPU ---
    cpu_percent: float = Field(description="Total CPU utilization percentage")
    cpu_count: int = Field(description="Number of logical CPU cores")
    cpu_freq_mhz: Optional[float] = Field(default=None, description="Current CPU frequency in MHz")
    load_avg_1m: Optional[float] = Field(default=None, description="1-minute load average")
    load_avg_5m: Optional[float] = Field(default=None, description="5-minute load average")
    load_avg_15m: Optional[float] = Field(default=None, description="15-minute load average")

    # --- Memory ---
    memory_total_mb: int = Field(description="Total physical memory in MB")
    memory_used_mb: int = Field(description="Used physical memory in MB")
    memory_available_mb: int = Field(description="Available memory in MB")
    memory_percent: float = Field(description="Memory utilization percentage")
    swap_total_mb: int = Field(default=0, description="Total swap space in MB")
    swap_used_mb: int = Field(default=0, description="Used swap space in MB")

    # --- Disk ---
    disk_total_gb: float = Field(description="Total disk space in GB")
    disk_used_gb: float = Field(description="Used disk space in GB")
    disk_percent: float = Field(description="Disk utilization percentage")

    # --- Network ---
    network_bytes_sent: int = Field(default=0, description="Total network bytes sent")
    network_bytes_recv: int = Field(default=0, description="Total network bytes received")
    network_connections: int = Field(default=0, description="Active network connections")

    # --- Process info ---
    boot_time: Optional[datetime] = Field(default=None, description="System boot time")
class RequestMetricsSchema(BaseModel):
    """Performance metrics for HTTP requests.

    Every field defaults to 0.

    Attributes:
        requests_total: Total HTTP requests.
        requests_per_second: Current request rate.
        requests_1xx: 1xx informational responses.
        requests_2xx: 2xx success responses.
        requests_3xx: 3xx redirect responses.
        requests_4xx: 4xx client error responses.
        requests_5xx: 5xx server error responses.
        response_time_avg_ms: Average response time in ms.
        response_time_p50_ms: 50th percentile response time.
        response_time_p95_ms: 95th percentile response time.
        response_time_p99_ms: 99th percentile response time.
        error_rate: Percentage of 4xx/5xx responses.
        active_requests: Currently processing requests.
    """

    # Request counters, overall and per status class
    requests_total: int = Field(default=0, description="Total HTTP requests")
    requests_per_second: float = Field(default=0, description="Current request rate")
    requests_1xx: int = Field(default=0, description="1xx informational responses")
    requests_2xx: int = Field(default=0, description="2xx success responses")
    requests_3xx: int = Field(default=0, description="3xx redirect responses")
    requests_4xx: int = Field(default=0, description="4xx client error responses")
    requests_5xx: int = Field(default=0, description="5xx server error responses")

    # Response time: average and percentiles
    response_time_avg_ms: float = Field(default=0, description="Average response time in ms")
    response_time_p50_ms: float = Field(default=0, description="50th percentile response time")
    response_time_p95_ms: float = Field(default=0, description="95th percentile response time")
    response_time_p99_ms: float = Field(default=0, description="99th percentile response time")

    # Error rate
    error_rate: float = Field(default=0, description="Percentage of 4xx/5xx responses")

    # In-flight requests
    active_requests: int = Field(default=0, description="Currently processing requests")
class DatabaseMetricsSchema(BaseModel):
    """Metrics describing the database connection pool and query activity.

    Every field defaults to 0.

    Attributes:
        pool_size: Connection pool size.
        connections_in_use: Active connections.
        connections_available: Available connections.
        overflow: Overflow connections.
        query_count: Total queries executed.
        query_avg_time_ms: Average query time in ms.
    """

    # Pool state
    pool_size: int = Field(default=0, description="Connection pool size")
    connections_in_use: int = Field(default=0, description="Active connections")
    connections_available: int = Field(default=0, description="Available connections")
    overflow: int = Field(default=0, description="Overflow connections")

    # Query statistics
    query_count: int = Field(default=0, description="Total queries executed")
    query_avg_time_ms: float = Field(default=0, description="Average query time in ms")
class CacheMetricsSchema(BaseModel):
    """Metrics describing the Redis cache.

    Attributes:
        connected: Redis connection status; default False.
        version: Redis version, if known.
        used_memory_mb: Redis memory usage in MB; default 0.
        connected_clients: Connected Redis clients; default 0.
        ops_per_second: Redis operations per second; default 0.
        hit_rate: Cache hit rate percentage; default 0.
        keyspace_hits: Successful key lookups; default 0.
        keyspace_misses: Failed key lookups; default 0.
    """

    # Connection info
    connected: bool = Field(default=False, description="Redis connection status")
    version: Optional[str] = Field(default=None, description="Redis version")

    # Usage statistics
    used_memory_mb: float = Field(default=0, description="Redis memory usage in MB")
    connected_clients: int = Field(default=0, description="Connected Redis clients")
    ops_per_second: int = Field(default=0, description="Redis operations per second")

    # Hit/miss statistics
    hit_rate: float = Field(default=0, description="Cache hit rate percentage")
    keyspace_hits: int = Field(default=0, description="Successful key lookups")
    keyspace_misses: int = Field(default=0, description="Failed key lookups")
class GunicornMetricsSchema(BaseModel):
    """Metrics describing the Gunicorn server.

    Attributes:
        master_pid: Master process PID, if known.
        workers_total: Total configured workers; default 0.
        workers_active: Currently active workers; default 0.
        workers_idle: Idle workers; default 0.
        max_requests: Max requests before worker restart; default 0.
    """

    master_pid: Optional[int] = Field(default=None, description="Master process PID")

    # Worker counts
    workers_total: int = Field(default=0, description="Total configured workers")
    workers_active: int = Field(default=0, description="Currently active workers")
    workers_idle: int = Field(default=0, description="Idle workers")

    max_requests: int = Field(default=0, description="Max requests before worker restart")
class PerformanceSnapshotCreate(BaseModel):
    """Request schema used when creating a performance snapshot.

    Attributes:
        host: Hostname (required).
        worker_id: Worker identifier, if any.
        metrics_json: Serialized metrics data (required).
    """

    host: str = Field(description="Hostname")
    worker_id: Optional[str] = Field(default=None, description="Worker identifier")
    metrics_json: Dict[str, Any] = Field(description="Serialized metrics data")
class PerformanceSnapshotRead(BaseModel):
    """Response schema used when reading a performance snapshot.

    Attributes:
        id: Snapshot ID (required).
        timestamp: Snapshot timestamp (required).
        host: Hostname (required).
        worker_id: Worker identifier, if any.
        metrics_json: Serialized metrics data (required).
        created_at: Creation timestamp (required).
    """

    # Lets Pydantic populate the model from object attributes as well as mappings.
    model_config = ConfigDict(from_attributes=True)

    id: int = Field(description="Snapshot ID")
    timestamp: datetime = Field(description="Snapshot timestamp")
    host: str = Field(description="Hostname")
    worker_id: Optional[str] = Field(default=None, description="Worker identifier")
    metrics_json: Dict[str, Any] = Field(description="Serialized metrics data")
    created_at: datetime = Field(description="Creation timestamp")
class PerformanceAggregateBase(BaseModel):
    """Fields shared by the performance aggregate schemas.

    Attributes:
        period_start: Start of aggregation period (required).
        period_end: End of aggregation period (required).
        period_type: Aggregation type (hourly, daily) (required).
        host: Host; ``None`` for cluster-wide aggregates.
        requests_total: Total requests in period; default 0.
        requests_2xx: 2xx responses in period; default 0.
        requests_4xx: 4xx responses in period; default 0.
        requests_5xx: 5xx responses in period; default 0.
        avg_response_time_ms: Average response time; default 0.
        p95_response_time_ms: 95th percentile response time; default 0.
        peak_requests_per_second: Peak request rate; default 0.
        avg_cpu_percent: Average CPU utilization; default 0.
        avg_memory_percent: Average memory utilization; default 0.
        peak_cpu_percent: Peak CPU utilization; default 0.
        peak_memory_percent: Peak memory utilization; default 0.
    """

    # --- Aggregation window ---
    period_start: datetime = Field(description="Start of aggregation period")
    period_end: datetime = Field(description="End of aggregation period")
    period_type: str = Field(description="Aggregation type (hourly, daily)")
    host: Optional[str] = Field(default=None, description="Host (None for cluster-wide)")

    # --- Request aggregates ---
    requests_total: int = Field(default=0, description="Total requests in period")
    requests_2xx: int = Field(default=0, description="2xx responses in period")
    requests_4xx: int = Field(default=0, description="4xx responses in period")
    requests_5xx: int = Field(default=0, description="5xx responses in period")
    avg_response_time_ms: float = Field(default=0, description="Average response time")
    p95_response_time_ms: float = Field(default=0, description="95th percentile response time")
    peak_requests_per_second: float = Field(default=0, description="Peak request rate")

    # --- Resource aggregates ---
    avg_cpu_percent: float = Field(default=0, description="Average CPU utilization")
    avg_memory_percent: float = Field(default=0, description="Average memory utilization")
    peak_cpu_percent: float = Field(default=0, description="Peak CPU utilization")
    peak_memory_percent: float = Field(default=0, description="Peak memory utilization")
class PerformanceAggregateCreate(PerformanceAggregateBase):
    """Request schema used when creating a performance aggregate.

    Declares no fields of its own; every field comes from
    ``PerformanceAggregateBase``.
    """
class PerformanceAggregateRead(PerformanceAggregateBase):
    """Response schema used when reading a performance aggregate.

    Extends ``PerformanceAggregateBase`` with the identifier and creation
    timestamp of the aggregate record.

    Attributes:
        id: Aggregate ID (required).
        created_at: Creation timestamp (required).
    """

    # Lets Pydantic populate the model from object attributes as well as mappings.
    model_config = ConfigDict(from_attributes=True)

    id: int = Field(description="Aggregate ID")
    created_at: datetime = Field(description="Creation timestamp")
class PerformanceDashboard(BaseModel):
    """Aggregated payload holding all data shown on the performance dashboard.

    Attributes:
        timestamp: Dashboard generation timestamp (required).
        uptime_seconds: Application uptime in seconds; default 0.
        host: Current hostname (required).
        system: Current system metrics (required).
        requests: Current request metrics (required).
        database: Current database metrics (required).
        cache: Current cache metrics (required).
        gunicorn: Current Gunicorn metrics (required).
        workers: Per-worker metrics; a new empty list by default.
        cluster_hosts: Known cluster hosts; a new empty list by default.
        is_distributed: Whether running in distributed mode; default False.
    """

    timestamp: datetime = Field(description="Dashboard generation timestamp")
    uptime_seconds: int = Field(default=0, description="Application uptime in seconds")
    host: str = Field(description="Current hostname")

    # --- Current metrics, one section per subsystem ---
    system: SystemMetricsSchema = Field(description="Current system metrics")
    requests: RequestMetricsSchema = Field(description="Current request metrics")
    database: DatabaseMetricsSchema = Field(description="Current database metrics")
    cache: CacheMetricsSchema = Field(description="Current cache metrics")
    gunicorn: GunicornMetricsSchema = Field(description="Current Gunicorn metrics")
    workers: List[WorkerMetrics] = Field(
        description="Per-worker metrics",
        default_factory=list,
    )

    # --- Cluster info (distributed mode) ---
    cluster_hosts: List[str] = Field(
        description="Known cluster hosts",
        default_factory=list,
    )
    is_distributed: bool = Field(default=False, description="Running in distributed mode")
class PerformanceHistoryParams(BaseModel):
    """Filter parameters for querying historical performance data.

    Attributes:
        start_time: Start of time range, if bounded.
        end_time: End of time range, if bounded.
        period_type: Aggregation period (hourly, daily); default "hourly".
        host: Host filter, if any.
        limit: Maximum results; 1..1000, default 168.
    """

    # Time range
    start_time: Optional[datetime] = Field(default=None, description="Start of time range")
    end_time: Optional[datetime] = Field(default=None, description="End of time range")

    # Aggregation and host selection
    period_type: str = Field(default="hourly", description="Aggregation period (hourly, daily)")
    host: Optional[str] = Field(default=None, description="Filter by host")

    # Result cap
    limit: int = Field(
        default=168,
        ge=1,
        le=1000,
        description="Maximum results",
    )
class PerformanceHistoryResponse(BaseModel):
    """Response schema carrying historical performance data.

    Attributes:
        aggregates: Historical aggregates; a new empty list by default.
        period_type: Aggregation period type (required).
        total_count: Total matching records; default 0.
    """

    aggregates: List[PerformanceAggregateRead] = Field(
        description="Historical aggregates",
        default_factory=list,
    )
    period_type: str = Field(description="Aggregation period type")
    total_count: int = Field(default=0, description="Total matching records")