Coverage for mcpgateway / services / permission_service.py: 99%

173 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/permission_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Permission Service for RBAC System. 

8 

9This module provides the core permission checking logic for the RBAC system. 

10It handles role-based permission validation, permission auditing, and caching. 

11""" 

12 

13# Standard 

14from datetime import datetime 

15import logging 

16from typing import Dict, List, Optional, Set 

17 

18# Third-Party 

19from sqlalchemy import and_, or_, select 

20from sqlalchemy.orm import contains_eager, Session 

21 

22# First-Party 

23from mcpgateway.config import settings 

24from mcpgateway.db import PermissionAuditLog, Permissions, Role, UserRole, utc_now 

25 

26logger = logging.getLogger(__name__) 

27 

28 

29class PermissionService: 

30 """Service for checking and managing user permissions. 

31 

32 Provides role-based permission checking with caching, auditing, 

33 and support for global, team, and personal scopes. 

34 

35 Attributes: 

36 db: Database session 

37 audit_enabled: Whether to log permission checks 

38 cache_ttl: Permission cache TTL in seconds 

39 

40 Examples: 

41 Basic construction and coroutine checks: 

42 >>> from unittest.mock import Mock 

43 >>> service = PermissionService(Mock()) 

44 >>> isinstance(service, PermissionService) 

45 True 

46 >>> import asyncio 

47 >>> asyncio.iscoroutinefunction(service.check_permission) 

48 True 

49 >>> asyncio.iscoroutinefunction(service.get_user_permissions) 

50 True 

51 """ 

52 

53 def __init__(self, db: Session, audit_enabled: Optional[bool] = None): 

54 """Initialize permission service. 

55 

56 Args: 

57 db: Database session 

58 audit_enabled: Whether to enable permission auditing (defaults to settings.permission_audit_enabled / PERMISSION_AUDIT_ENABLED) 

59 """ 

60 self.db = db 

61 if audit_enabled is None: 

62 audit_enabled = settings.permission_audit_enabled 

63 self.audit_enabled = audit_enabled 

64 self._permission_cache: Dict[str, Set[str]] = {} 

65 self._roles_cache: Dict[str, List[UserRole]] = {} 

66 self._cache_timestamps: Dict[str, datetime] = {} 

67 self.cache_ttl = 300 # 5 minutes 

68 

69 async def check_permission( 

70 self, 

71 user_email: str, 

72 permission: str, 

73 resource_type: Optional[str] = None, 

74 resource_id: Optional[str] = None, 

75 team_id: Optional[str] = None, 

76 ip_address: Optional[str] = None, 

77 user_agent: Optional[str] = None, 

78 allow_admin_bypass: bool = True, 

79 check_any_team: bool = False, 

80 ) -> bool: 

81 """Check if user has specific permission. 

82 

83 Checks user's roles across all applicable scopes (global, team, personal) 

84 and returns True if any role grants the required permission. 

85 

86 Args: 

87 user_email: Email of the user to check 

88 permission: Permission to check (e.g., 'tools.create') 

89 resource_type: Type of resource being accessed 

90 resource_id: Specific resource ID if applicable 

91 team_id: Team context for the permission check 

92 ip_address: IP address for audit logging 

93 user_agent: User agent for audit logging 

94 allow_admin_bypass: If True, admin users bypass all permission checks. 

95 If False, admins must have explicit permissions. 

96 Default is True for backward compatibility. 

97 check_any_team: If True, check permission across ALL team-scoped roles 

98 (used for list/read endpoints with multi-team session tokens) 

99 

100 Returns: 

101 bool: True if permission is granted, False otherwise 

102 

103 Examples: 

104 Parameter validation helpers: 

105 >>> permission = "users.read" 

106 >>> permission.count('.') == 1 

107 True 

108 >>> team_id = "team-123" 

109 >>> isinstance(team_id, str) 

110 True 

111 >>> from unittest.mock import Mock 

112 >>> service = PermissionService(Mock()) 

113 >>> import asyncio 

114 >>> asyncio.iscoroutinefunction(service.check_permission) 

115 True 

