Coverage for mcpgateway / config.py: 99%

1008 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/config.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti, Manav Gupta 

6 

7ContextForge AI Gateway Configuration. 

8This module defines configuration settings for ContextForge AI Gateway using Pydantic. 

9It loads configuration from environment variables with sensible defaults. 

10 

11Environment variables: 

12- APP_NAME: Gateway name (default: "ContextForge") 

13- HOST: Host to bind to (default: "127.0.0.1") 

14- PORT: Port to listen on (default: 4444) 

15- DATABASE_URL: SQLite database URL (default: "sqlite:///./mcp.db") 

16- BASIC_AUTH_USER: Username for API Basic auth when enabled (default: "admin") 

17- BASIC_AUTH_PASSWORD: Password for API Basic auth when enabled (default: "changeme") 

18- LOG_LEVEL: Logging level (default: "INFO") 

19- SKIP_SSL_VERIFY: Disable SSL verification (default: False) 

20- AUTH_REQUIRED: Require authentication (default: True) 

21- TRANSPORT_TYPE: Transport mechanisms (default: "all") 

22- DOCS_ALLOW_BASIC_AUTH: Allow basic auth for docs (default: False) 

23- RESOURCE_CACHE_SIZE: Max cached resources (default: 1000) 

24- RESOURCE_CACHE_TTL: Cache TTL in seconds (default: 3600) 

25- TOOL_TIMEOUT: Tool invocation timeout (default: 60) 

26- PROMPT_CACHE_SIZE: Max cached prompts (default: 100) 

27- HEALTH_CHECK_INTERVAL: Gateway health check interval (default: 300) 

28- REQUIRE_TOKEN_EXPIRATION: Require JWT tokens to have expiration (default: True) 

29- REQUIRE_JTI: Require JTI claim in tokens for revocation (default: True) 

30- REQUIRE_USER_IN_DB: Require all users to exist in database (default: False) 

31 

32Examples: 

33 >>> from mcpgateway.config import Settings 

34 >>> s = Settings(basic_auth_user='admin', basic_auth_password='secret') 

35 >>> s.api_key 

36 'admin:secret' 

37 >>> s2 = Settings(transport_type='http') 

38 >>> s2.validate_transport() # no error 

39 >>> s3 = Settings(transport_type='invalid') 

40 >>> try: 

41 ... s3.validate_transport() 

42 ... except ValueError as e: 

43 ... print('error') 

44 error 

45 >>> s4 = Settings(database_url='sqlite:///./test.db') 

46 >>> isinstance(s4.database_settings, dict) 

47 True 

48""" 

49 

50# Standard 

51from functools import lru_cache 

52from importlib.resources import files 

53import logging 

54import os 

55from pathlib import Path 

56import re 

57import sys 

58from typing import Annotated, Any, ClassVar, Dict, List, Literal, NotRequired, Optional, Self, Set, TypedDict 

59from urllib.parse import urlparse 

60 

61# Third-Party 

62from cryptography.hazmat.primitives import serialization 

63from cryptography.hazmat.primitives.asymmetric import ed25519 

64import orjson 

65from pydantic import AliasChoices, Field, field_validator, HttpUrl, model_validator, PositiveInt, SecretStr, ValidationInfo 

66from pydantic_settings import BaseSettings, NoDecode, SettingsConfigDict 

67 

68# Only configure basic logging if no handlers exist yet 

69# This prevents conflicts with LoggingService while ensuring config logging works 

70if not logging.getLogger().handlers: 

71 logging.basicConfig( 

72 level=logging.INFO, 

73 format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", 

74 datefmt="%Y-%m-%dT%H:%M:%S", 

75 ) 

76 

77logger = logging.getLogger(__name__) 

78 

79 

80def _normalize_env_list_vars() -> None: 

81 """Normalize list-typed env vars to valid JSON arrays. 

82 

83 Ensures env values parse cleanly when providers expect JSON for complex types. 

84 If a value is empty or CSV, convert to a JSON array string. 

85 """ 

86 keys = [ 

87 "SSO_TRUSTED_DOMAINS", 

88 "SSO_AUTO_ADMIN_DOMAINS", 

89 "SSO_GITHUB_ADMIN_ORGS", 

90 "SSO_GOOGLE_ADMIN_DOMAINS", 

91 "SSO_ENTRA_ADMIN_GROUPS", 

92 "LOG_DETAILED_SKIP_ENDPOINTS", 

93 "TOOL_DESCRIPTION_FORBIDDEN_PATTERNS", 

94 "CONTENT_ALLOWED_RESOURCE_MIMETYPES", 

95 ] 

96 for key in keys: 

97 raw = os.environ.get(key) 

98 if raw is None: 

99 continue 

100 s = raw.strip() 

101 if not s: 

102 os.environ[key] = "[]" 

103 continue 

104 if s.startswith("["): 

105 # Already JSON-like, keep as is 

106 try: 

107 orjson.loads(s) 

108 continue 

109 except Exception: 

110 pass # nosec B110 - Intentionally continue with CSV parsing if JSON parsing fails 

111 # Convert CSV to JSON array 

112 items = [item.strip() for item in s.split(",") if item.strip()] 

113 os.environ[key] = orjson.dumps(items).decode() 

114 

115 

116_normalize_env_list_vars() 

117 

118 

119# Default content type for outgoing requests to Forge 

120FORGE_CONTENT_TYPE = os.getenv("FORGE_CONTENT_TYPE", "application/json") 

121 

122# UI embedding / visibility controls 

123UI_HIDABLE_SECTIONS = frozenset( 

124 { 

125 "overview", 

126 "servers", 

127 "gateways", 

128 "tools", 

129 "prompts", 

130 "resources", 

131 "roots", 

132 "mcp-registry", 

133 "metrics", 

134 "plugins", 

135 "export-import", 

136 "logs", 

137 "version-info", 

138 "maintenance", 

139 "teams", 

140 "users", 

141 "agents", 

142 "grpc-services", 

143 "tokens", 

144 "settings", 

145 } 

146) 

147UI_HIDABLE_HEADER_ITEMS = frozenset({"logout", "team_selector", "user_identity", "theme_toggle"}) 

148UI_HIDE_SECTION_ALIASES = { 

149 "catalog": "servers", 

150 "virtual_servers": "servers", 

151 "a2a-agents": "agents", 

152 "a2a": "agents", 

153 "api_tokens": "tokens", 

154 "llm-settings": "settings", 

155} 

156 

157 

158class Settings(BaseSettings): 

159 """ 

160 ContextForge AI Gateway configuration settings. 

161 

162 Examples: 

163 >>> from mcpgateway.config import Settings 

164 >>> s = Settings(basic_auth_user='admin', basic_auth_password='secret') 

165 >>> s.api_key 

166 'admin:secret' 

167 >>> s2 = Settings(transport_type='http') 

168 >>> s2.validate_transport() # no error 

169 >>> s3 = Settings(transport_type='invalid') 

170 >>> try: 

171 ... s3.validate_transport() 

172 ... except ValueError as e: 

173 ... print('error') 

174 error 

175 >>> s4 = Settings(database_url='sqlite:///./test.db') 

176 >>> isinstance(s4.database_settings, dict) 

177 True 

178 >>> s5 = Settings() 

179 >>> s5.app_name 

180 'ContextForge' 

181 >>> s5.host in ('0.0.0.0', '127.0.0.1') # Default can be either 

182 True 

183 >>> s5.port 

184 4444 

185 >>> s5.auth_required 

186 True 

187 >>> isinstance(s5.allowed_origins, set) 

188 True 

189 >>> s6 = Settings(log_detailed_skip_endpoints=["/metrics", "/health"]) 

190 >>> s6.log_detailed_skip_endpoints 

191 ['/metrics', '/health'] 

192 >>> s7 = Settings(log_detailed_sample_rate=0.5) 

193 >>> s7.log_detailed_sample_rate 

194 0.5 

195 >>> s8 = Settings(log_resolve_user_identity=True) 

196 >>> s8.log_resolve_user_identity 

197 True 

198 >>> s9 = Settings() 

199 >>> s9.log_detailed_skip_endpoints 

200 [] 

201 >>> s9.log_detailed_sample_rate 

202 1.0 

203 >>> s9.log_resolve_user_identity 

204 False 

