Coverage for mcpgateway / services / role_service.py: 99%
180 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/role_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Role Management Service for RBAC System.
9This module provides CRUD operations for roles and user role assignments.
10It handles role creation, assignment, revocation, and validation.
11"""
13# Standard
14from datetime import datetime
15import logging
16from typing import List, Optional
18# Third-Party
19from sqlalchemy import and_, delete, select
20from sqlalchemy.orm import Session
22# First-Party
23from mcpgateway.db import Permissions, Role, UserRole, utc_now
25logger = logging.getLogger(__name__)
28class RoleService:
29 """Service for managing roles and role assignments.
31 Provides comprehensive role management including creation, assignment,
32 revocation, and validation with support for role inheritance.
34 Attributes:
35 Database session
37 Examples:
38 Basic construction:
39 >>> from unittest.mock import Mock
40 >>> service = RoleService(Mock())
41 >>> isinstance(service, RoleService)
42 True
43 >>> hasattr(service, 'db')
44 True
45 """
47 def __init__(self, db: Session):
48 """Initialize role service.
50 Args:
51 db: Database session
53 Examples:
54 Basic initialization:
55 >>> from mcpgateway.services.role_service import RoleService
56 >>> from unittest.mock import Mock
57 >>> db_session = Mock()
58 >>> service = RoleService(db_session)
59 >>> service.db is db_session
60 True
62 Service instance attributes:
63 >>> hasattr(service, 'db')
64 True
65 >>> service.__class__.__name__
66 'RoleService'
67 """
68 self.db = db
70 async def create_role(self, name: str, description: str, scope: str, permissions: List[str], created_by: str, inherits_from: Optional[str] = None, is_system_role: bool = False) -> Role:
71 """Create a new role.
73 Args:
74 name: Role name (must be unique within scope)
75 description: Role description
76 scope: Role scope ('global', 'team', 'personal')
77 permissions: List of permission strings
78 created_by: Email of user creating the role
79 inherits_from: ID of parent role for inheritance
80 is_system_role: Whether this is a system-defined role
82 Returns:
83 Role: The created role
85 Raises:
86 ValueError: If role name already exists or invalid parameters
88 Examples:
89 Basic role creation parameters:
90 >>> from mcpgateway.services.role_service import RoleService
91 >>> role_name = "developer"
92 >>> len(role_name) > 0
93 True
94 >>> role_scope = "team"
95 >>> role_scope in ["global", "team", "personal"]
96 True
97 >>> permissions = ["tools.read", "tools.execute"]
98 >>> all(isinstance(p, str) for p in permissions)
99 True
101 Role validation logic:
102 >>> # Test role name validation
103 >>> test_name = "admin-role"
104 >>> len(test_name) <= 255
105 True
106 >>> bool(test_name.strip())
107 True
109 >>> # Test scope validation
110 >>> valid_scopes = ["global", "team", "personal"]
111 >>> "team" in valid_scopes
112 True
113 >>> "invalid" in valid_scopes
114 False
116 >>> # Test permissions format
117 >>> perms = ["users.read", "users.write", "teams.manage"]
118 >>> all("." in p for p in perms)
119 True
120 >>> all(len(p) > 0 for p in perms)
121 True
123 Role inheritance validation:
124 >>> # Test inherits_from parameter
125 >>> parent_role_id = "role-123"
126 >>> isinstance(parent_role_id, str)
127 True
128 >>> parent_role_id != ""
129 True
131 System role flags:
132 >>> is_system = True
133 >>> isinstance(is_system, bool)
134 True
135 >>> is_admin_role = False
136 >>> isinstance(is_admin_role, bool)
137 True
139 Creator validation:
140 >>> created_by = "admin@example.com"
141 >>> "@" in created_by
142 True
143 >>> len(created_by) > 0
144 True
146 Invalid scope is rejected immediately:
147 >>> import asyncio
148 >>> from unittest.mock import Mock
149 >>> svc = RoleService(Mock())
150 >>> try:
151 ... asyncio.run(svc.create_role('n','d','invalid',[], 'u@example.com'))
152 ... except ValueError as e:
153 ... 'Invalid scope' in str(e)
154 True
156 Duplicate name rejected within scope:
157 >>> from unittest.mock import AsyncMock, patch
158 >>> svc = RoleService(Mock())
159 >>> with patch.object(RoleService, 'get_role_by_name', new=AsyncMock(return_value=object())):
160 ... try:
161 ... asyncio.run(svc.create_role('dup','d','global',[], 'u@example.com'))
162 ... except ValueError as e:
163 ... 'already exists' in str(e)
164 True
166 Invalid permissions rejected:
167 >>> with patch.object(RoleService, 'get_role_by_name', new=AsyncMock(return_value=None)):
168 ... with patch('mcpgateway.services.role_service.Permissions.get_all_permissions', return_value=[]):
169 ... try:
170 ... asyncio.run(svc.create_role('n','d','global',['bad'], 'u@example.com'))
171 ... except ValueError as e:
172 ... 'Invalid permissions' in str(e)
173 True
175 Parent not found and cycle detection:
176 >>> with patch.object(RoleService, 'get_role_by_name', new=AsyncMock(return_value=None)):
177 ... with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=None)):
178 ... try:
179 ... asyncio.run(svc.create_role('n','d','global',[], 'u@example.com', inherits_from='p'))
180 ... except ValueError as e:
181 ... 'Parent role not found' in str(e)
182 True
183 >>> with patch.object(RoleService, 'get_role_by_name', new=AsyncMock(return_value=None)):
184 ... with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=object())):
185 ... with patch.object(RoleService, '_would_create_cycle', new=AsyncMock(return_value=True)):
186 ... try:
187 ... asyncio.run(svc.create_role('n','d','global',[], 'u@example.com', inherits_from='p'))
188 ... except ValueError as e:
189 ... 'create a cycle' in str(e)
190 True
191 """
192 # Validate scope
193 if scope not in ["global", "team", "personal"]:
194 raise ValueError(f"Invalid scope: {scope}")
196 # Check for duplicate name within scope
197 existing = await self.get_role_by_name(name, scope)
198 if existing:
199 raise ValueError(f"Role '{name}' already exists in scope '{scope}'")
201 # Validate permissions
202 valid_permissions = Permissions.get_all_permissions()
203 valid_permissions.append(Permissions.ALL_PERMISSIONS) # Allow wildcard
205 invalid_perms = [p for p in permissions if p not in valid_permissions]
206 if invalid_perms:
207 raise ValueError(f"Invalid permissions: {invalid_perms}")
209 # Validate inheritance
210 parent_role = None
211 if inherits_from:
212 parent_role = await self.get_role_by_id(inherits_from)
213 if not parent_role:
214 raise ValueError(f"Parent role not found: {inherits_from}")
216 # Check for circular inheritance
217 if await self._would_create_cycle(inherits_from, None):
218 raise ValueError("Role inheritance would create a cycle")
220 # Create the role
221 role = Role(name=name, description=description, scope=scope, permissions=permissions, created_by=created_by, inherits_from=inherits_from, is_system_role=is_system_role)
223 self.db.add(role)
224 self.db.commit()
225 self.db.refresh(role)
227 logger.info(f"Created role: {role.name} (scope: {role.scope}, id: {role.id})")
228 return role
230 async def get_role_by_id(self, role_id: str) -> Optional[Role]:
231 """Get role by ID.
233 Args:
234 role_id: Role ID to lookup
236 Returns:
237 Optional[Role]: The role if found, None otherwise
239 Examples:
240 Check coroutine nature and signature:
241 >>> import asyncio
242 >>> from unittest.mock import Mock
243 >>> service = RoleService(Mock())
244 >>> asyncio.iscoroutinefunction(service.get_role_by_id)
245 True
246 """
247 result = self.db.execute(select(Role).where(Role.id == role_id))
248 role = result.scalar_one_or_none()
249 return role
251 async def get_role_by_name(self, name: str, scope: str) -> Optional[Role]:
252 """Get role by name and scope.
254 Args:
255 name: Role name
256 scope: Role scope
258 Returns:
259 Optional[Role]: The role if found, None otherwise
261 Examples:
262 Basic callable validation:
263 >>> import asyncio
264 >>> from unittest.mock import Mock
265 >>> service = RoleService(Mock())
266 >>> asyncio.iscoroutinefunction(service.get_role_by_name)
267 True
268 """
269 result = self.db.execute(select(Role).where(and_(Role.name == name, Role.scope == scope, Role.is_active.is_(True))))
270 role = result.scalar_one_or_none()
271 return role
273 async def list_roles(self, scope: Optional[str] = None, include_system: bool = True, include_inactive: bool = False) -> List[Role]:
274 """List roles with optional filtering.
276 Args:
277 scope: Filter by scope ('global', 'team', 'personal')
278 include_system: Whether to include system roles
279 include_inactive: Whether to include inactive roles
281 Returns:
282 List[Role]: List of matching roles
284 Examples:
285 Callable check:
286 >>> import asyncio
287 >>> from unittest.mock import Mock
288 >>> service = RoleService(Mock())
289 >>> asyncio.iscoroutinefunction(service.list_roles)
290 True
291 >>> # Simulate empty list result
292 >>> class _Res:
293 ... def scalars(self):
294 ... class _S:
295 ... def all(self):
296 ... return []
297 ... return _S()
298 >>> service.db.execute = lambda *_a, **_k: _Res()
299 >>> asyncio.run(service.list_roles('team', include_system=False, include_inactive=True)) == []
300 True
301 """
302 query = select(Role)
304 conditions = []
306 if scope:
307 conditions.append(Role.scope == scope)
309 if not include_system:
310 conditions.append(Role.is_system_role.is_(False))
312 if not include_inactive:
313 conditions.append(Role.is_active.is_(True))
315 if conditions:
316 query = query.where(and_(*conditions))
318 query = query.order_by(Role.scope, Role.name)
320 result = self.db.execute(query)
321 roles = result.scalars().all()
322 return roles
324 async def update_role(
325 self,
326 role_id: str,
327 name: Optional[str] = None,
328 description: Optional[str] = None,
329 permissions: Optional[List[str]] = None,
330 inherits_from: Optional[str] = None,
331 is_active: Optional[bool] = None,
332 ) -> Optional[Role]:
333 """Update an existing role.
335 Args:
336 role_id: ID of role to update
337 name: New role name
338 description: New role description
339 permissions: New permissions list
340 inherits_from: New parent role ID
341 is_active: New active status
343 Returns:
344 Optional[Role]: Updated role or None if not found
346 Raises:
347 ValueError: If update would create invalid state
349 Examples:
350 Signature and coroutine checks:
351 >>> import asyncio, inspect
352 >>> from unittest.mock import Mock
353 >>> service = RoleService(Mock())
354 >>> asyncio.iscoroutinefunction(service.update_role)
355 True
356 >>> params = inspect.signature(RoleService.update_role).parameters
357 >>> all(p in params for p in [
358 ... 'role_id', 'name', 'description', 'permissions', 'inherits_from', 'is_active'
359 ... ])
360 True
362 Additional validation paths:
363 Cannot modify system roles:
364 >>> from unittest.mock import AsyncMock, patch
365 >>> service = RoleService(object())
366 >>> mock_role = type('R', (), {'is_system_role': True})()
367 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=mock_role)):
368 ... try:
369 ... _ = asyncio.run(service.update_role('rid'))
370 ... except ValueError as e:
371 ... 'system roles' in str(e)
372 True
374 Returns None when role not found:
375 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=None)):
376 ... asyncio.run(service.update_role('missing')) is None
377 True
379 Duplicate new name rejected:
380 >>> role = type('R', (), {'is_system_role': False, 'name': 'old', 'scope': 'global', 'id': 'id1', 'is_active': True})()
381 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=role)):
382 ... with patch.object(RoleService, 'get_role_by_name', new=AsyncMock(return_value=type('R2', (), {'id': 'id2'})())):
383 ... try:
384 ... asyncio.run(service.update_role('id1', name='new'))
385 ... except ValueError as e:
386 ... 'already exists' in str(e)
387 True
389 Invalid permissions rejected on update:
390 >>> role = type('R', (), {'is_system_role': False, 'name': 'old', 'scope': 'global', 'id': 'id1', 'is_active': True})()
391 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=role)):
392 ... with patch('mcpgateway.services.role_service.Permissions.get_all_permissions', return_value=[]):
393 ... try:
394 ... asyncio.run(service.update_role('id1', permissions=['bad']))
395 ... except ValueError as e:
396 ... 'Invalid permissions' in str(e)
397 True
399 Parent not found and cycle detection on update:
400 >>> role = type('R', (), {'is_system_role': False, 'name': 'old', 'scope': 'global', 'id': 'id1', 'inherits_from': None, 'is_active': True})()
401 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(side_effect=[role, None])):
402 ... try:
403 ... asyncio.run(service.update_role('id1', inherits_from='p'))
404 ... except ValueError as e:
405 ... 'Parent role not found' in str(e)
406 True
407 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(side_effect=[role, object()])):
408 ... with patch.object(RoleService, '_would_create_cycle', new=AsyncMock(return_value=True)):
409 ... try:
410 ... asyncio.run(service.update_role('id1', inherits_from='p'))
411 ... except ValueError as e:
412 ... 'create a cycle' in str(e)
413 True
414 """
415 role = await self.get_role_by_id(role_id)
416 if not role:
417 return None
419 # Prevent modification of system roles
420 if role.is_system_role:
421 raise ValueError("Cannot modify system roles")
423 # Validate new name if provided
424 if name and name != role.name:
425 existing = await self.get_role_by_name(name, role.scope)
426 if existing and existing.id != role_id: 426 ↛ 428line 426 didn't jump to line 428 because the condition on line 426 was always true
427 raise ValueError(f"Role '{name}' already exists in scope '{role.scope}'")
428 role.name = name
430 # Update description
431 if description is not None:
432 role.description = description
434 # Validate and update permissions
435 if permissions is not None:
436 valid_permissions = Permissions.get_all_permissions()
437 valid_permissions.append(Permissions.ALL_PERMISSIONS)
439 invalid_perms = [p for p in permissions if p not in valid_permissions]
440 if invalid_perms:
441 raise ValueError(f"Invalid permissions: {invalid_perms}")
443 role.permissions = permissions
445 # Validate and update inheritance
446 if inherits_from is not None:
447 if inherits_from != role.inherits_from: 447 ↛ 460line 447 didn't jump to line 460 because the condition on line 447 was always true
448 if inherits_from:
449 parent_role = await self.get_role_by_id(inherits_from)
450 if not parent_role:
451 raise ValueError(f"Parent role not found: {inherits_from}")
453 # Check for circular inheritance
454 if await self._would_create_cycle(inherits_from, role_id):
455 raise ValueError("Role inheritance would create a cycle")
457 role.inherits_from = inherits_from
459 # Update active status
460 if is_active is not None:
461 role.is_active = is_active
463 # Update timestamp
464 role.updated_at = utc_now()
466 self.db.commit()
467 self.db.refresh(role)
469 logger.info(f"Updated role: {role.name} (id: {role.id})")
470 return role
472 async def delete_role(self, role_id: str) -> bool:
473 """Delete a role.
475 Soft deletes the role by setting is_active to False.
476 Also deactivates all user role assignments.
478 Args:
479 role_id: ID of role to delete
481 Returns:
482 bool: True if role was deleted, False if not found
484 Raises:
485 ValueError: If trying to delete a system role
487 Examples:
488 Coroutine check:
489 >>> import asyncio
490 >>> from unittest.mock import Mock
491 >>> service = RoleService(Mock())
492 >>> asyncio.iscoroutinefunction(service.delete_role)
493 True
495 Returns False when role not found:
496 >>> from unittest.mock import AsyncMock, patch
497 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=None)):
498 ... asyncio.run(service.delete_role('missing'))
499 False
501 System roles cannot be deleted:
502 >>> sys_role = type('R', (), {'is_system_role': True})()
503 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=sys_role)):
504 ... try:
505 ... asyncio.run(service.delete_role('rid'))
506 ... except ValueError as e:
507 ... 'system roles' in str(e)
508 True
509 """
510 role = await self.get_role_by_id(role_id)
511 if not role:
512 return False
514 if role.is_system_role:
515 raise ValueError("Cannot delete system roles")
517 # Soft delete the role
518 role.is_active = False
519 role.updated_at = utc_now()
521 # Deactivate all user assignments of this role
522 self.db.execute(select(UserRole).where(UserRole.role_id == role_id)).update({"is_active": False})
524 self.db.commit()
526 logger.info(f"Deleted role: {role.name} (id: {role.id})")
527 return True
529 async def assign_role_to_user(self, user_email: str, role_id: str, scope: str, scope_id: Optional[str], granted_by: str, expires_at: Optional[datetime] = None) -> UserRole:
530 """Assign a role to a user.
532 Args:
533 user_email: Email of user to assign role to
534 role_id: ID of role to assign
535 scope: Scope of assignment ('global', 'team', 'personal')
536 scope_id: Team ID if team-scoped
537 granted_by: Email of user granting the role
538 expires_at: Optional expiration datetime
540 Returns:
541 UserRole: The role assignment
543 Raises:
544 ValueError: If invalid parameters or assignment already exists
546 Examples:
547 Coroutine check:
548 >>> import asyncio
549 >>> from unittest.mock import Mock
550 >>> service = RoleService(Mock())
551 >>> asyncio.iscoroutinefunction(service.assign_role_to_user)
552 True
554 Scope mismatch raises error:
555 >>> from unittest.mock import AsyncMock, patch
556 >>> role = type('Role', (), {'is_active': True, 'scope': 'team'})()
557 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=role)):
558 ... try:
559 ... asyncio.run(service.assign_role_to_user('u@e','rid','global',None,'admin'))
560 ... except ValueError as e:
561 ... "doesn't match" in str(e)
562 True
564 Team scope requires scope_id:
565 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=role)):
566 ... try:
567 ... asyncio.run(service.assign_role_to_user('u@e','rid','team',None,'admin'))
568 ... except ValueError as e:
569 ... 'scope_id required' in str(e)
570 True
572 Global scope forbids scope_id:
573 >>> role.scope = 'global'
574 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=role)):
575 ... try:
576 ... asyncio.run(service.assign_role_to_user('u@e','rid','global','x','admin'))
577 ... except ValueError as e:
578 ... 'not allowed for global' in str(e)
579 True
581 Duplicate active assignment is rejected:
582 >>> role.scope = 'team'
583 >>> active = type('UR', (), {'is_active': True, 'is_expired': lambda self: False})()
584 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=role)):
585 ... with patch.object(RoleService, 'get_user_role_assignment', new=AsyncMock(return_value=active)):
586 ... try:
587 ... asyncio.run(service.assign_role_to_user('u@e','rid','team','t1','admin'))
588 ... except ValueError as e:
589 ... 'already has this role' in str(e)
590 True
592 Role not found or inactive raises:
593 >>> inactive = type('Role', (), {'is_active': False, 'scope': 'team'})()
594 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=inactive)):
595 ... try:
596 ... asyncio.run(service.assign_role_to_user('u@e','rid','team','t1','admin'))
597 ... except ValueError as e:
598 ... 'not found or inactive' in str(e)
599 True
600 """
601 # Validate role exists and is active
602 role = await self.get_role_by_id(role_id)
603 if not role or not role.is_active:
604 raise ValueError(f"Role not found or inactive: {role_id}")
606 # Validate scope consistency
607 if role.scope != scope:
608 raise ValueError(f"Role scope '{role.scope}' doesn't match assignment scope '{scope}'")
610 # Validate scope_id requirements
611 if scope == "team" and not scope_id:
612 raise ValueError("scope_id required for team-scoped assignments")
613 if scope in ["global", "personal"] and scope_id:
614 raise ValueError(f"scope_id not allowed for {scope} assignments")
616 # Check for existing active assignment
617 existing = await self.get_user_role_assignment(user_email, role_id, scope, scope_id)
618 if existing and existing.is_active and not existing.is_expired():
619 raise ValueError("User already has this role assignment")
621 # Create the assignment
622 user_role = UserRole(user_email=user_email, role_id=role_id, scope=scope, scope_id=scope_id, granted_by=granted_by, expires_at=expires_at)
624 self.db.add(user_role)
625 self.db.commit()
626 self.db.refresh(user_role)
628 logger.info(f"Assigned role {role.name} to {user_email} (scope: {scope}, scope_id: {scope_id})")
629 return user_role
631 async def revoke_role_from_user(self, user_email: str, role_id: str, scope: str, scope_id: Optional[str]) -> bool:
632 """Revoke a role from a user.
634 Args:
635 user_email: Email of user
636 role_id: ID of role to revoke
637 scope: Scope of assignment
638 scope_id: Team ID if team-scoped
640 Returns:
641 bool: True if role was revoked, False if not found
643 Examples:
644 Coroutine check:
645 >>> import asyncio
646 >>> from unittest.mock import Mock
647 >>> service = RoleService(Mock())
648 >>> asyncio.iscoroutinefunction(service.revoke_role_from_user)
649 True
651 Returns False when assignment not found or inactive:
652 >>> from unittest.mock import AsyncMock, patch
653 >>> with patch.object(RoleService, 'get_user_role_assignment', new=AsyncMock(return_value=None)):
654 ... asyncio.run(service.revoke_role_from_user('u','r','team','t'))
655 False
657 Returns True on successful revoke:
658 >>> active = type('UR', (), {'is_active': True})()
659 >>> with patch.object(RoleService, 'get_user_role_assignment', new=AsyncMock(return_value=active)):
660 ... asyncio.run(service.revoke_role_from_user('u','r','team','t'))
661 True
662 """
663 user_role = await self.get_user_role_assignment(user_email, role_id, scope, scope_id)
665 if not user_role or not user_role.is_active:
666 return False
668 user_role.is_active = False
669 self.db.commit()
671 logger.info(f"Revoked role {role_id} from {user_email} (scope: {scope}, scope_id: {scope_id})")
672 return True
674 async def get_user_role_assignment(self, user_email: str, role_id: str, scope: str, scope_id: Optional[str]) -> Optional[UserRole]:
675 """Get a specific user role assignment.
677 Args:
678 user_email: Email of user
679 role_id: ID of role
680 scope: Scope of assignment
681 scope_id: Team ID if team-scoped
683 Returns:
684 Optional[UserRole]: The role assignment if found
686 Examples:
687 Coroutine check:
688 >>> import asyncio
689 >>> from unittest.mock import Mock
690 >>> service = RoleService(Mock())
691 >>> asyncio.iscoroutinefunction(service.get_user_role_assignment)
692 True
693 """
694 conditions = [UserRole.user_email == user_email, UserRole.role_id == role_id, UserRole.scope == scope]
696 if scope_id:
697 conditions.append(UserRole.scope_id == scope_id)
698 else:
699 conditions.append(UserRole.scope_id.is_(None))
701 result = self.db.execute(select(UserRole).where(and_(*conditions)))
702 user_role = result.scalar_one_or_none()
703 return user_role
705 async def list_user_roles(self, user_email: str, scope: Optional[str] = None, include_expired: bool = False) -> List[UserRole]:
706 """List all role assignments for a user.
708 Args:
709 user_email: Email of user
710 scope: Filter by scope
711 include_expired: Whether to include expired roles
713 Returns:
714 List[UserRole]: User's role assignments
716 Examples:
717 Coroutine check:
718 >>> import asyncio
719 >>> from unittest.mock import Mock
720 >>> service = RoleService(Mock())
721 >>> asyncio.iscoroutinefunction(service.list_user_roles)
722 True
723 >>> # Simulate scalar results aggregation
724 >>> class _Res:
725 ... def scalars(self):
726 ... class _S:
727 ... def all(self):
728 ... return ['ur1', 'ur2']
729 ... return _S()
730 >>> service.db.execute = lambda *_a, **_k: _Res()
731 >>> result = asyncio.run(service.list_user_roles('u@example.com', 'team'))
732 >>> isinstance(result, list) and len(result) == 2
733 True
734 """
735 query = select(UserRole).join(Role).where(and_(UserRole.user_email == user_email, UserRole.is_active.is_(True), Role.is_active.is_(True)))
737 if scope:
738 query = query.where(UserRole.scope == scope)
740 if not include_expired:
741 now = utc_now()
742 query = query.where((UserRole.expires_at.is_(None)) | (UserRole.expires_at > now))
744 query = query.order_by(UserRole.scope, Role.name)
746 result = self.db.execute(query)
747 user_roles = result.scalars().all()
748 return user_roles
750 async def list_role_assignments(self, role_id: str, scope: Optional[str] = None, include_expired: bool = False) -> List[UserRole]:
751 """List all user assignments for a role.
753 Args:
754 role_id: ID of role
755 scope: Filter by scope
756 include_expired: Whether to include expired assignments
758 Returns:
759 List[UserRole]: Role assignments
761 Examples:
762 Coroutine check:
763 >>> import asyncio
764 >>> from unittest.mock import Mock
765 >>> service = RoleService(Mock())
766 >>> asyncio.iscoroutinefunction(service.list_role_assignments)
767 True
768 >>> # Simulate scalar results aggregation
769 >>> class _Res:
770 ... def scalars(self):
771 ... class _S:
772 ... def all(self):
773 ... return []
774 ... return _S()
775 >>> service.db.execute = lambda *_a, **_k: _Res()
776 >>> asyncio.run(service.list_role_assignments('rid'))
777 []
778 """
779 query = select(UserRole).where(and_(UserRole.role_id == role_id, UserRole.is_active.is_(True)))
781 if scope:
782 query = query.where(UserRole.scope == scope)
784 if not include_expired:
785 now = utc_now()
786 query = query.where((UserRole.expires_at.is_(None)) | (UserRole.expires_at > now))
788 query = query.order_by(UserRole.user_email)
790 result = self.db.execute(query)
791 assignments = result.scalars().all()
792 return assignments
794 async def _would_create_cycle(self, parent_id: str, child_id: Optional[str]) -> bool:
795 """Check if setting parent_id as parent of child_id would create a cycle.
797 Args:
798 parent_id: ID of the proposed parent role
799 child_id: ID of the proposed child role
801 Returns:
802 True if setting this relationship would create a cycle, False otherwise
804 Examples:
805 Test cycle detection logic:
806 >>> from mcpgateway.services.role_service import RoleService
808 Basic parameter validation:
809 >>> parent_id = "role-admin"
810 >>> child_id = "role-user"
811 >>> parent_id != child_id
812 True
813 >>> isinstance(parent_id, str)
814 True
815 >>> isinstance(child_id, str)
816 True
818 Test None child_id handling (line 584-585):
819 >>> child_id_none = None
820 >>> child_id_none is None
821 True
822 >>> # This should return False without cycle check
824 Test cycle detection scenarios:
825 >>> # Direct cycle: A -> A
826 >>> same_id = "role-123"
827 >>> same_id == same_id
828 True
830 >>> # Simple cycle: A -> B, B -> A
831 >>> role_a = "role-a"
832 >>> role_b = "role-b"
833 >>> role_a != role_b
834 True
836 Test visited set logic:
837 >>> visited = set()
838 >>> current = "role-1"
839 >>> current not in visited
840 True
841 >>> visited.add(current)
842 >>> current in visited
843 True
845 Test role hierarchy traversal:
846 >>> # Test hierarchy: admin -> manager -> user
847 >>> admin_role = "admin-role"
848 >>> manager_role = "manager-role"
849 >>> user_role = "user-role"
850 >>> all(isinstance(r, str) for r in [admin_role, manager_role, user_role])
851 True
852 >>> len({admin_role, manager_role, user_role}) == 3
853 True
854 """
855 if not child_id:
856 return False
858 visited = set()
859 current = parent_id
861 while current and current not in visited:
862 if current == child_id:
863 return True
865 visited.add(current)
867 # Get parent of current role
868 result = self.db.execute(select(Role.inherits_from).where(Role.id == current))
869 current = result.scalar_one_or_none()
871 return False
873 async def delete_all_user_roles(self, user_email: str) -> int:
874 """Delete all role assignments for a user.
876 Hard-deletes all role assignments (active and inactive) for the given user.
877 Intended for use when permanently deleting a user account.
879 Note: Does not commit the transaction. The caller is responsible for
880 committing (e.g., as part of a larger user deletion operation).
882 Args:
883 user_email: Email of user whose roles should be deleted
885 Returns:
886 int: Number of role assignments deleted
887 """
888 stmt = delete(UserRole).where(UserRole.user_email == user_email)
889 result = self.db.execute(stmt)
890 deleted_count = result.rowcount
891 logger.info(f"Deleted {deleted_count} role assignment(s) for user {user_email}")
892 return deleted_count