Coverage for mcpgateway / config.py: 99%
940 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/config.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti, Manav Gupta
7ContextForge AI Gateway Configuration.
8This module defines configuration settings for ContextForge AI Gateway using Pydantic.
9It loads configuration from environment variables with sensible defaults.
11Environment variables:
12- APP_NAME: Gateway name (default: "ContextForge")
13- HOST: Host to bind to (default: "127.0.0.1")
14- PORT: Port to listen on (default: 4444)
15- DATABASE_URL: Database connection URL; SQLite, PostgreSQL, and MySQL/MariaDB are supported (default: "sqlite:///./mcp.db")
16- BASIC_AUTH_USER: Username for API Basic auth when enabled (default: "admin")
17- BASIC_AUTH_PASSWORD: Password for API Basic auth when enabled (default: "changeme")
18- LOG_LEVEL: Logging level (default: "INFO")
19- SKIP_SSL_VERIFY: Disable SSL verification (default: False)
20- AUTH_REQUIRED: Require authentication (default: True)
21- TRANSPORT_TYPE: Transport mechanisms (default: "all")
22- DOCS_ALLOW_BASIC_AUTH: Allow basic auth for docs (default: False)
23- RESOURCE_CACHE_SIZE: Max cached resources (default: 1000)
24- RESOURCE_CACHE_TTL: Cache TTL in seconds (default: 3600)
25- TOOL_TIMEOUT: Tool invocation timeout (default: 60)
26- PROMPT_CACHE_SIZE: Max cached prompts (default: 100)
27- HEALTH_CHECK_INTERVAL: Gateway health check interval (default: 300)
28- REQUIRE_TOKEN_EXPIRATION: Require JWT tokens to have expiration (default: True)
29- REQUIRE_JTI: Require JTI claim in tokens for revocation (default: True)
30- REQUIRE_USER_IN_DB: Require all users to exist in database (default: False)
32Examples:
33 >>> from mcpgateway.config import Settings
34 >>> s = Settings(basic_auth_user='admin', basic_auth_password='secret')
35 >>> s.api_key
36 'admin:secret'
37 >>> s2 = Settings(transport_type='http')
38 >>> s2.validate_transport() # no error
39 >>> s3 = Settings(transport_type='invalid')
40 >>> try:
41 ... s3.validate_transport()
42 ... except ValueError as e:
43 ... print('error')
44 error
45 >>> s4 = Settings(database_url='sqlite:///./test.db')
46 >>> isinstance(s4.database_settings, dict)
47 True
48"""
50# Standard
51from functools import lru_cache
52from importlib.resources import files
53import logging
54import os
55from pathlib import Path
56import re
57import sys
58from typing import Annotated, Any, ClassVar, Dict, List, Literal, NotRequired, Optional, Self, Set, TypedDict
59from urllib.parse import urlparse
61# Third-Party
62from cryptography.hazmat.primitives import serialization
63from cryptography.hazmat.primitives.asymmetric import ed25519
64import orjson
65from pydantic import Field, field_validator, HttpUrl, model_validator, PositiveInt, SecretStr, ValidationInfo
66from pydantic_settings import BaseSettings, NoDecode, SettingsConfigDict
# Fallback logging bootstrap: install a basic handler only when nothing else
# (e.g. the application's LoggingService) has configured the root logger yet.
# This keeps config-time log output visible without clobbering later setup.
_BOOTSTRAP_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
_BOOTSTRAP_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"

_root_logger = logging.getLogger()
if not _root_logger.handlers:
    logging.basicConfig(level=logging.INFO, format=_BOOTSTRAP_LOG_FORMAT, datefmt=_BOOTSTRAP_DATE_FORMAT)

# Module-level logger for this configuration module.
logger = logging.getLogger(__name__)
def _normalize_env_list_vars() -> None:
    """Normalize list-typed env vars to valid JSON arrays.

    Ensures env values parse cleanly when providers expect JSON for complex types.
    If a value is empty or CSV, convert to a JSON array string.

    Mutates ``os.environ`` in place for the keys listed below; unset keys are
    left untouched.
    """
    # Env vars whose pydantic fields are list-typed and therefore need JSON.
    keys = [
        "SSO_TRUSTED_DOMAINS",
        "SSO_AUTO_ADMIN_DOMAINS",
        "SSO_GITHUB_ADMIN_ORGS",
        "SSO_GOOGLE_ADMIN_DOMAINS",
        "SSO_ENTRA_ADMIN_GROUPS",
        "LOG_DETAILED_SKIP_ENDPOINTS",
    ]
    for key in keys:
        raw = os.environ.get(key)
        if raw is None:
            continue
        s = raw.strip()
        if not s:
            # Empty string -> empty JSON list so parsing never fails.
            os.environ[key] = "[]"
            continue
        if s.startswith("["):
            # Already JSON-like; keep as-is only if it actually parses.
            try:
                orjson.loads(s)
                continue
            except ValueError:  # orjson.JSONDecodeError subclasses ValueError
                pass  # nosec B110 - Intentionally continue with CSV parsing if JSON parsing fails
        # Convert CSV to JSON array, dropping empty segments and whitespace.
        items = [item.strip() for item in s.split(",") if item.strip()]
        os.environ[key] = orjson.dumps(items).decode()


_normalize_env_list_vars()
# Default content type for outgoing requests to Forge
FORGE_CONTENT_TYPE = os.getenv("FORGE_CONTENT_TYPE", "application/json")

# UI embedding / visibility controls.
#
# Admin-UI sections that may be hidden via configuration. Frozen so the set
# cannot be mutated at runtime.
UI_HIDABLE_SECTIONS = frozenset(
    (
        "overview",
        "servers",
        "gateways",
        "tools",
        "prompts",
        "resources",
        "roots",
        "mcp-registry",
        "metrics",
        "plugins",
        "export-import",
        "logs",
        "version-info",
        "maintenance",
        "teams",
        "users",
        "agents",
        "tokens",
        "settings",
    )
)

# Header controls that may be hidden independently of the sections above.
UI_HIDABLE_HEADER_ITEMS = frozenset(("logout", "team_selector", "user_identity", "theme_toggle"))

# Legacy / alternate section names accepted in configuration, mapped to the
# canonical names in UI_HIDABLE_SECTIONS.
UI_HIDE_SECTION_ALIASES = {
    "catalog": "servers",
    "virtual_servers": "servers",
    "a2a-agents": "agents",
    "a2a": "agents",
    "grpc-services": "agents",
    "api_tokens": "tokens",
    "llm-settings": "settings",
}
156class Settings(BaseSettings):
157 """
158 ContextForge AI Gateway configuration settings.
160 Examples:
161 >>> from mcpgateway.config import Settings
162 >>> s = Settings(basic_auth_user='admin', basic_auth_password='secret')
163 >>> s.api_key
164 'admin:secret'
165 >>> s2 = Settings(transport_type='http')
166 >>> s2.validate_transport() # no error
167 >>> s3 = Settings(transport_type='invalid')
168 >>> try:
169 ... s3.validate_transport()
170 ... except ValueError as e:
171 ... print('error')
172 error
173 >>> s4 = Settings(database_url='sqlite:///./test.db')
174 >>> isinstance(s4.database_settings, dict)
175 True
176 >>> s5 = Settings()
177 >>> s5.app_name
178 'ContextForge'
179 >>> s5.host in ('0.0.0.0', '127.0.0.1') # Default can be either
180 True
181 >>> s5.port
182 4444
183 >>> s5.auth_required
184 True
185 >>> isinstance(s5.allowed_origins, set)
186 True
187 >>> s6 = Settings(log_detailed_skip_endpoints=["/metrics", "/health"])
188 >>> s6.log_detailed_skip_endpoints
189 ['/metrics', '/health']
190 >>> s7 = Settings(log_detailed_sample_rate=0.5)
191 >>> s7.log_detailed_sample_rate
192 0.5
193 >>> s8 = Settings(log_resolve_user_identity=True)
194 >>> s8.log_resolve_user_identity
195 True
196 >>> s9 = Settings()
197 >>> s9.log_detailed_skip_endpoints
198 []
199 >>> s9.log_detailed_sample_rate
200 1.0
201 >>> s9.log_resolve_user_identity
202 False
203 """
205 # Basic Settings
206 app_name: str = "ContextForge"
207 host: str = "127.0.0.1"
208 port: PositiveInt = Field(default=4444, ge=1, le=65535)
209 client_mode: bool = False
210 docs_allow_basic_auth: bool = False # Allow basic auth for docs
211 api_allow_basic_auth: bool = Field(
212 default=False,
213 description="Allow Basic authentication for API endpoints. Disabled by default for security. Use JWT or API tokens instead.",
214 )
215 database_url: str = Field(
216 default="sqlite:///./mcp.db",
217 description=(
218 "Database connection URL. Supports SQLite, PostgreSQL, MySQL/MariaDB. "
219 "For PostgreSQL with custom schema, use the 'options' query parameter: "
220 "postgresql://user:pass@host:5432/db?options=-c%20search_path=schema_name "
221 "(See Issue #1535 for details)"
222 ),
223 )
225 # Absolute paths resolved at import-time (still override-able via env vars)
226 templates_dir: Path = Field(default_factory=lambda: Path(str(files("mcpgateway") / "templates")))
227 static_dir: Path = Field(default_factory=lambda: Path(str(files("mcpgateway") / "static")))
229 # Template auto-reload: False for production (default), True for development
230 # Disabling prevents re-parsing templates on each request, improving performance under load
231 # Use TEMPLATES_AUTO_RELOAD=true for development (make dev sets this automatically)
232 templates_auto_reload: bool = Field(default=False, description="Auto-reload Jinja2 templates on change (enable for development)")
234 app_root_path: str = ""
236 # Protocol
237 protocol_version: str = "2025-11-25"
239 # Authentication
240 basic_auth_user: str = "admin"
241 basic_auth_password: SecretStr = Field(default=SecretStr("changeme"))
242 jwt_algorithm: str = "HS256"
243 jwt_secret_key: SecretStr = Field(default=SecretStr("my-test-key"))
244 jwt_public_key_path: str = ""
245 jwt_private_key_path: str = ""
246 jwt_audience: str = "mcpgateway-api"
247 jwt_issuer: str = "mcpgateway"
248 jwt_audience_verification: bool = True
249 jwt_issuer_verification: bool = True
250 auth_required: bool = True
251 allow_unauthenticated_admin: bool = Field(
252 default=False,
253 description="Allow unauthenticated requests to receive platform-admin context when AUTH_REQUIRED=false (dangerous; development-only override).",
254 )
255 token_expiry: int = 10080 # minutes
257 require_token_expiration: bool = Field(default=True, description="Require all JWT tokens to have expiration claims (secure default)")
258 require_jti: bool = Field(default=True, description="Require JTI (JWT ID) claim in all tokens for revocation support (secure default)")
259 require_user_in_db: bool = Field(
260 default=False,
261 description="Require all authenticated users to exist in the database. When true, disables the platform admin bootstrap mechanism. WARNING: Enabling this on a fresh deployment will lock you out.",
262 )
263 embed_environment_in_tokens: bool = Field(default=False, description="Embed environment claim in gateway-issued JWTs for environment isolation")
264 validate_token_environment: bool = Field(default=False, description="Reject tokens with mismatched environment claim (tokens without env claim are allowed)")
266 # JSON Schema Validation for registration (Tool Input Schemas, Prompt schemas, etc)
267 json_schema_validation_strict: bool = Field(default=True, description="Strict schema validation mode - reject invalid JSON schemas")
269 # SSO Configuration
270 sso_enabled: bool = Field(default=False, description="Enable Single Sign-On authentication")
271 sso_github_enabled: bool = Field(default=False, description="Enable GitHub OAuth authentication")
272 sso_github_client_id: Optional[str] = Field(default=None, description="GitHub OAuth client ID")
273 sso_github_client_secret: Optional[SecretStr] = Field(default=None, description="GitHub OAuth client secret")
275 sso_google_enabled: bool = Field(default=False, description="Enable Google OAuth authentication")
276 sso_google_client_id: Optional[str] = Field(default=None, description="Google OAuth client ID")
277 sso_google_client_secret: Optional[SecretStr] = Field(default=None, description="Google OAuth client secret")
279 sso_ibm_verify_enabled: bool = Field(default=False, description="Enable IBM Security Verify OIDC authentication")
280 sso_ibm_verify_client_id: Optional[str] = Field(default=None, description="IBM Security Verify client ID")
281 sso_ibm_verify_client_secret: Optional[SecretStr] = Field(default=None, description="IBM Security Verify client secret")
282 sso_ibm_verify_issuer: Optional[str] = Field(default=None, description="IBM Security Verify OIDC issuer URL")
284 sso_okta_enabled: bool = Field(default=False, description="Enable Okta OIDC authentication")
285 sso_okta_client_id: Optional[str] = Field(default=None, description="Okta client ID")
286 sso_okta_client_secret: Optional[SecretStr] = Field(default=None, description="Okta client secret")
287 sso_okta_issuer: Optional[str] = Field(default=None, description="Okta issuer URL")
289 sso_keycloak_enabled: bool = Field(default=False, description="Enable Keycloak OIDC authentication")
290 sso_keycloak_base_url: Optional[str] = Field(default=None, description="Keycloak base URL (e.g., https://keycloak.example.com)")
291 sso_keycloak_public_base_url: Optional[str] = Field(
292 default=None,
293 description="Browser-facing Keycloak base URL for authorization redirects (e.g., http://localhost:8180)",
294 )
295 sso_keycloak_realm: str = Field(default="master", description="Keycloak realm name")
296 sso_keycloak_client_id: Optional[str] = Field(default=None, description="Keycloak client ID")
297 sso_keycloak_client_secret: Optional[SecretStr] = Field(default=None, description="Keycloak client secret")
298 sso_keycloak_map_realm_roles: bool = Field(default=True, description="Map Keycloak realm roles to gateway teams")
299 sso_keycloak_map_client_roles: bool = Field(default=False, description="Map Keycloak client roles to gateway RBAC")
300 sso_keycloak_role_mappings: Dict[str, str] = Field(default_factory=dict, description="Map Keycloak groups/roles to ContextForge roles (JSON: {group_or_role: role_name})")
301 sso_keycloak_default_role: Optional[str] = Field(default=None, description="Default ContextForge role for Keycloak users without role mapping")
302 sso_keycloak_resolve_team_scope_to_personal_team: bool = Field(default=False, description="Resolve team-scoped Keycloak role mappings to the user's personal team")
303 sso_keycloak_username_claim: str = Field(default="preferred_username", description="JWT claim for username")
305 # Security Validation & Sanitization
306 experimental_validate_io: bool = Field(default=False, description="Enable experimental input validation and output sanitization")
307 validation_middleware_enabled: bool = Field(default=False, description="Enable validation middleware for all requests")
308 validation_strict: bool = Field(default=True, description="Strict validation mode - reject on violations")
309 sanitize_output: bool = Field(default=True, description="Sanitize output to remove control characters")
310 allowed_roots: List[str] = Field(default_factory=list, description="Allowed root paths for resource access")
311 max_path_depth: int = Field(default=10, description="Maximum allowed path depth")
312 max_param_length: int = Field(default=10000, description="Maximum parameter length")
313 dangerous_patterns: List[str] = Field(
314 default_factory=lambda: [
315 r"[;&|`$(){}\[\]<>]", # Shell metacharacters
316 r"\.\.[\\/]", # Path traversal
317 r"[\x00-\x1f\x7f-\x9f]", # Control characters
318 ],
319 description="Regex patterns for dangerous input",
320 )
322 sso_keycloak_email_claim: str = Field(default="email", description="JWT claim for email")
323 sso_keycloak_groups_claim: str = Field(default="groups", description="JWT claim for groups/roles")
325 sso_entra_enabled: bool = Field(default=False, description="Enable Microsoft Entra ID OIDC authentication")
326 sso_entra_client_id: Optional[str] = Field(default=None, description="Microsoft Entra ID client ID")
327 sso_entra_client_secret: Optional[SecretStr] = Field(default=None, description="Microsoft Entra ID client secret")
328 sso_entra_tenant_id: Optional[str] = Field(default=None, description="Microsoft Entra ID tenant ID")
329 sso_entra_groups_claim: str = Field(default="groups", description="JWT claim for EntraID groups (groups/roles)")
330 sso_entra_admin_groups: Annotated[list[str], NoDecode] = Field(default_factory=list, description="EntraID groups granting platform_admin role (CSV/JSON)")
331 sso_entra_role_mappings: Dict[str, str] = Field(default_factory=dict, description="Map EntraID groups to ContextForge roles (JSON: {group_id: role_name})")
332 sso_entra_default_role: Optional[str] = Field(default=None, description="Default role for EntraID users without group mapping (None = no role assigned)")
333 sso_entra_sync_roles_on_login: bool = Field(default=True, description="Synchronize role assignments on each login")
334 sso_entra_graph_api_enabled: bool = Field(default=True, description="Enable Microsoft Graph fallback for EntraID groups overage claims")
335 sso_entra_graph_api_timeout: int = Field(default=10, ge=1, le=120, description="Timeout in seconds for Microsoft Graph group fallback requests")
336 sso_entra_graph_api_max_groups: int = Field(default=0, ge=0, description="Maximum groups to keep from Graph fallback (0 = no limit)")
338 sso_generic_enabled: bool = Field(default=False, description="Enable generic OIDC provider (Keycloak, Auth0, etc.)")
339 sso_generic_provider_id: Optional[str] = Field(default=None, description="Provider ID (e.g., 'keycloak', 'auth0', 'authentik')")
340 sso_generic_display_name: Optional[str] = Field(default=None, description="Display name shown on login page")
341 sso_generic_client_id: Optional[str] = Field(default=None, description="Generic OIDC client ID")
342 sso_generic_client_secret: Optional[SecretStr] = Field(default=None, description="Generic OIDC client secret")
343 sso_generic_authorization_url: Optional[str] = Field(default=None, description="Authorization endpoint URL")
344 sso_generic_token_url: Optional[str] = Field(default=None, description="Token endpoint URL")
345 sso_generic_userinfo_url: Optional[str] = Field(default=None, description="Userinfo endpoint URL")
346 sso_generic_issuer: Optional[str] = Field(default=None, description="OIDC issuer URL")
347 sso_generic_jwks_uri: Optional[str] = Field(default=None, description="OIDC JWKS endpoint URL for token signature verification")
348 sso_generic_scope: Optional[str] = Field(default="openid profile email", description="OAuth scopes (space-separated)")
350 # SSO Settings
351 sso_auto_create_users: bool = Field(default=True, description="Automatically create users from SSO providers")
352 sso_trusted_domains: Annotated[list[str], NoDecode] = Field(default_factory=list, description="Trusted email domains (CSV or JSON list)")
353 sso_preserve_admin_auth: bool = Field(default=True, description="Preserve local admin authentication when SSO is enabled")
355 # SSO Admin Assignment Settings
356 sso_auto_admin_domains: Annotated[list[str], NoDecode] = Field(default_factory=list, description="Admin domains (CSV or JSON list)")
357 sso_github_admin_orgs: Annotated[list[str], NoDecode] = Field(default_factory=list, description="GitHub orgs granting admin (CSV/JSON)")
358 sso_google_admin_domains: Annotated[list[str], NoDecode] = Field(default_factory=list, description="Google admin domains (CSV/JSON)")
359 sso_require_admin_approval: bool = Field(default=False, description="Require admin approval for new SSO registrations")
361 # MCP Client Authentication
362 mcp_client_auth_enabled: bool = Field(default=True, description="Enable JWT authentication for MCP client operations")
363 mcp_require_auth: Optional[bool] = Field(
364 default=None,
365 description=(
366 "Require authentication for /mcp endpoints. "
367 "When unset, inherits AUTH_REQUIRED. "
368 "Set false explicitly to allow unauthenticated access to public items only; "
369 "set true to require a valid Bearer token for all /mcp requests."
370 ),
371 )
372 trust_proxy_auth: bool = Field(
373 default=False,
374 description="Trust proxy authentication headers (required when mcp_client_auth_enabled=false)",
375 )
376 trust_proxy_auth_dangerously: bool = Field(
377 default=False,
378 description="Acknowledge and allow trusted proxy headers when MCP_CLIENT_AUTH_ENABLED=false (dangerous; only for strictly trusted proxy deployments).",
379 )
380 proxy_user_header: str = Field(default="X-Authenticated-User", description="Header containing authenticated username from proxy")
382 # Encryption key phrase for auth storage
383 auth_encryption_secret: SecretStr = Field(default=SecretStr("my-test-salt"))
385 # Query Parameter Authentication (INSECURE - disabled by default)
386 insecure_allow_queryparam_auth: bool = Field(
387 default=False,
388 description=("Enable query parameter authentication for gateway peers. " "WARNING: API keys may appear in proxy logs. See CWE-598."),
389 )
390 insecure_queryparam_auth_allowed_hosts: List[str] = Field(
391 default_factory=list,
392 description=("Allowlist of hosts permitted to use query parameter auth. " "Empty list allows any host when feature is enabled. " "Format: ['mcp.tavily.com', 'api.example.com']"),
393 )
395 # ===================================
396 # SSRF Protection Configuration
397 # ===================================
398 # Server-Side Request Forgery (SSRF) protection prevents the gateway from being
399 # used to access internal resources or cloud metadata services.
401 ssrf_protection_enabled: bool = Field(
402 default=True,
403 description="Enable SSRF protection for gateway/tool URLs. Blocks access to dangerous endpoints.",
404 )
406 ssrf_blocked_networks: List[str] = Field(
407 default=[
408 # Cloud metadata services (ALWAYS dangerous - credential exposure)
409 "169.254.169.254/32", # AWS/GCP/Azure instance metadata
410 "169.254.169.123/32", # AWS NTP service
411 "fd00::1/128", # IPv6 cloud metadata
412 # Link-local (often used for cloud metadata)
413 "169.254.0.0/16", # Full link-local IPv4 range
414 "fe80::/10", # IPv6 link-local
415 ],
416 description=(
417 "CIDR ranges to block for SSRF protection. These are ALWAYS blocked regardless of other settings. " "Default blocks cloud metadata endpoints. Add private ranges for stricter security."
418 ),
419 )
421 ssrf_blocked_hosts: List[str] = Field(
422 default=[
423 "metadata.google.internal", # GCP metadata hostname
424 "metadata.internal", # Generic cloud metadata
425 ],
426 description="Hostnames to block for SSRF protection. Matched case-insensitively.",
427 )
429 ssrf_allow_localhost: bool = Field(
430 default=False,
431 description=("Allow localhost/loopback addresses (127.0.0.0/8, ::1). " "Default false for safer production behavior."),
432 )
434 ssrf_allow_private_networks: bool = Field(
435 default=False,
436 description=(
437 "Allow RFC 1918 private network addresses (10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16). " "When false, private destinations are blocked unless explicitly listed in SSRF_ALLOWED_NETWORKS."
438 ),
439 )
441 ssrf_allowed_networks: List[str] = Field(
442 default_factory=list,
443 description=("Optional CIDR allowlist for internal/private destinations. " "Used when SSRF_ALLOW_PRIVATE_NETWORKS=false to allow specific internal ranges."),
444 )
446 ssrf_dns_fail_closed: bool = Field(
447 default=True,
448 description=(
449 "Fail closed on DNS resolution errors. When true, URLs that cannot be resolved "
450 "are rejected. When false, unresolvable hostnames are allowed through "
451 "(hostname blocklist still applies)."
452 ),
453 )
455 # OAuth Configuration
456 oauth_request_timeout: int = Field(default=30, description="OAuth request timeout in seconds")
457 oauth_max_retries: int = Field(default=3, description="Maximum retries for OAuth token requests")
458 oauth_default_timeout: int = Field(default=3600, description="Default OAuth token timeout in seconds")
460 # ===================================
461 # Dynamic Client Registration (DCR) - Client Mode
462 # ===================================
464 # Enable DCR client functionality
465 dcr_enabled: bool = Field(default=True, description="Enable Dynamic Client Registration (RFC 7591) - gateway acts as DCR client")
467 # Auto-register when missing credentials
468 dcr_auto_register_on_missing_credentials: bool = Field(default=True, description="Automatically register with AS when gateway has issuer but no client_id")
470 # Default scopes for DCR
471 dcr_default_scopes: List[str] = Field(default=["mcp:read"], description="Default MCP scopes to request during DCR")
473 # Issuer allowlist (empty = allow any)
474 dcr_allowed_issuers: List[str] = Field(default_factory=list, description="Optional allowlist of issuer URLs for DCR (empty = allow any)")
476 # Token endpoint auth method
477 dcr_token_endpoint_auth_method: str = Field(default="client_secret_basic", description="Token endpoint auth method for DCR (client_secret_basic or client_secret_post)")
479 # Metadata cache TTL
480 dcr_metadata_cache_ttl: int = Field(default=3600, description="AS metadata cache TTL in seconds (RFC 8414 discovery)")
482 # Client name template
483 dcr_client_name_template: str = Field(default="ContextForge ({gateway_name})", description="Template for client_name in DCR requests")
485 # Refresh token behavior
486 dcr_request_refresh_token_when_unsupported: bool = Field(
487 default=False,
488 description="Request refresh_token even when AS metadata omits grant_types_supported. Enable for AS servers that support refresh tokens but don't advertise it.",
489 )
491 # ===================================
492 # OAuth Discovery (RFC 8414)
493 # ===================================
495 oauth_discovery_enabled: bool = Field(default=True, description="Enable OAuth AS metadata discovery (RFC 8414)")
497 oauth_preferred_code_challenge_method: str = Field(default="S256", description="Preferred PKCE code challenge method (S256 or plain)")
499 # Email-Based Authentication
500 email_auth_enabled: bool = Field(default=True, description="Enable email-based authentication")
501 public_registration_enabled: bool = Field(
502 default=False,
503 description="Allow unauthenticated users to self-register accounts. When false, only admins can create users via /admin/users endpoint.",
504 )
505 allow_public_visibility: bool = Field(
506 default=True,
507 description="When false, creating or updating any entity with public visibility is blocked in team scope.",
508 )
509 protect_all_admins: bool = Field(
510 default=True,
511 description="When true (default), prevent any admin from being demoted, deactivated, or locked out via API/UI. When false, only the last active admin is protected.",
512 )
513 platform_admin_email: str = Field(default="admin@example.com", description="Platform administrator email address")
514 platform_admin_password: SecretStr = Field(default=SecretStr("changeme"), description="Platform administrator password")
515 default_user_password: SecretStr = Field(default=SecretStr("changeme"), description="Default password for new users") # nosec B105
516 platform_admin_full_name: str = Field(default="Platform Administrator", description="Platform administrator full name")
518 # Argon2id Password Hashing Configuration
519 argon2id_time_cost: int = Field(default=3, description="Argon2id time cost (number of iterations)")
520 argon2id_memory_cost: int = Field(default=65536, description="Argon2id memory cost in KiB")
521 argon2id_parallelism: int = Field(default=1, description="Argon2id parallelism (number of threads)")
523 # Password Policy Configuration
524 password_min_length: int = Field(default=8, description="Minimum password length")
525 password_require_uppercase: bool = Field(default=True, description="Require uppercase letters in passwords")
526 password_require_lowercase: bool = Field(default=True, description="Require lowercase letters in passwords")
527 password_require_numbers: bool = Field(default=False, description="Require numbers in passwords")
528 password_require_special: bool = Field(default=True, description="Require special characters in passwords")
530 # Password change enforcement and policy toggles
531 password_change_enforcement_enabled: bool = Field(default=True, description="Master switch for password change enforcement checks")
532 admin_require_password_change_on_bootstrap: bool = Field(default=True, description="Force admin to change password after bootstrap")
533 detect_default_password_on_login: bool = Field(default=True, description="Detect default password during login and mark user for change")
534 require_password_change_for_default_password: bool = Field(default=True, description="Require password change when user is created with the default password")
535 password_policy_enabled: bool = Field(default=True, description="Enable password complexity validation for new/changed passwords")
536 password_prevent_reuse: bool = Field(default=True, description="Prevent reusing the current password when changing")
537 password_max_age_days: int = Field(default=90, description="Password maximum age in days before expiry forces a change")
538 # Account Security Configuration
539 max_failed_login_attempts: int = Field(default=10, description="Maximum failed login attempts before account lockout")
540 account_lockout_duration_minutes: int = Field(default=1, description="Account lockout duration in minutes")
541 account_lockout_notification_enabled: bool = Field(default=True, description="Send lockout notification emails when accounts are locked")
542 failed_login_min_response_ms: int = Field(default=250, description="Minimum response duration for failed login attempts to reduce timing side channels")
544 # Self-service password reset
545 password_reset_enabled: bool = Field(default=True, description="Enable self-service password reset workflow (set false to disable public forgot/reset endpoints)")
546 password_reset_token_expiry_minutes: int = Field(default=60, description="Password reset token expiration time in minutes")
547 password_reset_rate_limit: int = Field(default=5, description="Maximum password reset requests allowed per email in each rate-limit window")
548 password_reset_rate_window_minutes: int = Field(default=15, description="Password reset request rate-limit window in minutes")
549 password_reset_invalidate_sessions: bool = Field(default=True, description="Invalidate active sessions after password reset")
550 password_reset_min_response_ms: int = Field(default=250, description="Minimum response duration for forgot-password requests to reduce timing side channels")
552 # Email delivery for auth notifications
553 smtp_enabled: bool = Field(
554 default=False,
555 description="Enable SMTP email delivery for password reset and account lockout notifications (when false, reset requests are accepted but no email is sent)",
556 )
557 smtp_host: Optional[str] = Field(default=None, description="SMTP server host")
558 smtp_port: int = Field(default=587, description="SMTP server port")
559 smtp_user: Optional[str] = Field(default=None, description="SMTP username")
560 smtp_password: Optional[SecretStr] = Field(default=None, description="SMTP password")
561 smtp_from_email: Optional[str] = Field(default=None, description="From email address used for auth notifications")
    # --- SMTP transport options for auth notification emails ---
    # (host/credential fields are presumably declared earlier in the class — TODO confirm)
    smtp_from_name: str = Field(default="ContextForge", description="From display name used for auth notifications")
    smtp_use_tls: bool = Field(default=True, description="Use STARTTLS for SMTP connections")
    smtp_use_ssl: bool = Field(default=False, description="Use implicit SSL/TLS for SMTP connections")
    smtp_timeout_seconds: int = Field(default=15, description="SMTP connection timeout in seconds")

    # Personal Teams Configuration
    auto_create_personal_teams: bool = Field(default=True, description="Enable automatic personal team creation for new users")
    personal_team_prefix: str = Field(default="", description="Personal team naming prefix")
    max_teams_per_user: int = Field(default=50, description="Maximum number of teams a user can belong to")
    max_members_per_team: int = Field(default=100, description="Maximum number of members per team")
    invitation_expiry_days: int = Field(default=7, description="Number of days before team invitations expire")
    require_email_verification_for_invites: bool = Field(default=True, description="Require email verification for team invitations")

    # Team Governance
    allow_team_creation: bool = Field(default=True, description="Allow users to create organizational teams. Admins can always create teams.")
    allow_team_join_requests: bool = Field(default=True, description="Allow users to request to join public teams")
    allow_team_invitations: bool = Field(default=True, description="Allow team owners to send invitations")

    # Default Role Configuration
    default_admin_role: str = Field(default="platform_admin", description="Global role assigned to admin users")
    default_user_role: str = Field(default="platform_viewer", description="Global role assigned to non-admin users")
    default_team_owner_role: str = Field(default="team_admin", description="Team-scoped role assigned to team owners (e.g. personal team creator)")
    default_team_member_role: str = Field(default="viewer", description="Team-scoped role assigned to team members")

    # UI/Admin Feature Flags
    mcpgateway_ui_enabled: bool = False  # Serve the admin web UI
    mcpgateway_admin_api_enabled: bool = False  # Expose the admin REST API
    mcpgateway_ui_airgapped: bool = Field(default=False, description="Use local CDN assets instead of external CDNs for airgapped deployments")
    mcpgateway_ui_embedded: bool = Field(default=False, description="Enable embedded UI mode (hides select header controls by default)")
    # NoDecode tells pydantic-settings to leave the raw env string alone;
    # presumably a validator elsewhere in the class parses it — TODO confirm
    mcpgateway_ui_hide_sections: Annotated[list[str], NoDecode] = Field(
        default_factory=list,
        description=(
            "CSV/JSON list of UI sections to hide. "
            "Valid values: overview, servers, gateways, tools, prompts, resources, roots, mcp-registry, "
            "metrics, plugins, export-import, logs, version-info, maintenance, teams, users, agents, tokens, settings"
        ),
    )
    mcpgateway_ui_hide_header_items: Annotated[list[str], NoDecode] = Field(
        default_factory=list,
        description="CSV/JSON list of header items to hide. Valid values: logout, team_selector, user_identity, theme_toggle",
    )
    mcpgateway_bulk_import_enabled: bool = True  # Allow bulk tool import via admin API
    mcpgateway_bulk_import_max_tools: int = 200  # Max tools accepted per bulk import request
    mcpgateway_bulk_import_rate_limit: int = 10  # Bulk import rate limit

    # UI Tool Test Configuration
    mcpgateway_ui_tool_test_timeout: int = Field(default=60000, description="Tool test timeout in milliseconds for the admin UI")

    # Tool Execution Cancellation
    mcpgateway_tool_cancellation_enabled: bool = Field(default=True, description="Enable gateway-authoritative tool execution cancellation with REST API endpoints")

    # A2A (Agent-to-Agent) Feature Flags
    mcpgateway_a2a_enabled: bool = True  # Enable A2A agent support
    mcpgateway_a2a_max_agents: int = 100  # Maximum registered A2A agents
    mcpgateway_a2a_default_timeout: int = 30  # Default A2A call timeout (seconds)
    mcpgateway_a2a_max_retries: int = 3  # Retry attempts for A2A calls
    mcpgateway_a2a_metrics_enabled: bool = True  # Collect metrics for A2A calls

    # gRPC Support Configuration (EXPERIMENTAL - disabled by default)
    mcpgateway_grpc_enabled: bool = Field(default=False, description="Enable gRPC to MCP translation support (experimental feature)")
    mcpgateway_grpc_reflection_enabled: bool = Field(default=True, description="Enable gRPC server reflection by default")
    mcpgateway_grpc_max_message_size: int = Field(default=4194304, description="Maximum gRPC message size in bytes (4MB)")
    mcpgateway_grpc_timeout: int = Field(default=30, description="Default gRPC call timeout in seconds")
    mcpgateway_grpc_tls_enabled: bool = Field(default=False, description="Enable TLS for gRPC connections by default")

    # Direct Proxy Configuration (disabled by default)
    mcpgateway_direct_proxy_enabled: bool = Field(default=False, description="Enable direct_proxy gateway mode for pass-through MCP operations")
    mcpgateway_direct_proxy_timeout: int = Field(default=30, description="Default timeout in seconds for direct proxy MCP operations")

    # ===================================
    # Performance Monitoring Configuration
    # ===================================
    mcpgateway_performance_tracking: bool = Field(default=False, description="Enable performance tracking tab in admin UI")
    mcpgateway_performance_collection_interval: int = Field(default=10, ge=1, le=300, description="Metric collection interval in seconds")
    mcpgateway_performance_retention_hours: int = Field(default=24, ge=1, le=168, description="Snapshot retention period in hours")
    mcpgateway_performance_retention_days: int = Field(default=90, ge=1, le=365, description="Aggregate retention period in days")
    mcpgateway_performance_max_snapshots: int = Field(default=10000, ge=100, le=1000000, description="Maximum performance snapshots to retain")
    mcpgateway_performance_distributed: bool = Field(default=False, description="Enable distributed mode metrics aggregation via Redis")
    mcpgateway_performance_net_connections_enabled: bool = Field(default=True, description="Enable network connections counting (can be CPU intensive)")
    mcpgateway_performance_net_connections_cache_ttl: int = Field(default=15, ge=1, le=300, description="Cache TTL for net_connections in seconds")

    # MCP Server Catalog Configuration
    mcpgateway_catalog_enabled: bool = Field(default=True, description="Enable MCP server catalog feature")
    mcpgateway_catalog_file: str = Field(default="mcp-catalog.yml", description="Path to catalog configuration file")
    mcpgateway_catalog_auto_health_check: bool = Field(default=True, description="Automatically health check catalog servers")
    mcpgateway_catalog_cache_ttl: int = Field(default=3600, description="Catalog cache TTL in seconds")
    mcpgateway_catalog_page_size: int = Field(default=100, description="Number of catalog servers per page")

    # ContextForge Bootstrap Roles In DB Configuration
    mcpgateway_bootstrap_roles_in_db_enabled: bool = Field(default=False, description="Enable ContextForge add additional roles in db")
    mcpgateway_bootstrap_roles_in_db_file: str = Field(default="additional_roles_in_db.json", description="Path to add additional roles in db")

    # Elicitation support (MCP 2025-06-18)
    mcpgateway_elicitation_enabled: bool = Field(default=True, description="Enable elicitation passthrough support (MCP 2025-06-18)")
    mcpgateway_elicitation_timeout: int = Field(default=60, description="Default timeout for elicitation requests in seconds")
    mcpgateway_elicitation_max_concurrent: int = Field(default=100, description="Maximum concurrent elicitation requests")

    # Security
    skip_ssl_verify: bool = Field(
        default=False,
        description="Skip SSL certificate verification for ALL outbound HTTPS requests "
        "(federation, MCP servers, LLM providers, A2A agents). "
        "WARNING: Only enable in dev environments with self-signed certificates.",
    )
    cors_enabled: bool = True  # Enable CORS handling

    # Environment
    environment: Literal["development", "staging", "production"] = Field(default="development")

    # Domain configuration
    app_domain: HttpUrl = Field(default=HttpUrl("http://localhost:4444"))

    # Security settings
    secure_cookies: bool = Field(default=True)  # Set the Secure flag on cookies
    cookie_samesite: str = Field(default="lax")  # SameSite cookie policy

    # CORS settings
    cors_allow_credentials: bool = Field(default=True)

    # Security Headers Configuration
    security_headers_enabled: bool = Field(default=True)
    x_frame_options: Optional[str] = Field(default="DENY")  # None disables the header (see normalize_x_frame_options)
685 @field_validator("x_frame_options")
686 @classmethod
687 def normalize_x_frame_options(cls, v: Optional[str]) -> Optional[str]:
688 """Convert string 'null', 'none', or empty/whitespace-only string to Python None to disable iframe restrictions.
690 Args:
691 v: The X-Frame-Options value to normalize.
693 Returns:
694 None if v is None, an empty/whitespace-only string, or case-insensitive 'null'/'none';
695 otherwise returns the stripped string value.
696 """
697 if v is None:
698 return None
699 val = v.strip()
700 if val == "" or val.lower() in ("null", "none"):
701 return None
702 return val
    x_content_type_options_enabled: bool = Field(default=True)  # Send X-Content-Type-Options header
    x_xss_protection_enabled: bool = Field(default=True)  # Send X-XSS-Protection header
    x_download_options_enabled: bool = Field(default=True)  # Send X-Download-Options header
    hsts_enabled: bool = Field(default=True)  # Send Strict-Transport-Security header
    hsts_max_age: int = Field(default=31536000)  # 1 year
    hsts_include_subdomains: bool = Field(default=True)  # includeSubDomains directive for HSTS
    remove_server_headers: bool = Field(default=True)  # Strip server-identifying response headers

    # Response Compression Configuration
    compression_enabled: bool = Field(default=True, description="Enable response compression (Brotli, Zstd, GZip)")
    compression_minimum_size: int = Field(default=500, ge=0, description="Minimum response size in bytes to compress (0 = compress all)")
    compression_gzip_level: int = Field(default=6, ge=1, le=9, description="GZip compression level (1=fastest, 9=best compression)")
    compression_brotli_quality: int = Field(default=4, ge=0, le=11, description="Brotli compression quality (0-3=fast, 4-9=balanced, 10-11=max)")
    compression_zstd_level: int = Field(default=3, ge=1, le=22, description="Zstd compression level (1-3=fast, 4-9=balanced, 10+=slow)")

    # For allowed_origins, strip '' to ensure we're passing on valid JSON via env
    # Tell pydantic *not* to touch this env var - our validator will.
    # (Parsed by _parse_allowed_origins, then checked by validate_cors_origins.)
    allowed_origins: Annotated[Set[str], NoDecode] = {
        "http://localhost",
        "http://localhost:4444",
    }

    # Security validation thresholds
    min_secret_length: int = 32  # Minimum recommended secret length (characters)
    min_password_length: int = 12  # Minimum recommended password length (characters)
    require_strong_secrets: bool = False  # Default to False for backward compatibility, will be enforced in 1.0.0

    llmchat_enabled: bool = Field(default=True, description="Enable LLM Chat feature")
    mcpgateway_stdio_transport_enabled: bool = Field(
        default=False,
        description=("Enable stdio transport for MCP chat client configuration. Disabled by default; " "set true only in trusted environments that intentionally need stdio process execution."),
    )
    toolops_enabled: bool = Field(default=False, description="Enable ToolOps feature")
    plugins_can_override_rbac: bool = Field(
        default=False,
        description=("Allow HTTP_AUTH_CHECK_PERMISSION plugins to short-circuit built-in RBAC grants. " "Disabled by default so plugin grant decisions are audit-only unless explicitly enabled."),
    )

    # database-backed polling settings for session message delivery
    poll_interval: float = Field(default=1.0, description="Initial polling interval in seconds for checking new session messages")
    max_interval: float = Field(default=5.0, description="Maximum polling interval in seconds when the session is idle")
    backoff_factor: float = Field(default=1.5, description="Multiplier used to gradually increase the polling interval during inactivity")

    # redis configurations for Maintaining Chat Sessions in multi-worker environment
    llmchat_session_ttl: int = Field(default=300, description="Seconds for active_session key TTL")
    llmchat_session_lock_ttl: int = Field(default=30, description="Seconds for lock expiry")
    llmchat_session_lock_retries: int = Field(default=10, description="How many times to poll while waiting")
    llmchat_session_lock_wait: float = Field(default=0.2, description="Seconds between polls")
    llmchat_chat_history_ttl: int = Field(default=3600, description="Seconds for chat history expiry")
    llmchat_chat_history_max_messages: int = Field(default=50, description="Maximum message history to store per user")

    # LLM Settings (Internal API for LLM Chat)
    llm_api_prefix: str = Field(default="/v1", description="API prefix for internal LLM endpoints")
    llm_request_timeout: int = Field(default=120, description="Request timeout in seconds for LLM API calls")
    llm_streaming_enabled: bool = Field(default=True, description="Enable streaming responses for LLM Chat")
    llm_health_check_interval: int = Field(default=300, description="Provider health check interval in seconds")
761 @field_validator("allowed_roots", mode="before")
762 @classmethod
763 def parse_allowed_roots(cls, v):
764 """Parse allowed roots from environment variable or config value.
766 Args:
767 v: The input value to parse
769 Returns:
770 list: Parsed list of allowed root paths
771 """
772 if isinstance(v, str):
773 # Support both JSON array and comma-separated values
774 v = v.strip()
775 if not v:
776 return []
777 # Try JSON first
778 try:
779 loaded = orjson.loads(v)
780 if isinstance(loaded, list):
781 return loaded
782 except orjson.JSONDecodeError:
783 # Not a valid JSON array → fallback to comma-separated parsing
784 pass
785 # Fallback to comma-split
786 return [x.strip() for x in v.split(",") if x.strip()]
787 return v
789 @field_validator("jwt_secret_key", "auth_encryption_secret")
790 @classmethod
791 def validate_secrets(cls, v: Any, info: ValidationInfo) -> SecretStr:
792 """
793 Validate that secret keys meet basic security requirements.
795 This validator is applied to the `jwt_secret_key` and `auth_encryption_secret` fields.
796 It performs the following checks:
798 1. Detects default or weak secrets (e.g., "changeme", "secret", "password").
799 Logs a warning if detected.
801 2. Checks minimum length (at least 32 characters). Logs a warning if shorter.
803 3. Performs a basic entropy check (at least 10 unique characters). Logs a warning if low.
805 Notes:
806 - Logging is used for warnings; the function does not raise exceptions.
807 - The original value is returned as a `SecretStr` for safe handling.
809 Args:
810 v: The secret value to validate.
811 info: Pydantic validation info object, used to get the field name.
813 Returns:
814 SecretStr: The validated secret value, wrapped as a SecretStr if it wasn't already.
815 """
817 field_name = info.field_name
819 # Extract actual string value safely
820 if isinstance(v, SecretStr):
821 value = v.get_secret_value()
822 else:
823 value = str(v)
825 # Check for default/weak secrets
826 if not info.data.get("client_mode"):
827 weak_secrets = ["my-test-key", "my-test-salt", "changeme", "secret", "password"]
828 if value.lower() in weak_secrets:
829 logger.warning(f"🔓 SECURITY WARNING - {field_name}: Default/weak secret detected! Please set a strong, unique value for production.")
831 # Check minimum length
832 if len(value) < 32:
833 logger.warning(f"⚠️ SECURITY WARNING - {field_name}: Secret should be at least 32 characters long. Current length: {len(value)}")
835 # Basic entropy check (at least 10 unique characters)
836 if len(set(value)) < 10:
837 logger.warning(f"🔑 SECURITY WARNING - {field_name}: Secret has low entropy. Consider using a more random value.")
839 # Always return SecretStr to keep it secret-safe
840 return v if isinstance(v, SecretStr) else SecretStr(value)
842 @field_validator("basic_auth_password")
843 @classmethod
844 def validate_admin_password(cls, v: str | SecretStr, info: ValidationInfo) -> SecretStr:
845 """Validate admin password meets security requirements.
847 Args:
848 v: The admin password value to validate.
849 info: ValidationInfo containing field data.
851 Returns:
852 SecretStr: The validated admin password value, wrapped as SecretStr.
853 """
854 # Extract actual string value safely
855 if isinstance(v, SecretStr):
856 value = v.get_secret_value()
857 else:
858 value = v
860 if not info.data.get("client_mode"):
861 if value == "changeme": # nosec B105 - checking for default value
862 logger.warning("🔓 SECURITY WARNING: Default BASIC_AUTH_PASSWORD detected! Please change it if you enable API_ALLOW_BASIC_AUTH.")
864 # Note: We can't access password_min_length here as it's not set yet during validation
865 # Using default value of 8 to match the field default
866 min_length = 8 # This matches the default in password_min_length field
867 if len(value) < min_length:
868 logger.warning(f"⚠️ SECURITY WARNING: Admin password should be at least {min_length} characters long. Current length: {len(value)}")
870 # Check password complexity
871 has_upper = any(c.isupper() for c in value)
872 has_lower = any(c.islower() for c in value)
873 has_digit = any(c.isdigit() for c in value)
874 has_special = bool(re.search(r'[!@#$%^&*(),.?":{}|<>]', value))
876 complexity_score = sum([has_upper, has_lower, has_digit, has_special])
877 if complexity_score < 3:
878 logger.warning("🔐 SECURITY WARNING: Admin password has low complexity. Should contain at least 3 of: uppercase, lowercase, digits, special characters")
880 # Always return SecretStr to keep it secret-safe
881 return v if isinstance(v, SecretStr) else SecretStr(value)
883 @field_validator("allowed_origins")
884 @classmethod
885 def validate_cors_origins(cls, v: Any, info: ValidationInfo) -> set[str] | None:
886 """Validate CORS allowed origins.
888 Args:
889 v: The set of allowed origins to validate.
890 info: ValidationInfo containing field data.
892 Returns:
893 set: The validated set of allowed origins.
895 Raises:
896 ValueError: If allowed_origins is not a set or list of strings.
897 """
898 if v is None:
899 return v
900 if not isinstance(v, (set, list)):
901 raise ValueError("allowed_origins must be a set or list of strings")
903 dangerous_origins = ["*", "null", ""]
904 if not info.data.get("client_mode"):
905 for origin in v:
906 if origin in dangerous_origins:
907 logger.warning(f"🌐 SECURITY WARNING: Dangerous CORS origin '{origin}' detected. Consider specifying explicit origins instead of wildcards.")
909 # Validate URL format
910 if not origin.startswith(("http://", "https://")) and origin not in dangerous_origins:
911 logger.warning(f"⚠️ SECURITY WARNING: Invalid origin format '{origin}'. Origins should start with http:// or https://")
913 return set({str(origin) for origin in v})
915 @field_validator("database_url")
916 @classmethod
917 def validate_database_url(cls, v: str, info: ValidationInfo) -> str:
918 """Validate database connection string security.
920 Args:
921 v: The database URL to validate.
922 info: ValidationInfo containing field data.
924 Returns:
925 str: The validated database URL.
926 """
927 # Check for hardcoded passwords in non-SQLite databases
928 if not info.data.get("client_mode"):
929 if not v.startswith("sqlite"):
930 if "password" in v and any(weak in v for weak in ["password", "123", "admin", "test"]):
931 logger.warning("Potentially weak database password detected. Consider using a stronger password.")
933 # Warn about SQLite in production
934 if v.startswith("sqlite"):
935 logger.info("Using SQLite database. Consider PostgreSQL or MySQL for production.")
937 return v
939 @model_validator(mode="after")
940 def validate_security_combinations(self) -> Self:
941 """Validate security setting combinations. Only logs warnings; no changes are made.
943 Returns:
944 Itself.
945 """
946 if not self.client_mode:
947 # Check for dangerous combinations - only log warnings, don't raise errors
948 if not self.auth_required and self.mcpgateway_ui_enabled:
949 logger.warning("🔓 SECURITY WARNING: Admin UI is enabled without authentication. Consider setting AUTH_REQUIRED=true for production.")
951 if self.skip_ssl_verify and not self.dev_mode:
952 logger.warning("🔓 SECURITY WARNING: SSL verification is disabled in non-dev mode. This is a security risk! Set SKIP_SSL_VERIFY=false for production.")
954 if self.debug and not self.dev_mode:
955 logger.warning("🐛 SECURITY WARNING: Debug mode is enabled in non-dev mode. This may leak sensitive information! Set DEBUG=false for production.")
957 return self
959 def get_security_warnings(self) -> List[str]:
960 """Get list of security warnings for current configuration.
962 Returns:
963 List[str]: List of security warning messages.
964 """
965 warnings = []
967 # Authentication warnings
968 if not self.auth_required:
969 warnings.append("🔓 Authentication is disabled - ensure this is intentional")
971 if self.basic_auth_user == "admin":
972 warnings.append("⚠️ Using default admin username - consider changing it")
974 # SSL/TLS warnings
975 if self.skip_ssl_verify:
976 warnings.append("🔓 SSL verification is disabled - not recommended for production")
978 # Debug/Dev warnings
979 if self.debug and not self.dev_mode:
980 warnings.append("🐛 Debug mode enabled - disable in production to prevent info leakage")
982 if self.dev_mode:
983 warnings.append("🔧 Development mode enabled - not for production use")
985 # CORS warnings
986 if self.cors_enabled and "*" in self.allowed_origins:
987 warnings.append("🌐 CORS allows all origins (*) - this is a security risk")
989 # Token warnings
990 if self.token_expiry > 10080: # More than 7 days
991 warnings.append("⏱️ JWT token expiry is very long - consider shorter duration")
993 # Database warnings
994 if self.database_url.startswith("sqlite") and not self.dev_mode:
995 warnings.append("💾 SQLite database in use - consider PostgreSQL/MySQL for production")
997 # Rate limiting warnings
998 if self.tool_rate_limit > 1000:
999 warnings.append("🚦 Tool rate limit is very high - may allow abuse")
1001 return warnings
    class SecurityStatus(TypedDict):
        """TypedDict for comprehensive security status.

        Shape of the dictionary returned by ``get_security_status``.
        """

        secure_secrets: bool  # True when jwt_secret_key differs from the default "my-test-key"
        auth_enabled: bool  # Mirrors auth_required
        ssl_verification: bool  # True when SSL verification is NOT skipped
        debug_disabled: bool  # True when debug mode is off
        cors_restricted: bool  # True when CORS is off or '*' is not an allowed origin
        ui_protected: bool  # True when the admin UI is off or auth is required
        warnings: List[str]  # Output of get_security_warnings()
        security_score: int  # 100 minus 10 per warning, floored at 0
1015 def get_security_status(self) -> SecurityStatus:
1016 """Get comprehensive security status.
1018 Returns:
1019 SecurityStatus: Dictionary containing security status information including score and warnings.
1020 """
1022 # Compute a security score: 100 minus 10 for each warning
1023 security_score = max(0, 100 - 10 * len(self.get_security_warnings()))
1025 return {
1026 "secure_secrets": (self.jwt_secret_key.get_secret_value() if isinstance(self.jwt_secret_key, SecretStr) else self.jwt_secret_key)
1027 != "my-test-key", # nosec B105 - checking for default value
1028 "auth_enabled": self.auth_required,
1029 "ssl_verification": not self.skip_ssl_verify,
1030 "debug_disabled": not self.debug,
1031 "cors_restricted": "*" not in self.allowed_origins if self.cors_enabled else True,
1032 "ui_protected": not self.mcpgateway_ui_enabled or self.auth_required,
1033 "warnings": self.get_security_warnings(),
1034 "security_score": security_score,
1035 }
    # Max retries for HTTP requests
    retry_max_attempts: int = 3  # Attempts before giving up on an outbound request
    retry_base_delay: float = 1.0  # seconds
    retry_max_delay: int = 60  # seconds
    retry_jitter_max: float = 0.5  # fraction of base delay

    # HTTPX Client Configuration (for shared singleton client)
    # See: https://www.python-httpx.org/advanced/#pool-limits
    # Formula: max_connections = expected_concurrent_outbound_requests × 1.5
    httpx_max_connections: int = Field(
        default=200,
        ge=10,
        le=1000,
        description="Maximum total concurrent HTTP connections (global, not per-host). " "Increase for high-traffic deployments with many outbound calls.",
    )
    httpx_max_keepalive_connections: int = Field(
        default=100,
        ge=1,
        le=500,
        description="Maximum idle keepalive connections to retain (typically 50% of max_connections)",
    )
    httpx_keepalive_expiry: float = Field(
        default=30.0,
        ge=5.0,
        le=300.0,
        description="Seconds before idle keepalive connections are closed",
    )
    httpx_connect_timeout: float = Field(
        default=5.0,
        ge=1.0,
        le=60.0,
        description="Timeout in seconds for establishing new connections (5s for LAN, increase for WAN)",
    )
    httpx_read_timeout: float = Field(
        default=120.0,
        ge=1.0,
        le=600.0,
        description="Timeout in seconds for reading response data (set high for slow MCP tool calls)",
    )
    httpx_write_timeout: float = Field(
        default=30.0,
        ge=1.0,
        le=600.0,
        description="Timeout in seconds for writing request data",
    )
    httpx_pool_timeout: float = Field(
        default=10.0,
        ge=1.0,
        le=120.0,
        description="Timeout in seconds waiting for a connection from the pool (fail fast on exhaustion)",
    )
    httpx_http2_enabled: bool = Field(
        default=False,
        description="Enable HTTP/2 (requires h2 package; enable only if upstreams support HTTP/2)",
    )
    httpx_admin_read_timeout: float = Field(
        default=30.0,
        ge=1.0,
        le=120.0,
        description="Read timeout for admin UI operations (model fetching, health checks). " "Shorter than httpx_read_timeout to fail fast on admin pages.",
    )
1099 @field_validator("allowed_origins", mode="before")
1100 @classmethod
1101 def _parse_allowed_origins(cls, v: Any) -> Set[str]:
1102 """Parse allowed origins from environment variable or config value.
1104 Handles multiple input formats for the allowed_origins field:
1105 - JSON array string: '["http://localhost", "http://example.com"]'
1106 - Comma-separated string: "http://localhost, http://example.com"
1107 - Already parsed set/list
1109 Automatically strips whitespace and removes outer quotes if present.
1111 Args:
1112 v: The input value to parse. Can be a string (JSON or CSV), set, list, or other iterable.
1114 Returns:
1115 Set[str]: A set of allowed origin strings.
1117 Examples:
1118 >>> sorted(Settings._parse_allowed_origins('["https://a.com", "https://b.com"]'))
1119 ['https://a.com', 'https://b.com']
1120 >>> sorted(Settings._parse_allowed_origins("https://x.com , https://y.com"))
1121 ['https://x.com', 'https://y.com']
1122 >>> Settings._parse_allowed_origins('""')
1123 set()
1124 >>> Settings._parse_allowed_origins('"https://single.com"')
1125 {'https://single.com'}
1126 >>> sorted(Settings._parse_allowed_origins(['http://a.com', 'http://b.com']))
1127 ['http://a.com', 'http://b.com']
1128 >>> Settings._parse_allowed_origins({'http://existing.com'})
1129 {'http://existing.com'}
1130 """
1131 if isinstance(v, str):
1132 v = v.strip()
1133 if v[:1] in "\"'" and v[-1:] == v[:1]: # strip 1 outer quote pair
1134 v = v[1:-1]
1135 try:
1136 parsed = set(orjson.loads(v))
1137 except orjson.JSONDecodeError:
1138 parsed = {s.strip() for s in v.split(",") if s.strip()}
1139 return parsed
1140 return set(v)
    # Logging
    log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = Field(default="ERROR")
    log_requests: bool = Field(default=False, description="Enable request payload logging with sensitive data masking")
    log_format: Literal["json", "text"] = "json"  # json or text
    log_to_file: bool = False  # Enable file logging (default: stdout/stderr only)
    log_filemode: str = "a+"  # append or overwrite
    log_file: Optional[str] = None  # Only used if log_to_file=True
    log_folder: Optional[str] = None  # Only used if log_to_file=True

    # Log Rotation (optional - only used if log_to_file=True)
    log_rotation_enabled: bool = False  # Enable log file rotation
    log_max_size_mb: int = 1  # Max file size in MB before rotation (default: 1MB)
    log_backup_count: int = 5  # Number of backup files to keep (default: 5)

    # Detailed Request Logging Configuration
    log_detailed_max_body_size: int = Field(
        default=16384,  # 16KB - sensible default for request body logging
        ge=1024,
        le=1048576,  # Max 1MB
        description="Maximum request body size to log in detailed mode (bytes). Separate from log_max_size_mb which is for file rotation.",
    )

    # Optional: endpoints to skip for detailed request logging (prefix match)
    log_detailed_skip_endpoints: List[str] = Field(
        default_factory=list,
        description="List of path prefixes to skip when log_detailed_requests is enabled",
    )

    # Whether to attempt resolving user identity via DB fallback when logging.
    # Keep default False to avoid implicit DB queries during normal request handling.
    log_resolve_user_identity: bool = Field(
        default=False,
        description="If true, RequestLoggingMiddleware will attempt DB fallback to resolve user identity when needed",
    )

    # Sampling rate for detailed request logging (0.0-1.0). Applied when log_detailed_requests is enabled.
    log_detailed_sample_rate: float = Field(
        default=1.0,
        ge=0.0,
        le=1.0,
        description="Fraction of requests to sample for detailed logging (0.0-1.0)",
    )

    # Log Buffer (for in-memory storage in admin UI)
    log_buffer_size_mb: float = 1.0  # Size of in-memory log buffer in MB

    # ===================================
    # Observability Configuration
    # ===================================

    # Enable observability features (traces, spans, metrics)
    observability_enabled: bool = Field(default=False, description="Enable observability tracing and metrics collection")

    # Automatic HTTP request tracing
    observability_trace_http_requests: bool = Field(default=True, description="Automatically trace HTTP requests")

    # Trace retention period (days)
    observability_trace_retention_days: int = Field(default=7, ge=1, description="Number of days to retain trace data")

    # Maximum traces to store (prevents unbounded growth)
    observability_max_traces: int = Field(default=100000, ge=1000, description="Maximum number of traces to retain")

    # Sample rate (0.0 to 1.0) - 1.0 means trace everything
    observability_sample_rate: float = Field(default=1.0, ge=0.0, le=1.0, description="Trace sampling rate (0.0-1.0)")

    # Include paths for tracing (regex patterns)
    observability_include_paths: List[str] = Field(
        default_factory=lambda: [
            r"^/rpc/?$",
            r"^/sse$",
            r"^/message$",
            r"^/mcp(?:/|$)",
            r"^/servers/[^/]+/mcp/?$",
            r"^/servers/[^/]+/sse$",
            r"^/servers/[^/]+/message$",
            r"^/a2a(?:/|$)",
        ],
        description="Regex patterns to include for tracing (when empty, all paths are eligible before excludes)",
    )

    # Exclude paths from tracing (regex patterns)
    observability_exclude_paths: List[str] = Field(
        default_factory=lambda: ["/health", "/healthz", "/ready", "/metrics", "/static/.*"],
        description="Regex patterns to exclude from tracing (applies after include patterns)",
    )

    # Enable performance metrics
    observability_metrics_enabled: bool = Field(default=True, description="Enable metrics collection")

    # Enable span events
    observability_events_enabled: bool = Field(default=True, description="Enable event logging within spans")

    # Correlation ID Settings
    correlation_id_enabled: bool = Field(default=True, description="Enable automatic correlation ID tracking for requests")
    correlation_id_header: str = Field(default="X-Correlation-ID", description="HTTP header name for correlation ID")
    correlation_id_preserve: bool = Field(default=True, description="Preserve correlation IDs from incoming requests")
    correlation_id_response_header: bool = Field(default=True, description="Include correlation ID in response headers")

    # ===================================
    # Database Query Logging (N+1 Detection)
    # ===================================
    db_query_log_enabled: bool = Field(default=False, description="Enable database query logging to file (for N+1 detection)")
    db_query_log_file: str = Field(default="logs/db-queries.log", description="Path to database query log file")
    db_query_log_json_file: str = Field(default="logs/db-queries.jsonl", description="Path to JSON Lines query log file")
    db_query_log_format: str = Field(default="both", description="Log format: 'json', 'text', or 'both'")
    db_query_log_min_queries: int = Field(default=1, ge=1, description="Only log requests with >= N queries")
    db_query_log_include_params: bool = Field(default=False, description="Include query parameters (may expose sensitive data)")
    db_query_log_detect_n1: bool = Field(default=True, description="Automatically detect and flag N+1 query patterns")
    db_query_log_n1_threshold: int = Field(default=3, ge=2, description="Number of similar queries to flag as potential N+1")

    # Structured Logging Configuration
    structured_logging_enabled: bool = Field(default=True, description="Enable structured JSON logging with database persistence")
    structured_logging_database_enabled: bool = Field(default=False, description="Persist structured logs to database (enables /api/logs/* endpoints, impacts performance)")
    structured_logging_external_enabled: bool = Field(default=False, description="Send logs to external systems")

    # Performance Tracking Configuration
    performance_tracking_enabled: bool = Field(default=True, description="Enable performance tracking and metrics")
    performance_threshold_database_query_ms: float = Field(default=100.0, description="Alert threshold for database queries (ms)")
    performance_threshold_tool_invocation_ms: float = Field(default=2000.0, description="Alert threshold for tool invocations (ms)")
    performance_threshold_resource_read_ms: float = Field(default=1000.0, description="Alert threshold for resource reads (ms)")
    performance_threshold_http_request_ms: float = Field(default=500.0, description="Alert threshold for HTTP requests (ms)")
    performance_degradation_multiplier: float = Field(default=1.5, description="Alert if performance degrades by this multiplier vs baseline")

    # Audit Trail Configuration
    # Audit trail logging is disabled by default for performance.
    # When enabled, it logs all CRUD operations (create, read, update, delete) on resources.
    # WARNING: This causes a database write on every API request and can cause significant load.
    audit_trail_enabled: bool = Field(default=False, description="Enable audit trail logging to database for compliance")
    permission_audit_enabled: bool = Field(
        default=False,
        description="Enable permission audit logging for RBAC checks (writes a row per permission check)",
    )

    # Security Logging Configuration
    # Security event logging is disabled by default for performance.
    # When enabled, it logs authentication attempts, authorization failures, and security events.
    # WARNING: "all" level logs every request and can cause significant database write load.
    security_logging_enabled: bool = Field(default=False, description="Enable security event logging to database")
    security_logging_level: Literal["all", "failures_only", "high_severity"] = Field(
        default="failures_only",
        description=(
            "Security logging level: "
            "'all' = log all events including successful auth (high DB load), "
            "'failures_only' = log only authentication/authorization failures, "
            "'high_severity' = log only high/critical severity events"
        ),
    )
    security_failed_auth_threshold: int = Field(default=5, description="Failed auth attempts before high severity alert")
    security_threat_score_alert: float = Field(default=0.7, description="Threat score threshold for alerts (0.0-1.0)")
    security_rate_limit_window_minutes: int = Field(default=5, description="Time window for rate limit checks (minutes)")

    # API Token Tracking Configuration
    # Controls how token usage and last_used timestamps are tracked
    token_usage_logging_enabled: bool = Field(default=True, description="Enable API token usage logging middleware")
    token_last_used_update_interval_minutes: int = Field(default=5, ge=1, le=1440, description="Minimum minutes between last_used timestamp updates (rate-limits DB writes)")
1298 # Metrics Aggregation Configuration
1299 metrics_aggregation_enabled: bool = Field(default=True, description="Enable automatic log aggregation into performance metrics")
1300 metrics_aggregation_backfill_hours: int = Field(default=6, ge=0, le=168, description="Hours of structured logs to backfill into performance metrics on startup")
1301 metrics_aggregation_window_minutes: int = Field(default=5, description="Time window for metrics aggregation (minutes)")
1302 metrics_aggregation_auto_start: bool = Field(default=False, description="Automatically run the log aggregation loop on application startup")
1303 yield_batch_size: int = Field(
1304 default=1000,
1305 ge=100,
1306 le=100000,
1307 description="Number of rows fetched per batch when streaming hourly metric data from the database. "
1308 "Used to limit memory usage during aggregation and percentile calculations. "
1309 "Smaller values reduce memory footprint but increase DB round-trips; larger values improve throughput "
1310 "at the cost of higher memory usage.",
1311 )
1313 # Execution Metrics Recording
1314 # Controls whether tool/resource/prompt/server/A2A execution metrics are written to the database.
1315 # Disable if using external observability (ELK, Datadog, Splunk).
1316 # Note: Does NOT affect log aggregation (METRICS_AGGREGATION_ENABLED) or Prometheus (ENABLE_METRICS).
1317 db_metrics_recording_enabled: bool = Field(
1318 default=True, description="Enable recording of execution metrics (tool/resource/prompt/server/A2A) to database. Disable if using external observability."
1319 )
1321 # Metrics Buffer Configuration (for batching tool/resource/prompt metrics writes)
1322 metrics_buffer_enabled: bool = Field(default=True, description="Enable buffered metrics writes (reduces DB pressure under load)")
1323 metrics_buffer_flush_interval: int = Field(default=60, ge=5, le=300, description="Seconds between automatic metrics buffer flushes")
1324 metrics_buffer_max_size: int = Field(default=1000, ge=100, le=10000, description="Maximum buffered metrics before forced flush")
1326 # Metrics Cache Configuration (for caching aggregate metrics queries)
1327 metrics_cache_enabled: bool = Field(default=True, description="Enable in-memory caching for aggregate metrics queries")
1328 metrics_cache_ttl_seconds: int = Field(default=60, ge=1, le=300, description="TTL for cached aggregate metrics in seconds")
1330 # Metrics Cleanup Configuration (automatic deletion of old metrics)
1331 metrics_cleanup_enabled: bool = Field(default=True, description="Enable automatic cleanup of old metrics data")
1332 metrics_retention_days: int = Field(default=7, ge=1, le=365, description="Days to retain raw metrics before cleanup (fallback when rollup disabled)")
1333 metrics_cleanup_interval_hours: int = Field(default=1, ge=1, le=168, description="Hours between automatic cleanup runs")
1334 metrics_cleanup_batch_size: int = Field(default=10000, ge=100, le=100000, description="Batch size for metrics deletion (prevents long locks)")
1336 # Metrics Rollup Configuration (hourly aggregation for historical queries)
1337 metrics_rollup_enabled: bool = Field(default=True, description="Enable hourly metrics rollup for efficient historical queries")
1338 metrics_rollup_interval_hours: int = Field(default=1, ge=1, le=24, description="Hours between rollup runs")
1339 metrics_rollup_retention_days: int = Field(default=365, ge=30, le=3650, description="Days to retain hourly rollup data")
1340 metrics_rollup_late_data_hours: int = Field(
1341 default=1, ge=1, le=48, description="Hours to re-process on each run to catch late-arriving data (smaller = less CPU, larger = more tolerance for delayed metrics)"
1342 )
1343 metrics_delete_raw_after_rollup: bool = Field(default=True, description="Delete raw metrics after hourly rollup exists (recommended for production)")
1344 metrics_delete_raw_after_rollup_hours: int = Field(default=1, ge=1, le=8760, description="Hours to retain raw metrics when hourly rollup exists")
1346 # Auth Cache Configuration (reduces DB queries during authentication)
1347 auth_cache_enabled: bool = Field(default=True, description="Enable Redis/in-memory caching for authentication data (user, team, revocation)")
1348 auth_cache_user_ttl: int = Field(default=60, ge=10, le=300, description="TTL in seconds for cached user data")
1349 auth_cache_revocation_ttl: int = Field(default=30, ge=5, le=120, description="TTL in seconds for token revocation cache (security-critical, keep short)")
1350 auth_cache_team_ttl: int = Field(default=60, ge=10, le=300, description="TTL in seconds for team membership cache")
1351 auth_cache_role_ttl: int = Field(default=60, ge=10, le=300, description="TTL in seconds for user role in team cache")
1352 auth_cache_teams_enabled: bool = Field(default=True, description="Enable caching for get_user_teams() (default: true)")
1353 auth_cache_teams_ttl: int = Field(default=60, ge=10, le=300, description="TTL in seconds for user teams list cache")
1354 auth_cache_batch_queries: bool = Field(default=True, description="Batch auth DB queries into single call (reduces 3 queries to 1)")
1356 # Registry Cache Configuration (reduces DB queries for list endpoints)
1357 registry_cache_enabled: bool = Field(default=True, description="Enable caching for registry list endpoints (tools, prompts, resources, etc.)")
1358 registry_cache_tools_ttl: int = Field(default=20, ge=5, le=300, description="TTL in seconds for tools list cache")
1359 registry_cache_prompts_ttl: int = Field(default=15, ge=5, le=300, description="TTL in seconds for prompts list cache")
1360 registry_cache_resources_ttl: int = Field(default=15, ge=5, le=300, description="TTL in seconds for resources list cache")
1361 registry_cache_agents_ttl: int = Field(default=20, ge=5, le=300, description="TTL in seconds for agents list cache")
1362 registry_cache_servers_ttl: int = Field(default=20, ge=5, le=300, description="TTL in seconds for servers list cache")
1363 registry_cache_gateways_ttl: int = Field(default=20, ge=5, le=300, description="TTL in seconds for gateways list cache")
1364 registry_cache_catalog_ttl: int = Field(default=300, ge=60, le=600, description="TTL in seconds for catalog servers list cache (external catalog, changes infrequently)")
1366 # Tool Lookup Cache Configuration (reduces hot-path DB lookups in invoke_tool)
1367 tool_lookup_cache_enabled: bool = Field(default=True, description="Enable tool lookup cache (tool name -> tool config)")
1368 tool_lookup_cache_ttl_seconds: int = Field(default=60, ge=5, le=600, description="TTL in seconds for tool lookup cache entries")
1369 tool_lookup_cache_negative_ttl_seconds: int = Field(default=10, ge=1, le=60, description="TTL in seconds for negative tool lookup cache entries")
1370 tool_lookup_cache_l1_maxsize: int = Field(default=10000, ge=100, le=1000000, description="Max entries for in-memory tool lookup cache (L1)")
1371 tool_lookup_cache_l2_enabled: bool = Field(default=True, description="Enable Redis-backed tool lookup cache (L2) when cache_type=redis")
1373 # Admin Stats Cache Configuration (reduces dashboard query overhead)
1374 admin_stats_cache_enabled: bool = Field(default=True, description="Enable caching for admin dashboard statistics")
1375 admin_stats_cache_system_ttl: int = Field(default=60, ge=10, le=300, description="TTL in seconds for system stats cache")
1376 admin_stats_cache_observability_ttl: int = Field(default=30, ge=10, le=120, description="TTL in seconds for observability stats cache")
1377 admin_stats_cache_tags_ttl: int = Field(default=120, ge=30, le=600, description="TTL in seconds for tags listing cache")
1378 admin_stats_cache_plugins_ttl: int = Field(default=120, ge=30, le=600, description="TTL in seconds for plugin stats cache")
1379 admin_stats_cache_performance_ttl: int = Field(default=60, ge=15, le=300, description="TTL in seconds for performance aggregates cache")
1381 # Team Member Count Cache Configuration (reduces N+1 queries in admin UI)
1382 team_member_count_cache_enabled: bool = Field(default=True, description="Enable Redis caching for team member counts")
1383 team_member_count_cache_ttl: int = Field(default=300, ge=30, le=3600, description="TTL in seconds for team member count cache (default: 5 minutes)")
1385 # Log Search Configuration
1386 log_search_max_results: int = Field(default=1000, description="Maximum results per log search query")
1387 log_retention_days: int = Field(default=30, description="Number of days to retain logs in database")
1389 # External Log Integration Configuration
1390 elasticsearch_enabled: bool = Field(default=False, description="Send logs to Elasticsearch")
1391 elasticsearch_url: Optional[str] = Field(default=None, description="Elasticsearch cluster URL")
1392 elasticsearch_index_prefix: str = Field(default="mcpgateway-logs", description="Elasticsearch index prefix")
1393 syslog_enabled: bool = Field(default=False, description="Send logs to syslog")
1394 syslog_host: Optional[str] = Field(default=None, description="Syslog server host")
1395 syslog_port: int = Field(default=514, description="Syslog server port")
1396 webhook_logging_enabled: bool = Field(default=False, description="Send logs to webhook endpoints")
1397 webhook_logging_urls: List[str] = Field(default_factory=list, description="Webhook URLs for log delivery")
1399 @field_validator("log_level", mode="before")
1400 @classmethod
1401 def validate_log_level(cls, v: str) -> str:
1402 """
1403 Normalize and validate the log level value.
1405 Ensures that the input string matches one of the allowed log levels,
1406 case-insensitively. The value is uppercased before validation so that
1407 "debug", "Debug", etc. are all accepted as "DEBUG".
1409 Args:
1410 v (str): The log level string provided via configuration or environment.
1412 Returns:
1413 str: The validated and normalized (uppercase) log level.
1415 Raises:
1416 ValueError: If the provided value is not one of
1417 {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}.
1418 """
1419 allowed = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}
1420 v_up = v.upper()
1421 if v_up not in allowed:
1422 raise ValueError(f"Invalid log_level: {v}")
1423 return v_up
    # Transport
    # Optional transport surfaces are opt-in; core transport selection is
    # controlled by transport_type below.
    mcpgateway_ws_relay_enabled: bool = Field(default=False, description="Enable WebSocket JSON-RPC relay endpoint at /ws")
    mcpgateway_reverse_proxy_enabled: bool = Field(default=False, description="Enable reverse-proxy transport endpoints under /reverse-proxy/*")
    transport_type: str = "all"  # http, ws, sse, all
    websocket_ping_interval: int = 30  # seconds
    sse_retry_timeout: int = 5000  # milliseconds - client retry interval on disconnect
    sse_keepalive_enabled: bool = True  # Enable SSE keepalive events
    sse_keepalive_interval: int = 30  # seconds between keepalive events
    sse_send_timeout: float = 30.0  # seconds - timeout for ASGI send() calls, protects against hung connections
    sse_rapid_yield_window_ms: int = 1000  # milliseconds - time window for rapid yield detection
    sse_rapid_yield_max: int = 50  # max yields per window before assuming client disconnected (0=disabled)

    # Gateway/Server Connection Timeout
    # Timeout in seconds for HTTP requests to registered gateways and MCP servers.
    # Used by: GatewayService, ToolService, ServerService for health checks and tool invocations.
    # Note: Previously part of federation settings, retained for gateway connectivity.
    federation_timeout: int = 120

    # SSO
    # For sso_issuers strip out quotes to ensure we're passing valid JSON via env
    # NOTE: normalized by the parse_issuers validator below (None/empty -> []).
    sso_issuers: Optional[list[HttpUrl]] = Field(default=None)
1447 @field_validator("sso_issuers", mode="before")
1448 @classmethod
1449 def parse_issuers(cls, v: Any) -> list[str]:
1450 """
1451 Parse and validate the SSO issuers configuration value.
1453 Accepts:
1454 - JSON array string: '["https://idp1.com", "https://idp2.com"]'
1455 - Comma-separated string: "https://idp1.com, https://idp2.com"
1456 - Empty string or None → []
1457 - Already-parsed list
1459 Args:
1460 v: The input value to parse.
1462 Returns:
1463 list[str]: Parsed list of issuer URLs.
1465 Raises:
1466 ValueError: If the input is not a valid format.
1467 """
1468 if v is None:
1469 return []
1470 if isinstance(v, list):
1471 return v
1472 if isinstance(v, str):
1473 s = v.strip()
1474 if not s:
1475 return []
1476 if s.startswith("["):
1477 try:
1478 parsed = orjson.loads(s)
1479 return parsed if isinstance(parsed, list) else []
1480 except orjson.JSONDecodeError:
1481 raise ValueError(f"Invalid JSON for SSO_ISSUERS: {v!r}")
1482 # Fallback to comma-separated parsing
1483 return [item.strip() for item in s.split(",") if item.strip()]
1484 raise ValueError("Invalid type for SSO_ISSUERS")
    # Resources
    resource_cache_size: int = 1000
    resource_cache_ttl: int = 3600  # seconds
    max_resource_size: int = 10 * 1024 * 1024  # 10MB
    # MIME types a resource is allowed to declare; others are rejected.
    allowed_mime_types: Set[str] = {
        "text/plain",
        "text/markdown",
        "text/html",
        "application/json",
        "application/xml",
        "image/png",
        "image/jpeg",
        "image/gif",
    }

    # Tools
    tool_timeout: int = 60  # seconds
    max_tool_retries: int = 3
    tool_rate_limit: int = 100  # requests per minute
    tool_concurrent_limit: int = 10

    # MCP Session Pool - reduces per-request latency from ~20ms to ~1-2ms
    # Disabled by default for safety. Enable explicitly in production after testing.
    mcp_session_pool_enabled: bool = False
    mcp_session_pool_max_per_key: int = 10  # Max sessions per (URL, identity, transport)
    mcp_session_pool_ttl: float = 300.0  # Session TTL in seconds
    mcp_session_pool_health_check_interval: float = 60.0  # Idle time before health check (aligned with health_check_interval)
    mcp_session_pool_acquire_timeout: float = 30.0  # Timeout waiting for session slot
    mcp_session_pool_create_timeout: float = 30.0  # Timeout creating new session
    mcp_session_pool_circuit_breaker_threshold: int = 5  # Failures before circuit opens
    mcp_session_pool_circuit_breaker_reset: float = 60.0  # Seconds before circuit resets
    mcp_session_pool_idle_eviction: float = 600.0  # Evict idle pool keys after this time
    # Transport timeout for pooled sessions (default 30s to match MCP SDK default).
    # This timeout applies to all HTTP operations (connect, read, write) on pooled sessions.
    # Use a higher value for deployments with long-running tool calls.
    mcp_session_pool_transport_timeout: float = 30.0
    # Force explicit RPC (list_tools) on gateway health checks even when session is fresh.
    # Off by default: pool's internal staleness check (idle > health_check_interval) handles this.
    # Enable for stricter health verification at the cost of ~5ms latency per check.
    mcp_session_pool_explicit_health_rpc: bool = False
    # Configurable health check chain - ordered list of methods to try.
    # Options: ping, list_tools, list_prompts, list_resources, skip
    # Default: ping,skip - try lightweight ping, skip if unsupported (for legacy servers)
    mcp_session_pool_health_check_methods: List[str] = ["ping", "skip"]
    # Timeout in seconds for each health check attempt
    mcp_session_pool_health_check_timeout: float = 5.0
    # Request headers used to distinguish caller identity when keying pooled sessions.
    mcp_session_pool_identity_headers: List[str] = ["authorization", "x-tenant-id", "x-user-id", "x-api-key", "cookie", "x-mcp-session-id"]
    # Timeout for session/transport cleanup operations (__aexit__ calls).
    # This prevents CPU spin loops when internal tasks (like post_writer waiting on
    # memory streams) don't respond to cancellation. Does NOT affect tool execution
    # time - only cleanup of idle/released sessions. Increase if you see frequent
    # "cleanup timed out" warnings; decrease for faster recovery from spin loops.
    mcp_session_pool_cleanup_timeout: float = 5.0

    # Timeout for SSE task group cleanup (seconds).
    # When an SSE connection is cancelled, this controls how long to wait for
    # internal tasks to respond before forcing cleanup. Shorter values reduce
    # CPU waste during anyio _deliver_cancellation spin loops but may interrupt
    # legitimate cleanup. Only affects cancelled connections, not normal operation.
    # See: https://github.com/agronholm/anyio/issues/695
    sse_task_group_cleanup_timeout: float = 5.0

    # =========================================================================
    # EXPERIMENTAL: anyio _deliver_cancellation spin loop workaround
    # =========================================================================
    # When enabled, monkey-patches anyio's CancelScope._deliver_cancellation to
    # limit the number of retry iterations. This prevents 100% CPU spin loops
    # when tasks don't respond to CancelledError (anyio issue #695).
    #
    # WARNING: This is a workaround for an upstream issue. May be removed when
    # anyio or MCP SDK fix the underlying problem. Enable only if you experience
    # CPU spin loops during SSE/MCP connection cleanup.
    #
    # Trade-offs when enabled:
    # - Prevents indefinite CPU spin (good)
    # - May leave some tasks uncancelled after max iterations (usually harmless)
    # - Worker recycling (GUNICORN_MAX_REQUESTS) cleans up orphaned tasks
    #
    # See: https://github.com/agronholm/anyio/issues/695
    # Env: ANYIO_CANCEL_DELIVERY_PATCH_ENABLED
    anyio_cancel_delivery_patch_enabled: bool = False

    # Maximum iterations for _deliver_cancellation before giving up.
    # Only used when anyio_cancel_delivery_patch_enabled=True.
    # Higher values = more attempts to cancel tasks, but longer potential spin.
    # Lower values = faster recovery, but more orphaned tasks.
    # Env: ANYIO_CANCEL_DELIVERY_MAX_ITERATIONS
    anyio_cancel_delivery_max_iterations: int = 100

    # Session Affinity
    mcpgateway_session_affinity_enabled: bool = False  # Global session affinity toggle
    mcpgateway_session_affinity_ttl: int = 300  # Session affinity binding TTL
    mcpgateway_session_affinity_max_sessions: int = 1  # Max sessions per identity for affinity
    mcpgateway_pool_rpc_forward_timeout: int = 30  # Timeout for forwarding RPC requests to owner worker
    # Prompts
    prompt_cache_size: int = 100
    max_prompt_size: int = 100 * 1024  # 100KB
    prompt_render_timeout: int = 10  # seconds

    # Health Checks
    # Interval in seconds between health checks (aligned with mcp_session_pool_health_check_interval)
    health_check_interval: int = 60
    # Timeout in seconds for each health check request
    health_check_timeout: int = 5
    # Per-check timeout (seconds) to bound total time of one gateway health check
    # Env: GATEWAY_HEALTH_CHECK_TIMEOUT
    gateway_health_check_timeout: float = 5.0
    # Consecutive failures before marking gateway offline
    unhealthy_threshold: int = 3
    # Max concurrent health checks per worker
    max_concurrent_health_checks: int = 10

    # Auto-refresh tools/resources/prompts from gateways during health checks
    # When enabled, tools/resources/prompts are fetched and synced with DB during health checks
    auto_refresh_servers: bool = Field(default=False, description="Enable automatic tool/resource/prompt refresh during gateway health checks")

    # Per-gateway refresh configuration (used when auto_refresh_servers is True)
    # Gateways can override this with their own refresh_interval_seconds
    gateway_auto_refresh_interval: int = Field(default=300, ge=60, description="Default refresh interval in seconds for gateway tools/resources/prompts sync (minimum 60 seconds)")

    # Validation Gateway URL
    gateway_validation_timeout: int = 5  # seconds
    gateway_max_redirects: int = 5

    # Lock file name used for leader election of the gateway service.
    filelock_name: str = "gateway_service_leader.lock"

    # Default Roots
    default_roots: List[str] = []

    # Database
    db_driver: str = "mariadb+mariadbconnector"
    db_pool_size: int = 200
    db_max_overflow: int = 10
    db_pool_timeout: int = 30
    db_pool_recycle: int = 3600
    db_max_retries: int = 30  # Max attempts with exponential backoff (≈5 min total)
    db_retry_interval_ms: int = 2000  # Base interval; doubles each attempt, ±25% jitter
    db_max_backoff_seconds: int = 30  # Cap for exponential backoff (jitter applied after cap)

    # Database Performance Optimization
    use_postgresdb_percentiles: bool = Field(
        default=True,
        description="Use database-native percentile functions (percentile_cont) for performance metrics. "
        "When enabled, PostgreSQL uses native SQL percentile calculations (5-10x faster). "
        "When disabled or using SQLite, falls back to Python-based percentile calculations. "
        "Recommended: true for PostgreSQL, auto-detected for SQLite.",
    )

    # psycopg3-specific: Number of times a query must be executed before it's
    # prepared server-side. Set to 0 to disable, 1 to prepare immediately.
    # Default of 5 balances memory usage with query performance.
    db_prepare_threshold: int = Field(default=5, ge=0, le=100, description="psycopg3 prepare_threshold for auto-prepared statements")

    # Connection pool class: "auto" (default), "null", or "queue"
    # - "auto": Uses NullPool when PgBouncer detected, QueuePool otherwise
    # - "null": Always use NullPool (recommended with PgBouncer - lets PgBouncer handle pooling)
    # - "queue": Always use QueuePool (application-side pooling)
    db_pool_class: Literal["auto", "null", "queue"] = Field(
        default="auto",
        description="Connection pool class: auto (NullPool with PgBouncer), null, or queue",
    )

    # Pre-ping connections before checkout (validates connection is alive)
    # - "auto": Enabled for non-PgBouncer, disabled for PgBouncer (default)
    # - "true": Always enable (adds SELECT 1 overhead but catches stale connections)
    # - "false": Always disable
    db_pool_pre_ping: Literal["auto", "true", "false"] = Field(
        default="auto",
        description="Pre-ping connections: auto, true, or false",
    )

    # SQLite busy timeout: Maximum time (ms) SQLite will wait to acquire a database lock before returning SQLITE_BUSY.
    db_sqlite_busy_timeout: int = Field(default=5000, ge=1000, le=60000, description="SQLite busy timeout in milliseconds (default: 5000ms)")

    # Cache
    cache_type: Literal["redis", "memory", "none", "database"] = "database"  # memory or redis or database
    redis_url: Optional[str] = "redis://localhost:6379/0"
    cache_prefix: str = "mcpgw:"
    session_ttl: int = 3600
    message_ttl: int = 600
    redis_max_retries: int = 30  # Max attempts with exponential backoff (≈5 min total)
    redis_retry_interval_ms: int = 2000  # Base interval; doubles each attempt, ±25% jitter
    redis_max_backoff_seconds: int = 30  # Cap for exponential backoff (jitter applied after cap)

    # GlobalConfig In-Memory Cache (Issue #1715)
    # Caches GlobalConfig (passthrough headers) to eliminate redundant DB queries
    global_config_cache_ttl: int = Field(
        default=60,
        ge=5,
        le=3600,
        description="TTL in seconds for GlobalConfig in-memory cache (default: 60)",
    )
    # A2A Stats In-Memory Cache
    # Caches A2A agent counts (total, active) to eliminate redundant COUNT queries
    a2a_stats_cache_ttl: int = Field(
        default=30,
        ge=5,
        le=3600,
        description="TTL in seconds for A2A stats in-memory cache (default: 30)",
    )

    # Redis Parser Configuration (ADR-026)
    # hiredis C parser provides up to 83x faster response parsing for large responses
    redis_parser: Literal["auto", "hiredis", "python"] = Field(
        default="auto",
        description="Redis protocol parser: auto (use hiredis if available), hiredis (require hiredis), python (pure-Python)",
    )

    # Redis Connection Pool - Performance Optimized
    redis_decode_responses: bool = Field(default=True, description="Return strings instead of bytes")
    redis_max_connections: int = Field(default=50, description="Connection pool size per worker")
    redis_socket_timeout: float = Field(default=2.0, description="Socket read/write timeout in seconds")
    redis_socket_connect_timeout: float = Field(default=2.0, description="Connection timeout in seconds")
    redis_retry_on_timeout: bool = Field(default=True, description="Retry commands on timeout")
    redis_health_check_interval: int = Field(default=30, description="Seconds between connection health checks (0=disabled)")

    # Redis Leader Election - Multi-Node Deployments
    redis_leader_ttl: int = Field(default=15, description="Leader election TTL in seconds")
    redis_leader_key: str = Field(default="gateway_service_leader", description="Leader key name")
    redis_leader_heartbeat_interval: int = Field(default=5, description="Seconds between leader heartbeats")

    # streamable http transport
    use_stateful_sessions: bool = False  # Set to False to use stateless sessions without event store
    json_response_enabled: bool = True  # Enable JSON responses instead of SSE streams
    streamable_http_max_events_per_stream: int = 100  # Ring buffer capacity per stream
    streamable_http_event_ttl: int = 3600  # Event stream TTL in seconds (1 hour)

    # Development
    dev_mode: bool = False
    reload: bool = False
    debug: bool = False

    # Observability (OpenTelemetry)
    otel_enable_observability: bool = Field(default=False, description="Enable OpenTelemetry observability")
    otel_traces_exporter: str = Field(default="otlp", description="Traces exporter: otlp, jaeger, zipkin, console, none")
    otel_exporter_otlp_endpoint: Optional[str] = Field(default=None, description="OTLP endpoint (e.g., http://localhost:4317)")
    otel_exporter_otlp_protocol: str = Field(default="grpc", description="OTLP protocol: grpc or http")
    otel_exporter_otlp_insecure: bool = Field(default=True, description="Use insecure connection for OTLP")
    otel_exporter_otlp_headers: Optional[str] = Field(default=None, description="OTLP headers (comma-separated key=value)")
    otel_exporter_jaeger_endpoint: Optional[str] = Field(default=None, description="Jaeger endpoint")
    otel_exporter_zipkin_endpoint: Optional[str] = Field(default=None, description="Zipkin endpoint")
    otel_service_name: str = Field(default="mcp-gateway", description="Service name for traces")
    otel_resource_attributes: Optional[str] = Field(default=None, description="Resource attributes (comma-separated key=value)")
    otel_bsp_max_queue_size: int = Field(default=2048, description="Max queue size for batch span processor")
    otel_bsp_max_export_batch_size: int = Field(default=512, description="Max export batch size")
    otel_bsp_schedule_delay: int = Field(default=5000, description="Schedule delay in milliseconds")

    # ===================================
    # Well-Known URI Configuration
    # ===================================

    # Enable well-known URI endpoints
    well_known_enabled: bool = True

    # robots.txt content (default: disallow all crawling for private API)
    well_known_robots_txt: str = """User-agent: *
