Coverage for mcpgateway / services / team_management_service.py: 97%

796 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/team_management_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Team Management Service. 

8This module provides team creation, management, and membership operations 

9for the multi-team collaboration system. 

10 

11Examples: 

12 >>> from unittest.mock import Mock 

13 >>> service = TeamManagementService(Mock()) 

14 >>> isinstance(service, TeamManagementService) 

15 True 

16 >>> hasattr(service, 'db') 

17 True 

18""" 

19 

20# Standard 

21import asyncio 

22import base64 

23from datetime import datetime, timedelta 

24from typing import Any, Dict, List, Optional, Tuple, Union 

25 

26# Third-Party 

27import orjson 

28from sqlalchemy import and_, desc, func, or_, select 

29from sqlalchemy.orm import selectinload, Session 

30 

31# First-Party 

32from mcpgateway.cache.admin_stats_cache import admin_stats_cache 

33from mcpgateway.cache.auth_cache import auth_cache, get_auth_cache 

34from mcpgateway.common.validators import SecurityValidator 

35from mcpgateway.config import settings 

36from mcpgateway.db import EmailTeam, EmailTeamJoinRequest, EmailTeamMember, EmailTeamMemberHistory, EmailUser, utc_now 

37from mcpgateway.services.logging_service import LoggingService 

38from mcpgateway.utils.create_slug import slugify 

39from mcpgateway.utils.pagination import unified_paginate 

40from mcpgateway.utils.redis_client import get_redis_client 

41 

# Initialize logging: one LoggingService instance per module, and a logger
# named after this module so records can be traced back to it.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

45 

46 

def get_user_team_count(db: Session, user_email: str) -> int:
    """Get the number of active teams a user belongs to.

    Args:
        db: SQLAlchemy database session
        user_email: Email address of the user

    Returns:
        int: Number of active team memberships
    """
    # Only rows flagged active are counted; deactivated memberships are ignored.
    return db.query(EmailTeamMember).filter(EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).count()

58 

59 

class TeamManagementError(Exception):
    """Base class for team management-related errors.

    Examples:
        >>> error = TeamManagementError("Test error")
        >>> str(error)
        'Test error'
        >>> isinstance(error, Exception)
        True
    """

70 

71 

class InvalidRoleError(TeamManagementError):
    """Raised when an invalid role is specified.

    Examples:
        >>> error = InvalidRoleError("Invalid role: guest")
        >>> str(error)
        'Invalid role: guest'
        >>> isinstance(error, TeamManagementError)
        True
    """

82 

83 

class TeamNotFoundError(TeamManagementError):
    """Raised when a team does not exist.

    Examples:
        >>> error = TeamNotFoundError("Team not found: team-123")
        >>> str(error)
        'Team not found: team-123'
        >>> isinstance(error, TeamManagementError)
        True
    """

94 

95 

class UserNotFoundError(TeamManagementError):
    """Raised when a user does not exist.

    Examples:
        >>> error = UserNotFoundError("User not found: user@example.com")
        >>> str(error)
        'User not found: user@example.com'
        >>> isinstance(error, TeamManagementError)
        True
    """

106 

107 

class MemberAlreadyExistsError(TeamManagementError):
    """Raised when a user is already a member of the team.

    Examples:
        >>> error = MemberAlreadyExistsError("User user@example.com is already a member of team team-123")
        >>> str(error)
        'User user@example.com is already a member of team team-123'
        >>> isinstance(error, TeamManagementError)
        True
    """

118 

119 

class TeamMemberLimitExceededError(TeamManagementError):
    """Raised when a team has reached its maximum member limit.

    Examples:
        >>> error = TeamMemberLimitExceededError("Team has reached maximum member limit of 10")
        >>> str(error)
        'Team has reached maximum member limit of 10'
        >>> isinstance(error, TeamManagementError)
        True
    """

130 

131 

class TeamMemberAddError(TeamManagementError):
    """Raised when adding a member to a team fails due to database or system errors.

    Examples:
        >>> error = TeamMemberAddError("Failed to add member due to database error")
        >>> str(error)
        'Failed to add member due to database error'
        >>> isinstance(error, TeamManagementError)
        True
    """

142 

143 

class _Unset:
    """Sentinel type: distinguishes 'caller omitted the argument' from 'caller passed None'."""

    # No per-instance state is needed, so no instance __dict__ is allocated.
    __slots__ = ()

    def __repr__(self) -> str:
        """Return string representation of the unset sentinel.

        Returns:
            str: The literal string ``"UNSET"``.
        """
        return "UNSET"

    def __bool__(self) -> bool:
        """Make the sentinel falsy so truthiness checks treat it as not provided.

        Returns:
            bool: Always ``False``.
        """
        return False


# Module-level singleton; compare with ``is`` / ``is not`` rather than ``==``.
UNSET = _Unset()

167 

168 

def get_effective_max_members(team: "EmailTeam") -> int:
    """Return the effective member limit for a team.

    If the team has an explicit ``max_members`` value stored in the DB, that
    value is used. Otherwise the global ``MAX_MEMBERS_PER_TEAM`` setting is
    returned so that changing the environment variable takes effect for all
    teams that have not been individually overridden.

    Args:
        team: The team whose effective member limit should be resolved.

    Returns:
        The member limit (always an int; 0 means unlimited).
    """
    # ``is not None`` (not truthiness) so an explicit 0 is honoured as an
    # override instead of falling through to the global default.
    if team.max_members is not None:
        return team.max_members
    return settings.max_members_per_team

186 

187 

def check_team_member_capacity(db: "Session", team: "EmailTeam", *, extra_count: int = 0) -> None:
    """Raise if the team is at or over its member capacity.

    Args:
        db: Database session for querying active member count.
        team: The team to check.
        extra_count: Additional slots to reserve (e.g. pending invitations).

    Raises:
        TeamMemberLimitExceededError: If the team is at capacity.
    """
    effective_max = get_effective_max_members(team)
    # A non-positive limit is treated as "no limit": skip the count query.
    if effective_max <= 0:
        return
    member_count = db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team.id, EmailTeamMember.is_active.is_(True)).count()
    # ``>=`` because this runs before a new member is added: a team already
    # holding ``effective_max`` members has no free slot left.
    if (member_count + extra_count) >= effective_max:
        raise TeamMemberLimitExceededError(f"Team has reached maximum member limit of {effective_max}")

205 

206 

207class TeamManagementService: 

208 """Service for team management operations. 

209 

210 This service handles team creation, membership management, 

211 role assignments, and team access control. 

212 

213 Attributes: 

214 db (Session): SQLAlchemy database session 

215 

216 Examples: 

217 >>> from unittest.mock import Mock 

218 >>> service = TeamManagementService(Mock()) 

219 >>> service.__class__.__name__ 

220 'TeamManagementService' 

221 >>> hasattr(service, 'db') 

222 True 

