Coverage for mcpgateway / services / team_invitation_service.py: 99%

219 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/team_invitation_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Team Invitation Service. 

8This module provides team invitation creation, management, and acceptance 

9for the multi-team collaboration system. 

10 

11Examples: 

12 >>> from mcpgateway.services.team_invitation_service import TeamInvitationService 

13 >>> from mcpgateway.db import SessionLocal 

14 >>> db = SessionLocal() 

15 >>> service = TeamInvitationService(db) 

16 >>> # Service handles team invitation lifecycle 

17""" 

18 

19# Standard 

20import asyncio 

21from datetime import timedelta 

22import secrets 

23from typing import Any, List, Optional 

24 

25# Third-Party 

26from sqlalchemy.orm import Session 

27 

28# First-Party 

29from mcpgateway.cache.auth_cache import auth_cache 

30from mcpgateway.config import settings 

31from mcpgateway.db import EmailTeam, EmailTeamInvitation, EmailTeamMember, EmailUser, utc_now 

32from mcpgateway.services.logging_service import LoggingService 

33from mcpgateway.services.team_management_service import get_user_team_count 

34 

35# Initialize logging 

36logging_service = LoggingService() 

37logger = logging_service.get_logger(__name__) 

38 

39 

40class TeamInvitationService: 

41 """Service for team invitation management. 

42 

43 This service handles invitation creation, validation, acceptance, 

44 and cleanup for team membership management. 

45 

46 Attributes: 

47 db (Session): SQLAlchemy database session 

48 

49 Examples: 

50 >>> from mcpgateway.services.team_invitation_service import TeamInvitationService 

51 >>> from mcpgateway.db import SessionLocal 

52 >>> db = SessionLocal() 

53 >>> service = TeamInvitationService(db) 

54 >>> service.db is not None 

55 True 

56 """ 

57 

58 def __init__(self, db: Session): 

59 """Initialize the team invitation service. 

60 

61 Args: 

62 db: SQLAlchemy database session 

63 

64 Examples: 

65 Basic initialization: 

66 >>> from mcpgateway.services.team_invitation_service import TeamInvitationService 

67 >>> from unittest.mock import Mock 

68 >>> db_session = Mock() 

69 >>> service = TeamInvitationService(db_session) 

70 >>> service.db is db_session 

71 True 

72 

73 Service attributes: 

74 >>> hasattr(service, 'db') 

75 True 

76 >>> service.__class__.__name__ 

77 'TeamInvitationService' 

78 """ 

79 self.db = db 

80 

81 def _get_user_team_count(self, user_email: str) -> int: 

82 """Get the number of active teams a user belongs to. 

83 

84 Args: 

85 user_email: Email address of the user 

86 

87 Returns: 

88 int: Number of active team memberships 

89 """ 

90 return get_user_team_count(self.db, user_email) 

91 

92 @staticmethod 

93 def _fire_and_forget(coro: Any) -> None: 

94 """Schedule a background coroutine and close it if scheduling fails. 

95 

96 Args: 

97 coro: The coroutine to schedule as a background task. 

98 

99 Raises: 

100 Exception: If asyncio.create_task fails (e.g. no running loop). 

101 """ 

102 try: 

103 task = asyncio.create_task(coro) 

104 if asyncio.iscoroutine(coro) and not isinstance(task, asyncio.Task): 

105 close = getattr(coro, "close", None) 

106 if callable(close): 

107 close() 

108 except Exception: 

109 close = getattr(coro, "close", None) 

110 if callable(close): 

111 close() 

112 raise 

113 

114 def _generate_invitation_token(self) -> str: 

115 """Generate a secure invitation token. 

116 

117 Returns: 

118 str: A cryptographically secure random token 

119 

120 Examples: 

121 Test token generation: 

122 >>> from mcpgateway.services.team_invitation_service import TeamInvitationService 

123 >>> from unittest.mock import Mock 

124 >>> db_session = Mock() 

125 >>> service = TeamInvitationService(db_session) 

126 >>> token = service._generate_invitation_token() 