Disallow: /

# ContextForge is a private API gateway
# Public crawling is disabled by default"""

    # security.txt content (optional, user-defined)
    # Example: "Contact: security@example.com\nExpires: 2025-12-31T23:59:59Z\nPreferred-Languages: en"
    well_known_security_txt: str = ""

    # Enable security.txt only if content is provided
    # (auto-toggled by the _auto_enable_security_txt validator below)
    well_known_security_txt_enabled: bool = False

    # Additional custom well-known files (JSON format)
    # Example: {"ai.txt": "This service uses AI for...", "dnt-policy.txt": "Do Not Track policy..."}
    well_known_custom_files: str = "{}"

    # Cache control for well-known files (seconds)
    well_known_cache_max_age: int = 3600  # 1 hour default

    # ===================================
    # Performance / Startup Tuning
    # ===================================

    slug_refresh_batch_size: int = Field(default=1000, description="Batch size for gateway/tool slug refresh at startup")
    # Pydantic settings source: load from .env; env var names are case-insensitive.
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", case_sensitive=False, extra="ignore")

    gateway_tool_name_separator: str = "-"
    # Allowed separators: '-', '--', '_' or '.' (enforced by must_be_allowed_sep).
    valid_slug_separator_regexp: ClassVar[str] = r"^(-{1,2}|[_.])$"
