Coverage for mcpgateway / config.py: 99%

940 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/config.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti, Manav Gupta 

6 

7ContextForge AI Gateway Configuration. 

8This module defines configuration settings for ContextForge AI Gateway using Pydantic. 

9It loads configuration from environment variables with sensible defaults. 

10 

11Environment variables: 

12- APP_NAME: Gateway name (default: "ContextForge") 

13- HOST: Host to bind to (default: "127.0.0.1") 

14- PORT: Port to listen on (default: 4444) 

15- DATABASE_URL: Database connection URL; SQLite, PostgreSQL, and MySQL/MariaDB are supported (default: "sqlite:///./mcp.db") 

16- BASIC_AUTH_USER: Username for API Basic auth when enabled (default: "admin") 

17- BASIC_AUTH_PASSWORD: Password for API Basic auth when enabled (default: "changeme") 

18- LOG_LEVEL: Logging level (default: "INFO") 

19- SKIP_SSL_VERIFY: Disable SSL verification (default: False) 

20- AUTH_REQUIRED: Require authentication (default: True) 

21- TRANSPORT_TYPE: Transport mechanisms (default: "all") 

22- DOCS_ALLOW_BASIC_AUTH: Allow basic auth for docs (default: False) 

23- RESOURCE_CACHE_SIZE: Max cached resources (default: 1000) 

24- RESOURCE_CACHE_TTL: Cache TTL in seconds (default: 3600) 

25- TOOL_TIMEOUT: Tool invocation timeout (default: 60) 

26- PROMPT_CACHE_SIZE: Max cached prompts (default: 100) 

27- HEALTH_CHECK_INTERVAL: Gateway health check interval (default: 300) 

28- REQUIRE_TOKEN_EXPIRATION: Require JWT tokens to have expiration (default: True) 

29- REQUIRE_JTI: Require JTI claim in tokens for revocation (default: True) 

30- REQUIRE_USER_IN_DB: Require all users to exist in database (default: False) 

31 

32Examples: 

33 >>> from mcpgateway.config import Settings 

34 >>> s = Settings(basic_auth_user='admin', basic_auth_password='secret') 

35 >>> s.api_key 

36 'admin:secret' 

37 >>> s2 = Settings(transport_type='http') 

38 >>> s2.validate_transport() # no error 

39 >>> s3 = Settings(transport_type='invalid') 

40 >>> try: 

41 ... s3.validate_transport() 

42 ... except ValueError as e: 

43 ... print('error') 

44 error 

45 >>> s4 = Settings(database_url='sqlite:///./test.db') 

46 >>> isinstance(s4.database_settings, dict) 

47 True 

