Coverage for mcpgateway / utils / sso_bootstrap.py: 100%

94 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/utils/sso_bootstrap.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Bootstrap SSO providers with predefined configurations. 

8""" 

9 

10# Future 

11from __future__ import annotations 

12 

13# Standard 

14import json 

15import logging 

16from typing import Any, Dict, List 

17 

18# First-Party 

19from mcpgateway.config import settings 

20from mcpgateway.services.sso_service import ADFS_PROVIDER_ID 

21 

22logger = logging.getLogger(__name__) 

23 

24 

25def get_predefined_sso_providers() -> List[Dict]: 

26 """Get list of predefined SSO providers based on environment configuration. 

27 

28 Returns: 

29 List of SSO provider configurations ready for database storage. 

30 

31 Examples: 

32 Default (no providers configured): 

33 >>> providers = get_predefined_sso_providers() 

34 >>> isinstance(providers, list) 

35 True 

36 

37 Patch configuration to include GitHub provider: 

38 >>> from types import SimpleNamespace 

39 >>> from unittest.mock import patch 

40 >>> cfg = SimpleNamespace( 

41 ... sso_github_enabled=True, 

42 ... sso_github_client_id='id', 

43 ... sso_github_client_secret='sec', 

44 ... sso_trusted_domains=[], 

45 ... sso_auto_create_users=True, 

46 ... sso_google_enabled=False, 

47 ... sso_ibm_verify_enabled=False, 

48 ... sso_okta_enabled=False, 

49 ... sso_entra_enabled=False, 

50 ... ) 

51 >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg): 

52 ... result = get_predefined_sso_providers() 

53 >>> isinstance(result, list) 

54 True 

55 

56 Patch configuration to include Google provider: 

57 >>> cfg = SimpleNamespace( 

58 ... sso_github_enabled=False, sso_github_client_id=None, sso_github_client_secret=None, 

59 ... sso_trusted_domains=[], sso_auto_create_users=True, 

60 ... sso_google_enabled=True, sso_google_client_id='gid', sso_google_client_secret='gsec', 

61 ... sso_ibm_verify_enabled=False, sso_okta_enabled=False, sso_entra_enabled=False 

62 ... ) 

63 >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg): 

64 ... result = get_predefined_sso_providers() 

65 >>> isinstance(result, list) 

66 True 

67 

68 Patch configuration to include Okta provider: 

69 >>> cfg = SimpleNamespace( 

70 ... sso_github_enabled=False, sso_github_client_id=None, sso_github_client_secret=None, 

71 ... sso_trusted_domains=[], sso_auto_create_users=True, 

72 ... sso_google_enabled=False, sso_okta_enabled=True, sso_okta_client_id='ok', sso_okta_client_secret='os', sso_okta_issuer='https://company.okta.com', 

73 ... sso_ibm_verify_enabled=False, sso_entra_enabled=False 

74 ... ) 

75 >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg): 

76 ... result = get_predefined_sso_providers() 

77 >>> isinstance(result, list) 

78 True 

79 

80 Patch configuration to include Microsoft Entra ID provider: 

81 >>> cfg = SimpleNamespace( 

82 ... sso_github_enabled=False, sso_github_client_id=None, sso_github_client_secret=None, 

83 ... sso_trusted_domains=[], sso_auto_create_users=True, 

84 ... sso_google_enabled=False, sso_okta_enabled=False, 

85 ... sso_ibm_verify_enabled=False, sso_entra_enabled=True, sso_entra_client_id='entra_client', sso_entra_client_secret='entra_secret', sso_entra_tenant_id='tenant-id-123', 

86 ... sso_generic_enabled=False 

87 ... ) 

88 >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg): 

89 ... result = get_predefined_sso_providers() 

90 >>> isinstance(result, list) 

91 True 

92 

93 Patch configuration to include Generic OIDC provider: 

