Coverage for mcpgateway / schemas.py: 99%
2839 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/schemas.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7ContextForge Schema Definitions.
8This module provides Pydantic models for request/response validation in ContextForge.
9It implements schemas for:
10- Tool registration and invocation
11- Resource management and subscriptions
12- Prompt templates and arguments
13- Gateway federation
14- RPC message formats
15- Event messages
16- Admin interface
18The schemas ensure proper validation according to the MCP specification while adding
19gateway-specific extensions for federation support.
20"""
22# Standard
23import base64
24from datetime import datetime, timezone
25from enum import Enum
26import logging
27import re
28from typing import Any, Dict, List, Literal, Optional, Pattern, Self, Union
29from urllib.parse import urlparse
31# Third-Party
32import orjson
33from pydantic import AnyHttpUrl, BaseModel, ConfigDict, EmailStr, Field, field_serializer, field_validator, model_serializer, model_validator, SecretStr, ValidationInfo
35# First-Party
36from mcpgateway.common.models import Annotations, ImageContent
37from mcpgateway.common.models import Prompt as MCPPrompt
38from mcpgateway.common.models import Resource as MCPResource
39from mcpgateway.common.models import ResourceContent, TextContent
40from mcpgateway.common.models import Tool as MCPTool
41from mcpgateway.common.oauth import OAUTH_SENSITIVE_KEYS
42from mcpgateway.common.validators import SecurityValidator, validate_core_url
43from mcpgateway.config import settings
44from mcpgateway.utils.base_models import BaseModelWithConfigDict
45from mcpgateway.utils.services_auth import decode_auth, encode_auth
46from mcpgateway.validation.tags import validate_tags_field
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# ============================================================================
# Precompiled regex patterns (compiled once at module load for performance)
# ============================================================================
# Note: Only truly static patterns are precompiled here. Settings-based patterns
# (e.g., from settings.* or SecurityValidator.*) are NOT precompiled because tests
# override class/settings attributes at runtime via monkeypatch.

# Whole-string match: optional "http://" or "https://" prefix, a host made of
# ASCII letters, digits, dots and hyphens, then an optional ":<digits>" port.
_HOSTNAME_RE: Pattern[str] = re.compile(r"^(https?://)?([a-zA-Z0-9.-]+)(:[0-9]+)?$")
# Whole-string match: one or more lowercase ASCII letters, digits or hyphens.
_SLUG_RE: Pattern[str] = re.compile(r"^[a-z0-9-]+$")

# Visibility values accepted as-is by _coerce_visibility.
_VALID_VISIBILITY = {"private", "team", "public"}
62def _coerce_visibility(v: Optional[str]) -> Optional[str]:
63 """Normalize legacy visibility values in Read/response schemas.
65 DB columns are unconstrained strings, so historical rows may contain
66 values outside the Literal enum. Coerce them to 'public' (the DB
67 default) instead of letting Pydantic raise a ValidationError on the
68 read path.
70 Args:
71 v: Visibility value to normalize.
73 Returns:
74 The original value if valid, 'public' if invalid, or None if None.
75 """
76 if v is not None and v not in _VALID_VISIBILITY:
77 logger.warning("Coercing invalid visibility value %r to 'public'", v)
78 return "public"
79 return v
def encode_datetime(v: datetime) -> str:
    """
    Render a datetime as its ISO 8601 text form.

    The result is whatever ``datetime.isoformat()`` produces: naive values
    carry no offset, aware values include their UTC offset, and microseconds
    appear only when they are non-zero.

    Args:
        v (datetime): The datetime object to be encoded.

    Returns:
        str: The ISO 8601 formatted string representation of the datetime object.

    Examples:
        >>> from datetime import datetime, timezone
        >>> encode_datetime(datetime(2023, 5, 22, 14, 30, 0))
        '2023-05-22T14:30:00'
        >>> encode_datetime(datetime(2024, 12, 25, 9, 15, 30))
        '2024-12-25T09:15:30'
        >>> encode_datetime(datetime(2025, 1, 1, 0, 0, 0))
        '2025-01-01T00:00:00'
        >>> # Test with timezone
        >>> dt_utc = datetime(2023, 6, 15, 12, 0, 0, tzinfo=timezone.utc)
        >>> encode_datetime(dt_utc)
        '2023-06-15T12:00:00+00:00'
        >>> # Test microseconds
        >>> dt_micro = datetime(2023, 7, 20, 16, 45, 30, 123456)
        >>> encode_datetime(dt_micro)
        '2023-07-20T16:45:30.123456'
    """
    iso_text = v.isoformat()
    return iso_text
112# --- Metrics Schemas ---
class ToolMetrics(BaseModelWithConfigDict):
    """
    Execution counters and response-time statistics for a tool.

    The three counters and the failure rate are required; the timing fields
    and the last-execution timestamp default to None.

    Attributes:
        total_executions (int): Total number of tool invocations.
        successful_executions (int): Number of successful tool invocations.
        failed_executions (int): Number of failed tool invocations.
        failure_rate (float): Failure rate (failed invocations / total invocations).
        min_response_time (Optional[float]): Minimum response time in seconds.
        max_response_time (Optional[float]): Maximum response time in seconds.
        avg_response_time (Optional[float]): Average response time in seconds.
        last_execution_time (Optional[datetime]): Timestamp of the most recent invocation.

    Examples:
        >>> from datetime import datetime
        >>> metrics = ToolMetrics(
        ...     total_executions=100,
        ...     successful_executions=95,
        ...     failed_executions=5,
        ...     failure_rate=0.05,
        ...     min_response_time=0.1,
        ...     max_response_time=2.5,
        ...     avg_response_time=0.8
        ... )
        >>> metrics.total_executions
        100
        >>> metrics.failure_rate
        0.05
        >>> metrics.successful_executions + metrics.failed_executions == metrics.total_executions
        True
        >>> # Test with minimal data
        >>> minimal_metrics = ToolMetrics(
        ...     total_executions=10,
        ...     successful_executions=8,
        ...     failed_executions=2,
        ...     failure_rate=0.2
        ... )
        >>> minimal_metrics.min_response_time is None
        True
        >>> # Test model dump functionality
        >>> data = metrics.model_dump()
        >>> isinstance(data, dict)
        True
        >>> data['total_executions']
        100
    """

    # Required counters
    total_executions: int = Field(..., description="Total number of tool invocations")
    successful_executions: int = Field(..., description="Number of successful tool invocations")
    failed_executions: int = Field(..., description="Number of failed tool invocations")
    failure_rate: float = Field(..., description="Failure rate (failed invocations / total invocations)")

    # Optional timing data
    min_response_time: Optional[float] = Field(default=None, description="Minimum response time in seconds")
    max_response_time: Optional[float] = Field(default=None, description="Maximum response time in seconds")
    avg_response_time: Optional[float] = Field(default=None, description="Average response time in seconds")
    last_execution_time: Optional[datetime] = Field(default=None, description="Timestamp of the most recent invocation")
class ResourceMetrics(BaseModelWithConfigDict):
    """
    Execution counters and response-time statistics for a resource.

    The three counters and the failure rate are required; the timing fields
    and the last-execution timestamp default to None.

    Attributes:
        total_executions (int): Total number of resource invocations.
        successful_executions (int): Number of successful resource invocations.
        failed_executions (int): Number of failed resource invocations.
        failure_rate (float): Failure rate (failed invocations / total invocations).
        min_response_time (Optional[float]): Minimum response time in seconds.
        max_response_time (Optional[float]): Maximum response time in seconds.
        avg_response_time (Optional[float]): Average response time in seconds.
        last_execution_time (Optional[datetime]): Timestamp of the most recent invocation.
    """

    # Required counters
    total_executions: int = Field(..., description="Total number of resource invocations")
    successful_executions: int = Field(..., description="Number of successful resource invocations")
    failed_executions: int = Field(..., description="Number of failed resource invocations")
    failure_rate: float = Field(..., description="Failure rate (failed invocations / total invocations)")

    # Optional timing data
    min_response_time: Optional[float] = Field(default=None, description="Minimum response time in seconds")
    max_response_time: Optional[float] = Field(default=None, description="Maximum response time in seconds")
    avg_response_time: Optional[float] = Field(default=None, description="Average response time in seconds")
    last_execution_time: Optional[datetime] = Field(default=None, description="Timestamp of the most recent invocation")
class ServerMetrics(BaseModelWithConfigDict):
    """
    Execution counters and response-time statistics for a server.

    The three counters and the failure rate are required; the timing fields
    and the last-execution timestamp default to None.

    Attributes:
        total_executions (int): Total number of server invocations.
        successful_executions (int): Number of successful server invocations.
        failed_executions (int): Number of failed server invocations.
        failure_rate (float): Failure rate (failed invocations / total invocations).
        min_response_time (Optional[float]): Minimum response time in seconds.
        max_response_time (Optional[float]): Maximum response time in seconds.
        avg_response_time (Optional[float]): Average response time in seconds.
        last_execution_time (Optional[datetime]): Timestamp of the most recent invocation.
    """

    # Required counters
    total_executions: int = Field(..., description="Total number of server invocations")
    successful_executions: int = Field(..., description="Number of successful server invocations")
    failed_executions: int = Field(..., description="Number of failed server invocations")
    failure_rate: float = Field(..., description="Failure rate (failed invocations / total invocations)")

    # Optional timing data
    min_response_time: Optional[float] = Field(default=None, description="Minimum response time in seconds")
    max_response_time: Optional[float] = Field(default=None, description="Maximum response time in seconds")
    avg_response_time: Optional[float] = Field(default=None, description="Average response time in seconds")
    last_execution_time: Optional[datetime] = Field(default=None, description="Timestamp of the most recent invocation")
class PromptMetrics(BaseModelWithConfigDict):
    """
    Execution counters and response-time statistics for a prompt.

    The three counters and the failure rate are required; the timing fields
    and the last-execution timestamp default to None.

    Attributes:
        total_executions (int): Total number of prompt invocations.
        successful_executions (int): Number of successful prompt invocations.
        failed_executions (int): Number of failed prompt invocations.
        failure_rate (float): Failure rate (failed invocations / total invocations).
        min_response_time (Optional[float]): Minimum response time in seconds.
        max_response_time (Optional[float]): Maximum response time in seconds.
        avg_response_time (Optional[float]): Average response time in seconds.
        last_execution_time (Optional[datetime]): Timestamp of the most recent invocation.
    """

    # Required counters
    total_executions: int = Field(..., description="Total number of prompt invocations")
    successful_executions: int = Field(..., description="Number of successful prompt invocations")
    failed_executions: int = Field(..., description="Number of failed prompt invocations")
    failure_rate: float = Field(..., description="Failure rate (failed invocations / total invocations)")

    # Optional timing data
    min_response_time: Optional[float] = Field(default=None, description="Minimum response time in seconds")
    max_response_time: Optional[float] = Field(default=None, description="Maximum response time in seconds")
    avg_response_time: Optional[float] = Field(default=None, description="Average response time in seconds")
    last_execution_time: Optional[datetime] = Field(default=None, description="Timestamp of the most recent invocation")
class A2AAgentMetrics(BaseModelWithConfigDict):
    """
    Interaction counters and response-time statistics for an A2A agent.

    The three counters and the failure rate are required; the timing fields
    and the last-execution timestamp default to None.

    Attributes:
        total_executions (int): Total number of agent interactions.
        successful_executions (int): Number of successful agent interactions.
        failed_executions (int): Number of failed agent interactions.
        failure_rate (float): Failure rate (failed interactions / total interactions).
        min_response_time (Optional[float]): Minimum response time in seconds.
        max_response_time (Optional[float]): Maximum response time in seconds.
        avg_response_time (Optional[float]): Average response time in seconds.
        last_execution_time (Optional[datetime]): Timestamp of the most recent interaction.
    """

    # Required counters
    total_executions: int = Field(..., description="Total number of agent interactions")
    successful_executions: int = Field(..., description="Number of successful agent interactions")
    failed_executions: int = Field(..., description="Number of failed agent interactions")
    failure_rate: float = Field(..., description="Failure rate (failed interactions / total interactions)")

    # Optional timing data
    min_response_time: Optional[float] = Field(default=None, description="Minimum response time in seconds")
    max_response_time: Optional[float] = Field(default=None, description="Maximum response time in seconds")
    avg_response_time: Optional[float] = Field(default=None, description="Average response time in seconds")
    last_execution_time: Optional[datetime] = Field(default=None, description="Timestamp of the most recent interaction")
class A2AAgentAggregateMetrics(BaseModelWithConfigDict):
    """
    Represents aggregated metrics for all A2A agents in the system.

    This model is used for the /metrics endpoint to provide system-wide A2A agent statistics
    with consistent camelCase field naming.

    Every field is required. Unlike the per-entity metrics models, the
    response-time fields here are plain floats rather than Optional.

    Attributes:
        total_agents (int): Total number of A2A agents registered.
        active_agents (int): Number of currently active A2A agents.
        total_interactions (int): Total number of agent interactions.
        successful_interactions (int): Number of successful agent interactions.
        failed_interactions (int): Number of failed agent interactions.
        success_rate (float): Success rate as a percentage (0-100).
        avg_response_time (float): Average response time in seconds.
        min_response_time (float): Minimum response time in seconds.
        max_response_time (float): Maximum response time in seconds.
    """

    # Agent counts
    total_agents: int = Field(..., description="Total number of A2A agents registered")
    active_agents: int = Field(..., description="Number of currently active A2A agents")
    # Interaction counts
    total_interactions: int = Field(..., description="Total number of agent interactions")
    successful_interactions: int = Field(..., description="Number of successful agent interactions")
    failed_interactions: int = Field(..., description="Number of failed agent interactions")
    # NOTE: a success percentage here, whereas the per-entity models expose failure_rate.
    success_rate: float = Field(..., description="Success rate as a percentage (0-100)")
    # Response-time statistics
    avg_response_time: float = Field(..., description="Average response time in seconds")
    min_response_time: float = Field(..., description="Minimum response time in seconds")
    max_response_time: float = Field(..., description="Maximum response time in seconds")
class MetricsResponse(BaseModelWithConfigDict):
    """
    Payload returned by the aggregated metrics endpoint.

    Bundles one metrics object per entity type. The A2A section is optional:
    when it is None the key is dropped from the serialized output entirely
    (rather than emitted as null) to preserve backwards compatibility with
    existing consumers.
    """

    tools: ToolMetrics
    resources: ResourceMetrics
    servers: ServerMetrics
    prompts: PromptMetrics
    a2a_agents: Optional[A2AAgentAggregateMetrics] = None

    @model_serializer(mode="wrap")
    def _exclude_none_a2a(self, handler):
        """Drop the A2A metrics key from the output when no A2A metrics are set.

        Args:
            handler: Pydantic serializer callback for the wrapped model.

        Returns:
            Dict[str, Any]: Serialized metrics payload without empty A2A fields.
        """
        payload = handler(self)
        if self.a2a_agents is not None:
            return payload

        # Remove the key under both its camelCase and snake_case spellings.
        for key in ("a2aAgents", "a2a_agents"):
            payload.pop(key, None)
        return payload
335# --- JSON Path API modifier Schema
class JsonPathModifier(BaseModelWithConfigDict):
    """Request schema carrying a JSONPath query and an optional field mapping.

    Both fields are optional and default to None.
    """

    model_config = ConfigDict(extra="forbid")  # ← rejects unknown fields

    jsonpath: Optional[str] = Field(default=None, description="JSONPath expression for querying JSON data.")
    mapping: Optional[Dict[str, str]] = Field(default=None, description="Mapping of fields from original data to output.")
349# --- Tool Schemas ---
350# Authentication model
class AuthenticationValues(BaseModelWithConfigDict):
    """Container for authentication settings across all supported auth types.

    ``auth_type`` and ``auth_value`` hold the type and its encoded value; the
    remaining fields carry the individual values for each authentication type.
    """

    auth_type: Optional[str] = Field(default=None, description="Type of authentication: basic, bearer, authheaders or None")
    auth_value: Optional[str] = Field(default=None, description="Encoded Authentication values")

    # Only For tool read and view tool
    username: Optional[str] = Field(default="", description="Username for basic authentication")
    password: Optional[str] = Field(default="", description="Password for basic authentication")
    token: Optional[str] = Field(default="", description="Bearer token for authentication")
    auth_header_key: Optional[str] = Field(default="", description="Key for custom headers authentication (legacy single header)")
    auth_header_value: Optional[str] = Field(default="", description="Value for custom headers authentication (legacy single header)")
    authHeaders: Optional[List[Dict[str, str]]] = Field(default=None, alias="authHeaders", description="List of custom headers for authentication (multi-header format)")  # noqa: N815
class ToolCreate(BaseModel):
    """
    Represents the configuration for creating a tool with various attributes and settings.

    String inputs are stripped of surrounding whitespace, and aliased fields
    may be populated by field name as well as by alias (see ``model_config``).

    Attributes:
        model_config (ConfigDict): Configuration for the model.
        allow_auto (bool): Internal flag to allow system-initiated A2A tool creation.
        name (str): Unique name for the tool.
        displayName (Optional[str]): Display name for the tool (shown in UI).
        title (Optional[str]): Human-readable title for the tool, at most 255 characters.
        url (Optional[Union[str, AnyHttpUrl]]): Tool endpoint URL.
        description (Optional[str]): Tool description.
        integration_type (Literal["REST", "MCP", "A2A"]): Tool integration type - REST for individual endpoints,
            MCP for gateway-discovered tools, A2A for A2A agents. Defaults to "REST".
        request_type (Literal["GET", "POST", "PUT", "DELETE", "PATCH", "SSE", "STDIO", "STREAMABLEHTTP"]):
            HTTP method or transport used for invoking the tool. Defaults to "SSE".
        headers (Optional[Dict[str, str]]): Additional headers to send when invoking the tool.
        input_schema (Optional[Dict[str, Any]]): JSON Schema for validating tool parameters. Alias 'inputSchema'.
        output_schema (Optional[Dict[str, Any]]): JSON Schema for validating tool output. Alias 'outputSchema'.
        annotations (Optional[Dict[str, Any]]): Tool annotations for behavior hints such as title, readOnlyHint, destructiveHint, idempotentHint, openWorldHint.
        jsonpath_filter (Optional[str]): JSON modification filter.
        auth (Optional[AuthenticationValues]): Authentication credentials (Basic or Bearer Token or custom headers) if required.
        gateway_id (Optional[str]): ID of the gateway for the tool.
        tags (Optional[List[str]]): Tags for categorizing the tool.
        team_id (Optional[str]): Team ID for resource organization.
        owner_email (Optional[str]): Email of the tool owner.
        visibility (Optional[Literal["private", "team", "public"]]): Visibility level.
        base_url, path_template, query_mapping, header_mapping, timeout_ms, expose_passthrough,
            allowlist, plugin_chain_pre, plugin_chain_post: REST passthrough settings.
    """

    model_config = ConfigDict(str_strip_whitespace=True, populate_by_name=True)
    allow_auto: bool = False  # Internal flag to allow system-initiated A2A tool creation

    name: str = Field(..., description="Unique name for the tool")
    displayName: Optional[str] = Field(None, description="Display name for the tool (shown in UI)")  # noqa: N815
    title: Optional[str] = Field(None, max_length=255, description="Human-readable title for the tool (MCP BaseMetadata)")
    url: Optional[Union[str, AnyHttpUrl]] = Field(None, description="Tool endpoint URL")
    description: Optional[str] = Field(None, description="Tool description")
    integration_type: Literal["REST", "MCP", "A2A"] = Field("REST", description="'REST' for individual endpoints, 'MCP' for gateway-discovered tools, 'A2A' for A2A agents")
    request_type: Literal["GET", "POST", "PUT", "DELETE", "PATCH", "SSE", "STDIO", "STREAMABLEHTTP"] = Field("SSE", description="HTTP method to be used for invoking the tool")
    headers: Optional[Dict[str, str]] = Field(None, description="Additional headers to send when invoking the tool")
    # Defaults to an empty object schema; a fresh dict is built per instance.
    input_schema: Optional[Dict[str, Any]] = Field(default_factory=lambda: {"type": "object", "properties": {}}, description="JSON Schema for validating tool parameters", alias="inputSchema")
    output_schema: Optional[Dict[str, Any]] = Field(default=None, description="JSON Schema for validating tool output", alias="outputSchema")
    annotations: Optional[Dict[str, Any]] = Field(
        default_factory=dict,
        description="Tool annotations for behavior hints (title, readOnlyHint, destructiveHint, idempotentHint, openWorldHint)",
    )
    jsonpath_filter: Optional[str] = Field(default="", description="JSON modification filter")
    auth: Optional[AuthenticationValues] = Field(None, description="Authentication credentials (Basic or Bearer Token or custom headers) if required")
    gateway_id: Optional[str] = Field(None, description="id of gateway for the tool")
    tags: Optional[List[str]] = Field(default_factory=list, description="Tags for categorizing the tool")

    # Team scoping fields
    team_id: Optional[str] = Field(None, description="Team ID for resource organization")
    owner_email: Optional[str] = Field(None, description="Email of the tool owner")
    visibility: Optional[Literal["private", "team", "public"]] = Field(default=None, description="Visibility level: private, team, or public")

    # Passthrough REST fields
    base_url: Optional[str] = Field(None, description="Base URL for REST passthrough")
    path_template: Optional[str] = Field(None, description="Path template for REST passthrough")
    query_mapping: Optional[Dict[str, Any]] = Field(None, description="Query mapping for REST passthrough")
    header_mapping: Optional[Dict[str, Any]] = Field(None, description="Header mapping for REST passthrough")
    # NOTE(review): the description mentions a 20000 ms default for REST; the code
    # that applies it is not part of this field declaration - confirm where it is set.
    timeout_ms: Optional[int] = Field(default=None, description="Timeout in milliseconds for REST passthrough (20000 if integration_type='REST', else None)")
    expose_passthrough: Optional[bool] = Field(True, description="Expose passthrough endpoint for this tool")
    allowlist: Optional[List[str]] = Field(None, description="Allowed upstream hosts/schemes for passthrough")
    plugin_chain_pre: Optional[List[str]] = Field(None, description="Pre-plugin chain for passthrough")
    plugin_chain_post: Optional[List[str]] = Field(None, description="Post-plugin chain for passthrough")
426 @field_validator("tags")
427 @classmethod
428 def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
429 """Validate and normalize tags.
431 Args:
432 v: Optional list of tag strings to validate
434 Returns:
435 List of validated tag strings
436 """
437 return validate_tags_field(v)
439 @field_validator("name")
440 @classmethod
441 def validate_name(cls, v: str) -> str:
442 """Ensure tool names follow MCP naming conventions
444 Args:
445 v (str): Value to validate
447 Returns:
448 str: Value if validated as safe
450 Raises:
451 ValueError: When displayName contains unsafe content or exceeds length limits
453 Examples:
454 >>> from mcpgateway.schemas import ToolCreate
455 >>> ToolCreate.validate_name('valid_tool')
456 'valid_tool'
457 >>> ToolCreate.validate_name('Invalid Tool!')
458 Traceback (most recent call last):
459 ...
460 ValueError: ...
461 """
462 return SecurityValidator.validate_tool_name(v)
464 @field_validator("url")
465 @classmethod
466 def validate_url(cls, v: Optional[str]) -> Optional[str]:
467 """Validate URL format and ensure safe display
469 Args:
470 v (Optional[str]): Value to validate
472 Returns:
473 Optional[str]: Value if validated as safe
475 Raises:
476 ValueError: When displayName contains unsafe content or exceeds length limits
478 Examples:
479 >>> from mcpgateway.schemas import ToolCreate
480 >>> ToolCreate.validate_url('https://example.com')
481 'https://example.com'
482 >>> ToolCreate.validate_url('ftp://example.com')
483 Traceback (most recent call last):
484 ...
485 ValueError: ...
486 """
487 if v is None:
488 return v
489 return validate_core_url(v, "Tool URL")
491 @field_validator("description")
492 @classmethod
493 def validate_description(cls, v: Optional[str]) -> Optional[str]:
494 """Ensure descriptions display safely, truncate if too long
496 Args:
497 v (str): Value to validate
499 Returns:
500 str: Value if validated as safe and truncated if too long
502 Raises:
503 ValueError: When value is unsafe and VALIDATION_STRICT=true (default)
505 Note:
506 When ``VALIDATION_STRICT=false`` the forbidden-pattern check is skipped
507 and a warning is logged instead. This allows MCP server tools whose
508 descriptions contain Markdown syntax (e.g. ``> blockquote``,
509 ``< input``, ``cmd | grep``) to register successfully.
511 Examples:
512 >>> from mcpgateway.schemas import ToolCreate
513 >>> ToolCreate.validate_description('A safe description')
514 'A safe description'
515 >>> ToolCreate.validate_description(None) # Test None case
516 >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
517 >>> truncated = ToolCreate.validate_description(long_desc)
518 >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
519 0
520 >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
521 True
522 """
523 if v is None:
524 return v
526 # Note: backticks (`) and semicolons (;) are allowed as they are commonly used in Markdown
527 # for inline code examples in tool descriptions.
528 # When VALIDATION_STRICT=false these patterns produce a warning only so
529 # that MCP servers with Markdown-formatted descriptions (e.g. "> quote",
530 # "< input", "cmd | grep") can register without error.
531 if settings.tool_description_forbidden_patterns_enabled:
532 for pat in settings.tool_description_forbidden_patterns:
533 if not pat or not pat.strip():
534 continue
535 if pat in v:
536 if settings.validation_strict:
537 raise ValueError(f"Description contains unsafe characters: '{pat}'")
538 logger.warning("Description contains potentially unsafe characters: '%s' (VALIDATION_STRICT=false, proceeding)", pat)
539 break
541 if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
542 # Truncate the description to the maximum allowed length
543 truncated = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
544 logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
545 return SecurityValidator.sanitize_display_text(truncated, "Description")
546 return SecurityValidator.sanitize_display_text(v, "Description")
548 @field_validator("displayName")
549 @classmethod
550 def validate_display_name(cls, v: Optional[str]) -> Optional[str]:
551 """Ensure display names display safely
553 Args:
554 v (str): Value to validate
556 Returns:
557 str: Value if validated as safe
559 Raises:
560 ValueError: When displayName contains unsafe content or exceeds length limits
562 Examples:
563 >>> from mcpgateway.schemas import ToolCreate
564 >>> ToolCreate.validate_display_name('My Custom Tool')
565 'My Custom Tool'
566 >>> ToolCreate.validate_display_name('<script>alert("xss")</script>')
567 Traceback (most recent call last):
568 ...
569 ValueError: ...
570 """
571 if v is None:
572 return v
573 if len(v) > SecurityValidator.MAX_NAME_LENGTH:
574 raise ValueError(f"Display name exceeds maximum length of {SecurityValidator.MAX_NAME_LENGTH}")
575 return SecurityValidator.sanitize_display_text(v, "Display name")
577 @field_validator("headers", "input_schema", "annotations")
578 @classmethod
579 def validate_json_fields(cls, v: Dict[str, Any]) -> Dict[str, Any]:
580 """Validate JSON structure depth
582 Args:
583 v (dict): Value to validate
585 Returns:
586 dict: Value if validated as safe
588 Examples:
589 >>> from mcpgateway.schemas import ToolCreate
590 >>> ToolCreate.validate_json_fields({'a': 1})
591 {'a': 1}
592 >>> # Test depth within limit (11 levels, default limit is 30)
593 >>> ToolCreate.validate_json_fields({'a': {'b': {'c': {'d': {'e': {'f': {'g': {'h': {'i': {'j': {'k': 1}}}}}}}}}}})
594 {'a': {'b': {'c': {'d': {'e': {'f': {'g': {'h': {'i': {'j': {'k': 1}}}}}}}}}}}
595 >>> # Test exceeding depth limit (31 levels)
596 >>> deep_31 = {'1': {'2': {'3': {'4': {'5': {'6': {'7': {'8': {'9': {'10': {'11': {'12': {'13': {'14': {'15': {'16': {'17': {'18': {'19': {'20': {'21': {'22': {'23': {'24': {'25': {'26': {'27': {'28': {'29': {'30': {'31': 'too deep'}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}
597 >>> ToolCreate.validate_json_fields(deep_31)
598 Traceback (most recent call last):
599 ...
600 ValueError: ...
601 """
602 SecurityValidator.validate_json_depth(v)
603 return v
605 @field_validator("request_type")
606 @classmethod
607 def validate_request_type(cls, v: str, info: ValidationInfo) -> str:
608 """Validate request type based on integration type (REST, MCP, A2A)
610 Args:
611 v (str): Value to validate
612 info (ValidationInfo): Values used for validation
614 Returns:
615 str: Value if validated as safe
617 Raises:
618 ValueError: When value is unsafe
620 Examples:
621 >>> from pydantic import ValidationInfo
622 >>> # REST integration types with valid methods
623 >>> info_rest = type('obj', (object,), {'data': {'integration_type': 'REST'}})
624 >>> ToolCreate.validate_request_type('POST', info_rest)
625 'POST'
626 >>> ToolCreate.validate_request_type('GET', info_rest)
627 'GET'
628 >>> # MCP integration types with valid transports
629 >>> info_mcp = type('obj', (object,), {'data': {'integration_type': 'MCP'}})
630 >>> ToolCreate.validate_request_type('SSE', info_mcp)
631 'SSE'
632 >>> ToolCreate.validate_request_type('STDIO', info_mcp)
633 'STDIO'
634 >>> # A2A integration type with valid method
635 >>> info_a2a = type('obj', (object,), {'data': {'integration_type': 'A2A'}})
636 >>> ToolCreate.validate_request_type('POST', info_a2a)
637 'POST'
638 >>> # Invalid REST type
639 >>> try:
640 ... ToolCreate.validate_request_type('SSE', info_rest)
641 ... except ValueError as e:
642 ... "not allowed for REST" in str(e)
643 True
644 >>> # Invalid MCP type
645 >>> try:
646 ... ToolCreate.validate_request_type('POST', info_mcp)
647 ... except ValueError as e:
648 ... "not allowed for MCP" in str(e)
649 True
650 >>> # Invalid A2A type
651 >>> try:
652 ... ToolCreate.validate_request_type('GET', info_a2a)
653 ... except ValueError as e:
654 ... "not allowed for A2A" in str(e)
655 True
656 >>> # Invalid integration type
657 >>> info_invalid = type('obj', (object,), {'data': {'integration_type': 'INVALID'}})
658 >>> try:
659 ... ToolCreate.validate_request_type('GET', info_invalid)
660 ... except ValueError as e:
661 ... "Unknown integration type" in str(e)
662 True
663 """
665 integration_type = info.data.get("integration_type")
667 if integration_type not in ["REST", "MCP", "A2A"]:
668 raise ValueError(f"Unknown integration type: {integration_type}")
670 if integration_type == "REST":
671 allowed = ["GET", "POST", "PUT", "DELETE", "PATCH"]
672 if v not in allowed:
673 raise ValueError(f"Request type '{v}' not allowed for REST. Only {allowed} methods are accepted.")
674 elif integration_type == "MCP":
675 allowed = ["SSE", "STDIO", "STREAMABLEHTTP"]
676 if v not in allowed:
677 raise ValueError(f"Request type '{v}' not allowed for MCP. Only {allowed} transports are accepted.")
678 elif integration_type == "A2A":
679 allowed = ["POST"]
680 if v not in allowed:
681 raise ValueError(f"Request type '{v}' not allowed for A2A. Only {allowed} methods are accepted.")
682 return v
@model_validator(mode="before")
@classmethod
def assemble_auth(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """
    Assemble authentication information from separate keys if provided.

    Looks for keys "auth_type", "auth_username", "auth_password", "auth_token", "auth_header_key" and "auth_header_value".
    Constructs the "auth" field as a dictionary suitable for BasicAuth or BearerTokenAuth or HeadersAuth.

    The auth type is matched case-insensitively. Nothing is assembled when
    "auth_type" is missing/empty, equals "one_time_auth", or is not one of
    "basic", "bearer" or "authheaders".

    Args:
        values: Dict with authentication information

    Returns:
        Dict: Reformatted values dict (the same object, updated in place)

    Examples:
        >>> # Test basic auth
        >>> values = {'auth_type': 'basic', 'auth_username': 'user', 'auth_password': 'pass'}
        >>> result = ToolCreate.assemble_auth(values)
        >>> 'auth' in result
        True
        >>> result['auth']['auth_type']
        'basic'

        >>> # Test bearer auth
        >>> values = {'auth_type': 'bearer', 'auth_token': 'mytoken'}
        >>> result = ToolCreate.assemble_auth(values)
        >>> result['auth']['auth_type']
        'bearer'

        >>> # Test authheaders
        >>> values = {'auth_type': 'authheaders', 'auth_header_key': 'X-API-Key', 'auth_header_value': 'secret'}
        >>> result = ToolCreate.assemble_auth(values)
        >>> result['auth']['auth_type']
        'authheaders'

        >>> # Test no auth type
        >>> values = {'name': 'test'}
        >>> result = ToolCreate.assemble_auth(values)
        >>> 'auth' in result
        False
    """
    raw_auth_type = values.get("auth_type")

    # Normalize once. This runs before field validation, so the raw value may
    # not be a string; calling .lower() on it directly would raise
    # AttributeError instead of a clean validation error.
    if isinstance(raw_auth_type, str):
        auth_type = raw_auth_type.lower()
    else:
        auth_type = str(raw_auth_type).lower() if raw_auth_type else ""
    should_assemble = bool(auth_type) and auth_type != "one_time_auth"

    # Only non-secret fields are logged (no password / token / header value).
    logger.debug(
        "Assembling auth in ToolCreate with raw values",
        extra={
            "auth_type": raw_auth_type,
            "auth_username": values.get("auth_username"),
            "auth_header_key": values.get("auth_header_key"),
            "auth_assembled": should_assemble,
        },
    )

    if not should_assemble:
        return values

    if auth_type == "basic":
        creds = base64.b64encode(f"{values.get('auth_username', '')}:{values.get('auth_password', '')}".encode("utf-8")).decode()
        encoded_auth = encode_auth({"Authorization": f"Basic {creds}"})
        values["auth"] = {"auth_type": "basic", "auth_value": encoded_auth}
    elif auth_type == "bearer":
        encoded_auth = encode_auth({"Authorization": f"Bearer {values.get('auth_token', '')}"})
        values["auth"] = {"auth_type": "bearer", "auth_value": encoded_auth}
    elif auth_type == "authheaders":
        header_key = values.get("auth_header_key", "")
        header_value = values.get("auth_header_value", "")
        if header_key and header_value:
            encoded_auth = encode_auth({header_key: header_value})
            values["auth"] = {"auth_type": "authheaders", "auth_value": encoded_auth}
        else:
            # Don't encode empty headers - leave auth empty
            values["auth"] = {"auth_type": "authheaders", "auth_value": None}
    return values
@model_validator(mode="before")
@classmethod
def prevent_manual_mcp_creation(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """
    Reject attempts to create MCP or A2A tools by hand through the API.

    MCP tools are registered by the gateway service when it discovers them on
    an MCP server; A2A tools are created when agents are associated with
    servers. A2A creation is let through only when the payload carries a
    truthy "allow_auto" flag.

    Args:
        values: The input values

    Returns:
        Dict[str, Any]: The validated values, unchanged

    Raises:
        ValueError: If integration_type is "MCP", or is "A2A" without "allow_auto"
    """
    kind = values.get("integration_type")

    if kind == "MCP":
        raise ValueError("Cannot manually create MCP tools. Add MCP servers via the Gateways interface - tools will be auto-discovered and registered with integration_type='MCP'.")

    a2a_blocked = kind == "A2A" and not values.get("allow_auto", False)
    if a2a_blocked:
        raise ValueError("Cannot manually create A2A tools. Add A2A agents via the A2A interface - tools will be auto-created when agents are associated with servers.")

    return values
@model_validator(mode="before")
@classmethod
def enforce_passthrough_fields_for_rest(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """
    Allow the REST passthrough fields only when integration_type is 'REST'.

    For any other integration type, a passthrough field counts as "set" when
    it is present in the payload with a value other than None, [] or {}.

    Args:
        values (Dict[str, Any]): The input values to validate.

    Returns:
        Dict[str, Any]: The validated values, unchanged.

    Raises:
        ValueError: If a passthrough field is set for a non-REST integration_type.
    """
    if values.get("integration_type") == "REST":
        return values

    unset_markers = (None, [], {})
    rest_only_fields = (
        "base_url",
        "path_template",
        "query_mapping",
        "header_mapping",
        "timeout_ms",
        "expose_passthrough",
        "allowlist",
        "plugin_chain_pre",
        "plugin_chain_post",
    )
    for field in rest_only_fields:
        if field not in values:
            continue
        if values[field] not in unset_markers:
            raise ValueError(f"Field '{field}' is only allowed for integration_type 'REST'.")
    return values
@model_validator(mode="before")
@classmethod
def extract_base_url_and_path_template(cls, values: dict) -> dict:
    """
    Derive 'base_url' and 'path_template' from 'url' for REST tools.

    Runs only when integration_type is 'REST' and a 'url' is present. Values
    already supplied for 'base_url' / 'path_template' are never overwritten.
    A derived, non-empty path is normalized to start with a single '/'.

    Args:
        values (dict): The input values to process.

    Returns:
        dict: The updated values with base_url and path_template if applicable.
    """
    if values.get("integration_type") != "REST":
        # Non-REST tools have no passthrough fields to derive.
        return values

    raw_url = values.get("url")
    if not raw_url:
        return values

    parts = urlparse(str(raw_url))
    derived_path = parts.path
    if derived_path:
        # Collapse any run of leading slashes into exactly one.
        derived_path = "/" + derived_path.lstrip("/")

    if not values.get("base_url"):
        values["base_url"] = f"{parts.scheme}://{parts.netloc}"
    if not values.get("path_template"):
        values["path_template"] = derived_path
    return values
@field_validator("base_url")
@classmethod
def validate_base_url(cls, v):
    """
    Check that base_url, when given, parses to a URL with scheme and netloc.

    Args:
        v (str): The base_url value to validate.

    Returns:
        str: The validated base_url value (None is passed through).

    Raises:
        ValueError: If base_url lacks a scheme or a network location.
    """
    if v is not None:
        parts = urlparse(str(v))
        if not (parts.scheme and parts.netloc):
            raise ValueError("base_url must be a valid URL with scheme and netloc")
    return v
@field_validator("path_template")
@classmethod
def validate_path_template(cls, v):
    """
    Check that a non-empty path_template begins with '/'.

    Args:
        v (str): The path_template value to validate.

    Returns:
        str: The validated path_template value (falsy values pass through).

    Raises:
        ValueError: If path_template does not start with '/'.
    """
    if not v:
        return v
    has_leading_slash = str(v).startswith("/")
    if not has_leading_slash:
        raise ValueError("path_template must start with '/'")
    return v
@field_validator("timeout_ms")
@classmethod
def validate_timeout_ms(cls, v):
    """
    Check that timeout_ms, when given, is strictly greater than zero.

    Args:
        v (int): The timeout_ms value to validate.

    Returns:
        int: The validated timeout_ms value (None is passed through).

    Raises:
        ValueError: If timeout_ms is zero or negative.
    """
    if v is None:
        return v
    if v <= 0:
        raise ValueError("timeout_ms must be a positive integer")
    return v
@field_validator("allowlist")
@classmethod
def validate_allowlist(cls, v):
    """
    Check that allowlist is a list whose entries are strings matching the hostname pattern.

    Args:
        v (List[str]): The allowlist to validate.

    Returns:
        List[str]: The validated allowlist (None is passed through).

    Raises:
        ValueError: If allowlist is not a list, or an entry is not a string
            or does not match the module-level ``_HOSTNAME_RE`` pattern.
    """
    if v is None:
        return None
    if not isinstance(v, list):
        raise ValueError("allowlist must be a list of host/scheme strings")

    for entry in v:
        # Type check first so the regex is only ever applied to strings.
        if not isinstance(entry, str):
            raise ValueError(f"Invalid type in allowlist: {entry} (must be str)")
        matched = _HOSTNAME_RE.match(entry)
        if not matched:
            raise ValueError(f"Invalid host/scheme in allowlist: {entry}")
    return v
@field_validator("plugin_chain_pre", "plugin_chain_post")
@classmethod
def validate_plugin_chain(cls, v):
    """
    Check that every plugin named in the chain belongs to the allowed set.

    Args:
        v (List[str]): The plugin chain to validate.

    Returns:
        List[str]: The validated plugin chain (None is passed through).

    Raises:
        ValueError: If any plugin is not in the allowed set.
    """
    if v is None:
        return v

    known_plugins = {"deny_filter", "rate_limit", "pii_filter", "response_shape", "regex_filter", "resource_filter"}
    for name in v:
        if name in known_plugins:
            continue
        raise ValueError(f"Unknown plugin: {name}")
    return v
@model_validator(mode="after")
def handle_timeout_ms_defaults(self):
    """Fill in the default timeout for REST tools that expose passthrough.

    When timeout_ms is still None, integration_type is "REST" and
    expose_passthrough is truthy (treated as True if the attribute is
    missing), timeout_ms is set to 20000.

    Returns:
        self: The validated model instance with timeout_ms potentially set to default.
    """
    needs_default = self.timeout_ms is None and self.integration_type == "REST" and getattr(self, "expose_passthrough", True)
    if needs_default:
        self.timeout_ms = 20000
    return self
class ToolUpdate(BaseModelWithConfigDict):
    """Schema for updating an existing tool.

    Similar to ToolCreate but all fields are optional to allow partial updates.
    """

    name: Optional[str] = Field(None, description="Unique name for the tool")
    displayName: Optional[str] = Field(None, description="Display name for the tool (shown in UI)")  # noqa: N815
    title: Optional[str] = Field(None, max_length=255, description="Human-readable title for the tool (MCP BaseMetadata)")
    custom_name: Optional[str] = Field(None, description="Custom name for the tool")
    url: Optional[Union[str, AnyHttpUrl]] = Field(None, description="Tool endpoint URL")
    description: Optional[str] = Field(None, description="Tool description")
    integration_type: Optional[Literal["REST", "MCP", "A2A"]] = Field(None, description="Tool integration type")
    request_type: Optional[Literal["GET", "POST", "PUT", "DELETE", "PATCH"]] = Field(None, description="HTTP method to be used for invoking the tool")
    headers: Optional[Dict[str, str]] = Field(None, description="Additional headers to send when invoking the tool")
    input_schema: Optional[Dict[str, Any]] = Field(None, description="JSON Schema for validating tool parameters")
    output_schema: Optional[Dict[str, Any]] = Field(None, description="JSON Schema for validating tool output")
    annotations: Optional[Dict[str, Any]] = Field(None, description="Tool annotations for behavior hints")
    jsonpath_filter: Optional[str] = Field(None, description="JSON path filter for rpc tool calls")
    auth: Optional[AuthenticationValues] = Field(None, description="Authentication credentials (Basic or Bearer Token or custom headers) if required")
    gateway_id: Optional[str] = Field(None, description="id of gateway for the tool")
    tags: Optional[List[str]] = Field(None, description="Tags for categorizing the tool")
    visibility: Optional[Literal["private", "team", "public"]] = Field(None, description="Visibility level: private, team, or public")

    # Passthrough REST fields
    base_url: Optional[str] = Field(None, description="Base URL for REST passthrough")
    path_template: Optional[str] = Field(None, description="Path template for REST passthrough")
    query_mapping: Optional[Dict[str, Any]] = Field(None, description="Query mapping for REST passthrough")
    header_mapping: Optional[Dict[str, Any]] = Field(None, description="Header mapping for REST passthrough")
    timeout_ms: Optional[int] = Field(default=None, description="Timeout in milliseconds for REST passthrough (20000 if integration_type='REST', else None)")
    expose_passthrough: Optional[bool] = Field(True, description="Expose passthrough endpoint for this tool")
    allowlist: Optional[List[str]] = Field(None, description="Allowed upstream hosts/schemes for passthrough")
    plugin_chain_pre: Optional[List[str]] = Field(None, description="Pre-plugin chain for passthrough")
    plugin_chain_post: Optional[List[str]] = Field(None, description="Post-plugin chain for passthrough")

    @field_validator("tags")
    @classmethod
    def validate_tags(cls, v: Optional[List[str]]) -> Optional[List[str]]:
        """Validate and normalize tags.

        Args:
            v: Optional list of tag strings to validate

        Returns:
            List of validated tag strings or None if input is None
        """
        return validate_tags_field(v)

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Ensure tool names follow MCP naming conventions

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe
        """
        # NOTE(review): an explicit None is forwarded as-is to
        # SecurityValidator.validate_tool_name - confirm it tolerates None.
        return SecurityValidator.validate_tool_name(v)

    @field_validator("custom_name")
    @classmethod
    def validate_custom_name(cls, v: str) -> str:
        """Ensure custom tool names follow MCP naming conventions

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe
        """
        # NOTE(review): same None caveat as validate_name - confirm.
        return SecurityValidator.validate_tool_name(v)

    @field_validator("url")
    @classmethod
    def validate_url(cls, v: Optional[str]) -> Optional[str]:
        """Validate URL format and ensure safe display

        Args:
            v (Optional[str]): Value to validate

        Returns:
            Optional[str]: Value if validated as safe
        """
        if v is None:
            return v
        return validate_core_url(v, "Tool URL")

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Ensure descriptions display safely, truncate if too long

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe and truncated if too long

        Raises:
            ValueError: When value is unsafe and VALIDATION_STRICT=true (default)

        Note:
            When ``VALIDATION_STRICT=false`` a forbidden pattern does not raise;
            a warning is logged for the first match instead. This allows MCP
            server tools whose descriptions contain Markdown syntax
            (e.g. ``> blockquote``, ``< input``, ``cmd | grep``) to be updated
            successfully.

        Examples:
            >>> from mcpgateway.schemas import ToolUpdate
            >>> ToolUpdate.validate_description('A safe description')
            'A safe description'
            >>> ToolUpdate.validate_description(None)  # Test None case
            >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
            >>> truncated = ToolUpdate.validate_description(long_desc)
            >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
            0
            >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
            True
        """
        if v is None:
            return v

        # Note: backticks (`) and semicolons (;) are allowed as they are commonly used in Markdown
        # for inline code examples in tool descriptions.
        # When VALIDATION_STRICT=false these patterns produce a warning only so
        # that MCP servers with Markdown-formatted descriptions (e.g. "> quote",
        # "< input", "cmd | grep") can be updated without error.
        if settings.tool_description_forbidden_patterns_enabled:
            for pat in settings.tool_description_forbidden_patterns:
                # Skip empty/whitespace-only patterns: "" is "in" every string.
                if not pat or not pat.strip():
                    continue
                if pat in v:
                    if settings.validation_strict:
                        raise ValueError(f"Description contains unsafe characters: '{pat}'")
                    logger.warning("Description contains potentially unsafe characters: '%s' (VALIDATION_STRICT=false, proceeding)", pat)
                    # One warning is enough; stop at the first match.
                    break

        if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
            # Truncate the description to the maximum allowed length
            v = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
            logger.info("Description too long, truncated to %s characters.", SecurityValidator.MAX_DESCRIPTION_LENGTH)
        return SecurityValidator.sanitize_display_text(v, "Description")

    @field_validator("headers", "input_schema", "annotations")
    @classmethod
    def validate_json_fields(cls, v: Dict[str, Any]) -> Dict[str, Any]:
        """Validate JSON structure depth

        Args:
            v (dict): Value to validate

        Returns:
            dict: Value if validated as safe
        """
        SecurityValidator.validate_json_depth(v)
        return v

    @field_validator("request_type")
    @classmethod
    def validate_request_type(cls, v: Optional[str], info: ValidationInfo) -> Optional[str]:
        """Validate request type based on integration type

        A partial update that leaves ``integration_type`` unset is validated
        against the REST method list, and an explicit ``None`` request type
        (meaning "no change") is accepted.

        Args:
            v (Optional[str]): Value to validate
            info (ValidationInfo): Validation context with other field values

        Returns:
            Optional[str]: Value if validated as safe

        Raises:
            ValueError: When the integration type is unknown or the request
                type is not allowed for it
        """
        if v is None:
            return v

        integration_type = info.data.get("integration_type")
        if integration_type is None:
            # The key can be present with the field default (None) when the
            # caller omitted integration_type, so a dict.get() fallback alone
            # does not apply the intended REST default.
            integration_type = "REST"

        if integration_type == "REST":
            allowed = ["GET", "POST", "PUT", "DELETE", "PATCH"]
        elif integration_type == "MCP":
            allowed = ["SSE", "STDIO", "STREAMABLEHTTP"]
        elif integration_type == "A2A":
            allowed = ["POST"]  # A2A agents typically use POST
        else:
            raise ValueError(f"Unknown integration type: {integration_type}")

        if v not in allowed:
            raise ValueError(f"Request type '{v}' not allowed for {integration_type} integration")
        return v

    @model_validator(mode="before")
    @classmethod
    def assemble_auth(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """
        Assemble authentication information from separate keys if provided.

        Looks for keys "auth_type", "auth_username", "auth_password", "auth_token", "auth_header_key" and "auth_header_value".
        Constructs the "auth" field as a dictionary suitable for BasicAuth or BearerTokenAuth or HeadersAuth.

        The auth type is matched case-insensitively. Nothing is assembled when
        "auth_type" is missing/empty, equals "one_time_auth", or is not one of
        "basic", "bearer" or "authheaders".

        Args:
            values: Dict with authentication information

        Returns:
            Dict: Reformatted values dict (the same object, updated in place)
        """
        raw_auth_type = values.get("auth_type")

        # Normalize once. This runs before field validation, so the raw value
        # may not be a string; calling .lower() on it directly would raise
        # AttributeError instead of a clean validation error.
        if isinstance(raw_auth_type, str):
            auth_type = raw_auth_type.lower()
        else:
            auth_type = str(raw_auth_type).lower() if raw_auth_type else ""
        should_assemble = bool(auth_type) and auth_type != "one_time_auth"

        # Only non-secret fields are logged (no password / token / header value).
        logger.debug(
            "Assembling auth in ToolUpdate with raw values",
            extra={
                "auth_type": raw_auth_type,
                "auth_username": values.get("auth_username"),
                "auth_header_key": values.get("auth_header_key"),
                "auth_assembled": should_assemble,
            },
        )

        if not should_assemble:
            return values

        if auth_type == "basic":
            creds = base64.b64encode(f"{values.get('auth_username', '')}:{values.get('auth_password', '')}".encode("utf-8")).decode()
            encoded_auth = encode_auth({"Authorization": f"Basic {creds}"})
            values["auth"] = {"auth_type": "basic", "auth_value": encoded_auth}
        elif auth_type == "bearer":
            encoded_auth = encode_auth({"Authorization": f"Bearer {values.get('auth_token', '')}"})
            values["auth"] = {"auth_type": "bearer", "auth_value": encoded_auth}
        elif auth_type == "authheaders":
            header_key = values.get("auth_header_key", "")
            header_value = values.get("auth_header_value", "")
            if header_key and header_value:
                encoded_auth = encode_auth({header_key: header_value})
                values["auth"] = {"auth_type": "authheaders", "auth_value": encoded_auth}
            else:
                # Don't encode empty headers - leave auth empty
                values["auth"] = {"auth_type": "authheaders", "auth_value": None}
        return values

    @field_validator("displayName")
    @classmethod
    def validate_display_name(cls, v: Optional[str]) -> Optional[str]:
        """Ensure display names display safely

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe

        Raises:
            ValueError: When displayName contains unsafe content or exceeds length limits

        Examples:
            >>> from mcpgateway.schemas import ToolUpdate
            >>> ToolUpdate.validate_display_name('My Custom Tool')
            'My Custom Tool'
            >>> ToolUpdate.validate_display_name('<script>alert("xss")</script>')
            Traceback (most recent call last):
                ...
            ValueError: ...
        """
        if v is None:
            return v
        if len(v) > SecurityValidator.MAX_NAME_LENGTH:
            raise ValueError(f"Display name exceeds maximum length of {SecurityValidator.MAX_NAME_LENGTH}")
        return SecurityValidator.sanitize_display_text(v, "Display name")

    @model_validator(mode="before")
    @classmethod
    def prevent_manual_mcp_update(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """
        Prevent updating tools to MCP or A2A integration type via API.

        MCP tools should only be managed by the gateway service and A2A tools
        by the A2A service. Users should not be able to change a REST tool to
        one of those types manually.

        Args:
            values: The input values

        Returns:
            Dict[str, Any]: The validated values

        Raises:
            ValueError: If attempting to update to MCP or A2A integration type
        """
        integration_type = values.get("integration_type")
        if integration_type == "MCP":
            raise ValueError("Cannot update tools to MCP integration type. MCP tools are managed by the gateway service.")
        if integration_type == "A2A":
            raise ValueError("Cannot update tools to A2A integration type. A2A tools are managed by the A2A service.")
        return values

    @model_validator(mode="before")
    @classmethod
    def extract_base_url_and_path_template(cls, values: dict) -> dict:
        """
        If 'integration_type' is 'REST' and 'url' is provided, extract 'base_url' and 'path_template'.
        Ensures path_template starts with a single '/'.

        Values already supplied for 'base_url' / 'path_template' are never
        overwritten.

        Args:
            values (dict): The input values to process.

        Returns:
            dict: The updated values with base_url and path_template if applicable.
        """
        integration_type = values.get("integration_type")
        url = values.get("url")
        if integration_type == "REST" and url:
            parsed = urlparse(str(url))
            base_url = f"{parsed.scheme}://{parsed.netloc}"
            path_template = parsed.path
            # Ensure path_template starts with a single '/'
            if path_template:
                path_template = "/" + path_template.lstrip("/")
            if not values.get("base_url"):
                values["base_url"] = base_url
            if not values.get("path_template"):
                values["path_template"] = path_template
        return values

    @field_validator("base_url")
    @classmethod
    def validate_base_url(cls, v):
        """
        Validate that base_url is a valid URL with scheme and netloc.

        Args:
            v (str): The base_url value to validate.

        Returns:
            str: The validated base_url value.

        Raises:
            ValueError: If base_url is not a valid URL.
        """
        if v is None:
            return v
        parsed = urlparse(str(v))
        if not parsed.scheme or not parsed.netloc:
            raise ValueError("base_url must be a valid URL with scheme and netloc")
        return v

    @field_validator("path_template")
    @classmethod
    def validate_path_template(cls, v):
        """
        Validate that path_template starts with '/'.

        Args:
            v (str): The path_template value to validate.

        Returns:
            str: The validated path_template value.

        Raises:
            ValueError: If path_template does not start with '/'.
        """
        if v and not str(v).startswith("/"):
            raise ValueError("path_template must start with '/'")
        return v

    @field_validator("timeout_ms")
    @classmethod
    def validate_timeout_ms(cls, v):
        """
        Validate that timeout_ms is a positive integer.

        Args:
            v (int): The timeout_ms value to validate.

        Returns:
            int: The validated timeout_ms value.

        Raises:
            ValueError: If timeout_ms is not a positive integer.
        """
        if v is not None and v <= 0:
            raise ValueError("timeout_ms must be a positive integer")
        return v

    @field_validator("allowlist")
    @classmethod
    def validate_allowlist(cls, v):
        """
        Validate that allowlist is a list and each entry is a valid host or scheme string.

        Args:
            v (List[str]): The allowlist to validate.

        Returns:
            List[str]: The validated allowlist.

        Raises:
            ValueError: If allowlist is not a list or any entry is not a valid host/scheme string.
        """
        if v is None:
            return None
        if not isinstance(v, list):
            raise ValueError("allowlist must be a list of host/scheme strings")
        # Uses precompiled regex for hostname validation
        for host in v:
            if not isinstance(host, str):
                raise ValueError(f"Invalid type in allowlist: {host} (must be str)")
            if not _HOSTNAME_RE.match(host):
                raise ValueError(f"Invalid host/scheme in allowlist: {host}")
        return v

    @field_validator("plugin_chain_pre", "plugin_chain_post")
    @classmethod
    def validate_plugin_chain(cls, v):
        """
        Validate that each plugin in the chain is allowed.

        Args:
            v (List[str]): The plugin chain to validate.

        Returns:
            List[str]: The validated plugin chain.

        Raises:
            ValueError: If any plugin is not in the allowed set.
        """
        allowed_plugins = {"deny_filter", "rate_limit", "pii_filter", "response_shape", "regex_filter", "resource_filter"}
        if v is not None:
            for plugin in v:
                if plugin not in allowed_plugins:
                    raise ValueError(f"Unknown plugin: {plugin}")
        return v
class ToolRead(BaseModelWithConfigDict):
    """Response schema describing a stored tool.

    On top of the tool definition itself this carries:
    - the database ID and creation/update timestamps
    - enabled / reachable status flags
    - the gateway ID used for federation
    - an execution count and aggregated invocation metrics
    - request type and authentication settings
    - audit metadata, team scoping and REST passthrough configuration
    """

    # Core tool definition
    id: str
    original_name: str
    url: Optional[str]
    description: Optional[str]
    original_description: Optional[str] = None
    title: Optional[str] = Field(default=None, max_length=255, description="Human-readable title for the tool (MCP BaseMetadata)")
    request_type: str
    integration_type: str
    headers: Optional[Dict[str, str]]
    input_schema: Dict[str, Any]
    output_schema: Optional[Dict[str, Any]] = None
    annotations: Optional[Dict[str, Any]]
    jsonpath_filter: Optional[str]
    auth: Optional[AuthenticationValues]

    # Lifecycle and status
    created_at: datetime
    updated_at: datetime
    enabled: bool
    reachable: bool
    gateway_id: Optional[str]
    execution_count: Optional[int] = None
    metrics: Optional[ToolMetrics] = None

    # Naming
    name: str
    displayName: Optional[str] = Field(default=None, description="Display name for the tool (shown in UI)")  # noqa: N815
    gateway_slug: str
    custom_name: str
    custom_name_slug: str
    tags: List[Dict[str, str]] = Field(default_factory=list, description="Tags for categorizing the tool")

    # Audit metadata: who created the entity, and how
    created_by: Optional[str] = Field(default=None, description="Username who created this entity")
    created_from_ip: Optional[str] = Field(default=None, description="IP address of creator")
    created_via: Optional[str] = Field(default=None, description="Creation method: ui|api|import|federation")
    created_user_agent: Optional[str] = Field(default=None, description="User agent of creation request")

    # Audit metadata: who last modified the entity, and how
    modified_by: Optional[str] = Field(default=None, description="Username who last modified this entity")
    modified_from_ip: Optional[str] = Field(default=None, description="IP address of last modifier")
    modified_via: Optional[str] = Field(default=None, description="Modification method")
    modified_user_agent: Optional[str] = Field(default=None, description="User agent of modification request")

    # Provenance and versioning
    import_batch_id: Optional[str] = Field(default=None, description="UUID of bulk import batch")
    federation_source: Optional[str] = Field(default=None, description="Source gateway for federated entities")
    version: Optional[int] = Field(default=1, description="Entity version for change tracking")

    # Team scoping
    team_id: Optional[str] = Field(default=None, description="ID of the team that owns this resource")
    team: Optional[str] = Field(default=None, description="Name of the team that owns this resource")
    owner_email: Optional[str] = Field(default=None, description="Email of the user who owns this resource")
    visibility: Optional[Literal["private", "team", "public"]] = Field("public", description="Visibility level: private, team, or public")

    # REST passthrough configuration
    base_url: Optional[str] = Field(default=None, description="Base URL for REST passthrough")
    path_template: Optional[str] = Field(default=None, description="Path template for REST passthrough")
    query_mapping: Optional[Dict[str, Any]] = Field(default=None, description="Query mapping for REST passthrough")
    header_mapping: Optional[Dict[str, Any]] = Field(default=None, description="Header mapping for REST passthrough")
    timeout_ms: Optional[int] = Field(default=20000, description="Timeout in milliseconds for REST passthrough")
    expose_passthrough: Optional[bool] = Field(default=True, description="Expose passthrough endpoint for this tool")
    allowlist: Optional[List[str]] = Field(default=None, description="Allowed upstream hosts/schemes for passthrough")
    plugin_chain_pre: Optional[List[str]] = Field(default=None, description="Pre-plugin chain for passthrough")
    plugin_chain_post: Optional[List[str]] = Field(default=None, description="Post-plugin chain for passthrough")

    # MCP protocol extension field (populated from the "_meta" alias)
    meta: Optional[Dict[str, Any]] = Field(default=None, alias="_meta", description="Optional metadata for protocol extension")

    @field_validator("visibility", mode="before")
    @classmethod
    def _normalize_visibility(cls, v):
        """Pass the raw visibility value through ``_coerce_visibility`` before validation.

        Args:
            v: Raw visibility value as supplied by the caller

        Returns:
            Whatever ``_coerce_visibility`` returns for the raw value
        """
        return _coerce_visibility(v)
1468class ToolInvocation(BaseModelWithConfigDict):
1469 """Schema for tool invocation requests.
1471 This schema validates tool invocation requests to ensure they follow MCP
1472 (Model Context Protocol) naming conventions and prevent security vulnerabilities
1473 such as XSS attacks or deeply nested payloads that could cause DoS.
1475 Captures:
1476 - Tool name to invoke (validated for safety and MCP compliance)
1477 - Arguments matching tool's input schema (validated for depth limits)
1479 Validation Rules:
1480 - Tool names must start with a letter, number, or underscore and contain only
1481 letters, numbers, periods, underscores, hyphens, and slashes (per SEP-986)
1482 - Tool names cannot contain HTML special characters (<, >, ", ')
1483 - Arguments are validated to prevent excessively deep nesting (default max: 10 levels)
1485 Attributes:
1486 name (str): Name of the tool to invoke. Must follow MCP naming conventions.
1487 arguments (Dict[str, Any]): Arguments to pass to the tool. Must match the
1488 tool's input schema and not exceed depth limits.
1490 Examples:
1491 >>> from pydantic import ValidationError
1492 >>> # Valid tool invocation
1493 >>> tool_inv = ToolInvocation(name="get_weather", arguments={"city": "London"})
1494 >>> tool_inv.name
1495 'get_weather'
1496 >>> tool_inv.arguments
1497 {'city': 'London'}
1499 >>> # Valid tool name with underscores and numbers
1500 >>> tool_inv = ToolInvocation(name="tool_v2_beta", arguments={})
1501 >>> tool_inv.name
1502 'tool_v2_beta'
1504 >>> # Invalid: Tool name with special characters
1505 >>> try:
1506 ... ToolInvocation(name="tool-name!", arguments={})
1507 ... except ValidationError as e:
1508 ... print("Validation failed: Special characters not allowed")
1509 Validation failed: Special characters not allowed
1511 >>> # Invalid: XSS attempt in tool name
1512 >>> try:
1513 ... ToolInvocation(name="<script>alert('XSS')</script>", arguments={})
1514 ... except ValidationError as e:
1515 ... print("Validation failed: HTML tags not allowed")
1516 Validation failed: HTML tags not allowed
1518 >>> # Valid: Tool name starting with number (per MCP spec)
1519 >>> tool_num = ToolInvocation(name="123_tool", arguments={})
1520 >>> tool_num.name
1521 '123_tool'
1523 >>> # Valid: Tool name starting with underscore (per MCP spec)
1524 >>> tool_underscore = ToolInvocation(name="_5gpt_query", arguments={})
1525 >>> tool_underscore.name
1526 '_5gpt_query'
1528 >>> # Invalid: Tool name starting with hyphen
1529 >>> try:
1530 ... ToolInvocation(name="-invalid_tool", arguments={})
1531 ... except ValidationError as e:
1532 ... print("Validation failed: Must start with letter, number, or underscore")
1533 Validation failed: Must start with letter, number, or underscore
1535 >>> # Valid: Complex but not too deep arguments
1536 >>> args = {"level1": {"level2": {"level3": {"data": "value"}}}}
1537 >>> tool_inv = ToolInvocation(name="process_data", arguments=args)
1538 >>> tool_inv.arguments["level1"]["level2"]["level3"]["data"]
1539 'value'
1541 >>> # Invalid: Arguments too deeply nested (>30 levels)
1542 >>> deep_args = {"a": {"b": {"c": {"d": {"e": {"f": {"g": {"h": {"i": {"j": {"k": {"l": {"m": {"n": {"o": {"p": {"q": {"r": {"s": {"t": {"u": {"v": {"w": {"x": {"y": {"z": {"aa": {"bb": {"cc": {"dd": {"ee": "too deep"}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}
1543 >>> try:
1544 ... ToolInvocation(name="process_data", arguments=deep_args)
1545 ... except ValidationError as e:
1546 ... print("Validation failed: Exceeds maximum depth")
1547 Validation failed: Exceeds maximum depth
1549 >>> # Edge case: Empty tool name
1550 >>> try:
1551 ... ToolInvocation(name="", arguments={})
1552 ... except ValidationError as e:
1553 ... print("Validation failed: Name cannot be empty")
1554 Validation failed: Name cannot be empty
1556 >>> # Valid: Tool name made of letters and underscores
1557 >>> tool_inv = ToolInvocation(name="get_user_info", arguments={"id": 123})
1558 >>> tool_inv.name
1559 'get_user_info'
1561 >>> # Arguments with various types
1562 >>> args = {
1563 ... "string": "value",
1564 ... "number": 42,
1565 ... "boolean": True,
1566 ... "array": [1, 2, 3],
1567 ... "nested": {"key": "value"}
1568 ... }
1569 >>> tool_inv = ToolInvocation(name="complex_tool", arguments=args)
1570 >>> tool_inv.arguments["number"]
1571 42
1572 """
# Required name of the tool to invoke; checked by the validate_name field validator.
name: str = Field(..., description="Name of tool to invoke")
# Call arguments; defaults to a fresh empty dict and is checked by the validate_arguments field validator.
arguments: Dict[str, Any] = Field(default_factory=dict, description="Arguments matching tool's input schema")
@field_validator("name")
@classmethod
def validate_name(cls, v: str) -> str:
    """Validate the tool name by delegating to ``SecurityValidator.validate_tool_name``.

    The concrete rules live in ``SecurityValidator``. The examples in the class
    docstring show the intended outcome:
    - Empty names are rejected
    - Names may start with a letter, a number, or an underscore, but not a hyphen
    - Special characters such as ``!`` and HTML tags are rejected

    NOTE(review): any maximum-length limit is enforced inside
    ``SecurityValidator.validate_tool_name`` and is not visible here - confirm
    the exact value there.

    Args:
        v (str): Tool name to validate

    Returns:
        str: The value returned by ``SecurityValidator.validate_tool_name``

    Raises:
        ValueError: If the tool name violates any validation rules (presumably
            raised by the delegate; the class examples catch the resulting
            ``ValidationError``)
    """
    return SecurityValidator.validate_tool_name(v)
@field_validator("arguments")
@classmethod
def validate_arguments(cls, v: Dict[str, Any]) -> Dict[str, Any]:
    """Validate arguments structure depth to prevent DoS attacks.

    Passes the arguments dictionary to ``SecurityValidator.validate_json_depth``
    so that excessively nested payloads are rejected.

    NOTE(review): the maximum depth is decided inside
    ``SecurityValidator.validate_json_depth`` and is not visible here; the
    class examples describe payloads nested more than 30 levels as too deep -
    confirm the configured limit.

    Args:
        v (dict): Arguments dictionary to validate

    Returns:
        dict: The same arguments object, unchanged, once the depth check passes

    Raises:
        ValueError: If the arguments exceed the maximum allowed depth
            (presumably raised by the delegate)
    """
    # The return value of the depth check is ignored; it is called for its raise-on-failure effect.
    SecurityValidator.validate_json_depth(v)
    return v
class ToolResult(BaseModelWithConfigDict):
    """Schema for tool invocation results.

    Supports:
    - Multiple content types (text/image)
    - Optional structured (dictionary) content
    - Error reporting
    - Optional error messages
    """

    # Required list of content items; each item is text or image content.
    content: List[Union[TextContent, ImageContent]]
    # Optional dictionary payload; None when not supplied.
    structured_content: Optional[Dict[str, Any]] = None
    # Error flag; defaults to False.
    is_error: bool = False
    # Optional error text; None when not supplied.
    error_message: Optional[str] = None
class ResourceCreate(BaseModel):
    """
    Schema for creating a new resource.

    Attributes:
        model_config (ConfigDict): Configuration for the model (strips string whitespace, allows population by field name).
        uri (str): Unique URI for the resource.
        name (str): Human-readable name for the resource.
        description (Optional[str]): Optional description of the resource.
        title (Optional[str]): Optional human-readable title (at most 255 characters).
        mime_type (Optional[str]): Optional MIME type of the resource (alias ``mimeType``).
        uri_template (Optional[str]): Optional URI template for parameterized resources.
        content (Union[str, bytes]): Content of the resource, which can be text or binary.
        tags (Optional[List[str]]): Tags for categorizing the resource.
        team_id (Optional[str]): Team ID for resource organization.
        owner_email (Optional[str]): Email of the resource owner.
        visibility (Optional[str]): One of "private", "team" or "public".
        gateway_id (Optional[str]): ID of the gateway for the resource.
    """

    model_config = ConfigDict(str_strip_whitespace=True, populate_by_name=True)

    uri: str = Field(..., description="Unique URI for the resource")
    name: str = Field(..., description="Human-readable resource name")
    description: Optional[str] = Field(None, description="Resource description")
    title: Optional[str] = Field(None, max_length=255, description="Human-readable title for the resource (MCP BaseMetadata)")
    mime_type: Optional[str] = Field(None, alias="mimeType", description="Resource MIME type")
    uri_template: Optional[str] = Field(None, description="URI template for parameterized resources")
    content: Union[str, bytes] = Field(..., description="Resource content (text or binary)")
    tags: Optional[List[str]] = Field(default_factory=list, description="Tags for categorizing the resource")

    # Team scoping fields
    team_id: Optional[str] = Field(None, description="Team ID for resource organization")
    owner_email: Optional[str] = Field(None, description="Email of the resource owner")
    visibility: Optional[Literal["private", "team", "public"]] = Field(default=None, description="Visibility level: private, team, or public")
    gateway_id: Optional[str] = Field(None, description="ID of the gateway for the resource")

    @field_validator("tags")
    @classmethod
    def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
        """Validate and normalize tags.

        Args:
            v: Optional list of tag strings to validate

        Returns:
            The result of ``validate_tags_field`` for the given tags
        """
        return validate_tags_field(v)

    @field_validator("uri")
    @classmethod
    def validate_uri(cls, v: str) -> str:
        """Validate URI format

        Args:
            v (str): Value to validate

        Returns:
            str: The result of ``SecurityValidator.validate_uri``
        """
        return SecurityValidator.validate_uri(v, "Resource URI")

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Validate resource name

        Args:
            v (str): Value to validate

        Returns:
            str: The result of ``SecurityValidator.validate_name``
        """
        return SecurityValidator.validate_name(v, "Resource name")

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Ensure descriptions display safely, truncate if too long

        Args:
            v (str): Value to validate

        Returns:
            str: None for None input; otherwise the (possibly truncated) text
            after ``SecurityValidator.sanitize_display_text``

        Raises:
            ValueError: When value is unsafe (presumably raised by the sanitizer)

        Examples:
            >>> from mcpgateway.schemas import ResourceCreate
            >>> ResourceCreate.validate_description('A safe description')
            'A safe description'
            >>> ResourceCreate.validate_description(None) # Test None case
            >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
            >>> truncated = ResourceCreate.validate_description(long_desc)
            >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
            0
            >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
            True
        """
        if v is None:
            return v
        if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
            # Over-long descriptions are cut down to the limit rather than rejected.
            v = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
            logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
        return SecurityValidator.sanitize_display_text(v, "Description")

    @field_validator("mime_type")
    @classmethod
    def validate_mime_type(cls, v: Optional[str]) -> Optional[str]:
        """Validate MIME type format

        Args:
            v (str): Value to validate

        Returns:
            str: None for None input; otherwise the result of
            ``SecurityValidator.validate_mime_type``
        """
        if v is None:
            return v
        return SecurityValidator.validate_mime_type(v)

    @field_validator("content")
    @classmethod
    def validate_content(cls, v: Optional[Union[str, bytes]]) -> Optional[Union[str, bytes]]:
        """Validate content safety.

        Note: Size validation is performed at the service layer using configurable limits.
        This validator only checks encoding and dangerous patterns.

        Args:
            v (Union[str, bytes]): Value to validate

        Returns:
            Union[str, bytes]: The original value (not the decoded text) if validated as safe

        Raises:
            ValueError: When bytes content is not valid UTF-8, or when the text
                matches ``SecurityValidator.DANGEROUS_HTML_PATTERN``
        """
        if v is None:
            return v

        # Bytes must decode as UTF-8 so the pattern check below can run on text.
        if isinstance(v, bytes):
            try:
                text = v.decode("utf-8")
            except UnicodeDecodeError as exc:
                # Chain explicitly so the decoding failure stays attached as the cause.
                raise ValueError("Content must be UTF-8 decodable") from exc
        else:
            text = v

        # Check for dangerous HTML patterns
        # Runtime pattern matching (not precompiled to allow test monkeypatching)
        if re.search(SecurityValidator.DANGEROUS_HTML_PATTERN, text, re.IGNORECASE):
            raise ValueError("Content contains HTML tags that may cause display issues")

        return v
class ResourceUpdate(BaseModelWithConfigDict):
    """Schema for updating an existing resource.

    Similar to ResourceCreate but URI is not required and all fields are optional.
    Every field defaults to None.
    """

    uri: Optional[str] = Field(None, description="Unique URI for the resource")
    name: Optional[str] = Field(None, description="Human-readable resource name")
    description: Optional[str] = Field(None, description="Resource description")
    title: Optional[str] = Field(None, max_length=255, description="Human-readable title for the resource (MCP BaseMetadata)")
    mime_type: Optional[str] = Field(None, description="Resource MIME type")
    uri_template: Optional[str] = Field(None, description="URI template for parameterized resources")
    content: Optional[Union[str, bytes]] = Field(None, description="Resource content (text or binary)")
    tags: Optional[List[str]] = Field(None, description="Tags for categorizing the resource")

    # Team scoping fields
    team_id: Optional[str] = Field(None, description="Team ID for resource organization")
    owner_email: Optional[str] = Field(None, description="Email of the resource owner")
    visibility: Optional[Literal["private", "team", "public"]] = Field(None, description="Visibility level: private, team, or public")

    @field_validator("tags")
    @classmethod
    def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
        """Validate and normalize tags.

        Args:
            v: Optional list of tag strings to validate

        Returns:
            The result of ``validate_tags_field`` for the given tags
        """
        return validate_tags_field(v)

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: Optional[str]) -> Optional[str]:
        """Validate resource name

        Args:
            v (Optional[str]): Value to validate

        Returns:
            Optional[str]: None for None input; otherwise the result of
            ``SecurityValidator.validate_name``
        """
        # The field is Optional on this schema, so None is passed through
        # untouched, matching the description/mime_type/content validators.
        if v is None:
            return v
        return SecurityValidator.validate_name(v, "Resource name")

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Ensure descriptions display safely, truncate if too long

        Args:
            v (str): Value to validate

        Returns:
            str: None for None input; otherwise the (possibly truncated) text
            after ``SecurityValidator.sanitize_display_text``

        Raises:
            ValueError: When value is unsafe (presumably raised by the sanitizer)

        Examples:
            >>> from mcpgateway.schemas import ResourceUpdate
            >>> ResourceUpdate.validate_description('A safe description')
            'A safe description'
            >>> ResourceUpdate.validate_description(None) # Test None case
            >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
            >>> truncated = ResourceUpdate.validate_description(long_desc)
            >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
            0
            >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
            True
        """
        if v is None:
            return v
        if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
            # Over-long descriptions are cut down to the limit rather than rejected.
            v = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
            logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
        return SecurityValidator.sanitize_display_text(v, "Description")

    @field_validator("mime_type")
    @classmethod
    def validate_mime_type(cls, v: Optional[str]) -> Optional[str]:
        """Validate MIME type format

        Args:
            v (str): Value to validate

        Returns:
            str: None for None input; otherwise the result of
            ``SecurityValidator.validate_mime_type``
        """
        if v is None:
            return v
        return SecurityValidator.validate_mime_type(v)

    @field_validator("content")
    @classmethod
    def validate_content(cls, v: Optional[Union[str, bytes]]) -> Optional[Union[str, bytes]]:
        """Validate content safety.

        Note: Size validation is performed at the service layer using configurable limits.
        This validator only checks encoding and dangerous patterns.

        Args:
            v (Union[str, bytes]): Value to validate

        Returns:
            Union[str, bytes]: The original value (not the decoded text) if validated as safe

        Raises:
            ValueError: When bytes content is not valid UTF-8, or when the text
                matches ``SecurityValidator.DANGEROUS_HTML_PATTERN``
        """
        if v is None:
            return v

        # Bytes must decode as UTF-8 so the pattern check below can run on text.
        if isinstance(v, bytes):
            try:
                text = v.decode("utf-8")
            except UnicodeDecodeError as exc:
                # Chain explicitly so the decoding failure stays attached as the cause.
                raise ValueError("Content must be UTF-8 decodable") from exc
        else:
            text = v

        # Check for dangerous HTML patterns
        # Runtime pattern matching (not precompiled to allow test monkeypatching)
        if re.search(SecurityValidator.DANGEROUS_HTML_PATTERN, text, re.IGNORECASE):
            raise ValueError("Content contains HTML tags that may cause display issues")

        return v
class ResourceRead(BaseModelWithConfigDict):
    """Schema for reading resource information.

    Includes all resource fields plus:
    - Database ID
    - Content size
    - Creation/update timestamps
    - Active status
    - Metrics: Aggregated metrics for the resource invocations.
    - Audit metadata, team scoping and MCP protocol fields
    """

    # Core resource fields. Those declared without a default have no fallback value here.
    id: str = Field(description="Unique ID of the resource")
    uri: str
    name: str
    description: Optional[str]
    mime_type: Optional[str]
    gateway_id: Optional[str] = Field(None, description="ID of the gateway for the resource")
    uri_template: Optional[str] = Field(None, description="URI template for parameterized resources")
    size: Optional[int]
    created_at: datetime
    updated_at: datetime
    enabled: bool
    metrics: Optional[ResourceMetrics] = Field(None, description="Resource metrics (may be None in list operations)")
    tags: List[str] = Field(default_factory=list, description="Tags for categorizing the resource")

    # Comprehensive metadata for audit tracking
    created_by: Optional[str] = Field(None, description="Username who created this entity")
    created_from_ip: Optional[str] = Field(None, description="IP address of creator")
    created_via: Optional[str] = Field(None, description="Creation method: ui|api|import|federation")
    created_user_agent: Optional[str] = Field(None, description="User agent of creation request")

    modified_by: Optional[str] = Field(None, description="Username who last modified this entity")
    modified_from_ip: Optional[str] = Field(None, description="IP address of last modifier")
    modified_via: Optional[str] = Field(None, description="Modification method")
    modified_user_agent: Optional[str] = Field(None, description="User agent of modification request")

    import_batch_id: Optional[str] = Field(None, description="UUID of bulk import batch")
    federation_source: Optional[str] = Field(None, description="Source gateway for federated entities")
    # Defaults to 1 when no version is supplied.
    version: Optional[int] = Field(1, description="Entity version for change tracking")

    # Team scoping fields
    team_id: Optional[str] = Field(None, description="ID of the team that owns this resource")
    team: Optional[str] = Field(None, description="Name of the team that owns this resource")
    owner_email: Optional[str] = Field(None, description="Email of the user who owns this resource")
    # Unlike the create/update schemas, visibility defaults to "public" here.
    visibility: Optional[Literal["private", "team", "public"]] = Field(default="public", description="Visibility level: private, team, or public")

    # MCP protocol fields
    title: Optional[str] = Field(None, max_length=255, description="Human-readable title for the resource")
    annotations: Optional[Annotations] = Field(None, description="Optional annotations for client rendering hints")
    # Exposed under the alias "_meta".
    meta: Optional[Dict[str, Any]] = Field(None, alias="_meta", description="Optional metadata for protocol extension")

    # "before"-mode validator: the raw visibility value is passed through
    # _coerce_visibility (not defined in this class) before the Literal check runs.
    _normalize_visibility = field_validator("visibility", mode="before")(classmethod(lambda cls, v: _coerce_visibility(v)))
class ResourceSubscription(BaseModelWithConfigDict):
    """Schema for resource subscriptions.

    This schema validates resource subscription requests to ensure URIs are safe
    and subscriber IDs follow proper formatting rules. It prevents various
    injection attacks and ensures data consistency.

    Tracks:
    - Resource URI being subscribed to (validated for safety)
    - Unique subscriber identifier (validated for proper format)

    Validation Rules:
    - URIs are checked by ``SecurityValidator.validate_uri``; the examples below
      show HTML special characters, quotes and directory traversal sequences (..)
      being rejected
    - Subscriber IDs made only of alphanumeric characters, underscores, hyphens,
      dots, ``@`` and ``+`` (which covers email-style IDs) are accepted directly,
      subject to ``SecurityValidator.MAX_NAME_LENGTH``
    - Any other subscriber ID is handed to ``SecurityValidator.validate_identifier``

    Attributes:
        uri (str): URI of the resource to subscribe to. Must be a safe, valid URI.
        subscriber_id (str): Unique identifier for the subscriber. Must follow
            identifier naming conventions.

    Examples:
        >>> from pydantic import ValidationError
        >>> # Valid subscription
        >>> sub = ResourceSubscription(uri="/api/v1/users/123", subscriber_id="client_001")
        >>> sub.uri
        '/api/v1/users/123'
        >>> sub.subscriber_id
        'client_001'

        >>> # Valid URI with query parameters
        >>> sub = ResourceSubscription(uri="/data?type=json&limit=10", subscriber_id="app.service.1")
        >>> sub.uri
        '/data?type=json&limit=10'

        >>> # Valid subscriber ID with dots (common for service names)
        >>> sub = ResourceSubscription(uri="/events", subscriber_id="com.example.service")
        >>> sub.subscriber_id
        'com.example.service'

        >>> # Invalid: XSS attempt in URI
        >>> try:
        ...     ResourceSubscription(uri="<script>alert('XSS')</script>", subscriber_id="sub1")
        ... except ValidationError as e:
        ...     print("Validation failed: HTML characters not allowed")
        Validation failed: HTML characters not allowed

        >>> # Invalid: Directory traversal in URI
        >>> try:
        ...     ResourceSubscription(uri="/api/../../../etc/passwd", subscriber_id="sub1")
        ... except ValidationError as e:
        ...     print("Validation failed: Directory traversal detected")
        Validation failed: Directory traversal detected

        >>> # Invalid: SQL injection attempt in URI
        >>> try:
        ...     ResourceSubscription(uri="/users'; DROP TABLE users;--", subscriber_id="sub1")
        ... except ValidationError as e:
        ...     print("Validation failed: Invalid characters in URI")
        Validation failed: Invalid characters in URI

        >>> # Invalid: Special characters in subscriber ID
        >>> try:
        ...     ResourceSubscription(uri="/api/data", subscriber_id="sub@123!")
        ... except ValidationError as e:
        ...     print("Validation failed: Invalid subscriber ID format")
        Validation failed: Invalid subscriber ID format

        >>> # Invalid: Empty URI
        >>> try:
        ...     ResourceSubscription(uri="", subscriber_id="sub1")
        ... except ValidationError as e:
        ...     print("Validation failed: URI cannot be empty")
        Validation failed: URI cannot be empty

        >>> # Invalid: Empty subscriber ID
        >>> try:
        ...     ResourceSubscription(uri="/api/data", subscriber_id="")
        ... except ValidationError as e:
        ...     print("Validation failed: Subscriber ID cannot be empty")
        Validation failed: Subscriber ID cannot be empty

        >>> # Valid: Complex but safe URI
        >>> sub = ResourceSubscription(
        ...     uri="/api/v2/resources/category:items/filter?status=active&limit=50",
        ...     subscriber_id="monitor-service-01"
        ... )
        >>> sub.uri
        '/api/v2/resources/category:items/filter?status=active&limit=50'

        >>> # Edge case: Maximum length validation (simulated)
        >>> long_uri = "/" + "a" * 254  # Just under limit
        >>> sub = ResourceSubscription(uri=long_uri, subscriber_id="sub1")
        >>> len(sub.uri)
        255

        >>> # Invalid: Quotes in URI (could break out of attributes)
        >>> try:
        ...     ResourceSubscription(uri='/api/data"onclick="alert(1)', subscriber_id="sub1")
        ... except ValidationError as e:
        ...     print("Validation failed: Quotes not allowed in URI")
        Validation failed: Quotes not allowed in URI
    """

    uri: str = Field(..., description="URI of resource to subscribe to")
    subscriber_id: str = Field(..., description="Unique subscriber identifier")

    @field_validator("uri")
    @classmethod
    def validate_uri(cls, v: str) -> str:
        """Validate URI format for safety and correctness.

        Delegates to ``SecurityValidator.validate_uri`` with the label
        "Resource URI". The class examples show empty URIs, HTML special
        characters, quotes and directory traversal sequences being rejected.

        Args:
            v (str): URI to validate

        Returns:
            str: The result of ``SecurityValidator.validate_uri``

        Raises:
            ValueError: If the URI contains dangerous patterns or invalid characters
                (presumably raised by the delegate)
        """
        return SecurityValidator.validate_uri(v, "Resource URI")

    @field_validator("subscriber_id")
    @classmethod
    def validate_subscriber_id(cls, v: str) -> str:
        """Validate subscriber ID format.

        Checks, in order:
        - The ID is not empty
        - If the whole ID consists of ``A-Z a-z 0-9 _ . @ + -`` (email-style IDs
          included), it must not match ``SecurityValidator.VALIDATION_UNSAFE_URI_PATTERN``
          and must not exceed ``SecurityValidator.MAX_NAME_LENGTH``; it is then
          returned unchanged
        - Otherwise the ID is handed to ``SecurityValidator.validate_identifier``

        Args:
            v (str): Subscriber ID to validate

        Returns:
            str: The validated subscriber ID if it passes all checks

        Raises:
            ValueError: If the subscriber ID is empty, contains unsafe characters,
                or is too long
        """
        if not v:
            raise ValueError("Subscriber ID cannot be empty")

        # Allow email-like subscriber IDs while keeping strict character controls.
        # fullmatch is used instead of match with "$": "$" also matches just before
        # a trailing newline, which would let "id\n" through this fast path.
        if re.fullmatch(r"[A-Za-z0-9_.@+-]+", v):
            if re.search(SecurityValidator.VALIDATION_UNSAFE_URI_PATTERN, v):
                raise ValueError("Subscriber ID cannot contain HTML special characters")
            if len(v) > SecurityValidator.MAX_NAME_LENGTH:
                raise ValueError(f"Subscriber ID exceeds maximum length of {SecurityValidator.MAX_NAME_LENGTH}")
            return v

        return SecurityValidator.validate_identifier(v, "Subscriber ID")
class ResourceNotification(BaseModelWithConfigDict):
    """Schema for resource update notifications.

    Carries:
    - The resource URI
    - The updated content
    - The update timestamp (defaults to the current time in UTC)
    """

    uri: str
    content: ResourceContent
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    @field_serializer("timestamp")
    def serialize_timestamp(self, dt: datetime) -> str:
        """Render the `timestamp` field as an ISO 8601 string in UTC.

        The datetime is converted to UTC, formatted with ``isoformat()``, and
        the "+00:00" offset is replaced with "Z".

        Args:
            dt (datetime): The datetime object to serialize.

        Returns:
            str: ISO 8601 formatted string in UTC, ending with 'Z'.
        """
        utc_moment = dt.astimezone(timezone.utc)
        iso_text = utc_moment.isoformat()
        return iso_text.replace("+00:00", "Z")
2184# --- Prompt Schemas ---
class PromptArgument(BaseModelWithConfigDict):
    """Schema for prompt template arguments.

    Defines:
    - Argument name
    - Optional description
    - Required flag
    """

    # Required argument name.
    name: str = Field(..., description="Argument name")
    # Optional free-text description; None when not supplied.
    description: Optional[str] = Field(None, description="Argument description")
    # Whether the argument is required; defaults to False.
    required: bool = Field(default=False, description="Whether argument is required")

    # Use base config; example metadata removed to avoid config merging type issues in static checks
class PromptCreate(BaseModelWithConfigDict):
    """
    Schema for creating a new prompt.

    Attributes:
        model_config (ConfigDict): Base model configuration plus string whitespace stripping.
        name (str): Unique name for the prompt.
        custom_name (Optional[str]): Custom prompt name used for MCP invocation.
        display_name (Optional[str]): Display name for the prompt (shown in UI).
        title (Optional[str]): Optional human-readable title (at most 255 characters).
        description (Optional[str]): Optional description of the prompt.
        template (str): Template text for the prompt.
        arguments (List[PromptArgument]): List of arguments for the template.
        tags (Optional[List[str]]): Tags for categorizing the prompt.
        team_id (Optional[str]): Team ID for resource organization.
        owner_email (Optional[str]): Email of the prompt owner.
        visibility (Optional[str]): One of "private", "team" or "public".
        gateway_id (Optional[str]): ID of the gateway for the prompt.
    """

    model_config = ConfigDict(**dict(BaseModelWithConfigDict.model_config), str_strip_whitespace=True)

    name: str = Field(..., description="Unique name for the prompt")
    custom_name: Optional[str] = Field(None, description="Custom prompt name used for MCP invocation")
    display_name: Optional[str] = Field(None, description="Display name for the prompt (shown in UI)")
    title: Optional[str] = Field(None, max_length=255, description="Human-readable title for the prompt (MCP BaseMetadata)")
    description: Optional[str] = Field(None, description="Prompt description")
    template: str = Field(..., description="Prompt template text")
    arguments: List[PromptArgument] = Field(default_factory=list, description="List of arguments for the template")
    tags: Optional[List[str]] = Field(default_factory=list, description="Tags for categorizing the prompt")

    # Team scoping fields
    team_id: Optional[str] = Field(None, description="Team ID for resource organization")
    owner_email: Optional[str] = Field(None, description="Email of the prompt owner")
    visibility: Optional[Literal["private", "team", "public"]] = Field(default=None, description="Visibility level: private, team, or public")
    gateway_id: Optional[str] = Field(None, description="ID of the gateway for the prompt")

    @field_validator("tags")
    @classmethod
    def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
        """Validate and normalize tags.

        Args:
            v: Optional list of tag strings to validate

        Returns:
            The result of ``validate_tags_field`` for the given tags
        """
        return validate_tags_field(v)

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Ensure prompt names display correctly in UI

        Args:
            v (str): Value to validate

        Returns:
            str: The result of ``SecurityValidator.validate_name``
        """
        return SecurityValidator.validate_name(v, "Prompt name")

    @field_validator("custom_name")
    @classmethod
    def validate_custom_name(cls, v: Optional[str]) -> Optional[str]:
        """Validate the custom prompt name with the same check as ``name``.

        Args:
            v: Custom prompt name to validate.

        Returns:
            The validated custom name or None.
        """
        if v is None:
            return v
        return SecurityValidator.validate_name(v, "Prompt name")

    @field_validator("display_name")
    @classmethod
    def validate_display_name(cls, v: Optional[str]) -> Optional[str]:
        """Ensure display names render safely in UI.

        Args:
            v: Display name to validate.

        Returns:
            The validated display name or None.
        """
        if v is None:
            return v
        return SecurityValidator.sanitize_display_text(v, "Prompt display name")

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Ensure descriptions display safely, truncate if too long

        Args:
            v (str): Value to validate

        Returns:
            str: None for None input; otherwise the (possibly truncated) text
            after ``SecurityValidator.sanitize_display_text``

        Raises:
            ValueError: When value is unsafe (presumably raised by the sanitizer)

        Examples:
            >>> from mcpgateway.schemas import PromptCreate
            >>> PromptCreate.validate_description('A safe description')
            'A safe description'
            >>> PromptCreate.validate_description(None) # Test None case
            >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
            >>> truncated = PromptCreate.validate_description(long_desc)
            >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
            0
            >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
            True
        """
        if v is None:
            return v
        if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
            # Over-long descriptions are cut down to the limit rather than rejected.
            v = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
            logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
        return SecurityValidator.sanitize_display_text(v, "Description")

    @field_validator("template")
    @classmethod
    def validate_template(cls, v: str) -> str:
        """Validate template content for safe display

        Args:
            v (str): Value to validate

        Returns:
            str: The result of ``SecurityValidator.validate_template``
        """
        return SecurityValidator.validate_template(v)

    @field_validator("arguments")
    @classmethod
    def validate_arguments(cls, v: List[PromptArgument]) -> List[PromptArgument]:
        """Check the prompt arguments against the JSON nesting-depth limit.

        The ``arguments`` field is declared as ``List[PromptArgument]``, so the
        annotation reflects a list rather than a dictionary.

        Args:
            v (List[PromptArgument]): Prompt arguments to validate

        Returns:
            List[PromptArgument]: The same list, unchanged, once the depth check passes
        """
        # The return value of the depth check is ignored; it is called for its raise-on-failure effect.
        SecurityValidator.validate_json_depth(v)
        return v
class PromptExecuteArgs(BaseModel):
    """
    Schema for the arguments supplied when executing a prompt.

    Attributes:
        args (Dict[str, str]): Arguments for prompt execution.
    """

    model_config = ConfigDict(str_strip_whitespace=True)

    args: Dict[str, str] = Field(default_factory=dict, description="Arguments for prompt execution")

    @field_validator("args")
    @classmethod
    def validate_args(cls, v: dict) -> dict:
        """Run the XSS check on every argument value.

        Only the values are checked; keys are not inspected here.

        Args:
            v (dict): Value to validate

        Returns:
            dict: The same mapping, unchanged, once every value has been checked
        """
        label = "Prompt execution arguments"
        for candidate in v.values():
            SecurityValidator.validate_no_xss(candidate, label)
        return v
class PromptUpdate(BaseModelWithConfigDict):
    """Schema for updating an existing prompt.

    Similar to PromptCreate but all fields are optional to allow partial updates.
    Every field defaults to None.
    """

    # Prompt identity and content; each may be omitted.
    name: Optional[str] = Field(None, description="Unique name for the prompt")
    custom_name: Optional[str] = Field(None, description="Custom prompt name used for MCP invocation")
    display_name: Optional[str] = Field(None, description="Display name for the prompt (shown in UI)")
    title: Optional[str] = Field(None, max_length=255, description="Human-readable title for the prompt (MCP BaseMetadata)")
    description: Optional[str] = Field(None, description="Prompt description")
    template: Optional[str] = Field(None, description="Prompt template text")
    arguments: Optional[List[PromptArgument]] = Field(None, description="List of arguments for the template")

    # Unlike PromptCreate, tags default to None rather than an empty list.
    tags: Optional[List[str]] = Field(None, description="Tags for categorizing the prompt")

    # Team scoping fields
    team_id: Optional[str] = Field(None, description="Team ID for resource organization")
    owner_email: Optional[str] = Field(None, description="Email of the prompt owner")
    visibility: Optional[Literal["private", "team", "public"]] = Field(None, description="Visibility level: private, team, or public")
@field_validator("tags")
@classmethod
def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
    """Normalize the supplied tags through ``validate_tags_field``.

    Args:
        v: Optional list of tag strings to validate

    Returns:
        The result of ``validate_tags_field`` for the given tags
    """
    normalized = validate_tags_field(v)
    return normalized
@field_validator("name")
@classmethod
def validate_name(cls, v: Optional[str]) -> Optional[str]:
    """Ensure prompt names display correctly in UI

    Args:
        v (Optional[str]): Value to validate

    Returns:
        Optional[str]: None for None input; otherwise the result of
        ``SecurityValidator.validate_name``
    """
    # The field is Optional on this schema, so None is passed through
    # untouched, matching the custom_name and display_name validators.
    if v is None:
        return v
    return SecurityValidator.validate_name(v, "Prompt name")
2426 @field_validator("custom_name")
2427 @classmethod
2428 def validate_custom_name(cls, v: Optional[str]) -> Optional[str]:
2429 """Ensure custom prompt names follow MCP naming conventions.
2431 Args:
2432 v: Custom prompt name to validate.
2434 Returns:
2435 The validated custom name or None.
2436 """
2437 if v is None:
2438 return v
2439 return SecurityValidator.validate_name(v, "Prompt name")
2441 @field_validator("display_name")
2442 @classmethod
2443 def validate_display_name(cls, v: Optional[str]) -> Optional[str]:
2444 """Ensure display names render safely in UI.
2446 Args:
2447 v: Display name to validate.
2449 Returns:
2450 The validated display name or None.
2451 """
2452 if v is None:
2453 return v
2454 return SecurityValidator.sanitize_display_text(v, "Prompt display name")
2456 @field_validator("description")
2457 @classmethod
2458 def validate_description(cls, v: Optional[str]) -> Optional[str]:
2459 """Ensure descriptions display safely, truncate if too long
2461 Args:
2462 v (str): Value to validate
2464 Returns:
2465 str: Value if validated as safe and truncated if too long
2467 Raises:
2468 ValueError: When value is unsafe
2470 Examples:
2471 >>> from mcpgateway.schemas import PromptUpdate
2472 >>> PromptUpdate.validate_description('A safe description')
2473 'A safe description'
2474 >>> PromptUpdate.validate_description(None) # Test None case
2475 >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
2476 >>> truncated = PromptUpdate.validate_description(long_desc)
2477 >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
2478 0
2479 >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
2480 True
2481 """
2482 if v is None:
2483 return v
2484 if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
2485 # Truncate the description to the maximum allowed length
2486 truncated = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
2487 logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
2488 return SecurityValidator.sanitize_display_text(truncated, "Description")
2489 return SecurityValidator.sanitize_display_text(v, "Description")
2491 @field_validator("template")
2492 @classmethod
2493 def validate_template(cls, v: str) -> str:
2494 """Validate template content for safe display
2496 Args:
2497 v (str): Value to validate
2499 Returns:
2500 str: Value if validated as safe
2501 """
2502 return SecurityValidator.validate_template(v)
2504 @field_validator("arguments")
2505 @classmethod
2506 def validate_arguments(cls, v: Dict[str, Any]) -> Dict[str, Any]:
2507 """Ensure JSON structure is valid and within complexity limits
2509 Args:
2510 v (dict): Value to validate
2512 Returns:
2513 dict: Value if validated as safe
2514 """
2515 SecurityValidator.validate_json_depth(v)
2516 return v
class PromptRead(BaseModelWithConfigDict):
    """Read model returned for a stored prompt.

    Carries the prompt definition itself together with:
    - Database ID
    - Creation/update timestamps
    - Enabled status
    - Metrics: aggregated metrics for the prompt invocations
    - Audit metadata, team scoping and MCP protocol fields
    """

    # --- Identity and naming ---
    id: str = Field(description="Unique ID of the prompt")
    name: str
    original_name: str
    custom_name: str
    custom_name_slug: str
    display_name: Optional[str] = Field(default=None, description="Display name for the prompt (shown in UI)")
    gateway_id: Optional[str] = Field(default=None, description="ID of the gateway for the prompt")
    gateway_slug: Optional[str] = None

    # --- Prompt content ---
    description: Optional[str]
    template: str
    arguments: List[PromptArgument]

    # --- Lifecycle ---
    created_at: datetime
    updated_at: datetime
    # is_active: bool
    enabled: bool
    tags: List[Dict[str, str]] = Field(default_factory=list, description="Tags for categorizing the prompt")
    metrics: Optional[PromptMetrics] = Field(default=None, description="Prompt metrics (may be None in list operations)")

    # --- Audit metadata: creation ---
    created_by: Optional[str] = Field(default=None, description="Username who created this entity")
    created_from_ip: Optional[str] = Field(default=None, description="IP address of creator")
    created_via: Optional[str] = Field(default=None, description="Creation method: ui|api|import|federation")
    created_user_agent: Optional[str] = Field(default=None, description="User agent of creation request")

    # --- Audit metadata: modification ---
    modified_by: Optional[str] = Field(default=None, description="Username who last modified this entity")
    modified_from_ip: Optional[str] = Field(default=None, description="IP address of last modifier")
    modified_via: Optional[str] = Field(default=None, description="Modification method")
    modified_user_agent: Optional[str] = Field(default=None, description="User agent of modification request")

    # --- Provenance and versioning ---
    import_batch_id: Optional[str] = Field(default=None, description="UUID of bulk import batch")
    federation_source: Optional[str] = Field(default=None, description="Source gateway for federated entities")
    version: Optional[int] = Field(default=1, description="Entity version for change tracking")

    # --- Team scoping ---
    team_id: Optional[str] = Field(default=None, description="ID of the team that owns this resource")
    team: Optional[str] = Field(default=None, description="Name of the team that owns this resource")
    owner_email: Optional[str] = Field(default=None, description="Email of the user who owns this resource")
    visibility: Optional[Literal["private", "team", "public"]] = Field(default="public", description="Visibility level: private, team, or public")

    # --- MCP protocol fields ---
    title: Optional[str] = Field(default=None, max_length=255, description="Human-readable title for the prompt")
    meta: Optional[Dict[str, Any]] = Field(default=None, alias="_meta", description="Optional metadata for protocol extension")

    @field_validator("visibility", mode="before")
    @classmethod
    def _normalize_visibility(cls, v: Any) -> Any:
        """Pass the raw visibility value through ``_coerce_visibility`` before field validation.

        Args:
            v: Raw visibility value as supplied to the model.

        Returns:
            Whatever ``_coerce_visibility`` returns for ``v``.
        """
        return _coerce_visibility(v)
class PromptInvocation(BaseModelWithConfigDict):
    """Request payload for invoking a prompt.

    Attributes:
        name: Name of the prompt to use (required).
        arguments: String-to-string mapping used for template rendering;
            defaults to an empty dict.
    """

    name: str = Field(
        ...,
        description="Name of prompt to use",
    )
    arguments: Dict[str, str] = Field(
        default_factory=dict,
        description="Arguments for template rendering",
    )
2587# --- Global Config Schemas ---
class GlobalConfigUpdate(BaseModel):
    """Payload for changing the global configuration.

    Attributes:
        passthrough_headers (Optional[List[str]]): List of headers allowed to be passed through globally
    """

    passthrough_headers: Optional[List[str]] = Field(
        None,
        description="List of headers allowed to be passed through globally",
    )
class GlobalConfigRead(BaseModel):
    """Representation of the global configuration returned to clients.

    Attributes:
        passthrough_headers (Optional[List[str]]): List of headers allowed to be passed through globally
    """

    passthrough_headers: Optional[List[str]] = Field(
        None,
        description="List of headers allowed to be passed through globally",
    )
2608# --- Gateway Schemas ---
2611# --- Transport Type ---
class TransportType(str, Enum):
    """Supported transport mechanisms for communication between components.

    Each member is also a ``str`` (the class mixes in ``str``), and its value
    equals its name.

    Attributes:
        SSE (str): Server-Sent Events transport.
        HTTP (str): Standard HTTP-based transport.
        STDIO (str): Standard input/output transport.
        STREAMABLEHTTP (str): HTTP transport with streaming.
    """

    # Server-Sent Events
    SSE = "SSE"
    # Standard HTTP
    HTTP = "HTTP"
    # Standard input/output
    STDIO = "STDIO"
    # HTTP with streaming
    STREAMABLEHTTP = "STREAMABLEHTTP"
class GatewayCreate(BaseModelWithConfigDict):
    """
    Schema for creating a new gateway.

    The ``auth_value`` field is derived: its "before" validator reads the
    already-validated auth fields and encodes them according to ``auth_type``.

    Attributes:
        model_config (ConfigDict): Configuration for the model.
        name (str): Unique name for the gateway.
        url (Union[str, AnyHttpUrl]): Gateway endpoint URL.
        description (Optional[str]): Optional description of the gateway.
        transport (str): Transport used by the MCP server, default is "SSE".
        auth_type (Optional[str]): Type of authentication (basic, bearer, authheaders, or none).
        auth_username (Optional[str]): Username for basic authentication.
        auth_password (Optional[str]): Password for basic authentication.
        auth_token (Optional[str]): Token for bearer authentication.
        auth_header_key (Optional[str]): Key for custom headers authentication.
        auth_header_value (Optional[str]): Value for custom headers authentication.
        auth_headers (Optional[List[Dict[str, str]]]): List of custom headers for authentication.
        auth_value (Optional[str]): Alias for authentication value, used for better access post-validation.
    """

    model_config = ConfigDict(str_strip_whitespace=True)

    name: str = Field(..., description="Unique name for the gateway")
    url: Union[str, AnyHttpUrl] = Field(..., description="Gateway endpoint URL")
    description: Optional[str] = Field(default=None, description="Gateway description")
    transport: str = Field(default="SSE", description="Transport used by MCP server: SSE or STREAMABLEHTTP")
    passthrough_headers: Optional[List[str]] = Field(default=None, description="List of headers allowed to be passed through from client to target")

    # Authorizations
    auth_type: Optional[str] = Field(default=None, description="Type of authentication: basic, bearer, authheaders, oauth, query_param, or none")
    # Fields for various types of authentication
    auth_username: Optional[str] = Field(default=None, description="Username for basic authentication")
    auth_password: Optional[str] = Field(default=None, description="Password for basic authentication")
    auth_token: Optional[str] = Field(default=None, description="Token for bearer authentication")
    auth_header_key: Optional[str] = Field(default=None, description="Key for custom headers authentication")
    auth_header_value: Optional[str] = Field(default=None, description="Value for custom headers authentication")
    auth_headers: Optional[List[Dict[str, str]]] = Field(default=None, description="List of custom headers for authentication")

    # OAuth 2.0 configuration
    oauth_config: Optional[Dict[str, Any]] = Field(default=None, description="OAuth 2.0 configuration including grant_type, client_id, encrypted client_secret, URLs, and scopes")

    # Query Parameter Authentication (INSECURE)
    auth_query_param_key: Optional[str] = Field(
        default=None,
        description="Query parameter name for authentication (e.g., 'api_key', 'tavilyApiKey')",
        pattern=r"^[a-zA-Z_][a-zA-Z0-9_\-]*$",
    )
    auth_query_param_value: Optional[SecretStr] = Field(
        default=None,
        description="Query parameter value (API key). Stored encrypted.",
    )

    # Adding `auth_value` as an alias for better access post-validation.
    # validate_default=True makes the validator run even when the field is omitted.
    auth_value: Optional[str] = Field(default=None, validate_default=True)

    # One time auth - do not store the auth in gateway flag
    one_time_auth: Optional[bool] = Field(default=False, description="The authentication should be used only once and not stored in the gateway")

    tags: Optional[List[Union[str, Dict[str, str]]]] = Field(default_factory=list, description="Tags for categorizing the gateway")

    # Team scoping fields for resource organization
    team_id: Optional[str] = Field(default=None, description="Team ID this gateway belongs to")
    owner_email: Optional[str] = Field(default=None, description="Email of the gateway owner")
    visibility: Optional[Literal["private", "team", "public"]] = Field(default="public", description="Gateway visibility: private, team, or public")

    # CA certificate
    ca_certificate: Optional[str] = Field(default=None, description="Custom CA certificate for TLS verification")
    ca_certificate_sig: Optional[str] = Field(default=None, description="Signature of the custom CA certificate for integrity verification")
    signing_algorithm: Optional[str] = Field(default="ed25519", description="Algorithm used for signing the CA certificate")

    # mTLS client certificate/key
    client_cert: Optional[str] = Field(default=None, description="Client TLS certificate for mTLS authentication")
    client_key: Optional[str] = Field(default=None, description="Client TLS key for mTLS authentication")

    # Per-gateway refresh configuration
    refresh_interval_seconds: Optional[int] = Field(default=None, ge=60, description="Per-gateway refresh interval in seconds (minimum 60); uses global default if not set")

    # Gateway mode configuration
    gateway_mode: str = Field(default="cache", description="Gateway mode: 'cache' (database caching, default) or 'direct_proxy' (pass-through mode with no caching)", pattern="^(cache|direct_proxy)$")

    @field_validator("gateway_mode", mode="before")
    @classmethod
    def default_gateway_mode(cls, v: Optional[str]) -> str:
        """Substitute 'cache' when gateway_mode is given as None.

        Args:
            v: Gateway mode value (may be None).

        Returns:
            'cache' for None, otherwise the supplied value unchanged.
        """
        if v is None:
            return "cache"
        return v

    @field_validator("tags")
    @classmethod
    def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
        """Validate and normalize tags.

        Args:
            v: Optional list of tags to validate

        Returns:
            The result of ``validate_tags_field`` for ``v``
        """
        return validate_tags_field(v)

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Validate gateway name

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe
        """
        return SecurityValidator.validate_name(v, "Gateway name")

    @field_validator("url")
    @classmethod
    def validate_url(cls, v: str) -> str:
        """Validate gateway URL

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe
        """
        return validate_core_url(v, "Gateway URL")

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Ensure descriptions display safely, truncate if too long

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe and truncated if too long

        Raises:
            ValueError: When value is unsafe

        Examples:
            >>> from mcpgateway.schemas import GatewayCreate
            >>> GatewayCreate.validate_description('A safe description')
            'A safe description'
            >>> GatewayCreate.validate_description(None)  # Test None case
            >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
            >>> truncated = GatewayCreate.validate_description(long_desc)
            >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
            0
            >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
            True
        """
        if v is None:
            return v
        text = v
        if len(text) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
            # Cut down to the maximum allowed length before sanitizing.
            text = text[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
            logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
        return SecurityValidator.sanitize_display_text(text, "Description")

    @field_validator("auth_value", mode="before")
    @classmethod
    def create_auth_value(cls, v, info):
        """
        Derive ``auth_value`` from the other auth fields (runs in mode="before").

        Args:
            v: Raw ``auth_value`` input (returned as-is when no auth_type is set)
            info: ValidationInfo whose ``data`` holds the previously validated fields

        Returns:
            str: Auth value
        """
        auth_type = info.data.get("auth_type")
        # Nothing to derive when no auth_type was supplied.
        if auth_type is None or auth_type == "":
            return v
        return cls._process_auth_fields(info)

    @field_validator("transport")
    @classmethod
    def validate_transport(cls, v: str) -> str:
        """
        Validates that the given transport value is one of the supported TransportType values.

        Args:
            v (str): The transport value to validate.

        Returns:
            str: The validated transport value if it is valid.

        Raises:
            ValueError: If the provided value is not a valid transport type.

        Valid transport types are defined in the TransportType enum:
            - SSE
            - HTTP
            - STDIO
            - STREAMABLEHTTP
        """
        allowed = [member.value for member in TransportType.__members__.values()]
        if v in allowed:
            return v
        raise ValueError(f"Invalid transport type: {v}. Must be one of: {', '.join(allowed)}")

    @staticmethod
    def _process_auth_fields(info: ValidationInfo) -> Optional[str]:
        """
        Build the encoded ``auth_value`` for the auth_type found in ``info.data``.

        Args:
            info: ValidationInfo containing auth fields

        Returns:
            Encoded auth string, or None for auth types that carry no auth_value
            (oauth, one_time_auth, query_param)

        Raises:
            ValueError: If auth_type is invalid or its required fields are missing
        """
        fields = info.data
        kind = fields.get("auth_type")

        if kind == "basic":
            # Basic auth needs both halves of the credential pair.
            user = fields.get("auth_username")
            secret = fields.get("auth_password")
            if not (user and secret):
                raise ValueError("For 'basic' auth, both 'auth_username' and 'auth_password' must be provided.")
            encoded_pair = base64.b64encode(f"{user}:{secret}".encode("utf-8")).decode()
            return encode_auth({"Authorization": f"Basic {encoded_pair}"})

        if kind == "bearer":
            bearer = fields.get("auth_token")
            if not bearer:
                raise ValueError("For 'bearer' auth, 'auth_token' must be provided.")
            return encode_auth({"Authorization": f"Bearer {bearer}"})

        # oauth: configuration lives in the separate oauth_config field.
        # one_time_auth: no auth_value is produced.
        # query_param: checked by the model validator, does not use auth_value.
        if kind in ("oauth", "one_time_auth", "query_param"):
            return None

        if kind == "authheaders":
            multi = fields.get("auth_headers")
            if multi and isinstance(multi, list):
                # New multi-headers format
                collected = {}
                repeated = set()

                for entry in multi:
                    if not isinstance(entry, dict):
                        continue
                    key = entry.get("key")
                    # Entries without a key are ignored.
                    if not key:
                        continue
                    value = entry.get("value", "")

                    # Remember repeats; the last value wins.
                    if key in collected:
                        repeated.add(key)

                    # Spaces are stripped before the character check.
                    if any(not (ch.isalnum() or ch in "-_") for ch in key.replace(" ", "")):
                        raise ValueError(f"Invalid header key format: '{key}'. Header keys should contain only alphanumeric characters, hyphens, and underscores.")

                    # Empty values are allowed.
                    collected[key] = value

                if not collected:
                    raise ValueError("For 'authheaders' auth, at least one valid header with a key must be provided.")

                if repeated:
                    logger.warning(f"Duplicate header keys detected (last value used): {', '.join(repeated)}")

                # Cap the number of distinct headers.
                if len(collected) > 100:
                    raise ValueError("Maximum of 100 headers allowed per gateway.")

                return encode_auth(collected)

            # Legacy single header format (backward compatibility)
            single_key = fields.get("auth_header_key")
            single_value = fields.get("auth_header_value")
            if not (single_key and single_value):
                raise ValueError("For 'authheaders' auth, either 'auth_headers' list or both 'auth_header_key' and 'auth_header_value' must be provided.")
            return encode_auth({single_key: single_value})

        raise ValueError("Invalid 'auth_type'. Must be one of: basic, bearer, oauth, authheaders, or query_param.")

    @model_validator(mode="after")
    def validate_query_param_auth(self) -> "GatewayCreate":
        """Validate query parameter authentication configuration.

        Only applies when ``auth_type`` is "query_param"; any other auth type
        returns the instance untouched.

        Returns:
            GatewayCreate: The validated instance.

        Raises:
            ValueError: If query param auth is disabled, a required field is
                missing, or the host is not in the allowlist.
        """
        if self.auth_type != "query_param":
            return self

        # Feature flag
        if not settings.insecure_allow_queryparam_auth:
            raise ValueError(
                "Query parameter authentication is disabled. "
                "Set INSECURE_ALLOW_QUERYPARAM_AUTH=true to enable. "
                "WARNING: API keys in URLs may appear in proxy logs."
            )

        # Required fields
        if not self.auth_query_param_key:
            raise ValueError("auth_query_param_key is required when auth_type is 'query_param'")
        if not self.auth_query_param_value:
            raise ValueError("auth_query_param_value is required when auth_type is 'query_param'")

        # Host allowlist (only enforced when configured)
        if settings.insecure_queryparam_auth_allowed_hosts:
            # urlparse().hostname drops port and userinfo and handles IPv6 brackets.
            hostname = (urlparse(str(self.url)).hostname or "").lower()
            if hostname not in settings.insecure_queryparam_auth_allowed_hosts:
                allowed = ", ".join(settings.insecure_queryparam_auth_allowed_hosts)
                raise ValueError(f"Host '{hostname}' is not in the allowed hosts for query parameter auth. Allowed hosts: {allowed}")

        return self
2990class GatewayUpdate(BaseModelWithConfigDict):
2991 """Schema for updating an existing federation gateway.
2993 Similar to GatewayCreate but all fields are optional to allow partial updates.
2994 """
2996 name: Optional[str] = Field(None, description="Unique name for the gateway")
2997 url: Optional[Union[str, AnyHttpUrl]] = Field(None, description="Gateway endpoint URL")
2998 description: Optional[str] = Field(None, description="Gateway description")
2999 transport: Optional[str] = Field(None, description="Transport used by MCP server: SSE or STREAMABLEHTTP")
3001 passthrough_headers: Optional[List[str]] = Field(default=None, description="List of headers allowed to be passed through from client to target")
3003 # Authorizations
3004 auth_type: Optional[str] = Field(None, description="auth_type: basic, bearer, authheaders or None")
3005 auth_username: Optional[str] = Field(None, description="username for basic authentication")
3006 auth_password: Optional[str] = Field(None, description="password for basic authentication")
3007 auth_token: Optional[str] = Field(None, description="token for bearer authentication")
3008 auth_header_key: Optional[str] = Field(None, description="key for custom headers authentication")
3009 auth_header_value: Optional[str] = Field(None, description="value for custom headers authentication")
3010 auth_headers: Optional[List[Dict[str, str]]] = Field(None, description="List of custom headers for authentication")
3012 # Adding `auth_value` as an alias for better access post-validation
3013 auth_value: Optional[str] = Field(None, validate_default=True)
3015 # OAuth 2.0 configuration
3016 oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration including grant_type, client_id, encrypted client_secret, URLs, and scopes")
3018 # Query Parameter Authentication (INSECURE)
3019 auth_query_param_key: Optional[str] = Field(
3020 None,
3021 description="Query parameter name for authentication",
3022 pattern=r"^[a-zA-Z_][a-zA-Z0-9_\-]*$",
3023 )
3024 auth_query_param_value: Optional[SecretStr] = Field(
3025 None,
3026 description="Query parameter value (API key)",
3027 )
3029 # One time auth - do not store the auth in gateway flag
3030 one_time_auth: Optional[bool] = Field(default=False, description="The authentication should be used only once and not stored in the gateway")
3032 tags: Optional[List[Union[str, Dict[str, str]]]] = Field(None, description="Tags for categorizing the gateway")
3034 # Team scoping fields for resource organization
3035 team_id: Optional[str] = Field(None, description="Team ID this gateway belongs to")
3036 owner_email: Optional[str] = Field(None, description="Email of the gateway owner")
3037 visibility: Optional[Literal["private", "team", "public"]] = Field(None, description="Gateway visibility: private, team, or public")
3039 # Per-gateway refresh configuration
3040 refresh_interval_seconds: Optional[int] = Field(None, ge=60, description="Per-gateway refresh interval in seconds (minimum 60); uses global default if not set")
3042 # Gateway mode configuration
3043 gateway_mode: Optional[str] = Field(None, description="Gateway mode: 'cache' (database caching, default) or 'direct_proxy' (pass-through mode with no caching)", pattern="^(cache|direct_proxy)$")
3045 # CA certificate configuration for custom TLS trust
3046 ca_certificate: Optional[str] = Field(None, description="Custom CA certificate for TLS verification")
3047 ca_certificate_sig: Optional[str] = Field(None, description="Signature of the custom CA certificate")
3048 signing_algorithm: Optional[str] = Field(None, description="Algorithm used for signing the CA certificate")
3050 # mTLS client TLS certificate and key
3051 client_cert: Optional[str] = Field(None, description="Client TLS certificate for mTLS gateway authentication")
3052 client_key: Optional[str] = Field(None, description="Client TLS key for mTLS gateway authentication")
3054 @field_validator("tags")
3055 @classmethod
3056 def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
3057 """Validate and normalize tags.
3059 Args:
3060 v: Optional list of tag strings to validate
3062 Returns:
3063 List of validated tag strings
3064 """
3065 return validate_tags_field(v)
3067 @field_validator("name", mode="before")
3068 @classmethod
3069 def validate_name(cls, v: str) -> str:
3070 """Validate gateway name
3072 Args:
3073 v (str): Value to validate
3075 Returns:
3076 str: Value if validated as safe
3077 """
3078 return SecurityValidator.validate_name(v, "Gateway name")
3080 @field_validator("url", mode="before")
3081 @classmethod
3082 def validate_url(cls, v: str) -> str:
3083 """Validate gateway URL
3085 Args:
3086 v (str): Value to validate
3088 Returns:
3089 str: Value if validated as safe
3090 """
3091 return validate_core_url(v, "Gateway URL")
3093 @field_validator("description", mode="before")
3094 @classmethod
3095 def validate_description(cls, v: Optional[str]) -> Optional[str]:
3096 """Ensure descriptions display safely, truncate if too long
3098 Args:
3099 v (str): Value to validate
3101 Returns:
3102 str: Value if validated as safe and truncated if too long
3104 Raises:
3105 ValueError: When value is unsafe
3107 Examples:
3108 >>> from mcpgateway.schemas import GatewayUpdate
3109 >>> GatewayUpdate.validate_description('A safe description')
3110 'A safe description'
3111 >>> GatewayUpdate.validate_description(None) # Test None case
3112 >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
3113 >>> truncated = ToolCreate.validate_description(long_desc)
3114 >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
3115 0
3116 >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
3117 True
3118 """
3119 if v is None:
3120 return v
3121 if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
3122 # Truncate the description to the maximum allowed length
3123 truncated = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
3124 logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
3125 return SecurityValidator.sanitize_display_text(truncated, "Description")
3126 return SecurityValidator.sanitize_display_text(v, "Description")
3128 @field_validator("auth_value", mode="before")
3129 @classmethod
3130 def create_auth_value(cls, v, info):
3131 """
3132 This validator will run before the model is fully instantiated (mode="before")
3133 It will process the auth fields based on auth_type and generate auth_value.
3135 Args:
3136 v: Input URL
3137 info: ValidationInfo containing auth_type
3139 Returns:
3140 str: Auth value or URL
3141 """
3142 data = info.data
3143 auth_type = data.get("auth_type")
3145 if (auth_type is None) or (auth_type == ""):
3146 return v # If no auth_type is provided, no need to create auth_value
3148 # Process the auth fields and generate auth_value based on auth_type
3149 auth_value = cls._process_auth_fields(info)
3150 return auth_value
3152 @staticmethod
3153 def _process_auth_fields(info: ValidationInfo) -> Optional[str]:
3154 """
3155 Processes the input authentication fields and returns the correct auth_value.
3156 This method is called based on the selected auth_type.
3158 Args:
3159 info: ValidationInfo containing auth fields
3161 Returns:
3162 Encoded auth string or None
3164 Raises:
3165 ValueError: If auth type is invalid
3166 """
3168 data = info.data
3169 auth_type = data.get("auth_type")
3171 if auth_type == "basic":
3172 # For basic authentication, both username and password must be present
3173 username = data.get("auth_username")
3174 password = data.get("auth_password")
3175 if not username or not password:
3176 raise ValueError("For 'basic' auth, both 'auth_username' and 'auth_password' must be provided.")
3178 creds = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode()
3179 return encode_auth({"Authorization": f"Basic {creds}"})
3181 if auth_type == "bearer":
3182 # For bearer authentication, only token is required
3183 token = data.get("auth_token")
3185 if not token:
3186 raise ValueError("For 'bearer' auth, 'auth_token' must be provided.")
3188 return encode_auth({"Authorization": f"Bearer {token}"})
3190 if auth_type == "oauth":
3191 # For OAuth authentication, we don't encode anything here
3192 # The OAuth configuration is handled separately in the oauth_config field
3193 # This method is only called for traditional auth types
3194 return None
3196 if auth_type == "authheaders":
3197 # Support both new multi-headers format and legacy single header format
3198 auth_headers = data.get("auth_headers")
3199 if auth_headers and isinstance(auth_headers, list):
3200 # New multi-headers format with enhanced validation
3201 header_dict = {}
3202 duplicate_keys = set()
3204 for header in auth_headers:
3205 if not isinstance(header, dict):
3206 continue
3208 key = header.get("key")
3209 value = header.get("value", "")
3211 # Skip headers without keys
3212 if not key:
3213 continue
3215 # Track duplicate keys (last value wins)
3216 if key in header_dict:
3217 duplicate_keys.add(key)
3219 # Validate header key format (basic HTTP header validation)
3220 if not all(c.isalnum() or c in "-_" for c in key.replace(" ", "")):
3221 raise ValueError(f"Invalid header key format: '{key}'. Header keys should contain only alphanumeric characters, hyphens, and underscores.")
3223 # Store header (empty values are allowed)
3224 header_dict[key] = value
3226 # Ensure at least one valid header
3227 if not header_dict:
3228 raise ValueError("For 'authheaders' auth, at least one valid header with a key must be provided.")
3230 # Warn about duplicate keys (optional - could log this instead)
3231 if duplicate_keys:
3232 logger.warning(f"Duplicate header keys detected (last value used): {', '.join(duplicate_keys)}")
3234 # Check for excessive headers (prevent abuse)
3235 if len(header_dict) > 100:
3236 raise ValueError("Maximum of 100 headers allowed per gateway.")
3238 return encode_auth(header_dict)
3240 # Legacy single header format (backward compatibility)
3241 header_key = data.get("auth_header_key")
3242 header_value = data.get("auth_header_value")
3244 if not header_key or not header_value:
3245 raise ValueError("For 'authheaders' auth, either 'auth_headers' list or both 'auth_header_key' and 'auth_header_value' must be provided.")
3247 return encode_auth({header_key: header_value})
3249 if auth_type == "one_time_auth":
3250 return None # No auth_value needed for one-time auth
3252 if auth_type == "query_param":
3253 # Query param auth doesn't use auth_value field
3254 # Validation is handled by model_validator
3255 return None
3257 raise ValueError("Invalid 'auth_type'. Must be one of: basic, bearer, oauth, authheaders, or query_param.")
@model_validator(mode="after")
def validate_query_param_auth(self) -> "GatewayUpdate":
    """Check that query-parameter auth settings are complete.

    The checks apply only when this update explicitly sets ``auth_type`` to
    ``"query_param"``. As noted by the original authors, the case where
    ``auth_type`` is omitted but the stored gateway already uses query_param
    auth is enforced by the service layer (``update_gateway()``), as are the
    feature-flag/allowlist checks.

    Returns:
        GatewayUpdate: The validated instance, unchanged.

    Raises:
        ValueError: If ``auth_query_param_key`` or ``auth_query_param_value``
            is missing/empty while ``auth_type`` is ``"query_param"``.
    """
    # Any other auth type (or no auth type at all) needs no checking here.
    if self.auth_type != "query_param":
        return self

    # The key is checked first, so a payload missing both reports the key.
    key_missing = not self.auth_query_param_key
    if key_missing:
        raise ValueError("auth_query_param_key is required when setting auth_type to 'query_param'")

    value_missing = not self.auth_query_param_value
    if value_missing:
        raise ValueError("auth_query_param_value is required when setting auth_type to 'query_param'")

    return self
# ---------------------------------------------------------------------------
# OAuth config masking helper (used by GatewayRead.masked / A2AAgentRead.masked)
# ---------------------------------------------------------------------------
# Alias of OAUTH_SENSITIVE_KEYS; dict keys are lower-cased before being looked up in it.
_SENSITIVE_OAUTH_KEYS = OAUTH_SENSITIVE_KEYS


def _mask_oauth_config(oauth_config: Any) -> Any:
    """Return a copy of ``oauth_config`` with sensitive values masked.

    Dicts and lists are walked recursively and rebuilt; any other value is
    returned as-is. A dict entry is masked when its key is a string whose
    lower-cased form is in ``_SENSITIVE_OAUTH_KEYS`` and its value is truthy;
    falsy values under sensitive keys are kept unchanged.

    Args:
        oauth_config: The oauth_config value to mask (dict, list, or scalar).

    Returns:
        The masked copy with sensitive values replaced by
        ``settings.masked_auth_value``.
    """
    if isinstance(oauth_config, list):
        return [_mask_oauth_config(element) for element in oauth_config]

    if not isinstance(oauth_config, dict):
        # Scalars (and any other non-container value) pass through untouched.
        return oauth_config

    masked: Dict[str, Any] = {}
    for name, value in oauth_config.items():
        is_sensitive = isinstance(name, str) and name.lower() in _SENSITIVE_OAUTH_KEYS
        if not is_sensitive:
            # Non-sensitive entries may still contain nested secrets.
            masked[name] = _mask_oauth_config(value)
        elif value:
            masked[name] = settings.masked_auth_value
        else:
            # Empty/None secrets are left as they are rather than masked.
            masked[name] = value
    return masked
class GatewayRead(BaseModelWithConfigDict):
    """Schema for reading gateway information.

    Includes all gateway fields plus:
    - Database ID
    - Capabilities dictionary
    - Creation/update timestamps
    - enabled status
    - reachable status
    - Last seen timestamp
    - Authentication type: basic, bearer, authheaders, oauth, query_param
    - Authentication value: username/password or token or custom headers
    - OAuth configuration for OAuth 2.0 authentication

    Auto Populated fields (decoded from ``auth_value`` by ``_populate_auth``):
    - Authentication username: for basic auth
    - Authentication password: for basic auth
    - Authentication token: for bearer auth
    - Authentication header key: for authheaders auth
    - Authentication header value: for authheaders auth
    - Authentication headers list: for authheaders auth

    Use ``masked()`` to obtain a copy with secrets replaced before returning
    an instance to API clients.
    """

    id: Optional[str] = Field(None, description="Unique ID of the gateway")
    name: str = Field(..., description="Unique name for the gateway")
    url: str = Field(..., description="Gateway endpoint URL")
    description: Optional[str] = Field(None, description="Gateway description")
    transport: str = Field(default="SSE", description="Transport used by MCP server: SSE or STREAMABLEHTTP")
    capabilities: Dict[str, Any] = Field(default_factory=dict, description="Gateway capabilities")
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), description="Creation timestamp")
    updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), description="Last update timestamp")
    enabled: bool = Field(default=True, description="Is the gateway enabled?")
    reachable: bool = Field(default=True, description="Is the gateway reachable/online?")

    last_seen: Optional[datetime] = Field(default_factory=lambda: datetime.now(timezone.utc), description="Last seen timestamp")

    passthrough_headers: Optional[List[str]] = Field(default=None, description="List of headers allowed to be passed through from client to target")
    ca_certificate: Optional[str] = Field(default=None, description="Custom CA certificate for TLS verification")
    ca_certificate_sig: Optional[str] = Field(default=None, description="Signature of the custom CA certificate")
    signing_algorithm: Optional[str] = Field(default="ed25519", description="Algorithm used for signing the CA certificate")
    client_cert: Optional[str] = Field(default=None, description="Client TLS certificate for mTLS authentication")
    client_key: Optional[str] = Field(default=None, description="Client TLS key for mTLS authentication")
    # Authorizations
    auth_type: Optional[str] = Field(None, description="auth_type: basic, bearer, authheaders, oauth, query_param, or None")
    auth_value: Optional[str] = Field(None, description="auth value: username/password or token or custom headers")
    auth_headers: Optional[List[Dict[str, str]]] = Field(default=None, description="List of custom headers for authentication")
    auth_headers_unmasked: Optional[List[Dict[str, str]]] = Field(default=None, description="Unmasked custom headers for administrative views")

    # OAuth 2.0 configuration
    oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration including grant_type, client_id, encrypted client_secret, URLs, and scopes")

    # Query Parameter Authentication (masked for security)
    auth_query_param_key: Optional[str] = Field(
        None,
        description="Query parameter name for authentication",
    )
    auth_query_param_value_masked: Optional[str] = Field(
        None,
        description="Masked indicator if query param auth is configured",
    )

    # auth_value will populate the following fields
    auth_username: Optional[str] = Field(None, description="username for basic authentication")
    auth_password: Optional[str] = Field(None, description="password for basic authentication")
    auth_token: Optional[str] = Field(None, description="token for bearer authentication")
    auth_header_key: Optional[str] = Field(None, description="key for custom headers authentication")
    auth_header_value: Optional[str] = Field(None, description="value for custom headers authentication")
    tags: List[Dict[str, str]] = Field(default_factory=list, description="Tags for categorizing the gateway")

    auth_password_unmasked: Optional[str] = Field(default=None, description="Unmasked password for basic authentication")
    auth_token_unmasked: Optional[str] = Field(default=None, description="Unmasked bearer token for authentication")
    auth_header_value_unmasked: Optional[str] = Field(default=None, description="Unmasked single custom header value")

    # Team scoping fields for resource organization
    team_id: Optional[str] = Field(None, description="Team ID this gateway belongs to")
    team: Optional[str] = Field(None, description="Name of the team that owns this resource")
    owner_email: Optional[str] = Field(None, description="Email of the gateway owner")
    visibility: Optional[Literal["private", "team", "public"]] = Field(default="public", description="Gateway visibility: private, team, or public")

    # Comprehensive metadata for audit tracking
    created_by: Optional[str] = Field(None, description="Username who created this entity")
    created_from_ip: Optional[str] = Field(None, description="IP address of creator")
    created_via: Optional[str] = Field(None, description="Creation method: ui|api|import|federation")
    created_user_agent: Optional[str] = Field(None, description="User agent of creation request")

    modified_by: Optional[str] = Field(None, description="Username who last modified this entity")
    modified_from_ip: Optional[str] = Field(None, description="IP address of last modifier")
    modified_via: Optional[str] = Field(None, description="Modification method")
    modified_user_agent: Optional[str] = Field(None, description="User agent of modification request")

    import_batch_id: Optional[str] = Field(None, description="UUID of bulk import batch")
    federation_source: Optional[str] = Field(None, description="Source gateway for federated entities")
    version: Optional[int] = Field(1, description="Entity version for change tracking")

    slug: Optional[str] = Field(None, description="Slug for gateway endpoint URL")

    # Per-gateway refresh configuration
    refresh_interval_seconds: Optional[int] = Field(None, description="Per-gateway refresh interval in seconds")
    last_refresh_at: Optional[datetime] = Field(None, description="Timestamp of last successful refresh")

    # Gateway mode configuration
    gateway_mode: str = Field(default="cache", description="Gateway mode: 'cache' (database caching, default) or 'direct_proxy' (pass-through mode with no caching)")

    # Runs before field validation; delegates to the module-level _coerce_visibility helper.
    _normalize_visibility = field_validator("visibility", mode="before")(classmethod(lambda cls, v: _coerce_visibility(v)))

    # Tool count (populated from the tools relationship; 0 when not loaded)
    tool_count: int = Field(default=0, description="Number of tools registered for this gateway")

    @model_validator(mode="before")
    @classmethod
    def _mask_query_param_auth(cls, data: Any) -> Any:
        """Mask query param auth value when constructing from DB model.

        This extracts auth_query_params from the raw data (DB model or dict)
        and populates the masked fields for display. Only the first key of
        ``auth_query_params`` is surfaced, and the real value is never copied;
        ``settings.masked_auth_value`` is used as a placeholder instead.

        A dict input is updated in place and returned. An ORM input that has a
        non-empty ``auth_query_params`` dict is converted to a new dict built
        from its table columns plus the ``team`` attribute; any other input is
        returned unchanged.

        Args:
            data: The raw data (dict or ORM model) to process.

        Returns:
            Any: The processed data with masked query param values.
        """
        # Handle dict input
        if isinstance(data, dict):
            auth_query_params = data.get("auth_query_params")
            if auth_query_params and isinstance(auth_query_params, dict):
                # Extract the param key name and set masked value
                first_key = next(iter(auth_query_params.keys()), None)
                if first_key:
                    data["auth_query_param_key"] = first_key
                    data["auth_query_param_value_masked"] = settings.masked_auth_value
        # Handle ORM model input (has auth_query_params attribute)
        elif hasattr(data, "auth_query_params"):
            auth_query_params = getattr(data, "auth_query_params", None)
            if auth_query_params and isinstance(auth_query_params, dict):
                # Convert ORM to dict for modification, preserving all attributes
                # Start with table columns
                data_dict = {c.name: getattr(data, c.name) for c in data.__table__.columns}
                # Preserve dynamically added attributes like 'team' (from relationships)
                for attr in ["team"]:
                    if hasattr(data, attr):
                        data_dict[attr] = getattr(data, attr)
                first_key = next(iter(auth_query_params.keys()), None)
                if first_key:
                    data_dict["auth_query_param_key"] = first_key
                    data_dict["auth_query_param_value_masked"] = settings.masked_auth_value
                return data_dict
        return data

    # This will be the main method to automatically populate fields
    @model_validator(mode="after")
    def _populate_auth(self) -> Self:
        """Populate authentication fields based on auth_type and encoded auth_value.

        This post-validation method decodes the stored authentication value and
        populates the appropriate authentication fields (username/password, token,
        or custom headers) based on the authentication type. It ensures the
        authentication data is properly formatted and accessible through individual
        fields for display purposes.

        Nothing is decoded when ``auth_value`` is empty or already equals
        ``settings.masked_auth_value``, or when ``auth_type`` is ``oauth``,
        ``one_time_auth`` or ``query_param``.

        The method handles three authentication types:
        - basic: Extracts username and password from Authorization header
        - bearer: Extracts token from Bearer Authorization header
        - authheaders: Extracts all custom headers, plus the first key/value pair
          into the legacy single-header fields

        Returns:
            Self: The instance with populated authentication fields:
                - For basic: auth_username and auth_password
                - For bearer: auth_token
                - For authheaders: auth_headers, auth_header_key and auth_header_value

        Raises:
            ValueError: If the authentication data is malformed:
                - Basic auth missing or improperly formatted Authorization header
                - Basic auth missing username or password
                - Bearer auth missing or improperly formatted Authorization header
                - Custom headers payload is not a non-empty dict

        Examples:
            >>> # Basic auth example
            >>> string_bytes = "admin:secret".encode("utf-8")
            >>> encoded_auth = base64.urlsafe_b64encode(string_bytes).decode("utf-8")
            >>> values = GatewayRead.model_construct(
            ...     auth_type="basic",
            ...     auth_value=encode_auth({"Authorization": f"Basic {encoded_auth}"})
            ... )
            >>> values = GatewayRead._populate_auth(values)
            >>> values.auth_username
            'admin'
            >>> values.auth_password
            'secret'

            >>> # Basic auth: only the first colon separates username and password
            >>> string_bytes = "admin:se:cret".encode("utf-8")
            >>> encoded_auth = base64.urlsafe_b64encode(string_bytes).decode("utf-8")
            >>> values = GatewayRead.model_construct(
            ...     auth_type="basic",
            ...     auth_value=encode_auth({"Authorization": f"Basic {encoded_auth}"})
            ... )
            >>> values = GatewayRead._populate_auth(values)
            >>> values.auth_username
            'admin'
            >>> values.auth_password
            'se:cret'

            >>> # Bearer auth example
            >>> values = GatewayRead.model_construct(
            ...     auth_type="bearer",
            ...     auth_value=encode_auth({"Authorization": "Bearer mytoken123"})
            ... )
            >>> values = GatewayRead._populate_auth(values)
            >>> values.auth_token
            'mytoken123'

            >>> # Custom headers example
            >>> values = GatewayRead.model_construct(
            ...     auth_type='authheaders',
            ...     auth_value=encode_auth({"X-API-Key": "abc123"})
            ... )
            >>> values = GatewayRead._populate_auth(values)
            >>> values.auth_header_key
            'X-API-Key'
            >>> values.auth_header_value
            'abc123'
        """
        auth_type = self.auth_type
        auth_value_encoded = self.auth_value

        # Skip validation logic if masked value
        if auth_value_encoded == settings.masked_auth_value:
            return self

        # Handle OAuth authentication (no auth_value to decode)
        if auth_type == "oauth":
            # OAuth gateways don't have traditional auth_value to decode
            # They use oauth_config instead
            return self

        if auth_type == "one_time_auth":
            # One-time auth gateways don't store auth_value
            return self

        if auth_type == "query_param":
            # Query param auth is handled by the before validator
            # (auth_query_params from DB model is processed there)
            return self

        # If no encoded value is present, nothing to populate
        if not auth_value_encoded:
            return self

        auth_value = decode_auth(auth_value_encoded)
        if auth_type == "basic":
            auth = auth_value.get("Authorization")
            if not (isinstance(auth, str) and auth.startswith("Basic ")):
                raise ValueError("basic auth requires an Authorization header of the form 'Basic <base64>'")
            auth = auth.removeprefix("Basic ")
            # Split on the FIRST colon only: the username cannot contain ":" but the
            # password may (RFC 7617), so a plain split(":") would fail to unpack.
            u, _, p = base64.urlsafe_b64decode(auth).decode("utf-8").partition(":")
            if not u or not p:
                raise ValueError("basic auth requires both username and password")
            self.auth_username, self.auth_password = u, p
            self.auth_password_unmasked = p

        elif auth_type == "bearer":
            auth = auth_value.get("Authorization")
            if not (isinstance(auth, str) and auth.startswith("Bearer ")):
                raise ValueError("bearer auth requires an Authorization header of the form 'Bearer <token>'")
            self.auth_token = auth.removeprefix("Bearer ")
            self.auth_token_unmasked = self.auth_token

        elif auth_type == "authheaders":
            # For backward compatibility, populate first header in key/value fields
            if not isinstance(auth_value, dict) or len(auth_value) == 0:
                raise ValueError("authheaders requires at least one key/value pair")
            # Two separate lists so that masking one never aliases the other.
            self.auth_headers = [{"key": str(key), "value": "" if value is None else str(value)} for key, value in auth_value.items()]
            self.auth_headers_unmasked = [{"key": str(key), "value": "" if value is None else str(value)} for key, value in auth_value.items()]
            k, v = next(iter(auth_value.items()))
            self.auth_header_key, self.auth_header_value = k, v
            self.auth_header_value_unmasked = v

        return self

    def masked(self) -> "GatewayRead":
        """
        Return a masked version of the model instance with sensitive authentication fields hidden.

        This method creates a dictionary representation of the model data and replaces sensitive fields
        such as `auth_value`, `auth_password`, `auth_token`, and `auth_header_value` with a masked
        placeholder value defined in `settings.masked_auth_value`. Masking is only applied if the fields
        are present and not already masked.

        Args:
            None

        Returns:
            GatewayRead: A new instance of the GatewayRead model with sensitive authentication-related fields
            masked to prevent exposure of sensitive information.

        Notes:
            - The `auth_value` field is only masked if it exists and its value is different from the masking
              placeholder.
            - Other sensitive fields (`auth_password`, `auth_token`, `auth_header_value`, `client_key`) are
              masked if present, otherwise set to None.
            - Each non-empty value in `auth_headers` is masked; header keys are kept.
            - Sensitive keys inside `oauth_config` are masked recursively.
            - All `*_unmasked` fields are always cleared (set to None).
            - Fields not related to authentication remain unmodified.
        """
        masked_data = self.model_dump()

        # Only mask if auth_value is present and not already masked
        if masked_data.get("auth_value") and masked_data["auth_value"] != settings.masked_auth_value:
            masked_data["auth_value"] = settings.masked_auth_value

        masked_data["auth_password"] = settings.masked_auth_value if masked_data.get("auth_password") else None
        masked_data["auth_token"] = settings.masked_auth_value if masked_data.get("auth_token") else None
        masked_data["auth_header_value"] = settings.masked_auth_value if masked_data.get("auth_header_value") else None
        if masked_data.get("auth_headers"):
            masked_data["auth_headers"] = [
                {
                    "key": header.get("key"),
                    "value": settings.masked_auth_value if header.get("value") else header.get("value"),
                }
                for header in masked_data["auth_headers"]
            ]

        # Mask sensitive keys inside oauth_config (e.g. password, client_secret)
        if masked_data.get("oauth_config"):
            masked_data["oauth_config"] = _mask_oauth_config(masked_data["oauth_config"])

        # SECURITY: Never expose unmasked credentials in API responses
        masked_data["auth_password_unmasked"] = None
        masked_data["auth_token_unmasked"] = None
        masked_data["auth_header_value_unmasked"] = None
        masked_data["auth_headers_unmasked"] = None
        # SECURITY: Mask mTLS client private key
        masked_data["client_key"] = settings.masked_auth_value if masked_data.get("client_key") else None
        # auth_value now equals the mask placeholder (or is empty), so re-validation
        # skips decoding in _populate_auth and cannot repopulate the secrets.
        return GatewayRead.model_validate(masked_data)
class GatewayRefreshResponse(BaseModelWithConfigDict):
    """Result payload returned by the manual gateway refresh API.

    Reports how many tools, resources and prompts were added, updated and
    removed, together with any validation errors collected during the refresh,
    the elapsed time and the completion timestamp.
    """

    # Identity and overall outcome
    gateway_id: str = Field(..., description="ID of the refreshed gateway")
    success: bool = Field(True, description="Whether the refresh operation was successful")
    error: Optional[str] = Field(default=None, description="Error message if the refresh failed")
    # Per-entity change counters (all default to zero)
    tools_added: int = Field(0, description="Number of tools added")
    tools_updated: int = Field(0, description="Number of tools updated")
    tools_removed: int = Field(0, description="Number of tools removed")
    resources_added: int = Field(0, description="Number of resources added")
    resources_updated: int = Field(0, description="Number of resources updated")
    resources_removed: int = Field(0, description="Number of resources removed")
    prompts_added: int = Field(0, description="Number of prompts added")
    prompts_updated: int = Field(0, description="Number of prompts updated")
    prompts_removed: int = Field(0, description="Number of prompts removed")
    # Diagnostics and timing
    validation_errors: List[str] = Field(default_factory=list, description="List of validation errors encountered")
    duration_ms: float = Field(..., description="Duration of the refresh operation in milliseconds")
    refreshed_at: datetime = Field(..., description="Timestamp when the refresh completed")
class FederatedTool(BaseModelWithConfigDict):
    """A tool definition paired with the federated gateway that provides it.

    Fields:
    - tool: the MCP tool definition
    - gateway_id / gateway_name / gateway_url: identify the source gateway
    """

    tool: MCPTool
    gateway_id: str
    gateway_name: str
    gateway_url: str
class FederatedResource(BaseModelWithConfigDict):
    """A resource definition paired with the federated gateway it comes from.

    Fields:
    - resource: the MCP resource definition
    - gateway_id / gateway_name / gateway_url: identify the source gateway
    """

    resource: MCPResource
    gateway_id: str
    gateway_name: str
    gateway_url: str
class FederatedPrompt(BaseModelWithConfigDict):
    """A prompt definition paired with the federated gateway it comes from.

    Fields:
    - prompt: the MCP prompt definition
    - gateway_id / gateway_name / gateway_url: identify the source gateway
    """

    prompt: MCPPrompt
    gateway_id: str
    gateway_name: str
    gateway_url: str
3699# --- RPC Schemas ---
class RPCRequest(BaseModel):
    """JSON-RPC 2.0 request envelope with validation of ``method`` and ``params``.

    The model is configured with ``hide_input_in_errors=True``.
    """

    model_config = ConfigDict(hide_input_in_errors=True)

    jsonrpc: Literal["2.0"]
    method: str
    params: Optional[Dict[str, Any]] = None
    id: Optional[Union[int, str]] = None

    @field_validator("method")
    @classmethod
    def validate_method(cls, v: str) -> str:
        """Check that an RPC method name is safe and well-formed.

        The name is passed to ``SecurityValidator.validate_no_xss``, then matched
        against ``settings.validation_tool_method_pattern``, then checked against
        ``settings.validation_max_method_length`` (in that order).

        Args:
            v (str): Value to validate

        Returns:
            str: Value if determined as safe

        Raises:
            ValueError: When value is not safe
        """
        SecurityValidator.validate_no_xss(v, "RPC method name")

        # The pattern is looked up on every call (not precompiled) so tests can monkeypatch it.
        pattern_match = re.match(settings.validation_tool_method_pattern, v)
        if pattern_match is None:
            raise ValueError("Invalid method name format")

        if len(v) > settings.validation_max_method_length:
            raise ValueError("Method name too long")

        return v

    @field_validator("params")
    @classmethod
    def validate_params(cls, v: Optional[Union[Dict, List]]) -> Optional[Union[Dict, List]]:
        """Check RPC parameters against the size and nesting-depth limits.

        ``None`` is accepted without any checks. Otherwise the value is serialized
        with orjson to measure its size in bytes, and its nesting depth is checked
        by ``SecurityValidator.validate_json_depth``.

        Args:
            v (Union[dict, list]): Value to validate

        Returns:
            Union[dict, list]: Value if determined as safe

        Raises:
            ValueError: When value is not safe
        """
        if v is not None:
            # Size is the length of the serialized form; the limit comes from settings.
            serialized = orjson.dumps(v)
            if len(serialized) > settings.validation_max_rpc_param_size:
                raise ValueError(f"Parameters exceed maximum size of {settings.validation_max_rpc_param_size} bytes")

            # Depth check runs only after the size check has passed.
            SecurityValidator.validate_json_depth(v)

        return v
class RPCResponse(BaseModelWithConfigDict):
    """JSON-RPC 2.0 response envelope.

    Fields:
    - jsonrpc: protocol version, always "2.0"
    - result: optional result payload
    - error: optional error object
    - id: optional request ID (int or str)
    """

    jsonrpc: Literal["2.0"]
    result: Optional[Any] = None
    error: Optional[Dict[str, Any]] = None
    id: Optional[Union[int, str]] = None
3774# --- Event and Admin Schemas ---
class EventMessage(BaseModelWithConfigDict):
    """Schema for SSE event messages.

    Carries:
    - the event type
    - the event data payload
    - the event timestamp (defaults to the current UTC time)
    """

    type: str = Field(..., description="Event type (tool_added, resource_updated, etc)")
    data: Dict[str, Any] = Field(..., description="Event payload")
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    @field_serializer("timestamp")
    def serialize_timestamp(self, dt: datetime) -> str:
        """
        Render the `timestamp` field as an ISO 8601 UTC string ending in "Z".

        The datetime is converted to UTC, formatted with ``isoformat()``, and the
        "+00:00" offset suffix is replaced by "Z".

        Args:
            dt (datetime): The datetime object to serialize.

        Returns:
            str: ISO 8601 formatted string in UTC, ending with 'Z'.
        """
        # NOTE: datetime.astimezone() treats a naive datetime as local time.
        in_utc = dt.astimezone(timezone.utc)
        iso_text = in_utc.isoformat()
        return iso_text.replace("+00:00", "Z")
class AdminToolCreate(BaseModelWithConfigDict):
    """Schema for creating tools via admin UI.

    Handles:
    - Basic tool information
    - JSON string inputs for headers/schema (parsed by ``validate_json``)
    """

    name: str
    url: str
    description: Optional[str] = None
    integration_type: str = "MCP"
    headers: Optional[str] = None  # JSON string
    input_schema: Optional[str] = None  # JSON string

    @field_validator("headers", "input_schema")
    @classmethod
    def validate_json(cls, v: Optional[str]) -> Optional[Dict[str, Any]]:
        """
        Validate and parse JSON string inputs.

        Empty or missing input (``None`` or ``""``) yields ``None``; anything else
        is parsed with ``orjson.loads`` and the parsed value is returned.

        Args:
            v: Input string

        Returns:
            dict: Output JSON version of v

        Raises:
            ValueError: When unable to convert to JSON
        """
        if not v:
            return None
        try:
            return orjson.loads(v)
        except orjson.JSONDecodeError as exc:
            # Chain the decoder error so the underlying parse failure stays
            # available on __cause__ instead of being discarded.
            raise ValueError("Invalid JSON") from exc
class AdminGatewayCreate(BaseModelWithConfigDict):
    """Schema for creating gateways via admin UI.

    Fields:
    - name: gateway name (required)
    - url: endpoint URL (required)
    - description: optional free-text description
    """

    name: str
    url: str
    description: Optional[str] = None
3859# --- New Schemas for Status Toggle Operations ---
class StatusToggleRequest(BaseModelWithConfigDict):
    """Request body for switching an item's active status on or off."""

    # Required flag: True activates the item, False deactivates it.
    activate: bool = Field(..., description="Whether to activate (true) or deactivate (false) the item")
class StatusToggleResponse(BaseModelWithConfigDict):
    """Response body returned by status toggle operations.

    Carries the item's id, name and resulting ``is_active`` flag, plus a
    message describing the outcome.
    """

    id: int
    name: str
    is_active: bool
    message: str = Field(..., description="Success message")
3877# --- Optional Filter Parameters for Listing Operations ---
class ListFilters(BaseModelWithConfigDict):
    """Optional filter parameters for list operations."""

    # Defaults to False.
    include_inactive: bool = Field(default=False, description="Whether to include inactive items in the results")
3886# --- Server Schemas ---
class ServerCreate(BaseModel):
    """
    Request schema for registering a new server.

    Attributes:
        model_config (ConfigDict): Model configuration; sets ``str_strip_whitespace=True``.
        id (Optional[str]): Optional custom UUID for the server.
        name (str): The server's name.
        description (Optional[str]): Optional description of the server.
        icon (Optional[str]): Optional URL for the server's icon.
        tags (Optional[List[str]]): Tags for categorizing the server.
        associated_tools (Optional[List[str]]): Optional tool IDs (list or comma-separated string).
        associated_resources (Optional[List[str]]): Optional resource IDs (list or comma-separated string).
        associated_prompts (Optional[List[str]]): Optional prompt IDs (list or comma-separated string).
        associated_a2a_agents (Optional[List[str]]): Optional A2A agent IDs (list or comma-separated string).
        team_id (Optional[str]): Optional team ID, validated as a UUID.
        owner_email (Optional[str]): Optional email of the server owner.
        visibility (Optional[str]): One of "private", "team" or "public" (default "public").
        oauth_enabled (bool): Whether OAuth 2.0 is enabled for MCP client authentication.
        oauth_config (Optional[Dict[str, Any]]): Optional OAuth 2.0 configuration.
    """

    model_config = ConfigDict(str_strip_whitespace=True)

    # Core identity and presentation
    id: Optional[str] = Field(None, description="Custom UUID for the server (if not provided, one will be generated)")
    name: str = Field(..., description="The server's name")
    description: Optional[str] = Field(None, description="Server description")
    icon: Optional[str] = Field(None, description="URL for the server's icon")
    tags: Optional[List[str]] = Field(default_factory=list, description="Tags for categorizing the server")

    # Associations (a comma-separated string is split by split_comma_separated)
    associated_tools: Optional[List[str]] = Field(None, description="Comma-separated tool IDs")
    associated_resources: Optional[List[str]] = Field(None, description="Comma-separated resource IDs")
    associated_prompts: Optional[List[str]] = Field(None, description="Comma-separated prompt IDs")
    associated_a2a_agents: Optional[List[str]] = Field(None, description="Comma-separated A2A agent IDs")

    # Team scoping fields
    team_id: Optional[str] = Field(None, description="Team ID for resource organization")
    owner_email: Optional[str] = Field(None, description="Email of the server owner")
    visibility: Optional[Literal["private", "team", "public"]] = Field(default="public", description="Visibility level: private, team, or public")

    # OAuth 2.0 configuration for RFC 9728 Protected Resource Metadata
    oauth_enabled: bool = Field(False, description="Enable OAuth 2.0 for MCP client authentication")
    oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration (authorization_server, scopes_supported, etc.)")

    @field_validator("id")
    @classmethod
    def validate_id(cls, v: Optional[str]) -> Optional[str]:
        """Validate the server ID as a UUID.

        ``None`` is passed through; any other value is handed to
        ``SecurityValidator.validate_uuid`` and its result is returned.

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe

        Raises:
            ValueError: When the value is not accepted as a UUID

        Examples:
            >>> from mcpgateway.schemas import ServerCreate
            >>> ServerCreate.validate_id('550e8400-e29b-41d4-a716-446655440000')
            '550e8400e29b41d4a716446655440000'
            >>> ServerCreate.validate_id('invalid-uuid')
            Traceback (most recent call last):
                ...
            ValueError: ...
        """
        if v is not None:
            return SecurityValidator.validate_uuid(v, "Server ID")
        return v

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Validate the server name via ``SecurityValidator.validate_name``.

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe
        """
        return SecurityValidator.validate_name(v, "Server name")

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Sanitize the description for display, truncating it when too long.

        Text longer than ``SecurityValidator.MAX_DESCRIPTION_LENGTH`` is cut to
        that length (and the truncation is logged) before being sanitized.

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe and truncated if too long

        Raises:
            ValueError: When value is unsafe

        Examples:
            >>> from mcpgateway.schemas import ServerCreate
            >>> ServerCreate.validate_description('A safe description')
            'A safe description'
            >>> ServerCreate.validate_description(None)  # Test None case
            >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
            >>> truncated = ServerCreate.validate_description(long_desc)
            >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
            0
            >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
            True
        """
        if v is None:
            return v

        text = v
        if len(text) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
            # Truncate first, then sanitize the shortened text below.
            text = text[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
            logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
        return SecurityValidator.sanitize_display_text(text, "Description")

    @field_validator("icon")
    @classmethod
    def validate_icon(cls, v: Optional[str]) -> Optional[str]:
        """Validate the icon URL; ``None`` and the empty string pass through.

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe
        """
        if v is not None and v != "":
            return validate_core_url(v, "Icon URL")
        return v

    @field_validator("tags")
    @classmethod
    def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
        """Validate and normalize tags via ``validate_tags_field``.

        Args:
            v: Optional list of tag strings to validate

        Returns:
            List of validated tag strings
        """
        return validate_tags_field(v)

    @field_validator("associated_tools", "associated_resources", "associated_prompts", "associated_a2a_agents", mode="before")
    @classmethod
    def split_comma_separated(cls, v):
        """
        Turn a comma-separated string into a list of trimmed, non-empty items.

        Non-string input is returned unchanged.

        Args:
            v: Input string

        Returns:
            list: Comma separated array of input string
        """
        if not isinstance(v, str):
            return v
        trimmed_parts = (part.strip() for part in v.split(","))
        # Drop entries that are empty after trimming (e.g. "a,,b" or trailing commas).
        return [part for part in trimmed_parts if part]

    @field_validator("team_id")
    @classmethod
    def validate_team_id(cls, v: Optional[str]) -> Optional[str]:
        """Validate team ID format.

        ``None`` is passed through; any other value is validated as a UUID.

        Args:
            v: Team ID to validate

        Returns:
            Validated team ID
        """
        if v is None:
            return v
        return SecurityValidator.validate_uuid(v, "team_id")
class ServerUpdate(BaseModelWithConfigDict):
    """Schema for updating an existing server.

    All fields are optional to allow partial updates.
    """

    id: Optional[str] = Field(None, description="Custom UUID for the server")
    name: Optional[str] = Field(None, description="The server's name")
    description: Optional[str] = Field(None, description="Server description")
    icon: Optional[str] = Field(None, description="URL for the server's icon")
    tags: Optional[List[str]] = Field(None, description="Tags for categorizing the server")

    # Team scoping fields
    # NOTE(review): unlike ServerCreate.validate_team_id, no validator in this
    # class checks team_id -- confirm whether updates should enforce the UUID format.
    team_id: Optional[str] = Field(None, description="Team ID for resource organization")
    owner_email: Optional[str] = Field(None, description="Email of the server owner")
    visibility: Optional[Literal["private", "team", "public"]] = Field(None, description="Visibility level: private, team, or public")

    # OAuth 2.0 configuration for RFC 9728 Protected Resource Metadata
    oauth_enabled: Optional[bool] = Field(None, description="Enable OAuth 2.0 for MCP client authentication")
    oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration (authorization_server, scopes_supported, etc.)")

    # Associations (a comma-separated string is split by split_comma_separated)
    associated_tools: Optional[List[str]] = Field(None, description="Comma-separated tool IDs")
    associated_resources: Optional[List[str]] = Field(None, description="Comma-separated resource IDs")
    associated_prompts: Optional[List[str]] = Field(None, description="Comma-separated prompt IDs")
    associated_a2a_agents: Optional[List[str]] = Field(None, description="Comma-separated A2A agent IDs")

    @field_validator("tags")
    @classmethod
    def validate_tags(cls, v: Optional[List[str]]) -> Optional[List[str]]:
        """Validate and normalize tags, leaving an explicit ``None`` untouched.

        Args:
            v: Optional list of tag strings to validate

        Returns:
            List of validated tag strings, or None if input is None

        Examples:
            >>> from mcpgateway.schemas import ServerUpdate
            >>> ServerUpdate.validate_tags(None) is None
            True
        """
        if v is None:
            # Same handling as A2AAgentUpdate.validate_tags: on an update
            # schema, None is passed through instead of being normalized.
            return None
        return validate_tags_field(v)

    @field_validator("id")
    @classmethod
    def validate_id(cls, v: Optional[str]) -> Optional[str]:
        """Validate server ID/UUID format

        Args:
            v (str): Value to validate

        Returns:
            str: Value returned by the UUID validator, or None when no ID was given

        Raises:
            ValueError: When the value is not accepted as a UUID

        Examples:
            >>> from mcpgateway.schemas import ServerUpdate
            >>> ServerUpdate.validate_id('550e8400-e29b-41d4-a716-446655440000')
            '550e8400e29b41d4a716446655440000'
            >>> ServerUpdate.validate_id('invalid-uuid')
            Traceback (most recent call last):
                ...
            ValueError: ...
        """
        if v is None:
            return v
        return SecurityValidator.validate_uuid(v, "Server ID")

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Validate server name

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe
        """
        return SecurityValidator.validate_name(v, "Server name")

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Ensure descriptions display safely, truncate if too long

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe and truncated if too long

        Raises:
            ValueError: When value is unsafe

        Examples:
            >>> from mcpgateway.schemas import ServerUpdate
            >>> ServerUpdate.validate_description('A safe description')
            'A safe description'
            >>> ServerUpdate.validate_description(None)  # Test None case
            >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
            >>> truncated = ServerUpdate.validate_description(long_desc)
            >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
            0
            >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
            True
        """
        if v is None:
            return v
        if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
            # Over-long descriptions are cut to the limit rather than rejected.
            v = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
            logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
        return SecurityValidator.sanitize_display_text(v, "Description")

    @field_validator("icon")
    @classmethod
    def validate_icon(cls, v: Optional[str]) -> Optional[str]:
        """Validate icon URL

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe; None and "" are returned unchanged
        """
        if v is None or v == "":
            return v
        return validate_core_url(v, "Icon URL")

    @field_validator("associated_tools", "associated_resources", "associated_prompts", "associated_a2a_agents", mode="before")
    @classmethod
    def split_comma_separated(cls, v):
        """
        Splits a comma-separated string into a list of strings if needed.

        Args:
            v: Input value; only strings are split

        Returns:
            list: Trimmed, non-empty items of the input string; non-string input is returned unchanged
        """
        if isinstance(v, str):
            return [item.strip() for item in v.split(",") if item.strip()]
        return v
class ServerRead(BaseModelWithConfigDict):
    """Schema for reading server information.

    Includes all server fields plus:
    - Database ID
    - Associated tool, resource, prompt, and A2A agent IDs
    - Creation/update timestamps
    - Active status
    - Metrics: Aggregated metrics for the server invocations.
    """

    id: str
    name: str
    description: Optional[str]
    icon: Optional[str]
    created_at: datetime
    updated_at: datetime
    # is_active: bool
    enabled: bool
    associated_tools: List[str] = []
    associated_tool_ids: List[str] = []
    associated_resources: List[str] = []
    associated_prompts: List[str] = []
    associated_a2a_agents: List[str] = []
    metrics: Optional[ServerMetrics] = Field(None, description="Server metrics (may be None in list operations)")
    tags: List[Dict[str, str]] = Field(default_factory=list, description="Tags for categorizing the server")

    # Comprehensive metadata for audit tracking
    created_by: Optional[str] = Field(None, description="Username who created this entity")
    created_from_ip: Optional[str] = Field(None, description="IP address of creator")
    created_via: Optional[str] = Field(None, description="Creation method: ui|api|import|federation")
    created_user_agent: Optional[str] = Field(None, description="User agent of creation request")

    modified_by: Optional[str] = Field(None, description="Username who last modified this entity")
    modified_from_ip: Optional[str] = Field(None, description="IP address of last modifier")
    modified_via: Optional[str] = Field(None, description="Modification method")
    modified_user_agent: Optional[str] = Field(None, description="User agent of modification request")

    import_batch_id: Optional[str] = Field(None, description="UUID of bulk import batch")
    federation_source: Optional[str] = Field(None, description="Source gateway for federated entities")
    version: Optional[int] = Field(1, description="Entity version for change tracking")

    # Team scoping fields
    team_id: Optional[str] = Field(None, description="ID of the team that owns this resource")
    team: Optional[str] = Field(None, description="Name of the team that owns this resource")
    owner_email: Optional[str] = Field(None, description="Email of the user who owns this resource")
    visibility: Optional[Literal["private", "team", "public"]] = Field(default="public", description="Visibility level: private, team, or public")

    # OAuth 2.0 configuration for RFC 9728 Protected Resource Metadata
    oauth_enabled: bool = Field(False, description="Whether OAuth 2.0 is enabled for MCP client authentication")
    oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration (authorization_server, scopes_supported, etc.)")

    # Runs _coerce_visibility on the raw "visibility" input before field validation.
    _normalize_visibility = field_validator("visibility", mode="before")(classmethod(lambda cls, v: _coerce_visibility(v)))

    @model_validator(mode="before")
    @classmethod
    def populate_associated_ids(cls, values):
        """
        Pre-validation method that converts associated objects to their 'id'.

        This method checks 'associated_tools', 'associated_resources',
        'associated_prompts' and 'associated_a2a_agents' in the input and
        replaces each object that has an `id` attribute with that `id`.
        Items without an `id` attribute are kept as they are.

        Args:
            values (dict | object): The input values, either a dict or an
                object whose attributes can be read with ``vars()``.

        Returns:
            dict: A copy of the input with object ids substituted, or the
            original input unchanged when it is not a dict and ``vars()``
            cannot be applied to it.
        """
        # Normalize to a mutable dict so the caller's input is never modified.
        if isinstance(values, dict):
            data = dict(values)
        else:
            try:
                data = dict(vars(values))
            except TypeError:
                # vars() raises TypeError for objects without a __dict__;
                # hand those back untouched for the regular validation to handle.
                return values

        for key in ("associated_tools", "associated_resources", "associated_prompts", "associated_a2a_agents"):
            related = data.get(key)
            if related:
                data[key] = [getattr(item, "id", item) for item in related]
        return data

    def masked(self) -> "ServerRead":
        """Return a masked model with oauth_config secrets redacted.

        The model is dumped to a dict, a non-empty ``oauth_config`` is passed
        through ``_mask_oauth_config``, and the result is validated into a new
        ``ServerRead``.

        Returns:
            ServerRead: Masked server model.
        """
        masked_data = self.model_dump()
        if masked_data.get("oauth_config"):
            masked_data["oauth_config"] = _mask_oauth_config(masked_data["oauth_config"])
        return ServerRead.model_validate(masked_data)
class GatewayTestRequest(BaseModelWithConfigDict):
    """Request payload describing one HTTP call used to test a gateway.

    Carries the HTTP method, base URL and path, plus optional headers, body
    and content type.
    """

    method: str = Field(
        default=...,
        description="HTTP method to test (GET, POST, etc.)",
    )
    base_url: AnyHttpUrl = Field(
        default=...,
        description="Base URL of the gateway to test",
    )
    path: str = Field(
        default=...,
        description="Path to append to the base URL",
    )
    headers: Optional[Dict[str, str]] = Field(
        default=None,
        description="Optional headers for the request",
    )
    body: Optional[Union[str, Dict[str, Any]]] = Field(
        default=None,
        description="Optional body for the request, can be a string or JSON object",
    )
    content_type: Optional[str] = Field(
        default="application/json",
        description="Content type for the request body",
    )
class GatewayTestResponse(BaseModelWithConfigDict):
    """Result of a gateway test request.

    Fields:
    - status_code: HTTP status code
    - latency_ms: latency in milliseconds
    - body: optional response body, either a string or a JSON object
    """

    status_code: int = Field(
        default=...,
        description="HTTP status code returned by the gateway",
    )
    latency_ms: int = Field(
        default=...,
        description="Latency of the request in milliseconds",
    )
    body: Optional[Union[str, Dict[str, Any]]] = Field(
        default=None,
        description="Response body, can be a string or JSON object",
    )
class TaggedEntity(BaseModelWithConfigDict):
    """Compact view of an entity that carries a tag: id, name, type and optional description."""

    id: str = Field(default=..., description="The entity's ID")
    name: str = Field(default=..., description="The entity's name")
    type: str = Field(
        default=...,
        description="The entity type (tool, resource, prompt, server, gateway)",
    )
    description: Optional[str] = Field(
        default=None,
        description="The entity's description",
    )
class TagStats(BaseModelWithConfigDict):
    """Per-tag usage counters, one per entity type plus a total; every counter defaults to 0."""

    tools: int = Field(0, description="Number of tools with this tag")
    resources: int = Field(0, description="Number of resources with this tag")
    prompts: int = Field(0, description="Number of prompts with this tag")
    servers: int = Field(0, description="Number of servers with this tag")
    gateways: int = Field(0, description="Number of gateways with this tag")
    total: int = Field(0, description="Total occurrences of this tag")
class TagInfo(BaseModelWithConfigDict):
    """A tag name together with its statistics and the entities that carry it."""

    name: str = Field(
        default=...,
        description="The tag name",
    )
    stats: TagStats = Field(
        default=...,
        description="Statistics for this tag",
    )
    # Defaults to a fresh empty list for each instance.
    entities: Optional[List[TaggedEntity]] = Field(
        default_factory=list,
        description="Entities that have this tag",
    )
class TopPerformer(BaseModelWithConfigDict):
    """Performance summary for one top-performing entity.

    Holds the metrics reported for a prompt, resource, server or tool:
    execution count, average response time, success rate and the time of the
    last execution.

    Attributes:
        id (Union[str, int]): Unique identifier for the entity.
        name (str): Name of the entity (e.g., prompt name, resource URI, server name, or tool name).
        execution_count (int): Total number of executions for the entity.
        avg_response_time (Optional[float]): Average response time in seconds, or None if no metrics.
        success_rate (Optional[float]): Success rate percentage, or None if no metrics.
        last_execution (Optional[datetime]): Timestamp of the last execution, or None if no metrics.
    """

    id: Union[str, int] = Field(default=..., description="Entity ID")
    name: str = Field(default=..., description="Entity name")
    execution_count: int = Field(default=..., description="Number of executions")
    avg_response_time: Optional[float] = Field(
        default=None,
        description="Average response time in seconds",
    )
    success_rate: Optional[float] = Field(
        default=None,
        description="Success rate percentage",
    )
    last_execution: Optional[datetime] = Field(
        default=None,
        description="Timestamp of last execution",
    )
4387# --- A2A Agent Schemas ---
class A2AAgentCreate(BaseModel):
    """
    Schema for creating a new A2A (Agent-to-Agent) compatible agent.

    Attributes:
        model_config (ConfigDict): Configuration for the model (strips whitespace from strings).
        name (str): Unique name for the agent.
        slug (Optional[str]): Optional slug for the agent (auto-generated if not provided).
        description (Optional[str]): Optional description of the agent.
        endpoint_url (str): URL endpoint for the agent.
        agent_type (str): Type of agent (e.g., "openai", "anthropic", "custom").
        protocol_version (str): A2A protocol version supported.
        capabilities (Dict[str, Any]): Agent capabilities and features.
        config (Dict[str, Any]): Agent-specific configuration parameters.
        passthrough_headers (Optional[List[str]]): Headers allowed to be passed through from client to target.
        auth_type (Optional[str]): Type of authentication. The values handled by
            ``_process_auth_fields`` are "basic", "bearer", "oauth", "authheaders",
            "one_time_auth" and "query_param".
        auth_username (Optional[str]): Username for basic authentication.
        auth_password (Optional[str]): Password for basic authentication.
        auth_token (Optional[str]): Token for bearer authentication.
        auth_header_key (Optional[str]): Key for custom headers authentication.
        auth_header_value (Optional[str]): Value for custom headers authentication.
        auth_headers (Optional[List[Dict[str, str]]]): List of custom headers for authentication.
        oauth_config (Optional[Dict[str, Any]]): OAuth 2.0 configuration.
        auth_query_param_key (Optional[str]): Query parameter name for authentication.
        auth_query_param_value (Optional[SecretStr]): Query parameter value (API key).
        auth_value (Optional[str]): Alias for authentication value, used for better access post-validation.
        tags (List[str]): Tags for categorizing the agent.
        team_id (Optional[str]): Team ID for resource organization.
        owner_email (Optional[str]): Email of the agent owner.
        visibility (str): Visibility level ("private", "team", "public").
    """

    model_config = ConfigDict(str_strip_whitespace=True)

    name: str = Field(..., description="Unique name for the agent")
    slug: Optional[str] = Field(None, description="Optional slug for the agent (auto-generated if not provided)")
    description: Optional[str] = Field(None, description="Agent description")
    endpoint_url: str = Field(..., description="URL endpoint for the agent")
    agent_type: str = Field(default="generic", description="Type of agent (e.g., 'openai', 'anthropic', 'custom')")
    protocol_version: str = Field(default="1.0", description="A2A protocol version supported")
    capabilities: Dict[str, Any] = Field(default_factory=dict, description="Agent capabilities and features")
    config: Dict[str, Any] = Field(default_factory=dict, description="Agent-specific configuration parameters")
    passthrough_headers: Optional[List[str]] = Field(default=None, description="List of headers allowed to be passed through from client to target")
    # Authorizations
    # NOTE(review): the description lists "none", but _process_auth_fields raises
    # ValueError for every auth_type string it does not recognise, "none" included.
    # Confirm whether "none" is meant to be the literal string or a null value.
    auth_type: Optional[str] = Field(None, description="Type of authentication: basic, bearer, authheaders, oauth, query_param, or none")
    # Fields for various types of authentication
    auth_username: Optional[str] = Field(None, description="Username for basic authentication")
    auth_password: Optional[str] = Field(None, description="Password for basic authentication")
    auth_token: Optional[str] = Field(None, description="Token for bearer authentication")
    auth_header_key: Optional[str] = Field(None, description="Key for custom headers authentication")
    auth_header_value: Optional[str] = Field(None, description="Value for custom headers authentication")
    auth_headers: Optional[List[Dict[str, str]]] = Field(None, description="List of custom headers for authentication")

    # OAuth 2.0 configuration
    oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration including grant_type, client_id, encrypted client_secret, URLs, and scopes")

    # Query Parameter Authentication (CWE-598 security concern - use only when required by upstream)
    auth_query_param_key: Optional[str] = Field(
        None,
        description="Query parameter name for authentication (e.g., 'tavilyApiKey')",
    )
    auth_query_param_value: Optional[SecretStr] = Field(
        None,
        description="Query parameter value (API key) - will be encrypted at rest",
    )

    # Adding `auth_value` as an alias for better access post-validation.
    # validate_default=True makes create_auth_value run even when the caller
    # does not send auth_value. The field is declared after the auth_* inputs
    # because create_auth_value reads them from info.data.
    auth_value: Optional[str] = Field(None, validate_default=True)
    tags: List[str] = Field(default_factory=list, description="Tags for categorizing the agent")

    # Team scoping fields
    team_id: Optional[str] = Field(None, description="Team ID for resource organization")
    owner_email: Optional[str] = Field(None, description="Email of the agent owner")
    visibility: Optional[Literal["private", "team", "public"]] = Field(default="public", description="Visibility level: private, team, or public")

    @field_validator("tags")
    @classmethod
    def validate_tags(cls, v: Optional[List[str]]) -> List[str]:
        """Validate and normalize tags.

        Args:
            v: Optional list of tag strings to validate

        Returns:
            List of validated tag strings
        """
        return validate_tags_field(v)

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Validate agent name

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe
        """
        return SecurityValidator.validate_name(v, "A2A Agent name")

    @field_validator("endpoint_url")
    @classmethod
    def validate_endpoint_url(cls, v: str) -> str:
        """Validate agent endpoint URL

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe
        """
        return validate_core_url(v, "Agent endpoint URL")

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Ensure descriptions display safely, truncate if too long

        Args:
            v (str): Value to validate

        Returns:
            str: Value if validated as safe and truncated if too long

        Raises:
            ValueError: When value is unsafe

        Examples:
            >>> from mcpgateway.schemas import A2AAgentCreate
            >>> A2AAgentCreate.validate_description('A safe description')
            'A safe description'
            >>> A2AAgentCreate.validate_description(None)  # Test None case
            >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
            >>> truncated = A2AAgentCreate.validate_description(long_desc)
            >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
            0
            >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
            True
        """
        if v is None:
            return v
        if len(v) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
            # Over-long descriptions are cut to the limit rather than rejected.
            v = v[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
            logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
        return SecurityValidator.sanitize_display_text(v, "Description")

    @field_validator("capabilities", "config")
    @classmethod
    def validate_json_fields(cls, v: Dict[str, Any]) -> Dict[str, Any]:
        """Validate JSON structure depth

        Args:
            v (dict): Value to validate

        Returns:
            dict: The same value, once the depth check has passed
        """
        SecurityValidator.validate_json_depth(v)
        return v

    @field_validator("team_id")
    @classmethod
    def validate_team_id(cls, v: Optional[str]) -> Optional[str]:
        """Validate team ID format.

        Args:
            v: Team ID to validate

        Returns:
            Validated team ID, or None when no team ID was given
        """
        if v is not None:
            return SecurityValidator.validate_uuid(v, "team_id")
        return v

    @field_validator("auth_value", mode="before")
    @classmethod
    def create_auth_value(cls, v, info):
        """
        Derive ``auth_value`` from the other auth fields.

        Runs in "before" mode on the ``auth_value`` field. When an auth_type
        was supplied, the incoming ``v`` is discarded and replaced by the value
        built from the auth fields already present in ``info.data``.

        Args:
            v: Raw ``auth_value`` input (the field default when none was sent)
            info: ValidationInfo containing auth_type and the other auth fields

        Returns:
            Optional[str]: ``v`` unchanged when auth_type is None or empty,
            otherwise the result of ``_process_auth_fields``
        """
        auth_type = info.data.get("auth_type")
        if (auth_type is None) or (auth_type == ""):
            return v  # If no auth_type is provided, no need to create auth_value

        # Process the auth fields and generate auth_value based on auth_type
        return cls._process_auth_fields(info)

    @staticmethod
    def _process_auth_fields(info: ValidationInfo) -> Optional[str]:
        """
        Processes the input authentication fields and returns the correct auth_value.
        This method is called based on the selected auth_type.

        Args:
            info: ValidationInfo containing auth fields

        Returns:
            Encoded auth string for "basic", "bearer" and "authheaders";
            None for "oauth", "one_time_auth" and "query_param"

        Raises:
            ValueError: If auth_type is not one of the handled values, or the
                fields required by the selected auth_type are missing or invalid
        """
        data = info.data
        auth_type = data.get("auth_type")

        if auth_type == "basic":
            return A2AAgentCreate._encode_basic_auth(data)
        if auth_type == "bearer":
            return A2AAgentCreate._encode_bearer_auth(data)
        if auth_type == "authheaders":
            return A2AAgentCreate._encode_header_auth(data)
        if auth_type in ("oauth", "one_time_auth", "query_param"):
            # Nothing is encoded for these types: OAuth settings live in
            # oauth_config, and query_param is checked by validate_query_param_auth.
            return None

        raise ValueError("Invalid 'auth_type'. Must be one of: basic, bearer, oauth, authheaders, or query_param.")

    @staticmethod
    def _encode_basic_auth(data: Dict[str, Any]) -> str:
        """Build the encoded ``Authorization: Basic ...`` header from username and password.

        Args:
            data: Already-validated field values (``info.data``)

        Returns:
            Result of ``encode_auth`` for the Basic authorization header

        Raises:
            ValueError: If the username or the password is missing or empty
        """
        username = data.get("auth_username")
        password = data.get("auth_password")

        if not username or not password:
            raise ValueError("For 'basic' auth, both 'auth_username' and 'auth_password' must be provided.")

        creds = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode()
        return encode_auth({"Authorization": f"Basic {creds}"})

    @staticmethod
    def _encode_bearer_auth(data: Dict[str, Any]) -> str:
        """Build the encoded ``Authorization: Bearer ...`` header from the token.

        Args:
            data: Already-validated field values (``info.data``)

        Returns:
            Result of ``encode_auth`` for the Bearer authorization header

        Raises:
            ValueError: If the token is missing or empty
        """
        token = data.get("auth_token")

        if not token:
            raise ValueError("For 'bearer' auth, 'auth_token' must be provided.")

        return encode_auth({"Authorization": f"Bearer {token}"})

    @staticmethod
    def _encode_header_auth(data: Dict[str, Any]) -> str:
        """Build the encoded custom-header set for 'authheaders' auth.

        A non-empty ``auth_headers`` list (multi-header format) takes
        precedence; otherwise the legacy ``auth_header_key`` /
        ``auth_header_value`` pair is used.

        Args:
            data: Already-validated field values (``info.data``)

        Returns:
            Result of ``encode_auth`` for the collected headers

        Raises:
            ValueError: If a header key has an invalid format, no usable header
                is supplied, or more than 100 headers are supplied
        """
        auth_headers = data.get("auth_headers")
        if auth_headers and isinstance(auth_headers, list):
            # New multi-headers format with enhanced validation
            header_dict: Dict[str, Any] = {}
            duplicate_keys = set()

            for header in auth_headers:
                if not isinstance(header, dict):
                    continue

                key = header.get("key")
                value = header.get("value", "")

                # Skip headers without keys
                if not key:
                    continue

                # Track duplicate keys (last value wins)
                if key in header_dict:
                    duplicate_keys.add(key)

                # Validate header key format (basic HTTP header validation).
                # Spaces are removed before the character check, so a key
                # containing spaces is still accepted.
                if not all(c.isalnum() or c in "-_" for c in key.replace(" ", "")):
                    raise ValueError(f"Invalid header key format: '{key}'. Header keys should contain only alphanumeric characters, hyphens, and underscores.")

                # Store header (empty values are allowed)
                header_dict[key] = value

            # Ensure at least one valid header
            if not header_dict:
                raise ValueError("For 'authheaders' auth, at least one valid header with a key must be provided.")

            # Warn about duplicate keys; sorted so the log line is deterministic
            if duplicate_keys:
                logger.warning(f"Duplicate header keys detected (last value used): {', '.join(sorted(duplicate_keys))}")

            # Check for excessive headers (prevent abuse)
            # NOTE(review): the message says "per gateway" although this is the agent schema.
            if len(header_dict) > 100:
                raise ValueError("Maximum of 100 headers allowed per gateway.")

            return encode_auth(header_dict)

        # Legacy single header format (backward compatibility)
        header_key = data.get("auth_header_key")
        header_value = data.get("auth_header_value")

        if not header_key or not header_value:
            raise ValueError("For 'authheaders' auth, either 'auth_headers' list or both 'auth_header_key' and 'auth_header_value' must be provided.")

        return encode_auth({header_key: header_value})

    @model_validator(mode="after")
    def validate_query_param_auth(self) -> "A2AAgentCreate":
        """Validate query parameter authentication configuration.

        Only applies when auth_type is "query_param"; every other auth_type
        passes through unchanged.

        Returns:
            A2AAgentCreate: The validated instance.

        Raises:
            ValueError: If query param auth is disabled, the key or value is
                missing, or the endpoint host is not in the configured allowlist.
        """
        if self.auth_type != "query_param":
            return self

        # Check feature flag
        if not settings.insecure_allow_queryparam_auth:
            raise ValueError("Query parameter authentication is disabled. " + "Set INSECURE_ALLOW_QUERYPARAM_AUTH=true to enable. " + "WARNING: API keys in URLs may appear in proxy logs.")

        # Check required fields
        if not self.auth_query_param_key:
            raise ValueError("auth_query_param_key is required when auth_type is 'query_param'")
        if not self.auth_query_param_value:
            raise ValueError("auth_query_param_value is required when auth_type is 'query_param'")

        # Check host allowlist (if configured)
        if settings.insecure_queryparam_auth_allowed_hosts:
            parsed = urlparse(str(self.endpoint_url))
            # Prefer the parsed hostname; fall back to trimming userinfo and
            # port off the netloc when urlparse reports no hostname.
            hostname = parsed.hostname or parsed.netloc.split("@")[-1].split(":")[0]
            hostname_lower = hostname.lower()

            if hostname_lower not in settings.insecure_queryparam_auth_allowed_hosts:
                allowed = ", ".join(settings.insecure_queryparam_auth_allowed_hosts)
                raise ValueError(f"Host '{hostname}' is not in the allowed hosts for query parameter auth. Allowed hosts: {allowed}")

        return self
4731class A2AAgentUpdate(BaseModelWithConfigDict):
4732 """Schema for updating an existing A2A agent.
4734 Similar to A2AAgentCreate but all fields are optional to allow partial updates.
4735 """
4737 name: Optional[str] = Field(None, description="Unique name for the agent")
4738 description: Optional[str] = Field(None, description="Agent description")
4739 endpoint_url: Optional[str] = Field(None, description="URL endpoint for the agent")
4740 agent_type: Optional[str] = Field(None, description="Type of agent")
4741 protocol_version: Optional[str] = Field(None, description="A2A protocol version supported")
4742 capabilities: Optional[Dict[str, Any]] = Field(None, description="Agent capabilities and features")
4743 config: Optional[Dict[str, Any]] = Field(None, description="Agent-specific configuration parameters")
4744 passthrough_headers: Optional[List[str]] = Field(default=None, description="List of headers allowed to be passed through from client to target")
4745 auth_type: Optional[str] = Field(None, description="Type of authentication")
4746 auth_username: Optional[str] = Field(None, description="username for basic authentication")
4747 auth_password: Optional[str] = Field(None, description="password for basic authentication")
4748 auth_token: Optional[str] = Field(None, description="token for bearer authentication")
4749 auth_header_key: Optional[str] = Field(None, description="key for custom headers authentication")
4750 auth_header_value: Optional[str] = Field(None, description="value for custom headers authentication")
4751 auth_headers: Optional[List[Dict[str, str]]] = Field(None, description="List of custom headers for authentication")
4753 # Adding `auth_value` as an alias for better access post-validation
4754 auth_value: Optional[str] = Field(None, validate_default=True)
4756 # OAuth 2.0 configuration
4757 oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration including grant_type, client_id, encrypted client_secret, URLs, and scopes")
4759 # Query Parameter Authentication (CWE-598 security concern - use only when required by upstream)
4760 auth_query_param_key: Optional[str] = Field(
4761 None,
4762 description="Query parameter name for authentication (e.g., 'tavilyApiKey')",
4763 )
4764 auth_query_param_value: Optional[SecretStr] = Field(
4765 None,
4766 description="Query parameter value (API key) - will be encrypted at rest",
4767 )
4769 tags: Optional[List[str]] = Field(None, description="Tags for categorizing the agent")
4771 # Team scoping fields
4772 team_id: Optional[str] = Field(None, description="Team ID for resource organization")
4773 owner_email: Optional[str] = Field(None, description="Email of the agent owner")
4774 visibility: Optional[Literal["private", "team", "public"]] = Field(None, description="Visibility level: private, team, or public")
@field_validator("tags")
@classmethod
def validate_tags(cls, v: Optional[List[str]]) -> Optional[List[str]]:
    """Normalize the tag list, leaving ``None`` untouched.

    Args:
        v: Optional list of tag strings to validate

    Returns:
        None when the input is None, otherwise the list returned by
        ``validate_tags_field``
    """
    return None if v is None else validate_tags_field(v)
@field_validator("name")
@classmethod
def validate_name(cls, v: str) -> str:
    """Run the agent name through the shared name validator.

    Args:
        v (str): Agent name to check

    Returns:
        str: The value returned by ``SecurityValidator.validate_name``
    """
    checked_name = SecurityValidator.validate_name(v, "A2A Agent name")
    return checked_name
@field_validator("endpoint_url")
@classmethod
def validate_endpoint_url(cls, v: str) -> str:
    """Run the agent endpoint URL through the shared URL validator.

    Args:
        v (str): Endpoint URL to check

    Returns:
        str: The value returned by ``validate_core_url``
    """
    checked_url = validate_core_url(v, "Agent endpoint URL")
    return checked_url
@field_validator("description")
@classmethod
def validate_description(cls, v: Optional[str]) -> Optional[str]:
    """Sanitize the description for display, truncating it first when too long.

    Args:
        v (str): Description to check

    Returns:
        str: Sanitized description, cut to ``SecurityValidator.MAX_DESCRIPTION_LENGTH``
        characters when longer; None when the input is None

    Raises:
        ValueError: When value is unsafe

    Examples:
        >>> from mcpgateway.schemas import A2AAgentUpdate
        >>> A2AAgentUpdate.validate_description('A safe description')
        'A safe description'
        >>> A2AAgentUpdate.validate_description(None)  # Test None case
        >>> long_desc = 'x' * SecurityValidator.MAX_DESCRIPTION_LENGTH
        >>> truncated = A2AAgentUpdate.validate_description(long_desc)
        >>> len(truncated) - SecurityValidator.MAX_DESCRIPTION_LENGTH
        0
        >>> truncated == long_desc[:SecurityValidator.MAX_DESCRIPTION_LENGTH]
        True
    """
    if v is None:
        return v
    text = v
    if len(text) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
        # Keep only the leading part that fits within the limit.
        text = text[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
        logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
    return SecurityValidator.sanitize_display_text(text, "Description")
@field_validator("capabilities", "config")
@classmethod
def validate_json_fields(cls, v: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Check the nesting depth of the ``capabilities`` and ``config`` dictionaries.

    Args:
        v (dict): Dictionary to check, or None when the field is not set

    Returns:
        dict: The input unchanged once ``SecurityValidator.validate_json_depth`` has accepted it
    """
    if v is not None:
        SecurityValidator.validate_json_depth(v)
    return v
@field_validator("team_id")
@classmethod
def validate_team_id(cls, v: Optional[str]) -> Optional[str]:
    """Validate the team ID when one is supplied.

    Args:
        v: Team ID, or None when the field is not set

    Returns:
        None for a None input, otherwise the result of ``SecurityValidator.validate_uuid``
    """
    return v if v is None else SecurityValidator.validate_uuid(v, "team_id")
@field_validator("auth_value", mode="before")
@classmethod
def create_auth_value(cls, v, info):
    """Build ``auth_value`` from the other auth fields when an auth_type is set.

    Registered with mode="before", so it receives the raw incoming ``auth_value``.
    The auth_type is read from ``info.data``.

    Args:
        v: Raw incoming ``auth_value``
        info: ValidationInfo whose ``data`` mapping is consulted for auth_type

    Returns:
        ``v`` unchanged when auth_type is None or the empty string; otherwise the
        result of ``_process_auth_fields`` (an encoded auth string or None)
    """
    selected_type = info.data.get("auth_type")

    # Without an auth_type there is nothing to derive; pass the input through
    if selected_type is None or selected_type == "":
        return v

    return cls._process_auth_fields(info)
@staticmethod
def _process_auth_fields(info: ValidationInfo) -> Optional[str]:
    """Build the encoded ``auth_value`` for the auth_type found in ``info.data``.

    Per auth_type:
    - basic: requires auth_username and auth_password; encodes an
      ``Authorization: Basic <base64(username:password)>`` header
    - bearer: requires auth_token; encodes ``Authorization: Bearer <token>``
    - authheaders: encodes either the ``auth_headers`` list (multi-header format)
      or the legacy ``auth_header_key`` / ``auth_header_value`` pair
    - oauth, one_time_auth, query_param: nothing is encoded here; returns None

    Args:
        info: ValidationInfo containing auth fields

    Returns:
        Encoded auth string (from ``encode_auth``) or None

    Raises:
        ValueError: If required fields for the auth type are missing, a header key
            is not a string or has an invalid format, more than 100 headers are
            supplied, or the auth type is not recognized
    """
    data = info.data
    auth_type = data.get("auth_type")

    if auth_type == "basic":
        # For basic authentication, both username and password must be present
        username = data.get("auth_username")
        password = data.get("auth_password")
        if not username or not password:
            raise ValueError("For 'basic' auth, both 'auth_username' and 'auth_password' must be provided.")

        creds = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode()
        return encode_auth({"Authorization": f"Basic {creds}"})

    if auth_type == "bearer":
        # For bearer authentication, only token is required
        token = data.get("auth_token")

        if not token:
            raise ValueError("For 'bearer' auth, 'auth_token' must be provided.")

        return encode_auth({"Authorization": f"Bearer {token}"})

    if auth_type == "oauth":
        # For OAuth authentication, we don't encode anything here
        # The OAuth configuration is handled separately in the oauth_config field
        return None

    if auth_type == "authheaders":
        # Support both new multi-headers format and legacy single header format
        auth_headers = data.get("auth_headers")
        if auth_headers and isinstance(auth_headers, list):
            # New multi-headers format with enhanced validation
            header_dict = {}
            duplicate_keys = set()

            for header in auth_headers:
                if not isinstance(header, dict):
                    continue

                key = header.get("key")
                value = header.get("value", "")

                # Skip headers without keys
                if not key:
                    continue

                # Validate header key format (basic HTTP header validation).
                # The type check comes first so a non-string key is reported as a
                # ValueError instead of failing with AttributeError/TypeError below.
                if not isinstance(key, str) or not all(c.isalnum() or c in "-_" for c in key.replace(" ", "")):
                    raise ValueError(f"Invalid header key format: '{key}'. Header keys should contain only alphanumeric characters, hyphens, and underscores.")

                # Track duplicate keys (last value wins)
                if key in header_dict:
                    duplicate_keys.add(key)

                # Store header (empty values are allowed)
                header_dict[key] = value

            # Ensure at least one valid header
            if not header_dict:
                raise ValueError("For 'authheaders' auth, at least one valid header with a key must be provided.")

            # Warn about duplicate keys; sorted so the log line is deterministic
            if duplicate_keys:
                logger.warning(f"Duplicate header keys detected (last value used): {', '.join(sorted(duplicate_keys))}")

            # Check for excessive headers (prevent abuse)
            if len(header_dict) > 100:
                raise ValueError("Maximum of 100 headers allowed per gateway.")

            return encode_auth(header_dict)

        # Legacy single header format (backward compatibility)
        header_key = data.get("auth_header_key")
        header_value = data.get("auth_header_value")

        if not header_key or not header_value:
            raise ValueError("For 'authheaders' auth, either 'auth_headers' list or both 'auth_header_key' and 'auth_header_value' must be provided.")

        return encode_auth({header_key: header_value})

    if auth_type == "one_time_auth":
        # One-time auth does not require encoding here
        return None

    if auth_type == "query_param":
        # Query param auth doesn't use auth_value field
        # Validation is handled by model_validator
        return None

    # NOTE(review): 'one_time_auth' is accepted above but not listed in this message - confirm whether the omission is intentional.
    raise ValueError("Invalid 'auth_type'. Must be one of: basic, bearer, oauth, authheaders, or query_param.")
@model_validator(mode="after")
def validate_query_param_auth(self) -> "A2AAgentUpdate":
    """Require key and value when auth_type is explicitly "query_param".

    NOTE: Nothing is checked unless auth_type is explicitly set to "query_param".
    Service-layer enforcement handles the case where auth_type is omitted
    but the existing agent uses query_param auth.

    Returns:
        A2AAgentUpdate: The validated instance.

    Raises:
        ValueError: If auth_query_param_key or auth_query_param_value is missing
            when setting query_param auth.
    """
    if self.auth_type != "query_param":
        return self

    # Feature flag/allowlist check happens in service layer (has access to existing agent)
    if not self.auth_query_param_key:
        raise ValueError("auth_query_param_key is required when setting auth_type to 'query_param'")
    if not self.auth_query_param_value:
        raise ValueError("auth_query_param_value is required when setting auth_type to 'query_param'")
    return self
class A2AAgentRead(BaseModelWithConfigDict):
    """Schema for reading A2A agent information.

    Includes all agent fields plus:
    - Database ID
    - Slug
    - Creation/update timestamps
    - Enabled/reachable status
    - Metrics
    - Authentication type: basic, bearer, authheaders, oauth, query_param
    - Authentication value: username/password or token or custom headers
    - OAuth configuration for OAuth 2.0 authentication
    - Query parameter authentication (key name and masked value)

    Auto Populated fields:
    - Authentication username: for basic auth
    - Authentication password: for basic auth
    - Authentication token: for bearer auth
    - Authentication header key: for authheaders auth
    - Authentication header value: for authheaders auth
    - Query param key: for query_param auth
    - Query param value (masked): for query_param auth
    """

    id: Optional[str] = Field(None, description="Unique ID of the a2a agent")
    name: str = Field(..., description="Unique name for the a2a agent")
    slug: Optional[str] = Field(None, description="Slug for a2a agent endpoint URL")
    description: Optional[str] = Field(None, description="a2a agent description")
    endpoint_url: str = Field(..., description="a2a agent endpoint URL")
    agent_type: str
    protocol_version: str
    capabilities: Dict[str, Any]
    config: Dict[str, Any]
    enabled: bool
    reachable: bool
    created_at: datetime
    updated_at: datetime
    last_interaction: Optional[datetime]
    tags: List[Dict[str, str]] = Field(default_factory=list, description="Tags for categorizing the agent")
    metrics: Optional[A2AAgentMetrics] = Field(None, description="Agent metrics (may be None in list operations)")
    passthrough_headers: Optional[List[str]] = Field(default=None, description="List of headers allowed to be passed through from client to target")
    # Authorizations
    auth_type: Optional[str] = Field(None, description="auth_type: basic, bearer, authheaders, oauth, query_param, or None")
    auth_value: Optional[str] = Field(None, description="auth value: username/password or token or custom headers")

    # OAuth 2.0 configuration
    oauth_config: Optional[Dict[str, Any]] = Field(None, description="OAuth 2.0 configuration including grant_type, client_id, encrypted client_secret, URLs, and scopes")

    # auth_value will populate the following fields
    auth_username: Optional[str] = Field(None, description="username for basic authentication")
    auth_password: Optional[str] = Field(None, description="password for basic authentication")
    auth_token: Optional[str] = Field(None, description="token for bearer authentication")
    auth_header_key: Optional[str] = Field(None, description="key for custom headers authentication")
    auth_header_value: Optional[str] = Field(None, description="vallue for custom headers authentication")
    auth_headers: Optional[List[Dict[str, str]]] = Field(None, description="List of custom headers for authentication")

    # Query Parameter Authentication (masked for security)
    auth_query_param_key: Optional[str] = Field(
        None,
        description="Query parameter name for authentication",
    )
    auth_query_param_value_masked: Optional[str] = Field(
        None,
        description="Masked query parameter value (actual value is encrypted at rest)",
    )

    # Comprehensive metadata for audit tracking
    created_by: Optional[str] = Field(None, description="Username who created this entity")
    created_from_ip: Optional[str] = Field(None, description="IP address of creator")
    created_via: Optional[str] = Field(None, description="Creation method: ui|api|import|federation")
    created_user_agent: Optional[str] = Field(None, description="User agent of creation request")

    modified_by: Optional[str] = Field(None, description="Username who last modified this entity")
    modified_from_ip: Optional[str] = Field(None, description="IP address of last modifier")
    modified_via: Optional[str] = Field(None, description="Modification method")
    modified_user_agent: Optional[str] = Field(None, description="User agent of modification request")

    import_batch_id: Optional[str] = Field(None, description="UUID of bulk import batch")
    federation_source: Optional[str] = Field(None, description="Source gateway for federated entities")
    version: Optional[int] = Field(1, description="Entity version for change tracking")

    # Team scoping fields
    team_id: Optional[str] = Field(None, description="ID of the team that owns this resource")
    team: Optional[str] = Field(None, description="Name of the team that owns this resource")
    owner_email: Optional[str] = Field(None, description="Email of the user who owns this resource")
    visibility: Optional[Literal["private", "team", "public"]] = Field(default="public", description="Visibility level: private, team, or public")

    _normalize_visibility = field_validator("visibility", mode="before")(classmethod(lambda cls, v: _coerce_visibility(v)))

    @model_validator(mode="before")
    @classmethod
    def _mask_query_param_auth(cls, data: Any) -> Any:
        """Mask query param auth value when constructing from DB model.

        This extracts auth_query_params from the raw data (DB model or dict)
        and populates the masked fields for display. The input is never
        modified: a dict input is shallow-copied before the masked fields are
        added, and an ORM input is converted to a new dict.

        Args:
            data: The raw data (dict or ORM model) to process.

        Returns:
            Any: A new dict carrying ``auth_query_param_key`` and
            ``auth_query_param_value_masked`` when query param auth is present;
            otherwise the input unchanged.
        """
        # Handle dict input
        if isinstance(data, dict):
            auth_query_params = data.get("auth_query_params")
            if auth_query_params and isinstance(auth_query_params, dict):
                # Extract the param key name and set masked value
                first_key = next(iter(auth_query_params), None)
                if first_key:
                    # Shallow copy so the caller's dict is not mutated as a side effect of validation
                    data = {
                        **data,
                        "auth_query_param_key": first_key,
                        "auth_query_param_value_masked": settings.masked_auth_value,
                    }
        # Handle ORM model input (has auth_query_params attribute)
        elif hasattr(data, "auth_query_params"):
            auth_query_params = getattr(data, "auth_query_params", None)
            if auth_query_params and isinstance(auth_query_params, dict):
                # Convert ORM to dict for modification, preserving all attributes
                # Start with table columns
                data_dict = {c.name: getattr(data, c.name) for c in data.__table__.columns}
                # Preserve dynamically added attributes like 'team' (from relationships)
                for attr in ["team"]:
                    if hasattr(data, attr):
                        data_dict[attr] = getattr(data, attr)
                first_key = next(iter(auth_query_params), None)
                if first_key:
                    data_dict["auth_query_param_key"] = first_key
                    data_dict["auth_query_param_value_masked"] = settings.masked_auth_value
                return data_dict
        return data

    # This will be the main method to automatically populate fields
    @model_validator(mode="after")
    def _populate_auth(self) -> Self:
        """Populate authentication fields based on auth_type and encoded auth_value.

        This post-validation method decodes the stored authentication value and
        populates the appropriate authentication fields (username/password, token,
        or custom headers) based on the authentication type.

        Nothing is decoded when auth_value equals ``settings.masked_auth_value``,
        when auth_type is oauth, one_time_auth or query_param, or when auth_value
        is empty.

        The method decodes three authentication types:
        - basic: Extracts username and password from Authorization header
        - bearer: Extracts token from Bearer Authorization header
        - authheaders: Extracts all custom headers into auth_headers and the first
          one into auth_header_key / auth_header_value

        Returns:
            Self: The instance with populated authentication fields:
                - For basic: auth_username and auth_password
                - For bearer: auth_token
                - For authheaders: auth_headers, auth_header_key and auth_header_value

        Raises:
            ValueError: If the authentication data is malformed:
                - Basic auth missing or improperly formatted Authorization header,
                  or missing username or password
                - Bearer auth missing or improperly formatted Authorization header
                - Custom headers value that is not a non-empty dict

        Examples:
            >>> # Basic auth example
            >>> string_bytes = "admin:secret".encode("utf-8")
            >>> encoded_auth = base64.urlsafe_b64encode(string_bytes).decode("utf-8")
            >>> values = A2AAgentRead.model_construct(
            ...     auth_type="basic",
            ...     auth_value=encode_auth({"Authorization": f"Basic {encoded_auth}"})
            ... )
            >>> values = A2AAgentRead._populate_auth(values)
            >>> values.auth_username
            'admin'
            >>> values.auth_password
            'secret'

            >>> # Basic auth: only the first ':' separates username from password
            >>> string_bytes = "admin:se:cret".encode("utf-8")
            >>> encoded_auth = base64.urlsafe_b64encode(string_bytes).decode("utf-8")
            >>> values = A2AAgentRead.model_construct(
            ...     auth_type="basic",
            ...     auth_value=encode_auth({"Authorization": f"Basic {encoded_auth}"})
            ... )
            >>> values = A2AAgentRead._populate_auth(values)
            >>> values.auth_password
            'se:cret'

            >>> # Bearer auth example
            >>> values = A2AAgentRead.model_construct(
            ...     auth_type="bearer",
            ...     auth_value=encode_auth({"Authorization": "Bearer mytoken123"})
            ... )
            >>> values = A2AAgentRead._populate_auth(values)
            >>> values.auth_token
            'mytoken123'

            >>> # Custom headers example
            >>> values = A2AAgentRead.model_construct(
            ...     auth_type='authheaders',
            ...     auth_value=encode_auth({"X-API-Key": "abc123"})
            ... )
            >>> values = A2AAgentRead._populate_auth(values)
            >>> values.auth_header_key
            'X-API-Key'
            >>> values.auth_header_value
            'abc123'
        """
        auth_type = self.auth_type
        auth_value_encoded = self.auth_value
        # Skip validation logic if masked value
        if auth_value_encoded == settings.masked_auth_value:
            return self

        # Handle OAuth authentication (no auth_value to decode)
        if auth_type == "oauth":
            # OAuth agents don't have traditional auth_value to decode
            # They use oauth_config instead
            return self

        if auth_type == "one_time_auth":
            return self

        if auth_type == "query_param":
            # Query param auth is handled by the before validator
            # (auth_query_params from DB model is processed there)
            return self

        # If no encoded value is present, nothing to populate
        if not auth_value_encoded:
            return self

        auth_value = decode_auth(auth_value_encoded)
        if auth_type == "basic":
            auth = auth_value.get("Authorization")
            if not (isinstance(auth, str) and auth.startswith("Basic ")):
                raise ValueError("basic auth requires an Authorization header of the form 'Basic <base64>'")
            auth = auth.removeprefix("Basic ")
            decoded = base64.urlsafe_b64decode(auth).decode("utf-8")
            # Split on the first colon only: a password may itself contain ':',
            # and the write side encodes f"{username}:{password}" without restriction.
            u, _, p = decoded.partition(":")
            if not u or not p:
                raise ValueError("basic auth requires both username and password")
            self.auth_username, self.auth_password = u, p

        elif auth_type == "bearer":
            auth = auth_value.get("Authorization")
            if not (isinstance(auth, str) and auth.startswith("Bearer ")):
                raise ValueError("bearer auth requires an Authorization header of the form 'Bearer <token>'")
            self.auth_token = auth.removeprefix("Bearer ")

        elif auth_type == "authheaders":
            # For backward compatibility, populate first header in key/value fields
            if not isinstance(auth_value, dict) or len(auth_value) == 0:
                raise ValueError("authheaders requires at least one key/value pair")
            # Populate auth_headers list for multi-header support
            self.auth_headers = [{"key": str(key), "value": "" if value is None else str(value)} for key, value in auth_value.items()]
            # Maintain backward compatibility with single header fields
            k, v = next(iter(auth_value.items()))
            self.auth_header_key, self.auth_header_value = k, v
        return self

    def masked(self) -> "A2AAgentRead":
        """
        Return a masked version of the model instance with sensitive authentication fields hidden.

        This method creates a dictionary representation of the model data and replaces sensitive fields
        such as `auth_value`, `auth_password`, `auth_token`, and `auth_header_value` with a masked
        placeholder value defined in `settings.masked_auth_value`. Masking is only applied if the fields
        are present and not already masked.

        Args:
            None

        Returns:
            A2AAgentRead: A new instance of the A2AAgentRead model with sensitive authentication-related fields
            masked to prevent exposure of sensitive information.

        Notes:
            - The `auth_value` field is only masked if it exists and its value is different from the masking
              placeholder.
            - Other sensitive fields (`auth_password`, `auth_token`, `auth_header_value`) are masked if present.
            - Non-empty values inside `auth_headers` are masked; header keys are kept.
            - `oauth_config`, when present, is passed through `_mask_oauth_config`.
            - Fields not related to authentication remain unmodified.
        """
        masked_data = self.model_dump()

        # Only mask if auth_value is present and not already masked
        if masked_data.get("auth_value") and masked_data["auth_value"] != settings.masked_auth_value:
            masked_data["auth_value"] = settings.masked_auth_value

        masked_data["auth_password"] = settings.masked_auth_value if masked_data.get("auth_password") else None
        masked_data["auth_token"] = settings.masked_auth_value if masked_data.get("auth_token") else None
        masked_data["auth_header_value"] = settings.masked_auth_value if masked_data.get("auth_header_value") else None
        if masked_data.get("auth_headers"):
            masked_data["auth_headers"] = [
                {
                    "key": header.get("key"),
                    "value": settings.masked_auth_value if header.get("value") else header.get("value"),
                }
                for header in masked_data["auth_headers"]
            ]

        # Mask sensitive keys inside oauth_config (e.g. password, client_secret)
        if masked_data.get("oauth_config"):
            masked_data["oauth_config"] = _mask_oauth_config(masked_data["oauth_config"])

        return A2AAgentRead.model_validate(masked_data)
class A2AAgentInvocation(BaseModelWithConfigDict):
    """Request schema for invoking an A2A agent.

    Carries:
    - The name of the agent to invoke
    - A parameters dictionary for the interaction (defaults to empty)
    - The interaction type (defaults to "query")
    """

    agent_name: str = Field(..., description="Name of the A2A agent to invoke")
    parameters: Dict[str, Any] = Field(default_factory=dict, description="Parameters for agent interaction")
    interaction_type: str = Field(default="query", description="Type of interaction (query, execute, etc.)")

    @field_validator("agent_name")
    @classmethod
    def validate_agent_name(cls, v: str) -> str:
        """Run the agent name through ``SecurityValidator.validate_name``.

        Args:
            v (str): Agent name to check

        Returns:
            str: The value returned by ``SecurityValidator.validate_name``
        """
        checked_name = SecurityValidator.validate_name(v, "Agent name")
        return checked_name

    @field_validator("parameters")
    @classmethod
    def validate_parameters(cls, v: Dict[str, Any]) -> Dict[str, Any]:
        """Check the nesting depth of the parameters to prevent DoS attacks.

        Args:
            v (dict): Parameters dictionary to check

        Returns:
            dict: The same dictionary, unchanged, once the depth check has passed

        Raises:
            ValueError: If the parameters exceed the maximum allowed depth
        """
        # The depth check raises on failure; on success the input is handed back as-is
        SecurityValidator.validate_json_depth(v)
        return v
5375# ---------------------------------------------------------------------------
5376# Email-Based Authentication Schemas
5377# ---------------------------------------------------------------------------
class EmailLoginRequest(BaseModel):
    """Login request carrying an email address and a password.

    String inputs have surrounding whitespace stripped (``str_strip_whitespace=True``).

    Attributes:
        email: User's email address
        password: User's password (at least one character)

    Examples:
        >>> request = EmailLoginRequest(email="user@example.com", password="secret123")
        >>> request.email
        'user@example.com'
        >>> request.password
        'secret123'
    """

    model_config = ConfigDict(str_strip_whitespace=True)

    email: EmailStr = Field(..., description="User's email address")
    password: str = Field(..., min_length=1, description="User's password")
class PublicRegistrationRequest(BaseModel):
    """Self-registration request for the public endpoint: minimal fields, password required.

    The model is configured with extra="forbid", so unknown fields are rejected and
    clients cannot submit admin-only fields like is_admin or is_active. String inputs
    have surrounding whitespace stripped.

    Attributes:
        email: User's email address
        password: User's password (required, min 8 chars)
        full_name: Optional full name for display (max 255 chars)

    Examples:
        >>> request = PublicRegistrationRequest(
        ...     email="new@example.com",
        ...     password="secure123",
        ...     full_name="New User"
        ... )
        >>> request.email
        'new@example.com'
        >>> request.full_name
        'New User'
    """

    model_config = ConfigDict(str_strip_whitespace=True, extra="forbid")

    email: EmailStr = Field(..., description="User's email address")
    password: str = Field(..., min_length=8, description="User's password")
    full_name: Optional[str] = Field(None, max_length=255, description="User's full name")
class AdminCreateUserRequest(BaseModel):
    """User creation request for administrators: all fields available, password required.

    String inputs have surrounding whitespace stripped.

    Attributes:
        email: User's email address
        password: User's password (required, min 8 chars)
        full_name: Optional full name for display (max 255 chars)
        is_admin: Whether user should have admin privileges (default: False)
        is_active: Whether user account is active (default: True)
        password_change_required: Whether user must change password on next login (default: False)

    Examples:
        >>> request = AdminCreateUserRequest(
        ...     email="new@example.com",
        ...     password="secure123",
        ...     full_name="New User"
        ... )
        >>> request.email
        'new@example.com'
        >>> request.full_name
        'New User'
        >>> request.is_admin
        False
        >>> request.is_active
        True
        >>> request.password_change_required
        False
    """

    model_config = ConfigDict(str_strip_whitespace=True)

    email: EmailStr = Field(..., description="User's email address")
    password: str = Field(..., min_length=8, description="User's password")
    full_name: Optional[str] = Field(None, max_length=255, description="User's full name")
    is_admin: bool = Field(False, description="Grant admin privileges to user")
    is_active: bool = Field(True, description="Whether user account is active")
    password_change_required: bool = Field(False, description="Whether user must change password on next login")


# Deprecated alias kept for existing callers; new code should use
# AdminCreateUserRequest or PublicRegistrationRequest directly.
EmailRegistrationRequest = AdminCreateUserRequest
class ChangePasswordRequest(BaseModel):
    """Password change request holding the current and the new password.

    String inputs have surrounding whitespace stripped.

    Attributes:
        old_password: Current password for verification (at least one character)
        new_password: New password to set (at least 8 characters)

    Examples:
        >>> request = ChangePasswordRequest(
        ...     old_password="old_secret",
        ...     new_password="new_secure_password"
        ... )
        >>> request.old_password
        'old_secret'
        >>> request.new_password
        'new_secure_password'
    """

    model_config = ConfigDict(str_strip_whitespace=True)

    old_password: str = Field(..., min_length=1, description="Current password")
    new_password: str = Field(..., min_length=8, description="New password")

    @field_validator("new_password")
    @classmethod
    def validate_new_password(cls, v: str) -> str:
        """Reject new passwords shorter than 8 characters.

        Args:
            v: New password string to validate

        Returns:
            str: The new password, unchanged, when it is long enough

        Raises:
            ValueError: If the new password has fewer than 8 characters
        """
        if len(v) >= 8:
            return v
        raise ValueError("New password must be at least 8 characters long")
class ForgotPasswordRequest(BaseModel):
    """Request schema that starts the forgot-password flow.

    Attributes:
        email: Email address for password reset (surrounding whitespace is stripped)
    """

    model_config = ConfigDict(str_strip_whitespace=True)

    email: EmailStr = Field(..., description="Email address for password reset")
class ResetPasswordRequest(BaseModel):
    """Request schema that completes a password reset.

    Attributes:
        new_password: New password to set (at least 8 characters)
        confirm_password: Repeat of the new password; must equal new_password
    """

    model_config = ConfigDict(str_strip_whitespace=True)

    new_password: str = Field(..., min_length=8, description="New password to set")
    confirm_password: str = Field(..., min_length=8, description="Password confirmation")

    @model_validator(mode="after")
    def validate_password_match(self):
        """Check that the password and its confirmation are identical.

        Returns:
            ResetPasswordRequest: Validated request instance.

        Raises:
            ValueError: If the password and confirmation do not match.
        """
        if self.new_password == self.confirm_password:
            return self
        raise ValueError("Passwords do not match")
class PasswordResetTokenValidationResponse(BaseModel):
    """Response schema reporting the outcome of a reset-token validation.

    Attributes:
        valid: Whether token is currently valid
        message: Validation status message
        expires_at: Token expiration timestamp when valid, otherwise None
    """

    valid: bool = Field(..., description="Whether token is currently valid")
    message: str = Field(..., description="Validation status message")
    expires_at: Optional[datetime] = Field(None, description="Token expiration timestamp when valid")
class EmailUserResponse(BaseModel):
    """Response schema describing a user account.

    Attributes:
        email: User's email address
        full_name: User's full name
        is_admin: Whether user has admin privileges
        is_active: Whether account is active
        auth_provider: Authentication provider used
        created_at: Account creation timestamp
        last_login: Last successful login timestamp
        email_verified: Whether email is verified
        password_change_required: Whether user must change password on next login
        failed_login_attempts: Current failed login attempts counter
        locked_until: Account lock expiration timestamp
        is_locked: Whether the account is currently locked

    Examples:
        >>> user = EmailUserResponse(
        ...     email="user@example.com",
        ...     full_name="Test User",
        ...     is_admin=False,
        ...     is_active=True,
        ...     auth_provider="local",
        ...     created_at=datetime.now(),
        ...     last_login=None,
        ...     email_verified=False
        ... )
        >>> user.email
        'user@example.com'
        >>> user.is_admin
        False
    """

    model_config = ConfigDict(from_attributes=True)

    email: str = Field(..., description="User's email address")
    full_name: Optional[str] = Field(None, description="User's full name")
    is_admin: bool = Field(..., description="Whether user has admin privileges")
    is_active: bool = Field(..., description="Whether account is active")
    auth_provider: str = Field(..., description="Authentication provider")
    created_at: datetime = Field(..., description="Account creation timestamp")
    last_login: Optional[datetime] = Field(None, description="Last successful login")
    email_verified: bool = Field(False, description="Whether email is verified")
    password_change_required: bool = Field(False, description="Whether user must change password on next login")
    failed_login_attempts: int = Field(0, description="Current failed login attempts counter")
    locked_until: Optional[datetime] = Field(None, description="Account lock expiration timestamp")
    is_locked: bool = Field(False, description="Whether the account is currently locked")

    @classmethod
    def from_email_user(cls, user) -> "EmailUserResponse":
        """Build a response from an EmailUser model instance.

        ``locked_until`` is only carried over when it is a ``datetime``; any other
        value becomes None. ``failed_login_attempts`` is coerced with ``int`` and
        falls back to 0 when missing, falsy, or not convertible.

        Args:
            user: EmailUser model instance

        Returns:
            EmailUserResponse: Response schema instance
        """
        account_locked = user.is_account_locked()

        raw_locked_until = getattr(user, "locked_until", None)
        lock_expiry = raw_locked_until if isinstance(raw_locked_until, datetime) else None

        raw_attempts = getattr(user, "failed_login_attempts", 0)
        try:
            attempts = int(raw_attempts or 0)
        except (TypeError, ValueError):
            # Unusable counter value: report zero rather than fail the response
            attempts = 0

        return cls(
            email=user.email,
            full_name=user.full_name,
            is_admin=user.is_admin,
            is_active=user.is_active,
            auth_provider=user.auth_provider,
            created_at=user.created_at,
            last_login=user.last_login,
            email_verified=user.is_email_verified(),
            password_change_required=user.password_change_required,
            failed_login_attempts=attempts,
            locked_until=lock_expiry,
            is_locked=account_locked,
        )
class AuthenticationResponse(BaseModel):
    """Response schema returned after a successful authentication.

    Attributes:
        access_token: JWT token for API access
        token_type: Type of token (defaults to 'bearer')
        expires_in: Token expiration time in seconds
        user: User information

    Examples:
        >>> from datetime import datetime
        >>> response = AuthenticationResponse(
        ...     access_token="jwt.token.here",
        ...     token_type="bearer",
        ...     expires_in=3600,
        ...     user=EmailUserResponse(
        ...         email="user@example.com",
        ...         full_name="Test User",
        ...         is_admin=False,
        ...         is_active=True,
        ...         auth_provider="local",
        ...         created_at=datetime.now(),
        ...         last_login=None,
        ...         email_verified=False
        ...     )
        ... )
        >>> response.token_type
        'bearer'
        >>> response.user.email
        'user@example.com'
    """

    access_token: str = Field(..., description="JWT access token")
    token_type: str = Field(default="bearer", description="Token type")
    expires_in: int = Field(..., description="Token expiration in seconds")
    user: EmailUserResponse = Field(..., description="User information")
class AuthEventResponse(BaseModel):
    """Serialized view of a single authentication event.

    The model is configured with ``from_attributes=True`` so it can be
    populated from an object's attributes as well as from a mapping.

    Attributes:
        id: Event ID
        timestamp: Event timestamp
        user_email: User's email address (may be None)
        event_type: Type of authentication event
        success: Whether the event was successful
        ip_address: Client IP address (may be None)
        failure_reason: Reason for failure (if applicable)

    Examples:
        >>> from datetime import datetime
        >>> event = AuthEventResponse(
        ...     id=1,
        ...     timestamp=datetime.now(),
        ...     user_email="user@example.com",
        ...     event_type="login",
        ...     success=True,
        ...     ip_address="192.168.1.1",
        ...     failure_reason=None
        ... )
        >>> event.event_type
        'login'
        >>> event.success
        True
    """

    model_config = ConfigDict(from_attributes=True)

    # Required fields.
    id: int = Field(..., description="Event ID")
    timestamp: datetime = Field(..., description="Event timestamp")
    event_type: str = Field(..., description="Type of authentication event")
    success: bool = Field(..., description="Whether the event was successful")

    # Optional fields, all defaulting to None.
    user_email: Optional[str] = Field(default=None, description="User's email address")
    ip_address: Optional[str] = Field(default=None, description="Client IP address")
    failure_reason: Optional[str] = Field(default=None, description="Reason for failure")
class UserListResponse(BaseModel):
    """One page of users together with the paging values of the request.

    Attributes:
        users: List of users
        total_count: Total number of users
        limit: Request limit
        offset: Request offset

    Examples:
        >>> user_list = UserListResponse(
        ...     users=[],
        ...     total_count=0,
        ...     limit=10,
        ...     offset=0
        ... )
        >>> user_list.total_count
        0
        >>> len(user_list.users)
        0
    """

    # Every field is required.
    users: list[EmailUserResponse] = Field(..., description="List of users")
    total_count: int = Field(..., description="Total number of users")
    limit: int = Field(..., description="Request limit")
    offset: int = Field(..., description="Request offset")
class AdminUserUpdateRequest(BaseModel):
    """Partial-update payload an administrator sends to modify a user.

    Every field is optional and defaults to None. String values have
    surrounding whitespace stripped (``str_strip_whitespace=True``).

    Attributes:
        full_name: User's full name (at most 255 characters)
        is_admin: Whether user has admin privileges
        is_active: Whether account is active
        email_verified: Whether the user's email is verified
        password_change_required: Whether user must change password on next login
        password: New password, at least 8 characters (admin can reset without old password)

    Examples:
        >>> request = AdminUserUpdateRequest(
        ...     full_name="Updated Name",
        ...     is_admin=True,
        ...     is_active=True
        ... )
        >>> request.full_name
        'Updated Name'
        >>> request.is_admin
        True
    """

    model_config = ConfigDict(str_strip_whitespace=True)

    full_name: Optional[str] = Field(default=None, max_length=255, description="User's full name")
    is_admin: Optional[bool] = Field(default=None, description="Whether user has admin privileges")
    is_active: Optional[bool] = Field(default=None, description="Whether account is active")
    email_verified: Optional[bool] = Field(default=None, description="Whether user's email is verified")
    password_change_required: Optional[bool] = Field(
        default=None,
        description="Whether user must change password on next login",
    )
    password: Optional[str] = Field(default=None, min_length=8, description="New password (admin reset)")
class ErrorResponse(BaseModel):
    """Uniform body used to report an error.

    Attributes:
        error: Error type
        message: Human-readable error message
        details: Additional error details (optional dict, defaults to None)

    Examples:
        >>> error = ErrorResponse(
        ...     error="authentication_failed",
        ...     message="Invalid email or password",
        ...     details=None
        ... )
        >>> error.error
        'authentication_failed'
        >>> error.message
        'Invalid email or password'
    """

    error: str = Field(..., description="Error type")
    message: str = Field(..., description="Human-readable error message")
    details: Optional[dict] = Field(default=None, description="Additional error details")
class SuccessResponse(BaseModel):
    """Uniform body used to acknowledge a completed operation.

    Attributes:
        success: Whether operation was successful (defaults to True)
        message: Human-readable success message

    Examples:
        >>> response = SuccessResponse(
        ...     success=True,
        ...     message="Password changed successfully"
        ... )
        >>> response.success
        True
        >>> response.message
        'Password changed successfully'
    """

    success: bool = Field(default=True, description="Operation success status")
    message: str = Field(..., description="Human-readable success message")
5820# ---------------------------------------------------------------------------
5821# Team Management Schemas
5822# ---------------------------------------------------------------------------
class TeamCreateRequest(BaseModel):
    """Request payload for creating a team.

    Attributes:
        name: Team display name
        slug: URL-friendly team identifier (optional; None when not supplied)
        description: Team description
        visibility: Team visibility level ('private' by default)
        max_members: Maximum number of members allowed

    Examples:
        >>> request = TeamCreateRequest(
        ...     name="Engineering Team",
        ...     description="Software development team"
        ... )
        >>> request.name
        'Engineering Team'
        >>> request.visibility
        'private'
        >>> request.slug is None
        True
        >>>
        >>> # Test with all fields
        >>> full_request = TeamCreateRequest(
        ...     name="DevOps Team",
        ...     slug="devops-team",
        ...     description="Infrastructure and deployment team",
        ...     visibility="public",
        ...     max_members=50
        ... )
        >>> full_request.slug
        'devops-team'
        >>> full_request.max_members
        50
        >>> full_request.visibility
        'public'
        >>>
        >>> # Test validation
        >>> try:
        ...     TeamCreateRequest(name=" ", description="test")
        ... except ValueError as e:
        ...     "empty" in str(e).lower()
        True
        >>>
        >>> # Test slug validation
        >>> try:
        ...     TeamCreateRequest(name="Test", slug="Invalid_Slug")
        ... except ValueError:
        ...     True
        True
        >>>
        >>> # Test valid slug patterns
        >>> valid_slug = TeamCreateRequest(name="Test", slug="valid-slug-123")
        >>> valid_slug.slug
        'valid-slug-123'
    """

    name: str = Field(..., min_length=1, max_length=255, description="Team display name")
    slug: Optional[str] = Field(
        default=None,
        min_length=2,
        max_length=255,
        pattern="^[a-z0-9-]+$",
        description="URL-friendly team identifier",
    )
    description: Optional[str] = Field(default=None, max_length=1000, description="Team description")
    visibility: Literal["private", "public"] = Field(default="private", description="Team visibility level")
    max_members: Optional[int] = Field(
        default=None,
        ge=1,
        description="Maximum number of team members. If omitted, the team inherits the global MAX_MEMBERS_PER_TEAM setting at check time.",
    )

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Trim the team name and reject blank or unsafe values.

        Args:
            v: Team name to validate

        Returns:
            str: The name with surrounding whitespace removed

        Raises:
            ValueError: If the name is blank, does not match
                settings.validation_name_pattern, or matches
                SecurityValidator.DANGEROUS_JS_PATTERN
        """
        candidate = v.strip()
        if not candidate:
            raise ValueError("Team name cannot be empty")
        # The permitted character set is whatever the configured name pattern allows.
        if re.match(settings.validation_name_pattern, candidate) is None:
            raise ValueError("Team name can only contain letters, numbers, spaces, underscores, periods, and dashes")
        # Delegated check; presumably raises when markup is detected.
        SecurityValidator.validate_no_xss(candidate, "Team name")
        if re.search(SecurityValidator.DANGEROUS_JS_PATTERN, candidate, re.IGNORECASE) is not None:
            raise ValueError("Team name contains script patterns that may cause security issues")
        return candidate

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Trim the description and screen it for script content.

        Args:
            v: Team description to validate

        Returns:
            Optional[str]: The trimmed description, or None when it is
                missing or blank after trimming

        Raises:
            ValueError: If description contains dangerous patterns
        """
        if v is None:
            return None
        trimmed = v.strip()
        if not trimmed:
            # Whitespace-only descriptions are normalised to None.
            return None
        SecurityValidator.validate_no_xss(trimmed, "Team description")
        if re.search(SecurityValidator.DANGEROUS_JS_PATTERN, trimmed, re.IGNORECASE) is not None:
            raise ValueError("Team description contains script patterns that may cause security issues")
        return trimmed

    @field_validator("slug")
    @classmethod
    def validate_slug(cls, v: Optional[str]) -> Optional[str]:
        """Normalise the slug and check its shape.

        Args:
            v: Team slug to validate

        Returns:
            Optional[str]: The trimmed, lower-cased slug, or None when no
                slug was given

        Raises:
            ValueError: If slug format is invalid
        """
        if v is None:
            return v
        normalized = v.strip().lower()
        # _SLUG_RE is a module-level precompiled pattern.
        if _SLUG_RE.match(normalized) is None:
            raise ValueError("Slug must contain only lowercase letters, numbers, and hyphens")
        if "-" in (normalized[:1], normalized[-1:]):
            raise ValueError("Slug cannot start or end with hyphens")
        return normalized
class TeamUpdateRequest(BaseModel):
    """Partial-update payload for an existing team.

    Every field is optional and defaults to None.

    Attributes:
        name: Team display name
        description: Team description
        visibility: Team visibility level
        max_members: Maximum number of members allowed

    Examples:
        >>> request = TeamUpdateRequest(
        ...     name="Updated Engineering Team",
        ...     description="Updated description"
        ... )
        >>> request.name
        'Updated Engineering Team'
    """

    name: Optional[str] = Field(default=None, min_length=1, max_length=255, description="Team display name")
    description: Optional[str] = Field(default=None, max_length=1000, description="Team description")
    visibility: Optional[Literal["private", "public"]] = Field(default=None, description="Team visibility level")
    max_members: Optional[int] = Field(
        default=None,
        ge=1,
        description="Maximum number of team members. Set to null to clear a per-team override and revert to the global MAX_MEMBERS_PER_TEAM setting.",
    )

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: Optional[str]) -> Optional[str]:
        """Trim the team name and reject blank or unsafe values.

        Args:
            v: Team name to validate

        Returns:
            Optional[str]: The name with surrounding whitespace removed, or
                None when no name was supplied

        Raises:
            ValueError: If the name is blank, does not match
                settings.validation_name_pattern, or matches
                SecurityValidator.DANGEROUS_JS_PATTERN
        """
        if v is None:
            # Nothing to validate: the caller is not changing the name.
            return None
        candidate = v.strip()
        if not candidate:
            raise ValueError("Team name cannot be empty")
        # The permitted character set is whatever the configured name pattern allows.
        if re.match(settings.validation_name_pattern, candidate) is None:
            raise ValueError("Team name can only contain letters, numbers, spaces, underscores, periods, and dashes")
        # Delegated check; presumably raises when markup is detected.
        SecurityValidator.validate_no_xss(candidate, "Team name")
        if re.search(SecurityValidator.DANGEROUS_JS_PATTERN, candidate, re.IGNORECASE) is not None:
            raise ValueError("Team name contains script patterns that may cause security issues")
        return candidate

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Trim the description and screen it for script content.

        Args:
            v: Team description to validate

        Returns:
            Optional[str]: The trimmed description, or None when it is
                missing or blank after trimming

        Raises:
            ValueError: If description contains dangerous patterns
        """
        if v is None:
            return None
        trimmed = v.strip()
        if not trimmed:
            # Whitespace-only descriptions are normalised to None.
            return None
        SecurityValidator.validate_no_xss(trimmed, "Team description")
        if re.search(SecurityValidator.DANGEROUS_JS_PATTERN, trimmed, re.IGNORECASE) is not None:
            raise ValueError("Team description contains script patterns that may cause security issues")
        return trimmed
class TeamResponse(BaseModel):
    """Serialized representation of a team.

    Attributes:
        id: Team UUID
        name: Team display name
        slug: URL-friendly team identifier
        description: Team description
        created_by: Email of team creator
        is_personal: Whether this is a personal team
        visibility: Team visibility level (must be supplied, may be None)
        max_members: Per-team member limit override (None when unset)
        member_count: Current number of team members
        created_at: Team creation timestamp
        updated_at: Last update timestamp
        is_active: Whether the team is active

    Examples:
        >>> team = TeamResponse(
        ...     id="team-123",
        ...     name="Engineering Team",
        ...     slug="engineering-team",
        ...     created_by="admin@example.com",
        ...     is_personal=False,
        ...     visibility="private",
        ...     member_count=5,
        ...     created_at=datetime.now(timezone.utc),
        ...     updated_at=datetime.now(timezone.utc),
        ...     is_active=True
        ... )
        >>> team.name
        'Engineering Team'
    """

    id: str = Field(..., description="Team UUID")
    name: str = Field(..., description="Team display name")
    slug: str = Field(..., description="URL-friendly team identifier")
    description: Optional[str] = Field(default=None, description="Team description")
    created_by: str = Field(..., description="Email of team creator")
    is_personal: bool = Field(..., description="Whether this is a personal team")
    # Required key, but an explicit None is accepted.
    visibility: Optional[Literal["private", "public"]] = Field(..., description="Team visibility level")
    max_members: Optional[int] = Field(
        default=None,
        description="Per-team member limit override. Null means the team uses the global MAX_MEMBERS_PER_TEAM setting.",
    )
    member_count: int = Field(..., description="Current number of team members")
    created_at: datetime = Field(..., description="Team creation timestamp")
    updated_at: datetime = Field(..., description="Last update timestamp")
    is_active: bool = Field(..., description="Whether the team is active")
class TeamMemberResponse(BaseModel):
    """Serialized representation of one team membership.

    The model is configured with ``from_attributes=True`` so it can be
    populated from an object's attributes as well as from a mapping.

    Attributes:
        id: Member UUID
        team_id: Team UUID
        user_email: Member email address
        role: Member role in the team
        joined_at: When the member joined
        invited_by: Email of user who invited this member
        is_active: Whether the membership is active
        grant_source: Origin of the grant (e.g., 'sso', 'manual', 'bootstrap', 'auto')

    Examples:
        >>> member = TeamMemberResponse(
        ...     id="member-123",
        ...     team_id="team-123",
        ...     user_email="user@example.com",
        ...     role="member",
        ...     joined_at=datetime.now(timezone.utc),
        ...     is_active=True
        ... )
        >>> member.role
        'member'
    """

    model_config = ConfigDict(from_attributes=True)

    id: str = Field(..., description="Member UUID")
    team_id: str = Field(..., description="Team UUID")
    user_email: str = Field(..., description="Member email address")
    role: str = Field(..., description="Member role in the team")
    joined_at: datetime = Field(..., description="When the member joined")
    invited_by: Optional[str] = Field(default=None, description="Email of user who invited this member")
    is_active: bool = Field(..., description="Whether the membership is active")
    grant_source: Optional[str] = Field(
        default=None,
        description="Origin of the grant (e.g., 'sso', 'manual', 'bootstrap', 'auto')",
    )
class PaginatedTeamMembersResponse(BaseModel):
    """One page of team members plus an optional continuation cursor.

    Attributes:
        members: List of team members
        next_cursor: Optional cursor for next page of results; declared
            with the alias ``nextCursor``

    Examples:
        >>> member1 = TeamMemberResponse(
        ...     id="member-1",
        ...     team_id="team-123",
        ...     user_email="user1@example.com",
        ...     role="member",
        ...     joined_at=datetime.now(timezone.utc),
        ...     is_active=True
        ... )
        >>> member2 = TeamMemberResponse(
        ...     id="member-2",
        ...     team_id="team-123",
        ...     user_email="user2@example.com",
        ...     role="member",
        ...     joined_at=datetime.now(timezone.utc),
        ...     is_active=True
        ... )
        >>> response = PaginatedTeamMembersResponse(
        ...     members=[member1, member2],
        ...     nextCursor="cursor-token-123"
        ... )
        >>> len(response.members)
        2
    """

    members: List[TeamMemberResponse] = Field(..., description="List of team members")
    next_cursor: Optional[str] = Field(
        default=None,
        alias="nextCursor",
        description="Cursor for next page of results",
    )
class TeamInviteRequest(BaseModel):
    """Request payload for inviting a user to a team.

    Attributes:
        email: Email address of user to invite
        role: Role to assign to the user ('owner' or 'member'; defaults to 'member')

    Examples:
        >>> invite = TeamInviteRequest(
        ...     email="newuser@example.com",
        ...     role="member"
        ... )
        >>> invite.email
        'newuser@example.com'
    """

    email: EmailStr = Field(..., description="Email address of user to invite")
    role: Literal["owner", "member"] = Field(default="member", description="Role to assign to the user")
class TeamInvitationResponse(BaseModel):
    """Serialized representation of a team invitation.

    Attributes:
        id: Invitation UUID
        team_id: Team UUID
        team_name: Team display name
        email: Email address of invited user
        role: Role the user will have when they accept
        invited_by: Email of user who sent the invitation
        invited_at: When the invitation was sent
        expires_at: When the invitation expires
        token: Invitation token
        is_active: Whether the invitation is active
        is_expired: Whether the invitation has expired

    Examples:
        >>> invitation = TeamInvitationResponse(
        ...     id="invite-123",
        ...     team_id="team-123",
        ...     team_name="Engineering Team",
        ...     email="newuser@example.com",
        ...     role="member",
        ...     invited_by="admin@example.com",
        ...     invited_at=datetime.now(timezone.utc),
        ...     expires_at=datetime.now(timezone.utc),
        ...     token="invitation-token",
        ...     is_active=True,
        ...     is_expired=False
        ... )
        >>> invitation.role
        'member'
    """

    # Identity of the invitation and its team.
    id: str = Field(..., description="Invitation UUID")
    team_id: str = Field(..., description="Team UUID")
    team_name: str = Field(..., description="Team display name")

    # Who is invited, in what role, and by whom.
    email: str = Field(..., description="Email address of invited user")
    role: str = Field(..., description="Role the user will have when they accept")
    invited_by: str = Field(..., description="Email of user who sent the invitation")

    # Timing, token and state.
    invited_at: datetime = Field(..., description="When the invitation was sent")
    expires_at: datetime = Field(..., description="When the invitation expires")
    token: str = Field(..., description="Invitation token")
    is_active: bool = Field(..., description="Whether the invitation is active")
    is_expired: bool = Field(..., description="Whether the invitation has expired")
class TeamMemberAddRequest(BaseModel):
    """Request payload for adding a user to a team.

    Both fields are required.

    Attributes:
        email: Email address of user to be added to the team
        role: Role for the added team member ('owner' or 'member')
    """

    email: EmailStr = Field(..., description="Email address of user to be added to the team")
    role: Literal["owner", "member"] = Field(..., description="New role for the team member")
class TeamMemberUpdateRequest(BaseModel):
    """Request payload for changing a team member's role.

    Attributes:
        role: New role for the team member ('owner' or 'member'; required)

    Examples:
        >>> update = TeamMemberUpdateRequest(role="member")
        >>> update.role
        'member'
    """

    role: Literal["owner", "member"] = Field(..., description="New role for the team member")
class TeamListResponse(BaseModel):
    """A list of teams together with a total count.

    Attributes:
        teams: List of teams
        total: Total number of teams

    Examples:
        >>> response = TeamListResponse(teams=[], total=0)
        >>> response.total
        0
    """

    # Both fields are required.
    teams: List[TeamResponse] = Field(..., description="List of teams")
    total: int = Field(..., description="Total number of teams")
class TeamDiscoveryResponse(BaseModel):
    """Reduced view of a public team, used for discovery.

    Carries only a limited set of team metadata.

    Attributes:
        id: Team ID
        name: Team name
        description: Team description (optional, defaults to None)
        member_count: Number of members
        created_at: Team creation timestamp
        is_joinable: Whether the current user can join this team
    """

    id: str = Field(..., description="Team ID")
    name: str = Field(..., description="Team name")
    description: Optional[str] = Field(default=None, description="Team description")
    member_count: int = Field(..., description="Number of team members")
    created_at: datetime = Field(..., description="Team creation timestamp")
    is_joinable: bool = Field(..., description="Whether the current user can join this team")
class TeamJoinRequest(BaseModel):
    """Request payload for asking to join a public team.

    Attributes:
        message: Optional message to team owners (at most 500 characters)
    """

    message: Optional[str] = Field(default=None, description="Optional message to team owners", max_length=500)
class TeamJoinRequestResponse(BaseModel):
    """Serialized representation of a request to join a team.

    Attributes:
        id: Join request ID
        team_id: Target team ID
        team_name: Target team name
        user_email: Requesting user email
        message: Request message (optional, defaults to None)
        status: Request status (pending, approved, rejected)
        requested_at: Request timestamp
        expires_at: Request expiration timestamp
    """

    id: str = Field(..., description="Join request ID")
    team_id: str = Field(..., description="Target team ID")
    team_name: str = Field(..., description="Target team name")
    user_email: str = Field(..., description="Requesting user email")
    message: Optional[str] = Field(default=None, description="Request message")
    # Plain string; the schema itself does not restrict the allowed values.
    status: str = Field(..., description="Request status")
    requested_at: datetime = Field(..., description="Request timestamp")
    expires_at: datetime = Field(..., description="Request expiration")
6324# API Token Management Schemas
class TokenScopeRequest(BaseModel):
    """Scoping configuration attached to an API token.

    Every field is optional; list and dict fields default to empty
    containers.

    Attributes:
        server_id: Optional server ID limitation
        permissions: List of permission scopes
        ip_restrictions: List of IP address/CIDR restrictions
        time_restrictions: Time-based access limitations
        usage_limits: Rate limiting and quota settings

    Examples:
        >>> scope = TokenScopeRequest(
        ...     server_id="server-123",
        ...     permissions=["tools.read", "resources.read"],
        ...     ip_restrictions=["192.168.1.0/24"]
        ... )
        >>> scope.server_id
        'server-123'
    """

    server_id: Optional[str] = Field(default=None, description="Limit token to specific server")
    permissions: List[str] = Field(default_factory=list, description="Permission scopes")
    ip_restrictions: List[str] = Field(default_factory=list, description="IP address restrictions")
    time_restrictions: Dict[str, Any] = Field(default_factory=dict, description="Time-based restrictions")
    usage_limits: Dict[str, Any] = Field(default_factory=dict, description="Usage limits and quotas")

    @field_validator("ip_restrictions")
    @classmethod
    def validate_ip_restrictions(cls, v: List[str]) -> List[str]:
        """Check that every entry is a parseable IP address or CIDR block.

        Entries are stripped of surrounding whitespace and blank entries
        are dropped. An entry containing '/' is parsed as a network
        (non-strict, so host bits may be set); anything else is parsed as
        a single address.

        Args:
            v: List of IP address or CIDR strings to validate.

        Returns:
            List of validated IP/CIDR strings with whitespace stripped.

        Raises:
            ValueError: If any IP address or CIDR notation is invalid.

        Examples:
            >>> TokenScopeRequest.validate_ip_restrictions(["192.168.1.0/24"])
            ['192.168.1.0/24']
            >>> TokenScopeRequest.validate_ip_restrictions(["10.0.0.1"])
            ['10.0.0.1']
        """
        # Standard
        import ipaddress  # pylint: disable=import-outside-toplevel

        if not v:
            return v

        accepted = []
        for raw_entry in v:
            entry = raw_entry.strip()
            if not entry:
                continue
            try:
                if "/" in entry:
                    # CIDR notation
                    ipaddress.ip_network(entry, strict=False)
                else:
                    # Single address
                    ipaddress.ip_address(entry)
            except ValueError as exc:
                raise ValueError(f"Invalid IP address or CIDR notation '{entry}': {exc}") from exc
            accepted.append(entry)
        return accepted

    @field_validator("permissions")
    @classmethod
    def validate_permissions(cls, v: List[str]) -> List[str]:
        """Check that every permission is 'resource.action' or the wildcard '*'.

        Entries are stripped of surrounding whitespace and blank entries
        are dropped.

        Args:
            v: List of permission strings to validate.

        Returns:
            List of validated permission strings with whitespace stripped.

        Raises:
            ValueError: If any permission does not match 'resource.action' format or '*'.

        Examples:
            >>> TokenScopeRequest.validate_permissions(["tools.read", "resources.write"])
            ['tools.read', 'resources.write']
            >>> TokenScopeRequest.validate_permissions(["*"])
            ['*']
        """
        if not v:
            return v

        # Two dot-separated identifiers, each a letter followed by letters, digits or underscores.
        scope_re = re.compile(r"^[a-zA-Z][a-zA-Z0-9_]*\.[a-zA-Z][a-zA-Z0-9_]*$")

        accepted = []
        for raw_scope in v:
            scope = raw_scope.strip()
            if not scope:
                continue
            # The bare wildcard bypasses the format check.
            if scope != "*" and scope_re.match(scope) is None:
                raise ValueError(f"Invalid permission format '{scope}'. Use 'resource.action' format (e.g., 'tools.read') or '*' for full access")
            accepted.append(scope)
        return accepted
class TokenCreateRequest(BaseModel):
    """Request payload for creating an API token.

    Attributes:
        name: Human-readable token name (1-255 characters)
        description: Optional token description (at most 1000 characters)
        expires_in_days: Optional expiry in days (at least 1 when given)
        scope: Optional token scoping configuration
        tags: Optional organizational tags
        team_id: Optional team ID for team-scoped tokens
        is_active: Token active status (defaults to True)

    Examples:
        >>> request = TokenCreateRequest(
        ...     name="Production Access",
        ...     description="Read-only production access",
        ...     expires_in_days=30,
        ...     tags=["production", "readonly"]
        ... )
        >>> request.name
        'Production Access'
    """

    name: str = Field(..., description="Human-readable token name", min_length=1, max_length=255)
    description: Optional[str] = Field(default=None, description="Token description", max_length=1000)
    expires_in_days: Optional[int] = Field(
        default=None,
        ge=1,
        description="Expiry in days (must be >= 1 if specified)",
    )
    scope: Optional[TokenScopeRequest] = Field(default=None, description="Token scoping configuration")
    tags: List[str] = Field(default_factory=list, description="Organizational tags")
    team_id: Optional[str] = Field(default=None, description="Team ID for team-scoped tokens")
    is_active: bool = Field(default=True, description="Token active status")
class TokenUpdateRequest(BaseModel):
    """Partial-update payload for an existing API token.

    Every field is optional and defaults to None.

    Attributes:
        name: New token name (1-255 characters)
        description: New token description (at most 1000 characters)
        scope: New token scoping configuration
        tags: New organizational tags
        is_active: New token active status

    Examples:
        >>> request = TokenUpdateRequest(
        ...     name="Updated Token Name",
        ...     description="Updated description"
        ... )
        >>> request.name
        'Updated Token Name'
    """

    name: Optional[str] = Field(default=None, description="New token name", min_length=1, max_length=255)
    description: Optional[str] = Field(default=None, description="New token description", max_length=1000)
    scope: Optional[TokenScopeRequest] = Field(default=None, description="New token scoping configuration")
    tags: Optional[List[str]] = Field(default=None, description="New organizational tags")
    is_active: Optional[bool] = Field(default=None, description="New token active status")
class TokenResponse(BaseModel):
    """Serialized representation of an API token's metadata.

    The model is configured with ``from_attributes=True`` so it can be
    populated from an object's attributes as well as from a mapping.

    Attributes:
        id: Token ID
        name: Token name
        description: Token description
        user_email: Token creator's email
        team_id: Team ID for team-scoped tokens
        server_id: Server scope limitation
        resource_scopes: Permission scopes
        ip_restrictions: IP restrictions
        time_restrictions: Time-based restrictions
        usage_limits: Usage limits
        created_at: Creation timestamp
        expires_at: Expiry timestamp
        last_used: Last usage timestamp
        is_active: Active status
        is_revoked: Whether token is revoked (defaults to False)
        revoked_at: Revocation timestamp
        revoked_by: Email of user who revoked token
        revocation_reason: Reason for revocation
        tags: Organizational tags

    Examples:
        >>> from datetime import datetime
        >>> token = TokenResponse(
        ...     id="token-123",
        ...     name="Test Token",
        ...     description="Test description",
        ...     user_email="test@example.com",
        ...     server_id=None,
        ...     resource_scopes=["tools.read"],
        ...     ip_restrictions=[],
        ...     time_restrictions={},
        ...     usage_limits={},
        ...     created_at=datetime.now(),
        ...     expires_at=None,
        ...     last_used=None,
        ...     is_active=True,
        ...     tags=[]
        ... )
        >>> token.name
        'Test Token'
    """

    model_config = ConfigDict(from_attributes=True)

    # Identity and ownership.
    id: str = Field(..., description="Token ID")
    name: str = Field(..., description="Token name")
    description: Optional[str] = Field(default=None, description="Token description")
    user_email: str = Field(..., description="Token creator's email")
    team_id: Optional[str] = Field(default=None, description="Team ID for team-scoped tokens")

    # Scoping.
    server_id: Optional[str] = Field(default=None, description="Server scope limitation")
    resource_scopes: List[str] = Field(..., description="Permission scopes")
    ip_restrictions: List[str] = Field(..., description="IP restrictions")
    time_restrictions: Dict[str, Any] = Field(..., description="Time-based restrictions")
    usage_limits: Dict[str, Any] = Field(..., description="Usage limits")

    # Lifecycle.
    created_at: datetime = Field(..., description="Creation timestamp")
    expires_at: Optional[datetime] = Field(default=None, description="Expiry timestamp")
    last_used: Optional[datetime] = Field(default=None, description="Last usage timestamp")
    is_active: bool = Field(..., description="Active status")

    # Revocation.
    is_revoked: bool = Field(default=False, description="Whether token is revoked")
    revoked_at: Optional[datetime] = Field(default=None, description="Revocation timestamp")
    revoked_by: Optional[str] = Field(default=None, description="Email of user who revoked token")
    revocation_reason: Optional[str] = Field(default=None, description="Reason for revocation")

    tags: List[str] = Field(..., description="Organizational tags")
class TokenCreateResponse(BaseModel):
    """Response body for token creation: token metadata plus the token string.

    Attributes:
        token: Token information
        access_token: The actual token string (only returned on creation)

    Examples:
        >>> from datetime import datetime
        >>> token_info = TokenResponse(
        ...     id="token-123", name="Test Token", description=None,
        ...     user_email="test@example.com", server_id=None, resource_scopes=[], ip_restrictions=[],
        ...     time_restrictions={}, usage_limits={}, created_at=datetime.now(),
        ...     expires_at=None, last_used=None, is_active=True, tags=[]
        ... )
        >>> response = TokenCreateResponse(
        ...     token=token_info,
        ...     access_token="abc123xyz"
        ... )
        >>> response.access_token
        'abc123xyz'
    """

    # Both fields are required.
    token: TokenResponse = Field(..., description="Token information")
    access_token: str = Field(..., description="The actual token string")
class TokenListResponse(BaseModel):
    """One page of tokens together with the paging values of the request.

    Attributes:
        tokens: List of tokens
        total: Total number of tokens
        limit: Request limit
        offset: Request offset

    Examples:
        >>> response = TokenListResponse(
        ...     tokens=[],
        ...     total=0,
        ...     limit=10,
        ...     offset=0
        ... )
        >>> response.total
        0
    """

    # Every field is required.
    tokens: List[TokenResponse] = Field(..., description="List of tokens")
    total: int = Field(..., description="Total number of tokens")
    limit: int = Field(..., description="Request limit")
    offset: int = Field(..., description="Request offset")
class TokenRevokeRequest(BaseModel):
    """Request body for revoking a token.

    Attributes:
        reason: Optional free-text reason for the revocation (at most 255 characters)

    Examples:
        >>> request = TokenRevokeRequest(reason="Security incident")
        >>> request.reason
        'Security incident'
    """

    reason: Optional[str] = Field(default=None, max_length=255, description="Reason for revocation")
class TokenUsageStatsResponse(BaseModel):
    """Aggregated usage statistics for a token.

    Attributes:
        period_days: Number of days analyzed
        total_requests: Total number of requests
        successful_requests: Number of successful requests
        blocked_requests: Number of blocked requests
        success_rate: Success rate expressed as a fraction (0-1)
        average_response_time_ms: Average response time in milliseconds
        top_endpoints: Most accessed endpoints as (endpoint, count) pairs

    Examples:
        >>> stats = TokenUsageStatsResponse(
        ...     period_days=30,
        ...     total_requests=100,
        ...     successful_requests=95,
        ...     blocked_requests=5,
        ...     success_rate=0.95,
        ...     average_response_time_ms=150.5,
        ...     top_endpoints=[("/tools", 50), ("/resources", 30)]
        ... )
        >>> stats.success_rate
        0.95
    """

    # Every statistic is required; none carries a default.
    period_days: int = Field(description="Number of days analyzed")
    total_requests: int = Field(description="Total number of requests")
    successful_requests: int = Field(description="Number of successful requests")
    blocked_requests: int = Field(description="Number of blocked requests")
    success_rate: float = Field(description="Success rate (0-1)")
    average_response_time_ms: float = Field(description="Average response time in milliseconds")
    top_endpoints: List[tuple[str, int]] = Field(description="Most accessed endpoints with counts")
6662# ===== RBAC Schemas =====
class RoleCreateRequest(BaseModel):
    """Request body for creating a role.

    Attributes:
        name: Unique role name (at most 255 characters)
        description: Optional role description
        scope: Role scope; must be one of global, team or personal
        permissions: Permission strings granted by the role
        inherits_from: Optional ID of the parent role
        is_system_role: Whether this is a system role (defaults to False)

    Examples:
        >>> request = RoleCreateRequest(
        ...     name="team_admin",
        ...     description="Team administrator with member management",
        ...     scope="team",
        ...     permissions=["teams.manage_members", "resources.create"]
        ... )
        >>> request.name
        'team_admin'
    """

    name: str = Field(max_length=255, description="Unique role name")
    description: Optional[str] = Field(default=None, description="Role description")
    # The pattern restricts scope to exactly the three supported values.
    scope: str = Field(pattern="^(global|team|personal)$", description="Role scope")
    permissions: List[str] = Field(description="List of permission strings")
    inherits_from: Optional[str] = Field(default=None, description="Parent role ID for inheritance")
    is_system_role: Optional[bool] = Field(default=False, description="Whether this is a system role")
class RoleUpdateRequest(BaseModel):
    """Request body for updating a role; every field is optional.

    Attributes:
        name: New role name (at most 255 characters)
        description: New role description
        permissions: New list of permission strings
        inherits_from: New parent role ID
        is_active: New active status

    Examples:
        >>> request = RoleUpdateRequest(
        ...     description="Updated role description",
        ...     permissions=["new.permission"]
        ... )
        >>> request.description
        'Updated role description'
    """

    # All fields default to None.
    name: Optional[str] = Field(default=None, max_length=255, description="Role name")
    description: Optional[str] = Field(default=None, description="Role description")
    permissions: Optional[List[str]] = Field(default=None, description="List of permission strings")
    inherits_from: Optional[str] = Field(default=None, description="Parent role ID for inheritance")
    is_active: Optional[bool] = Field(default=None, description="Whether role is active")
class RoleResponse(BaseModel):
    """Response body describing a role.

    The model is configured with ``from_attributes=True`` so it can be
    populated from attribute-bearing objects as well as from mappings.

    Attributes:
        id: Role identifier
        name: Role name
        description: Optional role description
        scope: Role scope
        permissions: Permissions attached directly to the role
        effective_permissions: All permissions including inherited ones
        inherits_from: Parent role ID
        created_by: Creator email
        is_system_role: Whether this is a system role
        is_active: Whether the role is active
        created_at: Creation timestamp
        updated_at: Update timestamp

    Examples:
        >>> role = RoleResponse(
        ...     id="role-123",
        ...     name="admin",
        ...     scope="global",
        ...     permissions=["*"],
        ...     effective_permissions=["*"],
        ...     created_by="admin@example.com",
        ...     is_system_role=True,
        ...     is_active=True,
        ...     created_at=datetime.now(),
        ...     updated_at=datetime.now()
        ... )
        >>> role.name
        'admin'
    """

    model_config = ConfigDict(from_attributes=True)

    id: str = Field(description="Role identifier")
    name: str = Field(description="Role name")
    description: Optional[str] = Field(default=None, description="Role description")
    scope: str = Field(description="Role scope")
    permissions: List[str] = Field(description="Direct permissions")
    effective_permissions: Optional[List[str]] = Field(default=None, description="All permissions including inherited")
    inherits_from: Optional[str] = Field(default=None, description="Parent role ID")
    created_by: str = Field(description="Creator email")
    is_system_role: bool = Field(description="Whether system role")
    is_active: bool = Field(description="Whether role is active")
    created_at: datetime = Field(description="Creation timestamp")
    updated_at: datetime = Field(description="Update timestamp")
class UserRoleAssignRequest(BaseModel):
    """Request body for assigning a role to a user.

    Attributes:
        role_id: ID of the role to assign
        scope: Assignment scope; must be one of global, team or personal
        scope_id: Team ID if the assignment is team-scoped
        expires_at: Optional expiration timestamp

    Examples:
        >>> request = UserRoleAssignRequest(
        ...     role_id="role-123",
        ...     scope="team",
        ...     scope_id="team-456"
        ... )
        >>> request.scope
        'team'
    """

    role_id: str = Field(description="Role ID to assign")
    # The pattern restricts scope to exactly the three supported values.
    scope: str = Field(pattern="^(global|team|personal)$", description="Assignment scope")
    scope_id: Optional[str] = Field(default=None, description="Team ID if team-scoped")
    expires_at: Optional[datetime] = Field(default=None, description="Optional expiration timestamp")
class UserRoleResponse(BaseModel):
    """Response body describing a user's role assignment.

    The model is configured with ``from_attributes=True`` so it can be
    populated from attribute-bearing objects as well as from mappings.

    Attributes:
        id: Assignment identifier
        user_email: User email
        role_id: Role identifier
        role_name: Role name, included for convenience
        scope: Assignment scope
        scope_id: Team ID if applicable
        granted_by: Who granted the role
        granted_at: When the role was granted
        expires_at: Optional expiration
        is_active: Whether the assignment is active
        grant_source: Origin of the grant (e.g., 'sso', 'manual', 'bootstrap', 'auto')

    Examples:
        >>> user_role = UserRoleResponse(
        ...     id="assignment-123",
        ...     user_email="user@example.com",
        ...     role_id="role-456",
        ...     role_name="team_admin",
        ...     scope="team",
        ...     scope_id="team-789",
        ...     granted_by="admin@example.com",
        ...     granted_at=datetime.now(),
        ...     is_active=True
        ... )
        >>> user_role.scope
        'team'
    """

    model_config = ConfigDict(from_attributes=True)

    id: str = Field(description="Assignment identifier")
    user_email: str = Field(description="User email")
    role_id: str = Field(description="Role identifier")
    role_name: Optional[str] = Field(default=None, description="Role name for convenience")
    scope: str = Field(description="Assignment scope")
    scope_id: Optional[str] = Field(default=None, description="Team ID if applicable")
    granted_by: str = Field(description="Who granted the role")
    granted_at: datetime = Field(description="When role was granted")
    expires_at: Optional[datetime] = Field(default=None, description="Optional expiration")
    is_active: bool = Field(description="Whether assignment is active")
    grant_source: Optional[str] = Field(default=None, description="Origin of the grant (e.g., 'sso', 'manual', 'bootstrap', 'auto')")
class PermissionCheckRequest(BaseModel):
    """Request body for checking whether a user holds a permission.

    Attributes:
        user_email: Email of the user to check
        permission: Permission to verify
        resource_type: Optional resource type
        resource_id: Optional resource ID
        team_id: Optional team context

    Examples:
        >>> request = PermissionCheckRequest(
        ...     user_email="user@example.com",
        ...     permission="tools.create",
        ...     resource_type="tools"
        ... )
        >>> request.permission
        'tools.create'
    """

    # Required inputs
    user_email: str = Field(description="User email to check")
    permission: str = Field(description="Permission to verify")

    # Optional context, each defaulting to None
    resource_type: Optional[str] = Field(default=None, description="Resource type")
    resource_id: Optional[str] = Field(default=None, description="Resource ID")
    team_id: Optional[str] = Field(default=None, description="Team context")
class PermissionCheckResponse(BaseModel):
    """Response body carrying the outcome of a permission check.

    Attributes:
        user_email: Email of the user that was checked
        permission: Permission that was checked
        granted: Whether the permission was granted
        checked_at: When the check was performed
        checked_by: Who performed the check

    Examples:
        >>> response = PermissionCheckResponse(
        ...     user_email="user@example.com",
        ...     permission="tools.create",
        ...     granted=True,
        ...     checked_at=datetime.now(),
        ...     checked_by="admin@example.com"
        ... )
        >>> response.granted
        True
    """

    # All fields are required.
    user_email: str = Field(description="User email checked")
    permission: str = Field(description="Permission checked")
    granted: bool = Field(description="Whether permission was granted")
    checked_at: datetime = Field(description="When check was performed")
    checked_by: str = Field(description="Who performed the check")
class PermissionListResponse(BaseModel):
    """Response body listing the available permissions.

    Attributes:
        all_permissions: Flat list of all available permissions
        permissions_by_resource: Permissions grouped by resource type
        total_count: Total number of permissions

    Examples:
        >>> response = PermissionListResponse(
        ...     all_permissions=["users.create", "tools.read"],
        ...     permissions_by_resource={"users": ["users.create"], "tools": ["tools.read"]},
        ...     total_count=2
        ... )
        >>> response.total_count
        2
    """

    # All fields are required.
    all_permissions: List[str] = Field(description="All available permissions")
    permissions_by_resource: Dict[str, List[str]] = Field(description="Permissions by resource type")
    total_count: int = Field(description="Total number of permissions")
6921# ==============================================================================
6922# SSO Authentication Schemas
6923# ==============================================================================
class SSOProviderResponse(BaseModelWithConfigDict):
    """Response body describing an SSO provider.

    Attributes:
        id: Provider identifier (e.g., 'github', 'google')
        name: Provider name
        display_name: Human-readable display name
        provider_type: Provider type (oauth2, oidc); optional
        is_enabled: Whether the provider is enabled; optional
        authorization_url: OAuth authorization URL; optional
        jwks_uri: OIDC JWKS endpoint for token signature verification; optional

    Examples:
        >>> provider = SSOProviderResponse(
        ...     id="github",
        ...     name="github",
        ...     display_name="GitHub",
        ...     provider_type="oauth2",
        ...     is_enabled=True
        ... )
        >>> provider.id
        'github'
    """

    # Required identification fields
    id: str = Field(description="Provider identifier")
    name: str = Field(description="Provider name")
    display_name: str = Field(description="Human-readable display name")

    # Optional details, each defaulting to None
    provider_type: Optional[str] = Field(default=None, description="Provider type (oauth2, oidc)")
    is_enabled: Optional[bool] = Field(default=None, description="Whether provider is enabled")
    authorization_url: Optional[str] = Field(default=None, description="OAuth authorization URL")
    jwks_uri: Optional[str] = Field(default=None, description="OIDC JWKS endpoint for token signature verification")
class SSOLoginResponse(BaseModelWithConfigDict):
    """Response body returned when an SSO login is initiated.

    Attributes:
        authorization_url: OAuth authorization URL to send the user to
        state: CSRF state parameter

    Examples:
        >>> login = SSOLoginResponse(
        ...     authorization_url="https://github.com/login/oauth/authorize?...",
        ...     state="csrf-token-123"
        ... )
        >>> "github.com" in login.authorization_url
        True
    """

    # Both fields are required.
    authorization_url: str = Field(description="OAuth authorization URL")
    state: str = Field(description="CSRF state parameter")
class SSOCallbackResponse(BaseModelWithConfigDict):
    """Response body returned from the SSO authentication callback.

    Attributes:
        access_token: JWT access token
        token_type: Token type (defaults to 'bearer')
        expires_in: Token expiration in seconds
        user: User information as a free-form mapping

    Examples:
        >>> callback = SSOCallbackResponse(
        ...     access_token="jwt.token.here",
        ...     token_type="bearer",
        ...     expires_in=3600,
        ...     user={"email": "user@example.com", "full_name": "User"}
        ... )
        >>> callback.token_type
        'bearer'
    """

    access_token: str = Field(description="JWT access token")
    # Only field with a default; the rest are required.
    token_type: str = Field("bearer", description="Token type")
    expires_in: int = Field(description="Token expiration in seconds")
    user: Dict[str, Any] = Field(description="User information")
7004# gRPC Service schemas
class GrpcServiceCreate(BaseModel):
    """Request body for registering a new gRPC service.

    Attributes:
        name: Unique name for the gRPC service (1-255 characters)
        target: gRPC server target address (host:port)
        description: Optional description of the gRPC service
        reflection_enabled: Enable gRPC server reflection (defaults to True)
        tls_enabled: Enable TLS for the gRPC connection (defaults to False)
        tls_cert_path: Optional path to the TLS certificate file
        tls_key_path: Optional path to the TLS key file
        grpc_metadata: gRPC metadata headers (defaults to an empty dict)
        tags: Tags for categorization (defaults to an empty list)
        team_id: Optional ID of the team that owns this resource
        owner_email: Optional email of the user who owns this resource
        visibility: Visibility level: private, team, or public (defaults to public)
    """

    name: str = Field(min_length=1, max_length=255, description="Unique name for the gRPC service")
    target: str = Field(description="gRPC server target address (host:port)")
    description: Optional[str] = Field(default=None, description="Description of the gRPC service")
    reflection_enabled: bool = Field(True, description="Enable gRPC server reflection")
    tls_enabled: bool = Field(False, description="Enable TLS for gRPC connection")
    tls_cert_path: Optional[str] = Field(default=None, description="Path to TLS certificate file")
    tls_key_path: Optional[str] = Field(default=None, description="Path to TLS key file")
    grpc_metadata: Dict[str, str] = Field(default_factory=dict, description="gRPC metadata headers")
    tags: List[str] = Field(default_factory=list, description="Tags for categorization")

    # Ownership and visibility
    team_id: Optional[str] = Field(default=None, description="ID of the team that owns this resource")
    owner_email: Optional[str] = Field(default=None, description="Email of the user who owns this resource")
    visibility: Literal["private", "team", "public"] = Field("public", description="Visibility level: private, team, or public")

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Check the service name via ``SecurityValidator.validate_name``.

        Args:
            v: Service name to validate

        Returns:
            The value returned by the security validator
        """
        checked_name = SecurityValidator.validate_name(v, "gRPC service name")
        return checked_name

    @field_validator("target")
    @classmethod
    def validate_target(cls, v: str) -> str:
        """Require the target address to be non-empty and contain a colon.

        Args:
            v: Target address to validate

        Returns:
            The target address, unchanged

        Raises:
            ValueError: If target is not in host:port format
        """
        if v and ":" in v:
            return v
        raise ValueError("Target must be in host:port format")

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Truncate an over-long description and sanitize it for display.

        Args:
            v: Description to validate

        Returns:
            None when no description was given, otherwise the (possibly
            truncated) text passed through
            ``SecurityValidator.sanitize_display_text``
        """
        if v is None:
            return None
        text = v
        if len(text) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
            # Over-long input is cut down and logged rather than rejected.
            text = text[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
            logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
        return SecurityValidator.sanitize_display_text(text, "Description")
class GrpcServiceUpdate(BaseModel):
    """Request body for updating a gRPC service; every field is optional.

    Attributes:
        name: Service name (1-255 characters)
        target: gRPC server target address
        description: Service description
        reflection_enabled: Enable server reflection
        tls_enabled: Enable TLS
        tls_cert_path: TLS certificate path
        tls_key_path: TLS key path
        grpc_metadata: gRPC metadata headers
        tags: Service tags
        visibility: Visibility level: private, team, or public
    """

    # All fields default to None.
    name: Optional[str] = Field(default=None, min_length=1, max_length=255, description="Service name")
    target: Optional[str] = Field(default=None, description="gRPC server target address")
    description: Optional[str] = Field(default=None, description="Service description")
    reflection_enabled: Optional[bool] = Field(default=None, description="Enable server reflection")
    tls_enabled: Optional[bool] = Field(default=None, description="Enable TLS")
    tls_cert_path: Optional[str] = Field(default=None, description="TLS certificate path")
    tls_key_path: Optional[str] = Field(default=None, description="TLS key path")
    grpc_metadata: Optional[Dict[str, str]] = Field(default=None, description="gRPC metadata headers")
    tags: Optional[List[str]] = Field(default=None, description="Service tags")
    visibility: Optional[Literal["private", "team", "public"]] = Field(default=None, description="Visibility level: private, team, or public")

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: Optional[str]) -> Optional[str]:
        """Check the service name via ``SecurityValidator.validate_name`` when given.

        Args:
            v: Service name to validate

        Returns:
            None when no name was given, otherwise the value returned by
            the security validator
        """
        return None if v is None else SecurityValidator.validate_name(v, "gRPC service name")

    @field_validator("target")
    @classmethod
    def validate_target(cls, v: Optional[str]) -> Optional[str]:
        """Require a supplied target address to contain a colon.

        Args:
            v: Target address to validate

        Returns:
            The target address unchanged, or None when none was given

        Raises:
            ValueError: If target is not in host:port format
        """
        if v is not None and ":" not in v:
            raise ValueError("Target must be in host:port format")
        return v

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Truncate an over-long description and sanitize it for display.

        Args:
            v: Description to validate

        Returns:
            None when no description was given, otherwise the (possibly
            truncated) text passed through
            ``SecurityValidator.sanitize_display_text``
        """
        if v is None:
            return None
        text = v
        if len(text) > SecurityValidator.MAX_DESCRIPTION_LENGTH:
            # Over-long input is cut down and logged rather than rejected.
            text = text[: SecurityValidator.MAX_DESCRIPTION_LENGTH]
            logger.info(f"Description too long, truncated to {SecurityValidator.MAX_DESCRIPTION_LENGTH} characters.")
        return SecurityValidator.sanitize_display_text(text, "Description")
class GrpcServiceRead(BaseModel):
    """Response body describing a registered gRPC service.

    The model is configured with ``from_attributes=True`` so it can be
    populated from attribute-bearing objects as well as from mappings.

    Attributes:
        id: Unique service identifier
        name: Service name
        slug: URL-safe slug
        target: gRPC server target (host:port)
        description: Optional service description
        reflection_enabled: Whether reflection is enabled
        tls_enabled: Whether TLS is enabled
        tls_cert_path: Optional TLS certificate path
        tls_key_path: Optional TLS key path
        grpc_metadata: gRPC metadata (defaults to an empty dict)
        enabled: Whether the service is enabled
        reachable: Whether the service is reachable
        service_count: Number of gRPC services discovered (defaults to 0)
        method_count: Number of methods discovered (defaults to 0)
        discovered_services: Discovered service descriptors (defaults to an empty dict)
        last_reflection: Optional timestamp of the last reflection
        tags: Service tags (defaults to an empty list)
        created_at: Creation timestamp
        updated_at: Last update timestamp
        team_id: Optional team ID
        team: Optional name of the team that owns this resource
        owner_email: Optional owner email
        visibility: Visibility level: private, team, or public (defaults to public)
    """

    model_config = ConfigDict(from_attributes=True)

    # Identity
    id: str = Field(description="Unique service identifier")
    name: str = Field(description="Service name")
    slug: str = Field(description="URL-safe slug")
    target: str = Field(description="gRPC server target (host:port)")
    description: Optional[str] = Field(default=None, description="Service description")

    # Connection settings
    reflection_enabled: bool = Field(description="Reflection enabled")
    tls_enabled: bool = Field(description="TLS enabled")
    tls_cert_path: Optional[str] = Field(default=None, description="TLS certificate path")
    tls_key_path: Optional[str] = Field(default=None, description="TLS key path")
    grpc_metadata: Dict[str, str] = Field(default_factory=dict, description="gRPC metadata")

    # Runtime state
    enabled: bool = Field(description="Service enabled")
    reachable: bool = Field(description="Service reachable")

    # Reflection results
    service_count: int = Field(0, description="Number of gRPC services discovered")
    method_count: int = Field(0, description="Number of methods discovered")
    discovered_services: Dict[str, Any] = Field(default_factory=dict, description="Discovered service descriptors")
    last_reflection: Optional[datetime] = Field(default=None, description="Last reflection timestamp")

    # Categorization
    tags: List[str] = Field(default_factory=list, description="Service tags")

    # Audit timestamps
    created_at: datetime = Field(description="Creation timestamp")
    updated_at: datetime = Field(description="Last update timestamp")

    # Ownership and visibility
    team_id: Optional[str] = Field(default=None, description="Team ID")
    team: Optional[str] = Field(default=None, description="Name of the team that owns this resource")
    owner_email: Optional[str] = Field(default=None, description="Owner email")
    visibility: Literal["private", "team", "public"] = Field("public", description="Visibility level: private, team, or public")

    # Runs before field validation and hands the raw visibility value to the
    # module-level _coerce_visibility helper (defined elsewhere in this file).
    _normalize_visibility = field_validator("visibility", mode="before")(classmethod(lambda cls, v: _coerce_visibility(v)))
7189# Plugin-related schemas
class PluginSummary(BaseModel):
    """Condensed plugin information used in list views.

    Attributes:
        name: Unique plugin name
        description: Plugin description (defaults to an empty string)
        author: Plugin author (defaults to "Unknown")
        version: Plugin version (defaults to "0.0.0")
        mode: Plugin mode: enforce, permissive, or disabled
        priority: Plugin execution priority (lower = higher priority)
        hooks: Hook points where the plugin executes
        tags: Plugin tags for categorization
        status: Plugin status: enabled or disabled
        config_summary: Summary of plugin configuration
    """

    name: str = Field(description="Unique plugin name")
    description: str = Field(default="", description="Plugin description")
    author: str = Field(default="Unknown", description="Plugin author")
    version: str = Field(default="0.0.0", description="Plugin version")
    mode: str = Field(description="Plugin mode: enforce, permissive, or disabled")
    priority: int = Field(description="Plugin execution priority (lower = higher priority)")
    hooks: List[str] = Field(default_factory=list, description="Hook points where plugin executes")
    tags: List[str] = Field(default_factory=list, description="Plugin tags for categorization")
    status: str = Field(description="Plugin status: enabled or disabled")
    config_summary: Dict[str, Any] = Field(default_factory=dict, description="Summary of plugin configuration")
class PluginDetail(PluginSummary):
    """Full plugin information: the summary fields plus complete configuration.

    Attributes:
        kind: Plugin type or class (defaults to an empty string)
        namespace: Optional plugin namespace
        conditions: Conditions for plugin execution
        config: Full plugin configuration
        manifest: Optional plugin manifest information
    """

    kind: str = Field(default="", description="Plugin type or class")
    namespace: Optional[str] = Field(default=None, description="Plugin namespace")
    conditions: List[Any] = Field(default_factory=list, description="Conditions for plugin execution")
    config: Dict[str, Any] = Field(default_factory=dict, description="Full plugin configuration")
    manifest: Optional[Dict[str, Any]] = Field(default=None, description="Plugin manifest information")
class PluginListResponse(BaseModel):
    """Response body for the plugin list endpoint.

    Attributes:
        plugins: List of plugin summaries
        total: Total number of plugins
        enabled_count: Number of enabled plugins (defaults to 0)
        disabled_count: Number of disabled plugins (defaults to 0)
    """

    plugins: List[PluginSummary] = Field(description="List of plugins")
    total: int = Field(description="Total number of plugins")
    enabled_count: int = Field(default=0, description="Number of enabled plugins")
    disabled_count: int = Field(default=0, description="Number of disabled plugins")
class PluginStatsResponse(BaseModel):
    """Response body for the plugin statistics endpoint.

    Attributes:
        total_plugins: Total number of plugins
        enabled_plugins: Number of enabled plugins
        disabled_plugins: Number of disabled plugins
        plugins_by_hook: Plugin count keyed by hook type
        plugins_by_mode: Plugin count keyed by mode
    """

    # Required totals
    total_plugins: int = Field(description="Total number of plugins")
    enabled_plugins: int = Field(description="Number of enabled plugins")
    disabled_plugins: int = Field(description="Number of disabled plugins")

    # Breakdowns, each defaulting to an empty dict
    plugins_by_hook: Dict[str, int] = Field(default_factory=dict, description="Plugin count by hook type")
    plugins_by_mode: Dict[str, int] = Field(default_factory=dict, description="Plugin count by mode")
7236# MCP Server Catalog Schemas
class CatalogServer(BaseModel):
    """A single entry in the MCP server catalog.

    Attributes:
        id: Unique identifier for the catalog server
        name: Display name of the server
        category: Server category (e.g., Project Management, Software Development)
        url: Server endpoint URL
        auth_type: Authentication type (e.g., OAuth2.1, API Key, Open)
        provider: Provider/vendor name
        description: Server description
        requires_api_key: Whether an API key is required (defaults to False)
        secure: Whether additional security is required (defaults to False)
        tags: Tags for categorization
        transport: Optional transport type: SSE, STREAMABLEHTTP, or WEBSOCKET
        logo_url: Optional URL to the server logo/icon
        documentation_url: Optional URL to the server documentation
        is_registered: Whether the server is already registered (defaults to False)
        is_available: Whether the server is currently available (defaults to True)
        requires_oauth_config: Whether the server is registered but needs OAuth
            configuration (defaults to False)
    """

    # Required descriptive fields
    id: str = Field(description="Unique identifier for the catalog server")
    name: str = Field(description="Display name of the server")
    category: str = Field(description="Server category (e.g., Project Management, Software Development)")
    url: str = Field(description="Server endpoint URL")
    auth_type: str = Field(description="Authentication type (e.g., OAuth2.1, API Key, Open)")
    provider: str = Field(description="Provider/vendor name")
    description: str = Field(description="Server description")

    # Optional details and status flags
    requires_api_key: bool = Field(False, description="Whether API key is required")
    secure: bool = Field(False, description="Whether additional security is required")
    tags: List[str] = Field(default_factory=list, description="Tags for categorization")
    transport: Optional[str] = Field(default=None, description="Transport type: SSE, STREAMABLEHTTP, or WEBSOCKET")
    logo_url: Optional[str] = Field(default=None, description="URL to server logo/icon")
    documentation_url: Optional[str] = Field(default=None, description="URL to server documentation")
    is_registered: bool = Field(False, description="Whether server is already registered")
    is_available: bool = Field(True, description="Whether server is currently available")
    requires_oauth_config: bool = Field(False, description="Whether server is registered but needs OAuth configuration")
class CatalogServerRegisterRequest(BaseModel):
    """Request body for registering a server from the catalog.

    Attributes:
        server_id: Catalog server ID to register
        name: Optional custom name for the server
        api_key: API key, if required
        oauth_credentials: OAuth credentials, if required
    """

    server_id: str = Field(description="Catalog server ID to register")
    name: Optional[str] = Field(default=None, description="Optional custom name for the server")
    api_key: Optional[str] = Field(default=None, description="API key if required")
    oauth_credentials: Optional[Dict[str, Any]] = Field(default=None, description="OAuth credentials if required")
class CatalogServerRegisterResponse(BaseModel):
    """Response body returned after a catalog server registration attempt.

    Attributes:
        success: Whether registration was successful
        server_id: ID of the registered server in the system
        message: Status message
        error: Error message if registration failed
        oauth_required: Whether OAuth configuration is required before
            activation (defaults to False)
    """

    success: bool = Field(description="Whether registration was successful")
    server_id: str = Field(description="ID of the registered server in the system")
    message: str = Field(description="Status message")
    error: Optional[str] = Field(default=None, description="Error message if registration failed")
    oauth_required: bool = Field(default=False, description="Whether OAuth configuration is required before activation")
class CatalogServerStatusRequest(BaseModel):
    """Request body for checking the status of a catalog server.

    Attributes:
        server_id: Catalog server ID to check
    """

    server_id: str = Field(description="Catalog server ID to check")
class CatalogServerStatusResponse(BaseModel):
    """Response body for a catalog server status check.

    Attributes:
        server_id: Catalog server ID
        is_available: Whether the server is reachable
        is_registered: Whether the server is registered
        last_checked: Optional timestamp of the last health check
        response_time_ms: Optional response time in milliseconds
        error: Error message if the check failed
    """

    # Required status fields
    server_id: str = Field(description="Catalog server ID")
    is_available: bool = Field(description="Whether server is reachable")
    is_registered: bool = Field(description="Whether server is registered")

    # Optional check details, each defaulting to None
    last_checked: Optional[datetime] = Field(default=None, description="Last health check timestamp")
    response_time_ms: Optional[float] = Field(default=None, description="Response time in milliseconds")
    error: Optional[str] = Field(default=None, description="Error message if check failed")
class CatalogListRequest(BaseModel):
    """Request body for listing catalog servers, with filters and paging.

    Attributes:
        category: Filter by category
        auth_type: Filter by auth type
        provider: Filter by provider
        search: Search term for name/description
        tags: Filter by tags
        show_registered_only: Show only registered servers (defaults to False)
        show_available_only: Show only available servers (defaults to True)
        limit: Maximum number of results (defaults to 100)
        offset: Offset for pagination (defaults to 0)
    """

    # Filters, each defaulting to None
    category: Optional[str] = Field(default=None, description="Filter by category")
    auth_type: Optional[str] = Field(default=None, description="Filter by auth type")
    provider: Optional[str] = Field(default=None, description="Filter by provider")
    search: Optional[str] = Field(default=None, description="Search term for name/description")
    tags: Optional[List[str]] = Field(default=None, description="Filter by tags")

    # Toggles
    show_registered_only: bool = Field(False, description="Show only registered servers")
    show_available_only: bool = Field(True, description="Show only available servers")

    # Paging
    limit: int = Field(100, description="Maximum number of results")
    offset: int = Field(0, description="Offset for pagination")
class CatalogListResponse(BaseModel):
    """Response body carrying catalog servers and the available filter values.

    Attributes:
        servers: List of catalog servers
        total: Total number of matching servers
        categories: Available categories
        auth_types: Available auth types
        providers: Available providers
        all_tags: All available tags (defaults to an empty list)
    """

    servers: List[CatalogServer] = Field(description="List of catalog servers")
    total: int = Field(description="Total number of matching servers")
    categories: List[str] = Field(description="Available categories")
    auth_types: List[str] = Field(description="Available auth types")
    providers: List[str] = Field(description="Available providers")
    all_tags: List[str] = Field(default_factory=list, description="All available tags")
class CatalogBulkRegisterRequest(BaseModel):
    """Request body for registering several catalog servers at once.

    Attributes:
        server_ids: List of catalog server IDs to register
        skip_errors: Continue on error (defaults to True)
    """

    server_ids: List[str] = Field(description="List of catalog server IDs to register")
    skip_errors: bool = Field(True, description="Continue on error")
class CatalogBulkRegisterResponse(BaseModel):
    """Response body summarizing a bulk registration.

    Attributes:
        successful: Successfully registered server IDs
        failed: Failed registrations with error messages
        total_attempted: Total servers attempted
        total_successful: Total successful registrations
    """

    # All fields are required.
    successful: List[str] = Field(description="Successfully registered server IDs")
    failed: List[Dict[str, str]] = Field(description="Failed registrations with error messages")
    total_attempted: int = Field(description="Total servers attempted")
    total_successful: int = Field(description="Total successful registrations")
7337# ===================================
7338# Pagination Schemas
7339# ===================================
class PaginationMeta(BaseModel):
    """Metadata describing the position of a page within a result set.

    Attributes:
        page: Current page number (1-indexed, at least 1)
        per_page: Items per page (at least 1)
        total_items: Total number of items (non-negative)
        total_pages: Total number of pages (non-negative)
        has_next: Whether there is a next page
        has_prev: Whether there is a previous page
        page_items: Actual number of items on the current page (after
            conversion failures); optional, non-negative
        next_cursor: Cursor for next page (cursor-based only)
        prev_cursor: Cursor for previous page (cursor-based only)

    Examples:
        >>> meta = PaginationMeta(
        ...     page=2,
        ...     per_page=50,
        ...     total_items=250,
        ...     total_pages=5,
        ...     has_next=True,
        ...     has_prev=True
        ... )
        >>> meta.page
        2
        >>> meta.total_pages
        5
    """

    # Required counters, each with a lower bound
    page: int = Field(ge=1, description="Current page number (1-indexed)")
    per_page: int = Field(ge=1, description="Items per page")
    total_items: int = Field(ge=0, description="Total number of items")
    total_pages: int = Field(ge=0, description="Total number of pages")

    # Required navigation flags
    has_next: bool = Field(description="Whether there is a next page")
    has_prev: bool = Field(description="Whether there is a previous page")

    # Optional extras, each defaulting to None
    page_items: Optional[int] = Field(default=None, ge=0, description="Actual number of items on current page (after conversion failures)")
    next_cursor: Optional[str] = Field(default=None, description="Cursor for next page (cursor-based only)")
    prev_cursor: Optional[str] = Field(default=None, description="Cursor for previous page (cursor-based only)")
class PaginationLinks(BaseModel):
    """Navigation URLs that accompany a paginated response.

    Attributes:
        self: Current page URL
        first: First page URL
        last: Last page URL
        next: Next page URL (None if no next page)
        prev: Previous page URL (None if no previous page)

    Examples:
        >>> links = PaginationLinks(
        ...     self="/admin/tools?page=2&per_page=50",
        ...     first="/admin/tools?page=1&per_page=50",
        ...     last="/admin/tools?page=5&per_page=50",
        ...     next="/admin/tools?page=3&per_page=50",
        ...     prev="/admin/tools?page=1&per_page=50"
        ... )
        >>> links.self
        '/admin/tools?page=2&per_page=50'
    """

    # Always present
    self: str = Field(description="Current page URL")
    first: str = Field(description="First page URL")
    last: str = Field(description="Last page URL")

    # Default to None
    next: Optional[str] = Field(default=None, description="Next page URL")
    prev: Optional[str] = Field(default=None, description="Previous page URL")
class PaginatedResponse(BaseModel):
    """Generic wrapper for one page of results.

    Holds the page's items in ``data`` alongside pagination metadata and
    optional navigation links.

    Attributes:
        data: List of items for the current page
        pagination: Pagination metadata (counts, page info)
        links: Navigation links (optional)

    Examples:
        >>> response = PaginatedResponse(
        ...     data=[],
        ...     pagination=PaginationMeta(
        ...         page=1, per_page=50, total_items=0,
        ...         total_pages=0, has_next=False, has_prev=False
        ...     ),
        ...     links=None
        ... )
        >>> response.pagination.page
        1
    """

    data: List[Any] = Field(description="List of items")
    pagination: PaginationMeta = Field(description="Pagination metadata")
    links: Optional[PaginationLinks] = Field(default=None, description="Navigation links")
7440# ============================================================================
7441# Cursor Pagination Response Schemas (for main API endpoints)
7442# ============================================================================
class CursorPaginatedToolsResponse(BaseModel):
    """Cursor-paginated response for tools list endpoint."""

    # next_cursor is published under the alias "nextCursor". With an alias and no
    # populate_by_name, pydantic accepts only the alias on input, so constructing
    # the model with next_cursor="..." would silently drop the value (unknown keys
    # are ignored by default) and leave the cursor as None. Accepting both the
    # alias and the field name removes that trap; alias-based callers are unaffected.
    model_config = ConfigDict(populate_by_name=True)

    tools: List["ToolRead"] = Field(..., description="List of tools for this page")
    # None means there is no further page.
    next_cursor: Optional[str] = Field(None, alias="nextCursor", description="Cursor for the next page, null if no more pages")
class CursorPaginatedServersResponse(BaseModel):
    """Cursor-paginated response for servers list endpoint."""

    # next_cursor is published under the alias "nextCursor". With an alias and no
    # populate_by_name, pydantic accepts only the alias on input, so constructing
    # the model with next_cursor="..." would silently drop the value (unknown keys
    # are ignored by default) and leave the cursor as None. Accepting both the
    # alias and the field name removes that trap; alias-based callers are unaffected.
    model_config = ConfigDict(populate_by_name=True)

    servers: List["ServerRead"] = Field(..., description="List of servers for this page")
    # None means there is no further page.
    next_cursor: Optional[str] = Field(None, alias="nextCursor", description="Cursor for the next page, null if no more pages")
class CursorPaginatedGatewaysResponse(BaseModel):
    """Cursor-paginated response for gateways list endpoint."""

    # next_cursor is published under the alias "nextCursor". With an alias and no
    # populate_by_name, pydantic accepts only the alias on input, so constructing
    # the model with next_cursor="..." would silently drop the value (unknown keys
    # are ignored by default) and leave the cursor as None. Accepting both the
    # alias and the field name removes that trap; alias-based callers are unaffected.
    model_config = ConfigDict(populate_by_name=True)

    gateways: List["GatewayRead"] = Field(..., description="List of gateways for this page")
    # None means there is no further page.
    next_cursor: Optional[str] = Field(None, alias="nextCursor", description="Cursor for the next page, null if no more pages")
class CursorPaginatedResourcesResponse(BaseModel):
    """Cursor-paginated response for resources list endpoint."""

    # next_cursor is published under the alias "nextCursor". With an alias and no
    # populate_by_name, pydantic accepts only the alias on input, so constructing
    # the model with next_cursor="..." would silently drop the value (unknown keys
    # are ignored by default) and leave the cursor as None. Accepting both the
    # alias and the field name removes that trap; alias-based callers are unaffected.
    model_config = ConfigDict(populate_by_name=True)

    resources: List["ResourceRead"] = Field(..., description="List of resources for this page")
    # None means there is no further page.
    next_cursor: Optional[str] = Field(None, alias="nextCursor", description="Cursor for the next page, null if no more pages")
class CursorPaginatedPromptsResponse(BaseModel):
    """Cursor-paginated response for prompts list endpoint."""

    # next_cursor is published under the alias "nextCursor". With an alias and no
    # populate_by_name, pydantic accepts only the alias on input, so constructing
    # the model with next_cursor="..." would silently drop the value (unknown keys
    # are ignored by default) and leave the cursor as None. Accepting both the
    # alias and the field name removes that trap; alias-based callers are unaffected.
    model_config = ConfigDict(populate_by_name=True)

    prompts: List["PromptRead"] = Field(..., description="List of prompts for this page")
    # None means there is no further page.
    next_cursor: Optional[str] = Field(None, alias="nextCursor", description="Cursor for the next page, null if no more pages")
class CursorPaginatedA2AAgentsResponse(BaseModel):
    """Cursor-paginated response for A2A agents list endpoint."""

    # next_cursor is published under the alias "nextCursor". With an alias and no
    # populate_by_name, pydantic accepts only the alias on input, so constructing
    # the model with next_cursor="..." would silently drop the value (unknown keys
    # are ignored by default) and leave the cursor as None. Accepting both the
    # alias and the field name removes that trap; alias-based callers are unaffected.
    model_config = ConfigDict(populate_by_name=True)

    agents: List["A2AAgentRead"] = Field(..., description="List of A2A agents for this page")
    # None means there is no further page.
    next_cursor: Optional[str] = Field(None, alias="nextCursor", description="Cursor for the next page, null if no more pages")
class CursorPaginatedTeamsResponse(BaseModel):
    """Cursor-paginated response for teams list endpoint."""

    # next_cursor is published under the alias "nextCursor". With an alias and no
    # populate_by_name, pydantic accepts only the alias on input, so constructing
    # the model with next_cursor="..." would silently drop the value (unknown keys
    # are ignored by default) and leave the cursor as None. Accepting both the
    # alias and the field name removes that trap; alias-based callers are unaffected.
    model_config = ConfigDict(populate_by_name=True)

    teams: List["TeamResponse"] = Field(..., description="List of teams for this page")
    # None means there is no further page.
    next_cursor: Optional[str] = Field(None, alias="nextCursor", description="Cursor for the next page, null if no more pages")
class CursorPaginatedUsersResponse(BaseModel):
    """Cursor-paginated response for users list endpoint."""

    # next_cursor is published under the alias "nextCursor". With an alias and no
    # populate_by_name, pydantic accepts only the alias on input, so constructing
    # the model with next_cursor="..." would silently drop the value (unknown keys
    # are ignored by default) and leave the cursor as None. Accepting both the
    # alias and the field name removes that trap; alias-based callers are unaffected.
    model_config = ConfigDict(populate_by_name=True)

    users: List["EmailUserResponse"] = Field(..., description="List of users for this page")
    # None means there is no further page.
    next_cursor: Optional[str] = Field(None, alias="nextCursor", description="Cursor for the next page, null if no more pages")
7501# ============================================================================
7502# Observability Schemas (OpenTelemetry-style traces, spans, events, metrics)
7503# ============================================================================
class ObservabilityTraceBase(BaseModel):
    """Base schema for observability traces."""

    # Identity and timing: only name and start_time are required.
    name: str = Field(description="Trace name (e.g., 'POST /tools/invoke')")
    start_time: datetime = Field(description="Trace start timestamp")
    end_time: Optional[datetime] = Field(default=None, description="Trace end timestamp")
    duration_ms: Optional[float] = Field(default=None, description="Total duration in milliseconds")

    # Outcome: status is a plain string (not an enum) defaulting to "unset".
    status: str = Field(default="unset", description="Trace status (unset, ok, error)")
    status_message: Optional[str] = Field(default=None, description="Status message or error description")

    # HTTP request context, all optional.
    http_method: Optional[str] = Field(default=None, description="HTTP method")
    http_url: Optional[str] = Field(default=None, description="HTTP URL")
    http_status_code: Optional[int] = Field(default=None, description="HTTP status code")

    # Caller context, all optional.
    user_email: Optional[str] = Field(default=None, description="User email")
    user_agent: Optional[str] = Field(default=None, description="User agent string")
    ip_address: Optional[str] = Field(default=None, description="Client IP address")

    # Free-form attribute bags; each instance gets its own empty dict when omitted.
    attributes: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional trace attributes")
    resource_attributes: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Resource attributes")
class ObservabilityTraceCreate(ObservabilityTraceBase):
    """Schema for creating an observability trace."""

    # Optional on create; this schema itself does not generate an ID.
    trace_id: Optional[str] = Field(default=None, description="Trace ID (generated if not provided)")
class ObservabilityTraceUpdate(BaseModel):
    """Schema for updating an observability trace."""

    # Every field is optional and defaults to None, so a partial payload validates.
    end_time: Optional[datetime] = Field(default=None)
    duration_ms: Optional[float] = Field(default=None)
    status: Optional[str] = Field(default=None)
    status_message: Optional[str] = Field(default=None)
    http_status_code: Optional[int] = Field(default=None)
    attributes: Optional[Dict[str, Any]] = Field(default=None)
class ObservabilityTraceRead(ObservabilityTraceBase):
    """Schema for reading an observability trace."""

    # Unlike the create schema, trace_id is mandatory when reading.
    trace_id: str = Field(description="Trace ID")
    created_at: datetime = Field(description="Creation timestamp")

    # from_attributes allows validation from objects exposing these names as attributes.
    model_config = ConfigDict(from_attributes=True)
class ObservabilitySpanBase(BaseModel):
    """Base schema for observability spans."""

    # Position in the trace tree: trace_id is required, parent_span_id is optional.
    trace_id: str = Field(description="Parent trace ID")
    parent_span_id: Optional[str] = Field(default=None, description="Parent span ID (for nested spans)")

    # Identity and kind; kind is a plain string defaulting to "internal".
    name: str = Field(description="Span name (e.g., 'database_query', 'tool_invocation')")
    kind: str = Field(default="internal", description="Span kind (internal, server, client, producer, consumer)")

    # Timing.
    start_time: datetime = Field(description="Span start timestamp")
    end_time: Optional[datetime] = Field(default=None, description="Span end timestamp")
    duration_ms: Optional[float] = Field(default=None, description="Span duration in milliseconds")

    # Outcome; status is a plain string defaulting to "unset".
    status: str = Field(default="unset", description="Span status (unset, ok, error)")
    status_message: Optional[str] = Field(default=None, description="Status message")

    # Free-form attributes; a fresh empty dict per instance when omitted.
    attributes: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Span attributes")

    # Optional reference to the resource the span relates to.
    resource_name: Optional[str] = Field(default=None, description="Resource name")
    resource_type: Optional[str] = Field(default=None, description="Resource type (tool, resource, prompt, gateway, a2a_agent)")
    resource_id: Optional[str] = Field(default=None, description="Resource ID")
class ObservabilitySpanCreate(ObservabilitySpanBase):
    """Schema for creating an observability span."""

    # Optional on create; this schema itself does not generate an ID.
    span_id: Optional[str] = Field(default=None, description="Span ID (generated if not provided)")
class ObservabilitySpanUpdate(BaseModel):
    """Schema for updating an observability span."""

    # Every field is optional and defaults to None, so a partial payload validates.
    end_time: Optional[datetime] = Field(default=None)
    duration_ms: Optional[float] = Field(default=None)
    status: Optional[str] = Field(default=None)
    status_message: Optional[str] = Field(default=None)
    attributes: Optional[Dict[str, Any]] = Field(default=None)
class ObservabilitySpanRead(ObservabilitySpanBase):
    """Schema for reading an observability span."""

    # Unlike the create schema, span_id is mandatory when reading.
    span_id: str = Field(description="Span ID")
    created_at: datetime = Field(description="Creation timestamp")

    # from_attributes allows validation from objects exposing these names as attributes.
    model_config = ConfigDict(from_attributes=True)
class ObservabilityEventBase(BaseModel):
    """Base schema for observability events."""

    # Required: the owning span, the event name and when it happened.
    span_id: str = Field(description="Parent span ID")
    name: str = Field(description="Event name (e.g., 'exception', 'log', 'checkpoint')")
    timestamp: datetime = Field(description="Event timestamp")

    # Free-form attributes; a fresh empty dict per instance when omitted.
    attributes: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Event attributes")

    # Log-style payload; severity is an unconstrained string.
    severity: Optional[str] = Field(default=None, description="Log severity (debug, info, warning, error, critical)")
    message: Optional[str] = Field(default=None, description="Event message")

    # Exception details, all optional.
    exception_type: Optional[str] = Field(default=None, description="Exception class name")
    exception_message: Optional[str] = Field(default=None, description="Exception message")
    exception_stacktrace: Optional[str] = Field(default=None, description="Exception stacktrace")
# Adds no fields of its own: the create payload is exactly ObservabilityEventBase.
class ObservabilityEventCreate(ObservabilityEventBase):
    """Schema for creating an observability event."""
class ObservabilityEventRead(ObservabilityEventBase):
    """Schema for reading an observability event."""

    # Both fields are required on read.
    id: int = Field(description="Event ID")
    created_at: datetime = Field(description="Creation timestamp")

    # from_attributes allows validation from objects exposing these names as attributes.
    model_config = ConfigDict(from_attributes=True)
class ObservabilityMetricBase(BaseModel):
    """Base schema for observability metrics."""

    # Required sample data; metric_type is an unconstrained string.
    name: str = Field(description="Metric name (e.g., 'http.request.duration', 'tool.invocation.count')")
    metric_type: str = Field(description="Metric type (counter, gauge, histogram)")
    value: float = Field(description="Metric value")
    timestamp: datetime = Field(description="Metric timestamp")

    # Optional descriptors.
    unit: Optional[str] = Field(default=None, description="Metric unit (ms, count, bytes, etc.)")
    # A fresh empty dict per instance when omitted.
    attributes: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Metric attributes/labels")

    # Optional links to a resource and/or a trace.
    resource_type: Optional[str] = Field(default=None, description="Resource type")
    resource_id: Optional[str] = Field(default=None, description="Resource ID")
    trace_id: Optional[str] = Field(default=None, description="Associated trace ID")
# Adds no fields of its own: the create payload is exactly ObservabilityMetricBase.
class ObservabilityMetricCreate(ObservabilityMetricBase):
    """Schema for creating an observability metric."""
class ObservabilityMetricRead(ObservabilityMetricBase):
    """Schema for reading an observability metric."""

    # Both fields are required on read.
    id: int = Field(description="Metric ID")
    created_at: datetime = Field(description="Creation timestamp")

    # from_attributes allows validation from objects exposing these names as attributes.
    model_config = ConfigDict(from_attributes=True)
class ObservabilityTraceWithSpans(ObservabilityTraceRead):
    """Schema for reading a trace with its spans."""

    # Extends the trace read schema with its spans; defaults to a new empty
    # list per instance when none are supplied.
    spans: List[ObservabilitySpanRead] = Field(default_factory=list, description="List of spans in this trace")
class ObservabilitySpanWithEvents(ObservabilitySpanRead):
    """Schema for reading a span with its events."""

    # Extends the span read schema with its events; defaults to a new empty
    # list per instance when none are supplied.
    events: List[ObservabilityEventRead] = Field(default_factory=list, description="List of events in this span")
7660# --- Performance Monitoring Schemas ---
class WorkerMetrics(BaseModel):
    """Metrics for a single worker process."""

    # Required process measurements.
    pid: int = Field(description="Process ID")
    cpu_percent: float = Field(description="CPU utilization percentage")
    memory_rss_mb: float = Field(description="Resident Set Size memory in MB")
    memory_vms_mb: float = Field(description="Virtual Memory Size in MB")
    threads: int = Field(description="Number of threads")

    # Fields below carry defaults and may be omitted.
    connections: int = Field(default=0, description="Number of network connections")
    open_fds: Optional[int] = Field(default=None, description="Number of open file descriptors")
    status: str = Field(default="running", description="Worker status")
    create_time: Optional[datetime] = Field(default=None, description="Worker start time")
    uptime_seconds: Optional[int] = Field(default=None, description="Worker uptime in seconds")
class SystemMetricsSchema(BaseModel):
    """System-wide resource metrics."""

    # CPU metrics (utilization and core count required; frequency and load averages optional)
    cpu_percent: float = Field(description="Total CPU utilization percentage")
    cpu_count: int = Field(description="Number of logical CPU cores")
    cpu_freq_mhz: Optional[float] = Field(default=None, description="Current CPU frequency in MHz")
    load_avg_1m: Optional[float] = Field(default=None, description="1-minute load average")
    load_avg_5m: Optional[float] = Field(default=None, description="5-minute load average")
    load_avg_15m: Optional[float] = Field(default=None, description="15-minute load average")

    # Memory metrics (physical memory required; swap defaults to 0)
    memory_total_mb: int = Field(description="Total physical memory in MB")
    memory_used_mb: int = Field(description="Used physical memory in MB")
    memory_available_mb: int = Field(description="Available memory in MB")
    memory_percent: float = Field(description="Memory utilization percentage")
    swap_total_mb: int = Field(default=0, description="Total swap space in MB")
    swap_used_mb: int = Field(default=0, description="Used swap space in MB")

    # Disk metrics (all required)
    disk_total_gb: float = Field(description="Total disk space in GB")
    disk_used_gb: float = Field(description="Used disk space in GB")
    disk_percent: float = Field(description="Disk utilization percentage")

    # Network metrics (all default to 0)
    network_bytes_sent: int = Field(default=0, description="Total network bytes sent")
    network_bytes_recv: int = Field(default=0, description="Total network bytes received")
    network_connections: int = Field(default=0, description="Active network connections")

    # Process info
    boot_time: Optional[datetime] = Field(default=None, description="System boot time")
class RequestMetricsSchema(BaseModel):
    """HTTP request performance metrics."""

    # Volume and per-status-class counters; every field defaults to 0.
    requests_total: int = Field(default=0, description="Total HTTP requests")
    requests_per_second: float = Field(default=0, description="Current request rate")
    requests_1xx: int = Field(default=0, description="1xx informational responses")
    requests_2xx: int = Field(default=0, description="2xx success responses")
    requests_3xx: int = Field(default=0, description="3xx redirect responses")
    requests_4xx: int = Field(default=0, description="4xx client error responses")
    requests_5xx: int = Field(default=0, description="5xx server error responses")

    # Response time average and percentiles
    response_time_avg_ms: float = Field(default=0, description="Average response time in ms")
    response_time_p50_ms: float = Field(default=0, description="50th percentile response time")
    response_time_p95_ms: float = Field(default=0, description="95th percentile response time")
    response_time_p99_ms: float = Field(default=0, description="99th percentile response time")

    # Error rate
    error_rate: float = Field(default=0, description="Percentage of 4xx/5xx responses")

    # Active requests
    active_requests: int = Field(default=0, description="Currently processing requests")
class DatabaseMetricsSchema(BaseModel):
    """Database connection pool metrics."""

    # Pool occupancy; every field defaults to 0.
    pool_size: int = Field(default=0, description="Connection pool size")
    connections_in_use: int = Field(default=0, description="Active connections")
    connections_available: int = Field(default=0, description="Available connections")
    overflow: int = Field(default=0, description="Overflow connections")

    # Query statistics.
    query_count: int = Field(default=0, description="Total queries executed")
    query_avg_time_ms: float = Field(default=0, description="Average query time in ms")
class CacheMetricsSchema(BaseModel):
    """Redis cache metrics."""

    # Connection state; defaults describe a disconnected cache.
    connected: bool = Field(default=False, description="Redis connection status")
    version: Optional[str] = Field(default=None, description="Redis version")

    # Usage counters, all defaulting to 0.
    used_memory_mb: float = Field(default=0, description="Redis memory usage in MB")
    connected_clients: int = Field(default=0, description="Connected Redis clients")
    ops_per_second: int = Field(default=0, description="Redis operations per second")

    # Hit/miss statistics.
    hit_rate: float = Field(default=0, description="Cache hit rate percentage")
    keyspace_hits: int = Field(default=0, description="Successful key lookups")
    keyspace_misses: int = Field(default=0, description="Failed key lookups")
class GunicornMetricsSchema(BaseModel):
    """Gunicorn server metrics."""

    # master_pid is None when not supplied; the counters default to 0.
    master_pid: Optional[int] = Field(default=None, description="Master process PID")
    workers_total: int = Field(default=0, description="Total configured workers")
    workers_active: int = Field(default=0, description="Currently active workers")
    workers_idle: int = Field(default=0, description="Idle workers")
    max_requests: int = Field(default=0, description="Max requests before worker restart")
class PerformanceSnapshotCreate(BaseModel):
    """Schema for creating a performance snapshot."""

    # host and metrics_json are required; worker_id may be omitted.
    host: str = Field(description="Hostname")
    worker_id: Optional[str] = Field(default=None, description="Worker identifier")
    # Despite the name, this is validated as a dict, not a JSON string.
    metrics_json: Dict[str, Any] = Field(description="Serialized metrics data")
class PerformanceSnapshotRead(BaseModel):
    """Schema for reading a performance snapshot."""

    # Everything except worker_id is required on read.
    id: int = Field(description="Snapshot ID")
    timestamp: datetime = Field(description="Snapshot timestamp")
    host: str = Field(description="Hostname")
    worker_id: Optional[str] = Field(default=None, description="Worker identifier")
    # Despite the name, this is validated as a dict, not a JSON string.
    metrics_json: Dict[str, Any] = Field(description="Serialized metrics data")
    created_at: datetime = Field(description="Creation timestamp")

    # from_attributes allows validation from objects exposing these names as attributes.
    model_config = ConfigDict(from_attributes=True)
class PerformanceAggregateBase(BaseModel):
    """Base schema for performance aggregates."""

    # Aggregation window; period_type is an unconstrained string.
    period_start: datetime = Field(description="Start of aggregation period")
    period_end: datetime = Field(description="End of aggregation period")
    period_type: str = Field(description="Aggregation type (hourly, daily)")
    host: Optional[str] = Field(default=None, description="Host (None for cluster-wide)")

    # Request aggregates (all default to 0)
    requests_total: int = Field(default=0, description="Total requests in period")
    requests_2xx: int = Field(default=0, description="2xx responses in period")
    requests_4xx: int = Field(default=0, description="4xx responses in period")
    requests_5xx: int = Field(default=0, description="5xx responses in period")
    avg_response_time_ms: float = Field(default=0, description="Average response time")
    p95_response_time_ms: float = Field(default=0, description="95th percentile response time")
    peak_requests_per_second: float = Field(default=0, description="Peak request rate")

    # Resource aggregates (all default to 0)
    avg_cpu_percent: float = Field(default=0, description="Average CPU utilization")
    avg_memory_percent: float = Field(default=0, description="Average memory utilization")
    peak_cpu_percent: float = Field(default=0, description="Peak CPU utilization")
    peak_memory_percent: float = Field(default=0, description="Peak memory utilization")
# Adds no fields of its own: the create payload is exactly PerformanceAggregateBase.
class PerformanceAggregateCreate(PerformanceAggregateBase):
    """Schema for creating a performance aggregate."""
class PerformanceAggregateRead(PerformanceAggregateBase):
    """Schema for reading a performance aggregate."""

    # Both fields are required on read.
    id: int = Field(description="Aggregate ID")
    created_at: datetime = Field(description="Creation timestamp")

    # from_attributes allows validation from objects exposing these names as attributes.
    model_config = ConfigDict(from_attributes=True)
class PerformanceDashboard(BaseModel):
    """Complete performance dashboard data."""

    # Header: timestamp and host are required, uptime defaults to 0.
    timestamp: datetime = Field(description="Dashboard generation timestamp")
    uptime_seconds: int = Field(default=0, description="Application uptime in seconds")
    host: str = Field(description="Current hostname")

    # Current metrics: each section is a required nested schema.
    system: SystemMetricsSchema = Field(description="Current system metrics")
    requests: RequestMetricsSchema = Field(description="Current request metrics")
    database: DatabaseMetricsSchema = Field(description="Current database metrics")
    cache: CacheMetricsSchema = Field(description="Current cache metrics")
    gunicorn: GunicornMetricsSchema = Field(description="Current Gunicorn metrics")
    # Defaults to a new empty list per instance.
    workers: List[WorkerMetrics] = Field(default_factory=list, description="Per-worker metrics")

    # Cluster info (for distributed mode)
    cluster_hosts: List[str] = Field(default_factory=list, description="Known cluster hosts")
    is_distributed: bool = Field(default=False, description="Running in distributed mode")
class PerformanceHistoryResponse(BaseModel):
    """Response for historical performance data."""

    # Defaults to a new empty list per instance.
    aggregates: List[PerformanceAggregateRead] = Field(default_factory=list, description="Historical aggregates")
    # Required; an unconstrained string.
    period_type: str = Field(description="Aggregation period type")
    total_count: int = Field(default=0, description="Total matching records")
7852 total_count: int = Field(0, description="Total matching records")