Coverage for mcpgateway / routers / teams.py: 100%
450 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/routers/teams.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Team Management Router.
8This module provides FastAPI routes for team management including
9team creation, member management, and invitation handling.
11Examples:
12 >>> from fastapi import FastAPI
13 >>> from mcpgateway.routers.teams import teams_router
14 >>> app = FastAPI()
15 >>> app.include_router(teams_router, prefix="/teams", tags=["Teams"])
16 >>> isinstance(teams_router, APIRouter)
17 True
18 >>> len(teams_router.routes) > 10 # Multiple team management endpoints
19 True
20"""
22# Standard
23from typing import Any, cast, List, Optional, Union
25# Third-Party
26from fastapi import APIRouter, Depends, HTTPException, Query, status
27from sqlalchemy.orm import Session
29# First-Party
30from mcpgateway.config import settings
31from mcpgateway.db import get_db
32from mcpgateway.middleware.rbac import _ACCESS_DENIED_MSG, get_current_user_with_permissions, require_permission
33from mcpgateway.schemas import (
34 CursorPaginatedTeamsResponse,
35 PaginatedTeamMembersResponse,
36 SuccessResponse,
37 TeamCreateRequest,
38 TeamDiscoveryResponse,
39 TeamInvitationResponse,
40 TeamInviteRequest,
41 TeamJoinRequest,
42 TeamJoinRequestResponse,
43 TeamListResponse,
44 TeamMemberAddRequest,
45 TeamMemberResponse,
46 TeamMemberUpdateRequest,
47 TeamResponse,
48 TeamUpdateRequest,
49)
50from mcpgateway.services.logging_service import LoggingService
51from mcpgateway.services.team_invitation_service import TeamInvitationService
52from mcpgateway.services.team_management_service import (
53 InvalidRoleError,
54 MemberAlreadyExistsError,
55 TeamManagementError,
56 TeamManagementService,
57 TeamMemberAddError,
58 TeamMemberLimitExceededError,
59 TeamNotFoundError,
60 UserNotFoundError,
61)
# Initialize logging: create a LoggingService instance and take a logger named after this module
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

# Create router: every endpoint defined below registers itself on this APIRouter via its decorator
teams_router = APIRouter()
71# ---------------------------------------------------------------------------
72# Team CRUD Operations
73# ---------------------------------------------------------------------------
@teams_router.post("/", response_model=TeamResponse, status_code=status.HTTP_201_CREATED)
@require_permission("teams.create")
async def create_team(request: TeamCreateRequest, current_user_ctx: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> TeamResponse:
    """Create a team on behalf of the authenticated caller.

    When ``settings.allow_team_creation`` is off, only callers whose context
    has a truthy ``is_admin`` entry may create teams. The same ``is_admin``
    flag is forwarded to the service as ``skip_limits``.

    Args:
        request: Team creation request data
        current_user_ctx: Currently authenticated user context
        db: Database session

    Returns:
        TeamResponse: Created team data

    Raises:
        HTTPException: 403 if team creation is disabled for the caller,
            400 if the service raises ``ValueError``, 500 on any other failure

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(create_team)
        True
    """
    try:
        creation_permitted = settings.allow_team_creation or current_user_ctx.get("is_admin")
        if not creation_permitted:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Team creation is currently disabled")

        new_team = await TeamManagementService(db).create_team(
            name=request.name,
            description=request.description,
            created_by=current_user_ctx["email"],
            visibility=request.visibility,
            max_members=request.max_members,
            skip_limits=bool(current_user_ctx.get("is_admin")),
        )

        # Read every field while the session is still open, so get_member_count()
        # does not run against a closed session.
        team_fields = {
            "id": new_team.id,
            "name": new_team.name,
            "slug": new_team.slug,
            "description": new_team.description,
            "created_by": new_team.created_by,
            "is_personal": new_team.is_personal,
            "visibility": new_team.visibility,
            "max_members": new_team.max_members,
            "member_count": new_team.get_member_count(),
            "created_at": new_team.created_at,
            "updated_at": new_team.updated_at,
            "is_active": new_team.is_active,
        }
        created = TeamResponse(**team_fields)

        db.commit()
        db.close()
        return created
    except HTTPException:
        # Already an HTTP error with the intended status; pass it through untouched.
        raise
    except ValueError as exc:
        logger.error(f"Team creation failed: {exc}")
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc))
    except Exception as exc:
        logger.error(f"Unexpected error creating team: {exc}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create team")
@teams_router.get("/", response_model=Union[TeamListResponse, CursorPaginatedTeamsResponse])
@require_permission("teams.read")
async def list_teams(
    skip: int = Query(0, ge=0, description="Number of teams to skip"),
    limit: int = Query(50, ge=1, le=100, description="Number of teams to return"),
    cursor: Optional[str] = Query(None, description="Pagination cursor"),
    include_pagination: bool = Query(False, description="Include pagination metadata (cursor)"),
    current_user_ctx: dict = Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> Union[TeamListResponse, CursorPaginatedTeamsResponse]:
    """List the teams visible to the caller.

    - Admin callers get the page returned by the service's ``list_teams``
      (offset or cursor driven) together with the service-wide team count.
    - Other callers get their own teams, personal ones included, sliced
      locally with ``skip``/``limit``; ``cursor`` is not used on this path and
      the returned next cursor is always ``None``.

    Args:
        skip: Number of teams to skip for pagination
        limit: Maximum number of teams to return
        cursor: Pagination cursor
        include_pagination: Include pagination metadata
        current_user_ctx: Current user context with permissions
        db: Database session

    Returns:
        Union[TeamListResponse, CursorPaginatedTeamsResponse]: The cursor
        variant when ``include_pagination`` is true, otherwise teams plus total

    Raises:
        HTTPException: 500 if anything fails while listing teams
    """
    try:
        team_service = TeamManagementService(db)

        if current_user_ctx.get("is_admin"):
            # With no page argument the service returns a (teams, next_cursor) tuple.
            page, next_cursor = await team_service.list_teams(
                limit=limit,
                offset=skip,
                cursor=cursor,
            )
            # Separate count query so API consumers get an accurate total.
            total = await team_service.get_teams_count()
        else:
            own_teams = await team_service.get_user_teams(current_user_ctx["email"], include_personal=True)
            total = len(own_teams)
            page = own_teams[skip : skip + limit]
            next_cursor = None

        # One batched (cached) lookup instead of a member-count query per team.
        counts_by_team = await team_service.get_member_counts_batch_cached([str(entry.id) for entry in page])

        team_responses = []
        for entry in page:
            team_responses.append(
                TeamResponse(
                    id=entry.id,
                    name=entry.name,
                    slug=entry.slug,
                    description=entry.description,
                    created_by=entry.created_by,
                    is_personal=entry.is_personal,
                    visibility=entry.visibility,
                    max_members=entry.max_members,
                    member_count=counts_by_team.get(str(entry.id), 0),
                    created_at=entry.created_at,
                    updated_at=entry.updated_at,
                    is_active=entry.is_active,
                )
            )

        # Release the transaction before the response is serialized.
        db.commit()
        db.close()

        if not include_pagination:
            return TeamListResponse(teams=team_responses, total=total)
        return CursorPaginatedTeamsResponse(teams=team_responses, nextCursor=next_cursor)
    except Exception as exc:
        logger.error(f"Error listing teams: {exc}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to list teams")
@teams_router.get("/discover", response_model=List[TeamDiscoveryResponse])
@require_permission("teams.read")
async def discover_public_teams(
    skip: int = Query(0, ge=0, description="Number of teams to skip"),
    limit: int = Query(50, ge=1, le=100, description="Number of teams to return"),
    current_user_ctx: dict = Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> List[TeamDiscoveryResponse]:
    """Return the public teams the service reports as discoverable for the caller.

    The service is queried with the caller's email plus ``skip``/``limit``;
    every team it returns is reported with ``is_joinable=True``.

    Args:
        skip: Number of teams to skip for pagination
        limit: Maximum number of teams to return
        current_user_ctx: Current user context with permissions
        db: Database session

    Returns:
        List[TeamDiscoveryResponse]: List of discoverable public teams

    Raises:
        HTTPException: 500 if anything fails while discovering teams
    """
    try:
        service = TeamManagementService(db)
        candidates = await service.discover_public_teams(current_user_ctx["email"], skip=skip, limit=limit)

        # One batched (cached) lookup instead of a member-count query per team.
        counts_by_team = await service.get_member_counts_batch_cached([str(candidate.id) for candidate in candidates])

        discovered = [
            TeamDiscoveryResponse(
                id=candidate.id,
                name=candidate.name,
                description=candidate.description,
                member_count=counts_by_team.get(str(candidate.id), 0),
                created_at=candidate.created_at,
                is_joinable=True,  # All returned teams are joinable
            )
            for candidate in candidates
        ]

        # Release the transaction before the response is serialized.
        db.commit()
        db.close()

        return discovered
    except Exception as exc:
        logger.error(f"Error discovering public teams: {exc}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to discover teams")
@teams_router.get("/{team_id}", response_model=TeamResponse)
@require_permission("teams.read")
async def get_team(team_id: str, current_user: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> TeamResponse:
    """Fetch a single team by ID for a caller who holds a role in it.

    Args:
        team_id: Team UUID
        current_user: Authenticated user context dict with email and permissions
        db: Database session

    Returns:
        TeamResponse: Team data

    Raises:
        HTTPException: 404 if the team does not exist, 403 if the caller has
            no role in the team, 500 on any other failure
    """
    try:
        team_service = TeamManagementService(db)

        found = await team_service.get_team_by_id(team_id)
        if not found:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found")

        # Existence is checked first, so a missing team yields 404 rather than 403.
        caller_role = await team_service.get_user_role_in_team(current_user["email"], team_id)
        if not caller_role:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG)

        record = cast(Any, found)
        # Read every field while the session is still open, so get_member_count()
        # does not run against a closed session.
        team_fields = {
            "id": record.id,
            "name": record.name,
            "slug": record.slug,
            "description": record.description,
            "created_by": record.created_by,
            "is_personal": record.is_personal,
            "visibility": record.visibility,
            "max_members": record.max_members,
            "member_count": record.get_member_count(),
            "created_at": record.created_at,
            "updated_at": record.updated_at,
            "is_active": record.is_active,
        }
        payload = TeamResponse(**team_fields)

        db.commit()
        db.close()
        return payload
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Error getting team {team_id}: {exc}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to get team")
@teams_router.put("/{team_id}", response_model=TeamResponse)
@require_permission("teams.update")
async def update_team(team_id: str, request: TeamUpdateRequest, current_user: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> TeamResponse:
    """Update a team's name, description, visibility and member limit.

    Only a caller whose role in the team is ``"owner"`` may update it. After
    the service reports success, the team is re-read to build the response.

    Args:
        team_id: Team UUID
        request: Team update request data
        current_user: Authenticated user context dict with email and permissions
        db: Database session

    Returns:
        TeamResponse: Updated team data

    Raises:
        HTTPException: 403 if the caller is not the owner, 404 if the update
            reports failure or the team cannot be re-read, 400 if the service
            raises ``ValueError``, 500 on any other failure
    """
    try:
        team_service = TeamManagementService(db)

        caller_role = await team_service.get_user_role_in_team(current_user["email"], team_id)
        if caller_role != "owner":
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG)

        updated = await team_service.update_team(
            team_id=team_id,
            name=request.name,
            description=request.description,
            visibility=request.visibility,
            max_members=request.max_members,
        )
        if not updated:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found or update failed")

        # Re-read the team so the response reflects the stored state.
        refreshed = await team_service.get_team_by_id(team_id)
        if not refreshed:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found after update")

        record = cast(Any, refreshed)
        # Read every field while the session is still open, so get_member_count()
        # does not run against a closed session.
        team_fields = {
            "id": record.id,
            "name": record.name,
            "slug": record.slug,
            "description": record.description,
            "created_by": record.created_by,
            "is_personal": record.is_personal,
            "visibility": record.visibility,
            "max_members": record.max_members,
            "member_count": record.get_member_count(),
            "created_at": record.created_at,
            "updated_at": record.updated_at,
            "is_active": record.is_active,
        }
        payload = TeamResponse(**team_fields)

        db.commit()
        db.close()
        return payload
    except HTTPException:
        raise
    except ValueError as exc:
        logger.error(f"Team update failed: {exc}")
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc))
    except Exception as exc:
        logger.error(f"Error updating team {team_id}: {exc}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to update team")
@teams_router.delete("/{team_id}", response_model=SuccessResponse)
@require_permission("teams.delete")
async def delete_team(team_id: str, current_user: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> SuccessResponse:
    """Delete a team; only a caller whose role is ``"owner"`` may do so.

    Args:
        team_id: Team UUID
        current_user: Authenticated user context dict with email and permissions
        db: Database session

    Returns:
        SuccessResponse: Success confirmation

    Raises:
        HTTPException: 403 if the caller is not the owner, 404 if the service
            reports the deletion did not succeed, 500 on any other failure
    """
    try:
        team_service = TeamManagementService(db)
        caller_email = current_user["email"]

        caller_role = await team_service.get_user_role_in_team(caller_email, team_id)
        if caller_role != "owner":
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Only team owners can delete teams")

        deleted = await team_service.delete_team(team_id, current_user["email"])
        if not deleted:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found")

        db.commit()
        db.close()
        return SuccessResponse(message="Team deleted successfully")
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Error deleting team {team_id}: {exc}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to delete team")
445# ---------------------------------------------------------------------------
446# Team Member Management
447# ---------------------------------------------------------------------------
@teams_router.get("/{team_id}/members", response_model=Union[PaginatedTeamMembersResponse, List[TeamMemberResponse]])
@require_permission("teams.read")
async def list_team_members(
    team_id: str,
    cursor: Optional[str] = Query(None, description="Cursor for pagination"),
    limit: Optional[int] = Query(None, ge=1, le=settings.pagination_max_page_size, description="Maximum number of members to return (default: 50)"),
    include_pagination: bool = Query(False, description="Include cursor pagination metadata in response"),
    current_user: dict = Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> Union[PaginatedTeamMembersResponse, List[TeamMemberResponse]]:
    """List the members of a team, optionally with cursor pagination.

    The caller must hold some role in the team. The service call returns a
    plain list of ``(user, membership)`` pairs when neither ``cursor`` nor
    ``limit`` is given, and a ``(pairs, next_cursor)`` tuple otherwise.

    Args:
        team_id: Team UUID
        cursor: Pagination cursor for fetching the next set of results
        limit: Maximum number of members to return
        include_pagination: Whether to include cursor pagination metadata in the response (default: false)
        current_user: Authenticated user context dict with email and permissions
        db: Database session

    Returns:
        PaginatedTeamMembersResponse with members and nextCursor if include_pagination=true, or
        List of team members if include_pagination=false

    Raises:
        HTTPException: 403 if the caller has no role in the team, 500 on any other failure
    """
    try:
        team_service = TeamManagementService(db)

        caller_role = await team_service.get_user_role_in_team(current_user["email"], team_id)
        if not caller_role:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG)

        fetched = await team_service.get_team_members(team_id, cursor=cursor, limit=limit)

        if cursor is None and limit is None:
            # Backward-compatible path: the service handed back a bare list.
            pairs, next_cursor = fetched, None
        else:
            # Pagination was requested: the service handed back (pairs, next_cursor).
            pairs, next_cursor = fetched

        # Only the membership half of each pair feeds the response.
        member_responses = [
            TeamMemberResponse(
                id=membership.id,
                team_id=membership.team_id,
                user_email=membership.user_email,
                role=membership.role,
                joined_at=membership.joined_at,
                invited_by=membership.invited_by,
                is_active=membership.is_active,
            )
            for _user, membership in pairs
        ]

        db.commit()
        db.close()

        if not include_pagination:
            return member_responses
        return PaginatedTeamMembersResponse(members=member_responses, nextCursor=next_cursor)
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Error listing team members for team {team_id}: {exc}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to list team members")
@teams_router.post("/{team_id}/members", response_model=TeamMemberResponse, status_code=status.HTTP_201_CREATED)
@require_permission("teams.manage_members")
async def add_team_member(team_id: str, request: TeamMemberAddRequest, current_user: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> TeamMemberResponse:
    """Add a new member to a team.

    Only a caller whose role in the team is ``"owner"`` may add members. The
    caller's email is recorded as ``invited_by``.

    Args:
        team_id: Team UUID
        request: Member add request data with email and role
        current_user: Authenticated user context dict with email and permissions
        db: Database session

    Returns:
        TeamMemberResponse: New member data

    Raises:
        HTTPException: 403 if the caller is not the owner; 400 for an invalid
            role, an exceeded member limit, or another team-management error;
            404 if the team or user is not found; 409 if the user is already a
            member; 500 if adding fails or on any unexpected error
    """
    try:
        service = TeamManagementService(db)

        # Check if user is team owner
        role = await service.get_user_role_in_team(current_user["email"], team_id)
        if role != "owner":
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG)

        # Add member to team and get the created member directly
        member = await service.add_member_to_team(team_id, request.email, request.role, invited_by=current_user["email"])

        # Serialize while the session is still open, as the team CRUD handlers in
        # this module do: reading ORM attributes after commit/close can fail on a
        # detached instance if the session expires objects on commit.
        response = TeamMemberResponse.model_validate(member)

        db.commit()
        db.close()
        return response
    except HTTPException:
        raise
    except InvalidRoleError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except TeamNotFoundError as e:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
    except UserNotFoundError as e:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
    except MemberAlreadyExistsError as e:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
    except TeamMemberLimitExceededError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except TeamMemberAddError as e:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
    except TeamManagementError as e:
        # Listed after the specific handlers above so it only catches the remaining cases.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except Exception as e:
        logger.error(f"Error adding team member {request.email} to team {team_id}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to add team member")
@teams_router.put("/{team_id}/members/{user_email}", response_model=TeamMemberResponse)
@require_permission("teams.manage_members")
async def update_team_member(
    team_id: str, user_email: str, request: TeamMemberUpdateRequest, current_user: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)
) -> TeamMemberResponse:
    """Update a team member's role.

    Only a caller whose role in the team is ``"owner"`` may change roles.
    After the service reports success, the membership is re-read to build the
    response.

    Args:
        team_id: Team UUID
        user_email: Email of the member to update
        request: Member update request data
        current_user: Authenticated user context dict with email and permissions
        db: Database session

    Returns:
        TeamMemberResponse: Updated member data

    Raises:
        HTTPException: 403 if the caller is not the owner, 404 if the update
            reports failure or the member cannot be re-read, 400 on
            ``ValueError``, 500 on any other failure
    """
    try:
        service = TeamManagementService(db)

        # Check if user is team owner
        role = await service.get_user_role_in_team(current_user["email"], team_id)
        if role != "owner":
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG)

        success = await service.update_member_role(team_id, user_email, request.role)
        if not success:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team member not found or update failed")

        # Fetch the updated member to build the response
        member = await service.get_member(team_id, user_email)
        if not member:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team member not found after update")

        # Serialize while the session is still open, as the team CRUD handlers in
        # this module do: reading ORM attributes after commit/close can fail on a
        # detached instance if the session expires objects on commit.
        response = TeamMemberResponse.model_validate(member)

        db.commit()
        db.close()
        return response
    except HTTPException:
        raise
    except ValueError as e:
        logger.error(f"Member update failed: {e}")
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except Exception as e:
        logger.error(f"Error updating team member {user_email} in team {team_id}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to update team member")
@teams_router.delete("/{team_id}/members/{user_email}", response_model=SuccessResponse)
@require_permission("teams.manage_members")
async def remove_team_member(team_id: str, user_email: str, current_user: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> SuccessResponse:
    """Remove a member from a team.

    The removal is allowed when the caller is removing their own membership,
    or when the caller's role in the team is ``"owner"``.

    Args:
        team_id: Team UUID
        user_email: Email of the member to remove
        current_user: Authenticated user context dict with email and permissions
        db: Database session

    Returns:
        SuccessResponse: Success confirmation

    Raises:
        HTTPException: 403 if the caller is neither the target member nor an
            owner, 404 if the service reports the removal did not succeed,
            500 on any other failure
    """
    try:
        team_service = TeamManagementService(db)
        caller_email = current_user["email"]

        caller_role = await team_service.get_user_role_in_team(caller_email, team_id)
        removing_self = caller_email == user_email
        if not removing_self and caller_role != "owner":
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG)

        removed = await team_service.remove_member_from_team(team_id, user_email)
        if not removed:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team member not found")

        db.commit()
        db.close()
        return SuccessResponse(message="Team member removed successfully")
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Error removing team member {user_email} from team {team_id}: {exc}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to remove team member")
669# ---------------------------------------------------------------------------
670# Team Invitations
671# ---------------------------------------------------------------------------
@teams_router.post("/{team_id}/invitations", response_model=TeamInvitationResponse, status_code=status.HTTP_201_CREATED)
@require_permission("teams.manage_members")
async def invite_team_member(team_id: str, request: TeamInviteRequest, current_user: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> TeamInvitationResponse:
    """Invite a user to join a team.

    Invitations must be enabled via ``settings.allow_team_invitations`` and the
    caller's role in the team must be ``"owner"``.

    Args:
        team_id: Team UUID
        request: Invitation request data
        current_user: Authenticated user context dict with email and permissions
        db: Database session

    Returns:
        TeamInvitationResponse: Created invitation data

    Raises:
        HTTPException: 403 if invitations are disabled or the caller is not
            the owner, 400 on ``ValueError``, 500 if no invitation is created
            or on any other failure
    """
    try:
        if not settings.allow_team_invitations:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Team invitations are currently disabled")

        team_service = TeamManagementService(db)
        invitation_service = TeamInvitationService(db)

        # Check if user is team owner
        role = await team_service.get_user_role_in_team(current_user["email"], team_id)
        if role != "owner":
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG)

        invitation = await invitation_service.create_invitation(team_id=team_id, email=str(request.email), role=request.role, invited_by=current_user["email"])
        if not invitation:
            raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create invitation")

        # Get team name for response
        team = await team_service.get_team_by_id(team_id)
        team_name = team.name if team else "Unknown Team"

        # Build the response while the session is still open, as the team CRUD
        # handlers in this module do: reading ORM attributes (and calling
        # is_expired()) after commit/close can fail on a detached instance if the
        # session expires objects on commit.
        response = TeamInvitationResponse(
            id=invitation.id,
            team_id=invitation.team_id,
            team_name=team_name,
            email=invitation.email,
            role=invitation.role,
            invited_by=invitation.invited_by,
            invited_at=invitation.invited_at,
            expires_at=invitation.expires_at,
            token=invitation.token,
            is_active=invitation.is_active,
            is_expired=invitation.is_expired(),
        )

        db.commit()
        db.close()
        return response
    except HTTPException:
        raise
    except ValueError as e:
        logger.error(f"Team invitation failed: {e}")
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except Exception as e:
        logger.error(f"Error creating team invitation for team {team_id}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create invitation")
@teams_router.get("/{team_id}/invitations", response_model=List[TeamInvitationResponse])
@require_permission("teams.read")
async def list_team_invitations(team_id: str, current_user: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> List[TeamInvitationResponse]:
    """List the invitations of a team for a caller whose role is ``"owner"``.

    Each entry carries the team's name, or ``"Unknown Team"`` when the team
    lookup returns nothing.

    Args:
        team_id: Team UUID
        current_user: Authenticated user context dict with email and permissions
        db: Database session

    Returns:
        List[TeamInvitationResponse]: List of team invitations

    Raises:
        HTTPException: 403 if the caller is not the owner, 500 on any other failure
    """
    try:
        teams = TeamManagementService(db)
        invites = TeamInvitationService(db)

        caller_role = await teams.get_user_role_in_team(current_user["email"], team_id)
        if caller_role != "owner":
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG)

        pending = await invites.get_team_invitations(team_id)

        # The team name is looked up once and reused for every invitation.
        owning_team = await teams.get_team_by_id(team_id)
        if owning_team:
            display_name = owning_team.name
        else:
            display_name = "Unknown Team"

        # Built before commit/close so ORM attributes are read on a live session.
        listing = [
            TeamInvitationResponse(
                id=item.id,
                team_id=item.team_id,
                team_name=display_name,
                email=item.email,
                role=item.role,
                invited_by=item.invited_by,
                invited_at=item.invited_at,
                expires_at=item.expires_at,
                token=item.token,
                is_active=item.is_active,
                is_expired=item.is_expired(),
            )
            for item in pending
        ]

        db.commit()
        db.close()
        return listing
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Error listing team invitations for team {team_id}: {exc}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to list invitations")
@teams_router.post("/invitations/{token}/accept", response_model=TeamMemberResponse)
@require_permission("teams.join")
async def accept_team_invitation(token: str, current_user: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> TeamMemberResponse:
    """Accept a team invitation on behalf of the authenticated caller.

    Args:
        token: Invitation token
        current_user: Authenticated user context dict with email and permissions
        db: Database session

    Returns:
        TeamMemberResponse: New team member data

    Raises:
        HTTPException: 404 if the service returns nothing usable for the
            token, 400 on ``ValueError``, 500 on any other failure
    """
    try:
        invitation_service = TeamInvitationService(db)

        member = await invitation_service.accept_invitation(token, current_user["email"])
        # A result that is falsy or has no ``id`` attribute is treated as "no membership created".
        if not member or not hasattr(member, "id"):
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Invalid or expired invitation")

        # Serialize while the session is still open, as the team CRUD handlers in
        # this module do: reading ORM attributes after commit/close can fail on a
        # detached instance if the session expires objects on commit.
        response = TeamMemberResponse.model_validate(member)

        db.commit()
        db.close()
        return response
    except HTTPException:
        raise
    except ValueError as e:
        logger.error(f"Invitation acceptance failed: {e}")
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except Exception as e:
        logger.error("Error accepting invitation: %s", e)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to accept invitation")
@teams_router.delete("/invitations/{invitation_id}", response_model=SuccessResponse)
@require_permission("teams.manage_members")
async def cancel_team_invitation(invitation_id: str, current_user: dict = Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> SuccessResponse:
    """Cancel (revoke) a team invitation.

    The caller must either hold the ``"owner"`` role in the invitation's team
    or be the user recorded as ``invited_by`` on the invitation.

    Args:
        invitation_id: Invitation UUID
        current_user: Authenticated user context dict with email and permissions
        db: Database session

    Returns:
        SuccessResponse: Success confirmation

    Raises:
        HTTPException: 404 if the invitation does not exist or the revoke
            reports failure, 403 if the caller is neither owner nor inviter,
            500 on any other failure
    """
    try:
        teams = TeamManagementService(db)
        invites = TeamInvitationService(db)

        # The invitation row is loaded directly to learn which team it belongs to.
        # First-Party
        from mcpgateway.db import EmailTeamInvitation

        lookup = db.query(EmailTeamInvitation).filter(EmailTeamInvitation.id == invitation_id)
        invitation = lookup.first()
        if not invitation:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Invitation not found")

        caller_role = await teams.get_user_role_in_team(current_user["email"], invitation.team_id)
        may_cancel = caller_role == "owner" or current_user["email"] == invitation.invited_by
        if not may_cancel:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG)

        revoked = await invites.revoke_invitation(invitation_id, current_user["email"])
        if not revoked:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Invitation not found")

        db.commit()
        db.close()
        return SuccessResponse(message="Team invitation cancelled successfully")
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Error cancelling invitation {invitation_id}: {exc}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to cancel invitation")
@teams_router.post("/{team_id}/join", response_model=TeamJoinRequestResponse)
@require_permission("teams.join")
async def request_to_join_team(
    team_id: str,
    join_request: TeamJoinRequest,
    current_user: dict = Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> TeamJoinRequestResponse:
    """Request to join a public team.

    Allows users to request membership in public teams. The request will be
    pending until approved by a team owner.

    Args:
        team_id: ID of the team to join
        join_request: Join request details including optional message
        current_user: Currently authenticated user
        db: Database session

    Returns:
        TeamJoinRequestResponse: Created join request details

    Raises:
        HTTPException: 403 if join requests are disabled or the team is not public,
            404 if the team does not exist, 400 if the user is already a member,
            500 on any unexpected failure
    """
    try:
        if not settings.allow_team_join_requests:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Team join requests are currently disabled")

        team_service = TeamManagementService(db)

        # Validate team exists and is public
        team = await team_service.get_team_by_id(team_id)
        if not team:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found")

        if team.visibility != "public":
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Can only request to join public teams")

        # Check if user is already a member
        user_role = await team_service.get_user_role_in_team(current_user["email"], team_id)
        if user_role:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="User is already a member of this team")

        # Create join request
        join_req = await team_service.create_join_request(team_id=team_id, user_email=current_user["email"], message=join_request.message)

        db.commit()

        # Serialise while the session is still open: once closed, `join_req` and
        # `team` are detached, and reading expired attributes would fail if the
        # session expires objects on commit.
        response = TeamJoinRequestResponse(
            id=join_req.id,
            team_id=join_req.team_id,
            team_name=team.name,
            user_email=join_req.user_email,
            message=join_req.message,
            status=join_req.status,
            requested_at=join_req.requested_at,
            expires_at=join_req.expires_at,
        )
        db.close()
        return response
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error creating join request for team {team_id}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create join request") from e
@teams_router.delete("/{team_id}/leave", response_model=SuccessResponse)
@require_permission("teams.join")
async def leave_team(
    team_id: str,
    current_user: dict = Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> SuccessResponse:
    """Remove the calling user from a team.

    A user may only remove themselves. Personal teams cannot be left, and the
    removal is refused when the membership service reports failure (for
    example when the caller is the last owner).

    Args:
        team_id: ID of the team to leave
        current_user: Currently authenticated user
        db: Database session

    Returns:
        SuccessResponse: Confirmation of leaving the team

    Raises:
        HTTPException: 404 if the team does not exist, 403 for a personal team,
            400 if the caller is not a member or the removal is refused,
            500 on any unexpected failure
    """
    try:
        membership_service = TeamManagementService(db)

        target_team = await membership_service.get_team_by_id(team_id)
        if not target_team:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found")
        if target_team.is_personal:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot leave personal team")

        caller_email = current_user["email"]

        # Any truthy role means the caller currently belongs to the team.
        current_role = await membership_service.get_user_role_in_team(caller_email, team_id)
        if not current_role:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="User is not a member of this team")

        # The caller is both the member being removed and the actor removing them.
        removed = await membership_service.remove_member_from_team(team_id, caller_email, removed_by=caller_email)
        if not removed:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot leave team - you may be the last owner")

        db.commit()
        db.close()
        return SuccessResponse(message="Successfully left the team")
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as exc:
        logger.error(f"Error leaving team {team_id}: {exc}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to leave team")
@teams_router.get("/{team_id}/join-requests", response_model=List[TeamJoinRequestResponse])
@require_permission("teams.manage_members")
async def list_team_join_requests(
    team_id: str,
    current_user: dict = Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> List[TeamJoinRequestResponse]:
    """Return the join requests recorded for a team.

    Access is restricted to callers whose role in the team is ``owner``.

    Args:
        team_id: ID of the team
        current_user: Authenticated user context dict with email and permissions
        db: Database session

    Returns:
        List[TeamJoinRequestResponse]: One entry per join request returned by the team service

    Raises:
        HTTPException: 404 if the team does not exist, 403 if the caller is not
            a team owner, 500 on any unexpected failure
    """
    try:
        service = TeamManagementService(db)

        owning_team = await service.get_team_by_id(team_id)
        if not owning_team:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found")

        caller_role = await service.get_user_role_in_team(current_user["email"], team_id)
        if not caller_role == "owner":
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Only team owners can view join requests")

        pending = await service.list_join_requests(team_id)

        # Convert each record to its response schema before the session is released.
        payload = []
        for entry in pending:
            payload.append(
                TeamJoinRequestResponse(
                    id=entry.id,
                    team_id=entry.team_id,
                    team_name=owning_team.name,
                    user_email=entry.user_email,
                    message=entry.message,
                    status=entry.status,
                    requested_at=entry.requested_at,
                    expires_at=entry.expires_at,
                )
            )

        db.commit()
        db.close()
        return payload
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as exc:
        logger.error(f"Error listing join requests for team {team_id}: {exc}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to list join requests")
@teams_router.post("/{team_id}/join-requests/{request_id}/approve", response_model=TeamMemberResponse)
@require_permission("teams.manage_members")
async def approve_join_request(
    team_id: str,
    request_id: str,
    current_user: dict = Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> TeamMemberResponse:
    """Approve a team join request.

    Only team owners can approve join requests for their teams. The join
    request must belong to the team named in the URL; a request ID from a
    different team is reported as not found.

    Args:
        team_id: ID of the team
        request_id: ID of the join request
        current_user: Authenticated user context dict with email and permissions
        db: Database session

    Returns:
        TeamMemberResponse: New team member data

    Raises:
        HTTPException: 404 if the team or join request is not found (including a
            request that does not belong to this team), 403 if the caller is not
            a team owner, 500 on any unexpected failure
    """
    try:
        team_service = TeamManagementService(db)

        # Validate team exists and user is owner
        team = await team_service.get_team_by_id(team_id)
        if not team:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found")

        user_role = await team_service.get_user_role_in_team(current_user["email"], team_id)
        if user_role != "owner":
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Only team owners can approve join requests")

        # The ownership check above covers `team_id` only, and the approval call
        # below is keyed solely by `request_id`. Confirm the request is one of
        # this team's so an owner of one team cannot approve another team's request.
        team_requests = await team_service.list_join_requests(team_id)
        if not any(str(candidate.id) == request_id for candidate in team_requests or []):
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Join request not found")

        # Approve join request
        member = await team_service.approve_join_request(request_id, approved_by=current_user["email"])
        if not member:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Join request not found")

        db.commit()

        # Serialise while the session is still open: once closed, `member` is
        # detached, and reading expired attributes would fail if the session
        # expires objects on commit.
        response = TeamMemberResponse(
            id=member.id,
            team_id=member.team_id,
            user_email=member.user_email,
            role=member.role,
            joined_at=member.joined_at,
            invited_by=member.invited_by,
            is_active=member.is_active,
        )
        db.close()
        return response
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error approving join request {request_id}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to approve join request") from e
@teams_router.delete("/{team_id}/join-requests/{request_id}", response_model=SuccessResponse)
@require_permission("teams.manage_members")
async def reject_join_request(
    team_id: str,
    request_id: str,
    current_user: dict = Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> SuccessResponse:
    """Reject a team join request.

    Only team owners can reject join requests for their teams. The join
    request must belong to the team named in the URL; a request ID from a
    different team is reported as not found.

    Args:
        team_id: ID of the team
        request_id: ID of the join request
        current_user: Authenticated user context dict with email and permissions
        db: Database session

    Returns:
        SuccessResponse: Confirmation of rejection

    Raises:
        HTTPException: 404 if the team or join request is not found (including a
            request that does not belong to this team), 403 if the caller is not
            a team owner, 500 on any unexpected failure
    """
    try:
        team_service = TeamManagementService(db)

        # Validate team exists and user is owner
        team = await team_service.get_team_by_id(team_id)
        if not team:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found")

        user_role = await team_service.get_user_role_in_team(current_user["email"], team_id)
        if user_role != "owner":
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Only team owners can reject join requests")

        # The ownership check above covers `team_id` only, and the rejection call
        # below is keyed solely by `request_id`. Confirm the request is one of
        # this team's so an owner of one team cannot reject another team's request.
        team_requests = await team_service.list_join_requests(team_id)
        if not any(str(candidate.id) == request_id for candidate in team_requests or []):
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Join request not found")

        # Reject join request
        success = await team_service.reject_join_request(request_id, rejected_by=current_user["email"])
        if not success:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Join request not found")

        db.commit()
        db.close()
        return SuccessResponse(message="Join request rejected successfully")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error rejecting join request {request_id}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to reject join request") from e