Coverage for mcpgateway / services / team_invitation_service.py: 99%

212 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/team_invitation_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Team Invitation Service. 

8This module provides team invitation creation, management, and acceptance 

9for the multi-team collaboration system. 

10 

11Examples: 

12 >>> from mcpgateway.services.team_invitation_service import TeamInvitationService 

13 >>> from mcpgateway.db import SessionLocal 

14 >>> db = SessionLocal() 

15 >>> service = TeamInvitationService(db) 

16 >>> # Service handles team invitation lifecycle 

17""" 

18 

19# Standard 

20import asyncio 

21from datetime import timedelta 

22import secrets 

23from typing import Any, List, Optional 

24 

25# Third-Party 

26from sqlalchemy.orm import Session 

27 

28# First-Party 

29from mcpgateway.cache.auth_cache import auth_cache 

30from mcpgateway.common.validators import SecurityValidator 

31from mcpgateway.config import settings 

32from mcpgateway.db import EmailTeam, EmailTeamInvitation, EmailTeamMember, EmailUser, utc_now 

33from mcpgateway.services.logging_service import LoggingService 

34from mcpgateway.services.team_management_service import check_team_member_capacity, get_user_team_count 

35 

# Module-level logger: created through the project's LoggingService and named
# after this module so log records can be traced back to the invitation service.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

39 

40 

41class TeamInvitationService: 

42 """Service for team invitation management. 

43 

44 This service handles invitation creation, validation, acceptance, 

45 and cleanup for team membership management. 

46 

47 Attributes: 

48 db (Session): SQLAlchemy database session 

49 

50 Examples: 

51 >>> from mcpgateway.services.team_invitation_service import TeamInvitationService 

52 >>> from mcpgateway.db import SessionLocal 

53 >>> db = SessionLocal() 

54 >>> service = TeamInvitationService(db) 

55 >>> service.db is not None 

56 True 

57 """ 

58 

59 def __init__(self, db: Session): 

60 """Initialize the team invitation service. 

61 

62 Args: 

63 db: SQLAlchemy database session 

64 

65 Examples: 

66 Basic initialization: 

67 >>> from mcpgateway.services.team_invitation_service import TeamInvitationService 

68 >>> from unittest.mock import Mock 

69 >>> db_session = Mock() 

70 >>> service = TeamInvitationService(db_session) 

71 >>> service.db is db_session 

72 True 

73 

74 Service attributes: 

75 >>> hasattr(service, 'db') 

76 True 

77 >>> service.__class__.__name__ 

78 'TeamInvitationService' 

79 """ 

80 self.db = db 

81 

def _get_user_team_count(self, user_email: str) -> int:
    """Return how many teams the given user currently counts against.

    Thin wrapper that forwards this service's session to the shared
    ``get_user_team_count`` helper from the team management service.

    Args:
        user_email: Email address of the user

    Returns:
        int: The count reported by ``get_user_team_count``
    """
    session = self.db
    return get_user_team_count(session, user_email)

92 

93 @staticmethod 

94 def _fire_and_forget(coro: Any) -> None: 

95 """Schedule a background coroutine and close it if scheduling fails. 

96 

97 Args: 

98 coro: The coroutine to schedule as a background task. 

99 

100 Raises: 

101 Exception: If asyncio.create_task fails (e.g. no running loop). 

102 """ 

103 try: 

104 task = asyncio.create_task(coro) 

105 if asyncio.iscoroutine(coro) and not isinstance(task, asyncio.Task): 

106 close = getattr(coro, "close", None) 

107 if callable(close): 

108 close() 

109 except Exception: 

110 close = getattr(coro, "close", None) 

111 if callable(close): 

112 close() 

113 raise 

114 

115 def _generate_invitation_token(self) -> str: 

116 """Generate a secure invitation token. 

117 

118 Returns: 

119 str: A cryptographically secure random token 

120 

121 Examples: 

122 Test token generation: 

123 >>> from mcpgateway.services.team_invitation_service import TeamInvitationService 

124 >>> from unittest.mock import Mock 

125 >>> db_session = Mock() 

126 >>> service = TeamInvitationService(db_session) 

127 >>> token = service._generate_invitation_token() 

128 >>> isinstance(token, str) 

129 True 

130 >>> len(token) > 0 

131 True 

