Coverage for mcpgateway / routers / tokens.py: 98%
206 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/routers/tokens.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7JWT Token Catalog API endpoints.
8Provides comprehensive API token management with scoping, revocation, and analytics.
9"""
11# Standard
12import logging
13from typing import List, Optional
15# Third-Party
16from fastapi import APIRouter, Depends, HTTPException, status
17from sqlalchemy.exc import IntegrityError
18from sqlalchemy.orm import Session
20# First-Party
21from mcpgateway.db import get_db
22from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_permission
23from mcpgateway.schemas import TokenCreateRequest, TokenCreateResponse, TokenListResponse, TokenResponse, TokenRevokeRequest, TokenUpdateRequest, TokenUsageStatsResponse
24from mcpgateway.services.permission_service import PermissionService
25from mcpgateway.services.token_catalog_service import TokenCatalogService, TokenScope
27logger = logging.getLogger(__name__)
29router = APIRouter(prefix="/tokens", tags=["tokens"])
def _require_authenticated_session(current_user: dict) -> None:
    """Reject callers that are not using an interactive, authenticated session.

    Token management is a Management Plane operation: only interactive sessions
    (JWT from web login, SSO, or OAuth) may create, list, or revoke tokens.
    API tokens are Data Plane credentials; letting them manage other tokens
    would open a token-chaining attack vector.

    Args:
        current_user: User context from get_current_user_with_permissions

    Raises:
        HTTPException: 403 if auth_method is None, "anonymous", or "api_token"
    """
    method = current_user.get("auth_method")

    if method is None:
        # Fail-secure: an unset auth_method means some auth code path never recorded it.
        logger.warning("Token management blocked: auth_method not set. This indicates an auth code path that needs to set request.state.auth_method")
        rejection = "Token management requires authentication. Authentication method could not be determined."
    elif method == "anonymous":
        # Anonymous callers (missing proxy header or unauthenticated) are never allowed.
        rejection = "Token management requires authentication. Anonymous access is not permitted."
    elif method == "api_token":
        # A compromised API token must not be able to mint new long-lived tokens
        # and escalate persistence (token chaining).
        rejection = "Token management requires an interactive session (JWT from web login or SSO). API tokens cannot create, list, or revoke other tokens."
    else:
        # Any other auth method is treated as an interactive session.
        return

    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=rejection)
async def _get_caller_permissions(
    db: Session,
    current_user: dict,
    team_id: Optional[str] = None,
) -> Optional[List[str]]:
    """Resolve the caller's effective permissions for scope containment.

    Args:
        db: Database session
        current_user: User context (reads "is_admin" and "email")
        team_id: Team context for the permission lookup

    Returns:
        ["*"] for admins; otherwise the looked-up permissions as a list, or
        None when the lookup yields nothing.
    """
    if not current_user.get("is_admin"):
        granted = await PermissionService(db).get_user_permissions(
            user_email=current_user["email"],
            team_id=team_id,
        )
        # NOTE(review): an empty permission set is reported as None, the same value
        # callers pass when no custom scope was requested - confirm the service
        # treats None as "no permissions" rather than "skip containment".
        if granted:
            return list(granted)
        return None

    # Admins can grant anything.
    return ["*"]
@router.post("", response_model=TokenCreateResponse, status_code=status.HTTP_201_CREATED)
@require_permission("tokens.create")
async def create_token(
    request: TokenCreateRequest,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> TokenCreateResponse:
    """Create a new API token for the current user.

    Args:
        request: Token creation request with name, description, scoping, etc.
        current_user: User context from get_current_user_with_permissions
        db: Database session

    Returns:
        TokenCreateResponse: Created token details together with the raw token

    Raises:
        HTTPException: 403 if the caller is not on an interactive session,
            400 if the service rejects the request with ValueError,
            409 if the insert violates a database integrity constraint

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(create_token)
        True
    """
    _require_authenticated_session(current_user)

    # Auto-inherit team_id from the caller's single team when not explicitly provided.
    # This prevents tokens from being silently scoped to public-only (team_id=None)
    # when the user belongs to exactly one team, maintaining RBAC context at token level.
    # Multi-team users must specify team_id explicitly to avoid ambiguity.
    # Admins are exempt and may still create global-scope tokens.
    effective_team_id = request.team_id
    if effective_team_id is None and not current_user.get("is_admin"):
        user_teams = current_user.get("token_teams") or []
        if len(user_teams) == 1:
            effective_team_id = user_teams[0]
            logger.debug("Auto-inherited team_id=%s for token creation by %s", effective_team_id, current_user["email"])

    service = TokenCatalogService(db)

    # Caller permissions are only needed for scope containment, i.e. when the
    # request asks for custom permissions.
    caller_permissions = None
    if request.scope and request.scope.permissions:
        caller_permissions = await _get_caller_permissions(db, current_user, effective_team_id)

    # Convert the request scope to a TokenScope if one was provided.
    scope = None
    if request.scope:
        scope = TokenScope(
            server_id=request.scope.server_id,
            permissions=request.scope.permissions,
            ip_restrictions=request.scope.ip_restrictions,
            time_restrictions=request.scope.time_restrictions,
            usage_limits=request.scope.usage_limits,
        )

    try:
        token_record, raw_token = await service.create_token(
            user_email=current_user["email"],
            name=request.name,
            description=request.description,
            scope=scope,
            expires_in_days=request.expires_in_days,
            tags=request.tags,
            team_id=effective_team_id,
            caller_permissions=caller_permissions,
            is_active=request.is_active,
        )

        # Build the response while the session is still open.
        token_response = TokenResponse(
            id=token_record.id,
            name=token_record.name,
            description=token_record.description,
            user_email=token_record.user_email,
            team_id=token_record.team_id,
            server_id=token_record.server_id,
            resource_scopes=token_record.resource_scopes or [],
            ip_restrictions=token_record.ip_restrictions or [],
            time_restrictions=token_record.time_restrictions or {},
            usage_limits=token_record.usage_limits or {},
            created_at=token_record.created_at,
            expires_at=token_record.expires_at,
            last_used=token_record.last_used,
            is_active=token_record.is_active,
            tags=token_record.tags or [],
        )

        db.commit()
        db.close()
        return TokenCreateResponse(
            token=token_response,
            access_token=raw_token,
        )
    except ValueError as e:
        # Chain the cause so the service-level error stays attached to the HTTP error.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e
    except IntegrityError as e:
        db.rollback()
        orig = getattr(e, "orig", None)
        err_str = str(orig) if orig else str(e)
        # Match the specific name constraint: PostgreSQL reports the constraint name
        # (either the db.py name or the Alembic migration name); SQLite reports column paths.
        constraint_names = (
            "uq_email_api_tokens_user_name_team",
            "uq_email_api_tokens_user_name",
            "uq_email_api_tokens_user_email_name",
        )
        is_name_conflict = any(name in err_str for name in constraint_names) or ("email_api_tokens.user_email" in err_str and "email_api_tokens.name" in err_str)
        if is_name_conflict:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail="A token with this name already exists for this user in the same team scope. Token names must be unique per user per team. Please choose a different name.",
            ) from e
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Token creation failed due to a conflict. Please try again.") from e
@router.get("", response_model=TokenListResponse)
@require_permission("tokens.read")
async def list_tokens(
    include_inactive: bool = False,
    limit: int = 50,
    offset: int = 0,
    db: Session = Depends(get_db),
    current_user=Depends(get_current_user_with_permissions),
) -> TokenListResponse:
    """Return one page of the current user's API tokens.

    Args:
        include_inactive: Include inactive/expired tokens
        limit: Maximum number of tokens to return (default 50)
        offset: Number of tokens to skip for pagination
        db: Database session
        current_user: User context from get_current_user_with_permissions

    Returns:
        TokenListResponse: The page of tokens plus total, limit and offset

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(list_tokens)
        True
    """
    _require_authenticated_session(current_user)

    catalog = TokenCatalogService(db)
    page = await catalog.list_user_and_team_tokens(
        user_email=current_user["email"],
        include_inactive=include_inactive,
        limit=limit,
        offset=offset,
    )
    total = await catalog.count_user_and_team_tokens(
        user_email=current_user["email"],
        include_inactive=include_inactive,
    )

    # One batched lookup for revocation info instead of a query per token.
    revocations = await catalog.get_token_revocations_batch([entry.jti for entry in page])

    items = []
    for entry in page:
        revoked = revocations.get(entry.jti)
        item = TokenResponse(
            id=entry.id,
            name=entry.name,
            description=entry.description,
            user_email=entry.user_email,
            team_id=entry.team_id,
            created_at=entry.created_at,
            expires_at=entry.expires_at,
            last_used=entry.last_used,
            is_active=entry.is_active,
            is_revoked=revoked is not None,
            revoked_at=revoked.revoked_at if revoked else None,
            revoked_by=revoked.revoked_by if revoked else None,
            revocation_reason=revoked.reason if revoked else None,
            tags=entry.tags,
            server_id=entry.server_id,
            resource_scopes=entry.resource_scopes,
            ip_restrictions=entry.ip_restrictions,
            time_restrictions=entry.time_restrictions,
            usage_limits=entry.usage_limits,
        )
        items.append(item)

    db.commit()
    db.close()
    return TokenListResponse(tokens=items, total=total, limit=limit, offset=offset)
@router.get("/{token_id}", response_model=TokenResponse)
@require_permission("tokens.read")
async def get_token(
    token_id: str,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> TokenResponse:
    """Get details of a specific token.

    Args:
        token_id: Token ID to retrieve
        current_user: User context from get_current_user_with_permissions
        db: Database session

    Returns:
        TokenResponse: Token details

    Raises:
        HTTPException: 403 if the caller is not on an interactive session,
            404 if the service returns no token for this ID and caller email

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(get_token)
        True
    """
    _require_authenticated_session(current_user)

    service = TokenCatalogService(db)
    token = await service.get_token(token_id, current_user["email"])

    if not token:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")

    # Read every ORM attribute while the session is still open. Building the
    # response after commit()/close() would touch a detached (and, depending on
    # session configuration, expired) instance.
    result = TokenResponse(
        id=token.id,
        name=token.name,
        description=token.description,
        user_email=token.user_email,
        team_id=token.team_id,
        created_at=token.created_at,
        expires_at=token.expires_at,
        last_used=token.last_used,
        is_active=token.is_active,
        tags=token.tags,
        server_id=token.server_id,
        resource_scopes=token.resource_scopes,
        ip_restrictions=token.ip_restrictions,
        time_restrictions=token.time_restrictions,
        usage_limits=token.usage_limits,
    )

    db.commit()
    db.close()
    return result
@router.put("/{token_id}", response_model=TokenResponse)
@require_permission("tokens.update")
async def update_token(
    token_id: str,
    request: TokenUpdateRequest,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> TokenResponse:
    """Update an existing token.

    Args:
        token_id: Token ID to update
        request: Token update request
        current_user: User context from get_current_user_with_permissions
        db: Database session

    Returns:
        TokenResponse: Updated token details

    Raises:
        HTTPException: 403 if the caller is not on an interactive session,
            404 if the token cannot be found for this caller,
            400 if the service rejects the update with ValueError
    """
    _require_authenticated_session(current_user)

    service = TokenCatalogService(db)

    # Scope containment only matters when custom permissions are requested; the
    # lookup uses the existing token's team_id as the permission context.
    caller_permissions = None
    if request.scope and request.scope.permissions:
        existing_token = await service.get_token(token_id, current_user["email"])
        if not existing_token:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")
        caller_permissions = await _get_caller_permissions(db, current_user, existing_token.team_id)

    # Convert the request scope to a TokenScope if one was provided.
    scope = None
    if request.scope:
        scope = TokenScope(
            server_id=request.scope.server_id,
            permissions=request.scope.permissions,
            ip_restrictions=request.scope.ip_restrictions,
            time_restrictions=request.scope.time_restrictions,
            usage_limits=request.scope.usage_limits,
        )

    try:
        token = await service.update_token(
            token_id=token_id,
            user_email=current_user["email"],
            name=request.name,
            description=request.description,
            scope=scope,
            tags=request.tags,
            caller_permissions=caller_permissions,
            is_active=request.is_active,
        )

        if not token:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")

        # Build the response before commit/close so attributes are read on a live session.
        result = TokenResponse(
            id=token.id,
            name=token.name,
            description=token.description,
            user_email=token.user_email,
            team_id=token.team_id,
            created_at=token.created_at,
            expires_at=token.expires_at,
            last_used=token.last_used,
            is_active=token.is_active,
            tags=token.tags,
            server_id=token.server_id,
            resource_scopes=token.resource_scopes,
            ip_restrictions=token.ip_restrictions,
            time_restrictions=token.time_restrictions,
            usage_limits=token.usage_limits,
        )
        db.commit()
        db.close()
        return result
    except ValueError as e:
        # Chain the cause so the service-level error stays attached to the HTTP error.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e
@router.delete("/{token_id}", status_code=status.HTTP_204_NO_CONTENT)
@require_permission("tokens.revoke")
async def revoke_token(
    token_id: str,
    request: Optional[TokenRevokeRequest] = None,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> None:
    """Revoke one of the caller's tokens.

    Args:
        token_id: Token ID to revoke
        request: Optional revocation request carrying a reason
        current_user: User context from get_current_user_with_permissions
        db: Database session

    Raises:
        HTTPException: 403 if the caller is not on an interactive session,
            404 if the service reports that nothing was revoked
    """
    _require_authenticated_session(current_user)

    catalog = TokenCatalogService(db)

    if request:
        why = request.reason
    else:
        why = "Revoked by user"

    # The caller's email is passed as user_email so the service can verify
    # ownership, and as revoked_by to record who revoked the token.
    revoked = await catalog.revoke_token(
        token_id=token_id,
        user_email=current_user["email"],
        revoked_by=current_user["email"],
        reason=why,
    )

    if revoked:
        db.commit()
        db.close()
        return

    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")
@router.get("/{token_id}/usage", response_model=TokenUsageStatsResponse)
@require_permission("tokens.read")
async def get_token_usage_stats(
    token_id: str,
    days: int = 30,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> TokenUsageStatsResponse:
    """Return usage statistics for one of the caller's tokens.

    Args:
        token_id: Token ID to get stats for
        days: Number of days to analyze (default 30)
        current_user: User context from get_current_user_with_permissions
        db: Database session

    Returns:
        TokenUsageStatsResponse: Built from the mapping returned by the service

    Raises:
        HTTPException: 403 if the caller is not on an interactive session,
            404 if the service returns no token for this ID and caller email
    """
    _require_authenticated_session(current_user)

    catalog = TokenCatalogService(db)

    # Ownership gate: the token must resolve for the caller's email before any stats are read.
    owned = await catalog.get_token(token_id, current_user["email"])
    if owned:
        usage = await catalog.get_token_usage_stats(user_email=current_user["email"], token_id=token_id, days=days)
        db.commit()
        db.close()
        return TokenUsageStatsResponse(**usage)

    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")
509# Admin endpoints for token oversight
@router.get("/admin/all", response_model=TokenListResponse, tags=["admin"])
async def list_all_tokens(
    user_email: Optional[str] = None,
    include_inactive: bool = False,
    limit: int = 100,
    offset: int = 0,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> TokenListResponse:
    """Admin endpoint to list all tokens or tokens for a specific user.

    Args:
        user_email: Filter tokens by user email (admin only)
        include_inactive: Include inactive/expired tokens
        limit: Maximum number of tokens to return
        offset: Number of tokens to skip
        current_user: User context from get_current_user_with_permissions
        db: Database session

    Returns:
        TokenListResponse: The page of tokens plus total, limit and offset

    Raises:
        HTTPException: 403 if the caller is not on an interactive session or
            is not an admin
    """
    _require_authenticated_session(current_user)

    # Fail-secure: a context without an "is_admin" key is treated as non-admin
    # (403) rather than surfacing a KeyError as a 500.
    if not current_user.get("is_admin"):
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")

    service = TokenCatalogService(db)

    if user_email:
        # Tokens for one specific user.
        tokens = await service.list_user_tokens(
            user_email=user_email,
            include_inactive=include_inactive,
            limit=limit,
            offset=offset,
        )
        total_count = await service.count_user_tokens(
            user_email=user_email,
            include_inactive=include_inactive,
        )
    else:
        # No filter: tokens across all users.
        tokens = await service.list_all_tokens(
            include_inactive=include_inactive,
            limit=limit,
            offset=offset,
        )
        total_count = await service.count_all_tokens(
            include_inactive=include_inactive,
        )

    # Batch fetch revocation info (single query instead of N+1).
    revocation_map = await service.get_token_revocations_batch([t.jti for t in tokens])

    token_responses = []
    for token in tokens:
        revocation_info = revocation_map.get(token.jti)

        token_responses.append(
            TokenResponse(
                id=token.id,
                name=token.name,
                description=token.description,
                user_email=token.user_email,
                team_id=token.team_id,
                created_at=token.created_at,
                expires_at=token.expires_at,
                last_used=token.last_used,
                is_active=token.is_active,
                is_revoked=revocation_info is not None,
                revoked_at=revocation_info.revoked_at if revocation_info else None,
                revoked_by=revocation_info.revoked_by if revocation_info else None,
                revocation_reason=revocation_info.reason if revocation_info else None,
                tags=token.tags,
                server_id=token.server_id,
                resource_scopes=token.resource_scopes,
                ip_restrictions=token.ip_restrictions,
                time_restrictions=token.time_restrictions,
                usage_limits=token.usage_limits,
            )
        )

    db.commit()
    db.close()
    return TokenListResponse(tokens=token_responses, total=total_count, limit=limit, offset=offset)
@router.delete("/admin/{token_id}", status_code=status.HTTP_204_NO_CONTENT, tags=["admin"])
async def admin_revoke_token(
    token_id: str,
    request: Optional[TokenRevokeRequest] = None,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> None:
    """Admin endpoint to revoke any token.

    Args:
        token_id: Token ID to revoke
        request: Optional revocation request with reason
        current_user: User context from get_current_user_with_permissions
        db: Database session

    Raises:
        HTTPException: 403 if the caller is not on an interactive session or
            is not an admin, 404 if the service reports that nothing was revoked
    """
    _require_authenticated_session(current_user)

    # Fail-secure: a context without an "is_admin" key is treated as non-admin
    # (403) rather than surfacing a KeyError as a 500.
    if not current_user.get("is_admin"):
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")

    service = TokenCatalogService(db)
    admin_email = current_user["email"]
    reason = request.reason if request else f"Revoked by admin {admin_email}"

    # Admin path: no user_email is passed, so no ownership filter is applied here.
    success = await service.admin_revoke_token(
        token_id=token_id,
        revoked_by=admin_email,
        reason=reason,
    )

    if not success:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")

    db.commit()
    db.close()
642# Team-based token endpoints
@router.post("/teams/{team_id}", response_model=TokenCreateResponse, status_code=status.HTTP_201_CREATED)
@require_permission("tokens.create")
async def create_team_token(
    team_id: str,
    request: TokenCreateRequest,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> TokenCreateResponse:
    """Create a new API token scoped to a team.

    Args:
        team_id: Team ID to create the token for (taken from the path)
        request: Token creation request with name, description, scoping, etc.
        current_user: User context from get_current_user_with_permissions
        db: Database session

    Returns:
        TokenCreateResponse: Created token details together with the raw token

    Raises:
        HTTPException: 403 if the caller is not on an interactive session,
            400 if the service rejects the request with ValueError,
            409 if the insert violates a database integrity constraint
    """
    _require_authenticated_session(current_user)

    service = TokenCatalogService(db)

    # The path's team_id is the permission context for scope containment.
    caller_permissions = None
    if request.scope and request.scope.permissions:
        caller_permissions = await _get_caller_permissions(db, current_user, team_id)

    # Convert the request scope to a TokenScope if one was provided.
    scope = None
    if request.scope:
        scope = TokenScope(
            server_id=request.scope.server_id,
            permissions=request.scope.permissions,
            ip_restrictions=request.scope.ip_restrictions,
            time_restrictions=request.scope.time_restrictions,
            usage_limits=request.scope.usage_limits,
        )

    try:
        # NOTE(review): the service is expected to validate the caller's team
        # ownership from team_id - confirm in TokenCatalogService.create_token.
        token_record, raw_token = await service.create_token(
            user_email=current_user["email"],
            name=request.name,
            description=request.description,
            scope=scope,
            expires_in_days=request.expires_in_days,
            tags=request.tags,
            team_id=team_id,
            caller_permissions=caller_permissions,
            is_active=request.is_active,
        )

        # Build the response while the session is still open.
        token_response = TokenResponse(
            id=token_record.id,
            name=token_record.name,
            description=token_record.description,
            user_email=token_record.user_email,
            team_id=token_record.team_id,
            server_id=token_record.server_id,
            resource_scopes=token_record.resource_scopes or [],
            ip_restrictions=token_record.ip_restrictions or [],
            time_restrictions=token_record.time_restrictions or {},
            usage_limits=token_record.usage_limits or {},
            created_at=token_record.created_at,
            expires_at=token_record.expires_at,
            last_used=token_record.last_used,
            is_active=token_record.is_active,
            tags=token_record.tags or [],
        )

        db.commit()
        db.close()
        return TokenCreateResponse(
            token=token_response,
            access_token=raw_token,
        )
    except ValueError as e:
        # Chain the cause so the service-level error stays attached to the HTTP error.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e
    except IntegrityError as e:
        db.rollback()
        orig = getattr(e, "orig", None)
        err_str = str(orig) if orig else str(e)
        # Match the specific name constraint: PostgreSQL reports the constraint name
        # (either the db.py name or the Alembic migration name); SQLite reports column paths.
        constraint_names = (
            "uq_email_api_tokens_user_name_team",
            "uq_email_api_tokens_user_name",
            "uq_email_api_tokens_user_email_name",
        )
        is_name_conflict = any(name in err_str for name in constraint_names) or ("email_api_tokens.user_email" in err_str and "email_api_tokens.name" in err_str)
        if is_name_conflict:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail="A token with this name already exists for this user in the same team scope. Token names must be unique per user per team. Please choose a different name.",
            ) from e
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Token creation failed due to a conflict. Please try again.") from e
@router.get("/teams/{team_id}", response_model=TokenListResponse)
@require_permission("tokens.read")
async def list_team_tokens(
    team_id: str,
    include_inactive: bool = False,
    limit: int = 50,
    offset: int = 0,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> TokenListResponse:
    """List API tokens for a team.

    Args:
        team_id: Team ID to list tokens for
        include_inactive: Include inactive/expired tokens
        limit: Maximum number of tokens to return (default 50)
        offset: Number of tokens to skip for pagination
        current_user: User context from get_current_user_with_permissions
        db: Database session

    Returns:
        TokenListResponse: The page of team tokens plus total, limit and offset

    Raises:
        HTTPException: 403 if the caller is not on an interactive session,
            400 if the service rejects the request with ValueError
    """
    _require_authenticated_session(current_user)

    service = TokenCatalogService(db)

    try:
        # NOTE(review): the service is expected to check the caller's relationship
        # to the team (membership or ownership) from user_email - confirm which
        # in TokenCatalogService.list_team_tokens.
        tokens = await service.list_team_tokens(
            team_id=team_id,
            user_email=current_user["email"],
            include_inactive=include_inactive,
            limit=limit,
            offset=offset,
        )

        total_count = await service.count_team_tokens(
            team_id=team_id,
            include_inactive=include_inactive,
        )

        # Batch fetch revocation info (single query instead of N+1).
        revocation_map = await service.get_token_revocations_batch([t.jti for t in tokens])

        token_responses = []
        for token in tokens:
            revocation_info = revocation_map.get(token.jti)

            token_responses.append(
                TokenResponse(
                    id=token.id,
                    name=token.name,
                    description=token.description,
                    user_email=token.user_email,
                    team_id=token.team_id,
                    created_at=token.created_at,
                    expires_at=token.expires_at,
                    last_used=token.last_used,
                    is_active=token.is_active,
                    is_revoked=revocation_info is not None,
                    revoked_at=revocation_info.revoked_at if revocation_info else None,
                    revoked_by=revocation_info.revoked_by if revocation_info else None,
                    revocation_reason=revocation_info.reason if revocation_info else None,
                    tags=token.tags,
                    server_id=token.server_id,
                    resource_scopes=token.resource_scopes,
                    ip_restrictions=token.ip_restrictions,
                    time_restrictions=token.time_restrictions,
                    usage_limits=token.usage_limits,
                )
            )

        db.commit()
        db.close()
        return TokenListResponse(tokens=token_responses, total=total_count, limit=limit, offset=offset)
    except ValueError as e:
        # Chain the cause so the service-level error stays attached to the HTTP error.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e