205 """ 

206 

207 # Basic Settings 

208 app_name: str = "ContextForge" 

209 host: str = "127.0.0.1" 

210 port: PositiveInt = Field(default=4444, ge=1, le=65535) 

211 client_mode: bool = False 

212 docs_allow_basic_auth: bool = False # Allow basic auth for docs 

213 api_allow_basic_auth: bool = Field( 

214 default=False, 

215 description="Allow Basic authentication for API endpoints. Disabled by default for security. Use JWT or API tokens instead.", 

216 ) 

217 database_url: str = Field( 

218 default="sqlite:///./mcp.db", 

219 description=( 

220 "Database connection URL. Supports SQLite (dev) and PostgreSQL (production). " 

221 "For PostgreSQL with custom schema, use the 'options' query parameter: " 

222 "postgresql://user:pass@host:5432/db?options=-c%20search_path=schema_name " 

223 "(See Issue #1535 for details)" 

224 ), 

225 ) 

226 

227 # Absolute paths resolved at import-time (still override-able via env vars) 

228 templates_dir: Path = Field(default_factory=lambda: Path(str(files("mcpgateway") / "templates"))) 

229 static_dir: Path = Field(default_factory=lambda: Path(str(files("mcpgateway") / "static"))) 

230 

231 # Template auto-reload: False for production (default), True for development 

232 # Disabling prevents re-parsing templates on each request, improving performance under load 

233 # Use TEMPLATES_AUTO_RELOAD=true for development (make dev sets this automatically) 

234 templates_auto_reload: bool = Field(default=False, description="Auto-reload Jinja2 templates on change (enable for development)") 

235 

236 app_root_path: str = "" 

237 

238 # Protocol 

239 protocol_version: str = "2025-11-25" 

240 experimental_rust_mcp_runtime_enabled: bool = Field( 

241 default=False, 

242 description="Proxy POST /mcp traffic through the experimental Rust MCP runtime sidecar.", 

243 ) 

244 experimental_rust_mcp_runtime_url: str = Field( 

245 default="http://127.0.0.1:8787", 

246 description="Base URL for the experimental Rust MCP runtime sidecar.", 

247 ) 

248 experimental_rust_mcp_runtime_uds: Optional[str] = Field( 

249 default=None, 

250 description="Optional Unix domain socket path for the experimental Rust MCP runtime sidecar.", 

251 ) 

252 experimental_rust_mcp_runtime_timeout_seconds: int = Field( 

253 default=30, 

254 ge=1, 

255 le=300, 

256 description="Timeout in seconds for Python-to-Rust MCP runtime proxy requests.", 

257 ) 

258 experimental_rust_mcp_session_core_enabled: bool = Field( 

259 default=False, 

260 description="Enable the experimental Rust-owned MCP session metadata core while keeping Python as the fallback transport backend.", 

261 ) 

262 experimental_rust_mcp_event_store_enabled: bool = Field( 

263 default=False, 

264 description="Enable the experimental Rust-owned resumable MCP event-store backend for Streamable HTTP sessions.", 

265 ) 

266 experimental_rust_mcp_resume_core_enabled: bool = Field( 

267 default=False, 

268 description="Enable the experimental Rust-owned public MCP replay/resume path for GET /mcp with Last-Event-ID while keeping Python fallback available.", 

269 ) 

270 experimental_rust_mcp_live_stream_core_enabled: bool = Field( 

271 default=False, 

272 description="Enable the experimental Rust-owned public MCP live GET /mcp SSE path while keeping Python as the fallback upstream stream source.", 

273 ) 

274 experimental_rust_mcp_affinity_core_enabled: bool = Field( 

275 default=False, 

276 description="Enable the experimental Rust-owned MCP session-affinity forwarding path while keeping Python worker forwarding as the fallback.", 

277 ) 

278 experimental_rust_mcp_session_auth_reuse_enabled: bool = Field( 

279 default=False, 

280 description="Enable the experimental Rust-owned MCP session-bound auth-context reuse path for direct public /mcp ingress.", 

281 ) 

282 

283 # Authentication 

284 basic_auth_user: str = "admin" 

285 basic_auth_password: SecretStr = Field(default=SecretStr("changeme")) 

286 jwt_algorithm: str = "HS256" 

287 jwt_secret_key: SecretStr = Field(default=SecretStr("my-test-key")) 

288 jwt_public_key_path: str = "" 

289 jwt_private_key_path: str = "" 

290 jwt_audience: str = "mcpgateway-api" 

291 jwt_issuer: str = "mcpgateway" 

292 jwt_audience_verification: bool = True 

293 jwt_issuer_verification: bool = True 

294 auth_required: bool = True 

295 allow_unauthenticated_admin: bool = Field( 

296 default=False, 

297 description="Allow unauthenticated requests to receive platform-admin context when AUTH_REQUIRED=false (dangerous; development-only override).", 

298 ) 

299 token_expiry: int = 10080 # minutes 

300 

301 require_token_expiration: bool = Field(default=True, description="Require all JWT tokens to have expiration claims (secure default)") 

302 require_jti: bool = Field(default=True, description="Require JTI (JWT ID) claim in all tokens for revocation support (secure default)") 

303 require_user_in_db: bool = Field( 

304 default=False, 

305 description="Require all authenticated users to exist in the database. When true, disables the platform admin bootstrap mechanism. WARNING: Enabling this on a fresh deployment will lock you out.", 

306 ) 

307 embed_environment_in_tokens: bool = Field(default=False, description="Embed environment claim in gateway-issued JWTs for environment isolation") 

308 validate_token_environment: bool = Field(default=False, description="Reject tokens with mismatched environment claim (tokens without env claim are allowed)") 

309 

310 # JSON Schema Validation for registration (Tool Input Schemas, Prompt schemas, etc) 

311 json_schema_validation_strict: bool = Field(default=True, description="Strict schema validation mode - reject invalid JSON schemas") 

312 

313 # SSO Configuration 

314 sso_enabled: bool = Field(default=False, description="Enable Single Sign-On authentication") 

315 sso_github_enabled: bool = Field(default=False, description="Enable GitHub OAuth authentication") 

316 sso_github_client_id: Optional[str] = Field(default=None, description="GitHub OAuth client ID") 

317 sso_github_client_secret: Optional[SecretStr] = Field(default=None, description="GitHub OAuth client secret") 

318 

319 sso_google_enabled: bool = Field(default=False, description="Enable Google OAuth authentication") 

320 sso_google_client_id: Optional[str] = Field(default=None, description="Google OAuth client ID") 

321 sso_google_client_secret: Optional[SecretStr] = Field(default=None, description="Google OAuth client secret") 

322 

323 sso_ibm_verify_enabled: bool = Field(default=False, description="Enable IBM Security Verify OIDC authentication") 

324 sso_ibm_verify_client_id: Optional[str] = Field(default=None, description="IBM Security Verify client ID") 

325 sso_ibm_verify_client_secret: Optional[SecretStr] = Field(default=None, description="IBM Security Verify client secret") 

326 sso_ibm_verify_issuer: Optional[str] = Field(default=None, description="IBM Security Verify OIDC issuer URL") 

327 

328 sso_okta_enabled: bool = Field(default=False, description="Enable Okta OIDC authentication") 

329 sso_okta_client_id: Optional[str] = Field(default=None, description="Okta client ID") 

330 sso_okta_client_secret: Optional[SecretStr] = Field(default=None, description="Okta client secret") 

331 sso_okta_issuer: Optional[str] = Field(default=None, description="Okta issuer URL") 

332 sso_okta_scope: str = Field(default="openid profile email", description="Okta OIDC scopes (space-separated)") 

333 okta_group_mapping: Optional[str] = Field(default=None, description="JSON mapping of Okta group names to team UUIDs") 

334 

335 sso_keycloak_enabled: bool = Field(default=False, description="Enable Keycloak OIDC authentication") 

336 sso_keycloak_base_url: Optional[str] = Field(default=None, description="Keycloak base URL (e.g., https://keycloak.example.com)") 

337 sso_keycloak_public_base_url: Optional[str] = Field( 

338 default=None, 

339 description="Browser-facing Keycloak base URL for authorization redirects (e.g., http://localhost:8180)", 

340 ) 

341 sso_keycloak_realm: str = Field(default="master", description="Keycloak realm name") 

342 sso_keycloak_client_id: Optional[str] = Field(default=None, description="Keycloak client ID") 

343 sso_keycloak_client_secret: Optional[SecretStr] = Field(default=None, description="Keycloak client secret") 

344 sso_keycloak_map_realm_roles: bool = Field(default=True, description="Map Keycloak realm roles to gateway teams") 

345 sso_keycloak_map_client_roles: bool = Field(default=False, description="Map Keycloak client roles to gateway RBAC") 

346 sso_keycloak_role_mappings: Dict[str, str] = Field(default_factory=dict, description="Map Keycloak groups/roles to ContextForge roles (JSON: {group_or_role: role_name})") 

347 sso_keycloak_default_role: Optional[str] = Field(default=None, description="Default ContextForge role for Keycloak users without role mapping") 

348 sso_keycloak_resolve_team_scope_to_personal_team: bool = Field(default=False, description="Resolve team-scoped Keycloak role mappings to the user's personal team") 

349 sso_keycloak_username_claim: str = Field(default="preferred_username", description="JWT claim for username") 

350 

351 # Security Validation & Sanitization 

352 experimental_validate_io: bool = Field(default=False, description="Enable experimental input validation and output sanitization") 

353 validation_middleware_enabled: bool = Field(default=False, description="Enable validation middleware for all requests") 

354 validation_strict: bool = Field(default=True, description="Strict validation mode - reject on violations") 

355 sanitize_output: bool = Field(default=True, description="Sanitize output to remove control characters") 

356 allowed_roots: List[str] = Field(default_factory=list, description="Allowed root paths for resource access") 

357 max_path_depth: int = Field(default=10, description="Maximum allowed path depth") 

358 max_param_length: int = Field(default=10000, description="Maximum parameter length") 

359 dangerous_patterns: List[str] = Field( 

360 default_factory=lambda: [ 

361 r"[;&|`$(){}\[\]<>]", # Shell metacharacters 

362 r"\.\.[\\/]", # Path traversal 

363 r"[\x00-\x1f\x7f-\x9f]", # Control characters 

364 ], 

365 description="Regex patterns for dangerous input", 

366 ) 

367 tool_description_forbidden_patterns_enabled: bool = Field(default=True, description="Enable forbidden pattern validation on tool descriptions. Set to false to disable all checks.") 

368 tool_description_forbidden_patterns: List[str] = Field( 

369 default_factory=lambda: ["&&", "||", "$(", "> ", "< "], 

370 description='Substrings forbidden in tool descriptions. Override via TOOL_DESCRIPTION_FORBIDDEN_PATTERNS env var as a JSON array, e.g. \'["&&","||"]\'.', 

371 ) 

372 

373 sso_keycloak_email_claim: str = Field(default="email", description="JWT claim for email") 

374 sso_keycloak_groups_claim: str = Field(default="groups", description="JWT claim for groups/roles") 

375 

376 sso_entra_enabled: bool = Field(default=False, description="Enable Microsoft Entra ID OIDC authentication") 

377 sso_entra_client_id: Optional[str] = Field(default=None, description="Microsoft Entra ID client ID") 

378 sso_entra_client_secret: Optional[SecretStr] = Field(default=None, description="Microsoft Entra ID client secret") 

379 sso_entra_tenant_id: Optional[str] = Field(default=None, description="Microsoft Entra ID tenant ID") 

380 sso_entra_groups_claim: str = Field(default="groups", description="JWT claim for EntraID groups (groups/roles)") 

381 sso_entra_admin_groups: Annotated[list[str], NoDecode] = Field(default_factory=list, description="EntraID groups granting platform_admin role (CSV/JSON)") 

382 sso_entra_role_mappings: Dict[str, str] = Field(default_factory=dict, description="Map EntraID groups to ContextForge roles (JSON: {group_id: role_name})") 

383 sso_entra_default_role: Optional[str] = Field(default=None, description="Default role for EntraID users without group mapping (None = no role assigned)") 

384 sso_entra_sync_roles_on_login: bool = Field(default=True, description="Synchronize role assignments on each login") 

385 sso_entra_graph_api_enabled: bool = Field(default=True, description="Enable Microsoft Graph fallback for EntraID groups overage claims") 

386 sso_entra_graph_api_timeout: int = Field(default=10, ge=1, le=120, description="Timeout in seconds for Microsoft Graph group fallback requests") 

387 sso_entra_graph_api_max_groups: int = Field(default=0, ge=0, description="Maximum groups to keep from Graph fallback (0 = no limit)") 

388 

389 sso_adfs_enabled: bool = Field(default=False, description="Enable ADFS OIDC authentication") 

390 sso_adfs_client_id: Optional[str] = Field(default=None, description="ADFS OAuth client ID") 

391 sso_adfs_client_secret: Optional[SecretStr] = Field(default=None, description="ADFS OAuth client secret") 

392 sso_adfs_authorization_url: Optional[str] = Field(default=None, description="ADFS authorization endpoint URL (e.g., https://adfs.example.com/adfs/oauth2/authorize/)") 

393 sso_adfs_token_url: Optional[str] = Field(default=None, description="ADFS token endpoint URL (e.g., https://adfs.example.com/adfs/oauth2/token/)") 

394 sso_adfs_issuer: Optional[str] = Field(default=None, description="ADFS issuer URL (e.g., https://adfs.example.com/adfs)") 

395 sso_adfs_scope: Optional[str] = Field(default="openid profile email", description="ADFS OAuth scopes (space-separated)") 

396 sso_adfs_display_name: Optional[str] = Field(default="ADFS Login", description="Display name shown on login page for ADFS") 

397 

398 sso_generic_enabled: bool = Field(default=False, description="Enable generic OIDC provider (Keycloak, Auth0, etc.)") 

399 sso_generic_provider_id: Optional[str] = Field(default=None, description="Provider ID (e.g., 'keycloak', 'auth0', 'authentik')") 

400 sso_generic_display_name: Optional[str] = Field(default=None, description="Display name shown on login page") 

401 sso_generic_client_id: Optional[str] = Field(default=None, description="Generic OIDC client ID") 

402 sso_generic_client_secret: Optional[SecretStr] = Field(default=None, description="Generic OIDC client secret") 

403 sso_generic_authorization_url: Optional[str] = Field(default=None, description="Authorization endpoint URL") 

404 sso_generic_token_url: Optional[str] = Field(default=None, description="Token endpoint URL") 

405 sso_generic_userinfo_url: Optional[str] = Field(default=None, description="Userinfo endpoint URL") 

406 sso_generic_issuer: Optional[str] = Field(default=None, description="OIDC issuer URL") 

407 sso_generic_jwks_uri: Optional[str] = Field(default=None, description="OIDC JWKS endpoint URL for token signature verification") 

408 sso_generic_scope: Optional[str] = Field(default="openid profile email", description="OAuth scopes (space-separated)") 

409 

410 # SSO Settings 

411 sso_auto_create_users: bool = Field(default=True, description="Automatically create users from SSO providers") 

412 sso_trusted_domains: Annotated[list[str], NoDecode] = Field(default_factory=list, description="Trusted email domains (CSV or JSON list)") 

413 sso_preserve_admin_auth: bool = Field(default=True, description="Preserve local admin authentication when SSO is enabled") 

414 sso_auto_disable_unconfigured_providers: bool = Field( 

415 default=False, 

416 description=( 

417 "Automatically disable SSO providers not present in environment configuration during bootstrap. " 

418 "When enabled, providers configured in the database but missing from SSO_*_ENABLED environment variables " 

419 "will be disabled. This enforces environment config as the single source of truth. " 

420 "Default: false (preserves manually configured providers for backward compatibility)." 

421 ), 

422 ) 

423 

424 # SSO Admin Assignment Settings 

425 sso_auto_admin_domains: Annotated[list[str], NoDecode] = Field(default_factory=list, description="Admin domains (CSV or JSON list)") 

426 sso_github_admin_orgs: Annotated[list[str], NoDecode] = Field(default_factory=list, description="GitHub orgs granting admin (CSV/JSON)") 

427 sso_google_admin_domains: Annotated[list[str], NoDecode] = Field(default_factory=list, description="Google admin domains (CSV/JSON)") 

428 sso_require_admin_approval: bool = Field(default=False, description="Require admin approval for new SSO registrations") 

429 

430 # ADFS-specific Settings 

431 sso_adfs_default_email_domain: Optional[str] = Field( 

432 default=None, description="Default email domain for ADFS when UPN is plain username (e.g., 'company.com' converts 'user123' to 'user123@company.com')" 

433 ) 

434 

435 # MCP Client Authentication 

436 mcp_client_auth_enabled: bool = Field(default=True, description="Enable JWT authentication for MCP client operations") 

437 mcp_require_auth: Optional[bool] = Field( 

438 default=None, 

439 description=( 

440 "Require authentication for /mcp endpoints. " 

441 "When unset, inherits AUTH_REQUIRED. " 

442 "Set false explicitly to allow unauthenticated access to public items only; " 

443 "set true to require a valid Bearer token for all /mcp requests." 

444 ), 

445 ) 

446 trust_proxy_auth: bool = Field( 

447 default=False, 

448 description="Trust proxy authentication headers (required when mcp_client_auth_enabled=false)", 

449 ) 

450 trust_proxy_auth_dangerously: bool = Field( 

451 default=False, 

452 description="Acknowledge and allow trusted proxy headers when MCP_CLIENT_AUTH_ENABLED=false (dangerous; only for strictly trusted proxy deployments).", 

453 ) 

454 proxy_user_header: str = Field(default="X-Authenticated-User", description="Header containing authenticated username from proxy") 

455 

456 # Encryption key phrase for auth storage 

457 auth_encryption_secret: SecretStr = Field(default=SecretStr("my-test-salt")) 

458 

459 # Query Parameter Authentication (INSECURE - disabled by default) 

460 insecure_allow_queryparam_auth: bool = Field( 

461 default=False, 

462 description=("Enable query parameter authentication for gateway peers. " "WARNING: API keys may appear in proxy logs. See CWE-598."), 

463 ) 

464 insecure_queryparam_auth_allowed_hosts: List[str] = Field( 

465 default_factory=list, 

466 description=("Allowlist of hosts permitted to use query parameter auth. " "Empty list allows any host when feature is enabled. " "Format: ['mcp.tavily.com', 'api.example.com']"), 

467 ) 

468 

469 # =================================== 

470 # SSRF Protection Configuration 

471 # =================================== 

472 # Server-Side Request Forgery (SSRF) protection prevents the gateway from being 

473 # used to access internal resources or cloud metadata services. 

474 

475 ssrf_protection_enabled: bool = Field( 

476 default=True, 

477 description="Enable SSRF protection for gateway/tool URLs. Blocks access to dangerous endpoints.", 

478 ) 

479 

480 ssrf_blocked_networks: List[str] = Field( 

481 default=[ 

482 # Cloud metadata services (ALWAYS dangerous - credential exposure) 

483 "169.254.169.254/32", # AWS/GCP/Azure instance metadata 

484 "169.254.169.123/32", # AWS NTP service 

485 "fd00::1/128", # IPv6 cloud metadata 

486 # Link-local (often used for cloud metadata) 

487 "169.254.0.0/16", # Full link-local IPv4 range 

488 "fe80::/10", # IPv6 link-local 

489 ], 

490 description=( 

491 "CIDR ranges to block for SSRF protection. These are ALWAYS blocked regardless of other settings. " "Default blocks cloud metadata endpoints. Add private ranges for stricter security." 

492 ), 

493 ) 

494 

495 ssrf_blocked_hosts: List[str] = Field( 

496 default=[ 

497 "metadata.google.internal", # GCP metadata hostname 

498 "metadata.internal", # Generic cloud metadata 

499 ], 

500 description="Hostnames to block for SSRF protection. Matched case-insensitively.", 

501 ) 

502 

503 ssrf_allow_localhost: bool = Field( 

504 default=False, 

505 description=("Allow localhost/loopback addresses (127.0.0.0/8, ::1). " "Default false for safer production behavior."), 

506 ) 

507 

508 ssrf_allow_private_networks: bool = Field( 

509 default=False, 

510 description=( 

511 "Allow RFC 1918 private network addresses (10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16). " "When false, private destinations are blocked unless explicitly listed in SSRF_ALLOWED_NETWORKS." 

512 ), 

513 ) 

514 

515 ssrf_allowed_networks: List[str] = Field( 

516 default_factory=list, 

517 description=("Optional CIDR allowlist for internal/private destinations. " "Used when SSRF_ALLOW_PRIVATE_NETWORKS=false to allow specific internal ranges."), 

518 ) 

519 

520 ssrf_dns_fail_closed: bool = Field( 

521 default=True, 

522 description=( 

523 "Fail closed on DNS resolution errors. When true, URLs that cannot be resolved " 

524 "are rejected. When false, unresolvable hostnames are allowed through " 

525 "(hostname blocklist still applies)." 

526 ), 

527 ) 

528 

529 # OAuth Configuration 

530 oauth_request_timeout: int = Field(default=30, description="OAuth request timeout in seconds") 

531 oauth_max_retries: int = Field(default=3, description="Maximum retries for OAuth token requests") 

532 oauth_default_timeout: int = Field(default=3600, description="Default OAuth token timeout in seconds") 

533 

534 # =================================== 

535 # Dynamic Client Registration (DCR) - Client Mode 

536 # =================================== 

537 

538 # Enable DCR client functionality 

539 dcr_enabled: bool = Field(default=True, description="Enable Dynamic Client Registration (RFC 7591) - gateway acts as DCR client") 

540 

541 # Auto-register when missing credentials 

542 dcr_auto_register_on_missing_credentials: bool = Field(default=True, description="Automatically register with AS when gateway has issuer but no client_id") 

543 

544 # Default scopes for DCR 

545 dcr_default_scopes: List[str] = Field(default=["mcp:read"], description="Default MCP scopes to request during DCR") 

546 

547 # Issuer allowlist (empty = allow any) 

548 dcr_allowed_issuers: List[str] = Field(default_factory=list, description="Optional allowlist of issuer URLs for DCR (empty = allow any)") 

549 

550 # Token endpoint auth method 

551 dcr_token_endpoint_auth_method: str = Field(default="client_secret_basic", description="Token endpoint auth method for DCR (client_secret_basic or client_secret_post)") 

552 

553 # Metadata cache TTL 

554 dcr_metadata_cache_ttl: int = Field(default=3600, description="AS metadata cache TTL in seconds (RFC 8414 discovery)") 

555 

556 # Client name template 

557 dcr_client_name_template: str = Field(default="ContextForge ({gateway_name})", description="Template for client_name in DCR requests") 

558 

559 # Refresh token behavior 

560 dcr_request_refresh_token_when_unsupported: bool = Field( 

561 default=False, 

562 description="Request refresh_token even when AS metadata omits grant_types_supported. Enable for AS servers that support refresh tokens but don't advertise it.", 

563 ) 

564 

565 # =================================== 

566 # OAuth Discovery (RFC 8414) 

567 # =================================== 

568 

569 oauth_discovery_enabled: bool = Field(default=True, description="Enable OAuth AS metadata discovery (RFC 8414)") 

570 

571 oauth_preferred_code_challenge_method: str = Field(default="S256", description="Preferred PKCE code challenge method (S256 or plain)") 

572 

573 # Email-Based Authentication 

574 email_auth_enabled: bool = Field(default=True, description="Enable email-based authentication") 

575 public_registration_enabled: bool = Field( 

576 default=False, 

577 description="Allow unauthenticated users to self-register accounts. When false, only admins can create users via /admin/users endpoint.", 

578 ) 

579 allow_public_visibility: bool = Field( 

580 default=True, 

581 description="When false, creating or updating any entity with public visibility is blocked in team scope.", 

582 ) 

583 protect_all_admins: bool = Field( 

584 default=True, 

585 description="When true (default), prevent any admin from being demoted, deactivated, or locked out via API/UI. When false, only the last active admin is protected.", 

586 ) 

587 platform_admin_email: str = Field(default="admin@example.com", description="Platform administrator email address") 

588 platform_admin_password: SecretStr = Field(default=SecretStr("changeme"), description="Platform administrator password") 

589 default_user_password: SecretStr = Field(default=SecretStr("changeme"), description="Default password for new users") # nosec B105 

590 platform_admin_full_name: str = Field(default="Platform Administrator", description="Platform administrator full name") 

591 

592 # Argon2id Password Hashing Configuration 

593 argon2id_time_cost: int = Field(default=3, description="Argon2id time cost (number of iterations)") 

594 argon2id_memory_cost: int = Field(default=65536, description="Argon2id memory cost in KiB") 

595 argon2id_parallelism: int = Field(default=1, description="Argon2id parallelism (number of threads)") 

596 

597 # Password Policy Configuration 

598 password_min_length: int = Field(default=8, description="Minimum password length") 

599 password_require_uppercase: bool = Field(default=True, description="Require uppercase letters in passwords") 

600 password_require_lowercase: bool = Field(default=True, description="Require lowercase letters in passwords") 

601 password_require_numbers: bool = Field(default=False, description="Require numbers in passwords") 

602 password_require_special: bool = Field(default=True, description="Require special characters in passwords") 

603 

604 # Password change enforcement and policy toggles 

605 password_change_enforcement_enabled: bool = Field(default=True, description="Master switch for password change enforcement checks") 

606 admin_require_password_change_on_bootstrap: bool = Field(default=True, description="Force admin to change password after bootstrap") 

607 detect_default_password_on_login: bool = Field(default=True, description="Detect default password during login and mark user for change") 

608 require_password_change_for_default_password: bool = Field(default=True, description="Require password change when user is created with the default password") 

609 password_policy_enabled: bool = Field(default=True, description="Enable password complexity validation for new/changed passwords") 

610 password_prevent_reuse: bool = Field(default=True, description="Prevent reusing the current password when changing") 

611 password_max_age_days: int = Field(default=90, description="Password maximum age in days before expiry forces a change") 

612 # Account Security Configuration 

613 max_failed_login_attempts: int = Field(default=10, description="Maximum failed login attempts before account lockout") 

614 account_lockout_duration_minutes: int = Field(default=1, description="Account lockout duration in minutes") 

615 account_lockout_notification_enabled: bool = Field(default=True, description="Send lockout notification emails when accounts are locked") 

616 failed_login_min_response_ms: int = Field(default=250, description="Minimum response duration for failed login attempts to reduce timing side channels") 

617 

618 # Self-service password reset 

619 password_reset_enabled: bool = Field(default=True, description="Enable self-service password reset workflow (set false to disable public forgot/reset endpoints)") 

620 password_reset_token_expiry_minutes: int = Field(default=60, description="Password reset token expiration time in minutes") 

621 password_reset_rate_limit: int = Field(default=5, description="Maximum password reset requests allowed per email in each rate-limit window") 

622 password_reset_rate_window_minutes: int = Field(default=15, description="Password reset request rate-limit window in minutes") 

623 password_reset_invalidate_sessions: bool = Field(default=True, description="Invalidate active sessions after password reset") 

624 password_reset_min_response_ms: int = Field(default=250, description="Minimum response duration for forgot-password requests to reduce timing side channels") 

625 

626 # Email delivery for auth notifications 

627 smtp_enabled: bool = Field( 

628 default=False, 

629 description="Enable SMTP email delivery for password reset and account lockout notifications (when false, reset requests are accepted but no email is sent)", 

630 ) 

631 smtp_host: Optional[str] = Field(default=None, description="SMTP server host") 

632 smtp_port: int = Field(default=587, description="SMTP server port") 

633 smtp_user: Optional[str] = Field(default=None, description="SMTP username") 

634 smtp_password: Optional[SecretStr] = Field(default=None, description="SMTP password") 

635 smtp_from_email: Optional[str] = Field(default=None, description="From email address used for auth notifications") 

636 smtp_from_name: str = Field(default="ContextForge", description="From display name used for auth notifications") 

637 smtp_use_tls: bool = Field(default=True, description="Use STARTTLS for SMTP connections") 

638 smtp_use_ssl: bool = Field(default=False, description="Use implicit SSL/TLS for SMTP connections") 

639 smtp_timeout_seconds: int = Field(default=15, description="SMTP connection timeout in seconds") 

640 

641 # Personal Teams Configuration 

642 auto_create_personal_teams: bool = Field(default=True, description="Enable automatic personal team creation for new users") 

643 personal_team_prefix: str = Field(default="", description="Personal team naming prefix") 

644 max_teams_per_user: int = Field(default=50, description="Maximum number of teams a user can belong to") 

645 max_members_per_team: int = Field(default=100, description="Maximum number of members per team") 

646 invitation_expiry_days: int = Field(default=7, description="Number of days before team invitations expire") 

647 require_email_verification_for_invites: bool = Field(default=True, description="Require email verification for team invitations") 

648 

649 # Team Governance 

650 allow_team_creation: bool = Field(default=True, description="Allow users to create organizational teams. Admins can always create teams.") 

651 allow_team_join_requests: bool = Field(default=True, description="Allow users to request to join public teams") 

652 allow_team_invitations: bool = Field(default=True, description="Allow team owners to send invitations") 

653 

654 # Default Role Configuration 

655 default_admin_role: str = Field(default="platform_admin", description="Global role assigned to admin users") 

656 default_user_role: str = Field(default="platform_viewer", description="Global role assigned to non-admin users") 

657 default_team_owner_role: str = Field(default="team_admin", description="Team-scoped role assigned to team owners (e.g. personal team creator)") 

658 default_team_member_role: str = Field(default="viewer", description="Team-scoped role assigned to team members") 

659 

660 # UI/Admin Feature Flags 

661 mcpgateway_ui_enabled: bool = False 

662 mcpgateway_admin_api_enabled: bool = False 

663 mcpgateway_ui_airgapped: bool = Field(default=False, description="Use local CDN assets instead of external CDNs for airgapped deployments") 

664 mcpgateway_ui_embedded: bool = Field(default=False, description="Enable embedded UI mode (hides select header controls by default)") 

665 mcpgateway_ui_hide_sections: Annotated[list[str], NoDecode] = Field( 

666 default_factory=list, 

667 description=( 

668 "CSV/JSON list of UI sections to hide. " 

669 "Valid values: overview, servers, gateways, tools, prompts, resources, roots, mcp-registry, " 

670 "metrics, plugins, export-import, logs, version-info, maintenance, teams, users, agents, tokens, settings" 

671 ), 

672 ) 

673 mcpgateway_ui_hide_header_items: Annotated[list[str], NoDecode] = Field( 

674 default_factory=list, 

675 description="CSV/JSON list of header items to hide. Valid values: logout, team_selector, user_identity, theme_toggle", 

676 ) 

677 mcpgateway_ui_hide_sections_admin: Annotated[list[str], NoDecode] = Field( 

678 default_factory=list, 

679 description=("CSV/JSON list of UI sections to hide for admin users. " "Same valid values as MCPGATEWAY_UI_HIDE_SECTIONS. " "When unset, admins see all sections."), 

680 ) 

681 mcpgateway_ui_hide_header_items_admin: Annotated[list[str], NoDecode] = Field( 

682 default_factory=list, 

683 description="CSV/JSON list of header items to hide for admin users. Same valid values as MCPGATEWAY_UI_HIDE_HEADER_ITEMS.", 

684 ) 

685 mcpgateway_bulk_import_enabled: bool = True 

686 mcpgateway_bulk_import_max_tools: int = 200 

687 mcpgateway_bulk_import_rate_limit: int = 10 

688 

689 # UI Tool Test Configuration 

690 mcpgateway_ui_tool_test_timeout: int = Field(default=60000, description="Tool test timeout in milliseconds for the admin UI") 

691 

692 # Tool Execution Cancellation 

693 mcpgateway_tool_cancellation_enabled: bool = Field(default=True, description="Enable gateway-authoritative tool execution cancellation with REST API endpoints") 

694 

695 # A2A (Agent-to-Agent) Feature Flags 

696 mcpgateway_a2a_enabled: bool = True 

697 mcpgateway_a2a_max_agents: int = 100 

698 mcpgateway_a2a_default_timeout: int = 30 

699 mcpgateway_a2a_max_retries: int = 3 

700 mcpgateway_a2a_metrics_enabled: bool = True 

701 

702 # gRPC Support Configuration (EXPERIMENTAL - disabled by default) 

703 mcpgateway_grpc_enabled: bool = Field(default=False, description="Enable gRPC to MCP translation support (experimental feature)") 

704 mcpgateway_grpc_reflection_enabled: bool = Field(default=True, description="Enable gRPC server reflection by default") 

705 mcpgateway_grpc_max_message_size: int = Field(default=4194304, description="Maximum gRPC message size in bytes (4MB)") 

706 mcpgateway_grpc_timeout: int = Field(default=30, description="Default gRPC call timeout in seconds") 

707 mcpgateway_grpc_tls_enabled: bool = Field(default=False, description="Enable TLS for gRPC connections by default") 

708 

709 # Direct Proxy Configuration (disabled by default) 

710 mcpgateway_direct_proxy_enabled: bool = Field(default=False, description="Enable direct_proxy gateway mode for pass-through MCP operations") 

711 mcpgateway_direct_proxy_timeout: int = Field(default=30, description="Default timeout in seconds for direct proxy MCP operations") 

712 

713 # =================================== 

714 # Performance Monitoring Configuration 

715 # =================================== 

716 mcpgateway_performance_tracking: bool = Field(default=False, description="Enable performance tracking tab in admin UI") 

717 mcpgateway_performance_collection_interval: int = Field(default=10, ge=1, le=300, description="Metric collection interval in seconds") 

718 mcpgateway_performance_retention_hours: int = Field(default=24, ge=1, le=168, description="Snapshot retention period in hours") 

719 mcpgateway_performance_retention_days: int = Field(default=90, ge=1, le=365, description="Aggregate retention period in days") 

720 mcpgateway_performance_max_snapshots: int = Field(default=10000, ge=100, le=1000000, description="Maximum performance snapshots to retain") 

721 mcpgateway_performance_distributed: bool = Field(default=False, description="Enable distributed mode metrics aggregation via Redis") 

722 mcpgateway_performance_net_connections_enabled: bool = Field(default=True, description="Enable network connections counting (can be CPU intensive)") 

723 mcpgateway_performance_net_connections_cache_ttl: int = Field(default=15, ge=1, le=300, description="Cache TTL for net_connections in seconds") 

724 

725 # MCP Server Catalog Configuration 

726 mcpgateway_catalog_enabled: bool = Field(default=True, description="Enable MCP server catalog feature") 

727 mcpgateway_catalog_file: str = Field(default="mcp-catalog.yml", description="Path to catalog configuration file") 

728 mcpgateway_catalog_auto_health_check: bool = Field(default=True, description="Automatically health check catalog servers") 

729 mcpgateway_catalog_cache_ttl: int = Field(default=3600, description="Catalog cache TTL in seconds") 

730 mcpgateway_catalog_page_size: int = Field(default=100, description="Number of catalog servers per page") 

731 

732 # ContextForge Bootstrap Roles In DB Configuration 

733 mcpgateway_bootstrap_roles_in_db_enabled: bool = Field(default=False, description="Enable ContextForge add additional roles in db") 

734 mcpgateway_bootstrap_roles_in_db_file: str = Field(default="additional_roles_in_db.json", description="Path to add additional roles in db") 

735 

736 # Elicitation support (MCP 2025-06-18) 

737 mcpgateway_elicitation_enabled: bool = Field(default=True, description="Enable elicitation passthrough support (MCP 2025-06-18)") 

738 mcpgateway_elicitation_timeout: int = Field(default=60, description="Default timeout for elicitation requests in seconds") 

739 mcpgateway_elicitation_max_concurrent: int = Field(default=100, description="Maximum concurrent elicitation requests") 

740 

741 # Security 

742 skip_ssl_verify: bool = Field( 

743 default=False, 

744 description="Skip SSL certificate verification for ALL outbound HTTPS requests " 

745 "(federation, MCP servers, LLM providers, A2A agents). " 

746 "WARNING: Only enable in dev environments with self-signed certificates.", 

747 ) 

748 cors_enabled: bool = True 

749 

750 # Environment 

751 environment: Literal["development", "staging", "production"] = Field(default="development") 

752 

753 # Domain configuration 

754 app_domain: HttpUrl = Field(default=HttpUrl("http://localhost:4444")) 

755 

756 # Security settings 

757 secure_cookies: bool = Field(default=True) 

758 cookie_samesite: str = Field(default="lax") 

759 

760 # CORS settings 

761 cors_allow_credentials: bool = Field(default=True) 

762 

763 # Security Headers Configuration 

764 security_headers_enabled: bool = Field(default=True) 

765 x_frame_options: Optional[str] = Field(default="DENY") 

766 

767 @field_validator("x_frame_options") 

768 @classmethod 

769 def normalize_x_frame_options(cls, v: Optional[str]) -> Optional[str]: 

770 """Convert string 'null', 'none', or empty/whitespace-only string to Python None to disable iframe restrictions. 

