Coverage for mcpgateway / services / token_catalog_service.py: 99%

253 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/token_catalog_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Token Catalog Service. 

8This module provides comprehensive API token management with scoping, 

9revocation, usage tracking, and analytics for email-based users. 

10 

11Examples: 

12 >>> from mcpgateway.services.token_catalog_service import TokenCatalogService 

13 >>> service = TokenCatalogService(None) # Mock database for doctest 

14 >>> # Service provides full token lifecycle management 

15""" 

16 

17# Standard 

18from datetime import datetime, timedelta, timezone 

19import hashlib 

20import math 

21from typing import List, Optional 

22import uuid 

23 

24# Third-Party 

25from sqlalchemy import and_, case, func, or_, select 

26from sqlalchemy.orm import Session 

27 

28# First-Party 

29from mcpgateway.db import EmailApiToken, EmailUser, TokenRevocation, TokenUsageLog, utc_now 

30from mcpgateway.services.logging_service import LoggingService 

31from mcpgateway.utils.create_jwt_token import create_jwt_token 

32 

33# Initialize logging 

34logging_service = LoggingService() 

35logger = logging_service.get_logger(__name__) 

36 

37 

38class TokenScope: 

39 """Token scoping configuration for fine-grained access control. 

40 

41 This class encapsulates token scoping parameters including 

42 server restrictions, permissions, IP limitations, and usage quotas. 

43 

44 Attributes: 

45 server_id (Optional[str]): Limit token to specific server 

46 permissions (List[str]): Specific permission scopes 

47 ip_restrictions (List[str]): IP address/CIDR restrictions 

48 time_restrictions (dict): Time-based access limitations 

49 usage_limits (dict): Rate limiting and quota settings 

50 

51 Examples: 

52 >>> scope = TokenScope( 

53 ... server_id="prod-server-123", 

54 ... permissions=["tools.read", "resources.read"], 

55 ... ip_restrictions=["192.168.1.0/24"], 

56 ... time_restrictions={"business_hours_only": True} 

57 ... ) 

58 >>> scope.is_server_scoped() 

59 True 

60 >>> scope.has_permission("tools.read") 

61 True 

62 >>> scope.has_permission("tools.write") 

63 False 

64 >>> scope.has_permission("resources.read") 

65 True 

66 >>> 

67 >>> # Test empty scope 

68 >>> empty_scope = TokenScope() 

69 >>> empty_scope.is_server_scoped() 

70 False 

71 >>> empty_scope.has_permission("anything") 

72 False 

73 >>> 

74 >>> # Test global scope 

75 >>> global_scope = TokenScope(permissions=["*"]) 

76 >>> global_scope.has_permission("*") 

77 True 

78 """ 

79 

80 def __init__( 

81 self, 

82 server_id: Optional[str] = None, 

83 permissions: Optional[List[str]] = None, 

84 ip_restrictions: Optional[List[str]] = None, 

85 time_restrictions: Optional[dict] = None, 

86 usage_limits: Optional[dict] = None, 

87 ): 

88 """Initialize TokenScope with specified restrictions and limits. 

89 

90 Args: 

91 server_id: Optional server ID to scope token to specific server 

92 permissions: List of permissions granted to this token 

93 ip_restrictions: List of IP addresses/ranges allowed to use token 

94 time_restrictions: Dictionary of time-based access restrictions 

95 usage_limits: Dictionary of usage limits for the token 

96 """ 

97 self.server_id = server_id 

98 self.permissions = permissions or [] 

99 self.ip_restrictions = ip_restrictions or [] 

100 self.time_restrictions = time_restrictions or {} 

101 self.usage_limits = usage_limits or {} 

102 

103 def is_server_scoped(self) -> bool: 

104 """Check if token is scoped to a specific server. 

105 

106 Returns: 

107 bool: True if scoped to a server, False otherwise. 

108 """ 

109 return self.server_id is not None 

110 

111 def has_permission(self, permission: str) -> bool: 

112 """Check if scope includes specific permission. 

113 

114 Args: 

115 permission: Permission string to check for. 

116 

117 Returns: 

118 bool: True if permission is included, False otherwise. 

119 """ 

120 return permission in self.permissions 

121 

122 def to_dict(self) -> dict: 

123 """Convert scope to dictionary for JSON storage. 

124 

125 Returns: 

126 dict: Dictionary representation of the token scope. 

127 

128 Examples: 

129 >>> scope = TokenScope(server_id="server-123", permissions=["read", "write"]) 

130 >>> result = scope.to_dict() 

131 >>> result["server_id"] 

132 'server-123' 

133 >>> result["permissions"] 

134 ['read', 'write'] 

135 >>> isinstance(result, dict) 

136 True 

137 """ 

138 return {"server_id": self.server_id, "permissions": self.permissions, "ip_restrictions": self.ip_restrictions, "time_restrictions": self.time_restrictions, "usage_limits": self.usage_limits} 

139 

140 @classmethod 

141 def from_dict(cls, data: dict) -> "TokenScope": 

142 """Create TokenScope from dictionary. 

143 

