Coverage for mcpgateway / utils / sso_bootstrap.py: 100%

68 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/utils/sso_bootstrap.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Bootstrap SSO providers with predefined configurations. 

8""" 

9 

10# Future 

11from __future__ import annotations 

12 

13# Standard 

14import logging 

15from typing import Dict, List 

16 

17# First-Party 

18from mcpgateway.config import settings 

19 

20logger = logging.getLogger(__name__) 

21 

22 

23def get_predefined_sso_providers() -> List[Dict]: 

24 """Get list of predefined SSO providers based on environment configuration. 

25 

26 Returns: 

27 List of SSO provider configurations ready for database storage. 

28 

29 Examples: 

30 Default (no providers configured): 

31 >>> providers = get_predefined_sso_providers() 

32 >>> isinstance(providers, list) 

33 True 

34 

35 Patch configuration to include GitHub provider: 

36 >>> from types import SimpleNamespace 

37 >>> from unittest.mock import patch 

38 >>> cfg = SimpleNamespace( 

39 ... sso_github_enabled=True, 

40 ... sso_github_client_id='id', 

41 ... sso_github_client_secret='sec', 

42 ... sso_trusted_domains=[], 

43 ... sso_auto_create_users=True, 

44 ... sso_google_enabled=False, 

45 ... sso_ibm_verify_enabled=False, 

46 ... sso_okta_enabled=False, 

47 ... sso_entra_enabled=False, 

48 ... ) 

49 >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg): 

50 ... result = get_predefined_sso_providers() 

51 >>> isinstance(result, list) 

52 True 

53 

54 Patch configuration to include Google provider: 

55 >>> cfg = SimpleNamespace( 

56 ... sso_github_enabled=False, sso_github_client_id=None, sso_github_client_secret=None, 

57 ... sso_trusted_domains=[], sso_auto_create_users=True, 

58 ... sso_google_enabled=True, sso_google_client_id='gid', sso_google_client_secret='gsec', 

59 ... sso_ibm_verify_enabled=False, sso_okta_enabled=False, sso_entra_enabled=False 

60 ... ) 

61 >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg): 

62 ... result = get_predefined_sso_providers() 

63 >>> isinstance(result, list) 

64 True 

65 

66 Patch configuration to include Okta provider: 

67 >>> cfg = SimpleNamespace( 

68 ... sso_github_enabled=False, sso_github_client_id=None, sso_github_client_secret=None, 

69 ... sso_trusted_domains=[], sso_auto_create_users=True, 

70 ... sso_google_enabled=False, sso_okta_enabled=True, sso_okta_client_id='ok', sso_okta_client_secret='os', sso_okta_issuer='https://company.okta.com', 

71 ... sso_ibm_verify_enabled=False, sso_entra_enabled=False 

72 ... ) 

73 >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg): 

74 ... result = get_predefined_sso_providers() 

75 >>> isinstance(result, list) 

76 True 

77 

78 Patch configuration to include Microsoft Entra ID provider: 

79 >>> cfg = SimpleNamespace( 

80 ... sso_github_enabled=False, sso_github_client_id=None, sso_github_client_secret=None, 

81 ... sso_trusted_domains=[], sso_auto_create_users=True, 

82 ... sso_google_enabled=False, sso_okta_enabled=False, 

83 ... sso_ibm_verify_enabled=False, sso_entra_enabled=True, sso_entra_client_id='entra_client', sso_entra_client_secret='entra_secret', sso_entra_tenant_id='tenant-id-123', 

84 ... sso_generic_enabled=False 

85 ... ) 

86 >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg): 

87 ... result = get_predefined_sso_providers() 

88 >>> isinstance(result, list) 

89 True 

90 

91 Patch configuration to include Generic OIDC provider: 

92 >>> cfg = SimpleNamespace( 

93 ... sso_github_enabled=False, sso_github_client_id=None, sso_github_client_secret=None, 

94 ... sso_trusted_domains=[], sso_auto_create_users=True, 

95 ... sso_google_enabled=False, sso_okta_enabled=False, sso_ibm_verify_enabled=False, sso_entra_enabled=False, 

96 ... sso_generic_enabled=True, sso_generic_provider_id='keycloak', sso_generic_display_name='Keycloak', 

97 ... sso_generic_client_id='kc_client', sso_generic_client_secret='kc_secret', 

98 ... sso_generic_authorization_url='https://keycloak.company.com/auth/realms/master/protocol/openid-connect/auth', 

99 ... sso_generic_token_url='https://keycloak.company.com/auth/realms/master/protocol/openid-connect/token', 

100 ... sso_generic_userinfo_url='https://keycloak.company.com/auth/realms/master/protocol/openid-connect/userinfo', 

101 ... sso_generic_issuer='https://keycloak.company.com/auth/realms/master', 

102 ... sso_generic_scope='openid profile email' 

103 ... ) 

