Coverage for mcpgateway / services / team_management_service.py: 98%

782 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/team_management_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Team Management Service. 

8This module provides team creation, management, and membership operations 

9for the multi-team collaboration system. 

10 

11Examples: 

12 >>> from unittest.mock import Mock 

13 >>> service = TeamManagementService(Mock()) 

14 >>> isinstance(service, TeamManagementService) 

15 True 

16 >>> hasattr(service, 'db') 

17 True 

18""" 

19 

20# Standard 

21import asyncio 

22import base64 

23from datetime import datetime, timedelta 

24from typing import Any, Dict, List, Optional, Tuple, Union 

25 

26# Third-Party 

27import orjson 

28from sqlalchemy import and_, desc, func, or_, select 

29from sqlalchemy.orm import selectinload, Session 

30 

31# First-Party 

32from mcpgateway.cache.admin_stats_cache import admin_stats_cache 

33from mcpgateway.cache.auth_cache import auth_cache, get_auth_cache 

34from mcpgateway.config import settings 

35from mcpgateway.db import EmailTeam, EmailTeamJoinRequest, EmailTeamMember, EmailTeamMemberHistory, EmailUser, utc_now 

36from mcpgateway.services.logging_service import LoggingService 

37from mcpgateway.utils.create_slug import slugify 

38from mcpgateway.utils.pagination import unified_paginate 

39from mcpgateway.utils.redis_client import get_redis_client 

40 

41# Initialize logging 

42logging_service = LoggingService() 

43logger = logging_service.get_logger(__name__) 

44 

45 

46def get_user_team_count(db: Session, user_email: str) -> int: 

47 """Get the number of active teams a user belongs to. 

48 

49 Args: 

50 db: SQLAlchemy database session 

51 user_email: Email address of the user 

52 

53 Returns: 

54 int: Number of active team memberships 

55 """ 

56 return db.query(EmailTeamMember).filter(EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).count() 

57 

58 

59class TeamManagementError(Exception): 

60 """Base class for team management-related errors. 

61 

62 Examples: 

63 >>> error = TeamManagementError("Test error") 

64 >>> str(error) 

65 'Test error' 

66 >>> isinstance(error, Exception) 

67 True 

68 """ 

69 

70 

71class InvalidRoleError(TeamManagementError): 

72 """Raised when an invalid role is specified. 

73 

74 Examples: 

75 >>> error = InvalidRoleError("Invalid role: guest") 

76 >>> str(error) 

77 'Invalid role: guest' 

78 >>> isinstance(error, TeamManagementError) 

79 True 

80 """ 

81 

82 

83class TeamNotFoundError(TeamManagementError): 

84 """Raised when a team does not exist. 

85 

86 Examples: 

87 >>> error = TeamNotFoundError("Team not found: team-123") 

88 >>> str(error) 

89 'Team not found: team-123' 

90 >>> isinstance(error, TeamManagementError) 

91 True 

92 """ 

93 

94 

95class UserNotFoundError(TeamManagementError): 

96 """Raised when a user does not exist. 

97 

98 Examples: 

99 >>> error = UserNotFoundError("User not found: user@example.com") 

100 >>> str(error) 

101 'User not found: user@example.com' 

102 >>> isinstance(error, TeamManagementError) 

103 True 

104 """ 

105 

106 

107class MemberAlreadyExistsError(TeamManagementError): 

108 """Raised when a user is already a member of the team. 

109 

110 Examples: 

111 >>> error = MemberAlreadyExistsError("User user@example.com is already a member of team team-123") 

112 >>> str(error) 

113 'User user@example.com is already a member of team team-123' 

114 >>> isinstance(error, TeamManagementError) 

115 True 

116 """ 

117 

118 

119class TeamMemberLimitExceededError(TeamManagementError): 

120 """Raised when a team has reached its maximum member limit. 

121 

122 Examples: 

123 >>> error = TeamMemberLimitExceededError("Team has reached maximum member limit of 10") 

124 >>> str(error) 

125 'Team has reached maximum member limit of 10' 

126 >>> isinstance(error, TeamManagementError) 

127 True 

128 """ 

129 

130 

131class TeamMemberAddError(TeamManagementError): 

132 """Raised when adding a member to a team fails due to database or system errors. 

133 

134 Examples: 

135 >>> error = TeamMemberAddError("Failed to add member due to database error") 

136 >>> str(error) 

137 'Failed to add member due to database error' 

138 >>> isinstance(error, TeamManagementError) 

139 True 

140 """ 

141 

142 

143class TeamManagementService: 

144 """Service for team management operations. 

145 

146 This service handles team creation, membership management, 

147 role assignments, and team access control. 

148 

149 Attributes: 

150 db (Session): SQLAlchemy database session 

151 

152 Examples: 

153 >>> from unittest.mock import Mock 

154 >>> service = TeamManagementService(Mock()) 

155 >>> service.__class__.__name__ 

156 'TeamManagementService' 

157 >>> hasattr(service, 'db') 

158 True 

159 """ 

160 

161 def __init__(self, db: Session): 

162 """Initialize the team management service. 

163 

164 Args: 

165 db: SQLAlchemy database session 

166 

167 Examples: 

168 Basic initialization: 

169 >>> from mcpgateway.services.team_management_service import TeamManagementService 

170 >>> from unittest.mock import Mock 

171 >>> db_session = Mock() 

172 >>> service = TeamManagementService(db_session) 

173 >>> service.db is db_session 

174 True 

175 

176 Service attributes: 

177 >>> hasattr(service, 'db') 

178 True 

179 >>> service.__class__.__name__ 

180 'TeamManagementService' 

181 """ 

182 self.db = db 

183 self._role_service = None # Lazy initialization to avoid circular imports 

184 

185 @property 

186 def role_service(self): 

187 """Lazy-initialized RoleService to avoid circular imports. 

188 

189 Returns: 

190 RoleService: Instance of RoleService 

191 """ 

192 if self._role_service is None: 

193 # First-Party 

194 from mcpgateway.services.role_service import RoleService # pylint: disable=import-outside-toplevel 

195 

196 self._role_service = RoleService(self.db) 

197 return self._role_service 

198 

199 def _get_user_team_count(self, user_email: str) -> int: 

200 """Get the number of active teams a user belongs to. 

201 

202 Args: 

203 user_email: Email address of the user 

204 

205 Returns: 

206 int: Number of active team memberships 

207 """ 

208 return get_user_team_count(self.db, user_email) 

209 

210 @staticmethod 

211 def _get_rbac_role_name(membership_role: str) -> str: 

212 """Map a team membership role to the corresponding configurable RBAC role name. 

213 

214 Args: 

215 membership_role: Team membership role ('owner' or 'member'). 

216 

217 Returns: 

218 str: The configured RBAC role name from settings. 

219 """ 

220 return settings.default_team_owner_role if membership_role == "owner" else settings.default_team_member_role 

221 

222 @staticmethod 

223 def _fire_and_forget(coro: Any) -> None: 

224 """Schedule a background coroutine and close it if scheduling fails. 

225 

226 Args: 

227 coro: The coroutine to schedule as a background task. 

228 

229 Raises: 

230 Exception: If asyncio.create_task fails (e.g. no running loop). 

231 """ 

232 try: 

233 task = asyncio.create_task(coro) 

234 # Some tests patch create_task with a plain Mock return value. In that 

235 # case the coroutine is never actually scheduled and must be closed. 

236 if asyncio.iscoroutine(coro) and not isinstance(task, asyncio.Task): 

237 close = getattr(coro, "close", None) 

238 if callable(close): 

239 close() 

240 except Exception: 

241 # If create_task() fails (e.g. no running loop), the coroutine has 

242 # already been created and must be closed to avoid runtime warnings. 

243 close = getattr(coro, "close", None) 

244 if callable(close): 

245 close() 

246 raise 

247 

248 def _log_team_member_action(self, team_member_id: str, team_id: str, user_email: str, role: str, action: str, action_by: Optional[str]): 

249 """ 

250 Log a team member action to EmailTeamMemberHistory. 

251 

252 Args: 

253 team_member_id: ID of the EmailTeamMember 

254 team_id: Team ID 

255 user_email: Email of the affected user 

256 role: Role at the time of action 

257 action: Action type ("added", "removed", "reactivated", "role_changed") 

258 action_by: Email of the user who performed the action 

259 

260 Examples: 

261 >>> from mcpgateway.services.team_management_service import TeamManagementService 

262 >>> from unittest.mock import Mock 

263 >>> service = TeamManagementService(Mock()) 

264 >>> service._log_team_member_action("tm-123", "team-123", "user@example.com", "member", "added", "admin@example.com") 

265 """ 

266 history = EmailTeamMemberHistory(team_member_id=team_member_id, team_id=team_id, user_email=user_email, role=role, action=action, action_by=action_by, action_timestamp=utc_now()) 

267 self.db.add(history) 

268 self.db.commit() 

269 

270 async def create_team( 

271 self, name: str, description: Optional[str], created_by: str, visibility: Optional[str] = "public", max_members: Optional[int] = None, skip_limits: bool = False 

272 ) -> EmailTeam: 

273 """Create a new team. 

274 

275 Args: 

276 name: Team name 

277 description: Team description 

278 created_by: Email of the user creating the team 

279 visibility: Team visibility (private, team, public) 

280 max_members: Maximum number of team members allowed 

281 skip_limits: Skip max_teams_per_user check (for admin bypass) 

282 

283 Returns: 

284 EmailTeam: The created team 

285 

286 Raises: 

287 ValueError: If team name is taken or invalid 

288 Exception: If team creation fails 

