Coverage for mcpgateway / routers / teams.py: 100%

411 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/routers/teams.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Team Management Router. 

8This module provides FastAPI routes for team management including 

9team creation, member management, and invitation handling. 

10 

11Examples: 

12 >>> from fastapi import FastAPI 

13 >>> from mcpgateway.routers.teams import teams_router 

14 >>> app = FastAPI() 

15 >>> app.include_router(teams_router, prefix="/teams", tags=["Teams"]) 

16 >>> isinstance(teams_router, APIRouter) 

17 True 

18 >>> len(teams_router.routes) > 10 # Multiple team management endpoints 

19 True 

20""" 

21 

22# Standard 

23from typing import Any, cast, List, Optional, Union 

24 

25# Third-Party 

26from fastapi import APIRouter, Depends, HTTPException, Query, status 

27from sqlalchemy.orm import Session 

28 

29# First-Party 

30from mcpgateway.config import settings 

31from mcpgateway.db import get_db 

32from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_permission 

33from mcpgateway.schemas import ( 

34 CursorPaginatedTeamsResponse, 

35 PaginatedTeamMembersResponse, 

36 SuccessResponse, 

37 TeamCreateRequest, 

38 TeamDiscoveryResponse, 

39 TeamInvitationResponse, 

40 TeamInviteRequest, 

41 TeamJoinRequest, 

42 TeamJoinRequestResponse, 

43 TeamListResponse, 

44 TeamMemberResponse, 

45 TeamMemberUpdateRequest, 

46 TeamResponse, 

47 TeamUpdateRequest, 

48) 

49from mcpgateway.services.logging_service import LoggingService 

50from mcpgateway.services.team_invitation_service import TeamInvitationService 

51from mcpgateway.services.team_management_service import TeamManagementService 

52 

53# Initialize logging 

54logging_service = LoggingService() 

55logger = logging_service.get_logger(__name__) 

56 

57# Create router 

58teams_router = APIRouter() 

59 

60 

61# --------------------------------------------------------------------------- 

62# Team CRUD Operations 

63# --------------------------------------------------------------------------- 

64 

65 

66@teams_router.post("/", response_model=TeamResponse, status_code=status.HTTP_201_CREATED) 

67@require_permission("teams.create") 

68async def create_team(request: TeamCreateRequest, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> TeamResponse: 

69 """Create a new team. 

70 

71 Args: 

72 request: Team creation request data 

73 current_user_ctx: Currently authenticated user context 

74 db: Database session 

75 

76 Returns: 

77 TeamResponse: Created team data 

78 

79 Raises: 

80 HTTPException: If team creation fails 

81 

82 Examples: 

83 >>> import asyncio 

84 >>> asyncio.iscoroutinefunction(create_team) 

85 True 

86 """ 

87 try: 

88 service = TeamManagementService(db) 

89 team = await service.create_team(name=request.name, description=request.description, created_by=current_user_ctx["email"], visibility=request.visibility, max_members=request.max_members) 

90 

91 # Build response BEFORE closing session to avoid lazy-load issues with get_member_count() 

92 response = TeamResponse( 

93 id=team.id, 

94 name=team.name, 

95 slug=team.slug, 

96 description=team.description, 

97 created_by=team.created_by, 

98 is_personal=team.is_personal, 

99 visibility=team.visibility, 

100 max_members=team.max_members, 

101 member_count=team.get_member_count(), 

102 created_at=team.created_at, 

103 updated_at=team.updated_at, 

104 is_active=team.is_active, 

105 ) 

106 db.commit() 

107 db.close() 

108 return response 

109 except ValueError as e: 

110 logger.error(f"Team creation failed: {e}") 

111 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) 

112 except Exception as e: 

113 logger.error(f"Unexpected error creating team: {e}") 

114 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create team") 

115 

116 

117@teams_router.get("/", response_model=Union[TeamListResponse, CursorPaginatedTeamsResponse]) 

118@require_permission("teams.read") 

119async def list_teams( 

120 skip: int = Query(0, ge=0, description="Number of teams to skip"), 

121 limit: int = Query(50, ge=1, le=100, description="Number of teams to return"), 

122 cursor: Optional[str] = Query(None, description="Pagination cursor"), 

123 include_pagination: bool = Query(False, description="Include pagination metadata (cursor)"), 

124 current_user_ctx: dict = Depends(get_current_user_with_permissions), 

125 db: Session = Depends(get_db), 

126) -> Union[TeamListResponse, CursorPaginatedTeamsResponse]: 

127 """List teams visible to the caller. 

128 

129 - Administrators see all non-personal teams (paginated) 

130 - Regular users see only teams they are a member of (paginated client-side) 

131 

132 Args: 

