Coverage for mcpgateway / services / permission_service.py: 100%

173 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/permission_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Permission Service for RBAC System. 

8 

9This module provides the core permission checking logic for the RBAC system. 

10It handles role-based permission validation, permission auditing, and caching. 

11""" 

12 

13# Standard 

14from datetime import datetime 

15import logging 

16from typing import Dict, List, Optional, Set 

17 

18# Third-Party 

19from sqlalchemy import and_, or_, select 

20from sqlalchemy.orm import contains_eager, Session 

21 

22# First-Party 

23from mcpgateway.config import settings 

24from mcpgateway.db import PermissionAuditLog, Permissions, Role, UserRole, utc_now 

25 

26logger = logging.getLogger(__name__) 

27 

28 

29class PermissionService: 

30 """Service for checking and managing user permissions. 

31 

32 Provides role-based permission checking with caching, auditing, 

33 and support for global, team, and personal scopes. 

34 

35 Attributes: 

36 db: Database session 

37 audit_enabled: Whether to log permission checks 

38 cache_ttl: Permission cache TTL in seconds 

39 

40 Examples: 

41 Basic construction and coroutine checks: 

42 >>> from unittest.mock import Mock 

43 >>> service = PermissionService(Mock()) 

44 >>> isinstance(service, PermissionService) 

45 True 

46 >>> import asyncio 

47 >>> asyncio.iscoroutinefunction(service.check_permission) 

48 True 

49 >>> asyncio.iscoroutinefunction(service.get_user_permissions) 

50 True 

