Coverage for mcpgateway / services / email_auth_service.py: 99%

727 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/email_auth_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Email Authentication Service. 

8This module provides email-based user authentication services including 

9user creation, authentication, password management, and security features. 

10 

11Examples: 

12 Basic usage (requires async context): 

13 from mcpgateway.services.email_auth_service import EmailAuthService 

14 from mcpgateway.db import SessionLocal 

15 

16 with SessionLocal() as db: 

17 service = EmailAuthService(db) 

18 # Use in async context: 

19 # user = await service.create_user("test@example.com", "password123") 

20 # authenticated = await service.authenticate_user("test@example.com", "password123") 

21""" 

22 

23# Standard 

24import asyncio 

25import base64 

26from dataclasses import dataclass 

27from datetime import datetime, timedelta, timezone 

28import hashlib 

29import hmac 

30import re 

31import secrets 

32import time 

33from typing import Optional 

34import urllib.parse 

35import warnings 

36 

37# Third-Party 

38import orjson 

39from sqlalchemy import and_, delete, desc, func, or_, select 

40from sqlalchemy.exc import IntegrityError 

41from sqlalchemy.orm import Session 

42 

43# First-Party 

44from mcpgateway.config import settings 

45from mcpgateway.db import ( 

46 EmailAuthEvent, 

47 EmailTeam, 

48 EmailTeamInvitation, 

49 EmailTeamJoinRequest, 

50 EmailTeamMember, 

51 EmailTeamMemberHistory, 

52 EmailUser, 

53 PasswordResetToken, 

54 PendingUserApproval, 

55 Role, 

56 SSOAuthSession, 

57 TokenRevocation, 

58 UserRole, 

59 utc_now, 

60) 

61from mcpgateway.schemas import PaginationLinks, PaginationMeta 

62from mcpgateway.services.argon2_service import Argon2PasswordService 

63from mcpgateway.services.email_notification_service import AuthEmailNotificationService 

64from mcpgateway.services.logging_service import LoggingService 

65from mcpgateway.services.metrics import password_reset_completions_counter, password_reset_requests_counter 

66from mcpgateway.utils.pagination import unified_paginate 

67 

# Initialize logging: module-level logger obtained through the project's LoggingService
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

# Strong references to background tasks to prevent GC before completion
_background_tasks: set[asyncio.Task] = set()

# Row cap for "get all users" style queries.
# NOTE(review): the consumer of this limit is not visible in this section — confirm usage.
_GET_ALL_USERS_LIMIT = 10000
# Fixed Argon2id hash that _verify_dummy_password_for_timing() verifies the
# supplied password against on login paths where no real password check is made
# (unknown, disabled, or locked account).
_DUMMY_ARGON2_HASH = "$argon2id$v=19$m=65536,t=3,p=1$9x/nTs9D0R97+BI7BWP2Tg$V/40qCuaGh4i+94HpGpxJESEVs3IDpLzUqtNqRPuty4"

77 

78 

@dataclass(frozen=True)
class UsersListResult:
    """Immutable container returned by list_users queries.

    Only ``data`` is mandatory; the remaining fields default to ``None``.
    """

    # Users contained in this result.
    data: list[EmailUser]
    # Optional cursor string (None when not supplied).
    next_cursor: Optional[str] = None
    # Optional pagination metadata (None when not supplied).
    pagination: Optional[PaginationMeta] = None
    # Optional pagination links (None when not supplied).
    links: Optional[PaginationLinks] = None

87 

88 

@dataclass(frozen=True)
class PasswordResetRequestResult:
    """Immutable outcome of a forgot-password request."""

    # True when the request was rejected by the reset rate limiter.
    rate_limited: bool
    # True when the notification service reported the reset email as sent.
    email_sent: bool

95 

96 

class EmailValidationError(Exception):
    """Signals that a supplied email address failed validation.

    Examples:
        >>> try:
        ...     raise EmailValidationError("Invalid email format")
        ... except EmailValidationError as e:
        ...     str(e)
        'Invalid email format'
    """

107 

108 

class PasswordValidationError(Exception):
    """Signals that a password was rejected by the password policy.

    Examples:
        >>> try:
        ...     raise PasswordValidationError("Password too short")
        ... except PasswordValidationError as e:
        ...     str(e)
        'Password too short'
    """

119 

120 

class UserExistsError(Exception):
    """Signals an attempt to create a user whose email is already registered.

    Examples:
        >>> try:
        ...     raise UserExistsError("User already exists")
        ... except UserExistsError as e:
        ...     str(e)
        'User already exists'
    """

131 

132 

class AuthenticationError(Exception):
    """Signals a failed authentication step (for example an invalid reset link).

    Examples:
        >>> try:
        ...     raise AuthenticationError("Invalid credentials")
        ... except AuthenticationError as e:
        ...     str(e)
        'Invalid credentials'
    """

143 

144 

145class EmailAuthService: 

146 """Service for email-based user authentication. 

147 

148 This service handles user registration, authentication, password management, 

149 and security features like account lockout and failed attempt tracking. 

150 

151 Attributes: 

152 db (Session): Database session 

153 password_service (Argon2PasswordService): Password hashing service 

154 

155 Examples: 

156 >>> from mcpgateway.db import SessionLocal 

157 >>> with SessionLocal() as db: 

158 ... service = EmailAuthService(db) 

159 ... # Service is ready to use 

160 """ 

161 

162 get_all_users_deprecated_warned = False 

163 

def __init__(self, db: Session):
    """Set up the service around a database session.

    Args:
        db: SQLAlchemy database session used for every query and commit.
    """
    self.db = db

    # Collaborators built eagerly: password hashing and outbound auth emails.
    self.password_service = Argon2PasswordService()
    self.email_notification_service = AuthEmailNotificationService()

    # Populated on first access by the role_service property.
    self._role_service = None

    logger.debug("EmailAuthService initialized")

175 

@property
def role_service(self):
    """Return the RoleService, creating it on first access.

    The import is performed inside the property to avoid circular imports.

    Returns:
        RoleService: Instance bound to this service's database session.
    """
    if self._role_service is not None:
        return self._role_service

    # First-Party
    from mcpgateway.services.role_service import RoleService  # pylint: disable=import-outside-toplevel

    self._role_service = RoleService(self.db)
    return self._role_service

189 

def validate_email(self, email: str) -> bool:
    """Validate email address format.

    The whole string must match the pattern; in particular an address
    followed by a trailing newline is rejected.

    Args:
        email: Email address to validate

    Returns:
        bool: True if email is valid

    Raises:
        EmailValidationError: If email is missing, not a string, malformed,
            or longer than 255 characters

    Examples:
        >>> service = EmailAuthService(None)
        >>> service.validate_email("user@example.com")
        True
        >>> service.validate_email("test.user+tag@domain.co.uk")
        True
        >>> service.validate_email("user123@test-domain.com")
        True
        >>> try:
        ...     service.validate_email("invalid-email")
        ... except EmailValidationError as e:
        ...     "Invalid email format" in str(e)
        True
        >>> try:
        ...     service.validate_email("")
        ... except EmailValidationError as e:
        ...     "Email is required" in str(e)
        True
        >>> try:
        ...     service.validate_email("user@")
        ... except EmailValidationError as e:
        ...     "Invalid email format" in str(e)
        True
        >>> try:
        ...     service.validate_email("@domain.com")
        ... except EmailValidationError as e:
        ...     "Invalid email format" in str(e)
        True
        >>> try:
        ...     service.validate_email("user@domain")
        ... except EmailValidationError as e:
        ...     "Invalid email format" in str(e)
        True
        >>> try:
        ...     service.validate_email("a" * 250 + "@domain.com")
        ... except EmailValidationError as e:
        ...     "Email address too long" in str(e)
        True
        >>> try:
        ...     service.validate_email(None)
        ... except EmailValidationError as e:
        ...     "Email is required" in str(e)
        True
        >>> try:
        ...     service.validate_email("user@example.com" + chr(10))
        ... except EmailValidationError as e:
        ...     "Invalid email format" in str(e)
        True
    """
    if not email or not isinstance(email, str):
        raise EmailValidationError("Email is required and must be a string")

    # Basic email regex pattern
    email_pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"

    # fullmatch() rather than match(): "$" also matches just before a final
    # newline, so match() would accept "user@example.com\n".
    if not re.fullmatch(email_pattern, email):
        raise EmailValidationError("Invalid email format")

    if len(email) > 255:
        raise EmailValidationError("Email address too long (max 255 characters)")

    return True

259 

def validate_password(self, password: str) -> bool:
    """Check a password against the configured password policy.

    An empty password is always rejected. When the policy is disabled via
    settings, any non-empty password is accepted. Otherwise the minimum
    length is checked first, followed by each enabled character-class rule.

    Args:
        password: Password to validate

    Returns:
        bool: True if password meets policy

    Raises:
        PasswordValidationError: If password doesn't meet requirements

    Examples:
        >>> service = EmailAuthService(None)
        >>> service.validate_password("Password123!")  # Meets all requirements
        True
        >>> service.validate_password("ValidPassword123!")
        True
        >>> service.validate_password("Shortpass!")  # 8+ chars with requirements
        True
        >>> service.validate_password("VeryLongPasswordThatMeetsMinimumRequirements!")
        True
        >>> try:
        ...     service.validate_password("")
        ... except PasswordValidationError as e:
        ...     "Password is required" in str(e)
        True
        >>> try:
        ...     service.validate_password(None)
        ... except PasswordValidationError as e:
        ...     "Password is required" in str(e)
        True
        >>> try:
        ...     service.validate_password("short")  # Only 5 chars, should fail with default min_length=8
        ... except PasswordValidationError as e:
        ...     "characters long" in str(e)
        True
    """
    if not password:
        raise PasswordValidationError("Password is required")

    # Global toggle: with the policy switched off only the emptiness check applies.
    policy_enabled = getattr(settings, "password_policy_enabled", True)
    if not policy_enabled:
        return True

    minimum_length = getattr(settings, "password_min_length", 8)

    # (rule enabled?, pattern that must be found, error message), checked in order.
    character_rules = (
        (getattr(settings, "password_require_uppercase", False), r"[A-Z]", "Password must contain at least one uppercase letter"),
        (getattr(settings, "password_require_lowercase", False), r"[a-z]", "Password must contain at least one lowercase letter"),
        (getattr(settings, "password_require_numbers", False), r"[0-9]", "Password must contain at least one number"),
        (getattr(settings, "password_require_special", False), r'[!@#$%^&*(),.?":{}|<>]', "Password must contain at least one special character"),
    )

    if len(password) < minimum_length:
        raise PasswordValidationError(f"Password must be at least {minimum_length} characters long")

    for rule_enabled, required_pattern, message in character_rules:
        if rule_enabled and not re.search(required_pattern, password):
            raise PasswordValidationError(message)

    return True

328 

329 @staticmethod 

330 def _hash_reset_token(token: str) -> str: 

331 """Hash a plaintext password-reset token using SHA-256. 