1773 @field_validator("gateway_tool_name_separator")
1774 @classmethod
1775 def must_be_allowed_sep(cls, v: str) -> str:
1776 """Validate the gateway tool name separator.
1778 Args:
1779 v: The separator value to validate.
1781 Returns:
1782 The validated separator, defaults to '-' if invalid.
1784 Examples:
1785 >>> Settings.must_be_allowed_sep('-')
1786 '-'
1787 >>> Settings.must_be_allowed_sep('--')
1788 '--'
1789 >>> Settings.must_be_allowed_sep('_')
1790 '_'
1791 >>> Settings.must_be_allowed_sep('.')
1792 '.'
1793 >>> Settings.must_be_allowed_sep('invalid')
1794 '-'
1795 """
1796 if not re.fullmatch(cls.valid_slug_separator_regexp, v):
1797 logger.warning(
1798 f"Invalid gateway_tool_name_separator '{v}'. Must be '-', '--', '_' or '.'. Defaulting to '-'.",
1799 stacklevel=2,
1800 )
1801 return "-"
1802 return v
1804 @property
1805 def custom_well_known_files(self) -> Dict[str, str]:
1806 """Parse custom well-known files from JSON string.
1808 Returns:
1809 Dict[str, str]: Parsed custom well-known files mapping filename to content.
1810 """
1811 try:
1812 return orjson.loads(self.well_known_custom_files) if self.well_known_custom_files else {}
1813 except orjson.JSONDecodeError:
1814 logger.error(f"Invalid JSON in WELL_KNOWN_CUSTOM_FILES: {self.well_known_custom_files}")
1815 return {}
1817 @field_validator("well_known_security_txt_enabled", mode="after")
1818 @classmethod
1819 def _auto_enable_security_txt(cls, v: Any, info: ValidationInfo) -> bool:
1820 """Auto-enable security.txt if content is provided.
1822 Args:
1823 v: The current value of well_known_security_txt_enabled.
1824 info: ValidationInfo containing field data.
1826 Returns:
1827 bool: True if security.txt content is provided, otherwise the original value.
1828 """
1829 if info.data and "well_known_security_txt" in info.data:
1830 return bool(info.data["well_known_security_txt"].strip())
1831 return bool(v)
1833 # -------------------------------
1834 # Flexible list parsing for envs
1835 # -------------------------------
1836 @field_validator(
1837 "sso_entra_admin_groups",
1838 "sso_trusted_domains",
1839 "sso_auto_admin_domains",
1840 "sso_github_admin_orgs",
1841 "sso_google_admin_domains",
1842 "insecure_queryparam_auth_allowed_hosts",
1843 "mcpgateway_ui_hide_sections",
1844 "mcpgateway_ui_hide_header_items",
1845 mode="before",
1846 )
1847 @classmethod
1848 def _parse_list_from_env(cls, v: None | str | list[str]) -> list[str]:
1849 """Parse list fields from environment values.
1851 Accepts either JSON arrays (e.g. '["a","b"]') or comma-separated
1852 strings (e.g. 'a,b'). Empty or None becomes an empty list.
1854 Args:
1855 v: The value to parse, can be None, list, or string.
1857 Returns:
1858 list: Parsed list of values.
1860 Raises:
1861 ValueError: If the value type is invalid for list field parsing.
1862 """
1863 if v is None:
1864 return []
1865 if isinstance(v, list):
1866 return v
1867 if isinstance(v, str):
1868 s = v.strip()
1869 if not s:
1870 return []
1871 if s.startswith("["):
1872 try:
1873 parsed = orjson.loads(s)
1874 return parsed if isinstance(parsed, list) else []
1875 except Exception:
1876 logger.warning("Invalid JSON list in env for list field; falling back to CSV parsing")
1877 # CSV fallback
1878 return [item.strip() for item in s.split(",") if item.strip()]
1879 raise ValueError("Invalid type for list field")
1881 @field_validator("mcpgateway_ui_hide_sections", mode="after")
1882 @classmethod
1883 def _validate_ui_hide_sections(cls, value: list[str]) -> list[str]:
1884 """Normalize and filter hidable UI sections.
1886 Args:
1887 value: Candidate section identifiers from environment/config.
1889 Returns:
1890 list[str]: Normalized unique section identifiers.
1891 """
1892 normalized: list[str] = []
1893 seen: set[str] = set()
1895 for item in value:
1896 candidate = str(item).strip().lower()
1897 if not candidate:
1898 continue
1899 candidate = UI_HIDE_SECTION_ALIASES.get(candidate, candidate)
1900 if candidate not in UI_HIDABLE_SECTIONS:
1901 logger.warning("Ignoring invalid MCPGATEWAY_UI_HIDE_SECTIONS item: %s", item)
1902 continue
1903 if candidate not in seen:
1904 seen.add(candidate)
1905 normalized.append(candidate)
1907 return normalized
1909 @field_validator("mcpgateway_ui_hide_header_items", mode="after")
1910 @classmethod
1911 def _validate_ui_hide_header_items(cls, value: list[str]) -> list[str]:
1912 """Normalize and filter hidable header items.
1914 Args:
1915 value: Candidate header identifiers from environment/config.
1917 Returns:
1918 list[str]: Normalized unique header identifiers.
1919 """
1920 normalized: list[str] = []
1921 seen: set[str] = set()
1923 for item in value:
1924 candidate = str(item).strip().lower()
1925 if not candidate:
1926 continue
1927 if candidate not in UI_HIDABLE_HEADER_ITEMS:
1928 logger.warning("Ignoring invalid MCPGATEWAY_UI_HIDE_HEADER_ITEMS item: %s", item)
1929 continue
1930 if candidate not in seen:
1931 seen.add(candidate)
1932 normalized.append(candidate)
1934 return normalized
1936 @property
1937 def api_key(self) -> str:
1938 """
1939 Generate API key from auth credentials.
1941 Returns:
1942 str: API key string in the format "username:password".
1944 Examples:
1945 >>> from mcpgateway.config import Settings
1946 >>> settings = Settings(basic_auth_user="admin", basic_auth_password="secret")
1947 >>> settings.api_key
1948 'admin:secret'
1949 >>> settings = Settings(basic_auth_user="user123", basic_auth_password="pass456")
1950 >>> settings.api_key
1951 'user123:pass456'
1952 """
1953 return f"{self.basic_auth_user}:{self.basic_auth_password.get_secret_value()}"
1955 @property
1956 def supports_http(self) -> bool:
1957 """Check if HTTP transport is enabled.
1959 Returns:
1960 bool: True if HTTP transport is enabled, False otherwise.
1962 Examples:
1963 >>> settings = Settings(transport_type="http")
1964 >>> settings.supports_http
1965 True
1966 >>> settings = Settings(transport_type="all")
1967 >>> settings.supports_http
1968 True
1969 >>> settings = Settings(transport_type="ws")
1970 >>> settings.supports_http
1971 False
1972 """
1973 return self.transport_type in ["http", "all"]
1975 @property
1976 def supports_websocket(self) -> bool:
1977 """Check if WebSocket transport is enabled.
1979 Returns:
1980 bool: True if WebSocket transport is enabled, False otherwise.
1982 Examples:
1983 >>> settings = Settings(transport_type="ws")
1984 >>> settings.supports_websocket
1985 True
1986 >>> settings = Settings(transport_type="all")
1987 >>> settings.supports_websocket
1988 True
1989 >>> settings = Settings(transport_type="http")
1990 >>> settings.supports_websocket
1991 False
1992 """
1993 return self.transport_type in ["ws", "all"]
1995 @property
1996 def supports_sse(self) -> bool:
1997 """Check if SSE transport is enabled.
1999 Returns:
2000 bool: True if SSE transport is enabled, False otherwise.
2002 Examples:
2003 >>> settings = Settings(transport_type="sse")
2004 >>> settings.supports_sse
2005 True
2006 >>> settings = Settings(transport_type="all")
2007 >>> settings.supports_sse
2008 True
2009 >>> settings = Settings(transport_type="http")
2010 >>> settings.supports_sse
2011 False
2012 """
2013 return self.transport_type in ["sse", "all"]
    class DatabaseSettings(TypedDict):
        """TypedDict for SQLAlchemy database settings (shape of ``database_settings``)."""

        pool_size: int  # base number of pooled connections
        max_overflow: int  # extra connections allowed beyond pool_size
        pool_timeout: int  # seconds to wait for a pooled connection
        pool_recycle: int  # seconds before a connection is recycled
        connect_args: dict[str, Any]  # driver-specific connect() kwargs; consider more specific type if needed
