Coverage for mcpgateway / routers / email_auth.py: 100%

308 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/routers/email_auth.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Email Authentication Router. 

8This module provides FastAPI routes for email-based authentication 

9including login, registration, password management, and user profile endpoints. 

10 

11Examples: 

12 >>> from fastapi import FastAPI 

13 >>> from mcpgateway.routers.email_auth import email_auth_router 

14 >>> app = FastAPI() 

15 >>> app.include_router(email_auth_router, prefix="/auth/email", tags=["Email Auth"]) 

16 >>> isinstance(email_auth_router, APIRouter) 

17 True 

18""" 

19 

20# Standard 

21from datetime import datetime, timedelta, UTC 

22from typing import List, Optional, Union 

23 

24# Third-Party 

25from fastapi import APIRouter, Depends, HTTPException, Query, Request, status 

26from fastapi.security import HTTPBearer 

27from sqlalchemy.orm import Session 

28 

29# First-Party 

30from mcpgateway.auth import get_current_user 

31from mcpgateway.config import settings 

32from mcpgateway.db import EmailUser, SessionLocal, utc_now 

33from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_permission 

34from mcpgateway.schemas import ( 

35 AdminCreateUserRequest, 

36 AdminUserUpdateRequest, 

37 AuthenticationResponse, 

38 AuthEventResponse, 

39 ChangePasswordRequest, 

40 CursorPaginatedUsersResponse, 

41 EmailLoginRequest, 

42 EmailUserResponse, 

43 ForgotPasswordRequest, 

44 PasswordResetTokenValidationResponse, 

45 PublicRegistrationRequest, 

46 ResetPasswordRequest, 

47 SuccessResponse, 

48) 

49from mcpgateway.services.email_auth_service import AuthenticationError, EmailAuthService, EmailValidationError, PasswordValidationError, UserExistsError 

50from mcpgateway.services.logging_service import LoggingService 

51from mcpgateway.utils.create_jwt_token import create_jwt_token 

52from mcpgateway.utils.orjson_response import ORJSONResponse 

53 

# Logger for this module, obtained through the gateway's logging service
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

# Router on which every endpoint in this module is registered
email_auth_router = APIRouter()

# HTTP bearer security scheme, created with auto_error disabled
bearer_scheme = HTTPBearer(auto_error=False)

63 

64 

def get_db():
    """Provide a database session for the duration of one request.

    The transaction is committed once the consumer finishes without error, so
    read-only operations do not end in an implicit rollback. If an exception
    reaches this generator, the transaction is rolled back explicitly and the
    exception is re-raised. The session is closed in every case.

    Yields:
        Session: SQLAlchemy database session

    Raises:
        Exception: Re-raises any exception after rolling back the transaction.
    """
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        # Undo whatever the request did; if even the rollback fails, fall back
        # to invalidating the session as a last, best-effort cleanup step.
        try:
            session.rollback()
        except Exception:
            try:
                session.invalidate()
            except Exception:
                pass  # nosec B110 - Best effort cleanup on connection failure
        raise
    finally:
        session.close()

92 

93 

def get_client_ip(request: Request) -> str:
    """Extract client IP address from request.

    Sources are consulted in this order, and the first non-empty value wins:

    1. The first entry of the ``X-Forwarded-For`` header (proxy/load balancer).
    2. The ``X-Real-IP`` header.
    3. The address of the directly connected peer (``request.client.host``).

    Blank values (for example an ``X-Forwarded-For`` header of ``", 10.0.0.1"``
    or a whitespace-only ``X-Real-IP``) are skipped instead of being returned
    as an empty string.

    NOTE(review): both headers are supplied by the caller unless a trusted
    proxy overwrites them, so the result must not be treated as authenticated.

    Args:
        request: FastAPI request object

    Returns:
        str: Client IP address, or "unknown" when no source provides one
    """
    # Check for X-Forwarded-For header (proxy/load balancer)
    forwarded_for = request.headers.get("X-Forwarded-For")
    if forwarded_for:
        first_hop = forwarded_for.split(",")[0].strip()
        if first_hop:
            return first_hop

    # Check for X-Real-IP header
    real_ip = (request.headers.get("X-Real-IP") or "").strip()
    if real_ip:
        return real_ip

    # Fall back to direct client IP
    return request.client.host if request.client else "unknown"

115 

116 

def get_user_agent(request: Request) -> str:
    """Read the ``User-Agent`` header of a request.

    Args:
        request: FastAPI request object

    Returns:
        str: The header value, or "unknown" when the header is absent
    """
    headers = request.headers
    return headers.get("User-Agent", "unknown")

127 

128 

async def create_access_token(user: EmailUser, token_scopes: Optional[dict] = None, jti: Optional[str] = None) -> tuple[str, int]:
    """Create JWT access token for user with enhanced scoping.

    The payload carries the standard claims (``sub``, ``iss``, ``aud``,
    ``iat``, ``exp``, ``jti``), a ``user`` profile object, a ``token_use``
    marker set to "session" and a ``scopes`` object. The lifetime is taken
    from ``settings.token_expiry`` (minutes).

    Args:
        user: EmailUser instance
        token_scopes: Optional token scoping information; when omitted an
            unrestricted default scope is embedded
        jti: Optional JWT ID for revocation tracking; a random UUID4 is
            generated when omitted

    Returns:
        Tuple of (token_string, expires_in_seconds)
    """
    # Standard
    import uuid

    now = datetime.now(tz=UTC)
    expires_delta = timedelta(minutes=settings.token_expiry)
    expire = now + expires_delta

    # Create JWT payload - session token (teams resolved server-side at request time)
    payload = {
        # Standard JWT claims
        "sub": user.email,
        "iss": settings.jwt_issuer,
        "aud": settings.jwt_audience,
        "iat": int(now.timestamp()),
        "exp": int(expire.timestamp()),
        "jti": jti or str(uuid.uuid4()),
        # User profile information
        "user": {
            "email": str(getattr(user, "email", "")),
            # ``or`` keeps an attribute that is present but None from being
            # serialized as the literal string "None"
            "full_name": str(getattr(user, "full_name", "") or ""),
            "is_admin": bool(getattr(user, "is_admin", False)),
            "auth_provider": str(getattr(user, "auth_provider", "local") or "local"),
        },
        "token_use": "session",  # nosec B105 - token type marker, not a password
        # Token scoping (if provided)
        "scopes": token_scopes or {"server_id": None, "permissions": ["*"], "ip_restrictions": [], "time_restrictions": {}},
    }

    # Generate token using centralized token creation
    token = await create_jwt_token(payload)

    return token, int(expires_delta.total_seconds())

169 

170 

async def create_legacy_access_token(user: EmailUser) -> tuple[str, int]:
    """Create legacy JWT access token for backwards compatibility.

    The payload uses the original flat layout: user attributes sit at the top
    level next to the standard claims, and every value is a primitive. The
    lifetime is taken from ``settings.token_expiry`` (minutes).

    Args:
        user: EmailUser instance

    Returns:
        Tuple of (token_string, expires_in_seconds)
    """
    now = datetime.now(tz=UTC)
    expires_delta = timedelta(minutes=settings.token_expiry)
    expire = now + expires_delta

    # Create simple JWT payload (original format) with primitives only
    payload = {
        "sub": str(getattr(user, "email", "")),
        "email": str(getattr(user, "email", "")),
        # ``or`` keeps an attribute that is present but None from being
        # serialized as the literal string "None"
        "full_name": str(getattr(user, "full_name", "") or ""),
        "is_admin": bool(getattr(user, "is_admin", False)),
        "auth_provider": str(getattr(user, "auth_provider", "local") or "local"),
        "iat": int(now.timestamp()),
        "exp": int(expire.timestamp()),
        "iss": settings.jwt_issuer,
        "aud": settings.jwt_audience,
    }

    # Generate token using centralized token creation
    token = await create_jwt_token(payload)

    return token, int(expires_delta.total_seconds())

201 

202 

@email_auth_router.post("/login", response_model=AuthenticationResponse)
async def login(login_request: EmailLoginRequest, request: Request, db: Session = Depends(get_db)):
    """Log a user in with email and password and issue a session token.

    Once the credentials are verified, the password-change policy is
    evaluated, but only when ``settings.password_change_enforcement_enabled``
    is on. A login is answered with 403 and an ``X-Password-Change-Required``
    header when the account carries the ``password_change_required`` flag,
    when the password is older than ``password_max_age_days``, or when the
    account still uses the configured default password and
    ``require_password_change_for_default_password`` is enabled.

    Args:
        login_request: Login credentials
        request: FastAPI request object
        db: Database session

    Returns:
        AuthenticationResponse: Access token and user info. A 403
        ORJSONResponse is returned instead when a password change is required.

    Raises:
        HTTPException: 401 when authentication fails, 500 on unexpected errors

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(login)
        True

        Request JSON:
            {
              "email": "user@example.com",
              "password": "secure_password"
            }
    """
    service = EmailAuthService(db)
    client_ip = get_client_ip(request)
    agent = get_user_agent(request)

    try:
        user = await service.authenticate_user(email=login_request.email, password=login_request.password, ip_address=client_ip, user_agent=agent)
        if not user:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid email or password")

        # All policy checks below sit behind the master enforcement switch
        must_change_password = False

        if settings.password_change_enforcement_enabled:
            # An explicit flag on the account always wins (it is cleared when the password is changed)
            if getattr(user, "password_change_required", False):
                must_change_password = True
                logger.debug("User %s has password_change_required flag set", login_request.email)

            # Age-based expiry, evaluated only when nothing has required a change yet
            if not must_change_password:
                try:
                    changed_at = getattr(user, "password_changed_at", None)
                    if isinstance(changed_at, datetime):
                        password_age_days = (utc_now() - changed_at).days
                        allowed_age_days = getattr(settings, "password_max_age_days", 90)
                        if password_age_days >= allowed_age_days:
                            must_change_password = True
                            logger.debug("User %s password expired (%s days >= %s)", login_request.email, password_age_days, allowed_age_days)
                except Exception as exc:
                    logger.debug("Failed to evaluate password age for %s: %s", login_request.email, exc)

            # Optional check whether the account still uses the configured default password
            if getattr(settings, "detect_default_password_on_login", True):
                # First-Party
                from mcpgateway.services.argon2_service import Argon2PasswordService

                hasher = Argon2PasswordService()
                default_secret = settings.default_user_password.get_secret_value()  # nosec B105
                uses_default_password = await hasher.verify_password_async(default_secret, user.password_hash)
                if uses_default_password:
                    if getattr(settings, "require_password_change_for_default_password", True):
                        # Persist the flag so later logins are blocked as well
                        user.password_change_required = True
                        must_change_password = True
                        try:
                            db.commit()
                        except Exception as exc:  # a failed commit is logged, not fatal
                            logger.warning("Failed to commit password_change_required flag for %s: %s", login_request.email, exc)
                    else:
                        logger.info("User %s is using default password but enforcement is disabled", login_request.email)

        if must_change_password:
            logger.info(f"Login blocked for {login_request.email}: password change required")
            return ORJSONResponse(
                status_code=status.HTTP_403_FORBIDDEN,
                content={"detail": "Password change required. Please change your password before continuing."},
                headers={"X-Password-Change-Required": "true"},
            )

        # Credentials accepted and no password change pending: issue the token
        token, lifetime_seconds = await create_access_token(user)
        profile = EmailUserResponse.from_email_user(user)
        return AuthenticationResponse(access_token=token, token_type="bearer", expires_in=lifetime_seconds, user=profile)  # nosec B106 - OAuth2 token type, not a password

    except HTTPException:
        raise  # HTTP errors raised above pass through unchanged
    except Exception as e:
        logger.error(f"Login error for {login_request.email}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Authentication service error")

303 

304 

@email_auth_router.post("/register", response_model=AuthenticationResponse)
async def register(registration_request: PublicRegistrationRequest, request: Request, db: Session = Depends(get_db)):
    """Register a new user account.

    This endpoint is controlled by the PUBLIC_REGISTRATION_ENABLED setting.
    When disabled (default), returns 403 Forbidden and users can only be
    created by administrators via the admin API.

    Args:
        registration_request: Registration information (email, password, full_name only)
        request: FastAPI request object (not read by this handler; kept so the
            endpoint signature stays unchanged)
        db: Database session

    Returns:
        AuthenticationResponse: Access token and user info

    Raises:
        HTTPException: 403 when public registration is disabled, 400 for an
            invalid email or password, 409 when the user already exists,
            500 on unexpected errors

    Examples:
        Request JSON:
            {
              "email": "new@example.com",
              "password": "secure_password",
              "full_name": "New User"
            }
    """
    # Check if public registration is allowed
    if not settings.public_registration_enabled:
        logger.warning(f"Registration attempt rejected - public registration disabled: {registration_request.email}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Public registration is disabled. Please contact an administrator to create an account.",
        )

    auth_service = EmailAuthService(db)

    try:
        # Password is required by schema (str, not Optional) - Pydantic returns 422 if missing
        # Security-sensitive fields are hardcoded (not exposed on public schema)
        user = await auth_service.create_user(
            email=registration_request.email,
            password=registration_request.password,
            full_name=registration_request.full_name,
            is_admin=False,  # Regular users cannot self-register as admin
            is_active=True,  # Public registrations are always active
            password_change_required=False,  # No forced password change for self-registration
            auth_provider="local",
        )

        # Create access token
        access_token, expires_in = await create_access_token(user)

        logger.info(f"New user registered: {user.email}")

        return AuthenticationResponse(
            access_token=access_token, token_type="bearer", expires_in=expires_in, user=EmailUserResponse.from_email_user(user)
        )  # nosec B106 - OAuth2 token type, not a password

    except EmailValidationError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except PasswordValidationError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except UserExistsError as e:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
    except Exception as e:
        logger.error(f"Registration error for {registration_request.email}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Registration service error")

375 

376 

@email_auth_router.post("/change-password", response_model=SuccessResponse)
async def change_password(password_request: ChangePasswordRequest, request: Request, current_user: EmailUser = Depends(get_current_user), db: Session = Depends(get_db)):
    """Change user's password.

    Args:
        password_request: Old and new passwords
        request: FastAPI request object
        current_user: Currently authenticated user
        db: Database session

    Returns:
        SuccessResponse: Success confirmation

    Raises:
        HTTPException: 401 when the service rejects the request with an
            AuthenticationError, 400 when the new password fails validation,
            500 when the service reports failure or an unexpected error occurs

    Examples:
        Request JSON (with Bearer token in Authorization header):
            {
              "old_password": "current_password",
              "new_password": "new_secure_password"
            }
    """
    auth_service = EmailAuthService(db)
    ip_address = get_client_ip(request)
    user_agent = get_user_agent(request)

    try:
        # Change password
        success = await auth_service.change_password(
            email=current_user.email, old_password=password_request.old_password, new_password=password_request.new_password, ip_address=ip_address, user_agent=user_agent
        )

        if success:
            return SuccessResponse(success=True, message="Password changed successfully")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to change password")

    except AuthenticationError as e:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(e))
    except PasswordValidationError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except HTTPException:
        # Without this clause the "Failed to change password" error raised above
        # would be caught by the generic handler below, logged as a service
        # error and replaced with a different message.
        raise
    except Exception as e:
        logger.error(f"Password change error for {current_user.email}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Password change service error")

422 

423 

@email_auth_router.post("/forgot-password", response_model=SuccessResponse)
async def forgot_password(reset_request: ForgotPasswordRequest, request: Request, db: Session = Depends(get_db)):
    """Start the password reset flow for an email address.

    The success message is the same for every request that is not rate
    limited, so the response does not reveal whether the address is
    registered.

    Args:
        reset_request: Forgot-password request payload.
        request: Incoming HTTP request.
        db: Database session dependency.

    Returns:
        SuccessResponse: Generic success response to avoid account enumeration.

    Raises:
        HTTPException: 403 if password reset is disabled, 429 if the request is rate limited.
    """
    reset_enabled = getattr(settings, "password_reset_enabled", True)
    if not reset_enabled:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Password reset is disabled")

    service = EmailAuthService(db)
    outcome = await service.request_password_reset(
        email=reset_request.email,
        ip_address=get_client_ip(request),
        user_agent=get_user_agent(request),
    )

    if outcome.rate_limited:
        raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="Too many requests. Please try again later.")

    return SuccessResponse(success=True, message="If this email is registered, you will receive a reset link.")

451 

452 

@email_auth_router.get("/reset-password/{token}", response_model=PasswordResetTokenValidationResponse)
async def validate_password_reset_token(token: str, request: Request, db: Session = Depends(get_db)):
    """Check a password reset token before a new password is submitted.

    Args:
        token: One-time reset token.
        request: Incoming HTTP request.
        db: Database session dependency.

    Returns:
        PasswordResetTokenValidationResponse: Token validity and expiration data.

    Raises:
        HTTPException: 403 if password reset is disabled; 410 if the service
            reports the token as expired; 400 for any other validation failure.
    """
    reset_enabled = getattr(settings, "password_reset_enabled", True)
    if not reset_enabled:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Password reset is disabled")

    service = EmailAuthService(db)
    client_ip = get_client_ip(request)
    agent = get_user_agent(request)

    try:
        record = await service.validate_password_reset_token(token=token, ip_address=client_ip, user_agent=agent)
        return PasswordResetTokenValidationResponse(valid=True, message="Reset token is valid", expires_at=record.expires_at)
    except AuthenticationError as exc:
        reason = str(exc)
        # The error text decides between 410 (expired) and 400 (anything else)
        code = status.HTTP_410_GONE if "expired" in reason.lower() else status.HTTP_400_BAD_REQUEST
        raise HTTPException(status_code=code, detail=reason)

483 

484 

@email_auth_router.post("/reset-password/{token}", response_model=SuccessResponse)
async def complete_password_reset(token: str, reset_request: ResetPasswordRequest, request: Request, db: Session = Depends(get_db)):
    """Set a new password using a valid one-time reset token.

    Args:
        token: One-time reset token.
        reset_request: Reset-password payload with new credentials.
        request: Incoming HTTP request.
        db: Database session dependency.

    Returns:
        SuccessResponse: Password reset completion status.

    Raises:
        HTTPException: 403 if password reset is disabled; 410 if the service
            reports the token as expired; 400 for other token failures or a
            new password that fails validation.
    """
    reset_enabled = getattr(settings, "password_reset_enabled", True)
    if not reset_enabled:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Password reset is disabled")

    service = EmailAuthService(db)
    client_ip = get_client_ip(request)
    agent = get_user_agent(request)

    try:
        await service.reset_password_with_token(token=token, new_password=reset_request.new_password, ip_address=client_ip, user_agent=agent)
        return SuccessResponse(success=True, message="Password reset successful. Please sign in with your new password.")
    except AuthenticationError as exc:
        reason = str(exc)
        # The error text decides between 410 (expired) and 400 (anything else)
        code = status.HTTP_410_GONE if "expired" in reason.lower() else status.HTTP_400_BAD_REQUEST
        raise HTTPException(status_code=code, detail=reason)
    except PasswordValidationError as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc))

518 

519 

@email_auth_router.get("/me", response_model=EmailUserResponse)
async def get_current_user_profile(current_user: EmailUser = Depends(get_current_user)):
    """Return the profile of the authenticated caller.

    Args:
        current_user: Currently authenticated user

    Returns:
        EmailUserResponse: User profile information

    Raises:
        HTTPException: If user authentication fails

    Examples:
        >>> # GET /auth/email/me
        >>> # Headers: Authorization: Bearer <token>
    """
    profile = EmailUserResponse.from_email_user(current_user)
    return profile

538 

539 

@email_auth_router.get("/events", response_model=list[AuthEventResponse])
async def get_auth_events(limit: int = 50, offset: int = 0, current_user: EmailUser = Depends(get_current_user), db: Session = Depends(get_db)):
    """List the authentication events recorded for the calling user.

    Args:
        limit: Maximum number of events to return
        offset: Number of events to skip
        current_user: Currently authenticated user
        db: Database session

    Returns:
        List[AuthEventResponse]: Authentication events

    Raises:
        HTTPException: If user authentication fails, or 500 when the events cannot be retrieved

    Examples:
        >>> # GET /auth/email/events?limit=10&offset=0
        >>> # Headers: Authorization: Bearer <token>
    """
    service = EmailAuthService(db)

    try:
        records = await service.get_auth_events(email=current_user.email, limit=limit, offset=offset)
        return list(map(AuthEventResponse.model_validate, records))
    except Exception as e:
        logger.error(f"Error getting auth events for {current_user.email}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve authentication events")

570 

571 

572# Admin-only endpoints 

@email_auth_router.get("/admin/users", response_model=Union[CursorPaginatedUsersResponse, List[EmailUserResponse]])
@require_permission("admin.user_management")
async def list_users(
    cursor: Optional[str] = Query(None, description="Pagination cursor for fetching the next set of results"),
    limit: Optional[int] = Query(
        None,
        ge=0,
        le=settings.pagination_max_page_size,
        description="Maximum number of users to return. 0 means all (no limit). Default uses pagination_default_page_size.",
    ),
    include_pagination: bool = Query(False, description="Include cursor pagination metadata in response"),
    current_user_ctx: dict = Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> Union[CursorPaginatedUsersResponse, List[EmailUserResponse]]:
    """Return user accounts (admin only), optionally with cursor pagination metadata.

    Args:
        cursor: Pagination cursor for fetching the next set of results
        limit: Maximum number of users to return. Use 0 for all users (no limit).
            If not specified, uses pagination_default_page_size.
        include_pagination: Whether to include cursor pagination metadata in the response (default: false)
        current_user_ctx: Currently authenticated user context with permissions
        db: Database session

    Returns:
        CursorPaginatedUsersResponse with users and next cursor if include_pagination=true, or
        a plain list of users if include_pagination=false

    Raises:
        HTTPException: If user is not admin, or 500 when the list cannot be retrieved

    Examples:
        >>> # Cursor-based with pagination: GET /auth/email/admin/users?cursor=eyJlbWFpbCI6Li4ufQ&include_pagination=true
        >>> # Simple list: GET /auth/email/admin/users
        >>> # Headers: Authorization: Bearer <admin_token>
    """
    service = EmailAuthService(db)

    try:
        page = await service.list_users(cursor=cursor, limit=limit)
        users = [EmailUserResponse.from_email_user(account) for account in page.data]

        # Plain list unless the caller asked for pagination metadata
        if not include_pagination:
            return users
        return CursorPaginatedUsersResponse(users=users, next_cursor=page.next_cursor)
    except Exception as e:
        logger.error(f"Error listing users: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user list")

623 

624 

@email_auth_router.get("/admin/events", response_model=list[AuthEventResponse])
@require_permission("admin.user_management")
async def list_all_auth_events(limit: int = 100, offset: int = 0, user_email: Optional[str] = None, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Return authentication events across users (admin only).

    Args:
        limit: Maximum number of events to return
        offset: Number of events to skip
        user_email: Filter events by specific user email
        current_user_ctx: Currently authenticated user context with permissions
        db: Database session

    Returns:
        List[AuthEventResponse]: Authentication events

    Raises:
        HTTPException: If user is not admin, or 500 when the events cannot be retrieved

    Examples:
        >>> # GET /auth/email/admin/events?limit=50&user_email=user@example.com
        >>> # Headers: Authorization: Bearer <admin_token>
    """
    service = EmailAuthService(db)

    try:
        records = await service.get_auth_events(email=user_email, limit=limit, offset=offset)
        return list(map(AuthEventResponse.model_validate, records))
    except Exception as e:
        logger.error(f"Error getting auth events: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve authentication events")

