Coverage for mcpgateway / services / token_catalog_service.py: 99%

328 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/token_catalog_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Token Catalog Service. 

8This module provides comprehensive API token management with scoping, 

9revocation, usage tracking, and analytics for email-based users. 

10 

11Examples: 

12 >>> from mcpgateway.services.token_catalog_service import TokenCatalogService 

13 >>> service = TokenCatalogService(None) # Mock database for doctest 

14 >>> # Service provides full token lifecycle management 

15""" 

16 

17# Standard 

18import asyncio 

19from datetime import datetime, timedelta, timezone 

20import hashlib 

21import math 

22from typing import Dict, List, Optional 

23import uuid 

24 

25# Third-Party 

26from sqlalchemy import and_, case, func, or_, select 

27from sqlalchemy.orm import Session 

28 

29# First-Party 

30from mcpgateway.config import settings 

31from mcpgateway.db import EmailApiToken, EmailUser, Permissions, TokenRevocation, TokenUsageLog, utc_now 

32from mcpgateway.services.logging_service import LoggingService 

33from mcpgateway.utils.create_jwt_token import create_jwt_token 

34 

35# Initialize logging 

36logging_service = LoggingService() 

37logger = logging_service.get_logger(__name__) 

38 

39# Strong references to background tasks to prevent GC before completion 

40_background_tasks: set[asyncio.Task] = set() 

41 

42 

class TokenScope:
    """Restrictions that narrow what an API token may be used for.

    A scope bundles an optional server binding with permission strings,
    IP restrictions, time restrictions and usage limits. Every collection
    attribute is normalised to an empty list/dict when a falsy value is given.

    Attributes:
        server_id (Optional[str]): Server the token is bound to, or None
        permissions (List[str]): Permission strings granted to the token
        ip_restrictions (List[str]): IP address/CIDR restrictions
        time_restrictions (dict): Time-based access limitations
        usage_limits (dict): Rate limiting and quota settings

    Examples:
        >>> scope = TokenScope(
        ...     server_id="prod-server-123",
        ...     permissions=["tools.read", "resources.read"],
        ...     ip_restrictions=["192.168.1.0/24"],
        ... )
        >>> scope.is_server_scoped()
        True
        >>> scope.has_permission("tools.read")
        True
        >>> scope.has_permission("tools.write")
        False
        >>> TokenScope().is_server_scoped()
        False
        >>> TokenScope().has_permission("anything")
        False
        >>> TokenScope(permissions=["*"]).has_permission("*")
        True
    """

    def __init__(
        self,
        server_id: Optional[str] = None,
        permissions: Optional[List[str]] = None,
        ip_restrictions: Optional[List[str]] = None,
        time_restrictions: Optional[dict] = None,
        usage_limits: Optional[dict] = None,
    ):
        """Store the scope settings, replacing falsy collections with empty ones.

        Args:
            server_id: Optional server ID the token is limited to
            permissions: Permission strings granted to the token
            ip_restrictions: IP addresses/ranges allowed to use the token
            time_restrictions: Time-based access restrictions
            usage_limits: Usage limits for the token
        """
        self.server_id = server_id
        # A truthy argument is stored as-is (same object); anything falsy becomes a fresh empty container.
        self.permissions = permissions if permissions else []
        self.ip_restrictions = ip_restrictions if ip_restrictions else []
        self.time_restrictions = time_restrictions if time_restrictions else {}
        self.usage_limits = usage_limits if usage_limits else {}

    def is_server_scoped(self) -> bool:
        """Tell whether the scope is bound to a server.

        Returns:
            bool: False when ``server_id`` is None, True otherwise.
        """
        if self.server_id is None:
            return False
        return True

    def has_permission(self, permission: str) -> bool:
        """Tell whether ``permission`` is listed verbatim in the scope.

        This is a plain membership test; wildcard entries are not expanded.

        Args:
            permission: Permission string to look up.

        Returns:
            bool: True if the exact string is in ``permissions``.
        """
        granted = self.permissions
        return permission in granted

    def to_dict(self) -> dict:
        """Serialise the scope to a plain dictionary for JSON storage.

        Returns:
            dict: Mapping with the keys ``server_id``, ``permissions``,
            ``ip_restrictions``, ``time_restrictions`` and ``usage_limits``.

        Examples:
            >>> result = TokenScope(server_id="server-123", permissions=["read", "write"]).to_dict()
            >>> result["server_id"]
            'server-123'
            >>> result["permissions"]
            ['read', 'write']
        """
        field_names = ("server_id", "permissions", "ip_restrictions", "time_restrictions", "usage_limits")
        return {field: getattr(self, field) for field in field_names}

    @classmethod
    def from_dict(cls, data: dict) -> "TokenScope":
        """Build a TokenScope from a dictionary; missing keys fall back to defaults.

        Args:
            data: Dictionary containing scope configuration.

        Returns:
            TokenScope: New TokenScope instance.

        Examples:
            >>> scope = TokenScope.from_dict({"server_id": "server-456", "permissions": ["tools.read"]})
            >>> scope.server_id
            'server-456'
            >>> scope.has_permission("tools.read")
            True
            >>> TokenScope.from_dict({}).permissions
            []
        """
        init_kwargs = {
            "server_id": data.get("server_id"),
            "permissions": data.get("permissions", []),
            "ip_restrictions": data.get("ip_restrictions", []),
            "time_restrictions": data.get("time_restrictions", {}),
            "usage_limits": data.get("usage_limits", {}),
        }
        return cls(**init_kwargs)

185 

186 

187class TokenCatalogService: 

188 """Service for managing user API token catalogs. 

189 

190 This service provides comprehensive token lifecycle management including 

191 creation, scoping, revocation, usage tracking, and analytics. It handles 

