Coverage for mcpgateway / routers / rbac.py: 100%

214 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/routers/rbac.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7RBAC API Router. 

8 

9This module provides REST API endpoints for Role-Based Access Control (RBAC) 

10management including roles, user role assignments, and permission checking. 

11 

12Examples: 

13 >>> from mcpgateway.routers.rbac import router 

14 >>> from fastapi import APIRouter 

15 >>> isinstance(router, APIRouter) 

16 True 

17""" 

18 

19# Standard 

20from datetime import datetime, timezone 

21import logging 

22from typing import Generator, List, Optional 

23 

24# Third-Party 

25from fastapi import APIRouter, Depends, HTTPException, Query, status 

26from sqlalchemy.orm import Session 

27 

28# First-Party 

29from mcpgateway.db import Permissions, SessionLocal 

30from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_admin_permission, require_permission 

31from mcpgateway.schemas import PermissionCheckRequest, PermissionCheckResponse, PermissionListResponse, RoleCreateRequest, RoleResponse, RoleUpdateRequest, UserRoleAssignRequest, UserRoleResponse 

32from mcpgateway.services.permission_service import PermissionService 

33from mcpgateway.services.role_service import RoleService 

34 

35logger = logging.getLogger(__name__) 

36 

37router = APIRouter(prefix="/rbac", tags=["RBAC"]) 

38 

39 

40def get_db() -> Generator[Session, None, None]: 

41 """Get database session for dependency injection. 

42 

43 Commits the transaction on successful completion to avoid implicit rollbacks 

44 for read-only operations. Rolls back explicitly on exception. 

45 

46 Yields: 

47 Session: SQLAlchemy database session 

48 

49 Raises: 

50 Exception: Re-raises any exception after rolling back the transaction. 

51 

52 Examples: 

53 >>> gen = get_db() 

54 >>> db = next(gen) 

55 >>> hasattr(db, 'close') 

56 True 

57 """ 

58 db = SessionLocal() 

59 try: 

60 yield db 

61 db.commit() 

62 except Exception: 

63 try: 

64 db.rollback() 

65 except Exception: 

66 try: 

67 db.invalidate() 

68 except Exception: 

69 pass # nosec B110 - Best effort cleanup on connection failure 

70 raise 

71 finally: 

72 db.close() 

73 

74 

75# ===== Role Management Endpoints ===== 

76 

77 

78@router.post("/roles", response_model=RoleResponse) 

79@require_admin_permission() 

80async def create_role(role_data: RoleCreateRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): 

81 """Create a new role. 

82 

83 Requires admin permissions to create roles. 

84 

85 Args: 

86 role_data: Role creation data 

87 user: Current authenticated user 

88 db: Database session 

89 

90 Returns: 

91 RoleResponse: Created role details 

92 

93 Raises: 

94 HTTPException: If role creation fails 

95 

96 Examples: 

97 >>> import asyncio 

98 >>> asyncio.iscoroutinefunction(create_role) 

99 True 

100 """ 

101 try: 

102 role_service = RoleService(db) 

103 role = await role_service.create_role( 

104 name=role_data.name, 

105 description=role_data.description, 

106 scope=role_data.scope, 

107 permissions=role_data.permissions, 

108 inherits_from=role_data.inherits_from, 

109 created_by=user["email"], 

110 is_system_role=role_data.is_system_role or False, 

111 ) 

112 

113 logger.info(f"Role created: {role.id} by {user['email']}") 

114 db.commit() 

115 db.close() 

116 return RoleResponse.model_validate(role) 

117 

118 except ValueError as e: 

119 logger.error(f"Role creation validation error: {e}") 

120 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) 

121 except Exception as e: 

122 logger.error(f"Role creation failed: {e}") 

123 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create role") 

124 

125 

126@router.get("/roles", response_model=List[RoleResponse]) 

127@require_permission("admin.user_management") 

128async def list_roles( 

129 scope: Optional[str] = Query(None, description="Filter by scope"), 

130 active_only: bool = Query(True, description="Show only active roles"), 

131 user=Depends(get_current_user_with_permissions), 

132 db: Session = Depends(get_db), 

133): 

134 """List all roles. 

135 

136 Args: 

137 scope: Optional scope filter 

138 active_only: Whether to show only active roles 

139 user: Current authenticated user 

140 db: Database session 

141 

142 Returns: 

143 List[RoleResponse]: List of roles 

144 

145 Raises: 

146 HTTPException: If user lacks required permissions 

147 

148 Examples: 

149 >>> import asyncio 

150 >>> asyncio.iscoroutinefunction(list_roles) 

151 True 

