Coverage for mcpgateway / routers / email_auth.py: 100%

253 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/routers/email_auth.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Email Authentication Router. 

8This module provides FastAPI routes for email-based authentication 

9including login, registration, password management, and user profile endpoints. 

10 

11Examples: 

12 >>> from fastapi import FastAPI 

13 >>> from mcpgateway.routers.email_auth import email_auth_router 

14 >>> app = FastAPI() 

15 >>> app.include_router(email_auth_router, prefix="/auth/email", tags=["Email Auth"]) 

16 >>> isinstance(email_auth_router, APIRouter) 

17 True 

18""" 

19 

20# Standard 

21from datetime import datetime, timedelta, UTC 

22from typing import List, Optional, Union 

23 

24# Third-Party 

25from fastapi import APIRouter, Depends, HTTPException, Query, Request, status 

26from fastapi.security import HTTPBearer 

27from sqlalchemy.orm import Session 

28 

29# First-Party 

30from mcpgateway.auth import get_current_user 

31from mcpgateway.config import settings 

32from mcpgateway.db import EmailUser, SessionLocal, utc_now 

33from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_permission 

34from mcpgateway.schemas import ( 

35 AdminCreateUserRequest, 

36 AdminUserUpdateRequest, 

37 AuthenticationResponse, 

38 AuthEventResponse, 

39 ChangePasswordRequest, 

40 CursorPaginatedUsersResponse, 

41 EmailLoginRequest, 

42 EmailUserResponse, 

43 PublicRegistrationRequest, 

44 SuccessResponse, 

45) 

46from mcpgateway.services.email_auth_service import AuthenticationError, EmailAuthService, EmailValidationError, PasswordValidationError, UserExistsError 

47from mcpgateway.services.logging_service import LoggingService 

48from mcpgateway.utils.create_jwt_token import create_jwt_token 

49from mcpgateway.utils.orjson_response import ORJSONResponse 

50 

# Initialize logging: a module-level logger obtained from the gateway's LoggingService
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

# Create router: the route handlers defined below register themselves on this instance
email_auth_router = APIRouter()

# Security scheme: HTTP Bearer with auto_error=False.
# NOTE(review): bearer_scheme is not referenced by any handler visible in this module — confirm it is still needed.
bearer_scheme = HTTPBearer(auto_error=False)

60 

61 

def get_db():
    """Provide a database session to a request handler (FastAPI dependency).

    The session is committed once the consumer finishes without error, so that
    read-only requests do not end in an implicit rollback. If the consumer (or
    the commit itself) raises, the transaction is rolled back and the original
    exception propagates. The session is closed in every case.

    Yields:
        Session: SQLAlchemy database session

    Raises:
        Exception: Re-raises any exception after rolling back the transaction.
    """
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        # Cleanup is best effort: rollback first, and only if rollback itself
        # fails fall back to invalidating the session.
        try:
            session.rollback()
        except Exception:
            try:
                session.invalidate()
            except Exception:
                pass  # nosec B110 - Best effort cleanup on connection failure
        # Always surface the original failure to the caller.
        raise
    finally:
        session.close()

89 

90 

def get_client_ip(request: Request) -> str:
    """Extract client IP address from request.

    Resolution order:
      1. First entry of the ``X-Forwarded-For`` header (proxy/load balancer).
      2. The ``X-Real-IP`` header.
      3. The direct peer address (``request.client.host``).
      4. The literal string ``"unknown"``.

    Header values are stripped of surrounding whitespace, and a blank value is
    treated as absent so that an empty string is never returned as an address.

    NOTE(review): both headers are supplied by the client unless an upstream
    proxy overwrites them — confirm the deployment strips/sets them before
    relying on this value for security decisions.

    Args:
        request: FastAPI request object

    Returns:
        str: Client IP address, or "unknown" when none can be determined
    """
    # Check for X-Forwarded-For header (proxy/load balancer)
    forwarded_for = request.headers.get("X-Forwarded-For")
    if forwarded_for:
        first_hop = forwarded_for.split(",", 1)[0].strip()
        # A blank first entry (e.g. " , 10.0.0.1" or "   ") is ignored and we
        # fall through to the next source instead of returning "".
        if first_hop:
            return first_hop

    # Check for X-Real-IP header
    real_ip = (request.headers.get("X-Real-IP") or "").strip()
    if real_ip:
        return real_ip

    # Fall back to direct client IP
    return request.client.host if request.client else "unknown"

112 

113 

def get_user_agent(request: Request) -> str:
    """Return the caller's ``User-Agent`` header value.

    Args:
        request: FastAPI request object

    Returns:
        str: The header value, or "unknown" when the header is absent
    """
    request_headers = request.headers
    user_agent = request_headers.get("User-Agent", "unknown")
    return user_agent

124 

125 

async def create_access_token(user: EmailUser, token_scopes: Optional[dict] = None, jti: Optional[str] = None) -> tuple[str, int]:
    """Create JWT access token for user with enhanced scoping.

    The token lifetime is ``settings.token_expiry`` minutes. When no ``jti`` is
    supplied a random UUID4 is generated. When no ``token_scopes`` is supplied
    an unrestricted default scope is embedded.

    Args:
        user: EmailUser instance
        token_scopes: Optional token scoping information
        jti: Optional JWT ID for revocation tracking

    Returns:
        Tuple of (token_string, expires_in_seconds)
    """
    # Standard
    import uuid  # function-scope import; replaces the inline __import__("uuid") call

    now = datetime.now(tz=UTC)
    expires_delta = timedelta(minutes=settings.token_expiry)
    expire = now + expires_delta

    # Create JWT payload — session token (teams resolved server-side at request time)
    payload = {
        # Standard JWT claims
        "sub": user.email,
        "iss": settings.jwt_issuer,
        "aud": settings.jwt_audience,
        "iat": int(now.timestamp()),
        "exp": int(expire.timestamp()),
        "jti": jti or str(uuid.uuid4()),
        # User profile information
        "user": {
            "email": str(getattr(user, "email", "")),
            # `or ""` keeps a None full_name from being serialized as the literal
            # string "None". NOTE(review): assumes full_name may be unset — confirm.
            "full_name": str(getattr(user, "full_name", None) or ""),
            "is_admin": bool(getattr(user, "is_admin", False)),
            "auth_provider": str(getattr(user, "auth_provider", "local")),
        },
        "token_use": "session",  # nosec B105 - token type marker, not a password
        # Token scoping (if provided)
        "scopes": token_scopes or {"server_id": None, "permissions": ["*"], "ip_restrictions": [], "time_restrictions": {}},
    }

    # Generate token using centralized token creation
    token = await create_jwt_token(payload)

    return token, int(expires_delta.total_seconds())

166 

167 

async def create_legacy_access_token(user: EmailUser) -> tuple[str, int]:
    """Create legacy JWT access token for backwards compatibility.

    The payload is flat (no nested ``user`` or ``scopes`` objects) and contains
    primitives only. The token lifetime is ``settings.token_expiry`` minutes.

    Args:
        user: EmailUser instance

    Returns:
        Tuple of (token_string, expires_in_seconds)
    """
    now = datetime.now(tz=UTC)
    expires_delta = timedelta(minutes=settings.token_expiry)
    expire = now + expires_delta

    email = str(getattr(user, "email", ""))

    # Create simple JWT payload (original format) with primitives only
    payload = {
        "sub": email,
        "email": email,
        # `or ""` keeps a None full_name from being serialized as the literal
        # string "None". NOTE(review): assumes full_name may be unset — confirm.
        "full_name": str(getattr(user, "full_name", None) or ""),
        "is_admin": bool(getattr(user, "is_admin", False)),
        "auth_provider": str(getattr(user, "auth_provider", "local")),
        "iat": int(now.timestamp()),
        "exp": int(expire.timestamp()),
        "iss": settings.jwt_issuer,
        "aud": settings.jwt_audience,
    }

    # Generate token using centralized token creation
    token = await create_jwt_token(payload)

    return token, int(expires_delta.total_seconds())

198 

199 

@email_auth_router.post("/login", response_model=AuthenticationResponse)
async def login(login_request: EmailLoginRequest, request: Request, db: Session = Depends(get_db)):
    """Authenticate user with email and password.

    After successful authentication, and only when
    ``settings.password_change_enforcement_enabled`` is set, the login is
    blocked with a 403 response (header ``X-Password-Change-Required: true``)
    if any of the following holds:

    * the user's ``password_change_required`` flag is set,
    * the password is at least ``password_max_age_days`` days old,
    * the password matches the configured default password and
      ``require_password_change_for_default_password`` is enabled.

    Args:
        login_request: Login credentials
        request: FastAPI request object
        db: Database session

    Returns:
        AuthenticationResponse: Access token and user info, or an ORJSONResponse
        with status 403 when a password change is required

    Raises:
        HTTPException: If authentication fails (401) or an unexpected error occurs (500)

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(login)
        True

        Request JSON:
            {
              "email": "user@example.com",
              "password": "secure_password"
            }
    """
    auth_service = EmailAuthService(db)
    ip_address = get_client_ip(request)
    user_agent = get_user_agent(request)

    try:
        # Authenticate user
        user = await auth_service.authenticate_user(email=login_request.email, password=login_request.password, ip_address=ip_address, user_agent=user_agent)

        if not user:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid email or password")

        # Password change enforcement respects master switch and individual toggles
        needs_password_change = False

        if settings.password_change_enforcement_enabled:
            # If flag is set on the user, always honor it (flag is cleared when password is changed)
            if getattr(user, "password_change_required", False):
                needs_password_change = True
                logger.debug("User %s has password_change_required flag set", login_request.email)

            # Enforce expiry-based password change if configured and not already required
            if not needs_password_change:
                try:
                    pwd_changed = getattr(user, "password_changed_at", None)
                    if isinstance(pwd_changed, datetime):
                        age_days = (utc_now() - pwd_changed).days
                        max_age = getattr(settings, "password_max_age_days", 90)
                        if age_days >= max_age:
                            needs_password_change = True
                            logger.debug("User %s password expired (%s days >= %s)", login_request.email, age_days, max_age)
                except Exception as exc:
                    # Age evaluation is advisory; a failure here must not block login.
                    logger.debug("Failed to evaluate password age for %s: %s", login_request.email, exc)

            # Detect default password on login if enabled
            if getattr(settings, "detect_default_password_on_login", True):
                # First-Party
                from mcpgateway.services.argon2_service import Argon2PasswordService

                password_service = Argon2PasswordService()
                is_using_default_password = await password_service.verify_password_async(settings.default_user_password.get_secret_value(), user.password_hash)  # nosec B105
                if is_using_default_password:
                    # Mark user for password change depending on configuration
                    if getattr(settings, "require_password_change_for_default_password", True):
                        user.password_change_required = True
                        needs_password_change = True
                        try:
                            db.commit()
                        except Exception as exc:  # log commit failures
                            logger.warning("Failed to commit password_change_required flag for %s: %s", login_request.email, exc)
                            # A failed commit leaves the session's transaction unusable until it is
                            # rolled back; do that here (best effort) so the session stays usable.
                            try:
                                db.rollback()
                            except Exception as rollback_exc:
                                logger.warning("Rollback after failed commit also failed for %s: %s", login_request.email, rollback_exc)
                    else:
                        logger.info("User %s is using default password but enforcement is disabled", login_request.email)

        if needs_password_change:
            logger.info("Login blocked for %s: password change required", login_request.email)
            return ORJSONResponse(
                status_code=status.HTTP_403_FORBIDDEN,
                content={"detail": "Password change required. Please change your password before continuing."},
                headers={"X-Password-Change-Required": "true"},
            )

        # Create access token
        access_token, expires_in = await create_access_token(user)

        # Return authentication response
        return AuthenticationResponse(
            access_token=access_token, token_type="bearer", expires_in=expires_in, user=EmailUserResponse.from_email_user(user)
        )  # nosec B106 - OAuth2 token type, not a password

    except HTTPException:
        raise  # Re-raise HTTP exceptions as-is (401, 403, etc.)
    except Exception as e:
        logger.error("Login error for %s: %s", login_request.email, e)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Authentication service error") from e

300 

301 

@email_auth_router.post("/register", response_model=AuthenticationResponse)
async def register(registration_request: PublicRegistrationRequest, request: Request, db: Session = Depends(get_db)):
    """Register a new user account.

    This endpoint is controlled by the PUBLIC_REGISTRATION_ENABLED setting.
    When disabled (default), returns 403 Forbidden and users can only be
    created by administrators via the admin API.

    Args:
        registration_request: Registration information (email, password, full_name only)
        request: FastAPI request object (currently unused; kept so the endpoint signature is unchanged)
        db: Database session

    Returns:
        AuthenticationResponse: Access token and user info

    Raises:
        HTTPException: If registration fails or is disabled

    Examples:
        Request JSON:
            {
              "email": "new@example.com",
              "password": "secure_password",
              "full_name": "New User"
            }
    """
    # Check if public registration is allowed
    if not settings.public_registration_enabled:
        logger.warning(f"Registration attempt rejected - public registration disabled: {registration_request.email}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Public registration is disabled. Please contact an administrator to create an account.",
        )

    auth_service = EmailAuthService(db)

    try:
        # Password is required by schema (str, not Optional) — Pydantic returns 422 if missing
        # Security-sensitive fields are hardcoded (not exposed on public schema)
        user = await auth_service.create_user(
            email=registration_request.email,
            password=registration_request.password,
            full_name=registration_request.full_name,
            is_admin=False,  # Regular users cannot self-register as admin
            is_active=True,  # Public registrations are always active
            password_change_required=False,  # No forced password change for self-registration
            auth_provider="local",
        )

        # Create access token
        access_token, expires_in = await create_access_token(user)

        logger.info(f"New user registered: {user.email}")

        return AuthenticationResponse(
            access_token=access_token, token_type="bearer", expires_in=expires_in, user=EmailUserResponse.from_email_user(user)
        )  # nosec B106 - OAuth2 token type, not a password

    except (EmailValidationError, PasswordValidationError) as e:
        # Both validation failures map to the same 400 response.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e
    except UserExistsError as e:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Registration error for {registration_request.email}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Registration service error") from e

372 

373 

@email_auth_router.post("/change-password", response_model=SuccessResponse)
async def change_password(password_request: ChangePasswordRequest, request: Request, current_user: EmailUser = Depends(get_current_user), db: Session = Depends(get_db)):
    """Change user's password.

    Args:
        password_request: Old and new passwords
        request: FastAPI request object
        current_user: Currently authenticated user
        db: Database session

    Returns:
        SuccessResponse: Success confirmation

    Raises:
        HTTPException: 401 if the service raises AuthenticationError, 400 if it
            raises PasswordValidationError, 500 if the service reports failure
            or an unexpected error occurs

    Examples:
        Request JSON (with Bearer token in Authorization header):
            {
              "old_password": "current_password",
              "new_password": "new_secure_password"
            }
    """
    auth_service = EmailAuthService(db)
    ip_address = get_client_ip(request)
    user_agent = get_user_agent(request)

    try:
        # Change password
        success = await auth_service.change_password(
            email=current_user.email, old_password=password_request.old_password, new_password=password_request.new_password, ip_address=ip_address, user_agent=user_agent
        )

        if not success:
            raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to change password")

        return SuccessResponse(success=True, message="Password changed successfully")

    except HTTPException:
        # Without this clause the 500 raised above would be caught by the generic
        # handler below, logged as an error and re-raised with a different detail.
        raise  # Re-raise HTTP exceptions as-is
    except AuthenticationError as e:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(e)) from e
    except PasswordValidationError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Password change error for {current_user.email}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Password change service error") from e

419 

420 

@email_auth_router.get("/me", response_model=EmailUserResponse)
async def get_current_user_profile(current_user: EmailUser = Depends(get_current_user)):
    """Return the profile of the authenticated caller.

    Args:
        current_user: Currently authenticated user

    Returns:
        EmailUserResponse: User profile information

    Raises:
        HTTPException: If user authentication fails

    Examples:
        >>> # GET /auth/email/me
        >>> # Headers: Authorization: Bearer <token>
    """
    # Convert the ORM user into the response schema.
    profile = EmailUserResponse.from_email_user(current_user)
    return profile

439 

440 

@email_auth_router.get("/events", response_model=list[AuthEventResponse])
async def get_auth_events(limit: int = 50, offset: int = 0, current_user: EmailUser = Depends(get_current_user), db: Session = Depends(get_db)):
    """Return the authenticated caller's own authentication events.

    Args:
        limit: Maximum number of events to return
        offset: Number of events to skip
        current_user: Currently authenticated user
        db: Database session

    Returns:
        List[AuthEventResponse]: Authentication events

    Raises:
        HTTPException: If user authentication fails, or 500 if retrieval fails

    Examples:
        >>> # GET /auth/email/events?limit=10&offset=0
        >>> # Headers: Authorization: Bearer <token>
    """
    service = EmailAuthService(db)

    try:
        records = await service.get_auth_events(email=current_user.email, limit=limit, offset=offset)

        # Validate each record into the response schema.
        responses = []
        for record in records:
            responses.append(AuthEventResponse.model_validate(record))
        return responses

    except Exception as e:
        logger.error(f"Error getting auth events for {current_user.email}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve authentication events")

471 

472 

473# Admin-only endpoints 

@email_auth_router.get("/admin/users", response_model=Union[CursorPaginatedUsersResponse, List[EmailUserResponse]])
@require_permission("admin.user_management")
async def list_users(
    cursor: Optional[str] = Query(None, description="Pagination cursor for fetching the next set of results"),
    limit: Optional[int] = Query(
        None,
        ge=0,
        le=settings.pagination_max_page_size,
        description="Maximum number of users to return. 0 means all (no limit). Default uses pagination_default_page_size.",
    ),
    include_pagination: bool = Query(False, description="Include cursor pagination metadata in response"),
    current_user_ctx: dict = Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> Union[CursorPaginatedUsersResponse, List[EmailUserResponse]]:
    """Return users (admin only), optionally with cursor pagination metadata.

    Args:
        cursor: Pagination cursor for fetching the next set of results
        limit: Maximum number of users to return. Use 0 for all users (no limit).
            If not specified, uses pagination_default_page_size (default: 50).
        include_pagination: Whether to include cursor pagination metadata in the response (default: false)
        current_user_ctx: Currently authenticated user context with permissions
        db: Database session

    Returns:
        CursorPaginatedUsersResponse with users and nextCursor if include_pagination=true, or
        List of users if include_pagination=false

    Raises:
        HTTPException: If user is not admin, or 500 if the listing fails

    Examples:
        >>> # Cursor-based with pagination: GET /auth/email/admin/users?cursor=eyJlbWFpbCI6Li4ufQ&include_pagination=true
        >>> # Simple list: GET /auth/email/admin/users
        >>> # Headers: Authorization: Bearer <admin_token>
    """
    service = EmailAuthService(db)

    try:
        page = await service.list_users(cursor=cursor, limit=limit)

        users = []
        for record in page.data:
            users.append(EmailUserResponse.from_email_user(record))

        # Plain list unless the caller asked for the paginated envelope.
        if not include_pagination:
            return users

        return CursorPaginatedUsersResponse(users=users, next_cursor=page.next_cursor)

    except Exception as e:
        logger.error(f"Error listing users: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user list")

524 

525 

@email_auth_router.get("/admin/events", response_model=list[AuthEventResponse])
@require_permission("admin.user_management")
async def list_all_auth_events(limit: int = 100, offset: int = 0, user_email: Optional[str] = None, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Return authentication events across users (admin only).

    Args:
        limit: Maximum number of events to return
        offset: Number of events to skip
        user_email: Filter events by specific user email
        current_user_ctx: Currently authenticated user context with permissions
        db: Database session

    Returns:
        List[AuthEventResponse]: Authentication events

    Raises:
        HTTPException: If user is not admin, or 500 if retrieval fails

    Examples:
        >>> # GET /auth/email/admin/events?limit=50&user_email=user@example.com
        >>> # Headers: Authorization: Bearer <admin_token>
    """
    service = EmailAuthService(db)

    try:
        # user_email is forwarded unchanged as the service's `email` argument.
        records = await service.get_auth_events(email=user_email, limit=limit, offset=offset)

        responses = []
        for record in records:
            responses.append(AuthEventResponse.model_validate(record))
        return responses

    except Exception as e:
        logger.error(f"Error getting auth events: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve authentication events")

