Coverage for mcpgateway / services / role_service.py: 99%

180 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/role_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Role Management Service for RBAC System. 

8 

9This module provides CRUD operations for roles and user role assignments. 

10It handles role creation, assignment, revocation, and validation. 

11""" 

12 

# Standard
from datetime import datetime
import logging
from typing import List, Optional

# Third-Party
from sqlalchemy import and_, delete, select, update
from sqlalchemy.orm import Session

# First-Party
from mcpgateway.db import Permissions, Role, UserRole, utc_now

24 

25logger = logging.getLogger(__name__) 

26 

27 

class RoleService:
    """Service for managing roles and role assignments.

    Provides comprehensive role management including creation, assignment,
    revocation, and validation with support for role inheritance.

    Attributes:
        db: Database session used for every query and commit issued by the service

    Examples:
        Basic construction:
        >>> from unittest.mock import Mock
        >>> service = RoleService(Mock())
        >>> isinstance(service, RoleService)
        True
        >>> hasattr(service, 'db')
        True
    """

46 

    def __init__(self, db: Session) -> None:
        """Initialize role service.

        The session is stored unchanged on ``self.db``; no query is issued here.

        Args:
            db: Database session

        Examples:
            Basic initialization:
            >>> from mcpgateway.services.role_service import RoleService
            >>> from unittest.mock import Mock
            >>> db_session = Mock()
            >>> service = RoleService(db_session)
            >>> service.db is db_session
            True

            Service instance attributes:
            >>> hasattr(service, 'db')
            True
            >>> service.__class__.__name__
            'RoleService'
        """
        self.db = db

69 

70 async def create_role(self, name: str, description: str, scope: str, permissions: List[str], created_by: str, inherits_from: Optional[str] = None, is_system_role: bool = False) -> Role: 

71 """Create a new role. 

72 

73 Args: 

74 name: Role name (must be unique within scope) 

75 description: Role description 

76 scope: Role scope ('global', 'team', 'personal') 

77 permissions: List of permission strings 

78 created_by: Email of user creating the role 

79 inherits_from: ID of parent role for inheritance 

80 is_system_role: Whether this is a system-defined role 

81 

82 Returns: 

83 Role: The created role 

84 

85 Raises: 

86 ValueError: If role name already exists or invalid parameters 

87 

88 Examples: 

89 Basic role creation parameters: 

90 >>> from mcpgateway.services.role_service import RoleService 

91 >>> role_name = "developer" 

92 >>> len(role_name) > 0 

93 True 

94 >>> role_scope = "team" 

95 >>> role_scope in ["global", "team", "personal"] 

96 True 

97 >>> permissions = ["tools.read", "tools.execute"] 

98 >>> all(isinstance(p, str) for p in permissions) 

99 True 

100 

101 Role validation logic: 

102 >>> # Test role name validation 

103 >>> test_name = "admin-role" 

104 >>> len(test_name) <= 255 

105 True 

106 >>> bool(test_name.strip()) 

107 True 

108 

109 >>> # Test scope validation 

110 >>> valid_scopes = ["global", "team", "personal"] 

111 >>> "team" in valid_scopes 

112 True 

113 >>> "invalid" in valid_scopes 

114 False 

115 

116 >>> # Test permissions format 

117 >>> perms = ["users.read", "users.write", "teams.manage"] 

118 >>> all("." in p for p in perms) 

119 True 

120 >>> all(len(p) > 0 for p in perms) 

121 True 

122 

123 Role inheritance validation: 

124 >>> # Test inherits_from parameter 

125 >>> parent_role_id = "role-123" 

126 >>> isinstance(parent_role_id, str) 

127 True 

128 >>> parent_role_id != "" 

129 True 

130 

131 System role flags: 

132 >>> is_system = True 

133 >>> isinstance(is_system, bool) 

134 True 

135 >>> is_admin_role = False 

136 >>> isinstance(is_admin_role, bool) 

137 True 

138 

139 Creator validation: 

140 >>> created_by = "admin@example.com" 

141 >>> "@" in created_by 

142 True 

143 >>> len(created_by) > 0 

