Coverage for mcpgateway / services / team_management_service.py: 100%

679 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/team_management_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Team Management Service. 

8This module provides team creation, management, and membership operations 

9for the multi-team collaboration system. 

10 

11Examples: 

12 >>> from unittest.mock import Mock 

13 >>> service = TeamManagementService(Mock()) 

14 >>> isinstance(service, TeamManagementService) 

15 True 

16 >>> hasattr(service, 'db') 

17 True 

18""" 

19 

20# Standard 

21import asyncio 

22import base64 

23from datetime import datetime, timedelta 

24from typing import Any, Dict, List, Optional, Tuple, Union 

25 

26# Third-Party 

27import orjson 

28from sqlalchemy import and_, desc, func, or_, select 

29from sqlalchemy.orm import selectinload, Session 

30 

31# First-Party 

32from mcpgateway.cache.admin_stats_cache import admin_stats_cache 

33from mcpgateway.cache.auth_cache import auth_cache 

34from mcpgateway.config import settings 

35from mcpgateway.db import EmailTeam, EmailTeamJoinRequest, EmailTeamMember, EmailTeamMemberHistory, EmailUser, utc_now 

36from mcpgateway.services.logging_service import LoggingService 

37from mcpgateway.utils.pagination import unified_paginate 

38 

# Initialize logging
# A LoggingService instance is created at import time and asked for a logger
# named after this module; every method below logs through `logger`.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

42 

43 

44class TeamManagementService: 

45 """Service for team management operations. 

46 

47 This service handles team creation, membership management, 

48 role assignments, and team access control. 

49 

50 Attributes: 

51 db (Session): SQLAlchemy database session 

52 

53 Examples: 

54 >>> from unittest.mock import Mock 

55 >>> service = TeamManagementService(Mock()) 

56 >>> service.__class__.__name__ 

57 'TeamManagementService' 

58 >>> hasattr(service, 'db') 

59 True 

