Coverage for mcpgateway / services / email_auth_service.py: 99%

765 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/email_auth_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Email Authentication Service. 

8This module provides email-based user authentication services including 

9user creation, authentication, password management, and security features. 

10 

11Examples: 

12 Basic usage (requires async context): 

13 from mcpgateway.services.email_auth_service import EmailAuthService 

14 from mcpgateway.db import SessionLocal 

15 

16 with SessionLocal() as db: 

17 service = EmailAuthService(db) 

18 # Use in async context: 

19 # user = await service.create_user("test@example.com", "password123") 

20 # authenticated = await service.authenticate_user("test@example.com", "password123") 

21""" 

22 

23# Standard 

24import asyncio 

25import base64 

26from dataclasses import dataclass 

27from datetime import datetime, timedelta, timezone 

28import hashlib 

29import hmac 

30import re 

31import secrets 

32import time 

33from typing import Optional 

34import urllib.parse 

35import warnings 

36 

37# Third-Party 

38import orjson 

39from sqlalchemy import and_, delete, desc, func, or_, select 

40from sqlalchemy.exc import IntegrityError 

41from sqlalchemy.orm import Session 

42 

43# First-Party 

44from mcpgateway.common.validators import SecurityValidator 

45from mcpgateway.config import settings 

46from mcpgateway.db import ( 

47 EmailAuthEvent, 

48 EmailTeam, 

49 EmailTeamInvitation, 

50 EmailTeamJoinRequest, 

51 EmailTeamMember, 

52 EmailTeamMemberHistory, 

53 EmailUser, 

54 PasswordResetToken, 

55 PendingUserApproval, 

56 Role, 

57 SSOAuthSession, 

58 TokenRevocation, 

59 UserRole, 

60 utc_now, 

61) 

62from mcpgateway.schemas import PaginationLinks, PaginationMeta 

63from mcpgateway.services.argon2_service import Argon2PasswordService 

64from mcpgateway.services.email_notification_service import AuthEmailNotificationService 

65from mcpgateway.services.logging_service import LoggingService 

66from mcpgateway.services.metrics import password_reset_completions_counter, password_reset_requests_counter 

67from mcpgateway.utils.pagination import unified_paginate 

68 

69# Initialize logging 

70logging_service = LoggingService() 

71logger = logging_service.get_logger(__name__) 

72 

73_GET_ALL_USERS_LIMIT = 10000 

74_DUMMY_ARGON2_HASH = "$argon2id$v=19$m=65536,t=3,p=1$9x/nTs9D0R97+BI7BWP2Tg$V/40qCuaGh4i+94HpGpxJESEVs3IDpLzUqtNqRPuty4" 

75 

76 

77@dataclass(frozen=True) 

78class UsersListResult: 

79 """Result for list_users queries.""" 

80 

81 data: list[EmailUser] 

82 next_cursor: Optional[str] = None 

83 pagination: Optional[PaginationMeta] = None 

84 links: Optional[PaginationLinks] = None 

85 

86 

87@dataclass(frozen=True) 

88class PasswordResetRequestResult: 

89 """Result for forgot-password requests.""" 

90 

91 rate_limited: bool 

92 email_sent: bool 

93 

94 

95class EmailValidationError(Exception): 

96 """Raised when email format is invalid. 

97 

98 Examples: 

99 >>> try: 

100 ... raise EmailValidationError("Invalid email format") 

101 ... except EmailValidationError as e: 

102 ... str(e) 

103 'Invalid email format' 

104 """ 

105 

106 

107class PasswordValidationError(Exception): 

108 """Raised when password doesn't meet policy requirements. 

109 

110 Examples: 

111 >>> try: 

112 ... raise PasswordValidationError("Password too short") 

113 ... except PasswordValidationError as e: 

114 ... str(e) 

115 'Password too short' 

116 """ 

117 

118 

119class UserExistsError(Exception): 

120 """Raised when attempting to create a user that already exists. 

121 

122 Examples: 

123 >>> try: 

124 ... raise UserExistsError("User already exists") 

125 ... except UserExistsError as e: 

126 ... str(e) 

127 'User already exists' 

128 """ 

129 

130 

131class AuthenticationError(Exception): 

132 """Raised when authentication fails. 

133 

134 Examples: 

135 >>> try: 

136 ... raise AuthenticationError("Invalid credentials") 

137 ... except AuthenticationError as e: 

138 ... str(e) 

139 'Invalid credentials' 

140 """ 

141 

142 

143class EmailAuthService: 

144 """Service for email-based user authentication. 

145 

146 This service handles user registration, authentication, password management, 

147 and security features like account lockout and failed attempt tracking. 

148 

149 Attributes: 

150 db (Session): Database session 

151 password_service (Argon2PasswordService): Password hashing service 

152 

153 Examples: 

154 >>> from mcpgateway.db import SessionLocal 

155 >>> with SessionLocal() as db: 

156 ... service = EmailAuthService(db) 

157 ... # Service is ready to use 

158 """ 

159 

160 get_all_users_deprecated_warned = False 

161 

162 def __init__(self, db: Session): 

163 """Initialize the email authentication service. 

164 

165 Args: 

166 db: SQLAlchemy database session 

167 """ 

168 self.db = db 

169 self.password_service = Argon2PasswordService() 

170 self.email_notification_service = AuthEmailNotificationService() 

171 self._role_service = None 

172 logger.debug("EmailAuthService initialized") 

173 

174 @property 

175 def role_service(self): 

176 """Lazy-initialized RoleService to avoid circular imports. 

177 

178 Returns: 

179 RoleService: Instance of RoleService 

180 """ 

181 if self._role_service is None: 

182 # First-Party 

183 from mcpgateway.services.role_service import RoleService # pylint: disable=import-outside-toplevel 

184 

185 self._role_service = RoleService(self.db) 

186 return self._role_service 

187 

188 def validate_email(self, email: str) -> bool: 

189 """Validate email address format. 

190 

191 Args: 

192 email: Email address to validate 

193 

194 Returns: 

195 bool: True if email is valid 

196 

197 Raises: 

198 EmailValidationError: If email format is invalid 

199 

200 Examples: 

201 >>> service = EmailAuthService(None) 

202 >>> service.validate_email("user@example.com") 

203 True 

204 >>> service.validate_email("test.user+tag@domain.co.uk") 

205 True 

206 >>> service.validate_email("user123@test-domain.com") 

207 True 

208 >>> try: 

209 ... service.validate_email("invalid-email") 

210 ... except EmailValidationError as e: 

211 ... "Invalid email format" in str(e) 

212 True 

213 >>> try: 

214 ... service.validate_email("") 

215 ... except EmailValidationError as e: 

216 ... "Email is required" in str(e) 

217 True 

218 >>> try: 

219 ... service.validate_email("user@") 

220 ... except EmailValidationError as e: 

221 ... "Invalid email format" in str(e) 

222 True 

223 >>> try: 

224 ... service.validate_email("@domain.com") 

225 ... except EmailValidationError as e: 

226 ... "Invalid email format" in str(e) 

227 True 

228 >>> try: 

229 ... service.validate_email("user@domain") 

230 ... except EmailValidationError as e: 

231 ... "Invalid email format" in str(e) 

232 True 

233 >>> try: 

234 ... service.validate_email("a" * 250 + "@domain.com") 

235 ... except EmailValidationError as e: 

236 ... "Email address too long" in str(e) 

237 True 

238 >>> try: 

239 ... service.validate_email(None) 

240 ... except EmailValidationError as e: 

241 ... "Email is required" in str(e) 

242 True 

243 """ 

244 if not email or not isinstance(email, str): 

245 raise EmailValidationError("Email is required and must be a string") 

246 

247 # Basic email regex pattern 

248 email_pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" 

249 

250 if not re.match(email_pattern, email): 

251 raise EmailValidationError("Invalid email format") 

252 

253 if len(email) > 255: 

254 raise EmailValidationError("Email address too long (max 255 characters)") 

255 

256 return True 

257 

258 def validate_password(self, password: str) -> bool: 

259 """Validate password against policy requirements. 

260 

261 Args: 

262 password: Password to validate 

263 

264 Returns: 

265 bool: True if password meets policy 

266 

267 Raises: 

268 PasswordValidationError: If password doesn't meet requirements 

269 

270 Examples: 

271 >>> service = EmailAuthService(None) 

272 >>> service.validate_password("Password123!") # Meets all requirements 

273 True 

274 >>> service.validate_password("ValidPassword123!") 

275 True 

276 >>> service.validate_password("Shortpass!") # 8+ chars with requirements 

277 True 

278 >>> service.validate_password("VeryLongPasswordThatMeetsMinimumRequirements!") 

279 True 

280 >>> try: 

281 ... service.validate_password("") 

282 ... except PasswordValidationError as e: 

283 ... "Password is required" in str(e) 

284 True 

285 >>> try: 

286 ... service.validate_password(None) 

287 ... except PasswordValidationError as e: 

288 ... "Password is required" in str(e) 

289 True 

290 >>> try: 

291 ... service.validate_password("short") # Only 5 chars, should fail with default min_length=8 

292 ... except PasswordValidationError as e: 

293 ... "characters long" in str(e) 

294 True 

295 """ 

296 if not password: 

297 raise PasswordValidationError("Password is required") 

298 

299 # Respect global toggle for password policy 

300 if not getattr(settings, "password_policy_enabled", True): 

301 return True 

302 

303 # Get password policy settings 

304 min_length = getattr(settings, "password_min_length", 8) 

305 require_uppercase = getattr(settings, "password_require_uppercase", False) 

306 require_lowercase = getattr(settings, "password_require_lowercase", False) 

307 require_numbers = getattr(settings, "password_require_numbers", False) 

308 require_special = getattr(settings, "password_require_special", False) 

309 

310 if len(password) < min_length: 

311 raise PasswordValidationError(f"Password must be at least {min_length} characters long") 

312 

313 if require_uppercase and not re.search(r"[A-Z]", password): 

314 raise PasswordValidationError("Password must contain at least one uppercase letter") 

315 

316 if require_lowercase and not re.search(r"[a-z]", password): 

317 raise PasswordValidationError("Password must contain at least one lowercase letter") 

318 

319 if require_numbers and not re.search(r"[0-9]", password): 

320 raise PasswordValidationError("Password must contain at least one number") 

321 

322 if require_special and not re.search(r'[!@#$%^&*(),.?":{}|<>]', password): 

323 raise PasswordValidationError("Password must contain at least one special character") 

324 

325 return True 

326 

327 @staticmethod 

328 def _hash_reset_token(token: str) -> str: 

329 """Hash a plaintext password-reset token using SHA-256. 

