Coverage for mcpgateway / routers / tokens.py: 99%
211 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/routers/tokens.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7JWT Token Catalog API endpoints.
8Provides comprehensive API token management with scoping, revocation, and analytics.
9"""
11# Standard
12import logging
13from typing import List, Optional
15# Third-Party
16from fastapi import APIRouter, Depends, HTTPException, status
17from sqlalchemy.exc import IntegrityError
18from sqlalchemy.orm import Session
20# First-Party
21from mcpgateway.db import get_db
22from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_permission
23from mcpgateway.schemas import TokenCreateRequest, TokenCreateResponse, TokenListResponse, TokenResponse, TokenRevokeRequest, TokenUpdateRequest, TokenUsageStatsResponse
24from mcpgateway.services.permission_service import PermissionService
25from mcpgateway.services.token_catalog_service import TokenCatalogService, TokenScope
27logger = logging.getLogger(__name__)
29router = APIRouter(prefix="/tokens", tags=["tokens"])
32def _require_authenticated_session(current_user: dict) -> None:
33 """Block anonymous, unauthenticated, and API-token access to token management endpoints.
35 Enforces Management Plane isolation: only interactive sessions (JWT from web
36 login, SSO, or OAuth) may create, list, or revoke tokens. API tokens are
37 Data Plane credentials and must never be able to manage other tokens
38 (token-chaining attack vector).
40 Args:
41 current_user: User context from get_current_user_with_permissions
43 Raises:
44 HTTPException: 403 if auth_method is None, anonymous, or api_token
45 """
46 auth_method = current_user.get("auth_method")
48 # Fail-secure: block if auth_method not set (indicates incomplete auth flow)
49 if auth_method is None:
50 logger.warning("Token management blocked: auth_method not set. This indicates an auth code path that needs to set request.state.auth_method")
51 raise HTTPException(
52 status_code=status.HTTP_403_FORBIDDEN,
53 detail="Token management requires authentication. Authentication method could not be determined.",
54 )
56 # Block anonymous users (missing proxy header or unauthenticated)
57 if auth_method == "anonymous":
58 raise HTTPException(
59 status_code=status.HTTP_403_FORBIDDEN,
60 detail="Token management requires authentication. Anonymous access is not permitted.",
61 )
63 # Block API tokens from managing other tokens (Management Plane isolation).
64 # Token CRUD endpoints require an interactive session (JWT from web login or SSO).
65 # Allowing API tokens here would let a compromised token create new long-lived
66 # tokens and escalate persistence — a token-chaining attack.
67 if auth_method == "api_token":
68 raise HTTPException(
69 status_code=status.HTTP_403_FORBIDDEN,
70 detail=("Token management requires an interactive session (JWT from web login or SSO). " "API tokens cannot create, list, or revoke other tokens."),
71 )
74async def _get_caller_permissions(
75 db: Session,
76 current_user: dict,
77 team_id: Optional[str] = None,
78) -> Optional[List[str]]:
79 """Get caller's effective permissions for scope containment.
81 Args:
82 db: Database session
83 current_user: User context
84 team_id: Team context for permission lookup
86 Returns:
87 List of permissions, or ["*"] for admins
88 """
89 # SECURITY: Only treat admin as unrestricted when token is un-narrowed.
90 # Narrowed or public-only admin sessions must derive permissions through
91 # the token-aware path to enforce Layer 1 scope containment.
92 token_teams = current_user.get("token_teams")
93 if current_user.get("is_admin") and token_teams is None:
94 return ["*"] # Un-narrowed admins can grant anything
96 permission_service = PermissionService(db)
97 permissions = await permission_service.get_user_permissions(
98 user_email=current_user["email"],
99 team_id=team_id,
100 token_teams=token_teams, # SECURITY: Respect token narrowing
101 )
102 return list(permissions) if permissions else None
105@router.post("", response_model=TokenCreateResponse, status_code=status.HTTP_201_CREATED)
106@require_permission("tokens.create")
107async def create_token(
108 request: TokenCreateRequest,
109 current_user=Depends(get_current_user_with_permissions),
110 db: Session = Depends(get_db),
111) -> TokenCreateResponse:
112 """Create a new API token for the current user.
114 Args:
115 request: Token creation request with name, description, scoping, etc.
116 current_user: Authenticated user from JWT
117 db: Database session
119 Returns:
120 TokenCreateResponse: Created token details with raw token
122 Raises:
123 HTTPException: If token name already exists or validation fails
125 Examples:
126 >>> import asyncio
127 >>> asyncio.iscoroutinefunction(create_token)
128 True
129 """
130 _require_authenticated_session(current_user)
132 # Auto-inherit team_id from the caller's single team when not explicitly provided.
133 # This prevents tokens from being silently scoped to public-only (team_id=None)
134 # when the user belongs to exactly one team, maintaining RBAC context at token level.
135 # Multi-team users must specify team_id explicitly to avoid ambiguity.
136 # Admins with teams=null are exempt and may still create global-scope tokens.
137 effective_team_id = request.team_id
138 caller_token_teams = current_user.get("token_teams")
139 # Only un-narrowed admins (token_teams=None) are exempt from auto-inheritance.
140 # Narrowed admin sessions use the same team-scoping logic as non-admins.
141 is_unrestricted_admin = current_user.get("is_admin") and caller_token_teams is None
142 if effective_team_id is None and not is_unrestricted_admin:
143 user_teams = caller_token_teams or []
144 if len(user_teams) == 1:
145 effective_team_id = user_teams[0]
146 logger.debug("Auto-inherited team_id=%s for token creation by %s", effective_team_id, current_user["email"])
148 service = TokenCatalogService(db)
150 # Get caller permissions for scope containment (if custom scope requested)
151 caller_permissions = None
152 if request.scope and request.scope.permissions:
153 caller_permissions = await _get_caller_permissions(db, current_user, effective_team_id)
155 # Convert request to TokenScope if provided
156 scope = None
157 if request.scope:
158 scope = TokenScope(
159 server_id=request.scope.server_id,
160 permissions=request.scope.permissions,
161 ip_restrictions=request.scope.ip_restrictions,
162 time_restrictions=request.scope.time_restrictions,
163 usage_limits=request.scope.usage_limits,
164 )
166 try:
167 token_record, raw_token = await service.create_token(
168 user_email=current_user["email"],
169 name=request.name,
170 description=request.description,
171 scope=scope,
172 expires_in_days=request.expires_in_days,
173 tags=request.tags,
174 team_id=effective_team_id,
175 caller_permissions=caller_permissions,
176 is_active=request.is_active,
177 )
179 # Create TokenResponse for the token info
180 token_response = TokenResponse(
181 id=token_record.id,
182 name=token_record.name,
183 description=token_record.description,
184 user_email=token_record.user_email,
185 team_id=token_record.team_id,
186 server_id=token_record.server_id,
187 resource_scopes=token_record.resource_scopes or [],
188 ip_restrictions=token_record.ip_restrictions or [],
189 time_restrictions=token_record.time_restrictions or {},
190 usage_limits=token_record.usage_limits or {},
191 created_at=token_record.created_at,
192 expires_at=token_record.expires_at,
193 last_used=token_record.last_used,
194 is_active=token_record.is_active,
195 tags=token_record.tags or [],
196 )
198 db.commit()
199 db.close()
200 return TokenCreateResponse(
201 token=token_response,
202 access_token=raw_token,
203 )
204 except ValueError as e:
205 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
206 except IntegrityError as e:
207 db.rollback()
208 err_str = str(e.orig) if hasattr(e, "orig") and e.orig else str(e)
209 # Match the specific name constraint: PostgreSQL reports the constraint name
210 # (either the db.py name or the Alembic migration name); SQLite reports column paths.
211 if (
212 "uq_email_api_tokens_user_name_team" in err_str
213 or "uq_email_api_tokens_user_name" in err_str
214 or "uq_email_api_tokens_user_email_name" in err_str
215 or ("email_api_tokens.user_email" in err_str and "email_api_tokens.name" in err_str)
216 ):
217 raise HTTPException(
218 status_code=status.HTTP_409_CONFLICT,
219 detail="A token with this name already exists for this user in the same team scope. Token names must be unique per user per team. Please choose a different name.",
220 )
221 raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Token creation failed due to a conflict. Please try again.")
224@router.get("", response_model=TokenListResponse)
225@require_permission("tokens.read")
226async def list_tokens(
227 include_inactive: bool = False,
228 limit: int = 50,
229 offset: int = 0,
230 db: Session = Depends(get_db),
231 current_user=Depends(get_current_user_with_permissions),
232) -> TokenListResponse:
233 """List API tokens for the current user.
235 Args:
236 include_inactive: Include inactive/expired tokens
237 limit: Maximum number of tokens to return (default 50)
238 offset: Number of tokens to skip for pagination
239 current_user: Authenticated user from JWT
240 db: Database session
242 Returns:
243 TokenListResponse: List of user's API tokens
245 Examples:
246 >>> import asyncio
247 >>> asyncio.iscoroutinefunction(list_tokens)
248 True
249 """
250 _require_authenticated_session(current_user)
252 service = TokenCatalogService(db)
253 tokens = await service.list_user_and_team_tokens(
254 user_email=current_user["email"],
255 include_inactive=include_inactive,
256 limit=limit,
257 offset=offset,
258 )
260 total_count = await service.count_user_and_team_tokens(
261 user_email=current_user["email"],
262 include_inactive=include_inactive,
263 )
265 # Batch fetch revocation info (single query instead of N+1)
266 revocation_map = await service.get_token_revocations_batch([t.jti for t in tokens])
268 token_responses = []
269 for token in tokens:
270 revocation_info = revocation_map.get(token.jti)
272 token_responses.append(
273 TokenResponse(
274 id=token.id,
275 name=token.name,
276 description=token.description,
277 user_email=token.user_email,
278 team_id=token.team_id,
279 created_at=token.created_at,
280 expires_at=token.expires_at,
281 last_used=token.last_used,
282 is_active=token.is_active,
283 is_revoked=revocation_info is not None,
284 revoked_at=revocation_info.revoked_at if revocation_info else None,
285 revoked_by=revocation_info.revoked_by if revocation_info else None,
286 revocation_reason=revocation_info.reason if revocation_info else None,
287 tags=token.tags,
288 server_id=token.server_id,
289 resource_scopes=token.resource_scopes,
290 ip_restrictions=token.ip_restrictions,
291 time_restrictions=token.time_restrictions,
292 usage_limits=token.usage_limits,
293 )
294 )
296 db.commit()
297 db.close()
298 return TokenListResponse(tokens=token_responses, total=total_count, limit=limit, offset=offset)
301@router.get("/{token_id}", response_model=TokenResponse)
302@require_permission("tokens.read")
303async def get_token(
304 token_id: str,
305 current_user=Depends(get_current_user_with_permissions),
306 db: Session = Depends(get_db),
307) -> TokenResponse:
308 """Get details of a specific token.
310 Args:
311 token_id: Token ID to retrieve
312 current_user: Authenticated user from JWT
313 db: Database session
315 Returns:
316 TokenResponse: Token details
318 Raises:
319 HTTPException: If token not found or not owned by user
321 Examples:
322 >>> import asyncio
323 >>> asyncio.iscoroutinefunction(get_token)
324 True
325 """
326 _require_authenticated_session(current_user)
328 service = TokenCatalogService(db)
329 token = await service.get_token(token_id, current_user["email"])
331 if not token:
332 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")
334 db.commit()
335 db.close()
336 return TokenResponse(
337 id=token.id,
338 name=token.name,
339 description=token.description,
340 user_email=token.user_email,
341 team_id=token.team_id,
342 created_at=token.created_at,
343 expires_at=token.expires_at,
344 last_used=token.last_used,
345 is_active=token.is_active,
346 tags=token.tags,
347 server_id=token.server_id,
348 resource_scopes=token.resource_scopes,
349 ip_restrictions=token.ip_restrictions,
350 time_restrictions=token.time_restrictions,
351 usage_limits=token.usage_limits,
352 )
355@router.put("/{token_id}", response_model=TokenResponse)
356@require_permission("tokens.update")
357async def update_token(
358 token_id: str,
359 request: TokenUpdateRequest,
360 current_user=Depends(get_current_user_with_permissions),
361 db: Session = Depends(get_db),
362) -> TokenResponse:
363 """Update an existing token.
365 Args:
366 token_id: Token ID to update
367 request: Token update request
368 current_user: Authenticated user from JWT
369 db: Database session
371 Returns:
372 TokenResponse: Updated token details
374 Raises:
375 HTTPException: If token not found or validation fails
376 """
377 _require_authenticated_session(current_user)
379 service = TokenCatalogService(db)
381 # For update, get caller permissions using token's team_id
382 caller_permissions = None
383 if request.scope and request.scope.permissions:
384 # Get existing token to find its team_id
385 existing_token = await service.get_token(token_id, current_user["email"])
386 if not existing_token:
387 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")
388 # Use token's team_id for permission lookup
389 caller_permissions = await _get_caller_permissions(db, current_user, existing_token.team_id)
391 # Convert request to TokenScope if provided
392 scope = None
393 if request.scope:
394 scope = TokenScope(
395 server_id=request.scope.server_id,
396 permissions=request.scope.permissions,
397 ip_restrictions=request.scope.ip_restrictions,
398 time_restrictions=request.scope.time_restrictions,
399 usage_limits=request.scope.usage_limits,
400 )
402 try:
403 token = await service.update_token(
404 token_id=token_id,
405 user_email=current_user["email"],
406 name=request.name,
407 description=request.description,
408 scope=scope,
409 tags=request.tags,
410 caller_permissions=caller_permissions,
411 is_active=request.is_active,
412 )
414 if not token:
415 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")
417 result = TokenResponse(
418 id=token.id,
419 name=token.name,
420 description=token.description,
421 user_email=token.user_email,
422 team_id=token.team_id,
423 created_at=token.created_at,
424 expires_at=token.expires_at,
425 last_used=token.last_used,
426 is_active=token.is_active,
427 tags=token.tags,
428 server_id=token.server_id,
429 resource_scopes=token.resource_scopes,
430 ip_restrictions=token.ip_restrictions,
431 time_restrictions=token.time_restrictions,
432 usage_limits=token.usage_limits,
433 )
434 db.commit()
435 db.close()
436 return result
437 except ValueError as e:
438 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
441@router.delete("/{token_id}", status_code=status.HTTP_204_NO_CONTENT)
442@require_permission("tokens.revoke")
443async def revoke_token(
444 token_id: str,
445 request: Optional[TokenRevokeRequest] = None,
446 current_user=Depends(get_current_user_with_permissions),
447 db: Session = Depends(get_db),
448) -> None:
449 """Revoke (delete) a token.
451 Args:
452 token_id: Token ID to revoke
453 request: Optional revocation request with reason
454 current_user: Authenticated user from JWT
455 db: Database session
457 Raises:
458 HTTPException: If token not found
459 """
460 _require_authenticated_session(current_user)
462 service = TokenCatalogService(db)
464 reason = request.reason if request else "Revoked by user"
465 # SECURITY FIX: Pass user_email for ownership verification
466 success = await service.revoke_token(
467 token_id=token_id,
468 user_email=current_user["email"],
469 revoked_by=current_user["email"],
470 reason=reason,
471 )
473 if not success:
474 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")
476 db.commit()
477 db.close()
480@router.get("/{token_id}/usage", response_model=TokenUsageStatsResponse)
481@require_permission("tokens.read")
482async def get_token_usage_stats(
483 token_id: str,
484 days: int = 30,
485 current_user=Depends(get_current_user_with_permissions),
486 db: Session = Depends(get_db),
487) -> TokenUsageStatsResponse:
488 """Get usage statistics for a specific token.
490 Args:
491 token_id: Token ID to get stats for
492 days: Number of days to analyze (default 30)
493 current_user: Authenticated user from JWT
494 db: Database session
496 Returns:
497 TokenUsageStatsResponse: Token usage statistics
499 Raises:
500 HTTPException: If token not found or not owned by user
501 """
502 _require_authenticated_session(current_user)
504 service = TokenCatalogService(db)
506 # Verify token ownership
507 token = await service.get_token(token_id, current_user["email"])
508 if not token:
509 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")
511 stats = await service.get_token_usage_stats(user_email=current_user["email"], token_id=token_id, days=days)
513 db.commit()
514 db.close()
515 return TokenUsageStatsResponse(**stats)
518# Admin endpoints for token oversight
519@router.get("/admin/all", response_model=TokenListResponse, tags=["admin"])
520async def list_all_tokens(
521 user_email: Optional[str] = None,
522 include_inactive: bool = False,
523 limit: int = 100,
524 offset: int = 0,
525 current_user=Depends(get_current_user_with_permissions),
526 db: Session = Depends(get_db),
527) -> TokenListResponse:
528 """Admin endpoint to list all tokens or tokens for a specific user.
530 Args:
531 user_email: Filter tokens by user email (admin only)
532 include_inactive: Include inactive/expired tokens
533 limit: Maximum number of tokens to return
534 offset: Number of tokens to skip
535 current_user: Authenticated admin user
536 db: Database session
538 Returns:
539 TokenListResponse: List of tokens
541 Raises:
542 HTTPException: If user is not admin
543 """
544 _require_authenticated_session(current_user)
546 # SECURITY: Require un-narrowed admin. Narrowed/public-only admin sessions
547 # must not access the token oversight surface to prevent privilege escalation.
548 token_teams = current_user.get("token_teams")
549 if not current_user.get("is_admin") or token_teams is not None:
550 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")
552 service = TokenCatalogService(db)
554 if user_email:
555 # Get tokens for specific user
556 tokens = await service.list_user_tokens(
557 user_email=user_email,
558 include_inactive=include_inactive,
559 limit=limit,
560 offset=offset,
561 )
562 total_count = await service.count_user_tokens(
563 user_email=user_email,
564 include_inactive=include_inactive,
565 )
566 else:
567 # Admin: get all tokens
568 tokens = await service.list_all_tokens(
569 include_inactive=include_inactive,
570 limit=limit,
571 offset=offset,
572 )
573 total_count = await service.count_all_tokens(
574 include_inactive=include_inactive,
575 )
577 # Batch fetch revocation info (single query instead of N+1)
578 revocation_map = await service.get_token_revocations_batch([t.jti for t in tokens])
580 token_responses = []
581 for token in tokens:
582 revocation_info = revocation_map.get(token.jti)
584 token_responses.append(
585 TokenResponse(
586 id=token.id,
587 name=token.name,
588 description=token.description,
589 user_email=token.user_email,
590 team_id=token.team_id,
591 created_at=token.created_at,
592 expires_at=token.expires_at,
593 last_used=token.last_used,
594 is_active=token.is_active,
595 is_revoked=revocation_info is not None,
596 revoked_at=revocation_info.revoked_at if revocation_info else None,
597 revoked_by=revocation_info.revoked_by if revocation_info else None,
598 revocation_reason=revocation_info.reason if revocation_info else None,
599 tags=token.tags,
600 server_id=token.server_id,
601 resource_scopes=token.resource_scopes,
602 ip_restrictions=token.ip_restrictions,
603 time_restrictions=token.time_restrictions,
604 usage_limits=token.usage_limits,
605 )
606 )
608 db.commit()
609 db.close()
610 return TokenListResponse(tokens=token_responses, total=total_count, limit=limit, offset=offset)
613@router.delete("/admin/{token_id}", status_code=status.HTTP_204_NO_CONTENT, tags=["admin"])
614async def admin_revoke_token(
615 token_id: str,
616 request: Optional[TokenRevokeRequest] = None,
617 current_user=Depends(get_current_user_with_permissions),
618 db: Session = Depends(get_db),
619) -> None:
620 """Admin endpoint to revoke any token.
622 Args:
623 token_id: Token ID to revoke
624 request: Optional revocation request with reason
625 current_user: Authenticated admin user
626 db: Database session
628 Raises:
629 HTTPException: If user is not admin or token not found
630 """
631 _require_authenticated_session(current_user)
633 # SECURITY: Require un-narrowed admin. Narrowed/public-only admin sessions
634 # must not revoke arbitrary tokens to prevent privilege escalation.
635 revoke_token_teams = current_user.get("token_teams")
636 if not current_user.get("is_admin") or revoke_token_teams is not None:
637 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")
639 service = TokenCatalogService(db)
640 admin_email = current_user["email"]
641 reason = request.reason if request else f"Revoked by admin {admin_email}"
643 # Use admin method - no ownership check
644 success = await service.admin_revoke_token(
645 token_id=token_id,
646 revoked_by=admin_email,
647 reason=reason,
648 )
650 if not success:
651 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")
653 db.commit()
654 db.close()
657# Team-based token endpoints
658@router.post("/teams/{team_id}", response_model=TokenCreateResponse, status_code=status.HTTP_201_CREATED)
659@require_permission("tokens.create")
660async def create_team_token(
661 team_id: str,
662 request: TokenCreateRequest,
663 current_user=Depends(get_current_user_with_permissions),
664 db: Session = Depends(get_db),
665) -> TokenCreateResponse:
666 """Create a new API token for a team (only team owners can do this).
668 Args:
669 team_id: Team ID to create token for
670 request: Token creation request with name, description, scoping, etc.
671 current_user: Authenticated user (must be team owner)
672 db: Database session
674 Returns:
675 TokenCreateResponse: Created token details with raw token
677 Raises:
678 HTTPException: If user is not team owner or validation fails
679 """
680 _require_authenticated_session(current_user)
682 service = TokenCatalogService(db)
684 # Use team_id from path for permission context
685 caller_permissions = None
686 if request.scope and request.scope.permissions:
687 caller_permissions = await _get_caller_permissions(db, current_user, team_id)
689 # Convert request to TokenScope if provided
690 scope = None
691 if request.scope:
692 scope = TokenScope(
693 server_id=request.scope.server_id,
694 permissions=request.scope.permissions,
695 ip_restrictions=request.scope.ip_restrictions,
696 time_restrictions=request.scope.time_restrictions,
697 usage_limits=request.scope.usage_limits,
698 )
700 try:
701 token_record, raw_token = await service.create_token(
702 user_email=current_user["email"],
703 name=request.name,
704 description=request.description,
705 scope=scope,
706 expires_in_days=request.expires_in_days,
707 tags=request.tags,
708 team_id=team_id, # This will validate team ownership
709 caller_permissions=caller_permissions,
710 is_active=request.is_active,
711 )
713 # Create TokenResponse for the token info
714 token_response = TokenResponse(
715 id=token_record.id,
716 name=token_record.name,
717 description=token_record.description,
718 user_email=token_record.user_email,
719 team_id=token_record.team_id,
720 server_id=token_record.server_id,
721 resource_scopes=token_record.resource_scopes or [],
722 ip_restrictions=token_record.ip_restrictions or [],
723 time_restrictions=token_record.time_restrictions or {},
724 usage_limits=token_record.usage_limits or {},
725 created_at=token_record.created_at,
726 expires_at=token_record.expires_at,
727 last_used=token_record.last_used,
728 is_active=token_record.is_active,
729 tags=token_record.tags or [],
730 )
732 db.commit()
733 db.close()
734 return TokenCreateResponse(
735 token=token_response,
736 access_token=raw_token,
737 )
738 except ValueError as e:
739 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
740 except IntegrityError as e:
741 db.rollback()
742 err_str = str(e.orig) if hasattr(e, "orig") and e.orig else str(e)
743 # Match the specific name constraint: PostgreSQL reports the constraint name
744 # (either the db.py name or the Alembic migration name); SQLite reports column paths.
745 if (
746 "uq_email_api_tokens_user_name_team" in err_str
747 or "uq_email_api_tokens_user_name" in err_str
748 or "uq_email_api_tokens_user_email_name" in err_str
749 or ("email_api_tokens.user_email" in err_str and "email_api_tokens.name" in err_str)
750 ):
751 raise HTTPException(
752 status_code=status.HTTP_409_CONFLICT,
753 detail="A token with this name already exists for this user in the same team scope. Token names must be unique per user per team. Please choose a different name.",
754 )
755 raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Token creation failed due to a conflict. Please try again.")
758@router.get("/teams/{team_id}", response_model=TokenListResponse)
759@require_permission("tokens.read")
760async def list_team_tokens(
761 team_id: str,
762 include_inactive: bool = False,
763 limit: int = 50,
764 offset: int = 0,
765 current_user=Depends(get_current_user_with_permissions),
766 db: Session = Depends(get_db),
767) -> TokenListResponse:
768 """List API tokens for a team (requires active team membership).
770 Args:
771 team_id: Team ID to list tokens for
772 include_inactive: Include inactive/expired tokens
773 limit: Maximum number of tokens to return (default 50)
774 offset: Number of tokens to skip for pagination
775 current_user: Authenticated user (must be an active member of the team)
776 db: Database session
778 Returns:
779 TokenListResponse: List of team's API tokens
781 Raises:
782 HTTPException: If user is not an active member of the team
783 """
784 _require_authenticated_session(current_user)
786 service = TokenCatalogService(db)
788 try:
789 tokens = await service.list_team_tokens(
790 team_id=team_id,
791 user_email=current_user["email"], # This will validate team ownership
792 include_inactive=include_inactive,
793 limit=limit,
794 offset=offset,
795 )
797 total_count = await service.count_team_tokens(
798 team_id=team_id,
799 include_inactive=include_inactive,
800 )
802 # Batch fetch revocation info (single query instead of N+1)
803 revocation_map = await service.get_token_revocations_batch([t.jti for t in tokens])
805 token_responses = []
806 for token in tokens:
807 revocation_info = revocation_map.get(token.jti)
809 token_responses.append(
810 TokenResponse(
811 id=token.id,
812 name=token.name,
813 description=token.description,
814 user_email=token.user_email,
815 team_id=token.team_id,
816 created_at=token.created_at,
817 expires_at=token.expires_at,
818 last_used=token.last_used,
819 is_active=token.is_active,
820 is_revoked=revocation_info is not None,
821 revoked_at=revocation_info.revoked_at if revocation_info else None,
822 revoked_by=revocation_info.revoked_by if revocation_info else None,
823 revocation_reason=revocation_info.reason if revocation_info else None,
824 tags=token.tags,
825 server_id=token.server_id,
826 resource_scopes=token.resource_scopes,
827 ip_restrictions=token.ip_restrictions,
828 time_restrictions=token.time_restrictions,
829 usage_limits=token.usage_limits,
830 )
831 )
833 db.commit()
834 db.close()
835 return TokenListResponse(tokens=token_responses, total=total_count, limit=limit, offset=offset)
836 except ValueError as e:
837 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))