116 """ 

117 try: 

118 # Check if user is admin (bypass all permission checks if allowed) 

119 if allow_admin_bypass and await self._is_user_admin(user_email): 

120 return True 

121 

122 # Get user's effective permissions (uses cache when valid) 

123 user_permissions = await self.get_user_permissions(user_email, team_id, include_all_teams=check_any_team) 

124 

125 # Check if user has the specific permission or wildcard 

126 granted = permission in user_permissions or Permissions.ALL_PERMISSIONS in user_permissions 

127 

128 # If no explicit permissions found, check fallback permissions for team operations 

129 if not granted and permission.startswith("teams."): 

130 granted = await self._check_team_fallback_permissions(user_email, permission, team_id) 

131 

132 # If no explicit permissions found, check fallback permissions for token operations 

133 if not granted and permission.startswith("tokens."): 

134 granted = await self._check_token_fallback_permissions(user_email, permission) 

135 

136 # Log the permission check if auditing is enabled 

137 if self.audit_enabled: 

138 # Reuse roles cached by get_user_permissions (no second query) 

139 roles_checked = self._get_roles_for_audit(user_email, team_id) 

140 await self._log_permission_check( 

141 user_email=user_email, 

142 permission=permission, 

143 resource_type=resource_type, 

144 resource_id=resource_id, 

145 team_id=team_id, 

146 granted=granted, 

147 roles_checked=roles_checked, 

148 ip_address=ip_address, 

149 user_agent=user_agent, 

150 ) 

151 

152 logger.debug(f"Permission check: user={user_email}, permission={permission}, team={team_id}, granted={granted}") 

153 

154 return granted 

155 

156 except Exception as e: 

157 logger.error(f"Error checking permission for {user_email}: {e}") 

158 # Default to deny on error 

159 return False 

160 

161 async def has_admin_permission(self, user_email: str) -> bool: 

162 """Check if user has any admin-level permission. 

163 

