Coverage for mcpgateway / services / team_management_service.py: 100%
679 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/team_management_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Team Management Service.
8This module provides team creation, management, and membership operations
9for the multi-team collaboration system.
11Examples:
12 >>> from unittest.mock import Mock
13 >>> service = TeamManagementService(Mock())
14 >>> isinstance(service, TeamManagementService)
15 True
16 >>> hasattr(service, 'db')
17 True
18"""
20# Standard
21import asyncio
22import base64
23from datetime import datetime, timedelta
24from typing import Any, Dict, List, Optional, Tuple, Union
26# Third-Party
27import orjson
28from sqlalchemy import and_, desc, func, or_, select
29from sqlalchemy.orm import selectinload, Session
31# First-Party
32from mcpgateway.cache.admin_stats_cache import admin_stats_cache
33from mcpgateway.cache.auth_cache import auth_cache
34from mcpgateway.config import settings
35from mcpgateway.db import EmailTeam, EmailTeamJoinRequest, EmailTeamMember, EmailTeamMemberHistory, EmailUser, utc_now
36from mcpgateway.services.logging_service import LoggingService
37from mcpgateway.utils.pagination import unified_paginate
# Initialize logging: a module-level LoggingService instance, and the logger
# requested from it under this module's name (used by every method below).
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)
44class TeamManagementService:
45 """Service for team management operations.
47 This service handles team creation, membership management,
48 role assignments, and team access control.
50 Attributes:
51 db (Session): SQLAlchemy database session
53 Examples:
54 >>> from unittest.mock import Mock
55 >>> service = TeamManagementService(Mock())
56 >>> service.__class__.__name__
57 'TeamManagementService'
58 >>> hasattr(service, 'db')
59 True
60 """
def __init__(self, db: Session):
    """Bind the service to a database session.

    Args:
        db: SQLAlchemy session stored on the instance as ``self.db``.

    Examples:
        >>> from mcpgateway.services.team_management_service import TeamManagementService
        >>> from unittest.mock import Mock
        >>> session = Mock()
        >>> svc = TeamManagementService(session)
        >>> svc.db is session
        True
        >>> type(svc).__name__
        'TeamManagementService'
    """
    self.db = db
def _log_team_member_action(self, team_member_id: str, team_id: str, user_email: str, role: str, action: str, action_by: Optional[str]):
    """Persist one EmailTeamMemberHistory row describing a membership change.

    The row is added to the session and committed immediately, stamped with
    the current ``utc_now()`` value.

    Args:
        team_member_id: ID of the EmailTeamMember the action applies to
        team_id: ID of the team
        user_email: Email of the affected user
        role: Role held at the time of the action
        action: Action label, e.g. "added", "removed", "reactivated", "role_changed"
        action_by: Email of the acting user (may be None)

    Examples:
        >>> from mcpgateway.services.team_management_service import TeamManagementService
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> service._log_team_member_action("tm-123", "team-123", "user@example.com", "member", "added", "admin@example.com")
    """
    entry = EmailTeamMemberHistory(
        team_member_id=team_member_id,
        team_id=team_id,
        user_email=user_email,
        role=role,
        action=action,
        action_by=action_by,
        action_timestamp=utc_now(),
    )
    self.db.add(entry)
    self.db.commit()
async def create_team(self, name: str, description: Optional[str], created_by: str, visibility: Optional[str] = "public", max_members: Optional[int] = None) -> EmailTeam:
    """Create a team, or revive an inactive team that has the same slug.

    In both cases the creator ends up with an active ``owner`` membership.
    After the commit, the member-count cache and the creator's auth caches
    are invalidated; failures of the auth/admin cache calls are only logged.

    Args:
        name: Team name
        description: Team description (may be None)
        created_by: Email of the user creating the team
        visibility: Team visibility, "private" or "public"
        max_members: Member limit; when None, ``settings.max_members_per_team``
            is used, falling back to 100 if that setting is absent

    Returns:
        EmailTeam: The created or reactivated team

    Raises:
        ValueError: If ``visibility`` is not one of the accepted values
        Exception: Any other failure; the session is rolled back, the error
            is logged and then re-raised

    Examples:
        >>> import asyncio
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> asyncio.iscoroutinefunction(service.create_team)
        True

        Accepted visibility values:
        >>> "private" in ["private", "public"]
        True
        >>> "invalid" in ["private", "public"]
        False
    """
    try:
        # Validate visibility
        allowed_visibilities = ["private", "public"]
        if visibility not in allowed_visibilities:
            raise ValueError(f"Invalid visibility. Must be one of: {', '.join(allowed_visibilities)}")

        # Apply default max members from settings
        member_cap = max_members if max_members is not None else getattr(settings, "max_members_per_team", 100)

        # First-Party
        from mcpgateway.utils.create_slug import slugify  # pylint: disable=import-outside-toplevel

        # Look for a soft-deleted team that would collide on slug
        slug = slugify(name)
        dormant_team = self.db.query(EmailTeam).filter(EmailTeam.slug == slug, EmailTeam.is_active.is_(False)).first()

        if not dormant_team:
            # Create the team (slug will be auto-generated by event listener)
            team = EmailTeam(name=name, description=description, created_by=created_by, is_personal=False, visibility=visibility, max_members=member_cap, is_active=True)
            self.db.add(team)

            self.db.flush()  # Get the team ID

            # Add the creator as owner
            self.db.add(EmailTeamMember(team_id=team.id, user_email=created_by, role="owner", joined_at=utc_now(), is_active=True))
        else:
            # Reactivate the existing team with new details
            team = dormant_team
            team.name = name
            team.description = description
            team.created_by = created_by
            team.visibility = visibility
            team.max_members = member_cap
            team.is_active = True
            team.updated_at = utc_now()

            # The creator may already have a membership row on this team
            prior_membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team.id, EmailTeamMember.user_email == created_by).first()

            if prior_membership:
                # Reactivate existing membership as owner
                prior_membership.role = "owner"
                prior_membership.joined_at = utc_now()
                prior_membership.is_active = True
            else:
                # Create new membership
                self.db.add(EmailTeamMember(team_id=team.id, user_email=created_by, role="owner", joined_at=utc_now(), is_active=True))

            logger.info(f"Reactivated existing team with slug {slug}")

        self.db.commit()

        # Invalidate member count cache for the new team
        await self.invalidate_team_member_count_cache(str(team.id))

        # Invalidate auth cache for creator's team membership
        # Without this, the cache won't know the user belongs to this new team
        try:
            await auth_cache.invalidate_user_teams(created_by)
            await auth_cache.invalidate_team_membership(created_by)
            await auth_cache.invalidate_user_role(created_by, str(team.id))
            await admin_stats_cache.invalidate_teams()
        except Exception as cache_error:
            logger.debug(f"Failed to invalidate cache on team create: {cache_error}")

        logger.info(f"Created team '{team.name}' by {created_by}")
        return team

    except Exception as exc:
        self.db.rollback()
        logger.error(f"Failed to create team '{name}': {exc}")
        raise
async def get_team_by_id(self, team_id: str) -> Optional[EmailTeam]:
    """Look up an active team by its ID.

    Args:
        team_id: Team ID to look up

    Returns:
        EmailTeam: The matching active team, or None when no row matches or
        the query fails (the failure is logged and the session rolled back)

    Examples:
        >>> import asyncio
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> asyncio.iscoroutinefunction(service.get_team_by_id)
        True
    """
    try:
        active_by_id = self.db.query(EmailTeam).filter(EmailTeam.id == team_id, EmailTeam.is_active.is_(True))
        found = active_by_id.first()
        self.db.commit()  # Release transaction to avoid idle-in-transaction
    except Exception as exc:
        self.db.rollback()
        logger.error(f"Failed to get team by ID {team_id}: {exc}")
        return None
    return found
async def get_team_by_slug(self, slug: str) -> Optional[EmailTeam]:
    """Look up an active team by its slug.

    Args:
        slug: Team slug to look up

    Returns:
        EmailTeam: The matching active team, or None when no row matches or
        the query fails (the failure is logged and the session rolled back)

    Examples:
        >>> import asyncio
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> asyncio.iscoroutinefunction(service.get_team_by_slug)
        True
    """
    try:
        active_by_slug = self.db.query(EmailTeam).filter(EmailTeam.slug == slug, EmailTeam.is_active.is_(True))
        found = active_by_slug.first()
        self.db.commit()  # Release transaction to avoid idle-in-transaction
    except Exception as exc:
        self.db.rollback()
        logger.error(f"Failed to get team by slug {slug}: {exc}")
        return None
    return found
async def update_team(
    self, team_id: str, name: Optional[str] = None, description: Optional[str] = None, visibility: Optional[str] = None, max_members: Optional[int] = None, updated_by: Optional[str] = None
) -> bool:
    """Update team information.

    Only the arguments that are not None are applied. Personal teams are
    never updated. All input validation happens before the team object is
    modified, so a rejected call leaves no pending changes in the session.

    Args:
        team_id: ID of the team to update
        name: New team name
        description: New team description
        visibility: New visibility setting ("private" or "public")
        max_members: New maximum member limit
        updated_by: Email of user making the update (used for logging only)

    Returns:
        bool: True if the update was committed; False if the team was not
        found, is a personal team, or a non-ValueError failure occurred

    Raises:
        ValueError: If visibility setting is invalid

    Examples:
        >>> import asyncio
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> asyncio.iscoroutinefunction(service.update_team)
        True
    """
    try:
        team = await self.get_team_by_id(team_id)
        if not team:
            logger.warning(f"Team {team_id} not found for update")
            return False

        # Prevent updating personal teams
        if team.is_personal:
            logger.warning(f"Cannot update personal team {team_id}")
            return False

        # Validate BEFORE touching the ORM object: the ValueError path below
        # re-raises without a rollback, so any attribute assigned earlier
        # would stay dirty in the session and ride along with a later commit.
        if visibility is not None:
            valid_visibilities = ["private", "public"]
            if visibility not in valid_visibilities:
                raise ValueError(f"Invalid visibility. Must be one of: {', '.join(valid_visibilities)}")

        # Update fields if provided
        if name is not None:
            team.name = name
            # Slug will be updated by event listener if name changes

        if description is not None:
            team.description = description

        if visibility is not None:
            team.visibility = visibility

        if max_members is not None:
            team.max_members = max_members

        team.updated_at = utc_now()
        self.db.commit()

        logger.info(f"Updated team {team_id} by {updated_by}")
        return True

    except ValueError:
        raise  # Let ValueError propagate to caller for proper error handling
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to update team {team_id}: {e}")
        return False
async def delete_team(self, team_id: str, deleted_by: str) -> bool:
    """Delete a team (soft delete).

    The team is marked inactive, every active membership is deactivated and
    one "team-deleted" history row is written per membership. All of this is
    committed in a single transaction, so a failure part-way through rolls
    everything back. Cache invalidation runs after the commit and its
    failures are only logged.

    Args:
        team_id: ID of the team to delete
        deleted_by: Email of user performing deletion

    Returns:
        bool: True if deletion succeeded; False if the team was not found,
        is a personal team, or any error occurred

    Note:
        Attempting to delete a personal team raises ValueError internally,
        but that error is caught by this method's own handler (rollback +
        error log) and reported to the caller as False.

    Examples:
        >>> import asyncio
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> asyncio.iscoroutinefunction(service.delete_team)
        True
    """
    try:
        team = await self.get_team_by_id(team_id)
        if not team:
            logger.warning(f"Team {team_id} not found for deletion")
            return False

        # Prevent deleting personal teams
        if team.is_personal:
            logger.warning(f"Cannot delete personal team {team_id}")
            raise ValueError("Personal teams cannot be deleted")

        # Soft delete the team
        team.is_active = False
        team.updated_at = utc_now()

        # Get all active memberships before deactivating (for history logging)
        memberships = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)).all()

        # Capture emails now: after commit the ORM objects are expired and
        # reading them would issue fresh queries in a new transaction.
        member_emails = [membership.user_email for membership in memberships]

        # Stage history rows in the session instead of calling
        # _log_team_member_action, which commits per row and would persist
        # the team's deactivation before the memberships are deactivated.
        for membership in memberships:
            self.db.add(
                EmailTeamMemberHistory(
                    team_member_id=membership.id,
                    team_id=team_id,
                    user_email=membership.user_email,
                    role=membership.role,
                    action="team-deleted",
                    action_by=deleted_by,
                    action_timestamp=utc_now(),
                )
            )

        # Bulk update: deactivate all memberships in single query instead of looping
        self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)).update({EmailTeamMember.is_active: False}, synchronize_session=False)

        self.db.commit()

        # Invalidate all role caches for this team, plus the team, teams-list
        # and membership caches of every former member. The calls are awaited
        # (not fire-and-forget tasks) so they cannot be garbage-collected
        # mid-flight and their failures are actually observed here.
        try:
            invalidations = [auth_cache.invalidate_team_roles(team_id), admin_stats_cache.invalidate_teams()]
            for email in member_emails:
                invalidations.append(auth_cache.invalidate_team(email))
                invalidations.append(auth_cache.invalidate_user_teams(email))
                invalidations.append(auth_cache.invalidate_team_membership(email))
            for outcome in await asyncio.gather(*invalidations, return_exceptions=True):
                if isinstance(outcome, BaseException):
                    logger.debug(f"Failed to invalidate caches on team delete: {outcome}")
        except Exception as cache_error:
            logger.debug(f"Failed to invalidate caches on team delete: {cache_error}")

        logger.info(f"Deleted team {team_id} by {deleted_by}")
        return True

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to delete team {team_id}: {e}")
        return False
async def add_member_to_team(self, team_id: str, user_email: str, role: str = "member", invited_by: Optional[str] = None) -> bool:
    """Add a member to a team.

    An existing inactive membership row is reactivated instead of creating a
    duplicate. Every successful add/reactivation is recorded in
    EmailTeamMemberHistory via _log_team_member_action, after which the
    user's auth caches and the team's member-count cache are invalidated.

    Args:
        team_id: ID of the team
        user_email: Email of the user to add
        role: Role to assign (owner, member)
        invited_by: Email of user who added this member

    Returns:
        bool: True if member was added successfully, False otherwise

    Note:
        An invalid role or an exceeded member limit raises ValueError
        internally, but that error is caught by this method's own handler
        (rollback + error log) and reported to the caller as False.

    Examples:
        >>> import asyncio
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> asyncio.iscoroutinefunction(service.add_member_to_team)
        True
    """
    try:
        # Validate role
        valid_roles = ["owner", "member"]
        if role not in valid_roles:
            raise ValueError(f"Invalid role. Must be one of: {', '.join(valid_roles)}")

        # Check if team exists
        team = await self.get_team_by_id(team_id)
        if not team:
            logger.warning(f"Team {team_id} not found")
            return False

        # Check if user exists
        user = self.db.query(EmailUser).filter(EmailUser.email == user_email).first()
        if not user:
            logger.warning(f"User {user_email} not found")
            return False

        # Check if user is already a member
        existing_membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email).first()

        if existing_membership and existing_membership.is_active:
            logger.warning(f"User {user_email} is already a member of team {team_id}")
            return False

        # Check team member limit
        if team.max_members:
            current_member_count = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)).count()

            if current_member_count >= team.max_members:
                logger.warning(f"Team {team_id} has reached maximum member limit")
                raise ValueError(f"Team has reached maximum member limit of {team.max_members}")

        # Add or reactivate membership
        if existing_membership:
            existing_membership.is_active = True
            existing_membership.role = role
            existing_membership.joined_at = utc_now()
            existing_membership.invited_by = invited_by
            self.db.commit()
            self._log_team_member_action(existing_membership.id, team_id, user_email, role, "reactivated", invited_by)
        else:
            membership = EmailTeamMember(team_id=team_id, user_email=user_email, role=role, joined_at=utc_now(), invited_by=invited_by, is_active=True)
            self.db.add(membership)
            self.db.commit()
            self._log_team_member_action(membership.id, team_id, user_email, role, "added", invited_by)

        # Invalidate auth cache for user's team membership and role.
        # Awaited (not fire-and-forget tasks): unreferenced tasks can be
        # garbage-collected before they finish, and their exceptions would
        # never reach this try/except.
        try:
            outcomes = await asyncio.gather(
                auth_cache.invalidate_team(user_email),
                auth_cache.invalidate_user_role(user_email, team_id),
                auth_cache.invalidate_user_teams(user_email),
                auth_cache.invalidate_team_membership(user_email),
                admin_stats_cache.invalidate_teams(),
                return_exceptions=True,
            )
            for outcome in outcomes:
                if isinstance(outcome, BaseException):
                    logger.debug(f"Failed to invalidate cache on team add: {outcome}")
        except Exception as cache_error:
            logger.debug(f"Failed to invalidate cache on team add: {cache_error}")

        # Invalidate member count cache for this team
        await self.invalidate_team_member_count_cache(str(team_id))

        logger.info(f"Added {user_email} to team {team_id} with role {role}")
        return True

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to add {user_email} to team {team_id}: {e}")
        return False
async def remove_member_from_team(self, team_id: str, user_email: str, removed_by: Optional[str] = None) -> bool:
    """Remove a member from a team (soft delete of the membership).

    The removal is recorded in EmailTeamMemberHistory via
    _log_team_member_action, after which the user's auth caches and the
    team's member-count cache are invalidated.

    Args:
        team_id: ID of the team
        user_email: Email of the user to remove
        removed_by: Email of user performing the removal

    Returns:
        bool: True if member was removed successfully; False if the team or
        membership was not found, the team is personal, or an error occurred

    Note:
        Removing the last owner raises ValueError internally, but that error
        is caught by this method's own handler (rollback + error log) and
        reported to the caller as False.

    Examples:
        >>> import asyncio
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> asyncio.iscoroutinefunction(service.remove_member_from_team)
        True
    """
    try:
        team = await self.get_team_by_id(team_id)
        if not team:
            logger.warning(f"Team {team_id} not found")
            return False

        # Prevent removing members from personal teams
        if team.is_personal:
            logger.warning(f"Cannot remove members from personal team {team_id}")
            return False

        # Find the membership
        membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).first()

        if not membership:
            logger.warning(f"User {user_email} is not a member of team {team_id}")
            return False

        # Prevent removing the last owner
        if membership.role == "owner":
            owner_count = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.role == "owner", EmailTeamMember.is_active.is_(True)).count()

            if owner_count <= 1:
                logger.warning(f"Cannot remove the last owner from team {team_id}")
                raise ValueError("Cannot remove the last owner from a team")

        # Remove membership (soft delete)
        membership.is_active = False
        self.db.commit()
        self._log_team_member_action(membership.id, team_id, user_email, membership.role, "removed", removed_by)

        # Invalidate auth cache for user's team membership and role.
        # Awaited (not fire-and-forget tasks): unreferenced tasks can be
        # garbage-collected before they finish, and their exceptions would
        # never reach this try/except.
        try:
            outcomes = await asyncio.gather(
                auth_cache.invalidate_team(user_email),
                auth_cache.invalidate_user_role(user_email, team_id),
                auth_cache.invalidate_user_teams(user_email),
                auth_cache.invalidate_team_membership(user_email),
                return_exceptions=True,
            )
            for outcome in outcomes:
                if isinstance(outcome, BaseException):
                    logger.debug(f"Failed to invalidate cache on team remove: {outcome}")
        except Exception as cache_error:
            logger.debug(f"Failed to invalidate cache on team remove: {cache_error}")

        # Invalidate member count cache for this team
        await self.invalidate_team_member_count_cache(str(team_id))

        logger.info(f"Removed {user_email} from team {team_id} by {removed_by}")
        return True

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to remove {user_email} from team {team_id}: {e}")
        return False
async def update_member_role(self, team_id: str, user_email: str, new_role: str, updated_by: Optional[str] = None) -> bool:
    """Update a team member's role.

    The change is recorded in EmailTeamMemberHistory via
    _log_team_member_action and the user's cached role for the team is
    invalidated afterwards.

    Args:
        team_id: ID of the team
        user_email: Email of the user whose role to update
        new_role: New role to assign (owner, member)
        updated_by: Email of user making the change

    Returns:
        bool: True if role was updated successfully; False if the team or
        membership was not found, the team is personal, or a non-ValueError
        failure occurred

    Raises:
        ValueError: If role is invalid or the change would demote the last owner

    Examples:
        >>> import asyncio
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> asyncio.iscoroutinefunction(service.update_member_role)
        True
    """
    try:
        # Validate role
        valid_roles = ["owner", "member"]
        if new_role not in valid_roles:
            raise ValueError(f"Invalid role. Must be one of: {', '.join(valid_roles)}")

        team = await self.get_team_by_id(team_id)
        if not team:
            logger.warning(f"Team {team_id} not found")
            return False

        # Prevent updating roles in personal teams
        if team.is_personal:
            logger.warning(f"Cannot update roles in personal team {team_id}")
            return False

        # Find the membership
        membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).first()

        if not membership:
            logger.warning(f"User {user_email} is not a member of team {team_id}")
            return False

        # Prevent changing the role of the last owner to non-owner
        if membership.role == "owner" and new_role != "owner":
            owner_count = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.role == "owner", EmailTeamMember.is_active.is_(True)).count()

            if owner_count <= 1:
                logger.warning(f"Cannot remove owner role from the last owner of team {team_id}")
                raise ValueError("Cannot remove owner role from the last owner of a team")

        # Update the role
        membership.role = new_role
        self.db.commit()
        self._log_team_member_action(membership.id, team_id, user_email, new_role, "role_changed", updated_by)

        # Invalidate role cache. Awaited rather than scheduled as an
        # unreferenced task, so the call cannot be garbage-collected
        # mid-flight and a failure is actually caught and logged here.
        try:
            await auth_cache.invalidate_user_role(user_email, team_id)
        except Exception as cache_error:
            logger.debug(f"Failed to invalidate cache on role update: {cache_error}")

        logger.info(f"Updated role of {user_email} in team {team_id} to {new_role} by {updated_by}")
        return True

    except ValueError:
        raise  # Let ValueError propagate to caller for proper error handling
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to update role of {user_email} in team {team_id}: {e}")
        return False
async def get_member(self, team_id: str, user_email: str) -> Optional[EmailTeamMember]:
    """Get a single team member by team ID and user email.

    Follows the same session discipline as the other read methods of this
    service: commit after the read to release the transaction, roll back
    if the query fails.

    Args:
        team_id: ID of the team
        user_email: Email of the user

    Returns:
        EmailTeamMember if found and active, None otherwise (including when
        the query fails; the failure is logged)
    """
    try:
        member = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).first()
        self.db.commit()  # Release transaction to avoid idle-in-transaction
        return member
    except Exception as e:
        # Roll back so the session is usable for the caller's next statement
        self.db.rollback()
        logger.error(f"Failed to get member {user_email} in team {team_id}: {e}")
        return None
async def get_user_teams(self, user_email: str, include_personal: bool = True) -> List[EmailTeam]:
    """Return the active teams in which the user has an active membership.

    When an auth cache is available, team IDs are cached under the key
    ``"<user_email>:<include_personal>"``. On a cache hit only the team rows
    for those IDs are fetched; if that fetch fails the method falls back to
    the full membership query.

    Args:
        user_email: Email of the user
        include_personal: Whether to include personal teams

    Returns:
        List[EmailTeam]: Teams the user belongs to; an empty list when the
        user has none or the full query fails (the failure is logged)

    Examples:
        >>> import asyncio
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> asyncio.iscoroutinefunction(service.get_user_teams)
        True
    """
    team_cache = self._get_auth_cache()
    key = f"{user_email}:{include_personal}"

    # Cache lookup (skipped entirely when no cache is available)
    cached_ids = await team_cache.get_user_teams(key) if team_cache else None
    if cached_ids is not None:
        if not cached_ids:  # Empty list = user has no teams
            return []
        # Fetch full team objects by IDs (fast indexed lookup)
        try:
            hits = self.db.query(EmailTeam).filter(EmailTeam.id.in_(cached_ids), EmailTeam.is_active.is_(True)).all()
            self.db.commit()  # Release transaction to avoid idle-in-transaction
            return hits
        except Exception as exc:
            self.db.rollback()
            logger.warning(f"Failed to fetch teams by IDs from cache: {exc}")
            # Fall through to full query

    # Cache miss or caching disabled - do full query
    try:
        membership_query = self.db.query(EmailTeam).join(EmailTeamMember).filter(EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True), EmailTeam.is_active.is_(True))
        if not include_personal:
            membership_query = membership_query.filter(EmailTeam.is_personal.is_(False))

        found = membership_query.all()
        self.db.commit()  # Release transaction to avoid idle-in-transaction

        # Update cache with team IDs
        if team_cache:
            await team_cache.set_user_teams(key, [team.id for team in found])

        return found

    except Exception as exc:
        self.db.rollback()
        logger.error(f"Failed to get teams for user {user_email}: {exc}")
        return []
async def verify_team_for_user(self, user_email, team_id=None):
    """Resolve or verify a team ID for a user.

    Loads the active teams in which the user has an active membership, then:

    - with no ``team_id``: picks the user's personal team, if any;
    - with a ``team_id``: checks that it is one of the user's teams.

    Args:
        user_email (str): The email of the user whose teams are being queried.
        team_id (str or None, optional): Specific team ID to check for membership. Defaults to None.

    Returns:
        str or None or list:
            - If `team_id` is not given: the ID of the user's personal team, or None if there is none.
            - If `team_id` is given and the user is a member of that team: `team_id` unchanged.
            - An empty list ``[]`` if the user is not a member of the given team, or if the
              team lookup query fails. Callers must treat ``[]`` as "verification failed".
            - If an unexpected error occurs after the lookup, the current `team_id` value
              (normalised to None when falsy).

    Raises:
        None explicitly; exceptions during the process are caught and logged.

    Examples:
        >>> import asyncio
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> asyncio.iscoroutinefunction(service.verify_team_for_user)
        True
    """
    try:
        # Get all teams the user belongs to in a single query
        try:
            query = self.db.query(EmailTeam).join(EmailTeamMember).filter(EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True), EmailTeam.is_active.is_(True))
            user_teams = query.all()
            self.db.commit()  # Release transaction to avoid idle-in-transaction
        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to get teams for user {user_email}: {e}")
            return []

        if not team_id:
            # If no team_id is provided, try to get the personal team
            personal_team = next((t for t in user_teams if getattr(t, "is_personal", False)), None)
            team_id = personal_team.id if personal_team else None
        else:
            # Check if the provided team_id exists among the user's teams
            is_team_present = any(team.id == team_id for team in user_teams)
            if not is_team_present:
                return []
    except Exception as e:
        self.db.rollback()
        # Report through the module logger (not stdout) like the rest of the service
        logger.error(f"Failed to verify team for user {user_email}: {e}")
        if not team_id:
            team_id = None

    return team_id
async def get_team_members(
    self,
    team_id: str,
    cursor: Optional[str] = None,
    limit: Optional[int] = None,
    page: Optional[int] = None,
    per_page: Optional[int] = None,
) -> Union[List[Tuple[EmailUser, EmailTeamMember]], Tuple[List[Tuple[EmailUser, EmailTeamMember]], Optional[str]], Dict[str, Any]]:
    """Get all members of a team with optional cursor or page-based pagination.

    Note: This method returns ORM objects and cannot be cached since callers
    depend on ORM attributes and methods.

    The return shape is chosen by the arguments and is the same on success
    and on failure (failures are logged and yield the empty form):

    - ``page`` given: page mode, a dict.
    - otherwise ``cursor`` or ``limit`` given: cursor mode, a 2-tuple.
    - none of them given: a plain list (backward compatibility).

    Args:
        team_id: ID of the team
        cursor: Opaque cursor token for cursor-based pagination. A token that cannot be
            decoded is ignored (with a warning) and the first page is returned.
        limit: Maximum number of members to return (for cursor-based, default: 50,
            capped at ``settings.pagination_max_page_size``)
        page: Page number for page-based pagination (1-indexed). Takes precedence over cursor/limit.
        per_page: Items per page for page-based pagination (default: 30)

    Returns:
        - Page mode: Dict with keys 'data', 'pagination', 'links'
        - Cursor mode: Tuple (members, next_cursor); next_cursor is None on the last page
        - Neither: List of all (EmailUser, EmailTeamMember) tuples

    Examples:
        >>> import asyncio
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> asyncio.iscoroutinefunction(service.get_team_members)
        True
    """
    try:
        # Build base query - for pagination, select EmailTeamMember and eager-load user
        # For backward compat (no pagination), select both entities as tuple
        if cursor is None and page is None and limit is None:
            # Backward compatibility: return tuples (no pagination requested)
            query = (
                select(EmailUser, EmailTeamMember)
                .join(EmailTeamMember, EmailUser.email == EmailTeamMember.user_email)
                .where(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True))
                .order_by(EmailUser.full_name, EmailUser.email)
            )
            result = self.db.execute(query)
            members = list(result.all())
            self.db.commit()
            return members

        # For pagination: select EmailTeamMember and eager-load user to avoid N+1
        query = (
            select(EmailTeamMember)
            .options(selectinload(EmailTeamMember.user))
            .where(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True))
            .join(EmailUser, EmailUser.email == EmailTeamMember.user_email)
        )

        # PAGE-BASED PAGINATION (Admin UI) - use unified_paginate
        if page is not None:
            # Alphabetical ordering for user-friendly display
            query = query.order_by(EmailUser.full_name, EmailUser.email)
            pag_result = await unified_paginate(
                db=self.db,
                query=query,
                page=page,
                per_page=per_page or 30,
                cursor=None,
                limit=None,
                base_url=f"/admin/teams/{team_id}/members",
                query_params={},
            )
            self.db.commit()
            memberships = pag_result["data"]
            tuples = [(m.user, m) for m in memberships]
            return {
                "data": tuples,
                "pagination": pag_result["pagination"],
                "links": pag_result["links"],
            }

        # CURSOR-BASED PAGINATION (API) - custom implementation using (joined_at, id)
        # unified_paginate uses created_at which doesn't exist on EmailTeamMember

        # Order by joined_at DESC, id DESC for keyset pagination
        query = query.order_by(desc(EmailTeamMember.joined_at), desc(EmailTeamMember.id))

        # Decode cursor and apply keyset filter
        if cursor:
            try:
                cursor_json = base64.urlsafe_b64decode(cursor.encode()).decode()
                cursor_data = orjson.loads(cursor_json)
                last_id = cursor_data.get("id")
                joined_str = cursor_data.get("joined_at")
                if last_id and joined_str:
                    last_joined = datetime.fromisoformat(joined_str)
                    # Keyset filter: (joined_at < last) OR (joined_at = last AND id < last_id)
                    query = query.where(
                        or_(
                            EmailTeamMember.joined_at < last_joined,
                            and_(EmailTeamMember.joined_at == last_joined, EmailTeamMember.id < last_id),
                        )
                    )
            except (ValueError, TypeError, AttributeError) as e:
                # AttributeError: the token decoded to JSON that is not an
                # object (e.g. a list), so it has no .get(); treat it like
                # any other malformed cursor instead of failing the call.
                logger.warning(f"Invalid cursor for team members, ignoring: {e}")

        # Fetch limit + 1 to check for more results (cap at max_page_size)
        page_size = min(limit or 50, settings.pagination_max_page_size)
        query = query.limit(page_size + 1)
        memberships = list(self.db.execute(query).scalars().all())

        # Check if there are more results
        has_more = len(memberships) > page_size
        if has_more:
            memberships = memberships[:page_size]

        # Generate next cursor using (joined_at, id)
        next_cursor = None
        if has_more and memberships:
            last_member = memberships[-1]
            cursor_data = {
                "joined_at": last_member.joined_at.isoformat() if last_member.joined_at else None,
                "id": last_member.id,
            }
            next_cursor = base64.urlsafe_b64encode(orjson.dumps(cursor_data)).decode()

        self.db.commit()
        tuples = [(m.user, m) for m in memberships]
        return (tuples, next_cursor)

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get members for team {team_id}: {e}")

        # Return appropriate empty response based on mode
        if page is not None:
            return {"data": [], "pagination": {"page": page, "per_page": per_page or 30, "total": 0, "has_next": False, "has_prev": False}, "links": None}

        # Cursor mode is entered by cursor OR limit (see the mode check at
        # the top), so a first-page request with only `limit` must also get
        # the tuple shape that callers unpack.
        if cursor is not None or limit is not None:
            return ([], None)

        return []
def count_team_owners(self, team_id: str) -> int:
    """Return how many active owners a team has.

    The count is computed by the database (``Query.count()``) rather than by
    loading membership rows into Python.

    Args:
        team_id: ID of the team

    Returns:
        int: Number of active memberships with role ``"owner"`` for the team
    """
    owner_criteria = (
        EmailTeamMember.team_id == team_id,
        EmailTeamMember.role == "owner",
        EmailTeamMember.is_active.is_(True),
    )
    owner_total = self.db.query(EmailTeamMember).filter(*owner_criteria).count()
    # End the read transaction so the connection is not left idle-in-transaction.
    self.db.commit()
    return owner_total
def _get_auth_cache(self):
    """Resolve the auth cache on demand.

    The import is performed inside the method so that a missing cache module
    degrades to "no cache" instead of failing.

    Returns:
        The object returned by ``get_auth_cache()``, or None when an
        ImportError is raised while importing or calling it.
    """
    auth_cache_instance = None
    try:
        # First-Party
        from mcpgateway.cache.auth_cache import get_auth_cache  # pylint: disable=import-outside-toplevel

        auth_cache_instance = get_auth_cache()
    except ImportError:
        auth_cache_instance = None
    return auth_cache_instance
def _get_admin_stats_cache(self):
    """Resolve the admin stats cache on demand.

    The import is performed inside the method so that a missing cache module
    degrades to "no cache" instead of failing.

    Returns:
        The object returned by ``get_admin_stats_cache()``, or None when an
        ImportError is raised while importing or calling it.
    """
    stats_cache_instance = None
    try:
        # First-Party
        from mcpgateway.cache.admin_stats_cache import get_admin_stats_cache  # pylint: disable=import-outside-toplevel

        stats_cache_instance = get_admin_stats_cache()
    except ImportError:
        stats_cache_instance = None
    return stats_cache_instance
async def get_user_role_in_team(self, user_email: str, team_id: str) -> Optional[str]:
    """Look up the role a user holds in a team.

    The auth cache (when available) is consulted first; on a miss the active
    membership row is read from the database and the outcome is written back
    to the cache.

    Args:
        user_email: Email of the user
        team_id: ID of the team

    Returns:
        Optional[str]: The user's role, or None if the user has no active
        membership in the team or the database lookup fails.
    """
    role_cache = self._get_auth_cache()
    if role_cache:
        cached_value = await role_cache.get_user_role(user_email, team_id)
        if cached_value is not None:
            # An empty string is the cached form of "not a member".
            return cached_value or None

    try:
        membership_row = (
            self.db.query(EmailTeamMember)
            .filter(
                EmailTeamMember.user_email == user_email,
                EmailTeamMember.team_id == team_id,
                EmailTeamMember.is_active.is_(True),
            )
            .first()
        )
        # End the read transaction so the connection is not left idle-in-transaction.
        self.db.commit()

        resolved_role = None if not membership_row else membership_row.role

        # Remember the answer (including "no role") for subsequent calls.
        if role_cache:
            await role_cache.set_user_role(user_email, team_id, resolved_role)

        return resolved_role

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get role for {user_email} in team {team_id}: {e}")
        return None
async def list_teams(
    self,
    # Unified pagination params
    limit: int = 100,
    offset: int = 0,
    cursor: Optional[str] = None,
    page: Optional[int] = None,
    per_page: int = 50,
    include_inactive: bool = False,
    visibility_filter: Optional[str] = None,
    base_url: Optional[str] = None,
    include_personal: bool = False,
    search_query: Optional[str] = None,
) -> Union[Tuple[List[EmailTeam], Optional[str]], Dict[str, Any]]:
    """List teams, paginated either by page number or by cursor.

    Builds a filtered, ordered ``select(EmailTeam)`` statement and hands it to
    ``unified_paginate``; the value returned by that helper is returned as-is.

    Args:
        limit: Max items for cursor pagination
        offset: Rows to skip; only applied when neither ``page`` nor ``cursor``
            is given and the value is greater than zero
        cursor: Cursor token
        page: Page number (1-indexed)
        per_page: Items per page
        include_inactive: Whether to include inactive teams
        visibility_filter: Filter by visibility (private, team, public)
        base_url: Base URL for pagination links; defaults to the admin teams
            partial under ``settings.app_root_path``
        include_personal: Whether to include personal teams
        search_query: Case-insensitive substring matched against
            name/slug/description

    Returns:
        Union[Tuple[List[EmailTeam], Optional[str]], Dict[str, Any]]:
            - Tuple (teams, next_cursor) if cursor/offset based
            - Dict {data, pagination, links} if page based
    """
    stmt = select(EmailTeam)

    # Personal and inactive teams are hidden unless explicitly requested.
    if not include_personal:
        stmt = stmt.where(EmailTeam.is_personal.is_(False))
    if not include_inactive:
        stmt = stmt.where(EmailTeam.is_active.is_(True))
    if visibility_filter:
        stmt = stmt.where(EmailTeam.visibility == visibility_filter)

    if search_query:
        like_pattern = f"%{search_query}%"
        searchable_columns = (EmailTeam.name, EmailTeam.slug, EmailTeam.description)
        stmt = stmt.where(or_(*(column.ilike(like_pattern) for column in searchable_columns)))

    # Ordering depends on the pagination mode:
    # - page-based (UI): alphabetical by name for user-friendly display
    # - cursor-based (API): created_at DESC, id DESC to match unified_paginate expectations
    if page is None:
        ordering = (desc(EmailTeam.created_at), desc(EmailTeam.id))
    else:
        ordering = (EmailTeam.name, EmailTeam.id)
    stmt = stmt.order_by(*ordering)

    # Pagination links point at the admin partial unless the caller says otherwise.
    base_url = base_url or f"{settings.app_root_path}/admin/teams/partial"

    # Legacy offset-based pagination: only when neither page nor cursor is in use.
    if not (page or cursor) and offset > 0:
        stmt = stmt.offset(offset)

    paginated = await unified_paginate(
        db=self.db,
        query=stmt,
        cursor=cursor,
        limit=limit,
        page=page,
        per_page=per_page,
        base_url=base_url,
    )
    # End the read transaction so the connection is not left idle-in-transaction.
    self.db.commit()
    return paginated
async def get_all_team_ids(
    self,
    include_inactive: bool = False,
    visibility_filter: Optional[str] = None,
    include_personal: bool = False,
    search_query: Optional[str] = None,
) -> List[int]:
    """Collect the IDs of every team matching the filters (no pagination).

    NOTE(review): the annotation says ``List[int]`` while other methods of this
    service accept team IDs as ``str`` - confirm the actual column type.
    NOTE(review): the search here covers name and slug only, whereas
    ``list_teams`` also matches description - confirm this is intended.

    Args:
        include_inactive: Whether to include inactive teams
        visibility_filter: Filter by visibility (private, team, public)
        include_personal: Whether to include personal teams
        search_query: Case-insensitive substring matched against name/slug

    Returns:
        List[int]: List of team IDs
    """
    stmt = select(EmailTeam.id)

    # Personal and inactive teams are hidden unless explicitly requested.
    if not include_personal:
        stmt = stmt.where(EmailTeam.is_personal.is_(False))
    if not include_inactive:
        stmt = stmt.where(EmailTeam.is_active.is_(True))
    if visibility_filter:
        stmt = stmt.where(EmailTeam.visibility == visibility_filter)

    if search_query:
        like_pattern = f"%{search_query}%"
        searchable_columns = (EmailTeam.name, EmailTeam.slug)
        stmt = stmt.where(or_(*(column.ilike(like_pattern) for column in searchable_columns)))

    id_rows = self.db.execute(stmt).all()
    matching_ids = [id_row[0] for id_row in id_rows]
    # End the read transaction so the connection is not left idle-in-transaction.
    self.db.commit()
    return matching_ids
async def get_teams_count(
    self,
    include_inactive: bool = False,
    visibility_filter: Optional[str] = None,
    include_personal: bool = False,
    search_query: Optional[str] = None,
) -> int:
    """Count the teams matching the filters.

    Args:
        include_inactive: Whether to include inactive teams
        visibility_filter: Filter by visibility (private, team, public)
        include_personal: Whether to include personal teams
        search_query: Case-insensitive substring matched against name/slug

    Returns:
        int: Total count of matching teams (0 when the query yields no value)
    """
    stmt = select(func.count(EmailTeam.id))  # pylint: disable=not-callable

    # Personal and inactive teams are hidden unless explicitly requested.
    if not include_personal:
        stmt = stmt.where(EmailTeam.is_personal.is_(False))
    if not include_inactive:
        stmt = stmt.where(EmailTeam.is_active.is_(True))
    if visibility_filter:
        stmt = stmt.where(EmailTeam.visibility == visibility_filter)

    if search_query:
        like_pattern = f"%{search_query}%"
        searchable_columns = (EmailTeam.name, EmailTeam.slug)
        stmt = stmt.where(or_(*(column.ilike(like_pattern) for column in searchable_columns)))

    total = self.db.execute(stmt).scalar() or 0
    # End the read transaction so the connection is not left idle-in-transaction.
    self.db.commit()
    return total
async def discover_public_teams(self, user_email: str, skip: int = 0, limit: Optional[int] = None) -> List[EmailTeam]:
    """Discover public teams that the user can join.

    Returns active, non-personal teams whose visibility is ``"public"`` and in
    which the user has no active membership. Results are ordered by team name
    and then ID so that ``skip``/``limit`` paging is deterministic; without an
    explicit ordering the database may return rows in any order, which lets
    consecutive pages overlap or miss teams.

    Args:
        user_email: Email of the user discovering teams
        skip: Number of teams to skip for pagination
        limit: Maximum number of teams to return (None for unlimited)

    Returns:
        List[EmailTeam]: Public teams the user can join. An empty list is
        returned if the query fails; the error is logged and the transaction
        rolled back rather than raised.
    """
    try:
        # Exclude teams the user already belongs to with a subquery instead of
        # loading their team IDs into memory first (one query instead of two).
        user_team_subquery = select(EmailTeamMember.team_id).where(EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).scalar_subquery()

        query = (
            self.db.query(EmailTeam)
            .filter(EmailTeam.visibility == "public", EmailTeam.is_active.is_(True), EmailTeam.is_personal.is_(False), ~EmailTeam.id.in_(user_team_subquery))
            # Stable ordering is required for offset/limit pagination.
            .order_by(EmailTeam.name, EmailTeam.id)
            .offset(skip)
        )
        if limit is not None:
            query = query.limit(limit)

        teams = query.all()
        self.db.commit()  # Release transaction to avoid idle-in-transaction
        return teams

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to discover public teams for {user_email}: {e}")
        return []
async def create_join_request(self, team_id: str, user_email: str, message: Optional[str] = None) -> "EmailTeamJoinRequest":
    """Create (or re-open) a request to join a public team.

    If the user already has a join request row for the team that is not a
    live pending one, that row is reset to ``pending`` with a fresh 7-day
    expiry instead of inserting a second row.

    Args:
        team_id: ID of the team to join
        user_email: Email of the user requesting to join
        message: Optional message to team owners

    Returns:
        EmailTeamJoinRequest: The created or re-opened join request

    Raises:
        ValueError: If team not found, not public, or user already member/has pending request
    """
    try:
        # The target team must exist and be public.
        team = await self.get_team_by_id(team_id)
        if not team:
            raise ValueError("Team not found")
        if team.visibility != "public":
            raise ValueError("Can only request to join public teams")

        # Active members have nothing to request.
        active_membership = (
            self.db.query(EmailTeamMember)
            .filter(
                EmailTeamMember.team_id == team_id,
                EmailTeamMember.user_email == user_email,
                EmailTeamMember.is_active.is_(True),
            )
            .first()
        )
        if active_membership:
            raise ValueError("User is already a member of this team")

        # Look for an earlier request from this user, whatever its status.
        prior_request = (
            self.db.query(EmailTeamJoinRequest)
            .filter(
                EmailTeamJoinRequest.team_id == team_id,
                EmailTeamJoinRequest.user_email == user_email,
            )
            .first()
        )

        if not prior_request:
            # First request from this user for this team.
            join_request = EmailTeamJoinRequest(
                team_id=team_id,
                user_email=user_email,
                message=message,
                expires_at=utc_now() + timedelta(days=7),
            )
            self.db.add(join_request)
        else:
            if prior_request.status == "pending" and not prior_request.is_expired():
                raise ValueError("User already has a pending join request for this team")

            # Recycle the cancelled/rejected/expired row as a new pending request.
            prior_request.message = message or ""
            prior_request.status = "pending"
            prior_request.requested_at = utc_now()
            prior_request.expires_at = utc_now() + timedelta(days=7)
            prior_request.reviewed_at = None
            prior_request.reviewed_by = None
            prior_request.notes = None
            join_request = prior_request

        self.db.commit()
        self.db.refresh(join_request)

        logger.info(f"Created join request for user {user_email} to team {team_id}")
        return join_request

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to create join request: {e}")
        raise
async def list_join_requests(self, team_id: str) -> List["EmailTeamJoinRequest"]:
    """List pending join requests for a team, most recently requested first.

    Args:
        team_id: ID of the team

    Returns:
        List[EmailTeamJoinRequest]: Pending join requests for the team. An
        empty list is returned if the query fails; the error is logged and
        the transaction rolled back rather than raised.
    """
    try:
        requests = (
            self.db.query(EmailTeamJoinRequest)
            .filter(EmailTeamJoinRequest.team_id == team_id, EmailTeamJoinRequest.status == "pending")
            .order_by(EmailTeamJoinRequest.requested_at.desc())
            .all()
        )
        # Same convention as the other read paths in this service.
        self.db.commit()  # Release transaction to avoid idle-in-transaction
        return requests

    except Exception as e:
        # A failed statement leaves the session's transaction unusable until
        # it is rolled back, so reset it before returning.
        self.db.rollback()
        logger.error(f"Failed to list join requests for team {team_id}: {e}")
        return []
async def approve_join_request(self, request_id: str, approved_by: str) -> Optional[EmailTeamMember]:
    """Approve a pending team join request and add the requester as a member.

    The requester is added with role ``"member"``, the request is marked
    ``approved`` with reviewer and review time, the membership change is
    recorded through ``_log_team_member_action``, and the related auth,
    admin-stats and member-count caches are invalidated.

    Args:
        request_id: ID of the join request
        approved_by: Email of the user approving the request

    Returns:
        EmailTeamMember: The newly created team member. (The annotation is
        Optional, but no code path here returns None; a missing request
        raises ValueError instead.)

    Raises:
        ValueError: If request not found, expired, or already processed
    """
    try:
        # Get join request (only requests still in "pending" state qualify)
        join_request = self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.id == request_id, EmailTeamJoinRequest.status == "pending").first()

        if not join_request:
            raise ValueError("Join request not found or already processed")

        if join_request.is_expired():
            # Persist the "expired" status before raising; the rollback in the
            # handler below runs after this commit has already been issued.
            join_request.status = "expired"
            self.db.commit()
            raise ValueError("Join request has expired")

        # Add user to team
        member = EmailTeamMember(team_id=join_request.team_id, user_email=join_request.user_email, role="member", invited_by=approved_by, joined_at=utc_now())  # New joiners are always members

        self.db.add(member)
        # Update join request status
        join_request.status = "approved"
        join_request.reviewed_at = utc_now()
        join_request.reviewed_by = approved_by

        # Flush before logging: member.id is passed to the history helper below.
        # NOTE(review): this method only flushes and never commits the approval
        # itself; presumably _log_team_member_action (defined outside this
        # view) commits - confirm.
        self.db.flush()
        self._log_team_member_action(member.id, join_request.team_id, join_request.user_email, member.role, "added", approved_by)

        self.db.refresh(member)

        # Invalidate auth cache for user's team membership and role.
        # The invalidations are scheduled as tasks and not awaited here, and no
        # reference to the tasks is kept.
        # NOTE(review): the asyncio docs recommend holding a reference to tasks
        # created with create_task - confirm this is acceptable.
        try:
            asyncio.create_task(auth_cache.invalidate_team(join_request.user_email))
            asyncio.create_task(auth_cache.invalidate_user_role(join_request.user_email, join_request.team_id))
            asyncio.create_task(auth_cache.invalidate_user_teams(join_request.user_email))
            asyncio.create_task(auth_cache.invalidate_team_membership(join_request.user_email))
            asyncio.create_task(admin_stats_cache.invalidate_teams())
        except Exception as cache_error:
            # Failing to schedule cache invalidation does not fail the approval.
            logger.debug(f"Failed to invalidate caches on join approval: {cache_error}")

        # Invalidate member count cache for this team
        await self.invalidate_team_member_count_cache(str(join_request.team_id))

        logger.info(f"Approved join request {request_id}: user {join_request.user_email} joined team {join_request.team_id}")
        return member

    except Exception as e:
        # Validation errors raised above also pass through here: roll back,
        # log, and re-raise to the caller.
        self.db.rollback()
        logger.error(f"Failed to approve join request {request_id}: {e}")
        raise
async def reject_join_request(self, request_id: str, rejected_by: str) -> bool:
    """Mark a pending team join request as rejected.

    Args:
        request_id: ID of the join request
        rejected_by: Email of the user rejecting the request

    Returns:
        bool: True if request was rejected successfully

    Raises:
        ValueError: If request not found or already processed
    """
    try:
        # Only requests that are still pending can be rejected.
        pending_request = (
            self.db.query(EmailTeamJoinRequest)
            .filter(
                EmailTeamJoinRequest.id == request_id,
                EmailTeamJoinRequest.status == "pending",
            )
            .first()
        )
        if not pending_request:
            raise ValueError("Join request not found or already processed")

        # Record the decision together with who made it and when.
        pending_request.status = "rejected"
        pending_request.reviewed_at = utc_now()
        pending_request.reviewed_by = rejected_by
        self.db.commit()

        logger.info(f"Rejected join request {request_id}: user {pending_request.user_email} for team {pending_request.team_id}")
        return True

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to reject join request {request_id}: {e}")
        raise
async def get_user_join_requests(self, user_email: str, team_id: Optional[str] = None) -> List["EmailTeamJoinRequest"]:
    """Get join requests made by a user, in any status.

    Args:
        user_email: Email of the user
        team_id: Optional team ID to filter requests

    Returns:
        List[EmailTeamJoinRequest]: Join requests made by the user, limited
        to ``team_id`` when given. An empty list is returned if the query
        fails; the error is logged and the transaction rolled back rather
        than raised.
    """
    try:
        query = self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.user_email == user_email)

        if team_id:
            query = query.filter(EmailTeamJoinRequest.team_id == team_id)

        requests = query.all()
        # Same convention as the other read paths in this service.
        self.db.commit()  # Release transaction to avoid idle-in-transaction
        return requests

    except Exception as e:
        # A failed statement leaves the session's transaction unusable until
        # it is rolled back, so reset it before returning.
        self.db.rollback()
        logger.error(f"Failed to get join requests for user {user_email}: {e}")
        return []
async def cancel_join_request(self, request_id: str, user_email: str) -> bool:
    """Cancel a user's own pending join request.

    The request must belong to ``user_email`` and still be pending.

    Args:
        request_id: ID of the join request to cancel
        user_email: Email of the user canceling the request

    Returns:
        bool: True if canceled successfully; False if no matching pending
        request exists or the update fails
    """
    try:
        # Match on ID, owner and pending status in one lookup.
        own_pending_request = (
            self.db.query(EmailTeamJoinRequest)
            .filter(
                EmailTeamJoinRequest.id == request_id,
                EmailTeamJoinRequest.user_email == user_email,
                EmailTeamJoinRequest.status == "pending",
            )
            .first()
        )
        if not own_pending_request:
            logger.warning(f"Join request {request_id} not found for user {user_email} or not pending")
            return False

        # The requester is recorded as the reviewer of their own cancellation.
        own_pending_request.status = "cancelled"
        own_pending_request.reviewed_at = utc_now()
        own_pending_request.reviewed_by = user_email
        self.db.commit()

        logger.info(f"Cancelled join request {request_id} by user {user_email}")
        return True

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to cancel join request {request_id}: {e}")
        return False
1452 # ==================================================================================
1453 # Batch Query Methods (N+1 Query Elimination - Issue #1892)
1454 # ==================================================================================
def get_member_counts_batch(self, team_ids: List[str]) -> Dict[str, int]:
    """Count active members for several teams with one grouped query.

    This method is synchronous; the database call blocks the caller until it
    completes.

    Args:
        team_ids: List of team UUIDs

    Returns:
        Dict mapping each requested team_id to its active member count
        (0 for teams with no active members)

    Raises:
        Exception: Re-raises any database errors after rollback

    Examples:
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> service.get_member_counts_batch([])
        {}
    """
    if not team_ids:
        return {}

    try:
        member_total = func.count(EmailTeamMember.id).label("count")  # pylint: disable=not-callable
        grouped_rows = (
            self.db.query(EmailTeamMember.team_id, member_total)
            .filter(
                EmailTeamMember.team_id.in_(team_ids),
                EmailTeamMember.is_active.is_(True),
            )
            .group_by(EmailTeamMember.team_id)
            .all()
        )

        # End the read transaction so the connection is not left idle-in-transaction.
        self.db.commit()

        # Teams absent from the grouped result have no active members.
        counted = {str(grouped_row.team_id): grouped_row.count for grouped_row in grouped_rows}
        return {requested_id: counted.get(requested_id, 0) for requested_id in team_ids}
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get member counts for teams: {e}")
        raise
def get_user_roles_batch(self, user_email: str, team_ids: List[str]) -> Dict[str, Optional[str]]:
    """Fetch one user's role in several teams with a single query.

    Args:
        user_email: Email of the user
        team_ids: List of team UUIDs

    Returns:
        Dict mapping each requested team_id to the user's role there
        (None where the user has no active membership)

    Raises:
        Exception: Re-raises any database errors after rollback
    """
    if not team_ids:
        return {}

    try:
        membership_rows = (
            self.db.query(EmailTeamMember.team_id, EmailTeamMember.role)
            .filter(
                EmailTeamMember.user_email == user_email,
                EmailTeamMember.team_id.in_(team_ids),
                EmailTeamMember.is_active.is_(True),
            )
            .all()
        )

        # End the read transaction so the connection is not left idle-in-transaction.
        self.db.commit()

        # Teams without a matching row map to None.
        role_by_team = {str(membership_row.team_id): membership_row.role for membership_row in membership_rows}
        return {requested_id: role_by_team.get(requested_id) for requested_id in team_ids}
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get user roles for {user_email}: {e}")
        raise
def get_pending_join_requests_batch(self, user_email: str, team_ids: List[str]) -> Dict[str, Optional[Any]]:
    """Fetch a user's pending join requests for several teams with a single query.

    Args:
        user_email: Email of the user
        team_ids: List of team UUIDs to check

    Returns:
        Dict mapping each requested team_id to the user's pending
        EmailTeamJoinRequest (None where there is no pending request)

    Raises:
        Exception: Re-raises any database errors after rollback
    """
    if not team_ids:
        return {}

    try:
        pending_rows = (
            self.db.query(EmailTeamJoinRequest)
            .filter(
                EmailTeamJoinRequest.user_email == user_email,
                EmailTeamJoinRequest.team_id.in_(team_ids),
                EmailTeamJoinRequest.status == "pending",
            )
            .all()
        )

        # End the read transaction so the connection is not left idle-in-transaction.
        self.db.commit()

        # If a team somehow has several pending rows, the last one read wins.
        pending_by_team = {str(pending_row.team_id): pending_row for pending_row in pending_rows}
        return {requested_id: pending_by_team.get(requested_id) for requested_id in team_ids}
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get pending join requests for {user_email}: {e}")
        raise
1566 # ==================================================================================
1567 # Cached Batch Methods (Redis caching for member counts)
1568 # ==================================================================================
def _get_member_count_cache_key(self, team_id: str) -> str:
    """Compose the cache key under which a team's member count is stored.

    The prefix is read from ``settings.cache_prefix`` and falls back to
    ``"mcpgw:"`` when that setting is absent.

    Args:
        team_id: Team UUID to build cache key for

    Returns:
        Cache key string in format "{prefix}team:member_count:{team_id}"
    """
    return f"{getattr(settings, 'cache_prefix', 'mcpgw:')}team:member_count:{team_id}"
async def get_member_counts_batch_cached(self, team_ids: List[str]) -> Dict[str, int]:
    """Get member counts for multiple teams, using Redis cache with DB fallback.

    Cached counts are read with a single ``MGET``. Teams that are not cached
    (or all teams, when Redis is unavailable or the read fails) are counted
    through ``get_member_counts_batch`` and then written back to Redis.

    Caching behavior is controlled by settings:
    - team_member_count_cache_enabled: Enable/disable caching (default: True)
    - team_member_count_cache_ttl: Cache TTL in seconds (default: 300)

    Args:
        team_ids: List of team UUIDs

    Returns:
        Dict mapping team_id to member count

    Raises:
        Exception: Re-raises any database errors after rollback
    """
    if not team_ids:
        return {}

    cache_enabled = getattr(settings, "team_member_count_cache_enabled", True)
    cache_ttl = getattr(settings, "team_member_count_cache_ttl", 300)

    # If caching disabled, go straight to batch DB query
    if not cache_enabled:
        return self.get_member_counts_batch(team_ids)

    # Import Redis client lazily; any failure simply means "no cache".
    try:
        # First-Party
        from mcpgateway.utils.redis_client import get_redis_client  # pylint: disable=import-outside-toplevel

        redis_client = await get_redis_client()
    except Exception:
        redis_client = None

    result: Dict[str, int] = {}
    # Until a cache read succeeds, every team counts as a miss.
    cache_misses: List[str] = list(team_ids)

    # Step 1: Check Redis cache for all team IDs
    if redis_client:
        try:
            cache_keys = [self._get_member_count_cache_key(tid) for tid in team_ids]
            cached_values = await redis_client.mget(cache_keys)

            # Parse into temporaries so a bad cached value cannot leave a
            # half-populated result behind.
            hits: Dict[str, int] = {}
            misses: List[str] = []
            for tid, cached in zip(team_ids, cached_values):
                if cached is not None:
                    hits[tid] = int(cached)
                else:
                    misses.append(tid)
            result, cache_misses = hits, misses
        except Exception as e:
            logger.warning(f"Redis cache read failed, falling back to DB: {e}")

    # Step 2: Query database for cache misses. The shared batch helper runs
    # the grouped query, commits, defaults missing teams to 0, and on failure
    # rolls back, logs and re-raises.
    if cache_misses:
        db_counts = self.get_member_counts_batch(cache_misses)
        result.update(db_counts)

        # Step 3: Cache the fresh counts with configured TTL (best effort)
        if redis_client:
            for tid, count in db_counts.items():
                try:
                    await redis_client.setex(self._get_member_count_cache_key(tid), cache_ttl, str(count))
                except Exception as e:
                    logger.warning(f"Redis cache write failed for team {tid}: {e}")

    return result
async def invalidate_team_member_count_cache(self, team_id: str) -> None:
    """Drop the cached member count for a team.

    Call this after any membership changes (add/remove/update).
    Does nothing when caching is disabled or no Redis client is available;
    failures are logged as warnings and not raised.

    Args:
        team_id: Team UUID to invalidate
    """
    if not getattr(settings, "team_member_count_cache_enabled", True):
        return

    try:
        # First-Party
        from mcpgateway.utils.redis_client import get_redis_client  # pylint: disable=import-outside-toplevel

        client = await get_redis_client()
        if not client:
            return
        await client.delete(self._get_member_count_cache_key(team_id))
    except Exception as e:
        logger.warning(f"Failed to invalidate member count cache for team {team_id}: {e}")