289 

290 Examples: 

291 Team creation parameter validation: 

292 >>> from mcpgateway.services.team_management_service import TeamManagementService 

293 

294 Test team name validation: 

295 >>> team_name = "My Development Team" 

296 >>> len(team_name) > 0 

297 True 

298 >>> len(team_name) <= 255 

299 True 

300 >>> bool(team_name.strip()) 

301 True 

302 

303 Test visibility validation: 

304 >>> visibility = "private" 

305 >>> valid_visibilities = ["private", "public"] 

306 >>> visibility in valid_visibilities 

307 True 

308 >>> "invalid" in valid_visibilities 

309 False 

310 

311 Test max_members validation: 

312 >>> max_members = 50 

313 >>> isinstance(max_members, int) 

314 True 

315 >>> max_members > 0 

316 True 

317 

318 Test creator validation: 

319 >>> created_by = "admin@example.com" 

320 >>> "@" in created_by 

321 True 

322 >>> len(created_by) > 0 

323 True 

324 

325 Test description handling: 

326 >>> description = "A team for software development" 

327 >>> description is not None 

328 True 

329 >>> isinstance(description, str) 

330 True 

331 

332 >>> # Test None description 

333 >>> description_none = None 

334 >>> description_none is None 

335 True 

336 """ 

337 try: 

338 # Validate visibility 

339 valid_visibilities = ["private", "public"] 

340 if visibility not in valid_visibilities: 

341 raise ValueError(f"Invalid visibility. Must be one of: {', '.join(valid_visibilities)}") 

342 

343 # Check max teams per user 

344 if not skip_limits: 

345 max_teams = getattr(settings, "max_teams_per_user", 50) 

346 if self._get_user_team_count(created_by) >= max_teams: 

347 raise ValueError(f"User has reached the maximum team limit of {max_teams}") 

348 

349 # Apply default max members from settings 

350 if max_members is None: 

351 max_members = getattr(settings, "max_members_per_team", 100) 

352 

353 # Check for existing inactive team with same name 

354 

355 potential_slug = slugify(name) 

356 existing_inactive_team = self.db.query(EmailTeam).filter(EmailTeam.slug == potential_slug, EmailTeam.is_active.is_(False)).first() 

357 

358 if existing_inactive_team: 

359 # Reactivate the existing team with new details 

360 existing_inactive_team.name = name 

361 existing_inactive_team.description = description 

362 existing_inactive_team.created_by = created_by 

363 existing_inactive_team.visibility = visibility 

364 existing_inactive_team.max_members = max_members 

365 existing_inactive_team.is_active = True 

366 existing_inactive_team.updated_at = utc_now() 

367 team = existing_inactive_team 

368 

369 # Check if the creator already has an inactive membership 

370 existing_membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team.id, EmailTeamMember.user_email == created_by).first() 

371 

372 if existing_membership: 

373 # Reactivate existing membership as owner 

374 existing_membership.role = "owner" 

375 existing_membership.joined_at = utc_now() 

376 existing_membership.is_active = True 

377 membership = existing_membership 

378 else: 

379 # Create new membership 

380 membership = EmailTeamMember(team_id=team.id, user_email=created_by, role="owner", joined_at=utc_now(), is_active=True) 

381 self.db.add(membership) 

382 

383 logger.info(f"Reactivated existing team with slug {potential_slug}") 

384 else: 

385 # Create the team (slug will be auto-generated by event listener) 

386 team = EmailTeam(name=name, description=description, created_by=created_by, is_personal=False, visibility=visibility, max_members=max_members, is_active=True) 

387 self.db.add(team) 

388 

389 self.db.flush() # Get the team ID 

390 

391 # Add the creator as owner 

392 membership = EmailTeamMember(team_id=team.id, user_email=created_by, role="owner", joined_at=utc_now(), is_active=True) 

393 self.db.add(membership) 

394 

395 self.db.commit() 

396 

397 # Invalidate member count cache for the new team 

398 await self.invalidate_team_member_count_cache(str(team.id)) 

399 

400 # Invalidate auth cache for creator's team membership 

401 # Without this, the cache won't know the user belongs to this new team 

402 try: 

403 await auth_cache.invalidate_user_teams(created_by) 

404 await auth_cache.invalidate_team_membership(created_by) 

405 await auth_cache.invalidate_user_role(created_by, str(team.id)) 

406 await admin_stats_cache.invalidate_teams() 

407 except Exception as cache_error: 

408 logger.debug(f"Failed to invalidate cache on team create: {cache_error}") 

409 

410 logger.info(f"Created team '{team.name}' by {created_by}") 

411 return team 

412 

413 except Exception as e: 

414 self.db.rollback() 

415 logger.error(f"Failed to create team '{name}': {e}") 

416 raise 

417 

418 async def get_team_by_id(self, team_id: str) -> Optional[EmailTeam]: 

419 """Get a team by ID. 

420 

421 Args: 

422 team_id: Team ID to lookup 

423 

424 Returns: 

425 EmailTeam: The team or None if not found 

426 

427 Examples: 

428 >>> import asyncio 

429 >>> from unittest.mock import Mock 

430 >>> service = TeamManagementService(Mock()) 

431 >>> asyncio.iscoroutinefunction(service.get_team_by_id) 

432 True 

433 """ 

434 try: 

435 team = self.db.query(EmailTeam).filter(EmailTeam.id == team_id, EmailTeam.is_active.is_(True)).first() 

436 self.db.commit() # Release transaction to avoid idle-in-transaction 

437 return team 

438 

439 except Exception as e: 

440 self.db.rollback() 

441 logger.error(f"Failed to get team by ID {team_id}: {e}") 

442 return None 

443 

444 async def get_team_by_slug(self, slug: str) -> Optional[EmailTeam]: 

445 """Get a team by slug. 

446 

447 Args: 

448 slug: Team slug to lookup 

449 

450 Returns: 

451 EmailTeam: The team or None if not found 

452 

453 Examples: 

454 >>> import asyncio 

455 >>> from unittest.mock import Mock 

456 >>> service = TeamManagementService(Mock()) 

457 >>> asyncio.iscoroutinefunction(service.get_team_by_slug) 

458 True 

459 """ 

460 try: 

461 team = self.db.query(EmailTeam).filter(EmailTeam.slug == slug, EmailTeam.is_active.is_(True)).first() 

462 self.db.commit() # Release transaction to avoid idle-in-transaction 

463 return team 

464 

465 except Exception as e: 

466 self.db.rollback() 

467 logger.error(f"Failed to get team by slug {slug}: {e}") 

468 return None 

469 

470 async def update_team( 

471 self, team_id: str, name: Optional[str] = None, description: Optional[str] = None, visibility: Optional[str] = None, max_members: Optional[int] = None, updated_by: Optional[str] = None 

472 ) -> bool: 

473 """Update team information. 

474 

475 Args: 

476 team_id: ID of the team to update 

477 name: New team name 

478 description: New team description 

479 visibility: New visibility setting 

480 max_members: New maximum member limit 

481 updated_by: Email of user making the update 

482 

483 Returns: 

484 bool: True if update succeeded, False otherwise 

485 

486 Raises: 

487 ValueError: If visibility setting is invalid 

488 

489 Examples: 

490 >>> import asyncio 

491 >>> from unittest.mock import Mock 

492 >>> service = TeamManagementService(Mock()) 

493 >>> asyncio.iscoroutinefunction(service.update_team) 

494 True 

495 """ 

496 try: 

497 team = await self.get_team_by_id(team_id) 

498 if not team: 

499 logger.warning(f"Team {team_id} not found for update") 

500 return False 

501 

502 # Prevent updating personal teams 

503 if team.is_personal: 

504 logger.warning(f"Cannot update personal team {team_id}") 

505 return False 

506 

507 # Update fields if provided 

508 if name is not None: 

509 team.name = name 

510 # Slug will be updated by event listener if name changes 

511 

512 if description is not None: 

513 team.description = description 

514 

515 if visibility is not None: 

516 valid_visibilities = ["private", "public"] 

517 if visibility not in valid_visibilities: 

518 raise ValueError(f"Invalid visibility. Must be one of: {', '.join(valid_visibilities)}") 

519 team.visibility = visibility 

520 

521 if max_members is not None: 

522 team.max_members = max_members 

523 

524 team.updated_at = utc_now() 

525 self.db.commit() 

526 

527 logger.info(f"Updated team {team_id} by {updated_by}") 

528 return True 

529 

530 except ValueError: 

531 raise # Let ValueError propagate to caller for proper error handling 

532 except Exception as e: 

533 self.db.rollback() 

534 logger.error(f"Failed to update team {team_id}: {e}") 

535 return False 

536 

537 async def delete_team(self, team_id: str, deleted_by: str) -> bool: 

538 """Delete a team (soft delete). 

539 

540 Args: 

541 team_id: ID of the team to delete 

542 deleted_by: Email of user performing deletion 

543 

544 Returns: 

545 bool: True if deletion succeeded, False otherwise 

546 

547 Raises: 

548 ValueError: If attempting to delete a personal team 

549 

550 Examples: 

551 >>> import asyncio 

552 >>> from unittest.mock import Mock 

553 >>> service = TeamManagementService(Mock()) 

554 >>> asyncio.iscoroutinefunction(service.delete_team) 

555 True 