164 This is used by AdminAuthMiddleware to allow access to /admin/* routes 

165 for users who have admin permissions via RBAC, even if they're not 

166 marked as is_admin in the database. 

167 

168 Args: 

169 user_email: Email of the user to check 

170 

171 Returns: 

172 bool: True if user is an admin OR has any admin.* permission 

173 """ 

174 try: 

175 # First check if user is a database admin 

176 if await self._is_user_admin(user_email): 

177 return True 

178 

179 # Get user's permissions and check for any admin.* permission 

180 user_permissions = await self.get_user_permissions(user_email) 

181 

182 # Check for wildcard or any admin permission 

183 if Permissions.ALL_PERMISSIONS in user_permissions: 

184 return True 

185 

186 # Check for any admin.* permission 

187 for perm in user_permissions: 

188 if perm.startswith("admin."): 

189 return True 

190 

191 return False 

192 

193 except Exception as e: 

194 logger.error(f"Error checking admin permission for {user_email}: {e}") 

195 return False 

196 

197 async def get_user_permissions(self, user_email: str, team_id: Optional[str] = None, include_all_teams: bool = False) -> Set[str]: 

198 """Get all effective permissions for a user. 

199 

200 Collects permissions from all user's roles across applicable scopes. 

201 Includes role inheritance and handles permission caching. 

202 

203 Args: 

204 user_email: Email of the user 

205 team_id: Optional team context 

206 include_all_teams: If True, include ALL team-scoped roles (for list/read endpoints) 

207 

208 Returns: 

209 Set[str]: All effective permissions for the user 

210 

211 Examples: 

212 Key shapes and coroutine check: 

213 >>> cache_key = f"user@example.com:{'global'}" 

214 >>> ':' in cache_key 

215 True 

216 >>> from unittest.mock import Mock 

217 >>> service = PermissionService(Mock()) 

218 >>> import asyncio 

219 >>> asyncio.iscoroutinefunction(service.get_user_permissions) 

220 True 

221 """ 

222 # Use distinct cache key for any-team lookups to avoid poisoning global cache 

223 if include_all_teams: 

224 cache_key = f"{user_email}:__anyteam__" 

225 else: 

226 cache_key = f"{user_email}:{team_id or 'global'}" 

227 if self._is_cache_valid(cache_key): 

228 cached_perms = self._permission_cache[cache_key] 

229 logger.debug(f"[RBAC] Cache hit for {user_email} (team_id={team_id}): {cached_perms}") 

230 return cached_perms 

231 

232 permissions = set() 

233 

234 # Get all active roles for the user (with eager-loaded role relationship) 

235 user_roles = await self._get_user_roles(user_email, team_id, include_all_teams=include_all_teams) 

236 logger.debug(f"[RBAC] Found {len(user_roles)} roles for {user_email} (team_id={team_id})") 

237 

238 # Collect permissions from all roles 

239 for user_role in user_roles: 

240 role_permissions = user_role.role.get_effective_permissions() 

241 logger.debug(f"[RBAC] Role '{user_role.role.name}' (scope={user_role.scope}, scope_id={user_role.scope_id}) has permissions: {role_permissions}") 

242 permissions.update(role_permissions) 

243 

244 # Cache both permissions and roles 

245 self._permission_cache[cache_key] = permissions 

246 self._roles_cache[cache_key] = user_roles 

247 self._cache_timestamps[cache_key] = utc_now() 

248 

249 return permissions 

250 

251 async def get_user_roles(self, user_email: str, scope: Optional[str] = None, team_id: Optional[str] = None, include_expired: bool = False) -> List[UserRole]: 

252 """Get user's role assignments. 

253 

254 Args: 

255 user_email: Email of the user 

256 scope: Filter by scope ('global', 'team', 'personal') 

257 team_id: Filter by team ID 

258 include_expired: Whether to include expired roles 

259 

260 Returns: 

261 List[UserRole]: User's role assignments 

262 

263 Examples: 

264 Coroutine check: 

265 >>> from unittest.mock import Mock 

266 >>> service = PermissionService(Mock()) 

267 >>> import asyncio 

268 >>> asyncio.iscoroutinefunction(service.get_user_roles) 

269 True 

270 """ 

271 query = select(UserRole).join(Role).where(and_(UserRole.user_email == user_email, UserRole.is_active.is_(True), Role.is_active.is_(True))) 

272 

273 if scope: 

274 query = query.where(UserRole.scope == scope) 

275 

276 if team_id: 

277 query = query.where(UserRole.scope_id == team_id) 

278 

279 if not include_expired: 

280 now = utc_now() 

281 query = query.where((UserRole.expires_at.is_(None)) | (UserRole.expires_at > now)) 

282 

283 result = self.db.execute(query) 

284 user_roles = result.scalars().all() 

285 return user_roles 

286 

287 async def has_permission_on_resource(self, user_email: str, permission: str, resource_type: str, resource_id: str, team_id: Optional[str] = None) -> bool: 

288 """Check if user has permission on a specific resource. 

289 

290 This method can be extended to include resource-specific 

291 permission logic (e.g., resource ownership, sharing rules). 

292 

293 Args: 

294 user_email: Email of the user 

295 permission: Permission to check 

296 resource_type: Type of resource 

297 resource_id: Specific resource ID 

298 team_id: Team context 

299 

300 Returns: 

301 bool: True if user has permission on the resource 

302 

303 Examples: 

304 Coroutine check and parameter sanity: 

305 >>> from unittest.mock import Mock 

306 >>> service = PermissionService(Mock()) 

307 >>> import asyncio 

308 >>> asyncio.iscoroutinefunction(service.has_permission_on_resource) 

309 True 

310 >>> res_type, res_id = "tools", "tool-123" 

311 >>> all(isinstance(x, str) for x in (res_type, res_id)) 

312 True 

313 """ 

314 # Basic permission check 

315 if not await self.check_permission(user_email=user_email, permission=permission, resource_type=resource_type, resource_id=resource_id, team_id=team_id): 

316 return False 

317 

318 # NOTE: Add resource-specific logic here in future enhancement 

319 # For example: 

320 # - Check resource ownership 

321 # - Check resource sharing permissions 

322 # - Check resource team membership 

323 

324 return True 

325 

326 async def check_resource_ownership(self, user_email: str, resource: any, allow_team_admin: bool = True) -> bool: 

327 """Check if user owns a resource or is a team admin for team resources. 

328 

329 This method checks resource ownership based on the owner_email field 

330 and optionally allows team admins to modify team-scoped resources. 

331 

332 Args: 

333 user_email: Email of the user to check 

334 resource: Resource object with owner_email, team_id, and visibility attributes 

335 allow_team_admin: Whether to allow team admins for team-scoped resources 

336 

337 Returns: 

338 bool: True if user owns the resource or is authorized team admin 

339 

340 Examples: 

341 >>> from unittest.mock import Mock 

342 >>> service = PermissionService(Mock()) 

343 >>> import asyncio 

344 >>> asyncio.iscoroutinefunction(service.check_resource_ownership) 

345 True 

346 """ 

347 # Check if user is platform admin (bypass ownership checks) 

348 if await self._is_user_admin(user_email): 

349 return True 

350 

351 # Check direct ownership 

352 if hasattr(resource, "owner_email") and resource.owner_email == user_email: 

353 return True 

354 

355 # Check team admin permission for team resources 

356 if allow_team_admin and hasattr(resource, "visibility") and resource.visibility == "team": 

357 if hasattr(resource, "team_id") and resource.team_id: 357 ↛ 362line 357 didn't jump to line 362 because the condition on line 357 was always true

358 user_role = await self._get_user_team_role(user_email, resource.team_id) 

359 if user_role == "owner": 

360 return True 

361 

362 return False 

363 

364 async def check_admin_permission(self, user_email: str) -> bool: 

365 """Check if user has any admin permissions. 

366 

367 Args: 

368 user_email: Email of the user 

369 

370 Returns: 

371 bool: True if user has admin permissions 

372 

373 Examples: 

374 Coroutine check: 

375 >>> from unittest.mock import Mock 

376 >>> service = PermissionService(Mock()) 

377 >>> import asyncio 

378 >>> asyncio.iscoroutinefunction(service.check_admin_permission) 

379 True 

380 """ 

381 # First check if user is admin (handles platform admin virtual user) 

382 if await self._is_user_admin(user_email): 

383 return True 

384 

385 admin_permissions = [Permissions.ADMIN_SYSTEM_CONFIG, Permissions.ADMIN_USER_MANAGEMENT, Permissions.ADMIN_SECURITY_AUDIT, Permissions.ALL_PERMISSIONS] 

386 

387 user_permissions = await self.get_user_permissions(user_email) 

388 return any(perm in user_permissions for perm in admin_permissions) 

389 

390 def clear_user_cache(self, user_email: str) -> None: 

391 """Clear cached permissions for a user. 

392 

393 Should be called when user's roles change. 

394 

395 Args: 

396 user_email: Email of the user 

397 

398 Examples: 

399 Cache invalidation behavior: 

400 >>> from unittest.mock import Mock 

401 >>> service = PermissionService(Mock()) 

402 >>> service._permission_cache = {"alice:global": {"tools.read"}, "bob:team1": {"*"}} 

403 >>> service._cache_timestamps = {"alice:global": utc_now(), "bob:team1": utc_now()} 

404 >>> service.clear_user_cache("alice") 

405 >>> "alice:global" in service._permission_cache 

406 False 

407 >>> "bob:team1" in service._permission_cache 

408 True 

409 """ 

410 keys_to_remove = [key for key in self._permission_cache if key.startswith(f"{user_email}:")] 

411 

412 for key in keys_to_remove: 

413 self._permission_cache.pop(key, None) 

414 self._roles_cache.pop(key, None) 

415 self._cache_timestamps.pop(key, None) 

416 

417 logger.debug(f"Cleared permission cache for user: {user_email}") 

418 

419 def clear_cache(self) -> None: 

420 """Clear all cached permissions. 

421 

422 Examples: 

423 Clear all cache: 

424 >>> from unittest.mock import Mock 

425 >>> service = PermissionService(Mock()) 

426 >>> service._permission_cache = {"x": {"p"}} 

427 >>> service._cache_timestamps = {"x": utc_now()} 

428 >>> service.clear_cache() 

429 >>> service._permission_cache == {} 

430 True 

431 >>> service._cache_timestamps == {} 

432 True 

433 """ 

434 self._permission_cache.clear() 

435 self._roles_cache.clear() 

436 self._cache_timestamps.clear() 

437 logger.debug("Cleared all permission cache") 

438 

439 async def _get_user_roles(self, user_email: str, team_id: Optional[str] = None, include_all_teams: bool = False) -> List[UserRole]: 

440 """Get user roles for permission checking. 

441 

442 Always includes global and personal roles. Team-scoped role inclusion 

443 depends on the parameters: 

444 

445 - team_id provided: includes team roles for that specific team 

446 (plus team roles with scope_id=NULL which apply to all teams) 

447 - team_id=None, include_all_teams=True: includes ALL team-scoped roles 

448 - team_id=None, include_all_teams=False: includes only team-scoped roles 

449 with scope_id=NULL (roles that apply to all teams, e.g. during login) 

450 

451 Args: 

452 user_email: Email address of the user 

453 team_id: Optional team ID to filter to a specific team's roles 

454 include_all_teams: If True, include ALL team-scoped roles (for list/read with session tokens) 

455 

456 Returns: 

457 List[UserRole]: List of active roles for the user 

458 """ 

459 query = select(UserRole).join(Role).options(contains_eager(UserRole.role)).where(and_(UserRole.user_email == user_email, UserRole.is_active.is_(True), Role.is_active.is_(True))) 

460 

461 # Include global roles and personal roles 

462 scope_conditions = [UserRole.scope == "global", UserRole.scope == "personal"] 

463 

464 if team_id: 

465 # Filter to specific team's roles only 

466 scope_conditions.append(and_(UserRole.scope == "team", or_(UserRole.scope_id == team_id, UserRole.scope_id.is_(None)))) 

467 elif include_all_teams: 

468 # Include ALL team-scoped roles (for list/read endpoints with session tokens) 

469 scope_conditions.append(UserRole.scope == "team") 

470 else: 

471 # When team_id is None and include_all_teams is False (e.g., during login), 

472 # include team-scoped roles with scope_id=None (roles that apply to all teams) 

473 scope_conditions.append(and_(UserRole.scope == "team", UserRole.scope_id.is_(None))) 

474 

475 query = query.where(or_(*scope_conditions)) 

476 

477 # Filter out expired roles 

478 now = utc_now() 

479 query = query.where((UserRole.expires_at.is_(None)) | (UserRole.expires_at > now)) 

480 

481 result = self.db.execute(query) 

482 user_roles = result.unique().scalars().all() 

483 return user_roles 

484 

485 async def _log_permission_check( 

486 self, 

487 user_email: str, 

488 permission: str, 

489 resource_type: Optional[str], 

490 resource_id: Optional[str], 

491 team_id: Optional[str], 

492 granted: bool, 

493 roles_checked: Dict, 

494 ip_address: Optional[str], 

495 user_agent: Optional[str], 

496 ) -> None: 

497 """Log permission check for auditing. 

498 

499 Args: 

500 user_email: Email address of the user 

501 permission: Permission being checked 

502 resource_type: Type of resource being accessed 

503 resource_id: ID of specific resource 

504 team_id: ID of team context 

505 granted: Whether permission was granted 

506 roles_checked: Dictionary of roles that were checked 

507 ip_address: IP address of request 

508 user_agent: User agent of request 

509 """ 

510 audit_log = PermissionAuditLog( 

511 user_email=user_email, 

512 permission=permission, 

513 resource_type=resource_type, 

514 resource_id=resource_id, 

515 team_id=team_id, 

516 granted=granted, 

517 roles_checked=roles_checked, 

518 ip_address=ip_address, 

519 user_agent=user_agent, 

520 ) 

521 

522 self.db.add(audit_log) 

523 self.db.commit() 

524 

525 def _get_roles_for_audit(self, user_email: str, team_id: Optional[str]) -> Dict: 

526 """Get role information for audit logging from cached roles. 

527 

528 Uses roles cached by get_user_permissions() to avoid a duplicate DB query. 

529 

530 Args: 

531 user_email: Email address of the user. 

532 team_id: Optional team ID for context. 

533 

534 Returns: 

535 Dict: Role information for audit logging 

536 """ 

537 cache_key = f"{user_email}:{team_id or 'global'}" 

538 user_roles = self._roles_cache.get(cache_key, []) 

539 return {"roles": [{"id": ur.role_id, "name": ur.role.name, "scope": ur.scope, "permissions": ur.role.permissions} for ur in user_roles]} 

540 

541 def _is_cache_valid(self, cache_key: str) -> bool: 

542 """Check if cached permissions are still valid. 

543 

544 Args: 

545 cache_key: Cache key to check validity for 

546 

547 Returns: 

548 bool: True if cache is valid, False otherwise 

549 """ 

550 if cache_key not in self._permission_cache: 

551 return False 

552 

553 if cache_key not in self._cache_timestamps: 

554 return False 

555 

556 age = utc_now() - self._cache_timestamps[cache_key] 

557 return age.total_seconds() < self.cache_ttl 

558 

559 async def _is_user_admin(self, user_email: str) -> bool: 

560 """Check if user is admin by looking up user record directly. 

561 

562 Args: 

563 user_email: Email address of the user 

564 

565 Returns: 

566 bool: True if user is admin 

567 """ 

568 # First-Party 

569 from mcpgateway.db import EmailUser # pylint: disable=import-outside-toplevel 

570 

571 # Special case for platform admin (virtual user) 

572 if user_email == getattr(settings, "platform_admin_email", ""): 

573 return True 

574 

575 user = self.db.execute(select(EmailUser).where(EmailUser.email == user_email)).scalar_one_or_none() 

576 return bool(user and user.is_admin) 

577 

578 async def _check_team_fallback_permissions(self, user_email: str, permission: str, team_id: Optional[str]) -> bool: 

579 """Check fallback team permissions for users without explicit RBAC roles. 

580 

581 This provides basic team management permissions for authenticated users on teams they belong to. 

582 

583 Args: 

584 user_email: Email address of the user 

585 permission: Permission being checked 

586 team_id: Team ID context 

587 

588 Returns: 

589 bool: True if user has fallback permission 

590 """ 

591 if not team_id: 

592 # For global team operations, allow authenticated users to read their teams and create new teams 

593 if permission in ["teams.create", "teams.read"]: 

594 return True 

595 return False 

596 

597 # Get user's role in the team (single query instead of two separate queries) 

598 user_role = await self._get_user_team_role(user_email, team_id) 

599 

600 # If user is not a member (role is None), deny access 

601 if user_role is None: 

602 return False 

603 

604 # Define fallback permissions based on team role 

605 if user_role == "owner": 

606 # Team owners get full permissions on their teams 

607 return permission in ["teams.read", "teams.update", "teams.delete", "teams.manage_members", "teams.create"] 

608 if user_role in ["member"]: 

609 # Team members get basic read permissions 

610 return permission in ["teams.read"] 

611 

612 return False 

613 

614 async def _is_team_member(self, user_email: str, team_id: str) -> bool: 

615 """Check if user is a member of the specified team. 

616 

617 Note: This method delegates to _get_user_team_role to avoid duplicate DB queries. 

618 

619 Args: 

620 user_email: Email address of the user 

621 team_id: Team ID 

622 

623 Returns: 

624 bool: True if user is a team member 

625 """ 

626 # Delegate to _get_user_team_role to avoid duplicate query 

627 return await self._get_user_team_role(user_email, team_id) is not None 

628 

629 async def _get_user_team_role(self, user_email: str, team_id: str) -> Optional[str]: 

630 """Get user's role in the specified team. 

631 

632 Args: 

633 user_email: Email address of the user 

634 team_id: Team ID 

635 

636 Returns: 

637 Optional[str]: User's role in the team or None if not a member 

638 """ 

639 # First-Party 

640 from mcpgateway.db import EmailTeamMember # pylint: disable=import-outside-toplevel 

641 

642 member = self.db.execute(select(EmailTeamMember).where(and_(EmailTeamMember.user_email == user_email, EmailTeamMember.team_id == team_id, EmailTeamMember.is_active))).scalar_one_or_none() 

643 self.db.commit() # Release transaction to avoid idle-in-transaction 

644 

645 return member.role if member else None 

646 

647 async def _check_token_fallback_permissions(self, _user_email: str, permission: str) -> bool: 

648 """Check fallback token permissions for authenticated users. 

649 

650 All authenticated users can manage their own tokens. The token endpoints 

651 already filter by user_email, so this just grants access to the endpoints. 

652 

653 Args: 

654 _user_email: Email address of the user (unused) 

655 permission: Permission being checked 

656 

657 Returns: 

658 bool: True if user has fallback permission for token operations 

659 """ 

660 # Any authenticated user can create, read, update, and revoke their own tokens 

661 # The actual filtering by user_email happens in the token service layer 

662 if permission in ["tokens.create", "tokens.read", "tokens.update", "tokens.revoke"]: 

663 return True 

664 

665 return False