Coverage for mcpgateway / config.py: 100%

839 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/config.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti, Manav Gupta 

6 

7MCP Gateway Configuration. 

8This module defines configuration settings for the MCP Gateway using Pydantic. 

9It loads configuration from environment variables with sensible defaults. 

10 

11Environment variables: 

12- APP_NAME: Gateway name (default: "MCP_Gateway") 

13- HOST: Host to bind to (default: "127.0.0.1") 

14- PORT: Port to listen on (default: 4444) 

15- DATABASE_URL: SQLite database URL (default: "sqlite:///./mcp.db") 

16- BASIC_AUTH_USER: Username for API Basic auth when enabled (default: "admin") 

17- BASIC_AUTH_PASSWORD: Password for API Basic auth when enabled (default: "changeme") 

18- LOG_LEVEL: Logging level (default: "INFO") 

19- SKIP_SSL_VERIFY: Disable SSL verification (default: False) 

20- AUTH_REQUIRED: Require authentication (default: True) 

21- TRANSPORT_TYPE: Transport mechanisms (default: "all") 

22- DOCS_ALLOW_BASIC_AUTH: Allow basic auth for docs (default: False) 

23- RESOURCE_CACHE_SIZE: Max cached resources (default: 1000) 

24- RESOURCE_CACHE_TTL: Cache TTL in seconds (default: 3600) 

25- TOOL_TIMEOUT: Tool invocation timeout (default: 60) 

26- PROMPT_CACHE_SIZE: Max cached prompts (default: 100) 

27- HEALTH_CHECK_INTERVAL: Gateway health check interval (default: 300) 

28- REQUIRE_TOKEN_EXPIRATION: Require JWT tokens to have expiration (default: True) 

29- REQUIRE_JTI: Require JTI claim in tokens for revocation (default: True) 

30- REQUIRE_USER_IN_DB: Require all users to exist in database (default: False) 

31 

32Examples: 

33 >>> from mcpgateway.config import Settings 

34 >>> s = Settings(basic_auth_user='admin', basic_auth_password='secret') 

35 >>> s.api_key 

36 'admin:secret' 

37 >>> s2 = Settings(transport_type='http') 

38 >>> s2.validate_transport() # no error 

39 >>> s3 = Settings(transport_type='invalid') 

40 >>> try: 

41 ... s3.validate_transport() 

42 ... except ValueError as e: 

43 ... print('error') 

44 error 

45 >>> s4 = Settings(database_url='sqlite:///./test.db') 

46 >>> isinstance(s4.database_settings, dict) 

47 True 

48""" 

49 

50# Standard 

51from functools import lru_cache 

52from importlib.resources import files 

53import logging 

54import os 

55from pathlib import Path 

56import re 

57import sys 

58from typing import Annotated, Any, ClassVar, Dict, List, Literal, NotRequired, Optional, Self, Set, TypedDict 

59 

60# Third-Party 

61from cryptography.hazmat.primitives import serialization 

62from cryptography.hazmat.primitives.asymmetric import ed25519 

63import orjson 

64from pydantic import Field, field_validator, HttpUrl, model_validator, PositiveInt, SecretStr, ValidationInfo 

65from pydantic_settings import BaseSettings, NoDecode, SettingsConfigDict 

66 

67# Only configure basic logging if no handlers exist yet 

68# This prevents conflicts with LoggingService while ensuring config logging works 

69if not logging.getLogger().handlers: 

70 logging.basicConfig( 

71 level=logging.INFO, 

72 format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", 

73 datefmt="%Y-%m-%dT%H:%M:%S", 

74 ) 

75 

76logger = logging.getLogger(__name__) 

77 

78 

79def _normalize_env_list_vars() -> None: 

80 """Normalize list-typed env vars to valid JSON arrays. 

81 

82 Ensures env values parse cleanly when providers expect JSON for complex types. 

83 If a value is empty or CSV, convert to a JSON array string. 

84 """ 

85 keys = [ 

86 "SSO_TRUSTED_DOMAINS", 

87 "SSO_AUTO_ADMIN_DOMAINS", 

88 "SSO_GITHUB_ADMIN_ORGS", 

89 "SSO_GOOGLE_ADMIN_DOMAINS", 

90 "SSO_ENTRA_ADMIN_GROUPS", 

91 "LOG_DETAILED_SKIP_ENDPOINTS", 

92 ] 

93 for key in keys: 

94 raw = os.environ.get(key) 

95 if raw is None: 

96 continue 

97 s = raw.strip() 

98 if not s: 

99 os.environ[key] = "[]" 

100 continue 

101 if s.startswith("["): 

102 # Already JSON-like, keep as is 

103 try: 

104 orjson.loads(s) 

105 continue 

106 except Exception: 

107 pass # nosec B110 - Intentionally continue with CSV parsing if JSON parsing fails 

108 # Convert CSV to JSON array 

109 items = [item.strip() for item in s.split(",") if item.strip()] 

110 os.environ[key] = orjson.dumps(items).decode() 

111 

112 

113_normalize_env_list_vars() 

114 

115 

116# Default content type for outgoing requests to Forge 

117FORGE_CONTENT_TYPE = os.getenv("FORGE_CONTENT_TYPE", "application/json") 

118 

119 

120class Settings(BaseSettings): 

121 """ 

122 MCP Gateway configuration settings. 

123 

124 Examples: 

125 >>> from mcpgateway.config import Settings 

126 >>> s = Settings(basic_auth_user='admin', basic_auth_password='secret') 

127 >>> s.api_key 

128 'admin:secret' 

129 >>> s2 = Settings(transport_type='http') 

130 >>> s2.validate_transport() # no error 

131 >>> s3 = Settings(transport_type='invalid') 

132 >>> try: 

133 ... s3.validate_transport() 

134 ... except ValueError as e: 

135 ... print('error') 

136 error 

137 >>> s4 = Settings(database_url='sqlite:///./test.db') 

138 >>> isinstance(s4.database_settings, dict) 

139 True 

140 >>> s5 = Settings() 

141 >>> s5.app_name 

142 'MCP_Gateway' 

143 >>> s5.host in ('0.0.0.0', '127.0.0.1') # Default can be either 

144 True 

145 >>> s5.port 

146 4444 

147 >>> s5.auth_required 

148 True 

149 >>> isinstance(s5.allowed_origins, set) 

150 True 

151 >>> s6 = Settings(log_detailed_skip_endpoints=["/metrics", "/health"]) 

152 >>> s6.log_detailed_skip_endpoints 

153 ['/metrics', '/health'] 

154 >>> s7 = Settings(log_detailed_sample_rate=0.5) 

155 >>> s7.log_detailed_sample_rate 

156 0.5 

157 >>> s8 = Settings(log_resolve_user_identity=True) 

158 >>> s8.log_resolve_user_identity 

159 True 

160 >>> s9 = Settings() 

161 >>> s9.log_detailed_skip_endpoints 

162 [] 

163 >>> s9.log_detailed_sample_rate 

164 1.0 

165 >>> s9.log_resolve_user_identity 

166 False 

