Coverage for mcpgateway / services / permission_service.py: 99%
197 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/permission_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Permission Service for RBAC System.
9This module provides the core permission checking logic for the RBAC system.
10It handles role-based permission validation, permission auditing, and caching.
11"""
13# Standard
14from datetime import datetime
15import logging
16from typing import Dict, List, Optional, Set
18# Third-Party
19from sqlalchemy import and_, or_, select
20from sqlalchemy.orm import contains_eager, Session
22# First-Party
23from mcpgateway.common.validators import SecurityValidator
24from mcpgateway.config import settings
25from mcpgateway.db import PermissionAuditLog, Permissions, Role, UserRole, utc_now
27logger = logging.getLogger(__name__)
30class PermissionService:
31 """Service for checking and managing user permissions.
33 Provides role-based permission checking with caching, auditing,
34 and support for global, team, and personal scopes.
36 Attributes:
37 db: Database session
38 audit_enabled: Whether to log permission checks
39 cache_ttl: Permission cache TTL in seconds
41 Examples:
42 Basic construction and coroutine checks:
43 >>> from unittest.mock import Mock
44 >>> service = PermissionService(Mock())
45 >>> isinstance(service, PermissionService)
46 True
47 >>> import asyncio
48 >>> asyncio.iscoroutinefunction(service.check_permission)
49 True
50 >>> asyncio.iscoroutinefunction(service.get_user_permissions)
51 True
52 """
def __init__(self, db: Session, audit_enabled: Optional[bool] = None):
    """Create a permission service bound to a database session.

    Args:
        db: Database session used for role lookups and audit writes.
        audit_enabled: Whether permission checks are written to the audit log.
            When None, settings.permission_audit_enabled (PERMISSION_AUDIT_ENABLED) decides.
    """
    self.db = db
    self.audit_enabled = settings.permission_audit_enabled if audit_enabled is None else audit_enabled

    # Cache lifetime in seconds (5 minutes).
    self.cache_ttl = 300

    # The three dictionaries below are keyed by the same cache key.
    self._permission_cache: Dict[str, Set[str]] = {}
    self._roles_cache: Dict[str, List[UserRole]] = {}
    self._cache_timestamps: Dict[str, datetime] = {}
async def check_permission(
    self,
    user_email: str,
    permission: str,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
    team_id: Optional[str] = None,
    token_teams: Optional[List[str]] = None,
    ip_address: Optional[str] = None,
    user_agent: Optional[str] = None,
    allow_admin_bypass: bool = True,
    check_any_team: bool = False,
) -> bool:
    """Check if user has specific permission.

    Checks the user's roles across the applicable scopes (global, team, personal)
    and returns True if any role grants the required permission or the wildcard.
    Any exception raised while checking is logged and results in a denial.

    Args:
        user_email: Email of the user to check
        permission: Permission to check (e.g., 'tools.create')
        resource_type: Type of resource being accessed (audit log only)
        resource_id: Specific resource ID if applicable (audit log only)
        team_id: Team context for the permission check
        token_teams: Normalized token team scope from auth context.
            `[]` means public-only scope; `None` means unrestricted
            admin scope (when allowed by token semantics).
        ip_address: IP address for audit logging
        user_agent: User agent for audit logging
        allow_admin_bypass: If True, admin users bypass all permission checks.
            If False, admins must have explicit permissions.
            Default is True for backward compatibility.
        check_any_team: If True, check permission across ALL team-scoped roles
            (used for list/read endpoints with multi-team session tokens)

    Returns:
        bool: True if permission is granted, False otherwise

    Examples:
        >>> from unittest.mock import Mock
        >>> service = PermissionService(Mock())
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(service.check_permission)
        True
    """
    try:
        # SECURITY: Public-only tokens (teams=[]) must never satisfy ANY permission
        # via admin bypass or team-scoped roles, even when the backing user is an admin.
        public_only = token_teams is not None and len(token_teams) == 0

        if allow_admin_bypass and await self._is_user_admin(user_email):
            if not public_only:
                # Admin with an unrestricted or team-narrowed token: bypass all checks.
                return True
            # Admin behind a public-only token: record the suppression and fall
            # through to the regular role-based evaluation.
            logger.warning(f"[RBAC] Admin bypass suppressed for public-only token: " f"user={SecurityValidator.sanitize_log_message(user_email)}, permission={permission}")

        # Get user's effective permissions (uses cache when valid)
        user_permissions = await self.get_user_permissions(user_email, team_id, include_all_teams=check_any_team, token_teams=token_teams)

        # Check if user has the specific permission or wildcard
        granted = permission in user_permissions or Permissions.ALL_PERMISSIONS in user_permissions

        # Log the permission check if auditing is enabled
        if self.audit_enabled:
            # get_user_permissions() stores the roles it evaluated under a key that
            # encodes the any-team flag and the token scope. Rebuild exactly that key;
            # looking up only "<user>:<team|global>" would miss the entry (or return
            # roles from a different scope) whenever check_any_team or token_teams is set.
            if token_teams is None:
                scope_suffix = ""
            elif len(token_teams) == 0:
                scope_suffix = ":__public__"
            else:
                scope_suffix = f":{','.join(sorted(set(token_teams)))}"
            team_part = "__anyteam__" if check_any_team else (team_id or "global")
            evaluated_roles = self._roles_cache.get(f"{user_email}:{team_part}{scope_suffix}", [])
            roles_checked = {"roles": [{"id": ur.role_id, "name": ur.role.name, "scope": ur.scope, "permissions": ur.role.permissions} for ur in evaluated_roles]}

            await self._log_permission_check(
                user_email=user_email,
                permission=permission,
                resource_type=resource_type,
                resource_id=resource_id,
                team_id=team_id,
                granted=granted,
                roles_checked=roles_checked,
                ip_address=ip_address,
                user_agent=user_agent,
            )

        logger.debug(
            f"Permission check: user={SecurityValidator.sanitize_log_message(user_email)}, permission={permission}, team={SecurityValidator.sanitize_log_message(team_id)}, granted={granted}"
        )

        return granted

    except Exception as e:
        logger.error(f"Error checking permission for {SecurityValidator.sanitize_log_message(user_email)}: {e}")
        # Default to deny on error
        return False
async def has_admin_permission(self, user_email: str, team_id: Optional[str] = None, token_teams: Optional[List[str]] = None) -> bool:
    """Tell whether the user may be treated as an administrator.

    Used by AdminAuthMiddleware so that /admin/* routes are reachable by users
    who hold admin permissions through RBAC even when they are not flagged
    is_admin in the database.

    With a team_id, team-scoped roles for that team take part in the check;
    without one only global and personal roles are evaluated.

    Args:
        user_email: Email of the user to check
        team_id: Optional team ID for team-scoped permission checks.
            Must be pre-validated against the user's DB-resolved teams
            before passing here.
        token_teams: Optional list of team IDs to scope the permission check (Layer 1 narrowing)

    Returns:
        bool: True if user is an admin OR holds the wildcard or any admin.* permission.
            False on any error.
    """
    try:
        # SECURITY: a public-only token (token_teams=[]) never gets the database
        # admin shortcut; it must earn admin access through its roles.
        public_only = token_teams is not None and len(token_teams) == 0
        if not public_only and await self._is_user_admin(user_email):
            return True

        # With a team_id this includes the roles of that team, so team members
        # holding e.g. admin.dashboard can reach the admin UI in their team context.
        effective = await self.get_user_permissions(user_email, team_id=team_id, token_teams=token_teams)

        if Permissions.ALL_PERMISSIONS in effective:
            return True
        return any(perm.startswith("admin.") for perm in effective)

    except Exception as e:
        logger.error(f"Error checking admin permission for {SecurityValidator.sanitize_log_message(user_email)}: {e}")
        return False
async def get_user_permissions(self, user_email: str, team_id: Optional[str] = None, include_all_teams: bool = False, token_teams: Optional[List[str]] = None) -> Set[str]:
    """Return the union of permissions granted by the user's active roles.

    Results are cached per (user, team scope, token scope); the roles that
    produced them are cached under the same key.

    Args:
        user_email: Email of the user
        team_id: Optional team context
        include_all_teams: If True, include ALL team-scoped roles (for list/read endpoints)
        token_teams: Optional list of team IDs from token narrowing. When include_all_teams=True
            and token_teams is non-empty, filters team-scoped roles to only include
            roles from teams in this list (enforces Layer 1 narrowing at Layer 2)

    Returns:
        Set[str]: All effective permissions for the user

    Examples:
        >>> from unittest.mock import Mock
        >>> service = PermissionService(Mock())
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(service.get_user_permissions)
        True
    """
    # The token scope is part of the key: None (unrestricted), [] (public-only)
    # and ["team-a"] (narrowed) all yield different permission sets.
    if token_teams is None:
        token_part = ""
    elif len(token_teams) > 0:
        token_part = ":" + ",".join(sorted(set(token_teams)))
    else:
        token_part = ":__public__"

    # Any-team lookups get their own key so they cannot poison the global entry.
    scope_part = "__anyteam__" if include_all_teams else (team_id or "global")
    cache_key = f"{user_email}:{scope_part}{token_part}"

    if self._is_cache_valid(cache_key):
        cached_perms = self._permission_cache[cache_key]
        logger.debug(f"[RBAC] Cache hit for {SecurityValidator.sanitize_log_message(user_email)} (team_id={SecurityValidator.sanitize_log_message(team_id)}): {cached_perms}")
        return cached_perms

    # Cache miss: load the active roles (role relationship is eager-loaded).
    user_roles = await self._get_user_roles(user_email, team_id, include_all_teams=include_all_teams, token_teams=token_teams)
    logger.debug(f"[RBAC] Found {len(user_roles)} roles for {SecurityValidator.sanitize_log_message(user_email)} (team_id={SecurityValidator.sanitize_log_message(team_id)})")

    permissions = set()
    for user_role in user_roles:
        role_permissions = user_role.role.get_effective_permissions()
        logger.debug(f"[RBAC] Role '{user_role.role.name}' (scope={user_role.scope}, scope_id={user_role.scope_id}) has permissions: {role_permissions}")
        permissions.update(role_permissions)

    # Remember the permissions, the roles behind them, and when they were computed.
    self._cache_timestamps[cache_key] = utc_now()
    self._roles_cache[cache_key] = user_roles
    self._permission_cache[cache_key] = permissions

    return permissions
async def get_user_roles(self, user_email: str, scope: Optional[str] = None, team_id: Optional[str] = None, include_expired: bool = False) -> List[UserRole]:
    """List the user's active role assignments, optionally filtered.

    Only assignments that are active and whose role is active are returned.

    Args:
        user_email: Email of the user
        scope: Filter by scope ('global', 'team', 'personal')
        team_id: Filter by team ID (matched against the assignment's scope_id)
        include_expired: Whether to include expired roles

    Returns:
        List[UserRole]: User's role assignments

    Examples:
        >>> from unittest.mock import Mock
        >>> service = PermissionService(Mock())
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(service.get_user_roles)
        True
    """
    active_assignment = and_(UserRole.user_email == user_email, UserRole.is_active.is_(True), Role.is_active.is_(True))
    query = select(UserRole).join(Role).where(active_assignment)

    if scope:
        query = query.where(UserRole.scope == scope)

    if team_id:
        query = query.where(UserRole.scope_id == team_id)

    if not include_expired:
        # Keep assignments with no expiry or an expiry still in the future.
        query = query.where(or_(UserRole.expires_at.is_(None), UserRole.expires_at > utc_now()))

    return self.db.execute(query).scalars().all()
async def has_permission_on_resource(self, user_email: str, permission: str, resource_type: str, resource_id: str, team_id: Optional[str] = None) -> bool:
    """Check a permission in the context of one concrete resource.

    Currently this is the plain role-based check; it is the extension point
    for resource-specific rules (ownership, sharing, team membership).

    Args:
        user_email: Email of the user
        permission: Permission to check
        resource_type: Type of resource
        resource_id: Specific resource ID
        team_id: Team context

    Returns:
        bool: True if user has permission on the resource

    Examples:
        >>> from unittest.mock import Mock
        >>> service = PermissionService(Mock())
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(service.has_permission_on_resource)
        True
    """
    role_check_passed = await self.check_permission(
        user_email=user_email,
        permission=permission,
        resource_type=resource_type,
        resource_id=resource_id,
        team_id=team_id,
    )
    if role_check_passed:
        # NOTE: Add resource-specific logic here in future enhancement, e.g.
        # resource ownership, resource sharing permissions, resource team membership.
        return True

    return False
async def check_resource_ownership(self, user_email: str, resource: object, allow_team_admin: bool = True) -> bool:
    """Check if user owns a resource or is a team owner for team resources.

    Access is granted, in this order, when the user is a platform admin,
    when the resource's owner_email equals user_email, or (if allowed) when
    the resource has visibility "team" and the user's role in the resource's
    team is "owner".

    Args:
        user_email: Email of the user to check
        resource: Any object; the owner_email, visibility and team_id attributes
            are read only if present, so objects lacking them are simply denied
        allow_team_admin: Whether to allow team owners for team-scoped resources

    Returns:
        bool: True if user owns the resource or is authorized through the team

    Examples:
        >>> from unittest.mock import Mock
        >>> service = PermissionService(Mock())
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(service.check_resource_ownership)
        True
    """
    # Platform admins bypass ownership checks entirely.
    if await self._is_user_admin(user_email):
        return True

    # Direct ownership.
    if hasattr(resource, "owner_email") and resource.owner_email == user_email:
        return True

    # Team path: only for team-visible resources that actually carry a team id.
    is_team_resource = hasattr(resource, "visibility") and resource.visibility == "team"
    if allow_team_admin and is_team_resource and hasattr(resource, "team_id") and resource.team_id:
        return await self._get_user_team_role(user_email, resource.team_id) == "owner"

    return False
async def check_admin_permission(self, user_email: str, token_teams: Optional[List[str]] = None) -> bool:
    """Tell whether the user holds one of the core admin permissions.

    Args:
        user_email: Email of the user
        token_teams: Optional list of team IDs to scope the permission check (Layer 1 narrowing)

    Returns:
        bool: True if user has admin permissions

    Examples:
        >>> from unittest.mock import Mock
        >>> service = PermissionService(Mock())
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(service.check_admin_permission)
        True
    """
    # SECURITY: a public-only token (token_teams=[]) never gets the admin shortcut.
    # Otherwise the admin flag wins outright (this also covers the platform admin virtual user).
    public_only = token_teams is not None and len(token_teams) == 0
    if not public_only and await self._is_user_admin(user_email):
        return True

    admin_markers = (
        Permissions.ADMIN_SYSTEM_CONFIG,
        Permissions.ADMIN_USER_MANAGEMENT,
        Permissions.ADMIN_SECURITY_AUDIT,
        Permissions.ALL_PERMISSIONS,
    )
    held = await self.get_user_permissions(user_email, token_teams=token_teams)
    return any(marker in held for marker in admin_markers)
def clear_user_cache(self, user_email: str) -> None:
    """Drop every cached entry that belongs to one user.

    Should be called when user's roles change.

    Args:
        user_email: Email of the user

    Examples:
        >>> from unittest.mock import Mock
        >>> service = PermissionService(Mock())
        >>> service._permission_cache = {"alice:global": {"tools.read"}, "bob:team1": {"*"}}
        >>> service._cache_timestamps = {"alice:global": utc_now(), "bob:team1": utc_now()}
        >>> service.clear_user_cache("alice")
        >>> "alice:global" in service._permission_cache
        False
        >>> "bob:team1" in service._permission_cache
        True
    """
    # Every cache key of a user starts with "<email>:"; the permission cache
    # is the reference for which keys exist.
    user_prefix = f"{user_email}:"
    stale_keys = [key for key in self._permission_cache if key.startswith(user_prefix)]

    for stale_key in stale_keys:
        for store in (self._permission_cache, self._roles_cache, self._cache_timestamps):
            store.pop(stale_key, None)

    logger.debug(f"Cleared permission cache for user: {SecurityValidator.sanitize_log_message(user_email)}")
def clear_cache(self) -> None:
    """Empty the permission, role and timestamp caches for all users.

    Examples:
        >>> from unittest.mock import Mock
        >>> service = PermissionService(Mock())
        >>> service._permission_cache = {"x": {"p"}}
        >>> service._cache_timestamps = {"x": utc_now()}
        >>> service.clear_cache()
        >>> service._permission_cache == {}
        True
        >>> service._cache_timestamps == {}
        True
    """
    for store in (self._permission_cache, self._roles_cache, self._cache_timestamps):
        store.clear()
    logger.debug("Cleared all permission cache")
async def _get_user_roles(self, user_email: str, team_id: Optional[str] = None, include_all_teams: bool = False, token_teams: Optional[List[str]] = None) -> List[UserRole]:
    """Get user roles for permission checking.

    Builds one query over active, non-expired UserRole rows joined to active
    Role rows (the role relationship is populated from that join), then
    restricts it by scope. Global and personal roles are always included.
    Team-scoped role inclusion depends on the parameters:

    - team_id provided: includes team roles for that specific team
      (plus team roles with scope_id=NULL which apply to all teams), unless
      token_teams is set and does not contain team_id, in which case no
      team-scoped roles are added
    - team_id=None, include_all_teams=True: includes ALL team-scoped roles
      EXCEPT roles on personal teams (which auto-grant team_admin to every user);
      a non-empty token_teams further limits them to those teams (scope_id=NULL
      roles are kept), and an empty token_teams excludes them all
    - team_id=None, include_all_teams=False: includes only team-scoped roles
      with scope_id=NULL (roles that apply to all teams, e.g. during login),
      and none at all for a public-only token (token_teams=[])

    Args:
        user_email: Email address of the user
        team_id: Optional team ID to filter to a specific team's roles
        include_all_teams: If True, include ALL team-scoped roles (for list/read with session tokens)
        token_teams: Optional list of team IDs from token narrowing. When include_all_teams=True
            and token_teams is non-empty, filters team-scoped roles to only include
            roles from teams in this list (enforces Layer 1 narrowing at Layer 2)

    Returns:
        List[UserRole]: List of active roles for the user
    """
    query = select(UserRole).join(Role).options(contains_eager(UserRole.role)).where(and_(UserRole.user_email == user_email, UserRole.is_active.is_(True), Role.is_active.is_(True)))

    # Include global roles and personal roles
    scope_conditions = [UserRole.scope == "global", UserRole.scope == "personal"]

    if team_id:
        # Security: Verify team_id is within token scope when narrowed.
        # Public-only tokens (token_teams=[]) must never access team-specific roles.
        # When team_id is out of scope, we skip adding team-scoped roles but still
        # return global and personal roles (needed for join endpoint and other operations).
        if token_teams is not None and (len(token_teams) == 0 or team_id not in token_teams):
            logger.debug(
                f"[RBAC] Team {SecurityValidator.sanitize_log_message(team_id)} not in token scope "
                f"{SecurityValidator.sanitize_log_message(token_teams)} for {SecurityValidator.sanitize_log_message(user_email)}: "
                f"excluding team-scoped roles but keeping global/personal roles"
            )
        else:
            # Team is in scope: include team-scoped roles for this team
            scope_conditions.append(and_(UserRole.scope == "team", or_(UserRole.scope_id == team_id, UserRole.scope_id.is_(None))))
    elif include_all_teams:
        # Include ALL team-scoped roles EXCEPT personal team roles.
        # Personal teams are auto-created for every user with team_admin permissions,
        # so including them would grant every user full mutate permissions (servers.create,
        # tools.create, etc.) when check_any_team=True, making RBAC ineffective.
        # First-Party
        from mcpgateway.db import EmailTeam  # pylint: disable=import-outside-toplevel

        # Team roles that either apply to all teams (scope_id NULL) or belong to
        # a team that is not in the set of personal team ids.
        base_condition = and_(
            UserRole.scope == "team",
            or_(
                UserRole.scope_id.is_(None),
                ~UserRole.scope_id.in_(select(EmailTeam.id).where(EmailTeam.is_personal.is_(True))),
            ),
        )

        # SECURITY: Filter team-scoped roles based on token_teams
        # - token_teams=None (un-narrowed): include all team roles
        # - token_teams=[] (public-only): exclude ALL team roles (strict isolation)
        # - token_teams=["team-a", ...]: include only specified teams
        if token_teams is not None:
            if len(token_teams) == 0:
                # Public-only token: exclude ALL team-scoped roles
                # Only global and personal roles remain
                logger.debug(f"[RBAC] Public-only token for {SecurityValidator.sanitize_log_message(user_email)}: excluding all team-scoped roles")
                # Do NOT append base_condition - this excludes all team roles
            else:
                # Narrowed token: keep roles that apply to all teams (scope_id NULL)
                # and roles whose team is listed in the token.
                base_condition = and_(
                    base_condition,
                    or_(UserRole.scope_id.is_(None), UserRole.scope_id.in_(token_teams)),
                )
                scope_conditions.append(base_condition)
        else:
            # Un-narrowed token: include all team roles (original behavior)
            scope_conditions.append(base_condition)
    else:
        # When team_id is None and include_all_teams is False (e.g., during login),
        # include team-scoped roles with scope_id=None (roles that apply to all teams).
        # SECURITY: Public-only tokens must not access any team-scoped roles.
        if token_teams is None or len(token_teams) > 0:
            scope_conditions.append(and_(UserRole.scope == "team", UserRole.scope_id.is_(None)))

    # A role qualifies when it matches any one of the collected scope conditions.
    query = query.where(or_(*scope_conditions))

    # Filter out expired roles
    now = utc_now()
    query = query.where((UserRole.expires_at.is_(None)) | (UserRole.expires_at > now))

    result = self.db.execute(query)
    user_roles = result.unique().scalars().all()
    return user_roles
async def _log_permission_check(
    self,
    user_email: str,
    permission: str,
    resource_type: Optional[str],
    resource_id: Optional[str],
    team_id: Optional[str],
    granted: bool,
    roles_checked: Dict,
    ip_address: Optional[str],
    user_agent: Optional[str],
) -> None:
    """Log permission check for auditing.

    Adds one PermissionAuditLog row to the session and commits it. If the
    write fails, the session is rolled back and the error is re-raised.

    Args:
        user_email: Email address of the user
        permission: Permission being checked
        resource_type: Type of resource being accessed
        resource_id: ID of specific resource
        team_id: ID of team context
        granted: Whether permission was granted
        roles_checked: Dictionary of roles that were checked
        ip_address: IP address of request
        user_agent: User agent of request

    Raises:
        Exception: Whatever the session raised while adding or committing the
            audit row, re-raised after the rollback.
    """
    audit_log = PermissionAuditLog(
        user_email=user_email,
        permission=permission,
        resource_type=resource_type,
        resource_id=resource_id,
        team_id=team_id,
        granted=granted,
        roles_checked=roles_checked,
        ip_address=ip_address,
        user_agent=user_agent,
    )

    try:
        self.db.add(audit_log)
        self.db.commit()
    except Exception:
        # A failed flush/commit leaves the session unusable until it is rolled
        # back. Roll back so later work on this session still functions, then
        # re-raise so the caller's own error handling is unchanged.
        self.db.rollback()
        raise
620 def _get_roles_for_audit(self, user_email: str, team_id: Optional[str]) -> Dict:
621 """Get role information for audit logging from cached roles.
623 Uses roles cached by get_user_permissions() to avoid a duplicate DB query.
625 Args:
626 user_email: Email address of the user.
627 team_id: Optional team ID for context.
629 Returns:
630 Dict: Role information for audit logging
631 """
632 cache_key = f"{user_email}:{team_id or 'global'}"
633 user_roles = self._roles_cache.get(cache_key, [])
634 return {"roles": [{"id": ur.role_id, "name": ur.role.name, "scope": ur.scope, "permissions": ur.role.permissions} for ur in user_roles]}
636 def _is_cache_valid(self, cache_key: str) -> bool:
637 """Check if cached permissions are still valid.
639 Args:
640 cache_key: Cache key to check validity for
642 Returns:
643 bool: True if cache is valid, False otherwise
644 """
645 if cache_key not in self._permission_cache:
646 return False
648 if cache_key not in self._cache_timestamps:
649 return False
651 age = utc_now() - self._cache_timestamps[cache_key]
652 return age.total_seconds() < self.cache_ttl
654 async def _is_user_admin(self, user_email: str) -> bool:
655 """Check if user is admin by looking up user record directly.
657 Args:
658 user_email: Email address of the user
660 Returns:
661 bool: True if user is admin
662 """
663 # First-Party
664 from mcpgateway.db import EmailUser # pylint: disable=import-outside-toplevel
666 # Special case for platform admin (virtual user)
667 if user_email == getattr(settings, "platform_admin_email", ""):
668 return True
670 user = self.db.execute(select(EmailUser).where(EmailUser.email == user_email)).scalar_one_or_none()
671 return bool(user and user.is_admin)
673 async def _check_team_fallback_permissions(self, user_email: str, permission: str, team_id: Optional[str]) -> bool:
674 """Check fallback team permissions for users without explicit RBAC roles.
676 This provides basic team management permissions for authenticated users on teams they belong to.
678 Args:
679 user_email: Email address of the user
680 permission: Permission being checked
681 team_id: Team ID context
683 Returns:
684 bool: True if user has fallback permission
685 """
686 if not team_id:
687 # For global team operations, allow authenticated users to read their teams and create new teams
688 if permission in ["teams.create", "teams.read"]:
689 return True
690 return False
692 # Get user's role in the team (single query instead of two separate queries)
693 user_role = await self._get_user_team_role(user_email, team_id)
695 # If user is not a member (role is None), deny access
696 if user_role is None:
697 return False
699 # Define fallback permissions based on team role
700 if user_role == "owner":
701 # Team owners get full permissions on their teams
702 return permission in ["teams.read", "teams.update", "teams.delete", "teams.manage_members", "teams.create"]
703 if user_role in ["member"]:
704 # Team members get basic read permissions
705 return permission in ["teams.read"]
707 return False
709 async def _is_team_member(self, user_email: str, team_id: str) -> bool:
710 """Check if user is a member of the specified team.
712 Note: This method delegates to _get_user_team_role to avoid duplicate DB queries.
714 Args:
715 user_email: Email address of the user
716 team_id: Team ID
718 Returns:
719 bool: True if user is a team member
720 """
721 # Delegate to _get_user_team_role to avoid duplicate query
722 return await self._get_user_team_role(user_email, team_id) is not None
async def _get_user_team_role(self, user_email: str, team_id: str) -> Optional[str]:
    """Get user's role in the specified team.

    Looks up the active membership row for (user_email, team_id) and commits
    afterwards so the read does not leave the connection idle in a transaction.

    Args:
        user_email: Email address of the user
        team_id: Team ID

    Returns:
        Optional[str]: User's role in the team or None if not a member
    """
    # First-Party
    from mcpgateway.db import EmailTeamMember  # pylint: disable=import-outside-toplevel

    member = self.db.execute(select(EmailTeamMember).where(and_(EmailTeamMember.user_email == user_email, EmailTeamMember.team_id == team_id, EmailTeamMember.is_active))).scalar_one_or_none()

    # Read the role BEFORE committing: if the session expires instances on
    # commit, touching member.role afterwards would reload the row and reopen
    # the very transaction the commit is meant to release.
    role = member.role if member else None

    self.db.commit()  # Release transaction to avoid idle-in-transaction

    return role
742 async def _check_token_fallback_permissions(self, _user_email: str, permission: str) -> bool:
743 """Check fallback token permissions for authenticated users.
745 All authenticated users can manage their own tokens. The token endpoints
746 already filter by user_email, so this just grants access to the endpoints.
748 Args:
749 _user_email: Email address of the user (unused)
750 permission: Permission being checked
752 Returns:
753 bool: True if user has fallback permission for token operations
754 """
755 # Any authenticated user can create, read, update, and revoke their own tokens
756 # The actual filtering by user_email happens in the token service layer
757 if permission in ["tokens.create", "tokens.read", "tokens.update", "tokens.revoke"]:
758 return True
760 return False