Coverage for mcpgateway / routers / tokens.py: 96%

180 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/routers/tokens.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7JWT Token Catalog API endpoints. 

8Provides comprehensive API token management with scoping, revocation, and analytics. 

9""" 

10 

11# Standard 

12import logging 

13from typing import List, Optional 

14 

15# Third-Party 

16from fastapi import APIRouter, Depends, HTTPException, status 

17from sqlalchemy.orm import Session 

18 

19# First-Party 

20from mcpgateway.db import get_db 

21from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_permission 

22from mcpgateway.schemas import TokenCreateRequest, TokenCreateResponse, TokenListResponse, TokenResponse, TokenRevokeRequest, TokenUpdateRequest, TokenUsageStatsResponse 

23from mcpgateway.services.permission_service import PermissionService 

24from mcpgateway.services.token_catalog_service import TokenCatalogService, TokenScope 

25 

26logger = logging.getLogger(__name__) 

27 

28router = APIRouter(prefix="/tokens", tags=["tokens"]) 

29 

30 

def _require_interactive_session(current_user: dict) -> None:
    """Reject token-management requests that do not come from an interactive session.

    Managing tokens with an API token would allow privilege escalation via
    token chaining, so this check is applied to every caller, admins included.

    Rejected ``auth_method`` values:
        - None: fail-secure; the auth flow did not record a method
        - "api_token": API tokens may not manage tokens
        - "anonymous": unauthenticated caller / missing proxy header

    Any other value (for example "jwt", "oauth", "oidc", "saml", "proxy",
    "disabled", or a plugin-defined method) is accepted.

    Args:
        current_user: User context from get_current_user_with_permissions

    Raises:
        HTTPException: 403 if auth_method is missing, "api_token" or "anonymous"
    """
    method = current_user.get("auth_method")

    if method is None:
        # Fail-secure: an unset auth_method points at an incomplete auth flow.
        logger.warning("Token management blocked: auth_method not set. This indicates an auth code path that needs to set request.state.auth_method")
        denial = "Token management requires interactive session. Authentication method could not be determined."
    elif method == "api_token":
        denial = "Token management requires interactive session (web login). API tokens cannot create, modify, or revoke tokens."
    elif method == "anonymous":
        denial = "Token management requires interactive session (web login). Anonymous access is not permitted."
    else:
        # Everything else counts as an interactive session.
        return

    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=denial)

80 

81 

async def _get_caller_permissions(
    db: Session,
    current_user: dict,
    team_id: Optional[str] = None,
) -> Optional[List[str]]:
    """Resolve the caller's effective permissions for scope containment.

    Args:
        db: Database session
        current_user: User context (reads "is_admin" and "email")
        team_id: Team context for the permission lookup

    Returns:
        ["*"] for admins; otherwise the permissions returned by
        PermissionService as a list, or None when that lookup yields nothing
    """
    # Admins can grant anything, so no lookup is needed.
    if current_user.get("is_admin"):
        return ["*"]

    granted = await PermissionService(db).get_user_permissions(
        user_email=current_user["email"],
        team_id=team_id,
    )
    if not granted:
        return None
    return list(granted)

106 

107 

@router.post("", response_model=TokenCreateResponse, status_code=status.HTTP_201_CREATED)
@require_permission("tokens.create")
async def create_token(
    request: TokenCreateRequest,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> TokenCreateResponse:
    """Create a new API token for the current user.

    Args:
        request: Token creation request with name, description, scoping, etc.
        current_user: Authenticated user context
        db: Database session

    Returns:
        TokenCreateResponse: Created token details with raw token

    Raises:
        HTTPException: 403 if the caller is not on an interactive session;
            400 if the service (or response construction) raises ValueError

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(create_token)
        True
    """
    _require_interactive_session(current_user)

    service = TokenCatalogService(db)

    # Caller permissions are only needed when a custom permission scope is
    # requested; they are handed to the service for scope containment.
    caller_permissions = None
    if request.scope and request.scope.permissions:
        caller_permissions = await _get_caller_permissions(db, current_user, request.team_id)

    # Convert the request scope to a TokenScope if one was provided
    scope = None
    if request.scope:
        scope = TokenScope(
            server_id=request.scope.server_id,
            permissions=request.scope.permissions,
            ip_restrictions=request.scope.ip_restrictions,
            time_restrictions=request.scope.time_restrictions,
            usage_limits=request.scope.usage_limits,
        )

    try:
        token_record, raw_token = await service.create_token(
            user_email=current_user["email"],
            name=request.name,
            description=request.description,
            scope=scope,
            expires_in_days=request.expires_in_days,
            tags=request.tags,
            team_id=request.team_id,
            caller_permissions=caller_permissions,
            is_active=request.is_active,
        )

        # Build the response before the session is committed and closed.
        token_response = TokenResponse(
            id=token_record.id,
            name=token_record.name,
            description=token_record.description,
            user_email=token_record.user_email,
            team_id=token_record.team_id,
            server_id=token_record.server_id,
            resource_scopes=token_record.resource_scopes or [],
            ip_restrictions=token_record.ip_restrictions or [],
            time_restrictions=token_record.time_restrictions or {},
            usage_limits=token_record.usage_limits or {},
            created_at=token_record.created_at,
            expires_at=token_record.expires_at,
            last_used=token_record.last_used,
            is_active=token_record.is_active,
            tags=token_record.tags or [],
        )

        db.commit()
        db.close()
        return TokenCreateResponse(
            token=token_response,
            access_token=raw_token,
        )
    except ValueError as e:
        # Chain the cause so the original ValueError stays visible in tracebacks.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e

193 

194 

@router.get("", response_model=TokenListResponse)
@require_permission("tokens.read")
async def list_tokens(
    include_inactive: bool = False,
    limit: int = 50,
    offset: int = 0,
    db: Session = Depends(get_db),
    current_user=Depends(get_current_user_with_permissions),
) -> TokenListResponse:
    """Return the current user's API tokens, one page at a time.

    Args:
        include_inactive: Include inactive/expired tokens
        limit: Maximum number of tokens to return (default 50)
        offset: Number of tokens to skip for pagination
        db: Database session
        current_user: Authenticated user context

    Returns:
        TokenListResponse: The user's tokens; ``total`` is the number of
        tokens in this response

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(list_tokens)
        True
    """
    _require_interactive_session(current_user)

    service = TokenCatalogService(db)
    records = await service.list_user_tokens(
        user_email=current_user["email"],
        include_inactive=include_inactive,
        limit=limit,
        offset=offset,
    )

    page = []
    for record in records:
        # One revocation lookup per token
        revocation = await service.get_token_revocation(record.jti)
        if revocation:
            revoked_at = revocation.revoked_at
            revoked_by = revocation.revoked_by
            revocation_reason = revocation.reason
        else:
            revoked_at = revoked_by = revocation_reason = None

        entry = TokenResponse(
            id=record.id,
            name=record.name,
            description=record.description,
            user_email=record.user_email,
            team_id=record.team_id,
            created_at=record.created_at,
            expires_at=record.expires_at,
            last_used=record.last_used,
            is_active=record.is_active,
            is_revoked=revocation is not None,
            revoked_at=revoked_at,
            revoked_by=revoked_by,
            revocation_reason=revocation_reason,
            tags=record.tags,
            server_id=record.server_id,
            resource_scopes=record.resource_scopes,
            ip_restrictions=record.ip_restrictions,
            time_restrictions=record.time_restrictions,
            usage_limits=record.usage_limits,
        )
        page.append(entry)

    db.commit()
    db.close()
    return TokenListResponse(tokens=page, total=len(page), limit=limit, offset=offset)

263 

264 

@router.get("/{token_id}", response_model=TokenResponse)
@require_permission("tokens.read")
async def get_token(
    token_id: str,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> TokenResponse:
    """Get details of a specific token.

    Args:
        token_id: Token ID to retrieve
        current_user: Authenticated user context
        db: Database session

    Returns:
        TokenResponse: Token details

    Raises:
        HTTPException: 403 if the caller is not on an interactive session;
            404 if the service returns no token for this ID and user

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(get_token)
        True
    """
    _require_interactive_session(current_user)

    service = TokenCatalogService(db)
    token = await service.get_token(token_id, current_user["email"])

    if not token:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")

    # Read the ORM attributes while the session is still open, as create_token
    # and update_token do. Reading them after commit()/close() only works when
    # the session is configured not to expire instances on commit.
    result = TokenResponse(
        id=token.id,
        name=token.name,
        description=token.description,
        user_email=token.user_email,
        team_id=token.team_id,
        created_at=token.created_at,
        expires_at=token.expires_at,
        last_used=token.last_used,
        is_active=token.is_active,
        tags=token.tags,
        server_id=token.server_id,
        resource_scopes=token.resource_scopes,
        ip_restrictions=token.ip_restrictions,
        time_restrictions=token.time_restrictions,
        usage_limits=token.usage_limits,
    )

    db.commit()
    db.close()
    return result

317 

318 

@router.put("/{token_id}", response_model=TokenResponse)
@require_permission("tokens.update")
async def update_token(
    token_id: str,
    request: TokenUpdateRequest,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> TokenResponse:
    """Update an existing token.

    Args:
        token_id: Token ID to update
        request: Token update request
        current_user: Authenticated user context
        db: Database session

    Returns:
        TokenResponse: Updated token details

    Raises:
        HTTPException: 403 if the caller is not on an interactive session;
            404 if the token is not found for this user; 400 if the service
            (or response construction) raises ValueError
    """
    _require_interactive_session(current_user)

    service = TokenCatalogService(db)

    # Caller permissions are only needed when the update changes the permission
    # scope; they are resolved in the context of the token's own team.
    caller_permissions = None
    if request.scope and request.scope.permissions:
        # Fetch the existing token to find its team_id
        existing_token = await service.get_token(token_id, current_user["email"])
        if not existing_token:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")
        caller_permissions = await _get_caller_permissions(db, current_user, existing_token.team_id)

    # Convert the request scope to a TokenScope if one was provided
    scope = None
    if request.scope:
        scope = TokenScope(
            server_id=request.scope.server_id,
            permissions=request.scope.permissions,
            ip_restrictions=request.scope.ip_restrictions,
            time_restrictions=request.scope.time_restrictions,
            usage_limits=request.scope.usage_limits,
        )

    try:
        token = await service.update_token(
            token_id=token_id,
            user_email=current_user["email"],
            name=request.name,
            description=request.description,
            scope=scope,
            tags=request.tags,
            caller_permissions=caller_permissions,
            is_active=request.is_active,
        )

        if not token:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")

        # Build the response before the session is committed and closed.
        result = TokenResponse(
            id=token.id,
            name=token.name,
            description=token.description,
            user_email=token.user_email,
            team_id=token.team_id,
            created_at=token.created_at,
            expires_at=token.expires_at,
            last_used=token.last_used,
            is_active=token.is_active,
            tags=token.tags,
            server_id=token.server_id,
            resource_scopes=token.resource_scopes,
            ip_restrictions=token.ip_restrictions,
            time_restrictions=token.time_restrictions,
            usage_limits=token.usage_limits,
        )
        db.commit()
        db.close()
        return result
    except ValueError as e:
        # Chain the cause so the original ValueError stays visible in tracebacks.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e

403 

404 

@router.delete("/{token_id}", status_code=status.HTTP_204_NO_CONTENT)
@require_permission("tokens.revoke")
async def revoke_token(
    token_id: str,
    request: Optional[TokenRevokeRequest] = None,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> None:
    """Revoke one of the caller's own tokens.

    Args:
        token_id: Token ID to revoke
        request: Optional revocation request carrying a reason
        current_user: Authenticated user context
        db: Database session

    Raises:
        HTTPException: 403 if the caller is not on an interactive session;
            404 if the service reports that nothing was revoked
    """
    _require_interactive_session(current_user)

    service = TokenCatalogService(db)

    if request:
        reason = request.reason
    else:
        reason = "Revoked by user"

    # The caller's email is passed as user_email so the service can verify
    # ownership, and as revoked_by to record who revoked the token.
    revoked = await service.revoke_token(
        token_id=token_id,
        user_email=current_user["email"],
        revoked_by=current_user["email"],
        reason=reason,
    )
    if not revoked:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")

    db.commit()
    db.close()

442 

443 

@router.get("/{token_id}/usage", response_model=TokenUsageStatsResponse)
@require_permission("tokens.read")
async def get_token_usage_stats(
    token_id: str,
    days: int = 30,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> TokenUsageStatsResponse:
    """Return usage statistics for one of the caller's tokens.

    Args:
        token_id: Token ID to get stats for
        days: Number of days to analyze (default 30)
        current_user: Authenticated user context
        db: Database session

    Returns:
        TokenUsageStatsResponse: Token usage statistics

    Raises:
        HTTPException: 403 if the caller is not on an interactive session;
            404 if the service returns no token for this ID and user
    """
    _require_interactive_session(current_user)

    service = TokenCatalogService(db)
    caller_email = current_user["email"]

    # Ownership check: the lookup is scoped to the caller's email.
    owned = await service.get_token(token_id, caller_email)
    if not owned:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")

    usage = await service.get_token_usage_stats(user_email=caller_email, token_id=token_id, days=days)

    db.commit()
    db.close()
    return TokenUsageStatsResponse(**usage)

480 

481 

482# Admin endpoints for token oversight 

@router.get("/admin/all", response_model=TokenListResponse, tags=["admin"])
async def list_all_tokens(
    user_email: Optional[str] = None,
    include_inactive: bool = False,
    limit: int = 100,
    offset: int = 0,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> TokenListResponse:
    """Admin endpoint to list tokens for a specific user.

    Listing tokens across all users is not implemented yet: when ``user_email``
    is omitted the response contains an empty list.

    Args:
        user_email: Filter tokens by user email (admin only)
        include_inactive: Include inactive/expired tokens
        limit: Maximum number of tokens to return
        offset: Number of tokens to skip
        current_user: Authenticated admin user
        db: Database session

    Returns:
        TokenListResponse: List of tokens; ``total`` is the number of tokens
        in this response

    Raises:
        HTTPException: 403 if the caller is not on an interactive session or
            is not an admin
    """
    _require_interactive_session(current_user)

    # Fail-secure: a user context without "is_admin" is treated as non-admin
    # (403) rather than surfacing a KeyError as a server error.
    if not current_user.get("is_admin"):
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")

    service = TokenCatalogService(db)

    if user_email:
        # Get tokens for the specific user
        tokens = await service.list_user_tokens(
            user_email=user_email,
            include_inactive=include_inactive,
            limit=limit,
            offset=offset,
        )
    else:
        # This would need a new service method for all tokens.
        # For now, return an empty list - can be implemented later if needed.
        tokens = []

    token_responses = []
    for token in tokens:
        # Check if the token is revoked
        revocation_info = await service.get_token_revocation(token.jti)

        token_responses.append(
            TokenResponse(
                id=token.id,
                name=token.name,
                description=token.description,
                user_email=token.user_email,
                team_id=token.team_id,
                created_at=token.created_at,
                expires_at=token.expires_at,
                last_used=token.last_used,
                is_active=token.is_active,
                is_revoked=revocation_info is not None,
                revoked_at=revocation_info.revoked_at if revocation_info else None,
                revoked_by=revocation_info.revoked_by if revocation_info else None,
                revocation_reason=revocation_info.reason if revocation_info else None,
                tags=token.tags,
                server_id=token.server_id,
                resource_scopes=token.resource_scopes,
                ip_restrictions=token.ip_restrictions,
                time_restrictions=token.time_restrictions,
                usage_limits=token.usage_limits,
            )
        )

    db.commit()
    db.close()
    return TokenListResponse(tokens=token_responses, total=len(token_responses), limit=limit, offset=offset)

560 

561 

@router.delete("/admin/{token_id}", status_code=status.HTTP_204_NO_CONTENT, tags=["admin"])
async def admin_revoke_token(
    token_id: str,
    request: Optional[TokenRevokeRequest] = None,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> None:
    """Admin endpoint to revoke any token.

    Args:
        token_id: Token ID to revoke
        request: Optional revocation request with reason
        current_user: Authenticated admin user
        db: Database session

    Raises:
        HTTPException: 403 if the caller is not on an interactive session or
            is not an admin; 404 if the service reports nothing was revoked
    """
    _require_interactive_session(current_user)

    # Fail-secure: a user context without "is_admin" is treated as non-admin
    # (403) rather than surfacing a KeyError as a server error.
    if not current_user.get("is_admin"):
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")

    service = TokenCatalogService(db)
    admin_email = current_user["email"]
    reason = request.reason if request else f"Revoked by admin {admin_email}"

    # Use the admin method - no ownership check
    success = await service.admin_revoke_token(
        token_id=token_id,
        revoked_by=admin_email,
        reason=reason,
    )

    if not success:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")

    db.commit()
    db.close()

601 

602 

603# Team-based token endpoints 

@router.post("/teams/{team_id}", response_model=TokenCreateResponse, status_code=status.HTTP_201_CREATED)
@require_permission("tokens.create")
async def create_team_token(
    team_id: str,
    request: TokenCreateRequest,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> TokenCreateResponse:
    """Create a new API token for a team (only team owners can do this).

    The ``team_id`` from the path is used both for the permission lookup and
    for the created token; ``request.team_id`` is not consulted here.

    Args:
        team_id: Team ID to create token for
        request: Token creation request with name, description, scoping, etc.
        current_user: Authenticated user (must be team owner)
        db: Database session

    Returns:
        TokenCreateResponse: Created token details with raw token

    Raises:
        HTTPException: 403 if the caller is not on an interactive session;
            400 if the service (or response construction) raises ValueError
    """
    _require_interactive_session(current_user)

    service = TokenCatalogService(db)

    # Use team_id from the path as the permission context
    caller_permissions = None
    if request.scope and request.scope.permissions:
        caller_permissions = await _get_caller_permissions(db, current_user, team_id)

    # Convert the request scope to a TokenScope if one was provided
    scope = None
    if request.scope:
        scope = TokenScope(
            server_id=request.scope.server_id,
            permissions=request.scope.permissions,
            ip_restrictions=request.scope.ip_restrictions,
            time_restrictions=request.scope.time_restrictions,
            usage_limits=request.scope.usage_limits,
        )

    try:
        token_record, raw_token = await service.create_token(
            user_email=current_user["email"],
            name=request.name,
            description=request.description,
            scope=scope,
            expires_in_days=request.expires_in_days,
            tags=request.tags,
            team_id=team_id,  # This will validate team ownership
            caller_permissions=caller_permissions,
            is_active=request.is_active,
        )

        # Build the response before the session is committed and closed.
        token_response = TokenResponse(
            id=token_record.id,
            name=token_record.name,
            description=token_record.description,
            user_email=token_record.user_email,
            team_id=token_record.team_id,
            server_id=token_record.server_id,
            resource_scopes=token_record.resource_scopes or [],
            ip_restrictions=token_record.ip_restrictions or [],
            time_restrictions=token_record.time_restrictions or {},
            usage_limits=token_record.usage_limits or {},
            created_at=token_record.created_at,
            expires_at=token_record.expires_at,
            last_used=token_record.last_used,
            is_active=token_record.is_active,
            tags=token_record.tags or [],
        )

        db.commit()
        db.close()
        return TokenCreateResponse(
            token=token_response,
            access_token=raw_token,
        )
    except ValueError as e:
        # Chain the cause so the original ValueError stays visible in tracebacks.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e

686 

687 

@router.get("/teams/{team_id}", response_model=TokenListResponse)
@require_permission("tokens.read")
async def list_team_tokens(
    team_id: str,
    include_inactive: bool = False,
    limit: int = 50,
    offset: int = 0,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> TokenListResponse:
    """List API tokens for a team (only team owners can do this).

    Args:
        team_id: Team ID to list tokens for
        include_inactive: Include inactive/expired tokens
        limit: Maximum number of tokens to return (default 50)
        offset: Number of tokens to skip for pagination
        current_user: Authenticated user (must be team owner)
        db: Database session

    Returns:
        TokenListResponse: List of the team's API tokens; ``total`` is the
        number of tokens in this response

    Raises:
        HTTPException: 403 if the caller is not on an interactive session;
            400 if the service (or response construction) raises ValueError
    """
    _require_interactive_session(current_user)

    service = TokenCatalogService(db)

    try:
        tokens = await service.list_team_tokens(
            team_id=team_id,
            user_email=current_user["email"],  # This will validate team ownership
            include_inactive=include_inactive,
            limit=limit,
            offset=offset,
        )

        token_responses = []
        for token in tokens:
            # Check if the token is revoked
            revocation_info = await service.get_token_revocation(token.jti)

            token_responses.append(
                TokenResponse(
                    id=token.id,
                    name=token.name,
                    description=token.description,
                    user_email=token.user_email,
                    team_id=token.team_id,
                    created_at=token.created_at,
                    expires_at=token.expires_at,
                    last_used=token.last_used,
                    is_active=token.is_active,
                    is_revoked=revocation_info is not None,
                    revoked_at=revocation_info.revoked_at if revocation_info else None,
                    revoked_by=revocation_info.revoked_by if revocation_info else None,
                    revocation_reason=revocation_info.reason if revocation_info else None,
                    tags=token.tags,
                    server_id=token.server_id,
                    resource_scopes=token.resource_scopes,
                    ip_restrictions=token.ip_restrictions,
                    time_restrictions=token.time_restrictions,
                    usage_limits=token.usage_limits,
                )
            )

        db.commit()
        db.close()
        return TokenListResponse(tokens=token_responses, total=len(token_responses), limit=limit, offset=offset)
    except ValueError as e:
        # Chain the cause so the original ValueError stays visible in tracebacks.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e