Coverage for mcpgateway / routers / rbac.py: 100%

215 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/routers/rbac.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7RBAC API Router. 

8 

9This module provides REST API endpoints for Role-Based Access Control (RBAC) 

10management including roles, user role assignments, and permission checking. 

11 

12Examples: 

13 >>> from mcpgateway.routers.rbac import router 

14 >>> from fastapi import APIRouter 

15 >>> isinstance(router, APIRouter) 

16 True 

17""" 

18 

19# Standard 

20from datetime import datetime, timezone 

21import logging 

22from typing import Generator, List, Optional 

23 

24# Third-Party 

25from fastapi import APIRouter, Depends, HTTPException, Query, status 

26from sqlalchemy.orm import Session 

27 

28# First-Party 

29from mcpgateway.common.validators import SecurityValidator 

30from mcpgateway.db import Permissions, SessionLocal 

31from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_admin_permission, require_permission 

32from mcpgateway.schemas import PermissionCheckRequest, PermissionCheckResponse, PermissionListResponse, RoleCreateRequest, RoleResponse, RoleUpdateRequest, UserRoleAssignRequest, UserRoleResponse 

33from mcpgateway.services.permission_service import PermissionService 

34from mcpgateway.services.role_service import RoleService 

35 

36logger = logging.getLogger(__name__) 

37 

38router = APIRouter(prefix="/rbac", tags=["RBAC"]) 

39 

40 

41def get_db() -> Generator[Session, None, None]: 

42 """Get database session for dependency injection. 

43 

44 Commits the transaction on successful completion to avoid implicit rollbacks 

45 for read-only operations. Rolls back explicitly on exception. 

46 

47 Yields: 

48 Session: SQLAlchemy database session 

49 

50 Raises: 

51 Exception: Re-raises any exception after rolling back the transaction. 

52 

53 Examples: 

54 >>> gen = get_db() 

55 >>> db = next(gen) 

56 >>> hasattr(db, 'close') 

57 True 

58 """ 

59 db = SessionLocal() 

60 try: 

61 yield db 

62 db.commit() 

63 except Exception: 

64 try: 

65 db.rollback() 

66 except Exception: 

67 try: 

68 db.invalidate() 

69 except Exception: 

70 pass # nosec B110 - Best effort cleanup on connection failure 

71 raise 

72 finally: 

73 db.close() 

74 

75 

76# ===== Role Management Endpoints ===== 

77 

78 

79@router.post("/roles", response_model=RoleResponse) 

80@require_admin_permission() 

81async def create_role(role_data: RoleCreateRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): 

82 """Create a new role. 

83 

84 Requires admin permissions to create roles. 

85 

86 Args: 

87 role_data: Role creation data 

88 user: Current authenticated user 

89 db: Database session 

90 

91 Returns: 

92 RoleResponse: Created role details 

93 

94 Raises: 

95 HTTPException: If role creation fails 

96 

97 Examples: 

98 >>> import asyncio 

99 >>> asyncio.iscoroutinefunction(create_role) 

100 True 

101 """ 

102 try: 

103 role_service = RoleService(db) 

104 role = await role_service.create_role( 

105 name=role_data.name, 

106 description=role_data.description, 

107 scope=role_data.scope, 

108 permissions=role_data.permissions, 

109 inherits_from=role_data.inherits_from, 

110 created_by=user["email"], 

111 is_system_role=role_data.is_system_role or False, 

112 ) 

113 

114 logger.info(f"Role created: {role.id} by {SecurityValidator.sanitize_log_message(user['email'])}") 

115 db.commit() 

116 db.close() 

117 return RoleResponse.model_validate(role) 

118 

119 except ValueError as e: 

120 logger.error(f"Role creation validation error: {e}") 

121 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) 

122 except Exception as e: 

123 logger.error(f"Role creation failed: {e}") 

124 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create role") 

125 

126 

127@router.get("/roles", response_model=List[RoleResponse]) 

128@require_permission("admin.user_management") 

129async def list_roles( 

130 scope: Optional[str] = Query(None, description="Filter by scope"), 

131 active_only: bool = Query(True, description="Show only active roles"), 

132 user=Depends(get_current_user_with_permissions), 

133 db: Session = Depends(get_db), 

134): 

135 """List all roles. 

136 

137 Args: 

138 scope: Optional scope filter 

139 active_only: Whether to show only active roles 

140 user: Current authenticated user 

141 db: Database session 

142 

143 Returns: 

144 List[RoleResponse]: List of roles 

145 

146 Raises: 

147 HTTPException: If user lacks required permissions 

148 

149 Examples: 

150 >>> import asyncio 

151 >>> asyncio.iscoroutinefunction(list_roles) 

152 True 