94 >>> cfg = SimpleNamespace( 

95 ... sso_github_enabled=False, sso_github_client_id=None, sso_github_client_secret=None, 

96 ... sso_trusted_domains=[], sso_auto_create_users=True, 

97 ... sso_google_enabled=False, sso_okta_enabled=False, sso_ibm_verify_enabled=False, sso_entra_enabled=False, 

98 ... sso_generic_enabled=True, sso_generic_provider_id='keycloak', sso_generic_display_name='Keycloak', 

99 ... sso_generic_client_id='kc_client', sso_generic_client_secret='kc_secret', 

100 ... sso_generic_authorization_url='https://keycloak.company.com/auth/realms/master/protocol/openid-connect/auth', 

101 ... sso_generic_token_url='https://keycloak.company.com/auth/realms/master/protocol/openid-connect/token', 

102 ... sso_generic_userinfo_url='https://keycloak.company.com/auth/realms/master/protocol/openid-connect/userinfo', 

103 ... sso_generic_issuer='https://keycloak.company.com/auth/realms/master', 

104 ... sso_generic_jwks_uri='https://keycloak.company.com/auth/realms/master/protocol/openid-connect/certs', 

105 ... sso_generic_scope='openid profile email' 

106 ... ) 

107 >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg): 

108 ... result = get_predefined_sso_providers() 

109 >>> isinstance(result, list) 

110 True 