127 >>> isinstance(token, str) 

128 True 

129 >>> len(token) > 0 

130 True 

131 

132 Token characteristics: 

133 >>> # Test that token is URL-safe 

134 >>> import string 

135 >>> valid_chars = string.ascii_letters + string.digits + '-_' 

136 >>> all(c in valid_chars for c in token) 

137 True 

138 

139 >>> # Test token length (base64-encoded 32 bytes) 

140 >>> len(token) >= 32 # URL-safe base64 of 32 bytes is ~43 chars 

141 True 

142 

143 Token uniqueness: 

144 >>> token1 = service._generate_invitation_token() 

145 >>> token2 = service._generate_invitation_token() 

146 >>> token1 != token2 

147 True 

148 """ 

149 return secrets.token_urlsafe(32) 

150 

151 async def create_invitation(self, team_id: str, email: str, role: str, invited_by: str, expiry_days: Optional[int] = None) -> Optional[EmailTeamInvitation]: 

152 """Create a team invitation. 

153 

154 Args: 

155 team_id: ID of the team 

156 email: Email address to invite 

157 role: Role to assign (owner, member) 

158 invited_by: Email of user sending the invitation 

159 expiry_days: Days until invitation expires (default from settings) 

160 

161 Returns: 

162 EmailTeamInvitation: The created invitation or None if failed 

163 

164 Raises: 

165 ValueError: If invitation parameters are invalid 

166 Exception: If invitation creation fails 

167 

168 Examples: 

169 Team owners can send invitations to new members. 

170 """ 

171 try: 

172 # Check feature flag 

173 if not getattr(settings, "allow_team_invitations", True): 

174 raise ValueError("Team invitations are currently disabled") 

175 

176 # Validate role 

177 valid_roles = ["owner", "member"] 

178 if role not in valid_roles: 

179 raise ValueError(f"Invalid role. Must be one of: {', '.join(valid_roles)}") 

180 

181 # Check if team exists 

182 team = self.db.query(EmailTeam).filter(EmailTeam.id == team_id, EmailTeam.is_active.is_(True)).first() 

183 

184 if not team: 

185 logger.warning(f"Team {team_id} not found") 

186 return None 

187 

188 # Prevent invitations to personal teams 

189 if team.is_personal: 

190 logger.warning(f"Cannot send invitations to personal team {team_id}") 

191 raise ValueError("Cannot send invitations to personal teams") 

192 

193 # Check if inviter exists and is a team member 

194 inviter = self.db.query(EmailUser).filter(EmailUser.email == invited_by).first() 

195 if not inviter: 

196 logger.warning(f"Inviter {invited_by} not found") 

197 return None 

198 

199 # Check email verification requirement for invitee 

200 if getattr(settings, "require_email_verification_for_invites", True): 

201 invitee = self.db.query(EmailUser).filter(EmailUser.email == email).first() 

202 if invitee and not invitee.email_verified_at: 

203 raise ValueError("Invitee email address has not been verified") 

204 

205 # Check if inviter is a member of the team with appropriate permissions 

206 inviter_membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == invited_by, EmailTeamMember.is_active.is_(True)).first() 

207 

208 if not inviter_membership: 

209 logger.warning(f"Inviter {invited_by} is not a member of team {team_id}") 

210 raise ValueError("Only team members can send invitations") 

211 

212 # Only owners can send invitations 

213 if inviter_membership.role != "owner": 

214 logger.warning(f"User {invited_by} does not have permission to invite to team {team_id}") 

215 raise ValueError("Only team owners can send invitations") 

216 

217 # Check if user is already a team member 

218 existing_member = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == email, EmailTeamMember.is_active.is_(True)).first() 

219 

220 if existing_member: 

221 logger.warning(f"User {email} is already a member of team {team_id}") 

222 raise ValueError(f"User {email} is already a member of this team") 

223 

224 # Check for existing active invitations 

225 existing_invitation = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.team_id == team_id, EmailTeamInvitation.email == email, EmailTeamInvitation.is_active.is_(True)).first() 

226 

227 if existing_invitation and not existing_invitation.is_expired(): 

228 logger.warning(f"Active invitation already exists for {email} to team {team_id}") 

229 raise ValueError(f"An active invitation already exists for {email}") 

230 

231 # Check team member limit 

232 if team.max_members: 

233 current_member_count = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)).count() 

234 

235 pending_invitation_count = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.team_id == team_id, EmailTeamInvitation.is_active.is_(True)).count() 

236 

237 if (current_member_count + pending_invitation_count) >= team.max_members: 

238 logger.warning(f"Team {team_id} has reached maximum member limit") 

239 raise ValueError(f"Team has reached maximum member limit of {team.max_members}") 

240 

241 # Deactivate any existing invitations for this email/team combination 

242 if existing_invitation: 

243 existing_invitation.is_active = False 

244 

245 # Set expiry 

246 if expiry_days is None: 

247 expiry_days = getattr(settings, "invitation_expiry_days", 7) 

248 expires_at = utc_now() + timedelta(days=expiry_days) 

249 

250 # Create the invitation 

251 invitation = EmailTeamInvitation( 

252 team_id=team_id, email=email, role=role, invited_by=invited_by, invited_at=utc_now(), expires_at=expires_at, token=self._generate_invitation_token(), is_active=True 

253 ) 

254 

255 self.db.add(invitation) 

256 self.db.commit() 

257 

258 logger.info(f"Created invitation for {email} to team {team_id} by {invited_by}") 

259 return invitation 

260 

261 except Exception as e: 

262 self.db.rollback() 

263 logger.error(f"Failed to create invitation for {email} to team {team_id}: {e}") 

264 raise 

265 

266 async def get_invitation_by_token(self, token: str) -> Optional[EmailTeamInvitation]: 

267 """Get an invitation by its token. 