192 JWT-based API tokens with fine-grained access control, team support, 

193 and comprehensive audit logging. 

194 

195 Key features: 

196 - Token creation with customizable scopes and permissions 

197 - Team-based token management with role-based access 

198 - Token revocation and blacklisting 

199 - Usage tracking and analytics 

200 - IP and time-based restrictions 

201 - Automatic cleanup of expired tokens 

202 

203 Attributes: 

204 db (Session): SQLAlchemy database session for token operations 

205 

206 Examples: 

207 >>> from mcpgateway.services.token_catalog_service import TokenCatalogService 

208 >>> service = TokenCatalogService(None) # Mock database for doctest 

209 >>> service.db is None 

210 True 

211 """ 

212 

def __init__(self, db: Session) -> None:
    """Bind the service to a database session.

    Args:
        db: SQLAlchemy database session, stored on ``self.db`` and used by
            every token operation of this service.
    """
    self.db = db

220 

221 async def _generate_token( 

222 self, user_email: str, jti: str, team_id: Optional[str] = None, expires_at: Optional[datetime] = None, scope: Optional["TokenScope"] = None, user: Optional[object] = None 

223 ) -> str: 

224 """Generate a JWT token for API access. 

225 

226 This internal method creates a properly formatted JWT token with all 

227 necessary claims including user identity, scopes, team membership, 

228 and expiration. The token follows ContextForge JWT structure. 

229 

230 Args: 

231 user_email: User's email address for the token subject 

232 jti: JWT ID for token uniqueness 

233 team_id: Optional team ID for team-scoped tokens 

234 expires_at: Optional expiration datetime 

235 scope: Optional token scope information for access control 

236 user: Optional user object to extract admin privileges 

237 

238 Returns: 

239 str: Signed JWT token string ready for API authentication 

240 

241 Raises: 

242 ValueError: If expires_at is in the past (cannot create already-expired tokens) 

243 

244 Note: 

245 This is an internal method. Use create_token() to generate 

246 tokens with proper database tracking and validation. 

247 """ 

248 # Calculate expiration in minutes from expires_at 

249 expires_in_minutes = 0 

250 if expires_at: 

251 now = datetime.now(timezone.utc) 

252 delta = expires_at - now 

253 delta_seconds = delta.total_seconds() 

254 

255 # Guard: reject already-expired expiration times 

256 if delta_seconds <= 0: 

257 raise ValueError("Token expiration time is in the past. Cannot create already-expired tokens.") 

258 

259 # Use ceiling to ensure we always have at least 1 minute expiration 

260 # This prevents <60s from rounding to 0 and creating non-expiring tokens 

261 expires_in_minutes = max(1, math.ceil(delta_seconds / 60)) 

262 

263 # Build user data dict 

264 user_data = { 

265 "email": user_email, 

266 "full_name": "API Token User", 

267 "is_admin": user.is_admin if user else False, 

268 "auth_provider": "api_token", 

269 } 

270 

271 # Build teams list 

272 teams = [team_id] if team_id else [] 

273 

274 # Build scopes dict 

275 # Empty permissions = defer to RBAC at runtime (not wildcard access) 

276 scopes_dict = None 

277 if scope: 

278 scopes_dict = { 

279 "server_id": scope.server_id, 

280 "permissions": scope.permissions if scope.permissions is not None else [], 

281 "ip_restrictions": scope.ip_restrictions or [], 

282 "time_restrictions": scope.time_restrictions or {}, 

283 } 

284 else: 

285 scopes_dict = { 

286 "server_id": None, 

287 "permissions": [], # Empty = inherit from RBAC at runtime 

288 "ip_restrictions": [], 

289 "time_restrictions": {}, 

290 } 

291 

292 # Auto-inject servers.use for tokens with explicit MCP-related permissions. 

293 # Without servers.use, the token scoping middleware blocks /rpc and /mcp 

294 # transport access, making MCP-method permissions useless. 

295 permissions = scopes_dict["permissions"] 

296 if permissions and "*" not in permissions and "servers.use" not in permissions: 

297 if any(p.startswith(Permissions.MCP_METHOD_PREFIXES) for p in permissions): 

298 scopes_dict["permissions"] = [*permissions, "servers.use"] 

299 

300 # Generate JWT token using the centralized token creation utility 

301 # Pass structured data to the enhanced create_jwt_token function 

302 return await create_jwt_token( 

303 data={"sub": user_email, "jti": jti, "token_use": "api"}, # nosec B105 - token type marker, not a password 

304 expires_in_minutes=expires_in_minutes, 

305 user_data=user_data, 

306 teams=teams, 

307 scopes=scopes_dict, 

308 ) 

309 

310 def _hash_token(self, token: str) -> str: 

311 """Create secure hash of token for storage. 

312 

313 Args: 

314 token: Raw token string 

315 

316 Returns: 

317 str: SHA-256 hash of token 

318 

319 Examples: 

320 >>> service = TokenCatalogService(None) 

321 >>> hash_val = service._hash_token("test_token") 

322 >>> len(hash_val) == 64 

323 True 

324 """ 

325 return hashlib.sha256(token.encode()).hexdigest() 

326 

327 def _validate_scope_containment( 

328 self, 

329 requested_permissions: Optional[List[str]], 

330 caller_permissions: Optional[List[str]], 

331 ) -> None: 

332 """Validate that requested permissions don't exceed caller's permissions. 

333 

334 SECURITY: This is fail-secure. If caller_permissions is empty/None, 

335 custom scopes are DENIED. Users without explicit permissions can only 

336 create tokens with empty scope (inherit at runtime). 

337 

338 Args: 

339 requested_permissions: Permissions requested for new/updated token 

340 caller_permissions: Caller's effective permissions (RBAC + current token scopes) 

341 

342 Raises: 

