Coverage for mcpgateway / services / email_auth_service.py: 99%

524 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/email_auth_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Email Authentication Service. 

8This module provides email-based user authentication services including 

9user creation, authentication, password management, and security features. 

10 

11Examples: 

12 Basic usage (requires async context): 

13 from mcpgateway.services.email_auth_service import EmailAuthService 

14 from mcpgateway.db import SessionLocal 

15 

16 with SessionLocal() as db: 

17 service = EmailAuthService(db) 

18 # Use in async context: 

19 # user = await service.create_user("test@example.com", "password123") 

20 # authenticated = await service.authenticate_user("test@example.com", "password123") 

21""" 

22 

23# Standard 

24import base64 

25from dataclasses import dataclass 

26from datetime import datetime, timezone 

27import re 

28from typing import Optional 

29import warnings 

30 

31# Third-Party 

32import orjson 

33from sqlalchemy import and_, delete, desc, func, or_, select 

34from sqlalchemy.exc import IntegrityError 

35from sqlalchemy.orm import Session 

36 

37# First-Party 

38from mcpgateway.config import settings 

39from mcpgateway.db import EmailAuthEvent, EmailTeam, EmailTeamMember, EmailUser, utc_now 

40from mcpgateway.schemas import PaginationLinks, PaginationMeta 

41from mcpgateway.services.argon2_service import Argon2PasswordService 

42from mcpgateway.services.logging_service import LoggingService 

43from mcpgateway.utils.pagination import unified_paginate 

44 

# Module-level logger obtained through the gateway's LoggingService wrapper.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

# NOTE(review): presumably the row cap for the "get all users" helper; the
# consumer of this constant is outside the visible part of the file - confirm.
_GET_ALL_USERS_LIMIT = 10_000

50 

51 

@dataclass(frozen=True)
class UsersListResult:
    """Immutable container returned by the user-listing queries.

    Only ``data`` is mandatory; every other field defaults to ``None`` and is
    filled in depending on the pagination mode the caller asked for.

    Attributes:
        data: Users contained in the current page or slice.
        next_cursor: Opaque token for fetching the following slice
            (cursor mode), or None.
        pagination: Page metadata (page mode), or None.
        links: Navigation links (page mode), or None.
    """

    data: list[EmailUser]
    next_cursor: Optional[str] = None
    pagination: Optional[PaginationMeta] = None
    links: Optional[PaginationLinks] = None

60 

61 

class EmailValidationError(Exception):
    """Signal that an email address failed format validation.

    Examples:
        >>> error = EmailValidationError("Invalid email format")
        >>> str(error)
        'Invalid email format'
        >>> isinstance(error, Exception)
        True
    """

72 

73 

class PasswordValidationError(Exception):
    """Signal that a password does not satisfy the password policy.

    Examples:
        >>> error = PasswordValidationError("Password too short")
        >>> str(error)
        'Password too short'
        >>> isinstance(error, Exception)
        True
    """

84 

85 

class UserExistsError(Exception):
    """Signal an attempt to create a user whose email is already registered.

    Examples:
        >>> error = UserExistsError("User already exists")
        >>> str(error)
        'User already exists'
        >>> isinstance(error, Exception)
        True
    """

96 

97 

class AuthenticationError(Exception):
    """Signal that an authentication attempt was rejected.

    Examples:
        >>> error = AuthenticationError("Invalid credentials")
        >>> str(error)
        'Invalid credentials'
        >>> isinstance(error, Exception)
        True
    """

108 

109 

110class EmailAuthService: 

111 """Service for email-based user authentication. 

112 

113 This service handles user registration, authentication, password management, 

114 and security features like account lockout and failed attempt tracking. 

115 

116 Attributes: 

117 db (Session): Database session 

118 password_service (Argon2PasswordService): Password hashing service 

119 

120 Examples: 

121 >>> from mcpgateway.db import SessionLocal 

122 >>> with SessionLocal() as db: 

123 ... service = EmailAuthService(db) 

124 ... # Service is ready to use 