268 

269 Args: 

270 token: The invitation token 

271 

272 Returns: 

273 EmailTeamInvitation: The invitation or None if not found 

274 

275 Examples: 

276 Used for invitation acceptance and validation. 

277 """ 

278 try: 

279 invitation = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.token == token).first() 

280 

281 return invitation 

282 

283 except Exception as e: 

284 logger.error(f"Failed to get invitation by token: {e}") 

285 return None 

286 

287 async def accept_invitation(self, token: str, accepting_user_email: Optional[str] = None) -> EmailTeamMember: 

288 """Accept a team invitation. 

289 

290 Args: 

291 token: The invitation token 

292 accepting_user_email: Email of user accepting (for validation) 

293 

294 Returns: 

295 EmailTeamMember: The created team membership record 

296 

297 Raises: 

298 ValueError: If invitation is invalid or expired 

299 Exception: If acceptance fails 

300 

301 Examples: 

302 Users can accept invitations to join teams. 

303 """ 

304 try: 

305 # Get the invitation 

306 invitation = await self.get_invitation_by_token(token) 

307 if not invitation: 

308 logger.warning("Invitation not found for token") 

309 raise ValueError("Invitation not found") 

310 

311 # Check if invitation is valid 

312 if not invitation.is_valid(): 

313 logger.warning(f"Invalid or expired invitation for {invitation.email}") 

314 raise ValueError("Invitation is invalid or expired") 

315 

316 # Validate accepting user email if provided 

317 if accepting_user_email and accepting_user_email != invitation.email: 

318 logger.warning(f"Email mismatch: invitation for {invitation.email}, accepting as {accepting_user_email}") 

319 raise ValueError("Email address does not match invitation") 

320 

321 # Check if user exists (if email provided, they must exist) 

322 if accepting_user_email: 

323 user = self.db.query(EmailUser).filter(EmailUser.email == accepting_user_email).first() 

324 if not user: 

325 logger.warning(f"User {accepting_user_email} not found") 

326 raise ValueError("User account not found") 

327 

328 # Check email verification at accept-time 

329 if getattr(settings, "require_email_verification_for_invites", True): 

330 if not user.email_verified_at: 

331 raise ValueError("Email address has not been verified") 

332 

333 # Check if team still exists 

334 team = self.db.query(EmailTeam).filter(EmailTeam.id == invitation.team_id, EmailTeam.is_active.is_(True)).first() 

335 

336 if not team: 

337 logger.warning(f"Team {invitation.team_id} not found or inactive") 

338 raise ValueError("Team not found or inactive") 

339 

340 # Check if user is already a member 

341 existing_member = ( 

342 self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == invitation.team_id, EmailTeamMember.user_email == invitation.email, EmailTeamMember.is_active.is_(True)).first() 

343 ) 

344 

345 if existing_member: 

346 logger.warning(f"User {invitation.email} is already a member of team {invitation.team_id}") 

347 # Deactivate the invitation since they're already a member 

348 invitation.is_active = False 

349 self.db.commit() 

350 raise ValueError("User is already a member of this team") 

351 

352 # Check max teams per user 

353 max_teams = getattr(settings, "max_teams_per_user", 50) 

354 accepting_email = accepting_user_email or invitation.email 

355 if self._get_user_team_count(accepting_email) >= max_teams: 

356 raise ValueError(f"User has reached the maximum team limit of {max_teams}") 

357 

358 # Check team member limit 

359 if team.max_members: 

360 current_member_count = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == invitation.team_id, EmailTeamMember.is_active.is_(True)).count() 

361 if current_member_count >= team.max_members: 

362 logger.warning(f"Team {invitation.team_id} has reached maximum member limit") 

363 raise ValueError(f"Team has reached maximum member limit of {team.max_members}") 

364 

365 # Create team membership 

366 membership = EmailTeamMember(team_id=invitation.team_id, user_email=invitation.email, role=invitation.role, joined_at=utc_now(), invited_by=invitation.invited_by, is_active=True) 

367 

368 self.db.add(membership) 

369 

370 # Deactivate the invitation 

371 invitation.is_active = False 

372 

373 self.db.commit() 

374 

375 # Invalidate auth cache for user's team membership 

376 try: 

377 self._fire_and_forget(auth_cache.invalidate_team(invitation.email)) 

378 self._fire_and_forget(auth_cache.invalidate_user_role(invitation.email, invitation.team_id)) 

379 self._fire_and_forget(auth_cache.invalidate_user_teams(invitation.email)) 

380 self._fire_and_forget(auth_cache.invalidate_team_membership(invitation.email)) 

381 except Exception as cache_error: 

382 logger.debug(f"Failed to invalidate cache on invitation acceptance: {cache_error}") 

383 

384 logger.info(f"User {invitation.email} accepted invitation to team {invitation.team_id}") 

385 return membership 

386 

387 except Exception as e: 

388 self.db.rollback() 

389 logger.error(f"Failed to accept invitation: {e}") 

390 raise 

391 

392 async def decline_invitation(self, token: str, declining_user_email: Optional[str] = None) -> bool: 

393 """Decline a team invitation. 

