Coverage for mcpgateway / utils / sso_bootstrap.py: 100%
68 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/utils/sso_bootstrap.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Bootstrap SSO providers with predefined configurations.
8"""
10# Future
11from __future__ import annotations
13# Standard
14import logging
15from typing import Dict, List
17# First-Party
18from mcpgateway.config import settings
20logger = logging.getLogger(__name__)
def _secret_value(secret: object) -> str:
    """Return the plain-text value of a configured client secret.

    Args:
        secret: An object exposing ``get_secret_value()``, a plain string,
            or a falsy value when no secret is configured.

    Returns:
        The unwrapped secret, or an empty string when *secret* is falsy.

    Examples:
        >>> _secret_value(None)
        ''
        >>> _secret_value('plain')
        'plain'
    """
    if not secret:
        return ""
    reveal = getattr(secret, "get_secret_value", None)
    return reveal() if callable(reveal) else str(secret)


def _normalized_base_url(configured: object, default: str) -> str:
    """Return *configured* (or *default* when unset) without trailing slashes.

    Endpoint paths are appended with a leading ``/``; stripping the trailing
    slash keeps the derived URLs and issuer free of ``//``.

    Examples:
        >>> _normalized_base_url('https://company.okta.com/', 'https://x')
        'https://company.okta.com'
        >>> _normalized_base_url(None, 'https://company.okta.com')
        'https://company.okta.com'
    """
    return str(configured or default).rstrip("/")


def _shared_provider_fields() -> Dict:
    """Return the settings-derived fields that every predefined provider carries."""
    return {
        "trusted_domains": settings.sso_trusted_domains,
        "auto_create_users": settings.sso_auto_create_users,
        "team_mapping": {},
    }


def get_predefined_sso_providers() -> List[Dict]:
    """Get list of predefined SSO providers based on environment configuration.

    A provider is included only when its ``sso_<name>_enabled`` flag is set and
    its required identifiers (client id, plus tenant id / base URL / provider id
    where applicable) are configured. The Keycloak provider additionally needs
    endpoint discovery to succeed; a discovery failure is logged and that
    provider is skipped without affecting the others.

    Returns:
        List of SSO provider configurations ready for database storage.

    Examples:
        With the application's real settings the result is always a list:

        >>> isinstance(get_predefined_sso_providers(), list)
        True

        Patch the settings to enable individual providers. Every ``*_enabled``
        flag read by this function must exist on the stand-in settings object:

        >>> from types import SimpleNamespace
        >>> from unittest.mock import patch
        >>> base = dict(
        ...     sso_github_enabled=False, sso_google_enabled=False,
        ...     sso_ibm_verify_enabled=False, sso_okta_enabled=False,
        ...     sso_entra_enabled=False, sso_keycloak_enabled=False,
        ...     sso_generic_enabled=False,
        ...     sso_trusted_domains=[], sso_auto_create_users=True,
        ... )
        >>> def enabled_providers(**overrides):
        ...     cfg = SimpleNamespace(**{**base, **overrides})
        ...     with patch('mcpgateway.utils.sso_bootstrap.settings', cfg):
        ...         return get_predefined_sso_providers()

        Nothing enabled:

        >>> enabled_providers()
        []

        GitHub provider (plain OAuth2, so no issuer):

        >>> result = enabled_providers(
        ...     sso_github_enabled=True,
        ...     sso_github_client_id='id',
        ...     sso_github_client_secret='sec',
        ... )
        >>> [(p['id'], p['provider_type'], p['client_secret']) for p in result]
        [('github', 'oauth2', 'sec')]
        >>> 'issuer' in result[0]
        False

        Google provider:

        >>> result = enabled_providers(
        ...     sso_google_enabled=True, sso_google_client_id='gid', sso_google_client_secret='gsec',
        ... )
        >>> result[0]['issuer']
        'https://accounts.google.com'

        Okta provider (a trailing slash on the issuer is tolerated):

        >>> result = enabled_providers(
        ...     sso_okta_enabled=True, sso_okta_client_id='ok', sso_okta_client_secret='os',
        ...     sso_okta_issuer='https://company.okta.com/',
        ... )
        >>> result[0]['token_url']
        'https://company.okta.com/oauth2/default/v1/token'

        Microsoft Entra ID provider:

        >>> result = enabled_providers(
        ...     sso_entra_enabled=True, sso_entra_client_id='entra_client',
        ...     sso_entra_client_secret='entra_secret', sso_entra_tenant_id='tenant-id-123',
        ...     sso_entra_groups_claim='groups', sso_entra_role_mappings={},
        ... )
        >>> result[0]['issuer']
        'https://login.microsoftonline.com/tenant-id-123/v2.0'
        >>> result[0]['provider_metadata']
        {'groups_claim': 'groups', 'role_mappings': {}}

        Generic OIDC provider (display name falls back to the title-cased id):

        >>> result = enabled_providers(
        ...     sso_generic_enabled=True, sso_generic_provider_id='keycloak', sso_generic_display_name=None,
        ...     sso_generic_client_id='kc_client', sso_generic_client_secret='kc_secret',
        ...     sso_generic_authorization_url='https://keycloak.company.com/auth/realms/master/protocol/openid-connect/auth',
        ...     sso_generic_token_url='https://keycloak.company.com/auth/realms/master/protocol/openid-connect/token',
        ...     sso_generic_userinfo_url='https://keycloak.company.com/auth/realms/master/protocol/openid-connect/userinfo',
        ...     sso_generic_issuer='https://keycloak.company.com/auth/realms/master',
        ...     sso_generic_scope='openid profile email',
        ... )
        >>> (result[0]['id'], result[0]['display_name'], result[0]['scope'])
        ('keycloak', 'Keycloak', 'openid profile email')
    """
    providers: List[Dict] = []

    # GitHub OAuth Provider
    if settings.sso_github_enabled and settings.sso_github_client_id:
        providers.append(
            {
                "id": "github",
                "name": "github",
                "display_name": "GitHub",
                "provider_type": "oauth2",
                "client_id": settings.sso_github_client_id,
                "client_secret": _secret_value(settings.sso_github_client_secret),
                "authorization_url": "https://github.com/login/oauth/authorize",
                "token_url": "https://github.com/login/oauth/access_token",  # nosec B105 - public OAuth endpoint
                "userinfo_url": "https://api.github.com/user",
                "scope": "user:email",
                **_shared_provider_fields(),
            }
        )

    # Google OAuth Provider
    if settings.sso_google_enabled and settings.sso_google_client_id:
        providers.append(
            {
                "id": "google",
                "name": "google",
                "display_name": "Google",
                "provider_type": "oidc",
                "client_id": settings.sso_google_client_id,
                "client_secret": _secret_value(settings.sso_google_client_secret),
                "authorization_url": "https://accounts.google.com/o/oauth2/auth",
                "token_url": "https://oauth2.googleapis.com/token",  # nosec B105 - public OAuth endpoint
                "userinfo_url": "https://openidconnect.googleapis.com/v1/userinfo",
                "issuer": "https://accounts.google.com",
                "scope": "openid profile email",
                **_shared_provider_fields(),
            }
        )

    # IBM Security Verify Provider
    if settings.sso_ibm_verify_enabled and settings.sso_ibm_verify_client_id:
        base_url = _normalized_base_url(settings.sso_ibm_verify_issuer, "https://tenant.verify.ibm.com")
        providers.append(
            {
                "id": "ibm_verify",
                "name": "ibm_verify",
                "display_name": "IBM Security Verify",
                "provider_type": "oidc",
                "client_id": settings.sso_ibm_verify_client_id,
                "client_secret": _secret_value(settings.sso_ibm_verify_client_secret),
                "authorization_url": f"{base_url}/oidc/endpoint/default/authorize",
                "token_url": f"{base_url}/oidc/endpoint/default/token",
                "userinfo_url": f"{base_url}/oidc/endpoint/default/userinfo",
                "issuer": f"{base_url}/oidc/endpoint/default",
                "scope": "openid profile email",
                **_shared_provider_fields(),
            }
        )

    # Okta Provider
    if settings.sso_okta_enabled and settings.sso_okta_client_id:
        base_url = _normalized_base_url(settings.sso_okta_issuer, "https://company.okta.com")
        providers.append(
            {
                "id": "okta",
                "name": "okta",
                "display_name": "Okta",
                "provider_type": "oidc",
                "client_id": settings.sso_okta_client_id,
                "client_secret": _secret_value(settings.sso_okta_client_secret),
                "authorization_url": f"{base_url}/oauth2/default/v1/authorize",
                "token_url": f"{base_url}/oauth2/default/v1/token",
                "userinfo_url": f"{base_url}/oauth2/default/v1/userinfo",
                "issuer": f"{base_url}/oauth2/default",
                "scope": "openid profile email",
                **_shared_provider_fields(),
            }
        )

    # Microsoft Entra ID Provider
    if settings.sso_entra_enabled and settings.sso_entra_client_id and settings.sso_entra_tenant_id:
        tenant_id = settings.sso_entra_tenant_id
        base_url = f"https://login.microsoftonline.com/{tenant_id}"
        providers.append(
            {
                "id": "entra",
                "name": "entra",
                "display_name": "Microsoft Entra ID",
                "provider_type": "oidc",
                "client_id": settings.sso_entra_client_id,
                "client_secret": _secret_value(settings.sso_entra_client_secret),
                "authorization_url": f"{base_url}/oauth2/v2.0/authorize",
                "token_url": f"{base_url}/oauth2/v2.0/token",
                "userinfo_url": "https://graph.microsoft.com/oidc/userinfo",
                "issuer": f"{base_url}/v2.0",
                "scope": "openid profile email",
                **_shared_provider_fields(),
                "provider_metadata": {
                    "groups_claim": settings.sso_entra_groups_claim,
                    "role_mappings": settings.sso_entra_role_mappings,
                },
            }
        )

    # Keycloak OIDC Provider with Auto-Discovery
    if settings.sso_keycloak_enabled and settings.sso_keycloak_base_url and settings.sso_keycloak_client_id:
        # Best effort: any failure here (import, discovery, missing endpoint key)
        # is logged and only the Keycloak provider is skipped.
        try:
            # First-Party
            from mcpgateway.utils.keycloak_discovery import discover_keycloak_endpoints_sync

            endpoints = discover_keycloak_endpoints_sync(settings.sso_keycloak_base_url, settings.sso_keycloak_realm)

            if endpoints:
                providers.append(
                    {
                        "id": "keycloak",
                        "name": "keycloak",
                        "display_name": f"Keycloak ({settings.sso_keycloak_realm})",
                        "provider_type": "oidc",
                        "client_id": settings.sso_keycloak_client_id,
                        "client_secret": _secret_value(settings.sso_keycloak_client_secret),
                        "authorization_url": endpoints["authorization_url"],
                        "token_url": endpoints["token_url"],
                        "userinfo_url": endpoints["userinfo_url"],
                        "issuer": endpoints["issuer"],
                        "jwks_uri": endpoints.get("jwks_uri"),
                        "scope": "openid profile email",
                        **_shared_provider_fields(),
                        "provider_metadata": {
                            "realm": settings.sso_keycloak_realm,
                            "base_url": settings.sso_keycloak_base_url,
                            "map_realm_roles": settings.sso_keycloak_map_realm_roles,
                            "map_client_roles": settings.sso_keycloak_map_client_roles,
                            "username_claim": settings.sso_keycloak_username_claim,
                            "email_claim": settings.sso_keycloak_email_claim,
                            "groups_claim": settings.sso_keycloak_groups_claim,
                        },
                    }
                )
            else:
                logger.error(
                    "Failed to discover Keycloak endpoints for realm '%s' at %s",
                    settings.sso_keycloak_realm,
                    settings.sso_keycloak_base_url,
                )
        except Exception as e:
            logger.error("Error bootstrapping Keycloak provider: %s", e)

    # Generic OIDC Provider (Keycloak, Auth0, Authentik, etc.)
    if settings.sso_generic_enabled and settings.sso_generic_client_id and settings.sso_generic_provider_id:
        provider_id = settings.sso_generic_provider_id
        display_name = settings.sso_generic_display_name or provider_id.title()

        providers.append(
            {
                "id": provider_id,
                "name": provider_id,
                "display_name": display_name,
                "provider_type": "oidc",
                "client_id": settings.sso_generic_client_id,
                "client_secret": _secret_value(settings.sso_generic_client_secret),
                "authorization_url": settings.sso_generic_authorization_url,
                "token_url": settings.sso_generic_token_url,
                "userinfo_url": settings.sso_generic_userinfo_url,
                "issuer": settings.sso_generic_issuer,
                "scope": settings.sso_generic_scope,
                **_shared_provider_fields(),
            }
        )

    return providers
async def bootstrap_sso_providers() -> None:
    """Create or refresh SSO providers from the environment configuration.

    Meant to be invoked during application startup. Does nothing when SSO is
    disabled or when no predefined provider is configured. Otherwise each
    configured provider is created if no record matches its id or name, and
    updated in place if one does. Any error is rolled back and reported on
    stdout rather than raised.

    Examples:
        >>> # This would typically be called during app startup
        >>> import asyncio
        >>> asyncio.run(bootstrap_sso_providers())  # doctest: +SKIP
    """
    if not settings.sso_enabled:
        return

    # First-Party
    from mcpgateway.db import get_db
    from mcpgateway.services.sso_service import SSOService

    configured = get_predefined_sso_providers()
    if not configured:
        return

    session = next(get_db())
    try:
        service = SSOService(session)

        for config in configured:
            # A provider may already be stored under the same ID or the same name
            # (both have unique constraints), so look it up both ways.
            match_by_id = service.get_provider(config["id"])
            match_by_name = service.get_provider_by_name(config["name"])
            current = match_by_id or match_by_name

            if not current:
                await service.create_provider(config)
                print(f"✅ Created SSO provider: {config['display_name']}")
                continue

            # Existing provider: refresh it from the current configuration.
            #
            # provider_metadata is merged rather than replaced (see ADR-0003 for rationale):
            #   - keys present only in the env config are added as defaults
            #   - keys already stored in the DB keep their DB value, so changes
            #     made through the Admin API survive restarts
            #   - keys newly introduced by an upgrade are therefore applied automatically
            #
            # Consequence: changing a key that already exists in the DB requires
            # the Admin API or resetting the provider.
            #
            # Example:
            #   env:    {"groups_claim": "groups", "new_setting": "value"}
            #   db:     {"groups_claim": "custom", "sync_roles": false}
            #   result: {"groups_claim": "custom", "new_setting": "value", "sync_roles": false}
            if "provider_metadata" in config and current.provider_metadata:
                env_defaults = config["provider_metadata"] or {}
                stored_values = current.provider_metadata or {}
                # Later unpacking wins, so stored values override env defaults.
                config["provider_metadata"] = {**env_defaults, **stored_values}

            changed = await service.update_provider(current.id, config)
            if not changed:
                print(f"ℹ️ SSO provider unchanged: {current.display_name} (ID: {current.id})")
            else:
                print(f"🔄 Updated SSO provider: {config['display_name']} (ID: {current.id})")

    except Exception as e:
        session.rollback()  # Discard any partial work from the failed run
        print(f"❌ Failed to bootstrap SSO providers: {e}")
    finally:
        # commit() is attempted on every path; the inner try/finally guarantees
        # close() still runs if commit() raises, so the connection is not left
        # "idle in transaction".
        try:
            session.commit()  # Commit transaction to avoid implicit rollback
        finally:
            session.close()
if __name__ == "__main__":
    # Standard
    import asyncio

    # Script entry point: run the async bootstrap to completion.
    asyncio.run(bootstrap_sso_providers())