330 

331 Args: 

332 token: Plaintext reset token. 

333 

334 Returns: 

335 str: Hex-encoded SHA-256 digest. 

336 """ 

337 return hashlib.sha256(token.encode("utf-8")).hexdigest() 

338 

339 @staticmethod 

340 def _minimum_reset_response_seconds() -> float: 

341 """Get minimum forgot-password response duration. 

342 

343 Returns: 

344 float: Minimum response duration in seconds. 

345 """ 

346 min_ms = max(0, int(getattr(settings, "password_reset_min_response_ms", 250))) 

347 return min_ms / 1000.0 

348 

349 @staticmethod 

350 def _minimum_login_failure_seconds() -> float: 

351 """Get minimum failed-login response duration. 

352 

353 Returns: 

354 float: Minimum failure response duration in seconds. 

355 """ 

356 min_ms = max(0, int(getattr(settings, "failed_login_min_response_ms", 250))) 

357 return min_ms / 1000.0 

358 

359 async def _apply_failed_login_floor(self, start_time: float) -> None: 

360 """Apply minimum failed-login response duration. 

361 

362 Args: 

363 start_time: Monotonic timestamp when login processing started. 

364 """ 

365 remaining = self._minimum_login_failure_seconds() - (time.monotonic() - start_time) 

366 if remaining > 0: 

367 await asyncio.sleep(remaining) 

368 

369 async def _verify_dummy_password_for_timing(self, password: str) -> None: 

370 """Run dummy Argon2 verification to reduce observable timing differences. 

371 

372 Args: 

373 password: User-supplied password candidate. 

374 """ 

375 try: 

376 await self.password_service.verify_password_async(password, _DUMMY_ARGON2_HASH) 

377 except Exception as exc: # nosec B110 

378 logger.debug("Dummy password verification failed: %s", exc) 

379 

380 @staticmethod 

381 def _build_forgot_password_url() -> str: 

382 """Build the absolute forgot-password page URL. 

383 

384 Returns: 

385 str: Absolute forgot-password URL. 

386 """ 

387 app_domain = str(getattr(settings, "app_domain", "http://localhost:4444")).rstrip("/") 

388 root_path = str(getattr(settings, "app_root_path", "")).rstrip("/") 

389 return f"{app_domain}{root_path}/admin/forgot-password" 

390 

391 @staticmethod 

392 def _build_reset_password_url(token: str) -> str: 

393 """Build the absolute reset-password URL for a token. 

394 

395 Args: 

396 token: Plaintext reset token. 

397 

398 Returns: 

399 str: Absolute reset-password URL. 

400 """ 

401 safe_token = urllib.parse.quote(token, safe="") 

402 app_domain = str(getattr(settings, "app_domain", "http://localhost:4444")).rstrip("/") 

403 root_path = str(getattr(settings, "app_root_path", "")).rstrip("/") 

404 return f"{app_domain}{root_path}/admin/reset-password/{safe_token}" 

405 

406 async def _invalidate_user_auth_cache(self, email: str) -> None: 

407 """Invalidate cached authentication data for a user. 

408 

409 Args: 

410 email: User email for cache invalidation. 

411 """ 

412 try: 

413 # First-Party 

414 from mcpgateway.cache.auth_cache import auth_cache # pylint: disable=import-outside-toplevel 

415 

416 await asyncio.wait_for(asyncio.shield(auth_cache.invalidate_user(email)), timeout=5.0) 

417 except asyncio.TimeoutError: 

418 logger.warning("Auth cache invalidation timed out for %s - continuing", email) 

419 except Exception as cache_error: # nosec B110 

420 logger.debug("Failed to invalidate auth cache for %s: %s", email, cache_error) 

421 

422 async def _invalidate_deleted_user_auth_caches(self, email: str) -> None: 

423 """Invalidate all auth-cache entries affected by permanent user deletion. 

424 

425 Args: 

426 email: User email for cache invalidation. 

427 """ 

428 try: 

429 # First-Party 

430 from mcpgateway.cache.auth_cache import auth_cache # pylint: disable=import-outside-toplevel 

431 

432 results = await asyncio.wait_for( 

433 asyncio.gather( 

434 auth_cache.invalidate_user(email), 

435 auth_cache.invalidate_user_teams(email), 

436 auth_cache.invalidate_team_membership(email), 

437 return_exceptions=True, 

438 ), 

439 timeout=5.0, 

440 ) 

441 for result in results: 

442 if isinstance(result, Exception): 

443 logger.debug("Failed to invalidate delete-user auth cache for %s: %s", email, result) 

444 except asyncio.TimeoutError: 

445 logger.warning("Delete-user auth cache invalidation timed out for %s - continuing", email) 

446 except Exception as cache_error: # nosec B110 

447 logger.debug("Failed to invalidate delete-user auth cache for %s: %s", email, cache_error) 

448 

449 def _log_auth_event( 

450 self, 

451 event_type: str, 

452 success: bool, 

453 user_email: Optional[str], 

454 ip_address: Optional[str] = None, 

455 user_agent: Optional[str] = None, 

456 failure_reason: Optional[str] = None, 

457 details: Optional[dict] = None, 

458 ) -> None: 

459 """Persist a custom authentication/security event. 

460 

461 Args: 

462 event_type: Event type identifier. 

463 success: Whether the event succeeded. 

464 user_email: Related user email, if available. 

465 ip_address: Source IP address. 

466 user_agent: Source user agent string. 

467 failure_reason: Failure detail when `success` is False. 

468 details: Additional structured event payload. 

469 """ 

470 try: 

471 event = EmailAuthEvent( 

472 user_email=user_email, 

473 event_type=event_type, 

474 success=success, 

475 ip_address=ip_address, 

476 user_agent=user_agent, 

477 failure_reason=failure_reason, 

478 details=orjson.dumps(details).decode() if details else None, 

479 ) 

480 self.db.add(event) 

481 self.db.commit() 

482 except Exception as exc: 

483 self.db.rollback() 

484 logger.warning("Failed to persist auth event %s for %s: %s", event_type, user_email, exc) 

485 

486 def _recent_password_reset_request_count(self, email: str, now: datetime) -> int: 

487 """Count recent password-reset requests for rate limiting. 

488 

489 Args: 

490 email: Email to count requests for. 

491 now: Current UTC timestamp. 

492 

493 Returns: 

494 int: Number of reset requests in the current rate-limit window. 

495 """ 

496 window_minutes = int(getattr(settings, "password_reset_rate_window_minutes", 15)) 

497 window_start = now - timedelta(minutes=window_minutes) 

498 stmt = ( 

499 select(func.count(EmailAuthEvent.id)) # pylint: disable=not-callable 

500 .where(EmailAuthEvent.event_type == "PASSWORD_RESET_REQUESTED") 

501 .where(EmailAuthEvent.user_email == email) 

502 .where(EmailAuthEvent.timestamp >= window_start) 

503 ) 

504 count = self.db.execute(stmt).scalar() 

505 return int(count or 0) 

506 

507 async def get_user_by_email(self, email: str) -> Optional[EmailUser]: 

508 """Get user by email address. 

509 

510 Args: 

511 email: Email address to look up 

512 

513 Returns: 

514 EmailUser or None if not found 

515 

516 Examples: 

517 # Assuming database has user "test@example.com" 

518 # user = await service.get_user_by_email("test@example.com") 

519 # user.email if user else None # Returns: 'test@example.com' 

520 """ 

521 try: 

522 stmt = select(EmailUser).where(EmailUser.email == email.lower()) 

523 result = self.db.execute(stmt) 

524 user = result.scalar_one_or_none() 

525 return user 

526 except Exception as e: 

527 logger.error(f"Error getting user by email {SecurityValidator.sanitize_log_message(email)}: {e}") 

528 return None 

529 

530 async def create_user( 

531 self, 

532 email: str, 

533 password: str, 

534 full_name: Optional[str] = None, 

535 is_admin: bool = False, 

536 is_active: bool = True, 

537 password_change_required: bool = False, 

538 auth_provider: str = "local", 

539 skip_password_validation: bool = False, 

540 granted_by: Optional[str] = None, 

541 skip_onboarding: bool = False, 

542 ) -> EmailUser: 

543 """Create a new user with email authentication. 

544 

545 Args: 

546 email: User's email address (primary key) 

547 password: Plain text password (will be hashed) 

548 full_name: Optional full name for display 

549 is_admin: Whether user has admin privileges 

550 is_active: Whether user account is active (default: True) 

551 password_change_required: Whether user must change password on next login (default: False) 

552 auth_provider: Authentication provider ('local', 'github', etc.) 

553 skip_password_validation: Skip password policy validation (for bootstrap) 

554 granted_by: Email of user creating this user (for role assignment audit trail) 

555 skip_onboarding: Skip personal team creation, role assignment, and 

556 success-path registration event logging (for service accounts / 

557 synthetic users). Unexpected-failure audit events (the 

558 ``except Exception`` path) are always recorded regardless of 

559 this flag. Duplicate-user rejections (``UserExistsError``, 

560 ``IntegrityError``) are not audited by design. 

561 

562 Returns: 

563 EmailUser: The created user object 

564 

565 Raises: 

566 EmailValidationError: If email format is invalid 

567 PasswordValidationError: If password doesn't meet policy 

568 UserExistsError: If user already exists 

569 

570 Examples: 

571 # user = await service.create_user( 

572 # email="new@example.com", 

573 # password="secure123", 

574 # full_name="New User", 

575 # is_active=True, 

576 # password_change_required=False 

577 # ) 

578 # user.email # Returns: 'new@example.com' 

579 # user.full_name # Returns: 'New User' 

580 # user.is_active # Returns: True 