60 """ 

61 

def __init__(self, db: Session):
    """Bind the service to the database session it operates on.

    Args:
        db: SQLAlchemy database session

    Examples:
        The session is stored unchanged on the instance:
        >>> from mcpgateway.services.team_management_service import TeamManagementService
        >>> from unittest.mock import Mock
        >>> session = Mock()
        >>> svc = TeamManagementService(session)
        >>> svc.db is session
        True
        >>> type(svc).__name__
        'TeamManagementService'
    """
    # Methods of this service reach the database through this attribute.
    self.db = db

84 

def _log_team_member_action(self, team_member_id: str, team_id: str, user_email: str, role: str, action: str, action_by: Optional[str]):
    """Record one membership change in EmailTeamMemberHistory.

    The history row is added to the session and committed immediately,
    so any other pending changes on the same session are committed too.

    Args:
        team_member_id: ID of the EmailTeamMember the action applies to
        team_id: Team ID
        user_email: Email of the affected user
        role: Role at the time of the action
        action: Action label stored as given; this service passes "added",
            "removed", "reactivated", "role_changed" and "team-deleted"
        action_by: Email of the user who performed the action, if known

    Examples:
        >>> from mcpgateway.services.team_management_service import TeamManagementService
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> service._log_team_member_action("tm-123", "team-123", "user@example.com", "member", "added", "admin@example.com")
    """
    record_fields = {
        "team_member_id": team_member_id,
        "team_id": team_id,
        "user_email": user_email,
        "role": role,
        "action": action,
        "action_by": action_by,
        "action_timestamp": utc_now(),
    }
    audit_row = EmailTeamMemberHistory(**record_fields)
    self.db.add(audit_row)
    self.db.commit()

106 

async def create_team(self, name: str, description: Optional[str], created_by: str, visibility: Optional[str] = "public", max_members: Optional[int] = None) -> EmailTeam:
    """Create a team, or revive an inactive team that has the same slug.

    The creator always ends up as an active "owner" member. When an
    inactive team with the slug derived from ``name`` exists, that row is
    reactivated with the new details instead of inserting a new team.

    Args:
        name: Team name
        description: Team description (may be None)
        created_by: Email of the user creating the team
        visibility: Team visibility; must be "private" or "public"
        max_members: Maximum number of members; when None the value of
            ``settings.max_members_per_team`` is used (100 if unset)

    Returns:
        EmailTeam: The created or reactivated team

    Raises:
        ValueError: If ``visibility`` is not an accepted value
        Exception: Any failure is logged, the session is rolled back and
            the original exception is re-raised

    Examples:
        >>> import asyncio
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> asyncio.iscoroutinefunction(service.create_team)
        True

        Accepted visibility values:
        >>> "private" in ["private", "public"]
        True
        >>> "invalid" in ["private", "public"]
        False
    """
    try:
        # Reject unknown visibility values before touching the database
        allowed_visibilities = ["private", "public"]
        if visibility not in allowed_visibilities:
            raise ValueError(f"Invalid visibility. Must be one of: {', '.join(allowed_visibilities)}")

        # Fall back to the configured member limit
        if max_members is None:
            max_members = getattr(settings, "max_members_per_team", 100)

        # First-Party
        from mcpgateway.utils.create_slug import slugify  # pylint: disable=import-outside-toplevel

        # Look for a soft-deleted team that would collide on slug
        slug_candidate = slugify(name)
        dormant_team = self.db.query(EmailTeam).filter(EmailTeam.slug == slug_candidate, EmailTeam.is_active.is_(False)).first()

        if not dormant_team:
            # Brand-new team (slug will be auto-generated by event listener)
            team = EmailTeam(name=name, description=description, created_by=created_by, is_personal=False, visibility=visibility, max_members=max_members, is_active=True)
            self.db.add(team)

            self.db.flush()  # Get the team ID

            # The creator becomes the first owner
            self.db.add(EmailTeamMember(team_id=team.id, user_email=created_by, role="owner", joined_at=utc_now(), is_active=True))
        else:
            # Bring the old row back to life with the new details
            team = dormant_team
            revived_fields = {
                "name": name,
                "description": description,
                "created_by": created_by,
                "visibility": visibility,
                "max_members": max_members,
                "is_active": True,
                "updated_at": utc_now(),
            }
            for field_name, field_value in revived_fields.items():
                setattr(team, field_name, field_value)

            # The creator may still have a membership row from the team's previous life
            prior_membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team.id, EmailTeamMember.user_email == created_by).first()

            if prior_membership:
                # Reuse it, promoted to owner
                prior_membership.role = "owner"
                prior_membership.joined_at = utc_now()
                prior_membership.is_active = True
            else:
                self.db.add(EmailTeamMember(team_id=team.id, user_email=created_by, role="owner", joined_at=utc_now(), is_active=True))

            logger.info(f"Reactivated existing team with slug {slug_candidate}")

        self.db.commit()

        # Invalidate member count cache for the new team
        await self.invalidate_team_member_count_cache(str(team.id))

        # The creator's cached team list, membership and role entries are now stale;
        # a cache failure here is logged and does not fail the creation.
        try:
            await auth_cache.invalidate_user_teams(created_by)
            await auth_cache.invalidate_team_membership(created_by)
            await auth_cache.invalidate_user_role(created_by, str(team.id))
            await admin_stats_cache.invalidate_teams()
        except Exception as cache_error:
            logger.debug(f"Failed to invalidate cache on team create: {cache_error}")

        logger.info(f"Created team '{team.name}' by {created_by}")
        return team

    except Exception as exc:
        self.db.rollback()
        logger.error(f"Failed to create team '{name}': {exc}")
        raise

247 

async def get_team_by_id(self, team_id: str) -> Optional[EmailTeam]:
    """Look up an active team by its ID.

    Args:
        team_id: Team ID to lookup

    Returns:
        EmailTeam: The active team, or None when no active team has this
        ID or the query fails (the failure is logged and rolled back)

    Examples:
        >>> import asyncio
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> asyncio.iscoroutinefunction(service.get_team_by_id)
        True
    """
    try:
        active_match = self.db.query(EmailTeam).filter(EmailTeam.id == team_id, EmailTeam.is_active.is_(True))
        found = active_match.first()
        self.db.commit()  # Release transaction to avoid idle-in-transaction
    except Exception as exc:
        self.db.rollback()
        logger.error(f"Failed to get team by ID {team_id}: {exc}")
        return None
    else:
        return found

273 

async def get_team_by_slug(self, slug: str) -> Optional[EmailTeam]:
    """Look up an active team by its slug.

    Args:
        slug: Team slug to lookup

    Returns:
        EmailTeam: The active team, or None when no active team has this
        slug or the query fails (the failure is logged and rolled back)

    Examples:
        >>> import asyncio
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> asyncio.iscoroutinefunction(service.get_team_by_slug)
        True
    """
    try:
        active_match = self.db.query(EmailTeam).filter(EmailTeam.slug == slug, EmailTeam.is_active.is_(True))
        found = active_match.first()
        self.db.commit()  # Release transaction to avoid idle-in-transaction
    except Exception as exc:
        self.db.rollback()
        logger.error(f"Failed to get team by slug {slug}: {exc}")
        return None
    else:
        return found

299 

async def update_team(
    self, team_id: str, name: Optional[str] = None, description: Optional[str] = None, visibility: Optional[str] = None, max_members: Optional[int] = None, updated_by: Optional[str] = None
) -> bool:
    """Update team information.

    Only the fields passed as non-None are changed. All validation happens
    before the team object is modified, so a rejected request leaves no
    pending changes in the session.

    Args:
        team_id: ID of the team to update
        name: New team name
        description: New team description
        visibility: New visibility setting ("private" or "public")
        max_members: New maximum member limit
        updated_by: Email of user making the update (used for logging only)

    Returns:
        bool: True if update succeeded; False if the team was not found,
        is a personal team, or a non-validation error occurred

    Raises:
        ValueError: If visibility setting is invalid

    Examples:
        >>> import asyncio
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> asyncio.iscoroutinefunction(service.update_team)
        True
    """
    try:
        team = await self.get_team_by_id(team_id)
        if not team:
            logger.warning(f"Team {team_id} not found for update")
            return False

        # Prevent updating personal teams
        if team.is_personal:
            logger.warning(f"Cannot update personal team {team_id}")
            return False

        # Validate BEFORE mutating: the ValueError path below does not roll back,
        # so assigning name/description first would leave a half-applied update
        # pending in the session for a later commit to persist.
        if visibility is not None:
            valid_visibilities = ["private", "public"]
            if visibility not in valid_visibilities:
                raise ValueError(f"Invalid visibility. Must be one of: {', '.join(valid_visibilities)}")

        # Update fields if provided
        if name is not None:
            team.name = name
            # Slug will be updated by event listener if name changes

        if description is not None:
            team.description = description

        if visibility is not None:
            team.visibility = visibility

        if max_members is not None:
            team.max_members = max_members

        team.updated_at = utc_now()
        self.db.commit()

        logger.info(f"Updated team {team_id} by {updated_by}")
        return True

    except ValueError:
        raise  # Let ValueError propagate to caller for proper error handling
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to update team {team_id}: {e}")
        return False

366 

async def delete_team(self, team_id: str, deleted_by: str) -> bool:
    """Delete a team (soft delete).

    Marks the team inactive, writes a "team-deleted" history entry for each
    active membership, deactivates those memberships with one bulk update,
    and then invalidates the related caches.

    Args:
        team_id: ID of the team to delete
        deleted_by: Email of user performing deletion

    Returns:
        bool: True if deletion succeeded, False otherwise. Attempting to
        delete a personal team also yields False: the ValueError raised for
        that case is caught by this method's own generic error handler,
        which rolls back and logs it.

    Examples:
        >>> import asyncio
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> asyncio.iscoroutinefunction(service.delete_team)
        True
    """
    try:
        team = await self.get_team_by_id(team_id)
        if not team:
            logger.warning(f"Team {team_id} not found for deletion")
            return False

        # Prevent deleting personal teams
        if team.is_personal:
            logger.warning(f"Cannot delete personal team {team_id}")
            raise ValueError("Personal teams cannot be deleted")

        # Soft delete the team
        team.is_active = False
        team.updated_at = utc_now()

        # Get all active memberships before deactivating (for history logging)
        memberships = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)).all()

        # Log history for each membership (before bulk update)
        for membership in memberships:
            self._log_team_member_action(membership.id, team_id, membership.user_email, membership.role, "team-deleted", deleted_by)

        # Bulk update: deactivate all memberships in single query instead of looping
        self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)).update({EmailTeamMember.is_active: False}, synchronize_session=False)

        self.db.commit()

        # Invalidate role, stats and per-member caches.
        # The invalidations are awaited (not fire-and-forget create_task) so that
        # (a) no un-referenced task can be garbage-collected mid-flight and
        # (b) failures are actually observed and logged here.
        try:
            invalidations = [
                auth_cache.invalidate_team_roles(team_id),
                admin_stats_cache.invalidate_teams(),
            ]
            for membership in memberships:
                invalidations.append(auth_cache.invalidate_team(membership.user_email))
                invalidations.append(auth_cache.invalidate_user_teams(membership.user_email))
                invalidations.append(auth_cache.invalidate_team_membership(membership.user_email))
            outcomes = await asyncio.gather(*invalidations, return_exceptions=True)
            for outcome in outcomes:
                if isinstance(outcome, BaseException):
                    logger.debug(f"Failed to invalidate caches on team delete: {outcome}")
        except Exception as cache_error:
            logger.debug(f"Failed to invalidate caches on team delete: {cache_error}")

        logger.info(f"Deleted team {team_id} by {deleted_by}")
        return True

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to delete team {team_id}: {e}")
        return False

433 

async def add_member_to_team(self, team_id: str, user_email: str, role: str = "member", invited_by: Optional[str] = None) -> bool:
    """Add a member to a team, or reactivate a previous membership.

    Args:
        team_id: ID of the team
        user_email: Email of the user to add
        role: Role to assign (owner, member)
        invited_by: Email of user who added this member

    Returns:
        bool: True if the member was added or reactivated. False if the
        team or user does not exist, the user is already an active member,
        the role is invalid, the team is at its member limit, or any other
        error occurs. The ValueErrors raised internally for an invalid role
        or an exceeded limit are caught by this method's generic error
        handler, so callers observe False.

    Examples:
        >>> import asyncio
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> asyncio.iscoroutinefunction(service.add_member_to_team)
        True
        >>> # After adding, EmailTeamMemberHistory is updated
        >>> # service._log_team_member_action("tm-123", "team-123", "user@example.com", "member", "added", "admin@example.com")
    """
    try:
        # Validate role
        valid_roles = ["owner", "member"]
        if role not in valid_roles:
            raise ValueError(f"Invalid role. Must be one of: {', '.join(valid_roles)}")

        # Check if team exists
        team = await self.get_team_by_id(team_id)
        if not team:
            logger.warning(f"Team {team_id} not found")
            return False

        # Check if user exists
        user = self.db.query(EmailUser).filter(EmailUser.email == user_email).first()
        if not user:
            logger.warning(f"User {user_email} not found")
            return False

        # Check if user is already a member (active or not)
        existing_membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email).first()

        if existing_membership and existing_membership.is_active:
            logger.warning(f"User {user_email} is already a member of team {team_id}")
            return False

        # Check team member limit (a falsy max_members means no limit is enforced)
        if team.max_members:
            current_member_count = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)).count()

            if current_member_count >= team.max_members:
                logger.warning(f"Team {team_id} has reached maximum member limit")
                raise ValueError(f"Team has reached maximum member limit of {team.max_members}")

        # Add or reactivate membership
        if existing_membership:
            existing_membership.is_active = True
            existing_membership.role = role
            existing_membership.joined_at = utc_now()
            existing_membership.invited_by = invited_by
            self.db.commit()
            self._log_team_member_action(existing_membership.id, team_id, user_email, role, "reactivated", invited_by)
        else:
            membership = EmailTeamMember(team_id=team_id, user_email=user_email, role=role, joined_at=utc_now(), invited_by=invited_by, is_active=True)
            self.db.add(membership)
            self.db.commit()
            self._log_team_member_action(membership.id, team_id, user_email, role, "added", invited_by)

        # Invalidate auth cache for user's team membership and role.
        # Awaited rather than scheduled with create_task: un-referenced tasks can
        # be garbage-collected before they run, and their failures would never
        # reach the except clause below.
        try:
            outcomes = await asyncio.gather(
                auth_cache.invalidate_team(user_email),
                auth_cache.invalidate_user_role(user_email, team_id),
                auth_cache.invalidate_user_teams(user_email),
                auth_cache.invalidate_team_membership(user_email),
                admin_stats_cache.invalidate_teams(),
                return_exceptions=True,
            )
            for outcome in outcomes:
                if isinstance(outcome, BaseException):
                    logger.debug(f"Failed to invalidate cache on team add: {outcome}")
        except Exception as cache_error:
            logger.debug(f"Failed to invalidate cache on team add: {cache_error}")

        # Invalidate member count cache for this team
        await self.invalidate_team_member_count_cache(str(team_id))

        logger.info(f"Added {user_email} to team {team_id} with role {role}")
        return True

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to add {user_email} to team {team_id}: {e}")
        return False

525 

async def remove_member_from_team(self, team_id: str, user_email: str, removed_by: Optional[str] = None) -> bool:
    """Remove a member from a team (soft delete of the membership).

    Args:
        team_id: ID of the team
        user_email: Email of the user to remove
        removed_by: Email of user performing the removal

    Returns:
        bool: True if the member was removed. False if the team does not
        exist, is a personal team, the user is not an active member, the
        user is the last owner, or any other error occurs. The ValueError
        raised internally for the last-owner case is caught by this
        method's generic error handler, so callers observe False.

    Examples:
        Team membership management with role-based access control.
        After removal, EmailTeamMemberHistory is updated via _log_team_member_action.
    """
    try:
        team = await self.get_team_by_id(team_id)
        if not team:
            logger.warning(f"Team {team_id} not found")
            return False

        # Prevent removing members from personal teams
        if team.is_personal:
            logger.warning(f"Cannot remove members from personal team {team_id}")
            return False

        # Find the active membership
        membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).first()

        if not membership:
            logger.warning(f"User {user_email} is not a member of team {team_id}")
            return False

        # Prevent removing the last owner
        if membership.role == "owner":
            owner_count = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.role == "owner", EmailTeamMember.is_active.is_(True)).count()

            if owner_count <= 1:
                logger.warning(f"Cannot remove the last owner from team {team_id}")
                raise ValueError("Cannot remove the last owner from a team")

        # Remove membership (soft delete)
        membership.is_active = False
        self.db.commit()
        self._log_team_member_action(membership.id, team_id, user_email, membership.role, "removed", removed_by)

        # Invalidate auth cache for user's team membership and role.
        # Awaited rather than scheduled with create_task: un-referenced tasks can
        # be garbage-collected before they run, and their failures would never
        # reach the except clause below.
        try:
            outcomes = await asyncio.gather(
                auth_cache.invalidate_team(user_email),
                auth_cache.invalidate_user_role(user_email, team_id),
                auth_cache.invalidate_user_teams(user_email),
                auth_cache.invalidate_team_membership(user_email),
                return_exceptions=True,
            )
            for outcome in outcomes:
                if isinstance(outcome, BaseException):
                    logger.debug(f"Failed to invalidate cache on team remove: {outcome}")
        except Exception as cache_error:
            logger.debug(f"Failed to invalidate cache on team remove: {cache_error}")

        # Invalidate member count cache for this team
        await self.invalidate_team_member_count_cache(str(team_id))

        logger.info(f"Removed {user_email} from team {team_id} by {removed_by}")
        return True

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to remove {user_email} from team {team_id}: {e}")
        return False

594 

async def update_member_role(self, team_id: str, user_email: str, new_role: str, updated_by: Optional[str] = None) -> bool:
    """Update a team member's role.

    Args:
        team_id: ID of the team
        user_email: Email of the user whose role to update
        new_role: New role to assign (owner, member)
        updated_by: Email of user making the change

    Returns:
        bool: True if role was updated successfully; False if the team does
        not exist, is a personal team, the user is not an active member, or
        a non-validation error occurred

    Raises:
        ValueError: If role is invalid or the change would remove the owner
            role from the team's last owner

    Examples:
        Role management within teams for access control.
        After role update, EmailTeamMemberHistory is updated via _log_team_member_action.
    """
    try:
        # Validate role
        valid_roles = ["owner", "member"]
        if new_role not in valid_roles:
            raise ValueError(f"Invalid role. Must be one of: {', '.join(valid_roles)}")

        team = await self.get_team_by_id(team_id)
        if not team:
            logger.warning(f"Team {team_id} not found")
            return False

        # Prevent updating roles in personal teams
        if team.is_personal:
            logger.warning(f"Cannot update roles in personal team {team_id}")
            return False

        # Find the active membership
        membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).first()

        if not membership:
            logger.warning(f"User {user_email} is not a member of team {team_id}")
            return False

        # Prevent changing the role of the last owner to non-owner
        if membership.role == "owner" and new_role != "owner":
            owner_count = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.role == "owner", EmailTeamMember.is_active.is_(True)).count()

            if owner_count <= 1:
                logger.warning(f"Cannot remove owner role from the last owner of team {team_id}")
                raise ValueError("Cannot remove owner role from the last owner of a team")

        # Update the role
        membership.role = new_role
        self.db.commit()
        self._log_team_member_action(membership.id, team_id, user_email, new_role, "role_changed", updated_by)

        # Invalidate role cache.
        # Awaited rather than scheduled with create_task: an un-referenced task can
        # be garbage-collected before it runs, and its failure would never reach
        # the except clause below.
        try:
            await auth_cache.invalidate_user_role(user_email, team_id)
        except Exception as cache_error:
            logger.debug(f"Failed to invalidate cache on role update: {cache_error}")

        logger.info(f"Updated role of {user_email} in team {team_id} to {new_role} by {updated_by}")
        return True

    except ValueError:
        raise  # Let ValueError propagate to caller for proper error handling
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to update role of {user_email} in team {team_id}: {e}")
        return False

665 

async def get_member(self, team_id: str, user_email: str) -> Optional[EmailTeamMember]:
    """Get a single team member by team ID and user email.

    Args:
        team_id: ID of the team
        user_email: Email of the user

    Returns:
        EmailTeamMember if found and active, None otherwise (including when
        the query fails; the failure is logged and the session rolled back)
    """
    try:
        return self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).first()
    except Exception as e:
        # Roll back like the other query methods of this service do, so a failed
        # query does not leave the session in a broken transaction state.
        self.db.rollback()
        logger.error(f"Failed to get member {user_email} in team {team_id}: {e}")
        return None

681 

async def get_user_teams(self, user_email: str, include_personal: bool = True) -> List[EmailTeam]:
    """Return the active teams in which the user has an active membership.

    When an auth cache is available it stores only team IDs, keyed by
    "<user_email>:<include_personal>"; the team rows themselves are always
    loaded from the database.

    Args:
        user_email: Email of the user
        include_personal: Whether to include personal teams

    Returns:
        List[EmailTeam]: Teams the user belongs to; an empty list when the
        user has none or when the database query fails

    Examples:
        User dashboard showing team memberships.
    """
    team_cache = self._get_auth_cache()
    cache_key = f"{user_email}:{include_personal}"

    # Cache lookup (skipped entirely when no cache is available)
    cached_ids = await team_cache.get_user_teams(cache_key) if team_cache else None

    if cached_ids is not None:
        if not cached_ids:  # Empty list = user has no teams
            return []
        # Cache hit: load the full rows for the cached IDs
        try:
            hit_rows = self.db.query(EmailTeam).filter(EmailTeam.id.in_(cached_ids), EmailTeam.is_active.is_(True)).all()
            self.db.commit()  # Release transaction to avoid idle-in-transaction
            return hit_rows
        except Exception as lookup_error:
            self.db.rollback()
            logger.warning(f"Failed to fetch teams by IDs from cache: {lookup_error}")
            # Fall through to full query

    # Cache miss, caching disabled, or cached lookup failed: query via memberships
    try:
        membership_query = self.db.query(EmailTeam).join(EmailTeamMember)
        membership_query = membership_query.filter(EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True), EmailTeam.is_active.is_(True))

        if not include_personal:
            membership_query = membership_query.filter(EmailTeam.is_personal.is_(False))

        found_teams = membership_query.all()
        self.db.commit()  # Release transaction to avoid idle-in-transaction

        # Remember the IDs for the next call
        if team_cache:
            await team_cache.set_user_teams(cache_key, [entry.id for entry in found_teams])

        return found_teams

    except Exception as query_error:
        self.db.rollback()
        logger.error(f"Failed to get teams for user {user_email}: {query_error}")
        return []

738 

async def verify_team_for_user(self, user_email, team_id=None):
    """
    Resolve the team ID to use for a user, verifying membership when one is given.

    All active teams in which the user has an active membership are loaded
    with a single query. If no `team_id` is provided, the ID of the user's
    personal team (if any) is returned. If a `team_id` is provided, it is
    returned only when it is among the user's teams.

    Args:
        user_email (str): The email of the user whose teams are being queried.
        team_id (str or None, optional): Specific team ID to check for membership. Defaults to None.

    Returns:
        str or list or None:
            - If `team_id` is falsy, the ID of the user's personal team, or None if there is none.
            - If `team_id` is provided and the user is a member of that team, `team_id`.
            - An empty list if `team_id` is provided but the user is not a member of that team,
              or if the team query fails. Callers must treat `[]` as "not verified".
            - If an unexpected error occurs afterwards, the incoming `team_id` (None when it was falsy).

    Raises:
        None explicitly; exceptions during the process are caught and logged.

    Examples:
        Verifies user team if team_id provided otherwise finds its personal id.
    """
    try:
        # Get all teams the user belongs to in a single query
        try:
            query = self.db.query(EmailTeam).join(EmailTeamMember).filter(EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True), EmailTeam.is_active.is_(True))
            user_teams = query.all()
            self.db.commit()  # Release transaction to avoid idle-in-transaction
        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to get teams for user {user_email}: {e}")
            return []

        if not team_id:
            # If no team_id is provided, try to get the personal team
            personal_team = next((t for t in user_teams if getattr(t, "is_personal", False)), None)
            team_id = personal_team.id if personal_team else None
        else:
            # Check if the provided team_id exists among the user's teams
            is_team_present = any(team.id == team_id for team in user_teams)
            if not is_team_present:
                return []
    except Exception as e:
        self.db.rollback()
        # Use the module logger (not print) so the failure reaches the service logs
        logger.error(f"Failed to verify team for user {user_email}: {e}")
        # Normalise any falsy team_id (e.g. "") to None on the error path
        if not team_id:
            team_id = None

    return team_id

792 

async def get_team_members(
    self,
    team_id: str,
    cursor: Optional[str] = None,
    limit: Optional[int] = None,
    page: Optional[int] = None,
    per_page: Optional[int] = None,
) -> Union[List[Tuple[EmailUser, EmailTeamMember]], Tuple[List[Tuple[EmailUser, EmailTeamMember]], Optional[str]], Dict[str, Any]]:
    """Get the active members of a team with optional cursor or page-based pagination.

    Note: This method returns ORM objects and cannot be cached since callers
    depend on ORM attributes and methods.

    The return shape depends on which pagination arguments are supplied, and
    the same shape is returned (empty) when the lookup fails.

    Args:
        team_id: ID of the team
        cursor: Opaque cursor token for cursor-based pagination
        limit: Maximum number of members to return (for cursor-based, default: 50)
        page: Page number for page-based pagination (1-indexed). Takes precedence over cursor/limit.
        per_page: Items per page for page-based pagination (default: 30)

    Returns:
        - If page is provided: Dict with keys 'data', 'pagination', 'links'
        - Else if cursor or limit is provided: Tuple (members, next_cursor)
        - If none of them: List of all members (backward compatibility)

    Examples:
        Team member management and role display.
    """
    # Cursor mode is what the success path falls into whenever page is None
    # and at least one of cursor/limit was given; remember it so the error
    # path can answer with the same shape.
    cursor_mode = page is None and (cursor is not None or limit is not None)

    try:
        if cursor is None and page is None and limit is None:
            # Backward compatibility: return (user, membership) rows, unpaginated
            query = (
                select(EmailUser, EmailTeamMember)
                .join(EmailTeamMember, EmailUser.email == EmailTeamMember.user_email)
                .where(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True))
                .order_by(EmailUser.full_name, EmailUser.email)
            )
            result = self.db.execute(query)
            members = list(result.all())
            self.db.commit()
            return members

        # For pagination: select EmailTeamMember and eager-load user to avoid N+1
        query = (
            select(EmailTeamMember)
            .options(selectinload(EmailTeamMember.user))
            .where(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True))
            .join(EmailUser, EmailUser.email == EmailTeamMember.user_email)
        )

        # PAGE-BASED PAGINATION (Admin UI) - use unified_paginate
        if page is not None:
            # Alphabetical ordering for user-friendly display
            query = query.order_by(EmailUser.full_name, EmailUser.email)
            pag_result = await unified_paginate(
                db=self.db,
                query=query,
                page=page,
                per_page=per_page or 30,
                cursor=None,
                limit=None,
                base_url=f"/admin/teams/{team_id}/members",
                query_params={},
            )
            self.db.commit()
            memberships = pag_result["data"]
            tuples = [(m.user, m) for m in memberships]
            return {
                "data": tuples,
                "pagination": pag_result["pagination"],
                "links": pag_result["links"],
            }

        # CURSOR-BASED PAGINATION (API) - custom implementation using (joined_at, id)
        # unified_paginate uses created_at which doesn't exist on EmailTeamMember

        # Order by joined_at DESC, id DESC for keyset pagination
        query = query.order_by(desc(EmailTeamMember.joined_at), desc(EmailTeamMember.id))

        # Decode cursor and apply keyset filter
        if cursor:
            try:
                cursor_json = base64.urlsafe_b64decode(cursor.encode()).decode()
                cursor_data = orjson.loads(cursor_json)
                last_id = cursor_data.get("id")
                joined_str = cursor_data.get("joined_at")
                if last_id and joined_str:
                    last_joined = datetime.fromisoformat(joined_str)
                    # Keyset filter: (joined_at < last) OR (joined_at = last AND id < last_id)
                    query = query.where(
                        or_(
                            EmailTeamMember.joined_at < last_joined,
                            and_(EmailTeamMember.joined_at == last_joined, EmailTeamMember.id < last_id),
                        )
                    )
            except (ValueError, TypeError, AttributeError) as e:
                # AttributeError covers a cursor that decodes to valid JSON
                # which is not an object (no .get); treat it like any other
                # malformed cursor instead of failing the whole request.
                logger.warning(f"Invalid cursor for team members, ignoring: {e}")

        # Fetch limit + 1 to check for more results (cap at max_page_size)
        page_size = min(limit or 50, settings.pagination_max_page_size)
        query = query.limit(page_size + 1)
        memberships = list(self.db.execute(query).scalars().all())

        # Check if there are more results
        has_more = len(memberships) > page_size
        if has_more:
            memberships = memberships[:page_size]

        # Generate next cursor using (joined_at, id)
        # NOTE(review): a member with joined_at=None yields a cursor whose
        # joined_at is null, which the decoder above ignores - confirm the
        # column is non-nullable.
        next_cursor = None
        if has_more and memberships:
            last_member = memberships[-1]
            cursor_data = {
                "joined_at": last_member.joined_at.isoformat() if last_member.joined_at else None,
                "id": last_member.id,
            }
            next_cursor = base64.urlsafe_b64encode(orjson.dumps(cursor_data)).decode()

        self.db.commit()
        tuples = [(m.user, m) for m in memberships]
        return (tuples, next_cursor)

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get members for team {team_id}: {e}")

        # Return an empty response in the same shape the caller asked for
        if page is not None:
            return {"data": [], "pagination": {"page": page, "per_page": per_page or 30, "total": 0, "has_next": False, "has_prev": False}, "links": None}

        if cursor_mode:
            return ([], None)

        return []

929 

def count_team_owners(self, team_id: str) -> int:
    """Return how many active owners a team has, counted in SQL.

    Counting in the database avoids loading every membership row just to
    count them in Python.

    Args:
        team_id: ID of the team

    Returns:
        int: Number of active memberships with role "owner" in the team
    """
    owners_query = self.db.query(EmailTeamMember).filter(
        EmailTeamMember.team_id == team_id,
        EmailTeamMember.role == "owner",
        EmailTeamMember.is_active.is_(True),
    )
    owner_total = owners_query.count()
    # Release transaction to avoid idle-in-transaction
    self.db.commit()
    return owner_total

944 

945 def _get_auth_cache(self): 

946 """Get auth cache instance lazily. 

