Coverage for mcpgateway / routers / tokens.py: 99%

211 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/routers/tokens.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7JWT Token Catalog API endpoints. 

8Provides comprehensive API token management with scoping, revocation, and analytics. 

9""" 

10 

11# Standard 

12import logging 

13from typing import List, Optional 

14 

15# Third-Party 

16from fastapi import APIRouter, Depends, HTTPException, status 

17from sqlalchemy.exc import IntegrityError 

18from sqlalchemy.orm import Session 

19 

20# First-Party 

21from mcpgateway.db import get_db 

22from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_permission 

23from mcpgateway.schemas import TokenCreateRequest, TokenCreateResponse, TokenListResponse, TokenResponse, TokenRevokeRequest, TokenUpdateRequest, TokenUsageStatsResponse 

24from mcpgateway.services.permission_service import PermissionService 

25from mcpgateway.services.token_catalog_service import TokenCatalogService, TokenScope 

26 

27logger = logging.getLogger(__name__) 

28 

29router = APIRouter(prefix="/tokens", tags=["tokens"]) 

30 

31 

32def _require_authenticated_session(current_user: dict) -> None: 

33 """Block anonymous, unauthenticated, and API-token access to token management endpoints. 

34 

35 Enforces Management Plane isolation: only interactive sessions (JWT from web 

36 login, SSO, or OAuth) may create, list, or revoke tokens. API tokens are 

37 Data Plane credentials and must never be able to manage other tokens 

38 (token-chaining attack vector). 

39 

40 Args: 

41 current_user: User context from get_current_user_with_permissions 

42 

43 Raises: 

44 HTTPException: 403 if auth_method is None, anonymous, or api_token 

