Coverage for mcpgateway / routers / tokens.py: 98%

206 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/routers/tokens.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7JWT Token Catalog API endpoints. 

8Provides comprehensive API token management with scoping, revocation, and analytics. 

9""" 

10 

11# Standard 

12import logging 

13from typing import List, Optional 

14 

15# Third-Party 

16from fastapi import APIRouter, Depends, HTTPException, status 

17from sqlalchemy.exc import IntegrityError 

18from sqlalchemy.orm import Session 

19 

20# First-Party 

21from mcpgateway.db import get_db 

22from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_permission 

23from mcpgateway.schemas import TokenCreateRequest, TokenCreateResponse, TokenListResponse, TokenResponse, TokenRevokeRequest, TokenUpdateRequest, TokenUsageStatsResponse 

24from mcpgateway.services.permission_service import PermissionService 

25from mcpgateway.services.token_catalog_service import TokenCatalogService, TokenScope 

26 

27logger = logging.getLogger(__name__) 

28 

29router = APIRouter(prefix="/tokens", tags=["tokens"]) 

30 

31 

32def _require_authenticated_session(current_user: dict) -> None: 

33 """Block anonymous, unauthenticated, and API-token access to token management endpoints. 

34 

35 Enforces Management Plane isolation: only interactive sessions (JWT from web 

36 login, SSO, or OAuth) may create, list, or revoke tokens. API tokens are 

37 Data Plane credentials and must never be able to manage other tokens 

38 (token-chaining attack vector). 

39 

40 Args: 

41 current_user: User context from get_current_user_with_permissions 

42 

43 Raises: 

44 HTTPException: 403 if auth_method is None, anonymous, or api_token 

45 """ 

46 auth_method = current_user.get("auth_method") 

47 

48 # Fail-secure: block if auth_method not set (indicates incomplete auth flow) 

49 if auth_method is None: 

50 logger.warning("Token management blocked: auth_method not set. This indicates an auth code path that needs to set request.state.auth_method") 

51 raise HTTPException( 

52 status_code=status.HTTP_403_FORBIDDEN, 

53 detail="Token management requires authentication. Authentication method could not be determined.", 

54 ) 

55 

56 # Block anonymous users (missing proxy header or unauthenticated) 

57 if auth_method == "anonymous": 

58 raise HTTPException( 

59 status_code=status.HTTP_403_FORBIDDEN, 

60 detail="Token management requires authentication. Anonymous access is not permitted.", 

61 ) 

62 

63 # Block API tokens from managing other tokens (Management Plane isolation). 

64 # Token CRUD endpoints require an interactive session (JWT from web login or SSO). 

65 # Allowing API tokens here would let a compromised token create new long-lived 

66 # tokens and escalate persistence — a token-chaining attack. 

67 if auth_method == "api_token": 

68 raise HTTPException( 

69 status_code=status.HTTP_403_FORBIDDEN, 

70 detail=("Token management requires an interactive session (JWT from web login or SSO). " "API tokens cannot create, list, or revoke other tokens."), 

71 ) 

72 

73 

74async def _get_caller_permissions( 

75 db: Session, 

76 current_user: dict, 

77 team_id: Optional[str] = None, 

78) -> Optional[List[str]]: 

79 """Get caller's effective permissions for scope containment. 

80 

81 Args: 

82 db: Database session 

83 current_user: User context 

84 team_id: Team context for permission lookup 

85 

86 Returns: 

87 List of permissions, or ["*"] for admins 

88 """ 

89 if current_user.get("is_admin"): 

90 return ["*"] # Admins can grant anything 

91 

92 permission_service = PermissionService(db) 

93 permissions = await permission_service.get_user_permissions( 

94 user_email=current_user["email"], 

95 team_id=team_id, 

96 ) 

97 return list(permissions) if permissions else None 

98 

99 

100@router.post("", response_model=TokenCreateResponse, status_code=status.HTTP_201_CREATED) 

101@require_permission("tokens.create") 

102async def create_token( 

103 request: TokenCreateRequest, 

104 current_user=Depends(get_current_user_with_permissions), 

105 db: Session = Depends(get_db), 

106) -> TokenCreateResponse: 

107 """Create a new API token for the current user. 