144 True 

145 

146 Invalid scope is rejected immediately: 

147 >>> import asyncio 

148 >>> from unittest.mock import Mock 

149 >>> svc = RoleService(Mock()) 

150 >>> try: 

151 ... asyncio.run(svc.create_role('n','d','invalid',[], 'u@example.com')) 

152 ... except ValueError as e: 

153 ... 'Invalid scope' in str(e) 

154 True 

155 

156 Duplicate name rejected within scope: 

157 >>> from unittest.mock import AsyncMock, patch 

158 >>> svc = RoleService(Mock()) 

159 >>> with patch.object(RoleService, 'get_role_by_name', new=AsyncMock(return_value=object())): 

160 ... try: 

161 ... asyncio.run(svc.create_role('dup','d','global',[], 'u@example.com')) 

162 ... except ValueError as e: 

163 ... 'already exists' in str(e) 

164 True 

165 

166 Invalid permissions rejected: 

167 >>> with patch.object(RoleService, 'get_role_by_name', new=AsyncMock(return_value=None)): 

168 ... with patch('mcpgateway.services.role_service.Permissions.get_all_permissions', return_value=[]): 

169 ... try: 

170 ... asyncio.run(svc.create_role('n','d','global',['bad'], 'u@example.com')) 

171 ... except ValueError as e: 

172 ... 'Invalid permissions' in str(e) 

173 True 

174 

175 Parent not found and cycle detection: 

176 >>> with patch.object(RoleService, 'get_role_by_name', new=AsyncMock(return_value=None)): 

177 ... with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=None)): 

178 ... try: 

179 ... asyncio.run(svc.create_role('n','d','global',[], 'u@example.com', inherits_from='p')) 

180 ... except ValueError as e: 

181 ... 'Parent role not found' in str(e) 

182 True 

183 >>> with patch.object(RoleService, 'get_role_by_name', new=AsyncMock(return_value=None)): 

184 ... with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=object())): 

185 ... with patch.object(RoleService, '_would_create_cycle', new=AsyncMock(return_value=True)): 

186 ... try: 

187 ... asyncio.run(svc.create_role('n','d','global',[], 'u@example.com', inherits_from='p')) 

188 ... except ValueError as e: 

189 ... 'create a cycle' in str(e) 

190 True 

191 """ 

192 # Validate scope 

193 if scope not in ["global", "team", "personal"]: 

194 raise ValueError(f"Invalid scope: {scope}") 

195 

196 # Check for duplicate name within scope 

197 existing = await self.get_role_by_name(name, scope) 

198 if existing: 

199 raise ValueError(f"Role '{name}' already exists in scope '{scope}'") 

200 

201 # Validate permissions 

202 valid_permissions = Permissions.get_all_permissions() 

203 valid_permissions.append(Permissions.ALL_PERMISSIONS) # Allow wildcard 

204 

205 invalid_perms = [p for p in permissions if p not in valid_permissions] 

206 if invalid_perms: 

207 raise ValueError(f"Invalid permissions: {invalid_perms}") 

208 

209 # Validate inheritance 

210 parent_role = None 

211 if inherits_from: 

212 parent_role = await self.get_role_by_id(inherits_from) 

213 if not parent_role: 

214 raise ValueError(f"Parent role not found: {inherits_from}") 

215 

216 # Check for circular inheritance 

217 if await self._would_create_cycle(inherits_from, None): 

218 raise ValueError("Role inheritance would create a cycle") 

219 

220 # Create the role 

221 role = Role(name=name, description=description, scope=scope, permissions=permissions, created_by=created_by, inherits_from=inherits_from, is_system_role=is_system_role) 

222 

223 self.db.add(role) 

224 self.db.commit() 

225 self.db.refresh(role) 

226 

227 logger.info(f"Created role: {role.name} (scope: {role.scope}, id: {role.id})") 

228 return role 

229 

230 async def get_role_by_id(self, role_id: str) -> Optional[Role]: 

231 """Get role by ID. 

232 

233 Args: 

234 role_id: Role ID to lookup 

235 

236 Returns: 