332 

333 Args: 

334 token: Plaintext reset token. 

335 

336 Returns: 

337 str: Hex-encoded SHA-256 digest. 

338 """ 

339 return hashlib.sha256(token.encode("utf-8")).hexdigest() 

340 

@staticmethod
def _minimum_reset_response_seconds() -> float:
    """Return the response-time floor for forgot-password requests.

    Reads ``password_reset_min_response_ms`` from settings (default 250),
    clamps negative values to zero and converts milliseconds to seconds.

    Returns:
        float: Minimum response duration in seconds.
    """
    configured_ms = int(getattr(settings, "password_reset_min_response_ms", 250))
    floor_ms = max(0, configured_ms)
    return floor_ms / 1000.0

350 

@staticmethod
def _minimum_login_failure_seconds() -> float:
    """Return the response-time floor for failed logins.

    Reads ``failed_login_min_response_ms`` from settings (default 250),
    clamps negative values to zero and converts milliseconds to seconds.

    Returns:
        float: Minimum failure response duration in seconds.
    """
    configured_ms = int(getattr(settings, "failed_login_min_response_ms", 250))
    floor_ms = max(0, configured_ms)
    return floor_ms / 1000.0

360 

361 async def _apply_failed_login_floor(self, start_time: float) -> None: 

362 """Apply minimum failed-login response duration. 

363 

364 Args: 

365 start_time: Monotonic timestamp when login processing started. 

