Coverage for mcpgateway / utils / sso_bootstrap.py: 100%
71 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/utils/sso_bootstrap.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Bootstrap SSO providers with predefined configurations.
8"""
10# Future
11from __future__ import annotations
13# Standard
14import logging
15from typing import Dict, List
17# First-Party
18from mcpgateway.config import settings
20logger = logging.getLogger(__name__)
23def get_predefined_sso_providers() -> List[Dict]:
24 """Get list of predefined SSO providers based on environment configuration.
26 Returns:
27 List of SSO provider configurations ready for database storage.
29 Examples:
30 Default (no providers configured):
31 >>> providers = get_predefined_sso_providers()
32 >>> isinstance(providers, list)
33 True
35 Patch configuration to include GitHub provider:
36 >>> from types import SimpleNamespace
37 >>> from unittest.mock import patch
38 >>> cfg = SimpleNamespace(
39 ... sso_github_enabled=True,
40 ... sso_github_client_id='id',
41 ... sso_github_client_secret='sec',
42 ... sso_trusted_domains=[],
43 ... sso_auto_create_users=True,
44 ... sso_google_enabled=False,
45 ... sso_ibm_verify_enabled=False,
46 ... sso_okta_enabled=False,
47 ... sso_entra_enabled=False,
48 ... )
49 >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg):
50 ... result = get_predefined_sso_providers()
51 >>> isinstance(result, list)
52 True
54 Patch configuration to include Google provider:
55 >>> cfg = SimpleNamespace(
56 ... sso_github_enabled=False, sso_github_client_id=None, sso_github_client_secret=None,
57 ... sso_trusted_domains=[], sso_auto_create_users=True,
58 ... sso_google_enabled=True, sso_google_client_id='gid', sso_google_client_secret='gsec',
59 ... sso_ibm_verify_enabled=False, sso_okta_enabled=False, sso_entra_enabled=False
60 ... )
61 >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg):
62 ... result = get_predefined_sso_providers()
63 >>> isinstance(result, list)
64 True
66 Patch configuration to include Okta provider:
67 >>> cfg = SimpleNamespace(
68 ... sso_github_enabled=False, sso_github_client_id=None, sso_github_client_secret=None,
69 ... sso_trusted_domains=[], sso_auto_create_users=True,
70 ... sso_google_enabled=False, sso_okta_enabled=True, sso_okta_client_id='ok', sso_okta_client_secret='os', sso_okta_issuer='https://company.okta.com',
71 ... sso_ibm_verify_enabled=False, sso_entra_enabled=False
72 ... )
73 >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg):
74 ... result = get_predefined_sso_providers()
75 >>> isinstance(result, list)
76 True
78 Patch configuration to include Microsoft Entra ID provider:
79 >>> cfg = SimpleNamespace(
80 ... sso_github_enabled=False, sso_github_client_id=None, sso_github_client_secret=None,
81 ... sso_trusted_domains=[], sso_auto_create_users=True,
82 ... sso_google_enabled=False, sso_okta_enabled=False,
83 ... sso_ibm_verify_enabled=False, sso_entra_enabled=True, sso_entra_client_id='entra_client', sso_entra_client_secret='entra_secret', sso_entra_tenant_id='tenant-id-123',
84 ... sso_generic_enabled=False
85 ... )
86 >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg):
87 ... result = get_predefined_sso_providers()
88 >>> isinstance(result, list)
89 True
91 Patch configuration to include Generic OIDC provider:
92 >>> cfg = SimpleNamespace(
93 ... sso_github_enabled=False, sso_github_client_id=None, sso_github_client_secret=None,
94 ... sso_trusted_domains=[], sso_auto_create_users=True,
95 ... sso_google_enabled=False, sso_okta_enabled=False, sso_ibm_verify_enabled=False, sso_entra_enabled=False,
96 ... sso_generic_enabled=True, sso_generic_provider_id='keycloak', sso_generic_display_name='Keycloak',
97 ... sso_generic_client_id='kc_client', sso_generic_client_secret='kc_secret',
98 ... sso_generic_authorization_url='https://keycloak.company.com/auth/realms/master/protocol/openid-connect/auth',
99 ... sso_generic_token_url='https://keycloak.company.com/auth/realms/master/protocol/openid-connect/token',
100 ... sso_generic_userinfo_url='https://keycloak.company.com/auth/realms/master/protocol/openid-connect/userinfo',
101 ... sso_generic_issuer='https://keycloak.company.com/auth/realms/master',
102 ... sso_generic_jwks_uri='https://keycloak.company.com/auth/realms/master/protocol/openid-connect/certs',
103 ... sso_generic_scope='openid profile email'
104 ... )
105 >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg):
106 ... result = get_predefined_sso_providers()
107 >>> isinstance(result, list)
108 True
109 """
110 providers = []
112 # GitHub OAuth Provider
113 if settings.sso_github_enabled and settings.sso_github_client_id:
114 providers.append(
115 {
116 "id": "github",
117 "name": "github",
118 "display_name": "GitHub",
119 "provider_type": "oauth2",
120 "client_id": settings.sso_github_client_id,
121 "client_secret": settings.sso_github_client_secret.get_secret_value() if settings.sso_github_client_secret else "",
122 "authorization_url": "https://github.com/login/oauth/authorize",
123 "token_url": "https://github.com/login/oauth/access_token", # nosec B105 - public OAuth endpoint
124 "userinfo_url": "https://api.github.com/user",
125 "scope": "user:email",
126 "trusted_domains": settings.sso_trusted_domains,
127 "auto_create_users": settings.sso_auto_create_users,
128 "team_mapping": {},
129 }
130 )
132 # Google OAuth Provider
133 if settings.sso_google_enabled and settings.sso_google_client_id:
134 providers.append(
135 {
136 "id": "google",
137 "name": "google",
138 "display_name": "Google",
139 "provider_type": "oidc",
140 "client_id": settings.sso_google_client_id,
141 "client_secret": settings.sso_google_client_secret.get_secret_value() if settings.sso_google_client_secret else "",
142 "authorization_url": "https://accounts.google.com/o/oauth2/auth",
143 "token_url": "https://oauth2.googleapis.com/token", # nosec B105 - public OAuth endpoint
144 "userinfo_url": "https://openidconnect.googleapis.com/v1/userinfo",
145 "issuer": "https://accounts.google.com",
146 "scope": "openid profile email",
147 "trusted_domains": settings.sso_trusted_domains,
148 "auto_create_users": settings.sso_auto_create_users,
149 "team_mapping": {},
150 }
151 )
153 # IBM Security Verify Provider
154 if settings.sso_ibm_verify_enabled and settings.sso_ibm_verify_client_id:
155 base_url = settings.sso_ibm_verify_issuer or "https://tenant.verify.ibm.com"
156 providers.append(
157 {
158 "id": "ibm_verify",
159 "name": "ibm_verify",
160 "display_name": "IBM Security Verify",
161 "provider_type": "oidc",
162 "client_id": settings.sso_ibm_verify_client_id,
163 "client_secret": settings.sso_ibm_verify_client_secret.get_secret_value() if settings.sso_ibm_verify_client_secret else "",
164 "authorization_url": f"{base_url}/oidc/endpoint/default/authorize",
165 "token_url": f"{base_url}/oidc/endpoint/default/token",
166 "userinfo_url": f"{base_url}/oidc/endpoint/default/userinfo",
167 "issuer": f"{base_url}/oidc/endpoint/default",
168 "scope": "openid profile email",
169 "trusted_domains": settings.sso_trusted_domains,
170 "auto_create_users": settings.sso_auto_create_users,
171 "team_mapping": {},
172 }
173 )
175 # Okta Provider
176 if settings.sso_okta_enabled and settings.sso_okta_client_id:
177 base_url = settings.sso_okta_issuer or "https://company.okta.com"
178 providers.append(
179 {
180 "id": "okta",
181 "name": "okta",
182 "display_name": "Okta",
183 "provider_type": "oidc",
184 "client_id": settings.sso_okta_client_id,
185 "client_secret": settings.sso_okta_client_secret.get_secret_value() if settings.sso_okta_client_secret else "",
186 "authorization_url": f"{base_url}/oauth2/default/v1/authorize",
187 "token_url": f"{base_url}/oauth2/default/v1/token",
188 "userinfo_url": f"{base_url}/oauth2/default/v1/userinfo",
189 "issuer": f"{base_url}/oauth2/default",
190 "scope": "openid profile email",
191 "trusted_domains": settings.sso_trusted_domains,
192 "auto_create_users": settings.sso_auto_create_users,
193 "team_mapping": {},
194 }
195 )
197 # Microsoft Entra ID Provider
198 if settings.sso_entra_enabled and settings.sso_entra_client_id and settings.sso_entra_tenant_id:
199 tenant_id = settings.sso_entra_tenant_id
200 base_url = f"https://login.microsoftonline.com/{tenant_id}"
201 providers.append(
202 {
203 "id": "entra",
204 "name": "entra",
205 "display_name": "Microsoft Entra ID",
206 "provider_type": "oidc",
207 "client_id": settings.sso_entra_client_id,
208 "client_secret": settings.sso_entra_client_secret.get_secret_value() if settings.sso_entra_client_secret else "",
209 "authorization_url": f"{base_url}/oauth2/v2.0/authorize",
210 "token_url": f"{base_url}/oauth2/v2.0/token",
211 "userinfo_url": "https://graph.microsoft.com/oidc/userinfo",
212 "issuer": f"{base_url}/v2.0",
213 "scope": "openid profile email User.Read",
214 "trusted_domains": settings.sso_trusted_domains,
215 "auto_create_users": settings.sso_auto_create_users,
216 "team_mapping": {},
217 "provider_metadata": {
218 "groups_claim": settings.sso_entra_groups_claim,
219 "role_mappings": settings.sso_entra_role_mappings,
220 "graph_api_enabled": settings.sso_entra_graph_api_enabled,
221 "graph_api_timeout": settings.sso_entra_graph_api_timeout,
222 "graph_api_max_groups": settings.sso_entra_graph_api_max_groups,
223 },
224 }
225 )
227 # Keycloak OIDC Provider with Auto-Discovery
228 if settings.sso_keycloak_enabled and settings.sso_keycloak_base_url and settings.sso_keycloak_client_id:
229 try:
230 # First-Party
231 from mcpgateway.utils.keycloak_discovery import discover_keycloak_endpoints_sync
233 endpoints = discover_keycloak_endpoints_sync(
234 settings.sso_keycloak_base_url,
235 settings.sso_keycloak_realm,
236 public_base_url=getattr(settings, "sso_keycloak_public_base_url", None),
237 )
239 if endpoints:
240 providers.append(
241 {
242 "id": "keycloak",
243 "name": "keycloak",
244 "display_name": f"Keycloak ({settings.sso_keycloak_realm})",
245 "provider_type": "oidc",
246 "client_id": settings.sso_keycloak_client_id,
247 "client_secret": settings.sso_keycloak_client_secret.get_secret_value() if settings.sso_keycloak_client_secret else "",
248 "authorization_url": endpoints["authorization_url"],
249 "token_url": endpoints["token_url"],
250 "userinfo_url": endpoints["userinfo_url"],
251 "issuer": endpoints["issuer"],
252 "jwks_uri": endpoints.get("jwks_uri"),
253 "scope": "openid profile email",
254 "trusted_domains": settings.sso_trusted_domains,
255 "auto_create_users": settings.sso_auto_create_users,
256 "team_mapping": {},
257 "provider_metadata": {
258 "realm": settings.sso_keycloak_realm,
259 "base_url": settings.sso_keycloak_base_url,
260 "public_base_url": getattr(settings, "sso_keycloak_public_base_url", None),
261 "map_realm_roles": settings.sso_keycloak_map_realm_roles,
262 "map_client_roles": settings.sso_keycloak_map_client_roles,
263 "username_claim": settings.sso_keycloak_username_claim,
264 "email_claim": settings.sso_keycloak_email_claim,
265 "groups_claim": settings.sso_keycloak_groups_claim,
266 "jwks_uri": endpoints.get("jwks_uri"),
267 "role_mappings": getattr(settings, "sso_keycloak_role_mappings", {}),
268 "default_role": getattr(settings, "sso_keycloak_default_role", None),
269 "resolve_team_scope_to_personal_team": getattr(settings, "sso_keycloak_resolve_team_scope_to_personal_team", False),
270 },
271 }
272 )
273 else:
274 logger.error(f"Failed to discover Keycloak endpoints for realm '{settings.sso_keycloak_realm}' at {settings.sso_keycloak_base_url}")
275 except Exception as e:
276 logger.error(f"Error bootstrapping Keycloak provider: {type(e).__name__}: {e}", exc_info=True)
278 # Generic OIDC Provider (Keycloak, Auth0, Authentik, etc.)
279 if settings.sso_generic_enabled and settings.sso_generic_client_id and settings.sso_generic_provider_id:
280 provider_id = settings.sso_generic_provider_id
281 display_name = settings.sso_generic_display_name or provider_id.title()
283 provider_config = {
284 "id": provider_id,
285 "name": provider_id,
286 "display_name": display_name,
287 "provider_type": "oidc",
288 "client_id": settings.sso_generic_client_id,
289 "client_secret": settings.sso_generic_client_secret.get_secret_value() if settings.sso_generic_client_secret else "",
290 "authorization_url": settings.sso_generic_authorization_url,
291 "token_url": settings.sso_generic_token_url,
292 "userinfo_url": settings.sso_generic_userinfo_url,
293 "issuer": settings.sso_generic_issuer,
294 "scope": settings.sso_generic_scope,
295 "trusted_domains": settings.sso_trusted_domains,
296 "auto_create_users": settings.sso_auto_create_users,
297 "team_mapping": {},
298 }
299 if settings.sso_generic_jwks_uri:
300 provider_config["jwks_uri"] = settings.sso_generic_jwks_uri
301 providers.append(provider_config)
303 return providers
306async def bootstrap_sso_providers() -> None:
307 """Bootstrap SSO providers from environment configuration.
309 This function should be called during application startup to
310 automatically configure SSO providers based on environment variables.
312 Examples:
313 >>> # This would typically be called during app startup
314 >>> import asyncio
315 >>> asyncio.run(bootstrap_sso_providers()) # doctest: +SKIP
316 """
317 if not settings.sso_enabled:
318 return
320 # First-Party
321 from mcpgateway.db import get_db
322 from mcpgateway.services.sso_service import SSOService
324 providers = get_predefined_sso_providers()
325 if not providers:
326 return
328 db = next(get_db())
329 try:
330 sso_service = SSOService(db)
332 for provider_config in providers:
333 # Check if provider already exists by ID or name (both have unique constraints)
334 existing_by_id = sso_service.get_provider(provider_config["id"])
335 existing_by_name = sso_service.get_provider_by_name(provider_config["name"])
337 if not existing_by_id and not existing_by_name:
338 await sso_service.create_provider(provider_config)
339 print(f"✅ Created SSO provider: {provider_config['display_name']}")
340 else:
341 # Update existing provider with current configuration
342 existing_provider = existing_by_id or existing_by_name
344 # Smart merge for provider_metadata (see ADR-0003 for rationale):
345 # - Env config provides DEFAULTS for keys not in DB
346 # - DB values are PRESERVED (Admin API changes survive restarts)
347 # - New env keys introduced in upgrades APPLY automatically
348 #
349 # Trade-off: To change a key that exists in DB, use Admin API or reset provider.
350 # This prevents env config from unexpectedly overriding intentional Admin API changes.
351 #
352 # Example:
353 # env: {"groups_claim": "groups", "new_setting": "value"}
354 # db: {"groups_claim": "custom", "sync_roles": false}
355 # result: {"groups_claim": "custom", "new_setting": "value", "sync_roles": false}
356 if "provider_metadata" in provider_config and existing_provider.provider_metadata:
357 env_metadata = provider_config["provider_metadata"] or {}
358 db_metadata = existing_provider.provider_metadata or {}
359 # Env provides base, DB values override (preserving Admin API changes)
360 merged_metadata = {**env_metadata, **db_metadata}
361 provider_config["provider_metadata"] = merged_metadata
363 updated = await sso_service.update_provider(existing_provider.id, provider_config)
364 if updated:
365 print(f"🔄 Updated SSO provider: {provider_config['display_name']} (ID: {existing_provider.id})")
366 else:
367 print(f"ℹ️ SSO provider unchanged: {existing_provider.display_name} (ID: {existing_provider.id})")
369 except Exception as e:
370 db.rollback() # Rollback on error
371 print(f"❌ Failed to bootstrap SSO providers: {e}")
372 finally:
373 # Ensure close() always runs even if commit() fails
374 # Without this nested try/finally, a commit() failure would skip close(),
375 # leaving the connection in "idle in transaction" state
376 try:
377 db.commit() # Commit transaction to avoid implicit rollback
378 finally:
379 db.close()
382if __name__ == "__main__":
383 # Standard
384 import asyncio
386 asyncio.run(bootstrap_sso_providers())