Coverage for mcpgateway / routers / email_auth.py: 100%

309 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/routers/email_auth.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Email Authentication Router. 

8This module provides FastAPI routes for email-based authentication 

9including login, registration, password management, and user profile endpoints. 

10 

11Examples: 

12 >>> from fastapi import FastAPI 

13 >>> from mcpgateway.routers.email_auth import email_auth_router 

14 >>> app = FastAPI() 

15 >>> app.include_router(email_auth_router, prefix="/auth/email", tags=["Email Auth"]) 

16 >>> isinstance(email_auth_router, APIRouter) 

17 True 

18""" 

19 

20# Standard 

21from datetime import datetime, timedelta, UTC 

22from typing import List, Optional, Union 

23 

24# Third-Party 

25from fastapi import APIRouter, Depends, HTTPException, Query, Request, status 

26from fastapi.security import HTTPBearer 

27from sqlalchemy.orm import Session 

28 

29# First-Party 

30from mcpgateway.auth import get_current_user 

31from mcpgateway.common.validators import SecurityValidator 

32from mcpgateway.config import settings 

33from mcpgateway.db import EmailUser, SessionLocal, utc_now 

34from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_permission 

35from mcpgateway.schemas import ( 

36 AdminCreateUserRequest, 

37 AdminUserUpdateRequest, 

38 AuthenticationResponse, 

39 AuthEventResponse, 

40 ChangePasswordRequest, 

41 CursorPaginatedUsersResponse, 

42 EmailLoginRequest, 

43 EmailUserResponse, 

44 ForgotPasswordRequest, 

45 PasswordResetTokenValidationResponse, 

46 PublicRegistrationRequest, 

47 ResetPasswordRequest, 

48 SuccessResponse, 

49) 

50from mcpgateway.services.email_auth_service import AuthenticationError, EmailAuthService, EmailValidationError, PasswordValidationError, UserExistsError 

51from mcpgateway.services.logging_service import LoggingService 

52from mcpgateway.utils.create_jwt_token import create_jwt_token 

53from mcpgateway.utils.orjson_response import ORJSONResponse 

54 

55# Initialize logging 

56logging_service = LoggingService() 

57logger = logging_service.get_logger(__name__) 

58 

59# Create router 

60email_auth_router = APIRouter() 

61 

62# Security scheme 

63bearer_scheme = HTTPBearer(auto_error=False) 

64 

65 

66def get_db(): 

67 """Database dependency. 

68 

69 Commits the transaction on successful completion to avoid implicit rollbacks 

70 for read-only operations. Rolls back explicitly on exception. 

71 

72 Yields: 

73 Session: SQLAlchemy database session 

74 

75 Raises: 

76 Exception: Re-raises any exception after rolling back the transaction. 

77 """ 

78 db = SessionLocal() 

79 try: 

80 yield db 

81 db.commit() 

82 except Exception: 

83 try: 

84 db.rollback() 

85 except Exception: 

86 try: 

87 db.invalidate() 

88 except Exception: 

89 pass # nosec B110 - Best effort cleanup on connection failure 

90 raise 

91 finally: 

92 db.close() 

93 

94 

95def get_client_ip(request: Request) -> str: 

96 """Extract client IP address from request. 

97 

98 Args: 

99 request: FastAPI request object 

100 

101 Returns: 

102 str: Client IP address 

103 """ 

104 # Check for X-Forwarded-For header (proxy/load balancer) 

105 forwarded_for = request.headers.get("X-Forwarded-For") 

106 if forwarded_for: 

107 return forwarded_for.split(",")[0].strip() 

108 

109 # Check for X-Real-IP header 

110 real_ip = request.headers.get("X-Real-IP") 

111 if real_ip: 

112 return real_ip 

113 

114 # Fall back to direct client IP 

115 return request.client.host if request.client else "unknown" 

116 

117 

118def get_user_agent(request: Request) -> str: 

119 """Extract user agent from request. 

120 

121 Args: 

122 request: FastAPI request object 

123 

124 Returns: 

125 str: User agent string 