581 """ 

582 # Normalize email to lowercase 

583 email = email.lower().strip() 

584 

585 # Validate inputs 

586 self.validate_email(email) 

587 if not skip_password_validation: 

588 self.validate_password(password) 

589 

590 # Hash before the first DB read so PgBouncer transaction pooling does not 

591 # hold an idle transaction open across the async hashing call. 

592 # Callers that skip password validation with an empty password (e.g. 

593 # ensure_user_exists for service accounts) get a non-loginable sentinel; 

594 # all other callers go through hash_password_async which raises 

595 # ValueError on empty input. 

596 if not password and skip_password_validation: 

597 password_hash = "!disabled" # nosec B105 — not a valid Argon2 hash, verify_password always rejects 

598 else: 

599 password_hash = await self.password_service.hash_password_async(password) 

600 

601 # Check if user already exists 

602 existing_user = await self.get_user_by_email(email) 

603 if existing_user: 

604 raise UserExistsError(f"User with email {email} already exists") 

605 

606 # Create new user (record password change timestamp) 

607 user = EmailUser( 

608 email=email, 

609 password_hash=password_hash, 

610 full_name=full_name, 

611 is_admin=is_admin, 

612 is_active=is_active, 

613 password_change_required=password_change_required, 

614 auth_provider=auth_provider, 

615 password_changed_at=utc_now(), 

616 admin_origin="api" if is_admin else None, 

617 ) 

618 

619 # Admin-created users are implicitly email-verified (the admin vouched for them) 

620 if granted_by: 

621 user.email_verified_at = utc_now() 

622 

623 try: 

624 self.db.add(user) 

625 self.db.commit() 

626 self.db.refresh(user) 

627 

628 logger.info(f"Created new user: {SecurityValidator.sanitize_log_message(email)}") 

629 

630 if not skip_onboarding: 

631 # Create personal team first if enabled (needed for team-scoped role assignment) 

632 personal_team_id = None 

633 if getattr(settings, "auto_create_personal_teams", True): 

634 try: 

635 # Import here to avoid circular imports 

636 # First-Party 

637 from mcpgateway.services.personal_team_service import PersonalTeamService # pylint: disable=import-outside-toplevel 

638 

639 personal_team_service = PersonalTeamService(self.db) 

640 personal_team = await personal_team_service.create_personal_team(user) 

641 personal_team_id = personal_team.id # Get team_id directly from created team 

642 logger.info(f"Created personal team '{personal_team.name}' (ID: {personal_team_id}) for user {SecurityValidator.sanitize_log_message(email)}") 

643 except Exception as e: 

644 logger.warning(f"Failed to create personal team for {SecurityValidator.sanitize_log_message(email)}: {e}") 

645 # Don't fail user creation if personal team creation fails 

646 

647 # Auto-assign dual roles using RoleService (after personal team creation) 

648 try: 

649 granter = granted_by or email # Use granted_by if provided, otherwise self-granted 

650 

651 # Determine global role based on admin status 

652 global_role_name = settings.default_admin_role if is_admin else settings.default_user_role 

653 global_role = await self.role_service.get_role_by_name(global_role_name, "global") 

654 

655 if global_role: 

656 try: 

657 await self.role_service.assign_role_to_user(user_email=email, role_id=global_role.id, scope="global", scope_id=None, granted_by=granter) 

658 logger.info(f"Assigned {global_role_name} role (global scope) to user {SecurityValidator.sanitize_log_message(email)}") 

659 except ValueError as e: 

660 logger.warning(f"Could not assign {global_role_name} role to {SecurityValidator.sanitize_log_message(email)}: {e}") 

661 else: 

662 logger.warning(f"{global_role_name} role not found. User {SecurityValidator.sanitize_log_message(email)} created without global role.") 

663 

664 # Assign team owner role with team scope (if personal team exists) 

665 if personal_team_id: 

666 team_owner_role_name = settings.default_team_owner_role 

667 team_owner_role = await self.role_service.get_role_by_name(team_owner_role_name, "team") 

668 

669 if team_owner_role: 

670 try: 

671 await self.role_service.assign_role_to_user(user_email=email, role_id=team_owner_role.id, scope="team", scope_id=personal_team_id, granted_by=granter) 

672 logger.info(f"Assigned {team_owner_role_name} role (team scope: {personal_team_id}) to user {SecurityValidator.sanitize_log_message(email)}") 

673 except ValueError as e: 

674 logger.warning(f"Could not assign {team_owner_role_name} role to {SecurityValidator.sanitize_log_message(email)}: {e}") 

675 else: 

676 logger.warning(f"{team_owner_role_name} role not found. User {SecurityValidator.sanitize_log_message(email)} created without team owner role.") 

677 

678 except Exception as role_error: 

679 logger.error(f"Failed to assign roles to user {SecurityValidator.sanitize_log_message(email)}: {role_error}") 

680 # Don't fail user creation if role assignment fails 

681 # User can be assigned roles manually later 

682 

683 # Log registration event 

684 registration_event = EmailAuthEvent.create_registration_event(user_email=email, success=True) 

685 self.db.add(registration_event) 

686 self.db.commit() 

687 

688 return user 

689 

690 except IntegrityError as e: 

691 self.db.rollback() 

692 logger.error(f"Database error creating user {SecurityValidator.sanitize_log_message(email)}: {e}") 

693 raise UserExistsError(f"User with email {email} already exists") from e 

694 except Exception as e: 

695 self.db.rollback() 

696 logger.error(f"Unexpected error creating user {SecurityValidator.sanitize_log_message(email)}: {e}") 

697 

698 # Log failed registration 

699 registration_event = EmailAuthEvent.create_registration_event(user_email=email, success=False, failure_reason=str(e)) 

700 self.db.add(registration_event) 

701 self.db.commit() 

702 

703 raise 

704 

705 async def ensure_user_exists( 

706 self, 

707 email: str, 

708 full_name: Optional[str] = None, 

709 is_admin: bool = False, 

710 auth_provider: str = "local", 

711 granted_by: Optional[str] = None, 

712 skip_onboarding: bool = False, 

713 ) -> tuple[EmailUser, bool]: 

714 """Idempotent user creation — returns existing user or creates a new one. 

715 

716 Args: 

717 email: User's email address 

718 full_name: Optional display name 

719 is_admin: Whether user has admin privileges 

720 auth_provider: Authentication provider 

721 granted_by: Email of creating user (for audit trail) 

722 skip_onboarding: Skip personal team, role assignment, and success-path 

723 audit events (unexpected-failure auditing is always recorded) 

724 

725 Returns: 

726 Tuple of (user, created) where created is True if the user was newly created. 

727 

728 Raises: 

729 EmailValidationError: If the email format is invalid. 

730 UserExistsError: If a race-condition insert fails and re-fetch still returns None. 

731 """ 

732 email = email.lower().strip() 

733 existing = await self.get_user_by_email(email) 

734 if existing: 

735 return existing, False 

736 

737 try: 

738 user = await self.create_user( 

739 email=email, 

740 password="", # nosec B106 — intentionally empty for service accounts 

741 full_name=full_name, 

742 is_admin=is_admin, 

743 auth_provider=auth_provider, 

744 skip_password_validation=True, 

745 granted_by=granted_by, 

746 skip_onboarding=skip_onboarding, 

747 password_change_required=True, 

748 ) 

749 return user, True 

750 except UserExistsError: 

751 # Race condition: another request created the user between our check and insert 

752 logger.info(f"Race-condition user creation for {SecurityValidator.sanitize_log_message(email)}, re-fetching existing record") 

753 user = await self.get_user_by_email(email) 

754 if user: 

755 return user, False 

756 raise # Should not happen, but don't swallow the error 

757 

758 async def authenticate_user(self, email: str, password: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> Optional[EmailUser]: 

759 """Authenticate a user with email and password. 

760 

761 Args: 

762 email: User's email address 

763 password: Plain text password 

764 ip_address: Client IP address for logging 

765 user_agent: Client user agent for logging 

766 

767 Returns: 

768 EmailUser if authentication successful, None otherwise 

769 

770 Examples: 

771 # user = await service.authenticate_user("user@example.com", "correct_password") 

772 # user.email if user else None # Returns: 'user@example.com' 

773 # await service.authenticate_user("user@example.com", "wrong_password") # Returns: None 

