Coverage for mcpgateway / routers / rbac.py: 100%

212 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/routers/rbac.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7RBAC API Router. 

8 

9This module provides REST API endpoints for Role-Based Access Control (RBAC) 

10management including roles, user role assignments, and permission checking. 

11 

12Examples: 

13 >>> from mcpgateway.routers.rbac import router 

14 >>> from fastapi import APIRouter 

15 >>> isinstance(router, APIRouter) 

16 True 

17""" 

18 

19# Standard 

20from datetime import datetime, timezone 

21import logging 

22from typing import Generator, List, Optional 

23 

24# Third-Party 

25from fastapi import APIRouter, Depends, HTTPException, Query, status 

26from sqlalchemy.orm import Session 

27 

28# First-Party 

29from mcpgateway.db import Permissions, SessionLocal 

30from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_admin_permission, require_permission 

31from mcpgateway.schemas import PermissionCheckRequest, PermissionCheckResponse, PermissionListResponse, RoleCreateRequest, RoleResponse, RoleUpdateRequest, UserRoleAssignRequest, UserRoleResponse 

32from mcpgateway.services.permission_service import PermissionService 

33from mcpgateway.services.role_service import RoleService 

34 

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Router that groups every endpoint in this module under the "/rbac" prefix with the "RBAC" tag.
router = APIRouter(prefix="/rbac", tags=["RBAC"])

38 

39 

def get_db() -> Generator[Session, None, None]:
    """Provide a database session to FastAPI dependencies.

    The session is committed once the consumer finishes without raising, so
    that read-only operations end with an explicit commit instead of an
    implicit rollback. When anything raises, a rollback is attempted (with
    session invalidation as a fallback if the rollback itself fails) and the
    original exception is re-raised. The session is closed in every case.

    Yields:
        Session: SQLAlchemy database session

    Raises:
        Exception: Re-raises any exception after the rollback attempt.

    Examples:
        >>> gen = get_db()
        >>> db = next(gen)
        >>> hasattr(db, 'close')
        True
    """
    session = SessionLocal()

    def _undo_transaction() -> None:
        """Roll back the session; invalidate it if the rollback fails."""
        try:
            session.rollback()
        except Exception:
            try:
                session.invalidate()
            except Exception:
                pass  # nosec B110 - Best effort cleanup on connection failure

    try:
        yield session
        session.commit()
    except Exception:
        _undo_transaction()
        # Bare raise re-raises the exception that triggered the cleanup.
        raise
    finally:
        session.close()

73 

74 

75# ===== Role Management Endpoints ===== 

76 

77 

@router.post("/roles", response_model=RoleResponse)
@require_admin_permission()
async def create_role(role_data: RoleCreateRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Create a new role.

    Requires admin permissions to create roles.

    Args:
        role_data: Role creation data
        user: Current authenticated user
        db: Database session

    Returns:
        RoleResponse: Created role details

    Raises:
        HTTPException: 400 if a ValueError is raised while creating the role,
            500 for any other failure. The original exception is attached as
            the cause.

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(create_role)
        True
    """
    try:
        role_service = RoleService(db)
        role = await role_service.create_role(
            name=role_data.name,
            description=role_data.description,
            scope=role_data.scope,
            permissions=role_data.permissions,
            inherits_from=role_data.inherits_from,
            created_by=user["email"],
            is_system_role=role_data.is_system_role or False,
        )

        logger.info(f"Role created: {role.id} by {user['email']}")
        # Commit and release the session before the response model is built.
        db.commit()
        db.close()
        return RoleResponse.model_validate(role)

    except ValueError as e:
        logger.error(f"Role creation validation error: {e}")
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Role creation failed: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create role") from e

124 

125 

