Coverage for mcpgateway / services / role_service.py: 99%

180 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/role_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Role Management Service for RBAC System. 

8 

9This module provides CRUD operations for roles and user role assignments. 

10It handles role creation, assignment, revocation, and validation. 

11""" 

12 

13# Standard 

14from datetime import datetime 

15import logging 

16from typing import List, Optional 

17 

18# Third-Party 

19from sqlalchemy import and_, delete, select, update 

20from sqlalchemy.orm import Session 

21 

22# First-Party 

23from mcpgateway.db import Permissions, Role, UserRole, utc_now 

24 

25logger = logging.getLogger(__name__) 

26 

27 

28class RoleService: 

29 """Service for managing roles and role assignments. 

30 

31 Provides comprehensive role management including creation, assignment, 

32 revocation, and validation with support for role inheritance. 

33 

34 Attributes: 

35 Database session 

36 

37 Examples: 

38 Basic construction: 

39 >>> from unittest.mock import Mock 

40 >>> service = RoleService(Mock()) 

41 >>> isinstance(service, RoleService) 

42 True 

43 >>> hasattr(service, 'db') 

44 True 

45 """ 

46 

    def __init__(self, db: Session):
        """Initialize role service.

        The session is kept on the instance unchanged; all database access
        performed by this service goes through ``self.db``.

        Args:
            db: Database session

        Examples:
            Basic initialization:
            >>> from mcpgateway.services.role_service import RoleService
            >>> from unittest.mock import Mock
            >>> db_session = Mock()
            >>> service = RoleService(db_session)
            >>> service.db is db_session
            True

            Service instance attributes:
            >>> hasattr(service, 'db')
            True
            >>> service.__class__.__name__
            'RoleService'
        """
        self.db = db

69 

70 async def create_role(self, name: str, description: str, scope: str, permissions: List[str], created_by: str, inherits_from: Optional[str] = None, is_system_role: bool = False) -> Role: 

71 """Create a new role. 

72 

73 Args: 

74 name: Role name (must be unique within scope) 

75 description: Role description 

76 scope: Role scope ('global', 'team', 'personal') 

77 permissions: List of permission strings 

78 created_by: Email of user creating the role 

79 inherits_from: ID of parent role for inheritance 

80 is_system_role: Whether this is a system-defined role 

81 

82 Returns: 

83 Role: The created role 

84 

85 Raises: 

86 ValueError: If role name already exists or invalid parameters 

87 

88 Examples: 

89 Basic role creation parameters: 

90 >>> from mcpgateway.services.role_service import RoleService 

91 >>> role_name = "developer" 

92 >>> len(role_name) > 0 

93 True 

94 >>> role_scope = "team" 

95 >>> role_scope in ["global", "team", "personal"] 

96 True 

97 >>> permissions = ["tools.read", "tools.execute"] 

98 >>> all(isinstance(p, str) for p in permissions) 

99 True 

100 

101 Role validation logic: 

102 >>> # Test role name validation 

103 >>> test_name = "admin-role" 

104 >>> len(test_name) <= 255 

105 True 

106 >>> bool(test_name.strip()) 

107 True 

108 

109 >>> # Test scope validation 

110 >>> valid_scopes = ["global", "team", "personal"] 

111 >>> "team" in valid_scopes 

112 True 

113 >>> "invalid" in valid_scopes 

114 False 

115 

116 >>> # Test permissions format 

117 >>> perms = ["users.read", "users.write", "teams.manage"] 

118 >>> all("." in p for p in perms) 

119 True 

120 >>> all(len(p) > 0 for p in perms) 

121 True 

122 

123 Role inheritance validation: 

124 >>> # Test inherits_from parameter 

125 >>> parent_role_id = "role-123" 

126 >>> isinstance(parent_role_id, str) 

127 True 

128 >>> parent_role_id != "" 

129 True 

130 

131 System role flags: 

132 >>> is_system = True 

133 >>> isinstance(is_system, bool) 

134 True 

135 >>> is_admin_role = False 

136 >>> isinstance(is_admin_role, bool) 

137 True 

138 