343 ValueError: If requested permissions exceed caller's permissions 

344 """ 

345 # No requested permissions = empty scope, always allowed 

346 if not requested_permissions: 

347 return 

348 

349 # FAIL-SECURE: If caller has no permissions, deny any custom scope 

350 if not caller_permissions: 

351 raise ValueError("Cannot specify custom token permissions. " + "You have no explicit permissions to delegate. " + "Create a token without scope to inherit permissions at runtime.") 

352 

353 # Wildcard caller can grant anything 

354 if "*" in caller_permissions: 

355 return 

356 

357 # Wildcard request requires wildcard caller 

358 if "*" in requested_permissions: 

359 raise ValueError("Cannot create token with wildcard permissions. " + "Your effective permissions do not include wildcard access.") 

360 

361 # Check each requested permission 

362 for req_perm in requested_permissions: 

363 if req_perm in caller_permissions: 

364 continue 

365 

366 # Check for category wildcard (e.g., "tools.*" allows "tools.read") 

367 if "." in req_perm: 

368 category = req_perm.split(".")[0] 

369 if f"{category}.*" in caller_permissions: 

370 continue 

371 

372 raise ValueError(f"Cannot grant permission '{req_perm}' - not in your effective permissions.") 

373 

async def create_token(
    self,
    user_email: str,
    name: str,
    description: Optional[str] = None,
    scope: Optional[TokenScope] = None,
    expires_in_days: Optional[int] = None,
    tags: Optional[List[str]] = None,
    team_id: Optional[str] = None,
    caller_permissions: Optional[List[str]] = None,
    is_active: bool = True,
) -> tuple[EmailApiToken, str]:
    """
    Create an API token, persist its record and return it with the raw JWT.

    Steps, in order:
        - Look up the user.
        - Check that the requested permissions do not exceed the caller's.
        - When ``team_id`` is given, check the team exists and the user is an active member.
        - Reject a name already used by an active token of the same user in the same team bucket.
        - Compute the expiration and enforce the server's expiration policy.
        - Generate the JWT, hash it, and store the record (only the hash is persisted).

    Args:
        user_email (str): Email address of the user the token belongs to.
        name (str): Human-readable token name, unique per user and team bucket.
        description (Optional[str]): Free-text description (default is None).
        scope (Optional[TokenScope]): Scoping configuration: permissions, server ID,
            IP/time restrictions and usage limits (default is None).
        expires_in_days (Optional[int]): Lifetime in days; a falsy value means no expiration.
        tags (Optional[List[str]]): Organizational tags (stored as an empty list when omitted).
        team_id (Optional[str]): Team the token is scoped to; when omitted the token
            belongs to the global (no-team) bucket.
        caller_permissions (Optional[List[str]]): Effective permissions of the caller,
            used for scope containment validation.
        is_active (bool): Whether the token is created as active (default is True).

    Returns:
        tuple[EmailApiToken, str]: The stored ``EmailApiToken`` record and the raw JWT string.

    Raises:
        ValueError: If the user does not exist, the requested permissions exceed the
            caller's, the team does not exist, the user is not an active member of the
            team, an active token with the same name exists in the same bucket, or
            expiration is required by server policy but not provided.

    Examples:
        >>> # This method requires database operations, shown for reference
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # token, raw_token = await service.create_token(...)
        >>> # Returns (EmailApiToken, raw_token_string) tuple
    """
    # The token owner must exist; the row is also needed later for the admin flag.
    owner = self.db.execute(select(EmailUser).where(EmailUser.email == user_email)).scalar_one_or_none()
    if not owner:
        raise ValueError(f"User not found: {user_email}")

    # Scope containment (fail-secure when caller_permissions is empty).
    if scope and scope.permissions:
        self._validate_scope_containment(scope.permissions, caller_permissions)

    if team_id:
        # First-Party
        from mcpgateway.db import EmailTeam, EmailTeamMember  # pylint: disable=import-outside-toplevel

        team_row = self.db.execute(select(EmailTeam).where(EmailTeam.id == team_id)).scalar_one_or_none()
        if not team_row:
            raise ValueError(f"Team not found: {team_id}")

        membership_query = select(EmailTeamMember).where(and_(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)))
        if not self.db.execute(membership_query).scalar_one_or_none():
            raise ValueError(f"User {user_email} is not an active member of team {team_id}. Only team members can create tokens for the team.")

    # Name uniqueness follows DB constraint uq_email_api_tokens_user_name_team
    # (user_email, name, team_id); tokens without a team share the global bucket.
    team_bucket = (EmailApiToken.team_id == team_id) if team_id else EmailApiToken.team_id.is_(None)
    duplicate_query = select(EmailApiToken).where(and_(EmailApiToken.user_email == user_email, EmailApiToken.name == name, team_bucket, EmailApiToken.is_active.is_(True)))
    if self.db.execute(duplicate_query).scalar_one_or_none():
        scope_label = f"team '{team_id}'" if team_id else "the global scope (no team)"
        raise ValueError(f"Token with name '{name}' already exists for user {user_email} in {scope_label}. Token names must be unique per user per team. Please choose a different name.")

    expires_at = (utc_now() + timedelta(days=expires_in_days)) if expires_in_days else None

    if settings.require_token_expiration and not expires_at:
        raise ValueError("Token expiration is required by server policy (REQUIRE_TOKEN_EXPIRATION=true). Please specify an expiration date for the token.")

    jti = str(uuid.uuid4())  # Unique JWT ID
    # The user row is passed along so the admin flag ends up in the claims.
    raw_token = await self._generate_token(user_email=user_email, jti=jti, team_id=team_id, expires_at=expires_at, scope=scope, user=owner)

    # Only the hash is persisted, never the raw token.
    token_hash = self._hash_token(raw_token)

    if scope:
        scope_columns = {
            "server_id": scope.server_id,
            "resource_scopes": scope.permissions,
            "ip_restrictions": scope.ip_restrictions,
            "time_restrictions": scope.time_restrictions,
            "usage_limits": scope.usage_limits,
        }
    else:
        scope_columns = {
            "server_id": None,
            "resource_scopes": [],
            "ip_restrictions": [],
            "time_restrictions": {},
            "usage_limits": {},
        }

    api_token = EmailApiToken(
        id=str(uuid.uuid4()),
        user_email=user_email,
        team_id=team_id,
        name=name,
        jti=jti,
        description=description,
        token_hash=token_hash,
        expires_at=expires_at,
        tags=tags or [],
        is_active=is_active,
        created_at=utc_now(),
        last_used=None,
        **scope_columns,
    )

    self.db.add(api_token)
    self.db.commit()
    self.db.refresh(api_token)

    token_type = f"team-scoped (team: {team_id})" if team_id else "public-only"
    logger.info(f"Created {token_type} API token '{name}' for user {user_email}. Token ID: {api_token.id}, Expires: {expires_at or 'Never'}")
    return api_token, raw_token

523 

async def count_user_tokens(self, user_email: str, include_inactive: bool = False) -> int:
    """Count the API tokens owned by a user.

    Unless ``include_inactive`` is set, only tokens that are active and
    either never expire or expire in the future are counted.

    Args:
        user_email: User's email address
        include_inactive: Include inactive/expired tokens

    Returns:
        int: Number of matching tokens (0 when the query yields nothing)
    """
    # pylint: disable=not-callable
    stmt = select(func.count(EmailApiToken.id)).where(EmailApiToken.user_email == user_email)

    if not include_inactive:
        not_expired = or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())
        stmt = stmt.where(and_(EmailApiToken.is_active.is_(True), not_expired))

    total = self.db.execute(stmt).scalar()
    return total or 0

542 

async def get_user_team_ids(self, user_email: str) -> List[str]:
    """Return the IDs of the teams a user belongs to.

    Delegates to ``TeamManagementService.get_user_teams`` and collects the
    ``id`` of each returned team.

    Args:
        user_email: User's email address

    Returns:
        List[str]: Team IDs the user belongs to
    """
    # First-Party
    from mcpgateway.services.team_management_service import TeamManagementService  # pylint: disable=import-outside-toplevel

    service = TeamManagementService(self.db)
    return [membership.id for membership in await service.get_user_teams(user_email)]

561 

async def count_user_and_team_tokens(self, user_email: str, include_inactive: bool = False) -> int:
    """Count a user's own tokens together with the tokens of the user's teams.

    A token is counted when it was created by the user or when its team is
    one of the teams returned by ``get_user_team_ids``.

    Args:
        user_email: User's email address
        include_inactive: Include inactive/expired tokens

    Returns:
        int: Number of matching tokens (0 when the query yields nothing)
    """
    member_team_ids = await self.get_user_team_ids(user_email)

    # Own tokens always qualify; team tokens only when the user has teams.
    visibility = [EmailApiToken.user_email == user_email]
    if member_team_ids:
        visibility += [EmailApiToken.team_id.in_(member_team_ids)]

    # pylint: disable=not-callable
    stmt = select(func.count(EmailApiToken.id)).where(or_(*visibility))

    if not include_inactive:
        not_expired = or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())
        stmt = stmt.where(and_(EmailApiToken.is_active.is_(True), not_expired))

    total = self.db.execute(stmt).scalar()
    return total or 0

590 

async def count_team_tokens(self, team_id: str, include_inactive: bool = False) -> int:
    """Count the API tokens that belong to a team.

    Unless ``include_inactive`` is set, only tokens that are active and
    either never expire or expire in the future are counted.

    Args:
        team_id: Team ID to count tokens for
        include_inactive: Include inactive/expired tokens

    Returns:
        int: Number of matching tokens (0 when the query yields nothing)
    """
    # pylint: disable=not-callable
    stmt = select(func.count(EmailApiToken.id)).where(EmailApiToken.team_id == team_id)

    if not include_inactive:
        not_expired = or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())
        stmt = stmt.where(and_(EmailApiToken.is_active.is_(True), not_expired))

    total = self.db.execute(stmt).scalar()
    return total or 0

609 

async def list_user_tokens(self, user_email: str, include_inactive: bool = False, limit: int = 100, offset: int = 0) -> List[EmailApiToken]:
    """List the API tokens owned by a user, newest first.

    Args:
        user_email: User's email address
        include_inactive: Include inactive/expired tokens
        limit: Maximum tokens to return; values outside 1..1000 are replaced by 50
        offset: Number of tokens to skip; negative values are treated as 0

    Returns:
        List[EmailApiToken]: User's API tokens ordered by creation time, descending

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns List[EmailApiToken] for user
    """
    # Sanitise paging input instead of rejecting it.
    if not 0 < limit <= 1000:
        limit = 50
    if offset < 0:
        offset = 0

    stmt = select(EmailApiToken).where(EmailApiToken.user_email == user_email)

    if not include_inactive:
        not_expired = or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())
        stmt = stmt.where(and_(EmailApiToken.is_active.is_(True), not_expired))

    newest_first = EmailApiToken.created_at.desc()
    stmt = stmt.order_by(newest_first).limit(limit).offset(offset)

    rows = self.db.execute(stmt)
    return rows.scalars().all()

639 

async def list_team_tokens(self, team_id: str, user_email: str, include_inactive: bool = False, limit: int = 100, offset: int = 0) -> List[EmailApiToken]:
    """List the API tokens of a team on behalf of one of its members, newest first.

    The caller must belong to the team: ``team_id`` has to be among the IDs
    returned by ``get_user_team_ids`` for ``user_email``.

    Args:
        team_id: Team ID to list tokens for
        user_email: Email of the requesting user (must be a member of the team)
        include_inactive: Include inactive/expired tokens
        limit: Maximum tokens to return; values outside 1..1000 are replaced by 50
        offset: Number of tokens to skip; negative values are treated as 0

    Returns:
        List[EmailApiToken]: Team's API tokens ordered by creation time, descending

    Raises:
        ValueError: If the user is not a member of the team
    """
    member_team_ids = await self.get_user_team_ids(user_email)
    if team_id not in member_team_ids:
        raise ValueError(f"User {user_email} is not an active member of team {team_id}")

    # Sanitise paging input instead of rejecting it.
    if not 0 < limit <= 1000:
        limit = 50
    if offset < 0:
        offset = 0

    stmt = select(EmailApiToken).where(EmailApiToken.team_id == team_id)

    if not include_inactive:
        not_expired = or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())
        stmt = stmt.where(and_(EmailApiToken.is_active.is_(True), not_expired))

    newest_first = EmailApiToken.created_at.desc()
    stmt = stmt.order_by(newest_first).limit(limit).offset(offset)

    rows = self.db.execute(stmt)
    return rows.scalars().all()

674 

async def list_user_and_team_tokens(self, user_email: str, include_inactive: bool = False, limit: int = 100, offset: int = 0) -> List[EmailApiToken]:
    """List a user's own tokens together with the tokens of the user's teams, newest first.

    A token is included when it was created by the user or when its team is
    one of the teams returned by ``get_user_team_ids``.

    Args:
        user_email: User's email address
        include_inactive: Include inactive/expired tokens
        limit: Maximum tokens to return; values outside 1..1000 are replaced by 50
        offset: Number of tokens to skip; negative values are treated as 0

    Returns:
        List[EmailApiToken]: Personal and team tokens ordered by creation time, descending

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns List[EmailApiToken] including personal and team tokens
    """
    # Sanitise paging input instead of rejecting it.
    if not 0 < limit <= 1000:
        limit = 50
    if offset < 0:
        offset = 0

    member_team_ids = await self.get_user_team_ids(user_email)

    # Own tokens always qualify; team tokens only when the user has teams.
    visibility = [EmailApiToken.user_email == user_email]
    if member_team_ids:
        visibility += [EmailApiToken.team_id.in_(member_team_ids)]

    stmt = select(EmailApiToken).where(or_(*visibility))

    if not include_inactive:
        not_expired = or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())
        stmt = stmt.where(and_(EmailApiToken.is_active.is_(True), not_expired))

    newest_first = EmailApiToken.created_at.desc()
    stmt = stmt.order_by(newest_first).limit(limit).offset(offset)

    rows = self.db.execute(stmt)
    return rows.scalars().all()

715 

async def get_token(self, token_id: str, user_email: Optional[str] = None) -> Optional[EmailApiToken]:
    """Fetch a single token by ID, optionally restricted to one owner.

    Args:
        token_id: Token ID
        user_email: When given (and non-empty), the token must also belong to
            this user; otherwise the lookup is by ID only

    Returns:
        Optional[EmailApiToken]: The matching token, or None if nothing matches

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns Optional[EmailApiToken] if found and authorized
    """
    criteria = [EmailApiToken.id == token_id]
    if user_email:
        # Ownership filter: hides tokens that belong to somebody else.
        criteria.append(EmailApiToken.user_email == user_email)

    stmt = select(EmailApiToken).where(*criteria)
    return self.db.execute(stmt).scalar_one_or_none()

737 

async def update_token(
    self,
    token_id: str,
    user_email: str,
    name: Optional[str] = None,
    description: Optional[str] = None,
    scope: Optional[TokenScope] = None,
    tags: Optional[List[str]] = None,
    caller_permissions: Optional[List[str]] = None,
    is_active: Optional[bool] = None,
) -> Optional[EmailApiToken]:
    """Update an existing token with scope containment validation.

    Only the fields that are supplied are changed. A new name must be unique
    among the owner's active tokens within the token's own team bucket
    (same team, or the global no-team bucket), which is the rule
    ``create_token`` applies and matches the
    ``uq_email_api_tokens_user_name_team`` constraint
    (user_email, name, team_id).

    Args:
        token_id: Token ID to update
        user_email: Owner's email; the token must belong to this user
        name: New token name (ignored when empty or unchanged)
        description: New description (None leaves it unchanged)
        scope: New scoping configuration; when given, all scope fields are replaced
        tags: New tags (None leaves them unchanged)
        caller_permissions: Caller's effective permissions for scope containment
        is_active: New token active status (None leaves it unchanged)

    Returns:
        Optional[EmailApiToken]: The refreshed token record

    Raises:
        ValueError: If the token is not found for this user, the requested
            permissions exceed the caller's, or the new name is already used
            by another active token in the same team bucket

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns Optional[EmailApiToken] if updated successfully
    """
    token = await self.get_token(token_id, user_email)
    if not token:
        raise ValueError("Token not found or not authorized")

    # Validate scope containment for scope changes
    if scope and scope.permissions:
        self._validate_scope_containment(scope.permissions, caller_permissions)

    # Check for duplicate name if changing
    if name and name != token.name:
        # Restrict the check to the token's own team bucket. Without the team
        # filter a name used only in another team would block the rename, and
        # a name held in two buckets would make scalar_one_or_none() raise
        # MultipleResultsFound instead of a clean ValueError.
        if token.team_id:
            team_bucket = EmailApiToken.team_id == token.team_id
        else:
            team_bucket = EmailApiToken.team_id.is_(None)

        existing = self.db.execute(
            select(EmailApiToken).where(
                and_(
                    EmailApiToken.user_email == user_email,
                    EmailApiToken.name == name,
                    team_bucket,
                    EmailApiToken.id != token_id,
                    EmailApiToken.is_active.is_(True),
                )
            )
        ).scalar_one_or_none()

        if existing:
            raise ValueError(f"Token name '{name}' already exists")

        token.name = name

    if description is not None:
        token.description = description

    if tags is not None:
        token.tags = tags

    if is_active is not None:
        token.is_active = is_active

    if scope:
        # A supplied scope replaces every scope column, including ones left at their defaults.
        token.server_id = scope.server_id
        token.resource_scopes = scope.permissions
        token.ip_restrictions = scope.ip_restrictions
        token.time_restrictions = scope.time_restrictions
        token.usage_limits = scope.usage_limits

    self.db.commit()
    self.db.refresh(token)

    logger.info(f"Updated token '{token.name}' for user {user_email}")

    return token

812 

async def revoke_token(self, token_id: str, user_email: str, revoked_by: str, reason: Optional[str] = None) -> bool:
    """Revoke a token owned by the caller, or a team token when the caller owns the team.

    The token is first looked up with the caller's email. If that lookup
    finds nothing, the token is looked up by ID alone and the revocation is
    only allowed when the token has a ``team_id`` and the caller's role in
    that team is ``"owner"``.

    Args:
        token_id: Token ID to revoke
        user_email: Caller's email - must own the token or be an owner of the token's team
        revoked_by: Email of user performing revocation (for audit)
        reason: Optional reason for revocation

    Returns:
        bool: True if token was revoked, False if not found or not authorized

    Raises:
        Exception: Any error raised while persisting the revocation is
            re-raised after the database session has been rolled back.

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns bool: True if token was revoked successfully
    """
    # First try ownership match
    token = await self.get_token(token_id, user_email)

    # If not owned by caller, check if token is in a team the caller is an owner of
    if not token:
        token = await self.get_token(token_id)
        if not token or not token.team_id:
            return False
        # Only team owners (admins) can revoke other members' team tokens
        # First-Party
        from mcpgateway.services.team_management_service import TeamManagementService  # pylint: disable=import-outside-toplevel

        team_service = TeamManagementService(self.db)
        role = await team_service.get_user_role_in_team(user_email, token.team_id)
        if role != "owner":
            return False

    try:
        # Mark token as inactive
        token.is_active = False

        # Add to blacklist
        revocation = TokenRevocation(jti=token.jti, revoked_by=revoked_by, reason=reason)

        self.db.add(revocation)
        self.db.commit()
    except Exception:
        # A failed flush/commit leaves the session unusable until it is rolled
        # back; reset it so later work on the same session is not poisoned,
        # then let the original error propagate unchanged.
        self.db.rollback()
        raise

    # Invalidate auth cache synchronously so revoked tokens are rejected immediately
    # (fire-and-forget via create_task risks a race where the next request arrives
    # before the invalidation task runs, allowing the revoked token through).
    try:
        # First-Party
        from mcpgateway.cache.auth_cache import auth_cache  # pylint: disable=import-outside-toplevel

        await auth_cache.invalidate_revocation(token.jti)
    except Exception as cache_error:
        # Best effort: the revocation is already committed at this point.
        logger.debug(f"Failed to invalidate auth cache for revoked token: {cache_error}")

    logger.info(f"Revoked token '{token.name}' (JTI: {token.jti}) by {revoked_by}")

    return True

869 

async def admin_revoke_token(self, token_id: str, revoked_by: str, reason: Optional[str] = None) -> bool:
    """Admin-only: Revoke any token without ownership check.

    WARNING: This method bypasses ownership verification.
    Only call from admin-authenticated endpoints.

    Args:
        token_id: Token ID to revoke
        revoked_by: Admin email for audit
        reason: Revocation reason

    Returns:
        bool: True if token was revoked, False if not found

    Raises:
        Exception: Any error raised while persisting the revocation is
            re-raised after the database session has been rolled back.

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns bool: True if token was revoked successfully
    """
    # No user filter - admin can revoke any token
    token = await self.get_token(token_id)
    if not token:
        return False

    try:
        # Deactivate the token and record its JTI as revoked in one transaction.
        token.is_active = False
        revocation = TokenRevocation(jti=token.jti, revoked_by=revoked_by, reason=reason)
        self.db.add(revocation)
        self.db.commit()
    except Exception:
        # A failed flush/commit leaves the session unusable until it is rolled
        # back; reset it, then let the original error propagate unchanged.
        self.db.rollback()
        raise

    # Drop any cached auth state for this JTI; failure here is best effort
    # because the revocation itself is already committed.
    try:
        # First-Party
        from mcpgateway.cache.auth_cache import auth_cache  # pylint: disable=import-outside-toplevel

        await auth_cache.invalidate_revocation(token.jti)
    except Exception as cache_error:
        logger.debug(f"Failed to invalidate auth cache: {cache_error}")

    logger.info(f"Admin revoked token '{token.name}' (JTI: {token.jti}) by {revoked_by}")
    return True

908 

async def is_token_revoked(self, jti: str) -> bool:
    """Report whether a revocation record exists for the given JTI.

    Args:
        jti: JWT ID to check

    Returns:
        bool: True if token is revoked

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns bool: True if token is revoked
    """
    lookup = select(TokenRevocation).where(TokenRevocation.jti == jti)
    record = self.db.execute(lookup).scalar_one_or_none()
    if record is None:
        return False
    return True

925 

async def log_token_usage(
    self,
    jti: str,
    user_email: str,
    endpoint: Optional[str] = None,
    method: Optional[str] = None,
    ip_address: Optional[str] = None,
    user_agent: Optional[str] = None,
    status_code: Optional[int] = None,
    response_time_ms: Optional[int] = None,
    blocked: bool = False,
    block_reason: Optional[str] = None,
) -> None:
    """Log token usage for analytics and security.

    Creates one ``TokenUsageLog`` row from the supplied request details and
    commits it immediately.

    Args:
        jti: JWT ID of token used
        user_email: Token owner's email
        endpoint: API endpoint accessed
        method: HTTP method
        ip_address: Client IP address
        user_agent: Client user agent
        status_code: HTTP response status
        response_time_ms: Response time in milliseconds
        blocked: Whether request was blocked
        block_reason: Reason for blocking

    Raises:
        Exception: Any error raised while persisting the log entry is
            re-raised after the database session has been rolled back.

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Logs token usage for analytics - no return value
    """
    usage_log = TokenUsageLog(
        token_jti=jti,
        user_email=user_email,
        endpoint=endpoint,
        method=method,
        ip_address=ip_address,
        user_agent=user_agent,
        status_code=status_code,
        response_time_ms=response_time_ms,
        blocked=blocked,
        block_reason=block_reason,
    )

    try:
        self.db.add(usage_log)
        self.db.commit()
    except Exception:
        # A failed flush/commit leaves the session unusable until it is rolled
        # back; reset it so a logging failure cannot poison later work on the
        # same session, then let the original error propagate unchanged.
        self.db.rollback()
        raise

972 

async def get_token_usage_stats(self, user_email: str, token_id: Optional[str] = None, days: int = 30) -> dict:
    """Collect usage statistics for a user's tokens over a trailing window.

    When ``token_id`` is given and resolves to a token for ``user_email``,
    the statistics are limited to that token's JTI. When it is omitted, or
    the lookup returns nothing, no JTI filter is applied and the statistics
    cover every usage row recorded for the user.

    Args:
        user_email: User's email address
        token_id: Optional specific token ID
        days: Number of days to analyze

    Returns:
        dict: Usage statistics

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns dict with usage statistics
    """
    window_start = utc_now() - timedelta(days=days)

    # Resolve the optional token ID to its JTI, which is what usage rows store
    jti_filter = None
    if token_id:
        owned_token = await self.get_token(token_id, user_email)
        if owned_token:
            jti_filter = owned_token.jti

    # SQL aggregation on PostgreSQL; every other dialect takes the Python path
    if self.db.get_bind().dialect.name == "postgresql":
        compute_stats = self._get_usage_stats_postgresql
    else:
        compute_stats = self._get_usage_stats_python
    return await compute_stats(user_email, window_start, jti_filter, days)

1002 

async def _get_usage_stats_postgresql(self, user_email: str, start_date: datetime, token_jti: Optional[str], days: int) -> dict:
    """Aggregate usage statistics in the database (PostgreSQL path).

    Issues two queries: one aggregate row with the counters and the average
    response time, and one GROUP BY query for the five most-hit endpoints.

    Args:
        user_email: User's email address
        start_date: Start date for analysis
        token_jti: Optional token JTI filter
        days: Number of days being analyzed

    Returns:
        dict: Usage statistics computed via SQL
    """
    # Row selection shared by both queries
    criteria = [TokenUsageLog.user_email == user_email, TokenUsageLog.timestamp >= start_date]
    if token_jti:
        criteria.append(TokenUsageLog.token_jti == token_jti)
    row_scope = and_(*criteria)

    # Predicates chosen to mirror the truthiness checks of the Python fallback:
    # - a success needs a non-null, non-zero status code below 400
    # - a response time only counts toward the average when non-null and non-zero
    is_success = and_(TokenUsageLog.status_code.isnot(None), TokenUsageLog.status_code > 0, TokenUsageLog.status_code < 400)
    has_timing = and_(TokenUsageLog.response_time_ms.isnot(None), TokenUsageLog.response_time_ms > 0)

    totals_stmt = (
        select(
            func.count().label("total"),  # pylint: disable=not-callable
            func.sum(case((is_success, 1), else_=0)).label("successful"),
            func.sum(case((TokenUsageLog.blocked.is_(True), 1), else_=0)).label("blocked"),
            # Rows without a usable timing become NULL, which AVG ignores
            func.avg(case((has_timing, TokenUsageLog.response_time_ms), else_=None)).label("avg_response"),
        )
        .select_from(TokenUsageLog)
        .where(row_scope)
    )
    totals = self.db.execute(totals_stmt).fetchone()

    # SUM/AVG yield NULL when no rows match, so normalise to zero
    total_requests = totals.total or 0
    successful_requests = totals.successful or 0
    blocked_requests = totals.blocked or 0
    raw_average = totals.avg_response
    avg_response_time = float(raw_average) if raw_average else 0.0

    # Five busiest endpoints; NULL and empty-string endpoints are excluded,
    # matching the "if log.endpoint" check of the Python fallback
    busiest_stmt = (
        select(TokenUsageLog.endpoint, func.count().label("count"))  # pylint: disable=not-callable
        .where(and_(row_scope, TokenUsageLog.endpoint.isnot(None), TokenUsageLog.endpoint != ""))
        .group_by(TokenUsageLog.endpoint)
        .order_by(func.count().desc())  # pylint: disable=not-callable
        .limit(5)
    )
    top_endpoints = [(row.endpoint, row.count) for row in self.db.execute(busiest_stmt).fetchall()]

    success_rate = successful_requests / total_requests if total_requests > 0 else 0
    return {
        "period_days": days,
        "total_requests": total_requests,
        "successful_requests": successful_requests,
        "blocked_requests": blocked_requests,
        "success_rate": success_rate,
        "average_response_time_ms": round(avg_response_time, 2),
        "top_endpoints": top_endpoints,
    }

1077 

async def _get_usage_stats_python(self, user_email: str, start_date: datetime, token_jti: Optional[str], days: int) -> dict:
    """Aggregate usage statistics in Python (fallback for SQLite).

    Loads every matching usage row and derives the counters, the average
    response time and the five most-hit endpoints in a single pass.

    Args:
        user_email: User's email address
        start_date: Start date for analysis
        token_jti: Optional token JTI filter
        days: Number of days being analyzed

    Returns:
        dict: Usage statistics computed in Python
    """
    stmt = select(TokenUsageLog).where(and_(TokenUsageLog.user_email == user_email, TokenUsageLog.timestamp >= start_date))
    if token_jti:
        stmt = stmt.where(TokenUsageLog.token_jti == token_jti)

    entries = self.db.execute(stmt).scalars().all()

    successful_requests = 0
    blocked_requests = 0
    timings = []
    hits_by_endpoint: dict = {}
    for entry in entries:
        # Truthiness checks: a missing or zero status code is not a success
        if entry.status_code and entry.status_code < 400:
            successful_requests += 1
        if entry.blocked:
            blocked_requests += 1
        # Missing or zero response times are left out of the average
        if entry.response_time_ms:
            timings.append(entry.response_time_ms)
        # Missing or empty endpoints are not ranked
        if entry.endpoint:
            hits_by_endpoint[entry.endpoint] = hits_by_endpoint.get(entry.endpoint, 0) + 1

    total_requests = len(entries)
    avg_response_time = sum(timings) / len(timings) if timings else 0

    # Stable sort, highest count first; ties keep first-seen order
    ranked = sorted(hits_by_endpoint.items(), key=lambda pair: -pair[1])
    top_endpoints = ranked[:5]

    success_rate = successful_requests / total_requests if total_requests > 0 else 0
    return {
        "period_days": days,
        "total_requests": total_requests,
        "successful_requests": successful_requests,
        "blocked_requests": blocked_requests,
        "success_rate": success_rate,
        "average_response_time_ms": round(avg_response_time, 2),
        "top_endpoints": top_endpoints,
    }

1123 

async def get_token_revocation(self, jti: str) -> Optional[TokenRevocation]:
    """Fetch the revocation record stored for a JTI, if there is one.

    Args:
        jti: JWT token ID

    Returns:
        Optional[TokenRevocation]: Revocation info if token is revoked

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns Optional[TokenRevocation] if token is revoked
    """
    lookup = select(TokenRevocation).where(TokenRevocation.jti == jti)
    return self.db.execute(lookup).scalar_one_or_none()

1139 

async def get_token_revocations_batch(self, jtis: List[str]) -> Dict[str, TokenRevocation]:
    """Fetch revocation records for several JTIs with one query.

    An empty ``jtis`` list returns an empty dict without touching the database.

    Args:
        jtis: List of JWT token IDs

    Returns:
        Dict mapping JTI to TokenRevocation for revoked tokens only.
    """
    if not jtis:
        return {}

    lookup = select(TokenRevocation).where(TokenRevocation.jti.in_(jtis))
    revoked_by_jti = {}
    for record in self.db.execute(lookup).scalars().all():
        revoked_by_jti[record.jti] = record
    return revoked_by_jti

1153 

async def list_all_tokens(self, include_inactive: bool = False, limit: int = 100, offset: int = 0) -> List[EmailApiToken]:
    """List API tokens across all users, newest first (admin only).

    A ``limit`` outside 1..1000 is replaced by 50 and a negative ``offset``
    is treated as 0.

    Args:
        include_inactive: Include inactive/expired tokens
        limit: Maximum tokens to return
        offset: Number of tokens to skip

    Returns:
        List[EmailApiToken]: All API tokens
    """
    if not 0 < limit <= 1000:
        limit = 50
    if offset < 0:
        offset = 0

    stmt = select(EmailApiToken)
    if not include_inactive:
        # Keep only tokens that are active and have not passed their expiry
        not_expired = or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())
        stmt = stmt.where(and_(EmailApiToken.is_active.is_(True), not_expired))

    stmt = stmt.order_by(EmailApiToken.created_at.desc()).limit(limit).offset(offset)
    return self.db.execute(stmt).scalars().all()

1178 

async def count_all_tokens(self, include_inactive: bool = False) -> int:
    """Count API tokens across all users (admin only).

    Args:
        include_inactive: Include inactive/expired tokens in count

    Returns:
        int: Total count of all tokens
    """
    stmt = select(func.count(EmailApiToken.id))  # pylint: disable=not-callable
    if not include_inactive:
        # Count only tokens that are active and have not passed their expiry
        not_expired = or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now())
        stmt = stmt.where(and_(EmailApiToken.is_active.is_(True), not_expired))

    total = self.db.execute(stmt).scalar()
    return total or 0

1195 

async def cleanup_expired_tokens(self) -> int:
    """Deactivate every expired token that is still marked active.

    Runs one bulk UPDATE rather than loading the tokens and updating them
    one at a time, so memory use does not grow with the number of expired
    tokens. Any failure is logged, the session is rolled back, and 0 is
    returned instead of raising.

    Returns:
        int: Number of tokens cleaned up

    Examples:
        >>> service = TokenCatalogService(None)  # Would use real DB session
        >>> # Returns int: Number of tokens cleaned up
    """
    try:
        cutoff = utc_now()
        expired_but_active = self.db.query(EmailApiToken).filter(EmailApiToken.expires_at < cutoff, EmailApiToken.is_active.is_(True))
        # synchronize_session=False: objects already loaded in the session are not refreshed
        count = expired_but_active.update({"is_active": False}, synchronize_session=False)
        self.db.commit()

        if count > 0:
            logger.info(f"Cleaned up {count} expired tokens")
        return count
    except Exception as e:
        self.db.rollback()
        logger.error(f"Failed to cleanup expired tokens: {e}")
        return 0