144 Args: 

145 data: Dictionary containing scope configuration. 

146 

147 Returns: 

148 TokenScope: New TokenScope instance. 

149 

150 Examples: 

151 >>> data = { 

152 ... "server_id": "server-456", 

153 ... "permissions": ["tools.read", "tools.execute"], 

154 ... "ip_restrictions": ["10.0.0.0/8"] 

155 ... } 

156 >>> scope = TokenScope.from_dict(data) 

157 >>> scope.server_id 

158 'server-456' 

159 >>> scope.permissions 

160 ['tools.read', 'tools.execute'] 

161 >>> scope.is_server_scoped() 

162 True 

163 >>> scope.has_permission("tools.read") 

164 True 

165 >>> 

166 >>> # Test empty dict 

167 >>> empty_scope = TokenScope.from_dict({}) 

168 >>> empty_scope.server_id is None 

169 True 

170 >>> empty_scope.permissions 

171 [] 

172 """ 

173 return cls( 

174 server_id=data.get("server_id"), 

175 permissions=data.get("permissions", []), 

176 ip_restrictions=data.get("ip_restrictions", []), 

177 time_restrictions=data.get("time_restrictions", {}), 

178 usage_limits=data.get("usage_limits", {}), 

179 ) 

180 

181 

182class TokenCatalogService: 

183 """Service for managing user API token catalogs. 

184 

185 This service provides comprehensive token lifecycle management including 

186 creation, scoping, revocation, usage tracking, and analytics. It handles 

187 JWT-based API tokens with fine-grained access control, team support, 

188 and comprehensive audit logging. 

189 

190 Key features: 

191 - Token creation with customizable scopes and permissions 

192 - Team-based token management with role-based access 

193 - Token revocation and blacklisting 

194 - Usage tracking and analytics 

195 - IP and time-based restrictions 

196 - Automatic cleanup of expired tokens 

197 

198 Attributes: 

199 db (Session): SQLAlchemy database session for token operations 

200 

201 Examples: 

202 >>> from mcpgateway.services.token_catalog_service import TokenCatalogService 

203 >>> service = TokenCatalogService(None) # Mock database for doctest 

204 >>> service.db is None 

205 True 

206 """ 

207 

208 def __init__(self, db: Session): 

209 """Initialize TokenCatalogService with database session. 

210 

211 Args: 

212 db: SQLAlchemy database session for token operations 

213 """ 

214 self.db = db 

215 

216 async def _generate_token( 

217 self, user_email: str, jti: str, team_id: Optional[str] = None, expires_at: Optional[datetime] = None, scope: Optional["TokenScope"] = None, user: Optional[object] = None 

218 ) -> str: 

219 """Generate a JWT token for API access. 

220 

221 This internal method creates a properly formatted JWT token with all 

222 necessary claims including user identity, scopes, team membership, 

223 and expiration. The token follows the MCP Gateway JWT structure. 

224 

225 Args: 

226 user_email: User's email address for the token subject 

227 jti: JWT ID for token uniqueness 

228 team_id: Optional team ID for team-scoped tokens 

229 expires_at: Optional expiration datetime 

230 scope: Optional token scope information for access control 

231 user: Optional user object to extract admin privileges 

232 

233 Returns: 

234 str: Signed JWT token string ready for API authentication 

235 

236 Raises: 

237 ValueError: If expires_at is in the past (cannot create already-expired tokens) 

238 

239 Note: 

240 This is an internal method. Use create_token() to generate 

241 tokens with proper database tracking and validation. 

242 """ 

243 # Calculate expiration in minutes from expires_at 

244 expires_in_minutes = 0 

245 if expires_at: 

246 now = datetime.now(timezone.utc) 

247 delta = expires_at - now 

248 delta_seconds = delta.total_seconds() 

249 

250 # Guard: reject already-expired expiration times 

251 if delta_seconds <= 0: 

252 raise ValueError("Token expiration time is in the past. Cannot create already-expired tokens.") 

253 

254 # Use ceiling to ensure we always have at least 1 minute expiration 

255 # This prevents <60s from rounding to 0 and creating non-expiring tokens 

256 expires_in_minutes = max(1, math.ceil(delta_seconds / 60)) 

257 

258 # Build user data dict 

259 user_data = { 

260 "email": user_email, 

261 "full_name": "API Token User", 

262 "is_admin": user.is_admin if user else False, 

263 "auth_provider": "api_token", 

264 } 

265 

266 # Build teams list 

267 teams = [team_id] if team_id else [] 

268 

269 # Build scopes dict 

270 # Empty permissions = defer to RBAC at runtime (not wildcard access) 

271 scopes_dict = None 

272 if scope: 

273 scopes_dict = { 

274 "server_id": scope.server_id, 

275 "permissions": scope.permissions if scope.permissions is not None else [], 

276 "ip_restrictions": scope.ip_restrictions or [], 

277 "time_restrictions": scope.time_restrictions or {}, 

278 } 

279 else: 

280 scopes_dict = { 

281 "server_id": None, 

282 "permissions": [], # Empty = inherit from RBAC at runtime 

283 "ip_restrictions": [], 

284 "time_restrictions": {}, 

285 } 

286 

287 # Generate JWT token using the centralized token creation utility 

288 # Pass structured data to the enhanced create_jwt_token function 

289 return await create_jwt_token( 

290 data={"sub": user_email, "jti": jti, "token_use": "api"}, # nosec B105 - token type marker, not a password 

291 expires_in_minutes=expires_in_minutes, 

292 user_data=user_data, 

293 teams=teams, 

294 scopes=scopes_dict, 

295 ) 

296 

297 def _hash_token(self, token: str) -> str: 

298 """Create secure hash of token for storage. 