108 

109 Args: 

110 request: Token creation request with name, description, scoping, etc. 

111 current_user: Authenticated user from JWT 

112 db: Database session 

113 

114 Returns: 

115 TokenCreateResponse: Created token details with raw token 

116 

117 Raises: 

118 HTTPException: If token name already exists or validation fails 

119 

120 Examples: 

121 >>> import asyncio 

122 >>> asyncio.iscoroutinefunction(create_token) 

123 True 

124 """ 

125 _require_authenticated_session(current_user) 

126 

127 # Auto-inherit team_id from the caller's single team when not explicitly provided. 

128 # This prevents tokens from being silently scoped to public-only (team_id=None) 

129 # when the user belongs to exactly one team, maintaining RBAC context at token level. 

130 # Multi-team users must specify team_id explicitly to avoid ambiguity. 

131 # Admins with teams=null are exempt and may still create global-scope tokens. 

132 effective_team_id = request.team_id 

133 if effective_team_id is None and not current_user.get("is_admin"): 

134 user_teams = current_user.get("token_teams") or [] 

135 if len(user_teams) == 1: 

136 effective_team_id = user_teams[0] 

137 logger.debug("Auto-inherited team_id=%s for token creation by %s", effective_team_id, current_user["email"]) 

138 

139 service = TokenCatalogService(db) 

140 

141 # Get caller permissions for scope containment (if custom scope requested) 

142 caller_permissions = None 

143 if request.scope and request.scope.permissions: 

144 caller_permissions = await _get_caller_permissions(db, current_user, effective_team_id) 

145 

146 # Convert request to TokenScope if provided 

147 scope = None 

148 if request.scope: 

149 scope = TokenScope( 

150 server_id=request.scope.server_id, 

151 permissions=request.scope.permissions, 

152 ip_restrictions=request.scope.ip_restrictions, 

153 time_restrictions=request.scope.time_restrictions, 

154 usage_limits=request.scope.usage_limits, 

155 ) 

156 

157 try: 

158 token_record, raw_token = await service.create_token( 

159 user_email=current_user["email"], 

160 name=request.name, 

161 description=request.description, 

162 scope=scope, 

163 expires_in_days=request.expires_in_days, 

164 tags=request.tags, 

165 team_id=effective_team_id, 

166 caller_permissions=caller_permissions, 

167 is_active=request.is_active, 

168 ) 

169 

170 # Create TokenResponse for the token info 

171 token_response = TokenResponse( 

172 id=token_record.id, 

173 name=token_record.name, 

174 description=token_record.description, 

175 user_email=token_record.user_email, 

176 team_id=token_record.team_id, 

177 server_id=token_record.server_id, 

178 resource_scopes=token_record.resource_scopes or [], 

179 ip_restrictions=token_record.ip_restrictions or [], 

180 time_restrictions=token_record.time_restrictions or {}, 

181 usage_limits=token_record.usage_limits or {}, 

182 created_at=token_record.created_at, 

183 expires_at=token_record.expires_at, 

184 last_used=token_record.last_used, 

185 is_active=token_record.is_active, 

186 tags=token_record.tags or [], 

187 ) 

188 

189 db.commit() 

190 db.close() 

191 return TokenCreateResponse( 

192 token=token_response, 

193 access_token=raw_token, 

194 ) 

195 except ValueError as e: 

196 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) 

197 except IntegrityError as e: 

198 db.rollback() 

199 err_str = str(e.orig) if hasattr(e, "orig") and e.orig else str(e) 

200 # Match the specific name constraint: PostgreSQL reports the constraint name 

201 # (either the db.py name or the Alembic migration name); SQLite reports column paths. 

202 if ( 

203 "uq_email_api_tokens_user_name_team" in err_str 

204 or "uq_email_api_tokens_user_name" in err_str 

205 or "uq_email_api_tokens_user_email_name" in err_str 

206 or ("email_api_tokens.user_email" in err_str and "email_api_tokens.name" in err_str) 

207 ): 

208 raise HTTPException( 

209 status_code=status.HTTP_409_CONFLICT, 

210 detail="A token with this name already exists for this user in the same team scope. Token names must be unique per user per team. Please choose a different name.", 

211 ) 

212 raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Token creation failed due to a conflict. Please try again.") 

213 

214 

215@router.get("", response_model=TokenListResponse) 

216@require_permission("tokens.read") 

217async def list_tokens( 

218 include_inactive: bool = False, 

219 limit: int = 50, 

220 offset: int = 0, 

221 db: Session = Depends(get_db), 

222 current_user=Depends(get_current_user_with_permissions), 

223) -> TokenListResponse: 

224 """List API tokens for the current user. 

