Coverage for mcpgateway / services / team_invitation_service.py: 100%

190 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/team_invitation_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Team Invitation Service. 

8This module provides team invitation creation, management, and acceptance 

9for the multi-team collaboration system. 

10 

11Examples: 

12 >>> from mcpgateway.services.team_invitation_service import TeamInvitationService 

13 >>> from mcpgateway.db import SessionLocal 

14 >>> db = SessionLocal() 

15 >>> service = TeamInvitationService(db) 

16 >>> # Service handles team invitation lifecycle 

17""" 

18 

19# Standard 

20from datetime import timedelta 

21import secrets 

22from typing import List, Optional 

23 

24# Third-Party 

25from sqlalchemy.orm import Session 

26 

27# First-Party 

28from mcpgateway.config import settings 

29from mcpgateway.db import EmailTeam, EmailTeamInvitation, EmailTeamMember, EmailUser, utc_now 

30from mcpgateway.services.logging_service import LoggingService 

31 

32# Initialize logging 

33logging_service = LoggingService() 

34logger = logging_service.get_logger(__name__) 

35 

36 

class TeamInvitationService:
    """Service for team invitation management.

    This service handles invitation creation, validation, acceptance,
    and cleanup for team membership management.

    Attributes:
        db (Session): SQLAlchemy database session

    Examples:
        >>> from mcpgateway.services.team_invitation_service import TeamInvitationService
        >>> from mcpgateway.db import SessionLocal
        >>> db = SessionLocal()
        >>> service = TeamInvitationService(db)
        >>> service.db is not None
        True
    """

    # Roles that may be granted through an invitation.
    _VALID_ROLES = ("owner", "member")

    # Strong references to in-flight cache invalidation tasks. The event loop
    # only keeps weak references to tasks, so a fire-and-forget task could be
    # garbage-collected before it finishes. Held at class level because service
    # instances may be discarded before the tasks finish.
    _background_tasks: set = set()

    def __init__(self, db: Session):
        """Initialize the team invitation service.

        Args:
            db: SQLAlchemy database session

        Examples:
            Basic initialization:
            >>> from mcpgateway.services.team_invitation_service import TeamInvitationService
            >>> from unittest.mock import Mock
            >>> db_session = Mock()
            >>> service = TeamInvitationService(db_session)
            >>> service.db is db_session
            True

            Service attributes:
            >>> hasattr(service, 'db')
            True
            >>> service.__class__.__name__
            'TeamInvitationService'
        """
        self.db = db

    def _generate_invitation_token(self) -> str:
        """Generate a secure invitation token.

        Returns:
            str: A cryptographically secure random token

        Examples:
            Test token generation:
            >>> from mcpgateway.services.team_invitation_service import TeamInvitationService
            >>> from unittest.mock import Mock
            >>> db_session = Mock()
            >>> service = TeamInvitationService(db_session)
            >>> token = service._generate_invitation_token()
            >>> isinstance(token, str)
            True
            >>> len(token) > 0
            True

            Token characteristics:
            >>> # Test that token is URL-safe
            >>> import string
            >>> valid_chars = string.ascii_letters + string.digits + '-_'
            >>> all(c in valid_chars for c in token)
            True

            >>> # Test token length (base64-encoded 32 bytes)
            >>> len(token) >= 32  # URL-safe base64 of 32 bytes is ~43 chars
            True

            Token uniqueness:
            >>> token1 = service._generate_invitation_token()
            >>> token2 = service._generate_invitation_token()
            >>> token1 != token2
            True
        """
        return secrets.token_urlsafe(32)

    def _track_background_task(self, coro) -> None:
        """Schedule a coroutine as a task and keep it referenced until it completes.

        Args:
            coro: Coroutine to run on the currently running event loop

        Raises:
            RuntimeError: If there is no running event loop (raised by asyncio.create_task)
        """
        # Standard
        import asyncio  # pylint: disable=import-outside-toplevel

        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        # Drop the reference once the task is done so the set does not grow.
        task.add_done_callback(self._background_tasks.discard)

    async def create_invitation(self, team_id: str, email: str, role: str, invited_by: str, expiry_days: Optional[int] = None) -> Optional[EmailTeamInvitation]:
        """Create a team invitation.

        Args:
            team_id: ID of the team
            email: Email address to invite
            role: Role to assign (owner, member)
            invited_by: Email of user sending the invitation
            expiry_days: Days until invitation expires (default from settings)

        Returns:
            EmailTeamInvitation: The created invitation, or None when the team
            (active) or the inviting user cannot be found

        Raises:
            ValueError: If the role is invalid, the team is personal, the inviter is
                not an active owner of the team, the invitee is already a member,
                an unexpired invitation already exists, or the member limit is reached
            Exception: If invitation creation fails

        Examples:
            Team owners can send invitations to new members.
        """
        try:
            # Validate role
            if role not in self._VALID_ROLES:
                raise ValueError(f"Invalid role. Must be one of: {', '.join(self._VALID_ROLES)}")

            # Check if team exists
            team = self.db.query(EmailTeam).filter(EmailTeam.id == team_id, EmailTeam.is_active.is_(True)).first()
            if not team:
                logger.warning(f"Team {team_id} not found")
                return None

            # Prevent invitations to personal teams
            if team.is_personal:
                logger.warning(f"Cannot send invitations to personal team {team_id}")
                raise ValueError("Cannot send invitations to personal teams")

            # Check if inviter exists
            inviter = self.db.query(EmailUser).filter(EmailUser.email == invited_by).first()
            if not inviter:
                logger.warning(f"Inviter {invited_by} not found")
                return None

            # Check if inviter is an active member of the team
            inviter_membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == invited_by, EmailTeamMember.is_active.is_(True)).first()
            if not inviter_membership:
                logger.warning(f"Inviter {invited_by} is not a member of team {team_id}")
                raise ValueError("Only team members can send invitations")

            # Only owners can send invitations
            if inviter_membership.role != "owner":
                logger.warning(f"User {invited_by} does not have permission to invite to team {team_id}")
                raise ValueError("Only team owners can send invitations")

            # Check if user is already a team member
            existing_member = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == email, EmailTeamMember.is_active.is_(True)).first()
            if existing_member:
                logger.warning(f"User {email} is already a member of team {team_id}")
                raise ValueError(f"User {email} is already a member of this team")

            # Check for existing active invitations
            existing_invitation = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.team_id == team_id, EmailTeamInvitation.email == email, EmailTeamInvitation.is_active.is_(True)).first()
            if existing_invitation and not existing_invitation.is_expired():
                logger.warning(f"Active invitation already exists for {email} to team {team_id}")
                raise ValueError(f"An active invitation already exists for {email}")

            # One timestamp for the whole operation keeps invited_at and expires_at consistent.
            now = utc_now()

            # Check team member limit
            if team.max_members:
                current_member_count = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)).count()

                # Only invitations that can still be accepted reserve a seat. Expired rows that
                # have not been cleaned up yet (including the one replaced below) must not
                # block new invitations.
                pending_invitation_count = (
                    self.db.query(EmailTeamInvitation)
                    .filter(EmailTeamInvitation.team_id == team_id, EmailTeamInvitation.is_active.is_(True), EmailTeamInvitation.expires_at >= now)
                    .count()
                )

                if (current_member_count + pending_invitation_count) >= team.max_members:
                    logger.warning(f"Team {team_id} has reached maximum member limit")
                    raise ValueError(f"Team has reached maximum member limit of {team.max_members}")

            # Deactivate the (expired) existing invitation for this email/team combination
            if existing_invitation:
                existing_invitation.is_active = False

            # Set expiry
            if expiry_days is None:
                expiry_days = getattr(settings, "invitation_expiry_days", 7)
            expires_at = now + timedelta(days=expiry_days)

            # Create the invitation
            invitation = EmailTeamInvitation(
                team_id=team_id,
                email=email,
                role=role,
                invited_by=invited_by,
                invited_at=now,
                expires_at=expires_at,
                token=self._generate_invitation_token(),
                is_active=True,
            )

            self.db.add(invitation)
            self.db.commit()

            logger.info(f"Created invitation for {email} to team {team_id} by {invited_by}")
            return invitation

        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to create invitation for {email} to team {team_id}: {e}")
            raise

    async def get_invitation_by_token(self, token: str) -> Optional[EmailTeamInvitation]:
        """Get an invitation by its token.

        Database errors are logged and reported as None rather than raised.

        Args:
            token: The invitation token

        Returns:
            EmailTeamInvitation: The invitation or None if not found

        Examples:
            Used for invitation acceptance and validation.
        """
        try:
            return self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.token == token).first()
        except Exception as e:
            logger.error(f"Failed to get invitation by token: {e}")
            return None

    async def accept_invitation(self, token: str, accepting_user_email: Optional[str] = None) -> bool:
        """Accept a team invitation.

        Args:
            token: The invitation token
            accepting_user_email: Email of user accepting (for validation)

        Returns:
            bool: True if invitation was accepted successfully

        Raises:
            ValueError: If the invitation is missing, invalid or expired, the email does
                not match, the user or team cannot be found, the user is already a
                member, or the team is full
            Exception: If acceptance fails

        Examples:
            Users can accept invitations to join teams.
        """
        try:
            # Get the invitation
            invitation = await self.get_invitation_by_token(token)
            if not invitation:
                logger.warning("Invitation not found for token")
                raise ValueError("Invitation not found")

            # Check if invitation is valid
            if not invitation.is_valid():
                logger.warning(f"Invalid or expired invitation for {invitation.email}")
                raise ValueError("Invitation is invalid or expired")

            if accepting_user_email:
                # Validate accepting user email against the invitation
                if accepting_user_email != invitation.email:
                    logger.warning(f"Email mismatch: invitation for {invitation.email}, accepting as {accepting_user_email}")
                    raise ValueError("Email address does not match invitation")

                # If an email is provided, the account must exist
                user = self.db.query(EmailUser).filter(EmailUser.email == accepting_user_email).first()
                if not user:
                    logger.warning(f"User {accepting_user_email} not found")
                    raise ValueError("User account not found")

            # Check if team still exists
            team = self.db.query(EmailTeam).filter(EmailTeam.id == invitation.team_id, EmailTeam.is_active.is_(True)).first()
            if not team:
                logger.warning(f"Team {invitation.team_id} not found or inactive")
                raise ValueError("Team not found or inactive")

            # Check if user is already a member
            existing_member = (
                self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == invitation.team_id, EmailTeamMember.user_email == invitation.email, EmailTeamMember.is_active.is_(True)).first()
            )
            if existing_member:
                logger.warning(f"User {invitation.email} is already a member of team {invitation.team_id}")
                # Deactivate the invitation since they're already a member. This is committed
                # before raising so the rollback in the handler below does not undo it.
                invitation.is_active = False
                self.db.commit()
                raise ValueError("User is already a member of this team")

            # Check team member limit
            if team.max_members:
                current_member_count = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == invitation.team_id, EmailTeamMember.is_active.is_(True)).count()
                if current_member_count >= team.max_members:
                    logger.warning(f"Team {invitation.team_id} has reached maximum member limit")
                    raise ValueError(f"Team has reached maximum member limit of {team.max_members}")

            # Create team membership and consume the invitation in one transaction
            membership = EmailTeamMember(team_id=invitation.team_id, user_email=invitation.email, role=invitation.role, joined_at=utc_now(), invited_by=invitation.invited_by, is_active=True)
            self.db.add(membership)
            invitation.is_active = False
            self.db.commit()

            # Invalidate auth cache for user's team membership. Best-effort: a cache
            # failure must not undo an already-committed acceptance.
            try:
                # First-Party
                from mcpgateway.cache.auth_cache import auth_cache  # pylint: disable=import-outside-toplevel

                self._track_background_task(auth_cache.invalidate_team(invitation.email))
                self._track_background_task(auth_cache.invalidate_user_role(invitation.email, invitation.team_id))
                self._track_background_task(auth_cache.invalidate_user_teams(invitation.email))
                self._track_background_task(auth_cache.invalidate_team_membership(invitation.email))
            except Exception as cache_error:
                logger.debug(f"Failed to invalidate cache on invitation acceptance: {cache_error}")

            logger.info(f"User {invitation.email} accepted invitation to team {invitation.team_id}")
            return True

        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to accept invitation: {e}")
            raise

    async def decline_invitation(self, token: str, declining_user_email: Optional[str] = None) -> bool:
        """Decline a team invitation.

        Args:
            token: The invitation token
            declining_user_email: Email of user declining (for validation)

        Returns:
            bool: True if invitation was declined successfully, False if it was not
            found, the email did not match, or the update failed

        Examples:
            Users can decline invitations they don't want to accept.
        """
        try:
            # Get the invitation
            invitation = await self.get_invitation_by_token(token)
            if not invitation:
                logger.warning("Invitation not found for token")
                return False

            # Validate declining user email if provided
            if declining_user_email and declining_user_email != invitation.email:
                logger.warning(f"Email mismatch: invitation for {invitation.email}, declining as {declining_user_email}")
                return False

            # Deactivate the invitation
            invitation.is_active = False
            self.db.commit()

            logger.info(f"User {invitation.email} declined invitation to team {invitation.team_id}")
            return True

        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to decline invitation: {e}")
            return False

    async def revoke_invitation(self, invitation_id: str, revoked_by: str) -> bool:
        """Revoke a team invitation.

        Args:
            invitation_id: ID of the invitation to revoke
            revoked_by: Email of user revoking the invitation

        Returns:
            bool: True if invitation was revoked successfully, False if no active
            invitation matched, the revoker is not an active owner of the team,
            or the update failed

        Examples:
            Team owners can revoke pending invitations.
        """
        try:
            # Get the invitation
            invitation = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.id == invitation_id, EmailTeamInvitation.is_active.is_(True)).first()
            if not invitation:
                logger.warning(f"Active invitation {invitation_id} not found")
                return False

            # Check if revoker has permission (must be an active owner of the team)
            revoker_membership = (
                self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == invitation.team_id, EmailTeamMember.user_email == revoked_by, EmailTeamMember.is_active.is_(True)).first()
            )
            if not revoker_membership or revoker_membership.role != "owner":
                logger.warning(f"User {revoked_by} does not have permission to revoke invitation {invitation_id}")
                return False

            # Revoke the invitation
            invitation.is_active = False
            self.db.commit()

            logger.info(f"Invitation {invitation_id} revoked by {revoked_by}")
            return True

        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to revoke invitation {invitation_id}: {e}")
            return False

    async def get_team_invitations(self, team_id: str, active_only: bool = True) -> List[EmailTeamInvitation]:
        """Get all invitations for a team, newest first.

        Args:
            team_id: ID of the team
            active_only: Whether to return only active invitations

        Returns:
            List[EmailTeamInvitation]: List of team invitations (empty on query failure)

        Examples:
            Team management interface showing pending invitations.
        """
        try:
            query = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.team_id == team_id)
            if active_only:
                query = query.filter(EmailTeamInvitation.is_active.is_(True))
            return query.order_by(EmailTeamInvitation.invited_at.desc()).all()

        except Exception as e:
            logger.error(f"Failed to get invitations for team {team_id}: {e}")
            return []

    async def get_user_invitations(self, email: str, active_only: bool = True) -> List[EmailTeamInvitation]:
        """Get all invitations for a user, newest first.

        Args:
            email: Email address of the user
            active_only: Whether to return only active invitations

        Returns:
            List[EmailTeamInvitation]: List of invitations for the user (empty on query failure)

        Examples:
            User dashboard showing pending team invitations.
        """
        try:
            query = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.email == email)
            if active_only:
                query = query.filter(EmailTeamInvitation.is_active.is_(True))
            return query.order_by(EmailTeamInvitation.invited_at.desc()).all()

        except Exception as e:
            logger.error(f"Failed to get invitations for user {email}: {e}")
            return []

    async def cleanup_expired_invitations(self) -> int:
        """Deactivate active invitations whose expiry time has passed.

        Returns:
            int: Number of invitations cleaned up (0 on failure)

        Examples:
            Periodic cleanup task to remove expired invitations.
        """
        try:
            now = utc_now()
            # Bulk UPDATE; rows are flagged inactive rather than deleted.
            expired_count = self.db.query(EmailTeamInvitation).filter(EmailTeamInvitation.expires_at < now, EmailTeamInvitation.is_active.is_(True)).update({"is_active": False})
            self.db.commit()

            if expired_count > 0:
                logger.info(f"Cleaned up {expired_count} expired invitations")

            return expired_count

        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to cleanup expired invitations: {e}")
            return 0