104 >>> with patch('mcpgateway.utils.sso_bootstrap.settings', cfg): 

105 ... result = get_predefined_sso_providers() 

106 >>> isinstance(result, list) 

107 True 

108 """ 

109 providers = [] 

110 

111 # GitHub OAuth Provider 

112 if settings.sso_github_enabled and settings.sso_github_client_id: 

113 providers.append( 

114 { 

115 "id": "github", 

116 "name": "github", 

117 "display_name": "GitHub", 

118 "provider_type": "oauth2", 

119 "client_id": settings.sso_github_client_id, 

120 "client_secret": settings.sso_github_client_secret.get_secret_value() if settings.sso_github_client_secret else "", 

121 "authorization_url": "https://github.com/login/oauth/authorize", 

122 "token_url": "https://github.com/login/oauth/access_token", # nosec B105 - public OAuth endpoint 

123 "userinfo_url": "https://api.github.com/user", 

124 "scope": "user:email", 

125 "trusted_domains": settings.sso_trusted_domains, 

126 "auto_create_users": settings.sso_auto_create_users, 

127 "team_mapping": {}, 

128 } 

129 ) 

130 

131 # Google OAuth Provider 

132 if settings.sso_google_enabled and settings.sso_google_client_id: 

133 providers.append( 

134 { 

135 "id": "google", 

136 "name": "google", 

137 "display_name": "Google", 

138 "provider_type": "oidc", 

139 "client_id": settings.sso_google_client_id, 

140 "client_secret": settings.sso_google_client_secret.get_secret_value() if settings.sso_google_client_secret else "", 

141 "authorization_url": "https://accounts.google.com/o/oauth2/auth", 

142 "token_url": "https://oauth2.googleapis.com/token", # nosec B105 - public OAuth endpoint 

143 "userinfo_url": "https://openidconnect.googleapis.com/v1/userinfo", 

144 "issuer": "https://accounts.google.com", 

145 "scope": "openid profile email", 

146 "trusted_domains": settings.sso_trusted_domains, 

147 "auto_create_users": settings.sso_auto_create_users, 

148 "team_mapping": {}, 

149 } 

150 ) 

151 

152 # IBM Security Verify Provider 

153 if settings.sso_ibm_verify_enabled and settings.sso_ibm_verify_client_id: 

154 base_url = settings.sso_ibm_verify_issuer or "https://tenant.verify.ibm.com" 

155 providers.append( 

156 { 

157 "id": "ibm_verify", 

158 "name": "ibm_verify", 

159 "display_name": "IBM Security Verify", 

160 "provider_type": "oidc", 

161 "client_id": settings.sso_ibm_verify_client_id, 

162 "client_secret": settings.sso_ibm_verify_client_secret.get_secret_value() if settings.sso_ibm_verify_client_secret else "", 

163 "authorization_url": f"{base_url}/oidc/endpoint/default/authorize", 

164 "token_url": f"{base_url}/oidc/endpoint/default/token", 

165 "userinfo_url": f"{base_url}/oidc/endpoint/default/userinfo", 

166 "issuer": f"{base_url}/oidc/endpoint/default", 

167 "scope": "openid profile email", 

168 "trusted_domains": settings.sso_trusted_domains, 

169 "auto_create_users": settings.sso_auto_create_users, 

170 "team_mapping": {}, 

171 } 

172 ) 

173 

174 # Okta Provider 

175 if settings.sso_okta_enabled and settings.sso_okta_client_id: 

176 base_url = settings.sso_okta_issuer or "https://company.okta.com" 

177 providers.append( 

178 { 

179 "id": "okta", 

180 "name": "okta", 

181 "display_name": "Okta", 

182 "provider_type": "oidc", 

183 "client_id": settings.sso_okta_client_id, 

184 "client_secret": settings.sso_okta_client_secret.get_secret_value() if settings.sso_okta_client_secret else "", 

185 "authorization_url": f"{base_url}/oauth2/default/v1/authorize", 

186 "token_url": f"{base_url}/oauth2/default/v1/token", 

187 "userinfo_url": f"{base_url}/oauth2/default/v1/userinfo", 

188 "issuer": f"{base_url}/oauth2/default", 

189 "scope": "openid profile email", 

190 "trusted_domains": settings.sso_trusted_domains, 

191 "auto_create_users": settings.sso_auto_create_users, 

192 "team_mapping": {}, 

193 } 

194 ) 

195 

196 # Microsoft Entra ID Provider 

197 if settings.sso_entra_enabled and settings.sso_entra_client_id and settings.sso_entra_tenant_id: 

198 tenant_id = settings.sso_entra_tenant_id 

199 base_url = f"https://login.microsoftonline.com/{tenant_id}" 

200 providers.append( 

201 { 

202 "id": "entra", 

203 "name": "entra", 

204 "display_name": "Microsoft Entra ID", 

205 "provider_type": "oidc", 

206 "client_id": settings.sso_entra_client_id, 

207 "client_secret": settings.sso_entra_client_secret.get_secret_value() if settings.sso_entra_client_secret else "", 

208 "authorization_url": f"{base_url}/oauth2/v2.0/authorize", 

209 "token_url": f"{base_url}/oauth2/v2.0/token", 

210 "userinfo_url": "https://graph.microsoft.com/oidc/userinfo", 

211 "issuer": f"{base_url}/v2.0", 

212 "scope": "openid profile email", 

213 "trusted_domains": settings.sso_trusted_domains, 

214 "auto_create_users": settings.sso_auto_create_users, 

215 "team_mapping": {}, 

216 "provider_metadata": { 

217 "groups_claim": settings.sso_entra_groups_claim, 

218 "role_mappings": settings.sso_entra_role_mappings, 

219 }, 

220 } 

221 ) 

222 

223 # Keycloak OIDC Provider with Auto-Discovery 

224 if settings.sso_keycloak_enabled and settings.sso_keycloak_base_url and settings.sso_keycloak_client_id: 

225 try: 

226 # First-Party 

227 from mcpgateway.utils.keycloak_discovery import discover_keycloak_endpoints_sync 

228 

229 endpoints = discover_keycloak_endpoints_sync(settings.sso_keycloak_base_url, settings.sso_keycloak_realm) 

230 

231 if endpoints: 

232 providers.append( 

233 { 

234 "id": "keycloak", 

235 "name": "keycloak", 

236 "display_name": f"Keycloak ({settings.sso_keycloak_realm})", 

237 "provider_type": "oidc", 

238 "client_id": settings.sso_keycloak_client_id, 

239 "client_secret": settings.sso_keycloak_client_secret.get_secret_value() if settings.sso_keycloak_client_secret else "", 

240 "authorization_url": endpoints["authorization_url"], 

241 "token_url": endpoints["token_url"], 

242 "userinfo_url": endpoints["userinfo_url"], 

243 "issuer": endpoints["issuer"], 

244 "jwks_uri": endpoints.get("jwks_uri"), 

245 "scope": "openid profile email", 

246 "trusted_domains": settings.sso_trusted_domains, 

247 "auto_create_users": settings.sso_auto_create_users, 

248 "team_mapping": {}, 

249 "provider_metadata": { 

250 "realm": settings.sso_keycloak_realm, 

251 "base_url": settings.sso_keycloak_base_url, 

252 "map_realm_roles": settings.sso_keycloak_map_realm_roles, 

253 "map_client_roles": settings.sso_keycloak_map_client_roles, 

254 "username_claim": settings.sso_keycloak_username_claim, 

255 "email_claim": settings.sso_keycloak_email_claim, 

256 "groups_claim": settings.sso_keycloak_groups_claim, 

257 }, 

258 } 

259 ) 

260 else: 

261 logger.error(f"Failed to discover Keycloak endpoints for realm '{settings.sso_keycloak_realm}' at {settings.sso_keycloak_base_url}") 

262 except Exception as e: 

263 logger.error(f"Error bootstrapping Keycloak provider: {e}") 

264 

265 # Generic OIDC Provider (Keycloak, Auth0, Authentik, etc.) 

266 if settings.sso_generic_enabled and settings.sso_generic_client_id and settings.sso_generic_provider_id: 

267 provider_id = settings.sso_generic_provider_id 

268 display_name = settings.sso_generic_display_name or provider_id.title() 

269 

270 providers.append( 

271 { 

272 "id": provider_id, 

273 "name": provider_id, 

274 "display_name": display_name, 

275 "provider_type": "oidc", 

276 "client_id": settings.sso_generic_client_id, 

277 "client_secret": settings.sso_generic_client_secret.get_secret_value() if settings.sso_generic_client_secret else "", 

278 "authorization_url": settings.sso_generic_authorization_url, 

279 "token_url": settings.sso_generic_token_url, 

280 "userinfo_url": settings.sso_generic_userinfo_url, 

281 "issuer": settings.sso_generic_issuer, 

282 "scope": settings.sso_generic_scope, 

283 "trusted_domains": settings.sso_trusted_domains, 

284 "auto_create_users": settings.sso_auto_create_users, 

285 "team_mapping": {}, 

286 } 

287 ) 

288 

289 return providers 

290 

291 

292async def bootstrap_sso_providers() -> None: 

293 """Bootstrap SSO providers from environment configuration. 