133 skip: Number of teams to skip for pagination 

134 limit: Maximum number of teams to return 

135 cursor: Pagination cursor 

136 include_pagination: Include pagination metadata 

137 current_user_ctx: Current user context with permissions and database session 

138 db: Database session 

139 

140 Returns: 

141 Union[TeamListResponse, CursorPaginatedTeamsResponse]: List of teams 

142 

143 Raises: 

144 HTTPException: If there's an error listing teams 

145 """ 

146 try: 

147 service = TeamManagementService(db) 

148 

149 teams_data = [] 

150 next_cursor = None 

151 total = 0 

152 

153 if current_user_ctx.get("is_admin"): 

154 # Use updated list_teams logic 

155 # If current request uses offset (skip), mapped to offset. 

156 # If cursor, mapped to cursor. 

157 # page is None, so returns Tuple 

158 result = await service.list_teams( 

159 limit=limit, 

160 offset=skip, 

161 cursor=cursor, 

162 ) 

163 # Result is tuple (list, next_cursor) 

164 teams_data, next_cursor = result 

165 

166 # Get accurate total count for API consumers 

167 total = await service.get_teams_count() 

168 else: 

169 # Fallback to user teams and apply pagination locally 

170 user_teams = await service.get_user_teams(current_user_ctx["email"], include_personal=True) 

171 total = len(user_teams) 

172 teams_data = user_teams[skip : skip + limit] 

173 

174 # Batch fetch member counts with caching (N+1 elimination) 

175 team_ids = [str(team.id) for team in teams_data] 

176 member_counts = await service.get_member_counts_batch_cached(team_ids) 

177 

178 team_responses = [ 

179 TeamResponse( 

180 id=team.id, 

181 name=team.name, 

182 slug=team.slug, 

183 description=team.description, 

184 created_by=team.created_by, 

185 is_personal=team.is_personal, 

186 visibility=team.visibility, 

187 max_members=team.max_members, 

188 member_count=member_counts.get(str(team.id), 0), 

189 created_at=team.created_at, 

190 updated_at=team.updated_at, 

191 is_active=team.is_active, 

192 ) 

193 for team in teams_data 

194 ] 

195 

196 # Release transaction before response serialization 

197 db.commit() 

198 db.close() 

199 

200 if include_pagination: 

201 return CursorPaginatedTeamsResponse(teams=team_responses, nextCursor=next_cursor) 

202 

203 return TeamListResponse(teams=team_responses, total=total) 

204 except Exception as e: 

205 logger.error(f"Error listing teams: {e}") 

206 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to list teams") 

207 

208 

209@teams_router.get("/discover", response_model=List[TeamDiscoveryResponse]) 

210@require_permission("teams.read") 

211async def discover_public_teams( 

212 skip: int = Query(0, ge=0, description="Number of teams to skip"), 

213 limit: int = Query(50, ge=1, le=100, description="Number of teams to return"), 

214 current_user_ctx: dict = Depends(get_current_user_with_permissions), 

215 db: Session = Depends(get_db), 

216) -> List[TeamDiscoveryResponse]: 

217 """Discover public teams that can be joined. 

218 

219 Returns public teams that are discoverable to all authenticated users. 

220 Only shows teams where the current user is not already a member. 

221 

222 Args: 

223 skip: Number of teams to skip for pagination 

224 limit: Maximum number of teams to return 

225 current_user_ctx: Current user context with permissions and database session 

226 db: Database session 

227 

228 Returns: 

229 List[TeamDiscoveryResponse]: List of discoverable public teams 

230 

231 Raises: 

232 HTTPException: If there's an error discovering teams 

233 """ 

234 try: 

235 team_service = TeamManagementService(db) 

236 

237 # Get public teams where user is not already a member 

238 public_teams = await team_service.discover_public_teams(current_user_ctx["email"], skip=skip, limit=limit) 

239 

240 # Batch fetch member counts with caching (N+1 elimination) 

241 team_ids = [str(team.id) for team in public_teams] 

242 member_counts = await team_service.get_member_counts_batch_cached(team_ids) 

243 

244 discovery_responses = [] 

245 for team in public_teams: 

246 discovery_responses.append( 

247 TeamDiscoveryResponse( 

248 id=team.id, 

249 name=team.name, 

250 description=team.description, 

251 member_count=member_counts.get(str(team.id), 0), 

252 created_at=team.created_at, 

253 is_joinable=True, # All returned teams are joinable 

254 ) 

255 ) 

256 

257 # Release transaction before response serialization 

258 db.commit() 

259 db.close() 

260 

261 return discovery_responses 

262 except Exception as e: 

263 logger.error(f"Error discovering public teams: {e}") 

264 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to discover teams") 

265 

266 

267@teams_router.get("/{team_id}", response_model=TeamResponse) 

268@require_permission("teams.read") 

269async def get_team(team_id: str, current_user: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> TeamResponse: 

270 """Get a specific team by ID. 