774 """ 

775 email = email.lower().strip() 

776 start_time = time.monotonic() 

777 

778 # Get user from database 

779 user = await self.get_user_by_email(email) 

780 

781 # Track authentication attempt 

782 auth_success = False 

783 failure_reason = None 

784 

785 try: 

786 if not user: 

787 failure_reason = "User not found" 

788 logger.info(f"Authentication failed for {SecurityValidator.sanitize_log_message(email)}: user not found") 

789 await self._verify_dummy_password_for_timing(password) 

790 await self._apply_failed_login_floor(start_time) 

791 return None 

792 

793 if not user.is_active: 

794 failure_reason = "Account is disabled" 

795 logger.info(f"Authentication failed for {SecurityValidator.sanitize_log_message(email)}: account disabled") 

796 await self._verify_dummy_password_for_timing(password) 

797 await self._apply_failed_login_floor(start_time) 

798 return None 

799 

800 is_protected_admin = user.is_admin and settings.protect_all_admins 

801 

802 # Enforce lockout for all accounts. Protected admins are allowed 

803 # to continue attempting login (feature-flagged via protect_all_admins) 

804 # but their failed attempts are still tracked for audit purposes. 

805 if user.is_account_locked() and not is_protected_admin: 

806 failure_reason = "Account is locked" 

807 logger.info(f"Authentication failed for {SecurityValidator.sanitize_log_message(email)}: account locked") 

808 await self._verify_dummy_password_for_timing(password) 

809 await self._apply_failed_login_floor(start_time) 

810 return None 

811 

812 # Verify password 

813 if not await self.password_service.verify_password_async(password, user.password_hash): 

814 failure_reason = "Invalid password" 

815 

816 # Always increment failed attempts — including for protected admins 

817 max_attempts = getattr(settings, "max_failed_login_attempts", 5) 

818 lockout_duration = getattr(settings, "account_lockout_duration_minutes", 30) 

819 

820 is_locked = user.increment_failed_attempts(max_attempts, lockout_duration) 

821 

822 if is_locked: 

823 logger.warning(f"Account locked for {SecurityValidator.sanitize_log_message(email)} after {max_attempts} failed attempts") 

824 failure_reason = "Account locked due to too many failed attempts" 

825 lockout_notifications_enabled = getattr(settings, "account_lockout_notification_enabled", True) 

826 if isinstance(lockout_notifications_enabled, bool) and lockout_notifications_enabled: 

827 locked_until_iso = user.locked_until.isoformat() if user.locked_until else "unknown" 

828 try: 

829 await self.email_notification_service.send_account_lockout_email( 

830 to_email=user.email, 

831 full_name=user.full_name, 

832 locked_until_iso=locked_until_iso, 

833 reset_url=self._build_forgot_password_url(), 

834 ) 

835 except Exception as email_exc: 

836 logger.warning("Failed to send lockout notification for %s: %s", email, email_exc) 

837 self._log_auth_event( 

838 event_type="ACCOUNT_LOCKED", 

839 success=True, 

840 user_email=email, 

841 ip_address=ip_address, 

842 user_agent=user_agent, 

843 details={"locked_until": user.locked_until.isoformat() if user.locked_until else None}, 

844 ) 

845 

846 self.db.commit() 

847 logger.info(f"Authentication failed for {SecurityValidator.sanitize_log_message(email)}: invalid password") 

848 await self._apply_failed_login_floor(start_time) 

849 return None 

850 

851 # Authentication successful 

852 user.reset_failed_attempts() 

853 self.db.commit() 

854 

855 auth_success = True 

856 logger.info(f"Authentication successful for {SecurityValidator.sanitize_log_message(email)}") 

857 

858 return user 

859 

860 finally: 

861 # Log authentication event 

862 auth_event = EmailAuthEvent.create_login_attempt(user_email=email, success=auth_success, ip_address=ip_address, user_agent=user_agent, failure_reason=failure_reason) 

863 self.db.add(auth_event) 

864 self.db.commit() 

865 

866 async def request_password_reset(self, email: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> PasswordResetRequestResult: 

867 """Create a password reset token and send reset email when user exists. 

868 

869 The function intentionally returns generic outcomes to avoid account 

870 enumeration while still allowing rate-limit enforcement. 

871 

872 Args: 

873 email: User email requesting password reset. 

874 ip_address: Source IP address. 

875 user_agent: Source user agent string. 

876 

877 Returns: 

878 PasswordResetRequestResult: Reset request processing outcome. 

879 """ 

880 start_time = time.monotonic() 

881 normalized_email = (email or "").lower().strip() 

882 now = utc_now() 

883 _ = self._hash_reset_token(secrets.token_urlsafe(32)) 

884 

885 rate_limit = int(getattr(settings, "password_reset_rate_limit", 5)) 

886 is_rate_limited = bool(normalized_email and self._recent_password_reset_request_count(normalized_email, now) >= rate_limit) 

887 if is_rate_limited: 

888 password_reset_requests_counter.labels(outcome="rate_limited").inc() 

889 self._log_auth_event( 

890 event_type="PASSWORD_RESET_RATE_LIMITED", 

891 success=False, 

892 user_email=normalized_email or None, 

893 ip_address=ip_address, 

894 user_agent=user_agent, 

895 ) 

896 remaining = self._minimum_reset_response_seconds() - (time.monotonic() - start_time) 

897 if remaining > 0: 

898 await asyncio.sleep(remaining) 

899 return PasswordResetRequestResult(rate_limited=True, email_sent=False) 

900 

901 user = await self.get_user_by_email(normalized_email) if normalized_email else None 

902 self._log_auth_event( 

903 event_type="PASSWORD_RESET_REQUESTED", 

904 success=True, 

905 user_email=normalized_email or None, 

906 ip_address=ip_address, 

907 user_agent=user_agent, 

908 ) 

909 

910 email_sent = False 

911 if user and user.is_active: 

912 token_plaintext = secrets.token_urlsafe(48) 

913 token_hash = self._hash_reset_token(token_plaintext) 

914 expires_minutes = int(getattr(settings, "password_reset_token_expiry_minutes", 60)) 

915 expires_at = now + timedelta(minutes=expires_minutes) 

916 

917 existing_stmt = select(PasswordResetToken).where(PasswordResetToken.user_email == user.email).where(PasswordResetToken.used_at.is_(None)).where(PasswordResetToken.expires_at > now) 

918 for existing in self.db.execute(existing_stmt).scalars().all(): 

919 existing.used_at = now 

920 

921 token_record = PasswordResetToken( 

922 user_email=user.email, 

923 token_hash=token_hash, 

924 expires_at=expires_at, 

925 ip_address=ip_address, 

926 user_agent=user_agent, 

927 ) 

928 self.db.add(token_record) 

929 self.db.commit() 

930 

931 try: 

932 email_sent = await self.email_notification_service.send_password_reset_email( 

933 to_email=user.email, 

934 full_name=user.full_name, 

935 reset_url=self._build_reset_password_url(token_plaintext), 

936 expires_minutes=expires_minutes, 

937 ) 

938 except Exception as exc: 

939 logger.warning("Failed to send password reset email to %s: %s", user.email, exc) 

940 

941 password_reset_requests_counter.labels(outcome="accepted").inc() 

942 self._log_auth_event( 

943 event_type="PASSWORD_RESET_EMAIL_SENT", 

944 success=True, 

945 user_email=user.email, 

946 ip_address=ip_address, 

947 user_agent=user_agent, 

948 details={"token_hash": token_hash, "expires_at": expires_at.isoformat(), "email_sent": email_sent}, 

949 ) 

950 else: 

951 password_reset_requests_counter.labels(outcome="accepted").inc() 

952 

953 remaining = self._minimum_reset_response_seconds() - (time.monotonic() - start_time) 

954 if remaining > 0: 

955 await asyncio.sleep(remaining) 

956 return PasswordResetRequestResult(rate_limited=False, email_sent=email_sent) 

957 

958 async def validate_password_reset_token(self, token: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> PasswordResetToken: 

959 """Validate a one-time password reset token. 

960 

961 Args: 

962 token: Plaintext password reset token. 

963 ip_address: Source IP address. 

964 user_agent: Source user agent string. 

965 

966 Returns: 

967 PasswordResetToken: Matching valid reset token record. 

968 

969 Raises: 

970 AuthenticationError: If token is missing, invalid, used, or expired. 

971 """ 

972 if not token: 

973 password_reset_completions_counter.labels(outcome="invalid_token").inc() 

974 self._log_auth_event("PASSWORD_RESET_ATTEMPTED", False, None, ip_address, user_agent, failure_reason="Missing token") 

975 raise AuthenticationError("This reset link is invalid") 

976 

977 token_hash = self._hash_reset_token(token) 

978 stmt = select(PasswordResetToken).where(PasswordResetToken.token_hash == token_hash) 

979 reset_token = self.db.execute(stmt).scalar_one_or_none() 

980 

981 if not reset_token: 

982 password_reset_completions_counter.labels(outcome="invalid_token").inc() 

983 self._log_auth_event("PASSWORD_RESET_ATTEMPTED", False, None, ip_address, user_agent, failure_reason="Invalid token hash") 

984 raise AuthenticationError("This reset link is invalid") 

985 

986 if not hmac.compare_digest(reset_token.token_hash, token_hash): 

987 password_reset_completions_counter.labels(outcome="invalid_token").inc() 

988 self._log_auth_event("PASSWORD_RESET_ATTEMPTED", False, reset_token.user_email, ip_address, user_agent, failure_reason="Token hash mismatch") 

989 raise AuthenticationError("This reset link is invalid") 

990 

991 if reset_token.is_used(): 

992 password_reset_completions_counter.labels(outcome="used_token").inc() 

993 self._log_auth_event("PASSWORD_RESET_ATTEMPTED", False, reset_token.user_email, ip_address, user_agent, failure_reason="Token already used") 

994 raise AuthenticationError("This reset link has already been used") 

995 

996 if reset_token.is_expired(): 

997 password_reset_completions_counter.labels(outcome="expired_token").inc() 

998 self._log_auth_event("PASSWORD_RESET_TOKEN_EXPIRED", False, reset_token.user_email, ip_address, user_agent, details={"token_hash": token_hash}) 

999 self._log_auth_event("PASSWORD_RESET_ATTEMPTED", False, reset_token.user_email, ip_address, user_agent, failure_reason="Token expired") 

1000 raise AuthenticationError("This reset link has expired") 

1001 

1002 self._log_auth_event("PASSWORD_RESET_ATTEMPTED", True, reset_token.user_email, ip_address, user_agent) 

1003 return reset_token 

1004 

1005 async def reset_password_with_token(self, token: str, new_password: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> bool: 

1006 """Complete password reset using a validated one-time token. 

1007 

1008 Args: 

1009 token: Plaintext password reset token. 

1010 new_password: New password value. 

1011 ip_address: Source IP address. 

1012 user_agent: Source user agent string. 

1013 

1014 Returns: 

1015 bool: True when password reset completed successfully. 

1016 

1017 Raises: 

1018 AuthenticationError: If token or associated user is invalid. 

1019 PasswordValidationError: If new password violates policy or reuse checks. 