947 

948 Returns: 

949 AuthCache instance or None if unavailable. 

950 """ 

951 try: 

952 # First-Party 

953 from mcpgateway.cache.auth_cache import get_auth_cache # pylint: disable=import-outside-toplevel 

954 

955 return get_auth_cache() 

956 except ImportError: 

957 return None 

958 

959 def _get_admin_stats_cache(self): 

960 """Get admin stats cache instance lazily. 

961 

962 Returns: 

963 AdminStatsCache instance or None if unavailable. 

964 """ 

965 try: 

966 # First-Party 

967 from mcpgateway.cache.admin_stats_cache import get_admin_stats_cache # pylint: disable=import-outside-toplevel 

968 

969 return get_admin_stats_cache() 

970 except ImportError: 

971 return None 

972 

async def get_user_role_in_team(self, user_email: str, team_id: str) -> Optional[str]:
    """Get a user's role in a specific team.

    Uses caching to reduce database queries (called 11+ times per team operation).
    The cache is consulted first; on a miss the active membership is read from
    the database and the result (including "not a member") is written back to
    the cache on a best-effort basis.

    Args:
        user_email: Email of the user
        team_id: ID of the team

    Returns:
        str: User's role, or None if the user is not an active member or the
        database lookup failed (the failure is logged).

    Examples:
        Access control and permission checking.
    """
    # Check cache first
    cache = self._get_auth_cache()
    if cache:
        cached_role = await cache.get_user_role(user_email, team_id)
        if cached_role is not None:
            # Empty string means "not a member" (cached None)
            return cached_role or None

    try:
        membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.user_email == user_email, EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)).first()
        self.db.commit()  # Release transaction to avoid idle-in-transaction

        role = membership.role if membership else None
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get role for {user_email} in team {team_id}: {e}")
        return None

    # Store in cache. Kept outside the database try-block so that a cache
    # failure cannot turn a successfully resolved role into "not a member".
    if cache:
        try:
            await cache.set_user_role(user_email, team_id, role)
        except Exception as cache_error:
            logger.warning(f"Failed to cache role for {user_email} in team {team_id}: {cache_error}")

    return role

1012 

async def list_teams(
    self,
    # Unified pagination params
    limit: int = 100,
    offset: int = 0,
    cursor: Optional[str] = None,
    page: Optional[int] = None,
    per_page: int = 50,
    include_inactive: bool = False,
    visibility_filter: Optional[str] = None,
    base_url: Optional[str] = None,
    include_personal: bool = False,
    search_query: Optional[str] = None,
) -> Union[Tuple[List[EmailTeam], Optional[str]], Dict[str, Any]]:
    """List teams, paginated either by cursor/offset or by page number.

    Args:
        limit: Max items for cursor pagination
        offset: Offset for legacy pagination; only applied when neither page nor cursor is given
        cursor: Cursor token
        page: Page number (1-indexed)
        per_page: Items per page
        include_inactive: Whether to include inactive teams
        visibility_filter: Filter by visibility (private, team, public)
        base_url: Base URL for pagination links (defaults to the admin teams partial)
        include_personal: Whether to include personal teams
        search_query: Substring matched case-insensitively against name, slug and description

    Returns:
        Union[Tuple[List[EmailTeam], Optional[str]], Dict[str, Any]]:
            - Tuple (teams, next_cursor) if cursor/offset based
            - Dict {data, pagination, links} if page based
    """
    stmt = select(EmailTeam)

    # Each filter is applied only when its flag/argument asks for it.
    if not include_personal:
        stmt = stmt.where(EmailTeam.is_personal.is_(False))
    if not include_inactive:
        stmt = stmt.where(EmailTeam.is_active.is_(True))
    if visibility_filter:
        stmt = stmt.where(EmailTeam.visibility == visibility_filter)
    if search_query:
        like_pattern = f"%{search_query}%"
        text_match = or_(
            EmailTeam.name.ilike(like_pattern),
            EmailTeam.slug.ilike(like_pattern),
            EmailTeam.description.ilike(like_pattern),
        )
        stmt = stmt.where(text_match)

    # Ordering depends on the pagination mode:
    # - Cursor-based (API): created_at DESC, id DESC to match unified_paginate expectations
    # - Page-based (UI): alphabetical by name for user-friendly display
    if page is None:
        stmt = stmt.order_by(desc(EmailTeam.created_at), desc(EmailTeam.id))
    else:
        stmt = stmt.order_by(EmailTeam.name, EmailTeam.id)

    # Fall back to the admin partial when no base URL was supplied
    base_url = base_url or f"{settings.app_root_path}/admin/teams/partial"

    # Legacy offset pagination: only meaningful without page or cursor
    if not (page or cursor) and offset > 0:
        stmt = stmt.offset(offset)

    paginated = await unified_paginate(
        db=self.db,
        query=stmt,
        cursor=cursor,
        limit=limit,
        page=page,
        per_page=per_page,
        base_url=base_url,
    )
    # Release transaction to avoid idle-in-transaction
    self.db.commit()
    return paginated

1094 

async def get_all_team_ids(
    self,
    include_inactive: bool = False,
    visibility_filter: Optional[str] = None,
    include_personal: bool = False,
    search_query: Optional[str] = None,
) -> List[int]:
    """Return the IDs of every team matching the criteria, without pagination.

    Args:
        include_inactive: Whether to include inactive teams
        visibility_filter: Filter by visibility (private, team, public)
        include_personal: Whether to include personal teams
        search_query: Substring matched case-insensitively against name and slug

    Returns:
        List[int]: List of team IDs
    """
    # NOTE(review): other methods in this service annotate team IDs as str;
    # confirm whether List[int] is the intended element type.
    stmt = select(EmailTeam.id)

    if not include_personal:
        stmt = stmt.where(EmailTeam.is_personal.is_(False))
    if not include_inactive:
        stmt = stmt.where(EmailTeam.is_active.is_(True))
    if visibility_filter:
        stmt = stmt.where(EmailTeam.visibility == visibility_filter)
    if search_query:
        like_pattern = f"%{search_query}%"
        text_match = or_(
            EmailTeam.name.ilike(like_pattern),
            EmailTeam.slug.ilike(like_pattern),
        )
        stmt = stmt.where(text_match)

    rows = self.db.execute(stmt).all()
    # Each row holds a single column: the team id
    matching_ids = [id_row[0] for id_row in rows]
    # Release transaction to avoid idle-in-transaction
    self.db.commit()
    return matching_ids

1137 

async def get_teams_count(
    self,
    include_inactive: bool = False,
    visibility_filter: Optional[str] = None,
    include_personal: bool = False,
    search_query: Optional[str] = None,
) -> int:
    """Count the teams matching the criteria.

    Args:
        include_inactive: Whether to include inactive teams
        visibility_filter: Filter by visibility (private, team, public)
        include_personal: Whether to include personal teams
        search_query: Substring matched case-insensitively against name and slug

    Returns:
        int: Total count of matching teams (0 when the query yields no value)
    """
    stmt = select(func.count(EmailTeam.id))  # pylint: disable=not-callable

    if not include_personal:
        stmt = stmt.where(EmailTeam.is_personal.is_(False))
    if not include_inactive:
        stmt = stmt.where(EmailTeam.is_active.is_(True))
    if visibility_filter:
        stmt = stmt.where(EmailTeam.visibility == visibility_filter)
    if search_query:
        like_pattern = f"%{search_query}%"
        text_match = or_(
            EmailTeam.name.ilike(like_pattern),
            EmailTeam.slug.ilike(like_pattern),
        )
        stmt = stmt.where(text_match)

    total = self.db.execute(stmt).scalar() or 0
    # Release transaction to avoid idle-in-transaction
    self.db.commit()
    return total

1180 

async def discover_public_teams(self, user_email: str, skip: int = 0, limit: Optional[int] = None) -> List[EmailTeam]:
    """Discover public teams that user can join.

    Returns active, non-personal teams with visibility "public" in which the
    user has no active membership. Failures are logged and reported as an
    empty list; no exception is propagated to the caller.

    Args:
        user_email: Email of the user discovering teams
        skip: Number of teams to skip for pagination
        limit: Maximum number of teams to return (None for unlimited)

    Returns:
        List[EmailTeam]: List of public teams user can join, ordered by name
        then id; empty if none match or the query fails.
    """
    try:
        # Optimized: Use subquery instead of loading all IDs into memory (2 queries → 1)
        user_team_subquery = select(EmailTeamMember.team_id).where(EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).scalar_subquery()

        query = self.db.query(EmailTeam).filter(EmailTeam.visibility == "public", EmailTeam.is_active.is_(True), EmailTeam.is_personal.is_(False), ~EmailTeam.id.in_(user_team_subquery))

        # OFFSET/LIMIT without ORDER BY leaves the row order up to the
        # database, so consecutive pages could overlap or skip teams.
        # Order deterministically before slicing.
        query = query.order_by(EmailTeam.name, EmailTeam.id)

        query = query.offset(skip)
        if limit is not None:
            query = query.limit(limit)
        teams = query.all()
        self.db.commit()  # Release transaction to avoid idle-in-transaction
        return teams

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to discover public teams for {user_email}: {e}")
        return []

1212 

async def create_join_request(self, team_id: str, user_email: str, message: Optional[str] = None) -> "EmailTeamJoinRequest":
    """Create (or re-open) a request to join a public team.

    If the user already has a request row for this team that is no longer an
    unexpired pending one, that row is reset to pending instead of inserting
    a new one. Requests expire 7 days after creation/re-opening.

    Args:
        team_id: ID of the team to join
        user_email: Email of the user requesting to join
        message: Optional message to team owners

    Returns:
        EmailTeamJoinRequest: Created or re-opened join request

    Raises:
        ValueError: If team not found, not public, or user already member/has pending request
    """
    try:
        # The target must exist and be public
        team = await self.get_team_by_id(team_id)
        if not team:
            raise ValueError("Team not found")
        if team.visibility != "public":
            raise ValueError("Can only request to join public teams")

        # Active members have nothing to request
        active_membership = (
            self.db.query(EmailTeamMember)
            .filter(
                EmailTeamMember.team_id == team_id,
                EmailTeamMember.user_email == user_email,
                EmailTeamMember.is_active.is_(True),
            )
            .first()
        )
        if active_membership:
            raise ValueError("User is already a member of this team")

        # Look for an earlier request in any status
        prior_request = (
            self.db.query(EmailTeamJoinRequest)
            .filter(
                EmailTeamJoinRequest.team_id == team_id,
                EmailTeamJoinRequest.user_email == user_email,
            )
            .first()
        )

        if not prior_request:
            # First request for this (team, user) pair
            join_request = EmailTeamJoinRequest(team_id=team_id, user_email=user_email, message=message, expires_at=utc_now() + timedelta(days=7))
            self.db.add(join_request)
        else:
            still_open = prior_request.status == "pending" and not prior_request.is_expired()
            if still_open:
                raise ValueError("User already has a pending join request for this team")

            # Re-open the old row (cancelled, rejected, expired) as pending
            prior_request.message = message or ""
            prior_request.status = "pending"
            prior_request.requested_at = utc_now()
            prior_request.expires_at = utc_now() + timedelta(days=7)
            prior_request.reviewed_at = None
            prior_request.reviewed_by = None
            prior_request.notes = None
            join_request = prior_request

        self.db.commit()
        self.db.refresh(join_request)

        logger.info(f"Created join request for user {user_email} to team {team_id}")
        return join_request

    except Exception as e:
        # Validation errors take this path too: roll back, log, re-raise
        self.db.rollback()
        logger.error(f"Failed to create join request: {e}")
        raise

1273 

async def list_join_requests(self, team_id: str) -> List["EmailTeamJoinRequest"]:
    """List pending join requests for a team, newest first.

    Only the status column is checked; requests whose status is still
    "pending" are returned regardless of their expiry time.

    Args:
        team_id: ID of the team

    Returns:
        List[EmailTeamJoinRequest]: List of pending join requests, or an empty
        list if the query fails (the failure is logged).
    """
    try:
        requests = (
            self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.team_id == team_id, EmailTeamJoinRequest.status == "pending").order_by(EmailTeamJoinRequest.requested_at.desc()).all()
        )
        return requests

    except Exception as e:
        # Roll back so the session stays usable after a failed statement,
        # matching the other read methods of this service.
        self.db.rollback()
        logger.error(f"Failed to list join requests for team {team_id}: {e}")
        return []

1292 

async def approve_join_request(self, request_id: str, approved_by: str) -> Optional[EmailTeamMember]:
    """Approve a pending team join request and add the requester as a member.

    Looks up the request by ID among pending requests, creates an
    EmailTeamMember with role "member", marks the request approved, records
    the membership action and schedules cache invalidation.

    Args:
        request_id: ID of the join request
        approved_by: Email of the user approving the request

    Returns:
        EmailTeamMember: The newly created team member. (Despite the Optional
        return annotation, no code path in this method returns None.)

    Raises:
        ValueError: If request not found, expired, or already processed
        Exception: Any other failure is logged and re-raised after rollback
    """
    try:
        # Get join request (only pending ones qualify)
        join_request = self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.id == request_id, EmailTeamJoinRequest.status == "pending").first()

        if not join_request:
            raise ValueError("Join request not found or already processed")

        if join_request.is_expired():
            # The "expired" status is committed before raising, so the
            # rollback in the except handler below has nothing to undo.
            join_request.status = "expired"
            self.db.commit()
            raise ValueError("Join request has expired")

        # Add user to team
        member = EmailTeamMember(team_id=join_request.team_id, user_email=join_request.user_email, role="member", invited_by=approved_by, joined_at=utc_now())  # New joiners are always members

        self.db.add(member)
        # Update join request status
        join_request.status = "approved"
        join_request.reviewed_at = utc_now()
        join_request.reviewed_by = approved_by

        # Flush first; member.id is read on the next line
        self.db.flush()
        # NOTE(review): this method never calls commit() on the success path;
        # presumably _log_team_member_action commits - confirm.
        self._log_team_member_action(member.id, join_request.team_id, join_request.user_email, member.role, "added", approved_by)

        self.db.refresh(member)

        # Invalidate auth cache for user's team membership and role.
        # create_task only schedules the coroutines: this try/except covers
        # scheduling errors, not exceptions raised later inside the tasks.
        # NOTE(review): the task objects are not retained; asyncio recommends
        # keeping a reference to fire-and-forget tasks - confirm acceptable.
        try:
            asyncio.create_task(auth_cache.invalidate_team(join_request.user_email))
            asyncio.create_task(auth_cache.invalidate_user_role(join_request.user_email, join_request.team_id))
            asyncio.create_task(auth_cache.invalidate_user_teams(join_request.user_email))
            asyncio.create_task(auth_cache.invalidate_team_membership(join_request.user_email))
            asyncio.create_task(admin_stats_cache.invalidate_teams())
        except Exception as cache_error:
            logger.debug(f"Failed to invalidate caches on join approval: {cache_error}")

        # Invalidate member count cache for this team
        await self.invalidate_team_member_count_cache(str(join_request.team_id))

        logger.info(f"Approved join request {request_id}: user {join_request.user_email} joined team {join_request.team_id}")
        return member

    except Exception as e:
        # Validation errors (ValueError) take this path too
        self.db.rollback()
        logger.error(f"Failed to approve join request {request_id}: {e}")
        raise

1352 

async def reject_join_request(self, request_id: str, rejected_by: str) -> bool:
    """Mark a pending team join request as rejected.

    Args:
        request_id: ID of the join request
        rejected_by: Email of the user rejecting the request

    Returns:
        bool: True if request was rejected successfully

    Raises:
        ValueError: If request not found or already processed
    """
    try:
        # Only pending requests can be rejected
        pending_lookup = self.db.query(EmailTeamJoinRequest).filter(
            EmailTeamJoinRequest.id == request_id,
            EmailTeamJoinRequest.status == "pending",
        )
        join_request = pending_lookup.first()
        if not join_request:
            raise ValueError("Join request not found or already processed")

        # Record the decision, when it was made and who made it
        join_request.status = "rejected"
        join_request.reviewed_at = utc_now()
        join_request.reviewed_by = rejected_by
        self.db.commit()

        logger.info(f"Rejected join request {request_id}: user {join_request.user_email} for team {join_request.team_id}")
        return True

    except Exception as e:
        # Roll back, log and let the caller see the original error
        self.db.rollback()
        logger.error(f"Failed to reject join request {request_id}: {e}")
        raise

1387 

async def get_user_join_requests(self, user_email: str, team_id: Optional[str] = None) -> List["EmailTeamJoinRequest"]:
    """Get join requests made by a user, in any status.

    Args:
        user_email: Email of the user
        team_id: Optional team ID to filter requests

    Returns:
        List[EmailTeamJoinRequest]: List of join requests made by the user, or
        an empty list if the query fails (the failure is logged).

    Examples:
        Get all requests made by a user or for a specific team.
    """
    try:
        query = self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.user_email == user_email)

        if team_id:
            query = query.filter(EmailTeamJoinRequest.team_id == team_id)

        requests = query.all()
        return requests

    except Exception as e:
        # Roll back so the session stays usable after a failed statement,
        # matching the other read methods of this service.
        self.db.rollback()
        logger.error(f"Failed to get join requests for user {user_email}: {e}")
        return []

1413 

async def cancel_join_request(self, request_id: str, user_email: str) -> bool:
    """Cancel a pending join request on behalf of the user who made it.

    Args:
        request_id: ID of the join request to cancel
        user_email: Email of the user canceling the request

    Returns:
        bool: True if canceled successfully, False if no matching pending
        request exists or the update fails

    Examples:
        Allow users to cancel their pending join requests.
    """
    try:
        # The request must belong to this user and still be pending
        own_pending_lookup = self.db.query(EmailTeamJoinRequest).filter(
            EmailTeamJoinRequest.id == request_id,
            EmailTeamJoinRequest.user_email == user_email,
            EmailTeamJoinRequest.status == "pending",
        )
        join_request = own_pending_lookup.first()

        if join_request:
            # The requester is recorded as the reviewer of their own cancellation
            join_request.status = "cancelled"
            join_request.reviewed_at = utc_now()
            join_request.reviewed_by = user_email
            self.db.commit()

            logger.info(f"Cancelled join request {request_id} by user {user_email}")
            return True

        logger.warning(f"Join request {request_id} not found for user {user_email} or not pending")
        return False

    except Exception as e:
        # Unlike approve/reject, failures are reported as False, not raised
        self.db.rollback()
        logger.error(f"Failed to cancel join request {request_id}: {e}")
        return False

1451 

1452 # ================================================================================== 

1453 # Batch Query Methods (N+1 Query Elimination - Issue #1892) 

1454 # ================================================================================== 

1455 

def get_member_counts_batch(self, team_ids: List[str]) -> Dict[str, int]:
    """Count active members for several teams with one grouped query.

    This is a synchronous method following the existing service pattern.
    Note: Like other sync SQLAlchemy calls, this will block the event
    loop in async contexts. For typical team counts this is acceptable.

    Args:
        team_ids: List of team UUIDs

    Returns:
        Dict mapping each requested team_id to its active member count
        (0 for teams the query returned no row for)

    Raises:
        Exception: Re-raises any database errors after rollback

    Examples:
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> service.get_member_counts_batch([])
        {}
    """
    # Nothing requested: answer without touching the database
    if not team_ids:
        return {}

    try:
        # One GROUP BY query covers every requested team
        grouped_rows = (
            self.db.query(EmailTeamMember.team_id, func.count(EmailTeamMember.id).label("count"))  # pylint: disable=not-callable
            .filter(EmailTeamMember.team_id.in_(team_ids), EmailTeamMember.is_active.is_(True))
            .group_by(EmailTeamMember.team_id)
            .all()
        )

        # Release transaction to avoid idle-in-transaction
        self.db.commit()

        found_counts = {}
        for grouped in grouped_rows:
            found_counts[str(grouped.team_id)] = grouped.count

        # Teams absent from the grouped result have no active members
        return {requested_id: found_counts.get(requested_id, 0) for requested_id in team_ids}
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get member counts for teams: {e}")
        raise

1499 

def get_user_roles_batch(self, user_email: str, team_ids: List[str]) -> Dict[str, Optional[str]]:
    """Look up one user's role in several teams with a single query.

    Only active memberships are considered.

    Args:
        user_email: Email of the user
        team_ids: List of team UUIDs

    Returns:
        Dict mapping each requested team_id to the role (or None if not a member)

    Raises:
        Exception: Re-raises any database errors after rollback
    """
    # Nothing requested: answer without touching the database
    if not team_ids:
        return {}

    try:
        # One query covers every requested team
        membership_rows = (
            self.db.query(EmailTeamMember.team_id, EmailTeamMember.role)
            .filter(EmailTeamMember.user_email == user_email, EmailTeamMember.team_id.in_(team_ids), EmailTeamMember.is_active.is_(True))
            .all()
        )

        # Release transaction to avoid idle-in-transaction
        self.db.commit()

        found_roles = {}
        for membership in membership_rows:
            found_roles[str(membership.team_id)] = membership.role

        # Teams without an active membership map to None
        return {requested_id: found_roles.get(requested_id) for requested_id in team_ids}
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get user roles for {user_email}: {e}")
        raise

1533 

def get_pending_join_requests_batch(self, user_email: str, team_ids: List[str]) -> Dict[str, Optional[Any]]:
    """Fetch a user's pending join requests for several teams with a single query.

    Args:
        user_email: Email of the user
        team_ids: List of team UUIDs to check

    Returns:
        Dict mapping each requested team_id to its pending EmailTeamJoinRequest
        (or None if no pending request)

    Raises:
        Exception: Re-raises any database errors after rollback
    """
    # Nothing requested: answer without touching the database
    if not team_ids:
        return {}

    try:
        # One query covers every requested team
        pending_rows = (
            self.db.query(EmailTeamJoinRequest)
            .filter(
                EmailTeamJoinRequest.user_email == user_email,
                EmailTeamJoinRequest.team_id.in_(team_ids),
                EmailTeamJoinRequest.status == "pending",
            )
            .all()
        )

        # Release transaction to avoid idle-in-transaction
        self.db.commit()

        # One pending request per team is expected; if several exist the
        # last row returned wins.
        pending_by_team = {}
        for pending in pending_rows:
            pending_by_team[str(pending.team_id)] = pending

        return {requested_id: pending_by_team.get(requested_id) for requested_id in team_ids}
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get pending join requests for {user_email}: {e}")
        raise

1565 

1566 # ================================================================================== 

1567 # Cached Batch Methods (Redis caching for member counts) 

1568 # ================================================================================== 

1569 

def _get_member_count_cache_key(self, team_id: str) -> str:
    """Compose the cache key under which a team's member count is stored.

    The prefix comes from settings.cache_prefix, falling back to "mcpgw:"
    when that setting is absent.

    Args:
        team_id: Team UUID to build cache key for

    Returns:
        Cache key string in format "{prefix}team:member_count:{team_id}"
    """
    key_prefix = getattr(settings, "cache_prefix", "mcpgw:")
    return "{}team:member_count:{}".format(key_prefix, team_id)

1581 

async def get_member_counts_batch_cached(self, team_ids: List[str]) -> Dict[str, int]:
    """Get member counts for multiple teams, using Redis cache with DB fallback.

    Caching behavior is controlled by settings:
    - team_member_count_cache_enabled: Enable/disable caching (default: True)
    - team_member_count_cache_ttl: Cache TTL in seconds (default: 300)

    Cached counts are read with one MGET; teams missing from the cache are
    counted through get_member_counts_batch and written back with the
    configured TTL. Redis problems are logged and never fail the call.

    Args:
        team_ids: List of team UUIDs

    Returns:
        Dict mapping team_id to member count

    Raises:
        Exception: Re-raises any database errors after rollback
    """
    if not team_ids:
        return {}

    cache_enabled = getattr(settings, "team_member_count_cache_enabled", True)
    cache_ttl = getattr(settings, "team_member_count_cache_ttl", 300)

    # If caching disabled, go straight to batch DB query
    if not cache_enabled:
        return self.get_member_counts_batch(team_ids)

    # Import Redis client lazily; any failure just means "no cache"
    try:
        # First-Party
        from mcpgateway.utils.redis_client import get_redis_client  # pylint: disable=import-outside-toplevel

        redis_client = await get_redis_client()
    except Exception:
        redis_client = None

    result: Dict[str, int] = {}
    # Until the cache proves otherwise, every team has to be counted in the DB
    cache_misses: List[str] = list(team_ids)

    # Step 1: Check Redis cache for all team IDs
    if redis_client:
        try:
            cache_keys = [self._get_member_count_cache_key(tid) for tid in team_ids]
            cached_values = await redis_client.mget(cache_keys)
            cache_hits = {tid: int(cached) for tid, cached in zip(team_ids, cached_values) if cached is not None}
        except Exception as e:
            logger.warning(f"Redis cache read failed, falling back to DB: {e}")
        else:
            result.update(cache_hits)
            cache_misses = [tid for tid in team_ids if tid not in cache_hits]

    # Step 2: Query database for cache misses. The batch method owns the
    # query, commit, rollback and error logging, so it is reused here
    # instead of repeating that logic.
    if cache_misses:
        db_counts = self.get_member_counts_batch(cache_misses)
        result.update(db_counts)

        # Step 3: Cache each fresh count with the configured TTL (best effort)
        if redis_client:
            for tid, count in db_counts.items():
                try:
                    await redis_client.setex(self._get_member_count_cache_key(tid), cache_ttl, str(count))
                except Exception as e:
                    logger.warning(f"Redis cache write failed for team {tid}: {e}")

    return result

1670 

async def invalidate_team_member_count_cache(self, team_id: str) -> None:
    """Drop the cached member count of a single team.

    Call this after any membership changes (add/remove/update).
    No-op if caching is disabled or Redis unavailable; failures are logged
    as warnings and never raised.

    Args:
        team_id: Team UUID to invalidate
    """
    if not getattr(settings, "team_member_count_cache_enabled", True):
        return

    try:
        # First-Party
        from mcpgateway.utils.redis_client import get_redis_client  # pylint: disable=import-outside-toplevel

        redis_client = await get_redis_client()
        if not redis_client:
            return
        stale_key = self._get_member_count_cache_key(team_id)
        await redis_client.delete(stale_key)
    except Exception as e:
        logger.warning(f"Failed to invalidate member count cache for team {team_id}: {e}")