225 

226 Args: 

227 include_inactive: Include inactive/expired tokens 

228 limit: Maximum number of tokens to return (default 50) 

229 offset: Number of tokens to skip for pagination 

230 current_user: Authenticated user from JWT 

231 db: Database session 

232 

233 Returns: 

234 TokenListResponse: List of user's API tokens 

235 

236 Examples: 

237 >>> import asyncio 

238 >>> asyncio.iscoroutinefunction(list_tokens) 

239 True 

240 """ 

241 _require_authenticated_session(current_user) 

242 

243 service = TokenCatalogService(db) 

244 tokens = await service.list_user_and_team_tokens( 

245 user_email=current_user["email"], 

246 include_inactive=include_inactive, 

247 limit=limit, 

248 offset=offset, 

249 ) 

250 

251 total_count = await service.count_user_and_team_tokens( 

252 user_email=current_user["email"], 

253 include_inactive=include_inactive, 

254 ) 

255 

256 # Batch fetch revocation info (single query instead of N+1) 

257 revocation_map = await service.get_token_revocations_batch([t.jti for t in tokens]) 

258 

259 token_responses = [] 

260 for token in tokens: 

261 revocation_info = revocation_map.get(token.jti) 

262 

263 token_responses.append( 

264 TokenResponse( 

265 id=token.id, 

266 name=token.name, 

267 description=token.description, 

268 user_email=token.user_email, 

269 team_id=token.team_id, 

270 created_at=token.created_at, 

271 expires_at=token.expires_at, 

272 last_used=token.last_used, 

273 is_active=token.is_active, 

274 is_revoked=revocation_info is not None, 

275 revoked_at=revocation_info.revoked_at if revocation_info else None, 

276 revoked_by=revocation_info.revoked_by if revocation_info else None, 

277 revocation_reason=revocation_info.reason if revocation_info else None, 

278 tags=token.tags, 

279 server_id=token.server_id, 

280 resource_scopes=token.resource_scopes, 

281 ip_restrictions=token.ip_restrictions, 

282 time_restrictions=token.time_restrictions, 

283 usage_limits=token.usage_limits, 

284 ) 

285 ) 

286 

287 db.commit() 

288 db.close() 

289 return TokenListResponse(tokens=token_responses, total=total_count, limit=limit, offset=offset) 

290 

291 

292@router.get("/{token_id}", response_model=TokenResponse) 

293@require_permission("tokens.read") 

294async def get_token( 

295 token_id: str, 

296 current_user=Depends(get_current_user_with_permissions), 

297 db: Session = Depends(get_db), 

298) -> TokenResponse: 

299 """Get details of a specific token. 

300 

301 Args: 

302 token_id: Token ID to retrieve 

303 current_user: Authenticated user from JWT 

304 db: Database session 

305 

306 Returns: 

307 TokenResponse: Token details 

308 

309 Raises: 

310 HTTPException: If token not found or not owned by user 

311 

312 Examples: 

313 >>> import asyncio 

314 >>> asyncio.iscoroutinefunction(get_token) 

315 True 