167 """ 

168 

169 # Basic Settings 

170 app_name: str = "MCP_Gateway" 

171 host: str = "127.0.0.1" 

172 port: PositiveInt = Field(default=4444, ge=1, le=65535) 

173 client_mode: bool = False 

174 docs_allow_basic_auth: bool = False # Allow basic auth for docs 

175 api_allow_basic_auth: bool = Field( 

176 default=False, 

177 description="Allow Basic authentication for API endpoints. Disabled by default for security. Use JWT or API tokens instead.", 

178 ) 

179 database_url: str = Field( 

180 default="sqlite:///./mcp.db", 

181 description=( 

182 "Database connection URL. Supports SQLite, PostgreSQL, MySQL/MariaDB. " 

183 "For PostgreSQL with custom schema, use the 'options' query parameter: " 

184 "postgresql://user:pass@host:5432/db?options=-c%20search_path=schema_name " 

185 "(See Issue #1535 for details)" 

186 ), 

187 ) 

188 

189 # Absolute paths resolved at import-time (still override-able via env vars) 

190 templates_dir: Path = Field(default_factory=lambda: Path(str(files("mcpgateway") / "templates"))) 

191 static_dir: Path = Field(default_factory=lambda: Path(str(files("mcpgateway") / "static"))) 

192 

193 # Template auto-reload: False for production (default), True for development 

194 # Disabling prevents re-parsing templates on each request, improving performance under load 

195 # Use TEMPLATES_AUTO_RELOAD=true for development (make dev sets this automatically) 

196 templates_auto_reload: bool = Field(default=False, description="Auto-reload Jinja2 templates on change (enable for development)") 

197 

198 app_root_path: str = "" 

199 

200 # Protocol 

201 protocol_version: str = "2025-06-18" 

202 

203 # Authentication 

204 basic_auth_user: str = "admin" 

205 basic_auth_password: SecretStr = Field(default=SecretStr("changeme")) 

206 jwt_algorithm: str = "HS256" 

207 jwt_secret_key: SecretStr = Field(default=SecretStr("my-test-key")) 

208 jwt_public_key_path: str = "" 

209 jwt_private_key_path: str = "" 

210 jwt_audience: str = "mcpgateway-api" 

211 jwt_issuer: str = "mcpgateway" 

212 jwt_audience_verification: bool = True 

213 jwt_issuer_verification: bool = True 

214 auth_required: bool = True 

215 token_expiry: int = 10080 # minutes 

216 

217 require_token_expiration: bool = Field(default=True, description="Require all JWT tokens to have expiration claims (secure default)") 

218 require_jti: bool = Field(default=True, description="Require JTI (JWT ID) claim in all tokens for revocation support (secure default)") 

219 require_user_in_db: bool = Field( 

220 default=False, 

221 description="Require all authenticated users to exist in the database. When true, disables the platform admin bootstrap mechanism. WARNING: Enabling this on a fresh deployment will lock you out.", 

222 ) 

223 embed_environment_in_tokens: bool = Field(default=False, description="Embed environment claim in gateway-issued JWTs for environment isolation") 

224 validate_token_environment: bool = Field(default=False, description="Reject tokens with mismatched environment claim (tokens without env claim are allowed)") 

225 

226 # JSON Schema Validation for registration (Tool Input Schemas, Prompt schemas, etc) 

227 json_schema_validation_strict: bool = Field(default=True, description="Strict schema validation mode - reject invalid JSON schemas") 

228 

229 # SSO Configuration 

230 sso_enabled: bool = Field(default=False, description="Enable Single Sign-On authentication") 

231 sso_github_enabled: bool = Field(default=False, description="Enable GitHub OAuth authentication") 

232 sso_github_client_id: Optional[str] = Field(default=None, description="GitHub OAuth client ID") 

233 sso_github_client_secret: Optional[SecretStr] = Field(default=None, description="GitHub OAuth client secret") 

234 

235 sso_google_enabled: bool = Field(default=False, description="Enable Google OAuth authentication") 

236 sso_google_client_id: Optional[str] = Field(default=None, description="Google OAuth client ID") 

237 sso_google_client_secret: Optional[SecretStr] = Field(default=None, description="Google OAuth client secret") 

238 

239 sso_ibm_verify_enabled: bool = Field(default=False, description="Enable IBM Security Verify OIDC authentication") 

240 sso_ibm_verify_client_id: Optional[str] = Field(default=None, description="IBM Security Verify client ID") 

241 sso_ibm_verify_client_secret: Optional[SecretStr] = Field(default=None, description="IBM Security Verify client secret") 

242 sso_ibm_verify_issuer: Optional[str] = Field(default=None, description="IBM Security Verify OIDC issuer URL") 

243 

244 sso_okta_enabled: bool = Field(default=False, description="Enable Okta OIDC authentication") 

245 sso_okta_client_id: Optional[str] = Field(default=None, description="Okta client ID") 

246 sso_okta_client_secret: Optional[SecretStr] = Field(default=None, description="Okta client secret") 

247 sso_okta_issuer: Optional[str] = Field(default=None, description="Okta issuer URL") 

248 

249 sso_keycloak_enabled: bool = Field(default=False, description="Enable Keycloak OIDC authentication") 

250 sso_keycloak_base_url: Optional[str] = Field(default=None, description="Keycloak base URL (e.g., https://keycloak.example.com)") 

251 sso_keycloak_realm: str = Field(default="master", description="Keycloak realm name") 

252 sso_keycloak_client_id: Optional[str] = Field(default=None, description="Keycloak client ID") 

253 sso_keycloak_client_secret: Optional[SecretStr] = Field(default=None, description="Keycloak client secret") 

254 sso_keycloak_map_realm_roles: bool = Field(default=True, description="Map Keycloak realm roles to gateway teams") 

255 sso_keycloak_map_client_roles: bool = Field(default=False, description="Map Keycloak client roles to gateway RBAC") 

256 sso_keycloak_username_claim: str = Field(default="preferred_username", description="JWT claim for username") 

257 

258 # Security Validation & Sanitization 

259 experimental_validate_io: bool = Field(default=False, description="Enable experimental input validation and output sanitization") 

260 validation_middleware_enabled: bool = Field(default=False, description="Enable validation middleware for all requests") 

261 validation_strict: bool = Field(default=True, description="Strict validation mode - reject on violations") 

262 sanitize_output: bool = Field(default=True, description="Sanitize output to remove control characters") 

263 allowed_roots: List[str] = Field(default_factory=list, description="Allowed root paths for resource access") 

264 max_path_depth: int = Field(default=10, description="Maximum allowed path depth") 

265 max_param_length: int = Field(default=10000, description="Maximum parameter length") 

266 dangerous_patterns: List[str] = Field( 

267 default_factory=lambda: [ 

268 r"[;&|`$(){}\[\]<>]", # Shell metacharacters 

269 r"\.\.[\\/]", # Path traversal 

270 r"[\x00-\x1f\x7f-\x9f]", # Control characters 

271 ], 

272 description="Regex patterns for dangerous input", 

273 ) 

274 

275 sso_keycloak_email_claim: str = Field(default="email", description="JWT claim for email") 

276 sso_keycloak_groups_claim: str = Field(default="groups", description="JWT claim for groups/roles") 

277 

278 sso_entra_enabled: bool = Field(default=False, description="Enable Microsoft Entra ID OIDC authentication") 

279 sso_entra_client_id: Optional[str] = Field(default=None, description="Microsoft Entra ID client ID") 

280 sso_entra_client_secret: Optional[SecretStr] = Field(default=None, description="Microsoft Entra ID client secret") 

281 sso_entra_tenant_id: Optional[str] = Field(default=None, description="Microsoft Entra ID tenant ID") 

282 sso_entra_groups_claim: str = Field(default="groups", description="JWT claim for EntraID groups (groups/roles)") 

283 sso_entra_admin_groups: Annotated[list[str], NoDecode()] = Field(default_factory=list, description="EntraID groups granting platform_admin role (CSV/JSON)") 

284 sso_entra_role_mappings: Dict[str, str] = Field(default_factory=dict, description="Map EntraID groups to Context Forge roles (JSON: {group_id: role_name})") 

285 sso_entra_default_role: Optional[str] = Field(default=None, description="Default role for EntraID users without group mapping (None = no role assigned)") 

286 sso_entra_sync_roles_on_login: bool = Field(default=True, description="Synchronize role assignments on each login") 

287 

288 sso_generic_enabled: bool = Field(default=False, description="Enable generic OIDC provider (Keycloak, Auth0, etc.)") 

289 sso_generic_provider_id: Optional[str] = Field(default=None, description="Provider ID (e.g., 'keycloak', 'auth0', 'authentik')") 

290 sso_generic_display_name: Optional[str] = Field(default=None, description="Display name shown on login page") 

291 sso_generic_client_id: Optional[str] = Field(default=None, description="Generic OIDC client ID") 

292 sso_generic_client_secret: Optional[SecretStr] = Field(default=None, description="Generic OIDC client secret") 

293 sso_generic_authorization_url: Optional[str] = Field(default=None, description="Authorization endpoint URL") 

294 sso_generic_token_url: Optional[str] = Field(default=None, description="Token endpoint URL") 

295 sso_generic_userinfo_url: Optional[str] = Field(default=None, description="Userinfo endpoint URL") 

296 sso_generic_issuer: Optional[str] = Field(default=None, description="OIDC issuer URL") 

297 sso_generic_scope: Optional[str] = Field(default="openid profile email", description="OAuth scopes (space-separated)") 

298 

299 # SSO Settings 

300 sso_auto_create_users: bool = Field(default=True, description="Automatically create users from SSO providers") 

301 sso_trusted_domains: Annotated[list[str], NoDecode()] = Field(default_factory=list, description="Trusted email domains (CSV or JSON list)") 

302 sso_preserve_admin_auth: bool = Field(default=True, description="Preserve local admin authentication when SSO is enabled") 

303 

304 # SSO Admin Assignment Settings 

305 sso_auto_admin_domains: Annotated[list[str], NoDecode()] = Field(default_factory=list, description="Admin domains (CSV or JSON list)") 

306 sso_github_admin_orgs: Annotated[list[str], NoDecode()] = Field(default_factory=list, description="GitHub orgs granting admin (CSV/JSON)") 

307 sso_google_admin_domains: Annotated[list[str], NoDecode()] = Field(default_factory=list, description="Google admin domains (CSV/JSON)") 

308 sso_require_admin_approval: bool = Field(default=False, description="Require admin approval for new SSO registrations") 

309 

310 # MCP Client Authentication 

311 mcp_client_auth_enabled: bool = Field(default=True, description="Enable JWT authentication for MCP client operations") 

312 mcp_require_auth: bool = Field( 

313 default=False, 

314 description="Require authentication for /mcp endpoints. If false, unauthenticated requests can access public items only. " "If true, all /mcp requests must include a valid Bearer token.", 

315 ) 

316 trust_proxy_auth: bool = Field( 

317 default=False, 

318 description="Trust proxy authentication headers (required when mcp_client_auth_enabled=false)", 

319 ) 

320 proxy_user_header: str = Field(default="X-Authenticated-User", description="Header containing authenticated username from proxy") 

321 

322 # Encryption key phrase for auth storage 

323 auth_encryption_secret: SecretStr = Field(default=SecretStr("my-test-salt")) 

324 

325 # Query Parameter Authentication (INSECURE - disabled by default) 

326 insecure_allow_queryparam_auth: bool = Field( 

327 default=False, 

328 description=("Enable query parameter authentication for gateway peers. " "WARNING: API keys may appear in proxy logs. See CWE-598."), 

329 ) 

330 insecure_queryparam_auth_allowed_hosts: List[str] = Field( 

331 default_factory=list, 

332 description=("Allowlist of hosts permitted to use query parameter auth. " "Empty list allows any host when feature is enabled. " "Format: ['mcp.tavily.com', 'api.example.com']"), 

333 ) 

334 

335 # =================================== 

336 # SSRF Protection Configuration 

337 # =================================== 

338 # Server-Side Request Forgery (SSRF) protection prevents the gateway from being 

339 # used to access internal resources or cloud metadata services. 

340 

341 ssrf_protection_enabled: bool = Field( 

342 default=True, 

343 description="Enable SSRF protection for gateway/tool URLs. Blocks access to dangerous endpoints.", 

344 ) 

345 

346 ssrf_blocked_networks: List[str] = Field( 

347 default=[ 

348 # Cloud metadata services (ALWAYS dangerous - credential exposure) 

349 "169.254.169.254/32", # AWS/GCP/Azure instance metadata 

350 "169.254.169.123/32", # AWS NTP service 

351 "fd00::1/128", # IPv6 cloud metadata 

352 # Link-local (often used for cloud metadata) 

353 "169.254.0.0/16", # Full link-local IPv4 range 

354 "fe80::/10", # IPv6 link-local 

355 ], 

356 description=( 

357 "CIDR ranges to block for SSRF protection. These are ALWAYS blocked regardless of other settings. " "Default blocks cloud metadata endpoints. Add private ranges for stricter security." 

358 ), 

359 ) 

360 

361 ssrf_blocked_hosts: List[str] = Field( 

362 default=[ 

363 "metadata.google.internal", # GCP metadata hostname 

364 "metadata.internal", # Generic cloud metadata 

365 ], 

366 description="Hostnames to block for SSRF protection. Matched case-insensitively.", 

367 ) 

368 

369 ssrf_allow_localhost: bool = Field( 

370 default=True, 

371 description=("Allow localhost/loopback addresses (127.0.0.0/8, ::1). " "Set to false to block localhost access for stricter security. " "Default true for development compatibility."), 

372 ) 

373 

374 ssrf_allow_private_networks: bool = Field( 

375 default=True, 

376 description=( 

377 "Allow RFC 1918 private network addresses (10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16). " 

378 "Set to false if the gateway should only access public internet endpoints. " 

379 "Default true for internal deployment compatibility." 

380 ), 

381 ) 

382 

383 ssrf_dns_fail_closed: bool = Field( 

384 default=False, 

385 description=( 

386 "Fail closed on DNS resolution errors. When true, URLs that cannot be resolved " 

387 "are rejected. When false (default), unresolvable hostnames are allowed through " 

388 "(hostname blocklist still applies). Set to true for stricter security." 

389 ), 

390 ) 

391 

392 # OAuth Configuration 

393 oauth_request_timeout: int = Field(default=30, description="OAuth request timeout in seconds") 

394 oauth_max_retries: int = Field(default=3, description="Maximum retries for OAuth token requests") 

395 oauth_default_timeout: int = Field(default=3600, description="Default OAuth token timeout in seconds") 

396 

397 # =================================== 

398 # Dynamic Client Registration (DCR) - Client Mode 

399 # =================================== 

400 

401 # Enable DCR client functionality 

402 dcr_enabled: bool = Field(default=True, description="Enable Dynamic Client Registration (RFC 7591) - gateway acts as DCR client") 

403 

404 # Auto-register when missing credentials 

405 dcr_auto_register_on_missing_credentials: bool = Field(default=True, description="Automatically register with AS when gateway has issuer but no client_id") 

406 

407 # Default scopes for DCR 

408 dcr_default_scopes: List[str] = Field(default=["mcp:read"], description="Default MCP scopes to request during DCR") 

409 

410 # Issuer allowlist (empty = allow any) 

411 dcr_allowed_issuers: List[str] = Field(default_factory=list, description="Optional allowlist of issuer URLs for DCR (empty = allow any)") 

412 

413 # Token endpoint auth method 

414 dcr_token_endpoint_auth_method: str = Field(default="client_secret_basic", description="Token endpoint auth method for DCR (client_secret_basic or client_secret_post)") 

415 

416 # Metadata cache TTL 

417 dcr_metadata_cache_ttl: int = Field(default=3600, description="AS metadata cache TTL in seconds (RFC 8414 discovery)") 

418 

419 # Client name template 

420 dcr_client_name_template: str = Field(default="MCP Gateway ({gateway_name})", description="Template for client_name in DCR requests") 

421 

422 # Refresh token behavior 

423 dcr_request_refresh_token_when_unsupported: bool = Field( 

424 default=False, 

425 description="Request refresh_token even when AS metadata omits grant_types_supported. Enable for AS servers that support refresh tokens but don't advertise it.", 

426 ) 

427 

428 # =================================== 

429 # OAuth Discovery (RFC 8414) 

430 # =================================== 

431 

432 oauth_discovery_enabled: bool = Field(default=True, description="Enable OAuth AS metadata discovery (RFC 8414)") 

433 

434 oauth_preferred_code_challenge_method: str = Field(default="S256", description="Preferred PKCE code challenge method (S256 or plain)") 

435 

436 # Email-Based Authentication 

437 email_auth_enabled: bool = Field(default=True, description="Enable email-based authentication") 

438 public_registration_enabled: bool = Field( 

439 default=False, 

440 description="Allow unauthenticated users to self-register accounts. When false, only admins can create users via /admin/users endpoint.", 

441 ) 

442 protect_all_admins: bool = Field( 

443 default=True, 

444 description="When true (default), prevent any admin from being demoted, deactivated, or locked out via API/UI. When false, only the last active admin is protected.", 

445 ) 

446 platform_admin_email: str = Field(default="admin@example.com", description="Platform administrator email address") 

447 platform_admin_password: SecretStr = Field(default=SecretStr("changeme"), description="Platform administrator password") 

448 default_user_password: SecretStr = Field(default=SecretStr("changeme"), description="Default password for new users") # nosec B105 

449 platform_admin_full_name: str = Field(default="Platform Administrator", description="Platform administrator full name") 

450 

451 # Argon2id Password Hashing Configuration 

452 argon2id_time_cost: int = Field(default=3, description="Argon2id time cost (number of iterations)") 

453 argon2id_memory_cost: int = Field(default=65536, description="Argon2id memory cost in KiB") 

454 argon2id_parallelism: int = Field(default=1, description="Argon2id parallelism (number of threads)") 

455 

456 # Password Policy Configuration 

457 password_min_length: int = Field(default=8, description="Minimum password length") 

458 password_require_uppercase: bool = Field(default=True, description="Require uppercase letters in passwords") 

459 password_require_lowercase: bool = Field(default=True, description="Require lowercase letters in passwords") 

460 password_require_numbers: bool = Field(default=False, description="Require numbers in passwords") 

461 password_require_special: bool = Field(default=True, description="Require special characters in passwords") 

462 

463 # Password change enforcement and policy toggles 

464 password_change_enforcement_enabled: bool = Field(default=True, description="Master switch for password change enforcement checks") 

465 admin_require_password_change_on_bootstrap: bool = Field(default=True, description="Force admin to change password after bootstrap") 

466 detect_default_password_on_login: bool = Field(default=True, description="Detect default password during login and mark user for change") 

467 require_password_change_for_default_password: bool = Field(default=True, description="Require password change when user is created with the default password") 

468 password_policy_enabled: bool = Field(default=True, description="Enable password complexity validation for new/changed passwords") 

469 password_prevent_reuse: bool = Field(default=True, description="Prevent reusing the current password when changing") 

470 password_max_age_days: int = Field(default=90, description="Password maximum age in days before expiry forces a change") 

471 # Account Security Configuration 

472 max_failed_login_attempts: int = Field(default=10, description="Maximum failed login attempts before account lockout") 

473 account_lockout_duration_minutes: int = Field(default=1, description="Account lockout duration in minutes") 

474 

475 # Personal Teams Configuration 

476 auto_create_personal_teams: bool = Field(default=True, description="Enable automatic personal team creation for new users") 

477 personal_team_prefix: str = Field(default="personal", description="Personal team naming prefix") 

478 max_teams_per_user: int = Field(default=50, description="Maximum number of teams a user can belong to") 

479 max_members_per_team: int = Field(default=100, description="Maximum number of members per team") 

480 invitation_expiry_days: int = Field(default=7, description="Number of days before team invitations expire") 

481 require_email_verification_for_invites: bool = Field(default=True, description="Require email verification for team invitations") 

482 

483 # UI/Admin Feature Flags 

484 mcpgateway_ui_enabled: bool = False 

485 mcpgateway_admin_api_enabled: bool = False 

486 mcpgateway_ui_airgapped: bool = Field(default=False, description="Use local CDN assets instead of external CDNs for airgapped deployments") 

487 mcpgateway_bulk_import_enabled: bool = True 

488 mcpgateway_bulk_import_max_tools: int = 200 

489 mcpgateway_bulk_import_rate_limit: int = 10 

490 

491 # UI Tool Test Configuration 

492 mcpgateway_ui_tool_test_timeout: int = Field(default=60000, description="Tool test timeout in milliseconds for the admin UI") 

493 

494 # Tool Execution Cancellation 

495 mcpgateway_tool_cancellation_enabled: bool = Field(default=True, description="Enable gateway-authoritative tool execution cancellation with REST API endpoints") 

496 

497 # A2A (Agent-to-Agent) Feature Flags 

498 mcpgateway_a2a_enabled: bool = True 

499 mcpgateway_a2a_max_agents: int = 100 

500 mcpgateway_a2a_default_timeout: int = 30 

501 mcpgateway_a2a_max_retries: int = 3 

502 mcpgateway_a2a_metrics_enabled: bool = True 

503 

504 # gRPC Support Configuration (EXPERIMENTAL - disabled by default) 

505 mcpgateway_grpc_enabled: bool = Field(default=False, description="Enable gRPC to MCP translation support (experimental feature)") 

506 mcpgateway_grpc_reflection_enabled: bool = Field(default=True, description="Enable gRPC server reflection by default") 

507 mcpgateway_grpc_max_message_size: int = Field(default=4194304, description="Maximum gRPC message size in bytes (4MB)") 

508 mcpgateway_grpc_timeout: int = Field(default=30, description="Default gRPC call timeout in seconds") 

509 mcpgateway_grpc_tls_enabled: bool = Field(default=False, description="Enable TLS for gRPC connections by default") 

510 

511 # =================================== 

512 # Performance Monitoring Configuration 

513 # =================================== 

514 mcpgateway_performance_tracking: bool = Field(default=False, description="Enable performance tracking tab in admin UI") 

515 mcpgateway_performance_collection_interval: int = Field(default=10, ge=1, le=300, description="Metric collection interval in seconds") 

516 mcpgateway_performance_retention_hours: int = Field(default=24, ge=1, le=168, description="Snapshot retention period in hours") 

517 mcpgateway_performance_retention_days: int = Field(default=90, ge=1, le=365, description="Aggregate retention period in days") 

518 mcpgateway_performance_max_snapshots: int = Field(default=10000, ge=100, le=1000000, description="Maximum performance snapshots to retain") 

519 mcpgateway_performance_distributed: bool = Field(default=False, description="Enable distributed mode metrics aggregation via Redis") 

520 mcpgateway_performance_net_connections_enabled: bool = Field(default=True, description="Enable network connections counting (can be CPU intensive)") 

521 mcpgateway_performance_net_connections_cache_ttl: int = Field(default=15, ge=1, le=300, description="Cache TTL for net_connections in seconds") 

522 

523 # MCP Server Catalog Configuration 

524 mcpgateway_catalog_enabled: bool = Field(default=True, description="Enable MCP server catalog feature") 

525 mcpgateway_catalog_file: str = Field(default="mcp-catalog.yml", description="Path to catalog configuration file") 

526 mcpgateway_catalog_auto_health_check: bool = Field(default=True, description="Automatically health check catalog servers") 

527 mcpgateway_catalog_cache_ttl: int = Field(default=3600, description="Catalog cache TTL in seconds") 

528 mcpgateway_catalog_page_size: int = Field(default=100, description="Number of catalog servers per page") 

529 

530 # MCP Gateway Bootstrap Roles In DB Configuration 

531 mcpgateway_bootstrap_roles_in_db_enabled: bool = Field(default=False, description="Enable MCP Gateway add additional roles in db") 

532 mcpgateway_bootstrap_roles_in_db_file: str = Field(default="additional_roles_in_db.json", description="Path to add additional roles in db") 

533 

534 # Elicitation support (MCP 2025-06-18) 

535 mcpgateway_elicitation_enabled: bool = Field(default=True, description="Enable elicitation passthrough support (MCP 2025-06-18)") 

536 mcpgateway_elicitation_timeout: int = Field(default=60, description="Default timeout for elicitation requests in seconds") 

537 mcpgateway_elicitation_max_concurrent: int = Field(default=100, description="Maximum concurrent elicitation requests") 

538 

539 # Security 

540 skip_ssl_verify: bool = Field( 

541 default=False, 

542 description="Skip SSL certificate verification for ALL outbound HTTPS requests " 

543 "(federation, MCP servers, LLM providers, A2A agents). " 

544 "WARNING: Only enable in dev environments with self-signed certificates.", 

545 ) 

546 cors_enabled: bool = True 

547 

548 # Environment 

549 environment: Literal["development", "staging", "production"] = Field(default="development") 

550 

551 # Domain configuration 

552 app_domain: HttpUrl = Field(default=HttpUrl("http://localhost:4444")) 

553 

554 # Security settings 

555 secure_cookies: bool = Field(default=True) 

556 cookie_samesite: str = Field(default="lax") 

557 

558 # CORS settings 

559 cors_allow_credentials: bool = Field(default=True) 

560 

561 # Security Headers Configuration 

562 security_headers_enabled: bool = Field(default=True) 

563 x_frame_options: Optional[str] = Field(default="DENY") 

564 

565 @field_validator("x_frame_options") 

566 @classmethod 

567 def normalize_x_frame_options(cls, v: Optional[str]) -> Optional[str]: 

568 """Convert string 'null' or 'none' to Python None to disable iframe restrictions. 