2024 @property
2025 def database_settings(self) -> DatabaseSettings:
2026 """
2027 Get SQLAlchemy database settings.
2029 Returns:
2030 DatabaseSettings: Dictionary containing SQLAlchemy database configuration options.
2032 Examples:
2033 >>> from mcpgateway.config import Settings
2034 >>> s = Settings(database_url='sqlite:///./test.db')
2035 >>> isinstance(s.database_settings, dict)
2036 True
2037 """
2038 return {
2039 "pool_size": self.db_pool_size,
2040 "max_overflow": self.db_max_overflow,
2041 "pool_timeout": self.db_pool_timeout,
2042 "pool_recycle": self.db_pool_recycle,
2043 "connect_args": {"check_same_thread": False} if self.database_url.startswith("sqlite") else {},
2044 }
    class CORSSettings(TypedDict):
        """TypedDict for CORS settings (shape of ``cors_settings``); all keys absent when CORS is disabled."""

        allow_origins: NotRequired[List[str]]  # origins granted cross-origin access
        allow_credentials: NotRequired[bool]  # whether cookies/auth headers are allowed
        allow_methods: NotRequired[List[str]]  # permitted HTTP methods
        allow_headers: NotRequired[List[str]]  # permitted request headers
2054 @property
2055 def cors_settings(self) -> CORSSettings:
2056 """Get CORS settings.
2058 Returns:
2059 CORSSettings: Dictionary containing CORS configuration options.
2061 Examples:
2062 >>> s = Settings(cors_enabled=True, allowed_origins={'http://localhost'})
2063 >>> cors = s.cors_settings
2064 >>> cors['allow_origins']
2065 ['http://localhost']
2066 >>> cors['allow_credentials']
2067 True
2068 >>> s2 = Settings(cors_enabled=False)
2069 >>> s2.cors_settings
2070 {}
2071 """
2072 return (
2073 {
2074 "allow_origins": list(self.allowed_origins),
2075 "allow_credentials": True,
2076 "allow_methods": ["*"],
2077 "allow_headers": ["*"],
2078 }
2079 if self.cors_enabled
2080 else {}
2081 )
2083 def validate_transport(self) -> None:
2084 """
2085 Validate transport configuration.
2087 Raises:
2088 ValueError: If the transport type is not one of the valid options.
2090 Examples:
2091 >>> from mcpgateway.config import Settings
2092 >>> s = Settings(transport_type='http')
2093 >>> s.validate_transport() # no error
2094 >>> s2 = Settings(transport_type='invalid')
2095 >>> try:
2096 ... s2.validate_transport()
2097 ... except ValueError as e:
2098 ... print('error')
2099 error
2100 """
2101 # valid_types = {"http", "ws", "sse", "all"}
2102 valid_types = {"sse", "streamablehttp", "all", "http"}
2103 if self.transport_type not in valid_types:
2104 raise ValueError(f"Invalid transport type. Must be one of: {valid_types}")
2106 def validate_database(self) -> None:
2107 """Validate database configuration.
2109 Examples:
2110 >>> from mcpgateway.config import Settings
2111 >>> s = Settings(database_url='sqlite:///./test.db')
2112 >>> s.validate_database() # Should create the directory if it does not exist
2113 """
2114 if self.database_url.startswith("sqlite"):
2115 db_path = Path(self.database_url.replace("sqlite:///", ""))
2116 db_dir = db_path.parent
2117 if not db_dir.exists():
2118 db_dir.mkdir(parents=True)
    # Validation patterns for safe display (configurable)
    # Matches dangerous HTML element names as opening or closing tags.
    validation_dangerous_html_pattern: str = (
        r"<(script|iframe|object|embed|link|meta|base|form|img|svg|video|audio|source|track|area|map|canvas|applet|frame|frameset|html|head|body|style)\b|</*(script|iframe|object|embed|link|meta|base|form|img|svg|video|audio|source|track|area|map|canvas|applet|frame|frameset|html|head|body|style)>"
    )

    # Matches javascript:/vbscript:/data: URI schemes, inline event handlers, and <script> tags.
    validation_dangerous_js_pattern: str = r"(?i)(?:^|\s|[\"'`<>=])(javascript:|vbscript:|data:\s*[^,]*[;\s]*(javascript|vbscript)|\bon[a-z]+\s*=|<\s*script\b)"

    # URL schemes accepted for externally supplied URLs.
    validation_allowed_url_schemes: List[str] = ["http://", "https://", "ws://", "wss://"]

    # Character validation patterns
    validation_name_pattern: str = r"^[a-zA-Z0-9_.\- ]+$"  # Allow spaces for names (literal space, not \s to reject control chars)
    validation_identifier_pattern: str = r"^[a-zA-Z0-9_\-\.]+$"  # No spaces for IDs
    validation_safe_uri_pattern: str = r"^[a-zA-Z0-9_\-.:/?=&%{}]+$"
    validation_unsafe_uri_pattern: str = r'[<>"\'\\]'
    validation_tool_name_pattern: str = r"^[a-zA-Z0-9_][a-zA-Z0-9._/-]*$"  # MCP tool naming per SEP-986
    validation_tool_method_pattern: str = r"^[a-zA-Z][a-zA-Z0-9_\./-]*$"

    # MCP-compliant size limits (configurable via env)
    validation_max_name_length: int = 255
    validation_max_description_length: int = 8192  # 8KB
    validation_max_template_length: int = 65536  # 64KB
    validation_max_content_length: int = 1048576  # 1MB
    validation_max_json_depth: int = Field(
        default=int(os.getenv("VALIDATION_MAX_JSON_DEPTH", "30")),
        description=(
            "Maximum allowed JSON nesting depth for tool/resource schemas. "
            "Increased from 10 to 30 for compatibility with deeply nested schemas "
            "like Notion MCP (issue #1542). Override with VALIDATION_MAX_JSON_DEPTH "
            "environment variable. Minimum: 1, Maximum: 100"
        ),
        ge=1,
        le=100,
    )
    validation_max_url_length: int = 2048
    validation_max_rpc_param_size: int = 262144  # 256KB

    validation_max_method_length: int = 128

    # Allowed MIME types accepted for resource/tool content.
    validation_allowed_mime_types: List[str] = [
        "text/plain",
        "text/html",
        "text/css",
        "text/markdown",
        "text/javascript",
        "application/json",
        "application/xml",
        "application/pdf",
        "image/png",
        "image/jpeg",
        "image/gif",
        "image/svg+xml",
        "application/octet-stream",
    ]

    # Rate limiting (requests per minute for validation-heavy endpoints)
    validation_max_requests_per_minute: int = 60
    # Header passthrough feature (disabled by default for security)
    enable_header_passthrough: bool = Field(default=False, description="Enable HTTP header passthrough feature (WARNING: Security implications - only enable if needed)")
    enable_overwrite_base_headers: bool = Field(default=False, description="Enable overwriting of base headers")

    # Passthrough headers configuration; populated in __init__ from
    # DEFAULT_PASSTHROUGH_HEADERS (JSON array or CSV), otherwise a safe default.
    default_passthrough_headers: List[str] = Field(default_factory=list)

    # Passthrough headers source priority
    # - "env": Environment variable always wins (ideal for Kubernetes/containerized deployments)
    # - "db": Database take precedence if configured, env as fallback (default)
    # - "merge": Union of both sources - env provides base, other configuration in DB can add more headers
    passthrough_headers_source: Literal["env", "db", "merge"] = Field(
        default="db",
        description="Source priority for passthrough headers: env (environment always wins), db (database wins, default), merge (combine both)",
    )
    # ===================================
    # Pagination Configuration
    # ===================================

    # Default number of items per page for paginated endpoints
    pagination_default_page_size: int = Field(default=50, ge=1, le=1000, description="Default number of items per page")

    # Maximum allowed items per page (prevents abuse)
    pagination_max_page_size: int = Field(default=500, ge=1, le=10000, description="Maximum allowed items per page")

    # Minimum items per page
    pagination_min_page_size: int = Field(default=1, ge=1, description="Minimum items per page")

    # Threshold (total rows) for switching from offset to cursor-based pagination
    pagination_cursor_threshold: int = Field(default=10000, ge=1, description="Threshold for cursor-based pagination")

    # Enable cursor-based pagination globally
    pagination_cursor_enabled: bool = Field(default=True, description="Enable cursor-based pagination")

    # Default sort field for paginated queries
    pagination_default_sort_field: str = Field(default="created_at", description="Default sort field")

    # Default sort order for paginated queries; constrained to asc/desc
    pagination_default_sort_order: str = Field(default="desc", pattern="^(asc|desc)$", description="Default sort order")

    # Maximum offset allowed for offset-based pagination (prevents abuse)
    pagination_max_offset: int = Field(default=100000, ge=0, description="Maximum offset for pagination")

    # Cache pagination counts for performance (seconds)
    pagination_count_cache_ttl: int = Field(default=300, ge=0, description="Cache TTL for pagination counts")

    # Enable pagination links in API responses
    pagination_include_links: bool = Field(default=True, description="Include pagination links")

    # Base URL for pagination links (defaults to request URL when None)
    pagination_base_url: Optional[str] = Field(default=None, description="Base URL for pagination links")
    # Ed25519 keys for signing. The *_public_key fields are not user-set: they
    # are derived from the corresponding private keys in derive_public_keys().
    enable_ed25519_signing: bool = Field(default=False, description="Enable Ed25519 signing for certificates")
    prev_ed25519_private_key: SecretStr = Field(default=SecretStr(""), description="Previous Ed25519 private key for signing")
    prev_ed25519_public_key: Optional[str] = Field(default=None, description="Derived previous Ed25519 public key")
    ed25519_private_key: SecretStr = Field(default=SecretStr(""), description="Ed25519 private key for signing")
    ed25519_public_key: Optional[str] = Field(default=None, description="Derived Ed25519 public key")