394 

395 Args: 

396 token: The invitation token 

397 declining_user_email: Email of user declining (for validation) 

398 

399 Returns: 

400 bool: True if invitation was declined successfully, False otherwise 

401 

402 Examples: 

403 Users can decline invitations they don't want to accept. 

404 """ 

405 try: 

406 # Get the invitation 

407 invitation = await self.get_invitation_by_token(token) 

408 if not invitation: 

409 logger.warning("Invitation not found for token") 

410 return False 

411 

412 # Validate declining user email if provided 

413 if declining_user_email and declining_user_email != invitation.email: 

414 logger.warning(f"Email mismatch: invitation for {invitation.email}, declining as {declining_user_email}") 

415 return False 

416 

417 # Deactivate the invitation 

418 invitation.is_active = False 

419 self.db.commit() 

420 

421 logger.info(f"User {invitation.email} declined invitation to team {invitation.team_id}") 

422 return True 

423 

424 except Exception as e: 

425 self.db.rollback() 

426 logger.error(f"Failed to decline invitation: {e}") 

427 return False 

428 

429 async def revoke_invitation(self, invitation_id: str, revoked_by: str) -> bool: 

430 """Revoke a team invitation. 

431 

432 Args: 

433 invitation_id: ID of the invitation to revoke 

434 revoked_by: Email of user revoking the invitation 