569 

570 Args: 

571 v: The x_frame_options value from environment/config 

572 

573 Returns: 

574 None if v is "null" or "none" (case-insensitive), otherwise returns v unchanged 

575 """ 

576 if isinstance(v, str) and v.lower() in ("null", "none"): 

577 return None 

578 return v 

579 

580 x_content_type_options_enabled: bool = Field(default=True) 

581 x_xss_protection_enabled: bool = Field(default=True) 

582 x_download_options_enabled: bool = Field(default=True) 

583 hsts_enabled: bool = Field(default=True) 

584 hsts_max_age: int = Field(default=31536000) # 1 year 

585 hsts_include_subdomains: bool = Field(default=True) 

586 remove_server_headers: bool = Field(default=True) 

587 

588 # Response Compression Configuration 

589 compression_enabled: bool = Field(default=True, description="Enable response compression (Brotli, Zstd, GZip)") 

590 compression_minimum_size: int = Field(default=500, ge=0, description="Minimum response size in bytes to compress (0 = compress all)") 

591 compression_gzip_level: int = Field(default=6, ge=1, le=9, description="GZip compression level (1=fastest, 9=best compression)") 

592 compression_brotli_quality: int = Field(default=4, ge=0, le=11, description="Brotli compression quality (0-3=fast, 4-9=balanced, 10-11=max)") 

593 compression_zstd_level: int = Field(default=3, ge=1, le=22, description="Zstd compression level (1-3=fast, 4-9=balanced, 10+=slow)") 

594 

595 # For allowed_origins, strip '' to ensure we're passing on valid JSON via env 

596 # Tell pydantic *not* to touch this env var - our validator will. 

597 allowed_origins: Annotated[Set[str], NoDecode] = { 

598 "http://localhost", 

599 "http://localhost:4444", 

600 } 

601 

602 # Security validation thresholds 

603 min_secret_length: int = 32 

604 min_password_length: int = 12 

605 require_strong_secrets: bool = False # Default to False for backward compatibility, will be enforced in 1.0.0 

606 

607 llmchat_enabled: bool = Field(default=False, description="Enable LLM Chat feature") 

608 toolops_enabled: bool = Field(default=False, description="Enable ToolOps feature") 

609 

610 # database-backed polling settings for session message delivery 

611 poll_interval: float = Field(default=1.0, description="Initial polling interval in seconds for checking new session messages") 

612 max_interval: float = Field(default=5.0, description="Maximum polling interval in seconds when the session is idle") 

613 backoff_factor: float = Field(default=1.5, description="Multiplier used to gradually increase the polling interval during inactivity") 

614 

615 # redis configurations for Maintaining Chat Sessions in multi-worker environment 

616 llmchat_session_ttl: int = Field(default=300, description="Seconds for active_session key TTL") 

617 llmchat_session_lock_ttl: int = Field(default=30, description="Seconds for lock expiry") 

618 llmchat_session_lock_retries: int = Field(default=10, description="How many times to poll while waiting") 

619 llmchat_session_lock_wait: float = Field(default=0.2, description="Seconds between polls") 

620 llmchat_chat_history_ttl: int = Field(default=3600, description="Seconds for chat history expiry") 

621 llmchat_chat_history_max_messages: int = Field(default=50, description="Maximum message history to store per user") 

622 

623 # LLM Settings (Internal API for LLM Chat) 

624 llm_api_prefix: str = Field(default="/v1", description="API prefix for internal LLM endpoints") 

625 llm_request_timeout: int = Field(default=120, description="Request timeout in seconds for LLM API calls") 

626 llm_streaming_enabled: bool = Field(default=True, description="Enable streaming responses for LLM Chat") 

627 llm_health_check_interval: int = Field(default=300, description="Provider health check interval in seconds") 

628 

629 @field_validator("allowed_roots", mode="before") 

630 @classmethod 

631 def parse_allowed_roots(cls, v): 

632 """Parse allowed roots from environment variable or config value. 

633 

634 Args: 

635 v: The input value to parse 

636 

637 Returns: 

638 list: Parsed list of allowed root paths 

639 """ 

640 if isinstance(v, str): 

641 # Support both JSON array and comma-separated values 

642 v = v.strip() 

643 if not v: 

644 return [] 

645 # Try JSON first 

646 try: 

647 loaded = orjson.loads(v) 

648 if isinstance(loaded, list): 

649 return loaded 

650 except orjson.JSONDecodeError: 

651 # Not a valid JSON array → fallback to comma-separated parsing 

652 pass 

653 # Fallback to comma-split 

654 return [x.strip() for x in v.split(",") if x.strip()] 

655 return v 

656 

657 @field_validator("jwt_secret_key", "auth_encryption_secret") 

658 @classmethod 

659 def validate_secrets(cls, v: Any, info: ValidationInfo) -> SecretStr: 

660 """ 

661 Validate that secret keys meet basic security requirements. 

662 

663 This validator is applied to the `jwt_secret_key` and `auth_encryption_secret` fields. 

664 It performs the following checks: 

665 

666 1. Detects default or weak secrets (e.g., "changeme", "secret", "password"). 

667 Logs a warning if detected. 

668 

669 2. Checks minimum length (at least 32 characters). Logs a warning if shorter. 

670 

671 3. Performs a basic entropy check (at least 10 unique characters). Logs a warning if low. 

672 

673 Notes: 

674 - Logging is used for warnings; the function does not raise exceptions. 

675 - The original value is returned as a `SecretStr` for safe handling. 

676 

677 Args: 

678 v: The secret value to validate. 

679 info: Pydantic validation info object, used to get the field name. 

680 

681 Returns: 

682 SecretStr: The validated secret value, wrapped as a SecretStr if it wasn't already. 

683 """ 

684 

685 field_name = info.field_name 

686 

687 # Extract actual string value safely 

688 if isinstance(v, SecretStr): 

689 value = v.get_secret_value() 

690 else: 

691 value = str(v) 

692 

693 # Check for default/weak secrets 

694 if not info.data.get("client_mode"): 

695 weak_secrets = ["my-test-key", "my-test-salt", "changeme", "secret", "password"] 

696 if value.lower() in weak_secrets: 

697 logger.warning(f"🔓 SECURITY WARNING - {field_name}: Default/weak secret detected! Please set a strong, unique value for production.") 

698 

699 # Check minimum length 

700 if len(value) < 32: 

701 logger.warning(f"⚠️ SECURITY WARNING - {field_name}: Secret should be at least 32 characters long. Current length: {len(value)}") 

702 

703 # Basic entropy check (at least 10 unique characters) 

704 if len(set(value)) < 10: 

705 logger.warning(f"🔑 SECURITY WARNING - {field_name}: Secret has low entropy. Consider using a more random value.") 

706 

707 # Always return SecretStr to keep it secret-safe 

708 return v if isinstance(v, SecretStr) else SecretStr(value) 

709 

710 @field_validator("basic_auth_password") 

711 @classmethod 

712 def validate_admin_password(cls, v: str | SecretStr, info: ValidationInfo) -> SecretStr: 

713 """Validate admin password meets security requirements. 

714 

715 Args: 

716 v: The admin password value to validate. 

717 info: ValidationInfo containing field data. 

718 

719 Returns: 

720 SecretStr: The validated admin password value, wrapped as SecretStr. 

721 """ 

722 # Extract actual string value safely 

723 if isinstance(v, SecretStr): 

724 value = v.get_secret_value() 

725 else: 

726 value = v 

727 

728 if not info.data.get("client_mode"): 

729 if value == "changeme": # nosec B105 - checking for default value 

730 logger.warning("🔓 SECURITY WARNING: Default BASIC_AUTH_PASSWORD detected! Please change it if you enable API_ALLOW_BASIC_AUTH.") 

731 

732 # Note: We can't access password_min_length here as it's not set yet during validation 

733 # Using default value of 8 to match the field default 

734 min_length = 8 # This matches the default in password_min_length field 

735 if len(value) < min_length: 

736 logger.warning(f"⚠️ SECURITY WARNING: Admin password should be at least {min_length} characters long. Current length: {len(value)}") 

737 

738 # Check password complexity 

739 has_upper = any(c.isupper() for c in value) 

740 has_lower = any(c.islower() for c in value) 

741 has_digit = any(c.isdigit() for c in value) 

742 has_special = bool(re.search(r'[!@#$%^&*(),.?":{}|<>]', value)) 

743 

744 complexity_score = sum([has_upper, has_lower, has_digit, has_special]) 

745 if complexity_score < 3: 

746 logger.warning("🔐 SECURITY WARNING: Admin password has low complexity. Should contain at least 3 of: uppercase, lowercase, digits, special characters") 

747 

748 # Always return SecretStr to keep it secret-safe 

749 return v if isinstance(v, SecretStr) else SecretStr(value) 

750 

751 @field_validator("allowed_origins") 

752 @classmethod 

753 def validate_cors_origins(cls, v: Any, info: ValidationInfo) -> set[str] | None: 

754 """Validate CORS allowed origins. 

755 

756 Args: 

757 v: The set of allowed origins to validate. 

758 info: ValidationInfo containing field data. 

759 

760 Returns: 

761 set: The validated set of allowed origins. 

762 

763 Raises: 

764 ValueError: If allowed_origins is not a set or list of strings. 

765 """ 

766 if v is None: 

767 return v 

768 if not isinstance(v, (set, list)): 

769 raise ValueError("allowed_origins must be a set or list of strings") 

770 

771 dangerous_origins = ["*", "null", ""] 

772 if not info.data.get("client_mode"): 

773 for origin in v: 

774 if origin in dangerous_origins: 

775 logger.warning(f"🌐 SECURITY WARNING: Dangerous CORS origin '{origin}' detected. Consider specifying explicit origins instead of wildcards.") 

776 

777 # Validate URL format 

778 if not origin.startswith(("http://", "https://")) and origin not in dangerous_origins: 

779 logger.warning(f"⚠️ SECURITY WARNING: Invalid origin format '{origin}'. Origins should start with http:// or https://") 

780 

781 return set({str(origin) for origin in v}) 

782 

783 @field_validator("database_url") 

784 @classmethod 

785 def validate_database_url(cls, v: str, info: ValidationInfo) -> str: 

786 """Validate database connection string security. 

787 

788 Args: 

789 v: The database URL to validate. 

790 info: ValidationInfo containing field data. 

791 

792 Returns: 

793 str: The validated database URL. 

794 """ 

795 # Check for hardcoded passwords in non-SQLite databases 

796 if not info.data.get("client_mode"): 

797 if not v.startswith("sqlite"): 

798 if "password" in v and any(weak in v for weak in ["password", "123", "admin", "test"]): 

799 logger.warning("Potentially weak database password detected. Consider using a stronger password.") 

800 

801 # Warn about SQLite in production 

802 if v.startswith("sqlite"): 

803 logger.info("Using SQLite database. Consider PostgreSQL or MySQL for production.") 

804 

805 return v 

806 

807 @model_validator(mode="after") 

808 def validate_security_combinations(self) -> Self: 

809 """Validate security setting combinations. Only logs warnings; no changes are made. 

810 

811 Returns: 

812 Itself. 