299 

300 Args: 

301 token: Raw token string 

302 

303 Returns: 

304 str: SHA-256 hash of token 

305 

306 Examples: 

307 >>> service = TokenCatalogService(None) 

308 >>> hash_val = service._hash_token("test_token") 

309 >>> len(hash_val) == 64 

310 True 

311 """ 

312 return hashlib.sha256(token.encode()).hexdigest() 

313 

314 def _validate_scope_containment( 

315 self, 

316 requested_permissions: Optional[List[str]], 

317 caller_permissions: Optional[List[str]], 

318 ) -> None: 

319 """Validate that requested permissions don't exceed caller's permissions. 

320 

321 SECURITY: This is fail-secure. If caller_permissions is empty/None, 

322 custom scopes are DENIED. Users without explicit permissions can only 

323 create tokens with empty scope (inherit at runtime). 

324 

325 Args: 

326 requested_permissions: Permissions requested for new/updated token 

327 caller_permissions: Caller's effective permissions (RBAC + current token scopes) 

328 

329 Raises: 

330 ValueError: If requested permissions exceed caller's permissions 

331 """ 

332 # No requested permissions = empty scope, always allowed 

333 if not requested_permissions: 

334 return 

335 

336 # FAIL-SECURE: If caller has no permissions, deny any custom scope 

337 if not caller_permissions: 

338 raise ValueError("Cannot specify custom token permissions. " + "You have no explicit permissions to delegate. " + "Create a token without scope to inherit permissions at runtime.") 

339 

340 # Wildcard caller can grant anything 

341 if "*" in caller_permissions: 

342 return 

343 

344 # Wildcard request requires wildcard caller 

345 if "*" in requested_permissions: 

346 raise ValueError("Cannot create token with wildcard permissions. " + "Your effective permissions do not include wildcard access.") 

347 

348 # Check each requested permission 

349 for req_perm in requested_permissions: 

350 if req_perm in caller_permissions: 

351 continue 

352 

353 # Check for category wildcard (e.g., "tools.*" allows "tools.read") 

354 if "." in req_perm: 354 ↛ 359line 354 didn't jump to line 359 because the condition on line 354 was always true

355 category = req_perm.split(".")[0] 

356 if f"{category}.*" in caller_permissions: 

357 continue 

358 

359 raise ValueError(f"Cannot grant permission '{req_perm}' - not in your effective permissions.") 

360 

361 async def create_token( 

362 self, 

363 user_email: str, 

364 name: str, 

365 description: Optional[str] = None, 

366 scope: Optional[TokenScope] = None, 

367 expires_in_days: Optional[int] = None, 

368 tags: Optional[List[str]] = None, 

369 team_id: Optional[str] = None, 

370 caller_permissions: Optional[List[str]] = None, 

371 is_active: bool = True, 

372 ) -> tuple[EmailApiToken, str]: 

373 """ 

374 Create a new API token with team-level scoping and additional configurations. 

375 

376 This method generates a JWT-based API token with team-level scoping and optional security configurations, 

377 such as expiration, permissions, IP restrictions, and usage limits. The token is associated with a user 

378 and a specific team, ensuring access control and multi-tenancy support. 

379 

380 The function will: 

381 - Validate the existence of the user. 

382 - Ensure the user is an active member of the specified team. 

383 - Verify that the token name is unique for the user+team combination. 

384 - Generate a JWT with the specified scoping parameters (e.g., permissions, IP, etc.). 

385 - Store the token in the database with the relevant details and return the token and raw JWT string. 

386 

387 Args: 

388 user_email (str): The email address of the user requesting the token. 

389 name (str): A unique, human-readable name for the token (must be unique per user+team). 

390 description (Optional[str]): A description for the token (default is None). 

391 scope (Optional[TokenScope]): The scoping configuration for the token, including permissions, 

392 server ID, IP restrictions, etc. (default is None). 

393 expires_in_days (Optional[int]): The expiration time in days for the token (None means no expiration). 

394 tags (Optional[List[str]]): A list of organizational tags for the token (default is an empty list). 

395 team_id (Optional[str]): The team ID to which the token should be scoped. This is required for team-level scoping. 

396 caller_permissions (Optional[List[str]]): The permissions of the caller creating the token. Used for 

397 scope containment validation to ensure the new token cannot have broader permissions than the caller. 

398 is_active (bool): Whether the token should be created as active (default is True). 

399 

400 Returns: 