223 """ 

224 

def __init__(self, db: Session) -> None:
    """Initialize the team management service.

    Args:
        db: SQLAlchemy database session

    Examples:
        Basic initialization:
        >>> from mcpgateway.services.team_management_service import TeamManagementService
        >>> from unittest.mock import Mock
        >>> db_session = Mock()
        >>> service = TeamManagementService(db_session)
        >>> service.db is db_session
        True

        Service attributes:
        >>> hasattr(service, 'db')
        True
        >>> service.__class__.__name__
        'TeamManagementService'
    """
    self.db = db
    self._role_service = None  # Lazy initialization to avoid circular imports

248 

@property
def role_service(self):
    """Lazy-initialized RoleService to avoid circular imports.

    The instance is created on first access, bound to this service's
    database session, and reused on later accesses.

    Returns:
        RoleService: Instance of RoleService
    """
    if self._role_service is None:
        # First-Party
        from mcpgateway.services.role_service import RoleService  # pylint: disable=import-outside-toplevel

        self._role_service = RoleService(self.db)
    return self._role_service

262 

def _get_user_team_count(self, user_email: str) -> int:
    """Get the number of active teams a user belongs to.

    Thin wrapper that delegates to the module-level ``get_user_team_count``
    using this service's database session.

    Args:
        user_email: Email address of the user

    Returns:
        int: Number of active team memberships
    """
    return get_user_team_count(self.db, user_email)

273 

@staticmethod
def _get_rbac_role_name(membership_role: str) -> str:
    """Map a team membership role to the corresponding configurable RBAC role name.

    Args:
        membership_role: Team membership role ('owner' or 'member').

    Returns:
        str: The configured RBAC role name from settings.
    """
    # Any value other than the exact string "owner" maps to the member role.
    return settings.default_team_owner_role if membership_role == "owner" else settings.default_team_member_role

285 

@staticmethod
def _fire_and_forget(coro: Any) -> None:
    """Schedule a background coroutine and close it if scheduling fails.

    Args:
        coro: The coroutine to schedule as a background task.

    Raises:
        Exception: If asyncio.create_task fails (e.g. no running loop).
    """
    try:
        # NOTE(review): the returned task is not stored anywhere. The asyncio
        # docs advise keeping a reference so a running task is not garbage
        # collected - confirm whether that matters for these cache tasks.
        task = asyncio.create_task(coro)
        # Some tests patch create_task with a plain Mock return value. In that
        # case the coroutine is never actually scheduled and must be closed.
        if asyncio.iscoroutine(coro) and not isinstance(task, asyncio.Task):
            close = getattr(coro, "close", None)
            if callable(close):
                close()
    except Exception:
        # If create_task() fails (e.g. no running loop), the coroutine has
        # already been created and must be closed to avoid runtime warnings.
        close = getattr(coro, "close", None)
        if callable(close):
            close()
        raise

311 

def _log_team_member_action(self, team_member_id: str, team_id: str, user_email: str, role: str, action: str, action_by: Optional[str]) -> None:
    """
    Log a team member action to EmailTeamMemberHistory.

    The history row is added to the session and committed immediately, so
    any other pending changes in the same session are committed with it.

    Args:
        team_member_id: ID of the EmailTeamMember
        team_id: Team ID
        user_email: Email of the affected user
        role: Role at the time of action
        action: Action type ("added", "removed", "reactivated", "role_changed", "team-deleted")
        action_by: Email of the user who performed the action

    Examples:
        >>> from mcpgateway.services.team_management_service import TeamManagementService
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> service._log_team_member_action("tm-123", "team-123", "user@example.com", "member", "added", "admin@example.com")
    """
    history = EmailTeamMemberHistory(team_member_id=team_member_id, team_id=team_id, user_email=user_email, role=role, action=action, action_by=action_by, action_timestamp=utc_now())
    self.db.add(history)
    self.db.commit()

333 

async def _assign_team_rbac_role(self, user_email: str, team_id: str, membership_role: str, granted_by: Optional[str] = None) -> None:
    """Assign a team-scoped RBAC role matching the membership role.

    Looks up the configured RBAC role name for *membership_role* (owner
    or member) and assigns it if the user does not already have an
    active assignment. Errors are logged but do not propagate.

    Args:
        user_email: Email of the user to assign the role to.
        team_id: Team ID for the scoped assignment.
        membership_role: Team membership role (``"owner"`` or ``"member"``).
        granted_by: Email of the user who triggered the assignment.
    """
    try:
        rbac_role_name = self._get_rbac_role_name(membership_role)
        team_rbac_role = await self.role_service.get_role_by_name(rbac_role_name, scope="team")
        if team_rbac_role:
            existing = await self.role_service.get_user_role_assignment(user_email=user_email, role_id=team_rbac_role.id, scope="team", scope_id=team_id)
            # Assign when there is no assignment at all, or only an inactive one.
            if not existing or not existing.is_active:
                # Falls back to the user themselves when no granter was given.
                await self.role_service.assign_role_to_user(user_email=user_email, role_id=team_rbac_role.id, scope="team", scope_id=team_id, granted_by=granted_by or user_email)
                logger.info(f"Assigned {rbac_role_name} role to {SecurityValidator.sanitize_log_message(user_email)} for team {SecurityValidator.sanitize_log_message(team_id)}")
            else:
                logger.debug(f"User {SecurityValidator.sanitize_log_message(user_email)} already has active {rbac_role_name} role for team {SecurityValidator.sanitize_log_message(team_id)}")
        else:
            logger.warning(f"Role '{rbac_role_name}' not found. User {SecurityValidator.sanitize_log_message(user_email)} added without RBAC role.")
    except Exception as role_error:
        # Best effort: a failed role assignment is logged, never raised.
        logger.warning(f"Failed to assign role to {SecurityValidator.sanitize_log_message(user_email)}: {role_error}")

361 

def _invalidate_membership_caches(self, user_email: str, team_id: str, *, include_admin_stats: bool = True) -> None:
    """Invalidate auth and admin caches after a membership change.

    Each invalidation is scheduled as a background task. Errors are logged
    at debug level but do not propagate.

    Args:
        user_email: Email of the affected user.
        team_id: Team ID whose caches should be invalidated.
        include_admin_stats: Whether to also invalidate admin stats (skip on removal).
    """
    try:
        self._fire_and_forget(auth_cache.invalidate_team(user_email))
        self._fire_and_forget(auth_cache.invalidate_user_role(user_email, team_id))
        self._fire_and_forget(auth_cache.invalidate_user_teams(user_email))
        self._fire_and_forget(auth_cache.invalidate_team_membership(user_email))
        if include_admin_stats:
            self._fire_and_forget(admin_stats_cache.invalidate_teams())
    except Exception as cache_error:
        # The first scheduling failure stops the remaining invalidations.
        logger.debug(f"Failed to invalidate membership caches for {SecurityValidator.sanitize_log_message(user_email)}: {cache_error}")

381 

def _check_user_team_limit(self, user_email: str) -> None:
    """Raise if the user has reached the maximum team membership limit.

    Args:
        user_email: Email of the user to check.

    Raises:
        TeamManagementError: If the user is at the configured limit.
    """
    # Falls back to 50 when the setting is absent from the settings object.
    max_teams = getattr(settings, "max_teams_per_user", 50)
    if self._get_user_team_count(user_email) >= max_teams:
        raise TeamManagementError(f"User has reached the maximum team limit of {max_teams}")

394 

async def create_team(
    self, name: str, description: Optional[str], created_by: str, visibility: Optional[str] = "public", max_members: Optional[int] = None, skip_limits: bool = False
) -> EmailTeam:
    """Create a new team, or reactivate an inactive team with the same slug.

    The creator becomes the team's owner. If an inactive team already
    exists with the slug derived from *name*, that team is reactivated
    with the new details instead of inserting a new row.

    Args:
        name: Team name
        description: Team description
        created_by: Email of the user creating the team
        visibility: Team visibility (private, public)
        max_members: Maximum number of team members allowed. ``None`` stores
            no per-team override.
        skip_limits: Skip the max_teams_per_user and max_members cap checks
            (for admin bypass)

    Returns:
        EmailTeam: The created (or reactivated) team

    Raises:
        ValueError: If visibility is invalid, the creator is at the team
            limit, or max_members exceeds the configured cap
        Exception: If team creation fails; the session is rolled back and
            the original exception re-raised

    Examples:
        >>> import asyncio
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> asyncio.iscoroutinefunction(service.create_team)
        True
    """
    try:
        # Validate visibility
        valid_visibilities = ["private", "public"]
        if visibility not in valid_visibilities:
            raise ValueError(f"Invalid visibility. Must be one of: {', '.join(valid_visibilities)}")

        # Check max teams per user
        if not skip_limits:
            max_teams = getattr(settings, "max_teams_per_user", 50)
            if self._get_user_team_count(created_by) >= max_teams:
                raise ValueError(f"User has reached the maximum team limit of {max_teams}")

        # Enforce max_members cap for non-admins (only when explicitly provided)
        if not skip_limits and max_members is not None:
            max_limit = settings.max_members_per_team
            if max_members > max_limit:
                raise ValueError(f"max_members cannot exceed the configured limit of {max_limit}")

        # If max_members is not explicitly provided, leave it as None in the DB.
        # The effective limit will be resolved at check time from settings.max_members_per_team,
        # so changing the env var affects all teams that don't have an explicit override.

        # Check for existing inactive team with same name
        potential_slug = slugify(name)
        existing_inactive_team = self.db.query(EmailTeam).filter(EmailTeam.slug == potential_slug, EmailTeam.is_active.is_(False)).first()

        if existing_inactive_team:
            # Reactivate the existing team with new details
            existing_inactive_team.name = name
            existing_inactive_team.description = description
            existing_inactive_team.created_by = created_by
            existing_inactive_team.visibility = visibility
            existing_inactive_team.max_members = max_members
            existing_inactive_team.is_active = True
            existing_inactive_team.updated_at = utc_now()
            team = existing_inactive_team

            # Check if the creator already has a membership row (active or not)
            existing_membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team.id, EmailTeamMember.user_email == created_by).first()

            if existing_membership:
                # Reactivate existing membership as owner
                existing_membership.role = "owner"
                existing_membership.joined_at = utc_now()
                existing_membership.is_active = True
                membership = existing_membership
            else:
                # Create new membership
                membership = EmailTeamMember(team_id=team.id, user_email=created_by, role="owner", joined_at=utc_now(), is_active=True)
                self.db.add(membership)

            logger.info(f"Reactivated existing team with slug {potential_slug}")
        else:
            # Create the team (slug will be auto-generated by event listener)
            team = EmailTeam(name=name, description=description, created_by=created_by, is_personal=False, visibility=visibility, max_members=max_members, is_active=True)
            self.db.add(team)

            self.db.flush()  # Get the team ID

            # Add the creator as owner
            membership = EmailTeamMember(team_id=team.id, user_email=created_by, role="owner", joined_at=utc_now(), is_active=True)
            self.db.add(membership)

        self.db.commit()

        # Invalidate member count cache for the new team
        await self.invalidate_team_member_count_cache(str(team.id))

        # Invalidate auth cache for creator's team membership
        # Without this, the cache won't know the user belongs to this new team
        try:
            await auth_cache.invalidate_user_teams(created_by)
            await auth_cache.invalidate_team_membership(created_by)
            await auth_cache.invalidate_user_role(created_by, str(team.id))
            await admin_stats_cache.invalidate_teams()
        except Exception as cache_error:
            logger.debug(f"Failed to invalidate cache on team create: {cache_error}")

        logger.info(f"Created team '{SecurityValidator.sanitize_log_message(team.name)}' by {created_by}")
        return team

    except Exception as e:
        self.db.rollback()
        # The name is caller-supplied: sanitize it before logging, as the
        # success path above already does.
        logger.error(f"Failed to create team '{SecurityValidator.sanitize_log_message(name)}': {e}")
        raise

548 

async def get_team_by_id(self, team_id: str) -> Optional[EmailTeam]:
    """Get an active team by ID.

    Args:
        team_id: Team ID to lookup

    Returns:
        EmailTeam: The team, or None if no active team has this ID or the
        lookup fails (failures are logged, not raised)

    Examples:
        >>> import asyncio
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> asyncio.iscoroutinefunction(service.get_team_by_id)
        True
    """
    try:
        team = self.db.query(EmailTeam).filter(EmailTeam.id == team_id, EmailTeam.is_active.is_(True)).first()
        self.db.commit()  # Release transaction to avoid idle-in-transaction
        return team

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get team by ID {SecurityValidator.sanitize_log_message(team_id)}: {e}")
        return None

574 

async def get_team_by_slug(self, slug: str) -> Optional[EmailTeam]:
    """Get an active team by slug.

    Args:
        slug: Team slug to lookup

    Returns:
        EmailTeam: The team, or None if no active team has this slug or the
        lookup fails (failures are logged, not raised)

    Examples:
        >>> import asyncio
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> asyncio.iscoroutinefunction(service.get_team_by_slug)
        True
    """
    try:
        team = self.db.query(EmailTeam).filter(EmailTeam.slug == slug, EmailTeam.is_active.is_(True)).first()
        self.db.commit()  # Release transaction to avoid idle-in-transaction
        return team

    except Exception as e:
        self.db.rollback()
        # The slug is caller-supplied: sanitize it before logging.
        logger.error(f"Failed to get team by slug {SecurityValidator.sanitize_log_message(slug)}: {e}")
        return None

600 

async def update_team(
    self,
    team_id: str,
    name: Optional[str] = None,
    description: Optional[str] = None,
    visibility: Optional[str] = None,
    max_members: Union[int, None, _Unset] = UNSET,
    updated_by: Optional[str] = None,
    skip_limits: bool = False,
) -> bool:
    """Update team information.

    All supplied values are validated before any field on the team is
    changed, so a rejected update leaves the team object untouched.

    Args:
        team_id: ID of the team to update
        name: New team name
        description: New team description
        visibility: New visibility setting
        max_members: New maximum member limit. Pass ``None`` to clear an
            explicit per-team override (reverts to global default). Omit
            (or pass ``UNSET``) to leave the current value unchanged.
        updated_by: Email of user making the update
        skip_limits: Skip the max_members_per_team cap check (platform admins only)

    Returns:
        bool: True if update succeeded; False if the team was not found,
        is a personal team, or a non-validation error occurred

    Raises:
        ValueError: If visibility setting is invalid or max_members exceeds
            the configured limit

    Examples:
        >>> import asyncio
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> asyncio.iscoroutinefunction(service.update_team)
        True
    """
    try:
        team = await self.get_team_by_id(team_id)
        if not team:
            logger.warning(f"Team {SecurityValidator.sanitize_log_message(team_id)} not found for update")
            return False

        # Prevent updating personal teams
        if team.is_personal:
            logger.warning(f"Cannot update personal team {SecurityValidator.sanitize_log_message(team_id)}")
            return False

        # Validate first, mutate second. ValueError is re-raised below
        # without a rollback, so nothing may be assigned before validation
        # has passed.
        if visibility is not None:
            valid_visibilities = ["private", "public"]
            if visibility not in valid_visibilities:
                raise ValueError(f"Invalid visibility. Must be one of: {', '.join(valid_visibilities)}")

        # UNSET means "not provided" — leave unchanged.
        # None means "explicitly clear the per-team override" — store NULL.
        # An int means "set an explicit per-team limit".
        if max_members is not UNSET and max_members is not None and not skip_limits:
            max_limit = settings.max_members_per_team
            if max_members > max_limit:
                raise ValueError(f"max_members cannot exceed the configured limit of {max_limit}")

        # Update fields if provided
        if name is not None:
            team.name = name
            # Slug will be updated by event listener if name changes

        if description is not None:
            team.description = description

        if visibility is not None:
            team.visibility = visibility

        if max_members is not UNSET:
            team.max_members = max_members

        team.updated_at = utc_now()
        self.db.commit()

        logger.info(f"Updated team {SecurityValidator.sanitize_log_message(team_id)} by {updated_by}")
        return True

    except ValueError:
        raise  # Let ValueError propagate to caller for proper error handling
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to update team {SecurityValidator.sanitize_log_message(team_id)}: {e}")
        return False

684 

async def delete_team(self, team_id: str, deleted_by: str) -> bool:
    """Delete a team (soft delete).

    Marks the team inactive, writes a "team-deleted" history entry for
    every active membership, deactivates those memberships, and schedules
    cache invalidation for the team and each affected member.

    Args:
        team_id: ID of the team to delete
        deleted_by: Email of user performing deletion

    Returns:
        bool: True if deletion succeeded; False if the team was not found,
        is a personal team, or any error occurred

    Note:
        A ``ValueError`` is raised internally for personal teams, but this
        method's own generic exception handler catches it, so callers see
        ``False`` rather than an exception.

    Examples:
        >>> import asyncio
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> asyncio.iscoroutinefunction(service.delete_team)
        True
    """
    try:
        team = await self.get_team_by_id(team_id)
        if not team:
            logger.warning(f"Team {SecurityValidator.sanitize_log_message(team_id)} not found for deletion")
            return False

        # Prevent deleting personal teams
        if team.is_personal:
            logger.warning(f"Cannot delete personal team {SecurityValidator.sanitize_log_message(team_id)}")
            # Caught by the ``except Exception`` below and turned into False.
            raise ValueError("Personal teams cannot be deleted")

        # Soft delete the team
        team.is_active = False
        team.updated_at = utc_now()

        # Get all active memberships before deactivating (for history logging)
        memberships = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)).all()

        # Log history for each membership (before bulk update)
        # NOTE(review): _log_team_member_action commits on every call, so the
        # team's deactivation is committed before the bulk membership update
        # below runs - confirm this non-atomic sequence is acceptable.
        for membership in memberships:
            self._log_team_member_action(membership.id, team_id, membership.user_email, membership.role, "team-deleted", deleted_by)

        # Bulk update: deactivate all memberships in single query instead of looping
        self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True)).update({EmailTeamMember.is_active: False}, synchronize_session=False)

        self.db.commit()

        # Invalidate all role caches for this team
        try:
            self._fire_and_forget(auth_cache.invalidate_team_roles(team_id))
            self._fire_and_forget(admin_stats_cache.invalidate_teams())
            # Also invalidate team cache, teams list cache, and team membership cache for each member
            for membership in memberships:
                self._fire_and_forget(auth_cache.invalidate_team(membership.user_email))
                self._fire_and_forget(auth_cache.invalidate_user_teams(membership.user_email))
                self._fire_and_forget(auth_cache.invalidate_team_membership(membership.user_email))
        except Exception as cache_error:
            logger.debug(f"Failed to invalidate caches on team delete: {cache_error}")

        logger.info(f"Deleted team {SecurityValidator.sanitize_log_message(team_id)} by {deleted_by}")
        return True

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to delete team {SecurityValidator.sanitize_log_message(team_id)}: {e}")
        return False

751 

async def add_member_to_team(self, team_id: str, user_email: str, role: str = "member", invited_by: Optional[str] = None, grant_source: Optional[str] = None) -> EmailTeamMember:
    """Add a member to a team.

    An inactive membership row for the same user and team is reactivated
    rather than duplicated. On success the matching team-scoped RBAC role
    is assigned and the membership caches are invalidated.

    Args:
        team_id: ID of the team
        user_email: Email of the user to add
        role: Role to assign (owner, member)
        invited_by: Email of user who added this member
        grant_source: Origin of grant (e.g., 'sso', 'manual', 'bootstrap', 'auto')

    Returns:
        EmailTeamMember: The created or reactivated team member object

    Raises:
        InvalidRoleError: If role is invalid
        TeamNotFoundError: If team does not exist
        TeamManagementError: If team is a personal team, or the user has
            reached the maximum number of teams
        UserNotFoundError: If user does not exist
        MemberAlreadyExistsError: If user is already a member
        TeamMemberLimitExceededError: If team has reached maximum member limit
        TeamMemberAddError: If adding member fails due to database or system errors

    Examples:
        >>> import asyncio
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> asyncio.iscoroutinefunction(service.add_member_to_team)
        True
        >>> # After adding, EmailTeamMemberHistory is updated
        >>> # service._log_team_member_action("tm-123", "team-123", "user@example.com", "member", "added", "admin@example.com")
    """
    # Validate role
    valid_roles = ["owner", "member"]
    if role not in valid_roles:
        raise InvalidRoleError(f"Invalid role '{role}'. Must be one of: {', '.join(valid_roles)}")

    # Check if team exists
    team = await self.get_team_by_id(team_id)
    if not team:
        logger.warning(f"Team {SecurityValidator.sanitize_log_message(team_id)} not found")
        raise TeamNotFoundError("Team not found")

    # Prevent adding members to personal teams
    if team.is_personal:
        logger.warning(f"Cannot add members to personal team {SecurityValidator.sanitize_log_message(team_id)}")
        raise TeamManagementError("Cannot add members to personal teams")

    # Check if user exists
    user = self.db.query(EmailUser).filter(EmailUser.email == user_email).first()
    if not user:
        logger.warning(f"User {SecurityValidator.sanitize_log_message(user_email)} not found")
        raise UserNotFoundError("User not found")

    self._check_user_team_limit(user_email)

    # Check if user is already a member (the query matches active and inactive rows)
    existing_membership = self.db.query(EmailTeamMember).filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email).first()

    if existing_membership and existing_membership.is_active:
        logger.warning(f"User {SecurityValidator.sanitize_log_message(user_email)} is already a member of team {SecurityValidator.sanitize_log_message(team_id)}")
        raise MemberAlreadyExistsError("User is already a member of this team")

    # Check team member limit (explicit per-team value or global default)
    check_team_member_capacity(self.db, team)

    # Add or reactivate membership
    try:
        if existing_membership:
            existing_membership.is_active = True
            existing_membership.role = role
            existing_membership.joined_at = utc_now()
            existing_membership.invited_by = invited_by
            # Keep the previous grant_source unless a new one was supplied.
            if grant_source is not None:
                existing_membership.grant_source = grant_source
            self.db.commit()
            self._log_team_member_action(existing_membership.id, team_id, user_email, role, "reactivated", invited_by)
            member = existing_membership
        else:
            membership = EmailTeamMember(team_id=team_id, user_email=user_email, role=role, joined_at=utc_now(), invited_by=invited_by, grant_source=grant_source, is_active=True)
            self.db.add(membership)
            self.db.commit()
            self._log_team_member_action(membership.id, team_id, user_email, role, "added", invited_by)
            member = membership

        await self._assign_team_rbac_role(user_email, team_id, role, granted_by=invited_by)
        self._invalidate_membership_caches(user_email, team_id)
        await self.invalidate_team_member_count_cache(str(team_id))

        logger.info(f"Added {SecurityValidator.sanitize_log_message(user_email)} to team {SecurityValidator.sanitize_log_message(team_id)} with role {role}")
        return member

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to add {SecurityValidator.sanitize_log_message(user_email)} to team {SecurityValidator.sanitize_log_message(team_id)}: {e}")
        # The original exception is chained so the root cause stays visible.
        raise TeamMemberAddError("Failed to add member to team") from e

847 

async def remove_member_from_team(self, team_id: str, user_email: str, removed_by: Optional[str] = None) -> bool:
    """Soft-delete a user's active membership in a non-personal team.

    The membership row is kept but flagged inactive. After the commit the
    change is recorded through ``_log_team_member_action``, the team-scoped
    owner and member RBAC roles are revoked on a best-effort basis, and the
    membership caches are invalidated.

    Args:
        team_id: ID of the team
        user_email: Email of the user to remove
        removed_by: Email of user performing the removal

    Returns:
        bool: True if the member was removed. False when the team does not
        exist, is a personal team, the user has no active membership, the
        user is the last active owner, or any other error occurs.

    Note:
        The ``ValueError`` raised internally for the last-owner case is
        caught by this method's own ``except Exception`` handler, so callers
        observe ``False`` (after a rollback and an error log), not an
        exception.

    Examples:
        Team membership management with role-based access control.
        After removal, EmailTeamMemberHistory is updated via _log_team_member_action.
    """
    try:
        team = await self.get_team_by_id(team_id)
        if not team:
            logger.warning(f"Team {SecurityValidator.sanitize_log_message(team_id)} not found")
            return False

        # Personal teams have a fixed membership
        if team.is_personal:
            logger.warning(f"Cannot remove members from personal team {SecurityValidator.sanitize_log_message(team_id)}")
            return False

        # Only an active membership can be removed
        active_membership = (
            self.db.query(EmailTeamMember)
            .filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True))
            .first()
        )
        if not active_membership:
            logger.warning(f"User {SecurityValidator.sanitize_log_message(user_email)} is not a member of team {SecurityValidator.sanitize_log_message(team_id)}")
            return False

        # A team must always keep at least one active owner
        if active_membership.role == "owner":
            active_owner_total = (
                self.db.query(EmailTeamMember)
                .filter(EmailTeamMember.team_id == team_id, EmailTeamMember.role == "owner", EmailTeamMember.is_active.is_(True))
                .count()
            )
            if active_owner_total <= 1:
                logger.warning(f"Cannot remove the last owner from team {SecurityValidator.sanitize_log_message(team_id)}")
                raise ValueError("Cannot remove the last owner from a team")

        # Soft delete, then record the action in the membership history
        active_membership.is_active = False
        self.db.commit()
        self._log_team_member_action(active_membership.id, team_id, user_email, active_membership.role, "removed", removed_by)

        # Defensive clean-up: revoke both team-scoped RBAC roles, whichever the
        # user actually held. A failure here must not undo the removal.
        try:
            for role_name in (settings.default_team_owner_role, settings.default_team_member_role):
                rbac_role = await self.role_service.get_role_by_name(role_name, scope="team")
                if not rbac_role:
                    continue
                was_revoked = await self.role_service.revoke_role_from_user(user_email=user_email, role_id=rbac_role.id, scope="team", scope_id=team_id)
                if was_revoked:
                    logger.info(f"Revoked {role_name} role from {SecurityValidator.sanitize_log_message(user_email)} for team {SecurityValidator.sanitize_log_message(team_id)}")
        except Exception as role_error:
            logger.warning(f"Failed to revoke roles from {SecurityValidator.sanitize_log_message(user_email)}: {role_error}")

        self._invalidate_membership_caches(user_email, team_id, include_admin_stats=False)
        await self.invalidate_team_member_count_cache(str(team_id))

        logger.info(f"Removed {SecurityValidator.sanitize_log_message(user_email)} from team {SecurityValidator.sanitize_log_message(team_id)} by {removed_by}")
        return True

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to remove {SecurityValidator.sanitize_log_message(user_email)} from team {SecurityValidator.sanitize_log_message(team_id)}: {e}")
        return False

919 

async def update_member_role(self, team_id: str, user_email: str, new_role: str, updated_by: Optional[str] = None) -> bool:
    """Change the role of an active member of a non-personal team.

    The membership role is committed first and recorded through
    ``_log_team_member_action``. The matching team-scoped RBAC role is then
    swapped on a best-effort basis and the user's cached role is invalidated.

    Args:
        team_id: ID of the team
        user_email: Email of the user whose role to update
        new_role: New role to assign ("owner" or "member")
        updated_by: Email of user making the change

    Returns:
        bool: True if role was updated successfully. False when the team is
        missing or personal, the user has no active membership, or an
        unexpected error occurs (the session is rolled back in that case).

    Raises:
        ValueError: If role is invalid or removing last owner role

    Examples:
        Role management within teams for access control.
        After role update, EmailTeamMemberHistory is updated via _log_team_member_action.
    """
    try:
        # Reject anything that is not a known membership role
        allowed_roles = ["owner", "member"]
        if new_role not in allowed_roles:
            raise ValueError(f"Invalid role. Must be one of: {', '.join(allowed_roles)}")

        team = await self.get_team_by_id(team_id)
        if not team:
            logger.warning(f"Team {SecurityValidator.sanitize_log_message(team_id)} not found")
            return False

        # Roles inside personal teams are fixed
        if team.is_personal:
            logger.warning(f"Cannot update roles in personal team {SecurityValidator.sanitize_log_message(team_id)}")
            return False

        membership = (
            self.db.query(EmailTeamMember)
            .filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True))
            .first()
        )
        if not membership:
            logger.warning(f"User {SecurityValidator.sanitize_log_message(user_email)} is not a member of team {SecurityValidator.sanitize_log_message(team_id)}")
            return False

        # Demoting an owner is only allowed while another active owner remains
        if membership.role == "owner" and new_role != "owner":
            active_owner_total = (
                self.db.query(EmailTeamMember)
                .filter(EmailTeamMember.team_id == team_id, EmailTeamMember.role == "owner", EmailTeamMember.is_active.is_(True))
                .count()
            )
            if active_owner_total <= 1:
                logger.warning(f"Cannot remove owner role from the last owner of team {SecurityValidator.sanitize_log_message(team_id)}")
                raise ValueError("Cannot remove owner role from the last owner of a team")

        # Persist the new membership role and record it in the history
        previous_role = membership.role
        membership.role = new_role
        self.db.commit()
        self._log_team_member_action(membership.id, team_id, user_email, new_role, "role_changed", updated_by)

        # Mirror the membership change in the RBAC layer
        if previous_role != new_role:
            try:
                team_member_role = await self.role_service.get_role_by_name(settings.default_team_member_role, scope="team")
                team_owner_role = await self.role_service.get_role_by_name(settings.default_team_owner_role, scope="team")

                # (role to revoke, role to grant, name revoked, name granted)
                if previous_role == "member" and new_role == "owner":
                    transition = (team_member_role, team_owner_role, settings.default_team_member_role, settings.default_team_owner_role)
                elif previous_role == "owner" and new_role == "member":
                    transition = (team_owner_role, team_member_role, settings.default_team_owner_role, settings.default_team_member_role)
                else:
                    # Any other combination leaves RBAC assignments untouched
                    transition = None

                if transition is not None:
                    outgoing_role, incoming_role, outgoing_name, incoming_name = transition
                    if outgoing_role:
                        await self.role_service.revoke_role_from_user(user_email=user_email, role_id=outgoing_role.id, scope="team", scope_id=team_id)
                    if incoming_role:
                        await self.role_service.assign_role_to_user(user_email=user_email, role_id=incoming_role.id, scope="team", scope_id=team_id, granted_by=updated_by or user_email)
                    logger.info(
                        f"Transitioned RBAC role from {outgoing_name} to {incoming_name} for {SecurityValidator.sanitize_log_message(user_email)} in team {SecurityValidator.sanitize_log_message(team_id)}"
                    )

            except Exception as role_error:
                # Don't fail the membership role update if RBAC role update fails
                logger.warning(f"Failed to update RBAC roles for {SecurityValidator.sanitize_log_message(user_email)} in team {SecurityValidator.sanitize_log_message(team_id)}: {role_error}")

        # Drop the cached role so the next lookup sees the new value
        try:
            self._fire_and_forget(auth_cache.invalidate_user_role(user_email, team_id))
        except Exception as cache_error:
            logger.debug(f"Failed to invalidate cache on role update: {cache_error}")

        logger.info(f"Updated role of {SecurityValidator.sanitize_log_message(user_email)} in team {SecurityValidator.sanitize_log_message(team_id)} to {new_role} by {updated_by}")
        return True

    except ValueError:
        raise  # Validation failures are reported to the caller unchanged
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to update role of {SecurityValidator.sanitize_log_message(user_email)} in team {SecurityValidator.sanitize_log_message(team_id)}: {e}")
        return False

1023 

async def get_member(self, team_id: str, user_email: str) -> Optional[EmailTeamMember]:
    """Look up the active membership of one user in one team.

    Args:
        team_id: ID of the team
        user_email: Email of the user

    Returns:
        EmailTeamMember if found and active, None otherwise (including when
        the query itself fails; the failure is logged).
    """
    try:
        lookup = self.db.query(EmailTeamMember).filter(
            EmailTeamMember.team_id == team_id,
            EmailTeamMember.user_email == user_email,
            EmailTeamMember.is_active.is_(True),
        )
        return lookup.first()
    except Exception as e:
        logger.error(f"Failed to get member {SecurityValidator.sanitize_log_message(user_email)} in team {SecurityValidator.sanitize_log_message(team_id)}: {e}")
        return None

1039 

async def get_user_teams(self, user_email: str, include_personal: bool = True) -> List[EmailTeam]:
    """Return the active teams in which the user has an active membership.

    Team IDs are cached per ``(user_email, include_personal)`` pair to reduce
    database queries (called 20+ times per request). On a cache hit only the
    team rows for the cached IDs are loaded.
    Cache can be disabled via AUTH_CACHE_TEAMS_ENABLED=false.

    Args:
        user_email: Email of the user
        include_personal: Whether to include personal teams

    Returns:
        List[EmailTeam]: List of teams the user belongs to. Empty when the
        user has no teams or the full query fails.

    Examples:
        User dashboard showing team memberships.
    """
    team_cache = self._get_auth_cache()
    cache_key = f"{user_email}:{include_personal}"

    # None means "no cache" or "cache miss"; a list (even empty) is a hit
    cached_team_ids = await team_cache.get_user_teams(cache_key) if team_cache else None

    if cached_team_ids is not None:
        if not cached_team_ids:  # Empty list = user has no teams
            return []
        # Hydrate full team objects from the cached IDs (fast indexed lookup)
        try:
            cached_teams = self.db.query(EmailTeam).filter(EmailTeam.id.in_(cached_team_ids), EmailTeam.is_active.is_(True)).all()
            self.db.commit()  # Release transaction to avoid idle-in-transaction
            return cached_teams
        except Exception as e:
            self.db.rollback()
            logger.warning(f"Failed to fetch teams by IDs from cache: {e}")
            # Continue with the full membership query below

    # Cache miss, caching disabled, or the cached lookup failed
    try:
        membership_query = (
            self.db.query(EmailTeam)
            .join(EmailTeamMember)
            .filter(EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True), EmailTeam.is_active.is_(True))
        )
        if not include_personal:
            membership_query = membership_query.filter(EmailTeam.is_personal.is_(False))

        teams = membership_query.all()
        self.db.commit()  # Release transaction to avoid idle-in-transaction

        # Remember the IDs for subsequent calls
        if team_cache:
            await team_cache.set_user_teams(cache_key, [team.id for team in teams])

        return teams

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get teams for user {SecurityValidator.sanitize_log_message(user_email)}: {e}")
        return []

1096 

async def verify_team_for_user(self, user_email, team_id=None):
    """
    Resolve or verify a team ID against the user's active team memberships.

    All active teams in which the user has an active membership are loaded
    with one query. Without a ``team_id`` the user's personal team is looked
    up among them; with a ``team_id`` the method checks that the user belongs
    to that team.

    Args:
        user_email (str): The email of the user whose teams are being queried.
        team_id (str or None, optional): Specific team ID to check for membership. Defaults to None.

    Returns:
        str or list or None:
            - If `team_id` is not provided, the ID of the user's personal team or None if not found.
            - If `team_id` is provided and the user is a member of that team, `team_id`.
            - If `team_id` is provided but the user is not a member of that team, an empty list.
            - An empty list if the membership query itself fails.

    Raises:
        None explicitly, but any exceptions during the process are caught and logged.

    Examples:
        Verifies user team if team_id provided otherwise finds its personal id.
    """
    # NOTE(review): if an unexpected error occurs after the membership query
    # while a team_id was supplied, the outer handler returns that team_id
    # without having verified it - confirm this fail-open path is intended.
    try:
        # Load every team the user belongs to in a single query
        try:
            user_teams = (
                self.db.query(EmailTeam)
                .join(EmailTeamMember)
                .filter(EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True), EmailTeam.is_active.is_(True))
                .all()
            )
            self.db.commit()  # Release transaction to avoid idle-in-transaction
        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to get teams for user {SecurityValidator.sanitize_log_message(user_email)}: {e}")
            return []

        if team_id:
            # The requested team must be one of the user's teams
            if not any(team.id == team_id for team in user_teams):
                return []
        else:
            # No team requested: fall back to the first personal team, if any
            personal_team = None
            for candidate in user_teams:
                if getattr(candidate, "is_personal", False):
                    personal_team = candidate
                    break
            team_id = personal_team.id if personal_team else None
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to verify team for user {SecurityValidator.sanitize_log_message(user_email)}: {e}")
        # Normalise any falsy team_id to None
        team_id = team_id or None

    return team_id

1150 

1151 @staticmethod 

1152 def _escape_like(value: str) -> str: 

1153 """Escape LIKE wildcards for prefix search. 