316 """ 

317 _require_authenticated_session(current_user) 

318 

319 service = TokenCatalogService(db) 

320 token = await service.get_token(token_id, current_user["email"]) 

321 

322 if not token: 

323 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found") 

324 

325 db.commit() 

326 db.close() 

327 return TokenResponse( 

328 id=token.id, 

329 name=token.name, 

330 description=token.description, 

331 user_email=token.user_email, 

332 team_id=token.team_id, 

333 created_at=token.created_at, 

334 expires_at=token.expires_at, 

335 last_used=token.last_used, 

336 is_active=token.is_active, 

337 tags=token.tags, 

338 server_id=token.server_id, 

339 resource_scopes=token.resource_scopes, 

340 ip_restrictions=token.ip_restrictions, 

341 time_restrictions=token.time_restrictions, 

342 usage_limits=token.usage_limits, 

343 ) 

344 

345 

346@router.put("/{token_id}", response_model=TokenResponse) 

347@require_permission("tokens.update") 

348async def update_token( 

349 token_id: str, 

350 request: TokenUpdateRequest, 

351 current_user=Depends(get_current_user_with_permissions), 

352 db: Session = Depends(get_db), 

353) -> TokenResponse: 

354 """Update an existing token. 

355 

356 Args: 

357 token_id: Token ID to update 

358 request: Token update request 

359 current_user: Authenticated user from JWT 

360 db: Database session 

361 

362 Returns: 

363 TokenResponse: Updated token details 

364 

365 Raises: 

366 HTTPException: If token not found or validation fails 

367 """ 

368 _require_authenticated_session(current_user) 

369 

370 service = TokenCatalogService(db) 

371 

372 # For update, get caller permissions using token's team_id 

373 caller_permissions = None 

374 if request.scope and request.scope.permissions: 

375 # Get existing token to find its team_id 

376 existing_token = await service.get_token(token_id, current_user["email"]) 

377 if not existing_token: 

378 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found") 

379 # Use token's team_id for permission lookup 

380 caller_permissions = await _get_caller_permissions(db, current_user, existing_token.team_id) 

381 

382 # Convert request to TokenScope if provided 

383 scope = None 

384 if request.scope: 

385 scope = TokenScope( 

386 server_id=request.scope.server_id, 

387 permissions=request.scope.permissions, 

388 ip_restrictions=request.scope.ip_restrictions, 

389 time_restrictions=request.scope.time_restrictions, 

390 usage_limits=request.scope.usage_limits, 

391 ) 

392 

393 try: 

394 token = await service.update_token( 

395 token_id=token_id, 

396 user_email=current_user["email"], 

397 name=request.name, 

398 description=request.description, 

399 scope=scope, 

400 tags=request.tags, 

401 caller_permissions=caller_permissions, 

402 is_active=request.is_active, 

403 ) 

404 

405 if not token: 

406 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found") 

407 

408 result = TokenResponse( 

409 id=token.id, 

410 name=token.name, 

411 description=token.description, 

412 user_email=token.user_email, 

413 team_id=token.team_id, 

414 created_at=token.created_at, 

415 expires_at=token.expires_at, 

416 last_used=token.last_used, 

417 is_active=token.is_active, 

418 tags=token.tags, 

419 server_id=token.server_id, 

420 resource_scopes=token.resource_scopes, 

421 ip_restrictions=token.ip_restrictions, 

422 time_restrictions=token.time_restrictions, 

423 usage_limits=token.usage_limits, 

424 ) 

425 db.commit() 

426 db.close() 

427 return result 

428 except ValueError as e: 

429 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) 

430 

431 

432@router.delete("/{token_id}", status_code=status.HTTP_204_NO_CONTENT) 

433@require_permission("tokens.revoke") 

434async def revoke_token( 

435 token_id: str, 

436 request: Optional[TokenRevokeRequest] = None, 

437 current_user=Depends(get_current_user_with_permissions), 

438 db: Session = Depends(get_db), 

439) -> None: 

440 """Revoke (delete) a token. 

441 

442 Args: 

443 token_id: Token ID to revoke 

444 request: Optional revocation request with reason 

445 current_user: Authenticated user from JWT 

446 db: Database session 

447 

448 Raises: 

449 HTTPException: If token not found 

450 """ 

451 _require_authenticated_session(current_user) 

452 

453 service = TokenCatalogService(db) 

454 

455 reason = request.reason if request else "Revoked by user" 

456 # SECURITY FIX: Pass user_email for ownership verification 

457 success = await service.revoke_token( 

458 token_id=token_id, 

459 user_email=current_user["email"], 

460 revoked_by=current_user["email"], 

461 reason=reason, 

462 ) 

463 

464 if not success: 

465 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found") 

466 

467 db.commit() 

468 db.close() 

469 

470 

471@router.get("/{token_id}/usage", response_model=TokenUsageStatsResponse) 

472@require_permission("tokens.read") 

473async def get_token_usage_stats( 

474 token_id: str, 

475 days: int = 30, 

476 current_user=Depends(get_current_user_with_permissions), 

477 db: Session = Depends(get_db), 

478) -> TokenUsageStatsResponse: 

479 """Get usage statistics for a specific token. 