401 tuple[EmailApiToken, str]: A tuple where the first element is the `EmailApiToken` database record and 

402 the second element is the raw JWT token string. The `EmailApiToken` contains the database record with the 

403 token details. 

404 

405 Raises: 

406 ValueError: If any of the following validation checks fail: 

407 - The `user_email` does not correspond to an existing user. 

408 - The `team_id` is missing or the user is not an active member of the specified team. 

409 - A token with the same name already exists for the given user and team. 

410 - Invalid token configuration (e.g., invalid expiration date). 

411 

412 Examples: 

413 >>> # This method requires database operations, shown for reference 

414 >>> service = TokenCatalogService(None) # Would use real DB session 

415 >>> # token, raw_token = await service.create_token(...) 

416 >>> # Returns (EmailApiToken, raw_token_string) tuple 

417 """ 

418 # # Enforce team-level scoping requirement 

419 # if not team_id: 

420 # raise ValueError("team_id is required for token creation. " "Please select a specific team before creating a token. " "You cannot create tokens while viewing 'All Teams'.") 

421 

422 # Validate user exists 

423 user = self.db.execute(select(EmailUser).where(EmailUser.email == user_email)).scalar_one_or_none() 

424 

425 if not user: 

426 raise ValueError(f"User not found: {user_email}") 

427 

428 # Validate scope containment (fail-secure if no caller_permissions) 

429 if scope and scope.permissions: 

430 self._validate_scope_containment(scope.permissions, caller_permissions) 

431 

432 # Validate team exists and user is active member 

433 if team_id: 

434 # First-Party 

435 from mcpgateway.db import EmailTeam, EmailTeamMember # pylint: disable=import-outside-toplevel 

436 

437 # Check if team exists 

438 team = self.db.execute(select(EmailTeam).where(EmailTeam.id == team_id)).scalar_one_or_none() 

439 

440 if not team: 

441 raise ValueError(f"Team not found: {team_id}") 

442 

443 # Verify user is an active member of the team 

444 membership = self.db.execute( 

445 select(EmailTeamMember).where(and_(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True))) 

446 ).scalar_one_or_none() 

447 

448 if not membership: 

449 raise ValueError(f"User {user_email} is not an active member of team {team_id}. Only team members can create tokens for the team.") 

450 

451 # Check for duplicate active token name for this user+team 

452 existing_token = self.db.execute( 

453 select(EmailApiToken).where(and_(EmailApiToken.user_email == user_email, EmailApiToken.name == name, EmailApiToken.team_id == team_id, EmailApiToken.is_active.is_(True))) 

454 ).scalar_one_or_none() 

455 

456 if existing_token: 

457 raise ValueError(f"Token with name '{name}' already exists for user {user_email} in team {team_id}. Please choose a different name.") 

458 

459 # CALCULATE EXPIRATION DATE 

460 expires_at = None 

461 if expires_in_days: 

462 expires_at = utc_now() + timedelta(days=expires_in_days) 

463 

464 jti = str(uuid.uuid4()) # Unique JWT ID 

465 # Generate JWT token with all necessary claims 

466 raw_token = await self._generate_token(user_email=user_email, jti=jti, team_id=team_id, expires_at=expires_at, scope=scope, user=user) # Pass user object to include admin status 

467 

468 # Hash token for secure storage 

469 token_hash = self._hash_token(raw_token) 

470 

471 # Create database record 

472 api_token = EmailApiToken( 

473 id=str(uuid.uuid4()), 

474 user_email=user_email, 

475 team_id=team_id, # Store team association 

476 name=name, 

477 jti=jti, 

478 description=description, 

479 token_hash=token_hash, # Store hash, not raw token 

480 expires_at=expires_at, 

481 tags=tags or [], 

482 # Store scoping information 

483 server_id=scope.server_id if scope else None, 

484 resource_scopes=scope.permissions if scope else [], 

485 ip_restrictions=scope.ip_restrictions if scope else [], 

486 time_restrictions=scope.time_restrictions if scope else {}, 

487 usage_limits=scope.usage_limits if scope else {}, 

488 # Token status 

489 is_active=is_active, 

490 created_at=utc_now(), 

491 last_used=None, 

492 ) 

493 

494 self.db.add(api_token) 

495 self.db.commit() 

496 self.db.refresh(api_token) 

497 

498 token_type = f"team-scoped (team: {team_id})" if team_id else "public-only" 

499 logger.info(f"Created {token_type} API token '{name}' for user {user_email}. Token ID: {api_token.id}, Expires: {expires_at or 'Never'}") 

500 return api_token, raw_token 

501 

502 async def list_user_tokens(self, user_email: str, include_inactive: bool = False, limit: int = 100, offset: int = 0) -> List[EmailApiToken]: 

503 """List API tokens for a user. 

504 

505 Args: 

506 user_email: User's email address 

507 include_inactive: Include inactive/expired tokens 

508 limit: Maximum tokens to return 

509 offset: Number of tokens to skip 

510 

511 Returns: 

512 List[EmailApiToken]: User's API tokens 

513 

514 Examples: 