139 Creator validation: 

140 >>> created_by = "admin@example.com" 

141 >>> "@" in created_by 

142 True 

143 >>> len(created_by) > 0 

144 True 

145 

146 Invalid scope is rejected immediately: 

147 >>> import asyncio 

148 >>> from unittest.mock import Mock 

149 >>> svc = RoleService(Mock()) 

150 >>> try: 

151 ... asyncio.run(svc.create_role('n','d','invalid',[], 'u@example.com')) 

152 ... except ValueError as e: 

153 ... 'Invalid scope' in str(e) 

154 True 

155 

156 Duplicate name rejected within scope: 

157 >>> from unittest.mock import AsyncMock, patch 

158 >>> svc = RoleService(Mock()) 

159 >>> with patch.object(RoleService, 'get_role_by_name', new=AsyncMock(return_value=object())): 

160 ... try: 

161 ... asyncio.run(svc.create_role('dup','d','global',[], 'u@example.com')) 

162 ... except ValueError as e: 

163 ... 'already exists' in str(e) 

164 True 

165 

166 Invalid permissions rejected: 

167 >>> with patch.object(RoleService, 'get_role_by_name', new=AsyncMock(return_value=None)): 

168 ... with patch('mcpgateway.services.role_service.Permissions.get_all_permissions', return_value=[]): 

169 ... try: 

170 ... asyncio.run(svc.create_role('n','d','global',['bad'], 'u@example.com')) 

171 ... except ValueError as e: 

172 ... 'Invalid permissions' in str(e) 

173 True 

174 

175 Parent not found and cycle detection: 

176 >>> with patch.object(RoleService, 'get_role_by_name', new=AsyncMock(return_value=None)): 

177 ... with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=None)): 

178 ... try: 

179 ... asyncio.run(svc.create_role('n','d','global',[], 'u@example.com', inherits_from='p')) 

180 ... except ValueError as e: 

181 ... 'Parent role not found' in str(e) 

182 True 

183 >>> with patch.object(RoleService, 'get_role_by_name', new=AsyncMock(return_value=None)): 

184 ... with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=object())): 

185 ... with patch.object(RoleService, '_would_create_cycle', new=AsyncMock(return_value=True)): 

186 ... try: 

187 ... asyncio.run(svc.create_role('n','d','global',[], 'u@example.com', inherits_from='p')) 

188 ... except ValueError as e: 

189 ... 'create a cycle' in str(e) 

190 True 

191 """ 

192 # Validate scope 

193 if scope not in ["global", "team", "personal"]: 

194 raise ValueError(f"Invalid scope: {scope}") 

195 

196 # Check for duplicate name within scope 

197 existing = await self.get_role_by_name(name, scope) 

198 if existing: 

199 raise ValueError(f"Role '{name}' already exists in scope '{scope}'") 

200 

201 # Validate permissions 

202 valid_permissions = Permissions.get_all_permissions() 

203 valid_permissions.append(Permissions.ALL_PERMISSIONS) # Allow wildcard 

204 

205 invalid_perms = [p for p in permissions if p not in valid_permissions] 

206 if invalid_perms: 

207 raise ValueError(f"Invalid permissions: {invalid_perms}") 

208 

209 # Validate inheritance 

210 parent_role = None 

211 if inherits_from: 

212 parent_role = await self.get_role_by_id(inherits_from) 

213 if not parent_role: 

214 raise ValueError(f"Parent role not found: {inherits_from}") 

215 

216 # Check for circular inheritance 

217 if await self._would_create_cycle(inherits_from, None): 

218 raise ValueError("Role inheritance would create a cycle") 

219 

220 # Create the role 

221 role = Role(name=name, description=description, scope=scope, permissions=permissions, created_by=created_by, inherits_from=inherits_from, is_system_role=is_system_role) 

222 

223 self.db.add(role) 

224 self.db.commit() 

225 self.db.refresh(role) 

226 

227 logger.info(f"Created role: {role.name} (scope: {role.scope}, id: {role.id})") 

228 return role 

229 

230 async def get_role_by_id(self, role_id: str) -> Optional[Role]: 

231 """Get role by ID. 

