Coverage for mcpgateway / services / permission_service.py: 99%

197 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/permission_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Permission Service for RBAC System. 

8 

9This module provides the core permission checking logic for the RBAC system. 

10It handles role-based permission validation, permission auditing, and caching. 

11""" 

12 

13# Standard 

14from datetime import datetime 

15import logging 

16from typing import Dict, List, Optional, Set 

17 

18# Third-Party 

19from sqlalchemy import and_, or_, select 

20from sqlalchemy.orm import contains_eager, Session 

21 

22# First-Party 

23from mcpgateway.common.validators import SecurityValidator 

24from mcpgateway.config import settings 

25from mcpgateway.db import PermissionAuditLog, Permissions, Role, UserRole, utc_now 

26 

27logger = logging.getLogger(__name__) 

28 

29 

class PermissionService:
    """Service for checking and managing user permissions.

    Provides role-based permission checking with caching, auditing,
    and support for global, team, and personal scopes.

    Attributes:
        db: Database session
        audit_enabled: Whether to log permission checks
        cache_ttl: Permission cache TTL in seconds

    Examples:
        Basic construction and coroutine checks:
        >>> from unittest.mock import Mock
        >>> service = PermissionService(Mock())
        >>> isinstance(service, PermissionService)
        True
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(service.check_permission)
        True
        >>> asyncio.iscoroutinefunction(service.get_user_permissions)
        True
    """

    def __init__(self, db: Session, audit_enabled: Optional[bool] = None):
        """Initialize permission service.

        Args:
            db: Database session
            audit_enabled: Whether to enable permission auditing (defaults to settings.permission_audit_enabled / PERMISSION_AUDIT_ENABLED)
        """
        self.db = db
        if audit_enabled is None:
            audit_enabled = settings.permission_audit_enabled
        self.audit_enabled = audit_enabled
        # All three caches share the same keys (see _build_cache_key).
        self._permission_cache: Dict[str, Set[str]] = {}
        self._roles_cache: Dict[str, List[UserRole]] = {}
        self._cache_timestamps: Dict[str, datetime] = {}
        self.cache_ttl = 300  # 5 minutes

    @staticmethod
    def _is_public_only_token(token_teams: Optional[List[str]]) -> bool:
        """Tell whether the token scope is the public-only scope (``[]``).

        Args:
            token_teams: Normalized token team scope. ``None`` means un-narrowed.

        Returns:
            bool: True only for an explicitly empty team list

        Examples:
            >>> PermissionService._is_public_only_token([])
            True
            >>> PermissionService._is_public_only_token(None)
            False
            >>> PermissionService._is_public_only_token(["team-a"])
            False
        """
        return token_teams is not None and len(token_teams) == 0

    @staticmethod
    def _build_cache_key(user_email: str, team_id: Optional[str] = None, include_all_teams: bool = False, token_teams: Optional[List[str]] = None) -> str:
        """Build the cache key shared by the permission, role and timestamp caches.

        The key encodes every input that changes which roles _get_user_roles()
        selects. token_teams must be part of the key: None (unrestricted),
        [] (public-only) and ["team-a"] (narrowed) all produce different
        permission sets.

        _get_user_roles() gives ``team_id`` precedence over ``include_all_teams``,
        so the any-team key is used only when no team_id is given. Otherwise the
        roles of one team would be cached under a key shared by every team.

        Args:
            user_email: Email of the user
            team_id: Optional team context
            include_all_teams: Whether the lookup spans all team-scoped roles
            token_teams: Optional token team scope

        Returns:
            str: Cache key, always prefixed with ``"<user_email>:"``

        Examples:
            >>> PermissionService._build_cache_key("a@x.com")
            'a@x.com:global'
            >>> PermissionService._build_cache_key("a@x.com", include_all_teams=True, token_teams=["t2", "t1"])
            'a@x.com:__anyteam__:t1,t2'
            >>> PermissionService._build_cache_key("a@x.com", "t1", include_all_teams=True)
            'a@x.com:t1'
            >>> PermissionService._build_cache_key("a@x.com", token_teams=[])
            'a@x.com:global:__public__'
        """
        if token_teams is None:
            tt_suffix = ""
        elif len(token_teams) == 0:
            tt_suffix = ":__public__"
        else:
            tt_suffix = f":{','.join(sorted(set(token_teams)))}"

        if include_all_teams and not team_id:
            scope_part = "__anyteam__"
        else:
            scope_part = team_id or "global"
        return f"{user_email}:{scope_part}{tt_suffix}"

    async def check_permission(
        self,
        user_email: str,
        permission: str,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        team_id: Optional[str] = None,
        token_teams: Optional[List[str]] = None,
        ip_address: Optional[str] = None,
        user_agent: Optional[str] = None,
        allow_admin_bypass: bool = True,
        check_any_team: bool = False,
    ) -> bool:
        """Check if user has specific permission.

        Checks user's roles across all applicable scopes (global, team, personal)
        and returns True if any role grants the required permission.

        Args:
            user_email: Email of the user to check
            permission: Permission to check (e.g., 'tools.create')
            resource_type: Type of resource being accessed
            resource_id: Specific resource ID if applicable
            team_id: Team context for the permission check
            token_teams: Normalized token team scope from auth context.
                `[]` means public-only scope; `None` means unrestricted
                admin scope (when allowed by token semantics).
            ip_address: IP address for audit logging
            user_agent: User agent for audit logging
            allow_admin_bypass: If True, admin users bypass all permission checks.
                If False, admins must have explicit permissions.
                Default is True for backward compatibility.
            check_any_team: If True, check permission across ALL team-scoped roles
                (used for list/read endpoints with multi-team session tokens)

        Returns:
            bool: True if permission is granted, False otherwise. Any exception
            raised while checking (including audit logging failures) is logged
            and results in False.

        Examples:
            Parameter validation helpers:
            >>> permission = "users.read"
            >>> permission.count('.') == 1
            True
            >>> team_id = "team-123"
            >>> isinstance(team_id, str)
            True
            >>> from unittest.mock import Mock
            >>> service = PermissionService(Mock())
            >>> import asyncio
            >>> asyncio.iscoroutinefunction(service.check_permission)
            True
        """
        try:
            # SECURITY: Public-only tokens (teams=[]) must never satisfy ANY permissions
            # via admin bypass or team-scoped roles, even when the backing user identity is an admin.
            # This enforces strict isolation: token_teams=[] means public-only access at both Layer 1 and Layer 2.
            if self._is_public_only_token(token_teams):
                # Public-only tokens: admin bypass is suppressed entirely
                if allow_admin_bypass and await self._is_user_admin(user_email):
                    logger.warning(f"[RBAC] Admin bypass suppressed for public-only token: " f"user={SecurityValidator.sanitize_log_message(user_email)}, permission={permission}")
                # Continue to permission check without admin bypass
            elif allow_admin_bypass and await self._is_user_admin(user_email):
                # Check if user is admin (bypass all permission checks if allowed)
                return True

            # Get user's effective permissions (uses cache when valid)
            user_permissions = await self.get_user_permissions(user_email, team_id, include_all_teams=check_any_team, token_teams=token_teams)

            # Check if user has the specific permission or wildcard
            granted = permission in user_permissions or Permissions.ALL_PERMISSIONS in user_permissions

            # Log the permission check if auditing is enabled
            if self.audit_enabled:
                # Reuse roles cached by get_user_permissions (no second query).
                # The same scope arguments are passed so the audit lookup hits the
                # exact cache entry that was just evaluated.
                roles_checked = self._get_roles_for_audit(user_email, team_id, include_all_teams=check_any_team, token_teams=token_teams)
                await self._log_permission_check(
                    user_email=user_email,
                    permission=permission,
                    resource_type=resource_type,
                    resource_id=resource_id,
                    team_id=team_id,
                    granted=granted,
                    roles_checked=roles_checked,
                    ip_address=ip_address,
                    user_agent=user_agent,
                )

            logger.debug(
                f"Permission check: user={SecurityValidator.sanitize_log_message(user_email)}, permission={permission}, team={SecurityValidator.sanitize_log_message(team_id)}, granted={granted}"
            )

            return granted

        except Exception as e:
            logger.error(f"Error checking permission for {SecurityValidator.sanitize_log_message(user_email)}: {e}")
            # Default to deny on error
            return False

    async def has_admin_permission(self, user_email: str, team_id: Optional[str] = None, token_teams: Optional[List[str]] = None) -> bool:
        """Check if user has any admin-level permission.

        This is used by AdminAuthMiddleware to allow access to /admin/* routes
        for users who have admin permissions via RBAC, even if they're not
        marked as is_admin in the database.

        When team_id is provided (team-scoped request), team-scoped roles are
        included in the permission check. When team_id is None, only global
        and personal roles are evaluated (original behavior).

        Args:
            user_email: Email of the user to check
            team_id: Optional team ID for team-scoped permission checks.
                Must be pre-validated against the user's DB-resolved teams
                before passing here.
            token_teams: Optional list of team IDs to scope the permission check (Layer 1 narrowing)

        Returns:
            bool: True if user is an admin OR has any admin.* permission.
            False on any error.
        """
        try:
            # SECURITY: Public-only tokens (token_teams=[]) suppress admin bypass,
            # so the database-admin shortcut is only taken for other tokens.
            if not self._is_public_only_token(token_teams) and await self._is_user_admin(user_email):
                return True

            # Get user's permissions and check for any admin.* permission.
            # When team_id is provided, this includes team-scoped roles for
            # that team, allowing team members with admin.dashboard to access
            # the admin UI in their team context.
            user_permissions = await self.get_user_permissions(user_email, team_id=team_id, token_teams=token_teams)

            # Wildcard or any admin.* permission grants access
            if Permissions.ALL_PERMISSIONS in user_permissions:
                return True
            return any(perm.startswith("admin.") for perm in user_permissions)

        except Exception as e:
            logger.error(f"Error checking admin permission for {SecurityValidator.sanitize_log_message(user_email)}: {e}")
            return False

    async def get_user_permissions(self, user_email: str, team_id: Optional[str] = None, include_all_teams: bool = False, token_teams: Optional[List[str]] = None) -> Set[str]:
        """Get all effective permissions for a user.

        Collects permissions from all user's roles across applicable scopes.
        Includes role inheritance and handles permission caching.

        Args:
            user_email: Email of the user
            team_id: Optional team context
            include_all_teams: If True, include ALL team-scoped roles (for list/read endpoints)
            token_teams: Optional list of team IDs from token narrowing. When include_all_teams=True
                and token_teams is non-empty, filters team-scoped roles to only include
                roles from teams in this list (enforces Layer 1 narrowing at Layer 2)

        Returns:
            Set[str]: All effective permissions for the user. The returned set is
            the cached object itself; callers must not mutate it.

        Examples:
            Key shapes and coroutine check:
            >>> cache_key = f"user@example.com:{'global'}"
            >>> ':' in cache_key
            True
            >>> from unittest.mock import Mock
            >>> service = PermissionService(Mock())
            >>> import asyncio
            >>> asyncio.iscoroutinefunction(service.get_user_permissions)
            True
        """
        # Distinct keys per scope/token narrowing avoid poisoning the global cache.
        cache_key = self._build_cache_key(user_email, team_id, include_all_teams, token_teams)
        if self._is_cache_valid(cache_key):
            cached_perms = self._permission_cache[cache_key]
            logger.debug(f"[RBAC] Cache hit for {SecurityValidator.sanitize_log_message(user_email)} (team_id={SecurityValidator.sanitize_log_message(team_id)}): {cached_perms}")
            return cached_perms

        permissions = set()

        # Get all active roles for the user (with eager-loaded role relationship)
        user_roles = await self._get_user_roles(user_email, team_id, include_all_teams=include_all_teams, token_teams=token_teams)
        logger.debug(f"[RBAC] Found {len(user_roles)} roles for {SecurityValidator.sanitize_log_message(user_email)} (team_id={SecurityValidator.sanitize_log_message(team_id)})")

        # Collect permissions from all roles
        for user_role in user_roles:
            role_permissions = user_role.role.get_effective_permissions()
            logger.debug(f"[RBAC] Role '{user_role.role.name}' (scope={user_role.scope}, scope_id={user_role.scope_id}) has permissions: {role_permissions}")
            permissions.update(role_permissions)

        # Cache both permissions and roles
        self._permission_cache[cache_key] = permissions
        self._roles_cache[cache_key] = user_roles
        self._cache_timestamps[cache_key] = utc_now()

        return permissions

    async def get_user_roles(self, user_email: str, scope: Optional[str] = None, team_id: Optional[str] = None, include_expired: bool = False) -> List[UserRole]:
        """Get user's role assignments.

        Only active assignments of active roles are returned.

        Args:
            user_email: Email of the user
            scope: Filter by scope ('global', 'team', 'personal')
            team_id: Filter by team ID
            include_expired: Whether to include expired roles

        Returns:
            List[UserRole]: User's role assignments

        Examples:
            Coroutine check:
            >>> from unittest.mock import Mock
            >>> service = PermissionService(Mock())
            >>> import asyncio
            >>> asyncio.iscoroutinefunction(service.get_user_roles)
            True
        """
        query = select(UserRole).join(Role).where(and_(UserRole.user_email == user_email, UserRole.is_active.is_(True), Role.is_active.is_(True)))

        if scope:
            query = query.where(UserRole.scope == scope)

        if team_id:
            query = query.where(UserRole.scope_id == team_id)

        if not include_expired:
            now = utc_now()
            query = query.where((UserRole.expires_at.is_(None)) | (UserRole.expires_at > now))

        result = self.db.execute(query)
        return result.scalars().all()

    async def has_permission_on_resource(self, user_email: str, permission: str, resource_type: str, resource_id: str, team_id: Optional[str] = None) -> bool:
        """Check if user has permission on a specific resource.

        This method can be extended to include resource-specific
        permission logic (e.g., resource ownership, sharing rules).

        Args:
            user_email: Email of the user
            permission: Permission to check
            resource_type: Type of resource
            resource_id: Specific resource ID
            team_id: Team context

        Returns:
            bool: True if user has permission on the resource

        Examples:
            Coroutine check and parameter sanity:
            >>> from unittest.mock import Mock
            >>> service = PermissionService(Mock())
            >>> import asyncio
            >>> asyncio.iscoroutinefunction(service.has_permission_on_resource)
            True
            >>> res_type, res_id = "tools", "tool-123"
            >>> all(isinstance(x, str) for x in (res_type, res_id))
            True
        """
        # Basic permission check
        if not await self.check_permission(user_email=user_email, permission=permission, resource_type=resource_type, resource_id=resource_id, team_id=team_id):
            return False

        # NOTE: Add resource-specific logic here in future enhancement
        # For example:
        # - Check resource ownership
        # - Check resource sharing permissions
        # - Check resource team membership

        return True

    async def check_resource_ownership(self, user_email: str, resource: object, allow_team_admin: bool = True) -> bool:
        """Check if user owns a resource or is a team admin for team resources.

        This method checks resource ownership based on the owner_email field
        and optionally allows team admins to modify team-scoped resources.
        Attributes missing from ``resource`` are treated as "not matching".

        Args:
            user_email: Email of the user to check
            resource: Resource object with owner_email, team_id, and visibility attributes
            allow_team_admin: Whether to allow team admins for team-scoped resources

        Returns:
            bool: True if user owns the resource or is authorized team admin

        Examples:
            >>> from unittest.mock import Mock
            >>> service = PermissionService(Mock())
            >>> import asyncio
            >>> asyncio.iscoroutinefunction(service.check_resource_ownership)
            True
        """
        # Check if user is platform admin (bypass ownership checks)
        if await self._is_user_admin(user_email):
            return True

        # Check direct ownership
        if hasattr(resource, "owner_email") and resource.owner_email == user_email:
            return True

        # Check team admin permission for team resources: only the team "owner"
        # membership role qualifies.
        if allow_team_admin and getattr(resource, "visibility", None) == "team":
            resource_team_id = getattr(resource, "team_id", None)
            if resource_team_id:
                return await self._get_user_team_role(user_email, resource_team_id) == "owner"

        return False

    async def check_admin_permission(self, user_email: str, token_teams: Optional[List[str]] = None) -> bool:
        """Check if user has any admin permissions.

        Args:
            user_email: Email of the user
            token_teams: Optional list of team IDs to scope the permission check (Layer 1 narrowing)

        Returns:
            bool: True if user has admin permissions

        Examples:
            Coroutine check:
            >>> from unittest.mock import Mock
            >>> service = PermissionService(Mock())
            >>> import asyncio
            >>> asyncio.iscoroutinefunction(service.check_admin_permission)
            True
        """
        # SECURITY: Public-only tokens (token_teams=[]) suppress admin bypass.
        # For every other token, check the admin flag first (handles the
        # platform admin virtual user).
        if not self._is_public_only_token(token_teams) and await self._is_user_admin(user_email):
            return True

        admin_permissions = [Permissions.ADMIN_SYSTEM_CONFIG, Permissions.ADMIN_USER_MANAGEMENT, Permissions.ADMIN_SECURITY_AUDIT, Permissions.ALL_PERMISSIONS]
        user_permissions = await self.get_user_permissions(user_email, token_teams=token_teams)
        return any(perm in user_permissions for perm in admin_permissions)

    def clear_user_cache(self, user_email: str) -> None:
        """Clear cached permissions for a user.

        Should be called when user's roles change.

        Args:
            user_email: Email of the user

        Examples:
            Cache invalidation behavior:
            >>> from unittest.mock import Mock
            >>> service = PermissionService(Mock())
            >>> service._permission_cache = {"alice:global": {"tools.read"}, "bob:team1": {"*"}}
            >>> service._cache_timestamps = {"alice:global": utc_now(), "bob:team1": utc_now()}
            >>> service.clear_user_cache("alice")
            >>> "alice:global" in service._permission_cache
            False
            >>> "bob:team1" in service._permission_cache
            True
        """
        # Every key built by _build_cache_key starts with "<user_email>:".
        prefix = f"{user_email}:"
        keys_to_remove = [key for key in self._permission_cache if key.startswith(prefix)]

        for key in keys_to_remove:
            self._permission_cache.pop(key, None)
            self._roles_cache.pop(key, None)
            self._cache_timestamps.pop(key, None)

        logger.debug(f"Cleared permission cache for user: {SecurityValidator.sanitize_log_message(user_email)}")

    def clear_cache(self) -> None:
        """Clear all cached permissions.

        Examples:
            Clear all cache:
            >>> from unittest.mock import Mock
            >>> service = PermissionService(Mock())
            >>> service._permission_cache = {"x": {"p"}}
            >>> service._cache_timestamps = {"x": utc_now()}
            >>> service.clear_cache()
            >>> service._permission_cache == {}
            True
            >>> service._cache_timestamps == {}
            True
        """
        self._permission_cache.clear()
        self._roles_cache.clear()
        self._cache_timestamps.clear()
        logger.debug("Cleared all permission cache")

    async def _get_user_roles(self, user_email: str, team_id: Optional[str] = None, include_all_teams: bool = False, token_teams: Optional[List[str]] = None) -> List[UserRole]:
        """Get user roles for permission checking.

        Always includes global and personal roles. Team-scoped role inclusion
        depends on the parameters:

        - team_id provided: includes team roles for that specific team
          (plus team roles with scope_id=NULL which apply to all teams)
        - team_id=None, include_all_teams=True: includes ALL team-scoped roles
          EXCEPT roles on personal teams (which auto-grant team_admin to every user)
        - team_id=None, include_all_teams=False: includes only team-scoped roles
          with scope_id=NULL (roles that apply to all teams, e.g. during login)

        Args:
            user_email: Email address of the user
            team_id: Optional team ID to filter to a specific team's roles
            include_all_teams: If True, include ALL team-scoped roles (for list/read with session tokens)
            token_teams: Optional list of team IDs from token narrowing. When include_all_teams=True
                and token_teams is non-empty, filters team-scoped roles to only include
                roles from teams in this list (enforces Layer 1 narrowing at Layer 2)

        Returns:
            List[UserRole]: List of active roles for the user
        """
        query = select(UserRole).join(Role).options(contains_eager(UserRole.role)).where(and_(UserRole.user_email == user_email, UserRole.is_active.is_(True), Role.is_active.is_(True)))

        # Include global roles and personal roles
        scope_conditions = [UserRole.scope == "global", UserRole.scope == "personal"]
        public_only = self._is_public_only_token(token_teams)

        if team_id:
            # Security: Verify team_id is within token scope when narrowed.
            # Public-only tokens (token_teams=[]) must never access team-specific roles.
            # When team_id is out of scope, we skip adding team-scoped roles but still
            # return global and personal roles (needed for join endpoint and other operations).
            if token_teams is not None and (public_only or team_id not in token_teams):
                logger.debug(
                    f"[RBAC] Team {SecurityValidator.sanitize_log_message(team_id)} not in token scope "
                    f"{SecurityValidator.sanitize_log_message(token_teams)} for {SecurityValidator.sanitize_log_message(user_email)}: "
                    f"excluding team-scoped roles but keeping global/personal roles"
                )
            else:
                # Team is in scope: include team-scoped roles for this team
                scope_conditions.append(and_(UserRole.scope == "team", or_(UserRole.scope_id == team_id, UserRole.scope_id.is_(None))))
        elif include_all_teams:
            # Include ALL team-scoped roles EXCEPT personal team roles.
            # Personal teams are auto-created for every user with team_admin permissions,
            # so including them would grant every user full mutate permissions (servers.create,
            # tools.create, etc.) when check_any_team=True, making RBAC ineffective.
            # First-Party
            from mcpgateway.db import EmailTeam  # pylint: disable=import-outside-toplevel

            base_condition = and_(
                UserRole.scope == "team",
                or_(
                    UserRole.scope_id.is_(None),
                    ~UserRole.scope_id.in_(select(EmailTeam.id).where(EmailTeam.is_personal.is_(True))),
                ),
            )

            # SECURITY: Filter team-scoped roles based on token_teams
            # - token_teams=None (un-narrowed): include all team roles
            # - token_teams=[] (public-only): exclude ALL team roles (strict isolation)
            # - token_teams=["team-a", ...]: include only specified teams
            if token_teams is None:
                # Un-narrowed token: include all team roles (original behavior)
                scope_conditions.append(base_condition)
            elif public_only:
                # Public-only token: exclude ALL team-scoped roles.
                # base_condition is deliberately NOT appended, so only global
                # and personal roles remain.
                logger.debug(f"[RBAC] Public-only token for {SecurityValidator.sanitize_log_message(user_email)}: excluding all team-scoped roles")
            else:
                # Narrowed token: keep roles that apply to all teams (scope_id NULL)
                # and roles from the narrowed teams only.
                scope_conditions.append(
                    and_(
                        base_condition,
                        or_(UserRole.scope_id.is_(None), UserRole.scope_id.in_(token_teams)),
                    )
                )
        else:
            # When team_id is None and include_all_teams is False (e.g., during login),
            # include team-scoped roles with scope_id=None (roles that apply to all teams).
            # SECURITY: Public-only tokens must not access any team-scoped roles.
            if not public_only:
                scope_conditions.append(and_(UserRole.scope == "team", UserRole.scope_id.is_(None)))

        query = query.where(or_(*scope_conditions))

        # Filter out expired roles
        now = utc_now()
        query = query.where((UserRole.expires_at.is_(None)) | (UserRole.expires_at > now))

        result = self.db.execute(query)
        return result.unique().scalars().all()

    async def _log_permission_check(
        self,
        user_email: str,
        permission: str,
        resource_type: Optional[str],
        resource_id: Optional[str],
        team_id: Optional[str],
        granted: bool,
        roles_checked: Dict,
        ip_address: Optional[str],
        user_agent: Optional[str],
    ) -> None:
        """Log permission check for auditing.

        The audit row is committed immediately. If persisting it fails, the
        session is rolled back (so it stays usable for the caller) and the
        original exception is re-raised.

        Args:
            user_email: Email address of the user
            permission: Permission being checked
            resource_type: Type of resource being accessed
            resource_id: ID of specific resource
            team_id: ID of team context
            granted: Whether permission was granted
            roles_checked: Dictionary of roles that were checked
            ip_address: IP address of request
            user_agent: User agent of request

        Raises:
            Exception: Whatever the session raised while adding/committing the row
        """
        audit_log = PermissionAuditLog(
            user_email=user_email,
            permission=permission,
            resource_type=resource_type,
            resource_id=resource_id,
            team_id=team_id,
            granted=granted,
            roles_checked=roles_checked,
            ip_address=ip_address,
            user_agent=user_agent,
        )

        try:
            self.db.add(audit_log)
            self.db.commit()
        except Exception:
            # A failed flush/commit leaves the session in a state that rejects
            # further work until rollback() is called.
            self.db.rollback()
            raise

    def _get_roles_for_audit(self, user_email: str, team_id: Optional[str], include_all_teams: bool = False, token_teams: Optional[List[str]] = None) -> Dict:
        """Get role information for audit logging from cached roles.

        Uses roles cached by get_user_permissions() to avoid a duplicate DB query.
        The scope arguments must match the ones given to get_user_permissions()
        so that the same cache entry is read.

        Args:
            user_email: Email address of the user.
            team_id: Optional team ID for context.
            include_all_teams: Whether the permission lookup spanned all teams.
            token_teams: Token team scope used for the permission lookup.

        Returns:
            Dict: Role information for audit logging; ``{"roles": []}`` when no
            roles are cached for the key.
        """
        cache_key = self._build_cache_key(user_email, team_id, include_all_teams, token_teams)
        user_roles = self._roles_cache.get(cache_key, [])
        return {"roles": [{"id": ur.role_id, "name": ur.role.name, "scope": ur.scope, "permissions": ur.role.permissions} for ur in user_roles]}

    def _is_cache_valid(self, cache_key: str) -> bool:
        """Check if cached permissions are still valid.

        Args:
            cache_key: Cache key to check validity for

        Returns:
            bool: True if an entry and its timestamp exist and the entry is
            younger than ``cache_ttl`` seconds, False otherwise
        """
        if cache_key not in self._permission_cache or cache_key not in self._cache_timestamps:
            return False

        age = utc_now() - self._cache_timestamps[cache_key]
        return age.total_seconds() < self.cache_ttl

    async def _is_user_admin(self, user_email: str) -> bool:
        """Check if user is admin by looking up user record directly.

        Args:
            user_email: Email address of the user

        Returns:
            bool: True if user is admin
        """
        # First-Party
        from mcpgateway.db import EmailUser  # pylint: disable=import-outside-toplevel

        # Special case for platform admin (virtual user).
        # An unset/empty platform_admin_email must never match: otherwise an
        # empty user_email would be treated as the platform admin.
        platform_admin_email = getattr(settings, "platform_admin_email", "")
        if platform_admin_email and user_email == platform_admin_email:
            return True

        user = self.db.execute(select(EmailUser).where(EmailUser.email == user_email)).scalar_one_or_none()
        return bool(user and user.is_admin)

    async def _check_team_fallback_permissions(self, user_email: str, permission: str, team_id: Optional[str]) -> bool:
        """Check fallback team permissions for users without explicit RBAC roles.

        This provides basic team management permissions for authenticated users on teams they belong to.

        Args:
            user_email: Email address of the user
            permission: Permission being checked
            team_id: Team ID context

        Returns:
            bool: True if user has fallback permission
        """
        if not team_id:
            # For global team operations, allow authenticated users to read their teams and create new teams
            return permission in ("teams.create", "teams.read")

        # Get user's role in the team (single query instead of two separate queries)
        user_role = await self._get_user_team_role(user_email, team_id)

        # Define fallback permissions based on team role
        if user_role == "owner":
            # Team owners get full permissions on their teams
            return permission in ("teams.read", "teams.update", "teams.delete", "teams.manage_members", "teams.create")
        if user_role == "member":
            # Team members get basic read permissions
            return permission == "teams.read"

        # Not a member (role is None) or an unrecognized role: deny access
        return False

    async def _is_team_member(self, user_email: str, team_id: str) -> bool:
        """Check if user is a member of the specified team.

        Note: This method delegates to _get_user_team_role to avoid duplicate DB queries.

        Args:
            user_email: Email address of the user
            team_id: Team ID

        Returns:
            bool: True if user is a team member
        """
        # Delegate to _get_user_team_role to avoid duplicate query
        return await self._get_user_team_role(user_email, team_id) is not None

    async def _get_user_team_role(self, user_email: str, team_id: str) -> Optional[str]:
        """Get user's role in the specified team.

        Args:
            user_email: Email address of the user
            team_id: Team ID

        Returns:
            Optional[str]: User's role in the team or None if not a member
        """
        # First-Party
        from mcpgateway.db import EmailTeamMember  # pylint: disable=import-outside-toplevel

        member = self.db.execute(select(EmailTeamMember).where(and_(EmailTeamMember.user_email == user_email, EmailTeamMember.team_id == team_id, EmailTeamMember.is_active))).scalar_one_or_none()
        self.db.commit()  # Release transaction to avoid idle-in-transaction

        return member.role if member else None

    async def _check_token_fallback_permissions(self, _user_email: str, permission: str) -> bool:
        """Check fallback token permissions for authenticated users.

        All authenticated users can manage their own tokens. The token endpoints
        already filter by user_email, so this just grants access to the endpoints.

        Args:
            _user_email: Email address of the user (unused)
            permission: Permission being checked

        Returns:
            bool: True if user has fallback permission for token operations
        """
        # Any authenticated user can create, read, update, and revoke their own tokens
        # The actual filtering by user_email happens in the token service layer
        return permission in ("tokens.create", "tokens.read", "tokens.update", "tokens.revoke")

759 

760 return False