Coverage for mcpgateway / services / team_invitation_service.py: 99%
212 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/team_invitation_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Team Invitation Service.
8This module provides team invitation creation, management, and acceptance
9for the multi-team collaboration system.
11Examples:
12 >>> from mcpgateway.services.team_invitation_service import TeamInvitationService
13 >>> from mcpgateway.db import SessionLocal
14 >>> db = SessionLocal()
15 >>> service = TeamInvitationService(db)
16 >>> # Service handles team invitation lifecycle
17"""
19# Standard
20import asyncio
21from datetime import timedelta
22import secrets
23from typing import Any, List, Optional
25# Third-Party
26from sqlalchemy.orm import Session
28# First-Party
29from mcpgateway.cache.auth_cache import auth_cache
30from mcpgateway.common.validators import SecurityValidator
31from mcpgateway.config import settings
32from mcpgateway.db import EmailTeam, EmailTeamInvitation, EmailTeamMember, EmailUser, utc_now
33from mcpgateway.services.logging_service import LoggingService
34from mcpgateway.services.team_management_service import check_team_member_capacity, get_user_team_count
36# Initialize logging
37logging_service = LoggingService()
38logger = logging_service.get_logger(__name__)
41class TeamInvitationService:
42 """Service for team invitation management.
44 This service handles invitation creation, validation, acceptance,
45 and cleanup for team membership management.
47 Attributes:
48 db (Session): SQLAlchemy database session
50 Examples:
51 >>> from mcpgateway.services.team_invitation_service import TeamInvitationService
52 >>> from mcpgateway.db import SessionLocal
53 >>> db = SessionLocal()
54 >>> service = TeamInvitationService(db)
55 >>> service.db is not None
56 True
57 """
59 def __init__(self, db: Session):
60 """Initialize the team invitation service.
62 Args:
63 db: SQLAlchemy database session
65 Examples:
66 Basic initialization:
67 >>> from mcpgateway.services.team_invitation_service import TeamInvitationService
68 >>> from unittest.mock import Mock
69 >>> db_session = Mock()
70 >>> service = TeamInvitationService(db_session)
71 >>> service.db is db_session
72 True
74 Service attributes:
75 >>> hasattr(service, 'db')
76 True
77 >>> service.__class__.__name__
78 'TeamInvitationService'
79 """
80 self.db = db
82 def _get_user_team_count(self, user_email: str) -> int:
83 """Get the number of active teams a user belongs to.
85 Args:
86 user_email: Email address of the user
88 Returns:
89 int: Number of active team memberships
90 """
91 return get_user_team_count(self.db, user_email)
93 @staticmethod
94 def _fire_and_forget(coro: Any) -> None:
95 """Schedule a background coroutine and close it if scheduling fails.
97 Args:
98 coro: The coroutine to schedule as a background task.
100 Raises:
101 Exception: If asyncio.create_task fails (e.g. no running loop).
102 """
103 try:
104 task = asyncio.create_task(coro)
105 if asyncio.iscoroutine(coro) and not isinstance(task, asyncio.Task):
106 close = getattr(coro, "close", None)
107 if callable(close):
108 close()
109 except Exception:
110 close = getattr(coro, "close", None)
111 if callable(close):
112 close()
113 raise
115 def _generate_invitation_token(self) -> str:
116 """Generate a secure invitation token.
118 Returns:
119 str: A cryptographically secure random token
121 Examples:
122 Test token generation:
123 >>> from mcpgateway.services.team_invitation_service import TeamInvitationService
124 >>> from unittest.mock import Mock
125 >>> db_session = Mock()
126 >>> service = TeamInvitationService(db_session)
127 >>> token = service._generate_invitation_token()
128 >>> isinstance(token, str)
129 True
130 >>> len(token) > 0
131 True
133 Token characteristics:
134 >>> # Test that token is URL-safe
135 >>> import string
136 >>> valid_chars = string.ascii_letters + string.digits + '-_'
137 >>> all(c in valid_chars for c in token)
138 True
140 >>> # Test token length (base64-encoded 32 bytes)
141 >>> len(token) >= 32 # URL-safe base64 of 32 bytes is ~43 chars
142 True
144 Token uniqueness:
145 >>> token1 = service._generate_invitation_token()
146 >>> token2 = service._generate_invitation_token()
147 >>> token1 != token2
148 True
149 """
150 return secrets.token_urlsafe(32)
152 async def create_invitation(self, team_id: str, email: str, role: str, invited_by: str, expiry_days: Optional[int] = None) -> Optional[EmailTeamInvitation]:
153 """Create a team invitation.
155 Args:
156 team_id: ID of the team
157 email: Email address to invite
158 role: Role to assign (owner, member)
159 invited_by: Email of user sending the invitation
160 expiry_days: Days until invitation expires (default from settings)
162 Returns:
163 EmailTeamInvitation: The created invitation or None if failed
165 Raises:
166 ValueError: If invitation parameters are invalid
167 Exception: If invitation creation fails
169 Examples:
170 Team owners can send invitations to new members.
171 """
172 try:
173 # Check feature flag
174 if not getattr(settings, "allow_team_invitations", True):
175 raise ValueError("Team invitations are currently disabled")
177 # Validate role
178 valid_roles = ["owner", "member"]
179 if role not in valid_roles:
180 raise ValueError(f"Invalid role. Must be one of: {', '.join(valid_roles)}")
182 # Check if team exists
183 team = self.db.query(EmailTeam).filter(EmailTeam.id == team_id, EmailTeam.is_active.is_(True)).first()
185 if not team:
186 logger.warning(f"Team {SecurityValidator.sanitize_log_message(team_id)} not found")
187 return None
189 # Prevent invitations to personal teams
190 if team.is_personal:
191 logger.warning(f"Cannot send invitations to personal team {SecurityValidator.sanitize_log_message(team_id)}")
192 raise ValueError("Cannot send invitations to personal teams")
194 # Check if inviter exists and is a team member
195 inviter = self.db.query(EmailUser).filter(EmailUser.email == invited_by).first()
196 if not inviter:
197 logger.warning(f"Inviter {invited_by} not found")
198 return None
200 # Check email verification requirement for invitee
201 if getattr(settings, "require_email_verification_for_invites", True):
202 invitee = self.db.query(EmailUser).filter(EmailUser.email == email).first()
203 if invitee and not invitee.email_verified_at:
204 raise ValueError("Invitee email address has not been verified")
206 # Check if inviter is a member of the team with appropriate permissions
207 inviter_membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == invited_by, EmailTeamMember.is_active.is_(True)).first()
209 if not inviter_membership:
210 logger.warning(f"Inviter {invited_by} is not a member of team {SecurityValidator.sanitize_log_message(team_id)}")
211 raise ValueError("Only team members can send invitations")
213 # Only owners can send invitations
214 if inviter_membership.role != "owner":
215 logger.warning(f"User {invited_by} does not have permission to invite to team {SecurityValidator.sanitize_log_message(team_id)}")
216 raise ValueError("Only team owners can send invitations")
218 # Check if user is already a team member
219 existing_member = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == email, EmailTeamMember.is_active.is_(True)).first()
221 if existing_member:
222 logger.warning(f"User {SecurityValidator.sanitize_log_message(email)} is already a member of team {SecurityValidator.sanitize_log_message(team_id)}")
223 raise ValueError(f"User {email} is already a member of this team")
225 # Check for existing active invitations
226 existing_invitation = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.team_id == team_id, EmailTeamInvitation.email == email, EmailTeamInvitation.is_active.is_(True)).first()
228 if existing_invitation and not existing_invitation.is_expired():
229 logger.warning(f"Active invitation already exists for {SecurityValidator.sanitize_log_message(email)} to team {SecurityValidator.sanitize_log_message(team_id)}")
230 raise ValueError(f"An active invitation already exists for {email}")
232 # Check team member limit (explicit per-team value or global default).
233 # Reserve slots for pending invitations too, so we don't over-invite.
234 pending_count = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.team_id == team_id, EmailTeamInvitation.is_active.is_(True)).count()
235 check_team_member_capacity(self.db, team, extra_count=pending_count)
237 # Deactivate any existing invitations for this email/team combination
238 if existing_invitation:
239 existing_invitation.is_active = False
241 # Set expiry
242 if expiry_days is None:
243 expiry_days = getattr(settings, "invitation_expiry_days", 7)
244 expires_at = utc_now() + timedelta(days=expiry_days)
246 # Create the invitation
247 invitation = EmailTeamInvitation(
248 team_id=team_id, email=email, role=role, invited_by=invited_by, invited_at=utc_now(), expires_at=expires_at, token=self._generate_invitation_token(), is_active=True
249 )
251 self.db.add(invitation)
252 self.db.commit()
254 logger.info(f"Created invitation for {SecurityValidator.sanitize_log_message(email)} to team {SecurityValidator.sanitize_log_message(team_id)} by {invited_by}")
255 return invitation
257 except Exception as e:
258 self.db.rollback()
259 logger.error(f"Failed to create invitation for {SecurityValidator.sanitize_log_message(email)} to team {SecurityValidator.sanitize_log_message(team_id)}: {e}")
260 raise
262 async def get_invitation_by_token(self, token: str) -> Optional[EmailTeamInvitation]:
263 """Get an invitation by its token.
265 Args:
266 token: The invitation token
268 Returns:
269 EmailTeamInvitation: The invitation or None if not found
271 Examples:
272 Used for invitation acceptance and validation.
273 """
274 try:
275 invitation = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.token == token).first()
277 return invitation
279 except Exception as e:
280 logger.error(f"Failed to get invitation by token: {e}")
281 return None
283 async def accept_invitation(self, token: str, accepting_user_email: Optional[str] = None) -> EmailTeamMember:
284 """Accept a team invitation.
286 Args:
287 token: The invitation token
288 accepting_user_email: Email of user accepting (for validation)
290 Returns:
291 EmailTeamMember: The created team membership record
293 Raises:
294 ValueError: If invitation is invalid or expired
295 Exception: If acceptance fails
297 Examples:
298 Users can accept invitations to join teams.
299 """
300 try:
301 # Get the invitation
302 invitation = await self.get_invitation_by_token(token)
303 if not invitation:
304 logger.warning("Invitation not found for token")
305 raise ValueError("Invitation not found")
307 # Check if invitation is valid
308 if not invitation.is_valid():
309 logger.warning(f"Invalid or expired invitation for {invitation.email}")
310 raise ValueError("Invitation is invalid or expired")
312 # Validate accepting user email if provided
313 if accepting_user_email and accepting_user_email != invitation.email:
314 logger.warning(f"Email mismatch: invitation for {invitation.email}, accepting as {SecurityValidator.sanitize_log_message(accepting_user_email)}")
315 raise ValueError("Email address does not match invitation")
317 # Check if user exists (if email provided, they must exist)
318 if accepting_user_email:
319 user = self.db.query(EmailUser).filter(EmailUser.email == accepting_user_email).first()
320 if not user:
321 logger.warning(f"User {SecurityValidator.sanitize_log_message(accepting_user_email)} not found")
322 raise ValueError("User account not found")
324 # Check email verification at accept-time
325 if getattr(settings, "require_email_verification_for_invites", True):
326 if not user.email_verified_at:
327 raise ValueError("Email address has not been verified")
329 # Check if team still exists
330 team = self.db.query(EmailTeam).filter(EmailTeam.id == invitation.team_id, EmailTeam.is_active.is_(True)).first()
332 if not team:
333 logger.warning(f"Team {invitation.team_id} not found or inactive")
334 raise ValueError("Team not found or inactive")
336 # Check if user is already a member
337 existing_member = (
338 self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == invitation.team_id, EmailTeamMember.user_email == invitation.email, EmailTeamMember.is_active.is_(True)).first()
339 )
341 if existing_member:
342 logger.warning(f"User {invitation.email} is already a member of team {invitation.team_id}")
343 # Deactivate the invitation since they're already a member
344 invitation.is_active = False
345 self.db.commit()
346 raise ValueError("User is already a member of this team")
348 # Check max teams per user
349 max_teams = getattr(settings, "max_teams_per_user", 50)
350 accepting_email = accepting_user_email or invitation.email
351 if self._get_user_team_count(accepting_email) >= max_teams:
352 raise ValueError(f"User has reached the maximum team limit of {max_teams}")
354 # Check team member limit (explicit per-team value or global default)
355 check_team_member_capacity(self.db, team)
357 # Create team membership
358 membership = EmailTeamMember(team_id=invitation.team_id, user_email=invitation.email, role=invitation.role, joined_at=utc_now(), invited_by=invitation.invited_by, is_active=True)
360 self.db.add(membership)
362 # Deactivate the invitation
363 invitation.is_active = False
365 self.db.commit()
367 # Invalidate auth cache for user's team membership
368 try:
369 self._fire_and_forget(auth_cache.invalidate_team(invitation.email))
370 self._fire_and_forget(auth_cache.invalidate_user_role(invitation.email, invitation.team_id))
371 self._fire_and_forget(auth_cache.invalidate_user_teams(invitation.email))
372 self._fire_and_forget(auth_cache.invalidate_team_membership(invitation.email))
373 except Exception as cache_error:
374 logger.debug(f"Failed to invalidate cache on invitation acceptance: {cache_error}")
376 logger.info(f"User {invitation.email} accepted invitation to team {invitation.team_id}")
377 return membership
379 except Exception as e:
380 self.db.rollback()
381 logger.error(f"Failed to accept invitation: {e}")
382 raise
384 async def decline_invitation(self, token: str, declining_user_email: Optional[str] = None) -> bool:
385 """Decline a team invitation.
387 Args:
388 token: The invitation token
389 declining_user_email: Email of user declining (for validation)
391 Returns:
392 bool: True if invitation was declined successfully, False otherwise
394 Examples:
395 Users can decline invitations they don't want to accept.
396 """
397 try:
398 # Get the invitation
399 invitation = await self.get_invitation_by_token(token)
400 if not invitation:
401 logger.warning("Invitation not found for token")
402 return False
404 # Validate declining user email if provided
405 if declining_user_email and declining_user_email != invitation.email:
406 logger.warning(f"Email mismatch: invitation for {invitation.email}, declining as {SecurityValidator.sanitize_log_message(declining_user_email)}")
407 return False
409 # Deactivate the invitation
410 invitation.is_active = False
411 self.db.commit()
413 logger.info(f"User {invitation.email} declined invitation to team {invitation.team_id}")
414 return True
416 except Exception as e:
417 self.db.rollback()
418 logger.error(f"Failed to decline invitation: {e}")
419 return False
421 async def revoke_invitation(self, invitation_id: str, revoked_by: str) -> bool:
422 """Revoke a team invitation.
424 Args:
425 invitation_id: ID of the invitation to revoke
426 revoked_by: Email of user revoking the invitation
428 Returns:
429 bool: True if invitation was revoked successfully, False otherwise
431 Examples:
432 Team owners can revoke pending invitations.
433 """
434 try:
435 # Get the invitation
436 invitation = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.id == invitation_id, EmailTeamInvitation.is_active.is_(True)).first()
438 if not invitation:
439 logger.warning(f"Active invitation {invitation_id} not found")
440 return False
442 # Check if revoker has permission
443 revoker_membership = (
444 self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == invitation.team_id, EmailTeamMember.user_email == revoked_by, EmailTeamMember.is_active.is_(True)).first()
445 )
447 if not revoker_membership or revoker_membership.role != "owner":
448 logger.warning(f"User {revoked_by} does not have permission to revoke invitation {invitation_id}")
449 return False
451 # Revoke the invitation
452 invitation.is_active = False
453 self.db.commit()
455 logger.info(f"Invitation {invitation_id} revoked by {revoked_by}")
456 return True
458 except Exception as e:
459 self.db.rollback()
460 logger.error(f"Failed to revoke invitation {invitation_id}: {e}")
461 return False
463 async def get_team_invitations(self, team_id: str, active_only: bool = True) -> List[EmailTeamInvitation]:
464 """Get all invitations for a team.
466 Args:
467 team_id: ID of the team
468 active_only: Whether to return only active invitations
470 Returns:
471 List[EmailTeamInvitation]: List of team invitations
473 Examples:
474 Team management interface showing pending invitations.
475 """
476 try:
477 query = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.team_id == team_id)
479 if active_only:
480 query = query.filter(EmailTeamInvitation.is_active.is_(True))
482 invitations = query.order_by(EmailTeamInvitation.invited_at.desc()).all()
483 return invitations
485 except Exception as e:
486 logger.error(f"Failed to get invitations for team {SecurityValidator.sanitize_log_message(team_id)}: {e}")
487 return []
489 async def get_user_invitations(self, email: str, active_only: bool = True) -> List[EmailTeamInvitation]:
490 """Get all invitations for a user.
492 Args:
493 email: Email address of the user
494 active_only: Whether to return only active invitations
496 Returns:
497 List[EmailTeamInvitation]: List of invitations for the user
499 Examples:
500 User dashboard showing pending team invitations.
501 """
502 try:
503 query = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.email == email)
505 if active_only:
506 query = query.filter(EmailTeamInvitation.is_active.is_(True))
508 invitations = query.order_by(EmailTeamInvitation.invited_at.desc()).all()
509 return invitations
511 except Exception as e:
512 logger.error(f"Failed to get invitations for user {SecurityValidator.sanitize_log_message(email)}: {e}")
513 return []
515 async def cleanup_expired_invitations(self) -> int:
516 """Clean up expired invitations.
518 Returns:
519 int: Number of invitations cleaned up
521 Examples:
522 Periodic cleanup task to remove expired invitations.
523 """
524 try:
525 now = utc_now()
526 expired_count = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.expires_at < now, EmailTeamInvitation.is_active.is_(True)).update({"is_active": False})
528 self.db.commit()
530 if expired_count > 0:
531 logger.info(f"Cleaned up {expired_count} expired invitations")
533 return expired_count
535 except Exception as e:
536 self.db.rollback()
537 logger.error(f"Failed to cleanup expired invitations: {e}")
538 return 0