271 

272 Args: 

273 team_id: Team UUID 

274 current_user: Authenticated user context dict with email and permissions 

275 db: Database session 

276 

277 Returns: 

278 TeamResponse: Team data 

279 

280 Raises: 

281 HTTPException: If team not found or access denied 

282 """ 

283 try: 

284 service = TeamManagementService(db) 

285 team = await service.get_team_by_id(team_id) 

286 

287 if not team: 

288 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found") 

289 

290 # Check if user has access to the team 

291 user_role = await service.get_user_role_in_team(current_user["email"], team_id) 

292 if not user_role: 

293 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to team") 

294 

295 team_obj = cast(Any, team) 

296 # Build response BEFORE closing session to avoid lazy-load issues with get_member_count() 

297 response = TeamResponse( 

298 id=team_obj.id, 

299 name=team_obj.name, 

300 slug=team_obj.slug, 

301 description=team_obj.description, 

302 created_by=team_obj.created_by, 

303 is_personal=team_obj.is_personal, 

304 visibility=team_obj.visibility, 

305 max_members=team_obj.max_members, 

306 member_count=team_obj.get_member_count(), 

307 created_at=team_obj.created_at, 

308 updated_at=team_obj.updated_at, 

309 is_active=team_obj.is_active, 

310 ) 

311 db.commit() 

312 db.close() 

313 return response 

314 except HTTPException: 

315 raise 

316 except Exception as e: 

317 logger.error(f"Error getting team {team_id}: {e}") 

318 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to get team") 

319 

320 

321@teams_router.put("/{team_id}", response_model=TeamResponse) 

322@require_permission("teams.update") 

323async def update_team(team_id: str, request: TeamUpdateRequest, current_user: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> TeamResponse: 

324 """Update a team. 

325 

326 Args: 

327 team_id: Team UUID 

328 request: Team update request data 

329 current_user: Authenticated user context dict with email and permissions 

330 db: Database session 

331 

332 Returns: 

333 TeamResponse: Updated team data 

334 

335 Raises: 

336 HTTPException: If team not found, access denied, or update fails 

337 """ 

338 try: 

339 service = TeamManagementService(db) 

340 

341 # Check if user is team owner 

342 role = await service.get_user_role_in_team(current_user["email"], team_id) 

343 if role != "owner": 

344 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions") 

345 

346 success = await service.update_team(team_id=team_id, name=request.name, description=request.description, visibility=request.visibility, max_members=request.max_members) 

347 

348 if not success: 

349 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found or update failed") 

350 

351 # Fetch the updated team to build the response 

352 team = await service.get_team_by_id(team_id) 

353 if not team: 

354 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found after update") 

355 

356 team_obj = cast(Any, team) 

357 # Build response BEFORE closing session to avoid lazy-load issues with get_member_count() 

358 response = TeamResponse( 

359 id=team_obj.id, 

360 name=team_obj.name, 

361 slug=team_obj.slug, 

362 description=team_obj.description, 

363 created_by=team_obj.created_by, 

364 is_personal=team_obj.is_personal, 

365 visibility=team_obj.visibility, 

366 max_members=team_obj.max_members, 

367 member_count=team_obj.get_member_count(), 

368 created_at=team_obj.created_at, 

369 updated_at=team_obj.updated_at, 

370 is_active=team_obj.is_active, 

371 ) 

372 db.commit() 

373 db.close() 

374 return response 

375 except HTTPException: 

376 raise 

377 except ValueError as e: 

378 logger.error(f"Team update failed: {e}") 

379 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) 

380 except Exception as e: 

381 logger.error(f"Error updating team {team_id}: {e}") 

382 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to update team") 

383 

384 

385@teams_router.delete("/{team_id}", response_model=SuccessResponse) 

386@require_permission("teams.delete") 

387async def delete_team(team_id: str, current_user: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> SuccessResponse: 

388 """Delete a team. 

389 

390 Args: 

391 team_id: Team UUID 

392 current_user: Authenticated user context dict with email and permissions 

393 db: Database session 

394 

395 Returns: 

396 SuccessResponse: Success confirmation 

397 

398 Raises: 

399 HTTPException: If team not found, access denied, or deletion fails 