294 

295 This function should be called during application startup to 

296 automatically configure SSO providers based on environment variables. 

297 

298 Examples: 

299 >>> # This would typically be called during app startup 

300 >>> import asyncio 

301 >>> asyncio.run(bootstrap_sso_providers()) # doctest: +SKIP 

302 """ 

303 if not settings.sso_enabled: 

304 return 

305 

306 # First-Party 

307 from mcpgateway.db import get_db 

308 from mcpgateway.services.sso_service import SSOService 

309 

310 providers = get_predefined_sso_providers() 

311 if not providers: 

312 return 

313 

314 db = next(get_db()) 

315 try: 

316 sso_service = SSOService(db) 

317 

318 for provider_config in providers: 

319 # Check if provider already exists by ID or name (both have unique constraints) 

320 existing_by_id = sso_service.get_provider(provider_config["id"]) 

321 existing_by_name = sso_service.get_provider_by_name(provider_config["name"]) 

322 

323 if not existing_by_id and not existing_by_name: 

324 await sso_service.create_provider(provider_config) 

325 print(f"✅ Created SSO provider: {provider_config['display_name']}") 

326 else: 

327 # Update existing provider with current configuration 

328 existing_provider = existing_by_id or existing_by_name 

329 

330 # Smart merge for provider_metadata (see ADR-0003 for rationale): 

331 # - Env config provides DEFAULTS for keys not in DB 

332 # - DB values are PRESERVED (Admin API changes survive restarts) 

333 # - New env keys introduced in upgrades APPLY automatically 

334 # 

335 # Trade-off: To change a key that exists in DB, use Admin API or reset provider. 

336 # This prevents env config from unexpectedly overriding intentional Admin API changes. 

337 # 

338 # Example: 

339 # env: {"groups_claim": "groups", "new_setting": "value"} 

340 # db: {"groups_claim": "custom", "sync_roles": false} 

341 # result: {"groups_claim": "custom", "new_setting": "value", "sync_roles": false} 

342 if "provider_metadata" in provider_config and existing_provider.provider_metadata: 

343 env_metadata = provider_config["provider_metadata"] or {} 

344 db_metadata = existing_provider.provider_metadata or {} 

345 # Env provides base, DB values override (preserving Admin API changes) 

346 merged_metadata = {**env_metadata, **db_metadata} 

347 provider_config["provider_metadata"] = merged_metadata 

348 

349 updated = await sso_service.update_provider(existing_provider.id, provider_config) 

350 if updated: 

351 print(f"🔄 Updated SSO provider: {provider_config['display_name']} (ID: {existing_provider.id})") 

352 else: 

353 print(f"ℹ️ SSO provider unchanged: {existing_provider.display_name} (ID: {existing_provider.id})") 

354 

355 except Exception as e: 

356 db.rollback() # Rollback on error 

357 print(f"❌ Failed to bootstrap SSO providers: {e}") 

358 finally: 

359 # Ensure close() always runs even if commit() fails 

360 # Without this nested try/finally, a commit() failure would skip close(), 

361 # leaving the connection in "idle in transaction" state 

362 try: 

363 db.commit() # Commit transaction to avoid implicit rollback 

364 finally: 

365 db.close() 

366 

367 

368if __name__ == "__main__": 

369 # Standard 

370 import asyncio 

371 

372 asyncio.run(bootstrap_sso_providers())