Coverage for mcpgateway / services / token_catalog_service.py: 99%

329 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/token_catalog_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Token Catalog Service. 

8This module provides comprehensive API token management with scoping, 

9revocation, usage tracking, and analytics for email-based users. 

10 

11Examples: 

12 >>> from mcpgateway.services.token_catalog_service import TokenCatalogService 

13 >>> service = TokenCatalogService(None) # Mock database for doctest 

14 >>> # Service provides full token lifecycle management 

15""" 

16 

17# Standard 

18import asyncio 

19from datetime import datetime, timedelta, timezone 

20import hashlib 

21import math 

22from typing import Dict, List, Optional 

23import uuid 

24 

25# Third-Party 

26from sqlalchemy import and_, case, func, or_, select 

27from sqlalchemy.orm import Session 

28 

29# First-Party 

30from mcpgateway.common.validators import SecurityValidator 

31from mcpgateway.config import settings 

32from mcpgateway.db import EmailApiToken, EmailUser, Permissions, TokenRevocation, TokenUsageLog, utc_now 

33from mcpgateway.services.logging_service import LoggingService 

34from mcpgateway.utils.create_jwt_token import create_jwt_token 

35 

36# Initialize logging 

37logging_service = LoggingService() 

38logger = logging_service.get_logger(__name__) 

39 

40# Strong references to background tasks to prevent GC before completion 

41_background_tasks: set[asyncio.Task] = set() 

42 

43 

class TokenScope:
    """Describe the access boundaries attached to an API token.

    A scope bundles an optional server binding with permission strings,
    IP restrictions, time restrictions and usage limits. Missing (or falsy)
    collections are normalised to empty containers at construction time.

    Attributes:
        server_id (Optional[str]): Server the token is bound to, if any
        permissions (List[str]): Permission strings granted to the token
        ip_restrictions (List[str]): IP address/CIDR restrictions
        time_restrictions (dict): Time-based access limitations
        usage_limits (dict): Rate limiting and quota settings

    Examples:
        >>> scope = TokenScope(server_id="prod-server-123", permissions=["tools.read"])
        >>> scope.is_server_scoped()
        True
        >>> scope.has_permission("tools.read")
        True
        >>> scope.has_permission("tools.write")
        False
        >>> TokenScope().has_permission("anything")
        False
        >>> TokenScope(permissions=["*"]).has_permission("*")
        True
    """

    # Field names in the order they are serialised by to_dict().
    _FIELDS = ("server_id", "permissions", "ip_restrictions", "time_restrictions", "usage_limits")

    def __init__(
        self,
        server_id: Optional[str] = None,
        permissions: Optional[List[str]] = None,
        ip_restrictions: Optional[List[str]] = None,
        time_restrictions: Optional[dict] = None,
        usage_limits: Optional[dict] = None,
    ):
        """Store the scope settings, replacing falsy collections with empty ones.

        Args:
            server_id: Optional server ID the token is limited to
            permissions: Permission strings granted to the token
            ip_restrictions: IP addresses/ranges allowed to use the token
            time_restrictions: Time-based access restrictions
            usage_limits: Usage limits for the token
        """
        self.server_id = server_id
        self.permissions = permissions if permissions else []
        self.ip_restrictions = ip_restrictions if ip_restrictions else []
        self.time_restrictions = time_restrictions if time_restrictions else {}
        self.usage_limits = usage_limits if usage_limits else {}

    def is_server_scoped(self) -> bool:
        """Report whether a server binding is present.

        Returns:
            bool: False when ``server_id`` is None, True otherwise.
        """
        if self.server_id is None:
            return False
        return True

    def has_permission(self, permission: str) -> bool:
        """Report whether the exact permission string is listed in this scope.

        This is a plain membership test; no wildcard expansion is performed.

        Args:
            permission: Permission string to look up.

        Returns:
            bool: True if the string is present in ``permissions``.
        """
        granted = self.permissions
        return permission in granted

    def to_dict(self) -> dict:
        """Serialise the scope into a plain dictionary for JSON storage.

        Returns:
            dict: Mapping with the keys ``server_id``, ``permissions``,
            ``ip_restrictions``, ``time_restrictions`` and ``usage_limits``.

        Examples:
            >>> result = TokenScope(server_id="server-123", permissions=["read", "write"]).to_dict()
            >>> result["server_id"]
            'server-123'
            >>> result["permissions"]
            ['read', 'write']
        """
        return {field: getattr(self, field) for field in self._FIELDS}

    @classmethod
    def from_dict(cls, data: dict) -> "TokenScope":
        """Build a scope from a dictionary produced by :meth:`to_dict`.

        Keys that are absent fall back to the constructor defaults.

        Args:
            data: Dictionary containing scope configuration.

        Returns:
            TokenScope: New TokenScope instance.

        Examples:
            >>> scope = TokenScope.from_dict({"server_id": "server-456", "permissions": ["tools.read"]})
            >>> scope.server_id
            'server-456'
            >>> scope.permissions
            ['tools.read']
            >>> TokenScope.from_dict({}).permissions
            []
        """
        # A missing key yields None here, which __init__ turns into the same
        # empty container an explicit empty default would have produced.
        kwargs = {field: data.get(field) for field in cls._FIELDS}
        return cls(**kwargs)

186 

187 

188class TokenCatalogService: 

189 """Service for managing user API token catalogs. 

190 

191 This service provides comprehensive token lifecycle management including 

192 creation, scoping, revocation, usage tracking, and analytics. It handles 

193 JWT-based API tokens with fine-grained access control, team support, 

194 and comprehensive audit logging. 

195 

196 Key features: 

197 - Token creation with customizable scopes and permissions 

198 - Team-based token management with role-based access 

199 - Token revocation and blacklisting 

200 - Usage tracking and analytics 

201 - IP and time-based restrictions 