237 Optional[Role]: The role if found, None otherwise 

238 

239 Examples: 

240 Check coroutine nature and signature: 

241 >>> import asyncio 

242 >>> from unittest.mock import Mock 

243 >>> service = RoleService(Mock()) 

244 >>> asyncio.iscoroutinefunction(service.get_role_by_id) 

245 True 

246 """ 

247 result = self.db.execute(select(Role).where(Role.id == role_id)) 

248 role = result.scalar_one_or_none() 

249 return role 

250 

251 async def get_role_by_name(self, name: str, scope: str) -> Optional[Role]: 

252 """Get role by name and scope. 

253 

254 Args: 

255 name: Role name 

256 scope: Role scope 

257 

258 Returns: 

259 Optional[Role]: The role if found, None otherwise 

260 

261 Examples: 

262 Basic callable validation: 

263 >>> import asyncio 

264 >>> from unittest.mock import Mock 

265 >>> service = RoleService(Mock()) 

266 >>> asyncio.iscoroutinefunction(service.get_role_by_name) 

267 True 

268 """ 

269 result = self.db.execute(select(Role).where(and_(Role.name == name, Role.scope == scope, Role.is_active.is_(True)))) 

270 role = result.scalar_one_or_none() 

271 return role 

272 

273 async def list_roles(self, scope: Optional[str] = None, include_system: bool = True, include_inactive: bool = False) -> List[Role]: 

274 """List roles with optional filtering. 

275 

276 Args: 

277 scope: Filter by scope ('global', 'team', 'personal') 

278 include_system: Whether to include system roles 

279 include_inactive: Whether to include inactive roles 

280 

281 Returns: 

282 List[Role]: List of matching roles 

283 

284 Examples: 

285 Callable check: 

286 >>> import asyncio 

287 >>> from unittest.mock import Mock 

288 >>> service = RoleService(Mock()) 

289 >>> asyncio.iscoroutinefunction(service.list_roles) 

290 True 

291 >>> # Simulate empty list result 

292 >>> class _Res: 

293 ... def scalars(self): 

294 ... class _S: 

295 ... def all(self): 

296 ... return [] 

297 ... return _S() 

298 >>> service.db.execute = lambda *_a, **_k: _Res() 

299 >>> asyncio.run(service.list_roles('team', include_system=False, include_inactive=True)) == [] 

300 True 

301 """ 

302 query = select(Role) 

303 

304 conditions = [] 

305 

306 if scope: 

307 conditions.append(Role.scope == scope) 

308 

309 if not include_system: 

310 conditions.append(Role.is_system_role.is_(False)) 

311 

312 if not include_inactive: 

313 conditions.append(Role.is_active.is_(True)) 

314 

315 if conditions: 

316 query = query.where(and_(*conditions)) 

317 

318 query = query.order_by(Role.scope, Role.name) 

319 

320 result = self.db.execute(query) 

321 roles = result.scalars().all() 

322 return roles 

323 

324 async def update_role( 

325 self, 

326 role_id: str, 

327 name: Optional[str] = None, 

328 description: Optional[str] = None, 

329 permissions: Optional[List[str]] = None, 

330 inherits_from: Optional[str] = None, 

331 is_active: Optional[bool] = None, 

332 ) -> Optional[Role]: 

333 """Update an existing role. 

334 

335 Args: 

336 role_id: ID of role to update 

337 name: New role name 

338 description: New role description 

339 permissions: New permissions list 

340 inherits_from: New parent role ID 

341 is_active: New active status 

342 

343 Returns: 

344 Optional[Role]: Updated role or None if not found 

345 

346 Raises: 

347 ValueError: If update would create invalid state 

348 

349 Examples: 

350 Signature and coroutine checks: 

351 >>> import asyncio, inspect 

352 >>> from unittest.mock import Mock 

353 >>> service = RoleService(Mock()) 

354 >>> asyncio.iscoroutinefunction(service.update_role) 

355 True 

356 >>> params = inspect.signature(RoleService.update_role).parameters 