400 """ 

401 try: 

402 service = TeamManagementService(db) 

403 

404 # Check if user is team owner 

405 role = await service.get_user_role_in_team(current_user["email"], team_id) 

406 if role != "owner": 

407 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Only team owners can delete teams") 

408 

409 success = await service.delete_team(team_id, current_user["email"]) 

410 if not success: 

411 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found") 

412 

413 db.commit() 

414 db.close() 

415 return SuccessResponse(message="Team deleted successfully") 

416 except HTTPException: 

417 raise 

418 except Exception as e: 

419 logger.error(f"Error deleting team {team_id}: {e}") 

420 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to delete team") 

421 

422 

423# --------------------------------------------------------------------------- 

424# Team Member Management 

425# --------------------------------------------------------------------------- 

426 

427 

428@teams_router.get("/{team_id}/members", response_model=Union[PaginatedTeamMembersResponse, List[TeamMemberResponse]]) 

429@require_permission("teams.read") 

430async def list_team_members( 

431 team_id: str, 

432 cursor: Optional[str] = Query(None, description="Cursor for pagination"), 

433 limit: Optional[int] = Query(None, ge=1, le=settings.pagination_max_page_size, description="Maximum number of members to return (default: 50)"), 

434 include_pagination: bool = Query(False, description="Include cursor pagination metadata in response"), 

435 current_user: dict = Depends(get_current_user_with_permissions), 

436 db: Session = Depends(get_db), 

437) -> Union[PaginatedTeamMembersResponse, List[TeamMemberResponse]]: 

438 """List team members with cursor-based pagination. 

439 

440 Args: 

441 team_id: Team UUID 

442 cursor: Pagination cursor for fetching the next set of results 

443 limit: Maximum number of members to return (default: 50) 

444 include_pagination: Whether to include cursor pagination metadata in the response (default: false) 

445 current_user: Authenticated user context dict with email and permissions 

446 db: Database session 

447 

448 Returns: 

449 PaginatedTeamMembersResponse with members and nextCursor if include_pagination=true, or 

450 List of team members if include_pagination=false 

451 

452 Raises: 

453 HTTPException: If team not found or access denied 

454 """ 

455 try: 

456 service = TeamManagementService(db) 

457 

458 # Check if user has access to the team 

459 user_role = await service.get_user_role_in_team(current_user["email"], team_id) 

460 if not user_role: 

461 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to team") 

462 

463 # Get members - service returns different types based on parameters: 

464 # - cursor=None, limit=None: List[Tuple] (backward compat) 

465 # - cursor or limit provided: Tuple[List[Tuple], next_cursor] 

466 result = await service.get_team_members(team_id, cursor=cursor, limit=limit) 

467 

468 # Handle different return types from service 

469 if cursor is not None or limit is not None: 

470 # Cursor pagination was used - result is a tuple 

471 members, next_cursor = result 

472 else: 

473 # No pagination - result is a plain list 

474 members = result 

475 next_cursor = None 

476 

477 # Convert to response objects 

478 member_responses = [] 

479 for user, membership in members: 

480 member_responses.append( 

481 TeamMemberResponse( 

482 id=membership.id, 

483 team_id=membership.team_id, 

484 user_email=membership.user_email, 

485 role=membership.role, 

486 joined_at=membership.joined_at, 

487 invited_by=membership.invited_by, 

488 is_active=membership.is_active, 

489 ) 

490 ) 

491 

492 # Return with pagination metadata if requested 

493 db.commit() 

494 db.close() 

495 if include_pagination: 

496 return PaginatedTeamMembersResponse(members=member_responses, nextCursor=next_cursor) 

497 

498 return member_responses 

499 except HTTPException: 

500 raise 

501 except Exception as e: 

502 logger.error(f"Error listing team members for team {team_id}: {e}") 

503 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to list team members") 

504 

505 

506@teams_router.put("/{team_id}/members/{user_email}", response_model=TeamMemberResponse) 

507@require_permission("teams.manage_members") 

508async def update_team_member( 

509 team_id: str, user_email: str, request: TeamMemberUpdateRequest, current_user: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db) 

510) -> TeamMemberResponse: 

511 """Update a team member's role. 

512 

513 Args: 

514 team_id: Team UUID 

515 user_email: Email of the member to update 

516 request: Member update request data 

517 current_user: Authenticated user context dict with email and permissions 

518 db: Database session 

519 

520 Returns: 

521 TeamMemberResponse: Updated member data 

522 

523 Raises: 

524 HTTPException: If member not found, access denied, or update fails 