556 """ 

557 try: 

558 team = await self.get_team_by_id(team_id) 

559 if not team: 

560 logger.warning(f"Team {team_id} not found for deletion") 

561 return False 

562 

563 # Prevent deleting personal teams 

564 if team.is_personal: 

565 logger.warning(f"Cannot delete personal team {team_id}") 

566 raise ValueError("Personal teams cannot be deleted") 

567 

568 # Soft delete the team 

569 team.is_active = False 

570 team.updated_at = utc_now() 

571 

572 # Get all active memberships before deactivating (for history logging) 

573 memberships = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)).all() 

574 

575 # Log history for each membership (before bulk update) 

576 for membership in memberships: 

577 self._log_team_member_action(membership.id, team_id, membership.user_email, membership.role, "team-deleted", deleted_by) 

578 

579 # Bulk update: deactivate all memberships in single query instead of looping 

580 self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)).update({EmailTeamMember.is_active: False}, synchronize_session=False) 

581 

582 self.db.commit() 

583 

584 # Invalidate all role caches for this team 

585 try: 

586 self._fire_and_forget(auth_cache.invalidate_team_roles(team_id)) 

587 self._fire_and_forget(admin_stats_cache.invalidate_teams()) 

588 # Also invalidate team cache, teams list cache, and team membership cache for each member 

589 for membership in memberships: 

590 self._fire_and_forget(auth_cache.invalidate_team(membership.user_email)) 

591 self._fire_and_forget(auth_cache.invalidate_user_teams(membership.user_email)) 

592 self._fire_and_forget(auth_cache.invalidate_team_membership(membership.user_email)) 

593 except Exception as cache_error: 

594 logger.debug(f"Failed to invalidate caches on team delete: {cache_error}") 

595 

596 logger.info(f"Deleted team {team_id} by {deleted_by}") 

597 return True 

598 

599 except Exception as e: 

600 self.db.rollback() 

601 logger.error(f"Failed to delete team {team_id}: {e}") 

602 return False 

603 

604 async def add_member_to_team(self, team_id: str, user_email: str, role: str = "member", invited_by: Optional[str] = None) -> EmailTeamMember: 

605 """Add a member to a team. 

606 

607 Args: 

608 team_id: ID of the team 

609 user_email: Email of the user to add 

610 role: Role to assign (owner, member) 

611 invited_by: Email of user who added this member 

612 

613 Returns: 

614 EmailTeamMember: The created or reactivated team member object 

615 

616 Raises: 

617 InvalidRoleError: If role is invalid 

618 TeamNotFoundError: If team does not exist 

619 TeamManagementError: If team is a personal team 

620 UserNotFoundError: If user does not exist 

621 MemberAlreadyExistsError: If user is already a member 

622 TeamMemberLimitExceededError: If team has reached maximum member limit 

623 TeamMemberAddError: If adding member fails due to database or system errors 

624 

625 Examples: 

626 >>> import asyncio 

627 >>> from unittest.mock import Mock 

628 >>> service = TeamManagementService(Mock()) 

629 >>> asyncio.iscoroutinefunction(service.add_member_to_team) 

630 True 

631 >>> # After adding, EmailTeamMemberHistory is updated 

632 >>> # service._log_team_member_action("tm-123", "team-123", "user@example.com", "member", "added", "admin@example.com") 

633 """ 

634 # Validate role 

635 valid_roles = ["owner", "member"] 

636 if role not in valid_roles: 

637 raise InvalidRoleError(f"Invalid role '{role}'. Must be one of: {', '.join(valid_roles)}") 

638 

639 # Check if team exists 

640 team = await self.get_team_by_id(team_id) 

641 if not team: 

642 logger.warning(f"Team {team_id} not found") 

643 raise TeamNotFoundError("Team not found") 

644 

645 # Prevent adding members to personal teams 

646 if team.is_personal: 

647 logger.warning(f"Cannot add members to personal team {team_id}") 

648 raise TeamManagementError("Cannot add members to personal teams") 

649 

650 # Check if user exists 

651 user = self.db.query(EmailUser).filter(EmailUser.email == user_email).first() 

652 if not user: 

653 logger.warning(f"User {user_email} not found") 

654 raise UserNotFoundError("User not found") 

655 

656 # Check max teams per user 

657 max_teams = getattr(settings, "max_teams_per_user", 50) 

658 if self._get_user_team_count(user_email) >= max_teams: 

659 raise TeamManagementError(f"User has reached the maximum team limit of {max_teams}") 

660 

661 # Check if user is already a member 

662 existing_membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email).first() 

663 

664 if existing_membership and existing_membership.is_active: 

665 logger.warning(f"User {user_email} is already a member of team {team_id}") 

666 raise MemberAlreadyExistsError("User is already a member of this team") 

667 

668 # Check team member limit 

669 if team.max_members: 

670 current_member_count = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)).count() 

671 

672 if current_member_count >= team.max_members: 

673 logger.warning(f"Team {team_id} has reached maximum member limit of {team.max_members}") 

674 raise TeamMemberLimitExceededError(f"Team has reached maximum member limit of {team.max_members}") 

675 

676 # Add or reactivate membership 

677 try: 

678 if existing_membership: 

679 existing_membership.is_active = True 

680 existing_membership.role = role 

681 existing_membership.joined_at = utc_now() 

682 existing_membership.invited_by = invited_by 

683 self.db.commit() 

684 self._log_team_member_action(existing_membership.id, team_id, user_email, role, "reactivated", invited_by) 

685 member = existing_membership 

686 else: 

687 membership = EmailTeamMember(team_id=team_id, user_email=user_email, role=role, joined_at=utc_now(), invited_by=invited_by, is_active=True) 

688 self.db.add(membership) 

689 self.db.commit() 

690 self._log_team_member_action(membership.id, team_id, user_email, role, "added", invited_by) 

691 member = membership 

692 

693 # Assign team-scoped RBAC role matching the membership role (owner or member) 

694 try: 

695 rbac_role_name = self._get_rbac_role_name(role) 

696 team_rbac_role = await self.role_service.get_role_by_name(rbac_role_name, scope="team") 

697 if team_rbac_role: 

698 existing = await self.role_service.get_user_role_assignment(user_email=user_email, role_id=team_rbac_role.id, scope="team", scope_id=team_id) 

699 if not existing or not existing.is_active: 

700 await self.role_service.assign_role_to_user(user_email=user_email, role_id=team_rbac_role.id, scope="team", scope_id=team_id, granted_by=invited_by or user_email) 

701 logger.info(f"Assigned {rbac_role_name} role to {user_email} for team {team_id}") 

702 else: 

703 logger.debug(f"User {user_email} already has active {rbac_role_name} role for team {team_id}") 

704 else: 

705 logger.warning(f"Role '{rbac_role_name}' not found. User {user_email} added without RBAC role.") 

706 except Exception as role_error: 

707 logger.warning(f"Failed to assign role to {user_email}: {role_error}") 

708 

709 # Invalidate auth cache for user's team membership and role 

710 try: 

711 self._fire_and_forget(auth_cache.invalidate_team(user_email)) 

712 self._fire_and_forget(auth_cache.invalidate_user_role(user_email, team_id)) 

713 self._fire_and_forget(auth_cache.invalidate_user_teams(user_email)) 

714 self._fire_and_forget(auth_cache.invalidate_team_membership(user_email)) 

715 self._fire_and_forget(admin_stats_cache.invalidate_teams()) 

716 except Exception as cache_error: 

717 logger.debug(f"Failed to invalidate cache on team add: {cache_error}") 

718 

719 # Invalidate member count cache for this team 

720 await self.invalidate_team_member_count_cache(str(team_id)) 

721 

722 logger.info(f"Added {user_email} to team {team_id} with role {role}") 

723 return member 

724 

725 except Exception as e: 

726 self.db.rollback() 

727 logger.error(f"Failed to add {user_email} to team {team_id}: {e}") 

728 raise TeamMemberAddError("Failed to add member to team") from e 

729 

730 async def remove_member_from_team(self, team_id: str, user_email: str, removed_by: Optional[str] = None) -> bool: 

731 """Remove a member from a team. 

732 

733 Args: 

734 team_id: ID of the team 

735 user_email: Email of the user to remove 

736 removed_by: Email of user performing the removal 

737 

738 Returns: 

739 bool: True if member was removed successfully, False otherwise 

740 

741 Raises: 

742 ValueError: If attempting to remove the last owner 

743 

744 Examples: 

745 Team membership management with role-based access control. 

746 After removal, EmailTeamMemberHistory is updated via _log_team_member_action. 