232 

233 Args: 

234 role_id: Role ID to lookup 

235 

236 Returns: 

237 Optional[Role]: The role if found, None otherwise 

238 

239 Examples: 

240 Check coroutine nature and signature: 

241 >>> import asyncio 

242 >>> from unittest.mock import Mock 

243 >>> service = RoleService(Mock()) 

244 >>> asyncio.iscoroutinefunction(service.get_role_by_id) 

245 True 

246 """ 

247 result = self.db.execute(select(Role).where(Role.id == role_id)) 

248 role = result.scalar_one_or_none() 

249 return role 

250 

251 async def get_role_by_name(self, name: str, scope: str) -> Optional[Role]: 

252 """Get role by name and scope. 

253 

254 Args: 

255 name: Role name 

256 scope: Role scope 

257 

258 Returns: 

259 Optional[Role]: The role if found, None otherwise 

260 

261 Examples: 

262 Basic callable validation: 

263 >>> import asyncio 

264 >>> from unittest.mock import Mock 

265 >>> service = RoleService(Mock()) 

266 >>> asyncio.iscoroutinefunction(service.get_role_by_name) 

267 True 

268 """ 

269 result = self.db.execute(select(Role).where(and_(Role.name == name, Role.scope == scope, Role.is_active.is_(True)))) 

270 role = result.scalar_one_or_none() 

271 return role 

272 

273 async def list_roles(self, scope: Optional[str] = None, include_system: bool = True, include_inactive: bool = False) -> List[Role]: 

274 """List roles with optional filtering. 

275 

276 Args: 

277 scope: Filter by scope ('global', 'team', 'personal') 

278 include_system: Whether to include system roles 

279 include_inactive: Whether to include inactive roles 

280 

281 Returns: 

282 List[Role]: List of matching roles 

283 

284 Examples: 

285 Callable check: 

286 >>> import asyncio 

287 >>> from unittest.mock import Mock 

288 >>> service = RoleService(Mock()) 

289 >>> asyncio.iscoroutinefunction(service.list_roles) 

290 True 

291 >>> # Simulate empty list result 

292 >>> class _Res: 

293 ... def scalars(self): 

294 ... class _S: 

295 ... def all(self): 

296 ... return [] 

297 ... return _S() 

298 >>> service.db.execute = lambda *_a, **_k: _Res() 

299 >>> asyncio.run(service.list_roles('team', include_system=False, include_inactive=True)) == [] 

300 True 

301 """ 

302 query = select(Role) 

303 

304 conditions = [] 

305 

306 if scope: 

307 conditions.append(Role.scope == scope) 

308 

309 if not include_system: 

310 conditions.append(Role.is_system_role.is_(False)) 

311 

312 if not include_inactive: 

313 conditions.append(Role.is_active.is_(True)) 

314 

315 if conditions: 

316 query = query.where(and_(*conditions)) 

317 

318 query = query.order_by(Role.scope, Role.name) 

319 

320 result = self.db.execute(query) 

321 roles = result.scalars().all() 

322 return roles 

323 

324 async def update_role( 

325 self, 

326 role_id: str, 

327 name: Optional[str] = None, 

328 description: Optional[str] = None, 

329 permissions: Optional[List[str]] = None, 

330 inherits_from: Optional[str] = None, 

331 is_active: Optional[bool] = None, 

332 ) -> Optional[Role]: 

333 """Update an existing role. 

334 

335 Args: 

336 role_id: ID of role to update 

337 name: New role name 

338 description: New role description 

339 permissions: New permissions list 

340 inherits_from: New parent role ID 

341 is_active: New active status 

342 

343 Returns: 

344 Optional[Role]: Updated role or None if not found 

345 

346 Raises: 

347 ValueError: If update would create invalid state 

348 

349 Examples: 

350 Signature and coroutine checks: 

351 >>> import asyncio, inspect 

352 >>> from unittest.mock import Mock 

353 >>> service = RoleService(Mock()) 

354 >>> asyncio.iscoroutinefunction(service.update_role) 

355 True 