48""" 

49 

50# Standard 

51from functools import lru_cache 

52from importlib.resources import files 

53import logging 

54import os 

55from pathlib import Path 

56import re 

57import sys 

58from typing import Annotated, Any, ClassVar, Dict, List, Literal, NotRequired, Optional, Self, Set, TypedDict 

59from urllib.parse import urlparse 

60 

61# Third-Party 

62from cryptography.hazmat.primitives import serialization 

63from cryptography.hazmat.primitives.asymmetric import ed25519 

64import orjson 

65from pydantic import Field, field_validator, HttpUrl, model_validator, PositiveInt, SecretStr, ValidationInfo 

66from pydantic_settings import BaseSettings, NoDecode, SettingsConfigDict 

67 

# Install a fallback logging configuration only when the root logger has no
# handlers yet. If handlers are already attached (e.g. by LoggingService),
# leave them alone so the two configurations do not conflict.
if not logging.getLogger().hasHandlers():
    logging.basicConfig(
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
        level=logging.INFO,
    )

# Module-level logger used by the configuration code in this file.
logger = logging.getLogger(__name__)

78 

79 

def _normalize_env_list_vars() -> None:
    """Normalize list-typed env vars to valid JSON arrays.

    Ensures env values parse cleanly when providers expect JSON for complex
    types. For every known list-valued variable that is set in ``os.environ``:

    - a blank (empty or whitespace-only) value is rewritten to ``"[]"``;
    - a value that starts with ``[`` and parses as JSON is left untouched;
    - any other value is treated as CSV: it is split on commas, each item is
      stripped, empty items are dropped, and the result is written back as a
      JSON array string.

    Variables that are not set are skipped. The function mutates
    ``os.environ`` in place and returns nothing.
    """
    keys = (
        "SSO_TRUSTED_DOMAINS",
        "SSO_AUTO_ADMIN_DOMAINS",
        "SSO_GITHUB_ADMIN_ORGS",
        "SSO_GOOGLE_ADMIN_DOMAINS",
        "SSO_ENTRA_ADMIN_GROUPS",
        "LOG_DETAILED_SKIP_ENDPOINTS",
    )
    for key in keys:
        raw = os.environ.get(key)
        if raw is None:
            continue

        value = raw.strip()
        if not value:
            os.environ[key] = "[]"
            continue

        if value.startswith("["):
            try:
                orjson.loads(value)
            except orjson.JSONDecodeError:
                # Looks like JSON but is not valid JSON: deliberately fall
                # through and salvage what we can via the CSV path below.
                pass
            else:
                # Already a valid JSON document; keep the value as is.
                continue

        # Convert CSV to a JSON array, ignoring empty entries.
        items = [item.strip() for item in value.split(",") if item.strip()]
        os.environ[key] = orjson.dumps(items).decode()

112 

113 

114_normalize_env_list_vars() 

115 

116 

# Content type sent on outgoing requests to Forge; the FORGE_CONTENT_TYPE
# environment variable overrides the JSON default.
FORGE_CONTENT_TYPE = os.environ.get("FORGE_CONTENT_TYPE", "application/json")

# UI embedding / visibility controls
# Canonical identifiers of the UI sections that may be hidden.
UI_HIDABLE_SECTIONS = frozenset(
    (
        "overview",
        "servers",
        "gateways",
        "tools",
        "prompts",
        "resources",
        "roots",
        "mcp-registry",
        "metrics",
        "plugins",
        "export-import",
        "logs",
        "version-info",
        "maintenance",
        "teams",
        "users",
        "agents",
        "tokens",
        "settings",
    )
)

# Header controls that may be hidden.
UI_HIDABLE_HEADER_ITEMS = frozenset(
    (
        "logout",
        "team_selector",
        "user_identity",
        "theme_toggle",
    )
)

# Alternative section names mapped onto their canonical UI_HIDABLE_SECTIONS entry.
UI_HIDE_SECTION_ALIASES = {
    # -> "servers"
    "catalog": "servers",
    "virtual_servers": "servers",
    # -> "agents"
    "a2a-agents": "agents",
    "a2a": "agents",
    "grpc-services": "agents",
    # -> "tokens"
    "api_tokens": "tokens",
    # -> "settings"
    "llm-settings": "settings",
}

154 

155 

156class Settings(BaseSettings): 

157 """ 

158 ContextForge AI Gateway configuration settings. 

159 

160 Examples: 

161 >>> from mcpgateway.config import Settings 

162 >>> s = Settings(basic_auth_user='admin', basic_auth_password='secret') 

163 >>> s.api_key 

164 'admin:secret' 

165 >>> s2 = Settings(transport_type='http') 

166 >>> s2.validate_transport() # no error 

167 >>> s3 = Settings(transport_type='invalid') 

168 >>> try: 

169 ... s3.validate_transport() 

170 ... except ValueError as e: 

171 ... print('error') 

172 error 

173 >>> s4 = Settings(database_url='sqlite:///./test.db') 

174 >>> isinstance(s4.database_settings, dict) 

175 True 

176 >>> s5 = Settings() 

177 >>> s5.app_name 

178 'ContextForge' 

179 >>> s5.host in ('0.0.0.0', '127.0.0.1') # Default can be either 

180 True 

181 >>> s5.port 

182 4444 

183 >>> s5.auth_required 

184 True 

185 >>> isinstance(s5.allowed_origins, set) 

186 True 

187 >>> s6 = Settings(log_detailed_skip_endpoints=["/metrics", "/health"]) 

188 >>> s6.log_detailed_skip_endpoints 

189 ['/metrics', '/health'] 

190 >>> s7 = Settings(log_detailed_sample_rate=0.5) 

191 >>> s7.log_detailed_sample_rate 

192 0.5 

193 >>> s8 = Settings(log_resolve_user_identity=True) 

194 >>> s8.log_resolve_user_identity 

195 True 

196 >>> s9 = Settings() 

197 >>> s9.log_detailed_skip_endpoints 

198 [] 

199 >>> s9.log_detailed_sample_rate 

200 1.0 

201 >>> s9.log_resolve_user_identity 

202 False 

203 """ 

204 

205 # Basic Settings 

206 app_name: str = "ContextForge" 

207 host: str = "127.0.0.1" 

208 port: PositiveInt = Field(default=4444, ge=1, le=65535) 

209 client_mode: bool = False 

210 docs_allow_basic_auth: bool = False # Allow basic auth for docs 

211 api_allow_basic_auth: bool = Field( 

212 default=False, 

213 description="Allow Basic authentication for API endpoints. Disabled by default for security. Use JWT or API tokens instead.", 

214 ) 

215 database_url: str = Field( 

216 default="sqlite:///./mcp.db", 

217 description=( 

218 "Database connection URL. Supports SQLite, PostgreSQL, MySQL/MariaDB. " 

219 "For PostgreSQL with custom schema, use the 'options' query parameter: " 

220 "postgresql://user:pass@host:5432/db?options=-c%20search_path=schema_name " 

221 "(See Issue #1535 for details)" 

222 ), 

223 ) 

224 

225 # Absolute paths resolved at import-time (still override-able via env vars) 

226 templates_dir: Path = Field(default_factory=lambda: Path(str(files("mcpgateway") / "templates"))) 

227 static_dir: Path = Field(default_factory=lambda: Path(str(files("mcpgateway") / "static"))) 

228 

229 # Template auto-reload: False for production (default), True for development 

230 # Disabling prevents re-parsing templates on each request, improving performance under load 

231 # Use TEMPLATES_AUTO_RELOAD=true for development (make dev sets this automatically) 

232 templates_auto_reload: bool = Field(default=False, description="Auto-reload Jinja2 templates on change (enable for development)") 

233 

234 app_root_path: str = "" 

235 

236 # Protocol 

237 protocol_version: str = "2025-11-25" 

238 

239 # Authentication 

240 basic_auth_user: str = "admin" 

241 basic_auth_password: SecretStr = Field(default=SecretStr("changeme")) 

242 jwt_algorithm: str = "HS256" 

243 jwt_secret_key: SecretStr = Field(default=SecretStr("my-test-key")) 

244 jwt_public_key_path: str = "" 

245 jwt_private_key_path: str = "" 

246 jwt_audience: str = "mcpgateway-api" 

247 jwt_issuer: str = "mcpgateway" 

248 jwt_audience_verification: bool = True 

249 jwt_issuer_verification: bool = True 

250 auth_required: bool = True 

251 allow_unauthenticated_admin: bool = Field( 

252 default=False, 

253 description="Allow unauthenticated requests to receive platform-admin context when AUTH_REQUIRED=false (dangerous; development-only override).", 

254 ) 

255 token_expiry: int = 10080 # minutes 

256 

257 require_token_expiration: bool = Field(default=True, description="Require all JWT tokens to have expiration claims (secure default)") 

258 require_jti: bool = Field(default=True, description="Require JTI (JWT ID) claim in all tokens for revocation support (secure default)") 

259 require_user_in_db: bool = Field( 

260 default=False, 

261 description="Require all authenticated users to exist in the database. When true, disables the platform admin bootstrap mechanism. WARNING: Enabling this on a fresh deployment will lock you out.", 

262 ) 

263 embed_environment_in_tokens: bool = Field(default=False, description="Embed environment claim in gateway-issued JWTs for environment isolation") 

264 validate_token_environment: bool = Field(default=False, description="Reject tokens with mismatched environment claim (tokens without env claim are allowed)") 

265 

266 # JSON Schema Validation for registration (Tool Input Schemas, Prompt schemas, etc) 

267 json_schema_validation_strict: bool = Field(default=True, description="Strict schema validation mode - reject invalid JSON schemas") 

268 

269 # SSO Configuration 

270 sso_enabled: bool = Field(default=False, description="Enable Single Sign-On authentication") 

271 sso_github_enabled: bool = Field(default=False, description="Enable GitHub OAuth authentication") 

272 sso_github_client_id: Optional[str] = Field(default=None, description="GitHub OAuth client ID") 

273 sso_github_client_secret: Optional[SecretStr] = Field(default=None, description="GitHub OAuth client secret") 

274 

275 sso_google_enabled: bool = Field(default=False, description="Enable Google OAuth authentication") 

276 sso_google_client_id: Optional[str] = Field(default=None, description="Google OAuth client ID") 

277 sso_google_client_secret: Optional[SecretStr] = Field(default=None, description="Google OAuth client secret") 

278 

279 sso_ibm_verify_enabled: bool = Field(default=False, description="Enable IBM Security Verify OIDC authentication") 

280 sso_ibm_verify_client_id: Optional[str] = Field(default=None, description="IBM Security Verify client ID") 

281 sso_ibm_verify_client_secret: Optional[SecretStr] = Field(default=None, description="IBM Security Verify client secret") 

282 sso_ibm_verify_issuer: Optional[str] = Field(default=None, description="IBM Security Verify OIDC issuer URL") 

283 

284 sso_okta_enabled: bool = Field(default=False, description="Enable Okta OIDC authentication") 

285 sso_okta_client_id: Optional[str] = Field(default=None, description="Okta client ID") 

286 sso_okta_client_secret: Optional[SecretStr] = Field(default=None, description="Okta client secret") 

287 sso_okta_issuer: Optional[str] = Field(default=None, description="Okta issuer URL") 

288 

289 sso_keycloak_enabled: bool = Field(default=False, description="Enable Keycloak OIDC authentication") 

290 sso_keycloak_base_url: Optional[str] = Field(default=None, description="Keycloak base URL (e.g., https://keycloak.example.com)") 

291 sso_keycloak_public_base_url: Optional[str] = Field( 

292 default=None, 

293 description="Browser-facing Keycloak base URL for authorization redirects (e.g., http://localhost:8180)", 

294 ) 

295 sso_keycloak_realm: str = Field(default="master", description="Keycloak realm name") 

296 sso_keycloak_client_id: Optional[str] = Field(default=None, description="Keycloak client ID") 

297 sso_keycloak_client_secret: Optional[SecretStr] = Field(default=None, description="Keycloak client secret") 

298 sso_keycloak_map_realm_roles: bool = Field(default=True, description="Map Keycloak realm roles to gateway teams") 

299 sso_keycloak_map_client_roles: bool = Field(default=False, description="Map Keycloak client roles to gateway RBAC") 

300 sso_keycloak_role_mappings: Dict[str, str] = Field(default_factory=dict, description="Map Keycloak groups/roles to ContextForge roles (JSON: {group_or_role: role_name})") 

301 sso_keycloak_default_role: Optional[str] = Field(default=None, description="Default ContextForge role for Keycloak users without role mapping") 

302 sso_keycloak_resolve_team_scope_to_personal_team: bool = Field(default=False, description="Resolve team-scoped Keycloak role mappings to the user's personal team") 

303 sso_keycloak_username_claim: str = Field(default="preferred_username", description="JWT claim for username") 

304 

305 # Security Validation & Sanitization 

306 experimental_validate_io: bool = Field(default=False, description="Enable experimental input validation and output sanitization") 

307 validation_middleware_enabled: bool = Field(default=False, description="Enable validation middleware for all requests") 

308 validation_strict: bool = Field(default=True, description="Strict validation mode - reject on violations") 

309 sanitize_output: bool = Field(default=True, description="Sanitize output to remove control characters") 

310 allowed_roots: List[str] = Field(default_factory=list, description="Allowed root paths for resource access") 

311 max_path_depth: int = Field(default=10, description="Maximum allowed path depth") 

312 max_param_length: int = Field(default=10000, description="Maximum parameter length") 

313 dangerous_patterns: List[str] = Field( 

314 default_factory=lambda: [ 

315 r"[;&|`$(){}\[\]<>]", # Shell metacharacters 

316 r"\.\.[\\/]", # Path traversal 

317 r"[\x00-\x1f\x7f-\x9f]", # Control characters 

318 ], 

319 description="Regex patterns for dangerous input", 

320 ) 

321 

322 sso_keycloak_email_claim: str = Field(default="email", description="JWT claim for email") 

323 sso_keycloak_groups_claim: str = Field(default="groups", description="JWT claim for groups/roles") 

324 

325 sso_entra_enabled: bool = Field(default=False, description="Enable Microsoft Entra ID OIDC authentication") 

326 sso_entra_client_id: Optional[str] = Field(default=None, description="Microsoft Entra ID client ID") 

327 sso_entra_client_secret: Optional[SecretStr] = Field(default=None, description="Microsoft Entra ID client secret") 

328 sso_entra_tenant_id: Optional[str] = Field(default=None, description="Microsoft Entra ID tenant ID") 

329 sso_entra_groups_claim: str = Field(default="groups", description="JWT claim for EntraID groups (groups/roles)") 

330 sso_entra_admin_groups: Annotated[list[str], NoDecode] = Field(default_factory=list, description="EntraID groups granting platform_admin role (CSV/JSON)") 

331 sso_entra_role_mappings: Dict[str, str] = Field(default_factory=dict, description="Map EntraID groups to ContextForge roles (JSON: {group_id: role_name})") 

332 sso_entra_default_role: Optional[str] = Field(default=None, description="Default role for EntraID users without group mapping (None = no role assigned)") 

333 sso_entra_sync_roles_on_login: bool = Field(default=True, description="Synchronize role assignments on each login") 

334 sso_entra_graph_api_enabled: bool = Field(default=True, description="Enable Microsoft Graph fallback for EntraID groups overage claims") 

335 sso_entra_graph_api_timeout: int = Field(default=10, ge=1, le=120, description="Timeout in seconds for Microsoft Graph group fallback requests") 

336 sso_entra_graph_api_max_groups: int = Field(default=0, ge=0, description="Maximum groups to keep from Graph fallback (0 = no limit)") 

337 

338 sso_generic_enabled: bool = Field(default=False, description="Enable generic OIDC provider (Keycloak, Auth0, etc.)") 

339 sso_generic_provider_id: Optional[str] = Field(default=None, description="Provider ID (e.g., 'keycloak', 'auth0', 'authentik')") 

340 sso_generic_display_name: Optional[str] = Field(default=None, description="Display name shown on login page") 

341 sso_generic_client_id: Optional[str] = Field(default=None, description="Generic OIDC client ID") 

342 sso_generic_client_secret: Optional[SecretStr] = Field(default=None, description="Generic OIDC client secret") 

343 sso_generic_authorization_url: Optional[str] = Field(default=None, description="Authorization endpoint URL") 

344 sso_generic_token_url: Optional[str] = Field(default=None, description="Token endpoint URL") 

345 sso_generic_userinfo_url: Optional[str] = Field(default=None, description="Userinfo endpoint URL") 

346 sso_generic_issuer: Optional[str] = Field(default=None, description="OIDC issuer URL") 

347 sso_generic_jwks_uri: Optional[str] = Field(default=None, description="OIDC JWKS endpoint URL for token signature verification") 

348 sso_generic_scope: Optional[str] = Field(default="openid profile email", description="OAuth scopes (space-separated)") 

349 

350 # SSO Settings 

351 sso_auto_create_users: bool = Field(default=True, description="Automatically create users from SSO providers") 

352 sso_trusted_domains: Annotated[list[str], NoDecode] = Field(default_factory=list, description="Trusted email domains (CSV or JSON list)") 

353 sso_preserve_admin_auth: bool = Field(default=True, description="Preserve local admin authentication when SSO is enabled") 

354 

355 # SSO Admin Assignment Settings 

356 sso_auto_admin_domains: Annotated[list[str], NoDecode] = Field(default_factory=list, description="Admin domains (CSV or JSON list)") 

357 sso_github_admin_orgs: Annotated[list[str], NoDecode] = Field(default_factory=list, description="GitHub orgs granting admin (CSV/JSON)") 

358 sso_google_admin_domains: Annotated[list[str], NoDecode] = Field(default_factory=list, description="Google admin domains (CSV/JSON)") 

359 sso_require_admin_approval: bool = Field(default=False, description="Require admin approval for new SSO registrations") 

360 

361 # MCP Client Authentication 

362 mcp_client_auth_enabled: bool = Field(default=True, description="Enable JWT authentication for MCP client operations") 

363 mcp_require_auth: Optional[bool] = Field( 

364 default=None, 

365 description=( 

366 "Require authentication for /mcp endpoints. " 

367 "When unset, inherits AUTH_REQUIRED. " 

368 "Set false explicitly to allow unauthenticated access to public items only; " 

369 "set true to require a valid Bearer token for all /mcp requests." 

370 ), 

371 ) 

372 trust_proxy_auth: bool = Field( 

373 default=False, 

374 description="Trust proxy authentication headers (required when mcp_client_auth_enabled=false)", 

375 ) 

376 trust_proxy_auth_dangerously: bool = Field( 

377 default=False, 

378 description="Acknowledge and allow trusted proxy headers when MCP_CLIENT_AUTH_ENABLED=false (dangerous; only for strictly trusted proxy deployments).", 

379 ) 

380 proxy_user_header: str = Field(default="X-Authenticated-User", description="Header containing authenticated username from proxy") 

381 

382 # Encryption key phrase for auth storage 

383 auth_encryption_secret: SecretStr = Field(default=SecretStr("my-test-salt")) 

384 

385 # Query Parameter Authentication (INSECURE - disabled by default) 

386 insecure_allow_queryparam_auth: bool = Field( 

387 default=False, 

388 description=("Enable query parameter authentication for gateway peers. " "WARNING: API keys may appear in proxy logs. See CWE-598."), 

389 ) 

390 insecure_queryparam_auth_allowed_hosts: List[str] = Field( 

391 default_factory=list, 

392 description=("Allowlist of hosts permitted to use query parameter auth. " "Empty list allows any host when feature is enabled. " "Format: ['mcp.tavily.com', 'api.example.com']"), 

393 ) 

394 

395 # =================================== 

396 # SSRF Protection Configuration 

397 # =================================== 

398 # Server-Side Request Forgery (SSRF) protection prevents the gateway from being 

399 # used to access internal resources or cloud metadata services. 

400 

401 ssrf_protection_enabled: bool = Field( 

402 default=True, 

403 description="Enable SSRF protection for gateway/tool URLs. Blocks access to dangerous endpoints.", 

404 ) 

405 

406 ssrf_blocked_networks: List[str] = Field( 

407 default=[ 

408 # Cloud metadata services (ALWAYS dangerous - credential exposure) 

409 "169.254.169.254/32", # AWS/GCP/Azure instance metadata 

410 "169.254.169.123/32", # AWS NTP service 

411 "fd00::1/128", # IPv6 cloud metadata 

412 # Link-local (often used for cloud metadata) 

413 "169.254.0.0/16", # Full link-local IPv4 range 

414 "fe80::/10", # IPv6 link-local 

415 ], 

416 description=( 

417 "CIDR ranges to block for SSRF protection. These are ALWAYS blocked regardless of other settings. " "Default blocks cloud metadata endpoints. Add private ranges for stricter security." 

418 ), 

419 ) 

420 

421 ssrf_blocked_hosts: List[str] = Field( 

422 default=[ 

423 "metadata.google.internal", # GCP metadata hostname 

424 "metadata.internal", # Generic cloud metadata 

425 ], 

426 description="Hostnames to block for SSRF protection. Matched case-insensitively.", 

427 ) 

428 

429 ssrf_allow_localhost: bool = Field( 

430 default=False, 

431 description=("Allow localhost/loopback addresses (127.0.0.0/8, ::1). " "Default false for safer production behavior."), 

432 ) 

433 

434 ssrf_allow_private_networks: bool = Field( 

435 default=False, 

436 description=( 

437 "Allow RFC 1918 private network addresses (10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16). " "When false, private destinations are blocked unless explicitly listed in SSRF_ALLOWED_NETWORKS." 

438 ), 

439 ) 

440 

441 ssrf_allowed_networks: List[str] = Field( 

442 default_factory=list, 

443 description=("Optional CIDR allowlist for internal/private destinations. " "Used when SSRF_ALLOW_PRIVATE_NETWORKS=false to allow specific internal ranges."), 

444 ) 

445 

446 ssrf_dns_fail_closed: bool = Field( 

447 default=True, 

448 description=( 

449 "Fail closed on DNS resolution errors. When true, URLs that cannot be resolved " 

450 "are rejected. When false, unresolvable hostnames are allowed through " 

451 "(hostname blocklist still applies)." 

452 ), 

453 ) 

454 

455 # OAuth Configuration 

456 oauth_request_timeout: int = Field(default=30, description="OAuth request timeout in seconds") 

457 oauth_max_retries: int = Field(default=3, description="Maximum retries for OAuth token requests") 

458 oauth_default_timeout: int = Field(default=3600, description="Default OAuth token timeout in seconds") 

459 

460 # =================================== 

461 # Dynamic Client Registration (DCR) - Client Mode 

462 # =================================== 

463 

464 # Enable DCR client functionality 

465 dcr_enabled: bool = Field(default=True, description="Enable Dynamic Client Registration (RFC 7591) - gateway acts as DCR client") 

466 

467 # Auto-register when missing credentials 

468 dcr_auto_register_on_missing_credentials: bool = Field(default=True, description="Automatically register with AS when gateway has issuer but no client_id") 

469 

470 # Default scopes for DCR 

471 dcr_default_scopes: List[str] = Field(default=["mcp:read"], description="Default MCP scopes to request during DCR") 

472 

473 # Issuer allowlist (empty = allow any) 

474 dcr_allowed_issuers: List[str] = Field(default_factory=list, description="Optional allowlist of issuer URLs for DCR (empty = allow any)") 

475 

476 # Token endpoint auth method 

477 dcr_token_endpoint_auth_method: str = Field(default="client_secret_basic", description="Token endpoint auth method for DCR (client_secret_basic or client_secret_post)") 

478 

479 # Metadata cache TTL 

480 dcr_metadata_cache_ttl: int = Field(default=3600, description="AS metadata cache TTL in seconds (RFC 8414 discovery)") 

481 

482 # Client name template 

483 dcr_client_name_template: str = Field(default="ContextForge ({gateway_name})", description="Template for client_name in DCR requests") 

484 

485 # Refresh token behavior 

486 dcr_request_refresh_token_when_unsupported: bool = Field( 

487 default=False, 

488 description="Request refresh_token even when AS metadata omits grant_types_supported. Enable for AS servers that support refresh tokens but don't advertise it.", 

489 ) 

490 

491 # =================================== 

492 # OAuth Discovery (RFC 8414) 

493 # =================================== 

494 

495 oauth_discovery_enabled: bool = Field(default=True, description="Enable OAuth AS metadata discovery (RFC 8414)") 

496 

497 oauth_preferred_code_challenge_method: str = Field(default="S256", description="Preferred PKCE code challenge method (S256 or plain)") 

498 

499 # Email-Based Authentication 

500 email_auth_enabled: bool = Field(default=True, description="Enable email-based authentication") 

501 public_registration_enabled: bool = Field( 

502 default=False, 

503 description="Allow unauthenticated users to self-register accounts. When false, only admins can create users via /admin/users endpoint.", 

504 ) 

505 allow_public_visibility: bool = Field( 

506 default=True, 

507 description="When false, creating or updating any entity with public visibility is blocked in team scope.", 

508 ) 

509 protect_all_admins: bool = Field( 

510 default=True, 

511 description="When true (default), prevent any admin from being demoted, deactivated, or locked out via API/UI. When false, only the last active admin is protected.", 

512 ) 

513 platform_admin_email: str = Field(default="admin@example.com", description="Platform administrator email address") 

514 platform_admin_password: SecretStr = Field(default=SecretStr("changeme"), description="Platform administrator password") 

515 default_user_password: SecretStr = Field(default=SecretStr("changeme"), description="Default password for new users") # nosec B105 

516 platform_admin_full_name: str = Field(default="Platform Administrator", description="Platform administrator full name") 

517 

518 # Argon2id Password Hashing Configuration 

519 argon2id_time_cost: int = Field(default=3, description="Argon2id time cost (number of iterations)") 

520 argon2id_memory_cost: int = Field(default=65536, description="Argon2id memory cost in KiB") 

521 argon2id_parallelism: int = Field(default=1, description="Argon2id parallelism (number of threads)") 

522 

523 # Password Policy Configuration 

524 password_min_length: int = Field(default=8, description="Minimum password length") 

525 password_require_uppercase: bool = Field(default=True, description="Require uppercase letters in passwords") 

526 password_require_lowercase: bool = Field(default=True, description="Require lowercase letters in passwords") 

527 password_require_numbers: bool = Field(default=False, description="Require numbers in passwords") 

528 password_require_special: bool = Field(default=True, description="Require special characters in passwords") 

529 

530 # Password change enforcement and policy toggles 

531 password_change_enforcement_enabled: bool = Field(default=True, description="Master switch for password change enforcement checks") 

532 admin_require_password_change_on_bootstrap: bool = Field(default=True, description="Force admin to change password after bootstrap") 

533 detect_default_password_on_login: bool = Field(default=True, description="Detect default password during login and mark user for change") 

534 require_password_change_for_default_password: bool = Field(default=True, description="Require password change when user is created with the default password") 

535 password_policy_enabled: bool = Field(default=True, description="Enable password complexity validation for new/changed passwords") 

536 password_prevent_reuse: bool = Field(default=True, description="Prevent reusing the current password when changing") 

537 password_max_age_days: int = Field(default=90, description="Password maximum age in days before expiry forces a change") 

538 # Account Security Configuration 

539 max_failed_login_attempts: int = Field(default=10, description="Maximum failed login attempts before account lockout") 

540 account_lockout_duration_minutes: int = Field(default=1, description="Account lockout duration in minutes") 

541 account_lockout_notification_enabled: bool = Field(default=True, description="Send lockout notification emails when accounts are locked") 

542 failed_login_min_response_ms: int = Field(default=250, description="Minimum response duration for failed login attempts to reduce timing side channels") 

543 

544 # Self-service password reset 

545 password_reset_enabled: bool = Field(default=True, description="Enable self-service password reset workflow (set false to disable public forgot/reset endpoints)") 

546 password_reset_token_expiry_minutes: int = Field(default=60, description="Password reset token expiration time in minutes") 

547 password_reset_rate_limit: int = Field(default=5, description="Maximum password reset requests allowed per email in each rate-limit window") 

548 password_reset_rate_window_minutes: int = Field(default=15, description="Password reset request rate-limit window in minutes") 

549 password_reset_invalidate_sessions: bool = Field(default=True, description="Invalidate active sessions after password reset") 

550 password_reset_min_response_ms: int = Field(default=250, description="Minimum response duration for forgot-password requests to reduce timing side channels") 

551 

552 # Email delivery for auth notifications 

553 smtp_enabled: bool = Field( 

554 default=False, 

555 description="Enable SMTP email delivery for password reset and account lockout notifications (when false, reset requests are accepted but no email is sent)", 

556 ) 

557 smtp_host: Optional[str] = Field(default=None, description="SMTP server host") 

558 smtp_port: int = Field(default=587, description="SMTP server port") 

559 smtp_user: Optional[str] = Field(default=None, description="SMTP username") 

560 smtp_password: Optional[SecretStr] = Field(default=None, description="SMTP password") 

561 smtp_from_email: Optional[str] = Field(default=None, description="From email address used for auth notifications") 

562 smtp_from_name: str = Field(default="ContextForge", description="From display name used for auth notifications") 

563 smtp_use_tls: bool = Field(default=True, description="Use STARTTLS for SMTP connections") 

564 smtp_use_ssl: bool = Field(default=False, description="Use implicit SSL/TLS for SMTP connections") 

565 smtp_timeout_seconds: int = Field(default=15, description="SMTP connection timeout in seconds") 

566 

567 # Personal Teams Configuration 

568 auto_create_personal_teams: bool = Field(default=True, description="Enable automatic personal team creation for new users") 

569 personal_team_prefix: str = Field(default="", description="Personal team naming prefix") 

570 max_teams_per_user: int = Field(default=50, description="Maximum number of teams a user can belong to") 

571 max_members_per_team: int = Field(default=100, description="Maximum number of members per team") 

572 invitation_expiry_days: int = Field(default=7, description="Number of days before team invitations expire") 

573 require_email_verification_for_invites: bool = Field(default=True, description="Require email verification for team invitations") 

574 

575 # Team Governance 

576 allow_team_creation: bool = Field(default=True, description="Allow users to create organizational teams. Admins can always create teams.") 

577 allow_team_join_requests: bool = Field(default=True, description="Allow users to request to join public teams") 

578 allow_team_invitations: bool = Field(default=True, description="Allow team owners to send invitations") 

579 

580 # Default Role Configuration 

581 default_admin_role: str = Field(default="platform_admin", description="Global role assigned to admin users") 

582 default_user_role: str = Field(default="platform_viewer", description="Global role assigned to non-admin users") 

583 default_team_owner_role: str = Field(default="team_admin", description="Team-scoped role assigned to team owners (e.g. personal team creator)") 

584 default_team_member_role: str = Field(default="viewer", description="Team-scoped role assigned to team members") 

585 

586 # UI/Admin Feature Flags 

587 mcpgateway_ui_enabled: bool = False 

588 mcpgateway_admin_api_enabled: bool = False 

589 mcpgateway_ui_airgapped: bool = Field(default=False, description="Use local CDN assets instead of external CDNs for airgapped deployments") 

590 mcpgateway_ui_embedded: bool = Field(default=False, description="Enable embedded UI mode (hides select header controls by default)") 

591 mcpgateway_ui_hide_sections: Annotated[list[str], NoDecode] = Field( 

592 default_factory=list, 

593 description=( 

594 "CSV/JSON list of UI sections to hide. " 

595 "Valid values: overview, servers, gateways, tools, prompts, resources, roots, mcp-registry, " 

596 "metrics, plugins, export-import, logs, version-info, maintenance, teams, users, agents, tokens, settings" 

597 ), 

598 ) 

599 mcpgateway_ui_hide_header_items: Annotated[list[str], NoDecode] = Field( 

600 default_factory=list, 

601 description="CSV/JSON list of header items to hide. Valid values: logout, team_selector, user_identity, theme_toggle", 

602 ) 

603 mcpgateway_bulk_import_enabled: bool = True 

604 mcpgateway_bulk_import_max_tools: int = 200 

605 mcpgateway_bulk_import_rate_limit: int = 10 

606 

607 # UI Tool Test Configuration 

608 mcpgateway_ui_tool_test_timeout: int = Field(default=60000, description="Tool test timeout in milliseconds for the admin UI") 

609 

610 # Tool Execution Cancellation 

611 mcpgateway_tool_cancellation_enabled: bool = Field(default=True, description="Enable gateway-authoritative tool execution cancellation with REST API endpoints") 

612 

613 # A2A (Agent-to-Agent) Feature Flags 

614 mcpgateway_a2a_enabled: bool = True 

615 mcpgateway_a2a_max_agents: int = 100 

616 mcpgateway_a2a_default_timeout: int = 30 

617 mcpgateway_a2a_max_retries: int = 3 

618 mcpgateway_a2a_metrics_enabled: bool = True 

619 

620 # gRPC Support Configuration (EXPERIMENTAL - disabled by default) 

621 mcpgateway_grpc_enabled: bool = Field(default=False, description="Enable gRPC to MCP translation support (experimental feature)") 

622 mcpgateway_grpc_reflection_enabled: bool = Field(default=True, description="Enable gRPC server reflection by default") 

623 mcpgateway_grpc_max_message_size: int = Field(default=4194304, description="Maximum gRPC message size in bytes (4MB)") 

624 mcpgateway_grpc_timeout: int = Field(default=30, description="Default gRPC call timeout in seconds") 

625 mcpgateway_grpc_tls_enabled: bool = Field(default=False, description="Enable TLS for gRPC connections by default") 

626 

627 # Direct Proxy Configuration (disabled by default) 

628 mcpgateway_direct_proxy_enabled: bool = Field(default=False, description="Enable direct_proxy gateway mode for pass-through MCP operations") 

629 mcpgateway_direct_proxy_timeout: int = Field(default=30, description="Default timeout in seconds for direct proxy MCP operations") 

630 

631 # =================================== 

632 # Performance Monitoring Configuration 

633 # =================================== 

634 mcpgateway_performance_tracking: bool = Field(default=False, description="Enable performance tracking tab in admin UI") 

635 mcpgateway_performance_collection_interval: int = Field(default=10, ge=1, le=300, description="Metric collection interval in seconds") 

636 mcpgateway_performance_retention_hours: int = Field(default=24, ge=1, le=168, description="Snapshot retention period in hours") 

637 mcpgateway_performance_retention_days: int = Field(default=90, ge=1, le=365, description="Aggregate retention period in days") 

638 mcpgateway_performance_max_snapshots: int = Field(default=10000, ge=100, le=1000000, description="Maximum performance snapshots to retain") 

639 mcpgateway_performance_distributed: bool = Field(default=False, description="Enable distributed mode metrics aggregation via Redis") 

640 mcpgateway_performance_net_connections_enabled: bool = Field(default=True, description="Enable network connections counting (can be CPU intensive)") 

641 mcpgateway_performance_net_connections_cache_ttl: int = Field(default=15, ge=1, le=300, description="Cache TTL for net_connections in seconds") 

642 

643 # MCP Server Catalog Configuration 

644 mcpgateway_catalog_enabled: bool = Field(default=True, description="Enable MCP server catalog feature") 

645 mcpgateway_catalog_file: str = Field(default="mcp-catalog.yml", description="Path to catalog configuration file") 

646 mcpgateway_catalog_auto_health_check: bool = Field(default=True, description="Automatically health check catalog servers") 

647 mcpgateway_catalog_cache_ttl: int = Field(default=3600, description="Catalog cache TTL in seconds") 

648 mcpgateway_catalog_page_size: int = Field(default=100, description="Number of catalog servers per page") 

649 

650 # ContextForge Bootstrap Roles In DB Configuration 

651 mcpgateway_bootstrap_roles_in_db_enabled: bool = Field(default=False, description="Enable ContextForge add additional roles in db") 

652 mcpgateway_bootstrap_roles_in_db_file: str = Field(default="additional_roles_in_db.json", description="Path to add additional roles in db") 

653 

654 # Elicitation support (MCP 2025-06-18) 

655 mcpgateway_elicitation_enabled: bool = Field(default=True, description="Enable elicitation passthrough support (MCP 2025-06-18)") 

656 mcpgateway_elicitation_timeout: int = Field(default=60, description="Default timeout for elicitation requests in seconds") 

657 mcpgateway_elicitation_max_concurrent: int = Field(default=100, description="Maximum concurrent elicitation requests") 

658 

659 # Security 

660 skip_ssl_verify: bool = Field( 

661 default=False, 

662 description="Skip SSL certificate verification for ALL outbound HTTPS requests " 

663 "(federation, MCP servers, LLM providers, A2A agents). " 

664 "WARNING: Only enable in dev environments with self-signed certificates.", 

665 ) 

666 cors_enabled: bool = True 

667 

668 # Environment 

669 environment: Literal["development", "staging", "production"] = Field(default="development") 

670 

671 # Domain configuration 

672 app_domain: HttpUrl = Field(default=HttpUrl("http://localhost:4444")) 

673 

674 # Security settings 

675 secure_cookies: bool = Field(default=True) 

676 cookie_samesite: str = Field(default="lax") 

677 

678 # CORS settings 

679 cors_allow_credentials: bool = Field(default=True) 

680 

681 # Security Headers Configuration 

682 security_headers_enabled: bool = Field(default=True) 

683 x_frame_options: Optional[str] = Field(default="DENY") 

684 

@field_validator("x_frame_options")
@classmethod
def normalize_x_frame_options(cls, v: Optional[str]) -> Optional[str]:
    """Normalize the X-Frame-Options setting, mapping "disabled" spellings to None.

    A value of None disables iframe restrictions, so the textual stand-ins
    for "nothing" are folded into None here.

    Args:
        v: The X-Frame-Options value to normalize.

    Returns:
        None when v is None, blank/whitespace-only, or a case-insensitive
        'null'/'none'; otherwise v with surrounding whitespace removed.
    """
    stripped = v.strip() if v is not None else ""
    # An empty result covers both the None input and blank strings.
    disabled = not stripped or stripped.lower() in ("null", "none")
    return None if disabled else stripped

703 

704 x_content_type_options_enabled: bool = Field(default=True) 

705 x_xss_protection_enabled: bool = Field(default=True) 

706 x_download_options_enabled: bool = Field(default=True) 

707 hsts_enabled: bool = Field(default=True) 

708 hsts_max_age: int = Field(default=31536000) # 1 year 

709 hsts_include_subdomains: bool = Field(default=True) 

710 remove_server_headers: bool = Field(default=True) 

711 

712 # Response Compression Configuration 

713 compression_enabled: bool = Field(default=True, description="Enable response compression (Brotli, Zstd, GZip)") 

714 compression_minimum_size: int = Field(default=500, ge=0, description="Minimum response size in bytes to compress (0 = compress all)") 

715 compression_gzip_level: int = Field(default=6, ge=1, le=9, description="GZip compression level (1=fastest, 9=best compression)") 

716 compression_brotli_quality: int = Field(default=4, ge=0, le=11, description="Brotli compression quality (0-3=fast, 4-9=balanced, 10-11=max)") 

717 compression_zstd_level: int = Field(default=3, ge=1, le=22, description="Zstd compression level (1-3=fast, 4-9=balanced, 10+=slow)") 

718 

# allowed_origins may arrive from the environment wrapped in an extra pair of quotes;
# the "before" validator strips them so the value can be parsed as JSON or CSV.
# NoDecode tells pydantic *not* to decode this env var itself - our validator will.

721 allowed_origins: Annotated[Set[str], NoDecode] = { 

722 "http://localhost", 

723 "http://localhost:4444", 

724 } 

725 

726 # Security validation thresholds 

727 min_secret_length: int = 32 

728 min_password_length: int = 12 

729 require_strong_secrets: bool = False # Default to False for backward compatibility, will be enforced in 1.0.0 

730 

731 llmchat_enabled: bool = Field(default=True, description="Enable LLM Chat feature") 

732 mcpgateway_stdio_transport_enabled: bool = Field( 

733 default=False, 

734 description=("Enable stdio transport for MCP chat client configuration. Disabled by default; " "set true only in trusted environments that intentionally need stdio process execution."), 

735 ) 

736 toolops_enabled: bool = Field(default=False, description="Enable ToolOps feature") 

737 plugins_can_override_rbac: bool = Field( 

738 default=False, 

739 description=("Allow HTTP_AUTH_CHECK_PERMISSION plugins to short-circuit built-in RBAC grants. " "Disabled by default so plugin grant decisions are audit-only unless explicitly enabled."), 

740 ) 

741 

742 # database-backed polling settings for session message delivery 

743 poll_interval: float = Field(default=1.0, description="Initial polling interval in seconds for checking new session messages") 

744 max_interval: float = Field(default=5.0, description="Maximum polling interval in seconds when the session is idle") 

745 backoff_factor: float = Field(default=1.5, description="Multiplier used to gradually increase the polling interval during inactivity") 

746 

# Redis settings for maintaining chat sessions in a multi-worker environment

748 llmchat_session_ttl: int = Field(default=300, description="Seconds for active_session key TTL") 

749 llmchat_session_lock_ttl: int = Field(default=30, description="Seconds for lock expiry") 

750 llmchat_session_lock_retries: int = Field(default=10, description="How many times to poll while waiting") 

751 llmchat_session_lock_wait: float = Field(default=0.2, description="Seconds between polls") 

752 llmchat_chat_history_ttl: int = Field(default=3600, description="Seconds for chat history expiry") 

753 llmchat_chat_history_max_messages: int = Field(default=50, description="Maximum message history to store per user") 

754 

755 # LLM Settings (Internal API for LLM Chat) 

756 llm_api_prefix: str = Field(default="/v1", description="API prefix for internal LLM endpoints") 

757 llm_request_timeout: int = Field(default=120, description="Request timeout in seconds for LLM API calls") 

758 llm_streaming_enabled: bool = Field(default=True, description="Enable streaming responses for LLM Chat") 

759 llm_health_check_interval: int = Field(default=300, description="Provider health check interval in seconds") 

760 

@field_validator("allowed_roots", mode="before")
@classmethod
def parse_allowed_roots(cls, v):
    """Turn a raw ``allowed_roots`` value into a list of root paths.

    String input is accepted either as a JSON array or as a comma-separated
    list; anything that is not a string is passed through untouched.

    Args:
        v: The input value to parse

    Returns:
        list: Parsed list of allowed root paths
    """
    if not isinstance(v, str):
        return v

    text = v.strip()
    if not text:
        return []

    # JSON takes priority, but only a JSON *array* is accepted as-is.
    try:
        decoded = orjson.loads(text)
    except orjson.JSONDecodeError:
        decoded = None
    if isinstance(decoded, list):
        return decoded

    # Everything else is treated as comma-separated text; empty items are dropped.
    pieces = (part.strip() for part in text.split(","))
    return [part for part in pieces if part]

788 

@field_validator("jwt_secret_key", "auth_encryption_secret")
@classmethod
def validate_secrets(cls, v: Any, info: ValidationInfo) -> SecretStr:
    """
    Audit a secret key for obvious weaknesses and return it as a SecretStr.

    Applied to `jwt_secret_key` and `auth_encryption_secret`. Unless
    `client_mode` is set in the already-validated data, three advisory
    checks run, each of which only logs a warning:

    1. The value is one of the known default/weak secrets.

    2. The value is shorter than 32 characters.

    3. The value has fewer than 10 distinct characters (basic entropy check).

    Notes:
        - Nothing is rejected here; the checks never raise.
        - A value that is not already a `SecretStr` is converted with `str()` and wrapped.

    Args:
        v: The secret value to validate.
        info: Pydantic validation info object, used to get the field name.

    Returns:
        SecretStr: The original SecretStr, or the value wrapped in a new SecretStr.
    """
    field_name = info.field_name
    already_wrapped = isinstance(v, SecretStr)

    # Work on the plain text; the wrapper is restored on return.
    value = v.get_secret_value() if already_wrapped else str(v)

    if not info.data.get("client_mode"):
        known_weak = ("my-test-key", "my-test-salt", "changeme", "secret", "password")
        if value.lower() in known_weak:
            logger.warning(f"🔓 SECURITY WARNING - {field_name}: Default/weak secret detected! Please set a strong, unique value for production.")

        too_short = len(value) < 32
        if too_short:
            logger.warning(f"⚠️ SECURITY WARNING - {field_name}: Secret should be at least 32 characters long. Current length: {len(value)}")

        distinct_chars = set(value)
        if len(distinct_chars) < 10:
            logger.warning(f"🔑 SECURITY WARNING - {field_name}: Secret has low entropy. Consider using a more random value.")

    if already_wrapped:
        return v
    return SecretStr(value)

841 

@field_validator("basic_auth_password")
@classmethod
def validate_admin_password(cls, v: str | SecretStr, info: ValidationInfo) -> SecretStr:
    """Audit the Basic-auth admin password and return it as a SecretStr.

    Unless `client_mode` is set in the already-validated data, warnings are
    logged for the default password, for a password shorter than 8
    characters, and for a password mixing fewer than three character
    classes. The checks are advisory only and never raise.

    Args:
        v: The admin password value to validate.
        info: ValidationInfo containing field data.

    Returns:
        SecretStr: The validated admin password value, wrapped as SecretStr.
    """
    already_wrapped = isinstance(v, SecretStr)
    value = v.get_secret_value() if already_wrapped else v

    if not info.data.get("client_mode"):
        if value == "changeme":  # nosec B105 - checking for default value
            logger.warning("🔓 SECURITY WARNING: Default BASIC_AUTH_PASSWORD detected! Please change it if you enable API_ALLOW_BASIC_AUTH.")

        # password_min_length is not available at this point of validation,
        # so the threshold mirrors that field's default of 8.
        min_length = 8
        if len(value) < min_length:
            logger.warning(f"⚠️ SECURITY WARNING: Admin password should be at least {min_length} characters long. Current length: {len(value)}")

        # One flag per character class present in the password.
        character_classes = (
            any(map(str.isupper, value)),
            any(map(str.islower, value)),
            any(map(str.isdigit, value)),
            re.search(r'[!@#$%^&*(),.?":{}|<>]', value) is not None,
        )
        if sum(character_classes) < 3:
            logger.warning("🔐 SECURITY WARNING: Admin password has low complexity. Should contain at least 3 of: uppercase, lowercase, digits, special characters")

    if already_wrapped:
        return v
    return SecretStr(value)

882 

@field_validator("allowed_origins")
@classmethod
def validate_cors_origins(cls, v: Any, info: ValidationInfo) -> set[str] | None:
    """Audit the CORS origins and return them as a set of strings.

    Outside `client_mode`, a warning is logged for every wildcard-like origin
    ("*", "null", "") and for every other origin that does not start with
    http:// or https://. Origins are never rejected for those reasons.

    Args:
        v: The set of allowed origins to validate.
        info: ValidationInfo containing field data.

    Returns:
        set: The validated set of allowed origins, or None when v is None.

    Raises:
        ValueError: If allowed_origins is not a set or list of strings.
    """
    if v is None:
        return v
    if not isinstance(v, (set, list)):
        raise ValueError("allowed_origins must be a set or list of strings")

    if not info.data.get("client_mode"):
        wildcard_like = ("*", "null", "")
        schemes = ("http://", "https://")
        for origin in v:
            if origin in wildcard_like:
                logger.warning(f"🌐 SECURITY WARNING: Dangerous CORS origin '{origin}' detected. Consider specifying explicit origins instead of wildcards.")
            elif not origin.startswith(schemes):
                # Only origins that were not already flagged get the format warning.
                logger.warning(f"⚠️ SECURITY WARNING: Invalid origin format '{origin}'. Origins should start with http:// or https://")

    return {str(origin) for origin in v}

914 

@field_validator("database_url")
@classmethod
def validate_database_url(cls, v: str, info: ValidationInfo) -> str:
    """Validate database connection string security.

    Outside `client_mode` this logs (never raises):

    - an info message when the URL targets SQLite, and
    - a warning when the password embedded in a non-SQLite URL contains a
      well-known weak fragment ("password", "123", "admin", "test").

    The weak-password check looks at the password component of the URL only,
    so host names, database names or query options that merely contain one
    of those fragments do not trigger it.

    Args:
        v: The database URL to validate.
        info: ValidationInfo containing field data.

    Returns:
        str: The database URL, unchanged.
    """
    # Standard Library (function-scope import keeps this block self-contained)
    from urllib.parse import unquote, urlsplit

    if info.data.get("client_mode"):
        return v

    if v.startswith("sqlite"):
        logger.info("Using SQLite database. Consider PostgreSQL or MySQL for production.")
        return v

    try:
        password = urlsplit(v).password
    except ValueError:
        # Malformed URL (e.g. unbalanced IPv6 brackets); nothing to inspect here.
        password = None

    if password:
        # Credentials in URLs may be percent-encoded; compare case-insensitively.
        candidate = unquote(password).lower()
        weak_fragments = ("password", "123", "admin", "test")
        if any(fragment in candidate for fragment in weak_fragments):
            logger.warning("Potentially weak database password detected. Consider using a stronger password.")

    return v

938 

@model_validator(mode="after")
def validate_security_combinations(self) -> Self:
    """Warn about risky combinations of security settings.

    Purely advisory: the model is never modified and nothing is raised.
    All checks are skipped when `client_mode` is set.

    Returns:
        Itself.
    """
    if self.client_mode:
        return self

    # (condition, message) pairs; a message is logged when its condition holds.
    risky_combinations = (
        (
            not self.auth_required and self.mcpgateway_ui_enabled,
            "🔓 SECURITY WARNING: Admin UI is enabled without authentication. Consider setting AUTH_REQUIRED=true for production.",
        ),
        (
            self.skip_ssl_verify and not self.dev_mode,
            "🔓 SECURITY WARNING: SSL verification is disabled in non-dev mode. This is a security risk! Set SKIP_SSL_VERIFY=false for production.",
        ),
        (
            self.debug and not self.dev_mode,
            "🐛 SECURITY WARNING: Debug mode is enabled in non-dev mode. This may leak sensitive information! Set DEBUG=false for production.",
        ),
    )
    for triggered, message in risky_combinations:
        if triggered:
            logger.warning(message)

    return self

958 

def get_security_warnings(self) -> List[str]:
    """Collect human-readable security warnings for the current configuration.

    Each entry of the table below pairs a condition on the settings with the
    message reported when it holds; messages keep the table's order.

    Returns:
        List[str]: List of security warning messages.
    """
    checks = (
        # Authentication
        (not self.auth_required, "🔓 Authentication is disabled - ensure this is intentional"),
        (self.basic_auth_user == "admin", "⚠️ Using default admin username - consider changing it"),
        # SSL/TLS
        (self.skip_ssl_verify, "🔓 SSL verification is disabled - not recommended for production"),
        # Debug/Dev
        (self.debug and not self.dev_mode, "🐛 Debug mode enabled - disable in production to prevent info leakage"),
        (self.dev_mode, "🔧 Development mode enabled - not for production use"),
        # CORS
        (self.cors_enabled and "*" in self.allowed_origins, "🌐 CORS allows all origins (*) - this is a security risk"),
        # Tokens: 10080 minutes is 7 days
        (self.token_expiry > 10080, "⏱️ JWT token expiry is very long - consider shorter duration"),
        # Database
        (self.database_url.startswith("sqlite") and not self.dev_mode, "💾 SQLite database in use - consider PostgreSQL/MySQL for production"),
        # Rate limiting
        (self.tool_rate_limit > 1000, "🚦 Tool rate limit is very high - may allow abuse"),
    )
    return [message for triggered, message in checks if triggered]

1002 

class SecurityStatus(TypedDict):
    """TypedDict for comprehensive security status.

    Describes the mapping returned by ``get_security_status``.
    """

    # True when jwt_secret_key differs from the default "my-test-key"
    secure_secrets: bool
    # Mirrors auth_required
    auth_enabled: bool
    # True when skip_ssl_verify is off
    ssl_verification: bool
    # True when debug is off
    debug_disabled: bool
    # True when CORS is disabled or "*" is not among allowed_origins
    cors_restricted: bool
    # True when the admin UI is disabled or authentication is required
    ui_protected: bool
    # Messages produced by get_security_warnings()
    warnings: List[str]
    # 100 minus 10 per warning, floored at 0
    security_score: int

1014 

def get_security_status(self) -> SecurityStatus:
    """Get comprehensive security status.

    The warning list is computed once and used both for the ``warnings``
    entry and for the score, so the two always agree.

    Returns:
        SecurityStatus: Dictionary containing security status information including score and warnings.
    """
    warnings = self.get_security_warnings()

    # The key may be stored either as a SecretStr or as a plain string.
    jwt_secret = self.jwt_secret_key
    if isinstance(jwt_secret, SecretStr):
        jwt_secret = jwt_secret.get_secret_value()

    return {
        "secure_secrets": jwt_secret != "my-test-key",  # nosec B105 - checking for default value
        "auth_enabled": self.auth_required,
        "ssl_verification": not self.skip_ssl_verify,
        "debug_disabled": not self.debug,
        # With CORS disabled there is nothing to restrict, so report True.
        "cors_restricted": "*" not in self.allowed_origins if self.cors_enabled else True,
        "ui_protected": not self.mcpgateway_ui_enabled or self.auth_required,
        "warnings": warnings,
        # Security score: 100 minus 10 for each warning, never below 0
        "security_score": max(0, 100 - 10 * len(warnings)),
    }

1036 

1037 # Max retries for HTTP requests 

1038 retry_max_attempts: int = 3 

1039 retry_base_delay: float = 1.0 # seconds 

1040 retry_max_delay: int = 60 # seconds 

1041 retry_jitter_max: float = 0.5 # fraction of base delay 

1042 

1043 # HTTPX Client Configuration (for shared singleton client) 

1044 # See: https://www.python-httpx.org/advanced/#pool-limits 

1045 # Formula: max_connections = expected_concurrent_outbound_requests × 1.5 

1046 httpx_max_connections: int = Field( 

1047 default=200, 

1048 ge=10, 

1049 le=1000, 

1050 description="Maximum total concurrent HTTP connections (global, not per-host). " "Increase for high-traffic deployments with many outbound calls.", 

1051 ) 

1052 httpx_max_keepalive_connections: int = Field( 

1053 default=100, 

1054 ge=1, 

1055 le=500, 

1056 description="Maximum idle keepalive connections to retain (typically 50% of max_connections)", 

1057 ) 

1058 httpx_keepalive_expiry: float = Field( 

1059 default=30.0, 

1060 ge=5.0, 

1061 le=300.0, 

1062 description="Seconds before idle keepalive connections are closed", 

1063 ) 

1064 httpx_connect_timeout: float = Field( 

1065 default=5.0, 

1066 ge=1.0, 

1067 le=60.0, 

1068 description="Timeout in seconds for establishing new connections (5s for LAN, increase for WAN)", 

1069 ) 

1070 httpx_read_timeout: float = Field( 

1071 default=120.0, 

1072 ge=1.0, 

1073 le=600.0, 

1074 description="Timeout in seconds for reading response data (set high for slow MCP tool calls)", 

1075 ) 

1076 httpx_write_timeout: float = Field( 

1077 default=30.0, 

1078 ge=1.0, 

1079 le=600.0, 

1080 description="Timeout in seconds for writing request data", 

1081 ) 

1082 httpx_pool_timeout: float = Field( 

1083 default=10.0, 

1084 ge=1.0, 

1085 le=120.0, 

1086 description="Timeout in seconds waiting for a connection from the pool (fail fast on exhaustion)", 

1087 ) 

1088 httpx_http2_enabled: bool = Field( 

1089 default=False, 

1090 description="Enable HTTP/2 (requires h2 package; enable only if upstreams support HTTP/2)", 

1091 ) 

1092 httpx_admin_read_timeout: float = Field( 

1093 default=30.0, 

1094 ge=1.0, 

1095 le=120.0, 

1096 description="Read timeout for admin UI operations (model fetching, health checks). " "Shorter than httpx_read_timeout to fail fast on admin pages.", 

1097 ) 

1098 

@field_validator("allowed_origins", mode="before")
@classmethod
def _parse_allowed_origins(cls, v: Any) -> Set[str]:
    """Parse allowed origins from environment variable or config value.

    Handles multiple input formats for the allowed_origins field:
    - JSON array string: '["http://localhost", "http://example.com"]'
    - Comma-separated string: "http://localhost, http://example.com"
    - Already parsed set/list

    Automatically strips whitespace and removes outer quotes if present.
    A string that decodes to JSON but not to a JSON array (a number, boolean,
    null, object or string) is treated as comma-separated text rather than
    being iterated.

    Args:
        v: The input value to parse. Can be a string (JSON or CSV), set, list, or other iterable.

    Returns:
        Set[str]: A set of allowed origin strings.

    Examples:
        >>> sorted(Settings._parse_allowed_origins('["https://a.com", "https://b.com"]'))
        ['https://a.com', 'https://b.com']
        >>> sorted(Settings._parse_allowed_origins("https://x.com , https://y.com"))
        ['https://x.com', 'https://y.com']
        >>> Settings._parse_allowed_origins('""')
        set()
        >>> Settings._parse_allowed_origins('"https://single.com"')
        {'https://single.com'}
        >>> sorted(Settings._parse_allowed_origins(['http://a.com', 'http://b.com']))
        ['http://a.com', 'http://b.com']
        >>> Settings._parse_allowed_origins({'http://existing.com'})
        {'http://existing.com'}
        >>> Settings._parse_allowed_origins('8080')
        {'8080'}
    """
    if not isinstance(v, str):
        return set(v)

    text = v.strip()
    if text[:1] in "\"'" and text[-1:] == text[:1]:  # strip 1 outer quote pair
        text = text[1:-1]

    try:
        decoded = orjson.loads(text)
    except orjson.JSONDecodeError:
        decoded = None

    # Only a JSON array is a valid JSON spelling of the origin list; calling
    # set() on a decoded scalar would raise TypeError, and on an object it
    # would silently keep just the keys.
    if isinstance(decoded, list):
        return set(decoded)

    return {item.strip() for item in text.split(",") if item.strip()}

1141 

1142 # Logging 

1143 log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = Field(default="ERROR") 

1144 log_requests: bool = Field(default=False, description="Enable request payload logging with sensitive data masking") 

1145 log_format: Literal["json", "text"] = "json" # json or text 

1146 log_to_file: bool = False # Enable file logging (default: stdout/stderr only) 

1147 log_filemode: str = "a+" # append or overwrite 

1148 log_file: Optional[str] = None # Only used if log_to_file=True 

1149 log_folder: Optional[str] = None # Only used if log_to_file=True 

1150 

1151 # Log Rotation (optional - only used if log_to_file=True) 

1152 log_rotation_enabled: bool = False # Enable log file rotation 

1153 log_max_size_mb: int = 1 # Max file size in MB before rotation (default: 1MB) 

1154 log_backup_count: int = 5 # Number of backup files to keep (default: 5) 

1155 

1156 # Detailed Request Logging Configuration 

1157 log_detailed_max_body_size: int = Field( 

1158 default=16384, # 16KB - sensible default for request body logging 

1159 ge=1024, 

1160 le=1048576, # Max 1MB 

1161 description="Maximum request body size to log in detailed mode (bytes). Separate from log_max_size_mb which is for file rotation.", 

1162 ) 

1163 

1164 # Optional: endpoints to skip for detailed request logging (prefix match) 

1165 log_detailed_skip_endpoints: List[str] = Field( 

1166 default_factory=list, 

1167 description="List of path prefixes to skip when log_detailed_requests is enabled", 

1168 ) 

1169 

1170 # Whether to attempt resolving user identity via DB fallback when logging. 

1171 # Keep default False to avoid implicit DB queries during normal request handling. 

1172 log_resolve_user_identity: bool = Field( 

1173 default=False, 

1174 description="If true, RequestLoggingMiddleware will attempt DB fallback to resolve user identity when needed", 

1175 ) 

1176 

1177 # Sampling rate for detailed request logging (0.0-1.0). Applied when log_detailed_requests is enabled. 

1178 log_detailed_sample_rate: float = Field( 

1179 default=1.0, 

1180 ge=0.0, 

1181 le=1.0, 

1182 description="Fraction of requests to sample for detailed logging (0.0-1.0)", 

1183 ) 

1184 

1185 # Log Buffer (for in-memory storage in admin UI) 

1186 log_buffer_size_mb: float = 1.0 # Size of in-memory log buffer in MB 

1187 

1188 # =================================== 

1189 # Observability Configuration 

1190 # =================================== 

1191 

1192 # Enable observability features (traces, spans, metrics) 

1193 observability_enabled: bool = Field(default=False, description="Enable observability tracing and metrics collection") 

1194 

1195 # Automatic HTTP request tracing 

1196 observability_trace_http_requests: bool = Field(default=True, description="Automatically trace HTTP requests") 

1197 

1198 # Trace retention period (days) 

1199 observability_trace_retention_days: int = Field(default=7, ge=1, description="Number of days to retain trace data") 

1200 

1201 # Maximum traces to store (prevents unbounded growth) 

1202 observability_max_traces: int = Field(default=100000, ge=1000, description="Maximum number of traces to retain") 

1203 

1204 # Sample rate (0.0 to 1.0) - 1.0 means trace everything 

1205 observability_sample_rate: float = Field(default=1.0, ge=0.0, le=1.0, description="Trace sampling rate (0.0-1.0)") 

1206 

1207 # Include paths for tracing (regex patterns) 

1208 observability_include_paths: List[str] = Field( 

1209 default_factory=lambda: [ 

1210 r"^/rpc/?$", 

1211 r"^/sse$", 

1212 r"^/message$", 

1213 r"^/mcp(?:/|$)", 

1214 r"^/servers/[^/]+/mcp/?$", 

1215 r"^/servers/[^/]+/sse$", 

1216 r"^/servers/[^/]+/message$", 

1217 r"^/a2a(?:/|$)", 

1218 ], 

1219 description="Regex patterns to include for tracing (when empty, all paths are eligible before excludes)", 

1220 ) 

1221 

1222 # Exclude paths from tracing (regex patterns) 

1223 observability_exclude_paths: List[str] = Field( 

1224 default_factory=lambda: ["/health", "/healthz", "/ready", "/metrics", "/static/.*"], 

1225 description="Regex patterns to exclude from tracing (applies after include patterns)", 

1226 ) 

1227 

1228 # Enable performance metrics 

1229 observability_metrics_enabled: bool = Field(default=True, description="Enable metrics collection") 

1230 

1231 # Enable span events 

1232 observability_events_enabled: bool = Field(default=True, description="Enable event logging within spans") 

1233 

1234 # Correlation ID Settings 

1235 correlation_id_enabled: bool = Field(default=True, description="Enable automatic correlation ID tracking for requests") 

1236 correlation_id_header: str = Field(default="X-Correlation-ID", description="HTTP header name for correlation ID") 

1237 correlation_id_preserve: bool = Field(default=True, description="Preserve correlation IDs from incoming requests") 

1238 correlation_id_response_header: bool = Field(default=True, description="Include correlation ID in response headers") 

1239 

1240 # =================================== 

1241 # Database Query Logging (N+1 Detection) 

1242 # =================================== 

1243 db_query_log_enabled: bool = Field(default=False, description="Enable database query logging to file (for N+1 detection)") 

1244 db_query_log_file: str = Field(default="logs/db-queries.log", description="Path to database query log file") 

1245 db_query_log_json_file: str = Field(default="logs/db-queries.jsonl", description="Path to JSON Lines query log file") 

1246 db_query_log_format: str = Field(default="both", description="Log format: 'json', 'text', or 'both'") 

1247 db_query_log_min_queries: int = Field(default=1, ge=1, description="Only log requests with >= N queries") 

1248 db_query_log_include_params: bool = Field(default=False, description="Include query parameters (may expose sensitive data)") 

1249 db_query_log_detect_n1: bool = Field(default=True, description="Automatically detect and flag N+1 query patterns") 

1250 db_query_log_n1_threshold: int = Field(default=3, ge=2, description="Number of similar queries to flag as potential N+1") 

1251 

1252 # Structured Logging Configuration 

1253 structured_logging_enabled: bool = Field(default=True, description="Enable structured JSON logging with database persistence") 

1254 structured_logging_database_enabled: bool = Field(default=False, description="Persist structured logs to database (enables /api/logs/* endpoints, impacts performance)") 

1255 structured_logging_external_enabled: bool = Field(default=False, description="Send logs to external systems") 

1256 

1257 # Performance Tracking Configuration 

1258 performance_tracking_enabled: bool = Field(default=True, description="Enable performance tracking and metrics") 

1259 performance_threshold_database_query_ms: float = Field(default=100.0, description="Alert threshold for database queries (ms)") 

1260 performance_threshold_tool_invocation_ms: float = Field(default=2000.0, description="Alert threshold for tool invocations (ms)") 

1261 performance_threshold_resource_read_ms: float = Field(default=1000.0, description="Alert threshold for resource reads (ms)") 

1262 performance_threshold_http_request_ms: float = Field(default=500.0, description="Alert threshold for HTTP requests (ms)") 

1263 performance_degradation_multiplier: float = Field(default=1.5, description="Alert if performance degrades by this multiplier vs baseline") 

1264 

1265 # Audit Trail Configuration 

1266 # Audit trail logging is disabled by default for performance. 

1267 # When enabled, it logs all CRUD operations (create, read, update, delete) on resources. 

1268 # WARNING: This causes a database write on every API request and can cause significant load. 

1269 audit_trail_enabled: bool = Field(default=False, description="Enable audit trail logging to database for compliance") 

1270 permission_audit_enabled: bool = Field( 

1271 default=False, 

1272 description="Enable permission audit logging for RBAC checks (writes a row per permission check)", 

1273 ) 

1274 

1275 # Security Logging Configuration 

1276 # Security event logging is disabled by default for performance. 

1277 # When enabled, it logs authentication attempts, authorization failures, and security events. 

1278 # WARNING: "all" level logs every request and can cause significant database write load. 

1279 security_logging_enabled: bool = Field(default=False, description="Enable security event logging to database") 

1280 security_logging_level: Literal["all", "failures_only", "high_severity"] = Field( 

1281 default="failures_only", 

1282 description=( 

1283 "Security logging level: " 

1284 "'all' = log all events including successful auth (high DB load), " 

1285 "'failures_only' = log only authentication/authorization failures, " 

1286 "'high_severity' = log only high/critical severity events" 

1287 ), 

1288 ) 

1289 security_failed_auth_threshold: int = Field(default=5, description="Failed auth attempts before high severity alert") 

1290 security_threat_score_alert: float = Field(default=0.7, description="Threat score threshold for alerts (0.0-1.0)") 

1291 security_rate_limit_window_minutes: int = Field(default=5, description="Time window for rate limit checks (minutes)") 

1292 

1293 # API Token Tracking Configuration 

1294 # Controls how token usage and last_used timestamps are tracked 

1295 token_usage_logging_enabled: bool = Field(default=True, description="Enable API token usage logging middleware") 

1296 token_last_used_update_interval_minutes: int = Field(default=5, ge=1, le=1440, description="Minimum minutes between last_used timestamp updates (rate-limits DB writes)") 

1297 

1298 # Metrics Aggregation Configuration 

1299 metrics_aggregation_enabled: bool = Field(default=True, description="Enable automatic log aggregation into performance metrics") 

1300 metrics_aggregation_backfill_hours: int = Field(default=6, ge=0, le=168, description="Hours of structured logs to backfill into performance metrics on startup") 

1301 metrics_aggregation_window_minutes: int = Field(default=5, description="Time window for metrics aggregation (minutes)") 

1302 metrics_aggregation_auto_start: bool = Field(default=False, description="Automatically run the log aggregation loop on application startup") 

1303 yield_batch_size: int = Field( 

1304 default=1000, 

1305 ge=100, 

1306 le=100000, 

1307 description="Number of rows fetched per batch when streaming hourly metric data from the database. " 

1308 "Used to limit memory usage during aggregation and percentile calculations. " 

1309 "Smaller values reduce memory footprint but increase DB round-trips; larger values improve throughput " 

1310 "at the cost of higher memory usage.", 

1311 ) 

1312 

1313 # Execution Metrics Recording 

1314 # Controls whether tool/resource/prompt/server/A2A execution metrics are written to the database. 

1315 # Disable if using external observability (ELK, Datadog, Splunk). 

1316 # Note: Does NOT affect log aggregation (METRICS_AGGREGATION_ENABLED) or Prometheus (ENABLE_METRICS). 

1317 db_metrics_recording_enabled: bool = Field( 

1318 default=True, description="Enable recording of execution metrics (tool/resource/prompt/server/A2A) to database. Disable if using external observability." 

1319 ) 

1320 

1321 # Metrics Buffer Configuration (for batching tool/resource/prompt metrics writes) 

1322 metrics_buffer_enabled: bool = Field(default=True, description="Enable buffered metrics writes (reduces DB pressure under load)") 

1323 metrics_buffer_flush_interval: int = Field(default=60, ge=5, le=300, description="Seconds between automatic metrics buffer flushes") 

1324 metrics_buffer_max_size: int = Field(default=1000, ge=100, le=10000, description="Maximum buffered metrics before forced flush") 

1325 

1326 # Metrics Cache Configuration (for caching aggregate metrics queries) 

1327 metrics_cache_enabled: bool = Field(default=True, description="Enable in-memory caching for aggregate metrics queries") 

1328 metrics_cache_ttl_seconds: int = Field(default=60, ge=1, le=300, description="TTL for cached aggregate metrics in seconds") 

1329 

1330 # Metrics Cleanup Configuration (automatic deletion of old metrics) 

1331 metrics_cleanup_enabled: bool = Field(default=True, description="Enable automatic cleanup of old metrics data") 

1332 metrics_retention_days: int = Field(default=7, ge=1, le=365, description="Days to retain raw metrics before cleanup (fallback when rollup disabled)") 

1333 metrics_cleanup_interval_hours: int = Field(default=1, ge=1, le=168, description="Hours between automatic cleanup runs") 

1334 metrics_cleanup_batch_size: int = Field(default=10000, ge=100, le=100000, description="Batch size for metrics deletion (prevents long locks)") 

1335 

1336 # Metrics Rollup Configuration (hourly aggregation for historical queries) 

1337 metrics_rollup_enabled: bool = Field(default=True, description="Enable hourly metrics rollup for efficient historical queries") 

1338 metrics_rollup_interval_hours: int = Field(default=1, ge=1, le=24, description="Hours between rollup runs") 

1339 metrics_rollup_retention_days: int = Field(default=365, ge=30, le=3650, description="Days to retain hourly rollup data") 

1340 metrics_rollup_late_data_hours: int = Field( 

1341 default=1, ge=1, le=48, description="Hours to re-process on each run to catch late-arriving data (smaller = less CPU, larger = more tolerance for delayed metrics)" 

1342 ) 

1343 metrics_delete_raw_after_rollup: bool = Field(default=True, description="Delete raw metrics after hourly rollup exists (recommended for production)") 

1344 metrics_delete_raw_after_rollup_hours: int = Field(default=1, ge=1, le=8760, description="Hours to retain raw metrics when hourly rollup exists") 

1345 

1346 # Auth Cache Configuration (reduces DB queries during authentication) 

1347 auth_cache_enabled: bool = Field(default=True, description="Enable Redis/in-memory caching for authentication data (user, team, revocation)") 

1348 auth_cache_user_ttl: int = Field(default=60, ge=10, le=300, description="TTL in seconds for cached user data") 

1349 auth_cache_revocation_ttl: int = Field(default=30, ge=5, le=120, description="TTL in seconds for token revocation cache (security-critical, keep short)") 

1350 auth_cache_team_ttl: int = Field(default=60, ge=10, le=300, description="TTL in seconds for team membership cache") 

1351 auth_cache_role_ttl: int = Field(default=60, ge=10, le=300, description="TTL in seconds for user role in team cache") 

1352 auth_cache_teams_enabled: bool = Field(default=True, description="Enable caching for get_user_teams() (default: true)") 

1353 auth_cache_teams_ttl: int = Field(default=60, ge=10, le=300, description="TTL in seconds for user teams list cache") 

1354 auth_cache_batch_queries: bool = Field(default=True, description="Batch auth DB queries into single call (reduces 3 queries to 1)") 

1355 

1356 # Registry Cache Configuration (reduces DB queries for list endpoints) 

1357 registry_cache_enabled: bool = Field(default=True, description="Enable caching for registry list endpoints (tools, prompts, resources, etc.)") 

1358 registry_cache_tools_ttl: int = Field(default=20, ge=5, le=300, description="TTL in seconds for tools list cache") 

1359 registry_cache_prompts_ttl: int = Field(default=15, ge=5, le=300, description="TTL in seconds for prompts list cache") 

1360 registry_cache_resources_ttl: int = Field(default=15, ge=5, le=300, description="TTL in seconds for resources list cache") 

1361 registry_cache_agents_ttl: int = Field(default=20, ge=5, le=300, description="TTL in seconds for agents list cache") 

1362 registry_cache_servers_ttl: int = Field(default=20, ge=5, le=300, description="TTL in seconds for servers list cache") 

1363 registry_cache_gateways_ttl: int = Field(default=20, ge=5, le=300, description="TTL in seconds for gateways list cache") 

1364 registry_cache_catalog_ttl: int = Field(default=300, ge=60, le=600, description="TTL in seconds for catalog servers list cache (external catalog, changes infrequently)") 

1365 

1366 # Tool Lookup Cache Configuration (reduces hot-path DB lookups in invoke_tool) 

1367 tool_lookup_cache_enabled: bool = Field(default=True, description="Enable tool lookup cache (tool name -> tool config)") 

1368 tool_lookup_cache_ttl_seconds: int = Field(default=60, ge=5, le=600, description="TTL in seconds for tool lookup cache entries") 

1369 tool_lookup_cache_negative_ttl_seconds: int = Field(default=10, ge=1, le=60, description="TTL in seconds for negative tool lookup cache entries") 

1370 tool_lookup_cache_l1_maxsize: int = Field(default=10000, ge=100, le=1000000, description="Max entries for in-memory tool lookup cache (L1)") 

1371 tool_lookup_cache_l2_enabled: bool = Field(default=True, description="Enable Redis-backed tool lookup cache (L2) when cache_type=redis") 

1372 

1373 # Admin Stats Cache Configuration (reduces dashboard query overhead) 

1374 admin_stats_cache_enabled: bool = Field(default=True, description="Enable caching for admin dashboard statistics") 

1375 admin_stats_cache_system_ttl: int = Field(default=60, ge=10, le=300, description="TTL in seconds for system stats cache") 

1376 admin_stats_cache_observability_ttl: int = Field(default=30, ge=10, le=120, description="TTL in seconds for observability stats cache") 

1377 admin_stats_cache_tags_ttl: int = Field(default=120, ge=30, le=600, description="TTL in seconds for tags listing cache") 

1378 admin_stats_cache_plugins_ttl: int = Field(default=120, ge=30, le=600, description="TTL in seconds for plugin stats cache") 

1379 admin_stats_cache_performance_ttl: int = Field(default=60, ge=15, le=300, description="TTL in seconds for performance aggregates cache") 

1380 

1381 # Team Member Count Cache Configuration (reduces N+1 queries in admin UI) 

1382 team_member_count_cache_enabled: bool = Field(default=True, description="Enable Redis caching for team member counts") 

1383 team_member_count_cache_ttl: int = Field(default=300, ge=30, le=3600, description="TTL in seconds for team member count cache (default: 5 minutes)") 

1384 

1385 # Log Search Configuration 

1386 log_search_max_results: int = Field(default=1000, description="Maximum results per log search query") 

1387 log_retention_days: int = Field(default=30, description="Number of days to retain logs in database") 

1388 

1389 # External Log Integration Configuration 

1390 elasticsearch_enabled: bool = Field(default=False, description="Send logs to Elasticsearch") 

1391 elasticsearch_url: Optional[str] = Field(default=None, description="Elasticsearch cluster URL") 

1392 elasticsearch_index_prefix: str = Field(default="mcpgateway-logs", description="Elasticsearch index prefix") 

1393 syslog_enabled: bool = Field(default=False, description="Send logs to syslog") 

1394 syslog_host: Optional[str] = Field(default=None, description="Syslog server host") 

1395 syslog_port: int = Field(default=514, description="Syslog server port") 

1396 webhook_logging_enabled: bool = Field(default=False, description="Send logs to webhook endpoints") 

1397 webhook_logging_urls: List[str] = Field(default_factory=list, description="Webhook URLs for log delivery") 

1398 

@field_validator("log_level", mode="before")
@classmethod
def validate_log_level(cls, v: str) -> str:
    """
    Normalize and validate the log level value.

    The validator runs in "before" mode, so it receives the raw configured
    value. Surrounding whitespace is ignored and matching is
    case-insensitive, so "debug", " Debug ", etc. are all accepted and
    returned as "DEBUG".

    Args:
        v (str): The log level string provided via configuration or environment.

    Returns:
        str: The validated and normalized (uppercase) log level.

    Raises:
        ValueError: If the provided value is not a string, or is not one of
            {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}.
    """
    allowed = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}
    # Raw input is not guaranteed to be a string. Raise ValueError so the
    # failure is reported as a validation error rather than surfacing as an
    # AttributeError from calling .upper() on a non-string.
    if not isinstance(v, str):
        raise ValueError(f"Invalid log_level: {v!r}")
    v_up = v.strip().upper()
    if v_up not in allowed:
        raise ValueError(f"Invalid log_level: {v}")
    return v_up

1424 

1425 # Transport: endpoint toggles plus WebSocket/SSE tuning values 

1426 mcpgateway_ws_relay_enabled: bool = Field(default=False, description="Enable WebSocket JSON-RPC relay endpoint at /ws") 

1427 mcpgateway_reverse_proxy_enabled: bool = Field(default=False, description="Enable reverse-proxy transport endpoints under /reverse-proxy/*") 

1428 transport_type: str = "all" # expected values: http, ws, sse, all (declared as a plain str, so not constrained by the type here) 

1429 websocket_ping_interval: int = 30 # seconds 

1430 sse_retry_timeout: int = 5000 # milliseconds - client retry interval on disconnect 

1431 sse_keepalive_enabled: bool = True # Enable SSE keepalive events 

1432 sse_keepalive_interval: int = 30 # seconds between keepalive events 

1433 sse_send_timeout: float = 30.0 # seconds - timeout for ASGI send() calls, protects against hung connections 

1434 sse_rapid_yield_window_ms: int = 1000 # milliseconds - time window for rapid yield detection 

1435 sse_rapid_yield_max: int = 50 # max yields per window before assuming client disconnected (0=disabled) 

1436 

1437 # Gateway/Server Connection Timeout 

1438 # Timeout in seconds for HTTP requests to registered gateways and MCP servers. 

1439 # Used by: GatewayService, ToolService, ServerService for health checks and tool invocations. 

1440 # Note: Previously part of federation settings, retained for gateway connectivity (hence the "federation_" name). 

1441 federation_timeout: int = 120 

1442 

1443 # SSO 

1444 # Issuer URLs for SSO; raw input (JSON array string, comma-separated string, or list) is normalized by the "before" validator declared right after this field 

1445 sso_issuers: Optional[list[HttpUrl]] = Field(default=None) 

1446 

@field_validator("sso_issuers", mode="before")
@classmethod
def parse_issuers(cls, v: Any) -> list[str]:
    """
    Parse and validate the SSO issuers configuration value.

    Accepts:
    - JSON array string: '["https://idp1.com", "https://idp2.com"]'
    - Comma-separated string: "https://idp1.com, https://idp2.com"
    - Either form still wrapped in literal quote characters (for example
      when an env file passes the quoting through verbatim)
    - Empty string or None → []
    - Already-parsed list

    Args:
        v: The input value to parse.

    Returns:
        list[str]: Parsed list of issuer URLs.

    Raises:
        ValueError: If the input is not a valid format.
    """
    if v is None:
        return []
    if isinstance(v, list):
        return v
    if not isinstance(v, str):
        raise ValueError("Invalid type for SSO_ISSUERS")

    quote_chars = "\"'"
    s = v.strip()

    # Unwrap a JSON array that arrived inside one pair of matching quotes so
    # it is parsed as JSON instead of falling through to comma splitting.
    if len(s) >= 2 and s[0] in quote_chars and s[-1] == s[0]:
        inner = s[1:-1].strip()
        if inner.startswith("["):
            s = inner

    if not s:
        return []

    if s.startswith("["):
        try:
            parsed = orjson.loads(s)
        except orjson.JSONDecodeError as exc:
            # Keep the decoder error as the cause for easier debugging.
            raise ValueError(f"Invalid JSON for SSO_ISSUERS: {v!r}") from exc
        return parsed if isinstance(parsed, list) else []

    # Fallback to comma-separated parsing; stray quote characters around an
    # item are dropped because they can never be part of a valid issuer URL.
    issuers = []
    for item in s.split(","):
        cleaned = item.strip().strip(quote_chars).strip()
        if cleaned:
            issuers.append(cleaned)
    return issuers

1485 

1486 # Resources 

1487 resource_cache_size: int = 1000 

1488 resource_cache_ttl: int = 3600 # seconds 

1489 max_resource_size: int = 10 * 1024 * 1024 # 10MB 

1490 allowed_mime_types: Set[str] = { 

1491 "text/plain", 

1492 "text/markdown", 

1493 "text/html", 

1494 "application/json", 

1495 "application/xml", 

1496 "image/png", 

1497 "image/jpeg", 

1498 "image/gif", 

1499 } 

1500 

1501 # Tools 

1502 tool_timeout: int = 60 # seconds 

1503 max_tool_retries: int = 3 

1504 tool_rate_limit: int = 100 # requests per minute 

1505 tool_concurrent_limit: int = 10 

1506 

1507 # MCP Session Pool - reduces per-request latency from ~20ms to ~1-2ms 

1508 # Disabled by default for safety. Enable explicitly in production after testing. 

1509 mcp_session_pool_enabled: bool = False 

1510 mcp_session_pool_max_per_key: int = 10 # Max sessions per (URL, identity, transport) 

1511 mcp_session_pool_ttl: float = 300.0 # Session TTL in seconds 

1512 mcp_session_pool_health_check_interval: float = 60.0 # Idle time before health check (aligned with health_check_interval) 

1513 mcp_session_pool_acquire_timeout: float = 30.0 # Timeout waiting for session slot 

1514 mcp_session_pool_create_timeout: float = 30.0 # Timeout creating new session 

1515 mcp_session_pool_circuit_breaker_threshold: int = 5 # Failures before circuit opens 

1516 mcp_session_pool_circuit_breaker_reset: float = 60.0 # Seconds before circuit resets 

1517 mcp_session_pool_idle_eviction: float = 600.0 # Evict idle pool keys after this time 

1518 # Transport timeout for pooled sessions (default 30s to match MCP SDK default). 

1519 # This timeout applies to all HTTP operations (connect, read, write) on pooled sessions. 

1520 # Use a higher value for deployments with long-running tool calls. 

1521 mcp_session_pool_transport_timeout: float = 30.0 

1522 # Force explicit RPC (list_tools) on gateway health checks even when session is fresh. 

1523 # Off by default: pool's internal staleness check (idle > health_check_interval) handles this. 

1524 # Enable for stricter health verification at the cost of ~5ms latency per check. 

1525 mcp_session_pool_explicit_health_rpc: bool = False 

1526 # Configurable health check chain - ordered list of methods to try. 

1527 # Options: ping, list_tools, list_prompts, list_resources, skip 

1528 # Default: ping,skip - try lightweight ping, skip if unsupported (for legacy servers) 

1529 mcp_session_pool_health_check_methods: List[str] = ["ping", "skip"] 

1530 # Timeout in seconds for each health check attempt 

1531 mcp_session_pool_health_check_timeout: float = 5.0 

1532 mcp_session_pool_identity_headers: List[str] = ["authorization", "x-tenant-id", "x-user-id", "x-api-key", "cookie", "x-mcp-session-id"] 

1533 # Timeout for session/transport cleanup operations (__aexit__ calls). 

1534 # This prevents CPU spin loops when internal tasks (like post_writer waiting on 

1535 # memory streams) don't respond to cancellation. Does NOT affect tool execution 

1536 # time - only cleanup of idle/released sessions. Increase if you see frequent 

1537 # "cleanup timed out" warnings; decrease for faster recovery from spin loops. 

1538 mcp_session_pool_cleanup_timeout: float = 5.0 

1539 

1540 # Timeout for SSE task group cleanup (seconds). 

1541 # When an SSE connection is cancelled, this controls how long to wait for 

1542 # internal tasks to respond before forcing cleanup. Shorter values reduce 

1543 # CPU waste during anyio _deliver_cancellation spin loops but may interrupt 

1544 # legitimate cleanup. Only affects cancelled connections, not normal operation. 

1545 # See: https://github.com/agronholm/anyio/issues/695 

1546 sse_task_group_cleanup_timeout: float = 5.0 

1547 

1548 # ========================================================================= 

1549 # EXPERIMENTAL: anyio _deliver_cancellation spin loop workaround 

1550 # ========================================================================= 

1551 # When enabled, monkey-patches anyio's CancelScope._deliver_cancellation to 

1552 # limit the number of retry iterations. This prevents 100% CPU spin loops 

1553 # when tasks don't respond to CancelledError (anyio issue #695). 

1554 # 

1555 # WARNING: This is a workaround for an upstream issue. May be removed when 

1556 # anyio or MCP SDK fix the underlying problem. Enable only if you experience 

1557 # CPU spin loops during SSE/MCP connection cleanup. 

1558 # 

1559 # Trade-offs when enabled: 

1560 # - Prevents indefinite CPU spin (good) 

1561 # - May leave some tasks uncancelled after max iterations (usually harmless) 

1562 # - Worker recycling (GUNICORN_MAX_REQUESTS) cleans up orphaned tasks 

1563 # 

1564 # See: https://github.com/agronholm/anyio/issues/695 

1565 # Env: ANYIO_CANCEL_DELIVERY_PATCH_ENABLED 

1566 anyio_cancel_delivery_patch_enabled: bool = False 

1567 

1568 # Maximum iterations for _deliver_cancellation before giving up. 

1569 # Only used when anyio_cancel_delivery_patch_enabled=True. 

1570 # Higher values = more attempts to cancel tasks, but longer potential spin. 

1571 # Lower values = faster recovery, but more orphaned tasks. 

1572 # Env: ANYIO_CANCEL_DELIVERY_MAX_ITERATIONS 

1573 anyio_cancel_delivery_max_iterations: int = 100 

1574 

1575 # Session Affinity 

1576 mcpgateway_session_affinity_enabled: bool = False # Global session affinity toggle 

1577 mcpgateway_session_affinity_ttl: int = 300 # Session affinity binding TTL 

1578 mcpgateway_session_affinity_max_sessions: int = 1 # Max sessions per identity for affinity 

1579 mcpgateway_pool_rpc_forward_timeout: int = 30 # Timeout for forwarding RPC requests to owner worker 

1580 

1581 # Prompts 

1582 prompt_cache_size: int = 100 

1583 max_prompt_size: int = 100 * 1024 # 100KB 

1584 prompt_render_timeout: int = 10 # seconds 

1585 

1586 # Health Checks 

1587 # Interval in seconds between health checks (aligned with mcp_session_pool_health_check_interval) 

1588 health_check_interval: int = 60 

1589 # Timeout in seconds for each health check request 

1590 health_check_timeout: int = 5 

1591 # Per-check timeout (seconds) to bound total time of one gateway health check 

1592 # Env: GATEWAY_HEALTH_CHECK_TIMEOUT 

1593 gateway_health_check_timeout: float = 5.0 

1594 # Consecutive failures before marking gateway offline 

1595 unhealthy_threshold: int = 3 

1596 # Max concurrent health checks per worker 

1597 max_concurrent_health_checks: int = 10 

1598 

1599 # Auto-refresh tools/resources/prompts from gateways during health checks 

1600 # When enabled, tools/resources/prompts are fetched and synced with DB during health checks 

1601 auto_refresh_servers: bool = Field(default=False, description="Enable automatic tool/resource/prompt refresh during gateway health checks") 

1602 

1603 # Per-gateway refresh configuration (used when auto_refresh_servers is True) 

1604 # Gateways can override this with their own refresh_interval_seconds 

1605 gateway_auto_refresh_interval: int = Field(default=300, ge=60, description="Default refresh interval in seconds for gateway tools/resources/prompts sync (minimum 60 seconds)") 

1606 

1607 # Validation Gateway URL 

1608 gateway_validation_timeout: int = 5 # seconds 

1609 gateway_max_redirects: int = 5 

1610 

1611 filelock_name: str = "gateway_service_leader.lock" 

1612 

1613 # Default Roots 

1614 default_roots: List[str] = [] 

1615 

1616 # Database 

1617 db_driver: str = "mariadb+mariadbconnector" 

1618 db_pool_size: int = 200 

1619 db_max_overflow: int = 10 

1620 db_pool_timeout: int = 30 

1621 db_pool_recycle: int = 3600 

1622 db_max_retries: int = 30 # Max attempts with exponential backoff (≈5 min total) 

1623 db_retry_interval_ms: int = 2000 # Base interval; doubles each attempt, ±25% jitter 

1624 db_max_backoff_seconds: int = 30 # Cap for exponential backoff (jitter applied after cap) 

1625 

1626 # Database Performance Optimization 

1627 use_postgresdb_percentiles: bool = Field( 

1628 default=True, 

1629 description="Use database-native percentile functions (percentile_cont) for performance metrics. " 

1630 "When enabled, PostgreSQL uses native SQL percentile calculations (5-10x faster). " 

1631 "When disabled or using SQLite, falls back to Python-based percentile calculations. " 

1632 "Recommended: true for PostgreSQL, auto-detected for SQLite.", 

1633 ) 

1634 

1635 # psycopg3-specific: Number of times a query must be executed before it's 

1636 # prepared server-side. Set to 0 to disable, 1 to prepare immediately. 

1637 # Default of 5 balances memory usage with query performance. 

1638 db_prepare_threshold: int = Field(default=5, ge=0, le=100, description="psycopg3 prepare_threshold for auto-prepared statements") 

1639 

1640 # Connection pool class: "auto" (default), "null", or "queue" 

1641 # - "auto": Uses NullPool when PgBouncer detected, QueuePool otherwise 

1642 # - "null": Always use NullPool (recommended with PgBouncer - lets PgBouncer handle pooling) 

1643 # - "queue": Always use QueuePool (application-side pooling) 

1644 db_pool_class: Literal["auto", "null", "queue"] = Field( 

1645 default="auto", 

1646 description="Connection pool class: auto (NullPool with PgBouncer), null, or queue", 

1647 ) 

1648 

1649 # Pre-ping connections before checkout (validates connection is alive) 

1650 # - "auto": Enabled for non-PgBouncer, disabled for PgBouncer (default) 

1651 # - "true": Always enable (adds SELECT 1 overhead but catches stale connections) 

1652 # - "false": Always disable 

1653 db_pool_pre_ping: Literal["auto", "true", "false"] = Field( 

1654 default="auto", 

1655 description="Pre-ping connections: auto, true, or false", 

1656 ) 

1657 

1658 # SQLite busy timeout: Maximum time (ms) SQLite will wait to acquire a database lock before returning SQLITE_BUSY. 

1659 db_sqlite_busy_timeout: int = Field(default=5000, ge=1000, le=60000, description="SQLite busy timeout in milliseconds (default: 5000ms)") 

1660 

1661 # Cache 

1662 cache_type: Literal["redis", "memory", "none", "database"] = "database" # one of: redis, memory, none, database (default: database) 

1663 redis_url: Optional[str] = "redis://localhost:6379/0" 

1664 cache_prefix: str = "mcpgw:" # presumably prepended to cache keys - verify against cache users 

1665 session_ttl: int = 3600 # presumably seconds - TODO confirm 

1666 message_ttl: int = 600 # presumably seconds - TODO confirm 

1667 redis_max_retries: int = 30 # Max attempts with exponential backoff (≈5 min total) 

1668 redis_retry_interval_ms: int = 2000 # Base interval; doubles each attempt, ±25% jitter 

1669 redis_max_backoff_seconds: int = 30 # Cap for exponential backoff (jitter applied after cap) 

1670 

1671 # GlobalConfig In-Memory Cache (Issue #1715) 

1672 # Caches GlobalConfig (passthrough headers) to eliminate redundant DB queries 

1673 global_config_cache_ttl: int = Field( 

1674 default=60, 

1675 ge=5, 

1676 le=3600, 

1677 description="TTL in seconds for GlobalConfig in-memory cache (default: 60)", 

1678 ) 

1679 

1680 # A2A Stats In-Memory Cache 

1681 # Caches A2A agent counts (total, active) to eliminate redundant COUNT queries 

1682 a2a_stats_cache_ttl: int = Field( 

1683 default=30, 

1684 ge=5, 

1685 le=3600, 

1686 description="TTL in seconds for A2A stats in-memory cache (default: 30)", 

1687 ) 

1688 

1689 # Redis Parser Configuration (ADR-026) 

1690 # hiredis C parser provides up to 83x faster response parsing for large responses 

1691 redis_parser: Literal["auto", "hiredis", "python"] = Field( 

1692 default="auto", 

1693 description="Redis protocol parser: auto (use hiredis if available), hiredis (require hiredis), python (pure-Python)", 

1694 ) 

1695 

1696 # Redis Connection Pool - Performance Optimized 

1697 redis_decode_responses: bool = Field(default=True, description="Return strings instead of bytes") 

1698 redis_max_connections: int = Field(default=50, description="Connection pool size per worker") 

1699 redis_socket_timeout: float = Field(default=2.0, description="Socket read/write timeout in seconds") 

1700 redis_socket_connect_timeout: float = Field(default=2.0, description="Connection timeout in seconds") 

1701 redis_retry_on_timeout: bool = Field(default=True, description="Retry commands on timeout") 

1702 redis_health_check_interval: int = Field(default=30, description="Seconds between connection health checks (0=disabled)") 

1703 

1704 # Redis Leader Election - Multi-Node Deployments 

1705 redis_leader_ttl: int = Field(default=15, description="Leader election TTL in seconds") 

1706 redis_leader_key: str = Field(default="gateway_service_leader", description="Leader key name") 

1707 redis_leader_heartbeat_interval: int = Field(default=5, description="Seconds between leader heartbeats") 

1708 

1709 # streamable http transport 

1710 use_stateful_sessions: bool = False # Set to False to use stateless sessions without event store 

1711 json_response_enabled: bool = True # Enable JSON responses instead of SSE streams 

1712 streamable_http_max_events_per_stream: int = 100 # Ring buffer capacity per stream 

1713 streamable_http_event_ttl: int = 3600 # Event stream TTL in seconds (1 hour) 

1714 

1715 # Development 

1716 dev_mode: bool = False 

1717 reload: bool = False 

1718 debug: bool = False 

1719 

1720 # Observability (OpenTelemetry) 

1721 otel_enable_observability: bool = Field(default=False, description="Enable OpenTelemetry observability") 

1722 otel_traces_exporter: str = Field(default="otlp", description="Traces exporter: otlp, jaeger, zipkin, console, none") 

1723 otel_exporter_otlp_endpoint: Optional[str] = Field(default=None, description="OTLP endpoint (e.g., http://localhost:4317)") 

1724 otel_exporter_otlp_protocol: str = Field(default="grpc", description="OTLP protocol: grpc or http") 

1725 otel_exporter_otlp_insecure: bool = Field(default=True, description="Use insecure connection for OTLP") 

1726 otel_exporter_otlp_headers: Optional[str] = Field(default=None, description="OTLP headers (comma-separated key=value)") 

1727 otel_exporter_jaeger_endpoint: Optional[str] = Field(default=None, description="Jaeger endpoint") 

1728 otel_exporter_zipkin_endpoint: Optional[str] = Field(default=None, description="Zipkin endpoint") 

1729 otel_service_name: str = Field(default="mcp-gateway", description="Service name for traces") 

1730 otel_resource_attributes: Optional[str] = Field(default=None, description="Resource attributes (comma-separated key=value)") 

1731 otel_bsp_max_queue_size: int = Field(default=2048, description="Max queue size for batch span processor") 

1732 otel_bsp_max_export_batch_size: int = Field(default=512, description="Max export batch size") 

1733 otel_bsp_schedule_delay: int = Field(default=5000, description="Schedule delay in milliseconds") 

1734 

1735 # =================================== 

1736 # Well-Known URI Configuration 

1737 # =================================== 

1738 

1739 # Enable well-known URI endpoints 

1740 well_known_enabled: bool = True 

1741 

1742 # robots.txt content (default: disallow all crawling for private API) 

1743 well_known_robots_txt: str = """User-agent: * 

1744Disallow: / 

1745 

1746# ContextForge is a private API gateway 

1747# Public crawling is disabled by default""" 

1748 

1749 # security.txt content (optional, user-defined) 

1750 # Example: "Contact: security@example.com\nExpires: 2025-12-31T23:59:59Z\nPreferred-Languages: en" 

1751 well_known_security_txt: str = "" 

1752 

1753 # Enable security.txt only if content is provided 

1754 well_known_security_txt_enabled: bool = False 

1755 

1756 # Additional custom well-known files (JSON format) 

1757 # Example: {"ai.txt": "This service uses AI for...", "dnt-policy.txt": "Do Not Track policy..."} 

1758 well_known_custom_files: str = "{}" 

1759 

1760 # Cache control for well-known files (seconds) 

1761 well_known_cache_max_age: int = 3600 # 1 hour default 

1762 

1763 # =================================== 

1764 # Performance / Startup Tuning 

1765 # =================================== 

1766 

1767 slug_refresh_batch_size: int = Field(default=1000, description="Batch size for gateway/tool slug refresh at startup") 

1768 model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", case_sensitive=False, extra="ignore") 

1769 

1770 gateway_tool_name_separator: str = "-" 

1771 valid_slug_separator_regexp: ClassVar[str] = r"^(-{1,2}|[_.])$" 

1772 

@field_validator("gateway_tool_name_separator")
@classmethod
def must_be_allowed_sep(cls, v: str) -> str:
    """Coerce the gateway/tool name separator to a permitted value.

    Args:
        v: Candidate separator string.

    Returns:
        ``v`` unchanged when it fully matches ``valid_slug_separator_regexp``;
        otherwise '-' (a warning is logged in that case).

    Examples:
        >>> Settings.must_be_allowed_sep('-')
        '-'
        >>> Settings.must_be_allowed_sep('--')
        '--'
        >>> Settings.must_be_allowed_sep('_')
        '_'
        >>> Settings.must_be_allowed_sep('.')
        '.'
        >>> Settings.must_be_allowed_sep('invalid')
        '-'
    """
    fallback = "-"
    if re.fullmatch(cls.valid_slug_separator_regexp, v):
        return v

    # An invalid separator is tolerated: warn and fall back instead of failing validation.
    logger.warning(
        f"Invalid gateway_tool_name_separator '{v}'. Must be '-', '--', '_' or '.'. Defaulting to '-'.",
        stacklevel=2,
    )
    return fallback

1803 

@property
def custom_well_known_files(self) -> Dict[str, str]:
    """Parse the ``well_known_custom_files`` JSON string into a mapping.

    Returns:
        Dict[str, str]: Parsed custom well-known files mapping filename to content.
        An empty dict is returned when the setting is empty, is not valid JSON,
        or decodes to something other than a JSON object.
    """
    raw = self.well_known_custom_files
    if not raw:
        return {}

    try:
        parsed = orjson.loads(raw)
    except orjson.JSONDecodeError:
        logger.error(f"Invalid JSON in WELL_KNOWN_CUSTOM_FILES: {self.well_known_custom_files}")
        return {}

    # Valid JSON that is not an object (e.g. "[]" or "1") would violate the declared
    # return type and break callers that treat the result as a mapping.
    if not isinstance(parsed, dict):
        logger.error("WELL_KNOWN_CUSTOM_FILES must be a JSON object, got %s", type(parsed).__name__)
        return {}
    return parsed

1816 

@field_validator("well_known_security_txt_enabled", mode="after")
@classmethod
def _auto_enable_security_txt(cls, v: Any, info: ValidationInfo) -> bool:
    """Derive the security.txt enabled flag from the configured content.

    Args:
        v: The supplied value of well_known_security_txt_enabled.
        info: ValidationInfo containing field data.

    Returns:
        bool: When ``well_known_security_txt`` is present in ``info.data``, True if it
        contains non-whitespace text and False otherwise (``v`` is not consulted in
        that case). When it is absent, ``bool(v)``.
    """
    validated = info.data
    if not validated or "well_known_security_txt" not in validated:
        return bool(v)

    content = validated["well_known_security_txt"]
    return bool(content.strip())

1832 

1833 # ------------------------------- 

1834 # Flexible list parsing for envs 

1835 # ------------------------------- 

@field_validator(
    "sso_entra_admin_groups",
    "sso_trusted_domains",
    "sso_auto_admin_domains",
    "sso_github_admin_orgs",
    "sso_google_admin_domains",
    "insecure_queryparam_auth_allowed_hosts",
    "mcpgateway_ui_hide_sections",
    "mcpgateway_ui_hide_header_items",
    mode="before",
)
@classmethod
def _parse_list_from_env(cls, v: None | str | list[str]) -> list[str]:
    """Normalize a raw list-field value into a list of strings.

    Strings may be JSON arrays (e.g. '["a","b"]') or comma-separated values
    (e.g. 'a,b'). None and blank strings become an empty list; lists are
    returned as-is.

    Args:
        v: The value to parse, can be None, list, or string.

    Returns:
        list: Parsed list of values.

    Raises:
        ValueError: If the value is neither None, a list, nor a string.
    """
    if v is None:
        return []
    if isinstance(v, list):
        return v
    if not isinstance(v, str):
        raise ValueError("Invalid type for list field")

    text = v.strip()
    if not text:
        return []

    if text.startswith("["):
        try:
            decoded = orjson.loads(text)
        except Exception:
            # Looks like JSON but is not decodable: fall through to the CSV split below.
            logger.warning("Invalid JSON list in env for list field; falling back to CSV parsing")
        else:
            # Decodable JSON that is not an array yields an empty list.
            return decoded if isinstance(decoded, list) else []

    # CSV fallback: split on commas and drop blank entries.
    pieces = (piece.strip() for piece in text.split(","))
    return [piece for piece in pieces if piece]

1880 

@field_validator("mcpgateway_ui_hide_sections", mode="after")
@classmethod
def _validate_ui_hide_sections(cls, value: list[str]) -> list[str]:
    """Normalize, alias-resolve and de-duplicate hidable UI sections.

    Args:
        value: Candidate section identifiers from environment/config.

    Returns:
        list[str]: Lower-cased, alias-resolved section identifiers in first-seen
        order, without duplicates. Blank items are dropped silently; items not in
        ``UI_HIDABLE_SECTIONS`` are dropped with a warning.
    """
    # A dict keeps insertion order, so its keys act as an ordered set.
    accepted: dict[str, None] = {}

    for raw in value:
        token = str(raw).strip().lower()
        if not token:
            continue
        section = UI_HIDE_SECTION_ALIASES.get(token, token)
        if section in UI_HIDABLE_SECTIONS:
            accepted.setdefault(section, None)
        else:
            logger.warning("Ignoring invalid MCPGATEWAY_UI_HIDE_SECTIONS item: %s", raw)

    return list(accepted)

1908 

@field_validator("mcpgateway_ui_hide_header_items", mode="after")
@classmethod
def _validate_ui_hide_header_items(cls, value: list[str]) -> list[str]:
    """Normalize and de-duplicate hidable header items.

    Args:
        value: Candidate header identifiers from environment/config.

    Returns:
        list[str]: Lower-cased header identifiers in first-seen order, without
        duplicates. Blank items are dropped silently; items not in
        ``UI_HIDABLE_HEADER_ITEMS`` are dropped with a warning.
    """
    # A dict keeps insertion order, so its keys act as an ordered set.
    accepted: dict[str, None] = {}

    for raw in value:
        token = str(raw).strip().lower()
        if not token:
            continue
        if token in UI_HIDABLE_HEADER_ITEMS:
            accepted.setdefault(token, None)
        else:
            logger.warning("Ignoring invalid MCPGATEWAY_UI_HIDE_HEADER_ITEMS item: %s", raw)

    return list(accepted)

1935 

@property
def api_key(self) -> str:
    """
    Build the API key string from the basic-auth credentials.

    Returns:
        str: API key string in the format "username:password".

    Examples:
        >>> from mcpgateway.config import Settings
        >>> settings = Settings(basic_auth_user="admin", basic_auth_password="secret")
        >>> settings.api_key
        'admin:secret'
        >>> settings = Settings(basic_auth_user="user123", basic_auth_password="pass456")
        >>> settings.api_key
        'user123:pass456'
    """
    username = self.basic_auth_user
    # The password is held in a secret wrapper; unwrap it only to build the key.
    password = self.basic_auth_password.get_secret_value()
    return f"{username}:{password}"

1954 

@property
def supports_http(self) -> bool:
    """Report whether the configured transport includes HTTP.

    Returns:
        bool: True when ``transport_type`` is "http" or "all", False otherwise.

    Examples:
        >>> settings = Settings(transport_type="http")
        >>> settings.supports_http
        True
        >>> settings = Settings(transport_type="all")
        >>> settings.supports_http
        True
        >>> settings = Settings(transport_type="ws")
        >>> settings.supports_http
        False
    """
    http_capable = ("http", "all")
    return self.transport_type in http_capable

1974 

@property
def supports_websocket(self) -> bool:
    """Report whether the configured transport includes WebSocket.

    Returns:
        bool: True when ``transport_type`` is "ws" or "all", False otherwise.

    Examples:
        >>> settings = Settings(transport_type="ws")
        >>> settings.supports_websocket
        True
        >>> settings = Settings(transport_type="all")
        >>> settings.supports_websocket
        True
        >>> settings = Settings(transport_type="http")
        >>> settings.supports_websocket
        False
    """
    websocket_capable = ("ws", "all")
    return self.transport_type in websocket_capable

1994 

@property
def supports_sse(self) -> bool:
    """Report whether the configured transport includes SSE.

    Returns:
        bool: True when ``transport_type`` is "sse" or "all", False otherwise.

    Examples:
        >>> settings = Settings(transport_type="sse")
        >>> settings.supports_sse
        True
        >>> settings = Settings(transport_type="all")
        >>> settings.supports_sse
        True
        >>> settings = Settings(transport_type="http")
        >>> settings.supports_sse
        False
    """
    sse_capable = ("sse", "all")
    return self.transport_type in sse_capable

2014 

class DatabaseSettings(TypedDict):
    """TypedDict for SQLAlchemy database settings.

    Describes the dictionary returned by the ``database_settings`` property.
    """

    pool_size: int  # filled from ``db_pool_size``
    max_overflow: int  # filled from ``db_max_overflow``
    pool_timeout: int  # filled from ``db_pool_timeout``
    pool_recycle: int  # filled from ``db_pool_recycle``
    connect_args: dict[str, Any]  # consider more specific type if needed

2023 

@property
def database_settings(self) -> DatabaseSettings:
    """
    Assemble the SQLAlchemy database settings dictionary.

    Returns:
        DatabaseSettings: Pool options taken from the ``db_*`` settings plus
        ``connect_args``, which is ``{"check_same_thread": False}`` when the
        database URL starts with "sqlite" and empty otherwise.

    Examples:
        >>> from mcpgateway.config import Settings
        >>> s = Settings(database_url='sqlite:///./test.db')
        >>> isinstance(s.database_settings, dict)
        True
    """
    options = dict(
        pool_size=self.db_pool_size,
        max_overflow=self.db_max_overflow,
        pool_timeout=self.db_pool_timeout,
        pool_recycle=self.db_pool_recycle,
    )
    if self.database_url.startswith("sqlite"):
        options["connect_args"] = {"check_same_thread": False}
    else:
        options["connect_args"] = {}
    return options

2045 

class CORSSettings(TypedDict):
    """TypedDict for CORS settings.

    Describes the dictionary returned by the ``cors_settings`` property. Every
    key is ``NotRequired`` because that property returns an empty dict when
    CORS is disabled.
    """

    allow_origins: NotRequired[List[str]]
    allow_credentials: NotRequired[bool]
    allow_methods: NotRequired[List[str]]
    allow_headers: NotRequired[List[str]]

2053 

@property
def cors_settings(self) -> CORSSettings:
    """Assemble the CORS settings dictionary.

    Returns:
        CORSSettings: An empty dict when ``cors_enabled`` is false; otherwise the
        allowed origins as a list, with credentials enabled and all methods and
        headers allowed.

    Examples:
        >>> s = Settings(cors_enabled=True, allowed_origins={'http://localhost'})
        >>> cors = s.cors_settings
        >>> cors['allow_origins']
        ['http://localhost']
        >>> cors['allow_credentials']
        True
        >>> s2 = Settings(cors_enabled=False)
        >>> s2.cors_settings
        {}
    """
    if not self.cors_enabled:
        return {}

    origins = list(self.allowed_origins)
    return {
        "allow_origins": origins,
        "allow_credentials": True,
        "allow_methods": ["*"],
        "allow_headers": ["*"],
    }

2082 

def validate_transport(self) -> None:
    """
    Validate transport configuration.

    Raises:
        ValueError: If the transport type is not one of the valid options. The
            message lists the valid options in sorted order so it is identical
            across runs.

    Examples:
        >>> from mcpgateway.config import Settings
        >>> s = Settings(transport_type='http')
        >>> s.validate_transport()  # no error
        >>> s2 = Settings(transport_type='invalid')
        >>> try:
        ...     s2.validate_transport()
        ... except ValueError as e:
        ...     print('error')
        error
    """
    valid_types = {"sse", "streamablehttp", "all", "http"}
    if self.transport_type not in valid_types:
        # Interpolating the set directly gives an order that varies with hash
        # randomisation; sort for a stable, readable message.
        allowed = ", ".join(sorted(valid_types))
        raise ValueError(f"Invalid transport type. Must be one of: {allowed}")

2105 

def validate_database(self) -> None:
    """Ensure the parent directory of a SQLite database file exists.

    URLs that do not start with "sqlite" are ignored. For SQLite URLs the file
    path is taken from the text after ``:///`` (so driver-qualified schemes such
    as ``sqlite+pysqlite:///`` work), any ``?query`` suffix is dropped, and the
    parent directory is created if missing. URLs without a file path (no
    ``:///``, an empty path, or ``:memory:``) are left alone.

    Examples:
        >>> from mcpgateway.config import Settings
        >>> s = Settings(database_url='sqlite:///./test.db')
        >>> s.validate_database()  # Should create the directory if it does not exist
    """
    url = self.database_url
    if not url.startswith("sqlite"):
        return

    # Split on the scheme separator rather than stripping the literal "sqlite:///"
    # prefix, which would leave driver-qualified URLs untouched and create a junk directory.
    _, separator, location = url.partition(":///")
    if not separator:
        return

    location = location.partition("?")[0]  # query options are not part of the file path
    if not location or location == ":memory:":
        return

    db_dir = Path(location).parent
    if not db_dir.exists():
        # exist_ok avoids FileExistsError when another worker creates the directory
        # between the check above and this call.
        db_dir.mkdir(parents=True, exist_ok=True)

2119 

2120 # Validation patterns for safe display (configurable) 

2121 validation_dangerous_html_pattern: str = ( 

2122 r"<(script|iframe|object|embed|link|meta|base|form|img|svg|video|audio|source|track|area|map|canvas|applet|frame|frameset|html|head|body|style)\b|</*(script|iframe|object|embed|link|meta|base|form|img|svg|video|audio|source|track|area|map|canvas|applet|frame|frameset|html|head|body|style)>" 

2123 ) 

2124 

2125 validation_dangerous_js_pattern: str = r"(?i)(?:^|\s|[\"'`<>=])(javascript:|vbscript:|data:\s*[^,]*[;\s]*(javascript|vbscript)|\bon[a-z]+\s*=|<\s*script\b)" 

2126 

2127 validation_allowed_url_schemes: List[str] = ["http://", "https://", "ws://", "wss://"] 

2128 

2129 # Character validation patterns 

2130 validation_name_pattern: str = r"^[a-zA-Z0-9_.\- ]+$" # Allow spaces for names (literal space, not \s to reject control chars) 

2131 validation_identifier_pattern: str = r"^[a-zA-Z0-9_\-\.]+$" # No spaces for IDs 

2132 validation_safe_uri_pattern: str = r"^[a-zA-Z0-9_\-.:/?=&%{}]+$" 

2133 validation_unsafe_uri_pattern: str = r'[<>"\'\\]' 

2134 validation_tool_name_pattern: str = r"^[a-zA-Z0-9_][a-zA-Z0-9._/-]*$" # MCP tool naming per SEP-986 

2135 validation_tool_method_pattern: str = r"^[a-zA-Z][a-zA-Z0-9_\./-]*$" 

2136 

2137 # MCP-compliant size limits (configurable via env) 

2138 validation_max_name_length: int = 255 

2139 validation_max_description_length: int = 8192 # 8KB 

2140 validation_max_template_length: int = 65536 # 64KB 

2141 validation_max_content_length: int = 1048576 # 1MB 

2142 validation_max_json_depth: int = Field( 

2143 default=int(os.getenv("VALIDATION_MAX_JSON_DEPTH", "30")), 

2144 description=( 

2145 "Maximum allowed JSON nesting depth for tool/resource schemas. " 

2146 "Increased from 10 to 30 for compatibility with deeply nested schemas " 

2147 "like Notion MCP (issue #1542). Override with VALIDATION_MAX_JSON_DEPTH " 

2148 "environment variable. Minimum: 1, Maximum: 100" 

2149 ), 

2150 ge=1, 

2151 le=100, 

2152 ) 

2153 validation_max_url_length: int = 2048 

2154 validation_max_rpc_param_size: int = 262144 # 256KB 

2155 

2156 validation_max_method_length: int = 128 

2157 

2158 # Allowed MIME types 

2159 validation_allowed_mime_types: List[str] = [ 

2160 "text/plain", 

2161 "text/html", 

2162 "text/css", 

2163 "text/markdown", 

2164 "text/javascript", 

2165 "application/json", 

2166 "application/xml", 

2167 "application/pdf", 

2168 "image/png", 

2169 "image/jpeg", 

2170 "image/gif", 

2171 "image/svg+xml", 

2172 "application/octet-stream", 

2173 ] 

2174 

2175 # Rate limiting 

2176 validation_max_requests_per_minute: int = 60 

2177 

2178 # Header passthrough feature (disabled by default for security) 

2179 enable_header_passthrough: bool = Field(default=False, description="Enable HTTP header passthrough feature (WARNING: Security implications - only enable if needed)") 

2180 enable_overwrite_base_headers: bool = Field(default=False, description="Enable overwriting of base headers") 

2181 

2182 # Passthrough headers configuration 

2183 default_passthrough_headers: List[str] = Field(default_factory=list) 

2184 

2185 # Passthrough headers source priority 

2186 # - "env": Environment variable always wins (ideal for Kubernetes/containerized deployments) 

2187 # - "db": Database takes precedence if configured, env as fallback (default) 

2188 # - "merge": Union of both sources - env provides base, other configuration in DB can add more headers 

2189 passthrough_headers_source: Literal["env", "db", "merge"] = Field( 

2190 default="db", 

2191 description="Source priority for passthrough headers: env (environment always wins), db (database wins, default), merge (combine both)", 

2192 ) 

2193 

2194 # =================================== 

2195 # Pagination Configuration 

2196 # =================================== 

2197 

2198 # Default number of items per page for paginated endpoints 

2199 pagination_default_page_size: int = Field(default=50, ge=1, le=1000, description="Default number of items per page") 

2200 

2201 # Maximum allowed items per page (prevents abuse) 

2202 pagination_max_page_size: int = Field(default=500, ge=1, le=10000, description="Maximum allowed items per page") 

2203 

2204 # Minimum items per page 

2205 pagination_min_page_size: int = Field(default=1, ge=1, description="Minimum items per page") 

2206 

2207 # Threshold for switching from offset to cursor-based pagination 

2208 pagination_cursor_threshold: int = Field(default=10000, ge=1, description="Threshold for cursor-based pagination") 

2209 

2210 # Enable cursor-based pagination globally 

2211 pagination_cursor_enabled: bool = Field(default=True, description="Enable cursor-based pagination") 

2212 

2213 # Default sort field for paginated queries 

2214 pagination_default_sort_field: str = Field(default="created_at", description="Default sort field") 

2215 

2216 # Default sort order for paginated queries 

2217 pagination_default_sort_order: str = Field(default="desc", pattern="^(asc|desc)$", description="Default sort order") 

2218 

2219 # Maximum offset allowed for offset-based pagination (prevents abuse) 

2220 pagination_max_offset: int = Field(default=100000, ge=0, description="Maximum offset for pagination") 

2221 

2222 # Cache pagination counts for performance (seconds) 

2223 pagination_count_cache_ttl: int = Field(default=300, ge=0, description="Cache TTL for pagination counts") 

2224 

2225 # Enable pagination links in API responses 

2226 pagination_include_links: bool = Field(default=True, description="Include pagination links") 

2227 

2228 # Base URL for pagination links (defaults to request URL) 

2229 pagination_base_url: Optional[str] = Field(default=None, description="Base URL for pagination links") 

2230 

2231 # Ed25519 keys for signing 

2232 enable_ed25519_signing: bool = Field(default=False, description="Enable Ed25519 signing for certificates") 

2233 prev_ed25519_private_key: SecretStr = Field(default=SecretStr(""), description="Previous Ed25519 private key for signing") 

2234 prev_ed25519_public_key: Optional[str] = Field(default=None, description="Derived previous Ed25519 public key") 

2235 ed25519_private_key: SecretStr = Field(default=SecretStr(""), description="Ed25519 private key for signing") 

2236 ed25519_public_key: Optional[str] = Field(default=None, description="Derived Ed25519 public key") 

2237 

@model_validator(mode="after")
def derive_public_keys(self) -> "Settings":
    """
    Derive the Ed25519 public keys from the configured private keys.

    Runs after field validation. For ``ed25519_private_key`` and
    ``prev_ed25519_private_key``, a non-empty PEM value that loads as an Ed25519
    private key has its public key PEM-encoded and stored on the matching
    ``*_public_key`` field, replacing any value already there. Empty values and
    keys of another type are skipped; any failure is logged as a warning and
    does not abort model creation.

    Returns:
        Settings: The updated Settings instance with derived public keys.
    """
    for secret_attr in ("ed25519_private_key", "prev_ed25519_private_key"):
        target_attr = secret_attr.replace("private", "public")

        secret: SecretStr = getattr(self, secret_attr)
        pem_text = secret.get_secret_value().strip()
        if not pem_text:
            # Nothing configured for this slot.
            continue

        try:
            loaded_key = serialization.load_pem_private_key(pem_text.encode(), password=None)
            # Keys of another algorithm are ignored rather than rejected.
            if isinstance(loaded_key, ed25519.Ed25519PrivateKey):
                derived_pem = (
                    loaded_key.public_key()
                    .public_bytes(
                        encoding=serialization.Encoding.PEM,
                        format=serialization.PublicFormat.SubjectPublicKeyInfo,
                    )
                    .decode()
                )
                setattr(self, target_attr, derived_pem)
        except Exception:
            # Best effort: a bad key must not halt settings construction.
            logger.warning("Failed to derive public key for private_key")

    return self

2281 

def __init__(self, **kwargs: Any) -> None:
    """Initialize Settings, then apply environment-dependent post-processing.

    After the parent initializer runs, this method:

    1. Sets ``default_passthrough_headers`` from the DEFAULT_PASSTHROUGH_HEADERS
       environment variable (JSON array or comma-separated), or to a built-in
       default when the variable is unset or empty.
    2. Replaces ``allowed_origins`` with environment-aware defaults when it was
       not set through ALLOWED_ORIGINS or kwargs and still equals the stock value.
    3. Derives ``mcp_require_auth`` from ``auth_required`` when it is None.
    4. Logs warnings for risky authentication combinations and disables
       ``trust_proxy_auth`` when it was not explicitly acknowledged.

    Args:
        **kwargs: Keyword arguments passed to parent Settings class

    Raises:
        ValueError: Propagated from the parent initializer when the supplied data is
            invalid (presumably pydantic's validation error - confirm). Parsing
            failures for DEFAULT_PASSTHROUGH_HEADERS are handled internally.

    Examples:
        >>> import os
        >>> # Test with no environment variable set
        >>> old_val = os.environ.get('DEFAULT_PASSTHROUGH_HEADERS')
        >>> if 'DEFAULT_PASSTHROUGH_HEADERS' in os.environ:
        ...     del os.environ['DEFAULT_PASSTHROUGH_HEADERS']
        >>> s = Settings()
        >>> s.default_passthrough_headers
        ['X-Tenant-Id', 'X-Trace-Id']
        >>> # Restore original value if it existed
        >>> if old_val is not None:
        ...     os.environ['DEFAULT_PASSTHROUGH_HEADERS'] = old_val
    """
    super().__init__(**kwargs)

    # Parse DEFAULT_PASSTHROUGH_HEADERS environment variable
    # NOTE: both branches below assign default_passthrough_headers, so a value
    # passed through kwargs is overwritten here.
    default_value = os.environ.get("DEFAULT_PASSTHROUGH_HEADERS")
    if default_value:
        try:
            # Try JSON parsing first
            self.default_passthrough_headers = orjson.loads(default_value)
            # Valid JSON that is not an array is routed to the CSV fallback below
            if not isinstance(self.default_passthrough_headers, list):
                raise ValueError("Must be a JSON array")
        except (orjson.JSONDecodeError, ValueError):
            # Fallback to comma-separated parsing
            self.default_passthrough_headers = [h.strip() for h in default_value.split(",") if h.strip()]
            logger.info(f"Parsed comma-separated passthrough headers: {self.default_passthrough_headers}")
    else:
        # Safer defaults without Authorization header
        self.default_passthrough_headers = ["X-Tenant-Id", "X-Trace-Id"]

    # Configure environment-aware CORS origins if not explicitly set via env or kwargs
    # Only apply defaults if using the default allowed_origins value
    if not os.environ.get("ALLOWED_ORIGINS") and "allowed_origins" not in kwargs and self.allowed_origins == {"http://localhost", "http://localhost:4444"}:
        if self.environment == "development":
            # Common local dev-server ports plus the gateway's own port
            self.allowed_origins = {
                "http://localhost",
                "http://localhost:3000",
                "http://localhost:8080",
                "http://127.0.0.1:3000",
                "http://127.0.0.1:8080",
                f"http://localhost:{self.port}",
                f"http://127.0.0.1:{self.port}",
            }
        else:
            # Production origins - construct from app_domain (extract hostname from HttpUrl)
            app_domain_host = urlparse(str(self.app_domain)).hostname or "localhost"
            self.allowed_origins = {f"https://{app_domain_host}", f"https://app.{app_domain_host}", f"https://admin.{app_domain_host}"}

    # MCP transport auth policy:
    # - If MCP_REQUIRE_AUTH is unset, derive it from AUTH_REQUIRED
    # - If AUTH_REQUIRED=true but MCP_REQUIRE_AUTH=false is explicit, emit a warning
    if self.mcp_require_auth is None:
        self.mcp_require_auth = bool(self.auth_required)
        logger.info(
            "MCP_REQUIRE_AUTH not set; defaulting to %s to match AUTH_REQUIRED=%s.",
            self.mcp_require_auth,
            self.auth_required,
        )
    elif self.auth_required and self.mcp_require_auth is False:
        logger.warning("AUTH_REQUIRED=true but MCP_REQUIRE_AUTH=false. MCP endpoints (/servers/*/mcp) allow unauthenticated access to public items.")

    # Validate proxy auth configuration
    # All three branches apply only when MCP client auth is disabled.
    if not self.mcp_client_auth_enabled and self.trust_proxy_auth and not self.trust_proxy_auth_dangerously:
        logger.warning(
            "TRUST_PROXY_AUTH=true ignored because TRUST_PROXY_AUTH_DANGEROUSLY is false "
            "while MCP_CLIENT_AUTH_ENABLED=false. Set TRUST_PROXY_AUTH_DANGEROUSLY=true "
            "only behind a strictly trusted authentication proxy."
        )
        # Proxy trust is switched off unless it was explicitly acknowledged
        self.trust_proxy_auth = False
    elif not self.mcp_client_auth_enabled and self.trust_proxy_auth and self.trust_proxy_auth_dangerously:
        logger.warning("TRUST_PROXY_AUTH_DANGEROUSLY=true acknowledged. Requests may trust identity headers from the upstream proxy.")
    elif not self.mcp_client_auth_enabled and not self.trust_proxy_auth:
        logger.warning(
            "MCP client authentication is disabled but trust_proxy_auth is not set. "
            "This is a security risk! Set TRUST_PROXY_AUTH=true only if ContextForge "
            "is behind a trusted authentication proxy."
        )

    # Warn only; the configuration itself is left unchanged.
    if not self.auth_required and self.allow_unauthenticated_admin:
        logger.warning("ALLOW_UNAUTHENTICATED_ADMIN=true acknowledged while AUTH_REQUIRED=false. Unauthenticated requests may receive admin context.")

2372 

2373 # Masking value for all sensitive data 

2374 masked_auth_value: str = "*****" 

2375 

def log_summary(self) -> None:
    """
    Log a summary of the application settings at INFO level.

    The settings are dumped to a dictionary with ``database_url`` and
    ``memcached_url`` excluded, and the dictionary is written to the log.

    NOTE(review): only those two fields are excluded here; confirm that no other
    field can expose a credential in plain text when dumped.
    """
    excluded_fields = {"database_url", "memcached_url"}
    summary = self.model_dump(exclude=excluded_fields)
    logger.info(f"Application settings summary: {summary}")

2389 

2390 ENABLE_METRICS: bool = Field(False, description="Enable Prometheus metrics endpoint at /metrics/prometheus (requires authentication)") 

2391 METRICS_EXCLUDED_HANDLERS: str = Field("", description="Comma-separated regex patterns for paths to exclude from metrics") 

2392 METRICS_NAMESPACE: str = Field("default", description="Prometheus metrics namespace") 

2393 METRICS_SUBSYSTEM: str = Field("", description="Prometheus metrics subsystem") 

2394 METRICS_CUSTOM_LABELS: str = Field("", description='Comma-separated "key=value" pairs for static custom labels') 

2395 

2396 

@lru_cache(maxsize=128)
def get_settings(**kwargs: Any) -> Settings:
    """Build, validate and cache a Settings instance.

    Results are memoized per distinct set of keyword arguments, so repeated
    calls with the same arguments return the same object. Because the cache
    keys on the arguments, their values must be hashable.

    Args:
        **kwargs: Keyword arguments to pass to the Settings setup.

    Returns:
        Settings: A cached instance of the Settings class.

    Examples:
        >>> settings = get_settings()
        >>> isinstance(settings, Settings)
        True
        >>> # Second call returns the same cached instance
        >>> settings2 = get_settings()
        >>> settings is settings2
        True
    """
    instance = Settings(**kwargs)
    # Run the post-construction checks in order: the transport check raises on a
    # bad transport_type, the database check creates the SQLite directory if needed.
    for check in (instance.validate_transport, instance.validate_database):
        check()
    return instance

2426 

2427 

def generate_settings_schema() -> dict[str, Any]:
    """
    Build the JSON Schema for the Settings model in validation mode.

    The schema can be used for validation or documentation purposes.

    Returns:
        dict: A dictionary representing the JSON Schema of the Settings model.
    """
    schema: dict[str, Any] = Settings.model_json_schema(mode="validation")
    return schema

2438 

2439 

2440# Lazy "instance" of settings 

class LazySettingsWrapper:
    """Proxy that defers building the real settings until an attribute is read."""

    @property
    def plugins(self) -> Any:
        """Access plugin framework settings via ``settings.plugins``.

        The plugin settings module is imported on access rather than at module
        import time.

        Returns:
            The ``settings`` object exported by ``mcpgateway.plugins.framework.settings``.
        """
        # First-Party
        from mcpgateway.plugins.framework.settings import settings as plugin_settings  # pylint: disable=import-outside-toplevel

        return plugin_settings

    def __getattr__(self, key: str) -> Any:
        """Forward an attribute lookup to the cached Settings instance.

        Python calls this only when normal lookup on the wrapper fails, so
        ``plugins`` above is not routed through here.

        Args:
            key: The key to fetch from settings

        Returns:
            Any: The value of the attribute on the settings
        """
        real_settings = get_settings()
        return getattr(real_settings, key)

2470 

2471 

settings = LazySettingsWrapper()


if __name__ == "__main__":
    # With --schema, print the settings JSON Schema and exit; otherwise log the summary.
    if "--schema" not in sys.argv:
        settings.log_summary()
    else:
        schema = generate_settings_schema()
        rendered = orjson.dumps(schema, option=orjson.OPT_INDENT_2)
        print(rendered.decode())
        sys.exit(0)