Coverage for mcpgateway / routers / sso.py: 98%
287 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/routers/sso.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Single Sign-On (SSO) authentication routes for OAuth2/OIDC providers.
8Handles SSO login flows, provider configuration, and callback handling.
9"""
11# Standard
12import secrets
13from typing import Dict, List, Optional
14from urllib.parse import urlparse
16# Third-Party
17from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response, status
18from pydantic import BaseModel
19from sqlalchemy.orm import Session
21# First-Party
22from mcpgateway.config import settings
23from mcpgateway.db import get_db
24from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_permission
25from mcpgateway.services.logging_service import LoggingService
26from mcpgateway.services.sso_service import SSOService
27from mcpgateway.utils.log_sanitizer import sanitize_for_log
29# Initialize logging
30logging_service = LoggingService()
31logger = logging_service.get_logger("mcpgateway.routers.sso")
34class SSOProviderCreateRequest(BaseModel):
35 """Request to create SSO provider."""
37 id: str
38 name: str
39 display_name: str
40 provider_type: str # oauth2, oidc
41 client_id: str
42 client_secret: str
43 authorization_url: str
44 token_url: str
45 userinfo_url: str
46 issuer: Optional[str] = None
47 jwks_uri: Optional[str] = None
48 scope: str = "openid profile email"
49 trusted_domains: List[str] = []
50 auto_create_users: bool = True
51 team_mapping: Dict = {}
52 provider_metadata: Dict = {} # Role mappings, groups_claim config, etc.
55class SSOProviderUpdateRequest(BaseModel):
56 """Request to update SSO provider."""
58 name: Optional[str] = None
59 display_name: Optional[str] = None
60 provider_type: Optional[str] = None
61 client_id: Optional[str] = None
62 client_secret: Optional[str] = None
63 authorization_url: Optional[str] = None
64 token_url: Optional[str] = None
65 userinfo_url: Optional[str] = None
66 issuer: Optional[str] = None
67 jwks_uri: Optional[str] = None
68 scope: Optional[str] = None
69 trusted_domains: Optional[List[str]] = None
70 auto_create_users: Optional[bool] = None
71 team_mapping: Optional[Dict] = None
72 provider_metadata: Optional[Dict] = None # Role mappings, groups_claim config, etc.
73 is_enabled: Optional[bool] = None
76# Create router
77sso_router = APIRouter(prefix="/auth/sso", tags=["SSO Authentication"])
80class SSOProviderResponse(BaseModel):
81 """SSO provider information for client."""
83 id: str
84 name: str
85 display_name: str
86 authorization_url: Optional[str] = None # Only provided when initiating login
89class SSOLoginResponse(BaseModel):
90 """SSO login initiation response."""
92 authorization_url: str
93 state: str
96class SSOCallbackResponse(BaseModel):
97 """SSO authentication callback response."""
99 access_token: str
100 token_type: str = "bearer"
101 expires_in: int
102 user: Dict
105@sso_router.get("/providers", response_model=List[SSOProviderResponse])
106async def list_sso_providers(
107 db: Session = Depends(get_db),
108) -> List[SSOProviderResponse]:
109 """List available SSO providers for login.
111 Args:
112 db: Database session
114 Returns:
115 List of enabled SSO providers with basic information.
117 Raises:
118 HTTPException: If SSO authentication is disabled
120 Examples:
121 >>> import asyncio
122 >>> asyncio.iscoroutinefunction(list_sso_providers)
123 True
124 """
125 if not settings.sso_enabled:
126 raise HTTPException(status_code=404, detail="SSO authentication is disabled")
128 sso_service = SSOService(db)
129 providers = sso_service.list_enabled_providers()
131 return [SSOProviderResponse(id=provider.id, name=provider.name, display_name=provider.display_name) for provider in providers]
134def _normalize_origin(scheme: str, host: str, port: int | None) -> str:
135 """Normalize an origin to scheme://host:port format.
137 Args:
138 scheme: URL scheme (http/https)
139 host: Hostname
140 port: Port number (None uses default for scheme)
142 Returns:
143 Normalized origin string
144 """
145 # Use default ports for scheme if not specified
146 default_ports = {"http": 80, "https": 443}
147 if port is None or port == default_ports.get(scheme):
148 return f"{scheme}://{host}"
149 return f"{scheme}://{host}:{port}"
152def _validate_redirect_uri(redirect_uri: str, request: Request | None = None) -> bool:
153 """Validate redirect_uri to prevent open redirect attacks.
155 Validates against a server-side allowlist (settings.allowed_origins and settings.app_domain).
156 Does NOT trust the Host header to prevent spoofing attacks.
158 Allows:
159 - Relative URIs (no scheme/host)
160 - URIs matching configured allowed_origins (full origin including scheme and port)
161 - URIs matching app_domain (if configured)
163 Args:
164 redirect_uri: The redirect URI to validate
165 request: The FastAPI request object (unused, kept for API compatibility)
167 Returns:
168 True if the redirect_uri is safe, False otherwise
169 """
170 parsed = urlparse(redirect_uri)
172 # Allow relative URIs (no scheme and no netloc)
173 if not parsed.scheme and not parsed.netloc:
174 return True
176 # For absolute URIs, validate against server-side allowlist only
177 # Extract full origin components from redirect_uri
178 redirect_scheme = parsed.scheme.lower()
179 redirect_host = parsed.hostname.lower() if parsed.hostname else ""
180 redirect_port = parsed.port
182 # Normalize the redirect origin
183 redirect_origin = _normalize_origin(redirect_scheme, redirect_host, redirect_port)
185 # Check against app_domain (if configured)
186 if hasattr(settings, "app_domain") and settings.app_domain:
187 # app_domain is an HttpUrl - extract the hostname for comparison
188 app_domain_host = urlparse(str(settings.app_domain)).hostname or ""
189 app_domain_host = app_domain_host.lower()
190 if redirect_host == app_domain_host:
191 # Only allow HTTPS in production, or HTTP for localhost
192 if redirect_scheme == "https" or (redirect_scheme == "http" and app_domain_host in ("localhost", "127.0.0.1")):
193 return True
195 # Check against allowed_origins (full origin match including scheme and port)
196 if hasattr(settings, "allowed_origins") and settings.allowed_origins:
197 for origin in settings.allowed_origins:
198 origin = origin.strip()
199 if not origin:
200 continue
202 # Parse the allowed origin
203 origin_parsed = urlparse(origin if "://" in origin else f"https://{origin}")
204 origin_scheme = origin_parsed.scheme.lower() if origin_parsed.scheme else "https"
205 origin_host = origin_parsed.hostname.lower() if origin_parsed.hostname else origin.lower()
206 origin_port = origin_parsed.port
208 # Normalize and compare full origins
209 allowed_origin = _normalize_origin(origin_scheme, origin_host, origin_port)
210 if redirect_origin == allowed_origin:
211 return True
213 return False
216@sso_router.get("/login/{provider_id}", response_model=SSOLoginResponse)
217async def initiate_sso_login(
218 provider_id: str,
219 request: Request,
220 response: Response,
221 redirect_uri: str = Query(..., description="Callback URI after authentication"),
222 scopes: Optional[str] = Query(None, description="Space-separated OAuth scopes"),
223 db: Session = Depends(get_db),
224) -> SSOLoginResponse:
225 """Initiate SSO authentication flow.
227 Validates the redirect_uri against a server-side allowlist to prevent open redirect attacks.
228 Only allows relative URIs, URIs matching app_domain, or URIs from configured allowed_origins.
229 Does NOT trust the Host header for validation.
231 Args:
232 provider_id: SSO provider identifier (e.g., 'github', 'google')
233 request: FastAPI request object
234 response: FastAPI response object used to set session-binding cookie
235 redirect_uri: Callback URI after successful authentication
236 scopes: Optional custom OAuth scopes (space-separated)
237 db: Database session
239 Returns:
240 Authorization URL and state parameter for redirect.
242 Raises:
243 HTTPException: If SSO is disabled, provider not found, or redirect_uri is invalid
245 Examples:
246 >>> import asyncio
247 >>> asyncio.iscoroutinefunction(initiate_sso_login)
248 True
249 """
250 if not settings.sso_enabled:
251 raise HTTPException(status_code=404, detail="SSO authentication is disabled")
253 # Validate redirect_uri to prevent open redirect attacks
254 # Uses server-side allowlist (allowed_origins, app_domain) - does NOT trust Host header
255 if not _validate_redirect_uri(redirect_uri, request):
256 # Sanitize untrusted redirect_uri before logging to prevent log injection
257 logger.warning(f"SSO login rejected - invalid redirect_uri: {sanitize_for_log(redirect_uri)}")
258 raise HTTPException(
259 status_code=status.HTTP_400_BAD_REQUEST,
260 detail="Invalid redirect_uri. Must be a relative path or URL matching allowed origins.",
261 )
263 sso_service = SSOService(db)
264 scope_list = scopes.split() if scopes else None
265 browser_session_binding = secrets.token_urlsafe(32)
267 try:
268 auth_url = sso_service.get_authorization_url(provider_id, redirect_uri, scope_list, session_binding=browser_session_binding)
269 except ValueError as exc:
270 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
272 if not auth_url:
273 raise HTTPException(status_code=404, detail=f"SSO provider '{provider_id}' not found or disabled")
275 # Extract state from URL for client reference
276 # Standard
277 import urllib.parse
279 parsed = urllib.parse.urlparse(auth_url)
280 params = urllib.parse.parse_qs(parsed.query)
281 state = params.get("state", [""])[0]
283 use_secure = (settings.environment == "production") or settings.secure_cookies
284 response.set_cookie(
285 key="sso_session_id",
286 value=browser_session_binding,
287 httponly=True,
288 secure=use_secure,
289 samesite=settings.cookie_samesite,
290 path=settings.app_root_path or "/",
291 )
293 return SSOLoginResponse(authorization_url=auth_url, state=state)
296@sso_router.get("/callback/{provider_id}")
297async def handle_sso_callback(
298 provider_id: str,
299 code: Optional[str] = Query(None, description="Authorization code from SSO provider"),
300 state: Optional[str] = Query(None, description="CSRF state parameter"),
301 error: Optional[str] = Query(None, description="OAuth error code"),
302 error_description: Optional[str] = Query(None, description="OAuth error description"),
303 request: Request = None,
304 response: Response = None,
305 db: Session = Depends(get_db),
306):
307 """Handle SSO authentication callback.
309 Args:
310 provider_id: SSO provider identifier
311 code: Authorization code from provider (present on success)
312 state: CSRF state parameter for validation
313 error: OAuth error code (present on failure)
314 error_description: OAuth error description (present on failure)
315 request: FastAPI request object
316 response: FastAPI response object
317 db: Database session
319 Returns:
320 JWT access token and user information, or redirect to login with error.
322 Raises:
323 HTTPException: If SSO is disabled or authentication fails
325 Examples:
326 >>> import asyncio
327 >>> asyncio.iscoroutinefunction(handle_sso_callback)
328 True
329 """
330 # Third-Party
331 from fastapi.responses import RedirectResponse
333 if not settings.sso_enabled:
334 raise HTTPException(status_code=404, detail="SSO authentication is disabled")
336 # Get root path for URL construction
337 root_path = request.scope.get("root_path", "") if request else ""
339 # Handle OAuth error responses from provider (RFC 6749 Section 4.1.2.1)
340 if error:
341 error_msg = error_description or error
342 logger.warning("SSO callback error from provider '%s': %s - %s", provider_id, error, error_msg)
344 error_mappings = {
345 "access_denied": "sso_cancelled",
346 "invalid_request": "sso_invalid_request",
347 "unauthorized_client": "sso_unauthorized",
348 "unsupported_response_type": "sso_config_error",
349 "invalid_scope": "sso_invalid_scope",
350 "server_error": "sso_server_error",
351 "temporarily_unavailable": "sso_unavailable",
352 }
353 error_code = error_mappings.get(error, "sso_failed")
354 return RedirectResponse(url=f"{root_path}/admin/login?error={error_code}", status_code=302)
356 # Code and state are required if no error was returned
357 if not code:
358 logger.warning("SSO callback for provider '%s' missing both code and error parameters", provider_id)
359 return RedirectResponse(url=f"{root_path}/admin/login?error=sso_failed", status_code=302)
361 if not state:
362 logger.warning("SSO callback for provider '%s' missing required state parameter", provider_id)
363 return RedirectResponse(url=f"{root_path}/admin/login?error=sso_failed", status_code=302)
365 sso_service = SSOService(db)
367 # Handle OAuth callback — returns (user_info, token_data) or None
368 user_info: Optional[Dict[str, object]] = None
369 token_data: Dict[str, object] = {}
371 browser_session_binding = request.cookies.get("sso_session_id") if request else None
372 if not browser_session_binding:
373 return RedirectResponse(url=f"{root_path}/admin/login?error=sso_failed", status_code=302)
375 callback_result = await sso_service.handle_oauth_callback_with_tokens(provider_id, code, state, session_binding=browser_session_binding)
376 if callback_result:
377 user_info, token_data = callback_result
379 if not user_info:
380 return RedirectResponse(url=f"{root_path}/admin/login?error=sso_failed", status_code=302)
382 # Authenticate or create user
383 access_token = await sso_service.authenticate_or_create_user(user_info)
384 if not access_token:
385 return RedirectResponse(url=f"{root_path}/admin/login?error=user_creation_failed", status_code=302)
387 # Create redirect response
388 redirect_response = RedirectResponse(url=f"{root_path}/admin", status_code=302)
390 # Set secure HTTP-only cookie using the same method as email auth
391 # First-Party
392 from mcpgateway.utils.security_cookies import CookieTooLargeError, set_auth_cookie
394 try:
395 set_auth_cookie(redirect_response, access_token, remember_me=False)
396 except CookieTooLargeError:
397 redirect_response = RedirectResponse(
398 url=f"{root_path}/admin/login?error=token_too_large",
399 status_code=302,
400 )
401 return redirect_response
403 # Persist Keycloak ID token as short-lived, HTTP-only hint for RP-initiated logout.
404 # Without id_token_hint, some Keycloak versions show confirmation and may preserve SSO.
405 id_token = token_data.get("id_token")
406 if provider_id == "keycloak" and isinstance(id_token, str) and id_token:
407 if len(id_token) > 3800: # Leave room for cookie metadata within browser 4KB limit
408 logger.warning("Keycloak id_token too large for cookie storage. RP-initiated logout will not include id_token_hint.")
409 else:
410 use_secure = (settings.environment == "production") or settings.secure_cookies
411 redirect_response.set_cookie(
412 key="sso_id_token_hint",
413 value=id_token,
414 max_age=settings.token_expiry * 60, # match session token lifetime
415 httponly=True,
416 secure=use_secure,
417 samesite=settings.cookie_samesite,
418 path=settings.app_root_path or "/",
419 )
421 return redirect_response
424# Admin endpoints for SSO provider management
425@sso_router.post("/admin/providers", response_model=Dict)
426@require_permission("admin.sso_providers:create")
427async def create_sso_provider(
428 provider_data: SSOProviderCreateRequest,
429 db: Session = Depends(get_db),
430 user=Depends(get_current_user_with_permissions),
431) -> Dict:
432 """Create new SSO provider configuration (Admin only).
434 Args:
435 provider_data: SSO provider configuration
436 db: Database session
437 user: Current authenticated user
439 Returns:
440 Created provider information.
442 Raises:
443 HTTPException: If provider already exists or creation fails
444 """
445 sso_service = SSOService(db)
447 # Check if provider already exists
448 existing = sso_service.get_provider(provider_data.id)
449 if existing:
450 raise HTTPException(status_code=409, detail=f"SSO provider '{provider_data.id}' already exists")
452 try:
453 provider = await sso_service.create_provider(provider_data.model_dump())
454 except ValueError as exc:
455 raise HTTPException(status_code=400, detail=str(exc)) from exc
457 result = {
458 "id": provider.id,
459 "name": provider.name,
460 "display_name": provider.display_name,
461 "provider_type": provider.provider_type,
462 "is_enabled": provider.is_enabled,
463 "created_at": provider.created_at,
464 }
465 db.commit()
466 db.close()
467 return result
470@sso_router.get("/admin/providers", response_model=List[Dict])
471@require_permission("admin.sso_providers:read")
472async def list_all_sso_providers(
473 db: Session = Depends(get_db),
474 user=Depends(get_current_user_with_permissions),
475) -> List[Dict]:
476 """List all SSO providers including disabled ones (Admin only).
478 Args:
479 db: Database session
480 user: Current authenticated user
482 Returns:
483 List of all SSO providers with configuration details.
484 """
485 # Third-Party
486 from sqlalchemy import select
488 # First-Party
489 from mcpgateway.db import SSOProvider
491 stmt = select(SSOProvider)
492 result = db.execute(stmt)
493 providers = result.scalars().all()
495 result = [
496 {
497 "id": provider.id,
498 "name": provider.name,
499 "display_name": provider.display_name,
500 "provider_type": provider.provider_type,
501 "is_enabled": provider.is_enabled,
502 "trusted_domains": provider.trusted_domains,
503 "auto_create_users": provider.auto_create_users,
504 "created_at": provider.created_at,
505 "updated_at": provider.updated_at,
506 }
507 for provider in providers
508 ]
509 db.commit()
510 db.close()
511 return result
514@sso_router.get("/admin/providers/{provider_id}", response_model=Dict)
515@require_permission("admin.sso_providers:read")
516async def get_sso_provider(
517 provider_id: str,
518 db: Session = Depends(get_db),
519 user=Depends(get_current_user_with_permissions),
520) -> Dict:
521 """Get SSO provider details (Admin only).
523 Args:
524 provider_id: Provider identifier
525 db: Database session
526 user: Current authenticated user
528 Returns:
529 Provider configuration details.
531 Raises:
532 HTTPException: If provider not found
533 """
534 sso_service = SSOService(db)
535 provider = sso_service.get_provider(provider_id)
537 if not provider:
538 raise HTTPException(status_code=404, detail=f"SSO provider '{provider_id}' not found")
540 result = {
541 "id": provider.id,
542 "name": provider.name,
543 "display_name": provider.display_name,
544 "provider_type": provider.provider_type,
545 "client_id": provider.client_id,
546 "authorization_url": provider.authorization_url,
547 "token_url": provider.token_url,
548 "userinfo_url": provider.userinfo_url,
549 "issuer": provider.issuer,
550 "jwks_uri": provider.jwks_uri,
551 "scope": provider.scope,
552 "trusted_domains": provider.trusted_domains,
553 "auto_create_users": provider.auto_create_users,
554 "team_mapping": provider.team_mapping,
555 "is_enabled": provider.is_enabled,
556 "created_at": provider.created_at,
557 "updated_at": provider.updated_at,
558 "provider_metadata": provider.provider_metadata,
559 }
560 db.commit()
561 db.close()
562 return result
565@sso_router.put("/admin/providers/{provider_id}", response_model=Dict)
566@require_permission("admin.sso_providers:update")
567async def update_sso_provider(
568 provider_id: str,
569 provider_data: SSOProviderUpdateRequest,
570 db: Session = Depends(get_db),
571 user=Depends(get_current_user_with_permissions),
572) -> Dict:
573 """Update SSO provider configuration (Admin only).
575 Args:
576 provider_id: Provider identifier
577 provider_data: Updated provider configuration
578 db: Database session
579 user: Current authenticated user
581 Returns:
582 Updated provider information.
584 Raises:
585 HTTPException: If provider not found or update fails
586 """
587 sso_service = SSOService(db)
589 # Filter out None values
590 update_data = {k: v for k, v in provider_data.model_dump().items() if v is not None}
591 if not update_data:
592 raise HTTPException(status_code=400, detail="No update data provided")
594 try:
595 provider = await sso_service.update_provider(provider_id, update_data)
596 except ValueError as exc:
597 raise HTTPException(status_code=400, detail=str(exc)) from exc
599 if not provider:
600 raise HTTPException(status_code=404, detail=f"SSO provider '{provider_id}' not found")
602 result = {
603 "id": provider.id,
604 "name": provider.name,
605 "display_name": provider.display_name,
606 "provider_type": provider.provider_type,
607 "is_enabled": provider.is_enabled,
608 "updated_at": provider.updated_at,
609 }
610 db.commit()
611 db.close()
612 return result
615@sso_router.delete("/admin/providers/{provider_id}")
616@require_permission("admin.sso_providers:delete")
617async def delete_sso_provider(
618 provider_id: str,
619 db: Session = Depends(get_db),
620 user=Depends(get_current_user_with_permissions),
621) -> Dict:
622 """Delete SSO provider configuration (Admin only).
624 Args:
625 provider_id: Provider identifier
626 db: Database session
627 user: Current authenticated user
629 Returns:
630 Deletion confirmation.
632 Raises:
633 HTTPException: If provider not found
634 """
635 sso_service = SSOService(db)
637 if not sso_service.delete_provider(provider_id):
638 raise HTTPException(status_code=404, detail=f"SSO provider '{provider_id}' not found")
640 db.commit()
641 db.close()
642 return {"message": f"SSO provider '{provider_id}' deleted successfully"}
645# ---------------------------------------------------------------------------
646# SSO User Approval Management Endpoints
647# ---------------------------------------------------------------------------
650class PendingUserApprovalResponse(BaseModel):
651 """Response model for pending user approval."""
653 id: str
654 email: str
655 full_name: str
656 auth_provider: str
657 requested_at: str
658 expires_at: str
659 status: str
660 sso_metadata: Optional[Dict] = None
663class ApprovalActionRequest(BaseModel):
664 """Request model for approval actions."""
666 action: str # "approve" or "reject"
667 reason: Optional[str] = None # Required for rejection
668 notes: Optional[str] = None
671@sso_router.get("/pending-approvals", response_model=List[PendingUserApprovalResponse])
672@require_permission("admin.user_management")
673async def list_pending_approvals(
674 include_expired: bool = Query(False, description="Include expired approval requests"),
675 db: Session = Depends(get_db),
676 user=Depends(get_current_user_with_permissions),
677) -> List[PendingUserApprovalResponse]:
678 """List pending SSO user approval requests (Admin only).
680 Args:
681 include_expired: Whether to include expired requests
682 db: Database session
683 user: Current authenticated admin user
685 Returns:
686 List of pending approval requests
687 """
688 # Third-Party
689 from sqlalchemy import select
691 # First-Party
692 from mcpgateway.db import PendingUserApproval
694 query = select(PendingUserApproval)
696 if not include_expired:
697 # First-Party
698 from mcpgateway.db import utc_now
700 query = query.where(PendingUserApproval.expires_at > utc_now())
702 # Filter by status
703 query = query.where(PendingUserApproval.status == "pending")
704 query = query.order_by(PendingUserApproval.requested_at.desc())
706 result = db.execute(query)
707 pending_approvals = result.scalars().all()
709 return [
710 PendingUserApprovalResponse(
711 id=approval.id,
712 email=approval.email,
713 full_name=approval.full_name,
714 auth_provider=approval.auth_provider,
715 requested_at=approval.requested_at.isoformat(),
716 expires_at=approval.expires_at.isoformat(),
717 status=approval.status,
718 sso_metadata=approval.sso_metadata,
719 )
720 for approval in pending_approvals
721 ]
724@sso_router.post("/pending-approvals/{approval_id}/action")
725@require_permission("admin.user_management")
726async def handle_approval_request(
727 approval_id: str,
728 request: ApprovalActionRequest,
729 db: Session = Depends(get_db),
730 user=Depends(get_current_user_with_permissions),
731) -> Dict:
732 """Approve or reject a pending SSO user registration (Admin only).
734 Args:
735 approval_id: ID of the approval request
736 request: Approval action (approve/reject) with optional reason/notes
737 db: Database session
738 user: Current authenticated admin user
740 Returns:
741 Action confirmation message
743 Raises:
744 HTTPException: If approval not found or invalid action
745 """
746 # Third-Party
747 from sqlalchemy import select
749 # First-Party
750 from mcpgateway.db import PendingUserApproval
752 # Get pending approval
753 approval = db.execute(select(PendingUserApproval).where(PendingUserApproval.id == approval_id)).scalar_one_or_none()
755 if not approval:
756 raise HTTPException(status_code=404, detail="Approval request not found")
758 if approval.status != "pending":
759 raise HTTPException(status_code=400, detail=f"Approval request is already {approval.status}")
761 if approval.is_expired():
762 approval.status = "expired"
763 db.commit()
764 raise HTTPException(status_code=400, detail="Approval request has expired")
766 admin_email = user["email"]
768 if request.action == "approve":
769 approval.approve(admin_email, request.notes)
770 db.commit()
771 return {"message": f"User {approval.email} approved successfully"}
773 elif request.action == "reject":
774 if not request.reason:
775 raise HTTPException(status_code=400, detail="Rejection reason is required")
776 approval.reject(admin_email, request.reason, request.notes)
777 db.commit()
778 return {"message": f"User {approval.email} rejected"}
780 else:
781 raise HTTPException(status_code=400, detail="Invalid action. Must be 'approve' or 'reject'")