Coverage for mcpgateway / routers / email_auth.py: 100%
253 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/routers/email_auth.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Email Authentication Router.
8This module provides FastAPI routes for email-based authentication
9including login, registration, password management, and user profile endpoints.
11Examples:
12 >>> from fastapi import FastAPI
13 >>> from mcpgateway.routers.email_auth import email_auth_router
14 >>> app = FastAPI()
15 >>> app.include_router(email_auth_router, prefix="/auth/email", tags=["Email Auth"])
16 >>> isinstance(email_auth_router, APIRouter)
17 True
18"""
20# Standard
21from datetime import datetime, timedelta, UTC
22from typing import List, Optional, Union
24# Third-Party
25from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
26from fastapi.security import HTTPBearer
27from sqlalchemy.orm import Session
29# First-Party
30from mcpgateway.auth import get_current_user
31from mcpgateway.config import settings
32from mcpgateway.db import EmailUser, SessionLocal, utc_now
33from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_permission
34from mcpgateway.schemas import (
35 AdminCreateUserRequest,
36 AdminUserUpdateRequest,
37 AuthenticationResponse,
38 AuthEventResponse,
39 ChangePasswordRequest,
40 CursorPaginatedUsersResponse,
41 EmailLoginRequest,
42 EmailUserResponse,
43 PublicRegistrationRequest,
44 SuccessResponse,
45)
46from mcpgateway.services.email_auth_service import AuthenticationError, EmailAuthService, EmailValidationError, PasswordValidationError, UserExistsError
47from mcpgateway.services.logging_service import LoggingService
48from mcpgateway.utils.create_jwt_token import create_jwt_token
49from mcpgateway.utils.orjson_response import ORJSONResponse
51# Initialize logging
52logging_service = LoggingService()
53logger = logging_service.get_logger(__name__)
55# Create router
56email_auth_router = APIRouter()
58# Security scheme
59bearer_scheme = HTTPBearer(auto_error=False)
62def get_db():
63 """Database dependency.
65 Commits the transaction on successful completion to avoid implicit rollbacks
66 for read-only operations. Rolls back explicitly on exception.
68 Yields:
69 Session: SQLAlchemy database session
71 Raises:
72 Exception: Re-raises any exception after rolling back the transaction.
73 """
74 db = SessionLocal()
75 try:
76 yield db
77 db.commit()
78 except Exception:
79 try:
80 db.rollback()
81 except Exception:
82 try:
83 db.invalidate()
84 except Exception:
85 pass # nosec B110 - Best effort cleanup on connection failure
86 raise
87 finally:
88 db.close()
91def get_client_ip(request: Request) -> str:
92 """Extract client IP address from request.
94 Args:
95 request: FastAPI request object
97 Returns:
98 str: Client IP address
99 """
100 # Check for X-Forwarded-For header (proxy/load balancer)
101 forwarded_for = request.headers.get("X-Forwarded-For")
102 if forwarded_for:
103 return forwarded_for.split(",")[0].strip()
105 # Check for X-Real-IP header
106 real_ip = request.headers.get("X-Real-IP")
107 if real_ip:
108 return real_ip
110 # Fall back to direct client IP
111 return request.client.host if request.client else "unknown"
114def get_user_agent(request: Request) -> str:
115 """Extract user agent from request.
117 Args:
118 request: FastAPI request object
120 Returns:
121 str: User agent string
122 """
123 return request.headers.get("User-Agent", "unknown")
126async def create_access_token(user: EmailUser, token_scopes: Optional[dict] = None, jti: Optional[str] = None) -> tuple[str, int]:
127 """Create JWT access token for user with enhanced scoping.
129 Args:
130 user: EmailUser instance
131 token_scopes: Optional token scoping information
132 jti: Optional JWT ID for revocation tracking
134 Returns:
135 Tuple of (token_string, expires_in_seconds)
136 """
137 now = datetime.now(tz=UTC)
138 expires_delta = timedelta(minutes=settings.token_expiry)
139 expire = now + expires_delta
141 # Create JWT payload — session token (teams resolved server-side at request time)
142 payload = {
143 # Standard JWT claims
144 "sub": user.email,
145 "iss": settings.jwt_issuer,
146 "aud": settings.jwt_audience,
147 "iat": int(now.timestamp()),
148 "exp": int(expire.timestamp()),
149 "jti": jti or str(__import__("uuid").uuid4()),
150 # User profile information
151 "user": {
152 "email": str(getattr(user, "email", "")),
153 "full_name": str(getattr(user, "full_name", "")),
154 "is_admin": bool(getattr(user, "is_admin", False)),
155 "auth_provider": str(getattr(user, "auth_provider", "local")),
156 },
157 "token_use": "session", # nosec B105 - token type marker, not a password
158 # Token scoping (if provided)
159 "scopes": token_scopes or {"server_id": None, "permissions": ["*"], "ip_restrictions": [], "time_restrictions": {}},
160 }
162 # Generate token using centralized token creation
163 token = await create_jwt_token(payload)
165 return token, int(expires_delta.total_seconds())
168async def create_legacy_access_token(user: EmailUser) -> tuple[str, int]:
169 """Create legacy JWT access token for backwards compatibility.
171 Args:
172 user: EmailUser instance
174 Returns:
175 Tuple of (token_string, expires_in_seconds)
176 """
177 now = datetime.now(tz=UTC)
178 expires_delta = timedelta(minutes=settings.token_expiry)
179 expire = now + expires_delta
181 # Create simple JWT payload (original format) with primitives only
182 payload = {
183 "sub": str(getattr(user, "email", "")),
184 "email": str(getattr(user, "email", "")),
185 "full_name": str(getattr(user, "full_name", "")),
186 "is_admin": bool(getattr(user, "is_admin", False)),
187 "auth_provider": str(getattr(user, "auth_provider", "local")),
188 "iat": int(now.timestamp()),
189 "exp": int(expire.timestamp()),
190 "iss": settings.jwt_issuer,
191 "aud": settings.jwt_audience,
192 }
194 # Generate token using centralized token creation
195 token = await create_jwt_token(payload)
197 return token, int(expires_delta.total_seconds())
200@email_auth_router.post("/login", response_model=AuthenticationResponse)
201async def login(login_request: EmailLoginRequest, request: Request, db: Session = Depends(get_db)):
202 """Authenticate user with email and password.
204 Args:
205 login_request: Login credentials
206 request: FastAPI request object
207 db: Database session
209 Returns:
210 AuthenticationResponse: Access token and user info
212 Examples:
213 >>> import asyncio
214 >>> asyncio.iscoroutinefunction(login)
215 True
217 Raises:
218 HTTPException: If authentication fails
220 Examples:
221 Request JSON:
222 {
223 "email": "user@example.com",
224 "password": "secure_password"
225 }
226 """
227 auth_service = EmailAuthService(db)
228 ip_address = get_client_ip(request)
229 user_agent = get_user_agent(request)
231 try:
232 # Authenticate user
233 user = await auth_service.authenticate_user(email=login_request.email, password=login_request.password, ip_address=ip_address, user_agent=user_agent)
235 if not user:
236 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid email or password")
238 # Password change enforcement respects master switch and individual toggles
239 needs_password_change = False
241 if settings.password_change_enforcement_enabled:
242 # If flag is set on the user, always honor it (flag is cleared when password is changed)
243 if getattr(user, "password_change_required", False):
244 needs_password_change = True
245 logger.debug("User %s has password_change_required flag set", login_request.email)
247 # Enforce expiry-based password change if configured and not already required
248 if not needs_password_change:
249 try:
250 pwd_changed = getattr(user, "password_changed_at", None)
251 if isinstance(pwd_changed, datetime):
252 age_days = (utc_now() - pwd_changed).days
253 max_age = getattr(settings, "password_max_age_days", 90)
254 if age_days >= max_age:
255 needs_password_change = True
256 logger.debug("User %s password expired (%s days >= %s)", login_request.email, age_days, max_age)
257 except Exception as exc:
258 logger.debug("Failed to evaluate password age for %s: %s", login_request.email, exc)
260 # Detect default password on login if enabled
261 if getattr(settings, "detect_default_password_on_login", True):
262 # First-Party
263 from mcpgateway.services.argon2_service import Argon2PasswordService
265 password_service = Argon2PasswordService()
266 is_using_default_password = await password_service.verify_password_async(settings.default_user_password.get_secret_value(), user.password_hash) # nosec B105
267 if is_using_default_password:
268 # Mark user for password change depending on configuration
269 if getattr(settings, "require_password_change_for_default_password", True):
270 user.password_change_required = True
271 needs_password_change = True
272 try:
273 db.commit()
274 except Exception as exc: # log commit failures
275 logger.warning("Failed to commit password_change_required flag for %s: %s", login_request.email, exc)
276 else:
277 logger.info("User %s is using default password but enforcement is disabled", login_request.email)
279 if needs_password_change:
280 logger.info(f"Login blocked for {login_request.email}: password change required")
281 return ORJSONResponse(
282 status_code=status.HTTP_403_FORBIDDEN,
283 content={"detail": "Password change required. Please change your password before continuing."},
284 headers={"X-Password-Change-Required": "true"},
285 )
287 # Create access token
288 access_token, expires_in = await create_access_token(user)
290 # Return authentication response
291 return AuthenticationResponse(
292 access_token=access_token, token_type="bearer", expires_in=expires_in, user=EmailUserResponse.from_email_user(user)
293 ) # nosec B106 - OAuth2 token type, not a password
295 except HTTPException:
296 raise # Re-raise HTTP exceptions as-is (401, 403, etc.)
297 except Exception as e:
298 logger.error(f"Login error for {login_request.email}: {e}")
299 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Authentication service error")
302@email_auth_router.post("/register", response_model=AuthenticationResponse)
303async def register(registration_request: PublicRegistrationRequest, request: Request, db: Session = Depends(get_db)):
304 """Register a new user account.
306 This endpoint is controlled by the PUBLIC_REGISTRATION_ENABLED setting.
307 When disabled (default), returns 403 Forbidden and users can only be
308 created by administrators via the admin API.
310 Args:
311 registration_request: Registration information (email, password, full_name only)
312 request: FastAPI request object
313 db: Database session
315 Returns:
316 AuthenticationResponse: Access token and user info
318 Raises:
319 HTTPException: If registration fails or is disabled
321 Examples:
322 Request JSON:
323 {
324 "email": "new@example.com",
325 "password": "secure_password",
326 "full_name": "New User"
327 }
328 """
329 # Check if public registration is allowed
330 if not settings.public_registration_enabled:
331 logger.warning(f"Registration attempt rejected - public registration disabled: {registration_request.email}")
332 raise HTTPException(
333 status_code=status.HTTP_403_FORBIDDEN,
334 detail="Public registration is disabled. Please contact an administrator to create an account.",
335 )
337 auth_service = EmailAuthService(db)
338 get_client_ip(request)
339 get_user_agent(request)
341 try:
342 # Password is required by schema (str, not Optional) — Pydantic returns 422 if missing
343 # Security-sensitive fields are hardcoded (not exposed on public schema)
344 user = await auth_service.create_user(
345 email=registration_request.email,
346 password=registration_request.password,
347 full_name=registration_request.full_name,
348 is_admin=False, # Regular users cannot self-register as admin
349 is_active=True, # Public registrations are always active
350 password_change_required=False, # No forced password change for self-registration
351 auth_provider="local",
352 )
354 # Create access token
355 access_token, expires_in = await create_access_token(user)
357 logger.info(f"New user registered: {user.email}")
359 return AuthenticationResponse(
360 access_token=access_token, token_type="bearer", expires_in=expires_in, user=EmailUserResponse.from_email_user(user)
361 ) # nosec B106 - OAuth2 token type, not a password
363 except EmailValidationError as e:
364 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
365 except PasswordValidationError as e:
366 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
367 except UserExistsError as e:
368 raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
369 except Exception as e:
370 logger.error(f"Registration error for {registration_request.email}: {e}")
371 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Registration service error")
374@email_auth_router.post("/change-password", response_model=SuccessResponse)
375async def change_password(password_request: ChangePasswordRequest, request: Request, current_user: EmailUser = Depends(get_current_user), db: Session = Depends(get_db)):
376 """Change user's password.
378 Args:
379 password_request: Old and new passwords
380 request: FastAPI request object
381 current_user: Currently authenticated user
382 db: Database session
384 Returns:
385 SuccessResponse: Success confirmation
387 Raises:
388 HTTPException: If password change fails
390 Examples:
391 Request JSON (with Bearer token in Authorization header):
392 {
393 "old_password": "current_password",
394 "new_password": "new_secure_password"
395 }
396 """
397 auth_service = EmailAuthService(db)
398 ip_address = get_client_ip(request)
399 user_agent = get_user_agent(request)
401 try:
402 # Change password
403 success = await auth_service.change_password(
404 email=current_user.email, old_password=password_request.old_password, new_password=password_request.new_password, ip_address=ip_address, user_agent=user_agent
405 )
407 if success:
408 return SuccessResponse(success=True, message="Password changed successfully")
409 else:
410 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to change password")
412 except AuthenticationError as e:
413 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(e))
414 except PasswordValidationError as e:
415 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
416 except Exception as e:
417 logger.error(f"Password change error for {current_user.email}: {e}")
418 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Password change service error")
421@email_auth_router.get("/me", response_model=EmailUserResponse)
422async def get_current_user_profile(current_user: EmailUser = Depends(get_current_user)):
423 """Get current user's profile information.
425 Args:
426 current_user: Currently authenticated user
428 Returns:
429 EmailUserResponse: User profile information
431 Raises:
432 HTTPException: If user authentication fails
434 Examples:
435 >>> # GET /auth/email/me
436 >>> # Headers: Authorization: Bearer <token>
437 """
438 return EmailUserResponse.from_email_user(current_user)
441@email_auth_router.get("/events", response_model=list[AuthEventResponse])
442async def get_auth_events(limit: int = 50, offset: int = 0, current_user: EmailUser = Depends(get_current_user), db: Session = Depends(get_db)):
443 """Get authentication events for the current user.
445 Args:
446 limit: Maximum number of events to return
447 offset: Number of events to skip
448 current_user: Currently authenticated user
449 db: Database session
451 Returns:
452 List[AuthEventResponse]: Authentication events
454 Raises:
455 HTTPException: If user authentication fails
457 Examples:
458 >>> # GET /auth/email/events?limit=10&offset=0
459 >>> # Headers: Authorization: Bearer <token>
460 """
461 auth_service = EmailAuthService(db)
463 try:
464 events = await auth_service.get_auth_events(email=current_user.email, limit=limit, offset=offset)
466 return [AuthEventResponse.model_validate(event) for event in events]
468 except Exception as e:
469 logger.error(f"Error getting auth events for {current_user.email}: {e}")
470 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve authentication events")
473# Admin-only endpoints
474@email_auth_router.get("/admin/users", response_model=Union[CursorPaginatedUsersResponse, List[EmailUserResponse]])
475@require_permission("admin.user_management")
476async def list_users(
477 cursor: Optional[str] = Query(None, description="Pagination cursor for fetching the next set of results"),
478 limit: Optional[int] = Query(
479 None,
480 ge=0,
481 le=settings.pagination_max_page_size,
482 description="Maximum number of users to return. 0 means all (no limit). Default uses pagination_default_page_size.",
483 ),
484 include_pagination: bool = Query(False, description="Include cursor pagination metadata in response"),
485 current_user_ctx: dict = Depends(get_current_user_with_permissions),
486 db: Session = Depends(get_db),
487) -> Union[CursorPaginatedUsersResponse, List[EmailUserResponse]]:
488 """List all users (admin only) with cursor-based pagination support.
490 Args:
491 cursor: Pagination cursor for fetching the next set of results
492 limit: Maximum number of users to return. Use 0 for all users (no limit).
493 If not specified, uses pagination_default_page_size (default: 50).
494 include_pagination: Whether to include cursor pagination metadata in the response (default: false)
495 current_user_ctx: Currently authenticated user context with permissions
496 db: Database session
498 Returns:
499 CursorPaginatedUsersResponse with users and nextCursor if include_pagination=true, or
500 List of users if include_pagination=false
502 Raises:
503 HTTPException: If user is not admin
505 Examples:
506 >>> # Cursor-based with pagination: GET /auth/email/admin/users?cursor=eyJlbWFpbCI6Li4ufQ&include_pagination=true
507 >>> # Simple list: GET /auth/email/admin/users
508 >>> # Headers: Authorization: Bearer <admin_token>
509 """
510 auth_service = EmailAuthService(db)
512 try:
513 result = await auth_service.list_users(cursor=cursor, limit=limit)
514 user_responses = [EmailUserResponse.from_email_user(user) for user in result.data]
516 if include_pagination:
517 return CursorPaginatedUsersResponse(users=user_responses, next_cursor=result.next_cursor)
519 return user_responses
521 except Exception as e:
522 logger.error(f"Error listing users: {e}")
523 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user list")
526@email_auth_router.get("/admin/events", response_model=list[AuthEventResponse])
527@require_permission("admin.user_management")
528async def list_all_auth_events(limit: int = 100, offset: int = 0, user_email: Optional[str] = None, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
529 """List authentication events for all users (admin only).
531 Args:
532 limit: Maximum number of events to return
533 offset: Number of events to skip
534 user_email: Filter events by specific user email
535 current_user_ctx: Currently authenticated user context with permissions
536 db: Database session
538 Returns:
539 List[AuthEventResponse]: Authentication events
541 Raises:
542 HTTPException: If user is not admin
544 Examples:
545 >>> # GET /auth/email/admin/events?limit=50&user_email=user@example.com
546 >>> # Headers: Authorization: Bearer <admin_token>
547 """
548 auth_service = EmailAuthService(db)
550 try:
551 events = await auth_service.get_auth_events(email=user_email, limit=limit, offset=offset)
553 return [AuthEventResponse.model_validate(event) for event in events]
555 except Exception as e:
556 logger.error(f"Error getting auth events: {e}")
557 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve authentication events")
560@email_auth_router.post("/admin/users", response_model=EmailUserResponse, status_code=status.HTTP_201_CREATED)
561@require_permission("admin.user_management")
562async def create_user(user_request: AdminCreateUserRequest, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
563 """Create a new user account (admin only).
565 Args:
566 user_request: User creation information
567 current_user_ctx: Currently authenticated user context with permissions
568 db: Database session
570 Returns:
571 EmailUserResponse: Created user information
573 Raises:
574 HTTPException: If user creation fails
576 Examples:
577 Request JSON:
578 {
579 "email": "newuser@example.com",
580 "password": "secure_password",
581 "full_name": "New User",
582 "is_admin": false
583 }
584 """
585 auth_service = EmailAuthService(db)
587 try:
588 # Password is required by schema (str, not Optional) — Pydantic returns 422 if missing
589 # Create new user with all fields from request
590 user = await auth_service.create_user(
591 email=user_request.email,
592 password=user_request.password,
593 full_name=user_request.full_name,
594 is_admin=user_request.is_admin,
595 is_active=user_request.is_active,
596 password_change_required=user_request.password_change_required,
597 auth_provider="local",
598 granted_by=current_user_ctx.get("email"),
599 )
601 # If the user was created with the default password, optionally force password change
602 if (
603 settings.password_change_enforcement_enabled
604 and getattr(settings, "require_password_change_for_default_password", True)
605 and user_request.password == settings.default_user_password.get_secret_value()
606 ): # nosec B105
607 user.password_change_required = True
608 db.commit()
610 logger.info(f"Admin {current_user_ctx['email']} created user: {user.email}")
612 db.commit()
613 db.close()
614 return EmailUserResponse.from_email_user(user)
616 except EmailValidationError as e:
617 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
618 except PasswordValidationError as e:
619 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
620 except UserExistsError as e:
621 raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
622 except Exception as e:
623 logger.error(f"Admin user creation error: {e}")
624 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="User creation failed")
627@email_auth_router.get("/admin/users/{user_email}", response_model=EmailUserResponse)
628@require_permission("admin.user_management")
629async def get_user(user_email: str, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
630 """Get user by email (admin only).
632 Args:
633 user_email: Email of user to retrieve
634 current_user_ctx: Currently authenticated user context with permissions
635 db: Database session
637 Returns:
638 EmailUserResponse: User information
640 Raises:
641 HTTPException: If user not found
642 """
643 auth_service = EmailAuthService(db)
645 try:
646 user = await auth_service.get_user_by_email(user_email)
647 if not user:
648 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
650 return EmailUserResponse.from_email_user(user)
652 except HTTPException:
653 raise # Re-raise HTTP exceptions as-is (401, 403, 404, etc.)
654 except Exception as e:
655 logger.error(f"Error retrieving user {user_email}: {e}")
656 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user")
659@email_auth_router.put("/admin/users/{user_email}", response_model=EmailUserResponse)
660@require_permission("admin.user_management")
661async def update_user(user_email: str, user_request: AdminUserUpdateRequest, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
662 """Update user information (admin only).
664 Args:
665 user_email: Email of user to update
666 user_request: Updated user information
667 current_user_ctx: Currently authenticated user context with permissions
668 db: Database session
670 Returns:
671 EmailUserResponse: Updated user information
673 Raises:
674 HTTPException: If user not found or update fails
675 """
676 auth_service = EmailAuthService(db)
678 try:
679 user = await auth_service.update_user(
680 email=user_email,
681 full_name=user_request.full_name,
682 is_admin=user_request.is_admin,
683 is_active=user_request.is_active,
684 password_change_required=user_request.password_change_required,
685 password=user_request.password,
686 admin_origin_source="api",
687 )
689 logger.info(f"Admin {current_user_ctx['email']} updated user: {user.email}")
691 result = EmailUserResponse.from_email_user(user)
692 return result
694 except ValueError as e:
695 error_msg = str(e)
696 if "not found" in error_msg.lower():
697 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
698 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=error_msg)
699 except PasswordValidationError as e:
700 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
701 except Exception as e:
702 logger.error(f"Error updating user {user_email}: {e}")
703 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to update user")
706@email_auth_router.delete("/admin/users/{user_email}", response_model=SuccessResponse)
707@require_permission("admin.user_management")
708async def delete_user(user_email: str, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
709 """Delete/deactivate user (admin only).
711 Args:
712 user_email: Email of user to delete
713 current_user_ctx: Currently authenticated user context with permissions
714 db: Database session
716 Returns:
717 SuccessResponse: Success confirmation
719 Raises:
720 HTTPException: If user not found or deletion fails
721 """
722 auth_service = EmailAuthService(db)
724 try:
725 # Prevent admin from deleting themselves
726 if user_email == current_user_ctx["email"]:
727 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot delete your own account")
729 # Prevent deleting the last active admin user
730 if await auth_service.is_last_active_admin(user_email):
731 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot delete the last remaining admin user")
733 # Hard delete using auth service
734 await auth_service.delete_user(user_email)
736 logger.info(f"Admin {current_user_ctx['email']} deleted user: {user_email}")
738 db.commit()
739 db.close()
740 return SuccessResponse(success=True, message=f"User {user_email} has been deleted")
742 except HTTPException:
743 raise # Re-raise HTTP exceptions as-is (401, 403, 404, etc.)
744 except Exception as e:
745 logger.error(f"Error deleting user {user_email}: {e}")
746 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to delete user")