Coverage for mcpgateway / schemas.py: 99%
2827 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/schemas.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7ContextForge Schema Definitions.
8This module provides Pydantic models for request/response validation in ContextForge.
9It implements schemas for:
10- Tool registration and invocation
11- Resource management and subscriptions
12- Prompt templates and arguments
13- Gateway federation
14- RPC message formats
15- Event messages
16- Admin interface
18The schemas ensure proper validation according to the MCP specification while adding
19gateway-specific extensions for federation support.
20"""
22# Standard
23import base64
24from datetime import datetime, timezone
25from enum import Enum
26import logging
27import re
28from typing import Any, Dict, List, Literal, Optional, Pattern, Self, Union
29from urllib.parse import urlparse
31# Third-Party
32import orjson
33from pydantic import AnyHttpUrl, BaseModel, ConfigDict, EmailStr, Field, field_serializer, field_validator, model_serializer, model_validator, SecretStr, ValidationInfo
35# First-Party
36from mcpgateway.common.models import Annotations, ImageContent
37from mcpgateway.common.models import Prompt as MCPPrompt
38from mcpgateway.common.models import Resource as MCPResource
39from mcpgateway.common.models import ResourceContent, TextContent
40from mcpgateway.common.models import Tool as MCPTool
41from mcpgateway.common.oauth import OAUTH_SENSITIVE_KEYS
42from mcpgateway.common.validators import SecurityValidator, validate_core_url
43from mcpgateway.config import settings
44from mcpgateway.utils.base_models import BaseModelWithConfigDict
45from mcpgateway.utils.services_auth import decode_auth, encode_auth
46from mcpgateway.validation.tags import validate_tags_field
48logger = logging.getLogger(__name__)
50# ============================================================================
51# Precompiled regex patterns (compiled once at module load for performance)
52# ============================================================================
53# Note: Only truly static patterns are precompiled here. Settings-based patterns
54# (e.g., from settings.* or SecurityValidator.*) are NOT precompiled because tests
55# override class/settings attributes at runtime via monkeypatch.
56_HOSTNAME_RE: Pattern[str] = re.compile(r"^(https?://)?([a-zA-Z0-9.-]+)(:[0-9]+)?$")
57_SLUG_RE: Pattern[str] = re.compile(r"^[a-z0-9-]+$")
60def encode_datetime(v: datetime) -> str:
61 """
62 Convert a datetime object to an ISO 8601 formatted string.
64 Args:
65 v (datetime): The datetime object to be encoded.
67 Returns:
68 str: The ISO 8601 formatted string representation of the datetime object.
70 Examples:
71 >>> from datetime import datetime, timezone
72 >>> encode_datetime(datetime(2023, 5, 22, 14, 30, 0))
73 '2023-05-22T14:30:00'
74 >>> encode_datetime(datetime(2024, 12, 25, 9, 15, 30))
75 '2024-12-25T09:15:30'
76 >>> encode_datetime(datetime(2025, 1, 1, 0, 0, 0))
77 '2025-01-01T00:00:00'
78 >>> # Test with timezone
79 >>> dt_utc = datetime(2023, 6, 15, 12, 0, 0, tzinfo=timezone.utc)
80 >>> encode_datetime(dt_utc)
81 '2023-06-15T12:00:00+00:00'
82 >>> # Test microseconds
83 >>> dt_micro = datetime(2023, 7, 20, 16, 45, 30, 123456)
84 >>> encode_datetime(dt_micro)
85 '2023-07-20T16:45:30.123456'
86 """
87 return v.isoformat()
90# --- Metrics Schemas ---
93class ToolMetrics(BaseModelWithConfigDict):
94 """
95 Represents the performance and execution statistics for a tool.
97 Attributes:
98 total_executions (int): Total number of tool invocations.
99 successful_executions (int): Number of successful tool invocations.
100 failed_executions (int): Number of failed tool invocations.
101 failure_rate (float): Failure rate (failed invocations / total invocations).
102 min_response_time (Optional[float]): Minimum response time in seconds.
103 max_response_time (Optional[float]): Maximum response time in seconds.
104 avg_response_time (Optional[float]): Average response time in seconds.
105 last_execution_time (Optional[datetime]): Timestamp of the most recent invocation.
107 Examples:
108 >>> from datetime import datetime
109 >>> metrics = ToolMetrics(
110 ... total_executions=100,
111 ... successful_executions=95,
112 ... failed_executions=5,
113 ... failure_rate=0.05,
114 ... min_response_time=0.1,
115 ... max_response_time=2.5,
116 ... avg_response_time=0.8
117 ... )
118 >>> metrics.total_executions
119 100
120 >>> metrics.failure_rate
121 0.05
122 >>> metrics.successful_executions + metrics.failed_executions == metrics.total_executions
123 True
124 >>> # Test with minimal data
125 >>> minimal_metrics = ToolMetrics(
126 ... total_executions=10,
127 ... successful_executions=8,
128 ... failed_executions=2,
129 ... failure_rate=0.2
130 ... )
131 >>> minimal_metrics.min_response_time is None
132 True
133 >>> # Test model dump functionality
134 >>> data = metrics.model_dump()
135 >>> isinstance(data, dict)
136 True
137 >>> data['total_executions']
138 100
139 """
141 total_executions: int = Field(..., description="Total number of tool invocations")
142 successful_executions: int = Field(..., description="Number of successful tool invocations")
143 failed_executions: int = Field(..., description="Number of failed tool invocations")
144 failure_rate: float = Field(..., description="Failure rate (failed invocations / total invocations)")
145 min_response_time: Optional[float] = Field(None, description="Minimum response time in seconds")
146 max_response_time: Optional[float] = Field(None, description="Maximum response time in seconds")
147 avg_response_time: Optional[float] = Field(None, description="Average response time in seconds")
148 last_execution_time: Optional[datetime] = Field(None, description="Timestamp of the most recent invocation")
151class ResourceMetrics(BaseModelWithConfigDict):
152 """
153 Represents the performance and execution statistics for a resource.
155 Attributes:
156 total_executions (int): Total number of resource invocations.
157 successful_executions (int): Number of successful resource invocations.
158 failed_executions (int): Number of failed resource invocations.
159 failure_rate (float): Failure rate (failed invocations / total invocations).
160 min_response_time (Optional[float]): Minimum response time in seconds.
161 max_response_time (Optional[float]): Maximum response time in seconds.
162 avg_response_time (Optional[float]): Average response time in seconds.
163 last_execution_time (Optional[datetime]): Timestamp of the most recent invocation.
164 """
166 total_executions: int = Field(..., description="Total number of resource invocations")
167 successful_executions: int = Field(..., description="Number of successful resource invocations")
168 failed_executions: int = Field(..., description="Number of failed resource invocations")
169 failure_rate: float = Field(..., description="Failure rate (failed invocations / total invocations)")
170 min_response_time: Optional[float] = Field(None, description="Minimum response time in seconds")
171 max_response_time: Optional[float] = Field(None, description="Maximum response time in seconds")
172 avg_response_time: Optional[float] = Field(None, description="Average response time in seconds")
173 last_execution_time: Optional[datetime] = Field(None, description="Timestamp of the most recent invocation")
176class ServerMetrics(BaseModelWithConfigDict):
177 """
178 Represents the performance and execution statistics for a server.
180 Attributes:
181 total_executions (int): Total number of server invocations.
182 successful_executions (int): Number of successful server invocations.
183 failed_executions (int): Number of failed server invocations.
184 failure_rate (float): Failure rate (failed invocations / total invocations).
185 min_response_time (Optional[float]): Minimum response time in seconds.
186 max_response_time (Optional[float]): Maximum response time in seconds.
187 avg_response_time (Optional[float]): Average response time in seconds.
188 last_execution_time (Optional[datetime]): Timestamp of the most recent invocation.
189 """
191 total_executions: int = Field(..., description="Total number of server invocations")
192 successful_executions: int = Field(..., description="Number of successful server invocations")
193 failed_executions: int = Field(..., description="Number of failed server invocations")
194 failure_rate: float = Field(..., description="Failure rate (failed invocations / total invocations)")
195 min_response_time: Optional[float] = Field(None, description="Minimum response time in seconds")
196 max_response_time: Optional[float] = Field(None, description="Maximum response time in seconds")
197 avg_response_time: Optional[float] = Field(None, description="Average response time in seconds")
198 last_execution_time: Optional[datetime] = Field(None, description="Timestamp of the most recent invocation")
201class PromptMetrics(BaseModelWithConfigDict):
202 """
203 Represents the performance and execution statistics for a prompt.
205 Attributes:
206 total_executions (int): Total number of prompt invocations.
207 successful_executions (int): Number of successful prompt invocations.
208 failed_executions (int): Number of failed prompt invocations.
209 failure_rate (float): Failure rate (failed invocations / total invocations).
210 min_response_time (Optional[float]): Minimum response time in seconds.
211 max_response_time (Optional[float]): Maximum response time in seconds.
212 avg_response_time (Optional[float]): Average response time in seconds.
213 last_execution_time (Optional[datetime]): Timestamp of the most recent invocation.
214 """
216 total_executions: int = Field(..., description="Total number of prompt invocations")
217 successful_executions: int = Field(..., description="Number of successful prompt invocations")
218 failed_executions: int = Field(..., description="Number of failed prompt invocations")
219 failure_rate: float = Field(..., description="Failure rate (failed invocations / total invocations)")
220 min_response_time: Optional[float] = Field(None, description="Minimum response time in seconds")
221 max_response_time: Optional[float] = Field(None, description="Maximum response time in seconds")
222 avg_response_time: Optional[float] = Field(None, description="Average response time in seconds")
223 last_execution_time: Optional[datetime] = Field(None, description="Timestamp of the most recent invocation")
226class A2AAgentMetrics(BaseModelWithConfigDict):
227 """
228 Represents the performance and execution statistics for an A2A agent.
230 Attributes:
231 total_executions (int): Total number of agent interactions.
232 successful_executions (int): Number of successful agent interactions.
233 failed_executions (int): Number of failed agent interactions.
234 failure_rate (float): Failure rate (failed interactions / total interactions).
235 min_response_time (Optional[float]): Minimum response time in seconds.
236 max_response_time (Optional[float]): Maximum response time in seconds.
237 avg_response_time (Optional[float]): Average response time in seconds.
238 last_execution_time (Optional[datetime]): Timestamp of the most recent interaction.
239 """
241 total_executions: int = Field(..., description="Total number of agent interactions")
242 successful_executions: int = Field(..., description="Number of successful agent interactions")
243 failed_executions: int = Field(..., description="Number of failed agent interactions")
244 failure_rate: float = Field(..., description="Failure rate (failed interactions / total interactions)")
245 min_response_time: Optional[float] = Field(None, description="Minimum response time in seconds")
246 max_response_time: Optional[float] = Field(None, description="Maximum response time in seconds")
247 avg_response_time: Optional[float] = Field(None, description="Average response time in seconds")
248 last_execution_time: Optional[datetime] = Field(None, description="Timestamp of the most recent interaction")
251class A2AAgentAggregateMetrics(BaseModelWithConfigDict):
252 """
253 Represents aggregated metrics for all A2A agents in the system.
255 This model is used for the /metrics endpoint to provide system-wide A2A agent statistics
256 with consistent camelCase field naming.
258 Attributes:
259 total_agents (int): Total number of A2A agents registered.
260 active_agents (int): Number of currently active A2A agents.
261 total_interactions (int): Total number of agent interactions.
262 successful_interactions (int): Number of successful agent interactions.
263 failed_interactions (int): Number of failed agent interactions.
264 success_rate (float): Success rate as a percentage (0-100).
265 avg_response_time (float): Average response time in seconds.
266 min_response_time (float): Minimum response time in seconds.
267 max_response_time (float): Maximum response time in seconds.
268 """
270 total_agents: int = Field(..., description="Total number of A2A agents registered")
271 active_agents: int = Field(..., description="Number of currently active A2A agents")
272 total_interactions: int = Field(..., description="Total number of agent interactions")
273 successful_interactions: int = Field(..., description="Number of successful agent interactions")
274 failed_interactions: int = Field(..., description="Number of failed agent interactions")
275 success_rate: float = Field(..., description="Success rate as a percentage (0-100)")
276 avg_response_time: float = Field(..., description="Average response time in seconds")
277 min_response_time: float = Field(..., description="Minimum response time in seconds")
278 max_response_time: float = Field(..., description="Maximum response time in seconds")
281class MetricsResponse(BaseModelWithConfigDict):
282 """
283 Response model for the aggregated metrics endpoint.
285 Contains metrics for all entity types with consistent camelCase field names.
286 When A2A metrics are disabled, the a2a_agents key is omitted entirely
287 to preserve backwards compatibility with existing consumers.
288 """
290 tools: ToolMetrics
291 resources: ResourceMetrics
292 servers: ServerMetrics
293 prompts: PromptMetrics
294 a2a_agents: Optional[A2AAgentAggregateMetrics] = None
296 @model_serializer(mode="wrap")
297 def _exclude_none_a2a(self, handler):
298 result = handler(self)
299 if self.a2a_agents is None:
300 result.pop("a2aAgents", None)
301 result.pop("a2a_agents", None)
302 return result
305# --- JSON Path API modifier Schema
308class JsonPathModifier(BaseModelWithConfigDict):
309 """Schema for JSONPath queries.
311 Provides the structure for parsing JSONPath queries and optional mapping.
312 """
314 jsonpath: Optional[str] = Field(None, description="JSONPath expression for querying JSON data.")
315 mapping: Optional[Dict[str, str]] = Field(None, description="Mapping of fields from original data to output.")
318# --- Tool Schemas ---
319# Authentication model
320class AuthenticationValues(BaseModelWithConfigDict):
321 """Schema for all Authentications.
322 Provides the authentication values for different types of authentication.
323 """
325 auth_type: Optional[str] = Field(None, description="Type of authentication: basic, bearer, authheaders or None")
326 auth_value: Optional[str] = Field(None, description="Encoded Authentication values")
328 # Only For tool read and view tool
329 username: Optional[str] = Field("", description="Username for basic authentication")
330 password: Optional[str] = Field("", description="Password for basic authentication")
331 token: Optional[str] = Field("", description="Bearer token for authentication")
332 auth_header_key: Optional[str] = Field("", description="Key for custom headers authentication (legacy single header)")
333 auth_header_value: Optional[str] = Field("", description="Value for custom headers authentication (legacy single header)")
334 authHeaders: Optional[List[Dict[str, str]]] = Field(None, alias="authHeaders", description="List of custom headers for authentication (multi-header format)") # noqa: N815
337class ToolCreate(BaseModel):
338 """
339 Represents the configuration for creating a tool with various attributes and settings.
341 Attributes:
342 model_config (ConfigDict): Configuration for the model.
343 name (str): Unique name for the tool.
344 url (Union[str, AnyHttpUrl]): Tool endpoint URL.
345 description (Optional[str]): Tool description.
346 integration_type (Literal["REST", "MCP"]): Tool integration type - REST for individual endpoints, MCP for gateway-discovered tools.
347 request_type (Literal["GET", "POST", "PUT", "DELETE", "PATCH"]): HTTP method to be used for invoking the tool.
348 headers (Optional[Dict[str, str]]): Additional headers to send when invoking the tool.
349 input_schema (Optional[Dict[str, Any]]): JSON Schema for validating tool parameters. Alias 'inputSchema'.
350 output_schema (Optional[Dict[str, Any]]): JSON Schema for validating tool output. Alias 'outputSchema'.
351 annotations (Optional[Dict[str, Any]]): Tool annotations for behavior hints such as title, readOnlyHint, destructiveHint, idempotentHint, openWorldHint.
352 jsonpath_filter (Optional[str]): JSON modification filter.
353 auth (Optional[AuthenticationValues]): Authentication credentials (Basic or Bearer Token or custom headers) if required.
354 gateway_id (Optional[str]): ID of the gateway for the tool.
355 """
357 model_config = ConfigDict(str_strip_whitespace=True, populate_by_name=True)
358 allow_auto: bool = False # Internal flag to allow system-initiated A2A tool creation
360 name: str = Field(..., description="Unique name for the tool")
361 displayName: Optional[str] = Field(None, description="Display name for the tool (shown in UI)") # noqa: N815
362 url: Optional[Union[str, AnyHttpUrl]] = Field(None, description="Tool endpoint URL")
363 description: Optional[str] = Field(None, description="Tool description")
364 integration_type: Literal["REST", "MCP", "A2A"] = Field("REST", description="'REST' for individual endpoints, 'MCP' for gateway-discovered tools, 'A2A' for A2A agents")
365 request_type: Literal["GET", "POST", "PUT", "DELETE", "PATCH", "SSE", "STDIO", "STREAMABLEHTTP"] = Field("SSE", description="HTTP method to be used for invoking the tool")
366 headers: Optional[Dict[str, str]] = Field(None, description="Additional headers to send when invoking the tool")
367 input_schema: Optional[Dict[str, Any]] = Field(default_factory=lambda: {"type": "object", "properties": {}}, description="JSON Schema for validating tool parameters", alias="inputSchema")
368 output_schema: Optional[Dict[str, Any]] = Field(default=None, description="JSON Schema for validating tool output", alias="outputSchema")
369 annotations: Optional[Dict[str, Any]] = Field(
370 default_factory=dict,
371 description="Tool annotations for behavior hints (title, readOnlyHint, destructiveHint, idempotentHint, openWorldHint)",
372 )
373 jsonpath_filter: Optional[str] = Field(default="", description="JSON modification filter")
374 auth: Optional[AuthenticationValues] = Field(None, description="Authentication credentials (Basic or Bearer Token or custom headers) if required")
375 gateway_id: Optional[str] = Field(None, description="id of gateway for the tool")
376 tags: Optional[List[str]] = Field(default_factory=list, description="Tags for categorizing the tool")
378 # Team scoping fields
379 team_id: Optional[str] = Field(None, description="Team ID for resource organization")
380 owner_email: Optional[str] = Field(None, description="Email of the tool owner")
381 visibility: Optional[str] = Field(default="public", description="Visibility level (private, team, public)")
383 # Passthrough REST fields
384 base_url: Optional[str] = Field(None, description="Base URL for REST passthrough")
385 path_template: Optional[str] = Field(None, description="Path template for REST passthrough")
386 query_mapping: Optional[Dict[str, Any]] = Field(None, description="Query mapping for REST passthrough")
387 header_mapping: Optional[Dict[str, Any]] = Field(None, description="Header mapping for REST passthrough")
388 timeout_ms: Optional[int] = Field(default=None, description="Timeout in milliseconds for REST passthrough (20000 if integration_type='REST', else None)")
389 expose_passthrough: Optional[bool] = Field(True, description="Expose passthrough endpoint for this tool")
390 allowlist: Optional[List[str]] = Field(None, description="Allowed upstream hosts/schemes for passthrough")
391 plugin_chain_pre: Optional[List[str]] = Field(None, description="Pre-plugin chain for passthrough")
392 plugin_chain_post: Optional[List[str]] = Field(None, description="Post-plugin chain for passthrough")
394 @field_validator("tags")
395 @classmethod
396 def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
397 """Validate and normalize tags.
399 Args:
400 v: Optional list of tag strings to validate
402 Returns:
403 List of validated tag strings
404 """
405 return validate_tags_field(v)
407 @field_validator("name")
408 @classmethod
409 def validate_name(cls, v: str) -> str:
410 """Ensure tool names follow MCP naming conventions
412 Args:
413 v (str): Value to validate
415 Returns:
416 str: Value if validated as safe
418 Raises:
419 ValueError: When displayName contains unsafe content or exceeds length limits
421 Examples:
422 >>> from mcpgateway.schemas import ToolCreate
423 >>> ToolCreate.validate_name('valid_tool')
424 'valid_tool'
425 >>> ToolCreate.validate_name('Invalid Tool!')
426 Traceback (most recent call last):
427 ...
428 ValueError: ...
429 """
430 return SecurityValidator.validate_tool_name(v)
432 @field_validator("url")
433 @classmethod
434 def validate_url(cls, v: Optional[str]) -> Optional[str]:
435 """Validate URL format and ensure safe display
437 Args:
438 v (Optional[str]): Value to validate
440 Returns:
441 Optional[str]: Value if validated as safe
443 Raises:
444 ValueError: When displayName contains unsafe content or exceeds length limits
446 Examples:
447 >>> from mcpgateway.schemas import ToolCreate
448 >>> ToolCreate.validate_url('https://example.com')
449 'https://example.com'
450 >>> ToolCreate.validate_url('ftp://example.com')
451 Traceback (most recent call last):
452 ...
453 ValueError: ...
454 """
455 if v is None:
456 return v
457 return validate_core_url(v, "Tool URL")
459 @field_validator("description")
460 @classmethod
461 def validate_description(cls, v: Optional[str]) -> Optional[str]:
462 """Ensure descriptions display safely, truncate if too long
464 Args:
465 v (str): Value to validate
467 Returns:
468 str: Value if validated as safe and truncated if too long
470 Raises:
471 ValueError: When value is unsafe
473 Examples:
474 >>> from mcpgateway.schemas import ToolCreate
475 >>> ToolCreate.validate_description('A safe description')
476 'A safe description'
477 >>> ToolCreate.validate_description(None) # Test None case
478 >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
479 >>> truncated = ToolCreate.validate_description(long_desc)
480 >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
481 0
482 >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
483 True
484 """
485 if v is None:
486 return v
488 # Note: backticks (`) are allowed as they are commonly used in Markdown
489 # for inline code examples in tool descriptions
490 forbidden_patterns = ["&&", ";", "||", "$(", "|", "> ", "< "]
491 for pat in forbidden_patterns:
492 if pat in v:
493 raise ValueError(f"Description contains unsafe characters: '{pat}'")
495 if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
496 # Truncate the description to the maximum allowed length
497 truncated = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
498 logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
499 return SecurityValidator.sanitize_display_text(truncated, "Description")
500 return SecurityValidator.sanitize_display_text(v, "Description")
502 @field_validator("displayName")
503 @classmethod
504 def validate_display_name(cls, v: Optional[str]) -> Optional[str]:
505 """Ensure display names display safely
507 Args:
508 v (str): Value to validate
510 Returns:
511 str: Value if validated as safe
513 Raises:
514 ValueError: When displayName contains unsafe content or exceeds length limits
516 Examples:
517 >>> from mcpgateway.schemas import ToolCreate
518 >>> ToolCreate.validate_display_name('My Custom Tool')
519 'My Custom Tool'
520 >>> ToolCreate.validate_display_name('<script>alert("xss")</script>')
521 Traceback (most recent call last):
522 ...
523 ValueError: ...
524 """
525 if v is None:
526 return v
527 if len(v) > SecurityValidator.MAX_NAME_LENGTH:
528 raise ValueError(f"Display name exceeds maximum length of {SecurityValidator.MAX_NAME_LENGTH}")
529 return SecurityValidator.sanitize_display_text(v, "Display name")
531 @field_validator("headers", "input_schema", "annotations")
532 @classmethod
533 def validate_json_fields(cls, v: Dict[str, Any]) -> Dict[str, Any]:
534 """Validate JSON structure depth
536 Args:
537 v (dict): Value to validate
539 Returns:
540 dict: Value if validated as safe
542 Examples:
543 >>> from mcpgateway.schemas import ToolCreate
544 >>> ToolCreate.validate_json_fields({'a': 1})
545 {'a': 1}
546 >>> # Test depth within limit (11 levels, default limit is 30)
547 >>> ToolCreate.validate_json_fields({'a': {'b': {'c': {'d': {'e': {'f': {'g': {'h': {'i': {'j': {'k': 1}}}}}}}}}}})
548 {'a': {'b': {'c': {'d': {'e': {'f': {'g': {'h': {'i': {'j': {'k': 1}}}}}}}}}}}
549 >>> # Test exceeding depth limit (31 levels)
550 >>> deep_31 = {'1': {'2': {'3': {'4': {'5': {'6': {'7': {'8': {'9': {'10': {'11': {'12': {'13': {'14': {'15': {'16': {'17': {'18': {'19': {'20': {'21': {'22': {'23': {'24': {'25': {'26': {'27': {'28': {'29': {'30': {'31': 'too deep'}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}
551 >>> ToolCreate.validate_json_fields(deep_31)
552 Traceback (most recent call last):
553 ...
554 ValueError: ...
555 """
556 SecurityValidator.validate_json_depth(v)
557 return v
559 @field_validator("request_type")
560 @classmethod
561 def validate_request_type(cls, v: str, info: ValidationInfo) -> str:
562 """Validate request type based on integration type (REST, MCP, A2A)
564 Args:
565 v (str): Value to validate
566 info (ValidationInfo): Values used for validation
568 Returns:
569 str: Value if validated as safe
571 Raises:
572 ValueError: When value is unsafe
574 Examples:
575 >>> from pydantic import ValidationInfo
576 >>> # REST integration types with valid methods
577 >>> info_rest = type('obj', (object,), {'data': {'integration_type': 'REST'}})
578 >>> ToolCreate.validate_request_type('POST', info_rest)
579 'POST'
580 >>> ToolCreate.validate_request_type('GET', info_rest)
581 'GET'
582 >>> # MCP integration types with valid transports
583 >>> info_mcp = type('obj', (object,), {'data': {'integration_type': 'MCP'}})
584 >>> ToolCreate.validate_request_type('SSE', info_mcp)
585 'SSE'
586 >>> ToolCreate.validate_request_type('STDIO', info_mcp)
587 'STDIO'
588 >>> # A2A integration type with valid method
589 >>> info_a2a = type('obj', (object,), {'data': {'integration_type': 'A2A'}})
590 >>> ToolCreate.validate_request_type('POST', info_a2a)
591 'POST'
592 >>> # Invalid REST type
593 >>> try:
594 ... ToolCreate.validate_request_type('SSE', info_rest)
595 ... except ValueError as e:
596 ... "not allowed for REST" in str(e)
597 True
598 >>> # Invalid MCP type
599 >>> try:
600 ... ToolCreate.validate_request_type('POST', info_mcp)
601 ... except ValueError as e:
602 ... "not allowed for MCP" in str(e)
603 True
604 >>> # Invalid A2A type
605 >>> try:
606 ... ToolCreate.validate_request_type('GET', info_a2a)
607 ... except ValueError as e:
608 ... "not allowed for A2A" in str(e)
609 True
610 >>> # Invalid integration type
611 >>> info_invalid = type('obj', (object,), {'data': {'integration_type': 'INVALID'}})
612 >>> try:
613 ... ToolCreate.validate_request_type('GET', info_invalid)
614 ... except ValueError as e:
615 ... "Unknown integration type" in str(e)
616 True
617 """
619 integration_type = info.data.get("integration_type")
621 if integration_type not in ["REST", "MCP", "A2A"]:
622 raise ValueError(f"Unknown integration type: {integration_type}")
624 if integration_type == "REST":
625 allowed = ["GET", "POST", "PUT", "DELETE", "PATCH"]
626 if v not in allowed:
627 raise ValueError(f"Request type '{v}' not allowed for REST. Only {allowed} methods are accepted.")
628 elif integration_type == "MCP":
629 allowed = ["SSE", "STDIO", "STREAMABLEHTTP"]
630 if v not in allowed:
631 raise ValueError(f"Request type '{v}' not allowed for MCP. Only {allowed} transports are accepted.")
632 elif integration_type == "A2A":
633 allowed = ["POST"]
634 if v not in allowed:
635 raise ValueError(f"Request type '{v}' not allowed for A2A. Only {allowed} methods are accepted.")
636 return v
638 @model_validator(mode="before")
639 @classmethod
640 def assemble_auth(cls, values: Dict[str, Any]) -> Dict[str, Any]:
641 """
642 Assemble authentication information from separate keys if provided.
644 Looks for keys "auth_type", "auth_username", "auth_password", "auth_token", "auth_header_key" and "auth_header_value".
645 Constructs the "auth" field as a dictionary suitable for BasicAuth or BearerTokenAuth or HeadersAuth.
647 Args:
648 values: Dict with authentication information
650 Returns:
651 Dict: Reformatedd values dict
653 Examples:
654 >>> # Test basic auth
655 >>> values = {'auth_type': 'basic', 'auth_username': 'user', 'auth_password': 'pass'}
656 >>> result = ToolCreate.assemble_auth(values)
657 >>> 'auth' in result
658 True
659 >>> result['auth']['auth_type']
660 'basic'
662 >>> # Test bearer auth
663 >>> values = {'auth_type': 'bearer', 'auth_token': 'mytoken'}
664 >>> result = ToolCreate.assemble_auth(values)
665 >>> result['auth']['auth_type']
666 'bearer'
668 >>> # Test authheaders
669 >>> values = {'auth_type': 'authheaders', 'auth_header_key': 'X-API-Key', 'auth_header_value': 'secret'}
670 >>> result = ToolCreate.assemble_auth(values)
671 >>> result['auth']['auth_type']
672 'authheaders'
674 >>> # Test no auth type
675 >>> values = {'name': 'test'}
676 >>> result = ToolCreate.assemble_auth(values)
677 >>> 'auth' in result
678 False
679 """
680 logger.debug(
681 "Assembling auth in ToolCreate with raw values",
682 extra={
683 "auth_type": values.get("auth_type"),
684 "auth_username": values.get("auth_username"),
685 "auth_header_key": values.get("auth_header_key"),
686 "auth_assembled": bool(values.get("auth_type") and str(values.get("auth_type")).lower() != "one_time_auth"),
687 },
688 )
690 auth_type = values.get("auth_type")
691 if auth_type and auth_type.lower() != "one_time_auth":
692 if auth_type.lower() == "basic":
693 creds = base64.b64encode(f"{values.get('auth_username', '')}:{values.get('auth_password', '')}".encode("utf-8")).decode()
694 encoded_auth = encode_auth({"Authorization": f"Basic {creds}"})
695 values["auth"] = {"auth_type": "basic", "auth_value": encoded_auth}
696 elif auth_type.lower() == "bearer":
697 encoded_auth = encode_auth({"Authorization": f"Bearer {values.get('auth_token', '')}"})
698 values["auth"] = {"auth_type": "bearer", "auth_value": encoded_auth}
699 elif auth_type.lower() == "authheaders":
700 header_key = values.get("auth_header_key", "")
701 header_value = values.get("auth_header_value", "")
702 if header_key and header_value:
703 encoded_auth = encode_auth({header_key: header_value})
704 values["auth"] = {"auth_type": "authheaders", "auth_value": encoded_auth}
705 else:
706 # Don't encode empty headers - leave auth empty
707 values["auth"] = {"auth_type": "authheaders", "auth_value": None}
708 return values
710 @model_validator(mode="before")
711 @classmethod
712 def prevent_manual_mcp_creation(cls, values: Dict[str, Any]) -> Dict[str, Any]:
713 """
714 Prevent manual creation of MCP tools via API.
716 MCP tools should only be created by the gateway service when discovering
717 tools from MCP servers. Users should add MCP servers via the Gateways interface.
719 Args:
720 values: The input values
722 Returns:
723 Dict[str, Any]: The validated values
725 Raises:
726 ValueError: If attempting to manually create MCP integration type
727 """
728 integration_type = values.get("integration_type")
729 allow_auto = values.get("allow_auto", False)
730 if integration_type == "MCP":
731 raise ValueError("Cannot manually create MCP tools. Add MCP servers via the Gateways interface - tools will be auto-discovered and registered with integration_type='MCP'.")
732 if integration_type == "A2A" and not allow_auto:
733 raise ValueError("Cannot manually create A2A tools. Add A2A agents via the A2A interface - tools will be auto-created when agents are associated with servers.")
734 return values
736 @model_validator(mode="before")
737 @classmethod
738 def enforce_passthrough_fields_for_rest(cls, values: Dict[str, Any]) -> Dict[str, Any]:
739 """
740 Enforce that passthrough REST fields are only set for integration_type 'REST'.
741 If any passthrough field is set for non-REST, raise ValueError.
743 Args:
744 values (Dict[str, Any]): The input values to validate.
746 Returns:
747 Dict[str, Any]: The validated values.
749 Raises:
750 ValueError: If passthrough fields are set for non-REST integration_type.
751 """
752 passthrough_fields = ["base_url", "path_template", "query_mapping", "header_mapping", "timeout_ms", "expose_passthrough", "allowlist", "plugin_chain_pre", "plugin_chain_post"]
753 integration_type = values.get("integration_type")
754 if integration_type != "REST":
755 for field in passthrough_fields:
756 if field in values and values[field] not in (None, [], {}):
757 raise ValueError(f"Field '{field}' is only allowed for integration_type 'REST'.")
758 return values
760 @model_validator(mode="before")
761 @classmethod
762 def extract_base_url_and_path_template(cls, values: dict) -> dict:
763 """
764 Only for integration_type 'REST':
765 If 'url' is provided, extract 'base_url' and 'path_template'.
766 Ensures path_template starts with a single '/'.
768 Args:
769 values (dict): The input values to process.
771 Returns:
772 dict: The updated values with base_url and path_template if applicable.
773 """
774 integration_type = values.get("integration_type")
775 if integration_type != "REST":
776 # Only process for REST, skip for others
777 return values
778 url = values.get("url")
779 if url:
780 parsed = urlparse(str(url))
781 base_url = f"{parsed.scheme}://{parsed.netloc}"
782 path_template = parsed.path
783 # Ensure path_template starts with a single '/'
784 if path_template:
785 path_template = "/" + path_template.lstrip("/")
786 if not values.get("base_url"):
787 values["base_url"] = base_url
788 if not values.get("path_template"):
789 values["path_template"] = path_template
790 return values
792 @field_validator("base_url")
793 @classmethod
794 def validate_base_url(cls, v):
795 """
796 Validate that base_url is a valid URL with scheme and netloc.
798 Args:
799 v (str): The base_url value to validate.
801 Returns:
802 str: The validated base_url value.
804 Raises:
805 ValueError: If base_url is not a valid URL.
806 """
807 if v is None:
808 return v
809 parsed = urlparse(str(v))
810 if not parsed.scheme or not parsed.netloc:
811 raise ValueError("base_url must be a valid URL with scheme and netloc")
812 return v
814 @field_validator("path_template")
815 @classmethod
816 def validate_path_template(cls, v):
817 """
818 Validate that path_template starts with '/'.
820 Args:
821 v (str): The path_template value to validate.
823 Returns:
824 str: The validated path_template value.
826 Raises:
827 ValueError: If path_template does not start with '/'.
828 """
829 if v and not str(v).startswith("/"):
830 raise ValueError("path_template must start with '/'")
831 return v
833 @field_validator("timeout_ms")
834 @classmethod
835 def validate_timeout_ms(cls, v):
836 """
837 Validate that timeout_ms is a positive integer.
839 Args:
840 v (int): The timeout_ms value to validate.
842 Returns:
843 int: The validated timeout_ms value.
845 Raises:
846 ValueError: If timeout_ms is not a positive integer.
847 """
848 if v is not None and v <= 0:
849 raise ValueError("timeout_ms must be a positive integer")
850 return v
852 @field_validator("allowlist")
853 @classmethod
854 def validate_allowlist(cls, v):
855 """
856 Validate that allowlist is a list and each entry is a valid host or scheme string.
858 Args:
859 v (List[str]): The allowlist to validate.
861 Returns:
862 List[str]: The validated allowlist.
864 Raises:
865 ValueError: If allowlist is not a list or any entry is not a valid host/scheme string.
866 """
867 if v is None:
868 return None
869 if not isinstance(v, list):
870 raise ValueError("allowlist must be a list of host/scheme strings")
871 # Uses precompiled regex for hostname validation
872 for host in v:
873 if not isinstance(host, str):
874 raise ValueError(f"Invalid type in allowlist: {host} (must be str)")
875 if not _HOSTNAME_RE.match(host):
876 raise ValueError(f"Invalid host/scheme in allowlist: {host}")
877 return v
879 @field_validator("plugin_chain_pre", "plugin_chain_post")
880 @classmethod
881 def validate_plugin_chain(cls, v):
882 """
883 Validate that each plugin in the chain is allowed.
885 Args:
886 v (List[str]): The plugin chain to validate.
888 Returns:
889 List[str]: The validated plugin chain.
891 Raises:
892 ValueError: If any plugin is not in the allowed set.
893 """
894 allowed_plugins = {"deny_filter", "rate_limit", "pii_filter", "response_shape", "regex_filter", "resource_filter"}
895 if v is not None:
896 for plugin in v:
897 if plugin not in allowed_plugins:
898 raise ValueError(f"Unknown plugin: {plugin}")
899 return v
901 @model_validator(mode="after")
902 def handle_timeout_ms_defaults(self):
903 """Handle timeout_ms defaults based on integration_type and expose_passthrough.
905 Returns:
906 self: The validated model instance with timeout_ms potentially set to default.
907 """
908 # If timeout_ms is None and we have REST with passthrough, set default
909 if self.timeout_ms is None and self.integration_type == "REST" and getattr(self, "expose_passthrough", True):
910 self.timeout_ms = 20000
911 return self
914class ToolUpdate(BaseModelWithConfigDict):
915 """Schema for updating an existing tool.
917 Similar to ToolCreate but all fields are optional to allow partial updates.
918 """
920 name: Optional[str] = Field(None, description="Unique name for the tool")
921 displayName: Optional[str] = Field(None, description="Display name for the tool (shown in UI)") # noqa: N815
922 custom_name: Optional[str] = Field(None, description="Custom name for the tool")
923 url: Optional[Union[str, AnyHttpUrl]] = Field(None, description="Tool endpoint URL")
924 description: Optional[str] = Field(None, description="Tool description")
925 integration_type: Optional[Literal["REST", "MCP", "A2A"]] = Field(None, description="Tool integration type")
926 request_type: Optional[Literal["GET", "POST", "PUT", "DELETE", "PATCH"]] = Field(None, description="HTTP method to be used for invoking the tool")
927 headers: Optional[Dict[str, str]] = Field(None, description="Additional headers to send when invoking the tool")
928 input_schema: Optional[Dict[str, Any]] = Field(None, description="JSON Schema for validating tool parameters")
929 output_schema: Optional[Dict[str, Any]] = Field(None, description="JSON Schema for validating tool output")
930 annotations: Optional[Dict[str, Any]] = Field(None, description="Tool annotations for behavior hints")
931 jsonpath_filter: Optional[str] = Field(None, description="JSON path filter for rpc tool calls")
932 auth: Optional[AuthenticationValues] = Field(None, description="Authentication credentials (Basic or Bearer Token or custom headers) if required")
933 gateway_id: Optional[str] = Field(None, description="id of gateway for the tool")
934 tags: Optional[List[str]] = Field(None, description="Tags for categorizing the tool")
935 visibility: Optional[Literal["private", "team", "public"]] = Field(None, description="Visibility level: private, team, or public")
937 # Passthrough REST fields
938 base_url: Optional[str] = Field(None, description="Base URL for REST passthrough")
939 path_template: Optional[str] = Field(None, description="Path template for REST passthrough")
940 query_mapping: Optional[Dict[str, Any]] = Field(None, description="Query mapping for REST passthrough")
941 header_mapping: Optional[Dict[str, Any]] = Field(None, description="Header mapping for REST passthrough")
942 timeout_ms: Optional[int] = Field(default=None, description="Timeout in milliseconds for REST passthrough (20000 if integration_type='REST', else None)")
943 expose_passthrough: Optional[bool] = Field(True, description="Expose passthrough endpoint for this tool")
944 allowlist: Optional[List[str]] = Field(None, description="Allowed upstream hosts/schemes for passthrough")
945 plugin_chain_pre: Optional[List[str]] = Field(None, description="Pre-plugin chain for passthrough")
946 plugin_chain_post: Optional[List[str]] = Field(None, description="Post-plugin chain for passthrough")
948 @field_validator("tags")
949 @classmethod
950 def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
951 """Validate and normalize tags.
953 Args:
954 v: Optional list of tag strings to validate
956 Returns:
957 List of validated tag strings or None if input is None
958 """
959 return validate_tags_field(v)
961 @field_validator("name")
962 @classmethod
963 def validate_name(cls, v: str) -> str:
964 """Ensure tool names follow MCP naming conventions
966 Args:
967 v (str): Value to validate
969 Returns:
970 str: Value if validated as safe
971 """
972 return SecurityValidator.validate_tool_name(v)
974 @field_validator("custom_name")
975 @classmethod
976 def validate_custom_name(cls, v: str) -> str:
977 """Ensure custom tool names follow MCP naming conventions
979 Args:
980 v (str): Value to validate
982 Returns:
983 str: Value if validated as safe
984 """
985 return SecurityValidator.validate_tool_name(v)
987 @field_validator("url")
988 @classmethod
989 def validate_url(cls, v: Optional[str]) -> Optional[str]:
990 """Validate URL format and ensure safe display
992 Args:
993 v (Optional[str]): Value to validate
995 Returns:
996 Optional[str]: Value if validated as safe
997 """
998 if v is None:
999 return v
1000 return validate_core_url(v, "Tool URL")
1002 @field_validator("description")
1003 @classmethod
1004 def validate_description(cls, v: Optional[str]) -> Optional[str]:
1005 """Ensure descriptions display safely
1007 Args:
1008 v (str): Value to validate
1010 Returns:
1011 str: Value if validated as safe
1013 Raises:
1014 ValueError: When value is unsafe
1016 Examples:
1017 >>> from mcpgateway.schemas import ToolUpdate
1018 >>> ToolUpdate.validate_description('A safe description')
1019 'A safe description'
1020 >>> ToolUpdate.validate_description(None) # Test None case
1021 >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
1022 >>> truncated = ToolUpdate.validate_description(long_desc)
1023 >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
1024 0
1025 >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
1026 True
1027 """
1028 if v is None:
1029 return v
1030 if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
1031 # Truncate the description to the maximum allowed length
1032 truncated = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
1033 logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
1034 return SecurityValidator.sanitize_display_text(truncated, "Description")
1035 return SecurityValidator.sanitize_display_text(v, "Description")
1037 @field_validator("headers", "input_schema", "annotations")
1038 @classmethod
1039 def validate_json_fields(cls, v: Dict[str, Any]) -> Dict[str, Any]:
1040 """Validate JSON structure depth
1042 Args:
1043 v (dict): Value to validate
1045 Returns:
1046 dict: Value if validated as safe
1047 """
1048 SecurityValidator.validate_json_depth(v)
1049 return v
1051 @field_validator("request_type")
1052 @classmethod
1053 def validate_request_type(cls, v: str, info: ValidationInfo) -> str:
1054 """Validate request type based on integration type
1056 Args:
1057 v (str): Value to validate
1058 info (ValidationInfo): Validation context with other field values
1060 Returns:
1061 str: Value if validated as safe
1063 Raises:
1064 ValueError: When value is unsafe
1065 """
1067 integration_type = info.data.get("integration_type", "REST")
1069 if integration_type == "REST":
1070 allowed = ["GET", "POST", "PUT", "DELETE", "PATCH"]
1071 elif integration_type == "MCP":
1072 allowed = ["SSE", "STDIO", "STREAMABLEHTTP"]
1073 elif integration_type == "A2A":
1074 allowed = ["POST"] # A2A agents typically use POST
1075 else:
1076 raise ValueError(f"Unknown integration type: {integration_type}")
1078 if v not in allowed:
1079 raise ValueError(f"Request type '{v}' not allowed for {integration_type} integration")
1080 return v
1082 @model_validator(mode="before")
1083 @classmethod
1084 def assemble_auth(cls, values: Dict[str, Any]) -> Dict[str, Any]:
1085 """
1086 Assemble authentication information from separate keys if provided.
1088 Looks for keys "auth_type", "auth_username", "auth_password", "auth_token", "auth_header_key" and "auth_header_value".
1089 Constructs the "auth" field as a dictionary suitable for BasicAuth or BearerTokenAuth or HeadersAuth.
1091 Args:
1092 values: Dict with authentication information
1094 Returns:
1095 Dict: Reformatedd values dict
1096 """
1097 logger.debug(
1098 "Assembling auth in ToolCreate with raw values",
1099 extra={
1100 "auth_type": values.get("auth_type"),
1101 "auth_username": values.get("auth_username"),
1102 "auth_header_key": values.get("auth_header_key"),
1103 "auth_assembled": bool(values.get("auth_type") and str(values.get("auth_type")).lower() != "one_time_auth"),
1104 },
1105 )
1107 auth_type = values.get("auth_type")
1108 if auth_type and auth_type.lower() != "one_time_auth":
1109 if auth_type.lower() == "basic":
1110 creds = base64.b64encode(f"{values.get('auth_username', '')}:{values.get('auth_password', '')}".encode("utf-8")).decode()
1111 encoded_auth = encode_auth({"Authorization": f"Basic {creds}"})
1112 values["auth"] = {"auth_type": "basic", "auth_value": encoded_auth}
1113 elif auth_type.lower() == "bearer":
1114 encoded_auth = encode_auth({"Authorization": f"Bearer {values.get('auth_token', '')}"})
1115 values["auth"] = {"auth_type": "bearer", "auth_value": encoded_auth}
1116 elif auth_type.lower() == "authheaders":
1117 header_key = values.get("auth_header_key", "")
1118 header_value = values.get("auth_header_value", "")
1119 if header_key and header_value:
1120 encoded_auth = encode_auth({header_key: header_value})
1121 values["auth"] = {"auth_type": "authheaders", "auth_value": encoded_auth}
1122 else:
1123 # Don't encode empty headers - leave auth empty
1124 values["auth"] = {"auth_type": "authheaders", "auth_value": None}
1125 return values
1127 @field_validator("displayName")
1128 @classmethod
1129 def validate_display_name(cls, v: Optional[str]) -> Optional[str]:
1130 """Ensure display names display safely
1132 Args:
1133 v (str): Value to validate
1135 Returns:
1136 str: Value if validated as safe
1138 Raises:
1139 ValueError: When displayName contains unsafe content or exceeds length limits
1141 Examples:
1142 >>> from mcpgateway.schemas import ToolUpdate
1143 >>> ToolUpdate.validate_display_name('My Custom Tool')
1144 'My Custom Tool'
1145 >>> ToolUpdate.validate_display_name('<script>alert("xss")</script>')
1146 Traceback (most recent call last):
1147 ...
1148 ValueError: ...
1149 """
1150 if v is None:
1151 return v
1152 if len(v) > SecurityValidator.MAX_NAME_LENGTH:
1153 raise ValueError(f"Display name exceeds maximum length of {SecurityValidator.MAX_NAME_LENGTH}")
1154 return SecurityValidator.sanitize_display_text(v, "Display name")
1156 @model_validator(mode="before")
1157 @classmethod
1158 def prevent_manual_mcp_update(cls, values: Dict[str, Any]) -> Dict[str, Any]:
1159 """
1160 Prevent updating tools to MCP integration type via API.
1162 MCP tools should only be managed by the gateway service. Users should not
1163 be able to change a REST tool to MCP type or vice versa manually.
1165 Args:
1166 values: The input values
1168 Returns:
1169 Dict[str, Any]: The validated values
1171 Raises:
1172 ValueError: If attempting to update to MCP integration type
1173 """
1174 integration_type = values.get("integration_type")
1175 if integration_type == "MCP":
1176 raise ValueError("Cannot update tools to MCP integration type. MCP tools are managed by the gateway service.")
1177 if integration_type == "A2A":
1178 raise ValueError("Cannot update tools to A2A integration type. A2A tools are managed by the A2A service.")
1179 return values
1181 @model_validator(mode="before")
1182 @classmethod
1183 def extract_base_url_and_path_template(cls, values: dict) -> dict:
1184 """
1185 If 'integration_type' is 'REST' and 'url' is provided, extract 'base_url' and 'path_template'.
1186 Ensures path_template starts with a single '/'.
1188 Args:
1189 values (dict): The input values to process.
1191 Returns:
1192 dict: The updated values with base_url and path_template if applicable.
1193 """
1194 integration_type = values.get("integration_type")
1195 url = values.get("url")
1196 if integration_type == "REST" and url:
1197 parsed = urlparse(str(url))
1198 base_url = f"{parsed.scheme}://{parsed.netloc}"
1199 path_template = parsed.path
1200 # Ensure path_template starts with a single '/'
1201 if path_template:
1202 path_template = "/" + path_template.lstrip("/")
1203 if not values.get("base_url"):
1204 values["base_url"] = base_url
1205 if not values.get("path_template"):
1206 values["path_template"] = path_template
1207 return values
1209 @field_validator("base_url")
1210 @classmethod
1211 def validate_base_url(cls, v):
1212 """
1213 Validate that base_url is a valid URL with scheme and netloc.
1215 Args:
1216 v (str): The base_url value to validate.
1218 Returns:
1219 str: The validated base_url value.
1221 Raises:
1222 ValueError: If base_url is not a valid URL.
1223 """
1224 if v is None:
1225 return v
1226 parsed = urlparse(str(v))
1227 if not parsed.scheme or not parsed.netloc:
1228 raise ValueError("base_url must be a valid URL with scheme and netloc")
1229 return v
1231 @field_validator("path_template")
1232 @classmethod
1233 def validate_path_template(cls, v):
1234 """
1235 Validate that path_template starts with '/'.
1237 Args:
1238 v (str): The path_template value to validate.
1240 Returns:
1241 str: The validated path_template value.
1243 Raises:
1244 ValueError: If path_template does not start with '/'.
1245 """
1246 if v and not str(v).startswith("/"):
1247 raise ValueError("path_template must start with '/'")
1248 return v
1250 @field_validator("timeout_ms")
1251 @classmethod
1252 def validate_timeout_ms(cls, v):
1253 """
1254 Validate that timeout_ms is a positive integer.
1256 Args:
1257 v (int): The timeout_ms value to validate.
1259 Returns:
1260 int: The validated timeout_ms value.
1262 Raises:
1263 ValueError: If timeout_ms is not a positive integer.
1264 """
1265 if v is not None and v <= 0:
1266 raise ValueError("timeout_ms must be a positive integer")
1267 return v
1269 @field_validator("allowlist")
1270 @classmethod
1271 def validate_allowlist(cls, v):
1272 """
1273 Validate that allowlist is a list and each entry is a valid host or scheme string.
1275 Args:
1276 v (List[str]): The allowlist to validate.
1278 Returns:
1279 List[str]: The validated allowlist.
1281 Raises:
1282 ValueError: If allowlist is not a list or any entry is not a valid host/scheme string.
1283 """
1284 if v is None:
1285 return None
1286 if not isinstance(v, list):
1287 raise ValueError("allowlist must be a list of host/scheme strings")
1288 # Uses precompiled regex for hostname validation
1289 for host in v:
1290 if not isinstance(host, str):
1291 raise ValueError(f"Invalid type in allowlist: {host} (must be str)")
1292 if not _HOSTNAME_RE.match(host):
1293 raise ValueError(f"Invalid host/scheme in allowlist: {host}")
1294 return v
1296 @field_validator("plugin_chain_pre", "plugin_chain_post")
1297 @classmethod
1298 def validate_plugin_chain(cls, v):
1299 """
1300 Validate that each plugin in the chain is allowed.
1302 Args:
1303 v (List[str]): The plugin chain to validate.
1305 Returns:
1306 List[str]: The validated plugin chain.
1308 Raises:
1309 ValueError: If any plugin is not in the allowed set.
1310 """
1311 allowed_plugins = {"deny_filter", "rate_limit", "pii_filter", "response_shape", "regex_filter", "resource_filter"}
1312 if v is not None:
1313 for plugin in v:
1314 if plugin not in allowed_plugins:
1315 raise ValueError(f"Unknown plugin: {plugin}")
1316 return v
1319class ToolRead(BaseModelWithConfigDict):
1320 """Schema for reading tool information.
1322 Includes all tool fields plus:
1323 - Database ID
1324 - Creation/update timestamps
1325 - enabled: If Tool is enabled or disabled.
1326 - reachable: If Tool is reachable or not.
1327 - Gateway ID for federation
1328 - Execution count indicating the number of times the tool has been executed.
1329 - Metrics: Aggregated metrics for the tool invocations.
1330 - Request type and authentication settings.
1331 """
1333 id: str
1334 original_name: str
1335 url: Optional[str]
1336 description: Optional[str]
1337 original_description: Optional[str] = None
1338 request_type: str
1339 integration_type: str
1340 headers: Optional[Dict[str, str]]
1341 input_schema: Dict[str, Any]
1342 output_schema: Optional[Dict[str, Any]] = Field(None)
1343 annotations: Optional[Dict[str, Any]]
1344 jsonpath_filter: Optional[str]
1345 auth: Optional[AuthenticationValues]
1346 created_at: datetime
1347 updated_at: datetime
1348 enabled: bool
1349 reachable: bool
1350 gateway_id: Optional[str]
1351 execution_count: Optional[int] = Field(None)
1352 metrics: Optional[ToolMetrics] = Field(None)
1353 name: str
1354 displayName: Optional[str] = Field(None, description="Display name for the tool (shown in UI)") # noqa: N815
1355 gateway_slug: str
1356 custom_name: str
1357 custom_name_slug: str
1358 tags: List[Dict[str, str]] = Field(default_factory=list, description="Tags for categorizing the tool")
1360 # Comprehensive metadata for audit tracking
1361 created_by: Optional[str] = Field(None, description="Username who created this entity")
1362 created_from_ip: Optional[str] = Field(None, description="IP address of creator")
1363 created_via: Optional[str] = Field(None, description="Creation method: ui|api|import|federation")
1364 created_user_agent: Optional[str] = Field(None, description="User agent of creation request")
1366 modified_by: Optional[str] = Field(None, description="Username who last modified this entity")
1367 modified_from_ip: Optional[str] = Field(None, description="IP address of last modifier")
1368 modified_via: Optional[str] = Field(None, description="Modification method")
1369 modified_user_agent: Optional[str] = Field(None, description="User agent of modification request")
1371 import_batch_id: Optional[str] = Field(None, description="UUID of bulk import batch")
1372 federation_source: Optional[str] = Field(None, description="Source gateway for federated entities")
1373 version: Optional[int] = Field(1, description="Entity version for change tracking")
1375 # Team scoping fields
1376 team_id: Optional[str] = Field(None, description="ID of the team that owns this resource")
1377 team: Optional[str] = Field(None, description="Name of the team that owns this resource")
1378 owner_email: Optional[str] = Field(None, description="Email of the user who owns this resource")
1379 visibility: Optional[str] = Field(default="public", description="Visibility level: private, team, or public")
1381 # Passthrough REST fields
1382 base_url: Optional[str] = Field(None, description="Base URL for REST passthrough")
1383 path_template: Optional[str] = Field(None, description="Path template for REST passthrough")
1384 query_mapping: Optional[Dict[str, Any]] = Field(None, description="Query mapping for REST passthrough")
1385 header_mapping: Optional[Dict[str, Any]] = Field(None, description="Header mapping for REST passthrough")
1386 timeout_ms: Optional[int] = Field(20000, description="Timeout in milliseconds for REST passthrough")
1387 expose_passthrough: Optional[bool] = Field(True, description="Expose passthrough endpoint for this tool")
1388 allowlist: Optional[List[str]] = Field(None, description="Allowed upstream hosts/schemes for passthrough")
1389 plugin_chain_pre: Optional[List[str]] = Field(None, description="Pre-plugin chain for passthrough")
1390 plugin_chain_post: Optional[List[str]] = Field(None, description="Post-plugin chain for passthrough")
1392 # MCP protocol extension field
1393 meta: Optional[Dict[str, Any]] = Field(None, alias="_meta", description="Optional metadata for protocol extension")
1396class ToolInvocation(BaseModelWithConfigDict):
1397 """Schema for tool invocation requests.
1399 This schema validates tool invocation requests to ensure they follow MCP
1400 (Model Context Protocol) naming conventions and prevent security vulnerabilities
1401 such as XSS attacks or deeply nested payloads that could cause DoS.
1403 Captures:
1404 - Tool name to invoke (validated for safety and MCP compliance)
1405 - Arguments matching tool's input schema (validated for depth limits)
1407 Validation Rules:
1408 - Tool names must start with a letter, number, or underscore and contain only
1409 letters, numbers, periods, underscores, hyphens, and slashes (per SEP-986)
1410 - Tool names cannot contain HTML special characters (<, >, ", ')
1411 - Arguments are validated to prevent excessively deep nesting (default max: 10 levels)
1413 Attributes:
1414 name (str): Name of the tool to invoke. Must follow MCP naming conventions.
1415 arguments (Dict[str, Any]): Arguments to pass to the tool. Must match the
1416 tool's input schema and not exceed depth limits.
1418 Examples:
1419 >>> from pydantic import ValidationError
1420 >>> # Valid tool invocation
1421 >>> tool_inv = ToolInvocation(name="get_weather", arguments={"city": "London"})
1422 >>> tool_inv.name
1423 'get_weather'
1424 >>> tool_inv.arguments
1425 {'city': 'London'}
1427 >>> # Valid tool name with underscores and numbers
1428 >>> tool_inv = ToolInvocation(name="tool_v2_beta", arguments={})
1429 >>> tool_inv.name
1430 'tool_v2_beta'
1432 >>> # Invalid: Tool name with special characters
1433 >>> try:
1434 ... ToolInvocation(name="tool-name!", arguments={})
1435 ... except ValidationError as e:
1436 ... print("Validation failed: Special characters not allowed")
1437 Validation failed: Special characters not allowed
1439 >>> # Invalid: XSS attempt in tool name
1440 >>> try:
1441 ... ToolInvocation(name="<script>alert('XSS')</script>", arguments={})
1442 ... except ValidationError as e:
1443 ... print("Validation failed: HTML tags not allowed")
1444 Validation failed: HTML tags not allowed
1446 >>> # Valid: Tool name starting with number (per MCP spec)
1447 >>> tool_num = ToolInvocation(name="123_tool", arguments={})
1448 >>> tool_num.name
1449 '123_tool'
1451 >>> # Valid: Tool name starting with underscore (per MCP spec)
1452 >>> tool_underscore = ToolInvocation(name="_5gpt_query", arguments={})
1453 >>> tool_underscore.name
1454 '_5gpt_query'
1456 >>> # Invalid: Tool name starting with hyphen
1457 >>> try:
1458 ... ToolInvocation(name="-invalid_tool", arguments={})
1459 ... except ValidationError as e:
1460 ... print("Validation failed: Must start with letter, number, or underscore")
1461 Validation failed: Must start with letter, number, or underscore
1463 >>> # Valid: Complex but not too deep arguments
1464 >>> args = {"level1": {"level2": {"level3": {"data": "value"}}}}
1465 >>> tool_inv = ToolInvocation(name="process_data", arguments=args)
1466 >>> tool_inv.arguments["level1"]["level2"]["level3"]["data"]
1467 'value'
1469 >>> # Invalid: Arguments too deeply nested (>30 levels)
1470 >>> deep_args = {"a": {"b": {"c": {"d": {"e": {"f": {"g": {"h": {"i": {"j": {"k": {"l": {"m": {"n": {"o": {"p": {"q": {"r": {"s": {"t": {"u": {"v": {"w": {"x": {"y": {"z": {"aa": {"bb": {"cc": {"dd": {"ee": "too deep"}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}
1471 >>> try:
1472 ... ToolInvocation(name="process_data", arguments=deep_args)
1473 ... except ValidationError as e:
1474 ... print("Validation failed: Exceeds maximum depth")
1475 Validation failed: Exceeds maximum depth
1477 >>> # Edge case: Empty tool name
1478 >>> try:
1479 ... ToolInvocation(name="", arguments={})
1480 ... except ValidationError as e:
1481 ... print("Validation failed: Name cannot be empty")
1482 Validation failed: Name cannot be empty
1484 >>> # Valid: Tool name with hyphen (but not starting/ending)
1485 >>> tool_inv = ToolInvocation(name="get_user_info", arguments={"id": 123})
1486 >>> tool_inv.name
1487 'get_user_info'
1489 >>> # Arguments with various types
1490 >>> args = {
1491 ... "string": "value",
1492 ... "number": 42,
1493 ... "boolean": True,
1494 ... "array": [1, 2, 3],
1495 ... "nested": {"key": "value"}
1496 ... }
1497 >>> tool_inv = ToolInvocation(name="complex_tool", arguments=args)
1498 >>> tool_inv.arguments["number"]
1499 42
1500 """
1502 name: str = Field(..., description="Name of tool to invoke")
1503 arguments: Dict[str, Any] = Field(default_factory=dict, description="Arguments matching tool's input schema")
1505 @field_validator("name")
1506 @classmethod
1507 def validate_name(cls, v: str) -> str:
1508 """Ensure tool names follow MCP naming conventions.
1510 Validates that the tool name:
1511 - Is not empty
1512 - Starts with a letter (not a number or special character)
1513 - Contains only letters, numbers, underscores, and hyphens
1514 - Does not contain HTML special characters that could cause XSS
1515 - Does not exceed maximum length (255 characters)
1517 Args:
1518 v (str): Tool name to validate
1520 Returns:
1521 str: The validated tool name if it passes all checks
1523 Raises:
1524 ValueError: If the tool name violates any validation rules
1525 """
1526 return SecurityValidator.validate_tool_name(v)
1528 @field_validator("arguments")
1529 @classmethod
1530 def validate_arguments(cls, v: Dict[str, Any]) -> Dict[str, Any]:
1531 """Validate arguments structure depth to prevent DoS attacks.
1533 Ensures that the arguments dictionary doesn't have excessive nesting
1534 that could cause performance issues or stack overflow. The default
1535 maximum depth is 10 levels.
1537 Args:
1538 v (dict): Arguments dictionary to validate
1540 Returns:
1541 dict: The validated arguments if within depth limits
1543 Raises:
1544 ValueError: If the arguments exceed the maximum allowed depth
1545 """
1546 SecurityValidator.validate_json_depth(v)
1547 return v
1550class ToolResult(BaseModelWithConfigDict):
1551 """Schema for tool invocation results.
1553 Supports:
1554 - Multiple content types (text/image)
1555 - Error reporting
1556 - Optional error messages
1557 """
1559 content: List[Union[TextContent, ImageContent]]
1560 structured_content: Optional[Dict[str, Any]] = None
1561 is_error: bool = False
1562 error_message: Optional[str] = None
1565class ResourceCreate(BaseModel):
1566 """
1567 Schema for creating a new resource.
1569 Attributes:
1570 model_config (ConfigDict): Configuration for the model.
1571 uri (str): Unique URI for the resource.
1572 name (str): Human-readable name for the resource.
1573 description (Optional[str]): Optional description of the resource.
1574 mime_type (Optional[str]): Optional MIME type of the resource.
1575 template (Optional[str]): Optional URI template for parameterized resources.
1576 content (Union[str, bytes]): Content of the resource, which can be text or binary.
1577 """
1579 model_config = ConfigDict(str_strip_whitespace=True, populate_by_name=True)
1581 uri: str = Field(..., description="Unique URI for the resource")
1582 name: str = Field(..., description="Human-readable resource name")
1583 description: Optional[str] = Field(None, description="Resource description")
1584 mime_type: Optional[str] = Field(None, alias="mimeType", description="Resource MIME type")
1585 uri_template: Optional[str] = Field(None, description="URI template for parameterized resources")
1586 content: Union[str, bytes] = Field(..., description="Resource content (text or binary)")
1587 tags: Optional[List[str]] = Field(default_factory=list, description="Tags for categorizing the resource")
1589 # Team scoping fields
1590 team_id: Optional[str] = Field(None, description="Team ID for resource organization")
1591 owner_email: Optional[str] = Field(None, description="Email of the resource owner")
1592 visibility: Optional[str] = Field(default="public", description="Visibility level (private, team, public)")
1593 gateway_id: Optional[str] = Field(None, description="ID of the gateway for the resource")
1595 @field_validator("tags")
1596 @classmethod
1597 def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
1598 """Validate and normalize tags.
1600 Args:
1601 v: Optional list of tag strings to validate
1603 Returns:
1604 List of validated tag strings
1605 """
1606 return validate_tags_field(v)
1608 @field_validator("uri")
1609 @classmethod
1610 def validate_uri(cls, v: str) -> str:
1611 """Validate URI format
1613 Args:
1614 v (str): Value to validate
1616 Returns:
1617 str: Value if validated as safe
1618 """
1619 return SecurityValidator.validate_uri(v, "Resource URI")
1621 @field_validator("name")
1622 @classmethod
1623 def validate_name(cls, v: str) -> str:
1624 """Validate resource name
1626 Args:
1627 v (str): Value to validate
1629 Returns:
1630 str: Value if validated as safe
1631 """
1632 return SecurityValidator.validate_name(v, "Resource name")
1634 @field_validator("description")
1635 @classmethod
1636 def validate_description(cls, v: Optional[str]) -> Optional[str]:
1637 """Ensure descriptions display safely, truncate if too long
1639 Args:
1640 v (str): Value to validate
1642 Returns:
1643 str: Value if validated as safe and truncated if too long
1645 Raises:
1646 ValueError: When value is unsafe
1648 Examples:
1649 >>> from mcpgateway.schemas import ResourceCreate
1650 >>> ResourceCreate.validate_description('A safe description')
1651 'A safe description'
1652 >>> ResourceCreate.validate_description(None) # Test None case
1653 >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
1654 >>> truncated = ResourceCreate.validate_description(long_desc)
1655 >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
1656 0
1657 >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
1658 True
1659 """
1660 if v is None:
1661 return v
1662 if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
1663 # Truncate the description to the maximum allowed length
1664 truncated = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
1665 logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
1666 return SecurityValidator.sanitize_display_text(truncated, "Description")
1667 return SecurityValidator.sanitize_display_text(v, "Description")
1669 @field_validator("mime_type")
1670 @classmethod
1671 def validate_mime_type(cls, v: Optional[str]) -> Optional[str]:
1672 """Validate MIME type format
1674 Args:
1675 v (str): Value to validate
1677 Returns:
1678 str: Value if validated as safe
1679 """
1680 if v is None:
1681 return v
1682 return SecurityValidator.validate_mime_type(v)
1684 @field_validator("content")
1685 @classmethod
1686 def validate_content(cls, v: Optional[Union[str, bytes]]) -> Optional[Union[str, bytes]]:
1687 """Validate content size and safety
1689 Args:
1690 v (Union[str, bytes]): Value to validate
1692 Returns:
1693 Union[str, bytes]: Value if validated as safe
1695 Raises:
1696 ValueError: When value is unsafe
1697 """
1698 if v is None:
1699 return v
1701 if len(v) > SecurityValidator.MAX_CONTENT_LENGTH:
1702 raise ValueError(f"Content exceeds maximum length of {SecurityValidator.MAX_CONTENT_LENGTH}")
1704 if isinstance(v, bytes):
1705 try:
1706 text = v.decode("utf-8")
1707 except UnicodeDecodeError:
1708 raise ValueError("Content must be UTF-8 decodable")
1709 else:
1710 text = v
1711 # Runtime pattern matching (not precompiled to allow test monkeypatching)
1712 if re.search(SecurityValidator.DANGEROUS_HTML_PATTERN, text, re.IGNORECASE):
1713 raise ValueError("Content contains HTML tags that may cause display issues")
1715 return v
1718class ResourceUpdate(BaseModelWithConfigDict):
1719 """Schema for updating an existing resource.
1721 Similar to ResourceCreate but URI is not required and all fields are optional.
1722 """
1724 uri: Optional[str] = Field(None, description="Unique URI for the resource")
1725 name: Optional[str] = Field(None, description="Human-readable resource name")
1726 description: Optional[str] = Field(None, description="Resource description")
1727 mime_type: Optional[str] = Field(None, description="Resource MIME type")
1728 uri_template: Optional[str] = Field(None, description="URI template for parameterized resources")
1729 content: Optional[Union[str, bytes]] = Field(None, description="Resource content (text or binary)")
1730 tags: Optional[List[str]] = Field(None, description="Tags for categorizing the resource")
1732 # Team scoping fields
1733 team_id: Optional[str] = Field(None, description="Team ID for resource organization")
1734 owner_email: Optional[str] = Field(None, description="Email of the resource owner")
1735 visibility: Optional[str] = Field(None, description="Visibility level (private, team, public)")
1737 @field_validator("tags")
1738 @classmethod
1739 def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
1740 """Validate and normalize tags.
1742 Args:
1743 v: Optional list of tag strings to validate
1745 Returns:
1746 List of validated tag strings or None if input is None
1747 """
1748 return validate_tags_field(v)
1750 @field_validator("name")
1751 @classmethod
1752 def validate_name(cls, v: str) -> str:
1753 """Validate resource name
1755 Args:
1756 v (str): Value to validate
1758 Returns:
1759 str: Value if validated as safe
1760 """
1761 return SecurityValidator.validate_name(v, "Resource name")
1763 @field_validator("description")
1764 @classmethod
1765 def validate_description(cls, v: Optional[str]) -> Optional[str]:
1766 """Ensure descriptions display safely, truncate if too long
1768 Args:
1769 v (str): Value to validate
1771 Returns:
1772 str: Value if validated as safe and truncated if too long
1774 Raises:
1775 ValueError: When value is unsafe
1777 Examples:
1778 >>> from mcpgateway.schemas import ResourceUpdate
1779 >>> ResourceUpdate.validate_description('A safe description')
1780 'A safe description'
1781 >>> ResourceUpdate.validate_description(None) # Test None case
1782 >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
1783 >>> truncated = ResourceUpdate.validate_description(long_desc)
1784 >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
1785 0
1786 >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
1787 True
1788 """
1789 if v is None:
1790 return v
1791 if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
1792 # Truncate the description to the maximum allowed length
1793 truncated = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
1794 logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
1795 return SecurityValidator.sanitize_display_text(truncated, "Description")
1796 return SecurityValidator.sanitize_display_text(v, "Description")
1798 @field_validator("mime_type")
1799 @classmethod
1800 def validate_mime_type(cls, v: Optional[str]) -> Optional[str]:
1801 """Validate MIME type format
1803 Args:
1804 v (str): Value to validate
1806 Returns:
1807 str: Value if validated as safe
1808 """
1809 if v is None:
1810 return v
1811 return SecurityValidator.validate_mime_type(v)
1813 @field_validator("content")
1814 @classmethod
1815 def validate_content(cls, v: Optional[Union[str, bytes]]) -> Optional[Union[str, bytes]]:
1816 """Validate content size and safety
1818 Args:
1819 v (Union[str, bytes]): Value to validate
1821 Returns:
1822 Union[str, bytes]: Value if validated as safe
1824 Raises:
1825 ValueError: When value is unsafe
1826 """
1827 if v is None:
1828 return v
1830 if len(v) > SecurityValidator.MAX_CONTENT_LENGTH:
1831 raise ValueError(f"Content exceeds maximum length of {SecurityValidator.MAX_CONTENT_LENGTH}")
1833 if isinstance(v, bytes):
1834 try:
1835 text = v.decode("utf-8")
1836 except UnicodeDecodeError:
1837 raise ValueError("Content must be UTF-8 decodable")
1838 else:
1839 text = v
1840 # Runtime pattern matching (not precompiled to allow test monkeypatching)
1841 if re.search(SecurityValidator.DANGEROUS_HTML_PATTERN, text, re.IGNORECASE):
1842 raise ValueError("Content contains HTML tags that may cause display issues")
1844 return v
1847class ResourceRead(BaseModelWithConfigDict):
1848 """Schema for reading resource information.
1850 Includes all resource fields plus:
1851 - Database ID
1852 - Content size
1853 - Creation/update timestamps
1854 - Active status
1855 - Metrics: Aggregated metrics for the resource invocations.
1856 """
1858 id: str = Field(description="Unique ID of the resource")
1859 uri: str
1860 name: str
1861 description: Optional[str]
1862 mime_type: Optional[str]
1863 uri_template: Optional[str] = Field(None, description="URI template for parameterized resources")
1864 size: Optional[int]
1865 created_at: datetime
1866 updated_at: datetime
1867 enabled: bool
1868 metrics: Optional[ResourceMetrics] = Field(None, description="Resource metrics (may be None in list operations)")
1869 tags: List[str] = Field(default_factory=list, description="Tags for categorizing the resource")
1871 # Comprehensive metadata for audit tracking
1872 created_by: Optional[str] = Field(None, description="Username who created this entity")
1873 created_from_ip: Optional[str] = Field(None, description="IP address of creator")
1874 created_via: Optional[str] = Field(None, description="Creation method: ui|api|import|federation")
1875 created_user_agent: Optional[str] = Field(None, description="User agent of creation request")
1877 modified_by: Optional[str] = Field(None, description="Username who last modified this entity")
1878 modified_from_ip: Optional[str] = Field(None, description="IP address of last modifier")
1879 modified_via: Optional[str] = Field(None, description="Modification method")
1880 modified_user_agent: Optional[str] = Field(None, description="User agent of modification request")
1882 import_batch_id: Optional[str] = Field(None, description="UUID of bulk import batch")
1883 federation_source: Optional[str] = Field(None, description="Source gateway for federated entities")
1884 version: Optional[int] = Field(1, description="Entity version for change tracking")
1886 # Team scoping fields
1887 team_id: Optional[str] = Field(None, description="ID of the team that owns this resource")
1888 team: Optional[str] = Field(None, description="Name of the team that owns this resource")
1889 owner_email: Optional[str] = Field(None, description="Email of the user who owns this resource")
1890 visibility: Optional[str] = Field(default="public", description="Visibility level: private, team, or public")
1892 # MCP protocol fields
1893 title: Optional[str] = Field(None, description="Human-readable title for the resource")
1894 annotations: Optional[Annotations] = Field(None, description="Optional annotations for client rendering hints")
1895 meta: Optional[Dict[str, Any]] = Field(None, alias="_meta", description="Optional metadata for protocol extension")
1898class ResourceSubscription(BaseModelWithConfigDict):
1899 """Schema for resource subscriptions.
1901 This schema validates resource subscription requests to ensure URIs are safe
1902 and subscriber IDs follow proper formatting rules. It prevents various
1903 injection attacks and ensures data consistency.
1905 Tracks:
1906 - Resource URI being subscribed to (validated for safety)
1907 - Unique subscriber identifier (validated for proper format)
1909 Validation Rules:
1910 - URIs cannot contain HTML special characters (<, >, ", ', backslash)
1911 - URIs cannot contain directory traversal sequences (..)
1912 - URIs must contain only safe characters (alphanumeric, _, -, :, /, ?, =, &, %)
1913 - Subscriber IDs must contain only alphanumeric characters, underscores, hyphens, and dots
1914 - Both fields have maximum length limits (255 characters)
1916 Attributes:
1917 uri (str): URI of the resource to subscribe to. Must be a safe, valid URI.
1918 subscriber_id (str): Unique identifier for the subscriber. Must follow
1919 identifier naming conventions.
1921 Examples:
1922 >>> from pydantic import ValidationError
1923 >>> # Valid subscription
1924 >>> sub = ResourceSubscription(uri="/api/v1/users/123", subscriber_id="client_001")
1925 >>> sub.uri
1926 '/api/v1/users/123'
1927 >>> sub.subscriber_id
1928 'client_001'
1930 >>> # Valid URI with query parameters
1931 >>> sub = ResourceSubscription(uri="/data?type=json&limit=10", subscriber_id="app.service.1")
1932 >>> sub.uri
1933 '/data?type=json&limit=10'
1935 >>> # Valid subscriber ID with dots (common for service names)
1936 >>> sub = ResourceSubscription(uri="/events", subscriber_id="com.example.service")
1937 >>> sub.subscriber_id
1938 'com.example.service'
1940 >>> # Invalid: XSS attempt in URI
1941 >>> try:
1942 ... ResourceSubscription(uri="<script>alert('XSS')</script>", subscriber_id="sub1")
1943 ... except ValidationError as e:
1944 ... print("Validation failed: HTML characters not allowed")
1945 Validation failed: HTML characters not allowed
1947 >>> # Invalid: Directory traversal in URI
1948 >>> try:
1949 ... ResourceSubscription(uri="/api/../../../etc/passwd", subscriber_id="sub1")
1950 ... except ValidationError as e:
1951 ... print("Validation failed: Directory traversal detected")
1952 Validation failed: Directory traversal detected
1954 >>> # Invalid: SQL injection attempt in URI
1955 >>> try:
1956 ... ResourceSubscription(uri="/users'; DROP TABLE users;--", subscriber_id="sub1")
1957 ... except ValidationError as e:
1958 ... print("Validation failed: Invalid characters in URI")
1959 Validation failed: Invalid characters in URI
1961 >>> # Invalid: Special characters in subscriber ID
1962 >>> try:
1963 ... ResourceSubscription(uri="/api/data", subscriber_id="sub@123!")
1964 ... except ValidationError as e:
1965 ... print("Validation failed: Invalid subscriber ID format")
1966 Validation failed: Invalid subscriber ID format
1968 >>> # Invalid: Empty URI
1969 >>> try:
1970 ... ResourceSubscription(uri="", subscriber_id="sub1")
1971 ... except ValidationError as e:
1972 ... print("Validation failed: URI cannot be empty")
1973 Validation failed: URI cannot be empty
1975 >>> # Invalid: Empty subscriber ID
1976 >>> try:
1977 ... ResourceSubscription(uri="/api/data", subscriber_id="")
1978 ... except ValidationError as e:
1979 ... print("Validation failed: Subscriber ID cannot be empty")
1980 Validation failed: Subscriber ID cannot be empty
1982 >>> # Valid: Complex but safe URI
1983 >>> sub = ResourceSubscription(
1984 ... uri="/api/v2/resources/category:items/filter?status=active&limit=50",
1985 ... subscriber_id="monitor-service-01"
1986 ... )
1987 >>> sub.uri
1988 '/api/v2/resources/category:items/filter?status=active&limit=50'
1990 >>> # Edge case: Maximum length validation (simulated)
1991 >>> long_uri = "/" + "a" * 254 # Just under limit
1992 >>> sub = ResourceSubscription(uri=long_uri, subscriber_id="sub1")
1993 >>> len(sub.uri)
1994 255
1996 >>> # Invalid: Quotes in URI (could break out of attributes)
1997 >>> try:
1998 ... ResourceSubscription(uri='/api/data"onclick="alert(1)', subscriber_id="sub1")
1999 ... except ValidationError as e:
2000 ... print("Validation failed: Quotes not allowed in URI")
2001 Validation failed: Quotes not allowed in URI
2002 """
2004 uri: str = Field(..., description="URI of resource to subscribe to")
2005 subscriber_id: str = Field(..., description="Unique subscriber identifier")
2007 @field_validator("uri")
2008 @classmethod
2009 def validate_uri(cls, v: str) -> str:
2010 """Validate URI format for safety and correctness.
2012 Ensures the URI:
2013 - Is not empty
2014 - Does not contain HTML special characters that could cause XSS
2015 - Does not contain directory traversal sequences (..)
2016 - Contains only allowed characters for URIs
2017 - Does not exceed maximum length (255 characters)
2019 This prevents various injection attacks including XSS, path traversal,
2020 and other URI-based vulnerabilities.
2022 Args:
2023 v (str): URI to validate
2025 Returns:
2026 str: The validated URI if it passes all security checks
2028 Raises:
2029 ValueError: If the URI contains dangerous patterns or invalid characters
2030 """
2031 return SecurityValidator.validate_uri(v, "Resource URI")
2033 @field_validator("subscriber_id")
2034 @classmethod
2035 def validate_subscriber_id(cls, v: str) -> str:
2036 """Validate subscriber ID format.
2038 Ensures the subscriber ID:
2039 - Is not empty
2040 - Contains only safe identifier characters
2041 - Allows email-style IDs for authenticated subscribers
2042 - Does not contain HTML special characters
2043 - Follows standard identifier naming conventions
2044 - Does not exceed maximum length (255 characters)
2046 This ensures consistency and prevents injection attacks through
2047 subscriber identifiers.
2049 Args:
2050 v (str): Subscriber ID to validate
2052 Returns:
2053 str: The validated subscriber ID if it passes all checks
2055 Raises:
2056 ValueError: If the subscriber ID violates naming conventions
2057 """
2058 if not v:
2059 raise ValueError("Subscriber ID cannot be empty")
2061 # Allow email-like subscriber IDs while keeping strict character controls.
2062 if re.match(r"^[A-Za-z0-9_.@+-]+$", v):
2063 if re.search(SecurityValidator.VALIDATION_UNSAFE_URI_PATTERN, v):
2064 raise ValueError("Subscriber ID cannot contain HTML special characters")
2065 if len(v) > SecurityValidator.MAX_NAME_LENGTH:
2066 raise ValueError(f"Subscriber ID exceeds maximum length of {SecurityValidator.MAX_NAME_LENGTH}")
2067 return v
2069 return SecurityValidator.validate_identifier(v, "Subscriber ID")
2072class ResourceNotification(BaseModelWithConfigDict):
2073 """Schema for resource update notifications.
2075 Contains:
2076 - Resource URI
2077 - Updated content
2078 - Update timestamp
2079 """
2081 uri: str
2082 content: ResourceContent
2083 timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
2085 @field_serializer("timestamp")
2086 def serialize_timestamp(self, dt: datetime) -> str:
2087 """Serialize the `timestamp` field as an ISO 8601 string with UTC timezone.
2089 Converts the given datetime to UTC and returns it in ISO 8601 format,
2090 replacing the "+00:00" suffix with "Z" to indicate UTC explicitly.
2092 Args:
2093 dt (datetime): The datetime object to serialize.
2095 Returns:
2096 str: ISO 8601 formatted string in UTC, ending with 'Z'.
2097 """
2098 return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
2101# --- Prompt Schemas ---
2104class PromptArgument(BaseModelWithConfigDict):
2105 """Schema for prompt template arguments.
2107 Defines:
2108 - Argument name
2109 - Optional description
2110 - Required flag
2111 """
2113 name: str = Field(..., description="Argument name")
2114 description: Optional[str] = Field(None, description="Argument description")
2115 required: bool = Field(default=False, description="Whether argument is required")
2117 # Use base config; example metadata removed to avoid config merging type issues in static checks
2120class PromptCreate(BaseModelWithConfigDict):
2121 """
2122 Schema for creating a new prompt.
2124 Attributes:
2125 model_config (ConfigDict): Configuration for the model.
2126 name (str): Unique name for the prompt.
2127 description (Optional[str]): Optional description of the prompt.
2128 template (str): Template text for the prompt.
2129 arguments (List[PromptArgument]): List of arguments for the template.
2130 """
2132 model_config = ConfigDict(**dict(BaseModelWithConfigDict.model_config), str_strip_whitespace=True)
2134 name: str = Field(..., description="Unique name for the prompt")
2135 custom_name: Optional[str] = Field(None, description="Custom prompt name used for MCP invocation")
2136 display_name: Optional[str] = Field(None, description="Display name for the prompt (shown in UI)")
2137 description: Optional[str] = Field(None, description="Prompt description")
2138 template: str = Field(..., description="Prompt template text")
2139 arguments: List[PromptArgument] = Field(default_factory=list, description="List of arguments for the template")
2140 tags: Optional[List[str]] = Field(default_factory=list, description="Tags for categorizing the prompt")
2142 # Team scoping fields
2143 team_id: Optional[str] = Field(None, description="Team ID for resource organization")
2144 owner_email: Optional[str] = Field(None, description="Email of the prompt owner")
2145 visibility: Optional[str] = Field(default="public", description="Visibility level (private, team, public)")
2146 gateway_id: Optional[str] = Field(None, description="ID of the gateway for the prompt")
2148 @field_validator("tags")
2149 @classmethod
2150 def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
2151 """Validate and normalize tags.
2153 Args:
2154 v: Optional list of tag strings to validate
2156 Returns:
2157 List of validated tag strings
2158 """
2159 return validate_tags_field(v)
2161 @field_validator("name")
2162 @classmethod
2163 def validate_name(cls, v: str) -> str:
2164 """Ensure prompt names display correctly in UI
2166 Args:
2167 v (str): Value to validate
2169 Returns:
2170 str: Value if validated as safe
2171 """
2172 return SecurityValidator.validate_name(v, "Prompt name")
2174 @field_validator("custom_name")
2175 @classmethod
2176 def validate_custom_name(cls, v: Optional[str]) -> Optional[str]:
2177 """Ensure custom prompt names follow MCP naming conventions.
2179 Args:
2180 v: Custom prompt name to validate.
2182 Returns:
2183 The validated custom name or None.
2184 """
2185 if v is None:
2186 return v
2187 return SecurityValidator.validate_name(v, "Prompt name")
2189 @field_validator("display_name")
2190 @classmethod
2191 def validate_display_name(cls, v: Optional[str]) -> Optional[str]:
2192 """Ensure display names render safely in UI.
2194 Args:
2195 v: Display name to validate.
2197 Returns:
2198 The validated display name or None.
2199 """
2200 if v is None:
2201 return v
2202 return SecurityValidator.sanitize_display_text(v, "Prompt display name")
2204 @field_validator("description")
2205 @classmethod
2206 def validate_description(cls, v: Optional[str]) -> Optional[str]:
2207 """Ensure descriptions display safely, truncate if too long
2209 Args:
2210 v (str): Value to validate
2212 Returns:
2213 str: Value if validated as safe and truncated if too long
2215 Raises:
2216 ValueError: When value is unsafe
2218 Examples:
2219 >>> from mcpgateway.schemas import PromptCreate
2220 >>> PromptCreate.validate_description('A safe description')
2221 'A safe description'
2222 >>> PromptCreate.validate_description(None) # Test None case
2223 >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
2224 >>> truncated = PromptCreate.validate_description(long_desc)
2225 >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
2226 0
2227 >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
2228 True
2229 """
2230 if v is None:
2231 return v
2232 if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
2233 # Truncate the description to the maximum allowed length
2234 truncated = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
2235 logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
2236 return SecurityValidator.sanitize_display_text(truncated, "Description")
2237 return SecurityValidator.sanitize_display_text(v, "Description")
2239 @field_validator("template")
2240 @classmethod
2241 def validate_template(cls, v: str) -> str:
2242 """Validate template content for safe display
2244 Args:
2245 v (str): Value to validate
2247 Returns:
2248 str: Value if validated as safe
2249 """
2250 return SecurityValidator.validate_template(v)
2252 @field_validator("arguments")
2253 @classmethod
2254 def validate_arguments(cls, v: Dict[str, Any]) -> Dict[str, Any]:
2255 """Ensure JSON structure is valid and within complexity limits
2257 Args:
2258 v (dict): Value to validate
2260 Returns:
2261 dict: Value if validated as safe
2262 """
2263 SecurityValidator.validate_json_depth(v)
2264 return v
2267class PromptExecuteArgs(BaseModel):
2268 """
2269 Schema for args executing a prompt
2271 Attributes:
2272 args (Dict[str, str]): Arguments for prompt execution.
2273 """
2275 model_config = ConfigDict(str_strip_whitespace=True)
2277 args: Dict[str, str] = Field(default_factory=dict, description="Arguments for prompt execution")
2279 @field_validator("args")
2280 @classmethod
2281 def validate_args(cls, v: dict) -> dict:
2282 """Ensure prompt arguments pass XSS validation
2284 Args:
2285 v (dict): Value to validate
2287 Returns:
2288 dict: Value if validated as safe
2289 """
2290 for val in v.values():
2291 SecurityValidator.validate_no_xss(val, "Prompt execution arguments")
2292 return v
2295class PromptUpdate(BaseModelWithConfigDict):
2296 """Schema for updating an existing prompt.
2298 Similar to PromptCreate but all fields are optional to allow partial updates.
2299 """
2301 name: Optional[str] = Field(None, description="Unique name for the prompt")
2302 custom_name: Optional[str] = Field(None, description="Custom prompt name used for MCP invocation")
2303 display_name: Optional[str] = Field(None, description="Display name for the prompt (shown in UI)")
2304 description: Optional[str] = Field(None, description="Prompt description")
2305 template: Optional[str] = Field(None, description="Prompt template text")
2306 arguments: Optional[List[PromptArgument]] = Field(None, description="List of arguments for the template")
2308 tags: Optional[List[str]] = Field(None, description="Tags for categorizing the prompt")
2310 # Team scoping fields
2311 team_id: Optional[str] = Field(None, description="Team ID for resource organization")
2312 owner_email: Optional[str] = Field(None, description="Email of the prompt owner")
2313 visibility: Optional[str] = Field(None, description="Visibility level (private, team, public)")
2315 @field_validator("tags")
2316 @classmethod
2317 def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
2318 """Validate and normalize tags.
2320 Args:
2321 v: Optional list of tag strings to validate
2323 Returns:
2324 List of validated tag strings
2325 """
2326 return validate_tags_field(v)
2328 @field_validator("name")
2329 @classmethod
2330 def validate_name(cls, v: str) -> str:
2331 """Ensure prompt names display correctly in UI
2333 Args:
2334 v (str): Value to validate
2336 Returns:
2337 str: Value if validated as safe
2338 """
2339 return SecurityValidator.validate_name(v, "Prompt name")
2341 @field_validator("custom_name")
2342 @classmethod
2343 def validate_custom_name(cls, v: Optional[str]) -> Optional[str]:
2344 """Ensure custom prompt names follow MCP naming conventions.
2346 Args:
2347 v: Custom prompt name to validate.
2349 Returns:
2350 The validated custom name or None.
2351 """
2352 if v is None:
2353 return v
2354 return SecurityValidator.validate_name(v, "Prompt name")
2356 @field_validator("display_name")
2357 @classmethod
2358 def validate_display_name(cls, v: Optional[str]) -> Optional[str]:
2359 """Ensure display names render safely in UI.
2361 Args:
2362 v: Display name to validate.
2364 Returns:
2365 The validated display name or None.
2366 """
2367 if v is None:
2368 return v
2369 return SecurityValidator.sanitize_display_text(v, "Prompt display name")
2371 @field_validator("description")
2372 @classmethod
2373 def validate_description(cls, v: Optional[str]) -> Optional[str]:
2374 """Ensure descriptions display safely, truncate if too long
2376 Args:
2377 v (str): Value to validate
2379 Returns:
2380 str: Value if validated as safe and truncated if too long
2382 Raises:
2383 ValueError: When value is unsafe
2385 Examples:
2386 >>> from mcpgateway.schemas import PromptUpdate
2387 >>> PromptUpdate.validate_description('A safe description')
2388 'A safe description'
2389 >>> PromptUpdate.validate_description(None) # Test None case
2390 >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
2391 >>> truncated = PromptUpdate.validate_description(long_desc)
2392 >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
2393 0
2394 >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
2395 True
2396 """
2397 if v is None:
2398 return v
2399 if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
2400 # Truncate the description to the maximum allowed length
2401 truncated = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
2402 logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
2403 return SecurityValidator.sanitize_display_text(truncated, "Description")
2404 return SecurityValidator.sanitize_display_text(v, "Description")
2406 @field_validator("template")
2407 @classmethod
2408 def validate_template(cls, v: str) -> str:
2409 """Validate template content for safe display
2411 Args:
2412 v (str): Value to validate
2414 Returns:
2415 str: Value if validated as safe
2416 """
2417 return SecurityValidator.validate_template(v)
2419 @field_validator("arguments")
2420 @classmethod
2421 def validate_arguments(cls, v: Dict[str, Any]) -> Dict[str, Any]:
2422 """Ensure JSON structure is valid and within complexity limits
2424 Args:
2425 v (dict): Value to validate
2427 Returns:
2428 dict: Value if validated as safe
2429 """
2430 SecurityValidator.validate_json_depth(v)
2431 return v
2434class PromptRead(BaseModelWithConfigDict):
2435 """Schema for reading prompt information.
2437 Includes all prompt fields plus:
2438 - Database ID
2439 - Creation/update timestamps
2440 - Active status
2441 - Metrics: Aggregated metrics for the prompt invocations.
2442 """
2444 id: str = Field(description="Unique ID of the prompt")
2445 name: str
2446 original_name: str
2447 custom_name: str
2448 custom_name_slug: str
2449 display_name: Optional[str] = Field(None, description="Display name for the prompt (shown in UI)")
2450 gateway_slug: Optional[str] = None
2451 description: Optional[str]
2452 template: str
2453 arguments: List[PromptArgument]
2454 created_at: datetime
2455 updated_at: datetime
2456 # is_active: bool
2457 enabled: bool
2458 tags: List[Dict[str, str]] = Field(default_factory=list, description="Tags for categorizing the prompt")
2459 metrics: Optional[PromptMetrics] = Field(None, description="Prompt metrics (may be None in list operations)")
2461 # Comprehensive metadata for audit tracking
2462 created_by: Optional[str] = Field(None, description="Username who created this entity")
2463 created_from_ip: Optional[str] = Field(None, description="IP address of creator")
2464 created_via: Optional[str] = Field(None, description="Creation method: ui|api|import|federation")
2465 created_user_agent: Optional[str] = Field(None, description="User agent of creation request")
2467 modified_by: Optional[str] = Field(None, description="Username who last modified this entity")
2468 modified_from_ip: Optional[str] = Field(None, description="IP address of last modifier")
2469 modified_via: Optional[str] = Field(None, description="Modification method")
2470 modified_user_agent: Optional[str] = Field(None, description="User agent of modification request")
2472 import_batch_id: Optional[str] = Field(None, description="UUID of bulk import batch")
2473 federation_source: Optional[str] = Field(None, description="Source gateway for federated entities")
2474 version: Optional[int] = Field(1, description="Entity version for change tracking")
2476 # Team scoping fields
2477 team_id: Optional[str] = Field(None, description="ID of the team that owns this resource")
2478 team: Optional[str] = Field(None, description="Name of the team that owns this resource")
2479 owner_email: Optional[str] = Field(None, description="Email of the user who owns this resource")
2480 visibility: Optional[str] = Field(default="public", description="Visibility level: private, team, or public")
2482 # MCP protocol fields
2483 title: Optional[str] = Field(None, description="Human-readable title for the prompt")
2484 meta: Optional[Dict[str, Any]] = Field(None, alias="_meta", description="Optional metadata for protocol extension")
2487class PromptInvocation(BaseModelWithConfigDict):
2488 """Schema for prompt invocation requests.
2490 Contains:
2491 - Prompt name to use
2492 - Arguments for template rendering
2493 """
2495 name: str = Field(..., description="Name of prompt to use")
2496 arguments: Dict[str, str] = Field(default_factory=dict, description="Arguments for template rendering")
2499# --- Global Config Schemas ---
2500class GlobalConfigUpdate(BaseModel):
2501 """Schema for updating global configuration.
2503 Attributes:
2504 passthrough_headers (Optional[List[str]]): List of headers allowed to be passed through globally
2505 """
2507 passthrough_headers: Optional[List[str]] = Field(default=None, description="List of headers allowed to be passed through globally")
2510class GlobalConfigRead(BaseModel):
2511 """Schema for reading global configuration.
2513 Attributes:
2514 passthrough_headers (Optional[List[str]]): List of headers allowed to be passed through globally
2515 """
2517 passthrough_headers: Optional[List[str]] = Field(default=None, description="List of headers allowed to be passed through globally")
2520# --- Gateway Schemas ---
2523# --- Transport Type ---
2524class TransportType(str, Enum):
2525 """
2526 Enumeration of supported transport mechanisms for communication between components.
2528 Attributes:
2529 SSE (str): Server-Sent Events transport.
2530 HTTP (str): Standard HTTP-based transport.
2531 STDIO (str): Standard input/output transport.
2532 STREAMABLEHTTP (str): HTTP transport with streaming.
2533 """
2535 SSE = "SSE"
2536 HTTP = "HTTP"
2537 STDIO = "STDIO"
2538 STREAMABLEHTTP = "STREAMABLEHTTP"
2541class GatewayCreate(BaseModel):
2542 """
2543 Schema for creating a new gateway.
2545 Attributes:
2546 model_config (ConfigDict): Configuration for the model.
2547 name (str): Unique name for the gateway.
2548 url (Union[str, AnyHttpUrl]): Gateway endpoint URL.
2549 description (Optional[str]): Optional description of the gateway.
2550 transport (str): Transport used by the MCP server, default is "SSE".
2551 auth_type (Optional[str]): Type of authentication (basic, bearer, authheaders, or none).
2552 auth_username (Optional[str]): Username for basic authentication.
2553 auth_password (Optional[str]): Password for basic authentication.
2554 auth_token (Optional[str]): Token for bearer authentication.
2555 auth_header_key (Optional[str]): Key for custom headers authentication.
2556 auth_header_value (Optional[str]): Value for custom headers authentication.
2557 auth_headers (Optional[List[Dict[str, str]]]): List of custom headers for authentication.
2558 auth_value (Optional[str]): Alias for authentication value, used for better access post-validation.
2559 """
2561 model_config = ConfigDict(str_strip_whitespace=True)
2563 name: str = Field(..., description="Unique name for the gateway")
2564 url: Union[str, AnyHttpUrl] = Field(..., description="Gateway endpoint URL")
2565 description: Optional[str] = Field(None, description="Gateway description")
2566 transport: str = Field(default="SSE", description="Transport used by MCP server: SSE or STREAMABLEHTTP")
2567 passthrough_headers: Optional[List[str]] = Field(default=None, description="List of headers allowed to be passed through from client to target")
2569 # Authorizations
2570 auth_type: Optional[str] = Field(None, description="Type of authentication: basic, bearer, authheaders, oauth, query_param, or none")
2571 # Fields for various types of authentication
2572 auth_username: Optional[str] = Field(None, description="Username for basic authentication")
2573 auth_password: Optional[str] = Field(None, description="Password for basic authentication")
2574 auth_token: Optional[str] = Field(None, description="Token for bearer authentication")
2575 auth_header_key: Optional[str] = Field(None, description="Key for custom headers authentication")
2576 auth_header_value: Optional[str] = Field(None, description="Value for custom headers authentication")
2577 auth_headers: Optional[List[Dict[str, str]]] = Field(None, description="List of custom headers for authentication")
2579 # OAuth 2.0 configuration
2580 oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration including grant_type, client_id, encrypted client_secret, URLs, and scopes")
2582 # Query Parameter Authentication (INSECURE)
2583 auth_query_param_key: Optional[str] = Field(
2584 None,
2585 description="Query parameter name for authentication (e.g., 'api_key', 'tavilyApiKey')",
2586 pattern=r"^[a-zA-Z_][a-zA-Z0-9_\-]*$",
2587 )
2588 auth_query_param_value: Optional[SecretStr] = Field(
2589 None,
2590 description="Query parameter value (API key). Stored encrypted.",
2591 )
2593 # Adding `auth_value` as an alias for better access post-validation
2594 auth_value: Optional[str] = Field(None, validate_default=True)
2596 # One time auth - do not store the auth in gateway flag
2597 one_time_auth: Optional[bool] = Field(default=False, description="The authentication should be used only once and not stored in the gateway")
2599 tags: Optional[List[Union[str, Dict[str, str]]]] = Field(default_factory=list, description="Tags for categorizing the gateway")
2601 # Team scoping fields for resource organization
2602 team_id: Optional[str] = Field(None, description="Team ID this gateway belongs to")
2603 owner_email: Optional[str] = Field(None, description="Email of the gateway owner")
2604 visibility: Optional[str] = Field(default="public", description="Gateway visibility: private, team, or public")
2606 # CA certificate
2607 ca_certificate: Optional[str] = Field(None, description="Custom CA certificate for TLS verification")
2608 ca_certificate_sig: Optional[str] = Field(None, description="Signature of the custom CA certificate for integrity verification")
2609 signing_algorithm: Optional[str] = Field("ed25519", description="Algorithm used for signing the CA certificate")
2611 # Per-gateway refresh configuration
2612 refresh_interval_seconds: Optional[int] = Field(None, ge=60, description="Per-gateway refresh interval in seconds (minimum 60); uses global default if not set")
2614 # Gateway mode configuration
2615 gateway_mode: str = Field(default="cache", description="Gateway mode: 'cache' (database caching, default) or 'direct_proxy' (pass-through mode with no caching)", pattern="^(cache|direct_proxy)$")
2617 @field_validator("gateway_mode", mode="before")
2618 @classmethod
2619 def default_gateway_mode(cls, v: Optional[str]) -> str:
2620 """Default gateway_mode to 'cache' when None is provided.
2622 Args:
2623 v: Gateway mode value (may be None).
2625 Returns:
2626 The validated gateway mode string, defaulting to 'cache'.
2627 """
2628 return v if v is not None else "cache"
2630 @field_validator("tags")
2631 @classmethod
2632 def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
2633 """Validate and normalize tags.
2635 Args:
2636 v: Optional list of tag strings to validate
2638 Returns:
2639 List of validated tag strings
2640 """
2641 return validate_tags_field(v)
2643 @field_validator("name")
2644 @classmethod
2645 def validate_name(cls, v: str) -> str:
2646 """Validate gateway name
2648 Args:
2649 v (str): Value to validate
2651 Returns:
2652 str: Value if validated as safe
2653 """
2654 return SecurityValidator.validate_name(v, "Gateway name")
2656 @field_validator("url")
2657 @classmethod
2658 def validate_url(cls, v: str) -> str:
2659 """Validate gateway URL
2661 Args:
2662 v (str): Value to validate
2664 Returns:
2665 str: Value if validated as safe
2666 """
2667 return validate_core_url(v, "Gateway URL")
2669 @field_validator("description")
2670 @classmethod
2671 def validate_description(cls, v: Optional[str]) -> Optional[str]:
2672 """Ensure descriptions display safely, truncate if too long
2674 Args:
2675 v (str): Value to validate
2677 Returns:
2678 str: Value if validated as safe and truncated if too long
2680 Raises:
2681 ValueError: When value is unsafe
2683 Examples:
2684 >>> from mcpgateway.schemas import GatewayCreate
2685 >>> GatewayCreate.validate_description('A safe description')
2686 'A safe description'
2687 >>> GatewayCreate.validate_description(None) # Test None case
2688 >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
2689 >>> truncated = ToolCreate.validate_description(long_desc)
2690 >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
2691 0
2692 >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
2693 True
2694 """
2695 if v is None:
2696 return v
2697 if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
2698 # Truncate the description to the maximum allowed length
2699 truncated = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
2700 logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
2701 return SecurityValidator.sanitize_display_text(truncated, "Description")
2702 return SecurityValidator.sanitize_display_text(v, "Description")
2704 @field_validator("auth_value", mode="before")
2705 @classmethod
2706 def create_auth_value(cls, v, info):
2707 """
2708 This validator will run before the model is fully instantiated (mode="before")
2709 It will process the auth fields based on auth_type and generate auth_value.
2711 Args:
2712 v: Input url
2713 info: ValidationInfo containing auth_type
2715 Returns:
2716 str: Auth value
2717 """
2718 data = info.data
2719 auth_type = data.get("auth_type")
2721 if (auth_type is None) or (auth_type == ""):
2722 return v # If no auth_type is provided, no need to create auth_value
2724 # Process the auth fields and generate auth_value based on auth_type
2725 auth_value = cls._process_auth_fields(info)
2726 return auth_value
2728 @field_validator("transport")
2729 @classmethod
2730 def validate_transport(cls, v: str) -> str:
2731 """
2732 Validates that the given transport value is one of the supported TransportType values.
2734 Args:
2735 v (str): The transport value to validate.
2737 Returns:
2738 str: The validated transport value if it is valid.
2740 Raises:
2741 ValueError: If the provided value is not a valid transport type.
2743 Valid transport types are defined in the TransportType enum:
2744 - SSE
2745 - HTTP
2746 - STDIO
2747 - STREAMABLEHTTP
2748 """
2749 allowed = [t.value for t in TransportType.__members__.values()]
2750 if v not in allowed:
2751 raise ValueError(f"Invalid transport type: {v}. Must be one of: {', '.join(allowed)}")
2752 return v
2754 @staticmethod
2755 def _process_auth_fields(info: ValidationInfo) -> Optional[str]:
2756 """
2757 Processes the input authentication fields and returns the correct auth_value.
2758 This method is called based on the selected auth_type.
2760 Args:
2761 info: ValidationInfo containing auth fields
2763 Returns:
2764 Encoded auth string or None
2766 Raises:
2767 ValueError: If auth_type is invalid
2768 """
2769 data = info.data
2770 auth_type = data.get("auth_type")
2772 if auth_type == "basic":
2773 # For basic authentication, both username and password must be present
2774 username = data.get("auth_username")
2775 password = data.get("auth_password")
2777 if not username or not password:
2778 raise ValueError("For 'basic' auth, both 'auth_username' and 'auth_password' must be provided.")
2780 creds = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode()
2781 return encode_auth({"Authorization": f"Basic {creds}"})
2783 if auth_type == "bearer":
2784 # For bearer authentication, only token is required
2785 token = data.get("auth_token")
2787 if not token:
2788 raise ValueError("For 'bearer' auth, 'auth_token' must be provided.")
2790 return encode_auth({"Authorization": f"Bearer {token}"})
2792 if auth_type == "oauth":
2793 # For OAuth authentication, we don't encode anything here
2794 # The OAuth configuration is handled separately in the oauth_config field
2795 # This method is only called for traditional auth types
2796 return None
2798 if auth_type == "authheaders":
2799 # Support both new multi-headers format and legacy single header format
2800 auth_headers = data.get("auth_headers")
2801 if auth_headers and isinstance(auth_headers, list):
2802 # New multi-headers format with enhanced validation
2803 header_dict = {}
2804 duplicate_keys = set()
2806 for header in auth_headers:
2807 if not isinstance(header, dict):
2808 continue
2810 key = header.get("key")
2811 value = header.get("value", "")
2813 # Skip headers without keys
2814 if not key:
2815 continue
2817 # Track duplicate keys (last value wins)
2818 if key in header_dict:
2819 duplicate_keys.add(key)
2821 # Validate header key format (basic HTTP header validation)
2822 if not all(c.isalnum() or c in "-_" for c in key.replace(" ", "")):
2823 raise ValueError(f"Invalid header key format: '{key}'. Header keys should contain only alphanumeric characters, hyphens, and underscores.")
2825 # Store header (empty values are allowed)
2826 header_dict[key] = value
2828 # Ensure at least one valid header
2829 if not header_dict:
2830 raise ValueError("For 'authheaders' auth, at least one valid header with a key must be provided.")
2832 # Warn about duplicate keys (optional - could log this instead)
2833 if duplicate_keys:
2834 logger.warning(f"Duplicate header keys detected (last value used): {', '.join(duplicate_keys)}")
2836 # Check for excessive headers (prevent abuse)
2837 if len(header_dict) > 100:
2838 raise ValueError("Maximum of 100 headers allowed per gateway.")
2840 return encode_auth(header_dict)
2842 # Legacy single header format (backward compatibility)
2843 header_key = data.get("auth_header_key")
2844 header_value = data.get("auth_header_value")
2846 if not header_key or not header_value:
2847 raise ValueError("For 'authheaders' auth, either 'auth_headers' list or both 'auth_header_key' and 'auth_header_value' must be provided.")
2849 return encode_auth({header_key: header_value})
2851 if auth_type == "one_time_auth":
2852 return None # No auth_value needed for one-time auth
2854 if auth_type == "query_param":
2855 # Query param auth doesn't use auth_value field
2856 # Validation is handled by model_validator
2857 return None
2859 raise ValueError("Invalid 'auth_type'. Must be one of: basic, bearer, oauth, authheaders, or query_param.")
2861 @model_validator(mode="after")
2862 def validate_query_param_auth(self) -> "GatewayCreate":
2863 """Validate query parameter authentication configuration.
2865 Returns:
2866 GatewayCreate: The validated instance.
2868 Raises:
2869 ValueError: If query param auth is disabled or host is not in allowlist.
2870 """
2871 if self.auth_type != "query_param":
2872 return self
2874 # Check feature flag
2875 if not settings.insecure_allow_queryparam_auth:
2876 raise ValueError("Query parameter authentication is disabled. " + "Set INSECURE_ALLOW_QUERYPARAM_AUTH=true to enable. " + "WARNING: API keys in URLs may appear in proxy logs.")
2878 # Check required fields
2879 if not self.auth_query_param_key:
2880 raise ValueError("auth_query_param_key is required when auth_type is 'query_param'")
2881 if not self.auth_query_param_value:
2882 raise ValueError("auth_query_param_value is required when auth_type is 'query_param'")
2884 # Check host allowlist (if configured)
2885 if settings.insecure_queryparam_auth_allowed_hosts:
2886 parsed = urlparse(str(self.url))
2887 # Extract hostname properly (handles IPv6, ports, userinfo)
2888 hostname = parsed.hostname or ""
2889 hostname = hostname.lower()
2891 if hostname not in settings.insecure_queryparam_auth_allowed_hosts:
2892 allowed = ", ".join(settings.insecure_queryparam_auth_allowed_hosts)
2893 raise ValueError(f"Host '{hostname}' is not in the allowed hosts for query parameter auth. Allowed hosts: {allowed}")
2895 return self
2898class GatewayUpdate(BaseModelWithConfigDict):
2899 """Schema for updating an existing federation gateway.
2901 Similar to GatewayCreate but all fields are optional to allow partial updates.
2902 """
2904 name: Optional[str] = Field(None, description="Unique name for the gateway")
2905 url: Optional[Union[str, AnyHttpUrl]] = Field(None, description="Gateway endpoint URL")
2906 description: Optional[str] = Field(None, description="Gateway description")
2907 transport: Optional[str] = Field(None, description="Transport used by MCP server: SSE or STREAMABLEHTTP")
2909 passthrough_headers: Optional[List[str]] = Field(default=None, description="List of headers allowed to be passed through from client to target")
2911 # Authorizations
2912 auth_type: Optional[str] = Field(None, description="auth_type: basic, bearer, authheaders or None")
2913 auth_username: Optional[str] = Field(None, description="username for basic authentication")
2914 auth_password: Optional[str] = Field(None, description="password for basic authentication")
2915 auth_token: Optional[str] = Field(None, description="token for bearer authentication")
2916 auth_header_key: Optional[str] = Field(None, description="key for custom headers authentication")
2917 auth_header_value: Optional[str] = Field(None, description="value for custom headers authentication")
2918 auth_headers: Optional[List[Dict[str, str]]] = Field(None, description="List of custom headers for authentication")
2920 # Adding `auth_value` as an alias for better access post-validation
2921 auth_value: Optional[str] = Field(None, validate_default=True)
2923 # OAuth 2.0 configuration
2924 oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration including grant_type, client_id, encrypted client_secret, URLs, and scopes")
2926 # Query Parameter Authentication (INSECURE)
2927 auth_query_param_key: Optional[str] = Field(
2928 None,
2929 description="Query parameter name for authentication",
2930 pattern=r"^[a-zA-Z_][a-zA-Z0-9_\-]*$",
2931 )
2932 auth_query_param_value: Optional[SecretStr] = Field(
2933 None,
2934 description="Query parameter value (API key)",
2935 )
2937 # One time auth - do not store the auth in gateway flag
2938 one_time_auth: Optional[bool] = Field(default=False, description="The authentication should be used only once and not stored in the gateway")
2940 tags: Optional[List[Union[str, Dict[str, str]]]] = Field(None, description="Tags for categorizing the gateway")
2942 # Team scoping fields for resource organization
2943 team_id: Optional[str] = Field(None, description="Team ID this gateway belongs to")
2944 owner_email: Optional[str] = Field(None, description="Email of the gateway owner")
2945 visibility: Optional[str] = Field(None, description="Gateway visibility: private, team, or public")
2947 # Per-gateway refresh configuration
2948 refresh_interval_seconds: Optional[int] = Field(None, ge=60, description="Per-gateway refresh interval in seconds (minimum 60); uses global default if not set")
2950 # Gateway mode configuration
2951 gateway_mode: Optional[str] = Field(None, description="Gateway mode: 'cache' (database caching, default) or 'direct_proxy' (pass-through mode with no caching)", pattern="^(cache|direct_proxy)$")
2953 @field_validator("tags")
2954 @classmethod
2955 def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
2956 """Validate and normalize tags.
2958 Args:
2959 v: Optional list of tag strings to validate
2961 Returns:
2962 List of validated tag strings
2963 """
2964 return validate_tags_field(v)
2966 @field_validator("name", mode="before")
2967 @classmethod
2968 def validate_name(cls, v: str) -> str:
2969 """Validate gateway name
2971 Args:
2972 v (str): Value to validate
2974 Returns:
2975 str: Value if validated as safe
2976 """
2977 return SecurityValidator.validate_name(v, "Gateway name")
2979 @field_validator("url", mode="before")
2980 @classmethod
2981 def validate_url(cls, v: str) -> str:
2982 """Validate gateway URL
2984 Args:
2985 v (str): Value to validate
2987 Returns:
2988 str: Value if validated as safe
2989 """
2990 return validate_core_url(v, "Gateway URL")
2992 @field_validator("description", mode="before")
2993 @classmethod
2994 def validate_description(cls, v: Optional[str]) -> Optional[str]:
2995 """Ensure descriptions display safely, truncate if too long
2997 Args:
2998 v (str): Value to validate
3000 Returns:
3001 str: Value if validated as safe and truncated if too long
3003 Raises:
3004 ValueError: When value is unsafe
3006 Examples:
3007 >>> from mcpgateway.schemas import GatewayUpdate
3008 >>> GatewayUpdate.validate_description('A safe description')
3009 'A safe description'
3010 >>> GatewayUpdate.validate_description(None) # Test None case
3011 >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
3012 >>> truncated = ToolCreate.validate_description(long_desc)
3013 >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
3014 0
3015 >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
3016 True
3017 """
3018 if v is None:
3019 return v
3020 if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
3021 # Truncate the description to the maximum allowed length
3022 truncated = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
3023 logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
3024 return SecurityValidator.sanitize_display_text(truncated, "Description")
3025 return SecurityValidator.sanitize_display_text(v, "Description")
3027 @field_validator("auth_value", mode="before")
3028 @classmethod
3029 def create_auth_value(cls, v, info):
3030 """
3031 This validator will run before the model is fully instantiated (mode="before")
3032 It will process the auth fields based on auth_type and generate auth_value.
3034 Args:
3035 v: Input URL
3036 info: ValidationInfo containing auth_type
3038 Returns:
3039 str: Auth value or URL
3040 """
3041 data = info.data
3042 auth_type = data.get("auth_type")
3044 if (auth_type is None) or (auth_type == ""):
3045 return v # If no auth_type is provided, no need to create auth_value
3047 # Process the auth fields and generate auth_value based on auth_type
3048 auth_value = cls._process_auth_fields(info)
3049 return auth_value
3051 @staticmethod
3052 def _process_auth_fields(info: ValidationInfo) -> Optional[str]:
3053 """
3054 Processes the input authentication fields and returns the correct auth_value.
3055 This method is called based on the selected auth_type.
3057 Args:
3058 info: ValidationInfo containing auth fields
3060 Returns:
3061 Encoded auth string or None
3063 Raises:
3064 ValueError: If auth type is invalid
3065 """
3067 data = info.data
3068 auth_type = data.get("auth_type")
3070 if auth_type == "basic":
3071 # For basic authentication, both username and password must be present
3072 username = data.get("auth_username")
3073 password = data.get("auth_password")
3074 if not username or not password:
3075 raise ValueError("For 'basic' auth, both 'auth_username' and 'auth_password' must be provided.")
3077 creds = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode()
3078 return encode_auth({"Authorization": f"Basic {creds}"})
3080 if auth_type == "bearer":
3081 # For bearer authentication, only token is required
3082 token = data.get("auth_token")
3084 if not token:
3085 raise ValueError("For 'bearer' auth, 'auth_token' must be provided.")
3087 return encode_auth({"Authorization": f"Bearer {token}"})
3089 if auth_type == "oauth":
3090 # For OAuth authentication, we don't encode anything here
3091 # The OAuth configuration is handled separately in the oauth_config field
3092 # This method is only called for traditional auth types
3093 return None
3095 if auth_type == "authheaders":
3096 # Support both new multi-headers format and legacy single header format
3097 auth_headers = data.get("auth_headers")
3098 if auth_headers and isinstance(auth_headers, list):
3099 # New multi-headers format with enhanced validation
3100 header_dict = {}
3101 duplicate_keys = set()
3103 for header in auth_headers:
3104 if not isinstance(header, dict):
3105 continue
3107 key = header.get("key")
3108 value = header.get("value", "")
3110 # Skip headers without keys
3111 if not key:
3112 continue
3114 # Track duplicate keys (last value wins)
3115 if key in header_dict:
3116 duplicate_keys.add(key)
3118 # Validate header key format (basic HTTP header validation)
3119 if not all(c.isalnum() or c in "-_" for c in key.replace(" ", "")):
3120 raise ValueError(f"Invalid header key format: '{key}'. Header keys should contain only alphanumeric characters, hyphens, and underscores.")
3122 # Store header (empty values are allowed)
3123 header_dict[key] = value
3125 # Ensure at least one valid header
3126 if not header_dict:
3127 raise ValueError("For 'authheaders' auth, at least one valid header with a key must be provided.")
3129 # Warn about duplicate keys (optional - could log this instead)
3130 if duplicate_keys:
3131 logger.warning(f"Duplicate header keys detected (last value used): {', '.join(duplicate_keys)}")
3133 # Check for excessive headers (prevent abuse)
3134 if len(header_dict) > 100:
3135 raise ValueError("Maximum of 100 headers allowed per gateway.")
3137 return encode_auth(header_dict)
3139 # Legacy single header format (backward compatibility)
3140 header_key = data.get("auth_header_key")
3141 header_value = data.get("auth_header_value")
3143 if not header_key or not header_value:
3144 raise ValueError("For 'authheaders' auth, either 'auth_headers' list or both 'auth_header_key' and 'auth_header_value' must be provided.")
3146 return encode_auth({header_key: header_value})
3148 if auth_type == "one_time_auth":
3149 return None # No auth_value needed for one-time auth
3151 if auth_type == "query_param":
3152 # Query param auth doesn't use auth_value field
3153 # Validation is handled by model_validator
3154 return None
3156 raise ValueError("Invalid 'auth_type'. Must be one of: basic, bearer, oauth, authheaders, or query_param.")
3158 @model_validator(mode="after")
3159 def validate_query_param_auth(self) -> "GatewayUpdate":
3160 """Validate query parameter authentication configuration.
3162 NOTE: This only runs when auth_type is explicitly set to "query_param".
3163 Service-layer enforcement in update_gateway() handles the case where
3164 auth_type is omitted but the existing gateway uses query_param auth.
3166 Returns:
3167 GatewayUpdate: The validated instance.
3169 Raises:
3170 ValueError: If required fields are missing when setting query_param auth.
3171 """
3172 if self.auth_type == "query_param":
3173 # Validate fields are provided when explicitly setting query_param auth
3174 # Feature flag/allowlist check happens in service layer (has access to existing gateway)
3175 if not self.auth_query_param_key:
3176 raise ValueError("auth_query_param_key is required when setting auth_type to 'query_param'")
3177 if not self.auth_query_param_value:
3178 raise ValueError("auth_query_param_value is required when setting auth_type to 'query_param'")
3180 return self
3183# ---------------------------------------------------------------------------
3184# OAuth config masking helper (used by GatewayRead.masked / A2AAgentRead.masked)
3185# ---------------------------------------------------------------------------
3186_SENSITIVE_OAUTH_KEYS = OAUTH_SENSITIVE_KEYS
3189def _mask_oauth_config(oauth_config: Any) -> Any:
3190 """Recursively mask sensitive keys inside an ``oauth_config`` dict.
3192 Args:
3193 oauth_config: The oauth_config value to mask (dict, list, or scalar).
3195 Returns:
3196 The masked copy with sensitive values replaced.
3197 """
3198 if isinstance(oauth_config, dict):
3199 out: Dict[str, Any] = {}
3200 for k, v in oauth_config.items():
3201 if isinstance(k, str) and k.lower() in _SENSITIVE_OAUTH_KEYS:
3202 out[k] = settings.masked_auth_value if v else v
3203 else:
3204 out[k] = _mask_oauth_config(v)
3205 return out
3206 if isinstance(oauth_config, list):
3207 return [_mask_oauth_config(x) for x in oauth_config]
3208 return oauth_config
3211class GatewayRead(BaseModelWithConfigDict):
3212 """Schema for reading gateway information.
3214 Includes all gateway fields plus:
3215 - Database ID
3216 - Capabilities dictionary
3217 - Creation/update timestamps
3218 - enabled status
3219 - reachable status
3220 - Last seen timestamp
3221 - Authentication type: basic, bearer, authheaders, oauth
3222 - Authentication value: username/password or token or custom headers
3223 - OAuth configuration for OAuth 2.0 authentication
3225 Auto Populated fields:
3226 - Authentication username: for basic auth
3227 - Authentication password: for basic auth
3228 - Authentication token: for bearer auth
3229 - Authentication header key: for authheaders auth
3230 - Authentication header value: for authheaders auth
3231 """
3233 id: Optional[str] = Field(None, description="Unique ID of the gateway")
3234 name: str = Field(..., description="Unique name for the gateway")
3235 url: str = Field(..., description="Gateway endpoint URL")
3236 description: Optional[str] = Field(None, description="Gateway description")
3237 transport: str = Field(default="SSE", description="Transport used by MCP server: SSE or STREAMABLEHTTP")
3238 capabilities: Dict[str, Any] = Field(default_factory=dict, description="Gateway capabilities")
3239 created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), description="Creation timestamp")
3240 updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), description="Last update timestamp")
3241 enabled: bool = Field(default=True, description="Is the gateway enabled?")
3242 reachable: bool = Field(default=True, description="Is the gateway reachable/online?")
3244 last_seen: Optional[datetime] = Field(default_factory=lambda: datetime.now(timezone.utc), description="Last seen timestamp")
3246 passthrough_headers: Optional[List[str]] = Field(default=None, description="List of headers allowed to be passed through from client to target")
3247 # Authorizations
3248 auth_type: Optional[str] = Field(None, description="auth_type: basic, bearer, authheaders, oauth, query_param, or None")
3249 auth_value: Optional[str] = Field(None, description="auth value: username/password or token or custom headers")
3250 auth_headers: Optional[List[Dict[str, str]]] = Field(default=None, description="List of custom headers for authentication")
3251 auth_headers_unmasked: Optional[List[Dict[str, str]]] = Field(default=None, description="Unmasked custom headers for administrative views")
3253 # OAuth 2.0 configuration
3254 oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration including grant_type, client_id, encrypted client_secret, URLs, and scopes")
3256 # Query Parameter Authentication (masked for security)
3257 auth_query_param_key: Optional[str] = Field(
3258 None,
3259 description="Query parameter name for authentication",
3260 )
3261 auth_query_param_value_masked: Optional[str] = Field(
3262 None,
3263 description="Masked indicator if query param auth is configured",
3264 )
3266 # auth_value will populate the following fields
3267 auth_username: Optional[str] = Field(None, description="username for basic authentication")
3268 auth_password: Optional[str] = Field(None, description="password for basic authentication")
3269 auth_token: Optional[str] = Field(None, description="token for bearer authentication")
3270 auth_header_key: Optional[str] = Field(None, description="key for custom headers authentication")
3271 auth_header_value: Optional[str] = Field(None, description="vallue for custom headers authentication")
3272 tags: List[Dict[str, str]] = Field(default_factory=list, description="Tags for categorizing the gateway")
3274 auth_password_unmasked: Optional[str] = Field(default=None, description="Unmasked password for basic authentication")
3275 auth_token_unmasked: Optional[str] = Field(default=None, description="Unmasked bearer token for authentication")
3276 auth_header_value_unmasked: Optional[str] = Field(default=None, description="Unmasked single custom header value")
3278 # Team scoping fields for resource organization
3279 team_id: Optional[str] = Field(None, description="Team ID this gateway belongs to")
3280 team: Optional[str] = Field(None, description="Name of the team that owns this resource")
3281 owner_email: Optional[str] = Field(None, description="Email of the gateway owner")
3282 visibility: Optional[str] = Field(default="public", description="Gateway visibility: private, team, or public")
3284 # Comprehensive metadata for audit tracking
3285 created_by: Optional[str] = Field(None, description="Username who created this entity")
3286 created_from_ip: Optional[str] = Field(None, description="IP address of creator")
3287 created_via: Optional[str] = Field(None, description="Creation method: ui|api|import|federation")
3288 created_user_agent: Optional[str] = Field(None, description="User agent of creation request")
3290 modified_by: Optional[str] = Field(None, description="Username who last modified this entity")
3291 modified_from_ip: Optional[str] = Field(None, description="IP address of last modifier")
3292 modified_via: Optional[str] = Field(None, description="Modification method")
3293 modified_user_agent: Optional[str] = Field(None, description="User agent of modification request")
3295 import_batch_id: Optional[str] = Field(None, description="UUID of bulk import batch")
3296 federation_source: Optional[str] = Field(None, description="Source gateway for federated entities")
3297 version: Optional[int] = Field(1, description="Entity version for change tracking")
3299 slug: Optional[str] = Field(None, description="Slug for gateway endpoint URL")
3301 # Per-gateway refresh configuration
3302 refresh_interval_seconds: Optional[int] = Field(None, description="Per-gateway refresh interval in seconds")
3303 last_refresh_at: Optional[datetime] = Field(None, description="Timestamp of last successful refresh")
3305 # Gateway mode configuration
3306 gateway_mode: str = Field(default="cache", description="Gateway mode: 'cache' (database caching, default) or 'direct_proxy' (pass-through mode with no caching)")
3308 @model_validator(mode="before")
3309 @classmethod
3310 def _mask_query_param_auth(cls, data: Any) -> Any:
3311 """Mask query param auth value when constructing from DB model.
3313 This extracts auth_query_params from the raw data (DB model or dict)
3314 and populates the masked fields for display.
3316 Args:
3317 data: The raw data (dict or ORM model) to process.
3319 Returns:
3320 Any: The processed data with masked query param values.
3321 """
3322 # Handle dict input
3323 if isinstance(data, dict):
3324 auth_query_params = data.get("auth_query_params")
3325 if auth_query_params and isinstance(auth_query_params, dict):
3326 # Extract the param key name and set masked value
3327 first_key = next(iter(auth_query_params.keys()), None)
3328 if first_key:
3329 data["auth_query_param_key"] = first_key
3330 data["auth_query_param_value_masked"] = settings.masked_auth_value
3331 # Handle ORM model input (has auth_query_params attribute)
3332 elif hasattr(data, "auth_query_params"):
3333 auth_query_params = getattr(data, "auth_query_params", None)
3334 if auth_query_params and isinstance(auth_query_params, dict):
3335 # Convert ORM to dict for modification, preserving all attributes
3336 # Start with table columns
3337 data_dict = {c.name: getattr(data, c.name) for c in data.__table__.columns}
3338 # Preserve dynamically added attributes like 'team' (from relationships)
3339 for attr in ["team"]:
3340 if hasattr(data, attr):
3341 data_dict[attr] = getattr(data, attr)
3342 first_key = next(iter(auth_query_params.keys()), None)
3343 if first_key:
3344 data_dict["auth_query_param_key"] = first_key
3345 data_dict["auth_query_param_value_masked"] = settings.masked_auth_value
3346 return data_dict
3347 return data
3349 # This will be the main method to automatically populate fields
3350 @model_validator(mode="after")
3351 def _populate_auth(self) -> Self:
3352 """Populate authentication fields based on auth_type and encoded auth_value.
3354 This post-validation method decodes the stored authentication value and
3355 populates the appropriate authentication fields (username/password, token,
3356 or custom headers) based on the authentication type. It ensures the
3357 authentication data is properly formatted and accessible through individual
3358 fields for display purposes.
3360 The method handles three authentication types:
3361 - basic: Extracts username and password from Authorization header
3362 - bearer: Extracts token from Bearer Authorization header
3363 - authheaders: Extracts custom header key/value pair
3365 Returns:
3366 Self: The instance with populated authentication fields:
3367 - For basic: auth_username and auth_password
3368 - For bearer: auth_token
3369 - For authheaders: auth_header_key and auth_header_value
3371 Raises:
3372 ValueError: If the authentication data is malformed:
3373 - Basic auth missing username or password
3374 - Bearer auth missing or improperly formatted Authorization header
3375 - Custom headers not exactly one key/value pair
3377 Examples:
3378 >>> # Basic auth example
3379 >>> string_bytes = "admin:secret".encode("utf-8")
3380 >>> encoded_auth = base64.urlsafe_b64encode(string_bytes).decode("utf-8")
3381 >>> values = GatewayRead.model_construct(
3382 ... auth_type="basic",
3383 ... auth_value=encode_auth({"Authorization": f"Basic {encoded_auth}"})
3384 ... )
3385 >>> values = GatewayRead._populate_auth(values)
3386 >>> values.auth_username
3387 'admin'
3388 >>> values.auth_password
3389 'secret'
3391 >>> # Bearer auth example
3392 >>> values = GatewayRead.model_construct(
3393 ... auth_type="bearer",
3394 ... auth_value=encode_auth({"Authorization": "Bearer mytoken123"})
3395 ... )
3396 >>> values = GatewayRead._populate_auth(values)
3397 >>> values.auth_token
3398 'mytoken123'
3400 >>> # Custom headers example
3401 >>> values = GatewayRead.model_construct(
3402 ... auth_type='authheaders',
3403 ... auth_value=encode_auth({"X-API-Key": "abc123"})
3404 ... )
3405 >>> values = GatewayRead._populate_auth(values)
3406 >>> values.auth_header_key
3407 'X-API-Key'
3408 >>> values.auth_header_value
3409 'abc123'
3410 """
3411 auth_type = self.auth_type
3412 auth_value_encoded = self.auth_value
3414 # Skip validation logic if masked value
3415 if auth_value_encoded == settings.masked_auth_value:
3416 return self
3418 # Handle OAuth authentication (no auth_value to decode)
3419 if auth_type == "oauth":
3420 # OAuth gateways don't have traditional auth_value to decode
3421 # They use oauth_config instead
3422 return self
3424 if auth_type == "one_time_auth":
3425 # One-time auth gateways don't store auth_value
3426 return self
3428 if auth_type == "query_param":
3429 # Query param auth is handled by the before validator
3430 # (auth_query_params from DB model is processed there)
3431 return self
3433 # If no encoded value is present, nothing to populate
3434 if not auth_value_encoded:
3435 return self
3437 auth_value = decode_auth(auth_value_encoded)
3438 if auth_type == "basic":
3439 auth = auth_value.get("Authorization")
3440 if not (isinstance(auth, str) and auth.startswith("Basic ")):
3441 raise ValueError("basic auth requires an Authorization header of the form 'Basic <base64>'")
3442 auth = auth.removeprefix("Basic ")
3443 u, p = base64.urlsafe_b64decode(auth).decode("utf-8").split(":")
3444 if not u or not p:
3445 raise ValueError("basic auth requires both username and password")
3446 self.auth_username, self.auth_password = u, p
3447 self.auth_password_unmasked = p
3449 elif auth_type == "bearer":
3450 auth = auth_value.get("Authorization")
3451 if not (isinstance(auth, str) and auth.startswith("Bearer ")):
3452 raise ValueError("bearer auth requires an Authorization header of the form 'Bearer <token>'")
3453 self.auth_token = auth.removeprefix("Bearer ")
3454 self.auth_token_unmasked = self.auth_token
3456 elif auth_type == "authheaders":
3457 # For backward compatibility, populate first header in key/value fields
3458 if not isinstance(auth_value, dict) or len(auth_value) == 0:
3459 raise ValueError("authheaders requires at least one key/value pair")
3460 self.auth_headers = [{"key": str(key), "value": "" if value is None else str(value)} for key, value in auth_value.items()]
3461 self.auth_headers_unmasked = [{"key": str(key), "value": "" if value is None else str(value)} for key, value in auth_value.items()]
3462 k, v = next(iter(auth_value.items()))
3463 self.auth_header_key, self.auth_header_value = k, v
3464 self.auth_header_value_unmasked = v
3466 return self
3468 def masked(self) -> "GatewayRead":
3469 """
3470 Return a masked version of the model instance with sensitive authentication fields hidden.
3472 This method creates a dictionary representation of the model data and replaces sensitive fields
3473 such as `auth_value`, `auth_password`, `auth_token`, and `auth_header_value` with a masked
3474 placeholder value defined in `settings.masked_auth_value`. Masking is only applied if the fields
3475 are present and not already masked.
3477 Args:
3478 None
3480 Returns:
3481 GatewayRead: A new instance of the GatewayRead model with sensitive authentication-related fields
3482 masked to prevent exposure of sensitive information.
3484 Notes:
3485 - The `auth_value` field is only masked if it exists and its value is different from the masking
3486 placeholder.
3487 - Other sensitive fields (`auth_password`, `auth_token`, `auth_header_value`) are masked if present.
3488 - Fields not related to authentication remain unmodified.
3489 """
3490 masked_data = self.model_dump()
3492 # Only mask if auth_value is present and not already masked
3493 if masked_data.get("auth_value") and masked_data["auth_value"] != settings.masked_auth_value:
3494 masked_data["auth_value"] = settings.masked_auth_value
3496 masked_data["auth_password"] = settings.masked_auth_value if masked_data.get("auth_password") else None
3497 masked_data["auth_token"] = settings.masked_auth_value if masked_data.get("auth_token") else None
3498 masked_data["auth_header_value"] = settings.masked_auth_value if masked_data.get("auth_header_value") else None
3499 if masked_data.get("auth_headers"):
3500 masked_data["auth_headers"] = [
3501 {
3502 "key": header.get("key"),
3503 "value": settings.masked_auth_value if header.get("value") else header.get("value"),
3504 }
3505 for header in masked_data["auth_headers"]
3506 ]
3508 # Mask sensitive keys inside oauth_config (e.g. password, client_secret)
3509 if masked_data.get("oauth_config"):
3510 masked_data["oauth_config"] = _mask_oauth_config(masked_data["oauth_config"])
3512 # SECURITY: Never expose unmasked credentials in API responses
3513 masked_data["auth_password_unmasked"] = None
3514 masked_data["auth_token_unmasked"] = None
3515 masked_data["auth_header_value_unmasked"] = None
3516 masked_data["auth_headers_unmasked"] = None
3517 return GatewayRead.model_validate(masked_data)
3520class GatewayRefreshResponse(BaseModelWithConfigDict):
3521 """Response schema for manual gateway refresh API.
3523 Contains counts of added, updated, and removed items for tools, resources, and prompts,
3524 along with any validation errors encountered during the refresh operation.
3525 """
3527 gateway_id: str = Field(..., description="ID of the refreshed gateway")
3528 success: bool = Field(default=True, description="Whether the refresh operation was successful")
3529 error: Optional[str] = Field(None, description="Error message if the refresh failed")
3530 tools_added: int = Field(default=0, description="Number of tools added")
3531 tools_updated: int = Field(default=0, description="Number of tools updated")
3532 tools_removed: int = Field(default=0, description="Number of tools removed")
3533 resources_added: int = Field(default=0, description="Number of resources added")
3534 resources_updated: int = Field(default=0, description="Number of resources updated")
3535 resources_removed: int = Field(default=0, description="Number of resources removed")
3536 prompts_added: int = Field(default=0, description="Number of prompts added")
3537 prompts_updated: int = Field(default=0, description="Number of prompts updated")
3538 prompts_removed: int = Field(default=0, description="Number of prompts removed")
3539 validation_errors: List[str] = Field(default_factory=list, description="List of validation errors encountered")
3540 duration_ms: float = Field(..., description="Duration of the refresh operation in milliseconds")
3541 refreshed_at: datetime = Field(..., description="Timestamp when the refresh completed")
3544class FederatedTool(BaseModelWithConfigDict):
3545 """Schema for tools provided by federated gateways.
3547 Contains:
3548 - Tool definition
3549 - Source gateway information
3550 """
3552 tool: MCPTool
3553 gateway_id: str
3554 gateway_name: str
3555 gateway_url: str
3558class FederatedResource(BaseModelWithConfigDict):
3559 """Schema for resources from federated gateways.
3561 Contains:
3562 - Resource definition
3563 - Source gateway information
3564 """
3566 resource: MCPResource
3567 gateway_id: str
3568 gateway_name: str
3569 gateway_url: str
3572class FederatedPrompt(BaseModelWithConfigDict):
3573 """Schema for prompts from federated gateways.
3575 Contains:
3576 - Prompt definition
3577 - Source gateway information
3578 """
3580 prompt: MCPPrompt
3581 gateway_id: str
3582 gateway_name: str
3583 gateway_url: str
3586# --- RPC Schemas ---
3587class RPCRequest(BaseModel):
3588 """MCP-compliant RPC request validation"""
3590 model_config = ConfigDict(hide_input_in_errors=True)
3592 jsonrpc: Literal["2.0"]
3593 method: str
3594 params: Optional[Dict[str, Any]] = None
3595 id: Optional[Union[int, str]] = None
3597 @field_validator("method")
3598 @classmethod
3599 def validate_method(cls, v: str) -> str:
3600 """Ensure method names follow MCP format
3602 Args:
3603 v (str): Value to validate
3605 Returns:
3606 str: Value if determined as safe
3608 Raises:
3609 ValueError: When value is not safe
3610 """
3611 SecurityValidator.validate_no_xss(v, "RPC method name")
3612 # Runtime pattern matching (not precompiled to allow test monkeypatching)
3613 if not re.match(settings.validation_tool_method_pattern, v):
3614 raise ValueError("Invalid method name format")
3615 if len(v) > settings.validation_max_method_length:
3616 raise ValueError("Method name too long")
3617 return v
3619 @field_validator("params")
3620 @classmethod
3621 def validate_params(cls, v: Optional[Union[Dict, List]]) -> Optional[Union[Dict, List]]:
3622 """Validate RPC parameters
3624 Args:
3625 v (Union[dict, list]): Value to validate
3627 Returns:
3628 Union[dict, list]: Value if determined as safe
3630 Raises:
3631 ValueError: When value is not safe
3632 """
3633 if v is None:
3634 return v
3636 # Check size limits (MCP recommends max 256KB for params)
3637 param_size = len(orjson.dumps(v))
3638 if param_size > settings.validation_max_rpc_param_size:
3639 raise ValueError(f"Parameters exceed maximum size of {settings.validation_max_rpc_param_size} bytes")
3641 # Check depth
3642 SecurityValidator.validate_json_depth(v)
3643 return v
3646class RPCResponse(BaseModelWithConfigDict):
3647 """Schema for JSON-RPC 2.0 responses.
3649 Contains:
3650 - Protocol version
3651 - Result or error
3652 - Request ID
3653 """
3655 jsonrpc: Literal["2.0"]
3656 result: Optional[Any] = None
3657 error: Optional[Dict[str, Any]] = None
3658 id: Optional[Union[int, str]] = None
3661# --- Event and Admin Schemas ---
3664class EventMessage(BaseModelWithConfigDict):
3665 """Schema for SSE event messages.
3667 Includes:
3668 - Event type
3669 - Event data payload
3670 - Event timestamp
3671 """
3673 type: str = Field(..., description="Event type (tool_added, resource_updated, etc)")
3674 data: Dict[str, Any] = Field(..., description="Event payload")
3675 timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
3677 @field_serializer("timestamp")
3678 def serialize_timestamp(self, dt: datetime) -> str:
3679 """
3680 Serialize the `timestamp` field as an ISO 8601 string with UTC timezone.
3682 Converts the given datetime to UTC and returns it in ISO 8601 format,
3683 replacing the "+00:00" suffix with "Z" to indicate UTC explicitly.
3685 Args:
3686 dt (datetime): The datetime object to serialize.
3688 Returns:
3689 str: ISO 8601 formatted string in UTC, ending with 'Z'.
3690 """
3691 return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
3694class AdminToolCreate(BaseModelWithConfigDict):
3695 """Schema for creating tools via admin UI.
3697 Handles:
3698 - Basic tool information
3699 - JSON string inputs for headers/schema
3700 """
3702 name: str
3703 url: str
3704 description: Optional[str] = None
3705 integration_type: str = "MCP"
3706 headers: Optional[str] = None # JSON string
3707 input_schema: Optional[str] = None # JSON string
3709 @field_validator("headers", "input_schema")
3710 @classmethod
3711 def validate_json(cls, v: Optional[str]) -> Optional[Dict[str, Any]]:
3712 """
3713 Validate and parse JSON string inputs.
3715 Args:
3716 v: Input string
3718 Returns:
3719 dict: Output JSON version of v
3721 Raises:
3722 ValueError: When unable to convert to JSON
3723 """
3724 if not v:
3725 return None
3726 try:
3727 return orjson.loads(v)
3728 except orjson.JSONDecodeError:
3729 raise ValueError("Invalid JSON")
3732class AdminGatewayCreate(BaseModelWithConfigDict):
3733 """Schema for creating gateways via admin UI.
3735 Captures:
3736 - Gateway name
3737 - Endpoint URL
3738 - Optional description
3739 """
3741 name: str
3742 url: str
3743 description: Optional[str] = None
3746# --- New Schemas for Status Toggle Operations ---
3749class StatusToggleRequest(BaseModelWithConfigDict):
3750 """Request schema for toggling active status."""
3752 activate: bool = Field(..., description="Whether to activate (true) or deactivate (false) the item")
3755class StatusToggleResponse(BaseModelWithConfigDict):
3756 """Response schema for status toggle operations."""
3758 id: int
3759 name: str
3760 is_active: bool
3761 message: str = Field(..., description="Success message")
3764# --- Optional Filter Parameters for Listing Operations ---
3767class ListFilters(BaseModelWithConfigDict):
3768 """Filtering options for list operations."""
3770 include_inactive: bool = Field(False, description="Whether to include inactive items in the results")
3773# --- Server Schemas ---
3776class ServerCreate(BaseModel):
3777 """
3778 Schema for creating a new server.
3780 Attributes:
3781 model_config (ConfigDict): Configuration for the model, such as stripping whitespace from strings.
3782 name (str): The server's name.
3783 description (Optional[str]): Optional description of the server.
3784 icon (Optional[str]): Optional URL for the server's icon.
3785 associated_tools (Optional[List[str]]): Optional list of associated tool IDs.
3786 associated_resources (Optional[List[str]]): Optional list of associated resource IDs.
3787 associated_prompts (Optional[List[str]]): Optional list of associated prompt IDs.
3788 """
3790 model_config = ConfigDict(str_strip_whitespace=True)
3792 id: Optional[str] = Field(None, description="Custom UUID for the server (if not provided, one will be generated)")
3793 name: str = Field(..., description="The server's name")
3794 description: Optional[str] = Field(None, description="Server description")
3795 icon: Optional[str] = Field(None, description="URL for the server's icon")
3796 tags: Optional[List[str]] = Field(default_factory=list, description="Tags for categorizing the server")
3798 @field_validator("tags")
3799 @classmethod
3800 def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
3801 """Validate and normalize tags.
3803 Args:
3804 v: Optional list of tag strings to validate
3806 Returns:
3807 List of validated tag strings
3808 """
3809 return validate_tags_field(v)
3811 @field_validator("id")
3812 @classmethod
3813 def validate_id(cls, v: Optional[str]) -> Optional[str]:
3814 """Validate server ID/UUID format
3816 Args:
3817 v (str): Value to validate
3819 Returns:
3820 str: Value if validated as safe
3822 Raises:
3823 ValueError: When displayName contains unsafe content or exceeds length limits
3825 Examples:
3826 >>> from mcpgateway.schemas import ServerCreate
3827 >>> ServerCreate.validate_id('550e8400-e29b-41d4-a716-446655440000')
3828 '550e8400e29b41d4a716446655440000'
3829 >>> ServerCreate.validate_id('invalid-uuid')
3830 Traceback (most recent call last):
3831 ...
3832 ValueError: ...
3833 """
3834 if v is None:
3835 return v
3836 return SecurityValidator.validate_uuid(v, "Server ID")
3838 associated_tools: Optional[List[str]] = Field(None, description="Comma-separated tool IDs")
3839 associated_resources: Optional[List[str]] = Field(None, description="Comma-separated resource IDs")
3840 associated_prompts: Optional[List[str]] = Field(None, description="Comma-separated prompt IDs")
3841 associated_a2a_agents: Optional[List[str]] = Field(None, description="Comma-separated A2A agent IDs")
3843 # Team scoping fields
3844 team_id: Optional[str] = Field(None, description="Team ID for resource organization")
3845 owner_email: Optional[str] = Field(None, description="Email of the server owner")
3846 visibility: Optional[str] = Field(default="public", description="Visibility level (private, team, public)")
3848 # OAuth 2.0 configuration for RFC 9728 Protected Resource Metadata
3849 oauth_enabled: bool = Field(False, description="Enable OAuth 2.0 for MCP client authentication")
3850 oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration (authorization_server, scopes_supported, etc.)")
3852 @field_validator("name")
3853 @classmethod
3854 def validate_name(cls, v: str) -> str:
3855 """Validate server name
3857 Args:
3858 v (str): Value to validate
3860 Returns:
3861 str: Value if validated as safe
3862 """
3863 return SecurityValidator.validate_name(v, "Server name")
3865 @field_validator("description")
3866 @classmethod
3867 def validate_description(cls, v: Optional[str]) -> Optional[str]:
3868 """Ensure descriptions display safely, truncate if too long
3870 Args:
3871 v (str): Value to validate
3873 Returns:
3874 str: Value if validated as safe and truncated if too long
3876 Raises:
3877 ValueError: When value is unsafe
3879 Examples:
3880 >>> from mcpgateway.schemas import ServerCreate
3881 >>> ServerCreate.validate_description('A safe description')
3882 'A safe description'
3883 >>> ServerCreate.validate_description(None) # Test None case
3884 >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
3885 >>> truncated = ServerCreate.validate_description(long_desc)
3886 >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
3887 0
3888 >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
3889 True
3890 """
3891 if v is None:
3892 return v
3893 if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
3894 # Truncate the description to the maximum allowed length
3895 truncated = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
3896 logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
3897 return SecurityValidator.sanitize_display_text(truncated, "Description")
3898 return SecurityValidator.sanitize_display_text(v, "Description")
3900 @field_validator("icon")
3901 @classmethod
3902 def validate_icon(cls, v: Optional[str]) -> Optional[str]:
3903 """Validate icon URL
3905 Args:
3906 v (str): Value to validate
3908 Returns:
3909 str: Value if validated as safe
3910 """
3911 if v is None or v == "":
3912 return v
3913 return validate_core_url(v, "Icon URL")
3915 @field_validator("associated_tools", "associated_resources", "associated_prompts", "associated_a2a_agents", mode="before")
3916 @classmethod
3917 def split_comma_separated(cls, v):
3918 """
3919 Splits a comma-separated string into a list of strings if needed.
3921 Args:
3922 v: Input string
3924 Returns:
3925 list: Comma separated array of input string
3926 """
3927 if isinstance(v, str):
3928 return [item.strip() for item in v.split(",") if item.strip()]
3929 return v
3931 @field_validator("visibility")
3932 @classmethod
3933 def validate_visibility(cls, v: str) -> str:
3934 """Validate visibility level.
3936 Args:
3937 v: Visibility value to validate
3939 Returns:
3940 Validated visibility value
3942 Raises:
3943 ValueError: If visibility is invalid
3944 """
3945 if v not in ["private", "team", "public"]:
3946 raise ValueError("Visibility must be one of: private, team, public")
3947 return v
3949 @field_validator("team_id")
3950 @classmethod
3951 def validate_team_id(cls, v: Optional[str]) -> Optional[str]:
3952 """Validate team ID format.
3954 Args:
3955 v: Team ID to validate
3957 Returns:
3958 Validated team ID
3959 """
3960 if v is not None:
3961 return SecurityValidator.validate_uuid(v, "team_id")
3962 return v
3965class ServerUpdate(BaseModelWithConfigDict):
3966 """Schema for updating an existing server.
3968 All fields are optional to allow partial updates.
3969 """
3971 id: Optional[str] = Field(None, description="Custom UUID for the server")
3972 name: Optional[str] = Field(None, description="The server's name")
3973 description: Optional[str] = Field(None, description="Server description")
3974 icon: Optional[str] = Field(None, description="URL for the server's icon")
3975 tags: Optional[List[str]] = Field(None, description="Tags for categorizing the server")
3977 # Team scoping fields
3978 team_id: Optional[str] = Field(None, description="Team ID for resource organization")
3979 owner_email: Optional[str] = Field(None, description="Email of the server owner")
3980 visibility: Optional[str] = Field(None, description="Visibility level (private, team, public)")
3982 # OAuth 2.0 configuration for RFC 9728 Protected Resource Metadata
3983 oauth_enabled: Optional[bool] = Field(None, description="Enable OAuth 2.0 for MCP client authentication")
3984 oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration (authorization_server, scopes_supported, etc.)")
3986 @field_validator("tags")
3987 @classmethod
3988 def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
3989 """Validate and normalize tags.
3991 Args:
3992 v: Optional list of tag strings to validate
3994 Returns:
3995 List of validated tag strings
3996 """
3997 return validate_tags_field(v)
3999 @field_validator("id")
4000 @classmethod
4001 def validate_id(cls, v: Optional[str]) -> Optional[str]:
4002 """Validate server ID/UUID format
4004 Args:
4005 v (str): Value to validate
4007 Returns:
4008 str: Value if validated as safe
4010 Raises:
4011 ValueError: When displayName contains unsafe content or exceeds length limits
4013 Examples:
4014 >>> from mcpgateway.schemas import ServerUpdate
4015 >>> ServerUpdate.validate_id('550e8400-e29b-41d4-a716-446655440000')
4016 '550e8400e29b41d4a716446655440000'
4017 >>> ServerUpdate.validate_id('invalid-uuid')
4018 Traceback (most recent call last):
4019 ...
4020 ValueError: ...
4021 """
4022 if v is None:
4023 return v
4024 return SecurityValidator.validate_uuid(v, "Server ID")
4026 associated_tools: Optional[List[str]] = Field(None, description="Comma-separated tool IDs")
4027 associated_resources: Optional[List[str]] = Field(None, description="Comma-separated resource IDs")
4028 associated_prompts: Optional[List[str]] = Field(None, description="Comma-separated prompt IDs")
4029 associated_a2a_agents: Optional[List[str]] = Field(None, description="Comma-separated A2A agent IDs")
4031 @field_validator("name")
4032 @classmethod
4033 def validate_name(cls, v: str) -> str:
4034 """Validate server name
4036 Args:
4037 v (str): Value to validate
4039 Returns:
4040 str: Value if validated as safe
4041 """
4042 return SecurityValidator.validate_name(v, "Server name")
4044 @field_validator("description")
4045 @classmethod
4046 def validate_description(cls, v: Optional[str]) -> Optional[str]:
4047 """Ensure descriptions display safely, truncate if too long
4049 Args:
4050 v (str): Value to validate
4052 Returns:
4053 str: Value if validated as safe and truncated if too long
4055 Raises:
4056 ValueError: When value is unsafe
4058 Examples:
4059 >>> from mcpgateway.schemas import ServerUpdate
4060 >>> ServerUpdate.validate_description('A safe description')
4061 'A safe description'
4062 >>> ServerUpdate.validate_description(None) # Test None case
4063 >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
4064 >>> truncated = ServerUpdate.validate_description(long_desc)
4065 >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
4066 0
4067 >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
4068 True
4069 """
4070 if v is None:
4071 return v
4072 if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
4073 # Truncate the description to the maximum allowed length
4074 truncated = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
4075 logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
4076 return SecurityValidator.sanitize_display_text(truncated, "Description")
4077 return SecurityValidator.sanitize_display_text(v, "Description")
4079 @field_validator("icon")
4080 @classmethod
4081 def validate_icon(cls, v: Optional[str]) -> Optional[str]:
4082 """Validate icon URL
4084 Args:
4085 v (str): Value to validate
4087 Returns:
4088 str: Value if validated as safe
4089 """
4090 if v is None or v == "":
4091 return v
4092 return validate_core_url(v, "Icon URL")
4094 @field_validator("associated_tools", "associated_resources", "associated_prompts", "associated_a2a_agents", mode="before")
4095 @classmethod
4096 def split_comma_separated(cls, v):
4097 """
4098 Splits a comma-separated string into a list of strings if needed.
4100 Args:
4101 v: Input string
4103 Returns:
4104 list: Comma separated array of input string
4105 """
4106 if isinstance(v, str):
4107 return [item.strip() for item in v.split(",") if item.strip()]
4108 return v
4111class ServerRead(BaseModelWithConfigDict):
4112 """Schema for reading server information.
4114 Includes all server fields plus:
4115 - Database ID
4116 - Associated tool, resource, and prompt IDs
4117 - Creation/update timestamps
4118 - Active status
4119 - Metrics: Aggregated metrics for the server invocations.
4120 """
4122 id: str
4123 name: str
4124 description: Optional[str]
4125 icon: Optional[str]
4126 created_at: datetime
4127 updated_at: datetime
4128 # is_active: bool
4129 enabled: bool
4130 associated_tools: List[str] = []
4131 associated_tool_ids: List[str] = []
4132 associated_resources: List[str] = []
4133 associated_prompts: List[str] = []
4134 associated_a2a_agents: List[str] = []
4135 metrics: Optional[ServerMetrics] = Field(None, description="Server metrics (may be None in list operations)")
4136 tags: List[Dict[str, str]] = Field(default_factory=list, description="Tags for categorizing the server")
4138 # Comprehensive metadata for audit tracking
4139 created_by: Optional[str] = Field(None, description="Username who created this entity")
4140 created_from_ip: Optional[str] = Field(None, description="IP address of creator")
4141 created_via: Optional[str] = Field(None, description="Creation method: ui|api|import|federation")
4142 created_user_agent: Optional[str] = Field(None, description="User agent of creation request")
4144 modified_by: Optional[str] = Field(None, description="Username who last modified this entity")
4145 modified_from_ip: Optional[str] = Field(None, description="IP address of last modifier")
4146 modified_via: Optional[str] = Field(None, description="Modification method")
4147 modified_user_agent: Optional[str] = Field(None, description="User agent of modification request")
4149 import_batch_id: Optional[str] = Field(None, description="UUID of bulk import batch")
4150 federation_source: Optional[str] = Field(None, description="Source gateway for federated entities")
4151 version: Optional[int] = Field(1, description="Entity version for change tracking")
4153 # Team scoping fields
4154 team_id: Optional[str] = Field(None, description="ID of the team that owns this resource")
4155 team: Optional[str] = Field(None, description="Name of the team that owns this resource")
4156 owner_email: Optional[str] = Field(None, description="Email of the user who owns this resource")
4157 visibility: Optional[str] = Field(default="public", description="Visibility level: private, team, or public")
4159 # OAuth 2.0 configuration for RFC 9728 Protected Resource Metadata
4160 oauth_enabled: bool = Field(False, description="Whether OAuth 2.0 is enabled for MCP client authentication")
4161 oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration (authorization_server, scopes_supported, etc.)")
4163 @model_validator(mode="before")
4164 @classmethod
4165 def populate_associated_ids(cls, values):
4166 """
4167 Pre-validation method that converts associated objects to their 'id'.
4169 This method checks 'associated_tools', 'associated_resources', and
4170 'associated_prompts' in the input and replaces each object with its `id`
4171 if present.
4173 Args:
4174 values (dict): The input values.
4176 Returns:
4177 dict: Updated values with object ids, or the original values if no
4178 changes are made.
4179 """
4180 # Normalize to a mutable dict
4181 if isinstance(values, dict):
4182 data = dict(values)
4183 else:
4184 try:
4185 data = dict(vars(values))
4186 except Exception:
4187 return values
4189 if data.get("associated_tools"):
4190 data["associated_tools"] = [getattr(tool, "id", tool) for tool in data["associated_tools"]]
4191 if data.get("associated_resources"):
4192 data["associated_resources"] = [getattr(res, "id", res) for res in data["associated_resources"]]
4193 if data.get("associated_prompts"):
4194 data["associated_prompts"] = [getattr(prompt, "id", prompt) for prompt in data["associated_prompts"]]
4195 if data.get("associated_a2a_agents"):
4196 data["associated_a2a_agents"] = [getattr(agent, "id", agent) for agent in data["associated_a2a_agents"]]
4197 return data
4199 def masked(self) -> "ServerRead":
4200 """Return a masked model with oauth_config secrets redacted.
4202 Returns:
4203 ServerRead: Masked server model.
4204 """
4205 masked_data = self.model_dump()
4206 if masked_data.get("oauth_config"):
4207 masked_data["oauth_config"] = _mask_oauth_config(masked_data["oauth_config"])
4208 return ServerRead.model_validate(masked_data)
4211class GatewayTestRequest(BaseModelWithConfigDict):
4212 """Schema for testing gateway connectivity.
4214 Includes the HTTP method, base URL, path, optional headers, body, and content type.
4215 """
4217 method: str = Field(..., description="HTTP method to test (GET, POST, etc.)")
4218 base_url: AnyHttpUrl = Field(..., description="Base URL of the gateway to test")
4219 path: str = Field(..., description="Path to append to the base URL")
4220 headers: Optional[Dict[str, str]] = Field(None, description="Optional headers for the request")
4221 body: Optional[Union[str, Dict[str, Any]]] = Field(None, description="Optional body for the request, can be a string or JSON object")
4222 content_type: Optional[str] = Field("application/json", description="Content type for the request body")
4225class GatewayTestResponse(BaseModelWithConfigDict):
4226 """Schema for the response from a gateway test request.
4228 Contains:
4229 - HTTP status code
4230 - Latency in milliseconds
4231 - Optional response body, which can be a string or JSON object
4232 """
4234 status_code: int = Field(..., description="HTTP status code returned by the gateway")
4235 latency_ms: int = Field(..., description="Latency of the request in milliseconds")
4236 body: Optional[Union[str, Dict[str, Any]]] = Field(None, description="Response body, can be a string or JSON object")
4239class TaggedEntity(BaseModelWithConfigDict):
4240 """A simplified representation of an entity that has a tag."""
4242 id: str = Field(..., description="The entity's ID")
4243 name: str = Field(..., description="The entity's name")
4244 type: str = Field(..., description="The entity type (tool, resource, prompt, server, gateway)")
4245 description: Optional[str] = Field(None, description="The entity's description")
4248class TagStats(BaseModelWithConfigDict):
4249 """Statistics for a single tag across all entity types."""
4251 tools: int = Field(default=0, description="Number of tools with this tag")
4252 resources: int = Field(default=0, description="Number of resources with this tag")
4253 prompts: int = Field(default=0, description="Number of prompts with this tag")
4254 servers: int = Field(default=0, description="Number of servers with this tag")
4255 gateways: int = Field(default=0, description="Number of gateways with this tag")
4256 total: int = Field(default=0, description="Total occurrences of this tag")
4259class TagInfo(BaseModelWithConfigDict):
4260 """Information about a single tag."""
4262 name: str = Field(..., description="The tag name")
4263 stats: TagStats = Field(..., description="Statistics for this tag")
4264 entities: Optional[List[TaggedEntity]] = Field(default_factory=list, description="Entities that have this tag")
4267class TopPerformer(BaseModelWithConfigDict):
4268 """Schema for representing top-performing entities with performance metrics.
4270 Used to encapsulate metrics for entities such as prompts, resources, servers, or tools,
4271 including execution count, average response time, success rate, and last execution timestamp.
4273 Attributes:
4274 id (Union[str, int]): Unique identifier for the entity.
4275 name (str): Name of the entity (e.g., prompt name, resource URI, server name, or tool name).
4276 execution_count (int): Total number of executions for the entity.
4277 avg_response_time (Optional[float]): Average response time in seconds, or None if no metrics.
4278 success_rate (Optional[float]): Success rate percentage, or None if no metrics.
4279 last_execution (Optional[datetime]): Timestamp of the last execution, or None if no metrics.
4280 """
4282 id: Union[str, int] = Field(..., description="Entity ID")
4283 name: str = Field(..., description="Entity name")
4284 execution_count: int = Field(..., description="Number of executions")
4285 avg_response_time: Optional[float] = Field(None, description="Average response time in seconds")
4286 success_rate: Optional[float] = Field(None, description="Success rate percentage")
4287 last_execution: Optional[datetime] = Field(None, description="Timestamp of last execution")
4290# --- A2A Agent Schemas ---
4293class A2AAgentCreate(BaseModel):
4294 """
4295 Schema for creating a new A2A (Agent-to-Agent) compatible agent.
4297 Attributes:
4298 model_config (ConfigDict): Configuration for the model.
4299 name (str): Unique name for the agent.
4300 description (Optional[str]): Optional description of the agent.
4301 endpoint_url (str): URL endpoint for the agent.
4302 agent_type (str): Type of agent (e.g., "openai", "anthropic", "custom").
4303 protocol_version (str): A2A protocol version supported.
4304 capabilities (Dict[str, Any]): Agent capabilities and features.
4305 config (Dict[str, Any]): Agent-specific configuration parameters.
4306 auth_type (Optional[str]): Type of authentication ("api_key", "oauth", "bearer", etc.).
4307 auth_username (Optional[str]): Username for basic authentication.
4308 auth_password (Optional[str]): Password for basic authentication.
4309 auth_token (Optional[str]): Token for bearer authentication.
4310 auth_header_key (Optional[str]): Key for custom headers authentication.
4311 auth_header_value (Optional[str]): Value for custom headers authentication.
4312 auth_headers (Optional[List[Dict[str, str]]]): List of custom headers for authentication.
4313 auth_value (Optional[str]): Alias for authentication value, used for better access post-validation.
4314 tags (List[str]): Tags for categorizing the agent.
4315 team_id (Optional[str]): Team ID for resource organization.
4316 visibility (str): Visibility level ("private", "team", "public").
4317 """
4319 model_config = ConfigDict(str_strip_whitespace=True)
4321 name: str = Field(..., description="Unique name for the agent")
4322 slug: Optional[str] = Field(None, description="Optional slug for the agent (auto-generated if not provided)")
4323 description: Optional[str] = Field(None, description="Agent description")
4324 endpoint_url: str = Field(..., description="URL endpoint for the agent")
4325 agent_type: str = Field(default="generic", description="Type of agent (e.g., 'openai', 'anthropic', 'custom')")
4326 protocol_version: str = Field(default="1.0", description="A2A protocol version supported")
4327 capabilities: Dict[str, Any] = Field(default_factory=dict, description="Agent capabilities and features")
4328 config: Dict[str, Any] = Field(default_factory=dict, description="Agent-specific configuration parameters")
4329 passthrough_headers: Optional[List[str]] = Field(default=None, description="List of headers allowed to be passed through from client to target")
4330 # Authorizations
4331 auth_type: Optional[str] = Field(None, description="Type of authentication: basic, bearer, authheaders, oauth, query_param, or none")
4332 # Fields for various types of authentication
4333 auth_username: Optional[str] = Field(None, description="Username for basic authentication")
4334 auth_password: Optional[str] = Field(None, description="Password for basic authentication")
4335 auth_token: Optional[str] = Field(None, description="Token for bearer authentication")
4336 auth_header_key: Optional[str] = Field(None, description="Key for custom headers authentication")
4337 auth_header_value: Optional[str] = Field(None, description="Value for custom headers authentication")
4338 auth_headers: Optional[List[Dict[str, str]]] = Field(None, description="List of custom headers for authentication")
4340 # OAuth 2.0 configuration
4341 oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration including grant_type, client_id, encrypted client_secret, URLs, and scopes")
4343 # Query Parameter Authentication (CWE-598 security concern - use only when required by upstream)
4344 auth_query_param_key: Optional[str] = Field(
4345 None,
4346 description="Query parameter name for authentication (e.g., 'tavilyApiKey')",
4347 )
4348 auth_query_param_value: Optional[SecretStr] = Field(
4349 None,
4350 description="Query parameter value (API key) - will be encrypted at rest",
4351 )
4353 # Adding `auth_value` as an alias for better access post-validation
4354 auth_value: Optional[str] = Field(None, validate_default=True)
4355 tags: List[str] = Field(default_factory=list, description="Tags for categorizing the agent")
4357 # Team scoping fields
4358 team_id: Optional[str] = Field(None, description="Team ID for resource organization")
4359 owner_email: Optional[str] = Field(None, description="Email of the agent owner")
4360 visibility: Optional[str] = Field(default="public", description="Visibility level (private, team, public)")
4362 @field_validator("tags")
4363 @classmethod
4364 def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
4365 """Validate and normalize tags.
4367 Args:
4368 v: Optional list of tag strings to validate
4370 Returns:
4371 List of validated tag strings
4372 """
4373 return validate_tags_field(v)
4375 @field_validator("name")
4376 @classmethod
4377 def validate_name(cls, v: str) -> str:
4378 """Validate agent name
4380 Args:
4381 v (str): Value to validate
4383 Returns:
4384 str: Value if validated as safe
4385 """
4386 return SecurityValidator.validate_name(v, "A2A Agent name")
4388 @field_validator("endpoint_url")
4389 @classmethod
4390 def validate_endpoint_url(cls, v: str) -> str:
4391 """Validate agent endpoint URL
4393 Args:
4394 v (str): Value to validate
4396 Returns:
4397 str: Value if validated as safe
4398 """
4399 return validate_core_url(v, "Agent endpoint URL")
4401 @field_validator("description")
4402 @classmethod
4403 def validate_description(cls, v: Optional[str]) -> Optional[str]:
4404 """Ensure descriptions display safely, truncate if too long
4406 Args:
4407 v (str): Value to validate
4409 Returns:
4410 str: Value if validated as safe and truncated if too long
4412 Raises:
4413 ValueError: When value is unsafe
4415 Examples:
4416 >>> from mcpgateway.schemas import A2AAgentCreate
4417 >>> A2AAgentCreate.validate_description('A safe description')
4418 'A safe description'
4419 >>> A2AAgentCreate.validate_description(None) # Test None case
4420 >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
4421 >>> truncated = A2AAgentCreate.validate_description(long_desc)
4422 >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
4423 0
4424 >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
4425 True
4426 """
4427 if v is None:
4428 return v
4429 if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
4430 # Truncate the description to the maximum allowed length
4431 truncated = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
4432 logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
4433 return SecurityValidator.sanitize_display_text(truncated, "Description")
4434 return SecurityValidator.sanitize_display_text(v, "Description")
4436 @field_validator("capabilities", "config")
4437 @classmethod
4438 def validate_json_fields(cls, v: Dict[str, Any]) -> Dict[str, Any]:
4439 """Validate JSON structure depth
4441 Args:
4442 v (dict): Value to validate
4444 Returns:
4445 dict: Value if validated as safe
4446 """
4447 SecurityValidator.validate_json_depth(v)
4448 return v
4450 @field_validator("visibility")
4451 @classmethod
4452 def validate_visibility(cls, v: str) -> str:
4453 """Validate visibility level.
4455 Args:
4456 v: Visibility value to validate
4458 Returns:
4459 Validated visibility value
4461 Raises:
4462 ValueError: If visibility is invalid
4463 """
4464 if v not in ["private", "team", "public"]:
4465 raise ValueError("Visibility must be one of: private, team, public")
4466 return v
4468 @field_validator("team_id")
4469 @classmethod
4470 def validate_team_id(cls, v: Optional[str]) -> Optional[str]:
4471 """Validate team ID format.
4473 Args:
4474 v: Team ID to validate
4476 Returns:
4477 Validated team ID
4478 """
4479 if v is not None:
4480 return SecurityValidator.validate_uuid(v, "team_id")
4481 return v
4483 @field_validator("auth_value", mode="before")
4484 @classmethod
4485 def create_auth_value(cls, v, info):
4486 """
4487 This validator will run before the model is fully instantiated (mode="before")
4488 It will process the auth fields based on auth_type and generate auth_value.
4490 Args:
4491 v: Input url
4492 info: ValidationInfo containing auth_type
4494 Returns:
4495 str: Auth value
4496 """
4497 data = info.data
4498 auth_type = data.get("auth_type")
4500 if (auth_type is None) or (auth_type == ""):
4501 return v # If no auth_type is provided, no need to create auth_value
4503 # Process the auth fields and generate auth_value based on auth_type
4504 auth_value = cls._process_auth_fields(info)
4505 return auth_value
4507 @staticmethod
4508 def _process_auth_fields(info: ValidationInfo) -> Optional[str]:
4509 """
4510 Processes the input authentication fields and returns the correct auth_value.
4511 This method is called based on the selected auth_type.
4513 Args:
4514 info: ValidationInfo containing auth fields
4516 Returns:
4517 Encoded auth string or None
4519 Raises:
4520 ValueError: If auth_type is invalid
4521 """
4522 data = info.data
4523 auth_type = data.get("auth_type")
4525 if auth_type == "basic":
4526 # For basic authentication, both username and password must be present
4527 username = data.get("auth_username")
4528 password = data.get("auth_password")
4530 if not username or not password:
4531 raise ValueError("For 'basic' auth, both 'auth_username' and 'auth_password' must be provided.")
4533 creds = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode()
4534 return encode_auth({"Authorization": f"Basic {creds}"})
4536 if auth_type == "bearer":
4537 # For bearer authentication, only token is required
4538 token = data.get("auth_token")
4540 if not token:
4541 raise ValueError("For 'bearer' auth, 'auth_token' must be provided.")
4543 return encode_auth({"Authorization": f"Bearer {token}"})
4545 if auth_type == "oauth":
4546 # For OAuth authentication, we don't encode anything here
4547 # The OAuth configuration is handled separately in the oauth_config field
4548 # This method is only called for traditional auth types
4549 return None
4551 if auth_type == "authheaders":
4552 # Support both new multi-headers format and legacy single header format
4553 auth_headers = data.get("auth_headers")
4554 if auth_headers and isinstance(auth_headers, list):
4555 # New multi-headers format with enhanced validation
4556 header_dict = {}
4557 duplicate_keys = set()
4559 for header in auth_headers:
4560 if not isinstance(header, dict):
4561 continue
4563 key = header.get("key")
4564 value = header.get("value", "")
4566 # Skip headers without keys
4567 if not key:
4568 continue
4570 # Track duplicate keys (last value wins)
4571 if key in header_dict:
4572 duplicate_keys.add(key)
4574 # Validate header key format (basic HTTP header validation)
4575 if not all(c.isalnum() or c in "-_" for c in key.replace(" ", "")):
4576 raise ValueError(f"Invalid header key format: '{key}'. Header keys should contain only alphanumeric characters, hyphens, and underscores.")
4578 # Store header (empty values are allowed)
4579 header_dict[key] = value
4581 # Ensure at least one valid header
4582 if not header_dict:
4583 raise ValueError("For 'authheaders' auth, at least one valid header with a key must be provided.")
4585 # Warn about duplicate keys (optional - could log this instead)
4586 if duplicate_keys:
4587 logger.warning(f"Duplicate header keys detected (last value used): {', '.join(duplicate_keys)}")
4589 # Check for excessive headers (prevent abuse)
4590 if len(header_dict) > 100:
4591 raise ValueError("Maximum of 100 headers allowed per gateway.")
4593 return encode_auth(header_dict)
4595 # Legacy single header format (backward compatibility)
4596 header_key = data.get("auth_header_key")
4597 header_value = data.get("auth_header_value")
4599 if not header_key or not header_value:
4600 raise ValueError("For 'authheaders' auth, either 'auth_headers' list or both 'auth_header_key' and 'auth_header_value' must be provided.")
4602 return encode_auth({header_key: header_value})
4604 if auth_type == "one_time_auth":
4605 # One-time auth does not require encoding here
4606 return None
4608 if auth_type == "query_param":
4609 # Query param auth doesn't use auth_value field
4610 # Validation is handled by model_validator
4611 return None
4613 raise ValueError("Invalid 'auth_type'. Must be one of: basic, bearer, oauth, authheaders, or query_param.")
4615 @model_validator(mode="after")
4616 def validate_query_param_auth(self) -> "A2AAgentCreate":
4617 """Validate query parameter authentication configuration.
4619 Returns:
4620 A2AAgentCreate: The validated instance.
4622 Raises:
4623 ValueError: If query param auth is disabled or host is not in allowlist.
4624 """
4625 if self.auth_type != "query_param":
4626 return self
4628 # Check feature flag
4629 if not settings.insecure_allow_queryparam_auth:
4630 raise ValueError("Query parameter authentication is disabled. " + "Set INSECURE_ALLOW_QUERYPARAM_AUTH=true to enable. " + "WARNING: API keys in URLs may appear in proxy logs.")
4632 # Check required fields
4633 if not self.auth_query_param_key:
4634 raise ValueError("auth_query_param_key is required when auth_type is 'query_param'")
4635 if not self.auth_query_param_value:
4636 raise ValueError("auth_query_param_value is required when auth_type is 'query_param'")
4638 # Check host allowlist (if configured)
4639 if settings.insecure_queryparam_auth_allowed_hosts:
4640 parsed = urlparse(str(self.endpoint_url))
4641 # Extract hostname properly (handles IPv6, ports, userinfo)
4642 hostname = parsed.hostname or parsed.netloc.split("@")[-1].split(":")[0]
4643 hostname_lower = hostname.lower()
4645 if hostname_lower not in settings.insecure_queryparam_auth_allowed_hosts:
4646 allowed = ", ".join(settings.insecure_queryparam_auth_allowed_hosts)
4647 raise ValueError(f"Host '{hostname}' is not in the allowed hosts for query parameter auth. Allowed hosts: {allowed}")
4649 return self
4652class A2AAgentUpdate(BaseModelWithConfigDict):
4653 """Schema for updating an existing A2A agent.
4655 Similar to A2AAgentCreate but all fields are optional to allow partial updates.
4656 """
4658 name: Optional[str] = Field(None, description="Unique name for the agent")
4659 description: Optional[str] = Field(None, description="Agent description")
4660 endpoint_url: Optional[str] = Field(None, description="URL endpoint for the agent")
4661 agent_type: Optional[str] = Field(None, description="Type of agent")
4662 protocol_version: Optional[str] = Field(None, description="A2A protocol version supported")
4663 capabilities: Optional[Dict[str, Any]] = Field(None, description="Agent capabilities and features")
4664 config: Optional[Dict[str, Any]] = Field(None, description="Agent-specific configuration parameters")
4665 passthrough_headers: Optional[List[str]] = Field(default=None, description="List of headers allowed to be passed through from client to target")
4666 auth_type: Optional[str] = Field(None, description="Type of authentication")
4667 auth_username: Optional[str] = Field(None, description="username for basic authentication")
4668 auth_password: Optional[str] = Field(None, description="password for basic authentication")
4669 auth_token: Optional[str] = Field(None, description="token for bearer authentication")
4670 auth_header_key: Optional[str] = Field(None, description="key for custom headers authentication")
4671 auth_header_value: Optional[str] = Field(None, description="value for custom headers authentication")
4672 auth_headers: Optional[List[Dict[str, str]]] = Field(None, description="List of custom headers for authentication")
4674 # Adding `auth_value` as an alias for better access post-validation
4675 auth_value: Optional[str] = Field(None, validate_default=True)
4677 # OAuth 2.0 configuration
4678 oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration including grant_type, client_id, encrypted client_secret, URLs, and scopes")
4680 # Query Parameter Authentication (CWE-598 security concern - use only when required by upstream)
4681 auth_query_param_key: Optional[str] = Field(
4682 None,
4683 description="Query parameter name for authentication (e.g., 'tavilyApiKey')",
4684 )
4685 auth_query_param_value: Optional[SecretStr] = Field(
4686 None,
4687 description="Query parameter value (API key) - will be encrypted at rest",
4688 )
4690 tags: Optional[List[str]] = Field(None, description="Tags for categorizing the agent")
4692 # Team scoping fields
4693 team_id: Optional[str] = Field(None, description="Team ID for resource organization")
4694 owner_email: Optional[str] = Field(None, description="Email of the agent owner")
4695 visibility: Optional[str] = Field(None, description="Visibility level (private, team, public)")
4697 @field_validator("tags")
4698 @classmethod
4699 def validate_tags(cls, v: Optional[List[str]]) -> Optional[List[str]]:
4700 """Validate and normalize tags.
4702 Args:
4703 v: Optional list of tag strings to validate
4705 Returns:
4706 List of validated tag strings or None if input is None
4707 """
4708 if v is None:
4709 return None
4710 return validate_tags_field(v)
4712 @field_validator("name")
4713 @classmethod
4714 def validate_name(cls, v: str) -> str:
4715 """Validate agent name
4717 Args:
4718 v (str): Value to validate
4720 Returns:
4721 str: Value if validated as safe
4722 """
4723 return SecurityValidator.validate_name(v, "A2A Agent name")
4725 @field_validator("endpoint_url")
4726 @classmethod
4727 def validate_endpoint_url(cls, v: str) -> str:
4728 """Validate agent endpoint URL
4730 Args:
4731 v (str): Value to validate
4733 Returns:
4734 str: Value if validated as safe
4735 """
4736 return validate_core_url(v, "Agent endpoint URL")
4738 @field_validator("description")
4739 @classmethod
4740 def validate_description(cls, v: Optional[str]) -> Optional[str]:
4741 """Ensure descriptions display safely, truncate if too long
4743 Args:
4744 v (str): Value to validate
4746 Returns:
4747 str: Value if validated as safe and truncated if too long
4749 Raises:
4750 ValueError: When value is unsafe
4752 Examples:
4753 >>> from mcpgateway.schemas import A2AAgentUpdate
4754 >>> A2AAgentUpdate.validate_description('A safe description')
4755 'A safe description'
4756 >>> A2AAgentUpdate.validate_description(None) # Test None case
4757 >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
4758 >>> truncated = A2AAgentUpdate.validate_description(long_desc)
4759 >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
4760 0
4761 >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
4762 True
4763 """
4764 if v is None:
4765 return v
4766 if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
4767 # Truncate the description to the maximum allowed length
4768 truncated = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
4769 logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
4770 return SecurityValidator.sanitize_display_text(truncated, "Description")
4771 return SecurityValidator.sanitize_display_text(v, "Description")
4773 @field_validator("capabilities", "config")
4774 @classmethod
4775 def validate_json_fields(cls, v: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
4776 """Validate JSON structure depth
4778 Args:
4779 v (dict): Value to validate
4781 Returns:
4782 dict: Value if validated as safe
4783 """
4784 if v is None:
4785 return v
4786 SecurityValidator.validate_json_depth(v)
4787 return v
4789 @field_validator("visibility")
4790 @classmethod
4791 def validate_visibility(cls, v: Optional[str]) -> Optional[str]:
4792 """Validate visibility level.
4794 Args:
4795 v: Visibility value to validate
4797 Returns:
4798 Validated visibility value
4800 Raises:
4801 ValueError: If visibility is invalid
4802 """
4803 if v is not None and v not in ["private", "team", "public"]:
4804 raise ValueError("Visibility must be one of: private, team, public")
4805 return v
4807 @field_validator("team_id")
4808 @classmethod
4809 def validate_team_id(cls, v: Optional[str]) -> Optional[str]:
4810 """Validate team ID format.
4812 Args:
4813 v: Team ID to validate
4815 Returns:
4816 Validated team ID
4817 """
4818 if v is not None:
4819 return SecurityValidator.validate_uuid(v, "team_id")
4820 return v
4822 @field_validator("auth_value", mode="before")
4823 @classmethod
4824 def create_auth_value(cls, v, info):
4825 """
4826 This validator will run before the model is fully instantiated (mode="before")
4827 It will process the auth fields based on auth_type and generate auth_value.
4829 Args:
4830 v: Input URL
4831 info: ValidationInfo containing auth_type
4833 Returns:
4834 str: Auth value or URL
4835 """
4836 data = info.data
4837 auth_type = data.get("auth_type")
4839 if (auth_type is None) or (auth_type == ""):
4840 return v # If no auth_type is provided, no need to create auth_value
4842 # Process the auth fields and generate auth_value based on auth_type
4843 auth_value = cls._process_auth_fields(info)
4844 return auth_value
4846 @staticmethod
4847 def _process_auth_fields(info: ValidationInfo) -> Optional[str]:
4848 """
4849 Processes the input authentication fields and returns the correct auth_value.
4850 This method is called based on the selected auth_type.
4852 Args:
4853 info: ValidationInfo containing auth fields
4855 Returns:
4856 Encoded auth string or None
4858 Raises:
4859 ValueError: If auth type is invalid
4860 """
4862 data = info.data
4863 auth_type = data.get("auth_type")
4865 if auth_type == "basic":
4866 # For basic authentication, both username and password must be present
4867 username = data.get("auth_username")
4868 password = data.get("auth_password")
4869 if not username or not password:
4870 raise ValueError("For 'basic' auth, both 'auth_username' and 'auth_password' must be provided.")
4872 creds = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode()
4873 return encode_auth({"Authorization": f"Basic {creds}"})
4875 if auth_type == "bearer":
4876 # For bearer authentication, only token is required
4877 token = data.get("auth_token")
4879 if not token:
4880 raise ValueError("For 'bearer' auth, 'auth_token' must be provided.")
4882 return encode_auth({"Authorization": f"Bearer {token}"})
4884 if auth_type == "oauth":
4885 # For OAuth authentication, we don't encode anything here
4886 # The OAuth configuration is handled separately in the oauth_config field
4887 # This method is only called for traditional auth types
4888 return None
4890 if auth_type == "authheaders":
4891 # Support both new multi-headers format and legacy single header format
4892 auth_headers = data.get("auth_headers")
4893 if auth_headers and isinstance(auth_headers, list):
4894 # New multi-headers format with enhanced validation
4895 header_dict = {}
4896 duplicate_keys = set()
4898 for header in auth_headers:
4899 if not isinstance(header, dict):
4900 continue
4902 key = header.get("key")
4903 value = header.get("value", "")
4905 # Skip headers without keys
4906 if not key:
4907 continue
4909 # Track duplicate keys (last value wins)
4910 if key in header_dict:
4911 duplicate_keys.add(key)
4913 # Validate header key format (basic HTTP header validation)
4914 if not all(c.isalnum() or c in "-_" for c in key.replace(" ", "")):
4915 raise ValueError(f"Invalid header key format: '{key}'. Header keys should contain only alphanumeric characters, hyphens, and underscores.")
4917 # Store header (empty values are allowed)
4918 header_dict[key] = value
4920 # Ensure at least one valid header
4921 if not header_dict:
4922 raise ValueError("For 'authheaders' auth, at least one valid header with a key must be provided.")
4924 # Warn about duplicate keys (optional - could log this instead)
4925 if duplicate_keys:
4926 logger.warning(f"Duplicate header keys detected (last value used): {', '.join(duplicate_keys)}")
4928 # Check for excessive headers (prevent abuse)
4929 if len(header_dict) > 100:
4930 raise ValueError("Maximum of 100 headers allowed per gateway.")
4932 return encode_auth(header_dict)
4934 # Legacy single header format (backward compatibility)
4935 header_key = data.get("auth_header_key")
4936 header_value = data.get("auth_header_value")
4938 if not header_key or not header_value:
4939 raise ValueError("For 'authheaders' auth, either 'auth_headers' list or both 'auth_header_key' and 'auth_header_value' must be provided.")
4941 return encode_auth({header_key: header_value})
4943 if auth_type == "one_time_auth":
4944 # One-time auth does not require encoding here
4945 return None
4947 if auth_type == "query_param":
4948 # Query param auth doesn't use auth_value field
4949 # Validation is handled by model_validator
4950 return None
4952 raise ValueError("Invalid 'auth_type'. Must be one of: basic, bearer, oauth, authheaders, or query_param.")
4954 @model_validator(mode="after")
4955 def validate_query_param_auth(self) -> "A2AAgentUpdate":
4956 """Validate query parameter authentication configuration.
4958 NOTE: This only runs when auth_type is explicitly set to "query_param".
4959 Service-layer enforcement handles the case where auth_type is omitted
4960 but the existing agent uses query_param auth.
4962 Returns:
4963 A2AAgentUpdate: The validated instance.
4965 Raises:
4966 ValueError: If required fields are missing when setting query_param auth.
4967 """
4968 if self.auth_type == "query_param":
4969 # Validate fields are provided when explicitly setting query_param auth
4970 # Feature flag/allowlist check happens in service layer (has access to existing agent)
4971 if not self.auth_query_param_key:
4972 raise ValueError("auth_query_param_key is required when setting auth_type to 'query_param'")
4973 if not self.auth_query_param_value:
4974 raise ValueError("auth_query_param_value is required when setting auth_type to 'query_param'")
4976 return self
4979class A2AAgentRead(BaseModelWithConfigDict):
4980 """Schema for reading A2A agent information.
4982 Includes all agent fields plus:
4983 - Database ID
4984 - Slug
4985 - Creation/update timestamps
4986 - Enabled/reachable status
4987 - Metrics
4988 - Authentication type: basic, bearer, authheaders, oauth, query_param
4989 - Authentication value: username/password or token or custom headers
4990 - OAuth configuration for OAuth 2.0 authentication
4991 - Query parameter authentication (key name and masked value)
4993 Auto Populated fields:
4994 - Authentication username: for basic auth
4995 - Authentication password: for basic auth
4996 - Authentication token: for bearer auth
4997 - Authentication header key: for authheaders auth
4998 - Authentication header value: for authheaders auth
4999 - Query param key: for query_param auth
5000 - Query param value (masked): for query_param auth
5001 """
5003 id: Optional[str] = Field(None, description="Unique ID of the a2a agent")
5004 name: str = Field(..., description="Unique name for the a2a agent")
5005 slug: Optional[str] = Field(None, description="Slug for a2a agent endpoint URL")
5006 description: Optional[str] = Field(None, description="a2a agent description")
5007 endpoint_url: str = Field(..., description="a2a agent endpoint URL")
5008 agent_type: str
5009 protocol_version: str
5010 capabilities: Dict[str, Any]
5011 config: Dict[str, Any]
5012 enabled: bool
5013 reachable: bool
5014 created_at: datetime
5015 updated_at: datetime
5016 last_interaction: Optional[datetime]
5017 tags: List[Dict[str, str]] = Field(default_factory=list, description="Tags for categorizing the agent")
5018 metrics: Optional[A2AAgentMetrics] = Field(None, description="Agent metrics (may be None in list operations)")
5019 passthrough_headers: Optional[List[str]] = Field(default=None, description="List of headers allowed to be passed through from client to target")
5020 # Authorizations
5021 auth_type: Optional[str] = Field(None, description="auth_type: basic, bearer, authheaders, oauth, query_param, or None")
5022 auth_value: Optional[str] = Field(None, description="auth value: username/password or token or custom headers")
5024 # OAuth 2.0 configuration
5025 oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration including grant_type, client_id, encrypted client_secret, URLs, and scopes")
5027 # auth_value will populate the following fields
5028 auth_username: Optional[str] = Field(None, description="username for basic authentication")
5029 auth_password: Optional[str] = Field(None, description="password for basic authentication")
5030 auth_token: Optional[str] = Field(None, description="token for bearer authentication")
5031 auth_header_key: Optional[str] = Field(None, description="key for custom headers authentication")
5032 auth_header_value: Optional[str] = Field(None, description="vallue for custom headers authentication")
5034 # Query Parameter Authentication (masked for security)
5035 auth_query_param_key: Optional[str] = Field(
5036 None,
5037 description="Query parameter name for authentication",
5038 )
5039 auth_query_param_value_masked: Optional[str] = Field(
5040 None,
5041 description="Masked query parameter value (actual value is encrypted at rest)",
5042 )
5044 # Comprehensive metadata for audit tracking
5045 created_by: Optional[str] = Field(None, description="Username who created this entity")
5046 created_from_ip: Optional[str] = Field(None, description="IP address of creator")
5047 created_via: Optional[str] = Field(None, description="Creation method: ui|api|import|federation")
5048 created_user_agent: Optional[str] = Field(None, description="User agent of creation request")
5050 modified_by: Optional[str] = Field(None, description="Username who last modified this entity")
5051 modified_from_ip: Optional[str] = Field(None, description="IP address of last modifier")
5052 modified_via: Optional[str] = Field(None, description="Modification method")
5053 modified_user_agent: Optional[str] = Field(None, description="User agent of modification request")
5055 import_batch_id: Optional[str] = Field(None, description="UUID of bulk import batch")
5056 federation_source: Optional[str] = Field(None, description="Source gateway for federated entities")
5057 version: Optional[int] = Field(1, description="Entity version for change tracking")
5059 # Team scoping fields
5060 team_id: Optional[str] = Field(None, description="ID of the team that owns this resource")
5061 team: Optional[str] = Field(None, description="Name of the team that owns this resource")
5062 owner_email: Optional[str] = Field(None, description="Email of the user who owns this resource")
5063 visibility: Optional[str] = Field(default="public", description="Visibility level: private, team, or public")
5065 @model_validator(mode="before")
5066 @classmethod
5067 def _mask_query_param_auth(cls, data: Any) -> Any:
5068 """Mask query param auth value when constructing from DB model.
5070 This extracts auth_query_params from the raw data (DB model or dict)
5071 and populates the masked fields for display.
5073 Args:
5074 data: The raw data (dict or ORM model) to process.
5076 Returns:
5077 Any: The processed data with masked query param values.
5078 """
5079 # Handle dict input
5080 if isinstance(data, dict):
5081 auth_query_params = data.get("auth_query_params")
5082 if auth_query_params and isinstance(auth_query_params, dict):
5083 # Extract the param key name and set masked value
5084 first_key = next(iter(auth_query_params.keys()), None)
5085 if first_key:
5086 data["auth_query_param_key"] = first_key
5087 data["auth_query_param_value_masked"] = settings.masked_auth_value
5088 # Handle ORM model input (has auth_query_params attribute)
5089 elif hasattr(data, "auth_query_params"):
5090 auth_query_params = getattr(data, "auth_query_params", None)
5091 if auth_query_params and isinstance(auth_query_params, dict):
5092 # Convert ORM to dict for modification, preserving all attributes
5093 # Start with table columns
5094 data_dict = {c.name: getattr(data, c.name) for c in data.__table__.columns}
5095 # Preserve dynamically added attributes like 'team' (from relationships)
5096 for attr in ["team"]:
5097 if hasattr(data, attr):
5098 data_dict[attr] = getattr(data, attr)
5099 first_key = next(iter(auth_query_params.keys()), None)
5100 if first_key:
5101 data_dict["auth_query_param_key"] = first_key
5102 data_dict["auth_query_param_value_masked"] = settings.masked_auth_value
5103 return data_dict
5104 return data
5106 # This will be the main method to automatically populate fields
5107 @model_validator(mode="after")
5108 def _populate_auth(self) -> Self:
5109 """Populate authentication fields based on auth_type and encoded auth_value.
5111 This post-validation method decodes the stored authentication value and
5112 populates the appropriate authentication fields (username/password, token,
5113 or custom headers) based on the authentication type. It ensures the
5114 authentication data is properly formatted and accessible through individual
5115 fields for display purposes.
5117 The method handles three authentication types:
5118 - basic: Extracts username and password from Authorization header
5119 - bearer: Extracts token from Bearer Authorization header
5120 - authheaders: Extracts custom header key/value pair
5122 Returns:
5123 Self: The instance with populated authentication fields:
5124 - For basic: auth_username and auth_password
5125 - For bearer: auth_token
5126 - For authheaders: auth_header_key and auth_header_value
5128 Raises:
5129 ValueError: If the authentication data is malformed:
5130 - Basic auth missing username or password
5131 - Bearer auth missing or improperly formatted Authorization header
5132 - Custom headers not exactly one key/value pair
5134 Examples:
5135 >>> # Basic auth example
5136 >>> string_bytes = "admin:secret".encode("utf-8")
5137 >>> encoded_auth = base64.urlsafe_b64encode(string_bytes).decode("utf-8")
5138 >>> values = GatewayRead.model_construct(
5139 ... auth_type="basic",
5140 ... auth_value=encode_auth({"Authorization": f"Basic {encoded_auth}"})
5141 ... )
5142 >>> values = A2AAgentRead._populate_auth(values)
5143 >>> values.auth_username
5144 'admin'
5145 >>> values.auth_password
5146 'secret'
5148 >>> # Bearer auth example
5149 >>> values = A2AAgentRead.model_construct(
5150 ... auth_type="bearer",
5151 ... auth_value=encode_auth({"Authorization": "Bearer mytoken123"})
5152 ... )
5153 >>> values = A2AAgentRead._populate_auth(values)
5154 >>> values.auth_token
5155 'mytoken123'
5157 >>> # Custom headers example
5158 >>> values = A2AAgentRead.model_construct(
5159 ... auth_type='authheaders',
5160 ... auth_value=encode_auth({"X-API-Key": "abc123"})
5161 ... )
5162 >>> values = A2AAgentRead._populate_auth(values)
5163 >>> values.auth_header_key
5164 'X-API-Key'
5165 >>> values.auth_header_value
5166 'abc123'
5167 """
5168 auth_type = self.auth_type
5169 auth_value_encoded = self.auth_value
5170 # Skip validation logic if masked value
5171 if auth_value_encoded == settings.masked_auth_value:
5172 return self
5174 # Handle OAuth authentication (no auth_value to decode)
5175 if auth_type == "oauth":
5176 # OAuth gateways don't have traditional auth_value to decode
5177 # They use oauth_config instead
5178 return self
5180 if auth_type == "one_time_auth":
5181 return self
5183 if auth_type == "query_param":
5184 # Query param auth is handled by the before validator
5185 # (auth_query_params from DB model is processed there)
5186 return self
5188 # If no encoded value is present, nothing to populate
5189 if not auth_value_encoded:
5190 return self
5192 auth_value = decode_auth(auth_value_encoded)
5193 if auth_type == "basic":
5194 auth = auth_value.get("Authorization")
5195 if not (isinstance(auth, str) and auth.startswith("Basic ")):
5196 raise ValueError("basic auth requires an Authorization header of the form 'Basic <base64>'")
5197 auth = auth.removeprefix("Basic ")
5198 u, p = base64.urlsafe_b64decode(auth).decode("utf-8").split(":")
5199 if not u or not p:
5200 raise ValueError("basic auth requires both username and password")
5201 self.auth_username, self.auth_password = u, p
5203 elif auth_type == "bearer":
5204 auth = auth_value.get("Authorization")
5205 if not (isinstance(auth, str) and auth.startswith("Bearer ")):
5206 raise ValueError("bearer auth requires an Authorization header of the form 'Bearer <token>'")
5207 self.auth_token = auth.removeprefix("Bearer ")
5209 elif auth_type == "authheaders":
5210 # For backward compatibility, populate first header in key/value fields
5211 if len(auth_value) == 0:
5212 raise ValueError("authheaders requires at least one key/value pair")
5213 k, v = next(iter(auth_value.items()))
5214 self.auth_header_key, self.auth_header_value = k, v
5215 return self
5217 def masked(self) -> "A2AAgentRead":
5218 """
5219 Return a masked version of the model instance with sensitive authentication fields hidden.
5221 This method creates a dictionary representation of the model data and replaces sensitive fields
5222 such as `auth_value`, `auth_password`, `auth_token`, and `auth_header_value` with a masked
5223 placeholder value defined in `settings.masked_auth_value`. Masking is only applied if the fields
5224 are present and not already masked.
5226 Args:
5227 None
5229 Returns:
5230 A2AAgentRead: A new instance of the A2AAgentRead model with sensitive authentication-related fields
5231 masked to prevent exposure of sensitive information.
5233 Notes:
5234 - The `auth_value` field is only masked if it exists and its value is different from the masking
5235 placeholder.
5236 - Other sensitive fields (`auth_password`, `auth_token`, `auth_header_value`) are masked if present.
5237 - Fields not related to authentication remain unmodified.
5238 """
5239 masked_data = self.model_dump()
5241 # Only mask if auth_value is present and not already masked
5242 if masked_data.get("auth_value") and masked_data["auth_value"] != settings.masked_auth_value:
5243 masked_data["auth_value"] = settings.masked_auth_value
5245 masked_data["auth_password"] = settings.masked_auth_value if masked_data.get("auth_password") else None
5246 masked_data["auth_token"] = settings.masked_auth_value if masked_data.get("auth_token") else None
5247 masked_data["auth_header_value"] = settings.masked_auth_value if masked_data.get("auth_header_value") else None
5249 # Mask sensitive keys inside oauth_config (e.g. password, client_secret)
5250 if masked_data.get("oauth_config"):
5251 masked_data["oauth_config"] = _mask_oauth_config(masked_data["oauth_config"])
5253 return A2AAgentRead.model_validate(masked_data)
5256class A2AAgentInvocation(BaseModelWithConfigDict):
5257 """Schema for A2A agent invocation requests.
5259 Contains:
5260 - Agent name or ID to invoke
5261 - Parameters for the agent interaction
5262 - Interaction type (query, execute, etc.)
5263 """
5265 agent_name: str = Field(..., description="Name of the A2A agent to invoke")
5266 parameters: Dict[str, Any] = Field(default_factory=dict, description="Parameters for agent interaction")
5267 interaction_type: str = Field(default="query", description="Type of interaction (query, execute, etc.)")
5269 @field_validator("agent_name")
5270 @classmethod
5271 def validate_agent_name(cls, v: str) -> str:
5272 """Ensure agent names follow naming conventions
5274 Args:
5275 v (str): Value to validate
5277 Returns:
5278 str: Value if validated as safe
5279 """
5280 return SecurityValidator.validate_name(v, "Agent name")
5282 @field_validator("parameters")
5283 @classmethod
5284 def validate_parameters(cls, v: Dict[str, Any]) -> Dict[str, Any]:
5285 """Validate parameters structure depth to prevent DoS attacks.
5287 Args:
5288 v (dict): Parameters dictionary to validate
5290 Returns:
5291 dict: The validated parameters if within depth limits
5293 Raises:
5294 ValueError: If the parameters exceed the maximum allowed depth
5295 """
5296 SecurityValidator.validate_json_depth(v)
5297 return v
5300# ---------------------------------------------------------------------------
5301# Email-Based Authentication Schemas
5302# ---------------------------------------------------------------------------
5305class EmailLoginRequest(BaseModel):
5306 """Request schema for email login.
5308 Attributes:
5309 email: User's email address
5310 password: User's password
5312 Examples:
5313 >>> request = EmailLoginRequest(email="user@example.com", password="secret123")
5314 >>> request.email
5315 'user@example.com'
5316 >>> request.password
5317 'secret123'
5318 """
5320 model_config = ConfigDict(str_strip_whitespace=True)
5322 email: EmailStr = Field(..., description="User's email address")
5323 password: str = Field(..., min_length=1, description="User's password")
5326class PublicRegistrationRequest(BaseModel):
5327 """Public self-registration request — minimal fields, password required.
5329 Extra fields are rejected (extra="forbid") so clients cannot submit
5330 admin-only fields like is_admin or is_active.
5332 Attributes:
5333 email: User's email address
5334 password: User's password (required, min 8 chars)
5335 full_name: Optional full name for display
5337 Examples:
5338 >>> request = PublicRegistrationRequest(
5339 ... email="new@example.com",
5340 ... password="secure123",
5341 ... full_name="New User"
5342 ... )
5343 >>> request.email
5344 'new@example.com'
5345 >>> request.full_name
5346 'New User'
5347 """
5349 model_config = ConfigDict(str_strip_whitespace=True, extra="forbid")
5351 email: EmailStr = Field(..., description="User's email address")
5352 password: str = Field(..., min_length=8, description="User's password")
5353 full_name: Optional[str] = Field(None, max_length=255, description="User's full name")
5356class AdminCreateUserRequest(BaseModel):
5357 """Admin user creation request — all fields, password required.
5359 Attributes:
5360 email: User's email address
5361 password: User's password (required, min 8 chars)
5362 full_name: Optional full name for display
5363 is_admin: Whether user should have admin privileges (default: False)
5364 is_active: Whether user account is active (default: True)
5365 password_change_required: Whether user must change password on next login (default: False)
5367 Examples:
5368 >>> request = AdminCreateUserRequest(
5369 ... email="new@example.com",
5370 ... password="secure123",
5371 ... full_name="New User"
5372 ... )
5373 >>> request.email
5374 'new@example.com'
5375 >>> request.full_name
5376 'New User'
5377 >>> request.is_admin
5378 False
5379 >>> request.is_active
5380 True
5381 >>> request.password_change_required
5382 False
5383 """
5385 model_config = ConfigDict(str_strip_whitespace=True)
5387 email: EmailStr = Field(..., description="User's email address")
5388 password: str = Field(..., min_length=8, description="User's password")
5389 full_name: Optional[str] = Field(None, max_length=255, description="User's full name")
5390 is_admin: bool = Field(False, description="Grant admin privileges to user")
5391 is_active: bool = Field(True, description="Whether user account is active")
5392 password_change_required: bool = Field(False, description="Whether user must change password on next login")
5395# Deprecated alias — use AdminCreateUserRequest or PublicRegistrationRequest instead
5396EmailRegistrationRequest = AdminCreateUserRequest
5399class ChangePasswordRequest(BaseModel):
5400 """Request schema for password change.
5402 Attributes:
5403 old_password: Current password for verification
5404 new_password: New password to set
5406 Examples:
5407 >>> request = ChangePasswordRequest(
5408 ... old_password="old_secret",
5409 ... new_password="new_secure_password"
5410 ... )
5411 >>> request.old_password
5412 'old_secret'
5413 >>> request.new_password
5414 'new_secure_password'
5415 """
5417 model_config = ConfigDict(str_strip_whitespace=True)
5419 old_password: str = Field(..., min_length=1, description="Current password")
5420 new_password: str = Field(..., min_length=8, description="New password")
5422 @field_validator("new_password")
5423 @classmethod
5424 def validate_new_password(cls, v: str) -> str:
5425 """Validate new password meets minimum requirements.
5427 Args:
5428 v: New password string to validate
5430 Returns:
5431 str: Validated new password
5433 Raises:
5434 ValueError: If new password doesn't meet requirements
5435 """
5436 if len(v) < 8:
5437 raise ValueError("New password must be at least 8 characters long")
5438 return v
5441class ForgotPasswordRequest(BaseModel):
5442 """Request schema for forgot-password flow."""
5444 model_config = ConfigDict(str_strip_whitespace=True)
5446 email: EmailStr = Field(..., description="Email address for password reset")
5449class ResetPasswordRequest(BaseModel):
5450 """Request schema for completing password reset."""
5452 model_config = ConfigDict(str_strip_whitespace=True)
5454 new_password: str = Field(..., min_length=8, description="New password to set")
5455 confirm_password: str = Field(..., min_length=8, description="Password confirmation")
5457 @model_validator(mode="after")
5458 def validate_password_match(self):
5459 """Ensure password and confirmation are identical.
5461 Returns:
5462 ResetPasswordRequest: Validated request instance.
5464 Raises:
5465 ValueError: If the password and confirmation do not match.
5466 """
5467 if self.new_password != self.confirm_password:
5468 raise ValueError("Passwords do not match")
5469 return self
5472class PasswordResetTokenValidationResponse(BaseModel):
5473 """Response schema for reset-token validation."""
5475 valid: bool = Field(..., description="Whether token is currently valid")
5476 message: str = Field(..., description="Validation status message")
5477 expires_at: Optional[datetime] = Field(None, description="Token expiration timestamp when valid")
5480class EmailUserResponse(BaseModel):
5481 """Response schema for user information.
5483 Attributes:
5484 email: User's email address
5485 full_name: User's full name
5486 is_admin: Whether user has admin privileges
5487 is_active: Whether account is active
5488 auth_provider: Authentication provider used
5489 created_at: Account creation timestamp
5490 last_login: Last successful login timestamp
5491 email_verified: Whether email is verified
5492 password_change_required: Whether user must change password on next login
5494 Examples:
5495 >>> user = EmailUserResponse(
5496 ... email="user@example.com",
5497 ... full_name="Test User",
5498 ... is_admin=False,
5499 ... is_active=True,
5500 ... auth_provider="local",
5501 ... created_at=datetime.now(),
5502 ... last_login=None,
5503 ... email_verified=False
5504 ... )
5505 >>> user.email
5506 'user@example.com'
5507 >>> user.is_admin
5508 False
5509 """
5511 model_config = ConfigDict(from_attributes=True)
5513 email: str = Field(..., description="User's email address")
5514 full_name: Optional[str] = Field(None, description="User's full name")
5515 is_admin: bool = Field(..., description="Whether user has admin privileges")
5516 is_active: bool = Field(..., description="Whether account is active")
5517 auth_provider: str = Field(..., description="Authentication provider")
5518 created_at: datetime = Field(..., description="Account creation timestamp")
5519 last_login: Optional[datetime] = Field(None, description="Last successful login")
5520 email_verified: bool = Field(False, description="Whether email is verified")
5521 password_change_required: bool = Field(False, description="Whether user must change password on next login")
5522 failed_login_attempts: int = Field(0, description="Current failed login attempts counter")
5523 locked_until: Optional[datetime] = Field(None, description="Account lock expiration timestamp")
5524 is_locked: bool = Field(False, description="Whether the account is currently locked")
5526 @classmethod
5527 def from_email_user(cls, user) -> "EmailUserResponse":
5528 """Create response from EmailUser model.
5530 Args:
5531 user: EmailUser model instance
5533 Returns:
5534 EmailUserResponse: Response schema instance
5535 """
5536 is_locked = user.is_account_locked()
5537 locked_until_raw = getattr(user, "locked_until", None)
5538 locked_until = locked_until_raw if isinstance(locked_until_raw, datetime) else None
5539 failed_attempts_raw = getattr(user, "failed_login_attempts", 0)
5540 try:
5541 failed_attempts = int(failed_attempts_raw or 0)
5542 except (TypeError, ValueError):
5543 failed_attempts = 0
5544 return cls(
5545 email=user.email,
5546 full_name=user.full_name,
5547 is_admin=user.is_admin,
5548 is_active=user.is_active,
5549 auth_provider=user.auth_provider,
5550 created_at=user.created_at,
5551 last_login=user.last_login,
5552 email_verified=user.is_email_verified(),
5553 password_change_required=user.password_change_required,
5554 failed_login_attempts=failed_attempts,
5555 locked_until=locked_until,
5556 is_locked=is_locked,
5557 )
5560class AuthenticationResponse(BaseModel):
5561 """Response schema for successful authentication.
5563 Attributes:
5564 access_token: JWT token for API access
5565 token_type: Type of token (always 'bearer')
5566 expires_in: Token expiration time in seconds
5567 user: User information
5569 Examples:
5570 >>> from datetime import datetime
5571 >>> response = AuthenticationResponse(
5572 ... access_token="jwt.token.here",
5573 ... token_type="bearer",
5574 ... expires_in=3600,
5575 ... user=EmailUserResponse(
5576 ... email="user@example.com",
5577 ... full_name="Test User",
5578 ... is_admin=False,
5579 ... is_active=True,
5580 ... auth_provider="local",
5581 ... created_at=datetime.now(),
5582 ... last_login=None,
5583 ... email_verified=False
5584 ... )
5585 ... )
5586 >>> response.token_type
5587 'bearer'
5588 >>> response.user.email
5589 'user@example.com'
5590 """
5592 access_token: str = Field(..., description="JWT access token")
5593 token_type: str = Field(default="bearer", description="Token type")
5594 expires_in: int = Field(..., description="Token expiration in seconds")
5595 user: EmailUserResponse = Field(..., description="User information")
5598class AuthEventResponse(BaseModel):
5599 """Response schema for authentication events.
5601 Attributes:
5602 id: Event ID
5603 timestamp: Event timestamp
5604 user_email: User's email address
5605 event_type: Type of authentication event
5606 success: Whether the event was successful
5607 ip_address: Client IP address
5608 failure_reason: Reason for failure (if applicable)
5610 Examples:
5611 >>> from datetime import datetime
5612 >>> event = AuthEventResponse(
5613 ... id=1,
5614 ... timestamp=datetime.now(),
5615 ... user_email="user@example.com",
5616 ... event_type="login",
5617 ... success=True,
5618 ... ip_address="192.168.1.1",
5619 ... failure_reason=None
5620 ... )
5621 >>> event.event_type
5622 'login'
5623 >>> event.success
5624 True
5625 """
5627 model_config = ConfigDict(from_attributes=True)
5629 id: int = Field(..., description="Event ID")
5630 timestamp: datetime = Field(..., description="Event timestamp")
5631 user_email: Optional[str] = Field(None, description="User's email address")
5632 event_type: str = Field(..., description="Type of authentication event")
5633 success: bool = Field(..., description="Whether the event was successful")
5634 ip_address: Optional[str] = Field(None, description="Client IP address")
5635 failure_reason: Optional[str] = Field(None, description="Reason for failure")
5638class UserListResponse(BaseModel):
5639 """Response schema for user list.
5641 Attributes:
5642 users: List of users
5643 total_count: Total number of users
5644 limit: Request limit
5645 offset: Request offset
5647 Examples:
5648 >>> user_list = UserListResponse(
5649 ... users=[],
5650 ... total_count=0,
5651 ... limit=10,
5652 ... offset=0
5653 ... )
5654 >>> user_list.total_count
5655 0
5656 >>> len(user_list.users)
5657 0
5658 """
5660 users: list[EmailUserResponse] = Field(..., description="List of users")
5661 total_count: int = Field(..., description="Total number of users")
5662 limit: int = Field(..., description="Request limit")
5663 offset: int = Field(..., description="Request offset")
5666class AdminUserUpdateRequest(BaseModel):
5667 """Request schema for admin user updates.
5669 Attributes:
5670 full_name: User's full name
5671 is_admin: Whether user has admin privileges
5672 is_active: Whether account is active
5673 password_change_required: Whether user must change password on next login
5674 password: New password (admin can reset without old password)
5676 Examples:
5677 >>> request = AdminUserUpdateRequest(
5678 ... full_name="Updated Name",
5679 ... is_admin=True,
5680 ... is_active=True
5681 ... )
5682 >>> request.full_name
5683 'Updated Name'
5684 >>> request.is_admin
5685 True
5686 """
5688 model_config = ConfigDict(str_strip_whitespace=True)
5690 full_name: Optional[str] = Field(None, max_length=255, description="User's full name")
5691 is_admin: Optional[bool] = Field(None, description="Whether user has admin privileges")
5692 is_active: Optional[bool] = Field(None, description="Whether account is active")
5693 email_verified: Optional[bool] = Field(None, description="Whether user's email is verified")
5694 password_change_required: Optional[bool] = Field(None, description="Whether user must change password on next login")
5695 password: Optional[str] = Field(None, min_length=8, description="New password (admin reset)")
5698class ErrorResponse(BaseModel):
5699 """Standard error response schema.
5701 Attributes:
5702 error: Error type
5703 message: Human-readable error message
5704 details: Additional error details
5706 Examples:
5707 >>> error = ErrorResponse(
5708 ... error="authentication_failed",
5709 ... message="Invalid email or password",
5710 ... details=None
5711 ... )
5712 >>> error.error
5713 'authentication_failed'
5714 >>> error.message
5715 'Invalid email or password'
5716 """
5718 error: str = Field(..., description="Error type")
5719 message: str = Field(..., description="Human-readable error message")
5720 details: Optional[dict] = Field(None, description="Additional error details")
5723class SuccessResponse(BaseModel):
5724 """Standard success response schema.
5726 Attributes:
5727 success: Whether operation was successful
5728 message: Human-readable success message
5730 Examples:
5731 >>> response = SuccessResponse(
5732 ... success=True,
5733 ... message="Password changed successfully"
5734 ... )
5735 >>> response.success
5736 True
5737 >>> response.message
5738 'Password changed successfully'
5739 """
5741 success: bool = Field(True, description="Operation success status")
5742 message: str = Field(..., description="Human-readable success message")
5745# ---------------------------------------------------------------------------
5746# Team Management Schemas
5747# ---------------------------------------------------------------------------
5750class TeamCreateRequest(BaseModel):
5751 """Schema for creating a new team.
5753 Attributes:
5754 name: Team display name
5755 slug: URL-friendly team identifier (optional, auto-generated if not provided)
5756 description: Team description
5757 visibility: Team visibility level
5758 max_members: Maximum number of members allowed
5760 Examples:
5761 >>> request = TeamCreateRequest(
5762 ... name="Engineering Team",
5763 ... description="Software development team"
5764 ... )
5765 >>> request.name
5766 'Engineering Team'
5767 >>> request.visibility
5768 'private'
5769 >>> request.slug is None
5770 True
5771 >>>
5772 >>> # Test with all fields
5773 >>> full_request = TeamCreateRequest(
5774 ... name="DevOps Team",
5775 ... slug="devops-team",
5776 ... description="Infrastructure and deployment team",
5777 ... visibility="public",
5778 ... max_members=50
5779 ... )
5780 >>> full_request.slug
5781 'devops-team'
5782 >>> full_request.max_members
5783 50
5784 >>> full_request.visibility
5785 'public'
5786 >>>
5787 >>> # Test validation
5788 >>> try:
5789 ... TeamCreateRequest(name=" ", description="test")
5790 ... except ValueError as e:
5791 ... "empty" in str(e).lower()
5792 True
5793 >>>
5794 >>> # Test slug validation
5795 >>> try:
5796 ... TeamCreateRequest(name="Test", slug="Invalid_Slug")
5797 ... except ValueError:
5798 ... True
5799 True
5800 >>>
5801 >>> # Test valid slug patterns
5802 >>> valid_slug = TeamCreateRequest(name="Test", slug="valid-slug-123")
5803 >>> valid_slug.slug
5804 'valid-slug-123'
5805 """
5807 name: str = Field(..., min_length=1, max_length=255, description="Team display name")
5808 slug: Optional[str] = Field(None, min_length=2, max_length=255, pattern="^[a-z0-9-]+$", description="URL-friendly team identifier")
5809 description: Optional[str] = Field(None, max_length=1000, description="Team description")
5810 visibility: Literal["private", "public"] = Field("private", description="Team visibility level")
5811 max_members: Optional[int] = Field(default=None, description="Maximum number of team members")
5813 @field_validator("name")
5814 @classmethod
5815 def validate_name(cls, v: str) -> str:
5816 """Validate team name.
5818 Args:
5819 v: Team name to validate
5821 Returns:
5822 str: Validated and stripped team name
5824 Raises:
5825 ValueError: If team name is empty or contains invalid characters
5826 """
5827 if not v.strip():
5828 raise ValueError("Team name cannot be empty")
5829 v = v.strip()
5830 # Strict validation: only alphanumeric, underscore, period, dash, and spaces
5831 if not re.match(settings.validation_name_pattern, v):
5832 raise ValueError("Team name can only contain letters, numbers, spaces, underscores, periods, and dashes")
5833 SecurityValidator.validate_no_xss(v, "Team name")
5834 if re.search(SecurityValidator.DANGEROUS_JS_PATTERN, v, re.IGNORECASE):
5835 raise ValueError("Team name contains script patterns that may cause security issues")
5836 return v
5838 @field_validator("description")
5839 @classmethod
5840 def validate_description(cls, v: Optional[str]) -> Optional[str]:
5841 """Validate team description for XSS.
5843 Args:
5844 v: Team description to validate
5846 Returns:
5847 Optional[str]: Validated description or None
5849 Raises:
5850 ValueError: If description contains dangerous patterns
5851 """
5852 if v is not None:
5853 v = v.strip()
5854 if v:
5855 SecurityValidator.validate_no_xss(v, "Team description")
5856 if re.search(SecurityValidator.DANGEROUS_JS_PATTERN, v, re.IGNORECASE):
5857 raise ValueError("Team description contains script patterns that may cause security issues")
5858 return v if v else None
5860 @field_validator("slug")
5861 @classmethod
5862 def validate_slug(cls, v: Optional[str]) -> Optional[str]:
5863 """Validate team slug.
5865 Args:
5866 v: Team slug to validate
5868 Returns:
5869 Optional[str]: Validated and formatted slug or None
5871 Raises:
5872 ValueError: If slug format is invalid
5873 """
5874 if v is None:
5875 return v
5876 v = v.strip().lower()
5877 # Uses precompiled regex for slug validation
5878 if not _SLUG_RE.match(v):
5879 raise ValueError("Slug must contain only lowercase letters, numbers, and hyphens")
5880 if v.startswith("-") or v.endswith("-"):
5881 raise ValueError("Slug cannot start or end with hyphens")
5882 return v
5885class TeamUpdateRequest(BaseModel):
5886 """Schema for updating a team.
5888 Attributes:
5889 name: Team display name
5890 description: Team description
5891 visibility: Team visibility level
5892 max_members: Maximum number of members allowed
5894 Examples:
5895 >>> request = TeamUpdateRequest(
5896 ... name="Updated Engineering Team",
5897 ... description="Updated description"
5898 ... )
5899 >>> request.name
5900 'Updated Engineering Team'
5901 """
5903 name: Optional[str] = Field(None, min_length=1, max_length=255, description="Team display name")
5904 description: Optional[str] = Field(None, max_length=1000, description="Team description")
5905 visibility: Optional[Literal["private", "public"]] = Field(None, description="Team visibility level")
5906 max_members: Optional[int] = Field(default=None, description="Maximum number of team members")
5908 @field_validator("name")
5909 @classmethod
5910 def validate_name(cls, v: Optional[str]) -> Optional[str]:
5911 """Validate team name.
5913 Args:
5914 v: Team name to validate
5916 Returns:
5917 Optional[str]: Validated and stripped team name or None
5919 Raises:
5920 ValueError: If team name is empty or contains invalid characters
5921 """
5922 if v is not None:
5923 if not v.strip():
5924 raise ValueError("Team name cannot be empty")
5925 v = v.strip()
5926 # Strict validation: only alphanumeric, underscore, period, dash, and spaces
5927 if not re.match(settings.validation_name_pattern, v):
5928 raise ValueError("Team name can only contain letters, numbers, spaces, underscores, periods, and dashes")
5929 SecurityValidator.validate_no_xss(v, "Team name")
5930 if re.search(SecurityValidator.DANGEROUS_JS_PATTERN, v, re.IGNORECASE):
5931 raise ValueError("Team name contains script patterns that may cause security issues")
5932 return v
5933 return v
5935 @field_validator("description")
5936 @classmethod
5937 def validate_description(cls, v: Optional[str]) -> Optional[str]:
5938 """Validate team description for XSS.
5940 Args:
5941 v: Team description to validate
5943 Returns:
5944 Optional[str]: Validated description or None
5946 Raises:
5947 ValueError: If description contains dangerous patterns
5948 """
5949 if v is not None:
5950 v = v.strip()
5951 if v:
5952 SecurityValidator.validate_no_xss(v, "Team description")
5953 if re.search(SecurityValidator.DANGEROUS_JS_PATTERN, v, re.IGNORECASE):
5954 raise ValueError("Team description contains script patterns that may cause security issues")
5955 return v if v else None
5958class TeamResponse(BaseModel):
5959 """Schema for team response data.
5961 Attributes:
5962 id: Team UUID
5963 name: Team display name
5964 slug: URL-friendly team identifier
5965 description: Team description
5966 created_by: Email of team creator
5967 is_personal: Whether this is a personal team
5968 visibility: Team visibility level
5969 max_members: Maximum number of members allowed
5970 member_count: Current number of team members
5971 created_at: Team creation timestamp
5972 updated_at: Last update timestamp
5973 is_active: Whether the team is active
5975 Examples:
5976 >>> team = TeamResponse(
5977 ... id="team-123",
5978 ... name="Engineering Team",
5979 ... slug="engineering-team",
5980 ... created_by="admin@example.com",
5981 ... is_personal=False,
5982 ... visibility="private",
5983 ... member_count=5,
5984 ... created_at=datetime.now(timezone.utc),
5985 ... updated_at=datetime.now(timezone.utc),
5986 ... is_active=True
5987 ... )
5988 >>> team.name
5989 'Engineering Team'
5990 """
5992 id: str = Field(..., description="Team UUID")
5993 name: str = Field(..., description="Team display name")
5994 slug: str = Field(..., description="URL-friendly team identifier")
5995 description: Optional[str] = Field(None, description="Team description")
5996 created_by: str = Field(..., description="Email of team creator")
5997 is_personal: bool = Field(..., description="Whether this is a personal team")
5998 visibility: Optional[str] = Field(..., description="Team visibility level")
5999 max_members: Optional[int] = Field(None, description="Maximum number of members allowed")
6000 member_count: int = Field(..., description="Current number of team members")
6001 created_at: datetime = Field(..., description="Team creation timestamp")
6002 updated_at: datetime = Field(..., description="Last update timestamp")
6003 is_active: bool = Field(..., description="Whether the team is active")
6006class TeamMemberResponse(BaseModel):
6007 """Schema for team member response data.
6009 Attributes:
6010 id: Member UUID
6011 team_id: Team UUID
6012 user_email: Member email address
6013 role: Member role in the team
6014 joined_at: When the member joined
6015 invited_by: Email of user who invited this member
6016 is_active: Whether the membership is active
6018 Examples:
6019 >>> member = TeamMemberResponse(
6020 ... id="member-123",
6021 ... team_id="team-123",
6022 ... user_email="user@example.com",
6023 ... role="member",
6024 ... joined_at=datetime.now(timezone.utc),
6025 ... is_active=True
6026 ... )
6027 >>> member.role
6028 'member'
6029 """
6031 model_config = ConfigDict(from_attributes=True)
6033 id: str = Field(..., description="Member UUID")
6034 team_id: str = Field(..., description="Team UUID")
6035 user_email: str = Field(..., description="Member email address")
6036 role: str = Field(..., description="Member role in the team")
6037 joined_at: datetime = Field(..., description="When the member joined")
6038 invited_by: Optional[str] = Field(None, description="Email of user who invited this member")
6039 is_active: bool = Field(..., description="Whether the membership is active")
6042class PaginatedTeamMembersResponse(BaseModel):
6043 """Schema for paginated team member list response.
6045 Attributes:
6046 members: List of team members
6047 next_cursor: Optional cursor for next page of results
6049 Examples:
6050 >>> member1 = TeamMemberResponse(
6051 ... id="member-1",
6052 ... team_id="team-123",
6053 ... user_email="user1@example.com",
6054 ... role="member",
6055 ... joined_at=datetime.now(timezone.utc),
6056 ... is_active=True
6057 ... )
6058 >>> member2 = TeamMemberResponse(
6059 ... id="member-2",
6060 ... team_id="team-123",
6061 ... user_email="user2@example.com",
6062 ... role="member",
6063 ... joined_at=datetime.now(timezone.utc),
6064 ... is_active=True
6065 ... )
6066 >>> response = PaginatedTeamMembersResponse(
6067 ... members=[member1, member2],
6068 ... nextCursor="cursor-token-123"
6069 ... )
6070 >>> len(response.members)
6071 2
6072 """
6074 members: List[TeamMemberResponse] = Field(..., description="List of team members")
6075 next_cursor: Optional[str] = Field(None, alias="nextCursor", description="Cursor for next page of results")
6078class TeamInviteRequest(BaseModel):
6079 """Schema for inviting users to a team.
6081 Attributes:
6082 email: Email address of user to invite
6083 role: Role to assign to the user
6085 Examples:
6086 >>> invite = TeamInviteRequest(
6087 ... email="newuser@example.com",
6088 ... role="member"
6089 ... )
6090 >>> invite.email
6091 'newuser@example.com'
6092 """
6094 email: EmailStr = Field(..., description="Email address of user to invite")
6095 role: Literal["owner", "member"] = Field("member", description="Role to assign to the user")
6098class TeamInvitationResponse(BaseModel):
6099 """Schema for team invitation response data.
6101 Attributes:
6102 id: Invitation UUID
6103 team_id: Team UUID
6104 team_name: Team display name
6105 email: Email address of invited user
6106 role: Role the user will have when they accept
6107 invited_by: Email of user who sent the invitation
6108 invited_at: When the invitation was sent
6109 expires_at: When the invitation expires
6110 token: Invitation token
6111 is_active: Whether the invitation is active
6112 is_expired: Whether the invitation has expired
6114 Examples:
6115 >>> invitation = TeamInvitationResponse(
6116 ... id="invite-123",
6117 ... team_id="team-123",
6118 ... team_name="Engineering Team",
6119 ... email="newuser@example.com",
6120 ... role="member",
6121 ... invited_by="admin@example.com",
6122 ... invited_at=datetime.now(timezone.utc),
6123 ... expires_at=datetime.now(timezone.utc),
6124 ... token="invitation-token",
6125 ... is_active=True,
6126 ... is_expired=False
6127 ... )
6128 >>> invitation.role
6129 'member'
6130 """
6132 id: str = Field(..., description="Invitation UUID")
6133 team_id: str = Field(..., description="Team UUID")
6134 team_name: str = Field(..., description="Team display name")
6135 email: str = Field(..., description="Email address of invited user")
6136 role: str = Field(..., description="Role the user will have when they accept")
6137 invited_by: str = Field(..., description="Email of user who sent the invitation")
6138 invited_at: datetime = Field(..., description="When the invitation was sent")
6139 expires_at: datetime = Field(..., description="When the invitation expires")
6140 token: str = Field(..., description="Invitation token")
6141 is_active: bool = Field(..., description="Whether the invitation is active")
6142 is_expired: bool = Field(..., description="Whether the invitation has expired")
6145class TeamMemberAddRequest(BaseModel):
6146 """Schema for adding a team member.
6148 Attributes:
6149 email: Email address of user to be added to the team
6150 role: New role for the team member
6151 """
6153 email: EmailStr = Field(..., description="Email address of user to be added to the team")
6154 role: Literal["owner", "member"] = Field(..., description="New role for the team member")
6157class TeamMemberUpdateRequest(BaseModel):
6158 """Schema for updating a team member's role.
6160 Attributes:
6161 role: New role for the team member
6163 Examples:
6164 >>> update = TeamMemberUpdateRequest(role="member")
6165 >>> update.role
6166 'member'
6167 """
6169 role: Literal["owner", "member"] = Field(..., description="New role for the team member")
6172class TeamListResponse(BaseModel):
6173 """Schema for team list response.
6175 Attributes:
6176 teams: List of teams
6177 total: Total number of teams
6179 Examples:
6180 >>> response = TeamListResponse(teams=[], total=0)
6181 >>> response.total
6182 0
6183 """
6185 teams: List[TeamResponse] = Field(..., description="List of teams")
6186 total: int = Field(..., description="Total number of teams")
6189class TeamDiscoveryResponse(BaseModel):
6190 """Schema for public team discovery response.
6192 Provides limited metadata about public teams for discovery purposes.
6194 Attributes:
6195 id: Team ID
6196 name: Team name
6197 description: Team description
6198 member_count: Number of members
6199 created_at: Team creation timestamp
6200 is_joinable: Whether the current user can join this team
6201 """
6203 id: str = Field(..., description="Team ID")
6204 name: str = Field(..., description="Team name")
6205 description: Optional[str] = Field(None, description="Team description")
6206 member_count: int = Field(..., description="Number of team members")
6207 created_at: datetime = Field(..., description="Team creation timestamp")
6208 is_joinable: bool = Field(..., description="Whether the current user can join this team")
6211class TeamJoinRequest(BaseModel):
6212 """Schema for requesting to join a public team.
6214 Attributes:
6215 message: Optional message to team owners
6216 """
6218 message: Optional[str] = Field(None, description="Optional message to team owners", max_length=500)
6221class TeamJoinRequestResponse(BaseModel):
6222 """Schema for team join request response.
6224 Attributes:
6225 id: Join request ID
6226 team_id: Target team ID
6227 team_name: Target team name
6228 user_email: Requesting user email
6229 message: Request message
6230 status: Request status (pending, approved, rejected)
6231 requested_at: Request timestamp
6232 expires_at: Request expiration timestamp
6233 """
6235 id: str = Field(..., description="Join request ID")
6236 team_id: str = Field(..., description="Target team ID")
6237 team_name: str = Field(..., description="Target team name")
6238 user_email: str = Field(..., description="Requesting user email")
6239 message: Optional[str] = Field(None, description="Request message")
6240 status: str = Field(..., description="Request status")
6241 requested_at: datetime = Field(..., description="Request timestamp")
6242 expires_at: datetime = Field(..., description="Request expiration")
6245# API Token Management Schemas
6248class TokenScopeRequest(BaseModel):
6249 """Schema for token scoping configuration.
6251 Attributes:
6252 server_id: Optional server ID limitation
6253 permissions: List of permission scopes
6254 ip_restrictions: List of IP address/CIDR restrictions
6255 time_restrictions: Time-based access limitations
6256 usage_limits: Rate limiting and quota settings
6258 Examples:
6259 >>> scope = TokenScopeRequest(
6260 ... server_id="server-123",
6261 ... permissions=["tools.read", "resources.read"],
6262 ... ip_restrictions=["192.168.1.0/24"]
6263 ... )
6264 >>> scope.server_id
6265 'server-123'
6266 """
6268 server_id: Optional[str] = Field(None, description="Limit token to specific server")
6269 permissions: List[str] = Field(default_factory=list, description="Permission scopes")
6270 ip_restrictions: List[str] = Field(default_factory=list, description="IP address restrictions")
6271 time_restrictions: Dict[str, Any] = Field(default_factory=dict, description="Time-based restrictions")
6272 usage_limits: Dict[str, Any] = Field(default_factory=dict, description="Usage limits and quotas")
6274 @field_validator("ip_restrictions")
6275 @classmethod
6276 def validate_ip_restrictions(cls, v: List[str]) -> List[str]:
6277 """Validate IP addresses and CIDR notation.
6279 Args:
6280 v: List of IP address or CIDR strings to validate.
6282 Returns:
6283 List of validated IP/CIDR strings with whitespace stripped.
6285 Raises:
6286 ValueError: If any IP address or CIDR notation is invalid.
6288 Examples:
6289 >>> TokenScopeRequest.validate_ip_restrictions(["192.168.1.0/24"])
6290 ['192.168.1.0/24']
6291 >>> TokenScopeRequest.validate_ip_restrictions(["10.0.0.1"])
6292 ['10.0.0.1']
6293 """
6294 # Standard
6295 import ipaddress # pylint: disable=import-outside-toplevel
6297 if not v:
6298 return v
6300 validated = []
6301 for ip_str in v:
6302 ip_str = ip_str.strip()
6303 if not ip_str:
6304 continue
6305 try:
6306 # Try parsing as network (CIDR notation)
6307 if "/" in ip_str:
6308 ipaddress.ip_network(ip_str, strict=False)
6309 else:
6310 # Try parsing as single IP address
6311 ipaddress.ip_address(ip_str)
6312 validated.append(ip_str)
6313 except ValueError as e:
6314 raise ValueError(f"Invalid IP address or CIDR notation '{ip_str}': {e}") from e
6315 return validated
6317 @field_validator("permissions")
6318 @classmethod
6319 def validate_permissions(cls, v: List[str]) -> List[str]:
6320 """Validate permission scope format.
6322 Permissions must be in format 'resource.action' or wildcard '*'.
6324 Args:
6325 v: List of permission strings to validate.
6327 Returns:
6328 List of validated permission strings with whitespace stripped.
6330 Raises:
6331 ValueError: If any permission does not match 'resource.action' format or '*'.
6333 Examples:
6334 >>> TokenScopeRequest.validate_permissions(["tools.read", "resources.write"])
6335 ['tools.read', 'resources.write']
6336 >>> TokenScopeRequest.validate_permissions(["*"])
6337 ['*']
6338 """
6339 if not v:
6340 return v
6342 # Permission pattern: resource.action (alphanumeric with underscores)
6343 permission_pattern = re.compile(r"^[a-zA-Z][a-zA-Z0-9_]*\.[a-zA-Z][a-zA-Z0-9_]*$")
6345 validated = []
6346 for perm in v:
6347 perm = perm.strip()
6348 if not perm:
6349 continue
6350 # Allow wildcard
6351 if perm == "*":
6352 validated.append(perm)
6353 continue
6354 if not permission_pattern.match(perm):
6355 raise ValueError(f"Invalid permission format '{perm}'. Use 'resource.action' format (e.g., 'tools.read') or '*' for full access")
6356 validated.append(perm)
6357 return validated
6360class TokenCreateRequest(BaseModel):
6361 """Schema for creating a new API token.
6363 Attributes:
6364 name: Human-readable token name
6365 description: Optional token description
6366 expires_in_days: Optional expiry in days
6367 scope: Optional token scoping configuration
6368 tags: Optional organizational tags
6369 is_active: Token active status (defaults to True)
6371 Examples:
6372 >>> request = TokenCreateRequest(
6373 ... name="Production Access",
6374 ... description="Read-only production access",
6375 ... expires_in_days=30,
6376 ... tags=["production", "readonly"]
6377 ... )
6378 >>> request.name
6379 'Production Access'
6380 """
6382 name: str = Field(..., description="Human-readable token name", min_length=1, max_length=255)
6383 description: Optional[str] = Field(None, description="Token description", max_length=1000)
6384 expires_in_days: Optional[int] = Field(default=None, ge=1, description="Expiry in days (must be >= 1 if specified)")
6385 scope: Optional[TokenScopeRequest] = Field(None, description="Token scoping configuration")
6386 tags: List[str] = Field(default_factory=list, description="Organizational tags")
6387 team_id: Optional[str] = Field(None, description="Team ID for team-scoped tokens")
6388 is_active: bool = Field(default=True, description="Token active status")
6391class TokenUpdateRequest(BaseModel):
6392 """Schema for updating an existing API token.
6394 Attributes:
6395 name: New token name
6396 description: New token description
6397 scope: New token scoping configuration
6398 tags: New organizational tags
6399 is_active: New token active status
6401 Examples:
6402 >>> request = TokenUpdateRequest(
6403 ... name="Updated Token Name",
6404 ... description="Updated description"
6405 ... )
6406 >>> request.name
6407 'Updated Token Name'
6408 """
6410 name: Optional[str] = Field(None, description="New token name", min_length=1, max_length=255)
6411 description: Optional[str] = Field(None, description="New token description", max_length=1000)
6412 scope: Optional[TokenScopeRequest] = Field(None, description="New token scoping configuration")
6413 tags: Optional[List[str]] = Field(None, description="New organizational tags")
6414 is_active: Optional[bool] = Field(None, description="New token active status")
6417class TokenResponse(BaseModel):
6418 """Schema for API token response.
6420 Attributes:
6421 id: Token ID
6422 name: Token name
6423 description: Token description
6424 server_id: Server scope limitation
6425 resource_scopes: Permission scopes
6426 ip_restrictions: IP restrictions
6427 time_restrictions: Time-based restrictions
6428 usage_limits: Usage limits
6429 created_at: Creation timestamp
6430 expires_at: Expiry timestamp
6431 last_used: Last usage timestamp
6432 is_active: Active status
6433 tags: Organizational tags
6435 Examples:
6436 >>> from datetime import datetime
6437 >>> token = TokenResponse(
6438 ... id="token-123",
6439 ... name="Test Token",
6440 ... description="Test description",
6441 ... user_email="test@example.com",
6442 ... server_id=None,
6443 ... resource_scopes=["tools.read"],
6444 ... ip_restrictions=[],
6445 ... time_restrictions={},
6446 ... usage_limits={},
6447 ... created_at=datetime.now(),
6448 ... expires_at=None,
6449 ... last_used=None,
6450 ... is_active=True,
6451 ... tags=[]
6452 ... )
6453 >>> token.name
6454 'Test Token'
6455 """
6457 model_config = ConfigDict(from_attributes=True)
6459 id: str = Field(..., description="Token ID")
6460 name: str = Field(..., description="Token name")
6461 description: Optional[str] = Field(None, description="Token description")
6462 user_email: str = Field(..., description="Token creator's email")
6463 team_id: Optional[str] = Field(None, description="Team ID for team-scoped tokens")
6464 server_id: Optional[str] = Field(None, description="Server scope limitation")
6465 resource_scopes: List[str] = Field(..., description="Permission scopes")
6466 ip_restrictions: List[str] = Field(..., description="IP restrictions")
6467 time_restrictions: Dict[str, Any] = Field(..., description="Time-based restrictions")
6468 usage_limits: Dict[str, Any] = Field(..., description="Usage limits")
6469 created_at: datetime = Field(..., description="Creation timestamp")
6470 expires_at: Optional[datetime] = Field(None, description="Expiry timestamp")
6471 last_used: Optional[datetime] = Field(None, description="Last usage timestamp")
6472 is_active: bool = Field(..., description="Active status")
6473 is_revoked: bool = Field(False, description="Whether token is revoked")
6474 revoked_at: Optional[datetime] = Field(None, description="Revocation timestamp")
6475 revoked_by: Optional[str] = Field(None, description="Email of user who revoked token")
6476 revocation_reason: Optional[str] = Field(None, description="Reason for revocation")
6477 tags: List[str] = Field(..., description="Organizational tags")
6480class TokenCreateResponse(BaseModel):
6481 """Schema for token creation response.
6483 Attributes:
6484 token: Token information
6485 access_token: The actual token string (only returned on creation)
6487 Examples:
6488 >>> from datetime import datetime
6489 >>> token_info = TokenResponse(
6490 ... id="token-123", name="Test Token", description=None,
6491 ... user_email="test@example.com", server_id=None, resource_scopes=[], ip_restrictions=[],
6492 ... time_restrictions={}, usage_limits={}, created_at=datetime.now(),
6493 ... expires_at=None, last_used=None, is_active=True, tags=[]
6494 ... )
6495 >>> response = TokenCreateResponse(
6496 ... token=token_info,
6497 ... access_token="abc123xyz"
6498 ... )
6499 >>> response.access_token
6500 'abc123xyz'
6501 """
6503 token: TokenResponse = Field(..., description="Token information")
6504 access_token: str = Field(..., description="The actual token string")
6507class TokenListResponse(BaseModel):
6508 """Schema for token list response.
6510 Attributes:
6511 tokens: List of tokens
6512 total: Total number of tokens
6513 limit: Request limit
6514 offset: Request offset
6516 Examples:
6517 >>> response = TokenListResponse(
6518 ... tokens=[],
6519 ... total=0,
6520 ... limit=10,
6521 ... offset=0
6522 ... )
6523 >>> response.total
6524 0
6525 """
6527 tokens: List[TokenResponse] = Field(..., description="List of tokens")
6528 total: int = Field(..., description="Total number of tokens")
6529 limit: int = Field(..., description="Request limit")
6530 offset: int = Field(..., description="Request offset")
6533class TokenRevokeRequest(BaseModel):
6534 """Schema for token revocation.
6536 Attributes:
6537 reason: Optional reason for revocation
6539 Examples:
6540 >>> request = TokenRevokeRequest(reason="Security incident")
6541 >>> request.reason
6542 'Security incident'
6543 """
6545 reason: Optional[str] = Field(None, description="Reason for revocation", max_length=255)
6548class TokenUsageStatsResponse(BaseModel):
6549 """Schema for token usage statistics.
6551 Attributes:
6552 period_days: Number of days analyzed
6553 total_requests: Total number of requests
6554 successful_requests: Number of successful requests
6555 blocked_requests: Number of blocked requests
6556 success_rate: Success rate percentage
6557 average_response_time_ms: Average response time
6558 top_endpoints: Most accessed endpoints
6560 Examples:
6561 >>> stats = TokenUsageStatsResponse(
6562 ... period_days=30,
6563 ... total_requests=100,
6564 ... successful_requests=95,
6565 ... blocked_requests=5,
6566 ... success_rate=0.95,
6567 ... average_response_time_ms=150.5,
6568 ... top_endpoints=[("/tools", 50), ("/resources", 30)]
6569 ... )
6570 >>> stats.success_rate
6571 0.95
6572 """
6574 period_days: int = Field(..., description="Number of days analyzed")
6575 total_requests: int = Field(..., description="Total number of requests")
6576 successful_requests: int = Field(..., description="Number of successful requests")
6577 blocked_requests: int = Field(..., description="Number of blocked requests")
6578 success_rate: float = Field(..., description="Success rate (0-1)")
6579 average_response_time_ms: float = Field(..., description="Average response time in milliseconds")
6580 top_endpoints: List[tuple[str, int]] = Field(..., description="Most accessed endpoints with counts")
6583# ===== RBAC Schemas =====
6586class RoleCreateRequest(BaseModel):
6587 """Schema for creating a new role.
6589 Attributes:
6590 name: Unique role name
6591 description: Role description
6592 scope: Role scope (global, team, personal)
6593 permissions: List of permission strings
6594 inherits_from: Optional parent role ID
6595 is_system_role: Whether this is a system role
6597 Examples:
6598 >>> request = RoleCreateRequest(
6599 ... name="team_admin",
6600 ... description="Team administrator with member management",
6601 ... scope="team",
6602 ... permissions=["teams.manage_members", "resources.create"]
6603 ... )
6604 >>> request.name
6605 'team_admin'
6606 """
6608 name: str = Field(..., description="Unique role name", max_length=255)
6609 description: Optional[str] = Field(None, description="Role description")
6610 scope: str = Field(..., description="Role scope", pattern="^(global|team|personal)$")
6611 permissions: List[str] = Field(..., description="List of permission strings")
6612 inherits_from: Optional[str] = Field(None, description="Parent role ID for inheritance")
6613 is_system_role: Optional[bool] = Field(False, description="Whether this is a system role")
6616class RoleUpdateRequest(BaseModel):
6617 """Schema for updating an existing role.
6619 Attributes:
6620 name: Optional new name
6621 description: Optional new description
6622 permissions: Optional new permissions list
6623 inherits_from: Optional new parent role
6624 is_active: Optional active status
6626 Examples:
6627 >>> request = RoleUpdateRequest(
6628 ... description="Updated role description",
6629 ... permissions=["new.permission"]
6630 ... )
6631 >>> request.description
6632 'Updated role description'
6633 """
6635 name: Optional[str] = Field(None, description="Role name", max_length=255)
6636 description: Optional[str] = Field(None, description="Role description")
6637 permissions: Optional[List[str]] = Field(None, description="List of permission strings")
6638 inherits_from: Optional[str] = Field(None, description="Parent role ID for inheritance")
6639 is_active: Optional[bool] = Field(None, description="Whether role is active")
6642class RoleResponse(BaseModel):
6643 """Schema for role response.
6645 Attributes:
6646 id: Role identifier
6647 name: Role name
6648 description: Role description
6649 scope: Role scope
6650 permissions: List of permissions
6651 effective_permissions: All permissions including inherited
6652 inherits_from: Parent role ID
6653 created_by: Creator email
6654 is_system_role: Whether system role
6655 is_active: Whether role is active
6656 created_at: Creation timestamp
6657 updated_at: Update timestamp
6659 Examples:
6660 >>> role = RoleResponse(
6661 ... id="role-123",
6662 ... name="admin",
6663 ... scope="global",
6664 ... permissions=["*"],
6665 ... effective_permissions=["*"],
6666 ... created_by="admin@example.com",
6667 ... is_system_role=True,
6668 ... is_active=True,
6669 ... created_at=datetime.now(),
6670 ... updated_at=datetime.now()
6671 ... )
6672 >>> role.name
6673 'admin'
6674 """
6676 model_config = ConfigDict(from_attributes=True)
6678 id: str = Field(..., description="Role identifier")
6679 name: str = Field(..., description="Role name")
6680 description: Optional[str] = Field(None, description="Role description")
6681 scope: str = Field(..., description="Role scope")
6682 permissions: List[str] = Field(..., description="Direct permissions")
6683 effective_permissions: Optional[List[str]] = Field(None, description="All permissions including inherited")
6684 inherits_from: Optional[str] = Field(None, description="Parent role ID")
6685 created_by: str = Field(..., description="Creator email")
6686 is_system_role: bool = Field(..., description="Whether system role")
6687 is_active: bool = Field(..., description="Whether role is active")
6688 created_at: datetime = Field(..., description="Creation timestamp")
6689 updated_at: datetime = Field(..., description="Update timestamp")
6692class UserRoleAssignRequest(BaseModel):
6693 """Schema for assigning a role to a user.
6695 Attributes:
6696 role_id: Role to assign
6697 scope: Assignment scope
6698 scope_id: Team ID if team-scoped
6699 expires_at: Optional expiration timestamp
6701 Examples:
6702 >>> request = UserRoleAssignRequest(
6703 ... role_id="role-123",
6704 ... scope="team",
6705 ... scope_id="team-456"
6706 ... )
6707 >>> request.scope
6708 'team'
6709 """
6711 role_id: str = Field(..., description="Role ID to assign")
6712 scope: str = Field(..., description="Assignment scope", pattern="^(global|team|personal)$")
6713 scope_id: Optional[str] = Field(None, description="Team ID if team-scoped")
6714 expires_at: Optional[datetime] = Field(None, description="Optional expiration timestamp")
6717class UserRoleResponse(BaseModel):
6718 """Schema for user role assignment response.
6720 Attributes:
6721 id: Assignment identifier
6722 user_email: User email
6723 role_id: Role identifier
6724 role_name: Role name for convenience
6725 scope: Assignment scope
6726 scope_id: Team ID if applicable
6727 granted_by: Who granted the role
6728 granted_at: When role was granted
6729 expires_at: Optional expiration
6730 is_active: Whether assignment is active
6732 Examples:
6733 >>> user_role = UserRoleResponse(
6734 ... id="assignment-123",
6735 ... user_email="user@example.com",
6736 ... role_id="role-456",
6737 ... role_name="team_admin",
6738 ... scope="team",
6739 ... scope_id="team-789",
6740 ... granted_by="admin@example.com",
6741 ... granted_at=datetime.now(),
6742 ... is_active=True
6743 ... )
6744 >>> user_role.scope
6745 'team'
6746 """
6748 model_config = ConfigDict(from_attributes=True)
6750 id: str = Field(..., description="Assignment identifier")
6751 user_email: str = Field(..., description="User email")
6752 role_id: str = Field(..., description="Role identifier")
6753 role_name: Optional[str] = Field(None, description="Role name for convenience")
6754 scope: str = Field(..., description="Assignment scope")
6755 scope_id: Optional[str] = Field(None, description="Team ID if applicable")
6756 granted_by: str = Field(..., description="Who granted the role")
6757 granted_at: datetime = Field(..., description="When role was granted")
6758 expires_at: Optional[datetime] = Field(None, description="Optional expiration")
6759 is_active: bool = Field(..., description="Whether assignment is active")
6760 grant_source: Optional[str] = Field(None, description="Origin of the grant (e.g., 'sso', 'manual', 'bootstrap', 'auto')")
6763class PermissionCheckRequest(BaseModel):
6764 """Schema for permission check request.
6766 Attributes:
6767 user_email: User to check
6768 permission: Permission to verify
6769 resource_type: Optional resource type
6770 resource_id: Optional resource ID
6771 team_id: Optional team context
6773 Examples:
6774 >>> request = PermissionCheckRequest(
6775 ... user_email="user@example.com",
6776 ... permission="tools.create",
6777 ... resource_type="tools"
6778 ... )
6779 >>> request.permission
6780 'tools.create'
6781 """
6783 user_email: str = Field(..., description="User email to check")
6784 permission: str = Field(..., description="Permission to verify")
6785 resource_type: Optional[str] = Field(None, description="Resource type")
6786 resource_id: Optional[str] = Field(None, description="Resource ID")
6787 team_id: Optional[str] = Field(None, description="Team context")
6790class PermissionCheckResponse(BaseModel):
6791 """Schema for permission check response.
6793 Attributes:
6794 user_email: User checked
6795 permission: Permission checked
6796 granted: Whether permission was granted
6797 checked_at: When check was performed
6798 checked_by: Who performed the check
6800 Examples:
6801 >>> response = PermissionCheckResponse(
6802 ... user_email="user@example.com",
6803 ... permission="tools.create",
6804 ... granted=True,
6805 ... checked_at=datetime.now(),
6806 ... checked_by="admin@example.com"
6807 ... )
6808 >>> response.granted
6809 True
6810 """
6812 user_email: str = Field(..., description="User email checked")
6813 permission: str = Field(..., description="Permission checked")
6814 granted: bool = Field(..., description="Whether permission was granted")
6815 checked_at: datetime = Field(..., description="When check was performed")
6816 checked_by: str = Field(..., description="Who performed the check")
6819class PermissionListResponse(BaseModel):
6820 """Schema for available permissions list.
6822 Attributes:
6823 all_permissions: List of all available permissions
6824 permissions_by_resource: Permissions grouped by resource type
6825 total_count: Total number of permissions
6827 Examples:
6828 >>> response = PermissionListResponse(
6829 ... all_permissions=["users.create", "tools.read"],
6830 ... permissions_by_resource={"users": ["users.create"], "tools": ["tools.read"]},
6831 ... total_count=2
6832 ... )
6833 >>> response.total_count
6834 2
6835 """
6837 all_permissions: List[str] = Field(..., description="All available permissions")
6838 permissions_by_resource: Dict[str, List[str]] = Field(..., description="Permissions by resource type")
6839 total_count: int = Field(..., description="Total number of permissions")
6842# ==============================================================================
6843# SSO Authentication Schemas
6844# ==============================================================================
6847class SSOProviderResponse(BaseModelWithConfigDict):
6848 """Response schema for SSO provider information.
6850 Attributes:
6851 id: Provider identifier (e.g., 'github', 'google')
6852 name: Provider name
6853 display_name: Human-readable display name
6854 provider_type: Type of provider ('oauth2', 'oidc')
6855 is_enabled: Whether provider is currently enabled
6856 authorization_url: OAuth authorization URL (optional)
6858 Examples:
6859 >>> provider = SSOProviderResponse(
6860 ... id="github",
6861 ... name="github",
6862 ... display_name="GitHub",
6863 ... provider_type="oauth2",
6864 ... is_enabled=True
6865 ... )
6866 >>> provider.id
6867 'github'
6868 """
6870 id: str = Field(..., description="Provider identifier")
6871 name: str = Field(..., description="Provider name")
6872 display_name: str = Field(..., description="Human-readable display name")
6873 provider_type: Optional[str] = Field(None, description="Provider type (oauth2, oidc)")
6874 is_enabled: Optional[bool] = Field(None, description="Whether provider is enabled")
6875 authorization_url: Optional[str] = Field(None, description="OAuth authorization URL")
6876 jwks_uri: Optional[str] = Field(None, description="OIDC JWKS endpoint for token signature verification")
6879class SSOLoginResponse(BaseModelWithConfigDict):
6880 """Response schema for SSO login initiation.
6882 Attributes:
6883 authorization_url: URL to redirect user for authentication
6884 state: CSRF state parameter for validation
6886 Examples:
6887 >>> login = SSOLoginResponse(
6888 ... authorization_url="https://github.com/login/oauth/authorize?...",
6889 ... state="csrf-token-123"
6890 ... )
6891 >>> "github.com" in login.authorization_url
6892 True
6893 """
6895 authorization_url: str = Field(..., description="OAuth authorization URL")
6896 state: str = Field(..., description="CSRF state parameter")
6899class SSOCallbackResponse(BaseModelWithConfigDict):
6900 """Response schema for SSO authentication callback.
6902 Attributes:
6903 access_token: JWT access token for authenticated user
6904 token_type: Token type (always 'bearer')
6905 expires_in: Token expiration time in seconds
6906 user: User information from SSO provider
6908 Examples:
6909 >>> callback = SSOCallbackResponse(
6910 ... access_token="jwt.token.here",
6911 ... token_type="bearer",
6912 ... expires_in=3600,
6913 ... user={"email": "user@example.com", "full_name": "User"}
6914 ... )
6915 >>> callback.token_type
6916 'bearer'
6917 """
6919 access_token: str = Field(..., description="JWT access token")
6920 token_type: str = Field(default="bearer", description="Token type")
6921 expires_in: int = Field(..., description="Token expiration in seconds")
6922 user: Dict[str, Any] = Field(..., description="User information")
6925# gRPC Service schemas
6928class GrpcServiceCreate(BaseModel):
6929 """Schema for creating a new gRPC service."""
6931 name: str = Field(..., min_length=1, max_length=255, description="Unique name for the gRPC service")
6932 target: str = Field(..., description="gRPC server target address (host:port)")
6933 description: Optional[str] = Field(None, description="Description of the gRPC service")
6934 reflection_enabled: bool = Field(default=True, description="Enable gRPC server reflection")
6935 tls_enabled: bool = Field(default=False, description="Enable TLS for gRPC connection")
6936 tls_cert_path: Optional[str] = Field(None, description="Path to TLS certificate file")
6937 tls_key_path: Optional[str] = Field(None, description="Path to TLS key file")
6938 grpc_metadata: Dict[str, str] = Field(default_factory=dict, description="gRPC metadata headers")
6939 tags: List[str] = Field(default_factory=list, description="Tags for categorization")
6941 # Team scoping fields
6942 team_id: Optional[str] = Field(None, description="ID of the team that owns this resource")
6943 owner_email: Optional[str] = Field(None, description="Email of the user who owns this resource")
6944 visibility: str = Field(default="public", description="Visibility level: private, team, or public")
6946 @field_validator("name")
6947 @classmethod
6948 def validate_name(cls, v: str) -> str:
6949 """Validate service name.
6951 Args:
6952 v: Service name to validate
6954 Returns:
6955 Validated service name
6956 """
6957 return SecurityValidator.validate_name(v, "gRPC service name")
6959 @field_validator("target")
6960 @classmethod
6961 def validate_target(cls, v: str) -> str:
6962 """Validate target address format (host:port).
6964 Args:
6965 v: Target address to validate
6967 Returns:
6968 Validated target address
6970 Raises:
6971 ValueError: If target is not in host:port format
6972 """
6973 if not v or ":" not in v:
6974 raise ValueError("Target must be in host:port format")
6975 return v
6977 @field_validator("description")
6978 @classmethod
6979 def validate_description(cls, v: Optional[str]) -> Optional[str]:
6980 """Validate description.
6982 Args:
6983 v: Description to validate
6985 Returns:
6986 Validated and sanitized description
6987 """
6988 if v is None:
6989 return None
6990 if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
6991 truncated = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
6992 logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
6993 return SecurityValidator.sanitize_display_text(truncated, "Description")
6994 return SecurityValidator.sanitize_display_text(v, "Description")
6997class GrpcServiceUpdate(BaseModel):
6998 """Schema for updating an existing gRPC service."""
7000 name: Optional[str] = Field(None, min_length=1, max_length=255, description="Service name")
7001 target: Optional[str] = Field(None, description="gRPC server target address")
7002 description: Optional[str] = Field(None, description="Service description")
7003 reflection_enabled: Optional[bool] = Field(None, description="Enable server reflection")
7004 tls_enabled: Optional[bool] = Field(None, description="Enable TLS")
7005 tls_cert_path: Optional[str] = Field(None, description="TLS certificate path")
7006 tls_key_path: Optional[str] = Field(None, description="TLS key path")
7007 grpc_metadata: Optional[Dict[str, str]] = Field(None, description="gRPC metadata headers")
7008 tags: Optional[List[str]] = Field(None, description="Service tags")
7009 visibility: Optional[str] = Field(None, description="Visibility level")
7011 @field_validator("name")
7012 @classmethod
7013 def validate_name(cls, v: Optional[str]) -> Optional[str]:
7014 """Validate service name.
7016 Args:
7017 v: Service name to validate
7019 Returns:
7020 Validated service name or None
7021 """
7022 if v is None:
7023 return None
7024 return SecurityValidator.validate_name(v, "gRPC service name")
7026 @field_validator("target")
7027 @classmethod
7028 def validate_target(cls, v: Optional[str]) -> Optional[str]:
7029 """Validate target address.
7031 Args:
7032 v: Target address to validate
7034 Returns:
7035 Validated target address or None
7037 Raises:
7038 ValueError: If target is not in host:port format
7039 """
7040 if v is None:
7041 return None
7042 if ":" not in v:
7043 raise ValueError("Target must be in host:port format")
7044 return v
7046 @field_validator("description")
7047 @classmethod
7048 def validate_description(cls, v: Optional[str]) -> Optional[str]:
7049 """Validate description.
7051 Args:
7052 v: Description to validate
7054 Returns:
7055 Validated and sanitized description
7056 """
7057 if v is None:
7058 return None
7059 if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
7060 truncated = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
7061 logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
7062 return SecurityValidator.sanitize_display_text(truncated, "Description")
7063 return SecurityValidator.sanitize_display_text(v, "Description")
7066class GrpcServiceRead(BaseModel):
7067 """Schema for reading gRPC service information."""
7069 model_config = ConfigDict(from_attributes=True)
7071 id: str = Field(..., description="Unique service identifier")
7072 name: str = Field(..., description="Service name")
7073 slug: str = Field(..., description="URL-safe slug")
7074 target: str = Field(..., description="gRPC server target (host:port)")
7075 description: Optional[str] = Field(None, description="Service description")
7077 # Configuration
7078 reflection_enabled: bool = Field(..., description="Reflection enabled")
7079 tls_enabled: bool = Field(..., description="TLS enabled")
7080 tls_cert_path: Optional[str] = Field(None, description="TLS certificate path")
7081 tls_key_path: Optional[str] = Field(None, description="TLS key path")
7082 grpc_metadata: Dict[str, str] = Field(default_factory=dict, description="gRPC metadata")
7084 # Status
7085 enabled: bool = Field(..., description="Service enabled")
7086 reachable: bool = Field(..., description="Service reachable")
7088 # Discovery
7089 service_count: int = Field(default=0, description="Number of gRPC services discovered")
7090 method_count: int = Field(default=0, description="Number of methods discovered")
7091 discovered_services: Dict[str, Any] = Field(default_factory=dict, description="Discovered service descriptors")
7092 last_reflection: Optional[datetime] = Field(None, description="Last reflection timestamp")
7094 # Tags
7095 tags: List[str] = Field(default_factory=list, description="Service tags")
7097 # Timestamps
7098 created_at: datetime = Field(..., description="Creation timestamp")
7099 updated_at: datetime = Field(..., description="Last update timestamp")
7101 # Team scoping
7102 team_id: Optional[str] = Field(None, description="Team ID")
7103 team: Optional[str] = Field(None, description="Name of the team that owns this resource")
7104 owner_email: Optional[str] = Field(None, description="Owner email")
7105 visibility: str = Field(default="public", description="Visibility level")
7108# Plugin-related schemas
7111class PluginSummary(BaseModel):
7112 """Summary information for a plugin in list views."""
7114 name: str = Field(..., description="Unique plugin name")
7115 description: str = Field("", description="Plugin description")
7116 author: str = Field("Unknown", description="Plugin author")
7117 version: str = Field("0.0.0", description="Plugin version")
7118 mode: str = Field(..., description="Plugin mode: enforce, permissive, or disabled")
7119 priority: int = Field(..., description="Plugin execution priority (lower = higher priority)")
7120 hooks: List[str] = Field(default_factory=list, description="Hook points where plugin executes")
7121 tags: List[str] = Field(default_factory=list, description="Plugin tags for categorization")
7122 status: str = Field(..., description="Plugin status: enabled or disabled")
7123 config_summary: Dict[str, Any] = Field(default_factory=dict, description="Summary of plugin configuration")
7126class PluginDetail(PluginSummary):
7127 """Detailed plugin information including full configuration."""
7129 kind: str = Field("", description="Plugin type or class")
7130 namespace: Optional[str] = Field(None, description="Plugin namespace")
7131 conditions: List[Any] = Field(default_factory=list, description="Conditions for plugin execution")
7132 config: Dict[str, Any] = Field(default_factory=dict, description="Full plugin configuration")
7133 manifest: Optional[Dict[str, Any]] = Field(None, description="Plugin manifest information")
7136class PluginListResponse(BaseModel):
7137 """Response for plugin list endpoint."""
7139 plugins: List[PluginSummary] = Field(..., description="List of plugins")
7140 total: int = Field(..., description="Total number of plugins")
7141 enabled_count: int = Field(0, description="Number of enabled plugins")
7142 disabled_count: int = Field(0, description="Number of disabled plugins")
7145class PluginStatsResponse(BaseModel):
7146 """Response for plugin statistics endpoint."""
7148 total_plugins: int = Field(..., description="Total number of plugins")
7149 enabled_plugins: int = Field(..., description="Number of enabled plugins")
7150 disabled_plugins: int = Field(..., description="Number of disabled plugins")
7151 plugins_by_hook: Dict[str, int] = Field(default_factory=dict, description="Plugin count by hook type")
7152 plugins_by_mode: Dict[str, int] = Field(default_factory=dict, description="Plugin count by mode")
7155# MCP Server Catalog Schemas
7158class CatalogServer(BaseModel):
7159 """Schema for a catalog server entry."""
7161 id: str = Field(..., description="Unique identifier for the catalog server")
7162 name: str = Field(..., description="Display name of the server")
7163 category: str = Field(..., description="Server category (e.g., Project Management, Software Development)")
7164 url: str = Field(..., description="Server endpoint URL")
7165 auth_type: str = Field(..., description="Authentication type (e.g., OAuth2.1, API Key, Open)")
7166 provider: str = Field(..., description="Provider/vendor name")
7167 description: str = Field(..., description="Server description")
7168 requires_api_key: bool = Field(default=False, description="Whether API key is required")
7169 secure: bool = Field(default=False, description="Whether additional security is required")
7170 tags: List[str] = Field(default_factory=list, description="Tags for categorization")
7171 transport: Optional[str] = Field(None, description="Transport type: SSE, STREAMABLEHTTP, or WEBSOCKET")
7172 logo_url: Optional[str] = Field(None, description="URL to server logo/icon")
7173 documentation_url: Optional[str] = Field(None, description="URL to server documentation")
7174 is_registered: bool = Field(default=False, description="Whether server is already registered")
7175 is_available: bool = Field(default=True, description="Whether server is currently available")
7176 requires_oauth_config: bool = Field(default=False, description="Whether server is registered but needs OAuth configuration")
7179class CatalogServerRegisterRequest(BaseModel):
7180 """Request to register a catalog server."""
7182 server_id: str = Field(..., description="Catalog server ID to register")
7183 name: Optional[str] = Field(None, description="Optional custom name for the server")
7184 api_key: Optional[str] = Field(None, description="API key if required")
7185 oauth_credentials: Optional[Dict[str, Any]] = Field(None, description="OAuth credentials if required")
7188class CatalogServerRegisterResponse(BaseModel):
7189 """Response after registering a catalog server."""
7191 success: bool = Field(..., description="Whether registration was successful")
7192 server_id: str = Field(..., description="ID of the registered server in the system")
7193 message: str = Field(..., description="Status message")
7194 error: Optional[str] = Field(None, description="Error message if registration failed")
7195 oauth_required: bool = Field(False, description="Whether OAuth configuration is required before activation")
7198class CatalogServerStatusRequest(BaseModel):
7199 """Request to check catalog server status."""
7201 server_id: str = Field(..., description="Catalog server ID to check")
7204class CatalogServerStatusResponse(BaseModel):
7205 """Response for catalog server status check."""
7207 server_id: str = Field(..., description="Catalog server ID")
7208 is_available: bool = Field(..., description="Whether server is reachable")
7209 is_registered: bool = Field(..., description="Whether server is registered")
7210 last_checked: Optional[datetime] = Field(None, description="Last health check timestamp")
7211 response_time_ms: Optional[float] = Field(None, description="Response time in milliseconds")
7212 error: Optional[str] = Field(None, description="Error message if check failed")
7215class CatalogListRequest(BaseModel):
7216 """Request to list catalog servers."""
7218 category: Optional[str] = Field(None, description="Filter by category")
7219 auth_type: Optional[str] = Field(None, description="Filter by auth type")
7220 provider: Optional[str] = Field(None, description="Filter by provider")
7221 search: Optional[str] = Field(None, description="Search term for name/description")
7222 tags: Optional[List[str]] = Field(None, description="Filter by tags")
7223 show_registered_only: bool = Field(default=False, description="Show only registered servers")
7224 show_available_only: bool = Field(default=True, description="Show only available servers")
7225 limit: int = Field(default=100, description="Maximum number of results")
7226 offset: int = Field(default=0, description="Offset for pagination")
7229class CatalogListResponse(BaseModel):
7230 """Response containing catalog servers."""
7232 servers: List[CatalogServer] = Field(..., description="List of catalog servers")
7233 total: int = Field(..., description="Total number of matching servers")
7234 categories: List[str] = Field(..., description="Available categories")
7235 auth_types: List[str] = Field(..., description="Available auth types")
7236 providers: List[str] = Field(..., description="Available providers")
7237 all_tags: List[str] = Field(default_factory=list, description="All available tags")
7240class CatalogBulkRegisterRequest(BaseModel):
7241 """Request to register multiple catalog servers."""
7243 server_ids: List[str] = Field(..., description="List of catalog server IDs to register")
7244 skip_errors: bool = Field(default=True, description="Continue on error")
7247class CatalogBulkRegisterResponse(BaseModel):
7248 """Response after bulk registration."""
7250 successful: List[str] = Field(..., description="Successfully registered server IDs")
7251 failed: List[Dict[str, str]] = Field(..., description="Failed registrations with error messages")
7252 total_attempted: int = Field(..., description="Total servers attempted")
7253 total_successful: int = Field(..., description="Total successful registrations")
7256# ===================================
7257# Pagination Schemas
7258# ===================================
7261class PaginationMeta(BaseModel):
7262 """Pagination metadata.
7264 Attributes:
7265 page: Current page number (1-indexed)
7266 per_page: Items per page
7267 total_items: Total number of items across all pages
7268 total_pages: Total number of pages
7269 has_next: Whether there is a next page
7270 has_prev: Whether there is a previous page
7271 next_cursor: Cursor for next page (cursor-based only)
7272 prev_cursor: Cursor for previous page (cursor-based only)
7274 Examples:
7275 >>> meta = PaginationMeta(
7276 ... page=2,
7277 ... per_page=50,
7278 ... total_items=250,
7279 ... total_pages=5,
7280 ... has_next=True,
7281 ... has_prev=True
7282 ... )
7283 >>> meta.page
7284 2
7285 >>> meta.total_pages
7286 5
7287 """
7289 page: int = Field(..., description="Current page number (1-indexed)", ge=1)
7290 per_page: int = Field(..., description="Items per page", ge=1)
7291 total_items: int = Field(..., description="Total number of items", ge=0)
7292 total_pages: int = Field(..., description="Total number of pages", ge=0)
7293 has_next: bool = Field(..., description="Whether there is a next page")
7294 has_prev: bool = Field(..., description="Whether there is a previous page")
7295 next_cursor: Optional[str] = Field(None, description="Cursor for next page (cursor-based only)")
7296 prev_cursor: Optional[str] = Field(None, description="Cursor for previous page (cursor-based only)")
7299class PaginationLinks(BaseModel):
7300 """Pagination navigation links.
7302 Attributes:
7303 self: Current page URL
7304 first: First page URL
7305 last: Last page URL
7306 next: Next page URL (None if no next page)
7307 prev: Previous page URL (None if no previous page)
7309 Examples:
7310 >>> links = PaginationLinks(
7311 ... self="/admin/tools?page=2&per_page=50",
7312 ... first="/admin/tools?page=1&per_page=50",
7313 ... last="/admin/tools?page=5&per_page=50",
7314 ... next="/admin/tools?page=3&per_page=50",
7315 ... prev="/admin/tools?page=1&per_page=50"
7316 ... )
7317 >>> links.self
7318 '/admin/tools?page=2&per_page=50'
7319 """
7321 self: str = Field(..., description="Current page URL")
7322 first: str = Field(..., description="First page URL")
7323 last: str = Field(..., description="Last page URL")
7324 next: Optional[str] = Field(None, description="Next page URL")
7325 prev: Optional[str] = Field(None, description="Previous page URL")
7328class PaginatedResponse(BaseModel):
7329 """Generic paginated response wrapper.
7331 This is a container for paginated data with metadata and navigation links.
7332 The actual data is stored in the 'data' field as a list of items.
7334 Attributes:
7335 data: List of items for the current page
7336 pagination: Pagination metadata (counts, page info)
7337 links: Navigation links (optional)
7339 Examples:
7340 >>> from mcpgateway.schemas import ToolRead
7341 >>> response = PaginatedResponse(
7342 ... data=[],
7343 ... pagination=PaginationMeta(
7344 ... page=1, per_page=50, total_items=0,
7345 ... total_pages=0, has_next=False, has_prev=False
7346 ... ),
7347 ... links=None
7348 ... )
7349 >>> response.pagination.page
7350 1
7351 """
7353 data: List[Any] = Field(..., description="List of items")
7354 pagination: PaginationMeta = Field(..., description="Pagination metadata")
7355 links: Optional[PaginationLinks] = Field(None, description="Navigation links")
7358class PaginationParams(BaseModel):
7359 """Common pagination query parameters.
7361 Attributes:
7362 page: Page number (1-indexed)
7363 per_page: Items per page
7364 cursor: Cursor for cursor-based pagination
7365 sort_by: Field to sort by
7366 sort_order: Sort order (asc/desc)
7368 Examples:
7369 >>> params = PaginationParams(page=1, per_page=50)
7370 >>> params.page
7371 1
7372 >>> params.sort_order
7373 'desc'
7374 """
7376 page: int = Field(default=1, ge=1, description="Page number (1-indexed)")
7377 per_page: int = Field(default=50, ge=1, le=500, description="Items per page (max 500)")
7378 cursor: Optional[str] = Field(None, description="Cursor for cursor-based pagination")
7379 sort_by: Optional[str] = Field("created_at", description="Sort field")
7380 sort_order: Optional[str] = Field("desc", pattern="^(asc|desc)$", description="Sort order")
7383# ============================================================================
7384# Cursor Pagination Response Schemas (for main API endpoints)
7385# ============================================================================
7388class CursorPaginatedToolsResponse(BaseModel):
7389 """Cursor-paginated response for tools list endpoint."""
7391 tools: List["ToolRead"] = Field(..., description="List of tools for this page")
7392 next_cursor: Optional[str] = Field(None, alias="nextCursor", description="Cursor for the next page, null if no more pages")
7395class CursorPaginatedServersResponse(BaseModel):
7396 """Cursor-paginated response for servers list endpoint."""
7398 servers: List["ServerRead"] = Field(..., description="List of servers for this page")
7399 next_cursor: Optional[str] = Field(None, alias="nextCursor", description="Cursor for the next page, null if no more pages")
7402class CursorPaginatedGatewaysResponse(BaseModel):
7403 """Cursor-paginated response for gateways list endpoint."""
7405 gateways: List["GatewayRead"] = Field(..., description="List of gateways for this page")
7406 next_cursor: Optional[str] = Field(None, alias="nextCursor", description="Cursor for the next page, null if no more pages")
7409class CursorPaginatedResourcesResponse(BaseModel):
7410 """Cursor-paginated response for resources list endpoint."""
7412 resources: List["ResourceRead"] = Field(..., description="List of resources for this page")
7413 next_cursor: Optional[str] = Field(None, alias="nextCursor", description="Cursor for the next page, null if no more pages")
7416class CursorPaginatedPromptsResponse(BaseModel):
7417 """Cursor-paginated response for prompts list endpoint."""
7419 prompts: List["PromptRead"] = Field(..., description="List of prompts for this page")
7420 next_cursor: Optional[str] = Field(None, alias="nextCursor", description="Cursor for the next page, null if no more pages")
7423class CursorPaginatedA2AAgentsResponse(BaseModel):
7424 """Cursor-paginated response for A2A agents list endpoint."""
7426 agents: List["A2AAgentRead"] = Field(..., description="List of A2A agents for this page")
7427 next_cursor: Optional[str] = Field(None, alias="nextCursor", description="Cursor for the next page, null if no more pages")
7430class CursorPaginatedTeamsResponse(BaseModel):
7431 """Cursor-paginated response for teams list endpoint."""
7433 teams: List["TeamResponse"] = Field(..., description="List of teams for this page")
7434 next_cursor: Optional[str] = Field(None, alias="nextCursor", description="Cursor for the next page, null if no more pages")
7437class CursorPaginatedUsersResponse(BaseModel):
7438 """Cursor-paginated response for users list endpoint."""
7440 users: List["EmailUserResponse"] = Field(..., description="List of users for this page")
7441 next_cursor: Optional[str] = Field(None, alias="nextCursor", description="Cursor for the next page, null if no more pages")
7444# ============================================================================
7445# Observability Schemas (OpenTelemetry-style traces, spans, events, metrics)
7446# ============================================================================
7449class ObservabilityTraceBase(BaseModel):
7450 """Base schema for observability traces."""
7452 name: str = Field(..., description="Trace name (e.g., 'POST /tools/invoke')")
7453 start_time: datetime = Field(..., description="Trace start timestamp")
7454 end_time: Optional[datetime] = Field(None, description="Trace end timestamp")
7455 duration_ms: Optional[float] = Field(None, description="Total duration in milliseconds")
7456 status: str = Field("unset", description="Trace status (unset, ok, error)")
7457 status_message: Optional[str] = Field(None, description="Status message or error description")
7458 http_method: Optional[str] = Field(None, description="HTTP method")
7459 http_url: Optional[str] = Field(None, description="HTTP URL")
7460 http_status_code: Optional[int] = Field(None, description="HTTP status code")
7461 user_email: Optional[str] = Field(None, description="User email")
7462 user_agent: Optional[str] = Field(None, description="User agent string")
7463 ip_address: Optional[str] = Field(None, description="Client IP address")
7464 attributes: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional trace attributes")
7465 resource_attributes: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Resource attributes")
7468class ObservabilityTraceCreate(ObservabilityTraceBase):
7469 """Schema for creating an observability trace."""
7471 trace_id: Optional[str] = Field(None, description="Trace ID (generated if not provided)")
7474class ObservabilityTraceUpdate(BaseModel):
7475 """Schema for updating an observability trace."""
7477 end_time: Optional[datetime] = None
7478 duration_ms: Optional[float] = None
7479 status: Optional[str] = None
7480 status_message: Optional[str] = None
7481 http_status_code: Optional[int] = None
7482 attributes: Optional[Dict[str, Any]] = None
7485class ObservabilityTraceRead(ObservabilityTraceBase):
7486 """Schema for reading an observability trace."""
7488 trace_id: str = Field(..., description="Trace ID")
7489 created_at: datetime = Field(..., description="Creation timestamp")
7491 model_config = {"from_attributes": True}
7494class ObservabilitySpanBase(BaseModel):
7495 """Base schema for observability spans."""
7497 trace_id: str = Field(..., description="Parent trace ID")
7498 parent_span_id: Optional[str] = Field(None, description="Parent span ID (for nested spans)")
7499 name: str = Field(..., description="Span name (e.g., 'database_query', 'tool_invocation')")
7500 kind: str = Field("internal", description="Span kind (internal, server, client, producer, consumer)")
7501 start_time: datetime = Field(..., description="Span start timestamp")
7502 end_time: Optional[datetime] = Field(None, description="Span end timestamp")
7503 duration_ms: Optional[float] = Field(None, description="Span duration in milliseconds")
7504 status: str = Field("unset", description="Span status (unset, ok, error)")
7505 status_message: Optional[str] = Field(None, description="Status message")
7506 attributes: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Span attributes")
7507 resource_name: Optional[str] = Field(None, description="Resource name")
7508 resource_type: Optional[str] = Field(None, description="Resource type (tool, resource, prompt, gateway, a2a_agent)")
7509 resource_id: Optional[str] = Field(None, description="Resource ID")
7512class ObservabilitySpanCreate(ObservabilitySpanBase):
7513 """Schema for creating an observability span."""
7515 span_id: Optional[str] = Field(None, description="Span ID (generated if not provided)")
7518class ObservabilitySpanUpdate(BaseModel):
7519 """Schema for updating an observability span."""
7521 end_time: Optional[datetime] = None
7522 duration_ms: Optional[float] = None
7523 status: Optional[str] = None
7524 status_message: Optional[str] = None
7525 attributes: Optional[Dict[str, Any]] = None
7528class ObservabilitySpanRead(ObservabilitySpanBase):
7529 """Schema for reading an observability span."""
7531 span_id: str = Field(..., description="Span ID")
7532 created_at: datetime = Field(..., description="Creation timestamp")
7534 model_config = {"from_attributes": True}
7537class ObservabilityEventBase(BaseModel):
7538 """Base schema for observability events."""
7540 span_id: str = Field(..., description="Parent span ID")
7541 name: str = Field(..., description="Event name (e.g., 'exception', 'log', 'checkpoint')")
7542 timestamp: datetime = Field(..., description="Event timestamp")
7543 attributes: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Event attributes")
7544 severity: Optional[str] = Field(None, description="Log severity (debug, info, warning, error, critical)")
7545 message: Optional[str] = Field(None, description="Event message")
7546 exception_type: Optional[str] = Field(None, description="Exception class name")
7547 exception_message: Optional[str] = Field(None, description="Exception message")
7548 exception_stacktrace: Optional[str] = Field(None, description="Exception stacktrace")
7551class ObservabilityEventCreate(ObservabilityEventBase):
7552 """Schema for creating an observability event."""
7555class ObservabilityEventRead(ObservabilityEventBase):
7556 """Schema for reading an observability event."""
7558 id: int = Field(..., description="Event ID")
7559 created_at: datetime = Field(..., description="Creation timestamp")
7561 model_config = {"from_attributes": True}
7564class ObservabilityMetricBase(BaseModel):
7565 """Base schema for observability metrics."""
7567 name: str = Field(..., description="Metric name (e.g., 'http.request.duration', 'tool.invocation.count')")
7568 metric_type: str = Field(..., description="Metric type (counter, gauge, histogram)")
7569 value: float = Field(..., description="Metric value")
7570 timestamp: datetime = Field(..., description="Metric timestamp")
7571 unit: Optional[str] = Field(None, description="Metric unit (ms, count, bytes, etc.)")
7572 attributes: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Metric attributes/labels")
7573 resource_type: Optional[str] = Field(None, description="Resource type")
7574 resource_id: Optional[str] = Field(None, description="Resource ID")
7575 trace_id: Optional[str] = Field(None, description="Associated trace ID")
7578class ObservabilityMetricCreate(ObservabilityMetricBase):
7579 """Schema for creating an observability metric."""
7582class ObservabilityMetricRead(ObservabilityMetricBase):
7583 """Schema for reading an observability metric."""
7585 id: int = Field(..., description="Metric ID")
7586 created_at: datetime = Field(..., description="Creation timestamp")
7588 model_config = {"from_attributes": True}
7591class ObservabilityTraceWithSpans(ObservabilityTraceRead):
7592 """Schema for reading a trace with its spans."""
7594 spans: List[ObservabilitySpanRead] = Field(default_factory=list, description="List of spans in this trace")
7597class ObservabilitySpanWithEvents(ObservabilitySpanRead):
7598 """Schema for reading a span with its events."""
7600 events: List[ObservabilityEventRead] = Field(default_factory=list, description="List of events in this span")
7603class ObservabilityQueryParams(BaseModel):
7604 """Query parameters for filtering observability data."""
7606 start_time: Optional[datetime] = Field(None, description="Filter traces/spans/metrics after this time")
7607 end_time: Optional[datetime] = Field(None, description="Filter traces/spans/metrics before this time")
7608 status: Optional[str] = Field(None, description="Filter by status (ok, error, unset)")
7609 http_status_code: Optional[int] = Field(None, description="Filter by HTTP status code")
7610 user_email: Optional[str] = Field(None, description="Filter by user email")
7611 resource_type: Optional[str] = Field(None, description="Filter by resource type")
7612 resource_name: Optional[str] = Field(None, description="Filter by resource name")
7613 trace_id: Optional[str] = Field(None, description="Filter by trace ID")
7614 limit: int = Field(default=100, ge=1, le=1000, description="Maximum number of results")
7615 offset: int = Field(default=0, ge=0, description="Result offset for pagination")
7618# --- Performance Monitoring Schemas ---
7621class WorkerMetrics(BaseModel):
7622 """Metrics for a single worker process."""
7624 pid: int = Field(..., description="Process ID")
7625 cpu_percent: float = Field(..., description="CPU utilization percentage")
7626 memory_rss_mb: float = Field(..., description="Resident Set Size memory in MB")
7627 memory_vms_mb: float = Field(..., description="Virtual Memory Size in MB")
7628 threads: int = Field(..., description="Number of threads")
7629 connections: int = Field(0, description="Number of network connections")
7630 open_fds: Optional[int] = Field(None, description="Number of open file descriptors")
7631 status: str = Field("running", description="Worker status")
7632 create_time: Optional[datetime] = Field(None, description="Worker start time")
7633 uptime_seconds: Optional[int] = Field(None, description="Worker uptime in seconds")
7636class SystemMetricsSchema(BaseModel):
7637 """System-wide resource metrics."""
7639 # CPU metrics
7640 cpu_percent: float = Field(..., description="Total CPU utilization percentage")
7641 cpu_count: int = Field(..., description="Number of logical CPU cores")
7642 cpu_freq_mhz: Optional[float] = Field(None, description="Current CPU frequency in MHz")
7643 load_avg_1m: Optional[float] = Field(None, description="1-minute load average")
7644 load_avg_5m: Optional[float] = Field(None, description="5-minute load average")
7645 load_avg_15m: Optional[float] = Field(None, description="15-minute load average")
7647 # Memory metrics
7648 memory_total_mb: int = Field(..., description="Total physical memory in MB")
7649 memory_used_mb: int = Field(..., description="Used physical memory in MB")
7650 memory_available_mb: int = Field(..., description="Available memory in MB")
7651 memory_percent: float = Field(..., description="Memory utilization percentage")
7652 swap_total_mb: int = Field(0, description="Total swap space in MB")
7653 swap_used_mb: int = Field(0, description="Used swap space in MB")
7655 # Disk metrics
7656 disk_total_gb: float = Field(..., description="Total disk space in GB")
7657 disk_used_gb: float = Field(..., description="Used disk space in GB")
7658 disk_percent: float = Field(..., description="Disk utilization percentage")
7660 # Network metrics
7661 network_bytes_sent: int = Field(0, description="Total network bytes sent")
7662 network_bytes_recv: int = Field(0, description="Total network bytes received")
7663 network_connections: int = Field(0, description="Active network connections")
7665 # Process info
7666 boot_time: Optional[datetime] = Field(None, description="System boot time")
7669class RequestMetricsSchema(BaseModel):
7670 """HTTP request performance metrics."""
7672 requests_total: int = Field(0, description="Total HTTP requests")
7673 requests_per_second: float = Field(0, description="Current request rate")
7674 requests_1xx: int = Field(0, description="1xx informational responses")
7675 requests_2xx: int = Field(0, description="2xx success responses")
7676 requests_3xx: int = Field(0, description="3xx redirect responses")
7677 requests_4xx: int = Field(0, description="4xx client error responses")
7678 requests_5xx: int = Field(0, description="5xx server error responses")
7680 # Response time percentiles
7681 response_time_avg_ms: float = Field(0, description="Average response time in ms")
7682 response_time_p50_ms: float = Field(0, description="50th percentile response time")
7683 response_time_p95_ms: float = Field(0, description="95th percentile response time")
7684 response_time_p99_ms: float = Field(0, description="99th percentile response time")
7686 # Error rate
7687 error_rate: float = Field(0, description="Percentage of 4xx/5xx responses")
7689 # Active requests
7690 active_requests: int = Field(0, description="Currently processing requests")
7693class DatabaseMetricsSchema(BaseModel):
7694 """Database connection pool metrics."""
7696 pool_size: int = Field(0, description="Connection pool size")
7697 connections_in_use: int = Field(0, description="Active connections")
7698 connections_available: int = Field(0, description="Available connections")
7699 overflow: int = Field(0, description="Overflow connections")
7700 query_count: int = Field(0, description="Total queries executed")
7701 query_avg_time_ms: float = Field(0, description="Average query time in ms")
7704class CacheMetricsSchema(BaseModel):
7705 """Redis cache metrics."""
7707 connected: bool = Field(False, description="Redis connection status")
7708 version: Optional[str] = Field(None, description="Redis version")
7709 used_memory_mb: float = Field(0, description="Redis memory usage in MB")
7710 connected_clients: int = Field(0, description="Connected Redis clients")
7711 ops_per_second: int = Field(0, description="Redis operations per second")
7712 hit_rate: float = Field(0, description="Cache hit rate percentage")
7713 keyspace_hits: int = Field(0, description="Successful key lookups")
7714 keyspace_misses: int = Field(0, description="Failed key lookups")
7717class GunicornMetricsSchema(BaseModel):
7718 """Gunicorn server metrics."""
7720 master_pid: Optional[int] = Field(None, description="Master process PID")
7721 workers_total: int = Field(0, description="Total configured workers")
7722 workers_active: int = Field(0, description="Currently active workers")
7723 workers_idle: int = Field(0, description="Idle workers")
7724 max_requests: int = Field(0, description="Max requests before worker restart")
7727class PerformanceSnapshotCreate(BaseModel):
7728 """Schema for creating a performance snapshot."""
7730 host: str = Field(..., description="Hostname")
7731 worker_id: Optional[str] = Field(None, description="Worker identifier")
7732 metrics_json: Dict[str, Any] = Field(..., description="Serialized metrics data")
7735class PerformanceSnapshotRead(BaseModel):
7736 """Schema for reading a performance snapshot."""
7738 id: int = Field(..., description="Snapshot ID")
7739 timestamp: datetime = Field(..., description="Snapshot timestamp")
7740 host: str = Field(..., description="Hostname")
7741 worker_id: Optional[str] = Field(None, description="Worker identifier")
7742 metrics_json: Dict[str, Any] = Field(..., description="Serialized metrics data")
7743 created_at: datetime = Field(..., description="Creation timestamp")
7745 model_config = {"from_attributes": True}
7748class PerformanceAggregateBase(BaseModel):
7749 """Base schema for performance aggregates."""
7751 period_start: datetime = Field(..., description="Start of aggregation period")
7752 period_end: datetime = Field(..., description="End of aggregation period")
7753 period_type: str = Field(..., description="Aggregation type (hourly, daily)")
7754 host: Optional[str] = Field(None, description="Host (None for cluster-wide)")
7756 # Request aggregates
7757 requests_total: int = Field(0, description="Total requests in period")
7758 requests_2xx: int = Field(0, description="2xx responses in period")
7759 requests_4xx: int = Field(0, description="4xx responses in period")
7760 requests_5xx: int = Field(0, description="5xx responses in period")
7761 avg_response_time_ms: float = Field(0, description="Average response time")
7762 p95_response_time_ms: float = Field(0, description="95th percentile response time")
7763 peak_requests_per_second: float = Field(0, description="Peak request rate")
7765 # Resource aggregates
7766 avg_cpu_percent: float = Field(0, description="Average CPU utilization")
7767 avg_memory_percent: float = Field(0, description="Average memory utilization")
7768 peak_cpu_percent: float = Field(0, description="Peak CPU utilization")
7769 peak_memory_percent: float = Field(0, description="Peak memory utilization")
7772class PerformanceAggregateCreate(PerformanceAggregateBase):
7773 """Schema for creating a performance aggregate."""
7776class PerformanceAggregateRead(PerformanceAggregateBase):
7777 """Schema for reading a performance aggregate."""
7779 id: int = Field(..., description="Aggregate ID")
7780 created_at: datetime = Field(..., description="Creation timestamp")
7782 model_config = {"from_attributes": True}
7785class PerformanceDashboard(BaseModel):
7786 """Complete performance dashboard data."""
7788 timestamp: datetime = Field(..., description="Dashboard generation timestamp")
7789 uptime_seconds: int = Field(0, description="Application uptime in seconds")
7790 host: str = Field(..., description="Current hostname")
7792 # Current metrics
7793 system: SystemMetricsSchema = Field(..., description="Current system metrics")
7794 requests: RequestMetricsSchema = Field(..., description="Current request metrics")
7795 database: DatabaseMetricsSchema = Field(..., description="Current database metrics")
7796 cache: CacheMetricsSchema = Field(..., description="Current cache metrics")
7797 gunicorn: GunicornMetricsSchema = Field(..., description="Current Gunicorn metrics")
7798 workers: List[WorkerMetrics] = Field(default_factory=list, description="Per-worker metrics")
7800 # Cluster info (for distributed mode)
7801 cluster_hosts: List[str] = Field(default_factory=list, description="Known cluster hosts")
7802 is_distributed: bool = Field(False, description="Running in distributed mode")
7805class PerformanceHistoryParams(BaseModel):
7806 """Query parameters for historical performance data."""
7808 start_time: Optional[datetime] = Field(None, description="Start of time range")
7809 end_time: Optional[datetime] = Field(None, description="End of time range")
7810 period_type: str = Field("hourly", description="Aggregation period (hourly, daily)")
7811 host: Optional[str] = Field(None, description="Filter by host")
7812 limit: int = Field(default=168, ge=1, le=1000, description="Maximum results")
7815class PerformanceHistoryResponse(BaseModel):
7816 """Response for historical performance data."""
7818 aggregates: List[PerformanceAggregateRead] = Field(default_factory=list, description="Historical aggregates")
7819 period_type: str = Field(..., description="Aggregation period type")
7820 total_count: int = Field(0, description="Total matching records")