Coverage for mcpgateway / services / permission_service.py: 99%
173 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
# -*- coding: utf-8 -*-
"""Location: ./mcpgateway/services/permission_service.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Mihai Criveti

Permission Service for RBAC System.

This module provides the core permission checking logic for the RBAC system.
It handles role-based permission validation, permission auditing, and caching.
"""
# Standard
from datetime import datetime
import logging
from typing import Any, Dict, List, Optional, Set

# Third-Party
from sqlalchemy import and_, or_, select
from sqlalchemy.orm import contains_eager, Session

# First-Party
from mcpgateway.config import settings
from mcpgateway.db import PermissionAuditLog, Permissions, Role, UserRole, utc_now
26logger = logging.getLogger(__name__)
class PermissionService:
    """Service for checking and managing user permissions.

    Provides role-based permission checking with caching, auditing,
    and support for global, team, and personal scopes.

    Attributes:
        db: Database session
        audit_enabled: Whether to log permission checks
        cache_ttl: Permission cache TTL in seconds

    Examples:
        Basic construction and coroutine checks:
        >>> from unittest.mock import Mock
        >>> service = PermissionService(Mock())
        >>> isinstance(service, PermissionService)
        True
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(service.check_permission)
        True
        >>> asyncio.iscoroutinefunction(service.get_user_permissions)
        True
    """

    def __init__(self, db: Session, audit_enabled: Optional[bool] = None):
        """Initialize permission service.

        Args:
            db: Database session
            audit_enabled: Whether to enable permission auditing (defaults to settings.permission_audit_enabled / PERMISSION_AUDIT_ENABLED)
        """
        self.db = db
        if audit_enabled is None:
            audit_enabled = settings.permission_audit_enabled
        self.audit_enabled = audit_enabled
        # The three caches below share the same keys (see _cache_key) and are written together.
        self._permission_cache: Dict[str, Set[str]] = {}
        self._roles_cache: Dict[str, List[UserRole]] = {}
        self._cache_timestamps: Dict[str, datetime] = {}
        self.cache_ttl = 300  # 5 minutes

    @staticmethod
    def _cache_key(user_email: str, team_id: Optional[str], include_all_teams: bool = False) -> str:
        """Build the key under which a user's permissions and roles are cached.

        The precedence mirrors the scope selection in _get_user_roles: an explicit
        team_id wins, then the any-team lookup, then the global default. Keeping the
        key in step with the query prevents one kind of lookup from being served
        from (or overwriting) another kind's cache entry.

        Args:
            user_email: Email of the user
            team_id: Optional team context
            include_all_teams: Whether the lookup spans all team-scoped roles

        Returns:
            str: Cache key for the lookup

        Examples:
            >>> PermissionService._cache_key("a@example.com", None)
            'a@example.com:global'
            >>> PermissionService._cache_key("a@example.com", None, include_all_teams=True)
            'a@example.com:__anyteam__'
            >>> PermissionService._cache_key("a@example.com", "team-1", include_all_teams=True)
            'a@example.com:team-1'
        """
        if team_id:
            return f"{user_email}:{team_id}"
        if include_all_teams:
            return f"{user_email}:__anyteam__"
        return f"{user_email}:global"

    async def check_permission(
        self,
        user_email: str,
        permission: str,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        team_id: Optional[str] = None,
        ip_address: Optional[str] = None,
        user_agent: Optional[str] = None,
        allow_admin_bypass: bool = True,
        check_any_team: bool = False,
    ) -> bool:
        """Check if user has specific permission.

        Checks user's roles across all applicable scopes (global, team, personal)
        and returns True if any role grants the required permission. When the admin
        bypass applies, the method returns before any audit record is written.

        Args:
            user_email: Email of the user to check
            permission: Permission to check (e.g., 'tools.create')
            resource_type: Type of resource being accessed
            resource_id: Specific resource ID if applicable
            team_id: Team context for the permission check
            ip_address: IP address for audit logging
            user_agent: User agent for audit logging
            allow_admin_bypass: If True, admin users bypass all permission checks.
                If False, admins must have explicit permissions.
                Default is True for backward compatibility.
            check_any_team: If True, check permission across ALL team-scoped roles
                (used for list/read endpoints with multi-team session tokens)

        Returns:
            bool: True if permission is granted, False otherwise (including when an
            error occurs while checking)

        Examples:
            Parameter validation helpers:
            >>> permission = "users.read"
            >>> permission.count('.') == 1
            True
            >>> team_id = "team-123"
            >>> isinstance(team_id, str)
            True
            >>> from unittest.mock import Mock
            >>> service = PermissionService(Mock())
            >>> import asyncio
            >>> asyncio.iscoroutinefunction(service.check_permission)
            True
        """
        try:
            # Check if user is admin (bypass all permission checks if allowed)
            if allow_admin_bypass and await self._is_user_admin(user_email):
                return True

            # Get user's effective permissions (uses cache when valid)
            user_permissions = await self.get_user_permissions(user_email, team_id, include_all_teams=check_any_team)

            # Check if user has the specific permission or wildcard
            granted = permission in user_permissions or Permissions.ALL_PERMISSIONS in user_permissions

            # If no explicit permissions found, check fallback permissions for team operations
            if not granted and permission.startswith("teams."):
                granted = await self._check_team_fallback_permissions(user_email, permission, team_id)

            # If no explicit permissions found, check fallback permissions for token operations
            if not granted and permission.startswith("tokens."):
                granted = await self._check_token_fallback_permissions(user_email, permission)

            # Log the permission check if auditing is enabled
            if self.audit_enabled:
                # Reuse roles cached by get_user_permissions (no second query). The same
                # include_all_teams flag is passed so the audit reads the cache entry that
                # the lookup above actually populated.
                roles_checked = self._get_roles_for_audit(user_email, team_id, include_all_teams=check_any_team)
                await self._log_permission_check(
                    user_email=user_email,
                    permission=permission,
                    resource_type=resource_type,
                    resource_id=resource_id,
                    team_id=team_id,
                    granted=granted,
                    roles_checked=roles_checked,
                    ip_address=ip_address,
                    user_agent=user_agent,
                )

            logger.debug(f"Permission check: user={user_email}, permission={permission}, team={team_id}, granted={granted}")

            return granted

        except Exception as e:
            logger.error(f"Error checking permission for {user_email}: {e}")
            # Default to deny on error
            return False

    async def has_admin_permission(self, user_email: str) -> bool:
        """Check if user has any admin-level permission.

        This is used by AdminAuthMiddleware to allow access to /admin/* routes
        for users who have admin permissions via RBAC, even if they're not
        marked as is_admin in the database.

        Args:
            user_email: Email of the user to check

        Returns:
            bool: True if user is an admin OR has any admin.* permission
            (False if an error occurs while checking)
        """
        try:
            # First check if user is a database admin
            if await self._is_user_admin(user_email):
                return True

            # Get user's permissions and check for any admin.* permission
            user_permissions = await self.get_user_permissions(user_email)

            # Wildcard grants everything; otherwise any admin.* permission is enough
            return Permissions.ALL_PERMISSIONS in user_permissions or any(perm.startswith("admin.") for perm in user_permissions)

        except Exception as e:
            logger.error(f"Error checking admin permission for {user_email}: {e}")
            return False

    async def get_user_permissions(self, user_email: str, team_id: Optional[str] = None, include_all_teams: bool = False) -> Set[str]:
        """Get all effective permissions for a user.

        Collects permissions from all user's roles across applicable scopes.
        Results are cached per user/scope for cache_ttl seconds; the roles that
        produced them are cached under the same key for audit logging.

        Args:
            user_email: Email of the user
            team_id: Optional team context
            include_all_teams: If True, include ALL team-scoped roles (for list/read endpoints)

        Returns:
            Set[str]: All effective permissions for the user

        Examples:
            Key shapes and coroutine check:
            >>> cache_key = f"user@example.com:{'global'}"
            >>> ':' in cache_key
            True
            >>> from unittest.mock import Mock
            >>> service = PermissionService(Mock())
            >>> import asyncio
            >>> asyncio.iscoroutinefunction(service.get_user_permissions)
            True
        """
        # Distinct keys per lookup kind avoid poisoning the global cache with any-team results
        cache_key = self._cache_key(user_email, team_id, include_all_teams)
        if self._is_cache_valid(cache_key):
            cached_perms = self._permission_cache[cache_key]
            logger.debug(f"[RBAC] Cache hit for {user_email} (team_id={team_id}): {cached_perms}")
            return cached_perms

        permissions = set()

        # Get all active roles for the user (with eager-loaded role relationship)
        user_roles = await self._get_user_roles(user_email, team_id, include_all_teams=include_all_teams)
        logger.debug(f"[RBAC] Found {len(user_roles)} roles for {user_email} (team_id={team_id})")

        # Collect permissions from all roles
        for user_role in user_roles:
            role_permissions = user_role.role.get_effective_permissions()
            logger.debug(f"[RBAC] Role '{user_role.role.name}' (scope={user_role.scope}, scope_id={user_role.scope_id}) has permissions: {role_permissions}")
            permissions.update(role_permissions)

        # Cache both permissions and roles
        self._permission_cache[cache_key] = permissions
        self._roles_cache[cache_key] = user_roles
        self._cache_timestamps[cache_key] = utc_now()

        return permissions

    async def get_user_roles(self, user_email: str, scope: Optional[str] = None, team_id: Optional[str] = None, include_expired: bool = False) -> List[UserRole]:
        """Get user's role assignments.

        Only active assignments of active roles are returned.

        Args:
            user_email: Email of the user
            scope: Filter by scope ('global', 'team', 'personal')
            team_id: Filter by team ID
            include_expired: Whether to include expired roles

        Returns:
            List[UserRole]: User's role assignments

        Examples:
            Coroutine check:
            >>> from unittest.mock import Mock
            >>> service = PermissionService(Mock())
            >>> import asyncio
            >>> asyncio.iscoroutinefunction(service.get_user_roles)
            True
        """
        query = select(UserRole).join(Role).where(and_(UserRole.user_email == user_email, UserRole.is_active.is_(True), Role.is_active.is_(True)))

        if scope:
            query = query.where(UserRole.scope == scope)

        if team_id:
            query = query.where(UserRole.scope_id == team_id)

        if not include_expired:
            now = utc_now()
            query = query.where((UserRole.expires_at.is_(None)) | (UserRole.expires_at > now))

        result = self.db.execute(query)
        user_roles = result.scalars().all()
        return user_roles

    async def has_permission_on_resource(self, user_email: str, permission: str, resource_type: str, resource_id: str, team_id: Optional[str] = None) -> bool:
        """Check if user has permission on a specific resource.

        Currently this is the plain permission check with the resource details
        forwarded for auditing. It can be extended to include resource-specific
        permission logic (e.g., resource ownership, sharing rules).

        Args:
            user_email: Email of the user
            permission: Permission to check
            resource_type: Type of resource
            resource_id: Specific resource ID
            team_id: Team context

        Returns:
            bool: True if user has permission on the resource

        Examples:
            Coroutine check and parameter sanity:
            >>> from unittest.mock import Mock
            >>> service = PermissionService(Mock())
            >>> import asyncio
            >>> asyncio.iscoroutinefunction(service.has_permission_on_resource)
            True
            >>> res_type, res_id = "tools", "tool-123"
            >>> all(isinstance(x, str) for x in (res_type, res_id))
            True
        """
        # Basic permission check
        if not await self.check_permission(user_email=user_email, permission=permission, resource_type=resource_type, resource_id=resource_id, team_id=team_id):
            return False

        # NOTE: Add resource-specific logic here in future enhancement
        # For example:
        # - Check resource ownership
        # - Check resource sharing permissions
        # - Check resource team membership

        return True

    async def check_resource_ownership(self, user_email: str, resource: Any, allow_team_admin: bool = True) -> bool:
        """Check if user owns a resource or is a team admin for team resources.

        This method checks resource ownership based on the owner_email field
        and optionally allows team owners to modify team-scoped resources.
        Attributes are read defensively, so a resource that lacks one of them
        is simply not matched by the corresponding rule.

        Args:
            user_email: Email of the user to check
            resource: Resource object with owner_email, team_id, and visibility attributes
            allow_team_admin: Whether to allow team admins for team-scoped resources

        Returns:
            bool: True if user owns the resource or is authorized team admin

        Examples:
            >>> from unittest.mock import Mock
            >>> service = PermissionService(Mock())
            >>> import asyncio
            >>> asyncio.iscoroutinefunction(service.check_resource_ownership)
            True
        """
        # Check if user is platform admin (bypass ownership checks)
        if await self._is_user_admin(user_email):
            return True

        # Check direct ownership
        if hasattr(resource, "owner_email") and resource.owner_email == user_email:
            return True

        # Check team admin permission for team resources
        if allow_team_admin and getattr(resource, "visibility", None) == "team":
            team_id = getattr(resource, "team_id", None)
            if team_id:
                # Only the "owner" team role counts as team admin here
                return await self._get_user_team_role(user_email, team_id) == "owner"

        return False

    async def check_admin_permission(self, user_email: str) -> bool:
        """Check if user has any admin permissions.

        Args:
            user_email: Email of the user

        Returns:
            bool: True if user has admin permissions

        Examples:
            Coroutine check:
            >>> from unittest.mock import Mock
            >>> service = PermissionService(Mock())
            >>> import asyncio
            >>> asyncio.iscoroutinefunction(service.check_admin_permission)
            True
        """
        # First check if user is admin (handles platform admin virtual user)
        if await self._is_user_admin(user_email):
            return True

        admin_permissions = [Permissions.ADMIN_SYSTEM_CONFIG, Permissions.ADMIN_USER_MANAGEMENT, Permissions.ADMIN_SECURITY_AUDIT, Permissions.ALL_PERMISSIONS]

        user_permissions = await self.get_user_permissions(user_email)
        return any(perm in user_permissions for perm in admin_permissions)

    def clear_user_cache(self, user_email: str) -> None:
        """Clear cached permissions for a user.

        Should be called when user's roles change.

        Args:
            user_email: Email of the user

        Examples:
            Cache invalidation behavior:
            >>> from unittest.mock import Mock
            >>> service = PermissionService(Mock())
            >>> service._permission_cache = {"alice:global": {"tools.read"}, "bob:team1": {"*"}}
            >>> service._cache_timestamps = {"alice:global": utc_now(), "bob:team1": utc_now()}
            >>> service.clear_user_cache("alice")
            >>> "alice:global" in service._permission_cache
            False
            >>> "bob:team1" in service._permission_cache
            True
        """
        # Snapshot the matching keys first; the dict cannot be mutated while iterating it
        keys_to_remove = [key for key in self._permission_cache if key.startswith(f"{user_email}:")]

        for key in keys_to_remove:
            self._permission_cache.pop(key, None)
            self._roles_cache.pop(key, None)
            self._cache_timestamps.pop(key, None)

        logger.debug(f"Cleared permission cache for user: {user_email}")

    def clear_cache(self) -> None:
        """Clear all cached permissions.

        Examples:
            Clear all cache:
            >>> from unittest.mock import Mock
            >>> service = PermissionService(Mock())
            >>> service._permission_cache = {"x": {"p"}}
            >>> service._cache_timestamps = {"x": utc_now()}
            >>> service.clear_cache()
            >>> service._permission_cache == {}
            True
            >>> service._cache_timestamps == {}
            True
        """
        self._permission_cache.clear()
        self._roles_cache.clear()
        self._cache_timestamps.clear()
        logger.debug("Cleared all permission cache")

    async def _get_user_roles(self, user_email: str, team_id: Optional[str] = None, include_all_teams: bool = False) -> List[UserRole]:
        """Get user roles for permission checking.

        Always includes global and personal roles. Team-scoped role inclusion
        depends on the parameters:

        - team_id provided: includes team roles for that specific team
          (plus team roles with scope_id=NULL which apply to all teams)
        - team_id=None, include_all_teams=True: includes ALL team-scoped roles
        - team_id=None, include_all_teams=False: includes only team-scoped roles
          with scope_id=NULL (roles that apply to all teams, e.g. during login)

        Args:
            user_email: Email address of the user
            team_id: Optional team ID to filter to a specific team's roles
            include_all_teams: If True, include ALL team-scoped roles (for list/read with session tokens)

        Returns:
            List[UserRole]: List of active roles for the user
        """
        query = select(UserRole).join(Role).options(contains_eager(UserRole.role)).where(and_(UserRole.user_email == user_email, UserRole.is_active.is_(True), Role.is_active.is_(True)))

        # Include global roles and personal roles
        scope_conditions = [UserRole.scope == "global", UserRole.scope == "personal"]

        if team_id:
            # Filter to specific team's roles only
            scope_conditions.append(and_(UserRole.scope == "team", or_(UserRole.scope_id == team_id, UserRole.scope_id.is_(None))))
        elif include_all_teams:
            # Include ALL team-scoped roles (for list/read endpoints with session tokens)
            scope_conditions.append(UserRole.scope == "team")
        else:
            # When team_id is None and include_all_teams is False (e.g., during login),
            # include team-scoped roles with scope_id=None (roles that apply to all teams)
            scope_conditions.append(and_(UserRole.scope == "team", UserRole.scope_id.is_(None)))

        query = query.where(or_(*scope_conditions))

        # Filter out expired roles
        now = utc_now()
        query = query.where((UserRole.expires_at.is_(None)) | (UserRole.expires_at > now))

        result = self.db.execute(query)
        user_roles = result.unique().scalars().all()
        return user_roles

    async def _log_permission_check(
        self,
        user_email: str,
        permission: str,
        resource_type: Optional[str],
        resource_id: Optional[str],
        team_id: Optional[str],
        granted: bool,
        roles_checked: Dict,
        ip_address: Optional[str],
        user_agent: Optional[str],
    ) -> None:
        """Log permission check for auditing.

        Adds a PermissionAuditLog row to the session and commits it. If the
        commit fails, the session is rolled back and the error is re-raised.

        Args:
            user_email: Email address of the user
            permission: Permission being checked
            resource_type: Type of resource being accessed
            resource_id: ID of specific resource
            team_id: ID of team context
            granted: Whether permission was granted
            roles_checked: Dictionary of roles that were checked
            ip_address: IP address of request
            user_agent: User agent of request

        Raises:
            Exception: Whatever the commit raised, after the session was rolled back
        """
        audit_log = PermissionAuditLog(
            user_email=user_email,
            permission=permission,
            resource_type=resource_type,
            resource_id=resource_id,
            team_id=team_id,
            granted=granted,
            roles_checked=roles_checked,
            ip_address=ip_address,
            user_agent=user_agent,
        )

        self.db.add(audit_log)
        try:
            self.db.commit()
        except Exception:
            # A failed commit leaves the session unusable until it is rolled back. Clean up,
            # then propagate so the caller's deny-on-error handling still applies.
            self.db.rollback()
            raise

    def _get_roles_for_audit(self, user_email: str, team_id: Optional[str], include_all_teams: bool = False) -> Dict:
        """Get role information for audit logging from cached roles.

        Uses roles cached by get_user_permissions() to avoid a duplicate DB query.
        The arguments must match those given to get_user_permissions() so the same
        cache entry is read; if nothing is cached under that key, the role list is empty.

        Args:
            user_email: Email address of the user.
            team_id: Optional team ID for context.
            include_all_teams: Whether the permission lookup spanned all team-scoped roles.

        Returns:
            Dict: Role information for audit logging
        """
        cache_key = self._cache_key(user_email, team_id, include_all_teams)
        user_roles = self._roles_cache.get(cache_key, [])
        return {"roles": [{"id": ur.role_id, "name": ur.role.name, "scope": ur.scope, "permissions": ur.role.permissions} for ur in user_roles]}

    def _is_cache_valid(self, cache_key: str) -> bool:
        """Check if cached permissions are still valid.

        Args:
            cache_key: Cache key to check validity for

        Returns:
            bool: True if an entry and its timestamp exist and the entry is younger than cache_ttl
        """
        if cache_key not in self._permission_cache:
            return False

        if cache_key not in self._cache_timestamps:
            return False

        age = utc_now() - self._cache_timestamps[cache_key]
        return age.total_seconds() < self.cache_ttl

    async def _is_user_admin(self, user_email: str) -> bool:
        """Check if user is admin by looking up user record directly.

        Args:
            user_email: Email address of the user

        Returns:
            bool: True if user is admin
        """
        # First-Party
        from mcpgateway.db import EmailUser  # pylint: disable=import-outside-toplevel

        # Special case for platform admin (virtual user). Only a configured, non-empty
        # address may match; otherwise an empty or missing user_email would compare equal
        # to an unset setting and be treated as the platform admin.
        platform_admin_email = getattr(settings, "platform_admin_email", None)
        if platform_admin_email and user_email == platform_admin_email:
            return True

        user = self.db.execute(select(EmailUser).where(EmailUser.email == user_email)).scalar_one_or_none()
        return bool(user and user.is_admin)

    async def _check_team_fallback_permissions(self, user_email: str, permission: str, team_id: Optional[str]) -> bool:
        """Check fallback team permissions for users without explicit RBAC roles.

        This provides basic team management permissions for authenticated users on teams they belong to.

        Args:
            user_email: Email address of the user
            permission: Permission being checked
            team_id: Team ID context

        Returns:
            bool: True if user has fallback permission
        """
        if not team_id:
            # For global team operations, allow authenticated users to read their teams and create new teams
            return permission in ["teams.create", "teams.read"]

        # Get user's role in the team (single query instead of two separate queries)
        user_role = await self._get_user_team_role(user_email, team_id)

        # If user is not a member (role is None), deny access
        if user_role is None:
            return False

        # Define fallback permissions based on team role
        if user_role == "owner":
            # Team owners get full permissions on their teams
            return permission in ["teams.read", "teams.update", "teams.delete", "teams.manage_members", "teams.create"]
        if user_role in ["member"]:
            # Team members get basic read permissions
            return permission in ["teams.read"]

        return False

    async def _is_team_member(self, user_email: str, team_id: str) -> bool:
        """Check if user is a member of the specified team.

        Note: This method delegates to _get_user_team_role to avoid duplicate DB queries.

        Args:
            user_email: Email address of the user
            team_id: Team ID

        Returns:
            bool: True if user is a team member
        """
        # Delegate to _get_user_team_role to avoid duplicate query
        return await self._get_user_team_role(user_email, team_id) is not None

    async def _get_user_team_role(self, user_email: str, team_id: str) -> Optional[str]:
        """Get user's role in the specified team.

        Args:
            user_email: Email address of the user
            team_id: Team ID

        Returns:
            Optional[str]: User's role in the team or None if not a member
        """
        # First-Party
        from mcpgateway.db import EmailTeamMember  # pylint: disable=import-outside-toplevel

        member = self.db.execute(select(EmailTeamMember).where(and_(EmailTeamMember.user_email == user_email, EmailTeamMember.team_id == team_id, EmailTeamMember.is_active))).scalar_one_or_none()
        self.db.commit()  # Release transaction to avoid idle-in-transaction

        return member.role if member else None

    async def _check_token_fallback_permissions(self, _user_email: str, permission: str) -> bool:
        """Check fallback token permissions for authenticated users.

        All authenticated users can manage their own tokens. The token endpoints
        already filter by user_email, so this just grants access to the endpoints.

        Args:
            _user_email: Email address of the user (unused)
            permission: Permission being checked

        Returns:
            bool: True if user has fallback permission for token operations
        """
        # Any authenticated user can create, read, update, and revoke their own tokens
        # The actual filtering by user_email happens in the token service layer
        return permission in ["tokens.create", "tokens.read", "tokens.update", "tokens.revoke"]