356 >>> params = inspect.signature(RoleService.update_role).parameters 

357 >>> all(p in params for p in [ 

358 ... 'role_id', 'name', 'description', 'permissions', 'inherits_from', 'is_active' 

359 ... ]) 

360 True 

361 

362 Additional validation paths: 

363 Cannot modify system roles: 

364 >>> from unittest.mock import AsyncMock, patch 

365 >>> service = RoleService(object()) 

366 >>> mock_role = type('R', (), {'is_system_role': True})() 

367 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=mock_role)): 

368 ... try: 

369 ... _ = asyncio.run(service.update_role('rid')) 

370 ... except ValueError as e: 

371 ... 'system roles' in str(e) 

372 True 

373 

374 Returns None when role not found: 

375 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=None)): 

376 ... asyncio.run(service.update_role('missing')) is None 

377 True 

378 

379 Duplicate new name rejected: 

380 >>> role = type('R', (), {'is_system_role': False, 'name': 'old', 'scope': 'global', 'id': 'id1', 'is_active': True})() 

381 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=role)): 

382 ... with patch.object(RoleService, 'get_role_by_name', new=AsyncMock(return_value=type('R2', (), {'id': 'id2'})())): 

383 ... try: 

384 ... asyncio.run(service.update_role('id1', name='new')) 

385 ... except ValueError as e: 

386 ... 'already exists' in str(e) 

387 True 

388 

389 Invalid permissions rejected on update: 

390 >>> role = type('R', (), {'is_system_role': False, 'name': 'old', 'scope': 'global', 'id': 'id1', 'is_active': True})() 

391 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=role)): 

392 ... with patch('mcpgateway.services.role_service.Permissions.get_all_permissions', return_value=[]): 

393 ... try: 

394 ... asyncio.run(service.update_role('id1', permissions=['bad'])) 

395 ... except ValueError as e: 

396 ... 'Invalid permissions' in str(e) 

397 True 

398 

399 Parent not found and cycle detection on update: 

400 >>> role = type('R', (), {'is_system_role': False, 'name': 'old', 'scope': 'global', 'id': 'id1', 'inherits_from': None, 'is_active': True})() 

401 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(side_effect=[role, None])): 

402 ... try: 

403 ... asyncio.run(service.update_role('id1', inherits_from='p')) 

404 ... except ValueError as e: 

405 ... 'Parent role not found' in str(e) 

406 True 

407 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(side_effect=[role, object()])): 

408 ... with patch.object(RoleService, '_would_create_cycle', new=AsyncMock(return_value=True)): 

409 ... try: 

410 ... asyncio.run(service.update_role('id1', inherits_from='p')) 

411 ... except ValueError as e: 

412 ... 'create a cycle' in str(e) 

413 True 

414 """ 

415 role = await self.get_role_by_id(role_id) 

416 if not role: 

417 return None 

418 

419 # Prevent modification of system roles 

420 if role.is_system_role: 

421 raise ValueError("Cannot modify system roles") 

422 

423 # Validate new name if provided 

424 if name and name != role.name: 

425 existing = await self.get_role_by_name(name, role.scope) 

426 if existing and existing.id != role_id: 

427 raise ValueError(f"Role '{name}' already exists in scope '{role.scope}'") 

428 role.name = name 

429 

430 # Update description 

431 if description is not None: 

432 role.description = description 

433 

434 # Validate and update permissions 

435 if permissions is not None: 

436 valid_permissions = Permissions.get_all_permissions() 

437 valid_permissions.append(Permissions.ALL_PERMISSIONS) 

438 

439 invalid_perms = [p for p in permissions if p not in valid_permissions] 

440 if invalid_perms: 

441 raise ValueError(f"Invalid permissions: {invalid_perms}") 

442 

443 role.permissions = permissions 

444 

445 # Validate and update inheritance 

446 if inherits_from is not None: 

447 if inherits_from != role.inherits_from: 

448 if inherits_from: 

449 parent_role = await self.get_role_by_id(inherits_from) 

450 if not parent_role: 

451 raise ValueError(f"Parent role not found: {inherits_from}") 

452 

453 # Check for circular inheritance 

454 if await self._would_create_cycle(inherits_from, role_id): 

455 raise ValueError("Role inheritance would create a cycle") 

456 

457 role.inherits_from = inherits_from 

458 

459 # Update active status 

460 if is_active is not None: 

461 role.is_active = is_active 

462 

463 # Update timestamp 

464 role.updated_at = utc_now() 

465 

466 self.db.commit() 

467 self.db.refresh(role) 

468 

469 logger.info(f"Updated role: {role.name} (id: {role.id})") 

470 return role 

471 

472 async def delete_role(self, role_id: str) -> bool: 

473 """Delete a role. 