366 """ 

367 remaining = self._minimum_login_failure_seconds() - (time.monotonic() - start_time) 

368 if remaining > 0: 

369 await asyncio.sleep(remaining) 

370 

async def _verify_dummy_password_for_timing(self, password: str) -> None:
    """Verify the password against a fixed dummy Argon2 hash.

    Used to reduce observable timing differences on paths that perform no
    real password check. The outcome is discarded, and any error is logged
    at debug level and suppressed.

    Args:
        password: User-supplied password candidate.
    """
    verifier = self.password_service.verify_password_async
    try:
        await verifier(password, _DUMMY_ARGON2_HASH)
    except Exception as exc:  # nosec B110
        logger.debug("Dummy password verification failed: %s", exc)

381 

@staticmethod
def _build_forgot_password_url() -> str:
    """Compose the absolute URL of the forgot-password page.

    Uses ``app_domain`` (default ``http://localhost:4444``) and
    ``app_root_path`` (default empty) from settings, each with trailing
    slashes stripped.

    Returns:
        str: Absolute forgot-password URL.
    """
    base_url = str(getattr(settings, "app_domain", "http://localhost:4444")).rstrip("/")
    path_prefix = str(getattr(settings, "app_root_path", "")).rstrip("/")
    return base_url + path_prefix + "/admin/forgot-password"

392 

@staticmethod
def _build_reset_password_url(token: str) -> str:
    """Compose the absolute reset-password URL embedding a token.

    The token is percent-encoded with no characters treated as safe before
    being appended as the final path segment.

    Args:
        token: Plaintext reset token.

    Returns:
        str: Absolute reset-password URL.
    """
    encoded_token = urllib.parse.quote(token, safe="")
    base_url = str(getattr(settings, "app_domain", "http://localhost:4444")).rstrip("/")
    path_prefix = str(getattr(settings, "app_root_path", "")).rstrip("/")
    return base_url + path_prefix + "/admin/reset-password/" + encoded_token

407 

async def _invalidate_user_auth_cache(self, email: str) -> None:
    """Drop cached authentication data for a user, best-effort.

    Waits at most five seconds for the invalidation. A timeout is logged as
    a warning and any other failure at debug level; neither is re-raised.

    Args:
        email: User email for cache invalidation.
    """
    try:
        # First-Party
        from mcpgateway.cache.auth_cache import auth_cache  # pylint: disable=import-outside-toplevel

        # shield() keeps the invalidation itself from being cancelled when
        # wait_for() gives up after the timeout.
        invalidation = asyncio.shield(auth_cache.invalidate_user(email))
        await asyncio.wait_for(invalidation, timeout=5.0)
    except asyncio.TimeoutError:
        logger.warning("Auth cache invalidation timed out for %s - continuing", email)
    except Exception as cache_error:  # nosec B110
        logger.debug("Failed to invalidate auth cache for %s: %s", email, cache_error)

423 

def _log_auth_event(
    self,
    event_type: str,
    success: bool,
    user_email: Optional[str],
    ip_address: Optional[str] = None,
    user_agent: Optional[str] = None,
    failure_reason: Optional[str] = None,
    details: Optional[dict] = None,
) -> None:
    """Store a custom authentication/security event, best-effort.

    The event is added and committed immediately. On any failure the session
    is rolled back and a warning is logged; the error is not propagated.

    Args:
        event_type: Event type identifier.
        success: Whether the event succeeded.
        user_email: Related user email, if available.
        ip_address: Source IP address.
        user_agent: Source user agent string.
        failure_reason: Failure detail when `success` is False.
        details: Additional structured event payload; stored as a JSON
            string, or as None when empty or omitted.
    """
    try:
        serialized_details = orjson.dumps(details).decode() if details else None
        self.db.add(
            EmailAuthEvent(
                user_email=user_email,
                event_type=event_type,
                success=success,
                ip_address=ip_address,
                user_agent=user_agent,
                failure_reason=failure_reason,
                details=serialized_details,
            )
        )
        self.db.commit()
    except Exception as exc:
        self.db.rollback()
        logger.warning("Failed to persist auth event %s for %s: %s", event_type, user_email, exc)

460 

def _recent_password_reset_request_count(self, email: str, now: datetime) -> int:
    """Count PASSWORD_RESET_REQUESTED events for an email in the rate window.

    The window length comes from ``password_reset_rate_window_minutes``
    in settings (default 15) and ends at ``now``.

    Args:
        email: Email to count requests for.
        now: Current UTC timestamp.

    Returns:
        int: Number of reset requests in the current rate-limit window.
    """
    minutes = int(getattr(settings, "password_reset_rate_window_minutes", 15))
    earliest = now - timedelta(minutes=minutes)

    counting_query = select(func.count(EmailAuthEvent.id))  # pylint: disable=not-callable
    counting_query = counting_query.where(EmailAuthEvent.event_type == "PASSWORD_RESET_REQUESTED")
    counting_query = counting_query.where(EmailAuthEvent.user_email == email)
    counting_query = counting_query.where(EmailAuthEvent.timestamp >= earliest)

    total = self.db.execute(counting_query).scalar()
    # A missing scalar (None) is reported as zero.
    return int(total or 0)

481 

async def get_user_by_email(self, email: str) -> Optional[EmailUser]:
    """Look up a user by email address.

    The email is lower-cased before the lookup. Any error raised during the
    lookup is logged and reported as "not found".

    Args:
        email: Email address to look up

    Returns:
        EmailUser or None if not found (or if the lookup failed)

    Examples:
        # Assuming database has user "test@example.com"
        # user = await service.get_user_by_email("test@example.com")
        # user.email if user else None  # Returns: 'test@example.com'
    """
    try:
        lookup = select(EmailUser).where(EmailUser.email == email.lower())
        return self.db.execute(lookup).scalar_one_or_none()
    except Exception as e:
        logger.error(f"Error getting user by email {email}: {e}")
    return None

504 

async def create_user(
    self,
    email: str,
    password: str,
    full_name: Optional[str] = None,
    is_admin: bool = False,
    is_active: bool = True,
    password_change_required: bool = False,
    auth_provider: str = "local",
    skip_password_validation: bool = False,
    granted_by: Optional[str] = None,
) -> EmailUser:
    """Create a new user with email authentication.

    Steps, in order: normalize and validate the inputs, reject duplicates,
    hash the password, persist the user, then (best-effort) create a
    personal team and assign default roles, and finally record a
    registration event. Failures in the personal-team and role steps are
    logged and do not abort user creation.

    Args:
        email: User's email address (primary key)
        password: Plain text password (will be hashed)
        full_name: Optional full name for display
        is_admin: Whether user has admin privileges
        is_active: Whether user account is active (default: True)
        password_change_required: Whether user must change password on next login (default: False)
        auth_provider: Authentication provider ('local', 'github', etc.)
        skip_password_validation: Skip password policy validation (for bootstrap)
        granted_by: Email of user creating this user (for role assignment audit trail).
            When set, the new user is also marked as email-verified.

    Returns:
        EmailUser: The created user object

    Raises:
        EmailValidationError: If email format is invalid
        PasswordValidationError: If password doesn't meet policy
        UserExistsError: If user already exists, or if any IntegrityError is
            raised while persisting
        Exception: Any other error raised while persisting is re-raised after
            a rollback and an attempt to record a failed registration event

    Examples:
        # user = await service.create_user(
        #     email="new@example.com",
        #     password="secure123",
        #     full_name="New User",
        #     is_active=True,
        #     password_change_required=False
        # )
        # user.email          # Returns: 'new@example.com'
        # user.full_name      # Returns: 'New User'
        # user.is_active      # Returns: True
    """
    # Normalize email: lowercase, then strip surrounding whitespace
    email = email.lower().strip()

    # Validate inputs (both validators raise on failure)
    self.validate_email(email)
    if not skip_password_validation:
        self.validate_password(password)

    # Check if user already exists
    existing_user = await self.get_user_by_email(email)
    if existing_user:
        raise UserExistsError(f"User with email {email} already exists")

    # Hash the password
    password_hash = await self.password_service.hash_password_async(password)

    # Create new user (record password change timestamp)
    user = EmailUser(
        email=email,
        password_hash=password_hash,
        full_name=full_name,
        is_admin=is_admin,
        is_active=is_active,
        password_change_required=password_change_required,
        auth_provider=auth_provider,
        password_changed_at=utc_now(),
        admin_origin="api" if is_admin else None,
    )

    # Admin-created users are implicitly email-verified (the admin vouched for them)
    if granted_by:
        user.email_verified_at = utc_now()

    try:
        # The user row is committed before the team/role steps below, so a
        # failure in those steps leaves the user in place.
        self.db.add(user)
        self.db.commit()
        self.db.refresh(user)

        logger.info(f"Created new user: {email}")

        # Create personal team first if enabled (needed for team-scoped role assignment)
        personal_team_id = None
        if getattr(settings, "auto_create_personal_teams", True):
            try:
                # Import here to avoid circular imports
                # First-Party
                from mcpgateway.services.personal_team_service import PersonalTeamService  # pylint: disable=import-outside-toplevel

                personal_team_service = PersonalTeamService(self.db)
                personal_team = await personal_team_service.create_personal_team(user)
                personal_team_id = personal_team.id  # Get team_id directly from created team
                logger.info(f"Created personal team '{personal_team.name}' (ID: {personal_team_id}) for user {email}")
            except Exception as e:
                logger.warning(f"Failed to create personal team for {email}: {e}")
                # Don't fail user creation if personal team creation fails

        # Auto-assign dual roles using RoleService (after personal team creation)
        try:
            granter = granted_by or email  # Use granted_by if provided, otherwise self-granted

            # Determine global role based on admin status
            global_role_name = settings.default_admin_role if is_admin else settings.default_user_role
            global_role = await self.role_service.get_role_by_name(global_role_name, "global")

            if global_role:
                try:
                    await self.role_service.assign_role_to_user(user_email=email, role_id=global_role.id, scope="global", scope_id=None, granted_by=granter)
                    logger.info(f"Assigned {global_role_name} role (global scope) to user {email}")
                except ValueError as e:
                    # ValueError from the assignment is logged and otherwise ignored
                    logger.warning(f"Could not assign {global_role_name} role to {email}: {e}")
            else:
                logger.warning(f"{global_role_name} role not found. User {email} created without global role.")

            # Assign team owner role with team scope (if personal team exists)
            if personal_team_id:
                team_owner_role_name = settings.default_team_owner_role
                team_owner_role = await self.role_service.get_role_by_name(team_owner_role_name, "team")

                if team_owner_role:
                    try:
                        await self.role_service.assign_role_to_user(user_email=email, role_id=team_owner_role.id, scope="team", scope_id=personal_team_id, granted_by=granter)
                        logger.info(f"Assigned {team_owner_role_name} role (team scope: {personal_team_id}) to user {email}")
                    except ValueError as e:
                        logger.warning(f"Could not assign {team_owner_role_name} role to {email}: {e}")
                else:
                    logger.warning(f"{team_owner_role_name} role not found. User {email} created without team owner role.")

        except Exception as role_error:
            logger.error(f"Failed to assign roles to user {email}: {role_error}")
            # Don't fail user creation if role assignment fails
            # User can be assigned roles manually later

        # Log registration event
        registration_event = EmailAuthEvent.create_registration_event(user_email=email, success=True)
        self.db.add(registration_event)
        self.db.commit()

        return user

    except IntegrityError as e:
        # NOTE(review): this handler covers every commit in the try block, so an
        # IntegrityError from the registration-event commit is also reported as
        # "already exists" — confirm that is intended.
        self.db.rollback()
        logger.error(f"Database error creating user {email}: {e}")
        raise UserExistsError(f"User with email {email} already exists") from e
    except Exception as e:
        self.db.rollback()
        logger.error(f"Unexpected error creating user {email}: {e}")

        # Log failed registration
        # NOTE(review): if this add/commit itself raises, that new exception
        # propagates instead of reaching the bare `raise` below.
        registration_event = EmailAuthEvent.create_registration_event(user_email=email, success=False, failure_reason=str(e))
        self.db.add(registration_event)
        self.db.commit()

        raise

663 

async def authenticate_user(self, email: str, password: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> Optional[EmailUser]:
    """Authenticate a user with email and password.

    Every failure path returns None after waiting out the minimum
    failed-login response time. Paths that perform no real password check
    (unknown, disabled or locked account) first run a dummy password
    verification. A login-attempt event is recorded in the ``finally``
    block for every outcome.

    Args:
        email: User's email address
        password: Plain text password
        ip_address: Client IP address for logging
        user_agent: Client user agent for logging

    Returns:
        EmailUser if authentication successful, None otherwise

    Examples:
        # user = await service.authenticate_user("user@example.com", "correct_password")
        # user.email if user else None  # Returns: 'user@example.com'
        # await service.authenticate_user("user@example.com", "wrong_password")  # Returns: None
    """
    email = email.lower().strip()
    # Reference point for the minimum failed-login response time
    start_time = time.monotonic()

    # Get user from database
    user = await self.get_user_by_email(email)

    # Track authentication attempt (consumed by the finally block below)
    auth_success = False
    failure_reason = None

    try:
        if not user:
            failure_reason = "User not found"
            logger.info(f"Authentication failed for {email}: user not found")
            await self._verify_dummy_password_for_timing(password)
            await self._apply_failed_login_floor(start_time)
            return None

        if not user.is_active:
            failure_reason = "Account is disabled"
            logger.info(f"Authentication failed for {email}: account disabled")
            await self._verify_dummy_password_for_timing(password)
            await self._apply_failed_login_floor(start_time)
            return None

        is_protected_admin = user.is_admin and settings.protect_all_admins

        # Enforce lockout for all accounts. Protected admins are allowed
        # to continue attempting login (feature-flagged via protect_all_admins)
        # but their failed attempts are still tracked for audit purposes.
        if user.is_account_locked() and not is_protected_admin:
            failure_reason = "Account is locked"
            logger.info(f"Authentication failed for {email}: account locked")
            await self._verify_dummy_password_for_timing(password)
            await self._apply_failed_login_floor(start_time)
            return None

        # Verify password
        if not await self.password_service.verify_password_async(password, user.password_hash):
            failure_reason = "Invalid password"

            # Always increment failed attempts — including for protected admins
            max_attempts = getattr(settings, "max_failed_login_attempts", 5)
            lockout_duration = getattr(settings, "account_lockout_duration_minutes", 30)

            is_locked = user.increment_failed_attempts(max_attempts, lockout_duration)

            if is_locked:
                logger.warning(f"Account locked for {email} after {max_attempts} failed attempts")
                failure_reason = "Account locked due to too many failed attempts"
                lockout_notifications_enabled = getattr(settings, "account_lockout_notification_enabled", True)
                # Only a real boolean True enables the notification
                if isinstance(lockout_notifications_enabled, bool) and lockout_notifications_enabled:
                    locked_until_iso = user.locked_until.isoformat() if user.locked_until else "unknown"
                    try:
                        await self.email_notification_service.send_account_lockout_email(
                            to_email=user.email,
                            full_name=user.full_name,
                            locked_until_iso=locked_until_iso,
                            reset_url=self._build_forgot_password_url(),
                        )
                    except Exception as email_exc:
                        # A failed notification must not change the login outcome
                        logger.warning("Failed to send lockout notification for %s: %s", email, email_exc)
                # Dedicated lockout audit event, in addition to the login-attempt event in `finally`
                self._log_auth_event(
                    event_type="ACCOUNT_LOCKED",
                    success=True,
                    user_email=email,
                    ip_address=ip_address,
                    user_agent=user_agent,
                    details={"locked_until": user.locked_until.isoformat() if user.locked_until else None},
                )

            # Persist the failed-attempt bookkeeping on the user
            self.db.commit()
            logger.info(f"Authentication failed for {email}: invalid password")
            await self._apply_failed_login_floor(start_time)
            return None

        # Authentication successful
        user.reset_failed_attempts()
        self.db.commit()

        auth_success = True
        logger.info(f"Authentication successful for {email}")

        return user

    finally:
        # Log authentication event (runs for every return path above, and also
        # when an exception propagates out of the try block)
        auth_event = EmailAuthEvent.create_login_attempt(user_email=email, success=auth_success, ip_address=ip_address, user_agent=user_agent, failure_reason=failure_reason)
        self.db.add(auth_event)
        self.db.commit()

771 

async def request_password_reset(self, email: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> PasswordResetRequestResult:
    """Handle a forgot-password request for an email address.

    For an existing, active user this marks any outstanding reset tokens as
    used, stores a new hashed token and attempts to send the reset email.
    Every path waits out the configured minimum response time before
    returning, and results stay generic to avoid account enumeration while
    still allowing rate-limit enforcement.

    Args:
        email: User email requesting password reset.
        ip_address: Source IP address.
        user_agent: Source user agent string.

    Returns:
        PasswordResetRequestResult: Reset request processing outcome.
    """
    started = time.monotonic()
    address = (email or "").lower().strip()
    requested_at = utc_now()
    # Result intentionally discarded.
    # NOTE(review): presumably done so every request performs a token hash — confirm.
    self._hash_reset_token(secrets.token_urlsafe(32))

    async def hold_until_floor() -> None:
        """Sleep for whatever remains of the minimum response time."""
        shortfall = self._minimum_reset_response_seconds() - (time.monotonic() - started)
        if shortfall > 0:
            await asyncio.sleep(shortfall)

    # Audit events carry None rather than an empty string.
    audit_email = address or None

    max_requests = int(getattr(settings, "password_reset_rate_limit", 5))
    throttled = False
    if address:
        throttled = bool(self._recent_password_reset_request_count(address, requested_at) >= max_requests)

    if throttled:
        password_reset_requests_counter.labels(outcome="rate_limited").inc()
        self._log_auth_event(
            event_type="PASSWORD_RESET_RATE_LIMITED",
            success=False,
            user_email=audit_email,
            ip_address=ip_address,
            user_agent=user_agent,
        )
        await hold_until_floor()
        return PasswordResetRequestResult(rate_limited=True, email_sent=False)

    account = await self.get_user_by_email(address) if address else None
    self._log_auth_event(
        event_type="PASSWORD_RESET_REQUESTED",
        success=True,
        user_email=audit_email,
        ip_address=ip_address,
        user_agent=user_agent,
    )

    # Unknown or inactive account: counted as accepted, but no token and no email.
    if not (account and account.is_active):
        password_reset_requests_counter.labels(outcome="accepted").inc()
        await hold_until_floor()
        return PasswordResetRequestResult(rate_limited=False, email_sent=False)

    plaintext = secrets.token_urlsafe(48)
    digest = self._hash_reset_token(plaintext)
    ttl_minutes = int(getattr(settings, "password_reset_token_expiry_minutes", 60))
    expiry = requested_at + timedelta(minutes=ttl_minutes)

    # Mark every still-valid token for this user as used before issuing a new one.
    outstanding = select(PasswordResetToken).where(PasswordResetToken.user_email == account.email).where(PasswordResetToken.used_at.is_(None)).where(PasswordResetToken.expires_at > requested_at)
    for stale in self.db.execute(outstanding).scalars().all():
        stale.used_at = requested_at

    # Only the hash is stored; the plaintext goes into the emailed URL.
    self.db.add(
        PasswordResetToken(
            user_email=account.email,
            token_hash=digest,
            expires_at=expiry,
            ip_address=ip_address,
            user_agent=user_agent,
        )
    )
    self.db.commit()

    delivered = False
    try:
        delivered = await self.email_notification_service.send_password_reset_email(
            to_email=account.email,
            full_name=account.full_name,
            reset_url=self._build_reset_password_url(plaintext),
            expires_minutes=ttl_minutes,
        )
    except Exception as exc:
        logger.warning("Failed to send password reset email to %s: %s", account.email, exc)

    password_reset_requests_counter.labels(outcome="accepted").inc()
    self._log_auth_event(
        event_type="PASSWORD_RESET_EMAIL_SENT",
        success=True,
        user_email=account.email,
        ip_address=ip_address,
        user_agent=user_agent,
        details={"token_hash": digest, "expires_at": expiry.isoformat(), "email_sent": delivered},
    )

    await hold_until_floor()
    return PasswordResetRequestResult(rate_limited=False, email_sent=delivered)

863 

async def validate_password_reset_token(self, token: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> PasswordResetToken:
    """Check that a plaintext reset token maps to a usable token record.

    The token is hashed with ``_hash_reset_token`` and looked up by hash.
    The matching record must also pass a constant-time hash comparison and
    must be neither used nor expired. Each rejection increments
    ``password_reset_completions_counter`` with an outcome label and logs a
    failed ``PASSWORD_RESET_ATTEMPTED`` auth event; a successful validation
    logs a successful one.

    Args:
        token: Plaintext password reset token.
        ip_address: Source IP address, recorded in auth events.
        user_agent: Source user agent string, recorded in auth events.

    Returns:
        PasswordResetToken: The stored record matching the token.

    Raises:
        AuthenticationError: If token is missing, invalid, used, or expired.
    """

    def _log_failed_attempt(account: Optional[str], reason: str) -> None:
        # Every rejection path records the same event type with its own reason.
        self._log_auth_event("PASSWORD_RESET_ATTEMPTED", False, account, ip_address, user_agent, failure_reason=reason)

    rejections = password_reset_completions_counter

    if not token:
        rejections.labels(outcome="invalid_token").inc()
        _log_failed_attempt(None, "Missing token")
        raise AuthenticationError("This reset link is invalid")

    digest = self._hash_reset_token(token)
    lookup = select(PasswordResetToken).where(PasswordResetToken.token_hash == digest)
    record = self.db.execute(lookup).scalar_one_or_none()

    if not record:
        rejections.labels(outcome="invalid_token").inc()
        _log_failed_attempt(None, "Invalid token hash")
        raise AuthenticationError("This reset link is invalid")

    # Constant-time re-check of the stored hash against the computed one.
    hashes_match = hmac.compare_digest(record.token_hash, digest)
    if not hashes_match:
        rejections.labels(outcome="invalid_token").inc()
        _log_failed_attempt(record.user_email, "Token hash mismatch")
        raise AuthenticationError("This reset link is invalid")

    if record.is_used():
        rejections.labels(outcome="used_token").inc()
        _log_failed_attempt(record.user_email, "Token already used")
        raise AuthenticationError("This reset link has already been used")

    if record.is_expired():
        rejections.labels(outcome="expired_token").inc()
        # Expiry gets its own dedicated event before the generic failed attempt.
        self._log_auth_event("PASSWORD_RESET_TOKEN_EXPIRED", False, record.user_email, ip_address, user_agent, details={"token_hash": digest})
        _log_failed_attempt(record.user_email, "Token expired")
        raise AuthenticationError("This reset link has expired")

    self._log_auth_event("PASSWORD_RESET_ATTEMPTED", True, record.user_email, ip_address, user_agent)
    return record

910 

async def reset_password_with_token(self, token: str, new_password: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> bool:
    """Complete password reset using a validated one-time token.

    Validates the token, checks the new password against policy and
    (optionally) against the current password, stores the new hash, clears
    lockout state, and marks the presented token plus every other unused
    token of the same user as used. After the commit the auth cache is
    invalidated (when ``password_reset_invalidate_sessions`` is enabled) and
    a confirmation email is attempted on a best-effort basis.

    Args:
        token: Plaintext password reset token.
        new_password: New password value.
        ip_address: Source IP address.
        user_agent: Source user agent string.

    Returns:
        bool: True when password reset completed successfully.

    Raises:
        AuthenticationError: If token or associated user is invalid.
        PasswordValidationError: If new password violates policy or reuse checks.
        Exception: If the database update fails; the session is rolled back first.
    """
    reset_token = await self.validate_password_reset_token(token, ip_address=ip_address, user_agent=user_agent)
    user = await self.get_user_by_email(reset_token.user_email)
    if not user or not user.is_active:
        password_reset_completions_counter.labels(outcome="invalid_user").inc()
        raise AuthenticationError("This reset link is invalid")

    self.validate_password(new_password)
    if getattr(settings, "password_prevent_reuse", True) and await self.password_service.verify_password_async(new_password, user.password_hash):
        password_reset_completions_counter.labels(outcome="reused_password").inc()
        raise PasswordValidationError("New password must be different from current password")

    now = utc_now()
    new_password_hash = await self.password_service.hash_password_async(new_password)

    try:
        user.password_hash = new_password_hash
        user.password_change_required = False
        user.password_changed_at = now
        user.failed_login_attempts = 0
        user.locked_until = None

        # Consume the presented token and retire every other unused token for this user.
        reset_token.used_at = now
        outstanding_stmt = select(PasswordResetToken).where(PasswordResetToken.user_email == user.email).where(PasswordResetToken.id != reset_token.id).where(PasswordResetToken.used_at.is_(None))
        for outstanding in self.db.execute(outstanding_stmt).scalars().all():
            outstanding.used_at = now

        self.db.commit()
    except Exception:
        # Leave the session usable for the caller instead of stuck in a failed
        # transaction; mirrors the rollback handling in change_password/update_user.
        self.db.rollback()
        raise

    if getattr(settings, "password_reset_invalidate_sessions", True):
        await self._invalidate_user_auth_cache(user.email)

    # Confirmation mail is best effort: the reset itself is already committed.
    email_sent = False
    try:
        email_sent = await self.email_notification_service.send_password_reset_confirmation_email(to_email=user.email, full_name=user.full_name)
    except Exception as exc:
        logger.warning("Failed to send password reset confirmation for %s: %s", user.email, exc)

    password_reset_completions_counter.labels(outcome="success").inc()
    self._log_auth_event(
        event_type="PASSWORD_RESET_COMPLETED",
        success=True,
        user_email=user.email,
        ip_address=ip_address,
        user_agent=user_agent,
        details={"email_sent": email_sent},
    )
    return True

971 

async def unlock_user_account(self, email: str, unlocked_by: Optional[str] = None, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> EmailUser:
    """Reset the lockout counters of a user and record who did it.

    The email is lower-cased and stripped before lookup. On success the
    failed-attempt counter and lock expiry are cleared, the change is
    committed, and an ``ACCOUNT_UNLOCKED`` auth event is logged.

    Args:
        email: User email to unlock.
        unlocked_by: Admin/user identifier who performed unlock.
        ip_address: Source IP address.
        user_agent: Source user agent string.

    Returns:
        EmailUser: Updated user record after unlock.

    Raises:
        ValueError: If the target user cannot be found.
    """
    lookup_email = email.lower().strip()
    account = await self.get_user_by_email(lookup_email)
    if not account:
        raise ValueError(f"User {lookup_email} not found")

    # Clear lockout state and persist before writing the audit event.
    account.failed_login_attempts = 0
    account.locked_until = None
    account.updated_at = utc_now()
    self.db.commit()

    audit_details = {"unlocked_by": unlocked_by}
    self._log_auth_event(
        event_type="ACCOUNT_UNLOCKED",
        success=True,
        user_email=account.email,
        ip_address=ip_address,
        user_agent=user_agent,
        details=audit_details,
    )
    return account

1005 

async def change_password(self, email: str, old_password: Optional[str], new_password: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> bool:
    """Replace a user's password after verifying the current one.

    The caller must supply the current password; it is verified through
    ``authenticate_user``. The new password is checked against policy and,
    when ``password_prevent_reuse`` is enabled, must differ from the current
    one. Once the update phase starts, a password-change auth event is
    always written, with ``success`` reflecting whether the commit went
    through.

    Args:
        email: User's email address
        old_password: Current password for verification
        new_password: New password to set
        ip_address: Client IP address for logging
        user_agent: Client user agent for logging

    Returns:
        bool: True if password changed successfully

    Raises:
        AuthenticationError: If old password is missing or incorrect
        PasswordValidationError: If new password doesn't meet policy
        Exception: If database operation fails

    Examples:
        # success = await service.change_password(
        #     "user@example.com",
        #     "old_password",
        #     "new_secure_password"
        # )
        # success              # Returns: True
    """
    # The current password is mandatory.
    if old_password is None:
        raise AuthenticationError("Current password is required")

    # Verify identity with the current password.
    account = await self.authenticate_user(email, old_password, ip_address, user_agent)
    if not account:
        raise AuthenticationError("Current password is incorrect")

    # Policy check on the replacement password.
    self.validate_password(new_password)

    # Optional policy: the new password may not equal the current one.
    if getattr(settings, "password_prevent_reuse", True) and await self.password_service.verify_password_async(new_password, account.password_hash):
        raise PasswordValidationError("New password must be different from current password")

    changed = False
    try:
        account.password_hash = await self.password_service.hash_password_async(new_password)
        # The user no longer has to change the password at next login.
        account.password_change_required = False
        # Timestamp the change; a failure here is logged and tolerated.
        try:
            account.password_changed_at = utc_now()
        except Exception as exc:
            logger.debug("Failed to set password_changed_at for %s: %s", email, exc)

        self.db.commit()
        changed = True

        # Cached auth data is stale now; invalidation is best effort.
        try:
            await self._invalidate_user_auth_cache(email)
        except Exception as cache_error:  # nosec B110 - best effort cache invalidation
            logger.debug("Failed to invalidate auth cache on password change: %s", cache_error)

        logger.info(f"Password changed successfully for {email}")

    except Exception as e:
        self.db.rollback()
        logger.error(f"Error changing password for {email}: {e}")
        raise
    finally:
        # Audit trail entry, written for both outcomes.
        audit_event = EmailAuthEvent.create_password_change_event(user_email=email, success=changed, ip_address=ip_address, user_agent=user_agent)
        self.db.add(audit_event)
        self.db.commit()

    return changed

1083 

async def create_platform_admin(self, email: str, password: str, full_name: Optional[str] = None) -> EmailUser:
    """Ensure the platform administrator account exists and is current.

    Used during system bootstrap with values taken from environment
    variables. If no user with this email exists, one is created as a local
    admin without password policy validation. Otherwise the existing user
    gets its name and password refreshed where they differ and is forced to
    be an active admin.

    Args:
        email: Admin email address
        password: Admin password
        full_name: Admin full name

    Returns:
        EmailUser: The admin user

    Examples:
        # admin = await service.create_platform_admin(
        #     "admin@example.com",
        #     "admin_password",
        #     "Platform Administrator"
        # )
        # admin.is_admin       # Returns: True
    """
    admin = await self.get_user_by_email(email)

    if not admin:
        # First boot: create the account, skipping password validation during bootstrap.
        created = await self.create_user(email=email, password=password, full_name=full_name, is_admin=True, auth_provider="local", skip_password_validation=True)
        logger.info(f"Created platform admin user: {email}")
        return created

    # Existing account: refresh the display name when a different one is supplied.
    if full_name and admin.full_name != full_name:
        admin.full_name = full_name

    # Re-hash only when the configured password no longer verifies.
    password_is_current = await self.password_service.verify_password_async(password, admin.password_hash)
    if not password_is_current:
        admin.password_hash = await self.password_service.hash_password_async(password)
        try:
            admin.password_changed_at = utc_now()
        except Exception as exc:
            logger.debug("Failed to set password_changed_at for existing admin %s: %s", email, exc)

    # The bootstrap admin is always an active administrator.
    admin.is_admin = True
    admin.is_active = True

    self.db.commit()
    logger.info(f"Updated platform admin user: {email}")
    return admin

1135 

async def update_last_login(self, email: str) -> None:
    """Record a successful login for the given user, if the user exists.

    Args:
        email: User's email address
    """
    account = await self.get_user_by_email(email)
    if not account:
        # Unknown user: nothing to update, nothing to commit.
        return

    account.reset_failed_attempts()  # This also updates last_login
    self.db.commit()

1146 

1147 @staticmethod 

1148 def _escape_like(value: str) -> str: 

1149 """Escape LIKE wildcards for prefix search. 

1150 

1151 Args: 

1152 value: Raw value to escape for LIKE matching. 

1153 

1154 Returns: 

1155 Escaped string safe for LIKE patterns. 

1156 """ 

1157 return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_") 

1158 

async def list_users(
    self,
    limit: Optional[int] = None,
    cursor: Optional[str] = None,
    page: Optional[int] = None,
    per_page: Optional[int] = None,
    search: Optional[str] = None,
) -> UsersListResult:
    """List all users with cursor or page-based pagination support and optional search.

    This method supports both cursor-based (for API endpoints with large datasets)
    and page-based (for admin UI with page numbers) pagination, with optional
    prefix search on email or full name.

    Note: This method returns ORM objects and cannot be cached since callers
    depend on ORM attributes and methods (e.g., EmailUserResponse.from_email_user).

    Any error while querying is logged and turned into an empty result that
    matches the requested pagination mode. A cursor that cannot be decoded
    (bad base64, bad JSON, or JSON that is not an object) is ignored with a
    warning and listing starts from the beginning.

    Args:
        limit: Maximum number of users to return (for cursor-based pagination).
            ``0`` disables the limit; ``None`` or a negative value uses
            ``settings.pagination_default_page_size``.
        cursor: Opaque cursor token for cursor-based pagination
        page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor.
        per_page: Items per page for page-based pagination
        search: Optional search term; matches the start of email or full name (case-insensitive)

    Returns:
        UsersListResult with data and optional pagination metadata.

    Examples:
        # Cursor-based pagination (for APIs)
        # result = await service.list_users(cursor=None, limit=50)
        # len(result.data) <= 50  # Returns: True

        # Page-based pagination (for admin UI)
        # result = await service.list_users(page=1, per_page=10)
        # result.data  # Returns: list of users
        # result.pagination  # Returns: pagination metadata

        # Search users
        # result = await service.list_users(search="john", page=1, per_page=10)
        # Users whose email or name starts with "john"
    """
    try:
        # Build base query with ordering by created_at, email for consistent pagination
        # Note: EmailUser uses email as primary key, not id
        query = select(EmailUser).order_by(desc(EmailUser.created_at), desc(EmailUser.email))

        # Apply search filter if provided (prefix search for better index usage)
        if search and search.strip():
            search_term = f"{self._escape_like(search.strip())}%"
            # NOTE: For large Postgres datasets, consider citext or functional indexes for case-insensitive search.
            query = query.where(
                or_(
                    EmailUser.email.ilike(search_term, escape="\\"),
                    EmailUser.full_name.ilike(search_term, escape="\\"),
                )
            )

        # Page-based pagination: use unified_paginate
        if page is not None:
            pag_result = await unified_paginate(
                db=self.db,
                query=query,
                page=page,
                per_page=per_page,
                cursor=None,
                limit=None,
                base_url="/admin/users",
                query_params={},
            )
            return UsersListResult(data=pag_result["data"], pagination=pag_result["pagination"], links=pag_result["links"])

        # Cursor-based pagination: custom implementation for EmailUser
        # EmailUser uses email as PK (not id), so we need custom cursor using (created_at, email)
        page_size = limit if limit and limit > 0 else settings.pagination_default_page_size
        if limit == 0:
            page_size = None  # No limit

        # Decode cursor and apply keyset filter if provided
        if cursor:
            try:
                cursor_json = base64.urlsafe_b64decode(cursor.encode()).decode()
                cursor_data = orjson.loads(cursor_json)
                # Valid JSON that is not an object (e.g. a list or number) has no
                # .get(); treat it as an invalid cursor instead of letting an
                # AttributeError abort the whole listing.
                if not isinstance(cursor_data, dict):
                    raise ValueError("cursor payload is not a JSON object")
                last_email = cursor_data.get("email")
                created_str = cursor_data.get("created_at")
                if last_email and created_str:
                    last_created = datetime.fromisoformat(created_str)
                    # Apply keyset filter (assumes DESC order on created_at, email)
                    query = query.where(
                        or_(
                            EmailUser.created_at < last_created,
                            and_(EmailUser.created_at == last_created, EmailUser.email < last_email),
                        )
                    )
            except (ValueError, TypeError) as e:
                logger.warning(f"Invalid cursor for user pagination, ignoring: {e}")

        # Fetch page_size + 1 to determine if there are more results
        if page_size is not None:
            query = query.limit(page_size + 1)
        result = self.db.execute(query)
        users = list(result.scalars().all())

        if page_size is None:
            return UsersListResult(data=users, next_cursor=None)

        # Check if there are more results
        has_more = len(users) > page_size
        if has_more:
            users = users[:page_size]

        # Generate next cursor using (created_at, email) for EmailUser
        next_cursor = None
        if has_more and users:
            last_user = users[-1]
            cursor_data = {
                "created_at": last_user.created_at.isoformat() if last_user.created_at else None,
                "email": last_user.email,
            }
            next_cursor = base64.urlsafe_b64encode(orjson.dumps(cursor_data)).decode()

        return UsersListResult(data=users, next_cursor=next_cursor)

    except Exception as e:
        logger.error(f"Error listing users: {e}")
        # Return appropriate empty response based on pagination mode
        if page is not None:
            fallback_per_page = per_page or 50
            return UsersListResult(
                data=[],
                pagination=PaginationMeta(page=page, per_page=fallback_per_page, total_items=0, total_pages=0, has_next=False, has_prev=False),
                links=PaginationLinks(  # pylint: disable=kwarg-superseded-by-positional-arg
                    self=f"/admin/users?page=1&per_page={fallback_per_page}",
                    first=f"/admin/users?page=1&per_page={fallback_per_page}",
                    last=f"/admin/users?page=1&per_page={fallback_per_page}",
                ),
            )

        if cursor is not None:
            return UsersListResult(data=[], next_cursor=None)

        return UsersListResult(data=[])

1300 

async def list_users_not_in_team(
    self,
    team_id: str,
    cursor: Optional[str] = None,
    limit: Optional[int] = None,
    page: Optional[int] = None,
    per_page: Optional[int] = None,
    search: Optional[str] = None,
) -> UsersListResult:
    """List users who are NOT members of the specified team with cursor or page-based pagination.

    Only active users are returned, and only active memberships count as
    "member"; exclusion is done with a NOT IN subquery.

    Any error while querying is logged and turned into an empty result that
    matches the requested pagination mode. A cursor that cannot be decoded
    (bad base64, bad JSON, or JSON that is not an object) is ignored with a
    warning and listing starts from the beginning.

    Args:
        team_id: ID of the team to exclude members from
        cursor: Opaque cursor token for cursor-based pagination
        limit: Maximum number of users to return (for cursor-based, default: 50).
            ``None``, ``0`` and negative values fall back to the default.
        page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor.
        per_page: Items per page for page-based pagination (default: 30)
        search: Optional search term; matches the start of email or full name

    Returns:
        UsersListResult with data and either cursor or pagination metadata

    Examples:
        # Page-based (admin UI)
        # result = await service.list_users_not_in_team("team-123", page=1, per_page=30)
        # result.pagination  # Returns: pagination metadata

        # Cursor-based (API)
        # result = await service.list_users_not_in_team("team-123", cursor=None, limit=50)
        # result.next_cursor  # Returns: next cursor token
    """
    try:
        # Build base query
        query = select(EmailUser)

        # Apply search filter if provided
        if search and search.strip():
            search_term = f"{self._escape_like(search.strip())}%"
            query = query.where(
                or_(
                    EmailUser.email.ilike(search_term, escape="\\"),
                    EmailUser.full_name.ilike(search_term, escape="\\"),
                )
            )

        # Exclude team members using NOT IN subquery
        member_emails_subquery = select(EmailTeamMember.user_email).where(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True))
        query = query.where(EmailUser.is_active.is_(True), ~EmailUser.email.in_(member_emails_subquery))

        # PAGE-BASED PAGINATION (Admin UI) - use unified_paginate
        if page is not None:
            query = query.order_by(EmailUser.full_name, EmailUser.email)
            pag_result = await unified_paginate(
                db=self.db,
                query=query,
                page=page,
                per_page=per_page or 30,
                cursor=None,
                limit=None,
                base_url=f"/admin/teams/{team_id}/non-members",
                query_params={},
            )
            return UsersListResult(data=pag_result["data"], pagination=pag_result["pagination"], links=pag_result["links"])

        # CURSOR-BASED PAGINATION - custom implementation using (created_at, email)
        # unified_paginate uses (created_at, id) but EmailUser uses email as PK
        query = query.order_by(desc(EmailUser.created_at), desc(EmailUser.email))

        # Decode cursor and apply keyset filter
        if cursor:
            try:
                cursor_json = base64.urlsafe_b64decode(cursor.encode()).decode()
                cursor_data = orjson.loads(cursor_json)
                # Valid JSON that is not an object (e.g. a list or number) has no
                # .get(); treat it as an invalid cursor instead of letting an
                # AttributeError abort the whole listing.
                if not isinstance(cursor_data, dict):
                    raise ValueError("cursor payload is not a JSON object")
                last_email = cursor_data.get("email")
                created_str = cursor_data.get("created_at")
                if last_email and created_str:
                    last_created = datetime.fromisoformat(created_str)
                    # Keyset filter: (created_at < last) OR (created_at = last AND email < last_email)
                    query = query.where(
                        or_(
                            EmailUser.created_at < last_created,
                            and_(EmailUser.created_at == last_created, EmailUser.email < last_email),
                        )
                    )
            except (ValueError, TypeError) as e:
                logger.warning(f"Invalid cursor for non-members list, ignoring: {e}")

        # Fetch limit + 1 to check for more results. Non-positive limits use the
        # default so a negative value never reaches the SQL LIMIT clause.
        page_size = limit if limit and limit > 0 else 50
        query = query.limit(page_size + 1)
        users = list(self.db.execute(query).scalars().all())

        # Check if there are more results
        has_more = len(users) > page_size
        if has_more:
            users = users[:page_size]

        # Generate next cursor using (created_at, email)
        next_cursor = None
        if has_more and users:
            last_user = users[-1]
            cursor_data = {
                "created_at": last_user.created_at.isoformat() if last_user.created_at else None,
                "email": last_user.email,
            }
            next_cursor = base64.urlsafe_b64encode(orjson.dumps(cursor_data)).decode()

        self.db.commit()
        return UsersListResult(data=users, next_cursor=next_cursor)

    except Exception as e:
        logger.error(f"Error listing non-members for team {team_id}: {e}")

        # Return appropriate empty response based on mode
        if page is not None:
            return UsersListResult(
                data=[],
                pagination=PaginationMeta(page=page, per_page=per_page or 30, total_items=0, total_pages=0, has_next=False, has_prev=False),
                links=PaginationLinks(  # pylint: disable=kwarg-superseded-by-positional-arg
                    self=f"/admin/teams/{team_id}/non-members?page=1&per_page={per_page or 30}",
                    first=f"/admin/teams/{team_id}/non-members?page=1&per_page={per_page or 30}",
                    last=f"/admin/teams/{team_id}/non-members?page=1&per_page={per_page or 30}",
                ),
            )

        return UsersListResult(data=[], next_cursor=None)

1429 

async def get_all_users(self) -> list[EmailUser]:
    """Return every user in one list (deprecated, capped at 10,000).

    .. deprecated:: 1.0
        Use :meth:`list_users` with proper pagination instead.
        This method refuses to run when more than 10,000 users exist. For
        production systems with many users, use paginated access with
        search/filtering.

    A ``DeprecationWarning`` is emitted the first time the method is called;
    the class-level flag ``get_all_users_deprecated_warned`` suppresses
    repeats.

    Returns:
        List of up to 10,000 EmailUser objects

    Raises:
        ValueError: If total users exceed 10,000

    Examples:
        # users = await service.get_all_users()
        # isinstance(users, list)  # Returns: True

    Warning:
        This method is deprecated and will be removed in a future version.
        Use list_users() with pagination instead:

        # For small datasets
        users = (await service.list_users(page=1, per_page=1000)).data

        # For searching
        users = (await service.list_users(search="john", page=1, per_page=10)).data
    """
    service_cls = self.__class__
    if not service_cls.get_all_users_deprecated_warned:
        warnings.warn(
            "get_all_users() is deprecated and limited to 10,000 users. Use list_users() with pagination instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        service_cls.get_all_users_deprecated_warned = True

    user_count = await self.count_users()
    if user_count > _GET_ALL_USERS_LIMIT:
        raise ValueError("get_all_users() supports up to 10,000 users. Use list_users() pagination instead.")

    # Large limit to get all users in a single cursor-mode page.
    listing = await self.list_users(limit=_GET_ALL_USERS_LIMIT)
    return listing.data

1473 

async def count_users(self) -> int:
    """Return how many user rows exist.

    Query errors are logged and reported as a count of zero.

    Returns:
        int: Total user count
    """
    try:
        count_query = select(func.count(EmailUser.email))  # pylint: disable=not-callable
        total = self.db.execute(count_query).scalar()
        return total or 0
    except Exception as e:
        logger.error(f"Error counting users: {e}")
        return 0

1487 

async def get_auth_events(self, email: Optional[str] = None, limit: int = 100, offset: int = 0) -> list[EmailAuthEvent]:
    """Fetch authentication audit events, newest first.

    Query errors are logged and reported as an empty list.

    Args:
        email: Filter by specific user email (optional)
        limit: Maximum number of events to return
        offset: Number of events to skip

    Returns:
        List of EmailAuthEvent objects
    """
    try:
        query = select(EmailAuthEvent)
        if email:
            query = query.where(EmailAuthEvent.user_email == email)

        newest_first = EmailAuthEvent.timestamp.desc()
        query = query.order_by(newest_first).offset(offset).limit(limit)

        rows = self.db.execute(query).scalars().all()
        return list(rows)
    except Exception as e:
        logger.error(f"Error getting auth events: {e}")
        return []

1511 

async def update_user(
    self,
    email: str,
    full_name: Optional[str] = None,
    is_admin: Optional[bool] = None,
    is_active: Optional[bool] = None,
    email_verified: Optional[bool] = None,
    password_change_required: Optional[bool] = None,
    password: Optional[str] = None,
    admin_origin_source: Optional[str] = None,
) -> EmailUser:
    """Update user information.

    Only arguments that are not ``None`` are applied. When ``is_admin``
    actually changes, the user's global role assignment is synced with it
    (failures there are logged and do not abort the update). A new password
    is validated against policy before any field or role is modified, so a
    rejected password leaves the user untouched.

    Args:
        email: User's email address (primary key)
        full_name: New full name (optional)
        is_admin: New admin status (optional)
        is_active: New active status (optional)
        email_verified: Set email verification status (optional)
        password_change_required: Whether user must change password on next login (optional)
        password: New password (optional, will be hashed)
        admin_origin_source: Source of admin change for tracking (e.g. "api", "ui"). Callers should pass explicitly.

    Returns:
        EmailUser: Updated user object

    Raises:
        ValueError: If user doesn't exist, if protect_all_admins blocks the change, or if it would remove the last active admin
        PasswordValidationError: If password doesn't meet policy
    """
    try:
        # Normalize email to match create_user() / get_user_by_email() behavior
        email = email.lower().strip()

        # Get existing user
        stmt = select(EmailUser).where(EmailUser.email == email)
        result = self.db.execute(stmt)
        user = result.scalar_one_or_none()

        if not user:
            raise ValueError(f"User {email} not found")

        # Admin protection guard
        if user.is_admin and user.is_active:
            would_lose_admin = (is_admin is not None and not is_admin) or (is_active is not None and not is_active)
            if would_lose_admin:
                if settings.protect_all_admins:
                    raise ValueError("Admin protection is enabled — cannot demote or deactivate any admin user")
                if await self.is_last_active_admin(email):
                    raise ValueError("Cannot demote or deactivate the last remaining active admin user")

        # Reject a bad password up front. The role sync below goes through
        # role_service, whose changes may not be undone by the rollback in the
        # except clause (NOTE(review): confirm whether role_service commits).
        if password is not None:
            self.validate_password(password)

        # Update fields if provided
        if full_name is not None:
            user.full_name = full_name

        if email_verified is not None:
            user.email_verified_at = utc_now() if email_verified else None

        if is_admin is not None:
            # Track admin_origin when status actually changes
            if is_admin != user.is_admin:
                user.is_admin = is_admin
                user.admin_origin = admin_origin_source if is_admin else None

                # Sync global role assignment with is_admin flag:
                # Promotion: revoke default_user_role, assign default_admin_role
                # Demotion: revoke default_admin_role, assign default_user_role
                try:
                    admin_role_name = settings.default_admin_role
                    user_role_name = settings.default_user_role
                    admin_role = await self.role_service.get_role_by_name(admin_role_name, "global")
                    user_role = await self.role_service.get_role_by_name(user_role_name, "global")

                    if is_admin:
                        # Promotion: assign admin role, revoke user role
                        if admin_role:
                            existing = await self.role_service.get_user_role_assignment(user_email=email, role_id=admin_role.id, scope="global", scope_id=None)
                            if not existing or not existing.is_active:
                                await self.role_service.assign_role_to_user(user_email=email, role_id=admin_role.id, scope="global", scope_id=None, granted_by=email)
                                logger.info(f"Assigned {admin_role_name} role to {email}")
                        else:
                            logger.warning(f"{admin_role_name} role not found, cannot assign to {email}")

                        if user_role:
                            revoked = await self.role_service.revoke_role_from_user(user_email=email, role_id=user_role.id, scope="global", scope_id=None)
                            if revoked:
                                logger.info(f"Revoked {user_role_name} role from {email}")
                    else:
                        # Demotion: revoke admin role, assign user role
                        if admin_role:
                            revoked = await self.role_service.revoke_role_from_user(user_email=email, role_id=admin_role.id, scope="global", scope_id=None)
                            if revoked:
                                logger.info(f"Revoked {admin_role_name} role from {email}")

                        if user_role:
                            existing = await self.role_service.get_user_role_assignment(user_email=email, role_id=user_role.id, scope="global", scope_id=None)
                            if not existing or not existing.is_active:
                                await self.role_service.assign_role_to_user(user_email=email, role_id=user_role.id, scope="global", scope_id=None, granted_by=email)
                                logger.info(f"Assigned {user_role_name} role to {email}")
                        else:
                            logger.warning(f"{user_role_name} role not found, cannot assign to {email}")

                except Exception as e:
                    logger.warning(f"Failed to sync global roles for {email}: {e}")
                    # Don't fail user update if role sync fails

        if is_active is not None:
            user.is_active = is_active

        if password is not None:
            # Already validated above; only hashing and bookkeeping remain.
            user.password_hash = await self.password_service.hash_password_async(password)
            # Only clear password_change_required if it wasn't explicitly set
            if password_change_required is None:
                user.password_change_required = False
            user.password_changed_at = utc_now()

        # Set password_change_required after password processing to allow explicit override
        if password_change_required is not None:
            user.password_change_required = password_change_required

        user.updated_at = datetime.now(timezone.utc)

        self.db.commit()

        return user

    except Exception as e:
        self.db.rollback()
        logger.error(f"Error updating user {email}: {e}")
        raise

1643 

async def activate_user(self, email: str) -> EmailUser:
    """Mark the account identified by ``email`` as active.

    Args:
        email: User's email address

    Returns:
        EmailUser: The user record after ``is_active`` was set to True

    Raises:
        ValueError: If no user with this email exists
    """
    try:
        lookup = select(EmailUser).where(EmailUser.email == email)
        account = self.db.execute(lookup).scalar_one_or_none()
        if not account:
            raise ValueError(f"User {email} not found")

        # Flip the flag and stamp the modification time in the same commit.
        account.updated_at = datetime.now(timezone.utc)
        account.is_active = True
        self.db.commit()

        logger.info(f"User {email} activated")
        return account
    except Exception as e:
        # Any failure (including the not-found ValueError) discards pending
        # changes before the exception is handed back to the caller.
        self.db.rollback()
        logger.error(f"Error activating user {email}: {e}")
        raise

1676 

async def deactivate_user(self, email: str) -> EmailUser:
    """Mark the account identified by ``email`` as inactive.

    Args:
        email: User's email address

    Returns:
        EmailUser: The user record after ``is_active`` was set to False

    Raises:
        ValueError: If no user with this email exists
    """
    # NOTE(review): unlike delete_user, this method does not invalidate any
    # auth caches - confirm that callers (or cache TTLs) cover that.
    try:
        lookup = select(EmailUser).where(EmailUser.email == email)
        account = self.db.execute(lookup).scalar_one_or_none()
        if not account:
            raise ValueError(f"User {email} not found")

        # Clear the flag and stamp the modification time in the same commit.
        account.updated_at = datetime.now(timezone.utc)
        account.is_active = False
        self.db.commit()

        logger.info(f"User {email} deactivated")
        return account
    except Exception as e:
        # Any failure (including the not-found ValueError) discards pending
        # changes before the exception is handed back to the caller.
        self.db.rollback()
        logger.error(f"Error deactivating user {email}: {e}")
        raise

1709 

async def delete_user(self, email: str) -> bool:
    """Delete a user account permanently.

    The work happens in this order, inside one transaction that is rolled
    back if any step raises:

    1. Teams whose ``created_by`` is this user are handed to another owner,
       or deleted when the user is the only member.
    2. The user's role assignments are removed (best-effort: a failure is
       logged as a warning and does not abort the deletion).
    3. References to the user in other tables are reassigned, nulled or
       deleted so the user row can be removed.
    4. The user row is deleted and the transaction committed.
    5. Auth-cache invalidation is scheduled as background tasks
       (best-effort: a failure is logged at debug level).

    Args:
        email: User's email address

    Returns:
        bool: True if user was deleted

    Raises:
        ValueError: If user doesn't exist
        ValueError: If user owns teams that cannot be transferred
    """
    try:
        stmt = select(EmailUser).where(EmailUser.email == email)
        user = self.db.execute(stmt).scalar_one_or_none()

        if not user:
            raise ValueError(f"User {email} not found")

        self._delete_user_release_owned_teams(email)

        # Delete all role assignments for the user
        try:
            await self.role_service.delete_all_user_roles(email)
        except Exception as e:
            logger.warning(f"Failed to delete role assignments for {email}: {e}")

        self._delete_user_detach_references(email)

        # Delete the user
        self.db.delete(user)
        self.db.commit()

        self._delete_user_schedule_cache_invalidation(email)

        logger.info(f"User {email} deleted permanently")
        return True

    except Exception as e:
        self.db.rollback()
        logger.error(f"Error deleting user {email}: {e}")
        raise

def _delete_user_release_owned_teams(self, email: str) -> None:
    """Transfer or delete every team whose ``created_by`` is ``email``.

    Args:
        email: Email of the user about to be deleted

    Raises:
        ValueError: If a team has other members but no other owner
    """
    teams_owned_stmt = select(EmailTeam).where(EmailTeam.created_by == email)
    teams_owned = self.db.execute(teams_owned_stmt).scalars().all()

    for team in teams_owned:
        # Find other team owners who can take ownership. The ORDER BY is on
        # role, which is constant under this filter, so which co-owner comes
        # first is left to the database.
        potential_owners_stmt = (
            select(EmailTeamMember)
            .where(EmailTeamMember.team_id == team.id, EmailTeamMember.user_email != email, EmailTeamMember.role == "owner")
            .order_by(EmailTeamMember.role.desc())
        )
        potential_owners = self.db.execute(potential_owners_stmt).scalars().all()

        if potential_owners:
            # Transfer ownership to the first available owner
            new_owner = potential_owners[0]
            team.created_by = new_owner.user_email
            logger.info(f"Transferred team '{team.name}' ownership from {email} to {new_owner.user_email}")
            continue

        # No other owners available - check if it's a single-user team
        all_members_stmt = select(EmailTeamMember).where(EmailTeamMember.team_id == team.id)
        all_members = self.db.execute(all_members_stmt).scalars().all()

        if len(all_members) == 1 and all_members[0].user_email == email:
            # This is a single-user personal team - cascade delete it
            logger.info(f"Deleting personal team '{team.name}' (single member: {email})")
            # Delete team members first (should be just the owner)
            self.db.execute(delete(EmailTeamMember).where(EmailTeamMember.team_id == team.id))
            # Delete the team
            self.db.delete(team)
        else:
            # Multi-member team with no other owners - cannot delete user
            raise ValueError(f"Cannot delete user {email}: owns team '{team.name}' with {len(all_members)} members but no other owners to transfer ownership to")

def _delete_user_detach_references(self, email: str) -> None:
    """Reassign, null out or delete rows in other tables that reference ``email``.

    Args:
        email: Email of the user about to be deleted
    """
    # Reassign non-null audit FKs to another user so deleting this user does not
    # break referential integrity for historical records. Admins are preferred,
    # then the earliest-created account.
    replacement_row = self.db.query(EmailUser.email).filter(EmailUser.email != email).order_by(EmailUser.is_admin.desc(), EmailUser.created_at.asc()).first()
    replacement_email = replacement_row[0] if replacement_row else None

    # When no other user exists the reassignment is skipped entirely.
    if replacement_email:
        self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.invited_by == email).update({EmailTeamInvitation.invited_by: replacement_email}, synchronize_session=False)
        self.db.query(Role).filter(Role.created_by == email).update({Role.created_by: replacement_email}, synchronize_session=False)
        self.db.query(UserRole).filter(UserRole.granted_by == email).update({UserRole.granted_by: replacement_email}, synchronize_session=False)
        self.db.query(TokenRevocation).filter(TokenRevocation.revoked_by == email).update({TokenRevocation.revoked_by: replacement_email}, synchronize_session=False)

    # Nullify nullable actor references.
    self.db.query(EmailTeamMember).filter(EmailTeamMember.invited_by == email).update({EmailTeamMember.invited_by: None}, synchronize_session=False)
    self.db.query(EmailTeamMemberHistory).filter(EmailTeamMemberHistory.action_by == email).update({EmailTeamMemberHistory.action_by: None}, synchronize_session=False)
    self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.reviewed_by == email).update({EmailTeamJoinRequest.reviewed_by: None}, synchronize_session=False)
    self.db.query(PendingUserApproval).filter(PendingUserApproval.approved_by == email).update({PendingUserApproval.approved_by: None}, synchronize_session=False)
    self.db.query(SSOAuthSession).filter(SSOAuthSession.user_email == email).update({SSOAuthSession.user_email: None}, synchronize_session=False)

    # Remove rows where this user is the primary subject.
    self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.user_email == email).delete(synchronize_session=False)

    # Delete related auth events
    self.db.execute(delete(EmailAuthEvent).where(EmailAuthEvent.user_email == email))

    # Remove user from all team memberships
    self.db.execute(delete(EmailTeamMember).where(EmailTeamMember.user_email == email))

def _delete_user_schedule_cache_invalidation(self, email: str) -> None:
    """Schedule background invalidation of the auth caches for ``email``.

    Must be called while an event loop is running, because it uses
    ``asyncio.create_task``. Failures are logged at debug level only.

    Args:
        email: Email of the user that was deleted
    """
    try:
        # First-Party
        from mcpgateway.cache.auth_cache import auth_cache  # pylint: disable=import-outside-toplevel

        # Each coroutine is created right before it is scheduled, so an error
        # partway through cannot leave an already-created coroutine unawaited.
        for invalidate in (auth_cache.invalidate_user, auth_cache.invalidate_user_teams, auth_cache.invalidate_team_membership):
            task = asyncio.create_task(invalidate(email))
            # Hold a reference until the task finishes, then drop it.
            _background_tasks.add(task)
            task.add_done_callback(_background_tasks.discard)
    except Exception as cache_error:
        logger.debug(f"Failed to invalidate cache on user delete: {cache_error}")

1825 

async def count_active_admin_users(self) -> int:
    """Return how many users are both admin and active.

    Returns:
        int: Count of rows with ``is_admin`` and ``is_active`` true; 0 when
        the query yields no value
    """
    admin_and_active = (EmailUser.is_admin.is_(True), EmailUser.is_active.is_(True))
    query = select(func.count(EmailUser.email)).where(*admin_and_active)  # pylint: disable=not-callable
    total = self.db.execute(query).scalar()
    return total if total else 0

1835 

async def is_last_active_admin(self, email: str) -> bool:
    """Tell whether ``email`` belongs to the only remaining active admin.

    Args:
        email: User's email address

    Returns:
        bool: True when the user exists, is an active admin, and the total
        number of active admins is exactly one; False otherwise
    """
    candidate = self.db.execute(select(EmailUser).where(EmailUser.email == email)).scalar_one_or_none()

    # Unknown users, non-admins and inactive accounts can never be "the last admin".
    if not (candidate and candidate.is_admin and candidate.is_active):
        return False

    # The candidate is counted too, so a total of one means nobody else is left.
    return await self.count_active_admin_users() == 1