525 """ 

526 try: 

527 service = TeamManagementService(db) 

528 

529 # Check if user is team owner 

530 role = await service.get_user_role_in_team(current_user["email"], team_id) 

531 if role != "owner": 

532 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions") 

533 

534 success = await service.update_member_role(team_id, user_email, request.role) 

535 if not success: 

536 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team member not found or update failed") 

537 

538 # Fetch the updated member to build the response 

539 member = await service.get_member(team_id, user_email) 

540 if not member: 

541 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team member not found after update") 

542 

543 mm = cast(Any, member) 

544 db.commit() 

545 db.close() 

546 return TeamMemberResponse(id=mm.id, team_id=mm.team_id, user_email=mm.user_email, role=mm.role, joined_at=mm.joined_at, invited_by=mm.invited_by, is_active=mm.is_active) 

547 except HTTPException: 

548 raise 

549 except ValueError as e: 

550 logger.error(f"Member update failed: {e}") 

551 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) 

552 except Exception as e: 

553 logger.error(f"Error updating team member {user_email} in team {team_id}: {e}") 

554 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to update team member") 

555 

556 

557@teams_router.delete("/{team_id}/members/{user_email}", response_model=SuccessResponse) 

558@require_permission("teams.manage_members") 

559async def remove_team_member(team_id: str, user_email: str, current_user: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> SuccessResponse: 

560 """Remove a team member. 

561 

562 Args: 

563 team_id: Team UUID 

564 user_email: Email of the member to remove 

565 current_user: Authenticated user context dict with email and permissions 

566 db: Database session 

567 

568 Returns: 

569 SuccessResponse: Success confirmation 

570 

571 Raises: 

572 HTTPException: If member not found, access denied, or removal fails 

573 """ 

574 try: 

575 service = TeamManagementService(db) 

576 

577 # Users can remove themselves, or owners can remove others 

578 current_user_role = await service.get_user_role_in_team(current_user["email"], team_id) 

579 if current_user["email"] != user_email and current_user_role != "owner": 

580 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions") 

581 

582 success = await service.remove_member_from_team(team_id, user_email) 

583 if not success: 

584 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team member not found") 

585 

586 db.commit() 

587 db.close() 

588 return SuccessResponse(message="Team member removed successfully") 

589 except HTTPException: 

590 raise 

591 except Exception as e: 

592 logger.error(f"Error removing team member {user_email} from team {team_id}: {e}") 

593 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to remove team member") 

594 

595 

596# --------------------------------------------------------------------------- 

597# Team Invitations 

598# --------------------------------------------------------------------------- 

599 

600 

601@teams_router.post("/{team_id}/invitations", response_model=TeamInvitationResponse, status_code=status.HTTP_201_CREATED) 

602@require_permission("teams.manage_members") 

603async def invite_team_member(team_id: str, request: TeamInviteRequest, current_user: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> TeamInvitationResponse: 

604 """Invite a user to join a team. 

605 

606 Args: 

607 team_id: Team UUID 

608 request: Invitation request data 

609 current_user: Authenticated user context dict with email and permissions 

610 db: Database session 

611 

612 Returns: 

613 TeamInvitationResponse: Created invitation data 

614 

615 Raises: 

616 HTTPException: If team not found, access denied, or invitation fails 

617 """ 

618 try: 

619 team_service = TeamManagementService(db) 

620 invitation_service = TeamInvitationService(db) 

621 

622 # Check if user is team owner 

623 role = await team_service.get_user_role_in_team(current_user["email"], team_id) 

624 if role != "owner": 

625 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions") 

626 

627 invitation = await invitation_service.create_invitation(team_id=team_id, email=str(request.email), role=request.role, invited_by=current_user["email"]) 

628 if not invitation: 

629 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create invitation") 

630 

631 # Get team name for response 

632 team = await team_service.get_team_by_id(team_id) 

633 team_name = team.name if team else "Unknown Team" 

634 

635 db.commit() 

636 db.close() 

637 return TeamInvitationResponse( 

638 id=invitation.id, 

639 team_id=invitation.team_id, 

640 team_name=team_name, 

641 email=invitation.email, 

642 role=invitation.role, 

643 invited_by=invitation.invited_by, 

644 invited_at=invitation.invited_at, 

645 expires_at=invitation.expires_at, 

646 token=invitation.token, 

647 is_active=invitation.is_active, 

648 is_expired=invitation.is_expired(), 

649 ) 

650 except HTTPException: 

651 raise 

652 except ValueError as e: 

653 logger.error(f"Team invitation failed: {e}") 

654 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) 

655 except Exception as e: 

656 logger.error(f"Error creating team invitation for team {team_id}: {e}") 

657 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create invitation") 

658 

659 

660@teams_router.get("/{team_id}/invitations", response_model=List[TeamInvitationResponse]) 

661@require_permission("teams.read") 

662async def list_team_invitations(team_id: str, current_user: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> List[TeamInvitationResponse]: 

663 """List team invitations. 

664 

665 Args: 

666 team_id: Team UUID 

667 current_user: Authenticated user context dict with email and permissions 

668 db: Database session 

669 