153 """ 

154 try: 

155 role_service = RoleService(db) 

156 roles = await role_service.list_roles(scope=scope) 

157 # Release transaction before response serialization 

158 db.commit() 

159 db.close() 

160 

161 return [RoleResponse.model_validate(role) for role in roles] 

162 

163 except Exception as e: 

164 logger.error(f"Failed to list roles: {e}") 

165 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve roles") 

166 

167 

168@router.get("/roles/{role_id}", response_model=RoleResponse) 

169@require_permission("admin.user_management") 

170async def get_role(role_id: str, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): 

171 """Get role details by ID. 

172 

173 Args: 

174 role_id: Role identifier 

175 user: Current authenticated user 

176 db: Database session 

177 

178 Returns: 

179 RoleResponse: Role details 

180 

181 Raises: 

182 HTTPException: If role not found 

183 

184 Examples: 

185 >>> import asyncio 

186 >>> asyncio.iscoroutinefunction(get_role) 

187 True 

188 """ 

189 try: 

190 role_service = RoleService(db) 

191 role = await role_service.get_role_by_id(role_id) 

192 

193 if not role: 

194 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found") 

195 

196 db.commit() 

197 db.close() 

198 return RoleResponse.model_validate(role) 

199 

200 except HTTPException: 

201 raise 

202 except Exception as e: 

203 logger.error(f"Failed to get role {role_id}: {e}") 

204 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve role") 

205 

206 

207@router.put("/roles/{role_id}", response_model=RoleResponse) 

208@require_admin_permission() 

209async def update_role(role_id: str, role_data: RoleUpdateRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): 

210 """Update an existing role. 

211 

212 Args: 

213 role_id: Role identifier 

214 role_data: Role update data 

215 user: Current authenticated user 

216 db: Database session 

217 

218 Returns: 

219 RoleResponse: Updated role details 

220 

221 Raises: 

222 HTTPException: If role not found or update fails 

223 

224 Examples: 

225 >>> import asyncio 

226 >>> asyncio.iscoroutinefunction(update_role) 

227 True 

228 """ 

229 try: 

230 role_service = RoleService(db) 

231 role = await role_service.update_role(role_id, **role_data.model_dump(exclude_unset=True)) 

232 

233 if not role: 

234 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found") 

235 

236 logger.info(f"Role updated: {role_id} by {SecurityValidator.sanitize_log_message(user['email'])}") 

237 db.commit() 

238 db.close() 

239 return RoleResponse.model_validate(role) 

240 

241 except HTTPException: 

242 raise 

243 except ValueError as e: 

244 logger.error(f"Role update validation error: {e}") 

245 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) 

246 except Exception as e: 

247 logger.error(f"Role update failed: {e}") 

248 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to update role") 

249 

250 

251@router.delete("/roles/{role_id}") 

252@require_admin_permission() 

253async def delete_role(role_id: str, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): 

254 """Delete a role. 

255 

256 Args: 

257 role_id: Role identifier 

258 user: Current authenticated user 

259 db: Database session 

260 

261 Returns: 

262 dict: Success message 

263 

264 Raises: 

265 HTTPException: If role not found or deletion fails 

266 

267 Examples: 

268 >>> import asyncio 

269 >>> asyncio.iscoroutinefunction(delete_role) 

270 True 

271 """ 

272 try: 

273 role_service = RoleService(db) 

274 success = await role_service.delete_role(role_id) 

275 

276 if not success: 

277 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found") 

278 

279 logger.info(f"Role deleted: {role_id} by {SecurityValidator.sanitize_log_message(user['email'])}") 

280 db.commit() 

281 db.close() 

282 return {"message": "Role deleted successfully"} 

283 

284 except HTTPException: 

285 raise 

286 except ValueError as e: 

287 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) 

288 except Exception as e: 

289 logger.error(f"Role deletion failed: {e}") 

290 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to delete role") 

291 

292 

293# ===== User Role Assignment Endpoints ===== 

294 

295 

296@router.post("/users/{user_email}/roles", response_model=UserRoleResponse) 

297@require_permission("admin.user_management") 

298async def assign_role_to_user(user_email: str, assignment_data: UserRoleAssignRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): 

299 """Assign a role to a user. 

300 

301 Args: 

302 user_email: User email address 

303 assignment_data: Role assignment data 

304 user: Current authenticated user 

305 db: Database session 

306 

307 Returns: 

308 UserRoleResponse: Created role assignment 

309 

310 Raises: 

311 HTTPException: If assignment fails 

312 

313 Examples: 

314 >>> import asyncio 

315 >>> asyncio.iscoroutinefunction(assign_role_to_user) 

316 True 