813 """ 

814 if not self.client_mode: 

815 # Check for dangerous combinations - only log warnings, don't raise errors 

816 if not self.auth_required and self.mcpgateway_ui_enabled: 

817 logger.warning("🔓 SECURITY WARNING: Admin UI is enabled without authentication. Consider setting AUTH_REQUIRED=true for production.") 

818 

819 if self.skip_ssl_verify and not self.dev_mode: 

820 logger.warning("🔓 SECURITY WARNING: SSL verification is disabled in non-dev mode. This is a security risk! Set SKIP_SSL_VERIFY=false for production.") 

821 

822 if self.debug and not self.dev_mode: 

823 logger.warning("🐛 SECURITY WARNING: Debug mode is enabled in non-dev mode. This may leak sensitive information! Set DEBUG=false for production.") 

824 

825 return self 

826 

827 def get_security_warnings(self) -> List[str]: 

828 """Get list of security warnings for current configuration. 

829 

830 Returns: 

831 List[str]: List of security warning messages. 

832 """ 

833 warnings = [] 

834 

835 # Authentication warnings 

836 if not self.auth_required: 

837 warnings.append("🔓 Authentication is disabled - ensure this is intentional") 

838 

839 if self.basic_auth_user == "admin": 

840 warnings.append("⚠️ Using default admin username - consider changing it") 

841 

842 # SSL/TLS warnings 

843 if self.skip_ssl_verify: 

844 warnings.append("🔓 SSL verification is disabled - not recommended for production") 

845 

846 # Debug/Dev warnings 

847 if self.debug and not self.dev_mode: 

848 warnings.append("🐛 Debug mode enabled - disable in production to prevent info leakage") 

849 

850 if self.dev_mode: 

851 warnings.append("🔧 Development mode enabled - not for production use") 

852 

853 # CORS warnings 

854 if self.cors_enabled and "*" in self.allowed_origins: 

855 warnings.append("🌐 CORS allows all origins (*) - this is a security risk") 

856 

857 # Token warnings 

858 if self.token_expiry > 10080: # More than 7 days 

859 warnings.append("⏱️ JWT token expiry is very long - consider shorter duration") 

860 

861 # Database warnings 

862 if self.database_url.startswith("sqlite") and not self.dev_mode: 

863 warnings.append("💾 SQLite database in use - consider PostgreSQL/MySQL for production") 

864 

865 # Rate limiting warnings 

866 if self.tool_rate_limit > 1000: 

867 warnings.append("🚦 Tool rate limit is very high - may allow abuse") 

868 

869 return warnings 

870 

871 class SecurityStatus(TypedDict): 

872 """TypedDict for comprehensive security status.""" 

873 

874 secure_secrets: bool 

875 auth_enabled: bool 

876 ssl_verification: bool 

877 debug_disabled: bool 

878 cors_restricted: bool 

879 ui_protected: bool 

880 warnings: List[str] 

881 security_score: int 

882 

883 def get_security_status(self) -> SecurityStatus: 

884 """Get comprehensive security status. 

885 

886 Returns: 

887 SecurityStatus: Dictionary containing security status information including score and warnings. 

888 """ 

889 

890 # Compute a security score: 100 minus 10 for each warning 

891 security_score = max(0, 100 - 10 * len(self.get_security_warnings())) 

892 

893 return { 

894 "secure_secrets": self.jwt_secret_key != "my-test-key", # nosec B105 - checking for default value 

895 "auth_enabled": self.auth_required, 

896 "ssl_verification": not self.skip_ssl_verify, 

897 "debug_disabled": not self.debug, 

898 "cors_restricted": "*" not in self.allowed_origins if self.cors_enabled else True, 

899 "ui_protected": not self.mcpgateway_ui_enabled or self.auth_required, 

900 "warnings": self.get_security_warnings(), 

901 "security_score": security_score, 

902 } 

903 

904 # Max retries for HTTP requests 

905 retry_max_attempts: int = 3 

906 retry_base_delay: float = 1.0 # seconds 

907 retry_max_delay: int = 60 # seconds 

908 retry_jitter_max: float = 0.5 # fraction of base delay 

909 

910 # HTTPX Client Configuration (for shared singleton client) 

911 # See: https://www.python-httpx.org/advanced/#pool-limits 

912 # Formula: max_connections = expected_concurrent_outbound_requests × 1.5 

913 httpx_max_connections: int = Field( 

914 default=200, 

915 ge=10, 

916 le=1000, 

917 description="Maximum total concurrent HTTP connections (global, not per-host). " "Increase for high-traffic deployments with many outbound calls.", 

918 ) 

919 httpx_max_keepalive_connections: int = Field( 

920 default=100, 

921 ge=1, 

922 le=500, 

923 description="Maximum idle keepalive connections to retain (typically 50% of max_connections)", 

924 ) 

925 httpx_keepalive_expiry: float = Field( 

926 default=30.0, 

927 ge=5.0, 

928 le=300.0, 

929 description="Seconds before idle keepalive connections are closed", 

930 ) 

931 httpx_connect_timeout: float = Field( 

932 default=5.0, 

933 ge=1.0, 

934 le=60.0, 

935 description="Timeout in seconds for establishing new connections (5s for LAN, increase for WAN)", 

936 ) 

937 httpx_read_timeout: float = Field( 

938 default=120.0, 

939 ge=1.0, 

940 le=600.0, 

941 description="Timeout in seconds for reading response data (set high for slow MCP tool calls)", 

942 ) 

943 httpx_write_timeout: float = Field( 

944 default=30.0, 

945 ge=1.0, 

946 le=600.0, 

947 description="Timeout in seconds for writing request data", 

948 ) 

949 httpx_pool_timeout: float = Field( 

950 default=10.0, 

951 ge=1.0, 

952 le=120.0, 

953 description="Timeout in seconds waiting for a connection from the pool (fail fast on exhaustion)", 

954 ) 

955 httpx_http2_enabled: bool = Field( 

956 default=False, 

957 description="Enable HTTP/2 (requires h2 package; enable only if upstreams support HTTP/2)", 

958 ) 

959 httpx_admin_read_timeout: float = Field( 

960 default=30.0, 

961 ge=1.0, 

962 le=120.0, 

963 description="Read timeout for admin UI operations (model fetching, health checks). " "Shorter than httpx_read_timeout to fail fast on admin pages.", 

964 ) 

965 

966 @field_validator("allowed_origins", mode="before") 

967 @classmethod 

968 def _parse_allowed_origins(cls, v: Any) -> Set[str]: 

969 """Parse allowed origins from environment variable or config value. 

970 

971 Handles multiple input formats for the allowed_origins field: 

972 - JSON array string: '["http://localhost", "http://example.com"]' 

973 - Comma-separated string: "http://localhost, http://example.com" 

974 - Already parsed set/list 

975 

976 Automatically strips whitespace and removes outer quotes if present. 

977 

978 Args: 

979 v: The input value to parse. Can be a string (JSON or CSV), set, list, or other iterable. 

980 

981 Returns: 

982 Set[str]: A set of allowed origin strings. 

983 

984 Examples: 

985 >>> sorted(Settings._parse_allowed_origins('["https://a.com", "https://b.com"]')) 

986 ['https://a.com', 'https://b.com'] 

987 >>> sorted(Settings._parse_allowed_origins("https://x.com , https://y.com")) 

988 ['https://x.com', 'https://y.com'] 

989 >>> Settings._parse_allowed_origins('""') 

990 set() 

991 >>> Settings._parse_allowed_origins('"https://single.com"') 

992 {'https://single.com'} 

993 >>> sorted(Settings._parse_allowed_origins(['http://a.com', 'http://b.com'])) 

994 ['http://a.com', 'http://b.com'] 

995 >>> Settings._parse_allowed_origins({'http://existing.com'}) 

996 {'http://existing.com'} 