2238 @model_validator(mode="after")
2239 def derive_public_keys(self) -> "Settings":
2240 """
2241 Derive public keys after all individual field validations are complete.
2243 Returns:
2244 Settings: The updated Settings instance with derived public keys.
2245 """
2246 for private_key_field in ["ed25519_private_key", "prev_ed25519_private_key"]:
2247 public_key_field = private_key_field.replace("private", "public")
2249 # 1. Get the private key SecretStr object
2250 private_key_secret: SecretStr = getattr(self, private_key_field)
2252 # 2. Proceed only if a key is present and the public key hasn't been set
2253 pem = private_key_secret.get_secret_value().strip()
2254 if not pem:
2255 continue
2257 try:
2258 # Load the private key
2259 private_key = serialization.load_pem_private_key(pem.encode(), password=None)
2260 if not isinstance(private_key, ed25519.Ed25519PrivateKey):
2261 # This check is useful, though model_validator should not raise
2262 # for an invalid key if the field validator has already passed.
2263 continue
2265 # Derive and PEM-encode the public key
2266 public_key = private_key.public_key()
2267 public_pem = public_key.public_bytes(
2268 encoding=serialization.Encoding.PEM,
2269 format=serialization.PublicFormat.SubjectPublicKeyInfo,
2270 ).decode()
2272 # 3. Set the public key attribute directly on the model instance (self)
2273 setattr(self, public_key_field, public_pem)
2274 # logger.info(f"Derived and stored {public_key_field} automatically.")
2276 except Exception:
2277 logger.warning("Failed to derive public key for private_key")
2278 # You can choose to raise an error here if a failure should halt model creation
2280 return self
    def __init__(self, **kwargs: Any) -> None:
        """Initialize Settings with environment variable parsing.

        After pydantic's own loading, applies post-processing that pydantic
        fields cannot express: passthrough-header parsing, environment-aware
        CORS defaults, MCP auth-policy derivation, and proxy-auth sanity checks.

        Args:
            **kwargs: Keyword arguments passed to parent Settings class

        Raises:
            ValueError: When environment variable parsing fails or produces invalid data

        Examples:
            >>> import os
            >>> # Test with no environment variable set
            >>> old_val = os.environ.get('DEFAULT_PASSTHROUGH_HEADERS')
            >>> if 'DEFAULT_PASSTHROUGH_HEADERS' in os.environ:
            ...     del os.environ['DEFAULT_PASSTHROUGH_HEADERS']
            >>> s = Settings()
            >>> s.default_passthrough_headers
            ['X-Tenant-Id', 'X-Trace-Id']
            >>> # Restore original value if it existed
            >>> if old_val is not None:
            ...     os.environ['DEFAULT_PASSTHROUGH_HEADERS'] = old_val
        """
        super().__init__(**kwargs)

        # Parse DEFAULT_PASSTHROUGH_HEADERS environment variable.
        # Accepts a JSON array first, then falls back to CSV.
        default_value = os.environ.get("DEFAULT_PASSTHROUGH_HEADERS")
        if default_value:
            try:
                # Try JSON parsing first
                self.default_passthrough_headers = orjson.loads(default_value)
                if not isinstance(self.default_passthrough_headers, list):
                    raise ValueError("Must be a JSON array")
            except (orjson.JSONDecodeError, ValueError):
                # Fallback to comma-separated parsing
                self.default_passthrough_headers = [h.strip() for h in default_value.split(",") if h.strip()]
                logger.info(f"Parsed comma-separated passthrough headers: {self.default_passthrough_headers}")
        else:
            # Safer defaults without Authorization header
            self.default_passthrough_headers = ["X-Tenant-Id", "X-Trace-Id"]

        # Configure environment-aware CORS origins if not explicitly set via env or kwargs.
        # Only apply defaults if allowed_origins still holds the field's default value.
        if not os.environ.get("ALLOWED_ORIGINS") and "allowed_origins" not in kwargs and self.allowed_origins == {"http://localhost", "http://localhost:4444"}:
            if self.environment == "development":
                # Development: common local dev server ports plus the gateway's own port.
                self.allowed_origins = {
                    "http://localhost",
                    "http://localhost:3000",
                    "http://localhost:8080",
                    "http://127.0.0.1:3000",
                    "http://127.0.0.1:8080",
                    f"http://localhost:{self.port}",
                    f"http://127.0.0.1:{self.port}",
                }
            else:
                # Production origins - construct from app_domain (extract hostname from HttpUrl)
                app_domain_host = urlparse(str(self.app_domain)).hostname or "localhost"
                self.allowed_origins = {f"https://{app_domain_host}", f"https://app.{app_domain_host}", f"https://admin.{app_domain_host}"}

        # MCP transport auth policy:
        # - If MCP_REQUIRE_AUTH is unset, derive it from AUTH_REQUIRED
        # - If AUTH_REQUIRED=true but MCP_REQUIRE_AUTH=false is explicit, emit a warning
        if self.mcp_require_auth is None:
            self.mcp_require_auth = bool(self.auth_required)
            logger.info(
                "MCP_REQUIRE_AUTH not set; defaulting to %s to match AUTH_REQUIRED=%s.",
                self.mcp_require_auth,
                self.auth_required,
            )
        elif self.auth_required and self.mcp_require_auth is False:
            logger.warning("AUTH_REQUIRED=true but MCP_REQUIRE_AUTH=false. MCP endpoints (/servers/*/mcp) allow unauthenticated access to public items.")

        # Validate proxy auth configuration: trusting proxy identity headers while
        # MCP client auth is off is only honored with the explicit "dangerously" flag.
        if not self.mcp_client_auth_enabled and self.trust_proxy_auth and not self.trust_proxy_auth_dangerously:
            logger.warning(
                "TRUST_PROXY_AUTH=true ignored because TRUST_PROXY_AUTH_DANGEROUSLY is false "
                "while MCP_CLIENT_AUTH_ENABLED=false. Set TRUST_PROXY_AUTH_DANGEROUSLY=true "
                "only behind a strictly trusted authentication proxy."
            )
            self.trust_proxy_auth = False
        elif not self.mcp_client_auth_enabled and self.trust_proxy_auth and self.trust_proxy_auth_dangerously:
            logger.warning("TRUST_PROXY_AUTH_DANGEROUSLY=true acknowledged. Requests may trust identity headers from the upstream proxy.")
        elif not self.mcp_client_auth_enabled and not self.trust_proxy_auth:
            logger.warning(
                "MCP client authentication is disabled but trust_proxy_auth is not set. "
                "This is a security risk! Set TRUST_PROXY_AUTH=true only if ContextForge "
                "is behind a trusted authentication proxy."
            )

        # Surface the (acknowledged) risk of unauthenticated admin context.
        if not self.auth_required and self.allow_unauthenticated_admin:
            logger.warning("ALLOW_UNAUTHENTICATED_ADMIN=true acknowledged while AUTH_REQUIRED=false. Unauthenticated requests may receive admin context.")
    # Masking value for all sensitive data rendered to users/logs/UI.
    masked_auth_value: str = "*****"