126 """ 

127 return request.headers.get("User-Agent", "unknown") 

128 

129 

130async def create_access_token(user: EmailUser, token_scopes: Optional[dict] = None, jti: Optional[str] = None) -> tuple[str, int]: 

131 """Create JWT access token for user with enhanced scoping. 

132 

133 Args: 

134 user: EmailUser instance 

135 token_scopes: Optional token scoping information 

136 jti: Optional JWT ID for revocation tracking 

137 

138 Returns: 

139 Tuple of (token_string, expires_in_seconds) 

140 """ 

141 now = datetime.now(tz=UTC) 

142 expires_delta = timedelta(minutes=settings.token_expiry) 

143 expire = now + expires_delta 

144 

145 # Create JWT payload — session token (teams resolved server-side at request time) 

146 payload = { 

147 # Standard JWT claims 

148 "sub": user.email, 

149 "iss": settings.jwt_issuer, 

150 "aud": settings.jwt_audience, 

151 "iat": int(now.timestamp()), 

152 "exp": int(expire.timestamp()), 

153 "jti": jti or str(__import__("uuid").uuid4()), 

154 # User profile information 

155 "user": { 

156 "email": str(getattr(user, "email", "")), 

157 "full_name": str(getattr(user, "full_name", "")), 

158 "is_admin": bool(getattr(user, "is_admin", False)), 

159 "auth_provider": str(getattr(user, "auth_provider", "local")), 

160 }, 

161 "token_use": "session", # nosec B105 - token type marker, not a password 

162 # Token scoping (if provided) 

163 "scopes": token_scopes or {"server_id": None, "permissions": ["*"], "ip_restrictions": [], "time_restrictions": {}}, 

164 } 

165 

166 # Generate token using centralized token creation 

167 token = await create_jwt_token(payload) 

168 

169 return token, int(expires_delta.total_seconds()) 

170 

171 

172async def create_legacy_access_token(user: EmailUser) -> tuple[str, int]: 

173 """Create legacy JWT access token for backwards compatibility. 

174 

175 Args: 

176 user: EmailUser instance 

177 

178 Returns: 

179 Tuple of (token_string, expires_in_seconds) 

180 """ 

181 now = datetime.now(tz=UTC) 

182 expires_delta = timedelta(minutes=settings.token_expiry) 

183 expire = now + expires_delta 

184 

185 # Create simple JWT payload (original format) with primitives only 

186 payload = { 

187 "sub": str(getattr(user, "email", "")), 

188 "email": str(getattr(user, "email", "")), 

189 "full_name": str(getattr(user, "full_name", "")), 

190 "is_admin": bool(getattr(user, "is_admin", False)), 

191 "auth_provider": str(getattr(user, "auth_provider", "local")), 

192 "iat": int(now.timestamp()), 

193 "exp": int(expire.timestamp()), 

194 "iss": settings.jwt_issuer, 

195 "aud": settings.jwt_audience, 

196 } 

197 

198 # Generate token using centralized token creation 

199 token = await create_jwt_token(payload) 

200 

201 return token, int(expires_delta.total_seconds()) 

202 

203 

204@email_auth_router.post("/login", response_model=AuthenticationResponse) 

205async def login(login_request: EmailLoginRequest, request: Request, db: Session = Depends(get_db)): 

206 """Authenticate user with email and password. 

207 

208 Args: 

209 login_request: Login credentials 

210 request: FastAPI request object 

211 db: Database session 

212 

213 Returns: 

214 AuthenticationResponse: Access token and user info 

215 

216 Examples: 

217 >>> import asyncio 

218 >>> asyncio.iscoroutinefunction(login) 

219 True 

220 

221 Raises: 

222 HTTPException: If authentication fails 

223 

224 Examples: 

225 Request JSON: 

226 { 

227 "email": "user@example.com", 

228 "password": "secure_password" 

229 } 