357 >>> all(p in params for p in [ 

358 ... 'role_id', 'name', 'description', 'permissions', 'inherits_from', 'is_active' 

359 ... ]) 

360 True 

361 

362 Additional validation paths: 

363 Cannot modify system roles: 

364 >>> from unittest.mock import AsyncMock, patch 

365 >>> service = RoleService(object()) 

366 >>> mock_role = type('R', (), {'is_system_role': True})() 

367 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=mock_role)): 

368 ... try: 

369 ... _ = asyncio.run(service.update_role('rid')) 

370 ... except ValueError as e: 

371 ... 'system roles' in str(e) 

372 True 

373 

374 Returns None when role not found: 

375 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=None)): 

376 ... asyncio.run(service.update_role('missing')) is None 

377 True 

378 

379 Duplicate new name rejected: 

380 >>> role = type('R', (), {'is_system_role': False, 'name': 'old', 'scope': 'global', 'id': 'id1', 'is_active': True})() 

381 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=role)): 

382 ... with patch.object(RoleService, 'get_role_by_name', new=AsyncMock(return_value=type('R2', (), {'id': 'id2'})())): 

383 ... try: 

384 ... asyncio.run(service.update_role('id1', name='new')) 

385 ... except ValueError as e: 

386 ... 'already exists' in str(e) 

387 True 

388 

389 Invalid permissions rejected on update: 

390 >>> role = type('R', (), {'is_system_role': False, 'name': 'old', 'scope': 'global', 'id': 'id1', 'is_active': True})() 

391 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=role)): 

392 ... with patch('mcpgateway.services.role_service.Permissions.get_all_permissions', return_value=[]): 

393 ... try: 

394 ... asyncio.run(service.update_role('id1', permissions=['bad'])) 

395 ... except ValueError as e: 

396 ... 'Invalid permissions' in str(e) 

397 True 

398 

399 Parent not found and cycle detection on update: 

400 >>> role = type('R', (), {'is_system_role': False, 'name': 'old', 'scope': 'global', 'id': 'id1', 'inherits_from': None, 'is_active': True})() 

401 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(side_effect=[role, None])): 

402 ... try: 

403 ... asyncio.run(service.update_role('id1', inherits_from='p')) 

404 ... except ValueError as e: 

405 ... 'Parent role not found' in str(e) 

406 True 

407 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(side_effect=[role, object()])): 

408 ... with patch.object(RoleService, '_would_create_cycle', new=AsyncMock(return_value=True)): 

409 ... try: 

410 ... asyncio.run(service.update_role('id1', inherits_from='p')) 

411 ... except ValueError as e: 

412 ... 'create a cycle' in str(e) 

413 True 

414 """ 

415 role = await self.get_role_by_id(role_id) 

416 if not role: 

417 return None 

418 

419 # Prevent modification of system roles 

420 if role.is_system_role: 

421 raise ValueError("Cannot modify system roles") 

422 

423 # Validate new name if provided 

424 if name and name != role.name: 

425 existing = await self.get_role_by_name(name, role.scope) 

426 if existing and existing.id != role_id: 426 ↛ 428line 426 didn't jump to line 428 because the condition on line 426 was always true

427 raise ValueError(f"Role '{name}' already exists in scope '{role.scope}'") 

428 role.name = name 

429 

430 # Update description 

431 if description is not None: 

432 role.description = description 

433 

434 # Validate and update permissions 

435 if permissions is not None: 

436 valid_permissions = Permissions.get_all_permissions() 

437 valid_permissions.append(Permissions.ALL_PERMISSIONS) 

438 

439 invalid_perms = [p for p in permissions if p not in valid_permissions] 

440 if invalid_perms: 

441 raise ValueError(f"Invalid permissions: {invalid_perms}") 

442 

443 role.permissions = permissions 

444 

445 # Validate and update inheritance 

446 if inherits_from is not None: 

447 if inherits_from != role.inherits_from: 447 ↛ 460line 447 didn't jump to line 460 because the condition on line 447 was always true

448 if inherits_from: 

449 parent_role = await self.get_role_by_id(inherits_from) 

450 if not parent_role: 

451 raise ValueError(f"Parent role not found: {inherits_from}") 

452 

453 # Check for circular inheritance 

454 if await self._would_create_cycle(inherits_from, role_id): 

455 raise ValueError("Role inheritance would create a cycle") 

456 

457 role.inherits_from = inherits_from 

458 

459 # Update active status 

460 if is_active is not None: 

461 role.is_active = is_active 

462 

463 # Update timestamp 

464 role.updated_at = utc_now() 

465 

466 self.db.commit() 

467 self.db.refresh(role) 

468 

469 logger.info(f"Updated role: {role.name} (id: {role.id})") 

470 return role 

471 

472 async def delete_role(self, role_id: str) -> bool: 

473 """Delete a role. 