747 """ 

748 try: 

749 team = await self.get_team_by_id(team_id) 

750 if not team: 

751 logger.warning(f"Team {team_id} not found") 

752 return False 

753 

754 # Prevent removing members from personal teams 

755 if team.is_personal: 

756 logger.warning(f"Cannot remove members from personal team {team_id}") 

757 return False 

758 

759 # Find the membership 

760 membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).first() 

761 

762 if not membership: 

763 logger.warning(f"User {user_email} is not a member of team {team_id}") 

764 return False 

765 

766 # Prevent removing the last owner 

767 if membership.role == "owner": 

768 owner_count = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.role == "owner", EmailTeamMember.is_active.is_(True)).count() 

769 

770 if owner_count <= 1: 

771 logger.warning(f"Cannot remove the last owner from team {team_id}") 

772 raise ValueError("Cannot remove the last owner from a team") 

773 

774 # Remove membership (soft delete) 

775 membership.is_active = False 

776 self.db.commit() 

777 self._log_team_member_action(membership.id, team_id, user_email, membership.role, "removed", removed_by) 

778 

779 # Revoke all team-scoped RBAC roles from removed member defensively 

780 # (revoke both owner and member roles to handle edge cases) 

781 try: 

782 for role_name in (settings.default_team_owner_role, settings.default_team_member_role): 

783 rbac_role = await self.role_service.get_role_by_name(role_name, scope="team") 

784 if rbac_role: 

785 revoked = await self.role_service.revoke_role_from_user(user_email=user_email, role_id=rbac_role.id, scope="team", scope_id=team_id) 

786 if revoked: 

787 logger.info(f"Revoked {role_name} role from {user_email} for team {team_id}") 

788 except Exception as role_error: 

789 logger.warning(f"Failed to revoke roles from {user_email}: {role_error}") 

790 

791 # Invalidate auth cache for user's team membership and role 

792 try: 

793 self._fire_and_forget(auth_cache.invalidate_team(user_email)) 

794 self._fire_and_forget(auth_cache.invalidate_user_role(user_email, team_id)) 

795 self._fire_and_forget(auth_cache.invalidate_user_teams(user_email)) 

796 self._fire_and_forget(auth_cache.invalidate_team_membership(user_email)) 

797 except Exception as cache_error: 

798 logger.debug(f"Failed to invalidate cache on team remove: {cache_error}") 

799 

800 # Invalidate member count cache for this team 

801 await self.invalidate_team_member_count_cache(str(team_id)) 

802 

803 logger.info(f"Removed {user_email} from team {team_id} by {removed_by}") 

804 return True 

805 

806 except Exception as e: 

807 self.db.rollback() 

808 logger.error(f"Failed to remove {user_email} from team {team_id}: {e}") 

809 return False 

810 

811 async def update_member_role(self, team_id: str, user_email: str, new_role: str, updated_by: Optional[str] = None) -> bool: 

812 """Update a team member's role. 

813 

814 Args: 

815 team_id: ID of the team 

816 user_email: Email of the user whose role to update 

817 new_role: New role to assign 

818 updated_by: Email of user making the change 

819 

820 Returns: 

821 bool: True if role was updated successfully, False otherwise 

822 

823 Raises: 

824 ValueError: If role is invalid or removing last owner role 

825 

826 Examples: 

827 Role management within teams for access control. 

828 After role update, EmailTeamMemberHistory is updated via _log_team_member_action. 

829 """ 

830 try: 

831 # Validate role 

832 valid_roles = ["owner", "member"] 

833 if new_role not in valid_roles: 

834 raise ValueError(f"Invalid role. Must be one of: {', '.join(valid_roles)}") 

835 

836 team = await self.get_team_by_id(team_id) 

837 if not team: 

838 logger.warning(f"Team {team_id} not found") 

839 return False 

840 

841 # Prevent updating roles in personal teams 

842 if team.is_personal: 

843 logger.warning(f"Cannot update roles in personal team {team_id}") 

844 return False 

845 

846 # Find the membership 

847 membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).first() 

848 

849 if not membership: 

850 logger.warning(f"User {user_email} is not a member of team {team_id}") 

851 return False 

852 

853 # Prevent changing the role of the last owner to non-owner 

854 if membership.role == "owner" and new_role != "owner": 

855 owner_count = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.role == "owner", EmailTeamMember.is_active.is_(True)).count() 

856 

857 if owner_count <= 1: 

858 logger.warning(f"Cannot remove owner role from the last owner of team {team_id}") 

859 raise ValueError("Cannot remove owner role from the last owner of a team") 

860 

861 # Update the role 

862 old_role = membership.role 

863 membership.role = new_role 

864 self.db.commit() 

865 self._log_team_member_action(membership.id, team_id, user_email, new_role, "role_changed", updated_by) 

866 

867 # Handle RBAC role changes when team membership role changes 

868 if old_role != new_role: 

869 try: 

870 # Get both role types 

871 team_member_role = await self.role_service.get_role_by_name(settings.default_team_member_role, scope="team") 

872 team_owner_role = await self.role_service.get_role_by_name(settings.default_team_owner_role, scope="team") 

873 

874 # Handle role transitions 

875 if old_role == "member" and new_role == "owner": 

876 # member -> owner: revoke member role, assign owner role 

877 if team_member_role: 

878 await self.role_service.revoke_role_from_user(user_email=user_email, role_id=team_member_role.id, scope="team", scope_id=team_id) 

879 if team_owner_role: 

880 await self.role_service.assign_role_to_user(user_email=user_email, role_id=team_owner_role.id, scope="team", scope_id=team_id, granted_by=updated_by or user_email) 

881 logger.info(f"Transitioned RBAC role from {settings.default_team_member_role} to {settings.default_team_owner_role} for {user_email} in team {team_id}") 

882 

883 elif old_role == "owner" and new_role == "member": 

884 # owner -> member: revoke owner role, assign member role 

885 if team_owner_role: 

886 await self.role_service.revoke_role_from_user(user_email=user_email, role_id=team_owner_role.id, scope="team", scope_id=team_id) 

887 if team_member_role: 

888 await self.role_service.assign_role_to_user(user_email=user_email, role_id=team_member_role.id, scope="team", scope_id=team_id, granted_by=updated_by or user_email) 

889 logger.info(f"Transitioned RBAC role from {settings.default_team_owner_role} to {settings.default_team_member_role} for {user_email} in team {team_id}") 

890 

891 except Exception as role_error: 

892 logger.warning(f"Failed to update RBAC roles for {user_email} in team {team_id}: {role_error}") 

893 # Don't fail the membership role update if RBAC role update fails 

894 

895 # Invalidate role cache 

896 try: 

897 self._fire_and_forget(auth_cache.invalidate_user_role(user_email, team_id)) 

898 except Exception as cache_error: 

899 logger.debug(f"Failed to invalidate cache on role update: {cache_error}") 

900 

901 logger.info(f"Updated role of {user_email} in team {team_id} to {new_role} by {updated_by}") 

902 return True 

903 

904 except ValueError: 

905 raise # Let ValueError propagate to caller for proper error handling 

906 except Exception as e: 

907 self.db.rollback() 

908 logger.error(f"Failed to update role of {user_email} in team {team_id}: {e}") 

909 return False 

910 

911 async def get_member(self, team_id: str, user_email: str) -> Optional[EmailTeamMember]: 

912 """Get a single team member by team ID and user email. 

913 

914 Args: 

915 team_id: ID of the team 

916 user_email: Email of the user 

917 

918 Returns: 

919 EmailTeamMember if found and active, None otherwise 

920 """ 

921 try: 

922 return self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).first() 

923 except Exception as e: 

924 logger.error(f"Failed to get member {user_email} in team {team_id}: {e}") 

925 return None 

926 

927 async def get_user_teams(self, user_email: str, include_personal: bool = True) -> List[EmailTeam]: 

928 """Get all teams a user belongs to. 

929 

930 Uses caching to reduce database queries (called 20+ times per request). 

931 Cache can be disabled via AUTH_CACHE_TEAMS_ENABLED=false. 

932 

933 Args: 

934 user_email: Email of the user 

935 include_personal: Whether to include personal teams 

936 

937 Returns: 

938 List[EmailTeam]: List of teams the user belongs to 

939 

940 Examples: 

941 User dashboard showing team memberships. 

942 """ 

943 # Check cache first 

944 cache = self._get_auth_cache() 

945 cache_key = f"{user_email}:{include_personal}" 

946 

947 if cache: 

948 cached_team_ids = await cache.get_user_teams(cache_key) 

949 if cached_team_ids is not None: 

950 if not cached_team_ids: # Empty list = user has no teams 

951 return [] 

952 # Fetch full team objects by IDs (fast indexed lookup) 

953 try: 

954 teams = self.db.query(EmailTeam).filter(EmailTeam.id.in_(cached_team_ids), EmailTeam.is_active.is_(True)).all() 

955 self.db.commit() # Release transaction to avoid idle-in-transaction 

956 return teams 

957 except Exception as e: 

958 self.db.rollback() 

959 logger.warning(f"Failed to fetch teams by IDs from cache: {e}") 

960 # Fall through to full query 

961 

962 # Cache miss or caching disabled - do full query 

963 try: 

964 query = self.db.query(EmailTeam).join(EmailTeamMember).filter(EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True), EmailTeam.is_active.is_(True)) 

965 

966 if not include_personal: 

967 query = query.filter(EmailTeam.is_personal.is_(False)) 

968 

969 teams = query.all() 

970 self.db.commit() # Release transaction to avoid idle-in-transaction 

971 

972 # Update cache with team IDs 

973 if cache: 

974 team_ids = [t.id for t in teams] 

975 await cache.set_user_teams(cache_key, team_ids) 

976 

977 return teams 

978 

979 except Exception as e: 

980 self.db.rollback() 

981 logger.error(f"Failed to get teams for user {user_email}: {e}") 

982 return [] 

983 

984 async def verify_team_for_user(self, user_email, team_id=None): 

985 """ 

986 Retrieve a team ID for a user based on their membership and optionally a specific team ID. 

987 

988 This function attempts to fetch all teams associated with the given user email. 

989 If no `team_id` is provided, it returns the ID of the user's personal team (if any). 

990 If a `team_id` is provided, it checks whether the user is a member of that team. 

991 If the user is not a member of the specified team, it returns a JSONResponse with an error message. 

992 

993 Args: 

994 user_email (str): The email of the user whose teams are being queried. 

995 team_id (str or None, optional): Specific team ID to check for membership. Defaults to None. 

996 

997 Returns: 

998 str or JSONResponse or None: 

999 - If `team_id` is None, returns the ID of the user's personal team or None if not found. 

1000 - If `team_id` is provided and the user is a member of that team, returns `team_id`. 

1001 - If `team_id` is provided but the user is not a member of that team, returns a JSONResponse with error. 

1002 - Returns None if an error occurs and no `team_id` was initially provided. 

