Coverage for mcpgateway / utils / sso_bootstrap.py: 100%
94 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/utils/sso_bootstrap.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Bootstrap SSO providers with predefined configurations.
8"""
10# Future
11from __future__ import annotations
13# Standard
14import json
15import logging
16from typing import Any, Dict, List
18# First-Party
19from mcpgateway.config import settings
20from mcpgateway.services.sso_service import ADFS_PROVIDER_ID
22logger = logging.getLogger(__name__)
def _secret_value(secret: Any) -> str:
    """Return the plain-text value of an optional client secret.

    Args:
        secret: A ``SecretStr``-like object exposing ``get_secret_value()``,
            a plain string, or a falsy value when no secret is configured.

    Returns:
        The unwrapped secret, or an empty string when no secret is set.

    Examples:
        >>> _secret_value(None)
        ''
        >>> _secret_value("plain")
        'plain'
    """
    if not secret:
        return ""
    unwrap = getattr(secret, "get_secret_value", None)
    # Plain strings have no get_secret_value(); pass them through unchanged.
    return unwrap() if callable(unwrap) else str(secret)


def _common_fields(team_mapping: Any = None) -> Dict[str, Any]:
    """Build the fields every predefined provider shares.

    Args:
        team_mapping: Provider-specific team mapping; an empty mapping is used when omitted.

    Returns:
        Dict with ``trusted_domains``, ``auto_create_users`` and ``team_mapping``.
    """
    return {
        "trusted_domains": settings.sso_trusted_domains,
        "auto_create_users": settings.sso_auto_create_users,
        "team_mapping": {} if team_mapping is None else team_mapping,
    }


def get_predefined_sso_providers() -> List[Dict]:
    """Get list of predefined SSO providers based on environment configuration.

    Each provider is included only when its ``sso_<provider>_enabled`` flag is
    set and its required identifiers (client ID, plus tenant/URLs where
    applicable) are configured. Client secrets are unwrapped to plain strings
    (empty string when unset). Issuer base URLs for Okta and IBM Security
    Verify are normalised by stripping trailing slashes so the derived
    endpoint URLs never contain ``//``.

    Returns:
        List of SSO provider configurations ready for database storage.

    Examples:
        Default (no providers configured):
        >>> providers = get_predefined_sso_providers()
        >>> isinstance(providers, list)
        True

        Baseline configuration with every provider disabled:
        >>> from types import SimpleNamespace
        >>> from unittest.mock import patch
        >>> base = dict(
        ...     sso_github_enabled=False, sso_google_enabled=False, sso_ibm_verify_enabled=False,
        ...     sso_okta_enabled=False, sso_entra_enabled=False, sso_keycloak_enabled=False,
        ...     sso_adfs_enabled=False, sso_generic_enabled=False,
        ...     sso_trusted_domains=[], sso_auto_create_users=True,
        ... )
        >>> with patch('mcpgateway.utils.sso_bootstrap.settings', SimpleNamespace(**base)):
        ...     get_predefined_sso_providers()
        []

        Patch configuration to include GitHub provider:
        >>> cfg = SimpleNamespace(**{
        ...     **base,
        ...     'sso_github_enabled': True,
        ...     'sso_github_client_id': 'id',
        ...     'sso_github_client_secret': 'sec',
        ... })
        >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg):
        ...     result = get_predefined_sso_providers()
        >>> [(p['id'], p['client_secret']) for p in result]
        [('github', 'sec')]

        Patch configuration to include Google provider:
        >>> cfg = SimpleNamespace(**{
        ...     **base,
        ...     'sso_google_enabled': True, 'sso_google_client_id': 'gid', 'sso_google_client_secret': 'gsec',
        ... })
        >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg):
        ...     result = get_predefined_sso_providers()
        >>> result[0]['issuer']
        'https://accounts.google.com'

        Patch configuration to include Okta provider (trailing slash is normalised):
        >>> cfg = SimpleNamespace(**{
        ...     **base,
        ...     'sso_okta_enabled': True, 'sso_okta_client_id': 'ok', 'sso_okta_client_secret': 'os',
        ...     'sso_okta_issuer': 'https://company.okta.com/', 'okta_group_mapping': None,
        ...     'sso_okta_scope': 'openid profile email',
        ... })
        >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg):
        ...     result = get_predefined_sso_providers()
        >>> result[0]['issuer']
        'https://company.okta.com/oauth2/default'

        Patch configuration to include Microsoft Entra ID provider:
        >>> cfg = SimpleNamespace(**{
        ...     **base,
        ...     'sso_entra_enabled': True, 'sso_entra_client_id': 'entra_client',
        ...     'sso_entra_client_secret': 'entra_secret', 'sso_entra_tenant_id': 'tenant-id-123',
        ...     'sso_entra_groups_claim': 'groups', 'sso_entra_role_mappings': {},
        ...     'sso_entra_graph_api_enabled': True, 'sso_entra_graph_api_timeout': 10,
        ...     'sso_entra_graph_api_max_groups': 100,
        ... })
        >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg):
        ...     result = get_predefined_sso_providers()
        >>> result[0]['issuer']
        'https://login.microsoftonline.com/tenant-id-123/v2.0'

        Patch configuration to include Generic OIDC provider:
        >>> cfg = SimpleNamespace(**{
        ...     **base,
        ...     'sso_generic_enabled': True, 'sso_generic_provider_id': 'keycloak', 'sso_generic_display_name': 'Keycloak',
        ...     'sso_generic_client_id': 'kc_client', 'sso_generic_client_secret': 'kc_secret',
        ...     'sso_generic_authorization_url': 'https://keycloak.company.com/auth/realms/master/protocol/openid-connect/auth',
        ...     'sso_generic_token_url': 'https://keycloak.company.com/auth/realms/master/protocol/openid-connect/token',
        ...     'sso_generic_userinfo_url': 'https://keycloak.company.com/auth/realms/master/protocol/openid-connect/userinfo',
        ...     'sso_generic_issuer': 'https://keycloak.company.com/auth/realms/master',
        ...     'sso_generic_jwks_uri': 'https://keycloak.company.com/auth/realms/master/protocol/openid-connect/certs',
        ...     'sso_generic_scope': 'openid profile email',
        ... })
        >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg):
        ...     result = get_predefined_sso_providers()
        >>> (result[0]['id'], result[0]['display_name'])
        ('keycloak', 'Keycloak')
    """
    providers: List[Dict] = []

    # GitHub OAuth Provider
    if settings.sso_github_enabled and settings.sso_github_client_id:
        providers.append(
            {
                "id": "github",
                "name": "github",
                "display_name": "GitHub",
                "provider_type": "oauth2",
                "client_id": settings.sso_github_client_id,
                "client_secret": _secret_value(settings.sso_github_client_secret),
                "authorization_url": "https://github.com/login/oauth/authorize",
                "token_url": "https://github.com/login/oauth/access_token",  # nosec B105 - public OAuth endpoint
                "userinfo_url": "https://api.github.com/user",
                "scope": "user:email",
                **_common_fields(),
            }
        )

    # Google OAuth Provider
    if settings.sso_google_enabled and settings.sso_google_client_id:
        providers.append(
            {
                "id": "google",
                "name": "google",
                "display_name": "Google",
                "provider_type": "oidc",
                "client_id": settings.sso_google_client_id,
                "client_secret": _secret_value(settings.sso_google_client_secret),
                "authorization_url": "https://accounts.google.com/o/oauth2/auth",
                "token_url": "https://oauth2.googleapis.com/token",  # nosec B105 - public OAuth endpoint
                "userinfo_url": "https://openidconnect.googleapis.com/v1/userinfo",
                "issuer": "https://accounts.google.com",
                "scope": "openid profile email",
                **_common_fields(),
            }
        )

    # IBM Security Verify Provider
    if settings.sso_ibm_verify_enabled and settings.sso_ibm_verify_client_id:
        # Strip trailing slashes so the derived endpoints never contain "//".
        base_url = (settings.sso_ibm_verify_issuer or "https://tenant.verify.ibm.com").rstrip("/")
        providers.append(
            {
                "id": "ibm_verify",
                "name": "ibm_verify",
                "display_name": "IBM Security Verify",
                "provider_type": "oidc",
                "client_id": settings.sso_ibm_verify_client_id,
                "client_secret": _secret_value(settings.sso_ibm_verify_client_secret),
                "authorization_url": f"{base_url}/oidc/endpoint/default/authorize",
                "token_url": f"{base_url}/oidc/endpoint/default/token",
                "userinfo_url": f"{base_url}/oidc/endpoint/default/userinfo",
                "issuer": f"{base_url}/oidc/endpoint/default",
                "scope": "openid profile email",
                **_common_fields(),
            }
        )

    # Okta Provider
    if settings.sso_okta_enabled and settings.sso_okta_client_id:
        # Strip trailing slashes so the issuer matches the token's "iss" claim.
        base_url = (settings.sso_okta_issuer or "https://company.okta.com").rstrip("/")
        okta_team_mapping: Dict[str, Any] = {}
        if settings.okta_group_mapping:
            # The mapping is supplied as a JSON string; anything that is not a
            # JSON object is ignored (with a warning) rather than aborting.
            try:
                parsed = json.loads(settings.okta_group_mapping)
                if isinstance(parsed, dict):
                    okta_team_mapping = parsed
                else:
                    logger.warning("OKTA_GROUP_MAPPING must be a JSON object (got %s); using empty team mapping", type(parsed).__name__)
            except (json.JSONDecodeError, TypeError):
                logger.warning("Failed to parse OKTA_GROUP_MAPPING as JSON; using empty team mapping")
        providers.append(
            {
                "id": "okta",
                "name": "okta",
                "display_name": "Okta",
                "provider_type": "oidc",
                "client_id": settings.sso_okta_client_id,
                "client_secret": _secret_value(settings.sso_okta_client_secret),
                "authorization_url": f"{base_url}/oauth2/default/v1/authorize",
                "token_url": f"{base_url}/oauth2/default/v1/token",
                "userinfo_url": f"{base_url}/oauth2/default/v1/userinfo",
                "issuer": f"{base_url}/oauth2/default",
                "scope": settings.sso_okta_scope,
                **_common_fields(okta_team_mapping),
            }
        )

    # Microsoft Entra ID Provider
    if settings.sso_entra_enabled and settings.sso_entra_client_id and settings.sso_entra_tenant_id:
        tenant_id = settings.sso_entra_tenant_id
        base_url = f"https://login.microsoftonline.com/{tenant_id}"
        providers.append(
            {
                "id": "entra",
                "name": "entra",
                "display_name": "Microsoft Entra ID",
                "provider_type": "oidc",
                "client_id": settings.sso_entra_client_id,
                "client_secret": _secret_value(settings.sso_entra_client_secret),
                "authorization_url": f"{base_url}/oauth2/v2.0/authorize",
                "token_url": f"{base_url}/oauth2/v2.0/token",
                "userinfo_url": "https://graph.microsoft.com/oidc/userinfo",
                "issuer": f"{base_url}/v2.0",
                "scope": "openid profile email User.Read",
                **_common_fields(),
                "provider_metadata": {
                    "groups_claim": settings.sso_entra_groups_claim,
                    "role_mappings": settings.sso_entra_role_mappings,
                    "graph_api_enabled": settings.sso_entra_graph_api_enabled,
                    "graph_api_timeout": settings.sso_entra_graph_api_timeout,
                    "graph_api_max_groups": settings.sso_entra_graph_api_max_groups,
                },
            }
        )

    # Keycloak OIDC Provider with Auto-Discovery
    if settings.sso_keycloak_enabled and settings.sso_keycloak_base_url and settings.sso_keycloak_client_id:
        # Discovery is best-effort: any failure is logged and the Keycloak
        # provider is skipped so the remaining providers are still returned.
        try:
            # First-Party
            from mcpgateway.utils.keycloak_discovery import discover_keycloak_endpoints_sync

            endpoints = discover_keycloak_endpoints_sync(
                settings.sso_keycloak_base_url,
                settings.sso_keycloak_realm,
                public_base_url=getattr(settings, "sso_keycloak_public_base_url", None),
            )

            if endpoints:
                providers.append(
                    {
                        "id": "keycloak",
                        "name": "keycloak",
                        "display_name": f"Keycloak ({settings.sso_keycloak_realm})",
                        "provider_type": "oidc",
                        "client_id": settings.sso_keycloak_client_id,
                        "client_secret": _secret_value(settings.sso_keycloak_client_secret),
                        "authorization_url": endpoints["authorization_url"],
                        "token_url": endpoints["token_url"],
                        "userinfo_url": endpoints["userinfo_url"],
                        "issuer": endpoints["issuer"],
                        "jwks_uri": endpoints.get("jwks_uri"),
                        "scope": "openid profile email",
                        **_common_fields(),
                        "provider_metadata": {
                            "realm": settings.sso_keycloak_realm,
                            "base_url": settings.sso_keycloak_base_url,
                            "public_base_url": getattr(settings, "sso_keycloak_public_base_url", None),
                            "map_realm_roles": settings.sso_keycloak_map_realm_roles,
                            "map_client_roles": settings.sso_keycloak_map_client_roles,
                            "username_claim": settings.sso_keycloak_username_claim,
                            "email_claim": settings.sso_keycloak_email_claim,
                            "groups_claim": settings.sso_keycloak_groups_claim,
                            "jwks_uri": endpoints.get("jwks_uri"),
                            "role_mappings": getattr(settings, "sso_keycloak_role_mappings", {}),
                            "default_role": getattr(settings, "sso_keycloak_default_role", None),
                            "resolve_team_scope_to_personal_team": getattr(settings, "sso_keycloak_resolve_team_scope_to_personal_team", False),
                        },
                    }
                )
            else:
                logger.error("Failed to discover Keycloak endpoints for realm '%s' at %s", settings.sso_keycloak_realm, settings.sso_keycloak_base_url)
        except Exception as e:
            logger.error("Error bootstrapping Keycloak provider: %s: %s", type(e).__name__, e, exc_info=True)

    # ADFS Provider
    if settings.sso_adfs_enabled and settings.sso_adfs_client_id and settings.sso_adfs_authorization_url and settings.sso_adfs_token_url:
        display_name = settings.sso_adfs_display_name or "ADFS Login"

        # ADFS uses OIDC but doesn't support GET on userinfo endpoint;
        # user info is extracted from the ID token instead.
        providers.append(
            {
                "id": ADFS_PROVIDER_ID,
                "name": ADFS_PROVIDER_ID,
                "display_name": display_name,
                "provider_type": "oidc",
                "client_id": settings.sso_adfs_client_id,
                "client_secret": _secret_value(settings.sso_adfs_client_secret),
                "authorization_url": settings.sso_adfs_authorization_url,
                "token_url": settings.sso_adfs_token_url,
                "userinfo_url": settings.sso_adfs_token_url,  # Placeholder — not used for ADFS
                "issuer": settings.sso_adfs_issuer,
                "scope": settings.sso_adfs_scope or "openid profile email",
                **_common_fields(),
            }
        )

    # Generic OIDC Provider (Keycloak, Auth0, Authentik, etc.)
    if settings.sso_generic_enabled and settings.sso_generic_client_id and settings.sso_generic_provider_id:
        provider_id = settings.sso_generic_provider_id
        display_name = settings.sso_generic_display_name or provider_id.title()

        provider_config = {
            "id": provider_id,
            "name": provider_id,
            "display_name": display_name,
            "provider_type": "oidc",
            "client_id": settings.sso_generic_client_id,
            "client_secret": _secret_value(settings.sso_generic_client_secret),
            "authorization_url": settings.sso_generic_authorization_url,
            "token_url": settings.sso_generic_token_url,
            "userinfo_url": settings.sso_generic_userinfo_url,
            "issuer": settings.sso_generic_issuer,
            "scope": settings.sso_generic_scope,
            **_common_fields(),
        }
        # jwks_uri is optional for generic providers; only store it when configured.
        if settings.sso_generic_jwks_uri:
            provider_config["jwks_uri"] = settings.sso_generic_jwks_uri
        providers.append(provider_config)

    return providers
async def bootstrap_sso_providers() -> None:
    """Synchronise environment-defined SSO providers into the database.

    Intended to run once during application startup. Does nothing when
    ``settings.sso_enabled`` is false. Otherwise each provider returned by
    ``get_predefined_sso_providers()`` is created if no stored provider matches
    it by ID or name, or updated in place if one does. When
    ``settings.sso_auto_disable_unconfigured_providers`` is set, enabled stored
    providers that are absent from the environment configuration are disabled
    first. Any exception is caught, rolled back and reported on stdout; it is
    not re-raised.

    Examples:
        >>> # This would typically be called during app startup
        >>> import asyncio
        >>> asyncio.run(bootstrap_sso_providers()) # doctest: +SKIP
    """
    if not settings.sso_enabled:
        return

    # First-Party
    from mcpgateway.db import get_db
    from mcpgateway.services.sso_service import SSOService

    env_providers = get_predefined_sso_providers()

    session = next(get_db())
    try:
        service = SSOService(session)

        # IDs of every provider described by the environment configuration.
        env_provider_ids = {entry["id"] for entry in env_providers}

        # Optional clean-up pass, gated by SSO_AUTO_DISABLE_UNCONFIGURED_PROVIDERS
        # (default: false) so manually configured providers keep working.
        if settings.sso_auto_disable_unconfigured_providers:
            for stored in service.list_all_providers():
                if stored.id in env_provider_ids or not stored.is_enabled:
                    continue
                await service.update_provider(stored.id, {"is_enabled": False})
                print(f"🔒 Disabled SSO provider (not in config): {stored.display_name} (ID: {stored.id})")

        default_scope = "openid profile email"
        for env_config in env_providers:
            # Providers coming from the environment are always enabled.
            env_config["is_enabled"] = True

            # ID and name both carry unique constraints, so look up by each.
            match_by_id = service.get_provider(env_config["id"])
            match_by_name = service.get_provider_by_name(env_config["name"])

            if not (match_by_id or match_by_name):
                await service.create_provider(env_config)
                print(f"✅ Created SSO provider: {env_config['display_name']}")
                continue

            current = match_by_id or match_by_name

            # provider_metadata merge (see ADR-0003 for rationale):
            #   - keys only present in the env config are added as defaults
            #   - keys already stored in the DB win, so Admin API edits survive restarts
            #   - keys introduced by upgrades therefore apply automatically
            # To change a key that already exists in the DB, use the Admin API
            # or reset the provider.
            #
            # Example:
            #   env:    {"groups_claim": "groups", "new_setting": "value"}
            #   db:     {"groups_claim": "custom", "sync_roles": false}
            #   result: {"groups_claim": "custom", "new_setting": "value", "sync_roles": false}
            if "provider_metadata" in env_config and current.provider_metadata:
                combined = dict(env_config["provider_metadata"] or {})
                combined.update(current.provider_metadata or {})
                env_config["provider_metadata"] = combined

            # Keep a customised DB scope when the env only supplies the default;
            # an explicit non-default env scope still overrides the DB value.
            if current.scope and current.scope != default_scope and env_config.get("scope") == default_scope:
                env_config["scope"] = current.scope

            # Keep the DB team mapping when the env mapping is empty.
            if current.team_mapping and not env_config.get("team_mapping"):
                env_config["team_mapping"] = current.team_mapping

            changed = await service.update_provider(current.id, env_config)
            if changed:
                print(f"🔄 Updated SSO provider: {env_config['display_name']} (ID: {current.id})")
            else:
                print(f"ℹ️ SSO provider unchanged: {current.display_name} (ID: {current.id})")

    except Exception as exc:
        session.rollback()  # Rollback on error
        print(f"❌ Failed to bootstrap SSO providers: {exc}")
    finally:
        # The nested try/finally guarantees close() runs even if commit() raises;
        # otherwise the connection could be left "idle in transaction".
        try:
            session.commit()  # Commit transaction to avoid implicit rollback
        finally:
            session.close()
if __name__ == "__main__":
    # Standard
    import asyncio

    # Script entry point: run the bootstrap coroutine to completion.
    _bootstrap_coro = bootstrap_sso_providers()
    asyncio.run(_bootstrap_coro)