Coverage for mcpgateway / middleware / rbac.py: 100%
299 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/middleware/rbac.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7RBAC Permission Checking Middleware.
9This module provides middleware for FastAPI to enforce role-based access control
10on API endpoints. It includes permission decorators and dependency injection
11functions for protecting routes.
12"""
14# Standard
15import functools
16from functools import wraps
17import logging
18from typing import Callable, Generator, List, Optional
19import uuid
21# Third-Party
22from fastapi import Cookie, Depends, HTTPException, Request, status
23from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
24from sqlalchemy.orm import Session
26# First-Party
27from mcpgateway.auth import get_current_user
28from mcpgateway.config import settings
29from mcpgateway.db import fresh_db_session, SessionLocal
30from mcpgateway.services.permission_service import PermissionService
32logger = logging.getLogger(__name__)
34# HTTP Bearer security scheme for token extraction
35security = HTTPBearer(auto_error=False)
38def get_db() -> Generator[Session, None, None]:
39 """Get database session for dependency injection.
41 DEPRECATED: Use fresh_db_session() context manager instead to avoid session accumulation.
42 This function is kept for backwards compatibility with endpoints that still use Depends(get_db).
44 Commits the transaction on successful completion to avoid implicit rollbacks
45 for read-only operations. Rolls back explicitly on exception.
47 Yields:
48 Session: SQLAlchemy database session
50 Raises:
51 Exception: Re-raises any exception after rolling back the transaction.
53 Examples:
54 >>> gen = get_db()
55 >>> db = next(gen)
56 >>> hasattr(db, 'query')
57 True
58 """
59 db = SessionLocal()
60 try:
61 yield db
62 db.commit()
63 except Exception:
64 try:
65 db.rollback()
66 except Exception:
67 try:
68 db.invalidate()
69 except Exception:
70 pass # nosec B110 - Best effort cleanup on connection failure
71 raise
72 finally:
73 db.close()
76async def get_permission_service(db: Session = Depends(get_db)) -> PermissionService:
77 """Get permission service instance for dependency injection.
79 DEPRECATED: Use PermissionService(db) directly with fresh_db_session() context manager instead.
80 This function is kept for backwards compatibility with endpoints that still use dependency injection.
82 Args:
83 db: Database session
85 Returns:
86 PermissionService: Permission checking service instance
88 Examples:
89 >>> import asyncio
90 >>> asyncio.iscoroutinefunction(get_permission_service)
91 True
92 """
93 return PermissionService(db)
96async def get_current_user_with_permissions(request: Request, credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), jwt_token: Optional[str] = Cookie(default=None)):
97 """Extract current user from JWT token and prepare for permission checking.
99 Uses fresh_db_session() context manager to avoid session accumulation under high load.
100 Database sessions are created only when needed and closed immediately after use.
102 Args:
103 request: FastAPI request object for IP/user-agent extraction
104 credentials: HTTP Bearer credentials
105 jwt_token: JWT token from cookie
107 Returns:
108 dict: User information with permission checking context
110 Raises:
111 HTTPException: If authentication fails
113 Examples:
114 Use as FastAPI dependency::
116 @app.get("/protected-endpoint")
117 async def protected_route(user = Depends(get_current_user_with_permissions)):
118 return {"user": user["email"]}
119 """
120 # Check for proxy authentication first (if MCP client auth is disabled)
121 if not settings.mcp_client_auth_enabled:
122 # Read plugin context from request.state for cross-hook context sharing
123 # (set by HttpAuthMiddleware for passing contexts between different hook types)
124 plugin_context_table = getattr(request.state, "plugin_context_table", None)
125 plugin_global_context = getattr(request.state, "plugin_global_context", None)
127 if settings.trust_proxy_auth:
128 # Extract user from proxy header
129 proxy_user = request.headers.get(settings.proxy_user_header)
130 if proxy_user:
131 # Lookup user in DB to get is_admin status, or check platform_admin_email
132 is_admin = False
133 full_name = proxy_user
134 if proxy_user == settings.platform_admin_email:
135 is_admin = True
136 full_name = "Platform Admin"
137 else:
138 # Try to lookup user in EmailUser table for is_admin status
139 try:
140 # Third-Party
141 from sqlalchemy import select # pylint: disable=import-outside-toplevel
143 # First-Party
144 from mcpgateway.db import EmailUser # pylint: disable=import-outside-toplevel
146 # Use fresh_db_session for short-lived database access
147 with fresh_db_session() as db:
148 user = db.execute(select(EmailUser).where(EmailUser.email == proxy_user)).scalar_one_or_none()
149 if user:
150 is_admin = user.is_admin
151 full_name = user.full_name or proxy_user
152 except Exception as e:
153 logger.debug(f"Could not lookup proxy user in DB: {e}")
154 # Continue with is_admin=False if lookup fails
156 return {
157 "email": proxy_user,
158 "full_name": full_name,
159 "is_admin": is_admin,
160 "ip_address": request.client.host if request.client else None,
161 "user_agent": request.headers.get("user-agent"),
162 "db": None, # Session closed; use endpoint's db param instead
163 "auth_method": "proxy",
164 "request_id": getattr(request.state, "request_id", None),
165 "team_id": getattr(request.state, "team_id", None),
166 "plugin_context_table": plugin_context_table,
167 "plugin_global_context": plugin_global_context,
168 }
170 # No proxy header - check auth_required to align with WebSocket behavior
171 # For browser requests, redirect to login; for API requests, return 401
172 if settings.auth_required:
173 accept_header = request.headers.get("accept", "")
174 is_htmx = request.headers.get("hx-request") == "true"
175 if "text/html" in accept_header or is_htmx:
176 raise HTTPException(
177 status_code=status.HTTP_302_FOUND,
178 detail="Authentication required",
179 headers={"Location": f"{settings.app_root_path}/admin/login"},
180 )
181 raise HTTPException(
182 status_code=status.HTTP_401_UNAUTHORIZED,
183 detail="Proxy authentication header required",
184 )
186 # auth_required=false: allow anonymous access
188 return {
189 "email": "anonymous",
190 "full_name": "Anonymous User",
191 "is_admin": False,
192 "ip_address": request.client.host if request.client else None,
193 "user_agent": request.headers.get("user-agent"),
194 "db": None, # Session closed; use endpoint's db param instead
195 "auth_method": "anonymous",
196 "request_id": getattr(request.state, "request_id", None),
197 "team_id": getattr(request.state, "team_id", None),
198 "plugin_context_table": plugin_context_table,
199 "plugin_global_context": plugin_global_context,
200 }
202 # Warning: MCP auth disabled without proxy trust - security risk!
203 # This case is already warned about in config validation
204 # Still check auth_required for consistency
205 if settings.auth_required:
206 accept_header = request.headers.get("accept", "")
207 is_htmx = request.headers.get("hx-request") == "true"
208 if "text/html" in accept_header or is_htmx:
209 raise HTTPException(
210 status_code=status.HTTP_302_FOUND,
211 detail="Authentication required",
212 headers={"Location": f"{settings.app_root_path}/admin/login"},
213 )
214 raise HTTPException(
215 status_code=status.HTTP_401_UNAUTHORIZED,
216 detail="Authentication required but no auth method configured",
217 )
219 return {
220 "email": "anonymous",
221 "full_name": "Anonymous User",
222 "is_admin": False,
223 "ip_address": request.client.host if request.client else None,
224 "user_agent": request.headers.get("user-agent"),
225 "db": None, # Session closed; use endpoint's db param instead
226 "auth_method": "anonymous",
227 "request_id": getattr(request.state, "request_id", None),
228 "team_id": getattr(request.state, "team_id", None),
229 "plugin_context_table": plugin_context_table,
230 "plugin_global_context": plugin_global_context,
231 }
233 # Standard JWT authentication flow
234 # Try multiple sources for the token, prioritizing Authorization header for API requests
235 token = None
236 token_from_cookie = False
238 # 1. First try Authorization header (preferred for API requests)
239 if credentials and credentials.credentials:
240 token = credentials.credentials
242 # 2. Try manual cookie reading (for browser requests)
243 if not token and request.cookies:
244 # Try both jwt_token and access_token cookie names
245 manual_token = request.cookies.get("jwt_token") or request.cookies.get("access_token")
246 if manual_token:
247 token = manual_token
248 token_from_cookie = True
250 # 3. Finally try FastAPI Cookie dependency (fallback)
251 if not token and jwt_token:
252 token = jwt_token
253 token_from_cookie = True
255 # Check if this is a browser/admin-UI request (not an external API request)
256 accept_header = request.headers.get("accept", "")
257 is_htmx = request.headers.get("hx-request") == "true"
258 referer = request.headers.get("referer", "")
259 is_admin_ui_request = "/admin" in referer
260 is_browser_request = "text/html" in accept_header or is_htmx or is_admin_ui_request
262 # SECURITY: Reject cookie-only authentication for API requests
263 # Cookies should only be used for browser/HTML requests (including admin UI fetch calls)
264 if token_from_cookie and not is_browser_request:
265 raise HTTPException(
266 status_code=status.HTTP_401_UNAUTHORIZED,
267 detail="Cookie authentication not allowed for API requests. Use Authorization header.",
268 headers={"WWW-Authenticate": "Bearer"},
269 )
271 if not token:
272 # For browser requests (HTML Accept header or HTMX), redirect to login
273 if is_browser_request:
274 raise HTTPException(status_code=status.HTTP_302_FOUND, detail="Authentication required", headers={"Location": f"{settings.app_root_path}/admin/login"})
276 # If auth is disabled, return the stock admin user
277 if not settings.auth_required:
278 return {
279 "email": settings.platform_admin_email,
280 "full_name": "Platform Admin",
281 "is_admin": True,
282 "ip_address": request.client.host if request.client else None,
283 "user_agent": request.headers.get("user-agent"),
284 "db": None, # Session closed; use endpoint's db param instead
285 "auth_method": "disabled",
286 "request_id": getattr(request.state, "request_id", None),
287 "team_id": getattr(request.state, "team_id", None),
288 }
290 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Authorization token required")
292 try:
293 # Create credentials object if we got token from cookie
294 if not credentials:
295 credentials = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token)
297 # Extract user from token using the email auth function
298 # Pass request to get_current_user so plugins can store auth_method in request.state
299 user = await get_current_user(credentials, request=request)
301 # Read auth_method and request_id from request.state
302 # (auth_method set by plugin in get_current_user, request_id set by HTTP middleware)
303 auth_method = getattr(request.state, "auth_method", None)
304 request_id = getattr(request.state, "request_id", None)
305 team_id = getattr(request.state, "team_id", None)
307 # Read plugin context data from request.state for cross-hook context sharing
308 # (set by HttpAuthMiddleware for passing contexts between different hook types)
309 plugin_context_table = getattr(request.state, "plugin_context_table", None)
310 plugin_global_context = getattr(request.state, "plugin_global_context", None)
312 # Get token_use from request.state (set by get_current_user)
313 token_use = getattr(request.state, "token_use", None)
315 # Add request context for permission auditing
316 return {
317 "email": user.email,
318 "full_name": user.full_name,
319 "is_admin": user.is_admin,
320 "ip_address": request.client.host if request.client else None,
321 "user_agent": request.headers.get("user-agent"),
322 "db": None, # Session closed; use endpoint's db param instead
323 "auth_method": auth_method, # Include auth_method from plugin
324 "request_id": request_id, # Include request_id from middleware
325 "team_id": team_id, # Include team_id from token
326 "token_use": token_use, # Include token_use for RBAC team derivation
327 "plugin_context_table": plugin_context_table, # Plugin contexts for cross-hook sharing
328 "plugin_global_context": plugin_global_context, # Global context for consistency
329 }
330 except Exception as e:
331 logger.error(f"Authentication failed: {type(e).__name__}: {e}")
333 # For browser requests (HTML Accept header or HTMX), redirect to login
334 accept_header = request.headers.get("accept", "")
335 is_htmx = request.headers.get("hx-request") == "true"
336 if "text/html" in accept_header or is_htmx:
337 raise HTTPException(status_code=status.HTTP_302_FOUND, detail="Authentication required", headers={"Location": f"{settings.app_root_path}/admin/login"})
339 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials")
342# --- Team derivation helpers for multi-team session tokens ---
345@functools.lru_cache(maxsize=1)
346def _get_resource_param_to_model():
347 """Lazy-initialize the resource param to model mapping.
349 Returns:
350 dict: Mapping of URL parameter names to SQLAlchemy model classes.
351 """
352 # First-Party
353 from mcpgateway.db import A2AAgent, Gateway, Prompt, Resource, Server, Tool # pylint: disable=import-outside-toplevel
355 return {
356 "tool_id": Tool,
357 "server_id": Server,
358 "resource_id": Resource,
359 "prompt_id": Prompt,
360 "gateway_id": Gateway,
361 "agent_id": A2AAgent,
362 }
365def _derive_team_from_resource(kwargs, db_session) -> Optional[str]:
366 """Look up resource's team_id from DB for RBAC context (Tier 1).
368 For endpoints that target a specific resource (get, update, delete, execute),
369 derive the team context from the resource's owner team.
371 Args:
372 kwargs: Endpoint function kwargs containing resource ID params
373 db_session: Active SQLAlchemy session
375 Returns:
376 team_id string if found, None otherwise
377 """
378 mapping = _get_resource_param_to_model()
379 for param_name, model_cls in mapping.items():
380 resource_id = kwargs.get(param_name)
381 if resource_id:
382 try:
383 resource = db_session.get(model_cls, resource_id)
384 if resource:
385 return getattr(resource, "team_id", None)
386 except Exception: # nosec B110 - DB lookup failure falls through to None
387 pass
388 return None # Resource not found; let endpoint handle 404
389 return None # No resource ID param
392async def _derive_team_from_payload(kwargs) -> Optional[str]:
393 """Extract team_id from create payload objects or form data (Tier 3).
395 For create endpoints, derive team context from the Pydantic payload or form data.
397 Args:
398 kwargs: Endpoint function kwargs
400 Returns:
401 team_id string if found, None otherwise
402 """
403 # Try Pydantic payload objects (API endpoints)
404 for param_name in ("gateway", "tool", "server", "resource", "prompt", "agent"):
405 payload_obj = kwargs.get(param_name)
406 if payload_obj and hasattr(payload_obj, "team_id"):
407 tid = getattr(payload_obj, "team_id", None)
408 if tid:
409 return tid
411 # Try request form data (admin UI endpoints)
412 # Note: use 'is not None' rather than truthiness check because some
413 # objects (e.g. Pydantic models) may be truthy yet lack .headers.
414 request = kwargs.get("request")
415 if request is not None and isinstance(request, Request):
416 content_type = request.headers.get("content-type", "")
417 if "form" in content_type:
418 try:
419 form = await request.form()
420 tid = form.get("team_id")
421 if tid:
422 return tid
423 except Exception: # nosec B110 - Form parse failure is non-fatal
424 pass
426 return None
429# Permissions that indicate create/mutate operations (not safe for "any-team" aggregation)
430_MUTATE_PERMISSION_ACTIONS = frozenset(
431 {
432 "create",
433 "update",
434 "delete",
435 "execute",
436 "invoke",
437 "toggle",
438 "set_state",
439 "revoke",
440 "manage_members",
441 "join",
442 "manage",
443 "share",
444 "invite",
445 "use",
446 }
447)
450def _is_mutate_permission(permission: str) -> bool:
451 """Check if a permission string represents a mutate operation.
453 Handles both dot-separated (tools.create) and colon-separated
454 (admin.sso_providers:create) permission formats.
456 Args:
457 permission: Permission string like 'tools.create' or 'admin.sso_providers:create'.
459 Returns:
460 bool: True if the permission's action component is a mutating operation.
461 """
462 # Handle colon separator: admin.sso_providers:create → action is "create"
463 if ":" in permission:
464 action = permission.rsplit(":", 1)[-1]
465 return action in _MUTATE_PERMISSION_ACTIONS
466 parts = permission.split(".")
467 return parts[-1] in _MUTATE_PERMISSION_ACTIONS if len(parts) >= 2 else False
470def require_permission(permission: str, resource_type: Optional[str] = None, allow_admin_bypass: bool = True):
471 """Decorator to require specific permission for accessing an endpoint.
473 Args:
474 permission: Required permission (e.g., 'tools.create')
475 resource_type: Optional resource type for resource-specific permissions
476 allow_admin_bypass: If True (default), admin users bypass all permission checks.
477 If False, even admins must have explicit permissions.
478 Use False for admin UI routes to enforce granular RBAC.
480 Returns:
481 Callable: Decorated function that enforces the permission requirement
483 Examples:
484 >>> decorator = require_permission("tools.create", "tools")
485 >>> callable(decorator)
486 True
488 Execute wrapped function when permission granted:
489 >>> import asyncio
490 >>> class DummyPS:
491 ... def __init__(self, db):
492 ... pass
493 ... async def check_permission(self, **kwargs):
494 ... return True
495 >>> @require_permission("tools.read")
496 ... async def demo(user=None):
497 ... return "ok"
498 >>> from unittest.mock import patch
499 >>> with patch('mcpgateway.middleware.rbac.PermissionService', DummyPS):
500 ... asyncio.run(demo(user={"email": "u", "db": object()}))
501 'ok'
502 """
504 def decorator(func: Callable) -> Callable:
505 """Decorator function that wraps the original function with permission checking.
507 Args:
508 func: The function to be decorated
510 Returns:
511 Callable: The wrapped function with permission checking
512 """
514 @wraps(func)
515 async def wrapper(*args, **kwargs):
516 """Async wrapper function that performs permission check before calling original function.
518 Args:
519 *args: Positional arguments passed to the wrapped function
520 **kwargs: Keyword arguments passed to the wrapped function
522 Returns:
523 Any: Result from the wrapped function if permission check passes
525 Raises:
526 HTTPException: If user authentication or permission check fails
527 """
528 # Extract user context from named kwargs only (security: avoid picking up request body dicts)
529 user_context = kwargs.get("user") or kwargs.get("_user") or kwargs.get("current_user") or kwargs.get("current_user_ctx")
530 if not user_context or not isinstance(user_context, dict) or "email" not in user_context:
531 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User authentication required")
533 # Extract team_id from path parameters if available
534 team_id = kwargs.get("team_id")
536 # If team_id is None or blank in kwargs then check
537 if not team_id:
538 # check if user_context has team_id
539 team_id = user_context.get("team_id", None)
541 # For multi-team session tokens (team_id is None), derive team from context
542 check_any_team = False
543 if not team_id and user_context.get("token_use") == "session":
544 db_session = kwargs.get("db") or user_context.get("db")
545 if db_session:
546 # Tier 1: Try to derive team from existing resource
547 team_id = _derive_team_from_resource(kwargs, db_session)
548 # Tier 3: Try to derive team from create payload / form
549 if team_id is None:
550 team_id = await _derive_team_from_payload(kwargs)
551 # If still no team_id: Tier 2 for read/list, fail-closed-for-teams for mutate
552 if not team_id:
553 if _is_mutate_permission(permission):
554 # Mutate without team context: proceed with team_id=None which
555 # restricts RBAC to global + personal roles only. Team-scoped
556 # roles cannot match (fail-closed for team grants), but global
557 # roles (e.g. platform_admin) still work as intended.
558 pass
559 else:
560 # List/read endpoint: check if user has permission in any team
561 check_any_team = True
563 # First, check if any plugins want to handle permission checking
564 # First-Party
565 from mcpgateway.plugins.framework import get_plugin_manager, GlobalContext, HttpAuthCheckPermissionPayload, HttpHookType # pylint: disable=import-outside-toplevel
567 plugin_manager = get_plugin_manager()
568 if plugin_manager and plugin_manager.has_hooks_for(HttpHookType.HTTP_AUTH_CHECK_PERMISSION):
569 # Get plugin contexts from user_context (stored in request.state by HttpAuthMiddleware)
570 # These enable cross-hook context sharing between HTTP_PRE_REQUEST and HTTP_AUTH_CHECK_PERMISSION
571 plugin_context_table = user_context.get("plugin_context_table")
572 plugin_global_context = user_context.get("plugin_global_context")
574 # Reuse existing global context from middleware if available for consistency
575 # Otherwise create a new one (fallback for cases where middleware didn't run)
576 if plugin_global_context:
577 global_context = plugin_global_context
578 else:
579 request_id = user_context.get("request_id") or uuid.uuid4().hex
580 global_context = GlobalContext(
581 request_id=request_id,
582 server_id=None,
583 tenant_id=None,
584 )
586 # Invoke permission check hook, passing plugin contexts from HTTP_PRE_REQUEST hook
587 result, _ = await plugin_manager.invoke_hook(
588 HttpHookType.HTTP_AUTH_CHECK_PERMISSION,
589 payload=HttpAuthCheckPermissionPayload(
590 user_email=user_context["email"],
591 permission=permission,
592 resource_type=resource_type,
593 team_id=team_id,
594 is_admin=user_context.get("is_admin", False),
595 auth_method=user_context.get("auth_method"),
596 client_host=user_context.get("ip_address"),
597 user_agent=user_context.get("user_agent"),
598 ),
599 global_context=global_context,
600 local_contexts=plugin_context_table, # Pass context table for cross-hook state
601 )
603 # If a plugin made a decision, respect it
604 if result and result.modified_payload:
605 if result.modified_payload.granted:
606 logger.info(f"Permission granted by plugin: user={user_context['email']}, " f"permission={permission}, reason={result.modified_payload.reason}")
607 return await func(*args, **kwargs)
608 logger.warning(f"Permission denied by plugin: user={user_context['email']}, " f"permission={permission}, reason={result.modified_payload.reason}")
609 raise HTTPException(
610 status_code=status.HTTP_403_FORBIDDEN,
611 detail=f"Insufficient permissions. Required: {permission}",
612 )
614 # No plugin handled it, fall through to standard RBAC check
615 # Get db session: prefer endpoint's db param, then user_context["db"], then create fresh
616 db_session = kwargs.get("db") or user_context.get("db")
617 if db_session:
618 # Use existing session from endpoint or user_context
619 permission_service = PermissionService(db_session)
620 granted = await permission_service.check_permission(
621 user_email=user_context["email"],
622 permission=permission,
623 resource_type=resource_type,
624 team_id=team_id,
625 ip_address=user_context.get("ip_address"),
626 user_agent=user_context.get("user_agent"),
627 allow_admin_bypass=allow_admin_bypass,
628 check_any_team=check_any_team,
629 )
630 else:
631 # Create fresh db session for permission check
632 with fresh_db_session() as db:
633 permission_service = PermissionService(db)
634 granted = await permission_service.check_permission(
635 user_email=user_context["email"],
636 permission=permission,
637 resource_type=resource_type,
638 team_id=team_id,
639 ip_address=user_context.get("ip_address"),
640 user_agent=user_context.get("user_agent"),
641 allow_admin_bypass=allow_admin_bypass,
642 check_any_team=check_any_team,
643 )
645 if not granted:
646 logger.warning(f"Permission denied: user={user_context['email']}, permission={permission}, resource_type={resource_type}")
647 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=f"Insufficient permissions. Required: {permission}")
649 # Permission granted, execute the original function
650 return await func(*args, **kwargs)
652 return wrapper
654 return decorator
657def require_admin_permission():
658 """Decorator to require admin permissions for accessing an endpoint.
660 Returns:
661 Callable: Decorated function that enforces admin permission requirement
663 Examples:
664 >>> decorator = require_admin_permission()
665 >>> callable(decorator)
666 True
668 Execute when admin permission granted:
669 >>> import asyncio
670 >>> class DummyPS:
671 ... def __init__(self, db):
672 ... pass
673 ... async def check_admin_permission(self, email):
674 ... return True
675 >>> @require_admin_permission()
676 ... async def demo(user=None):
677 ... return "admin-ok"
678 >>> from unittest.mock import patch
679 >>> with patch('mcpgateway.middleware.rbac.PermissionService', DummyPS):
680 ... asyncio.run(demo(user={"email": "u", "db": object()}))
681 'admin-ok'
682 """
684 def decorator(func: Callable) -> Callable:
685 """Decorator function that wraps the original function with admin permission checking.
687 Args:
688 func: The function to be decorated
690 Returns:
691 Callable: The wrapped function with admin permission checking
692 """
694 @wraps(func)
695 async def wrapper(*args, **kwargs):
696 """Async wrapper function that performs admin permission check before calling original function.
698 Args:
699 *args: Positional arguments passed to the wrapped function
700 **kwargs: Keyword arguments passed to the wrapped function
702 Returns:
703 Any: Result from the wrapped function if admin permission check passes
705 Raises:
706 HTTPException: If user authentication or admin permission check fails
707 """
708 # Extract user context from named kwargs only (security: avoid picking up request body dicts)
709 user_context = kwargs.get("user") or kwargs.get("_user") or kwargs.get("current_user") or kwargs.get("current_user_ctx")
710 if not user_context or not isinstance(user_context, dict) or "email" not in user_context:
711 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User authentication required")
713 # Get db session: prefer endpoint's db param, then user_context["db"], then create fresh
714 db_session = kwargs.get("db") or user_context.get("db")
715 if db_session:
716 # Use existing session from endpoint or user_context
717 permission_service = PermissionService(db_session)
718 has_admin_permission = await permission_service.check_admin_permission(user_context["email"])
719 else:
720 # Create fresh db session for permission check
721 with fresh_db_session() as db:
722 permission_service = PermissionService(db)
723 has_admin_permission = await permission_service.check_admin_permission(user_context["email"])
725 if not has_admin_permission:
726 logger.warning(f"Admin permission denied: user={user_context['email']}")
727 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin permissions required")
729 # Admin permission granted, execute the original function
730 return await func(*args, **kwargs)
732 return wrapper
734 return decorator
737def require_any_permission(permissions: List[str], resource_type: Optional[str] = None, allow_admin_bypass: bool = True):
738 """Decorator to require any of the specified permissions for accessing an endpoint.
740 Args:
741 permissions: List of permissions, user needs at least one
742 resource_type: Optional resource type for resource-specific permissions
743 allow_admin_bypass: If True (default), admin users bypass all permission checks.
744 If False, even admins must have explicit permissions.
746 Returns:
747 Callable: Decorated function that enforces the permission requirements
749 Examples:
750 >>> decorator = require_any_permission(["tools.read", "tools.execute"], "tools")
751 >>> callable(decorator)
752 True
754 Execute when any permission granted:
755 >>> import asyncio
756 >>> class DummyPS:
757 ... def __init__(self, db):
758 ... pass
759 ... async def check_permission(self, **kwargs):
760 ... return True
761 >>> @require_any_permission(["tools.read", "tools.execute"], "tools")
762 ... async def demo(user=None):
763 ... return "any-ok"
764 >>> from unittest.mock import patch
765 >>> with patch('mcpgateway.middleware.rbac.PermissionService', DummyPS):
766 ... asyncio.run(demo(user={"email": "u", "db": object()}))
767 'any-ok'
768 """
770 def decorator(func: Callable) -> Callable:
771 """Decorator function that wraps the original function with any-permission checking.
773 Args:
774 func: The function to be decorated
776 Returns:
777 Callable: The wrapped function with any-permission checking
778 """
780 @wraps(func)
781 async def wrapper(*args, **kwargs):
782 """Async wrapper function that performs any-permission check before calling original function.
784 Args:
785 *args: Positional arguments passed to the wrapped function
786 **kwargs: Keyword arguments passed to the wrapped function
788 Returns:
789 Any: Result from the wrapped function if any-permission check passes
791 Raises:
792 HTTPException: If user authentication or any-permission check fails
793 """
794 # Extract user context from named kwargs only (security: avoid picking up request body dicts)
795 user_context = kwargs.get("user") or kwargs.get("_user") or kwargs.get("current_user") or kwargs.get("current_user_ctx")
796 if not user_context or not isinstance(user_context, dict) or "email" not in user_context:
797 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User authentication required")
799 # Extract team_id from path parameters if available
800 team_id = kwargs.get("team_id")
802 # If team_id is None or blank in kwargs then check
803 if not team_id:
804 # check if user_context has team_id
805 team_id = user_context.get("team_id", None)
807 # For multi-team session tokens (team_id is None), derive team from context
808 check_any_team = False
809 if not team_id and user_context.get("token_use") == "session":
810 db_session = kwargs.get("db") or user_context.get("db")
811 if db_session:
812 # Tier 1: Try to derive team from existing resource
813 team_id = _derive_team_from_resource(kwargs, db_session)
814 # Tier 3: Try to derive team from create payload / form
815 if team_id is None:
816 team_id = await _derive_team_from_payload(kwargs)
817 # If still no team_id: check if any permission is read-only
818 if not team_id:
819 # If ALL permissions are mutating, fail closed (team_id=None, global+personal only)
820 # If ANY permission is non-mutating, use check_any_team for broader access
821 if any(not _is_mutate_permission(p) for p in permissions):
822 check_any_team = True
824 # Get db session: prefer endpoint's db param, then user_context["db"], then create fresh
825 db_session = kwargs.get("db") or user_context.get("db")
826 if db_session:
827 # Use existing session from endpoint or user_context
828 permission_service = PermissionService(db_session)
829 # Check if user has any of the required permissions
830 granted = False
831 for permission in permissions:
832 if await permission_service.check_permission(
833 user_email=user_context["email"],
834 permission=permission,
835 resource_type=resource_type,
836 team_id=team_id,
837 ip_address=user_context.get("ip_address"),
838 user_agent=user_context.get("user_agent"),
839 allow_admin_bypass=allow_admin_bypass,
840 check_any_team=check_any_team,
841 ):
842 granted = True
843 break
844 else:
845 # Create fresh db session for permission check
846 with fresh_db_session() as db:
847 permission_service = PermissionService(db)
848 # Check if user has any of the required permissions
849 granted = False
850 for permission in permissions:
851 if await permission_service.check_permission(
852 user_email=user_context["email"],
853 permission=permission,
854 resource_type=resource_type,
855 team_id=team_id,
856 ip_address=user_context.get("ip_address"),
857 user_agent=user_context.get("user_agent"),
858 allow_admin_bypass=allow_admin_bypass,
859 check_any_team=check_any_team,
860 ):
861 granted = True
862 break
864 if not granted:
865 logger.warning(f"Permission denied: user={user_context['email']}, permissions={permissions}, resource_type={resource_type}")
866 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=f"Insufficient permissions. Required one of: {', '.join(permissions)}")
868 # Permission granted, execute the original function
869 return await func(*args, **kwargs)
871 return wrapper
873 return decorator
876class PermissionChecker:
877 """Context manager for manual permission checking.
879 Useful for complex permission logic that can't be handled by decorators.
881 Examples:
882 >>> from unittest.mock import Mock
883 >>> checker = PermissionChecker({"email": "user@example.com", "db": Mock()})
884 >>> hasattr(checker, 'has_permission') and hasattr(checker, 'has_admin_permission')
885 True
886 """
888 def __init__(self, user_context: dict):
889 """Initialize permission checker with user context.
891 Args:
892 user_context: User context from get_current_user_with_permissions
893 """
894 self.user_context = user_context
895 self.db_session = user_context.get("db")
897 async def has_permission(self, permission: str, resource_type: Optional[str] = None, resource_id: Optional[str] = None, team_id: Optional[str] = None) -> bool:
898 """Check if user has specific permission.
900 Args:
901 permission: Permission to check
902 resource_type: Optional resource type
903 resource_id: Optional resource ID
904 team_id: Optional team context
906 Returns:
907 bool: True if user has permission
908 """
909 if self.db_session:
910 # Use existing session
911 permission_service = PermissionService(self.db_session)
912 return await permission_service.check_permission(
913 user_email=self.user_context["email"],
914 permission=permission,
915 resource_type=resource_type,
916 resource_id=resource_id,
917 team_id=team_id,
918 ip_address=self.user_context.get("ip_address"),
919 user_agent=self.user_context.get("user_agent"),
920 )
921 # Create fresh db session
922 with fresh_db_session() as db:
923 permission_service = PermissionService(db)
924 return await permission_service.check_permission(
925 user_email=self.user_context["email"],
926 permission=permission,
927 resource_type=resource_type,
928 resource_id=resource_id,
929 team_id=team_id,
930 ip_address=self.user_context.get("ip_address"),
931 user_agent=self.user_context.get("user_agent"),
932 )
934 async def has_admin_permission(self) -> bool:
935 """Check if user has admin permissions.
937 Returns:
938 bool: True if user has admin permissions
939 """
940 if self.db_session:
941 # Use existing session
942 permission_service = PermissionService(self.db_session)
943 return await permission_service.check_admin_permission(self.user_context["email"])
944 # Create fresh db session
945 with fresh_db_session() as db:
946 permission_service = PermissionService(db)
947 return await permission_service.check_admin_permission(self.user_context["email"])
949 async def has_any_permission(self, permissions: List[str], resource_type: Optional[str] = None, team_id: Optional[str] = None) -> bool:
950 """Check if user has any of the specified permissions.
952 Args:
953 permissions: List of permissions to check
954 resource_type: Optional resource type
955 team_id: Optional team context
957 Returns:
958 bool: True if user has at least one permission
959 """
960 if self.db_session:
961 # Use existing session for all checks
962 permission_service = PermissionService(self.db_session)
963 for permission in permissions:
964 if await permission_service.check_permission(
965 user_email=self.user_context["email"],
966 permission=permission,
967 resource_type=resource_type,
968 team_id=team_id,
969 ip_address=self.user_context.get("ip_address"),
970 user_agent=self.user_context.get("user_agent"),
971 ):
972 return True
973 return False
974 # Create single fresh session for all checks (avoid N sessions for N permissions)
975 with fresh_db_session() as db:
976 permission_service = PermissionService(db)
977 for permission in permissions:
978 if await permission_service.check_permission(
979 user_email=self.user_context["email"],
980 permission=permission,
981 resource_type=resource_type,
982 team_id=team_id,
983 ip_address=self.user_context.get("ip_address"),
984 user_agent=self.user_context.get("user_agent"),
985 ):
986 return True
987 return False
989 async def require_permission(self, permission: str, resource_type: Optional[str] = None, resource_id: Optional[str] = None, team_id: Optional[str] = None) -> None:
990 """Require specific permission, raise HTTPException if not granted.
992 Args:
993 permission: Required permission
994 resource_type: Optional resource type
995 resource_id: Optional resource ID
996 team_id: Optional team context
998 Raises:
999 HTTPException: If permission is not granted
1000 """
1001 if not await self.has_permission(permission, resource_type, resource_id, team_id):
1002 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=f"Insufficient permissions. Required: {permission}")