435 

436 Returns: 

437 bool: True if invitation was revoked successfully, False otherwise 

438 

439 Examples: 

440 Team owners can revoke pending invitations. 

441 """ 

442 try: 

443 # Get the invitation 

444 invitation = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.id == invitation_id, EmailTeamInvitation.is_active.is_(True)).first() 

445 

446 if not invitation: 

447 logger.warning(f"Active invitation {invitation_id} not found") 

448 return False 

449 

450 # Check if revoker has permission 

451 revoker_membership = ( 

452 self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == invitation.team_id, EmailTeamMember.user_email == revoked_by, EmailTeamMember.is_active.is_(True)).first() 

453 ) 

454 

455 if not revoker_membership or revoker_membership.role != "owner": 

456 logger.warning(f"User {revoked_by} does not have permission to revoke invitation {invitation_id}") 

457 return False 

458 

459 # Revoke the invitation 

460 invitation.is_active = False 

461 self.db.commit() 

462 

463 logger.info(f"Invitation {invitation_id} revoked by {revoked_by}") 

464 return True 

465 

466 except Exception as e: 

467 self.db.rollback() 

468 logger.error(f"Failed to revoke invitation {invitation_id}: {e}") 

469 return False 

470 

471 async def get_team_invitations(self, team_id: str, active_only: bool = True) -> List[EmailTeamInvitation]: 

472 """Get all invitations for a team. 

473 

474 Args: 

475 team_id: ID of the team 

476 active_only: Whether to return only active invitations 

477 

478 Returns: 

479 List[EmailTeamInvitation]: List of team invitations 

480 

481 Examples: 

482 Team management interface showing pending invitations. 

483 """ 

484 try: 

485 query = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.team_id == team_id) 

486 

487 if active_only: 

488 query = query.filter(EmailTeamInvitation.is_active.is_(True)) 

489 

490 invitations = query.order_by(EmailTeamInvitation.invited_at.desc()).all() 

491 return invitations 

492 

493 except Exception as e: 

494 logger.error(f"Failed to get invitations for team {team_id}: {e}") 

495 return [] 

496 

497 async def get_user_invitations(self, email: str, active_only: bool = True) -> List[EmailTeamInvitation]: 

498 """Get all invitations for a user. 

499 

500 Args: 

501 email: Email address of the user 

502 active_only: Whether to return only active invitations 

503 

504 Returns: 

505 List[EmailTeamInvitation]: List of invitations for the user 

506 

507 Examples: 

508 User dashboard showing pending team invitations. 

509 """ 

510 try: 

511 query = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.email == email) 

512 

513 if active_only: 

514 query = query.filter(EmailTeamInvitation.is_active.is_(True)) 

515 

516 invitations = query.order_by(EmailTeamInvitation.invited_at.desc()).all() 

517 return invitations 

518 

519 except Exception as e: 

520 logger.error(f"Failed to get invitations for user {email}: {e}") 

521 return [] 

522 

523 async def cleanup_expired_invitations(self) -> int: 

524 """Clean up expired invitations. 

525 

526 Returns: 

527 int: Number of invitations cleaned up 

528 

529 Examples: 

530 Periodic cleanup task to remove expired invitations. 

531 """ 

532 try: 

533 now = utc_now() 

534 expired_count = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.expires_at < now, EmailTeamInvitation.is_active.is_(True)).update({"is_active": False}) 

535 

536 self.db.commit() 

537 

538 if expired_count > 0: 

539 logger.info(f"Cleaned up {expired_count} expired invitations") 

540 

541 return expired_count 

542 

543 except Exception as e: 

544 self.db.rollback() 

545 logger.error(f"Failed to cleanup expired invitations: {e}") 

546 return 0