1154 

1155 Args: 

1156 value: Raw value to escape for LIKE matching. 

1157 

1158 Returns: 

1159 Escaped string safe for LIKE patterns. 

1160 """ 

1161 return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_") 

1162 

async def get_team_members(
    self,
    team_id: str,
    cursor: Optional[str] = None,
    limit: Optional[int] = None,
    page: Optional[int] = None,
    per_page: Optional[int] = None,
    search: Optional[str] = None,
) -> Union[List[Tuple[EmailUser, EmailTeamMember]], Tuple[List[Tuple[EmailUser, EmailTeamMember]], Optional[str]], Dict[str, Any]]:
    """Get all members of a team with optional cursor or page-based pagination.

    Note: This method returns ORM objects and cannot be cached since callers
    depend on ORM attributes and methods.

    The response shape is selected by the arguments and is the same on
    success and on failure:

    * ``page`` given: page-based mode (takes precedence over ``cursor``/``limit``).
    * ``cursor`` and/or ``limit`` given: cursor-based mode.
    * none of them given: plain list (backward compatibility).

    Args:
        team_id: ID of the team
        cursor: Opaque cursor token for cursor-based pagination
        limit: Maximum number of members to return (for cursor-based, default: 50)
        page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor.
        per_page: Items per page for page-based pagination (default: 30)
        search: Optional search term; prefix-matched against email or full name

    Returns:
        - If page is provided: Dict with keys 'data', 'pagination', 'links'
        - If cursor or limit is provided: Tuple (members, next_cursor)
        - If neither: List of all members (backward compatibility)

        On failure the error is logged, the session is rolled back and the
        empty equivalent of the selected shape is returned.

    Examples:
        Team member management and role display.
    """
    try:
        # Build optional search filter (prefix match, LIKE wildcards escaped)
        search_filter = None
        if search and search.strip():
            search_term = f"{self._escape_like(search.strip())}%"
            search_filter = or_(
                EmailUser.email.ilike(search_term, escape="\\"),
                EmailUser.full_name.ilike(search_term, escape="\\"),
            )

        if cursor is None and page is None and limit is None:
            # Backward compatibility: return tuples (no pagination requested)
            query = (
                select(EmailUser, EmailTeamMember)
                .join(EmailTeamMember, EmailUser.email == EmailTeamMember.user_email)
                .where(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True))
                .order_by(EmailUser.full_name, EmailUser.email)
            )
            if search_filter is not None:
                query = query.where(search_filter)
            result = self.db.execute(query)
            members = list(result.all())
            self.db.commit()
            return members

        # For pagination: select EmailTeamMember and eager-load user to avoid N+1
        query = (
            select(EmailTeamMember)
            .options(selectinload(EmailTeamMember.user))
            .where(EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True))
            .join(EmailUser, EmailUser.email == EmailTeamMember.user_email)
        )
        if search_filter is not None:
            query = query.where(search_filter)

        # PAGE-BASED PAGINATION (Admin UI) - use unified_paginate
        if page is not None:
            # Alphabetical ordering for user-friendly display
            query = query.order_by(EmailUser.full_name, EmailUser.email)
            pag_result = await unified_paginate(
                db=self.db,
                query=query,
                page=page,
                per_page=per_page or 30,
                cursor=None,
                limit=None,
                base_url=f"/admin/teams/{team_id}/members",
                query_params={},
            )
            self.db.commit()
            memberships = pag_result["data"]
            tuples = [(m.user, m) for m in memberships]
            return {
                "data": tuples,
                "pagination": pag_result["pagination"],
                "links": pag_result["links"],
            }

        # CURSOR-BASED PAGINATION (API) - custom implementation using (joined_at, id)
        # unified_paginate uses created_at which doesn't exist on EmailTeamMember

        # Order by joined_at DESC, id DESC for keyset pagination
        query = query.order_by(desc(EmailTeamMember.joined_at), desc(EmailTeamMember.id))

        # Decode cursor and apply keyset filter
        if cursor:
            try:
                cursor_json = base64.urlsafe_b64decode(cursor.encode()).decode()
                cursor_data = orjson.loads(cursor_json)
                last_id = cursor_data.get("id")
                joined_str = cursor_data.get("joined_at")
                if last_id and joined_str:
                    last_joined = datetime.fromisoformat(joined_str)
                    # Keyset filter: (joined_at < last) OR (joined_at = last AND id < last_id)
                    query = query.where(
                        or_(
                            EmailTeamMember.joined_at < last_joined,
                            and_(EmailTeamMember.joined_at == last_joined, EmailTeamMember.id < last_id),
                        )
                    )
            except (ValueError, TypeError) as e:
                # A malformed cursor is ignored: the first page is served instead
                logger.warning(f"Invalid cursor for team members, ignoring: {e}")

        # Fetch limit + 1 to check for more results (cap at max_page_size)
        page_size = min(limit or 50, settings.pagination_max_page_size)
        query = query.limit(page_size + 1)
        memberships = list(self.db.execute(query).scalars().all())

        # Check if there are more results
        has_more = len(memberships) > page_size
        if has_more:
            memberships = memberships[:page_size]

        # Generate next cursor using (joined_at, id)
        next_cursor = None
        if has_more and memberships:
            last_member = memberships[-1]
            cursor_data = {
                "joined_at": last_member.joined_at.isoformat() if last_member.joined_at else None,
                "id": last_member.id,
            }
            next_cursor = base64.urlsafe_b64encode(orjson.dumps(cursor_data)).decode()

        self.db.commit()
        tuples = [(m.user, m) for m in memberships]
        return (tuples, next_cursor)

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get members for team {SecurityValidator.sanitize_log_message(team_id)}: {e}")

        # Return the empty response for the SAME mode the request selected
        if page is not None:
            return {"data": [], "pagination": {"page": page, "per_page": per_page or 30, "total": 0, "has_next": False, "has_prev": False}, "links": None}

        # Cursor mode is entered by either argument (a first-page request
        # passes only ``limit``), so both must yield the tuple shape.
        if cursor is not None or limit is not None:
            return ([], None)

        return []

1314 

def count_team_owners(self, team_id: str) -> int:
    """Return how many active owner memberships a team has.

    The count is computed by the database (SQL COUNT) rather than by loading
    the member rows and counting them in Python.

    Args:
        team_id: ID of the team

    Returns:
        int: Number of active owners in the team
    """
    owner_filters = (
        EmailTeamMember.team_id == team_id,
        EmailTeamMember.role == "owner",
        EmailTeamMember.is_active.is_(True),
    )
    owner_total = self.db.query(EmailTeamMember).filter(*owner_filters).count()
    self.db.commit()  # Release transaction to avoid idle-in-transaction
    return owner_total

1329 

def _get_auth_cache(self):
    """Resolve the auth cache on demand.

    Returns:
        AuthCache instance or None if unavailable (``get_auth_cache`` raised
        ImportError).
    """
    try:
        cache_instance = get_auth_cache()
    except ImportError:
        return None
    return cache_instance

1340 

1341 def _get_admin_stats_cache(self): 

1342 """Get admin stats cache instance lazily. 