771 

772 Args: 

773 v: The X-Frame-Options value to normalize. 

774 

775 Returns: 

776 None if v is None, an empty/whitespace-only string, or case-insensitive 'null'/'none'; 

777 otherwise returns the stripped string value. 

778 """ 

779 if v is None: 

780 return None 

781 val = v.strip() 

782 if val == "" or val.lower() in ("null", "none"): 

783 return None 

784 return val 

785 

786 x_content_type_options_enabled: bool = Field(default=True) 

787 x_xss_protection_enabled: bool = Field(default=True) 

788 x_download_options_enabled: bool = Field(default=True) 

789 hsts_enabled: bool = Field(default=True) 

790 hsts_max_age: int = Field(default=31536000) # 1 year 

791 hsts_include_subdomains: bool = Field(default=True) 

792 remove_server_headers: bool = Field(default=True) 

793 

794 # Response Compression Configuration 

795 compression_enabled: bool = Field(default=True, description="Enable response compression (Brotli, Zstd, GZip)") 

796 compression_minimum_size: int = Field(default=500, ge=0, description="Minimum response size in bytes to compress (0 = compress all)") 

797 compression_gzip_level: int = Field(default=6, ge=1, le=9, description="GZip compression level (1=fastest, 9=best compression)") 

798 compression_brotli_quality: int = Field(default=4, ge=0, le=11, description="Brotli compression quality (0-3=fast, 4-9=balanced, 10-11=max)") 

799 compression_zstd_level: int = Field(default=3, ge=1, le=22, description="Zstd compression level (1-3=fast, 4-9=balanced, 10+=slow)") 

800 

801 # For allowed_origins, strip '' to ensure we're passing on valid JSON via env 

802 # Tell pydantic *not* to touch this env var - our validator will. 

803 allowed_origins: Annotated[Set[str], NoDecode] = { 

804 "http://localhost", 

805 "http://localhost:4444", 

806 } 

807 

808 # Security validation thresholds 

809 min_secret_length: int = 32 

810 min_password_length: int = 12 

811 require_strong_secrets: bool = False # Default to False for backward compatibility, will be enforced in 1.0.0 

812 

813 llmchat_enabled: bool = Field(default=True, description="Enable LLM Chat feature") 

814 mcpgateway_stdio_transport_enabled: bool = Field( 

815 default=False, 

816 description=("Enable stdio transport for MCP chat client configuration. Disabled by default; " "set true only in trusted environments that intentionally need stdio process execution."), 

817 ) 

818 toolops_enabled: bool = Field(default=False, description="Enable ToolOps feature") 

819 plugins_can_override_rbac: bool = Field( 

820 default=False, 

821 description=("Allow HTTP_AUTH_CHECK_PERMISSION plugins to short-circuit built-in RBAC grants. " "Disabled by default so plugin grant decisions are audit-only unless explicitly enabled."), 

822 ) 

823 plugins_can_override_auth_headers: bool = Field( 

824 default=False, 

825 description=( 

826 "DANGEROUS: Allow pre-request plugin hooks to override auth-sensitive headers " 

827 "(authorization, cookie, x-api-key, proxy-authorization) that the client already sent. " 

828 "Disabled by default because a malicious or misconfigured plugin could impersonate any " 

829 "user by rewriting the Authorization header. Only enable when all loaded plugins are " 

830 "fully trusted and the deployment requires token exchange (e.g. WXO auth). " 

831 "Requires a server restart to take effect." 

832 ), 

833 ) 

834 

835 # database-backed polling settings for session message delivery 

836 poll_interval: float = Field(default=1.0, description="Initial polling interval in seconds for checking new session messages") 

837 max_interval: float = Field(default=5.0, description="Maximum polling interval in seconds when the session is idle") 

838 backoff_factor: float = Field(default=1.5, description="Multiplier used to gradually increase the polling interval during inactivity") 

839 

840 # redis configurations for Maintaining Chat Sessions in multi-worker environment 

841 llmchat_session_ttl: int = Field(default=300, description="Seconds for active_session key TTL") 

842 llmchat_session_lock_ttl: int = Field(default=30, description="Seconds for lock expiry") 

843 llmchat_session_lock_retries: int = Field(default=10, description="How many times to poll while waiting") 

844 llmchat_session_lock_wait: float = Field(default=0.2, description="Seconds between polls") 

845 llmchat_chat_history_ttl: int = Field(default=3600, description="Seconds for chat history expiry") 

846 llmchat_chat_history_max_messages: int = Field(default=50, description="Maximum message history to store per user") 

847 

848 # LLM Settings (Internal API for LLM Chat) 

849 llm_api_prefix: str = Field(default="/v1", description="API prefix for internal LLM endpoints") 

850 llm_request_timeout: int = Field(default=120, description="Request timeout in seconds for LLM API calls") 

851 llm_streaming_enabled: bool = Field(default=True, description="Enable streaming responses for LLM Chat") 

852 llm_health_check_interval: int = Field(default=300, description="Provider health check interval in seconds") 

853 

854 @field_validator("allowed_roots", mode="before") 

855 @classmethod 

856 def parse_allowed_roots(cls, v): 

857 """Parse allowed roots from environment variable or config value. 

858 

859 Args: 

860 v: The input value to parse 

861 

862 Returns: 

863 list: Parsed list of allowed root paths 

864 """ 

865 if isinstance(v, str): 

866 # Support both JSON array and comma-separated values 

867 v = v.strip() 

868 if not v: 

869 return [] 

870 # Try JSON first 

871 try: 

872 loaded = orjson.loads(v) 

873 if isinstance(loaded, list): 

874 return loaded 

875 except orjson.JSONDecodeError: 

876 # Not a valid JSON array → fallback to comma-separated parsing 

877 pass 

878 # Fallback to comma-split 

879 return [x.strip() for x in v.split(",") if x.strip()] 

880 return v 

881 

882 @field_validator("jwt_secret_key", "auth_encryption_secret") 

883 @classmethod 

884 def validate_secrets(cls, v: Any, info: ValidationInfo) -> SecretStr: 

885 """ 

886 Validate that secret keys meet basic security requirements. 

887 

888 This validator is applied to the `jwt_secret_key` and `auth_encryption_secret` fields. 

889 It performs the following checks: 

890 

891 1. Detects default or weak secrets (e.g., "changeme", "secret", "password"). 

892 Logs a warning if detected. 

893 

894 2. Checks minimum length (at least 32 characters). Logs a warning if shorter. 

895 

896 3. Performs a basic entropy check (at least 10 unique characters). Logs a warning if low. 

897 

898 Notes: 

899 - Logging is used for warnings; the function does not raise exceptions. 

900 - The original value is returned as a `SecretStr` for safe handling. 

901 

902 Args: 

903 v: The secret value to validate. 

904 info: Pydantic validation info object, used to get the field name. 

905 

906 Returns: 

907 SecretStr: The validated secret value, wrapped as a SecretStr if it wasn't already. 

908 """ 

909 

910 field_name = info.field_name 

911 

912 # Extract actual string value safely 

913 if isinstance(v, SecretStr): 

914 value = v.get_secret_value() 

915 else: 

916 value = str(v) 

917 

918 # Check for default/weak secrets 

919 if not info.data.get("client_mode"): 

920 weak_secrets = ["my-test-key", "my-test-key-but-now-longer-than-32-bytes", "my-test-salt", "changeme", "secret", "password"] 

921 if value.lower() in weak_secrets: 

922 logger.warning(f"🔓 SECURITY WARNING - {field_name}: Default/weak secret detected! Please set a strong, unique value for production.") 

923 

924 # Check minimum length 

925 if len(value) < 32: 

926 logger.warning(f"⚠️ SECURITY WARNING - {field_name}: Secret should be at least 32 characters long. Current length: {len(value)}") 

927 

928 # Basic entropy check (at least 10 unique characters) 

929 if len(set(value)) < 10: 

930 logger.warning(f"🔑 SECURITY WARNING - {field_name}: Secret has low entropy. Consider using a more random value.") 

931 

932 # Always return SecretStr to keep it secret-safe 

933 return v if isinstance(v, SecretStr) else SecretStr(value) 

934 

935 @field_validator("basic_auth_password") 

936 @classmethod 

937 def validate_admin_password(cls, v: str | SecretStr, info: ValidationInfo) -> SecretStr: 

938 """Validate admin password meets security requirements. 

939 

940 Args: 

941 v: The admin password value to validate. 

942 info: ValidationInfo containing field data. 

943 

944 Returns: 

945 SecretStr: The validated admin password value, wrapped as SecretStr. 