1003 

1004 Raises: 

1005 None explicitly, but any exceptions during the process are caught and logged. 

1006 

1007 Examples: 

1008 Verifies user team if team_id provided otherwise finds its personal id. 

1009 """ 

1010 try: 

1011 # Get all teams the user belongs to in a single query 

1012 try: 

1013 query = self.db.query(EmailTeam).join(EmailTeamMember).filter(EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True), EmailTeam.is_active.is_(True)) 

1014 user_teams = query.all() 

1015 self.db.commit() # Release transaction to avoid idle-in-transaction 

1016 except Exception as e: 

1017 self.db.rollback() 

1018 logger.error(f"Failed to get teams for user {user_email}: {e}") 

1019 return [] 

1020 

1021 if not team_id: 

1022 # If no team_id is provided, try to get the personal team 

1023 personal_team = next((t for t in user_teams if getattr(t, "is_personal", False)), None) 

1024 team_id = personal_team.id if personal_team else None 

1025 else: 

1026 # Check if the provided team_id exists among the user's teams 

1027 is_team_present = any(team.id == team_id for team in user_teams) 

1028 if not is_team_present: 

1029 return [] 

1030 except Exception as e: 

1031 self.db.rollback() 

1032 print(f"An error occurred: {e}") 

1033 if not team_id: 

1034 team_id = None 

1035 

1036 return team_id 

1037 

1038 async def get_team_members( 

1039 self, 

1040 team_id: str, 

1041 cursor: Optional[str] = None, 

1042 limit: Optional[int] = None, 

1043 page: Optional[int] = None, 

1044 per_page: Optional[int] = None, 

1045 ) -> Union[List[Tuple[EmailUser, EmailTeamMember]], Tuple[List[Tuple[EmailUser, EmailTeamMember]], Optional[str]], Dict[str, Any]]: 

1046 """Get all members of a team with optional cursor or page-based pagination. 

1047 

1048 Note: This method returns ORM objects and cannot be cached since callers 

1049 depend on ORM attributes and methods. 

1050 

1051 Args: 

1052 team_id: ID of the team 

1053 cursor: Opaque cursor token for cursor-based pagination 

1054 limit: Maximum number of members to return (for cursor-based, default: 50) 

1055 page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor. 

1056 per_page: Items per page for page-based pagination (default: 30) 

1057 

1058 Returns: 

1059 - If cursor is provided: Tuple (members, next_cursor) 

1060 - If page is provided: Dict with keys 'data', 'pagination', 'links' 

1061 - If neither: List of all members (backward compatibility) 

1062 

1063 Examples: 

1064 Team member management and role display. 

1065 """ 

1066 try: 

1067 # Build base query - for pagination, select EmailTeamMember and eager-load user 

1068 # For backward compat (no pagination), select both entities as tuple 

1069 if cursor is None and page is None and limit is None: 

1070 # Backward compatibility: return tuples (no pagination requested) 

1071 query = ( 

1072 select(EmailUser, EmailTeamMember) 

1073 .join(EmailTeamMember, EmailUser.email == EmailTeamMember.user_email) 

1074 .where(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)) 

1075 .order_by(EmailUser.full_name, EmailUser.email) 

1076 ) 

1077 result = self.db.execute(query) 

1078 members = list(result.all()) 

1079 self.db.commit() 

1080 return members 

1081 

1082 # For pagination: select EmailTeamMember and eager-load user to avoid N+1 

1083 query = ( 

1084 select(EmailTeamMember) 

1085 .options(selectinload(EmailTeamMember.user)) 

1086 .where(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)) 

1087 .join(EmailUser, EmailUser.email == EmailTeamMember.user_email) 

1088 ) 

1089 

1090 # PAGE-BASED PAGINATION (Admin UI) - use unified_paginate 

1091 if page is not None: 

1092 # Alphabetical ordering for user-friendly display 

1093 query = query.order_by(EmailUser.full_name, EmailUser.email) 

1094 pag_result = await unified_paginate( 

1095 db=self.db, 

1096 query=query, 

1097 page=page, 

1098 per_page=per_page or 30, 

1099 cursor=None, 

1100 limit=None, 

1101 base_url=f"/admin/teams/{team_id}/members", 

1102 query_params={}, 

1103 ) 

1104 self.db.commit() 

1105 memberships = pag_result["data"] 

1106 tuples = [(m.user, m) for m in memberships] 

1107 return { 

1108 "data": tuples, 

1109 "pagination": pag_result["pagination"], 

1110 "links": pag_result["links"], 

1111 } 

1112 

1113 # CURSOR-BASED PAGINATION (API) - custom implementation using (joined_at, id) 

1114 # unified_paginate uses created_at which doesn't exist on EmailTeamMember 

1115 

1116 # Order by joined_at DESC, id DESC for keyset pagination 

1117 query = query.order_by(desc(EmailTeamMember.joined_at), desc(EmailTeamMember.id)) 

1118 

1119 # Decode cursor and apply keyset filter 

1120 if cursor: 

1121 try: 

1122 cursor_json = base64.urlsafe_b64decode(cursor.encode()).decode() 

1123 cursor_data = orjson.loads(cursor_json) 

1124 last_id = cursor_data.get("id") 

1125 joined_str = cursor_data.get("joined_at") 

1126 if last_id and joined_str: 

1127 last_joined = datetime.fromisoformat(joined_str) 

1128 # Keyset filter: (joined_at < last) OR (joined_at = last AND id < last_id) 

1129 query = query.where( 

1130 or_( 

1131 EmailTeamMember.joined_at < last_joined, 

1132 and_(EmailTeamMember.joined_at == last_joined, EmailTeamMember.id < last_id), 

1133 ) 

1134 ) 

1135 except (ValueError, TypeError) as e: 

1136 logger.warning(f"Invalid cursor for team members, ignoring: {e}") 

1137 

1138 # Fetch limit + 1 to check for more results (cap at max_page_size) 

1139 page_size = min(limit or 50, settings.pagination_max_page_size) 

1140 query = query.limit(page_size + 1) 

1141 memberships = list(self.db.execute(query).scalars().all()) 

1142 

1143 # Check if there are more results 

1144 has_more = len(memberships) > page_size 

1145 if has_more: 

1146 memberships = memberships[:page_size] 

1147 

1148 # Generate next cursor using (joined_at, id) 

1149 next_cursor = None 

1150 if has_more and memberships: 

1151 last_member = memberships[-1] 

1152 cursor_data = { 

1153 "joined_at": last_member.joined_at.isoformat() if last_member.joined_at else None, 

1154 "id": last_member.id, 

1155 } 

1156 next_cursor = base64.urlsafe_b64encode(orjson.dumps(cursor_data)).decode() 

1157 

1158 self.db.commit() 

1159 tuples = [(m.user, m) for m in memberships] 

1160 return (tuples, next_cursor) 

1161 

1162 except Exception as e: 

1163 self.db.rollback() 

1164 logger.error(f"Failed to get members for team {team_id}: {e}") 

1165 

1166 # Return appropriate empty response based on mode 

1167 if page is not None: 

1168 return {"data": [], "pagination": {"page": page, "per_page": per_page or 30, "total": 0, "has_next": False, "has_prev": False}, "links": None} 

1169 

1170 if cursor is not None: 

1171 return ([], None) 

1172 

1173 return [] 

1174 

1175 def count_team_owners(self, team_id: str) -> int: 

1176 """Count the number of owners in a team using SQL COUNT. 

1177 

1178 This is more efficient than loading all members and counting in Python. 

1179 

1180 Args: 

1181 team_id: ID of the team 

1182 

1183 Returns: 

1184 int: Number of active owners in the team 

1185 """ 

1186 count = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.role == "owner", EmailTeamMember.is_active.is_(True)).count() 

1187 self.db.commit() # Release transaction to avoid idle-in-transaction 

1188 return count 

1189 

1190 def _get_auth_cache(self): 

1191 """Get auth cache instance lazily. 

1192 

1193 Returns: 

1194 AuthCache instance or None if unavailable. 

1195 """ 

1196 try: 

1197 return get_auth_cache() 

1198 except ImportError: 

1199 return None 

1200 

1201 def _get_admin_stats_cache(self): 

1202 """Get admin stats cache instance lazily. 

1203 

1204 Returns: 

1205 AdminStatsCache instance or None if unavailable. 

1206 """ 

1207 try: 

1208 # First-Party 

1209 from mcpgateway.cache.admin_stats_cache import get_admin_stats_cache # pylint: disable=import-outside-toplevel 

1210 

1211 return get_admin_stats_cache() 

1212 except ImportError: 

1213 return None 

1214 

1215 async def get_user_role_in_team(self, user_email: str, team_id: str) -> Optional[str]: 

1216 """Get a user's role in a specific team. 

1217 

1218 Uses caching to reduce database queries (called 11+ times per team operation). 

1219 

1220 Args: 

1221 user_email: Email of the user 

1222 team_id: ID of the team 

1223 

1224 Returns: 

1225 str: User's role or None if not a member 

1226 

1227 Examples: 

1228 Access control and permission checking. 