474 

475 Soft deletes the role by setting is_active to False. 

476 Also deactivates all user role assignments. 

477 

478 Args: 

479 role_id: ID of role to delete 

480 

481 Returns: 

482 bool: True if role was deleted, False if not found 

483 

484 Raises: 

485 ValueError: If trying to delete a system role 

486 

487 Examples: 

488 Coroutine check: 

489 >>> import asyncio 

490 >>> from unittest.mock import Mock 

491 >>> service = RoleService(Mock()) 

492 >>> asyncio.iscoroutinefunction(service.delete_role) 

493 True 

494 

495 Returns False when role not found: 

496 >>> from unittest.mock import AsyncMock, patch 

497 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=None)): 

498 ... asyncio.run(service.delete_role('missing')) 

499 False 

500 

501 System roles cannot be deleted: 

502 >>> sys_role = type('R', (), {'is_system_role': True})() 

503 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=sys_role)): 

504 ... try: 

505 ... asyncio.run(service.delete_role('rid')) 

506 ... except ValueError as e: 

507 ... 'system roles' in str(e) 

508 True 

509 """ 

510 role = await self.get_role_by_id(role_id) 

511 if not role: 

512 return False 

513 

514 if role.is_system_role: 

515 raise ValueError("Cannot delete system roles") 

516 

517 # Soft delete the role 

518 role.is_active = False 

519 role.updated_at = utc_now() 

520 

521 # Deactivate all user assignments of this role 

522 self.db.execute(update(UserRole).where(UserRole.role_id == role_id).values(is_active=False)) 

523 

524 self.db.commit() 

525 

526 logger.info(f"Deleted role: {role.name} (id: {role.id})") 

527 return True 

528 

529 async def assign_role_to_user( 

530 self, user_email: str, role_id: str, scope: str, scope_id: Optional[str], granted_by: str, expires_at: Optional[datetime] = None, grant_source: Optional[str] = None 

531 ) -> UserRole: 

532 """Assign a role to a user. 

533 

534 Args: 

535 user_email: Email of user to assign role to 

536 role_id: ID of role to assign 

537 scope: Scope of assignment ('global', 'team', 'personal') 

538 scope_id: Team ID if team-scoped 

539 granted_by: Email of user granting the role 

540 expires_at: Optional expiration datetime 

541 grant_source: Origin of the grant (e.g., 'sso', 'manual', 'bootstrap', 'auto') 

542 

543 Returns: 

544 UserRole: The role assignment 

545 

546 Raises: 

547 ValueError: If invalid parameters or assignment already exists 

548 

549 Examples: 

550 Coroutine check: 

551 >>> import asyncio 

552 >>> from unittest.mock import Mock 

553 >>> service = RoleService(Mock()) 

554 >>> asyncio.iscoroutinefunction(service.assign_role_to_user) 

555 True 

556 

557 Scope mismatch raises error: 

558 >>> from unittest.mock import AsyncMock, patch 

559 >>> role = type('Role', (), {'is_active': True, 'scope': 'team'})() 

560 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=role)): 

561 ... try: 

562 ... asyncio.run(service.assign_role_to_user('u@e','rid','global',None,'admin')) 

563 ... except ValueError as e: 

564 ... "doesn't match" in str(e) 

565 True 

566 

567 Team scope requires scope_id: 

568 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=role)): 

569 ... try: 

570 ... asyncio.run(service.assign_role_to_user('u@e','rid','team',None,'admin')) 