125 """ 

126 

127 get_all_users_deprecated_warned = False 

128 

def __init__(self, db: Session):
    """Bind the service to a database session and create its helpers.

    Args:
        db: SQLAlchemy database session used for the queries and commits
            issued by this service
    """
    # The RoleService is built on first access via the role_service property.
    self._role_service = None
    self.password_service = Argon2PasswordService()
    self.db = db
    logger.debug("EmailAuthService initialized")

139 

@property
def role_service(self):
    """Return the RoleService for this session, creating it on first use.

    The RoleService import happens at call time rather than at module import
    time to avoid a circular import. The instance is cached on the service
    so later accesses reuse it.

    Returns:
        RoleService: Instance of RoleService bound to ``self.db``
    """
    cached = self._role_service
    if cached is not None:
        return cached

    # First-Party
    from mcpgateway.services.role_service import RoleService  # pylint: disable=import-outside-toplevel

    cached = self._role_service = RoleService(self.db)
    return cached

153 

def validate_email(self, email: str) -> bool:
    """Validate email address format.

    The complete string has to match a basic ``local@domain.tld`` pattern.
    ``re.fullmatch`` is used rather than ``re.match`` with a ``$`` anchor,
    because ``$`` also matches just before a trailing newline and would
    accept an address that ends with a newline character.

    Args:
        email: Email address to validate

    Returns:
        bool: True if email is valid

    Raises:
        EmailValidationError: If email is missing, is not a string, has an
            invalid format, or is longer than 255 characters

    Examples:
        >>> service = EmailAuthService(None)
        >>> service.validate_email("user@example.com")
        True
        >>> service.validate_email("test.user+tag@domain.co.uk")
        True
        >>> service.validate_email("user123@test-domain.com")
        True
        >>> try:
        ...     service.validate_email("invalid-email")
        ... except EmailValidationError as e:
        ...     "Invalid email format" in str(e)
        True
        >>> try:
        ...     service.validate_email("")
        ... except EmailValidationError as e:
        ...     "Email is required" in str(e)
        True
        >>> try:
        ...     service.validate_email("user@")
        ... except EmailValidationError as e:
        ...     "Invalid email format" in str(e)
        True
        >>> try:
        ...     service.validate_email("@domain.com")
        ... except EmailValidationError as e:
        ...     "Invalid email format" in str(e)
        True
        >>> try:
        ...     service.validate_email("user@domain")
        ... except EmailValidationError as e:
        ...     "Invalid email format" in str(e)
        True
        >>> try:
        ...     service.validate_email("user@example.com" + chr(10))
        ... except EmailValidationError as e:
        ...     "Invalid email format" in str(e)
        True
        >>> try:
        ...     service.validate_email("a" * 250 + "@domain.com")
        ... except EmailValidationError as e:
        ...     "Email address too long" in str(e)
        True
        >>> try:
        ...     service.validate_email(None)
        ... except EmailValidationError as e:
        ...     "Email is required" in str(e)
        True
    """
    if not email or not isinstance(email, str):
        raise EmailValidationError("Email is required and must be a string")

    # Basic email pattern; fullmatch anchors both ends, so no ^/$ is needed
    # and a trailing newline can no longer slip through.
    email_pattern = r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}"

    if re.fullmatch(email_pattern, email) is None:
        raise EmailValidationError("Invalid email format")

    # Length is checked after the format so over-long but well-formed
    # addresses keep reporting "too long" as before.
    if len(email) > 255:
        raise EmailValidationError("Email address too long (max 255 characters)")

    return True

223 

def validate_password(self, password: str) -> bool:
    """Check a password against the configured password policy.

    An empty password is always rejected. When the policy is switched off
    via ``settings.password_policy_enabled`` every non-empty password is
    accepted. Otherwise the minimum length is enforced first, followed by
    the character-class rules that are enabled in settings.

    Args:
        password: Password to validate

    Returns:
        bool: True if password meets policy

    Raises:
        PasswordValidationError: If password doesn't meet requirements

    Examples:
        >>> service = EmailAuthService(None)
        >>> service.validate_password("Password123!")  # Meets all requirements
        True
        >>> service.validate_password("ValidPassword123!")
        True
        >>> service.validate_password("Shortpass!")  # 8+ chars with requirements
        True
        >>> service.validate_password("VeryLongPasswordThatMeetsMinimumRequirements!")
        True
        >>> try:
        ...     service.validate_password("")
        ... except PasswordValidationError as e:
        ...     "Password is required" in str(e)
        True
        >>> try:
        ...     service.validate_password(None)
        ... except PasswordValidationError as e:
        ...     "Password is required" in str(e)
        True
        >>> try:
        ...     service.validate_password("short")  # Only 5 chars, should fail with default min_length=8
        ... except PasswordValidationError as e:
        ...     "characters long" in str(e)
        True
    """
    if not password:
        raise PasswordValidationError("Password is required")

    # Global switch: with the policy disabled any non-empty password passes.
    policy_enabled = getattr(settings, "password_policy_enabled", True)
    if not policy_enabled:
        return True

    min_length = getattr(settings, "password_min_length", 8)

    # (rule enabled?, pattern that must be found, error message), checked in order.
    character_rules = (
        (getattr(settings, "password_require_uppercase", False), r"[A-Z]", "Password must contain at least one uppercase letter"),
        (getattr(settings, "password_require_lowercase", False), r"[a-z]", "Password must contain at least one lowercase letter"),
        (getattr(settings, "password_require_numbers", False), r"[0-9]", "Password must contain at least one number"),
        (getattr(settings, "password_require_special", False), r'[!@#$%^&*(),.?":{}|<>]', "Password must contain at least one special character"),
    )

    if len(password) < min_length:
        raise PasswordValidationError(f"Password must be at least {min_length} characters long")

    for rule_enabled, pattern, message in character_rules:
        if rule_enabled and not re.search(pattern, password):
            raise PasswordValidationError(message)

    return True

292 

async def get_user_by_email(self, email: str) -> Optional[EmailUser]:
    """Look up a user by email address.

    The address is lower-cased before the comparison. Any exception raised
    during the lookup is logged and reported as "not found".

    Args:
        email: Email address to look up

    Returns:
        EmailUser or None if not found (or if the lookup failed)

    Examples:
        # Assuming database has user "test@example.com"
        # user = await service.get_user_by_email("test@example.com")
        # user.email if user else None  # Returns: 'test@example.com'
    """
    try:
        lookup = select(EmailUser).where(EmailUser.email == email.lower())
        return self.db.execute(lookup).scalar_one_or_none()
    except Exception as e:
        logger.error(f"Error getting user by email {email}: {e}")
        return None

315 

async def create_user(
    self,
    email: str,
    password: str,
    full_name: Optional[str] = None,
    is_admin: bool = False,
    is_active: bool = True,
    password_change_required: bool = False,
    auth_provider: str = "local",
    skip_password_validation: bool = False,
    granted_by: Optional[str] = None,
) -> EmailUser:
    """Create a new user with email authentication.

    Steps, in order: normalize and validate the inputs, reject duplicates,
    hash the password, insert and commit the user, then (best effort)
    create the personal team and assign the default roles, and finally
    record a registration audit event.

    Args:
        email: User's email address (primary key)
        password: Plain text password (will be hashed)
        full_name: Optional full name for display
        is_admin: Whether user has admin privileges
        is_active: Whether user account is active (default: True)
        password_change_required: Whether user must change password on next login (default: False)
        auth_provider: Authentication provider ('local', 'github', etc.)
        skip_password_validation: Skip password policy validation (for bootstrap)
        granted_by: Email of user creating this user (for role assignment audit trail)

    Returns:
        EmailUser: The created user object

    Raises:
        EmailValidationError: If email format is invalid
        PasswordValidationError: If password doesn't meet policy
        UserExistsError: If user already exists
        Exception: Any other error raised while persisting the user is
            re-raised after the session has been rolled back

    Examples:
        # user = await service.create_user(
        #     email="new@example.com",
        #     password="secure123",
        #     full_name="New User",
        #     is_active=True,
        #     password_change_required=False
        # )
        # user.email          # Returns: 'new@example.com'
        # user.full_name      # Returns: 'New User'
        # user.is_active      # Returns: True
    """
    # Normalize email to lowercase
    email = email.lower().strip()

    # Validate inputs
    self.validate_email(email)
    if not skip_password_validation:
        self.validate_password(password)

    # Check if user already exists
    existing_user = await self.get_user_by_email(email)
    if existing_user:
        raise UserExistsError(f"User with email {email} already exists")

    # Hash the password
    password_hash = await self.password_service.hash_password_async(password)

    # Create new user (record password change timestamp)
    user = EmailUser(
        email=email,
        password_hash=password_hash,
        full_name=full_name,
        is_admin=is_admin,
        is_active=is_active,
        password_change_required=password_change_required,
        auth_provider=auth_provider,
        password_changed_at=utc_now(),
        admin_origin="api" if is_admin else None,
    )

    try:
        self.db.add(user)
        self.db.commit()
        self.db.refresh(user)

        logger.info(f"Created new user: {email}")

        # Create personal team first if enabled (needed for team-scoped role assignment)
        personal_team_id = None
        if getattr(settings, "auto_create_personal_teams", True):
            try:
                # Import here to avoid circular imports
                # First-Party
                from mcpgateway.services.personal_team_service import PersonalTeamService  # pylint: disable=import-outside-toplevel

                personal_team_service = PersonalTeamService(self.db)
                personal_team = await personal_team_service.create_personal_team(user)
                personal_team_id = personal_team.id  # Get team_id directly from created team
                logger.info(f"Created personal team '{personal_team.name}' (ID: {personal_team_id}) for user {email}")
            except Exception as e:
                logger.warning(f"Failed to create personal team for {email}: {e}")
                # Don't fail user creation if personal team creation fails

        # Auto-assign dual roles using RoleService (after personal team creation)
        try:
            granter = granted_by or email  # Use granted_by if provided, otherwise self-granted

            # Determine global role based on admin status
            global_role_name = "platform_admin" if is_admin else "platform_viewer"
            global_role = await self.role_service.get_role_by_name(global_role_name, "global")

            if global_role:
                try:
                    await self.role_service.assign_role_to_user(user_email=email, role_id=global_role.id, scope="global", scope_id=None, granted_by=granter)
                    logger.info(f"Assigned {global_role_name} role (global scope) to user {email}")
                except ValueError as e:
                    logger.warning(f"Could not assign {global_role_name} role to {email}: {e}")
            else:
                logger.warning(f"{global_role_name} role not found. User {email} created without global role.")

            # Assign team_admin role with team scope (if personal team exists)
            if personal_team_id:
                team_admin_role = await self.role_service.get_role_by_name("team_admin", "team")

                if team_admin_role:
                    try:
                        await self.role_service.assign_role_to_user(user_email=email, role_id=team_admin_role.id, scope="team", scope_id=personal_team_id, granted_by=granter)
                        logger.info(f"Assigned team_admin role (team scope: {personal_team_id}) to user {email}")
                    except ValueError as e:
                        logger.warning(f"Could not assign team_admin role to {email}: {e}")
                else:
                    logger.warning(f"team_admin role not found. User {email} created without team admin role.")

        except Exception as role_error:
            logger.error(f"Failed to assign roles to user {email}: {role_error}")
            # Don't fail user creation if role assignment fails
            # User can be assigned roles manually later

        # Log registration event
        registration_event = EmailAuthEvent.create_registration_event(user_email=email, success=True)
        self.db.add(registration_event)
        self.db.commit()

        return user

    except IntegrityError as e:
        self.db.rollback()
        logger.error(f"Database error creating user {email}: {e}")
        raise UserExistsError(f"User with email {email} already exists") from e
    except Exception as e:
        self.db.rollback()
        logger.error(f"Unexpected error creating user {email}: {e}")

        # Log failed registration. This is best effort: if the audit write
        # itself fails (for example because the database is unavailable) it
        # must not replace the original error that the caller needs to see.
        try:
            registration_event = EmailAuthEvent.create_registration_event(user_email=email, success=False, failure_reason=str(e))
            self.db.add(registration_event)
            self.db.commit()
        except Exception as audit_error:
            self.db.rollback()
            logger.warning(f"Failed to record failed registration event for {email}: {audit_error}")

        # Re-raise the original creation error
        raise

469 

async def authenticate_user(self, email: str, password: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> Optional[EmailUser]:
    """Verify an email/password pair and record the attempt.

    The email is lower-cased and stripped before the lookup. Unknown,
    disabled and locked accounts are rejected. Admin accounts are exempt
    from lockout while ``settings.protect_all_admins`` is truthy: an
    existing lockout is cleared and failed attempts are not counted.
    Every call, successful or not, stores a login-attempt event.

    Args:
        email: User's email address
        password: Plain text password
        ip_address: Client IP address for logging
        user_agent: Client user agent for logging

    Returns:
        EmailUser if authentication successful, None otherwise

    Examples:
        # user = await service.authenticate_user("user@example.com", "correct_password")
        # user.email if user else None  # Returns: 'user@example.com'
        # await service.authenticate_user("user@example.com", "wrong_password")  # Returns: None
    """
    email = email.lower().strip()
    user = await self.get_user_by_email(email)

    # Outcome of this attempt; consumed by the audit event in ``finally``.
    succeeded = False
    reason = None

    try:
        if not user:
            reason = "User not found"
            logger.info(f"Authentication failed for {email}: user not found")
            return None

        if not user.is_active:
            reason = "Account is disabled"
            logger.info(f"Authentication failed for {email}: account disabled")
            return None

        is_protected_admin = user.is_admin and settings.protect_all_admins

        if user.is_account_locked() and not is_protected_admin:
            reason = "Account is locked"
            logger.info(f"Authentication failed for {email}: account locked")
            return None

        # Clear lockout for protected admins so they can always attempt login
        if is_protected_admin and user.is_account_locked():
            logger.info(f"Clearing lockout for protected admin {email}")
            user.reset_failed_attempts()
            self.db.commit()

        password_ok = await self.password_service.verify_password_async(password, user.password_hash)
        if password_ok:
            # Authentication successful
            user.reset_failed_attempts()
            self.db.commit()

            succeeded = True
            logger.info(f"Authentication successful for {email}")
            return user

        reason = "Invalid password"

        # Count the failure, possibly locking the account (skip for protected admins)
        if not is_protected_admin:
            max_attempts = getattr(settings, "max_failed_login_attempts", 5)
            lockout_minutes = getattr(settings, "account_lockout_duration_minutes", 30)

            if user.increment_failed_attempts(max_attempts, lockout_minutes):
                logger.warning(f"Account locked for {email} after {max_attempts} failed attempts")
                reason = "Account locked due to too many failed attempts"

        self.db.commit()
        logger.info(f"Authentication failed for {email}: invalid password")
        return None

    finally:
        # Log authentication event on every exit path
        attempt_record = EmailAuthEvent.create_login_attempt(user_email=email, success=succeeded, ip_address=ip_address, user_agent=user_agent, failure_reason=reason)
        self.db.add(attempt_record)
        self.db.commit()

553 

async def change_password(self, email: str, old_password: Optional[str], new_password: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> bool:
    """Replace a user's password after verifying the current one.

    The current password is checked through ``authenticate_user``. The new
    password must satisfy the password policy and, unless
    ``settings.password_prevent_reuse`` is falsy, differ from the current
    one. After the new hash is committed the user's auth cache entry is
    invalidated, waiting at most five seconds. A password-change event is
    stored whether or not the update succeeded.

    Args:
        email: User's email address
        old_password: Current password for verification
        new_password: New password to set
        ip_address: Client IP address for logging
        user_agent: Client user agent for logging

    Returns:
        bool: True if password changed successfully

    Raises:
        AuthenticationError: If old password is missing or incorrect
        PasswordValidationError: If new password doesn't meet policy
        Exception: If database operation fails

    Examples:
        # success = await service.change_password(
        #     "user@example.com",
        #     "old_password",
        #     "new_secure_password"
        # )
        # success  # Returns: True
    """
    # The current password is mandatory
    if old_password is None:
        raise AuthenticationError("Current password is required")

    # Authenticate with the current password first
    user = await self.authenticate_user(email, old_password, ip_address, user_agent)
    if not user:
        raise AuthenticationError("Current password is incorrect")

    self.validate_password(new_password)

    # Optional policy: the new password must not equal the current one
    reuse_forbidden = getattr(settings, "password_prevent_reuse", True)
    if reuse_forbidden and await self.password_service.verify_password_async(new_password, user.password_hash):
        raise PasswordValidationError("New password must be different from current password")

    changed = False
    try:
        user.password_hash = await self.password_service.hash_password_async(new_password)
        # The user no longer has to change the password on next login
        user.password_change_required = False
        # Record the password change timestamp
        try:
            user.password_changed_at = utc_now()
        except Exception as exc:
            logger.debug("Failed to set password_changed_at for %s: %s", email, exc)

        self.db.commit()
        changed = True

        # Invalidate auth cache for user
        try:
            # Standard
            import asyncio  # pylint: disable=import-outside-toplevel

            # First-Party
            from mcpgateway.cache.auth_cache import auth_cache  # pylint: disable=import-outside-toplevel

            # Wait for the invalidation so later requests do not see a stale
            # auth context, but give up after 5 seconds. The shield keeps the
            # invalidation running in the background when the wait times out.
            invalidation = asyncio.shield(auth_cache.invalidate_user(email))
            await asyncio.wait_for(invalidation, timeout=5.0)
        except asyncio.TimeoutError:
            logger.warning(f"Auth cache invalidation timed out for {email} - continuing in background")
        except Exception as cache_error:
            logger.debug(f"Failed to invalidate auth cache on password change: {cache_error}")

        logger.info(f"Password changed successfully for {email}")

    except Exception as e:
        self.db.rollback()
        logger.error(f"Error changing password for {email}: {e}")
        raise
    finally:
        # Log password change event
        audit_event = EmailAuthEvent.create_password_change_event(user_email=email, success=changed, ip_address=ip_address, user_agent=user_agent)
        self.db.add(audit_event)
        self.db.commit()

    return changed

644 

async def create_platform_admin(self, email: str, password: str, full_name: Optional[str] = None) -> EmailUser:
    """Create or update the platform administrator user.

    This method is used during system bootstrap to create the initial
    admin user from environment variables.

    The email is normalized (lower-cased and stripped) before the lookup,
    the same way ``create_user`` and ``authenticate_user`` normalize it.
    Without that, an address configured with surrounding whitespace would
    miss the existing-admin lookup and then fail inside ``create_user``
    with UserExistsError.

    If the admin already exists, the full name and password are updated
    when they differ and the account is forced to be an active admin.
    Otherwise a new admin is created with password policy validation
    skipped.

    Args:
        email: Admin email address
        password: Admin password
        full_name: Admin full name

    Returns:
        EmailUser: The admin user

    Raises:
        EmailValidationError: If a new admin must be created and the email
            format is invalid
        UserExistsError: If creating the new admin hits an existing user

    Examples:
        # admin = await service.create_platform_admin(
        #     "admin@example.com",
        #     "admin_password",
        #     "Platform Administrator"
        # )
        # admin.is_admin  # Returns: True
    """
    # Normalize once so lookup, creation and log messages agree
    email = email.lower().strip()

    # Check if admin user already exists
    existing_admin = await self.get_user_by_email(email)

    if existing_admin:
        # Update existing admin if password or name changed
        if full_name and existing_admin.full_name != full_name:
            existing_admin.full_name = full_name

        # Re-hash only when the supplied password does not match the stored hash
        if not await self.password_service.verify_password_async(password, existing_admin.password_hash):
            existing_admin.password_hash = await self.password_service.hash_password_async(password)
            try:
                existing_admin.password_changed_at = utc_now()
            except Exception as exc:
                logger.debug("Failed to set password_changed_at for existing admin %s: %s", email, exc)

        # Ensure admin status
        existing_admin.is_admin = True
        existing_admin.is_active = True

        self.db.commit()
        logger.info(f"Updated platform admin user: {email}")
        return existing_admin

    # Create new admin user - skip password validation during bootstrap
    admin_user = await self.create_user(email=email, password=password, full_name=full_name, is_admin=True, auth_provider="local", skip_password_validation=True)

    logger.info(f"Created platform admin user: {email}")
    return admin_user

696 

async def update_last_login(self, email: str) -> None:
    """Refresh the login bookkeeping of a user, if the user exists.

    Nothing happens when no user is found for the email.

    Args:
        email: User's email address
    """
    user = await self.get_user_by_email(email)
    if not user:
        return

    # Per the original author's note, reset_failed_attempts() also updates last_login
    user.reset_failed_attempts()
    self.db.commit()

707 

708 @staticmethod 

709 def _escape_like(value: str) -> str: 

710 """Escape LIKE wildcards for prefix search. 