230 """ 

231 auth_service = EmailAuthService(db) 

232 ip_address = get_client_ip(request) 

233 user_agent = get_user_agent(request) 

234 

235 try: 

236 # Authenticate user 

237 user = await auth_service.authenticate_user(email=login_request.email, password=login_request.password, ip_address=ip_address, user_agent=user_agent) 

238 

239 if not user: 

240 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid email or password") 

241 

242 # Password change enforcement respects master switch and individual toggles 

243 needs_password_change = False 

244 

245 if settings.password_change_enforcement_enabled: 

246 # If flag is set on the user, always honor it (flag is cleared when password is changed) 

247 if getattr(user, "password_change_required", False): 

248 needs_password_change = True 

249 logger.debug("User %s has password_change_required flag set", login_request.email) 

250 

251 # Enforce expiry-based password change if configured and not already required 

252 if not needs_password_change: 

253 try: 

254 pwd_changed = getattr(user, "password_changed_at", None) 

255 if isinstance(pwd_changed, datetime): 

256 age_days = (utc_now() - pwd_changed).days 

257 max_age = getattr(settings, "password_max_age_days", 90) 

258 if age_days >= max_age: 

259 needs_password_change = True 

260 logger.debug("User %s password expired (%s days >= %s)", login_request.email, age_days, max_age) 

261 except Exception as exc: 

262 logger.debug("Failed to evaluate password age for %s: %s", login_request.email, exc) 

263 

264 # Detect default password on login if enabled 

265 if getattr(settings, "detect_default_password_on_login", True): 

266 # First-Party 

267 from mcpgateway.services.argon2_service import Argon2PasswordService 

268 

269 password_service = Argon2PasswordService() 

270 is_using_default_password = await password_service.verify_password_async(settings.default_user_password.get_secret_value(), user.password_hash) # nosec B105 

271 if is_using_default_password: 

272 # Mark user for password change depending on configuration 

273 if getattr(settings, "require_password_change_for_default_password", True): 

274 user.password_change_required = True 

275 needs_password_change = True 

276 try: 

277 db.commit() 

278 except Exception as exc: # log commit failures 

279 logger.warning("Failed to commit password_change_required flag for %s: %s", login_request.email, exc) 

280 else: 

281 logger.info("User %s is using default password but enforcement is disabled", login_request.email) 

282 

283 if needs_password_change: 

284 logger.info(f"Login blocked for {SecurityValidator.sanitize_log_message(login_request.email)}: password change required") 

285 return ORJSONResponse( 

286 status_code=status.HTTP_403_FORBIDDEN, 

287 content={"detail": "Password change required. Please change your password before continuing."}, 

288 headers={"X-Password-Change-Required": "true"}, 

289 ) 

290 

291 # Create access token 

292 access_token, expires_in = await create_access_token(user) 

293 

294 # Return authentication response 

295 return AuthenticationResponse( 

296 access_token=access_token, token_type="bearer", expires_in=expires_in, user=EmailUserResponse.from_email_user(user) 

297 ) # nosec B106 - OAuth2 token type, not a password 

298 

299 except HTTPException: 

300 raise # Re-raise HTTP exceptions as-is (401, 403, etc.) 

301 except Exception as e: 

302 logger.error(f"Login error for {SecurityValidator.sanitize_log_message(login_request.email)}: {e}") 

303 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Authentication service error") 

304 

305 

306@email_auth_router.post("/register", response_model=AuthenticationResponse) 

307async def register(registration_request: PublicRegistrationRequest, request: Request, db: Session = Depends(get_db)): 

308 """Register a new user account. 

309 

310 This endpoint is controlled by the PUBLIC_REGISTRATION_ENABLED setting. 

311 When disabled (default), returns 403 Forbidden and users can only be 

312 created by administrators via the admin API. 

313 

314 Args: 

315 registration_request: Registration information (email, password, full_name only) 

316 request: FastAPI request object 

317 db: Database session 

318 

319 Returns: 

320 AuthenticationResponse: Access token and user info 

321 

322 Raises: 

323 HTTPException: If registration fails or is disabled 

324 

325 Examples: 

326 Request JSON: 

327 { 

328 "email": "new@example.com", 

329 "password": "secure_password", 

330 "full_name": "New User" 

331 } 