2376 def log_summary(self) -> None:
2377 """
2378 Log a summary of the application settings.
2380 Dumps the current settings to a dictionary while excluding sensitive
2381 information such as `database_url` and `memcached_url`, and logs it
2382 at the INFO level.
2384 This method is useful for debugging or auditing purposes without
2385 exposing credentials or secrets in logs.
2386 """
2387 summary = self.model_dump(exclude={"database_url", "memcached_url"})
2388 logger.info(f"Application settings summary: {summary}")
    # Prometheus metrics configuration (UPPER_CASE names match their env vars).
    ENABLE_METRICS: bool = Field(False, description="Enable Prometheus metrics endpoint at /metrics/prometheus (requires authentication)")
    METRICS_EXCLUDED_HANDLERS: str = Field("", description="Comma-separated regex patterns for paths to exclude from metrics")
    METRICS_NAMESPACE: str = Field("default", description="Prometheus metrics namespace")
    METRICS_SUBSYSTEM: str = Field("", description="Prometheus metrics subsystem")
    METRICS_CUSTOM_LABELS: str = Field("", description='Comma-separated "key=value" pairs for static custom labels')
@lru_cache()
def get_settings(**kwargs: Any) -> Settings:
    """Build, validate, and cache the process-wide Settings instance.

    Args:
        **kwargs: Keyword arguments to pass to the Settings setup.

    Returns:
        Settings: A cached instance of the Settings class.

    Examples:
        >>> settings = get_settings()
        >>> isinstance(settings, Settings)
        True
        >>> # Second call returns the same cached instance
        >>> settings2 = get_settings()
        >>> settings is settings2
        True
    """
    # Load from env vars / .env exactly once; lru_cache memoizes the result.
    config = Settings(**kwargs)
    # Fail fast on a mis-configured transport type.
    config.validate_transport()
    # Ensure sqlite DB directories exist if needed.
    config.validate_database()
    return config