515 >>> service = TokenCatalogService(None) # Would use real DB session 

516 >>> # Returns List[EmailApiToken] for user 

517 """ 

518 # Validate parameters 

519 if limit <= 0 or limit > 1000: 

520 limit = 50 # Use default 

521 offset = max(offset, 0) # Use default 

522 query = select(EmailApiToken).where(EmailApiToken.user_email == user_email) 

523 

524 if not include_inactive: 

525 query = query.where(and_(EmailApiToken.is_active.is_(True), or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now()))) 

526 

527 query = query.order_by(EmailApiToken.created_at.desc()).limit(limit).offset(offset) 

528 

529 result = self.db.execute(query) 

530 return result.scalars().all() 

531 

532 async def list_team_tokens(self, team_id: str, user_email: str, include_inactive: bool = False, limit: int = 100, offset: int = 0) -> List[EmailApiToken]: 

533 """List API tokens for a team (only accessible by team owners). 

534 

535 Args: 

536 team_id: Team ID to list tokens for 

537 user_email: User's email (must be team owner) 

538 include_inactive: Include inactive/expired tokens 

539 limit: Maximum tokens to return 

540 offset: Number of tokens to skip 

541 

542 Returns: 

543 List[EmailApiToken]: Team's API tokens 

544 

545 Raises: 

546 ValueError: If user is not a team owner 

547 """ 

548 # Validate user is team owner 

549 # First-Party 

550 from mcpgateway.db import EmailTeamMember # pylint: disable=import-outside-toplevel 

551 

552 membership = self.db.execute( 

553 select(EmailTeamMember).where(and_(EmailTeamMember.team_id == team_id, EmailTeamMember.user_email == user_email, EmailTeamMember.role == "owner", EmailTeamMember.is_active.is_(True))) 

554 ).scalar_one_or_none() 

555 

556 if not membership: 

557 raise ValueError(f"Only team owners can view team tokens for {team_id}") 

558 

559 # Validate parameters 

560 if limit <= 0 or limit > 1000: 

561 limit = 50 

562 offset = max(offset, 0) 

563 

564 query = select(EmailApiToken).where(EmailApiToken.team_id == team_id) 

565 

566 if not include_inactive: 566 ↛ 569line 566 didn't jump to line 569 because the condition on line 566 was always true

567 query = query.where(and_(EmailApiToken.is_active.is_(True), or_(EmailApiToken.expires_at.is_(None), EmailApiToken.expires_at > utc_now()))) 

568 

569 query = query.order_by(EmailApiToken.created_at.desc()).limit(limit).offset(offset) 

570 result = self.db.execute(query) 

571 return result.scalars().all() 

572 

573 async def get_token(self, token_id: str, user_email: Optional[str] = None) -> Optional[EmailApiToken]: 

574 """Get a specific token by ID. 

575 

576 Args: 

577 token_id: Token ID 

578 user_email: Optional user email filter for security 

579 

580 Returns: 

581 Optional[EmailApiToken]: Token if found and authorized 

582 

583 Examples: 

584 >>> service = TokenCatalogService(None) # Would use real DB session 

585 >>> # Returns Optional[EmailApiToken] if found and authorized 

586 """ 

587 query = select(EmailApiToken).where(EmailApiToken.id == token_id) 

588 

589 if user_email: 

590 query = query.where(EmailApiToken.user_email == user_email) 

591 

592 result = self.db.execute(query) 

593 return result.scalar_one_or_none() 

594 

595 async def update_token( 

596 self, 

597 token_id: str, 

598 user_email: str, 

599 name: Optional[str] = None, 

600 description: Optional[str] = None, 

601 scope: Optional[TokenScope] = None, 

602 tags: Optional[List[str]] = None, 

603 caller_permissions: Optional[List[str]] = None, 

604 is_active: Optional[bool] = None, 

605 ) -> Optional[EmailApiToken]: 

606 """Update an existing token with scope containment validation. 

607 

608 Args: 

609 token_id: Token ID to update 

610 user_email: Owner's email for security 

611 name: New token name 

612 description: New description 

613 scope: New scoping configuration 

614 tags: New tags 

615 caller_permissions: Caller's effective permissions for scope containment 

616 is_active: New token active status 

617 

618 Returns: 

619 Optional[EmailApiToken]: Updated token if found 

620 

621 Raises: 

622 ValueError: If token not found or name conflicts 

623 

624 Examples: 

625 >>> service = TokenCatalogService(None) # Would use real DB session 

626 >>> # Returns Optional[EmailApiToken] if updated successfully 