1020 """ 

1021 reset_token = await self.validate_password_reset_token(token, ip_address=ip_address, user_agent=user_agent) 

1022 user = await self.get_user_by_email(reset_token.user_email) 

1023 if not user or not user.is_active: 

1024 password_reset_completions_counter.labels(outcome="invalid_user").inc() 

1025 raise AuthenticationError("This reset link is invalid") 

1026 

1027 self.validate_password(new_password) 

1028 if getattr(settings, "password_prevent_reuse", True) and await self.password_service.verify_password_async(new_password, user.password_hash): 

1029 password_reset_completions_counter.labels(outcome="reused_password").inc() 

1030 raise PasswordValidationError("New password must be different from current password") 

1031 

1032 now = utc_now() 

1033 user.password_hash = await self.password_service.hash_password_async(new_password) 

1034 user.password_change_required = False 

1035 user.password_changed_at = now 

1036 user.failed_login_attempts = 0 

1037 user.locked_until = None 

1038 

1039 reset_token.used_at = now 

1040 outstanding_stmt = select(PasswordResetToken).where(PasswordResetToken.user_email == user.email).where(PasswordResetToken.id != reset_token.id).where(PasswordResetToken.used_at.is_(None)) 

1041 for outstanding in self.db.execute(outstanding_stmt).scalars().all(): 

1042 outstanding.used_at = now 

1043 

1044 self.db.commit() 

1045 

1046 if getattr(settings, "password_reset_invalidate_sessions", True): 

1047 await self._invalidate_user_auth_cache(user.email) 

1048 

1049 email_sent = False 

1050 try: 

1051 email_sent = await self.email_notification_service.send_password_reset_confirmation_email(to_email=user.email, full_name=user.full_name) 

1052 except Exception as exc: 

1053 logger.warning("Failed to send password reset confirmation for %s: %s", user.email, exc) 

1054 

1055 password_reset_completions_counter.labels(outcome="success").inc() 

1056 self._log_auth_event( 

1057 event_type="PASSWORD_RESET_COMPLETED", 

1058 success=True, 

1059 user_email=user.email, 

1060 ip_address=ip_address, 

1061 user_agent=user_agent, 

1062 details={"email_sent": email_sent}, 

1063 ) 

1064 return True 

1065 

1066 async def unlock_user_account(self, email: str, unlocked_by: Optional[str] = None, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> EmailUser: 

1067 """Clear lockout state for a user account. 

1068 

1069 Args: 

1070 email: User email to unlock. 

1071 unlocked_by: Admin/user identifier who performed unlock. 

1072 ip_address: Source IP address. 

1073 user_agent: Source user agent string. 

1074 

1075 Returns: 

1076 EmailUser: Updated user record after unlock. 

1077 

1078 Raises: 

1079 ValueError: If the target user cannot be found. 

1080 """ 

1081 normalized_email = email.lower().strip() 

1082 user = await self.get_user_by_email(normalized_email) 

1083 if not user: 

1084 raise ValueError(f"User {normalized_email} not found") 

1085 

1086 user.failed_login_attempts = 0 

1087 user.locked_until = None 

1088 user.updated_at = utc_now() 

1089 self.db.commit() 

1090 self._log_auth_event( 

1091 event_type="ACCOUNT_UNLOCKED", 

1092 success=True, 

1093 user_email=user.email, 

1094 ip_address=ip_address, 

1095 user_agent=user_agent, 

1096 details={"unlocked_by": unlocked_by}, 

1097 ) 

1098 return user 

1099 

1100 async def change_password(self, email: str, old_password: Optional[str], new_password: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> bool: 

1101 """Change a user's password. 

1102 

1103 Args: 

1104 email: User's email address 

1105 old_password: Current password for verification 

1106 new_password: New password to set 

1107 ip_address: Client IP address for logging 

1108 user_agent: Client user agent for logging 

1109 

1110 Returns: 

1111 bool: True if password changed successfully 

1112 

1113 Raises: 

1114 AuthenticationError: If old password is incorrect 

1115 PasswordValidationError: If new password doesn't meet policy 

1116 Exception: If database operation fails 

1117 

1118 Examples: 

1119 # success = await service.change_password( 

1120 # "user@example.com", 

1121 # "old_password", 

1122 # "new_secure_password" 

1123 # ) 

1124 # success # Returns: True 

1125 """ 

1126 # Validate old password is provided 

1127 if old_password is None: 

1128 raise AuthenticationError("Current password is required") 

1129 

1130 # First authenticate with old password 

1131 user = await self.authenticate_user(email, old_password, ip_address, user_agent) 

1132 if not user: 

1133 raise AuthenticationError("Current password is incorrect") 

1134 

1135 # Validate new password 

1136 self.validate_password(new_password) 

1137 

1138 # Check if new password is same as old (optional policy) 

1139 if getattr(settings, "password_prevent_reuse", True) and await self.password_service.verify_password_async(new_password, user.password_hash): 

1140 raise PasswordValidationError("New password must be different from current password") 

1141 

1142 success = False 

1143 try: 

1144 # Hash new password and update 

1145 new_password_hash = await self.password_service.hash_password_async(new_password) 

1146 user.password_hash = new_password_hash 

1147 # Clear the flag that requires the user to change password 

1148 user.password_change_required = False 

1149 # Record the password change timestamp 

1150 try: 

1151 user.password_changed_at = utc_now() 

1152 except Exception as exc: 

1153 logger.debug("Failed to set password_changed_at for %s: %s", email, exc) 

1154 

1155 self.db.commit() 

1156 success = True 

1157 

1158 # Invalidate auth cache for user 

1159 try: 

1160 await self._invalidate_user_auth_cache(email) 

1161 except Exception as cache_error: # nosec B110 - best effort cache invalidation 

1162 logger.debug("Failed to invalidate auth cache on password change: %s", cache_error) 

1163 

1164 logger.info(f"Password changed successfully for {SecurityValidator.sanitize_log_message(email)}") 

1165 

1166 except Exception as e: 

1167 self.db.rollback() 

1168 logger.error(f"Error changing password for {SecurityValidator.sanitize_log_message(email)}: {e}") 

1169 raise 

1170 finally: 

1171 # Log password change event 

1172 password_event = EmailAuthEvent.create_password_change_event(user_email=email, success=success, ip_address=ip_address, user_agent=user_agent) 

1173 self.db.add(password_event) 

1174 self.db.commit() 

1175 

1176 return success 

1177 

1178 async def create_platform_admin(self, email: str, password: str, full_name: Optional[str] = None) -> EmailUser: 

1179 """Create or update the platform administrator user. 

1180 

1181 This method is used during system bootstrap to create the initial 

1182 admin user from environment variables. 

1183 

1184 Args: 

1185 email: Admin email address 

1186 password: Admin password 

1187 full_name: Admin full name 

1188 

1189 Returns: 

1190 EmailUser: The admin user 

1191 

1192 Examples: 

1193 # admin = await service.create_platform_admin( 

1194 # "admin@example.com", 

1195 # "admin_password", 

1196 # "Platform Administrator" 

1197 # ) 

1198 # admin.is_admin # Returns: True 

1199 """ 

1200 # Check if admin user already exists 

1201 existing_admin = await self.get_user_by_email(email) 

1202 

1203 if existing_admin: 

1204 # Update existing admin if password or name changed 

1205 if full_name and existing_admin.full_name != full_name: 

1206 existing_admin.full_name = full_name 

1207 

1208 # Check if password needs update (verify current password first) 

1209 if not await self.password_service.verify_password_async(password, existing_admin.password_hash): 

1210 existing_admin.password_hash = await self.password_service.hash_password_async(password) 

1211 try: 

1212 existing_admin.password_changed_at = utc_now() 

1213 except Exception as exc: 

1214 logger.debug("Failed to set password_changed_at for existing admin %s: %s", email, exc) 

1215 

1216 # Ensure admin status 

1217 existing_admin.is_admin = True 

1218 existing_admin.is_active = True 

1219 

1220 # Synchronize platform_admin RBAC role with is_admin flag 

1221 # This ensures atomicity: when setting is_admin=True, also assign the platform_admin role 

1222 try: 

1223 platform_admin_role = await self.role_service.get_role_by_name("platform_admin", "global") 

1224 if platform_admin_role: 

1225 # Check if role assignment already exists 

1226 existing_assignment = await self.role_service.get_user_role_assignment(user_email=email, role_id=platform_admin_role.id, scope="global", scope_id=None) 

1227 

1228 if not existing_assignment or not existing_assignment.is_active: 

1229 await self.role_service.assign_role_to_user(user_email=email, role_id=platform_admin_role.id, scope="global", scope_id=None, granted_by=email) 

1230 logger.info(f"Assigned platform_admin role to {SecurityValidator.sanitize_log_message(email)} during create_platform_admin()") 

1231 else: 

1232 logger.debug(f"User {SecurityValidator.sanitize_log_message(email)} already has active platform_admin role") 

1233 else: 

1234 logger.warning(f"platform_admin role not found. User {SecurityValidator.sanitize_log_message(email)} updated with is_admin=True but without platform_admin role assignment.") 

1235 except Exception as role_error: 

1236 logger.error( 

1237 f"Failed to assign platform_admin role to {SecurityValidator.sanitize_log_message(email)}: {SecurityValidator.sanitize_log_message(str(role_error))}. User updated with is_admin=True but role assignment failed." 

1238 ) 

1239 # Rollback to clear any failed transaction state (e.g. PendingRollbackError 

1240 # from a failed commit inside assign_role_to_user), then re-apply admin flags 

1241 # so the subsequent commit can persist the admin user update. 

1242 try: 

1243 self.db.rollback() 

1244 existing_admin.is_admin = True 

1245 existing_admin.is_active = True 

1246 except Exception as rollback_error: # nosec B110 

1247 logger.debug("Session rollback after role sync failure also failed: %s", rollback_error) 

1248 # bootstrap_default_roles() will sync the role assignment later 

1249 

1250 self.db.commit() 

1251 logger.info(f"Updated platform admin user: {SecurityValidator.sanitize_log_message(email)}") 

1252 return existing_admin 

1253 

1254 # Create new admin user - skip password validation during bootstrap 

1255 admin_user = await self.create_user(email=email, password=password, full_name=full_name, is_admin=True, auth_provider="local", skip_password_validation=True) 

1256 

1257 logger.info(f"Created platform admin user: {SecurityValidator.sanitize_log_message(email)}") 

1258 return admin_user 

1259 

1260 async def update_last_login(self, email: str) -> None: 

1261 """Update the last login timestamp for a user. 

1262 

1263 Args: 

1264 email: User's email address 

1265 """ 

1266 user = await self.get_user_by_email(email) 

1267 if user: 

1268 user.reset_failed_attempts() # This also updates last_login 

1269 self.db.commit() 

1270 

1271 @staticmethod 

1272 def _escape_like(value: str) -> str: 

1273 """Escape LIKE wildcards for prefix search. 

1274 

1275 Args: 

1276 value: Raw value to escape for LIKE matching. 

1277 

1278 Returns: 

1279 Escaped string safe for LIKE patterns. 

