Coverage for mcpgateway / services / token_catalog_service.py: 99%
253 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/token_catalog_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Token Catalog Service.
8This module provides comprehensive API token management with scoping,
9revocation, usage tracking, and analytics for email-based users.
11Examples:
12 >>> from mcpgateway.services.token_catalog_service import TokenCatalogService
13 >>> service = TokenCatalogService(None) # Mock database for doctest
14 >>> # Service provides full token lifecycle management
15"""
17# Standard
18from datetime import datetime, timedelta, timezone
19import hashlib
20import math
21from typing import List, Optional
22import uuid
24# Third-Party
25from sqlalchemy import and_, case, func, or_, select
26from sqlalchemy.orm import Session
28# First-Party
29from mcpgateway.db import EmailApiToken, EmailUser, TokenRevocation, TokenUsageLog, utc_now
30from mcpgateway.services.logging_service import LoggingService
31from mcpgateway.utils.create_jwt_token import create_jwt_token
# Initialize logging: create the module-wide LoggingService instance and
# obtain from it the logger (named after this module) used by every
# function and method below.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)
class TokenScope:
    """Bundle of restrictions that narrows what an API token may do.

    A scope groups an optional server binding, a list of permission
    strings, IP restrictions, time restrictions and usage limits. Any
    field that is omitted (or passed as a falsy value) is stored as an
    empty container, except ``server_id`` which stays ``None``.

    Attributes:
        server_id (Optional[str]): Server the token is limited to, if any
        permissions (List[str]): Permission strings carried by the token
        ip_restrictions (List[str]): IP address/CIDR restrictions
        time_restrictions (dict): Time-based access limitations
        usage_limits (dict): Rate limiting and quota settings

    Examples:
        >>> scope = TokenScope(
        ...     server_id="prod-server-123",
        ...     permissions=["tools.read", "resources.read"],
        ...     ip_restrictions=["192.168.1.0/24"],
        ...     time_restrictions={"business_hours_only": True}
        ... )
        >>> scope.is_server_scoped()
        True
        >>> scope.has_permission("tools.read")
        True
        >>> scope.has_permission("tools.write")
        False
        >>> TokenScope().is_server_scoped()
        False
        >>> TokenScope().has_permission("anything")
        False
        >>> TokenScope(permissions=["*"]).has_permission("*")
        True
    """

    def __init__(
        self,
        server_id: Optional[str] = None,
        permissions: Optional[List[str]] = None,
        ip_restrictions: Optional[List[str]] = None,
        time_restrictions: Optional[dict] = None,
        usage_limits: Optional[dict] = None,
    ):
        """Store the given restrictions, defaulting falsy ones to empty containers.

        Args:
            server_id: Optional server ID the token is bound to
            permissions: Permission strings granted to the token
            ip_restrictions: IP addresses/ranges allowed to use the token
            time_restrictions: Time-based access restrictions
            usage_limits: Usage limits for the token
        """
        self.server_id = server_id
        # Falsy inputs (None or empty) are replaced by a fresh empty container.
        self.permissions = permissions if permissions else []
        self.ip_restrictions = ip_restrictions if ip_restrictions else []
        self.time_restrictions = time_restrictions if time_restrictions else {}
        self.usage_limits = usage_limits if usage_limits else {}

    def is_server_scoped(self) -> bool:
        """Tell whether a server binding is present.

        Returns:
            bool: True when ``server_id`` is not None, False otherwise.
        """
        if self.server_id is None:
            return False
        return True

    def has_permission(self, permission: str) -> bool:
        """Tell whether ``permission`` appears verbatim in the scope.

        The check is a plain membership test; no wildcard expansion is done.

        Args:
            permission: Permission string to look for.

        Returns:
            bool: True if the permission is listed, False otherwise.
        """
        granted = self.permissions
        return permission in granted

    def to_dict(self) -> dict:
        """Serialize the scope into a plain dictionary for JSON storage.

        Returns:
            dict: The five scope fields keyed by attribute name.

        Examples:
            >>> scope = TokenScope(server_id="server-123", permissions=["read", "write"])
            >>> result = scope.to_dict()
            >>> result["server_id"]
            'server-123'
            >>> result["permissions"]
            ['read', 'write']
            >>> isinstance(result, dict)
            True
        """
        field_names = ("server_id", "permissions", "ip_restrictions", "time_restrictions", "usage_limits")
        return {field: getattr(self, field) for field in field_names}

    @classmethod
    def from_dict(cls, data: dict) -> "TokenScope":
        """Build a scope from a dictionary, tolerating missing keys.

        Args:
            data: Dictionary containing scope configuration.

        Returns:
            TokenScope: New TokenScope instance.

        Examples:
            >>> data = {
            ...     "server_id": "server-456",
            ...     "permissions": ["tools.read", "tools.execute"],
            ...     "ip_restrictions": ["10.0.0.0/8"]
            ... }
            >>> scope = TokenScope.from_dict(data)
            >>> scope.server_id
            'server-456'
            >>> scope.permissions
            ['tools.read', 'tools.execute']
            >>> scope.is_server_scoped()
            True
            >>> empty_scope = TokenScope.from_dict({})
            >>> empty_scope.server_id is None
            True
            >>> empty_scope.permissions
            []
        """
        lookup = data.get
        server_id = lookup("server_id")
        permissions = lookup("permissions", [])
        ip_restrictions = lookup("ip_restrictions", [])
        time_restrictions = lookup("time_restrictions", {})
        usage_limits = lookup("usage_limits", {})
        return cls(
            server_id=server_id,
            permissions=permissions,
            ip_restrictions=ip_restrictions,
            time_restrictions=time_restrictions,
            usage_limits=usage_limits,
        )
182class TokenCatalogService:
183 """Service for managing user API token catalogs.
185 This service provides comprehensive token lifecycle management including
186 creation, scoping, revocation, usage tracking, and analytics. It handles
187 JWT-based API tokens with fine-grained access control, team support,
188 and comprehensive audit logging.
190 Key features:
191 - Token creation with customizable scopes and permissions
192 - Team-based token management with role-based access
193 - Token revocation and blacklisting
194 - Usage tracking and analytics
195 - IP and time-based restrictions
196 - Automatic cleanup of expired tokens
198 Attributes:
199 db (Session): SQLAlchemy database session for token operations
201 Examples:
202 >>> from mcpgateway.services.token_catalog_service import TokenCatalogService
203 >>> service = TokenCatalogService(None) # Mock database for doctest
204 >>> service.db is None
205 True
206 """
def __init__(self, db: Session):
    """Bind the service to the database session it will operate on.

    Args:
        db: SQLAlchemy database session used for all token operations
    """
    # The session is kept as-is; nothing is opened or validated here.
    self.db = db
216 async def _generate_token(
217 self, user_email: str, jti: str, team_id: Optional[str] = None, expires_at: Optional[datetime] = None, scope: Optional["TokenScope"] = None, user: Optional[object] = None
218 ) -> str:
219 """Generate a JWT token for API access.
221 This internal method creates a properly formatted JWT token with all
222 necessary claims including user identity, scopes, team membership,
223 and expiration. The token follows the MCP Gateway JWT structure.
225 Args:
226 user_email: User's email address for the token subject
227 jti: JWT ID for token uniqueness
228 team_id: Optional team ID for team-scoped tokens
229 expires_at: Optional expiration datetime
230 scope: Optional token scope information for access control
231 user: Optional user object to extract admin privileges
233 Returns:
234 str: Signed JWT token string ready for API authentication
236 Raises:
237 ValueError: If expires_at is in the past (cannot create already-expired tokens)
239 Note:
240 This is an internal method. Use create_token() to generate
241 tokens with proper database tracking and validation.
242 """
243 # Calculate expiration in minutes from expires_at
244 expires_in_minutes = 0
245 if expires_at:
246 now = datetime.now(timezone.utc)
247 delta = expires_at - now
248 delta_seconds = delta.total_seconds()
250 # Guard: reject already-expired expiration times
251 if delta_seconds <= 0:
252 raise ValueError("Token expiration time is in the past. Cannot create already-expired tokens.")
254 # Use ceiling to ensure we always have at least 1 minute expiration
255 # This prevents <60s from rounding to 0 and creating non-expiring tokens
256 expires_in_minutes = max(1, math.ceil(delta_seconds / 60))
258 # Build user data dict
259 user_data = {
260 "email": user_email,
261 "full_name": "API Token User",
262 "is_admin": user.is_admin if user else False,
263 "auth_provider": "api_token",
264 }
266 # Build teams list
267 teams = [team_id] if team_id else []
269 # Build scopes dict
270 # Empty permissions = defer to RBAC at runtime (not wildcard access)
271 scopes_dict = None
272 if scope:
273 scopes_dict = {
274 "server_id": scope.server_id,
275 "permissions": scope.permissions if scope.permissions is not None else [],
276 "ip_restrictions": scope.ip_restrictions or [],
277 "time_restrictions": scope.time_restrictions or {},
278 }
279 else:
280 scopes_dict = {
281 "server_id": None,
282 "permissions": [], # Empty = inherit from RBAC at runtime
283 "ip_restrictions": [],
284 "time_restrictions": {},
285 }
287 # Generate JWT token using the centralized token creation utility
288 # Pass structured data to the enhanced create_jwt_token function
289 return await create_jwt_token(
290 data={"sub": user_email, "jti": jti, "token_use": "api"}, # nosec B105 - token type marker, not a password
291 expires_in_minutes=expires_in_minutes,
292 user_data=user_data,
293 teams=teams,
294 scopes=scopes_dict,
295 )
297 def _hash_token(self, token: str) -> str:
298 """Create secure hash of token for storage.
300 Args:
301 token: Raw token string
303 Returns:
304 str: SHA-256 hash of token
306 Examples:
307 >>> service = TokenCatalogService(None)
308 >>> hash_val = service._hash_token("test_token")
309 >>> len(hash_val) == 64
310 True
311 """
312 return hashlib.sha256(token.encode()).hexdigest()
314 def _validate_scope_containment(
315 self,
316 requested_permissions: Optional[List[str]],
317 caller_permissions: Optional[List[str]],
318 ) -> None:
319 """Validate that requested permissions don't exceed caller's permissions.
321 SECURITY: This is fail-secure. If caller_permissions is empty/None,
322 custom scopes are DENIED. Users without explicit permissions can only
323 create tokens with empty scope (inherit at runtime).
325 Args:
326 requested_permissions: Permissions requested for new/updated token
327 caller_permissions: Caller's effective permissions (RBAC + current token scopes)
329 Raises:
330 ValueError: If requested permissions exceed caller's permissions
331 """
332 # No requested permissions = empty scope, always allowed
333 if not requested_permissions:
334 return
336 # FAIL-SECURE: If caller has no permissions, deny any custom scope
337 if not caller_permissions:
338 raise ValueError("Cannot specify custom token permissions. " + "You have no explicit permissions to delegate. " + "Create a token without scope to inherit permissions at runtime.")
340 # Wildcard caller can grant anything
341 if "*" in caller_permissions:
342 return
344 # Wildcard request requires wildcard caller
345 if "*" in requested_permissions:
346 raise ValueError("Cannot create token with wildcard permissions. " + "Your effective permissions do not include wildcard access.")
348 # Check each requested permission
349 for req_perm in requested_permissions:
350 if req_perm in caller_permissions:
351 continue
353 # Check for category wildcard (e.g., "tools.*" allows "tools.read")
354 if "." in req_perm: 354 ↛ 359line 354 didn't jump to line 359 because the condition on line 354 was always true
355 category = req_perm.split(".")[0]
356 if f"{category}.*" in caller_permissions:
357 continue
359 raise ValueError(f"Cannot grant permission '{req_perm}' - not in your effective permissions.")
async def create_token(
    self,
    user_email: str,
    name: str,
    description: Optional[str] = None,
    scope: Optional[TokenScope] = None,
    expires_in_days: Optional[int] = None,
    tags: Optional[List[str]] = None,
    team_id: Optional[str] = None,
    caller_permissions: Optional[List[str]] = None,
    is_active: bool = True,
) -> tuple[EmailApiToken, str]:
    """Create a new API token, optionally bound to a team.

    The method:
        - Rejects a non-positive ``expires_in_days`` before touching the database.
        - Verifies that the user exists.
        - Validates that requested permissions do not exceed ``caller_permissions``.
        - When ``team_id`` is given, verifies the team exists and the user is an
          active member of it.
        - Verifies that no active token with the same name exists for the
          user+team combination.
        - Generates the JWT, stores only its hash, and returns both the
          database record and the raw JWT.

    Args:
        user_email (str): The email address of the user requesting the token.
        name (str): Human-readable token name (unique per user+team among active tokens).
        description (Optional[str]): A description for the token (default is None).
        scope (Optional[TokenScope]): Scoping configuration (permissions, server ID,
            IP restrictions, etc.). Default is None.
        expires_in_days (Optional[int]): Lifetime in days. None means no expiration;
            any other value must be greater than zero.
        tags (Optional[List[str]]): Organizational tags (stored as an empty list when omitted).
        team_id (Optional[str]): Team the token is scoped to. When omitted the token
            is created without a team association.
        caller_permissions (Optional[List[str]]): Permissions of the caller creating the
            token, used for scope containment validation.
        is_active (bool): Whether the token should be created as active (default is True).

    Returns:
        tuple[EmailApiToken, str]: The stored ``EmailApiToken`` record and the raw JWT string.

    Raises:
        ValueError: If any of the following validation checks fail:
            - ``expires_in_days`` is zero or negative.
            - ``user_email`` does not correspond to an existing user.
            - The requested permissions exceed the caller's permissions.
            - ``team_id`` is given but the team does not exist or the user is not an
              active member of it.
            - An active token with the same name already exists for the user and team.

    Examples:
        >>> # This method requires database operations, shown for reference
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # token, raw_token = await service.create_token(...)
        >>> # Returns (EmailApiToken, raw_token_string) tuple
    """
    # A truthiness test would treat 0 like None and silently mint a token
    # that never expires; reject non-positive lifetimes explicitly instead.
    if expires_in_days is not None and expires_in_days <= 0:
        raise ValueError("expires_in_days must be a positive number of days. Omit it to create a token without expiration.")

    # Validate user exists
    user = self.db.execute(select(EmailUser).where(EmailUser.email == user_email)).scalar_one_or_none()

    if not user:
        raise ValueError(f"User not found: {user_email}")

    # Validate scope containment (fail-secure if no caller_permissions)
    if scope and scope.permissions:
        self._validate_scope_containment(scope.permissions, caller_permissions)

    # Validate team exists and user is active member
    if team_id:
        # First-Party
        from mcpgateway.db import EmailTeam, EmailTeamMember  # pylint: disable=import-outside-toplevel

        # Check if team exists
        team = self.db.execute(select(EmailTeam).where(EmailTeam.id == team_id)).scalar_one_or_none()

        if not team:
            raise ValueError(f"Team not found: {team_id}")

        # Verify user is an active member of the team
        membership = self.db.execute(
            select(EmailTeamMember).where(and_(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)))
        ).scalar_one_or_none()

        if not membership:
            raise ValueError(f"User {user_email} is not an active member of team {team_id}. Only team members can create tokens for the team.")

    # Check for duplicate active token name for this user+team
    existing_token = self.db.execute(
        select(EmailApiToken).where(and_(EmailApiToken.user_email == user_email, EmailApiToken.name == name, EmailApiToken.team_id == team_id, EmailApiToken.is_active.is_(True)))
    ).scalar_one_or_none()

    if existing_token:
        raise ValueError(f"Token with name '{name}' already exists for user {user_email} in team {team_id}. Please choose a different name.")

    # Calculate expiration date (None keeps the token non-expiring)
    expires_at = None
    if expires_in_days is not None:
        expires_at = utc_now() + timedelta(days=expires_in_days)

    jti = str(uuid.uuid4())  # Unique JWT ID

    # Generate JWT token with all necessary claims; the user object is passed
    # so its admin flag can be embedded.
    raw_token = await self._generate_token(user_email=user_email, jti=jti, team_id=team_id, expires_at=expires_at, scope=scope, user=user)

    # Hash token for secure storage
    token_hash = self._hash_token(raw_token)

    # Create database record
    api_token = EmailApiToken(
        id=str(uuid.uuid4()),
        user_email=user_email,
        team_id=team_id,  # Store team association
        name=name,
        jti=jti,
        description=description,
        token_hash=token_hash,  # Store hash, not raw token
        expires_at=expires_at,
        tags=tags or [],
        # Store scoping information
        server_id=scope.server_id if scope else None,
        resource_scopes=scope.permissions if scope else [],
        ip_restrictions=scope.ip_restrictions if scope else [],
        time_restrictions=scope.time_restrictions if scope else {},
        usage_limits=scope.usage_limits if scope else {},
        # Token status
        is_active=is_active,
        created_at=utc_now(),
        last_used=None,
    )

    self.db.add(api_token)
    self.db.commit()
    self.db.refresh(api_token)

    token_type = f"team-scoped (team: {team_id})" if team_id else "public-only"
    logger.info(f"Created {token_type} API token '{name}' for user {user_email}. Token ID: {api_token.id}, Expires: {expires_at or 'Never'}")
    return api_token, raw_token
async def list_user_tokens(self, user_email: str, include_inactive: bool = False, limit: int = 100, offset: int = 0) -> List[EmailApiToken]:
    """Return a page of the API tokens owned by a user, newest first.

    Args:
        user_email: User's email address
        include_inactive: When False, only active and non-expired tokens are returned
        limit: Maximum tokens to return; values outside 1..1000 fall back to 50
        offset: Number of tokens to skip; negative values are treated as 0

    Returns:
        List[EmailApiToken]: User's API tokens

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns List[EmailApiToken] for user
    """
    # Normalize paging arguments before they reach the query.
    page_size = limit
    if page_size > 1000 or page_size <= 0:
        page_size = 50
    skip = max(offset, 0)

    statement = select(EmailApiToken).where(EmailApiToken.user_email == user_email)

    if not include_inactive:
        # "Usable" means flagged active and either non-expiring or not yet expired.
        not_expired = or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())
        statement = statement.where(and_(EmailApiToken.is_active.is_(True), not_expired))

    statement = statement.order_by(EmailApiToken.created_at.desc())
    statement = statement.limit(page_size).offset(skip)

    return self.db.execute(statement).scalars().all()
async def list_team_tokens(self, team_id: str, user_email: str, include_inactive: bool = False, limit: int = 100, offset: int = 0) -> List[EmailApiToken]:
    """Return a page of a team's API tokens, newest first (team owners only).

    Args:
        team_id: Team ID to list tokens for
        user_email: Email of the requesting user, who must be an active owner of the team
        include_inactive: When False, only active and non-expired tokens are returned
        limit: Maximum tokens to return; values outside 1..1000 fall back to 50
        offset: Number of tokens to skip; negative values are treated as 0

    Returns:
        List[EmailApiToken]: Team's API tokens

    Raises:
        ValueError: If user is not a team owner
    """
    # First-Party
    from mcpgateway.db import EmailTeamMember  # pylint: disable=import-outside-toplevel

    # Only an active membership with the "owner" role grants access.
    owner_filter = and_(
        EmailTeamMember.team_id == team_id,
        EmailTeamMember.user_email == user_email,
        EmailTeamMember.role == "owner",
        EmailTeamMember.is_active.is_(True),
    )
    owner_membership = self.db.execute(select(EmailTeamMember).where(owner_filter)).scalar_one_or_none()
    if not owner_membership:
        raise ValueError(f"Only team owners can view team tokens for {team_id}")

    # Normalize paging arguments before they reach the query.
    page_size = limit
    if page_size > 1000 or page_size <= 0:
        page_size = 50
    skip = max(offset, 0)

    statement = select(EmailApiToken).where(EmailApiToken.team_id == team_id)

    if not include_inactive:
        # "Usable" means flagged active and either non-expiring or not yet expired.
        not_expired = or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())
        statement = statement.where(and_(EmailApiToken.is_active.is_(True), not_expired))

    statement = statement.order_by(EmailApiToken.created_at.desc())
    statement = statement.limit(page_size).offset(skip)

    return self.db.execute(statement).scalars().all()
async def get_token(self, token_id: str, user_email: Optional[str] = None) -> Optional[EmailApiToken]:
    """Get a specific token by ID, optionally restricted to one owner.

    Args:
        token_id: Token ID
        user_email: Owner email to filter by. Pass None (the default) to look the
            token up without an ownership check. Any other value, including an
            empty string, is applied as a filter.

    Returns:
        Optional[EmailApiToken]: Token if found (and owned by ``user_email`` when
        one is given), otherwise None

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns Optional[EmailApiToken] if found and authorized
    """
    query = select(EmailApiToken).where(EmailApiToken.id == token_id)

    # Compare against None rather than truthiness: an empty-string email must
    # still narrow the query (matching nothing) instead of silently disabling
    # the ownership filter that revoke/update rely on.
    if user_email is not None:
        query = query.where(EmailApiToken.user_email == user_email)

    result = self.db.execute(query)
    return result.scalar_one_or_none()
async def update_token(
    self,
    token_id: str,
    user_email: str,
    name: Optional[str] = None,
    description: Optional[str] = None,
    scope: Optional[TokenScope] = None,
    tags: Optional[List[str]] = None,
    caller_permissions: Optional[List[str]] = None,
    is_active: Optional[bool] = None,
) -> Optional[EmailApiToken]:
    """Apply changes to a token owned by ``user_email``.

    Only the arguments that are provided are applied; a supplied scope
    replaces all five scope-related columns at once.

    Args:
        token_id: Token ID to update
        user_email: Owner's email; the token must belong to this user
        name: New token name
        description: New description
        scope: New scoping configuration
        tags: New tags
        caller_permissions: Caller's effective permissions for scope containment
        is_active: New token active status

    Returns:
        Optional[EmailApiToken]: The refreshed token record

    Raises:
        ValueError: If the token is not found for this user, the requested
            permissions exceed the caller's, or the new name is already used
            by another active token of the same user

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns Optional[EmailApiToken] if updated successfully
    """
    token = await self.get_token(token_id, user_email)
    if not token:
        raise ValueError("Token not found or not authorized")

    # New permissions must stay within what the caller holds.
    if scope and scope.permissions:
        self._validate_scope_containment(scope.permissions, caller_permissions)

    # A rename is only attempted for a non-empty name that actually differs.
    if name and name != token.name:
        clash_filter = and_(
            EmailApiToken.user_email == user_email,
            EmailApiToken.name == name,
            EmailApiToken.id != token_id,
            EmailApiToken.is_active.is_(True),
        )
        if self.db.execute(select(EmailApiToken).where(clash_filter)).scalar_one_or_none():
            raise ValueError(f"Token name '{name}' already exists")
        token.name = name

    # None means "leave unchanged" for these simple fields.
    for attribute, new_value in (("description", description), ("tags", tags), ("is_active", is_active)):
        if new_value is not None:
            setattr(token, attribute, new_value)

    if scope:
        scope_columns = (
            ("server_id", scope.server_id),
            ("resource_scopes", scope.permissions),
            ("ip_restrictions", scope.ip_restrictions),
            ("time_restrictions", scope.time_restrictions),
            ("usage_limits", scope.usage_limits),
        )
        for column, value in scope_columns:
            setattr(token, column, value)

    self.db.commit()
    self.db.refresh(token)

    logger.info(f"Updated token '{token.name}' for user {user_email}")

    return token
async def revoke_token(self, token_id: str, user_email: str, revoked_by: str, reason: Optional[str] = None) -> bool:
    """Revoke a token owned by the specified user.

    The token is marked inactive and its JTI is recorded in the revocation
    table. Calling this again for an already-revoked token is a no-op for the
    revocation table and still returns True.

    Args:
        token_id: Token ID to revoke
        user_email: Owner's email - token must belong to this user (ownership check)
        revoked_by: Email of user performing revocation (for audit)
        reason: Optional reason for revocation

    Returns:
        bool: True if token was revoked, False if not found or not authorized

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns bool: True if token was revoked successfully
    """
    # SECURITY: Filter by owner to prevent cross-user revocation
    token = await self.get_token(token_id, user_email)
    if not token:
        return False

    # Mark token as inactive
    token.is_active = False

    # Add to blacklist only once per JTI: is_token_revoked() looks the JTI up
    # with scalar_one_or_none(), which cannot cope with duplicate rows.
    already_revoked = self.db.execute(select(TokenRevocation).where(TokenRevocation.jti == token.jti)).scalars().first()
    if already_revoked is None:
        self.db.add(TokenRevocation(jti=token.jti, revoked_by=revoked_by, reason=reason))

    self.db.commit()

    # Invalidate auth cache for revoked token. The coroutine is awaited inside
    # the try block (instead of being handed to an unreferenced background
    # task) so that failures are actually caught and logged here. This remains
    # best-effort: a cache failure never undoes the revocation.
    try:
        # First-Party
        from mcpgateway.cache.auth_cache import auth_cache  # pylint: disable=import-outside-toplevel

        await auth_cache.invalidate_revocation(token.jti)
    except Exception as cache_error:
        logger.debug(f"Failed to invalidate auth cache for revoked token: {cache_error}")

    logger.info(f"Revoked token '{token.name}' (JTI: {token.jti}) by {revoked_by}")

    return True
async def admin_revoke_token(self, token_id: str, revoked_by: str, reason: Optional[str] = None) -> bool:
    """Admin-only: Revoke any token without ownership check.

    WARNING: This method bypasses ownership verification.
    Only call from admin-authenticated endpoints.

    The token is marked inactive and its JTI is recorded in the revocation
    table. Calling this again for an already-revoked token is a no-op for the
    revocation table and still returns True.

    Args:
        token_id: Token ID to revoke
        revoked_by: Admin email for audit
        reason: Revocation reason

    Returns:
        bool: True if token was revoked, False if not found

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns bool: True if token was revoked successfully
    """
    # No user filter - admin can revoke any token
    token = await self.get_token(token_id)
    if not token:
        return False

    token.is_active = False

    # Record the revocation only once per JTI: is_token_revoked() looks the JTI
    # up with scalar_one_or_none(), which cannot cope with duplicate rows.
    already_revoked = self.db.execute(select(TokenRevocation).where(TokenRevocation.jti == token.jti)).scalars().first()
    if already_revoked is None:
        self.db.add(TokenRevocation(jti=token.jti, revoked_by=revoked_by, reason=reason))

    self.db.commit()

    # Best-effort cache invalidation. Awaiting inside the try block (rather
    # than scheduling an unreferenced background task) lets the except clause
    # observe and log failures; a cache failure never undoes the revocation.
    try:
        # First-Party
        from mcpgateway.cache.auth_cache import auth_cache  # pylint: disable=import-outside-toplevel

        await auth_cache.invalidate_revocation(token.jti)
    except Exception as cache_error:
        logger.debug(f"Failed to invalidate auth cache: {cache_error}")

    logger.info(f"Admin revoked token '{token.name}' (JTI: {token.jti}) by {revoked_by}")
    return True
async def is_token_revoked(self, jti: str) -> bool:
    """Report whether a revocation record exists for a JWT ID.

    Args:
        jti: JWT ID to check

    Returns:
        bool: True if token is revoked

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns bool: True if token is revoked
    """
    lookup = select(TokenRevocation).where(TokenRevocation.jti == jti)
    # Presence of a row is the only thing that matters; its contents are unused.
    if self.db.execute(lookup).scalar_one_or_none() is None:
        return False
    return True
async def log_token_usage(
    self,
    jti: str,
    user_email: str,
    endpoint: Optional[str] = None,
    method: Optional[str] = None,
    ip_address: Optional[str] = None,
    user_agent: Optional[str] = None,
    status_code: Optional[int] = None,
    response_time_ms: Optional[int] = None,
    blocked: bool = False,
    block_reason: Optional[str] = None,
) -> None:
    """Record one use of a token and refresh the token's ``last_used`` time.

    The usage row is committed first; the ``last_used`` update is committed
    separately and only when a token with the given JTI exists.

    Args:
        jti: JWT ID of token used
        user_email: Token owner's email
        endpoint: API endpoint accessed
        method: HTTP method
        ip_address: Client IP address
        user_agent: Client user agent
        status_code: HTTP response status
        response_time_ms: Response time in milliseconds
        blocked: Whether request was blocked
        block_reason: Reason for blocking

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Logs token usage for analytics - no return value
    """
    entry = TokenUsageLog(
        token_jti=jti,
        user_email=user_email,
        endpoint=endpoint,
        method=method,
        ip_address=ip_address,
        user_agent=user_agent,
        status_code=status_code,
        response_time_ms=response_time_ms,
        blocked=blocked,
        block_reason=block_reason,
    )
    self.db.add(entry)
    self.db.commit()

    # Stamp the matching token, if there is one, with the current time.
    by_jti = select(EmailApiToken).where(EmailApiToken.jti == jti)
    used_token = self.db.execute(by_jti).scalar_one_or_none()
    if not used_token:
        return

    used_token.last_used = utc_now()
    self.db.commit()
async def get_token_usage_stats(self, user_email: str, token_id: Optional[str] = None, days: int = 30) -> dict:
    """Get token usage statistics.

    Without ``token_id`` the statistics cover every usage log of the user in
    the period. With ``token_id`` they cover only that token; if the token
    does not exist or does not belong to ``user_email``, zeroed statistics
    are returned rather than the user's overall figures.

    Args:
        user_email: User's email address
        token_id: Optional specific token ID
        days: Number of days to analyze

    Returns:
        dict: Usage statistics

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns dict with usage statistics
    """
    start_date = utc_now() - timedelta(days=days)

    # Get token JTI if specific token requested
    token_jti = None
    if token_id:
        token = await self.get_token(token_id, user_email)
        if not token:
            # Falling through with token_jti=None would report the totals of
            # ALL the user's tokens as if they belonged to the requested one.
            # Keys mirror the result of _get_usage_stats_postgresql.
            # NOTE(review): confirm the Python fallback returns the same keys.
            return {
                "period_days": days,
                "total_requests": 0,
                "successful_requests": 0,
                "blocked_requests": 0,
                "success_rate": 0,
                "average_response_time_ms": 0.0,
                "top_endpoints": [],
            }
        token_jti = token.jti

    # Use SQL aggregation for PostgreSQL, Python fallback for other dialects
    dialect_name = self.db.get_bind().dialect.name
    if dialect_name == "postgresql":
        return await self._get_usage_stats_postgresql(user_email, start_date, token_jti, days)
    return await self._get_usage_stats_python(user_email, start_date, token_jti, days)
async def _get_usage_stats_postgresql(self, user_email: str, start_date: datetime, token_jti: Optional[str], days: int) -> dict:
    """Compute usage stats using PostgreSQL SQL aggregation.

    Runs one aggregate query for the totals and one grouped query for the
    five most frequently hit endpoints.

    Args:
        user_email: User's email address
        start_date: Start date for analysis (logs at or after this timestamp)
        token_jti: Optional token JTI filter
        days: Number of days being analyzed (echoed back as ``period_days``)

    Returns:
        dict: Usage statistics computed via SQL, with keys ``period_days``,
        ``total_requests``, ``successful_requests``, ``blocked_requests``,
        ``success_rate``, ``average_response_time_ms`` and ``top_endpoints``
        (a list of ``(endpoint, request_count)`` tuples)
    """
    # Build filter conditions
    conditions = [TokenUsageLog.user_email == user_email, TokenUsageLog.timestamp >= start_date]
    if token_jti:
        conditions.append(TokenUsageLog.token_jti == token_jti)

    base_filter = and_(*conditions)

    # Main stats query using SQL aggregation. The CASE expressions are meant to
    # mirror truthiness checks in the Python fallback:
    # - status_code must be non-null AND non-zero AND < 400 for success count
    # - response_time_ms must be non-null AND non-zero to enter the average
    stats_query = (
        select(
            func.count().label("total"),  # pylint: disable=not-callable
            func.sum(
                case(
                    (and_(TokenUsageLog.status_code.isnot(None), TokenUsageLog.status_code > 0, TokenUsageLog.status_code < 400), 1),
                    else_=0,
                )
            ).label("successful"),
            func.sum(case((TokenUsageLog.blocked.is_(True), 1), else_=0)).label("blocked"),
            # Rows mapped to NULL by the CASE are ignored by AVG
            func.avg(
                case(
                    (and_(TokenUsageLog.response_time_ms.isnot(None), TokenUsageLog.response_time_ms > 0), TokenUsageLog.response_time_ms),
                    else_=None,
                )
            ).label("avg_response"),
        )
        .select_from(TokenUsageLog)
        .where(base_filter)
    )

    result = self.db.execute(stats_query).fetchone()

    # SUM/AVG yield NULL when no rows match, hence the "or 0" fallbacks.
    total_requests = result.total or 0
    successful_requests = result.successful or 0
    blocked_requests = result.blocked or 0
    avg_response_time = float(result.avg_response) if result.avg_response else 0.0

    # Top endpoints query using SQL GROUP BY, excluding NULL and empty endpoints.
    # The aggregate is deliberately NOT labelled "count": attribute access on a
    # SQLAlchemy Row would resolve `row.count` to the tuple-style count()
    # method instead of the column value.
    endpoints_query = (
        select(TokenUsageLog.endpoint, func.count().label("request_count"))  # pylint: disable=not-callable
        .where(and_(base_filter, TokenUsageLog.endpoint.isnot(None), TokenUsageLog.endpoint != ""))
        .group_by(TokenUsageLog.endpoint)
        .order_by(func.count().desc())  # pylint: disable=not-callable
        .limit(5)
    )

    endpoints_result = self.db.execute(endpoints_query).fetchall()
    # Unpack rows positionally so no attribute lookup is involved.
    top_endpoints = [(endpoint, request_count) for endpoint, request_count in endpoints_result]

    return {
        "period_days": days,
        "total_requests": total_requests,
        "successful_requests": successful_requests,
        "blocked_requests": blocked_requests,
        "success_rate": successful_requests / total_requests if total_requests > 0 else 0,
        "average_response_time_ms": round(avg_response_time, 2),
        "top_endpoints": top_endpoints,
    }
async def _get_usage_stats_python(self, user_email: str, start_date: datetime, token_jti: Optional[str], days: int) -> dict:
    """Compute usage statistics in Python from the loaded log rows (SQLite fallback).

    All matching ``TokenUsageLog`` rows are fetched and then tallied in a
    single pass over the result list.

    Args:
        user_email: User's email address
        start_date: Earliest timestamp included in the analysis
        token_jti: Optional token JTI used to narrow the analysis to one token
        days: Number of days being analyzed (returned as ``period_days``)

    Returns:
        dict: Usage statistics computed in Python
    """
    stmt = select(TokenUsageLog).where(and_(TokenUsageLog.user_email == user_email, TokenUsageLog.timestamp >= start_date))
    if token_jti:
        stmt = stmt.where(TokenUsageLog.token_jti == token_jti)

    entries = self.db.execute(stmt).scalars().all()

    # Tally everything in one pass over the rows
    successful_requests = 0
    blocked_requests = 0
    timings = []
    hits: dict = {}
    for entry in entries:
        # Truthiness check skips both missing and zero status codes
        if entry.status_code and entry.status_code < 400:
            successful_requests += 1
        if entry.blocked:
            blocked_requests += 1
        # Missing or zero response times are left out of the average
        if entry.response_time_ms:
            timings.append(entry.response_time_ms)
        # Missing or empty endpoints are not counted
        if entry.endpoint:
            hits[entry.endpoint] = hits.get(entry.endpoint, 0) + 1

    total_requests = len(entries)

    if timings:
        avg_response_time = sum(timings) / len(timings)
    else:
        avg_response_time = 0

    # Stable descending sort by hit count, then keep the first five
    ranked = list(hits.items())
    ranked.sort(key=lambda pair: pair[1], reverse=True)
    top_endpoints = ranked[:5]

    if total_requests > 0:
        success_rate = successful_requests / total_requests
    else:
        success_rate = 0

    return {
        "period_days": days,
        "total_requests": total_requests,
        "successful_requests": successful_requests,
        "blocked_requests": blocked_requests,
        "success_rate": success_rate,
        "average_response_time_ms": round(avg_response_time, 2),
        "top_endpoints": top_endpoints,
    }
async def get_token_revocation(self, jti: str) -> Optional[TokenRevocation]:
    """Look up the revocation record stored for a JWT ID.

    Args:
        jti: JWT token ID

    Returns:
        Optional[TokenRevocation]: The matching revocation row, or None when
        no row with this JTI exists

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns Optional[TokenRevocation] if token is revoked
    """
    lookup = select(TokenRevocation).where(TokenRevocation.jti == jti)
    return self.db.execute(lookup).scalar_one_or_none()
async def cleanup_expired_tokens(self) -> int:
    """Deactivate expired tokens with one bulk UPDATE.

    Active tokens whose ``expires_at`` is earlier than the current time are
    flagged ``is_active = False`` by a single UPDATE statement, so no token
    rows are loaded into memory. On any error the session is rolled back,
    the failure is logged, and 0 is returned.

    Returns:
        int: Number of tokens cleaned up (0 when the operation fails)

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns int: Number of tokens cleaned up
    """
    try:
        cutoff = utc_now()
        expired_active = self.db.query(EmailApiToken).filter(
            EmailApiToken.expires_at < cutoff,
            EmailApiToken.is_active.is_(True),
        )
        # Bulk update: in-session objects are deliberately not synchronised
        count = expired_active.update({"is_active": False}, synchronize_session=False)
        self.db.commit()

        if count > 0:
            logger.info(f"Cleaned up {count} expired tokens")
        return count
    except Exception as e:
        # Best-effort maintenance task: undo the partial work and report zero
        self.db.rollback()
        logger.error(f"Failed to cleanup expired tokens: {e}")
        return 0