@router.get("/roles", response_model=List[RoleResponse])
@require_permission("admin.user_management")
async def list_roles(
    scope: Optional[str] = Query(None, description="Filter by scope"),
    active_only: bool = Query(True, description="Show only active roles"),
    user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
):
    """List all roles.

    Args:
        scope: Optional scope filter
        active_only: Whether to show only active roles. This value is accepted
            but is not forwarded to the role service by this handler.
        user: Current authenticated user
        db: Database session

    Returns:
        List[RoleResponse]: List of roles

    Raises:
        HTTPException: 500 if listing or serializing the roles fails. The
            original exception is attached as the cause.

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(list_roles)
        True
    """
    try:
        role_service = RoleService(db)
        # NOTE(review): only `scope` is passed on; `active_only` has no effect
        # here. Confirm against RoleService.list_roles whether it should be
        # forwarded.
        roles = await role_service.list_roles(scope=scope)
        # Release transaction before response serialization
        db.commit()
        db.close()

        return [RoleResponse.model_validate(role) for role in roles]

    except Exception as e:
        logger.error(f"Failed to list roles: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve roles") from e

165 

166 

@router.get("/roles/{role_id}", response_model=RoleResponse)
@require_permission("admin.user_management")
async def get_role(role_id: str, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Get role details by ID.

    Args:
        role_id: Role identifier
        user: Current authenticated user
        db: Database session

    Returns:
        RoleResponse: Role details

    Raises:
        HTTPException: 404 if the service returns no role for ``role_id``,
            500 for any other failure (with the original exception attached
            as the cause).

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(get_role)
        True
    """
    try:
        role_service = RoleService(db)
        role = await role_service.get_role_by_id(role_id)

        if not role:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")

        # Commit and release the session before the response model is built.
        db.commit()
        db.close()
        return RoleResponse.model_validate(role)

    except HTTPException:
        # Let the 404 above through unchanged instead of turning it into a 500.
        raise
    except Exception as e:
        logger.error(f"Failed to get role {role_id}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve role") from e

204 

205 

@router.put("/roles/{role_id}", response_model=RoleResponse)
@require_admin_permission()
async def update_role(role_id: str, role_data: RoleUpdateRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Update an existing role.

    Only the fields explicitly set on the request are forwarded to the
    service (``model_dump(exclude_unset=True)``).

    Args:
        role_id: Role identifier
        role_data: Role update data
        user: Current authenticated user
        db: Database session

    Returns:
        RoleResponse: Updated role details

    Raises:
        HTTPException: 404 if the service returns no role, 400 if a
            ValueError is raised during the update, 500 for any other
            failure. The original exception is attached as the cause of the
            400 and 500 responses.

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(update_role)
        True
    """
    try:
        role_service = RoleService(db)
        role = await role_service.update_role(role_id, **role_data.model_dump(exclude_unset=True))

        if not role:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")

        logger.info(f"Role updated: {role_id} by {user['email']}")
        # Commit and release the session before the response model is built.
        db.commit()
        db.close()
        return RoleResponse.model_validate(role)

    except HTTPException:
        # Let the 404 above through unchanged instead of turning it into a 500.
        raise
    except ValueError as e:
        logger.error(f"Role update validation error: {e}")
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Role update failed: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to update role") from e

248 

249 

@router.delete("/roles/{role_id}")
@require_admin_permission()
async def delete_role(role_id: str, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Delete a role.

    Args:
        role_id: Role identifier
        user: Current authenticated user
        db: Database session

    Returns:
        dict: Success message

    Raises:
        HTTPException: 404 if the service reports that nothing was deleted,
            500 for any other failure (with the original exception attached
            as the cause).

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(delete_role)
        True
    """
    try:
        role_service = RoleService(db)
        success = await role_service.delete_role(role_id)

        if not success:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")

        logger.info(f"Role deleted: {role_id} by {user['email']}")
        db.commit()
        db.close()
        return {"message": "Role deleted successfully"}

    except HTTPException:
        # Let the 404 above through unchanged instead of turning it into a 500.
        raise
    except Exception as e:
        logger.error(f"Role deletion failed: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to delete role") from e

288 

289 

290# ===== User Role Assignment Endpoints ===== 

291 

292 

@router.post("/users/{user_email}/roles", response_model=UserRoleResponse)
@require_permission("admin.user_management")
async def assign_role_to_user(user_email: str, assignment_data: UserRoleAssignRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Assign a role to a user.

    The authenticated caller's email is recorded as the granter.

    Args:
        user_email: User email address
        assignment_data: Role assignment data
        user: Current authenticated user
        db: Database session

    Returns:
        UserRoleResponse: Created role assignment

    Raises:
        HTTPException: 400 if a ValueError is raised during the assignment,
            500 for any other failure. The original exception is attached as
            the cause.

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(assign_role_to_user)
        True
    """
    try:
        role_service = RoleService(db)
        user_role = await role_service.assign_role_to_user(
            user_email=user_email,
            role_id=assignment_data.role_id,
            scope=assignment_data.scope,
            scope_id=assignment_data.scope_id,
            granted_by=user["email"],
            expires_at=assignment_data.expires_at,
        )

        logger.info(f"Role assigned: {assignment_data.role_id} to {user_email} by {user['email']}")
        # Commit and release the session before the response model is built.
        db.commit()
        db.close()
        return UserRoleResponse.model_validate(user_role)

    except ValueError as e:
        logger.error(f"Role assignment validation error: {e}")
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Role assignment failed: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to assign role") from e

332 

333 

@router.get("/users/{user_email}/roles", response_model=List[UserRoleResponse])
@require_permission("admin.user_management")
async def get_user_roles(
    user_email: str,
    scope: Optional[str] = Query(None, description="Filter by scope"),
    active_only: bool = Query(True, description="Show only active assignments"),
    user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
):
    """Get roles assigned to a user.

    Args:
        user_email: User email address
        scope: Optional scope filter
        active_only: Whether to show only active assignments; passed to the
            service inverted, as ``include_expired=not active_only``
        user: Current authenticated user
        db: Database session

    Returns:
        List[UserRoleResponse]: User's role assignments

    Raises:
        HTTPException: 500 if role retrieval or serialization fails. The
            original exception is attached as the cause.

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(get_user_roles)
        True
    """
    try:
        permission_service = PermissionService(db)
        user_roles = await permission_service.get_user_roles(user_email=user_email, scope=scope, include_expired=not active_only)

        # Build the response models while the session is still open, then release it.
        result = [UserRoleResponse.model_validate(user_role) for user_role in user_roles]
        db.commit()
        db.close()
        return result

    except Exception as e:
        logger.error(f"Failed to get user roles for {user_email}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user roles") from e

375 

376 

@router.delete("/users/{user_email}/roles/{role_id}")
@require_permission("admin.user_management")
async def revoke_user_role(
    user_email: str,
    role_id: str,
    scope: Optional[str] = Query(None, description="Scope filter"),
    scope_id: Optional[str] = Query(None, description="Scope ID filter"),
    user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
):
    """Revoke a role from a user.

    Args:
        user_email: User email address
        role_id: Role identifier
        scope: Optional scope filter
        scope_id: Optional scope ID filter
        user: Current authenticated user
        db: Database session

    Returns:
        dict: Success message

    Raises:
        HTTPException: 404 if the service reports that no assignment was
            revoked, 500 for any other failure (with the original exception
            attached as the cause).

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(revoke_user_role)
        True
    """
    try:
        role_service = RoleService(db)
        success = await role_service.revoke_role_from_user(user_email=user_email, role_id=role_id, scope=scope, scope_id=scope_id)

        if not success:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role assignment not found")

        logger.info(f"Role revoked: {role_id} from {user_email} by {user['email']}")
        db.commit()
        db.close()
        return {"message": "Role revoked successfully"}

    except HTTPException:
        # Let the 404 above through unchanged instead of turning it into a 500.
        raise
    except Exception as e:
        logger.error(f"Role revocation failed: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to revoke role") from e

425 

426 

427# ===== Permission Checking Endpoints ===== 

428 

429 

@router.post("/permissions/check", response_model=PermissionCheckResponse)
@require_permission("admin.security_audit")
async def check_permission(check_data: PermissionCheckRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Check if a user has specific permission.

    The check is made for the user named in ``check_data``; the caller's
    ``ip_address`` and ``user_agent`` (when present in the ``user`` context)
    are passed along to the permission service, and the caller's email is
    reported as ``checked_by``.

    Args:
        check_data: Permission check request
        user: Current authenticated user
        db: Database session

    Returns:
        PermissionCheckResponse: Permission check result, timestamped in UTC

    Raises:
        HTTPException: 500 if the permission check fails. The original
            exception is attached as the cause.

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(check_permission)
        True
    """
    try:
        permission_service = PermissionService(db)
        granted = await permission_service.check_permission(
            user_email=check_data.user_email,
            permission=check_data.permission,
            resource_type=check_data.resource_type,
            resource_id=check_data.resource_id,
            team_id=check_data.team_id,
            ip_address=user.get("ip_address"),
            user_agent=user.get("user_agent"),
        )

        db.commit()
        db.close()
        return PermissionCheckResponse(user_email=check_data.user_email, permission=check_data.permission, granted=granted, checked_at=datetime.now(tz=timezone.utc), checked_by=user["email"])

    except Exception as e:
        logger.error(f"Permission check failed: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to check permission") from e

470 

471 

@router.get("/permissions/user/{user_email}", response_model=List[str])
@require_permission("admin.security_audit")
async def get_user_permissions(user_email: str, team_id: Optional[str] = Query(None, description="Team context"), user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Get all effective permissions for a user.

    Args:
        user_email: User email address
        team_id: Optional team context
        user: Current authenticated user
        db: Database session

    Returns:
        List[str]: User's effective permissions, sorted in ascending order

    Raises:
        HTTPException: 500 if retrieving user permissions fails. The original
            exception is attached as the cause.

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(get_user_permissions)
        True
    """
    try:
        permission_service = PermissionService(db)
        permissions = await permission_service.get_user_permissions(user_email=user_email, team_id=team_id)

        # sorted() accepts any iterable and always returns a new list.
        result = sorted(permissions)
        db.commit()
        db.close()
        return result

    except Exception as e:
        logger.error(f"Failed to get user permissions for {user_email}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user permissions") from e

506 

507 

@router.get("/permissions/available", response_model=PermissionListResponse)
async def get_available_permissions(user=Depends(get_current_user_with_permissions)):
    """Get all available permissions in the system.

    Reads the permission catalogue from the ``Permissions`` class; no
    database session is used.

    Args:
        user: Current authenticated user

    Returns:
        PermissionListResponse: Available permissions organized by resource type

    Raises:
        HTTPException: 500 if retrieving available permissions fails. The
            original exception is attached as the cause.

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(get_available_permissions)
        True
    """
    try:
        all_permissions = Permissions.get_all_permissions()
        permissions_by_resource = Permissions.get_permissions_by_resource()

        return PermissionListResponse(all_permissions=all_permissions, permissions_by_resource=permissions_by_resource, total_count=len(all_permissions))

    except Exception as e:
        logger.error(f"Failed to get available permissions: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve available permissions") from e

530 

531 

532# ===== Self-Service Endpoints ===== 

533 

534 

@router.get("/my/roles", response_model=List[UserRoleResponse])
async def get_my_roles(user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Get current user's role assignments.

    Expired assignments are excluded (``include_expired=False``).

    Args:
        user: Current authenticated user
        db: Database session

    Returns:
        List[UserRoleResponse]: Current user's role assignments

    Raises:
        HTTPException: 500 if retrieving user roles fails. The original
            exception is attached as the cause.

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(get_my_roles)
        True
    """
    try:
        permission_service = PermissionService(db)
        user_roles = await permission_service.get_user_roles(user_email=user["email"], include_expired=False)

        # Build the response models while the session is still open, then release it.
        result = [UserRoleResponse.model_validate(user_role) for user_role in user_roles]
        db.commit()
        db.close()
        return result

    except Exception as e:
        logger.error(f"Failed to get my roles for {user['email']}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve your roles") from e

566 

567 

@router.get("/my/permissions", response_model=List[str])
async def get_my_permissions(team_id: Optional[str] = Query(None, description="Team context"), user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Get current user's effective permissions.

    Args:
        team_id: Optional team context
        user: Current authenticated user
        db: Database session

    Returns:
        List[str]: Current user's effective permissions, sorted in ascending order

    Raises:
        HTTPException: 500 if retrieving user permissions fails. The original
            exception is attached as the cause.

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(get_my_permissions)
        True
    """
    try:
        permission_service = PermissionService(db)
        permissions = await permission_service.get_user_permissions(user_email=user["email"], team_id=team_id)

        # sorted() accepts any iterable and always returns a new list.
        result = sorted(permissions)
        db.commit()
        db.close()
        return result

    except Exception as e:
        logger.error(f"Failed to get my permissions for {user['email']}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve your permissions") from e

598 logger.error(f"Failed to get my permissions for {user['email']}: {e}") 

599 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve your permissions")