711 

712 Args: 

713 value: Raw value to escape for LIKE matching. 

714 

715 Returns: 

716 Escaped string safe for LIKE patterns. 

717 """ 

718 return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_") 

719 

async def list_users(
    self,
    limit: Optional[int] = None,
    cursor: Optional[str] = None,
    page: Optional[int] = None,
    per_page: Optional[int] = None,
    search: Optional[str] = None,
) -> UsersListResult:
    """List all users with cursor or page-based pagination support and optional search.

    This method supports both cursor-based (for API endpoints with large datasets)
    and page-based (for admin UI with page numbers) pagination, with optional
    search filtering by email or full name. Page mode is selected whenever
    ``page`` is not None; otherwise cursor mode is used.

    Results are ordered by ``created_at`` then ``email``, both descending.
    The cursor is a URL-safe base64 encoded JSON object holding the
    ``created_at`` and ``email`` of the last row returned. A cursor that
    cannot be decoded, or that does not decode to a JSON object, is
    logged and ignored.

    Note: This method returns ORM objects and cannot be cached since callers
    depend on ORM attributes and methods (e.g., EmailUserResponse.from_email_user).

    Args:
        limit: Maximum number of users to return (for cursor-based pagination).
            0 disables the limit; None or a negative value falls back to
            ``settings.pagination_default_page_size``.
        cursor: Opaque cursor token for cursor-based pagination
        page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor.
        per_page: Items per page for page-based pagination
        search: Optional prefix to filter by email or full name (case-insensitive)

    Returns:
        UsersListResult with data and optional pagination metadata. If the
        query fails, the error is logged and an empty result is returned.

    Examples:
        # Cursor-based pagination (for APIs)
        # result = await service.list_users(cursor=None, limit=50)
        # len(result.data) <= 50  # Returns: True

        # Page-based pagination (for admin UI)
        # result = await service.list_users(page=1, per_page=10)
        # result.data        # Returns: list of users
        # result.pagination  # Returns: pagination metadata

        # Search users
        # users = await service.list_users(search="john", page=1, per_page=10)
        # All users whose email or name starts with "john"
    """
    try:
        # Build base query with ordering by created_at, email for consistent pagination
        # Note: EmailUser uses email as primary key, not id
        query = select(EmailUser).order_by(desc(EmailUser.created_at), desc(EmailUser.email))

        # Apply search filter if provided (prefix search for better index usage)
        if search and search.strip():
            search_term = f"{self._escape_like(search.strip())}%"
            # NOTE: For large Postgres datasets, consider citext or functional indexes for case-insensitive search.
            query = query.where(
                or_(
                    EmailUser.email.ilike(search_term, escape="\\"),
                    EmailUser.full_name.ilike(search_term, escape="\\"),
                )
            )

        # Page-based pagination: use unified_paginate
        if page is not None:
            pag_result = await unified_paginate(
                db=self.db,
                query=query,
                page=page,
                per_page=per_page,
                cursor=None,
                limit=None,
                base_url="/admin/users",
                query_params={},
            )
            return UsersListResult(data=pag_result["data"], pagination=pag_result["pagination"], links=pag_result["links"])

        # Cursor-based pagination: custom implementation for EmailUser
        # EmailUser uses email as PK (not id), so we need custom cursor using (created_at, email)
        page_size = limit if limit and limit > 0 else settings.pagination_default_page_size
        if limit == 0:
            page_size = None  # No limit

        # Decode cursor and apply keyset filter if provided
        if cursor:
            try:
                cursor_json = base64.urlsafe_b64decode(cursor.encode()).decode()
                cursor_data = orjson.loads(cursor_json)
                # Valid JSON that is not an object (list, string, number, ...)
                # has no .get(); treat it like any other malformed cursor
                # instead of letting AttributeError abort the whole listing.
                if not isinstance(cursor_data, dict):
                    raise ValueError("cursor payload is not a JSON object")
                last_email = cursor_data.get("email")
                created_str = cursor_data.get("created_at")
                if last_email and created_str:
                    last_created = datetime.fromisoformat(created_str)
                    # Apply keyset filter (assumes DESC order on created_at, email)
                    query = query.where(
                        or_(
                            EmailUser.created_at < last_created,
                            and_(EmailUser.created_at == last_created, EmailUser.email < last_email),
                        )
                    )
            except (ValueError, TypeError) as e:
                logger.warning(f"Invalid cursor for user pagination, ignoring: {e}")

        # Fetch page_size + 1 to determine if there are more results
        if page_size is not None:
            query = query.limit(page_size + 1)
        result = self.db.execute(query)
        users = list(result.scalars().all())

        if page_size is None:
            return UsersListResult(data=users, next_cursor=None)

        # Check if there are more results; the extra row is only a marker
        has_more = len(users) > page_size
        if has_more:
            users = users[:page_size]

        # Generate next cursor using (created_at, email) for EmailUser
        next_cursor = None
        if has_more and users:
            last_user = users[-1]
            cursor_data = {
                "created_at": last_user.created_at.isoformat() if last_user.created_at else None,
                "email": last_user.email,
            }
            next_cursor = base64.urlsafe_b64encode(orjson.dumps(cursor_data)).decode()

        return UsersListResult(data=users, next_cursor=next_cursor)

    except Exception as e:
        logger.error(f"Error listing users: {e}")
        # Return appropriate empty response based on pagination mode
        if page is not None:
            fallback_per_page = per_page or 50
            return UsersListResult(
                data=[],
                pagination=PaginationMeta(page=page, per_page=fallback_per_page, total_items=0, total_pages=0, has_next=False, has_prev=False),
                links=PaginationLinks(  # pylint: disable=kwarg-superseded-by-positional-arg
                    self=f"/admin/users?page=1&per_page={fallback_per_page}",
                    first=f"/admin/users?page=1&per_page={fallback_per_page}",
                    last=f"/admin/users?page=1&per_page={fallback_per_page}",
                ),
            )

        if cursor is not None:
            return UsersListResult(data=[], next_cursor=None)

        return UsersListResult(data=[])

861 

async def list_users_not_in_team(
    self,
    team_id: str,
    cursor: Optional[str] = None,
    limit: Optional[int] = None,
    page: Optional[int] = None,
    per_page: Optional[int] = None,
    search: Optional[str] = None,
) -> UsersListResult:
    """List active users who are NOT active members of a team.

    Team members are excluded with a NOT IN subquery over active
    memberships. Two pagination modes are supported: page-based (delegated
    to ``unified_paginate``) when ``page`` is given, otherwise keyset
    (cursor) pagination ordered by ``(created_at DESC, email DESC)``.

    Args:
        team_id: ID of the team whose members are excluded
        cursor: Opaque cursor token returned by a previous cursor-based call.
            A token that cannot be decoded is logged and ignored.
        limit: Maximum number of users to return in cursor mode (default: 50;
            missing, zero or negative values fall back to the default)
        page: Page number for page-based pagination (1-indexed). Takes
            precedence over cursor.
        per_page: Items per page for page-based pagination (default: 30)
        search: Optional prefix to match against email or full name

    Returns:
        UsersListResult with data and either cursor or pagination metadata.
        On any unexpected error an empty result for the requested mode is
        returned and the error is logged.

    Examples:
        # Page-based (admin UI)
        # result = await service.list_users_not_in_team("team-123", page=1, per_page=30)
        # result.pagination  # Returns: pagination metadata

        # Cursor-based (API)
        # result = await service.list_users_not_in_team("team-123", cursor=None, limit=50)
        # result.next_cursor  # Returns: next cursor token
    """
    # Resolved once so the query and the error fallback always agree.
    effective_per_page = per_page or 30

    try:
        query = select(EmailUser)

        # Prefix search on email or full name; LIKE wildcards in the term are escaped.
        if search and search.strip():
            search_term = f"{self._escape_like(search.strip())}%"
            query = query.where(
                or_(
                    EmailUser.email.ilike(search_term, escape="\\"),
                    EmailUser.full_name.ilike(search_term, escape="\\"),
                )
            )

        # Exclude active team members using a NOT IN subquery
        member_emails_subquery = select(EmailTeamMember.user_email).where(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True))
        query = query.where(EmailUser.is_active.is_(True), ~EmailUser.email.in_(member_emails_subquery))

        # PAGE-BASED PAGINATION (Admin UI) - use unified_paginate
        if page is not None:
            query = query.order_by(EmailUser.full_name, EmailUser.email)
            pag_result = await unified_paginate(
                db=self.db,
                query=query,
                page=page,
                per_page=effective_per_page,
                cursor=None,
                limit=None,
                base_url=f"/admin/teams/{team_id}/non-members",
                query_params={},
            )
            return UsersListResult(data=pag_result["data"], pagination=pag_result["pagination"], links=pag_result["links"])

        # CURSOR-BASED PAGINATION - custom implementation using (created_at, email)
        # unified_paginate uses (created_at, id) but EmailUser uses email as PK
        query = query.order_by(desc(EmailUser.created_at), desc(EmailUser.email))

        # Decode cursor and apply keyset filter
        if cursor:
            try:
                cursor_json = base64.urlsafe_b64decode(cursor.encode()).decode()
                cursor_data = orjson.loads(cursor_json)
                # Valid JSON that is not an object (list, number, ...) must be
                # treated like any other bad cursor instead of failing the
                # whole request with an AttributeError on .get().
                if not isinstance(cursor_data, dict):
                    raise ValueError("cursor payload is not a JSON object")
                last_email = cursor_data.get("email")
                created_str = cursor_data.get("created_at")
                if last_email and created_str:
                    last_created = datetime.fromisoformat(created_str)
                    # Keyset filter: (created_at < last) OR (created_at = last AND email < last_email)
                    query = query.where(
                        or_(
                            EmailUser.created_at < last_created,
                            and_(EmailUser.created_at == last_created, EmailUser.email < last_email),
                        )
                    )
            except (ValueError, TypeError) as e:
                logger.warning(f"Invalid cursor for non-members list, ignoring: {e}")

        # Fetch one extra row to learn whether another page exists.
        # Non-positive limits fall back to the default instead of reaching the database.
        page_size = limit if limit and limit > 0 else 50
        query = query.limit(page_size + 1)
        users = list(self.db.execute(query).scalars().all())

        has_more = len(users) > page_size
        if has_more:
            users = users[:page_size]

        # Generate next cursor using (created_at, email) of the last returned row
        next_cursor = None
        if has_more and users:
            last_user = users[-1]
            cursor_data = {
                "created_at": last_user.created_at.isoformat() if last_user.created_at else None,
                "email": last_user.email,
            }
            next_cursor = base64.urlsafe_b64encode(orjson.dumps(cursor_data)).decode()

        self.db.commit()
        return UsersListResult(data=users, next_cursor=next_cursor)

    except Exception as e:
        logger.error(f"Error listing non-members for team {team_id}: {e}")

        # Return appropriate empty response based on mode
        if page is not None:
            fallback_url = f"/admin/teams/{team_id}/non-members?page=1&per_page={effective_per_page}"
            return UsersListResult(
                data=[],
                pagination=PaginationMeta(page=page, per_page=effective_per_page, total_items=0, total_pages=0, has_next=False, has_prev=False),
                links=PaginationLinks(  # pylint: disable=kwarg-superseded-by-positional-arg
                    self=fallback_url,
                    first=fallback_url,
                    last=fallback_url,
                ),
            )

        return UsersListResult(data=[], next_cursor=None)

990 

async def get_all_users(self) -> list[EmailUser]:
    """Get all users without pagination.

    .. deprecated:: 1.0
        Use :meth:`list_users` with proper pagination instead.
        This method is capped at ``_GET_ALL_USERS_LIMIT`` (10,000) users and
        refuses to run when more users exist. For production systems with
        many users, use paginated access with search/filtering.

    Returns:
        List of up to 10,000 EmailUser objects

    Raises:
        ValueError: If total users exceed 10,000

    Examples:
        # users = await service.get_all_users()
        # isinstance(users, list)  # Returns: True

    Warning:
        This method is deprecated and will be removed in a future version.
        Use list_users() with pagination instead. Note the parentheses:
        the coroutine must be awaited before ``.data`` is read.

        # For small datasets
        users = (await service.list_users(page=1, per_page=1000)).data

        # For searching
        users = (await service.list_users(search="john", page=1, per_page=10)).data
    """
    # Rendered from the constant so messages cannot drift from the enforced cap.
    limit_text = f"{_GET_ALL_USERS_LIMIT:,}"

    # The deprecation warning is emitted once per class, not once per call.
    if not self.__class__.get_all_users_deprecated_warned:
        warnings.warn(
            f"get_all_users() is deprecated and limited to {limit_text} users. Use list_users() with pagination instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        self.__class__.get_all_users_deprecated_warned = True

    total_users = await self.count_users()
    if total_users > _GET_ALL_USERS_LIMIT:
        raise ValueError(f"get_all_users() supports up to {limit_text} users. Use list_users() pagination instead.")

    # A single cursor-mode request sized to the cap returns every user.
    result = await self.list_users(limit=_GET_ALL_USERS_LIMIT)
    return result.data

1034 

async def count_users(self) -> int:
    """Return how many user rows exist.

    Returns:
        int: Total user count; 0 when the table is empty or when the query
        fails (the failure is logged, not raised).
    """
    try:
        total = self.db.execute(select(func.count(EmailUser.email))).scalar()  # pylint: disable=not-callable
    except Exception as e:
        logger.error(f"Error counting users: {e}")
        return 0
    # scalar() may yield None; normalise that to 0.
    return total if total else 0

1048 

async def get_auth_events(self, email: Optional[str] = None, limit: int = 100, offset: int = 0) -> list[EmailAuthEvent]:
    """Fetch authentication audit events, newest first.

    Args:
        email: Restrict results to this user's events (optional)
        limit: Maximum number of events to return
        offset: Number of events to skip

    Returns:
        List of EmailAuthEvent objects; an empty list if the query fails
        (the failure is logged, not raised).
    """
    try:
        criteria = select(EmailAuthEvent)
        if email:
            criteria = criteria.where(EmailAuthEvent.user_email == email)

        newest_first = criteria.order_by(EmailAuthEvent.timestamp.desc())
        window = newest_first.offset(offset).limit(limit)

        return list(self.db.execute(window).scalars().all())
    except Exception as e:
        logger.error(f"Error getting auth events: {e}")
        return []

1072 

async def update_user(
    self,
    email: str,
    full_name: Optional[str] = None,
    is_admin: Optional[bool] = None,
    is_active: Optional[bool] = None,
    password_change_required: Optional[bool] = None,
    password: Optional[str] = None,
    admin_origin_source: Optional[str] = None,
) -> EmailUser:
    """Update user information.

    Only arguments that are not ``None`` are applied. When ``is_admin``
    actually changes, the user's global ``platform_admin`` /
    ``platform_viewer`` role assignments are synced to match; a failure in
    that sync is logged and does not fail the update.

    Args:
        email: User's email address (primary key); lower-cased and stripped before lookup
        full_name: New full name (optional)
        is_admin: New admin status (optional)
        is_active: New active status (optional)
        password_change_required: Whether user must change password on next login (optional)
        password: New password (optional, will be hashed)
        admin_origin_source: Source of admin change for tracking (e.g. "api", "ui"). Callers should pass explicitly.

    Returns:
        EmailUser: Updated user object

    Raises:
        ValueError: If user doesn't exist, if protect_all_admins blocks the change, or if it would remove the last active admin
        PasswordValidationError: If password doesn't meet policy
    """
    try:
        # Normalize email to match create_user() / get_user_by_email() behavior
        email = email.lower().strip()

        # Get existing user
        stmt = select(EmailUser).where(EmailUser.email == email)
        user = self.db.execute(stmt).scalar_one_or_none()

        if not user:
            raise ValueError(f"User {email} not found")

        # Admin protection guard: applies only when an active admin would
        # stop being one, through demotion or deactivation.
        if user.is_admin and user.is_active:
            would_lose_admin = (is_admin is not None and not is_admin) or (is_active is not None and not is_active)
            if would_lose_admin:
                if settings.protect_all_admins:
                    raise ValueError("Admin protection is enabled — cannot demote or deactivate any admin user")
                if await self.is_last_active_admin(email):
                    raise ValueError("Cannot demote or deactivate the last remaining active admin user")

        # Check the password policy before anything is mutated, so a rejected
        # password cannot leave role assignments changed for a user whose
        # other fields are then rolled back.
        if password is not None:
            self.validate_password(password)

        # Update fields if provided
        if full_name is not None:
            user.full_name = full_name

        # Track admin_origin and sync roles only when the status actually changes
        if is_admin is not None and is_admin != user.is_admin:
            user.is_admin = is_admin
            user.admin_origin = admin_origin_source if is_admin else None

            # Sync global role assignment with is_admin flag:
            #   Promotion: assign platform_admin, revoke platform_viewer
            #   Demotion:  revoke platform_admin, assign platform_viewer
            try:
                platform_admin_role = await self.role_service.get_role_by_name("platform_admin", "global")
                platform_viewer_role = await self.role_service.get_role_by_name("platform_viewer", "global")

                if is_admin:
                    # Promotion: assign platform_admin, revoke platform_viewer
                    if platform_admin_role:
                        existing = await self.role_service.get_user_role_assignment(user_email=email, role_id=platform_admin_role.id, scope="global", scope_id=None)
                        if not existing or not existing.is_active:
                            await self.role_service.assign_role_to_user(user_email=email, role_id=platform_admin_role.id, scope="global", scope_id=None, granted_by=email)
                            logger.info(f"Assigned platform_admin role to {email}")
                    else:
                        logger.warning(f"platform_admin role not found, cannot assign to {email}")

                    if platform_viewer_role:
                        revoked = await self.role_service.revoke_role_from_user(user_email=email, role_id=platform_viewer_role.id, scope="global", scope_id=None)
                        if revoked:
                            logger.info(f"Revoked platform_viewer role from {email}")
                else:
                    # Demotion: revoke platform_admin, assign platform_viewer
                    if platform_admin_role:
                        revoked = await self.role_service.revoke_role_from_user(user_email=email, role_id=platform_admin_role.id, scope="global", scope_id=None)
                        if revoked:
                            logger.info(f"Revoked platform_admin role from {email}")

                    if platform_viewer_role:
                        existing = await self.role_service.get_user_role_assignment(user_email=email, role_id=platform_viewer_role.id, scope="global", scope_id=None)
                        if not existing or not existing.is_active:
                            await self.role_service.assign_role_to_user(user_email=email, role_id=platform_viewer_role.id, scope="global", scope_id=None, granted_by=email)
                            logger.info(f"Assigned platform_viewer role to {email}")
                    else:
                        logger.warning(f"platform_viewer role not found, cannot assign to {email}")

            except Exception as e:
                # Don't fail user update if role sync fails
                logger.warning(f"Failed to sync global roles for {email}: {e}")

        if is_active is not None:
            user.is_active = is_active

        if password is not None:
            # Already validated above; only hashing remains.
            user.password_hash = await self.password_service.hash_password_async(password)
            # Only clear password_change_required if it wasn't explicitly set
            if password_change_required is None:
                user.password_change_required = False
            user.password_changed_at = utc_now()

        # Set password_change_required after password processing to allow explicit override
        if password_change_required is not None:
            user.password_change_required = password_change_required

        user.updated_at = datetime.now(timezone.utc)

        self.db.commit()

        return user

    except Exception as e:
        self.db.rollback()
        logger.error(f"Error updating user {email}: {e}")
        raise

1197 

async def activate_user(self, email: str) -> EmailUser:
    """Activate a user account.

    Sets ``is_active`` to True, refreshes ``updated_at`` and commits. Any
    failure rolls the session back, is logged, and is re-raised.

    Args:
        email: User's email address; lower-cased and stripped before lookup

    Returns:
        EmailUser: Updated user object

    Raises:
        ValueError: If user doesn't exist
    """
    try:
        # Normalize the same way update_user() does so a mixed-case or padded
        # address resolves to the same row.
        email = email.lower().strip()

        stmt = select(EmailUser).where(EmailUser.email == email)
        user = self.db.execute(stmt).scalar_one_or_none()

        if not user:
            raise ValueError(f"User {email} not found")

        user.is_active = True
        user.updated_at = datetime.now(timezone.utc)

        self.db.commit()

        logger.info(f"User {email} activated")
        return user

    except Exception as e:
        self.db.rollback()
        logger.error(f"Error activating user {email}: {e}")
        raise

1230 

async def deactivate_user(self, email: str) -> EmailUser:
    """Deactivate a user account.

    Sets ``is_active`` to False, refreshes ``updated_at`` and commits. Any
    failure rolls the session back, is logged, and is re-raised.

    Args:
        email: User's email address; lower-cased and stripped before lookup

    Returns:
        EmailUser: Updated user object

    Raises:
        ValueError: If user doesn't exist
    """
    try:
        # Normalize the same way update_user() does so a mixed-case or padded
        # address resolves to the same row.
        email = email.lower().strip()

        stmt = select(EmailUser).where(EmailUser.email == email)
        user = self.db.execute(stmt).scalar_one_or_none()

        if not user:
            raise ValueError(f"User {email} not found")

        # NOTE(review): unlike update_user(), this path applies no admin
        # protection check (protect_all_admins / last active admin) -
        # presumably callers enforce it; confirm.
        user.is_active = False
        user.updated_at = datetime.now(timezone.utc)

        self.db.commit()

        logger.info(f"User {email} deactivated")
        return user

    except Exception as e:
        self.db.rollback()
        logger.error(f"Error deactivating user {email}: {e}")
        raise

1263 

async def delete_user(self, email: str) -> bool:
    """Delete a user account permanently.

    Teams created by the user are handled first: ownership moves to another
    member with the "owner" role when one exists; a team whose only member
    is the user is deleted; any other team blocks the deletion. The user's
    role assignments (best-effort), auth events and team memberships are
    then removed, followed by the user row itself. After the commit the
    user's auth cache entries are invalidated (best-effort).

    Args:
        email: User's email address; lower-cased and stripped before lookup

    Returns:
        bool: True if user was deleted

    Raises:
        ValueError: If user doesn't exist
        ValueError: If user owns teams that cannot be transferred
    """
    try:
        # Normalize the same way update_user() does so a mixed-case or padded
        # address resolves to the same row.
        email = email.lower().strip()

        stmt = select(EmailUser).where(EmailUser.email == email)
        user = self.db.execute(stmt).scalar_one_or_none()

        if not user:
            raise ValueError(f"User {email} not found")

        # Resolve every team the user created before removing the user
        teams_owned_stmt = select(EmailTeam).where(EmailTeam.created_by == email)
        teams_owned = self.db.execute(teams_owned_stmt).scalars().all()

        for team in teams_owned:
            # Find other team owners who can take ownership.
            # The ordering is by role, which is constant here, so the pick
            # among several co-owners is not deterministic.
            potential_owners_stmt = (
                select(EmailTeamMember).where(EmailTeamMember.team_id == team.id, EmailTeamMember.user_email != email, EmailTeamMember.role == "owner").order_by(EmailTeamMember.role.desc())
            )
            potential_owners = self.db.execute(potential_owners_stmt).scalars().all()

            if potential_owners:
                # Transfer ownership to the first available owner
                new_owner = potential_owners[0]
                team.created_by = new_owner.user_email
                logger.info(f"Transferred team '{team.name}' ownership from {email} to {new_owner.user_email}")
                continue

            # No other owners available - check if it's a single-user team
            all_members_stmt = select(EmailTeamMember).where(EmailTeamMember.team_id == team.id)
            all_members = self.db.execute(all_members_stmt).scalars().all()

            if len(all_members) == 1 and all_members[0].user_email == email:
                # This is a single-user personal team - cascade delete it
                logger.info(f"Deleting personal team '{team.name}' (single member: {email})")
                # Delete team members first (should be just the owner)
                delete_team_members_stmt = delete(EmailTeamMember).where(EmailTeamMember.team_id == team.id)
                self.db.execute(delete_team_members_stmt)
                # Delete the team
                self.db.delete(team)
            else:
                # Multi-member team with no other owners - cannot delete user
                raise ValueError(f"Cannot delete user {email}: owns team '{team.name}' with {len(all_members)} members but no other owners to transfer ownership to")

        # Delete all role assignments for the user (best-effort)
        try:
            await self.role_service.delete_all_user_roles(email)
        except Exception as e:
            logger.warning(f"Failed to delete role assignments for {email}: {e}")

        # Delete related auth events
        auth_events_stmt = delete(EmailAuthEvent).where(EmailAuthEvent.user_email == email)
        self.db.execute(auth_events_stmt)

        # Remove user from all team memberships
        team_members_stmt = delete(EmailTeamMember).where(EmailTeamMember.user_email == email)
        self.db.execute(team_members_stmt)

        # Delete the user
        self.db.delete(user)
        self.db.commit()

        # Invalidate all auth caches for deleted user.
        # The invalidations are awaited rather than spawned with
        # create_task(): the event loop keeps only weak references to tasks,
        # so unreferenced fire-and-forget tasks may be garbage-collected
        # before they finish, and their failures would never reach the
        # handler below.
        try:
            # Standard
            import asyncio  # pylint: disable=import-outside-toplevel

            # First-Party
            from mcpgateway.cache.auth_cache import auth_cache  # pylint: disable=import-outside-toplevel

            outcomes = await asyncio.gather(
                auth_cache.invalidate_user(email),
                auth_cache.invalidate_user_teams(email),
                auth_cache.invalidate_team_membership(email),
                return_exceptions=True,
            )
            for outcome in outcomes:
                if isinstance(outcome, BaseException):
                    logger.debug(f"Failed to invalidate cache on user delete: {outcome}")
        except Exception as cache_error:
            logger.debug(f"Failed to invalidate cache on user delete: {cache_error}")

        logger.info(f"User {email} deleted permanently")
        return True

    except Exception as e:
        self.db.rollback()
        logger.error(f"Error deleting user {email}: {e}")
        raise

1360 

async def count_active_admin_users(self) -> int:
    """Return how many users are both admins and active.

    Returns:
        int: Number of active admin users (0 when the query yields nothing)
    """
    active_admin_filter = (EmailUser.is_admin.is_(True), EmailUser.is_active.is_(True))
    stmt = select(func.count(EmailUser.email)).where(*active_admin_filter)  # pylint: disable=not-callable
    admin_total = self.db.execute(stmt).scalar()
    return admin_total or 0

1370 

async def is_last_active_admin(self, email: str) -> bool:
    """Tell whether the given user is the only active admin left.

    Args:
        email: User's email address

    Returns:
        bool: True if this user is an active admin and exactly one active
        admin exists; False otherwise (including when the user is unknown).
    """
    lookup = select(EmailUser).where(EmailUser.email == email)
    user = self.db.execute(lookup).scalar_one_or_none()

    # Only an existing, active admin can be "the last one"; count admins
    # only in that case.
    if user and user.is_admin and user.is_active:
        return (await self.count_active_admin_users()) == 1

    return False