Coverage for mcpgateway / routers / email_auth.py: 100%
308 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/routers/email_auth.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Email Authentication Router.
8This module provides FastAPI routes for email-based authentication
9including login, registration, password management, and user profile endpoints.
11Examples:
12 >>> from fastapi import FastAPI
13 >>> from mcpgateway.routers.email_auth import email_auth_router
14 >>> app = FastAPI()
15 >>> app.include_router(email_auth_router, prefix="/auth/email", tags=["Email Auth"])
16 >>> isinstance(email_auth_router, APIRouter)
17 True
18"""
20# Standard
21from datetime import datetime, timedelta, UTC
22from typing import List, Optional, Union
24# Third-Party
25from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
26from fastapi.security import HTTPBearer
27from sqlalchemy.orm import Session
29# First-Party
30from mcpgateway.auth import get_current_user
31from mcpgateway.config import settings
32from mcpgateway.db import EmailUser, SessionLocal, utc_now
33from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_permission
34from mcpgateway.schemas import (
35 AdminCreateUserRequest,
36 AdminUserUpdateRequest,
37 AuthenticationResponse,
38 AuthEventResponse,
39 ChangePasswordRequest,
40 CursorPaginatedUsersResponse,
41 EmailLoginRequest,
42 EmailUserResponse,
43 ForgotPasswordRequest,
44 PasswordResetTokenValidationResponse,
45 PublicRegistrationRequest,
46 ResetPasswordRequest,
47 SuccessResponse,
48)
49from mcpgateway.services.email_auth_service import AuthenticationError, EmailAuthService, EmailValidationError, PasswordValidationError, UserExistsError
50from mcpgateway.services.logging_service import LoggingService
51from mcpgateway.utils.create_jwt_token import create_jwt_token
52from mcpgateway.utils.orjson_response import ORJSONResponse
54# Initialize logging
55logging_service = LoggingService()
56logger = logging_service.get_logger(__name__)
58# Create router
59email_auth_router = APIRouter()
61# Security scheme
62bearer_scheme = HTTPBearer(auto_error=False)
65def get_db():
66 """Database dependency.
68 Commits the transaction on successful completion to avoid implicit rollbacks
69 for read-only operations. Rolls back explicitly on exception.
71 Yields:
72 Session: SQLAlchemy database session
74 Raises:
75 Exception: Re-raises any exception after rolling back the transaction.
76 """
77 db = SessionLocal()
78 try:
79 yield db
80 db.commit()
81 except Exception:
82 try:
83 db.rollback()
84 except Exception:
85 try:
86 db.invalidate()
87 except Exception:
88 pass # nosec B110 - Best effort cleanup on connection failure
89 raise
90 finally:
91 db.close()
94def get_client_ip(request: Request) -> str:
95 """Extract client IP address from request.
97 Args:
98 request: FastAPI request object
100 Returns:
101 str: Client IP address
102 """
103 # Check for X-Forwarded-For header (proxy/load balancer)
104 forwarded_for = request.headers.get("X-Forwarded-For")
105 if forwarded_for:
106 return forwarded_for.split(",")[0].strip()
108 # Check for X-Real-IP header
109 real_ip = request.headers.get("X-Real-IP")
110 if real_ip:
111 return real_ip
113 # Fall back to direct client IP
114 return request.client.host if request.client else "unknown"
117def get_user_agent(request: Request) -> str:
118 """Extract user agent from request.
120 Args:
121 request: FastAPI request object
123 Returns:
124 str: User agent string
125 """
126 return request.headers.get("User-Agent", "unknown")
129async def create_access_token(user: EmailUser, token_scopes: Optional[dict] = None, jti: Optional[str] = None) -> tuple[str, int]:
130 """Create JWT access token for user with enhanced scoping.
132 Args:
133 user: EmailUser instance
134 token_scopes: Optional token scoping information
135 jti: Optional JWT ID for revocation tracking
137 Returns:
138 Tuple of (token_string, expires_in_seconds)
139 """
140 now = datetime.now(tz=UTC)
141 expires_delta = timedelta(minutes=settings.token_expiry)
142 expire = now + expires_delta
144 # Create JWT payload — session token (teams resolved server-side at request time)
145 payload = {
146 # Standard JWT claims
147 "sub": user.email,
148 "iss": settings.jwt_issuer,
149 "aud": settings.jwt_audience,
150 "iat": int(now.timestamp()),
151 "exp": int(expire.timestamp()),
152 "jti": jti or str(__import__("uuid").uuid4()),
153 # User profile information
154 "user": {
155 "email": str(getattr(user, "email", "")),
156 "full_name": str(getattr(user, "full_name", "")),
157 "is_admin": bool(getattr(user, "is_admin", False)),
158 "auth_provider": str(getattr(user, "auth_provider", "local")),
159 },
160 "token_use": "session", # nosec B105 - token type marker, not a password
161 # Token scoping (if provided)
162 "scopes": token_scopes or {"server_id": None, "permissions": ["*"], "ip_restrictions": [], "time_restrictions": {}},
163 }
165 # Generate token using centralized token creation
166 token = await create_jwt_token(payload)
168 return token, int(expires_delta.total_seconds())
171async def create_legacy_access_token(user: EmailUser) -> tuple[str, int]:
172 """Create legacy JWT access token for backwards compatibility.
174 Args:
175 user: EmailUser instance
177 Returns:
178 Tuple of (token_string, expires_in_seconds)
179 """
180 now = datetime.now(tz=UTC)
181 expires_delta = timedelta(minutes=settings.token_expiry)
182 expire = now + expires_delta
184 # Create simple JWT payload (original format) with primitives only
185 payload = {
186 "sub": str(getattr(user, "email", "")),
187 "email": str(getattr(user, "email", "")),
188 "full_name": str(getattr(user, "full_name", "")),
189 "is_admin": bool(getattr(user, "is_admin", False)),
190 "auth_provider": str(getattr(user, "auth_provider", "local")),
191 "iat": int(now.timestamp()),
192 "exp": int(expire.timestamp()),
193 "iss": settings.jwt_issuer,
194 "aud": settings.jwt_audience,
195 }
197 # Generate token using centralized token creation
198 token = await create_jwt_token(payload)
200 return token, int(expires_delta.total_seconds())
203@email_auth_router.post("/login", response_model=AuthenticationResponse)
204async def login(login_request: EmailLoginRequest, request: Request, db: Session = Depends(get_db)):
205 """Authenticate user with email and password.
207 Args:
208 login_request: Login credentials
209 request: FastAPI request object
210 db: Database session
212 Returns:
213 AuthenticationResponse: Access token and user info
215 Examples:
216 >>> import asyncio
217 >>> asyncio.iscoroutinefunction(login)
218 True
220 Raises:
221 HTTPException: If authentication fails
223 Examples:
224 Request JSON:
225 {
226 "email": "user@example.com",
227 "password": "secure_password"
228 }
229 """
230 auth_service = EmailAuthService(db)
231 ip_address = get_client_ip(request)
232 user_agent = get_user_agent(request)
234 try:
235 # Authenticate user
236 user = await auth_service.authenticate_user(email=login_request.email, password=login_request.password, ip_address=ip_address, user_agent=user_agent)
238 if not user:
239 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid email or password")
241 # Password change enforcement respects master switch and individual toggles
242 needs_password_change = False
244 if settings.password_change_enforcement_enabled:
245 # If flag is set on the user, always honor it (flag is cleared when password is changed)
246 if getattr(user, "password_change_required", False):
247 needs_password_change = True
248 logger.debug("User %s has password_change_required flag set", login_request.email)
250 # Enforce expiry-based password change if configured and not already required
251 if not needs_password_change:
252 try:
253 pwd_changed = getattr(user, "password_changed_at", None)
254 if isinstance(pwd_changed, datetime):
255 age_days = (utc_now() - pwd_changed).days
256 max_age = getattr(settings, "password_max_age_days", 90)
257 if age_days >= max_age:
258 needs_password_change = True
259 logger.debug("User %s password expired (%s days >= %s)", login_request.email, age_days, max_age)
260 except Exception as exc:
261 logger.debug("Failed to evaluate password age for %s: %s", login_request.email, exc)
263 # Detect default password on login if enabled
264 if getattr(settings, "detect_default_password_on_login", True):
265 # First-Party
266 from mcpgateway.services.argon2_service import Argon2PasswordService
268 password_service = Argon2PasswordService()
269 is_using_default_password = await password_service.verify_password_async(settings.default_user_password.get_secret_value(), user.password_hash) # nosec B105
270 if is_using_default_password:
271 # Mark user for password change depending on configuration
272 if getattr(settings, "require_password_change_for_default_password", True):
273 user.password_change_required = True
274 needs_password_change = True
275 try:
276 db.commit()
277 except Exception as exc: # log commit failures
278 logger.warning("Failed to commit password_change_required flag for %s: %s", login_request.email, exc)
279 else:
280 logger.info("User %s is using default password but enforcement is disabled", login_request.email)
282 if needs_password_change:
283 logger.info(f"Login blocked for {login_request.email}: password change required")
284 return ORJSONResponse(
285 status_code=status.HTTP_403_FORBIDDEN,
286 content={"detail": "Password change required. Please change your password before continuing."},
287 headers={"X-Password-Change-Required": "true"},
288 )
290 # Create access token
291 access_token, expires_in = await create_access_token(user)
293 # Return authentication response
294 return AuthenticationResponse(
295 access_token=access_token, token_type="bearer", expires_in=expires_in, user=EmailUserResponse.from_email_user(user)
296 ) # nosec B106 - OAuth2 token type, not a password
298 except HTTPException:
299 raise # Re-raise HTTP exceptions as-is (401, 403, etc.)
300 except Exception as e:
301 logger.error(f"Login error for {login_request.email}: {e}")
302 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Authentication service error")
305@email_auth_router.post("/register", response_model=AuthenticationResponse)
306async def register(registration_request: PublicRegistrationRequest, request: Request, db: Session = Depends(get_db)):
307 """Register a new user account.
309 This endpoint is controlled by the PUBLIC_REGISTRATION_ENABLED setting.
310 When disabled (default), returns 403 Forbidden and users can only be
311 created by administrators via the admin API.
313 Args:
314 registration_request: Registration information (email, password, full_name only)
315 request: FastAPI request object
316 db: Database session
318 Returns:
319 AuthenticationResponse: Access token and user info
321 Raises:
322 HTTPException: If registration fails or is disabled
324 Examples:
325 Request JSON:
326 {
327 "email": "new@example.com",
328 "password": "secure_password",
329 "full_name": "New User"
330 }
331 """
332 # Check if public registration is allowed
333 if not settings.public_registration_enabled:
334 logger.warning(f"Registration attempt rejected - public registration disabled: {registration_request.email}")
335 raise HTTPException(
336 status_code=status.HTTP_403_FORBIDDEN,
337 detail="Public registration is disabled. Please contact an administrator to create an account.",
338 )
340 auth_service = EmailAuthService(db)
341 get_client_ip(request)
342 get_user_agent(request)
344 try:
345 # Password is required by schema (str, not Optional) — Pydantic returns 422 if missing
346 # Security-sensitive fields are hardcoded (not exposed on public schema)
347 user = await auth_service.create_user(
348 email=registration_request.email,
349 password=registration_request.password,
350 full_name=registration_request.full_name,
351 is_admin=False, # Regular users cannot self-register as admin
352 is_active=True, # Public registrations are always active
353 password_change_required=False, # No forced password change for self-registration
354 auth_provider="local",
355 )
357 # Create access token
358 access_token, expires_in = await create_access_token(user)
360 logger.info(f"New user registered: {user.email}")
362 return AuthenticationResponse(
363 access_token=access_token, token_type="bearer", expires_in=expires_in, user=EmailUserResponse.from_email_user(user)
364 ) # nosec B106 - OAuth2 token type, not a password
366 except EmailValidationError as e:
367 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
368 except PasswordValidationError as e:
369 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
370 except UserExistsError as e:
371 raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
372 except Exception as e:
373 logger.error(f"Registration error for {registration_request.email}: {e}")
374 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Registration service error")
377@email_auth_router.post("/change-password", response_model=SuccessResponse)
378async def change_password(password_request: ChangePasswordRequest, request: Request, current_user: EmailUser = Depends(get_current_user), db: Session = Depends(get_db)):
379 """Change user's password.
381 Args:
382 password_request: Old and new passwords
383 request: FastAPI request object
384 current_user: Currently authenticated user
385 db: Database session
387 Returns:
388 SuccessResponse: Success confirmation
390 Raises:
391 HTTPException: If password change fails
393 Examples:
394 Request JSON (with Bearer token in Authorization header):
395 {
396 "old_password": "current_password",
397 "new_password": "new_secure_password"
398 }
399 """
400 auth_service = EmailAuthService(db)
401 ip_address = get_client_ip(request)
402 user_agent = get_user_agent(request)
404 try:
405 # Change password
406 success = await auth_service.change_password(
407 email=current_user.email, old_password=password_request.old_password, new_password=password_request.new_password, ip_address=ip_address, user_agent=user_agent
408 )
410 if success:
411 return SuccessResponse(success=True, message="Password changed successfully")
412 else:
413 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to change password")
415 except AuthenticationError as e:
416 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(e))
417 except PasswordValidationError as e:
418 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
419 except Exception as e:
420 logger.error(f"Password change error for {current_user.email}: {e}")
421 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Password change service error")
424@email_auth_router.post("/forgot-password", response_model=SuccessResponse)
425async def forgot_password(reset_request: ForgotPasswordRequest, request: Request, db: Session = Depends(get_db)):
426 """Request a one-time password reset token via email.
428 Args:
429 reset_request: Forgot-password request payload.
430 request: Incoming HTTP request.
431 db: Database session dependency.
433 Returns:
434 SuccessResponse: Generic success response to avoid account enumeration.
436 Raises:
437 HTTPException: If password reset is disabled or the request is rate limited.
438 """
439 if not getattr(settings, "password_reset_enabled", True):
440 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Password reset is disabled")
442 auth_service = EmailAuthService(db)
443 ip_address = get_client_ip(request)
444 user_agent = get_user_agent(request)
446 result = await auth_service.request_password_reset(email=reset_request.email, ip_address=ip_address, user_agent=user_agent)
447 if result.rate_limited:
448 raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="Too many requests. Please try again later.")
450 return SuccessResponse(success=True, message="If this email is registered, you will receive a reset link.")
453@email_auth_router.get("/reset-password/{token}", response_model=PasswordResetTokenValidationResponse)
454async def validate_password_reset_token(token: str, request: Request, db: Session = Depends(get_db)):
455 """Validate a password reset token before submitting a new password.
457 Args:
458 token: One-time reset token.
459 request: Incoming HTTP request.
460 db: Database session dependency.
462 Returns:
463 PasswordResetTokenValidationResponse: Token validity and expiration data.
465 Raises:
466 HTTPException: If password reset is disabled or token validation fails.
467 """
468 if not getattr(settings, "password_reset_enabled", True):
469 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Password reset is disabled")
471 auth_service = EmailAuthService(db)
472 ip_address = get_client_ip(request)
473 user_agent = get_user_agent(request)
475 try:
476 reset_token = await auth_service.validate_password_reset_token(token=token, ip_address=ip_address, user_agent=user_agent)
477 return PasswordResetTokenValidationResponse(valid=True, message="Reset token is valid", expires_at=reset_token.expires_at)
478 except AuthenticationError as exc:
479 detail = str(exc)
480 if "expired" in detail.lower():
481 raise HTTPException(status_code=status.HTTP_410_GONE, detail=detail)
482 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=detail)
485@email_auth_router.post("/reset-password/{token}", response_model=SuccessResponse)
486async def complete_password_reset(token: str, reset_request: ResetPasswordRequest, request: Request, db: Session = Depends(get_db)):
487 """Complete password reset with a valid one-time token.
489 Args:
490 token: One-time reset token.
491 reset_request: Reset-password payload with new credentials.
492 request: Incoming HTTP request.
493 db: Database session dependency.
495 Returns:
496 SuccessResponse: Password reset completion status.
498 Raises:
499 HTTPException: If password reset is disabled or reset validation fails.
500 """
501 if not getattr(settings, "password_reset_enabled", True):
502 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Password reset is disabled")
504 auth_service = EmailAuthService(db)
505 ip_address = get_client_ip(request)
506 user_agent = get_user_agent(request)
508 try:
509 await auth_service.reset_password_with_token(token=token, new_password=reset_request.new_password, ip_address=ip_address, user_agent=user_agent)
510 return SuccessResponse(success=True, message="Password reset successful. Please sign in with your new password.")
511 except AuthenticationError as exc:
512 detail = str(exc)
513 if "expired" in detail.lower():
514 raise HTTPException(status_code=status.HTTP_410_GONE, detail=detail)
515 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=detail)
516 except PasswordValidationError as exc:
517 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc))
520@email_auth_router.get("/me", response_model=EmailUserResponse)
521async def get_current_user_profile(current_user: EmailUser = Depends(get_current_user)):
522 """Get current user's profile information.
524 Args:
525 current_user: Currently authenticated user
527 Returns:
528 EmailUserResponse: User profile information
530 Raises:
531 HTTPException: If user authentication fails
533 Examples:
534 >>> # GET /auth/email/me
535 >>> # Headers: Authorization: Bearer <token>
536 """
537 return EmailUserResponse.from_email_user(current_user)
540@email_auth_router.get("/events", response_model=list[AuthEventResponse])
541async def get_auth_events(limit: int = 50, offset: int = 0, current_user: EmailUser = Depends(get_current_user), db: Session = Depends(get_db)):
542 """Get authentication events for the current user.
544 Args:
545 limit: Maximum number of events to return
546 offset: Number of events to skip
547 current_user: Currently authenticated user
548 db: Database session
550 Returns:
551 List[AuthEventResponse]: Authentication events
553 Raises:
554 HTTPException: If user authentication fails
556 Examples:
557 >>> # GET /auth/email/events?limit=10&offset=0
558 >>> # Headers: Authorization: Bearer <token>
559 """
560 auth_service = EmailAuthService(db)
562 try:
563 events = await auth_service.get_auth_events(email=current_user.email, limit=limit, offset=offset)
565 return [AuthEventResponse.model_validate(event) for event in events]
567 except Exception as e:
568 logger.error(f"Error getting auth events for {current_user.email}: {e}")
569 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve authentication events")
572# Admin-only endpoints
573@email_auth_router.get("/admin/users", response_model=Union[CursorPaginatedUsersResponse, List[EmailUserResponse]])
574@require_permission("admin.user_management")
575async def list_users(
576 cursor: Optional[str] = Query(None, description="Pagination cursor for fetching the next set of results"),
577 limit: Optional[int] = Query(
578 None,
579 ge=0,
580 le=settings.pagination_max_page_size,
581 description="Maximum number of users to return. 0 means all (no limit). Default uses pagination_default_page_size.",
582 ),
583 include_pagination: bool = Query(False, description="Include cursor pagination metadata in response"),
584 current_user_ctx: dict = Depends(get_current_user_with_permissions),
585 db: Session = Depends(get_db),
586) -> Union[CursorPaginatedUsersResponse, List[EmailUserResponse]]:
587 """List all users (admin only) with cursor-based pagination support.
589 Args:
590 cursor: Pagination cursor for fetching the next set of results
591 limit: Maximum number of users to return. Use 0 for all users (no limit).
592 If not specified, uses pagination_default_page_size (default: 50).
593 include_pagination: Whether to include cursor pagination metadata in the response (default: false)
594 current_user_ctx: Currently authenticated user context with permissions
595 db: Database session
597 Returns:
598 CursorPaginatedUsersResponse with users and nextCursor if include_pagination=true, or
599 List of users if include_pagination=false
601 Raises:
602 HTTPException: If user is not admin
604 Examples:
605 >>> # Cursor-based with pagination: GET /auth/email/admin/users?cursor=eyJlbWFpbCI6Li4ufQ&include_pagination=true
606 >>> # Simple list: GET /auth/email/admin/users
607 >>> # Headers: Authorization: Bearer <admin_token>
608 """
609 auth_service = EmailAuthService(db)
611 try:
612 result = await auth_service.list_users(cursor=cursor, limit=limit)
613 user_responses = [EmailUserResponse.from_email_user(user) for user in result.data]
615 if include_pagination:
616 return CursorPaginatedUsersResponse(users=user_responses, next_cursor=result.next_cursor)
618 return user_responses
620 except Exception as e:
621 logger.error(f"Error listing users: {e}")
622 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user list")
625@email_auth_router.get("/admin/events", response_model=list[AuthEventResponse])
626@require_permission("admin.user_management")
627async def list_all_auth_events(limit: int = 100, offset: int = 0, user_email: Optional[str] = None, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
628 """List authentication events for all users (admin only).
630 Args:
631 limit: Maximum number of events to return
632 offset: Number of events to skip
633 user_email: Filter events by specific user email
634 current_user_ctx: Currently authenticated user context with permissions
635 db: Database session
637 Returns:
638 List[AuthEventResponse]: Authentication events
640 Raises:
641 HTTPException: If user is not admin
643 Examples:
644 >>> # GET /auth/email/admin/events?limit=50&user_email=user@example.com
645 >>> # Headers: Authorization: Bearer <admin_token>
646 """
647 auth_service = EmailAuthService(db)
649 try:
650 events = await auth_service.get_auth_events(email=user_email, limit=limit, offset=offset)
652 return [AuthEventResponse.model_validate(event) for event in events]
654 except Exception as e:
655 logger.error(f"Error getting auth events: {e}")
656 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve authentication events")
659@email_auth_router.post("/admin/users", response_model=EmailUserResponse, status_code=status.HTTP_201_CREATED)
660@require_permission("admin.user_management")
661async def create_user(user_request: AdminCreateUserRequest, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
662 """Create a new user account (admin only).
664 Args:
665 user_request: User creation information
666 current_user_ctx: Currently authenticated user context with permissions
667 db: Database session
669 Returns:
670 EmailUserResponse: Created user information
672 Raises:
673 HTTPException: If user creation fails
675 Examples:
676 Request JSON:
677 {
678 "email": "newuser@example.com",
679 "password": "secure_password",
680 "full_name": "New User",
681 "is_admin": false
682 }
683 """
684 auth_service = EmailAuthService(db)
686 try:
687 # Password is required by schema (str, not Optional) — Pydantic returns 422 if missing
688 # Create new user with all fields from request
689 user = await auth_service.create_user(
690 email=user_request.email,
691 password=user_request.password,
692 full_name=user_request.full_name,
693 is_admin=user_request.is_admin,
694 is_active=user_request.is_active,
695 password_change_required=user_request.password_change_required,
696 auth_provider="local",
697 granted_by=current_user_ctx.get("email"),
698 )
700 # If the user was created with the default password, optionally force password change
701 if (
702 settings.password_change_enforcement_enabled
703 and getattr(settings, "require_password_change_for_default_password", True)
704 and user_request.password == settings.default_user_password.get_secret_value()
705 ): # nosec B105
706 user.password_change_required = True
707 db.commit()
709 logger.info(f"Admin {current_user_ctx['email']} created user: {user.email}")
711 db.commit()
712 db.close()
713 return EmailUserResponse.from_email_user(user)
715 except EmailValidationError as e:
716 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
717 except PasswordValidationError as e:
718 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
719 except UserExistsError as e:
720 raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
721 except Exception as e:
722 logger.error(f"Admin user creation error: {e}")
723 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="User creation failed")
726@email_auth_router.get("/admin/users/{user_email}", response_model=EmailUserResponse)
727@require_permission("admin.user_management")
728async def get_user(user_email: str, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
729 """Get user by email (admin only).
731 Args:
732 user_email: Email of user to retrieve
733 current_user_ctx: Currently authenticated user context with permissions
734 db: Database session
736 Returns:
737 EmailUserResponse: User information
739 Raises:
740 HTTPException: If user not found
741 """
742 auth_service = EmailAuthService(db)
744 try:
745 user = await auth_service.get_user_by_email(user_email)
746 if not user:
747 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
749 return EmailUserResponse.from_email_user(user)
751 except HTTPException:
752 raise # Re-raise HTTP exceptions as-is (401, 403, 404, etc.)
753 except Exception as e:
754 logger.error(f"Error retrieving user {user_email}: {e}")
755 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user")
758@email_auth_router.put("/admin/users/{user_email}", response_model=EmailUserResponse)
759@require_permission("admin.user_management")
760async def update_user(user_email: str, user_request: AdminUserUpdateRequest, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
761 """Update user information (admin only).
763 Args:
764 user_email: Email of user to update
765 user_request: Updated user information
766 current_user_ctx: Currently authenticated user context with permissions
767 db: Database session
769 Returns:
770 EmailUserResponse: Updated user information
772 Raises:
773 HTTPException: If user not found or update fails
774 """
775 auth_service = EmailAuthService(db)
777 try:
778 user = await auth_service.update_user(
779 email=user_email,
780 full_name=user_request.full_name,
781 is_admin=user_request.is_admin,
782 is_active=user_request.is_active,
783 email_verified=user_request.email_verified,
784 password_change_required=user_request.password_change_required,
785 password=user_request.password,
786 admin_origin_source="api",
787 )
789 logger.info(f"Admin {current_user_ctx['email']} updated user: {user.email}")
791 result = EmailUserResponse.from_email_user(user)
792 return result
794 except ValueError as e:
795 error_msg = str(e)
796 if "not found" in error_msg.lower():
797 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
798 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=error_msg)
799 except PasswordValidationError as e:
800 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
801 except Exception as e:
802 logger.error(f"Error updating user {user_email}: {e}")
803 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to update user")
806@email_auth_router.delete("/admin/users/{user_email}", response_model=SuccessResponse)
807@require_permission("admin.user_management")
808async def delete_user(user_email: str, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
809 """Delete/deactivate user (admin only).
811 Args:
812 user_email: Email of user to delete
813 current_user_ctx: Currently authenticated user context with permissions
814 db: Database session
816 Returns:
817 SuccessResponse: Success confirmation
819 Raises:
820 HTTPException: If user not found or deletion fails
821 """
822 auth_service = EmailAuthService(db)
824 try:
825 # Prevent admin from deleting themselves
826 if user_email == current_user_ctx["email"]:
827 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot delete your own account")
829 # Prevent deleting the last active admin user
830 if await auth_service.is_last_active_admin(user_email):
831 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot delete the last remaining admin user")
833 # Hard delete using auth service
834 await auth_service.delete_user(user_email)
836 logger.info(f"Admin {current_user_ctx['email']} deleted user: {user_email}")
838 db.commit()
839 db.close()
840 return SuccessResponse(success=True, message=f"User {user_email} has been deleted")
842 except HTTPException:
843 raise # Re-raise HTTP exceptions as-is (401, 403, 404, etc.)
844 except Exception as e:
845 logger.error(f"Error deleting user {user_email}: {e}")
846 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to delete user")
849@email_auth_router.post("/admin/users/{user_email}/unlock", response_model=SuccessResponse)
850@require_permission("admin.user_management")
851async def unlock_user(user_email: str, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
852 """Unlock a user account by clearing lockout state and failed login counter.
854 Args:
855 user_email: Email address of the user to unlock.
856 current_user_ctx: Authenticated admin context.
857 db: Database session dependency.
859 Returns:
860 SuccessResponse: Unlock operation result.
862 Raises:
863 HTTPException: If user is missing or unlock operation fails.
864 """
865 auth_service = EmailAuthService(db)
867 try:
868 await auth_service.unlock_user_account(email=user_email, unlocked_by=current_user_ctx.get("email"))
869 return SuccessResponse(success=True, message=f"User {user_email} has been unlocked")
870 except ValueError as exc:
871 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc))
872 except Exception as exc:
873 logger.error("Failed to unlock user %s: %s", user_email, exc)
874 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to unlock user")