997 """ 

998 if isinstance(v, str): 

999 v = v.strip() 

1000 if v[:1] in "\"'" and v[-1:] == v[:1]: # strip 1 outer quote pair 

1001 v = v[1:-1] 

1002 try: 

1003 parsed = set(orjson.loads(v)) 

1004 except orjson.JSONDecodeError: 

1005 parsed = {s.strip() for s in v.split(",") if s.strip()} 

1006 return parsed 

1007 return set(v) 

1008 

1009 # Logging 

1010 log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = Field(default="ERROR") 

1011 log_requests: bool = Field(default=False, description="Enable request payload logging with sensitive data masking") 

1012 log_format: Literal["json", "text"] = "json" # json or text 

1013 log_to_file: bool = False # Enable file logging (default: stdout/stderr only) 

1014 log_filemode: str = "a+" # append or overwrite 

1015 log_file: Optional[str] = None # Only used if log_to_file=True 

1016 log_folder: Optional[str] = None # Only used if log_to_file=True 

1017 

1018 # Log Rotation (optional - only used if log_to_file=True) 

1019 log_rotation_enabled: bool = False # Enable log file rotation 

1020 log_max_size_mb: int = 1 # Max file size in MB before rotation (default: 1MB) 

1021 log_backup_count: int = 5 # Number of backup files to keep (default: 5) 

1022 

1023 # Detailed Request Logging Configuration 

1024 log_detailed_max_body_size: int = Field( 

1025 default=16384, # 16KB - sensible default for request body logging 

1026 ge=1024, 

1027 le=1048576, # Max 1MB 

1028 description="Maximum request body size to log in detailed mode (bytes). Separate from log_max_size_mb which is for file rotation.", 

1029 ) 

1030 

1031 # Optional: endpoints to skip for detailed request logging (prefix match) 

1032 log_detailed_skip_endpoints: List[str] = Field( 

1033 default_factory=list, 

1034 description="List of path prefixes to skip when log_detailed_requests is enabled", 

1035 ) 

1036 

1037 # Whether to attempt resolving user identity via DB fallback when logging. 

1038 # Keep default False to avoid implicit DB queries during normal request handling. 

1039 log_resolve_user_identity: bool = Field( 

1040 default=False, 

1041 description="If true, RequestLoggingMiddleware will attempt DB fallback to resolve user identity when needed", 

1042 ) 

1043 

1044 # Sampling rate for detailed request logging (0.0-1.0). Applied when log_detailed_requests is enabled. 

1045 log_detailed_sample_rate: float = Field( 

1046 default=1.0, 

1047 ge=0.0, 

1048 le=1.0, 

1049 description="Fraction of requests to sample for detailed logging (0.0-1.0)", 

1050 ) 

1051 

1052 # Log Buffer (for in-memory storage in admin UI) 

1053 log_buffer_size_mb: float = 1.0 # Size of in-memory log buffer in MB 

1054 

1055 # =================================== 

1056 # Observability Configuration 

1057 # =================================== 

1058 

1059 # Enable observability features (traces, spans, metrics) 

1060 observability_enabled: bool = Field(default=False, description="Enable observability tracing and metrics collection") 

1061 

1062 # Automatic HTTP request tracing 

1063 observability_trace_http_requests: bool = Field(default=True, description="Automatically trace HTTP requests") 

1064 

1065 # Trace retention period (days) 

1066 observability_trace_retention_days: int = Field(default=7, ge=1, description="Number of days to retain trace data") 

1067 

1068 # Maximum traces to store (prevents unbounded growth) 

1069 observability_max_traces: int = Field(default=100000, ge=1000, description="Maximum number of traces to retain") 

1070 

1071 # Sample rate (0.0 to 1.0) - 1.0 means trace everything 

1072 observability_sample_rate: float = Field(default=1.0, ge=0.0, le=1.0, description="Trace sampling rate (0.0-1.0)") 

1073 

1074 # Include paths for tracing (regex patterns) 

1075 observability_include_paths: List[str] = Field( 

1076 default_factory=lambda: [ 

1077 r"^/rpc/?$", 

1078 r"^/sse$", 

1079 r"^/message$", 

1080 r"^/mcp(?:/|$)", 

1081 r"^/servers/[^/]+/mcp/?$", 

1082 r"^/servers/[^/]+/sse$", 

1083 r"^/servers/[^/]+/message$", 

1084 r"^/a2a(?:/|$)", 

1085 ], 

1086 description="Regex patterns to include for tracing (when empty, all paths are eligible before excludes)", 

1087 ) 

1088 

1089 # Exclude paths from tracing (regex patterns) 

1090 observability_exclude_paths: List[str] = Field( 

1091 default_factory=lambda: ["/health", "/healthz", "/ready", "/metrics", "/static/.*"], 

1092 description="Regex patterns to exclude from tracing (applies after include patterns)", 

1093 ) 

1094 

1095 # Enable performance metrics 

1096 observability_metrics_enabled: bool = Field(default=True, description="Enable metrics collection") 

1097 

1098 # Enable span events 

1099 observability_events_enabled: bool = Field(default=True, description="Enable event logging within spans") 

1100 

1101 # Correlation ID Settings 

1102 correlation_id_enabled: bool = Field(default=True, description="Enable automatic correlation ID tracking for requests") 

1103 correlation_id_header: str = Field(default="X-Correlation-ID", description="HTTP header name for correlation ID") 

1104 correlation_id_preserve: bool = Field(default=True, description="Preserve correlation IDs from incoming requests") 

1105 correlation_id_response_header: bool = Field(default=True, description="Include correlation ID in response headers") 

1106 

1107 # =================================== 

1108 # Database Query Logging (N+1 Detection) 

1109 # =================================== 

1110 db_query_log_enabled: bool = Field(default=False, description="Enable database query logging to file (for N+1 detection)") 

1111 db_query_log_file: str = Field(default="logs/db-queries.log", description="Path to database query log file") 

1112 db_query_log_json_file: str = Field(default="logs/db-queries.jsonl", description="Path to JSON Lines query log file") 

1113 db_query_log_format: str = Field(default="both", description="Log format: 'json', 'text', or 'both'") 

1114 db_query_log_min_queries: int = Field(default=1, ge=1, description="Only log requests with >= N queries") 

1115 db_query_log_include_params: bool = Field(default=False, description="Include query parameters (may expose sensitive data)") 

1116 db_query_log_detect_n1: bool = Field(default=True, description="Automatically detect and flag N+1 query patterns") 

1117 db_query_log_n1_threshold: int = Field(default=3, ge=2, description="Number of similar queries to flag as potential N+1") 

1118 

1119 # Structured Logging Configuration 

1120 structured_logging_enabled: bool = Field(default=True, description="Enable structured JSON logging with database persistence") 

1121 structured_logging_database_enabled: bool = Field(default=False, description="Persist structured logs to database (enables /api/logs/* endpoints, impacts performance)") 

1122 structured_logging_external_enabled: bool = Field(default=False, description="Send logs to external systems") 

1123 

1124 # Performance Tracking Configuration 

1125 performance_tracking_enabled: bool = Field(default=True, description="Enable performance tracking and metrics") 

1126 performance_threshold_database_query_ms: float = Field(default=100.0, description="Alert threshold for database queries (ms)") 

1127 performance_threshold_tool_invocation_ms: float = Field(default=2000.0, description="Alert threshold for tool invocations (ms)") 

1128 performance_threshold_resource_read_ms: float = Field(default=1000.0, description="Alert threshold for resource reads (ms)") 

1129 performance_threshold_http_request_ms: float = Field(default=500.0, description="Alert threshold for HTTP requests (ms)") 

1130 performance_degradation_multiplier: float = Field(default=1.5, description="Alert if performance degrades by this multiplier vs baseline") 

1131 

1132 # Audit Trail Configuration 

1133 # Audit trail logging is disabled by default for performance. 

1134 # When enabled, it logs all CRUD operations (create, read, update, delete) on resources. 

1135 # WARNING: This causes a database write on every API request and can cause significant load. 

1136 audit_trail_enabled: bool = Field(default=False, description="Enable audit trail logging to database for compliance") 

1137 permission_audit_enabled: bool = Field( 

1138 default=False, 

1139 description="Enable permission audit logging for RBAC checks (writes a row per permission check)", 

1140 ) 

1141 

1142 # Security Logging Configuration 

1143 # Security event logging is disabled by default for performance. 

1144 # When enabled, it logs authentication attempts, authorization failures, and security events. 

1145 # WARNING: "all" level logs every request and can cause significant database write load. 

1146 security_logging_enabled: bool = Field(default=False, description="Enable security event logging to database") 

1147 security_logging_level: Literal["all", "failures_only", "high_severity"] = Field( 

1148 default="failures_only", 

1149 description=( 

1150 "Security logging level: " 

1151 "'all' = log all events including successful auth (high DB load), " 

1152 "'failures_only' = log only authentication/authorization failures, " 

1153 "'high_severity' = log only high/critical severity events" 

1154 ), 

1155 ) 

1156 security_failed_auth_threshold: int = Field(default=5, description="Failed auth attempts before high severity alert") 

1157 security_threat_score_alert: float = Field(default=0.7, description="Threat score threshold for alerts (0.0-1.0)") 

1158 security_rate_limit_window_minutes: int = Field(default=5, description="Time window for rate limit checks (minutes)") 

1159 

1160 # Metrics Aggregation Configuration 

1161 metrics_aggregation_enabled: bool = Field(default=True, description="Enable automatic log aggregation into performance metrics") 

1162 metrics_aggregation_backfill_hours: int = Field(default=6, ge=0, le=168, description="Hours of structured logs to backfill into performance metrics on startup") 

1163 metrics_aggregation_window_minutes: int = Field(default=5, description="Time window for metrics aggregation (minutes)") 

1164 metrics_aggregation_auto_start: bool = Field(default=False, description="Automatically run the log aggregation loop on application startup") 

1165 yield_batch_size: int = Field( 

1166 default=1000, 

1167 ge=100, 

1168 le=100000, 

1169 description="Number of rows fetched per batch when streaming hourly metric data from the database. " 

1170 "Used to limit memory usage during aggregation and percentile calculations. " 

1171 "Smaller values reduce memory footprint but increase DB round-trips; larger values improve throughput " 

1172 "at the cost of higher memory usage.", 

1173 ) 

1174 

1175 # Execution Metrics Recording 

1176 # Controls whether tool/resource/prompt/server/A2A execution metrics are written to the database. 

1177 # Disable if using external observability (ELK, Datadog, Splunk). 

1178 # Note: Does NOT affect log aggregation (METRICS_AGGREGATION_ENABLED) or Prometheus (ENABLE_METRICS). 

1179 db_metrics_recording_enabled: bool = Field( 

1180 default=True, description="Enable recording of execution metrics (tool/resource/prompt/server/A2A) to database. Disable if using external observability." 

1181 ) 

1182 

1183 # Metrics Buffer Configuration (for batching tool/resource/prompt metrics writes) 

1184 metrics_buffer_enabled: bool = Field(default=True, description="Enable buffered metrics writes (reduces DB pressure under load)") 

1185 metrics_buffer_flush_interval: int = Field(default=60, ge=5, le=300, description="Seconds between automatic metrics buffer flushes") 

1186 metrics_buffer_max_size: int = Field(default=1000, ge=100, le=10000, description="Maximum buffered metrics before forced flush") 

1187 

1188 # Metrics Cache Configuration (for caching aggregate metrics queries) 

1189 metrics_cache_enabled: bool = Field(default=True, description="Enable in-memory caching for aggregate metrics queries") 

1190 metrics_cache_ttl_seconds: int = Field(default=60, ge=1, le=300, description="TTL for cached aggregate metrics in seconds") 

1191 

1192 # Metrics Cleanup Configuration (automatic deletion of old metrics) 

1193 metrics_cleanup_enabled: bool = Field(default=True, description="Enable automatic cleanup of old metrics data") 

1194 metrics_retention_days: int = Field(default=7, ge=1, le=365, description="Days to retain raw metrics before cleanup (fallback when rollup disabled)") 

1195 metrics_cleanup_interval_hours: int = Field(default=1, ge=1, le=168, description="Hours between automatic cleanup runs") 

1196 metrics_cleanup_batch_size: int = Field(default=10000, ge=100, le=100000, description="Batch size for metrics deletion (prevents long locks)") 

1197 

1198 # Metrics Rollup Configuration (hourly aggregation for historical queries) 

1199 metrics_rollup_enabled: bool = Field(default=True, description="Enable hourly metrics rollup for efficient historical queries") 

1200 metrics_rollup_interval_hours: int = Field(default=1, ge=1, le=24, description="Hours between rollup runs") 

1201 metrics_rollup_retention_days: int = Field(default=365, ge=30, le=3650, description="Days to retain hourly rollup data") 

1202 metrics_rollup_late_data_hours: int = Field( 

1203 default=1, ge=1, le=48, description="Hours to re-process on each run to catch late-arriving data (smaller = less CPU, larger = more tolerance for delayed metrics)" 

1204 ) 

1205 metrics_delete_raw_after_rollup: bool = Field(default=True, description="Delete raw metrics after hourly rollup exists (recommended for production)") 

1206 metrics_delete_raw_after_rollup_hours: int = Field(default=1, ge=1, le=8760, description="Hours to retain raw metrics when hourly rollup exists") 

1207 

1208 # Auth Cache Configuration (reduces DB queries during authentication) 

1209 auth_cache_enabled: bool = Field(default=True, description="Enable Redis/in-memory caching for authentication data (user, team, revocation)") 

1210 auth_cache_user_ttl: int = Field(default=60, ge=10, le=300, description="TTL in seconds for cached user data") 

1211 auth_cache_revocation_ttl: int = Field(default=30, ge=5, le=120, description="TTL in seconds for token revocation cache (security-critical, keep short)") 

1212 auth_cache_team_ttl: int = Field(default=60, ge=10, le=300, description="TTL in seconds for team membership cache") 

1213 auth_cache_role_ttl: int = Field(default=60, ge=10, le=300, description="TTL in seconds for user role in team cache") 

1214 auth_cache_teams_enabled: bool = Field(default=True, description="Enable caching for get_user_teams() (default: true)") 

1215 auth_cache_teams_ttl: int = Field(default=60, ge=10, le=300, description="TTL in seconds for user teams list cache") 

1216 auth_cache_batch_queries: bool = Field(default=True, description="Batch auth DB queries into single call (reduces 3 queries to 1)") 

1217 

1218 # Registry Cache Configuration (reduces DB queries for list endpoints) 

1219 registry_cache_enabled: bool = Field(default=True, description="Enable caching for registry list endpoints (tools, prompts, resources, etc.)") 

1220 registry_cache_tools_ttl: int = Field(default=20, ge=5, le=300, description="TTL in seconds for tools list cache") 

1221 registry_cache_prompts_ttl: int = Field(default=15, ge=5, le=300, description="TTL in seconds for prompts list cache") 

1222 registry_cache_resources_ttl: int = Field(default=15, ge=5, le=300, description="TTL in seconds for resources list cache") 

1223 registry_cache_agents_ttl: int = Field(default=20, ge=5, le=300, description="TTL in seconds for agents list cache") 

1224 registry_cache_servers_ttl: int = Field(default=20, ge=5, le=300, description="TTL in seconds for servers list cache") 

1225 registry_cache_gateways_ttl: int = Field(default=20, ge=5, le=300, description="TTL in seconds for gateways list cache") 

1226 registry_cache_catalog_ttl: int = Field(default=300, ge=60, le=600, description="TTL in seconds for catalog servers list cache (external catalog, changes infrequently)") 

1227 

1228 # Tool Lookup Cache Configuration (reduces hot-path DB lookups in invoke_tool) 

1229 tool_lookup_cache_enabled: bool = Field(default=True, description="Enable tool lookup cache (tool name -> tool config)") 

1230 tool_lookup_cache_ttl_seconds: int = Field(default=60, ge=5, le=600, description="TTL in seconds for tool lookup cache entries") 

1231 tool_lookup_cache_negative_ttl_seconds: int = Field(default=10, ge=1, le=60, description="TTL in seconds for negative tool lookup cache entries") 

1232 tool_lookup_cache_l1_maxsize: int = Field(default=10000, ge=100, le=1000000, description="Max entries for in-memory tool lookup cache (L1)") 

1233 tool_lookup_cache_l2_enabled: bool = Field(default=True, description="Enable Redis-backed tool lookup cache (L2) when cache_type=redis") 

1234 

1235 # Admin Stats Cache Configuration (reduces dashboard query overhead) 

1236 admin_stats_cache_enabled: bool = Field(default=True, description="Enable caching for admin dashboard statistics") 

1237 admin_stats_cache_system_ttl: int = Field(default=60, ge=10, le=300, description="TTL in seconds for system stats cache") 

1238 admin_stats_cache_observability_ttl: int = Field(default=30, ge=10, le=120, description="TTL in seconds for observability stats cache") 

1239 admin_stats_cache_tags_ttl: int = Field(default=120, ge=30, le=600, description="TTL in seconds for tags listing cache") 

1240 admin_stats_cache_plugins_ttl: int = Field(default=120, ge=30, le=600, description="TTL in seconds for plugin stats cache") 

1241 admin_stats_cache_performance_ttl: int = Field(default=60, ge=15, le=300, description="TTL in seconds for performance aggregates cache") 

1242 

1243 # Team Member Count Cache Configuration (reduces N+1 queries in admin UI) 

1244 team_member_count_cache_enabled: bool = Field(default=True, description="Enable Redis caching for team member counts") 

1245 team_member_count_cache_ttl: int = Field(default=300, ge=30, le=3600, description="TTL in seconds for team member count cache (default: 5 minutes)") 

1246 

1247 # Log Search Configuration 

1248 log_search_max_results: int = Field(default=1000, description="Maximum results per log search query") 

1249 log_retention_days: int = Field(default=30, description="Number of days to retain logs in database") 

1250 

1251 # External Log Integration Configuration 

1252 elasticsearch_enabled: bool = Field(default=False, description="Send logs to Elasticsearch") 

1253 elasticsearch_url: Optional[str] = Field(default=None, description="Elasticsearch cluster URL") 

1254 elasticsearch_index_prefix: str = Field(default="mcpgateway-logs", description="Elasticsearch index prefix") 

1255 syslog_enabled: bool = Field(default=False, description="Send logs to syslog") 

1256 syslog_host: Optional[str] = Field(default=None, description="Syslog server host") 

1257 syslog_port: int = Field(default=514, description="Syslog server port") 

1258 webhook_logging_enabled: bool = Field(default=False, description="Send logs to webhook endpoints") 

1259 webhook_logging_urls: List[str] = Field(default_factory=list, description="Webhook URLs for log delivery") 

1260 

1261 @field_validator("log_level", mode="before") 

1262 @classmethod 

1263 def validate_log_level(cls, v: str) -> str: 

1264 """ 

1265 Normalize and validate the log level value. 

1266 

1267 Ensures that the input string matches one of the allowed log levels, 

1268 case-insensitively. The value is uppercased before validation so that 

1269 "debug", "Debug", etc. are all accepted as "DEBUG". 

1270 

1271 Args: 

1272 v (str): The log level string provided via configuration or environment. 

1273 

1274 Returns: 

1275 str: The validated and normalized (uppercase) log level. 

1276 

1277 Raises: 

1278 ValueError: If the provided value is not one of 

1279 {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}. 

1280 """ 

1281 allowed = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"} 

1282 v_up = v.upper() 

1283 if v_up not in allowed: 

1284 raise ValueError(f"Invalid log_level: {v}") 

1285 return v_up 

1286 

1287 # Transport 

1288 transport_type: str = "all" # http, ws, sse, all 

1289 websocket_ping_interval: int = 30 # seconds 

1290 sse_retry_timeout: int = 5000 # milliseconds - client retry interval on disconnect 

1291 sse_keepalive_enabled: bool = True # Enable SSE keepalive events 

1292 sse_keepalive_interval: int = 30 # seconds between keepalive events 

1293 sse_send_timeout: float = 30.0 # seconds - timeout for ASGI send() calls, protects against hung connections 

1294 sse_rapid_yield_window_ms: int = 1000 # milliseconds - time window for rapid yield detection 

1295 sse_rapid_yield_max: int = 50 # max yields per window before assuming client disconnected (0=disabled) 

1296 

1297 # Gateway/Server Connection Timeout 

1298 # Timeout in seconds for HTTP requests to registered gateways and MCP servers. 

1299 # Used by: GatewayService, ToolService, ServerService for health checks and tool invocations. 

1300 # Note: Previously part of federation settings, retained for gateway connectivity. 

1301 federation_timeout: int = 120 

1302 

1303 # SSO 

1304 # For sso_issuers strip out quotes to ensure we're passing valid JSON via env 

1305 sso_issuers: Optional[list[HttpUrl]] = Field(default=None) 

1306 

1307 @field_validator("sso_issuers", mode="before") 

1308 @classmethod 

1309 def parse_issuers(cls, v: Any) -> list[str]: 

1310 """ 

1311 Parse and validate the SSO issuers configuration value. 

1312 

1313 Accepts: 

1314 - JSON array string: '["https://idp1.com", "https://idp2.com"]' 

1315 - Comma-separated string: "https://idp1.com, https://idp2.com" 

1316 - Empty string or None → [] 

1317 - Already-parsed list 

1318 

1319 Args: 

1320 v: The input value to parse. 

1321 

1322 Returns: 

1323 list[str]: Parsed list of issuer URLs. 

1324 

1325 Raises: 

1326 ValueError: If the input is not a valid format. 

