Coverage for mcpgateway / services / email_auth_service.py: 99%
727 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/email_auth_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Email Authentication Service.
8This module provides email-based user authentication services including
9user creation, authentication, password management, and security features.
11Examples:
12 Basic usage (requires async context):
13 from mcpgateway.services.email_auth_service import EmailAuthService
14 from mcpgateway.db import SessionLocal
16 with SessionLocal() as db:
17 service = EmailAuthService(db)
18 # Use in async context:
19 # user = await service.create_user("test@example.com", "password123")
20 # authenticated = await service.authenticate_user("test@example.com", "password123")
21"""
23# Standard
24import asyncio
25import base64
26from dataclasses import dataclass
27from datetime import datetime, timedelta, timezone
28import hashlib
29import hmac
30import re
31import secrets
32import time
33from typing import Optional
34import urllib.parse
35import warnings
37# Third-Party
38import orjson
39from sqlalchemy import and_, delete, desc, func, or_, select
40from sqlalchemy.exc import IntegrityError
41from sqlalchemy.orm import Session
43# First-Party
44from mcpgateway.config import settings
45from mcpgateway.db import (
46 EmailAuthEvent,
47 EmailTeam,
48 EmailTeamInvitation,
49 EmailTeamJoinRequest,
50 EmailTeamMember,
51 EmailTeamMemberHistory,
52 EmailUser,
53 PasswordResetToken,
54 PendingUserApproval,
55 Role,
56 SSOAuthSession,
57 TokenRevocation,
58 UserRole,
59 utc_now,
60)
61from mcpgateway.schemas import PaginationLinks, PaginationMeta
62from mcpgateway.services.argon2_service import Argon2PasswordService
63from mcpgateway.services.email_notification_service import AuthEmailNotificationService
64from mcpgateway.services.logging_service import LoggingService
65from mcpgateway.services.metrics import password_reset_completions_counter, password_reset_requests_counter
66from mcpgateway.utils.pagination import unified_paginate
68# Initialize logging
69logging_service = LoggingService()
70logger = logging_service.get_logger(__name__)
72# Strong references to background tasks to prevent GC before completion
73_background_tasks: set[asyncio.Task] = set()
75_GET_ALL_USERS_LIMIT = 10000
76_DUMMY_ARGON2_HASH = "$argon2id$v=19$m=65536,t=3,p=1$9x/nTs9D0R97+BI7BWP2Tg$V/40qCuaGh4i+94HpGpxJESEVs3IDpLzUqtNqRPuty4"
79@dataclass(frozen=True)
80class UsersListResult:
81 """Result for list_users queries."""
83 data: list[EmailUser]
84 next_cursor: Optional[str] = None
85 pagination: Optional[PaginationMeta] = None
86 links: Optional[PaginationLinks] = None
89@dataclass(frozen=True)
90class PasswordResetRequestResult:
91 """Result for forgot-password requests."""
93 rate_limited: bool
94 email_sent: bool
97class EmailValidationError(Exception):
98 """Raised when email format is invalid.
100 Examples:
101 >>> try:
102 ... raise EmailValidationError("Invalid email format")
103 ... except EmailValidationError as e:
104 ... str(e)
105 'Invalid email format'
106 """
109class PasswordValidationError(Exception):
110 """Raised when password doesn't meet policy requirements.
112 Examples:
113 >>> try:
114 ... raise PasswordValidationError("Password too short")
115 ... except PasswordValidationError as e:
116 ... str(e)
117 'Password too short'
118 """
121class UserExistsError(Exception):
122 """Raised when attempting to create a user that already exists.
124 Examples:
125 >>> try:
126 ... raise UserExistsError("User already exists")
127 ... except UserExistsError as e:
128 ... str(e)
129 'User already exists'
130 """
133class AuthenticationError(Exception):
134 """Raised when authentication fails.
136 Examples:
137 >>> try:
138 ... raise AuthenticationError("Invalid credentials")
139 ... except AuthenticationError as e:
140 ... str(e)
141 'Invalid credentials'
142 """
145class EmailAuthService:
146 """Service for email-based user authentication.
148 This service handles user registration, authentication, password management,
149 and security features like account lockout and failed attempt tracking.
151 Attributes:
152 db (Session): Database session
153 password_service (Argon2PasswordService): Password hashing service
155 Examples:
156 >>> from mcpgateway.db import SessionLocal
157 >>> with SessionLocal() as db:
158 ... service = EmailAuthService(db)
159 ... # Service is ready to use
160 """
162 get_all_users_deprecated_warned = False
164 def __init__(self, db: Session):
165 """Initialize the email authentication service.
167 Args:
168 db: SQLAlchemy database session
169 """
170 self.db = db
171 self.password_service = Argon2PasswordService()
172 self.email_notification_service = AuthEmailNotificationService()
173 self._role_service = None
174 logger.debug("EmailAuthService initialized")
176 @property
177 def role_service(self):
178 """Lazy-initialized RoleService to avoid circular imports.
180 Returns:
181 RoleService: Instance of RoleService
182 """
183 if self._role_service is None:
184 # First-Party
185 from mcpgateway.services.role_service import RoleService # pylint: disable=import-outside-toplevel
187 self._role_service = RoleService(self.db)
188 return self._role_service
190 def validate_email(self, email: str) -> bool:
191 """Validate email address format.
193 Args:
194 email: Email address to validate
196 Returns:
197 bool: True if email is valid
199 Raises:
200 EmailValidationError: If email format is invalid
202 Examples:
203 >>> service = EmailAuthService(None)
204 >>> service.validate_email("user@example.com")
205 True
206 >>> service.validate_email("test.user+tag@domain.co.uk")
207 True
208 >>> service.validate_email("user123@test-domain.com")
209 True
210 >>> try:
211 ... service.validate_email("invalid-email")
212 ... except EmailValidationError as e:
213 ... "Invalid email format" in str(e)
214 True
215 >>> try:
216 ... service.validate_email("")
217 ... except EmailValidationError as e:
218 ... "Email is required" in str(e)
219 True
220 >>> try:
221 ... service.validate_email("user@")
222 ... except EmailValidationError as e:
223 ... "Invalid email format" in str(e)
224 True
225 >>> try:
226 ... service.validate_email("@domain.com")
227 ... except EmailValidationError as e:
228 ... "Invalid email format" in str(e)
229 True
230 >>> try:
231 ... service.validate_email("user@domain")
232 ... except EmailValidationError as e:
233 ... "Invalid email format" in str(e)
234 True
235 >>> try:
236 ... service.validate_email("a" * 250 + "@domain.com")
237 ... except EmailValidationError as e:
238 ... "Email address too long" in str(e)
239 True
240 >>> try:
241 ... service.validate_email(None)
242 ... except EmailValidationError as e:
243 ... "Email is required" in str(e)
244 True
245 """
246 if not email or not isinstance(email, str):
247 raise EmailValidationError("Email is required and must be a string")
249 # Basic email regex pattern
250 email_pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
252 if not re.match(email_pattern, email):
253 raise EmailValidationError("Invalid email format")
255 if len(email) > 255:
256 raise EmailValidationError("Email address too long (max 255 characters)")
258 return True
260 def validate_password(self, password: str) -> bool:
261 """Validate password against policy requirements.
263 Args:
264 password: Password to validate
266 Returns:
267 bool: True if password meets policy
269 Raises:
270 PasswordValidationError: If password doesn't meet requirements
272 Examples:
273 >>> service = EmailAuthService(None)
274 >>> service.validate_password("Password123!") # Meets all requirements
275 True
276 >>> service.validate_password("ValidPassword123!")
277 True
278 >>> service.validate_password("Shortpass!") # 8+ chars with requirements
279 True
280 >>> service.validate_password("VeryLongPasswordThatMeetsMinimumRequirements!")
281 True
282 >>> try:
283 ... service.validate_password("")
284 ... except PasswordValidationError as e:
285 ... "Password is required" in str(e)
286 True
287 >>> try:
288 ... service.validate_password(None)
289 ... except PasswordValidationError as e:
290 ... "Password is required" in str(e)
291 True
292 >>> try:
293 ... service.validate_password("short") # Only 5 chars, should fail with default min_length=8
294 ... except PasswordValidationError as e:
295 ... "characters long" in str(e)
296 True
297 """
298 if not password:
299 raise PasswordValidationError("Password is required")
301 # Respect global toggle for password policy
302 if not getattr(settings, "password_policy_enabled", True):
303 return True
305 # Get password policy settings
306 min_length = getattr(settings, "password_min_length", 8)
307 require_uppercase = getattr(settings, "password_require_uppercase", False)
308 require_lowercase = getattr(settings, "password_require_lowercase", False)
309 require_numbers = getattr(settings, "password_require_numbers", False)
310 require_special = getattr(settings, "password_require_special", False)
312 if len(password) < min_length:
313 raise PasswordValidationError(f"Password must be at least {min_length} characters long")
315 if require_uppercase and not re.search(r"[A-Z]", password):
316 raise PasswordValidationError("Password must contain at least one uppercase letter")
318 if require_lowercase and not re.search(r"[a-z]", password):
319 raise PasswordValidationError("Password must contain at least one lowercase letter")
321 if require_numbers and not re.search(r"[0-9]", password):
322 raise PasswordValidationError("Password must contain at least one number")
324 if require_special and not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
325 raise PasswordValidationError("Password must contain at least one special character")
327 return True
329 @staticmethod
330 def _hash_reset_token(token: str) -> str:
331 """Hash a plaintext password-reset token using SHA-256.
333 Args:
334 token: Plaintext reset token.
336 Returns:
337 str: Hex-encoded SHA-256 digest.
338 """
339 return hashlib.sha256(token.encode("utf-8")).hexdigest()
341 @staticmethod
342 def _minimum_reset_response_seconds() -> float:
343 """Get minimum forgot-password response duration.
345 Returns:
346 float: Minimum response duration in seconds.
347 """
348 min_ms = max(0, int(getattr(settings, "password_reset_min_response_ms", 250)))
349 return min_ms / 1000.0
351 @staticmethod
352 def _minimum_login_failure_seconds() -> float:
353 """Get minimum failed-login response duration.
355 Returns:
356 float: Minimum failure response duration in seconds.
357 """
358 min_ms = max(0, int(getattr(settings, "failed_login_min_response_ms", 250)))
359 return min_ms / 1000.0
361 async def _apply_failed_login_floor(self, start_time: float) -> None:
362 """Apply minimum failed-login response duration.
364 Args:
365 start_time: Monotonic timestamp when login processing started.
366 """
367 remaining = self._minimum_login_failure_seconds() - (time.monotonic() - start_time)
368 if remaining > 0:
369 await asyncio.sleep(remaining)
371 async def _verify_dummy_password_for_timing(self, password: str) -> None:
372 """Run dummy Argon2 verification to reduce observable timing differences.
374 Args:
375 password: User-supplied password candidate.
376 """
377 try:
378 await self.password_service.verify_password_async(password, _DUMMY_ARGON2_HASH)
379 except Exception as exc: # nosec B110
380 logger.debug("Dummy password verification failed: %s", exc)
382 @staticmethod
383 def _build_forgot_password_url() -> str:
384 """Build the absolute forgot-password page URL.
386 Returns:
387 str: Absolute forgot-password URL.
388 """
389 app_domain = str(getattr(settings, "app_domain", "http://localhost:4444")).rstrip("/")
390 root_path = str(getattr(settings, "app_root_path", "")).rstrip("/")
391 return f"{app_domain}{root_path}/admin/forgot-password"
393 @staticmethod
394 def _build_reset_password_url(token: str) -> str:
395 """Build the absolute reset-password URL for a token.
397 Args:
398 token: Plaintext reset token.
400 Returns:
401 str: Absolute reset-password URL.
402 """
403 safe_token = urllib.parse.quote(token, safe="")
404 app_domain = str(getattr(settings, "app_domain", "http://localhost:4444")).rstrip("/")
405 root_path = str(getattr(settings, "app_root_path", "")).rstrip("/")
406 return f"{app_domain}{root_path}/admin/reset-password/{safe_token}"
408 async def _invalidate_user_auth_cache(self, email: str) -> None:
409 """Invalidate cached authentication data for a user.
411 Args:
412 email: User email for cache invalidation.
413 """
414 try:
415 # First-Party
416 from mcpgateway.cache.auth_cache import auth_cache # pylint: disable=import-outside-toplevel
418 await asyncio.wait_for(asyncio.shield(auth_cache.invalidate_user(email)), timeout=5.0)
419 except asyncio.TimeoutError:
420 logger.warning("Auth cache invalidation timed out for %s - continuing", email)
421 except Exception as cache_error: # nosec B110
422 logger.debug("Failed to invalidate auth cache for %s: %s", email, cache_error)
424 def _log_auth_event(
425 self,
426 event_type: str,
427 success: bool,
428 user_email: Optional[str],
429 ip_address: Optional[str] = None,
430 user_agent: Optional[str] = None,
431 failure_reason: Optional[str] = None,
432 details: Optional[dict] = None,
433 ) -> None:
434 """Persist a custom authentication/security event.
436 Args:
437 event_type: Event type identifier.
438 success: Whether the event succeeded.
439 user_email: Related user email, if available.
440 ip_address: Source IP address.
441 user_agent: Source user agent string.
442 failure_reason: Failure detail when `success` is False.
443 details: Additional structured event payload.
444 """
445 try:
446 event = EmailAuthEvent(
447 user_email=user_email,
448 event_type=event_type,
449 success=success,
450 ip_address=ip_address,
451 user_agent=user_agent,
452 failure_reason=failure_reason,
453 details=orjson.dumps(details).decode() if details else None,
454 )
455 self.db.add(event)
456 self.db.commit()
457 except Exception as exc:
458 self.db.rollback()
459 logger.warning("Failed to persist auth event %s for %s: %s", event_type, user_email, exc)
461 def _recent_password_reset_request_count(self, email: str, now: datetime) -> int:
462 """Count recent password-reset requests for rate limiting.
464 Args:
465 email: Email to count requests for.
466 now: Current UTC timestamp.
468 Returns:
469 int: Number of reset requests in the current rate-limit window.
470 """
471 window_minutes = int(getattr(settings, "password_reset_rate_window_minutes", 15))
472 window_start = now - timedelta(minutes=window_minutes)
473 stmt = (
474 select(func.count(EmailAuthEvent.id)) # pylint: disable=not-callable
475 .where(EmailAuthEvent.event_type == "PASSWORD_RESET_REQUESTED")
476 .where(EmailAuthEvent.user_email == email)
477 .where(EmailAuthEvent.timestamp >= window_start)
478 )
479 count = self.db.execute(stmt).scalar()
480 return int(count or 0)
482 async def get_user_by_email(self, email: str) -> Optional[EmailUser]:
483 """Get user by email address.
485 Args:
486 email: Email address to look up
488 Returns:
489 EmailUser or None if not found
491 Examples:
492 # Assuming database has user "test@example.com"
493 # user = await service.get_user_by_email("test@example.com")
494 # user.email if user else None # Returns: 'test@example.com'
495 """
496 try:
497 stmt = select(EmailUser).where(EmailUser.email == email.lower())
498 result = self.db.execute(stmt)
499 user = result.scalar_one_or_none()
500 return user
501 except Exception as e:
502 logger.error(f"Error getting user by email {email}: {e}")
503 return None
505 async def create_user(
506 self,
507 email: str,
508 password: str,
509 full_name: Optional[str] = None,
510 is_admin: bool = False,
511 is_active: bool = True,
512 password_change_required: bool = False,
513 auth_provider: str = "local",
514 skip_password_validation: bool = False,
515 granted_by: Optional[str] = None,
516 ) -> EmailUser:
517 """Create a new user with email authentication.
519 Args:
520 email: User's email address (primary key)
521 password: Plain text password (will be hashed)
522 full_name: Optional full name for display
523 is_admin: Whether user has admin privileges
524 is_active: Whether user account is active (default: True)
525 password_change_required: Whether user must change password on next login (default: False)
526 auth_provider: Authentication provider ('local', 'github', etc.)
527 skip_password_validation: Skip password policy validation (for bootstrap)
528 granted_by: Email of user creating this user (for role assignment audit trail)
530 Returns:
531 EmailUser: The created user object
533 Raises:
534 EmailValidationError: If email format is invalid
535 PasswordValidationError: If password doesn't meet policy
536 UserExistsError: If user already exists
538 Examples:
539 # user = await service.create_user(
540 # email="new@example.com",
541 # password="secure123",
542 # full_name="New User",
543 # is_active=True,
544 # password_change_required=False
545 # )
546 # user.email # Returns: 'new@example.com'
547 # user.full_name # Returns: 'New User'
548 # user.is_active # Returns: True
549 """
550 # Normalize email to lowercase
551 email = email.lower().strip()
553 # Validate inputs
554 self.validate_email(email)
555 if not skip_password_validation:
556 self.validate_password(password)
558 # Check if user already exists
559 existing_user = await self.get_user_by_email(email)
560 if existing_user:
561 raise UserExistsError(f"User with email {email} already exists")
563 # Hash the password
564 password_hash = await self.password_service.hash_password_async(password)
566 # Create new user (record password change timestamp)
567 user = EmailUser(
568 email=email,
569 password_hash=password_hash,
570 full_name=full_name,
571 is_admin=is_admin,
572 is_active=is_active,
573 password_change_required=password_change_required,
574 auth_provider=auth_provider,
575 password_changed_at=utc_now(),
576 admin_origin="api" if is_admin else None,
577 )
579 # Admin-created users are implicitly email-verified (the admin vouched for them)
580 if granted_by:
581 user.email_verified_at = utc_now()
583 try:
584 self.db.add(user)
585 self.db.commit()
586 self.db.refresh(user)
588 logger.info(f"Created new user: {email}")
590 # Create personal team first if enabled (needed for team-scoped role assignment)
591 personal_team_id = None
592 if getattr(settings, "auto_create_personal_teams", True):
593 try:
594 # Import here to avoid circular imports
595 # First-Party
596 from mcpgateway.services.personal_team_service import PersonalTeamService # pylint: disable=import-outside-toplevel
598 personal_team_service = PersonalTeamService(self.db)
599 personal_team = await personal_team_service.create_personal_team(user)
600 personal_team_id = personal_team.id # Get team_id directly from created team
601 logger.info(f"Created personal team '{personal_team.name}' (ID: {personal_team_id}) for user {email}")
602 except Exception as e:
603 logger.warning(f"Failed to create personal team for {email}: {e}")
604 # Don't fail user creation if personal team creation fails
606 # Auto-assign dual roles using RoleService (after personal team creation)
607 try:
608 granter = granted_by or email # Use granted_by if provided, otherwise self-granted
610 # Determine global role based on admin status
611 global_role_name = settings.default_admin_role if is_admin else settings.default_user_role
612 global_role = await self.role_service.get_role_by_name(global_role_name, "global")
614 if global_role:
615 try:
616 await self.role_service.assign_role_to_user(user_email=email, role_id=global_role.id, scope="global", scope_id=None, granted_by=granter)
617 logger.info(f"Assigned {global_role_name} role (global scope) to user {email}")
618 except ValueError as e:
619 logger.warning(f"Could not assign {global_role_name} role to {email}: {e}")
620 else:
621 logger.warning(f"{global_role_name} role not found. User {email} created without global role.")
623 # Assign team owner role with team scope (if personal team exists)
624 if personal_team_id:
625 team_owner_role_name = settings.default_team_owner_role
626 team_owner_role = await self.role_service.get_role_by_name(team_owner_role_name, "team")
628 if team_owner_role:
629 try:
630 await self.role_service.assign_role_to_user(user_email=email, role_id=team_owner_role.id, scope="team", scope_id=personal_team_id, granted_by=granter)
631 logger.info(f"Assigned {team_owner_role_name} role (team scope: {personal_team_id}) to user {email}")
632 except ValueError as e:
633 logger.warning(f"Could not assign {team_owner_role_name} role to {email}: {e}")
634 else:
635 logger.warning(f"{team_owner_role_name} role not found. User {email} created without team owner role.")
637 except Exception as role_error:
638 logger.error(f"Failed to assign roles to user {email}: {role_error}")
639 # Don't fail user creation if role assignment fails
640 # User can be assigned roles manually later
642 # Log registration event
643 registration_event = EmailAuthEvent.create_registration_event(user_email=email, success=True)
644 self.db.add(registration_event)
645 self.db.commit()
647 return user
649 except IntegrityError as e:
650 self.db.rollback()
651 logger.error(f"Database error creating user {email}: {e}")
652 raise UserExistsError(f"User with email {email} already exists") from e
653 except Exception as e:
654 self.db.rollback()
655 logger.error(f"Unexpected error creating user {email}: {e}")
657 # Log failed registration
658 registration_event = EmailAuthEvent.create_registration_event(user_email=email, success=False, failure_reason=str(e))
659 self.db.add(registration_event)
660 self.db.commit()
662 raise
664 async def authenticate_user(self, email: str, password: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> Optional[EmailUser]:
665 """Authenticate a user with email and password.
667 Args:
668 email: User's email address
669 password: Plain text password
670 ip_address: Client IP address for logging
671 user_agent: Client user agent for logging
673 Returns:
674 EmailUser if authentication successful, None otherwise
676 Examples:
677 # user = await service.authenticate_user("user@example.com", "correct_password")
678 # user.email if user else None # Returns: 'user@example.com'
679 # await service.authenticate_user("user@example.com", "wrong_password") # Returns: None
680 """
681 email = email.lower().strip()
682 start_time = time.monotonic()
684 # Get user from database
685 user = await self.get_user_by_email(email)
687 # Track authentication attempt
688 auth_success = False
689 failure_reason = None
691 try:
692 if not user:
693 failure_reason = "User not found"
694 logger.info(f"Authentication failed for {email}: user not found")
695 await self._verify_dummy_password_for_timing(password)
696 await self._apply_failed_login_floor(start_time)
697 return None
699 if not user.is_active:
700 failure_reason = "Account is disabled"
701 logger.info(f"Authentication failed for {email}: account disabled")
702 await self._verify_dummy_password_for_timing(password)
703 await self._apply_failed_login_floor(start_time)
704 return None
706 is_protected_admin = user.is_admin and settings.protect_all_admins
708 # Enforce lockout for all accounts. Protected admins are allowed
709 # to continue attempting login (feature-flagged via protect_all_admins)
710 # but their failed attempts are still tracked for audit purposes.
711 if user.is_account_locked() and not is_protected_admin:
712 failure_reason = "Account is locked"
713 logger.info(f"Authentication failed for {email}: account locked")
714 await self._verify_dummy_password_for_timing(password)
715 await self._apply_failed_login_floor(start_time)
716 return None
718 # Verify password
719 if not await self.password_service.verify_password_async(password, user.password_hash):
720 failure_reason = "Invalid password"
722 # Always increment failed attempts — including for protected admins
723 max_attempts = getattr(settings, "max_failed_login_attempts", 5)
724 lockout_duration = getattr(settings, "account_lockout_duration_minutes", 30)
726 is_locked = user.increment_failed_attempts(max_attempts, lockout_duration)
728 if is_locked:
729 logger.warning(f"Account locked for {email} after {max_attempts} failed attempts")
730 failure_reason = "Account locked due to too many failed attempts"
731 lockout_notifications_enabled = getattr(settings, "account_lockout_notification_enabled", True)
732 if isinstance(lockout_notifications_enabled, bool) and lockout_notifications_enabled:
733 locked_until_iso = user.locked_until.isoformat() if user.locked_until else "unknown"
734 try:
735 await self.email_notification_service.send_account_lockout_email(
736 to_email=user.email,
737 full_name=user.full_name,
738 locked_until_iso=locked_until_iso,
739 reset_url=self._build_forgot_password_url(),
740 )
741 except Exception as email_exc:
742 logger.warning("Failed to send lockout notification for %s: %s", email, email_exc)
743 self._log_auth_event(
744 event_type="ACCOUNT_LOCKED",
745 success=True,
746 user_email=email,
747 ip_address=ip_address,
748 user_agent=user_agent,
749 details={"locked_until": user.locked_until.isoformat() if user.locked_until else None},
750 )
752 self.db.commit()
753 logger.info(f"Authentication failed for {email}: invalid password")
754 await self._apply_failed_login_floor(start_time)
755 return None
757 # Authentication successful
758 user.reset_failed_attempts()
759 self.db.commit()
761 auth_success = True
762 logger.info(f"Authentication successful for {email}")
764 return user
766 finally:
767 # Log authentication event
768 auth_event = EmailAuthEvent.create_login_attempt(user_email=email, success=auth_success, ip_address=ip_address, user_agent=user_agent, failure_reason=failure_reason)
769 self.db.add(auth_event)
770 self.db.commit()
772 async def request_password_reset(self, email: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> PasswordResetRequestResult:
773 """Create a password reset token and send reset email when user exists.
775 The function intentionally returns generic outcomes to avoid account
776 enumeration while still allowing rate-limit enforcement.
778 Args:
779 email: User email requesting password reset.
780 ip_address: Source IP address.
781 user_agent: Source user agent string.
783 Returns:
784 PasswordResetRequestResult: Reset request processing outcome.
785 """
786 start_time = time.monotonic()
787 normalized_email = (email or "").lower().strip()
788 now = utc_now()
789 _ = self._hash_reset_token(secrets.token_urlsafe(32))
791 rate_limit = int(getattr(settings, "password_reset_rate_limit", 5))
792 is_rate_limited = bool(normalized_email and self._recent_password_reset_request_count(normalized_email, now) >= rate_limit)
793 if is_rate_limited:
794 password_reset_requests_counter.labels(outcome="rate_limited").inc()
795 self._log_auth_event(
796 event_type="PASSWORD_RESET_RATE_LIMITED",
797 success=False,
798 user_email=normalized_email or None,
799 ip_address=ip_address,
800 user_agent=user_agent,
801 )
802 remaining = self._minimum_reset_response_seconds() - (time.monotonic() - start_time)
803 if remaining > 0:
804 await asyncio.sleep(remaining)
805 return PasswordResetRequestResult(rate_limited=True, email_sent=False)
807 user = await self.get_user_by_email(normalized_email) if normalized_email else None
808 self._log_auth_event(
809 event_type="PASSWORD_RESET_REQUESTED",
810 success=True,
811 user_email=normalized_email or None,
812 ip_address=ip_address,
813 user_agent=user_agent,
814 )
816 email_sent = False
817 if user and user.is_active:
818 token_plaintext = secrets.token_urlsafe(48)
819 token_hash = self._hash_reset_token(token_plaintext)
820 expires_minutes = int(getattr(settings, "password_reset_token_expiry_minutes", 60))
821 expires_at = now + timedelta(minutes=expires_minutes)
823 existing_stmt = select(PasswordResetToken).where(PasswordResetToken.user_email == user.email).where(PasswordResetToken.used_at.is_(None)).where(PasswordResetToken.expires_at > now)
824 for existing in self.db.execute(existing_stmt).scalars().all():
825 existing.used_at = now
827 token_record = PasswordResetToken(
828 user_email=user.email,
829 token_hash=token_hash,
830 expires_at=expires_at,
831 ip_address=ip_address,
832 user_agent=user_agent,
833 )
834 self.db.add(token_record)
835 self.db.commit()
837 try:
838 email_sent = await self.email_notification_service.send_password_reset_email(
839 to_email=user.email,
840 full_name=user.full_name,
841 reset_url=self._build_reset_password_url(token_plaintext),
842 expires_minutes=expires_minutes,
843 )
844 except Exception as exc:
845 logger.warning("Failed to send password reset email to %s: %s", user.email, exc)
847 password_reset_requests_counter.labels(outcome="accepted").inc()
848 self._log_auth_event(
849 event_type="PASSWORD_RESET_EMAIL_SENT",
850 success=True,
851 user_email=user.email,
852 ip_address=ip_address,
853 user_agent=user_agent,
854 details={"token_hash": token_hash, "expires_at": expires_at.isoformat(), "email_sent": email_sent},
855 )
856 else:
857 password_reset_requests_counter.labels(outcome="accepted").inc()
859 remaining = self._minimum_reset_response_seconds() - (time.monotonic() - start_time)
860 if remaining > 0:
861 await asyncio.sleep(remaining)
862 return PasswordResetRequestResult(rate_limited=False, email_sent=email_sent)
864 async def validate_password_reset_token(self, token: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> PasswordResetToken:
865 """Validate a one-time password reset token.
867 Args:
868 token: Plaintext password reset token.
869 ip_address: Source IP address.
870 user_agent: Source user agent string.
872 Returns:
873 PasswordResetToken: Matching valid reset token record.
875 Raises:
876 AuthenticationError: If token is missing, invalid, used, or expired.
877 """
878 if not token:
879 password_reset_completions_counter.labels(outcome="invalid_token").inc()
880 self._log_auth_event("PASSWORD_RESET_ATTEMPTED", False, None, ip_address, user_agent, failure_reason="Missing token")
881 raise AuthenticationError("This reset link is invalid")
883 token_hash = self._hash_reset_token(token)
884 stmt = select(PasswordResetToken).where(PasswordResetToken.token_hash == token_hash)
885 reset_token = self.db.execute(stmt).scalar_one_or_none()
887 if not reset_token:
888 password_reset_completions_counter.labels(outcome="invalid_token").inc()
889 self._log_auth_event("PASSWORD_RESET_ATTEMPTED", False, None, ip_address, user_agent, failure_reason="Invalid token hash")
890 raise AuthenticationError("This reset link is invalid")
892 if not hmac.compare_digest(reset_token.token_hash, token_hash):
893 password_reset_completions_counter.labels(outcome="invalid_token").inc()
894 self._log_auth_event("PASSWORD_RESET_ATTEMPTED", False, reset_token.user_email, ip_address, user_agent, failure_reason="Token hash mismatch")
895 raise AuthenticationError("This reset link is invalid")
897 if reset_token.is_used():
898 password_reset_completions_counter.labels(outcome="used_token").inc()
899 self._log_auth_event("PASSWORD_RESET_ATTEMPTED", False, reset_token.user_email, ip_address, user_agent, failure_reason="Token already used")
900 raise AuthenticationError("This reset link has already been used")
902 if reset_token.is_expired():
903 password_reset_completions_counter.labels(outcome="expired_token").inc()
904 self._log_auth_event("PASSWORD_RESET_TOKEN_EXPIRED", False, reset_token.user_email, ip_address, user_agent, details={"token_hash": token_hash})
905 self._log_auth_event("PASSWORD_RESET_ATTEMPTED", False, reset_token.user_email, ip_address, user_agent, failure_reason="Token expired")
906 raise AuthenticationError("This reset link has expired")
908 self._log_auth_event("PASSWORD_RESET_ATTEMPTED", True, reset_token.user_email, ip_address, user_agent)
909 return reset_token
911 async def reset_password_with_token(self, token: str, new_password: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> bool:
912 """Complete password reset using a validated one-time token.
914 Args:
915 token: Plaintext password reset token.
916 new_password: New password value.
917 ip_address: Source IP address.
918 user_agent: Source user agent string.
920 Returns:
921 bool: True when password reset completed successfully.
923 Raises:
924 AuthenticationError: If token or associated user is invalid.
925 PasswordValidationError: If new password violates policy or reuse checks.
926 """
927 reset_token = await self.validate_password_reset_token(token, ip_address=ip_address, user_agent=user_agent)
928 user = await self.get_user_by_email(reset_token.user_email)
929 if not user or not user.is_active:
930 password_reset_completions_counter.labels(outcome="invalid_user").inc()
931 raise AuthenticationError("This reset link is invalid")
933 self.validate_password(new_password)
934 if getattr(settings, "password_prevent_reuse", True) and await self.password_service.verify_password_async(new_password, user.password_hash):
935 password_reset_completions_counter.labels(outcome="reused_password").inc()
936 raise PasswordValidationError("New password must be different from current password")
938 now = utc_now()
939 user.password_hash = await self.password_service.hash_password_async(new_password)
940 user.password_change_required = False
941 user.password_changed_at = now
942 user.failed_login_attempts = 0
943 user.locked_until = None
945 reset_token.used_at = now
946 outstanding_stmt = select(PasswordResetToken).where(PasswordResetToken.user_email == user.email).where(PasswordResetToken.id != reset_token.id).where(PasswordResetToken.used_at.is_(None))
947 for outstanding in self.db.execute(outstanding_stmt).scalars().all():
948 outstanding.used_at = now
950 self.db.commit()
952 if getattr(settings, "password_reset_invalidate_sessions", True):
953 await self._invalidate_user_auth_cache(user.email)
955 email_sent = False
956 try:
957 email_sent = await self.email_notification_service.send_password_reset_confirmation_email(to_email=user.email, full_name=user.full_name)
958 except Exception as exc:
959 logger.warning("Failed to send password reset confirmation for %s: %s", user.email, exc)
961 password_reset_completions_counter.labels(outcome="success").inc()
962 self._log_auth_event(
963 event_type="PASSWORD_RESET_COMPLETED",
964 success=True,
965 user_email=user.email,
966 ip_address=ip_address,
967 user_agent=user_agent,
968 details={"email_sent": email_sent},
969 )
970 return True
972 async def unlock_user_account(self, email: str, unlocked_by: Optional[str] = None, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> EmailUser:
973 """Clear lockout state for a user account.
975 Args:
976 email: User email to unlock.
977 unlocked_by: Admin/user identifier who performed unlock.
978 ip_address: Source IP address.
979 user_agent: Source user agent string.
981 Returns:
982 EmailUser: Updated user record after unlock.
984 Raises:
985 ValueError: If the target user cannot be found.
986 """
987 normalized_email = email.lower().strip()
988 user = await self.get_user_by_email(normalized_email)
989 if not user:
990 raise ValueError(f"User {normalized_email} not found")
992 user.failed_login_attempts = 0
993 user.locked_until = None
994 user.updated_at = utc_now()
995 self.db.commit()
996 self._log_auth_event(
997 event_type="ACCOUNT_UNLOCKED",
998 success=True,
999 user_email=user.email,
1000 ip_address=ip_address,
1001 user_agent=user_agent,
1002 details={"unlocked_by": unlocked_by},
1003 )
1004 return user
1006 async def change_password(self, email: str, old_password: Optional[str], new_password: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> bool:
1007 """Change a user's password.
1009 Args:
1010 email: User's email address
1011 old_password: Current password for verification
1012 new_password: New password to set
1013 ip_address: Client IP address for logging
1014 user_agent: Client user agent for logging
1016 Returns:
1017 bool: True if password changed successfully
1019 Raises:
1020 AuthenticationError: If old password is incorrect
1021 PasswordValidationError: If new password doesn't meet policy
1022 Exception: If database operation fails
1024 Examples:
1025 # success = await service.change_password(
1026 # "user@example.com",
1027 # "old_password",
1028 # "new_secure_password"
1029 # )
1030 # success # Returns: True
1031 """
1032 # Validate old password is provided
1033 if old_password is None:
1034 raise AuthenticationError("Current password is required")
1036 # First authenticate with old password
1037 user = await self.authenticate_user(email, old_password, ip_address, user_agent)
1038 if not user:
1039 raise AuthenticationError("Current password is incorrect")
1041 # Validate new password
1042 self.validate_password(new_password)
1044 # Check if new password is same as old (optional policy)
1045 if getattr(settings, "password_prevent_reuse", True) and await self.password_service.verify_password_async(new_password, user.password_hash):
1046 raise PasswordValidationError("New password must be different from current password")
1048 success = False
1049 try:
1050 # Hash new password and update
1051 new_password_hash = await self.password_service.hash_password_async(new_password)
1052 user.password_hash = new_password_hash
1053 # Clear the flag that requires the user to change password
1054 user.password_change_required = False
1055 # Record the password change timestamp
1056 try:
1057 user.password_changed_at = utc_now()
1058 except Exception as exc:
1059 logger.debug("Failed to set password_changed_at for %s: %s", email, exc)
1061 self.db.commit()
1062 success = True
1064 # Invalidate auth cache for user
1065 try:
1066 await self._invalidate_user_auth_cache(email)
1067 except Exception as cache_error: # nosec B110 - best effort cache invalidation
1068 logger.debug("Failed to invalidate auth cache on password change: %s", cache_error)
1070 logger.info(f"Password changed successfully for {email}")
1072 except Exception as e:
1073 self.db.rollback()
1074 logger.error(f"Error changing password for {email}: {e}")
1075 raise
1076 finally:
1077 # Log password change event
1078 password_event = EmailAuthEvent.create_password_change_event(user_email=email, success=success, ip_address=ip_address, user_agent=user_agent)
1079 self.db.add(password_event)
1080 self.db.commit()
1082 return success
1084 async def create_platform_admin(self, email: str, password: str, full_name: Optional[str] = None) -> EmailUser:
1085 """Create or update the platform administrator user.
1087 This method is used during system bootstrap to create the initial
1088 admin user from environment variables.
1090 Args:
1091 email: Admin email address
1092 password: Admin password
1093 full_name: Admin full name
1095 Returns:
1096 EmailUser: The admin user
1098 Examples:
1099 # admin = await service.create_platform_admin(
1100 # "admin@example.com",
1101 # "admin_password",
1102 # "Platform Administrator"
1103 # )
1104 # admin.is_admin # Returns: True
1105 """
1106 # Check if admin user already exists
1107 existing_admin = await self.get_user_by_email(email)
1109 if existing_admin:
1110 # Update existing admin if password or name changed
1111 if full_name and existing_admin.full_name != full_name:
1112 existing_admin.full_name = full_name
1114 # Check if password needs update (verify current password first)
1115 if not await self.password_service.verify_password_async(password, existing_admin.password_hash):
1116 existing_admin.password_hash = await self.password_service.hash_password_async(password)
1117 try:
1118 existing_admin.password_changed_at = utc_now()
1119 except Exception as exc:
1120 logger.debug("Failed to set password_changed_at for existing admin %s: %s", email, exc)
1122 # Ensure admin status
1123 existing_admin.is_admin = True
1124 existing_admin.is_active = True
1126 self.db.commit()
1127 logger.info(f"Updated platform admin user: {email}")
1128 return existing_admin
1130 # Create new admin user - skip password validation during bootstrap
1131 admin_user = await self.create_user(email=email, password=password, full_name=full_name, is_admin=True, auth_provider="local", skip_password_validation=True)
1133 logger.info(f"Created platform admin user: {email}")
1134 return admin_user
1136 async def update_last_login(self, email: str) -> None:
1137 """Update the last login timestamp for a user.
1139 Args:
1140 email: User's email address
1141 """
1142 user = await self.get_user_by_email(email)
1143 if user:
1144 user.reset_failed_attempts() # This also updates last_login
1145 self.db.commit()
1147 @staticmethod
1148 def _escape_like(value: str) -> str:
1149 """Escape LIKE wildcards for prefix search.
1151 Args:
1152 value: Raw value to escape for LIKE matching.
1154 Returns:
1155 Escaped string safe for LIKE patterns.
1156 """
1157 return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
1159 async def list_users(
1160 self,
1161 limit: Optional[int] = None,
1162 cursor: Optional[str] = None,
1163 page: Optional[int] = None,
1164 per_page: Optional[int] = None,
1165 search: Optional[str] = None,
1166 ) -> UsersListResult:
1167 """List all users with cursor or page-based pagination support and optional search.
1169 This method supports both cursor-based (for API endpoints with large datasets)
1170 and page-based (for admin UI with page numbers) pagination, with optional
1171 search filtering by email or full name.
1173 Note: This method returns ORM objects and cannot be cached since callers
1174 depend on ORM attributes and methods (e.g., EmailUserResponse.from_email_user).
1176 Args:
1177 limit: Maximum number of users to return (for cursor-based pagination)
1178 cursor: Opaque cursor token for cursor-based pagination
1179 page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor.
1180 per_page: Items per page for page-based pagination
1181 search: Optional search term to filter by email or full name (case-insensitive)
1183 Returns:
1184 UsersListResult with data and optional pagination metadata.
1186 Examples:
1187 # Cursor-based pagination (for APIs)
1188 # result = await service.list_users(cursor=None, limit=50)
1189 # len(result.data) <= 50 # Returns: True
1191 # Page-based pagination (for admin UI)
1192 # result = await service.list_users(page=1, per_page=10)
1193 # result.data # Returns: list of users
1194 # result.pagination # Returns: pagination metadata
1196 # Search users
1197 # users = await service.list_users(search="john", page=1, per_page=10)
1198 # All users with "john" in email or name
1199 """
1200 try:
1201 # Build base query with ordering by created_at, email for consistent pagination
1202 # Note: EmailUser uses email as primary key, not id
1203 query = select(EmailUser).order_by(desc(EmailUser.created_at), desc(EmailUser.email))
1205 # Apply search filter if provided (prefix search for better index usage)
1206 if search and search.strip():
1207 search_term = f"{self._escape_like(search.strip())}%"
1208 # NOTE: For large Postgres datasets, consider citext or functional indexes for case-insensitive search.
1209 query = query.where(
1210 or_(
1211 EmailUser.email.ilike(search_term, escape="\\"),
1212 EmailUser.full_name.ilike(search_term, escape="\\"),
1213 )
1214 )
1216 # Page-based pagination: use unified_paginate
1217 if page is not None:
1218 pag_result = await unified_paginate(
1219 db=self.db,
1220 query=query,
1221 page=page,
1222 per_page=per_page,
1223 cursor=None,
1224 limit=None,
1225 base_url="/admin/users",
1226 query_params={},
1227 )
1228 return UsersListResult(data=pag_result["data"], pagination=pag_result["pagination"], links=pag_result["links"])
1230 # Cursor-based pagination: custom implementation for EmailUser
1231 # EmailUser uses email as PK (not id), so we need custom cursor using (created_at, email)
1232 page_size = limit if limit and limit > 0 else settings.pagination_default_page_size
1233 if limit == 0:
1234 page_size = None # No limit
1236 # Decode cursor and apply keyset filter if provided
1237 if cursor:
1238 try:
1239 cursor_json = base64.urlsafe_b64decode(cursor.encode()).decode()
1240 cursor_data = orjson.loads(cursor_json)
1241 last_email = cursor_data.get("email")
1242 created_str = cursor_data.get("created_at")
1243 if last_email and created_str:
1244 last_created = datetime.fromisoformat(created_str)
1245 # Apply keyset filter (assumes DESC order on created_at, email)
1246 query = query.where(
1247 or_(
1248 EmailUser.created_at < last_created,
1249 and_(EmailUser.created_at == last_created, EmailUser.email < last_email),
1250 )
1251 )
1252 except (ValueError, TypeError) as e:
1253 logger.warning(f"Invalid cursor for user pagination, ignoring: {e}")
1255 # Fetch page_size + 1 to determine if there are more results
1256 if page_size is not None:
1257 query = query.limit(page_size + 1)
1258 result = self.db.execute(query)
1259 users = list(result.scalars().all())
1261 if page_size is None:
1262 return UsersListResult(data=users, next_cursor=None)
1264 # Check if there are more results
1265 has_more = len(users) > page_size
1266 if has_more:
1267 users = users[:page_size]
1269 # Generate next cursor using (created_at, email) for EmailUser
1270 next_cursor = None
1271 if has_more and users:
1272 last_user = users[-1]
1273 cursor_data = {
1274 "created_at": last_user.created_at.isoformat() if last_user.created_at else None,
1275 "email": last_user.email,
1276 }
1277 next_cursor = base64.urlsafe_b64encode(orjson.dumps(cursor_data)).decode()
1279 return UsersListResult(data=users, next_cursor=next_cursor)
1281 except Exception as e:
1282 logger.error(f"Error listing users: {e}")
1283 # Return appropriate empty response based on pagination mode
1284 if page is not None:
1285 fallback_per_page = per_page or 50
1286 return UsersListResult(
1287 data=[],
1288 pagination=PaginationMeta(page=page, per_page=fallback_per_page, total_items=0, total_pages=0, has_next=False, has_prev=False),
1289 links=PaginationLinks( # pylint: disable=kwarg-superseded-by-positional-arg
1290 self=f"/admin/users?page=1&per_page={fallback_per_page}",
1291 first=f"/admin/users?page=1&per_page={fallback_per_page}",
1292 last=f"/admin/users?page=1&per_page={fallback_per_page}",
1293 ),
1294 )
1296 if cursor is not None:
1297 return UsersListResult(data=[], next_cursor=None)
1299 return UsersListResult(data=[])
1301 async def list_users_not_in_team(
1302 self,
1303 team_id: str,
1304 cursor: Optional[str] = None,
1305 limit: Optional[int] = None,
1306 page: Optional[int] = None,
1307 per_page: Optional[int] = None,
1308 search: Optional[str] = None,
1309 ) -> UsersListResult:
1310 """List users who are NOT members of the specified team with cursor or page-based pagination.
1312 Uses a NOT IN subquery to efficiently exclude team members.
1314 Args:
1315 team_id: ID of the team to exclude members from
1316 cursor: Opaque cursor token for cursor-based pagination
1317 limit: Maximum number of users to return (for cursor-based, default: 50)
1318 page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor.
1319 per_page: Items per page for page-based pagination (default: 30)
1320 search: Optional search term to filter by email or full name
1322 Returns:
1323 UsersListResult with data and either cursor or pagination metadata
1325 Examples:
1326 # Page-based (admin UI)
1327 # result = await service.list_users_not_in_team("team-123", page=1, per_page=30)
1328 # result.pagination # Returns: pagination metadata
1330 # Cursor-based (API)
1331 # result = await service.list_users_not_in_team("team-123", cursor=None, limit=50)
1332 # result.next_cursor # Returns: next cursor token
1333 """
1334 try:
1335 # Build base query
1336 query = select(EmailUser)
1338 # Apply search filter if provided
1339 if search and search.strip():
1340 search_term = f"{self._escape_like(search.strip())}%"
1341 query = query.where(
1342 or_(
1343 EmailUser.email.ilike(search_term, escape="\\"),
1344 EmailUser.full_name.ilike(search_term, escape="\\"),
1345 )
1346 )
1348 # Exclude team members using NOT IN subquery
1349 member_emails_subquery = select(EmailTeamMember.user_email).where(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True))
1350 query = query.where(EmailUser.is_active.is_(True), ~EmailUser.email.in_(member_emails_subquery))
1352 # PAGE-BASED PAGINATION (Admin UI) - use unified_paginate
1353 if page is not None:
1354 query = query.order_by(EmailUser.full_name, EmailUser.email)
1355 pag_result = await unified_paginate(
1356 db=self.db,
1357 query=query,
1358 page=page,
1359 per_page=per_page or 30,
1360 cursor=None,
1361 limit=None,
1362 base_url=f"/admin/teams/{team_id}/non-members",
1363 query_params={},
1364 )
1365 return UsersListResult(data=pag_result["data"], pagination=pag_result["pagination"], links=pag_result["links"])
1367 # CURSOR-BASED PAGINATION - custom implementation using (created_at, email)
1368 # unified_paginate uses (created_at, id) but EmailUser uses email as PK
1369 query = query.order_by(desc(EmailUser.created_at), desc(EmailUser.email))
1371 # Decode cursor and apply keyset filter
1372 if cursor:
1373 try:
1374 cursor_json = base64.urlsafe_b64decode(cursor.encode()).decode()
1375 cursor_data = orjson.loads(cursor_json)
1376 last_email = cursor_data.get("email")
1377 created_str = cursor_data.get("created_at")
1378 if last_email and created_str:
1379 last_created = datetime.fromisoformat(created_str)
1380 # Keyset filter: (created_at < last) OR (created_at = last AND email < last_email)
1381 query = query.where(
1382 or_(
1383 EmailUser.created_at < last_created,
1384 and_(EmailUser.created_at == last_created, EmailUser.email < last_email),
1385 )
1386 )
1387 except (ValueError, TypeError) as e:
1388 logger.warning(f"Invalid cursor for non-members list, ignoring: {e}")
1390 # Fetch limit + 1 to check for more results
1391 page_size = limit or 50
1392 query = query.limit(page_size + 1)
1393 users = list(self.db.execute(query).scalars().all())
1395 # Check if there are more results
1396 has_more = len(users) > page_size
1397 if has_more:
1398 users = users[:page_size]
1400 # Generate next cursor using (created_at, email)
1401 next_cursor = None
1402 if has_more and users:
1403 last_user = users[-1]
1404 cursor_data = {
1405 "created_at": last_user.created_at.isoformat() if last_user.created_at else None,
1406 "email": last_user.email,
1407 }
1408 next_cursor = base64.urlsafe_b64encode(orjson.dumps(cursor_data)).decode()
1410 self.db.commit()
1411 return UsersListResult(data=users, next_cursor=next_cursor)
1413 except Exception as e:
1414 logger.error(f"Error listing non-members for team {team_id}: {e}")
1416 # Return appropriate empty response based on mode
1417 if page is not None:
1418 return UsersListResult(
1419 data=[],
1420 pagination=PaginationMeta(page=page, per_page=per_page or 30, total_items=0, total_pages=0, has_next=False, has_prev=False),
1421 links=PaginationLinks( # pylint: disable=kwarg-superseded-by-positional-arg
1422 self=f"/admin/teams/{team_id}/non-members?page=1&per_page={per_page or 30}",
1423 first=f"/admin/teams/{team_id}/non-members?page=1&per_page={per_page or 30}",
1424 last=f"/admin/teams/{team_id}/non-members?page=1&per_page={per_page or 30}",
1425 ),
1426 )
1428 return UsersListResult(data=[], next_cursor=None)
1430 async def get_all_users(self) -> list[EmailUser]:
1431 """Get all users without pagination.
1433 .. deprecated:: 1.0
1434 Use :meth:`list_users` with proper pagination instead.
1435 This method has a hardcoded limit of 10,000 users and will not return
1436 more than that. For production systems with many users, use paginated
1437 access with search/filtering.
1439 Returns:
1440 List of up to 10,000 EmailUser objects
1442 Raises:
1443 ValueError: If total users exceed 10,000
1445 Examples:
1446 # users = await service.get_all_users()
1447 # isinstance(users, list) # Returns: True
1449 Warning:
1450 This method is deprecated and will be removed in a future version.
1451 Use list_users() with pagination instead:
1453 # For small datasets
1454 users = await service.list_users(page=1, per_page=1000).data
1456 # For searching
1457 users = await service.list_users(search="john", page=1, per_page=10).data
1458 """
1459 if not self.__class__.get_all_users_deprecated_warned:
1460 warnings.warn(
1461 "get_all_users() is deprecated and limited to 10,000 users. " + "Use list_users() with pagination instead.",
1462 DeprecationWarning,
1463 stacklevel=2,
1464 )
1465 self.__class__.get_all_users_deprecated_warned = True
1467 total_users = await self.count_users()
1468 if total_users > _GET_ALL_USERS_LIMIT:
1469 raise ValueError("get_all_users() supports up to 10,000 users. Use list_users() pagination instead.")
1471 result = await self.list_users(limit=_GET_ALL_USERS_LIMIT)
1472 return result.data # Large limit to get all users
1474 async def count_users(self) -> int:
1475 """Count total number of users.
1477 Returns:
1478 int: Total user count
1479 """
1480 try:
1481 stmt = select(func.count(EmailUser.email)) # pylint: disable=not-callable
1482 count = self.db.execute(stmt).scalar() or 0
1483 return count
1484 except Exception as e:
1485 logger.error(f"Error counting users: {e}")
1486 return 0
1488 async def get_auth_events(self, email: Optional[str] = None, limit: int = 100, offset: int = 0) -> list[EmailAuthEvent]:
1489 """Get authentication events for auditing.
1491 Args:
1492 email: Filter by specific user email (optional)
1493 limit: Maximum number of events to return
1494 offset: Number of events to skip
1496 Returns:
1497 List of EmailAuthEvent objects
1498 """
1499 try:
1500 stmt = select(EmailAuthEvent)
1501 if email:
1502 stmt = stmt.where(EmailAuthEvent.user_email == email)
1503 stmt = stmt.order_by(EmailAuthEvent.timestamp.desc()).offset(offset).limit(limit)
1505 result = self.db.execute(stmt)
1506 events = list(result.scalars().all())
1507 return events
1508 except Exception as e:
1509 logger.error(f"Error getting auth events: {e}")
1510 return []
1512 async def update_user(
1513 self,
1514 email: str,
1515 full_name: Optional[str] = None,
1516 is_admin: Optional[bool] = None,
1517 is_active: Optional[bool] = None,
1518 email_verified: Optional[bool] = None,
1519 password_change_required: Optional[bool] = None,
1520 password: Optional[str] = None,
1521 admin_origin_source: Optional[str] = None,
1522 ) -> EmailUser:
1523 """Update user information.
1525 Args:
1526 email: User's email address (primary key)
1527 full_name: New full name (optional)
1528 is_admin: New admin status (optional)
1529 is_active: New active status (optional)
1530 email_verified: Set email verification status (optional)
1531 password_change_required: Whether user must change password on next login (optional)
1532 password: New password (optional, will be hashed)
1533 admin_origin_source: Source of admin change for tracking (e.g. "api", "ui"). Callers should pass explicitly.
1535 Returns:
1536 EmailUser: Updated user object
1538 Raises:
1539 ValueError: If user doesn't exist, if protect_all_admins blocks the change, or if it would remove the last active admin
1540 PasswordValidationError: If password doesn't meet policy
1541 """
1542 try:
1543 # Normalize email to match create_user() / get_user_by_email() behavior
1544 email = email.lower().strip()
1546 # Get existing user
1547 stmt = select(EmailUser).where(EmailUser.email == email)
1548 result = self.db.execute(stmt)
1549 user = result.scalar_one_or_none()
1551 if not user:
1552 raise ValueError(f"User {email} not found")
1554 # Admin protection guard
1555 if user.is_admin and user.is_active:
1556 would_lose_admin = (is_admin is not None and not is_admin) or (is_active is not None and not is_active)
1557 if would_lose_admin:
1558 if settings.protect_all_admins:
1559 raise ValueError("Admin protection is enabled — cannot demote or deactivate any admin user")
1560 if await self.is_last_active_admin(email):
1561 raise ValueError("Cannot demote or deactivate the last remaining active admin user")
1563 # Update fields if provided
1564 if full_name is not None:
1565 user.full_name = full_name
1567 if email_verified is not None:
1568 user.email_verified_at = utc_now() if email_verified else None
1570 if is_admin is not None:
1571 # Track admin_origin when status actually changes
1572 if is_admin != user.is_admin:
1573 user.is_admin = is_admin
1574 user.admin_origin = admin_origin_source if is_admin else None
1576 # Sync global role assignment with is_admin flag:
1577 # Promotion: revoke default_user_role, assign default_admin_role
1578 # Demotion: revoke default_admin_role, assign default_user_role
1579 try:
1580 admin_role_name = settings.default_admin_role
1581 user_role_name = settings.default_user_role
1582 admin_role = await self.role_service.get_role_by_name(admin_role_name, "global")
1583 user_role = await self.role_service.get_role_by_name(user_role_name, "global")
1585 if is_admin:
1586 # Promotion: assign admin role, revoke user role
1587 if admin_role:
1588 existing = await self.role_service.get_user_role_assignment(user_email=email, role_id=admin_role.id, scope="global", scope_id=None)
1589 if not existing or not existing.is_active:
1590 await self.role_service.assign_role_to_user(user_email=email, role_id=admin_role.id, scope="global", scope_id=None, granted_by=email)
1591 logger.info(f"Assigned {admin_role_name} role to {email}")
1592 else:
1593 logger.warning(f"{admin_role_name} role not found, cannot assign to {email}")
1595 if user_role:
1596 revoked = await self.role_service.revoke_role_from_user(user_email=email, role_id=user_role.id, scope="global", scope_id=None)
1597 if revoked:
1598 logger.info(f"Revoked {user_role_name} role from {email}")
1599 else:
1600 # Demotion: revoke admin role, assign user role
1601 if admin_role:
1602 revoked = await self.role_service.revoke_role_from_user(user_email=email, role_id=admin_role.id, scope="global", scope_id=None)
1603 if revoked:
1604 logger.info(f"Revoked {admin_role_name} role from {email}")
1606 if user_role:
1607 existing = await self.role_service.get_user_role_assignment(user_email=email, role_id=user_role.id, scope="global", scope_id=None)
1608 if not existing or not existing.is_active:
1609 await self.role_service.assign_role_to_user(user_email=email, role_id=user_role.id, scope="global", scope_id=None, granted_by=email)
1610 logger.info(f"Assigned {user_role_name} role to {email}")
1611 else:
1612 logger.warning(f"{user_role_name} role not found, cannot assign to {email}")
1614 except Exception as e:
1615 logger.warning(f"Failed to sync global roles for {email}: {e}")
1616 # Don't fail user update if role sync fails
1618 if is_active is not None:
1619 user.is_active = is_active
1621 if password is not None:
1622 self.validate_password(password)
1623 user.password_hash = await self.password_service.hash_password_async(password)
1624 # Only clear password_change_required if it wasn't explicitly set
1625 if password_change_required is None:
1626 user.password_change_required = False
1627 user.password_changed_at = utc_now()
1629 # Set password_change_required after password processing to allow explicit override
1630 if password_change_required is not None:
1631 user.password_change_required = password_change_required
1633 user.updated_at = datetime.now(timezone.utc)
1635 self.db.commit()
1637 return user
1639 except Exception as e:
1640 self.db.rollback()
1641 logger.error(f"Error updating user {email}: {e}")
1642 raise
1644 async def activate_user(self, email: str) -> EmailUser:
1645 """Activate a user account.
1647 Args:
1648 email: User's email address
1650 Returns:
1651 EmailUser: Updated user object
1653 Raises:
1654 ValueError: If user doesn't exist
1655 """
1656 try:
1657 stmt = select(EmailUser).where(EmailUser.email == email)
1658 result = self.db.execute(stmt)
1659 user = result.scalar_one_or_none()
1661 if not user:
1662 raise ValueError(f"User {email} not found")
1664 user.is_active = True
1665 user.updated_at = datetime.now(timezone.utc)
1667 self.db.commit()
1669 logger.info(f"User {email} activated")
1670 return user
1672 except Exception as e:
1673 self.db.rollback()
1674 logger.error(f"Error activating user {email}: {e}")
1675 raise
1677 async def deactivate_user(self, email: str) -> EmailUser:
1678 """Deactivate a user account.
1680 Args:
1681 email: User's email address
1683 Returns:
1684 EmailUser: Updated user object
1686 Raises:
1687 ValueError: If user doesn't exist
1688 """
1689 try:
1690 stmt = select(EmailUser).where(EmailUser.email == email)
1691 result = self.db.execute(stmt)
1692 user = result.scalar_one_or_none()
1694 if not user:
1695 raise ValueError(f"User {email} not found")
1697 user.is_active = False
1698 user.updated_at = datetime.now(timezone.utc)
1700 self.db.commit()
1702 logger.info(f"User {email} deactivated")
1703 return user
1705 except Exception as e:
1706 self.db.rollback()
1707 logger.error(f"Error deactivating user {email}: {e}")
1708 raise
1710 async def delete_user(self, email: str) -> bool:
1711 """Delete a user account permanently.
1713 Args:
1714 email: User's email address
1716 Returns:
1717 bool: True if user was deleted
1719 Raises:
1720 ValueError: If user doesn't exist
1721 ValueError: If user owns teams that cannot be transferred
1722 """
1723 try:
1724 stmt = select(EmailUser).where(EmailUser.email == email)
1725 result = self.db.execute(stmt)
1726 user = result.scalar_one_or_none()
1728 if not user:
1729 raise ValueError(f"User {email} not found")
1731 # Check if user owns any teams
1732 teams_owned_stmt = select(EmailTeam).where(EmailTeam.created_by == email)
1733 teams_owned = self.db.execute(teams_owned_stmt).scalars().all()
1735 if teams_owned:
1736 # For each team, try to transfer ownership to another owner
1737 for team in teams_owned:
1738 # Find other team owners who can take ownership
1739 potential_owners_stmt = (
1740 select(EmailTeamMember).where(EmailTeamMember.team_id == team.id, EmailTeamMember.user_email != email, EmailTeamMember.role == "owner").order_by(EmailTeamMember.role.desc())
1741 )
1743 potential_owners = self.db.execute(potential_owners_stmt).scalars().all()
1745 if potential_owners:
1746 # Transfer ownership to the first available owner
1747 new_owner = potential_owners[0]
1748 team.created_by = new_owner.user_email
1749 logger.info(f"Transferred team '{team.name}' ownership from {email} to {new_owner.user_email}")
1750 else:
1751 # No other owners available - check if it's a single-user team
1752 all_members_stmt = select(EmailTeamMember).where(EmailTeamMember.team_id == team.id)
1753 all_members = self.db.execute(all_members_stmt).scalars().all()
1755 if len(all_members) == 1 and all_members[0].user_email == email:
1756 # This is a single-user personal team - cascade delete it
1757 logger.info(f"Deleting personal team '{team.name}' (single member: {email})")
1758 # Delete team members first (should be just the owner)
1759 delete_team_members_stmt = delete(EmailTeamMember).where(EmailTeamMember.team_id == team.id)
1760 self.db.execute(delete_team_members_stmt)
1761 # Delete the team
1762 self.db.delete(team)
1763 else:
1764 # Multi-member team with no other owners - cannot delete user
1765 raise ValueError(f"Cannot delete user {email}: owns team '{team.name}' with {len(all_members)} members but no other owners to transfer ownership to")
1767 # Delete all role assignments for the user
1768 try:
1769 await self.role_service.delete_all_user_roles(email)
1770 except Exception as e:
1771 logger.warning(f"Failed to delete role assignments for {email}: {e}")
1773 # Reassign non-null audit FKs to another user so deleting this user does not
1774 # break referential integrity for historical records.
1775 replacement_row = self.db.query(EmailUser.email).filter(EmailUser.email != email).order_by(EmailUser.is_admin.desc(), EmailUser.created_at.asc()).first()
1776 replacement_email = replacement_row[0] if replacement_row else None
1778 if replacement_email:
1779 self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.invited_by == email).update({EmailTeamInvitation.invited_by: replacement_email}, synchronize_session=False)
1780 self.db.query(Role).filter(Role.created_by == email).update({Role.created_by: replacement_email}, synchronize_session=False)
1781 self.db.query(UserRole).filter(UserRole.granted_by == email).update({UserRole.granted_by: replacement_email}, synchronize_session=False)
1782 self.db.query(TokenRevocation).filter(TokenRevocation.revoked_by == email).update({TokenRevocation.revoked_by: replacement_email}, synchronize_session=False)
1784 # Nullify nullable actor references.
1785 self.db.query(EmailTeamMember).filter(EmailTeamMember.invited_by == email).update({EmailTeamMember.invited_by: None}, synchronize_session=False)
1786 self.db.query(EmailTeamMemberHistory).filter(EmailTeamMemberHistory.action_by == email).update({EmailTeamMemberHistory.action_by: None}, synchronize_session=False)
1787 self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.reviewed_by == email).update({EmailTeamJoinRequest.reviewed_by: None}, synchronize_session=False)
1788 self.db.query(PendingUserApproval).filter(PendingUserApproval.approved_by == email).update({PendingUserApproval.approved_by: None}, synchronize_session=False)
1789 self.db.query(SSOAuthSession).filter(SSOAuthSession.user_email == email).update({SSOAuthSession.user_email: None}, synchronize_session=False)
1791 # Remove rows where this user is the primary subject.
1792 self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.user_email == email).delete(synchronize_session=False)
1794 # Delete related auth events
1795 auth_events_stmt = delete(EmailAuthEvent).where(EmailAuthEvent.user_email == email)
1796 self.db.execute(auth_events_stmt)
1798 # Remove user from all team memberships
1799 team_members_stmt = delete(EmailTeamMember).where(EmailTeamMember.user_email == email)
1800 self.db.execute(team_members_stmt)
1802 # Delete the user
1803 self.db.delete(user)
1804 self.db.commit()
1806 # Invalidate all auth caches for deleted user
1807 try:
1808 # First-Party
1809 from mcpgateway.cache.auth_cache import auth_cache # pylint: disable=import-outside-toplevel
1811 for coro in [auth_cache.invalidate_user(email), auth_cache.invalidate_user_teams(email), auth_cache.invalidate_team_membership(email)]:
1812 task = asyncio.create_task(coro)
1813 _background_tasks.add(task)
1814 task.add_done_callback(_background_tasks.discard)
1815 except Exception as cache_error:
1816 logger.debug(f"Failed to invalidate cache on user delete: {cache_error}")
1818 logger.info(f"User {email} deleted permanently")
1819 return True
1821 except Exception as e:
1822 self.db.rollback()
1823 logger.error(f"Error deleting user {email}: {e}")
1824 raise
1826 async def count_active_admin_users(self) -> int:
1827 """Count the number of active admin users.
1829 Returns:
1830 int: Number of active admin users
1831 """
1832 stmt = select(func.count(EmailUser.email)).where(EmailUser.is_admin.is_(True), EmailUser.is_active.is_(True)) # pylint: disable=not-callable
1833 result = self.db.execute(stmt)
1834 return result.scalar() or 0
1836 async def is_last_active_admin(self, email: str) -> bool:
1837 """Check if the given user is the last active admin.
1839 Args:
1840 email: User's email address
1842 Returns:
1843 bool: True if this user is the last active admin
1844 """
1845 # First check if the user is an active admin
1846 stmt = select(EmailUser).where(EmailUser.email == email)
1847 result = self.db.execute(stmt)
1848 user = result.scalar_one_or_none()
1850 if not user or not user.is_admin or not user.is_active:
1851 return False
1853 # Count total active admins
1854 admin_count = await self.count_active_admin_users()
1855 return admin_count == 1