946 """ 

947 # Extract actual string value safely 

948 if isinstance(v, SecretStr): 

949 value = v.get_secret_value() 

950 else: 

951 value = v 

952 

953 if not info.data.get("client_mode"): 

954 if value == "changeme": # nosec B105 - checking for default value 

955 logger.warning("🔓 SECURITY WARNING: Default BASIC_AUTH_PASSWORD detected! Please change it if you enable API_ALLOW_BASIC_AUTH.") 

956 

957 # Note: We can't access password_min_length here as it's not set yet during validation 

958 # Using default value of 8 to match the field default 

959 min_length = 8 # This matches the default in password_min_length field 

960 if len(value) < min_length: 

961 logger.warning(f"⚠️ SECURITY WARNING: Admin password should be at least {min_length} characters long. Current length: {len(value)}") 

962 

963 # Check password complexity 

964 has_upper = any(c.isupper() for c in value) 

965 has_lower = any(c.islower() for c in value) 

966 has_digit = any(c.isdigit() for c in value) 

967 has_special = bool(re.search(r'[!@#$%^&*(),.?":{}|<>]', value)) 

968 

969 complexity_score = sum([has_upper, has_lower, has_digit, has_special]) 

970 if complexity_score < 3: 

971 logger.warning("🔐 SECURITY WARNING: Admin password has low complexity. Should contain at least 3 of: uppercase, lowercase, digits, special characters") 

972 

973 # Always return SecretStr to keep it secret-safe 

974 return v if isinstance(v, SecretStr) else SecretStr(value) 

975 

976 @field_validator("allowed_origins") 

977 @classmethod 

978 def validate_cors_origins(cls, v: Any, info: ValidationInfo) -> set[str] | None: 

979 """Validate CORS allowed origins. 

980 

981 Args: 

982 v: The set of allowed origins to validate. 

983 info: ValidationInfo containing field data. 

984 

985 Returns: 

986 set: The validated set of allowed origins. 

987 

988 Raises: 

989 ValueError: If allowed_origins is not a set or list of strings. 

990 """ 

991 if v is None: 

992 return v 

993 if not isinstance(v, (set, list)): 

994 raise ValueError("allowed_origins must be a set or list of strings") 

995 

996 dangerous_origins = ["*", "null", ""] 

997 if not info.data.get("client_mode"): 

998 for origin in v: 

999 if origin in dangerous_origins: 

1000 logger.warning(f"🌐 SECURITY WARNING: Dangerous CORS origin '{origin}' detected. Consider specifying explicit origins instead of wildcards.") 

1001 

1002 # Validate URL format 

1003 if not origin.startswith(("http://", "https://")) and origin not in dangerous_origins: 

1004 logger.warning(f"⚠️ SECURITY WARNING: Invalid origin format '{origin}'. Origins should start with http:// or https://") 

1005 

1006 return set({str(origin) for origin in v}) 

1007 

1008 @field_validator("database_url") 

1009 @classmethod 

1010 def validate_database_url(cls, v: str, info: ValidationInfo) -> str: 

1011 """Validate database connection string security. 

1012 

1013 Args: 

1014 v: The database URL to validate. 

1015 info: ValidationInfo containing field data. 

1016 

1017 Returns: 

1018 str: The validated database URL. 

1019 """ 

1020 # Check for hardcoded passwords in non-SQLite databases 

1021 if not info.data.get("client_mode"): 

1022 if not v.startswith("sqlite"): 

1023 if "password" in v and any(weak in v for weak in ["password", "123", "admin", "test"]): 

1024 logger.warning("Potentially weak database password detected. Consider using a stronger password.") 

1025 

1026 # Warn about SQLite in production 

1027 if v.startswith("sqlite"): 

1028 logger.info("Using SQLite database. Consider PostgreSQL for production.") 

1029 

1030 return v 

1031 

1032 @model_validator(mode="after") 

1033 def validate_security_combinations(self) -> Self: 

1034 """Validate security setting combinations. Only logs warnings; no changes are made. 

1035 

1036 Returns: 

1037 Itself. 

1038 """ 

1039 if not self.client_mode: 

1040 # Check for dangerous combinations - only log warnings, don't raise errors 

1041 if not self.auth_required and self.mcpgateway_ui_enabled: 

1042 logger.warning("🔓 SECURITY WARNING: Admin UI is enabled without authentication. Consider setting AUTH_REQUIRED=true for production.") 

1043 

1044 if self.skip_ssl_verify and not self.dev_mode: 

1045 logger.warning("🔓 SECURITY WARNING: SSL verification is disabled in non-dev mode. This is a security risk! Set SKIP_SSL_VERIFY=false for production.") 

1046 

1047 if self.debug and not self.dev_mode: 

1048 logger.warning("🐛 SECURITY WARNING: Debug mode is enabled in non-dev mode. This may leak sensitive information! Set DEBUG=false for production.") 

1049 

1050 return self 

1051 

1052 def get_security_warnings(self) -> List[str]: 

1053 """Get list of security warnings for current configuration. 

1054 

1055 Returns: 

1056 List[str]: List of security warning messages. 

1057 """ 

1058 warnings = [] 

1059 

1060 # Authentication warnings 

1061 if not self.auth_required: 

1062 warnings.append("🔓 Authentication is disabled - ensure this is intentional") 

1063 

1064 if self.basic_auth_user == "admin": 

1065 warnings.append("⚠️ Using default admin username - consider changing it") 

1066 

1067 # SSL/TLS warnings 

1068 if self.skip_ssl_verify: 

1069 warnings.append("🔓 SSL verification is disabled - not recommended for production") 

1070 

1071 # Debug/Dev warnings 

1072 if self.debug and not self.dev_mode: 

1073 warnings.append("🐛 Debug mode enabled - disable in production to prevent info leakage") 

1074 

1075 if self.dev_mode: 

1076 warnings.append("🔧 Development mode enabled - not for production use") 

1077 

1078 # CORS warnings 

1079 if self.cors_enabled and "*" in self.allowed_origins: 

1080 warnings.append("🌐 CORS allows all origins (*) - this is a security risk") 

1081 

1082 # Token warnings 

1083 if self.token_expiry > 10080: # More than 7 days 

1084 warnings.append("⏱️ JWT token expiry is very long - consider shorter duration") 

1085 

1086 # Database warnings 

1087 if self.database_url.startswith("sqlite") and not self.dev_mode: 

1088 warnings.append("💾 SQLite database in use - consider PostgreSQL for production") 

1089 

1090 # Rate limiting warnings 

1091 if self.tool_rate_limit > 1000: 

1092 warnings.append("🚦 Tool rate limit is very high - may allow abuse") 

1093 

1094 return warnings 

1095 

1096 class SecurityStatus(TypedDict): 

1097 """TypedDict for comprehensive security status.""" 

1098 

1099 secure_secrets: bool 

1100 auth_enabled: bool 

1101 ssl_verification: bool 

1102 debug_disabled: bool 

1103 cors_restricted: bool 

1104 ui_protected: bool 

1105 warnings: List[str] 

1106 security_score: int 

1107 

1108 def get_security_status(self) -> SecurityStatus: 

1109 """Get comprehensive security status. 

1110 

1111 Returns: 

1112 SecurityStatus: Dictionary containing security status information including score and warnings. 

1113 """ 

1114 

1115 # Compute a security score: 100 minus 10 for each warning 

1116 security_score = max(0, 100 - 10 * len(self.get_security_warnings())) 

1117 

1118 return { 

1119 "secure_secrets": (self.jwt_secret_key.get_secret_value() if isinstance(self.jwt_secret_key, SecretStr) else self.jwt_secret_key) 

1120 not in ("my-test-key", "my-test-key-but-now-longer-than-32-bytes"), # nosec B105 - checking for default values 

1121 "auth_enabled": self.auth_required, 

1122 "ssl_verification": not self.skip_ssl_verify, 

1123 "debug_disabled": not self.debug, 

1124 "cors_restricted": "*" not in self.allowed_origins if self.cors_enabled else True, 

1125 "ui_protected": not self.mcpgateway_ui_enabled or self.auth_required, 

1126 "warnings": self.get_security_warnings(), 

1127 "security_score": security_score, 

1128 } 

1129 

1130 # Max retries for HTTP requests 

1131 retry_max_attempts: int = 3 

1132 retry_base_delay: float = 1.0 # seconds 

1133 retry_max_delay: int = 60 # seconds 

1134 retry_jitter_max: float = 0.5 # fraction of base delay 

1135 

1136 # HTTPX Client Configuration (for shared singleton client) 

1137 # See: https://www.python-httpx.org/advanced/#pool-limits 

1138 # Formula: max_connections = expected_concurrent_outbound_requests × 1.5 

1139 httpx_max_connections: int = Field( 

1140 default=200, 

1141 ge=10, 

1142 le=1000, 

1143 description="Maximum total concurrent HTTP connections (global, not per-host). " "Increase for high-traffic deployments with many outbound calls.", 

1144 ) 

1145 httpx_max_keepalive_connections: int = Field( 

1146 default=100, 

1147 ge=1, 

1148 le=500, 

1149 description="Maximum idle keepalive connections to retain (typically 50% of max_connections)", 

1150 ) 

1151 httpx_keepalive_expiry: float = Field( 

1152 default=30.0, 

1153 ge=5.0, 

1154 le=300.0, 

1155 description="Seconds before idle keepalive connections are closed", 

1156 ) 

1157 httpx_connect_timeout: float = Field( 

1158 default=5.0, 

1159 ge=1.0, 

1160 le=60.0, 

1161 description="Timeout in seconds for establishing new connections (5s for LAN, increase for WAN)", 

1162 ) 

1163 httpx_read_timeout: float = Field( 

1164 default=120.0, 

1165 ge=1.0, 

1166 le=600.0, 

1167 description="Timeout in seconds for reading response data (set high for slow MCP tool calls)", 

1168 ) 

1169 httpx_write_timeout: float = Field( 

1170 default=30.0, 

1171 ge=1.0, 

1172 le=600.0, 

1173 description="Timeout in seconds for writing request data", 

1174 ) 

1175 httpx_pool_timeout: float = Field( 

1176 default=10.0, 

1177 ge=1.0, 

1178 le=120.0, 

1179 description="Timeout in seconds waiting for a connection from the pool (fail fast on exhaustion)", 

1180 ) 

1181 httpx_http2_enabled: bool = Field( 

1182 default=False, 

1183 description="Enable HTTP/2 (requires h2 package; enable only if upstreams support HTTP/2)", 

1184 ) 

1185 httpx_admin_read_timeout: float = Field( 

1186 default=30.0, 

1187 ge=1.0, 

1188 le=120.0, 

1189 description="Read timeout for admin UI operations (model fetching, health checks). " "Shorter than httpx_read_timeout to fail fast on admin pages.", 

1190 ) 

1191 

1192 @field_validator("allowed_origins", mode="before") 

1193 @classmethod 

1194 def _parse_allowed_origins(cls, v: Any) -> Set[str]: 

1195 """Parse allowed origins from environment variable or config value. 

1196 

1197 Handles multiple input formats for the allowed_origins field: 

1198 - JSON array string: '["http://localhost", "http://example.com"]' 

1199 - Comma-separated string: "http://localhost, http://example.com" 

1200 - Already parsed set/list 

1201 

1202 Automatically strips whitespace and removes outer quotes if present. 

1203 

1204 Args: 

1205 v: The input value to parse. Can be a string (JSON or CSV), set, list, or other iterable. 

1206 

1207 Returns: 

1208 Set[str]: A set of allowed origin strings. 

1209 

1210 Examples: 

1211 >>> sorted(Settings._parse_allowed_origins('["https://a.com", "https://b.com"]')) 

1212 ['https://a.com', 'https://b.com'] 

1213 >>> sorted(Settings._parse_allowed_origins("https://x.com , https://y.com")) 

1214 ['https://x.com', 'https://y.com'] 

1215 >>> Settings._parse_allowed_origins('""') 

1216 set() 

1217 >>> Settings._parse_allowed_origins('"https://single.com"') 

1218 {'https://single.com'} 

1219 >>> sorted(Settings._parse_allowed_origins(['http://a.com', 'http://b.com'])) 

1220 ['http://a.com', 'http://b.com'] 

1221 >>> Settings._parse_allowed_origins({'http://existing.com'}) 

1222 {'http://existing.com'} 

