Coverage for mcpgateway / routers / email_auth.py: 100%
309 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/routers/email_auth.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Email Authentication Router.
8This module provides FastAPI routes for email-based authentication
9including login, registration, password management, and user profile endpoints.
11Examples:
12 >>> from fastapi import FastAPI
13 >>> from mcpgateway.routers.email_auth import email_auth_router
14 >>> app = FastAPI()
15 >>> app.include_router(email_auth_router, prefix="/auth/email", tags=["Email Auth"])
16 >>> isinstance(email_auth_router, APIRouter)
17 True
18"""
20# Standard
21from datetime import datetime, timedelta, UTC
22from typing import List, Optional, Union
24# Third-Party
25from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
26from fastapi.security import HTTPBearer
27from sqlalchemy.orm import Session
29# First-Party
30from mcpgateway.auth import get_current_user
31from mcpgateway.common.validators import SecurityValidator
32from mcpgateway.config import settings
33from mcpgateway.db import EmailUser, SessionLocal, utc_now
34from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_permission
35from mcpgateway.schemas import (
36 AdminCreateUserRequest,
37 AdminUserUpdateRequest,
38 AuthenticationResponse,
39 AuthEventResponse,
40 ChangePasswordRequest,
41 CursorPaginatedUsersResponse,
42 EmailLoginRequest,
43 EmailUserResponse,
44 ForgotPasswordRequest,
45 PasswordResetTokenValidationResponse,
46 PublicRegistrationRequest,
47 ResetPasswordRequest,
48 SuccessResponse,
49)
50from mcpgateway.services.email_auth_service import AuthenticationError, EmailAuthService, EmailValidationError, PasswordValidationError, UserExistsError
51from mcpgateway.services.logging_service import LoggingService
52from mcpgateway.utils.create_jwt_token import create_jwt_token
53from mcpgateway.utils.orjson_response import ORJSONResponse
# Initialize logging: one LoggingService instance for this module, and a
# logger named after the module so records can be attributed to this router.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

# Create router. All endpoints below register on it; the URL prefix is chosen
# by the application that includes the router (see the module docstring).
email_auth_router = APIRouter()

# Security scheme. auto_error=False is FastAPI's switch for "do not raise
# automatically when the Authorization header is missing".
bearer_scheme = HTTPBearer(auto_error=False)
def _discard_transaction(session):
    """Undo the current transaction, never raising.

    Tries a rollback first; if the rollback itself fails, falls back to
    invalidating the session. Any failure of that last step is ignored.

    Args:
        session: SQLAlchemy session whose transaction should be abandoned.
    """
    try:
        session.rollback()
    except Exception:
        try:
            session.invalidate()
        except Exception:
            pass  # nosec B110 - Best effort cleanup on connection failure


def get_db():
    """Provide a database session to a request handler.

    The session is committed once the consumer finishes without error, so that
    read-only work is not left to an implicit rollback. If the consumer (or
    the commit) raises, the transaction is discarded and the same exception
    propagates. The session is closed in every case.

    Yields:
        Session: SQLAlchemy database session

    Raises:
        Exception: Re-raises any exception after rolling back the transaction.
    """
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        _discard_transaction(session)
        raise
    finally:
        session.close()
def get_client_ip(request: Request) -> str:
    """Extract client IP address from request.

    Lookup order:
      1. first entry of ``X-Forwarded-For`` (set by proxies / load balancers),
      2. ``X-Real-IP``,
      3. the peer address of the connection,
      4. the literal ``"unknown"`` when none of the above is available.

    Blank header values (for example ``"   "`` or a list whose first entry is
    empty) are skipped instead of being returned as an empty string.

    NOTE(review): both headers come from the incoming request; they are only
    trustworthy when a proxy in front of the service overwrites them.

    Args:
        request: FastAPI request object

    Returns:
        str: Client IP address
    """
    # Check for X-Forwarded-For header (proxy/load balancer)
    forwarded_for = request.headers.get("X-Forwarded-For")
    if forwarded_for:
        # The left-most entry is the originating client.
        first_hop = forwarded_for.split(",", 1)[0].strip()
        if first_hop:
            return first_hop

    # Check for X-Real-IP header
    real_ip = (request.headers.get("X-Real-IP") or "").strip()
    if real_ip:
        return real_ip

    # Fall back to direct client IP
    return request.client.host if request.client else "unknown"
def get_user_agent(request: Request) -> str:
    """Read the ``User-Agent`` header of a request.

    Args:
        request: FastAPI request object

    Returns:
        str: The header value, or ``"unknown"`` when the header is absent
    """
    fallback = "unknown"
    headers = request.headers
    return headers.get("User-Agent", fallback)
async def create_access_token(user: EmailUser, token_scopes: Optional[dict] = None, jti: Optional[str] = None) -> tuple[str, int]:
    """Create JWT access token for user with enhanced scoping.

    The payload carries the standard claims (sub/iss/aud/iat/exp/jti), a
    ``user`` profile object, a ``token_use`` marker of ``"session"`` and a
    ``scopes`` object. Lifetime is ``settings.token_expiry`` minutes.

    Args:
        user: EmailUser instance
        token_scopes: Optional token scoping information. When omitted (or
            empty) an unrestricted default scope is used.
        jti: Optional JWT ID for revocation tracking. When omitted (or empty)
            a random UUID4 string is generated.

    Returns:
        Tuple of (token_string, expires_in_seconds)
    """
    # Standard
    import uuid

    now = datetime.now(tz=UTC)
    expires_delta = timedelta(minutes=settings.token_expiry)
    expire = now + expires_delta

    # Create JWT payload — session token (teams resolved server-side at request time)
    payload = {
        # Standard JWT claims
        "sub": user.email,
        "iss": settings.jwt_issuer,
        "aud": settings.jwt_audience,
        "iat": int(now.timestamp()),
        "exp": int(expire.timestamp()),
        "jti": jti or str(uuid.uuid4()),
        # User profile information (coerced to primitives)
        "user": {
            "email": str(getattr(user, "email", "")),
            "full_name": str(getattr(user, "full_name", "")),
            "is_admin": bool(getattr(user, "is_admin", False)),
            "auth_provider": str(getattr(user, "auth_provider", "local")),
        },
        "token_use": "session",  # nosec B105 - token type marker, not a password
        # Token scoping (if provided)
        "scopes": token_scopes or {"server_id": None, "permissions": ["*"], "ip_restrictions": [], "time_restrictions": {}},
    }

    # Generate token using centralized token creation
    token = await create_jwt_token(payload)

    return token, int(expires_delta.total_seconds())
async def create_legacy_access_token(user: EmailUser) -> tuple[str, int]:
    """Build a JWT in the original flat payload layout.

    Unlike the scoped session token, profile fields sit at the top level of
    the payload and no ``jti``/``scopes`` claims are added. Kept for
    backwards compatibility.

    Args:
        user: EmailUser instance

    Returns:
        Tuple of (token_string, expires_in_seconds)
    """
    issued_at = datetime.now(tz=UTC)
    lifetime = timedelta(minutes=settings.token_expiry)
    expires_at = issued_at + lifetime

    # Primitives only, so the payload serialises without custom encoders.
    email = str(getattr(user, "email", ""))
    claims = {
        "sub": email,
        "email": email,
        "full_name": str(getattr(user, "full_name", "")),
        "is_admin": bool(getattr(user, "is_admin", False)),
        "auth_provider": str(getattr(user, "auth_provider", "local")),
        "iat": int(issued_at.timestamp()),
        "exp": int(expires_at.timestamp()),
        "iss": settings.jwt_issuer,
        "aud": settings.jwt_audience,
    }

    # Signing is delegated to the shared token helper.
    signed = await create_jwt_token(claims)
    return signed, int(lifetime.total_seconds())
@email_auth_router.post("/login", response_model=AuthenticationResponse)
async def login(login_request: EmailLoginRequest, request: Request, db: Session = Depends(get_db)):
    """Authenticate user with email and password.

    After a successful credential check the handler decides whether the user
    must change their password before a token is issued. When
    ``settings.password_change_enforcement_enabled`` is on, a change is
    required if any of the following holds:

    * the user's ``password_change_required`` flag is set,
    * the password is at least ``settings.password_max_age_days`` days old
      (default 90),
    * the password matches the configured default password and
      ``require_password_change_for_default_password`` is enabled (the flag
      is then also persisted on the user).

    In that case a 403 response with header ``X-Password-Change-Required``
    is returned instead of a token.

    Args:
        login_request: Login credentials
        request: FastAPI request object
        db: Database session

    Returns:
        AuthenticationResponse: Access token and user info

    Raises:
        HTTPException: 401 if authentication fails, 500 on unexpected errors

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(login)
        True

        Request JSON:
            {
              "email": "user@example.com",
              "password": "secure_password"
            }
    """
    auth_service = EmailAuthService(db)
    ip_address = get_client_ip(request)
    user_agent = get_user_agent(request)
    # The email is user-supplied; sanitize it once and use only this value in logs.
    safe_email = SecurityValidator.sanitize_log_message(login_request.email)

    try:
        # Authenticate user
        user = await auth_service.authenticate_user(email=login_request.email, password=login_request.password, ip_address=ip_address, user_agent=user_agent)

        if not user:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid email or password")

        # Password change enforcement respects master switch and individual toggles
        needs_password_change = False

        if settings.password_change_enforcement_enabled:
            # If flag is set on the user, always honor it (flag is cleared when password is changed)
            if getattr(user, "password_change_required", False):
                needs_password_change = True
                logger.debug("User %s has password_change_required flag set", safe_email)

            # Enforce expiry-based password change if configured and not already required
            if not needs_password_change:
                try:
                    pwd_changed = getattr(user, "password_changed_at", None)
                    if isinstance(pwd_changed, datetime):
                        age_days = (utc_now() - pwd_changed).days
                        max_age = getattr(settings, "password_max_age_days", 90)
                        if age_days >= max_age:
                            needs_password_change = True
                            logger.debug("User %s password expired (%s days >= %s)", safe_email, age_days, max_age)
                except Exception as exc:
                    # Age evaluation is advisory; a failure here must not block login.
                    logger.debug("Failed to evaluate password age for %s: %s", safe_email, exc)

            # Detect default password on login if enabled
            if getattr(settings, "detect_default_password_on_login", True):
                # First-Party
                from mcpgateway.services.argon2_service import Argon2PasswordService

                password_service = Argon2PasswordService()
                is_using_default_password = await password_service.verify_password_async(settings.default_user_password.get_secret_value(), user.password_hash)  # nosec B105
                if is_using_default_password:
                    # Mark user for password change depending on configuration
                    if getattr(settings, "require_password_change_for_default_password", True):
                        user.password_change_required = True
                        needs_password_change = True
                        try:
                            db.commit()
                        except Exception as exc:  # log commit failures
                            logger.warning("Failed to commit password_change_required flag for %s: %s", safe_email, exc)
                    else:
                        logger.info("User %s is using default password but enforcement is disabled", safe_email)

        if needs_password_change:
            logger.info("Login blocked for %s: password change required", safe_email)
            return ORJSONResponse(
                status_code=status.HTTP_403_FORBIDDEN,
                content={"detail": "Password change required. Please change your password before continuing."},
                headers={"X-Password-Change-Required": "true"},
            )

        # Create access token
        access_token, expires_in = await create_access_token(user)

        # Return authentication response
        return AuthenticationResponse(
            access_token=access_token, token_type="bearer", expires_in=expires_in, user=EmailUserResponse.from_email_user(user)
        )  # nosec B106 - OAuth2 token type, not a password

    except HTTPException:
        raise  # Re-raise HTTP exceptions as-is (401, 403, etc.)
    except Exception as e:
        logger.error("Login error for %s: %s", safe_email, e)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Authentication service error") from e
@email_auth_router.post("/register", response_model=AuthenticationResponse)
async def register(registration_request: PublicRegistrationRequest, request: Request, db: Session = Depends(get_db)):
    """Register a new user account.

    This endpoint is controlled by the PUBLIC_REGISTRATION_ENABLED setting.
    When disabled (default), returns 403 Forbidden and users can only be
    created by administrators via the admin API.

    Self-registered accounts are always created as active, non-admin, local
    accounts without a forced password change; these values are fixed here
    and are not taken from the request body.

    Args:
        registration_request: Registration information (email, password, full_name only)
        request: FastAPI request object (kept for interface stability; not read here)
        db: Database session

    Returns:
        AuthenticationResponse: Access token and user info

    Raises:
        HTTPException: 403 if registration is disabled, 400 for invalid
            email/password, 409 if the user exists, 500 on unexpected errors

    Examples:
        Request JSON:
            {
              "email": "new@example.com",
              "password": "secure_password",
              "full_name": "New User"
            }
    """
    # Check if public registration is allowed
    if not settings.public_registration_enabled:
        logger.warning(f"Registration attempt rejected - public registration disabled: {SecurityValidator.sanitize_log_message(registration_request.email)}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Public registration is disabled. Please contact an administrator to create an account.",
        )

    auth_service = EmailAuthService(db)

    try:
        # Password is required by schema (str, not Optional) — Pydantic returns 422 if missing
        # Security-sensitive fields are hardcoded (not exposed on public schema)
        user = await auth_service.create_user(
            email=registration_request.email,
            password=registration_request.password,
            full_name=registration_request.full_name,
            is_admin=False,  # Regular users cannot self-register as admin
            is_active=True,  # Public registrations are always active
            password_change_required=False,  # No forced password change for self-registration
            auth_provider="local",
        )

        # Create access token
        access_token, expires_in = await create_access_token(user)

        logger.info(f"New user registered: {SecurityValidator.sanitize_log_message(user.email)}")

        return AuthenticationResponse(
            access_token=access_token, token_type="bearer", expires_in=expires_in, user=EmailUserResponse.from_email_user(user)
        )  # nosec B106 - OAuth2 token type, not a password

    except (EmailValidationError, PasswordValidationError) as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e
    except UserExistsError as e:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Registration error for {SecurityValidator.sanitize_log_message(registration_request.email)}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Registration service error") from e
@email_auth_router.post("/change-password", response_model=SuccessResponse)
async def change_password(password_request: ChangePasswordRequest, request: Request, current_user: EmailUser = Depends(get_current_user), db: Session = Depends(get_db)):
    """Change user's password.

    Args:
        password_request: Old and new passwords
        request: FastAPI request object
        current_user: Currently authenticated user
        db: Database session

    Returns:
        SuccessResponse: Success confirmation

    Raises:
        HTTPException: 401 if the old password is rejected, 400 if the new
            password is invalid, 500 if the service reports failure or an
            unexpected error occurs

    Examples:
        Request JSON (with Bearer token in Authorization header):
            {
              "old_password": "current_password",
              "new_password": "new_secure_password"
            }
    """
    auth_service = EmailAuthService(db)
    ip_address = get_client_ip(request)
    user_agent = get_user_agent(request)

    try:
        # Change password
        success = await auth_service.change_password(
            email=current_user.email, old_password=password_request.old_password, new_password=password_request.new_password, ip_address=ip_address, user_agent=user_agent
        )

        if not success:
            raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to change password")

        return SuccessResponse(success=True, message="Password changed successfully")

    except HTTPException:
        # Without this clause the generic handler below would swallow the
        # response raised above and replace its detail.
        raise
    except AuthenticationError as e:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(e)) from e
    except PasswordValidationError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Password change error for {SecurityValidator.sanitize_log_message(current_user.email)}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Password change service error") from e
@email_auth_router.post("/forgot-password", response_model=SuccessResponse)
async def forgot_password(reset_request: ForgotPasswordRequest, request: Request, db: Session = Depends(get_db)):
    """Start the password reset flow for an email address.

    The success message is the same whether or not the address belongs to an
    account, so the response cannot be used to probe for registered emails.

    Args:
        reset_request: Forgot-password request payload.
        request: Incoming HTTP request.
        db: Database session dependency.

    Returns:
        SuccessResponse: Generic success response to avoid account enumeration.

    Raises:
        HTTPException: If password reset is disabled or the request is rate limited.
    """
    reset_enabled = getattr(settings, "password_reset_enabled", True)
    if not reset_enabled:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Password reset is disabled")

    service = EmailAuthService(db)
    outcome = await service.request_password_reset(
        email=reset_request.email,
        ip_address=get_client_ip(request),
        user_agent=get_user_agent(request),
    )

    if outcome.rate_limited:
        raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="Too many requests. Please try again later.")

    return SuccessResponse(success=True, message="If this email is registered, you will receive a reset link.")
@email_auth_router.get("/reset-password/{token}", response_model=PasswordResetTokenValidationResponse)
async def validate_password_reset_token(token: str, request: Request, db: Session = Depends(get_db)):
    """Check that a reset token is usable without consuming a new password.

    Args:
        token: One-time reset token.
        request: Incoming HTTP request.
        db: Database session dependency.

    Returns:
        PasswordResetTokenValidationResponse: Token validity and expiration data.

    Raises:
        HTTPException: 403 if password reset is disabled; 410 when the
            service error mentions "expired"; 400 for any other validation
            failure.
    """
    reset_enabled = getattr(settings, "password_reset_enabled", True)
    if not reset_enabled:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Password reset is disabled")

    service = EmailAuthService(db)
    client_ip = get_client_ip(request)
    agent = get_user_agent(request)

    try:
        record = await service.validate_password_reset_token(token=token, ip_address=client_ip, user_agent=agent)
        return PasswordResetTokenValidationResponse(valid=True, message="Reset token is valid", expires_at=record.expires_at)
    except AuthenticationError as exc:
        reason = str(exc)
        # The status code is derived from the wording of the service error.
        code = status.HTTP_410_GONE if "expired" in reason.lower() else status.HTTP_400_BAD_REQUEST
        raise HTTPException(status_code=code, detail=reason)
@email_auth_router.post("/reset-password/{token}", response_model=SuccessResponse)
async def complete_password_reset(token: str, reset_request: ResetPasswordRequest, request: Request, db: Session = Depends(get_db)):
    """Set a new password using a one-time reset token.

    Args:
        token: One-time reset token.
        reset_request: Reset-password payload with new credentials.
        request: Incoming HTTP request.
        db: Database session dependency.

    Returns:
        SuccessResponse: Password reset completion status.

    Raises:
        HTTPException: 403 if password reset is disabled; 410 when the
            service error mentions "expired"; 400 for other token errors or
            an invalid new password.
    """
    reset_enabled = getattr(settings, "password_reset_enabled", True)
    if not reset_enabled:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Password reset is disabled")

    service = EmailAuthService(db)
    client_ip = get_client_ip(request)
    agent = get_user_agent(request)

    try:
        await service.reset_password_with_token(
            token=token,
            new_password=reset_request.new_password,
            ip_address=client_ip,
            user_agent=agent,
        )
        return SuccessResponse(success=True, message="Password reset successful. Please sign in with your new password.")
    except AuthenticationError as exc:
        reason = str(exc)
        # The status code is derived from the wording of the service error.
        code = status.HTTP_410_GONE if "expired" in reason.lower() else status.HTTP_400_BAD_REQUEST
        raise HTTPException(status_code=code, detail=reason)
    except PasswordValidationError as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc))
@email_auth_router.get("/me", response_model=EmailUserResponse)
async def get_current_user_profile(current_user: EmailUser = Depends(get_current_user)):
    """Return the profile of the authenticated caller.

    Args:
        current_user: Currently authenticated user

    Returns:
        EmailUserResponse: User profile information

    Raises:
        HTTPException: If user authentication fails

    Examples:
        >>> # GET /auth/email/me
        >>> # Headers: Authorization: Bearer <token>
    """
    profile = EmailUserResponse.from_email_user(current_user)
    return profile
@email_auth_router.get("/events", response_model=list[AuthEventResponse])
async def get_auth_events(limit: int = 50, offset: int = 0, current_user: EmailUser = Depends(get_current_user), db: Session = Depends(get_db)):
    """Return the caller's own authentication events.

    Args:
        limit: Maximum number of events to return
        offset: Number of events to skip
        current_user: Currently authenticated user
        db: Database session

    Returns:
        List[AuthEventResponse]: Authentication events

    Raises:
        HTTPException: If user authentication fails, or 500 if the events
            cannot be loaded or converted

    Examples:
        >>> # GET /auth/email/events?limit=10&offset=0
        >>> # Headers: Authorization: Bearer <token>
    """
    service = EmailAuthService(db)

    try:
        records = await service.get_auth_events(email=current_user.email, limit=limit, offset=offset)
        # Conversion stays inside the try so schema errors also map to a 500.
        payload = [AuthEventResponse.model_validate(record) for record in records]
        return payload
    except Exception as e:
        logger.error(f"Error getting auth events for {SecurityValidator.sanitize_log_message(current_user.email)}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve authentication events")
573# Admin-only endpoints
@email_auth_router.get("/admin/users", response_model=Union[CursorPaginatedUsersResponse, List[EmailUserResponse]])
@require_permission("admin.user_management")
async def list_users(
    cursor: Optional[str] = Query(None, description="Pagination cursor for fetching the next set of results"),
    limit: Optional[int] = Query(
        None,
        ge=0,
        le=settings.pagination_max_page_size,
        description="Maximum number of users to return. 0 means all (no limit). Default uses pagination_default_page_size.",
    ),
    include_pagination: bool = Query(False, description="Include cursor pagination metadata in response"),
    current_user_ctx: dict = Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> Union[CursorPaginatedUsersResponse, List[EmailUserResponse]]:
    """Enumerate user accounts for administrators, one cursor page at a time.

    Args:
        cursor: Pagination cursor for fetching the next set of results
        limit: Maximum number of users to return. Use 0 for all users (no limit).
               If not specified, uses pagination_default_page_size (default: 50).
        include_pagination: Whether to include cursor pagination metadata in the response (default: false)
        current_user_ctx: Currently authenticated user context with permissions
        db: Database session

    Returns:
        CursorPaginatedUsersResponse with users and nextCursor if include_pagination=true, or
        List of users if include_pagination=false

    Raises:
        HTTPException: If user is not admin, or 500 if the listing fails

    Examples:
        >>> # Cursor-based with pagination: GET /auth/email/admin/users?cursor=eyJlbWFpbCI6Li4ufQ&include_pagination=true
        >>> # Simple list: GET /auth/email/admin/users
        >>> # Headers: Authorization: Bearer <admin_token>
    """
    service = EmailAuthService(db)

    try:
        page = await service.list_users(cursor=cursor, limit=limit)
        users = [EmailUserResponse.from_email_user(account) for account in page.data]

        # Plain list unless the caller explicitly asked for cursor metadata.
        if not include_pagination:
            return users
        return CursorPaginatedUsersResponse(users=users, next_cursor=page.next_cursor)

    except Exception as e:
        logger.error(f"Error listing users: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user list")
@email_auth_router.get("/admin/events", response_model=list[AuthEventResponse])
@require_permission("admin.user_management")
async def list_all_auth_events(limit: int = 100, offset: int = 0, user_email: Optional[str] = None, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Return authentication events across users for administrators.

    Args:
        limit: Maximum number of events to return
        offset: Number of events to skip
        user_email: Filter events by specific user email
        current_user_ctx: Currently authenticated user context with permissions
        db: Database session

    Returns:
        List[AuthEventResponse]: Authentication events

    Raises:
        HTTPException: If user is not admin, or 500 if the events cannot be
            loaded or converted

    Examples:
        >>> # GET /auth/email/admin/events?limit=50&user_email=user@example.com
        >>> # Headers: Authorization: Bearer <admin_token>
    """
    service = EmailAuthService(db)

    try:
        # user_email is forwarded unchanged; None is passed when no filter was given.
        records = await service.get_auth_events(email=user_email, limit=limit, offset=offset)
        payload = [AuthEventResponse.model_validate(record) for record in records]
        return payload
    except Exception as e:
        logger.error(f"Error getting auth events: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve authentication events")
