Coverage for mcpgateway / services / team_management_service.py: 97%
796 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/team_management_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Team Management Service.
8This module provides team creation, management, and membership operations
9for the multi-team collaboration system.
11Examples:
12 >>> from unittest.mock import Mock
13 >>> service = TeamManagementService(Mock())
14 >>> isinstance(service, TeamManagementService)
15 True
16 >>> hasattr(service, 'db')
17 True
18"""
20# Standard
21import asyncio
22import base64
23from datetime import datetime, timedelta
24from typing import Any, Dict, List, Optional, Tuple, Union
26# Third-Party
27import orjson
28from sqlalchemy import and_, desc, func, or_, select
29from sqlalchemy.orm import selectinload, Session
31# First-Party
32from mcpgateway.cache.admin_stats_cache import admin_stats_cache
33from mcpgateway.cache.auth_cache import auth_cache, get_auth_cache
34from mcpgateway.common.validators import SecurityValidator
35from mcpgateway.config import settings
36from mcpgateway.db import EmailTeam, EmailTeamJoinRequest, EmailTeamMember, EmailTeamMemberHistory, EmailUser, utc_now
37from mcpgateway.services.logging_service import LoggingService
38from mcpgateway.utils.create_slug import slugify
39from mcpgateway.utils.pagination import unified_paginate
40from mcpgateway.utils.redis_client import get_redis_client
42# Initialize logging
43logging_service = LoggingService()
44logger = logging_service.get_logger(__name__)
def get_user_team_count(db: Session, user_email: str) -> int:
    """Count the active team memberships held by a user.

    Args:
        db: SQLAlchemy database session used to run the count query.
        user_email: Email address identifying the user.

    Returns:
        int: Number of ``EmailTeamMember`` rows for the user whose
        ``is_active`` flag is true.
    """
    active_memberships = db.query(EmailTeamMember).filter(
        EmailTeamMember.user_email == user_email,
        EmailTeamMember.is_active.is_(True),
    )
    return active_memberships.count()
class TeamManagementError(Exception):
    """Root of the team-management exception hierarchy.

    Every error raised specifically by this module derives from this class,
    so callers can catch it to handle any team-management failure.

    Examples:
        >>> error = TeamManagementError("Test error")
        >>> str(error)
        'Test error'
        >>> isinstance(error, Exception)
        True
    """
class InvalidRoleError(TeamManagementError):
    """Signals that a membership role outside the accepted set was requested.

    Examples:
        >>> error = InvalidRoleError("Invalid role: guest")
        >>> str(error)
        'Invalid role: guest'
        >>> isinstance(error, TeamManagementError)
        True
    """
class TeamNotFoundError(TeamManagementError):
    """Signals that the requested team could not be found.

    Examples:
        >>> error = TeamNotFoundError("Team not found: team-123")
        >>> str(error)
        'Team not found: team-123'
        >>> isinstance(error, TeamManagementError)
        True
    """
class UserNotFoundError(TeamManagementError):
    """Signals that the requested user could not be found.

    Examples:
        >>> error = UserNotFoundError("User not found: user@example.com")
        >>> str(error)
        'User not found: user@example.com'
        >>> isinstance(error, TeamManagementError)
        True
    """
class MemberAlreadyExistsError(TeamManagementError):
    """Signals that the user already holds an active membership in the team.

    Examples:
        >>> error = MemberAlreadyExistsError("User user@example.com is already a member of team team-123")
        >>> str(error)
        'User user@example.com is already a member of team team-123'
        >>> isinstance(error, TeamManagementError)
        True
    """
class TeamMemberLimitExceededError(TeamManagementError):
    """Signals that a team is already at its maximum member count.

    Examples:
        >>> error = TeamMemberLimitExceededError("Team has reached maximum member limit of 10")
        >>> str(error)
        'Team has reached maximum member limit of 10'
        >>> isinstance(error, TeamManagementError)
        True
    """
class TeamMemberAddError(TeamManagementError):
    """Signals that persisting a new or reactivated membership failed.

    Used to wrap database or other system-level failures that occur while a
    member is being added to a team.

    Examples:
        >>> error = TeamMemberAddError("Failed to add member due to database error")
        >>> str(error)
        'Failed to add member due to database error'
        >>> isinstance(error, TeamManagementError)
        True
    """
144class _Unset:
145 """Sentinel type: distinguishes 'caller omitted the argument' from 'caller passed None'."""
147 __slots__ = ()
149 def __repr__(self) -> str:
150 """Return string representation of the unset sentinel.
152 Returns:
153 str: The literal string ``"UNSET"``.
154 """
155 return "UNSET"
157 def __bool__(self) -> bool:
158 """Make the sentinel falsy so truthiness checks treat it as not provided.
160 Returns:
161 bool: Always ``False``.
162 """
163 return False
166UNSET = _Unset()
def get_effective_max_members(team: "EmailTeam") -> int:
    """Resolve the member limit that applies to a team.

    A team may carry its own ``max_members`` value. When it does, that value
    wins; when it is ``None``, the global ``settings.max_members_per_team`` is
    used instead, so changes to the global setting apply to every team that
    has no explicit override.

    Args:
        team: The team whose member limit should be resolved.

    Returns:
        The member limit (always an int; 0 means unlimited).
    """
    explicit_limit = team.max_members
    if explicit_limit is None:
        return settings.max_members_per_team
    return explicit_limit
def check_team_member_capacity(db: "Session", team: "EmailTeam", *, extra_count: int = 0) -> None:
    """Verify that a team still has room for another member.

    Args:
        db: Database session used to count the team's active members.
        team: The team to check.
        extra_count: Additional slots to reserve on top of the active member
            count (e.g. pending invitations).

    Raises:
        TeamMemberLimitExceededError: If active members plus ``extra_count``
            reach or exceed the effective limit.
    """
    limit = get_effective_max_members(team)
    if limit <= 0:
        # A non-positive limit means "unlimited": nothing to enforce.
        return

    active_members = (
        db.query(EmailTeamMember)
        .filter(EmailTeamMember.team_id == team.id, EmailTeamMember.is_active.is_(True))
        .count()
    )
    if active_members + extra_count < limit:
        return
    raise TeamMemberLimitExceededError(f"Team has reached maximum member limit of {limit}")
207class TeamManagementService:
208 """Service for team management operations.
210 This service handles team creation, membership management,
211 role assignments, and team access control.
213 Attributes:
214 db (Session): SQLAlchemy database session
216 Examples:
217 >>> from unittest.mock import Mock
218 >>> service = TeamManagementService(Mock())
219 >>> service.__class__.__name__
220 'TeamManagementService'
221 >>> hasattr(service, 'db')
222 True
223 """
225 def __init__(self, db: Session):
226 """Initialize the team management service.
228 Args:
229 db: SQLAlchemy database session
231 Examples:
232 Basic initialization:
233 >>> from mcpgateway.services.team_management_service import TeamManagementService
234 >>> from unittest.mock import Mock
235 >>> db_session = Mock()
236 >>> service = TeamManagementService(db_session)
237 >>> service.db is db_session
238 True
240 Service attributes:
241 >>> hasattr(service, 'db')
242 True
243 >>> service.__class__.__name__
244 'TeamManagementService'
245 """
246 self.db = db
247 self._role_service = None # Lazy initialization to avoid circular imports
249 @property
250 def role_service(self):
251 """Lazy-initialized RoleService to avoid circular imports.
253 Returns:
254 RoleService: Instance of RoleService
255 """
256 if self._role_service is None:
257 # First-Party
258 from mcpgateway.services.role_service import RoleService # pylint: disable=import-outside-toplevel
260 self._role_service = RoleService(self.db)
261 return self._role_service
263 def _get_user_team_count(self, user_email: str) -> int:
264 """Get the number of active teams a user belongs to.
266 Args:
267 user_email: Email address of the user
269 Returns:
270 int: Number of active team memberships
271 """
272 return get_user_team_count(self.db, user_email)
274 @staticmethod
275 def _get_rbac_role_name(membership_role: str) -> str:
276 """Map a team membership role to the corresponding configurable RBAC role name.
278 Args:
279 membership_role: Team membership role ('owner' or 'member').
281 Returns:
282 str: The configured RBAC role name from settings.
283 """
284 return settings.default_team_owner_role if membership_role == "owner" else settings.default_team_member_role
286 @staticmethod
287 def _fire_and_forget(coro: Any) -> None:
288 """Schedule a background coroutine and close it if scheduling fails.
290 Args:
291 coro: The coroutine to schedule as a background task.
293 Raises:
294 Exception: If asyncio.create_task fails (e.g. no running loop).
295 """
296 try:
297 task = asyncio.create_task(coro)
298 # Some tests patch create_task with a plain Mock return value. In that
299 # case the coroutine is never actually scheduled and must be closed.
300 if asyncio.iscoroutine(coro) and not isinstance(task, asyncio.Task):
301 close = getattr(coro, "close", None)
302 if callable(close):
303 close()
304 except Exception:
305 # If create_task() fails (e.g. no running loop), the coroutine has
306 # already been created and must be closed to avoid runtime warnings.
307 close = getattr(coro, "close", None)
308 if callable(close):
309 close()
310 raise
312 def _log_team_member_action(self, team_member_id: str, team_id: str, user_email: str, role: str, action: str, action_by: Optional[str]):
313 """
314 Log a team member action to EmailTeamMemberHistory.
316 Args:
317 team_member_id: ID of the EmailTeamMember
318 team_id: Team ID
319 user_email: Email of the affected user
320 role: Role at the time of action
321 action: Action type ("added", "removed", "reactivated", "role_changed")
322 action_by: Email of the user who performed the action
324 Examples:
325 >>> from mcpgateway.services.team_management_service import TeamManagementService
326 >>> from unittest.mock import Mock
327 >>> service = TeamManagementService(Mock())
328 >>> service._log_team_member_action("tm-123", "team-123", "user@example.com", "member", "added", "admin@example.com")
329 """
330 history = EmailTeamMemberHistory(team_member_id=team_member_id, team_id=team_id, user_email=user_email, role=role, action=action, action_by=action_by, action_timestamp=utc_now())
331 self.db.add(history)
332 self.db.commit()
334 async def _assign_team_rbac_role(self, user_email: str, team_id: str, membership_role: str, granted_by: Optional[str] = None) -> None:
335 """Assign a team-scoped RBAC role matching the membership role.
337 Looks up the configured RBAC role name for *membership_role* (owner
338 or member) and assigns it if the user does not already have an
339 active assignment. Errors are logged but do not propagate.
341 Args:
342 user_email: Email of the user to assign the role to.
343 team_id: Team ID for the scoped assignment.
344 membership_role: Team membership role (``"owner"`` or ``"member"``).
345 granted_by: Email of the user who triggered the assignment.
346 """
347 try:
348 rbac_role_name = self._get_rbac_role_name(membership_role)
349 team_rbac_role = await self.role_service.get_role_by_name(rbac_role_name, scope="team")
350 if team_rbac_role:
351 existing = await self.role_service.get_user_role_assignment(user_email=user_email, role_id=team_rbac_role.id, scope="team", scope_id=team_id)
352 if not existing or not existing.is_active:
353 await self.role_service.assign_role_to_user(user_email=user_email, role_id=team_rbac_role.id, scope="team", scope_id=team_id, granted_by=granted_by or user_email)
354 logger.info(f"Assigned {rbac_role_name} role to {SecurityValidator.sanitize_log_message(user_email)} for team {SecurityValidator.sanitize_log_message(team_id)}")
355 else:
356 logger.debug(f"User {SecurityValidator.sanitize_log_message(user_email)} already has active {rbac_role_name} role for team {SecurityValidator.sanitize_log_message(team_id)}")
357 else:
358 logger.warning(f"Role '{rbac_role_name}' not found. User {SecurityValidator.sanitize_log_message(user_email)} added without RBAC role.")
359 except Exception as role_error:
360 logger.warning(f"Failed to assign role to {SecurityValidator.sanitize_log_message(user_email)}: {role_error}")
362 def _invalidate_membership_caches(self, user_email: str, team_id: str, *, include_admin_stats: bool = True) -> None:
363 """Invalidate auth and admin caches after a membership change.
365 Errors are logged at debug level but do not propagate.
367 Args:
368 user_email: Email of the affected user.
369 team_id: Team ID whose caches should be invalidated.
370 include_admin_stats: Whether to also invalidate admin stats (skip on removal).
371 """
372 try:
373 self._fire_and_forget(auth_cache.invalidate_team(user_email))
374 self._fire_and_forget(auth_cache.invalidate_user_role(user_email, team_id))
375 self._fire_and_forget(auth_cache.invalidate_user_teams(user_email))
376 self._fire_and_forget(auth_cache.invalidate_team_membership(user_email))
377 if include_admin_stats:
378 self._fire_and_forget(admin_stats_cache.invalidate_teams())
379 except Exception as cache_error:
380 logger.debug(f"Failed to invalidate membership caches for {SecurityValidator.sanitize_log_message(user_email)}: {cache_error}")
382 def _check_user_team_limit(self, user_email: str) -> None:
383 """Raise if the user has reached the maximum team membership limit.
385 Args:
386 user_email: Email of the user to check.
388 Raises:
389 TeamManagementError: If the user is at the configured limit.
390 """
391 max_teams = getattr(settings, "max_teams_per_user", 50)
392 if self._get_user_team_count(user_email) >= max_teams:
393 raise TeamManagementError(f"User has reached the maximum team limit of {max_teams}")
395 async def create_team(
396 self, name: str, description: Optional[str], created_by: str, visibility: Optional[str] = "public", max_members: Optional[int] = None, skip_limits: bool = False
397 ) -> EmailTeam:
398 """Create a new team.
400 Args:
401 name: Team name
402 description: Team description
403 created_by: Email of the user creating the team
404 visibility: Team visibility (private, team, public)
405 max_members: Maximum number of team members allowed
406 skip_limits: Skip max_teams_per_user check (for admin bypass)
408 Returns:
409 EmailTeam: The created team
411 Raises:
412 ValueError: If team name is taken or invalid
413 Exception: If team creation fails
415 Examples:
416 Team creation parameter validation:
417 >>> from mcpgateway.services.team_management_service import TeamManagementService
419 Test team name validation:
420 >>> team_name = "My Development Team"
421 >>> len(team_name) > 0
422 True
423 >>> len(team_name) <= 255
424 True
425 >>> bool(team_name.strip())
426 True
428 Test visibility validation:
429 >>> visibility = "private"
430 >>> valid_visibilities = ["private", "public"]
431 >>> visibility in valid_visibilities
432 True
433 >>> "invalid" in valid_visibilities
434 False
436 Test max_members validation:
437 >>> max_members = 50
438 >>> isinstance(max_members, int)
439 True
440 >>> max_members > 0
441 True
443 Test creator validation:
444 >>> created_by = "admin@example.com"
445 >>> "@" in created_by
446 True
447 >>> len(created_by) > 0
448 True
450 Test description handling:
451 >>> description = "A team for software development"
452 >>> description is not None
453 True
454 >>> isinstance(description, str)
455 True
457 >>> # Test None description
458 >>> description_none = None
459 >>> description_none is None
460 True
461 """
462 try:
463 # Validate visibility
464 valid_visibilities = ["private", "public"]
465 if visibility not in valid_visibilities:
466 raise ValueError(f"Invalid visibility. Must be one of: {', '.join(valid_visibilities)}")
468 # Check max teams per user
469 if not skip_limits:
470 max_teams = getattr(settings, "max_teams_per_user", 50)
471 if self._get_user_team_count(created_by) >= max_teams:
472 raise ValueError(f"User has reached the maximum team limit of {max_teams}")
474 # Enforce max_members cap for non-admins (only when explicitly provided)
475 if not skip_limits and max_members is not None:
476 max_limit = settings.max_members_per_team
477 if max_members > max_limit:
478 raise ValueError(f"max_members cannot exceed the configured limit of {max_limit}")
480 # If max_members is not explicitly provided, leave it as None in the DB.
481 # The effective limit will be resolved at check time from settings.max_members_per_team,
482 # so changing the env var affects all teams that don't have an explicit override.
484 # Check for existing inactive team with same name
486 potential_slug = slugify(name)
487 existing_inactive_team = self.db.query(EmailTeam).filter(EmailTeam.slug == potential_slug, EmailTeam.is_active.is_(False)).first()
489 if existing_inactive_team:
490 # Reactivate the existing team with new details
491 existing_inactive_team.name = name
492 existing_inactive_team.description = description
493 existing_inactive_team.created_by = created_by
494 existing_inactive_team.visibility = visibility
495 existing_inactive_team.max_members = max_members
496 existing_inactive_team.is_active = True
497 existing_inactive_team.updated_at = utc_now()
498 team = existing_inactive_team
500 # Check if the creator already has an inactive membership
501 existing_membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team.id, EmailTeamMember.user_email == created_by).first()
503 if existing_membership:
504 # Reactivate existing membership as owner
505 existing_membership.role = "owner"
506 existing_membership.joined_at = utc_now()
507 existing_membership.is_active = True
508 membership = existing_membership
509 else:
510 # Create new membership
511 membership = EmailTeamMember(team_id=team.id, user_email=created_by, role="owner", joined_at=utc_now(), is_active=True)
512 self.db.add(membership)
514 logger.info(f"Reactivated existing team with slug {potential_slug}")
515 else:
516 # Create the team (slug will be auto-generated by event listener)
517 team = EmailTeam(name=name, description=description, created_by=created_by, is_personal=False, visibility=visibility, max_members=max_members, is_active=True)
518 self.db.add(team)
520 self.db.flush() # Get the team ID
522 # Add the creator as owner
523 membership = EmailTeamMember(team_id=team.id, user_email=created_by, role="owner", joined_at=utc_now(), is_active=True)
524 self.db.add(membership)
526 self.db.commit()
528 # Invalidate member count cache for the new team
529 await self.invalidate_team_member_count_cache(str(team.id))
531 # Invalidate auth cache for creator's team membership
532 # Without this, the cache won't know the user belongs to this new team
533 try:
534 await auth_cache.invalidate_user_teams(created_by)
535 await auth_cache.invalidate_team_membership(created_by)
536 await auth_cache.invalidate_user_role(created_by, str(team.id))
537 await admin_stats_cache.invalidate_teams()
538 except Exception as cache_error:
539 logger.debug(f"Failed to invalidate cache on team create: {cache_error}")
541 logger.info(f"Created team '{SecurityValidator.sanitize_log_message(team.name)}' by {created_by}")
542 return team
544 except Exception as e:
545 self.db.rollback()
546 logger.error(f"Failed to create team '{name}': {e}")
547 raise
549 async def get_team_by_id(self, team_id: str) -> Optional[EmailTeam]:
550 """Get a team by ID.
552 Args:
553 team_id: Team ID to lookup
555 Returns:
556 EmailTeam: The team or None if not found
558 Examples:
559 >>> import asyncio
560 >>> from unittest.mock import Mock
561 >>> service = TeamManagementService(Mock())
562 >>> asyncio.iscoroutinefunction(service.get_team_by_id)
563 True
564 """
565 try:
566 team = self.db.query(EmailTeam).filter(EmailTeam.id == team_id, EmailTeam.is_active.is_(True)).first()
567 self.db.commit() # Release transaction to avoid idle-in-transaction
568 return team
570 except Exception as e:
571 self.db.rollback()
572 logger.error(f"Failed to get team by ID {SecurityValidator.sanitize_log_message(team_id)}: {e}")
573 return None
575 async def get_team_by_slug(self, slug: str) -> Optional[EmailTeam]:
576 """Get a team by slug.
578 Args:
579 slug: Team slug to lookup
581 Returns:
582 EmailTeam: The team or None if not found
584 Examples:
585 >>> import asyncio
586 >>> from unittest.mock import Mock
587 >>> service = TeamManagementService(Mock())
588 >>> asyncio.iscoroutinefunction(service.get_team_by_slug)
589 True
590 """
591 try:
592 team = self.db.query(EmailTeam).filter(EmailTeam.slug == slug, EmailTeam.is_active.is_(True)).first()
593 self.db.commit() # Release transaction to avoid idle-in-transaction
594 return team
596 except Exception as e:
597 self.db.rollback()
598 logger.error(f"Failed to get team by slug {slug}: {e}")
599 return None
601 async def update_team(
602 self,
603 team_id: str,
604 name: Optional[str] = None,
605 description: Optional[str] = None,
606 visibility: Optional[str] = None,
607 max_members: Union[int, None, _Unset] = UNSET,
608 updated_by: Optional[str] = None,
609 skip_limits: bool = False,
610 ) -> bool:
611 """Update team information.
613 Args:
614 team_id: ID of the team to update
615 name: New team name
616 description: New team description
617 visibility: New visibility setting
618 max_members: New maximum member limit. Pass ``None`` to clear an
619 explicit per-team override (reverts to global default). Omit
620 (or pass ``UNSET``) to leave the current value unchanged.
621 updated_by: Email of user making the update
622 skip_limits: Skip the max_members_per_team cap check (platform admins only)
624 Returns:
625 bool: True if update succeeded, False otherwise
627 Raises:
628 ValueError: If visibility setting is invalid
630 Examples:
631 >>> import asyncio
632 >>> from unittest.mock import Mock
633 >>> service = TeamManagementService(Mock())
634 >>> asyncio.iscoroutinefunction(service.update_team)
635 True
636 """
637 try:
638 team = await self.get_team_by_id(team_id)
639 if not team:
640 logger.warning(f"Team {SecurityValidator.sanitize_log_message(team_id)} not found for update")
641 return False
643 # Prevent updating personal teams
644 if team.is_personal:
645 logger.warning(f"Cannot update personal team {SecurityValidator.sanitize_log_message(team_id)}")
646 return False
648 # Update fields if provided
649 if name is not None:
650 team.name = name
651 # Slug will be updated by event listener if name changes
653 if description is not None:
654 team.description = description
656 if visibility is not None:
657 valid_visibilities = ["private", "public"]
658 if visibility not in valid_visibilities:
659 raise ValueError(f"Invalid visibility. Must be one of: {', '.join(valid_visibilities)}")
660 team.visibility = visibility
662 # UNSET means "not provided" — leave unchanged.
663 # None means "explicitly clear the per-team override" — store NULL.
664 # An int means "set an explicit per-team limit".
665 if max_members is not UNSET:
666 if max_members is not None and not skip_limits:
667 max_limit = settings.max_members_per_team
668 if max_members > max_limit:
669 raise ValueError(f"max_members cannot exceed the configured limit of {max_limit}")
670 team.max_members = max_members
672 team.updated_at = utc_now()
673 self.db.commit()
675 logger.info(f"Updated team {SecurityValidator.sanitize_log_message(team_id)} by {updated_by}")
676 return True
678 except ValueError:
679 raise # Let ValueError propagate to caller for proper error handling
680 except Exception as e:
681 self.db.rollback()
682 logger.error(f"Failed to update team {SecurityValidator.sanitize_log_message(team_id)}: {e}")
683 return False
685 async def delete_team(self, team_id: str, deleted_by: str) -> bool:
686 """Delete a team (soft delete).
688 Args:
689 team_id: ID of the team to delete
690 deleted_by: Email of user performing deletion
692 Returns:
693 bool: True if deletion succeeded, False otherwise
695 Raises:
696 ValueError: If attempting to delete a personal team
698 Examples:
699 >>> import asyncio
700 >>> from unittest.mock import Mock
701 >>> service = TeamManagementService(Mock())
702 >>> asyncio.iscoroutinefunction(service.delete_team)
703 True
704 """
705 try:
706 team = await self.get_team_by_id(team_id)
707 if not team:
708 logger.warning(f"Team {SecurityValidator.sanitize_log_message(team_id)} not found for deletion")
709 return False
711 # Prevent deleting personal teams
712 if team.is_personal:
713 logger.warning(f"Cannot delete personal team {SecurityValidator.sanitize_log_message(team_id)}")
714 raise ValueError("Personal teams cannot be deleted")
716 # Soft delete the team
717 team.is_active = False
718 team.updated_at = utc_now()
720 # Get all active memberships before deactivating (for history logging)
721 memberships = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)).all()
723 # Log history for each membership (before bulk update)
724 for membership in memberships:
725 self._log_team_member_action(membership.id, team_id, membership.user_email, membership.role, "team-deleted", deleted_by)
727 # Bulk update: deactivate all memberships in single query instead of looping
728 self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)).update({EmailTeamMember.is_active: False}, synchronize_session=False)
730 self.db.commit()
732 # Invalidate all role caches for this team
733 try:
734 self._fire_and_forget(auth_cache.invalidate_team_roles(team_id))
735 self._fire_and_forget(admin_stats_cache.invalidate_teams())
736 # Also invalidate team cache, teams list cache, and team membership cache for each member
737 for membership in memberships:
738 self._fire_and_forget(auth_cache.invalidate_team(membership.user_email))
739 self._fire_and_forget(auth_cache.invalidate_user_teams(membership.user_email))
740 self._fire_and_forget(auth_cache.invalidate_team_membership(membership.user_email))
741 except Exception as cache_error:
742 logger.debug(f"Failed to invalidate caches on team delete: {cache_error}")
744 logger.info(f"Deleted team {SecurityValidator.sanitize_log_message(team_id)} by {deleted_by}")
745 return True
747 except Exception as e:
748 self.db.rollback()
749 logger.error(f"Failed to delete team {SecurityValidator.sanitize_log_message(team_id)}: {e}")
750 return False
752 async def add_member_to_team(self, team_id: str, user_email: str, role: str = "member", invited_by: Optional[str] = None, grant_source: Optional[str] = None) -> EmailTeamMember:
753 """Add a member to a team.
755 Args:
756 team_id: ID of the team
757 user_email: Email of the user to add
758 role: Role to assign (owner, member)
759 invited_by: Email of user who added this member
760 grant_source: Origin of grant (e.g., 'sso', 'manual', 'bootstrap', 'auto')
762 Returns:
763 EmailTeamMember: The created or reactivated team member object
765 Raises:
766 InvalidRoleError: If role is invalid
767 TeamNotFoundError: If team does not exist
768 TeamManagementError: If team is a personal team
769 UserNotFoundError: If user does not exist
770 MemberAlreadyExistsError: If user is already a member
771 TeamMemberLimitExceededError: If team has reached maximum member limit
772 TeamMemberAddError: If adding member fails due to database or system errors
774 Examples:
775 >>> import asyncio
776 >>> from unittest.mock import Mock
777 >>> service = TeamManagementService(Mock())
778 >>> asyncio.iscoroutinefunction(service.add_member_to_team)
779 True
780 >>> # After adding, EmailTeamMemberHistory is updated
781 >>> # service._log_team_member_action("tm-123", "team-123", "user@example.com", "member", "added", "admin@example.com")
782 """
783 # Validate role
784 valid_roles = ["owner", "member"]
785 if role not in valid_roles:
786 raise InvalidRoleError(f"Invalid role '{role}'. Must be one of: {', '.join(valid_roles)}")
788 # Check if team exists
789 team = await self.get_team_by_id(team_id)
790 if not team:
791 logger.warning(f"Team {SecurityValidator.sanitize_log_message(team_id)} not found")
792 raise TeamNotFoundError("Team not found")
794 # Prevent adding members to personal teams
795 if team.is_personal:
796 logger.warning(f"Cannot add members to personal team {SecurityValidator.sanitize_log_message(team_id)}")
797 raise TeamManagementError("Cannot add members to personal teams")
799 # Check if user exists
800 user = self.db.query(EmailUser).filter(EmailUser.email == user_email).first()
801 if not user:
802 logger.warning(f"User {SecurityValidator.sanitize_log_message(user_email)} not found")
803 raise UserNotFoundError("User not found")
805 self._check_user_team_limit(user_email)
807 # Check if user is already a member
808 existing_membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email).first()
810 if existing_membership and existing_membership.is_active:
811 logger.warning(f"User {SecurityValidator.sanitize_log_message(user_email)} is already a member of team {SecurityValidator.sanitize_log_message(team_id)}")
812 raise MemberAlreadyExistsError("User is already a member of this team")
814 # Check team member limit (explicit per-team value or global default)
815 check_team_member_capacity(self.db, team)
817 # Add or reactivate membership
818 try:
819 if existing_membership:
820 existing_membership.is_active = True
821 existing_membership.role = role
822 existing_membership.joined_at = utc_now()
823 existing_membership.invited_by = invited_by
824 if grant_source is not None:
825 existing_membership.grant_source = grant_source
826 self.db.commit()
827 self._log_team_member_action(existing_membership.id, team_id, user_email, role, "reactivated", invited_by)
828 member = existing_membership
829 else:
830 membership = EmailTeamMember(team_id=team_id, user_email=user_email, role=role, joined_at=utc_now(), invited_by=invited_by, grant_source=grant_source, is_active=True)
831 self.db.add(membership)
832 self.db.commit()
833 self._log_team_member_action(membership.id, team_id, user_email, role, "added", invited_by)
834 member = membership
836 await self._assign_team_rbac_role(user_email, team_id, role, granted_by=invited_by)
837 self._invalidate_membership_caches(user_email, team_id)
838 await self.invalidate_team_member_count_cache(str(team_id))
840 logger.info(f"Added {SecurityValidator.sanitize_log_message(user_email)} to team {SecurityValidator.sanitize_log_message(team_id)} with role {role}")
841 return member
843 except Exception as e:
844 self.db.rollback()
845 logger.error(f"Failed to add {SecurityValidator.sanitize_log_message(user_email)} to team {SecurityValidator.sanitize_log_message(team_id)}: {e}")
846 raise TeamMemberAddError("Failed to add member to team") from e
async def remove_member_from_team(self, team_id: str, user_email: str, removed_by: Optional[str] = None) -> bool:
    """Remove a member from a team.

    The active membership row is soft-deleted (``is_active = False``), the
    change is committed, and the removal is recorded through
    ``_log_team_member_action``. Afterwards the team-scoped RBAC roles are
    revoked on a best-effort basis and the membership caches are invalidated.

    Args:
        team_id: ID of the team
        user_email: Email of the user to remove
        removed_by: Email of user performing the removal

    Returns:
        bool: True if member was removed successfully. False when the team is
        not found, the team is a personal team, the user has no active
        membership, the user is the last owner, or any other error occurs.

    Note:
        The last-owner guard raises ``ValueError`` inside the ``try`` block, but
        the enclosing ``except Exception`` handler catches it, rolls back and
        returns ``False``. Callers therefore never see the ``ValueError``.

    Examples:
        Team membership management with role-based access control.
        After removal, EmailTeamMemberHistory is updated via _log_team_member_action.
    """
    try:
        team = await self.get_team_by_id(team_id)
        if not team:
            logger.warning(f"Team {SecurityValidator.sanitize_log_message(team_id)} not found")
            return False

        # Prevent removing members from personal teams
        if team.is_personal:
            logger.warning(f"Cannot remove members from personal team {SecurityValidator.sanitize_log_message(team_id)}")
            return False

        # Find the membership (only active rows count as memberships)
        membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).first()

        if not membership:
            logger.warning(f"User {SecurityValidator.sanitize_log_message(user_email)} is not a member of team {SecurityValidator.sanitize_log_message(team_id)}")
            return False

        # Prevent removing the last owner
        if membership.role == "owner":
            owner_count = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.role == "owner", EmailTeamMember.is_active.is_(True)).count()

            if owner_count <= 1:
                logger.warning(f"Cannot remove the last owner from team {SecurityValidator.sanitize_log_message(team_id)}")
                # Caught by the outer ``except Exception`` below and converted into ``return False``
                raise ValueError("Cannot remove the last owner from a team")

        # Remove membership (soft delete)
        membership.is_active = False
        self.db.commit()
        self._log_team_member_action(membership.id, team_id, user_email, membership.role, "removed", removed_by)

        # Revoke all team-scoped RBAC roles from removed member defensively
        # (revoke both owner and member roles to handle edge cases)
        try:
            for role_name in (settings.default_team_owner_role, settings.default_team_member_role):
                rbac_role = await self.role_service.get_role_by_name(role_name, scope="team")
                if rbac_role:
                    revoked = await self.role_service.revoke_role_from_user(user_email=user_email, role_id=rbac_role.id, scope="team", scope_id=team_id)
                    if revoked:
                        logger.info(f"Revoked {role_name} role from {SecurityValidator.sanitize_log_message(user_email)} for team {SecurityValidator.sanitize_log_message(team_id)}")
        except Exception as role_error:
            # Best effort: the membership removal is already committed, so only warn
            logger.warning(f"Failed to revoke roles from {SecurityValidator.sanitize_log_message(user_email)}: {role_error}")

        # Membership caches are refreshed without touching admin stats, then the member-count cache
        self._invalidate_membership_caches(user_email, team_id, include_admin_stats=False)
        await self.invalidate_team_member_count_cache(str(team_id))

        logger.info(f"Removed {SecurityValidator.sanitize_log_message(user_email)} from team {SecurityValidator.sanitize_log_message(team_id)} by {removed_by}")
        return True

    except Exception as e:
        # Also handles the last-owner ValueError raised above
        self.db.rollback()
        logger.error(f"Failed to remove {SecurityValidator.sanitize_log_message(user_email)} from team {SecurityValidator.sanitize_log_message(team_id)}: {e}")
        return False
async def update_member_role(self, team_id: str, user_email: str, new_role: str, updated_by: Optional[str] = None) -> bool:
    """Update a team member's role.

    The membership role is changed and committed first; the matching
    team-scoped RBAC role assignment is then swapped on a best-effort basis,
    and the cached role for the user/team pair is invalidated.

    Args:
        team_id: ID of the team
        user_email: Email of the user whose role to update
        new_role: New role to assign ("owner" or "member")
        updated_by: Email of user making the change

    Returns:
        bool: True if role was updated successfully. False when the team is not
        found, the team is a personal team, the user has no active membership,
        or an unexpected (non-ValueError) error occurs.

    Raises:
        ValueError: If role is invalid or removing last owner role

    Examples:
        Role management within teams for access control.
        After role update, EmailTeamMemberHistory is updated via _log_team_member_action.
    """
    try:
        # Validate role
        valid_roles = ["owner", "member"]
        if new_role not in valid_roles:
            raise ValueError(f"Invalid role. Must be one of: {', '.join(valid_roles)}")

        team = await self.get_team_by_id(team_id)
        if not team:
            logger.warning(f"Team {SecurityValidator.sanitize_log_message(team_id)} not found")
            return False

        # Prevent updating roles in personal teams
        if team.is_personal:
            logger.warning(f"Cannot update roles in personal team {SecurityValidator.sanitize_log_message(team_id)}")
            return False

        # Find the membership (only active rows count as memberships)
        membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).first()

        if not membership:
            logger.warning(f"User {SecurityValidator.sanitize_log_message(user_email)} is not a member of team {SecurityValidator.sanitize_log_message(team_id)}")
            return False

        # Prevent changing the role of the last owner to non-owner
        if membership.role == "owner" and new_role != "owner":
            owner_count = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.role == "owner", EmailTeamMember.is_active.is_(True)).count()

            if owner_count <= 1:
                logger.warning(f"Cannot remove owner role from the last owner of team {SecurityValidator.sanitize_log_message(team_id)}")
                raise ValueError("Cannot remove owner role from the last owner of a team")

        # Update the role; old_role is kept to decide which RBAC transition applies below
        old_role = membership.role
        membership.role = new_role
        self.db.commit()
        self._log_team_member_action(membership.id, team_id, user_email, new_role, "role_changed", updated_by)

        # Handle RBAC role changes when team membership role changes
        if old_role != new_role:
            try:
                # Get both role types
                team_member_role = await self.role_service.get_role_by_name(settings.default_team_member_role, scope="team")
                team_owner_role = await self.role_service.get_role_by_name(settings.default_team_owner_role, scope="team")

                # Handle role transitions
                if old_role == "member" and new_role == "owner":
                    # member -> owner: revoke member role, assign owner role
                    if team_member_role:
                        await self.role_service.revoke_role_from_user(user_email=user_email, role_id=team_member_role.id, scope="team", scope_id=team_id)
                    if team_owner_role:
                        # granted_by falls back to the affected user when no actor was supplied
                        await self.role_service.assign_role_to_user(user_email=user_email, role_id=team_owner_role.id, scope="team", scope_id=team_id, granted_by=updated_by or user_email)
                    logger.info(
                        f"Transitioned RBAC role from {settings.default_team_member_role} to {settings.default_team_owner_role} for {SecurityValidator.sanitize_log_message(user_email)} in team {SecurityValidator.sanitize_log_message(team_id)}"
                    )

                elif old_role == "owner" and new_role == "member":
                    # owner -> member: revoke owner role, assign member role
                    if team_owner_role:
                        await self.role_service.revoke_role_from_user(user_email=user_email, role_id=team_owner_role.id, scope="team", scope_id=team_id)
                    if team_member_role:
                        await self.role_service.assign_role_to_user(user_email=user_email, role_id=team_member_role.id, scope="team", scope_id=team_id, granted_by=updated_by or user_email)
                    logger.info(
                        f"Transitioned RBAC role from {settings.default_team_owner_role} to {settings.default_team_member_role} for {SecurityValidator.sanitize_log_message(user_email)} in team {SecurityValidator.sanitize_log_message(team_id)}"
                    )

            except Exception as role_error:
                logger.warning(f"Failed to update RBAC roles for {SecurityValidator.sanitize_log_message(user_email)} in team {SecurityValidator.sanitize_log_message(team_id)}: {role_error}")
                # Don't fail the membership role update if RBAC role update fails

        # Invalidate role cache (scheduled via _fire_and_forget; failures are only logged at debug level)
        try:
            self._fire_and_forget(auth_cache.invalidate_user_role(user_email, team_id))
        except Exception as cache_error:
            logger.debug(f"Failed to invalidate cache on role update: {cache_error}")

        logger.info(f"Updated role of {SecurityValidator.sanitize_log_message(user_email)} in team {SecurityValidator.sanitize_log_message(team_id)} to {new_role} by {updated_by}")
        return True

    except ValueError:
        raise  # Let ValueError propagate to caller for proper error handling
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to update role of {SecurityValidator.sanitize_log_message(user_email)} in team {SecurityValidator.sanitize_log_message(team_id)}: {e}")
        return False
async def get_member(self, team_id: str, user_email: str) -> Optional[EmailTeamMember]:
    """Get a single team member by team ID and user email.

    Only active memberships are considered.

    Args:
        team_id: ID of the team
        user_email: Email of the user

    Returns:
        EmailTeamMember if found and active, None otherwise (including when
        the lookup fails; the failure is logged).
    """
    try:
        return self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).first()
    except Exception as e:
        # Roll back so a failed query does not leave the shared session in a
        # failed transaction (matches the other read helpers in this service).
        self.db.rollback()
        logger.error(f"Failed to get member {SecurityValidator.sanitize_log_message(user_email)} in team {SecurityValidator.sanitize_log_message(team_id)}: {e}")
        return None
async def get_user_teams(self, user_email: str, include_personal: bool = True) -> List[EmailTeam]:
    """Return every active team the user is an active member of.

    Team IDs are cached per ``(user_email, include_personal)`` pair; on a
    cache hit only the team rows are re-read by primary key. If that re-read
    fails, the method falls back to the full membership join.

    Args:
        user_email: Email of the user
        include_personal: Whether to include personal teams

    Returns:
        List[EmailTeam]: Teams the user belongs to; an empty list when the
        user has none or when the full query fails.

    Examples:
        User dashboard showing team memberships.
    """
    teams_cache = self._get_auth_cache()
    teams_cache_key = f"{user_email}:{include_personal}"

    if teams_cache:
        known_ids = await teams_cache.get_user_teams(teams_cache_key)
        # A cached empty list means "user has no teams"; None means cache miss
        if known_ids is not None and not known_ids:
            return []
        if known_ids:
            try:
                # Indexed lookup by primary key for the cached IDs
                cached_teams = self.db.query(EmailTeam).filter(EmailTeam.id.in_(known_ids), EmailTeam.is_active.is_(True)).all()
                self.db.commit()  # Release transaction to avoid idle-in-transaction
                return cached_teams
            except Exception as e:
                self.db.rollback()
                logger.warning(f"Failed to fetch teams by IDs from cache: {e}")
                # Continue with the full query below

    # Cache miss, cache disabled, or cached lookup failed
    try:
        criteria = [
            EmailTeamMember.user_email == user_email,
            EmailTeamMember.is_active.is_(True),
            EmailTeam.is_active.is_(True),
        ]
        if not include_personal:
            criteria.append(EmailTeam.is_personal.is_(False))

        member_teams = self.db.query(EmailTeam).join(EmailTeamMember).filter(*criteria).all()
        self.db.commit()  # Release transaction to avoid idle-in-transaction

        if teams_cache:
            await teams_cache.set_user_teams(teams_cache_key, [team.id for team in member_teams])

        return member_teams

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get teams for user {SecurityValidator.sanitize_log_message(user_email)}: {e}")
        return []
async def verify_team_for_user(self, user_email, team_id=None):
    """Resolve the team a user is acting on and verify membership.

    All active teams the user actively belongs to are loaded in one query.
    Without a ``team_id`` the user's personal team is looked up; with a
    ``team_id`` the user's membership of that team is checked.

    Args:
        user_email (str): The email of the user whose teams are being queried.
        team_id (str or None, optional): Specific team ID to check for membership. Defaults to None.

    Returns:
        str or None or list:
            - If `team_id` is falsy, the ID of the user's personal team, or None if there is none.
            - If `team_id` is provided and the user is a member of that team, `team_id`.
            - If `team_id` is provided but the user is not a member, an empty list ``[]``.
            - ``[]`` if the team lookup fails, or if verification of a provided
              `team_id` cannot be completed (the unverified ID is never returned).
            - None if an error occurs while resolving the personal team.

    Raises:
        None explicitly; exceptions during the process are caught and logged.

    Examples:
        Verifies user team if team_id provided otherwise finds its personal id.
    """
    # Get all teams the user belongs to in a single query
    try:
        query = self.db.query(EmailTeam).join(EmailTeamMember).filter(EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True), EmailTeam.is_active.is_(True))
        user_teams = query.all()
        self.db.commit()  # Release transaction to avoid idle-in-transaction
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get teams for user {SecurityValidator.sanitize_log_message(user_email)}: {e}")
        return []

    try:
        if not team_id:
            # No team requested: fall back to the user's personal team, if any
            personal_team = next((t for t in user_teams if getattr(t, "is_personal", False)), None)
            return personal_team.id if personal_team else None

        # A team was requested: it must be among the user's teams
        if any(team.id == team_id for team in user_teams):
            return team_id
        return []
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to verify team for user {SecurityValidator.sanitize_log_message(user_email)}: {e}")
        # Fail closed: a requested team that could not be verified is reported
        # the same way as "not a member" instead of being returned unverified.
        return [] if team_id else None
1151 @staticmethod
1152 def _escape_like(value: str) -> str:
1153 """Escape LIKE wildcards for prefix search.
1155 Args:
1156 value: Raw value to escape for LIKE matching.
1158 Returns:
1159 Escaped string safe for LIKE patterns.
1160 """
1161 return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
async def get_team_members(
    self,
    team_id: str,
    cursor: Optional[str] = None,
    limit: Optional[int] = None,
    page: Optional[int] = None,
    per_page: Optional[int] = None,
    search: Optional[str] = None,
) -> Union[List[Tuple[EmailUser, EmailTeamMember]], Tuple[List[Tuple[EmailUser, EmailTeamMember]], Optional[str]], Dict[str, Any]]:
    """Get all members of a team with optional cursor or page-based pagination.

    Note: This method returns ORM objects and cannot be cached since callers
    depend on ORM attributes and methods.

    Args:
        team_id: ID of the team
        cursor: Opaque cursor token for cursor-based pagination
        limit: Maximum number of members to return (for cursor-based, default: 50).
            Passing ``limit`` without ``cursor`` selects cursor mode and returns the first page.
        page: Page number for page-based pagination (1-indexed). Takes precedence over cursor/limit.
        per_page: Items per page for page-based pagination (default: 30)
        search: Optional prefix search term matched against email or full name

    Returns:
        - If page is provided: Dict with keys 'data', 'pagination', 'links'
        - Else if cursor or limit is provided: Tuple (members, next_cursor)
        - If none of them: List of all members (backward compatibility)

        On failure the empty value of the same shape is returned, so callers
        can handle success and failure uniformly.

    Examples:
        Team member management and role display.
    """
    try:
        # Build optional search filter (prefix match with LIKE wildcards escaped)
        search_filter = None
        if search and search.strip():
            search_term = f"{self._escape_like(search.strip())}%"
            search_filter = or_(
                EmailUser.email.ilike(search_term, escape="\\"),
                EmailUser.full_name.ilike(search_term, escape="\\"),
            )

        if cursor is None and page is None and limit is None:
            # Backward compatibility: return tuples (no pagination requested)
            query = (
                select(EmailUser, EmailTeamMember)
                .join(EmailTeamMember, EmailUser.email == EmailTeamMember.user_email)
                .where(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True))
                .order_by(EmailUser.full_name, EmailUser.email)
            )
            if search_filter is not None:
                query = query.where(search_filter)
            result = self.db.execute(query)
            members = list(result.all())
            self.db.commit()
            return members

        # For pagination: select EmailTeamMember and eager-load user to avoid N+1
        query = (
            select(EmailTeamMember)
            .options(selectinload(EmailTeamMember.user))
            .where(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True))
            .join(EmailUser, EmailUser.email == EmailTeamMember.user_email)
        )
        if search_filter is not None:
            query = query.where(search_filter)

        # PAGE-BASED PAGINATION (Admin UI) - use unified_paginate
        if page is not None:
            # Alphabetical ordering for user-friendly display
            query = query.order_by(EmailUser.full_name, EmailUser.email)
            pag_result = await unified_paginate(
                db=self.db,
                query=query,
                page=page,
                per_page=per_page or 30,
                cursor=None,
                limit=None,
                base_url=f"/admin/teams/{team_id}/members",
                query_params={},
            )
            self.db.commit()
            memberships = pag_result["data"]
            tuples = [(m.user, m) for m in memberships]
            return {
                "data": tuples,
                "pagination": pag_result["pagination"],
                "links": pag_result["links"],
            }

        # CURSOR-BASED PAGINATION (API) - custom implementation using (joined_at, id)
        # unified_paginate uses created_at which doesn't exist on EmailTeamMember

        # Order by joined_at DESC, id DESC for keyset pagination
        query = query.order_by(desc(EmailTeamMember.joined_at), desc(EmailTeamMember.id))

        # Decode cursor and apply keyset filter
        if cursor:
            try:
                cursor_json = base64.urlsafe_b64decode(cursor.encode()).decode()
                cursor_data = orjson.loads(cursor_json)
                last_id = cursor_data.get("id")
                joined_str = cursor_data.get("joined_at")
                if last_id and joined_str:
                    last_joined = datetime.fromisoformat(joined_str)
                    # Keyset filter: (joined_at < last) OR (joined_at = last AND id < last_id)
                    query = query.where(
                        or_(
                            EmailTeamMember.joined_at < last_joined,
                            and_(EmailTeamMember.joined_at == last_joined, EmailTeamMember.id < last_id),
                        )
                    )
            except (ValueError, TypeError, AttributeError) as e:
                # AttributeError covers a decoded payload that is not a JSON object
                logger.warning(f"Invalid cursor for team members, ignoring: {e}")

        # Fetch limit + 1 to check for more results (cap at max_page_size)
        page_size = min(limit or 50, settings.pagination_max_page_size)
        query = query.limit(page_size + 1)
        memberships = list(self.db.execute(query).scalars().all())

        # Check if there are more results
        has_more = len(memberships) > page_size
        if has_more:
            memberships = memberships[:page_size]

        # Generate next cursor using (joined_at, id)
        next_cursor = None
        if has_more and memberships:
            last_member = memberships[-1]
            cursor_data = {
                "joined_at": last_member.joined_at.isoformat() if last_member.joined_at else None,
                "id": last_member.id,
            }
            next_cursor = base64.urlsafe_b64encode(orjson.dumps(cursor_data)).decode()

        self.db.commit()
        tuples = [(m.user, m) for m in memberships]
        return (tuples, next_cursor)

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get members for team {SecurityValidator.sanitize_log_message(team_id)}: {e}")

        # Return appropriate empty response based on mode (same precedence as the success path)
        if page is not None:
            return {"data": [], "pagination": {"page": page, "per_page": per_page or 30, "total": 0, "has_next": False, "has_prev": False}, "links": None}

        # Cursor mode is selected by either ``cursor`` or ``limit``; both must yield a tuple
        if cursor is not None or limit is not None:
            return ([], None)

        return []
def count_team_owners(self, team_id: str) -> int:
    """Return how many active owners a team has, counted in SQL.

    Counting in the database avoids loading every membership row into Python.

    Args:
        team_id: ID of the team

    Returns:
        int: Number of active owners in the team
    """
    active_owner_criteria = (
        EmailTeamMember.team_id == team_id,
        EmailTeamMember.role == "owner",
        EmailTeamMember.is_active.is_(True),
    )
    owner_total = self.db.query(EmailTeamMember).filter(*active_owner_criteria).count()
    self.db.commit()  # Release transaction to avoid idle-in-transaction
    return owner_total
def _get_auth_cache(self):
    """Fetch the auth cache instance on demand.

    Returns:
        AuthCache instance, or None if obtaining it raises ImportError.
    """
    try:
        cache_instance = get_auth_cache()
    except ImportError:
        return None
    return cache_instance
1341 def _get_admin_stats_cache(self):
1342 """Get admin stats cache instance lazily.
1344 Returns:
1345 AdminStatsCache instance or None if unavailable.
1346 """
1347 try:
1348 # First-Party
1349 from mcpgateway.cache.admin_stats_cache import get_admin_stats_cache # pylint: disable=import-outside-toplevel
1351 return get_admin_stats_cache()
1352 except ImportError:
1353 return None
async def get_user_role_in_team(self, user_email: str, team_id: str) -> Optional[str]:
    """Look up the role a user holds in a team.

    The auth cache is consulted first; an empty cached string stands for
    "not a member". On a cache miss the active membership row is read and the
    result (including ``None``) is written back to the cache.

    Args:
        user_email: Email of the user
        team_id: ID of the team

    Returns:
        str: User's role, or None if the user is not a member or the lookup fails

    Examples:
        Access control and permission checking.
    """
    role_cache = self._get_auth_cache()
    if role_cache:
        cached = await role_cache.get_user_role(user_email, team_id)
        if cached is not None:
            # Empty string is the cached form of "no membership"
            return cached or None

    try:
        membership_filters = (
            EmailTeamMember.user_email == user_email,
            EmailTeamMember.team_id == team_id,
            EmailTeamMember.is_active.is_(True),
        )
        active_membership = self.db.query(EmailTeamMember).filter(*membership_filters).first()
        self.db.commit()  # Release transaction to avoid idle-in-transaction

        if active_membership:
            resolved_role = active_membership.role
        else:
            resolved_role = None

        if role_cache:
            await role_cache.set_user_role(user_email, team_id, resolved_role)

        return resolved_role

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get role for {SecurityValidator.sanitize_log_message(user_email)} in team {SecurityValidator.sanitize_log_message(team_id)}: {e}")
        return None
1395 @staticmethod
1396 def _apply_team_list_filters(
1397 query: Any,
1398 *,
1399 include_personal: bool = False,
1400 include_inactive: bool = False,
1401 visibility_filter: Optional[str] = None,
1402 search_query: Optional[str] = None,
1403 personal_owner_email: Optional[str] = None,
1404 search_description: bool = False,
1405 ) -> Any:
1406 """Apply common filter predicates to a team list query.
1408 Args:
1409 query: SQLAlchemy select statement to filter.
1410 include_personal: Whether to include personal teams.
1411 include_inactive: Whether to include inactive teams.
1412 visibility_filter: Filter by visibility (private, public).
1413 search_query: Optional search term for name/slug (and description if *search_description*).
1414 personal_owner_email: When set (and *include_personal* is False), include this user's personal team.
1415 search_description: Whether to include ``EmailTeam.description`` in the search predicate.
1417 Returns:
1418 The filtered query.
1419 """
1420 if not include_personal:
1421 if personal_owner_email:
1422 query = query.where(
1423 or_(
1424 EmailTeam.is_personal.is_(False),
1425 and_(EmailTeam.is_personal.is_(True), EmailTeam.created_by == personal_owner_email),
1426 )
1427 )
1428 else:
1429 query = query.where(EmailTeam.is_personal.is_(False))
1431 if not include_inactive:
1432 query = query.where(EmailTeam.is_active.is_(True))
1434 if visibility_filter:
1435 query = query.where(EmailTeam.visibility == visibility_filter)
1437 if search_query:
1438 search_term = f"%{search_query}%"
1439 predicates = [EmailTeam.name.ilike(search_term), EmailTeam.slug.ilike(search_term)]
1440 if search_description:
1441 predicates.append(EmailTeam.description.ilike(search_term))
1442 query = query.where(or_(*predicates))
1444 return query
async def list_teams(
    self,
    # Unified pagination params
    limit: int = 100,
    offset: int = 0,
    cursor: Optional[str] = None,
    page: Optional[int] = None,
    per_page: int = 50,
    include_inactive: bool = False,
    visibility_filter: Optional[str] = None,
    base_url: Optional[str] = None,
    include_personal: bool = False,
    search_query: Optional[str] = None,
    personal_owner_email: Optional[str] = None,
) -> Union[Tuple[List[EmailTeam], Optional[str]], Dict[str, Any]]:
    """Return a filtered, paginated listing of teams.

    Filtering is delegated to ``_apply_team_list_filters`` (with description
    search enabled) and pagination to ``unified_paginate``.

    Args:
        limit: Max items for cursor pagination
        offset: Offset for legacy/cursor pagination
        cursor: Cursor token
        page: Page number (1-indexed)
        per_page: Items per page
        include_inactive: Whether to include inactive teams
        visibility_filter: Filter by visibility (private, team, public)
        base_url: Base URL for pagination links
        include_personal: Whether to include personal teams
        search_query: Search term for name/slug/description
        personal_owner_email: When set (and include_personal=False), includes this user's personal team alongside non-personal teams

    Returns:
        Union[Tuple[List[EmailTeam], Optional[str]], Dict[str, Any]]:
            - Tuple (teams, next_cursor) if cursor/offset based
            - Dict {data, pagination, links} if page based
    """
    stmt = self._apply_team_list_filters(
        select(EmailTeam),
        search_description=True,
        search_query=search_query,
        visibility_filter=visibility_filter,
        personal_owner_email=personal_owner_email,
        include_inactive=include_inactive,
        include_personal=include_personal,
    )

    # Ordering depends on the pagination mode:
    # cursor/offset mode uses created_at DESC, id DESC to match unified_paginate expectations;
    # page mode uses alphabetical order for display.
    if page is None:
        stmt = stmt.order_by(desc(EmailTeam.created_at), desc(EmailTeam.id))
    else:
        stmt = stmt.order_by(EmailTeam.name, EmailTeam.id)

    # Pagination links default to the admin partial endpoint
    links_base = base_url or f"{settings.app_root_path}/admin/teams/partial"

    # Legacy offset pagination applies only when neither page nor cursor is in use
    legacy_offset_requested = offset > 0 and not (page or cursor)
    if legacy_offset_requested:
        stmt = stmt.offset(offset)

    paginated = await unified_paginate(
        db=self.db,
        query=stmt,
        cursor=cursor,
        limit=limit,
        page=page,
        per_page=per_page,
        base_url=links_base,
    )
    self.db.commit()  # Release transaction to avoid idle-in-transaction
    return paginated
async def get_all_team_ids(
    self,
    include_inactive: bool = False,
    visibility_filter: Optional[str] = None,
    include_personal: bool = False,
    search_query: Optional[str] = None,
    personal_owner_email: Optional[str] = None,
) -> List[str]:
    """Get all team IDs matching criteria (unpaginated).

    Uses the same filters as ``list_teams`` except that the search term is
    matched against name/slug only (description search is not enabled here).

    Args:
        include_inactive: Whether to include inactive teams
        visibility_filter: Filter by visibility (private, team, public)
        include_personal: Whether to include personal teams
        search_query: Search term for name/slug
        personal_owner_email: When set (and include_personal=False), includes this user's personal team alongside non-personal teams

    Returns:
        List[str]: List of team IDs (team IDs are handled as strings throughout this service)
    """
    query = self._apply_team_list_filters(
        select(EmailTeam.id),
        include_personal=include_personal,
        include_inactive=include_inactive,
        visibility_filter=visibility_filter,
        search_query=search_query,
        personal_owner_email=personal_owner_email,
    )

    result = self.db.execute(query)
    # Each row is a one-element tuple holding the selected id column
    team_ids = [row[0] for row in result.all()]
    self.db.commit()  # Release transaction to avoid idle-in-transaction
    return team_ids
async def get_teams_count(
    self,
    include_inactive: bool = False,
    visibility_filter: Optional[str] = None,
    include_personal: bool = False,
    search_query: Optional[str] = None,
    personal_owner_email: Optional[str] = None,
) -> int:
    """Count the teams that match the given criteria.

    Args:
        include_inactive: Whether to include inactive teams
        visibility_filter: Filter by visibility (private, team, public)
        include_personal: Whether to include personal teams
        search_query: Search term for name/slug
        personal_owner_email: When set (and include_personal=False), includes this user's personal team in the count

    Returns:
        int: Total count of matching teams (0 when the query yields no value)
    """
    filter_options = {
        "include_personal": include_personal,
        "include_inactive": include_inactive,
        "visibility_filter": visibility_filter,
        "search_query": search_query,
        "personal_owner_email": personal_owner_email,
    }
    count_stmt = self._apply_team_list_filters(
        select(func.count(EmailTeam.id)),  # pylint: disable=not-callable
        **filter_options,
    )

    matching_total = self.db.execute(count_stmt).scalar() or 0
    self.db.commit()  # Release transaction to avoid idle-in-transaction
    return matching_total
async def discover_public_teams(self, user_email: str, skip: int = 0, limit: Optional[int] = None) -> List[EmailTeam]:
    """Discover public teams that user can join.

    Returns active, non-personal, public teams the user is not already an
    active member of. Results are ordered by name then ID so that
    ``skip``/``limit`` pagination is stable across calls.

    Args:
        user_email: Email of the user discovering teams
        skip: Number of teams to skip for pagination
        limit: Maximum number of teams to return (None for unlimited)

    Returns:
        List[EmailTeam]: List of public teams user can join; an empty list if
        discovery fails (the error is logged, not raised).
    """
    try:
        # Optimized: Use subquery instead of loading all IDs into memory (2 queries → 1)
        user_team_subquery = select(EmailTeamMember.team_id).where(EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).scalar_subquery()

        query = self.db.query(EmailTeam).filter(EmailTeam.visibility == "public", EmailTeam.is_active.is_(True), EmailTeam.is_personal.is_(False), ~EmailTeam.id.in_(user_team_subquery))

        # OFFSET/LIMIT without ORDER BY gives no guaranteed row order, so pages
        # could overlap or skip teams; impose a deterministic order first.
        query = query.order_by(EmailTeam.name, EmailTeam.id)

        query = query.offset(skip)
        if limit is not None:
            query = query.limit(limit)
        teams = query.all()
        self.db.commit()  # Release transaction to avoid idle-in-transaction
        return teams

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to discover public teams for {SecurityValidator.sanitize_log_message(user_email)}: {e}")
        return []
async def create_join_request(self, team_id: str, user_email: str, message: Optional[str] = None) -> "EmailTeamJoinRequest":
    """Submit a request for a user to join a public team.

    A previous non-pending (or expired) request for the same team and user is
    reused and reset to ``pending``; otherwise a new request is created. In
    both cases the request expires seven days from now.

    Args:
        team_id: ID of the team to join
        user_email: Email of the user requesting to join
        message: Optional message to team owners

    Returns:
        EmailTeamJoinRequest: Created join request

    Raises:
        ValueError: If team not found, not public, or user already member/has pending request
    """
    try:
        # The target team must exist and be public
        target_team = await self.get_team_by_id(team_id)
        if not target_team:
            raise ValueError("Team not found")
        if target_team.visibility != "public":
            raise ValueError("Can only request to join public teams")

        # Active members cannot request to join again
        active_membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).first()
        if active_membership:
            raise ValueError("User is already a member of this team")

        # Enforce the per-user team cap
        max_teams = getattr(settings, "max_teams_per_user", 50)
        if self._get_user_team_count(user_email) >= max_teams:
            raise ValueError(f"User has reached the maximum team limit of {max_teams}")

        # Any earlier request for this team/user, regardless of status
        prior_request = self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.team_id == team_id, EmailTeamJoinRequest.user_email == user_email).first()

        if not prior_request:
            pending_request = EmailTeamJoinRequest(team_id=team_id, user_email=user_email, message=message, expires_at=utc_now() + timedelta(days=7))
            self.db.add(pending_request)
        else:
            if prior_request.status == "pending" and not prior_request.is_expired():
                raise ValueError("User already has a pending join request for this team")

            # Recycle the cancelled/rejected/expired request as a fresh pending one
            prior_request.message = message or ""
            prior_request.status = "pending"
            prior_request.requested_at = utc_now()
            prior_request.expires_at = utc_now() + timedelta(days=7)
            prior_request.reviewed_at = None
            prior_request.reviewed_by = None
            prior_request.notes = None
            pending_request = prior_request

        self.db.commit()
        self.db.refresh(pending_request)

        logger.info(f"Created join request for user {SecurityValidator.sanitize_log_message(user_email)} to team {SecurityValidator.sanitize_log_message(team_id)}")
        return pending_request

    except Exception as e:
        self.db.rollback()
        # Validation failures (ValueError) are re-raised without an error log
        if not isinstance(e, ValueError):
            logger.error(f"Failed to create join request: {e}")
        raise
async def list_join_requests(self, team_id: str) -> List["EmailTeamJoinRequest"]:
    """List pending join requests for a team, newest first.

    This is a best-effort read: database errors are logged and an empty
    list is returned instead of propagating to the caller.

    Args:
        team_id: ID of the team

    Returns:
        List[EmailTeamJoinRequest]: Requests with status "pending", ordered by
        ``requested_at`` descending; an empty list if the query fails
    """
    try:
        return (
            self.db.query(EmailTeamJoinRequest)
            .filter(EmailTeamJoinRequest.team_id == team_id, EmailTeamJoinRequest.status == "pending")
            .order_by(EmailTeamJoinRequest.requested_at.desc())
            .all()
        )
    except Exception as e:
        # The error is swallowed, so the caller keeps using this session.
        # Roll back first so it is not left inside a failed transaction,
        # matching the other error handlers in this service.
        self.db.rollback()
        logger.error(f"Failed to list join requests for team {SecurityValidator.sanitize_log_message(team_id)}: {e}")
        return []
async def approve_join_request(self, request_id: str, approved_by: str) -> Optional[EmailTeamMember]:
    """Approve a pending team join request and add the requester to the team.

    Args:
        request_id: ID of the join request
        approved_by: Email of the user approving the request

    Returns:
        EmailTeamMember: The newly created membership (always role "member").
        A missing request is reported by raising ``ValueError``, not by
        returning ``None``.

    Raises:
        ValueError: If the request is not found or already processed, has
            expired, the user has reached the maximum team limit, or the
            team cannot be found
        Exception: Any other failure is logged and re-raised after rollback
    """
    try:
        # Only requests that are still pending can be approved
        join_request = self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.id == request_id, EmailTeamJoinRequest.status == "pending").first()

        if not join_request:
            raise ValueError("Join request not found or already processed")

        if join_request.is_expired():
            # Persist the "expired" status before raising; the rollback in the
            # ValueError handler below runs after this commit.
            join_request.status = "expired"
            self.db.commit()
            raise ValueError("Join request has expired")

        # Check max teams per user
        max_teams = getattr(settings, "max_teams_per_user", 50)
        if self._get_user_team_count(join_request.user_email) >= max_teams:
            raise ValueError(f"User has reached the maximum team limit of {max_teams}")

        # Check team member limit
        team = await self.get_team_by_id(join_request.team_id)
        if not team:
            raise ValueError(f"Team {join_request.team_id} not found or inactive")
        # NOTE(review): presumably raises when the team is full - confirm against the helper
        check_team_member_capacity(self.db, team)

        # Add user to team; new joiners are always plain members
        member = EmailTeamMember(team_id=join_request.team_id, user_email=join_request.user_email, role="member", invited_by=approved_by, joined_at=utc_now())

        self.db.add(member)
        # Update join request status
        join_request.status = "approved"
        join_request.reviewed_at = utc_now()
        join_request.reviewed_by = approved_by

        # Flush before logging the action, which is passed member.id
        self.db.flush()
        self._log_team_member_action(member.id, join_request.team_id, join_request.user_email, member.role, "added", approved_by)

        self.db.refresh(member)

        await self._assign_team_rbac_role(join_request.user_email, join_request.team_id, member.role, granted_by=approved_by)
        self._invalidate_membership_caches(join_request.user_email, join_request.team_id)
        await self.invalidate_team_member_count_cache(str(join_request.team_id))

        # Sanitize interpolated values, as other log lines in this service do
        logger.info(
            f"Approved join request {SecurityValidator.sanitize_log_message(str(request_id))}: "
            f"user {SecurityValidator.sanitize_log_message(str(join_request.user_email))} "
            f"joined team {SecurityValidator.sanitize_log_message(str(join_request.team_id))}"
        )
        return member

    except ValueError:
        # Expected validation failures: undo pending changes, no error log
        self.db.rollback()
        raise
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to approve join request {SecurityValidator.sanitize_log_message(str(request_id))}: {e}")
        raise
async def reject_join_request(self, request_id: str, rejected_by: str) -> bool:
    """Reject a pending team join request.

    Args:
        request_id: ID of the join request
        rejected_by: Email of the user rejecting the request

    Returns:
        bool: True if request was rejected successfully

    Raises:
        ValueError: If request not found or already processed
        Exception: Any other failure is logged and re-raised after rollback
    """
    try:
        # Only requests that are still pending can be rejected
        join_request = self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.id == request_id, EmailTeamJoinRequest.status == "pending").first()

        if not join_request:
            raise ValueError("Join request not found or already processed")

        # Update join request status
        join_request.status = "rejected"
        join_request.reviewed_at = utc_now()
        join_request.reviewed_by = rejected_by

        # Capture the log fields before committing so the success log does not
        # need to read the ORM object again afterwards.
        requester = join_request.user_email
        team_id = join_request.team_id

        self.db.commit()

        # Sanitize interpolated values, as other log lines in this service do
        logger.info(
            f"Rejected join request {SecurityValidator.sanitize_log_message(str(request_id))}: "
            f"user {SecurityValidator.sanitize_log_message(str(requester))} "
            f"for team {SecurityValidator.sanitize_log_message(str(team_id))}"
        )
        return True

    except ValueError:
        # "Not found / already processed" is an expected outcome: roll back and
        # re-raise without an error log, as approve_join_request does.
        self.db.rollback()
        raise
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to reject join request {SecurityValidator.sanitize_log_message(str(request_id))}: {e}")
        raise
async def get_user_join_requests(self, user_email: str, team_id: Optional[str] = None) -> List["EmailTeamJoinRequest"]:
    """Get join requests made by a user, in any status.

    This is a best-effort read: database errors are logged and an empty
    list is returned instead of propagating to the caller.

    Args:
        user_email: Email of the user
        team_id: Optional team ID to filter requests

    Returns:
        List[EmailTeamJoinRequest]: Join requests made by the user, restricted
        to ``team_id`` when one is given; an empty list if the query fails

    Examples:
        Get all requests made by a user or for a specific team.
    """
    try:
        query = self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.user_email == user_email)

        if team_id:
            query = query.filter(EmailTeamJoinRequest.team_id == team_id)

        return query.all()

    except Exception as e:
        # The error is swallowed, so the caller keeps using this session.
        # Roll back first so it is not left inside a failed transaction,
        # matching the other error handlers in this service.
        self.db.rollback()
        logger.error(f"Failed to get join requests for user {SecurityValidator.sanitize_log_message(user_email)}: {e}")
        return []
async def cancel_join_request(self, request_id: str, user_email: str) -> bool:
    """Cancel a user's own pending join request.

    Args:
        request_id: ID of the join request to cancel
        user_email: Email of the user canceling the request

    Returns:
        bool: True if the request was marked cancelled; False if no matching
        pending request exists for this user or the update failed

    Examples:
        Allow users to cancel their pending join requests.
    """
    try:
        # The request must exist, belong to this user and still be pending
        criteria = (
            EmailTeamJoinRequest.id == request_id,
            EmailTeamJoinRequest.user_email == user_email,
            EmailTeamJoinRequest.status == "pending",
        )
        pending = self.db.query(EmailTeamJoinRequest).filter(*criteria).first()

        if pending:
            # The canceling user is recorded as the reviewer
            pending.status = "cancelled"
            pending.reviewed_at = utc_now()
            pending.reviewed_by = user_email

            self.db.commit()

            logger.info(f"Cancelled join request {request_id} by user {SecurityValidator.sanitize_log_message(user_email)}")
            return True

        logger.warning(f"Join request {request_id} not found for user {SecurityValidator.sanitize_log_message(user_email)} or not pending")
        return False

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to cancel join request {request_id}: {e}")
        return False
1871 # ==================================================================================
1872 # Batch Query Methods (N+1 Query Elimination - Issue #1892)
1873 # ==================================================================================
def get_member_counts_batch(self, team_ids: List[str]) -> Dict[str, int]:
    """Count active members for several teams with one grouped query.

    This is a synchronous method following the existing service pattern.
    Note: Like other sync SQLAlchemy calls, this will block the event
    loop in async contexts. For typical team counts this is acceptable.

    Args:
        team_ids: List of team UUIDs

    Returns:
        Dict mapping every requested team_id to its active member count
        (0 for teams with no active members)

    Raises:
        Exception: Re-raises any database errors after rollback

    Examples:
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> service.get_member_counts_batch([])
        {}
    """
    if not team_ids:
        return {}

    try:
        # One GROUP BY query covers all requested teams
        grouped_rows = (
            self.db.query(EmailTeamMember.team_id, func.count(EmailTeamMember.id).label("count"))  # pylint: disable=not-callable
            .filter(EmailTeamMember.team_id.in_(team_ids), EmailTeamMember.is_active.is_(True))
            .group_by(EmailTeamMember.team_id)
            .all()
        )

        self.db.commit()  # Release transaction to avoid idle-in-transaction

        per_team: Dict[str, int] = {}
        for row in grouped_rows:
            per_team[str(row.team_id)] = row.count

        # Teams absent from the grouped result have no active members
        return {tid: per_team.get(tid, 0) for tid in team_ids}
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get member counts for teams: {e}")
        raise
def get_user_roles_batch(self, user_email: str, team_ids: List[str]) -> Dict[str, Optional[str]]:
    """Look up one user's role in several teams with a single query.

    Args:
        user_email: Email of the user
        team_ids: List of team UUIDs

    Returns:
        Dict mapping every requested team_id to the user's role there, or
        None where the user has no active membership

    Raises:
        Exception: Re-raises any database errors after rollback
    """
    if not team_ids:
        return {}

    try:
        # Active memberships of this user, restricted to the requested teams
        membership_query = self.db.query(EmailTeamMember.team_id, EmailTeamMember.role).filter(
            EmailTeamMember.user_email == user_email,
            EmailTeamMember.team_id.in_(team_ids),
            EmailTeamMember.is_active.is_(True),
        )
        rows = membership_query.all()

        self.db.commit()  # Release transaction to avoid idle-in-transaction

        role_by_team: Dict[str, Optional[str]] = {}
        for row in rows:
            role_by_team[str(row.team_id)] = row.role

        # Teams without a matching row map to None
        return {tid: role_by_team.get(tid) for tid in team_ids}
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get user roles for {SecurityValidator.sanitize_log_message(user_email)}: {e}")
        raise
def get_pending_join_requests_batch(self, user_email: str, team_ids: List[str]) -> Dict[str, Optional[Any]]:
    """Get pending join requests for a user across multiple teams in a single query.

    Args:
        user_email: Email of the user
        team_ids: List of team UUIDs to check

    Returns:
        Dict mapping every requested team_id to the user's pending
        EmailTeamJoinRequest for that team (or None if there is none)

    Raises:
        Exception: Re-raises any database errors after rollback
    """
    if not team_ids:
        return {}

    try:
        # Single query for all pending requests across teams
        results = (
            self.db.query(EmailTeamJoinRequest)
            .filter(EmailTeamJoinRequest.user_email == user_email, EmailTeamJoinRequest.team_id.in_(team_ids), EmailTeamJoinRequest.status == "pending")
            .all()
        )

        # Index the entities BEFORE committing. If this session expires objects
        # on commit (SQLAlchemy's default), reading req.team_id afterwards would
        # refresh each row with its own SELECT and reopen the transaction that
        # the commit below is meant to release.
        # Only one pending request per team is expected; a later row would win.
        pending_reqs = {str(req.team_id): req for req in results}

        self.db.commit()  # Release transaction to avoid idle-in-transaction

        return {tid: pending_reqs.get(tid) for tid in team_ids}
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get pending join requests for {SecurityValidator.sanitize_log_message(user_email)}: {e}")
        raise
1985 # ==================================================================================
1986 # Cached Batch Methods (Redis caching for member counts)
1987 # ==================================================================================
def _get_member_count_cache_key(self, team_id: str) -> str:
    """Return the Redis key under which a team's member count is cached.

    The prefix is read from ``settings.cache_prefix`` and falls back to
    "mcpgw:" when that setting is absent.

    Args:
        team_id: Team UUID to build cache key for

    Returns:
        Cache key string in format "{prefix}team:member_count:{team_id}"
    """
    prefix = getattr(settings, "cache_prefix", "mcpgw:")
    return "{}team:member_count:{}".format(prefix, team_id)
async def get_member_counts_batch_cached(self, team_ids: List[str]) -> Dict[str, int]:
    """Get member counts for multiple teams, using Redis cache with DB fallback.

    Caching behavior is controlled by settings:
    - team_member_count_cache_enabled: Enable/disable caching (default: True)
    - team_member_count_cache_ttl: Cache TTL in seconds (default: 300)

    Redis problems never fail the call: an unavailable client or a failed
    cache read falls back to the database, and failed cache writes are only
    logged as warnings.

    Args:
        team_ids: List of team UUIDs

    Returns:
        Dict mapping team_id to member count

    Raises:
        Exception: Re-raises any database errors after rollback
    """
    if not team_ids:
        return {}

    cache_enabled = getattr(settings, "team_member_count_cache_enabled", True)
    cache_ttl = getattr(settings, "team_member_count_cache_ttl", 300)

    # If caching disabled, go straight to batch DB query
    if not cache_enabled:
        return self.get_member_counts_batch(team_ids)

    try:
        redis_client = await get_redis_client()
    except Exception:
        redis_client = None

    result: Dict[str, int] = {}
    # Until the cache proves otherwise, every team is a miss
    cache_misses: List[str] = list(team_ids)

    # Step 1: Check Redis cache for all team IDs
    if redis_client:
        try:
            cache_keys = [self._get_member_count_cache_key(tid) for tid in team_ids]
            cached_values = await redis_client.mget(cache_keys)

            cache_misses = []
            for tid, cached in zip(team_ids, cached_values):
                if cached is not None:
                    result[tid] = int(cached)
                else:
                    cache_misses.append(tid)
        except Exception as e:
            logger.warning(f"Redis cache read failed, falling back to DB: {e}")
            # Re-query everything; any partial hits in result are overwritten below
            cache_misses = list(team_ids)

    # Step 2: Query database for cache misses through the shared batch helper,
    # which owns the query, commit, rollback and error logging.
    if cache_misses:
        db_counts = self.get_member_counts_batch(cache_misses)

        # Step 3: Record each missed team once and cache it with the configured TTL
        for tid, count in db_counts.items():
            result[tid] = count

            if redis_client:
                try:
                    await redis_client.setex(self._get_member_count_cache_key(tid), cache_ttl, str(count))
                except Exception as e:
                    logger.warning(f"Redis cache write failed for team {tid}: {e}")

    return result
async def invalidate_team_member_count_cache(self, team_id: str) -> None:
    """Drop the cached member count for one team.

    Call this after any membership changes (add/remove/update).
    Does nothing when caching is disabled or no Redis client is available;
    Redis errors are logged as warnings and not raised.

    Args:
        team_id: Team UUID to invalidate
    """
    if not getattr(settings, "team_member_count_cache_enabled", True):
        return

    try:
        client = await get_redis_client()
        if not client:
            return
        await client.delete(self._get_member_count_cache_key(team_id))
    except Exception as e:
        logger.warning(f"Failed to invalidate member count cache for team {SecurityValidator.sanitize_log_message(team_id)}: {e}")