1280 """ 

1281 return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_") 

1282 

1283 async def list_users( 

1284 self, 

1285 limit: Optional[int] = None, 

1286 cursor: Optional[str] = None, 

1287 page: Optional[int] = None, 

1288 per_page: Optional[int] = None, 

1289 search: Optional[str] = None, 

1290 ) -> UsersListResult: 

1291 """List all users with cursor or page-based pagination support and optional search. 

1292 

1293 This method supports both cursor-based (for API endpoints with large datasets) 

1294 and page-based (for admin UI with page numbers) pagination, with optional 

1295 search filtering by email or full name. 

1296 

1297 Note: This method returns ORM objects and cannot be cached since callers 

1298 depend on ORM attributes and methods (e.g., EmailUserResponse.from_email_user). 

1299 

1300 Args: 

1301 limit: Maximum number of users to return (for cursor-based pagination) 

1302 cursor: Opaque cursor token for cursor-based pagination 

1303 page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor. 

1304 per_page: Items per page for page-based pagination 

1305 search: Optional search term to filter by email or full name (case-insensitive) 

1306 

1307 Returns: 

1308 UsersListResult with data and optional pagination metadata. 

1309 

1310 Examples: 

1311 # Cursor-based pagination (for APIs) 

1312 # result = await service.list_users(cursor=None, limit=50) 

1313 # len(result.data) <= 50 # Returns: True 

1314 

1315 # Page-based pagination (for admin UI) 

1316 # result = await service.list_users(page=1, per_page=10) 

1317 # result.data # Returns: list of users 

1318 # result.pagination # Returns: pagination metadata 

1319 

1320 # Search users 

1321 # users = await service.list_users(search="john", page=1, per_page=10) 

1322 # All users with "john" in email or name 

1323 """ 

1324 try: 

1325 # Build base query with ordering by created_at, email for consistent pagination 

1326 # Note: EmailUser uses email as primary key, not id 

1327 query = select(EmailUser).order_by(desc(EmailUser.created_at), desc(EmailUser.email)) 

1328 

1329 # Apply search filter if provided (prefix search for better index usage) 

1330 if search and search.strip(): 

1331 search_term = f"{self._escape_like(search.strip())}%" 

1332 # NOTE: For large Postgres datasets, consider citext or functional indexes for case-insensitive search. 

1333 query = query.where( 

1334 or_( 

1335 EmailUser.email.ilike(search_term, escape="\\"), 

1336 EmailUser.full_name.ilike(search_term, escape="\\"), 

1337 ) 

1338 ) 

1339 

1340 # Page-based pagination: use unified_paginate 

1341 if page is not None: 

1342 pag_result = await unified_paginate( 

1343 db=self.db, 

1344 query=query, 

1345 page=page, 

1346 per_page=per_page, 

1347 cursor=None, 

1348 limit=None, 

1349 base_url="/admin/users", 

1350 query_params={}, 

1351 ) 

1352 return UsersListResult(data=pag_result["data"], pagination=pag_result["pagination"], links=pag_result["links"]) 

1353 

1354 # Cursor-based pagination: custom implementation for EmailUser 

1355 # EmailUser uses email as PK (not id), so we need custom cursor using (created_at, email) 

1356 page_size = limit if limit and limit > 0 else settings.pagination_default_page_size 

1357 if limit == 0: 

1358 page_size = None # No limit 

1359 

1360 # Decode cursor and apply keyset filter if provided 

1361 if cursor: 

1362 try: 

1363 cursor_json = base64.urlsafe_b64decode(cursor.encode()).decode() 

1364 cursor_data = orjson.loads(cursor_json) 

1365 last_email = cursor_data.get("email") 

1366 created_str = cursor_data.get("created_at") 

1367 if last_email and created_str: 

1368 last_created = datetime.fromisoformat(created_str) 

1369 # Apply keyset filter (assumes DESC order on created_at, email) 

1370 query = query.where( 

1371 or_( 

1372 EmailUser.created_at < last_created, 

1373 and_(EmailUser.created_at == last_created, EmailUser.email < last_email), 

1374 ) 

1375 ) 

1376 except (ValueError, TypeError) as e: 

1377 logger.warning(f"Invalid cursor for user pagination, ignoring: {e}") 

1378 

1379 # Fetch page_size + 1 to determine if there are more results 

1380 if page_size is not None: 

1381 query = query.limit(page_size + 1) 

1382 result = self.db.execute(query) 

1383 users = list(result.scalars().all()) 

1384 

1385 if page_size is None: 

1386 return UsersListResult(data=users, next_cursor=None) 

1387 

1388 # Check if there are more results 

1389 has_more = len(users) > page_size 

1390 if has_more: 

1391 users = users[:page_size] 

1392 

1393 # Generate next cursor using (created_at, email) for EmailUser 

1394 next_cursor = None 

1395 if has_more and users: 

1396 last_user = users[-1] 

1397 cursor_data = { 

1398 "created_at": last_user.created_at.isoformat() if last_user.created_at else None, 

1399 "email": last_user.email, 

1400 } 

1401 next_cursor = base64.urlsafe_b64encode(orjson.dumps(cursor_data)).decode() 

1402 

1403 return UsersListResult(data=users, next_cursor=next_cursor) 

1404 

1405 except Exception as e: 

1406 logger.error(f"Error listing users: {e}") 

1407 # Return appropriate empty response based on pagination mode 

1408 if page is not None: 

1409 fallback_per_page = per_page or 50 

1410 return UsersListResult( 

1411 data=[], 

1412 pagination=PaginationMeta(page=page, per_page=fallback_per_page, total_items=0, total_pages=0, has_next=False, has_prev=False), 

1413 links=PaginationLinks( # pylint: disable=kwarg-superseded-by-positional-arg 

1414 self=f"/admin/users?page=1&per_page={fallback_per_page}", 

1415 first=f"/admin/users?page=1&per_page={fallback_per_page}", 

1416 last=f"/admin/users?page=1&per_page={fallback_per_page}", 

1417 ), 

1418 ) 

1419 

1420 if cursor is not None: 

1421 return UsersListResult(data=[], next_cursor=None) 

1422 

1423 return UsersListResult(data=[]) 

1424 

1425 async def list_users_not_in_team( 

1426 self, 

1427 team_id: str, 

1428 cursor: Optional[str] = None, 

1429 limit: Optional[int] = None, 

1430 page: Optional[int] = None, 

1431 per_page: Optional[int] = None, 

1432 search: Optional[str] = None, 

1433 ) -> UsersListResult: 

1434 """List users who are NOT members of the specified team with cursor or page-based pagination. 

1435 

1436 Uses a NOT IN subquery to efficiently exclude team members. 

1437 

1438 Args: 

1439 team_id: ID of the team to exclude members from 

1440 cursor: Opaque cursor token for cursor-based pagination 

1441 limit: Maximum number of users to return (for cursor-based, default: 50) 

1442 page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor. 

1443 per_page: Items per page for page-based pagination (default: 30) 

1444 search: Optional search term to filter by email or full name 

1445 

1446 Returns: 

1447 UsersListResult with data and either cursor or pagination metadata 

1448 

1449 Examples: 

1450 # Page-based (admin UI) 

1451 # result = await service.list_users_not_in_team("team-123", page=1, per_page=30) 

1452 # result.pagination # Returns: pagination metadata 

1453 

1454 # Cursor-based (API) 

1455 # result = await service.list_users_not_in_team("team-123", cursor=None, limit=50) 

1456 # result.next_cursor # Returns: next cursor token 

1457 """ 

1458 try: 

1459 # Build base query 

1460 query = select(EmailUser) 

1461 

1462 # Apply search filter if provided 

1463 if search and search.strip(): 

1464 search_term = f"{self._escape_like(search.strip())}%" 

1465 query = query.where( 

1466 or_( 

1467 EmailUser.email.ilike(search_term, escape="\\"), 

1468 EmailUser.full_name.ilike(search_term, escape="\\"), 

1469 ) 

1470 ) 

1471 

1472 # Exclude team members using NOT IN subquery 

1473 member_emails_subquery = select(EmailTeamMember.user_email).where(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)) 

1474 query = query.where(EmailUser.is_active.is_(True), ~EmailUser.email.in_(member_emails_subquery)) 

1475 

1476 # PAGE-BASED PAGINATION (Admin UI) - use unified_paginate 

1477 if page is not None: 

1478 query = query.order_by(EmailUser.full_name, EmailUser.email) 

1479 pag_result = await unified_paginate( 

1480 db=self.db, 

1481 query=query, 

1482 page=page, 

1483 per_page=per_page or 30, 

1484 cursor=None, 

1485 limit=None, 

1486 base_url=f"/admin/teams/{team_id}/non-members", 

1487 query_params={}, 

1488 ) 

1489 return UsersListResult(data=pag_result["data"], pagination=pag_result["pagination"], links=pag_result["links"]) 

1490 

1491 # CURSOR-BASED PAGINATION - custom implementation using (created_at, email) 

1492 # unified_paginate uses (created_at, id) but EmailUser uses email as PK 

1493 query = query.order_by(desc(EmailUser.created_at), desc(EmailUser.email)) 

1494 

1495 # Decode cursor and apply keyset filter 

1496 if cursor: 

1497 try: 

1498 cursor_json = base64.urlsafe_b64decode(cursor.encode()).decode() 

1499 cursor_data = orjson.loads(cursor_json) 

1500 last_email = cursor_data.get("email") 

1501 created_str = cursor_data.get("created_at") 

1502 if last_email and created_str: 

1503 last_created = datetime.fromisoformat(created_str) 

1504 # Keyset filter: (created_at < last) OR (created_at = last AND email < last_email) 

1505 query = query.where( 

1506 or_( 

1507 EmailUser.created_at < last_created, 

1508 and_(EmailUser.created_at == last_created, EmailUser.email < last_email), 

1509 ) 

1510 ) 

1511 except (ValueError, TypeError) as e: 

1512 logger.warning(f"Invalid cursor for non-members list, ignoring: {e}") 

1513 

1514 # Fetch limit + 1 to check for more results 

1515 page_size = limit or 50 

1516 query = query.limit(page_size + 1) 

1517 users = list(self.db.execute(query).scalars().all()) 

1518 

1519 # Check if there are more results 

1520 has_more = len(users) > page_size 

1521 if has_more: 

1522 users = users[:page_size] 

1523 

1524 # Generate next cursor using (created_at, email) 

1525 next_cursor = None 

1526 if has_more and users: 

1527 last_user = users[-1] 

1528 cursor_data = { 

1529 "created_at": last_user.created_at.isoformat() if last_user.created_at else None, 

1530 "email": last_user.email, 

1531 } 

1532 next_cursor = base64.urlsafe_b64encode(orjson.dumps(cursor_data)).decode() 

1533 

1534 self.db.commit() 

1535 return UsersListResult(data=users, next_cursor=next_cursor) 

1536 

1537 except Exception as e: 

1538 logger.error(f"Error listing non-members for team {SecurityValidator.sanitize_log_message(team_id)}: {e}") 

1539 

1540 # Return appropriate empty response based on mode 

1541 if page is not None: 

1542 return UsersListResult( 

1543 data=[], 

1544 pagination=PaginationMeta(page=page, per_page=per_page or 30, total_items=0, total_pages=0, has_next=False, has_prev=False), 

1545 links=PaginationLinks( # pylint: disable=kwarg-superseded-by-positional-arg 

1546 self=f"/admin/teams/{team_id}/non-members?page=1&per_page={per_page or 30}", 

1547 first=f"/admin/teams/{team_id}/non-members?page=1&per_page={per_page or 30}", 

1548 last=f"/admin/teams/{team_id}/non-members?page=1&per_page={per_page or 30}", 

1549 ), 

1550 ) 

1551 

1552 return UsersListResult(data=[], next_cursor=None) 

1553 

1554 async def get_all_users(self) -> list[EmailUser]: 

1555 """Get all users without pagination. 

