Coverage for mcpgateway / services / team_invitation_service.py: 99%
219 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/team_invitation_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Team Invitation Service.
8This module provides team invitation creation, management, and acceptance
9for the multi-team collaboration system.
11Examples:
12 >>> from mcpgateway.services.team_invitation_service import TeamInvitationService
13 >>> from mcpgateway.db import SessionLocal
14 >>> db = SessionLocal()
15 >>> service = TeamInvitationService(db)
16 >>> # Service handles team invitation lifecycle
17"""
19# Standard
20import asyncio
21from datetime import timedelta
22import secrets
23from typing import Any, List, Optional
25# Third-Party
26from sqlalchemy.orm import Session
28# First-Party
29from mcpgateway.cache.auth_cache import auth_cache
30from mcpgateway.config import settings
31from mcpgateway.db import EmailTeam, EmailTeamInvitation, EmailTeamMember, EmailUser, utc_now
32from mcpgateway.services.logging_service import LoggingService
33from mcpgateway.services.team_management_service import get_user_team_count
# Initialize logging: build a module-level LoggingService instance and obtain
# from it the logger named after this module, used by every method below.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)
class TeamInvitationService:
    """Service for team invitation management.

    This service handles invitation creation, validation, acceptance,
    and cleanup for team membership management.

    Attributes:
        db (Session): SQLAlchemy database session

    Examples:
        >>> from mcpgateway.services.team_invitation_service import TeamInvitationService
        >>> from mcpgateway.db import SessionLocal
        >>> db = SessionLocal()
        >>> service = TeamInvitationService(db)
        >>> service.db is not None
        True
    """

    # Strong references to in-flight background tasks created by
    # _fire_and_forget. The event loop only keeps weak references to tasks, so
    # a task nobody else references could be garbage-collected before it
    # finishes. Each task removes itself from this set when it completes.
    _background_tasks: set = set()

    def __init__(self, db: Session):
        """Initialize the team invitation service.

        Args:
            db: SQLAlchemy database session

        Examples:
            Basic initialization:
            >>> from mcpgateway.services.team_invitation_service import TeamInvitationService
            >>> from unittest.mock import Mock
            >>> db_session = Mock()
            >>> service = TeamInvitationService(db_session)
            >>> service.db is db_session
            True

            Service attributes:
            >>> hasattr(service, 'db')
            True
            >>> service.__class__.__name__
            'TeamInvitationService'
        """
        self.db = db

    def _get_user_team_count(self, user_email: str) -> int:
        """Get the number of active teams a user belongs to.

        Delegates to ``get_user_team_count`` using this service's session.

        Args:
            user_email: Email address of the user

        Returns:
            int: Number of active team memberships
        """
        return get_user_team_count(self.db, user_email)

    @staticmethod
    def _fire_and_forget(coro: Any) -> None:
        """Schedule a background coroutine and close it if scheduling fails.

        The created task is kept in ``_background_tasks`` until it completes so
        that it cannot be garbage-collected mid-execution. If
        ``asyncio.create_task`` does not hand back a real ``asyncio.Task``, the
        coroutine is closed so it does not trigger a "never awaited" warning.

        Args:
            coro: The coroutine to schedule as a background task.

        Raises:
            Exception: If asyncio.create_task fails (e.g. no running loop).
        """
        try:
            task = asyncio.create_task(coro)
            if isinstance(task, asyncio.Task):
                # Hold a strong reference until the task is done.
                tasks = TeamInvitationService._background_tasks
                tasks.add(task)
                task.add_done_callback(tasks.discard)
            elif asyncio.iscoroutine(coro):
                # Nothing will ever drive this coroutine; release it explicitly.
                close = getattr(coro, "close", None)
                if callable(close):
                    close()
        except Exception:
            # Scheduling failed: close the coroutine before propagating the error.
            close = getattr(coro, "close", None)
            if callable(close):
                close()
            raise

    def _generate_invitation_token(self) -> str:
        """Generate a secure invitation token.

        Returns:
            str: A cryptographically secure random token

        Examples:
            Test token generation:
            >>> from mcpgateway.services.team_invitation_service import TeamInvitationService
            >>> from unittest.mock import Mock
            >>> db_session = Mock()
            >>> service = TeamInvitationService(db_session)
            >>> token = service._generate_invitation_token()
            >>> isinstance(token, str)
            True
            >>> len(token) > 0
            True

            Token characteristics:
            >>> # Test that token is URL-safe
            >>> import string
            >>> valid_chars = string.ascii_letters + string.digits + '-_'
            >>> all(c in valid_chars for c in token)
            True

            >>> # Test token length (base64-encoded 32 bytes)
            >>> len(token) >= 32  # URL-safe base64 of 32 bytes is ~43 chars
            True

            Token uniqueness:
            >>> token1 = service._generate_invitation_token()
            >>> token2 = service._generate_invitation_token()
            >>> token1 != token2
            True
        """
        return secrets.token_urlsafe(32)

    async def create_invitation(self, team_id: str, email: str, role: str, invited_by: str, expiry_days: Optional[int] = None) -> Optional[EmailTeamInvitation]:
        """Create a team invitation.

        Args:
            team_id: ID of the team
            email: Email address to invite
            role: Role to assign (owner, member)
            invited_by: Email of user sending the invitation
            expiry_days: Days until invitation expires (default from settings)

        Returns:
            EmailTeamInvitation: The created invitation, or None if the team
            (active) or the inviting user could not be found

        Raises:
            ValueError: If invitation parameters are invalid (invitations disabled,
                bad role, personal team, inviter is not an owner, invitee already a
                member or already invited, unverified invitee, member limit reached)
            Exception: If invitation creation fails

        Examples:
            Team owners can send invitations to new members.
        """
        try:
            # Check feature flag
            if not getattr(settings, "allow_team_invitations", True):
                raise ValueError("Team invitations are currently disabled")

            # Validate role
            valid_roles = ["owner", "member"]
            if role not in valid_roles:
                raise ValueError(f"Invalid role. Must be one of: {', '.join(valid_roles)}")

            # Check if team exists
            team = self.db.query(EmailTeam).filter(EmailTeam.id == team_id, EmailTeam.is_active.is_(True)).first()

            if not team:
                logger.warning(f"Team {team_id} not found")
                return None

            # Prevent invitations to personal teams
            if team.is_personal:
                logger.warning(f"Cannot send invitations to personal team {team_id}")
                raise ValueError("Cannot send invitations to personal teams")

            # Check if inviter exists and is a team member
            inviter = self.db.query(EmailUser).filter(EmailUser.email == invited_by).first()
            if not inviter:
                logger.warning(f"Inviter {invited_by} not found")
                return None

            # Check email verification requirement for invitee
            # (only enforced when the invitee already has an account)
            if getattr(settings, "require_email_verification_for_invites", True):
                invitee = self.db.query(EmailUser).filter(EmailUser.email == email).first()
                if invitee and not invitee.email_verified_at:
                    raise ValueError("Invitee email address has not been verified")

            # Check if inviter is a member of the team with appropriate permissions
            inviter_membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == invited_by, EmailTeamMember.is_active.is_(True)).first()

            if not inviter_membership:
                logger.warning(f"Inviter {invited_by} is not a member of team {team_id}")
                raise ValueError("Only team members can send invitations")

            # Only owners can send invitations
            if inviter_membership.role != "owner":
                logger.warning(f"User {invited_by} does not have permission to invite to team {team_id}")
                raise ValueError("Only team owners can send invitations")

            # Check if user is already a team member
            existing_member = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == email, EmailTeamMember.is_active.is_(True)).first()

            if existing_member:
                logger.warning(f"User {email} is already a member of team {team_id}")
                raise ValueError(f"User {email} is already a member of this team")

            # Check for existing active invitations
            existing_invitation = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.team_id == team_id, EmailTeamInvitation.email == email, EmailTeamInvitation.is_active.is_(True)).first()

            if existing_invitation and not existing_invitation.is_expired():
                logger.warning(f"Active invitation already exists for {email} to team {team_id}")
                raise ValueError(f"An active invitation already exists for {email}")

            # Check team member limit
            if team.max_members:
                current_member_count = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)).count()

                # Invitations addressed to this same email are excluded: at this
                # point any such invitation is expired and is replaced below, so
                # it must not consume a seat and block the re-invite.
                pending_invitation_count = (
                    self.db.query(EmailTeamInvitation)
                    .filter(
                        EmailTeamInvitation.team_id == team_id,
                        EmailTeamInvitation.is_active.is_(True),
                        EmailTeamInvitation.email != email,
                    )
                    .count()
                )

                if (current_member_count + pending_invitation_count) >= team.max_members:
                    logger.warning(f"Team {team_id} has reached maximum member limit")
                    raise ValueError(f"Team has reached maximum member limit of {team.max_members}")

            # Deactivate any existing invitations for this email/team combination
            if existing_invitation:
                existing_invitation.is_active = False

            # Set expiry
            if expiry_days is None:
                expiry_days = getattr(settings, "invitation_expiry_days", 7)
            expires_at = utc_now() + timedelta(days=expiry_days)

            # Create the invitation
            invitation = EmailTeamInvitation(
                team_id=team_id, email=email, role=role, invited_by=invited_by, invited_at=utc_now(), expires_at=expires_at, token=self._generate_invitation_token(), is_active=True
            )

            self.db.add(invitation)
            self.db.commit()

            logger.info(f"Created invitation for {email} to team {team_id} by {invited_by}")
            return invitation

        except Exception as e:
            # Undo any pending changes (e.g. the deactivated old invitation) and re-raise.
            self.db.rollback()
            logger.error(f"Failed to create invitation for {email} to team {team_id}: {e}")
            raise

    async def get_invitation_by_token(self, token: str) -> Optional[EmailTeamInvitation]:
        """Get an invitation by its token.

        Lookup errors are logged and reported as None rather than raised.

        Args:
            token: The invitation token

        Returns:
            EmailTeamInvitation: The invitation or None if not found

        Examples:
            Used for invitation acceptance and validation.
        """
        try:
            invitation = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.token == token).first()

            return invitation

        except Exception as e:
            logger.error(f"Failed to get invitation by token: {e}")
            return None

    async def accept_invitation(self, token: str, accepting_user_email: Optional[str] = None) -> EmailTeamMember:
        """Accept a team invitation.

        Args:
            token: The invitation token
            accepting_user_email: Email of user accepting (for validation)

        Returns:
            EmailTeamMember: The created team membership record

        Raises:
            ValueError: If invitation is invalid or expired
            Exception: If acceptance fails

        Examples:
            Users can accept invitations to join teams.
        """
        try:
            # Get the invitation
            invitation = await self.get_invitation_by_token(token)
            if not invitation:
                logger.warning("Invitation not found for token")
                raise ValueError("Invitation not found")

            # Check if invitation is valid
            if not invitation.is_valid():
                logger.warning(f"Invalid or expired invitation for {invitation.email}")
                raise ValueError("Invitation is invalid or expired")

            # Validate accepting user email if provided
            if accepting_user_email and accepting_user_email != invitation.email:
                logger.warning(f"Email mismatch: invitation for {invitation.email}, accepting as {accepting_user_email}")
                raise ValueError("Email address does not match invitation")

            # Check if user exists (if email provided, they must exist)
            if accepting_user_email:
                user = self.db.query(EmailUser).filter(EmailUser.email == accepting_user_email).first()
                if not user:
                    logger.warning(f"User {accepting_user_email} not found")
                    raise ValueError("User account not found")

                # Check email verification at accept-time
                if getattr(settings, "require_email_verification_for_invites", True):
                    if not user.email_verified_at:
                        raise ValueError("Email address has not been verified")

            # Check if team still exists
            team = self.db.query(EmailTeam).filter(EmailTeam.id == invitation.team_id, EmailTeam.is_active.is_(True)).first()

            if not team:
                logger.warning(f"Team {invitation.team_id} not found or inactive")
                raise ValueError("Team not found or inactive")

            # Check if user is already a member
            existing_member = (
                self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == invitation.team_id, EmailTeamMember.user_email == invitation.email, EmailTeamMember.is_active.is_(True)).first()
            )

            if existing_member:
                logger.warning(f"User {invitation.email} is already a member of team {invitation.team_id}")
                # Deactivate the invitation since they're already a member.
                # This is committed before raising so the rollback below does not undo it.
                invitation.is_active = False
                self.db.commit()
                raise ValueError("User is already a member of this team")

            # Check max teams per user
            max_teams = getattr(settings, "max_teams_per_user", 50)
            accepting_email = accepting_user_email or invitation.email
            if self._get_user_team_count(accepting_email) >= max_teams:
                raise ValueError(f"User has reached the maximum team limit of {max_teams}")

            # Check team member limit
            if team.max_members:
                current_member_count = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == invitation.team_id, EmailTeamMember.is_active.is_(True)).count()
                if current_member_count >= team.max_members:
                    logger.warning(f"Team {invitation.team_id} has reached maximum member limit")
                    raise ValueError(f"Team has reached maximum member limit of {team.max_members}")

            # Create team membership
            membership = EmailTeamMember(team_id=invitation.team_id, user_email=invitation.email, role=invitation.role, joined_at=utc_now(), invited_by=invitation.invited_by, is_active=True)

            self.db.add(membership)

            # Deactivate the invitation
            invitation.is_active = False

            self.db.commit()

            # Invalidate auth cache for user's team membership.
            # Best effort: a scheduling failure is logged and does not fail the acceptance.
            try:
                self._fire_and_forget(auth_cache.invalidate_team(invitation.email))
                self._fire_and_forget(auth_cache.invalidate_user_role(invitation.email, invitation.team_id))
                self._fire_and_forget(auth_cache.invalidate_user_teams(invitation.email))
                self._fire_and_forget(auth_cache.invalidate_team_membership(invitation.email))
            except Exception as cache_error:
                logger.debug(f"Failed to invalidate cache on invitation acceptance: {cache_error}")

            logger.info(f"User {invitation.email} accepted invitation to team {invitation.team_id}")
            return membership

        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to accept invitation: {e}")
            raise

    async def decline_invitation(self, token: str, declining_user_email: Optional[str] = None) -> bool:
        """Decline a team invitation.

        Failures are logged and reported as False; this method does not raise.

        Args:
            token: The invitation token
            declining_user_email: Email of user declining (for validation)

        Returns:
            bool: True if invitation was declined successfully, False otherwise

        Examples:
            Users can decline invitations they don't want to accept.
        """
        try:
            # Get the invitation
            invitation = await self.get_invitation_by_token(token)
            if not invitation:
                logger.warning("Invitation not found for token")
                return False

            # Validate declining user email if provided
            if declining_user_email and declining_user_email != invitation.email:
                logger.warning(f"Email mismatch: invitation for {invitation.email}, declining as {declining_user_email}")
                return False

            # Deactivate the invitation
            invitation.is_active = False
            self.db.commit()

            logger.info(f"User {invitation.email} declined invitation to team {invitation.team_id}")
            return True

        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to decline invitation: {e}")
            return False

    async def revoke_invitation(self, invitation_id: str, revoked_by: str) -> bool:
        """Revoke a team invitation.

        Only an active owner of the invitation's team may revoke it. Failures
        are logged and reported as False; this method does not raise.

        Args:
            invitation_id: ID of the invitation to revoke
            revoked_by: Email of user revoking the invitation

        Returns:
            bool: True if invitation was revoked successfully, False otherwise

        Examples:
            Team owners can revoke pending invitations.
        """
        try:
            # Get the invitation
            invitation = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.id == invitation_id, EmailTeamInvitation.is_active.is_(True)).first()

            if not invitation:
                logger.warning(f"Active invitation {invitation_id} not found")
                return False

            # Check if revoker has permission
            revoker_membership = (
                self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == invitation.team_id, EmailTeamMember.user_email == revoked_by, EmailTeamMember.is_active.is_(True)).first()
            )

            if not revoker_membership or revoker_membership.role != "owner":
                logger.warning(f"User {revoked_by} does not have permission to revoke invitation {invitation_id}")
                return False

            # Revoke the invitation
            invitation.is_active = False
            self.db.commit()

            logger.info(f"Invitation {invitation_id} revoked by {revoked_by}")
            return True

        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to revoke invitation {invitation_id}: {e}")
            return False

    async def get_team_invitations(self, team_id: str, active_only: bool = True) -> List[EmailTeamInvitation]:
        """Get all invitations for a team.

        Results are ordered newest first by ``invited_at``. Query errors are
        logged and reported as an empty list.

        Args:
            team_id: ID of the team
            active_only: Whether to return only active invitations

        Returns:
            List[EmailTeamInvitation]: List of team invitations

        Examples:
            Team management interface showing pending invitations.
        """
        try:
            query = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.team_id == team_id)

            if active_only:
                query = query.filter(EmailTeamInvitation.is_active.is_(True))

            invitations = query.order_by(EmailTeamInvitation.invited_at.desc()).all()
            return invitations

        except Exception as e:
            logger.error(f"Failed to get invitations for team {team_id}: {e}")
            return []

    async def get_user_invitations(self, email: str, active_only: bool = True) -> List[EmailTeamInvitation]:
        """Get all invitations for a user.

        Results are ordered newest first by ``invited_at``. Query errors are
        logged and reported as an empty list.

        Args:
            email: Email address of the user
            active_only: Whether to return only active invitations

        Returns:
            List[EmailTeamInvitation]: List of invitations for the user

        Examples:
            User dashboard showing pending team invitations.
        """
        try:
            query = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.email == email)

            if active_only:
                query = query.filter(EmailTeamInvitation.is_active.is_(True))

            invitations = query.order_by(EmailTeamInvitation.invited_at.desc()).all()
            return invitations

        except Exception as e:
            logger.error(f"Failed to get invitations for user {email}: {e}")
            return []

    async def cleanup_expired_invitations(self) -> int:
        """Clean up expired invitations.

        Marks every active invitation whose ``expires_at`` is in the past as
        inactive with a single bulk update; rows are not deleted. Failures are
        logged and reported as 0.

        Returns:
            int: Number of invitations cleaned up

        Examples:
            Periodic cleanup task to remove expired invitations.
        """
        try:
            now = utc_now()
            expired_count = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.expires_at < now, EmailTeamInvitation.is_active.is_(True)).update({"is_active": False})

            self.db.commit()

            if expired_count > 0:
                logger.info(f"Cleaned up {expired_count} expired invitations")

            return expired_count

        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to cleanup expired invitations: {e}")
            return 0