Coverage for mcpgateway / routers / rbac.py: 100%
212 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/routers/rbac.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7RBAC API Router.
9This module provides REST API endpoints for Role-Based Access Control (RBAC)
10management including roles, user role assignments, and permission checking.
12Examples:
13 >>> from mcpgateway.routers.rbac import router
14 >>> from fastapi import APIRouter
15 >>> isinstance(router, APIRouter)
16 True
17"""
19# Standard
20from datetime import datetime, timezone
21import logging
22from typing import Generator, List, Optional
24# Third-Party
25from fastapi import APIRouter, Depends, HTTPException, Query, status
26from sqlalchemy.orm import Session
28# First-Party
29from mcpgateway.db import Permissions, SessionLocal
30from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_admin_permission, require_permission
31from mcpgateway.schemas import PermissionCheckRequest, PermissionCheckResponse, PermissionListResponse, RoleCreateRequest, RoleResponse, RoleUpdateRequest, UserRoleAssignRequest, UserRoleResponse
32from mcpgateway.services.permission_service import PermissionService
33from mcpgateway.services.role_service import RoleService
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Router on which every endpoint in this module is registered; all routes
# share the "/rbac" URL prefix and the "RBAC" tag.
router = APIRouter(prefix="/rbac", tags=["RBAC"])
def get_db() -> Generator[Session, None, None]:
    """Provide a database session for dependency injection.

    A new session is created from ``SessionLocal`` and yielded to the
    consumer. When the consumer finishes without raising, the transaction is
    committed so that read-only work does not end in an implicit rollback.
    When the consumer (or the commit) raises, the transaction is rolled back
    explicitly; if the rollback itself fails, the session is invalidated as a
    last resort. The session is closed in every case.

    Yields:
        Session: SQLAlchemy database session

    Raises:
        Exception: Re-raises any exception after rolling back the transaction.

    Examples:
        >>> gen = get_db()
        >>> db = next(gen)
        >>> hasattr(db, 'close')
        True
    """
    session = SessionLocal()
    try:
        yield session
        # Kept inside the ``try`` so a failing commit is rolled back as well.
        session.commit()
    except Exception:
        try:
            session.rollback()
        except Exception:
            # Rollback failed; drop the session's connection state instead.
            try:
                session.invalidate()
            except Exception:
                pass  # nosec B110 - Best effort cleanup on connection failure
        # Cleanup must never mask the original error.
        raise
    finally:
        session.close()
75# ===== Role Management Endpoints =====
@router.post("/roles", response_model=RoleResponse)
@require_admin_permission()
async def create_role(role_data: RoleCreateRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Create a new role.

    Requires admin permissions to create roles (enforced by the
    ``require_admin_permission`` decorator). The email of the authenticated
    user is recorded as the creator of the role.

    Args:
        role_data: Role creation data
        user: Current authenticated user
        db: Database session

    Returns:
        RoleResponse: Created role details

    Raises:
        HTTPException: 400 if role creation raises a ``ValueError``,
            500 if role creation fails for any other reason

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(create_role)
        True
    """
    try:
        role_service = RoleService(db)
        role = await role_service.create_role(
            name=role_data.name,
            description=role_data.description,
            scope=role_data.scope,
            permissions=role_data.permissions,
            inherits_from=role_data.inherits_from,
            created_by=user["email"],
            # Normalise a missing/None flag to an explicit False.
            is_system_role=role_data.is_system_role or False,
        )

        logger.info(f"Role created: {role.id} by {user['email']}")
        # Commit and close in the handler instead of waiting for the
        # ``get_db`` dependency teardown.
        db.commit()
        db.close()
        return RoleResponse.model_validate(role)

    except ValueError as e:
        logger.error(f"Role creation validation error: {e}")
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Role creation failed: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create role") from e
@router.get("/roles", response_model=List[RoleResponse])
@require_permission("admin.user_management")
async def list_roles(
    scope: Optional[str] = Query(None, description="Filter by scope"),
    active_only: bool = Query(True, description="Show only active roles"),
    user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
):
    """List all roles.

    Access is gated by the ``admin.user_management`` permission decorator.

    Args:
        scope: Optional scope filter
        active_only: Whether to show only active roles
        user: Current authenticated user
        db: Database session

    Returns:
        List[RoleResponse]: List of roles

    Raises:
        HTTPException: 500 if the roles cannot be retrieved

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(list_roles)
        True
    """
    try:
        role_service = RoleService(db)
        # NOTE(review): ``active_only`` is accepted as a query parameter but is
        # not forwarded to the service, so it currently has no effect here.
        # Confirm against RoleService.list_roles before wiring it through.
        roles = await role_service.list_roles(scope=scope)
        # Release transaction before response serialization
        db.commit()
        db.close()

        return [RoleResponse.model_validate(role) for role in roles]

    except Exception as e:
        logger.error(f"Failed to list roles: {e}")
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve roles") from e
@router.get("/roles/{role_id}", response_model=RoleResponse)
@require_permission("admin.user_management")
async def get_role(role_id: str, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Get role details by ID.

    Access is gated by the ``admin.user_management`` permission decorator.

    Args:
        role_id: Role identifier
        user: Current authenticated user
        db: Database session

    Returns:
        RoleResponse: Role details

    Raises:
        HTTPException: 404 if role not found, 500 if the lookup fails

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(get_role)
        True
    """
    try:
        role_service = RoleService(db)
        role = await role_service.get_role_by_id(role_id)

        if not role:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")

        # Commit and close in the handler instead of waiting for the
        # ``get_db`` dependency teardown.
        db.commit()
        db.close()
        return RoleResponse.model_validate(role)

    except HTTPException:
        # Let the 404 above through untouched instead of turning it into a 500.
        raise
    except Exception as e:
        logger.error(f"Failed to get role {role_id}: {e}")
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve role") from e
@router.put("/roles/{role_id}", response_model=RoleResponse)
@require_admin_permission()
async def update_role(role_id: str, role_data: RoleUpdateRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Update an existing role.

    Requires admin permissions (enforced by the ``require_admin_permission``
    decorator). Only the fields explicitly set on ``role_data`` are forwarded
    to the role service.

    Args:
        role_id: Role identifier
        role_data: Role update data
        user: Current authenticated user
        db: Database session

    Returns:
        RoleResponse: Updated role details

    Raises:
        HTTPException: 404 if role not found, 400 if the update raises a
            ``ValueError``, 500 if the update fails for any other reason

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(update_role)
        True
    """
    try:
        role_service = RoleService(db)
        # exclude_unset=True: fields the request did not set are not passed on.
        role = await role_service.update_role(role_id, **role_data.model_dump(exclude_unset=True))

        if not role:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")

        logger.info(f"Role updated: {role_id} by {user['email']}")
        # Commit and close in the handler instead of waiting for the
        # ``get_db`` dependency teardown.
        db.commit()
        db.close()
        return RoleResponse.model_validate(role)

    except HTTPException:
        # Let the 404 above through untouched instead of turning it into a 500.
        raise
    except ValueError as e:
        logger.error(f"Role update validation error: {e}")
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Role update failed: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to update role") from e
@router.delete("/roles/{role_id}")
@require_admin_permission()
async def delete_role(role_id: str, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Delete a role.

    Requires admin permissions (enforced by the ``require_admin_permission``
    decorator).

    Args:
        role_id: Role identifier
        user: Current authenticated user
        db: Database session

    Returns:
        dict: Success message

    Raises:
        HTTPException: 404 if role not found, 500 if deletion fails

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(delete_role)
        True
    """
    try:
        role_service = RoleService(db)
        success = await role_service.delete_role(role_id)

        # A falsy result from the service is reported as "not found".
        if not success:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")

        logger.info(f"Role deleted: {role_id} by {user['email']}")
        # Commit and close in the handler instead of waiting for the
        # ``get_db`` dependency teardown.
        db.commit()
        db.close()
        return {"message": "Role deleted successfully"}

    except HTTPException:
        # Let the 404 above through untouched instead of turning it into a 500.
        raise
    except Exception as e:
        logger.error(f"Role deletion failed: {e}")
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to delete role") from e
290# ===== User Role Assignment Endpoints =====
@router.post("/users/{user_email}/roles", response_model=UserRoleResponse)
@require_permission("admin.user_management")
async def assign_role_to_user(user_email: str, assignment_data: UserRoleAssignRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Assign a role to a user.

    Access is gated by the ``admin.user_management`` permission decorator.
    The email of the authenticated user is recorded as the granter.

    Args:
        user_email: User email address
        assignment_data: Role assignment data
        user: Current authenticated user
        db: Database session

    Returns:
        UserRoleResponse: Created role assignment

    Raises:
        HTTPException: 400 if the assignment raises a ``ValueError``,
            500 if the assignment fails for any other reason

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(assign_role_to_user)
        True
    """
    try:
        role_service = RoleService(db)
        user_role = await role_service.assign_role_to_user(
            user_email=user_email,
            role_id=assignment_data.role_id,
            scope=assignment_data.scope,
            scope_id=assignment_data.scope_id,
            granted_by=user["email"],
            expires_at=assignment_data.expires_at,
        )

        logger.info(f"Role assigned: {assignment_data.role_id} to {user_email} by {user['email']}")
        # Commit and close in the handler instead of waiting for the
        # ``get_db`` dependency teardown.
        db.commit()
        db.close()
        return UserRoleResponse.model_validate(user_role)

    except ValueError as e:
        logger.error(f"Role assignment validation error: {e}")
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Role assignment failed: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to assign role") from e
@router.get("/users/{user_email}/roles", response_model=List[UserRoleResponse])
@require_permission("admin.user_management")
async def get_user_roles(
    user_email: str,
    scope: Optional[str] = Query(None, description="Filter by scope"),
    active_only: bool = Query(True, description="Show only active assignments"),
    user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
):
    """Get roles assigned to a user.

    Access is gated by the ``admin.user_management`` permission decorator.

    Args:
        user_email: User email address
        scope: Optional scope filter
        active_only: Whether to show only active assignments
        user: Current authenticated user
        db: Database session

    Returns:
        List[UserRoleResponse]: User's role assignments

    Raises:
        HTTPException: 500 if role retrieval fails

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(get_user_roles)
        True
    """
    try:
        permission_service = PermissionService(db)
        # ``active_only`` is the inverse of the service's ``include_expired`` flag.
        user_roles = await permission_service.get_user_roles(user_email=user_email, scope=scope, include_expired=not active_only)

        # Build the response models while the session is still open, then
        # commit and close before returning.
        result = [UserRoleResponse.model_validate(user_role) for user_role in user_roles]
        db.commit()
        db.close()
        return result

    except Exception as e:
        logger.error(f"Failed to get user roles for {user_email}: {e}")
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user roles") from e
@router.delete("/users/{user_email}/roles/{role_id}")
@require_permission("admin.user_management")
async def revoke_user_role(
    user_email: str,
    role_id: str,
    scope: Optional[str] = Query(None, description="Scope filter"),
    scope_id: Optional[str] = Query(None, description="Scope ID filter"),
    user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
):
    """Revoke a role from a user.

    Access is gated by the ``admin.user_management`` permission decorator.

    Args:
        user_email: User email address
        role_id: Role identifier
        scope: Optional scope filter
        scope_id: Optional scope ID filter
        user: Current authenticated user
        db: Database session

    Returns:
        dict: Success message

    Raises:
        HTTPException: 404 if the role assignment is not found,
            500 if revocation fails

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(revoke_user_role)
        True
    """
    try:
        role_service = RoleService(db)
        success = await role_service.revoke_role_from_user(user_email=user_email, role_id=role_id, scope=scope, scope_id=scope_id)

        # A falsy result from the service is reported as "not found".
        if not success:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role assignment not found")

        logger.info(f"Role revoked: {role_id} from {user_email} by {user['email']}")
        # Commit and close in the handler instead of waiting for the
        # ``get_db`` dependency teardown.
        db.commit()
        db.close()
        return {"message": "Role revoked successfully"}

    except HTTPException:
        # Let the 404 above through untouched instead of turning it into a 500.
        raise
    except Exception as e:
        logger.error(f"Role revocation failed: {e}")
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to revoke role") from e
427# ===== Permission Checking Endpoints =====
@router.post("/permissions/check", response_model=PermissionCheckResponse)
@require_permission("admin.security_audit")
async def check_permission(check_data: PermissionCheckRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Check if a user has specific permission.

    Access is gated by the ``admin.security_audit`` permission decorator.
    The check is performed for the user named in ``check_data``; the caller's
    ``ip_address`` and ``user_agent`` (when present on ``user``) are passed to
    the permission service alongside it.

    Args:
        check_data: Permission check request
        user: Current authenticated user
        db: Database session

    Returns:
        PermissionCheckResponse: Permission check result

    Raises:
        HTTPException: 500 if permission check fails

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(check_permission)
        True
    """
    try:
        permission_service = PermissionService(db)
        granted = await permission_service.check_permission(
            user_email=check_data.user_email,
            permission=check_data.permission,
            resource_type=check_data.resource_type,
            resource_id=check_data.resource_id,
            team_id=check_data.team_id,
            # .get(): these keys are optional on the caller's context.
            ip_address=user.get("ip_address"),
            user_agent=user.get("user_agent"),
        )

        # Commit and close in the handler instead of waiting for the
        # ``get_db`` dependency teardown.
        db.commit()
        db.close()
        return PermissionCheckResponse(
            user_email=check_data.user_email,
            permission=check_data.permission,
            granted=granted,
            checked_at=datetime.now(tz=timezone.utc),
            checked_by=user["email"],
        )

    except Exception as e:
        logger.error(f"Permission check failed: {e}")
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to check permission") from e
@router.get("/permissions/user/{user_email}", response_model=List[str])
@require_permission("admin.security_audit")
async def get_user_permissions(user_email: str, team_id: Optional[str] = Query(None, description="Team context"), user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Get all effective permissions for a user.

    Access is gated by the ``admin.security_audit`` permission decorator.

    Args:
        user_email: User email address
        team_id: Optional team context
        user: Current authenticated user
        db: Database session

    Returns:
        List[str]: User's effective permissions, sorted in ascending order

    Raises:
        HTTPException: 500 if retrieving user permissions fails

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(get_user_permissions)
        True
    """
    try:
        permission_service = PermissionService(db)
        permissions = await permission_service.get_user_permissions(user_email=user_email, team_id=team_id)

        # sorted() accepts any iterable and always returns a new list, so no
        # intermediate list() copy is needed.
        result = sorted(permissions)
        db.commit()
        db.close()
        return result

    except Exception as e:
        logger.error(f"Failed to get user permissions for {user_email}: {e}")
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user permissions") from e
@router.get("/permissions/available", response_model=PermissionListResponse)
async def get_available_permissions(user=Depends(get_current_user_with_permissions)):
    """Get all available permissions in the system.

    No permission decorator is applied to this endpoint; only the
    authenticated-user dependency is required. No database session is used.

    Args:
        user: Current authenticated user

    Returns:
        PermissionListResponse: Available permissions organized by resource type

    Raises:
        HTTPException: 500 if retrieving available permissions fails

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(get_available_permissions)
        True
    """
    try:
        all_permissions = Permissions.get_all_permissions()
        permissions_by_resource = Permissions.get_permissions_by_resource()

        return PermissionListResponse(
            all_permissions=all_permissions,
            permissions_by_resource=permissions_by_resource,
            total_count=len(all_permissions),
        )

    except Exception as e:
        logger.error(f"Failed to get available permissions: {e}")
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve available permissions") from e
532# ===== Self-Service Endpoints =====
@router.get("/my/roles", response_model=List[UserRoleResponse])
async def get_my_roles(user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Get current user's role assignments.

    Self-service endpoint: the lookup is always made for the authenticated
    user's own email, and expired assignments are excluded.

    Args:
        user: Current authenticated user
        db: Database session

    Returns:
        List[UserRoleResponse]: Current user's role assignments

    Raises:
        HTTPException: 500 if retrieving user roles fails

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(get_my_roles)
        True
    """
    try:
        permission_service = PermissionService(db)
        user_roles = await permission_service.get_user_roles(user_email=user["email"], include_expired=False)

        # Build the response models while the session is still open, then
        # commit and close before returning.
        result = [UserRoleResponse.model_validate(user_role) for user_role in user_roles]
        db.commit()
        db.close()
        return result

    except Exception as e:
        logger.error(f"Failed to get my roles for {user['email']}: {e}")
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve your roles") from e
@router.get("/my/permissions", response_model=List[str])
async def get_my_permissions(team_id: Optional[str] = Query(None, description="Team context"), user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
    """Get current user's effective permissions.

    Self-service endpoint: the lookup is always made for the authenticated
    user's own email.

    Args:
        team_id: Optional team context
        user: Current authenticated user
        db: Database session

    Returns:
        List[str]: Current user's effective permissions, sorted in ascending order

    Raises:
        HTTPException: 500 if retrieving user permissions fails

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(get_my_permissions)
        True
    """
    try:
        permission_service = PermissionService(db)
        permissions = await permission_service.get_user_permissions(user_email=user["email"], team_id=team_id)

        # sorted() accepts any iterable and always returns a new list, so no
        # intermediate list() copy is needed.
        result = sorted(permissions)
        db.commit()
        db.close()
        return result

    except Exception as e:
        logger.error(f"Failed to get my permissions for {user['email']}: {e}")
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve your permissions") from e