627 """ 

628 token = await self.get_token(token_id, user_email) 

629 if not token: 

630 raise ValueError("Token not found or not authorized") 

631 

632 # Validate scope containment for scope changes 

633 if scope and scope.permissions: 

634 self._validate_scope_containment(scope.permissions, caller_permissions) 

635 

636 # Check for duplicate name if changing 

637 if name and name != token.name: 

638 existing = self.db.execute( 

639 select(EmailApiToken).where(and_(EmailApiToken.user_email == user_email, EmailApiToken.name == name, EmailApiToken.id != token_id, EmailApiToken.is_active.is_(True))) 

640 ).scalar_one_or_none() 

641 

642 if existing: 

643 raise ValueError(f"Token name '{name}' already exists") 

644 

645 token.name = name 

646 

647 if description is not None: 

648 token.description = description 

649 

650 if tags is not None: 

651 token.tags = tags 

652 

653 if is_active is not None: 

654 token.is_active = is_active 

655 

656 if scope: 

657 token.server_id = scope.server_id 

658 token.resource_scopes = scope.permissions 

659 token.ip_restrictions = scope.ip_restrictions 

660 token.time_restrictions = scope.time_restrictions 

661 token.usage_limits = scope.usage_limits 

662 

663 self.db.commit() 

664 self.db.refresh(token) 

665 

666 logger.info(f"Updated token '{token.name}' for user {user_email}") 

667 

668 return token 

669 

670 async def revoke_token(self, token_id: str, user_email: str, revoked_by: str, reason: Optional[str] = None) -> bool: 

671 """Revoke a token owned by the specified user. 

672 

673 Args: 

674 token_id: Token ID to revoke 

675 user_email: Owner's email - token must belong to this user (ownership check) 

676 revoked_by: Email of user performing revocation (for audit) 

677 reason: Optional reason for revocation 

678 

679 Returns: 

680 bool: True if token was revoked, False if not found or not authorized 

681 

682 Examples: 

683 >>> service = TokenCatalogService(None) # Would use real DB session 

684 >>> # Returns bool: True if token was revoked successfully 

685 """ 

686 # SECURITY FIX: Filter by owner to prevent cross-user revocation 

687 token = await self.get_token(token_id, user_email) 

688 if not token: 

689 return False 

690 

691 # Mark token as inactive 

692 token.is_active = False 

693 

694 # Add to blacklist 

695 revocation = TokenRevocation(jti=token.jti, revoked_by=revoked_by, reason=reason) 

696 

697 self.db.add(revocation) 

698 self.db.commit() 

699 

700 # Invalidate auth cache for revoked token 

701 try: 

702 # Standard 

703 import asyncio # pylint: disable=import-outside-toplevel 

704 

705 # First-Party 

706 from mcpgateway.cache.auth_cache import auth_cache # pylint: disable=import-outside-toplevel 

707 

708 asyncio.create_task(auth_cache.invalidate_revocation(token.jti)) 

709 except Exception as cache_error: 

710 logger.debug(f"Failed to invalidate auth cache for revoked token: {cache_error}") 

711 

712 logger.info(f"Revoked token '{token.name}' (JTI: {token.jti}) by {revoked_by}") 

713 

714 return True 

715 

716 async def admin_revoke_token(self, token_id: str, revoked_by: str, reason: Optional[str] = None) -> bool: 

717 """Admin-only: Revoke any token without ownership check. 

718 

719 WARNING: This method bypasses ownership verification. 

720 Only call from admin-authenticated endpoints. 

721 

722 Args: 

723 token_id: Token ID to revoke 

724 revoked_by: Admin email for audit 

725 reason: Revocation reason 

726 

727 Returns: 

728 bool: True if token was revoked, False if not found 

729 

730 Examples: 

731 >>> service = TokenCatalogService(None) # Would use real DB session 

732 >>> # Returns bool: True if token was revoked successfully 

733 """ 

734 # No user filter - admin can revoke any token 

735 token = await self.get_token(token_id) 

736 if not token: 

737 return False 

738 

739 token.is_active = False 

740 revocation = TokenRevocation(jti=token.jti, revoked_by=revoked_by, reason=reason) 

741 self.db.add(revocation) 

742 self.db.commit() 

743 

744 try: 

745 # Standard 

746 import asyncio # pylint: disable=import-outside-toplevel 

747 

748 # First-Party 

749 from mcpgateway.cache.auth_cache import auth_cache # pylint: disable=import-outside-toplevel 

750 

751 asyncio.create_task(auth_cache.invalidate_revocation(token.jti)) 

752 except Exception as cache_error: 

753 logger.debug(f"Failed to invalidate auth cache: {cache_error}") 

754 

755 logger.info(f"Admin revoked token '{token.name}' (JTI: {token.jti}) by {revoked_by}") 

756 return True 

757 

758 async def is_token_revoked(self, jti: str) -> bool: 

759 """Check if a token JTI is revoked. 

760 

761 Args: 

762 jti: JWT ID to check 

763 

764 Returns: 

765 bool: True if token is revoked 

766 

767 Examples: 

768 >>> service = TokenCatalogService(None) # Would use real DB session 

769 >>> # Returns bool: True if token is revoked 

770 """ 

771 revocation = self.db.execute(select(TokenRevocation).where(TokenRevocation.jti == jti)).scalar_one_or_none() 

772 

773 return revocation is not None 

774 

775 async def log_token_usage( 

776 self, 

777 jti: str, 

778 user_email: str, 

779 endpoint: Optional[str] = None, 

780 method: Optional[str] = None, 

781 ip_address: Optional[str] = None, 

782 user_agent: Optional[str] = None, 

783 status_code: Optional[int] = None, 

784 response_time_ms: Optional[int] = None, 

785 blocked: bool = False, 

786 block_reason: Optional[str] = None, 

787 ) -> None: 

788 """Log token usage for analytics and security. 