132 

133 Token characteristics: 

134 >>> # Test that token is URL-safe 

135 >>> import string 

136 >>> valid_chars = string.ascii_letters + string.digits + '-_' 

137 >>> all(c in valid_chars for c in token) 

138 True 

139 

140 >>> # Test token length (base64-encoded 32 bytes) 

141 >>> len(token) >= 32 # URL-safe base64 of 32 bytes is ~43 chars 

142 True 

143 

144 Token uniqueness: 

145 >>> token1 = service._generate_invitation_token() 

146 >>> token2 = service._generate_invitation_token() 

147 >>> token1 != token2 

148 True 

149 """ 

150 return secrets.token_urlsafe(32) 

151 

async def create_invitation(self, team_id: str, email: str, role: str, invited_by: str, expiry_days: Optional[int] = None) -> Optional[EmailTeamInvitation]:
    """Create a team invitation.

    Validation runs in this order: feature flag, role, team lookup,
    personal-team guard, inviter lookup, invitee email verification (when
    required by settings), inviter membership and ownership, existing
    membership, existing unexpired invitation, and team capacity. Only
    after all checks pass is a previous (expired) invitation for the same
    email deactivated and a new one stored.

    Args:
        team_id: ID of the team
        email: Email address to invite
        role: Role to assign (owner, member)
        invited_by: Email of user sending the invitation
        expiry_days: Days until invitation expires (default from settings,
            falling back to 7)

    Returns:
        EmailTeamInvitation: The created invitation, or None when the team
        is not found/inactive or the inviter account does not exist

    Raises:
        ValueError: If invitations are disabled, the role is invalid, the
            team is personal, the invitee is unverified, the inviter is not
            an active owner of the team, the invitee is already a member,
            or an unexpired active invitation already exists
        Exception: If invitation creation fails for any other reason
            (including whatever check_team_member_capacity raises); the
            session is rolled back before re-raising

    Examples:
        Team owners can send invitations to new members.
    """
    # All user-supplied values are passed through the same sanitizer before
    # they reach the log, including the inviter address.
    sanitize = SecurityValidator.sanitize_log_message

    try:
        # Check feature flag
        if not getattr(settings, "allow_team_invitations", True):
            raise ValueError("Team invitations are currently disabled")

        # Validate role
        valid_roles = ("owner", "member")
        if role not in valid_roles:
            raise ValueError(f"Invalid role. Must be one of: {', '.join(valid_roles)}")

        # Check if team exists
        team = self.db.query(EmailTeam).filter(EmailTeam.id == team_id, EmailTeam.is_active.is_(True)).first()
        if not team:
            logger.warning(f"Team {sanitize(team_id)} not found")
            return None

        # Prevent invitations to personal teams
        if team.is_personal:
            logger.warning(f"Cannot send invitations to personal team {sanitize(team_id)}")
            raise ValueError("Cannot send invitations to personal teams")

        # Check if inviter exists
        inviter = self.db.query(EmailUser).filter(EmailUser.email == invited_by).first()
        if not inviter:
            logger.warning(f"Inviter {sanitize(invited_by)} not found")
            return None

        # Check email verification requirement for invitee (only enforced
        # when the invitee already has an account)
        if getattr(settings, "require_email_verification_for_invites", True):
            invitee = self.db.query(EmailUser).filter(EmailUser.email == email).first()
            if invitee and not invitee.email_verified_at:
                raise ValueError("Invitee email address has not been verified")

        # Check if inviter is an active member of the team
        inviter_membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == invited_by, EmailTeamMember.is_active.is_(True)).first()
        if not inviter_membership:
            logger.warning(f"Inviter {sanitize(invited_by)} is not a member of team {sanitize(team_id)}")
            raise ValueError("Only team members can send invitations")

        # Only owners can send invitations
        if inviter_membership.role != "owner":
            logger.warning(f"User {sanitize(invited_by)} does not have permission to invite to team {sanitize(team_id)}")
            raise ValueError("Only team owners can send invitations")

        # Check if user is already a team member
        existing_member = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == email, EmailTeamMember.is_active.is_(True)).first()
        if existing_member:
            logger.warning(f"User {sanitize(email)} is already a member of team {sanitize(team_id)}")
            raise ValueError(f"User {email} is already a member of this team")

        # Check for existing active invitations
        existing_invitation = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.team_id == team_id, EmailTeamInvitation.email == email, EmailTeamInvitation.is_active.is_(True)).first()
        if existing_invitation and not existing_invitation.is_expired():
            logger.warning(f"Active invitation already exists for {sanitize(email)} to team {sanitize(team_id)}")
            raise ValueError(f"An active invitation already exists for {email}")

        now = utc_now()

        # Check team member limit (explicit per-team value or global default).
        # Reserve slots for pending invitations too, so we don't over-invite.
        # Invitations whose expiry has passed can no longer be accepted, so
        # they must not reserve capacity: otherwise stale rows awaiting
        # cleanup (including an expired invitation for this same email that
        # is replaced below) could block a legitimate invite.
        pending_count = (
            self.db.query(EmailTeamInvitation)
            .filter(
                EmailTeamInvitation.team_id == team_id,
                EmailTeamInvitation.is_active.is_(True),
                EmailTeamInvitation.expires_at > now,
            )
            .count()
        )
        check_team_member_capacity(self.db, team, extra_count=pending_count)

        # Deactivate any existing (expired) invitation for this email/team combination
        if existing_invitation:
            existing_invitation.is_active = False

        # Set expiry
        if expiry_days is None:
            expiry_days = getattr(settings, "invitation_expiry_days", 7)
        expires_at = now + timedelta(days=expiry_days)

        # Create the invitation
        invitation = EmailTeamInvitation(
            team_id=team_id,
            email=email,
            role=role,
            invited_by=invited_by,
            invited_at=now,
            expires_at=expires_at,
            token=self._generate_invitation_token(),
            is_active=True,
        )

        self.db.add(invitation)
        self.db.commit()

        logger.info(f"Created invitation for {sanitize(email)} to team {sanitize(team_id)} by {sanitize(invited_by)}")
        return invitation

    except Exception as e:
        # Undo any partial changes (e.g. a deactivated old invitation)
        # before surfacing the error to the caller.
        self.db.rollback()
        logger.error(f"Failed to create invitation for {sanitize(email)} to team {sanitize(team_id)}: {e}")
        raise

261 

async def get_invitation_by_token(self, token: str) -> Optional[EmailTeamInvitation]:
    """Look up the invitation that carries the given token.

    The lookup does not filter on activity or expiry; callers decide what
    to do with inactive or expired invitations.

    Args:
        token: The invitation token

    Returns:
        EmailTeamInvitation: The matching invitation, or None if no row
        matches or the query raised (the error is logged, not propagated)

    Examples:
        Used for invitation acceptance and validation.
    """
    try:
        by_token = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.token == token)
        return by_token.first()
    except Exception as exc:
        logger.error(f"Failed to get invitation by token: {exc}")
        return None

282 

async def accept_invitation(self, token: str, accepting_user_email: Optional[str] = None) -> EmailTeamMember:
    """Turn a valid invitation into an active team membership.

    The invitation must exist and pass ``is_valid()``. When
    ``accepting_user_email`` is supplied it must equal the invited address,
    belong to an existing user and, if settings require it, be verified.
    The team must still be active, the invitee must not already be an
    active member, and both the per-user team limit and the team capacity
    check must pass. On success the membership is stored, the invitation is
    deactivated, and auth-cache invalidations are scheduled in the
    background.

    Args:
        token: The invitation token
        accepting_user_email: Email of user accepting (for validation)

    Returns:
        EmailTeamMember: The created team membership record

    Raises:
        ValueError: If the invitation is missing, invalid or expired, the
            email does not match, the user is unknown or unverified, the
            team is gone, the user is already a member, or the user has
            reached the team limit
        Exception: If acceptance fails for any other reason; the session
            is rolled back before re-raising

    Examples:
        Users can accept invitations to join teams.
    """
    try:
        invitation = await self.get_invitation_by_token(token)
        if not invitation:
            logger.warning("Invitation not found for token")
            raise ValueError("Invitation not found")

        if not invitation.is_valid():
            logger.warning(f"Invalid or expired invitation for {invitation.email}")
            raise ValueError("Invitation is invalid or expired")

        # Caller-identity checks only apply when the caller told us who is accepting.
        if accepting_user_email:
            if accepting_user_email != invitation.email:
                logger.warning(f"Email mismatch: invitation for {invitation.email}, accepting as {SecurityValidator.sanitize_log_message(accepting_user_email)}")
                raise ValueError("Email address does not match invitation")

            account = self.db.query(EmailUser).filter(EmailUser.email == accepting_user_email).first()
            if not account:
                logger.warning(f"User {SecurityValidator.sanitize_log_message(accepting_user_email)} not found")
                raise ValueError("User account not found")

            # Verification is re-checked at accept time, not just at invite time.
            if getattr(settings, "require_email_verification_for_invites", True) and not account.email_verified_at:
                raise ValueError("Email address has not been verified")

        # The team may have been deactivated since the invitation was sent.
        target_team = self.db.query(EmailTeam).filter(EmailTeam.id == invitation.team_id, EmailTeam.is_active.is_(True)).first()
        if not target_team:
            logger.warning(f"Team {invitation.team_id} not found or inactive")
            raise ValueError("Team not found or inactive")

        current_membership = (
            self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == invitation.team_id, EmailTeamMember.user_email == invitation.email, EmailTeamMember.is_active.is_(True)).first()
        )
        if current_membership:
            logger.warning(f"User {invitation.email} is already a member of team {invitation.team_id}")
            # The invitation is moot; retire it (committed before raising so
            # the rollback in the handler below does not undo it).
            invitation.is_active = False
            self.db.commit()
            raise ValueError("User is already a member of this team")

        # Per-user team limit
        max_teams = getattr(settings, "max_teams_per_user", 50)
        counted_email = accepting_user_email if accepting_user_email else invitation.email
        if self._get_user_team_count(counted_email) >= max_teams:
            raise ValueError(f"User has reached the maximum team limit of {max_teams}")

        # Per-team member limit (explicit per-team value or global default)
        check_team_member_capacity(self.db, target_team)

        new_membership = EmailTeamMember(
            team_id=invitation.team_id,
            user_email=invitation.email,
            role=invitation.role,
            joined_at=utc_now(),
            invited_by=invitation.invited_by,
            is_active=True,
        )
        self.db.add(new_membership)

        # Membership creation and invitation retirement share one commit.
        invitation.is_active = False
        self.db.commit()

        # Best-effort cache invalidation: failures are logged at debug level
        # and never fail the acceptance itself.
        try:
            self._fire_and_forget(auth_cache.invalidate_team(invitation.email))
            self._fire_and_forget(auth_cache.invalidate_user_role(invitation.email, invitation.team_id))
            self._fire_and_forget(auth_cache.invalidate_user_teams(invitation.email))
            self._fire_and_forget(auth_cache.invalidate_team_membership(invitation.email))
        except Exception as cache_exc:
            logger.debug(f"Failed to invalidate cache on invitation acceptance: {cache_exc}")

        logger.info(f"User {invitation.email} accepted invitation to team {invitation.team_id}")
        return new_membership

    except Exception as exc:
        self.db.rollback()
        logger.error(f"Failed to accept invitation: {exc}")
        raise

383 

async def decline_invitation(self, token: str, declining_user_email: Optional[str] = None) -> bool:
    """Mark an invitation as inactive on behalf of the invitee.

    Args:
        token: The invitation token
        declining_user_email: Email of user declining (for validation);
            when given it must equal the invited address

    Returns:
        bool: True if invitation was declined successfully, False when the
        token is unknown, the email does not match, or an error occurred
        (errors are logged and the session rolled back)

    Examples:
        Users can decline invitations they don't want to accept.
    """
    try:
        invitation = await self.get_invitation_by_token(token)
        if not invitation:
            logger.warning("Invitation not found for token")
            return False

        # Only the invited address may decline, if the caller identified itself.
        mismatch = bool(declining_user_email) and declining_user_email != invitation.email
        if mismatch:
            logger.warning(f"Email mismatch: invitation for {invitation.email}, declining as {SecurityValidator.sanitize_log_message(declining_user_email)}")
            return False

        invitation.is_active = False
        self.db.commit()
        logger.info(f"User {invitation.email} declined invitation to team {invitation.team_id}")
        return True

    except Exception as exc:
        self.db.rollback()
        logger.error(f"Failed to decline invitation: {exc}")
        return False

420 

async def revoke_invitation(self, invitation_id: str, revoked_by: str) -> bool:
    """Revoke a team invitation.

    Only an active owner of the invitation's team may revoke it, and only
    invitations that are still active can be revoked.

    Args:
        invitation_id: ID of the invitation to revoke
        revoked_by: Email of user revoking the invitation

    Returns:
        bool: True if invitation was revoked successfully, False when the
        invitation is not found or already inactive, the revoker is not an
        active owner of the team, or an error occurred (errors are logged
        and the session rolled back)

    Examples:
        Team owners can revoke pending invitations.
    """
    # Both arguments come from the caller, so sanitize them before logging,
    # consistent with how the other methods in this service log user input.
    sanitize = SecurityValidator.sanitize_log_message

    try:
        # Get the invitation
        invitation = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.id == invitation_id, EmailTeamInvitation.is_active.is_(True)).first()

        if not invitation:
            logger.warning(f"Active invitation {sanitize(invitation_id)} not found")
            return False

        # Check if revoker has permission
        revoker_membership = (
            self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == invitation.team_id, EmailTeamMember.user_email == revoked_by, EmailTeamMember.is_active.is_(True)).first()
        )

        if not revoker_membership or revoker_membership.role != "owner":
            logger.warning(f"User {sanitize(revoked_by)} does not have permission to revoke invitation {sanitize(invitation_id)}")
            return False

        # Revoke the invitation
        invitation.is_active = False
        self.db.commit()

        logger.info(f"Invitation {sanitize(invitation_id)} revoked by {sanitize(revoked_by)}")
        return True

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to revoke invitation {sanitize(invitation_id)}: {e}")
        return False

462 

async def get_team_invitations(self, team_id: str, active_only: bool = True) -> List[EmailTeamInvitation]:
    """List a team's invitations, most recently sent first.

    Args:
        team_id: ID of the team
        active_only: Whether to return only active invitations

    Returns:
        List[EmailTeamInvitation]: Invitations ordered by ``invited_at``
        descending; an empty list if the query fails (the error is logged)

    Examples:
        Team management interface showing pending invitations.
    """
    try:
        selection = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.team_id == team_id)
        # Narrow to active rows only on request; otherwise keep the full history.
        selection = selection.filter(EmailTeamInvitation.is_active.is_(True)) if active_only else selection
        return selection.order_by(EmailTeamInvitation.invited_at.desc()).all()
    except Exception as exc:
        logger.error(f"Failed to get invitations for team {SecurityValidator.sanitize_log_message(team_id)}: {exc}")
        return []

488 

async def get_user_invitations(self, email: str, active_only: bool = True) -> List[EmailTeamInvitation]:
    """List the invitations addressed to an email, most recently sent first.

    Args:
        email: Email address of the user
        active_only: Whether to return only active invitations

    Returns:
        List[EmailTeamInvitation]: Invitations ordered by ``invited_at``
        descending; an empty list if the query fails (the error is logged)

    Examples:
        User dashboard showing pending team invitations.
    """
    try:
        selection = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.email == email)
        # Narrow to active rows only on request; otherwise keep the full history.
        selection = selection.filter(EmailTeamInvitation.is_active.is_(True)) if active_only else selection
        return selection.order_by(EmailTeamInvitation.invited_at.desc()).all()
    except Exception as exc:
        logger.error(f"Failed to get invitations for user {SecurityValidator.sanitize_log_message(email)}: {exc}")
        return []

514 

async def cleanup_expired_invitations(self) -> int:
    """Deactivate every active invitation whose expiry time has passed.

    Rows are not deleted; they are bulk-updated to ``is_active = False``.

    Returns:
        int: Number of invitations cleaned up, or 0 if the update failed
        (the error is logged and the session rolled back)

    Examples:
        Periodic cleanup task to remove expired invitations.
    """
    try:
        cutoff = utc_now()
        stale = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.expires_at < cutoff, EmailTeamInvitation.is_active.is_(True))
        deactivated = stale.update({"is_active": False})
        self.db.commit()

        # Stay quiet when there was nothing to do.
        if deactivated > 0:
            logger.info(f"Cleaned up {deactivated} expired invitations")
        return deactivated

    except Exception as exc:
        self.db.rollback()
        logger.error(f"Failed to cleanup expired invitations: {exc}")
        return 0