51 """ 

52 

def __init__(self, db: Session, audit_enabled: Optional[bool] = None):
    """Create a permission service bound to a database session.

    Args:
        db: Database session used for role, user, and audit-log queries.
        audit_enabled: Whether permission checks are written to the audit
            log. ``None`` defers to ``settings.permission_audit_enabled``
            (PERMISSION_AUDIT_ENABLED).
    """
    self.db = db

    # An explicit True/False wins; only None falls back to configuration.
    self.audit_enabled = settings.permission_audit_enabled if audit_enabled is None else audit_enabled

    # Per-instance caches. All three share keys of the form
    # "<user_email>:<scope>" and are filled by get_user_permissions().
    self._permission_cache: Dict[str, Set[str]] = {}
    self._roles_cache: Dict[str, List[UserRole]] = {}
    self._cache_timestamps: Dict[str, datetime] = {}

    self.cache_ttl = 300  # seconds (5 minutes)

68 

async def check_permission(
    self,
    user_email: str,
    permission: str,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
    team_id: Optional[str] = None,
    token_teams: Optional[List[str]] = None,
    ip_address: Optional[str] = None,
    user_agent: Optional[str] = None,
    allow_admin_bypass: bool = True,
    check_any_team: bool = False,
) -> bool:
    """Check whether a user holds a specific permission.

    The decision is made in this order:

    1. An ``admin.*`` permission requested with a public-only token
       (``token_teams == []``) is denied outright.
    2. If ``allow_admin_bypass`` is set and the user is an admin, the
       permission is granted without consulting roles (and without an
       audit record).
    3. Otherwise the user's effective permissions are loaded and the
       permission is granted when it, or the wildcard
       ``Permissions.ALL_PERMISSIONS``, is present.

    Any exception raised along the way (including one from audit logging)
    is logged and turned into a denial.

    Args:
        user_email: Email of the user to check.
        permission: Permission to check (e.g., 'tools.create').
        resource_type: Type of resource being accessed (audit only).
        resource_id: Specific resource ID if applicable (audit only).
        team_id: Team context for the permission check.
        token_teams: Normalized token team scope from auth context.
            ``[]`` means public-only scope; ``None`` means unrestricted
            admin scope (when allowed by token semantics).
        ip_address: IP address for audit logging.
        user_agent: User agent for audit logging.
        allow_admin_bypass: If True, admin users bypass all permission
            checks. If False, admins must have explicit permissions.
            Default is True for backward compatibility.
        check_any_team: If True, check permission across ALL team-scoped
            roles (used for list/read endpoints with multi-team session
            tokens).

    Returns:
        bool: True if permission is granted, False otherwise.

    Examples:
        >>> from unittest.mock import Mock
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(PermissionService(Mock()).check_permission)
        True
    """
    try:
        # SECURITY: Public-only tokens (teams=[]) must never satisfy admin.*
        # permissions, even when the backing user identity is an admin.
        if permission.startswith("admin.") and token_teams is not None and len(token_teams) == 0:
            logger.warning(f"Permission denied for public-only token: user={user_email}, permission={permission}")
            return False

        # Admins bypass role evaluation entirely when the caller allows it.
        if allow_admin_bypass and await self._is_user_admin(user_email):
            return True

        # Effective permissions (served from cache when still valid).
        user_permissions = await self.get_user_permissions(user_email, team_id, include_all_teams=check_any_team)

        granted = permission in user_permissions or Permissions.ALL_PERMISSIONS in user_permissions

        if self.audit_enabled:
            # Any-team lookups cache their roles under the "__anyteam__"
            # scope rather than under team_id/"global". _get_roles_for_audit
            # builds its key as "<email>:<team_id or 'global'>", so passing
            # that scope name makes it read the roles that were actually
            # evaluated instead of logging an empty role list. A concrete
            # team_id still takes precedence, matching role resolution.
            audit_scope = "__anyteam__" if check_any_team and not team_id else team_id
            roles_checked = self._get_roles_for_audit(user_email, audit_scope)
            await self._log_permission_check(
                user_email=user_email,
                permission=permission,
                resource_type=resource_type,
                resource_id=resource_id,
                team_id=team_id,
                granted=granted,
                roles_checked=roles_checked,
                ip_address=ip_address,
                user_agent=user_agent,
            )

        logger.debug(f"Permission check: user={user_email}, permission={permission}, team={team_id}, granted={granted}")

        return granted

    except Exception as e:
        logger.error(f"Error checking permission for {user_email}: {e}")
        # Fail closed: any error results in a denial.
        return False

162 

async def has_admin_permission(self, user_email: str, team_id: Optional[str] = None) -> bool:
    """Report whether the user has any admin-level access.

    Used by AdminAuthMiddleware to allow access to /admin/* routes for
    users who hold admin permissions through RBAC even when they are not
    flagged as is_admin in the database.

    With ``team_id`` set, that team's roles take part in the check; with
    ``team_id=None`` the permissions are resolved without a team context.

    Args:
        user_email: Email of the user to check.
        team_id: Optional team ID for team-scoped permission checks. Must
            be pre-validated against the user's DB-resolved teams before
            being passed here.

    Returns:
        bool: True if the user is an admin, holds the wildcard permission,
        or holds any ``admin.*`` permission. False otherwise, including
        when an error occurs.
    """
    try:
        # Database admins qualify without looking at roles.
        if await self._is_user_admin(user_email):
            return True

        granted = await self.get_user_permissions(user_email, team_id=team_id)

        # The wildcard implies every admin permission.
        if Permissions.ALL_PERMISSIONS in granted:
            return True

        # Otherwise a single admin.* entry is enough.
        return any(name.startswith("admin.") for name in granted)

    except Exception as e:
        logger.error(f"Error checking admin permission for {user_email}: {e}")
        return False

208 

async def get_user_permissions(self, user_email: str, team_id: Optional[str] = None, include_all_teams: bool = False) -> Set[str]:
    """Get all effective permissions for a user.

    Unions ``get_effective_permissions()`` over every role returned by
    ``_get_user_roles`` and caches both the permission set and the role
    list for ``cache_ttl`` seconds.

    Args:
        user_email: Email of the user.
        team_id: Optional team context.
        include_all_teams: If True, include ALL team-scoped roles (for
            list/read endpoints). Ignored when ``team_id`` is given,
            because role resolution then filters to that team.

    Returns:
        Set[str]: All effective permissions for the user. On a cache hit
        this is the cached set object itself, so callers should treat it
        as read-only.

    Examples:
        >>> from unittest.mock import Mock
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(PermissionService(Mock()).get_user_permissions)
        True
    """
    # The any-team result gets its own cache slot so it cannot poison the
    # global one. _get_user_roles gives team_id precedence over
    # include_all_teams, so the key must do the same: otherwise a
    # team-specific result and the any-team result would overwrite each
    # other and could expose one team's permissions in another's context.
    if include_all_teams and not team_id:
        cache_key = f"{user_email}:__anyteam__"
    else:
        cache_key = f"{user_email}:{team_id or 'global'}"

    if self._is_cache_valid(cache_key):
        cached_perms = self._permission_cache[cache_key]
        logger.debug(f"[RBAC] Cache hit for {user_email} (team_id={team_id}): {cached_perms}")
        return cached_perms

    # Active roles for the user (role relationship is eager-loaded).
    user_roles = await self._get_user_roles(user_email, team_id, include_all_teams=include_all_teams)
    logger.debug(f"[RBAC] Found {len(user_roles)} roles for {user_email} (team_id={team_id})")

    permissions: Set[str] = set()
    for user_role in user_roles:
        role_permissions = user_role.role.get_effective_permissions()
        logger.debug(f"[RBAC] Role '{user_role.role.name}' (scope={user_role.scope}, scope_id={user_role.scope_id}) has permissions: {role_permissions}")
        permissions.update(role_permissions)

    # Roles are cached next to the permissions so audit logging can reuse
    # them without a second query.
    self._permission_cache[cache_key] = permissions
    self._roles_cache[cache_key] = user_roles
    self._cache_timestamps[cache_key] = utc_now()

    return permissions

262 

async def get_user_roles(self, user_email: str, scope: Optional[str] = None, team_id: Optional[str] = None, include_expired: bool = False) -> List[UserRole]:
    """List a user's active role assignments, with optional filters.

    Only assignments where both the assignment and the role itself are
    active are returned.

    Args:
        user_email: Email of the user.
        scope: Filter by scope ('global', 'team', 'personal').
        team_id: Filter by team ID (matched against ``scope_id``).
        include_expired: Whether to include expired roles.

    Returns:
        List[UserRole]: User's role assignments.

    Examples:
        >>> from unittest.mock import Mock
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(PermissionService(Mock()).get_user_roles)
        True
    """
    active_assignment = and_(UserRole.user_email == user_email, UserRole.is_active.is_(True), Role.is_active.is_(True))
    stmt = select(UserRole).join(Role).where(active_assignment)

    # Optional narrowing; falsy values (None, "") leave the query unfiltered.
    if scope:
        stmt = stmt.where(UserRole.scope == scope)
    if team_id:
        stmt = stmt.where(UserRole.scope_id == team_id)

    if not include_expired:
        current_time = utc_now()
        # No expiry date means the assignment never expires.
        stmt = stmt.where(or_(UserRole.expires_at.is_(None), UserRole.expires_at > current_time))

    return self.db.execute(stmt).scalars().all()

298 

async def has_permission_on_resource(self, user_email: str, permission: str, resource_type: str, resource_id: str, team_id: Optional[str] = None) -> bool:
    """Check a permission in the context of one specific resource.

    Currently this is exactly the role-based ``check_permission`` result;
    it exists as the extension point for resource-specific rules.

    Args:
        user_email: Email of the user.
        permission: Permission to check.
        resource_type: Type of resource.
        resource_id: Specific resource ID.
        team_id: Team context.

    Returns:
        bool: True if user has permission on the resource.

    Examples:
        >>> from unittest.mock import Mock
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(PermissionService(Mock()).has_permission_on_resource)
        True
    """
    role_based_ok = await self.check_permission(
        user_email=user_email,
        permission=permission,
        resource_type=resource_type,
        resource_id=resource_id,
        team_id=team_id,
    )

    # NOTE: Add resource-specific logic here in future enhancement, e.g.:
    # - Check resource ownership
    # - Check resource sharing permissions
    # - Check resource team membership

    return bool(role_based_ok)

337 

async def check_resource_ownership(self, user_email: str, resource: object, allow_team_admin: bool = True) -> bool:
    """Check if user owns a resource or may act on it as the team's owner.

    Access is granted when any of the following holds:

    - the user is a platform admin;
    - the resource has an ``owner_email`` equal to ``user_email``;
    - ``allow_team_admin`` is set, the resource has ``visibility == "team"``
      and a truthy ``team_id``, and the user's membership role in that team
      is ``"owner"``.

    Args:
        user_email: Email of the user to check.
        resource: Any object; ``owner_email``, ``team_id`` and
            ``visibility`` are read when present and missing attributes
            are tolerated.
        allow_team_admin: Whether to allow the team-owner path for
            team-scoped resources.

    Returns:
        bool: True if user owns the resource or is an authorized team owner.

    Examples:
        >>> from unittest.mock import Mock
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(PermissionService(Mock()).check_resource_ownership)
        True
    """
    # Platform admins bypass ownership checks.
    if await self._is_user_admin(user_email):
        return True

    # Direct ownership. hasattr() keeps a resource without the attribute
    # from ever matching, whatever user_email is.
    if hasattr(resource, "owner_email") and getattr(resource, "owner_email") == user_email:
        return True

    # Team path: only for team-visible resources that name a team.
    if not allow_team_admin or getattr(resource, "visibility", None) != "team":
        return False

    resource_team_id = getattr(resource, "team_id", None)
    if not resource_team_id:
        return False

    # Despite the parameter name, only the "owner" membership role qualifies.
    return await self._get_user_team_role(user_email, resource_team_id) == "owner"

375 

async def check_admin_permission(self, user_email: str) -> bool:
    """Check if user has any of the recognised admin permissions.

    Args:
        user_email: Email of the user.

    Returns:
        bool: True if the user is an admin, or their permissions (resolved
        without a team context) include system-config, user-management,
        security-audit, or the wildcard permission.

    Examples:
        >>> from unittest.mock import Mock
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(PermissionService(Mock()).check_admin_permission)
        True
    """
    # Admin users (including the platform admin virtual user) qualify directly.
    if await self._is_user_admin(user_email):
        return True

    admin_markers = (
        Permissions.ADMIN_SYSTEM_CONFIG,
        Permissions.ADMIN_USER_MANAGEMENT,
        Permissions.ADMIN_SECURITY_AUDIT,
        Permissions.ALL_PERMISSIONS,
    )

    granted = await self.get_user_permissions(user_email)
    for marker in admin_markers:
        if marker in granted:
            return True
    return False

401 

def clear_user_cache(self, user_email: str) -> None:
    """Drop every cached entry belonging to one user.

    Should be called when user's roles change. Entries are found by
    scanning the permission cache for keys starting with
    ``"<user_email>:"``; each match is then removed from the permission,
    role, and timestamp caches.

    Args:
        user_email: Email of the user.

    Examples:
        Cache invalidation behavior:
        >>> from unittest.mock import Mock
        >>> service = PermissionService(Mock())
        >>> service._permission_cache = {"alice:global": {"tools.read"}, "bob:team1": {"*"}}
        >>> service._cache_timestamps = {"alice:global": utc_now(), "bob:team1": utc_now()}
        >>> service.clear_user_cache("alice")
        >>> "alice:global" in service._permission_cache
        False
        >>> "bob:team1" in service._permission_cache
        True
    """
    prefix = f"{user_email}:"

    # Snapshot the matches first; the dict is mutated while removing them.
    stale_keys = [cached_key for cached_key in self._permission_cache if cached_key.startswith(prefix)]

    for stale_key in stale_keys:
        for store in (self._permission_cache, self._roles_cache, self._cache_timestamps):
            store.pop(stale_key, None)

    logger.debug(f"Cleared permission cache for user: {user_email}")

430 

def clear_cache(self) -> None:
    """Empty the permission, role, and timestamp caches for all users.

    The dictionaries are cleared in place, so existing references to them
    observe the change.

    Examples:
        Clear all cache:
        >>> from unittest.mock import Mock
        >>> service = PermissionService(Mock())
        >>> service._permission_cache = {"x": {"p"}}
        >>> service._cache_timestamps = {"x": utc_now()}
        >>> service.clear_cache()
        >>> service._permission_cache == {}
        True
        >>> service._cache_timestamps == {}
        True
    """
    for store in (self._permission_cache, self._roles_cache, self._cache_timestamps):
        store.clear()
    logger.debug("Cleared all permission cache")

450 

async def _get_user_roles(self, user_email: str, team_id: Optional[str] = None, include_all_teams: bool = False) -> List[UserRole]:
    """Load the role assignments that take part in a permission check.

    Global and personal roles are always included. Which team-scoped roles
    are added depends on the arguments:

    - ``team_id`` given: team roles for that team, plus team roles with
      ``scope_id`` NULL (which apply to all teams). This takes precedence
      over ``include_all_teams``.
    - ``team_id=None, include_all_teams=True``: ALL team-scoped roles
      EXCEPT those on personal teams.
    - ``team_id=None, include_all_teams=False``: only team-scoped roles
      with ``scope_id`` NULL (e.g. during login).

    Inactive assignments, inactive roles, and expired assignments are
    always excluded.

    Args:
        user_email: Email address of the user.
        team_id: Optional team ID to filter to a specific team's roles.
        include_all_teams: If True, include ALL team-scoped roles (for
            list/read with session tokens).

    Returns:
        List[UserRole]: List of active roles for the user, each with its
        ``role`` relationship populated from the same query.
    """
    is_team_scoped = UserRole.scope == "team"
    applies_to_every_team = UserRole.scope_id.is_(None)

    if team_id:
        # Restrict to the requested team (plus team-wide roles).
        team_condition = and_(is_team_scoped, or_(UserRole.scope_id == team_id, applies_to_every_team))
    elif include_all_teams:
        # Personal teams are auto-created for every user with team_admin
        # permissions, so including them would grant every user full mutate
        # permissions (servers.create, tools.create, etc.) when
        # check_any_team=True, making RBAC ineffective.
        # First-Party
        from mcpgateway.db import EmailTeam  # pylint: disable=import-outside-toplevel

        personal_team_ids = select(EmailTeam.id).where(EmailTeam.is_personal.is_(True))
        team_condition = and_(is_team_scoped, or_(applies_to_every_team, ~UserRole.scope_id.in_(personal_team_ids)))
    else:
        # No team context: only roles that apply to all teams.
        team_condition = and_(is_team_scoped, applies_to_every_team)

    in_scope = or_(UserRole.scope == "global", UserRole.scope == "personal", team_condition)
    active_assignment = and_(UserRole.user_email == user_email, UserRole.is_active.is_(True), Role.is_active.is_(True))

    current_time = utc_now()
    not_expired = or_(UserRole.expires_at.is_(None), UserRole.expires_at > current_time)

    stmt = select(UserRole).join(Role).options(contains_eager(UserRole.role)).where(active_assignment).where(in_scope).where(not_expired)

    rows = self.db.execute(stmt)
    return rows.unique().scalars().all()

511 

512 async def _log_permission_check( 

513 self, 

514 user_email: str, 

515 permission: str, 

516 resource_type: Optional[str], 

517 resource_id: Optional[str], 

518 team_id: Optional[str], 

519 granted: bool, 

520 roles_checked: Dict, 

521 ip_address: Optional[str], 

522 user_agent: Optional[str], 

523 ) -> None: 

524 """Log permission check for auditing. 

525 

526 Args: 

527 user_email: Email address of the user 

528 permission: Permission being checked 

529 resource_type: Type of resource being accessed 

530 resource_id: ID of specific resource 

531 team_id: ID of team context 

532 granted: Whether permission was granted 

533 roles_checked: Dictionary of roles that were checked 

534 ip_address: IP address of request 

535 user_agent: User agent of request 

536 """ 

537 audit_log = PermissionAuditLog( 

538 user_email=user_email, 

539 permission=permission, 

540 resource_type=resource_type, 

541 resource_id=resource_id, 

542 team_id=team_id, 

543 granted=granted, 

544 roles_checked=roles_checked, 

545 ip_address=ip_address, 

546 user_agent=user_agent, 

547 ) 

548 

549 self.db.add(audit_log) 

550 self.db.commit() 

551 

552 def _get_roles_for_audit(self, user_email: str, team_id: Optional[str]) -> Dict: 

553 """Get role information for audit logging from cached roles. 

554 

555 Uses roles cached by get_user_permissions() to avoid a duplicate DB query. 

556 

557 Args: 

558 user_email: Email address of the user. 

559 team_id: Optional team ID for context. 

560 

561 Returns: 

562 Dict: Role information for audit logging 

563 """ 

564 cache_key = f"{user_email}:{team_id or 'global'}" 

565 user_roles = self._roles_cache.get(cache_key, []) 

566 return {"roles": [{"id": ur.role_id, "name": ur.role.name, "scope": ur.scope, "permissions": ur.role.permissions} for ur in user_roles]} 

567 

568 def _is_cache_valid(self, cache_key: str) -> bool: 

569 """Check if cached permissions are still valid. 

570 

571 Args: 

572 cache_key: Cache key to check validity for 

573 

574 Returns: 

575 bool: True if cache is valid, False otherwise 

576 """ 

577 if cache_key not in self._permission_cache: 

578 return False 

579 

580 if cache_key not in self._cache_timestamps: 

581 return False 

582 

583 age = utc_now() - self._cache_timestamps[cache_key] 

584 return age.total_seconds() < self.cache_ttl 

585 

async def _is_user_admin(self, user_email: str) -> bool:
    """Check if user is admin by looking up the user record directly.

    The configured platform admin email is treated as an admin without a
    database lookup (it may be a virtual user). Every other email is an
    admin only if a matching user row exists with a truthy ``is_admin``.

    Args:
        user_email: Email address of the user.

    Returns:
        bool: True if user is admin.
    """
    # First-Party
    from mcpgateway.db import EmailUser  # pylint: disable=import-outside-toplevel

    # Special case for platform admin (virtual user). An unset or blank
    # setting must never match: with a "" default, an empty user_email
    # would otherwise compare equal and be granted admin.
    platform_admin_email = getattr(settings, "platform_admin_email", "")
    if platform_admin_email and user_email == platform_admin_email:
        return True

    user = self.db.execute(select(EmailUser).where(EmailUser.email == user_email)).scalar_one_or_none()
    return bool(user and user.is_admin)

604 

605 async def _check_team_fallback_permissions(self, user_email: str, permission: str, team_id: Optional[str]) -> bool: 

606 """Check fallback team permissions for users without explicit RBAC roles. 

607 

608 This provides basic team management permissions for authenticated users on teams they belong to. 

609 

610 Args: 

611 user_email: Email address of the user 

612 permission: Permission being checked 

613 team_id: Team ID context 

614 

615 Returns: 

616 bool: True if user has fallback permission 

617 """ 

618 if not team_id: 

619 # For global team operations, allow authenticated users to read their teams and create new teams 

620 if permission in ["teams.create", "teams.read"]: 

621 return True 

622 return False 

623 

624 # Get user's role in the team (single query instead of two separate queries) 

625 user_role = await self._get_user_team_role(user_email, team_id) 

626 

627 # If user is not a member (role is None), deny access 

628 if user_role is None: 

629 return False 

630 

631 # Define fallback permissions based on team role 

632 if user_role == "owner": 

633 # Team owners get full permissions on their teams 

634 return permission in ["teams.read", "teams.update", "teams.delete", "teams.manage_members", "teams.create"] 

635 if user_role in ["member"]: 

636 # Team members get basic read permissions 

637 return permission in ["teams.read"] 

638 

639 return False 

640 

641 async def _is_team_member(self, user_email: str, team_id: str) -> bool: 

642 """Check if user is a member of the specified team. 

643 

644 Note: This method delegates to _get_user_team_role to avoid duplicate DB queries. 

645 

646 Args: 

647 user_email: Email address of the user 

648 team_id: Team ID 

649 

650 Returns: 

651 bool: True if user is a team member 

652 """ 

653 # Delegate to _get_user_team_role to avoid duplicate query 

654 return await self._get_user_team_role(user_email, team_id) is not None 

655 

async def _get_user_team_role(self, user_email: str, team_id: str) -> Optional[str]:
    """Get user's role in the specified team.

    Looks up the user's active membership row for the team and then
    commits the session so the read does not leave a transaction open.

    Args:
        user_email: Email address of the user.
        team_id: Team ID.

    Returns:
        Optional[str]: User's role in the team or None if not a member.
    """
    # First-Party
    from mcpgateway.db import EmailTeamMember  # pylint: disable=import-outside-toplevel

    member = self.db.execute(select(EmailTeamMember).where(and_(EmailTeamMember.user_email == user_email, EmailTeamMember.team_id == team_id, EmailTeamMember.is_active))).scalar_one_or_none()

    # Read the role BEFORE committing. With SQLAlchemy's default
    # expire_on_commit behaviour, attributes are expired by commit() and
    # reading one afterwards reloads the row, which would reopen the very
    # transaction the commit is meant to release.
    # NOTE(review): harmless if the session is configured with
    # expire_on_commit=False - confirm against the session factory.
    role = member.role if member else None

    self.db.commit()  # Release transaction to avoid idle-in-transaction

    return role

673 

674 async def _check_token_fallback_permissions(self, _user_email: str, permission: str) -> bool: 

675 """Check fallback token permissions for authenticated users. 

676 

677 All authenticated users can manage their own tokens. The token endpoints 

678 already filter by user_email, so this just grants access to the endpoints. 

679 

680 Args: 

681 _user_email: Email address of the user (unused) 

682 permission: Permission being checked 

683 

684 Returns: 

685 bool: True if user has fallback permission for token operations 

686 """ 

687 # Any authenticated user can create, read, update, and revoke their own tokens 

688 # The actual filtering by user_email happens in the token service layer 

689 if permission in ["tokens.create", "tokens.read", "tokens.update", "tokens.revoke"]: 

690 return True 

691 

692 return False