1229 """ 

1230 # Check cache first 

1231 cache = self._get_auth_cache() 

1232 if cache: 

1233 cached_role = await cache.get_user_role(user_email, team_id) 

1234 if cached_role is not None: 

1235 # Empty string means "not a member" (cached None) 

1236 return cached_role if cached_role else None 

1237 

1238 try: 

1239 membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.user_email == user_email, EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)).first() 

1240 self.db.commit() # Release transaction to avoid idle-in-transaction 

1241 

1242 role = membership.role if membership else None 

1243 

1244 # Store in cache 

1245 if cache: 

1246 await cache.set_user_role(user_email, team_id, role) 

1247 

1248 return role 

1249 

1250 except Exception as e: 

1251 self.db.rollback() 

1252 logger.error(f"Failed to get role for {user_email} in team {team_id}: {e}") 

1253 return None 

1254 

1255 async def list_teams( 

1256 self, 

1257 # Unified pagination params 

1258 limit: int = 100, 

1259 offset: int = 0, 

1260 cursor: Optional[str] = None, 

1261 page: Optional[int] = None, 

1262 per_page: int = 50, 

1263 include_inactive: bool = False, 

1264 visibility_filter: Optional[str] = None, 

1265 base_url: Optional[str] = None, 

1266 include_personal: bool = False, 

1267 search_query: Optional[str] = None, 

1268 personal_owner_email: Optional[str] = None, 

1269 ) -> Union[Tuple[List[EmailTeam], Optional[str]], Dict[str, Any]]: 

1270 """List teams with pagination support (cursor or page based). 

1271 

1272 Args: 

1273 limit: Max items for cursor pagination 

1274 offset: Offset for legacy/cursor pagination 

1275 cursor: Cursor token 

1276 page: Page number (1-indexed) 

1277 per_page: Items per page 

1278 include_inactive: Whether to include inactive teams 

1279 visibility_filter: Filter by visibility (private, team, public) 

1280 base_url: Base URL for pagination links 

1281 include_personal: Whether to include personal teams 

1282 search_query: Search term for name/slug/description 

1283 personal_owner_email: When set (and include_personal=False), includes this user's personal team alongside non-personal teams 

1284 

1285 Returns: 

1286 Union[Tuple[List[EmailTeam], Optional[str]], Dict[str, Any]]: 

1287 - Tuple (teams, next_cursor) if cursor/offset based 

1288 - Dict {data, pagination, links} if page based 

1289 """ 

1290 query = select(EmailTeam) 

1291 

1292 if not include_personal: 

1293 if personal_owner_email: 

1294 query = query.where( 

1295 or_( 

1296 EmailTeam.is_personal.is_(False), 

1297 and_(EmailTeam.is_personal.is_(True), EmailTeam.created_by == personal_owner_email), 

1298 ) 

1299 ) 

1300 else: 

1301 query = query.where(EmailTeam.is_personal.is_(False)) 

1302 

1303 if not include_inactive: 

1304 query = query.where(EmailTeam.is_active.is_(True)) 

1305 

1306 if visibility_filter: 

1307 query = query.where(EmailTeam.visibility == visibility_filter) 

1308 

1309 if search_query: 

1310 search_term = f"%{search_query}%" 

1311 query = query.where( 

1312 or_( 

1313 EmailTeam.name.ilike(search_term), 

1314 EmailTeam.slug.ilike(search_term), 

1315 EmailTeam.description.ilike(search_term), 

1316 ) 

1317 ) 

1318 

1319 # Choose ordering based on pagination mode: 

1320 # - Page-based (UI): alphabetical by name for user-friendly display 

1321 # - Cursor-based (API): created_at DESC, id DESC to match unified_paginate expectations 

1322 if page is not None: 

1323 query = query.order_by(EmailTeam.name, EmailTeam.id) 

1324 else: 

1325 query = query.order_by(desc(EmailTeam.created_at), desc(EmailTeam.id)) 

1326 

1327 # Base URL for pagination links (default to admin partial if not provided) 

1328 if not base_url: 

1329 base_url = f"{settings.app_root_path}/admin/teams/partial" 

1330 

1331 # Apply offset manually for legacy offset-based pagination if not using page or cursor 

1332 if not page and not cursor and offset > 0: 

1333 query = query.offset(offset) 

1334 

1335 result = await unified_paginate( 

1336 db=self.db, 

1337 query=query, 

1338 cursor=cursor, 

1339 limit=limit, 

1340 page=page, 

1341 per_page=per_page, 

1342 base_url=base_url, 

1343 ) 

1344 self.db.commit() # Release transaction to avoid idle-in-transaction 

1345 return result 

1346 

1347 async def get_all_team_ids( 

1348 self, 

1349 include_inactive: bool = False, 

1350 visibility_filter: Optional[str] = None, 

1351 include_personal: bool = False, 

1352 search_query: Optional[str] = None, 

1353 personal_owner_email: Optional[str] = None, 

1354 ) -> List[int]: 

1355 """Get all team IDs matching criteria (unpaginated). 

1356 

1357 Args: 

1358 include_inactive: Whether to include inactive teams 

1359 visibility_filter: Filter by visibility (private, team, public) 

1360 include_personal: Whether to include personal teams 

1361 search_query: Search term for name/slug 

1362 personal_owner_email: When set (and include_personal=False), includes this user's personal team alongside non-personal teams 

1363 

1364 Returns: 

1365 List[int]: List of team IDs 

1366 """ 

1367 query = select(EmailTeam.id) 

1368 

1369 if not include_personal: 

1370 if personal_owner_email: 

1371 query = query.where( 

1372 or_( 

1373 EmailTeam.is_personal.is_(False), 

1374 and_(EmailTeam.is_personal.is_(True), EmailTeam.created_by == personal_owner_email), 

1375 ) 

1376 ) 

1377 else: 

1378 query = query.where(EmailTeam.is_personal.is_(False)) 

1379 

1380 if not include_inactive: 

1381 query = query.where(EmailTeam.is_active.is_(True)) 

1382 

1383 if visibility_filter: 

1384 query = query.where(EmailTeam.visibility == visibility_filter) 

1385 

1386 if search_query: 

1387 search_term = f"%{search_query}%" 

1388 query = query.where( 

1389 or_( 

1390 EmailTeam.name.ilike(search_term), 

1391 EmailTeam.slug.ilike(search_term), 

1392 ) 

1393 ) 

1394 

1395 result = self.db.execute(query) 

1396 team_ids = [row[0] for row in result.all()] 

1397 self.db.commit() # Release transaction to avoid idle-in-transaction 

1398 return team_ids 

1399 

1400 async def get_teams_count( 

1401 self, 

1402 include_inactive: bool = False, 

1403 visibility_filter: Optional[str] = None, 

1404 include_personal: bool = False, 

1405 search_query: Optional[str] = None, 

1406 ) -> int: 

1407 """Get total count of teams matching criteria. 

1408 

1409 Args: 

1410 include_inactive: Whether to include inactive teams 

1411 visibility_filter: Filter by visibility (private, team, public) 

1412 include_personal: Whether to include personal teams 

1413 search_query: Search term for name/slug 

1414 

1415 Returns: 

1416 int: Total count of matching teams 

1417 """ 

1418 query = select(func.count(EmailTeam.id)) # pylint: disable=not-callable 

1419 

1420 if not include_personal: 

1421 query = query.where(EmailTeam.is_personal.is_(False)) 

1422 

1423 if not include_inactive: 

1424 query = query.where(EmailTeam.is_active.is_(True)) 

1425 

1426 if visibility_filter: 

1427 query = query.where(EmailTeam.visibility == visibility_filter) 

1428 

1429 if search_query: 

1430 search_term = f"%{search_query}%" 

1431 query = query.where( 

1432 or_( 

1433 EmailTeam.name.ilike(search_term), 

1434 EmailTeam.slug.ilike(search_term), 

1435 ) 

1436 ) 

1437 

1438 result = self.db.execute(query) 

1439 count = result.scalar() or 0 

1440 self.db.commit() # Release transaction to avoid idle-in-transaction 

1441 return count 

1442 

1443 async def discover_public_teams(self, user_email: str, skip: int = 0, limit: Optional[int] = None) -> List[EmailTeam]: 

1444 """Discover public teams that user can join. 

1445 

1446 Args: 

1447 user_email: Email of the user discovering teams 

1448 skip: Number of teams to skip for pagination 

1449 limit: Maximum number of teams to return (None for unlimited) 

1450 

1451 Returns: 

1452 List[EmailTeam]: List of public teams user can join 

1453 

1454 Raises: 

1455 Exception: If discovery fails 

1456 """ 

1457 try: 

1458 # Optimized: Use subquery instead of loading all IDs into memory (2 queries → 1) 

1459 user_team_subquery = select(EmailTeamMember.team_id).where(EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).scalar_subquery() 

1460 

1461 query = self.db.query(EmailTeam).filter(EmailTeam.visibility == "public", EmailTeam.is_active.is_(True), EmailTeam.is_personal.is_(False), ~EmailTeam.id.in_(user_team_subquery)) 

1462 

1463 query = query.offset(skip) 

1464 if limit is not None: 

1465 query = query.limit(limit) 

1466 teams = query.all() 

1467 self.db.commit() # Release transaction to avoid idle-in-transaction 

1468 return teams 

1469 

1470 except Exception as e: 

1471 self.db.rollback() 

1472 logger.error(f"Failed to discover public teams for {user_email}: {e}") 

1473 return [] 

1474 

1475 async def create_join_request(self, team_id: str, user_email: str, message: Optional[str] = None) -> "EmailTeamJoinRequest": 

1476 """Create a request to join a public team. 

1477 

1478 Args: 

1479 team_id: ID of the team to join 

1480 user_email: Email of the user requesting to join 

1481 message: Optional message to team owners 

1482 

1483 Returns: 

1484 EmailTeamJoinRequest: Created join request 

1485 

1486 Raises: 

1487 ValueError: If team not found, not public, or user already member/has pending request 

