Coverage for mcpgateway / services / token_catalog_service.py: 99%
328 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/token_catalog_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Token Catalog Service.
8This module provides comprehensive API token management with scoping,
9revocation, usage tracking, and analytics for email-based users.
11Examples:
12 >>> from mcpgateway.services.token_catalog_service import TokenCatalogService
13 >>> service = TokenCatalogService(None) # Mock database for doctest
14 >>> # Service provides full token lifecycle management
15"""
17# Standard
18import asyncio
19from datetime import datetime, timedelta, timezone
20import hashlib
21import math
22from typing import Dict, List, Optional
23import uuid
25# Third-Party
26from sqlalchemy import and_, case, func, or_, select
27from sqlalchemy.orm import Session
29# First-Party
30from mcpgateway.config import settings
31from mcpgateway.db import EmailApiToken, EmailUser, Permissions, TokenRevocation, TokenUsageLog, utc_now
32from mcpgateway.services.logging_service import LoggingService
33from mcpgateway.utils.create_jwt_token import create_jwt_token
35# Initialize logging
36logging_service = LoggingService()
37logger = logging_service.get_logger(__name__)
39# Strong references to background tasks to prevent GC before completion
40_background_tasks: set[asyncio.Task] = set()
43class TokenScope:
44 """Token scoping configuration for fine-grained access control.
46 This class encapsulates token scoping parameters including
47 server restrictions, permissions, IP limitations, and usage quotas.
49 Attributes:
50 server_id (Optional[str]): Limit token to specific server
51 permissions (List[str]): Specific permission scopes
52 ip_restrictions (List[str]): IP address/CIDR restrictions
53 time_restrictions (dict): Time-based access limitations
54 usage_limits (dict): Rate limiting and quota settings
56 Examples:
57 >>> scope = TokenScope(
58 ... server_id="prod-server-123",
59 ... permissions=["tools.read", "resources.read"],
60 ... ip_restrictions=["192.168.1.0/24"],
61 ... time_restrictions={"business_hours_only": True}
62 ... )
63 >>> scope.is_server_scoped()
64 True
65 >>> scope.has_permission("tools.read")
66 True
67 >>> scope.has_permission("tools.write")
68 False
69 >>> scope.has_permission("resources.read")
70 True
71 >>>
72 >>> # Test empty scope
73 >>> empty_scope = TokenScope()
74 >>> empty_scope.is_server_scoped()
75 False
76 >>> empty_scope.has_permission("anything")
77 False
78 >>>
79 >>> # Test global scope
80 >>> global_scope = TokenScope(permissions=["*"])
81 >>> global_scope.has_permission("*")
82 True
83 """
85 def __init__(
86 self,
87 server_id: Optional[str] = None,
88 permissions: Optional[List[str]] = None,
89 ip_restrictions: Optional[List[str]] = None,
90 time_restrictions: Optional[dict] = None,
91 usage_limits: Optional[dict] = None,
92 ):
93 """Initialize TokenScope with specified restrictions and limits.
95 Args:
96 server_id: Optional server ID to scope token to specific server
97 permissions: List of permissions granted to this token
98 ip_restrictions: List of IP addresses/ranges allowed to use token
99 time_restrictions: Dictionary of time-based access restrictions
100 usage_limits: Dictionary of usage limits for the token
101 """
102 self.server_id = server_id
103 self.permissions = permissions or []
104 self.ip_restrictions = ip_restrictions or []
105 self.time_restrictions = time_restrictions or {}
106 self.usage_limits = usage_limits or {}
108 def is_server_scoped(self) -> bool:
109 """Check if token is scoped to a specific server.
111 Returns:
112 bool: True if scoped to a server, False otherwise.
113 """
114 return self.server_id is not None
116 def has_permission(self, permission: str) -> bool:
117 """Check if scope includes specific permission.
119 Args:
120 permission: Permission string to check for.
122 Returns:
123 bool: True if permission is included, False otherwise.
124 """
125 return permission in self.permissions
127 def to_dict(self) -> dict:
128 """Convert scope to dictionary for JSON storage.
130 Returns:
131 dict: Dictionary representation of the token scope.
133 Examples:
134 >>> scope = TokenScope(server_id="server-123", permissions=["read", "write"])
135 >>> result = scope.to_dict()
136 >>> result["server_id"]
137 'server-123'
138 >>> result["permissions"]
139 ['read', 'write']
140 >>> isinstance(result, dict)
141 True
142 """
143 return {"server_id": self.server_id, "permissions": self.permissions, "ip_restrictions": self.ip_restrictions, "time_restrictions": self.time_restrictions, "usage_limits": self.usage_limits}
145 @classmethod
146 def from_dict(cls, data: dict) -> "TokenScope":
147 """Create TokenScope from dictionary.
149 Args:
150 data: Dictionary containing scope configuration.
152 Returns:
153 TokenScope: New TokenScope instance.
155 Examples:
156 >>> data = {
157 ... "server_id": "server-456",
158 ... "permissions": ["tools.read", "tools.execute"],
159 ... "ip_restrictions": ["10.0.0.0/8"]
160 ... }
161 >>> scope = TokenScope.from_dict(data)
162 >>> scope.server_id
163 'server-456'
164 >>> scope.permissions
165 ['tools.read', 'tools.execute']
166 >>> scope.is_server_scoped()
167 True
168 >>> scope.has_permission("tools.read")
169 True
170 >>>
171 >>> # Test empty dict
172 >>> empty_scope = TokenScope.from_dict({})
173 >>> empty_scope.server_id is None
174 True
175 >>> empty_scope.permissions
176 []
177 """
178 return cls(
179 server_id=data.get("server_id"),
180 permissions=data.get("permissions", []),
181 ip_restrictions=data.get("ip_restrictions", []),
182 time_restrictions=data.get("time_restrictions", {}),
183 usage_limits=data.get("usage_limits", {}),
184 )
187class TokenCatalogService:
188 """Service for managing user API token catalogs.
190 This service provides comprehensive token lifecycle management including
191 creation, scoping, revocation, usage tracking, and analytics. It handles
192 JWT-based API tokens with fine-grained access control, team support,
193 and comprehensive audit logging.
195 Key features:
196 - Token creation with customizable scopes and permissions
197 - Team-based token management with role-based access
198 - Token revocation and blacklisting
199 - Usage tracking and analytics
200 - IP and time-based restrictions
201 - Automatic cleanup of expired tokens
203 Attributes:
204 db (Session): SQLAlchemy database session for token operations
206 Examples:
207 >>> from mcpgateway.services.token_catalog_service import TokenCatalogService
208 >>> service = TokenCatalogService(None) # Mock database for doctest
209 >>> service.db is None
210 True
211 """
213 def __init__(self, db: Session):
214 """Initialize TokenCatalogService with database session.
216 Args:
217 db: SQLAlchemy database session for token operations
218 """
219 self.db = db
221 async def _generate_token(
222 self, user_email: str, jti: str, team_id: Optional[str] = None, expires_at: Optional[datetime] = None, scope: Optional["TokenScope"] = None, user: Optional[object] = None
223 ) -> str:
224 """Generate a JWT token for API access.
226 This internal method creates a properly formatted JWT token with all
227 necessary claims including user identity, scopes, team membership,
228 and expiration. The token follows ContextForge JWT structure.
230 Args:
231 user_email: User's email address for the token subject
232 jti: JWT ID for token uniqueness
233 team_id: Optional team ID for team-scoped tokens
234 expires_at: Optional expiration datetime
235 scope: Optional token scope information for access control
236 user: Optional user object to extract admin privileges
238 Returns:
239 str: Signed JWT token string ready for API authentication
241 Raises:
242 ValueError: If expires_at is in the past (cannot create already-expired tokens)
244 Note:
245 This is an internal method. Use create_token() to generate
246 tokens with proper database tracking and validation.
247 """
248 # Calculate expiration in minutes from expires_at
249 expires_in_minutes = 0
250 if expires_at:
251 now = datetime.now(timezone.utc)
252 delta = expires_at - now
253 delta_seconds = delta.total_seconds()
255 # Guard: reject already-expired expiration times
256 if delta_seconds <= 0:
257 raise ValueError("Token expiration time is in the past. Cannot create already-expired tokens.")
259 # Use ceiling to ensure we always have at least 1 minute expiration
260 # This prevents <60s from rounding to 0 and creating non-expiring tokens
261 expires_in_minutes = max(1, math.ceil(delta_seconds / 60))
263 # Build user data dict
264 user_data = {
265 "email": user_email,
266 "full_name": "API Token User",
267 "is_admin": user.is_admin if user else False,
268 "auth_provider": "api_token",
269 }
271 # Build teams list
272 teams = [team_id] if team_id else []
274 # Build scopes dict
275 # Empty permissions = defer to RBAC at runtime (not wildcard access)
276 scopes_dict = None
277 if scope:
278 scopes_dict = {
279 "server_id": scope.server_id,
280 "permissions": scope.permissions if scope.permissions is not None else [],
281 "ip_restrictions": scope.ip_restrictions or [],
282 "time_restrictions": scope.time_restrictions or {},
283 }
284 else:
285 scopes_dict = {
286 "server_id": None,
287 "permissions": [], # Empty = inherit from RBAC at runtime
288 "ip_restrictions": [],
289 "time_restrictions": {},
290 }
292 # Auto-inject servers.use for tokens with explicit MCP-related permissions.
293 # Without servers.use, the token scoping middleware blocks /rpc and /mcp
294 # transport access, making MCP-method permissions useless.
295 permissions = scopes_dict["permissions"]
296 if permissions and "*" not in permissions and "servers.use" not in permissions:
297 if any(p.startswith(Permissions.MCP_METHOD_PREFIXES) for p in permissions):
298 scopes_dict["permissions"] = [*permissions, "servers.use"]
300 # Generate JWT token using the centralized token creation utility
301 # Pass structured data to the enhanced create_jwt_token function
302 return await create_jwt_token(
303 data={"sub": user_email, "jti": jti, "token_use": "api"}, # nosec B105 - token type marker, not a password
304 expires_in_minutes=expires_in_minutes,
305 user_data=user_data,
306 teams=teams,
307 scopes=scopes_dict,
308 )
310 def _hash_token(self, token: str) -> str:
311 """Create secure hash of token for storage.
313 Args:
314 token: Raw token string
316 Returns:
317 str: SHA-256 hash of token
319 Examples:
320 >>> service = TokenCatalogService(None)
321 >>> hash_val = service._hash_token("test_token")
322 >>> len(hash_val) == 64
323 True
324 """
325 return hashlib.sha256(token.encode()).hexdigest()
327 def _validate_scope_containment(
328 self,
329 requested_permissions: Optional[List[str]],
330 caller_permissions: Optional[List[str]],
331 ) -> None:
332 """Validate that requested permissions don't exceed caller's permissions.
334 SECURITY: This is fail-secure. If caller_permissions is empty/None,
335 custom scopes are DENIED. Users without explicit permissions can only
336 create tokens with empty scope (inherit at runtime).
338 Args:
339 requested_permissions: Permissions requested for new/updated token
340 caller_permissions: Caller's effective permissions (RBAC + current token scopes)
342 Raises:
343 ValueError: If requested permissions exceed caller's permissions
344 """
345 # No requested permissions = empty scope, always allowed
346 if not requested_permissions:
347 return
349 # FAIL-SECURE: If caller has no permissions, deny any custom scope
350 if not caller_permissions:
351 raise ValueError("Cannot specify custom token permissions. " + "You have no explicit permissions to delegate. " + "Create a token without scope to inherit permissions at runtime.")
353 # Wildcard caller can grant anything
354 if "*" in caller_permissions:
355 return
357 # Wildcard request requires wildcard caller
358 if "*" in requested_permissions:
359 raise ValueError("Cannot create token with wildcard permissions. " + "Your effective permissions do not include wildcard access.")
361 # Check each requested permission
362 for req_perm in requested_permissions:
363 if req_perm in caller_permissions:
364 continue
366 # Check for category wildcard (e.g., "tools.*" allows "tools.read")
367 if "." in req_perm:
368 category = req_perm.split(".")[0]
369 if f"{category}.*" in caller_permissions:
370 continue
372 raise ValueError(f"Cannot grant permission '{req_perm}' - not in your effective permissions.")
374 async def create_token(
375 self,
376 user_email: str,
377 name: str,
378 description: Optional[str] = None,
379 scope: Optional[TokenScope] = None,
380 expires_in_days: Optional[int] = None,
381 tags: Optional[List[str]] = None,
382 team_id: Optional[str] = None,
383 caller_permissions: Optional[List[str]] = None,
384 is_active: bool = True,
385 ) -> tuple[EmailApiToken, str]:
386 """
387 Create a new API token with team-level scoping and additional configurations.
389 This method generates a JWT-based API token with team-level scoping and optional security configurations,
390 such as expiration, permissions, IP restrictions, and usage limits. The token is associated with a user
391 and a specific team, ensuring access control and multi-tenancy support.
393 The function will:
394 - Validate the existence of the user.
395 - Ensure the user is an active member of the specified team.
396 - Verify that the token name is unique for the user+team combination.
397 - Generate a JWT with the specified scoping parameters (e.g., permissions, IP, etc.).
398 - Store the token in the database with the relevant details and return the token and raw JWT string.
400 Args:
401 user_email (str): The email address of the user requesting the token.
402 name (str): A unique, human-readable name for the token (must be unique per user+team).
403 description (Optional[str]): A description for the token (default is None).
404 scope (Optional[TokenScope]): The scoping configuration for the token, including permissions,
405 server ID, IP restrictions, etc. (default is None).
406 expires_in_days (Optional[int]): The expiration time in days for the token (None means no expiration).
407 tags (Optional[List[str]]): A list of organizational tags for the token (default is an empty list).
408 team_id (Optional[str]): The team ID to which the token should be scoped. This is required for team-level scoping.
409 caller_permissions (Optional[List[str]]): The permissions of the caller creating the token. Used for
410 scope containment validation to ensure the new token cannot have broader permissions than the caller.
411 is_active (bool): Whether the token should be created as active (default is True).
413 Returns:
414 tuple[EmailApiToken, str]: A tuple where the first element is the `EmailApiToken` database record and
415 the second element is the raw JWT token string. The `EmailApiToken` contains the database record with the
416 token details.
418 Raises:
419 ValueError: If any of the following validation checks fail:
420 - The `user_email` does not correspond to an existing user.
421 - The `team_id` is missing or the user is not an active member of the specified team.
422 - A token with the same name already exists for the given user and team.
423 - Invalid token configuration (e.g., invalid expiration date).
425 Examples:
426 >>> # This method requires database operations, shown for reference
427 >>> service = TokenCatalogService(None) # Would use real DB session
428 >>> # token, raw_token = await service.create_token(...)
429 >>> # Returns (EmailApiToken, raw_token_string) tuple
430 """
431 # # Enforce team-level scoping requirement
432 # if not team_id:
433 # raise ValueError("team_id is required for token creation. " "Please select a specific team before creating a token. " "You cannot create tokens while viewing 'All Teams'.")
435 # Validate user exists
436 user = self.db.execute(select(EmailUser).where(EmailUser.email == user_email)).scalar_one_or_none()
438 if not user:
439 raise ValueError(f"User not found: {user_email}")
441 # Validate scope containment (fail-secure if no caller_permissions)
442 if scope and scope.permissions:
443 self._validate_scope_containment(scope.permissions, caller_permissions)
445 # Validate team exists and user is active member
446 if team_id:
447 # First-Party
448 from mcpgateway.db import EmailTeam, EmailTeamMember # pylint: disable=import-outside-toplevel
450 # Check if team exists
451 team = self.db.execute(select(EmailTeam).where(EmailTeam.id == team_id)).scalar_one_or_none()
453 if not team:
454 raise ValueError(f"Team not found: {team_id}")
456 # Verify user is an active member of the team
457 membership = self.db.execute(
458 select(EmailTeamMember).where(and_(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)))
459 ).scalar_one_or_none()
461 if not membership:
462 raise ValueError(f"User {user_email} is not an active member of team {team_id}. Only team members can create tokens for the team.")
464 # Check for duplicate active token name for this user within the same team scope,
465 # matching DB constraint uq_email_api_tokens_user_name_team (user_email, name, team_id).
466 # team_id=None tokens are scoped to the global (no-team) bucket.
467 if team_id:
468 name_check = and_(EmailApiToken.user_email == user_email, EmailApiToken.name == name, EmailApiToken.team_id == team_id, EmailApiToken.is_active.is_(True))
469 else:
470 name_check = and_(EmailApiToken.user_email == user_email, EmailApiToken.name == name, EmailApiToken.team_id.is_(None), EmailApiToken.is_active.is_(True))
471 existing_token = self.db.execute(select(EmailApiToken).where(name_check)).scalar_one_or_none()
473 if existing_token:
474 scope_label = f"team '{team_id}'" if team_id else "the global scope (no team)"
475 raise ValueError(f"Token with name '{name}' already exists for user {user_email} in {scope_label}. Token names must be unique per user per team. Please choose a different name.")
477 # CALCULATE EXPIRATION DATE
478 expires_at = None
479 if expires_in_days:
480 expires_at = utc_now() + timedelta(days=expires_in_days)
482 # Enforce expiration requirement if configured
483 if settings.require_token_expiration and not expires_at:
484 raise ValueError("Token expiration is required by server policy (REQUIRE_TOKEN_EXPIRATION=true). Please specify an expiration date for the token.")
486 jti = str(uuid.uuid4()) # Unique JWT ID
487 # Generate JWT token with all necessary claims
488 raw_token = await self._generate_token(user_email=user_email, jti=jti, team_id=team_id, expires_at=expires_at, scope=scope, user=user) # Pass user object to include admin status
490 # Hash token for secure storage
491 token_hash = self._hash_token(raw_token)
493 # Create database record
494 api_token = EmailApiToken(
495 id=str(uuid.uuid4()),
496 user_email=user_email,
497 team_id=team_id, # Store team association
498 name=name,
499 jti=jti,
500 description=description,
501 token_hash=token_hash, # Store hash, not raw token
502 expires_at=expires_at,
503 tags=tags or [],
504 # Store scoping information
505 server_id=scope.server_id if scope else None,
506 resource_scopes=scope.permissions if scope else [],
507 ip_restrictions=scope.ip_restrictions if scope else [],
508 time_restrictions=scope.time_restrictions if scope else {},
509 usage_limits=scope.usage_limits if scope else {},
510 # Token status
511 is_active=is_active,
512 created_at=utc_now(),
513 last_used=None,
514 )
516 self.db.add(api_token)
517 self.db.commit()
518 self.db.refresh(api_token)
520 token_type = f"team-scoped (team: {team_id})" if team_id else "public-only"
521 logger.info(f"Created {token_type} API token '{name}' for user {user_email}. Token ID: {api_token.id}, Expires: {expires_at or 'Never'}")
522 return api_token, raw_token
524 async def count_user_tokens(self, user_email: str, include_inactive: bool = False) -> int:
525 """Count API tokens for a user.
527 Args:
528 user_email: User's email address
529 include_inactive: Include inactive/expired tokens
531 Returns:
532 int: Total number of matching tokens
533 """
534 # pylint: disable=not-callable
535 query = select(func.count(EmailApiToken.id)).where(EmailApiToken.user_email == user_email)
537 if not include_inactive:
538 query = query.where(and_(EmailApiToken.is_active.is_(True), or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())))
540 result = self.db.execute(query)
541 return result.scalar() or 0
543 async def get_user_team_ids(self, user_email: str) -> List[str]:
544 """Get all team IDs the user is a member of.
546 Uses TeamManagementService.get_user_teams which is cached and consistent
547 with how other services (servers, tools, resources) resolve team visibility.
549 Args:
550 user_email: User's email address
552 Returns:
553 List[str]: Team IDs the user belongs to
554 """
555 # First-Party
556 from mcpgateway.services.team_management_service import TeamManagementService # pylint: disable=import-outside-toplevel
558 team_service = TeamManagementService(self.db)
559 user_teams = await team_service.get_user_teams(user_email)
560 return [team.id for team in user_teams]
562 async def count_user_and_team_tokens(self, user_email: str, include_inactive: bool = False) -> int:
563 """Count API tokens for a user plus team tokens from teams the user belongs to.
565 This combines personal tokens (created by the user) with team-scoped tokens
566 from all teams where the user is an active member.
568 Args:
569 user_email: User's email address
570 include_inactive: Include inactive/expired tokens
572 Returns:
573 int: Total number of matching tokens
574 """
575 team_ids = await self.get_user_team_ids(user_email)
577 # Build query: tokens created by user OR tokens in user's teams
578 conditions = [EmailApiToken.user_email == user_email]
579 if team_ids:
580 conditions.append(EmailApiToken.team_id.in_(team_ids))
582 # pylint: disable=not-callable
583 query = select(func.count(EmailApiToken.id)).where(or_(*conditions))
585 if not include_inactive:
586 query = query.where(and_(EmailApiToken.is_active.is_(True), or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())))
588 result = self.db.execute(query)
589 return result.scalar() or 0
591 async def count_team_tokens(self, team_id: str, include_inactive: bool = False) -> int:
592 """Count API tokens for a team.
594 Args:
595 team_id: Team ID to count tokens for
596 include_inactive: Include inactive/expired tokens
598 Returns:
599 int: Total number of matching tokens
600 """
601 # pylint: disable=not-callable
602 query = select(func.count(EmailApiToken.id)).where(EmailApiToken.team_id == team_id)
604 if not include_inactive:
605 query = query.where(and_(EmailApiToken.is_active.is_(True), or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())))
607 result = self.db.execute(query)
608 return result.scalar() or 0
610 async def list_user_tokens(self, user_email: str, include_inactive: bool = False, limit: int = 100, offset: int = 0) -> List[EmailApiToken]:
611 """List API tokens for a user.
613 Args:
614 user_email: User's email address
615 include_inactive: Include inactive/expired tokens
616 limit: Maximum tokens to return
617 offset: Number of tokens to skip
619 Returns:
620 List[EmailApiToken]: User's API tokens
622 Examples:
623 >>> service = TokenCatalogService(None) # Would use real DB session
624 >>> # Returns List[EmailApiToken] for user
625 """
626 # Validate parameters
627 if limit <= 0 or limit > 1000:
628 limit = 50 # Use default
629 offset = max(offset, 0) # Use default
630 query = select(EmailApiToken).where(EmailApiToken.user_email == user_email)
632 if not include_inactive:
633 query = query.where(and_(EmailApiToken.is_active.is_(True), or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())))
635 query = query.order_by(EmailApiToken.created_at.desc()).limit(limit).offset(offset)
637 result = self.db.execute(query)
638 return result.scalars().all()
640 async def list_team_tokens(self, team_id: str, user_email: str, include_inactive: bool = False, limit: int = 100, offset: int = 0) -> List[EmailApiToken]:
641 """List API tokens for a team (accessible by any active team member).
643 Args:
644 team_id: Team ID to list tokens for
645 user_email: User's email (must be an active member of the team)
646 include_inactive: Include inactive/expired tokens
647 limit: Maximum tokens to return
648 offset: Number of tokens to skip
650 Returns:
651 List[EmailApiToken]: Team's API tokens
653 Raises:
654 ValueError: If user is not an active member of the team
655 """
656 team_ids = await self.get_user_team_ids(user_email)
658 if team_id not in team_ids:
659 raise ValueError(f"User {user_email} is not an active member of team {team_id}")
661 # Validate parameters
662 if limit <= 0 or limit > 1000:
663 limit = 50
664 offset = max(offset, 0)
666 query = select(EmailApiToken).where(EmailApiToken.team_id == team_id)
668 if not include_inactive:
669 query = query.where(and_(EmailApiToken.is_active.is_(True), or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())))
671 query = query.order_by(EmailApiToken.created_at.desc()).limit(limit).offset(offset)
672 result = self.db.execute(query)
673 return result.scalars().all()
675 async def list_user_and_team_tokens(self, user_email: str, include_inactive: bool = False, limit: int = 100, offset: int = 0) -> List[EmailApiToken]:
676 """List API tokens for a user plus team tokens from teams the user belongs to.
678 This combines personal tokens (created by the user) with team-scoped tokens
679 from all teams where the user is an active member.
681 Args:
682 user_email: User's email address
683 include_inactive: Include inactive/expired tokens
684 limit: Maximum tokens to return
685 offset: Number of tokens to skip
687 Returns:
688 List[EmailApiToken]: Combined list of user's personal tokens and team tokens
690 Examples:
691 >>> service = TokenCatalogService(None) # Would use real DB session
692 >>> # Returns List[EmailApiToken] including personal and team tokens
693 """
694 # Validate parameters
695 if limit <= 0 or limit > 1000:
696 limit = 50
697 offset = max(offset, 0)
699 team_ids = await self.get_user_team_ids(user_email)
701 # Build query: tokens created by user OR tokens in user's teams
702 conditions = [EmailApiToken.user_email == user_email]
703 if team_ids:
704 conditions.append(EmailApiToken.team_id.in_(team_ids))
706 query = select(EmailApiToken).where(or_(*conditions))
708 if not include_inactive:
709 query = query.where(and_(EmailApiToken.is_active.is_(True), or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())))
711 query = query.order_by(EmailApiToken.created_at.desc()).limit(limit).offset(offset)
713 result = self.db.execute(query)
714 return result.scalars().all()
716 async def get_token(self, token_id: str, user_email: Optional[str] = None) -> Optional[EmailApiToken]:
717 """Get a specific token by ID.
719 Args:
720 token_id: Token ID
721 user_email: Optional user email filter for security
723 Returns:
724 Optional[EmailApiToken]: Token if found and authorized
726 Examples:
727 >>> service = TokenCatalogService(None) # Would use real DB session
728 >>> # Returns Optional[EmailApiToken] if found and authorized
729 """
730 query = select(EmailApiToken).where(EmailApiToken.id == token_id)
732 if user_email:
733 query = query.where(EmailApiToken.user_email == user_email)
735 result = self.db.execute(query)
736 return result.scalar_one_or_none()
738 async def update_token(
739 self,
740 token_id: str,
741 user_email: str,
742 name: Optional[str] = None,
743 description: Optional[str] = None,
744 scope: Optional[TokenScope] = None,
745 tags: Optional[List[str]] = None,
746 caller_permissions: Optional[List[str]] = None,
747 is_active: Optional[bool] = None,
748 ) -> Optional[EmailApiToken]:
749 """Update an existing token with scope containment validation.
751 Args:
752 token_id: Token ID to update
753 user_email: Owner's email for security
754 name: New token name
755 description: New description
756 scope: New scoping configuration
757 tags: New tags
758 caller_permissions: Caller's effective permissions for scope containment
759 is_active: New token active status
761 Returns:
762 Optional[EmailApiToken]: Updated token if found
764 Raises:
765 ValueError: If token not found or name conflicts
767 Examples:
768 >>> service = TokenCatalogService(None) # Would use real DB session
769 >>> # Returns Optional[EmailApiToken] if updated successfully
770 """
771 token = await self.get_token(token_id, user_email)
772 if not token:
773 raise ValueError("Token not found or not authorized")
775 # Validate scope containment for scope changes
776 if scope and scope.permissions:
777 self._validate_scope_containment(scope.permissions, caller_permissions)
779 # Check for duplicate name if changing
780 if name and name != token.name:
781 existing = self.db.execute(
782 select(EmailApiToken).where(and_(EmailApiToken.user_email == user_email, EmailApiToken.name == name, EmailApiToken.id != token_id, EmailApiToken.is_active.is_(True)))
783 ).scalar_one_or_none()
785 if existing:
786 raise ValueError(f"Token name '{name}' already exists")
788 token.name = name
790 if description is not None:
791 token.description = description
793 if tags is not None:
794 token.tags = tags
796 if is_active is not None:
797 token.is_active = is_active
799 if scope:
800 token.server_id = scope.server_id
801 token.resource_scopes = scope.permissions
802 token.ip_restrictions = scope.ip_restrictions
803 token.time_restrictions = scope.time_restrictions
804 token.usage_limits = scope.usage_limits
806 self.db.commit()
807 self.db.refresh(token)
809 logger.info(f"Updated token '{token.name}' for user {user_email}")
811 return token
813 async def revoke_token(self, token_id: str, user_email: str, revoked_by: str, reason: Optional[str] = None) -> bool:
814 """Revoke a token owned by the specified user or in a team the user belongs to.
816 Args:
817 token_id: Token ID to revoke
818 user_email: Caller's email - must own the token or be a member of the token's team
819 revoked_by: Email of user performing revocation (for audit)
820 reason: Optional reason for revocation
822 Returns:
823 bool: True if token was revoked, False if not found or not authorized
825 Examples:
826 >>> service = TokenCatalogService(None) # Would use real DB session
827 >>> # Returns bool: True if token was revoked successfully
828 """
829 # First try ownership match
830 token = await self.get_token(token_id, user_email)
832 # If not owned by caller, check if token is in a team the caller is an owner of
833 if not token:
834 token = await self.get_token(token_id)
835 if not token or not token.team_id:
836 return False
837 # Only team owners (admins) can revoke other members' team tokens
838 # First-Party
839 from mcpgateway.services.team_management_service import TeamManagementService # pylint: disable=import-outside-toplevel
841 team_service = TeamManagementService(self.db)
842 role = await team_service.get_user_role_in_team(user_email, token.team_id)
843 if role != "owner":
844 return False
846 # Mark token as inactive
847 token.is_active = False
849 # Add to blacklist
850 revocation = TokenRevocation(jti=token.jti, revoked_by=revoked_by, reason=reason)
852 self.db.add(revocation)
853 self.db.commit()
855 # Invalidate auth cache synchronously so revoked tokens are rejected immediately
856 # (fire-and-forget via create_task risks a race where the next request arrives
857 # before the invalidation task runs, allowing the revoked token through).
858 try:
859 # First-Party
860 from mcpgateway.cache.auth_cache import auth_cache # pylint: disable=import-outside-toplevel
862 await auth_cache.invalidate_revocation(token.jti)
863 except Exception as cache_error:
864 logger.debug(f"Failed to invalidate auth cache for revoked token: {cache_error}")
866 logger.info(f"Revoked token '{token.name}' (JTI: {token.jti}) by {revoked_by}")
868 return True
870 async def admin_revoke_token(self, token_id: str, revoked_by: str, reason: Optional[str] = None) -> bool:
871 """Admin-only: Revoke any token without ownership check.
873 WARNING: This method bypasses ownership verification.
874 Only call from admin-authenticated endpoints.
876 Args:
877 token_id: Token ID to revoke
878 revoked_by: Admin email for audit
879 reason: Revocation reason
881 Returns:
882 bool: True if token was revoked, False if not found
884 Examples:
885 >>> service = TokenCatalogService(None) # Would use real DB session
886 >>> # Returns bool: True if token was revoked successfully
887 """
888 # No user filter - admin can revoke any token
889 token = await self.get_token(token_id)
890 if not token:
891 return False
893 token.is_active = False
894 revocation = TokenRevocation(jti=token.jti, revoked_by=revoked_by, reason=reason)
895 self.db.add(revocation)
896 self.db.commit()
898 try:
899 # First-Party
900 from mcpgateway.cache.auth_cache import auth_cache # pylint: disable=import-outside-toplevel
902 await auth_cache.invalidate_revocation(token.jti)
903 except Exception as cache_error:
904 logger.debug(f"Failed to invalidate auth cache: {cache_error}")
906 logger.info(f"Admin revoked token '{token.name}' (JTI: {token.jti}) by {revoked_by}")
907 return True
909 async def is_token_revoked(self, jti: str) -> bool:
910 """Check if a token JTI is revoked.
912 Args:
913 jti: JWT ID to check
915 Returns:
916 bool: True if token is revoked
918 Examples:
919 >>> service = TokenCatalogService(None) # Would use real DB session
920 >>> # Returns bool: True if token is revoked
921 """
922 revocation = self.db.execute(select(TokenRevocation).where(TokenRevocation.jti == jti)).scalar_one_or_none()
924 return revocation is not None
926 async def log_token_usage(
927 self,
928 jti: str,
929 user_email: str,
930 endpoint: Optional[str] = None,
931 method: Optional[str] = None,
932 ip_address: Optional[str] = None,
933 user_agent: Optional[str] = None,
934 status_code: Optional[int] = None,
935 response_time_ms: Optional[int] = None,
936 blocked: bool = False,
937 block_reason: Optional[str] = None,
938 ) -> None:
939 """Log token usage for analytics and security.
941 Args:
942 jti: JWT ID of token used
943 user_email: Token owner's email
944 endpoint: API endpoint accessed
945 method: HTTP method
946 ip_address: Client IP address
947 user_agent: Client user agent
948 status_code: HTTP response status
949 response_time_ms: Response time in milliseconds
950 blocked: Whether request was blocked
951 block_reason: Reason for blocking
953 Examples:
954 >>> service = TokenCatalogService(None) # Would use real DB session
955 >>> # Logs token usage for analytics - no return value
956 """
957 usage_log = TokenUsageLog(
958 token_jti=jti,
959 user_email=user_email,
960 endpoint=endpoint,
961 method=method,
962 ip_address=ip_address,
963 user_agent=user_agent,
964 status_code=status_code,
965 response_time_ms=response_time_ms,
966 blocked=blocked,
967 block_reason=block_reason,
968 )
970 self.db.add(usage_log)
971 self.db.commit()
973 async def get_token_usage_stats(self, user_email: str, token_id: Optional[str] = None, days: int = 30) -> dict:
974 """Get token usage statistics.
976 Args:
977 user_email: User's email address
978 token_id: Optional specific token ID
979 days: Number of days to analyze
981 Returns:
982 dict: Usage statistics
984 Examples:
985 >>> service = TokenCatalogService(None) # Would use real DB session
986 >>> # Returns dict with usage statistics
987 """
988 start_date = utc_now() - timedelta(days=days)
990 # Get token JTI if specific token requested
991 token_jti = None
992 if token_id:
993 token = await self.get_token(token_id, user_email)
994 if token:
995 token_jti = token.jti
997 # Use SQL aggregation for PostgreSQL, Python fallback for SQLite
998 dialect_name = self.db.get_bind().dialect.name
999 if dialect_name == "postgresql":
1000 return await self._get_usage_stats_postgresql(user_email, start_date, token_jti, days)
1001 return await self._get_usage_stats_python(user_email, start_date, token_jti, days)
1003 async def _get_usage_stats_postgresql(self, user_email: str, start_date: datetime, token_jti: Optional[str], days: int) -> dict:
1004 """Compute usage stats using PostgreSQL SQL aggregation.
1006 Args:
1007 user_email: User's email address
1008 start_date: Start date for analysis
1009 token_jti: Optional token JTI filter
1010 days: Number of days being analyzed
1012 Returns:
1013 dict: Usage statistics computed via SQL
1014 """
1015 # Build filter conditions
1016 conditions = [TokenUsageLog.user_email == user_email, TokenUsageLog.timestamp >= start_date]
1017 if token_jti:
1018 conditions.append(TokenUsageLog.token_jti == token_jti)
1020 base_filter = and_(*conditions)
1022 # Main stats query using SQL aggregation
1023 # Match Python behavior:
1024 # - status_code must be non-null AND non-zero AND < 400 for success count
1025 # - response_time_ms must be non-null AND non-zero for average (Python: if log.response_time_ms)
1026 stats_query = (
1027 select(
1028 func.count().label("total"), # pylint: disable=not-callable
1029 func.sum(
1030 case(
1031 (and_(TokenUsageLog.status_code.isnot(None), TokenUsageLog.status_code > 0, TokenUsageLog.status_code < 400), 1),
1032 else_=0,
1033 )
1034 ).label("successful"),
1035 func.sum(case((TokenUsageLog.blocked.is_(True), 1), else_=0)).label("blocked"),
1036 # Only average non-null and non-zero response times (NULL values are ignored by AVG)
1037 func.avg(
1038 case(
1039 (and_(TokenUsageLog.response_time_ms.isnot(None), TokenUsageLog.response_time_ms > 0), TokenUsageLog.response_time_ms),
1040 else_=None,
1041 )
1042 ).label("avg_response"),
1043 )
1044 .select_from(TokenUsageLog)
1045 .where(base_filter)
1046 )
1048 result = self.db.execute(stats_query).fetchone()
1050 total_requests = result.total or 0
1051 successful_requests = result.successful or 0
1052 blocked_requests = result.blocked or 0
1053 avg_response_time = float(result.avg_response) if result.avg_response else 0.0
1055 # Top endpoints query using SQL GROUP BY
1056 # Match Python behavior: exclude None AND empty string endpoints (Python: if log.endpoint)
1057 endpoints_query = (
1058 select(TokenUsageLog.endpoint, func.count().label("count")) # pylint: disable=not-callable
1059 .where(and_(base_filter, TokenUsageLog.endpoint.isnot(None), TokenUsageLog.endpoint != ""))
1060 .group_by(TokenUsageLog.endpoint)
1061 .order_by(func.count().desc()) # pylint: disable=not-callable
1062 .limit(5)
1063 )
1065 endpoints_result = self.db.execute(endpoints_query).fetchall()
1066 top_endpoints = [(row.endpoint, row.count) for row in endpoints_result]
1068 return {
1069 "period_days": days,
1070 "total_requests": total_requests,
1071 "successful_requests": successful_requests,
1072 "blocked_requests": blocked_requests,
1073 "success_rate": successful_requests / total_requests if total_requests > 0 else 0,
1074 "average_response_time_ms": round(avg_response_time, 2),
1075 "top_endpoints": top_endpoints,
1076 }
1078 async def _get_usage_stats_python(self, user_email: str, start_date: datetime, token_jti: Optional[str], days: int) -> dict:
1079 """Compute usage stats using Python (fallback for SQLite).
1081 Args:
1082 user_email: User's email address
1083 start_date: Start date for analysis
1084 token_jti: Optional token JTI filter
1085 days: Number of days being analyzed
1087 Returns:
1088 dict: Usage statistics computed in Python
1089 """
1090 query = select(TokenUsageLog).where(and_(TokenUsageLog.user_email == user_email, TokenUsageLog.timestamp >= start_date))
1092 if token_jti:
1093 query = query.where(TokenUsageLog.token_jti == token_jti)
1095 usage_logs = self.db.execute(query).scalars().all()
1097 # Calculate statistics
1098 total_requests = len(usage_logs)
1099 successful_requests = sum(1 for log in usage_logs if log.status_code and log.status_code < 400)
1100 blocked_requests = sum(1 for log in usage_logs if log.blocked)
1102 # Average response time
1103 response_times = [log.response_time_ms for log in usage_logs if log.response_time_ms]
1104 avg_response_time = sum(response_times) / len(response_times) if response_times else 0
1106 # Most accessed endpoints
1107 endpoint_counts: dict = {}
1108 for log in usage_logs:
1109 if log.endpoint:
1110 endpoint_counts[log.endpoint] = endpoint_counts.get(log.endpoint, 0) + 1
1112 top_endpoints = sorted(endpoint_counts.items(), key=lambda x: x[1], reverse=True)[:5]
1114 return {
1115 "period_days": days,
1116 "total_requests": total_requests,
1117 "successful_requests": successful_requests,
1118 "blocked_requests": blocked_requests,
1119 "success_rate": successful_requests / total_requests if total_requests > 0 else 0,
1120 "average_response_time_ms": round(avg_response_time, 2),
1121 "top_endpoints": top_endpoints,
1122 }
1124 async def get_token_revocation(self, jti: str) -> Optional[TokenRevocation]:
1125 """Get token revocation information by JTI.
1127 Args:
1128 jti: JWT token ID
1130 Returns:
1131 Optional[TokenRevocation]: Revocation info if token is revoked
1133 Examples:
1134 >>> service = TokenCatalogService(None) # Would use real DB session
1135 >>> # Returns Optional[TokenRevocation] if token is revoked
1136 """
1137 result = self.db.execute(select(TokenRevocation).where(TokenRevocation.jti == jti))
1138 return result.scalar_one_or_none()
1140 async def get_token_revocations_batch(self, jtis: List[str]) -> Dict[str, TokenRevocation]:
1141 """Get token revocation information for multiple JTIs in a single query.
1143 Args:
1144 jtis: List of JWT token IDs
1146 Returns:
1147 Dict mapping JTI to TokenRevocation for revoked tokens only.
1148 """
1149 if not jtis:
1150 return {}
1151 result = self.db.execute(select(TokenRevocation).where(TokenRevocation.jti.in_(jtis)))
1152 return {rev.jti: rev for rev in result.scalars().all()}
1154 async def list_all_tokens(self, include_inactive: bool = False, limit: int = 100, offset: int = 0) -> List[EmailApiToken]:
1155 """List all API tokens (admin only).
1157 Args:
1158 include_inactive: Include inactive/expired tokens
1159 limit: Maximum tokens to return
1160 offset: Number of tokens to skip
1162 Returns:
1163 List[EmailApiToken]: All API tokens
1164 """
1165 if limit <= 0 or limit > 1000:
1166 limit = 50
1167 offset = max(offset, 0)
1169 query = select(EmailApiToken)
1171 if not include_inactive:
1172 query = query.where(and_(EmailApiToken.is_active.is_(True), or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())))
1174 query = query.order_by(EmailApiToken.created_at.desc()).limit(limit).offset(offset)
1176 result = self.db.execute(query)
1177 return result.scalars().all()
1179 async def count_all_tokens(self, include_inactive: bool = False) -> int:
1180 """Count all API tokens (admin only).
1182 Args:
1183 include_inactive: Include inactive/expired tokens in count
1185 Returns:
1186 int: Total count of all tokens
1187 """
1188 query = select(func.count(EmailApiToken.id)) # pylint: disable=not-callable
1190 if not include_inactive:
1191 query = query.where(and_(EmailApiToken.is_active.is_(True), or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())))
1193 result = self.db.execute(query)
1194 return result.scalar() or 0
1196 async def cleanup_expired_tokens(self) -> int:
1197 """Clean up expired tokens using bulk UPDATE.
1199 Uses a single SQL UPDATE statement instead of loading tokens into memory
1200 and updating them one by one. This is more efficient and avoids memory
1201 issues when many tokens expire at once.
1203 Returns:
1204 int: Number of tokens cleaned up
1206 Examples:
1207 >>> service = TokenCatalogService(None) # Would use real DB session
1208 >>> # Returns int: Number of tokens cleaned up
1209 """
1210 try:
1211 now = utc_now()
1212 count = self.db.query(EmailApiToken).filter(EmailApiToken.expires_at < now, EmailApiToken.is_active.is_(True)).update({"is_active": False}, synchronize_session=False)
1214 self.db.commit()
1216 if count > 0:
1217 logger.info(f"Cleaned up {count} expired tokens")
1219 return count
1221 except Exception as e:
1222 self.db.rollback()
1223 logger.error(f"Failed to cleanup expired tokens: {e}")
1224 return 0