Coverage for mcpgateway / routers / tokens.py: 96%
180 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/routers/tokens.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7JWT Token Catalog API endpoints.
8Provides comprehensive API token management with scoping, revocation, and analytics.
9"""
11# Standard
12import logging
13from typing import List, Optional
15# Third-Party
16from fastapi import APIRouter, Depends, HTTPException, status
17from sqlalchemy.orm import Session
19# First-Party
20from mcpgateway.db import get_db
21from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_permission
22from mcpgateway.schemas import TokenCreateRequest, TokenCreateResponse, TokenListResponse, TokenResponse, TokenRevokeRequest, TokenUpdateRequest, TokenUsageStatsResponse
23from mcpgateway.services.permission_service import PermissionService
24from mcpgateway.services.token_catalog_service import TokenCatalogService, TokenScope
26logger = logging.getLogger(__name__)
28router = APIRouter(prefix="/tokens", tags=["tokens"])
31def _require_interactive_session(current_user: dict) -> None:
32 """Block API token access to token management endpoints.
34 Token management requires interactive sessions (web UI login, SSO, OIDC, etc.)
35 to prevent privilege escalation via token chaining. This is a hard security
36 boundary that applies to ALL users including admins.
38 ALLOWED auth_methods:
39 - "jwt": Standard web login
40 - "oauth", "oidc", "saml": SSO providers via plugins
41 - "disabled": Development mode (auth disabled)
42 - Any other plugin-defined method that isn't "api_token"
44 BLOCKED:
45 - "api_token": Explicitly blocked
46 - "anonymous": Unauthenticated/missing proxy header
47 - None: Fail-secure - auth flow didn't set auth_method (code bug)
49 Args:
50 current_user: User context from get_current_user_with_permissions
52 Raises:
53 HTTPException: 403 if request is from an API token or auth_method not set
54 """
55 auth_method = current_user.get("auth_method")
57 # Fail-secure: block if auth_method not set (indicates incomplete auth flow)
58 if auth_method is None:
59 logger.warning("Token management blocked: auth_method not set. " "This indicates an auth code path that needs to set request.state.auth_method")
60 raise HTTPException(
61 status_code=status.HTTP_403_FORBIDDEN,
62 detail="Token management requires interactive session. " "Authentication method could not be determined.",
63 )
65 # Block API tokens explicitly
66 if auth_method == "api_token":
67 raise HTTPException(
68 status_code=status.HTTP_403_FORBIDDEN,
69 detail="Token management requires interactive session (web login). " "API tokens cannot create, modify, or revoke tokens.",
70 )
72 # Block anonymous users (missing proxy header or unauthenticated)
73 if auth_method == "anonymous": 73 ↛ 74line 73 didn't jump to line 74 because the condition on line 73 was never true
74 raise HTTPException(
75 status_code=status.HTTP_403_FORBIDDEN,
76 detail="Token management requires interactive session (web login). " "Anonymous access is not permitted.",
77 )
79 # All other auth_methods (jwt, oauth, oidc, saml, proxy, disabled, etc.) are allowed
82async def _get_caller_permissions(
83 db: Session,
84 current_user: dict,
85 team_id: Optional[str] = None,
86) -> Optional[List[str]]:
87 """Get caller's effective permissions for scope containment.
89 Args:
90 db: Database session
91 current_user: User context
92 team_id: Team context for permission lookup
94 Returns:
95 List of permissions, or ["*"] for admins
96 """
97 if current_user.get("is_admin"): 97 ↛ 98line 97 didn't jump to line 98 because the condition on line 97 was never true
98 return ["*"] # Admins can grant anything
100 permission_service = PermissionService(db)
101 permissions = await permission_service.get_user_permissions(
102 user_email=current_user["email"],
103 team_id=team_id,
104 )
105 return list(permissions) if permissions else None
108@router.post("", response_model=TokenCreateResponse, status_code=status.HTTP_201_CREATED)
109@require_permission("tokens.create")
110async def create_token(
111 request: TokenCreateRequest,
112 current_user=Depends(get_current_user_with_permissions),
113 db: Session = Depends(get_db),
114) -> TokenCreateResponse:
115 """Create a new API token for the current user.
117 Args:
118 request: Token creation request with name, description, scoping, etc.
119 current_user: Authenticated user from JWT
120 db: Database session
122 Returns:
123 TokenCreateResponse: Created token details with raw token
125 Raises:
126 HTTPException: If token name already exists or validation fails
128 Examples:
129 >>> import asyncio
130 >>> asyncio.iscoroutinefunction(create_token)
131 True
132 """
133 _require_interactive_session(current_user)
135 service = TokenCatalogService(db)
137 # Get caller permissions for scope containment (if custom scope requested)
138 caller_permissions = None
139 if request.scope and request.scope.permissions:
140 caller_permissions = await _get_caller_permissions(db, current_user, request.team_id)
142 # Convert request to TokenScope if provided
143 scope = None
144 if request.scope:
145 scope = TokenScope(
146 server_id=request.scope.server_id,
147 permissions=request.scope.permissions,
148 ip_restrictions=request.scope.ip_restrictions,
149 time_restrictions=request.scope.time_restrictions,
150 usage_limits=request.scope.usage_limits,
151 )
153 try:
154 token_record, raw_token = await service.create_token(
155 user_email=current_user["email"],
156 name=request.name,
157 description=request.description,
158 scope=scope,
159 expires_in_days=request.expires_in_days,
160 tags=request.tags,
161 team_id=request.team_id,
162 caller_permissions=caller_permissions,
163 is_active=request.is_active,
164 )
166 # Create TokenResponse for the token info
167 token_response = TokenResponse(
168 id=token_record.id,
169 name=token_record.name,
170 description=token_record.description,
171 user_email=token_record.user_email,
172 team_id=token_record.team_id,
173 server_id=token_record.server_id,
174 resource_scopes=token_record.resource_scopes or [],
175 ip_restrictions=token_record.ip_restrictions or [],
176 time_restrictions=token_record.time_restrictions or {},
177 usage_limits=token_record.usage_limits or {},
178 created_at=token_record.created_at,
179 expires_at=token_record.expires_at,
180 last_used=token_record.last_used,
181 is_active=token_record.is_active,
182 tags=token_record.tags or [],
183 )
185 db.commit()
186 db.close()
187 return TokenCreateResponse(
188 token=token_response,
189 access_token=raw_token,
190 )
191 except ValueError as e:
192 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
195@router.get("", response_model=TokenListResponse)
196@require_permission("tokens.read")
197async def list_tokens(
198 include_inactive: bool = False,
199 limit: int = 50,
200 offset: int = 0,
201 db: Session = Depends(get_db),
202 current_user=Depends(get_current_user_with_permissions),
203) -> TokenListResponse:
204 """List API tokens for the current user.
206 Args:
207 include_inactive: Include inactive/expired tokens
208 limit: Maximum number of tokens to return (default 50)
209 offset: Number of tokens to skip for pagination
210 current_user: Authenticated user from JWT
211 db: Database session
213 Returns:
214 TokenListResponse: List of user's API tokens
216 Examples:
217 >>> import asyncio
218 >>> asyncio.iscoroutinefunction(list_tokens)
219 True
220 """
221 _require_interactive_session(current_user)
223 service = TokenCatalogService(db)
224 tokens = await service.list_user_tokens(
225 user_email=current_user["email"],
226 include_inactive=include_inactive,
227 limit=limit,
228 offset=offset,
229 )
231 token_responses = []
232 for token in tokens:
233 # Check if token is revoked
234 revocation_info = await service.get_token_revocation(token.jti)
236 token_responses.append(
237 TokenResponse(
238 id=token.id,
239 name=token.name,
240 description=token.description,
241 user_email=token.user_email,
242 team_id=token.team_id,
243 created_at=token.created_at,
244 expires_at=token.expires_at,
245 last_used=token.last_used,
246 is_active=token.is_active,
247 is_revoked=revocation_info is not None,
248 revoked_at=revocation_info.revoked_at if revocation_info else None,
249 revoked_by=revocation_info.revoked_by if revocation_info else None,
250 revocation_reason=revocation_info.reason if revocation_info else None,
251 tags=token.tags,
252 server_id=token.server_id,
253 resource_scopes=token.resource_scopes,
254 ip_restrictions=token.ip_restrictions,
255 time_restrictions=token.time_restrictions,
256 usage_limits=token.usage_limits,
257 )
258 )
260 db.commit()
261 db.close()
262 return TokenListResponse(tokens=token_responses, total=len(token_responses), limit=limit, offset=offset)
265@router.get("/{token_id}", response_model=TokenResponse)
266@require_permission("tokens.read")
267async def get_token(
268 token_id: str,
269 current_user=Depends(get_current_user_with_permissions),
270 db: Session = Depends(get_db),
271) -> TokenResponse:
272 """Get details of a specific token.
274 Args:
275 token_id: Token ID to retrieve
276 current_user: Authenticated user from JWT
277 db: Database session
279 Returns:
280 TokenResponse: Token details
282 Raises:
283 HTTPException: If token not found or not owned by user
285 Examples:
286 >>> import asyncio
287 >>> asyncio.iscoroutinefunction(get_token)
288 True
289 """
290 _require_interactive_session(current_user)
292 service = TokenCatalogService(db)
293 token = await service.get_token(token_id, current_user["email"])
295 if not token:
296 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")
298 db.commit()
299 db.close()
300 return TokenResponse(
301 id=token.id,
302 name=token.name,
303 description=token.description,
304 user_email=token.user_email,
305 team_id=token.team_id,
306 created_at=token.created_at,
307 expires_at=token.expires_at,
308 last_used=token.last_used,
309 is_active=token.is_active,
310 tags=token.tags,
311 server_id=token.server_id,
312 resource_scopes=token.resource_scopes,
313 ip_restrictions=token.ip_restrictions,
314 time_restrictions=token.time_restrictions,
315 usage_limits=token.usage_limits,
316 )
319@router.put("/{token_id}", response_model=TokenResponse)
320@require_permission("tokens.update")
321async def update_token(
322 token_id: str,
323 request: TokenUpdateRequest,
324 current_user=Depends(get_current_user_with_permissions),
325 db: Session = Depends(get_db),
326) -> TokenResponse:
327 """Update an existing token.
329 Args:
330 token_id: Token ID to update
331 request: Token update request
332 current_user: Authenticated user from JWT
333 db: Database session
335 Returns:
336 TokenResponse: Updated token details
338 Raises:
339 HTTPException: If token not found or validation fails
340 """
341 _require_interactive_session(current_user)
343 service = TokenCatalogService(db)
345 # For update, get caller permissions using token's team_id
346 caller_permissions = None
347 if request.scope and request.scope.permissions:
348 # Get existing token to find its team_id
349 existing_token = await service.get_token(token_id, current_user["email"])
350 if not existing_token: 350 ↛ 351line 350 didn't jump to line 351 because the condition on line 350 was never true
351 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")
352 # Use token's team_id for permission lookup
353 caller_permissions = await _get_caller_permissions(db, current_user, existing_token.team_id)
355 # Convert request to TokenScope if provided
356 scope = None
357 if request.scope:
358 scope = TokenScope(
359 server_id=request.scope.server_id,
360 permissions=request.scope.permissions,
361 ip_restrictions=request.scope.ip_restrictions,
362 time_restrictions=request.scope.time_restrictions,
363 usage_limits=request.scope.usage_limits,
364 )
366 try:
367 token = await service.update_token(
368 token_id=token_id,
369 user_email=current_user["email"],
370 name=request.name,
371 description=request.description,
372 scope=scope,
373 tags=request.tags,
374 caller_permissions=caller_permissions,
375 is_active=request.is_active,
376 )
378 if not token:
379 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")
381 result = TokenResponse(
382 id=token.id,
383 name=token.name,
384 description=token.description,
385 user_email=token.user_email,
386 team_id=token.team_id,
387 created_at=token.created_at,
388 expires_at=token.expires_at,
389 last_used=token.last_used,
390 is_active=token.is_active,
391 tags=token.tags,
392 server_id=token.server_id,
393 resource_scopes=token.resource_scopes,
394 ip_restrictions=token.ip_restrictions,
395 time_restrictions=token.time_restrictions,
396 usage_limits=token.usage_limits,
397 )
398 db.commit()
399 db.close()
400 return result
401 except ValueError as e:
402 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
405@router.delete("/{token_id}", status_code=status.HTTP_204_NO_CONTENT)
406@require_permission("tokens.revoke")
407async def revoke_token(
408 token_id: str,
409 request: Optional[TokenRevokeRequest] = None,
410 current_user=Depends(get_current_user_with_permissions),
411 db: Session = Depends(get_db),
412) -> None:
413 """Revoke (delete) a token.
415 Args:
416 token_id: Token ID to revoke
417 request: Optional revocation request with reason
418 current_user: Authenticated user from JWT
419 db: Database session
421 Raises:
422 HTTPException: If token not found
423 """
424 _require_interactive_session(current_user)
426 service = TokenCatalogService(db)
428 reason = request.reason if request else "Revoked by user"
429 # SECURITY FIX: Pass user_email for ownership verification
430 success = await service.revoke_token(
431 token_id=token_id,
432 user_email=current_user["email"],
433 revoked_by=current_user["email"],
434 reason=reason,
435 )
437 if not success:
438 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")
440 db.commit()
441 db.close()
444@router.get("/{token_id}/usage", response_model=TokenUsageStatsResponse)
445@require_permission("tokens.read")
446async def get_token_usage_stats(
447 token_id: str,
448 days: int = 30,
449 current_user=Depends(get_current_user_with_permissions),
450 db: Session = Depends(get_db),
451) -> TokenUsageStatsResponse:
452 """Get usage statistics for a specific token.
454 Args:
455 token_id: Token ID to get stats for
456 days: Number of days to analyze (default 30)
457 current_user: Authenticated user from JWT
458 db: Database session
460 Returns:
461 TokenUsageStatsResponse: Token usage statistics
463 Raises:
464 HTTPException: If token not found or not owned by user
465 """
466 _require_interactive_session(current_user)
468 service = TokenCatalogService(db)
470 # Verify token ownership
471 token = await service.get_token(token_id, current_user["email"])
472 if not token:
473 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")
475 stats = await service.get_token_usage_stats(user_email=current_user["email"], token_id=token_id, days=days)
477 db.commit()
478 db.close()
479 return TokenUsageStatsResponse(**stats)
482# Admin endpoints for token oversight
483@router.get("/admin/all", response_model=TokenListResponse, tags=["admin"])
484async def list_all_tokens(
485 user_email: Optional[str] = None,
486 include_inactive: bool = False,
487 limit: int = 100,
488 offset: int = 0,
489 current_user=Depends(get_current_user_with_permissions),
490 db: Session = Depends(get_db),
491) -> TokenListResponse:
492 """Admin endpoint to list all tokens or tokens for a specific user.
494 Args:
495 user_email: Filter tokens by user email (admin only)
496 include_inactive: Include inactive/expired tokens
497 limit: Maximum number of tokens to return
498 offset: Number of tokens to skip
499 current_user: Authenticated admin user
500 db: Database session
502 Returns:
503 TokenListResponse: List of tokens
505 Raises:
506 HTTPException: If user is not admin
507 """
508 _require_interactive_session(current_user)
510 if not current_user["is_admin"]:
511 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")
513 service = TokenCatalogService(db)
515 if user_email:
516 # Get tokens for specific user
517 tokens = await service.list_user_tokens(
518 user_email=user_email,
519 include_inactive=include_inactive,
520 limit=limit,
521 offset=offset,
522 )
523 else:
524 # This would need a new method in service for all tokens
525 # For now, return empty list - can implement later if needed
526 tokens = []
528 token_responses = []
529 for token in tokens:
530 # Check if token is revoked
531 revocation_info = await service.get_token_revocation(token.jti)
533 token_responses.append(
534 TokenResponse(
535 id=token.id,
536 name=token.name,
537 description=token.description,
538 user_email=token.user_email,
539 team_id=token.team_id,
540 created_at=token.created_at,
541 expires_at=token.expires_at,
542 last_used=token.last_used,
543 is_active=token.is_active,
544 is_revoked=revocation_info is not None,
545 revoked_at=revocation_info.revoked_at if revocation_info else None,
546 revoked_by=revocation_info.revoked_by if revocation_info else None,
547 revocation_reason=revocation_info.reason if revocation_info else None,
548 tags=token.tags,
549 server_id=token.server_id,
550 resource_scopes=token.resource_scopes,
551 ip_restrictions=token.ip_restrictions,
552 time_restrictions=token.time_restrictions,
553 usage_limits=token.usage_limits,
554 )
555 )
557 db.commit()
558 db.close()
559 return TokenListResponse(tokens=token_responses, total=len(token_responses), limit=limit, offset=offset)
562@router.delete("/admin/{token_id}", status_code=status.HTTP_204_NO_CONTENT, tags=["admin"])
563async def admin_revoke_token(
564 token_id: str,
565 request: Optional[TokenRevokeRequest] = None,
566 current_user=Depends(get_current_user_with_permissions),
567 db: Session = Depends(get_db),
568) -> None:
569 """Admin endpoint to revoke any token.
571 Args:
572 token_id: Token ID to revoke
573 request: Optional revocation request with reason
574 current_user: Authenticated admin user
575 db: Database session
577 Raises:
578 HTTPException: If user is not admin or token not found
579 """
580 _require_interactive_session(current_user)
582 if not current_user["is_admin"]:
583 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")
585 service = TokenCatalogService(db)
586 admin_email = current_user["email"]
587 reason = request.reason if request else f"Revoked by admin {admin_email}"
589 # Use admin method - no ownership check
590 success = await service.admin_revoke_token(
591 token_id=token_id,
592 revoked_by=admin_email,
593 reason=reason,
594 )
596 if not success:
597 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")
599 db.commit()
600 db.close()
603# Team-based token endpoints
604@router.post("/teams/{team_id}", response_model=TokenCreateResponse, status_code=status.HTTP_201_CREATED)
605@require_permission("tokens.create")
606async def create_team_token(
607 team_id: str,
608 request: TokenCreateRequest,
609 current_user=Depends(get_current_user_with_permissions),
610 db: Session = Depends(get_db),
611) -> TokenCreateResponse:
612 """Create a new API token for a team (only team owners can do this).
614 Args:
615 team_id: Team ID to create token for
616 request: Token creation request with name, description, scoping, etc.
617 current_user: Authenticated user (must be team owner)
618 db: Database session
620 Returns:
621 TokenCreateResponse: Created token details with raw token
623 Raises:
624 HTTPException: If user is not team owner or validation fails
625 """
626 _require_interactive_session(current_user)
628 service = TokenCatalogService(db)
630 # Use team_id from path for permission context
631 caller_permissions = None
632 if request.scope and request.scope.permissions: 632 ↛ 633line 632 didn't jump to line 633 because the condition on line 632 was never true
633 caller_permissions = await _get_caller_permissions(db, current_user, team_id)
635 # Convert request to TokenScope if provided
636 scope = None
637 if request.scope: 637 ↛ 638line 637 didn't jump to line 638 because the condition on line 637 was never true
638 scope = TokenScope(
639 server_id=request.scope.server_id,
640 permissions=request.scope.permissions,
641 ip_restrictions=request.scope.ip_restrictions,
642 time_restrictions=request.scope.time_restrictions,
643 usage_limits=request.scope.usage_limits,
644 )
646 try:
647 token_record, raw_token = await service.create_token(
648 user_email=current_user["email"],
649 name=request.name,
650 description=request.description,
651 scope=scope,
652 expires_in_days=request.expires_in_days,
653 tags=request.tags,
654 team_id=team_id, # This will validate team ownership
655 caller_permissions=caller_permissions,
656 is_active=request.is_active,
657 )
659 # Create TokenResponse for the token info
660 token_response = TokenResponse(
661 id=token_record.id,
662 name=token_record.name,
663 description=token_record.description,
664 user_email=token_record.user_email,
665 team_id=token_record.team_id,
666 server_id=token_record.server_id,
667 resource_scopes=token_record.resource_scopes or [],
668 ip_restrictions=token_record.ip_restrictions or [],
669 time_restrictions=token_record.time_restrictions or {},
670 usage_limits=token_record.usage_limits or {},
671 created_at=token_record.created_at,
672 expires_at=token_record.expires_at,
673 last_used=token_record.last_used,
674 is_active=token_record.is_active,
675 tags=token_record.tags or [],
676 )
678 db.commit()
679 db.close()
680 return TokenCreateResponse(
681 token=token_response,
682 access_token=raw_token,
683 )
684 except ValueError as e:
685 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
688@router.get("/teams/{team_id}", response_model=TokenListResponse)
689@require_permission("tokens.read")
690async def list_team_tokens(
691 team_id: str,
692 include_inactive: bool = False,
693 limit: int = 50,
694 offset: int = 0,
695 current_user=Depends(get_current_user_with_permissions),
696 db: Session = Depends(get_db),
697) -> TokenListResponse:
698 """List API tokens for a team (only team owners can do this).
700 Args:
701 team_id: Team ID to list tokens for
702 include_inactive: Include inactive/expired tokens
703 limit: Maximum number of tokens to return (default 50)
704 offset: Number of tokens to skip for pagination
705 current_user: Authenticated user (must be team owner)
706 db: Database session
708 Returns:
709 TokenListResponse: List of teams API tokens
711 Raises:
712 HTTPException: If user is not team owner
713 """
714 _require_interactive_session(current_user)
716 service = TokenCatalogService(db)
718 try:
719 tokens = await service.list_team_tokens(
720 team_id=team_id,
721 user_email=current_user["email"], # This will validate team ownership
722 include_inactive=include_inactive,
723 limit=limit,
724 offset=offset,
725 )
727 token_responses = []
728 for token in tokens:
729 # Check if token is revoked
730 revocation_info = await service.get_token_revocation(token.jti)
732 token_responses.append(
733 TokenResponse(
734 id=token.id,
735 name=token.name,
736 description=token.description,
737 user_email=token.user_email,
738 team_id=token.team_id,
739 created_at=token.created_at,
740 expires_at=token.expires_at,
741 last_used=token.last_used,
742 is_active=token.is_active,
743 is_revoked=revocation_info is not None,
744 revoked_at=revocation_info.revoked_at if revocation_info else None,
745 revoked_by=revocation_info.revoked_by if revocation_info else None,
746 revocation_reason=revocation_info.reason if revocation_info else None,
747 tags=token.tags,
748 server_id=token.server_id,
749 resource_scopes=token.resource_scopes,
750 ip_restrictions=token.ip_restrictions,
751 time_restrictions=token.time_restrictions,
752 usage_limits=token.usage_limits,
753 )
754 )
756 db.commit()
757 db.close()
758 return TokenListResponse(tokens=token_responses, total=len(token_responses), limit=limit, offset=offset)
759 except ValueError as e:
760 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))