111 """ 

112 providers = [] 

113 

114 # GitHub OAuth Provider 

115 if settings.sso_github_enabled and settings.sso_github_client_id: 

116 providers.append( 

117 { 

118 "id": "github", 

119 "name": "github", 

120 "display_name": "GitHub", 

121 "provider_type": "oauth2", 

122 "client_id": settings.sso_github_client_id, 

123 "client_secret": settings.sso_github_client_secret.get_secret_value() if settings.sso_github_client_secret else "", 

124 "authorization_url": "https://github.com/login/oauth/authorize", 

125 "token_url": "https://github.com/login/oauth/access_token", # nosec B105 - public OAuth endpoint 

126 "userinfo_url": "https://api.github.com/user", 

127 "scope": "user:email", 

128 "trusted_domains": settings.sso_trusted_domains, 

129 "auto_create_users": settings.sso_auto_create_users, 

130 "team_mapping": {}, 

131 } 

132 ) 

133 

134 # Google OAuth Provider 

135 if settings.sso_google_enabled and settings.sso_google_client_id: 

136 providers.append( 

137 { 

138 "id": "google", 

139 "name": "google", 

140 "display_name": "Google", 

141 "provider_type": "oidc", 

142 "client_id": settings.sso_google_client_id, 

143 "client_secret": settings.sso_google_client_secret.get_secret_value() if settings.sso_google_client_secret else "", 

144 "authorization_url": "https://accounts.google.com/o/oauth2/auth", 

145 "token_url": "https://oauth2.googleapis.com/token", # nosec B105 - public OAuth endpoint 

146 "userinfo_url": "https://openidconnect.googleapis.com/v1/userinfo", 

147 "issuer": "https://accounts.google.com", 

148 "scope": "openid profile email", 

149 "trusted_domains": settings.sso_trusted_domains, 

150 "auto_create_users": settings.sso_auto_create_users, 

151 "team_mapping": {}, 

152 } 

153 ) 

154 

155 # IBM Security Verify Provider 

156 if settings.sso_ibm_verify_enabled and settings.sso_ibm_verify_client_id: 

157 base_url = settings.sso_ibm_verify_issuer or "https://tenant.verify.ibm.com" 

158 providers.append( 

159 { 

160 "id": "ibm_verify", 

161 "name": "ibm_verify", 

162 "display_name": "IBM Security Verify", 

163 "provider_type": "oidc", 

164 "client_id": settings.sso_ibm_verify_client_id, 

165 "client_secret": settings.sso_ibm_verify_client_secret.get_secret_value() if settings.sso_ibm_verify_client_secret else "", 

166 "authorization_url": f"{base_url}/oidc/endpoint/default/authorize", 

167 "token_url": f"{base_url}/oidc/endpoint/default/token", 

168 "userinfo_url": f"{base_url}/oidc/endpoint/default/userinfo", 

169 "issuer": f"{base_url}/oidc/endpoint/default", 

170 "scope": "openid profile email", 

171 "trusted_domains": settings.sso_trusted_domains, 

172 "auto_create_users": settings.sso_auto_create_users, 

173 "team_mapping": {}, 

174 } 

175 ) 

176 

177 # Okta Provider 

178 if settings.sso_okta_enabled and settings.sso_okta_client_id: 

179 base_url = settings.sso_okta_issuer or "https://company.okta.com" 

180 okta_team_mapping: Dict[str, Any] = {} 

181 if settings.okta_group_mapping: 

182 try: 

183 parsed = json.loads(settings.okta_group_mapping) 

184 if isinstance(parsed, dict): 

185 okta_team_mapping = parsed 

186 else: 

187 logger.warning("OKTA_GROUP_MAPPING must be a JSON object (got %s); using empty team mapping", type(parsed).__name__) 

188 except (json.JSONDecodeError, TypeError): 

189 logger.warning("Failed to parse OKTA_GROUP_MAPPING as JSON; using empty team mapping") 

190 providers.append( 

191 { 

192 "id": "okta", 

193 "name": "okta", 

194 "display_name": "Okta", 

195 "provider_type": "oidc", 

196 "client_id": settings.sso_okta_client_id, 

197 "client_secret": settings.sso_okta_client_secret.get_secret_value() if settings.sso_okta_client_secret else "", 

198 "authorization_url": f"{base_url}/oauth2/default/v1/authorize", 

199 "token_url": f"{base_url}/oauth2/default/v1/token", 

200 "userinfo_url": f"{base_url}/oauth2/default/v1/userinfo", 

201 "issuer": f"{base_url}/oauth2/default", 

202 "scope": settings.sso_okta_scope, 

203 "trusted_domains": settings.sso_trusted_domains, 

204 "auto_create_users": settings.sso_auto_create_users, 

205 "team_mapping": okta_team_mapping, 

206 } 

207 ) 

208 

209 # Microsoft Entra ID Provider 

210 if settings.sso_entra_enabled and settings.sso_entra_client_id and settings.sso_entra_tenant_id: 

211 tenant_id = settings.sso_entra_tenant_id 

212 base_url = f"https://login.microsoftonline.com/{tenant_id}" 

213 providers.append( 

214 { 

215 "id": "entra", 

216 "name": "entra", 

217 "display_name": "Microsoft Entra ID", 

218 "provider_type": "oidc", 

219 "client_id": settings.sso_entra_client_id, 

220 "client_secret": settings.sso_entra_client_secret.get_secret_value() if settings.sso_entra_client_secret else "", 

221 "authorization_url": f"{base_url}/oauth2/v2.0/authorize", 

222 "token_url": f"{base_url}/oauth2/v2.0/token", 

223 "userinfo_url": "https://graph.microsoft.com/oidc/userinfo", 

224 "issuer": f"{base_url}/v2.0", 

225 "scope": "openid profile email User.Read", 

226 "trusted_domains": settings.sso_trusted_domains, 

227 "auto_create_users": settings.sso_auto_create_users, 

228 "team_mapping": {}, 

229 "provider_metadata": { 

230 "groups_claim": settings.sso_entra_groups_claim, 

231 "role_mappings": settings.sso_entra_role_mappings, 

232 "graph_api_enabled": settings.sso_entra_graph_api_enabled, 

233 "graph_api_timeout": settings.sso_entra_graph_api_timeout, 

234 "graph_api_max_groups": settings.sso_entra_graph_api_max_groups, 

235 }, 

236 } 

237 ) 

238 

239 # Keycloak OIDC Provider with Auto-Discovery 

240 if settings.sso_keycloak_enabled and settings.sso_keycloak_base_url and settings.sso_keycloak_client_id: 

241 try: 

242 # First-Party 

243 from mcpgateway.utils.keycloak_discovery import discover_keycloak_endpoints_sync 

244 

245 endpoints = discover_keycloak_endpoints_sync( 

246 settings.sso_keycloak_base_url, 

247 settings.sso_keycloak_realm, 

248 public_base_url=getattr(settings, "sso_keycloak_public_base_url", None), 

249 ) 

250 

251 if endpoints: 

252 providers.append( 

253 { 

254 "id": "keycloak", 

255 "name": "keycloak", 

256 "display_name": f"Keycloak ({settings.sso_keycloak_realm})", 

257 "provider_type": "oidc", 

258 "client_id": settings.sso_keycloak_client_id, 

259 "client_secret": settings.sso_keycloak_client_secret.get_secret_value() if settings.sso_keycloak_client_secret else "", 

260 "authorization_url": endpoints["authorization_url"], 

261 "token_url": endpoints["token_url"], 

262 "userinfo_url": endpoints["userinfo_url"], 

263 "issuer": endpoints["issuer"], 

264 "jwks_uri": endpoints.get("jwks_uri"), 

265 "scope": "openid profile email", 

266 "trusted_domains": settings.sso_trusted_domains, 

267 "auto_create_users": settings.sso_auto_create_users, 

268 "team_mapping": {}, 

269 "provider_metadata": { 

270 "realm": settings.sso_keycloak_realm, 

271 "base_url": settings.sso_keycloak_base_url, 

272 "public_base_url": getattr(settings, "sso_keycloak_public_base_url", None), 

273 "map_realm_roles": settings.sso_keycloak_map_realm_roles, 

274 "map_client_roles": settings.sso_keycloak_map_client_roles, 

275 "username_claim": settings.sso_keycloak_username_claim, 

276 "email_claim": settings.sso_keycloak_email_claim, 

277 "groups_claim": settings.sso_keycloak_groups_claim, 

278 "jwks_uri": endpoints.get("jwks_uri"), 

279 "role_mappings": getattr(settings, "sso_keycloak_role_mappings", {}), 

280 "default_role": getattr(settings, "sso_keycloak_default_role", None), 

281 "resolve_team_scope_to_personal_team": getattr(settings, "sso_keycloak_resolve_team_scope_to_personal_team", False), 

282 }, 

283 } 

284 ) 

285 else: 

286 logger.error(f"Failed to discover Keycloak endpoints for realm '{settings.sso_keycloak_realm}' at {settings.sso_keycloak_base_url}") 

287 except Exception as e: 

288 logger.error(f"Error bootstrapping Keycloak provider: {type(e).__name__}: {e}", exc_info=True) 

289 

290 # ADFS Provider 

291 if settings.sso_adfs_enabled and settings.sso_adfs_client_id and settings.sso_adfs_authorization_url and settings.sso_adfs_token_url: 

292 display_name = settings.sso_adfs_display_name or "ADFS Login" 

293 

294 # ADFS uses OIDC but doesn't support GET on userinfo endpoint; 

295 # user info is extracted from the ID token instead. 

296 providers.append( 

297 { 

298 "id": ADFS_PROVIDER_ID, 

299 "name": ADFS_PROVIDER_ID, 

300 "display_name": display_name, 

301 "provider_type": "oidc", 

302 "client_id": settings.sso_adfs_client_id, 

303 "client_secret": settings.sso_adfs_client_secret.get_secret_value() if settings.sso_adfs_client_secret else "", 

304 "authorization_url": settings.sso_adfs_authorization_url, 

305 "token_url": settings.sso_adfs_token_url, 

306 "userinfo_url": settings.sso_adfs_token_url, # Placeholder — not used for ADFS 

307 "issuer": settings.sso_adfs_issuer, 

308 "scope": settings.sso_adfs_scope or "openid profile email", 

309 "trusted_domains": settings.sso_trusted_domains, 

310 "auto_create_users": settings.sso_auto_create_users, 

311 "team_mapping": {}, 

312 } 

313 ) 

314 

315 # Generic OIDC Provider (Keycloak, Auth0, Authentik, etc.) 

316 if settings.sso_generic_enabled and settings.sso_generic_client_id and settings.sso_generic_provider_id: 

317 provider_id = settings.sso_generic_provider_id 

318 display_name = settings.sso_generic_display_name or provider_id.title() 

319 

320 provider_config = { 

321 "id": provider_id, 

322 "name": provider_id, 

323 "display_name": display_name, 

324 "provider_type": "oidc", 

325 "client_id": settings.sso_generic_client_id, 

326 "client_secret": settings.sso_generic_client_secret.get_secret_value() if settings.sso_generic_client_secret else "", 

327 "authorization_url": settings.sso_generic_authorization_url, 

328 "token_url": settings.sso_generic_token_url, 

329 "userinfo_url": settings.sso_generic_userinfo_url, 

330 "issuer": settings.sso_generic_issuer, 

331 "scope": settings.sso_generic_scope, 

332 "trusted_domains": settings.sso_trusted_domains, 

333 "auto_create_users": settings.sso_auto_create_users, 

334 "team_mapping": {}, 

335 } 

336 if settings.sso_generic_jwks_uri: 

337 provider_config["jwks_uri"] = settings.sso_generic_jwks_uri 

338 providers.append(provider_config) 

339 

340 return providers 

341 

342 

343async def bootstrap_sso_providers() -> None: 

344 """Bootstrap SSO providers from environment configuration. 