1223 """ 

1224 if isinstance(v, str): 

1225 v = v.strip() 

1226 if v[:1] in "\"'" and v[-1:] == v[:1]: # strip 1 outer quote pair 

1227 v = v[1:-1] 

1228 try: 

1229 parsed = set(orjson.loads(v)) 

1230 except orjson.JSONDecodeError: 

1231 parsed = {s.strip() for s in v.split(",") if s.strip()} 

1232 return parsed 

1233 return set(v) 

1234 

1235 # Logging 

1236 log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = Field(default="ERROR") 

1237 log_requests: bool = Field(default=False, description="Enable request payload logging with sensitive data masking") 

1238 log_format: Literal["json", "text"] = "json" # json or text 

1239 log_to_file: bool = False # Enable file logging (default: stdout/stderr only) 

1240 log_filemode: str = "a+" # append or overwrite 

1241 log_file: Optional[str] = None # Only used if log_to_file=True 

1242 log_folder: Optional[str] = None # Only used if log_to_file=True 

1243 

1244 # Log Rotation (optional - only used if log_to_file=True) 

1245 log_rotation_enabled: bool = False # Enable log file rotation 

1246 log_max_size_mb: int = 1 # Max file size in MB before rotation (default: 1MB) 

1247 log_backup_count: int = 5 # Number of backup files to keep (default: 5) 

1248 

1249 # Detailed Request Logging Configuration 

1250 log_detailed_max_body_size: int = Field( 

1251 default=16384, # 16KB - sensible default for request body logging 

1252 ge=1024, 

1253 le=1048576, # Max 1MB 

1254 description="Maximum request body size to log in detailed mode (bytes). Separate from log_max_size_mb which is for file rotation.", 

1255 ) 

1256 

1257 # Optional: endpoints to skip for detailed request logging (prefix match) 

1258 log_detailed_skip_endpoints: List[str] = Field( 

1259 default_factory=list, 

1260 description="List of path prefixes to skip when log_detailed_requests is enabled", 

1261 ) 

1262 

1263 # Whether to attempt resolving user identity via DB fallback when logging. 

1264 # Keep default False to avoid implicit DB queries during normal request handling. 

1265 log_resolve_user_identity: bool = Field( 

1266 default=False, 

1267 description="If true, RequestLoggingMiddleware will attempt DB fallback to resolve user identity when needed", 

1268 ) 

1269 

1270 # Sampling rate for detailed request logging (0.0-1.0). Applied when log_detailed_requests is enabled. 

1271 log_detailed_sample_rate: float = Field( 

1272 default=1.0, 

1273 ge=0.0, 

1274 le=1.0, 

1275 description="Fraction of requests to sample for detailed logging (0.0-1.0)", 

1276 ) 

1277 

1278 # Log Buffer (for in-memory storage in admin UI) 

1279 log_buffer_size_mb: float = 1.0 # Size of in-memory log buffer in MB 

1280 

1281 # =================================== 

1282 # Observability Configuration 

1283 # =================================== 

1284 

1285 # Enable observability features (traces, spans, metrics) 

1286 observability_enabled: bool = Field(default=False, description="Enable observability tracing and metrics collection") 

1287 

1288 # Automatic HTTP request tracing 

1289 observability_trace_http_requests: bool = Field(default=True, description="Automatically trace HTTP requests") 

1290 

1291 # Trace retention period (days) 

1292 observability_trace_retention_days: int = Field(default=7, ge=1, description="Number of days to retain trace data") 

1293 

1294 # Maximum traces to store (prevents unbounded growth) 

1295 observability_max_traces: int = Field(default=100000, ge=1000, description="Maximum number of traces to retain") 

1296 

1297 # Sample rate (0.0 to 1.0) - 1.0 means trace everything 

1298 observability_sample_rate: float = Field(default=1.0, ge=0.0, le=1.0, description="Trace sampling rate (0.0-1.0)") 

1299 

1300 # Include paths for tracing (regex patterns) 

1301 observability_include_paths: List[str] = Field( 

1302 default_factory=lambda: [ 

1303 r"^/rpc/?$", 

1304 r"^/sse$", 

1305 r"^/message$", 

1306 r"^/mcp(?:/|$)", 

1307 r"^/servers/[^/]+/mcp/?$", 

1308 r"^/servers/[^/]+/sse$", 

1309 r"^/servers/[^/]+/message$", 

1310 r"^/a2a(?:/|$)", 

1311 ], 

1312 description="Regex patterns to include for tracing (when empty, all paths are eligible before excludes)", 

1313 ) 

1314 

1315 # Exclude paths from tracing (regex patterns) 

1316 observability_exclude_paths: List[str] = Field( 

1317 default_factory=lambda: ["/health", "/healthz", "/ready", "/metrics", "/static/.*"], 

1318 description="Regex patterns to exclude from tracing (applies after include patterns)", 

1319 ) 

1320 

1321 # Enable performance metrics 

1322 observability_metrics_enabled: bool = Field(default=True, description="Enable metrics collection") 

1323 

1324 # Enable span events 

1325 observability_events_enabled: bool = Field(default=True, description="Enable event logging within spans") 

1326 

1327 # Correlation ID Settings 

1328 correlation_id_enabled: bool = Field(default=True, description="Enable automatic correlation ID tracking for requests") 

1329 correlation_id_header: str = Field(default="X-Correlation-ID", description="HTTP header name for correlation ID") 

1330 correlation_id_preserve: bool = Field(default=True, description="Preserve correlation IDs from incoming requests") 

1331 correlation_id_response_header: bool = Field(default=True, description="Include correlation ID in response headers") 

1332 

1333 # =================================== 

1334 # Database Query Logging (N+1 Detection) 

1335 # =================================== 

1336 db_query_log_enabled: bool = Field(default=False, description="Enable database query logging to file (for N+1 detection)") 

1337 db_query_log_file: str = Field(default="logs/db-queries.log", description="Path to database query log file") 

1338 db_query_log_json_file: str = Field(default="logs/db-queries.jsonl", description="Path to JSON Lines query log file") 

1339 db_query_log_format: str = Field(default="both", description="Log format: 'json', 'text', or 'both'") 

1340 db_query_log_min_queries: int = Field(default=1, ge=1, description="Only log requests with >= N queries") 

1341 db_query_log_include_params: bool = Field(default=False, description="Include query parameters (may expose sensitive data)") 

1342 db_query_log_detect_n1: bool = Field(default=True, description="Automatically detect and flag N+1 query patterns") 

1343 db_query_log_n1_threshold: int = Field(default=3, ge=2, description="Number of similar queries to flag as potential N+1") 

1344 

1345 # Structured Logging Configuration 

1346 structured_logging_enabled: bool = Field(default=True, description="Enable structured JSON logging with database persistence") 

1347 structured_logging_database_enabled: bool = Field(default=False, description="Persist structured logs to database (enables /api/logs/* endpoints, impacts performance)") 

1348 structured_logging_external_enabled: bool = Field(default=False, description="Send logs to external systems") 

1349 

1350 # Performance Tracking Configuration 

1351 performance_tracking_enabled: bool = Field(default=True, description="Enable performance tracking and metrics") 

1352 performance_threshold_database_query_ms: float = Field(default=100.0, description="Alert threshold for database queries (ms)") 

1353 performance_threshold_tool_invocation_ms: float = Field(default=2000.0, description="Alert threshold for tool invocations (ms)") 

1354 performance_threshold_resource_read_ms: float = Field(default=1000.0, description="Alert threshold for resource reads (ms)") 

1355 performance_threshold_http_request_ms: float = Field(default=500.0, description="Alert threshold for HTTP requests (ms)") 

1356 performance_degradation_multiplier: float = Field(default=1.5, description="Alert if performance degrades by this multiplier vs baseline") 

1357 

1358 # Audit Trail Configuration 

1359 # Audit trail logging is disabled by default for performance. 

1360 # When enabled, it logs all CRUD operations (create, read, update, delete) on resources. 

1361 # WARNING: This causes a database write on every API request and can cause significant load. 

1362 audit_trail_enabled: bool = Field(default=False, description="Enable audit trail logging to database for compliance") 

1363 permission_audit_enabled: bool = Field( 

1364 default=False, 

1365 description="Enable permission audit logging for RBAC checks (writes a row per permission check)", 

1366 ) 

1367 

1368 # Security Logging Configuration 

1369 # Security event logging is disabled by default for performance. 

1370 # When enabled, it logs authentication attempts, authorization failures, and security events. 

1371 # WARNING: "all" level logs every request and can cause significant database write load. 

1372 security_logging_enabled: bool = Field(default=False, description="Enable security event logging to database") 

1373 security_logging_level: Literal["all", "failures_only", "high_severity"] = Field( 

1374 default="failures_only", 

1375 description=( 

1376 "Security logging level: " 

1377 "'all' = log all events including successful auth (high DB load), " 

1378 "'failures_only' = log only authentication/authorization failures, " 

1379 "'high_severity' = log only high/critical severity events" 

1380 ), 

1381 ) 

1382 security_failed_auth_threshold: int = Field(default=5, description="Failed auth attempts before high severity alert") 

1383 security_threat_score_alert: float = Field(default=0.7, description="Threat score threshold for alerts (0.0-1.0)") 

1384 security_rate_limit_window_minutes: int = Field(default=5, description="Time window for rate limit checks (minutes)") 

1385 

1386 # API Token Tracking Configuration 

1387 # Controls how token usage and last_used timestamps are tracked 

1388 token_usage_logging_enabled: bool = Field(default=True, description="Enable API token usage logging middleware") 

1389 token_last_used_update_interval_minutes: int = Field(default=5, ge=1, le=1440, description="Minimum minutes between last_used timestamp updates (rate-limits DB writes)") 

1390 

1391 # Metrics Aggregation Configuration 

1392 metrics_aggregation_enabled: bool = Field(default=True, description="Enable automatic log aggregation into performance metrics") 

1393 metrics_aggregation_backfill_hours: int = Field(default=6, ge=0, le=168, description="Hours of structured logs to backfill into performance metrics on startup") 

1394 metrics_aggregation_window_minutes: int = Field(default=5, description="Time window for metrics aggregation (minutes)") 

1395 metrics_aggregation_auto_start: bool = Field(default=False, description="Automatically run the log aggregation loop on application startup") 

1396 yield_batch_size: int = Field( 

1397 default=1000, 

1398 ge=100, 

1399 le=100000, 

1400 description="Number of rows fetched per batch when streaming hourly metric data from the database. " 

1401 "Used to limit memory usage during aggregation and percentile calculations. " 

1402 "Smaller values reduce memory footprint but increase DB round-trips; larger values improve throughput " 

1403 "at the cost of higher memory usage.", 

1404 ) 

1405 

1406 # Execution Metrics Recording 

1407 # Controls whether tool/resource/prompt/server/A2A execution metrics are written to the database. 

1408 # Disable if using external observability (ELK, Datadog, Splunk). 

1409 # Note: Does NOT affect log aggregation (METRICS_AGGREGATION_ENABLED) or Prometheus (ENABLE_METRICS). 

1410 db_metrics_recording_enabled: bool = Field( 

1411 default=True, description="Enable recording of execution metrics (tool/resource/prompt/server/A2A) to database. Disable if using external observability." 

1412 ) 

1413 

1414 # Metrics Buffer Configuration (for batching tool/resource/prompt metrics writes) 

1415 metrics_buffer_enabled: bool = Field(default=True, description="Enable buffered metrics writes (reduces DB pressure under load)") 

1416 metrics_buffer_flush_interval: int = Field(default=60, ge=5, le=300, description="Seconds between automatic metrics buffer flushes") 

1417 metrics_buffer_max_size: int = Field(default=1000, ge=100, le=10000, description="Maximum buffered metrics before forced flush") 

1418 

1419 # Metrics Cache Configuration (for caching aggregate metrics queries) 

1420 metrics_cache_enabled: bool = Field(default=True, description="Enable in-memory caching for aggregate metrics queries") 

1421 metrics_cache_ttl_seconds: int = Field(default=60, ge=1, le=300, description="TTL for cached aggregate metrics in seconds") 

1422 

1423 # Metrics Cleanup Configuration (automatic deletion of old metrics) 

1424 metrics_cleanup_enabled: bool = Field(default=True, description="Enable automatic cleanup of old metrics data") 

1425 metrics_retention_days: int = Field(default=7, ge=1, le=365, description="Days to retain raw metrics before cleanup (fallback when rollup disabled)") 

1426 metrics_cleanup_interval_hours: int = Field(default=1, ge=1, le=168, description="Hours between automatic cleanup runs") 

1427 metrics_cleanup_batch_size: int = Field(default=10000, ge=100, le=100000, description="Batch size for metrics deletion (prevents long locks)") 

1428 

1429 # Metrics Rollup Configuration (hourly aggregation for historical queries) 

1430 metrics_rollup_enabled: bool = Field(default=True, description="Enable hourly metrics rollup for efficient historical queries") 

1431 metrics_rollup_interval_hours: int = Field(default=1, ge=1, le=24, description="Hours between rollup runs") 

1432 metrics_rollup_retention_days: int = Field(default=365, ge=30, le=3650, description="Days to retain hourly rollup data") 

1433 metrics_rollup_late_data_hours: int = Field( 

1434 default=1, ge=1, le=48, description="Hours to re-process on each run to catch late-arriving data (smaller = less CPU, larger = more tolerance for delayed metrics)" 

1435 ) 

1436 metrics_delete_raw_after_rollup: bool = Field(default=True, description="Delete raw metrics after hourly rollup exists (recommended for production)") 

1437 metrics_delete_raw_after_rollup_hours: int = Field(default=1, ge=1, le=8760, description="Hours to retain raw metrics when hourly rollup exists") 

1438 

1439 # Auth Cache Configuration (reduces DB queries during authentication) 

1440 auth_cache_enabled: bool = Field(default=True, description="Enable Redis/in-memory caching for authentication data (user, team, revocation)") 

1441 auth_cache_user_ttl: int = Field(default=60, ge=10, le=300, description="TTL in seconds for cached user data") 

1442 auth_cache_revocation_ttl: int = Field(default=30, ge=5, le=120, description="TTL in seconds for token revocation cache (security-critical, keep short)") 

1443 auth_cache_team_ttl: int = Field(default=60, ge=10, le=300, description="TTL in seconds for team membership cache") 

1444 auth_cache_role_ttl: int = Field(default=60, ge=10, le=300, description="TTL in seconds for user role in team cache") 

1445 auth_cache_teams_enabled: bool = Field(default=True, description="Enable caching for get_user_teams() (default: true)") 

1446 auth_cache_teams_ttl: int = Field(default=60, ge=10, le=300, description="TTL in seconds for user teams list cache") 

1447 auth_cache_batch_queries: bool = Field(default=True, description="Batch auth DB queries into single call (reduces 3 queries to 1)") 

1448 

1449 # Registry Cache Configuration (reduces DB queries for list endpoints) 

1450 registry_cache_enabled: bool = Field(default=True, description="Enable caching for registry list endpoints (tools, prompts, resources, etc.)") 

1451 registry_cache_tools_ttl: int = Field(default=20, ge=5, le=300, description="TTL in seconds for tools list cache") 

1452 registry_cache_prompts_ttl: int = Field(default=15, ge=5, le=300, description="TTL in seconds for prompts list cache") 

1453 registry_cache_resources_ttl: int = Field(default=15, ge=5, le=300, description="TTL in seconds for resources list cache") 

1454 registry_cache_agents_ttl: int = Field(default=20, ge=5, le=300, description="TTL in seconds for agents list cache") 

1455 registry_cache_servers_ttl: int = Field(default=20, ge=5, le=300, description="TTL in seconds for servers list cache") 

1456 registry_cache_gateways_ttl: int = Field(default=20, ge=5, le=300, description="TTL in seconds for gateways list cache") 

1457 registry_cache_catalog_ttl: int = Field(default=300, ge=60, le=600, description="TTL in seconds for catalog servers list cache (external catalog, changes infrequently)") 

1458 

1459 # Tool Lookup Cache Configuration (reduces hot-path DB lookups in invoke_tool) 

1460 tool_lookup_cache_enabled: bool = Field(default=True, description="Enable tool lookup cache (tool name -> tool config)") 

1461 tool_lookup_cache_ttl_seconds: int = Field(default=60, ge=5, le=600, description="TTL in seconds for tool lookup cache entries") 

1462 tool_lookup_cache_negative_ttl_seconds: int = Field(default=10, ge=1, le=60, description="TTL in seconds for negative tool lookup cache entries") 

1463 tool_lookup_cache_l1_maxsize: int = Field(default=10000, ge=100, le=1000000, description="Max entries for in-memory tool lookup cache (L1)") 

1464 tool_lookup_cache_l2_enabled: bool = Field(default=True, description="Enable Redis-backed tool lookup cache (L2) when cache_type=redis") 

1465 

1466 # Admin Stats Cache Configuration (reduces dashboard query overhead) 

1467 admin_stats_cache_enabled: bool = Field(default=True, description="Enable caching for admin dashboard statistics") 

1468 admin_stats_cache_system_ttl: int = Field(default=60, ge=10, le=300, description="TTL in seconds for system stats cache") 

1469 admin_stats_cache_observability_ttl: int = Field(default=30, ge=10, le=120, description="TTL in seconds for observability stats cache") 

1470 admin_stats_cache_tags_ttl: int = Field(default=120, ge=30, le=600, description="TTL in seconds for tags listing cache") 

1471 admin_stats_cache_plugins_ttl: int = Field(default=120, ge=30, le=600, description="TTL in seconds for plugin stats cache") 

1472 admin_stats_cache_performance_ttl: int = Field(default=60, ge=15, le=300, description="TTL in seconds for performance aggregates cache") 

1473 

1474 # Team Member Count Cache Configuration (reduces N+1 queries in admin UI) 

1475 team_member_count_cache_enabled: bool = Field(default=True, description="Enable Redis caching for team member counts") 

1476 team_member_count_cache_ttl: int = Field(default=300, ge=30, le=3600, description="TTL in seconds for team member count cache (default: 5 minutes)") 

1477 

1478 # Log Search Configuration 

1479 log_search_max_results: int = Field(default=1000, description="Maximum results per log search query") 

1480 log_retention_days: int = Field(default=30, description="Number of days to retain logs in database") 

1481 

1482 # External Log Integration Configuration 

1483 elasticsearch_enabled: bool = Field(default=False, description="Send logs to Elasticsearch") 

1484 elasticsearch_url: Optional[str] = Field(default=None, description="Elasticsearch cluster URL") 

1485 elasticsearch_index_prefix: str = Field(default="mcpgateway-logs", description="Elasticsearch index prefix") 

1486 syslog_enabled: bool = Field(default=False, description="Send logs to syslog") 

1487 syslog_host: Optional[str] = Field(default=None, description="Syslog server host") 

1488 syslog_port: int = Field(default=514, description="Syslog server port") 

1489 webhook_logging_enabled: bool = Field(default=False, description="Send logs to webhook endpoints") 

1490 webhook_logging_urls: List[str] = Field(default_factory=list, description="Webhook URLs for log delivery") 

1491 

1492 @field_validator("log_level", mode="before") 

1493 @classmethod 

1494 def validate_log_level(cls, v: str) -> str: 

1495 """ 

1496 Normalize and validate the log level value. 

1497 

1498 Ensures that the input string matches one of the allowed log levels, 

1499 case-insensitively. The value is uppercased before validation so that 

1500 "debug", "Debug", etc. are all accepted as "DEBUG". 

1501 

1502 Args: 

1503 v (str): The log level string provided via configuration or environment. 

1504 

1505 Returns: 

1506 str: The validated and normalized (uppercase) log level. 

1507 

1508 Raises: 

1509 ValueError: If the provided value is not one of 

1510 {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}. 

1511 """ 

1512 allowed = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"} 

1513 v_up = v.upper() 

1514 if v_up not in allowed: 

1515 raise ValueError(f"Invalid log_level: {v}") 

1516 return v_up 

1517 

1518 # Transport 

1519 mcpgateway_ws_relay_enabled: bool = Field(default=False, description="Enable WebSocket JSON-RPC relay endpoint at /ws") 

1520 mcpgateway_reverse_proxy_enabled: bool = Field(default=False, description="Enable reverse-proxy transport endpoints under /reverse-proxy/*") 

1521 transport_type: str = "all" # http, ws, sse, all 

1522 websocket_ping_interval: int = 30 # seconds 

1523 sse_retry_timeout: int = 5000 # milliseconds - client retry interval on disconnect 

1524 sse_keepalive_enabled: bool = True # Enable SSE keepalive events 

1525 sse_keepalive_interval: int = 30 # seconds between keepalive events 

1526 sse_send_timeout: float = 30.0 # seconds - timeout for ASGI send() calls, protects against hung connections 

1527 sse_rapid_yield_window_ms: int = 1000 # milliseconds - time window for rapid yield detection 

1528 sse_rapid_yield_max: int = 50 # max yields per window before assuming client disconnected (0=disabled) 

1529 

1530 # Gateway/Server Connection Timeout 

1531 # Timeout in seconds for HTTP requests to registered gateways and MCP servers. 

1532 # Used by: GatewayService, ToolService, ServerService for health checks and tool invocations. 

1533 # Note: Previously part of federation settings, retained for gateway connectivity. 

1534 federation_timeout: int = 120 

1535 

1536 # SSO 

1537 # For sso_issuers strip out quotes to ensure we're passing valid JSON via env 

1538 sso_issuers: Optional[list[HttpUrl]] = Field(default=None) 

1539 

1540 @field_validator("sso_issuers", mode="before") 

1541 @classmethod 

1542 def parse_issuers(cls, v: Any) -> list[str]: 

1543 """ 

1544 Parse and validate the SSO issuers configuration value. 

1545 

1546 Accepts: 

1547 - JSON array string: '["https://idp1.com", "https://idp2.com"]' 

1548 - Comma-separated string: "https://idp1.com, https://idp2.com" 

1549 - Empty string or None → [] 

1550 - Already-parsed list 

1551 

1552 Args: 

1553 v: The input value to parse. 

1554 

1555 Returns: 

1556 list[str]: Parsed list of issuer URLs. 

1557 

1558 Raises: 

1559 ValueError: If the input is not a valid format. 