480 

481 Args: 

482 token_id: Token ID to get stats for 

483 days: Number of days to analyze (default 30) 

484 current_user: Authenticated user from JWT 

485 db: Database session 

486 

487 Returns: 

488 TokenUsageStatsResponse: Token usage statistics 

489 

490 Raises: 

491 HTTPException: If token not found or not owned by user 

492 """ 

493 _require_authenticated_session(current_user) 

494 

495 service = TokenCatalogService(db) 

496 

497 # Verify token ownership 

498 token = await service.get_token(token_id, current_user["email"]) 

499 if not token: 

500 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found") 

501 

502 stats = await service.get_token_usage_stats(user_email=current_user["email"], token_id=token_id, days=days) 

503 

504 db.commit() 

505 db.close() 

506 return TokenUsageStatsResponse(**stats) 

507 

508 

509# Admin endpoints for token oversight 

510@router.get("/admin/all", response_model=TokenListResponse, tags=["admin"]) 

511async def list_all_tokens( 

512 user_email: Optional[str] = None, 

513 include_inactive: bool = False, 

514 limit: int = 100, 

515 offset: int = 0, 

516 current_user=Depends(get_current_user_with_permissions), 

517 db: Session = Depends(get_db), 

518) -> TokenListResponse: 

519 """Admin endpoint to list all tokens or tokens for a specific user. 

520 

521 Args: 

522 user_email: Filter tokens by user email (admin only) 

523 include_inactive: Include inactive/expired tokens 

524 limit: Maximum number of tokens to return 

525 offset: Number of tokens to skip 

526 current_user: Authenticated admin user 

527 db: Database session 

528 

529 Returns: 

530 TokenListResponse: List of tokens 

531 

532 Raises: 

533 HTTPException: If user is not admin 

534 """ 

535 _require_authenticated_session(current_user) 

536 

537 if not current_user["is_admin"]: 

538 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required") 

539 

540 service = TokenCatalogService(db) 

541 

542 if user_email: 

543 # Get tokens for specific user 

544 tokens = await service.list_user_tokens( 

545 user_email=user_email, 

546 include_inactive=include_inactive, 

547 limit=limit, 

548 offset=offset, 

549 ) 

550 total_count = await service.count_user_tokens( 

551 user_email=user_email, 

552 include_inactive=include_inactive, 

553 ) 

554 else: 

555 # Admin: get all tokens 

556 tokens = await service.list_all_tokens( 

557 include_inactive=include_inactive, 

558 limit=limit, 

559 offset=offset, 

560 ) 

561 total_count = await service.count_all_tokens( 

562 include_inactive=include_inactive, 

563 ) 

564 

565 # Batch fetch revocation info (single query instead of N+1) 

566 revocation_map = await service.get_token_revocations_batch([t.jti for t in tokens]) 

567 

568 token_responses = [] 

569 for token in tokens: 

570 revocation_info = revocation_map.get(token.jti) 

571 

572 token_responses.append( 

573 TokenResponse( 

574 id=token.id, 

575 name=token.name, 

576 description=token.description, 

577 user_email=token.user_email, 

578 team_id=token.team_id, 

579 created_at=token.created_at, 

580 expires_at=token.expires_at, 

581 last_used=token.last_used, 

582 is_active=token.is_active, 

583 is_revoked=revocation_info is not None, 

584 revoked_at=revocation_info.revoked_at if revocation_info else None, 

585 revoked_by=revocation_info.revoked_by if revocation_info else None, 

586 revocation_reason=revocation_info.reason if revocation_info else None, 

587 tags=token.tags, 

588 server_id=token.server_id, 

589 resource_scopes=token.resource_scopes, 

590 ip_restrictions=token.ip_restrictions, 

591 time_restrictions=token.time_restrictions, 

592 usage_limits=token.usage_limits, 

593 ) 

594 ) 

595 

596 db.commit() 

597 db.close() 

598 return TokenListResponse(tokens=token_responses, total=total_count, limit=limit, offset=offset) 

599 

600 

601@router.delete("/admin/{token_id}", status_code=status.HTTP_204_NO_CONTENT, tags=["admin"]) 

602async def admin_revoke_token( 

603 token_id: str, 

604 request: Optional[TokenRevokeRequest] = None, 

605 current_user=Depends(get_current_user_with_permissions), 

606 db: Session = Depends(get_db), 

607) -> None: 

608 """Admin endpoint to revoke any token. 