657 

658 

@email_auth_router.post("/admin/users", response_model=EmailUserResponse, status_code=status.HTTP_201_CREATED)
@require_permission("admin.user_management")
async def create_user(user_request: AdminCreateUserRequest, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Create a new user account (admin only).

    When password-change enforcement is enabled and the submitted password
    equals the configured default password, the new account is flagged with
    ``password_change_required``.

    Args:
        user_request: User creation information
        current_user_ctx: Currently authenticated user context with permissions
        db: Database session

    Returns:
        EmailUserResponse: Created user information

    Raises:
        HTTPException: 400 for an invalid email or password, 409 when the
            user already exists, 500 on unexpected errors

    Examples:
        Request JSON:
            {
              "email": "newuser@example.com",
              "password": "secure_password",
              "full_name": "New User",
              "is_admin": false
            }
    """
    auth_service = EmailAuthService(db)
    # One lookup, used for both the grant record and the audit log line
    admin_email = current_user_ctx.get("email")

    try:
        # Password is required by schema (str, not Optional) - Pydantic returns 422 if missing
        # Create new user with all fields from request
        user = await auth_service.create_user(
            email=user_request.email,
            password=user_request.password,
            full_name=user_request.full_name,
            is_admin=user_request.is_admin,
            is_active=user_request.is_active,
            password_change_required=user_request.password_change_required,
            auth_provider="local",
            granted_by=admin_email,
        )

        # If the user was created with the default password, optionally force password change
        if (
            settings.password_change_enforcement_enabled
            and getattr(settings, "require_password_change_for_default_password", True)
            and user_request.password == settings.default_user_password.get_secret_value()
        ):  # nosec B105
            user.password_change_required = True

        logger.info(f"Admin {admin_email} created user: {user.email}")

        # A single commit persists the flag set above (if any)
        db.commit()
        # Serialize while the session is still open: reading attributes of an
        # ORM object after close() can fail if they were expired by the commit.
        response = EmailUserResponse.from_email_user(user)
        db.close()
        return response

    except EmailValidationError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except PasswordValidationError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except UserExistsError as e:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
    except Exception as e:
        logger.error(f"Admin user creation error: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="User creation failed")

724 

725 

@email_auth_router.get("/admin/users/{user_email}", response_model=EmailUserResponse)
@require_permission("admin.user_management")
async def get_user(user_email: str, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Look up a single user by email address (admin only).

    Args:
        user_email: Email of user to retrieve
        current_user_ctx: Currently authenticated user context with permissions
        db: Database session

    Returns:
        EmailUserResponse: User information

    Raises:
        HTTPException: 404 if user not found, 500 on unexpected errors
    """
    service = EmailAuthService(db)

    try:
        account = await service.get_user_by_email(user_email)
        if account:
            return EmailUserResponse.from_email_user(account)
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
    except HTTPException:
        raise  # the 404 above passes through unchanged
    except Exception as e:
        logger.error(f"Error retrieving user {user_email}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user")

756 

757 

@email_auth_router.put("/admin/users/{user_email}", response_model=EmailUserResponse)
@require_permission("admin.user_management")
async def update_user(user_email: str, user_request: AdminUserUpdateRequest, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Apply an administrator's changes to a user account (admin only).

    Args:
        user_email: Email of user to update
        user_request: Updated user information
        current_user_ctx: Currently authenticated user context with permissions
        db: Database session

    Returns:
        EmailUserResponse: Updated user information

    Raises:
        HTTPException: 404 if user not found, 400 for a rejected update or
            invalid password, 500 on unexpected errors
    """
    service = EmailAuthService(db)

    try:
        user = await service.update_user(
            email=user_email,
            full_name=user_request.full_name,
            is_admin=user_request.is_admin,
            is_active=user_request.is_active,
            email_verified=user_request.email_verified,
            password_change_required=user_request.password_change_required,
            password=user_request.password,
            admin_origin_source="api",
        )

        logger.info(f"Admin {current_user_ctx['email']} updated user: {user.email}")

        return EmailUserResponse.from_email_user(user)

    except ValueError as e:
        reason = str(e)
        # A "not found" message maps to 404; any other ValueError is a bad request
        if "not found" not in reason.lower():
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=reason)
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
    except PasswordValidationError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except Exception as e:
        logger.error(f"Error updating user {user_email}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to update user")

804 

805 

@email_auth_router.delete("/admin/users/{user_email}", response_model=SuccessResponse)
@require_permission("admin.user_management")
async def delete_user(user_email: str, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Delete a user account (admin only).

    Two guards run before the deletion: an administrator cannot delete their
    own account, and the last active admin user cannot be deleted.

    Args:
        user_email: Email of user to delete
        current_user_ctx: Currently authenticated user context with permissions
        db: Database session

    Returns:
        SuccessResponse: Success confirmation

    Raises:
        HTTPException: 400 when a guard rejects the request, 500 if deletion fails
    """
    service = EmailAuthService(db)

    try:
        # Guard 1: no self-deletion
        if user_email == current_user_ctx["email"]:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot delete your own account")

        # Guard 2: keep at least one active admin
        is_last_admin = await service.is_last_active_admin(user_email)
        if is_last_admin:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot delete the last remaining admin user")

        # Removal itself is delegated to the auth service
        await service.delete_user(user_email)

        logger.info(f"Admin {current_user_ctx['email']} deleted user: {user_email}")

        # Commit and release the session before the response is returned
        db.commit()
        db.close()
        return SuccessResponse(success=True, message=f"User {user_email} has been deleted")

    except HTTPException:
        raise  # guard errors pass through unchanged
    except Exception as e:
        logger.error(f"Error deleting user {user_email}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to delete user")

847 

848 

@email_auth_router.post("/admin/users/{user_email}/unlock", response_model=SuccessResponse)
@require_permission("admin.user_management")
async def unlock_user(user_email: str, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Unlock a user account (admin only).

    The work is delegated to ``EmailAuthService.unlock_user_account``, which
    is told which administrator performed the unlock.

    Args:
        user_email: Email address of the user to unlock.
        current_user_ctx: Authenticated admin context.
        db: Database session dependency.

    Returns:
        SuccessResponse: Unlock operation result.

    Raises:
        HTTPException: 404 when the service raises ValueError, 500 if the unlock operation fails.
    """
    service = EmailAuthService(db)
    acting_admin = current_user_ctx.get("email")

    try:
        await service.unlock_user_account(email=user_email, unlocked_by=acting_admin)
        return SuccessResponse(success=True, message=f"User {user_email} has been unlocked")
    except ValueError as exc:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc))
    except Exception as exc:
        logger.error("Failed to unlock user %s: %s", user_email, exc)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to unlock user")