Coverage for mcpgateway / routers / rbac.py: 100%
215 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/routers/rbac.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7RBAC API Router.
9This module provides REST API endpoints for Role-Based Access Control (RBAC)
10management including roles, user role assignments, and permission checking.
12Examples:
13 >>> from mcpgateway.routers.rbac import router
14 >>> from fastapi import APIRouter
15 >>> isinstance(router, APIRouter)
16 True
17"""
19# Standard
20from datetime import datetime, timezone
21import logging
22from typing import Generator, List, Optional
24# Third-Party
25from fastapi import APIRouter, Depends, HTTPException, Query, status
26from sqlalchemy.orm import Session
28# First-Party
29from mcpgateway.common.validators import SecurityValidator
30from mcpgateway.db import Permissions, SessionLocal
31from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_admin_permission, require_permission
32from mcpgateway.schemas import PermissionCheckRequest, PermissionCheckResponse, PermissionListResponse, RoleCreateRequest, RoleResponse, RoleUpdateRequest, UserRoleAssignRequest, UserRoleResponse
33from mcpgateway.services.permission_service import PermissionService
34from mcpgateway.services.role_service import RoleService
36logger = logging.getLogger(__name__)
38router = APIRouter(prefix="/rbac", tags=["RBAC"])
41def get_db() -> Generator[Session, None, None]:
42 """Get database session for dependency injection.
44 Commits the transaction on successful completion to avoid implicit rollbacks
45 for read-only operations. Rolls back explicitly on exception.
47 Yields:
48 Session: SQLAlchemy database session
50 Raises:
51 Exception: Re-raises any exception after rolling back the transaction.
53 Examples:
54 >>> gen = get_db()
55 >>> db = next(gen)
56 >>> hasattr(db, 'close')
57 True
58 """
59 db = SessionLocal()
60 try:
61 yield db
62 db.commit()
63 except Exception:
64 try:
65 db.rollback()
66 except Exception:
67 try:
68 db.invalidate()
69 except Exception:
70 pass # nosec B110 - Best effort cleanup on connection failure
71 raise
72 finally:
73 db.close()
76# ===== Role Management Endpoints =====
79@router.post("/roles", response_model=RoleResponse)
80@require_admin_permission()
81async def create_role(role_data: RoleCreateRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
82 """Create a new role.
84 Requires admin permissions to create roles.
86 Args:
87 role_data: Role creation data
88 user: Current authenticated user
89 db: Database session
91 Returns:
92 RoleResponse: Created role details
94 Raises:
95 HTTPException: If role creation fails
97 Examples:
98 >>> import asyncio
99 >>> asyncio.iscoroutinefunction(create_role)
100 True
101 """
102 try:
103 role_service = RoleService(db)
104 role = await role_service.create_role(
105 name=role_data.name,
106 description=role_data.description,
107 scope=role_data.scope,
108 permissions=role_data.permissions,
109 inherits_from=role_data.inherits_from,
110 created_by=user["email"],
111 is_system_role=role_data.is_system_role or False,
112 )
114 logger.info(f"Role created: {role.id} by {SecurityValidator.sanitize_log_message(user['email'])}")
115 db.commit()
116 db.close()
117 return RoleResponse.model_validate(role)
119 except ValueError as e:
120 logger.error(f"Role creation validation error: {e}")
121 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
122 except Exception as e:
123 logger.error(f"Role creation failed: {e}")
124 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create role")
127@router.get("/roles", response_model=List[RoleResponse])
128@require_permission("admin.user_management")
129async def list_roles(
130 scope: Optional[str] = Query(None, description="Filter by scope"),
131 active_only: bool = Query(True, description="Show only active roles"),
132 user=Depends(get_current_user_with_permissions),
133 db: Session = Depends(get_db),
134):
135 """List all roles.
137 Args:
138 scope: Optional scope filter
139 active_only: Whether to show only active roles
140 user: Current authenticated user
141 db: Database session
143 Returns:
144 List[RoleResponse]: List of roles
146 Raises:
147 HTTPException: If user lacks required permissions
149 Examples:
150 >>> import asyncio
151 >>> asyncio.iscoroutinefunction(list_roles)
152 True
153 """
154 try:
155 role_service = RoleService(db)
156 roles = await role_service.list_roles(scope=scope)
157 # Release transaction before response serialization
158 db.commit()
159 db.close()
161 return [RoleResponse.model_validate(role) for role in roles]
163 except Exception as e:
164 logger.error(f"Failed to list roles: {e}")
165 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve roles")
168@router.get("/roles/{role_id}", response_model=RoleResponse)
169@require_permission("admin.user_management")
170async def get_role(role_id: str, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
171 """Get role details by ID.
173 Args:
174 role_id: Role identifier
175 user: Current authenticated user
176 db: Database session
178 Returns:
179 RoleResponse: Role details
181 Raises:
182 HTTPException: If role not found
184 Examples:
185 >>> import asyncio
186 >>> asyncio.iscoroutinefunction(get_role)
187 True
188 """
189 try:
190 role_service = RoleService(db)
191 role = await role_service.get_role_by_id(role_id)
193 if not role:
194 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
196 db.commit()
197 db.close()
198 return RoleResponse.model_validate(role)
200 except HTTPException:
201 raise
202 except Exception as e:
203 logger.error(f"Failed to get role {role_id}: {e}")
204 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve role")
207@router.put("/roles/{role_id}", response_model=RoleResponse)
208@require_admin_permission()
209async def update_role(role_id: str, role_data: RoleUpdateRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
210 """Update an existing role.
212 Args:
213 role_id: Role identifier
214 role_data: Role update data
215 user: Current authenticated user
216 db: Database session
218 Returns:
219 RoleResponse: Updated role details
221 Raises:
222 HTTPException: If role not found or update fails
224 Examples:
225 >>> import asyncio
226 >>> asyncio.iscoroutinefunction(update_role)
227 True
228 """
229 try:
230 role_service = RoleService(db)
231 role = await role_service.update_role(role_id, **role_data.model_dump(exclude_unset=True))
233 if not role:
234 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
236 logger.info(f"Role updated: {role_id} by {SecurityValidator.sanitize_log_message(user['email'])}")
237 db.commit()
238 db.close()
239 return RoleResponse.model_validate(role)
241 except HTTPException:
242 raise
243 except ValueError as e:
244 logger.error(f"Role update validation error: {e}")
245 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
246 except Exception as e:
247 logger.error(f"Role update failed: {e}")
248 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to update role")
251@router.delete("/roles/{role_id}")
252@require_admin_permission()
253async def delete_role(role_id: str, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
254 """Delete a role.
256 Args:
257 role_id: Role identifier
258 user: Current authenticated user
259 db: Database session
261 Returns:
262 dict: Success message
264 Raises:
265 HTTPException: If role not found or deletion fails
267 Examples:
268 >>> import asyncio
269 >>> asyncio.iscoroutinefunction(delete_role)
270 True
271 """
272 try:
273 role_service = RoleService(db)
274 success = await role_service.delete_role(role_id)
276 if not success:
277 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
279 logger.info(f"Role deleted: {role_id} by {SecurityValidator.sanitize_log_message(user['email'])}")
280 db.commit()
281 db.close()
282 return {"message": "Role deleted successfully"}
284 except HTTPException:
285 raise
286 except ValueError as e:
287 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
288 except Exception as e:
289 logger.error(f"Role deletion failed: {e}")
290 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to delete role")
293# ===== User Role Assignment Endpoints =====
296@router.post("/users/{user_email}/roles", response_model=UserRoleResponse)
297@require_permission("admin.user_management")
298async def assign_role_to_user(user_email: str, assignment_data: UserRoleAssignRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
299 """Assign a role to a user.
301 Args:
302 user_email: User email address
303 assignment_data: Role assignment data
304 user: Current authenticated user
305 db: Database session
307 Returns:
308 UserRoleResponse: Created role assignment
310 Raises:
311 HTTPException: If assignment fails
313 Examples:
314 >>> import asyncio
315 >>> asyncio.iscoroutinefunction(assign_role_to_user)
316 True
317 """
318 try:
319 role_service = RoleService(db)
320 user_role = await role_service.assign_role_to_user(
321 user_email=user_email, role_id=assignment_data.role_id, scope=assignment_data.scope, scope_id=assignment_data.scope_id, granted_by=user["email"], expires_at=assignment_data.expires_at
322 )
324 logger.info(f"Role assigned: {assignment_data.role_id} to {SecurityValidator.sanitize_log_message(user_email)} by {SecurityValidator.sanitize_log_message(user['email'])}")
325 db.commit()
326 db.close()
327 return UserRoleResponse.model_validate(user_role)
329 except ValueError as e:
330 logger.error(f"Role assignment validation error: {e}")
331 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
332 except Exception as e:
333 logger.error(f"Role assignment failed: {e}")
334 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to assign role")
337@router.get("/users/{user_email}/roles", response_model=List[UserRoleResponse])
338@require_permission("admin.user_management")
339async def get_user_roles(
340 user_email: str,
341 scope: Optional[str] = Query(None, description="Filter by scope"),
342 active_only: bool = Query(True, description="Show only active assignments"),
343 user=Depends(get_current_user_with_permissions),
344 db: Session = Depends(get_db),
345):
346 """Get roles assigned to a user.
348 Args:
349 user_email: User email address
350 scope: Optional scope filter
351 active_only: Whether to show only active assignments
352 user: Current authenticated user
353 db: Database session
355 Returns:
356 List[UserRoleResponse]: User's role assignments
358 Raises:
359 HTTPException: If role retrieval fails
361 Examples:
362 >>> import asyncio
363 >>> asyncio.iscoroutinefunction(get_user_roles)
364 True
365 """
366 try:
367 permission_service = PermissionService(db)
368 user_roles = await permission_service.get_user_roles(user_email=user_email, scope=scope, include_expired=not active_only)
370 result = [UserRoleResponse.model_validate(user_role) for user_role in user_roles]
371 db.commit()
372 db.close()
373 return result
375 except Exception as e:
376 logger.error(f"Failed to get user roles for {SecurityValidator.sanitize_log_message(user_email)}: {e}")
377 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user roles")
380@router.delete("/users/{user_email}/roles/{role_id}")
381@require_permission("admin.user_management")
382async def revoke_user_role(
383 user_email: str,
384 role_id: str,
385 scope: Optional[str] = Query(None, description="Scope filter"),
386 scope_id: Optional[str] = Query(None, description="Scope ID filter"),
387 user=Depends(get_current_user_with_permissions),
388 db: Session = Depends(get_db),
389):
390 """Revoke a role from a user.
392 Args:
393 user_email: User email address
394 role_id: Role identifier
395 scope: Optional scope filter
396 scope_id: Optional scope ID filter
397 user: Current authenticated user
398 db: Database session
400 Returns:
401 dict: Success message
403 Raises:
404 HTTPException: If revocation fails
406 Examples:
407 >>> import asyncio
408 >>> asyncio.iscoroutinefunction(revoke_user_role)
409 True
410 """
411 try:
412 role_service = RoleService(db)
413 success = await role_service.revoke_role_from_user(user_email=user_email, role_id=role_id, scope=scope, scope_id=scope_id)
415 if not success:
416 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role assignment not found")
418 logger.info(f"Role revoked: {role_id} from {SecurityValidator.sanitize_log_message(user_email)} by {SecurityValidator.sanitize_log_message(user['email'])}")
419 db.commit()
420 db.close()
421 return {"message": "Role revoked successfully"}
423 except HTTPException:
424 raise
425 except Exception as e:
426 logger.error(f"Role revocation failed: {e}")
427 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to revoke role")
430# ===== Permission Checking Endpoints =====
433@router.post("/permissions/check", response_model=PermissionCheckResponse)
434@require_permission("admin.security_audit")
435async def check_permission(check_data: PermissionCheckRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
436 """Check if a user has specific permission.
438 Args:
439 check_data: Permission check request
440 user: Current authenticated user
441 db: Database session
443 Returns:
444 PermissionCheckResponse: Permission check result
446 Raises:
447 HTTPException: If permission check fails
449 Examples:
450 >>> import asyncio
451 >>> asyncio.iscoroutinefunction(check_permission)
452 True
453 """
454 try:
455 permission_service = PermissionService(db)
456 granted = await permission_service.check_permission(
457 user_email=check_data.user_email,
458 permission=check_data.permission,
459 resource_type=check_data.resource_type,
460 resource_id=check_data.resource_id,
461 team_id=check_data.team_id,
462 ip_address=user.get("ip_address"),
463 user_agent=user.get("user_agent"),
464 )
466 db.commit()
467 db.close()
468 return PermissionCheckResponse(user_email=check_data.user_email, permission=check_data.permission, granted=granted, checked_at=datetime.now(tz=timezone.utc), checked_by=user["email"])
470 except Exception as e:
471 logger.error(f"Permission check failed: {e}")
472 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to check permission")
475@router.get("/permissions/user/{user_email}", response_model=List[str])
476@require_permission("admin.security_audit")
477async def get_user_permissions(user_email: str, team_id: Optional[str] = Query(None, description="Team context"), user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
478 """Get all effective permissions for a user.
480 Args:
481 user_email: User email address
482 team_id: Optional team context
483 user: Current authenticated user
484 db: Database session
486 Returns:
487 List[str]: User's effective permissions
489 Raises:
490 HTTPException: If retrieving user permissions fails
492 Examples:
493 >>> import asyncio
494 >>> asyncio.iscoroutinefunction(get_user_permissions)
495 True
496 """
497 try:
498 permission_service = PermissionService(db)
499 permissions = await permission_service.get_user_permissions(user_email=user_email, team_id=team_id)
501 result = sorted(list(permissions))
502 db.commit()
503 db.close()
504 return result
506 except Exception as e:
507 logger.error(f"Failed to get user permissions for {SecurityValidator.sanitize_log_message(user_email)}: {e}")
508 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user permissions")
511@router.get("/permissions/available", response_model=PermissionListResponse)
512async def get_available_permissions(user=Depends(get_current_user_with_permissions)):
513 """Get all available permissions in the system.
515 Args:
516 user: Current authenticated user
518 Returns:
519 PermissionListResponse: Available permissions organized by resource type
521 Raises:
522 HTTPException: If retrieving available permissions fails
523 """
524 try:
525 all_permissions = Permissions.get_all_permissions()
526 permissions_by_resource = Permissions.get_permissions_by_resource()
528 return PermissionListResponse(all_permissions=all_permissions, permissions_by_resource=permissions_by_resource, total_count=len(all_permissions))
530 except Exception as e:
531 logger.error(f"Failed to get available permissions: {e}")
532 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve available permissions")
535# ===== Self-Service Endpoints =====
538@router.get("/my/roles", response_model=List[UserRoleResponse])
539async def get_my_roles(user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
540 """Get current user's role assignments.
542 Args:
543 user: Current authenticated user
544 db: Database session
546 Returns:
547 List[UserRoleResponse]: Current user's role assignments
549 Raises:
550 HTTPException: If retrieving user roles fails
552 Examples:
553 >>> import asyncio
554 >>> asyncio.iscoroutinefunction(get_my_roles)
555 True
556 """
557 try:
558 permission_service = PermissionService(db)
559 user_roles = await permission_service.get_user_roles(user_email=user["email"], include_expired=False)
561 result = [UserRoleResponse.model_validate(user_role) for user_role in user_roles]
562 db.commit()
563 db.close()
564 return result
566 except Exception as e:
567 logger.error(f"Failed to get my roles for {SecurityValidator.sanitize_log_message(user['email'])}: {e}")
568 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve your roles")
571@router.get("/my/permissions", response_model=List[str])
572async def get_my_permissions(team_id: Optional[str] = Query(None, description="Team context"), user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
573 """Get current user's effective permissions.
575 Args:
576 team_id: Optional team context
577 user: Current authenticated user
578 db: Database session
580 Returns:
581 List[str]: Current user's effective permissions
583 Raises:
584 HTTPException: If retrieving user permissions fails
586 Examples:
587 >>> import asyncio
588 >>> asyncio.iscoroutinefunction(get_my_permissions)
589 True
590 """
591 try:
592 permission_service = PermissionService(db)
593 permissions = await permission_service.get_user_permissions(user_email=user["email"], team_id=team_id, token_teams=user.get("token_teams"))
595 result = sorted(list(permissions))
596 db.commit()
597 db.close()
598 return result
600 except Exception as e:
601 logger.error(f"Failed to get my permissions for {SecurityValidator.sanitize_log_message(user['email'])}: {e}")
602 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve your permissions")