1560 """ 

1561 if v is None: 

1562 return [] 

1563 if isinstance(v, list): 

1564 return v 

1565 if isinstance(v, str): 

1566 s = v.strip() 

1567 if not s: 

1568 return [] 

1569 if s.startswith("["): 

1570 try: 

1571 parsed = orjson.loads(s) 

1572 return parsed if isinstance(parsed, list) else [] 

1573 except orjson.JSONDecodeError: 

1574 raise ValueError(f"Invalid JSON for SSO_ISSUERS: {v!r}") 

1575 # Fallback to comma-separated parsing 

1576 return [item.strip() for item in s.split(",") if item.strip()] 

1577 raise ValueError("Invalid type for SSO_ISSUERS") 

1578 

1579 # Resources 

1580 resource_cache_size: int = 1000 

1581 resource_cache_ttl: int = 3600 # seconds 

1582 max_resource_size: int = 10 * 1024 * 1024 # 10MB 

1583 allowed_mime_types: Set[str] = { 

1584 "text/plain", 

1585 "text/markdown", 

1586 "text/html", 

1587 "application/json", 

1588 "application/xml", 

1589 "image/png", 

1590 "image/jpeg", 

1591 "image/gif", 

1592 } 

1593 

1594 # Tools 

1595 tool_timeout: int = 60 # seconds 

1596 max_tool_retries: int = 3 

1597 tool_rate_limit: int = 100 # requests per minute 

1598 tool_concurrent_limit: int = 10 

1599 rest_response_text_max_length: int = Field( 

1600 default=5000, 

1601 ge=1000, 

1602 le=100000, 

1603 description="Maximum length of response text to return for non-JSON REST API responses. " 

1604 "Longer responses are truncated to prevent exposing excessive sensitive data. " 

1605 "Default: 5000 characters. Range: 1000-100000.", 

1606 ) 

1607 

1608 # Content Security - Size Limits 

1609 content_max_resource_size: int = Field(default=102400, ge=1024, le=10485760, description="Maximum size in bytes for resource content (default: 100KB)") # 100KB # Minimum 1KB # Maximum 10MB 

1610 content_max_prompt_size: int = Field(default=10240, ge=512, le=1048576, description="Maximum size in bytes for prompt templates (default: 10KB)") # 10KB # Minimum 512 bytes # Maximum 1MB 

1611 

1612 # Content Security - MIME Type Restrictions (US-2) 

1613 content_allowed_resource_mimetypes: List[str] = Field( 

1614 default_factory=lambda: [ 

1615 "text/plain", 

1616 "text/markdown", 

1617 "text/html", 

1618 "text/csv", 

1619 "application/json", 

1620 "application/xml", 

1621 "application/yaml", 

1622 "application/pdf", 

1623 "image/png", 

1624 "image/jpeg", 

1625 "image/gif", 

1626 "image/svg+xml", 

1627 "image/webp", 

1628 "audio/mpeg", 

1629 "audio/wav", 

1630 "video/mp4", 

1631 "video/webm", 

1632 ], 

1633 description="Allowed MIME types for resources. In strict mode, only types explicitly listed here are accepted. Vendor types (application/x-*, text/x-*) and suffix types (+json, +xml) must be explicitly added if needed.", 

1634 ) 

1635 content_strict_mime_validation: bool = Field( 

1636 default=False, 

1637 description="Enable strict MIME type validation for resources (US-2). Set to false to log violations without blocking.", 

1638 ) 

1639 

1640 # MCP Session Pool - reduces per-request latency from ~20ms to ~1-2ms 

1641 # Disabled by default for safety. Enable explicitly in production after testing. 

1642 mcp_session_pool_enabled: bool = False 

1643 mcp_session_pool_max_per_key: int = 10 # Max sessions per (URL, identity, transport) 

1644 mcp_session_pool_ttl: float = 300.0 # Session TTL in seconds 

1645 mcp_session_pool_health_check_interval: float = 60.0 # Idle time before health check (aligned with health_check_interval) 

1646 mcp_session_pool_acquire_timeout: float = 30.0 # Timeout waiting for session slot 

1647 mcp_session_pool_create_timeout: float = 30.0 # Timeout creating new session 

1648 mcp_session_pool_circuit_breaker_threshold: int = 5 # Failures before circuit opens 

1649 mcp_session_pool_circuit_breaker_reset: float = 60.0 # Seconds before circuit resets 

1650 mcp_session_pool_idle_eviction: float = 600.0 # Evict idle pool keys after this time 

1651 # Transport timeout for pooled sessions (default 30s to match MCP SDK default). 

1652 # This timeout applies to all HTTP operations (connect, read, write) on pooled sessions. 

1653 # Use a higher value for deployments with long-running tool calls. 

1654 mcp_session_pool_transport_timeout: float = 30.0 

1655 # Force explicit RPC (list_tools) on gateway health checks even when session is fresh. 

1656 # Off by default: pool's internal staleness check (idle > health_check_interval) handles this. 

1657 # Enable for stricter health verification at the cost of ~5ms latency per check. 

1658 mcp_session_pool_explicit_health_rpc: bool = False 

1659 # Configurable health check chain - ordered list of methods to try. 

1660 # Options: ping, list_tools, list_prompts, list_resources, skip 

1661 # Default: ping,skip - try lightweight ping, skip if unsupported (for legacy servers) 

1662 mcp_session_pool_health_check_methods: List[str] = ["ping", "skip"] 

1663 # Timeout in seconds for each health check attempt 

1664 mcp_session_pool_health_check_timeout: float = 5.0 

1665 mcp_session_pool_identity_headers: List[str] = ["authorization", "x-tenant-id", "x-user-id", "x-api-key", "cookie", "x-mcp-session-id"] 

1666 # Timeout for session/transport cleanup operations (__aexit__ calls). 

1667 # This prevents CPU spin loops when internal tasks (like post_writer waiting on 

1668 # memory streams) don't respond to cancellation. Does NOT affect tool execution 

1669 # time - only cleanup of idle/released sessions. Increase if you see frequent 

1670 # "cleanup timed out" warnings; decrease for faster recovery from spin loops. 

1671 mcp_session_pool_cleanup_timeout: float = 5.0 

1672 

1673 # Timeout for SSE task group cleanup (seconds). 

1674 # When an SSE connection is cancelled, this controls how long to wait for 

1675 # internal tasks to respond before forcing cleanup. Shorter values reduce 

1676 # CPU waste during anyio _deliver_cancellation spin loops but may interrupt 

1677 # legitimate cleanup. Only affects cancelled connections, not normal operation. 

1678 # See: https://github.com/agronholm/anyio/issues/695 

1679 sse_task_group_cleanup_timeout: float = 5.0 

1680 

1681 # ========================================================================= 

1682 # EXPERIMENTAL: anyio _deliver_cancellation spin loop workaround 

1683 # ========================================================================= 

1684 # When enabled, monkey-patches anyio's CancelScope._deliver_cancellation to 

1685 # limit the number of retry iterations. This prevents 100% CPU spin loops 

1686 # when tasks don't respond to CancelledError (anyio issue #695). 

1687 # 

1688 # WARNING: This is a workaround for an upstream issue. May be removed when 

1689 # anyio or MCP SDK fix the underlying problem. Enable only if you experience 

1690 # CPU spin loops during SSE/MCP connection cleanup. 

1691 # 

1692 # Trade-offs when enabled: 

1693 # - Prevents indefinite CPU spin (good) 

1694 # - May leave some tasks uncancelled after max iterations (usually harmless) 

1695 # - Worker recycling (GUNICORN_MAX_REQUESTS) cleans up orphaned tasks 

1696 # 

1697 # See: https://github.com/agronholm/anyio/issues/695 

1698 # Env: ANYIO_CANCEL_DELIVERY_PATCH_ENABLED 

1699 anyio_cancel_delivery_patch_enabled: bool = False 

1700 

1701 # Maximum iterations for _deliver_cancellation before giving up. 

1702 # Only used when anyio_cancel_delivery_patch_enabled=True. 

1703 # Higher values = more attempts to cancel tasks, but longer potential spin. 

1704 # Lower values = faster recovery, but more orphaned tasks. 

1705 # Env: ANYIO_CANCEL_DELIVERY_MAX_ITERATIONS 

1706 anyio_cancel_delivery_max_iterations: int = 100 

1707 

1708 # Session Affinity 

1709 mcpgateway_session_affinity_enabled: bool = False # Global session affinity toggle 

1710 mcpgateway_session_affinity_ttl: int = 300 # Session affinity binding TTL 

1711 mcpgateway_session_affinity_max_sessions: int = 1 # Max sessions per identity for affinity 

1712 mcpgateway_pool_rpc_forward_timeout: int = 30 # Timeout for forwarding RPC requests to owner worker 

1713 

1714 # Prompts 

1715 prompt_cache_size: int = 100 

1716 max_prompt_size: int = 100 * 1024 # 100KB 

1717 prompt_render_timeout: int = 10 # seconds 

1718 

1719 # Health Checks 

1720 # Interval in seconds between health checks (aligned with mcp_session_pool_health_check_interval) 

1721 health_check_interval: int = 60 

1722 # Timeout in seconds for each health check request 

1723 health_check_timeout: int = 5 

1724 # Per-check timeout (seconds) to bound total time of one gateway health check 

1725 # Env: GATEWAY_HEALTH_CHECK_TIMEOUT 

1726 gateway_health_check_timeout: float = 5.0 

1727 # Consecutive failures before marking gateway offline 

1728 unhealthy_threshold: int = 3 

1729 # Max concurrent health checks per worker 

1730 max_concurrent_health_checks: int = 10 

1731 

1732 # Auto-refresh tools/resources/prompts from gateways during health checks 

1733 # When enabled, tools/resources/prompts are fetched and synced with DB during health checks 

1734 auto_refresh_servers: bool = Field(default=False, description="Enable automatic tool/resource/prompt refresh during gateway health checks") 

1735 

1736 # Per-gateway refresh configuration (used when auto_refresh_servers is True) 

1737 # Gateways can override this with their own refresh_interval_seconds 

1738 gateway_auto_refresh_interval: int = Field(default=300, ge=60, description="Default refresh interval in seconds for gateway tools/resources/prompts sync (minimum 60 seconds)") 

1739 

1740 # Hot/Cold Server Classification 

1741 # Classify servers by usage (hot = active sessions, cold = inactive) for optimized polling 

1742 # Poll intervals auto-derived: hot = gateway_auto_refresh_interval (1x), cold = 3x 

1743 # Classification refresh uses gateway_auto_refresh_interval (no separate config needed) 

1744 hot_cold_classification_enabled: bool = Field(default=False, description="Enable hot/cold server classification for staggered polling (requires Redis for multi-worker)") 

1745 

1746 # Validation Gateway URL 

1747 gateway_validation_timeout: int = 5 # seconds 

1748 gateway_max_redirects: int = 5 

1749 

1750 filelock_name: str = "gateway_service_leader.lock" 

1751 

1752 # Default Roots 

1753 default_roots: List[str] = [] 

1754 

1755 # Database 

1756 db_driver: str = "postgresql+psycopg" 

1757 db_pool_size: int = 200 

1758 db_max_overflow: int = 10 

1759 db_pool_timeout: int = 30 

1760 db_pool_recycle: int = 3600 

1761 db_max_retries: int = 30 # Max attempts with exponential backoff (≈5 min total) 

1762 db_retry_interval_ms: int = 2000 # Base interval; doubles each attempt, ±25% jitter 

1763 db_max_backoff_seconds: int = 30 # Cap for exponential backoff (jitter applied after cap) 

1764 

1765 # Database Performance Optimization 

1766 use_postgresdb_percentiles: bool = Field( 

1767 default=True, 

1768 description="Use database-native percentile functions (percentile_cont) for performance metrics. " 

1769 "When enabled, PostgreSQL uses native SQL percentile calculations (5-10x faster). " 

1770 "When disabled or using SQLite, falls back to Python-based percentile calculations. " 

1771 "Recommended: true for PostgreSQL, auto-detected for SQLite.", 

1772 ) 

1773 

1774 # psycopg3-specific: Number of times a query must be executed before it's 

1775 # prepared server-side. Set to 0 to disable, 1 to prepare immediately. 

1776 # Default of 5 balances memory usage with query performance. 

1777 db_prepare_threshold: int = Field(default=5, ge=0, le=100, description="psycopg3 prepare_threshold for auto-prepared statements") 

1778 

1779 # Connection pool class: "auto" (default), "null", or "queue" 

1780 # - "auto": Uses NullPool when PgBouncer detected, QueuePool otherwise 

1781 # - "null": Always use NullPool (recommended with PgBouncer - lets PgBouncer handle pooling) 

1782 # - "queue": Always use QueuePool (application-side pooling) 

1783 db_pool_class: Literal["auto", "null", "queue"] = Field( 

1784 default="auto", 

1785 description="Connection pool class: auto (NullPool with PgBouncer), null, or queue", 

1786 ) 

1787 

1788 # Pre-ping connections before checkout (validates connection is alive) 

1789 # - "auto": Enabled for non-PgBouncer, disabled for PgBouncer (default) 

1790 # - "true": Always enable (adds SELECT 1 overhead but catches stale connections) 

1791 # - "false": Always disable 

1792 db_pool_pre_ping: Literal["auto", "true", "false"] = Field( 

1793 default="auto", 

1794 description="Pre-ping connections: auto, true, or false", 

1795 ) 

1796 

1797 # SQLite busy timeout: Maximum time (ms) SQLite will wait to acquire a database lock before returning SQLITE_BUSY. 

1798 db_sqlite_busy_timeout: int = Field(default=5000, ge=1000, le=60000, description="SQLite busy timeout in milliseconds (default: 5000ms)") 

1799 

1800 # Cache 

1801 cache_type: Literal["redis", "memory", "none", "database"] = "database" # memory or redis or database 

1802 redis_url: Optional[str] = "redis://localhost:6379/0" 

1803 cache_prefix: str = "mcpgw:" 

1804 session_ttl: int = 3600 

1805 message_ttl: int = 600 

1806 redis_max_retries: int = 30 # Max attempts with exponential backoff (≈5 min total) 

1807 redis_retry_interval_ms: int = 2000 # Base interval; doubles each attempt, ±25% jitter 

1808 redis_max_backoff_seconds: int = 30 # Cap for exponential backoff (jitter applied after cap) 

1809 

1810 # GlobalConfig In-Memory Cache (Issue #1715) 

1811 # Caches GlobalConfig (passthrough headers) to eliminate redundant DB queries 

1812 global_config_cache_ttl: int = Field( 

1813 default=60, 

1814 ge=5, 

1815 le=3600, 

1816 description="TTL in seconds for GlobalConfig in-memory cache (default: 60)", 

1817 ) 

1818 

1819 # A2A Stats In-Memory Cache 

1820 # Caches A2A agent counts (total, active) to eliminate redundant COUNT queries 

1821 a2a_stats_cache_ttl: int = Field( 

1822 default=30, 

1823 ge=5, 

1824 le=3600, 

1825 description="TTL in seconds for A2A stats in-memory cache (default: 30)", 

1826 ) 

1827 

1828 # Redis Parser Configuration (ADR-026) 

1829 # hiredis C parser provides up to 83x faster response parsing for large responses 

1830 redis_parser: Literal["auto", "hiredis", "python"] = Field( 

1831 default="auto", 

1832 description="Redis protocol parser: auto (use hiredis if available), hiredis (require hiredis), python (pure-Python)", 

1833 ) 

1834 

1835 # Redis Connection Pool - Performance Optimized 

1836 redis_decode_responses: bool = Field(default=True, description="Return strings instead of bytes") 

1837 redis_max_connections: int = Field(default=50, description="Connection pool size per worker") 

1838 redis_socket_timeout: float = Field(default=2.0, description="Socket read/write timeout in seconds") 

1839 redis_socket_connect_timeout: float = Field(default=2.0, description="Connection timeout in seconds") 

1840 redis_retry_on_timeout: bool = Field(default=True, description="Retry commands on timeout") 

1841 redis_health_check_interval: int = Field(default=30, description="Seconds between connection health checks (0=disabled)") 

1842 

1843 # Redis Leader Election - Multi-Node Deployments 

1844 redis_leader_ttl: int = Field(default=15, description="Leader election TTL in seconds") 

1845 redis_leader_key: str = Field(default="gateway_service_leader", description="Leader key name") 

1846 redis_leader_heartbeat_interval: int = Field(default=5, description="Seconds between leader heartbeats") 

1847 

1848 # streamable http transport 

1849 use_stateful_sessions: bool = False # Set to False to use stateless sessions without event store 

1850 json_response_enabled: bool = True # Enable JSON responses instead of SSE streams 

1851 streamable_http_max_events_per_stream: int = 100 # Ring buffer capacity per stream 

1852 streamable_http_event_ttl: int = 3600 # Event stream TTL in seconds (1 hour) 

1853 

1854 # Development 

1855 dev_mode: bool = False 

1856 reload: bool = False 

1857 debug: bool = False 

1858 

1859 # Observability (OpenTelemetry) 

1860 deployment_env: str = Field(default="development", validation_alias=AliasChoices("DEPLOYMENT_ENV", "ENVIRONMENT"), description="Deployment environment label") 

1861 otel_enable_observability: bool = Field(default=False, description="Enable OpenTelemetry observability") 

1862 otel_traces_exporter: str = Field(default="otlp", description="Traces exporter: otlp, jaeger, zipkin, console, none") 

1863 otel_exporter_otlp_endpoint: Optional[str] = Field(default=None, description="OTLP endpoint (e.g., http://localhost:4317)") 

1864 otel_exporter_otlp_protocol: str = Field(default="grpc", description="OTLP protocol: grpc or http") 

1865 otel_exporter_otlp_insecure: bool = Field(default=True, description="Use insecure connection for OTLP") 

1866 otel_exporter_otlp_headers: Optional[str] = Field(default=None, description="OTLP headers (comma-separated key=value)") 

1867 otel_emit_langfuse_attributes: Optional[bool] = Field( 

1868 default=None, 

1869 description="Emit Langfuse-specific span attributes. Defaults to auto-enable when a Langfuse OTLP endpoint is configured.", 

1870 ) 

1871 otel_capture_identity_attributes: Optional[bool] = Field( 

1872 default=None, 

1873 description="Capture user/team identity span attributes. Defaults to auto-enable when Langfuse-specific attributes are emitted.", 

1874 ) 

1875 otel_copy_resource_attrs_to_spans: bool = Field(default=False, description="Copy selected OTEL resource attributes onto spans") 

1876 otel_redact_fields: str = Field( 

1877 default="password,secret,token,api_key,authorization,credential,auth_value,access_token,refresh_token,auth_token,client_secret,cookie,set-cookie,private_key,session_id,sessionid", 

1878 description="Comma-separated trace payload field names to redact before export", 

1879 ) 

1880 otel_max_trace_payload_size: int = Field(default=32768, ge=256, description="Maximum serialized trace payload size in characters") 

1881 otel_capture_input_spans: str = Field(default="", description="Comma-separated span names allowed to capture input payloads") 

1882 otel_capture_output_spans: str = Field(default="", description="Comma-separated span names allowed to capture output payloads") 

1883 langfuse_otel_endpoint: Optional[str] = Field(default=None, description="Langfuse OTLP/HTTP endpoint override") 

1884 langfuse_public_key: Optional[SecretStr] = Field(default=None, description="Langfuse project public key for derived OTLP auth") 

1885 langfuse_secret_key: Optional[SecretStr] = Field(default=None, description="Langfuse project secret key for derived OTLP auth") 

1886 langfuse_otel_auth: Optional[SecretStr] = Field(default=None, description="Base64-encoded Langfuse OTLP basic auth override") 

1887 otel_exporter_jaeger_endpoint: Optional[str] = Field(default=None, description="Jaeger endpoint") 

1888 otel_exporter_jaeger_user: Optional[str] = Field(default=None, description="Jaeger collector username") 

1889 otel_exporter_jaeger_password: Optional[SecretStr] = Field(default=None, description="Jaeger collector password") 

1890 otel_exporter_zipkin_endpoint: Optional[str] = Field(default=None, description="Zipkin endpoint") 

1891 otel_service_name: str = Field(default="mcp-gateway", description="Service name for traces") 

1892 otel_resource_attributes: Optional[str] = Field(default=None, description="Resource attributes (comma-separated key=value)") 

1893 otel_bsp_max_queue_size: int = Field(default=2048, description="Max queue size for batch span processor") 

1894 otel_bsp_max_export_batch_size: int = Field(default=512, description="Max export batch size") 

1895 otel_bsp_schedule_delay: int = Field(default=5000, description="Schedule delay in milliseconds") 

1896 

1897 # =================================== 

1898 # Well-Known URI Configuration 

1899 # =================================== 

1900 

1901 # Enable well-known URI endpoints 

1902 well_known_enabled: bool = True 

1903 

1904 # robots.txt content (default: disallow all crawling for private API) 

1905 well_known_robots_txt: str = """User-agent: * 