609 

610 Args: 

611 token_id: Token ID to revoke 

612 request: Optional revocation request with reason 

613 current_user: Authenticated admin user 

614 db: Database session 

615 

616 Raises: 

617 HTTPException: If user is not admin or token not found 

618 """ 

619 _require_authenticated_session(current_user) 

620 

621 if not current_user["is_admin"]: 

622 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required") 

623 

624 service = TokenCatalogService(db) 

625 admin_email = current_user["email"] 

626 reason = request.reason if request else f"Revoked by admin {admin_email}" 

627 

628 # Use admin method - no ownership check 

629 success = await service.admin_revoke_token( 

630 token_id=token_id, 

631 revoked_by=admin_email, 

632 reason=reason, 

633 ) 

634 

635 if not success: 

636 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found") 

637 

638 db.commit() 

639 db.close() 

640 

641 

642# Team-based token endpoints 

643@router.post("/teams/{team_id}", response_model=TokenCreateResponse, status_code=status.HTTP_201_CREATED) 

644@require_permission("tokens.create") 

645async def create_team_token( 

646 team_id: str, 

647 request: TokenCreateRequest, 

648 current_user=Depends(get_current_user_with_permissions), 

649 db: Session = Depends(get_db), 

650) -> TokenCreateResponse: 

651 """Create a new API token for a team (only team owners can do this). 

652 

653 Args: 

654 team_id: Team ID to create token for 

655 request: Token creation request with name, description, scoping, etc. 

656 current_user: Authenticated user (must be team owner) 

657 db: Database session 

658 

659 Returns: 

660 TokenCreateResponse: Created token details with raw token 

661 

662 Raises: 

663 HTTPException: If user is not team owner or validation fails 