1343 

1344 Returns: 

1345 AdminStatsCache instance or None if unavailable. 

1346 """ 

1347 try: 

1348 # First-Party 

1349 from mcpgateway.cache.admin_stats_cache import get_admin_stats_cache # pylint: disable=import-outside-toplevel 

1350 

1351 return get_admin_stats_cache() 

1352 except ImportError: 

1353 return None 

1354 

async def get_user_role_in_team(self, user_email: str, team_id: str) -> Optional[str]:
    """Return the role a user holds in a team through an active membership.

    The auth cache is consulted first to reduce database queries (called 11+
    times per team operation). A cached empty string stands for "not a
    member". On a miss the membership is read from the database and the
    result, including None, is written back to the cache.

    Args:
        user_email: Email of the user
        team_id: ID of the team

    Returns:
        str: User's role or None if not a member (also None when the lookup
        fails; the failure is logged).

    Examples:
        Access control and permission checking.
    """
    role_cache = self._get_auth_cache()
    if role_cache:
        cached_role = await role_cache.get_user_role(user_email, team_id)
        if cached_role is not None:
            # Empty string means "not a member" (cached None)
            return cached_role or None

    try:
        membership = (
            self.db.query(EmailTeamMember)
            .filter(EmailTeamMember.user_email == user_email, EmailTeamMember.team_id == team_id, EmailTeamMember.is_active.is_(True))
            .first()
        )
        self.db.commit()  # Release transaction to avoid idle-in-transaction

        if membership:
            role = membership.role
        else:
            role = None

        # Remember the answer for subsequent lookups
        if role_cache:
            await role_cache.set_user_role(user_email, team_id, role)

        return role

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get role for {SecurityValidator.sanitize_log_message(user_email)} in team {SecurityValidator.sanitize_log_message(team_id)}: {e}")
        return None

1394 

1395 @staticmethod 

1396 def _apply_team_list_filters( 

1397 query: Any, 

1398 *, 

1399 include_personal: bool = False, 

1400 include_inactive: bool = False, 

1401 visibility_filter: Optional[str] = None, 

1402 search_query: Optional[str] = None, 

1403 personal_owner_email: Optional[str] = None, 

1404 search_description: bool = False, 

1405 ) -> Any: 

1406 """Apply common filter predicates to a team list query. 

