Coverage for mcpgateway / routers / sso.py: 98%

287 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/routers/sso.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Single Sign-On (SSO) authentication routes for OAuth2/OIDC providers. 

8Handles SSO login flows, provider configuration, and callback handling. 

9""" 

10 

11# Standard 

12import secrets 

13from typing import Dict, List, Optional 

14from urllib.parse import urlparse 

15 

16# Third-Party 

17from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response, status 

18from pydantic import BaseModel 

19from sqlalchemy.orm import Session 

20 

21# First-Party 

22from mcpgateway.config import settings 

23from mcpgateway.db import get_db 

24from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_permission 

25from mcpgateway.services.logging_service import LoggingService 

26from mcpgateway.services.sso_service import SSOService 

27from mcpgateway.utils.log_sanitizer import sanitize_for_log 

28 

# Module-level logger for this router, obtained through the gateway logging service
logging_service = LoggingService()
logger = logging_service.get_logger("mcpgateway.routers.sso")

32 

33 

class SSOProviderCreateRequest(BaseModel):
    """Request body for registering a new SSO provider.

    Identity, client credentials and the three endpoint URLs are mandatory;
    every other field falls back to the default declared here.
    """

    # Identity and presentation
    id: str
    name: str
    display_name: str
    provider_type: str  # oauth2, oidc

    # OAuth client credentials
    client_id: str
    client_secret: str

    # Provider endpoints
    authorization_url: str
    token_url: str
    userinfo_url: str
    issuer: Optional[str] = None
    jwks_uri: Optional[str] = None

    # Login behaviour
    scope: str = "openid profile email"
    trusted_domains: List[str] = []
    auto_create_users: bool = True
    team_mapping: Dict = {}
    provider_metadata: Dict = {}  # Role mappings, groups_claim config, etc.

53 

54 

class SSOProviderUpdateRequest(BaseModel):
    """Request body for a partial update of an SSO provider.

    Every field is optional and defaults to None; the update endpoint in this
    module drops fields whose value is None before calling the service.
    """

    # Identity and presentation
    name: Optional[str] = None
    display_name: Optional[str] = None
    provider_type: Optional[str] = None

    # OAuth client credentials
    client_id: Optional[str] = None
    client_secret: Optional[str] = None

    # Provider endpoints
    authorization_url: Optional[str] = None
    token_url: Optional[str] = None
    userinfo_url: Optional[str] = None
    issuer: Optional[str] = None
    jwks_uri: Optional[str] = None

    # Login behaviour
    scope: Optional[str] = None
    trusted_domains: Optional[List[str]] = None
    auto_create_users: Optional[bool] = None
    team_mapping: Optional[Dict] = None
    provider_metadata: Optional[Dict] = None  # Role mappings, groups_claim config, etc.
    is_enabled: Optional[bool] = None

74 

75 

# Router that carries every SSO endpoint; all routes are mounted under /auth/sso
sso_router = APIRouter(prefix="/auth/sso", tags=["SSO Authentication"])

78 

79 

class SSOProviderResponse(BaseModel):
    """Public description of an SSO provider as returned to clients."""

    id: str
    name: str
    display_name: str
    # Only provided when initiating login; otherwise left as None
    authorization_url: Optional[str] = None

87 

88 

class SSOLoginResponse(BaseModel):
    """Response returned when an SSO login flow is initiated."""

    # URL the client must navigate to in order to authenticate with the provider
    authorization_url: str
    # State value extracted from authorization_url, echoed for client reference
    state: str

94 

95 

class SSOCallbackResponse(BaseModel):
    """Response model describing the outcome of an SSO authentication callback."""

    access_token: str
    token_type: str = "bearer"
    expires_in: int
    user: Dict

103 

104 

@sso_router.get("/providers", response_model=List[SSOProviderResponse])
async def list_sso_providers(
    db: Session = Depends(get_db),
) -> List[SSOProviderResponse]:
    """Return the SSO providers that can be offered for login.

    Args:
        db: Database session

    Returns:
        One entry (id, name, display_name) per provider that SSOService reports as enabled.

    Raises:
        HTTPException: 404 if SSO authentication is disabled in settings

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(list_sso_providers)
        True
    """
    if not settings.sso_enabled:
        raise HTTPException(status_code=404, detail="SSO authentication is disabled")

    entries: List[SSOProviderResponse] = []
    for enabled in SSOService(db).list_enabled_providers():
        # authorization_url is intentionally left unset in the listing
        entries.append(SSOProviderResponse(id=enabled.id, name=enabled.name, display_name=enabled.display_name))
    return entries

132 

133 

134def _normalize_origin(scheme: str, host: str, port: int | None) -> str: 

135 """Normalize an origin to scheme://host:port format. 