152 """ 

153 try: 

154 role_service = RoleService(db) 

155 roles = await role_service.list_roles(scope=scope) 

156 # Release transaction before response serialization 

157 db.commit() 

158 db.close() 

159 

160 return [RoleResponse.model_validate(role) for role in roles] 

161 

162 except Exception as e: 

163 logger.error(f"Failed to list roles: {e}") 

164 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve roles") 

165 

166 

167@router.get("/roles/{role_id}", response_model=RoleResponse) 

168@require_permission("admin.user_management") 

169async def get_role(role_id: str, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): 

170 """Get role details by ID. 

171 

172 Args: 

173 role_id: Role identifier 

174 user: Current authenticated user 

175 db: Database session 

176 

177 Returns: 

178 RoleResponse: Role details 

179 

180 Raises: 

181 HTTPException: If role not found 

182 

183 Examples: 

184 >>> import asyncio 

185 >>> asyncio.iscoroutinefunction(get_role) 

186 True 

187 """ 

188 try: 

189 role_service = RoleService(db) 

190 role = await role_service.get_role_by_id(role_id) 

191 

192 if not role: 

193 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found") 

194 

195 db.commit() 

196 db.close() 

197 return RoleResponse.model_validate(role) 

198 

199 except HTTPException: 

200 raise 

201 except Exception as e: 

202 logger.error(f"Failed to get role {role_id}: {e}") 

203 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve role") 

204 

205 

206@router.put("/roles/{role_id}", response_model=RoleResponse) 

207@require_admin_permission() 

208async def update_role(role_id: str, role_data: RoleUpdateRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): 

209 """Update an existing role. 

210 

211 Args: 

212 role_id: Role identifier 

213 role_data: Role update data 

214 user: Current authenticated user 

215 db: Database session 

216 

217 Returns: 

218 RoleResponse: Updated role details 

219 

220 Raises: 

221 HTTPException: If role not found or update fails 

222 

223 Examples: 

224 >>> import asyncio 

225 >>> asyncio.iscoroutinefunction(update_role) 

226 True 

227 """ 

228 try: 

229 role_service = RoleService(db) 

230 role = await role_service.update_role(role_id, **role_data.model_dump(exclude_unset=True)) 

231 

232 if not role: 

233 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found") 

234 

235 logger.info(f"Role updated: {role_id} by {user['email']}") 

236 db.commit() 

237 db.close() 

238 return RoleResponse.model_validate(role) 

239 

240 except HTTPException: 

241 raise 

242 except ValueError as e: 

243 logger.error(f"Role update validation error: {e}") 

244 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) 

245 except Exception as e: 

246 logger.error(f"Role update failed: {e}") 

247 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to update role") 

248 

249 

250@router.delete("/roles/{role_id}") 

251@require_admin_permission() 

252async def delete_role(role_id: str, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): 

253 """Delete a role. 

254 

255 Args: 

256 role_id: Role identifier 

257 user: Current authenticated user 

258 db: Database session 

259 

260 Returns: 

261 dict: Success message 

262 

263 Raises: 

264 HTTPException: If role not found or deletion fails 

265 

266 Examples: 

267 >>> import asyncio 

268 >>> asyncio.iscoroutinefunction(delete_role) 

269 True 

270 """ 

271 try: 

272 role_service = RoleService(db) 

273 success = await role_service.delete_role(role_id) 

274 

275 if not success: 

276 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found") 

277 

278 logger.info(f"Role deleted: {role_id} by {user['email']}") 

279 db.commit() 

280 db.close() 

281 return {"message": "Role deleted successfully"} 

282 

283 except HTTPException: 

284 raise 

285 except ValueError as e: 

286 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) 

287 except Exception as e: 

288 logger.error(f"Role deletion failed: {e}") 

289 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to delete role") 

290 

291 

292# ===== User Role Assignment Endpoints ===== 

293 

294 

295@router.post("/users/{user_email}/roles", response_model=UserRoleResponse) 

296@require_permission("admin.user_management") 

297async def assign_role_to_user(user_email: str, assignment_data: UserRoleAssignRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): 

298 """Assign a role to a user. 

299 

300 Args: 

301 user_email: User email address 

302 assignment_data: Role assignment data 

303 user: Current authenticated user 

304 db: Database session 

305 

306 Returns: 

307 UserRoleResponse: Created role assignment 

308 

309 Raises: 

310 HTTPException: If assignment fails 

311 

312 Examples: 

313 >>> import asyncio 

314 >>> asyncio.iscoroutinefunction(assign_role_to_user) 

315 True 