474 

475 Soft deletes the role by setting is_active to False. 

476 Also deactivates all user role assignments. 

477 

478 Args: 

479 role_id: ID of role to delete 

480 

481 Returns: 

482 bool: True if role was deleted, False if not found 

483 

484 Raises: 

485 ValueError: If trying to delete a system role 

486 

487 Examples: 

488 Coroutine check: 

489 >>> import asyncio 

490 >>> from unittest.mock import Mock 

491 >>> service = RoleService(Mock()) 

492 >>> asyncio.iscoroutinefunction(service.delete_role) 

493 True 

494 

495 Returns False when role not found: 

496 >>> from unittest.mock import AsyncMock, patch 

497 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=None)): 

498 ... asyncio.run(service.delete_role('missing')) 

499 False 

500 

501 System roles cannot be deleted: 

502 >>> sys_role = type('R', (), {'is_system_role': True})() 

503 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=sys_role)): 

504 ... try: 

505 ... asyncio.run(service.delete_role('rid')) 

506 ... except ValueError as e: 

507 ... 'system roles' in str(e) 

508 True 

509 """ 

510 role = await self.get_role_by_id(role_id) 

511 if not role: 

512 return False 

513 

514 if role.is_system_role: 

515 raise ValueError("Cannot delete system roles") 

516 

517 # Soft delete the role 

518 role.is_active = False 

519 role.updated_at = utc_now() 

520 

521 # Deactivate all user assignments of this role 

522 self.db.execute(select(UserRole).where(UserRole.role_id == role_id)).update({"is_active": False}) 

523 

524 self.db.commit() 

525 

526 logger.info(f"Deleted role: {role.name} (id: {role.id})") 

527 return True 

528 

529 async def assign_role_to_user(self, user_email: str, role_id: str, scope: str, scope_id: Optional[str], granted_by: str, expires_at: Optional[datetime] = None) -> UserRole: 

530 """Assign a role to a user. 

531 

532 Args: 

533 user_email: Email of user to assign role to 

534 role_id: ID of role to assign 

535 scope: Scope of assignment ('global', 'team', 'personal') 

536 scope_id: Team ID if team-scoped 

537 granted_by: Email of user granting the role 

538 expires_at: Optional expiration datetime 

539 

540 Returns: 

541 UserRole: The role assignment 

542 

543 Raises: 

544 ValueError: If invalid parameters or assignment already exists 

545 

546 Examples: 

547 Coroutine check: 

548 >>> import asyncio 

549 >>> from unittest.mock import Mock 

550 >>> service = RoleService(Mock()) 

551 >>> asyncio.iscoroutinefunction(service.assign_role_to_user) 

552 True 

553 

554 Scope mismatch raises error: 

555 >>> from unittest.mock import AsyncMock, patch 

556 >>> role = type('Role', (), {'is_active': True, 'scope': 'team'})() 

557 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=role)): 

558 ... try: 

559 ... asyncio.run(service.assign_role_to_user('u@e','rid','global',None,'admin')) 

560 ... except ValueError as e: 

561 ... "doesn't match" in str(e) 

562 True 

563 

564 Team scope requires scope_id: 

565 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=role)): 

566 ... try: 

567 ... asyncio.run(service.assign_role_to_user('u@e','rid','team',None,'admin')) 

568 ... except ValueError as e: 

569 ... 'scope_id required' in str(e) 

570 True 