571 ... except ValueError as e: 

572 ... 'scope_id required' in str(e) 

573 True 

574 

575 Global scope forbids scope_id: 

576 >>> role.scope = 'global' 

577 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=role)): 

578 ... try: 

579 ... asyncio.run(service.assign_role_to_user('u@e','rid','global','x','admin')) 

580 ... except ValueError as e: 

581 ... 'not allowed for global' in str(e) 

582 True 

583 

584 Duplicate active assignment is rejected: 

585 >>> role.scope = 'team' 

586 >>> active = type('UR', (), {'is_active': True, 'is_expired': lambda self: False})() 

587 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=role)): 

588 ... with patch.object(RoleService, 'get_user_role_assignment', new=AsyncMock(return_value=active)): 

589 ... try: 

590 ... asyncio.run(service.assign_role_to_user('u@e','rid','team','t1','admin')) 

591 ... except ValueError as e: 

592 ... 'already has this role' in str(e) 

593 True 

594 

595 Role not found or inactive raises: 

596 >>> inactive = type('Role', (), {'is_active': False, 'scope': 'team'})() 

597 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=inactive)): 

598 ... try: 

599 ... asyncio.run(service.assign_role_to_user('u@e','rid','team','t1','admin')) 

600 ... except ValueError as e: 

601 ... 'not found or inactive' in str(e) 

602 True 

603 """ 

604 # Validate role exists and is active 

605 role = await self.get_role_by_id(role_id) 

606 if not role or not role.is_active: 

607 raise ValueError(f"Role not found or inactive: {role_id}") 

608 

609 # Validate scope consistency 

610 if role.scope != scope: 

611 raise ValueError(f"Role scope '{role.scope}' doesn't match assignment scope '{scope}'") 

612 

613 # Validate scope_id requirements 

614 if scope == "team" and not scope_id: 

615 raise ValueError("scope_id required for team-scoped assignments") 

616 if scope in ["global", "personal"] and scope_id: 

617 raise ValueError(f"scope_id not allowed for {scope} assignments") 

618 

619 # Check for existing active assignment 

620 existing = await self.get_user_role_assignment(user_email, role_id, scope, scope_id) 

621 if existing and existing.is_active and not existing.is_expired(): 

622 raise ValueError("User already has this role assignment") 

623 

624 # Create the assignment 

625 user_role = UserRole(user_email=user_email, role_id=role_id, scope=scope, scope_id=scope_id, granted_by=granted_by, expires_at=expires_at, grant_source=grant_source) 

626 

627 self.db.add(user_role) 

628 self.db.commit() 

629 self.db.refresh(user_role) 

630 

631 logger.info(f"Assigned role {role.name} to {user_email} (scope: {scope}, scope_id: {scope_id})") 

632 return user_role 

633 

634 async def revoke_role_from_user(self, user_email: str, role_id: str, scope: str, scope_id: Optional[str]) -> bool: 

635 """Revoke a role from a user. 

636 

637 Args: 

638 user_email: Email of user 

639 role_id: ID of role to revoke 

640 scope: Scope of assignment 

641 scope_id: Team ID if team-scoped 

642 

643 Returns: 

644 bool: True if role was revoked, False if not found 

645 

646 Examples: 

647 Coroutine check: 

648 >>> import asyncio 

649 >>> from unittest.mock import Mock 

650 >>> service = RoleService(Mock()) 

651 >>> asyncio.iscoroutinefunction(service.revoke_role_from_user) 

652 True 

653 

654 Returns False when assignment not found or inactive: 

655 >>> from unittest.mock import AsyncMock, patch 

656 >>> with patch.object(RoleService, 'get_user_role_assignment', new=AsyncMock(return_value=None)): 

657 ... asyncio.run(service.revoke_role_from_user('u','r','team','t')) 

658 False 

659 

660 Returns True on successful revoke: 

661 >>> active = type('UR', (), {'is_active': True})() 

662 >>> with patch.object(RoleService, 'get_user_role_assignment', new=AsyncMock(return_value=active)): 

663 ... asyncio.run(service.revoke_role_from_user('u','r','team','t')) 

