Coverage for mcpgateway / utils / sso_bootstrap.py: 100%

71 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/utils/sso_bootstrap.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Bootstrap SSO providers with predefined configurations. 

8""" 

9 

10# Future 

11from __future__ import annotations 

12 

13# Standard 

14import logging 

15from typing import Dict, List 

16 

17# First-Party 

18from mcpgateway.config import settings 

19 

20logger = logging.getLogger(__name__) 

21 

22 

def _secret_value(secret: object) -> str:
    """Return the plain-text form of a configured client secret.

    Args:
        secret: Either an object exposing ``get_secret_value()`` or a plain
            string. Falsy values (``None``, ``""``) mean "no secret set".

    Returns:
        The secret as a string, or ``""`` when no secret is configured.
    """
    if not secret:
        return ""
    reveal = getattr(secret, "get_secret_value", None)
    # Plain strings have no get_secret_value(); pass them through unchanged so
    # lightweight settings stubs (as used in the examples below) work as well.
    return reveal() if callable(reveal) else str(secret)


def _normalized_base_url(configured: object, fallback: str) -> str:
    """Pick the configured base URL (or the fallback) without trailing slashes.

    Endpoint paths are appended with a leading ``/``, so a trailing slash on
    the configured value would otherwise produce ``//`` in every derived URL
    and in the issuer string.

    Args:
        configured: Base URL taken from settings; may be ``None`` or empty.
        fallback: Value used when nothing is configured.

    Returns:
        The chosen base URL with any trailing ``/`` removed.
    """
    return str(configured or fallback).rstrip("/")


def _shared_provider_fields() -> Dict:
    """Build the fields that every predefined provider entry has in common.

    Returns:
        A new dict per call, so each provider gets its own ``team_mapping``.
    """
    return {
        "trusted_domains": settings.sso_trusted_domains,
        "auto_create_users": settings.sso_auto_create_users,
        "team_mapping": {},
    }


def get_predefined_sso_providers() -> List[Dict]:
    """Get list of predefined SSO providers based on environment configuration.

    A provider is included only when its ``*_enabled`` flag is set and its
    required identifiers (client ID, plus tenant ID / base URL / provider ID
    where applicable) are configured. Entries are emitted in a fixed order:
    GitHub, Google, IBM Security Verify, Okta, Microsoft Entra ID, Keycloak,
    generic OIDC.

    The Keycloak entry depends on endpoint discovery; if discovery returns
    nothing or raises, the problem is logged and Keycloak is skipped while the
    remaining providers are still returned.

    Returns:
        List of SSO provider configurations ready for database storage.

    Examples:
        Default (no providers configured):
        >>> providers = get_predefined_sso_providers()
        >>> isinstance(providers, list)
        True

        Patch configuration to include GitHub provider:
        >>> from types import SimpleNamespace
        >>> from unittest.mock import patch
        >>> cfg = SimpleNamespace(
        ...     sso_github_enabled=True,
        ...     sso_github_client_id='id',
        ...     sso_github_client_secret='sec',
        ...     sso_trusted_domains=[],
        ...     sso_auto_create_users=True,
        ...     sso_google_enabled=False,
        ...     sso_ibm_verify_enabled=False,
        ...     sso_okta_enabled=False,
        ...     sso_entra_enabled=False,
        ...     sso_keycloak_enabled=False,
        ...     sso_generic_enabled=False,
        ... )
        >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg):
        ...     result = get_predefined_sso_providers()
        >>> [p['id'] for p in result]
        ['github']
        >>> result[0]['client_secret']
        'sec'

        Patch configuration to include Google provider:
        >>> cfg = SimpleNamespace(
        ...     sso_github_enabled=False, sso_github_client_id=None, sso_github_client_secret=None,
        ...     sso_trusted_domains=[], sso_auto_create_users=True,
        ...     sso_google_enabled=True, sso_google_client_id='gid', sso_google_client_secret='gsec',
        ...     sso_ibm_verify_enabled=False, sso_okta_enabled=False, sso_entra_enabled=False,
        ...     sso_keycloak_enabled=False, sso_generic_enabled=False
        ... )
        >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg):
        ...     result = get_predefined_sso_providers()
        >>> [p['id'] for p in result]
        ['google']

        Patch configuration to include Okta provider:
        >>> cfg = SimpleNamespace(
        ...     sso_github_enabled=False, sso_github_client_id=None, sso_github_client_secret=None,
        ...     sso_trusted_domains=[], sso_auto_create_users=True,
        ...     sso_google_enabled=False, sso_okta_enabled=True, sso_okta_client_id='ok', sso_okta_client_secret='os', sso_okta_issuer='https://company.okta.com',
        ...     sso_ibm_verify_enabled=False, sso_entra_enabled=False,
        ...     sso_keycloak_enabled=False, sso_generic_enabled=False
        ... )
        >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg):
        ...     result = get_predefined_sso_providers()
        >>> [p['id'] for p in result]
        ['okta']
        >>> result[0]['issuer']
        'https://company.okta.com/oauth2/default'

        Patch configuration to include Microsoft Entra ID provider:
        >>> cfg = SimpleNamespace(
        ...     sso_github_enabled=False, sso_github_client_id=None, sso_github_client_secret=None,
        ...     sso_trusted_domains=[], sso_auto_create_users=True,
        ...     sso_google_enabled=False, sso_okta_enabled=False,
        ...     sso_ibm_verify_enabled=False, sso_entra_enabled=True, sso_entra_client_id='entra_client', sso_entra_client_secret='entra_secret', sso_entra_tenant_id='tenant-id-123',
        ...     sso_entra_groups_claim='groups', sso_entra_role_mappings={},
        ...     sso_entra_graph_api_enabled=False, sso_entra_graph_api_timeout=10, sso_entra_graph_api_max_groups=100,
        ...     sso_keycloak_enabled=False, sso_generic_enabled=False
        ... )
        >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg):
        ...     result = get_predefined_sso_providers()
        >>> [p['id'] for p in result]
        ['entra']

        Patch configuration to include Generic OIDC provider:
        >>> cfg = SimpleNamespace(
        ...     sso_github_enabled=False, sso_github_client_id=None, sso_github_client_secret=None,
        ...     sso_trusted_domains=[], sso_auto_create_users=True,
        ...     sso_google_enabled=False, sso_okta_enabled=False, sso_ibm_verify_enabled=False, sso_entra_enabled=False,
        ...     sso_keycloak_enabled=False,
        ...     sso_generic_enabled=True, sso_generic_provider_id='keycloak', sso_generic_display_name='Keycloak',
        ...     sso_generic_client_id='kc_client', sso_generic_client_secret='kc_secret',
        ...     sso_generic_authorization_url='https://keycloak.company.com/auth/realms/master/protocol/openid-connect/auth',
        ...     sso_generic_token_url='https://keycloak.company.com/auth/realms/master/protocol/openid-connect/token',
        ...     sso_generic_userinfo_url='https://keycloak.company.com/auth/realms/master/protocol/openid-connect/userinfo',
        ...     sso_generic_issuer='https://keycloak.company.com/auth/realms/master',
        ...     sso_generic_jwks_uri='https://keycloak.company.com/auth/realms/master/protocol/openid-connect/certs',
        ...     sso_generic_scope='openid profile email'
        ... )
        >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg):
        ...     result = get_predefined_sso_providers()
        >>> [p['id'] for p in result]
        ['keycloak']
    """
    providers = []

    # GitHub OAuth Provider
    if settings.sso_github_enabled and settings.sso_github_client_id:
        providers.append(
            {
                "id": "github",
                "name": "github",
                "display_name": "GitHub",
                "provider_type": "oauth2",
                "client_id": settings.sso_github_client_id,
                "client_secret": _secret_value(settings.sso_github_client_secret),
                "authorization_url": "https://github.com/login/oauth/authorize",
                "token_url": "https://github.com/login/oauth/access_token",  # nosec B105 - public OAuth endpoint
                "userinfo_url": "https://api.github.com/user",
                "scope": "user:email",
                **_shared_provider_fields(),
            }
        )

    # Google OAuth Provider
    if settings.sso_google_enabled and settings.sso_google_client_id:
        providers.append(
            {
                "id": "google",
                "name": "google",
                "display_name": "Google",
                "provider_type": "oidc",
                "client_id": settings.sso_google_client_id,
                "client_secret": _secret_value(settings.sso_google_client_secret),
                "authorization_url": "https://accounts.google.com/o/oauth2/auth",
                "token_url": "https://oauth2.googleapis.com/token",  # nosec B105 - public OAuth endpoint
                "userinfo_url": "https://openidconnect.googleapis.com/v1/userinfo",
                "issuer": "https://accounts.google.com",
                "scope": "openid profile email",
                **_shared_provider_fields(),
            }
        )

    # IBM Security Verify Provider
    if settings.sso_ibm_verify_enabled and settings.sso_ibm_verify_client_id:
        base_url = _normalized_base_url(settings.sso_ibm_verify_issuer, "https://tenant.verify.ibm.com")
        providers.append(
            {
                "id": "ibm_verify",
                "name": "ibm_verify",
                "display_name": "IBM Security Verify",
                "provider_type": "oidc",
                "client_id": settings.sso_ibm_verify_client_id,
                "client_secret": _secret_value(settings.sso_ibm_verify_client_secret),
                "authorization_url": f"{base_url}/oidc/endpoint/default/authorize",
                "token_url": f"{base_url}/oidc/endpoint/default/token",
                "userinfo_url": f"{base_url}/oidc/endpoint/default/userinfo",
                "issuer": f"{base_url}/oidc/endpoint/default",
                "scope": "openid profile email",
                **_shared_provider_fields(),
            }
        )

    # Okta Provider
    if settings.sso_okta_enabled and settings.sso_okta_client_id:
        base_url = _normalized_base_url(settings.sso_okta_issuer, "https://company.okta.com")
        providers.append(
            {
                "id": "okta",
                "name": "okta",
                "display_name": "Okta",
                "provider_type": "oidc",
                "client_id": settings.sso_okta_client_id,
                "client_secret": _secret_value(settings.sso_okta_client_secret),
                "authorization_url": f"{base_url}/oauth2/default/v1/authorize",
                "token_url": f"{base_url}/oauth2/default/v1/token",
                "userinfo_url": f"{base_url}/oauth2/default/v1/userinfo",
                "issuer": f"{base_url}/oauth2/default",
                "scope": "openid profile email",
                **_shared_provider_fields(),
            }
        )

    # Microsoft Entra ID Provider
    if settings.sso_entra_enabled and settings.sso_entra_client_id and settings.sso_entra_tenant_id:
        tenant_id = settings.sso_entra_tenant_id
        base_url = f"https://login.microsoftonline.com/{tenant_id}"
        providers.append(
            {
                "id": "entra",
                "name": "entra",
                "display_name": "Microsoft Entra ID",
                "provider_type": "oidc",
                "client_id": settings.sso_entra_client_id,
                "client_secret": _secret_value(settings.sso_entra_client_secret),
                "authorization_url": f"{base_url}/oauth2/v2.0/authorize",
                "token_url": f"{base_url}/oauth2/v2.0/token",
                "userinfo_url": "https://graph.microsoft.com/oidc/userinfo",
                "issuer": f"{base_url}/v2.0",
                "scope": "openid profile email User.Read",
                **_shared_provider_fields(),
                "provider_metadata": {
                    "groups_claim": settings.sso_entra_groups_claim,
                    "role_mappings": settings.sso_entra_role_mappings,
                    "graph_api_enabled": settings.sso_entra_graph_api_enabled,
                    "graph_api_timeout": settings.sso_entra_graph_api_timeout,
                    "graph_api_max_groups": settings.sso_entra_graph_api_max_groups,
                },
            }
        )

    # Keycloak OIDC Provider with Auto-Discovery
    if settings.sso_keycloak_enabled and settings.sso_keycloak_base_url and settings.sso_keycloak_client_id:
        # Everything Keycloak-related stays inside the try so that a discovery
        # problem never prevents the other providers from being returned.
        try:
            # First-Party
            from mcpgateway.utils.keycloak_discovery import discover_keycloak_endpoints_sync

            endpoints = discover_keycloak_endpoints_sync(
                settings.sso_keycloak_base_url,
                settings.sso_keycloak_realm,
                public_base_url=getattr(settings, "sso_keycloak_public_base_url", None),
            )

            if endpoints:
                providers.append(
                    {
                        "id": "keycloak",
                        "name": "keycloak",
                        "display_name": f"Keycloak ({settings.sso_keycloak_realm})",
                        "provider_type": "oidc",
                        "client_id": settings.sso_keycloak_client_id,
                        "client_secret": _secret_value(settings.sso_keycloak_client_secret),
                        "authorization_url": endpoints["authorization_url"],
                        "token_url": endpoints["token_url"],
                        "userinfo_url": endpoints["userinfo_url"],
                        "issuer": endpoints["issuer"],
                        "jwks_uri": endpoints.get("jwks_uri"),
                        "scope": "openid profile email",
                        **_shared_provider_fields(),
                        "provider_metadata": {
                            "realm": settings.sso_keycloak_realm,
                            "base_url": settings.sso_keycloak_base_url,
                            "public_base_url": getattr(settings, "sso_keycloak_public_base_url", None),
                            "map_realm_roles": settings.sso_keycloak_map_realm_roles,
                            "map_client_roles": settings.sso_keycloak_map_client_roles,
                            "username_claim": settings.sso_keycloak_username_claim,
                            "email_claim": settings.sso_keycloak_email_claim,
                            "groups_claim": settings.sso_keycloak_groups_claim,
                            "jwks_uri": endpoints.get("jwks_uri"),
                            "role_mappings": getattr(settings, "sso_keycloak_role_mappings", {}),
                            "default_role": getattr(settings, "sso_keycloak_default_role", None),
                            "resolve_team_scope_to_personal_team": getattr(settings, "sso_keycloak_resolve_team_scope_to_personal_team", False),
                        },
                    }
                )
            else:
                logger.error(f"Failed to discover Keycloak endpoints for realm '{settings.sso_keycloak_realm}' at {settings.sso_keycloak_base_url}")
        except Exception as e:
            logger.error(f"Error bootstrapping Keycloak provider: {type(e).__name__}: {e}", exc_info=True)

    # Generic OIDC Provider (Keycloak, Auth0, Authentik, etc.)
    if settings.sso_generic_enabled and settings.sso_generic_client_id and settings.sso_generic_provider_id:
        provider_id = settings.sso_generic_provider_id
        display_name = settings.sso_generic_display_name or provider_id.title()

        provider_config = {
            "id": provider_id,
            "name": provider_id,
            "display_name": display_name,
            "provider_type": "oidc",
            "client_id": settings.sso_generic_client_id,
            "client_secret": _secret_value(settings.sso_generic_client_secret),
            "authorization_url": settings.sso_generic_authorization_url,
            "token_url": settings.sso_generic_token_url,
            "userinfo_url": settings.sso_generic_userinfo_url,
            "issuer": settings.sso_generic_issuer,
            "scope": settings.sso_generic_scope,
            **_shared_provider_fields(),
        }
        # jwks_uri is optional for generic providers; only store it when set.
        if settings.sso_generic_jwks_uri:
            provider_config["jwks_uri"] = settings.sso_generic_jwks_uri
        providers.append(provider_config)

    return providers

304 

305 

async def bootstrap_sso_providers() -> None:
    """Bootstrap SSO providers from environment configuration.

    This function should be called during application startup to
    automatically configure SSO providers based on environment variables.

    It does nothing when ``settings.sso_enabled`` is false or when no
    predefined provider is configured. Otherwise each predefined provider is
    created if no provider with the same ID or name exists, or updated in
    place if one does.

    Failures while creating/updating providers are not propagated: the session
    is rolled back, the error is printed and logged (with traceback), and the
    function returns normally.

    Examples:
        >>> # This would typically be called during app startup
        >>> import asyncio
        >>> asyncio.run(bootstrap_sso_providers())  # doctest: +SKIP
    """
    if not settings.sso_enabled:
        return

    # First-Party
    from mcpgateway.db import get_db
    from mcpgateway.services.sso_service import SSOService

    providers = get_predefined_sso_providers()
    if not providers:
        return

    db = next(get_db())
    try:
        sso_service = SSOService(db)

        for provider_config in providers:
            # Check if provider already exists by ID or name (both have unique constraints)
            existing_by_id = sso_service.get_provider(provider_config["id"])
            existing_by_name = sso_service.get_provider_by_name(provider_config["name"])

            if not existing_by_id and not existing_by_name:
                await sso_service.create_provider(provider_config)
                print(f"✅ Created SSO provider: {provider_config['display_name']}")
            else:
                # Update existing provider with current configuration
                existing_provider = existing_by_id or existing_by_name

                # Smart merge for provider_metadata (see ADR-0003 for rationale):
                # - Env config provides DEFAULTS for keys not in DB
                # - DB values are PRESERVED (Admin API changes survive restarts)
                # - New env keys introduced in upgrades APPLY automatically
                #
                # Trade-off: To change a key that exists in DB, use Admin API or reset provider.
                # This prevents env config from unexpectedly overriding intentional Admin API changes.
                #
                # Example:
                #   env:    {"groups_claim": "groups", "new_setting": "value"}
                #   db:     {"groups_claim": "custom", "sync_roles": false}
                #   result: {"groups_claim": "custom", "new_setting": "value", "sync_roles": false}
                if "provider_metadata" in provider_config and existing_provider.provider_metadata:
                    env_metadata = provider_config["provider_metadata"] or {}
                    db_metadata = existing_provider.provider_metadata or {}
                    # Env provides base, DB values override (preserving Admin API changes)
                    merged_metadata = {**env_metadata, **db_metadata}
                    provider_config["provider_metadata"] = merged_metadata

                updated = await sso_service.update_provider(existing_provider.id, provider_config)
                if updated:
                    print(f"🔄 Updated SSO provider: {provider_config['display_name']} (ID: {existing_provider.id})")
                else:
                    print(f"ℹ️  SSO provider unchanged: {existing_provider.display_name} (ID: {existing_provider.id})")

    except Exception as e:
        db.rollback()  # Rollback on error
        print(f"❌ Failed to bootstrap SSO providers: {e}")
        # Also record the failure through the module logger so it reaches the
        # configured log handlers together with the traceback; the console
        # message alone is easy to miss and carries no stack information.
        logger.error(f"Failed to bootstrap SSO providers: {type(e).__name__}: {e}", exc_info=True)
    finally:
        # Ensure close() always runs even if commit() fails
        # Without this nested try/finally, a commit() failure would skip close(),
        # leaving the connection in "idle in transaction" state
        try:
            db.commit()  # Commit transaction to avoid implicit rollback
        finally:
            db.close()

380 

381 

if __name__ == "__main__":
    # Running the module as a script performs a one-off bootstrap using the
    # current settings.
    # Standard
    from asyncio import run as run_coroutine

    run_coroutine(bootstrap_sso_providers())