670 Returns: 

671 List[TeamInvitationResponse]: List of team invitations 

672 

673 Raises: 

674 HTTPException: If team not found or access denied 

675 """ 

676 try: 

677 team_service = TeamManagementService(db) 

678 invitation_service = TeamInvitationService(db) 

679 

680 # Check if user is team owner 

681 role = await team_service.get_user_role_in_team(current_user["email"], team_id) 

682 if role != "owner": 

683 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions") 

684 

685 invitations = await invitation_service.get_team_invitations(team_id) 

686 

687 # Get team name for responses 

688 team = await team_service.get_team_by_id(team_id) 

689 team_name = team.name if team else "Unknown Team" 

690 

691 invitation_responses = [] 

692 for invitation in invitations: 

693 invitation_responses.append( 

694 TeamInvitationResponse( 

695 id=invitation.id, 

696 team_id=invitation.team_id, 

697 team_name=team_name, 

698 email=invitation.email, 

699 role=invitation.role, 

700 invited_by=invitation.invited_by, 

701 invited_at=invitation.invited_at, 

702 expires_at=invitation.expires_at, 

703 token=invitation.token, 

704 is_active=invitation.is_active, 

705 is_expired=invitation.is_expired(), 

706 ) 

707 ) 

708 

709 db.commit() 

710 db.close() 

711 return invitation_responses 

712 except HTTPException: 

713 raise 

714 except Exception as e: 

715 logger.error(f"Error listing team invitations for team {team_id}: {e}") 

716 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to list invitations") 

717 

718 

719@teams_router.post("/invitations/{token}/accept", response_model=TeamMemberResponse) 

720@require_permission("teams.read") 

721async def accept_team_invitation(token: str, current_user: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> TeamMemberResponse: 

722 """Accept a team invitation. 

723 

724 Args: 

725 token: Invitation token 

726 current_user: Authenticated user context dict with email and permissions 

727 db: Database session 

728 

729 Returns: 

730 TeamMemberResponse: New team member data 

731 

732 Raises: 

733 HTTPException: If invitation not found, expired, or acceptance fails 

734 """ 

735 try: 

736 invitation_service = TeamInvitationService(db) 

737 

738 member = await invitation_service.accept_invitation(token, current_user["email"]) 

739 if not member or not hasattr(member, "id"): 

740 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Invalid or expired invitation") 

741 

742 mm = cast(Any, member) 

743 db.commit() 

744 db.close() 

745 return TeamMemberResponse(id=mm.id, team_id=mm.team_id, user_email=mm.user_email, role=mm.role, joined_at=mm.joined_at, invited_by=mm.invited_by, is_active=mm.is_active) 

746 except HTTPException: 

747 raise 

748 except ValueError as e: 

749 logger.error(f"Invitation acceptance failed: {e}") 

750 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) 

751 except Exception as e: 

752 logger.error(f"Error accepting invitation {token}: {e}") 

753 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to accept invitation") 

754 

755 

756@teams_router.delete("/invitations/{invitation_id}", response_model=SuccessResponse) 

757@require_permission("teams.manage_members") 

758async def cancel_team_invitation(invitation_id: str, current_user: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> SuccessResponse: 

759 """Cancel a team invitation. 

760 

761 Args: 

762 invitation_id: Invitation UUID 

763 current_user: Authenticated user context dict with email and permissions 

764 db: Database session 

765 

766 Returns: 

767 SuccessResponse: Success confirmation 

768 

769 Raises: 

770 HTTPException: If invitation not found, access denied, or cancellation fails 

771 """ 

772 try: 

773 team_service = TeamManagementService(db) 

774 invitation_service = TeamInvitationService(db) 

775 

776 # Get invitation to check team permissions 

777 # First-Party 

778 from mcpgateway.db import EmailTeamInvitation 

779 

780 invitation = db.query(EmailTeamInvitation).filter(EmailTeamInvitation.id == invitation_id).first() 

781 if not invitation: 

782 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Invitation not found") 

783 

784 # Check if user is team owner or the inviter 

785 role = await team_service.get_user_role_in_team(current_user["email"], invitation.team_id) 

786 if role != "owner" and current_user["email"] != invitation.invited_by: 

787 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions") 

788 

789 success = await invitation_service.revoke_invitation(invitation_id, current_user["email"]) 

790 if not success: 

791 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Invitation not found") 

792 

793 db.commit() 

794 db.close() 

795 return SuccessResponse(message="Team invitation cancelled successfully") 

796 except HTTPException: 

797 raise 

798 except Exception as e: 

799 logger.error(f"Error cancelling invitation {invitation_id}: {e}") 

800 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to cancel invitation") 

801 

802 

803@teams_router.post("/{team_id}/join", response_model=TeamJoinRequestResponse) 

804async def request_to_join_team( 

805 team_id: str, 

806 join_request: TeamJoinRequest, 

807 current_user: dict = Depends(get_current_user_with_permissions), 

808 db: Session = Depends(get_db), 

809) -> TeamJoinRequestResponse: 

810 """Request to join a public team. 