1906Disallow: / 

1907 

1908# ContextForge is a private API gateway 

1909# Public crawling is disabled by default""" 

1910 

1911 # security.txt content (optional, user-defined) 

1912 # Example: "Contact: security@example.com\nExpires: 2025-12-31T23:59:59Z\nPreferred-Languages: en" 

1913 well_known_security_txt: str = "" 

1914 

1915 # Enable security.txt only if content is provided 

1916 well_known_security_txt_enabled: bool = False 

1917 

1918 # Additional custom well-known files (JSON format) 

1919 # Example: {"ai.txt": "This service uses AI for...", "dnt-policy.txt": "Do Not Track policy..."} 

1920 well_known_custom_files: str = "{}" 

1921 

1922 # Cache control for well-known files (seconds) 

1923 well_known_cache_max_age: int = 3600 # 1 hour default 

1924 

1925 # =================================== 

1926 # Performance / Startup Tuning 

1927 # =================================== 

1928 

1929 slug_refresh_batch_size: int = Field(default=1000, description="Batch size for gateway/tool slug refresh at startup") 

1930 model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", case_sensitive=False, extra="ignore") 

1931 

1932 gateway_tool_name_separator: str = "-" 

1933 valid_slug_separator_regexp: ClassVar[str] = r"^(-{1,2}|[_.])$" 

1934 

1935 @field_validator("gateway_tool_name_separator") 

1936 @classmethod 

1937 def must_be_allowed_sep(cls, v: str) -> str: 

1938 """Validate the gateway tool name separator. 

1939 

1940 Args: 

1941 v: The separator value to validate. 

1942 

1943 Returns: 

1944 The validated separator, defaults to '-' if invalid. 

1945 

1946 Examples: 

1947 >>> Settings.must_be_allowed_sep('-') 

1948 '-' 

1949 >>> Settings.must_be_allowed_sep('--') 

1950 '--' 

1951 >>> Settings.must_be_allowed_sep('_') 

1952 '_' 

1953 >>> Settings.must_be_allowed_sep('.') 

1954 '.' 

1955 >>> Settings.must_be_allowed_sep('invalid') 

1956 '-' 

1957 """ 

1958 if not re.fullmatch(cls.valid_slug_separator_regexp, v): 

1959 logger.warning( 

1960 f"Invalid gateway_tool_name_separator '{v}'. Must be '-', '--', '_' or '.'. Defaulting to '-'.", 

1961 stacklevel=2, 

1962 ) 

1963 return "-" 

1964 return v 

1965 

1966 @property 

1967 def custom_well_known_files(self) -> Dict[str, str]: 

1968 """Parse custom well-known files from JSON string. 

1969 

1970 Returns: 

1971 Dict[str, str]: Parsed custom well-known files mapping filename to content. 

1972 """ 

1973 try: 

1974 return orjson.loads(self.well_known_custom_files) if self.well_known_custom_files else {} 

1975 except orjson.JSONDecodeError: 

1976 logger.error(f"Invalid JSON in WELL_KNOWN_CUSTOM_FILES: {self.well_known_custom_files}") 

1977 return {} 

1978 

1979 @property 

1980 def hot_server_check_interval(self) -> float: 

1981 """Hot server polling interval (auto-derived from gateway_auto_refresh_interval). 

1982 

1983 Hot servers (top 20% by usage) are polled at the same rate as gateway tool refresh. 

1984 

1985 Returns: 

1986 float: Hot server check interval in seconds (equals gateway_auto_refresh_interval) 

1987 """ 

1988 return float(self.gateway_auto_refresh_interval) 

1989 

1990 @property 

1991 def cold_server_check_interval(self) -> float: 

1992 """Cold server polling interval (auto-derived from gateway_auto_refresh_interval). 

1993 

1994 Cold servers (remaining 80%) are polled at 3x the gateway refresh rate to save resources. 

1995 

1996 Examples: 

1997 - gateway_auto_refresh_interval=300s → cold=900s (15 minutes) 

1998 - gateway_auto_refresh_interval=60s → cold=180s (3 minutes) 

1999 

2000 Returns: 

2001 float: Cold server check interval in seconds (3x gateway_auto_refresh_interval) 

2002 """ 

2003 return float(self.gateway_auto_refresh_interval * 3) 

2004 

2005 @field_validator("well_known_security_txt_enabled", mode="after") 

2006 @classmethod 

2007 def _auto_enable_security_txt(cls, v: Any, info: ValidationInfo) -> bool: 

2008 """Auto-enable security.txt if content is provided. 

2009 

2010 Args: 

2011 v: The current value of well_known_security_txt_enabled. 

2012 info: ValidationInfo containing field data. 

2013 

2014 Returns: 

2015 bool: True if security.txt content is provided, otherwise the original value. 

2016 """ 

2017 if info.data and "well_known_security_txt" in info.data: 

2018 return bool(info.data["well_known_security_txt"].strip()) 

2019 return bool(v) 

2020 

2021 @field_validator("experimental_rust_mcp_runtime_uds", mode="after") 

2022 @classmethod 

2023 def _validate_experimental_rust_mcp_runtime_uds(cls, value: Optional[str]) -> Optional[str]: 

2024 """Validate the optional UDS path used for the Rust MCP runtime sidecar. 

2025 

2026 Args: 

2027 value: Candidate UDS path from configuration. 

2028 

2029 Returns: 

2030 The normalized absolute UDS path, or ``None`` when unset. 

2031 

2032 Raises: 

2033 ValueError: If the path is not absolute or its parent directory is missing. 

2034 """ 

2035 if value in (None, ""): 

2036 return None 

2037 

2038 uds_path = Path(value).expanduser() 

2039 if not uds_path.is_absolute(): 

2040 raise ValueError("experimental_rust_mcp_runtime_uds must be an absolute path") 

2041 if not uds_path.parent.exists(): 

2042 raise ValueError(f"experimental_rust_mcp_runtime_uds parent directory does not exist: {uds_path.parent}") 

2043 return str(uds_path) 

2044 

2045 # ------------------------------- 

2046 # Flexible list parsing for envs 

2047 # ------------------------------- 

2048 @field_validator( 

2049 "sso_entra_admin_groups", 

2050 "sso_trusted_domains", 

2051 "sso_auto_admin_domains", 

2052 "sso_github_admin_orgs", 

2053 "sso_google_admin_domains", 

2054 "insecure_queryparam_auth_allowed_hosts", 

2055 "mcpgateway_ui_hide_sections", 

2056 "mcpgateway_ui_hide_header_items", 

2057 "mcpgateway_ui_hide_sections_admin", 

2058 "mcpgateway_ui_hide_header_items_admin", 

2059 "tool_description_forbidden_patterns", 

2060 mode="before", 

2061 ) 

2062 @classmethod 

2063 def _parse_list_from_env(cls, v: None | str | list[str]) -> list[str]: 

2064 """Parse list fields from environment values. 

2065 

2066 Accepts either JSON arrays (e.g. '["a","b"]') or comma-separated 

2067 strings (e.g. 'a,b'). Empty or None becomes an empty list. 

2068 

2069 Args: 

2070 v: The value to parse, can be None, list, or string. 

2071 

2072 Returns: 

2073 list: Parsed list of values. 

2074 

2075 Raises: 

2076 ValueError: If the value type is invalid for list field parsing. 

2077 """ 

2078 if v is None: 

2079 return [] 

2080 if isinstance(v, list): 

2081 return v 

2082 if isinstance(v, str): 

2083 s = v.strip() 

2084 if not s: 

2085 return [] 

2086 if s.startswith("["): 

2087 try: 

2088 parsed = orjson.loads(s) 

2089 return parsed if isinstance(parsed, list) else [] 

2090 except Exception: 

2091 logger.warning("Invalid JSON list in env for list field; falling back to CSV parsing") 

2092 # CSV fallback 

2093 return [item.strip() for item in s.split(",") if item.strip()] 

2094 raise ValueError("Invalid type for list field") 

2095 

2096 @field_validator("tool_description_forbidden_patterns", mode="after") 

2097 @classmethod 

2098 def _filter_empty_forbidden_patterns(cls, value: list[str]) -> list[str]: 

2099 """Strip empty/blank entries that would match every description. 

2100 

2101 Args: 

2102 value: List of forbidden pattern strings. 

2103 

2104 Returns: 

2105 list[str]: Filtered list with empty/blank entries removed. 

2106 """ 

2107 return [p for p in value if p and p.strip()] 

2108 

2109 @field_validator("mcpgateway_ui_hide_sections", "mcpgateway_ui_hide_sections_admin", mode="after") 

2110 @classmethod 

2111 def _validate_ui_hide_sections(cls, value: list[str]) -> list[str]: 

2112 """Normalize and filter hidable UI sections. 

2113 

2114 Args: 

2115 value: Candidate section identifiers from environment/config. 

2116 

2117 Returns: 

2118 list[str]: Normalized unique section identifiers. 

2119 """ 

2120 normalized: list[str] = [] 

2121 seen: set[str] = set() 

2122 

2123 for item in value: 

2124 candidate = str(item).strip().lower() 

2125 if not candidate: 

2126 continue 

2127 candidate = UI_HIDE_SECTION_ALIASES.get(candidate, candidate) 

2128 if candidate not in UI_HIDABLE_SECTIONS: 

2129 logger.warning("Ignoring invalid MCPGATEWAY_UI_HIDE_SECTIONS item: %s", item) 

2130 continue 

2131 if candidate not in seen: 

2132 seen.add(candidate) 

2133 normalized.append(candidate) 

2134 

2135 return normalized 

2136 

2137 @field_validator("mcpgateway_ui_hide_header_items", "mcpgateway_ui_hide_header_items_admin", mode="after") 

2138 @classmethod 

2139 def _validate_ui_hide_header_items(cls, value: list[str]) -> list[str]: 

2140 """Normalize and filter hidable header items. 

2141 

2142 Args: 

2143 value: Candidate header identifiers from environment/config. 

2144 

2145 Returns: 

2146 list[str]: Normalized unique header identifiers. 

2147 """ 

2148 normalized: list[str] = [] 

2149 seen: set[str] = set() 

2150 

2151 for item in value: 

2152 candidate = str(item).strip().lower() 

2153 if not candidate: 

2154 continue 

2155 if candidate not in UI_HIDABLE_HEADER_ITEMS: 

2156 logger.warning("Ignoring invalid MCPGATEWAY_UI_HIDE_HEADER_ITEMS item: %s", item) 

2157 continue 

2158 if candidate not in seen: 

2159 seen.add(candidate) 

2160 normalized.append(candidate) 

2161 

2162 return normalized 

2163 

2164 @property 

2165 def api_key(self) -> str: 

2166 """ 

2167 Generate API key from auth credentials. 

2168 

2169 Returns: 

2170 str: API key string in the format "username:password". 

2171 

2172 Examples: 

2173 >>> from mcpgateway.config import Settings 

2174 >>> settings = Settings(basic_auth_user="admin", basic_auth_password="secret") 

2175 >>> settings.api_key 

2176 'admin:secret' 

2177 >>> settings = Settings(basic_auth_user="user123", basic_auth_password="pass456") 

2178 >>> settings.api_key 

2179 'user123:pass456' 

2180 """ 

2181 return f"{self.basic_auth_user}:{self.basic_auth_password.get_secret_value()}" 

2182 

2183 @property 

2184 def supports_http(self) -> bool: 

2185 """Check if HTTP transport is enabled. 

2186 

2187 Returns: 

2188 bool: True if HTTP transport is enabled, False otherwise. 

2189 

2190 Examples: 

2191 >>> settings = Settings(transport_type="http") 

2192 >>> settings.supports_http 

2193 True 

2194 >>> settings = Settings(transport_type="all") 

2195 >>> settings.supports_http 

2196 True 

2197 >>> settings = Settings(transport_type="ws") 

2198 >>> settings.supports_http 

2199 False 

2200 """ 

2201 return self.transport_type in ["http", "all"] 

2202 

2203 @property 

2204 def supports_websocket(self) -> bool: 

2205 """Check if WebSocket transport is enabled. 

2206 

2207 Returns: 

2208 bool: True if WebSocket transport is enabled, False otherwise. 

2209 

2210 Examples: 

2211 >>> settings = Settings(transport_type="ws") 

2212 >>> settings.supports_websocket 

2213 True 

2214 >>> settings = Settings(transport_type="all") 

2215 >>> settings.supports_websocket 

2216 True 

2217 >>> settings = Settings(transport_type="http") 

2218 >>> settings.supports_websocket 

2219 False 

2220 """ 

2221 return self.transport_type in ["ws", "all"] 

2222 

2223 @property 

2224 def supports_sse(self) -> bool: 

2225 """Check if SSE transport is enabled. 

2226 

2227 Returns: 

2228 bool: True if SSE transport is enabled, False otherwise. 

2229 

2230 Examples: 

2231 >>> settings = Settings(transport_type="sse") 

2232 >>> settings.supports_sse 

2233 True 

2234 >>> settings = Settings(transport_type="all") 

2235 >>> settings.supports_sse 

2236 True 

2237 >>> settings = Settings(transport_type="http") 

2238 >>> settings.supports_sse 

2239 False 

2240 """ 

2241 return self.transport_type in ["sse", "all"] 

2242 

2243 class DatabaseSettings(TypedDict): 

2244 """TypedDict for SQLAlchemy database settings.""" 

2245 

2246 pool_size: int 

2247 max_overflow: int 

2248 pool_timeout: int 

2249 pool_recycle: int 

2250 connect_args: dict[str, Any] # consider more specific type if needed 

2251 

2252 @property 

2253 def database_settings(self) -> DatabaseSettings: 

2254 """ 

2255 Get SQLAlchemy database settings. 

2256 

2257 Returns: 

2258 DatabaseSettings: Dictionary containing SQLAlchemy database configuration options. 

2259 

2260 Examples: 

2261 >>> from mcpgateway.config import Settings 

2262 >>> s = Settings(database_url='sqlite:///./test.db') 

2263 >>> isinstance(s.database_settings, dict) 

2264 True 

2265 """ 

2266 return { 

2267 "pool_size": self.db_pool_size, 

2268 "max_overflow": self.db_max_overflow, 

2269 "pool_timeout": self.db_pool_timeout, 

2270 "pool_recycle": self.db_pool_recycle, 

2271 "connect_args": {"check_same_thread": False} if self.database_url.startswith("sqlite") else {}, 

2272 } 

2273 

2274 class CORSSettings(TypedDict): 

2275 """TypedDict for CORS settings.""" 

2276 

2277 allow_origins: NotRequired[List[str]] 

2278 allow_credentials: NotRequired[bool] 

2279 allow_methods: NotRequired[List[str]] 

2280 allow_headers: NotRequired[List[str]] 

2281 

2282 @property 

2283 def cors_settings(self) -> CORSSettings: 

2284 """Get CORS settings. 

2285 

2286 Returns: 

2287 CORSSettings: Dictionary containing CORS configuration options. 

2288 

2289 Examples: 

2290 >>> s = Settings(cors_enabled=True, allowed_origins={'http://localhost'}) 

2291 >>> cors = s.cors_settings 

2292 >>> cors['allow_origins'] 

2293 ['http://localhost'] 

2294 >>> cors['allow_credentials'] 

2295 True 

2296 >>> s2 = Settings(cors_enabled=False) 

2297 >>> s2.cors_settings 

2298 {} 

2299 """ 

2300 return ( 

2301 { 

2302 "allow_origins": list(self.allowed_origins), 

2303 "allow_credentials": True, 

2304 "allow_methods": ["*"], 

2305 "allow_headers": ["*"], 

2306 } 

2307 if self.cors_enabled 

2308 else {} 

2309 ) 

2310 

2311 def validate_transport(self) -> None: 

2312 """ 

2313 Validate transport configuration. 

2314 

2315 Raises: 

2316 ValueError: If the transport type is not one of the valid options. 

2317 

2318 Examples: 

2319 >>> from mcpgateway.config import Settings 

2320 >>> s = Settings(transport_type='http') 

2321 >>> s.validate_transport() # no error 

2322 >>> s2 = Settings(transport_type='invalid') 

2323 >>> try: 

2324 ... s2.validate_transport() 

2325 ... except ValueError as e: 

2326 ... print('error') 

2327 error 

2328 """ 

2329 # valid_types = {"http", "ws", "sse", "all"} 

2330 valid_types = {"sse", "streamablehttp", "all", "http"} 

2331 if self.transport_type not in valid_types: 

2332 raise ValueError(f"Invalid transport type. Must be one of: {valid_types}") 

2333 

2334 def validate_database(self) -> None: 

2335 """Validate database configuration. 

2336 

2337 Examples: 

2338 >>> from mcpgateway.config import Settings 

2339 >>> s = Settings(database_url='sqlite:///./test.db') 

2340 >>> s.validate_database() # Should create the directory if it does not exist 

2341 """ 

2342 if self.database_url.startswith("sqlite"): 

2343 db_path = Path(self.database_url.replace("sqlite:///", "")) 

2344 db_dir = db_path.parent 

2345 if not db_dir.exists(): 

2346 db_dir.mkdir(parents=True) 

2347 

2348 # Validation patterns for safe display (configurable) 