332 """ 

333 # Check if public registration is allowed 

334 if not settings.public_registration_enabled: 

335 logger.warning(f"Registration attempt rejected - public registration disabled: {SecurityValidator.sanitize_log_message(registration_request.email)}") 

336 raise HTTPException( 

337 status_code=status.HTTP_403_FORBIDDEN, 

338 detail="Public registration is disabled. Please contact an administrator to create an account.", 

339 ) 

340 

341 auth_service = EmailAuthService(db) 

342 get_client_ip(request) 

343 get_user_agent(request) 

344 

345 try: 

346 # Password is required by schema (str, not Optional) — Pydantic returns 422 if missing 

347 # Security-sensitive fields are hardcoded (not exposed on public schema) 

348 user = await auth_service.create_user( 

349 email=registration_request.email, 

350 password=registration_request.password, 

351 full_name=registration_request.full_name, 

352 is_admin=False, # Regular users cannot self-register as admin 

353 is_active=True, # Public registrations are always active 

354 password_change_required=False, # No forced password change for self-registration 

355 auth_provider="local", 

356 ) 

357 

358 # Create access token 

359 access_token, expires_in = await create_access_token(user) 

360 

361 logger.info(f"New user registered: {SecurityValidator.sanitize_log_message(user.email)}") 

362 

363 return AuthenticationResponse( 

364 access_token=access_token, token_type="bearer", expires_in=expires_in, user=EmailUserResponse.from_email_user(user) 

365 ) # nosec B106 - OAuth2 token type, not a password 

366 

367 except EmailValidationError as e: 

368 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) 

369 except PasswordValidationError as e: 

370 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) 

371 except UserExistsError as e: 

372 raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e)) 

373 except Exception as e: 

374 logger.error(f"Registration error for {SecurityValidator.sanitize_log_message(registration_request.email)}: {e}") 

375 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Registration service error") 

376 

377 

378@email_auth_router.post("/change-password", response_model=SuccessResponse) 

379async def change_password(password_request: ChangePasswordRequest, request: Request, current_user: EmailUser = Depends(get_current_user), db: Session = Depends(get_db)): 

380 """Change user's password. 

381 

382 Args: 

383 password_request: Old and new passwords 

384 request: FastAPI request object 

385 current_user: Currently authenticated user 

386 db: Database session 

387 

388 Returns: 

389 SuccessResponse: Success confirmation 

390 

391 Raises: 

392 HTTPException: If password change fails 

393 

394 Examples: 

395 Request JSON (with Bearer token in Authorization header): 

396 { 

397 "old_password": "current_password", 

398 "new_password": "new_secure_password" 

399 } 

400 """ 

401 auth_service = EmailAuthService(db) 

402 ip_address = get_client_ip(request) 

403 user_agent = get_user_agent(request) 

404 

405 try: 

406 # Change password 

407 success = await auth_service.change_password( 

408 email=current_user.email, old_password=password_request.old_password, new_password=password_request.new_password, ip_address=ip_address, user_agent=user_agent 

409 ) 

410 

411 if success: 

412 return SuccessResponse(success=True, message="Password changed successfully") 

413 else: 

414 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to change password") 

415 

416 except AuthenticationError as e: 

417 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(e)) 

418 except PasswordValidationError as e: 

419 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) 

420 except Exception as e: 

421 logger.error(f"Password change error for {SecurityValidator.sanitize_log_message(current_user.email)}: {e}") 

422 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Password change service error") 

423 

424 

425@email_auth_router.post("/forgot-password", response_model=SuccessResponse) 

426async def forgot_password(reset_request: ForgotPasswordRequest, request: Request, db: Session = Depends(get_db)): 

427 """Request a one-time password reset token via email. 

428 

429 Args: 

430 reset_request: Forgot-password request payload. 

431 request: Incoming HTTP request. 

432 db: Database session dependency. 

433 

434 Returns: 

435 SuccessResponse: Generic success response to avoid account enumeration. 

436 

437 Raises: 

438 HTTPException: If password reset is disabled or the request is rate limited. 