1488 """ 

1489 try: 

1490 # Validate team 

1491 team = await self.get_team_by_id(team_id) 

1492 if not team: 

1493 raise ValueError("Team not found") 

1494 

1495 if team.visibility != "public": 

1496 raise ValueError("Can only request to join public teams") 

1497 

1498 # Check if user is already a member 

1499 existing_member = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).first() 

1500 

1501 if existing_member: 

1502 raise ValueError("User is already a member of this team") 

1503 

1504 # Check for existing requests (any status) 

1505 existing_request = self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.team_id == team_id, EmailTeamJoinRequest.user_email == user_email).first() 

1506 

1507 if existing_request: 

1508 if existing_request.status == "pending" and not existing_request.is_expired(): 

1509 raise ValueError("User already has a pending join request for this team") 

1510 

1511 # Update existing request (cancelled, rejected, expired) to pending 

1512 existing_request.message = message or "" 

1513 existing_request.status = "pending" 

1514 existing_request.requested_at = utc_now() 

1515 existing_request.expires_at = utc_now() + timedelta(days=7) 

1516 existing_request.reviewed_at = None 

1517 existing_request.reviewed_by = None 

1518 existing_request.notes = None 

1519 join_request = existing_request 

1520 else: 

1521 # Create new join request 

1522 join_request = EmailTeamJoinRequest(team_id=team_id, user_email=user_email, message=message, expires_at=utc_now() + timedelta(days=7)) 

1523 self.db.add(join_request) 

1524 

1525 self.db.commit() 

1526 self.db.refresh(join_request) 

1527 

1528 logger.info(f"Created join request for user {user_email} to team {team_id}") 

1529 return join_request 

1530 

1531 except Exception as e: 

1532 self.db.rollback() 

1533 logger.error(f"Failed to create join request: {e}") 

1534 raise 

1535 

1536 async def list_join_requests(self, team_id: str) -> List["EmailTeamJoinRequest"]: 

1537 """List pending join requests for a team. 

1538 

1539 Args: 

1540 team_id: ID of the team 

1541 

1542 Returns: 

1543 List[EmailTeamJoinRequest]: List of pending join requests 

1544 """ 

1545 try: 

1546 requests = ( 

1547 self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.team_id == team_id, EmailTeamJoinRequest.status == "pending").order_by(EmailTeamJoinRequest.requested_at.desc()).all() 

1548 ) 

1549 return requests 

1550 

1551 except Exception as e: 

1552 logger.error(f"Failed to list join requests for team {team_id}: {e}") 

1553 return [] 

1554 

1555 async def approve_join_request(self, request_id: str, approved_by: str) -> Optional[EmailTeamMember]: 

1556 """Approve a team join request. 

1557 

1558 Args: 

1559 request_id: ID of the join request 

1560 approved_by: Email of the user approving the request 

1561 

1562 Returns: 

1563 EmailTeamMember: New team member or None if request not found 

1564 

1565 Raises: 

1566 ValueError: If request not found, expired, or already processed 

1567 """ 

1568 try: 

1569 # Get join request 

1570 join_request = self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.id == request_id, EmailTeamJoinRequest.status == "pending").first() 

1571 

1572 if not join_request: 

1573 raise ValueError("Join request not found or already processed") 

1574 

1575 if join_request.is_expired(): 

1576 join_request.status = "expired" 

1577 self.db.commit() 

1578 raise ValueError("Join request has expired") 

1579 

1580 # Check max teams per user 

1581 max_teams = getattr(settings, "max_teams_per_user", 50) 

1582 if self._get_user_team_count(join_request.user_email) >= max_teams: 

1583 raise ValueError(f"User has reached the maximum team limit of {max_teams}") 

1584 

1585 # Add user to team 

1586 member = EmailTeamMember(team_id=join_request.team_id, user_email=join_request.user_email, role="member", invited_by=approved_by, joined_at=utc_now()) # New joiners are always members 

1587 

1588 self.db.add(member) 

1589 # Update join request status 

1590 join_request.status = "approved" 

1591 join_request.reviewed_at = utc_now() 

1592 join_request.reviewed_by = approved_by 

1593 

1594 self.db.flush() 

1595 self._log_team_member_action(member.id, join_request.team_id, join_request.user_email, member.role, "added", approved_by) 

1596 

1597 self.db.refresh(member) 

1598 

1599 # Assign team-scoped RBAC role matching the membership role 

1600 try: 

1601 rbac_role_name = self._get_rbac_role_name(member.role) 

1602 team_rbac_role = await self.role_service.get_role_by_name(rbac_role_name, scope="team") 

1603 if team_rbac_role: 

1604 existing = await self.role_service.get_user_role_assignment(user_email=join_request.user_email, role_id=team_rbac_role.id, scope="team", scope_id=join_request.team_id) 

1605 if not existing or not existing.is_active: 

1606 await self.role_service.assign_role_to_user(user_email=join_request.user_email, role_id=team_rbac_role.id, scope="team", scope_id=join_request.team_id, granted_by=approved_by) 

1607 logger.info(f"Assigned {rbac_role_name} role to {join_request.user_email} for team {join_request.team_id}") 

1608 else: 

1609 logger.debug(f"User {join_request.user_email} already has active {rbac_role_name} role for team {join_request.team_id}") 

1610 else: 

1611 logger.warning(f"Role '{rbac_role_name}' not found. User {join_request.user_email} added without RBAC role.") 

1612 except Exception as role_error: 

1613 logger.warning(f"Failed to assign role to {join_request.user_email}: {role_error}") 

1614 

1615 # Invalidate auth cache for user's team membership and role 

1616 try: 

1617 self._fire_and_forget(auth_cache.invalidate_team(join_request.user_email)) 

1618 self._fire_and_forget(auth_cache.invalidate_user_role(join_request.user_email, join_request.team_id)) 

1619 self._fire_and_forget(auth_cache.invalidate_user_teams(join_request.user_email)) 

1620 self._fire_and_forget(auth_cache.invalidate_team_membership(join_request.user_email)) 

1621 self._fire_and_forget(admin_stats_cache.invalidate_teams()) 

1622 except Exception as cache_error: 

1623 logger.debug(f"Failed to invalidate caches on join approval: {cache_error}") 

1624 

1625 # Invalidate member count cache for this team 

1626 await self.invalidate_team_member_count_cache(str(join_request.team_id)) 

1627 

1628 logger.info(f"Approved join request {request_id}: user {join_request.user_email} joined team {join_request.team_id}") 

1629 return member 

1630 

1631 except Exception as e: 

1632 self.db.rollback() 

1633 logger.error(f"Failed to approve join request {request_id}: {e}") 

1634 raise 

1635 

1636 async def reject_join_request(self, request_id: str, rejected_by: str) -> bool: 

1637 """Reject a team join request. 

1638 

1639 Args: 

1640 request_id: ID of the join request 

1641 rejected_by: Email of the user rejecting the request 

1642 

1643 Returns: 

1644 bool: True if request was rejected successfully 

1645 

1646 Raises: 

1647 ValueError: If request not found or already processed 

1648 """ 

1649 try: 

1650 # Get join request 

1651 join_request = self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.id == request_id, EmailTeamJoinRequest.status == "pending").first() 

1652 

1653 if not join_request: 

1654 raise ValueError("Join request not found or already processed") 

1655 

1656 # Update join request status 

1657 join_request.status = "rejected" 

1658 join_request.reviewed_at = utc_now() 

1659 join_request.reviewed_by = rejected_by 

1660 

1661 self.db.commit() 

1662 

1663 logger.info(f"Rejected join request {request_id}: user {join_request.user_email} for team {join_request.team_id}") 

1664 return True 

1665 

1666 except Exception as e: 

1667 self.db.rollback() 

1668 logger.error(f"Failed to reject join request {request_id}: {e}") 

1669 raise 

1670 

1671 async def get_user_join_requests(self, user_email: str, team_id: Optional[str] = None) -> List["EmailTeamJoinRequest"]: 

1672 """Get join requests made by a user. 

1673 

1674 Args: 

1675 user_email: Email of the user 

1676 team_id: Optional team ID to filter requests 

1677 

1678 Returns: 

1679 List[EmailTeamJoinRequest]: List of join requests made by the user 

1680 

1681 Examples: 

1682 Get all requests made by a user or for a specific team. 

1683 """ 

1684 try: 

1685 query = self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.user_email == user_email) 

1686 

1687 if team_id: 

1688 query = query.filter(EmailTeamJoinRequest.team_id == team_id) 

1689 

1690 requests = query.all() 

1691 return requests 

1692 

1693 except Exception as e: 

1694 logger.error(f"Failed to get join requests for user {user_email}: {e}") 

1695 return [] 

1696 

1697 async def cancel_join_request(self, request_id: str, user_email: str) -> bool: 

1698 """Cancel a join request. 

1699 

1700 Args: 

1701 request_id: ID of the join request to cancel 

1702 user_email: Email of the user canceling the request 

1703 

1704 Returns: 

1705 bool: True if canceled successfully, False otherwise 

1706 

1707 Examples: 

1708 Allow users to cancel their pending join requests. 