1327 """ 

1328 if v is None: 

1329 return [] 

1330 if isinstance(v, list): 

1331 return v 

1332 if isinstance(v, str): 

1333 s = v.strip() 

1334 if not s: 

1335 return [] 

1336 if s.startswith("["): 

1337 try: 

1338 parsed = orjson.loads(s) 

1339 return parsed if isinstance(parsed, list) else [] 

1340 except orjson.JSONDecodeError: 

1341 raise ValueError(f"Invalid JSON for SSO_ISSUERS: {v!r}") 

1342 # Fallback to comma-separated parsing 

1343 return [item.strip() for item in s.split(",") if item.strip()] 

1344 raise ValueError("Invalid type for SSO_ISSUERS") 

1345 

1346 # Resources 

1347 resource_cache_size: int = 1000 

1348 resource_cache_ttl: int = 3600 # seconds 

1349 max_resource_size: int = 10 * 1024 * 1024 # 10MB 

1350 allowed_mime_types: Set[str] = { 

1351 "text/plain", 

1352 "text/markdown", 

1353 "text/html", 

1354 "application/json", 

1355 "application/xml", 

1356 "image/png", 

1357 "image/jpeg", 

1358 "image/gif", 

1359 } 

1360 

1361 # Tools 

1362 tool_timeout: int = 60 # seconds 

1363 max_tool_retries: int = 3 

1364 tool_rate_limit: int = 100 # requests per minute 

1365 tool_concurrent_limit: int = 10 

1366 

1367 # MCP Session Pool - reduces per-request latency from ~20ms to ~1-2ms 

1368 # Disabled by default for safety. Enable explicitly in production after testing. 

1369 mcp_session_pool_enabled: bool = False 

1370 mcp_session_pool_max_per_key: int = 10 # Max sessions per (URL, identity, transport) 

1371 mcp_session_pool_ttl: float = 300.0 # Session TTL in seconds 

1372 mcp_session_pool_health_check_interval: float = 60.0 # Idle time before health check (aligned with health_check_interval) 

1373 mcp_session_pool_acquire_timeout: float = 30.0 # Timeout waiting for session slot 

1374 mcp_session_pool_create_timeout: float = 30.0 # Timeout creating new session 

1375 mcp_session_pool_circuit_breaker_threshold: int = 5 # Failures before circuit opens 

1376 mcp_session_pool_circuit_breaker_reset: float = 60.0 # Seconds before circuit resets 

1377 mcp_session_pool_idle_eviction: float = 600.0 # Evict idle pool keys after this time 

1378 # Transport timeout for pooled sessions (default 30s to match MCP SDK default). 

1379 # This timeout applies to all HTTP operations (connect, read, write) on pooled sessions. 

1380 # Use a higher value for deployments with long-running tool calls. 

1381 mcp_session_pool_transport_timeout: float = 30.0 

1382 # Force explicit RPC (list_tools) on gateway health checks even when session is fresh. 

1383 # Off by default: pool's internal staleness check (idle > health_check_interval) handles this. 

1384 # Enable for stricter health verification at the cost of ~5ms latency per check. 

1385 mcp_session_pool_explicit_health_rpc: bool = False 

1386 # Configurable health check chain - ordered list of methods to try. 

1387 # Options: ping, list_tools, list_prompts, list_resources, skip 

1388 # Default: ping,skip - try lightweight ping, skip if unsupported (for legacy servers) 

1389 mcp_session_pool_health_check_methods: List[str] = ["ping", "skip"] 

1390 # Timeout in seconds for each health check attempt 

1391 mcp_session_pool_health_check_timeout: float = 5.0 

1392 mcp_session_pool_identity_headers: List[str] = ["authorization", "x-tenant-id", "x-user-id", "x-api-key", "cookie", "x-mcp-session-id"] 

1393 # Timeout for session/transport cleanup operations (__aexit__ calls). 

1394 # This prevents CPU spin loops when internal tasks (like post_writer waiting on 

1395 # memory streams) don't respond to cancellation. Does NOT affect tool execution 

1396 # time - only cleanup of idle/released sessions. Increase if you see frequent 

1397 # "cleanup timed out" warnings; decrease for faster recovery from spin loops. 

1398 mcp_session_pool_cleanup_timeout: float = 5.0 

1399 

1400 # Timeout for SSE task group cleanup (seconds). 

1401 # When an SSE connection is cancelled, this controls how long to wait for 

1402 # internal tasks to respond before forcing cleanup. Shorter values reduce 

1403 # CPU waste during anyio _deliver_cancellation spin loops but may interrupt 

1404 # legitimate cleanup. Only affects cancelled connections, not normal operation. 

1405 # See: https://github.com/agronholm/anyio/issues/695 

1406 sse_task_group_cleanup_timeout: float = 5.0 

1407 

1408 # ========================================================================= 

1409 # EXPERIMENTAL: anyio _deliver_cancellation spin loop workaround 

1410 # ========================================================================= 

1411 # When enabled, monkey-patches anyio's CancelScope._deliver_cancellation to 

1412 # limit the number of retry iterations. This prevents 100% CPU spin loops 

1413 # when tasks don't respond to CancelledError (anyio issue #695). 

1414 # 

1415 # WARNING: This is a workaround for an upstream issue. May be removed when 

1416 # anyio or MCP SDK fix the underlying problem. Enable only if you experience 

1417 # CPU spin loops during SSE/MCP connection cleanup. 

1418 # 

1419 # Trade-offs when enabled: 

1420 # - Prevents indefinite CPU spin (good) 

1421 # - May leave some tasks uncancelled after max iterations (usually harmless) 

1422 # - Worker recycling (GUNICORN_MAX_REQUESTS) cleans up orphaned tasks 

1423 # 

1424 # See: https://github.com/agronholm/anyio/issues/695 

1425 # Env: ANYIO_CANCEL_DELIVERY_PATCH_ENABLED 

1426 anyio_cancel_delivery_patch_enabled: bool = False 

1427 

1428 # Maximum iterations for _deliver_cancellation before giving up. 

1429 # Only used when anyio_cancel_delivery_patch_enabled=True. 

1430 # Higher values = more attempts to cancel tasks, but longer potential spin. 

1431 # Lower values = faster recovery, but more orphaned tasks. 

1432 # Env: ANYIO_CANCEL_DELIVERY_MAX_ITERATIONS 

1433 anyio_cancel_delivery_max_iterations: int = 100 

1434 

1435 # Session Affinity 

1436 mcpgateway_session_affinity_enabled: bool = False # Global session affinity toggle 

1437 mcpgateway_session_affinity_ttl: int = 300 # Session affinity binding TTL 

1438 mcpgateway_session_affinity_max_sessions: int = 1 # Max sessions per identity for affinity 

1439 mcpgateway_pool_rpc_forward_timeout: int = 30 # Timeout for forwarding RPC requests to owner worker 

1440 

1441 # Prompts 

1442 prompt_cache_size: int = 100 

1443 max_prompt_size: int = 100 * 1024 # 100KB 

1444 prompt_render_timeout: int = 10 # seconds 

1445 

1446 # Health Checks 

1447 # Interval in seconds between health checks (aligned with mcp_session_pool_health_check_interval) 

1448 health_check_interval: int = 60 

1449 # Timeout in seconds for each health check request 

1450 health_check_timeout: int = 5 

1451 # Per-check timeout (seconds) to bound total time of one gateway health check 

1452 # Env: GATEWAY_HEALTH_CHECK_TIMEOUT 

1453 gateway_health_check_timeout: float = 5.0 

1454 # Consecutive failures before marking gateway offline 

1455 unhealthy_threshold: int = 3 

1456 # Max concurrent health checks per worker 

1457 max_concurrent_health_checks: int = 10 

1458 

1459 # Auto-refresh tools/resources/prompts from gateways during health checks 

1460 # When enabled, tools/resources/prompts are fetched and synced with DB during health checks 

1461 auto_refresh_servers: bool = Field(default=False, description="Enable automatic tool/resource/prompt refresh during gateway health checks") 

1462 

1463 # Per-gateway refresh configuration (used when auto_refresh_servers is True) 

1464 # Gateways can override this with their own refresh_interval_seconds 

1465 gateway_auto_refresh_interval: int = Field(default=300, ge=60, description="Default refresh interval in seconds for gateway tools/resources/prompts sync (minimum 60 seconds)") 

1466 

1467 # Validation Gateway URL 

1468 gateway_validation_timeout: int = 5 # seconds 

1469 gateway_max_redirects: int = 5 

1470 

1471 filelock_name: str = "gateway_service_leader.lock" 

1472 

1473 # Default Roots 

1474 default_roots: List[str] = [] 

1475 

1476 # Database 

1477 db_driver: str = "mariadb+mariadbconnector" 

1478 db_pool_size: int = 200 

1479 db_max_overflow: int = 10 

1480 db_pool_timeout: int = 30 

1481 db_pool_recycle: int = 3600 

1482 db_max_retries: int = 30 # Max attempts with exponential backoff (≈5 min total) 

1483 db_retry_interval_ms: int = 2000 # Base interval; doubles each attempt, ±25% jitter 

1484 db_max_backoff_seconds: int = 30 # Cap for exponential backoff (jitter applied after cap) 

1485 

1486 # Database Performance Optimization 

1487 use_postgresdb_percentiles: bool = Field( 

1488 default=True, 

1489 description="Use database-native percentile functions (percentile_cont) for performance metrics. " 

1490 "When enabled, PostgreSQL uses native SQL percentile calculations (5-10x faster). " 

1491 "When disabled or using SQLite, falls back to Python-based percentile calculations. " 

1492 "Recommended: true for PostgreSQL, auto-detected for SQLite.", 

1493 ) 

1494 

1495 # psycopg3-specific: Number of times a query must be executed before it's 

1496 # prepared server-side. Set to 0 to disable, 1 to prepare immediately. 

1497 # Default of 5 balances memory usage with query performance. 

1498 db_prepare_threshold: int = Field(default=5, ge=0, le=100, description="psycopg3 prepare_threshold for auto-prepared statements") 

1499 

1500 # Connection pool class: "auto" (default), "null", or "queue" 

1501 # - "auto": Uses NullPool when PgBouncer detected, QueuePool otherwise 

1502 # - "null": Always use NullPool (recommended with PgBouncer - lets PgBouncer handle pooling) 

1503 # - "queue": Always use QueuePool (application-side pooling) 

1504 db_pool_class: Literal["auto", "null", "queue"] = Field( 

1505 default="auto", 

1506 description="Connection pool class: auto (NullPool with PgBouncer), null, or queue", 

1507 ) 

1508 

1509 # Pre-ping connections before checkout (validates connection is alive) 

1510 # - "auto": Enabled for non-PgBouncer, disabled for PgBouncer (default) 

1511 # - "true": Always enable (adds SELECT 1 overhead but catches stale connections) 

1512 # - "false": Always disable 

1513 db_pool_pre_ping: Literal["auto", "true", "false"] = Field( 

1514 default="auto", 

1515 description="Pre-ping connections: auto, true, or false", 

1516 ) 

1517 

1518 # SQLite busy timeout: Maximum time (ms) SQLite will wait to acquire a database lock before returning SQLITE_BUSY. 

1519 db_sqlite_busy_timeout: int = Field(default=5000, ge=1000, le=60000, description="SQLite busy timeout in milliseconds (default: 5000ms)") 

1520 

1521 # Cache 

1522 cache_type: Literal["redis", "memory", "none", "database"] = "database" # memory or redis or database 

1523 redis_url: Optional[str] = "redis://localhost:6379/0" 

1524 cache_prefix: str = "mcpgw:" 

1525 session_ttl: int = 3600 

1526 message_ttl: int = 600 

1527 redis_max_retries: int = 30 # Max attempts with exponential backoff (≈5 min total) 

1528 redis_retry_interval_ms: int = 2000 # Base interval; doubles each attempt, ±25% jitter 

1529 redis_max_backoff_seconds: int = 30 # Cap for exponential backoff (jitter applied after cap) 

1530 

1531 # GlobalConfig In-Memory Cache (Issue #1715) 

1532 # Caches GlobalConfig (passthrough headers) to eliminate redundant DB queries 

1533 global_config_cache_ttl: int = Field( 

1534 default=60, 

1535 ge=5, 

1536 le=3600, 

1537 description="TTL in seconds for GlobalConfig in-memory cache (default: 60)", 

1538 ) 

1539 

1540 # A2A Stats In-Memory Cache 

1541 # Caches A2A agent counts (total, active) to eliminate redundant COUNT queries 

1542 a2a_stats_cache_ttl: int = Field( 

1543 default=30, 

1544 ge=5, 

1545 le=3600, 

1546 description="TTL in seconds for A2A stats in-memory cache (default: 30)", 

1547 ) 

1548 

1549 # Redis Parser Configuration (ADR-026) 

1550 # hiredis C parser provides up to 83x faster response parsing for large responses 

1551 redis_parser: Literal["auto", "hiredis", "python"] = Field( 

1552 default="auto", 

1553 description="Redis protocol parser: auto (use hiredis if available), hiredis (require hiredis), python (pure-Python)", 

1554 ) 

1555 

1556 # Redis Connection Pool - Performance Optimized 

1557 redis_decode_responses: bool = Field(default=True, description="Return strings instead of bytes") 

1558 redis_max_connections: int = Field(default=50, description="Connection pool size per worker") 

1559 redis_socket_timeout: float = Field(default=2.0, description="Socket read/write timeout in seconds") 

1560 redis_socket_connect_timeout: float = Field(default=2.0, description="Connection timeout in seconds") 

1561 redis_retry_on_timeout: bool = Field(default=True, description="Retry commands on timeout") 

1562 redis_health_check_interval: int = Field(default=30, description="Seconds between connection health checks (0=disabled)") 

1563 

1564 # Redis Leader Election - Multi-Node Deployments 

1565 redis_leader_ttl: int = Field(default=15, description="Leader election TTL in seconds") 

1566 redis_leader_key: str = Field(default="gateway_service_leader", description="Leader key name") 

1567 redis_leader_heartbeat_interval: int = Field(default=5, description="Seconds between leader heartbeats") 

1568 

1569 # streamable http transport 

1570 use_stateful_sessions: bool = False # Set to False to use stateless sessions without event store 

1571 json_response_enabled: bool = True # Enable JSON responses instead of SSE streams 

1572 streamable_http_max_events_per_stream: int = 100 # Ring buffer capacity per stream 

1573 streamable_http_event_ttl: int = 3600 # Event stream TTL in seconds (1 hour) 

1574 

1575 # Core plugin settings 

1576 plugins_enabled: bool = Field(default=False, description="Enable the plugin framework") 

1577 plugin_config_file: str = Field(default="plugins/config.yaml", description="Path to main plugin configuration file") 

1578 

1579 # Plugin CLI settings 

1580 plugins_cli_completion: bool = Field(default=False, description="Enable auto-completion for plugins CLI") 

1581 plugins_cli_markup_mode: Literal["markdown", "rich", "disabled"] | None = Field(default=None, description="Set markup mode for plugins CLI") 

1582 

1583 # Development 

1584 dev_mode: bool = False 

1585 reload: bool = False 

1586 debug: bool = False 

1587 

1588 # Observability (OpenTelemetry) 

1589 otel_enable_observability: bool = Field(default=False, description="Enable OpenTelemetry observability") 

1590 otel_traces_exporter: str = Field(default="otlp", description="Traces exporter: otlp, jaeger, zipkin, console, none") 

1591 otel_exporter_otlp_endpoint: Optional[str] = Field(default=None, description="OTLP endpoint (e.g., http://localhost:4317)") 

1592 otel_exporter_otlp_protocol: str = Field(default="grpc", description="OTLP protocol: grpc or http") 

1593 otel_exporter_otlp_insecure: bool = Field(default=True, description="Use insecure connection for OTLP") 

1594 otel_exporter_otlp_headers: Optional[str] = Field(default=None, description="OTLP headers (comma-separated key=value)") 

1595 otel_exporter_jaeger_endpoint: Optional[str] = Field(default=None, description="Jaeger endpoint") 

1596 otel_exporter_zipkin_endpoint: Optional[str] = Field(default=None, description="Zipkin endpoint") 

1597 otel_service_name: str = Field(default="mcp-gateway", description="Service name for traces") 

1598 otel_resource_attributes: Optional[str] = Field(default=None, description="Resource attributes (comma-separated key=value)") 

1599 otel_bsp_max_queue_size: int = Field(default=2048, description="Max queue size for batch span processor") 

1600 otel_bsp_max_export_batch_size: int = Field(default=512, description="Max export batch size") 

1601 otel_bsp_schedule_delay: int = Field(default=5000, description="Schedule delay in milliseconds") 

1602 

1603 # =================================== 

1604 # Well-Known URI Configuration 

1605 # =================================== 

1606 

1607 # Enable well-known URI endpoints 

1608 well_known_enabled: bool = True 

1609 

1610 # robots.txt content (default: disallow all crawling for private API) 

1611 well_known_robots_txt: str = """User-agent: * 

