Coverage for mcpgateway / routers / rbac.py: 100%
214 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/routers/rbac.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7RBAC API Router.
9This module provides REST API endpoints for Role-Based Access Control (RBAC)
10management including roles, user role assignments, and permission checking.
12Examples:
13 >>> from mcpgateway.routers.rbac import router
14 >>> from fastapi import APIRouter
15 >>> isinstance(router, APIRouter)
16 True
17"""
19# Standard
20from datetime import datetime, timezone
21import logging
22from typing import Generator, List, Optional
24# Third-Party
25from fastapi import APIRouter, Depends, HTTPException, Query, status
26from sqlalchemy.orm import Session
28# First-Party
29from mcpgateway.db import Permissions, SessionLocal
30from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_admin_permission, require_permission
31from mcpgateway.schemas import PermissionCheckRequest, PermissionCheckResponse, PermissionListResponse, RoleCreateRequest, RoleResponse, RoleUpdateRequest, UserRoleAssignRequest, UserRoleResponse
32from mcpgateway.services.permission_service import PermissionService
33from mcpgateway.services.role_service import RoleService
35logger = logging.getLogger(__name__)
37router = APIRouter(prefix="/rbac", tags=["RBAC"])
40def get_db() -> Generator[Session, None, None]:
41 """Get database session for dependency injection.
43 Commits the transaction on successful completion to avoid implicit rollbacks
44 for read-only operations. Rolls back explicitly on exception.
46 Yields:
47 Session: SQLAlchemy database session
49 Raises:
50 Exception: Re-raises any exception after rolling back the transaction.
52 Examples:
53 >>> gen = get_db()
54 >>> db = next(gen)
55 >>> hasattr(db, 'close')
56 True
57 """
58 db = SessionLocal()
59 try:
60 yield db
61 db.commit()
62 except Exception:
63 try:
64 db.rollback()
65 except Exception:
66 try:
67 db.invalidate()
68 except Exception:
69 pass # nosec B110 - Best effort cleanup on connection failure
70 raise
71 finally:
72 db.close()
75# ===== Role Management Endpoints =====
78@router.post("/roles", response_model=RoleResponse)
79@require_admin_permission()
80async def create_role(role_data: RoleCreateRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
81 """Create a new role.
83 Requires admin permissions to create roles.
85 Args:
86 role_data: Role creation data
87 user: Current authenticated user
88 db: Database session
90 Returns:
91 RoleResponse: Created role details
93 Raises:
94 HTTPException: If role creation fails
96 Examples:
97 >>> import asyncio
98 >>> asyncio.iscoroutinefunction(create_role)
99 True
100 """
101 try:
102 role_service = RoleService(db)
103 role = await role_service.create_role(
104 name=role_data.name,
105 description=role_data.description,
106 scope=role_data.scope,
107 permissions=role_data.permissions,
108 inherits_from=role_data.inherits_from,
109 created_by=user["email"],
110 is_system_role=role_data.is_system_role or False,
111 )
113 logger.info(f"Role created: {role.id} by {user['email']}")
114 db.commit()
115 db.close()
116 return RoleResponse.model_validate(role)
118 except ValueError as e:
119 logger.error(f"Role creation validation error: {e}")
120 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
121 except Exception as e:
122 logger.error(f"Role creation failed: {e}")
123 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create role")
126@router.get("/roles", response_model=List[RoleResponse])
127@require_permission("admin.user_management")
128async def list_roles(
129 scope: Optional[str] = Query(None, description="Filter by scope"),
130 active_only: bool = Query(True, description="Show only active roles"),
131 user=Depends(get_current_user_with_permissions),
132 db: Session = Depends(get_db),
133):
134 """List all roles.
136 Args:
137 scope: Optional scope filter
138 active_only: Whether to show only active roles
139 user: Current authenticated user
140 db: Database session
142 Returns:
143 List[RoleResponse]: List of roles
145 Raises:
146 HTTPException: If user lacks required permissions
148 Examples:
149 >>> import asyncio
150 >>> asyncio.iscoroutinefunction(list_roles)
151 True
152 """
153 try:
154 role_service = RoleService(db)
155 roles = await role_service.list_roles(scope=scope)
156 # Release transaction before response serialization
157 db.commit()
158 db.close()
160 return [RoleResponse.model_validate(role) for role in roles]
162 except Exception as e:
163 logger.error(f"Failed to list roles: {e}")
164 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve roles")
167@router.get("/roles/{role_id}", response_model=RoleResponse)
168@require_permission("admin.user_management")
169async def get_role(role_id: str, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
170 """Get role details by ID.
172 Args:
173 role_id: Role identifier
174 user: Current authenticated user
175 db: Database session
177 Returns:
178 RoleResponse: Role details
180 Raises:
181 HTTPException: If role not found
183 Examples:
184 >>> import asyncio
185 >>> asyncio.iscoroutinefunction(get_role)
186 True
187 """
188 try:
189 role_service = RoleService(db)
190 role = await role_service.get_role_by_id(role_id)
192 if not role:
193 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
195 db.commit()
196 db.close()
197 return RoleResponse.model_validate(role)
199 except HTTPException:
200 raise
201 except Exception as e:
202 logger.error(f"Failed to get role {role_id}: {e}")
203 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve role")
206@router.put("/roles/{role_id}", response_model=RoleResponse)
207@require_admin_permission()
208async def update_role(role_id: str, role_data: RoleUpdateRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
209 """Update an existing role.
211 Args:
212 role_id: Role identifier
213 role_data: Role update data
214 user: Current authenticated user
215 db: Database session
217 Returns:
218 RoleResponse: Updated role details
220 Raises:
221 HTTPException: If role not found or update fails
223 Examples:
224 >>> import asyncio
225 >>> asyncio.iscoroutinefunction(update_role)
226 True
227 """
228 try:
229 role_service = RoleService(db)
230 role = await role_service.update_role(role_id, **role_data.model_dump(exclude_unset=True))
232 if not role:
233 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
235 logger.info(f"Role updated: {role_id} by {user['email']}")
236 db.commit()
237 db.close()
238 return RoleResponse.model_validate(role)
240 except HTTPException:
241 raise
242 except ValueError as e:
243 logger.error(f"Role update validation error: {e}")
244 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
245 except Exception as e:
246 logger.error(f"Role update failed: {e}")
247 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to update role")
250@router.delete("/roles/{role_id}")
251@require_admin_permission()
252async def delete_role(role_id: str, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
253 """Delete a role.
255 Args:
256 role_id: Role identifier
257 user: Current authenticated user
258 db: Database session
260 Returns:
261 dict: Success message
263 Raises:
264 HTTPException: If role not found or deletion fails
266 Examples:
267 >>> import asyncio
268 >>> asyncio.iscoroutinefunction(delete_role)
269 True
270 """
271 try:
272 role_service = RoleService(db)
273 success = await role_service.delete_role(role_id)
275 if not success:
276 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
278 logger.info(f"Role deleted: {role_id} by {user['email']}")
279 db.commit()
280 db.close()
281 return {"message": "Role deleted successfully"}
283 except HTTPException:
284 raise
285 except ValueError as e:
286 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
287 except Exception as e:
288 logger.error(f"Role deletion failed: {e}")
289 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to delete role")
292# ===== User Role Assignment Endpoints =====
295@router.post("/users/{user_email}/roles", response_model=UserRoleResponse)
296@require_permission("admin.user_management")
297async def assign_role_to_user(user_email: str, assignment_data: UserRoleAssignRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
298 """Assign a role to a user.
300 Args:
301 user_email: User email address
302 assignment_data: Role assignment data
303 user: Current authenticated user
304 db: Database session
306 Returns:
307 UserRoleResponse: Created role assignment
309 Raises:
310 HTTPException: If assignment fails
312 Examples:
313 >>> import asyncio
314 >>> asyncio.iscoroutinefunction(assign_role_to_user)
315 True
316 """
317 try:
318 role_service = RoleService(db)
319 user_role = await role_service.assign_role_to_user(
320 user_email=user_email, role_id=assignment_data.role_id, scope=assignment_data.scope, scope_id=assignment_data.scope_id, granted_by=user["email"], expires_at=assignment_data.expires_at
321 )
323 logger.info(f"Role assigned: {assignment_data.role_id} to {user_email} by {user['email']}")
324 db.commit()
325 db.close()
326 return UserRoleResponse.model_validate(user_role)
328 except ValueError as e:
329 logger.error(f"Role assignment validation error: {e}")
330 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
331 except Exception as e:
332 logger.error(f"Role assignment failed: {e}")
333 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to assign role")
336@router.get("/users/{user_email}/roles", response_model=List[UserRoleResponse])
337@require_permission("admin.user_management")
338async def get_user_roles(
339 user_email: str,
340 scope: Optional[str] = Query(None, description="Filter by scope"),
341 active_only: bool = Query(True, description="Show only active assignments"),
342 user=Depends(get_current_user_with_permissions),
343 db: Session = Depends(get_db),
344):
345 """Get roles assigned to a user.
347 Args:
348 user_email: User email address
349 scope: Optional scope filter
350 active_only: Whether to show only active assignments
351 user: Current authenticated user
352 db: Database session
354 Returns:
355 List[UserRoleResponse]: User's role assignments
357 Raises:
358 HTTPException: If role retrieval fails
360 Examples:
361 >>> import asyncio
362 >>> asyncio.iscoroutinefunction(get_user_roles)
363 True
364 """
365 try:
366 permission_service = PermissionService(db)
367 user_roles = await permission_service.get_user_roles(user_email=user_email, scope=scope, include_expired=not active_only)
369 result = [UserRoleResponse.model_validate(user_role) for user_role in user_roles]
370 db.commit()
371 db.close()
372 return result
374 except Exception as e:
375 logger.error(f"Failed to get user roles for {user_email}: {e}")
376 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user roles")
379@router.delete("/users/{user_email}/roles/{role_id}")
380@require_permission("admin.user_management")
381async def revoke_user_role(
382 user_email: str,
383 role_id: str,
384 scope: Optional[str] = Query(None, description="Scope filter"),
385 scope_id: Optional[str] = Query(None, description="Scope ID filter"),
386 user=Depends(get_current_user_with_permissions),
387 db: Session = Depends(get_db),
388):
389 """Revoke a role from a user.
391 Args:
392 user_email: User email address
393 role_id: Role identifier
394 scope: Optional scope filter
395 scope_id: Optional scope ID filter
396 user: Current authenticated user
397 db: Database session
399 Returns:
400 dict: Success message
402 Raises:
403 HTTPException: If revocation fails
405 Examples:
406 >>> import asyncio
407 >>> asyncio.iscoroutinefunction(revoke_user_role)
408 True
409 """
410 try:
411 role_service = RoleService(db)
412 success = await role_service.revoke_role_from_user(user_email=user_email, role_id=role_id, scope=scope, scope_id=scope_id)
414 if not success:
415 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role assignment not found")
417 logger.info(f"Role revoked: {role_id} from {user_email} by {user['email']}")
418 db.commit()
419 db.close()
420 return {"message": "Role revoked successfully"}
422 except HTTPException:
423 raise
424 except Exception as e:
425 logger.error(f"Role revocation failed: {e}")
426 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to revoke role")
429# ===== Permission Checking Endpoints =====
432@router.post("/permissions/check", response_model=PermissionCheckResponse)
433@require_permission("admin.security_audit")
434async def check_permission(check_data: PermissionCheckRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
435 """Check if a user has specific permission.
437 Args:
438 check_data: Permission check request
439 user: Current authenticated user
440 db: Database session
442 Returns:
443 PermissionCheckResponse: Permission check result
445 Raises:
446 HTTPException: If permission check fails
448 Examples:
449 >>> import asyncio
450 >>> asyncio.iscoroutinefunction(check_permission)
451 True
452 """
453 try:
454 permission_service = PermissionService(db)
455 granted = await permission_service.check_permission(
456 user_email=check_data.user_email,
457 permission=check_data.permission,
458 resource_type=check_data.resource_type,
459 resource_id=check_data.resource_id,
460 team_id=check_data.team_id,
461 ip_address=user.get("ip_address"),
462 user_agent=user.get("user_agent"),
463 )
465 db.commit()
466 db.close()
467 return PermissionCheckResponse(user_email=check_data.user_email, permission=check_data.permission, granted=granted, checked_at=datetime.now(tz=timezone.utc), checked_by=user["email"])
469 except Exception as e:
470 logger.error(f"Permission check failed: {e}")
471 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to check permission")
474@router.get("/permissions/user/{user_email}", response_model=List[str])
475@require_permission("admin.security_audit")
476async def get_user_permissions(user_email: str, team_id: Optional[str] = Query(None, description="Team context"), user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
477 """Get all effective permissions for a user.
479 Args:
480 user_email: User email address
481 team_id: Optional team context
482 user: Current authenticated user
483 db: Database session
485 Returns:
486 List[str]: User's effective permissions
488 Raises:
489 HTTPException: If retrieving user permissions fails
491 Examples:
492 >>> import asyncio
493 >>> asyncio.iscoroutinefunction(get_user_permissions)
494 True
495 """
496 try:
497 permission_service = PermissionService(db)
498 permissions = await permission_service.get_user_permissions(user_email=user_email, team_id=team_id)
500 result = sorted(list(permissions))
501 db.commit()
502 db.close()
503 return result
505 except Exception as e:
506 logger.error(f"Failed to get user permissions for {user_email}: {e}")
507 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user permissions")
510@router.get("/permissions/available", response_model=PermissionListResponse)
511async def get_available_permissions(user=Depends(get_current_user_with_permissions)):
512 """Get all available permissions in the system.
514 Args:
515 user: Current authenticated user
517 Returns:
518 PermissionListResponse: Available permissions organized by resource type
520 Raises:
521 HTTPException: If retrieving available permissions fails
522 """
523 try:
524 all_permissions = Permissions.get_all_permissions()
525 permissions_by_resource = Permissions.get_permissions_by_resource()
527 return PermissionListResponse(all_permissions=all_permissions, permissions_by_resource=permissions_by_resource, total_count=len(all_permissions))
529 except Exception as e:
530 logger.error(f"Failed to get available permissions: {e}")
531 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve available permissions")
534# ===== Self-Service Endpoints =====
537@router.get("/my/roles", response_model=List[UserRoleResponse])
538async def get_my_roles(user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
539 """Get current user's role assignments.
541 Args:
542 user: Current authenticated user
543 db: Database session
545 Returns:
546 List[UserRoleResponse]: Current user's role assignments
548 Raises:
549 HTTPException: If retrieving user roles fails
551 Examples:
552 >>> import asyncio
553 >>> asyncio.iscoroutinefunction(get_my_roles)
554 True
555 """
556 try:
557 permission_service = PermissionService(db)
558 user_roles = await permission_service.get_user_roles(user_email=user["email"], include_expired=False)
560 result = [UserRoleResponse.model_validate(user_role) for user_role in user_roles]
561 db.commit()
562 db.close()
563 return result
565 except Exception as e:
566 logger.error(f"Failed to get my roles for {user['email']}: {e}")
567 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve your roles")
570@router.get("/my/permissions", response_model=List[str])
571async def get_my_permissions(team_id: Optional[str] = Query(None, description="Team context"), user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
572 """Get current user's effective permissions.
574 Args:
575 team_id: Optional team context
576 user: Current authenticated user
577 db: Database session
579 Returns:
580 List[str]: Current user's effective permissions
582 Raises:
583 HTTPException: If retrieving user permissions fails
585 Examples:
586 >>> import asyncio
587 >>> asyncio.iscoroutinefunction(get_my_permissions)
588 True
589 """
590 try:
591 permission_service = PermissionService(db)
592 permissions = await permission_service.get_user_permissions(user_email=user["email"], team_id=team_id)
594 result = sorted(list(permissions))
595 db.commit()
596 db.close()
597 return result
599 except Exception as e:
600 logger.error(f"Failed to get my permissions for {user['email']}: {e}")
601 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve your permissions")