439 """ 

440 if not getattr(settings, "password_reset_enabled", True): 

441 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Password reset is disabled") 

442 

443 auth_service = EmailAuthService(db) 

444 ip_address = get_client_ip(request) 

445 user_agent = get_user_agent(request) 

446 

447 result = await auth_service.request_password_reset(email=reset_request.email, ip_address=ip_address, user_agent=user_agent) 

448 if result.rate_limited: 

449 raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="Too many requests. Please try again later.") 

450 

451 return SuccessResponse(success=True, message="If this email is registered, you will receive a reset link.") 

452 

453 

454@email_auth_router.get("/reset-password/{token}", response_model=PasswordResetTokenValidationResponse) 

455async def validate_password_reset_token(token: str, request: Request, db: Session = Depends(get_db)): 

456 """Validate a password reset token before submitting a new password. 

457 

458 Args: 

459 token: One-time reset token. 

460 request: Incoming HTTP request. 

461 db: Database session dependency. 

462 

463 Returns: 

464 PasswordResetTokenValidationResponse: Token validity and expiration data. 

465 

466 Raises: 

467 HTTPException: If password reset is disabled or token validation fails. 

468 """ 

469 if not getattr(settings, "password_reset_enabled", True): 

470 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Password reset is disabled") 

471 

472 auth_service = EmailAuthService(db) 

473 ip_address = get_client_ip(request) 

474 user_agent = get_user_agent(request) 

475 

476 try: 

477 reset_token = await auth_service.validate_password_reset_token(token=token, ip_address=ip_address, user_agent=user_agent) 

478 return PasswordResetTokenValidationResponse(valid=True, message="Reset token is valid", expires_at=reset_token.expires_at) 

479 except AuthenticationError as exc: 

480 detail = str(exc) 

481 if "expired" in detail.lower(): 

482 raise HTTPException(status_code=status.HTTP_410_GONE, detail=detail) 

483 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=detail) 

484 

485 

486@email_auth_router.post("/reset-password/{token}", response_model=SuccessResponse) 

487async def complete_password_reset(token: str, reset_request: ResetPasswordRequest, request: Request, db: Session = Depends(get_db)): 

488 """Complete password reset with a valid one-time token. 

489 

490 Args: 

491 token: One-time reset token. 

492 reset_request: Reset-password payload with new credentials. 

493 request: Incoming HTTP request. 

494 db: Database session dependency. 

495 

496 Returns: 

497 SuccessResponse: Password reset completion status. 

498 

499 Raises: 

500 HTTPException: If password reset is disabled or reset validation fails. 

501 """ 

502 if not getattr(settings, "password_reset_enabled", True): 

503 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Password reset is disabled") 

504 

505 auth_service = EmailAuthService(db) 

506 ip_address = get_client_ip(request) 

507 user_agent = get_user_agent(request) 

508 

509 try: 

510 await auth_service.reset_password_with_token(token=token, new_password=reset_request.new_password, ip_address=ip_address, user_agent=user_agent) 

511 return SuccessResponse(success=True, message="Password reset successful. Please sign in with your new password.") 

512 except AuthenticationError as exc: 

513 detail = str(exc) 

514 if "expired" in detail.lower(): 

515 raise HTTPException(status_code=status.HTTP_410_GONE, detail=detail) 

516 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=detail) 

517 except PasswordValidationError as exc: 

518 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) 

519 

520 

521@email_auth_router.get("/me", response_model=EmailUserResponse) 

522async def get_current_user_profile(current_user: EmailUser = Depends(get_current_user)): 

523 """Get current user's profile information. 

524 

525 Args: 

526 current_user: Currently authenticated user 

527 

528 Returns: 

529 EmailUserResponse: User profile information 

530 

531 Raises: 

532 HTTPException: If user authentication fails 

533 

534 Examples: 

535 >>> # GET /auth/email/me 

536 >>> # Headers: Authorization: Bearer <token> 

537 """ 

538 return EmailUserResponse.from_email_user(current_user) 

539 

540 

541@email_auth_router.get("/events", response_model=list[AuthEventResponse]) 

542async def get_auth_events(limit: int = 50, offset: int = 0, current_user: EmailUser = Depends(get_current_user), db: Session = Depends(get_db)): 

543 """Get authentication events for the current user. 

544 