345 

346 This function should be called during application startup to 

347 automatically configure SSO providers based on environment variables. 

348 

349 Examples: 

350 >>> # This would typically be called during app startup 

351 >>> import asyncio 

352 >>> asyncio.run(bootstrap_sso_providers()) # doctest: +SKIP 

353 """ 

354 if not settings.sso_enabled: 

355 return 

356 

357 # First-Party 

358 from mcpgateway.db import get_db 

359 from mcpgateway.services.sso_service import SSOService 

360 

361 providers = get_predefined_sso_providers() 

362 

363 db = next(get_db()) 

364 try: 

365 sso_service = SSOService(db) 

366 

367 # Get list of provider IDs from environment config 

368 configured_provider_ids = {p["id"] for p in providers} 

369 

370 # Disable providers not in environment config (if feature flag is enabled). 

371 # Controlled by SSO_AUTO_DISABLE_UNCONFIGURED_PROVIDERS (default: false) 

372 # to preserve backward compatibility with manually configured providers. 

373 if settings.sso_auto_disable_unconfigured_providers: 

374 for existing_provider in sso_service.list_all_providers(): 

375 if existing_provider.id not in configured_provider_ids and existing_provider.is_enabled: 

376 await sso_service.update_provider(existing_provider.id, {"is_enabled": False}) 

377 print(f"🔒 Disabled SSO provider (not in config): {existing_provider.display_name} (ID: {existing_provider.id})") 

378 

379 for provider_config in providers: 

380 # Ensure provider is enabled 

381 provider_config["is_enabled"] = True 

382 # Check if provider already exists by ID or name (both have unique constraints) 

383 existing_by_id = sso_service.get_provider(provider_config["id"]) 

384 existing_by_name = sso_service.get_provider_by_name(provider_config["name"]) 

385 

386 if not existing_by_id and not existing_by_name: 

387 await sso_service.create_provider(provider_config) 

388 print(f"✅ Created SSO provider: {provider_config['display_name']}") 

389 else: 

390 # Update existing provider with current configuration 

391 existing_provider = existing_by_id or existing_by_name 

392 

393 # Smart merge for provider_metadata (see ADR-0003 for rationale): 

394 # - Env config provides DEFAULTS for keys not in DB 

395 # - DB values are PRESERVED (Admin API changes survive restarts) 

396 # - New env keys introduced in upgrades APPLY automatically 

397 # 

398 # Trade-off: To change a key that exists in DB, use Admin API or reset provider. 

399 # This prevents env config from unexpectedly overriding intentional Admin API changes. 

400 # 

401 # Example: 

402 # env: {"groups_claim": "groups", "new_setting": "value"} 

403 # db: {"groups_claim": "custom", "sync_roles": false} 

404 # result: {"groups_claim": "custom", "new_setting": "value", "sync_roles": false} 

405 if "provider_metadata" in provider_config and existing_provider.provider_metadata: 

406 env_metadata = provider_config["provider_metadata"] or {} 

407 db_metadata = existing_provider.provider_metadata or {} 

408 # Env provides base, DB values override (preserving Admin API changes) 

409 merged_metadata = {**env_metadata, **db_metadata} 

410 provider_config["provider_metadata"] = merged_metadata 

411 

412 # Preserve DB scope when env provides only the default value; 

413 # an explicit non-default env scope takes precedence over DB. 

414 if existing_provider.scope and existing_provider.scope != "openid profile email" and provider_config.get("scope") == "openid profile email": 

415 provider_config["scope"] = existing_provider.scope 

416 

417 # Preserve DB team_mapping if env provides empty mapping 

418 if existing_provider.team_mapping and not provider_config.get("team_mapping"): 

419 provider_config["team_mapping"] = existing_provider.team_mapping 

420 

421 updated = await sso_service.update_provider(existing_provider.id, provider_config) 

422 if updated: 

423 print(f"🔄 Updated SSO provider: {provider_config['display_name']} (ID: {existing_provider.id})") 

424 else: 

425 print(f"ℹ️ SSO provider unchanged: {existing_provider.display_name} (ID: {existing_provider.id})") 

426 

427 except Exception as e: 

428 db.rollback() # Rollback on error 

429 print(f"❌ Failed to bootstrap SSO providers: {e}") 

430 finally: 

431 # Ensure close() always runs even if commit() fails 

432 # Without this nested try/finally, a commit() failure would skip close(), 

433 # leaving the connection in "idle in transaction" state 

434 try: 

435 db.commit() # Commit transaction to avoid implicit rollback 

436 finally: 

437 db.close() 

438 

439 

440if __name__ == "__main__": 

441 # Standard 

442 import asyncio 

443 

444 asyncio.run(bootstrap_sso_providers())