45 """ 

46 auth_method = current_user.get("auth_method") 

47 

48 # Fail-secure: block if auth_method not set (indicates incomplete auth flow) 

49 if auth_method is None: 

50 logger.warning("Token management blocked: auth_method not set. This indicates an auth code path that needs to set request.state.auth_method") 

51 raise HTTPException( 

52 status_code=status.HTTP_403_FORBIDDEN, 

53 detail="Token management requires authentication. Authentication method could not be determined.", 

54 ) 

55 

56 # Block anonymous users (missing proxy header or unauthenticated) 

57 if auth_method == "anonymous": 

58 raise HTTPException( 

59 status_code=status.HTTP_403_FORBIDDEN, 

60 detail="Token management requires authentication. Anonymous access is not permitted.", 

61 ) 

62 

63 # Block API tokens from managing other tokens (Management Plane isolation). 

64 # Token CRUD endpoints require an interactive session (JWT from web login or SSO). 

65 # Allowing API tokens here would let a compromised token create new long-lived 

66 # tokens and escalate persistence — a token-chaining attack. 

67 if auth_method == "api_token": 

68 raise HTTPException( 

69 status_code=status.HTTP_403_FORBIDDEN, 

70 detail=("Token management requires an interactive session (JWT from web login or SSO). " "API tokens cannot create, list, or revoke other tokens."), 

71 ) 

72 

73 

async def _get_caller_permissions(
    db: Session,
    current_user: dict,
    team_id: Optional[str] = None,
) -> Optional[List[str]]:
    """Resolve the caller's effective permissions for scope containment.

    Args:
        db: Database session
        current_user: User context
        team_id: Team context for permission lookup

    Returns:
        ``["*"]`` for an un-narrowed admin, otherwise the permissions reported
        by PermissionService as a list, or ``None`` when that lookup yields
        nothing.
    """
    narrowed_teams = current_user.get("token_teams")

    # SECURITY: an admin is unrestricted only when the token is un-narrowed
    # (token_teams is None). Narrowed or public-only admin sessions fall
    # through to the token-aware lookup so Layer 1 scope containment holds.
    if narrowed_teams is None and current_user.get("is_admin"):
        return ["*"]  # Un-narrowed admins can grant anything

    granted = await PermissionService(db).get_user_permissions(
        user_email=current_user["email"],
        team_id=team_id,
        token_teams=narrowed_teams,  # SECURITY: Respect token narrowing
    )
    if not granted:
        return None
    return list(granted)

103 

104 

@router.post("", response_model=TokenCreateResponse, status_code=status.HTTP_201_CREATED)
@require_permission("tokens.create")
async def create_token(
    request: TokenCreateRequest,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> TokenCreateResponse:
    """Create a new API token owned by the calling user.

    Args:
        request: Token creation request with name, description, scoping, etc.
        current_user: Authenticated user from JWT
        db: Database session

    Returns:
        TokenCreateResponse: Created token details together with the raw token

    Raises:
        HTTPException: 403 for non-interactive sessions, 400 when the service
            raises ValueError, 409 on a database integrity conflict (for
            example a duplicate token name)

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(create_token)
        True
    """
    _require_authenticated_session(current_user)

    # Team auto-inheritance: when no team_id is supplied and the session lists
    # exactly one team, scope the token to that team instead of silently
    # creating a public-only (team_id=None) token. Sessions listing several
    # teams must pass team_id explicitly. Only un-narrowed admins
    # (token_teams is None) are exempt and may create global-scope tokens;
    # narrowed admin sessions follow the same rule as non-admins.
    session_teams = current_user.get("token_teams")
    unrestricted_admin = session_teams is None and current_user.get("is_admin")
    team_id = request.team_id
    if team_id is None and not unrestricted_admin:
        candidate_teams = session_teams or []
        if len(candidate_teams) == 1:
            team_id = candidate_teams[0]
            logger.debug("Auto-inherited team_id=%s for token creation by %s", team_id, current_user["email"])

    service = TokenCatalogService(db)

    # Caller permissions are only looked up when a custom permission scope is
    # requested; they are passed to the service for scope containment.
    requested_scope = request.scope
    caller_permissions = None
    scope = None
    if requested_scope:
        if requested_scope.permissions:
            caller_permissions = await _get_caller_permissions(db, current_user, team_id)
        scope = TokenScope(
            server_id=requested_scope.server_id,
            permissions=requested_scope.permissions,
            ip_restrictions=requested_scope.ip_restrictions,
            time_restrictions=requested_scope.time_restrictions,
            usage_limits=requested_scope.usage_limits,
        )

    try:
        record, raw_token = await service.create_token(
            user_email=current_user["email"],
            name=request.name,
            description=request.description,
            scope=scope,
            expires_in_days=request.expires_in_days,
            tags=request.tags,
            team_id=team_id,
            caller_permissions=caller_permissions,
            is_active=request.is_active,
        )

        # Snapshot the record into the response schema before the session is
        # committed and closed; empty collections replace missing values.
        token_info = TokenResponse(
            id=record.id,
            name=record.name,
            description=record.description,
            user_email=record.user_email,
            team_id=record.team_id,
            server_id=record.server_id,
            resource_scopes=record.resource_scopes or [],
            ip_restrictions=record.ip_restrictions or [],
            time_restrictions=record.time_restrictions or {},
            usage_limits=record.usage_limits or {},
            created_at=record.created_at,
            expires_at=record.expires_at,
            last_used=record.last_used,
            is_active=record.is_active,
            tags=record.tags or [],
        )

        db.commit()
        db.close()
        return TokenCreateResponse(token=token_info, access_token=raw_token)
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except IntegrityError as e:
        db.rollback()
        driver_error = getattr(e, "orig", None)
        message = str(driver_error) if driver_error else str(e)

        # Recognise the token-name uniqueness constraint: PostgreSQL reports
        # the constraint name (either the db.py name or the Alembic migration
        # name); SQLite reports the column paths instead.
        constraint_names = (
            "uq_email_api_tokens_user_name_team",
            "uq_email_api_tokens_user_name",
            "uq_email_api_tokens_user_email_name",
        )
        duplicate_name = any(constraint in message for constraint in constraint_names) or (
            "email_api_tokens.user_email" in message and "email_api_tokens.name" in message
        )
        if duplicate_name:
            detail = "A token with this name already exists for this user in the same team scope. Token names must be unique per user per team. Please choose a different name."
        else:
            detail = "Token creation failed due to a conflict. Please try again."
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=detail)

222 

223 

@router.get("", response_model=TokenListResponse)
@require_permission("tokens.read")
async def list_tokens(
    include_inactive: bool = False,
    limit: int = 50,
    offset: int = 0,
    db: Session = Depends(get_db),
    current_user=Depends(get_current_user_with_permissions),
) -> TokenListResponse:
    """Return one page of the caller's API tokens with revocation details.

    Args:
        include_inactive: Include inactive/expired tokens
        limit: Maximum number of tokens to return (default 50)
        offset: Number of tokens to skip for pagination
        current_user: Authenticated user from JWT
        db: Database session

    Returns:
        TokenListResponse: The page of tokens plus total count, limit and offset

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(list_tokens)
        True
    """
    _require_authenticated_session(current_user)

    service = TokenCatalogService(db)
    page = await service.list_user_and_team_tokens(
        user_email=current_user["email"],
        include_inactive=include_inactive,
        limit=limit,
        offset=offset,
    )
    total = await service.count_user_and_team_tokens(
        user_email=current_user["email"],
        include_inactive=include_inactive,
    )

    # One batched lookup keyed by JTI instead of one query per token (N+1).
    revocations = await service.get_token_revocations_batch([entry.jti for entry in page])

    items = [
        TokenResponse(
            id=entry.id,
            name=entry.name,
            description=entry.description,
            user_email=entry.user_email,
            team_id=entry.team_id,
            created_at=entry.created_at,
            expires_at=entry.expires_at,
            last_used=entry.last_used,
            is_active=entry.is_active,
            # A token counts as revoked exactly when the batch returned a row for it.
            is_revoked=revoked is not None,
            revoked_at=revoked.revoked_at if revoked else None,
            revoked_by=revoked.revoked_by if revoked else None,
            revocation_reason=revoked.reason if revoked else None,
            tags=entry.tags,
            server_id=entry.server_id,
            resource_scopes=entry.resource_scopes,
            ip_restrictions=entry.ip_restrictions,
            time_restrictions=entry.time_restrictions,
            usage_limits=entry.usage_limits,
        )
        for entry, revoked in ((tok, revocations.get(tok.jti)) for tok in page)
    ]

    db.commit()
    db.close()
    return TokenListResponse(tokens=items, total=total, limit=limit, offset=offset)

299 

300 

@router.get("/{token_id}", response_model=TokenResponse)
@require_permission("tokens.read")
async def get_token(
    token_id: str,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> TokenResponse:
    """Get details of a specific token.

    The lookup is performed with the caller's email, so the service decides
    whether the caller may see the token; a falsy result is reported as 404.

    Args:
        token_id: Token ID to retrieve
        current_user: Authenticated user from JWT
        db: Database session

    Returns:
        TokenResponse: Token details

    Raises:
        HTTPException: 403 for non-interactive sessions, 404 if the token is
            not found or not owned by the user

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(get_token)
        True
    """
    _require_authenticated_session(current_user)

    service = TokenCatalogService(db)
    token = await service.get_token(token_id, current_user["email"])

    if not token:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")

    # Build the response while the session is still open, as the other
    # endpoints in this router do. Reading attributes after commit()/close()
    # can hit expired or detached state.
    # NOTE(review): assumes `token` is a session-bound ORM instance; the
    # session factory's expire_on_commit setting is not visible here - confirm.
    result = TokenResponse(
        id=token.id,
        name=token.name,
        description=token.description,
        user_email=token.user_email,
        team_id=token.team_id,
        created_at=token.created_at,
        expires_at=token.expires_at,
        last_used=token.last_used,
        is_active=token.is_active,
        tags=token.tags,
        server_id=token.server_id,
        resource_scopes=token.resource_scopes,
        ip_restrictions=token.ip_restrictions,
        time_restrictions=token.time_restrictions,
        usage_limits=token.usage_limits,
    )

    db.commit()
    db.close()
    return result

353 

354 

@router.put("/{token_id}", response_model=TokenResponse)
@require_permission("tokens.update")
async def update_token(
    token_id: str,
    request: TokenUpdateRequest,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> TokenResponse:
    """Apply changes to an existing token and return its new state.

    Args:
        token_id: Token ID to update
        request: Token update request
        current_user: Authenticated user from JWT
        db: Database session

    Returns:
        TokenResponse: Updated token details

    Raises:
        HTTPException: 403 for non-interactive sessions, 404 if the token is
            not found, 400 when the service raises ValueError
    """
    _require_authenticated_session(current_user)

    service = TokenCatalogService(db)
    requested_scope = request.scope

    # Scope containment needs the caller's permissions in the token's own
    # team context, so the existing token is fetched first to learn its team_id.
    caller_permissions = None
    if requested_scope and requested_scope.permissions:
        current_token = await service.get_token(token_id, current_user["email"])
        if not current_token:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")
        caller_permissions = await _get_caller_permissions(db, current_user, current_token.team_id)

    scope = (
        TokenScope(
            server_id=requested_scope.server_id,
            permissions=requested_scope.permissions,
            ip_restrictions=requested_scope.ip_restrictions,
            time_restrictions=requested_scope.time_restrictions,
            usage_limits=requested_scope.usage_limits,
        )
        if requested_scope
        else None
    )

    try:
        updated = await service.update_token(
            token_id=token_id,
            user_email=current_user["email"],
            name=request.name,
            description=request.description,
            scope=scope,
            tags=request.tags,
            caller_permissions=caller_permissions,
            is_active=request.is_active,
        )

        if not updated:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")

        # Snapshot into the response schema before the session is committed and closed.
        response = TokenResponse(
            id=updated.id,
            name=updated.name,
            description=updated.description,
            user_email=updated.user_email,
            team_id=updated.team_id,
            created_at=updated.created_at,
            expires_at=updated.expires_at,
            last_used=updated.last_used,
            is_active=updated.is_active,
            tags=updated.tags,
            server_id=updated.server_id,
            resource_scopes=updated.resource_scopes,
            ip_restrictions=updated.ip_restrictions,
            time_restrictions=updated.time_restrictions,
            usage_limits=updated.usage_limits,
        )
        db.commit()
        db.close()
        return response
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))

439 

440 

@router.delete("/{token_id}", status_code=status.HTTP_204_NO_CONTENT)
@require_permission("tokens.revoke")
async def revoke_token(
    token_id: str,
    request: Optional[TokenRevokeRequest] = None,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> None:
    """Revoke (delete) a token.

    Args:
        token_id: Token ID to revoke
        request: Optional revocation request with reason
        current_user: Authenticated user from JWT
        db: Database session

    Raises:
        HTTPException: 403 for non-interactive sessions, 404 if the service
            reports the token could not be revoked (not found)
    """
    _require_authenticated_session(current_user)

    service = TokenCatalogService(db)

    # Fall back to the default reason both when no body was sent and when the
    # body carries no (or an empty) reason, so a revocation always records one.
    # NOTE(review): presumes TokenRevokeRequest.reason may be None - confirm
    # against the schema; if it is required this is equivalent to the old code.
    supplied_reason = request.reason if request else None
    reason = supplied_reason or "Revoked by user"

    # SECURITY FIX: Pass user_email for ownership verification
    success = await service.revoke_token(
        token_id=token_id,
        user_email=current_user["email"],
        revoked_by=current_user["email"],
        reason=reason,
    )

    if not success:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")

    db.commit()
    db.close()

478 

479 

@router.get("/{token_id}/usage", response_model=TokenUsageStatsResponse)
@require_permission("tokens.read")
async def get_token_usage_stats(
    token_id: str,
    days: int = 30,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> TokenUsageStatsResponse:
    """Return usage statistics for one of the caller's tokens.

    Args:
        token_id: Token ID to get stats for
        days: Number of days to analyze (default 30)
        current_user: Authenticated user from JWT
        db: Database session

    Returns:
        TokenUsageStatsResponse: Token usage statistics

    Raises:
        HTTPException: 403 for non-interactive sessions, 404 if the token is
            not found or not owned by the user
    """
    _require_authenticated_session(current_user)

    caller_email = current_user["email"]
    service = TokenCatalogService(db)

    # Ownership gate: the token must be retrievable with the caller's email
    # before any statistics are computed.
    owned_token = await service.get_token(token_id, caller_email)
    if not owned_token:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")

    usage = await service.get_token_usage_stats(user_email=caller_email, token_id=token_id, days=days)

    db.commit()
    db.close()
    # The service result is unpacked as keyword arguments for the response schema.
    return TokenUsageStatsResponse(**usage)

516 

517 

518# Admin endpoints for token oversight 

@router.get("/admin/all", response_model=TokenListResponse, tags=["admin"])
async def list_all_tokens(
    user_email: Optional[str] = None,
    include_inactive: bool = False,
    limit: int = 100,
    offset: int = 0,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> TokenListResponse:
    """Admin endpoint listing every token, or only those of one user.

    Args:
        user_email: Filter tokens by user email (admin only)
        include_inactive: Include inactive/expired tokens
        limit: Maximum number of tokens to return
        offset: Number of tokens to skip
        current_user: Authenticated admin user
        db: Database session

    Returns:
        TokenListResponse: The page of tokens plus total count, limit and offset

    Raises:
        HTTPException: 403 for non-interactive sessions and for callers that
            are not un-narrowed admins
    """
    _require_authenticated_session(current_user)

    # SECURITY: Require un-narrowed admin. Narrowed/public-only admin sessions
    # (token_teams set) must not reach the token oversight surface, to prevent
    # privilege escalation.
    unrestricted_admin = current_user.get("is_admin") and current_user.get("token_teams") is None
    if not unrestricted_admin:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")

    service = TokenCatalogService(db)

    if user_email:
        # Restrict both the page and the count to the requested user.
        page = await service.list_user_tokens(
            user_email=user_email,
            include_inactive=include_inactive,
            limit=limit,
            offset=offset,
        )
        total = await service.count_user_tokens(
            user_email=user_email,
            include_inactive=include_inactive,
        )
    else:
        # No filter: page through all tokens.
        page = await service.list_all_tokens(
            include_inactive=include_inactive,
            limit=limit,
            offset=offset,
        )
        total = await service.count_all_tokens(
            include_inactive=include_inactive,
        )

    # One batched lookup keyed by JTI instead of one query per token (N+1).
    revocations = await service.get_token_revocations_batch([entry.jti for entry in page])

    items = [
        TokenResponse(
            id=entry.id,
            name=entry.name,
            description=entry.description,
            user_email=entry.user_email,
            team_id=entry.team_id,
            created_at=entry.created_at,
            expires_at=entry.expires_at,
            last_used=entry.last_used,
            is_active=entry.is_active,
            # A token counts as revoked exactly when the batch returned a row for it.
            is_revoked=revoked is not None,
            revoked_at=revoked.revoked_at if revoked else None,
            revoked_by=revoked.revoked_by if revoked else None,
            revocation_reason=revoked.reason if revoked else None,
            tags=entry.tags,
            server_id=entry.server_id,
            resource_scopes=entry.resource_scopes,
            ip_restrictions=entry.ip_restrictions,
            time_restrictions=entry.time_restrictions,
            usage_limits=entry.usage_limits,
        )
        for entry, revoked in ((tok, revocations.get(tok.jti)) for tok in page)
    ]

    db.commit()
    db.close()
    return TokenListResponse(tokens=items, total=total, limit=limit, offset=offset)

611 

612 

@router.delete("/admin/{token_id}", status_code=status.HTTP_204_NO_CONTENT, tags=["admin"])
async def admin_revoke_token(
    token_id: str,
    request: Optional[TokenRevokeRequest] = None,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> None:
    """Admin endpoint to revoke any token.

    Args:
        token_id: Token ID to revoke
        request: Optional revocation request with reason
        current_user: Authenticated admin user
        db: Database session

    Raises:
        HTTPException: 403 for non-interactive sessions and for callers that
            are not un-narrowed admins, 404 if the service reports the token
            could not be revoked (not found)
    """
    _require_authenticated_session(current_user)

    # SECURITY: Require un-narrowed admin. Narrowed/public-only admin sessions
    # must not revoke arbitrary tokens to prevent privilege escalation.
    revoke_token_teams = current_user.get("token_teams")
    if not current_user.get("is_admin") or revoke_token_teams is not None:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")

    service = TokenCatalogService(db)
    admin_email = current_user["email"]

    # Fall back to the default reason both when no body was sent and when the
    # body carries no (or an empty) reason, so a revocation always records one.
    # NOTE(review): presumes TokenRevokeRequest.reason may be None - confirm
    # against the schema; if it is required this is equivalent to the old code.
    supplied_reason = request.reason if request else None
    reason = supplied_reason or f"Revoked by admin {admin_email}"

    # Use admin method - no ownership check
    success = await service.admin_revoke_token(
        token_id=token_id,
        revoked_by=admin_email,
        reason=reason,
    )

    if not success:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")

    db.commit()
    db.close()

655 

656 

657# Team-based token endpoints 

@router.post("/teams/{team_id}", response_model=TokenCreateResponse, status_code=status.HTTP_201_CREATED)
@require_permission("tokens.create")
async def create_team_token(
    team_id: str,
    request: TokenCreateRequest,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> TokenCreateResponse:
    """Create a new API token scoped to the team given in the path.

    Args:
        team_id: Team ID to create token for
        request: Token creation request with name, description, scoping, etc.
        current_user: Authenticated user (must be team owner)
        db: Database session

    Returns:
        TokenCreateResponse: Created token details together with the raw token

    Raises:
        HTTPException: 403 for non-interactive sessions, 400 when the service
            raises ValueError, 409 on a database integrity conflict (for
            example a duplicate token name)
    """
    _require_authenticated_session(current_user)

    service = TokenCatalogService(db)

    # Caller permissions are only looked up when a custom permission scope is
    # requested; the path team_id supplies the permission context.
    requested_scope = request.scope
    caller_permissions = None
    scope = None
    if requested_scope:
        if requested_scope.permissions:
            caller_permissions = await _get_caller_permissions(db, current_user, team_id)
        scope = TokenScope(
            server_id=requested_scope.server_id,
            permissions=requested_scope.permissions,
            ip_restrictions=requested_scope.ip_restrictions,
            time_restrictions=requested_scope.time_restrictions,
            usage_limits=requested_scope.usage_limits,
        )

    try:
        # NOTE(review): team ownership is expected to be validated inside
        # service.create_token when team_id is passed - confirm in the service.
        record, raw_token = await service.create_token(
            user_email=current_user["email"],
            name=request.name,
            description=request.description,
            scope=scope,
            expires_in_days=request.expires_in_days,
            tags=request.tags,
            team_id=team_id,
            caller_permissions=caller_permissions,
            is_active=request.is_active,
        )

        # Snapshot the record into the response schema before the session is
        # committed and closed; empty collections replace missing values.
        token_info = TokenResponse(
            id=record.id,
            name=record.name,
            description=record.description,
            user_email=record.user_email,
            team_id=record.team_id,
            server_id=record.server_id,
            resource_scopes=record.resource_scopes or [],
            ip_restrictions=record.ip_restrictions or [],
            time_restrictions=record.time_restrictions or {},
            usage_limits=record.usage_limits or {},
            created_at=record.created_at,
            expires_at=record.expires_at,
            last_used=record.last_used,
            is_active=record.is_active,
            tags=record.tags or [],
        )

        db.commit()
        db.close()
        return TokenCreateResponse(token=token_info, access_token=raw_token)
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except IntegrityError as e:
        db.rollback()
        driver_error = getattr(e, "orig", None)
        message = str(driver_error) if driver_error else str(e)

        # Recognise the token-name uniqueness constraint: PostgreSQL reports
        # the constraint name (either the db.py name or the Alembic migration
        # name); SQLite reports the column paths instead.
        constraint_names = (
            "uq_email_api_tokens_user_name_team",
            "uq_email_api_tokens_user_name",
            "uq_email_api_tokens_user_email_name",
        )
        duplicate_name = any(constraint in message for constraint in constraint_names) or (
            "email_api_tokens.user_email" in message and "email_api_tokens.name" in message
        )
        if duplicate_name:
            detail = "A token with this name already exists for this user in the same team scope. Token names must be unique per user per team. Please choose a different name."
        else:
            detail = "Token creation failed due to a conflict. Please try again."
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=detail)

756 

757 

@router.get("/teams/{team_id}", response_model=TokenListResponse)
@require_permission("tokens.read")
async def list_team_tokens(
    team_id: str,
    include_inactive: bool = False,
    limit: int = 50,
    offset: int = 0,
    current_user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> TokenListResponse:
    """Return one page of a team's API tokens with revocation details.

    Args:
        team_id: Team ID to list tokens for
        include_inactive: Include inactive/expired tokens
        limit: Maximum number of tokens to return (default 50)
        offset: Number of tokens to skip for pagination
        current_user: Authenticated user (must be an active member of the team)
        db: Database session

    Returns:
        TokenListResponse: The page of tokens plus total count, limit and offset

    Raises:
        HTTPException: 403 for non-interactive sessions, 400 when the service
            raises ValueError (e.g. the caller fails the team access check)
    """
    _require_authenticated_session(current_user)

    service = TokenCatalogService(db)

    try:
        # The caller's email is handed to the service, which is expected to
        # validate the caller's access to the team.
        # NOTE(review): confirm whether the service checks ownership or membership.
        page = await service.list_team_tokens(
            team_id=team_id,
            user_email=current_user["email"],
            include_inactive=include_inactive,
            limit=limit,
            offset=offset,
        )
        total = await service.count_team_tokens(
            team_id=team_id,
            include_inactive=include_inactive,
        )

        # One batched lookup keyed by JTI instead of one query per token (N+1).
        revocations = await service.get_token_revocations_batch([entry.jti for entry in page])

        items = [
            TokenResponse(
                id=entry.id,
                name=entry.name,
                description=entry.description,
                user_email=entry.user_email,
                team_id=entry.team_id,
                created_at=entry.created_at,
                expires_at=entry.expires_at,
                last_used=entry.last_used,
                is_active=entry.is_active,
                # A token counts as revoked exactly when the batch returned a row for it.
                is_revoked=revoked is not None,
                revoked_at=revoked.revoked_at if revoked else None,
                revoked_by=revoked.revoked_by if revoked else None,
                revocation_reason=revoked.reason if revoked else None,
                tags=entry.tags,
                server_id=entry.server_id,
                resource_scopes=entry.resource_scopes,
                ip_restrictions=entry.ip_restrictions,
                time_restrictions=entry.time_restrictions,
                usage_limits=entry.usage_limits,
            )
            for entry, revoked in ((tok, revocations.get(tok.jti)) for tok in page)
        ]

        db.commit()
        db.close()
        return TokenListResponse(tokens=items, total=total, limit=limit, offset=offset)
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))

837 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))