202 - Automatic cleanup of expired tokens 

203 

204 Attributes: 

205 db (Session): SQLAlchemy database session for token operations 

206 

207 Examples: 

208 >>> from mcpgateway.services.token_catalog_service import TokenCatalogService 

209 >>> service = TokenCatalogService(None) # Mock database for doctest 

210 >>> service.db is None 

211 True 

212 """ 

213 

def __init__(self, db: Session):
    """Bind the service to the database session it will operate on.

    Args:
        db: SQLAlchemy session used for every token query and commit
    """
    # The session is stored as-is; no validation or connection test happens here.
    self.db = db

221 

async def _generate_token(
    self, user_email: str, jti: str, team_id: Optional[str] = None, expires_at: Optional[datetime] = None, scope: Optional["TokenScope"] = None, user: Optional[object] = None
) -> str:
    """Build and sign the JWT for an API token.

    Assembles the user block, team list and scope block, then delegates the
    actual signing to ``create_jwt_token``. This is an internal helper; use
    ``create_token()`` for tokens that are tracked in the database.

    Args:
        user_email: User's email address, used as the token subject
        jti: JWT ID for token uniqueness
        team_id: Optional team ID for team-scoped tokens
        expires_at: Optional expiration datetime
        scope: Optional token scope information for access control
        user: Optional user object whose ``is_admin`` flag is copied into the token

    Returns:
        str: Whatever ``create_jwt_token`` returns for the assembled claims

    Raises:
        ValueError: If expires_at is not in the future
    """
    # Lifetime in whole minutes; stays 0 when no expiry was requested.
    ttl_minutes = 0
    if expires_at:
        remaining_seconds = (expires_at - datetime.now(timezone.utc)).total_seconds()

        # Refuse to mint a token that would already be expired.
        if remaining_seconds <= 0:
            raise ValueError("Token expiration time is in the past. Cannot create already-expired tokens.")

        # Round up so a sub-minute lifetime never collapses to 0 minutes.
        ttl_minutes = max(1, math.ceil(remaining_seconds / 60))

    is_admin = user.is_admin if user else False
    user_block = {
        "email": user_email,
        "full_name": "API Token User",
        "is_admin": is_admin,
        "auth_provider": "api_token",
    }

    # None means "all teams" (admin bypass when is_admin=true),
    # [] means "public-only" (see normalize_token_teams() in auth.py)
    team_list = None
    if team_id:
        team_list = [team_id]

    # Empty permissions = defer to RBAC at runtime (not wildcard access)
    if scope:
        scope_block = {
            "server_id": scope.server_id,
            "permissions": [] if scope.permissions is None else scope.permissions,
            "ip_restrictions": scope.ip_restrictions or [],
            "time_restrictions": scope.time_restrictions or {},
        }
    else:
        scope_block = {"server_id": None, "permissions": [], "ip_restrictions": [], "time_restrictions": {}}

    # Auto-inject servers.use for tokens with explicit MCP-related permissions.
    # Without servers.use, the token scoping middleware blocks /rpc and /mcp
    # transport access, making MCP-method permissions useless.
    granted = scope_block["permissions"]
    lacks_transport_grant = bool(granted) and "*" not in granted and "servers.use" not in granted
    if lacks_transport_grant and any(perm.startswith(Permissions.MCP_METHOD_PREFIXES) for perm in granted):
        scope_block["permissions"] = [*granted, "servers.use"]

    # Signing is centralised in create_jwt_token; pass it the structured pieces.
    claims = {"sub": user_email, "jti": jti, "token_use": "api"}  # nosec B105 - token type marker, not a password
    return await create_jwt_token(
        data=claims,
        expires_in_minutes=ttl_minutes,
        user_data=user_block,
        teams=team_list,
        scopes=scope_block,
    )

311 

def _hash_token(self, token: str) -> str:
    """Return the SHA-256 hex digest of a raw token string.

    Args:
        token: Raw token string

    Returns:
        str: 64-character hexadecimal SHA-256 digest of the encoded token

    Examples:
        >>> service = TokenCatalogService(None)
        >>> len(service._hash_token("test_token")) == 64
        True
    """
    hasher = hashlib.sha256()
    hasher.update(token.encode())
    return hasher.hexdigest()

328 

def _validate_scope_containment(
    self,
    requested_permissions: Optional[List[str]],
    caller_permissions: Optional[List[str]],
) -> None:
    """Ensure a token's requested permissions are a subset of the caller's.

    SECURITY: fail-secure. A caller with no permissions cannot delegate any
    custom scope; such callers may only create tokens with an empty scope.

    Rules, applied in order:
        1. An empty/None request is always allowed.
        2. An empty/None caller permission list rejects any non-empty request.
        3. A caller holding ``"*"`` may grant anything.
        4. A request containing ``"*"`` is rejected for non-wildcard callers.
        5. Every remaining requested permission must be held exactly, or be
           covered by a ``"<category>.*"`` entry where ``<category>`` is the
           text before the first dot.

    Args:
        requested_permissions: Permissions requested for new/updated token
        caller_permissions: Caller's effective permissions (RBAC + current token scopes)

    Raises:
        ValueError: If requested permissions exceed caller's permissions
    """
    if not requested_permissions:
        return

    if not caller_permissions:
        raise ValueError("Cannot specify custom token permissions. You have no explicit permissions to delegate. Create a token without scope to inherit permissions at runtime.")

    if "*" in caller_permissions:
        return

    if "*" in requested_permissions:
        raise ValueError("Cannot create token with wildcard permissions. Your effective permissions do not include wildcard access.")

    def _is_delegable(wanted: str) -> bool:
        """Return True when the caller holds `wanted` directly or via its category wildcard."""
        if wanted in caller_permissions:
            return True
        if "." in wanted:
            # e.g. "tools.*" on the caller side covers "tools.read"
            category = wanted.partition(".")[0]
            return f"{category}.*" in caller_permissions
        return False

    for wanted in requested_permissions:
        if not _is_delegable(wanted):
            raise ValueError(f"Cannot grant permission '{wanted}' - not in your effective permissions.")

375 

async def create_token(
    self,
    user_email: str,
    name: str,
    description: Optional[str] = None,
    scope: Optional[TokenScope] = None,
    expires_in_days: Optional[int] = None,
    tags: Optional[List[str]] = None,
    team_id: Optional[str] = None,
    caller_permissions: Optional[List[str]] = None,
    is_active: bool = True,
) -> tuple[EmailApiToken, str]:
    """Create a new API token, optionally scoped to a team.

    Steps performed:
        - Look up the user by email.
        - Validate requested permissions against ``caller_permissions``.
        - When ``team_id`` is given, check the team exists and the user is an
          active member of it.
        - Reject a duplicate *active* token name within the same
          (user, team) bucket; ``team_id=None`` is its own bucket.
        - Compute the expiry, generate the JWT, store a hash of it, commit.

    Args:
        user_email (str): The email address of the user requesting the token.
        name (str): Human-readable token name (unique per user+team among active tokens).
        description (Optional[str]): A description for the token (default is None).
        scope (Optional[TokenScope]): Scoping configuration (permissions, server ID,
            IP restrictions, etc.). Default is None.
        expires_in_days (Optional[int]): Lifetime in days. ``None`` means no expiration.
            Any explicit value, including ``0`` or a negative number, is applied, so a
            non-positive value produces an expiry that is not in the future and is
            rejected by token generation.
        tags (Optional[List[str]]): Organizational tags (stored as an empty list when omitted).
        team_id (Optional[str]): Team to scope the token to. Optional; when omitted the
            token is created without a team association.
        caller_permissions (Optional[List[str]]): Effective permissions of the caller, used
            to ensure the new token cannot carry broader permissions than the caller.
        is_active (bool): Whether the token should be created as active (default is True).

    Returns:
        tuple[EmailApiToken, str]: The persisted ``EmailApiToken`` record and the raw JWT
        string. Only the hash of the raw token is stored on the record.

    Raises:
        ValueError: If any of the following validation checks fail:
            - The `user_email` does not correspond to an existing user.
            - Requested permissions exceed `caller_permissions`.
            - The team does not exist or the user is not an active member of it.
            - An active token with the same name already exists for the user and team.
            - Expiration is required by server policy but none was given.
            - The computed expiration is not in the future.

    Examples:
        >>> # This method requires database operations, shown for reference
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # token, raw_token = await service.create_token(...)
        >>> # Returns (EmailApiToken, raw_token_string) tuple
    """
    # Validate user exists
    user = self.db.execute(select(EmailUser).where(EmailUser.email == user_email)).scalar_one_or_none()
    if not user:
        raise ValueError(f"User not found: {user_email}")

    # Validate scope containment (fail-secure if no caller_permissions)
    if scope and scope.permissions:
        self._validate_scope_containment(scope.permissions, caller_permissions)

    # Validate team exists and user is active member
    if team_id:
        # First-Party
        from mcpgateway.db import EmailTeam, EmailTeamMember  # pylint: disable=import-outside-toplevel

        team = self.db.execute(select(EmailTeam).where(EmailTeam.id == team_id)).scalar_one_or_none()
        if not team:
            raise ValueError(f"Team not found: {team_id}")

        membership = self.db.execute(
            select(EmailTeamMember).where(and_(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)))
        ).scalar_one_or_none()
        if not membership:
            raise ValueError(f"User {user_email} is not an active member of team {team_id}. Only team members can create tokens for the team.")

    # Check for duplicate active token name for this user within the same team scope,
    # matching DB constraint uq_email_api_tokens_user_name_team (user_email, name, team_id).
    # team_id=None tokens are scoped to the global (no-team) bucket.
    if team_id:
        team_filter = EmailApiToken.team_id == team_id
    else:
        team_filter = EmailApiToken.team_id.is_(None)
    name_check = and_(EmailApiToken.user_email == user_email, EmailApiToken.name == name, team_filter, EmailApiToken.is_active.is_(True))
    existing_token = self.db.execute(select(EmailApiToken).where(name_check)).scalar_one_or_none()

    if existing_token:
        scope_label = f"team '{team_id}'" if team_id else "the global scope (no team)"
        raise ValueError(f"Token with name '{name}' already exists for user {user_email} in {scope_label}. Token names must be unique per user per team. Please choose a different name.")

    # Calculate expiration date.
    # Compare against None rather than truthiness: a caller asking for 0 days must
    # not silently receive a never-expiring token. A non-positive value yields an
    # expiry that is not in the future, which _generate_token rejects.
    expires_at = None
    if expires_in_days is not None:
        expires_at = utc_now() + timedelta(days=expires_in_days)

    # Enforce expiration requirement if configured
    if settings.require_token_expiration and not expires_at:
        raise ValueError("Token expiration is required by server policy (REQUIRE_TOKEN_EXPIRATION=true). Please specify an expiration date for the token.")

    jti = str(uuid.uuid4())  # Unique JWT ID
    # Generate JWT token with all necessary claims; the user object carries admin status
    raw_token = await self._generate_token(user_email=user_email, jti=jti, team_id=team_id, expires_at=expires_at, scope=scope, user=user)

    # Hash token for storage; the raw token itself is never persisted
    token_hash = self._hash_token(raw_token)

    api_token = EmailApiToken(
        id=str(uuid.uuid4()),
        user_email=user_email,
        team_id=team_id,  # Store team association
        name=name,
        jti=jti,
        description=description,
        token_hash=token_hash,  # Store hash, not raw token
        expires_at=expires_at,
        tags=tags or [],
        # Store scoping information
        server_id=scope.server_id if scope else None,
        resource_scopes=scope.permissions if scope else [],
        ip_restrictions=scope.ip_restrictions if scope else [],
        time_restrictions=scope.time_restrictions if scope else {},
        usage_limits=scope.usage_limits if scope else {},
        # Token status
        is_active=is_active,
        created_at=utc_now(),
        last_used=None,
    )

    self.db.add(api_token)
    self.db.commit()
    self.db.refresh(api_token)

    token_type = f"team-scoped (team: {team_id})" if team_id else "public-only"
    logger.info(f"Created {token_type} API token '{name}' for user {SecurityValidator.sanitize_log_message(user_email)}. Token ID: {api_token.id}, Expires: {expires_at or 'Never'}")
    return api_token, raw_token

525 

async def count_user_tokens(self, user_email: str, include_inactive: bool = False) -> int:
    """Return how many API tokens belong to a user.

    Unless ``include_inactive`` is set, only tokens that are flagged active
    and are either non-expiring or not yet expired are counted.

    Args:
        user_email: User's email address
        include_inactive: Include inactive/expired tokens

    Returns:
        int: Total number of matching tokens (0 when the query yields nothing)
    """
    # pylint: disable=not-callable
    stmt = select(func.count(EmailApiToken.id)).where(EmailApiToken.user_email == user_email)

    if not include_inactive:
        flagged_active = EmailApiToken.is_active.is_(True)
        not_expired = or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())
        stmt = stmt.where(and_(flagged_active, not_expired))

    total = self.db.execute(stmt).scalar()
    return total or 0

544 

async def get_user_team_ids(self, user_email: str) -> List[str]:
    """Return the IDs of every team the user belongs to.

    Delegates to ``TeamManagementService.get_user_teams`` on a service
    instance built from this service's session, and projects the ``id`` of
    each returned team.

    Args:
        user_email: User's email address

    Returns:
        List[str]: Team IDs the user belongs to
    """
    # First-Party
    from mcpgateway.services.team_management_service import TeamManagementService  # pylint: disable=import-outside-toplevel

    memberships = await TeamManagementService(self.db).get_user_teams(user_email)
    team_ids = []
    for team in memberships:
        team_ids.append(team.id)
    return team_ids

563 

async def count_user_and_team_tokens(self, user_email: str, include_inactive: bool = False) -> int:
    """Count a user's own tokens together with tokens of the user's teams.

    A token matches when it was created by the user OR its ``team_id`` is
    one of the IDs returned by ``get_user_team_ids``. Unless
    ``include_inactive`` is set, inactive and expired tokens are excluded.

    Args:
        user_email: User's email address
        include_inactive: Include inactive/expired tokens

    Returns:
        int: Total number of matching tokens (0 when the query yields nothing)
    """
    member_team_ids = await self.get_user_team_ids(user_email)

    # Ownership always qualifies; team membership adds a second alternative.
    visibility = [EmailApiToken.user_email == user_email]
    if member_team_ids:
        visibility.append(EmailApiToken.team_id.in_(member_team_ids))

    # pylint: disable=not-callable
    stmt = select(func.count(EmailApiToken.id)).where(or_(*visibility))

    if not include_inactive:
        flagged_active = EmailApiToken.is_active.is_(True)
        not_expired = or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())
        stmt = stmt.where(and_(flagged_active, not_expired))

    total = self.db.execute(stmt).scalar()
    return total or 0

592 

async def count_team_tokens(self, team_id: str, include_inactive: bool = False) -> int:
    """Return how many API tokens are associated with a team.

    Unless ``include_inactive`` is set, only tokens that are flagged active
    and are either non-expiring or not yet expired are counted.

    Args:
        team_id: Team ID to count tokens for
        include_inactive: Include inactive/expired tokens

    Returns:
        int: Total number of matching tokens (0 when the query yields nothing)
    """
    # pylint: disable=not-callable
    stmt = select(func.count(EmailApiToken.id)).where(EmailApiToken.team_id == team_id)

    if not include_inactive:
        flagged_active = EmailApiToken.is_active.is_(True)
        not_expired = or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())
        stmt = stmt.where(and_(flagged_active, not_expired))

    total = self.db.execute(stmt).scalar()
    return total or 0

611 

async def list_user_tokens(self, user_email: str, include_inactive: bool = False, limit: int = 100, offset: int = 0) -> List[EmailApiToken]:
    """Return a page of the user's API tokens, newest first.

    Args:
        user_email: User's email address
        include_inactive: Include inactive/expired tokens
        limit: Maximum tokens to return; values outside 1..1000 are replaced by 50
        offset: Number of tokens to skip; negative values are treated as 0

    Returns:
        List[EmailApiToken]: User's API tokens

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns List[EmailApiToken] for user
    """
    # Normalise paging arguments instead of rejecting them.
    if limit > 1000 or limit <= 0:
        limit = 50
    if offset < 0:
        offset = 0

    stmt = select(EmailApiToken).where(EmailApiToken.user_email == user_email)

    if not include_inactive:
        flagged_active = EmailApiToken.is_active.is_(True)
        not_expired = or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())
        stmt = stmt.where(and_(flagged_active, not_expired))

    stmt = stmt.order_by(EmailApiToken.created_at.desc())
    stmt = stmt.limit(limit).offset(offset)

    rows = self.db.execute(stmt)
    return rows.scalars().all()

641 

async def list_team_tokens(self, team_id: str, user_email: str, include_inactive: bool = False, limit: int = 100, offset: int = 0) -> List[EmailApiToken]:
    """Return a page of a team's API tokens, newest first.

    The caller must belong to the team: ``team_id`` has to appear in the
    list returned by ``get_user_team_ids(user_email)``.

    Args:
        team_id: Team ID to list tokens for
        user_email: User's email (must be a member of the team)
        include_inactive: Include inactive/expired tokens
        limit: Maximum tokens to return; values outside 1..1000 are replaced by 50
        offset: Number of tokens to skip; negative values are treated as 0

    Returns:
        List[EmailApiToken]: Team's API tokens

    Raises:
        ValueError: If user is not an active member of the team
    """
    allowed_team_ids = await self.get_user_team_ids(user_email)
    if team_id not in allowed_team_ids:
        raise ValueError(f"User {user_email} is not an active member of team {team_id}")

    # Normalise paging arguments instead of rejecting them.
    if limit > 1000 or limit <= 0:
        limit = 50
    if offset < 0:
        offset = 0

    stmt = select(EmailApiToken).where(EmailApiToken.team_id == team_id)

    if not include_inactive:
        flagged_active = EmailApiToken.is_active.is_(True)
        not_expired = or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())
        stmt = stmt.where(and_(flagged_active, not_expired))

    stmt = stmt.order_by(EmailApiToken.created_at.desc())
    stmt = stmt.limit(limit).offset(offset)

    rows = self.db.execute(stmt)
    return rows.scalars().all()

676 

async def list_user_and_team_tokens(self, user_email: str, include_inactive: bool = False, limit: int = 100, offset: int = 0) -> List[EmailApiToken]:
    """Return a page of the user's own tokens plus tokens of the user's teams.

    A token matches when it was created by the user OR its ``team_id`` is
    one of the IDs returned by ``get_user_team_ids``. Results are ordered
    newest first.

    Args:
        user_email: User's email address
        include_inactive: Include inactive/expired tokens
        limit: Maximum tokens to return; values outside 1..1000 are replaced by 50
        offset: Number of tokens to skip; negative values are treated as 0

    Returns:
        List[EmailApiToken]: Combined list of user's personal tokens and team tokens

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns List[EmailApiToken] including personal and team tokens
    """
    # Normalise paging arguments instead of rejecting them.
    if limit > 1000 or limit <= 0:
        limit = 50
    if offset < 0:
        offset = 0

    member_team_ids = await self.get_user_team_ids(user_email)

    # Ownership always qualifies; team membership adds a second alternative.
    visibility = [EmailApiToken.user_email == user_email]
    if member_team_ids:
        visibility.append(EmailApiToken.team_id.in_(member_team_ids))

    stmt = select(EmailApiToken).where(or_(*visibility))

    if not include_inactive:
        flagged_active = EmailApiToken.is_active.is_(True)
        not_expired = or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())
        stmt = stmt.where(and_(flagged_active, not_expired))

    stmt = stmt.order_by(EmailApiToken.created_at.desc())
    stmt = stmt.limit(limit).offset(offset)

    rows = self.db.execute(stmt)
    return rows.scalars().all()

717 

async def get_token(self, token_id: str, user_email: Optional[str] = None) -> Optional[EmailApiToken]:
    """Get a specific token by ID, optionally restricted to one owner.

    Args:
        token_id: Token ID
        user_email: Owner filter. ``None`` (the default) disables the filter.
            Any other value, including an empty string, is applied as an
            equality filter on the token's ``user_email``.

    Returns:
        Optional[EmailApiToken]: The token if one matches, otherwise None

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns Optional[EmailApiToken] if found and authorized
    """
    query = select(EmailApiToken).where(EmailApiToken.id == token_id)

    # Compare against None rather than truthiness: an empty-string email must
    # not silently drop the ownership filter and expose other users' tokens.
    if user_email is not None:
        query = query.where(EmailApiToken.user_email == user_email)

    result = self.db.execute(query)
    return result.scalar_one_or_none()

739 

async def update_token(
    self,
    token_id: str,
    user_email: str,
    name: Optional[str] = None,
    description: Optional[str] = None,
    scope: Optional[TokenScope] = None,
    tags: Optional[List[str]] = None,
    caller_permissions: Optional[List[str]] = None,
    is_active: Optional[bool] = None,
) -> Optional[EmailApiToken]:
    """Update an existing token with scope containment validation.

    Only the stored record is modified; this method does not generate a new
    JWT. Arguments left as ``None`` leave the corresponding field untouched
    (``name`` is also skipped when empty or unchanged).

    Args:
        token_id: Token ID to update
        user_email: Owner's email; the token is looked up with this as owner filter
        name: New token name
        description: New description
        scope: New scoping configuration; when given, all scope fields are replaced
        tags: New tags
        caller_permissions: Caller's effective permissions for scope containment
        is_active: New token active status

    Returns:
        Optional[EmailApiToken]: The updated, refreshed token record

    Raises:
        ValueError: If the token is not found for this owner, the requested
            permissions exceed the caller's, or another active token of the
            same user already uses the new name within the same team bucket

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns Optional[EmailApiToken] if updated successfully
    """
    token = await self.get_token(token_id, user_email)
    if not token:
        raise ValueError("Token not found or not authorized")

    # Validate scope containment for scope changes
    if scope and scope.permissions:
        self._validate_scope_containment(scope.permissions, caller_permissions)

    # Check for duplicate name if changing
    if name and name != token.name:
        # Names are unique per (user, team), the same rule create_token applies,
        # so only look for clashes inside this token's own team bucket.
        # A token without a team lives in the global (team_id IS NULL) bucket.
        if token.team_id:
            team_filter = EmailApiToken.team_id == token.team_id
        else:
            team_filter = EmailApiToken.team_id.is_(None)

        existing = self.db.execute(
            select(EmailApiToken).where(and_(EmailApiToken.user_email == user_email, EmailApiToken.name == name, team_filter, EmailApiToken.id != token_id, EmailApiToken.is_active.is_(True)))
        ).scalar_one_or_none()

        if existing:
            raise ValueError(f"Token name '{name}' already exists")

        token.name = name

    if description is not None:
        token.description = description

    if tags is not None:
        token.tags = tags

    if is_active is not None:
        token.is_active = is_active

    if scope:
        token.server_id = scope.server_id
        token.resource_scopes = scope.permissions
        token.ip_restrictions = scope.ip_restrictions
        token.time_restrictions = scope.time_restrictions
        token.usage_limits = scope.usage_limits

    self.db.commit()
    self.db.refresh(token)

    logger.info(f"Updated token '{token.name}' for user {SecurityValidator.sanitize_log_message(user_email)}")

    return token

814 

async def revoke_token(self, token_id: str, user_email: str, revoked_by: str, reason: Optional[str] = None) -> bool:
    """Revoke a token owned by the caller, or a team token when the caller owns that team.

    The token is marked inactive, its JTI is recorded in the revocation
    table (once), and the auth cache entry for the JTI is invalidated
    before returning.

    Args:
        token_id: Token ID to revoke
        user_email: Caller's email - must own the token or hold the "owner" role in the token's team
        revoked_by: Email of user performing revocation (for audit)
        reason: Optional reason for revocation

    Returns:
        bool: True if token was revoked, False if not found or not authorized

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns bool: True if token was revoked successfully
    """
    # First try ownership match
    token = await self.get_token(token_id, user_email)

    # Not owned by the caller: only a team *owner* may revoke another member's team token
    if not token:
        token = await self.get_token(token_id)
        if not token or not token.team_id:
            return False
        # First-Party
        from mcpgateway.services.team_management_service import TeamManagementService  # pylint: disable=import-outside-toplevel

        team_service = TeamManagementService(self.db)
        role = await team_service.get_user_role_in_team(user_email, token.team_id)
        if role != "owner":
            return False

    # Read identifying fields up front so the post-commit steps do not need to reload the instance
    jti = token.jti
    token_name = token.name

    # Mark token as inactive
    token.is_active = False

    # Add to blacklist only once: revocation lookups in this service use
    # scalar_one_or_none() on the JTI, so a second row for the same JTI
    # (e.g. revoking an already-revoked token) must never be inserted.
    existing_revocation = self.db.execute(select(TokenRevocation).where(TokenRevocation.jti == jti)).scalar_one_or_none()
    if existing_revocation is None:
        self.db.add(TokenRevocation(jti=jti, revoked_by=revoked_by, reason=reason))
    self.db.commit()

    # Invalidate auth cache synchronously so revoked tokens are rejected immediately
    # (fire-and-forget via create_task risks a race where the next request arrives
    # before the invalidation task runs, allowing the revoked token through).
    try:
        # First-Party
        from mcpgateway.cache.auth_cache import auth_cache  # pylint: disable=import-outside-toplevel

        await auth_cache.invalidate_revocation(jti)
    except Exception as cache_error:
        # Best-effort: the DB revocation is already committed. Logged at warning
        # because a stale cache entry could keep the revoked token usable.
        logger.warning(f"Failed to invalidate auth cache for revoked token: {cache_error}")

    logger.info(f"Revoked token '{token_name}' (JTI: {jti}) by {SecurityValidator.sanitize_log_message(revoked_by)}")

    return True

871 

async def admin_revoke_token(self, token_id: str, revoked_by: str, reason: Optional[str] = None) -> bool:
    """Admin-only: Revoke any token without ownership check.

    WARNING: This method bypasses ownership verification.
    Only call from admin-authenticated endpoints.

    The token is marked inactive, its JTI is recorded in the revocation
    table (once), and the auth cache entry for the JTI is invalidated
    before returning.

    Args:
        token_id: Token ID to revoke
        revoked_by: Admin email for audit
        reason: Revocation reason

    Returns:
        bool: True if token was revoked, False if not found

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns bool: True if token was revoked successfully
    """
    # No user filter - admin can revoke any token
    token = await self.get_token(token_id)
    if not token:
        return False

    # Read identifying fields up front so the post-commit steps do not need to reload the instance
    jti = token.jti
    token_name = token.name

    token.is_active = False

    # Insert the revocation row only once: lookups in this service use
    # scalar_one_or_none() on the JTI, so duplicates for one JTI must not exist.
    existing_revocation = self.db.execute(select(TokenRevocation).where(TokenRevocation.jti == jti)).scalar_one_or_none()
    if existing_revocation is None:
        self.db.add(TokenRevocation(jti=jti, revoked_by=revoked_by, reason=reason))
    self.db.commit()

    try:
        # First-Party
        from mcpgateway.cache.auth_cache import auth_cache  # pylint: disable=import-outside-toplevel

        await auth_cache.invalidate_revocation(jti)
    except Exception as cache_error:
        # Best-effort: the DB revocation is already committed. Logged at warning
        # because a stale cache entry could keep the revoked token usable.
        logger.warning(f"Failed to invalidate auth cache: {cache_error}")

    logger.info(f"Admin revoked token '{token_name}' (JTI: {jti}) by {SecurityValidator.sanitize_log_message(revoked_by)}")
    return True

910 

async def is_token_revoked(self, jti: str) -> bool:
    """Report whether a revocation record exists for the given JTI.

    Args:
        jti: JWT ID to check

    Returns:
        bool: True if token is revoked

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns bool: True if token is revoked
    """
    lookup = select(TokenRevocation).where(TokenRevocation.jti == jti)
    match = self.db.execute(lookup).scalar_one_or_none()
    if match is None:
        return False
    return True

927 

async def log_token_usage(
    self,
    jti: str,
    user_email: str,
    endpoint: Optional[str] = None,
    method: Optional[str] = None,
    ip_address: Optional[str] = None,
    user_agent: Optional[str] = None,
    status_code: Optional[int] = None,
    response_time_ms: Optional[int] = None,
    blocked: bool = False,
    block_reason: Optional[str] = None,
) -> None:
    """Persist one token usage record for analytics and security.

    Args:
        jti: JWT ID of token used
        user_email: Token owner's email
        endpoint: API endpoint accessed
        method: HTTP method
        ip_address: Client IP address
        user_agent: Client user agent
        status_code: HTTP response status
        response_time_ms: Response time in milliseconds
        blocked: Whether request was blocked
        block_reason: Reason for blocking

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Logs token usage for analytics - no return value
    """
    # Column values for the new TokenUsageLog row, keyed by model attribute name
    record_fields = {
        "token_jti": jti,
        "user_email": user_email,
        "endpoint": endpoint,
        "method": method,
        "ip_address": ip_address,
        "user_agent": user_agent,
        "status_code": status_code,
        "response_time_ms": response_time_ms,
        "blocked": blocked,
        "block_reason": block_reason,
    }
    entry = TokenUsageLog(**record_fields)

    # Each usage record is committed immediately
    self.db.add(entry)
    self.db.commit()

974 

async def get_token_usage_stats(self, user_email: str, token_id: Optional[str] = None, days: int = 30) -> dict:
    """Compute usage statistics for a user's tokens over a trailing window.

    Args:
        user_email: User's email address
        token_id: Optional specific token ID
        days: Number of days to analyze

    Returns:
        dict: Usage statistics

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns dict with usage statistics
    """
    window_start = utc_now() - timedelta(days=days)

    # Resolve the requested token to its JTI. If the token is not found for
    # this user the filter stays None, so the statistics cover all of the
    # user's usage logs.
    jti_filter = None
    if token_id:
        owned_token = await self.get_token(token_id, user_email)
        jti_filter = owned_token.jti if owned_token else None

    # SQL aggregation on PostgreSQL; every other dialect uses the in-Python path
    if self.db.get_bind().dialect.name == "postgresql":
        compute_stats = self._get_usage_stats_postgresql
    else:
        compute_stats = self._get_usage_stats_python
    return await compute_stats(user_email, window_start, jti_filter, days)

1004 

async def _get_usage_stats_postgresql(self, user_email: str, start_date: datetime, token_jti: Optional[str], days: int) -> dict:
    """Compute usage stats using PostgreSQL SQL aggregation.

    Runs two queries against TokenUsageLog: one aggregate query for the
    totals and average response time, and one GROUP BY query for the five
    most-requested endpoints.

    Args:
        user_email: User's email address
        start_date: Start date for analysis
        token_jti: Optional token JTI filter
        days: Number of days being analyzed

    Returns:
        dict: Usage statistics computed via SQL, with keys period_days,
        total_requests, successful_requests, blocked_requests, success_rate,
        average_response_time_ms and top_endpoints (list of (endpoint, count) tuples)
    """
    # Build filter conditions
    conditions = [TokenUsageLog.user_email == user_email, TokenUsageLog.timestamp >= start_date]
    if token_jti:
        conditions.append(TokenUsageLog.token_jti == token_jti)

    base_filter = and_(*conditions)

    # Main stats query using SQL aggregation
    # Match Python behavior:
    # - status_code must be non-null AND non-zero AND < 400 for success count
    # - response_time_ms must be non-null AND non-zero for average (Python: if log.response_time_ms)
    stats_query = (
        select(
            func.count().label("total"),  # pylint: disable=not-callable
            func.sum(
                case(
                    (and_(TokenUsageLog.status_code.isnot(None), TokenUsageLog.status_code > 0, TokenUsageLog.status_code < 400), 1),
                    else_=0,
                )
            ).label("successful"),
            func.sum(case((TokenUsageLog.blocked.is_(True), 1), else_=0)).label("blocked"),
            # Only average non-null and non-zero response times (NULL values are ignored by AVG)
            func.avg(
                case(
                    (and_(TokenUsageLog.response_time_ms.isnot(None), TokenUsageLog.response_time_ms > 0), TokenUsageLog.response_time_ms),
                    else_=None,
                )
            ).label("avg_response"),
        )
        .select_from(TokenUsageLog)
        .where(base_filter)
    )

    result = self.db.execute(stats_query).fetchone()

    # SUM/AVG yield NULL when no rows match, hence the fallbacks to zero
    total_requests = result.total or 0
    successful_requests = result.successful or 0
    blocked_requests = result.blocked or 0
    avg_response_time = float(result.avg_response) if result.avg_response else 0.0

    # Top endpoints query using SQL GROUP BY
    # Match Python behavior: exclude None AND empty string endpoints (Python: if log.endpoint)
    # The aggregate is labelled "hit_count" rather than "count": SQLAlchemy Row
    # is a Sequence, so ``row.count`` would resolve to the inherited
    # Sequence.count() method instead of the column value.
    endpoints_query = (
        select(TokenUsageLog.endpoint, func.count().label("hit_count"))  # pylint: disable=not-callable
        .where(and_(base_filter, TokenUsageLog.endpoint.isnot(None), TokenUsageLog.endpoint != ""))
        .group_by(TokenUsageLog.endpoint)
        .order_by(func.count().desc())  # pylint: disable=not-callable
        .limit(5)
    )

    endpoints_result = self.db.execute(endpoints_query).fetchall()
    top_endpoints = [(row.endpoint, row.hit_count) for row in endpoints_result]

    return {
        "period_days": days,
        "total_requests": total_requests,
        "successful_requests": successful_requests,
        "blocked_requests": blocked_requests,
        "success_rate": successful_requests / total_requests if total_requests > 0 else 0,
        "average_response_time_ms": round(avg_response_time, 2),
        "top_endpoints": top_endpoints,
    }

1079 

async def _get_usage_stats_python(self, user_email: str, start_date: datetime, token_jti: Optional[str], days: int) -> dict:
    """Compute usage stats in Python from the matching log rows (non-PostgreSQL fallback).

    Args:
        user_email: User's email address
        start_date: Start date for analysis
        token_jti: Optional token JTI filter
        days: Number of days being analyzed

    Returns:
        dict: Usage statistics computed in Python
    """
    window_filter = and_(TokenUsageLog.user_email == user_email, TokenUsageLog.timestamp >= start_date)
    stmt = select(TokenUsageLog).where(window_filter)
    if token_jti:
        stmt = stmt.where(TokenUsageLog.token_jti == token_jti)

    entries = self.db.execute(stmt).scalars().all()

    # Single pass over the rows, accumulating every statistic at once
    ok_count = 0
    blocked_count = 0
    timings = []
    hits_by_endpoint: dict = {}
    for entry in entries:
        # Success = a truthy status code below 400 (None and 0 do not count)
        if entry.status_code and entry.status_code < 400:
            ok_count += 1
        if entry.blocked:
            blocked_count += 1
        # Falsy response times (None, 0) are left out of the average
        if entry.response_time_ms:
            timings.append(entry.response_time_ms)
        # Falsy endpoints (None, "") are left out of the ranking
        if entry.endpoint:
            hits_by_endpoint[entry.endpoint] = hits_by_endpoint.get(entry.endpoint, 0) + 1

    request_count = len(entries)
    mean_response = sum(timings) / len(timings) if timings else 0

    # Five most-hit endpoints, highest count first
    ranked = list(hits_by_endpoint.items())
    ranked.sort(key=lambda pair: pair[1], reverse=True)
    top_endpoints = ranked[:5]

    if request_count > 0:
        success_rate = ok_count / request_count
    else:
        success_rate = 0

    return {
        "period_days": days,
        "total_requests": request_count,
        "successful_requests": ok_count,
        "blocked_requests": blocked_count,
        "success_rate": success_rate,
        "average_response_time_ms": round(mean_response, 2),
        "top_endpoints": top_endpoints,
    }

1125 

async def get_token_revocation(self, jti: str) -> Optional[TokenRevocation]:
    """Fetch the revocation record for a JTI, if one exists.

    Args:
        jti: JWT token ID

    Returns:
        Optional[TokenRevocation]: Revocation info if token is revoked

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns Optional[TokenRevocation] if token is revoked
    """
    lookup = select(TokenRevocation).where(TokenRevocation.jti == jti)
    return self.db.execute(lookup).scalar_one_or_none()

1141 

async def get_token_revocations_batch(self, jtis: List[str]) -> Dict[str, TokenRevocation]:
    """Fetch revocation records for several JTIs with one query.

    Args:
        jtis: List of JWT token IDs

    Returns:
        Dict mapping JTI to TokenRevocation for revoked tokens only.
    """
    # Nothing to look up: skip the database round trip entirely
    if not jtis:
        return {}

    lookup = select(TokenRevocation).where(TokenRevocation.jti.in_(jtis))
    by_jti = {}
    for revocation in self.db.execute(lookup).scalars().all():
        by_jti[revocation.jti] = revocation
    return by_jti

1155 

async def list_all_tokens(self, include_inactive: bool = False, limit: int = 100, offset: int = 0) -> List[EmailApiToken]:
    """Return a page of API tokens across all users, newest first (admin only).

    Args:
        include_inactive: Include inactive/expired tokens
        limit: Maximum tokens to return
        offset: Number of tokens to skip

    Returns:
        List[EmailApiToken]: All API tokens
    """
    # An out-of-range page size (<= 0 or > 1000) is replaced with 50
    if not 0 < limit <= 1000:
        limit = 50
    # Negative offsets are treated as zero
    if offset < 0:
        offset = 0

    stmt = select(EmailApiToken)

    if not include_inactive:
        # Keep tokens flagged active that either never expire or expire in the future
        not_expired = or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())
        stmt = stmt.where(and_(EmailApiToken.is_active.is_(True), not_expired))

    stmt = stmt.order_by(EmailApiToken.created_at.desc())
    stmt = stmt.limit(limit).offset(offset)

    return self.db.execute(stmt).scalars().all()

1180 

async def count_all_tokens(self, include_inactive: bool = False) -> int:
    """Count API tokens across all users (admin only).

    Args:
        include_inactive: Include inactive/expired tokens in count

    Returns:
        int: Total count of all tokens
    """
    stmt = select(func.count(EmailApiToken.id))  # pylint: disable=not-callable

    if not include_inactive:
        # Count tokens flagged active that either never expire or expire in the future
        not_expired = or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())
        stmt = stmt.where(and_(EmailApiToken.is_active.is_(True), not_expired))

    total = self.db.execute(stmt).scalar()
    return total or 0

1197 

async def cleanup_expired_tokens(self) -> int:
    """Deactivate expired tokens with one bulk UPDATE.

    A single UPDATE statement flips ``is_active`` to False for every active
    token whose ``expires_at`` is in the past, so no token rows are loaded
    into memory. Any failure is rolled back, logged, and reported as zero.

    Returns:
        int: Number of tokens cleaned up

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns int: Number of tokens cleaned up
    """
    try:
        cutoff = utc_now()
        expired_active = self.db.query(EmailApiToken).filter(EmailApiToken.expires_at < cutoff, EmailApiToken.is_active.is_(True))
        # synchronize_session=False: the UPDATE is issued without syncing in-session objects
        deactivated = expired_active.update({"is_active": False}, synchronize_session=False)

        self.db.commit()

        if deactivated > 0:
            logger.info(f"Cleaned up {deactivated} expired tokens")

        return deactivated

    except Exception as e:
        # Best-effort maintenance task: undo the partial work and report nothing cleaned
        self.db.rollback()
        logger.error(f"Failed to cleanup expired tokens: {e}")
        return 0