Coverage for mcpgateway / services / permission_service.py: 100%
173 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/permission_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Permission Service for RBAC System.
9This module provides the core permission checking logic for the RBAC system.
10It handles role-based permission validation, permission auditing, and caching.
11"""
13# Standard
14from datetime import datetime
15import logging
16from typing import Dict, List, Optional, Set
18# Third-Party
19from sqlalchemy import and_, or_, select
20from sqlalchemy.orm import contains_eager, Session
22# First-Party
23from mcpgateway.config import settings
24from mcpgateway.db import PermissionAuditLog, Permissions, Role, UserRole, utc_now
26logger = logging.getLogger(__name__)
29class PermissionService:
30 """Service for checking and managing user permissions.
32 Provides role-based permission checking with caching, auditing,
33 and support for global, team, and personal scopes.
35 Attributes:
36 db: Database session
37 audit_enabled: Whether to log permission checks
38 cache_ttl: Permission cache TTL in seconds
40 Examples:
41 Basic construction and coroutine checks:
42 >>> from unittest.mock import Mock
43 >>> service = PermissionService(Mock())
44 >>> isinstance(service, PermissionService)
45 True
46 >>> import asyncio
47 >>> asyncio.iscoroutinefunction(service.check_permission)
48 True
49 >>> asyncio.iscoroutinefunction(service.get_user_permissions)
50 True
51 """
53 def __init__(self, db: Session, audit_enabled: Optional[bool] = None):
54 """Initialize permission service.
56 Args:
57 db: Database session
58 audit_enabled: Whether to enable permission auditing (defaults to settings.permission_audit_enabled / PERMISSION_AUDIT_ENABLED)
59 """
60 self.db = db
61 if audit_enabled is None:
62 audit_enabled = settings.permission_audit_enabled
63 self.audit_enabled = audit_enabled
64 self._permission_cache: Dict[str, Set[str]] = {}
65 self._roles_cache: Dict[str, List[UserRole]] = {}
66 self._cache_timestamps: Dict[str, datetime] = {}
67 self.cache_ttl = 300 # 5 minutes
69 async def check_permission(
70 self,
71 user_email: str,
72 permission: str,
73 resource_type: Optional[str] = None,
74 resource_id: Optional[str] = None,
75 team_id: Optional[str] = None,
76 token_teams: Optional[List[str]] = None,
77 ip_address: Optional[str] = None,
78 user_agent: Optional[str] = None,
79 allow_admin_bypass: bool = True,
80 check_any_team: bool = False,
81 ) -> bool:
82 """Check if user has specific permission.
84 Checks user's roles across all applicable scopes (global, team, personal)
85 and returns True if any role grants the required permission.
87 Args:
88 user_email: Email of the user to check
89 permission: Permission to check (e.g., 'tools.create')
90 resource_type: Type of resource being accessed
91 resource_id: Specific resource ID if applicable
92 team_id: Team context for the permission check
93 token_teams: Normalized token team scope from auth context.
94 `[]` means public-only scope; `None` means unrestricted
95 admin scope (when allowed by token semantics).
96 ip_address: IP address for audit logging
97 user_agent: User agent for audit logging
98 allow_admin_bypass: If True, admin users bypass all permission checks.
99 If False, admins must have explicit permissions.
100 Default is True for backward compatibility.
101 check_any_team: If True, check permission across ALL team-scoped roles
102 (used for list/read endpoints with multi-team session tokens)
104 Returns:
105 bool: True if permission is granted, False otherwise
107 Examples:
108 Parameter validation helpers:
109 >>> permission = "users.read"
110 >>> permission.count('.') == 1
111 True
112 >>> team_id = "team-123"
113 >>> isinstance(team_id, str)
114 True
115 >>> from unittest.mock import Mock
116 >>> service = PermissionService(Mock())
117 >>> import asyncio
118 >>> asyncio.iscoroutinefunction(service.check_permission)
119 True
120 """
121 try:
122 # SECURITY: Public-only tokens (teams=[]) must never satisfy admin.*
123 # permissions, even when the backing user identity is an admin.
124 if permission.startswith("admin.") and token_teams is not None and len(token_teams) == 0:
125 logger.warning(f"Permission denied for public-only token: user={user_email}, permission={permission}")
126 return False
128 # Check if user is admin (bypass all permission checks if allowed)
129 if allow_admin_bypass and await self._is_user_admin(user_email):
130 return True
132 # Get user's effective permissions (uses cache when valid)
133 user_permissions = await self.get_user_permissions(user_email, team_id, include_all_teams=check_any_team)
135 # Check if user has the specific permission or wildcard
136 granted = permission in user_permissions or Permissions.ALL_PERMISSIONS in user_permissions
138 # Log the permission check if auditing is enabled
139 if self.audit_enabled:
140 # Reuse roles cached by get_user_permissions (no second query)
141 roles_checked = self._get_roles_for_audit(user_email, team_id)
142 await self._log_permission_check(
143 user_email=user_email,
144 permission=permission,
145 resource_type=resource_type,
146 resource_id=resource_id,
147 team_id=team_id,
148 granted=granted,
149 roles_checked=roles_checked,
150 ip_address=ip_address,
151 user_agent=user_agent,
152 )
154 logger.debug(f"Permission check: user={user_email}, permission={permission}, team={team_id}, granted={granted}")
156 return granted
158 except Exception as e:
159 logger.error(f"Error checking permission for {user_email}: {e}")
160 # Default to deny on error
161 return False
163 async def has_admin_permission(self, user_email: str, team_id: Optional[str] = None) -> bool:
164 """Check if user has any admin-level permission.
166 This is used by AdminAuthMiddleware to allow access to /admin/* routes
167 for users who have admin permissions via RBAC, even if they're not
168 marked as is_admin in the database.
170 When team_id is provided (team-scoped request), team-scoped roles are
171 included in the permission check. When team_id is None, only global
172 and personal roles are evaluated (original behavior).
174 Args:
175 user_email: Email of the user to check
176 team_id: Optional team ID for team-scoped permission checks.
177 Must be pre-validated against the user's DB-resolved teams
178 before passing here.
180 Returns:
181 bool: True if user is an admin OR has any admin.* permission
182 """
183 try:
184 # First check if user is a database admin
185 if await self._is_user_admin(user_email):
186 return True
188 # Get user's permissions and check for any admin.* permission.
189 # When team_id is provided, this includes team-scoped roles for
190 # that team, allowing team members with admin.dashboard to access
191 # the admin UI in their team context.
192 user_permissions = await self.get_user_permissions(user_email, team_id=team_id)
194 # Check for wildcard or any admin permission
195 if Permissions.ALL_PERMISSIONS in user_permissions:
196 return True
198 # Check for any admin.* permission
199 for perm in user_permissions:
200 if perm.startswith("admin."):
201 return True
203 return False
205 except Exception as e:
206 logger.error(f"Error checking admin permission for {user_email}: {e}")
207 return False
209 async def get_user_permissions(self, user_email: str, team_id: Optional[str] = None, include_all_teams: bool = False) -> Set[str]:
210 """Get all effective permissions for a user.
212 Collects permissions from all user's roles across applicable scopes.
213 Includes role inheritance and handles permission caching.
215 Args:
216 user_email: Email of the user
217 team_id: Optional team context
218 include_all_teams: If True, include ALL team-scoped roles (for list/read endpoints)
220 Returns:
221 Set[str]: All effective permissions for the user
223 Examples:
224 Key shapes and coroutine check:
225 >>> cache_key = f"user@example.com:{'global'}"
226 >>> ':' in cache_key
227 True
228 >>> from unittest.mock import Mock
229 >>> service = PermissionService(Mock())
230 >>> import asyncio
231 >>> asyncio.iscoroutinefunction(service.get_user_permissions)
232 True
233 """
234 # Use distinct cache key for any-team lookups to avoid poisoning global cache
235 if include_all_teams:
236 cache_key = f"{user_email}:__anyteam__"
237 else:
238 cache_key = f"{user_email}:{team_id or 'global'}"
239 if self._is_cache_valid(cache_key):
240 cached_perms = self._permission_cache[cache_key]
241 logger.debug(f"[RBAC] Cache hit for {user_email} (team_id={team_id}): {cached_perms}")
242 return cached_perms
244 permissions = set()
246 # Get all active roles for the user (with eager-loaded role relationship)
247 user_roles = await self._get_user_roles(user_email, team_id, include_all_teams=include_all_teams)
248 logger.debug(f"[RBAC] Found {len(user_roles)} roles for {user_email} (team_id={team_id})")
250 # Collect permissions from all roles
251 for user_role in user_roles:
252 role_permissions = user_role.role.get_effective_permissions()
253 logger.debug(f"[RBAC] Role '{user_role.role.name}' (scope={user_role.scope}, scope_id={user_role.scope_id}) has permissions: {role_permissions}")
254 permissions.update(role_permissions)
256 # Cache both permissions and roles
257 self._permission_cache[cache_key] = permissions
258 self._roles_cache[cache_key] = user_roles
259 self._cache_timestamps[cache_key] = utc_now()
261 return permissions
263 async def get_user_roles(self, user_email: str, scope: Optional[str] = None, team_id: Optional[str] = None, include_expired: bool = False) -> List[UserRole]:
264 """Get user's role assignments.
266 Args:
267 user_email: Email of the user
268 scope: Filter by scope ('global', 'team', 'personal')
269 team_id: Filter by team ID
270 include_expired: Whether to include expired roles
272 Returns:
273 List[UserRole]: User's role assignments
275 Examples:
276 Coroutine check:
277 >>> from unittest.mock import Mock
278 >>> service = PermissionService(Mock())
279 >>> import asyncio
280 >>> asyncio.iscoroutinefunction(service.get_user_roles)
281 True
282 """
283 query = select(UserRole).join(Role).where(and_(UserRole.user_email == user_email, UserRole.is_active.is_(True), Role.is_active.is_(True)))
285 if scope:
286 query = query.where(UserRole.scope == scope)
288 if team_id:
289 query = query.where(UserRole.scope_id == team_id)
291 if not include_expired:
292 now = utc_now()
293 query = query.where((UserRole.expires_at.is_(None)) | (UserRole.expires_at > now))
295 result = self.db.execute(query)
296 user_roles = result.scalars().all()
297 return user_roles
299 async def has_permission_on_resource(self, user_email: str, permission: str, resource_type: str, resource_id: str, team_id: Optional[str] = None) -> bool:
300 """Check if user has permission on a specific resource.
302 This method can be extended to include resource-specific
303 permission logic (e.g., resource ownership, sharing rules).
305 Args:
306 user_email: Email of the user
307 permission: Permission to check
308 resource_type: Type of resource
309 resource_id: Specific resource ID
310 team_id: Team context
312 Returns:
313 bool: True if user has permission on the resource
315 Examples:
316 Coroutine check and parameter sanity:
317 >>> from unittest.mock import Mock
318 >>> service = PermissionService(Mock())
319 >>> import asyncio
320 >>> asyncio.iscoroutinefunction(service.has_permission_on_resource)
321 True
322 >>> res_type, res_id = "tools", "tool-123"
323 >>> all(isinstance(x, str) for x in (res_type, res_id))
324 True
325 """
326 # Basic permission check
327 if not await self.check_permission(user_email=user_email, permission=permission, resource_type=resource_type, resource_id=resource_id, team_id=team_id):
328 return False
330 # NOTE: Add resource-specific logic here in future enhancement
331 # For example:
332 # - Check resource ownership
333 # - Check resource sharing permissions
334 # - Check resource team membership
336 return True
338 async def check_resource_ownership(self, user_email: str, resource: any, allow_team_admin: bool = True) -> bool:
339 """Check if user owns a resource or is a team admin for team resources.
341 This method checks resource ownership based on the owner_email field
342 and optionally allows team admins to modify team-scoped resources.
344 Args:
345 user_email: Email of the user to check
346 resource: Resource object with owner_email, team_id, and visibility attributes
347 allow_team_admin: Whether to allow team admins for team-scoped resources
349 Returns:
350 bool: True if user owns the resource or is authorized team admin
352 Examples:
353 >>> from unittest.mock import Mock
354 >>> service = PermissionService(Mock())
355 >>> import asyncio
356 >>> asyncio.iscoroutinefunction(service.check_resource_ownership)
357 True
358 """
359 # Check if user is platform admin (bypass ownership checks)
360 if await self._is_user_admin(user_email):
361 return True
363 # Check direct ownership
364 if hasattr(resource, "owner_email") and resource.owner_email == user_email:
365 return True
367 # Check team admin permission for team resources
368 if allow_team_admin and hasattr(resource, "visibility") and resource.visibility == "team":
369 if hasattr(resource, "team_id") and resource.team_id:
370 user_role = await self._get_user_team_role(user_email, resource.team_id)
371 if user_role == "owner":
372 return True
374 return False
376 async def check_admin_permission(self, user_email: str) -> bool:
377 """Check if user has any admin permissions.
379 Args:
380 user_email: Email of the user
382 Returns:
383 bool: True if user has admin permissions
385 Examples:
386 Coroutine check:
387 >>> from unittest.mock import Mock
388 >>> service = PermissionService(Mock())
389 >>> import asyncio
390 >>> asyncio.iscoroutinefunction(service.check_admin_permission)
391 True
392 """
393 # First check if user is admin (handles platform admin virtual user)
394 if await self._is_user_admin(user_email):
395 return True
397 admin_permissions = [Permissions.ADMIN_SYSTEM_CONFIG, Permissions.ADMIN_USER_MANAGEMENT, Permissions.ADMIN_SECURITY_AUDIT, Permissions.ALL_PERMISSIONS]
399 user_permissions = await self.get_user_permissions(user_email)
400 return any(perm in user_permissions for perm in admin_permissions)
402 def clear_user_cache(self, user_email: str) -> None:
403 """Clear cached permissions for a user.
405 Should be called when user's roles change.
407 Args:
408 user_email: Email of the user
410 Examples:
411 Cache invalidation behavior:
412 >>> from unittest.mock import Mock
413 >>> service = PermissionService(Mock())
414 >>> service._permission_cache = {"alice:global": {"tools.read"}, "bob:team1": {"*"}}
415 >>> service._cache_timestamps = {"alice:global": utc_now(), "bob:team1": utc_now()}
416 >>> service.clear_user_cache("alice")
417 >>> "alice:global" in service._permission_cache
418 False
419 >>> "bob:team1" in service._permission_cache
420 True
421 """
422 keys_to_remove = [key for key in self._permission_cache if key.startswith(f"{user_email}:")]
424 for key in keys_to_remove:
425 self._permission_cache.pop(key, None)
426 self._roles_cache.pop(key, None)
427 self._cache_timestamps.pop(key, None)
429 logger.debug(f"Cleared permission cache for user: {user_email}")
431 def clear_cache(self) -> None:
432 """Clear all cached permissions.
434 Examples:
435 Clear all cache:
436 >>> from unittest.mock import Mock
437 >>> service = PermissionService(Mock())
438 >>> service._permission_cache = {"x": {"p"}}
439 >>> service._cache_timestamps = {"x": utc_now()}
440 >>> service.clear_cache()
441 >>> service._permission_cache == {}
442 True
443 >>> service._cache_timestamps == {}
444 True
445 """
446 self._permission_cache.clear()
447 self._roles_cache.clear()
448 self._cache_timestamps.clear()
449 logger.debug("Cleared all permission cache")
451 async def _get_user_roles(self, user_email: str, team_id: Optional[str] = None, include_all_teams: bool = False) -> List[UserRole]:
452 """Get user roles for permission checking.
454 Always includes global and personal roles. Team-scoped role inclusion
455 depends on the parameters:
457 - team_id provided: includes team roles for that specific team
458 (plus team roles with scope_id=NULL which apply to all teams)
459 - team_id=None, include_all_teams=True: includes ALL team-scoped roles
460 EXCEPT roles on personal teams (which auto-grant team_admin to every user)
461 - team_id=None, include_all_teams=False: includes only team-scoped roles
462 with scope_id=NULL (roles that apply to all teams, e.g. during login)
464 Args:
465 user_email: Email address of the user
466 team_id: Optional team ID to filter to a specific team's roles
467 include_all_teams: If True, include ALL team-scoped roles (for list/read with session tokens)
469 Returns:
470 List[UserRole]: List of active roles for the user
471 """
472 query = select(UserRole).join(Role).options(contains_eager(UserRole.role)).where(and_(UserRole.user_email == user_email, UserRole.is_active.is_(True), Role.is_active.is_(True)))
474 # Include global roles and personal roles
475 scope_conditions = [UserRole.scope == "global", UserRole.scope == "personal"]
477 if team_id:
478 # Filter to specific team's roles only
479 scope_conditions.append(and_(UserRole.scope == "team", or_(UserRole.scope_id == team_id, UserRole.scope_id.is_(None))))
480 elif include_all_teams:
481 # Include ALL team-scoped roles EXCEPT personal team roles.
482 # Personal teams are auto-created for every user with team_admin permissions,
483 # so including them would grant every user full mutate permissions (servers.create,
484 # tools.create, etc.) when check_any_team=True, making RBAC ineffective.
485 # First-Party
486 from mcpgateway.db import EmailTeam # pylint: disable=import-outside-toplevel
488 scope_conditions.append(
489 and_(
490 UserRole.scope == "team",
491 or_(
492 UserRole.scope_id.is_(None),
493 ~UserRole.scope_id.in_(select(EmailTeam.id).where(EmailTeam.is_personal.is_(True))),
494 ),
495 )
496 )
497 else:
498 # When team_id is None and include_all_teams is False (e.g., during login),
499 # include team-scoped roles with scope_id=None (roles that apply to all teams)
500 scope_conditions.append(and_(UserRole.scope == "team", UserRole.scope_id.is_(None)))
502 query = query.where(or_(*scope_conditions))
504 # Filter out expired roles
505 now = utc_now()
506 query = query.where((UserRole.expires_at.is_(None)) | (UserRole.expires_at > now))
508 result = self.db.execute(query)
509 user_roles = result.unique().scalars().all()
510 return user_roles
512 async def _log_permission_check(
513 self,
514 user_email: str,
515 permission: str,
516 resource_type: Optional[str],
517 resource_id: Optional[str],
518 team_id: Optional[str],
519 granted: bool,
520 roles_checked: Dict,
521 ip_address: Optional[str],
522 user_agent: Optional[str],
523 ) -> None:
524 """Log permission check for auditing.
526 Args:
527 user_email: Email address of the user
528 permission: Permission being checked
529 resource_type: Type of resource being accessed
530 resource_id: ID of specific resource
531 team_id: ID of team context
532 granted: Whether permission was granted
533 roles_checked: Dictionary of roles that were checked
534 ip_address: IP address of request
535 user_agent: User agent of request
536 """
537 audit_log = PermissionAuditLog(
538 user_email=user_email,
539 permission=permission,
540 resource_type=resource_type,
541 resource_id=resource_id,
542 team_id=team_id,
543 granted=granted,
544 roles_checked=roles_checked,
545 ip_address=ip_address,
546 user_agent=user_agent,
547 )
549 self.db.add(audit_log)
550 self.db.commit()
552 def _get_roles_for_audit(self, user_email: str, team_id: Optional[str]) -> Dict:
553 """Get role information for audit logging from cached roles.
555 Uses roles cached by get_user_permissions() to avoid a duplicate DB query.
557 Args:
558 user_email: Email address of the user.
559 team_id: Optional team ID for context.
561 Returns:
562 Dict: Role information for audit logging
563 """
564 cache_key = f"{user_email}:{team_id or 'global'}"
565 user_roles = self._roles_cache.get(cache_key, [])
566 return {"roles": [{"id": ur.role_id, "name": ur.role.name, "scope": ur.scope, "permissions": ur.role.permissions} for ur in user_roles]}
568 def _is_cache_valid(self, cache_key: str) -> bool:
569 """Check if cached permissions are still valid.
571 Args:
572 cache_key: Cache key to check validity for
574 Returns:
575 bool: True if cache is valid, False otherwise
576 """
577 if cache_key not in self._permission_cache:
578 return False
580 if cache_key not in self._cache_timestamps:
581 return False
583 age = utc_now() - self._cache_timestamps[cache_key]
584 return age.total_seconds() < self.cache_ttl
586 async def _is_user_admin(self, user_email: str) -> bool:
587 """Check if user is admin by looking up user record directly.
589 Args:
590 user_email: Email address of the user
592 Returns:
593 bool: True if user is admin
594 """
595 # First-Party
596 from mcpgateway.db import EmailUser # pylint: disable=import-outside-toplevel
598 # Special case for platform admin (virtual user)
599 if user_email == getattr(settings, "platform_admin_email", ""):
600 return True
602 user = self.db.execute(select(EmailUser).where(EmailUser.email == user_email)).scalar_one_or_none()
603 return bool(user and user.is_admin)
605 async def _check_team_fallback_permissions(self, user_email: str, permission: str, team_id: Optional[str]) -> bool:
606 """Check fallback team permissions for users without explicit RBAC roles.
608 This provides basic team management permissions for authenticated users on teams they belong to.
610 Args:
611 user_email: Email address of the user
612 permission: Permission being checked
613 team_id: Team ID context
615 Returns:
616 bool: True if user has fallback permission
617 """
618 if not team_id:
619 # For global team operations, allow authenticated users to read their teams and create new teams
620 if permission in ["teams.create", "teams.read"]:
621 return True
622 return False
624 # Get user's role in the team (single query instead of two separate queries)
625 user_role = await self._get_user_team_role(user_email, team_id)
627 # If user is not a member (role is None), deny access
628 if user_role is None:
629 return False
631 # Define fallback permissions based on team role
632 if user_role == "owner":
633 # Team owners get full permissions on their teams
634 return permission in ["teams.read", "teams.update", "teams.delete", "teams.manage_members", "teams.create"]
635 if user_role in ["member"]:
636 # Team members get basic read permissions
637 return permission in ["teams.read"]
639 return False
641 async def _is_team_member(self, user_email: str, team_id: str) -> bool:
642 """Check if user is a member of the specified team.
644 Note: This method delegates to _get_user_team_role to avoid duplicate DB queries.
646 Args:
647 user_email: Email address of the user
648 team_id: Team ID
650 Returns:
651 bool: True if user is a team member
652 """
653 # Delegate to _get_user_team_role to avoid duplicate query
654 return await self._get_user_team_role(user_email, team_id) is not None
656 async def _get_user_team_role(self, user_email: str, team_id: str) -> Optional[str]:
657 """Get user's role in the specified team.
659 Args:
660 user_email: Email address of the user
661 team_id: Team ID
663 Returns:
664 Optional[str]: User's role in the team or None if not a member
665 """
666 # First-Party
667 from mcpgateway.db import EmailTeamMember # pylint: disable=import-outside-toplevel
669 member = self.db.execute(select(EmailTeamMember).where(and_(EmailTeamMember.user_email == user_email, EmailTeamMember.team_id == team_id, EmailTeamMember.is_active))).scalar_one_or_none()
670 self.db.commit() # Release transaction to avoid idle-in-transaction
672 return member.role if member else None
674 async def _check_token_fallback_permissions(self, _user_email: str, permission: str) -> bool:
675 """Check fallback token permissions for authenticated users.
677 All authenticated users can manage their own tokens. The token endpoints
678 already filter by user_email, so this just grants access to the endpoints.
680 Args:
681 _user_email: Email address of the user (unused)
682 permission: Permission being checked
684 Returns:
685 bool: True if user has fallback permission for token operations
686 """
687 # Any authenticated user can create, read, update, and revoke their own tokens
688 # The actual filtering by user_email happens in the token service layer
689 if permission in ["tokens.create", "tokens.read", "tokens.update", "tokens.revoke"]:
690 return True
692 return False