1407 

1408 Args: 

1409 query: SQLAlchemy select statement to filter. 

1410 include_personal: Whether to include personal teams. 

1411 include_inactive: Whether to include inactive teams. 

1412 visibility_filter: Filter by visibility (private, public). 

1413 search_query: Optional search term for name/slug (and description if *search_description*). 

1414 personal_owner_email: When set (and *include_personal* is False), include this user's personal team. 

1415 search_description: Whether to include ``EmailTeam.description`` in the search predicate. 

1416 

1417 Returns: 

1418 The filtered query. 

1419 """ 

1420 if not include_personal: 

1421 if personal_owner_email: 

1422 query = query.where( 

1423 or_( 

1424 EmailTeam.is_personal.is_(False), 

1425 and_(EmailTeam.is_personal.is_(True), EmailTeam.created_by == personal_owner_email), 

1426 ) 

1427 ) 

1428 else: 

1429 query = query.where(EmailTeam.is_personal.is_(False)) 

1430 

1431 if not include_inactive: 

1432 query = query.where(EmailTeam.is_active.is_(True)) 

1433 

1434 if visibility_filter: 

1435 query = query.where(EmailTeam.visibility == visibility_filter) 

1436 

1437 if search_query: 

1438 search_term = f"%{search_query}%" 

1439 predicates = [EmailTeam.name.ilike(search_term), EmailTeam.slug.ilike(search_term)] 

1440 if search_description: 

1441 predicates.append(EmailTeam.description.ilike(search_term)) 

1442 query = query.where(or_(*predicates)) 

1443 

1444 return query 

1445 

async def list_teams(
    self,
    # Unified pagination params
    limit: int = 100,
    offset: int = 0,
    cursor: Optional[str] = None,
    page: Optional[int] = None,
    per_page: int = 50,
    include_inactive: bool = False,
    visibility_filter: Optional[str] = None,
    base_url: Optional[str] = None,
    include_personal: bool = False,
    search_query: Optional[str] = None,
    personal_owner_email: Optional[str] = None,
) -> Union[Tuple[List[EmailTeam], Optional[str]], Dict[str, Any]]:
    """Return a filtered, ordered slice of teams (cursor or page based).

    Filtering is delegated to ``_apply_team_list_filters`` (with description
    search enabled) and pagination to ``unified_paginate``.

    Args:
        limit: Max items for cursor pagination
        offset: Offset for legacy/cursor pagination
        cursor: Cursor token
        page: Page number (1-indexed)
        per_page: Items per page
        include_inactive: Whether to include inactive teams
        visibility_filter: Filter by visibility (private, team, public)
        base_url: Base URL for pagination links
        include_personal: Whether to include personal teams
        search_query: Search term for name/slug/description
        personal_owner_email: When set (and include_personal=False), includes this user's personal team alongside non-personal teams

    Returns:
        Union[Tuple[List[EmailTeam], Optional[str]], Dict[str, Any]]:
            - Tuple (teams, next_cursor) if cursor/offset based
            - Dict {data, pagination, links} if page based
    """
    filtered = self._apply_team_list_filters(
        select(EmailTeam),
        include_personal=include_personal,
        include_inactive=include_inactive,
        visibility_filter=visibility_filter,
        search_query=search_query,
        personal_owner_email=personal_owner_email,
        search_description=True,
    )

    # Ordering depends on the pagination mode:
    # - Page-based (UI): alphabetical by name for user-friendly display
    # - Cursor-based (API): created_at DESC, id DESC to match unified_paginate expectations
    if page is None:
        ordering = (desc(EmailTeam.created_at), desc(EmailTeam.id))
    else:
        ordering = (EmailTeam.name, EmailTeam.id)
    stmt = filtered.order_by(*ordering)

    # Pagination links default to the admin partial endpoint
    links_base_url = base_url or f"{settings.app_root_path}/admin/teams/partial"

    # Legacy offset-based pagination applies only without a page or cursor
    uses_legacy_offset = not page and not cursor and offset > 0
    if uses_legacy_offset:
        stmt = stmt.offset(offset)

    paginated = await unified_paginate(
        db=self.db,
        query=stmt,
        cursor=cursor,
        limit=limit,
        page=page,
        per_page=per_page,
        base_url=links_base_url,
    )
    self.db.commit()  # Release transaction to avoid idle-in-transaction
    return paginated

1518 

async def get_all_team_ids(
    self,
    include_inactive: bool = False,
    visibility_filter: Optional[str] = None,
    include_personal: bool = False,
    search_query: Optional[str] = None,
    personal_owner_email: Optional[str] = None,
) -> List[str]:
    """Get all team IDs matching criteria (unpaginated).

    Uses the same filters as ``list_teams`` except that the search term is
    matched against name and slug only (description search is not enabled).

    Args:
        include_inactive: Whether to include inactive teams
        visibility_filter: Filter by visibility (private, team, public)
        include_personal: Whether to include personal teams
        search_query: Search term for name/slug
        personal_owner_email: When set (and include_personal=False), includes this user's personal team alongside non-personal teams

    Returns:
        List[str]: List of team IDs (the ``EmailTeam.id`` values, which the
        rest of this service handles as strings)
    """
    query = self._apply_team_list_filters(
        select(EmailTeam.id),
        include_personal=include_personal,
        include_inactive=include_inactive,
        visibility_filter=visibility_filter,
        search_query=search_query,
        personal_owner_email=personal_owner_email,
    )

    result = self.db.execute(query)
    # Each row holds exactly one column: the team ID
    team_ids = [row[0] for row in result.all()]
    self.db.commit()  # Release transaction to avoid idle-in-transaction
    return team_ids

1552 

async def get_teams_count(
    self,
    include_inactive: bool = False,
    visibility_filter: Optional[str] = None,
    include_personal: bool = False,
    search_query: Optional[str] = None,
    personal_owner_email: Optional[str] = None,
) -> int:
    """Count the teams that match the given criteria.

    Args:
        include_inactive: Whether to include inactive teams
        visibility_filter: Filter by visibility (private, team, public)
        include_personal: Whether to include personal teams
        search_query: Search term for name/slug
        personal_owner_email: When set (and include_personal=False), includes this user's personal team in the count

    Returns:
        int: Total count of matching teams (0 when the query yields no value)
    """
    count_stmt = self._apply_team_list_filters(
        select(func.count(EmailTeam.id)),  # pylint: disable=not-callable
        include_personal=include_personal,
        include_inactive=include_inactive,
        visibility_filter=visibility_filter,
        search_query=search_query,
        personal_owner_email=personal_owner_email,
    )

    total = self.db.execute(count_stmt).scalar() or 0
    self.db.commit()  # Release transaction to avoid idle-in-transaction
    return total

1586 

async def discover_public_teams(self, user_email: str, skip: int = 0, limit: Optional[int] = None) -> List[EmailTeam]:
    """List active, non-personal public teams the user is not an active member of.

    Failures are not propagated to the caller: the session is rolled back,
    the error is logged, and an empty list is returned.

    Args:
        user_email: Email of the user discovering teams
        skip: Number of teams to skip for pagination
        limit: Maximum number of teams to return (None for unlimited)

    Returns:
        List[EmailTeam]: Public teams the user can join, or an empty list if the lookup failed
    """
    try:
        # The membership exclusion is done in SQL (NOT IN subquery) so the
        # user's team IDs are never loaded into Python.
        joined_team_ids = (
            select(EmailTeamMember.team_id)
            .where(EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True))
            .scalar_subquery()
        )

        candidates = self.db.query(EmailTeam).filter(
            EmailTeam.visibility == "public",
            EmailTeam.is_active.is_(True),
            EmailTeam.is_personal.is_(False),
            ~EmailTeam.id.in_(joined_team_ids),
        )
        candidates = candidates.offset(skip)
        if limit is not None:
            candidates = candidates.limit(limit)

        found = candidates.all()
        self.db.commit()  # Release transaction to avoid idle-in-transaction
        return found

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to discover public teams for {SecurityValidator.sanitize_log_message(user_email)}: {e}")
        return []

1618 

async def create_join_request(self, team_id: str, user_email: str, message: Optional[str] = None) -> "EmailTeamJoinRequest":
    """Create (or re-open) a user's request to join a public team.

    If the user already has a request row for this team that is no longer
    an unexpired pending one, that row is reset to "pending" with a fresh
    7-day expiry instead of inserting a second row.

    Args:
        team_id: ID of the team to join
        user_email: Email of the user requesting to join
        message: Optional message to team owners

    Returns:
        EmailTeamJoinRequest: The created or re-opened join request

    Raises:
        ValueError: If the team is not found or not public, the user is already an active member,
            the user is at the team limit, or an unexpired pending request already exists
    """
    try:
        # The target must exist and be public
        target_team = await self.get_team_by_id(team_id)
        if not target_team:
            raise ValueError("Team not found")
        if target_team.visibility != "public":
            raise ValueError("Can only request to join public teams")

        # Active members have nothing to request
        active_membership = (
            self.db.query(EmailTeamMember)
            .filter(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True))
            .first()
        )
        if active_membership:
            raise ValueError("User is already a member of this team")

        # Enforce the per-user team limit (defaults to 50 when the setting is absent)
        team_limit = getattr(settings, "max_teams_per_user", 50)
        if self._get_user_team_count(user_email) >= team_limit:
            raise ValueError(f"User has reached the maximum team limit of {team_limit}")

        # Look for an earlier request from this user for this team, whatever its status
        prior_request = self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.team_id == team_id, EmailTeamJoinRequest.user_email == user_email).first()

        if not prior_request:
            # First request for this (team, user) pair
            join_request = EmailTeamJoinRequest(team_id=team_id, user_email=user_email, message=message, expires_at=utc_now() + timedelta(days=7))
            self.db.add(join_request)
        else:
            if prior_request.status == "pending" and not prior_request.is_expired():
                raise ValueError("User already has a pending join request for this team")

            # Recycle the old row: back to pending, new timestamps, review fields cleared
            prior_request.message = message or ""
            prior_request.status = "pending"
            prior_request.requested_at = utc_now()
            prior_request.expires_at = utc_now() + timedelta(days=7)
            prior_request.reviewed_at = None
            prior_request.reviewed_by = None
            prior_request.notes = None
            join_request = prior_request

        self.db.commit()
        self.db.refresh(join_request)

        logger.info(f"Created join request for user {SecurityValidator.sanitize_log_message(user_email)} to team {SecurityValidator.sanitize_log_message(team_id)}")
        return join_request

    except ValueError:
        # Validation failures are expected; undo any pending state and let the caller handle them
        self.db.rollback()
        raise
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to create join request: {e}")
        raise

1687 

async def list_join_requests(self, team_id: str) -> List["EmailTeamJoinRequest"]:
    """List pending join requests for a team, newest first.

    Query failures are not propagated: the session is rolled back, the
    error is logged, and an empty list is returned.

    Args:
        team_id: ID of the team

    Returns:
        List[EmailTeamJoinRequest]: Requests with status "pending", ordered by
        ``requested_at`` descending; empty if none exist or the query failed
    """
    try:
        return (
            self.db.query(EmailTeamJoinRequest)
            .filter(EmailTeamJoinRequest.team_id == team_id, EmailTeamJoinRequest.status == "pending")
            .order_by(EmailTeamJoinRequest.requested_at.desc())
            .all()
        )

    except Exception as e:
        # Roll back so a failed statement does not leave the shared session in an
        # unusable transaction for later calls (same handling as discover_public_teams).
        self.db.rollback()
        logger.error(f"Failed to list join requests for team {SecurityValidator.sanitize_log_message(team_id)}: {e}")
        return []

1706 

async def approve_join_request(self, request_id: str, approved_by: str) -> Optional[EmailTeamMember]:
    """Approve a pending team join request and add the requester as a member.

    Args:
        request_id: ID of the join request
        approved_by: Email of the user approving the request

    Returns:
        EmailTeamMember: The newly created membership (role "member")

    Raises:
        ValueError: If the request is not found or not pending, has expired,
            the user is at the team limit, or the team cannot be loaded
        Exception: Any other failure (including whatever check_team_member_capacity
            raises) is re-raised after the session is rolled back
    """
    # Identifiers are sanitized before logging, as everywhere else in this service.
    safe_request_id = SecurityValidator.sanitize_log_message(str(request_id))

    try:
        # Get join request
        join_request = self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.id == request_id, EmailTeamJoinRequest.status == "pending").first()

        if not join_request:
            raise ValueError("Join request not found or already processed")

        if join_request.is_expired():
            # Persist the "expired" status before raising; the rollback in the
            # ValueError handler below then has nothing left to undo.
            join_request.status = "expired"
            self.db.commit()
            raise ValueError("Join request has expired")

        # Check max teams per user
        max_teams = getattr(settings, "max_teams_per_user", 50)
        if self._get_user_team_count(join_request.user_email) >= max_teams:
            raise ValueError(f"User has reached the maximum team limit of {max_teams}")

        # Check team member limit
        team = await self.get_team_by_id(join_request.team_id)
        if not team:
            raise ValueError(f"Team {join_request.team_id} not found or inactive")
        check_team_member_capacity(self.db, team)

        # Add user to team
        member = EmailTeamMember(team_id=join_request.team_id, user_email=join_request.user_email, role="member", invited_by=approved_by, joined_at=utc_now())  # New joiners are always members

        self.db.add(member)
        # Update join request status
        join_request.status = "approved"
        join_request.reviewed_at = utc_now()
        join_request.reviewed_by = approved_by

        # Flush so member.id is populated for the history entry below.
        # NOTE(review): there is no explicit commit in this method; presumably
        # _log_team_member_action or _assign_team_rbac_role commits - confirm.
        self.db.flush()
        self._log_team_member_action(member.id, join_request.team_id, join_request.user_email, member.role, "added", approved_by)

        self.db.refresh(member)

        await self._assign_team_rbac_role(join_request.user_email, join_request.team_id, member.role, granted_by=approved_by)
        self._invalidate_membership_caches(join_request.user_email, join_request.team_id)
        await self.invalidate_team_member_count_cache(str(join_request.team_id))

        safe_user = SecurityValidator.sanitize_log_message(str(join_request.user_email))
        safe_team = SecurityValidator.sanitize_log_message(str(join_request.team_id))
        logger.info(f"Approved join request {safe_request_id}: user {safe_user} joined team {safe_team}")
        return member

    except ValueError:
        self.db.rollback()
        raise
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to approve join request {safe_request_id}: {e}")
        raise

1771 

async def reject_join_request(self, request_id: str, rejected_by: str) -> bool:
    """Reject a pending team join request.

    Args:
        request_id: ID of the join request
        rejected_by: Email of the user rejecting the request

    Returns:
        bool: True if request was rejected successfully

    Raises:
        ValueError: If request not found or already processed
        Exception: Any other failure is logged and re-raised after rollback
    """
    # Identifiers are sanitized before logging, as everywhere else in this service.
    safe_request_id = SecurityValidator.sanitize_log_message(str(request_id))

    try:
        # Only pending requests can be rejected
        join_request = self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.id == request_id, EmailTeamJoinRequest.status == "pending").first()

        if not join_request:
            raise ValueError("Join request not found or already processed")

        # Record the decision and who made it
        join_request.status = "rejected"
        join_request.reviewed_at = utc_now()
        join_request.reviewed_by = rejected_by

        self.db.commit()

        safe_user = SecurityValidator.sanitize_log_message(str(join_request.user_email))
        safe_team = SecurityValidator.sanitize_log_message(str(join_request.team_id))
        logger.info(f"Rejected join request {safe_request_id}: user {safe_user} for team {safe_team}")
        return True

    except ValueError:
        # Expected validation failure: roll back and propagate without logging it
        # as an error, matching approve_join_request and create_join_request.
        self.db.rollback()
        raise
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to reject join request {safe_request_id}: {e}")
        raise

1806 

async def get_user_join_requests(self, user_email: str, team_id: Optional[str] = None) -> List["EmailTeamJoinRequest"]:
    """Get join requests made by a user, in any status.

    Query failures are not propagated: the session is rolled back, the
    error is logged, and an empty list is returned.

    Args:
        user_email: Email of the user
        team_id: Optional team ID to filter requests

    Returns:
        List[EmailTeamJoinRequest]: Join requests made by the user (restricted to
        ``team_id`` when given); empty if none exist or the query failed

    Examples:
        Get all requests made by a user or for a specific team.
    """
    try:
        query = self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.user_email == user_email)

        if team_id:
            query = query.filter(EmailTeamJoinRequest.team_id == team_id)

        return query.all()

    except Exception as e:
        # Roll back so a failed statement does not leave the shared session in an
        # unusable transaction for later calls (same handling as discover_public_teams).
        self.db.rollback()
        logger.error(f"Failed to get join requests for user {SecurityValidator.sanitize_log_message(user_email)}: {e}")
        return []

1832 

async def cancel_join_request(self, request_id: str, user_email: str) -> bool:
    """Cancel a user's own pending join request.

    Args:
        request_id: ID of the join request to cancel
        user_email: Email of the user canceling the request

    Returns:
        bool: True if canceled successfully; False if no matching pending
        request exists for this user or the update failed

    Examples:
        Allow users to cancel their pending join requests.
    """
    # request_id is sanitized before logging, just like user_email in the same messages.
    safe_request_id = SecurityValidator.sanitize_log_message(str(request_id))

    try:
        # The request must belong to this user and still be pending
        join_request = (
            self.db.query(EmailTeamJoinRequest).filter(EmailTeamJoinRequest.id == request_id, EmailTeamJoinRequest.user_email == user_email, EmailTeamJoinRequest.status == "pending").first()
        )

        if not join_request:
            logger.warning(f"Join request {safe_request_id} not found for user {SecurityValidator.sanitize_log_message(user_email)} or not pending")
            return False

        # Mark as cancelled; the requester is recorded as the reviewer
        join_request.status = "cancelled"
        join_request.reviewed_at = utc_now()
        join_request.reviewed_by = user_email

        self.db.commit()

        logger.info(f"Cancelled join request {safe_request_id} by user {SecurityValidator.sanitize_log_message(user_email)}")
        return True

    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to cancel join request {safe_request_id}: {e}")
        return False

1870 

1871 # ================================================================================== 

1872 # Batch Query Methods (N+1 Query Elimination - Issue #1892) 

1873 # ================================================================================== 

1874 

def get_member_counts_batch(self, team_ids: List[str]) -> Dict[str, int]:
    """Count active members for several teams with one grouped query.

    This method is synchronous, like the other plain SQLAlchemy calls in
    this service, so it blocks the event loop when called from async code.

    Args:
        team_ids: List of team UUIDs

    Returns:
        Dict mapping every requested team_id to its active member count
        (0 for teams without active members)

    Raises:
        Exception: Re-raises any database errors after rollback

    Examples:
        >>> from unittest.mock import Mock
        >>> service = TeamManagementService(Mock())
        >>> service.get_member_counts_batch([])
        {}
    """
    if not team_ids:
        return {}

    try:
        grouped = self.db.query(EmailTeamMember.team_id, func.count(EmailTeamMember.id).label("count"))  # pylint: disable=not-callable
        grouped = grouped.filter(EmailTeamMember.team_id.in_(team_ids), EmailTeamMember.is_active.is_(True))
        rows = grouped.group_by(EmailTeamMember.team_id).all()

        self.db.commit()  # Release transaction to avoid idle-in-transaction

        # Teams without active members produce no row, so default them to 0
        found: Dict[str, int] = {}
        for row in rows:
            found[str(row.team_id)] = row.count
        return {team_id: found.get(team_id, 0) for team_id in team_ids}
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get member counts for teams: {e}")
        raise

1918 

def get_user_roles_batch(self, user_email: str, team_ids: List[str]) -> Dict[str, Optional[str]]:
    """Look up one user's active role in several teams with a single query.

    Args:
        user_email: Email of the user
        team_ids: List of team UUIDs

    Returns:
        Dict mapping every requested team_id to the user's role, or None
        where the user has no active membership

    Raises:
        Exception: Re-raises any database errors after rollback
    """
    if not team_ids:
        return {}

    try:
        membership_query = self.db.query(EmailTeamMember.team_id, EmailTeamMember.role)
        rows = membership_query.filter(
            EmailTeamMember.user_email == user_email,
            EmailTeamMember.team_id.in_(team_ids),
            EmailTeamMember.is_active.is_(True),
        ).all()

        self.db.commit()  # Release transaction to avoid idle-in-transaction

        # Teams the user does not belong to produce no row and map to None
        role_by_team: Dict[str, Optional[str]] = {}
        for row in rows:
            role_by_team[str(row.team_id)] = row.role
        return {team_id: role_by_team.get(team_id) for team_id in team_ids}
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get user roles for {SecurityValidator.sanitize_log_message(user_email)}: {e}")
        raise

1952 

def get_pending_join_requests_batch(self, user_email: str, team_ids: List[str]) -> Dict[str, Optional[Any]]:
    """Fetch a user's pending join requests for several teams with a single query.

    Args:
        user_email: Email of the user
        team_ids: List of team UUIDs to check

    Returns:
        Dict mapping every requested team_id to the pending EmailTeamJoinRequest,
        or None where the user has no pending request

    Raises:
        Exception: Re-raises any database errors after rollback
    """
    if not team_ids:
        return {}

    try:
        pending_query = self.db.query(EmailTeamJoinRequest).filter(
            EmailTeamJoinRequest.user_email == user_email,
            EmailTeamJoinRequest.team_id.in_(team_ids),
            EmailTeamJoinRequest.status == "pending",
        )
        rows = pending_query.all()

        self.db.commit()  # Release transaction to avoid idle-in-transaction

        # One pending request per team is expected; if several exist, the last row wins
        request_by_team: Dict[str, Any] = {}
        for pending in rows:
            request_by_team[str(pending.team_id)] = pending
        return {team_id: request_by_team.get(team_id) for team_id in team_ids}
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to get pending join requests for {SecurityValidator.sanitize_log_message(user_email)}: {e}")
        raise

1984 

1985 # ================================================================================== 

1986 # Cached Batch Methods (Redis caching for member counts) 

1987 # ================================================================================== 

1988 

def _get_member_count_cache_key(self, team_id: str) -> str:
    """Return the Redis key under which a team's member count is cached.

    The prefix is read from ``settings.cache_prefix`` and falls back to
    "mcpgw:" when that setting is absent.

    Args:
        team_id: Team UUID to build cache key for

    Returns:
        Cache key string in format "{prefix}team:member_count:{team_id}"
    """
    prefix = getattr(settings, "cache_prefix", "mcpgw:")
    return "{}team:member_count:{}".format(prefix, team_id)

2000 

async def get_member_counts_batch_cached(self, team_ids: List[str]) -> Dict[str, int]:
    """Get member counts for multiple teams, reading Redis first and the database for misses.

    Caching behavior is controlled by settings:
    - team_member_count_cache_enabled: Enable/disable caching (default: True)
    - team_member_count_cache_ttl: Cache TTL in seconds (default: 300)

    Redis problems never fail the call: an unavailable client or a failed
    read falls back to the database, and a failed write is only logged.

    Args:
        team_ids: List of team UUIDs

    Returns:
        Dict mapping team_id to member count

    Raises:
        Exception: Re-raises any database errors after rollback
    """
    if not team_ids:
        return {}

    # With caching switched off, the plain batch query is the whole job
    if not getattr(settings, "team_member_count_cache_enabled", True):
        return self.get_member_counts_batch(team_ids)
    ttl_seconds = getattr(settings, "team_member_count_cache_ttl", 300)

    try:
        redis_client = await get_redis_client()
    except Exception:
        redis_client = None

    counts: Dict[str, int] = {}
    uncached: List[str] = []

    # Step 1: one MGET for all teams; anything not found becomes a miss
    if not redis_client:
        # No Redis available, fall back to DB
        uncached = list(team_ids)
    else:
        try:
            keys = [self._get_member_count_cache_key(team_id) for team_id in team_ids]
            cached_values = await redis_client.mget(keys)

            for team_id, raw in zip(team_ids, cached_values):
                if raw is None:
                    uncached.append(team_id)
                else:
                    counts[team_id] = int(raw)
        except Exception as e:
            logger.warning(f"Redis cache read failed, falling back to DB: {e}")
            # Treat every team as a miss; the DB values overwrite any partial hits
            uncached = list(team_ids)

    if not uncached:
        return counts

    # Step 2: the sibling batch method runs the grouped query for the misses and
    # handles commit, rollback, error logging and re-raise.
    fresh_counts = self.get_member_counts_batch(uncached)

    # Step 3: record each value and write it back to Redis with the configured TTL
    for tid in uncached:
        count = fresh_counts.get(tid, 0)
        counts[tid] = count

        if redis_client:
            try:
                await redis_client.setex(self._get_member_count_cache_key(tid), ttl_seconds, str(count))
            except Exception as e:
                logger.warning(f"Redis cache write failed for team {tid}: {e}")

    return counts

2085 

async def invalidate_team_member_count_cache(self, team_id: str) -> None:
    """Drop the cached member count for one team.

    Intended to be called after membership changes (add/remove/update).
    Does nothing when caching is disabled or no Redis client is available;
    Redis errors are logged as warnings and not raised.

    Args:
        team_id: Team UUID to invalidate
    """
    if not getattr(settings, "team_member_count_cache_enabled", True):
        return

    try:
        client = await get_redis_client()
        if not client:
            return
        cache_key = self._get_member_count_cache_key(team_id)
        await client.delete(cache_key)
    except Exception as e:
        logger.warning(f"Failed to invalidate member count cache for team {SecurityValidator.sanitize_log_message(team_id)}: {e}")