Coverage for mcpgateway / services / email_auth_service.py: 99%
765 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/email_auth_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Email Authentication Service.
8This module provides email-based user authentication services including
9user creation, authentication, password management, and security features.
11Examples:
12 Basic usage (requires async context):
13 from mcpgateway.services.email_auth_service import EmailAuthService
14 from mcpgateway.db import SessionLocal
16 with SessionLocal() as db:
17 service = EmailAuthService(db)
18 # Use in async context:
19 # user = await service.create_user("test@example.com", "password123")
20 # authenticated = await service.authenticate_user("test@example.com", "password123")
21"""
23# Standard
24import asyncio
25import base64
26from dataclasses import dataclass
27from datetime import datetime, timedelta, timezone
28import hashlib
29import hmac
30import re
31import secrets
32import time
33from typing import Optional
34import urllib.parse
35import warnings
37# Third-Party
38import orjson
39from sqlalchemy import and_, delete, desc, func, or_, select
40from sqlalchemy.exc import IntegrityError
41from sqlalchemy.orm import Session
43# First-Party
44from mcpgateway.common.validators import SecurityValidator
45from mcpgateway.config import settings
46from mcpgateway.db import (
47 EmailAuthEvent,
48 EmailTeam,
49 EmailTeamInvitation,
50 EmailTeamJoinRequest,
51 EmailTeamMember,
52 EmailTeamMemberHistory,
53 EmailUser,
54 PasswordResetToken,
55 PendingUserApproval,
56 Role,
57 SSOAuthSession,
58 TokenRevocation,
59 UserRole,
60 utc_now,
61)
62from mcpgateway.schemas import PaginationLinks, PaginationMeta
63from mcpgateway.services.argon2_service import Argon2PasswordService
64from mcpgateway.services.email_notification_service import AuthEmailNotificationService
65from mcpgateway.services.logging_service import LoggingService
66from mcpgateway.services.metrics import password_reset_completions_counter, password_reset_requests_counter
67from mcpgateway.utils.pagination import unified_paginate
69# Initialize logging
70logging_service = LoggingService()
71logger = logging_service.get_logger(__name__)
73_GET_ALL_USERS_LIMIT = 10000
74_DUMMY_ARGON2_HASH = "$argon2id$v=19$m=65536,t=3,p=1$9x/nTs9D0R97+BI7BWP2Tg$V/40qCuaGh4i+94HpGpxJESEVs3IDpLzUqtNqRPuty4"
77@dataclass(frozen=True)
78class UsersListResult:
79 """Result for list_users queries."""
81 data: list[EmailUser]
82 next_cursor: Optional[str] = None
83 pagination: Optional[PaginationMeta] = None
84 links: Optional[PaginationLinks] = None
87@dataclass(frozen=True)
88class PasswordResetRequestResult:
89 """Result for forgot-password requests."""
91 rate_limited: bool
92 email_sent: bool
95class EmailValidationError(Exception):
96 """Raised when email format is invalid.
98 Examples:
99 >>> try:
100 ... raise EmailValidationError("Invalid email format")
101 ... except EmailValidationError as e:
102 ... str(e)
103 'Invalid email format'
104 """
107class PasswordValidationError(Exception):
108 """Raised when password doesn't meet policy requirements.
110 Examples:
111 >>> try:
112 ... raise PasswordValidationError("Password too short")
113 ... except PasswordValidationError as e:
114 ... str(e)
115 'Password too short'
116 """
119class UserExistsError(Exception):
120 """Raised when attempting to create a user that already exists.
122 Examples:
123 >>> try:
124 ... raise UserExistsError("User already exists")
125 ... except UserExistsError as e:
126 ... str(e)
127 'User already exists'
128 """
131class AuthenticationError(Exception):
132 """Raised when authentication fails.
134 Examples:
135 >>> try:
136 ... raise AuthenticationError("Invalid credentials")
137 ... except AuthenticationError as e:
138 ... str(e)
139 'Invalid credentials'
140 """
143class EmailAuthService:
144 """Service for email-based user authentication.
146 This service handles user registration, authentication, password management,
147 and security features like account lockout and failed attempt tracking.
149 Attributes:
150 db (Session): Database session
151 password_service (Argon2PasswordService): Password hashing service
153 Examples:
154 >>> from mcpgateway.db import SessionLocal
155 >>> with SessionLocal() as db:
156 ... service = EmailAuthService(db)
157 ... # Service is ready to use
158 """
160 get_all_users_deprecated_warned = False
162 def __init__(self, db: Session):
163 """Initialize the email authentication service.
165 Args:
166 db: SQLAlchemy database session
167 """
168 self.db = db
169 self.password_service = Argon2PasswordService()
170 self.email_notification_service = AuthEmailNotificationService()
171 self._role_service = None
172 logger.debug("EmailAuthService initialized")
174 @property
175 def role_service(self):
176 """Lazy-initialized RoleService to avoid circular imports.
178 Returns:
179 RoleService: Instance of RoleService
180 """
181 if self._role_service is None:
182 # First-Party
183 from mcpgateway.services.role_service import RoleService # pylint: disable=import-outside-toplevel
185 self._role_service = RoleService(self.db)
186 return self._role_service
188 def validate_email(self, email: str) -> bool:
189 """Validate email address format.
191 Args:
192 email: Email address to validate
194 Returns:
195 bool: True if email is valid
197 Raises:
198 EmailValidationError: If email format is invalid
200 Examples:
201 >>> service = EmailAuthService(None)
202 >>> service.validate_email("user@example.com")
203 True
204 >>> service.validate_email("test.user+tag@domain.co.uk")
205 True
206 >>> service.validate_email("user123@test-domain.com")
207 True
208 >>> try:
209 ... service.validate_email("invalid-email")
210 ... except EmailValidationError as e:
211 ... "Invalid email format" in str(e)
212 True
213 >>> try:
214 ... service.validate_email("")
215 ... except EmailValidationError as e:
216 ... "Email is required" in str(e)
217 True
218 >>> try:
219 ... service.validate_email("user@")
220 ... except EmailValidationError as e:
221 ... "Invalid email format" in str(e)
222 True
223 >>> try:
224 ... service.validate_email("@domain.com")
225 ... except EmailValidationError as e:
226 ... "Invalid email format" in str(e)
227 True
228 >>> try:
229 ... service.validate_email("user@domain")
230 ... except EmailValidationError as e:
231 ... "Invalid email format" in str(e)
232 True
233 >>> try:
234 ... service.validate_email("a" * 250 + "@domain.com")
235 ... except EmailValidationError as e:
236 ... "Email address too long" in str(e)
237 True
238 >>> try:
239 ... service.validate_email(None)
240 ... except EmailValidationError as e:
241 ... "Email is required" in str(e)
242 True
243 """
244 if not email or not isinstance(email, str):
245 raise EmailValidationError("Email is required and must be a string")
247 # Basic email regex pattern
248 email_pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
250 if not re.match(email_pattern, email):
251 raise EmailValidationError("Invalid email format")
253 if len(email) > 255:
254 raise EmailValidationError("Email address too long (max 255 characters)")
256 return True
258 def validate_password(self, password: str) -> bool:
259 """Validate password against policy requirements.
261 Args:
262 password: Password to validate
264 Returns:
265 bool: True if password meets policy
267 Raises:
268 PasswordValidationError: If password doesn't meet requirements
270 Examples:
271 >>> service = EmailAuthService(None)
272 >>> service.validate_password("Password123!") # Meets all requirements
273 True
274 >>> service.validate_password("ValidPassword123!")
275 True
276 >>> service.validate_password("Shortpass!") # 8+ chars with requirements
277 True
278 >>> service.validate_password("VeryLongPasswordThatMeetsMinimumRequirements!")
279 True
280 >>> try:
281 ... service.validate_password("")
282 ... except PasswordValidationError as e:
283 ... "Password is required" in str(e)
284 True
285 >>> try:
286 ... service.validate_password(None)
287 ... except PasswordValidationError as e:
288 ... "Password is required" in str(e)
289 True
290 >>> try:
291 ... service.validate_password("short") # Only 5 chars, should fail with default min_length=8
292 ... except PasswordValidationError as e:
293 ... "characters long" in str(e)
294 True
295 """
296 if not password:
297 raise PasswordValidationError("Password is required")
299 # Respect global toggle for password policy
300 if not getattr(settings, "password_policy_enabled", True):
301 return True
303 # Get password policy settings
304 min_length = getattr(settings, "password_min_length", 8)
305 require_uppercase = getattr(settings, "password_require_uppercase", False)
306 require_lowercase = getattr(settings, "password_require_lowercase", False)
307 require_numbers = getattr(settings, "password_require_numbers", False)
308 require_special = getattr(settings, "password_require_special", False)
310 if len(password) < min_length:
311 raise PasswordValidationError(f"Password must be at least {min_length} characters long")
313 if require_uppercase and not re.search(r"[A-Z]", password):
314 raise PasswordValidationError("Password must contain at least one uppercase letter")
316 if require_lowercase and not re.search(r"[a-z]", password):
317 raise PasswordValidationError("Password must contain at least one lowercase letter")
319 if require_numbers and not re.search(r"[0-9]", password):
320 raise PasswordValidationError("Password must contain at least one number")
322 if require_special and not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
323 raise PasswordValidationError("Password must contain at least one special character")
325 return True
327 @staticmethod
328 def _hash_reset_token(token: str) -> str:
329 """Hash a plaintext password-reset token using SHA-256.
331 Args:
332 token: Plaintext reset token.
334 Returns:
335 str: Hex-encoded SHA-256 digest.
336 """
337 return hashlib.sha256(token.encode("utf-8")).hexdigest()
339 @staticmethod
340 def _minimum_reset_response_seconds() -> float:
341 """Get minimum forgot-password response duration.
343 Returns:
344 float: Minimum response duration in seconds.
345 """
346 min_ms = max(0, int(getattr(settings, "password_reset_min_response_ms", 250)))
347 return min_ms / 1000.0
349 @staticmethod
350 def _minimum_login_failure_seconds() -> float:
351 """Get minimum failed-login response duration.
353 Returns:
354 float: Minimum failure response duration in seconds.
355 """
356 min_ms = max(0, int(getattr(settings, "failed_login_min_response_ms", 250)))
357 return min_ms / 1000.0
359 async def _apply_failed_login_floor(self, start_time: float) -> None:
360 """Apply minimum failed-login response duration.
362 Args:
363 start_time: Monotonic timestamp when login processing started.
364 """
365 remaining = self._minimum_login_failure_seconds() - (time.monotonic() - start_time)
366 if remaining > 0:
367 await asyncio.sleep(remaining)
369 async def _verify_dummy_password_for_timing(self, password: str) -> None:
370 """Run dummy Argon2 verification to reduce observable timing differences.
372 Args:
373 password: User-supplied password candidate.
374 """
375 try:
376 await self.password_service.verify_password_async(password, _DUMMY_ARGON2_HASH)
377 except Exception as exc: # nosec B110
378 logger.debug("Dummy password verification failed: %s", exc)
380 @staticmethod
381 def _build_forgot_password_url() -> str:
382 """Build the absolute forgot-password page URL.
384 Returns:
385 str: Absolute forgot-password URL.
386 """
387 app_domain = str(getattr(settings, "app_domain", "http://localhost:4444")).rstrip("/")
388 root_path = str(getattr(settings, "app_root_path", "")).rstrip("/")
389 return f"{app_domain}{root_path}/admin/forgot-password"
391 @staticmethod
392 def _build_reset_password_url(token: str) -> str:
393 """Build the absolute reset-password URL for a token.
395 Args:
396 token: Plaintext reset token.
398 Returns:
399 str: Absolute reset-password URL.
400 """
401 safe_token = urllib.parse.quote(token, safe="")
402 app_domain = str(getattr(settings, "app_domain", "http://localhost:4444")).rstrip("/")
403 root_path = str(getattr(settings, "app_root_path", "")).rstrip("/")
404 return f"{app_domain}{root_path}/admin/reset-password/{safe_token}"
406 async def _invalidate_user_auth_cache(self, email: str) -> None:
407 """Invalidate cached authentication data for a user.
409 Args:
410 email: User email for cache invalidation.
411 """
412 try:
413 # First-Party
414 from mcpgateway.cache.auth_cache import auth_cache # pylint: disable=import-outside-toplevel
416 await asyncio.wait_for(asyncio.shield(auth_cache.invalidate_user(email)), timeout=5.0)
417 except asyncio.TimeoutError:
418 logger.warning("Auth cache invalidation timed out for %s - continuing", email)
419 except Exception as cache_error: # nosec B110
420 logger.debug("Failed to invalidate auth cache for %s: %s", email, cache_error)
422 async def _invalidate_deleted_user_auth_caches(self, email: str) -> None:
423 """Invalidate all auth-cache entries affected by permanent user deletion.
425 Args:
426 email: User email for cache invalidation.
427 """
428 try:
429 # First-Party
430 from mcpgateway.cache.auth_cache import auth_cache # pylint: disable=import-outside-toplevel
432 results = await asyncio.wait_for(
433 asyncio.gather(
434 auth_cache.invalidate_user(email),
435 auth_cache.invalidate_user_teams(email),
436 auth_cache.invalidate_team_membership(email),
437 return_exceptions=True,
438 ),
439 timeout=5.0,
440 )
441 for result in results:
442 if isinstance(result, Exception):
443 logger.debug("Failed to invalidate delete-user auth cache for %s: %s", email, result)
444 except asyncio.TimeoutError:
445 logger.warning("Delete-user auth cache invalidation timed out for %s - continuing", email)
446 except Exception as cache_error: # nosec B110
447 logger.debug("Failed to invalidate delete-user auth cache for %s: %s", email, cache_error)
449 def _log_auth_event(
450 self,
451 event_type: str,
452 success: bool,
453 user_email: Optional[str],
454 ip_address: Optional[str] = None,
455 user_agent: Optional[str] = None,
456 failure_reason: Optional[str] = None,
457 details: Optional[dict] = None,
458 ) -> None:
459 """Persist a custom authentication/security event.
461 Args:
462 event_type: Event type identifier.
463 success: Whether the event succeeded.
464 user_email: Related user email, if available.
465 ip_address: Source IP address.
466 user_agent: Source user agent string.
467 failure_reason: Failure detail when `success` is False.
468 details: Additional structured event payload.
469 """
470 try:
471 event = EmailAuthEvent(
472 user_email=user_email,
473 event_type=event_type,
474 success=success,
475 ip_address=ip_address,
476 user_agent=user_agent,
477 failure_reason=failure_reason,
478 details=orjson.dumps(details).decode() if details else None,
479 )
480 self.db.add(event)
481 self.db.commit()
482 except Exception as exc:
483 self.db.rollback()
484 logger.warning("Failed to persist auth event %s for %s: %s", event_type, user_email, exc)
486 def _recent_password_reset_request_count(self, email: str, now: datetime) -> int:
487 """Count recent password-reset requests for rate limiting.
489 Args:
490 email: Email to count requests for.
491 now: Current UTC timestamp.
493 Returns:
494 int: Number of reset requests in the current rate-limit window.
495 """
496 window_minutes = int(getattr(settings, "password_reset_rate_window_minutes", 15))
497 window_start = now - timedelta(minutes=window_minutes)
498 stmt = (
499 select(func.count(EmailAuthEvent.id)) # pylint: disable=not-callable
500 .where(EmailAuthEvent.event_type == "PASSWORD_RESET_REQUESTED")
501 .where(EmailAuthEvent.user_email == email)
502 .where(EmailAuthEvent.timestamp >= window_start)
503 )
504 count = self.db.execute(stmt).scalar()
505 return int(count or 0)
507 async def get_user_by_email(self, email: str) -> Optional[EmailUser]:
508 """Get user by email address.
510 Args:
511 email: Email address to look up
513 Returns:
514 EmailUser or None if not found
516 Examples:
517 # Assuming database has user "test@example.com"
518 # user = await service.get_user_by_email("test@example.com")
519 # user.email if user else None # Returns: 'test@example.com'
520 """
521 try:
522 stmt = select(EmailUser).where(EmailUser.email == email.lower())
523 result = self.db.execute(stmt)
524 user = result.scalar_one_or_none()
525 return user
526 except Exception as e:
527 logger.error(f"Error getting user by email {SecurityValidator.sanitize_log_message(email)}: {e}")
528 return None
530 async def create_user(
531 self,
532 email: str,
533 password: str,
534 full_name: Optional[str] = None,
535 is_admin: bool = False,
536 is_active: bool = True,
537 password_change_required: bool = False,
538 auth_provider: str = "local",
539 skip_password_validation: bool = False,
540 granted_by: Optional[str] = None,
541 skip_onboarding: bool = False,
542 ) -> EmailUser:
543 """Create a new user with email authentication.
545 Args:
546 email: User's email address (primary key)
547 password: Plain text password (will be hashed)
548 full_name: Optional full name for display
549 is_admin: Whether user has admin privileges
550 is_active: Whether user account is active (default: True)
551 password_change_required: Whether user must change password on next login (default: False)
552 auth_provider: Authentication provider ('local', 'github', etc.)
553 skip_password_validation: Skip password policy validation (for bootstrap)
554 granted_by: Email of user creating this user (for role assignment audit trail)
555 skip_onboarding: Skip personal team creation, role assignment, and
556 success-path registration event logging (for service accounts /
557 synthetic users). Unexpected-failure audit events (the
558 ``except Exception`` path) are always recorded regardless of
559 this flag. Duplicate-user rejections (``UserExistsError``,
560 ``IntegrityError``) are not audited by design.
562 Returns:
563 EmailUser: The created user object
565 Raises:
566 EmailValidationError: If email format is invalid
567 PasswordValidationError: If password doesn't meet policy
568 UserExistsError: If user already exists
570 Examples:
571 # user = await service.create_user(
572 # email="new@example.com",
573 # password="secure123",
574 # full_name="New User",
575 # is_active=True,
576 # password_change_required=False
577 # )
578 # user.email # Returns: 'new@example.com'
579 # user.full_name # Returns: 'New User'
580 # user.is_active # Returns: True
581 """
582 # Normalize email to lowercase
583 email = email.lower().strip()
585 # Validate inputs
586 self.validate_email(email)
587 if not skip_password_validation:
588 self.validate_password(password)
590 # Hash before the first DB read so PgBouncer transaction pooling does not
591 # hold an idle transaction open across the async hashing call.
592 # Callers that skip password validation with an empty password (e.g.
593 # ensure_user_exists for service accounts) get a non-loginable sentinel;
594 # all other callers go through hash_password_async which raises
595 # ValueError on empty input.
596 if not password and skip_password_validation:
597 password_hash = "!disabled" # nosec B105 — not a valid Argon2 hash, verify_password always rejects
598 else:
599 password_hash = await self.password_service.hash_password_async(password)
601 # Check if user already exists
602 existing_user = await self.get_user_by_email(email)
603 if existing_user:
604 raise UserExistsError(f"User with email {email} already exists")
606 # Create new user (record password change timestamp)
607 user = EmailUser(
608 email=email,
609 password_hash=password_hash,
610 full_name=full_name,
611 is_admin=is_admin,
612 is_active=is_active,
613 password_change_required=password_change_required,
614 auth_provider=auth_provider,
615 password_changed_at=utc_now(),
616 admin_origin="api" if is_admin else None,
617 )
619 # Admin-created users are implicitly email-verified (the admin vouched for them)
620 if granted_by:
621 user.email_verified_at = utc_now()
623 try:
624 self.db.add(user)
625 self.db.commit()
626 self.db.refresh(user)
628 logger.info(f"Created new user: {SecurityValidator.sanitize_log_message(email)}")
630 if not skip_onboarding:
631 # Create personal team first if enabled (needed for team-scoped role assignment)
632 personal_team_id = None
633 if getattr(settings, "auto_create_personal_teams", True):
634 try:
635 # Import here to avoid circular imports
636 # First-Party
637 from mcpgateway.services.personal_team_service import PersonalTeamService # pylint: disable=import-outside-toplevel
639 personal_team_service = PersonalTeamService(self.db)
640 personal_team = await personal_team_service.create_personal_team(user)
641 personal_team_id = personal_team.id # Get team_id directly from created team
642 logger.info(f"Created personal team '{personal_team.name}' (ID: {personal_team_id}) for user {SecurityValidator.sanitize_log_message(email)}")
643 except Exception as e:
644 logger.warning(f"Failed to create personal team for {SecurityValidator.sanitize_log_message(email)}: {e}")
645 # Don't fail user creation if personal team creation fails
647 # Auto-assign dual roles using RoleService (after personal team creation)
648 try:
649 granter = granted_by or email # Use granted_by if provided, otherwise self-granted
651 # Determine global role based on admin status
652 global_role_name = settings.default_admin_role if is_admin else settings.default_user_role
653 global_role = await self.role_service.get_role_by_name(global_role_name, "global")
655 if global_role:
656 try:
657 await self.role_service.assign_role_to_user(user_email=email, role_id=global_role.id, scope="global", scope_id=None, granted_by=granter)
658 logger.info(f"Assigned {global_role_name} role (global scope) to user {SecurityValidator.sanitize_log_message(email)}")
659 except ValueError as e:
660 logger.warning(f"Could not assign {global_role_name} role to {SecurityValidator.sanitize_log_message(email)}: {e}")
661 else:
662 logger.warning(f"{global_role_name} role not found. User {SecurityValidator.sanitize_log_message(email)} created without global role.")
664 # Assign team owner role with team scope (if personal team exists)
665 if personal_team_id:
666 team_owner_role_name = settings.default_team_owner_role
667 team_owner_role = await self.role_service.get_role_by_name(team_owner_role_name, "team")
669 if team_owner_role:
670 try:
671 await self.role_service.assign_role_to_user(user_email=email, role_id=team_owner_role.id, scope="team", scope_id=personal_team_id, granted_by=granter)
672 logger.info(f"Assigned {team_owner_role_name} role (team scope: {personal_team_id}) to user {SecurityValidator.sanitize_log_message(email)}")
673 except ValueError as e:
674 logger.warning(f"Could not assign {team_owner_role_name} role to {SecurityValidator.sanitize_log_message(email)}: {e}")
675 else:
676 logger.warning(f"{team_owner_role_name} role not found. User {SecurityValidator.sanitize_log_message(email)} created without team owner role.")
678 except Exception as role_error:
679 logger.error(f"Failed to assign roles to user {SecurityValidator.sanitize_log_message(email)}: {role_error}")
680 # Don't fail user creation if role assignment fails
681 # User can be assigned roles manually later
683 # Log registration event
684 registration_event = EmailAuthEvent.create_registration_event(user_email=email, success=True)
685 self.db.add(registration_event)
686 self.db.commit()
688 return user
690 except IntegrityError as e:
691 self.db.rollback()
692 logger.error(f"Database error creating user {SecurityValidator.sanitize_log_message(email)}: {e}")
693 raise UserExistsError(f"User with email {email} already exists") from e
694 except Exception as e:
695 self.db.rollback()
696 logger.error(f"Unexpected error creating user {SecurityValidator.sanitize_log_message(email)}: {e}")
698 # Log failed registration
699 registration_event = EmailAuthEvent.create_registration_event(user_email=email, success=False, failure_reason=str(e))
700 self.db.add(registration_event)
701 self.db.commit()
703 raise
705 async def ensure_user_exists(
706 self,
707 email: str,
708 full_name: Optional[str] = None,
709 is_admin: bool = False,
710 auth_provider: str = "local",
711 granted_by: Optional[str] = None,
712 skip_onboarding: bool = False,
713 ) -> tuple[EmailUser, bool]:
714 """Idempotent user creation — returns existing user or creates a new one.
716 Args:
717 email: User's email address
718 full_name: Optional display name
719 is_admin: Whether user has admin privileges
720 auth_provider: Authentication provider
721 granted_by: Email of creating user (for audit trail)
722 skip_onboarding: Skip personal team, role assignment, and success-path
723 audit events (unexpected-failure auditing is always recorded)
725 Returns:
726 Tuple of (user, created) where created is True if the user was newly created.
728 Raises:
729 EmailValidationError: If the email format is invalid.
730 UserExistsError: If a race-condition insert fails and re-fetch still returns None.
731 """
732 email = email.lower().strip()
733 existing = await self.get_user_by_email(email)
734 if existing:
735 return existing, False
737 try:
738 user = await self.create_user(
739 email=email,
740 password="", # nosec B106 — intentionally empty for service accounts
741 full_name=full_name,
742 is_admin=is_admin,
743 auth_provider=auth_provider,
744 skip_password_validation=True,
745 granted_by=granted_by,
746 skip_onboarding=skip_onboarding,
747 password_change_required=True,
748 )
749 return user, True
750 except UserExistsError:
751 # Race condition: another request created the user between our check and insert
752 logger.info(f"Race-condition user creation for {SecurityValidator.sanitize_log_message(email)}, re-fetching existing record")
753 user = await self.get_user_by_email(email)
754 if user:
755 return user, False
756 raise # Should not happen, but don't swallow the error
758 async def authenticate_user(self, email: str, password: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> Optional[EmailUser]:
759 """Authenticate a user with email and password.
761 Args:
762 email: User's email address
763 password: Plain text password
764 ip_address: Client IP address for logging
765 user_agent: Client user agent for logging
767 Returns:
768 EmailUser if authentication successful, None otherwise
770 Examples:
771 # user = await service.authenticate_user("user@example.com", "correct_password")
772 # user.email if user else None # Returns: 'user@example.com'
773 # await service.authenticate_user("user@example.com", "wrong_password") # Returns: None
774 """
775 email = email.lower().strip()
776 start_time = time.monotonic()
778 # Get user from database
779 user = await self.get_user_by_email(email)
781 # Track authentication attempt
782 auth_success = False
783 failure_reason = None
785 try:
786 if not user:
787 failure_reason = "User not found"
788 logger.info(f"Authentication failed for {SecurityValidator.sanitize_log_message(email)}: user not found")
789 await self._verify_dummy_password_for_timing(password)
790 await self._apply_failed_login_floor(start_time)
791 return None
793 if not user.is_active:
794 failure_reason = "Account is disabled"
795 logger.info(f"Authentication failed for {SecurityValidator.sanitize_log_message(email)}: account disabled")
796 await self._verify_dummy_password_for_timing(password)
797 await self._apply_failed_login_floor(start_time)
798 return None
800 is_protected_admin = user.is_admin and settings.protect_all_admins
802 # Enforce lockout for all accounts. Protected admins are allowed
803 # to continue attempting login (feature-flagged via protect_all_admins)
804 # but their failed attempts are still tracked for audit purposes.
805 if user.is_account_locked() and not is_protected_admin:
806 failure_reason = "Account is locked"
807 logger.info(f"Authentication failed for {SecurityValidator.sanitize_log_message(email)}: account locked")
808 await self._verify_dummy_password_for_timing(password)
809 await self._apply_failed_login_floor(start_time)
810 return None
812 # Verify password
813 if not await self.password_service.verify_password_async(password, user.password_hash):
814 failure_reason = "Invalid password"
816 # Always increment failed attempts — including for protected admins
817 max_attempts = getattr(settings, "max_failed_login_attempts", 5)
818 lockout_duration = getattr(settings, "account_lockout_duration_minutes", 30)
820 is_locked = user.increment_failed_attempts(max_attempts, lockout_duration)
822 if is_locked:
823 logger.warning(f"Account locked for {SecurityValidator.sanitize_log_message(email)} after {max_attempts} failed attempts")
824 failure_reason = "Account locked due to too many failed attempts"
825 lockout_notifications_enabled = getattr(settings, "account_lockout_notification_enabled", True)
826 if isinstance(lockout_notifications_enabled, bool) and lockout_notifications_enabled:
827 locked_until_iso = user.locked_until.isoformat() if user.locked_until else "unknown"
828 try:
829 await self.email_notification_service.send_account_lockout_email(
830 to_email=user.email,
831 full_name=user.full_name,
832 locked_until_iso=locked_until_iso,
833 reset_url=self._build_forgot_password_url(),
834 )
835 except Exception as email_exc:
836 logger.warning("Failed to send lockout notification for %s: %s", email, email_exc)
837 self._log_auth_event(
838 event_type="ACCOUNT_LOCKED",
839 success=True,
840 user_email=email,
841 ip_address=ip_address,
842 user_agent=user_agent,
843 details={"locked_until": user.locked_until.isoformat() if user.locked_until else None},
844 )
846 self.db.commit()
847 logger.info(f"Authentication failed for {SecurityValidator.sanitize_log_message(email)}: invalid password")
848 await self._apply_failed_login_floor(start_time)
849 return None
851 # Authentication successful
852 user.reset_failed_attempts()
853 self.db.commit()
855 auth_success = True
856 logger.info(f"Authentication successful for {SecurityValidator.sanitize_log_message(email)}")
858 return user
860 finally:
861 # Log authentication event
862 auth_event = EmailAuthEvent.create_login_attempt(user_email=email, success=auth_success, ip_address=ip_address, user_agent=user_agent, failure_reason=failure_reason)
863 self.db.add(auth_event)
864 self.db.commit()
866 async def request_password_reset(self, email: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> PasswordResetRequestResult:
867 """Create a password reset token and send reset email when user exists.
869 The function intentionally returns generic outcomes to avoid account
870 enumeration while still allowing rate-limit enforcement.
872 Args:
873 email: User email requesting password reset.
874 ip_address: Source IP address.
875 user_agent: Source user agent string.
877 Returns:
878 PasswordResetRequestResult: Reset request processing outcome.
879 """
880 start_time = time.monotonic()
881 normalized_email = (email or "").lower().strip()
882 now = utc_now()
883 _ = self._hash_reset_token(secrets.token_urlsafe(32))
885 rate_limit = int(getattr(settings, "password_reset_rate_limit", 5))
886 is_rate_limited = bool(normalized_email and self._recent_password_reset_request_count(normalized_email, now) >= rate_limit)
887 if is_rate_limited:
888 password_reset_requests_counter.labels(outcome="rate_limited").inc()
889 self._log_auth_event(
890 event_type="PASSWORD_RESET_RATE_LIMITED",
891 success=False,
892 user_email=normalized_email or None,
893 ip_address=ip_address,
894 user_agent=user_agent,
895 )
896 remaining = self._minimum_reset_response_seconds() - (time.monotonic() - start_time)
897 if remaining > 0:
898 await asyncio.sleep(remaining)
899 return PasswordResetRequestResult(rate_limited=True, email_sent=False)
901 user = await self.get_user_by_email(normalized_email) if normalized_email else None
902 self._log_auth_event(
903 event_type="PASSWORD_RESET_REQUESTED",
904 success=True,
905 user_email=normalized_email or None,
906 ip_address=ip_address,
907 user_agent=user_agent,
908 )
910 email_sent = False
911 if user and user.is_active:
912 token_plaintext = secrets.token_urlsafe(48)
913 token_hash = self._hash_reset_token(token_plaintext)
914 expires_minutes = int(getattr(settings, "password_reset_token_expiry_minutes", 60))
915 expires_at = now + timedelta(minutes=expires_minutes)
917 existing_stmt = select(PasswordResetToken).where(PasswordResetToken.user_email == user.email).where(PasswordResetToken.used_at.is_(None)).where(PasswordResetToken.expires_at > now)
918 for existing in self.db.execute(existing_stmt).scalars().all():
919 existing.used_at = now
921 token_record = PasswordResetToken(
922 user_email=user.email,
923 token_hash=token_hash,
924 expires_at=expires_at,
925 ip_address=ip_address,
926 user_agent=user_agent,
927 )
928 self.db.add(token_record)
929 self.db.commit()
931 try:
932 email_sent = await self.email_notification_service.send_password_reset_email(
933 to_email=user.email,
934 full_name=user.full_name,
935 reset_url=self._build_reset_password_url(token_plaintext),
936 expires_minutes=expires_minutes,
937 )
938 except Exception as exc:
939 logger.warning("Failed to send password reset email to %s: %s", user.email, exc)
941 password_reset_requests_counter.labels(outcome="accepted").inc()
942 self._log_auth_event(
943 event_type="PASSWORD_RESET_EMAIL_SENT",
944 success=True,
945 user_email=user.email,
946 ip_address=ip_address,
947 user_agent=user_agent,
948 details={"token_hash": token_hash, "expires_at": expires_at.isoformat(), "email_sent": email_sent},
949 )
950 else:
951 password_reset_requests_counter.labels(outcome="accepted").inc()
953 remaining = self._minimum_reset_response_seconds() - (time.monotonic() - start_time)
954 if remaining > 0:
955 await asyncio.sleep(remaining)
956 return PasswordResetRequestResult(rate_limited=False, email_sent=email_sent)
958 async def validate_password_reset_token(self, token: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> PasswordResetToken:
959 """Validate a one-time password reset token.
961 Args:
962 token: Plaintext password reset token.
963 ip_address: Source IP address.
964 user_agent: Source user agent string.
966 Returns:
967 PasswordResetToken: Matching valid reset token record.
969 Raises:
970 AuthenticationError: If token is missing, invalid, used, or expired.
971 """
972 if not token:
973 password_reset_completions_counter.labels(outcome="invalid_token").inc()
974 self._log_auth_event("PASSWORD_RESET_ATTEMPTED", False, None, ip_address, user_agent, failure_reason="Missing token")
975 raise AuthenticationError("This reset link is invalid")
977 token_hash = self._hash_reset_token(token)
978 stmt = select(PasswordResetToken).where(PasswordResetToken.token_hash == token_hash)
979 reset_token = self.db.execute(stmt).scalar_one_or_none()
981 if not reset_token:
982 password_reset_completions_counter.labels(outcome="invalid_token").inc()
983 self._log_auth_event("PASSWORD_RESET_ATTEMPTED", False, None, ip_address, user_agent, failure_reason="Invalid token hash")
984 raise AuthenticationError("This reset link is invalid")
986 if not hmac.compare_digest(reset_token.token_hash, token_hash):
987 password_reset_completions_counter.labels(outcome="invalid_token").inc()
988 self._log_auth_event("PASSWORD_RESET_ATTEMPTED", False, reset_token.user_email, ip_address, user_agent, failure_reason="Token hash mismatch")
989 raise AuthenticationError("This reset link is invalid")
991 if reset_token.is_used():
992 password_reset_completions_counter.labels(outcome="used_token").inc()
993 self._log_auth_event("PASSWORD_RESET_ATTEMPTED", False, reset_token.user_email, ip_address, user_agent, failure_reason="Token already used")
994 raise AuthenticationError("This reset link has already been used")
996 if reset_token.is_expired():
997 password_reset_completions_counter.labels(outcome="expired_token").inc()
998 self._log_auth_event("PASSWORD_RESET_TOKEN_EXPIRED", False, reset_token.user_email, ip_address, user_agent, details={"token_hash": token_hash})
999 self._log_auth_event("PASSWORD_RESET_ATTEMPTED", False, reset_token.user_email, ip_address, user_agent, failure_reason="Token expired")
1000 raise AuthenticationError("This reset link has expired")
1002 self._log_auth_event("PASSWORD_RESET_ATTEMPTED", True, reset_token.user_email, ip_address, user_agent)
1003 return reset_token
1005 async def reset_password_with_token(self, token: str, new_password: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> bool:
1006 """Complete password reset using a validated one-time token.
1008 Args:
1009 token: Plaintext password reset token.
1010 new_password: New password value.
1011 ip_address: Source IP address.
1012 user_agent: Source user agent string.
1014 Returns:
1015 bool: True when password reset completed successfully.
1017 Raises:
1018 AuthenticationError: If token or associated user is invalid.
1019 PasswordValidationError: If new password violates policy or reuse checks.
1020 """
1021 reset_token = await self.validate_password_reset_token(token, ip_address=ip_address, user_agent=user_agent)
1022 user = await self.get_user_by_email(reset_token.user_email)
1023 if not user or not user.is_active:
1024 password_reset_completions_counter.labels(outcome="invalid_user").inc()
1025 raise AuthenticationError("This reset link is invalid")
1027 self.validate_password(new_password)
1028 if getattr(settings, "password_prevent_reuse", True) and await self.password_service.verify_password_async(new_password, user.password_hash):
1029 password_reset_completions_counter.labels(outcome="reused_password").inc()
1030 raise PasswordValidationError("New password must be different from current password")
1032 now = utc_now()
1033 user.password_hash = await self.password_service.hash_password_async(new_password)
1034 user.password_change_required = False
1035 user.password_changed_at = now
1036 user.failed_login_attempts = 0
1037 user.locked_until = None
1039 reset_token.used_at = now
1040 outstanding_stmt = select(PasswordResetToken).where(PasswordResetToken.user_email == user.email).where(PasswordResetToken.id != reset_token.id).where(PasswordResetToken.used_at.is_(None))
1041 for outstanding in self.db.execute(outstanding_stmt).scalars().all():
1042 outstanding.used_at = now
1044 self.db.commit()
1046 if getattr(settings, "password_reset_invalidate_sessions", True):
1047 await self._invalidate_user_auth_cache(user.email)
1049 email_sent = False
1050 try:
1051 email_sent = await self.email_notification_service.send_password_reset_confirmation_email(to_email=user.email, full_name=user.full_name)
1052 except Exception as exc:
1053 logger.warning("Failed to send password reset confirmation for %s: %s", user.email, exc)
1055 password_reset_completions_counter.labels(outcome="success").inc()
1056 self._log_auth_event(
1057 event_type="PASSWORD_RESET_COMPLETED",
1058 success=True,
1059 user_email=user.email,
1060 ip_address=ip_address,
1061 user_agent=user_agent,
1062 details={"email_sent": email_sent},
1063 )
1064 return True
1066 async def unlock_user_account(self, email: str, unlocked_by: Optional[str] = None, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> EmailUser:
1067 """Clear lockout state for a user account.
1069 Args:
1070 email: User email to unlock.
1071 unlocked_by: Admin/user identifier who performed unlock.
1072 ip_address: Source IP address.
1073 user_agent: Source user agent string.
1075 Returns:
1076 EmailUser: Updated user record after unlock.
1078 Raises:
1079 ValueError: If the target user cannot be found.
1080 """
1081 normalized_email = email.lower().strip()
1082 user = await self.get_user_by_email(normalized_email)
1083 if not user:
1084 raise ValueError(f"User {normalized_email} not found")
1086 user.failed_login_attempts = 0
1087 user.locked_until = None
1088 user.updated_at = utc_now()
1089 self.db.commit()
1090 self._log_auth_event(
1091 event_type="ACCOUNT_UNLOCKED",
1092 success=True,
1093 user_email=user.email,
1094 ip_address=ip_address,
1095 user_agent=user_agent,
1096 details={"unlocked_by": unlocked_by},
1097 )
1098 return user
1100 async def change_password(self, email: str, old_password: Optional[str], new_password: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> bool:
1101 """Change a user's password.
1103 Args:
1104 email: User's email address
1105 old_password: Current password for verification
1106 new_password: New password to set
1107 ip_address: Client IP address for logging
1108 user_agent: Client user agent for logging
1110 Returns:
1111 bool: True if password changed successfully
1113 Raises:
1114 AuthenticationError: If old password is incorrect
1115 PasswordValidationError: If new password doesn't meet policy
1116 Exception: If database operation fails
1118 Examples:
1119 # success = await service.change_password(
1120 # "user@example.com",
1121 # "old_password",
1122 # "new_secure_password"
1123 # )
1124 # success # Returns: True
1125 """
1126 # Validate old password is provided
1127 if old_password is None:
1128 raise AuthenticationError("Current password is required")
1130 # First authenticate with old password
1131 user = await self.authenticate_user(email, old_password, ip_address, user_agent)
1132 if not user:
1133 raise AuthenticationError("Current password is incorrect")
1135 # Validate new password
1136 self.validate_password(new_password)
1138 # Check if new password is same as old (optional policy)
1139 if getattr(settings, "password_prevent_reuse", True) and await self.password_service.verify_password_async(new_password, user.password_hash):
1140 raise PasswordValidationError("New password must be different from current password")
1142 success = False
1143 try:
1144 # Hash new password and update
1145 new_password_hash = await self.password_service.hash_password_async(new_password)
1146 user.password_hash = new_password_hash
1147 # Clear the flag that requires the user to change password
1148 user.password_change_required = False
1149 # Record the password change timestamp
1150 try:
1151 user.password_changed_at = utc_now()
1152 except Exception as exc:
1153 logger.debug("Failed to set password_changed_at for %s: %s", email, exc)
1155 self.db.commit()
1156 success = True
1158 # Invalidate auth cache for user
1159 try:
1160 await self._invalidate_user_auth_cache(email)
1161 except Exception as cache_error: # nosec B110 - best effort cache invalidation
1162 logger.debug("Failed to invalidate auth cache on password change: %s", cache_error)
1164 logger.info(f"Password changed successfully for {SecurityValidator.sanitize_log_message(email)}")
1166 except Exception as e:
1167 self.db.rollback()
1168 logger.error(f"Error changing password for {SecurityValidator.sanitize_log_message(email)}: {e}")
1169 raise
1170 finally:
1171 # Log password change event
1172 password_event = EmailAuthEvent.create_password_change_event(user_email=email, success=success, ip_address=ip_address, user_agent=user_agent)
1173 self.db.add(password_event)
1174 self.db.commit()
1176 return success
1178 async def create_platform_admin(self, email: str, password: str, full_name: Optional[str] = None) -> EmailUser:
1179 """Create or update the platform administrator user.
1181 This method is used during system bootstrap to create the initial
1182 admin user from environment variables.
1184 Args:
1185 email: Admin email address
1186 password: Admin password
1187 full_name: Admin full name
1189 Returns:
1190 EmailUser: The admin user
1192 Examples:
1193 # admin = await service.create_platform_admin(
1194 # "admin@example.com",
1195 # "admin_password",
1196 # "Platform Administrator"
1197 # )
1198 # admin.is_admin # Returns: True
1199 """
1200 # Check if admin user already exists
1201 existing_admin = await self.get_user_by_email(email)
1203 if existing_admin:
1204 # Update existing admin if password or name changed
1205 if full_name and existing_admin.full_name != full_name:
1206 existing_admin.full_name = full_name
1208 # Check if password needs update (verify current password first)
1209 if not await self.password_service.verify_password_async(password, existing_admin.password_hash):
1210 existing_admin.password_hash = await self.password_service.hash_password_async(password)
1211 try:
1212 existing_admin.password_changed_at = utc_now()
1213 except Exception as exc:
1214 logger.debug("Failed to set password_changed_at for existing admin %s: %s", email, exc)
1216 # Ensure admin status
1217 existing_admin.is_admin = True
1218 existing_admin.is_active = True
1220 # Synchronize platform_admin RBAC role with is_admin flag
1221 # This ensures atomicity: when setting is_admin=True, also assign the platform_admin role
1222 try:
1223 platform_admin_role = await self.role_service.get_role_by_name("platform_admin", "global")
1224 if platform_admin_role:
1225 # Check if role assignment already exists
1226 existing_assignment = await self.role_service.get_user_role_assignment(user_email=email, role_id=platform_admin_role.id, scope="global", scope_id=None)
1228 if not existing_assignment or not existing_assignment.is_active:
1229 await self.role_service.assign_role_to_user(user_email=email, role_id=platform_admin_role.id, scope="global", scope_id=None, granted_by=email)
1230 logger.info(f"Assigned platform_admin role to {SecurityValidator.sanitize_log_message(email)} during create_platform_admin()")
1231 else:
1232 logger.debug(f"User {SecurityValidator.sanitize_log_message(email)} already has active platform_admin role")
1233 else:
1234 logger.warning(f"platform_admin role not found. User {SecurityValidator.sanitize_log_message(email)} updated with is_admin=True but without platform_admin role assignment.")
1235 except Exception as role_error:
1236 logger.error(
1237 f"Failed to assign platform_admin role to {SecurityValidator.sanitize_log_message(email)}: {SecurityValidator.sanitize_log_message(str(role_error))}. User updated with is_admin=True but role assignment failed."
1238 )
1239 # Rollback to clear any failed transaction state (e.g. PendingRollbackError
1240 # from a failed commit inside assign_role_to_user), then re-apply admin flags
1241 # so the subsequent commit can persist the admin user update.
1242 try:
1243 self.db.rollback()
1244 existing_admin.is_admin = True
1245 existing_admin.is_active = True
1246 except Exception as rollback_error: # nosec B110
1247 logger.debug("Session rollback after role sync failure also failed: %s", rollback_error)
1248 # bootstrap_default_roles() will sync the role assignment later
1250 self.db.commit()
1251 logger.info(f"Updated platform admin user: {SecurityValidator.sanitize_log_message(email)}")
1252 return existing_admin
1254 # Create new admin user - skip password validation during bootstrap
1255 admin_user = await self.create_user(email=email, password=password, full_name=full_name, is_admin=True, auth_provider="local", skip_password_validation=True)
1257 logger.info(f"Created platform admin user: {SecurityValidator.sanitize_log_message(email)}")
1258 return admin_user
1260 async def update_last_login(self, email: str) -> None:
1261 """Update the last login timestamp for a user.
1263 Args:
1264 email: User's email address
1265 """
1266 user = await self.get_user_by_email(email)
1267 if user:
1268 user.reset_failed_attempts() # This also updates last_login
1269 self.db.commit()
1271 @staticmethod
1272 def _escape_like(value: str) -> str:
1273 """Escape LIKE wildcards for prefix search.
1275 Args:
1276 value: Raw value to escape for LIKE matching.
1278 Returns:
1279 Escaped string safe for LIKE patterns.
1280 """
1281 return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
1283 async def list_users(
1284 self,
1285 limit: Optional[int] = None,
1286 cursor: Optional[str] = None,
1287 page: Optional[int] = None,
1288 per_page: Optional[int] = None,
1289 search: Optional[str] = None,
1290 ) -> UsersListResult:
1291 """List all users with cursor or page-based pagination support and optional search.
1293 This method supports both cursor-based (for API endpoints with large datasets)
1294 and page-based (for admin UI with page numbers) pagination, with optional
1295 search filtering by email or full name.
1297 Note: This method returns ORM objects and cannot be cached since callers
1298 depend on ORM attributes and methods (e.g., EmailUserResponse.from_email_user).
1300 Args:
1301 limit: Maximum number of users to return (for cursor-based pagination)
1302 cursor: Opaque cursor token for cursor-based pagination
1303 page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor.
1304 per_page: Items per page for page-based pagination
1305 search: Optional search term to filter by email or full name (case-insensitive)
1307 Returns:
1308 UsersListResult with data and optional pagination metadata.
1310 Examples:
1311 # Cursor-based pagination (for APIs)
1312 # result = await service.list_users(cursor=None, limit=50)
1313 # len(result.data) <= 50 # Returns: True
1315 # Page-based pagination (for admin UI)
1316 # result = await service.list_users(page=1, per_page=10)
1317 # result.data # Returns: list of users
1318 # result.pagination # Returns: pagination metadata
1320 # Search users
1321 # users = await service.list_users(search="john", page=1, per_page=10)
1322 # All users with "john" in email or name
1323 """
1324 try:
1325 # Build base query with ordering by created_at, email for consistent pagination
1326 # Note: EmailUser uses email as primary key, not id
1327 query = select(EmailUser).order_by(desc(EmailUser.created_at), desc(EmailUser.email))
1329 # Apply search filter if provided (prefix search for better index usage)
1330 if search and search.strip():
1331 search_term = f"{self._escape_like(search.strip())}%"
1332 # NOTE: For large Postgres datasets, consider citext or functional indexes for case-insensitive search.
1333 query = query.where(
1334 or_(
1335 EmailUser.email.ilike(search_term, escape="\\"),
1336 EmailUser.full_name.ilike(search_term, escape="\\"),
1337 )
1338 )
1340 # Page-based pagination: use unified_paginate
1341 if page is not None:
1342 pag_result = await unified_paginate(
1343 db=self.db,
1344 query=query,
1345 page=page,
1346 per_page=per_page,
1347 cursor=None,
1348 limit=None,
1349 base_url="/admin/users",
1350 query_params={},
1351 )
1352 return UsersListResult(data=pag_result["data"], pagination=pag_result["pagination"], links=pag_result["links"])
1354 # Cursor-based pagination: custom implementation for EmailUser
1355 # EmailUser uses email as PK (not id), so we need custom cursor using (created_at, email)
1356 page_size = limit if limit and limit > 0 else settings.pagination_default_page_size
1357 if limit == 0:
1358 page_size = None # No limit
1360 # Decode cursor and apply keyset filter if provided
1361 if cursor:
1362 try:
1363 cursor_json = base64.urlsafe_b64decode(cursor.encode()).decode()
1364 cursor_data = orjson.loads(cursor_json)
1365 last_email = cursor_data.get("email")
1366 created_str = cursor_data.get("created_at")
1367 if last_email and created_str:
1368 last_created = datetime.fromisoformat(created_str)
1369 # Apply keyset filter (assumes DESC order on created_at, email)
1370 query = query.where(
1371 or_(
1372 EmailUser.created_at < last_created,
1373 and_(EmailUser.created_at == last_created, EmailUser.email < last_email),
1374 )
1375 )
1376 except (ValueError, TypeError) as e:
1377 logger.warning(f"Invalid cursor for user pagination, ignoring: {e}")
1379 # Fetch page_size + 1 to determine if there are more results
1380 if page_size is not None:
1381 query = query.limit(page_size + 1)
1382 result = self.db.execute(query)
1383 users = list(result.scalars().all())
1385 if page_size is None:
1386 return UsersListResult(data=users, next_cursor=None)
1388 # Check if there are more results
1389 has_more = len(users) > page_size
1390 if has_more:
1391 users = users[:page_size]
1393 # Generate next cursor using (created_at, email) for EmailUser
1394 next_cursor = None
1395 if has_more and users:
1396 last_user = users[-1]
1397 cursor_data = {
1398 "created_at": last_user.created_at.isoformat() if last_user.created_at else None,
1399 "email": last_user.email,
1400 }
1401 next_cursor = base64.urlsafe_b64encode(orjson.dumps(cursor_data)).decode()
1403 return UsersListResult(data=users, next_cursor=next_cursor)
1405 except Exception as e:
1406 logger.error(f"Error listing users: {e}")
1407 # Return appropriate empty response based on pagination mode
1408 if page is not None:
1409 fallback_per_page = per_page or 50
1410 return UsersListResult(
1411 data=[],
1412 pagination=PaginationMeta(page=page, per_page=fallback_per_page, total_items=0, total_pages=0, has_next=False, has_prev=False),
1413 links=PaginationLinks( # pylint: disable=kwarg-superseded-by-positional-arg
1414 self=f"/admin/users?page=1&per_page={fallback_per_page}",
1415 first=f"/admin/users?page=1&per_page={fallback_per_page}",
1416 last=f"/admin/users?page=1&per_page={fallback_per_page}",
1417 ),
1418 )
1420 if cursor is not None:
1421 return UsersListResult(data=[], next_cursor=None)
1423 return UsersListResult(data=[])
1425 async def list_users_not_in_team(
1426 self,
1427 team_id: str,
1428 cursor: Optional[str] = None,
1429 limit: Optional[int] = None,
1430 page: Optional[int] = None,
1431 per_page: Optional[int] = None,
1432 search: Optional[str] = None,
1433 ) -> UsersListResult:
1434 """List users who are NOT members of the specified team with cursor or page-based pagination.
1436 Uses a NOT IN subquery to efficiently exclude team members.
1438 Args:
1439 team_id: ID of the team to exclude members from
1440 cursor: Opaque cursor token for cursor-based pagination
1441 limit: Maximum number of users to return (for cursor-based, default: 50)
1442 page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor.
1443 per_page: Items per page for page-based pagination (default: 30)
1444 search: Optional search term to filter by email or full name
1446 Returns:
1447 UsersListResult with data and either cursor or pagination metadata
1449 Examples:
1450 # Page-based (admin UI)
1451 # result = await service.list_users_not_in_team("team-123", page=1, per_page=30)
1452 # result.pagination # Returns: pagination metadata
1454 # Cursor-based (API)
1455 # result = await service.list_users_not_in_team("team-123", cursor=None, limit=50)
1456 # result.next_cursor # Returns: next cursor token
1457 """
1458 try:
1459 # Build base query
1460 query = select(EmailUser)
1462 # Apply search filter if provided
1463 if search and search.strip():
1464 search_term = f"{self._escape_like(search.strip())}%"
1465 query = query.where(
1466 or_(
1467 EmailUser.email.ilike(search_term, escape="\\"),
1468 EmailUser.full_name.ilike(search_term, escape="\\"),
1469 )
1470 )
1472 # Exclude team members using NOT IN subquery
1473 member_emails_subquery = select(EmailTeamMember.user_email).where(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True))
1474 query = query.where(EmailUser.is_active.is_(True), ~EmailUser.email.in_(member_emails_subquery))
1476 # PAGE-BASED PAGINATION (Admin UI) - use unified_paginate
1477 if page is not None:
1478 query = query.order_by(EmailUser.full_name, EmailUser.email)
1479 pag_result = await unified_paginate(
1480 db=self.db,
1481 query=query,
1482 page=page,
1483 per_page=per_page or 30,
1484 cursor=None,
1485 limit=None,
1486 base_url=f"/admin/teams/{team_id}/non-members",
1487 query_params={},
1488 )
1489 return UsersListResult(data=pag_result["data"], pagination=pag_result["pagination"], links=pag_result["links"])
1491 # CURSOR-BASED PAGINATION - custom implementation using (created_at, email)
1492 # unified_paginate uses (created_at, id) but EmailUser uses email as PK
1493 query = query.order_by(desc(EmailUser.created_at), desc(EmailUser.email))
1495 # Decode cursor and apply keyset filter
1496 if cursor:
1497 try:
1498 cursor_json = base64.urlsafe_b64decode(cursor.encode()).decode()
1499 cursor_data = orjson.loads(cursor_json)
1500 last_email = cursor_data.get("email")
1501 created_str = cursor_data.get("created_at")
1502 if last_email and created_str:
1503 last_created = datetime.fromisoformat(created_str)
1504 # Keyset filter: (created_at < last) OR (created_at = last AND email < last_email)
1505 query = query.where(
1506 or_(
1507 EmailUser.created_at < last_created,
1508 and_(EmailUser.created_at == last_created, EmailUser.email < last_email),
1509 )
1510 )
1511 except (ValueError, TypeError) as e:
1512 logger.warning(f"Invalid cursor for non-members list, ignoring: {e}")
1514 # Fetch limit + 1 to check for more results
1515 page_size = limit or 50
1516 query = query.limit(page_size + 1)
1517 users = list(self.db.execute(query).scalars().all())
1519 # Check if there are more results
1520 has_more = len(users) > page_size
1521 if has_more:
1522 users = users[:page_size]
1524 # Generate next cursor using (created_at, email)
1525 next_cursor = None
1526 if has_more and users:
1527 last_user = users[-1]
1528 cursor_data = {
1529 "created_at": last_user.created_at.isoformat() if last_user.created_at else None,
1530 "email": last_user.email,
1531 }
1532 next_cursor = base64.urlsafe_b64encode(orjson.dumps(cursor_data)).decode()
1534 self.db.commit()
1535 return UsersListResult(data=users, next_cursor=next_cursor)
1537 except Exception as e:
1538 logger.error(f"Error listing non-members for team {SecurityValidator.sanitize_log_message(team_id)}: {e}")
1540 # Return appropriate empty response based on mode
1541 if page is not None:
1542 return UsersListResult(
1543 data=[],
1544 pagination=PaginationMeta(page=page, per_page=per_page or 30, total_items=0, total_pages=0, has_next=False, has_prev=False),
1545 links=PaginationLinks( # pylint: disable=kwarg-superseded-by-positional-arg
1546 self=f"/admin/teams/{team_id}/non-members?page=1&per_page={per_page or 30}",
1547 first=f"/admin/teams/{team_id}/non-members?page=1&per_page={per_page or 30}",
1548 last=f"/admin/teams/{team_id}/non-members?page=1&per_page={per_page or 30}",
1549 ),
1550 )
1552 return UsersListResult(data=[], next_cursor=None)
1554 async def get_all_users(self) -> list[EmailUser]:
1555 """Get all users without pagination.
1557 .. deprecated:: 1.0
1558 Use :meth:`list_users` with proper pagination instead.
1559 This method has a hardcoded limit of 10,000 users and will not return
1560 more than that. For production systems with many users, use paginated
1561 access with search/filtering.
1563 Returns:
1564 List of up to 10,000 EmailUser objects
1566 Raises:
1567 ValueError: If total users exceed 10,000
1569 Examples:
1570 # users = await service.get_all_users()
1571 # isinstance(users, list) # Returns: True
1573 Warning:
1574 This method is deprecated and will be removed in a future version.
1575 Use list_users() with pagination instead:
1577 # For small datasets
1578 users = await service.list_users(page=1, per_page=1000).data
1580 # For searching
1581 users = await service.list_users(search="john", page=1, per_page=10).data
1582 """
1583 if not self.__class__.get_all_users_deprecated_warned:
1584 warnings.warn(
1585 "get_all_users() is deprecated and limited to 10,000 users. " + "Use list_users() with pagination instead.",
1586 DeprecationWarning,
1587 stacklevel=2,
1588 )
1589 self.__class__.get_all_users_deprecated_warned = True
1591 total_users = await self.count_users()
1592 if total_users > _GET_ALL_USERS_LIMIT:
1593 raise ValueError("get_all_users() supports up to 10,000 users. Use list_users() pagination instead.")
1595 result = await self.list_users(limit=_GET_ALL_USERS_LIMIT)
1596 return result.data # Large limit to get all users
1598 async def count_users(self) -> int:
1599 """Count total number of users.
1601 Returns:
1602 int: Total user count
1603 """
1604 try:
1605 stmt = select(func.count(EmailUser.email)) # pylint: disable=not-callable
1606 count = self.db.execute(stmt).scalar() or 0
1607 return count
1608 except Exception as e:
1609 logger.error(f"Error counting users: {e}")
1610 return 0
1612 async def get_auth_events(self, email: Optional[str] = None, limit: int = 100, offset: int = 0) -> list[EmailAuthEvent]:
1613 """Get authentication events for auditing.
1615 Args:
1616 email: Filter by specific user email (optional)
1617 limit: Maximum number of events to return
1618 offset: Number of events to skip
1620 Returns:
1621 List of EmailAuthEvent objects
1622 """
1623 try:
1624 stmt = select(EmailAuthEvent)
1625 if email:
1626 stmt = stmt.where(EmailAuthEvent.user_email == email)
1627 stmt = stmt.order_by(EmailAuthEvent.timestamp.desc()).offset(offset).limit(limit)
1629 result = self.db.execute(stmt)
1630 events = list(result.scalars().all())
1631 return events
1632 except Exception as e:
1633 logger.error(f"Error getting auth events: {e}")
1634 return []
1636 async def update_user(
1637 self,
1638 email: str,
1639 full_name: Optional[str] = None,
1640 is_admin: Optional[bool] = None,
1641 is_active: Optional[bool] = None,
1642 email_verified: Optional[bool] = None,
1643 password_change_required: Optional[bool] = None,
1644 password: Optional[str] = None,
1645 admin_origin_source: Optional[str] = None,
1646 ) -> EmailUser:
1647 """Update user information.
1649 Args:
1650 email: User's email address (primary key)
1651 full_name: New full name (optional)
1652 is_admin: New admin status (optional)
1653 is_active: New active status (optional)
1654 email_verified: Set email verification status (optional)
1655 password_change_required: Whether user must change password on next login (optional)
1656 password: New password (optional, will be hashed)
1657 admin_origin_source: Source of admin change for tracking (e.g. "api", "ui"). Callers should pass explicitly.
1659 Returns:
1660 EmailUser: Updated user object
1662 Raises:
1663 ValueError: If user doesn't exist, if protect_all_admins blocks the change, or if it would remove the last active admin
1664 PasswordValidationError: If password doesn't meet policy
1665 """
1666 try:
1667 # Normalize email to match create_user() / get_user_by_email() behavior
1668 email = email.lower().strip()
1670 # Get existing user
1671 stmt = select(EmailUser).where(EmailUser.email == email)
1672 result = self.db.execute(stmt)
1673 user = result.scalar_one_or_none()
1675 if not user:
1676 raise ValueError(f"User {email} not found")
1678 # Admin protection guard
1679 if user.is_admin and user.is_active:
1680 would_lose_admin = (is_admin is not None and not is_admin) or (is_active is not None and not is_active)
1681 if would_lose_admin:
1682 if settings.protect_all_admins:
1683 raise ValueError("Admin protection is enabled — cannot demote or deactivate any admin user")
1684 if await self.is_last_active_admin(email):
1685 raise ValueError("Cannot demote or deactivate the last remaining active admin user")
1687 # Update fields if provided
1688 if full_name is not None:
1689 user.full_name = full_name
1691 if email_verified is not None:
1692 user.email_verified_at = utc_now() if email_verified else None
1694 if is_admin is not None:
1695 # Track admin_origin when status actually changes
1696 if is_admin != user.is_admin:
1697 user.is_admin = is_admin
1698 user.admin_origin = admin_origin_source if is_admin else None
1700 # Sync global role assignment with is_admin flag:
1701 # Promotion: revoke default_user_role, assign default_admin_role
1702 # Demotion: revoke default_admin_role, assign default_user_role
1703 try:
1704 admin_role_name = settings.default_admin_role
1705 user_role_name = settings.default_user_role
1706 admin_role = await self.role_service.get_role_by_name(admin_role_name, "global")
1707 user_role = await self.role_service.get_role_by_name(user_role_name, "global")
1709 if is_admin:
1710 # Promotion: assign admin role, revoke user role
1711 if admin_role:
1712 existing = await self.role_service.get_user_role_assignment(user_email=email, role_id=admin_role.id, scope="global", scope_id=None)
1713 if not existing or not existing.is_active:
1714 await self.role_service.assign_role_to_user(user_email=email, role_id=admin_role.id, scope="global", scope_id=None, granted_by=email)
1715 logger.info(f"Assigned {admin_role_name} role to {SecurityValidator.sanitize_log_message(email)}")
1716 else:
1717 logger.warning(f"{admin_role_name} role not found, cannot assign to {SecurityValidator.sanitize_log_message(email)}")
1719 if user_role:
1720 revoked = await self.role_service.revoke_role_from_user(user_email=email, role_id=user_role.id, scope="global", scope_id=None)
1721 if revoked:
1722 logger.info(f"Revoked {SecurityValidator.sanitize_log_message(user_role_name)} role from {SecurityValidator.sanitize_log_message(email)}")
1723 else:
1724 # Demotion: revoke admin role, assign user role
1725 if admin_role:
1726 revoked = await self.role_service.revoke_role_from_user(user_email=email, role_id=admin_role.id, scope="global", scope_id=None)
1727 if revoked:
1728 logger.info(f"Revoked {admin_role_name} role from {SecurityValidator.sanitize_log_message(email)}")
1730 if user_role:
1731 existing = await self.role_service.get_user_role_assignment(user_email=email, role_id=user_role.id, scope="global", scope_id=None)
1732 if not existing or not existing.is_active:
1733 await self.role_service.assign_role_to_user(user_email=email, role_id=user_role.id, scope="global", scope_id=None, granted_by=email)
1734 logger.info(f"Assigned {SecurityValidator.sanitize_log_message(user_role_name)} role to {SecurityValidator.sanitize_log_message(email)}")
1735 else:
1736 logger.warning(f"{SecurityValidator.sanitize_log_message(user_role_name)} role not found, cannot assign to {SecurityValidator.sanitize_log_message(email)}")
1738 except Exception as e:
1739 logger.warning(f"Failed to sync global roles for {SecurityValidator.sanitize_log_message(email)}: {e}")
1740 # Don't fail user update if role sync fails
1742 if is_active is not None:
1743 user.is_active = is_active
1745 if password is not None:
1746 self.validate_password(password)
1747 user.password_hash = await self.password_service.hash_password_async(password)
1748 # Only clear password_change_required if it wasn't explicitly set
1749 if password_change_required is None:
1750 user.password_change_required = False
1751 user.password_changed_at = utc_now()
1753 # Set password_change_required after password processing to allow explicit override
1754 if password_change_required is not None:
1755 user.password_change_required = password_change_required
1757 user.updated_at = datetime.now(timezone.utc)
1759 self.db.commit()
1761 return user
1763 except Exception as e:
1764 self.db.rollback()
1765 logger.error(f"Error updating user {SecurityValidator.sanitize_log_message(email)}: {e}")
1766 raise
1768 async def activate_user(self, email: str) -> EmailUser:
1769 """Activate a user account.
1771 Args:
1772 email: User's email address
1774 Returns:
1775 EmailUser: Updated user object
1777 Raises:
1778 ValueError: If user doesn't exist
1779 """
1780 try:
1781 stmt = select(EmailUser).where(EmailUser.email == email)
1782 result = self.db.execute(stmt)
1783 user = result.scalar_one_or_none()
1785 if not user:
1786 raise ValueError(f"User {email} not found")
1788 user.is_active = True
1789 user.updated_at = datetime.now(timezone.utc)
1791 self.db.commit()
1793 logger.info(f"User {SecurityValidator.sanitize_log_message(email)} activated")
1794 return user
1796 except Exception as e:
1797 self.db.rollback()
1798 logger.error(f"Error activating user {SecurityValidator.sanitize_log_message(email)}: {e}")
1799 raise
1801 async def deactivate_user(self, email: str) -> EmailUser:
1802 """Deactivate a user account.
1804 Args:
1805 email: User's email address
1807 Returns:
1808 EmailUser: Updated user object
1810 Raises:
1811 ValueError: If user doesn't exist
1812 """
1813 try:
1814 stmt = select(EmailUser).where(EmailUser.email == email)
1815 result = self.db.execute(stmt)
1816 user = result.scalar_one_or_none()
1818 if not user:
1819 raise ValueError(f"User {email} not found")
1821 user.is_active = False
1822 user.updated_at = datetime.now(timezone.utc)
1824 self.db.commit()
1826 logger.info(f"User {SecurityValidator.sanitize_log_message(email)} deactivated")
1827 return user
1829 except Exception as e:
1830 self.db.rollback()
1831 logger.error(f"Error deactivating user {SecurityValidator.sanitize_log_message(email)}: {e}")
1832 raise
1834 async def delete_user(self, email: str) -> bool:
1835 """Delete a user account permanently.
1837 Args:
1838 email: User's email address
1840 Returns:
1841 bool: True if user was deleted
1843 Raises:
1844 ValueError: If user doesn't exist
1845 ValueError: If user owns teams that cannot be transferred
1846 """
1847 try:
1848 stmt = select(EmailUser).where(EmailUser.email == email)
1849 result = self.db.execute(stmt)
1850 user = result.scalar_one_or_none()
1852 if not user:
1853 raise ValueError(f"User {email} not found")
1855 # Check if user owns any teams
1856 teams_owned_stmt = select(EmailTeam).where(EmailTeam.created_by == email)
1857 teams_owned = self.db.execute(teams_owned_stmt).scalars().all()
1859 if teams_owned:
1860 # For each team, try to transfer ownership to another owner
1861 for team in teams_owned:
1862 # Find other team owners who can take ownership
1863 potential_owners_stmt = (
1864 select(EmailTeamMember).where(EmailTeamMember.team_id == team.id, EmailTeamMember.user_email != email, EmailTeamMember.role == "owner").order_by(EmailTeamMember.role.desc())
1865 )
1867 potential_owners = self.db.execute(potential_owners_stmt).scalars().all()
1869 if potential_owners:
1870 # Transfer ownership to the first available owner
1871 new_owner = potential_owners[0]
1872 team.created_by = new_owner.user_email
1873 logger.info(f"Transferred team '{SecurityValidator.sanitize_log_message(team.name)}' ownership from {SecurityValidator.sanitize_log_message(email)} to {new_owner.user_email}")
1874 else:
1875 # No other owners available - check if it's a single-user team
1876 all_members_stmt = select(EmailTeamMember).where(EmailTeamMember.team_id == team.id)
1877 all_members = self.db.execute(all_members_stmt).scalars().all()
1879 if len(all_members) == 1 and all_members[0].user_email == email:
1880 # This is a single-user personal team - cascade delete it
1881 logger.info(f"Deleting personal team '{SecurityValidator.sanitize_log_message(team.name)}' (single member: {SecurityValidator.sanitize_log_message(email)})")
1882 # Delete team members first (should be just the owner)
1883 delete_team_members_stmt = delete(EmailTeamMember).where(EmailTeamMember.team_id == team.id)
1884 self.db.execute(delete_team_members_stmt)
1885 # Delete the team
1886 self.db.delete(team)
1887 else:
1888 # Multi-member team with no other owners - cannot delete user
1889 raise ValueError(f"Cannot delete user {email}: owns team '{team.name}' with {len(all_members)} members but no other owners to transfer ownership to")
1891 # Delete all role assignments for the user
1892 try:
1893 await self.role_service.delete_all_user_roles(email)
1894 except Exception as e:
1895 logger.warning(f"Failed to delete role assignments for {SecurityValidator.sanitize_log_message(email)}: {e}")
1897 # Reassign non-null audit FKs to another user so deleting this user does not
1898 # break referential integrity for historical records.
1899 replacement_row = self.db.query(EmailUser.email).filter(EmailUser.email != email).order_by(EmailUser.is_admin.desc(), EmailUser.created_at.asc()).first()
1900 replacement_email = replacement_row[0] if replacement_row else None
1902 if replacement_email:
1903 self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.invited_by == email).update({EmailTeamInvitation.invited_by: replacement_email}, synchronize_session=False)
1904 self.db.query(Role).filter(Role.created_by == email).update({Role.created_by: replacement_email}, synchronize_session=False)
1905 self.db.query(UserRole).filter(UserRole.granted_by == email).update({UserRole.granted_by: replacement_email}, synchronize_session=False)
1906 self.db.query(TokenRevocation).filter(TokenRevocation.revoked_by == email).update({TokenRevocation.revoked_by: replacement_email}, synchronize_session=False)
1908 # Nullify nullable actor references.
1909 self.db.query(EmailTeamMember).filter(EmailTeamMember.invited_by == email).update({EmailTeamMember.invited_by: None}, synchronize_session=False)
1910 self.db.query(EmailTeamMemberHistory).filter(EmailTeamMemberHistory.action_by == email).update({EmailTeamMemberHistory.action_by: None}, synchronize_session=False)
1911 self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.reviewed_by == email).update({EmailTeamJoinRequest.reviewed_by: None}, synchronize_session=False)
1912 self.db.query(PendingUserApproval).filter(PendingUserApproval.approved_by == email).update({PendingUserApproval.approved_by: None}, synchronize_session=False)
1913 self.db.query(SSOAuthSession).filter(SSOAuthSession.user_email == email).update({SSOAuthSession.user_email: None}, synchronize_session=False)
1915 # Remove rows where this user is the primary subject.
1916 self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.user_email == email).delete(synchronize_session=False)
1918 # Delete related auth events
1919 auth_events_stmt = delete(EmailAuthEvent).where(EmailAuthEvent.user_email == email)
1920 self.db.execute(auth_events_stmt)
1922 # Remove user from all team memberships
1923 team_members_stmt = delete(EmailTeamMember).where(EmailTeamMember.user_email == email)
1924 self.db.execute(team_members_stmt)
1926 # Delete the user
1927 self.db.delete(user)
1928 self.db.commit()
1930 await self._invalidate_deleted_user_auth_caches(email)
1932 logger.info(f"User {SecurityValidator.sanitize_log_message(email)} deleted permanently")
1933 return True
1935 except Exception as e:
1936 self.db.rollback()
1937 logger.error(f"Error deleting user {SecurityValidator.sanitize_log_message(email)}: {e}")
1938 raise
1940 async def count_active_admin_users(self) -> int:
1941 """Count the number of active admin users.
1943 Returns:
1944 int: Number of active admin users
1945 """
1946 stmt = select(func.count(EmailUser.email)).where(EmailUser.is_admin.is_(True), EmailUser.is_active.is_(True)) # pylint: disable=not-callable
1947 result = self.db.execute(stmt)
1948 return result.scalar() or 0
1950 async def is_last_active_admin(self, email: str) -> bool:
1951 """Check if the given user is the last active admin.
1953 Args:
1954 email: User's email address
1956 Returns:
1957 bool: True if this user is the last active admin
1958 """
1959 # First check if the user is an active admin
1960 stmt = select(EmailUser).where(EmailUser.email == email)
1961 result = self.db.execute(stmt)
1962 user = result.scalar_one_or_none()
1964 if not user or not user.is_admin or not user.is_active:
1965 return False
1967 # Count total active admins
1968 admin_count = await self.count_active_admin_users()
1969 return admin_count == 1