1556 

1557 .. deprecated:: 1.0 

1558 Use :meth:`list_users` with proper pagination instead. 

1559 This method has a hardcoded limit of 10,000 users and will not return 

1560 more than that. For production systems with many users, use paginated 

1561 access with search/filtering. 

1562 

1563 Returns: 

1564 List of up to 10,000 EmailUser objects 

1565 

1566 Raises: 

1567 ValueError: If total users exceed 10,000 

1568 

1569 Examples: 

1570 # users = await service.get_all_users() 

1571 # isinstance(users, list) # Returns: True 

1572 

1573 Warning: 

1574 This method is deprecated and will be removed in a future version. 

1575 Use list_users() with pagination instead: 

1576 

1577 # For small datasets 

1578 users = await service.list_users(page=1, per_page=1000).data 

1579 

1580 # For searching 

1581 users = await service.list_users(search="john", page=1, per_page=10).data 

1582 """ 

1583 if not self.__class__.get_all_users_deprecated_warned: 

1584 warnings.warn( 

1585 "get_all_users() is deprecated and limited to 10,000 users. " + "Use list_users() with pagination instead.", 

1586 DeprecationWarning, 

1587 stacklevel=2, 

1588 ) 

1589 self.__class__.get_all_users_deprecated_warned = True 

1590 

1591 total_users = await self.count_users() 

1592 if total_users > _GET_ALL_USERS_LIMIT: 

1593 raise ValueError("get_all_users() supports up to 10,000 users. Use list_users() pagination instead.") 

1594 

1595 result = await self.list_users(limit=_GET_ALL_USERS_LIMIT) 

1596 return result.data # Large limit to get all users 

1597 

1598 async def count_users(self) -> int: 

1599 """Count total number of users. 

1600 

1601 Returns: 

1602 int: Total user count 

1603 """ 

1604 try: 

1605 stmt = select(func.count(EmailUser.email)) # pylint: disable=not-callable 

1606 count = self.db.execute(stmt).scalar() or 0 

1607 return count 

1608 except Exception as e: 

1609 logger.error(f"Error counting users: {e}") 

1610 return 0 

1611 

1612 async def get_auth_events(self, email: Optional[str] = None, limit: int = 100, offset: int = 0) -> list[EmailAuthEvent]: 

1613 """Get authentication events for auditing. 

1614 

1615 Args: 

1616 email: Filter by specific user email (optional) 

1617 limit: Maximum number of events to return 

1618 offset: Number of events to skip 

1619 

1620 Returns: 

1621 List of EmailAuthEvent objects 

1622 """ 

1623 try: 

1624 stmt = select(EmailAuthEvent) 

1625 if email: 

1626 stmt = stmt.where(EmailAuthEvent.user_email == email) 

1627 stmt = stmt.order_by(EmailAuthEvent.timestamp.desc()).offset(offset).limit(limit) 

1628 

1629 result = self.db.execute(stmt) 

1630 events = list(result.scalars().all()) 

1631 return events 

1632 except Exception as e: 

1633 logger.error(f"Error getting auth events: {e}") 

1634 return [] 

1635 

1636 async def update_user( 

1637 self, 

1638 email: str, 

1639 full_name: Optional[str] = None, 

1640 is_admin: Optional[bool] = None, 

1641 is_active: Optional[bool] = None, 

1642 email_verified: Optional[bool] = None, 

1643 password_change_required: Optional[bool] = None, 

1644 password: Optional[str] = None, 

1645 admin_origin_source: Optional[str] = None, 

1646 ) -> EmailUser: 

1647 """Update user information. 

1648 

1649 Args: 

1650 email: User's email address (primary key) 

1651 full_name: New full name (optional) 

1652 is_admin: New admin status (optional) 

1653 is_active: New active status (optional) 

1654 email_verified: Set email verification status (optional) 

1655 password_change_required: Whether user must change password on next login (optional) 

1656 password: New password (optional, will be hashed) 

1657 admin_origin_source: Source of admin change for tracking (e.g. "api", "ui"). Callers should pass explicitly. 

1658 

1659 Returns: 

1660 EmailUser: Updated user object 

1661 

1662 Raises: 

1663 ValueError: If user doesn't exist, if protect_all_admins blocks the change, or if it would remove the last active admin 

1664 PasswordValidationError: If password doesn't meet policy 

1665 """ 

1666 try: 

1667 # Normalize email to match create_user() / get_user_by_email() behavior 

1668 email = email.lower().strip() 

1669 

1670 # Get existing user 

1671 stmt = select(EmailUser).where(EmailUser.email == email) 

1672 result = self.db.execute(stmt) 

1673 user = result.scalar_one_or_none() 

1674 

1675 if not user: 

1676 raise ValueError(f"User {email} not found") 

1677 

1678 # Admin protection guard 

1679 if user.is_admin and user.is_active: 

1680 would_lose_admin = (is_admin is not None and not is_admin) or (is_active is not None and not is_active) 

1681 if would_lose_admin: 

1682 if settings.protect_all_admins: 

1683 raise ValueError("Admin protection is enabled — cannot demote or deactivate any admin user") 

1684 if await self.is_last_active_admin(email): 

1685 raise ValueError("Cannot demote or deactivate the last remaining active admin user") 

1686 

1687 # Update fields if provided 

1688 if full_name is not None: 

1689 user.full_name = full_name 

1690 

1691 if email_verified is not None: 

1692 user.email_verified_at = utc_now() if email_verified else None 

1693 

1694 if is_admin is not None: 

1695 # Track admin_origin when status actually changes 

1696 if is_admin != user.is_admin: 

1697 user.is_admin = is_admin 

1698 user.admin_origin = admin_origin_source if is_admin else None 

1699 

1700 # Sync global role assignment with is_admin flag: 

1701 # Promotion: revoke default_user_role, assign default_admin_role 

1702 # Demotion: revoke default_admin_role, assign default_user_role 

1703 try: 

1704 admin_role_name = settings.default_admin_role 

1705 user_role_name = settings.default_user_role 

1706 admin_role = await self.role_service.get_role_by_name(admin_role_name, "global") 

1707 user_role = await self.role_service.get_role_by_name(user_role_name, "global") 

1708 

1709 if is_admin: 

1710 # Promotion: assign admin role, revoke user role 

1711 if admin_role: 

1712 existing = await self.role_service.get_user_role_assignment(user_email=email, role_id=admin_role.id, scope="global", scope_id=None) 

1713 if not existing or not existing.is_active: 

1714 await self.role_service.assign_role_to_user(user_email=email, role_id=admin_role.id, scope="global", scope_id=None, granted_by=email) 

1715 logger.info(f"Assigned {admin_role_name} role to {SecurityValidator.sanitize_log_message(email)}") 

1716 else: 

1717 logger.warning(f"{admin_role_name} role not found, cannot assign to {SecurityValidator.sanitize_log_message(email)}") 

1718 

1719 if user_role: 

1720 revoked = await self.role_service.revoke_role_from_user(user_email=email, role_id=user_role.id, scope="global", scope_id=None) 

1721 if revoked: 

1722 logger.info(f"Revoked {SecurityValidator.sanitize_log_message(user_role_name)} role from {SecurityValidator.sanitize_log_message(email)}") 

1723 else: 

1724 # Demotion: revoke admin role, assign user role 

1725 if admin_role: 

1726 revoked = await self.role_service.revoke_role_from_user(user_email=email, role_id=admin_role.id, scope="global", scope_id=None) 

1727 if revoked: 

1728 logger.info(f"Revoked {admin_role_name} role from {SecurityValidator.sanitize_log_message(email)}") 

1729 

1730 if user_role: 

1731 existing = await self.role_service.get_user_role_assignment(user_email=email, role_id=user_role.id, scope="global", scope_id=None) 

1732 if not existing or not existing.is_active: 

1733 await self.role_service.assign_role_to_user(user_email=email, role_id=user_role.id, scope="global", scope_id=None, granted_by=email) 

1734 logger.info(f"Assigned {SecurityValidator.sanitize_log_message(user_role_name)} role to {SecurityValidator.sanitize_log_message(email)}") 

1735 else: 

1736 logger.warning(f"{SecurityValidator.sanitize_log_message(user_role_name)} role not found, cannot assign to {SecurityValidator.sanitize_log_message(email)}") 

1737 

1738 except Exception as e: 

1739 logger.warning(f"Failed to sync global roles for {SecurityValidator.sanitize_log_message(email)}: {e}") 

1740 # Don't fail user update if role sync fails 

1741 

1742 if is_active is not None: 

1743 user.is_active = is_active 

1744 

1745 if password is not None: 

1746 self.validate_password(password) 

1747 user.password_hash = await self.password_service.hash_password_async(password) 

1748 # Only clear password_change_required if it wasn't explicitly set 

1749 if password_change_required is None: 

1750 user.password_change_required = False 

1751 user.password_changed_at = utc_now() 

1752 

1753 # Set password_change_required after password processing to allow explicit override 

1754 if password_change_required is not None: 

1755 user.password_change_required = password_change_required 

1756 

1757 user.updated_at = datetime.now(timezone.utc) 

1758 

1759 self.db.commit() 

1760 

1761 return user 

1762 

1763 except Exception as e: 

1764 self.db.rollback() 

1765 logger.error(f"Error updating user {SecurityValidator.sanitize_log_message(email)}: {e}") 

1766 raise 

1767 

1768 async def activate_user(self, email: str) -> EmailUser: 

1769 """Activate a user account. 

