Coverage for mcpgateway / services / team_invitation_service.py: 100%
190 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/team_invitation_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Team Invitation Service.
8This module provides team invitation creation, management, and acceptance
9for the multi-team collaboration system.
11Examples:
12 >>> from mcpgateway.services.team_invitation_service import TeamInvitationService
13 >>> from mcpgateway.db import SessionLocal
14 >>> db = SessionLocal()
15 >>> service = TeamInvitationService(db)
16 >>> # Service handles team invitation lifecycle
17"""
19# Standard
20from datetime import timedelta
21import secrets
22from typing import List, Optional
24# Third-Party
25from sqlalchemy.orm import Session
27# First-Party
28from mcpgateway.config import settings
29from mcpgateway.db import EmailTeam, EmailTeamInvitation, EmailTeamMember, EmailUser, utc_now
30from mcpgateway.services.logging_service import LoggingService
# Initialize logging
# A LoggingService instance is created at import time and used to obtain a
# logger named after this module; every method below logs through `logger`.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)
class TeamInvitationService:
    """Service for team invitation management.

    This service handles invitation creation, validation, acceptance,
    and cleanup for team membership management.

    Attributes:
        db (Session): SQLAlchemy database session

    Examples:
        >>> from mcpgateway.services.team_invitation_service import TeamInvitationService
        >>> from mcpgateway.db import SessionLocal
        >>> db = SessionLocal()
        >>> service = TeamInvitationService(db)
        >>> service.db is not None
        True
    """

    # Strong references to fire-and-forget cache-invalidation tasks. The event
    # loop only keeps weak references to tasks, so a task that nobody else
    # references may be garbage-collected before it finishes. The set lives on
    # the class (not the instance) so the references outlive any single
    # service object.
    _background_tasks: set = set()

    def __init__(self, db: Session):
        """Initialize the team invitation service.

        Args:
            db: SQLAlchemy database session

        Examples:
            Basic initialization:
            >>> from mcpgateway.services.team_invitation_service import TeamInvitationService
            >>> from unittest.mock import Mock
            >>> db_session = Mock()
            >>> service = TeamInvitationService(db_session)
            >>> service.db is db_session
            True

            Service attributes:
            >>> hasattr(service, 'db')
            True
            >>> service.__class__.__name__
            'TeamInvitationService'
        """
        self.db = db

    @classmethod
    def _track_background_task(cls, coro):
        """Schedule a coroutine as a background task and keep it alive.

        The created task is stored in ``_background_tasks`` until it
        completes, at which point ``_finalize_background_task`` removes it.

        Args:
            coro: Coroutine object to run on the currently running event loop

        Returns:
            asyncio.Task: The scheduled task
        """
        # Standard
        import asyncio  # pylint: disable=import-outside-toplevel

        task = asyncio.create_task(coro)
        cls._background_tasks.add(task)
        task.add_done_callback(cls._finalize_background_task)
        return task

    @classmethod
    def _finalize_background_task(cls, task) -> None:
        """Release a finished background task and report its failure, if any.

        Args:
            task: The completed task passed in by ``add_done_callback``
        """
        cls._background_tasks.discard(task)
        if task.cancelled():
            return
        # Retrieving the exception marks it as handled and lets us log it at
        # the same level used for scheduling failures.
        error = task.exception()
        if error is not None:
            logger.debug(f"Failed to invalidate cache on invitation acceptance: {error}")

    def _generate_invitation_token(self) -> str:
        """Generate a secure invitation token.

        Returns:
            str: A cryptographically secure random token

        Examples:
            Test token generation:
            >>> from mcpgateway.services.team_invitation_service import TeamInvitationService
            >>> from unittest.mock import Mock
            >>> db_session = Mock()
            >>> service = TeamInvitationService(db_session)
            >>> token = service._generate_invitation_token()
            >>> isinstance(token, str)
            True
            >>> len(token) > 0
            True

            Token characteristics:
            >>> # Test that token is URL-safe
            >>> import string
            >>> valid_chars = string.ascii_letters + string.digits + '-_'
            >>> all(c in valid_chars for c in token)
            True

            >>> # Test token length (base64-encoded 32 bytes)
            >>> len(token) >= 32  # URL-safe base64 of 32 bytes is ~43 chars
            True

            Token uniqueness:
            >>> token1 = service._generate_invitation_token()
            >>> token2 = service._generate_invitation_token()
            >>> token1 != token2
            True
        """
        return secrets.token_urlsafe(32)

    async def create_invitation(self, team_id: str, email: str, role: str, invited_by: str, expiry_days: Optional[int] = None) -> Optional[EmailTeamInvitation]:
        """Create a team invitation.

        Validation runs in this order: role, team, personal-team guard,
        inviter account, inviter membership and ownership, existing
        membership, existing unexpired invitation, member limit. Any
        failure rolls the session back before the error is re-raised.

        Args:
            team_id: ID of the team
            email: Email address to invite
            role: Role to assign (owner, member)
            invited_by: Email of user sending the invitation
            expiry_days: Days until invitation expires (default from settings)

        Returns:
            EmailTeamInvitation: The created invitation, or None when the
            active team or the inviting user cannot be found

        Raises:
            ValueError: If invitation parameters are invalid
            Exception: If invitation creation fails

        Examples:
            Team owners can send invitations to new members.
        """
        try:
            # Validate role
            valid_roles = ["owner", "member"]
            if role not in valid_roles:
                raise ValueError(f"Invalid role. Must be one of: {', '.join(valid_roles)}")

            # Check if team exists
            team = self.db.query(EmailTeam).filter(EmailTeam.id == team_id, EmailTeam.is_active.is_(True)).first()
            if not team:
                logger.warning(f"Team {team_id} not found")
                return None

            # Prevent invitations to personal teams
            if team.is_personal:
                logger.warning(f"Cannot send invitations to personal team {team_id}")
                raise ValueError("Cannot send invitations to personal teams")

            # Check if inviter exists and is a team member
            inviter = self.db.query(EmailUser).filter(EmailUser.email == invited_by).first()
            if not inviter:
                logger.warning(f"Inviter {invited_by} not found")
                return None

            # Check if inviter is a member of the team with appropriate permissions
            inviter_membership = (
                self.db.query(EmailTeamMember)
                .filter(
                    EmailTeamMember.team_id == team_id,
                    EmailTeamMember.user_email == invited_by,
                    EmailTeamMember.is_active.is_(True),
                )
                .first()
            )
            if not inviter_membership:
                logger.warning(f"Inviter {invited_by} is not a member of team {team_id}")
                raise ValueError("Only team members can send invitations")

            # Only owners can send invitations
            if inviter_membership.role != "owner":
                logger.warning(f"User {invited_by} does not have permission to invite to team {team_id}")
                raise ValueError("Only team owners can send invitations")

            # Check if user is already a team member
            existing_member = (
                self.db.query(EmailTeamMember)
                .filter(
                    EmailTeamMember.team_id == team_id,
                    EmailTeamMember.user_email == email,
                    EmailTeamMember.is_active.is_(True),
                )
                .first()
            )
            if existing_member:
                logger.warning(f"User {email} is already a member of team {team_id}")
                raise ValueError(f"User {email} is already a member of this team")

            # Check for existing active invitations
            existing_invitation = (
                self.db.query(EmailTeamInvitation)
                .filter(
                    EmailTeamInvitation.team_id == team_id,
                    EmailTeamInvitation.email == email,
                    EmailTeamInvitation.is_active.is_(True),
                )
                .first()
            )
            if existing_invitation and not existing_invitation.is_expired():
                logger.warning(f"Active invitation already exists for {email} to team {team_id}")
                raise ValueError(f"An active invitation already exists for {email}")

            # Check team member limit: current members plus invitations that
            # are still flagged active both count against max_members.
            if team.max_members:
                current_member_count = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)).count()
                pending_invitation_count = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.team_id == team_id, EmailTeamInvitation.is_active.is_(True)).count()
                if (current_member_count + pending_invitation_count) >= team.max_members:
                    logger.warning(f"Team {team_id} has reached maximum member limit")
                    raise ValueError(f"Team has reached maximum member limit of {team.max_members}")

            # Deactivate any existing invitations for this email/team combination.
            # Reaching this point with one set means it was active but expired.
            if existing_invitation:
                existing_invitation.is_active = False

            # Set expiry
            if expiry_days is None:
                expiry_days = getattr(settings, "invitation_expiry_days", 7)
            expires_at = utc_now() + timedelta(days=expiry_days)

            # Create the invitation
            invitation = EmailTeamInvitation(
                team_id=team_id,
                email=email,
                role=role,
                invited_by=invited_by,
                invited_at=utc_now(),
                expires_at=expires_at,
                token=self._generate_invitation_token(),
                is_active=True,
            )

            self.db.add(invitation)
            self.db.commit()

            logger.info(f"Created invitation for {email} to team {team_id} by {invited_by}")
            return invitation

        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to create invitation for {email} to team {team_id}: {e}")
            raise

    async def get_invitation_by_token(self, token: str) -> Optional[EmailTeamInvitation]:
        """Get an invitation by its token.

        The lookup does not filter on the active flag or expiry; callers
        are responsible for checking validity.

        Args:
            token: The invitation token

        Returns:
            EmailTeamInvitation: The invitation, or None if not found or if
            the query raised (the error is logged)

        Examples:
            Used for invitation acceptance and validation.
        """
        try:
            return self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.token == token).first()

        except Exception as e:
            logger.error(f"Failed to get invitation by token: {e}")
            return None

    async def accept_invitation(self, token: str, accepting_user_email: Optional[str] = None) -> bool:
        """Accept a team invitation.

        On success a team membership is created, the invitation is
        deactivated, and auth-cache invalidation for the user is scheduled
        in the background (best effort; cache failures are only logged).

        Args:
            token: The invitation token
            accepting_user_email: Email of user accepting (for validation)

        Returns:
            bool: True if invitation was accepted successfully. Failures are
            reported by raising, not by returning False.

        Raises:
            ValueError: If invitation is invalid or expired
            Exception: If acceptance fails

        Examples:
            Users can accept invitations to join teams.
        """
        try:
            # Get the invitation
            invitation = await self.get_invitation_by_token(token)
            if not invitation:
                logger.warning("Invitation not found for token")
                raise ValueError("Invitation not found")

            # Check if invitation is valid
            if not invitation.is_valid():
                logger.warning(f"Invalid or expired invitation for {invitation.email}")
                raise ValueError("Invitation is invalid or expired")

            # Validate accepting user email if provided
            if accepting_user_email and accepting_user_email != invitation.email:
                logger.warning(f"Email mismatch: invitation for {invitation.email}, accepting as {accepting_user_email}")
                raise ValueError("Email address does not match invitation")

            # Check if user exists (if email provided, they must exist)
            if accepting_user_email:
                user = self.db.query(EmailUser).filter(EmailUser.email == accepting_user_email).first()
                if not user:
                    logger.warning(f"User {accepting_user_email} not found")
                    raise ValueError("User account not found")

            # Check if team still exists
            team = self.db.query(EmailTeam).filter(EmailTeam.id == invitation.team_id, EmailTeam.is_active.is_(True)).first()
            if not team:
                logger.warning(f"Team {invitation.team_id} not found or inactive")
                raise ValueError("Team not found or inactive")

            # Check if user is already a member
            existing_member = (
                self.db.query(EmailTeamMember)
                .filter(
                    EmailTeamMember.team_id == invitation.team_id,
                    EmailTeamMember.user_email == invitation.email,
                    EmailTeamMember.is_active.is_(True),
                )
                .first()
            )
            if existing_member:
                logger.warning(f"User {invitation.email} is already a member of team {invitation.team_id}")
                # Deactivate the invitation since they're already a member.
                # This is committed before raising so the rollback in the
                # outer handler does not undo it.
                invitation.is_active = False
                self.db.commit()
                raise ValueError("User is already a member of this team")

            # Check team member limit
            if team.max_members:
                current_member_count = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == invitation.team_id, EmailTeamMember.is_active.is_(True)).count()
                if current_member_count >= team.max_members:
                    logger.warning(f"Team {invitation.team_id} has reached maximum member limit")
                    raise ValueError(f"Team has reached maximum member limit of {team.max_members}")

            # Create team membership
            membership = EmailTeamMember(
                team_id=invitation.team_id,
                user_email=invitation.email,
                role=invitation.role,
                joined_at=utc_now(),
                invited_by=invitation.invited_by,
                is_active=True,
            )
            self.db.add(membership)

            # Deactivate the invitation
            invitation.is_active = False

            self.db.commit()

            # Invalidate auth cache for user's team membership. Scheduling is
            # fire-and-forget; _track_background_task keeps each task
            # referenced until it completes and logs any failure.
            try:
                # First-Party
                from mcpgateway.cache.auth_cache import auth_cache  # pylint: disable=import-outside-toplevel

                self._track_background_task(auth_cache.invalidate_team(invitation.email))
                self._track_background_task(auth_cache.invalidate_user_role(invitation.email, invitation.team_id))
                self._track_background_task(auth_cache.invalidate_user_teams(invitation.email))
                self._track_background_task(auth_cache.invalidate_team_membership(invitation.email))
            except Exception as cache_error:
                logger.debug(f"Failed to invalidate cache on invitation acceptance: {cache_error}")

            logger.info(f"User {invitation.email} accepted invitation to team {invitation.team_id}")
            return True

        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to accept invitation: {e}")
            raise

    async def decline_invitation(self, token: str, declining_user_email: Optional[str] = None) -> bool:
        """Decline a team invitation.

        Unlike acceptance, failures here are reported through the return
        value; exceptions are logged and swallowed after a rollback.

        Args:
            token: The invitation token
            declining_user_email: Email of user declining (for validation)

        Returns:
            bool: True if invitation was declined successfully, False otherwise

        Examples:
            Users can decline invitations they don't want to accept.
        """
        try:
            # Get the invitation
            invitation = await self.get_invitation_by_token(token)
            if not invitation:
                logger.warning("Invitation not found for token")
                return False

            # Validate declining user email if provided
            if declining_user_email and declining_user_email != invitation.email:
                logger.warning(f"Email mismatch: invitation for {invitation.email}, declining as {declining_user_email}")
                return False

            # Deactivate the invitation
            invitation.is_active = False
            self.db.commit()

            logger.info(f"User {invitation.email} declined invitation to team {invitation.team_id}")
            return True

        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to decline invitation: {e}")
            return False

    async def revoke_invitation(self, invitation_id: str, revoked_by: str) -> bool:
        """Revoke a team invitation.

        Only an active member of the invitation's team whose role is
        "owner" may revoke it.

        Args:
            invitation_id: ID of the invitation to revoke
            revoked_by: Email of user revoking the invitation

        Returns:
            bool: True if invitation was revoked successfully, False otherwise

        Examples:
            Team owners can revoke pending invitations.
        """
        try:
            # Get the invitation
            invitation = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.id == invitation_id, EmailTeamInvitation.is_active.is_(True)).first()
            if not invitation:
                logger.warning(f"Active invitation {invitation_id} not found")
                return False

            # Check if revoker has permission
            revoker_membership = (
                self.db.query(EmailTeamMember)
                .filter(
                    EmailTeamMember.team_id == invitation.team_id,
                    EmailTeamMember.user_email == revoked_by,
                    EmailTeamMember.is_active.is_(True),
                )
                .first()
            )
            if not revoker_membership or revoker_membership.role != "owner":
                logger.warning(f"User {revoked_by} does not have permission to revoke invitation {invitation_id}")
                return False

            # Revoke the invitation
            invitation.is_active = False
            self.db.commit()

            logger.info(f"Invitation {invitation_id} revoked by {revoked_by}")
            return True

        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to revoke invitation {invitation_id}: {e}")
            return False

    async def get_team_invitations(self, team_id: str, active_only: bool = True) -> List[EmailTeamInvitation]:
        """Get all invitations for a team.

        Results are ordered newest first by invitation time.

        Args:
            team_id: ID of the team
            active_only: Whether to return only active invitations

        Returns:
            List[EmailTeamInvitation]: List of team invitations (empty if the
            query fails; the error is logged)

        Examples:
            Team management interface showing pending invitations.
        """
        try:
            query = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.team_id == team_id)

            if active_only:
                query = query.filter(EmailTeamInvitation.is_active.is_(True))

            return query.order_by(EmailTeamInvitation.invited_at.desc()).all()

        except Exception as e:
            logger.error(f"Failed to get invitations for team {team_id}: {e}")
            return []

    async def get_user_invitations(self, email: str, active_only: bool = True) -> List[EmailTeamInvitation]:
        """Get all invitations for a user.

        Results are ordered newest first by invitation time.

        Args:
            email: Email address of the user
            active_only: Whether to return only active invitations

        Returns:
            List[EmailTeamInvitation]: List of invitations for the user
            (empty if the query fails; the error is logged)

        Examples:
            User dashboard showing pending team invitations.
        """
        try:
            query = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.email == email)

            if active_only:
                query = query.filter(EmailTeamInvitation.is_active.is_(True))

            return query.order_by(EmailTeamInvitation.invited_at.desc()).all()

        except Exception as e:
            logger.error(f"Failed to get invitations for user {email}: {e}")
            return []

    async def cleanup_expired_invitations(self) -> int:
        """Clean up expired invitations.

        Expired invitations are deactivated with a single bulk update;
        no rows are deleted.

        Returns:
            int: Number of invitations cleaned up (0 if the update fails)

        Examples:
            Periodic cleanup task to remove expired invitations.
        """
        try:
            now = utc_now()
            expired_count = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.expires_at < now, EmailTeamInvitation.is_active.is_(True)).update({"is_active": False})

            self.db.commit()

            if expired_count > 0:
                logger.info(f"Cleaned up {expired_count} expired invitations")

            return expired_count

        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to cleanup expired invitations: {e}")
            return 0