Coverage for mcpgateway / services / token_catalog_service.py: 99%
329 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/token_catalog_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Token Catalog Service.
8This module provides comprehensive API token management with scoping,
9revocation, usage tracking, and analytics for email-based users.
11Examples:
12 >>> from mcpgateway.services.token_catalog_service import TokenCatalogService
13 >>> service = TokenCatalogService(None) # Mock database for doctest
14 >>> # Service provides full token lifecycle management
15"""
17# Standard
18import asyncio
19from datetime import datetime, timedelta, timezone
20import hashlib
21import math
22from typing import Dict, List, Optional
23import uuid
25# Third-Party
26from sqlalchemy import and_, case, func, or_, select
27from sqlalchemy.orm import Session
29# First-Party
30from mcpgateway.common.validators import SecurityValidator
31from mcpgateway.config import settings
32from mcpgateway.db import EmailApiToken, EmailUser, Permissions, TokenRevocation, TokenUsageLog, utc_now
33from mcpgateway.services.logging_service import LoggingService
34from mcpgateway.utils.create_jwt_token import create_jwt_token
36# Initialize logging
37logging_service = LoggingService()
38logger = logging_service.get_logger(__name__)
40# Strong references to background tasks to prevent GC before completion
41_background_tasks: set[asyncio.Task] = set()
44class TokenScope:
45 """Token scoping configuration for fine-grained access control.
47 This class encapsulates token scoping parameters including
48 server restrictions, permissions, IP limitations, and usage quotas.
50 Attributes:
51 server_id (Optional[str]): Limit token to specific server
52 permissions (List[str]): Specific permission scopes
53 ip_restrictions (List[str]): IP address/CIDR restrictions
54 time_restrictions (dict): Time-based access limitations
55 usage_limits (dict): Rate limiting and quota settings
57 Examples:
58 >>> scope = TokenScope(
59 ... server_id="prod-server-123",
60 ... permissions=["tools.read", "resources.read"],
61 ... ip_restrictions=["192.168.1.0/24"],
62 ... time_restrictions={"business_hours_only": True}
63 ... )
64 >>> scope.is_server_scoped()
65 True
66 >>> scope.has_permission("tools.read")
67 True
68 >>> scope.has_permission("tools.write")
69 False
70 >>> scope.has_permission("resources.read")
71 True
72 >>>
73 >>> # Test empty scope
74 >>> empty_scope = TokenScope()
75 >>> empty_scope.is_server_scoped()
76 False
77 >>> empty_scope.has_permission("anything")
78 False
79 >>>
80 >>> # Test global scope
81 >>> global_scope = TokenScope(permissions=["*"])
82 >>> global_scope.has_permission("*")
83 True
84 """
86 def __init__(
87 self,
88 server_id: Optional[str] = None,
89 permissions: Optional[List[str]] = None,
90 ip_restrictions: Optional[List[str]] = None,
91 time_restrictions: Optional[dict] = None,
92 usage_limits: Optional[dict] = None,
93 ):
94 """Initialize TokenScope with specified restrictions and limits.
96 Args:
97 server_id: Optional server ID to scope token to specific server
98 permissions: List of permissions granted to this token
99 ip_restrictions: List of IP addresses/ranges allowed to use token
100 time_restrictions: Dictionary of time-based access restrictions
101 usage_limits: Dictionary of usage limits for the token
102 """
103 self.server_id = server_id
104 self.permissions = permissions or []
105 self.ip_restrictions = ip_restrictions or []
106 self.time_restrictions = time_restrictions or {}
107 self.usage_limits = usage_limits or {}
109 def is_server_scoped(self) -> bool:
110 """Check if token is scoped to a specific server.
112 Returns:
113 bool: True if scoped to a server, False otherwise.
114 """
115 return self.server_id is not None
117 def has_permission(self, permission: str) -> bool:
118 """Check if scope includes specific permission.
120 Args:
121 permission: Permission string to check for.
123 Returns:
124 bool: True if permission is included, False otherwise.
125 """
126 return permission in self.permissions
128 def to_dict(self) -> dict:
129 """Convert scope to dictionary for JSON storage.
131 Returns:
132 dict: Dictionary representation of the token scope.
134 Examples:
135 >>> scope = TokenScope(server_id="server-123", permissions=["read", "write"])
136 >>> result = scope.to_dict()
137 >>> result["server_id"]
138 'server-123'
139 >>> result["permissions"]
140 ['read', 'write']
141 >>> isinstance(result, dict)
142 True
143 """
144 return {"server_id": self.server_id, "permissions": self.permissions, "ip_restrictions": self.ip_restrictions, "time_restrictions": self.time_restrictions, "usage_limits": self.usage_limits}
146 @classmethod
147 def from_dict(cls, data: dict) -> "TokenScope":
148 """Create TokenScope from dictionary.
150 Args:
151 data: Dictionary containing scope configuration.
153 Returns:
154 TokenScope: New TokenScope instance.
156 Examples:
157 >>> data = {
158 ... "server_id": "server-456",
159 ... "permissions": ["tools.read", "tools.execute"],
160 ... "ip_restrictions": ["10.0.0.0/8"]
161 ... }
162 >>> scope = TokenScope.from_dict(data)
163 >>> scope.server_id
164 'server-456'
165 >>> scope.permissions
166 ['tools.read', 'tools.execute']
167 >>> scope.is_server_scoped()
168 True
169 >>> scope.has_permission("tools.read")
170 True
171 >>>
172 >>> # Test empty dict
173 >>> empty_scope = TokenScope.from_dict({})
174 >>> empty_scope.server_id is None
175 True
176 >>> empty_scope.permissions
177 []
178 """
179 return cls(
180 server_id=data.get("server_id"),
181 permissions=data.get("permissions", []),
182 ip_restrictions=data.get("ip_restrictions", []),
183 time_restrictions=data.get("time_restrictions", {}),
184 usage_limits=data.get("usage_limits", {}),
185 )
188class TokenCatalogService:
189 """Service for managing user API token catalogs.
191 This service provides comprehensive token lifecycle management including
192 creation, scoping, revocation, usage tracking, and analytics. It handles
193 JWT-based API tokens with fine-grained access control, team support,
194 and comprehensive audit logging.
196 Key features:
197 - Token creation with customizable scopes and permissions
198 - Team-based token management with role-based access
199 - Token revocation and blacklisting
200 - Usage tracking and analytics
201 - IP and time-based restrictions
202 - Automatic cleanup of expired tokens
204 Attributes:
205 db (Session): SQLAlchemy database session for token operations
207 Examples:
208 >>> from mcpgateway.services.token_catalog_service import TokenCatalogService
209 >>> service = TokenCatalogService(None) # Mock database for doctest
210 >>> service.db is None
211 True
212 """
214 def __init__(self, db: Session):
215 """Initialize TokenCatalogService with database session.
217 Args:
218 db: SQLAlchemy database session for token operations
219 """
220 self.db = db
222 async def _generate_token(
223 self, user_email: str, jti: str, team_id: Optional[str] = None, expires_at: Optional[datetime] = None, scope: Optional["TokenScope"] = None, user: Optional[object] = None
224 ) -> str:
225 """Generate a JWT token for API access.
227 This internal method creates a properly formatted JWT token with all
228 necessary claims including user identity, scopes, team membership,
229 and expiration. The token follows ContextForge JWT structure.
231 Args:
232 user_email: User's email address for the token subject
233 jti: JWT ID for token uniqueness
234 team_id: Optional team ID for team-scoped tokens
235 expires_at: Optional expiration datetime
236 scope: Optional token scope information for access control
237 user: Optional user object to extract admin privileges
239 Returns:
240 str: Signed JWT token string ready for API authentication
242 Raises:
243 ValueError: If expires_at is in the past (cannot create already-expired tokens)
245 Note:
246 This is an internal method. Use create_token() to generate
247 tokens with proper database tracking and validation.
248 """
249 # Calculate expiration in minutes from expires_at
250 expires_in_minutes = 0
251 if expires_at:
252 now = datetime.now(timezone.utc)
253 delta = expires_at - now
254 delta_seconds = delta.total_seconds()
256 # Guard: reject already-expired expiration times
257 if delta_seconds <= 0:
258 raise ValueError("Token expiration time is in the past. Cannot create already-expired tokens.")
260 # Use ceiling to ensure we always have at least 1 minute expiration
261 # This prevents <60s from rounding to 0 and creating non-expiring tokens
262 expires_in_minutes = max(1, math.ceil(delta_seconds / 60))
264 # Build user data dict
265 user_data = {
266 "email": user_email,
267 "full_name": "API Token User",
268 "is_admin": user.is_admin if user else False,
269 "auth_provider": "api_token",
270 }
272 # Build teams list — None means "all teams" (admin bypass when is_admin=true),
273 # [] means "public-only" (see normalize_token_teams() in auth.py)
274 teams = [team_id] if team_id else None
276 # Build scopes dict
277 # Empty permissions = defer to RBAC at runtime (not wildcard access)
278 scopes_dict = None
279 if scope:
280 scopes_dict = {
281 "server_id": scope.server_id,
282 "permissions": scope.permissions if scope.permissions is not None else [],
283 "ip_restrictions": scope.ip_restrictions or [],
284 "time_restrictions": scope.time_restrictions or {},
285 }
286 else:
287 scopes_dict = {
288 "server_id": None,
289 "permissions": [], # Empty = inherit from RBAC at runtime
290 "ip_restrictions": [],
291 "time_restrictions": {},
292 }
294 # Auto-inject servers.use for tokens with explicit MCP-related permissions.
295 # Without servers.use, the token scoping middleware blocks /rpc and /mcp
296 # transport access, making MCP-method permissions useless.
297 permissions = scopes_dict["permissions"]
298 if permissions and "*" not in permissions and "servers.use" not in permissions:
299 if any(p.startswith(Permissions.MCP_METHOD_PREFIXES) for p in permissions):
300 scopes_dict["permissions"] = [*permissions, "servers.use"]
302 # Generate JWT token using the centralized token creation utility
303 # Pass structured data to the enhanced create_jwt_token function
304 return await create_jwt_token(
305 data={"sub": user_email, "jti": jti, "token_use": "api"}, # nosec B105 - token type marker, not a password
306 expires_in_minutes=expires_in_minutes,
307 user_data=user_data,
308 teams=teams,
309 scopes=scopes_dict,
310 )
312 def _hash_token(self, token: str) -> str:
313 """Create secure hash of token for storage.
315 Args:
316 token: Raw token string
318 Returns:
319 str: SHA-256 hash of token
321 Examples:
322 >>> service = TokenCatalogService(None)
323 >>> hash_val = service._hash_token("test_token")
324 >>> len(hash_val) == 64
325 True
326 """
327 return hashlib.sha256(token.encode()).hexdigest()
329 def _validate_scope_containment(
330 self,
331 requested_permissions: Optional[List[str]],
332 caller_permissions: Optional[List[str]],
333 ) -> None:
334 """Validate that requested permissions don't exceed caller's permissions.
336 SECURITY: This is fail-secure. If caller_permissions is empty/None,
337 custom scopes are DENIED. Users without explicit permissions can only
338 create tokens with empty scope (inherit at runtime).
340 Args:
341 requested_permissions: Permissions requested for new/updated token
342 caller_permissions: Caller's effective permissions (RBAC + current token scopes)
344 Raises:
345 ValueError: If requested permissions exceed caller's permissions
346 """
347 # No requested permissions = empty scope, always allowed
348 if not requested_permissions:
349 return
351 # FAIL-SECURE: If caller has no permissions, deny any custom scope
352 if not caller_permissions:
353 raise ValueError("Cannot specify custom token permissions. " + "You have no explicit permissions to delegate. " + "Create a token without scope to inherit permissions at runtime.")
355 # Wildcard caller can grant anything
356 if "*" in caller_permissions:
357 return
359 # Wildcard request requires wildcard caller
360 if "*" in requested_permissions:
361 raise ValueError("Cannot create token with wildcard permissions. " + "Your effective permissions do not include wildcard access.")
363 # Check each requested permission
364 for req_perm in requested_permissions:
365 if req_perm in caller_permissions:
366 continue
368 # Check for category wildcard (e.g., "tools.*" allows "tools.read")
369 if "." in req_perm:
370 category = req_perm.split(".")[0]
371 if f"{category}.*" in caller_permissions:
372 continue
374 raise ValueError(f"Cannot grant permission '{req_perm}' - not in your effective permissions.")
376 async def create_token(
377 self,
378 user_email: str,
379 name: str,
380 description: Optional[str] = None,
381 scope: Optional[TokenScope] = None,
382 expires_in_days: Optional[int] = None,
383 tags: Optional[List[str]] = None,
384 team_id: Optional[str] = None,
385 caller_permissions: Optional[List[str]] = None,
386 is_active: bool = True,
387 ) -> tuple[EmailApiToken, str]:
388 """
389 Create a new API token with team-level scoping and additional configurations.
391 This method generates a JWT-based API token with team-level scoping and optional security configurations,
392 such as expiration, permissions, IP restrictions, and usage limits. The token is associated with a user
393 and a specific team, ensuring access control and multi-tenancy support.
395 The function will:
396 - Validate the existence of the user.
397 - Ensure the user is an active member of the specified team.
398 - Verify that the token name is unique for the user+team combination.
399 - Generate a JWT with the specified scoping parameters (e.g., permissions, IP, etc.).
400 - Store the token in the database with the relevant details and return the token and raw JWT string.
402 Args:
403 user_email (str): The email address of the user requesting the token.
404 name (str): A unique, human-readable name for the token (must be unique per user+team).
405 description (Optional[str]): A description for the token (default is None).
406 scope (Optional[TokenScope]): The scoping configuration for the token, including permissions,
407 server ID, IP restrictions, etc. (default is None).
408 expires_in_days (Optional[int]): The expiration time in days for the token (None means no expiration).
409 tags (Optional[List[str]]): A list of organizational tags for the token (default is an empty list).
410 team_id (Optional[str]): The team ID to which the token should be scoped. This is required for team-level scoping.
411 caller_permissions (Optional[List[str]]): The permissions of the caller creating the token. Used for
412 scope containment validation to ensure the new token cannot have broader permissions than the caller.
413 is_active (bool): Whether the token should be created as active (default is True).
415 Returns:
416 tuple[EmailApiToken, str]: A tuple where the first element is the `EmailApiToken` database record and
417 the second element is the raw JWT token string. The `EmailApiToken` contains the database record with the
418 token details.
420 Raises:
421 ValueError: If any of the following validation checks fail:
422 - The `user_email` does not correspond to an existing user.
423 - The `team_id` is missing or the user is not an active member of the specified team.
424 - A token with the same name already exists for the given user and team.
425 - Invalid token configuration (e.g., invalid expiration date).
427 Examples:
428 >>> # This method requires database operations, shown for reference
429 >>> service = TokenCatalogService(None) # Would use real DB session
430 >>> # token, raw_token = await service.create_token(...)
431 >>> # Returns (EmailApiToken, raw_token_string) tuple
432 """
433 # # Enforce team-level scoping requirement
434 # if not team_id:
435 # raise ValueError("team_id is required for token creation. " "Please select a specific team before creating a token. " "You cannot create tokens while viewing 'All Teams'.")
437 # Validate user exists
438 user = self.db.execute(select(EmailUser).where(EmailUser.email == user_email)).scalar_one_or_none()
440 if not user:
441 raise ValueError(f"User not found: {user_email}")
443 # Validate scope containment (fail-secure if no caller_permissions)
444 if scope and scope.permissions:
445 self._validate_scope_containment(scope.permissions, caller_permissions)
447 # Validate team exists and user is active member
448 if team_id:
449 # First-Party
450 from mcpgateway.db import EmailTeam, EmailTeamMember # pylint: disable=import-outside-toplevel
452 # Check if team exists
453 team = self.db.execute(select(EmailTeam).where(EmailTeam.id == team_id)).scalar_one_or_none()
455 if not team:
456 raise ValueError(f"Team not found: {team_id}")
458 # Verify user is an active member of the team
459 membership = self.db.execute(
460 select(EmailTeamMember).where(and_(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)))
461 ).scalar_one_or_none()
463 if not membership:
464 raise ValueError(f"User {user_email} is not an active member of team {team_id}. Only team members can create tokens for the team.")
466 # Check for duplicate active token name for this user within the same team scope,
467 # matching DB constraint uq_email_api_tokens_user_name_team (user_email, name, team_id).
468 # team_id=None tokens are scoped to the global (no-team) bucket.
469 if team_id:
470 name_check = and_(EmailApiToken.user_email == user_email, EmailApiToken.name == name, EmailApiToken.team_id == team_id, EmailApiToken.is_active.is_(True))
471 else:
472 name_check = and_(EmailApiToken.user_email == user_email, EmailApiToken.name == name, EmailApiToken.team_id.is_(None), EmailApiToken.is_active.is_(True))
473 existing_token = self.db.execute(select(EmailApiToken).where(name_check)).scalar_one_or_none()
475 if existing_token:
476 scope_label = f"team '{team_id}'" if team_id else "the global scope (no team)"
477 raise ValueError(f"Token with name '{name}' already exists for user {user_email} in {scope_label}. Token names must be unique per user per team. Please choose a different name.")
479 # CALCULATE EXPIRATION DATE
480 expires_at = None
481 if expires_in_days:
482 expires_at = utc_now() + timedelta(days=expires_in_days)
484 # Enforce expiration requirement if configured
485 if settings.require_token_expiration and not expires_at:
486 raise ValueError("Token expiration is required by server policy (REQUIRE_TOKEN_EXPIRATION=true). Please specify an expiration date for the token.")
488 jti = str(uuid.uuid4()) # Unique JWT ID
489 # Generate JWT token with all necessary claims
490 raw_token = await self._generate_token(user_email=user_email, jti=jti, team_id=team_id, expires_at=expires_at, scope=scope, user=user) # Pass user object to include admin status
492 # Hash token for secure storage
493 token_hash = self._hash_token(raw_token)
495 # Create database record
496 api_token = EmailApiToken(
497 id=str(uuid.uuid4()),
498 user_email=user_email,
499 team_id=team_id, # Store team association
500 name=name,
501 jti=jti,
502 description=description,
503 token_hash=token_hash, # Store hash, not raw token
504 expires_at=expires_at,
505 tags=tags or [],
506 # Store scoping information
507 server_id=scope.server_id if scope else None,
508 resource_scopes=scope.permissions if scope else [],
509 ip_restrictions=scope.ip_restrictions if scope else [],
510 time_restrictions=scope.time_restrictions if scope else {},
511 usage_limits=scope.usage_limits if scope else {},
512 # Token status
513 is_active=is_active,
514 created_at=utc_now(),
515 last_used=None,
516 )
518 self.db.add(api_token)
519 self.db.commit()
520 self.db.refresh(api_token)
522 token_type = f"team-scoped (team: {team_id})" if team_id else "public-only"
523 logger.info(f"Created {token_type} API token '{name}' for user {SecurityValidator.sanitize_log_message(user_email)}. Token ID: {api_token.id}, Expires: {expires_at or 'Never'}")
524 return api_token, raw_token
526 async def count_user_tokens(self, user_email: str, include_inactive: bool = False) -> int:
527 """Count API tokens for a user.
529 Args:
530 user_email: User's email address
531 include_inactive: Include inactive/expired tokens
533 Returns:
534 int: Total number of matching tokens
535 """
536 # pylint: disable=not-callable
537 query = select(func.count(EmailApiToken.id)).where(EmailApiToken.user_email == user_email)
539 if not include_inactive:
540 query = query.where(and_(EmailApiToken.is_active.is_(True), or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())))
542 result = self.db.execute(query)
543 return result.scalar() or 0
545 async def get_user_team_ids(self, user_email: str) -> List[str]:
546 """Get all team IDs the user is a member of.
548 Uses TeamManagementService.get_user_teams which is cached and consistent
549 with how other services (servers, tools, resources) resolve team visibility.
551 Args:
552 user_email: User's email address
554 Returns:
555 List[str]: Team IDs the user belongs to
556 """
557 # First-Party
558 from mcpgateway.services.team_management_service import TeamManagementService # pylint: disable=import-outside-toplevel
560 team_service = TeamManagementService(self.db)
561 user_teams = await team_service.get_user_teams(user_email)
562 return [team.id for team in user_teams]
564 async def count_user_and_team_tokens(self, user_email: str, include_inactive: bool = False) -> int:
565 """Count API tokens for a user plus team tokens from teams the user belongs to.
567 This combines personal tokens (created by the user) with team-scoped tokens
568 from all teams where the user is an active member.
570 Args:
571 user_email: User's email address
572 include_inactive: Include inactive/expired tokens
574 Returns:
575 int: Total number of matching tokens
576 """
577 team_ids = await self.get_user_team_ids(user_email)
579 # Build query: tokens created by user OR tokens in user's teams
580 conditions = [EmailApiToken.user_email == user_email]
581 if team_ids:
582 conditions.append(EmailApiToken.team_id.in_(team_ids))
584 # pylint: disable=not-callable
585 query = select(func.count(EmailApiToken.id)).where(or_(*conditions))
587 if not include_inactive:
588 query = query.where(and_(EmailApiToken.is_active.is_(True), or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())))
590 result = self.db.execute(query)
591 return result.scalar() or 0
593 async def count_team_tokens(self, team_id: str, include_inactive: bool = False) -> int:
594 """Count API tokens for a team.
596 Args:
597 team_id: Team ID to count tokens for
598 include_inactive: Include inactive/expired tokens
600 Returns:
601 int: Total number of matching tokens
602 """
603 # pylint: disable=not-callable
604 query = select(func.count(EmailApiToken.id)).where(EmailApiToken.team_id == team_id)
606 if not include_inactive:
607 query = query.where(and_(EmailApiToken.is_active.is_(True), or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())))
609 result = self.db.execute(query)
610 return result.scalar() or 0
612 async def list_user_tokens(self, user_email: str, include_inactive: bool = False, limit: int = 100, offset: int = 0) -> List[EmailApiToken]:
613 """List API tokens for a user.
615 Args:
616 user_email: User's email address
617 include_inactive: Include inactive/expired tokens
618 limit: Maximum tokens to return
619 offset: Number of tokens to skip
621 Returns:
622 List[EmailApiToken]: User's API tokens
624 Examples:
625 >>> service = TokenCatalogService(None) # Would use real DB session
626 >>> # Returns List[EmailApiToken] for user
627 """
628 # Validate parameters
629 if limit <= 0 or limit > 1000:
630 limit = 50 # Use default
631 offset = max(offset, 0) # Use default
632 query = select(EmailApiToken).where(EmailApiToken.user_email == user_email)
634 if not include_inactive:
635 query = query.where(and_(EmailApiToken.is_active.is_(True), or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())))
637 query = query.order_by(EmailApiToken.created_at.desc()).limit(limit).offset(offset)
639 result = self.db.execute(query)
640 return result.scalars().all()
642 async def list_team_tokens(self, team_id: str, user_email: str, include_inactive: bool = False, limit: int = 100, offset: int = 0) -> List[EmailApiToken]:
643 """List API tokens for a team (accessible by any active team member).
645 Args:
646 team_id: Team ID to list tokens for
647 user_email: User's email (must be an active member of the team)
648 include_inactive: Include inactive/expired tokens
649 limit: Maximum tokens to return
650 offset: Number of tokens to skip
652 Returns:
653 List[EmailApiToken]: Team's API tokens
655 Raises:
656 ValueError: If user is not an active member of the team
657 """
658 team_ids = await self.get_user_team_ids(user_email)
660 if team_id not in team_ids:
661 raise ValueError(f"User {user_email} is not an active member of team {team_id}")
663 # Validate parameters
664 if limit <= 0 or limit > 1000:
665 limit = 50
666 offset = max(offset, 0)
668 query = select(EmailApiToken).where(EmailApiToken.team_id == team_id)
670 if not include_inactive:
671 query = query.where(and_(EmailApiToken.is_active.is_(True), or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())))
673 query = query.order_by(EmailApiToken.created_at.desc()).limit(limit).offset(offset)
674 result = self.db.execute(query)
675 return result.scalars().all()
677 async def list_user_and_team_tokens(self, user_email: str, include_inactive: bool = False, limit: int = 100, offset: int = 0) -> List[EmailApiToken]:
678 """List API tokens for a user plus team tokens from teams the user belongs to.
680 This combines personal tokens (created by the user) with team-scoped tokens
681 from all teams where the user is an active member.
683 Args:
684 user_email: User's email address
685 include_inactive: Include inactive/expired tokens
686 limit: Maximum tokens to return
687 offset: Number of tokens to skip
689 Returns:
690 List[EmailApiToken]: Combined list of user's personal tokens and team tokens
692 Examples:
693 >>> service = TokenCatalogService(None) # Would use real DB session
694 >>> # Returns List[EmailApiToken] including personal and team tokens
695 """
696 # Validate parameters
697 if limit <= 0 or limit > 1000:
698 limit = 50
699 offset = max(offset, 0)
701 team_ids = await self.get_user_team_ids(user_email)
703 # Build query: tokens created by user OR tokens in user's teams
704 conditions = [EmailApiToken.user_email == user_email]
705 if team_ids:
706 conditions.append(EmailApiToken.team_id.in_(team_ids))
708 query = select(EmailApiToken).where(or_(*conditions))
710 if not include_inactive:
711 query = query.where(and_(EmailApiToken.is_active.is_(True), or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())))
713 query = query.order_by(EmailApiToken.created_at.desc()).limit(limit).offset(offset)
715 result = self.db.execute(query)
716 return result.scalars().all()
718 async def get_token(self, token_id: str, user_email: Optional[str] = None) -> Optional[EmailApiToken]:
719 """Get a specific token by ID.
721 Args:
722 token_id: Token ID
723 user_email: Optional user email filter for security
725 Returns:
726 Optional[EmailApiToken]: Token if found and authorized
728 Examples:
729 >>> service = TokenCatalogService(None) # Would use real DB session
730 >>> # Returns Optional[EmailApiToken] if found and authorized
731 """
732 query = select(EmailApiToken).where(EmailApiToken.id == token_id)
734 if user_email:
735 query = query.where(EmailApiToken.user_email == user_email)
737 result = self.db.execute(query)
738 return result.scalar_one_or_none()
740 async def update_token(
741 self,
742 token_id: str,
743 user_email: str,
744 name: Optional[str] = None,
745 description: Optional[str] = None,
746 scope: Optional[TokenScope] = None,
747 tags: Optional[List[str]] = None,
748 caller_permissions: Optional[List[str]] = None,
749 is_active: Optional[bool] = None,
750 ) -> Optional[EmailApiToken]:
751 """Update an existing token with scope containment validation.
753 Args:
754 token_id: Token ID to update
755 user_email: Owner's email for security
756 name: New token name
757 description: New description
758 scope: New scoping configuration
759 tags: New tags
760 caller_permissions: Caller's effective permissions for scope containment
761 is_active: New token active status
763 Returns:
764 Optional[EmailApiToken]: Updated token if found
766 Raises:
767 ValueError: If token not found or name conflicts
769 Examples:
770 >>> service = TokenCatalogService(None) # Would use real DB session
771 >>> # Returns Optional[EmailApiToken] if updated successfully
772 """
773 token = await self.get_token(token_id, user_email)
774 if not token:
775 raise ValueError("Token not found or not authorized")
777 # Validate scope containment for scope changes
778 if scope and scope.permissions:
779 self._validate_scope_containment(scope.permissions, caller_permissions)
781 # Check for duplicate name if changing
782 if name and name != token.name:
783 existing = self.db.execute(
784 select(EmailApiToken).where(and_(EmailApiToken.user_email == user_email, EmailApiToken.name == name, EmailApiToken.id != token_id, EmailApiToken.is_active.is_(True)))
785 ).scalar_one_or_none()
787 if existing:
788 raise ValueError(f"Token name '{name}' already exists")
790 token.name = name
792 if description is not None:
793 token.description = description
795 if tags is not None:
796 token.tags = tags
798 if is_active is not None:
799 token.is_active = is_active
801 if scope:
802 token.server_id = scope.server_id
803 token.resource_scopes = scope.permissions
804 token.ip_restrictions = scope.ip_restrictions
805 token.time_restrictions = scope.time_restrictions
806 token.usage_limits = scope.usage_limits
808 self.db.commit()
809 self.db.refresh(token)
811 logger.info(f"Updated token '{token.name}' for user {SecurityValidator.sanitize_log_message(user_email)}")
813 return token
815 async def revoke_token(self, token_id: str, user_email: str, revoked_by: str, reason: Optional[str] = None) -> bool:
816 """Revoke a token owned by the specified user or in a team the user belongs to.
818 Args:
819 token_id: Token ID to revoke
820 user_email: Caller's email - must own the token or be a member of the token's team
821 revoked_by: Email of user performing revocation (for audit)
822 reason: Optional reason for revocation
824 Returns:
825 bool: True if token was revoked, False if not found or not authorized
827 Examples:
828 >>> service = TokenCatalogService(None) # Would use real DB session
829 >>> # Returns bool: True if token was revoked successfully
830 """
831 # First try ownership match
832 token = await self.get_token(token_id, user_email)
834 # If not owned by caller, check if token is in a team the caller is an owner of
835 if not token:
836 token = await self.get_token(token_id)
837 if not token or not token.team_id:
838 return False
839 # Only team owners (admins) can revoke other members' team tokens
840 # First-Party
841 from mcpgateway.services.team_management_service import TeamManagementService # pylint: disable=import-outside-toplevel
843 team_service = TeamManagementService(self.db)
844 role = await team_service.get_user_role_in_team(user_email, token.team_id)
845 if role != "owner":
846 return False
848 # Mark token as inactive
849 token.is_active = False
851 # Add to blacklist
852 revocation = TokenRevocation(jti=token.jti, revoked_by=revoked_by, reason=reason)
854 self.db.add(revocation)
855 self.db.commit()
857 # Invalidate auth cache synchronously so revoked tokens are rejected immediately
858 # (fire-and-forget via create_task risks a race where the next request arrives
859 # before the invalidation task runs, allowing the revoked token through).
860 try:
861 # First-Party
862 from mcpgateway.cache.auth_cache import auth_cache # pylint: disable=import-outside-toplevel
864 await auth_cache.invalidate_revocation(token.jti)
865 except Exception as cache_error:
866 logger.debug(f"Failed to invalidate auth cache for revoked token: {cache_error}")
868 logger.info(f"Revoked token '{token.name}' (JTI: {token.jti}) by {revoked_by}")
870 return True
872 async def admin_revoke_token(self, token_id: str, revoked_by: str, reason: Optional[str] = None) -> bool:
873 """Admin-only: Revoke any token without ownership check.
875 WARNING: This method bypasses ownership verification.
876 Only call from admin-authenticated endpoints.
878 Args:
879 token_id: Token ID to revoke
880 revoked_by: Admin email for audit
881 reason: Revocation reason
883 Returns:
884 bool: True if token was revoked, False if not found
886 Examples:
887 >>> service = TokenCatalogService(None) # Would use real DB session
888 >>> # Returns bool: True if token was revoked successfully
889 """
890 # No user filter - admin can revoke any token
891 token = await self.get_token(token_id)
892 if not token:
893 return False
895 token.is_active = False
896 revocation = TokenRevocation(jti=token.jti, revoked_by=revoked_by, reason=reason)
897 self.db.add(revocation)
898 self.db.commit()
900 try:
901 # First-Party
902 from mcpgateway.cache.auth_cache import auth_cache # pylint: disable=import-outside-toplevel
904 await auth_cache.invalidate_revocation(token.jti)
905 except Exception as cache_error:
906 logger.debug(f"Failed to invalidate auth cache: {cache_error}")
908 logger.info(f"Admin revoked token '{token.name}' (JTI: {token.jti}) by {revoked_by}")
909 return True
911 async def is_token_revoked(self, jti: str) -> bool:
912 """Check if a token JTI is revoked.
914 Args:
915 jti: JWT ID to check
917 Returns:
918 bool: True if token is revoked
920 Examples:
921 >>> service = TokenCatalogService(None) # Would use real DB session
922 >>> # Returns bool: True if token is revoked
923 """
924 revocation = self.db.execute(select(TokenRevocation).where(TokenRevocation.jti == jti)).scalar_one_or_none()
926 return revocation is not None
928 async def log_token_usage(
929 self,
930 jti: str,
931 user_email: str,
932 endpoint: Optional[str] = None,
933 method: Optional[str] = None,
934 ip_address: Optional[str] = None,
935 user_agent: Optional[str] = None,
936 status_code: Optional[int] = None,
937 response_time_ms: Optional[int] = None,
938 blocked: bool = False,
939 block_reason: Optional[str] = None,
940 ) -> None:
941 """Log token usage for analytics and security.
943 Args:
944 jti: JWT ID of token used
945 user_email: Token owner's email
946 endpoint: API endpoint accessed
947 method: HTTP method
948 ip_address: Client IP address
949 user_agent: Client user agent
950 status_code: HTTP response status
951 response_time_ms: Response time in milliseconds
952 blocked: Whether request was blocked
953 block_reason: Reason for blocking
955 Examples:
956 >>> service = TokenCatalogService(None) # Would use real DB session
957 >>> # Logs token usage for analytics - no return value
958 """
959 usage_log = TokenUsageLog(
960 token_jti=jti,
961 user_email=user_email,
962 endpoint=endpoint,
963 method=method,
964 ip_address=ip_address,
965 user_agent=user_agent,
966 status_code=status_code,
967 response_time_ms=response_time_ms,
968 blocked=blocked,
969 block_reason=block_reason,
970 )
972 self.db.add(usage_log)
973 self.db.commit()
975 async def get_token_usage_stats(self, user_email: str, token_id: Optional[str] = None, days: int = 30) -> dict:
976 """Get token usage statistics.
978 Args:
979 user_email: User's email address
980 token_id: Optional specific token ID
981 days: Number of days to analyze
983 Returns:
984 dict: Usage statistics
986 Examples:
987 >>> service = TokenCatalogService(None) # Would use real DB session
988 >>> # Returns dict with usage statistics
989 """
990 start_date = utc_now() - timedelta(days=days)
992 # Get token JTI if specific token requested
993 token_jti = None
994 if token_id:
995 token = await self.get_token(token_id, user_email)
996 if token:
997 token_jti = token.jti
999 # Use SQL aggregation for PostgreSQL, Python fallback for SQLite
1000 dialect_name = self.db.get_bind().dialect.name
1001 if dialect_name == "postgresql":
1002 return await self._get_usage_stats_postgresql(user_email, start_date, token_jti, days)
1003 return await self._get_usage_stats_python(user_email, start_date, token_jti, days)
1005 async def _get_usage_stats_postgresql(self, user_email: str, start_date: datetime, token_jti: Optional[str], days: int) -> dict:
1006 """Compute usage stats using PostgreSQL SQL aggregation.
1008 Args:
1009 user_email: User's email address
1010 start_date: Start date for analysis
1011 token_jti: Optional token JTI filter
1012 days: Number of days being analyzed
1014 Returns:
1015 dict: Usage statistics computed via SQL
1016 """
1017 # Build filter conditions
1018 conditions = [TokenUsageLog.user_email == user_email, TokenUsageLog.timestamp >= start_date]
1019 if token_jti:
1020 conditions.append(TokenUsageLog.token_jti == token_jti)
1022 base_filter = and_(*conditions)
1024 # Main stats query using SQL aggregation
1025 # Match Python behavior:
1026 # - status_code must be non-null AND non-zero AND < 400 for success count
1027 # - response_time_ms must be non-null AND non-zero for average (Python: if log.response_time_ms)
1028 stats_query = (
1029 select(
1030 func.count().label("total"), # pylint: disable=not-callable
1031 func.sum(
1032 case(
1033 (and_(TokenUsageLog.status_code.isnot(None), TokenUsageLog.status_code > 0, TokenUsageLog.status_code < 400), 1),
1034 else_=0,
1035 )
1036 ).label("successful"),
1037 func.sum(case((TokenUsageLog.blocked.is_(True), 1), else_=0)).label("blocked"),
1038 # Only average non-null and non-zero response times (NULL values are ignored by AVG)
1039 func.avg(
1040 case(
1041 (and_(TokenUsageLog.response_time_ms.isnot(None), TokenUsageLog.response_time_ms > 0), TokenUsageLog.response_time_ms),
1042 else_=None,
1043 )
1044 ).label("avg_response"),
1045 )
1046 .select_from(TokenUsageLog)
1047 .where(base_filter)
1048 )
1050 result = self.db.execute(stats_query).fetchone()
1052 total_requests = result.total or 0
1053 successful_requests = result.successful or 0
1054 blocked_requests = result.blocked or 0
1055 avg_response_time = float(result.avg_response) if result.avg_response else 0.0
1057 # Top endpoints query using SQL GROUP BY
1058 # Match Python behavior: exclude None AND empty string endpoints (Python: if log.endpoint)
1059 endpoints_query = (
1060 select(TokenUsageLog.endpoint, func.count().label("count")) # pylint: disable=not-callable
1061 .where(and_(base_filter, TokenUsageLog.endpoint.isnot(None), TokenUsageLog.endpoint != ""))
1062 .group_by(TokenUsageLog.endpoint)
1063 .order_by(func.count().desc()) # pylint: disable=not-callable
1064 .limit(5)
1065 )
1067 endpoints_result = self.db.execute(endpoints_query).fetchall()
1068 top_endpoints = [(row.endpoint, row.count) for row in endpoints_result]
1070 return {
1071 "period_days": days,
1072 "total_requests": total_requests,
1073 "successful_requests": successful_requests,
1074 "blocked_requests": blocked_requests,
1075 "success_rate": successful_requests / total_requests if total_requests > 0 else 0,
1076 "average_response_time_ms": round(avg_response_time, 2),
1077 "top_endpoints": top_endpoints,
1078 }
1080 async def _get_usage_stats_python(self, user_email: str, start_date: datetime, token_jti: Optional[str], days: int) -> dict:
1081 """Compute usage stats using Python (fallback for SQLite).
1083 Args:
1084 user_email: User's email address
1085 start_date: Start date for analysis
1086 token_jti: Optional token JTI filter
1087 days: Number of days being analyzed
1089 Returns:
1090 dict: Usage statistics computed in Python
1091 """
1092 query = select(TokenUsageLog).where(and_(TokenUsageLog.user_email == user_email, TokenUsageLog.timestamp >= start_date))
1094 if token_jti:
1095 query = query.where(TokenUsageLog.token_jti == token_jti)
1097 usage_logs = self.db.execute(query).scalars().all()
1099 # Calculate statistics
1100 total_requests = len(usage_logs)
1101 successful_requests = sum(1 for log in usage_logs if log.status_code and log.status_code < 400)
1102 blocked_requests = sum(1 for log in usage_logs if log.blocked)
1104 # Average response time
1105 response_times = [log.response_time_ms for log in usage_logs if log.response_time_ms]
1106 avg_response_time = sum(response_times) / len(response_times) if response_times else 0
1108 # Most accessed endpoints
1109 endpoint_counts: dict = {}
1110 for log in usage_logs:
1111 if log.endpoint:
1112 endpoint_counts[log.endpoint] = endpoint_counts.get(log.endpoint, 0) + 1
1114 top_endpoints = sorted(endpoint_counts.items(), key=lambda x: x[1], reverse=True)[:5]
1116 return {
1117 "period_days": days,
1118 "total_requests": total_requests,
1119 "successful_requests": successful_requests,
1120 "blocked_requests": blocked_requests,
1121 "success_rate": successful_requests / total_requests if total_requests > 0 else 0,
1122 "average_response_time_ms": round(avg_response_time, 2),
1123 "top_endpoints": top_endpoints,
1124 }
1126 async def get_token_revocation(self, jti: str) -> Optional[TokenRevocation]:
1127 """Get token revocation information by JTI.
1129 Args:
1130 jti: JWT token ID
1132 Returns:
1133 Optional[TokenRevocation]: Revocation info if token is revoked
1135 Examples:
1136 >>> service = TokenCatalogService(None) # Would use real DB session
1137 >>> # Returns Optional[TokenRevocation] if token is revoked
1138 """
1139 result = self.db.execute(select(TokenRevocation).where(TokenRevocation.jti == jti))
1140 return result.scalar_one_or_none()
1142 async def get_token_revocations_batch(self, jtis: List[str]) -> Dict[str, TokenRevocation]:
1143 """Get token revocation information for multiple JTIs in a single query.
1145 Args:
1146 jtis: List of JWT token IDs
1148 Returns:
1149 Dict mapping JTI to TokenRevocation for revoked tokens only.
1150 """
1151 if not jtis:
1152 return {}
1153 result = self.db.execute(select(TokenRevocation).where(TokenRevocation.jti.in_(jtis)))
1154 return {rev.jti: rev for rev in result.scalars().all()}
1156 async def list_all_tokens(self, include_inactive: bool = False, limit: int = 100, offset: int = 0) -> List[EmailApiToken]:
1157 """List all API tokens (admin only).
1159 Args:
1160 include_inactive: Include inactive/expired tokens
1161 limit: Maximum tokens to return
1162 offset: Number of tokens to skip
1164 Returns:
1165 List[EmailApiToken]: All API tokens
1166 """
1167 if limit <= 0 or limit > 1000:
1168 limit = 50
1169 offset = max(offset, 0)
1171 query = select(EmailApiToken)
1173 if not include_inactive:
1174 query = query.where(and_(EmailApiToken.is_active.is_(True), or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())))
1176 query = query.order_by(EmailApiToken.created_at.desc()).limit(limit).offset(offset)
1178 result = self.db.execute(query)
1179 return result.scalars().all()
1181 async def count_all_tokens(self, include_inactive: bool = False) -> int:
1182 """Count all API tokens (admin only).
1184 Args:
1185 include_inactive: Include inactive/expired tokens in count
1187 Returns:
1188 int: Total count of all tokens
1189 """
1190 query = select(func.count(EmailApiToken.id)) # pylint: disable=not-callable
1192 if not include_inactive:
1193 query = query.where(and_(EmailApiToken.is_active.is_(True), or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())))
1195 result = self.db.execute(query)
1196 return result.scalar() or 0
1198 async def cleanup_expired_tokens(self) -> int:
1199 """Clean up expired tokens using bulk UPDATE.
1201 Uses a single SQL UPDATE statement instead of loading tokens into memory
1202 and updating them one by one. This is more efficient and avoids memory
1203 issues when many tokens expire at once.
1205 Returns:
1206 int: Number of tokens cleaned up
1208 Examples:
1209 >>> service = TokenCatalogService(None) # Would use real DB session
1210 >>> # Returns int: Number of tokens cleaned up
1211 """
1212 try:
1213 now = utc_now()
1214 count = self.db.query(EmailApiToken).filter(EmailApiToken.expires_at < now, EmailApiToken.is_active.is_(True)).update({"is_active": False}, synchronize_session=False)
1216 self.db.commit()
1218 if count > 0:
1219 logger.info(f"Cleaned up {count} expired tokens")
1221 return count
1223 except Exception as e:
1224 self.db.rollback()
1225 logger.error(f"Failed to cleanup expired tokens: {e}")
1226 return 0