1770 

1771 Args: 

1772 email: User's email address 

1773 

1774 Returns: 

1775 EmailUser: Updated user object 

1776 

1777 Raises: 

1778 ValueError: If user doesn't exist 

1779 """ 

1780 try: 

1781 stmt = select(EmailUser).where(EmailUser.email == email) 

1782 result = self.db.execute(stmt) 

1783 user = result.scalar_one_or_none() 

1784 

1785 if not user: 

1786 raise ValueError(f"User {email} not found") 

1787 

1788 user.is_active = True 

1789 user.updated_at = datetime.now(timezone.utc) 

1790 

1791 self.db.commit() 

1792 

1793 logger.info(f"User {SecurityValidator.sanitize_log_message(email)} activated") 

1794 return user 

1795 

1796 except Exception as e: 

1797 self.db.rollback() 

1798 logger.error(f"Error activating user {SecurityValidator.sanitize_log_message(email)}: {e}") 

1799 raise 

1800 

1801 async def deactivate_user(self, email: str) -> EmailUser: 

1802 """Deactivate a user account. 

1803 

1804 Args: 

1805 email: User's email address 

1806 

1807 Returns: 

1808 EmailUser: Updated user object 

1809 

1810 Raises: 

1811 ValueError: If user doesn't exist 

1812 """ 

1813 try: 

1814 stmt = select(EmailUser).where(EmailUser.email == email) 

1815 result = self.db.execute(stmt) 

1816 user = result.scalar_one_or_none() 

1817 

1818 if not user: 

1819 raise ValueError(f"User {email} not found") 

1820 

1821 user.is_active = False 

1822 user.updated_at = datetime.now(timezone.utc) 

1823 

1824 self.db.commit() 

1825 

1826 logger.info(f"User {SecurityValidator.sanitize_log_message(email)} deactivated") 

1827 return user 

1828 

1829 except Exception as e: 

1830 self.db.rollback() 

1831 logger.error(f"Error deactivating user {SecurityValidator.sanitize_log_message(email)}: {e}") 

1832 raise 

1833 

1834 async def delete_user(self, email: str) -> bool: 

1835 """Delete a user account permanently. 

1836 

1837 Args: 

1838 email: User's email address 

1839 

1840 Returns: 

1841 bool: True if user was deleted 

1842 

1843 Raises: 

1844 ValueError: If user doesn't exist 

1845 ValueError: If user owns teams that cannot be transferred 

1846 """ 

1847 try: 

1848 stmt = select(EmailUser).where(EmailUser.email == email) 

1849 result = self.db.execute(stmt) 

1850 user = result.scalar_one_or_none() 

1851 

1852 if not user: 

1853 raise ValueError(f"User {email} not found") 

1854 

1855 # Check if user owns any teams 

1856 teams_owned_stmt = select(EmailTeam).where(EmailTeam.created_by == email) 

1857 teams_owned = self.db.execute(teams_owned_stmt).scalars().all() 

1858 

1859 if teams_owned: 

1860 # For each team, try to transfer ownership to another owner 

1861 for team in teams_owned: 

1862 # Find other team owners who can take ownership 

1863 potential_owners_stmt = ( 

1864 select(EmailTeamMember).where(EmailTeamMember.team_id == team.id, EmailTeamMember.user_email != email, EmailTeamMember.role == "owner").order_by(EmailTeamMember.role.desc()) 

1865 ) 

1866 

1867 potential_owners = self.db.execute(potential_owners_stmt).scalars().all() 

1868 

1869 if potential_owners: 

1870 # Transfer ownership to the first available owner 

1871 new_owner = potential_owners[0] 

1872 team.created_by = new_owner.user_email 

1873 logger.info(f"Transferred team '{SecurityValidator.sanitize_log_message(team.name)}' ownership from {SecurityValidator.sanitize_log_message(email)} to {new_owner.user_email}") 

1874 else: 

1875 # No other owners available - check if it's a single-user team 

1876 all_members_stmt = select(EmailTeamMember).where(EmailTeamMember.team_id == team.id) 

1877 all_members = self.db.execute(all_members_stmt).scalars().all() 

1878 

1879 if len(all_members) == 1 and all_members[0].user_email == email: 

1880 # This is a single-user personal team - cascade delete it 

1881 logger.info(f"Deleting personal team '{SecurityValidator.sanitize_log_message(team.name)}' (single member: {SecurityValidator.sanitize_log_message(email)})") 

1882 # Delete team members first (should be just the owner) 

1883 delete_team_members_stmt = delete(EmailTeamMember).where(EmailTeamMember.team_id == team.id) 

1884 self.db.execute(delete_team_members_stmt) 

1885 # Delete the team 

1886 self.db.delete(team) 

1887 else: 

1888 # Multi-member team with no other owners - cannot delete user 

1889 raise ValueError(f"Cannot delete user {email}: owns team '{team.name}' with {len(all_members)} members but no other owners to transfer ownership to") 

1890 

1891 # Delete all role assignments for the user 

1892 try: 

1893 await self.role_service.delete_all_user_roles(email) 

1894 except Exception as e: 

1895 logger.warning(f"Failed to delete role assignments for {SecurityValidator.sanitize_log_message(email)}: {e}") 

1896 

1897 # Reassign non-null audit FKs to another user so deleting this user does not 

1898 # break referential integrity for historical records. 

1899 replacement_row = self.db.query(EmailUser.email).filter(EmailUser.email != email).order_by(EmailUser.is_admin.desc(), EmailUser.created_at.asc()).first() 

1900 replacement_email = replacement_row[0] if replacement_row else None 

1901 

1902 if replacement_email: 

1903 self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.invited_by == email).update({EmailTeamInvitation.invited_by: replacement_email}, synchronize_session=False) 

1904 self.db.query(Role).filter(Role.created_by == email).update({Role.created_by: replacement_email}, synchronize_session=False) 

1905 self.db.query(UserRole).filter(UserRole.granted_by == email).update({UserRole.granted_by: replacement_email}, synchronize_session=False) 

1906 self.db.query(TokenRevocation).filter(TokenRevocation.revoked_by == email).update({TokenRevocation.revoked_by: replacement_email}, synchronize_session=False) 

1907 

1908 # Nullify nullable actor references. 

1909 self.db.query(EmailTeamMember).filter(EmailTeamMember.invited_by == email).update({EmailTeamMember.invited_by: None}, synchronize_session=False) 

1910 self.db.query(EmailTeamMemberHistory).filter(EmailTeamMemberHistory.action_by == email).update({EmailTeamMemberHistory.action_by: None}, synchronize_session=False) 

1911 self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.reviewed_by == email).update({EmailTeamJoinRequest.reviewed_by: None}, synchronize_session=False) 

1912 self.db.query(PendingUserApproval).filter(PendingUserApproval.approved_by == email).update({PendingUserApproval.approved_by: None}, synchronize_session=False) 

1913 self.db.query(SSOAuthSession).filter(SSOAuthSession.user_email == email).update({SSOAuthSession.user_email: None}, synchronize_session=False) 

1914 

1915 # Remove rows where this user is the primary subject. 

1916 self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.user_email == email).delete(synchronize_session=False) 

1917 

1918 # Delete related auth events 

1919 auth_events_stmt = delete(EmailAuthEvent).where(EmailAuthEvent.user_email == email) 

1920 self.db.execute(auth_events_stmt) 

1921 

1922 # Remove user from all team memberships 

1923 team_members_stmt = delete(EmailTeamMember).where(EmailTeamMember.user_email == email) 

1924 self.db.execute(team_members_stmt) 

1925 

1926 # Delete the user 

1927 self.db.delete(user) 

1928 self.db.commit() 

1929 

1930 await self._invalidate_deleted_user_auth_caches(email) 

1931 

1932 logger.info(f"User {SecurityValidator.sanitize_log_message(email)} deleted permanently") 

1933 return True 

1934 

1935 except Exception as e: 

1936 self.db.rollback() 

1937 logger.error(f"Error deleting user {SecurityValidator.sanitize_log_message(email)}: {e}") 

1938 raise 

1939 

1940 async def count_active_admin_users(self) -> int: 

1941 """Count the number of active admin users. 

1942 

1943 Returns: 

1944 int: Number of active admin users 

1945 """ 

1946 stmt = select(func.count(EmailUser.email)).where(EmailUser.is_admin.is_(True), EmailUser.is_active.is_(True)) # pylint: disable=not-callable 

1947 result = self.db.execute(stmt) 

1948 return result.scalar() or 0 

1949 

1950 async def is_last_active_admin(self, email: str) -> bool: 

1951 """Check if the given user is the last active admin. 

1952 

1953 Args: 

1954 email: User's email address 

1955 

1956 Returns: 

1957 bool: True if this user is the last active admin 

1958 """ 

1959 # First check if the user is an active admin 

1960 stmt = select(EmailUser).where(EmailUser.email == email) 

1961 result = self.db.execute(stmt) 

1962 user = result.scalar_one_or_none() 

1963 

1964 if not user or not user.is_admin or not user.is_active: 

1965 return False 

1966 

1967 # Count total active admins 

1968 admin_count = await self.count_active_admin_users() 

1969 return admin_count == 1