@email_auth_router.post("/admin/users", response_model=EmailUserResponse, status_code=status.HTTP_201_CREATED)
@require_permission("admin.user_management")
async def create_user(user_request: AdminCreateUserRequest, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Create a new user account (admin only).

    If the supplied password equals the configured default password and both
    ``password_change_enforcement_enabled`` and
    ``require_password_change_for_default_password`` are on, the new account
    is flagged so the user must change the password.

    Args:
        user_request: User creation information
        current_user_ctx: Currently authenticated user context with permissions
        db: Database session

    Returns:
        EmailUserResponse: Created user information

    Raises:
        HTTPException: 400 for invalid email/password, 409 if the user
            exists, 500 on unexpected errors

    Examples:
        Request JSON:
            {
              "email": "newuser@example.com",
              "password": "secure_password",
              "full_name": "New User",
              "is_admin": false
            }
    """
    auth_service = EmailAuthService(db)

    try:
        # Password is required by schema (str, not Optional) — Pydantic returns 422 if missing
        # Create new user with all fields from request
        user = await auth_service.create_user(
            email=user_request.email,
            password=user_request.password,
            full_name=user_request.full_name,
            is_admin=user_request.is_admin,
            is_active=user_request.is_active,
            password_change_required=user_request.password_change_required,
            auth_provider="local",
            granted_by=current_user_ctx.get("email"),
        )

        # If the user was created with the default password, optionally force password change
        if (
            settings.password_change_enforcement_enabled
            and getattr(settings, "require_password_change_for_default_password", True)
            and user_request.password == settings.default_user_password.get_secret_value()
        ):  # nosec B105
            user.password_change_required = True

        logger.info(f"Admin {SecurityValidator.sanitize_log_message(current_user_ctx['email'])} created user: {SecurityValidator.sanitize_log_message(user.email)}")

        # One commit covers the optional flag change above.
        db.commit()
        # Serialise while the session is still open: after close() the ORM
        # object is detached and attribute access may no longer be possible.
        response = EmailUserResponse.from_email_user(user)
        db.close()
        return response

    except (EmailValidationError, PasswordValidationError) as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e
    except UserExistsError as e:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Admin user creation error: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="User creation failed") from e
@email_auth_router.get("/admin/users/{user_email}", response_model=EmailUserResponse)
@require_permission("admin.user_management")
async def get_user(user_email: str, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Look up a single user account by email address (admin only).

    Args:
        user_email: Email of user to retrieve
        current_user_ctx: Currently authenticated user context with permissions
        db: Database session

    Returns:
        EmailUserResponse: User information

    Raises:
        HTTPException: 404 if user not found, 500 on unexpected errors
    """
    service = EmailAuthService(db)

    try:
        account = await service.get_user_by_email(user_email)
        if account:
            return EmailUserResponse.from_email_user(account)

        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")

    except HTTPException:
        raise  # Re-raise HTTP exceptions as-is (401, 403, 404, etc.)
    except Exception as e:
        logger.error(f"Error retrieving user {SecurityValidator.sanitize_log_message(user_email)}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user")
@email_auth_router.put("/admin/users/{user_email}", response_model=EmailUserResponse)
@require_permission("admin.user_management")
async def update_user(user_email: str, user_request: AdminUserUpdateRequest, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Apply an administrator's changes to an existing user account.

    Args:
        user_email: Email of user to update
        user_request: Updated user information
        current_user_ctx: Currently authenticated user context with permissions
        db: Database session

    Returns:
        EmailUserResponse: Updated user information

    Raises:
        HTTPException: 404 when the service's ValueError mentions
            "not found", 400 for other ValueErrors or an invalid password,
            500 on unexpected errors
    """
    service = EmailAuthService(db)

    try:
        updated = await service.update_user(
            email=user_email,
            full_name=user_request.full_name,
            is_admin=user_request.is_admin,
            is_active=user_request.is_active,
            email_verified=user_request.email_verified,
            password_change_required=user_request.password_change_required,
            password=user_request.password,
            admin_origin_source="api",
        )

        logger.info(f"Admin {SecurityValidator.sanitize_log_message(current_user_ctx['email'])} updated user: {SecurityValidator.sanitize_log_message(updated.email)}")

        return EmailUserResponse.from_email_user(updated)

    except ValueError as e:
        message = str(e)
        # The service signals a missing user through the error text.
        if "not found" not in message.lower():
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
    except PasswordValidationError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except Exception as e:
        logger.error(f"Error updating user {SecurityValidator.sanitize_log_message(user_email)}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to update user")
@email_auth_router.delete("/admin/users/{user_email}", response_model=SuccessResponse)
@require_permission("admin.user_management")
async def delete_user(user_email: str, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Remove a user account (admin only).

    Two safeguards run before deletion: an administrator cannot delete their
    own account, and the last active administrator cannot be deleted.

    Args:
        user_email: Email of user to delete
        current_user_ctx: Currently authenticated user context with permissions
        db: Database session

    Returns:
        SuccessResponse: Success confirmation

    Raises:
        HTTPException: 400 when a safeguard rejects the request, 500 if
            deletion fails
    """
    service = EmailAuthService(db)

    try:
        acting_admin = current_user_ctx["email"]

        # Prevent admin from deleting themselves
        if acting_admin == user_email:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot delete your own account")

        # Prevent deleting the last active admin user
        is_last_admin = await service.is_last_active_admin(user_email)
        if is_last_admin:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot delete the last remaining admin user")

        # Hard delete using auth service
        await service.delete_user(user_email)

        logger.info(f"Admin {SecurityValidator.sanitize_log_message(acting_admin)} deleted user: {SecurityValidator.sanitize_log_message(user_email)}")

        # Commit and release the session before building the response.
        db.commit()
        db.close()
        return SuccessResponse(success=True, message=f"User {user_email} has been deleted")

    except HTTPException:
        raise  # Re-raise HTTP exceptions as-is (401, 403, 404, etc.)
    except Exception as e:
        logger.error(f"Error deleting user {SecurityValidator.sanitize_log_message(user_email)}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to delete user")
@email_auth_router.post("/admin/users/{user_email}/unlock", response_model=SuccessResponse)
@require_permission("admin.user_management")
async def unlock_user(user_email: str, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Unlock a user account by clearing lockout state and failed login counter.

    Args:
        user_email: Email address of the user to unlock.
        current_user_ctx: Authenticated admin context.
        db: Database session dependency.

    Returns:
        SuccessResponse: Unlock operation result.

    Raises:
        HTTPException: 404 when the service raises ValueError (user missing),
            500 if the unlock operation fails for any other reason.
    """
    auth_service = EmailAuthService(db)

    try:
        await auth_service.unlock_user_account(email=user_email, unlocked_by=current_user_ctx.get("email"))
        return SuccessResponse(success=True, message=f"User {user_email} has been unlocked")
    except ValueError as exc:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
    except Exception as exc:
        # user_email comes from the URL path; sanitize it before logging,
        # as the other admin endpoints in this module do.
        logger.error("Failed to unlock user %s: %s", SecurityValidator.sanitize_log_message(user_email), exc)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to unlock user") from exc