Coverage for mcpgateway / config.py: 99%
1008 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/config.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti, Manav Gupta
7ContextForge AI Gateway Configuration.
8This module defines configuration settings for ContextForge AI Gateway using Pydantic.
9It loads configuration from environment variables with sensible defaults.
11Environment variables:
12- APP_NAME: Gateway name (default: "ContextForge")
13- HOST: Host to bind to (default: "127.0.0.1")
14- PORT: Port to listen on (default: 4444)
15- DATABASE_URL: Database connection URL; SQLite or PostgreSQL (default: "sqlite:///./mcp.db")
16- BASIC_AUTH_USER: Username for API Basic auth when enabled (default: "admin")
17- BASIC_AUTH_PASSWORD: Password for API Basic auth when enabled (default: "changeme")
18- LOG_LEVEL: Logging level (default: "INFO")
19- SKIP_SSL_VERIFY: Disable SSL verification (default: False)
20- AUTH_REQUIRED: Require authentication (default: True)
21- TRANSPORT_TYPE: Transport mechanisms (default: "all")
22- DOCS_ALLOW_BASIC_AUTH: Allow basic auth for docs (default: False)
23- RESOURCE_CACHE_SIZE: Max cached resources (default: 1000)
24- RESOURCE_CACHE_TTL: Cache TTL in seconds (default: 3600)
25- TOOL_TIMEOUT: Tool invocation timeout (default: 60)
26- PROMPT_CACHE_SIZE: Max cached prompts (default: 100)
27- HEALTH_CHECK_INTERVAL: Gateway health check interval (default: 300)
28- REQUIRE_TOKEN_EXPIRATION: Require JWT tokens to have expiration (default: True)
29- REQUIRE_JTI: Require JTI claim in tokens for revocation (default: True)
30- REQUIRE_USER_IN_DB: Require all users to exist in database (default: False)
32Examples:
33 >>> from mcpgateway.config import Settings
34 >>> s = Settings(basic_auth_user='admin', basic_auth_password='secret')
35 >>> s.api_key
36 'admin:secret'
37 >>> s2 = Settings(transport_type='http')
38 >>> s2.validate_transport() # no error
39 >>> s3 = Settings(transport_type='invalid')
40 >>> try:
41 ... s3.validate_transport()
42 ... except ValueError as e:
43 ... print('error')
44 error
45 >>> s4 = Settings(database_url='sqlite:///./test.db')
46 >>> isinstance(s4.database_settings, dict)
47 True
48"""
50# Standard
51from functools import lru_cache
52from importlib.resources import files
53import logging
54import os
55from pathlib import Path
56import re
57import sys
58from typing import Annotated, Any, ClassVar, Dict, List, Literal, NotRequired, Optional, Self, Set, TypedDict
59from urllib.parse import urlparse
61# Third-Party
62from cryptography.hazmat.primitives import serialization
63from cryptography.hazmat.primitives.asymmetric import ed25519
64import orjson
65from pydantic import AliasChoices, Field, field_validator, HttpUrl, model_validator, PositiveInt, SecretStr, ValidationInfo
66from pydantic_settings import BaseSettings, NoDecode, SettingsConfigDict
# Install a fallback root logging configuration only when the root logger has
# no handlers yet. This avoids conflicts with LoggingService while still
# making sure messages logged during configuration loading are emitted.
if not logging.getLogger().hasHandlers():
    logging.basicConfig(
        datefmt="%Y-%m-%dT%H:%M:%S",
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        level=logging.INFO,
    )

# Module-level logger used for configuration messages.
logger = logging.getLogger(__name__)
def _normalize_env_list_vars() -> None:
    """Rewrite list-typed environment variables in place as JSON array strings.

    Each known list-valued variable that is currently set is handled as follows:

    - a blank (empty or whitespace-only) value becomes ``[]``;
    - a value that starts with ``[`` and parses as JSON is left untouched;
    - any other value is split on commas, each piece is stripped, empty
      pieces are dropped, and the result is stored as a JSON array string.

    Variables that are not set are never created.
    """
    list_env_names = (
        "SSO_TRUSTED_DOMAINS",
        "SSO_AUTO_ADMIN_DOMAINS",
        "SSO_GITHUB_ADMIN_ORGS",
        "SSO_GOOGLE_ADMIN_DOMAINS",
        "SSO_ENTRA_ADMIN_GROUPS",
        "LOG_DETAILED_SKIP_ENDPOINTS",
        "TOOL_DESCRIPTION_FORBIDDEN_PATTERNS",
        "CONTENT_ALLOWED_RESOURCE_MIMETYPES",
    )
    for env_name in list_env_names:
        current = os.environ.get(env_name)
        if current is None:
            # Unset: nothing to normalize.
            continue

        trimmed = current.strip()
        if not trimmed:
            os.environ[env_name] = "[]"
            continue

        if trimmed.startswith("["):
            try:
                orjson.loads(trimmed)
            except Exception:
                pass  # nosec B110 - Intentionally continue with CSV parsing if JSON parsing fails
            else:
                # Already a valid JSON value; keep the variable exactly as provided.
                continue

        # Treat the value as CSV and store it as a JSON array.
        stripped_parts = (part.strip() for part in trimmed.split(","))
        os.environ[env_name] = orjson.dumps([part for part in stripped_parts if part]).decode()


# Normalize the environment once, at import time.
_normalize_env_list_vars()
# Content type used by default for outgoing requests to Forge; can be
# overridden through the FORGE_CONTENT_TYPE environment variable.
FORGE_CONTENT_TYPE = os.environ.get("FORGE_CONTENT_TYPE", "application/json")

# UI embedding / visibility controls

# Canonical identifiers of the UI sections that may be hidden.
UI_HIDABLE_SECTIONS = frozenset(
    (
        "overview",
        "servers",
        "gateways",
        "tools",
        "prompts",
        "resources",
        "roots",
        "mcp-registry",
        "metrics",
        "plugins",
        "export-import",
        "logs",
        "version-info",
        "maintenance",
        "teams",
        "users",
        "agents",
        "grpc-services",
        "tokens",
        "settings",
    )
)

# Identifiers of the header items that may be hidden.
UI_HIDABLE_HEADER_ITEMS = frozenset(
    (
        "logout",
        "team_selector",
        "user_identity",
        "theme_toggle",
    )
)

# Alternative section names mapped to their canonical entry in
# UI_HIDABLE_SECTIONS.
UI_HIDE_SECTION_ALIASES = {
    # -> "servers"
    "catalog": "servers",
    "virtual_servers": "servers",
    # -> "agents"
    "a2a-agents": "agents",
    "a2a": "agents",
    # -> "tokens"
    "api_tokens": "tokens",
    # -> "settings"
    "llm-settings": "settings",
}
158class Settings(BaseSettings):
159 """
160 ContextForge AI Gateway configuration settings.
162 Examples:
163 >>> from mcpgateway.config import Settings
164 >>> s = Settings(basic_auth_user='admin', basic_auth_password='secret')
165 >>> s.api_key
166 'admin:secret'
167 >>> s2 = Settings(transport_type='http')
168 >>> s2.validate_transport() # no error
169 >>> s3 = Settings(transport_type='invalid')
170 >>> try:
171 ... s3.validate_transport()
172 ... except ValueError as e:
173 ... print('error')
174 error
175 >>> s4 = Settings(database_url='sqlite:///./test.db')
176 >>> isinstance(s4.database_settings, dict)
177 True
178 >>> s5 = Settings()
179 >>> s5.app_name
180 'ContextForge'
181 >>> s5.host in ('0.0.0.0', '127.0.0.1') # Default can be either
182 True
183 >>> s5.port
184 4444
185 >>> s5.auth_required
186 True
187 >>> isinstance(s5.allowed_origins, set)
188 True
189 >>> s6 = Settings(log_detailed_skip_endpoints=["/metrics", "/health"])
190 >>> s6.log_detailed_skip_endpoints
191 ['/metrics', '/health']
192 >>> s7 = Settings(log_detailed_sample_rate=0.5)
193 >>> s7.log_detailed_sample_rate
194 0.5
195 >>> s8 = Settings(log_resolve_user_identity=True)
196 >>> s8.log_resolve_user_identity
197 True
198 >>> s9 = Settings()
199 >>> s9.log_detailed_skip_endpoints
200 []
201 >>> s9.log_detailed_sample_rate
202 1.0
203 >>> s9.log_resolve_user_identity
204 False
205 """
207 # Basic Settings
208 app_name: str = "ContextForge"
209 host: str = "127.0.0.1"
210 port: PositiveInt = Field(default=4444, ge=1, le=65535)
211 client_mode: bool = False
212 docs_allow_basic_auth: bool = False # Allow basic auth for docs
213 api_allow_basic_auth: bool = Field(
214 default=False,
215 description="Allow Basic authentication for API endpoints. Disabled by default for security. Use JWT or API tokens instead.",
216 )
217 database_url: str = Field(
218 default="sqlite:///./mcp.db",
219 description=(
220 "Database connection URL. Supports SQLite (dev) and PostgreSQL (production). "
221 "For PostgreSQL with custom schema, use the 'options' query parameter: "
222 "postgresql://user:pass@host:5432/db?options=-c%20search_path=schema_name "
223 "(See Issue #1535 for details)"
224 ),
225 )
227 # Absolute paths resolved at import-time (still override-able via env vars)
228 templates_dir: Path = Field(default_factory=lambda: Path(str(files("mcpgateway") / "templates")))
229 static_dir: Path = Field(default_factory=lambda: Path(str(files("mcpgateway") / "static")))
231 # Template auto-reload: False for production (default), True for development
232 # Disabling prevents re-parsing templates on each request, improving performance under load
233 # Use TEMPLATES_AUTO_RELOAD=true for development (make dev sets this automatically)
234 templates_auto_reload: bool = Field(default=False, description="Auto-reload Jinja2 templates on change (enable for development)")
236 app_root_path: str = ""
238 # Protocol
239 protocol_version: str = "2025-11-25"
240 experimental_rust_mcp_runtime_enabled: bool = Field(
241 default=False,
242 description="Proxy POST /mcp traffic through the experimental Rust MCP runtime sidecar.",
243 )
244 experimental_rust_mcp_runtime_url: str = Field(
245 default="http://127.0.0.1:8787",
246 description="Base URL for the experimental Rust MCP runtime sidecar.",
247 )
248 experimental_rust_mcp_runtime_uds: Optional[str] = Field(
249 default=None,
250 description="Optional Unix domain socket path for the experimental Rust MCP runtime sidecar.",
251 )
252 experimental_rust_mcp_runtime_timeout_seconds: int = Field(
253 default=30,
254 ge=1,
255 le=300,
256 description="Timeout in seconds for Python-to-Rust MCP runtime proxy requests.",
257 )
258 experimental_rust_mcp_session_core_enabled: bool = Field(
259 default=False,
260 description="Enable the experimental Rust-owned MCP session metadata core while keeping Python as the fallback transport backend.",
261 )
262 experimental_rust_mcp_event_store_enabled: bool = Field(
263 default=False,
264 description="Enable the experimental Rust-owned resumable MCP event-store backend for Streamable HTTP sessions.",
265 )
266 experimental_rust_mcp_resume_core_enabled: bool = Field(
267 default=False,
268 description="Enable the experimental Rust-owned public MCP replay/resume path for GET /mcp with Last-Event-ID while keeping Python fallback available.",
269 )
270 experimental_rust_mcp_live_stream_core_enabled: bool = Field(
271 default=False,
272 description="Enable the experimental Rust-owned public MCP live GET /mcp SSE path while keeping Python as the fallback upstream stream source.",
273 )
274 experimental_rust_mcp_affinity_core_enabled: bool = Field(
275 default=False,
276 description="Enable the experimental Rust-owned MCP session-affinity forwarding path while keeping Python worker forwarding as the fallback.",
277 )
278 experimental_rust_mcp_session_auth_reuse_enabled: bool = Field(
279 default=False,
280 description="Enable the experimental Rust-owned MCP session-bound auth-context reuse path for direct public /mcp ingress.",
281 )
283 # Authentication
284 basic_auth_user: str = "admin"
285 basic_auth_password: SecretStr = Field(default=SecretStr("changeme"))
286 jwt_algorithm: str = "HS256"
287 jwt_secret_key: SecretStr = Field(default=SecretStr("my-test-key"))
288 jwt_public_key_path: str = ""
289 jwt_private_key_path: str = ""
290 jwt_audience: str = "mcpgateway-api"
291 jwt_issuer: str = "mcpgateway"
292 jwt_audience_verification: bool = True
293 jwt_issuer_verification: bool = True
294 auth_required: bool = True
295 allow_unauthenticated_admin: bool = Field(
296 default=False,
297 description="Allow unauthenticated requests to receive platform-admin context when AUTH_REQUIRED=false (dangerous; development-only override).",
298 )
299 token_expiry: int = 10080 # minutes
301 require_token_expiration: bool = Field(default=True, description="Require all JWT tokens to have expiration claims (secure default)")
302 require_jti: bool = Field(default=True, description="Require JTI (JWT ID) claim in all tokens for revocation support (secure default)")
303 require_user_in_db: bool = Field(
304 default=False,
305 description="Require all authenticated users to exist in the database. When true, disables the platform admin bootstrap mechanism. WARNING: Enabling this on a fresh deployment will lock you out.",
306 )
307 embed_environment_in_tokens: bool = Field(default=False, description="Embed environment claim in gateway-issued JWTs for environment isolation")
308 validate_token_environment: bool = Field(default=False, description="Reject tokens with mismatched environment claim (tokens without env claim are allowed)")
310 # JSON Schema Validation for registration (Tool Input Schemas, Prompt schemas, etc)
311 json_schema_validation_strict: bool = Field(default=True, description="Strict schema validation mode - reject invalid JSON schemas")
313 # SSO Configuration
314 sso_enabled: bool = Field(default=False, description="Enable Single Sign-On authentication")
315 sso_github_enabled: bool = Field(default=False, description="Enable GitHub OAuth authentication")
316 sso_github_client_id: Optional[str] = Field(default=None, description="GitHub OAuth client ID")
317 sso_github_client_secret: Optional[SecretStr] = Field(default=None, description="GitHub OAuth client secret")
319 sso_google_enabled: bool = Field(default=False, description="Enable Google OAuth authentication")
320 sso_google_client_id: Optional[str] = Field(default=None, description="Google OAuth client ID")
321 sso_google_client_secret: Optional[SecretStr] = Field(default=None, description="Google OAuth client secret")
323 sso_ibm_verify_enabled: bool = Field(default=False, description="Enable IBM Security Verify OIDC authentication")
324 sso_ibm_verify_client_id: Optional[str] = Field(default=None, description="IBM Security Verify client ID")
325 sso_ibm_verify_client_secret: Optional[SecretStr] = Field(default=None, description="IBM Security Verify client secret")
326 sso_ibm_verify_issuer: Optional[str] = Field(default=None, description="IBM Security Verify OIDC issuer URL")
328 sso_okta_enabled: bool = Field(default=False, description="Enable Okta OIDC authentication")
329 sso_okta_client_id: Optional[str] = Field(default=None, description="Okta client ID")
330 sso_okta_client_secret: Optional[SecretStr] = Field(default=None, description="Okta client secret")
331 sso_okta_issuer: Optional[str] = Field(default=None, description="Okta issuer URL")
332 sso_okta_scope: str = Field(default="openid profile email", description="Okta OIDC scopes (space-separated)")
333 okta_group_mapping: Optional[str] = Field(default=None, description="JSON mapping of Okta group names to team UUIDs")
335 sso_keycloak_enabled: bool = Field(default=False, description="Enable Keycloak OIDC authentication")
336 sso_keycloak_base_url: Optional[str] = Field(default=None, description="Keycloak base URL (e.g., https://keycloak.example.com)")
337 sso_keycloak_public_base_url: Optional[str] = Field(
338 default=None,
339 description="Browser-facing Keycloak base URL for authorization redirects (e.g., http://localhost:8180)",
340 )
341 sso_keycloak_realm: str = Field(default="master", description="Keycloak realm name")
342 sso_keycloak_client_id: Optional[str] = Field(default=None, description="Keycloak client ID")
343 sso_keycloak_client_secret: Optional[SecretStr] = Field(default=None, description="Keycloak client secret")
344 sso_keycloak_map_realm_roles: bool = Field(default=True, description="Map Keycloak realm roles to gateway teams")
345 sso_keycloak_map_client_roles: bool = Field(default=False, description="Map Keycloak client roles to gateway RBAC")
346 sso_keycloak_role_mappings: Dict[str, str] = Field(default_factory=dict, description="Map Keycloak groups/roles to ContextForge roles (JSON: {group_or_role: role_name})")
347 sso_keycloak_default_role: Optional[str] = Field(default=None, description="Default ContextForge role for Keycloak users without role mapping")
348 sso_keycloak_resolve_team_scope_to_personal_team: bool = Field(default=False, description="Resolve team-scoped Keycloak role mappings to the user's personal team")
349 sso_keycloak_username_claim: str = Field(default="preferred_username", description="JWT claim for username")
351 # Security Validation & Sanitization
352 experimental_validate_io: bool = Field(default=False, description="Enable experimental input validation and output sanitization")
353 validation_middleware_enabled: bool = Field(default=False, description="Enable validation middleware for all requests")
354 validation_strict: bool = Field(default=True, description="Strict validation mode - reject on violations")
355 sanitize_output: bool = Field(default=True, description="Sanitize output to remove control characters")
356 allowed_roots: List[str] = Field(default_factory=list, description="Allowed root paths for resource access")
357 max_path_depth: int = Field(default=10, description="Maximum allowed path depth")
358 max_param_length: int = Field(default=10000, description="Maximum parameter length")
359 dangerous_patterns: List[str] = Field(
360 default_factory=lambda: [
361 r"[;&|`$(){}\[\]<>]", # Shell metacharacters
362 r"\.\.[\\/]", # Path traversal
363 r"[\x00-\x1f\x7f-\x9f]", # Control characters
364 ],
365 description="Regex patterns for dangerous input",
366 )
367 tool_description_forbidden_patterns_enabled: bool = Field(default=True, description="Enable forbidden pattern validation on tool descriptions. Set to false to disable all checks.")
368 tool_description_forbidden_patterns: List[str] = Field(
369 default_factory=lambda: ["&&", "||", "$(", "> ", "< "],
370 description='Substrings forbidden in tool descriptions. Override via TOOL_DESCRIPTION_FORBIDDEN_PATTERNS env var as a JSON array, e.g. \'["&&","||"]\'.',
371 )
373 sso_keycloak_email_claim: str = Field(default="email", description="JWT claim for email")
374 sso_keycloak_groups_claim: str = Field(default="groups", description="JWT claim for groups/roles")
376 sso_entra_enabled: bool = Field(default=False, description="Enable Microsoft Entra ID OIDC authentication")
377 sso_entra_client_id: Optional[str] = Field(default=None, description="Microsoft Entra ID client ID")
378 sso_entra_client_secret: Optional[SecretStr] = Field(default=None, description="Microsoft Entra ID client secret")
379 sso_entra_tenant_id: Optional[str] = Field(default=None, description="Microsoft Entra ID tenant ID")
380 sso_entra_groups_claim: str = Field(default="groups", description="JWT claim for EntraID groups (groups/roles)")
381 sso_entra_admin_groups: Annotated[list[str], NoDecode] = Field(default_factory=list, description="EntraID groups granting platform_admin role (CSV/JSON)")
382 sso_entra_role_mappings: Dict[str, str] = Field(default_factory=dict, description="Map EntraID groups to ContextForge roles (JSON: {group_id: role_name})")
383 sso_entra_default_role: Optional[str] = Field(default=None, description="Default role for EntraID users without group mapping (None = no role assigned)")
384 sso_entra_sync_roles_on_login: bool = Field(default=True, description="Synchronize role assignments on each login")
385 sso_entra_graph_api_enabled: bool = Field(default=True, description="Enable Microsoft Graph fallback for EntraID groups overage claims")
386 sso_entra_graph_api_timeout: int = Field(default=10, ge=1, le=120, description="Timeout in seconds for Microsoft Graph group fallback requests")
387 sso_entra_graph_api_max_groups: int = Field(default=0, ge=0, description="Maximum groups to keep from Graph fallback (0 = no limit)")
389 sso_adfs_enabled: bool = Field(default=False, description="Enable ADFS OIDC authentication")
390 sso_adfs_client_id: Optional[str] = Field(default=None, description="ADFS OAuth client ID")
391 sso_adfs_client_secret: Optional[SecretStr] = Field(default=None, description="ADFS OAuth client secret")
392 sso_adfs_authorization_url: Optional[str] = Field(default=None, description="ADFS authorization endpoint URL (e.g., https://adfs.example.com/adfs/oauth2/authorize/)")
393 sso_adfs_token_url: Optional[str] = Field(default=None, description="ADFS token endpoint URL (e.g., https://adfs.example.com/adfs/oauth2/token/)")
394 sso_adfs_issuer: Optional[str] = Field(default=None, description="ADFS issuer URL (e.g., https://adfs.example.com/adfs)")
395 sso_adfs_scope: Optional[str] = Field(default="openid profile email", description="ADFS OAuth scopes (space-separated)")
396 sso_adfs_display_name: Optional[str] = Field(default="ADFS Login", description="Display name shown on login page for ADFS")
398 sso_generic_enabled: bool = Field(default=False, description="Enable generic OIDC provider (Keycloak, Auth0, etc.)")
399 sso_generic_provider_id: Optional[str] = Field(default=None, description="Provider ID (e.g., 'keycloak', 'auth0', 'authentik')")
400 sso_generic_display_name: Optional[str] = Field(default=None, description="Display name shown on login page")
401 sso_generic_client_id: Optional[str] = Field(default=None, description="Generic OIDC client ID")
402 sso_generic_client_secret: Optional[SecretStr] = Field(default=None, description="Generic OIDC client secret")
403 sso_generic_authorization_url: Optional[str] = Field(default=None, description="Authorization endpoint URL")
404 sso_generic_token_url: Optional[str] = Field(default=None, description="Token endpoint URL")
405 sso_generic_userinfo_url: Optional[str] = Field(default=None, description="Userinfo endpoint URL")
406 sso_generic_issuer: Optional[str] = Field(default=None, description="OIDC issuer URL")
407 sso_generic_jwks_uri: Optional[str] = Field(default=None, description="OIDC JWKS endpoint URL for token signature verification")
408 sso_generic_scope: Optional[str] = Field(default="openid profile email", description="OAuth scopes (space-separated)")
410 # SSO Settings
411 sso_auto_create_users: bool = Field(default=True, description="Automatically create users from SSO providers")
412 sso_trusted_domains: Annotated[list[str], NoDecode] = Field(default_factory=list, description="Trusted email domains (CSV or JSON list)")
413 sso_preserve_admin_auth: bool = Field(default=True, description="Preserve local admin authentication when SSO is enabled")
414 sso_auto_disable_unconfigured_providers: bool = Field(
415 default=False,
416 description=(
417 "Automatically disable SSO providers not present in environment configuration during bootstrap. "
418 "When enabled, providers configured in the database but missing from SSO_*_ENABLED environment variables "
419 "will be disabled. This enforces environment config as the single source of truth. "
420 "Default: false (preserves manually configured providers for backward compatibility)."
421 ),
422 )
424 # SSO Admin Assignment Settings
425 sso_auto_admin_domains: Annotated[list[str], NoDecode] = Field(default_factory=list, description="Admin domains (CSV or JSON list)")
426 sso_github_admin_orgs: Annotated[list[str], NoDecode] = Field(default_factory=list, description="GitHub orgs granting admin (CSV/JSON)")
427 sso_google_admin_domains: Annotated[list[str], NoDecode] = Field(default_factory=list, description="Google admin domains (CSV/JSON)")
428 sso_require_admin_approval: bool = Field(default=False, description="Require admin approval for new SSO registrations")
430 # ADFS-specific Settings
431 sso_adfs_default_email_domain: Optional[str] = Field(
432 default=None, description="Default email domain for ADFS when UPN is plain username (e.g., 'company.com' converts 'user123' to 'user123@company.com')"
433 )
435 # MCP Client Authentication
436 mcp_client_auth_enabled: bool = Field(default=True, description="Enable JWT authentication for MCP client operations")
437 mcp_require_auth: Optional[bool] = Field(
438 default=None,
439 description=(
440 "Require authentication for /mcp endpoints. "
441 "When unset, inherits AUTH_REQUIRED. "
442 "Set false explicitly to allow unauthenticated access to public items only; "
443 "set true to require a valid Bearer token for all /mcp requests."
444 ),
445 )
446 trust_proxy_auth: bool = Field(
447 default=False,
448 description="Trust proxy authentication headers (required when mcp_client_auth_enabled=false)",
449 )
450 trust_proxy_auth_dangerously: bool = Field(
451 default=False,
452 description="Acknowledge and allow trusted proxy headers when MCP_CLIENT_AUTH_ENABLED=false (dangerous; only for strictly trusted proxy deployments).",
453 )
454 proxy_user_header: str = Field(default="X-Authenticated-User", description="Header containing authenticated username from proxy")
456 # Encryption key phrase for auth storage
457 auth_encryption_secret: SecretStr = Field(default=SecretStr("my-test-salt"))
459 # Query Parameter Authentication (INSECURE - disabled by default)
460 insecure_allow_queryparam_auth: bool = Field(
461 default=False,
462 description=("Enable query parameter authentication for gateway peers. " "WARNING: API keys may appear in proxy logs. See CWE-598."),
463 )
464 insecure_queryparam_auth_allowed_hosts: List[str] = Field(
465 default_factory=list,
466 description=("Allowlist of hosts permitted to use query parameter auth. " "Empty list allows any host when feature is enabled. " "Format: ['mcp.tavily.com', 'api.example.com']"),
467 )
469 # ===================================
470 # SSRF Protection Configuration
471 # ===================================
472 # Server-Side Request Forgery (SSRF) protection prevents the gateway from being
473 # used to access internal resources or cloud metadata services.
475 ssrf_protection_enabled: bool = Field(
476 default=True,
477 description="Enable SSRF protection for gateway/tool URLs. Blocks access to dangerous endpoints.",
478 )
480 ssrf_blocked_networks: List[str] = Field(
481 default=[
482 # Cloud metadata services (ALWAYS dangerous - credential exposure)
483 "169.254.169.254/32", # AWS/GCP/Azure instance metadata
484 "169.254.169.123/32", # AWS NTP service
485 "fd00::1/128", # IPv6 cloud metadata
486 # Link-local (often used for cloud metadata)
487 "169.254.0.0/16", # Full link-local IPv4 range
488 "fe80::/10", # IPv6 link-local
489 ],
490 description=(
491 "CIDR ranges to block for SSRF protection. These are ALWAYS blocked regardless of other settings. " "Default blocks cloud metadata endpoints. Add private ranges for stricter security."
492 ),
493 )
495 ssrf_blocked_hosts: List[str] = Field(
496 default=[
497 "metadata.google.internal", # GCP metadata hostname
498 "metadata.internal", # Generic cloud metadata
499 ],
500 description="Hostnames to block for SSRF protection. Matched case-insensitively.",
501 )
503 ssrf_allow_localhost: bool = Field(
504 default=False,
505 description=("Allow localhost/loopback addresses (127.0.0.0/8, ::1). " "Default false for safer production behavior."),
506 )
508 ssrf_allow_private_networks: bool = Field(
509 default=False,
510 description=(
511 "Allow RFC 1918 private network addresses (10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16). " "When false, private destinations are blocked unless explicitly listed in SSRF_ALLOWED_NETWORKS."
512 ),
513 )
515 ssrf_allowed_networks: List[str] = Field(
516 default_factory=list,
517 description=("Optional CIDR allowlist for internal/private destinations. " "Used when SSRF_ALLOW_PRIVATE_NETWORKS=false to allow specific internal ranges."),
518 )
520 ssrf_dns_fail_closed: bool = Field(
521 default=True,
522 description=(
523 "Fail closed on DNS resolution errors. When true, URLs that cannot be resolved "
524 "are rejected. When false, unresolvable hostnames are allowed through "
525 "(hostname blocklist still applies)."
526 ),
527 )
529 # OAuth Configuration
530 oauth_request_timeout: int = Field(default=30, description="OAuth request timeout in seconds")
531 oauth_max_retries: int = Field(default=3, description="Maximum retries for OAuth token requests")
532 oauth_default_timeout: int = Field(default=3600, description="Default OAuth token timeout in seconds")
534 # ===================================
535 # Dynamic Client Registration (DCR) - Client Mode
536 # ===================================
538 # Enable DCR client functionality
539 dcr_enabled: bool = Field(default=True, description="Enable Dynamic Client Registration (RFC 7591) - gateway acts as DCR client")
541 # Auto-register when missing credentials
542 dcr_auto_register_on_missing_credentials: bool = Field(default=True, description="Automatically register with AS when gateway has issuer but no client_id")
544 # Default scopes for DCR
545 dcr_default_scopes: List[str] = Field(default=["mcp:read"], description="Default MCP scopes to request during DCR")
547 # Issuer allowlist (empty = allow any)
548 dcr_allowed_issuers: List[str] = Field(default_factory=list, description="Optional allowlist of issuer URLs for DCR (empty = allow any)")
550 # Token endpoint auth method
551 dcr_token_endpoint_auth_method: str = Field(default="client_secret_basic", description="Token endpoint auth method for DCR (client_secret_basic or client_secret_post)")
553 # Metadata cache TTL
554 dcr_metadata_cache_ttl: int = Field(default=3600, description="AS metadata cache TTL in seconds (RFC 8414 discovery)")
556 # Client name template
557 dcr_client_name_template: str = Field(default="ContextForge ({gateway_name})", description="Template for client_name in DCR requests")
559 # Refresh token behavior
560 dcr_request_refresh_token_when_unsupported: bool = Field(
561 default=False,
562 description="Request refresh_token even when AS metadata omits grant_types_supported. Enable for AS servers that support refresh tokens but don't advertise it.",
563 )
565 # ===================================
566 # OAuth Discovery (RFC 8414)
567 # ===================================
569 oauth_discovery_enabled: bool = Field(default=True, description="Enable OAuth AS metadata discovery (RFC 8414)")
571 oauth_preferred_code_challenge_method: str = Field(default="S256", description="Preferred PKCE code challenge method (S256 or plain)")
573 # Email-Based Authentication
574 email_auth_enabled: bool = Field(default=True, description="Enable email-based authentication")
575 public_registration_enabled: bool = Field(
576 default=False,
577 description="Allow unauthenticated users to self-register accounts. When false, only admins can create users via /admin/users endpoint.",
578 )
579 allow_public_visibility: bool = Field(
580 default=True,
581 description="When false, creating or updating any entity with public visibility is blocked in team scope.",
582 )
583 protect_all_admins: bool = Field(
584 default=True,
585 description="When true (default), prevent any admin from being demoted, deactivated, or locked out via API/UI. When false, only the last active admin is protected.",
586 )
# Bootstrap identity for the platform administrator account.
# NOTE(review): both password fields below default to the literal "changeme";
# presumably a placeholder meant to be overridden per deployment — confirm.
587 platform_admin_email: str = Field(default="admin@example.com", description="Platform administrator email address")
588 platform_admin_password: SecretStr = Field(default=SecretStr("changeme"), description="Platform administrator password")
589 default_user_password: SecretStr = Field(default=SecretStr("changeme"), description="Default password for new users") # nosec B105
590 platform_admin_full_name: str = Field(default="Platform Administrator", description="Platform administrator full name")
592 # Argon2id Password Hashing Configuration
593 argon2id_time_cost: int = Field(default=3, description="Argon2id time cost (number of iterations)")
594 argon2id_memory_cost: int = Field(default=65536, description="Argon2id memory cost in KiB")
595 argon2id_parallelism: int = Field(default=1, description="Argon2id parallelism (number of threads)")
597 # Password Policy Configuration
# NOTE(review): password_min_length defaults to 8 here, while a separate
# min_password_length threshold in this class defaults to 12 — confirm which
# one each consumer reads.
# Digits are the only character class not required by default.
598 password_min_length: int = Field(default=8, description="Minimum password length")
599 password_require_uppercase: bool = Field(default=True, description="Require uppercase letters in passwords")
600 password_require_lowercase: bool = Field(default=True, description="Require lowercase letters in passwords")
601 password_require_numbers: bool = Field(default=False, description="Require numbers in passwords")
602 password_require_special: bool = Field(default=True, description="Require special characters in passwords")
604 # Password change enforcement and policy toggles
# Every toggle in this group defaults to enabled; password_max_age_days is the
# only numeric setting (90 days).
605 password_change_enforcement_enabled: bool = Field(default=True, description="Master switch for password change enforcement checks")
606 admin_require_password_change_on_bootstrap: bool = Field(default=True, description="Force admin to change password after bootstrap")
607 detect_default_password_on_login: bool = Field(default=True, description="Detect default password during login and mark user for change")
608 require_password_change_for_default_password: bool = Field(default=True, description="Require password change when user is created with the default password")
609 password_policy_enabled: bool = Field(default=True, description="Enable password complexity validation for new/changed passwords")
610 password_prevent_reuse: bool = Field(default=True, description="Prevent reusing the current password when changing")
611 password_max_age_days: int = Field(default=90, description="Password maximum age in days before expiry forces a change")
612 # Account Security Configuration
# With the defaults below an account is locked after 10 failed logins and the
# lockout lasts 1 minute.
613 max_failed_login_attempts: int = Field(default=10, description="Maximum failed login attempts before account lockout")
614 account_lockout_duration_minutes: int = Field(default=1, description="Account lockout duration in minutes")
615 account_lockout_notification_enabled: bool = Field(default=True, description="Send lockout notification emails when accounts are locked")
616 failed_login_min_response_ms: int = Field(default=250, description="Minimum response duration for failed login attempts to reduce timing side channels")
618 # Self-service password reset
# Rate limiting is expressed as "password_reset_rate_limit requests per
# password_reset_rate_window_minutes" (default: 5 per 15 minutes, per email).
619 password_reset_enabled: bool = Field(default=True, description="Enable self-service password reset workflow (set false to disable public forgot/reset endpoints)")
620 password_reset_token_expiry_minutes: int = Field(default=60, description="Password reset token expiration time in minutes")
621 password_reset_rate_limit: int = Field(default=5, description="Maximum password reset requests allowed per email in each rate-limit window")
622 password_reset_rate_window_minutes: int = Field(default=15, description="Password reset request rate-limit window in minutes")
623 password_reset_invalidate_sessions: bool = Field(default=True, description="Invalidate active sessions after password reset")
624 password_reset_min_response_ms: int = Field(default=250, description="Minimum response duration for forgot-password requests to reduce timing side channels")
626 # Email delivery for auth notifications
# SMTP is disabled by default, and host, user, password and from-address all
# default to None.
# NOTE(review): smtp_use_tls (STARTTLS) and smtp_use_ssl (implicit TLS) are
# independent booleans; nothing in this part of the class rejects enabling
# both — confirm how the mail sender resolves that combination.
627 smtp_enabled: bool = Field(
628 default=False,
629 description="Enable SMTP email delivery for password reset and account lockout notifications (when false, reset requests are accepted but no email is sent)",
630 )
631 smtp_host: Optional[str] = Field(default=None, description="SMTP server host")
632 smtp_port: int = Field(default=587, description="SMTP server port")
633 smtp_user: Optional[str] = Field(default=None, description="SMTP username")
634 smtp_password: Optional[SecretStr] = Field(default=None, description="SMTP password")
635 smtp_from_email: Optional[str] = Field(default=None, description="From email address used for auth notifications")
636 smtp_from_name: str = Field(default="ContextForge", description="From display name used for auth notifications")
637 smtp_use_tls: bool = Field(default=True, description="Use STARTTLS for SMTP connections")
638 smtp_use_ssl: bool = Field(default=False, description="Use implicit SSL/TLS for SMTP connections")
639 smtp_timeout_seconds: int = Field(default=15, description="SMTP connection timeout in seconds")
641 # Personal Teams Configuration
# The personal team prefix is empty by default. The numeric limits below have
# no ge/le bounds declared on their Field definitions.
642 auto_create_personal_teams: bool = Field(default=True, description="Enable automatic personal team creation for new users")
643 personal_team_prefix: str = Field(default="", description="Personal team naming prefix")
644 max_teams_per_user: int = Field(default=50, description="Maximum number of teams a user can belong to")
645 max_members_per_team: int = Field(default=100, description="Maximum number of members per team")
646 invitation_expiry_days: int = Field(default=7, description="Number of days before team invitations expire")
647 require_email_verification_for_invites: bool = Field(default=True, description="Require email verification for team invitations")
649 # Team Governance
650 allow_team_creation: bool = Field(default=True, description="Allow users to create organizational teams. Admins can always create teams.")
651 allow_team_join_requests: bool = Field(default=True, description="Allow users to request to join public teams")
652 allow_team_invitations: bool = Field(default=True, description="Allow team owners to send invitations")
654 # Default Role Configuration
# Role names are plain strings: two global roles (admin / non-admin) and two
# team-scoped roles (owner / member).
# NOTE(review): whether these names must match existing role records is not
# visible here — verify against the role bootstrap code.
655 default_admin_role: str = Field(default="platform_admin", description="Global role assigned to admin users")
656 default_user_role: str = Field(default="platform_viewer", description="Global role assigned to non-admin users")
657 default_team_owner_role: str = Field(default="team_admin", description="Team-scoped role assigned to team owners (e.g. personal team creator)")
658 default_team_member_role: str = Field(default="viewer", description="Team-scoped role assigned to team members")
660 # UI/Admin Feature Flags
# Both the admin UI and the admin API are off by default.
661 mcpgateway_ui_enabled: bool = False
662 mcpgateway_admin_api_enabled: bool = False
663 mcpgateway_ui_airgapped: bool = Field(default=False, description="Use local CDN assets instead of external CDNs for airgapped deployments")
664 mcpgateway_ui_embedded: bool = Field(default=False, description="Enable embedded UI mode (hides select header controls by default)")
# The four hide-list fields below are annotated with NoDecode and default to
# empty lists.
# NOTE(review): the CSV/JSON parsing described in their help text is presumably
# done by validators defined elsewhere in the class — confirm.
665 mcpgateway_ui_hide_sections: Annotated[list[str], NoDecode] = Field(
666 default_factory=list,
667 description=(
668 "CSV/JSON list of UI sections to hide. "
669 "Valid values: overview, servers, gateways, tools, prompts, resources, roots, mcp-registry, "
670 "metrics, plugins, export-import, logs, version-info, maintenance, teams, users, agents, tokens, settings"
671 ),
672 )
673 mcpgateway_ui_hide_header_items: Annotated[list[str], NoDecode] = Field(
674 default_factory=list,
675 description="CSV/JSON list of header items to hide. Valid values: logout, team_selector, user_identity, theme_toggle",
676 )
677 mcpgateway_ui_hide_sections_admin: Annotated[list[str], NoDecode] = Field(
678 default_factory=list,
679 description=("CSV/JSON list of UI sections to hide for admin users. " "Same valid values as MCPGATEWAY_UI_HIDE_SECTIONS. " "When unset, admins see all sections."),
680 )
681 mcpgateway_ui_hide_header_items_admin: Annotated[list[str], NoDecode] = Field(
682 default_factory=list,
683 description="CSV/JSON list of header items to hide for admin users. Same valid values as MCPGATEWAY_UI_HIDE_HEADER_ITEMS.",
684 )
# Bulk import settings are plain defaults without Field metadata.
# NOTE(review): the unit/window of mcpgateway_bulk_import_rate_limit is not
# stated here — confirm against the import endpoint.
685 mcpgateway_bulk_import_enabled: bool = True
686 mcpgateway_bulk_import_max_tools: int = 200
687 mcpgateway_bulk_import_rate_limit: int = 10
689 # UI Tool Test Configuration
# This timeout is in milliseconds, unlike the second-based timeouts further down.
690 mcpgateway_ui_tool_test_timeout: int = Field(default=60000, description="Tool test timeout in milliseconds for the admin UI")
692 # Tool Execution Cancellation
693 mcpgateway_tool_cancellation_enabled: bool = Field(default=True, description="Enable gateway-authoritative tool execution cancellation with REST API endpoints")
695 # A2A (Agent-to-Agent) Feature Flags
# NOTE(review): mcpgateway_a2a_default_timeout carries no unit in this block
# (presumably seconds) — confirm.
696 mcpgateway_a2a_enabled: bool = True
697 mcpgateway_a2a_max_agents: int = 100
698 mcpgateway_a2a_default_timeout: int = 30
699 mcpgateway_a2a_max_retries: int = 3
700 mcpgateway_a2a_metrics_enabled: bool = True
702 # gRPC Support Configuration (EXPERIMENTAL - disabled by default)
# Reflection defaults to True, but the gRPC feature itself is off by default.
703 mcpgateway_grpc_enabled: bool = Field(default=False, description="Enable gRPC to MCP translation support (experimental feature)")
704 mcpgateway_grpc_reflection_enabled: bool = Field(default=True, description="Enable gRPC server reflection by default")
705 mcpgateway_grpc_max_message_size: int = Field(default=4194304, description="Maximum gRPC message size in bytes (4MB)")
706 mcpgateway_grpc_timeout: int = Field(default=30, description="Default gRPC call timeout in seconds")
707 mcpgateway_grpc_tls_enabled: bool = Field(default=False, description="Enable TLS for gRPC connections by default")
709 # Direct Proxy Configuration (disabled by default)
710 mcpgateway_direct_proxy_enabled: bool = Field(default=False, description="Enable direct_proxy gateway mode for pass-through MCP operations")
711 mcpgateway_direct_proxy_timeout: int = Field(default=30, description="Default timeout in seconds for direct proxy MCP operations")
713 # ===================================
714 # Performance Monitoring Configuration
715 # ===================================
# The numeric fields in this section declare ge/le bounds on their Field
# definitions. There are two separate retention settings: hours for snapshots
# and days for aggregates.
716 mcpgateway_performance_tracking: bool = Field(default=False, description="Enable performance tracking tab in admin UI")
717 mcpgateway_performance_collection_interval: int = Field(default=10, ge=1, le=300, description="Metric collection interval in seconds")
718 mcpgateway_performance_retention_hours: int = Field(default=24, ge=1, le=168, description="Snapshot retention period in hours")
719 mcpgateway_performance_retention_days: int = Field(default=90, ge=1, le=365, description="Aggregate retention period in days")
720 mcpgateway_performance_max_snapshots: int = Field(default=10000, ge=100, le=1000000, description="Maximum performance snapshots to retain")
721 mcpgateway_performance_distributed: bool = Field(default=False, description="Enable distributed mode metrics aggregation via Redis")
722 mcpgateway_performance_net_connections_enabled: bool = Field(default=True, description="Enable network connections counting (can be CPU intensive)")
723 mcpgateway_performance_net_connections_cache_ttl: int = Field(default=15, ge=1, le=300, description="Cache TTL for net_connections in seconds")
725 # MCP Server Catalog Configuration
# The catalog file default is a bare file name.
# NOTE(review): the directory it is resolved against is not visible here —
# confirm in the catalog loader.
726 mcpgateway_catalog_enabled: bool = Field(default=True, description="Enable MCP server catalog feature")
727 mcpgateway_catalog_file: str = Field(default="mcp-catalog.yml", description="Path to catalog configuration file")
728 mcpgateway_catalog_auto_health_check: bool = Field(default=True, description="Automatically health check catalog servers")
729 mcpgateway_catalog_cache_ttl: int = Field(default=3600, description="Catalog cache TTL in seconds")
730 mcpgateway_catalog_page_size: int = Field(default=100, description="Number of catalog servers per page")
732 # ContextForge Bootstrap Roles In DB Configuration
# Disabled by default; the file setting names a JSON file of additional roles.
733 mcpgateway_bootstrap_roles_in_db_enabled: bool = Field(default=False, description="Enable ContextForge add additional roles in db")
734 mcpgateway_bootstrap_roles_in_db_file: str = Field(default="additional_roles_in_db.json", description="Path to add additional roles in db")
736 # Elicitation support (MCP 2025-06-18)
737 mcpgateway_elicitation_enabled: bool = Field(default=True, description="Enable elicitation passthrough support (MCP 2025-06-18)")
738 mcpgateway_elicitation_timeout: int = Field(default=60, description="Default timeout for elicitation requests in seconds")
739 mcpgateway_elicitation_max_concurrent: int = Field(default=100, description="Maximum concurrent elicitation requests")
741 # Security
# skip_ssl_verify is read by validate_security_combinations and
# get_security_warnings in this class, both of which flag it when enabled.
742 skip_ssl_verify: bool = Field(
743 default=False,
744 description="Skip SSL certificate verification for ALL outbound HTTPS requests "
745 "(federation, MCP servers, LLM providers, A2A agents). "
746 "WARNING: Only enable in dev environments with self-signed certificates.",
747 )
748 cors_enabled: bool = True
750 # Environment
# Restricted to three literal values; defaults to "development".
751 environment: Literal["development", "staging", "production"] = Field(default="development")
753 # Domain configuration
754 app_domain: HttpUrl = Field(default=HttpUrl("http://localhost:4444"))
756 # Security settings
# NOTE(review): cookie_samesite is an unconstrained str (default "lax"); the
# accepted values are not enforced here — confirm where it is consumed.
757 secure_cookies: bool = Field(default=True)
758 cookie_samesite: str = Field(default="lax")
760 # CORS settings
761 cors_allow_credentials: bool = Field(default=True)
763 # Security Headers Configuration
# x_frame_options is post-processed by normalize_x_frame_options: blank values
# and the words 'null'/'none' (any case) become None.
764 security_headers_enabled: bool = Field(default=True)
765 x_frame_options: Optional[str] = Field(default="DENY")
767 @field_validator("x_frame_options")
768 @classmethod
def normalize_x_frame_options(cls, v: Optional[str]) -> Optional[str]:
    """Normalize the configured X-Frame-Options value.

    Surrounding whitespace is trimmed. A missing value, a blank string, or the
    words 'null'/'none' (in any letter case) all collapse to Python None, which
    disables iframe restrictions.

    Args:
        v: The raw X-Frame-Options setting, or None.

    Returns:
        The trimmed header value, or None when the setting is absent, blank,
        or spelled as 'null'/'none'.
    """
    trimmed = v.strip() if v is not None else ""
    # An empty result covers None input as well as whitespace-only strings.
    if not trimmed or trimmed.lower() in {"null", "none"}:
        return None
    return trimmed
# Individual security-header toggles; all default to enabled.
786 x_content_type_options_enabled: bool = Field(default=True)
787 x_xss_protection_enabled: bool = Field(default=True)
788 x_download_options_enabled: bool = Field(default=True)
789 hsts_enabled: bool = Field(default=True)
790 hsts_max_age: int = Field(default=31536000) # 1 year
791 hsts_include_subdomains: bool = Field(default=True)
792 remove_server_headers: bool = Field(default=True)
794 # Response Compression Configuration
# Each algorithm has its own level/quality field with ge/le bounds declared.
795 compression_enabled: bool = Field(default=True, description="Enable response compression (Brotli, Zstd, GZip)")
796 compression_minimum_size: int = Field(default=500, ge=0, description="Minimum response size in bytes to compress (0 = compress all)")
797 compression_gzip_level: int = Field(default=6, ge=1, le=9, description="GZip compression level (1=fastest, 9=best compression)")
798 compression_brotli_quality: int = Field(default=4, ge=0, le=11, description="Brotli compression quality (0-3=fast, 4-9=balanced, 10-11=max)")
799 compression_zstd_level: int = Field(default=3, ge=1, le=22, description="Zstd compression level (1-3=fast, 4-9=balanced, 10+=slow)")
801 # For allowed_origins, strip '' to ensure we're passing on valid JSON via env
802 # Tell pydantic *not* to touch this env var - our validator will.
# The raw value is parsed by _parse_allowed_origins (mode="before") and then
# checked by validate_cors_origins, both defined in this class.
803 allowed_origins: Annotated[Set[str], NoDecode] = {
804 "http://localhost",
805 "http://localhost:4444",
806 }
808 # Security validation thresholds
# NOTE(review): validate_secrets in this class compares against a hard-coded 32
# rather than reading min_secret_length, and validate_admin_password uses a
# hard-coded 8 rather than min_password_length — confirm where these two
# thresholds are actually consumed.
809 min_secret_length: int = 32
810 min_password_length: int = 12
811 require_strong_secrets: bool = False # Default to False for backward compatibility, will be enforced in 1.0.0
# Feature switches. LLM Chat is on by default; stdio transport, ToolOps and
# both plugin override switches are off by default.
813 llmchat_enabled: bool = Field(default=True, description="Enable LLM Chat feature")
814 mcpgateway_stdio_transport_enabled: bool = Field(
815 default=False,
816 description=("Enable stdio transport for MCP chat client configuration. Disabled by default; " "set true only in trusted environments that intentionally need stdio process execution."),
817 )
818 toolops_enabled: bool = Field(default=False, description="Enable ToolOps feature")
819 plugins_can_override_rbac: bool = Field(
820 default=False,
821 description=("Allow HTTP_AUTH_CHECK_PERMISSION plugins to short-circuit built-in RBAC grants. " "Disabled by default so plugin grant decisions are audit-only unless explicitly enabled."),
822 )
823 plugins_can_override_auth_headers: bool = Field(
824 default=False,
825 description=(
826 "DANGEROUS: Allow pre-request plugin hooks to override auth-sensitive headers "
827 "(authorization, cookie, x-api-key, proxy-authorization) that the client already sent. "
828 "Disabled by default because a malicious or misconfigured plugin could impersonate any "
829 "user by rewriting the Authorization header. Only enable when all loaded plugins are "
830 "fully trusted and the deployment requires token exchange (e.g. WXO auth). "
831 "Requires a server restart to take effect."
832 ),
833 )
835 # database-backed polling settings for session message delivery
# Polling starts at poll_interval and grows by backoff_factor up to
# max_interval while the session is idle (per the field descriptions).
836 poll_interval: float = Field(default=1.0, description="Initial polling interval in seconds for checking new session messages")
837 max_interval: float = Field(default=5.0, description="Maximum polling interval in seconds when the session is idle")
838 backoff_factor: float = Field(default=1.5, description="Multiplier used to gradually increase the polling interval during inactivity")
840 # redis configurations for Maintaining Chat Sessions in multi-worker environment
# With the defaults, lock acquisition polls up to 10 times at 0.2 s intervals.
841 llmchat_session_ttl: int = Field(default=300, description="Seconds for active_session key TTL")
842 llmchat_session_lock_ttl: int = Field(default=30, description="Seconds for lock expiry")
843 llmchat_session_lock_retries: int = Field(default=10, description="How many times to poll while waiting")
844 llmchat_session_lock_wait: float = Field(default=0.2, description="Seconds between polls")
845 llmchat_chat_history_ttl: int = Field(default=3600, description="Seconds for chat history expiry")
846 llmchat_chat_history_max_messages: int = Field(default=50, description="Maximum message history to store per user")
848 # LLM Settings (Internal API for LLM Chat)
849 llm_api_prefix: str = Field(default="/v1", description="API prefix for internal LLM endpoints")
850 llm_request_timeout: int = Field(default=120, description="Request timeout in seconds for LLM API calls")
851 llm_streaming_enabled: bool = Field(default=True, description="Enable streaming responses for LLM Chat")
852 llm_health_check_interval: int = Field(default=300, description="Provider health check interval in seconds")
854 @field_validator("allowed_roots", mode="before")
855 @classmethod
def parse_allowed_roots(cls, v):
    """Coerce the allowed_roots setting into a list of root paths.

    String input may be either a JSON array or a comma-separated list. Input
    of any other type is handed back untouched.

    Args:
        v: Raw value from the environment or from direct configuration.

    Returns:
        list: Parsed root paths for string input (empty for a blank string);
        otherwise the input value unchanged.
    """
    if not isinstance(v, str):
        return v

    text = v.strip()
    if not text:
        return []

    # A JSON array wins. Any other JSON value, or text that is not JSON at
    # all, is treated as a comma-separated list.
    try:
        decoded = orjson.loads(text)
    except orjson.JSONDecodeError:
        decoded = None
    if isinstance(decoded, list):
        return decoded

    return [part.strip() for part in text.split(",") if part.strip()]
882 @field_validator("jwt_secret_key", "auth_encryption_secret")
883 @classmethod
def validate_secrets(cls, v: Any, info: ValidationInfo) -> SecretStr:
    """
    Log security warnings for weak secret values and return the secret wrapped.

    Applied to the `jwt_secret_key` and `auth_encryption_secret` fields.
    Unless `client_mode` is set in the already-validated data, three checks
    run and each one logs a warning when it fails:

    1. The lower-cased value is one of the known default/weak secrets.

    2. The value is shorter than 32 characters.

    3. The value contains fewer than 10 distinct characters (basic entropy check).

    Notes:
        - Nothing is raised; problems are reported through logging only.
        - The value itself is never altered, only wrapped.

    Args:
        v: The secret value to validate.
        info: Pydantic validation info object, used for the field name and `client_mode`.

    Returns:
        SecretStr: The original object when it already is a SecretStr, otherwise a new SecretStr built from its string form.
    """
    # Work on the plain text regardless of how the value arrived
    secret_text = v.get_secret_value() if isinstance(v, SecretStr) else str(v)
    field_name = info.field_name

    if not info.data.get("client_mode"):
        known_weak = ("my-test-key", "my-test-key-but-now-longer-than-32-bytes", "my-test-salt", "changeme", "secret", "password")
        if secret_text.lower() in known_weak:
            logger.warning(f"🔓 SECURITY WARNING - {field_name}: Default/weak secret detected! Please set a strong, unique value for production.")

        secret_length = len(secret_text)
        if secret_length < 32:
            logger.warning(f"⚠️ SECURITY WARNING - {field_name}: Secret should be at least 32 characters long. Current length: {secret_length}")

        distinct_characters = set(secret_text)
        if len(distinct_characters) < 10:
            logger.warning(f"🔑 SECURITY WARNING - {field_name}: Secret has low entropy. Consider using a more random value.")

    # Hand back a SecretStr in every case so the value stays masked
    if isinstance(v, SecretStr):
        return v
    return SecretStr(secret_text)
935 @field_validator("basic_auth_password")
936 @classmethod
def validate_admin_password(cls, v: str | SecretStr, info: ValidationInfo) -> SecretStr:
    """Log security warnings about the Basic-auth admin password and wrap it.

    Unless `client_mode` is set in the already-validated data, a warning is
    logged when the password is the default "changeme", when it is shorter than
    8 characters, or when it uses fewer than 3 of the 4 character classes
    (uppercase, lowercase, digits, special characters). Nothing is raised.

    Args:
        v: The admin password value to validate.
        info: ValidationInfo containing field data.

    Returns:
        SecretStr: The validated admin password value, wrapped as SecretStr.
    """
    plain = v.get_secret_value() if isinstance(v, SecretStr) else v

    if not info.data.get("client_mode"):
        if plain == "changeme":  # nosec B105 - checking for default value
            logger.warning("🔓 SECURITY WARNING: Default BASIC_AUTH_PASSWORD detected! Please change it if you enable API_ALLOW_BASIC_AUTH.")

        # password_min_length is not available at this point of validation,
        # so the field's default (8) is mirrored here.
        required_length = 8
        actual_length = len(plain)
        if actual_length < required_length:
            logger.warning(f"⚠️ SECURITY WARNING: Admin password should be at least {required_length} characters long. Current length: {actual_length}")

        # One boolean per character class; True counts as 1 when summed
        character_classes = (
            any(ch.isupper() for ch in plain),
            any(ch.islower() for ch in plain),
            any(ch.isdigit() for ch in plain),
            re.search(r'[!@#$%^&*(),.?":{}|<>]', plain) is not None,
        )
        if sum(character_classes) < 3:
            logger.warning("🔐 SECURITY WARNING: Admin password has low complexity. Should contain at least 3 of: uppercase, lowercase, digits, special characters")

    # Hand back a SecretStr in every case so the value stays masked
    if isinstance(v, SecretStr):
        return v
    return SecretStr(plain)
976 @field_validator("allowed_origins")
977 @classmethod
def validate_cors_origins(cls, v: Any, info: ValidationInfo) -> set[str] | None:
    """Check the CORS allowed origins and return them as a set of strings.

    Unless `client_mode` is set in the already-validated data, a warning is
    logged for each wildcard-like origin ("*", "null", "") and for each other
    origin that does not start with http:// or https://.

    Args:
        v: The set (or list) of allowed origins to validate.
        info: ValidationInfo containing field data.

    Returns:
        set: The allowed origins as a set of strings, or None when the input is None.

    Raises:
        ValueError: If allowed_origins is not a set or list of strings.
    """
    if v is None:
        return v
    if not isinstance(v, (set, list)):
        raise ValueError("allowed_origins must be a set or list of strings")

    if not info.data.get("client_mode"):
        risky = ("*", "null", "")
        for origin in v:
            if origin in risky:
                logger.warning(f"🌐 SECURITY WARNING: Dangerous CORS origin '{origin}' detected. Consider specifying explicit origins instead of wildcards.")
            elif not origin.startswith(("http://", "https://")):
                # Only origins that are not already flagged above get the format warning
                logger.warning(f"⚠️ SECURITY WARNING: Invalid origin format '{origin}'. Origins should start with http:// or https://")

    return {str(origin) for origin in v}
1008 @field_validator("database_url")
1009 @classmethod
def validate_database_url(cls, v: str, info: ValidationInfo) -> str:
    """Log advisory messages about the database connection string.

    The URL is never modified or rejected. Unless `client_mode` is set in the
    already-validated data:

    - a SQLite URL produces an informational note recommending PostgreSQL;
    - any other URL has its password component extracted, and a warning is
      logged when that password contains a well-known weak fragment
      ("password", "123", "admin" or "test", compared case-insensitively).

    Only the credential portion of the URL is inspected, so the word
    "password" appearing in a host, database name or query string does not
    trigger the warning, and weak passwords are reported even when the URL
    does not contain the word "password".

    Args:
        v: The database URL to validate.
        info: ValidationInfo containing field data.

    Returns:
        str: The database URL, unchanged.
    """
    # Function-scope import keeps this validator self-contained.
    from urllib.parse import unquote, urlsplit  # pylint: disable=import-outside-toplevel

    if info.data.get("client_mode"):
        return v

    # Warn about SQLite in production
    if v.startswith("sqlite"):
        logger.info("Using SQLite database. Consider PostgreSQL for production.")
        return v

    try:
        password = urlsplit(v).password
    except ValueError:
        # A malformed URL is reported by whatever opens the connection; this check is best-effort only.
        password = None

    if password:
        # Credentials may be percent-encoded inside a URL
        candidate = unquote(password).lower()
        weak_fragments = ("password", "123", "admin", "test")
        if any(fragment in candidate for fragment in weak_fragments):
            logger.warning("Potentially weak database password detected. Consider using a stronger password.")

    return v
1032 @model_validator(mode="after")
def validate_security_combinations(self) -> Self:
    """Warn about risky combinations of security settings.

    Nothing is changed and nothing is raised; each risky combination only
    produces a log warning. All checks are skipped when client_mode is set.

    Returns:
        Itself.
    """
    if self.client_mode:
        return self

    # (combination is present, warning to log) in reporting order
    risky_combinations = (
        (
            not self.auth_required and self.mcpgateway_ui_enabled,
            "🔓 SECURITY WARNING: Admin UI is enabled without authentication. Consider setting AUTH_REQUIRED=true for production.",
        ),
        (
            self.skip_ssl_verify and not self.dev_mode,
            "🔓 SECURITY WARNING: SSL verification is disabled in non-dev mode. This is a security risk! Set SKIP_SSL_VERIFY=false for production.",
        ),
        (
            self.debug and not self.dev_mode,
            "🐛 SECURITY WARNING: Debug mode is enabled in non-dev mode. This may leak sensitive information! Set DEBUG=false for production.",
        ),
    )
    for present, message in risky_combinations:
        if present:
            logger.warning(message)

    return self
def get_security_warnings(self) -> List[str]:
    """Collect the security warnings that apply to the current configuration.

    Each rule pairs a condition on the settings with a fixed message. The
    messages of all rules whose condition holds are returned in rule order.

    Returns:
        List[str]: List of security warning messages (empty when no rule fires).
    """
    rules = (
        # Authentication
        (not self.auth_required, "🔓 Authentication is disabled - ensure this is intentional"),
        (self.basic_auth_user == "admin", "⚠️ Using default admin username - consider changing it"),
        # SSL/TLS
        (self.skip_ssl_verify, "🔓 SSL verification is disabled - not recommended for production"),
        # Debug/Dev
        (self.debug and not self.dev_mode, "🐛 Debug mode enabled - disable in production to prevent info leakage"),
        (self.dev_mode, "🔧 Development mode enabled - not for production use"),
        # CORS
        (self.cors_enabled and "*" in self.allowed_origins, "🌐 CORS allows all origins (*) - this is a security risk"),
        # Tokens: 10080 minutes is 7 days
        (self.token_expiry > 10080, "⏱️ JWT token expiry is very long - consider shorter duration"),
        # Database
        (self.database_url.startswith("sqlite") and not self.dev_mode, "💾 SQLite database in use - consider PostgreSQL for production"),
        # Rate limiting
        (self.tool_rate_limit > 1000, "🚦 Tool rate limit is very high - may allow abuse"),
    )
    return [message for applies, message in rules if applies]
1096 class SecurityStatus(TypedDict):
1097 """TypedDict describing the mapping returned by get_security_status."""
# True when jwt_secret_key is neither of the two known test keys.
1099 secure_secrets: bool
# Mirrors auth_required.
1100 auth_enabled: bool
# True when skip_ssl_verify is off.
1101 ssl_verification: bool
# True when debug is off.
1102 debug_disabled: bool
# True when CORS is disabled or "*" is not among allowed_origins.
1103 cors_restricted: bool
# True when the admin UI is disabled or authentication is required.
1104 ui_protected: bool
# Messages produced by get_security_warnings.
1105 warnings: List[str]
# 100 minus 10 per warning, never below 0.
1106 security_score: int
def get_security_status(self) -> SecurityStatus:
    """Get comprehensive security status.

    The warnings are collected once and used for both the reported list and
    the score, so the two can never disagree and the checks are not run twice.

    Returns:
        SecurityStatus: Dictionary containing security status information including score and warnings.
    """
    findings = self.get_security_warnings()

    # jwt_secret_key may be a SecretStr or a plain value; compare on the plain text
    jwt_secret = self.jwt_secret_key
    if isinstance(jwt_secret, SecretStr):
        jwt_secret = jwt_secret.get_secret_value()

    return {
        "secure_secrets": jwt_secret not in ("my-test-key", "my-test-key-but-now-longer-than-32-bytes"),  # nosec B105 - checking for default values
        "auth_enabled": self.auth_required,
        "ssl_verification": not self.skip_ssl_verify,
        "debug_disabled": not self.debug,
        "cors_restricted": "*" not in self.allowed_origins if self.cors_enabled else True,
        "ui_protected": not self.mcpgateway_ui_enabled or self.auth_required,
        "warnings": findings,
        # Security score: 100 minus 10 for each warning, floored at 0
        "security_score": max(0, 100 - 10 * len(findings)),
    }
1130 # Max retries for HTTP requests
# NOTE(review): retry_max_delay is typed int while retry_base_delay and
# retry_jitter_max are floats — confirm the asymmetry is intended.
1131 retry_max_attempts: int = 3
1132 retry_base_delay: float = 1.0 # seconds
1133 retry_max_delay: int = 60 # seconds
1134 retry_jitter_max: float = 0.5 # fraction of base delay
1136 # HTTPX Client Configuration (for shared singleton client)
1137 # See: https://www.python-httpx.org/advanced/#pool-limits
1138 # Formula: max_connections = expected_concurrent_outbound_requests × 1.5
# Every numeric field below declares ge/le bounds. The defaults keep the
# keepalive pool (100) at half of the total connection limit (200).
1139 httpx_max_connections: int = Field(
1140 default=200,
1141 ge=10,
1142 le=1000,
1143 description="Maximum total concurrent HTTP connections (global, not per-host). " "Increase for high-traffic deployments with many outbound calls.",
1144 )
1145 httpx_max_keepalive_connections: int = Field(
1146 default=100,
1147 ge=1,
1148 le=500,
1149 description="Maximum idle keepalive connections to retain (typically 50% of max_connections)",
1150 )
1151 httpx_keepalive_expiry: float = Field(
1152 default=30.0,
1153 ge=5.0,
1154 le=300.0,
1155 description="Seconds before idle keepalive connections are closed",
1156 )
# Four separate timeouts (connect / read / write / pool), all in seconds.
1157 httpx_connect_timeout: float = Field(
1158 default=5.0,
1159 ge=1.0,
1160 le=60.0,
1161 description="Timeout in seconds for establishing new connections (5s for LAN, increase for WAN)",
1162 )
1163 httpx_read_timeout: float = Field(
1164 default=120.0,
1165 ge=1.0,
1166 le=600.0,
1167 description="Timeout in seconds for reading response data (set high for slow MCP tool calls)",
1168 )
1169 httpx_write_timeout: float = Field(
1170 default=30.0,
1171 ge=1.0,
1172 le=600.0,
1173 description="Timeout in seconds for writing request data",
1174 )
1175 httpx_pool_timeout: float = Field(
1176 default=10.0,
1177 ge=1.0,
1178 le=120.0,
1179 description="Timeout in seconds waiting for a connection from the pool (fail fast on exhaustion)",
1180 )
1181 httpx_http2_enabled: bool = Field(
1182 default=False,
1183 description="Enable HTTP/2 (requires h2 package; enable only if upstreams support HTTP/2)",
1184 )
# Admin pages use their own, shorter read timeout (30 s by default versus 120 s).
1185 httpx_admin_read_timeout: float = Field(
1186 default=30.0,
1187 ge=1.0,
1188 le=120.0,
1189 description="Read timeout for admin UI operations (model fetching, health checks). " "Shorter than httpx_read_timeout to fail fast on admin pages.",
1190 )
1192 @field_validator("allowed_origins", mode="before")
1193 @classmethod
1194 def _parse_allowed_origins(cls, v: Any) -> Set[str]:
1195 """Parse allowed origins from environment variable or config value.
1197 Handles multiple input formats for the allowed_origins field:
1198 - JSON array string: '["http://localhost", "http://example.com"]'
1199 - Comma-separated string: "http://localhost, http://example.com"
1200 - Already parsed set/list
1202 Automatically strips whitespace and removes outer quotes if present.
1204 Args:
1205 v: The input value to parse. Can be a string (JSON or CSV), set, list, or other iterable.
1207 Returns:
1208 Set[str]: A set of allowed origin strings.
1210 Examples:
1211 >>> sorted(Settings._parse_allowed_origins('["https://a.com", "https://b.com"]'))
1212 ['https://a.com', 'https://b.com']
1213 >>> sorted(Settings._parse_allowed_origins("https://x.com , https://y.com"))
1214 ['https://x.com', 'https://y.com']
1215 >>> Settings._parse_allowed_origins('""')
1216 set()
1217 >>> Settings._parse_allowed_origins('"https://single.com"')
1218 {'https://single.com'}
1219 >>> sorted(Settings._parse_allowed_origins(['http://a.com', 'http://b.com']))
1220 ['http://a.com', 'http://b.com']
1221 >>> Settings._parse_allowed_origins({'http://existing.com'})
1222 {'http://existing.com'}
1223 """
1224 if isinstance(v, str):
1225 v = v.strip()
1226 if v[:1] in "\"'" and v[-1:] == v[:1]: # strip 1 outer quote pair
1227 v = v[1:-1]
1228 try:
1229 parsed = set(orjson.loads(v))
1230 except orjson.JSONDecodeError:
1231 parsed = {s.strip() for s in v.split(",") if s.strip()}
1232 return parsed
1233 return set(v)
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
# Logging level; restricted to the five standard level names.
log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = Field(default="ERROR")
log_requests: bool = Field(
    default=False,
    description="Enable request payload logging with sensitive data masking",
)
# Output format: "json" or "text".
log_format: Literal["json", "text"] = "json"
# File logging is opt-in; when disabled, output goes to stdout/stderr only.
log_to_file: bool = False
# Mode for the log file (append or overwrite).
log_filemode: str = "a+"
# The two settings below are only used when log_to_file=True.
log_file: Optional[str] = None
log_folder: Optional[str] = None

# Log rotation (optional; only used when log_to_file=True).
# Switch for log file rotation.
log_rotation_enabled: bool = False
# Maximum file size in MB before a rotation happens (default: 1MB).
log_max_size_mb: int = 1
# Number of backup files to keep (default: 5).
log_backup_count: int = 5

# Detailed request logging.
# Body-size cap in bytes: default 16KB, allowed range 1KB..1MB.
log_detailed_max_body_size: int = Field(
    default=16384,
    ge=1024,
    le=1048576,
    description="Maximum request body size to log in detailed mode (bytes). Separate from log_max_size_mb which is for file rotation.",
)

# Optional path prefixes for which detailed request logging is skipped.
log_detailed_skip_endpoints: List[str] = Field(default_factory=list, description="List of path prefixes to skip when log_detailed_requests is enabled")

# Controls the DB fallback used to resolve user identity while logging.
# Defaults to False so normal request handling performs no implicit DB queries.
log_resolve_user_identity: bool = Field(default=False, description="If true, RequestLoggingMiddleware will attempt DB fallback to resolve user identity when needed")

# Sampling rate (0.0-1.0) applied when log_detailed_requests is enabled.
log_detailed_sample_rate: float = Field(default=1.0, ge=0.0, le=1.0, description="Fraction of requests to sample for detailed logging (0.0-1.0)")

# In-memory log buffer (backs the log view in the admin UI); size in MB.
log_buffer_size_mb: float = 1.0
# ---------------------------------------------------------------------------
# Observability Configuration
# ---------------------------------------------------------------------------

# Master switch for observability features (traces, spans, metrics).
observability_enabled: bool = Field(
    default=False,
    description="Enable observability tracing and metrics collection",
)

# Automatic tracing of HTTP requests.
observability_trace_http_requests: bool = Field(
    default=True,
    description="Automatically trace HTTP requests",
)

# How long trace data is retained, in days.
observability_trace_retention_days: int = Field(
    default=7,
    ge=1,
    description="Number of days to retain trace data",
)

# Upper bound on stored traces (prevents unbounded growth).
observability_max_traces: int = Field(
    default=100000,
    ge=1000,
    description="Maximum number of traces to retain",
)

# Sampling rate between 0.0 and 1.0; 1.0 means every request is traced.
observability_sample_rate: float = Field(
    default=1.0,
    ge=0.0,
    le=1.0,
    description="Trace sampling rate (0.0-1.0)",
)

# Regex patterns selecting the paths that are traced.
observability_include_paths: List[str] = Field(
    default_factory=lambda: [
        r"^/rpc/?$",
        r"^/sse$",
        r"^/message$",
        r"^/mcp(?:/|$)",
        r"^/servers/[^/]+/mcp/?$",
        r"^/servers/[^/]+/sse$",
        r"^/servers/[^/]+/message$",
        r"^/a2a(?:/|$)",
    ],
    description="Regex patterns to include for tracing (when empty, all paths are eligible before excludes)",
)

# Regex patterns removing paths from tracing.
observability_exclude_paths: List[str] = Field(
    default_factory=lambda: [
        "/health",
        "/healthz",
        "/ready",
        "/metrics",
        "/static/.*",
    ],
    description="Regex patterns to exclude from tracing (applies after include patterns)",
)

# Performance metrics collection.
observability_metrics_enabled: bool = Field(
    default=True,
    description="Enable metrics collection",
)

# Event logging inside spans.
observability_events_enabled: bool = Field(
    default=True,
    description="Enable event logging within spans",
)

# Correlation ID settings.
correlation_id_enabled: bool = Field(
    default=True,
    description="Enable automatic correlation ID tracking for requests",
)
correlation_id_header: str = Field(
    default="X-Correlation-ID",
    description="HTTP header name for correlation ID",
)
correlation_id_preserve: bool = Field(
    default=True,
    description="Preserve correlation IDs from incoming requests",
)
correlation_id_response_header: bool = Field(
    default=True,
    description="Include correlation ID in response headers",
)
# ---------------------------------------------------------------------------
# Database Query Logging (N+1 Detection)
# ---------------------------------------------------------------------------
db_query_log_enabled: bool = Field(
    default=False,
    description="Enable database query logging to file (for N+1 detection)",
)
db_query_log_file: str = Field(
    default="logs/db-queries.log",
    description="Path to database query log file",
)
db_query_log_json_file: str = Field(
    default="logs/db-queries.jsonl",
    description="Path to JSON Lines query log file",
)
db_query_log_format: str = Field(
    default="both",
    description="Log format: 'json', 'text', or 'both'",
)
db_query_log_min_queries: int = Field(
    default=1,
    ge=1,
    description="Only log requests with >= N queries",
)
db_query_log_include_params: bool = Field(
    default=False,
    description="Include query parameters (may expose sensitive data)",
)
db_query_log_detect_n1: bool = Field(
    default=True,
    description="Automatically detect and flag N+1 query patterns",
)
db_query_log_n1_threshold: int = Field(
    default=3,
    ge=2,
    description="Number of similar queries to flag as potential N+1",
)

# Structured logging.
structured_logging_enabled: bool = Field(
    default=True,
    description="Enable structured JSON logging with database persistence",
)
structured_logging_database_enabled: bool = Field(
    default=False,
    description="Persist structured logs to database (enables /api/logs/* endpoints, impacts performance)",
)
structured_logging_external_enabled: bool = Field(
    default=False,
    description="Send logs to external systems",
)

# Performance tracking; the thresholds are expressed in milliseconds.
performance_tracking_enabled: bool = Field(
    default=True,
    description="Enable performance tracking and metrics",
)
performance_threshold_database_query_ms: float = Field(
    default=100.0,
    description="Alert threshold for database queries (ms)",
)
performance_threshold_tool_invocation_ms: float = Field(
    default=2000.0,
    description="Alert threshold for tool invocations (ms)",
)
performance_threshold_resource_read_ms: float = Field(
    default=1000.0,
    description="Alert threshold for resource reads (ms)",
)
performance_threshold_http_request_ms: float = Field(
    default=500.0,
    description="Alert threshold for HTTP requests (ms)",
)
performance_degradation_multiplier: float = Field(
    default=1.5,
    description="Alert if performance degrades by this multiplier vs baseline",
)

# Audit trail.
# Off by default for performance reasons. When on, all CRUD operations
# (create, read, update, delete) on resources are logged.
# WARNING: this causes a database write on every API request and can cause
# significant load.
audit_trail_enabled: bool = Field(
    default=False,
    description="Enable audit trail logging to database for compliance",
)
permission_audit_enabled: bool = Field(default=False, description="Enable permission audit logging for RBAC checks (writes a row per permission check)")

# Security logging.
# Off by default for performance reasons. When on, authentication attempts,
# authorization failures, and security events are logged.
# WARNING: the "all" level logs every request and can cause significant
# database write load.
security_logging_enabled: bool = Field(
    default=False,
    description="Enable security event logging to database",
)
security_logging_level: Literal["all", "failures_only", "high_severity"] = Field(
    default="failures_only",
    description=(
        "Security logging level: "
        "'all' = log all events including successful auth (high DB load), "
        "'failures_only' = log only authentication/authorization failures, "
        "'high_severity' = log only high/critical severity events"
    ),
)
security_failed_auth_threshold: int = Field(
    default=5,
    description="Failed auth attempts before high severity alert",
)
security_threat_score_alert: float = Field(
    default=0.7,
    description="Threat score threshold for alerts (0.0-1.0)",
)
security_rate_limit_window_minutes: int = Field(
    default=5,
    description="Time window for rate limit checks (minutes)",
)
# API token tracking: how token usage and last_used timestamps are recorded.
token_usage_logging_enabled: bool = Field(
    default=True,
    description="Enable API token usage logging middleware",
)
token_last_used_update_interval_minutes: int = Field(
    default=5,
    ge=1,
    le=1440,
    description="Minimum minutes between last_used timestamp updates (rate-limits DB writes)",
)

# Metrics aggregation.
metrics_aggregation_enabled: bool = Field(
    default=True,
    description="Enable automatic log aggregation into performance metrics",
)
metrics_aggregation_backfill_hours: int = Field(
    default=6,
    ge=0,
    le=168,
    description="Hours of structured logs to backfill into performance metrics on startup",
)
metrics_aggregation_window_minutes: int = Field(
    default=5,
    description="Time window for metrics aggregation (minutes)",
)
metrics_aggregation_auto_start: bool = Field(
    default=False,
    description="Automatically run the log aggregation loop on application startup",
)
yield_batch_size: int = Field(
    default=1000,
    ge=100,
    le=100000,
    description="Number of rows fetched per batch when streaming hourly metric data from the database. "
    "Used to limit memory usage during aggregation and percentile calculations. "
    "Smaller values reduce memory footprint but increase DB round-trips; larger values improve throughput "
    "at the cost of higher memory usage.",
)

# Execution metrics recording.
# Decides whether tool/resource/prompt/server/A2A execution metrics are
# written to the database. Disable when external observability
# (ELK, Datadog, Splunk) is used instead.
# Note: has no effect on log aggregation (METRICS_AGGREGATION_ENABLED) or on
# Prometheus (ENABLE_METRICS).
db_metrics_recording_enabled: bool = Field(
    default=True,
    description="Enable recording of execution metrics (tool/resource/prompt/server/A2A) to database. Disable if using external observability.",
)

# Metrics buffer: batches tool/resource/prompt metrics writes.
metrics_buffer_enabled: bool = Field(
    default=True,
    description="Enable buffered metrics writes (reduces DB pressure under load)",
)
metrics_buffer_flush_interval: int = Field(
    default=60,
    ge=5,
    le=300,
    description="Seconds between automatic metrics buffer flushes",
)
metrics_buffer_max_size: int = Field(
    default=1000,
    ge=100,
    le=10000,
    description="Maximum buffered metrics before forced flush",
)

# Metrics cache: caches aggregate metrics queries.
metrics_cache_enabled: bool = Field(
    default=True,
    description="Enable in-memory caching for aggregate metrics queries",
)
metrics_cache_ttl_seconds: int = Field(
    default=60,
    ge=1,
    le=300,
    description="TTL for cached aggregate metrics in seconds",
)

# Metrics cleanup: automatic deletion of old metrics.
metrics_cleanup_enabled: bool = Field(
    default=True,
    description="Enable automatic cleanup of old metrics data",
)
metrics_retention_days: int = Field(
    default=7,
    ge=1,
    le=365,
    description="Days to retain raw metrics before cleanup (fallback when rollup disabled)",
)
metrics_cleanup_interval_hours: int = Field(
    default=1,
    ge=1,
    le=168,
    description="Hours between automatic cleanup runs",
)
metrics_cleanup_batch_size: int = Field(
    default=10000,
    ge=100,
    le=100000,
    description="Batch size for metrics deletion (prevents long locks)",
)

# Metrics rollup: hourly aggregation for historical queries.
metrics_rollup_enabled: bool = Field(
    default=True,
    description="Enable hourly metrics rollup for efficient historical queries",
)
metrics_rollup_interval_hours: int = Field(
    default=1,
    ge=1,
    le=24,
    description="Hours between rollup runs",
)
metrics_rollup_retention_days: int = Field(
    default=365,
    ge=30,
    le=3650,
    description="Days to retain hourly rollup data",
)
metrics_rollup_late_data_hours: int = Field(
    default=1,
    ge=1,
    le=48,
    description="Hours to re-process on each run to catch late-arriving data (smaller = less CPU, larger = more tolerance for delayed metrics)",
)
metrics_delete_raw_after_rollup: bool = Field(
    default=True,
    description="Delete raw metrics after hourly rollup exists (recommended for production)",
)
metrics_delete_raw_after_rollup_hours: int = Field(
    default=1,
    ge=1,
    le=8760,
    description="Hours to retain raw metrics when hourly rollup exists",
)
# Auth cache: cuts down DB queries performed during authentication.
auth_cache_enabled: bool = Field(
    default=True,
    description="Enable Redis/in-memory caching for authentication data (user, team, revocation)",
)
auth_cache_user_ttl: int = Field(
    default=60,
    ge=10,
    le=300,
    description="TTL in seconds for cached user data",
)
auth_cache_revocation_ttl: int = Field(
    default=30,
    ge=5,
    le=120,
    description="TTL in seconds for token revocation cache (security-critical, keep short)",
)
auth_cache_team_ttl: int = Field(
    default=60,
    ge=10,
    le=300,
    description="TTL in seconds for team membership cache",
)
auth_cache_role_ttl: int = Field(
    default=60,
    ge=10,
    le=300,
    description="TTL in seconds for user role in team cache",
)
auth_cache_teams_enabled: bool = Field(
    default=True,
    description="Enable caching for get_user_teams() (default: true)",
)
auth_cache_teams_ttl: int = Field(
    default=60,
    ge=10,
    le=300,
    description="TTL in seconds for user teams list cache",
)
auth_cache_batch_queries: bool = Field(
    default=True,
    description="Batch auth DB queries into single call (reduces 3 queries to 1)",
)

# Registry cache: cuts down DB queries for list endpoints.
registry_cache_enabled: bool = Field(
    default=True,
    description="Enable caching for registry list endpoints (tools, prompts, resources, etc.)",
)
registry_cache_tools_ttl: int = Field(
    default=20,
    ge=5,
    le=300,
    description="TTL in seconds for tools list cache",
)
registry_cache_prompts_ttl: int = Field(
    default=15,
    ge=5,
    le=300,
    description="TTL in seconds for prompts list cache",
)
registry_cache_resources_ttl: int = Field(
    default=15,
    ge=5,
    le=300,
    description="TTL in seconds for resources list cache",
)
registry_cache_agents_ttl: int = Field(
    default=20,
    ge=5,
    le=300,
    description="TTL in seconds for agents list cache",
)
registry_cache_servers_ttl: int = Field(
    default=20,
    ge=5,
    le=300,
    description="TTL in seconds for servers list cache",
)
registry_cache_gateways_ttl: int = Field(
    default=20,
    ge=5,
    le=300,
    description="TTL in seconds for gateways list cache",
)
registry_cache_catalog_ttl: int = Field(
    default=300,
    ge=60,
    le=600,
    description="TTL in seconds for catalog servers list cache (external catalog, changes infrequently)",
)

# Tool lookup cache: cuts down hot-path DB lookups in invoke_tool.
tool_lookup_cache_enabled: bool = Field(
    default=True,
    description="Enable tool lookup cache (tool name -> tool config)",
)
tool_lookup_cache_ttl_seconds: int = Field(
    default=60,
    ge=5,
    le=600,
    description="TTL in seconds for tool lookup cache entries",
)
tool_lookup_cache_negative_ttl_seconds: int = Field(
    default=10,
    ge=1,
    le=60,
    description="TTL in seconds for negative tool lookup cache entries",
)
tool_lookup_cache_l1_maxsize: int = Field(
    default=10000,
    ge=100,
    le=1000000,
    description="Max entries for in-memory tool lookup cache (L1)",
)
tool_lookup_cache_l2_enabled: bool = Field(
    default=True,
    description="Enable Redis-backed tool lookup cache (L2) when cache_type=redis",
)

# Admin stats cache: cuts down dashboard query overhead.
admin_stats_cache_enabled: bool = Field(
    default=True,
    description="Enable caching for admin dashboard statistics",
)
admin_stats_cache_system_ttl: int = Field(
    default=60,
    ge=10,
    le=300,
    description="TTL in seconds for system stats cache",
)
admin_stats_cache_observability_ttl: int = Field(
    default=30,
    ge=10,
    le=120,
    description="TTL in seconds for observability stats cache",
)
admin_stats_cache_tags_ttl: int = Field(
    default=120,
    ge=30,
    le=600,
    description="TTL in seconds for tags listing cache",
)
admin_stats_cache_plugins_ttl: int = Field(
    default=120,
    ge=30,
    le=600,
    description="TTL in seconds for plugin stats cache",
)
admin_stats_cache_performance_ttl: int = Field(
    default=60,
    ge=15,
    le=300,
    description="TTL in seconds for performance aggregates cache",
)

# Team member count cache: cuts down N+1 queries in the admin UI.
team_member_count_cache_enabled: bool = Field(
    default=True,
    description="Enable Redis caching for team member counts",
)
team_member_count_cache_ttl: int = Field(
    default=300,
    ge=30,
    le=3600,
    description="TTL in seconds for team member count cache (default: 5 minutes)",
)

# Log search.
log_search_max_results: int = Field(
    default=1000,
    description="Maximum results per log search query",
)
log_retention_days: int = Field(
    default=30,
    description="Number of days to retain logs in database",
)

# External log integrations.
elasticsearch_enabled: bool = Field(
    default=False,
    description="Send logs to Elasticsearch",
)
elasticsearch_url: Optional[str] = Field(
    default=None,
    description="Elasticsearch cluster URL",
)
elasticsearch_index_prefix: str = Field(
    default="mcpgateway-logs",
    description="Elasticsearch index prefix",
)
syslog_enabled: bool = Field(
    default=False,
    description="Send logs to syslog",
)
syslog_host: Optional[str] = Field(
    default=None,
    description="Syslog server host",
)
syslog_port: int = Field(
    default=514,
    description="Syslog server port",
)
webhook_logging_enabled: bool = Field(
    default=False,
    description="Send logs to webhook endpoints",
)
webhook_logging_urls: List[str] = Field(
    default_factory=list,
    description="Webhook URLs for log delivery",
)
@field_validator("log_level", mode="before")
@classmethod
def validate_log_level(cls, v: str) -> str:
    """
    Normalize and validate the log level value.

    Ensures that the input string matches one of the allowed log levels,
    case-insensitively. Surrounding whitespace is ignored and the value is
    uppercased before validation so that "debug", " Debug ", etc. are all
    accepted as "DEBUG".

    Args:
        v (str): The log level string provided via configuration or environment.

    Returns:
        str: The validated and normalized (uppercase) log level.

    Raises:
        ValueError: If the provided value is not a string, or is not one of
            {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}.
    """
    # This validator runs in "before" mode, so the raw input has not been
    # type-checked yet. Reject non-strings explicitly with ValueError rather
    # than letting ``.upper()`` fail with an AttributeError.
    if not isinstance(v, str):
        raise ValueError(f"Invalid log_level: {v}")

    allowed = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}
    # Tolerate stray whitespace around the value (e.g. "INFO ").
    v_up = v.strip().upper()
    if v_up not in allowed:
        raise ValueError(f"Invalid log_level: {v}")
    return v_up
# ---------------------------------------------------------------------------
# Transport
# ---------------------------------------------------------------------------
mcpgateway_ws_relay_enabled: bool = Field(
    default=False,
    description="Enable WebSocket JSON-RPC relay endpoint at /ws",
)
mcpgateway_reverse_proxy_enabled: bool = Field(
    default=False,
    description="Enable reverse-proxy transport endpoints under /reverse-proxy/*",
)
# One of: http, ws, sse, all.
transport_type: str = "all"
# Seconds.
websocket_ping_interval: int = 30
# Milliseconds; the client retry interval on disconnect.
sse_retry_timeout: int = 5000
# Enables SSE keepalive events.
sse_keepalive_enabled: bool = True
# Seconds between keepalive events.
sse_keepalive_interval: int = 30
# Seconds; timeout for ASGI send() calls, protects against hung connections.
sse_send_timeout: float = 30.0
# Milliseconds; time window used for rapid yield detection.
sse_rapid_yield_window_ms: int = 1000
# Max yields per window before assuming the client disconnected (0=disabled).
sse_rapid_yield_max: int = 50

# Gateway/server connection timeout.
# Seconds allowed for HTTP requests to registered gateways and MCP servers.
# Used by GatewayService, ToolService and ServerService for health checks and
# tool invocations.
# Note: previously part of the federation settings; retained for gateway
# connectivity.
federation_timeout: int = 120

# SSO
# When sso_issuers is supplied through the environment, keep stray quotes out
# of the value so that it is valid JSON; raw input is parsed by the
# parse_issuers validator.
sso_issuers: Optional[list[HttpUrl]] = Field(default=None)
@field_validator("sso_issuers", mode="before")
@classmethod
def parse_issuers(cls, v: Any) -> list[str]:
    """
    Parse and validate the SSO issuers configuration value.

    Accepts:
    - JSON array string: '["https://idp1.com", "https://idp2.com"]'
    - Comma-separated string: "https://idp1.com, https://idp2.com"
    - Any of the string forms wrapped in one extra pair of matching
      quotes (as some env files / shells leave them in place)
    - Empty string or None → []
    - Already-parsed list

    Args:
        v: The input value to parse.

    Returns:
        list[str]: Parsed list of issuer URLs.

    Raises:
        ValueError: If the input is not a valid format.
    """
    if v is None:
        return []
    if isinstance(v, list):
        return v
    if not isinstance(v, str):
        raise ValueError("Invalid type for SSO_ISSUERS")

    s = v.strip()
    # Strip one outer pair of matching quotes so that a value such as
    # '["https://idp.com"]' (quotes included literally) is still recognised
    # as a JSON array instead of being treated as a single bogus URL.
    if len(s) >= 2 and s[0] in "\"'" and s[-1] == s[0]:
        s = s[1:-1].strip()
    if not s:
        return []

    if s.startswith("["):
        try:
            parsed = orjson.loads(s)
        except orjson.JSONDecodeError as exc:
            # Keep the decoder error as the cause for easier debugging.
            raise ValueError(f"Invalid JSON for SSO_ISSUERS: {v!r}") from exc
        return parsed if isinstance(parsed, list) else []

    # Fallback to comma-separated parsing; blank items are dropped.
    return [item.strip() for item in s.split(",") if item.strip()]
# ---------------------------------------------------------------------------
# Resources
# ---------------------------------------------------------------------------
resource_cache_size: int = 1000
# Seconds.
resource_cache_ttl: int = 3600
# 10MB.
max_resource_size: int = 10 * 1024 * 1024
allowed_mime_types: Set[str] = {
    "text/plain",
    "text/markdown",
    "text/html",
    "application/json",
    "application/xml",
    "image/png",
    "image/jpeg",
    "image/gif",
}

# ---------------------------------------------------------------------------
# Tools
# ---------------------------------------------------------------------------
# Seconds.
tool_timeout: int = 60
max_tool_retries: int = 3
# Requests per minute.
tool_rate_limit: int = 100
tool_concurrent_limit: int = 10
rest_response_text_max_length: int = Field(
    default=5000,
    ge=1000,
    le=100000,
    description="Maximum length of response text to return for non-JSON REST API responses. "
    "Longer responses are truncated to prevent exposing excessive sensitive data. "
    "Default: 5000 characters. Range: 1000-100000.",
)

# Content security - size limits.
content_max_resource_size: int = Field(
    default=102400,  # 100KB
    ge=1024,  # minimum 1KB
    le=10485760,  # maximum 10MB
    description="Maximum size in bytes for resource content (default: 100KB)",
)
content_max_prompt_size: int = Field(
    default=10240,  # 10KB
    ge=512,  # minimum 512 bytes
    le=1048576,  # maximum 1MB
    description="Maximum size in bytes for prompt templates (default: 10KB)",
)

# Content security - MIME type restrictions (US-2).
content_allowed_resource_mimetypes: List[str] = Field(
    default_factory=lambda: [
        "text/plain",
        "text/markdown",
        "text/html",
        "text/csv",
        "application/json",
        "application/xml",
        "application/yaml",
        "application/pdf",
        "image/png",
        "image/jpeg",
        "image/gif",
        "image/svg+xml",
        "image/webp",
        "audio/mpeg",
        "audio/wav",
        "video/mp4",
        "video/webm",
    ],
    description="Allowed MIME types for resources. In strict mode, only types explicitly listed here are accepted. Vendor types (application/x-*, text/x-*) and suffix types (+json, +xml) must be explicitly added if needed.",
)
content_strict_mime_validation: bool = Field(default=False, description="Enable strict MIME type validation for resources (US-2). Set to false to log violations without blocking.")
# ---------------------------------------------------------------------------
# MCP Session Pool - reduces per-request latency from ~20ms to ~1-2ms
# ---------------------------------------------------------------------------
# Off by default for safety; enable explicitly in production after testing.
mcp_session_pool_enabled: bool = False
# Max sessions per (URL, identity, transport).
mcp_session_pool_max_per_key: int = 10
# Session TTL in seconds.
mcp_session_pool_ttl: float = 300.0
# Idle time before a health check (aligned with health_check_interval).
mcp_session_pool_health_check_interval: float = 60.0
# Timeout while waiting for a session slot.
mcp_session_pool_acquire_timeout: float = 30.0
# Timeout while creating a new session.
mcp_session_pool_create_timeout: float = 30.0
# Failures before the circuit opens.
mcp_session_pool_circuit_breaker_threshold: int = 5
# Seconds before the circuit resets.
mcp_session_pool_circuit_breaker_reset: float = 60.0
# Idle pool keys are evicted after this time.
mcp_session_pool_idle_eviction: float = 600.0
# Transport timeout for pooled sessions (default 30s to match the MCP SDK
# default). Applies to all HTTP operations (connect, read, write) on pooled
# sessions. Use a higher value for deployments with long-running tool calls.
mcp_session_pool_transport_timeout: float = 30.0
# Force an explicit RPC (list_tools) on gateway health checks even when the
# session is fresh. Off by default: the pool's internal staleness check
# (idle > health_check_interval) handles this. Enable for stricter health
# verification at the cost of ~5ms latency per check.
mcp_session_pool_explicit_health_rpc: bool = False
# Configurable health check chain: an ordered list of methods to try.
# Options: ping, list_tools, list_prompts, list_resources, skip
# Default: ping,skip - try a lightweight ping, skip if unsupported (for
# legacy servers).
mcp_session_pool_health_check_methods: List[str] = ["ping", "skip"]
# Timeout in seconds for each health check attempt.
mcp_session_pool_health_check_timeout: float = 5.0
mcp_session_pool_identity_headers: List[str] = [
    "authorization",
    "x-tenant-id",
    "x-user-id",
    "x-api-key",
    "cookie",
    "x-mcp-session-id",
]
# Timeout for session/transport cleanup operations (__aexit__ calls).
# Prevents CPU spin loops when internal tasks (like post_writer waiting on
# memory streams) don't respond to cancellation. Does NOT affect tool
# execution time - only cleanup of idle/released sessions. Increase it if
# "cleanup timed out" warnings are frequent; decrease it for faster recovery
# from spin loops.
mcp_session_pool_cleanup_timeout: float = 5.0

# Timeout for SSE task group cleanup (seconds).
# When an SSE connection is cancelled, this bounds how long to wait for
# internal tasks to respond before forcing cleanup. Shorter values reduce CPU
# waste during anyio _deliver_cancellation spin loops but may interrupt
# legitimate cleanup. Only cancelled connections are affected, not normal
# operation.
# See: https://github.com/agronholm/anyio/issues/695
sse_task_group_cleanup_timeout: float = 5.0

# ---------------------------------------------------------------------------
# EXPERIMENTAL: anyio _deliver_cancellation spin loop workaround
# ---------------------------------------------------------------------------
# When enabled, anyio's CancelScope._deliver_cancellation is monkey-patched to
# limit the number of retry iterations. This prevents 100% CPU spin loops
# when tasks don't respond to CancelledError (anyio issue #695).
#
# WARNING: this is a workaround for an upstream issue and may be removed once
# anyio or the MCP SDK fix the underlying problem. Enable it only if CPU spin
# loops occur during SSE/MCP connection cleanup.
#
# Trade-offs when enabled:
# - Prevents indefinite CPU spin (good)
# - May leave some tasks uncancelled after max iterations (usually harmless)
# - Worker recycling (GUNICORN_MAX_REQUESTS) cleans up orphaned tasks
#
# See: https://github.com/agronholm/anyio/issues/695
# Env: ANYIO_CANCEL_DELIVERY_PATCH_ENABLED
anyio_cancel_delivery_patch_enabled: bool = False

# Maximum iterations for _deliver_cancellation before giving up.
# Only used when anyio_cancel_delivery_patch_enabled=True.
# Higher values = more attempts to cancel tasks, but longer potential spin.
# Lower values = faster recovery, but more orphaned tasks.
# Env: ANYIO_CANCEL_DELIVERY_MAX_ITERATIONS
anyio_cancel_delivery_max_iterations: int = 100

# ---------------------------------------------------------------------------
# Session Affinity
# ---------------------------------------------------------------------------
# Global session affinity toggle.
mcpgateway_session_affinity_enabled: bool = False
# Session affinity binding TTL.
mcpgateway_session_affinity_ttl: int = 300
# Max sessions per identity for affinity.
mcpgateway_session_affinity_max_sessions: int = 1
# Timeout for forwarding RPC requests to the owner worker.
mcpgateway_pool_rpc_forward_timeout: int = 30
# ---------------------------------------------------------------------------
# Prompts
# ---------------------------------------------------------------------------
prompt_cache_size: int = 100
# 100KB.
max_prompt_size: int = 100 * 1024
# Seconds.
prompt_render_timeout: int = 10

# ---------------------------------------------------------------------------
# Health Checks
# ---------------------------------------------------------------------------
# Seconds between health checks (aligned with
# mcp_session_pool_health_check_interval).
health_check_interval: int = 60
# Seconds allowed for each health check request.
health_check_timeout: int = 5
# Per-check timeout (seconds) bounding the total time of one gateway health
# check.
# Env: GATEWAY_HEALTH_CHECK_TIMEOUT
gateway_health_check_timeout: float = 5.0
# Consecutive failures before a gateway is marked offline.
unhealthy_threshold: int = 3
# Max concurrent health checks per worker.
max_concurrent_health_checks: int = 10

# Auto-refresh of tools/resources/prompts from gateways during health checks.
# When enabled, they are fetched and synced with the DB during health checks.
auto_refresh_servers: bool = Field(
    default=False,
    description="Enable automatic tool/resource/prompt refresh during gateway health checks",
)

# Per-gateway refresh configuration (used when auto_refresh_servers is True).
# A gateway can override this with its own refresh_interval_seconds.
gateway_auto_refresh_interval: int = Field(
    default=300,
    ge=60,
    description="Default refresh interval in seconds for gateway tools/resources/prompts sync (minimum 60 seconds)",
)

# Hot/cold server classification.
# Servers are classified by usage (hot = active sessions, cold = inactive)
# for optimized polling.
# Poll intervals are auto-derived: hot = gateway_auto_refresh_interval (1x),
# cold = 3x.
# Classification refresh uses gateway_auto_refresh_interval (no separate
# config needed).
hot_cold_classification_enabled: bool = Field(
    default=False,
    description="Enable hot/cold server classification for staggered polling (requires Redis for multi-worker)",
)

# Gateway URL validation.
# Seconds.
gateway_validation_timeout: int = 5
gateway_max_redirects: int = 5

filelock_name: str = "gateway_service_leader.lock"

# Default roots.
default_roots: List[str] = []

# ---------------------------------------------------------------------------
# Database
# ---------------------------------------------------------------------------
db_driver: str = "postgresql+psycopg"
db_pool_size: int = 200
db_max_overflow: int = 10
db_pool_timeout: int = 30
db_pool_recycle: int = 3600
# Max attempts, retried with exponential backoff.
db_max_retries: int = 30
# Base interval in ms; doubles each attempt, with ±25% jitter.
db_retry_interval_ms: int = 2000
# Cap for the exponential backoff (jitter is applied after the cap).
db_max_backoff_seconds: int = 30

# Database performance optimization.
use_postgresdb_percentiles: bool = Field(
    default=True,
    description="Use database-native percentile functions (percentile_cont) for performance metrics. "
    "When enabled, PostgreSQL uses native SQL percentile calculations (5-10x faster). "
    "When disabled or using SQLite, falls back to Python-based percentile calculations. "
    "Recommended: true for PostgreSQL, auto-detected for SQLite.",
)

# psycopg3-specific: how many times a query must be executed before it is
# prepared server-side. 0 disables preparation, 1 prepares immediately.
# The default of 5 balances memory usage with query performance.
db_prepare_threshold: int = Field(
    default=5,
    ge=0,
    le=100,
    description="psycopg3 prepare_threshold for auto-prepared statements",
)

# Connection pool class: "auto" (default), "null", or "queue".
# - "auto": NullPool when PgBouncer is detected, QueuePool otherwise
# - "null": always NullPool (recommended with PgBouncer - lets PgBouncer
#   handle pooling)
# - "queue": always QueuePool (application-side pooling)
db_pool_class: Literal["auto", "null", "queue"] = Field(default="auto", description="Connection pool class: auto (NullPool with PgBouncer), null, or queue")

# Pre-ping connections before checkout (validates the connection is alive).
# - "auto": enabled for non-PgBouncer, disabled for PgBouncer (default)
# - "true": always enabled (adds SELECT 1 overhead but catches stale
#   connections)
# - "false": always disabled
db_pool_pre_ping: Literal["auto", "true", "false"] = Field(default="auto", description="Pre-ping connections: auto, true, or false")

# SQLite busy timeout: the maximum time (ms) SQLite waits to acquire a
# database lock before returning SQLITE_BUSY.
db_sqlite_busy_timeout: int = Field(
    default=5000,
    ge=1000,
    le=60000,
    description="SQLite busy timeout in milliseconds (default: 5000ms)",
)
1800 # Cache
1801 cache_type: Literal["redis", "memory", "none", "database"] = "database" # one of: redis, memory, none, database (default: database)
1802 redis_url: Optional[str] = "redis://localhost:6379/0"
1803 cache_prefix: str = "mcpgw:"
1804 session_ttl: int = 3600
1805 message_ttl: int = 600
1806 redis_max_retries: int = 30 # Max attempts with exponential backoff (≈5 min total)
1807 redis_retry_interval_ms: int = 2000 # Base interval in milliseconds; doubles each attempt, ±25% jitter
1808 redis_max_backoff_seconds: int = 30 # Cap in seconds for exponential backoff (jitter applied after cap)
1810 # GlobalConfig In-Memory Cache (Issue #1715)
1811 # Caches GlobalConfig (passthrough headers) to eliminate redundant DB queries
# Accepted TTL range is 5..3600 seconds (ge=5, le=3600).
1812 global_config_cache_ttl: int = Field(
1813 default=60,
1814 ge=5,
1815 le=3600,
1816 description="TTL in seconds for GlobalConfig in-memory cache (default: 60)",
1817 )
1819 # A2A Stats In-Memory Cache
1820 # Caches A2A agent counts (total, active) to eliminate redundant COUNT queries
# Accepted TTL range is 5..3600 seconds (ge=5, le=3600).
1821 a2a_stats_cache_ttl: int = Field(
1822 default=30,
1823 ge=5,
1824 le=3600,
1825 description="TTL in seconds for A2A stats in-memory cache (default: 30)",
1826 )
1828 # Redis Parser Configuration (ADR-026)
1829 # hiredis C parser provides up to 83x faster response parsing for large responses
1830 redis_parser: Literal["auto", "hiredis", "python"] = Field(
1831 default="auto",
1832 description="Redis protocol parser: auto (use hiredis if available), hiredis (require hiredis), python (pure-Python)",
1833 )
1835 # Redis Connection Pool - Performance Optimized
1836 redis_decode_responses: bool = Field(default=True, description="Return strings instead of bytes")
1837 redis_max_connections: int = Field(default=50, description="Connection pool size per worker")
1838 redis_socket_timeout: float = Field(default=2.0, description="Socket read/write timeout in seconds")
1839 redis_socket_connect_timeout: float = Field(default=2.0, description="Connection timeout in seconds")
1840 redis_retry_on_timeout: bool = Field(default=True, description="Retry commands on timeout")
1841 redis_health_check_interval: int = Field(default=30, description="Seconds between connection health checks (0=disabled)")
1843 # Redis Leader Election - Multi-Node Deployments
# NOTE(review): the heartbeat interval (5s) is presumably meant to stay below the
# TTL (15s) so leadership is renewed before the key expires — no validator here
# enforces that relationship; confirm with the leader-election code.
1844 redis_leader_ttl: int = Field(default=15, description="Leader election TTL in seconds")
1845 redis_leader_key: str = Field(default="gateway_service_leader", description="Leader key name")
1846 redis_leader_heartbeat_interval: int = Field(default=5, description="Seconds between leader heartbeats")
1848 # Streamable HTTP transport
1849 use_stateful_sessions: bool = False # False (default): stateless sessions without event store
1850 json_response_enabled: bool = True # Enable JSON responses instead of SSE streams
1851 streamable_http_max_events_per_stream: int = 100 # Ring buffer capacity per stream
1852 streamable_http_event_ttl: int = 3600 # Event stream TTL in seconds (1 hour)
1854 # Development
1855 dev_mode: bool = False
1856 reload: bool = False
1857 debug: bool = False
1859 # Observability (OpenTelemetry)
# deployment_env can be supplied as either DEPLOYMENT_ENV or ENVIRONMENT (see validation_alias).
1860 deployment_env: str = Field(default="development", validation_alias=AliasChoices("DEPLOYMENT_ENV", "ENVIRONMENT"), description="Deployment environment label")
1861 otel_enable_observability: bool = Field(default=False, description="Enable OpenTelemetry observability")
1862 otel_traces_exporter: str = Field(default="otlp", description="Traces exporter: otlp, jaeger, zipkin, console, none")
1863 otel_exporter_otlp_endpoint: Optional[str] = Field(default=None, description="OTLP endpoint (e.g., http://localhost:4317)")
1864 otel_exporter_otlp_protocol: str = Field(default="grpc", description="OTLP protocol: grpc or http")
1865 otel_exporter_otlp_insecure: bool = Field(default=True, description="Use insecure connection for OTLP")
1866 otel_exporter_otlp_headers: Optional[str] = Field(default=None, description="OTLP headers (comma-separated key=value)")
# Tri-state flags: None means "decide automatically" as described in each field's description.
1867 otel_emit_langfuse_attributes: Optional[bool] = Field(
1868 default=None,
1869 description="Emit Langfuse-specific span attributes. Defaults to auto-enable when a Langfuse OTLP endpoint is configured.",
1870 )
1871 otel_capture_identity_attributes: Optional[bool] = Field(
1872 default=None,
1873 description="Capture user/team identity span attributes. Defaults to auto-enable when Langfuse-specific attributes are emitted.",
1874 )
1875 otel_copy_resource_attrs_to_spans: bool = Field(default=False, description="Copy selected OTEL resource attributes onto spans")
# Trace payload redaction and capture limits.
1876 otel_redact_fields: str = Field(
1877 default="password,secret,token,api_key,authorization,credential,auth_value,access_token,refresh_token,auth_token,client_secret,cookie,set-cookie,private_key,session_id,sessionid",
1878 description="Comma-separated trace payload field names to redact before export",
1879 )
1880 otel_max_trace_payload_size: int = Field(default=32768, ge=256, description="Maximum serialized trace payload size in characters")
1881 otel_capture_input_spans: str = Field(default="", description="Comma-separated span names allowed to capture input payloads")
1882 otel_capture_output_spans: str = Field(default="", description="Comma-separated span names allowed to capture output payloads")
# Langfuse credentials are SecretStr so they are masked when the settings are printed.
1883 langfuse_otel_endpoint: Optional[str] = Field(default=None, description="Langfuse OTLP/HTTP endpoint override")
1884 langfuse_public_key: Optional[SecretStr] = Field(default=None, description="Langfuse project public key for derived OTLP auth")
1885 langfuse_secret_key: Optional[SecretStr] = Field(default=None, description="Langfuse project secret key for derived OTLP auth")
1886 langfuse_otel_auth: Optional[SecretStr] = Field(default=None, description="Base64-encoded Langfuse OTLP basic auth override")
# Jaeger / Zipkin exporter endpoints.
1887 otel_exporter_jaeger_endpoint: Optional[str] = Field(default=None, description="Jaeger endpoint")
1888 otel_exporter_jaeger_user: Optional[str] = Field(default=None, description="Jaeger collector username")
1889 otel_exporter_jaeger_password: Optional[SecretStr] = Field(default=None, description="Jaeger collector password")
1890 otel_exporter_zipkin_endpoint: Optional[str] = Field(default=None, description="Zipkin endpoint")
1891 otel_service_name: str = Field(default="mcp-gateway", description="Service name for traces")
1892 otel_resource_attributes: Optional[str] = Field(default=None, description="Resource attributes (comma-separated key=value)")
# Batch span processor (bsp) tuning.
1893 otel_bsp_max_queue_size: int = Field(default=2048, description="Max queue size for batch span processor")
1894 otel_bsp_max_export_batch_size: int = Field(default=512, description="Max export batch size")
1895 otel_bsp_schedule_delay: int = Field(default=5000, description="Schedule delay in milliseconds")
1897 # ===================================
1898 # Well-Known URI Configuration
1899 # ===================================
1901 # Enable well-known URI endpoints
1902 well_known_enabled: bool = True
1904 # robots.txt content (default: disallow all crawling for private API)
1905 well_known_robots_txt: str = """User-agent: *
1906Disallow: /
1908# ContextForge is a private API gateway
1909# Public crawling is disabled by default"""
1911 # security.txt content (optional, user-defined)
1912 # Example: "Contact: security@example.com\nExpires: 2025-12-31T23:59:59Z\nPreferred-Languages: en"
1913 well_known_security_txt: str = ""
1915 # Enable security.txt only if content is provided.
# When this field is validated, _auto_enable_security_txt replaces the supplied
# value with "is well_known_security_txt non-blank?".
1916 well_known_security_txt_enabled: bool = False
1918 # Additional custom well-known files (JSON object encoded as a string)
1919 # Example: {"ai.txt": "This service uses AI for...", "dnt-policy.txt": "Do Not Track policy..."}
# Parsed on access by the custom_well_known_files property.
1920 well_known_custom_files: str = "{}"
1922 # Cache control for well-known files (seconds)
1923 well_known_cache_max_age: int = 3600 # 1 hour default
1925 # ===================================
1926 # Performance / Startup Tuning
1927 # ===================================
1929 slug_refresh_batch_size: int = Field(default=1000, description="Batch size for gateway/tool slug refresh at startup")
# Settings sources: values are also read from a UTF-8 ".env" file, names are
# matched case-insensitively, and unknown keys are ignored.
1930 model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", case_sensitive=False, extra="ignore")
# Separator used in gateway/tool names. must_be_allowed_sep checks it against
# valid_slug_separator_regexp, which accepts exactly '-', '--', '_' or '.'.
1932 gateway_tool_name_separator: str = "-"
1933 valid_slug_separator_regexp: ClassVar[str] = r"^(-{1,2}|[_.])$"
@field_validator("gateway_tool_name_separator")
@classmethod
def must_be_allowed_sep(cls, v: str) -> str:
    """Check the configured gateway/tool name separator against the allowed set.

    Args:
        v: Separator candidate taken from configuration.

    Returns:
        ``v`` when it fully matches ``valid_slug_separator_regexp``; otherwise
        ``'-'``, after logging a warning about the rejected value.

    Examples:
        >>> Settings.must_be_allowed_sep('-')
        '-'
        >>> Settings.must_be_allowed_sep('--')
        '--'
        >>> Settings.must_be_allowed_sep('_')
        '_'
        >>> Settings.must_be_allowed_sep('.')
        '.'
        >>> Settings.must_be_allowed_sep('invalid')
        '-'
    """
    # Happy path first: an accepted separator is returned untouched.
    if re.fullmatch(cls.valid_slug_separator_regexp, v):
        return v

    # Invalid values never raise; they degrade to the default separator.
    logger.warning(
        f"Invalid gateway_tool_name_separator '{v}'. Must be '-', '--', '_' or '.'. Defaulting to '-'.",
        stacklevel=2,
    )
    return "-"
@property
def custom_well_known_files(self) -> Dict[str, str]:
    """Parse ``well_known_custom_files`` into a filename -> content mapping.

    The setting holds a JSON document encoded as a string. Anything that is
    not a JSON object (invalid JSON, or valid JSON of another type such as a
    list or a number) is rejected with an error log and treated as "no custom
    files", so callers can always rely on receiving a dict.

    Returns:
        Dict[str, str]: Parsed custom well-known files mapping filename to
        content, or an empty dict when the setting is empty or unusable.
    """
    raw = self.well_known_custom_files
    if not raw:
        return {}

    try:
        parsed = orjson.loads(raw)
    except orjson.JSONDecodeError:
        logger.error(f"Invalid JSON in WELL_KNOWN_CUSTOM_FILES: {self.well_known_custom_files}")
        return {}

    # Valid JSON is not enough: "[]" or "1" would break callers that iterate
    # over the result as a mapping.
    if not isinstance(parsed, dict):
        logger.error(
            "WELL_KNOWN_CUSTOM_FILES must be a JSON object mapping filename to content, got %s",
            type(parsed).__name__,
        )
        return {}
    return parsed
@property
def hot_server_check_interval(self) -> float:
    """Polling interval for hot servers, in seconds.

    The value is not configured separately; it mirrors
    ``gateway_auto_refresh_interval`` so hot servers are polled at the same
    rate as the gateway tool refresh.

    Returns:
        float: ``gateway_auto_refresh_interval`` converted to a float.
    """
    refresh_seconds = self.gateway_auto_refresh_interval
    return float(refresh_seconds)
@property
def cold_server_check_interval(self) -> float:
    """Polling interval for cold servers, in seconds.

    Derived from ``gateway_auto_refresh_interval``: cold servers are polled
    three times less often than the gateway refresh to save resources.

    Examples:
        - gateway_auto_refresh_interval=300s → cold=900s (15 minutes)
        - gateway_auto_refresh_interval=60s → cold=180s (3 minutes)

    Returns:
        float: Three times ``gateway_auto_refresh_interval``, as a float.
    """
    tripled = self.gateway_auto_refresh_interval * 3
    return float(tripled)
@field_validator("well_known_security_txt_enabled", mode="after")
@classmethod
def _auto_enable_security_txt(cls, v: Any, info: ValidationInfo) -> bool:
    """Derive the security.txt flag from the configured content.

    Args:
        v: The incoming value of well_known_security_txt_enabled.
        info: ValidationInfo carrying the fields validated so far.

    Returns:
        bool: When ``well_known_security_txt`` is present in ``info.data``,
        True exactly if that content is non-blank (the incoming value is
        ignored in that case); otherwise the incoming value coerced to bool.
    """
    validated = info.data
    # Without the content field there is nothing to derive from, so keep the
    # supplied value.
    if not validated or "well_known_security_txt" not in validated:
        return bool(v)

    content = validated["well_known_security_txt"]
    return bool(content.strip())
@field_validator("experimental_rust_mcp_runtime_uds", mode="after")
@classmethod
def _validate_experimental_rust_mcp_runtime_uds(cls, value: Optional[str]) -> Optional[str]:
    """Check the optional UDS path configured for the Rust MCP runtime sidecar.

    Args:
        value: Candidate UDS path from configuration.

    Returns:
        The path with ``~`` expanded, as a string, or ``None`` when the
        setting is unset or empty.

    Raises:
        ValueError: If the expanded path is not absolute or its parent
            directory does not exist.
    """
    # Both None and the empty string mean "not configured".
    if value is None or value == "":
        return None

    uds_path = Path(value).expanduser()

    if not uds_path.is_absolute():
        raise ValueError("experimental_rust_mcp_runtime_uds must be an absolute path")

    # Only the parent directory has to exist; the socket file itself is not checked.
    parent_present = uds_path.parent.exists()
    if not parent_present:
        raise ValueError(f"experimental_rust_mcp_runtime_uds parent directory does not exist: {uds_path.parent}")

    return str(uds_path)
2045 # -------------------------------
2046 # Flexible list parsing for envs
2047 # -------------------------------
@field_validator(
    "sso_entra_admin_groups",
    "sso_trusted_domains",
    "sso_auto_admin_domains",
    "sso_github_admin_orgs",
    "sso_google_admin_domains",
    "insecure_queryparam_auth_allowed_hosts",
    "mcpgateway_ui_hide_sections",
    "mcpgateway_ui_hide_header_items",
    "mcpgateway_ui_hide_sections_admin",
    "mcpgateway_ui_hide_header_items_admin",
    "tool_description_forbidden_patterns",
    mode="before",
)
@classmethod
def _parse_list_from_env(cls, v: None | str | list[str]) -> list[str]:
    """Turn a raw environment/config value into a list for list-typed fields.

    Strings may be JSON arrays (e.g. '["a","b"]') or comma-separated values
    (e.g. 'a,b'). None and blank strings become an empty list; lists pass
    through unchanged.

    Args:
        v: The value to parse, can be None, list, or string.

    Returns:
        list: Parsed list of values.

    Raises:
        ValueError: If the value is neither None, a list, nor a string.
    """
    if v is None:
        return []
    if isinstance(v, list):
        return v
    if not isinstance(v, str):
        raise ValueError("Invalid type for list field")

    s = v.strip()
    if not s:
        return []

    if s.startswith("["):
        try:
            parsed = orjson.loads(s)
        except Exception:
            # Malformed JSON is not fatal: the raw text is split as CSV below.
            logger.warning("Invalid JSON list in env for list field; falling back to CSV parsing")
        else:
            return parsed if isinstance(parsed, list) else []

    # CSV parsing: trim every piece and drop the ones that end up empty.
    pieces = (chunk.strip() for chunk in s.split(","))
    return [chunk for chunk in pieces if chunk]
@field_validator("tool_description_forbidden_patterns", mode="after")
@classmethod
def _filter_empty_forbidden_patterns(cls, value: list[str]) -> list[str]:
    """Drop empty or whitespace-only patterns, which would match every description.

    Args:
        value: List of forbidden pattern strings.

    Returns:
        list[str]: The same patterns in the same order, minus empty/blank entries.
    """
    kept: list[str] = []
    for pattern in value:
        # Falsy entries are skipped before .strip() is ever called on them.
        if not pattern:
            continue
        if not pattern.strip():
            continue
        kept.append(pattern)
    return kept
@field_validator("mcpgateway_ui_hide_sections", "mcpgateway_ui_hide_sections_admin", mode="after")
@classmethod
def _validate_ui_hide_sections(cls, value: list[str]) -> list[str]:
    """Canonicalize the list of UI sections to hide.

    Each entry is stringified, trimmed and lower-cased, mapped through
    ``UI_HIDE_SECTION_ALIASES``, and kept only if it is a member of
    ``UI_HIDABLE_SECTIONS``. Unknown entries are logged and dropped;
    duplicates are collapsed while the first-seen order is preserved.

    Args:
        value: Candidate section identifiers from environment/config.

    Returns:
        list[str]: Normalized unique section identifiers.
    """
    # A dict doubles as an insertion-ordered set.
    ordered_unique: dict[str, None] = {}

    for item in value:
        candidate = str(item).strip().lower()
        if not candidate:
            continue
        candidate = UI_HIDE_SECTION_ALIASES.get(candidate, candidate)
        if candidate in UI_HIDABLE_SECTIONS:
            ordered_unique.setdefault(candidate, None)
        else:
            logger.warning("Ignoring invalid MCPGATEWAY_UI_HIDE_SECTIONS item: %s", item)

    return list(ordered_unique)
@field_validator("mcpgateway_ui_hide_header_items", "mcpgateway_ui_hide_header_items_admin", mode="after")
@classmethod
def _validate_ui_hide_header_items(cls, value: list[str]) -> list[str]:
    """Canonicalize the list of header items to hide.

    Each entry is stringified, trimmed and lower-cased, then kept only if it
    is a member of ``UI_HIDABLE_HEADER_ITEMS``. Unknown entries are logged
    and dropped; duplicates are collapsed while the first-seen order is
    preserved.

    Args:
        value: Candidate header identifiers from environment/config.

    Returns:
        list[str]: Normalized unique header identifiers.
    """
    # A dict doubles as an insertion-ordered set.
    ordered_unique: dict[str, None] = {}

    for item in value:
        candidate = str(item).strip().lower()
        if not candidate:
            continue
        if candidate in UI_HIDABLE_HEADER_ITEMS:
            ordered_unique.setdefault(candidate, None)
        else:
            logger.warning("Ignoring invalid MCPGATEWAY_UI_HIDE_HEADER_ITEMS item: %s", item)

    return list(ordered_unique)
@property
def api_key(self) -> str:
    """
    Build the API key from the basic-auth credentials.

    The password is stored as a secret and is unwrapped here, so the
    returned string contains it in clear text.

    Returns:
        str: API key string in the format "username:password".

    Examples:
        >>> from mcpgateway.config import Settings
        >>> settings = Settings(basic_auth_user="admin", basic_auth_password="secret")
        >>> settings.api_key
        'admin:secret'
        >>> settings = Settings(basic_auth_user="user123", basic_auth_password="pass456")
        >>> settings.api_key
        'user123:pass456'
    """
    username = self.basic_auth_user
    password = self.basic_auth_password.get_secret_value()
    return f"{username}:{password}"
@property
def supports_http(self) -> bool:
    """Report whether the HTTP transport is enabled.

    Returns:
        bool: True when ``transport_type`` is "http" or "all", False otherwise.

    Examples:
        >>> settings = Settings(transport_type="http")
        >>> settings.supports_http
        True
        >>> settings = Settings(transport_type="all")
        >>> settings.supports_http
        True
        >>> settings = Settings(transport_type="ws")
        >>> settings.supports_http
        False
    """
    enabling_values = ("http", "all")
    return self.transport_type in enabling_values
@property
def supports_websocket(self) -> bool:
    """Report whether the WebSocket transport is enabled.

    Returns:
        bool: True when ``transport_type`` is "ws" or "all", False otherwise.

    Examples:
        >>> settings = Settings(transport_type="ws")
        >>> settings.supports_websocket
        True
        >>> settings = Settings(transport_type="all")
        >>> settings.supports_websocket
        True
        >>> settings = Settings(transport_type="http")
        >>> settings.supports_websocket
        False
    """
    enabling_values = ("ws", "all")
    return self.transport_type in enabling_values
@property
def supports_sse(self) -> bool:
    """Report whether the SSE transport is enabled.

    Returns:
        bool: True when ``transport_type`` is "sse" or "all", False otherwise.

    Examples:
        >>> settings = Settings(transport_type="sse")
        >>> settings.supports_sse
        True
        >>> settings = Settings(transport_type="all")
        >>> settings.supports_sse
        True
        >>> settings = Settings(transport_type="http")
        >>> settings.supports_sse
        False
    """
    enabling_values = ("sse", "all")
    return self.transport_type in enabling_values
# Return shape of the database_settings property: four integer pool options
# plus a free-form connect_args mapping.
2243 class DatabaseSettings(TypedDict):
2244 """TypedDict for SQLAlchemy database settings."""
2246 pool_size: int
2247 max_overflow: int
2248 pool_timeout: int
2249 pool_recycle: int
2250 connect_args: dict[str, Any] # database_settings only ever stores {"check_same_thread": False} or {} here; consider a more specific type
@property
def database_settings(self) -> DatabaseSettings:
    """
    Assemble the SQLAlchemy database options from the ``db_*`` settings.

    ``connect_args`` carries ``{"check_same_thread": False}`` when
    ``database_url`` starts with "sqlite" and is empty for every other URL.

    Returns:
        DatabaseSettings: Dictionary containing SQLAlchemy database configuration options.

    Examples:
        >>> from mcpgateway.config import Settings
        >>> s = Settings(database_url='sqlite:///./test.db')
        >>> isinstance(s.database_settings, dict)
        True
    """
    uses_sqlite = self.database_url.startswith("sqlite")
    connect_args: dict[str, Any] = {"check_same_thread": False} if uses_sqlite else {}

    return {
        "pool_size": self.db_pool_size,
        "max_overflow": self.db_max_overflow,
        "pool_timeout": self.db_pool_timeout,
        "pool_recycle": self.db_pool_recycle,
        "connect_args": connect_args,
    }
# Return shape of the cors_settings property. Every key is NotRequired because
# cors_settings returns an empty dict when CORS is disabled.
2274 class CORSSettings(TypedDict):
2275 """TypedDict for CORS settings."""
2277 allow_origins: NotRequired[List[str]]
2278 allow_credentials: NotRequired[bool]
2279 allow_methods: NotRequired[List[str]]
2280 allow_headers: NotRequired[List[str]]
@property
def cors_settings(self) -> CORSSettings:
    """Assemble the CORS options.

    When ``cors_enabled`` is falsy the result is an empty dict. Otherwise it
    lists ``allowed_origins``, enables credentials, and allows every method
    and header.

    Returns:
        CORSSettings: Dictionary containing CORS configuration options.

    Examples:
        >>> s = Settings(cors_enabled=True, allowed_origins={'http://localhost'})
        >>> cors = s.cors_settings
        >>> cors['allow_origins']
        ['http://localhost']
        >>> cors['allow_credentials']
        True
        >>> s2 = Settings(cors_enabled=False)
        >>> s2.cors_settings
        {}
    """
    if not self.cors_enabled:
        return {}

    return {
        "allow_origins": list(self.allowed_origins),
        "allow_credentials": True,
        "allow_methods": ["*"],
        "allow_headers": ["*"],
    }
def validate_transport(self) -> None:
    """
    Validate transport configuration.

    Accepted values are "all", "http", "sse" and "streamablehttp". The error
    message lists them in a fixed, sorted order so it is stable across runs.

    Raises:
        ValueError: If the transport type is not one of the valid options.

    Examples:
        >>> from mcpgateway.config import Settings
        >>> s = Settings(transport_type='http')
        >>> s.validate_transport()  # no error
        >>> s2 = Settings(transport_type='invalid')
        >>> try:
        ...     s2.validate_transport()
        ... except ValueError as e:
        ...     print('error')
        error
    """
    # NOTE(review): supports_websocket checks for "ws", which is rejected
    # here — confirm whether "ws" should be accepted or that property retired.
    valid_types = ("all", "http", "sse", "streamablehttp")
    if self.transport_type not in valid_types:
        raise ValueError(f"Invalid transport type. Must be one of: {list(valid_types)}")
def validate_database(self) -> None:
    """Validate database configuration.

    For SQLite URLs, make sure the directory that will hold the database
    file exists, creating it (and any missing parents) when needed. URLs for
    other databases, in-memory SQLite URLs, and SQLite URLs without a file
    path are left alone.

    The file path is taken from whatever follows the first ``:///``, so
    driver-qualified URLs such as ``sqlite+aiosqlite:///./data/app.db`` are
    handled the same way as plain ``sqlite:///`` URLs.

    Examples:
        >>> from mcpgateway.config import Settings
        >>> s = Settings(database_url='sqlite:///./test.db')
        >>> s.validate_database()  # Should create the directory if it does not exist
    """
    if not self.database_url.startswith("sqlite"):
        return

    _, separator, raw_path = self.database_url.partition(":///")
    if not separator:
        # e.g. "sqlite://" (in-memory): there is no filesystem path to prepare.
        return

    # URL query parameters are not part of the file path.
    raw_path = raw_path.split("?", 1)[0]
    if not raw_path or raw_path == ":memory:":
        return

    # exist_ok avoids failing when another process creates the directory
    # between a check and the mkdir call.
    Path(raw_path).parent.mkdir(parents=True, exist_ok=True)
2348 # Validation patterns for safe display (configurable)
# Matches an opening tag prefix ("<tag" followed by a word boundary) or a
# complete "<tag>" / "</tag>" for the listed element names. The pattern itself
# carries no case-insensitive flag.
2349 validation_dangerous_html_pattern: str = (
2350 r"<(script|iframe|object|embed|link|meta|base|form|img|svg|video|audio|source|track|area|map|canvas|applet|frame|frameset|html|head|body|style)\b|</*(script|iframe|object|embed|link|meta|base|form|img|svg|video|audio|source|track|area|map|canvas|applet|frame|frameset|html|head|body|style)>"
2351 )
# Case-insensitive ((?i)) match for javascript:/vbscript: URLs, data: URLs that
# mention javascript/vbscript, inline "on<event>=" handlers, and "<script".
2353 validation_dangerous_js_pattern: str = r"(?i)(?:^|\s|[\"'`<>=])(javascript:|vbscript:|data:\s*[^,]*[;\s]*(javascript|vbscript)|\bon[a-z]+\s*=|<\s*script\b)"
2355 validation_allowed_url_schemes: List[str] = ["http://", "https://", "ws://", "wss://"]
2357 # Character validation patterns
2358 validation_name_pattern: str = r"^[a-zA-Z0-9_.\- ]+$" # Allow spaces for names (literal space, not \s to reject control chars)
2359 validation_identifier_pattern: str = r"^[a-zA-Z0-9_\-\.]+$" # No spaces for IDs
2360 validation_safe_uri_pattern: str = r"^[a-zA-Z0-9_\-.:/?=&%{}]+$"
2361 validation_unsafe_uri_pattern: str = r'[<>"\'\\]'
2362 validation_tool_name_pattern: str = r"^[a-zA-Z0-9_][a-zA-Z0-9._/-]*$" # MCP tool naming per SEP-986
2363 validation_tool_method_pattern: str = r"^[a-zA-Z][a-zA-Z0-9_\./-]*$"
2365 # MCP-compliant size limits (configurable via env)
2366 validation_max_name_length: int = 255
2367 validation_max_description_length: int = 8192 # 8KB
2368 validation_max_template_length: int = 65536 # 64KB
2369 validation_max_content_length: int = 1048576 # 1MB
# NOTE(review): the default below is computed with os.getenv() once, when the
# class body is executed at import time; a non-integer VALIDATION_MAX_JSON_DEPTH
# would therefore raise from int() during import rather than as a settings
# validation error.
2370 validation_max_json_depth: int = Field(
2371 default=int(os.getenv("VALIDATION_MAX_JSON_DEPTH", "30")),
2372 description=(
2373 "Maximum allowed JSON nesting depth for tool/resource schemas. "
2374 "Increased from 10 to 30 for compatibility with deeply nested schemas "
2375 "like Notion MCP (issue #1542). Override with VALIDATION_MAX_JSON_DEPTH "
2376 "environment variable. Minimum: 1, Maximum: 100"
2377 ),
2378 ge=1,
2379 le=100,
2380 )
2381 validation_max_url_length: int = 2048
2382 validation_max_rpc_param_size: int = 262144 # 256KB
2384 validation_max_method_length: int = 128
2386 # Allowed MIME types
2387 validation_allowed_mime_types: List[str] = [
2388 "text/plain",
2389 "text/html",
2390 "text/css",
2391 "text/markdown",
2392 "text/javascript",
2393 "application/json",
2394 "application/xml",
2395 "application/pdf",
2396 "image/png",
2397 "image/jpeg",
2398 "image/gif",
2399 "image/svg+xml",
2400 "application/octet-stream",
2401 ]
2403 # Rate limiting
2404 validation_max_requests_per_minute: int = 60
2406 # Header passthrough feature (disabled by default for security)
2407 enable_header_passthrough: bool = Field(default=False, description="Enable HTTP header passthrough feature (WARNING: Security implications - only enable if needed)")
2408 enable_overwrite_base_headers: bool = Field(default=False, description="Enable overwriting of base headers")
2410 # Passthrough headers configuration
# The empty default declared here is replaced in __init__, which parses
# DEFAULT_PASSTHROUGH_HEADERS or falls back to ["X-Tenant-Id", "X-Trace-Id"].
2411 default_passthrough_headers: List[str] = Field(default_factory=list)
2413 # Passthrough headers source priority
2414 # - "env": Environment variable always wins (ideal for Kubernetes/containerized deployments)
2415 # - "db": Database takes precedence if configured, with env as fallback (default)
2416 # - "merge": Union of both sources - env provides the base, configuration in the DB can add more headers
2417 passthrough_headers_source: Literal["env", "db", "merge"] = Field(
2418 default="db",
2419 description="Source priority for passthrough headers: env (environment always wins), db (database wins, default), merge (combine both)",
2420 )
2422 # ===================================
2423 # Pagination Configuration
2424 # ===================================
2426 # Default number of items per page for paginated endpoints
2427 pagination_default_page_size: int = Field(default=50, ge=1, le=1000, description="Default number of items per page")
2429 # Maximum allowed items per page (prevents abuse)
2430 pagination_max_page_size: int = Field(default=500, ge=1, le=10000, description="Maximum allowed items per page")
2432 # Minimum items per page
2433 pagination_min_page_size: int = Field(default=1, ge=1, description="Minimum items per page")
2435 # Threshold for switching from offset to cursor-based pagination
2436 pagination_cursor_threshold: int = Field(default=10000, ge=1, description="Threshold for cursor-based pagination")
2438 # Enable cursor-based pagination globally
2439 pagination_cursor_enabled: bool = Field(default=True, description="Enable cursor-based pagination")
2441 # Default sort field for paginated queries
2442 pagination_default_sort_field: str = Field(default="created_at", description="Default sort field")
2444 # Default sort order for paginated queries; only "asc" or "desc" pass the pattern
2445 pagination_default_sort_order: str = Field(default="desc", pattern="^(asc|desc)$", description="Default sort order")
2447 # Maximum offset allowed for offset-based pagination (prevents abuse)
2448 pagination_max_offset: int = Field(default=100000, ge=0, description="Maximum offset for pagination")
2450 # Cache pagination counts for performance (seconds)
2451 pagination_count_cache_ttl: int = Field(default=300, ge=0, description="Cache TTL for pagination counts")
2453 # Enable pagination links in API responses
2454 pagination_include_links: bool = Field(default=True, description="Include pagination links")
2456 # Base URL for pagination links (defaults to request URL)
2457 pagination_base_url: Optional[str] = Field(default=None, description="Base URL for pagination links")
2459 # Ed25519 keys for signing
# The *_public_key fields are filled in by the derive_public_keys model
# validator whenever the matching private key holds a PEM-encoded Ed25519 key.
2460 enable_ed25519_signing: bool = Field(default=False, description="Enable Ed25519 signing for certificates")
2461 prev_ed25519_private_key: SecretStr = Field(default=SecretStr(""), description="Previous Ed25519 private key for signing")
2462 prev_ed25519_public_key: Optional[str] = Field(default=None, description="Derived previous Ed25519 public key")
2463 ed25519_private_key: SecretStr = Field(default=SecretStr(""), description="Ed25519 private key for signing")
2464 ed25519_public_key: Optional[str] = Field(default=None, description="Derived Ed25519 public key")
@model_validator(mode="after")
def derive_public_keys(self) -> "Settings":
    """
    Derive the Ed25519 public keys once all field validation is complete.

    For ``ed25519_private_key`` and ``prev_ed25519_private_key``, a non-empty
    PEM value is loaded and the matching ``*_public_key`` field is set to
    the PEM-encoded (SubjectPublicKeyInfo) public key. Any public key value
    already present is overwritten. Keys that cannot be loaded, or that are
    not Ed25519 keys, are skipped with a warning naming the field; this
    validator never raises for them.

    Returns:
        Settings: The updated Settings instance with derived public keys.
    """
    key_fields = (
        ("ed25519_private_key", "ed25519_public_key"),
        ("prev_ed25519_private_key", "prev_ed25519_public_key"),
    )

    for private_key_field, public_key_field in key_fields:
        private_key_secret: SecretStr = getattr(self, private_key_field)

        # Nothing to derive when no private key is configured.
        pem = private_key_secret.get_secret_value().strip()
        if not pem:
            continue

        try:
            private_key = serialization.load_pem_private_key(pem.encode(), password=None)
            if not isinstance(private_key, ed25519.Ed25519PrivateKey):
                # Only the field name is logged, never key material.
                logger.warning("%s is not an Ed25519 private key; skipping public key derivation", private_key_field)
                continue

            public_pem = (
                private_key.public_key()
                .public_bytes(
                    encoding=serialization.Encoding.PEM,
                    format=serialization.PublicFormat.SubjectPublicKeyInfo,
                )
                .decode()
            )
            setattr(self, public_key_field, public_pem)
        except Exception:
            # Deliberately best-effort: a bad key must not abort settings
            # creation, but the log now says which field was affected.
            logger.warning("Failed to derive public key for %s", private_key_field)

    return self
def __init__(self, **kwargs: Any) -> None:
    """Initialize Settings and apply post-validation adjustments.

    After the regular pydantic initialization this method:

    - resolves ``default_passthrough_headers`` (an explicit keyword argument
      wins; otherwise DEFAULT_PASSTHROUGH_HEADERS is parsed as a JSON array
      or a comma-separated string, falling back to safe defaults),
    - fills in environment-aware CORS origins when none were configured,
    - derives ``mcp_require_auth`` from ``auth_required`` when unset,
    - logs warnings for risky proxy-auth / unauthenticated-admin setups and
      disables ``trust_proxy_auth`` when it was not explicitly acknowledged.

    Args:
        **kwargs: Keyword arguments passed to parent Settings class

    Raises:
        ValueError: When validation of the supplied or environment-derived
            values fails in the parent initializer.

    Examples:
        >>> import os
        >>> # Test with no environment variable set
        >>> old_val = os.environ.get('DEFAULT_PASSTHROUGH_HEADERS')
        >>> if 'DEFAULT_PASSTHROUGH_HEADERS' in os.environ:
        ...     del os.environ['DEFAULT_PASSTHROUGH_HEADERS']
        >>> s = Settings()
        >>> s.default_passthrough_headers
        ['X-Tenant-Id', 'X-Trace-Id']
        >>> # Restore original value if it existed
        >>> if old_val is not None:
        ...     os.environ['DEFAULT_PASSTHROUGH_HEADERS'] = old_val
    """
    super().__init__(**kwargs)

    # An explicitly passed default_passthrough_headers is kept as-is, the same
    # way an explicit allowed_origins is respected further down. Only when it
    # was not passed do the environment variable / built-in defaults apply.
    if "default_passthrough_headers" not in kwargs:
        default_value = os.environ.get("DEFAULT_PASSTHROUGH_HEADERS")
        if default_value:
            try:
                # Try JSON parsing first
                parsed_headers = orjson.loads(default_value)
                if not isinstance(parsed_headers, list):
                    raise ValueError("Must be a JSON array")
                self.default_passthrough_headers = parsed_headers
            except (orjson.JSONDecodeError, ValueError):
                # Fallback to comma-separated parsing
                self.default_passthrough_headers = [h.strip() for h in default_value.split(",") if h.strip()]
                logger.info(f"Parsed comma-separated passthrough headers: {self.default_passthrough_headers}")
        else:
            # Safer defaults without Authorization header
            self.default_passthrough_headers = ["X-Tenant-Id", "X-Trace-Id"]

    # Configure environment-aware CORS origins if not explicitly set via env or kwargs
    # Only apply defaults if using the default allowed_origins value
    if not os.environ.get("ALLOWED_ORIGINS") and "allowed_origins" not in kwargs and self.allowed_origins == {"http://localhost", "http://localhost:4444"}:
        if self.environment == "development":
            self.allowed_origins = {
                "http://localhost",
                "http://localhost:3000",
                "http://localhost:8080",
                "http://127.0.0.1:3000",
                "http://127.0.0.1:8080",
                f"http://localhost:{self.port}",
                f"http://127.0.0.1:{self.port}",
            }
        else:
            # Production origins - construct from app_domain (extract hostname from HttpUrl)
            app_domain_host = urlparse(str(self.app_domain)).hostname or "localhost"
            self.allowed_origins = {f"https://{app_domain_host}", f"https://app.{app_domain_host}", f"https://admin.{app_domain_host}"}

    # MCP transport auth policy:
    # - If MCP_REQUIRE_AUTH is unset, derive it from AUTH_REQUIRED
    # - If AUTH_REQUIRED=true but MCP_REQUIRE_AUTH=false is explicit, emit a warning
    if self.mcp_require_auth is None:
        self.mcp_require_auth = bool(self.auth_required)
        logger.info(
            "MCP_REQUIRE_AUTH not set; defaulting to %s to match AUTH_REQUIRED=%s.",
            self.mcp_require_auth,
            self.auth_required,
        )
    elif self.auth_required and self.mcp_require_auth is False:
        logger.warning("AUTH_REQUIRED=true but MCP_REQUIRE_AUTH=false. MCP endpoints (/servers/*/mcp) allow unauthenticated access to public items.")

    # Validate proxy auth configuration (only relevant when MCP client auth is off)
    if not self.mcp_client_auth_enabled:
        if not self.trust_proxy_auth:
            logger.warning(
                "MCP client authentication is disabled but trust_proxy_auth is not set. "
                "This is a security risk! Set TRUST_PROXY_AUTH=true only if ContextForge "
                "is behind a trusted authentication proxy."
            )
        elif self.trust_proxy_auth_dangerously:
            logger.warning("TRUST_PROXY_AUTH_DANGEROUSLY=true acknowledged. Requests may trust identity headers from the upstream proxy.")
        else:
            # Proxy trust was requested without the explicit acknowledgement flag: refuse it.
            logger.warning(
                "TRUST_PROXY_AUTH=true ignored because TRUST_PROXY_AUTH_DANGEROUSLY is false "
                "while MCP_CLIENT_AUTH_ENABLED=false. Set TRUST_PROXY_AUTH_DANGEROUSLY=true "
                "only behind a strictly trusted authentication proxy."
            )
            self.trust_proxy_auth = False

    if not self.auth_required and self.allow_unauthenticated_admin:
        logger.warning("ALLOW_UNAUTHENTICATED_ADMIN=true acknowledged while AUTH_REQUIRED=false. Unauthenticated requests may receive admin context.")
# Placeholder string used as the masking value for sensitive data.
# NOTE(review): the call sites that substitute this value are outside this chunk — confirm usage there.
masked_auth_value: str = "*****"
def log_summary(self) -> None:
    """
    Write a snapshot of the current settings to the log at INFO level.

    The settings are serialized with ``model_dump``. The ``database_url``
    and ``memcached_url`` fields are excluded from the snapshot; every
    other field is included as ``model_dump`` returns it.

    NOTE(review): only the two URL fields are stripped here. Whether other
    credential-bearing fields are masked depends on their declared types,
    which are outside this method - confirm before relying on this for
    secret-free logs.
    """
    excluded_fields = {"database_url", "memcached_url"}
    settings_snapshot = self.model_dump(exclude=excluded_fields)
    logger.info(f"Application settings summary: {settings_snapshot}")
# Prometheus metrics settings. Each field's ``description`` documents its meaning.
ENABLE_METRICS: bool = Field(
    default=False,
    description="Enable Prometheus metrics endpoint at /metrics/prometheus (requires authentication)",
)
METRICS_EXCLUDED_HANDLERS: str = Field(
    default="",
    description="Comma-separated regex patterns for paths to exclude from metrics",
)
METRICS_NAMESPACE: str = Field(
    default="default",
    description="Prometheus metrics namespace",
)
METRICS_SUBSYSTEM: str = Field(
    default="",
    description="Prometheus metrics subsystem",
)
METRICS_CUSTOM_LABELS: str = Field(
    default="",
    description='Comma-separated "key=value" pairs for static custom labels',
)
@lru_cache(maxsize=128)  # 128 is the lru_cache default bound, spelled out explicitly
def get_settings(**kwargs: Any) -> Settings:
    """Build the settings object once and serve it from the cache afterwards.

    Args:
        **kwargs: Keyword arguments forwarded to the ``Settings`` constructor.
            They form the cache key, so repeated calls with the same
            arguments return the same object.

    Returns:
        Settings: The cached ``Settings`` instance.

    Examples:
        >>> settings = get_settings()
        >>> isinstance(settings, Settings)
        True
        >>> # Second call returns the same cached instance
        >>> settings2 = get_settings()
        >>> settings is settings2
        True
    """
    # Construct the Pydantic settings object from the supplied arguments.
    # On a cache hit this body is not executed again.
    instance = Settings(**kwargs)

    # Check the transport configuration first; per the original notes this
    # raises when transport_type is mis-configured.
    instance.validate_transport()

    # Then run the database validation step (described in the original as
    # ensuring sqlite DB directories exist when needed).
    instance.validate_database()

    # This object is what lru_cache stores and hands back on later calls.
    return instance
def generate_settings_schema() -> dict[str, Any]:
    """
    Build the JSON Schema for the Settings model.

    The schema is produced by ``Settings.model_json_schema`` in
    ``"validation"`` mode and can be used for validation or documentation.

    Returns:
        dict: The JSON Schema of the Settings model as a dictionary.
    """
    schema_mode = "validation"
    settings_schema = Settings.model_json_schema(mode=schema_mode)
    return settings_schema
# Lazy stand-in for the settings instance
class LazySettingsWrapper:
    """Proxy that resolves the settings singleton only when an attribute is read."""

    @property
    def plugins(self) -> Any:
        """Expose the plugin framework settings as ``settings.plugins``.

        The plugin framework's ``settings`` object is imported inside this
        property instead of at module import time, and returned unchanged.
        According to the original documentation it is a
        ``LazySettingsWrapper`` from the plugin framework with ``@property``
        accessors for startup-critical fields and a ``__getattr__`` fallback
        to the full ``PluginsSettings``.

        Returns:
            The plugin framework settings wrapper.
        """
        # First-Party
        from mcpgateway.plugins.framework.settings import settings as plugin_framework_settings  # pylint: disable=import-outside-toplevel

        return plugin_framework_settings

    def __getattr__(self, key: str) -> Any:
        """Resolve an attribute on the real settings object.

        Python calls this only when normal attribute lookup on the wrapper
        fails, so ``plugins`` is served by the property above and everything
        else is forwarded to the object returned by ``get_settings()``.

        Args:
            key: The name of the attribute to fetch from settings

        Returns:
            Any: The value of the attribute on the settings
        """
        real_settings = get_settings()
        return getattr(real_settings, key)
# Module-level settings proxy; the real Settings object is built on first attribute access.
settings = LazySettingsWrapper()


if __name__ == "__main__":
    # With --schema: print the Settings JSON Schema (2-space indented) and exit 0.
    # Without it: log the settings summary.
    schema_requested = "--schema" in sys.argv
    if not schema_requested:
        settings.log_summary()
    else:
        schema_bytes = orjson.dumps(generate_settings_schema(), option=orjson.OPT_INDENT_2)
        print(schema_bytes.decode())
        sys.exit(0)