316 """ 

317 try: 

318 role_service = RoleService(db) 

319 user_role = await role_service.assign_role_to_user( 

320 user_email=user_email, role_id=assignment_data.role_id, scope=assignment_data.scope, scope_id=assignment_data.scope_id, granted_by=user["email"], expires_at=assignment_data.expires_at 

321 ) 

322 

323 logger.info(f"Role assigned: {assignment_data.role_id} to {user_email} by {user['email']}") 

324 db.commit() 

325 db.close() 

326 return UserRoleResponse.model_validate(user_role) 

327 

328 except ValueError as e: 

329 logger.error(f"Role assignment validation error: {e}") 

330 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) 

331 except Exception as e: 

332 logger.error(f"Role assignment failed: {e}") 

333 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to assign role") 

334 

335 

336@router.get("/users/{user_email}/roles", response_model=List[UserRoleResponse]) 

337@require_permission("admin.user_management") 

338async def get_user_roles( 

339 user_email: str, 

340 scope: Optional[str] = Query(None, description="Filter by scope"), 

341 active_only: bool = Query(True, description="Show only active assignments"), 

342 user=Depends(get_current_user_with_permissions), 

343 db: Session = Depends(get_db), 

344): 

345 """Get roles assigned to a user. 

346 

347 Args: 

348 user_email: User email address 

349 scope: Optional scope filter 

350 active_only: Whether to show only active assignments 

351 user: Current authenticated user 

352 db: Database session 

353 

354 Returns: 

355 List[UserRoleResponse]: User's role assignments 

356 

357 Raises: 

358 HTTPException: If role retrieval fails 

359 

360 Examples: 

361 >>> import asyncio 

362 >>> asyncio.iscoroutinefunction(get_user_roles) 

363 True 

364 """ 

365 try: 

366 permission_service = PermissionService(db) 

367 user_roles = await permission_service.get_user_roles(user_email=user_email, scope=scope, include_expired=not active_only) 

368 

369 result = [UserRoleResponse.model_validate(user_role) for user_role in user_roles] 

370 db.commit() 

371 db.close() 

372 return result 

373 

374 except Exception as e: 

375 logger.error(f"Failed to get user roles for {user_email}: {e}") 

376 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user roles") 

377 

378 

379@router.delete("/users/{user_email}/roles/{role_id}") 

380@require_permission("admin.user_management") 

381async def revoke_user_role( 

382 user_email: str, 

383 role_id: str, 

384 scope: Optional[str] = Query(None, description="Scope filter"), 

385 scope_id: Optional[str] = Query(None, description="Scope ID filter"), 

386 user=Depends(get_current_user_with_permissions), 

387 db: Session = Depends(get_db), 

388): 

389 """Revoke a role from a user. 

390 

391 Args: 

392 user_email: User email address 

393 role_id: Role identifier 

394 scope: Optional scope filter 

395 scope_id: Optional scope ID filter 

396 user: Current authenticated user 

397 db: Database session 

398 

399 Returns: 

400 dict: Success message 

401 

402 Raises: 

403 HTTPException: If revocation fails 

404 

405 Examples: 

406 >>> import asyncio 

407 >>> asyncio.iscoroutinefunction(revoke_user_role) 

408 True 

409 """ 

410 try: 

411 role_service = RoleService(db) 

412 success = await role_service.revoke_role_from_user(user_email=user_email, role_id=role_id, scope=scope, scope_id=scope_id) 

413 

414 if not success: 

415 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role assignment not found") 

416 

417 logger.info(f"Role revoked: {role_id} from {user_email} by {user['email']}") 

418 db.commit() 

419 db.close() 

420 return {"message": "Role revoked successfully"} 

421 

422 except HTTPException: 

423 raise 

424 except Exception as e: 

425 logger.error(f"Role revocation failed: {e}") 

426 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to revoke role") 

427 

428 

429# ===== Permission Checking Endpoints ===== 

430 

431 

432@router.post("/permissions/check", response_model=PermissionCheckResponse) 

433@require_permission("admin.security_audit") 

434async def check_permission(check_data: PermissionCheckRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): 

435 """Check if a user has specific permission. 

436 

437 Args: 

438 check_data: Permission check request 

439 user: Current authenticated user 

440 db: Database session 

441 

442 Returns: 

443 PermissionCheckResponse: Permission check result 

444 

445 Raises: 

446 HTTPException: If permission check fails 

447 

448 Examples: 

449 >>> import asyncio 

450 >>> asyncio.iscoroutinefunction(check_permission) 

451 True 