def generate_settings_schema() -> dict[str, Any]:
    """Return the JSON Schema describing the Settings model.

    Useful for validating external configuration or generating documentation.

    Returns:
        dict: A dictionary representing the JSON Schema of the Settings model.
    """
    schema = Settings.model_json_schema(mode="validation")
    return schema
# Lazy "instance" of settings
class LazySettingsWrapper:
    """Proxy that defers Settings construction until first attribute access."""

    @property
    def plugins(self) -> Any:
        """Access plugin framework settings via ``settings.plugins``.

        Returns a ``LazySettingsWrapper`` from the plugin framework that
        provides lightweight ``@property`` accessors for startup-critical
        fields and a ``__getattr__`` fallback to the full ``PluginsSettings``.

        Returns:
            The plugin framework settings wrapper.
        """
        # First-Party
        from mcpgateway.plugins.framework.settings import settings as _plugin_settings  # pylint: disable=import-outside-toplevel

        return _plugin_settings

    def __getattr__(self, key: str) -> Any:
        """Resolve the real (cached) settings object and forward attribute access.

        Args:
            key: The key to fetch from settings

        Returns:
            Any: The value of the attribute on the settings
        """
        real_settings = get_settings()
        return getattr(real_settings, key)
# Module-level singleton; attribute access lazily resolves via get_settings().
settings = LazySettingsWrapper()
if __name__ == "__main__":
    # `python -m mcpgateway.config --schema` prints the JSON Schema and exits.
    if "--schema" in sys.argv:
        schema = generate_settings_schema()
        print(orjson.dumps(schema, option=orjson.OPT_INDENT_2).decode())
        sys.exit(0)
    # Default action: emit the sanitized settings summary to the log.
    settings.log_summary()