545 Args: 

546 limit: Maximum number of events to return 

547 offset: Number of events to skip 

548 current_user: Currently authenticated user 

549 db: Database session 

550 

551 Returns: 

552 List[AuthEventResponse]: Authentication events 

553 

554 Raises: 

555 HTTPException: If user authentication fails 

556 

557 Examples: 

558 >>> # GET /auth/email/events?limit=10&offset=0 

559 >>> # Headers: Authorization: Bearer <token> 

560 """ 

561 auth_service = EmailAuthService(db) 

562 

563 try: 

564 events = await auth_service.get_auth_events(email=current_user.email, limit=limit, offset=offset) 

565 

566 return [AuthEventResponse.model_validate(event) for event in events] 

567 

568 except Exception as e: 

569 logger.error(f"Error getting auth events for {SecurityValidator.sanitize_log_message(current_user.email)}: {e}") 

570 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve authentication events") 

571 

572 

573# Admin-only endpoints 

574@email_auth_router.get("/admin/users", response_model=Union[CursorPaginatedUsersResponse, List[EmailUserResponse]]) 

575@require_permission("admin.user_management") 

576async def list_users( 

577 cursor: Optional[str] = Query(None, description="Pagination cursor for fetching the next set of results"), 

578 limit: Optional[int] = Query( 

579 None, 

580 ge=0, 

581 le=settings.pagination_max_page_size, 

582 description="Maximum number of users to return. 0 means all (no limit). Default uses pagination_default_page_size.", 

583 ), 

584 include_pagination: bool = Query(False, description="Include cursor pagination metadata in response"), 

585 current_user_ctx: dict = Depends(get_current_user_with_permissions), 

586 db: Session = Depends(get_db), 

587) -> Union[CursorPaginatedUsersResponse, List[EmailUserResponse]]: 

588 """List all users (admin only) with cursor-based pagination support. 

589 

590 Args: 

591 cursor: Pagination cursor for fetching the next set of results 

592 limit: Maximum number of users to return. Use 0 for all users (no limit). 

593 If not specified, uses pagination_default_page_size (default: 50). 

594 include_pagination: Whether to include cursor pagination metadata in the response (default: false) 

595 current_user_ctx: Currently authenticated user context with permissions 

596 db: Database session 

597 

598 Returns: 

599 CursorPaginatedUsersResponse with users and nextCursor if include_pagination=true, or 

600 List of users if include_pagination=false 

601 

602 Raises: 

603 HTTPException: If user is not admin 

604 

605 Examples: 

606 >>> # Cursor-based with pagination: GET /auth/email/admin/users?cursor=eyJlbWFpbCI6Li4ufQ&include_pagination=true 

607 >>> # Simple list: GET /auth/email/admin/users 

608 >>> # Headers: Authorization: Bearer <admin_token> 

609 """ 

610 auth_service = EmailAuthService(db) 

611 

612 try: 

613 result = await auth_service.list_users(cursor=cursor, limit=limit) 

614 user_responses = [EmailUserResponse.from_email_user(user) for user in result.data] 

615 

616 if include_pagination: 

617 return CursorPaginatedUsersResponse(users=user_responses, next_cursor=result.next_cursor) 

618 

619 return user_responses 

620 

621 except Exception as e: 

622 logger.error(f"Error listing users: {e}") 

623 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user list") 

624 

625 

626@email_auth_router.get("/admin/events", response_model=list[AuthEventResponse]) 

627@require_permission("admin.user_management") 

628async def list_all_auth_events(limit: int = 100, offset: int = 0, user_email: Optional[str] = None, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): 

629 """List authentication events for all users (admin only). 

630 

631 Args: 

632 limit: Maximum number of events to return 

633 offset: Number of events to skip 

634 user_email: Filter events by specific user email 

635 current_user_ctx: Currently authenticated user context with permissions 

636 db: Database session 

637 

638 Returns: 

639 List[AuthEventResponse]: Authentication events 

640 

641 Raises: 

642 HTTPException: If user is not admin 

643 

644 Examples: 

645 >>> # GET /auth/email/admin/events?limit=50&user_email=user@example.com 