317 """ 

318 try: 

319 role_service = RoleService(db) 

320 user_role = await role_service.assign_role_to_user( 

321 user_email=user_email, role_id=assignment_data.role_id, scope=assignment_data.scope, scope_id=assignment_data.scope_id, granted_by=user["email"], expires_at=assignment_data.expires_at 

322 ) 

323 

324 logger.info(f"Role assigned: {assignment_data.role_id} to {SecurityValidator.sanitize_log_message(user_email)} by {SecurityValidator.sanitize_log_message(user['email'])}") 

325 db.commit() 

326 db.close() 

327 return UserRoleResponse.model_validate(user_role) 

328 

329 except ValueError as e: 

330 logger.error(f"Role assignment validation error: {e}") 

331 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) 

332 except Exception as e: 

333 logger.error(f"Role assignment failed: {e}") 

334 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to assign role") 

335 

336 

337@router.get("/users/{user_email}/roles", response_model=List[UserRoleResponse]) 

338@require_permission("admin.user_management") 

339async def get_user_roles( 

340 user_email: str, 

341 scope: Optional[str] = Query(None, description="Filter by scope"), 

342 active_only: bool = Query(True, description="Show only active assignments"), 

343 user=Depends(get_current_user_with_permissions), 

344 db: Session = Depends(get_db), 

345): 

346 """Get roles assigned to a user. 

347 

348 Args: 

349 user_email: User email address 

350 scope: Optional scope filter 

351 active_only: Whether to show only active assignments 

352 user: Current authenticated user 

353 db: Database session 

354 

355 Returns: 

356 List[UserRoleResponse]: User's role assignments 

357 

358 Raises: 

359 HTTPException: If role retrieval fails 

360 

361 Examples: 

362 >>> import asyncio 

363 >>> asyncio.iscoroutinefunction(get_user_roles) 

364 True 

365 """ 

366 try: 

367 permission_service = PermissionService(db) 

368 user_roles = await permission_service.get_user_roles(user_email=user_email, scope=scope, include_expired=not active_only) 

369 

370 result = [UserRoleResponse.model_validate(user_role) for user_role in user_roles] 

371 db.commit() 

372 db.close() 

373 return result 

374 

375 except Exception as e: 

376 logger.error(f"Failed to get user roles for {SecurityValidator.sanitize_log_message(user_email)}: {e}") 

377 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user roles") 

378 

379 

380@router.delete("/users/{user_email}/roles/{role_id}") 

381@require_permission("admin.user_management") 

382async def revoke_user_role( 

383 user_email: str, 

384 role_id: str, 

385 scope: Optional[str] = Query(None, description="Scope filter"), 

386 scope_id: Optional[str] = Query(None, description="Scope ID filter"), 

387 user=Depends(get_current_user_with_permissions), 

388 db: Session = Depends(get_db), 

389): 

390 """Revoke a role from a user. 

391 

392 Args: 

393 user_email: User email address 

394 role_id: Role identifier 

395 scope: Optional scope filter 

396 scope_id: Optional scope ID filter 

397 user: Current authenticated user 

398 db: Database session 

399 

400 Returns: 

401 dict: Success message 

402 

403 Raises: 

404 HTTPException: If revocation fails 

405 

406 Examples: 

407 >>> import asyncio 

408 >>> asyncio.iscoroutinefunction(revoke_user_role) 

409 True 

410 """ 

411 try: 

412 role_service = RoleService(db) 

413 success = await role_service.revoke_role_from_user(user_email=user_email, role_id=role_id, scope=scope, scope_id=scope_id) 

414 

415 if not success: 

416 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role assignment not found") 

417 

418 logger.info(f"Role revoked: {role_id} from {SecurityValidator.sanitize_log_message(user_email)} by {SecurityValidator.sanitize_log_message(user['email'])}") 

419 db.commit() 

420 db.close() 

421 return {"message": "Role revoked successfully"} 

422 

423 except HTTPException: 

424 raise 

425 except Exception as e: 

426 logger.error(f"Role revocation failed: {e}") 

427 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to revoke role") 

428 

429 

430# ===== Permission Checking Endpoints ===== 

431 

432 

433@router.post("/permissions/check", response_model=PermissionCheckResponse) 

434@require_permission("admin.security_audit") 

435async def check_permission(check_data: PermissionCheckRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): 

436 """Check if a user has specific permission. 

437 

438 Args: 

439 check_data: Permission check request 

440 user: Current authenticated user 

441 db: Database session 

442 

443 Returns: 

444 PermissionCheckResponse: Permission check result 

445 

446 Raises: 

447 HTTPException: If permission check fails 

448 

449 Examples: 

450 >>> import asyncio 

451 >>> asyncio.iscoroutinefunction(check_permission) 

452 True 