452 """ 

453 try: 

454 permission_service = PermissionService(db) 

455 granted = await permission_service.check_permission( 

456 user_email=check_data.user_email, 

457 permission=check_data.permission, 

458 resource_type=check_data.resource_type, 

459 resource_id=check_data.resource_id, 

460 team_id=check_data.team_id, 

461 ip_address=user.get("ip_address"), 

462 user_agent=user.get("user_agent"), 

463 ) 

464 

465 db.commit() 

466 db.close() 

467 return PermissionCheckResponse(user_email=check_data.user_email, permission=check_data.permission, granted=granted, checked_at=datetime.now(tz=timezone.utc), checked_by=user["email"]) 

468 

469 except Exception as e: 

470 logger.error(f"Permission check failed: {e}") 

471 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to check permission") 

472 

473 

474@router.get("/permissions/user/{user_email}", response_model=List[str]) 

475@require_permission("admin.security_audit") 

476async def get_user_permissions(user_email: str, team_id: Optional[str] = Query(None, description="Team context"), user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): 

477 """Get all effective permissions for a user. 

478 

479 Args: 

480 user_email: User email address 

481 team_id: Optional team context 

482 user: Current authenticated user 

483 db: Database session 

484 

485 Returns: 

486 List[str]: User's effective permissions 

487 

488 Raises: 

489 HTTPException: If retrieving user permissions fails 

490 

491 Examples: 

492 >>> import asyncio 

493 >>> asyncio.iscoroutinefunction(get_user_permissions) 

494 True 

495 """ 

496 try: 

497 permission_service = PermissionService(db) 

498 permissions = await permission_service.get_user_permissions(user_email=user_email, team_id=team_id) 

499 

500 result = sorted(list(permissions)) 

501 db.commit() 

502 db.close() 

503 return result 

504 

505 except Exception as e: 

506 logger.error(f"Failed to get user permissions for {user_email}: {e}") 

507 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user permissions") 

508 

509 

510@router.get("/permissions/available", response_model=PermissionListResponse) 

511async def get_available_permissions(user=Depends(get_current_user_with_permissions)): 

512 """Get all available permissions in the system. 

513 

514 Args: 

515 user: Current authenticated user 

516 

517 Returns: 

518 PermissionListResponse: Available permissions organized by resource type 

519 

520 Raises: 

521 HTTPException: If retrieving available permissions fails 

522 """ 

523 try: 

524 all_permissions = Permissions.get_all_permissions() 

525 permissions_by_resource = Permissions.get_permissions_by_resource() 

526 

527 return PermissionListResponse(all_permissions=all_permissions, permissions_by_resource=permissions_by_resource, total_count=len(all_permissions)) 

528 

529 except Exception as e: 

530 logger.error(f"Failed to get available permissions: {e}") 

531 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve available permissions") 

532 

533 

534# ===== Self-Service Endpoints ===== 

535 

536 

537@router.get("/my/roles", response_model=List[UserRoleResponse]) 

538async def get_my_roles(user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): 

539 """Get current user's role assignments. 

540 

541 Args: 

542 user: Current authenticated user 

543 db: Database session 

544 

545 Returns: 

546 List[UserRoleResponse]: Current user's role assignments 

547 

548 Raises: 

549 HTTPException: If retrieving user roles fails 

550 

551 Examples: 

552 >>> import asyncio 

553 >>> asyncio.iscoroutinefunction(get_my_roles) 

554 True 

555 """ 

556 try: 

557 permission_service = PermissionService(db) 

558 user_roles = await permission_service.get_user_roles(user_email=user["email"], include_expired=False) 

559 

560 result = [UserRoleResponse.model_validate(user_role) for user_role in user_roles] 

561 db.commit() 

562 db.close() 

563 return result 

564 

565 except Exception as e: 

566 logger.error(f"Failed to get my roles for {user['email']}: {e}") 

567 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve your roles") 

568 

569 

570@router.get("/my/permissions", response_model=List[str]) 

571async def get_my_permissions(team_id: Optional[str] = Query(None, description="Team context"), user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): 

572 """Get current user's effective permissions. 

573 

574 Args: 

575 team_id: Optional team context 

576 user: Current authenticated user 

577 db: Database session 

578 

579 Returns: 

580 List[str]: Current user's effective permissions 

581 

582 Raises: 

583 HTTPException: If retrieving user permissions fails 

584 

585 Examples: 

586 >>> import asyncio 

587 >>> asyncio.iscoroutinefunction(get_my_permissions) 

588 True 

589 """ 

590 try: 

591 permission_service = PermissionService(db) 

592 permissions = await permission_service.get_user_permissions(user_email=user["email"], team_id=team_id) 

593 

594 result = sorted(list(permissions)) 

595 db.commit() 

596 db.close() 

597 return result 

598 

599 except Exception as e: 

600 logger.error(f"Failed to get my permissions for {user['email']}: {e}") 

601 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve your permissions")