664 True 

665 """ 

666 user_role = await self.get_user_role_assignment(user_email, role_id, scope, scope_id) 

667 

668 if not user_role or not user_role.is_active: 

669 return False 

670 

671 user_role.is_active = False 

672 self.db.commit() 

673 

674 logger.info(f"Revoked role {role_id} from {user_email} (scope: {scope}, scope_id: {scope_id})") 

675 return True 

676 

677 async def get_user_role_assignment(self, user_email: str, role_id: str, scope: str, scope_id: Optional[str]) -> Optional[UserRole]: 

678 """Get a specific user role assignment. 

679 

680 Args: 

681 user_email: Email of user 

682 role_id: ID of role 

683 scope: Scope of assignment 

684 scope_id: Team ID if team-scoped 

685 

686 Returns: 

687 Optional[UserRole]: The role assignment if found 

688 

689 Examples: 

690 Coroutine check: 

691 >>> import asyncio 

692 >>> from unittest.mock import Mock 

693 >>> service = RoleService(Mock()) 

694 >>> asyncio.iscoroutinefunction(service.get_user_role_assignment) 

695 True 

696 """ 

697 conditions = [UserRole.user_email == user_email, UserRole.role_id == role_id, UserRole.scope == scope] 

698 

699 if scope_id: 

700 conditions.append(UserRole.scope_id == scope_id) 

701 else: 

702 conditions.append(UserRole.scope_id.is_(None)) 

703 

704 result = self.db.execute(select(UserRole).where(and_(*conditions))) 

705 user_role = result.scalar_one_or_none() 

706 return user_role 

707 

708 async def list_user_roles(self, user_email: str, scope: Optional[str] = None, include_expired: bool = False) -> List[UserRole]: 

709 """List all role assignments for a user. 

710 

711 Args: 

712 user_email: Email of user 

713 scope: Filter by scope 

714 include_expired: Whether to include expired roles 

715 

716 Returns: 

717 List[UserRole]: User's role assignments 

718 

719 Examples: 

720 Coroutine check: 

721 >>> import asyncio 

722 >>> from unittest.mock import Mock 

723 >>> service = RoleService(Mock()) 

724 >>> asyncio.iscoroutinefunction(service.list_user_roles) 

725 True 

726 >>> # Simulate scalar results aggregation 

727 >>> class _Res: 

728 ... def scalars(self): 

729 ... class _S: 

730 ... def all(self): 

731 ... return ['ur1', 'ur2'] 

732 ... return _S() 

733 >>> service.db.execute = lambda *_a, **_k: _Res() 

734 >>> result = asyncio.run(service.list_user_roles('u@example.com', 'team')) 

735 >>> isinstance(result, list) and len(result) == 2 

736 True 

737 """ 

738 query = select(UserRole).join(Role).where(and_(UserRole.user_email == user_email, UserRole.is_active.is_(True), Role.is_active.is_(True))) 

739 

740 if scope: 

741 query = query.where(UserRole.scope == scope) 

742 

743 if not include_expired: 

744 now = utc_now() 

745 query = query.where((UserRole.expires_at.is_(None)) | (UserRole.expires_at > now)) 

746 

747 query = query.order_by(UserRole.scope, Role.name) 

748 

749 result = self.db.execute(query) 

750 user_roles = result.scalars().all() 

751 return user_roles 

752 

753 async def list_role_assignments(self, role_id: str, scope: Optional[str] = None, include_expired: bool = False) -> List[UserRole]: 

754 """List all user assignments for a role. 

755 

756 Args: 

757 role_id: ID of role 

758 scope: Filter by scope 

759 include_expired: Whether to include expired assignments 

760 

761 Returns: 

762 List[UserRole]: Role assignments 

763 

764 Examples: 

765 Coroutine check: 

766 >>> import asyncio 

767 >>> from unittest.mock import Mock 

768 >>> service = RoleService(Mock()) 

769 >>> asyncio.iscoroutinefunction(service.list_role_assignments) 

770 True 

771 >>> # Simulate scalar results aggregation 

772 >>> class _Res: 