664 """ 

665 _require_authenticated_session(current_user) 

666 

667 service = TokenCatalogService(db) 

668 

669 # Use team_id from path for permission context 

670 caller_permissions = None 

671 if request.scope and request.scope.permissions: 

672 caller_permissions = await _get_caller_permissions(db, current_user, team_id) 

673 

674 # Convert request to TokenScope if provided 

675 scope = None 

676 if request.scope: 

677 scope = TokenScope( 

678 server_id=request.scope.server_id, 

679 permissions=request.scope.permissions, 

680 ip_restrictions=request.scope.ip_restrictions, 

681 time_restrictions=request.scope.time_restrictions, 

682 usage_limits=request.scope.usage_limits, 

683 ) 

684 

685 try: 

686 token_record, raw_token = await service.create_token( 

687 user_email=current_user["email"], 

688 name=request.name, 

689 description=request.description, 

690 scope=scope, 

691 expires_in_days=request.expires_in_days, 

692 tags=request.tags, 

693 team_id=team_id, # This will validate team ownership 

694 caller_permissions=caller_permissions, 

695 is_active=request.is_active, 

696 ) 

697 

698 # Create TokenResponse for the token info 

699 token_response = TokenResponse( 

700 id=token_record.id, 

701 name=token_record.name, 

702 description=token_record.description, 

703 user_email=token_record.user_email, 

704 team_id=token_record.team_id, 

705 server_id=token_record.server_id, 

706 resource_scopes=token_record.resource_scopes or [], 

707 ip_restrictions=token_record.ip_restrictions or [], 

708 time_restrictions=token_record.time_restrictions or {}, 

709 usage_limits=token_record.usage_limits or {}, 

710 created_at=token_record.created_at, 

711 expires_at=token_record.expires_at, 

712 last_used=token_record.last_used, 

713 is_active=token_record.is_active, 

714 tags=token_record.tags or [], 

715 ) 

716 

717 db.commit() 

718 db.close() 

719 return TokenCreateResponse( 

720 token=token_response, 

721 access_token=raw_token, 

722 ) 

723 except ValueError as e: 

724 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) 

725 except IntegrityError as e: 

726 db.rollback() 

727 err_str = str(e.orig) if hasattr(e, "orig") and e.orig else str(e) 

728 # Match the specific name constraint: PostgreSQL reports the constraint name 

729 # (either the db.py name or the Alembic migration name); SQLite reports column paths. 

730 if ( 

731 "uq_email_api_tokens_user_name_team" in err_str 

732 or "uq_email_api_tokens_user_name" in err_str 

733 or "uq_email_api_tokens_user_email_name" in err_str 

734 or ("email_api_tokens.user_email" in err_str and "email_api_tokens.name" in err_str) 

735 ): 

736 raise HTTPException( 

737 status_code=status.HTTP_409_CONFLICT, 

738 detail="A token with this name already exists for this user in the same team scope. Token names must be unique per user per team. Please choose a different name.", 

739 ) 

740 raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Token creation failed due to a conflict. Please try again.") 

741 

742 

743@router.get("/teams/{team_id}", response_model=TokenListResponse) 

744@require_permission("tokens.read") 

745async def list_team_tokens( 

746 team_id: str, 

747 include_inactive: bool = False, 

748 limit: int = 50, 

749 offset: int = 0, 

750 current_user=Depends(get_current_user_with_permissions), 

751 db: Session = Depends(get_db), 

752) -> TokenListResponse: 

753 """List API tokens for a team (requires active team membership). 

754 

755 Args: 

756 team_id: Team ID to list tokens for 

757 include_inactive: Include inactive/expired tokens 

758 limit: Maximum number of tokens to return (default 50) 

759 offset: Number of tokens to skip for pagination 

760 current_user: Authenticated user (must be an active member of the team) 

761 db: Database session 

762 

763 Returns: 

764 TokenListResponse: List of team's API tokens 

765 

766 Raises: 

767 HTTPException: If user is not an active member of the team 

768 """ 

769 _require_authenticated_session(current_user) 

770 

771 service = TokenCatalogService(db) 

772 

773 try: 

774 tokens = await service.list_team_tokens( 

775 team_id=team_id, 

776 user_email=current_user["email"], # This will validate team ownership 

777 include_inactive=include_inactive, 

778 limit=limit, 

779 offset=offset, 

780 ) 

781 

782 total_count = await service.count_team_tokens( 

783 team_id=team_id, 

784 include_inactive=include_inactive, 

785 ) 

786 

787 # Batch fetch revocation info (single query instead of N+1) 

788 revocation_map = await service.get_token_revocations_batch([t.jti for t in tokens]) 

789 

790 token_responses = [] 

791 for token in tokens: 

792 revocation_info = revocation_map.get(token.jti) 

793 

794 token_responses.append( 

795 TokenResponse( 

796 id=token.id, 

797 name=token.name, 

798 description=token.description, 

799 user_email=token.user_email, 

800 team_id=token.team_id, 

801 created_at=token.created_at, 

802 expires_at=token.expires_at, 

803 last_used=token.last_used, 

804 is_active=token.is_active, 

805 is_revoked=revocation_info is not None, 

806 revoked_at=revocation_info.revoked_at if revocation_info else None, 

807 revoked_by=revocation_info.revoked_by if revocation_info else None, 

808 revocation_reason=revocation_info.reason if revocation_info else None, 

809 tags=token.tags, 

810 server_id=token.server_id, 

811 resource_scopes=token.resource_scopes, 

812 ip_restrictions=token.ip_restrictions, 

813 time_restrictions=token.time_restrictions, 

814 usage_limits=token.usage_limits, 

815 ) 

816 ) 

817 

818 db.commit() 

819 db.close() 

820 return TokenListResponse(tokens=token_responses, total=total_count, limit=limit, offset=offset) 

821 except ValueError as e: 

822 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))