1612Disallow: / 

1613 

1614# MCP Gateway is a private API gateway 

1615# Public crawling is disabled by default""" 

1616 

1617 # security.txt content (optional, user-defined) 

1618 # Example: "Contact: security@example.com\nExpires: 2025-12-31T23:59:59Z\nPreferred-Languages: en" 

1619 well_known_security_txt: str = "" 

1620 

1621 # Enable security.txt only if content is provided 

1622 well_known_security_txt_enabled: bool = False 

1623 

1624 # Additional custom well-known files (JSON format) 

1625 # Example: {"ai.txt": "This service uses AI for...", "dnt-policy.txt": "Do Not Track policy..."} 

1626 well_known_custom_files: str = "{}" 

1627 

1628 # Cache control for well-known files (seconds) 

1629 well_known_cache_max_age: int = 3600 # 1 hour default 

1630 

1631 # =================================== 

1632 # Performance / Startup Tuning 

1633 # =================================== 

1634 

1635 slug_refresh_batch_size: int = Field(default=1000, description="Batch size for gateway/tool slug refresh at startup") 

1636 model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", case_sensitive=False, extra="ignore") 

1637 

1638 gateway_tool_name_separator: str = "-" 

1639 valid_slug_separator_regexp: ClassVar[str] = r"^(-{1,2}|[_.])$" 

1640 

1641 @field_validator("gateway_tool_name_separator") 

1642 @classmethod 

1643 def must_be_allowed_sep(cls, v: str) -> str: 

1644 """Validate the gateway tool name separator. 

1645 

1646 Args: 

1647 v: The separator value to validate. 

1648 

1649 Returns: 

1650 The validated separator, defaults to '-' if invalid. 

1651 

1652 Examples: 

1653 >>> Settings.must_be_allowed_sep('-') 

1654 '-' 

1655 >>> Settings.must_be_allowed_sep('--') 

1656 '--' 

1657 >>> Settings.must_be_allowed_sep('_') 

1658 '_' 

1659 >>> Settings.must_be_allowed_sep('.') 

1660 '.' 

1661 >>> Settings.must_be_allowed_sep('invalid') 

1662 '-' 

1663 """ 

1664 if not re.fullmatch(cls.valid_slug_separator_regexp, v): 

1665 logger.warning( 

1666 f"Invalid gateway_tool_name_separator '{v}'. Must be '-', '--', '_' or '.'. Defaulting to '-'.", 

1667 stacklevel=2, 

1668 ) 

1669 return "-" 

1670 return v 

1671 

1672 @property 

1673 def custom_well_known_files(self) -> Dict[str, str]: 

1674 """Parse custom well-known files from JSON string. 

1675 

1676 Returns: 

1677 Dict[str, str]: Parsed custom well-known files mapping filename to content. 

1678 """ 

1679 try: 

1680 return orjson.loads(self.well_known_custom_files) if self.well_known_custom_files else {} 

1681 except orjson.JSONDecodeError: 

1682 logger.error(f"Invalid JSON in WELL_KNOWN_CUSTOM_FILES: {self.well_known_custom_files}") 

1683 return {} 

1684 

1685 @field_validator("well_known_security_txt_enabled", mode="after") 

1686 @classmethod 

1687 def _auto_enable_security_txt(cls, v: Any, info: ValidationInfo) -> bool: 

1688 """Auto-enable security.txt if content is provided. 

1689 

1690 Args: 

1691 v: The current value of well_known_security_txt_enabled. 

1692 info: ValidationInfo containing field data. 

1693 

1694 Returns: 

1695 bool: True if security.txt content is provided, otherwise the original value. 

1696 """ 

1697 if info.data and "well_known_security_txt" in info.data: 

1698 return bool(info.data["well_known_security_txt"].strip()) 

1699 return bool(v) 

1700 

1701 # ------------------------------- 

1702 # Flexible list parsing for envs 

1703 # ------------------------------- 

1704 @field_validator( 

1705 "sso_entra_admin_groups", 

1706 "sso_trusted_domains", 

1707 "sso_auto_admin_domains", 

1708 "sso_github_admin_orgs", 

1709 "sso_google_admin_domains", 

1710 "insecure_queryparam_auth_allowed_hosts", 

1711 mode="before", 

1712 ) 

1713 @classmethod 

1714 def _parse_list_from_env(cls, v: None | str | list[str]) -> list[str]: 

1715 """Parse list fields from environment values. 

1716 

1717 Accepts either JSON arrays (e.g. '["a","b"]') or comma-separated 

1718 strings (e.g. 'a,b'). Empty or None becomes an empty list. 

1719 

1720 Args: 

1721 v: The value to parse, can be None, list, or string. 

1722 

1723 Returns: 

1724 list: Parsed list of values. 

1725 

1726 Raises: 

1727 ValueError: If the value type is invalid for list field parsing. 

1728 """ 

1729 if v is None: 

1730 return [] 

1731 if isinstance(v, list): 

1732 return v 

1733 if isinstance(v, str): 

1734 s = v.strip() 

1735 if not s: 

1736 return [] 

1737 if s.startswith("["): 

1738 try: 

1739 parsed = orjson.loads(s) 

1740 return parsed if isinstance(parsed, list) else [] 

1741 except Exception: 

1742 logger.warning("Invalid JSON list in env for list field; falling back to CSV parsing") 

1743 # CSV fallback 

1744 return [item.strip() for item in s.split(",") if item.strip()] 

1745 raise ValueError("Invalid type for list field") 

1746 

1747 @property 

1748 def api_key(self) -> str: 

1749 """ 

1750 Generate API key from auth credentials. 

1751 

1752 Returns: 

1753 str: API key string in the format "username:password". 

1754 

1755 Examples: 

1756 >>> from mcpgateway.config import Settings 

1757 >>> settings = Settings(basic_auth_user="admin", basic_auth_password="secret") 

1758 >>> settings.api_key 

1759 'admin:secret' 

1760 >>> settings = Settings(basic_auth_user="user123", basic_auth_password="pass456") 

1761 >>> settings.api_key 

1762 'user123:pass456' 

1763 """ 

1764 return f"{self.basic_auth_user}:{self.basic_auth_password.get_secret_value()}" 

1765 

1766 @property 

1767 def supports_http(self) -> bool: 

1768 """Check if HTTP transport is enabled. 

1769 

1770 Returns: 

1771 bool: True if HTTP transport is enabled, False otherwise. 

1772 

1773 Examples: 

1774 >>> settings = Settings(transport_type="http") 

1775 >>> settings.supports_http 

1776 True 

1777 >>> settings = Settings(transport_type="all") 

1778 >>> settings.supports_http 

1779 True 

1780 >>> settings = Settings(transport_type="ws") 

1781 >>> settings.supports_http 

1782 False 

1783 """ 

1784 return self.transport_type in ["http", "all"] 

1785 

1786 @property 

1787 def supports_websocket(self) -> bool: 

1788 """Check if WebSocket transport is enabled. 

1789 

1790 Returns: 

1791 bool: True if WebSocket transport is enabled, False otherwise. 

1792 

1793 Examples: 

1794 >>> settings = Settings(transport_type="ws") 

1795 >>> settings.supports_websocket 

1796 True 

1797 >>> settings = Settings(transport_type="all") 

1798 >>> settings.supports_websocket 

1799 True 

1800 >>> settings = Settings(transport_type="http") 

1801 >>> settings.supports_websocket 

1802 False 

1803 """ 

1804 return self.transport_type in ["ws", "all"] 

1805 

1806 @property 

1807 def supports_sse(self) -> bool: 

1808 """Check if SSE transport is enabled. 

1809 

1810 Returns: 

1811 bool: True if SSE transport is enabled, False otherwise. 

1812 

1813 Examples: 

1814 >>> settings = Settings(transport_type="sse") 

1815 >>> settings.supports_sse 

1816 True 

1817 >>> settings = Settings(transport_type="all") 

1818 >>> settings.supports_sse 

1819 True 

1820 >>> settings = Settings(transport_type="http") 

1821 >>> settings.supports_sse 

1822 False 

1823 """ 

1824 return self.transport_type in ["sse", "all"] 

1825 

1826 class DatabaseSettings(TypedDict): 

1827 """TypedDict for SQLAlchemy database settings.""" 

1828 

1829 pool_size: int 

1830 max_overflow: int 

1831 pool_timeout: int 

1832 pool_recycle: int 

1833 connect_args: dict[str, Any] # consider more specific type if needed 

1834 

1835 @property 

1836 def database_settings(self) -> DatabaseSettings: 

1837 """ 

1838 Get SQLAlchemy database settings. 

1839 

1840 Returns: 

1841 DatabaseSettings: Dictionary containing SQLAlchemy database configuration options. 

1842 

1843 Examples: 

1844 >>> from mcpgateway.config import Settings 

1845 >>> s = Settings(database_url='sqlite:///./test.db') 

1846 >>> isinstance(s.database_settings, dict) 

1847 True 

1848 """ 

1849 return { 

1850 "pool_size": self.db_pool_size, 

1851 "max_overflow": self.db_max_overflow, 

1852 "pool_timeout": self.db_pool_timeout, 

1853 "pool_recycle": self.db_pool_recycle, 

1854 "connect_args": {"check_same_thread": False} if self.database_url.startswith("sqlite") else {}, 

1855 } 

1856 

1857 class CORSSettings(TypedDict): 

1858 """TypedDict for CORS settings.""" 

1859 

1860 allow_origins: NotRequired[List[str]] 

1861 allow_credentials: NotRequired[bool] 

1862 allow_methods: NotRequired[List[str]] 

1863 allow_headers: NotRequired[List[str]] 

1864 

1865 @property 

1866 def cors_settings(self) -> CORSSettings: 

1867 """Get CORS settings. 

1868 

1869 Returns: 

1870 CORSSettings: Dictionary containing CORS configuration options. 

1871 

1872 Examples: 

1873 >>> s = Settings(cors_enabled=True, allowed_origins={'http://localhost'}) 

1874 >>> cors = s.cors_settings 

1875 >>> cors['allow_origins'] 

1876 ['http://localhost'] 

1877 >>> cors['allow_credentials'] 

1878 True 

1879 >>> s2 = Settings(cors_enabled=False) 

1880 >>> s2.cors_settings 

1881 {} 

1882 """ 

1883 return ( 

1884 { 

1885 "allow_origins": list(self.allowed_origins), 

1886 "allow_credentials": True, 

1887 "allow_methods": ["*"], 

1888 "allow_headers": ["*"], 

1889 } 

1890 if self.cors_enabled 

1891 else {} 

1892 ) 

1893 

1894 def validate_transport(self) -> None: 

1895 """ 

1896 Validate transport configuration. 

1897 

1898 Raises: 

1899 ValueError: If the transport type is not one of the valid options. 

1900 

1901 Examples: 

1902 >>> from mcpgateway.config import Settings 

1903 >>> s = Settings(transport_type='http') 

1904 >>> s.validate_transport() # no error 

1905 >>> s2 = Settings(transport_type='invalid') 

1906 >>> try: 

1907 ... s2.validate_transport() 

1908 ... except ValueError as e: 

1909 ... print('error') 

1910 error 

1911 """ 

1912 # valid_types = {"http", "ws", "sse", "all"} 

1913 valid_types = {"sse", "streamablehttp", "all", "http"} 

1914 if self.transport_type not in valid_types: 

1915 raise ValueError(f"Invalid transport type. Must be one of: {valid_types}") 

1916 

1917 def validate_database(self) -> None: 

1918 """Validate database configuration. 

1919 

1920 Examples: 

1921 >>> from mcpgateway.config import Settings 

1922 >>> s = Settings(database_url='sqlite:///./test.db') 

1923 >>> s.validate_database() # Should create the directory if it does not exist 

1924 """ 

1925 if self.database_url.startswith("sqlite"): 

1926 db_path = Path(self.database_url.replace("sqlite:///", "")) 

1927 db_dir = db_path.parent 

1928 if not db_dir.exists(): 

1929 db_dir.mkdir(parents=True) 

1930 

1931 # Validation patterns for safe display (configurable) 

1932 validation_dangerous_html_pattern: str = ( 

1933 r"<(script|iframe|object|embed|link|meta|base|form|img|svg|video|audio|source|track|area|map|canvas|applet|frame|frameset|html|head|body|style)\b|</*(script|iframe|object|embed|link|meta|base|form|img|svg|video|audio|source|track|area|map|canvas|applet|frame|frameset|html|head|body|style)>" 

1934 ) 

1935 

1936 validation_dangerous_js_pattern: str = r"(?i)(?:^|\s|[\"'`<>=])(javascript:|vbscript:|data:\s*[^,]*[;\s]*(javascript|vbscript)|\bon[a-z]+\s*=|<\s*script\b)" 