571 

572 Global scope forbids scope_id: 

573 >>> role.scope = 'global' 

574 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=role)): 

575 ... try: 

576 ... asyncio.run(service.assign_role_to_user('u@e','rid','global','x','admin')) 

577 ... except ValueError as e: 

578 ... 'not allowed for global' in str(e) 

579 True 

580 

581 Duplicate active assignment is rejected: 

582 >>> role.scope = 'team' 

583 >>> active = type('UR', (), {'is_active': True, 'is_expired': lambda self: False})() 

584 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=role)): 

585 ... with patch.object(RoleService, 'get_user_role_assignment', new=AsyncMock(return_value=active)): 

586 ... try: 

587 ... asyncio.run(service.assign_role_to_user('u@e','rid','team','t1','admin')) 

588 ... except ValueError as e: 

589 ... 'already has this role' in str(e) 

590 True 

591 

592 Role not found or inactive raises: 

593 >>> inactive = type('Role', (), {'is_active': False, 'scope': 'team'})() 

594 >>> with patch.object(RoleService, 'get_role_by_id', new=AsyncMock(return_value=inactive)): 

595 ... try: 

596 ... asyncio.run(service.assign_role_to_user('u@e','rid','team','t1','admin')) 

597 ... except ValueError as e: 

598 ... 'not found or inactive' in str(e) 

599 True 

600 """ 

601 # Validate role exists and is active 

602 role = await self.get_role_by_id(role_id) 

603 if not role or not role.is_active: 

604 raise ValueError(f"Role not found or inactive: {role_id}") 

605 

606 # Validate scope consistency 

607 if role.scope != scope: 

608 raise ValueError(f"Role scope '{role.scope}' doesn't match assignment scope '{scope}'") 

609 

610 # Validate scope_id requirements 

611 if scope == "team" and not scope_id: 

612 raise ValueError("scope_id required for team-scoped assignments") 

613 if scope in ["global", "personal"] and scope_id: 

614 raise ValueError(f"scope_id not allowed for {scope} assignments") 

615 

616 # Check for existing active assignment 

617 existing = await self.get_user_role_assignment(user_email, role_id, scope, scope_id) 

618 if existing and existing.is_active and not existing.is_expired(): 

619 raise ValueError("User already has this role assignment") 

620 

621 # Create the assignment 

622 user_role = UserRole(user_email=user_email, role_id=role_id, scope=scope, scope_id=scope_id, granted_by=granted_by, expires_at=expires_at) 

623 

624 self.db.add(user_role) 

625 self.db.commit() 

626 self.db.refresh(user_role) 

627 

628 logger.info(f"Assigned role {role.name} to {user_email} (scope: {scope}, scope_id: {scope_id})") 

629 return user_role 

630 

631 async def revoke_role_from_user(self, user_email: str, role_id: str, scope: str, scope_id: Optional[str]) -> bool: 

632 """Revoke a role from a user. 

633 

634 Args: 

635 user_email: Email of user 

636 role_id: ID of role to revoke 

637 scope: Scope of assignment 

638 scope_id: Team ID if team-scoped 

639 

640 Returns: 

641 bool: True if role was revoked, False if not found 

642 

643 Examples: 

644 Coroutine check: 

645 >>> import asyncio 

646 >>> from unittest.mock import Mock 

647 >>> service = RoleService(Mock()) 

648 >>> asyncio.iscoroutinefunction(service.revoke_role_from_user) 

649 True 

650 

651 Returns False when assignment not found or inactive: 

652 >>> from unittest.mock import AsyncMock, patch 

653 >>> with patch.object(RoleService, 'get_user_role_assignment', new=AsyncMock(return_value=None)): 

654 ... asyncio.run(service.revoke_role_from_user('u','r','team','t')) 

655 False 

656 

657 Returns True on successful revoke: 

658 >>> active = type('UR', (), {'is_active': True})() 

659 >>> with patch.object(RoleService, 'get_user_role_assignment', new=AsyncMock(return_value=active)): 

660 ... asyncio.run(service.revoke_role_from_user('u','r','team','t')) 