646 >>> # Headers: Authorization: Bearer <admin_token> 

647 """ 

648 auth_service = EmailAuthService(db) 

649 

650 try: 

651 events = await auth_service.get_auth_events(email=user_email, limit=limit, offset=offset) 

652 

653 return [AuthEventResponse.model_validate(event) for event in events] 

654 

655 except Exception as e: 

656 logger.error(f"Error getting auth events: {e}") 

657 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve authentication events") 

658 

659 

660@email_auth_router.post("/admin/users", response_model=EmailUserResponse, status_code=status.HTTP_201_CREATED) 

661@require_permission("admin.user_management") 

662async def create_user(user_request: AdminCreateUserRequest, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): 

663 """Create a new user account (admin only). 

664 

665 Args: 

666 user_request: User creation information 

667 current_user_ctx: Currently authenticated user context with permissions 

668 db: Database session 

669 

670 Returns: 

671 EmailUserResponse: Created user information 

672 

673 Raises: 

674 HTTPException: If user creation fails 

675 

676 Examples: 

677 Request JSON: 

678 { 

679 "email": "newuser@example.com", 

680 "password": "secure_password", 

681 "full_name": "New User", 

682 "is_admin": false 

683 } 

684 """ 

685 auth_service = EmailAuthService(db) 

686 

687 try: 

688 # Password is required by schema (str, not Optional) — Pydantic returns 422 if missing 

689 # Create new user with all fields from request 

690 user = await auth_service.create_user( 

691 email=user_request.email, 

692 password=user_request.password, 

693 full_name=user_request.full_name, 

694 is_admin=user_request.is_admin, 

695 is_active=user_request.is_active, 

696 password_change_required=user_request.password_change_required, 

697 auth_provider="local", 

698 granted_by=current_user_ctx.get("email"), 

699 ) 

700 

701 # If the user was created with the default password, optionally force password change 

702 if ( 

703 settings.password_change_enforcement_enabled 

704 and getattr(settings, "require_password_change_for_default_password", True) 

705 and user_request.password == settings.default_user_password.get_secret_value() 

706 ): # nosec B105 

707 user.password_change_required = True 

708 db.commit() 

709 

710 logger.info(f"Admin {SecurityValidator.sanitize_log_message(current_user_ctx['email'])} created user: {SecurityValidator.sanitize_log_message(user.email)}") 

711 

712 db.commit() 

713 db.close() 

714 return EmailUserResponse.from_email_user(user) 

715 

716 except EmailValidationError as e: 

717 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) 

718 except PasswordValidationError as e: 

719 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) 

720 except UserExistsError as e: 

721 raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e)) 

722 except Exception as e: 

723 logger.error(f"Admin user creation error: {e}") 

724 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="User creation failed") 

725 

726 

727@email_auth_router.get("/admin/users/{user_email}", response_model=EmailUserResponse) 

728@require_permission("admin.user_management") 

729async def get_user(user_email: str, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): 

730 """Get user by email (admin only). 

731 

732 Args: 

733 user_email: Email of user to retrieve 

734 current_user_ctx: Currently authenticated user context with permissions 

735 db: Database session 

736 

737 Returns: 

738 EmailUserResponse: User information 

739 

740 Raises: 

741 HTTPException: If user not found 

742 """ 

743 auth_service = EmailAuthService(db) 

744 

745 try: 

746 user = await auth_service.get_user_by_email(user_email) 

747 if not user: 

748 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found") 

749 

750 return EmailUserResponse.from_email_user(user) 

751 

752 except HTTPException: 

753 raise # Re-raise HTTP exceptions as-is (401, 403, 404, etc.) 

754 except Exception as e: 

755 logger.error(f"Error retrieving user {SecurityValidator.sanitize_log_message(user_email)}: {e}") 

756 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user") 

757 

758 

759@email_auth_router.put("/admin/users/{user_email}", response_model=EmailUserResponse) 

760@require_permission("admin.user_management") 

761async def update_user(user_email: str, user_request: AdminUserUpdateRequest, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): 

762 """Update user information (admin only). 

763 