789 

790 Args: 

791 jti: JWT ID of token used 

792 user_email: Token owner's email 

793 endpoint: API endpoint accessed 

794 method: HTTP method 

795 ip_address: Client IP address 

796 user_agent: Client user agent 

797 status_code: HTTP response status 

798 response_time_ms: Response time in milliseconds 

799 blocked: Whether request was blocked 

800 block_reason: Reason for blocking 

801 

802 Examples: 

803 >>> service = TokenCatalogService(None) # Would use real DB session 

804 >>> # Logs token usage for analytics - no return value 

805 """ 

806 usage_log = TokenUsageLog( 

807 token_jti=jti, 

808 user_email=user_email, 

809 endpoint=endpoint, 

810 method=method, 

811 ip_address=ip_address, 

812 user_agent=user_agent, 

813 status_code=status_code, 

814 response_time_ms=response_time_ms, 

815 blocked=blocked, 

816 block_reason=block_reason, 

817 ) 

818 

819 self.db.add(usage_log) 

820 self.db.commit() 

821 

822 # Update token last_used timestamp 

823 token = self.db.execute(select(EmailApiToken).where(EmailApiToken.jti == jti)).scalar_one_or_none() 

824 

825 if token: 

826 token.last_used = utc_now() 

827 self.db.commit() 

828 

829 async def get_token_usage_stats(self, user_email: str, token_id: Optional[str] = None, days: int = 30) -> dict: 

830 """Get token usage statistics. 

831 

832 Args: 

833 user_email: User's email address 

834 token_id: Optional specific token ID 

835 days: Number of days to analyze 

836 

837 Returns: 

838 dict: Usage statistics 

839 

840 Examples: 

841 >>> service = TokenCatalogService(None) # Would use real DB session 

842 >>> # Returns dict with usage statistics 

843 """ 

844 start_date = utc_now() - timedelta(days=days) 

845 

846 # Get token JTI if specific token requested 

847 token_jti = None 

848 if token_id: 

849 token = await self.get_token(token_id, user_email) 

850 if token: 850 ↛ 854line 850 didn't jump to line 854 because the condition on line 850 was always true

851 token_jti = token.jti 

852 

853 # Use SQL aggregation for PostgreSQL, Python fallback for SQLite 

854 dialect_name = self.db.get_bind().dialect.name 

855 if dialect_name == "postgresql": 

856 return await self._get_usage_stats_postgresql(user_email, start_date, token_jti, days) 

857 return await self._get_usage_stats_python(user_email, start_date, token_jti, days) 

858 

859 async def _get_usage_stats_postgresql(self, user_email: str, start_date: datetime, token_jti: Optional[str], days: int) -> dict: 

860 """Compute usage stats using PostgreSQL SQL aggregation. 

861 

862 Args: 

863 user_email: User's email address 

864 start_date: Start date for analysis 

865 token_jti: Optional token JTI filter 

866 days: Number of days being analyzed 

867 

868 Returns: 

869 dict: Usage statistics computed via SQL 

870 """ 

871 # Build filter conditions 

872 conditions = [TokenUsageLog.user_email == user_email, TokenUsageLog.timestamp >= start_date] 

873 if token_jti: 

874 conditions.append(TokenUsageLog.token_jti == token_jti) 

875 

876 base_filter = and_(*conditions) 

877 

878 # Main stats query using SQL aggregation 

879 # Match Python behavior: 

880 # - status_code must be non-null AND non-zero AND < 400 for success count 

881 # - response_time_ms must be non-null AND non-zero for average (Python: if log.response_time_ms) 

882 stats_query = ( 

883 select( 

884 func.count().label("total"), # pylint: disable=not-callable 

885 func.sum( 

886 case( 

887 (and_(TokenUsageLog.status_code.isnot(None), TokenUsageLog.status_code > 0, TokenUsageLog.status_code < 400), 1), 

888 else_=0, 

889 ) 

890 ).label("successful"), 

891 func.sum(case((TokenUsageLog.blocked.is_(True), 1), else_=0)).label("blocked"), 

892 # Only average non-null and non-zero response times (NULL values are ignored by AVG) 

893 func.avg( 

894 case( 

895 (and_(TokenUsageLog.response_time_ms.isnot(None), TokenUsageLog.response_time_ms > 0), TokenUsageLog.response_time_ms), 

896 else_=None, 

897 ) 

898 ).label("avg_response"), 

899 ) 

900 .select_from(TokenUsageLog) 

901 .where(base_filter) 

902 ) 

903 

904 result = self.db.execute(stats_query).fetchone() 

905 

906 total_requests = result.total or 0 

907 successful_requests = result.successful or 0 

908 blocked_requests = result.blocked or 0 

909 avg_response_time = float(result.avg_response) if result.avg_response else 0.0 

910 

911 # Top endpoints query using SQL GROUP BY 

912 # Match Python behavior: exclude None AND empty string endpoints (Python: if log.endpoint) 

913 endpoints_query = ( 

914 select(TokenUsageLog.endpoint, func.count().label("count")) # pylint: disable=not-callable 

915 .where(and_(base_filter, TokenUsageLog.endpoint.isnot(None), TokenUsageLog.endpoint != "")) 

916 .group_by(TokenUsageLog.endpoint) 

917 .order_by(func.count().desc()) # pylint: disable=not-callable 

918 .limit(5) 

919 ) 

920 

921 endpoints_result = self.db.execute(endpoints_query).fetchall() 

922 top_endpoints = [(row.endpoint, row.count) for row in endpoints_result] 

923 

924 return { 

925 "period_days": days, 

926 "total_requests": total_requests, 

927 "successful_requests": successful_requests, 

928 "blocked_requests": blocked_requests, 

929 "success_rate": successful_requests / total_requests if total_requests > 0 else 0, 

930 "average_response_time_ms": round(avg_response_time, 2), 

931 "top_endpoints": top_endpoints, 

932 } 

933 

934 async def _get_usage_stats_python(self, user_email: str, start_date: datetime, token_jti: Optional[str], days: int) -> dict: 

935 """Compute usage stats using Python (fallback for SQLite). 