1709 """ 

1710 try: 

1711 # Get the join request 

1712 join_request = ( 

1713 self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.id == request_id, EmailTeamJoinRequest.user_email == user_email, EmailTeamJoinRequest.status == "pending").first() 

1714 ) 

1715 

1716 if not join_request: 

1717 logger.warning(f"Join request {request_id} not found for user {user_email} or not pending") 

1718 return False 

1719 

1720 # Update join request status 

1721 join_request.status = "cancelled" 

1722 join_request.reviewed_at = utc_now() 

1723 join_request.reviewed_by = user_email 

1724 

1725 self.db.commit() 

1726 

1727 logger.info(f"Cancelled join request {request_id} by user {user_email}") 

1728 return True 

1729 

1730 except Exception as e: 

1731 self.db.rollback() 

1732 logger.error(f"Failed to cancel join request {request_id}: {e}") 

1733 return False 

1734 

1735 # ================================================================================== 

1736 # Batch Query Methods (N+1 Query Elimination - Issue #1892) 

1737 # ================================================================================== 

1738 

1739 def get_member_counts_batch(self, team_ids: List[str]) -> Dict[str, int]: 

1740 """Get member counts for multiple teams in a single query. 

1741 

1742 This is a synchronous method following the existing service pattern. 

1743 Note: Like other sync SQLAlchemy calls, this will block the event 

1744 loop in async contexts. For typical team counts this is acceptable. 

1745 

1746 Args: 

1747 team_ids: List of team UUIDs 

1748 

1749 Returns: 

1750 Dict mapping team_id to member count 

1751 

1752 Raises: 

1753 Exception: Re-raises any database errors after rollback 

1754 

1755 Examples: 

1756 >>> from unittest.mock import Mock 

1757 >>> service = TeamManagementService(Mock()) 

1758 >>> service.get_member_counts_batch([]) 

1759 {} 

1760 """ 

1761 if not team_ids: 

1762 return {} 

1763 

1764 try: 

1765 # Single query for all teams 

1766 results = ( 

1767 self.db.query(EmailTeamMember.team_id, func.count(EmailTeamMember.id).label("count")) # pylint: disable=not-callable 

1768 .filter(EmailTeamMember.team_id.in_(team_ids), EmailTeamMember.is_active.is_(True)) 

1769 .group_by(EmailTeamMember.team_id) 

1770 .all() 

1771 ) 

1772 

1773 self.db.commit() # Release transaction to avoid idle-in-transaction 

1774 

1775 # Build result dict, defaulting to 0 for teams with no members 

1776 counts = {str(row.team_id): row.count for row in results} 

1777 return {tid: counts.get(tid, 0) for tid in team_ids} 

1778 except Exception as e: 

1779 self.db.rollback() 

1780 logger.error(f"Failed to get member counts for teams: {e}") 

1781 raise 

1782 

1783 def get_user_roles_batch(self, user_email: str, team_ids: List[str]) -> Dict[str, Optional[str]]: 

1784 """Get a user's role in multiple teams in a single query. 

1785 

1786 Args: 

1787 user_email: Email of the user 

1788 team_ids: List of team UUIDs 

1789 

1790 Returns: 

1791 Dict mapping team_id to role (or None if not a member) 

1792 

1793 Raises: 

1794 Exception: Re-raises any database errors after rollback 

1795 """ 

1796 if not team_ids: 

1797 return {} 

1798 

1799 try: 

1800 # Single query for all teams 

1801 results = ( 

1802 self.db.query(EmailTeamMember.team_id, EmailTeamMember.role) 

1803 .filter(EmailTeamMember.user_email == user_email, EmailTeamMember.team_id.in_(team_ids), EmailTeamMember.is_active.is_(True)) 

1804 .all() 

1805 ) 

1806 

1807 self.db.commit() # Release transaction to avoid idle-in-transaction 

1808 

1809 # Build result dict - teams with no membership return None 

1810 roles = {str(row.team_id): row.role for row in results} 

1811 return {tid: roles.get(tid) for tid in team_ids} 

1812 except Exception as e: 

1813 self.db.rollback() 

1814 logger.error(f"Failed to get user roles for {user_email}: {e}") 

1815 raise 

1816 

1817 def get_pending_join_requests_batch(self, user_email: str, team_ids: List[str]) -> Dict[str, Optional[Any]]: 

1818 """Get pending join requests for a user across multiple teams in a single query. 

1819 

1820 Args: 

1821 user_email: Email of the user 

1822 team_ids: List of team UUIDs to check 

1823 

1824 Returns: 

1825 Dict mapping team_id to pending EmailTeamJoinRequest (or None if no pending request) 

1826 

1827 Raises: 

1828 Exception: Re-raises any database errors after rollback 

1829 """ 

1830 if not team_ids: 

1831 return {} 

1832 

1833 try: 

1834 # Single query for all pending requests across teams 

1835 results = ( 

1836 self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.user_email == user_email, EmailTeamJoinRequest.team_id.in_(team_ids), EmailTeamJoinRequest.status == "pending").all() 

1837 ) 

1838 

1839 self.db.commit() # Release transaction to avoid idle-in-transaction 

1840 

1841 # Build result dict - only one pending request per team expected 

1842 pending_reqs = {str(req.team_id): req for req in results} 

1843 return {tid: pending_reqs.get(tid) for tid in team_ids} 

1844 except Exception as e: 

1845 self.db.rollback() 

1846 logger.error(f"Failed to get pending join requests for {user_email}: {e}") 

1847 raise 

1848 

1849 # ================================================================================== 

1850 # Cached Batch Methods (Redis caching for member counts) 

1851 # ================================================================================== 

1852 

1853 def _get_member_count_cache_key(self, team_id: str) -> str: 

1854 """Build cache key using settings.cache_prefix for consistency. 

1855 

1856 Args: 

1857 team_id: Team UUID to build cache key for 

1858 

1859 Returns: 

1860 Cache key string in format "{prefix}team:member_count:{team_id}" 

1861 """ 

1862 cache_prefix = getattr(settings, "cache_prefix", "mcpgw:") 

1863 return f"{cache_prefix}team:member_count:{team_id}" 

1864 

1865 async def get_member_counts_batch_cached(self, team_ids: List[str]) -> Dict[str, int]: 

1866 """Get member counts for multiple teams, using Redis cache with DB fallback. 

1867 

1868 Caching behavior is controlled by settings: 

1869 - team_member_count_cache_enabled: Enable/disable caching (default: True) 

1870 - team_member_count_cache_ttl: Cache TTL in seconds (default: 300) 

1871 

1872 Args: 

1873 team_ids: List of team UUIDs 

1874 

1875 Returns: 

1876 Dict mapping team_id to member count 

1877 

1878 Raises: 

1879 Exception: Re-raises any database errors after rollback 

1880 """ 

1881 if not team_ids: 

1882 return {} 

1883 

1884 cache_enabled = getattr(settings, "team_member_count_cache_enabled", True) 

1885 cache_ttl = getattr(settings, "team_member_count_cache_ttl", 300) 

1886 

1887 # If caching disabled, go straight to batch DB query 

1888 if not cache_enabled: 

1889 return self.get_member_counts_batch(team_ids) 

1890 

1891 try: 

1892 redis_client = await get_redis_client() 

1893 except Exception: 

1894 redis_client = None 

1895 

1896 result: Dict[str, int] = {} 

1897 cache_misses: List[str] = [] 

1898 

1899 # Step 1: Check Redis cache for all team IDs 

1900 if redis_client: 

1901 try: 

1902 cache_keys = [self._get_member_count_cache_key(tid) for tid in team_ids] 

1903 cached_values = await redis_client.mget(cache_keys) 

1904 

1905 for tid, cached in zip(team_ids, cached_values): 

1906 if cached is not None: 

1907 result[tid] = int(cached) 

1908 else: 

1909 cache_misses.append(tid) 

1910 except Exception as e: 

1911 logger.warning(f"Redis cache read failed, falling back to DB: {e}") 

1912 cache_misses = list(team_ids) 

1913 else: 

1914 # No Redis available, fall back to DB 

1915 cache_misses = list(team_ids) 

1916 

1917 # Step 2: Query database for cache misses 

1918 if cache_misses: 

1919 try: 

1920 db_results = ( 

1921 self.db.query(EmailTeamMember.team_id, func.count(EmailTeamMember.id).label("count")) # pylint: disable=not-callable 

1922 .filter(EmailTeamMember.team_id.in_(cache_misses), EmailTeamMember.is_active.is_(True)) 

1923 .group_by(EmailTeamMember.team_id) 

1924 .all() 

1925 ) 

1926 

1927 self.db.commit() 

1928 

1929 db_counts = {str(row.team_id): row.count for row in db_results} 

1930 

1931 # Fill in results and cache them 

1932 for tid in cache_misses: 

1933 count = db_counts.get(tid, 0) 

1934 result[tid] = count 

1935 

1936 # Step 3: Cache the result with configured TTL 

1937 if redis_client: 

1938 try: 

1939 await redis_client.setex(self._get_member_count_cache_key(tid), cache_ttl, str(count)) 

1940 except Exception as e: 

1941 logger.warning(f"Redis cache write failed for team {tid}: {e}") 

1942 

1943 except Exception as e: 

1944 self.db.rollback() 

1945 logger.error(f"Failed to get member counts for teams: {e}") 

1946 raise 

1947 

1948 return result 

1949 

1950 async def invalidate_team_member_count_cache(self, team_id: str) -> None: 

1951 """Invalidate the cached member count for a team. 

1952 

1953 Call this after any membership changes (add/remove/update). 

1954 No-op if caching is disabled or Redis unavailable. 

1955 

1956 Args: 

1957 team_id: Team UUID to invalidate 

1958 """ 

1959 cache_enabled = getattr(settings, "team_member_count_cache_enabled", True) 

1960 if not cache_enabled: 

1961 return 

1962 

1963 try: 

1964 redis_client = await get_redis_client() 

1965 if redis_client: 

1966 await redis_client.delete(self._get_member_count_cache_key(team_id)) 

1967 except Exception as e: 

1968 logger.warning(f"Failed to invalidate member count cache for team {team_id}: {e}")