2349 validation_dangerous_html_pattern: str = ( 

2350 r"<(script|iframe|object|embed|link|meta|base|form|img|svg|video|audio|source|track|area|map|canvas|applet|frame|frameset|html|head|body|style)\b|</*(script|iframe|object|embed|link|meta|base|form|img|svg|video|audio|source|track|area|map|canvas|applet|frame|frameset|html|head|body|style)>" 

2351 ) 

2352 

2353 validation_dangerous_js_pattern: str = r"(?i)(?:^|\s|[\"'`<>=])(javascript:|vbscript:|data:\s*[^,]*[;\s]*(javascript|vbscript)|\bon[a-z]+\s*=|<\s*script\b)" 

2354 

2355 validation_allowed_url_schemes: List[str] = ["http://", "https://", "ws://", "wss://"] 

2356 

2357 # Character validation patterns 

2358 validation_name_pattern: str = r"^[a-zA-Z0-9_.\- ]+$" # Allow spaces for names (literal space, not \s to reject control chars) 

2359 validation_identifier_pattern: str = r"^[a-zA-Z0-9_\-\.]+$" # No spaces for IDs 

2360 validation_safe_uri_pattern: str = r"^[a-zA-Z0-9_\-.:/?=&%{}]+$" 

2361 validation_unsafe_uri_pattern: str = r'[<>"\'\\]' 

2362 validation_tool_name_pattern: str = r"^[a-zA-Z0-9_][a-zA-Z0-9._/-]*$" # MCP tool naming per SEP-986 

2363 validation_tool_method_pattern: str = r"^[a-zA-Z][a-zA-Z0-9_\./-]*$" 

2364 

2365 # MCP-compliant size limits (configurable via env) 

2366 validation_max_name_length: int = 255 

2367 validation_max_description_length: int = 8192 # 8KB 

2368 validation_max_template_length: int = 65536 # 64KB 

2369 validation_max_content_length: int = 1048576 # 1MB 

2370 validation_max_json_depth: int = Field( 

2371 default=int(os.getenv("VALIDATION_MAX_JSON_DEPTH", "30")), 

2372 description=( 

2373 "Maximum allowed JSON nesting depth for tool/resource schemas. " 

2374 "Increased from 10 to 30 for compatibility with deeply nested schemas " 

2375 "like Notion MCP (issue #1542). Override with VALIDATION_MAX_JSON_DEPTH " 

2376 "environment variable. Minimum: 1, Maximum: 100" 

2377 ), 

2378 ge=1, 

2379 le=100, 

2380 ) 

2381 validation_max_url_length: int = 2048 

2382 validation_max_rpc_param_size: int = 262144 # 256KB 

2383 

2384 validation_max_method_length: int = 128 

2385 

2386 # Allowed MIME types 

2387 validation_allowed_mime_types: List[str] = [ 

2388 "text/plain", 

2389 "text/html", 

2390 "text/css", 

2391 "text/markdown", 

2392 "text/javascript", 

2393 "application/json", 

2394 "application/xml", 

2395 "application/pdf", 

2396 "image/png", 

2397 "image/jpeg", 

2398 "image/gif", 

2399 "image/svg+xml", 

2400 "application/octet-stream", 

2401 ] 

2402 

2403 # Rate limiting 

2404 validation_max_requests_per_minute: int = 60 

2405 

2406 # Header passthrough feature (disabled by default for security) 

2407 enable_header_passthrough: bool = Field(default=False, description="Enable HTTP header passthrough feature (WARNING: Security implications - only enable if needed)") 

2408 enable_overwrite_base_headers: bool = Field(default=False, description="Enable overwriting of base headers") 

2409 

2410 # Passthrough headers configuration 

2411 default_passthrough_headers: List[str] = Field(default_factory=list) 

2412 

2413 # Passthrough headers source priority 

2414 # - "env": Environment variable always wins (ideal for Kubernetes/containerized deployments) 

2415 # - "db": Database take precedence if configured, env as fallback (default) 

2416 # - "merge": Union of both sources - env provides base, other configuration in DB can add more headers 

2417 passthrough_headers_source: Literal["env", "db", "merge"] = Field( 

2418 default="db", 

2419 description="Source priority for passthrough headers: env (environment always wins), db (database wins, default), merge (combine both)", 

2420 ) 

2421 

2422 # =================================== 

2423 # Pagination Configuration 

2424 # =================================== 

2425 

2426 # Default number of items per page for paginated endpoints 

2427 pagination_default_page_size: int = Field(default=50, ge=1, le=1000, description="Default number of items per page") 

2428 

2429 # Maximum allowed items per page (prevents abuse) 

2430 pagination_max_page_size: int = Field(default=500, ge=1, le=10000, description="Maximum allowed items per page") 

2431 

2432 # Minimum items per page 

2433 pagination_min_page_size: int = Field(default=1, ge=1, description="Minimum items per page") 

2434 

2435 # Threshold for switching from offset to cursor-based pagination 

2436 pagination_cursor_threshold: int = Field(default=10000, ge=1, description="Threshold for cursor-based pagination") 

2437 

2438 # Enable cursor-based pagination globally 

2439 pagination_cursor_enabled: bool = Field(default=True, description="Enable cursor-based pagination") 

2440 

2441 # Default sort field for paginated queries 

2442 pagination_default_sort_field: str = Field(default="created_at", description="Default sort field") 

2443 

2444 # Default sort order for paginated queries 

2445 pagination_default_sort_order: str = Field(default="desc", pattern="^(asc|desc)$", description="Default sort order") 

2446 

2447 # Maximum offset allowed for offset-based pagination (prevents abuse) 

2448 pagination_max_offset: int = Field(default=100000, ge=0, description="Maximum offset for pagination") 

2449 

2450 # Cache pagination counts for performance (seconds) 

2451 pagination_count_cache_ttl: int = Field(default=300, ge=0, description="Cache TTL for pagination counts") 

2452 

2453 # Enable pagination links in API responses 

2454 pagination_include_links: bool = Field(default=True, description="Include pagination links") 

2455 

2456 # Base URL for pagination links (defaults to request URL) 

2457 pagination_base_url: Optional[str] = Field(default=None, description="Base URL for pagination links") 

2458 

2459 # Ed25519 keys for signing 

2460 enable_ed25519_signing: bool = Field(default=False, description="Enable Ed25519 signing for certificates") 

2461 prev_ed25519_private_key: SecretStr = Field(default=SecretStr(""), description="Previous Ed25519 private key for signing") 

2462 prev_ed25519_public_key: Optional[str] = Field(default=None, description="Derived previous Ed25519 public key") 

2463 ed25519_private_key: SecretStr = Field(default=SecretStr(""), description="Ed25519 private key for signing") 

2464 ed25519_public_key: Optional[str] = Field(default=None, description="Derived Ed25519 public key") 

2465 

2466 @model_validator(mode="after") 

2467 def derive_public_keys(self) -> "Settings": 

2468 """ 

2469 Derive public keys after all individual field validations are complete. 

2470 

2471 Returns: 

2472 Settings: The updated Settings instance with derived public keys. 

2473 """ 

2474 for private_key_field in ["ed25519_private_key", "prev_ed25519_private_key"]: 

2475 public_key_field = private_key_field.replace("private", "public") 

2476 

2477 # 1. Get the private key SecretStr object 

2478 private_key_secret: SecretStr = getattr(self, private_key_field) 

2479 

2480 # 2. Proceed only if a key is present and the public key hasn't been set 

2481 pem = private_key_secret.get_secret_value().strip() 

2482 if not pem: 

2483 continue 

2484 

2485 try: 

2486 # Load the private key 

2487 private_key = serialization.load_pem_private_key(pem.encode(), password=None) 

2488 if not isinstance(private_key, ed25519.Ed25519PrivateKey): 

2489 # This check is useful, though model_validator should not raise 

2490 # for an invalid key if the field validator has already passed. 

2491 continue 

2492 

2493 # Derive and PEM-encode the public key 

2494 public_key = private_key.public_key() 

2495 public_pem = public_key.public_bytes( 

2496 encoding=serialization.Encoding.PEM, 

2497 format=serialization.PublicFormat.SubjectPublicKeyInfo, 

2498 ).decode() 

2499 

2500 # 3. Set the public key attribute directly on the model instance (self) 

2501 setattr(self, public_key_field, public_pem) 

2502 # logger.info(f"Derived and stored {public_key_field} automatically.") 

2503 

2504 except Exception: 

2505 logger.warning("Failed to derive public key for private_key") 

2506 # You can choose to raise an error here if a failure should halt model creation 

2507 

2508 return self 

2509 

2510 def __init__(self, **kwargs: Any) -> None: 

2511 """Initialize Settings with environment variable parsing. 

2512 

2513 Args: 

2514 **kwargs: Keyword arguments passed to parent Settings class 

2515 

2516 Raises: 

2517 ValueError: When environment variable parsing fails or produces invalid data 

2518 

2519 Examples: 

2520 >>> import os 

2521 >>> # Test with no environment variable set 

2522 >>> old_val = os.environ.get('DEFAULT_PASSTHROUGH_HEADERS') 

2523 >>> if 'DEFAULT_PASSTHROUGH_HEADERS' in os.environ: 

2524 ... del os.environ['DEFAULT_PASSTHROUGH_HEADERS'] 

2525 >>> s = Settings() 

2526 >>> s.default_passthrough_headers 

2527 ['X-Tenant-Id', 'X-Trace-Id'] 

2528 >>> # Restore original value if it existed 

2529 >>> if old_val is not None: 

2530 ... os.environ['DEFAULT_PASSTHROUGH_HEADERS'] = old_val 

2531 """ 

2532 super().__init__(**kwargs) 

2533 

2534 # Parse DEFAULT_PASSTHROUGH_HEADERS environment variable 

2535 default_value = os.environ.get("DEFAULT_PASSTHROUGH_HEADERS") 

2536 if default_value: 

2537 try: 

2538 # Try JSON parsing first 

2539 self.default_passthrough_headers = orjson.loads(default_value) 

2540 if not isinstance(self.default_passthrough_headers, list): 

2541 raise ValueError("Must be a JSON array") 

2542 except (orjson.JSONDecodeError, ValueError): 

2543 # Fallback to comma-separated parsing 

2544 self.default_passthrough_headers = [h.strip() for h in default_value.split(",") if h.strip()] 

2545 logger.info(f"Parsed comma-separated passthrough headers: {self.default_passthrough_headers}") 

2546 else: 

2547 # Safer defaults without Authorization header 

2548 self.default_passthrough_headers = ["X-Tenant-Id", "X-Trace-Id"] 

2549 

2550 # Configure environment-aware CORS origins if not explicitly set via env or kwargs 

2551 # Only apply defaults if using the default allowed_origins value 

2552 if not os.environ.get("ALLOWED_ORIGINS") and "allowed_origins" not in kwargs and self.allowed_origins == {"http://localhost", "http://localhost:4444"}: 

2553 if self.environment == "development": 

2554 self.allowed_origins = { 

2555 "http://localhost", 

2556 "http://localhost:3000", 

2557 "http://localhost:8080", 

2558 "http://127.0.0.1:3000", 

2559 "http://127.0.0.1:8080", 

2560 f"http://localhost:{self.port}", 

2561 f"http://127.0.0.1:{self.port}", 

2562 } 

2563 else: 

2564 # Production origins - construct from app_domain (extract hostname from HttpUrl) 

2565 app_domain_host = urlparse(str(self.app_domain)).hostname or "localhost" 

2566 self.allowed_origins = {f"https://{app_domain_host}", f"https://app.{app_domain_host}", f"https://admin.{app_domain_host}"} 

2567 

2568 # MCP transport auth policy: 

2569 # - If MCP_REQUIRE_AUTH is unset, derive it from AUTH_REQUIRED 

2570 # - If AUTH_REQUIRED=true but MCP_REQUIRE_AUTH=false is explicit, emit a warning 

2571 if self.mcp_require_auth is None: 

2572 self.mcp_require_auth = bool(self.auth_required) 

2573 logger.info( 

2574 "MCP_REQUIRE_AUTH not set; defaulting to %s to match AUTH_REQUIRED=%s.", 

2575 self.mcp_require_auth, 

2576 self.auth_required, 

2577 ) 

2578 elif self.auth_required and self.mcp_require_auth is False: 

2579 logger.warning("AUTH_REQUIRED=true but MCP_REQUIRE_AUTH=false. MCP endpoints (/servers/*/mcp) allow unauthenticated access to public items.") 

2580 

2581 # Validate proxy auth configuration 

2582 if not self.mcp_client_auth_enabled and self.trust_proxy_auth and not self.trust_proxy_auth_dangerously: 

2583 logger.warning( 

2584 "TRUST_PROXY_AUTH=true ignored because TRUST_PROXY_AUTH_DANGEROUSLY is false " 

2585 "while MCP_CLIENT_AUTH_ENABLED=false. Set TRUST_PROXY_AUTH_DANGEROUSLY=true " 

2586 "only behind a strictly trusted authentication proxy." 

2587 ) 

2588 self.trust_proxy_auth = False 

2589 elif not self.mcp_client_auth_enabled and self.trust_proxy_auth and self.trust_proxy_auth_dangerously: 

2590 logger.warning("TRUST_PROXY_AUTH_DANGEROUSLY=true acknowledged. Requests may trust identity headers from the upstream proxy.") 

2591 elif not self.mcp_client_auth_enabled and not self.trust_proxy_auth: 

2592 logger.warning( 

2593 "MCP client authentication is disabled but trust_proxy_auth is not set. " 

2594 "This is a security risk! Set TRUST_PROXY_AUTH=true only if ContextForge " 

2595 "is behind a trusted authentication proxy." 

2596 ) 

2597 

2598 if not self.auth_required and self.allow_unauthenticated_admin: 

2599 logger.warning("ALLOW_UNAUTHENTICATED_ADMIN=true acknowledged while AUTH_REQUIRED=false. Unauthenticated requests may receive admin context.") 

2600 

2601 # Masking value for all sensitive data 

2602 masked_auth_value: str = "*****" 

2603 

2604 def log_summary(self) -> None: 

2605 """ 

2606 Log a summary of the application settings. 

2607 

2608 Dumps the current settings to a dictionary while excluding sensitive 

2609 information such as `database_url` and `memcached_url`, and logs it 

2610 at the INFO level. 

2611 

2612 This method is useful for debugging or auditing purposes without 

2613 exposing credentials or secrets in logs. 

2614 """ 

2615 summary = self.model_dump(exclude={"database_url", "memcached_url"}) 

2616 logger.info(f"Application settings summary: {summary}") 

2617 

2618 ENABLE_METRICS: bool = Field(False, description="Enable Prometheus metrics endpoint at /metrics/prometheus (requires authentication)") 

2619 METRICS_EXCLUDED_HANDLERS: str = Field("", description="Comma-separated regex patterns for paths to exclude from metrics") 

2620 METRICS_NAMESPACE: str = Field("default", description="Prometheus metrics namespace") 

2621 METRICS_SUBSYSTEM: str = Field("", description="Prometheus metrics subsystem") 

2622 METRICS_CUSTOM_LABELS: str = Field("", description='Comma-separated "key=value" pairs for static custom labels') 

2623 

2624 

2625@lru_cache() 

2626def get_settings(**kwargs: Any) -> Settings: 

2627 """Get cached settings instance. 

2628 

2629 Args: 

2630 **kwargs: Keyword arguments to pass to the Settings setup. 

2631 

2632 Returns: 

2633 Settings: A cached instance of the Settings class. 

2634 

2635 Examples: 

2636 >>> settings = get_settings() 

2637 >>> isinstance(settings, Settings) 

2638 True 

2639 >>> # Second call returns the same cached instance 

2640 >>> settings2 = get_settings() 

2641 >>> settings is settings2 

2642 True 

2643 """ 

2644 # Instantiate a fresh Pydantic Settings object, 

2645 # loading from env vars or .env exactly once. 

2646 cfg = Settings(**kwargs) 

2647 # Validate that transport_type is correct; will 

2648 # raise if mis-configured. 

2649 cfg.validate_transport() 

2650 # Ensure sqlite DB directories exist if needed. 

2651 cfg.validate_database() 

2652 # Return the one-and-only Settings instance (cached). 

2653 return cfg 

2654 

2655 

2656def generate_settings_schema() -> dict[str, Any]: 

2657 """ 

2658 Return the JSON Schema describing the Settings model. 

2659 

2660 This schema can be used for validation or documentation purposes. 

2661 

2662 Returns: 

2663 dict: A dictionary representing the JSON Schema of the Settings model. 

2664 """ 

2665 return Settings.model_json_schema(mode="validation") 

2666 

2667 

2668# Lazy "instance" of settings 

2669class LazySettingsWrapper: 

2670 """Lazily initialize settings singleton on getattr""" 

2671 

2672 @property 

2673 def plugins(self) -> Any: 

2674 """Access plugin framework settings via ``settings.plugins``. 

2675 

2676 Returns a ``LazySettingsWrapper`` from the plugin framework that 

2677 provides lightweight ``@property`` accessors for startup-critical 

2678 fields and a ``__getattr__`` fallback to the full ``PluginsSettings``. 

2679 

2680 Returns: 

2681 The plugin framework settings wrapper. 

2682 """ 

2683 # First-Party 

2684 from mcpgateway.plugins.framework.settings import settings as _plugin_settings # pylint: disable=import-outside-toplevel 

2685 

2686 return _plugin_settings 

2687 

2688 def __getattr__(self, key: str) -> Any: 

2689 """Get the real settings object and forward to it 

2690 

2691 Args: 

2692 key: The key to fetch from settings 

2693 

2694 Returns: 

2695 Any: The value of the attribute on the settings 

2696 """ 

2697 return getattr(get_settings(), key) 

2698 

2699 

2700settings = LazySettingsWrapper() 

2701 

2702 

2703if __name__ == "__main__": 

2704 if "--schema" in sys.argv: 

2705 schema = generate_settings_schema() 

2706 print(orjson.dumps(schema, option=orjson.OPT_INDENT_2).decode()) 

2707 sys.exit(0) 

2708 settings.log_summary()