1937 

1938 validation_allowed_url_schemes: List[str] = ["http://", "https://", "ws://", "wss://"] 

1939 

1940 # Character validation patterns 

1941 validation_name_pattern: str = r"^[a-zA-Z0-9_.\-\s]+$" # Allow spaces for names 

1942 validation_identifier_pattern: str = r"^[a-zA-Z0-9_\-\.]+$" # No spaces for IDs 

1943 validation_safe_uri_pattern: str = r"^[a-zA-Z0-9_\-.:/?=&%{}]+$" 

1944 validation_unsafe_uri_pattern: str = r'[<>"\'\\]' 

1945 validation_tool_name_pattern: str = r"^[a-zA-Z0-9_][a-zA-Z0-9._/-]*$" # MCP tool naming per SEP-986 

1946 validation_tool_method_pattern: str = r"^[a-zA-Z][a-zA-Z0-9_\./-]*$" 

1947 

1948 # MCP-compliant size limits (configurable via env) 

1949 validation_max_name_length: int = 255 

1950 validation_max_description_length: int = 8192 # 8KB 

1951 validation_max_template_length: int = 65536 # 64KB 

1952 validation_max_content_length: int = 1048576 # 1MB 

1953 validation_max_json_depth: int = Field( 

1954 default=int(os.getenv("VALIDATION_MAX_JSON_DEPTH", "30")), 

1955 description=( 

1956 "Maximum allowed JSON nesting depth for tool/resource schemas. " 

1957 "Increased from 10 to 30 for compatibility with deeply nested schemas " 

1958 "like Notion MCP (issue #1542). Override with VALIDATION_MAX_JSON_DEPTH " 

1959 "environment variable. Minimum: 1, Maximum: 100" 

1960 ), 

1961 ge=1, 

1962 le=100, 

1963 ) 

1964 validation_max_url_length: int = 2048 

1965 validation_max_rpc_param_size: int = 262144 # 256KB 

1966 

1967 validation_max_method_length: int = 128 

1968 

1969 # Allowed MIME types 

1970 validation_allowed_mime_types: List[str] = [ 

1971 "text/plain", 

1972 "text/html", 

1973 "text/css", 

1974 "text/markdown", 

1975 "text/javascript", 

1976 "application/json", 

1977 "application/xml", 

1978 "application/pdf", 

1979 "image/png", 

1980 "image/jpeg", 

1981 "image/gif", 

1982 "image/svg+xml", 

1983 "application/octet-stream", 

1984 ] 

1985 

1986 # Rate limiting 

1987 validation_max_requests_per_minute: int = 60 

1988 

1989 # Header passthrough feature (disabled by default for security) 

1990 enable_header_passthrough: bool = Field(default=False, description="Enable HTTP header passthrough feature (WARNING: Security implications - only enable if needed)") 

1991 enable_overwrite_base_headers: bool = Field(default=False, description="Enable overwriting of base headers") 

1992 

1993 # Passthrough headers configuration 

1994 default_passthrough_headers: List[str] = Field(default_factory=list) 

1995 

1996 # Passthrough headers source priority 

1997 # - "env": Environment variable always wins (ideal for Kubernetes/containerized deployments) 

1998 # - "db": Database take precedence if configured, env as fallback (default) 

1999 # - "merge": Union of both sources - env provides base, other configuration in DB can add more headers 

2000 passthrough_headers_source: Literal["env", "db", "merge"] = Field( 

2001 default="db", 

2002 description="Source priority for passthrough headers: env (environment always wins), db (database wins, default), merge (combine both)", 

2003 ) 

2004 

2005 # =================================== 

2006 # Pagination Configuration 

2007 # =================================== 

2008 

2009 # Default number of items per page for paginated endpoints 

2010 pagination_default_page_size: int = Field(default=50, ge=1, le=1000, description="Default number of items per page") 

2011 

2012 # Maximum allowed items per page (prevents abuse) 

2013 pagination_max_page_size: int = Field(default=500, ge=1, le=10000, description="Maximum allowed items per page") 

2014 

2015 # Minimum items per page 

2016 pagination_min_page_size: int = Field(default=1, ge=1, description="Minimum items per page") 

2017 

2018 # Threshold for switching from offset to cursor-based pagination 

2019 pagination_cursor_threshold: int = Field(default=10000, ge=1, description="Threshold for cursor-based pagination") 

2020 

2021 # Enable cursor-based pagination globally 

2022 pagination_cursor_enabled: bool = Field(default=True, description="Enable cursor-based pagination") 

2023 

2024 # Default sort field for paginated queries 

2025 pagination_default_sort_field: str = Field(default="created_at", description="Default sort field") 

2026 

2027 # Default sort order for paginated queries 

2028 pagination_default_sort_order: str = Field(default="desc", pattern="^(asc|desc)$", description="Default sort order") 

2029 

2030 # Maximum offset allowed for offset-based pagination (prevents abuse) 

2031 pagination_max_offset: int = Field(default=100000, ge=0, description="Maximum offset for pagination") 

2032 

2033 # Cache pagination counts for performance (seconds) 

2034 pagination_count_cache_ttl: int = Field(default=300, ge=0, description="Cache TTL for pagination counts") 

2035 

2036 # Enable pagination links in API responses 

2037 pagination_include_links: bool = Field(default=True, description="Include pagination links") 

2038 

2039 # Base URL for pagination links (defaults to request URL) 

2040 pagination_base_url: Optional[str] = Field(default=None, description="Base URL for pagination links") 

2041 

2042 # Ed25519 keys for signing 

2043 enable_ed25519_signing: bool = Field(default=False, description="Enable Ed25519 signing for certificates") 

2044 prev_ed25519_private_key: SecretStr = Field(default=SecretStr(""), description="Previous Ed25519 private key for signing") 

2045 prev_ed25519_public_key: Optional[str] = Field(default=None, description="Derived previous Ed25519 public key") 

2046 ed25519_private_key: SecretStr = Field(default=SecretStr(""), description="Ed25519 private key for signing") 

2047 ed25519_public_key: Optional[str] = Field(default=None, description="Derived Ed25519 public key") 

2048 

2049 @model_validator(mode="after") 

2050 def derive_public_keys(self) -> "Settings": 

2051 """ 

2052 Derive public keys after all individual field validations are complete. 

2053 

2054 Returns: 

2055 Settings: The updated Settings instance with derived public keys. 

2056 """ 

2057 for private_key_field in ["ed25519_private_key", "prev_ed25519_private_key"]: 

2058 public_key_field = private_key_field.replace("private", "public") 

2059 

2060 # 1. Get the private key SecretStr object 

2061 private_key_secret: SecretStr = getattr(self, private_key_field) 

2062 

2063 # 2. Proceed only if a key is present and the public key hasn't been set 

2064 pem = private_key_secret.get_secret_value().strip() 

2065 if not pem: 

2066 continue 

2067 

2068 try: 

2069 # Load the private key 

2070 private_key = serialization.load_pem_private_key(pem.encode(), password=None) 

2071 if not isinstance(private_key, ed25519.Ed25519PrivateKey): 

2072 # This check is useful, though model_validator should not raise 

2073 # for an invalid key if the field validator has already passed. 

2074 continue 

2075 

2076 # Derive and PEM-encode the public key 

2077 public_key = private_key.public_key() 

2078 public_pem = public_key.public_bytes( 

2079 encoding=serialization.Encoding.PEM, 

2080 format=serialization.PublicFormat.SubjectPublicKeyInfo, 

2081 ).decode() 

2082 

2083 # 3. Set the public key attribute directly on the model instance (self) 

2084 setattr(self, public_key_field, public_pem) 

2085 # logger.info(f"Derived and stored {public_key_field} automatically.") 

2086 

2087 except Exception: 

2088 logger.warning("Failed to derive public key for private_key") 

2089 # You can choose to raise an error here if a failure should halt model creation 

2090 

2091 return self 

2092 

2093 def __init__(self, **kwargs: Any) -> None: 

2094 """Initialize Settings with environment variable parsing. 

2095 

2096 Args: 

2097 **kwargs: Keyword arguments passed to parent Settings class 

2098 

2099 Raises: 

2100 ValueError: When environment variable parsing fails or produces invalid data 

2101 

2102 Examples: 

2103 >>> import os 

2104 >>> # Test with no environment variable set 

2105 >>> old_val = os.environ.get('DEFAULT_PASSTHROUGH_HEADERS') 

2106 >>> if 'DEFAULT_PASSTHROUGH_HEADERS' in os.environ: 

2107 ... del os.environ['DEFAULT_PASSTHROUGH_HEADERS'] 

2108 >>> s = Settings() 

2109 >>> s.default_passthrough_headers 

2110 ['X-Tenant-Id', 'X-Trace-Id'] 

2111 >>> # Restore original value if it existed 

2112 >>> if old_val is not None: 

2113 ... os.environ['DEFAULT_PASSTHROUGH_HEADERS'] = old_val 

2114 """ 

2115 super().__init__(**kwargs) 

2116 

2117 # Parse DEFAULT_PASSTHROUGH_HEADERS environment variable 

2118 default_value = os.environ.get("DEFAULT_PASSTHROUGH_HEADERS") 

2119 if default_value: 

2120 try: 

2121 # Try JSON parsing first 

2122 self.default_passthrough_headers = orjson.loads(default_value) 

2123 if not isinstance(self.default_passthrough_headers, list): 

2124 raise ValueError("Must be a JSON array") 

2125 except (orjson.JSONDecodeError, ValueError): 

2126 # Fallback to comma-separated parsing 

2127 self.default_passthrough_headers = [h.strip() for h in default_value.split(",") if h.strip()] 

2128 logger.info(f"Parsed comma-separated passthrough headers: {self.default_passthrough_headers}") 

2129 else: 

2130 # Safer defaults without Authorization header 

2131 self.default_passthrough_headers = ["X-Tenant-Id", "X-Trace-Id"] 

2132 

2133 # Configure environment-aware CORS origins if not explicitly set via env or kwargs 

2134 # Only apply defaults if using the default allowed_origins value 

2135 if not os.environ.get("ALLOWED_ORIGINS") and "allowed_origins" not in kwargs and self.allowed_origins == {"http://localhost", "http://localhost:4444"}: 

2136 if self.environment == "development": 

2137 self.allowed_origins = { 

2138 "http://localhost", 

2139 "http://localhost:3000", 

2140 "http://localhost:8080", 

2141 "http://127.0.0.1:3000", 

2142 "http://127.0.0.1:8080", 

2143 f"http://localhost:{self.port}", 

2144 f"http://127.0.0.1:{self.port}", 

2145 } 

2146 else: 

2147 # Production origins - construct from app_domain 

2148 self.allowed_origins = {f"https://{self.app_domain}", f"https://app.{self.app_domain}", f"https://admin.{self.app_domain}"} 

2149 

2150 # Validate proxy auth configuration 

2151 if not self.mcp_client_auth_enabled and not self.trust_proxy_auth: 

2152 logger.warning( 

2153 "MCP client authentication is disabled but trust_proxy_auth is not set. " 

2154 "This is a security risk! Set TRUST_PROXY_AUTH=true only if MCP Gateway " 

2155 "is behind a trusted authentication proxy." 

2156 ) 

2157 

2158 # Masking value for all sensitive data 

2159 masked_auth_value: str = "*****" 

2160 

2161 def log_summary(self) -> None: 

2162 """ 

2163 Log a summary of the application settings. 

2164 

2165 Dumps the current settings to a dictionary while excluding sensitive 

2166 information such as `database_url` and `memcached_url`, and logs it 

2167 at the INFO level. 

2168 

2169 This method is useful for debugging or auditing purposes without 

2170 exposing credentials or secrets in logs. 

2171 """ 

2172 summary = self.model_dump(exclude={"database_url", "memcached_url"}) 

2173 logger.info(f"Application settings summary: {summary}") 

2174 

2175 ENABLE_METRICS: bool = Field(True, description="Enable Prometheus metrics instrumentation") 

2176 METRICS_EXCLUDED_HANDLERS: str = Field("", description="Comma-separated regex patterns for paths to exclude from metrics") 

2177 METRICS_NAMESPACE: str = Field("default", description="Prometheus metrics namespace") 

2178 METRICS_SUBSYSTEM: str = Field("", description="Prometheus metrics subsystem") 

2179 METRICS_CUSTOM_LABELS: str = Field("", description='Comma-separated "key=value" pairs for static custom labels') 

2180 

2181 

2182@lru_cache() 

2183def get_settings(**kwargs: Any) -> Settings: 

2184 """Get cached settings instance. 

2185 

2186 Args: 

2187 **kwargs: Keyword arguments to pass to the Settings setup. 

2188 

2189 Returns: 

2190 Settings: A cached instance of the Settings class. 

2191 

2192 Examples: 

2193 >>> settings = get_settings() 

2194 >>> isinstance(settings, Settings) 

2195 True 

2196 >>> # Second call returns the same cached instance 

2197 >>> settings2 = get_settings() 

2198 >>> settings is settings2 

2199 True 

2200 """ 

2201 # Instantiate a fresh Pydantic Settings object, 

2202 # loading from env vars or .env exactly once. 

2203 cfg = Settings(**kwargs) 

2204 # Validate that transport_type is correct; will 

2205 # raise if mis-configured. 

2206 cfg.validate_transport() 

2207 # Ensure sqlite DB directories exist if needed. 

2208 cfg.validate_database() 

2209 # Return the one-and-only Settings instance (cached). 

2210 return cfg 

2211 

2212 

2213def generate_settings_schema() -> dict[str, Any]: 

2214 """ 

2215 Return the JSON Schema describing the Settings model. 

2216 

2217 This schema can be used for validation or documentation purposes. 

2218 

2219 Returns: 

2220 dict: A dictionary representing the JSON Schema of the Settings model. 

2221 """ 

2222 return Settings.model_json_schema(mode="validation") 

2223 

2224 

2225# Lazy "instance" of settings 

2226class LazySettingsWrapper: 

2227 """Lazily initialize settings singleton on getattr""" 

2228 

2229 def __getattr__(self, key: str) -> Any: 

2230 """Get the real settings object and forward to it 

2231 

2232 Args: 

2233 key: The key to fetch from settings 

2234 

2235 Returns: 

2236 Any: The value of the attribute on the settings 

2237 """ 

2238 return getattr(get_settings(), key) 

2239 

2240 

2241settings = LazySettingsWrapper() 

2242 

2243 

2244if __name__ == "__main__": 

2245 if "--schema" in sys.argv: 

2246 schema = generate_settings_schema() 

2247 print(orjson.dumps(schema, option=orjson.OPT_INDENT_2).decode()) 

2248 sys.exit(0) 

2249 settings.log_summary()