Coverage for mcpgateway / services / email_auth_service.py: 99%
524 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/email_auth_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Email Authentication Service.
8This module provides email-based user authentication services including
9user creation, authentication, password management, and security features.
11Examples:
12 Basic usage (requires async context):
13 from mcpgateway.services.email_auth_service import EmailAuthService
14 from mcpgateway.db import SessionLocal
16 with SessionLocal() as db:
17 service = EmailAuthService(db)
18 # Use in async context:
19 # user = await service.create_user("test@example.com", "password123")
20 # authenticated = await service.authenticate_user("test@example.com", "password123")
21"""
23# Standard
24import base64
25from dataclasses import dataclass
26from datetime import datetime, timezone
27import re
28from typing import Optional
29import warnings
31# Third-Party
32import orjson
33from sqlalchemy import and_, delete, desc, func, or_, select
34from sqlalchemy.exc import IntegrityError
35from sqlalchemy.orm import Session
37# First-Party
38from mcpgateway.config import settings
39from mcpgateway.db import EmailAuthEvent, EmailTeam, EmailTeamMember, EmailUser, utc_now
40from mcpgateway.schemas import PaginationLinks, PaginationMeta
41from mcpgateway.services.argon2_service import Argon2PasswordService
42from mcpgateway.services.logging_service import LoggingService
43from mcpgateway.utils.pagination import unified_paginate
# Initialize logging: one LoggingService per module, with a logger named after this module.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

# NOTE(review): presumably the row cap applied by a "get all users" code path;
# the consumer of this constant is not visible in this section - confirm before changing.
_GET_ALL_USERS_LIMIT = 10000
@dataclass(frozen=True)
class UsersListResult:
    """Result for list_users queries.

    Immutable container returned by the user-listing methods. Which optional
    fields are populated depends on the pagination mode the caller selected:
    cursor mode fills ``next_cursor``; page mode fills ``pagination`` and ``links``.
    """

    # Users in the returned page/slice (ORM objects).
    data: list[EmailUser]
    # Opaque token for fetching the next slice in cursor mode; None when there is no further slice.
    next_cursor: Optional[str] = None
    # Page metadata, set in page-based mode only.
    pagination: Optional[PaginationMeta] = None
    # Navigation links, set in page-based mode only.
    links: Optional[PaginationLinks] = None
class EmailValidationError(Exception):
    """Raised when email format is invalid.

    Raised by ``EmailAuthService.validate_email`` when the address is missing,
    is not a string, does not match the expected pattern, or is too long.

    Examples:
        >>> try:
        ...     raise EmailValidationError("Invalid email format")
        ... except EmailValidationError as e:
        ...     str(e)
        'Invalid email format'
    """
class PasswordValidationError(Exception):
    """Raised when password doesn't meet policy requirements.

    Raised by ``EmailAuthService.validate_password`` for a missing or
    policy-violating password, and by ``change_password`` when the new password
    equals the current one while reuse prevention is enabled.

    Examples:
        >>> try:
        ...     raise PasswordValidationError("Password too short")
        ... except PasswordValidationError as e:
        ...     str(e)
        'Password too short'
    """
class UserExistsError(Exception):
    """Raised when attempting to create a user that already exists.

    Raised by ``EmailAuthService.create_user`` both when the pre-insert lookup
    finds the email and when the insert itself fails with an integrity error.

    Examples:
        >>> try:
        ...     raise UserExistsError("User already exists")
        ... except UserExistsError as e:
        ...     str(e)
        'User already exists'
    """
class AuthenticationError(Exception):
    """Raised when authentication fails.

    Raised by ``EmailAuthService.change_password`` when the current password is
    missing or cannot be verified. Note that ``authenticate_user`` itself
    signals failure by returning ``None`` rather than raising this error.

    Examples:
        >>> try:
        ...     raise AuthenticationError("Invalid credentials")
        ... except AuthenticationError as e:
        ...     str(e)
        'Invalid credentials'
    """
110class EmailAuthService:
111 """Service for email-based user authentication.
113 This service handles user registration, authentication, password management,
114 and security features like account lockout and failed attempt tracking.
116 Attributes:
117 db (Session): Database session
118 password_service (Argon2PasswordService): Password hashing service
120 Examples:
121 >>> from mcpgateway.db import SessionLocal
122 >>> with SessionLocal() as db:
123 ... service = EmailAuthService(db)
124 ... # Service is ready to use
125 """
127 get_all_users_deprecated_warned = False
def __init__(self, db: Session):
    """Set up the service around an existing database session.

    Args:
        db: SQLAlchemy database session used for every query and commit
            issued by this service.
    """
    # RoleService is created on first use by the ``role_service`` property.
    self._role_service = None
    self.db = db
    self.password_service = Argon2PasswordService()
    logger.debug("EmailAuthService initialized")
@property
def role_service(self):
    """RoleService bound to this service's session, created on first access.

    The import happens at call time rather than at module import time to
    avoid a circular import; the instance is then cached on ``self``.

    Returns:
        RoleService: Instance of RoleService
    """
    cached = self._role_service
    if cached is not None:
        return cached

    # First-Party
    from mcpgateway.services.role_service import RoleService  # pylint: disable=import-outside-toplevel

    cached = RoleService(self.db)
    self._role_service = cached
    return cached
def validate_email(self, email: str) -> bool:
    """Validate email address format.

    The complete string must match the pattern. ``re.fullmatch`` is used
    instead of ``re.match`` with a trailing ``$`` because ``$`` also matches
    just before a final newline, which would accept an address that ends in
    a line break.

    Args:
        email: Email address to validate

    Returns:
        bool: True if email is valid

    Raises:
        EmailValidationError: If email is missing, not a string, malformed,
            or longer than 255 characters

    Examples:
        >>> service = EmailAuthService(None)
        >>> service.validate_email("user@example.com")
        True
        >>> service.validate_email("test.user+tag@domain.co.uk")
        True
        >>> service.validate_email("user123@test-domain.com")
        True
        >>> try:
        ...     service.validate_email("invalid-email")
        ... except EmailValidationError as e:
        ...     "Invalid email format" in str(e)
        True
        >>> try:
        ...     service.validate_email("")
        ... except EmailValidationError as e:
        ...     "Email is required" in str(e)
        True
        >>> try:
        ...     service.validate_email("user@")
        ... except EmailValidationError as e:
        ...     "Invalid email format" in str(e)
        True
        >>> try:
        ...     service.validate_email("@domain.com")
        ... except EmailValidationError as e:
        ...     "Invalid email format" in str(e)
        True
        >>> try:
        ...     service.validate_email("user@domain")
        ... except EmailValidationError as e:
        ...     "Invalid email format" in str(e)
        True
        >>> try:
        ...     service.validate_email("user@example.com" + chr(10))
        ... except EmailValidationError as e:
        ...     "Invalid email format" in str(e)
        True
        >>> try:
        ...     service.validate_email("a" * 250 + "@domain.com")
        ... except EmailValidationError as e:
        ...     "Email address too long" in str(e)
        True
        >>> try:
        ...     service.validate_email(None)
        ... except EmailValidationError as e:
        ...     "Email is required" in str(e)
        True
    """
    if not email or not isinstance(email, str):
        raise EmailValidationError("Email is required and must be a string")

    # Basic email regex pattern
    email_pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"

    # fullmatch: the match must end at the true end of the string, so a
    # trailing newline can no longer slip past the "$" anchor.
    if not re.fullmatch(email_pattern, email):
        raise EmailValidationError("Invalid email format")

    if len(email) > 255:
        raise EmailValidationError("Email address too long (max 255 characters)")

    return True
def validate_password(self, password: str) -> bool:
    """Check a password against the configured password policy.

    An empty password is always rejected. When the policy is switched off via
    ``settings.password_policy_enabled`` any non-empty password is accepted;
    otherwise the minimum length and each enabled character-class rule are
    checked in order (uppercase, lowercase, digit, special character).

    Args:
        password: Password to validate

    Returns:
        bool: True if password meets policy

    Raises:
        PasswordValidationError: If password doesn't meet requirements

    Examples:
        >>> service = EmailAuthService(None)
        >>> service.validate_password("Password123!")  # Meets all requirements
        True
        >>> service.validate_password("ValidPassword123!")
        True
        >>> service.validate_password("Shortpass!")  # 8+ chars with requirements
        True
        >>> service.validate_password("VeryLongPasswordThatMeetsMinimumRequirements!")
        True
        >>> try:
        ...     service.validate_password("")
        ... except PasswordValidationError as e:
        ...     "Password is required" in str(e)
        True
        >>> try:
        ...     service.validate_password(None)
        ... except PasswordValidationError as e:
        ...     "Password is required" in str(e)
        True
        >>> try:
        ...     service.validate_password("short")  # Only 5 chars, should fail with default min_length=8
        ... except PasswordValidationError as e:
        ...     "characters long" in str(e)
        True
    """
    if not password:
        raise PasswordValidationError("Password is required")

    # Global kill switch: with the policy disabled only emptiness is checked.
    policy_enabled = getattr(settings, "password_policy_enabled", True)
    if not policy_enabled:
        return True

    # Read the whole policy up front, then evaluate the rules in a fixed order.
    min_length = getattr(settings, "password_min_length", 8)
    character_rules = (
        (getattr(settings, "password_require_uppercase", False), r"[A-Z]", "Password must contain at least one uppercase letter"),
        (getattr(settings, "password_require_lowercase", False), r"[a-z]", "Password must contain at least one lowercase letter"),
        (getattr(settings, "password_require_numbers", False), r"[0-9]", "Password must contain at least one number"),
        (getattr(settings, "password_require_special", False), r'[!@#$%^&*(),.?":{}|<>]', "Password must contain at least one special character"),
    )

    if len(password) < min_length:
        raise PasswordValidationError(f"Password must be at least {min_length} characters long")

    for required, pattern, message in character_rules:
        if required and not re.search(pattern, password):
            raise PasswordValidationError(message)

    return True
async def get_user_by_email(self, email: str) -> Optional[EmailUser]:
    """Look up a single user by email address.

    The address is lowercased before the lookup. Any error raised while
    building or running the query is logged and reported as "not found".

    Args:
        email: Email address to look up

    Returns:
        EmailUser or None if not found (or if the lookup failed)

    Examples:
        # Assuming database has user "test@example.com"
        # user = await service.get_user_by_email("test@example.com")
        # user.email if user else None  # Returns: 'test@example.com'
    """
    try:
        lookup = select(EmailUser).where(EmailUser.email == email.lower())
        return self.db.execute(lookup).scalar_one_or_none()
    except Exception as e:
        logger.error(f"Error getting user by email {email}: {e}")
        return None
async def create_user(
    self,
    email: str,
    password: str,
    full_name: Optional[str] = None,
    is_admin: bool = False,
    is_active: bool = True,
    password_change_required: bool = False,
    auth_provider: str = "local",
    skip_password_validation: bool = False,
    granted_by: Optional[str] = None,
) -> EmailUser:
    """Create a new user with email authentication.

    Steps, in order: normalize and validate the inputs, reject duplicates,
    hash the password, insert and commit the user, then (best effort) create
    a personal team and assign the default global/team roles, and finally
    record a registration audit event. Team and role failures are logged but
    never abort user creation.

    Args:
        email: User's email address (primary key)
        password: Plain text password (will be hashed)
        full_name: Optional full name for display
        is_admin: Whether user has admin privileges
        is_active: Whether user account is active (default: True)
        password_change_required: Whether user must change password on next login (default: False)
        auth_provider: Authentication provider ('local', 'github', etc.)
        skip_password_validation: Skip password policy validation (for bootstrap)
        granted_by: Email of user creating this user (for role assignment audit trail)

    Returns:
        EmailUser: The created user object

    Raises:
        EmailValidationError: If email is missing, not a string, or malformed
        PasswordValidationError: If password doesn't meet policy
        UserExistsError: If user already exists
        Exception: Any other database error is rolled back, recorded as a
            failed registration (best effort) and re-raised unchanged

    Examples:
        # user = await service.create_user(
        #     email="new@example.com",
        #     password="secure123",
        #     full_name="New User",
        #     is_active=True,
        #     password_change_required=False
        # )
        # user.email  # Returns: 'new@example.com'
        # user.full_name  # Returns: 'New User'
        # user.is_active  # Returns: True
    """
    # Normalize email to lowercase. Non-string input (e.g. None) is left
    # untouched so validate_email() rejects it with EmailValidationError
    # instead of this method failing with AttributeError.
    if isinstance(email, str):
        email = email.lower().strip()

    # Validate inputs
    self.validate_email(email)
    if not skip_password_validation:
        self.validate_password(password)

    # Check if user already exists
    existing_user = await self.get_user_by_email(email)
    if existing_user:
        raise UserExistsError(f"User with email {email} already exists")

    # Hash the password
    password_hash = await self.password_service.hash_password_async(password)

    # Create new user (record password change timestamp)
    user = EmailUser(
        email=email,
        password_hash=password_hash,
        full_name=full_name,
        is_admin=is_admin,
        is_active=is_active,
        password_change_required=password_change_required,
        auth_provider=auth_provider,
        password_changed_at=utc_now(),
        admin_origin="api" if is_admin else None,
    )

    try:
        self.db.add(user)
        self.db.commit()
        self.db.refresh(user)

        logger.info(f"Created new user: {email}")

        # Create personal team first if enabled (needed for team-scoped role assignment)
        personal_team_id = None
        if getattr(settings, "auto_create_personal_teams", True):
            try:
                # Import here to avoid circular imports
                # First-Party
                from mcpgateway.services.personal_team_service import PersonalTeamService  # pylint: disable=import-outside-toplevel

                personal_team_service = PersonalTeamService(self.db)
                personal_team = await personal_team_service.create_personal_team(user)
                personal_team_id = personal_team.id  # Get team_id directly from created team
                logger.info(f"Created personal team '{personal_team.name}' (ID: {personal_team_id}) for user {email}")
            except Exception as e:
                logger.warning(f"Failed to create personal team for {email}: {e}")
                # Don't fail user creation if personal team creation fails

        # Auto-assign dual roles using RoleService (after personal team creation)
        try:
            granter = granted_by or email  # Use granted_by if provided, otherwise self-granted

            # Determine global role based on admin status
            global_role_name = "platform_admin" if is_admin else "platform_viewer"
            global_role = await self.role_service.get_role_by_name(global_role_name, "global")

            if global_role:
                try:
                    await self.role_service.assign_role_to_user(user_email=email, role_id=global_role.id, scope="global", scope_id=None, granted_by=granter)
                    logger.info(f"Assigned {global_role_name} role (global scope) to user {email}")
                except ValueError as e:
                    logger.warning(f"Could not assign {global_role_name} role to {email}: {e}")
            else:
                logger.warning(f"{global_role_name} role not found. User {email} created without global role.")

            # Assign team_admin role with team scope (if personal team exists)
            if personal_team_id:
                team_admin_role = await self.role_service.get_role_by_name("team_admin", "team")

                if team_admin_role:
                    try:
                        await self.role_service.assign_role_to_user(user_email=email, role_id=team_admin_role.id, scope="team", scope_id=personal_team_id, granted_by=granter)
                        logger.info(f"Assigned team_admin role (team scope: {personal_team_id}) to user {email}")
                    except ValueError as e:
                        logger.warning(f"Could not assign team_admin role to {email}: {e}")
                else:
                    logger.warning(f"team_admin role not found. User {email} created without team admin role.")

        except Exception as role_error:
            logger.error(f"Failed to assign roles to user {email}: {role_error}")
            # Don't fail user creation if role assignment fails
            # User can be assigned roles manually later

        # Log registration event
        registration_event = EmailAuthEvent.create_registration_event(user_email=email, success=True)
        self.db.add(registration_event)
        self.db.commit()

        return user

    except IntegrityError as e:
        self.db.rollback()
        logger.error(f"Database error creating user {email}: {e}")
        raise UserExistsError(f"User with email {email} already exists") from e
    except Exception as e:
        self.db.rollback()
        logger.error(f"Unexpected error creating user {email}: {e}")

        # Log failed registration. This is best effort: if the audit write
        # itself fails, roll it back and log it so the original error (the
        # one the caller needs to see) is what propagates.
        try:
            registration_event = EmailAuthEvent.create_registration_event(user_email=email, success=False, failure_reason=str(e))
            self.db.add(registration_event)
            self.db.commit()
        except Exception as audit_error:
            self.db.rollback()
            logger.warning(f"Failed to record failed registration event for {email}: {audit_error}")

        raise
async def authenticate_user(self, email: str, password: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> Optional[EmailUser]:
    """Authenticate a user with email and password.

    Failure is reported by returning ``None`` (never by raising) for an
    unknown user, a disabled account, a locked account, or a wrong password.
    Every call, successful or not, records a login-attempt audit event in the
    ``finally`` block.

    Lockout handling: a wrong password increments the user's failed-attempt
    counter and may lock the account. Admin users are exempt from both the
    lock check and the counter when ``settings.protect_all_admins`` is set;
    an existing lock on such an admin is cleared before the password check.

    Args:
        email: User's email address (lowercased and stripped before lookup)
        password: Plain text password
        ip_address: Client IP address for logging
        user_agent: Client user agent for logging

    Returns:
        EmailUser if authentication successful, None otherwise

    Examples:
        # user = await service.authenticate_user("user@example.com", "correct_password")
        # user.email if user else None  # Returns: 'user@example.com'
        # await service.authenticate_user("user@example.com", "wrong_password")  # Returns: None
    """
    email = email.lower().strip()

    # Get user from database
    user = await self.get_user_by_email(email)

    # Track authentication attempt; both values are consumed by the audit
    # event written in the finally block below.
    auth_success = False
    failure_reason = None

    try:
        if not user:
            failure_reason = "User not found"
            logger.info(f"Authentication failed for {email}: user not found")
            return None

        if not user.is_active:
            failure_reason = "Account is disabled"
            logger.info(f"Authentication failed for {email}: account disabled")
            return None

        # Admins are shielded from lockout only when the global setting is on.
        is_protected_admin = user.is_admin and settings.protect_all_admins

        if user.is_account_locked() and not is_protected_admin:
            failure_reason = "Account is locked"
            logger.info(f"Authentication failed for {email}: account locked")
            return None

        # Clear lockout for protected admins so they can always attempt login
        if is_protected_admin and user.is_account_locked():
            logger.info(f"Clearing lockout for protected admin {email}")
            user.reset_failed_attempts()
            self.db.commit()

        # Verify password
        if not await self.password_service.verify_password_async(password, user.password_hash):
            failure_reason = "Invalid password"

            # Increment failed attempts (skip for protected admins)
            if not is_protected_admin:
                max_attempts = getattr(settings, "max_failed_login_attempts", 5)
                lockout_duration = getattr(settings, "account_lockout_duration_minutes", 30)

                is_locked = user.increment_failed_attempts(max_attempts, lockout_duration)

                if is_locked:
                    logger.warning(f"Account locked for {email} after {max_attempts} failed attempts")
                    # Overrides "Invalid password" so the audit event records the lock.
                    failure_reason = "Account locked due to too many failed attempts"

                # NOTE(review): nesting of this commit was reconstructed from a
                # listing that lost indentation - confirm against the real file.
                self.db.commit()
            logger.info(f"Authentication failed for {email}: invalid password")
            return None

        # Authentication successful
        user.reset_failed_attempts()
        self.db.commit()

        auth_success = True
        logger.info(f"Authentication successful for {email}")

        return user

    finally:
        # Log authentication event (runs on every exit path, including the early returns)
        auth_event = EmailAuthEvent.create_login_attempt(user_email=email, success=auth_success, ip_address=ip_address, user_agent=user_agent, failure_reason=failure_reason)
        self.db.add(auth_event)
        self.db.commit()
async def change_password(self, email: str, old_password: Optional[str], new_password: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> bool:
    """Change a user's password.

    The current password is verified through ``authenticate_user``, so a
    failed check is subject to the same audit logging and failed-attempt
    tracking as a login, and a disabled or locked account is reported as
    "Current password is incorrect". After a successful commit the user's
    auth cache entry is invalidated (waiting at most five seconds) and a
    password-change audit event is written.

    Args:
        email: User's email address
        old_password: Current password for verification
        new_password: New password to set
        ip_address: Client IP address for logging
        user_agent: Client user agent for logging

    Returns:
        bool: True if password changed successfully

    Raises:
        AuthenticationError: If old password is missing or incorrect
        PasswordValidationError: If new password doesn't meet policy, or
            equals the current password while reuse prevention is enabled
        Exception: If database operation fails

    Examples:
        # success = await service.change_password(
        #     "user@example.com",
        #     "old_password",
        #     "new_secure_password"
        # )
        # success  # Returns: True
    """
    # Validate old password is provided
    if old_password is None:
        raise AuthenticationError("Current password is required")

    # First authenticate with old password
    user = await self.authenticate_user(email, old_password, ip_address, user_agent)
    if not user:
        raise AuthenticationError("Current password is incorrect")

    # Validate new password
    self.validate_password(new_password)

    # Check if new password is same as old (optional policy)
    if getattr(settings, "password_prevent_reuse", True) and await self.password_service.verify_password_async(new_password, user.password_hash):
        raise PasswordValidationError("New password must be different from current password")

    # Consumed by the audit event in the finally block; only flipped after the commit succeeds.
    success = False
    try:
        # Hash new password and update
        new_password_hash = await self.password_service.hash_password_async(new_password)
        user.password_hash = new_password_hash
        # Clear the flag that requires the user to change password
        user.password_change_required = False
        # Record the password change timestamp
        try:
            user.password_changed_at = utc_now()
        except Exception as exc:
            logger.debug("Failed to set password_changed_at for %s: %s", email, exc)

        self.db.commit()
        success = True

        # Invalidate auth cache for user
        # NOTE(review): the raw `email` argument is used here and in the audit
        # event, while authenticate_user() lowercases/strips it - confirm the
        # cache key does not depend on the un-normalized form.
        try:
            # Standard
            import asyncio  # pylint: disable=import-outside-toplevel

            # First-Party
            from mcpgateway.cache.auth_cache import auth_cache  # pylint: disable=import-outside-toplevel

            # Ensure cache invalidation runs before returning to avoid stale
            # auth context being used by subsequent requests. Use a timeout
            # to prevent blocking if Redis is slow or unresponsive.
            # Shield the task to prevent cancellation on timeout - allows Redis
            # operations to complete in background even if we stop waiting.
            await asyncio.wait_for(asyncio.shield(auth_cache.invalidate_user(email)), timeout=5.0)
        except asyncio.TimeoutError:
            logger.warning(f"Auth cache invalidation timed out for {email} - continuing in background")
        except Exception as cache_error:
            # Cache problems never undo a committed password change.
            logger.debug(f"Failed to invalidate auth cache on password change: {cache_error}")

        logger.info(f"Password changed successfully for {email}")

    except Exception as e:
        self.db.rollback()
        logger.error(f"Error changing password for {email}: {e}")
        raise
    finally:
        # Log password change event (written for both the success and the failure path)
        password_event = EmailAuthEvent.create_password_change_event(user_email=email, success=success, ip_address=ip_address, user_agent=user_agent)
        self.db.add(password_event)
        self.db.commit()

    return success
async def create_platform_admin(self, email: str, password: str, full_name: Optional[str] = None) -> EmailUser:
    """Create or update the platform administrator user.

    This method is used during system bootstrap to create the initial
    admin user from environment variables. If the user already exists it is
    updated in place: the display name and password are refreshed when they
    differ, and the account is forced to be an active admin. Otherwise a new
    admin is created with password policy validation skipped.

    Args:
        email: Admin email address (lowercased and stripped before use)
        password: Admin password
        full_name: Admin full name

    Returns:
        EmailUser: The admin user

    Examples:
        # admin = await service.create_platform_admin(
        #     "admin@example.com",
        #     "admin_password",
        #     "Platform Administrator"
        # )
        # admin.is_admin  # Returns: True
    """
    # Normalize exactly as create_user() does. Without this, an address with
    # surrounding whitespace misses the existing-admin lookup and then
    # collides with the stored row inside create_user().
    email = email.lower().strip()

    # Check if admin user already exists
    existing_admin = await self.get_user_by_email(email)

    if existing_admin:
        # Update existing admin if password or name changed
        if full_name and existing_admin.full_name != full_name:
            existing_admin.full_name = full_name

        # Check if password needs update (verify current password first)
        if not await self.password_service.verify_password_async(password, existing_admin.password_hash):
            existing_admin.password_hash = await self.password_service.hash_password_async(password)
            try:
                existing_admin.password_changed_at = utc_now()
            except Exception as exc:
                logger.debug("Failed to set password_changed_at for existing admin %s: %s", email, exc)

        # Ensure admin status
        existing_admin.is_admin = True
        existing_admin.is_active = True

        self.db.commit()
        logger.info(f"Updated platform admin user: {email}")
        return existing_admin

    # Create new admin user - skip password validation during bootstrap
    admin_user = await self.create_user(email=email, password=password, full_name=full_name, is_admin=True, auth_provider="local", skip_password_validation=True)

    logger.info(f"Created platform admin user: {email}")
    return admin_user
async def update_last_login(self, email: str) -> None:
    """Record a fresh login for the given user, if that user exists.

    Nothing is written when no user is found for the email.

    Args:
        email: User's email address
    """
    account = await self.get_user_by_email(email)
    if not account:
        return

    # reset_failed_attempts() is what refreshes last_login on the user.
    account.reset_failed_attempts()
    self.db.commit()
708 @staticmethod
709 def _escape_like(value: str) -> str:
710 """Escape LIKE wildcards for prefix search.
712 Args:
713 value: Raw value to escape for LIKE matching.
715 Returns:
716 Escaped string safe for LIKE patterns.
717 """
718 return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
async def list_users(
    self,
    limit: Optional[int] = None,
    cursor: Optional[str] = None,
    page: Optional[int] = None,
    per_page: Optional[int] = None,
    search: Optional[str] = None,
) -> UsersListResult:
    """List all users with cursor or page-based pagination support and optional search.

    This method supports both cursor-based (for API endpoints with large datasets)
    and page-based (for admin UI with page numbers) pagination, with optional
    search filtering by email or full name.

    Note: This method returns ORM objects and cannot be cached since callers
    depend on ORM attributes and methods (e.g., EmailUserResponse.from_email_user).

    Cursor tokens are URL-safe base64 of a JSON object with ``created_at`` and
    ``email`` keys. A token that cannot be decoded, or that decodes to
    anything other than a JSON object, is logged and ignored, so the listing
    restarts from the first slice instead of failing.

    Args:
        limit: Maximum number of users to return (for cursor-based pagination).
            ``0`` means no limit; ``None`` or a negative value uses the
            configured default page size.
        cursor: Opaque cursor token for cursor-based pagination
        page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor.
        per_page: Items per page for page-based pagination
        search: Optional search term to filter by email or full name
            (case-insensitive prefix match)

    Returns:
        UsersListResult with data and optional pagination metadata. On any
        unexpected error an empty result of the matching shape is returned
        and the error is logged.

    Examples:
        # Cursor-based pagination (for APIs)
        # result = await service.list_users(cursor=None, limit=50)
        # len(result.data) <= 50  # Returns: True

        # Page-based pagination (for admin UI)
        # result = await service.list_users(page=1, per_page=10)
        # result.data  # Returns: list of users
        # result.pagination  # Returns: pagination metadata

        # Search users
        # users = await service.list_users(search="john", page=1, per_page=10)
        # All users whose email or name starts with "john"
    """
    try:
        # Build base query with ordering by created_at, email for consistent pagination
        # Note: EmailUser uses email as primary key, not id
        query = select(EmailUser).order_by(desc(EmailUser.created_at), desc(EmailUser.email))

        # Apply search filter if provided (prefix search for better index usage)
        if search and search.strip():
            search_term = f"{self._escape_like(search.strip())}%"
            # NOTE: For large Postgres datasets, consider citext or functional indexes for case-insensitive search.
            query = query.where(
                or_(
                    EmailUser.email.ilike(search_term, escape="\\"),
                    EmailUser.full_name.ilike(search_term, escape="\\"),
                )
            )

        # Page-based pagination: use unified_paginate
        if page is not None:
            pag_result = await unified_paginate(
                db=self.db,
                query=query,
                page=page,
                per_page=per_page,
                cursor=None,
                limit=None,
                base_url="/admin/users",
                query_params={},
            )
            return UsersListResult(data=pag_result["data"], pagination=pag_result["pagination"], links=pag_result["links"])

        # Cursor-based pagination: custom implementation for EmailUser
        # EmailUser uses email as PK (not id), so we need custom cursor using (created_at, email)
        page_size = limit if limit and limit > 0 else settings.pagination_default_page_size
        if limit == 0:
            page_size = None  # No limit

        # Decode cursor and apply keyset filter if provided
        if cursor:
            try:
                cursor_json = base64.urlsafe_b64decode(cursor.encode()).decode()
                cursor_data = orjson.loads(cursor_json)
                # Valid JSON that is not an object (list, number, string) has no
                # .get(); reject it here so it is ignored like any other bad
                # cursor rather than escaping to the outer handler.
                if not isinstance(cursor_data, dict):
                    raise ValueError("cursor payload must be a JSON object")
                last_email = cursor_data.get("email")
                created_str = cursor_data.get("created_at")
                if last_email and created_str:
                    last_created = datetime.fromisoformat(created_str)
                    # Apply keyset filter (assumes DESC order on created_at, email)
                    query = query.where(
                        or_(
                            EmailUser.created_at < last_created,
                            and_(EmailUser.created_at == last_created, EmailUser.email < last_email),
                        )
                    )
            except (ValueError, TypeError) as e:
                logger.warning(f"Invalid cursor for user pagination, ignoring: {e}")

        # Fetch page_size + 1 to determine if there are more results
        if page_size is not None:
            query = query.limit(page_size + 1)
        result = self.db.execute(query)
        users = list(result.scalars().all())

        if page_size is None:
            return UsersListResult(data=users, next_cursor=None)

        # Check if there are more results
        has_more = len(users) > page_size
        if has_more:
            users = users[:page_size]  # drop the look-ahead row

        # Generate next cursor using (created_at, email) for EmailUser
        next_cursor = None
        if has_more and users:
            last_user = users[-1]
            cursor_data = {
                "created_at": last_user.created_at.isoformat() if last_user.created_at else None,
                "email": last_user.email,
            }
            next_cursor = base64.urlsafe_b64encode(orjson.dumps(cursor_data)).decode()

        return UsersListResult(data=users, next_cursor=next_cursor)

    except Exception as e:
        logger.error(f"Error listing users: {e}")
        # Return appropriate empty response based on pagination mode
        if page is not None:
            fallback_per_page = per_page or 50
            return UsersListResult(
                data=[],
                pagination=PaginationMeta(page=page, per_page=fallback_per_page, total_items=0, total_pages=0, has_next=False, has_prev=False),
                links=PaginationLinks(  # pylint: disable=kwarg-superseded-by-positional-arg
                    self=f"/admin/users?page=1&per_page={fallback_per_page}",
                    first=f"/admin/users?page=1&per_page={fallback_per_page}",
                    last=f"/admin/users?page=1&per_page={fallback_per_page}",
                ),
            )

        if cursor is not None:
            return UsersListResult(data=[], next_cursor=None)

        return UsersListResult(data=[])
862 async def list_users_not_in_team(
863 self,
864 team_id: str,
865 cursor: Optional[str] = None,
866 limit: Optional[int] = None,
867 page: Optional[int] = None,
868 per_page: Optional[int] = None,
869 search: Optional[str] = None,
870 ) -> UsersListResult:
871 """List users who are NOT members of the specified team with cursor or page-based pagination.
873 Uses a NOT IN subquery to efficiently exclude team members.
875 Args:
876 team_id: ID of the team to exclude members from
877 cursor: Opaque cursor token for cursor-based pagination
878 limit: Maximum number of users to return (for cursor-based, default: 50)
879 page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor.
880 per_page: Items per page for page-based pagination (default: 30)
881 search: Optional search term to filter by email or full name
883 Returns:
884 UsersListResult with data and either cursor or pagination metadata
886 Examples:
887 # Page-based (admin UI)
888 # result = await service.list_users_not_in_team("team-123", page=1, per_page=30)
889 # result.pagination # Returns: pagination metadata
891 # Cursor-based (API)
892 # result = await service.list_users_not_in_team("team-123", cursor=None, limit=50)
893 # result.next_cursor # Returns: next cursor token
894 """
895 try:
896 # Build base query
897 query = select(EmailUser)
899 # Apply search filter if provided
900 if search and search.strip():
901 search_term = f"{self._escape_like(search.strip())}%"
902 query = query.where(
903 or_(
904 EmailUser.email.ilike(search_term, escape="\\"),
905 EmailUser.full_name.ilike(search_term, escape="\\"),
906 )
907 )
909 # Exclude team members using NOT IN subquery
910 member_emails_subquery = select(EmailTeamMember.user_email).where(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True))
911 query = query.where(EmailUser.is_active.is_(True), ~EmailUser.email.in_(member_emails_subquery))
913 # PAGE-BASED PAGINATION (Admin UI) - use unified_paginate
914 if page is not None:
915 query = query.order_by(EmailUser.full_name, EmailUser.email)
916 pag_result = await unified_paginate(
917 db=self.db,
918 query=query,
919 page=page,
920 per_page=per_page or 30,
921 cursor=None,
922 limit=None,
923 base_url=f"/admin/teams/{team_id}/non-members",
924 query_params={},
925 )
926 return UsersListResult(data=pag_result["data"], pagination=pag_result["pagination"], links=pag_result["links"])
928 # CURSOR-BASED PAGINATION - custom implementation using (created_at, email)
929 # unified_paginate uses (created_at, id) but EmailUser uses email as PK
930 query = query.order_by(desc(EmailUser.created_at), desc(EmailUser.email))
932 # Decode cursor and apply keyset filter
933 if cursor:
934 try:
935 cursor_json = base64.urlsafe_b64decode(cursor.encode()).decode()
936 cursor_data = orjson.loads(cursor_json)
937 last_email = cursor_data.get("email")
938 created_str = cursor_data.get("created_at")
939 if last_email and created_str:
940 last_created = datetime.fromisoformat(created_str)
941 # Keyset filter: (created_at < last) OR (created_at = last AND email < last_email)
942 query = query.where(
943 or_(
944 EmailUser.created_at < last_created,
945 and_(EmailUser.created_at == last_created, EmailUser.email < last_email),
946 )
947 )
948 except (ValueError, TypeError) as e:
949 logger.warning(f"Invalid cursor for non-members list, ignoring: {e}")
951 # Fetch limit + 1 to check for more results
952 page_size = limit or 50
953 query = query.limit(page_size + 1)
954 users = list(self.db.execute(query).scalars().all())
956 # Check if there are more results
957 has_more = len(users) > page_size
958 if has_more:
959 users = users[:page_size]
961 # Generate next cursor using (created_at, email)
962 next_cursor = None
963 if has_more and users:
964 last_user = users[-1]
965 cursor_data = {
966 "created_at": last_user.created_at.isoformat() if last_user.created_at else None,
967 "email": last_user.email,
968 }
969 next_cursor = base64.urlsafe_b64encode(orjson.dumps(cursor_data)).decode()
971 self.db.commit()
972 return UsersListResult(data=users, next_cursor=next_cursor)
974 except Exception as e:
975 logger.error(f"Error listing non-members for team {team_id}: {e}")
977 # Return appropriate empty response based on mode
978 if page is not None:
979 return UsersListResult(
980 data=[],
981 pagination=PaginationMeta(page=page, per_page=per_page or 30, total_items=0, total_pages=0, has_next=False, has_prev=False),
982 links=PaginationLinks( # pylint: disable=kwarg-superseded-by-positional-arg
983 self=f"/admin/teams/{team_id}/non-members?page=1&per_page={per_page or 30}",
984 first=f"/admin/teams/{team_id}/non-members?page=1&per_page={per_page or 30}",
985 last=f"/admin/teams/{team_id}/non-members?page=1&per_page={per_page or 30}",
986 ),
987 )
989 return UsersListResult(data=[], next_cursor=None)
async def get_all_users(self) -> list[EmailUser]:
    """Return every user in a single list (deprecated, capped).

    .. deprecated:: 1.0
        Prefer :meth:`list_users` with pagination. This method refuses to
        run when the user count exceeds ``_GET_ALL_USERS_LIMIT`` and
        otherwise requests at most that many users from ``list_users``.

    A ``DeprecationWarning`` is emitted the first time the method is called;
    the class-level flag ``get_all_users_deprecated_warned`` suppresses
    repeats.

    Returns:
        The ``data`` list of the ``list_users`` result.

    Raises:
        ValueError: If ``count_users()`` reports more users than the limit.

    Examples:
        # users = await service.get_all_users()
        # isinstance(users, list)  # Returns: True

        Paginated replacement (note the parentheses around ``await``):

        # users = (await service.list_users(page=1, per_page=1000)).data
        # users = (await service.list_users(search="john", page=1, per_page=10)).data
    """
    # Warn once per class rather than once per call or per instance.
    already_warned = self.__class__.get_all_users_deprecated_warned
    if not already_warned:
        warnings.warn(
            "get_all_users() is deprecated and limited to 10,000 users. Use list_users() with pagination instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        self.__class__.get_all_users_deprecated_warned = True

    # Refuse to silently truncate: fail loudly when the cap would be exceeded.
    if await self.count_users() > _GET_ALL_USERS_LIMIT:
        raise ValueError("get_all_users() supports up to 10,000 users. Use list_users() pagination instead.")

    # A single request with the cap as its limit returns every user.
    listing = await self.list_users(limit=_GET_ALL_USERS_LIMIT)
    return listing.data
async def count_users(self) -> int:
    """Return the number of rows in the user table.

    Returns:
        int: The user count, or 0 when the query yields no value or fails
        (failures are logged, not raised).
    """
    try:
        total = self.db.execute(select(func.count(EmailUser.email))).scalar()  # pylint: disable=not-callable
    except Exception as exc:
        logger.error(f"Error counting users: {exc}")
        return 0
    # scalar() may yield None; report that as zero.
    return total or 0
async def get_auth_events(self, email: Optional[str] = None, limit: int = 100, offset: int = 0) -> list[EmailAuthEvent]:
    """Fetch authentication events, newest first, for auditing.

    Args:
        email: When given (and non-empty), only events for this user email are returned
        limit: Maximum number of events to return
        offset: Number of events to skip before the first returned one

    Returns:
        List of EmailAuthEvent objects; an empty list if the query fails
        (the failure is logged, not raised).
    """
    try:
        query = select(EmailAuthEvent)
        if email:
            query = query.where(EmailAuthEvent.user_email == email)

        # Newest events first, then apply the requested window.
        newest_first = query.order_by(EmailAuthEvent.timestamp.desc())
        windowed = newest_first.offset(offset).limit(limit)

        return list(self.db.execute(windowed).scalars().all())
    except Exception as exc:
        logger.error(f"Error getting auth events: {exc}")
        return []
async def update_user(
    self,
    email: str,
    full_name: Optional[str] = None,
    is_admin: Optional[bool] = None,
    is_active: Optional[bool] = None,
    password_change_required: Optional[bool] = None,
    password: Optional[str] = None,
    admin_origin_source: Optional[str] = None,
) -> EmailUser:
    """Update user information.

    Only arguments that are not ``None`` are applied. All input checks (user
    lookup, admin protection guard, password validation and hashing) run
    before any state is modified, so a rejected request does not leave the
    global role assignments half-updated.

    Args:
        email: User's email address (primary key); lower-cased and stripped before lookup
        full_name: New full name (optional)
        is_admin: New admin status (optional)
        is_active: New active status (optional)
        password_change_required: Whether user must change password on next login (optional)
        password: New password (optional, will be hashed)
        admin_origin_source: Source of admin change for tracking (e.g. "api", "ui"). Callers should pass explicitly.

    Returns:
        EmailUser: Updated user object

    Raises:
        ValueError: If user doesn't exist, if protect_all_admins blocks the change, or if it would remove the last active admin
        PasswordValidationError: If password doesn't meet policy
    """
    try:
        # Normalize email to match create_user() / get_user_by_email() behavior
        email = email.lower().strip()

        user = self.db.execute(select(EmailUser).where(EmailUser.email == email)).scalar_one_or_none()
        if not user:
            raise ValueError(f"User {email} not found")

        # Admin protection guard: only relevant when an active admin would stop being one.
        if user.is_admin and user.is_active:
            would_lose_admin = (is_admin is not None and not is_admin) or (is_active is not None and not is_active)
            if would_lose_admin:
                if settings.protect_all_admins:
                    raise ValueError("Admin protection is enabled — cannot demote or deactivate any admin user")
                if await self.is_last_active_admin(email):
                    raise ValueError("Cannot demote or deactivate the last remaining active admin user")

        # Validate and hash the new password BEFORE touching any state. The role
        # sync below goes through role_service, and nothing in this method shows
        # that db.rollback() would undo it, so a password rejected afterwards
        # could leave roles changed for an update that was reported as failed.
        new_password_hash = None
        if password is not None:
            self.validate_password(password)
            new_password_hash = await self.password_service.hash_password_async(password)

        # Update fields if provided
        if full_name is not None:
            user.full_name = full_name

        # Track admin_origin and sync global roles only when the status actually changes.
        if is_admin is not None and is_admin != user.is_admin:
            user.is_admin = is_admin
            user.admin_origin = admin_origin_source if is_admin else None
            await self._sync_global_admin_roles(email, is_admin)

        if is_active is not None:
            user.is_active = is_active

        if password is not None:
            user.password_hash = new_password_hash
            # Only clear password_change_required if it wasn't explicitly set
            if password_change_required is None:
                user.password_change_required = False
            user.password_changed_at = utc_now()

        # Set password_change_required after password processing to allow explicit override
        if password_change_required is not None:
            user.password_change_required = password_change_required

        user.updated_at = datetime.now(timezone.utc)

        self.db.commit()

        return user

    except Exception as e:
        self.db.rollback()
        logger.error(f"Error updating user {email}: {e}")
        raise


async def _sync_global_admin_roles(self, email: str, is_admin: bool) -> None:
    """Best-effort alignment of the global platform roles with the new is_admin flag.

    Promotion assigns platform_admin and then revokes platform_viewer;
    demotion revokes platform_admin and then assigns platform_viewer.
    Any failure is logged as a warning and swallowed so that the user
    update itself is not failed by a role sync problem.

    Args:
        email: Normalized email of the user whose roles are synced
        is_admin: The admin status the user has just been given
    """
    try:
        platform_admin_role = await self.role_service.get_role_by_name("platform_admin", "global")
        platform_viewer_role = await self.role_service.get_role_by_name("platform_viewer", "global")

        if is_admin:
            await self._grant_global_role(email, platform_admin_role, "platform_admin")
            await self._revoke_global_role(email, platform_viewer_role, "platform_viewer")
        else:
            await self._revoke_global_role(email, platform_admin_role, "platform_admin")
            await self._grant_global_role(email, platform_viewer_role, "platform_viewer")
    except Exception as e:
        # Don't fail user update if role sync fails
        logger.warning(f"Failed to sync global roles for {email}: {e}")


async def _grant_global_role(self, email: str, role, role_name: str) -> None:
    """Assign a global role to the user unless an active assignment already exists.

    Args:
        email: Normalized email of the user receiving the role
        role: Role object returned by role_service, or a falsy value when the role was not found
        role_name: Role name used in log messages
    """
    if not role:
        logger.warning(f"{role_name} role not found, cannot assign to {email}")
        return

    existing = await self.role_service.get_user_role_assignment(user_email=email, role_id=role.id, scope="global", scope_id=None)
    if not existing or not existing.is_active:
        await self.role_service.assign_role_to_user(user_email=email, role_id=role.id, scope="global", scope_id=None, granted_by=email)
        logger.info(f"Assigned {role_name} role to {email}")


async def _revoke_global_role(self, email: str, role, role_name: str) -> None:
    """Revoke a global role from the user; a missing role is silently skipped.

    Args:
        email: Normalized email of the user losing the role
        role: Role object returned by role_service, or a falsy value when the role was not found
        role_name: Role name used in log messages
    """
    if not role:
        return

    revoked = await self.role_service.revoke_role_from_user(user_email=email, role_id=role.id, scope="global", scope_id=None)
    if revoked:
        logger.info(f"Revoked {role_name} role from {email}")
async def activate_user(self, email: str) -> EmailUser:
    """Mark a user account as active and commit the change.

    Args:
        email: User's email address (matched exactly as given)

    Returns:
        EmailUser: Updated user object

    Raises:
        ValueError: If user doesn't exist
    """
    try:
        lookup = select(EmailUser).where(EmailUser.email == email)
        account = self.db.execute(lookup).scalar_one_or_none()
        if not account:
            raise ValueError(f"User {email} not found")

        account.is_active = True
        account.updated_at = datetime.now(timezone.utc)
        self.db.commit()

        logger.info(f"User {email} activated")
        return account
    except Exception as exc:
        # Undo any pending change, record the failure, and let the caller see it.
        self.db.rollback()
        logger.error(f"Error activating user {email}: {exc}")
        raise
async def deactivate_user(self, email: str) -> EmailUser:
    """Mark a user account as inactive and commit the change.

    Unlike ``update_user``, this method matches the email exactly as given
    and does not apply the admin protection guard; callers are responsible
    for any such checks.

    Args:
        email: User's email address (matched exactly as given)

    Returns:
        EmailUser: Updated user object

    Raises:
        ValueError: If user doesn't exist
    """
    try:
        lookup = select(EmailUser).where(EmailUser.email == email)
        account = self.db.execute(lookup).scalar_one_or_none()
        if not account:
            raise ValueError(f"User {email} not found")

        account.is_active = False
        account.updated_at = datetime.now(timezone.utc)
        self.db.commit()

        logger.info(f"User {email} deactivated")
        return account
    except Exception as exc:
        # Undo any pending change, record the failure, and let the caller see it.
        self.db.rollback()
        logger.error(f"Error deactivating user {email}: {exc}")
        raise
async def delete_user(self, email: str) -> bool:
    """Delete a user account permanently.

    Teams whose ``created_by`` is this user are resolved first:

    * if another member holds the "owner" role, ``created_by`` is moved to
      the first such member returned by the query;
    * otherwise, if the user is the team's only member, the team and its
      membership rows are deleted;
    * otherwise the deletion is aborted with ``ValueError``.

    Afterwards the user's role assignments (best effort), auth events and
    team memberships are removed, the user row is deleted, and the
    transaction is committed. Finally the auth caches for the user are
    invalidated; cache failures are logged at debug level and never fail
    the deletion.

    Args:
        email: User's email address (matched exactly as given)

    Returns:
        bool: True if user was deleted

    Raises:
        ValueError: If user doesn't exist
        ValueError: If user owns teams that cannot be transferred
    """
    try:
        user = self.db.execute(select(EmailUser).where(EmailUser.email == email)).scalar_one_or_none()
        if not user:
            raise ValueError(f"User {email} not found")

        # Resolve ownership of every team this user created.
        teams_owned = self.db.execute(select(EmailTeam).where(EmailTeam.created_by == email)).scalars().all()
        for team in teams_owned:
            # Find other team owners who can take ownership
            potential_owners_stmt = (
                select(EmailTeamMember)
                .where(EmailTeamMember.team_id == team.id, EmailTeamMember.user_email != email, EmailTeamMember.role == "owner")
                .order_by(EmailTeamMember.role.desc())
            )
            potential_owners = self.db.execute(potential_owners_stmt).scalars().all()

            if potential_owners:
                # Transfer ownership to the first available owner
                new_owner = potential_owners[0]
                team.created_by = new_owner.user_email
                logger.info(f"Transferred team '{team.name}' ownership from {email} to {new_owner.user_email}")
                continue

            # No other owners available - check if it's a single-user team
            all_members = self.db.execute(select(EmailTeamMember).where(EmailTeamMember.team_id == team.id)).scalars().all()
            is_personal_team = len(all_members) == 1 and all_members[0].user_email == email
            if not is_personal_team:
                # Team with no other owners that is not a single-member personal team - cannot delete user
                raise ValueError(f"Cannot delete user {email}: owns team '{team.name}' with {len(all_members)} members but no other owners to transfer ownership to")

            # This is a single-user personal team - cascade delete it
            logger.info(f"Deleting personal team '{team.name}' (single member: {email})")
            # Delete team members first (should be just the owner)
            self.db.execute(delete(EmailTeamMember).where(EmailTeamMember.team_id == team.id))
            self.db.delete(team)

        # Delete all role assignments for the user (best effort: a failure is logged, not raised)
        try:
            await self.role_service.delete_all_user_roles(email)
        except Exception as e:
            logger.warning(f"Failed to delete role assignments for {email}: {e}")

        # Delete related auth events
        self.db.execute(delete(EmailAuthEvent).where(EmailAuthEvent.user_email == email))

        # Remove user from all team memberships
        self.db.execute(delete(EmailTeamMember).where(EmailTeamMember.user_email == email))

        # Delete the user
        self.db.delete(user)
        self.db.commit()

        # Invalidate all auth caches for deleted user.
        # The invalidations are awaited instead of being launched with a bare
        # asyncio.create_task(): the event loop only keeps weak references to
        # tasks, so an unreferenced task may be garbage-collected before it
        # finishes, and its exception would never be retrieved.
        try:
            # Standard
            import asyncio  # pylint: disable=import-outside-toplevel

            # First-Party
            from mcpgateway.cache.auth_cache import auth_cache  # pylint: disable=import-outside-toplevel

            outcomes = await asyncio.gather(
                auth_cache.invalidate_user(email),
                auth_cache.invalidate_user_teams(email),
                auth_cache.invalidate_team_membership(email),
                return_exceptions=True,
            )
            for outcome in outcomes:
                if isinstance(outcome, BaseException):
                    logger.debug(f"Failed to invalidate cache on user delete: {outcome}")
        except Exception as cache_error:
            logger.debug(f"Failed to invalidate cache on user delete: {cache_error}")

        logger.info(f"User {email} deleted permanently")
        return True

    except Exception as e:
        self.db.rollback()
        logger.error(f"Error deleting user {email}: {e}")
        raise
async def count_active_admin_users(self) -> int:
    """Return how many users are both admin and active.

    Returns:
        int: Number of active admin users (0 when the query yields no value)
    """
    active_admin_filters = (EmailUser.is_admin.is_(True), EmailUser.is_active.is_(True))
    query = select(func.count(EmailUser.email)).where(*active_admin_filters)  # pylint: disable=not-callable
    return self.db.execute(query).scalar() or 0
async def is_last_active_admin(self, email: str) -> bool:
    """Check if the given user is the last active admin.

    The email is lower-cased and stripped before lookup, the same
    normalization update_user() applies, so that a differently-cased or
    padded address cannot make this check report False for the last admin.

    Args:
        email: User's email address

    Returns:
        bool: True if this user is an active admin and the only one;
        False if the user does not exist, is not an admin, is inactive,
        or other active admins exist
    """
    # Leave falsy input (None, "") untouched so it simply matches no user.
    normalized_email = email.lower().strip() if email else email

    # First check if the user is an active admin
    user = self.db.execute(select(EmailUser).where(EmailUser.email == normalized_email)).scalar_one_or_none()
    if not user or not user.is_admin or not user.is_active:
        return False

    # The user is an active admin, so a total of one means they are the last.
    return await self.count_active_admin_users() == 1