773 ... def scalars(self): 

774 ... class _S: 

775 ... def all(self): 

776 ... return [] 

777 ... return _S() 

778 >>> service.db.execute = lambda *_a, **_k: _Res() 

779 >>> asyncio.run(service.list_role_assignments('rid')) 

780 [] 

781 """ 

782 query = select(UserRole).where(and_(UserRole.role_id == role_id, UserRole.is_active.is_(True))) 

783 

784 if scope: 

785 query = query.where(UserRole.scope == scope) 

786 

787 if not include_expired: 

788 now = utc_now() 

789 query = query.where((UserRole.expires_at.is_(None)) | (UserRole.expires_at > now)) 

790 

791 query = query.order_by(UserRole.user_email) 

792 

793 result = self.db.execute(query) 

794 assignments = result.scalars().all() 

795 return assignments 

796 

797 async def _would_create_cycle(self, parent_id: str, child_id: Optional[str]) -> bool: 

798 """Check if setting parent_id as parent of child_id would create a cycle. 

799 

800 Args: 

801 parent_id: ID of the proposed parent role 

802 child_id: ID of the proposed child role 

803 

804 Returns: 

805 True if setting this relationship would create a cycle, False otherwise 

806 

807 Examples: 

808 Test cycle detection logic: 

809 >>> from mcpgateway.services.role_service import RoleService 

810 

811 Basic parameter validation: 

812 >>> parent_id = "role-admin" 

813 >>> child_id = "role-user" 

814 >>> parent_id != child_id 

815 True 

816 >>> isinstance(parent_id, str) 

817 True 

818 >>> isinstance(child_id, str) 

819 True 

820 

821 Test None child_id handling (line 584-585): 

822 >>> child_id_none = None 

823 >>> child_id_none is None 

824 True 

825 >>> # This should return False without cycle check 

826 

827 Test cycle detection scenarios: 

828 >>> # Direct cycle: A -> A 

829 >>> same_id = "role-123" 

830 >>> same_id == same_id 

831 True 

832 

833 >>> # Simple cycle: A -> B, B -> A 

834 >>> role_a = "role-a" 

835 >>> role_b = "role-b" 

836 >>> role_a != role_b 

837 True 

838 

839 Test visited set logic: 

840 >>> visited = set() 

841 >>> current = "role-1" 

842 >>> current not in visited 

843 True 

844 >>> visited.add(current) 

845 >>> current in visited 

846 True 

847 

848 Test role hierarchy traversal: 

849 >>> # Test hierarchy: admin -> manager -> user 

850 >>> admin_role = "admin-role" 

851 >>> manager_role = "manager-role" 

852 >>> user_role = "user-role" 

853 >>> all(isinstance(r, str) for r in [admin_role, manager_role, user_role]) 

854 True 

855 >>> len({admin_role, manager_role, user_role}) == 3 

856 True 

857 """ 

858 if not child_id: 

859 return False 

860 

861 visited = set() 

862 current = parent_id 

863 

864 while current and current not in visited: 

865 if current == child_id: 

866 return True 

867 

868 visited.add(current) 

869 

870 # Get parent of current role 

871 result = self.db.execute(select(Role.inherits_from).where(Role.id == current)) 

872 current = result.scalar_one_or_none() 

873 

874 return False 

875 

876 async def delete_all_user_roles(self, user_email: str) -> int: 

877 """Delete all role assignments for a user. 

878 

879 Hard-deletes all role assignments (active and inactive) for the given user. 

880 Intended for use when permanently deleting a user account. 

881 

882 Note: Does not commit the transaction. The caller is responsible for 

883 committing (e.g., as part of a larger user deletion operation). 

884 

885 Args: 

886 user_email: Email of user whose roles should be deleted 

887 

888 Returns: 

889 int: Number of role assignments deleted 

890 """ 

891 stmt = delete(UserRole).where(UserRole.user_email == user_email) 

892 result = self.db.execute(stmt) 

893 deleted_count = result.rowcount 

894 logger.info(f"Deleted {deleted_count} role assignment(s) for user {user_email}") 

895 return deleted_count