Coverage for mcpgateway / services / team_management_service.py: 98%
782 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/team_management_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Team Management Service.
8This module provides team creation, management, and membership operations
9for the multi-team collaboration system.
11Examples:
12 >>> from unittest.mock import Mock
13 >>> service = TeamManagementService(Mock())
14 >>> isinstance(service, TeamManagementService)
15 True
16 >>> hasattr(service, 'db')
17 True
18"""
20# Standard
21import asyncio
22import base64
23from datetime import datetime, timedelta
24from typing import Any, Dict, List, Optional, Tuple, Union
26# Third-Party
27import orjson
28from sqlalchemy import and_, desc, func, or_, select
29from sqlalchemy.orm import selectinload, Session
31# First-Party
32from mcpgateway.cache.admin_stats_cache import admin_stats_cache
33from mcpgateway.cache.auth_cache import auth_cache, get_auth_cache
34from mcpgateway.config import settings
35from mcpgateway.db import EmailTeam, EmailTeamJoinRequest, EmailTeamMember, EmailTeamMemberHistory, EmailUser, utc_now
36from mcpgateway.services.logging_service import LoggingService
37from mcpgateway.utils.create_slug import slugify
38from mcpgateway.utils.pagination import unified_paginate
39from mcpgateway.utils.redis_client import get_redis_client
41# Initialize logging
42logging_service = LoggingService()
43logger = logging_service.get_logger(__name__)
def get_user_team_count(db: Session, user_email: str) -> int:
    """Count the active team memberships held by a user.

    Args:
        db: SQLAlchemy database session used to run the query.
        user_email: Email address identifying the user.

    Returns:
        int: Number of EmailTeamMember rows for the user that are flagged active.
    """
    belongs_to_user = EmailTeamMember.user_email == user_email
    still_active = EmailTeamMember.is_active.is_(True)
    active_memberships = db.query(EmailTeamMember).filter(belongs_to_user, still_active)
    return active_memberships.count()
class TeamManagementError(Exception):
    """Root of the team-management exception hierarchy.

    Catch this type to handle any error raised by the team management service.

    Examples:
        >>> err = TeamManagementError("Test error")
        >>> str(err)
        'Test error'
        >>> isinstance(err, Exception)
        True
    """
class InvalidRoleError(TeamManagementError):
    """Signals that a requested membership role is not one of the accepted values.

    Examples:
        >>> err = InvalidRoleError("Invalid role: guest")
        >>> str(err)
        'Invalid role: guest'
        >>> isinstance(err, TeamManagementError)
        True
    """
class TeamNotFoundError(TeamManagementError):
    """Signals that the referenced team could not be found.

    Examples:
        >>> err = TeamNotFoundError("Team not found: team-123")
        >>> str(err)
        'Team not found: team-123'
        >>> isinstance(err, TeamManagementError)
        True
    """
class UserNotFoundError(TeamManagementError):
    """Signals that the referenced user could not be found.

    Examples:
        >>> err = UserNotFoundError("User not found: user@example.com")
        >>> str(err)
        'User not found: user@example.com'
        >>> isinstance(err, TeamManagementError)
        True
    """
class MemberAlreadyExistsError(TeamManagementError):
    """Signals that the user already holds an active membership in the team.

    Examples:
        >>> err = MemberAlreadyExistsError("User user@example.com is already a member of team team-123")
        >>> str(err)
        'User user@example.com is already a member of team team-123'
        >>> isinstance(err, TeamManagementError)
        True
    """
class TeamMemberLimitExceededError(TeamManagementError):
    """Signals that a team is already at its configured maximum number of members.

    Examples:
        >>> err = TeamMemberLimitExceededError("Team has reached maximum member limit of 10")
        >>> str(err)
        'Team has reached maximum member limit of 10'
        >>> isinstance(err, TeamManagementError)
        True
    """
class TeamMemberAddError(TeamManagementError):
    """Signals that adding a member failed because of a database or system error.

    Examples:
        >>> err = TeamMemberAddError("Failed to add member due to database error")
        >>> str(err)
        'Failed to add member due to database error'
        >>> isinstance(err, TeamManagementError)
        True
    """
143class TeamManagementService:
144 """Service for team management operations.
146 This service handles team creation, membership management,
147 role assignments, and team access control.
149 Attributes:
150 db (Session): SQLAlchemy database session
152 Examples:
153 >>> from unittest.mock import Mock
154 >>> service = TeamManagementService(Mock())
155 >>> service.__class__.__name__
156 'TeamManagementService'
157 >>> hasattr(service, 'db')
158 True
159 """
161 def __init__(self, db: Session):
162 """Initialize the team management service.
164 Args:
165 db: SQLAlchemy database session
167 Examples:
168 Basic initialization:
169 >>> from mcpgateway.services.team_management_service import TeamManagementService
170 >>> from unittest.mock import Mock
171 >>> db_session = Mock()
172 >>> service = TeamManagementService(db_session)
173 >>> service.db is db_session
174 True
176 Service attributes:
177 >>> hasattr(service, 'db')
178 True
179 >>> service.__class__.__name__
180 'TeamManagementService'
181 """
182 self.db = db
183 self._role_service = None # Lazy initialization to avoid circular imports
185 @property
186 def role_service(self):
187 """Lazy-initialized RoleService to avoid circular imports.
189 Returns:
190 RoleService: Instance of RoleService
191 """
192 if self._role_service is None:
193 # First-Party
194 from mcpgateway.services.role_service import RoleService # pylint: disable=import-outside-toplevel
196 self._role_service = RoleService(self.db)
197 return self._role_service
199 def _get_user_team_count(self, user_email: str) -> int:
200 """Get the number of active teams a user belongs to.
202 Args:
203 user_email: Email address of the user
205 Returns:
206 int: Number of active team memberships
207 """
208 return get_user_team_count(self.db, user_email)
210 @staticmethod
211 def _get_rbac_role_name(membership_role: str) -> str:
212 """Map a team membership role to the corresponding configurable RBAC role name.
214 Args:
215 membership_role: Team membership role ('owner' or 'member').
217 Returns:
218 str: The configured RBAC role name from settings.
219 """
220 return settings.default_team_owner_role if membership_role == "owner" else settings.default_team_member_role
222 @staticmethod
223 def _fire_and_forget(coro: Any) -> None:
224 """Schedule a background coroutine and close it if scheduling fails.
226 Args:
227 coro: The coroutine to schedule as a background task.
229 Raises:
230 Exception: If asyncio.create_task fails (e.g. no running loop).
231 """
232 try:
233 task = asyncio.create_task(coro)
234 # Some tests patch create_task with a plain Mock return value. In that
235 # case the coroutine is never actually scheduled and must be closed.
236 if asyncio.iscoroutine(coro) and not isinstance(task, asyncio.Task):
237 close = getattr(coro, "close", None)
238 if callable(close):
239 close()
240 except Exception:
241 # If create_task() fails (e.g. no running loop), the coroutine has
242 # already been created and must be closed to avoid runtime warnings.
243 close = getattr(coro, "close", None)
244 if callable(close):
245 close()
246 raise
248 def _log_team_member_action(self, team_member_id: str, team_id: str, user_email: str, role: str, action: str, action_by: Optional[str]):
249 """
250 Log a team member action to EmailTeamMemberHistory.
252 Args:
253 team_member_id: ID of the EmailTeamMember
254 team_id: Team ID
255 user_email: Email of the affected user
256 role: Role at the time of action
257 action: Action type ("added", "removed", "reactivated", "role_changed")
258 action_by: Email of the user who performed the action
260 Examples:
261 >>> from mcpgateway.services.team_management_service import TeamManagementService
262 >>> from unittest.mock import Mock
263 >>> service = TeamManagementService(Mock())
264 >>> service._log_team_member_action("tm-123", "team-123", "user@example.com", "member", "added", "admin@example.com")
265 """
266 history = EmailTeamMemberHistory(team_member_id=team_member_id, team_id=team_id, user_email=user_email, role=role, action=action, action_by=action_by, action_timestamp=utc_now())
267 self.db.add(history)
268 self.db.commit()
270 async def create_team(
271 self, name: str, description: Optional[str], created_by: str, visibility: Optional[str] = "public", max_members: Optional[int] = None, skip_limits: bool = False
272 ) -> EmailTeam:
273 """Create a new team.
275 Args:
276 name: Team name
277 description: Team description
278 created_by: Email of the user creating the team
279 visibility: Team visibility (private, team, public)
280 max_members: Maximum number of team members allowed
281 skip_limits: Skip max_teams_per_user check (for admin bypass)
283 Returns:
284 EmailTeam: The created team
286 Raises:
287 ValueError: If team name is taken or invalid
288 Exception: If team creation fails
290 Examples:
291 Team creation parameter validation:
292 >>> from mcpgateway.services.team_management_service import TeamManagementService
294 Test team name validation:
295 >>> team_name = "My Development Team"
296 >>> len(team_name) > 0
297 True
298 >>> len(team_name) <= 255
299 True
300 >>> bool(team_name.strip())
301 True
303 Test visibility validation:
304 >>> visibility = "private"
305 >>> valid_visibilities = ["private", "public"]
306 >>> visibility in valid_visibilities
307 True
308 >>> "invalid" in valid_visibilities
309 False
311 Test max_members validation:
312 >>> max_members = 50
313 >>> isinstance(max_members, int)
314 True
315 >>> max_members > 0
316 True
318 Test creator validation:
319 >>> created_by = "admin@example.com"
320 >>> "@" in created_by
321 True
322 >>> len(created_by) > 0
323 True
325 Test description handling:
326 >>> description = "A team for software development"
327 >>> description is not None
328 True
329 >>> isinstance(description, str)
330 True
332 >>> # Test None description
333 >>> description_none = None
334 >>> description_none is None
335 True
336 """
337 try:
338 # Validate visibility
339 valid_visibilities = ["private", "public"]
340 if visibility not in valid_visibilities:
341 raise ValueError(f"Invalid visibility. Must be one of: {', '.join(valid_visibilities)}")
343 # Check max teams per user
344 if not skip_limits:
345 max_teams = getattr(settings, "max_teams_per_user", 50)
346 if self._get_user_team_count(created_by) >= max_teams:
347 raise ValueError(f"User has reached the maximum team limit of {max_teams}")
349 # Apply default max members from settings
350 if max_members is None:
351 max_members = getattr(settings, "max_members_per_team", 100)
353 # Check for existing inactive team with same name
355 potential_slug = slugify(name)
356 existing_inactive_team = self.db.query(EmailTeam).filter(EmailTeam.slug == potential_slug, EmailTeam.is_active.is_(False)).first()
358 if existing_inactive_team:
359 # Reactivate the existing team with new details
360 existing_inactive_team.name = name
361 existing_inactive_team.description = description
362 existing_inactive_team.created_by = created_by
363 existing_inactive_team.visibility = visibility
364 existing_inactive_team.max_members = max_members
365 existing_inactive_team.is_active = True
366 existing_inactive_team.updated_at = utc_now()
367 team = existing_inactive_team
369 # Check if the creator already has an inactive membership
370 existing_membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team.id, EmailTeamMember.user_email == created_by).first()
372 if existing_membership:
373 # Reactivate existing membership as owner
374 existing_membership.role = "owner"
375 existing_membership.joined_at = utc_now()
376 existing_membership.is_active = True
377 membership = existing_membership
378 else:
379 # Create new membership
380 membership = EmailTeamMember(team_id=team.id, user_email=created_by, role="owner", joined_at=utc_now(), is_active=True)
381 self.db.add(membership)
383 logger.info(f"Reactivated existing team with slug {potential_slug}")
384 else:
385 # Create the team (slug will be auto-generated by event listener)
386 team = EmailTeam(name=name, description=description, created_by=created_by, is_personal=False, visibility=visibility, max_members=max_members, is_active=True)
387 self.db.add(team)
389 self.db.flush() # Get the team ID
391 # Add the creator as owner
392 membership = EmailTeamMember(team_id=team.id, user_email=created_by, role="owner", joined_at=utc_now(), is_active=True)
393 self.db.add(membership)
395 self.db.commit()
397 # Invalidate member count cache for the new team
398 await self.invalidate_team_member_count_cache(str(team.id))
400 # Invalidate auth cache for creator's team membership
401 # Without this, the cache won't know the user belongs to this new team
402 try:
403 await auth_cache.invalidate_user_teams(created_by)
404 await auth_cache.invalidate_team_membership(created_by)
405 await auth_cache.invalidate_user_role(created_by, str(team.id))
406 await admin_stats_cache.invalidate_teams()
407 except Exception as cache_error:
408 logger.debug(f"Failed to invalidate cache on team create: {cache_error}")
410 logger.info(f"Created team '{team.name}' by {created_by}")
411 return team
413 except Exception as e:
414 self.db.rollback()
415 logger.error(f"Failed to create team '{name}': {e}")
416 raise
418 async def get_team_by_id(self, team_id: str) -> Optional[EmailTeam]:
419 """Get a team by ID.
421 Args:
422 team_id: Team ID to lookup
424 Returns:
425 EmailTeam: The team or None if not found
427 Examples:
428 >>> import asyncio
429 >>> from unittest.mock import Mock
430 >>> service = TeamManagementService(Mock())
431 >>> asyncio.iscoroutinefunction(service.get_team_by_id)
432 True
433 """
434 try:
435 team = self.db.query(EmailTeam).filter(EmailTeam.id == team_id, EmailTeam.is_active.is_(True)).first()
436 self.db.commit() # Release transaction to avoid idle-in-transaction
437 return team
439 except Exception as e:
440 self.db.rollback()
441 logger.error(f"Failed to get team by ID {team_id}: {e}")
442 return None
444 async def get_team_by_slug(self, slug: str) -> Optional[EmailTeam]:
445 """Get a team by slug.
447 Args:
448 slug: Team slug to lookup
450 Returns:
451 EmailTeam: The team or None if not found
453 Examples:
454 >>> import asyncio
455 >>> from unittest.mock import Mock
456 >>> service = TeamManagementService(Mock())
457 >>> asyncio.iscoroutinefunction(service.get_team_by_slug)
458 True
459 """
460 try:
461 team = self.db.query(EmailTeam).filter(EmailTeam.slug == slug, EmailTeam.is_active.is_(True)).first()
462 self.db.commit() # Release transaction to avoid idle-in-transaction
463 return team
465 except Exception as e:
466 self.db.rollback()
467 logger.error(f"Failed to get team by slug {slug}: {e}")
468 return None
470 async def update_team(
471 self, team_id: str, name: Optional[str] = None, description: Optional[str] = None, visibility: Optional[str] = None, max_members: Optional[int] = None, updated_by: Optional[str] = None
472 ) -> bool:
473 """Update team information.
475 Args:
476 team_id: ID of the team to update
477 name: New team name
478 description: New team description
479 visibility: New visibility setting
480 max_members: New maximum member limit
481 updated_by: Email of user making the update
483 Returns:
484 bool: True if update succeeded, False otherwise
486 Raises:
487 ValueError: If visibility setting is invalid
489 Examples:
490 >>> import asyncio
491 >>> from unittest.mock import Mock
492 >>> service = TeamManagementService(Mock())
493 >>> asyncio.iscoroutinefunction(service.update_team)
494 True
495 """
496 try:
497 team = await self.get_team_by_id(team_id)
498 if not team:
499 logger.warning(f"Team {team_id} not found for update")
500 return False
502 # Prevent updating personal teams
503 if team.is_personal:
504 logger.warning(f"Cannot update personal team {team_id}")
505 return False
507 # Update fields if provided
508 if name is not None:
509 team.name = name
510 # Slug will be updated by event listener if name changes
512 if description is not None:
513 team.description = description
515 if visibility is not None:
516 valid_visibilities = ["private", "public"]
517 if visibility not in valid_visibilities:
518 raise ValueError(f"Invalid visibility. Must be one of: {', '.join(valid_visibilities)}")
519 team.visibility = visibility
521 if max_members is not None:
522 team.max_members = max_members
524 team.updated_at = utc_now()
525 self.db.commit()
527 logger.info(f"Updated team {team_id} by {updated_by}")
528 return True
530 except ValueError:
531 raise # Let ValueError propagate to caller for proper error handling
532 except Exception as e:
533 self.db.rollback()
534 logger.error(f"Failed to update team {team_id}: {e}")
535 return False
537 async def delete_team(self, team_id: str, deleted_by: str) -> bool:
538 """Delete a team (soft delete).
540 Args:
541 team_id: ID of the team to delete
542 deleted_by: Email of user performing deletion
544 Returns:
545 bool: True if deletion succeeded, False otherwise
547 Raises:
548 ValueError: If attempting to delete a personal team
550 Examples:
551 >>> import asyncio
552 >>> from unittest.mock import Mock
553 >>> service = TeamManagementService(Mock())
554 >>> asyncio.iscoroutinefunction(service.delete_team)
555 True
556 """
557 try:
558 team = await self.get_team_by_id(team_id)
559 if not team:
560 logger.warning(f"Team {team_id} not found for deletion")
561 return False
563 # Prevent deleting personal teams
564 if team.is_personal:
565 logger.warning(f"Cannot delete personal team {team_id}")
566 raise ValueError("Personal teams cannot be deleted")
568 # Soft delete the team
569 team.is_active = False
570 team.updated_at = utc_now()
572 # Get all active memberships before deactivating (for history logging)
573 memberships = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)).all()
575 # Log history for each membership (before bulk update)
576 for membership in memberships:
577 self._log_team_member_action(membership.id, team_id, membership.user_email, membership.role, "team-deleted", deleted_by)
579 # Bulk update: deactivate all memberships in single query instead of looping
580 self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)).update({EmailTeamMember.is_active: False}, synchronize_session=False)
582 self.db.commit()
584 # Invalidate all role caches for this team
585 try:
586 self._fire_and_forget(auth_cache.invalidate_team_roles(team_id))
587 self._fire_and_forget(admin_stats_cache.invalidate_teams())
588 # Also invalidate team cache, teams list cache, and team membership cache for each member
589 for membership in memberships:
590 self._fire_and_forget(auth_cache.invalidate_team(membership.user_email))
591 self._fire_and_forget(auth_cache.invalidate_user_teams(membership.user_email))
592 self._fire_and_forget(auth_cache.invalidate_team_membership(membership.user_email))
593 except Exception as cache_error:
594 logger.debug(f"Failed to invalidate caches on team delete: {cache_error}")
596 logger.info(f"Deleted team {team_id} by {deleted_by}")
597 return True
599 except Exception as e:
600 self.db.rollback()
601 logger.error(f"Failed to delete team {team_id}: {e}")
602 return False
604 async def add_member_to_team(self, team_id: str, user_email: str, role: str = "member", invited_by: Optional[str] = None) -> EmailTeamMember:
605 """Add a member to a team.
607 Args:
608 team_id: ID of the team
609 user_email: Email of the user to add
610 role: Role to assign (owner, member)
611 invited_by: Email of user who added this member
613 Returns:
614 EmailTeamMember: The created or reactivated team member object
616 Raises:
617 InvalidRoleError: If role is invalid
618 TeamNotFoundError: If team does not exist
619 TeamManagementError: If team is a personal team
620 UserNotFoundError: If user does not exist
621 MemberAlreadyExistsError: If user is already a member
622 TeamMemberLimitExceededError: If team has reached maximum member limit
623 TeamMemberAddError: If adding member fails due to database or system errors
625 Examples:
626 >>> import asyncio
627 >>> from unittest.mock import Mock
628 >>> service = TeamManagementService(Mock())
629 >>> asyncio.iscoroutinefunction(service.add_member_to_team)
630 True
631 >>> # After adding, EmailTeamMemberHistory is updated
632 >>> # service._log_team_member_action("tm-123", "team-123", "user@example.com", "member", "added", "admin@example.com")
633 """
634 # Validate role
635 valid_roles = ["owner", "member"]
636 if role not in valid_roles:
637 raise InvalidRoleError(f"Invalid role '{role}'. Must be one of: {', '.join(valid_roles)}")
639 # Check if team exists
640 team = await self.get_team_by_id(team_id)
641 if not team:
642 logger.warning(f"Team {team_id} not found")
643 raise TeamNotFoundError("Team not found")
645 # Prevent adding members to personal teams
646 if team.is_personal:
647 logger.warning(f"Cannot add members to personal team {team_id}")
648 raise TeamManagementError("Cannot add members to personal teams")
650 # Check if user exists
651 user = self.db.query(EmailUser).filter(EmailUser.email == user_email).first()
652 if not user:
653 logger.warning(f"User {user_email} not found")
654 raise UserNotFoundError("User not found")
656 # Check max teams per user
657 max_teams = getattr(settings, "max_teams_per_user", 50)
658 if self._get_user_team_count(user_email) >= max_teams:
659 raise TeamManagementError(f"User has reached the maximum team limit of {max_teams}")
661 # Check if user is already a member
662 existing_membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email).first()
664 if existing_membership and existing_membership.is_active:
665 logger.warning(f"User {user_email} is already a member of team {team_id}")
666 raise MemberAlreadyExistsError("User is already a member of this team")
668 # Check team member limit
669 if team.max_members:
670 current_member_count = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)).count()
672 if current_member_count >= team.max_members:
673 logger.warning(f"Team {team_id} has reached maximum member limit of {team.max_members}")
674 raise TeamMemberLimitExceededError(f"Team has reached maximum member limit of {team.max_members}")
676 # Add or reactivate membership
677 try:
678 if existing_membership:
679 existing_membership.is_active = True
680 existing_membership.role = role
681 existing_membership.joined_at = utc_now()
682 existing_membership.invited_by = invited_by
683 self.db.commit()
684 self._log_team_member_action(existing_membership.id, team_id, user_email, role, "reactivated", invited_by)
685 member = existing_membership
686 else:
687 membership = EmailTeamMember(team_id=team_id, user_email=user_email, role=role, joined_at=utc_now(), invited_by=invited_by, is_active=True)
688 self.db.add(membership)
689 self.db.commit()
690 self._log_team_member_action(membership.id, team_id, user_email, role, "added", invited_by)
691 member = membership
693 # Assign team-scoped RBAC role matching the membership role (owner or member)
694 try:
695 rbac_role_name = self._get_rbac_role_name(role)
696 team_rbac_role = await self.role_service.get_role_by_name(rbac_role_name, scope="team")
697 if team_rbac_role:
698 existing = await self.role_service.get_user_role_assignment(user_email=user_email, role_id=team_rbac_role.id, scope="team", scope_id=team_id)
699 if not existing or not existing.is_active:
700 await self.role_service.assign_role_to_user(user_email=user_email, role_id=team_rbac_role.id, scope="team", scope_id=team_id, granted_by=invited_by or user_email)
701 logger.info(f"Assigned {rbac_role_name} role to {user_email} for team {team_id}")
702 else:
703 logger.debug(f"User {user_email} already has active {rbac_role_name} role for team {team_id}")
704 else:
705 logger.warning(f"Role '{rbac_role_name}' not found. User {user_email} added without RBAC role.")
706 except Exception as role_error:
707 logger.warning(f"Failed to assign role to {user_email}: {role_error}")
709 # Invalidate auth cache for user's team membership and role
710 try:
711 self._fire_and_forget(auth_cache.invalidate_team(user_email))
712 self._fire_and_forget(auth_cache.invalidate_user_role(user_email, team_id))
713 self._fire_and_forget(auth_cache.invalidate_user_teams(user_email))
714 self._fire_and_forget(auth_cache.invalidate_team_membership(user_email))
715 self._fire_and_forget(admin_stats_cache.invalidate_teams())
716 except Exception as cache_error:
717 logger.debug(f"Failed to invalidate cache on team add: {cache_error}")
719 # Invalidate member count cache for this team
720 await self.invalidate_team_member_count_cache(str(team_id))
722 logger.info(f"Added {user_email} to team {team_id} with role {role}")
723 return member
725 except Exception as e:
726 self.db.rollback()
727 logger.error(f"Failed to add {user_email} to team {team_id}: {e}")
728 raise TeamMemberAddError("Failed to add member to team") from e
730 async def remove_member_from_team(self, team_id: str, user_email: str, removed_by: Optional[str] = None) -> bool:
731 """Remove a member from a team.
733 Args:
734 team_id: ID of the team
735 user_email: Email of the user to remove
736 removed_by: Email of user performing the removal
738 Returns:
739 bool: True if member was removed successfully, False otherwise
741 Raises:
742 ValueError: If attempting to remove the last owner
744 Examples:
745 Team membership management with role-based access control.
746 After removal, EmailTeamMemberHistory is updated via _log_team_member_action.
747 """
748 try:
749 team = await self.get_team_by_id(team_id)
750 if not team:
751 logger.warning(f"Team {team_id} not found")
752 return False
754 # Prevent removing members from personal teams
755 if team.is_personal:
756 logger.warning(f"Cannot remove members from personal team {team_id}")
757 return False
759 # Find the membership
760 membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).first()
762 if not membership:
763 logger.warning(f"User {user_email} is not a member of team {team_id}")
764 return False
766 # Prevent removing the last owner
767 if membership.role == "owner":
768 owner_count = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.role == "owner", EmailTeamMember.is_active.is_(True)).count()
770 if owner_count <= 1:
771 logger.warning(f"Cannot remove the last owner from team {team_id}")
772 raise ValueError("Cannot remove the last owner from a team")
774 # Remove membership (soft delete)
775 membership.is_active = False
776 self.db.commit()
777 self._log_team_member_action(membership.id, team_id, user_email, membership.role, "removed", removed_by)
779 # Revoke all team-scoped RBAC roles from removed member defensively
780 # (revoke both owner and member roles to handle edge cases)
781 try:
782 for role_name in (settings.default_team_owner_role, settings.default_team_member_role):
783 rbac_role = await self.role_service.get_role_by_name(role_name, scope="team")
784 if rbac_role:
785 revoked = await self.role_service.revoke_role_from_user(user_email=user_email, role_id=rbac_role.id, scope="team", scope_id=team_id)
786 if revoked:
787 logger.info(f"Revoked {role_name} role from {user_email} for team {team_id}")
788 except Exception as role_error:
789 logger.warning(f"Failed to revoke roles from {user_email}: {role_error}")
791 # Invalidate auth cache for user's team membership and role
792 try:
793 self._fire_and_forget(auth_cache.invalidate_team(user_email))
794 self._fire_and_forget(auth_cache.invalidate_user_role(user_email, team_id))
795 self._fire_and_forget(auth_cache.invalidate_user_teams(user_email))
796 self._fire_and_forget(auth_cache.invalidate_team_membership(user_email))
797 except Exception as cache_error:
798 logger.debug(f"Failed to invalidate cache on team remove: {cache_error}")
800 # Invalidate member count cache for this team
801 await self.invalidate_team_member_count_cache(str(team_id))
803 logger.info(f"Removed {user_email} from team {team_id} by {removed_by}")
804 return True
806 except Exception as e:
807 self.db.rollback()
808 logger.error(f"Failed to remove {user_email} from team {team_id}: {e}")
809 return False
811 async def update_member_role(self, team_id: str, user_email: str, new_role: str, updated_by: Optional[str] = None) -> bool:
812 """Update a team member's role.
814 Args:
815 team_id: ID of the team
816 user_email: Email of the user whose role to update
817 new_role: New role to assign
818 updated_by: Email of user making the change
820 Returns:
821 bool: True if role was updated successfully, False otherwise
823 Raises:
824 ValueError: If role is invalid or removing last owner role
826 Examples:
827 Role management within teams for access control.
828 After role update, EmailTeamMemberHistory is updated via _log_team_member_action.
829 """
830 try:
831 # Validate role
832 valid_roles = ["owner", "member"]
833 if new_role not in valid_roles:
834 raise ValueError(f"Invalid role. Must be one of: {', '.join(valid_roles)}")
836 team = await self.get_team_by_id(team_id)
837 if not team:
838 logger.warning(f"Team {team_id} not found")
839 return False
841 # Prevent updating roles in personal teams
842 if team.is_personal:
843 logger.warning(f"Cannot update roles in personal team {team_id}")
844 return False
846 # Find the membership
847 membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).first()
849 if not membership:
850 logger.warning(f"User {user_email} is not a member of team {team_id}")
851 return False
853 # Prevent changing the role of the last owner to non-owner
854 if membership.role == "owner" and new_role != "owner":
855 owner_count = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.role == "owner", EmailTeamMember.is_active.is_(True)).count()
857 if owner_count <= 1:
858 logger.warning(f"Cannot remove owner role from the last owner of team {team_id}")
859 raise ValueError("Cannot remove owner role from the last owner of a team")
861 # Update the role
862 old_role = membership.role
863 membership.role = new_role
864 self.db.commit()
865 self._log_team_member_action(membership.id, team_id, user_email, new_role, "role_changed", updated_by)
867 # Handle RBAC role changes when team membership role changes
868 if old_role != new_role:
869 try:
870 # Get both role types
871 team_member_role = await self.role_service.get_role_by_name(settings.default_team_member_role, scope="team")
872 team_owner_role = await self.role_service.get_role_by_name(settings.default_team_owner_role, scope="team")
874 # Handle role transitions
875 if old_role == "member" and new_role == "owner":
876 # member -> owner: revoke member role, assign owner role
877 if team_member_role:
878 await self.role_service.revoke_role_from_user(user_email=user_email, role_id=team_member_role.id, scope="team", scope_id=team_id)
879 if team_owner_role:
880 await self.role_service.assign_role_to_user(user_email=user_email, role_id=team_owner_role.id, scope="team", scope_id=team_id, granted_by=updated_by or user_email)
881 logger.info(f"Transitioned RBAC role from {settings.default_team_member_role} to {settings.default_team_owner_role} for {user_email} in team {team_id}")
883 elif old_role == "owner" and new_role == "member":
884 # owner -> member: revoke owner role, assign member role
885 if team_owner_role:
886 await self.role_service.revoke_role_from_user(user_email=user_email, role_id=team_owner_role.id, scope="team", scope_id=team_id)
887 if team_member_role:
888 await self.role_service.assign_role_to_user(user_email=user_email, role_id=team_member_role.id, scope="team", scope_id=team_id, granted_by=updated_by or user_email)
889 logger.info(f"Transitioned RBAC role from {settings.default_team_owner_role} to {settings.default_team_member_role} for {user_email} in team {team_id}")
891 except Exception as role_error:
892 logger.warning(f"Failed to update RBAC roles for {user_email} in team {team_id}: {role_error}")
893 # Don't fail the membership role update if RBAC role update fails
895 # Invalidate role cache
896 try:
897 self._fire_and_forget(auth_cache.invalidate_user_role(user_email, team_id))
898 except Exception as cache_error:
899 logger.debug(f"Failed to invalidate cache on role update: {cache_error}")
901 logger.info(f"Updated role of {user_email} in team {team_id} to {new_role} by {updated_by}")
902 return True
904 except ValueError:
905 raise # Let ValueError propagate to caller for proper error handling
906 except Exception as e:
907 self.db.rollback()
908 logger.error(f"Failed to update role of {user_email} in team {team_id}: {e}")
909 return False
async def get_member(self, team_id: str, user_email: str) -> Optional[EmailTeamMember]:
    """Look up the active membership row for one user in one team.

    Args:
        team_id: ID of the team
        user_email: Email of the user

    Returns:
        The matching active EmailTeamMember, or None when no such row exists
        or when the lookup raised (the error is logged).
    """
    try:
        membership_query = self.db.query(EmailTeamMember).filter(
            EmailTeamMember.team_id == team_id,
            EmailTeamMember.user_email == user_email,
            EmailTeamMember.is_active.is_(True),
        )
        found = membership_query.first()
    except Exception as e:
        logger.error(f"Failed to get member {user_email} in team {team_id}: {e}")
        found = None
    return found
async def get_user_teams(self, user_email: str, include_personal: bool = True) -> List[EmailTeam]:
    """Return every active team the user is an active member of.

    Team IDs are cached per (user, include_personal) pair; on a cache hit only
    the team rows for those IDs are loaded. Cache can be disabled via
    AUTH_CACHE_TEAMS_ENABLED=false.

    Args:
        user_email: Email of the user
        include_personal: Whether to include personal teams

    Returns:
        List[EmailTeam]: Teams the user belongs to; an empty list when the
        user has none or when the full query fails.

    Examples:
        User dashboard showing team memberships.
    """
    team_cache = self._get_auth_cache()
    cache_key = f"{user_email}:{include_personal}"

    # Cache lookup: None means "not cached", an empty list means "no teams".
    cached_ids = await team_cache.get_user_teams(cache_key) if team_cache else None
    if cached_ids is not None:
        if not cached_ids:
            return []
        try:
            # Indexed lookup of the cached IDs, still filtered to active teams
            by_id = self.db.query(EmailTeam).filter(EmailTeam.id.in_(cached_ids), EmailTeam.is_active.is_(True))
            cached_teams = by_id.all()
            self.db.commit()  # Release transaction to avoid idle-in-transaction
            return cached_teams
        except Exception as e:
            self.db.rollback()
            logger.warning(f"Failed to fetch teams by IDs from cache: {e}")
            # Fall through to the full membership query below

    # Cache miss, caching disabled, or cached lookup failed
    try:
        membership_query = (
            self.db.query(EmailTeam)
            .join(EmailTeamMember)
            .filter(
                EmailTeamMember.user_email == user_email,
                EmailTeamMember.is_active.is_(True),
                EmailTeam.is_active.is_(True),
            )
        )
        if not include_personal:
            membership_query = membership_query.filter(EmailTeam.is_personal.is_(False))

        found_teams = membership_query.all()
        self.db.commit()  # Release transaction to avoid idle-in-transaction

        if team_cache:
            # Only the IDs are cached; ORM rows are re-read on each hit
            await team_cache.set_user_teams(cache_key, [team.id for team in found_teams])

        return found_teams
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get teams for user {user_email}: {e}")
        return []
async def verify_team_for_user(self, user_email, team_id=None):
    """Resolve or verify the team a user's request should be scoped to.

    Loads the user's active memberships in active teams with a single query.
    Without a `team_id` the user's personal team is looked up; with a
    `team_id` the user's membership of that team is checked.

    Args:
        user_email (str): The email of the user whose teams are being queried.
        team_id (str or None, optional): Specific team ID to check for membership. Defaults to None.

    Returns:
        str or list or None:
            - If `team_id` is not provided, the ID of the user's personal team, or None if there is none.
            - If `team_id` is provided and the user is a member of that team, `team_id`.
            - If `team_id` is provided but the user is not a member of that team, an empty list.
            - An empty list if the membership query fails.
            - If an unexpected error occurs: None when no `team_id` was provided, otherwise an
              empty list (membership could not be confirmed, so the check fails closed).

    Raises:
        None explicitly, but any exceptions during the process are caught and logged.

    Examples:
        Verifies user team if team_id provided otherwise finds its personal id.
    """
    try:
        # Get all teams the user belongs to in a single query
        try:
            query = self.db.query(EmailTeam).join(EmailTeamMember).filter(EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True), EmailTeam.is_active.is_(True))
            user_teams = query.all()
            self.db.commit()  # Release transaction to avoid idle-in-transaction
        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to get teams for user {user_email}: {e}")
            return []

        if not team_id:
            # No team requested: fall back to the personal team, if any
            personal_team = next((t for t in user_teams if getattr(t, "is_personal", False)), None)
            return personal_team.id if personal_team else None

        # A team was requested: it must be one of the user's teams
        if any(team.id == team_id for team in user_teams):
            return team_id
        return []
    except Exception as e:
        self.db.rollback()
        # Use the module logger rather than print() so the failure reaches the service logs
        logger.error(f"Failed to verify team for user {user_email}: {e}")
        # Fail closed: never hand back an unverified caller-supplied team_id
        return [] if team_id else None
async def get_team_members(
    self,
    team_id: str,
    cursor: Optional[str] = None,
    limit: Optional[int] = None,
    page: Optional[int] = None,
    per_page: Optional[int] = None,
) -> Union[List[Tuple[EmailUser, EmailTeamMember]], Tuple[List[Tuple[EmailUser, EmailTeamMember]], Optional[str]], Dict[str, Any]]:
    """Get all members of a team with optional cursor or page-based pagination.

    Note: This method returns ORM objects and cannot be cached since callers
    depend on ORM attributes and methods.

    The response shape is chosen from the arguments and is the same on
    success and on failure:

    - no ``cursor``, ``limit`` or ``page``: plain list (backward compatibility)
    - ``page`` given: page-based dict
    - otherwise (``cursor`` and/or ``limit`` given): ``(members, next_cursor)`` tuple

    Args:
        team_id: ID of the team
        cursor: Opaque cursor token for cursor-based pagination. A malformed
            cursor is logged and ignored, so the first page is returned.
        limit: Maximum number of members to return (for cursor-based, default: 50),
            capped at ``settings.pagination_max_page_size``
        page: Page number for page-based pagination (1-indexed). Takes precedence over cursor.
        per_page: Items per page for page-based pagination (default: 30)

    Returns:
        - If cursor or limit is provided: Tuple (members, next_cursor)
        - If page is provided: Dict with keys 'data', 'pagination', 'links'
        - If none of them: List of all members (backward compatibility)

        On failure the error is logged, the session is rolled back and the
        empty form of the selected shape is returned.

    Examples:
        Team member management and role display.
    """
    try:
        # Backward compatibility: no pagination requested, return (user, membership) tuples
        if cursor is None and page is None and limit is None:
            query = (
                select(EmailUser, EmailTeamMember)
                .join(EmailTeamMember, EmailUser.email == EmailTeamMember.user_email)
                .where(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True))
                .order_by(EmailUser.full_name, EmailUser.email)
            )
            result = self.db.execute(query)
            members = list(result.all())
            self.db.commit()
            return members

        # For pagination: select EmailTeamMember and eager-load user to avoid N+1
        query = (
            select(EmailTeamMember)
            .options(selectinload(EmailTeamMember.user))
            .where(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True))
            .join(EmailUser, EmailUser.email == EmailTeamMember.user_email)
        )

        # PAGE-BASED PAGINATION (Admin UI) - use unified_paginate
        if page is not None:
            # Alphabetical ordering for user-friendly display
            query = query.order_by(EmailUser.full_name, EmailUser.email)
            pag_result = await unified_paginate(
                db=self.db,
                query=query,
                page=page,
                per_page=per_page or 30,
                cursor=None,
                limit=None,
                base_url=f"/admin/teams/{team_id}/members",
                query_params={},
            )
            self.db.commit()
            memberships = pag_result["data"]
            tuples = [(m.user, m) for m in memberships]
            return {
                "data": tuples,
                "pagination": pag_result["pagination"],
                "links": pag_result["links"],
            }

        # CURSOR-BASED PAGINATION (API) - custom implementation using (joined_at, id)
        # unified_paginate uses created_at which doesn't exist on EmailTeamMember

        # Order by joined_at DESC, id DESC for keyset pagination
        query = query.order_by(desc(EmailTeamMember.joined_at), desc(EmailTeamMember.id))

        # Decode cursor and apply keyset filter
        if cursor:
            try:
                cursor_json = base64.urlsafe_b64decode(cursor.encode()).decode()
                cursor_data = orjson.loads(cursor_json)
                last_id = cursor_data.get("id")
                joined_str = cursor_data.get("joined_at")
                if last_id and joined_str:
                    last_joined = datetime.fromisoformat(joined_str)
                    # Keyset filter: (joined_at < last) OR (joined_at = last AND id < last_id)
                    query = query.where(
                        or_(
                            EmailTeamMember.joined_at < last_joined,
                            and_(EmailTeamMember.joined_at == last_joined, EmailTeamMember.id < last_id),
                        )
                    )
            except (ValueError, TypeError, AttributeError) as e:
                # AttributeError covers a payload that decodes to valid JSON but is not an object
                logger.warning(f"Invalid cursor for team members, ignoring: {e}")

        # Fetch limit + 1 to check for more results (cap at max_page_size)
        page_size = min(limit or 50, settings.pagination_max_page_size)
        query = query.limit(page_size + 1)
        memberships = list(self.db.execute(query).scalars().all())

        # The extra row only signals that another page exists; drop it from the result
        has_more = len(memberships) > page_size
        if has_more:
            memberships = memberships[:page_size]

        # Generate next cursor using (joined_at, id) of the last returned row
        next_cursor = None
        if has_more and memberships:
            last_member = memberships[-1]
            cursor_data = {
                "joined_at": last_member.joined_at.isoformat() if last_member.joined_at else None,
                "id": last_member.id,
            }
            next_cursor = base64.urlsafe_b64encode(orjson.dumps(cursor_data)).decode()

        self.db.commit()
        tuples = [(m.user, m) for m in memberships]
        return (tuples, next_cursor)

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get members for team {team_id}: {e}")

        # Return the empty form of whichever shape the success path would have produced
        if page is not None:
            return {"data": [], "pagination": {"page": page, "per_page": per_page or 30, "total": 0, "has_next": False, "has_prev": False}, "links": None}

        # Cursor mode is selected by either argument, so a first-page request
        # (limit only, no cursor) must also get the tuple shape.
        if cursor is not None or limit is not None:
            return ([], None)

        return []
def count_team_owners(self, team_id: str) -> int:
    """Return how many active owners a team has, counted in SQL.

    Counting in the database avoids loading every membership row just to
    count the owners in Python.

    Args:
        team_id: ID of the team

    Returns:
        int: Number of active owners in the team
    """
    active_owners = self.db.query(EmailTeamMember).filter(
        EmailTeamMember.team_id == team_id,
        EmailTeamMember.role == "owner",
        EmailTeamMember.is_active.is_(True),
    )
    owner_total = active_owners.count()
    self.db.commit()  # Release transaction to avoid idle-in-transaction
    return owner_total
def _get_auth_cache(self):
    """Fetch the auth cache instance on demand.

    Returns:
        AuthCache instance, or None if obtaining it raised ImportError.
    """
    try:
        cache_instance = get_auth_cache()
    except ImportError:
        cache_instance = None
    return cache_instance
def _get_admin_stats_cache(self):
    """Fetch the admin stats cache instance on demand.

    The factory is imported inside the method, so an ImportError from either
    the import or the factory call results in None.

    Returns:
        AdminStatsCache instance or None if unavailable.
    """
    try:
        # First-Party
        from mcpgateway.cache.admin_stats_cache import get_admin_stats_cache  # pylint: disable=import-outside-toplevel

        stats_cache = get_admin_stats_cache()
    except ImportError:
        stats_cache = None
    return stats_cache
async def get_user_role_in_team(self, user_email: str, team_id: str) -> Optional[str]:
    """Return the role a user holds in a team, consulting the auth cache first.

    A cached empty string stands for "not a member" and is reported as None.

    Args:
        user_email: Email of the user
        team_id: ID of the team

    Returns:
        str: User's role, or None if the user is not an active member or the
        lookup failed

    Examples:
        Access control and permission checking.
    """
    role_cache = self._get_auth_cache()
    if role_cache:
        cached = await role_cache.get_user_role(user_email, team_id)
        if cached is not None:
            # "" is the cached marker for "not a member"
            return cached or None

    try:
        membership = (
            self.db.query(EmailTeamMember)
            .filter(
                EmailTeamMember.user_email == user_email,
                EmailTeamMember.team_id == team_id,
                EmailTeamMember.is_active.is_(True),
            )
            .first()
        )
        self.db.commit()  # Release transaction to avoid idle-in-transaction

        resolved_role = None
        if membership:
            resolved_role = membership.role

        # Remember the answer (including "no membership") for later calls
        if role_cache:
            await role_cache.set_user_role(user_email, team_id, resolved_role)

        return resolved_role
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get role for {user_email} in team {team_id}: {e}")
        return None
async def list_teams(
    self,
    # Unified pagination params
    limit: int = 100,
    offset: int = 0,
    cursor: Optional[str] = None,
    page: Optional[int] = None,
    per_page: int = 50,
    include_inactive: bool = False,
    visibility_filter: Optional[str] = None,
    base_url: Optional[str] = None,
    include_personal: bool = False,
    search_query: Optional[str] = None,
    personal_owner_email: Optional[str] = None,
) -> Union[Tuple[List[EmailTeam], Optional[str]], Dict[str, Any]]:
    """List teams, filtered and paginated through unified_paginate.

    Args:
        limit: Max items for cursor pagination
        offset: Offset for legacy/cursor pagination
        cursor: Cursor token
        page: Page number (1-indexed)
        per_page: Items per page
        include_inactive: Whether to include inactive teams
        visibility_filter: Filter by visibility (private, team, public)
        base_url: Base URL for pagination links
        include_personal: Whether to include personal teams
        search_query: Search term for name/slug/description
        personal_owner_email: When set (and include_personal=False), includes this user's personal team alongside non-personal teams

    Returns:
        Union[Tuple[List[EmailTeam], Optional[str]], Dict[str, Any]]:
            - Tuple (teams, next_cursor) if cursor/offset based
            - Dict {data, pagination, links} if page based
    """
    # Collect the filters first, then apply them in order (they are ANDed together)
    conditions = []

    if not include_personal:
        if personal_owner_email:
            # Non-personal teams plus the one personal team created by this user
            own_personal = and_(EmailTeam.is_personal.is_(True), EmailTeam.created_by == personal_owner_email)
            conditions.append(or_(EmailTeam.is_personal.is_(False), own_personal))
        else:
            conditions.append(EmailTeam.is_personal.is_(False))

    if not include_inactive:
        conditions.append(EmailTeam.is_active.is_(True))

    if visibility_filter:
        conditions.append(EmailTeam.visibility == visibility_filter)

    if search_query:
        pattern = f"%{search_query}%"
        conditions.append(
            or_(
                EmailTeam.name.ilike(pattern),
                EmailTeam.slug.ilike(pattern),
                EmailTeam.description.ilike(pattern),
            )
        )

    stmt = select(EmailTeam)
    for condition in conditions:
        stmt = stmt.where(condition)

    # Ordering depends on the pagination mode:
    # - Page-based (UI): alphabetical by name for user-friendly display
    # - Cursor-based (API): created_at DESC, id DESC to match unified_paginate expectations
    if page is None:
        stmt = stmt.order_by(desc(EmailTeam.created_at), desc(EmailTeam.id))
    else:
        stmt = stmt.order_by(EmailTeam.name, EmailTeam.id)

    # Pagination links default to the admin partial when no base URL is given
    links_base = base_url if base_url else f"{settings.app_root_path}/admin/teams/partial"

    # Legacy offset-based pagination applies only when neither page nor cursor is used
    if offset > 0 and not page and not cursor:
        stmt = stmt.offset(offset)

    paginated = await unified_paginate(
        db=self.db,
        query=stmt,
        cursor=cursor,
        limit=limit,
        page=page,
        per_page=per_page,
        base_url=links_base,
    )
    self.db.commit()  # Release transaction to avoid idle-in-transaction
    return paginated
async def get_all_team_ids(
    self,
    include_inactive: bool = False,
    visibility_filter: Optional[str] = None,
    include_personal: bool = False,
    search_query: Optional[str] = None,
    personal_owner_email: Optional[str] = None,
) -> List[str]:
    """Get all team IDs matching criteria (unpaginated).

    Args:
        include_inactive: Whether to include inactive teams
        visibility_filter: Filter by visibility (private, team, public)
        include_personal: Whether to include personal teams
        search_query: Search term for name/slug
        personal_owner_email: When set (and include_personal=False), includes this user's personal team alongside non-personal teams

    Returns:
        List[str]: IDs of the matching teams (team IDs are handled as strings
        by the other methods of this service)
    """
    query = select(EmailTeam.id)

    if not include_personal:
        if personal_owner_email:
            # Non-personal teams plus the one personal team created by this user
            query = query.where(
                or_(
                    EmailTeam.is_personal.is_(False),
                    and_(EmailTeam.is_personal.is_(True), EmailTeam.created_by == personal_owner_email),
                )
            )
        else:
            query = query.where(EmailTeam.is_personal.is_(False))

    if not include_inactive:
        query = query.where(EmailTeam.is_active.is_(True))

    if visibility_filter:
        query = query.where(EmailTeam.visibility == visibility_filter)

    if search_query:
        # NOTE(review): list_teams also searches EmailTeam.description; this
        # method matches name/slug only - confirm the difference is intended.
        search_term = f"%{search_query}%"
        query = query.where(
            or_(
                EmailTeam.name.ilike(search_term),
                EmailTeam.slug.ilike(search_term),
            )
        )

    # Single-column select: scalars() yields the ID values directly
    team_ids = list(self.db.execute(query).scalars().all())
    self.db.commit()  # Release transaction to avoid idle-in-transaction
    return team_ids
async def get_teams_count(
    self,
    include_inactive: bool = False,
    visibility_filter: Optional[str] = None,
    include_personal: bool = False,
    search_query: Optional[str] = None,
) -> int:
    """Count the teams that match the given filters.

    Args:
        include_inactive: Whether to include inactive teams
        visibility_filter: Filter by visibility (private, team, public)
        include_personal: Whether to include personal teams
        search_query: Search term for name/slug

    Returns:
        int: Total count of matching teams (0 when the query yields no value)
    """
    # Gather the active filters, then AND them onto the COUNT statement in order
    conditions = []
    if not include_personal:
        conditions.append(EmailTeam.is_personal.is_(False))
    if not include_inactive:
        conditions.append(EmailTeam.is_active.is_(True))
    if visibility_filter:
        conditions.append(EmailTeam.visibility == visibility_filter)
    if search_query:
        pattern = f"%{search_query}%"
        conditions.append(or_(EmailTeam.name.ilike(pattern), EmailTeam.slug.ilike(pattern)))

    stmt = select(func.count(EmailTeam.id))  # pylint: disable=not-callable
    for condition in conditions:
        stmt = stmt.where(condition)

    total = self.db.execute(stmt).scalar() or 0
    self.db.commit()  # Release transaction to avoid idle-in-transaction
    return total
async def discover_public_teams(self, user_email: str, skip: int = 0, limit: Optional[int] = None) -> List[EmailTeam]:
    """Discover public teams that user can join.

    Only active, non-personal teams with public visibility are returned, and
    teams the user is already an active member of are excluded. Results are
    ordered by team name, then ID, so that skip/limit paging is stable.

    Args:
        user_email: Email of the user discovering teams
        skip: Number of teams to skip for pagination
        limit: Maximum number of teams to return (None for unlimited)

    Returns:
        List[EmailTeam]: List of public teams user can join. An empty list is
        returned when the query fails; the error is logged and the session
        rolled back rather than raised.
    """
    try:
        # Optimized: Use subquery instead of loading all IDs into memory (2 queries -> 1)
        user_team_subquery = select(EmailTeamMember.team_id).where(EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).scalar_subquery()

        query = (
            self.db.query(EmailTeam)
            .filter(EmailTeam.visibility == "public", EmailTeam.is_active.is_(True), EmailTeam.is_personal.is_(False), ~EmailTeam.id.in_(user_team_subquery))
            # Without ORDER BY the database may return rows in any order, so
            # successive skip/limit pages could overlap or miss teams.
            .order_by(EmailTeam.name, EmailTeam.id)
            .offset(skip)
        )
        if limit is not None:
            query = query.limit(limit)

        teams = query.all()
        self.db.commit()  # Release transaction to avoid idle-in-transaction
        return teams

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to discover public teams for {user_email}: {e}")
        return []
async def create_join_request(self, team_id: str, user_email: str, message: Optional[str] = None) -> "EmailTeamJoinRequest":
    """Create, or re-open, a user's request to join a public team.

    If the user already has a request row for this team that is no longer
    pending (or is pending but expired), that row is reset to pending
    instead of a new one being inserted.

    Args:
        team_id: ID of the team to join
        user_email: Email of the user requesting to join
        message: Optional message to team owners

    Returns:
        EmailTeamJoinRequest: Created (or re-opened) join request

    Raises:
        ValueError: If team not found, not public, or user already member/has pending request
    """
    try:
        # The target must exist and be public
        team = await self.get_team_by_id(team_id)
        if not team:
            raise ValueError("Team not found")
        if team.visibility != "public":
            raise ValueError("Can only request to join public teams")

        # Active members have nothing to request
        active_membership = (
            self.db.query(EmailTeamMember)
            .filter(
                EmailTeamMember.team_id == team_id,
                EmailTeamMember.user_email == user_email,
                EmailTeamMember.is_active.is_(True),
            )
            .first()
        )
        if active_membership:
            raise ValueError("User is already a member of this team")

        # Any earlier request for this (team, user), regardless of status
        prior = (
            self.db.query(EmailTeamJoinRequest)
            .filter(
                EmailTeamJoinRequest.team_id == team_id,
                EmailTeamJoinRequest.user_email == user_email,
            )
            .first()
        )

        if not prior:
            # First request for this pair: insert a new row
            join_request = EmailTeamJoinRequest(team_id=team_id, user_email=user_email, message=message, expires_at=utc_now() + timedelta(days=7))
            self.db.add(join_request)
        else:
            if prior.status == "pending" and not prior.is_expired():
                raise ValueError("User already has a pending join request for this team")

            # Re-open the cancelled/rejected/expired row and clear its review data
            prior.message = message or ""
            prior.status = "pending"
            prior.requested_at = utc_now()
            prior.expires_at = utc_now() + timedelta(days=7)
            prior.reviewed_at = None
            prior.reviewed_by = None
            prior.notes = None
            join_request = prior

        self.db.commit()
        self.db.refresh(join_request)

        logger.info(f"Created join request for user {user_email} to team {team_id}")
        return join_request

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to create join request: {e}")
        raise
async def list_join_requests(self, team_id: str) -> List["EmailTeamJoinRequest"]:
    """Return a team's pending join requests, most recently requested first.

    Args:
        team_id: ID of the team

    Returns:
        List[EmailTeamJoinRequest]: Pending join requests; an empty list when
        the query fails (the error is logged).
    """
    try:
        pending_query = self.db.query(EmailTeamJoinRequest).filter(
            EmailTeamJoinRequest.team_id == team_id,
            EmailTeamJoinRequest.status == "pending",
        )
        return pending_query.order_by(EmailTeamJoinRequest.requested_at.desc()).all()
    except Exception as e:
        logger.error(f"Failed to list join requests for team {team_id}: {e}")
        return []
async def approve_join_request(self, request_id: str, approved_by: str) -> Optional[EmailTeamMember]:
    """Approve a pending join request and add the requester to the team.

    The new membership always has the "member" role. RBAC role assignment and
    cache invalidation afterwards are best-effort: their failures are logged
    and do not undo the approval.

    Args:
        request_id: ID of the join request
        approved_by: Email of the user approving the request

    Returns:
        EmailTeamMember: The newly created team membership

    Raises:
        ValueError: If request not found, expired, already processed, or the
            user has reached the maximum number of teams
    """
    try:
        # Only pending requests can be approved
        join_request = (
            self.db.query(EmailTeamJoinRequest)
            .filter(EmailTeamJoinRequest.id == request_id, EmailTeamJoinRequest.status == "pending")
            .first()
        )
        if not join_request:
            raise ValueError("Join request not found or already processed")

        if join_request.is_expired():
            # Persist the expiry before reporting it
            join_request.status = "expired"
            self.db.commit()
            raise ValueError("Join request has expired")

        requester_email = join_request.user_email
        target_team_id = join_request.team_id

        # Enforce the per-user team limit
        max_teams = getattr(settings, "max_teams_per_user", 50)
        if self._get_user_team_count(requester_email) >= max_teams:
            raise ValueError(f"User has reached the maximum team limit of {max_teams}")

        # New joiners are always members
        member = EmailTeamMember(team_id=target_team_id, user_email=requester_email, role="member", invited_by=approved_by, joined_at=utc_now())
        self.db.add(member)

        # Record the review outcome on the request
        join_request.status = "approved"
        join_request.reviewed_at = utc_now()
        join_request.reviewed_by = approved_by

        # Flush so member.id is populated before the history entry is written
        self.db.flush()
        self._log_team_member_action(member.id, target_team_id, requester_email, member.role, "added", approved_by)

        self.db.refresh(member)

        # Assign team-scoped RBAC role matching the membership role (best-effort)
        try:
            role_name = self._get_rbac_role_name(member.role)
            rbac_role = await self.role_service.get_role_by_name(role_name, scope="team")
            if not rbac_role:
                logger.warning(f"Role '{role_name}' not found. User {requester_email} added without RBAC role.")
            else:
                assignment = await self.role_service.get_user_role_assignment(user_email=requester_email, role_id=rbac_role.id, scope="team", scope_id=target_team_id)
                if assignment and assignment.is_active:
                    logger.debug(f"User {requester_email} already has active {role_name} role for team {target_team_id}")
                else:
                    await self.role_service.assign_role_to_user(user_email=requester_email, role_id=rbac_role.id, scope="team", scope_id=target_team_id, granted_by=approved_by)
                    logger.info(f"Assigned {role_name} role to {requester_email} for team {target_team_id}")
        except Exception as role_error:
            logger.warning(f"Failed to assign role to {requester_email}: {role_error}")

        # Invalidate auth cache for user's team membership and role
        try:
            self._fire_and_forget(auth_cache.invalidate_team(requester_email))
            self._fire_and_forget(auth_cache.invalidate_user_role(requester_email, target_team_id))
            self._fire_and_forget(auth_cache.invalidate_user_teams(requester_email))
            self._fire_and_forget(auth_cache.invalidate_team_membership(requester_email))
            self._fire_and_forget(admin_stats_cache.invalidate_teams())
        except Exception as cache_error:
            logger.debug(f"Failed to invalidate caches on join approval: {cache_error}")

        # Invalidate member count cache for this team
        await self.invalidate_team_member_count_cache(str(target_team_id))

        logger.info(f"Approved join request {request_id}: user {requester_email} joined team {target_team_id}")
        return member

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to approve join request {request_id}: {e}")
        raise
async def reject_join_request(self, request_id: str, rejected_by: str) -> bool:
    """Mark a pending join request as rejected.

    Args:
        request_id: ID of the join request
        rejected_by: Email of the user rejecting the request

    Returns:
        bool: True if request was rejected successfully

    Raises:
        ValueError: If request not found or already processed
    """
    try:
        pending = self.db.query(EmailTeamJoinRequest).filter(
            EmailTeamJoinRequest.id == request_id,
            EmailTeamJoinRequest.status == "pending",
        )
        join_request = pending.first()
        if not join_request:
            raise ValueError("Join request not found or already processed")

        # Record who rejected the request and when
        join_request.status = "rejected"
        join_request.reviewed_at = utc_now()
        join_request.reviewed_by = rejected_by
        self.db.commit()

        logger.info(f"Rejected join request {request_id}: user {join_request.user_email} for team {join_request.team_id}")
        return True

    except Exception as e:
        # Any failure (including the ValueError above) is rolled back, logged and re-raised
        self.db.rollback()
        logger.error(f"Failed to reject join request {request_id}: {e}")
        raise
async def get_user_join_requests(self, user_email: str, team_id: Optional[str] = None) -> List["EmailTeamJoinRequest"]:
    """Return the join requests a user has made, optionally for one team only.

    Requests of every status are returned.

    Args:
        user_email: Email of the user
        team_id: Optional team ID to filter requests

    Returns:
        List[EmailTeamJoinRequest]: Join requests made by the user; an empty
        list when the query fails (the error is logged).

    Examples:
        Get all requests made by a user or for a specific team.
    """
    try:
        by_user = self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.user_email == user_email)
        scoped = by_user.filter(EmailTeamJoinRequest.team_id == team_id) if team_id else by_user
        return scoped.all()
    except Exception as e:
        logger.error(f"Failed to get join requests for user {user_email}: {e}")
        return []
async def cancel_join_request(self, request_id: str, user_email: str) -> bool:
    """Cancel one of the user's own pending join requests.

    Args:
        request_id: ID of the join request to cancel
        user_email: Email of the user canceling the request

    Returns:
        bool: True if canceled successfully, False when the request is not
        found, not owned by the user, not pending, or the update failed

    Examples:
        Allow users to cancel their pending join requests.
    """
    try:
        # The request must belong to this user and still be pending
        own_pending = self.db.query(EmailTeamJoinRequest).filter(
            EmailTeamJoinRequest.id == request_id,
            EmailTeamJoinRequest.user_email == user_email,
            EmailTeamJoinRequest.status == "pending",
        )
        join_request = own_pending.first()

        if join_request:
            # The canceling user is recorded as the reviewer
            join_request.status = "cancelled"
            join_request.reviewed_at = utc_now()
            join_request.reviewed_by = user_email
            self.db.commit()

            logger.info(f"Cancelled join request {request_id} by user {user_email}")
            return True

        logger.warning(f"Join request {request_id} not found for user {user_email} or not pending")
        return False

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to cancel join request {request_id}: {e}")
        return False
1735 # ==================================================================================
1736 # Batch Query Methods (N+1 Query Elimination - Issue #1892)
1737 # ==================================================================================
def get_member_counts_batch(self, team_ids: List[str]) -> Dict[str, int]:
    """Get member counts for multiple teams in a single query.

    This is a synchronous method following the existing service pattern.
    Note: Like other sync SQLAlchemy calls, this will block the event
    loop in async contexts. For typical team counts this is acceptable.

    Args:
        team_ids: List of team UUIDs

    Returns:
        Dict mapping team_id to the number of active members; teams without
        any active member map to 0. An empty input yields an empty dict
        without touching the database.

    Raises:
        Exception: Re-raises any database errors after rollback

    Examples:
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> service.get_member_counts_batch([])
        {}
    """
    if not team_ids:
        return {}

    try:
        # Single grouped query for all teams. The aggregate is labelled
        # "member_count" rather than "count": result rows are tuple-like and
        # already expose a count() method, which attribute access would hit
        # instead of the column value.
        results = (
            self.db.query(EmailTeamMember.team_id, func.count(EmailTeamMember.id).label("member_count"))  # pylint: disable=not-callable
            .filter(EmailTeamMember.team_id.in_(team_ids), EmailTeamMember.is_active.is_(True))
            .group_by(EmailTeamMember.team_id)
            .all()
        )

        self.db.commit()  # Release transaction to avoid idle-in-transaction

        # Build result dict, defaulting to 0 for teams with no members
        counts = {str(row.team_id): row.member_count for row in results}
        return {tid: counts.get(tid, 0) for tid in team_ids}
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get member counts for teams: {e}")
        raise
def get_user_roles_batch(self, user_email: str, team_ids: List[str]) -> Dict[str, Optional[str]]:
    """Look up one user's role in several teams using a single query.

    Only active memberships are considered.

    Args:
        user_email: Email of the user
        team_ids: List of team UUIDs

    Returns:
        Dict with one entry per requested team_id holding the user's role,
        or None when no active membership row was found for that team

    Raises:
        Exception: Re-raises any database errors after rollback
    """
    # Nothing requested: answer without touching the database.
    if not team_ids:
        return {}

    try:
        # One query covering every requested team.
        membership_query = self.db.query(EmailTeamMember.team_id, EmailTeamMember.role).filter(
            EmailTeamMember.user_email == user_email,
            EmailTeamMember.team_id.in_(team_ids),
            EmailTeamMember.is_active.is_(True),
        )
        rows = membership_query.all()

        # Release transaction to avoid idle-in-transaction
        self.db.commit()

        # Index by stringified team id; teams without a row map to None.
        role_by_team: Dict[str, Optional[str]] = {}
        for row in rows:
            role_by_team[str(row.team_id)] = row.role
        return {team_id: role_by_team.get(team_id) for team_id in team_ids}
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get user roles for {user_email}: {e}")
        raise
def get_pending_join_requests_batch(self, user_email: str, team_ids: List[str]) -> Dict[str, Optional[Any]]:
    """Fetch a user's pending join requests for several teams in one query.

    Args:
        user_email: Email of the user
        team_ids: List of team UUIDs to check

    Returns:
        Dict with one entry per requested team_id holding the pending
        EmailTeamJoinRequest, or None if no pending request was found

    Raises:
        Exception: Re-raises any database errors after rollback
    """
    # Nothing requested: answer without touching the database.
    if not team_ids:
        return {}

    try:
        # One query covering every requested team.
        pending_query = self.db.query(EmailTeamJoinRequest).filter(
            EmailTeamJoinRequest.user_email == user_email,
            EmailTeamJoinRequest.team_id.in_(team_ids),
            EmailTeamJoinRequest.status == "pending",
        )
        requests = pending_query.all()

        # Release transaction to avoid idle-in-transaction
        self.db.commit()

        # Only one pending request per team is expected; should several be
        # returned, the last one in result order is the one kept.
        request_by_team: Dict[str, Any] = {}
        for join_request in requests:
            request_by_team[str(join_request.team_id)] = join_request
        return {team_id: request_by_team.get(team_id) for team_id in team_ids}
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get pending join requests for {user_email}: {e}")
        raise
1849 # ==================================================================================
1850 # Cached Batch Methods (Redis caching for member counts)
1851 # ==================================================================================
def _get_member_count_cache_key(self, team_id: str) -> str:
    """Compose the Redis key under which a team's member count is cached.

    The prefix is read from settings.cache_prefix so keys stay consistent
    with other cache entries; "mcpgw:" is used when the setting is absent.

    Args:
        team_id: Team UUID to build cache key for

    Returns:
        Cache key string in format "{prefix}team:member_count:{team_id}"
    """
    prefix = getattr(settings, "cache_prefix", "mcpgw:")
    key_suffix = f"team:member_count:{team_id}"
    return f"{prefix}{key_suffix}"
async def get_member_counts_batch_cached(self, team_ids: List[str]) -> Dict[str, int]:
    """Get member counts for multiple teams, using Redis cache with DB fallback.

    Caching behavior is controlled by settings:
    - team_member_count_cache_enabled: Enable/disable caching (default: True)
    - team_member_count_cache_ttl: Cache TTL in seconds (default: 300)

    Every database lookup (caching disabled, Redis unavailable, or cache
    misses) goes through get_member_counts_batch, so the grouped query,
    commit, rollback and error logging live in exactly one place.

    Args:
        team_ids: List of team UUIDs

    Returns:
        Dict mapping team_id to member count

    Raises:
        Exception: Re-raises any database errors after rollback
    """
    if not team_ids:
        return {}

    cache_enabled = getattr(settings, "team_member_count_cache_enabled", True)
    cache_ttl = getattr(settings, "team_member_count_cache_ttl", 300)

    # If caching disabled, go straight to batch DB query
    if not cache_enabled:
        return self.get_member_counts_batch(team_ids)

    # Redis is strictly best-effort: failing to obtain a client simply
    # means every id is treated as a cache miss.
    try:
        redis_client = await get_redis_client()
    except Exception:
        redis_client = None

    result: Dict[str, int] = {}
    cache_misses: List[str] = []

    # Step 1: Check Redis cache for all team IDs
    if redis_client:
        try:
            cache_keys = [self._get_member_count_cache_key(tid) for tid in team_ids]
            cached_values = await redis_client.mget(cache_keys)

            for tid, cached in zip(team_ids, cached_values):
                if cached is not None:
                    result[tid] = int(cached)
                else:
                    cache_misses.append(tid)
        except Exception as e:
            # A failed read (or an unparsable cached value) discards any
            # partial hits and reloads everything from the database.
            logger.warning(f"Redis cache read failed, falling back to DB: {e}")
            result = {}
            cache_misses = list(team_ids)
    else:
        # No Redis available, fall back to DB
        cache_misses = list(team_ids)

    # Step 2: Query database for cache misses. The batch helper returns an
    # entry for every requested id (0 when a team has no active members)
    # and performs its own rollback/logging before re-raising.
    if cache_misses:
        db_counts = self.get_member_counts_batch(cache_misses)
        result.update(db_counts)

        # Step 3: Cache the fresh values with configured TTL. Iterating the
        # dict writes each team once even if the caller repeated an id.
        if redis_client:
            for tid, count in db_counts.items():
                try:
                    await redis_client.setex(self._get_member_count_cache_key(tid), cache_ttl, str(count))
                except Exception as e:
                    logger.warning(f"Redis cache write failed for team {tid}: {e}")

    return result
async def invalidate_team_member_count_cache(self, team_id: str) -> None:
    """Drop the cached member count of a single team from Redis.

    Call this after any membership changes (add/remove/update).
    Does nothing when caching is disabled or no Redis client is available;
    errors raised while invalidating are logged as warnings, not raised.

    Args:
        team_id: Team UUID to invalidate
    """
    if not getattr(settings, "team_member_count_cache_enabled", True):
        return

    try:
        client = await get_redis_client()
        if not client:
            return
        cache_key = self._get_member_count_cache_key(team_id)
        await client.delete(cache_key)
    except Exception as e:
        logger.warning(f"Failed to invalidate member count cache for team {team_id}: {e}")