811 

812 Allows users to request membership in public teams. The request will be 

813 pending until approved by a team owner. 

814 

815 Args: 

816 team_id: ID of the team to join 

817 join_request: Join request details including optional message 

818 current_user: Currently authenticated user 

819 db: Database session 

820 

821 Returns: 

822 TeamJoinRequestResponse: Created join request details 

823 

824 Raises: 

825 HTTPException: If team not found, not public, user already member, or request fails 

826 """ 

827 try: 

828 team_service = TeamManagementService(db) 

829 

830 # Validate team exists and is public 

831 team = await team_service.get_team_by_id(team_id) 

832 if not team: 

833 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found") 

834 

835 if team.visibility != "public": 

836 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Can only request to join public teams") 

837 

838 # Check if user is already a member 

839 user_role = await team_service.get_user_role_in_team(current_user["email"], team_id) 

840 if user_role: 

841 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="User is already a member of this team") 

842 

843 # Create join request 

844 join_req = await team_service.create_join_request(team_id=team_id, user_email=current_user["email"], message=join_request.message) 

845 

846 db.commit() 

847 db.close() 

848 return TeamJoinRequestResponse( 

849 id=join_req.id, 

850 team_id=join_req.team_id, 

851 team_name=team.name, 

852 user_email=join_req.user_email, 

853 message=join_req.message, 

854 status=join_req.status, 

855 requested_at=join_req.requested_at, 

856 expires_at=join_req.expires_at, 

857 ) 

858 except HTTPException: 

859 raise 

860 except Exception as e: 

861 logger.error(f"Error creating join request for team {team_id}: {e}") 

862 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create join request") 

863 

864 

865@teams_router.delete("/{team_id}/leave", response_model=SuccessResponse) 

866async def leave_team( 

867 team_id: str, 

868 current_user: dict = Depends(get_current_user_with_permissions), 

869 db: Session = Depends(get_db), 

870) -> SuccessResponse: 

871 """Leave a team. 

872 

873 Allows users to remove themselves from a team. Cannot leave personal teams 

874 or if they are the last owner of a team. 

875 

876 Args: 

877 team_id: ID of the team to leave 

878 current_user: Currently authenticated user 

879 db: Database session 

880 

881 Returns: 

882 SuccessResponse: Confirmation of leaving the team 

883 

884 Raises: 

885 HTTPException: If team not found, user not member, cannot leave personal team, or last owner 

886 """ 

887 try: 

888 team_service = TeamManagementService(db) 

889 

890 # Validate team exists 

891 team = await team_service.get_team_by_id(team_id) 

892 if not team: 

893 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found") 

894 

895 # Cannot leave personal team 

896 if team.is_personal: 

897 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot leave personal team") 

898 

899 # Check if user is member 

900 user_role = await team_service.get_user_role_in_team(current_user["email"], team_id) 

901 if not user_role: 

902 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="User is not a member of this team") 

903 

904 # Remove user from team 

905 success = await team_service.remove_member_from_team(team_id, current_user["email"], removed_by=current_user["email"]) 

906 if not success: 

907 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot leave team - you may be the last owner") 

908 

909 db.commit() 

910 db.close() 

911 return SuccessResponse(message="Successfully left the team") 

912 except HTTPException: 

913 raise 

914 except Exception as e: 

915 logger.error(f"Error leaving team {team_id}: {e}") 

916 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to leave team") 

917 

918 

919@teams_router.get("/{team_id}/join-requests", response_model=List[TeamJoinRequestResponse]) 

920@require_permission("teams.manage_members") 

921async def list_team_join_requests( 

922 team_id: str, 

923 current_user: dict = Depends(get_current_user_with_permissions), 

924 db: Session = Depends(get_db), 

925) -> List[TeamJoinRequestResponse]: 

926 """List pending join requests for a team. 

927 

928 Only team owners can view join requests for their teams. 

929 

930 Args: 

931 team_id: ID of the team 

932 current_user: Authenticated user context dict with email and permissions 

933 db: Database session 

934 

935 Returns: 

936 List[TeamJoinRequestResponse]: List of pending join requests 

937 

938 Raises: 

939 HTTPException: If team not found or user not authorized 