661 True 

662 """ 

663 user_role = await self.get_user_role_assignment(user_email, role_id, scope, scope_id) 

664 

665 if not user_role or not user_role.is_active: 

666 return False 

667 

668 user_role.is_active = False 

669 self.db.commit() 

670 

671 logger.info(f"Revoked role {role_id} from {user_email} (scope: {scope}, scope_id: {scope_id})") 

672 return True 

673 

674 async def get_user_role_assignment(self, user_email: str, role_id: str, scope: str, scope_id: Optional[str]) -> Optional[UserRole]: 

675 """Get a specific user role assignment. 

676 

677 Args: 

678 user_email: Email of user 

679 role_id: ID of role 

680 scope: Scope of assignment 

681 scope_id: Team ID if team-scoped 

682 

683 Returns: 

684 Optional[UserRole]: The role assignment if found 

685 

686 Examples: 

687 Coroutine check: 

688 >>> import asyncio 

689 >>> from unittest.mock import Mock 

690 >>> service = RoleService(Mock()) 

691 >>> asyncio.iscoroutinefunction(service.get_user_role_assignment) 

692 True 

693 """ 

694 conditions = [UserRole.user_email == user_email, UserRole.role_id == role_id, UserRole.scope == scope] 

695 

696 if scope_id: 

697 conditions.append(UserRole.scope_id == scope_id) 

698 else: 

699 conditions.append(UserRole.scope_id.is_(None)) 

700 

701 result = self.db.execute(select(UserRole).where(and_(*conditions))) 

702 user_role = result.scalar_one_or_none() 

703 return user_role 

704 

705 async def list_user_roles(self, user_email: str, scope: Optional[str] = None, include_expired: bool = False) -> List[UserRole]: 

706 """List all role assignments for a user. 

707 

708 Args: 

709 user_email: Email of user 

710 scope: Filter by scope 

711 include_expired: Whether to include expired roles 

712 

713 Returns: 

714 List[UserRole]: User's role assignments 

715 

716 Examples: 

717 Coroutine check: 

718 >>> import asyncio 

719 >>> from unittest.mock import Mock 

720 >>> service = RoleService(Mock()) 

721 >>> asyncio.iscoroutinefunction(service.list_user_roles) 

722 True 

723 >>> # Simulate scalar results aggregation 

724 >>> class _Res: 

725 ... def scalars(self): 

726 ... class _S: 

727 ... def all(self): 

728 ... return ['ur1', 'ur2'] 

729 ... return _S() 

730 >>> service.db.execute = lambda *_a, **_k: _Res() 

731 >>> result = asyncio.run(service.list_user_roles('u@example.com', 'team')) 

732 >>> isinstance(result, list) and len(result) == 2 

733 True 

734 """ 

735 query = select(UserRole).join(Role).where(and_(UserRole.user_email == user_email, UserRole.is_active.is_(True), Role.is_active.is_(True))) 

736 

737 if scope: 

738 query = query.where(UserRole.scope == scope) 

739 

740 if not include_expired: 

741 now = utc_now() 

742 query = query.where((UserRole.expires_at.is_(None)) | (UserRole.expires_at > now)) 

743 

744 query = query.order_by(UserRole.scope, Role.name) 

745 

746 result = self.db.execute(query) 

747 user_roles = result.scalars().all() 

748 return user_roles 

749 

750 async def list_role_assignments(self, role_id: str, scope: Optional[str] = None, include_expired: bool = False) -> List[UserRole]: 

751 """List all user assignments for a role. 

752 

753 Args: 

754 role_id: ID of role 

755 scope: Filter by scope 

756 include_expired: Whether to include expired assignments 

757 

758 Returns: 

759 List[UserRole]: Role assignments 

760 

761 Examples: 

762 Coroutine check: 

763 >>> import asyncio 

764 >>> from unittest.mock import Mock 

765 >>> service = RoleService(Mock()) 

766 >>> asyncio.iscoroutinefunction(service.list_role_assignments) 

767 True 

768 >>> # Simulate scalar results aggregation 

769 >>> class _Res: 