453 """ 

454 try: 

455 permission_service = PermissionService(db) 

456 granted = await permission_service.check_permission( 

457 user_email=check_data.user_email, 

458 permission=check_data.permission, 

459 resource_type=check_data.resource_type, 

460 resource_id=check_data.resource_id, 

461 team_id=check_data.team_id, 

462 ip_address=user.get("ip_address"), 

463 user_agent=user.get("user_agent"), 

464 ) 

465 

466 db.commit() 

467 db.close() 

468 return PermissionCheckResponse(user_email=check_data.user_email, permission=check_data.permission, granted=granted, checked_at=datetime.now(tz=timezone.utc), checked_by=user["email"]) 

469 

470 except Exception as e: 

471 logger.error(f"Permission check failed: {e}") 

472 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to check permission") 

473 

474 

475@router.get("/permissions/user/{user_email}", response_model=List[str]) 

476@require_permission("admin.security_audit") 

477async def get_user_permissions(user_email: str, team_id: Optional[str] = Query(None, description="Team context"), user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): 

478 """Get all effective permissions for a user. 

479 

480 Args: 

481 user_email: User email address 

482 team_id: Optional team context 

483 user: Current authenticated user 

484 db: Database session 

485 

486 Returns: 

487 List[str]: User's effective permissions 

488 

489 Raises: 

490 HTTPException: If retrieving user permissions fails 

491 

492 Examples: 

493 >>> import asyncio 

494 >>> asyncio.iscoroutinefunction(get_user_permissions) 

495 True 

496 """ 

497 try: 

498 permission_service = PermissionService(db) 

499 permissions = await permission_service.get_user_permissions(user_email=user_email, team_id=team_id) 

500 

501 result = sorted(list(permissions)) 

502 db.commit() 

503 db.close() 

504 return result 

505 

506 except Exception as e: 

507 logger.error(f"Failed to get user permissions for {SecurityValidator.sanitize_log_message(user_email)}: {e}") 

508 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user permissions") 

509 

510 

511@router.get("/permissions/available", response_model=PermissionListResponse) 

512async def get_available_permissions(user=Depends(get_current_user_with_permissions)): 

513 """Get all available permissions in the system. 

514 

515 Args: 

516 user: Current authenticated user 

517 

518 Returns: 

519 PermissionListResponse: Available permissions organized by resource type 

520 

521 Raises: 

522 HTTPException: If retrieving available permissions fails 

523 """ 

524 try: 

525 all_permissions = Permissions.get_all_permissions() 

526 permissions_by_resource = Permissions.get_permissions_by_resource() 

527 

528 return PermissionListResponse(all_permissions=all_permissions, permissions_by_resource=permissions_by_resource, total_count=len(all_permissions)) 

529 

530 except Exception as e: 

531 logger.error(f"Failed to get available permissions: {e}") 

532 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve available permissions") 

533 

534 

535# ===== Self-Service Endpoints ===== 

536 

537 

538@router.get("/my/roles", response_model=List[UserRoleResponse]) 

539async def get_my_roles(user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): 

540 """Get current user's role assignments. 

541 

542 Args: 

543 user: Current authenticated user 

544 db: Database session 

545 

546 Returns: 

547 List[UserRoleResponse]: Current user's role assignments 

548 

549 Raises: 

550 HTTPException: If retrieving user roles fails 

551 

552 Examples: 

553 >>> import asyncio 

554 >>> asyncio.iscoroutinefunction(get_my_roles) 

555 True 

556 """ 

557 try: 

558 permission_service = PermissionService(db) 

559 user_roles = await permission_service.get_user_roles(user_email=user["email"], include_expired=False) 

560 

561 result = [UserRoleResponse.model_validate(user_role) for user_role in user_roles] 

562 db.commit() 

563 db.close() 

564 return result 

565 

566 except Exception as e: 

567 logger.error(f"Failed to get my roles for {SecurityValidator.sanitize_log_message(user['email'])}: {e}") 

568 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve your roles") 

569 

570 

571@router.get("/my/permissions", response_model=List[str]) 

572async def get_my_permissions(team_id: Optional[str] = Query(None, description="Team context"), user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): 

573 """Get current user's effective permissions. 

574 

575 Args: 

576 team_id: Optional team context 

577 user: Current authenticated user 

578 db: Database session 

579 

580 Returns: 

581 List[str]: Current user's effective permissions 

582 

583 Raises: 

584 HTTPException: If retrieving user permissions fails 

585 

586 Examples: 

587 >>> import asyncio 

588 >>> asyncio.iscoroutinefunction(get_my_permissions) 

589 True 

590 """ 

591 try: 

592 permission_service = PermissionService(db) 

593 permissions = await permission_service.get_user_permissions(user_email=user["email"], team_id=team_id, token_teams=user.get("token_teams")) 

594 

595 result = sorted(list(permissions)) 

596 db.commit() 

597 db.close() 

598 return result 

599 

600 except Exception as e: 

601 logger.error(f"Failed to get my permissions for {SecurityValidator.sanitize_log_message(user['email'])}: {e}") 

602 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve your permissions")