940 """ 

941 try: 

942 team_service = TeamManagementService(db) 

943 

944 # Validate team exists and user is owner 

945 team = await team_service.get_team_by_id(team_id) 

946 if not team: 

947 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found") 

948 

949 user_role = await team_service.get_user_role_in_team(current_user["email"], team_id) 

950 if user_role != "owner": 

951 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Only team owners can view join requests") 

952 

953 # Get join requests 

954 join_requests = await team_service.list_join_requests(team_id) 

955 

956 result = [ 

957 TeamJoinRequestResponse( 

958 id=req.id, 

959 team_id=req.team_id, 

960 team_name=team.name, 

961 user_email=req.user_email, 

962 message=req.message, 

963 status=req.status, 

964 requested_at=req.requested_at, 

965 expires_at=req.expires_at, 

966 ) 

967 for req in join_requests 

968 ] 

969 db.commit() 

970 db.close() 

971 return result 

972 except HTTPException: 

973 raise 

974 except Exception as e: 

975 logger.error(f"Error listing join requests for team {team_id}: {e}") 

976 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to list join requests") 

977 

978 

979@teams_router.post("/{team_id}/join-requests/{request_id}/approve", response_model=TeamMemberResponse) 

980@require_permission("teams.manage_members") 

981async def approve_join_request( 

982 team_id: str, 

983 request_id: str, 

984 current_user: dict = Depends(get_current_user_with_permissions), 

985 db: Session = Depends(get_db), 

986) -> TeamMemberResponse: 

987 """Approve a team join request. 

988 

989 Only team owners can approve join requests for their teams. 

990 

991 Args: 

992 team_id: ID of the team 

993 request_id: ID of the join request 

994 current_user: Authenticated user context dict with email and permissions 

995 db: Database session 

996 

997 Returns: 

998 TeamMemberResponse: New team member data 

999 

1000 Raises: 

1001 HTTPException: If request not found or user not authorized 

1002 """ 

1003 try: 

1004 team_service = TeamManagementService(db) 

1005 

1006 # Validate team exists and user is owner 

1007 team = await team_service.get_team_by_id(team_id) 

1008 if not team: 

1009 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found") 

1010 

1011 user_role = await team_service.get_user_role_in_team(current_user["email"], team_id) 

1012 if user_role != "owner": 

1013 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Only team owners can approve join requests") 

1014 

1015 # Approve join request 

1016 member = await team_service.approve_join_request(request_id, approved_by=current_user["email"]) 

1017 if not member: 

1018 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Join request not found") 

1019 

1020 db.commit() 

1021 db.close() 

1022 return TeamMemberResponse( 

1023 id=member.id, 

1024 team_id=member.team_id, 

1025 user_email=member.user_email, 

1026 role=member.role, 

1027 joined_at=member.joined_at, 

1028 invited_by=member.invited_by, 

1029 is_active=member.is_active, 

1030 ) 

1031 except HTTPException: 

1032 raise 

1033 except Exception as e: 

1034 logger.error(f"Error approving join request {request_id}: {e}") 

1035 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to approve join request") 

1036 

1037 

1038@teams_router.delete("/{team_id}/join-requests/{request_id}", response_model=SuccessResponse) 

1039@require_permission("teams.manage_members") 

1040async def reject_join_request( 

1041 team_id: str, 

1042 request_id: str, 

1043 current_user: dict = Depends(get_current_user_with_permissions), 

1044 db: Session = Depends(get_db), 

1045) -> SuccessResponse: 

1046 """Reject a team join request. 

1047 

1048 Only team owners can reject join requests for their teams. 

1049 

1050 Args: 

1051 team_id: ID of the team 

1052 request_id: ID of the join request 

1053 current_user: Authenticated user context dict with email and permissions 

1054 db: Database session 

1055 

1056 Returns: 

1057 SuccessResponse: Confirmation of rejection 

1058 

1059 Raises: 

1060 HTTPException: If request not found or user not authorized 

1061 """ 

1062 try: 

1063 team_service = TeamManagementService(db) 

1064 

1065 # Validate team exists and user is owner 

1066 team = await team_service.get_team_by_id(team_id) 

1067 if not team: 

1068 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found") 

1069 

1070 user_role = await team_service.get_user_role_in_team(current_user["email"], team_id) 

1071 if user_role != "owner": 

1072 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Only team owners can reject join requests") 

1073 

1074 # Reject join request 

1075 success = await team_service.reject_join_request(request_id, rejected_by=current_user["email"]) 

1076 if not success: 

1077 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Join request not found") 

1078 

1079 db.commit() 

1080 db.close() 

1081 return SuccessResponse(message="Join request rejected successfully") 

1082 except HTTPException: 

1083 raise 

1084 except Exception as e: 

1085 logger.error(f"Error rejecting join request {request_id}: {e}") 

1086 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to reject join request")