558 

559 

@email_auth_router.post("/admin/users", response_model=EmailUserResponse, status_code=status.HTTP_201_CREATED)
@require_permission("admin.user_management")
async def create_user(user_request: AdminCreateUserRequest, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Create a new user account (admin only).

    When password change enforcement is enabled and the supplied password
    equals the configured default password, the new user is additionally
    flagged with ``password_change_required``.

    Args:
        user_request: User creation information
        current_user_ctx: Currently authenticated user context with permissions
        db: Database session

    Returns:
        EmailUserResponse: Created user information

    Raises:
        HTTPException: If user creation fails (400 validation, 409 conflict, 500 otherwise)

    Examples:
        Request JSON:
            {
              "email": "newuser@example.com",
              "password": "secure_password",
              "full_name": "New User",
              "is_admin": false
            }
    """
    auth_service = EmailAuthService(db)
    # Read once with .get() so a missing key cannot raise after the user was created.
    admin_email = current_user_ctx.get("email")

    try:
        # Password is required by schema (str, not Optional) — Pydantic returns 422 if missing
        # Create new user with all fields from request
        user = await auth_service.create_user(
            email=user_request.email,
            password=user_request.password,
            full_name=user_request.full_name,
            is_admin=user_request.is_admin,
            is_active=user_request.is_active,
            password_change_required=user_request.password_change_required,
            auth_provider="local",
            granted_by=admin_email,
        )

        # If the user was created with the default password, optionally force password change
        if (
            settings.password_change_enforcement_enabled
            and getattr(settings, "require_password_change_for_default_password", True)
            and user_request.password == settings.default_user_password.get_secret_value()
        ):  # nosec B105
            # Persisted by the single commit below.
            user.password_change_required = True

        logger.info(f"Admin {admin_email} created user: {user.email}")

        db.commit()
        # Build the response while the session is still open so the ORM instance
        # is never read after close().
        response = EmailUserResponse.from_email_user(user)
        db.close()
        return response

    except (EmailValidationError, PasswordValidationError) as e:
        # Both validation failures map to the same 400 response.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e
    except UserExistsError as e:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Admin user creation error: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="User creation failed") from e

625 

626 

@email_auth_router.get("/admin/users/{user_email}", response_model=EmailUserResponse)
@require_permission("admin.user_management")
async def get_user(user_email: str, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Look up a single user by email (admin only).

    Args:
        user_email: Email of user to retrieve
        current_user_ctx: Currently authenticated user context with permissions
        db: Database session

    Returns:
        EmailUserResponse: User information

    Raises:
        HTTPException: 404 if user not found, 500 if the lookup fails
    """
    service = EmailAuthService(db)

    try:
        found = await service.get_user_by_email(user_email)
        if found:
            return EmailUserResponse.from_email_user(found)

        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")

    except HTTPException:
        raise  # Re-raise HTTP exceptions as-is (401, 403, 404, etc.)
    except Exception as e:
        logger.error(f"Error retrieving user {user_email}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user")

657 

658 

@email_auth_router.put("/admin/users/{user_email}", response_model=EmailUserResponse)
@require_permission("admin.user_management")
async def update_user(user_email: str, user_request: AdminUserUpdateRequest, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Apply an admin-supplied update to a user (admin only).

    Args:
        user_email: Email of user to update
        user_request: Updated user information
        current_user_ctx: Currently authenticated user context with permissions
        db: Database session

    Returns:
        EmailUserResponse: Updated user information

    Raises:
        HTTPException: If user not found or update fails
    """
    service = EmailAuthService(db)

    try:
        updated = await service.update_user(
            email=user_email,
            full_name=user_request.full_name,
            is_admin=user_request.is_admin,
            is_active=user_request.is_active,
            password_change_required=user_request.password_change_required,
            password=user_request.password,
            admin_origin_source="api",
        )

        logger.info(f"Admin {current_user_ctx['email']} updated user: {updated.email}")

        return EmailUserResponse.from_email_user(updated)

    except ValueError as e:
        # A ValueError whose message mentions "not found" becomes 404; any other is a 400.
        message = str(e)
        missing = "not found" in message.lower()
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND if missing else status.HTTP_400_BAD_REQUEST,
            detail="User not found" if missing else message,
        )
    except PasswordValidationError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except Exception as e:
        logger.error(f"Error updating user {user_email}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to update user")

704 

705 

@email_auth_router.delete("/admin/users/{user_email}", response_model=SuccessResponse)
@require_permission("admin.user_management")
async def delete_user(user_email: str, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Delete a user account (admin only).

    Two guards run before the deletion: an admin may not delete their own
    account, and the last active admin may not be deleted.

    Args:
        user_email: Email of user to delete
        current_user_ctx: Currently authenticated user context with permissions
        db: Database session

    Returns:
        SuccessResponse: Success confirmation

    Raises:
        HTTPException: 400 if a guard rejects the request, 500 if deletion fails
    """
    service = EmailAuthService(db)

    try:
        acting_admin = current_user_ctx["email"]

        # Guard 1: prevent admin from deleting themselves
        if acting_admin == user_email:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot delete your own account")

        # Guard 2: prevent deleting the last active admin user
        is_last_admin = await service.is_last_active_admin(user_email)
        if is_last_admin:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot delete the last remaining admin user")

        # Hard delete using auth service
        await service.delete_user(user_email)

        logger.info(f"Admin {acting_admin} deleted user: {user_email}")

        # Commit and close the session explicitly before returning.
        db.commit()
        db.close()
        return SuccessResponse(success=True, message=f"User {user_email} has been deleted")

    except HTTPException:
        raise  # Re-raise HTTP exceptions as-is (401, 403, 404, etc.)
    except Exception as e:
        logger.error(f"Error deleting user {user_email}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to delete user")