936 

937 Args: 

938 user_email: User's email address 

939 start_date: Start date for analysis 

940 token_jti: Optional token JTI filter 

941 days: Number of days being analyzed 

942 

943 Returns: 

944 dict: Usage statistics computed in Python 

945 """ 

946 query = select(TokenUsageLog).where(and_(TokenUsageLog.user_email == user_email, TokenUsageLog.timestamp >= start_date)) 

947 

948 if token_jti: 

949 query = query.where(TokenUsageLog.token_jti == token_jti) 

950 

951 usage_logs = self.db.execute(query).scalars().all() 

952 

953 # Calculate statistics 

954 total_requests = len(usage_logs) 

955 successful_requests = sum(1 for log in usage_logs if log.status_code and log.status_code < 400) 

956 blocked_requests = sum(1 for log in usage_logs if log.blocked) 

957 

958 # Average response time 

959 response_times = [log.response_time_ms for log in usage_logs if log.response_time_ms] 

960 avg_response_time = sum(response_times) / len(response_times) if response_times else 0 

961 

962 # Most accessed endpoints 

963 endpoint_counts: dict = {} 

964 for log in usage_logs: 

965 if log.endpoint: 965 ↛ 964line 965 didn't jump to line 964 because the condition on line 965 was always true

966 endpoint_counts[log.endpoint] = endpoint_counts.get(log.endpoint, 0) + 1 

967 

968 top_endpoints = sorted(endpoint_counts.items(), key=lambda x: x[1], reverse=True)[:5] 

969 

970 return { 

971 "period_days": days, 

972 "total_requests": total_requests, 

973 "successful_requests": successful_requests, 

974 "blocked_requests": blocked_requests, 

975 "success_rate": successful_requests / total_requests if total_requests > 0 else 0, 

976 "average_response_time_ms": round(avg_response_time, 2), 

977 "top_endpoints": top_endpoints, 

978 } 

979 

980 async def get_token_revocation(self, jti: str) -> Optional[TokenRevocation]: 

981 """Get token revocation information by JTI. 

982 

983 Args: 

984 jti: JWT token ID 

985 

986 Returns: 

987 Optional[TokenRevocation]: Revocation info if token is revoked 

988 

989 Examples: 

990 >>> service = TokenCatalogService(None) # Would use real DB session 

991 >>> # Returns Optional[TokenRevocation] if token is revoked 

992 """ 

993 result = self.db.execute(select(TokenRevocation).where(TokenRevocation.jti == jti)) 

994 return result.scalar_one_or_none() 

995 

996 async def cleanup_expired_tokens(self) -> int: 

997 """Clean up expired tokens using bulk UPDATE. 

998 

999 Uses a single SQL UPDATE statement instead of loading tokens into memory 

1000 and updating them one by one. This is more efficient and avoids memory 

1001 issues when many tokens expire at once. 

1002 

1003 Returns: 

1004 int: Number of tokens cleaned up 

1005 

1006 Examples: 

1007 >>> service = TokenCatalogService(None) # Would use real DB session 

1008 >>> # Returns int: Number of tokens cleaned up 

1009 """ 

1010 try: 

1011 now = utc_now() 

1012 count = self.db.query(EmailApiToken).filter(EmailApiToken.expires_at < now, EmailApiToken.is_active.is_(True)).update({"is_active": False}, synchronize_session=False) 

1013 

1014 self.db.commit() 

1015 

1016 if count > 0: 

1017 logger.info(f"Cleaned up {count} expired tokens") 

1018 

1019 return count 

1020 

1021 except Exception as e: 

1022 self.db.rollback() 

1023 logger.error(f"Failed to cleanup expired tokens: {e}") 

1024 return 0