136 

137 Args: 

138 scheme: URL scheme (http/https) 

139 host: Hostname 

140 port: Port number (None uses default for scheme) 

141 

142 Returns: 

143 Normalized origin string 

144 """ 

145 # Use default ports for scheme if not specified 

146 default_ports = {"http": 80, "https": 443} 

147 if port is None or port == default_ports.get(scheme): 

148 return f"{scheme}://{host}" 

149 return f"{scheme}://{host}:{port}" 

150 

151 

152def _validate_redirect_uri(redirect_uri: str, request: Request | None = None) -> bool: 

153 """Validate redirect_uri to prevent open redirect attacks. 

154 

155 Validates against a server-side allowlist (settings.allowed_origins and settings.app_domain). 

156 Does NOT trust the Host header to prevent spoofing attacks. 

157 

158 Allows: 

159 - Relative URIs (no scheme/host) 

160 - URIs matching configured allowed_origins (full origin including scheme and port) 

161 - URIs matching app_domain (if configured) 

162 

163 Args: 

164 redirect_uri: The redirect URI to validate 

165 request: The FastAPI request object (unused, kept for API compatibility) 

166 

167 Returns: 

168 True if the redirect_uri is safe, False otherwise 

169 """ 

170 parsed = urlparse(redirect_uri) 

171 

172 # Allow relative URIs (no scheme and no netloc) 

173 if not parsed.scheme and not parsed.netloc: 

174 return True 

175 

176 # For absolute URIs, validate against server-side allowlist only 

177 # Extract full origin components from redirect_uri 

178 redirect_scheme = parsed.scheme.lower() 

179 redirect_host = parsed.hostname.lower() if parsed.hostname else "" 

180 redirect_port = parsed.port 

181 

182 # Normalize the redirect origin 

183 redirect_origin = _normalize_origin(redirect_scheme, redirect_host, redirect_port) 

184 

185 # Check against app_domain (if configured) 

186 if hasattr(settings, "app_domain") and settings.app_domain: 

187 # app_domain is an HttpUrl - extract the hostname for comparison 

188 app_domain_host = urlparse(str(settings.app_domain)).hostname or "" 

189 app_domain_host = app_domain_host.lower() 

190 if redirect_host == app_domain_host: 

191 # Only allow HTTPS in production, or HTTP for localhost 

192 if redirect_scheme == "https" or (redirect_scheme == "http" and app_domain_host in ("localhost", "127.0.0.1")): 

193 return True 

194 

195 # Check against allowed_origins (full origin match including scheme and port) 

196 if hasattr(settings, "allowed_origins") and settings.allowed_origins: 

197 for origin in settings.allowed_origins: 

198 origin = origin.strip() 

199 if not origin: 

200 continue 

201 

202 # Parse the allowed origin 

203 origin_parsed = urlparse(origin if "://" in origin else f"https://{origin}") 

204 origin_scheme = origin_parsed.scheme.lower() if origin_parsed.scheme else "https" 

205 origin_host = origin_parsed.hostname.lower() if origin_parsed.hostname else origin.lower() 

206 origin_port = origin_parsed.port 

207 

208 # Normalize and compare full origins 

209 allowed_origin = _normalize_origin(origin_scheme, origin_host, origin_port) 

210 if redirect_origin == allowed_origin: 

211 return True 

212 

213 return False 

214 

215 

@sso_router.get("/login/{provider_id}", response_model=SSOLoginResponse)
async def initiate_sso_login(
    provider_id: str,
    request: Request,
    response: Response,
    redirect_uri: str = Query(..., description="Callback URI after authentication"),
    scopes: Optional[str] = Query(None, description="Space-separated OAuth scopes"),
    db: Session = Depends(get_db),
) -> SSOLoginResponse:
    """Start an SSO authentication flow for the given provider.

    The redirect_uri is checked against the server-side allowlist (relative URIs,
    app_domain, allowed_origins) before anything else happens; the Host header is
    NOT trusted for that check. A random session-binding value is generated, passed
    to the SSO service and stored in the ``sso_session_id`` cookie.

    Args:
        provider_id: SSO provider identifier (e.g., 'github', 'google')
        request: FastAPI request object
        response: FastAPI response object used to set session-binding cookie
        redirect_uri: Callback URI after successful authentication
        scopes: Optional custom OAuth scopes (space-separated)
        db: Database session

    Returns:
        Authorization URL and state parameter for redirect.

    Raises:
        HTTPException: If SSO is disabled, provider not found, or redirect_uri is invalid

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(initiate_sso_login)
        True
    """
    # Standard
    import urllib.parse

    if not settings.sso_enabled:
        raise HTTPException(status_code=404, detail="SSO authentication is disabled")

    # Reject anything outside the server-side allowlist (open redirect protection)
    redirect_ok = _validate_redirect_uri(redirect_uri, request)
    if not redirect_ok:
        # The rejected value is untrusted, so it is sanitized before being logged
        logger.warning(f"SSO login rejected - invalid redirect_uri: {sanitize_for_log(redirect_uri)}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid redirect_uri. Must be a relative path or URL matching allowed origins.",
        )

    service = SSOService(db)
    requested_scopes = None
    if scopes:
        requested_scopes = scopes.split()
    binding_token = secrets.token_urlsafe(32)

    try:
        authorization_url = service.get_authorization_url(provider_id, redirect_uri, requested_scopes, session_binding=binding_token)
    except ValueError as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc

    if not authorization_url:
        raise HTTPException(status_code=404, detail=f"SSO provider '{provider_id}' not found or disabled")

    # Pull the state parameter out of the authorization URL for client reference
    query_string = urllib.parse.urlparse(authorization_url).query
    state_values = urllib.parse.parse_qs(query_string).get("state", [""])

    # Bind the flow to this browser: the callback reads the same cookie back
    response.set_cookie(
        key="sso_session_id",
        value=binding_token,
        httponly=True,
        secure=(settings.environment == "production") or settings.secure_cookies,
        samesite=settings.cookie_samesite,
        path=settings.app_root_path or "/",
    )

    return SSOLoginResponse(authorization_url=authorization_url, state=state_values[0])

294 

295 

@sso_router.get("/callback/{provider_id}")
async def handle_sso_callback(
    provider_id: str,
    code: Optional[str] = Query(None, description="Authorization code from SSO provider"),
    state: Optional[str] = Query(None, description="CSRF state parameter"),
    error: Optional[str] = Query(None, description="OAuth error code"),
    error_description: Optional[str] = Query(None, description="OAuth error description"),
    request: Request = None,
    response: Response = None,
    db: Session = Depends(get_db),
):
    """Handle SSO authentication callback.

    On any failure the browser is redirected to the admin login page with an
    ``error`` query parameter; on success it is redirected to the admin UI with
    the auth cookie set. Values taken from the request (provider_id, error,
    error_description) are sanitized before being logged, matching the handling
    of redirect_uri in initiate_sso_login.

    Args:
        provider_id: SSO provider identifier
        code: Authorization code from provider (present on success)
        state: CSRF state parameter for validation
        error: OAuth error code (present on failure)
        error_description: OAuth error description (present on failure)
        request: FastAPI request object
        response: FastAPI response object
        db: Database session

    Returns:
        Redirect to the admin UI with the auth cookie set, or redirect to login with error.

    Raises:
        HTTPException: If SSO is disabled or authentication fails

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(handle_sso_callback)
        True
    """
    # Third-Party
    from fastapi.responses import RedirectResponse

    if not settings.sso_enabled:
        raise HTTPException(status_code=404, detail="SSO authentication is disabled")

    # Get root path for URL construction
    root_path = request.scope.get("root_path", "") if request else ""

    def _login_redirect(error_code: str) -> RedirectResponse:
        """Build the 302 redirect to the admin login page for an error code.

        Args:
            error_code: Value placed in the ``error`` query parameter

        Returns:
            Redirect response pointing at the admin login page
        """
        return RedirectResponse(url=f"{root_path}/admin/login?error={error_code}", status_code=302)

    # provider_id comes from the URL path and is untrusted; sanitize it once for all log lines
    safe_provider_id = sanitize_for_log(provider_id)

    # Handle OAuth error responses from provider (RFC 6749 Section 4.1.2.1)
    if error:
        error_msg = error_description or error
        logger.warning("SSO callback error from provider '%s': %s - %s", safe_provider_id, sanitize_for_log(error), sanitize_for_log(error_msg))

        error_mappings = {
            "access_denied": "sso_cancelled",
            "invalid_request": "sso_invalid_request",
            "unauthorized_client": "sso_unauthorized",
            "unsupported_response_type": "sso_config_error",
            "invalid_scope": "sso_invalid_scope",
            "server_error": "sso_server_error",
            "temporarily_unavailable": "sso_unavailable",
        }
        # Only fixed, known codes are ever reflected into the redirect URL
        return _login_redirect(error_mappings.get(error, "sso_failed"))

    # Code and state are required if no error was returned
    if not code:
        logger.warning("SSO callback for provider '%s' missing both code and error parameters", safe_provider_id)
        return _login_redirect("sso_failed")

    if not state:
        logger.warning("SSO callback for provider '%s' missing required state parameter", safe_provider_id)
        return _login_redirect("sso_failed")

    sso_service = SSOService(db)

    # Handle OAuth callback — returns (user_info, token_data) or None
    user_info: Optional[Dict[str, object]] = None
    token_data: Dict[str, object] = {}

    # The cookie set by initiate_sso_login binds this callback to the initiating browser
    browser_session_binding = request.cookies.get("sso_session_id") if request else None
    if not browser_session_binding:
        return _login_redirect("sso_failed")

    callback_result = await sso_service.handle_oauth_callback_with_tokens(provider_id, code, state, session_binding=browser_session_binding)
    if callback_result:
        user_info, token_data = callback_result

    if not user_info:
        return _login_redirect("sso_failed")

    # Authenticate or create user
    access_token = await sso_service.authenticate_or_create_user(user_info)
    if not access_token:
        return _login_redirect("user_creation_failed")

    # Create redirect response
    redirect_response = RedirectResponse(url=f"{root_path}/admin", status_code=302)

    # Set secure HTTP-only cookie using the same method as email auth
    # First-Party
    from mcpgateway.utils.security_cookies import CookieTooLargeError, set_auth_cookie

    try:
        set_auth_cookie(redirect_response, access_token, remember_me=False)
    except CookieTooLargeError:
        return _login_redirect("token_too_large")

    # Persist Keycloak ID token as short-lived, HTTP-only hint for RP-initiated logout.
    # Without id_token_hint, some Keycloak versions show confirmation and may preserve SSO.
    id_token = token_data.get("id_token")
    if provider_id == "keycloak" and isinstance(id_token, str) and id_token:
        if len(id_token) > 3800:  # Leave room for cookie metadata within browser 4KB limit
            logger.warning("Keycloak id_token too large for cookie storage. RP-initiated logout will not include id_token_hint.")
        else:
            use_secure = (settings.environment == "production") or settings.secure_cookies
            redirect_response.set_cookie(
                key="sso_id_token_hint",
                value=id_token,
                max_age=settings.token_expiry * 60,  # match session token lifetime
                httponly=True,
                secure=use_secure,
                samesite=settings.cookie_samesite,
                path=settings.app_root_path or "/",
            )

    return redirect_response

422 

423 

# Admin endpoints for SSO provider management
@sso_router.post("/admin/providers", response_model=Dict)
@require_permission("admin.sso_providers:create")
async def create_sso_provider(
    provider_data: SSOProviderCreateRequest,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Dict:
    """Register a new SSO provider configuration (Admin only).

    Args:
        provider_data: SSO provider configuration
        db: Database session
        user: Current authenticated user

    Returns:
        Summary of the created provider (id, names, type, enabled flag, creation time).

    Raises:
        HTTPException: 409 if a provider with this id already exists, 400 if the service rejects the data
    """
    service = SSOService(db)

    # Refuse to create a second provider under an id that is already taken
    if service.get_provider(provider_data.id):
        raise HTTPException(status_code=409, detail=f"SSO provider '{provider_data.id}' already exists")

    try:
        created = await service.create_provider(provider_data.model_dump())
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    # Read the response fields before the session is committed and closed
    summary_fields = ("id", "name", "display_name", "provider_type", "is_enabled", "created_at")
    summary = {field: getattr(created, field) for field in summary_fields}

    db.commit()
    db.close()
    return summary

468 

469 

@sso_router.get("/admin/providers", response_model=List[Dict])
@require_permission("admin.sso_providers:read")
async def list_all_sso_providers(
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> List[Dict]:
    """Return every SSO provider, enabled or not (Admin only).

    Args:
        db: Database session
        user: Current authenticated user

    Returns:
        List of all SSO providers with configuration details.
    """
    # Third-Party
    from sqlalchemy import select

    # First-Party
    from mcpgateway.db import SSOProvider

    # No filter: disabled providers are part of the admin listing
    rows = db.execute(select(SSOProvider)).scalars().all()

    listed_fields = (
        "id",
        "name",
        "display_name",
        "provider_type",
        "is_enabled",
        "trusted_domains",
        "auto_create_users",
        "created_at",
        "updated_at",
    )
    # Read the response fields before the session is committed and closed
    listing = []
    for row in rows:
        listing.append({field: getattr(row, field) for field in listed_fields})

    db.commit()
    db.close()
    return listing

512 

513 

@sso_router.get("/admin/providers/{provider_id}", response_model=Dict)
@require_permission("admin.sso_providers:read")
async def get_sso_provider(
    provider_id: str,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Dict:
    """Return the configuration of a single SSO provider (Admin only).

    The client secret is not part of the returned data.

    Args:
        provider_id: Provider identifier
        db: Database session
        user: Current authenticated user

    Returns:
        Provider configuration details.

    Raises:
        HTTPException: If provider not found
    """
    found = SSOService(db).get_provider(provider_id)
    if not found:
        raise HTTPException(status_code=404, detail=f"SSO provider '{provider_id}' not found")

    detail_fields = (
        "id",
        "name",
        "display_name",
        "provider_type",
        "client_id",
        "authorization_url",
        "token_url",
        "userinfo_url",
        "issuer",
        "jwks_uri",
        "scope",
        "trusted_domains",
        "auto_create_users",
        "team_mapping",
        "is_enabled",
        "created_at",
        "updated_at",
        "provider_metadata",
    )
    # Read the response fields before the session is committed and closed
    details = {field: getattr(found, field) for field in detail_fields}

    db.commit()
    db.close()
    return details

563 

564 

@sso_router.put("/admin/providers/{provider_id}", response_model=Dict)
@require_permission("admin.sso_providers:update")
async def update_sso_provider(
    provider_id: str,
    provider_data: SSOProviderUpdateRequest,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Dict:
    """Apply a partial update to an SSO provider configuration (Admin only).

    Args:
        provider_id: Provider identifier
        provider_data: Updated provider configuration
        db: Database session
        user: Current authenticated user

    Returns:
        Summary of the updated provider (id, names, type, enabled flag, update time).

    Raises:
        HTTPException: 400 if no field is set or the service rejects the data, 404 if provider not found
    """
    service = SSOService(db)

    # Only top-level fields with a non-None value take part in the update
    changes = {}
    for field_name, value in provider_data.model_dump().items():
        if value is not None:
            changes[field_name] = value
    if not changes:
        raise HTTPException(status_code=400, detail="No update data provided")

    try:
        updated = await service.update_provider(provider_id, changes)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    if not updated:
        raise HTTPException(status_code=404, detail=f"SSO provider '{provider_id}' not found")

    # Read the response fields before the session is committed and closed
    summary_fields = ("id", "name", "display_name", "provider_type", "is_enabled", "updated_at")
    summary = {field: getattr(updated, field) for field in summary_fields}

    db.commit()
    db.close()
    return summary

613 

614 

@sso_router.delete("/admin/providers/{provider_id}")
@require_permission("admin.sso_providers:delete")
async def delete_sso_provider(
    provider_id: str,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Dict:
    """Remove an SSO provider configuration (Admin only).

    Args:
        provider_id: Provider identifier
        db: Database session
        user: Current authenticated user

    Returns:
        Deletion confirmation.

    Raises:
        HTTPException: If provider not found
    """
    # A falsy result from the service is reported to the client as "not found"
    deleted = SSOService(db).delete_provider(provider_id)
    if not deleted:
        raise HTTPException(status_code=404, detail=f"SSO provider '{provider_id}' not found")

    db.commit()
    db.close()
    return {"message": f"SSO provider '{provider_id}' deleted successfully"}

643 

644 

645# --------------------------------------------------------------------------- 

646# SSO User Approval Management Endpoints 

647# --------------------------------------------------------------------------- 

648 

649 

class PendingUserApprovalResponse(BaseModel):
    """Serialized view of one pending SSO user approval request."""

    id: str
    email: str
    full_name: str
    auth_provider: str
    # Timestamps are carried as strings; the listing endpoint fills them with isoformat() output
    requested_at: str
    expires_at: str
    status: str
    sso_metadata: Optional[Dict] = None

661 

662 

class ApprovalActionRequest(BaseModel):
    """Request body describing the decision taken on a pending approval."""

    action: str  # "approve" or "reject"
    reason: Optional[str] = None  # Required for rejection
    notes: Optional[str] = None

669 

670 

@sso_router.get("/pending-approvals", response_model=List[PendingUserApprovalResponse])
@require_permission("admin.user_management")
async def list_pending_approvals(
    include_expired: bool = Query(False, description="Include expired approval requests"),
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> List[PendingUserApprovalResponse]:
    """Return SSO user approval requests whose status is "pending" (Admin only).

    Results are ordered by request time, newest first.

    Args:
        include_expired: Whether to include expired requests
        db: Database session
        user: Current authenticated admin user

    Returns:
        List of pending approval requests
    """
    # Third-Party
    from sqlalchemy import select

    # First-Party
    from mcpgateway.db import PendingUserApproval

    stmt = select(PendingUserApproval)

    if not include_expired:
        # First-Party
        from mcpgateway.db import utc_now

        # Keep only requests whose expiry time is still in the future
        stmt = stmt.where(PendingUserApproval.expires_at > utc_now())

    stmt = stmt.where(PendingUserApproval.status == "pending").order_by(PendingUserApproval.requested_at.desc())

    rows = db.execute(stmt).scalars().all()

    approvals: List[PendingUserApprovalResponse] = []
    for row in rows:
        approvals.append(
            PendingUserApprovalResponse(
                id=row.id,
                email=row.email,
                full_name=row.full_name,
                auth_provider=row.auth_provider,
                requested_at=row.requested_at.isoformat(),
                expires_at=row.expires_at.isoformat(),
                status=row.status,
                sso_metadata=row.sso_metadata,
            )
        )
    return approvals

722 

723 

@sso_router.post("/pending-approvals/{approval_id}/action")
@require_permission("admin.user_management")
async def handle_approval_request(
    approval_id: str,
    request: ApprovalActionRequest,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Dict:
    """Approve or reject a pending SSO user registration (Admin only).

    Args:
        approval_id: ID of the approval request
        request: Approval action (approve/reject) with optional reason/notes
        db: Database session
        user: Current authenticated admin user

    Returns:
        Action confirmation message

    Raises:
        HTTPException: 404 if the approval is unknown; 400 if it is no longer pending, has expired,
            the action is invalid, or a rejection carries no reason
    """
    # Third-Party
    from sqlalchemy import select

    # First-Party
    from mcpgateway.db import PendingUserApproval

    lookup = select(PendingUserApproval).where(PendingUserApproval.id == approval_id)
    pending = db.execute(lookup).scalar_one_or_none()

    if not pending:
        raise HTTPException(status_code=404, detail="Approval request not found")

    if pending.status != "pending":
        raise HTTPException(status_code=400, detail=f"Approval request is already {pending.status}")

    if pending.is_expired():
        # Persist the expiry so later calls report the request as already expired
        pending.status = "expired"
        db.commit()
        raise HTTPException(status_code=400, detail="Approval request has expired")

    acting_admin = user["email"]
    chosen_action = request.action

    if chosen_action == "approve":
        pending.approve(acting_admin, request.notes)
        db.commit()
        return {"message": f"User {pending.email} approved successfully"}

    if chosen_action == "reject":
        # A rejection must always state why
        if not request.reason:
            raise HTTPException(status_code=400, detail="Rejection reason is required")
        pending.reject(acting_admin, request.reason, request.notes)
        db.commit()
        return {"message": f"User {pending.email} rejected"}

    raise HTTPException(status_code=400, detail="Invalid action. Must be 'approve' or 'reject'")