764 Args: 

765 user_email: Email of user to update 

766 user_request: Updated user information 

767 current_user_ctx: Currently authenticated user context with permissions 

768 db: Database session 

769 

770 Returns: 

771 EmailUserResponse: Updated user information 

772 

773 Raises: 

774 HTTPException: If user not found or update fails 

775 """ 

776 auth_service = EmailAuthService(db) 

777 

778 try: 

779 user = await auth_service.update_user( 

780 email=user_email, 

781 full_name=user_request.full_name, 

782 is_admin=user_request.is_admin, 

783 is_active=user_request.is_active, 

784 email_verified=user_request.email_verified, 

785 password_change_required=user_request.password_change_required, 

786 password=user_request.password, 

787 admin_origin_source="api", 

788 ) 

789 

790 logger.info(f"Admin {SecurityValidator.sanitize_log_message(current_user_ctx['email'])} updated user: {SecurityValidator.sanitize_log_message(user.email)}") 

791 

792 result = EmailUserResponse.from_email_user(user) 

793 return result 

794 

795 except ValueError as e: 

796 error_msg = str(e) 

797 if "not found" in error_msg.lower(): 

798 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found") 

799 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=error_msg) 

800 except PasswordValidationError as e: 

801 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) 

802 except Exception as e: 

803 logger.error(f"Error updating user {SecurityValidator.sanitize_log_message(user_email)}: {e}") 

804 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to update user") 

805 

806 

807@email_auth_router.delete("/admin/users/{user_email}", response_model=SuccessResponse) 

808@require_permission("admin.user_management") 

809async def delete_user(user_email: str, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): 

810 """Delete/deactivate user (admin only). 

811 

812 Args: 

813 user_email: Email of user to delete 

814 current_user_ctx: Currently authenticated user context with permissions 

815 db: Database session 

816 

817 Returns: 

818 SuccessResponse: Success confirmation 

819 

820 Raises: 

821 HTTPException: If user not found or deletion fails 

822 """ 

823 auth_service = EmailAuthService(db) 

824 

825 try: 

826 # Prevent admin from deleting themselves 

827 if user_email == current_user_ctx["email"]: 

828 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot delete your own account") 

829 

830 # Prevent deleting the last active admin user 

831 if await auth_service.is_last_active_admin(user_email): 

832 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot delete the last remaining admin user") 

833 

834 # Hard delete using auth service 

835 await auth_service.delete_user(user_email) 

836 

837 logger.info(f"Admin {SecurityValidator.sanitize_log_message(current_user_ctx['email'])} deleted user: {SecurityValidator.sanitize_log_message(user_email)}") 

838 

839 db.commit() 

840 db.close() 

841 return SuccessResponse(success=True, message=f"User {user_email} has been deleted") 

842 

843 except HTTPException: 

844 raise # Re-raise HTTP exceptions as-is (401, 403, 404, etc.) 

845 except Exception as e: 

846 logger.error(f"Error deleting user {SecurityValidator.sanitize_log_message(user_email)}: {e}") 

847 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to delete user") 

848 

849 

850@email_auth_router.post("/admin/users/{user_email}/unlock", response_model=SuccessResponse) 

851@require_permission("admin.user_management") 

852async def unlock_user(user_email: str, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): 

853 """Unlock a user account by clearing lockout state and failed login counter. 

854 

855 Args: 

856 user_email: Email address of the user to unlock. 

857 current_user_ctx: Authenticated admin context. 

858 db: Database session dependency. 

859 

860 Returns: 

861 SuccessResponse: Unlock operation result. 

862 

863 Raises: 

864 HTTPException: If user is missing or unlock operation fails. 

865 """ 

866 auth_service = EmailAuthService(db) 

867 

868 try: 

869 await auth_service.unlock_user_account(email=user_email, unlocked_by=current_user_ctx.get("email")) 

870 return SuccessResponse(success=True, message=f"User {user_email} has been unlocked") 

871 except ValueError as exc: 

872 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) 

873 except Exception as exc: 

874 logger.error("Failed to unlock user %s: %s", user_email, exc) 

875 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to unlock user")