770 ... def scalars(self): 

771 ... class _S: 

772 ... def all(self): 

773 ... return [] 

774 ... return _S() 

775 >>> service.db.execute = lambda *_a, **_k: _Res() 

776 >>> asyncio.run(service.list_role_assignments('rid')) 

777 [] 

778 """ 

779 query = select(UserRole).where(and_(UserRole.role_id == role_id, UserRole.is_active.is_(True))) 

780 

781 if scope: 

782 query = query.where(UserRole.scope == scope) 

783 

784 if not include_expired: 

785 now = utc_now() 

786 query = query.where((UserRole.expires_at.is_(None)) | (UserRole.expires_at > now)) 

787 

788 query = query.order_by(UserRole.user_email) 

789 

790 result = self.db.execute(query) 

791 assignments = result.scalars().all() 

792 return assignments 

793 

794 async def _would_create_cycle(self, parent_id: str, child_id: Optional[str]) -> bool: 

795 """Check if setting parent_id as parent of child_id would create a cycle. 

796 

797 Args: 

798 parent_id: ID of the proposed parent role 

799 child_id: ID of the proposed child role 

800 

801 Returns: 

802 True if setting this relationship would create a cycle, False otherwise 

803 

804 Examples: 

805 Test cycle detection logic: 

806 >>> from mcpgateway.services.role_service import RoleService 

807 

808 Basic parameter validation: 

809 >>> parent_id = "role-admin" 

810 >>> child_id = "role-user" 

811 >>> parent_id != child_id 

812 True 

813 >>> isinstance(parent_id, str) 

814 True 

815 >>> isinstance(child_id, str) 

816 True 

817 

818 Test None child_id handling (line 584-585): 

819 >>> child_id_none = None 

820 >>> child_id_none is None 

821 True 

822 >>> # This should return False without cycle check 

823 

824 Test cycle detection scenarios: 

825 >>> # Direct cycle: A -> A 

826 >>> same_id = "role-123" 

827 >>> same_id == same_id 

828 True 

829 

830 >>> # Simple cycle: A -> B, B -> A 

831 >>> role_a = "role-a" 

832 >>> role_b = "role-b" 

833 >>> role_a != role_b 

834 True 

835 

836 Test visited set logic: 

837 >>> visited = set() 

838 >>> current = "role-1" 

839 >>> current not in visited 

840 True 

841 >>> visited.add(current) 

842 >>> current in visited 

843 True 

844 

845 Test role hierarchy traversal: 

846 >>> # Test hierarchy: admin -> manager -> user 

847 >>> admin_role = "admin-role" 

848 >>> manager_role = "manager-role" 

849 >>> user_role = "user-role" 

850 >>> all(isinstance(r, str) for r in [admin_role, manager_role, user_role]) 

851 True 

852 >>> len({admin_role, manager_role, user_role}) == 3 

853 True 

854 """ 

855 if not child_id: 

856 return False 

857 

858 visited = set() 

859 current = parent_id 

860 

861 while current and current not in visited: 

862 if current == child_id: 

863 return True 

864 

865 visited.add(current) 

866 

867 # Get parent of current role 

868 result = self.db.execute(select(Role.inherits_from).where(Role.id == current)) 

869 current = result.scalar_one_or_none() 

870 

871 return False 

872 

873 async def delete_all_user_roles(self, user_email: str) -> int: 

874 """Delete all role assignments for a user. 

875 

876 Hard-deletes all role assignments (active and inactive) for the given user. 

877 Intended for use when permanently deleting a user account. 

878 

879 Note: Does not commit the transaction. The caller is responsible for 

880 committing (e.g., as part of a larger user deletion operation). 

881 

882 Args: 

883 user_email: Email of user whose roles should be deleted 

884 

885 Returns: 

886 int: Number of role assignments deleted 

887 """ 

888 stmt = delete(UserRole).where(UserRole.user_email == user_email) 

889 result = self.db.execute(stmt) 

890 deleted_count = result.rowcount 

891 logger.info(f"Deleted {deleted_count} role assignment(s) for user {user_email}") 

892 return deleted_count